use worker::Worker;

use futures::Poll;
use tokio_executor;

use std::error::Error;
use std::fmt;

/// Error raised by `blocking`.
pub struct BlockingError {
    _p: (),
}

/// Enter a blocking section of code.
///
/// The `blocking` function annotates a section of code that performs a blocking
/// operation, either by issuing a blocking syscall or by performing a
/// long-running CPU-bound computation.
///
/// When the `blocking` function enters, it hands off the responsibility of
/// processing the current work queue to another thread. Then, it calls the
/// supplied closure. The closure is permitted to block indefinitely.
///
/// If the maximum number of concurrent `blocking` calls has been reached, then
/// `NotReady` is returned and the task is notified once existing `blocking`
/// calls complete. The maximum value is specified when creating a thread pool
/// using [`Builder::max_blocking`][build].
///
/// NB: The entire task that called `blocking` is blocked whenever the supplied
/// closure blocks, even if you have used future combinators such as `select`.
/// The other futures in this task will not make progress until the closure
/// returns. If this is not desired, ensure that `blocking` runs in its own
/// task (e.g. using `futures::sync::oneshot::spawn`).
///
/// [build]: struct.Builder.html#method.max_blocking
///
/// # Return
///
/// When the blocking closure is executed, `Ok(Ready(T))` is returned, where
/// `T` is the closure's return value.
///
/// If the thread pool has shut down, `Err` is returned.
///
/// If the number of concurrent `blocking` calls has reached the maximum,
/// `Ok(NotReady)` is returned and the current task is notified when a call to
/// `blocking` will succeed.
///
/// If `blocking` is called from outside the context of a Tokio thread pool,
/// `Err` is returned.
///
/// # Background
///
/// By default, the Tokio thread pool expects that tasks will only run for short
/// periods at a time before yielding back to the thread pool. This is the basic
/// premise of cooperative multitasking.
///
/// However, it is common to want to perform a blocking operation while
/// processing an asynchronous computation. Examples of blocking operations
/// include:
///
/// * Performing synchronous file operations (reading and writing).
/// * Blocking on acquiring a mutex.
/// * Performing a CPU-bound computation, like cryptographic encryption or
///   decryption.
///
/// One option for dealing with blocking operations in an asynchronous context
/// is to use a thread pool dedicated to performing these operations. This is
/// not ideal as it requires bidirectional message passing as well as a channel
/// to communicate, which adds a level of buffering.
///
/// Instead, `blocking` hands off the responsibility of processing the work queue
/// to another thread. This handoff is light compared to a channel and does not
/// require buffering.
///
/// # Examples
///
/// Block on receiving a message from a `std` channel. This example is a little
/// silly as using the non-blocking channel from the `futures` crate would make
/// more sense. The blocking receive can be replaced with any blocking operation
/// that needs to be performed.
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_threadpool;
///
/// use tokio_threadpool::{ThreadPool, blocking};
///
/// use futures::Future;
/// use futures::future::{lazy, poll_fn};
///
/// use std::sync::mpsc;
/// use std::thread;
/// use std::time::Duration;
///
/// pub fn main() {
///     // This is a *blocking* channel
///     let (tx, rx) = mpsc::channel();
///
///     // Spawn a thread to send a message
///     thread::spawn(move || {
///         thread::sleep(Duration::from_millis(500));
///         tx.send("hello").unwrap();
///     });
///
///     let pool = ThreadPool::new();
///
///     pool.spawn(lazy(move || {
///         // Because `blocking` returns `Poll`, it is intended to be used
///         // from the context of a `Future` implementation. Since we don't
///         // have a complicated requirement, we can use `poll_fn` in this
///         // case.
///         poll_fn(move || {
///             blocking(|| {
///                 let msg = rx.recv().unwrap();
///                 println!("message = {}", msg);
///             }).map_err(|_| panic!("the threadpool shut down"))
///         })
///     }));
///
///     // Wait for the task we just spawned to complete.
///     pool.shutdown_on_idle().wait().unwrap();
/// }
/// ```
pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
where
    F: FnOnce() -> T,
{
    let res = Worker::with_current(|worker| {
        let worker = match worker {
            Some(worker) => worker,
            None => {
                return Err(BlockingError { _p: () });
            }
        };

        // Transition the worker state to blocking. This will exit the fn early
        // with `NotReady` if the pool does not have enough capacity to enter
        // blocking mode.
        worker.transition_to_blocking()
    });

    // If the transition cannot happen, exit early
    try_ready!(res);

    // Currently in blocking mode, so call the inner closure.
    //
    // "Exit" the current executor in case the blocking function wants
    // to call a different executor.
    let ret = tokio_executor::exit(move || f());

    // Try to transition out of blocking mode. This is a fast path that takes
    // back ownership of the worker if the worker handoff didn't complete yet.
    Worker::with_current(|worker| {
        // Worker must be set since it was above.
        worker.unwrap().transition_from_blocking();
    });

    // Return the result
    Ok(ret.into())
}

impl fmt::Display for BlockingError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "{}", self.description())
    }
}

impl fmt::Debug for BlockingError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("BlockingError")
            .field("reason", &self.description())
            .finish()
    }
}

impl Error for BlockingError {
    fn description(&self) -> &str {
        "`blocking` annotation used from outside the context of a thread pool"
    }
}
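
// Illustrative sketch only; it is not part of the thread pool implementation.
//
// The `blocking` docs above say that `NotReady` is returned once the maximum
// number of concurrent `blocking` calls has been reached. The module below
// models just that capacity check with a std-only atomic counter, which may
// help when reasoning about a `max_blocking` value. Every name here
// (`capacity_sketch`, `BlockingSlots`, `try_enter`, `leave`, `run`) is
// hypothetical. The real pool tracks this state in its own internals and also
// notifies waiting tasks when a slot frees up, which this sketch leaves out.
#[allow(dead_code)]
mod capacity_sketch {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Counts how many more blocking sections may be entered.
    pub struct BlockingSlots {
        remaining: AtomicUsize,
    }

    impl BlockingSlots {
        /// `max_blocking` plays the role of the builder setting of that name.
        pub fn new(max_blocking: usize) -> BlockingSlots {
            BlockingSlots {
                remaining: AtomicUsize::new(max_blocking),
            }
        }

        /// Try to claim a slot. Returns `false` when no capacity is left,
        /// which stands in for the `NotReady` case.
        pub fn try_enter(&self) -> bool {
            let mut cur = self.remaining.load(Ordering::Acquire);
            loop {
                if cur == 0 {
                    return false;
                }
                match self.remaining.compare_exchange(
                    cur,
                    cur - 1,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => return true,
                    // Another thread changed the counter; retry with its value.
                    Err(actual) => cur = actual,
                }
            }
        }

        /// Give a slot back after the blocking section finishes.
        pub fn leave(&self) {
            self.remaining.fetch_add(1, Ordering::AcqRel);
        }

        /// Run `f` if a slot is free. `None` stands in for `NotReady`.
        /// Assumption: `f` does not panic; a panic would leak the slot here.
        pub fn run<F, T>(&self, f: F) -> Option<T>
        where
            F: FnOnce() -> T,
        {
            if !self.try_enter() {
                return None;
            }
            let ret = f();
            self.leave();
            Some(ret)
        }
    }
}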