use std::sync::Arc; use std::time::{Duration, Instant}; pub use info::ThreadInfo; pub use state::RunState; use crate::asm::data::literal::Addr; use crate::module::{EvalRes, CrsnUniq, SchedSignal}; use crate::runtime::fault::Fault; use crate::runtime::frame::{StackFrame, REG_COUNT}; use crate::runtime::program::Program; use crate::runtime::run_thread::state::{CoroutineContext, CoroutineState}; use std::{mem, thread}; use crate::asm::instr::cond::Flag; use parking_lot::RwLock; use crate::asm::instr::Flatten; #[derive(Clone, Copy, Eq, PartialEq, Debug, Ord, PartialOrd)] pub struct ThreadToken(pub u32); pub struct RunThread { pub(crate) info: Arc, pub(crate) state: RunState, } pub mod info; pub mod state; pub struct ThreadParams<'a> { pub id: ThreadToken, pub uniq: Arc, pub program: Arc, pub pc: Addr, pub cycle_time: Duration, pub scheduler_interval: Duration, pub args: &'a [u64], } impl RunThread { pub fn new(params: ThreadParams) -> Self { let extensions = params.program.extensions.clone(); // TODO investigate if this division to 2 structs is still needed let ti = Arc::new(ThreadInfo { id: params.id, uniq: params.uniq, program: params.program, cycle_time: params.cycle_time, scheduler_interval: RwLock::new(params.scheduler_interval), extensions, }); let rs = RunState { thread_info: ti.clone(), cr: Box::new(CoroutineContext { handle: 0, // this is the root frame: StackFrame::new(params.pc, params.args), call_stack: vec![], cr_state: Default::default(), }), parked: Default::default(), global_regs: [0; REG_COUNT], ext_data: Default::default(), cr_deadline: None, critical_section: 0, }; Self { info: ti, state: rs, } } /// Start synchronously pub fn run(mut self) { let mut loop_helper = spin_sleep::LoopHelper::builder() .build_with_target_rate(1.0/self.info.cycle_time.as_secs_f64()); loop_helper.loop_start(); let mut orig_pc; let fault = 'run: loop { let mut want_switch = false; orig_pc = self.state.cr.frame.pc; match self.eval_op() { Ok(EvalRes { cycles, advance, sched }) => { for _ in 0..cycles { loop_helper.loop_sleep(); loop_helper.loop_start(); } trace!("Step {}; Status = {}", advance, self.state.cr.frame.status); self.state.cr.frame.pc.advance(advance); if let Some(dl) = self.state.cr_deadline { want_switch = dl <= Instant::now(); } match sched { SchedSignal::Normal => {} SchedSignal::Yield(None) => { trace!("yield"); self.state.cr.cr_state = CoroutineState::Ready; want_switch = true; } SchedSignal::Yield(Some(value)) => { trace!("yield {}", value); self.state.cr.cr_state = CoroutineState::YieldedValue(value); want_switch = true; } SchedSignal::Sleep(time) => { trace!("sleep {:?}", time); self.state.cr.cr_state = CoroutineState::Sleep { due: Instant::now() + time }; want_switch = true; } SchedSignal::Ret(results) => { self.state.cr.cr_state = CoroutineState::Finished(results); want_switch = true; // TODO prioritize a thread that is waiting for this return value } SchedSignal::Join(handle) => { trace!("Join cr {:#}", handle); let mut found = false; 'find: for (n, th) in self.state.parked.iter_mut().enumerate() { if th.handle == handle { if let CoroutineState::Finished(_) = &th.cr_state { let crs = mem::replace(&mut th.cr_state, CoroutineState::Ready); self.state.parked.remove(n); // delete it found = true; if let CoroutineState::Finished(vals) = crs { self.state.cr.frame.set_retvals(&vals); break 'find; } else { unreachable!(); } } else { self.state.cr.frame.pc = orig_pc; // Retry self.state.cr.cr_state = CoroutineState::Ready; want_switch = true; found = true; } } } if !found { self.state.set_flag(Flag::Invalid, true); warn!("Join with invalid thread handle {:#}!", handle); self.state.cr.frame.set_retvals(&[]); } } } } Err(Fault::Blocked) => { trace!("Thread reports being blocked!"); self.state.cr.frame.pc = orig_pc; // Retry self.state.cr.cr_state = CoroutineState::Ready; want_switch = true; } Err(e) => { break 'run e; } } // Resolve the next coroutine to run, or wait a bit... 'next: loop { if want_switch { let now = Instant::now(); if self.state.critical_section > 0 { match self.state.cr.cr_state { CoroutineState::Ready => {} CoroutineState::Sleep { due } => { let time = due.saturating_duration_since(now); trace!("Sleep in critical: {:?}", time); thread::sleep(time); continue 'run; } CoroutineState::Finished(_) | CoroutineState::YieldedValue(_) => { // This is not good break 'run Fault::Deadlock; } } // This is either a bug, or waiting for IO. // If it is the latter, try to give the OS a bit of breathing room.. std::thread::yield_now(); continue 'run; } trace!("Switch requested"); let mut candidate = None; let mut closest_due = None; 'findbest: for _ in 0..self.state.parked.len() { if let Some(mut rt) = self.state.parked.pop_front() { match rt.cr_state { CoroutineState::Ready => { trace!("Found READY thread to run next"); candidate = Some(rt); break 'findbest; } CoroutineState::Sleep { due } => { if due <= now { trace!("Found DUE sleeping thread to run next"); rt.cr_state = CoroutineState::Ready; candidate = Some(rt); break 'findbest; } else { match closest_due { Some(d) => { if d > due { closest_due = Some(due); } }, None => { closest_due = Some(due); } } self.state.parked.push_back(rt); } } _ => { self.state.parked.push_back(rt); } } } } if let Some(cr) = candidate { trace!("Context switch to {:?}", cr); // Do switch let old = mem::replace(&mut self.state.cr, cr); self.state.parked.push_back(old); self.state.start_task_switching(); } else if let Some(due) = closest_due { if self.state.cr.cr_state == CoroutineState::Ready { trace!("No candidate found to run, retry same thread."); // Let it run another quantum, maybe it was just blocked continue 'run; } let time = due.saturating_duration_since(now); trace!("No thread to switch to, sleep {:?}", time); thread::sleep(time); continue 'next; } else { // Nothing to run? thread::yield_now(); trace!("No thread to switch to!"); } let n_alive = self.state.parked.iter() .filter(|p| p.cr_state.is_alive()).count(); if n_alive == 0 { trace!("Stop task switching, no parked threads are alive"); self.state.stop_task_switching(); // This should improve performance in single-threaded mode } match self.state.cr.cr_state { CoroutineState::Finished(_) | CoroutineState::YieldedValue(_) => { break 'run Fault::RootReturned; } _ => {} } } break 'next; } }; match fault { Fault::Halt => { debug!("Thread ended."); } e => { eprintln!("*** Program failed: {} ***", e); if let Some(instr) = self.info.program.ops.get(orig_pc.0 as usize) { eprintln!("Source location: {}, line {}, column {}", self.info.program.file_names[instr.pos().file as usize].display(), instr.pos().line, instr.pos().column); } else { eprintln!("Instruction address: {}", orig_pc); } debug!("\nCore dump: {:?}", self.state); } } } }