#![warn(missing_docs)]
#![doc(html_root_url = "https://docs.rs/r2d2/0.8")]
use log::error;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::cmp;
use std::error;
use std::fmt;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
pub use crate::config::Builder;
use crate::config::Config;
use crate::event::{AcquireEvent, CheckinEvent, CheckoutEvent, ReleaseEvent, TimeoutEvent};
pub use crate::event::{HandleEvent, NopEventHandler};
mod config;
pub mod event;
#[cfg(test)]
mod test;
static CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
pub trait ManageConnection: Send + Sync + 'static {
type Connection: Send + 'static;
type Error: error::Error + 'static;
fn connect(&self) -> Result<Self::Connection, Self::Error>;
fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;
fn has_broken(&self, conn: &mut Self::Connection) -> bool;
}
pub trait HandleError<E>: fmt::Debug + Send + Sync + 'static {
fn handle_error(&self, error: E);
}
#[derive(Copy, Clone, Debug)]
pub struct NopErrorHandler;
impl<E> HandleError<E> for NopErrorHandler {
fn handle_error(&self, _: E) {}
}
#[derive(Copy, Clone, Debug)]
pub struct LoggingErrorHandler;
impl<E> HandleError<E> for LoggingErrorHandler
where
E: error::Error,
{
fn handle_error(&self, error: E) {
error!("{}", error);
}
}
pub trait CustomizeConnection<C, E>: fmt::Debug + Send + Sync + 'static {
#[allow(unused_variables)]
fn on_acquire(&self, conn: &mut C) -> Result<(), E> {
Ok(())
}
#[allow(unused_variables)]
fn on_release(&self, conn: C) {}
}
#[derive(Copy, Clone, Debug)]
pub struct NopConnectionCustomizer;
impl<C, E> CustomizeConnection<C, E> for NopConnectionCustomizer {}
struct Conn<C> {
conn: C,
birth: Instant,
id: u64,
}
struct IdleConn<C> {
conn: Conn<C>,
idle_start: Instant,
}
struct PoolInternals<C> {
conns: Vec<IdleConn<C>>,
num_conns: u32,
pending_conns: u32,
last_error: Option<String>,
}
struct SharedPool<M>
where
M: ManageConnection,
{
config: Config<M::Connection, M::Error>,
manager: M,
internals: Mutex<PoolInternals<M::Connection>>,
cond: Condvar,
}
fn drop_conns<M>(
shared: &Arc<SharedPool<M>>,
mut internals: MutexGuard<PoolInternals<M::Connection>>,
conns: Vec<Conn<M::Connection>>,
) where
M: ManageConnection,
{
internals.num_conns -= conns.len() as u32;
establish_idle_connections(shared, &mut internals);
drop(internals);
for conn in conns {
let event = ReleaseEvent {
id: conn.id,
age: conn.birth.elapsed(),
};
shared.config.event_handler.handle_release(event);
shared.config.connection_customizer.on_release(conn.conn);
}
}
fn establish_idle_connections<M>(
shared: &Arc<SharedPool<M>>,
internals: &mut PoolInternals<M::Connection>,
) where
M: ManageConnection,
{
let min = shared.config.min_idle.unwrap_or(shared.config.max_size);
let idle = internals.conns.len() as u32;
for _ in idle..min {
add_connection(shared, internals);
}
}
fn add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>)
where
M: ManageConnection,
{
if internals.num_conns + internals.pending_conns >= shared.config.max_size {
return;
}
internals.pending_conns += 1;
inner(Duration::from_secs(0), shared);
fn inner<M>(delay: Duration, shared: &Arc<SharedPool<M>>)
where
M: ManageConnection,
{
let new_shared = Arc::downgrade(shared);
shared.config.thread_pool.execute_after(delay, move || {
let shared = match new_shared.upgrade() {
Some(shared) => shared,
None => return,
};
let conn = shared.manager.connect().and_then(|mut conn| {
shared
.config
.connection_customizer
.on_acquire(&mut conn)
.map(|_| conn)
});
match conn {
Ok(conn) => {
let id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed) as u64;
let event = AcquireEvent { id };
shared.config.event_handler.handle_acquire(event);
let mut internals = shared.internals.lock();
internals.last_error = None;
let now = Instant::now();
let conn = IdleConn {
conn: Conn {
conn,
birth: now,
id,
},
idle_start: now,
};
internals.conns.push(conn);
internals.pending_conns -= 1;
internals.num_conns += 1;
shared.cond.notify_one();
}
Err(err) => {
shared.internals.lock().last_error = Some(err.to_string());
shared.config.error_handler.handle_error(err);
let delay = cmp::max(Duration::from_millis(200), delay);
let delay = cmp::min(shared.config.connection_timeout / 2, delay * 2);
inner(delay, &shared);
}
}
});
}
}
fn reap_connections<M>(shared: &Weak<SharedPool<M>>)
where
M: ManageConnection,
{
let shared = match shared.upgrade() {
Some(shared) => shared,
None => return,
};
let mut old = Vec::with_capacity(shared.config.max_size as usize);
let mut to_drop = vec![];
let mut internals = shared.internals.lock();
mem::swap(&mut old, &mut internals.conns);
let now = Instant::now();
for conn in old {
let mut reap = false;
if let Some(timeout) = shared.config.idle_timeout {
reap |= now - conn.idle_start >= timeout;
}
if let Some(lifetime) = shared.config.max_lifetime {
reap |= now - conn.conn.birth >= lifetime;
}
if reap {
to_drop.push(conn.conn);
} else {
internals.conns.push(conn);
}
}
drop_conns(&shared, internals, to_drop);
}
pub struct Pool<M>(Arc<SharedPool<M>>)
where
M: ManageConnection;
impl<M> Clone for Pool<M>
where
M: ManageConnection,
{
fn clone(&self) -> Pool<M> {
Pool(self.0.clone())
}
}
impl<M> fmt::Debug for Pool<M>
where
M: ManageConnection + fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Pool")
.field("state", &self.state())
.field("config", &self.0.config)
.field("manager", &self.0.manager)
.finish()
}
}
impl<M> Pool<M>
where
M: ManageConnection,
{
pub fn new(manager: M) -> Result<Pool<M>, Error> {
Pool::builder().build(manager)
}
pub fn builder() -> Builder<M> {
Builder::new()
}
fn new_inner(
config: Config<M::Connection, M::Error>,
manager: M,
reaper_rate: Duration,
) -> Pool<M> {
let internals = PoolInternals {
conns: Vec::with_capacity(config.max_size as usize),
num_conns: 0,
pending_conns: 0,
last_error: None,
};
let shared = Arc::new(SharedPool {
config,
manager,
internals: Mutex::new(internals),
cond: Condvar::new(),
});
establish_idle_connections(&shared, &mut shared.internals.lock());
if shared.config.max_lifetime.is_some() || shared.config.idle_timeout.is_some() {
let s = Arc::downgrade(&shared);
shared
.config
.thread_pool
.execute_at_fixed_rate(reaper_rate, reaper_rate, move || reap_connections(&s));
}
Pool(shared)
}
fn wait_for_initialization(&self) -> Result<(), Error> {
let end = Instant::now() + self.0.config.connection_timeout;
let mut internals = self.0.internals.lock();
let initial_size = self.0.config.min_idle.unwrap_or(self.0.config.max_size);
while internals.num_conns != initial_size {
if self.0.cond.wait_until(&mut internals, end).timed_out() {
return Err(Error(internals.last_error.take()));
}
}
Ok(())
}
pub fn get(&self) -> Result<PooledConnection<M>, Error> {
self.get_timeout(self.0.config.connection_timeout)
}
pub fn get_timeout(&self, timeout: Duration) -> Result<PooledConnection<M>, Error> {
let start = Instant::now();
let end = start + timeout;
let mut internals = self.0.internals.lock();
loop {
match self.try_get_inner(internals) {
Ok(conn) => {
let event = CheckoutEvent {
id: conn.conn.as_ref().unwrap().id,
duration: start.elapsed(),
};
self.0.config.event_handler.handle_checkout(event);
return Ok(conn);
}
Err(i) => internals = i,
}
add_connection(&self.0, &mut internals);
if self.0.cond.wait_until(&mut internals, end).timed_out() {
let event = TimeoutEvent { timeout };
self.0.config.event_handler.handle_timeout(event);
return Err(Error(internals.last_error.take()));
}
}
}
pub fn try_get(&self) -> Option<PooledConnection<M>> {
self.try_get_inner(self.0.internals.lock()).ok()
}
fn try_get_inner<'a>(
&'a self,
mut internals: MutexGuard<'a, PoolInternals<M::Connection>>,
) -> Result<PooledConnection<M>, MutexGuard<'a, PoolInternals<M::Connection>>> {
loop {
if let Some(mut conn) = internals.conns.pop() {
establish_idle_connections(&self.0, &mut internals);
drop(internals);
if self.0.config.test_on_check_out {
if let Err(e) = self.0.manager.is_valid(&mut conn.conn.conn) {
let msg = e.to_string();
self.0.config.error_handler.handle_error(e);
internals = self.0.internals.lock();
internals.last_error = Some(msg);
drop_conns(&self.0, internals, vec![conn.conn]);
internals = self.0.internals.lock();
continue;
}
}
return Ok(PooledConnection {
pool: self.clone(),
checkout: Instant::now(),
conn: Some(conn.conn),
});
} else {
return Err(internals);
}
}
}
fn put_back(&self, checkout: Instant, mut conn: Conn<M::Connection>) {
let event = CheckinEvent {
id: conn.id,
duration: checkout.elapsed(),
};
self.0.config.event_handler.handle_checkin(event);
let broken = self.0.manager.has_broken(&mut conn.conn);
let mut internals = self.0.internals.lock();
if broken {
drop_conns(&self.0, internals, vec![conn]);
} else {
let conn = IdleConn {
conn,
idle_start: Instant::now(),
};
internals.conns.push(conn);
self.0.cond.notify_one();
}
}
pub fn state(&self) -> State {
let internals = self.0.internals.lock();
State {
connections: internals.num_conns,
idle_connections: internals.conns.len() as u32,
_p: (),
}
}
pub fn max_size(&self) -> u32 {
self.0.config.max_size
}
pub fn min_idle(&self) -> Option<u32> {
self.0.config.min_idle
}
pub fn test_on_check_out(&self) -> bool {
self.0.config.test_on_check_out
}
pub fn max_lifetime(&self) -> Option<Duration> {
self.0.config.max_lifetime
}
pub fn idle_timeout(&self) -> Option<Duration> {
self.0.config.idle_timeout
}
pub fn connection_timeout(&self) -> Duration {
self.0.config.connection_timeout
}
}
#[derive(Debug)]
pub struct Error(Option<String>);
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.write_str(error::Error::description(self))?;
if let Some(ref err) = self.0 {
write!(fmt, ": {}", err)?;
}
Ok(())
}
}
impl error::Error for Error {
fn description(&self) -> &str {
"timed out waiting for connection"
}
}
pub struct State {
pub connections: u32,
pub idle_connections: u32,
_p: (),
}
impl fmt::Debug for State {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("State")
.field("connections", &self.connections)
.field("idle_connections", &self.idle_connections)
.finish()
}
}
pub struct PooledConnection<M>
where
M: ManageConnection,
{
pool: Pool<M>,
checkout: Instant,
conn: Option<Conn<M::Connection>>,
}
impl<M> fmt::Debug for PooledConnection<M>
where
M: ManageConnection,
M::Connection: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.conn.as_ref().unwrap().conn, fmt)
}
}
impl<M> Drop for PooledConnection<M>
where
M: ManageConnection,
{
fn drop(&mut self) {
self.pool.put_back(self.checkout, self.conn.take().unwrap());
}
}
impl<M> Deref for PooledConnection<M>
where
M: ManageConnection,
{
type Target = M::Connection;
fn deref(&self) -> &M::Connection {
&self.conn.as_ref().unwrap().conn
}
}
impl<M> DerefMut for PooledConnection<M>
where
M: ManageConnection,
{
fn deref_mut(&mut self) -> &mut M::Connection {
&mut self.conn.as_mut().unwrap().conn
}
}