use std::prelude::v1::*;
use std::cell::Cell;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT};
use std::sync::atomic::{AtomicUsize, Ordering};
use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink};
use super::core;
use super::{BorrowedTask, NotifyHandle, Spawn, spawn, Notify, UnsafeNotify};
mod unpark_mutex;
pub use self::unpark_mutex::UnparkMutex;
mod data;
pub use self::data::*;
mod task_rc;
#[allow(deprecated)]
#[cfg(feature = "with-deprecated")]
pub use self::task_rc::TaskRc;
pub use task_impl::core::init;
// Per-thread slot holding a type-erased pointer to the task that `set` has
// installed on this thread. It starts out null.
thread_local!(static CURRENT_TASK: Cell<*mut u8> = Cell::new(ptr::null_mut()));

/// Reports whether the calling thread's `CURRENT_TASK` slot currently holds a
/// non-null pointer.
pub fn is_in_task() -> bool {
    let installed = CURRENT_TASK.with(|slot| slot.get());
    !installed.is_null()
}
// Ensures the hook registration performed inside `set` (the call to `init`)
// runs at most once per process.
static INIT: Once = ONCE_INIT;
/// Returns the pointer describing the current task, if any.
///
/// When `core::is_get_ptr(0x1)` holds (0x1 being the value this module
/// registers as its "get" hook in `set`), the thread-local `CURRENT_TASK`
/// slot is read directly and its value — possibly null — is wrapped in
/// `Some`. Otherwise the lookup is delegated to `core::get_ptr()`.
pub fn get_ptr() -> Option<*mut u8> {
    // Any other hook configuration is handled entirely by `core`.
    if !core::is_get_ptr(0x1) {
        return core::get_ptr();
    }

    let raw = CURRENT_TASK.with(|slot| slot.get());
    Some(raw)
}
/// Hands out a raw pointer to the calling thread's `CURRENT_TASK` cell so it
/// can be accessed without going through `LocalKey::with` each time.
fn tls_slot() -> *const Cell<*mut u8> {
    CURRENT_TASK.with(|cell| cell as *const Cell<*mut u8>)
}
/// Runs `f` with `task` installed as the current task.
///
/// On the thread-local path the slot's previous value is written back once
/// `f` has returned or unwound; otherwise the call is forwarded to
/// `core::set`.
pub fn set<'a, F, R>(task: &BorrowedTask<'a>, f: F) -> R
    where F: FnOnce() -> R
{
    // On the first call only, register this module's get/set hooks with
    // `init`. The values 0x1 and 0x2 appear to be sentinels rather than
    // callable addresses: this file never calls them, it only asks
    // `core::is_get_ptr(0x1)` whether they are the hooks in effect.
    // NOTE(review): how `init` treats hooks registered earlier is not visible
    // here — confirm against `core`.
    INIT.call_once(|| unsafe {
        let get = mem::transmute::<usize, _>(0x1);
        let set = mem::transmute::<usize, _>(0x2);
        init(get, set);
    });
    // Thread-local path: manipulate the `CURRENT_TASK` slot directly.
    if core::is_get_ptr(0x1) {
        // Drop guard holding the slot pointer and the value to restore.
        struct Reset(*const Cell<*mut u8>, *mut u8);
        impl Drop for Reset {
            #[inline]
            fn drop(&mut self) {
                unsafe {
                    // Put the saved pointer back into the slot.
                    (*self.0).set(self.1);
                }
            }
        }
        unsafe {
            let slot = tls_slot();
            // Remember the current value before overwriting it; the guard
            // restores it when this scope exits, including during a panic.
            let _reset = Reset(slot, (*slot).get());
            // Publish the address of `task`, type-erased to `*mut u8`.
            (*slot).set(task as *const _ as *mut u8);
            f()
        }
    } else {
        // Other hooks are in effect: let `core` install the task.
        core::set(task, f)
    }
}
/// Borrowed wakeup handle for a task; `to_owned` converts it into a
/// `TaskUnpark`.
#[derive(Copy, Clone)]
#[allow(deprecated)]
pub enum BorrowedUnpark<'a> {
    /// Legacy handle backed by the deprecated `Unpark` trait.
    Old(&'a Arc<Unpark>),
    /// Handle provided by the `core` module.
    New(core::BorrowedUnpark<'a>),
}
/// Borrowed, singly linked chain of `UnparkEvent`s.
#[derive(Copy, Clone)]
#[allow(deprecated)]
pub enum BorrowedEvents<'a> {
    /// End of the chain (also the empty chain).
    None,
    /// One event followed by the remainder of the chain.
    One(&'a UnparkEvent, &'a BorrowedEvents<'a>),
}
/// Owned counterpart of `BorrowedUnpark`, produced by
/// `BorrowedUnpark::to_owned`.
#[derive(Clone)]
pub enum TaskUnpark {
    /// Legacy handle backed by the deprecated `Unpark` trait.
    #[allow(deprecated)]
    Old(Arc<Unpark>),
    /// Handle provided by the `core` module.
    New(core::TaskUnpark),
}
/// Owned counterpart of `BorrowedEvents`, produced by
/// `BorrowedEvents::to_owned`.
#[derive(Clone)]
#[allow(deprecated)]
pub enum UnparkEvents {
    /// No events.
    None,
    /// Exactly one event, stored inline.
    One(UnparkEvent),
    /// Several events, stored in a boxed slice.
    Many(Box<[UnparkEvent]>),
}
impl<'a> BorrowedUnpark<'a> {
    #[inline]
    pub fn new(f: &'a Fn() -> NotifyHandle, id: usize) -> BorrowedUnpark<'a> {
        BorrowedUnpark::New(core::BorrowedUnpark::new(f, id))
    }
    #[inline]
    pub fn to_owned(&self) -> TaskUnpark {
        match *self {
            BorrowedUnpark::Old(old) => TaskUnpark::Old(old.clone()),
            BorrowedUnpark::New(new) => TaskUnpark::New(new.to_owned()),
        }
    }
}
impl<'a> BorrowedEvents<'a> {
    /// Creates an empty chain of events.
    #[inline]
    pub fn new() -> BorrowedEvents<'a> {
        BorrowedEvents::None
    }

    /// Clones every event in the chain, front to back, into an owned
    /// `UnparkEvents`.
    ///
    /// An empty chain maps to `UnparkEvents::None`, a single event to
    /// `UnparkEvents::One`, and anything longer to `UnparkEvents::Many`.
    /// A vector is only built in the last case.
    #[inline]
    pub fn to_owned(&self) -> UnparkEvents {
        // Peel off the head of the chain; an empty chain is done right away.
        let (first, mut rest) = match *self {
            BorrowedEvents::None => return UnparkEvents::None,
            BorrowedEvents::One(event, next) => (event.clone(), next),
        };

        // Nothing after the head: keep the single event inline.
        if let BorrowedEvents::None = *rest {
            return UnparkEvents::One(first);
        }

        // Two or more events: gather them in chain order.
        let mut all = vec![first];
        while let BorrowedEvents::One(event, next) = *rest {
            all.push(event.clone());
            rest = next;
        }
        UnparkEvents::Many(all.into_boxed_slice())
    }
}
impl UnparkEvents {
    /// Fires every stored event by calling its `unpark` method.
    pub fn notify(&self) {
        match *self {
            UnparkEvents::Many(ref events) => {
                for event in events.iter() {
                    event.unpark();
                }
            }
            UnparkEvents::One(ref event) => event.unpark(),
            UnparkEvents::None => {}
        }
    }

    /// Conservative equivalence check against a borrowed chain.
    ///
    /// Returns `true` only when both sides are empty; as soon as either side
    /// carries an event the answer is `false`, without comparing the events.
    pub fn will_notify(&self, events: &BorrowedEvents) -> bool {
        match (self, events) {
            (&UnparkEvents::None, &BorrowedEvents::None) => true,
            _ => false,
        }
    }
}
#[allow(deprecated)]
impl TaskUnpark {
    /// Delivers a wakeup through whichever handle this value wraps.
    pub fn notify(&self) {
        match *self {
            TaskUnpark::New(ref handle) => handle.notify(),
            TaskUnpark::Old(ref legacy) => legacy.unpark(),
        }
    }

    /// Tells whether `unpark` refers to the same wakeup target as `self`.
    ///
    /// Two legacy handles match when their `Arc`s point at the same trait
    /// object; two new-style handles defer to `core`. Mixed variants never
    /// match.
    pub fn will_notify(&self, unpark: &BorrowedUnpark) -> bool {
        match (self, unpark) {
            (&TaskUnpark::New(ref owned), &BorrowedUnpark::New(ref borrowed)) => {
                owned.will_notify(borrowed)
            }
            (&TaskUnpark::Old(ref owned), &BorrowedUnpark::Old(borrowed)) => {
                let theirs = &**borrowed as *const Unpark;
                let ours = &**owned as *const Unpark;
                theirs == ours
            }
            _ => false,
        }
    }
}
impl<F: Future> Spawn<F> {
    #[doc(hidden)]
    #[deprecated(note = "recommended to use `poll_future_notify` instead")]
    #[allow(deprecated)]
    pub fn poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error> {
        self.enter(BorrowedUnpark::Old(&unpark), |f| f.poll())
    }
    
    
    
    
    
    
    pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
        ThreadNotify::with_current(|notify| {
            loop {
                match self.poll_future_notify(notify, 0)? {
                    Async::NotReady => notify.park(),
                    Async::Ready(e) => return Ok(e),
                }
            }
        })
    }
    #[doc(hidden)]
    #[deprecated]
    #[allow(deprecated)]
    pub fn execute(self, exec: Arc<Executor>)
        where F: Future<Item=(), Error=()> + Send + 'static,
    {
        exec.clone().execute(Run {
            
            
            
            
            spawn: spawn(Box::new(self.into_inner())),
            inner: Arc::new(RunInner {
                exec: exec,
                mutex: UnparkMutex::new()
            }),
        })
    }
}
impl<S: Stream> Spawn<S> {
    /// Polls the wrapped stream once, using the legacy `Unpark` handle for
    /// wakeups.
    #[deprecated(note = "recommended to use `poll_stream_notify` instead")]
    #[allow(deprecated)]
    #[doc(hidden)]
    pub fn poll_stream(&mut self, unpark: Arc<Unpark>)
                       -> Poll<Option<S::Item>, S::Error> {
        let handle = BorrowedUnpark::Old(&unpark);
        self.enter(handle, |stream| stream.poll())
    }

