Croissant Runtime
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
crsn/crsn/src/runtime/run_thread.rs

293 lines
12 KiB

use std::sync::Arc;
use std::time::{Duration, Instant};
pub use info::ThreadInfo;
pub use state::RunState;
use crate::asm::data::literal::Addr;
use crate::module::{EvalRes, CrsnUniq, SchedSignal};
use crate::runtime::fault::Fault;
use crate::runtime::frame::{StackFrame, REG_COUNT};
use crate::runtime::program::Program;
use crate::runtime::run_thread::state::{CoroutineContext, CoroutineState};
use std::{mem, thread};
use crate::asm::instr::cond::Flag;
use parking_lot::RwLock;
use crate::asm::instr::Flatten;
/// Identifier of a runtime thread, wrapping a plain `u32`.
///
/// The token is cheap to copy, and compares and orders by its numeric value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ThreadToken(pub u32);
/// A single runtime thread: its shared description plus its own execution state.
pub struct RunThread {
    /// Reference-counted thread description; `RunThread::new` stores a second handle to the
    /// same `ThreadInfo` inside `state`.
    pub(crate) info: Arc<ThreadInfo>,
    /// Execution state driven by `RunThread::run` (current coroutine, parked coroutines, ...).
    pub(crate) state: RunState,
}
pub mod info;
pub mod state;
/// Everything needed to construct a `RunThread` via `RunThread::new`.
pub struct ThreadParams<'a> {
    /// Token identifying the new thread.
    pub id: ThreadToken,
    /// Shared `CrsnUniq` handle; stored as-is in the thread's `ThreadInfo`.
    pub uniq: Arc<CrsnUniq>,
    /// The program to execute; its `extensions` list is also copied into the `ThreadInfo`.
    pub program: Arc<Program>,
    /// Address at which the root stack frame starts executing.
    pub pc: Addr,
    /// Duration of one cycle; `run` paces itself at `1 / cycle_time` cycles per second.
    pub cycle_time: Duration,
    /// Initial scheduler interval; kept behind an `RwLock` in the `ThreadInfo`.
    pub scheduler_interval: Duration,
    /// Arguments handed to the root stack frame.
    pub args: &'a [u64],
}
impl RunThread {
pub fn new(params: ThreadParams) -> Self {
let extensions = params.program.extensions.clone();
// TODO investigate if this division to 2 structs is still needed
let ti = Arc::new(ThreadInfo {
id: params.id,
uniq: params.uniq,
program: params.program,
cycle_time: params.cycle_time,
scheduler_interval: RwLock::new(params.scheduler_interval),
extensions,
});
let rs = RunState {
thread_info: ti.clone(),
cr: Box::new(CoroutineContext {
handle: 0, // this is the root
frame: StackFrame::new(params.pc, params.args),
call_stack: vec![],
cr_state: Default::default(),
}),
parked: Default::default(),
global_regs: [0; REG_COUNT],
ext_data: Default::default(),
cr_deadline: None,
critical_section: 0,
};
Self {
info: ti,
state: rs,
}
}
/// Run this thread synchronously on the calling OS thread until it ends.
///
/// Consumes `self` and repeats two phases per iteration of the outer `'run` loop:
///
/// 1. Evaluate one operation with `eval_op`, pace the loop by the cycle count the operation
///    reports, advance `pc`, and translate the returned `SchedSignal` into a coroutine state
///    plus a "switch wanted" request.
/// 2. If a switch is wanted, look through `parked` for the next runnable coroutine, or sleep
///    until the earliest parked sleeper is due (the inner `'next` loop).
///
/// The loop ends when `eval_op` returns any fault other than `Fault::Blocked`, or when the
/// scheduling phase raises `Fault::Deadlock` or `Fault::RootReturned`. `Fault::Halt` is logged
/// as a normal end; every other fault is printed to stderr, with the source position of the
/// last evaluated instruction when that instruction can be looked up.
pub fn run(mut self) {
// Pacing helper targeting `1 / cycle_time` loop iterations per second.
// NOTE(review): `loop_sleep()` presumably blocks for the rest of the current period and
// `loop_start()` marks the start of the next one - confirm against the spin_sleep docs.
let mut loop_helper = spin_sleep::LoopHelper::builder()
.build_with_target_rate(1.0/self.info.cycle_time.as_secs_f64());
loop_helper.loop_start();
// Declared outside the loop so the fault report after the loop can still read it.
let mut orig_pc;
// The value passed to `break 'run` becomes `fault`.
let fault = 'run: loop {
let mut want_switch = false;
// Remember where this op started, so it can be retried by restoring `pc`.
orig_pc = self.state.cr.frame.pc;
match self.eval_op() {
Ok(EvalRes { cycles, advance, sched }) => {
// One paced tick per cycle the operation reports.
for _ in 0..cycles {
loop_helper.loop_sleep();
loop_helper.loop_start();
}
trace!("Step {}; Status = {}", advance, self.state.cr.frame.status);
self.state.cr.frame.pc.advance(advance);
// If a deadline is set for the current coroutine and it has passed, ask for a switch.
if let Some(dl) = self.state.cr_deadline {
want_switch = dl <= Instant::now();
}
// Every signal except `Normal` (and a successful join) records a new coroutine state
// and requests a switch.
match sched {
SchedSignal::Normal => {}
SchedSignal::Yield(None) => {
trace!("yield");
self.state.cr.cr_state = CoroutineState::Ready;
want_switch = true;
}
SchedSignal::Yield(Some(value)) => {
trace!("yield {}", value);
self.state.cr.cr_state = CoroutineState::YieldedValue(value);
want_switch = true;
}
SchedSignal::Sleep(time) => {
trace!("sleep {:?}", time);
self.state.cr.cr_state = CoroutineState::Sleep { due: Instant::now() + time };
want_switch = true;
}
SchedSignal::Ret(results) => {
self.state.cr.cr_state = CoroutineState::Finished(results);
want_switch = true;
// TODO prioritize a thread that is waiting for this return value
}
SchedSignal::Join(handle) => {
trace!("Join cr {:#}", handle);
let mut found = false;
// Scan the parked coroutines for the requested handle.
'find: for (n, th) in self.state.parked.iter_mut().enumerate() {
if th.handle == handle {
if let CoroutineState::Finished(_) = &th.cr_state {
// Move the result values out of the parked entry (leaving a placeholder state),
// then drop the entry from the queue.
let crs = mem::replace(&mut th.cr_state, CoroutineState::Ready);
self.state.parked.remove(n); // delete it
found = true;
if let CoroutineState::Finished(vals) = crs {
self.state.cr.frame.set_retvals(&vals);
// Stop scanning right away: the queue was just modified under the iterator.
break 'find;
} else {
// The enclosing `if let` already established that the state was `Finished`.
unreachable!();
}
} else {
// The target exists but has not finished: rewind so the join is evaluated again later.
// NOTE(review): this branch does not `break 'find`, so the scan continues over the
// remaining entries - harmless only if handles are unique; confirm.
self.state.cr.frame.pc = orig_pc; // Retry
self.state.cr.cr_state = CoroutineState::Ready;
want_switch = true;
found = true;
}
}
}
// No parked coroutine has this handle: flag the error and hand back empty return values.
if !found {
self.state.set_flag(Flag::Invalid, true);
warn!("Join with invalid thread handle {:#}!", handle);
self.state.cr.frame.set_retvals(&[]);
}
}
}
}
// A blocked op is not a failure: restore `pc` so the same op runs again, and try to
// run something else meanwhile.
Err(Fault::Blocked) => {
trace!("Thread reports being blocked!");
self.state.cr.frame.pc = orig_pc; // Retry
self.state.cr.cr_state = CoroutineState::Ready;
want_switch = true;
}
// Any other fault ends the run loop and is reported below.
Err(e) => {
break 'run e;
}
}
// Resolve the next coroutine to run, or wait a bit...
// The body runs once unless a branch below says `continue 'next` (re-scan after sleeping).
'next: loop {
if want_switch {
let now = Instant::now();
// While the critical-section counter is non-zero no switch is performed; the current
// coroutine keeps running (or the whole OS thread waits).
if self.state.critical_section > 0 {
match self.state.cr.cr_state {
CoroutineState::Ready => {}
CoroutineState::Sleep { due } => {
// Sleep on the OS thread itself, then carry on with the same coroutine.
let time = due.saturating_duration_since(now);
trace!("Sleep in critical: {:?}", time);
thread::sleep(time);
continue 'run;
}
CoroutineState::Finished(_) | CoroutineState::YieldedValue(_) => {
// This is not good
break 'run Fault::Deadlock;
}
}
// This is either a bug, or waiting for IO.
// If it is the latter, try to give the OS a bit of breathing room..
std::thread::yield_now();
continue 'run;
}
trace!("Switch requested");
// `candidate`: the coroutine chosen to run next, if any.
// `closest_due`: earliest wake-up time among parked sleepers that are not yet due.
let mut candidate = None;
let mut closest_due = None;
// Rotate through the queue at most once: pop from the front, push non-runnable entries
// to the back. The iteration count is the queue length at loop entry.
'findbest: for _ in 0..self.state.parked.len() {
if let Some(mut rt) = self.state.parked.pop_front() {
match rt.cr_state {
CoroutineState::Ready => {
trace!("Found READY thread to run next");
candidate = Some(rt);
break 'findbest;
}
CoroutineState::Sleep { due } => {
if due <= now {
// The sleep has elapsed; wake it up and run it.
trace!("Found DUE sleeping thread to run next");
rt.cr_state = CoroutineState::Ready;
candidate = Some(rt);
break 'findbest;
} else {
// Still sleeping: remember the earliest due time seen so far.
match closest_due {
Some(d) => {
if d > due {
closest_due = Some(due);
}
},
None => {
closest_due = Some(due);
}
}
self.state.parked.push_back(rt);
}
}
// Any other state is not runnable; keep the entry parked.
_ => {
self.state.parked.push_back(rt);
}
}
}
}
if let Some(cr) = candidate {
trace!("Context switch to {:?}", cr);
// Do switch
// The previously running coroutine goes to the back of the parked queue.
let old = mem::replace(&mut self.state.cr, cr);
self.state.parked.push_back(old);
self.state.start_task_switching();
} else if let Some(due) = closest_due {
if self.state.cr.cr_state == CoroutineState::Ready {
trace!("No candidate found to run, retry same thread.");
// Let it run another quantum, maybe it was just blocked
continue 'run;
}
// The current coroutine cannot run and the only other work is asleep: wait until the
// earliest sleeper is due, then scan the queue again.
let time = due.saturating_duration_since(now);
trace!("No thread to switch to, sleep {:?}", time);
thread::sleep(time);
continue 'next;
} else {
// Nothing to run?
thread::yield_now();
trace!("No thread to switch to!");
}
// Reached after a successful switch and after the "nothing to run" branch.
let n_alive = self.state.parked.iter()
.filter(|p| p.cr_state.is_alive()).count();
if n_alive == 0 {
trace!("Stop task switching, no parked threads are alive");
self.state.stop_task_switching(); // This should improve performance in single-threaded mode
}
// Checks whichever coroutine is current at this point (the new one after a switch).
// NOTE(review): if the current coroutine is in `Sleep` and no parked sleeper set
// `closest_due`, control falls through to `break 'next` and the next op is evaluated
// without waiting - confirm this is intended.
match self.state.cr.cr_state {
CoroutineState::Finished(_) | CoroutineState::YieldedValue(_) => {
break 'run Fault::RootReturned;
}
_ => {}
}
}
break 'next;
}
};
// Report how the thread ended.
match fault {
Fault::Halt => {
debug!("Thread ended.");
}
e => {
eprintln!("*** Program failed: {} ***", e);
// `orig_pc` is the address of the last op that was evaluated; use it to look up the
// source position of that instruction.
// NOTE(review): `file_names[...]` is indexed directly and would panic on an
// out-of-range file index - presumably positions always refer to a known file; verify.
if let Some(instr) = self.info.program.ops.get(orig_pc.0 as usize) {
eprintln!("Source location: {}, line {}, column {}", self.info.program.file_names[instr.pos().file as usize].display(), instr.pos().line, instr.pos().column);
} else {
eprintln!("Instruction address: {}", orig_pc);
}
debug!("\nCore dump: {:?}", self.state);
}
}
}
}