    /// Blocks the calling thread until the stream yields its next item.
    ///
    /// Returns `None` once the stream has finished, otherwise the item or
    /// error wrapped in `Some`.
    pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
        ThreadNotify::with_current(|notify| loop {
            let polled = self.poll_stream_notify(notify, 0);
            match polled {
                Err(err) => return Some(Err(err)),
                Ok(Async::Ready(None)) => return None,
                Ok(Async::Ready(Some(item))) => return Some(Ok(item)),
                // Not ready yet: sleep until the notifier is signalled.
                Ok(Async::NotReady) => notify.park(),
            }
        })
    }
}
impl<S: Sink> Spawn<S> {
    /// Attempts to start sending `value`, using the legacy `Unpark` handle
    /// for wakeups.
    #[doc(hidden)]
    #[deprecated(note = "recommended to use `start_send_notify` instead")]
    #[allow(deprecated)]
    pub fn start_send(&mut self, value: S::SinkItem, unpark: &Arc<Unpark>)
                       -> StartSend<S::SinkItem, S::SinkError> {
        let handle = BorrowedUnpark::Old(unpark);
        self.enter(handle, |sink| sink.start_send(value))
    }

    /// Polls the sink for flush completion, using the legacy `Unpark` handle
    /// for wakeups.
    #[deprecated(note = "recommended to use `poll_flush_notify` instead")]
    #[allow(deprecated)]
    #[doc(hidden)]
    pub fn poll_flush(&mut self, unpark: &Arc<Unpark>)
                       -> Poll<(), S::SinkError> {
        let handle = BorrowedUnpark::Old(unpark);
        self.enter(handle, |sink| sink.poll_complete())
    }

    /// Blocks the calling thread until the sink has accepted `value`.
    ///
    /// Each time the sink hands the value back as not ready, the thread parks
    /// and the send is retried after a wakeup. Errors are returned as-is.
    pub fn wait_send(&mut self, mut value: S::SinkItem)
                     -> Result<(), S::SinkError> {
        ThreadNotify::with_current(|notify| loop {
            match self.start_send_notify(value, notify, 0) {
                Ok(AsyncSink::Ready) => return Ok(()),
                // The sink returned the item; keep it for the next attempt.
                Ok(AsyncSink::NotReady(rejected)) => value = rejected,
                Err(err) => return Err(err),
            }
            notify.park();
        })
    }

    /// Blocks the calling thread until `poll_flush_notify` reports that the
    /// sink is ready, parking between attempts.
    pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
        ThreadNotify::with_current(|notify| {
            while !self.poll_flush_notify(notify, 0)?.is_ready() {
                notify.park();
            }
            Ok(())
        })
    }

    /// Blocks the calling thread until `close_notify` reports that the sink
    /// is ready, parking between attempts.
    pub fn wait_close(&mut self) -> Result<(), S::SinkError> {
        ThreadNotify::with_current(|notify| {
            while !self.close_notify(notify, 0)?.is_ready() {
                notify.park();
            }
            Ok(())
        })
    }
}
/// Legacy wakeup interface, superseded by `Notify` (see the deprecation
/// note). Implementors must be `Send + Sync`.
#[deprecated(note = "recommended to use `Notify` instead")]
pub trait Unpark: Send + Sync {
    /// Signals a wakeup.
    ///
    /// Within this module it is invoked by `TaskUnpark::notify` for the
    /// `Old` variant.
    fn unpark(&self);
}
/// Deprecated interface for something that can be handed `Run` values to
/// drive.
#[deprecated]
#[allow(deprecated)]
pub trait Executor: Send + Sync + 'static {
    /// Accepts a unit of work; `Run::run` is what actually polls it.
    fn execute(&self, r: Run);
}
/// Unit of work submitted to an `Executor`: a spawned boxed future plus the
/// shared state used to reschedule it.
#[deprecated]
pub struct Run {
    // The future being driven, wrapped in its task state.
    spawn: Spawn<Box<Future<Item = (), Error = ()> + Send>>,
    // Shared with `RunInner::notify`, which resubmits the work.
    inner: Arc<RunInner>,
}
// State shared between a `Run` and the notifications sent for it.
#[allow(deprecated)]
struct RunInner {
    // Coordinates polling with notification and holds the `Run` passed to
    // `UnparkMutex::wait` from `Run::run`.
    mutex: UnparkMutex<Run>,
    // Executor that a released `Run` is resubmitted to.
    exec: Arc<Executor>,
}
#[allow(deprecated)]
impl Run {
    /// Polls the wrapped future, repeating for as long as `UnparkMutex::wait`
    /// hands the task straight back, and returns once the future finishes or
    /// the task has been left with the mutex.
    pub fn run(self) {
        let Run { mut spawn, inner } = self;
        // NOTE(review): the `UnparkMutex` calls below are `unsafe`; presumably
        // owning a `Run` is what guarantees the mutex is in a state where
        // polling is allowed — confirm against `unpark_mutex`.
        unsafe {
            inner.mutex.start_poll();
            loop {
                match spawn.poll_future_notify(&inner, 0) {
                    Ok(Async::NotReady) => {}
                    Ok(Async::Ready(())) |
                    Err(()) => return inner.mutex.complete(),
                }
                // Rebuild the `Run` so it can be left with the mutex.
                let run = Run { spawn: spawn, inner: inner.clone() };
                match inner.mutex.wait(run) {
                    Ok(()) => return,            // the mutex kept the `Run`; stop here
                    Err(r) => spawn = r.spawn,   // handed back: poll again
                }
            }
        }
    }
}
#[allow(deprecated)]
impl fmt::Debug for Run {
    /// Formats as a `Run` struct with a single placeholder `contents` field.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Run");
        out.field("contents", &"...");
        out.finish()
    }
}
#[allow(deprecated)]
impl Notify for RunInner {
    /// Asks the mutex whether a `Run` should be rescheduled and, if it hands
    /// one back, submits it to the executor. The id is ignored.
    fn notify(&self, _id: usize) {
        if let Ok(run) = self.mutex.notify() {
            self.exec.execute(run)
        }
    }
}
/// Per-thread parking primitive used by the blocking `wait_*` helpers.
///
/// `state` always holds one of `IDLE`, `NOTIFY` or `SLEEP`. The mutex and
/// condition variable only come into play once the owning thread actually
/// has to sleep.
struct ThreadNotify {
    state: AtomicUsize,
    mutex: Mutex<()>,
    condvar: Condvar,
}

// Nothing pending and nobody sleeping.
const IDLE: usize = 0;
// A wakeup has been delivered but not yet consumed by `park`.
const NOTIFY: usize = 1;
// The owning thread is blocked, or about to block, on the condvar.
const SLEEP: usize = 2;

thread_local! {
    // One notifier per thread, created on first use in the idle state.
    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
        state: AtomicUsize::new(IDLE),
        mutex: Mutex::new(()),
        condvar: Condvar::new(),
    });
}

impl ThreadNotify {
    /// Runs `f` with the calling thread's notifier.
    fn with_current<F, R>(f: F) -> R
        where F: FnOnce(&Arc<ThreadNotify>) -> R,
    {
        CURRENT_THREAD_NOTIFY.with(f)
    }

    /// Blocks until a notification is available and consumes it, leaving the
    /// state at `IDLE`.
    fn park(&self) {
        // Fast path: a notification is already pending, so take it without
        // touching the mutex.
        let observed = self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst);
        if observed == NOTIFY {
            return;
        }
        if observed != IDLE {
            unreachable!();
        }

        // Slow path: announce the intent to sleep while holding the lock.
        let mut guard = self.mutex.lock().unwrap();
        match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
            IDLE => {}
            NOTIFY => {
                // A notification arrived between the two checks; consume it
                // instead of sleeping.
                self.state.store(IDLE, Ordering::SeqCst);
                return;
            }
            _ => unreachable!(),
        }

        // Sleep until a wakeup finds the state at `NOTIFY`; any other wakeup
        // is treated as spurious and the wait resumes.
        loop {
            guard = self.condvar.wait(guard).unwrap();
            let woken = self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst);
            if woken == NOTIFY {
                return;
            }
        }
    }
}
impl Notify for ThreadNotify {
    /// Records a notification and wakes the owning thread if it is sleeping.
    /// The id is ignored.
    fn notify(&self, _unpark_id: usize) {
        // Lock-free attempt: IDLE -> NOTIFY. If the state was already
        // NOTIFY, there is nothing more to do either.
        let before = self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst);
        if before == IDLE || before == NOTIFY {
            return;
        }
        if before != SLEEP {
            unreachable!();
        }

        // The owner is sleeping: take the lock before flipping the state, so
        // the change cannot race with the sleeper entering its wait.
        let _guard = self.mutex.lock().unwrap();
        let was_sleeping =
            self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) == SLEEP;
        if was_sleeping {
            self.condvar.notify_one();
        }
    }
}
/// Runs `f` inside a copy of the current task whose event chain has `event`
/// prepended; id, unpark handle and map are taken over unchanged.
#[deprecated(note = "recommended to use `FuturesUnordered` instead")]
#[allow(deprecated)]
pub fn with_unpark_event<F, R>(event: UnparkEvent, f: F) -> R
    where F: FnOnce() -> R
{
    super::with(|current| {
        // Link the new event in front of whatever the task already carries.
        let chained = BorrowedEvents::One(&event, &current.events);
        let derived = BorrowedTask {
            id: current.id,
            unpark: current.unpark,
            events: chained,
            map: current.map,
        };
        super::set(&derived, f)
    })
}
/// Pairs an `EventSet` with the id that gets inserted into it when the event
/// fires.
#[derive(Clone)]
#[deprecated(note = "recommended to use `FuturesUnordered` instead")]
#[allow(deprecated)]
pub struct UnparkEvent {
    // Set that receives the id.
    set: Arc<EventSet>,
    // Id passed to `EventSet::insert`.
    item: usize,
}
#[allow(deprecated)]
impl UnparkEvent {
    /// Creates an event that inserts `id` into `set` when it fires.
    #[deprecated(note = "recommended to use `FuturesUnordered` instead")]
    pub fn new(set: Arc<EventSet>, id: usize) -> UnparkEvent {
        UnparkEvent {
            item: id,
            set: set,
        }
    }

    /// Fires the event by inserting its id into the associated set.
    fn unpark(&self) {
        let id = self.item;
        self.set.insert(id);
    }
}
#[allow(deprecated)]
impl fmt::Debug for UnparkEvent {
    /// Formats as an `UnparkEvent` struct showing the id; the set is replaced
    /// by a placeholder.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("UnparkEvent");
        out.field("set", &"...");
        out.field("item", &self.item);
        out.finish()
    }
}
/// Collection of event ids that `UnparkEvent`s insert into when they fire.
#[deprecated(since="0.1.18", note = "recommended to use `FuturesUnordered` instead")]
pub trait EventSet: Send + Sync + 'static {
    /// Inserts `id` into the set. It takes `&self`, so implementations need
    /// interior mutability to record anything.
    fn insert(&self, id: usize);
}
// Zero-sized stand-in type. A `*mut ArcWrapped<T>` handed to `NotifyHandle`
// is really the bit pattern of an `Arc<T>` (see the `From<Arc<T>>` impl,
// which transmutes one into the other), so a `&ArcWrapped<T>` never refers
// to an actual `ArcWrapped` value.
struct ArcWrapped<T>(PhantomData<T>);
impl<T: Notify + 'static> Notify for ArcWrapped<T> {
    fn notify(&self, id: usize) {
        unsafe {
            // `me` holds the same bits as the original `Arc<T>`. Taking the
            // address of `me` and casting it yields a `&Arc<T>` view of those
            // bits without touching the reference count.
            let me: *const ArcWrapped<T> = self;
            T::notify(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>),
                      id)
        }
    }
    fn clone_id(&self, id: usize) -> usize {
        unsafe {
            // Same reinterpretation as in `notify`.
            let me: *const ArcWrapped<T> = self;
            T::clone_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>),
                        id)
        }
    }
    fn drop_id(&self, id: usize) {
        unsafe {
            // Same reinterpretation as in `notify`.
            let me: *const ArcWrapped<T> = self;
            T::drop_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>),
                       id)
        }
    }
}
unsafe impl<T: Notify + 'static> UnsafeNotify for ArcWrapped<T> {
    // Produces a new handle by cloning the `Arc<T>` that `self`'s address
    // encodes and converting the clone through `NotifyHandle::from`.
    unsafe fn clone_raw(&self) -> NotifyHandle {
        let me: *const ArcWrapped<T> = self;
        let arc = (*(&me as *const *const ArcWrapped<T> as *const Arc<T>)).clone();
        NotifyHandle::from(arc)
    }
    // Drops the `Arc<T>` that `self`'s address encodes, in place. The handle
    // must not be used again after this call.
    unsafe fn drop_raw(&self) {
        let mut me: *const ArcWrapped<T> = self;
        // Reinterpret the local pointer variable as the `Arc<T>` itself.
        let me = &mut me as *mut *const ArcWrapped<T> as *mut Arc<T>;
        ptr::drop_in_place(me);
    }
}
/// Converts an `Arc<T>` into a `NotifyHandle` without any extra allocation.
impl<T> From<Arc<T>> for NotifyHandle
    where T: Notify + 'static,
{
    fn from(rc: Arc<T>) -> NotifyHandle {
        unsafe {
            // Reuse the `Arc`'s own pointer bits as the handle pointer. The
            // `Arc` is consumed without being dropped, so its reference is
            // now owned by the handle and released in
            // `ArcWrapped::drop_raw`.
            let ptr = mem::transmute::<Arc<T>, *mut ArcWrapped<T>>(rc);
            NotifyHandle::new(ptr)
        }
    }
}
// Compiled only with the `nightly` feature: marks the owned unpark types as
// `Unpin`.
#[cfg(feature = "nightly")]
mod nightly {
    use super::{TaskUnpark, UnparkEvents};
    use core::marker::Unpin;
    impl Unpin for TaskUnpark {}
    impl Unpin for UnparkEvents {}
}