From 8659240d01f039cda922c068128b43549400d414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Sat, 31 Oct 2020 17:32:02 +0100 Subject: [PATCH 1/4] wip coroutines and scheduler --- crsn/src/asm/instr/op.rs | 3 +- crsn/src/asm/mod.rs | 8 +- crsn/src/asm/parse/parse_data.rs | 2 +- crsn/src/builtin/defs.rs | 9 +- crsn/src/builtin/exec.rs | 61 +++++++++--- crsn/src/builtin/parse.rs | 65 ++++++++++-- crsn/src/module/eval_res.rs | 22 +++++ crsn/src/module/mod.rs | 2 +- crsn/src/runtime/exec.rs | 4 +- crsn/src/runtime/fault.rs | 3 + crsn/src/runtime/frame.rs | 6 +- crsn/src/runtime/run_thread.rs | 142 +++++++++++++++++++++++++-- crsn/src/runtime/run_thread/info.rs | 2 + crsn/src/runtime/run_thread/state.rs | 127 +++++++++++++++++++----- launcher/src/main.rs | 4 + 15 files changed, 399 insertions(+), 61 deletions(-) diff --git a/crsn/src/asm/instr/op.rs b/crsn/src/asm/instr/op.rs index 296c1ce..555af50 100644 --- a/crsn/src/asm/instr/op.rs +++ b/crsn/src/asm/instr/op.rs @@ -4,7 +4,7 @@ use sexp::{Atom, Sexp, SourcePosition}; use crate::asm::instr::Cond; use crate::builtin::defs::BuiltinOp; -use crate::module::{EvalRes, OpTrait}; +use crate::module::{EvalRes, OpTrait, SchedSignal}; use crate::runtime::fault::Fault; use crate::runtime::run_thread::{info::ThreadInfo, state::RunState}; @@ -30,6 +30,7 @@ impl OpTrait for Op { return Ok(EvalRes { cycles: 0, advance: 1, + sched: SchedSignal::Normal }); } } diff --git a/crsn/src/asm/mod.rs b/crsn/src/asm/mod.rs index 1974265..c880636 100644 --- a/crsn/src/asm/mod.rs +++ b/crsn/src/asm/mod.rs @@ -42,6 +42,7 @@ pub fn assemble(source: &str, uniq : &CrsnUniq, mut parsers: Vec Result), + Call { proc: RoutineName, args: Vec }, + /// Spawn a coroutine. The invocation is similar to (call). + Spawn { handle: Wr, proc: RoutineName, args: Vec }, /// Exit the current routine with return values Ret(Vec), /// Mark a routine entry point (call target). @@ -144,6 +146,11 @@ pub enum BuiltinOp { Routine(RoutineName), /// Skip backward or forward. The skip count can be defined by an argument. Skip(Rd), + /// Join a coroutine + Join(RdObj), + /// Yield control, optionally yielding a value that must be consumed (by reading the task handle) + /// before execution can resume + Yield { value: Option }, /// Deny jumps, skips and run across this address, producing a run-time fault. Barrier { kind: Barrier, diff --git a/crsn/src/builtin/exec.rs b/crsn/src/builtin/exec.rs index 535ba1a..140f53a 100644 --- a/crsn/src/builtin/exec.rs +++ b/crsn/src/builtin/exec.rs @@ -4,7 +4,7 @@ use sexp::Sexp; use crate::asm::data::literal::{Addr, Value}; use crate::builtin::defs::{Barrier, BuiltinOp, LdsValue}; -use crate::module::{EvalRes, OpTrait}; +use crate::module::{EvalRes, OpTrait, SchedSignal}; use crate::runtime::fault::Fault; use crate::runtime::frame::StackFrame; use crate::runtime::run_thread::{state::RunState, ThreadInfo}; @@ -63,7 +63,7 @@ impl OpTrait for BuiltinOp { BuiltinOp::Jump(_name) => { panic!("jump not translated to skip by assembler!"); } - BuiltinOp::Call(name, args) => { + BuiltinOp::Call { proc: name, args } => { match program.find_routine(&name) { Ok(pos) => { let mut values = Vec::with_capacity(args.len()); @@ -72,8 +72,8 @@ impl OpTrait for BuiltinOp { } let mut frame2 = StackFrame::new(pos, &values); - std::mem::swap(&mut state.frame, &mut frame2); - state.call_stack.push(frame2); + std::mem::swap(&mut state.cr.frame, &mut frame2); + state.cr.call_stack.push(frame2); res.advance = 0; } Err(e) => { @@ -81,18 +81,40 @@ impl OpTrait for BuiltinOp { } } } - BuiltinOp::Ret(retvals) => { - match state.call_stack.pop() { - Some(previous) => { - let mut values = Vec::with_capacity(retvals.len()); - for arg in retvals { + BuiltinOp::Spawn { handle, proc, args } => { + match program.find_routine(&proc) { + Ok(pos) => { + let mut values = Vec::with_capacity(args.len()); + for arg in args { values.push(state.read(arg)?); } - state.frame = previous; - state.frame.set_retvals(&values); + + let mut frame2 = StackFrame::new(pos, &values); + let handle_val = state.add_coroutine(frame2); + state.write(handle, handle_val)?; + + // Yield execution so the new thread can start running + res.sched = SchedSignal::Yield(None); + } + Err(e) => { + return Err(e); + } + } + } + BuiltinOp::Ret(retvals) => { + let mut values = Vec::with_capacity(retvals.len()); + for arg in retvals { + values.push(state.read(arg)?); + } + + match state.cr.call_stack.pop() { + Some(previous) => { + state.cr.frame = previous; + state.cr.frame.set_retvals(&values); } None => { - return Err(Fault::CallStackUnderflow); + // Stack underflow - if this is a coroutine, it's finished + res.sched = SchedSignal::Ret(values); } } } @@ -196,15 +218,15 @@ impl OpTrait for BuiltinOp { } } BuiltinOp::StoreFlags { dst } => { - let packed = state.frame.status.store(); + let packed = state.cr.frame.status.store(); state.write(dst, packed)?; } BuiltinOp::LoadFlags { src } => { let x = state.read(src)?; - state.frame.status.load(x); + state.cr.frame.status.load(x); } BuiltinOp::Sleep { count: micros, unit_us } => { - std::thread::sleep(Duration::from_micros(state.read(micros)? * unit_us.micros())) + res.sched = SchedSignal::Sleep(Duration::from_micros(state.read(micros)? * unit_us.micros())); } BuiltinOp::Delete(obj) => { let hv = obj.read(state)?; @@ -221,6 +243,15 @@ impl OpTrait for BuiltinOp { state.set_flag(Flag::Invalid, true); } } + BuiltinOp::Yield { value } => { + res.sched = SchedSignal::Yield(match value { + None => None, + Some(rd) => Some(state.read(rd)?) + }); + } + BuiltinOp::Join(obj) => { + res.sched = SchedSignal::Join(obj.read(state)?); + } } Ok(res) diff --git a/crsn/src/builtin/parse.rs b/crsn/src/builtin/parse.rs index a1563c6..9b3c90b 100644 --- a/crsn/src/builtin/parse.rs +++ b/crsn/src/builtin/parse.rs @@ -1,4 +1,4 @@ -use sexp::{Atom, Sexp, SourcePosition, atom_qs}; +use sexp::{Atom, Sexp, SourcePosition, atom_qs, atom_s}; use crate::asm::data::literal::{RoutineName}; use crate::asm::data::reg::parse_reg; @@ -25,21 +25,21 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok BuiltinOp::Halt } - "uslp" => { + "uslp" | "usleep" => { BuiltinOp::Sleep { count: args.next_rd()?, unit_us: SleepUnit::Usec, } } - "mslp" => { + "mslp" | "msleep" => { BuiltinOp::Sleep { count: args.next_rd()?, unit_us: SleepUnit::Msec, } } - "sslp" => { + "sslp" | "ssleep" => { BuiltinOp::Sleep { count: args.next_rd()?, unit_us: SleepUnit::Sec, @@ -132,7 +132,7 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok for t in args { call_args.push(parse_rd(t, pcx)?); } - BuiltinOp::Call(dest, call_args) + BuiltinOp::Call { proc: dest, args: call_args } } "ret" => { @@ -258,6 +258,38 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok } } + "spawn" => { + let handle = args.next_wr()?; + let dest = RoutineName { name: args.next_string()?.0, arity: args.len() as u8 }; + + let mut call_args = vec![]; + for t in args { + call_args.push(parse_rd(t, pcx)?); + } + + BuiltinOp::Spawn { + handle, + proc: dest, + args: call_args + } + } + + "join" => { + BuiltinOp::Join(args.next_rdobj()?) + } + + "yield" => { + if args.have_more() { + BuiltinOp::Yield { + value: Some(args.next_rd()?), + } + } else { + BuiltinOp::Yield { + value: None, + } + } + } + "del" => { BuiltinOp::Delete(args.next_rdobj()?) } @@ -326,7 +358,7 @@ pub(crate) fn to_sexp(op: &BuiltinOp) -> Sexp { BuiltinOp::Jump(label) => sexp::list(&[A("j"), A(label)]), BuiltinOp::FarLabel(label) => sexp::list(&[A("far"), A(label)]), BuiltinOp::FarJump(label) => sexp::list(&[A("fj"), A(label)]), - BuiltinOp::Call(name, args) => { + BuiltinOp::Call { proc: name, args } => { if args.is_empty() { sexp::list(&[A("call"), A(&name.name)]) } else { @@ -416,6 +448,22 @@ pub(crate) fn to_sexp(op: &BuiltinOp) -> Sexp { } } } + BuiltinOp::Spawn { handle, proc, args } => { + if args.is_empty() { + sexp::list(&[A("spawn"), A(handle), A(proc)]) + } else { + let mut v = vec![A("spawn"), A(handle), A(proc)]; + v.extend(args.iter().map(|r| A(r))); + sexp::list(&v) + } + } + BuiltinOp::Join(handle) => sexp::list(&[A("join"), A(handle)]), + BuiltinOp::Yield { value } => { + match value { + None => sexp::list(&[A("yield")]), + Some(rd) => sexp::list(&[A("yield"), A(rd)]), + } + } } } @@ -505,6 +553,11 @@ mod test { ("(far :label)", "(far :label)"), ("(del @r5)", "(del @r5)"), ("(sym cat r0)(del @cat)", "(del @r0)"), + ("(spawn r0 foo 1 2 3)", "(spawn r0 foo 1 2 3)"), + ("(yield)", "(yield)"), + ("(yield -1)", "(yield -1)"), + ("(yield r5)", "(yield r5)"), + ("(join @r5)", "(join @r5)"), ]; let parser = BuiltinOps::new(); diff --git a/crsn/src/module/eval_res.rs b/crsn/src/module/eval_res.rs index 514558f..e05c5bd 100644 --- a/crsn/src/module/eval_res.rs +++ b/crsn/src/module/eval_res.rs @@ -1,3 +1,6 @@ +use crate::asm::data::literal::Value; +use std::time::Duration; + /// Cycles spent executing an instruction pub type CyclesSpent = usize; @@ -6,6 +9,7 @@ pub type CyclesSpent = usize; pub struct EvalRes { pub cycles: CyclesSpent, pub advance: i64, + pub sched: SchedSignal, } impl Default for EvalRes { @@ -13,6 +17,24 @@ impl Default for EvalRes { Self { cycles: 1, advance: 1, + sched: SchedSignal::Normal, } } } + +/// Signal to the scheduler +#[derive(Debug)] +pub enum SchedSignal { + /// No signal, execution went normally. + Normal, + /// Yield control, optionally with a value. + /// If a value is yielded, it must be consumed through the handle before execution can resume. + Yield(Option), + /// The routine requests a delay in execution. The actual sleep time can be longer due to task + /// switching overhead. + Sleep(Duration), + /// Return from a coroutine - the thread ends and should be joined. + Ret(Vec), + /// Request to join a coroutine/thread + Join(Value), +} diff --git a/crsn/src/module/mod.rs b/crsn/src/module/mod.rs index 661aed5..d3212d8 100644 --- a/crsn/src/module/mod.rs +++ b/crsn/src/module/mod.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; -pub use eval_res::EvalRes; +pub use eval_res::*; use sexp::{Sexp, SourcePosition}; use crate::asm::data::literal::Value; diff --git a/crsn/src/runtime/exec.rs b/crsn/src/runtime/exec.rs index 4763823..4dd5f16 100644 --- a/crsn/src/runtime/exec.rs +++ b/crsn/src/runtime/exec.rs @@ -8,9 +8,9 @@ impl RunThread { let state = &mut self.state; let info = &self.info; - let op = info.program.fetch_instr(state.frame.pc); + let op = info.program.fetch_instr(state.cr.frame.pc); - trace!("### {:04} : {:?}", state.frame.pc.0, op); + trace!("### {:04} : {:?}", state.cr.frame.pc.0, op); op.execute(info, state) } diff --git a/crsn/src/runtime/fault.rs b/crsn/src/runtime/fault.rs index 97918a2..4903ec2 100644 --- a/crsn/src/runtime/fault.rs +++ b/crsn/src/runtime/fault.rs @@ -26,6 +26,9 @@ pub enum Fault { #[error("Program ended.")] Halt, + #[error("Instruction did not finish in time for context switch, retry later")] + Blocked, + #[error("Operation not allowed: {0}")] NotAllowed(Cow<'static, str>), diff --git a/crsn/src/runtime/frame.rs b/crsn/src/runtime/frame.rs index b71e79f..e7090b6 100644 --- a/crsn/src/runtime/frame.rs +++ b/crsn/src/runtime/frame.rs @@ -33,10 +33,14 @@ impl StackFrame { sf } - #[inline(always)] pub fn set_retvals(&mut self, vals: &[Value]) { for n in 0..(vals.len().min(REG_COUNT)) { self.res[n] = vals[n]; } + + // Zero the rest + for n in vals.len()..REG_COUNT { + self.res[n] = 0; + } } } diff --git a/crsn/src/runtime/run_thread.rs b/crsn/src/runtime/run_thread.rs index cb04b63..9c6d3d7 100644 --- a/crsn/src/runtime/run_thread.rs +++ b/crsn/src/runtime/run_thread.rs @@ -1,16 +1,19 @@ use std::sync::Arc; use std::thread::JoinHandle; -use std::time::{Duration}; +use std::time::{Duration, Instant}; pub use info::ThreadInfo; pub use state::RunState; use crate::asm::data::literal::Addr; -use crate::module::{EvalRes, CrsnUniq}; +use crate::module::{EvalRes, CrsnUniq, SchedSignal}; use crate::runtime::fault::Fault; use crate::runtime::frame::{StackFrame, REG_COUNT}; use crate::runtime::program::Program; +use crate::runtime::run_thread::state::{CoroutineContext, CoroutineState}; +use std::{mem, thread}; +use crate::asm::instr::cond::Flag; #[derive(Clone, Copy, Eq, PartialEq, Debug, Ord, PartialOrd)] pub struct ThreadToken(pub u32); @@ -29,6 +32,7 @@ pub struct ThreadParams<'a> { pub program: Arc, pub pc: Addr, pub cycle_time: Duration, + pub scheduler_interval: Duration, pub args: &'a [u64], } @@ -41,15 +45,22 @@ impl RunThread { uniq: params.uniq, program: params.program, cycle_time: params.cycle_time, + scheduler_interval: params.scheduler_interval, extensions, }); let rs = RunState { thread_info: ti.clone(), - frame: StackFrame::new(params.pc, params.args), - call_stack: vec![], + cr: CoroutineContext { + handle: 0, // this is the root + frame: StackFrame::new(params.pc, params.args), + call_stack: vec![], + cr_state: Default::default(), + }, + parked: Default::default(), global_regs: [0; REG_COUNT], ext_data: Default::default(), + cr_deadline: None }; Self { @@ -72,14 +83,69 @@ impl RunThread { loop_helper.loop_start(); 'run: loop { + let mut want_switch = false; match self.eval_op() { - Ok(EvalRes { cycles, advance }) => { + Ok(EvalRes { cycles, advance, sched }) => { for _ in 0..cycles { loop_helper.loop_sleep(); loop_helper.loop_start(); } - trace!("Step {}; Status = {}", advance, self.state.frame.status); - self.state.frame.pc.advance(advance); + trace!("Step {}; Status = {}", advance, self.state.cr.frame.status); + self.state.cr.frame.pc.advance(advance); + + if let Some(dl) = self.state.cr_deadline { + want_switch = dl <= Instant::now(); + } + + match sched { + SchedSignal::Normal => {} + SchedSignal::Yield(None) => { + self.state.cr.cr_state = CoroutineState::Ready; + want_switch = true; + } + SchedSignal::Yield(Some(value)) => { + self.state.cr.cr_state = CoroutineState::YieldedValue(value); + want_switch = true; + } + SchedSignal::Sleep(time) => { + self.state.cr.cr_state = CoroutineState::Sleep { due: Instant::now() + time }; + want_switch = true; + } + SchedSignal::Ret(results) => { + self.state.cr.cr_state = CoroutineState::Finished(results); + want_switch = true; + // TODO prioritize a thread that is waiting for this return value + } + SchedSignal::Join(handle) => { + let mut found = false; + 'find: for (n, th) in self.state.parked.iter_mut().enumerate() { + if th.handle == handle { + if let CoroutineState::Finished(_) = &th.cr_state { + let mut crs = mem::replace(&mut th.cr_state, CoroutineState::Ready); + self.state.parked.remove(n); // delete it + + if let CoroutineState::Finished(vals) = crs { + self.state.cr.frame.set_retvals(&vals); + found = true; + break 'find; + } else { + unreachable!(); + } + } + } + } + + if !found { + self.state.set_flag(Flag::Invalid, true); + warn!("Join with invalid thread handle!"); + self.state.cr.frame.set_retvals(&[]); + } + } + } + } + Err(Fault::Blocked) => { + // Do not advance, the last instruction will be re-tried + want_switch = true; } Err(Fault::Halt) => { // TODO implement coordinated shutdown when more threads are running! @@ -91,6 +157,68 @@ impl RunThread { break 'run; } } + + if want_switch { + trace!("Switch requested"); + + let now = Instant::now(); + + let mut candidate = None; + let mut closest_due = None; + for _ in 0..self.state.parked.len() { + if let Some(mut rt) = self.state.parked.pop_front() { + match rt.cr_state { + CoroutineState::Ready => { + candidate = Some(rt); + break; + } + CoroutineState::Sleep { due } => { + if due <= now { + rt.cr_state = CoroutineState::Ready; + candidate = Some(rt); + break; + } else { + match closest_due { + Some(d) => { + if d > due { + closest_due = Some(due); + } + }, + None => { + closest_due = Some(due); + } + } + self.state.parked.push_back(rt); + } + } + _ => { + self.state.parked.push_back(rt); + } + } + } + } + + if let Some(cr) = candidate { + trace!("Context switch to {:?}", cr); + + // Do switch + let old = mem::replace(&mut self.state.cr, cr); + self.state.parked.push_back(old); + self.state.cr_deadline = Some(now + self.info.scheduler_interval); + } else if let Some(due) = closest_due { + let time = due.saturating_duration_since(now); + trace!("No thread to switch to, sleep {:?}", time); + thread::sleep(time); + } + + let n_alive = self.state.parked.iter() + .filter(|p| p.cr_state.is_alive()).count(); + + if n_alive <= 1 { + trace!("Stop task switching, no parked threads are alive"); + self.state.cr_deadline = None; + } + } } debug!("Thread ended."); diff --git a/crsn/src/runtime/run_thread/info.rs b/crsn/src/runtime/run_thread/info.rs index 7e2ed08..ac976e5 100644 --- a/crsn/src/runtime/run_thread/info.rs +++ b/crsn/src/runtime/run_thread/info.rs @@ -17,6 +17,8 @@ pub struct ThreadInfo { pub program: Arc, /// Program to run pub(crate) cycle_time: Duration, + /// Interval one thread/coroutine is let to run before the context switches + pub(crate) scheduler_interval: Duration, /// Extensions pub extensions: Arc>>, } diff --git a/crsn/src/runtime/run_thread/state.rs b/crsn/src/runtime/run_thread/state.rs index 48133e5..fc4f055 100644 --- a/crsn/src/runtime/run_thread/state.rs +++ b/crsn/src/runtime/run_thread/state.rs @@ -1,5 +1,5 @@ use std::any::{Any, TypeId}; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use crate::asm::data::{Rd, RdData, RdObj, Register, Wr, WrData}; use crate::asm::data::literal::{Addr, Value}; @@ -12,34 +12,95 @@ use nudge::{likely}; use crate::asm::instr::cond::Flag; use std::fmt::{Debug, Formatter}; use std::fmt; +use std::time::Instant; pub struct RunState { pub thread_info: Arc, - /// Active stack frame - pub frame: StackFrame, - /// Call stack - pub call_stack: CallStack, + /// The active coroutine + pub cr: CoroutineContext, + /// Parked coroutines + pub parked: VecDeque, /// General purpose registers that stay valid for the entire run-time of the thread pub global_regs: [Value; REG_COUNT], /// Extension data pub ext_data: ExtensionDataStore, + /// Execution deadline, if multi-tasked + pub cr_deadline: Option, +} + +#[derive(Debug,Default)] +pub struct CoroutineContext { + pub handle: Value, + /// Active stack frame + pub frame: StackFrame, + /// Call stack + pub call_stack: CallStack, + /// Coroutine run state + pub cr_state: CoroutineState, +} + +/// Execution state of an inactive coroutine +#[derive(Debug, Eq, PartialEq)] +pub enum CoroutineState { + /// Ready to run, just started, or interrupted by the scheduler + Ready, + /// Delay in progress + Sleep { due: Instant }, + /// The task finished + Finished(Vec), + /// The task yielded a value that must be consumed before it can resume. + /// State switches to Ready when the value is read. + YieldedValue(Value), +} + +impl CoroutineState { + pub fn is_alive(&self) -> bool { + match self { + CoroutineState::Ready => true, + CoroutineState::Sleep { .. } => true, + CoroutineState::Finished(_) => false, + CoroutineState::YieldedValue(_) => true, + } + } +} + +impl Default for CoroutineState { + fn default() -> Self { + Self::Ready + } } impl RunState { /// Clear everything. Caution: when done at runtime, this effectively reboots the thread pub fn clear_all(&mut self) { - self.frame = Default::default(); - self.call_stack = Default::default(); + self.cr = Default::default(); + self.parked = Default::default(); self.global_regs = Default::default(); self.ext_data = Default::default(); } + + /// Add a coroutine, marked as Ready, to run next + pub fn add_coroutine(&mut self, frame : StackFrame) -> Value { + let handle = self.thread_info.unique_handle(); + self.parked.push_back(CoroutineContext { + handle, + frame, + call_stack: vec![], + cr_state: Default::default() + }); + if self.cr_deadline.is_none() { + // start context switching + self.cr_deadline = Some(Instant::now() + self.thread_info.scheduler_interval); + } + handle + } } impl Debug for RunState { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("RunState") - .field("frame", &self.frame) - .field("call_stack", &self.call_stack) + .field("cr", &self.cr) + .field("parked", &self.parked) .field("global_regs", &self.global_regs) //.field("ext_data", &self.ext_data) .finish() @@ -79,44 +140,44 @@ impl RunState { #[inline(always)] pub fn set_flag(&mut self, flag: Flag, set: bool) { trace!("Flag {} = {:?}", flag, set); - self.frame.status.set(flag, set); + self.cr.frame.status.set(flag, set); } /// Check status flags for a condition #[inline(always)] pub fn test_cond(&self, cond: Cond) -> bool { - self.frame.status.test(cond) + self.cr.frame.status.test(cond) } /// Set program counter - address of the next instruction to run pub fn set_pc(&mut self, pc: Addr) { trace!("PC := {}", pc); - self.frame.pc = pc; + self.cr.frame.pc = pc; } /// Get program counter - address of the next instruction to run pub fn get_pc(&self) -> Addr { - self.frame.pc + self.cr.frame.pc } /// Clear status flags #[inline(always)] pub fn clear_status(&mut self) { - self.frame.status.clear(); + self.cr.frame.status.clear(); } /// Update status flags using a variable. /// The update is additive - call `clear_status()` first if desired! #[inline(always)] pub fn update_status(&mut self, val: Value) { - self.frame.status.update(val); + self.cr.frame.status.update(val); } /// Update status flags using a variable. /// The update is additive - call `clear_status()` first if desired! #[inline(always)] pub fn update_status_float(&mut self, val: f64) { - self.frame.status.update_float(val); + self.cr.frame.status.update_float(val); } /// Read object handle value @@ -133,8 +194,8 @@ impl RunState { RdData::Immediate(v) => Ok(v), RdData::Register(Register::Gen(rn)) => { if likely(rn < REG_COUNT as u8) { - trace!("Rd {:?} = {}", rd, self.frame.gen[rn as usize]); - Ok(self.frame.gen[rn as usize]) + trace!("Rd {:?} = {}", rd, self.cr.frame.gen[rn as usize]); + Ok(self.cr.frame.gen[rn as usize]) } else { Err(Fault::RegisterNotExist { reg: Register::Gen(rn) }) } @@ -149,16 +210,16 @@ impl RunState { } RdData::Register(Register::Arg(rn)) => { if likely(rn < REG_COUNT as u8) { - trace!("Rd {:?} = {}", rd, self.frame.arg[rn as usize]); - Ok(self.frame.arg[rn as usize]) + trace!("Rd {:?} = {}", rd, self.cr.frame.arg[rn as usize]); + Ok(self.cr.frame.arg[rn as usize]) } else { Err(Fault::RegisterNotExist { reg: Register::Arg(rn) }) } } RdData::Register(Register::Res(rn)) => { if likely(rn < REG_COUNT as u8) { - trace!("Rd {:?} = {}", rd, self.frame.res[rn as usize]); - Ok(self.frame.res[rn as usize]) + trace!("Rd {:?} = {}", rd, self.cr.frame.res[rn as usize]); + Ok(self.cr.frame.res[rn as usize]) } else { Err(Fault::RegisterNotExist { reg: Register::Res(rn) }) // TODO use match after @ when stabilized https://github.com/rust-lang/rust/issues/65490 } @@ -174,6 +235,26 @@ impl RunState { } fn read_object(&mut self, reference: Value) -> Result { + // values yielded from coroutines + for cr in &mut self.parked { + if cr.handle == reference { + match cr.cr_state { + CoroutineState::Ready | CoroutineState::Sleep { .. } => { + return Err(Fault::Blocked); + } + CoroutineState::Finished(_) => { + self.set_flag(Flag::Eof, true); + warn!("Attempt to read yielded value of a finished coroutine"); + return Ok(0); + } + CoroutineState::YieldedValue(v) => { + cr.cr_state = CoroutineState::Ready; + return Ok(v); + } + } + } + } + // This is a shitty dirty hack to allow iterating over the extensions while passing a mutable reference // to self to the reading methods. Since the extensions array is in an Arc, it can't be mutated internally // anyway, and we know it will still live after the method returns - unless someone does something incredibly stupid. @@ -213,7 +294,7 @@ impl RunState { match wr.0 { WrData::Register(Register::Gen(rn)) => { if likely(rn < REG_COUNT as u8) { - self.frame.gen[rn as usize] = val; + self.cr.frame.gen[rn as usize] = val; Ok(()) } else { Err(Fault::RegisterNotExist { reg: Register::Gen(rn) }) diff --git a/launcher/src/main.rs b/launcher/src/main.rs index 13f441a..bee91c4 100644 --- a/launcher/src/main.rs +++ b/launcher/src/main.rs @@ -38,6 +38,8 @@ struct Config { assemble_debug: bool, #[serde(with = "serde_duration_millis")] cycle_time: Duration, + #[serde(with = "serde_duration_millis")] + switch_time: Duration, } impl Default for Config { @@ -51,6 +53,7 @@ impl Default for Config { assemble_only: false, assemble_debug: false, cycle_time: Duration::default(), + switch_time: Duration::from_millis(10), } } } @@ -177,6 +180,7 @@ fn main() -> anyhow::Result<()> { program: parsed, pc: Addr(0), cycle_time: config.cycle_time, + scheduler_interval: config.switch_time, args }); From c8d01b6151f77b62494d1d0147e3f3ab268d2bab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Sat, 31 Oct 2020 17:55:23 +0100 Subject: [PATCH 2/4] coroutine fixes, first example --- crsn/src/runtime/run_thread.rs | 16 +++++++++--- crsn/src/runtime/run_thread/state.rs | 5 +++- examples/coroutines1.csn | 38 ++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 4 deletions(-) create mode 100644 examples/coroutines1.csn diff --git a/crsn/src/runtime/run_thread.rs b/crsn/src/runtime/run_thread.rs index 9c6d3d7..fb58990 100644 --- a/crsn/src/runtime/run_thread.rs +++ b/crsn/src/runtime/run_thread.rs @@ -91,6 +91,7 @@ impl RunThread { loop_helper.loop_start(); } trace!("Step {}; Status = {}", advance, self.state.cr.frame.status); + let orig_pc = self.state.cr.frame.pc; self.state.cr.frame.pc.advance(advance); if let Some(dl) = self.state.cr_deadline { @@ -100,14 +101,17 @@ impl RunThread { match sched { SchedSignal::Normal => {} SchedSignal::Yield(None) => { + trace!("yield"); self.state.cr.cr_state = CoroutineState::Ready; want_switch = true; } SchedSignal::Yield(Some(value)) => { + trace!("yield {}", value); self.state.cr.cr_state = CoroutineState::YieldedValue(value); want_switch = true; } SchedSignal::Sleep(time) => { + trace!("sleep {:?}", time); self.state.cr.cr_state = CoroutineState::Sleep { due: Instant::now() + time }; want_switch = true; } @@ -117,27 +121,33 @@ impl RunThread { // TODO prioritize a thread that is waiting for this return value } SchedSignal::Join(handle) => { + trace!("Join cr {:#}", handle); let mut found = false; 'find: for (n, th) in self.state.parked.iter_mut().enumerate() { if th.handle == handle { if let CoroutineState::Finished(_) = &th.cr_state { let mut crs = mem::replace(&mut th.cr_state, CoroutineState::Ready); self.state.parked.remove(n); // delete it + found = true; if let CoroutineState::Finished(vals) = crs { self.state.cr.frame.set_retvals(&vals); - found = true; break 'find; } else { unreachable!(); } + } else { + self.state.cr.frame.pc = orig_pc; // Retry + self.state.cr.cr_state = CoroutineState::Ready; + want_switch = true; + found = true; } } } if !found { self.state.set_flag(Flag::Invalid, true); - warn!("Join with invalid thread handle!"); + warn!("Join with invalid thread handle {:#}!", handle); self.state.cr.frame.set_retvals(&[]); } } @@ -214,7 +224,7 @@ impl RunThread { let n_alive = self.state.parked.iter() .filter(|p| p.cr_state.is_alive()).count(); - if n_alive <= 1 { + if n_alive == 0 { trace!("Stop task switching, no parked threads are alive"); self.state.cr_deadline = None; } diff --git a/crsn/src/runtime/run_thread/state.rs b/crsn/src/runtime/run_thread/state.rs index fc4f055..5b3ffc2 100644 --- a/crsn/src/runtime/run_thread/state.rs +++ b/crsn/src/runtime/run_thread/state.rs @@ -82,7 +82,10 @@ impl RunState { /// Add a coroutine, marked as Ready, to run next pub fn add_coroutine(&mut self, frame : StackFrame) -> Value { let handle = self.thread_info.unique_handle(); - self.parked.push_back(CoroutineContext { + trace!("Spawn cr {:#}", handle); + + // front - so it runs ASAP + self.parked.push_front(CoroutineContext { handle, frame, call_stack: vec![], diff --git a/examples/coroutines1.csn b/examples/coroutines1.csn new file mode 100644 index 0000000..e550114 --- /dev/null +++ b/examples/coroutines1.csn @@ -0,0 +1,38 @@ +( + (lds @cout "main\n") + (msleep 500) + (lds @cout "main\n") + (msleep 500) + + (lds @cout "Spawnign bg\n") + + (spawn r15 bg 10) + + (msleep 500) + (lds @cout "FG\n") + (msleep 500) + (lds @cout "FG\n") + (msleep 500) + (lds @cout "FG\n") + + (msleep 1000) + + (lds @cout "Wait for BG\n") + (join @r15) + (lds @cout "Joined\n") + (msleep 500) + (lds @cout "main\n") + (msleep 500) + (lds @cout "main\n") + (msleep 500) + + (proc bg times + (ld r0 times) + (:x) + (msleep 500) + (lds @cout "***BG\n") + (dec r0 (nz? (j :x))) + (lds @cout "***BG done.\n") + (ret) + ) +) From af52270a29cacd56a3a915239e906927ed85774d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Sat, 31 Oct 2020 18:17:50 +0100 Subject: [PATCH 3/4] add generator example --- examples/coroutines2.csn | 44 ++++++++++++++++++++++++++++++++++++++++ examples/itoa.csn | 27 +++++++++++++++++++----- 2 files changed, 66 insertions(+), 5 deletions(-) create mode 100644 examples/coroutines2.csn diff --git a/examples/coroutines2.csn b/examples/coroutines2.csn new file mode 100644 index 0000000..c1d32f6 --- /dev/null +++ b/examples/coroutines2.csn @@ -0,0 +1,44 @@ +( + ; using a coroutine as a generator + + (spawn r15 fibo) + + (ld r0 20) + (:next) + + (ld r1 @r15) ; Read a yielded value + (call printnum r1) + (lds @cout ", ") + + (dec r0) + (j.nz :next) + (ld @cout '\n') + (halt) + + (proc fibo + (ld r0 0) + (ld r1 1) + (:x) + (yield r1) + (add r2 r0 r1) + (ld r0 r1) + (ld r1 r2) + (j :x) + ) + + (proc printnum num + (mkbf r15) + (ld r1 num) + (tst r1 (<0? (mul r1 -1))) + (:next) + (mod r0 r1 10) + (add r0 '0') + (bfrpush @r15 r0) + (div r1 10 (z? + (tst num (<0? (bfrpush @r15 '-'))) + (lds @cout @r15) + (del @r15) + (ret))) + (j :next) + ) +) diff --git a/examples/itoa.csn b/examples/itoa.csn index b5f63ef..66c58ba 100644 --- a/examples/itoa.csn +++ b/examples/itoa.csn @@ -1,24 +1,41 @@ ( ; This is an example implementation of itoa using a buffer of characters (ld r0 -123456789) - + (mkbf r1) (call itoa r1 r0) - + ; print it (:pn) (bfrpop @cout @r1 (em? (ld @cout '\n') (halt))) (j :pn) (halt) - - (proc itoa buf num + + (proc itoa buf num (ld r1 num) (tst r1 (<0? (mul r1 -1))) (:next) (mod r0 r1 10) (add r0 '0') (bfrpush @buf r0) - (div r1 10 (z? + (div r1 10 (z? (tst num (<0? (bfrpush @buf '-'))) (ret))) (j :next) ) + + ; other version that prints it + (proc printnum num + (mkbf r15) + (ld r1 num) + (tst r1 (<0? (mul r1 -1))) + (:next) + (mod r0 r1 10) + (add r0 '0') + (bfrpush @r15 r0) + (div r1 10 (z? + (tst num (<0? (bfrpush @r15 '-'))) + (lds @cout @r15) + (del @r15) + (ret))) + (j :next) + ) ) From 5a0e3705895868944607eb66a84768d50c4a3daf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Sat, 31 Oct 2020 18:47:10 +0100 Subject: [PATCH 4/4] document coroutines --- README.md | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 2358805..afb7f58 100644 --- a/README.md +++ b/README.md @@ -286,7 +286,77 @@ It can also be written like this: ... ) ``` - + +## Coroutines + +Croissant implements something that could be called "pre-emptive coroutines". They do not provide any performance gain, +but add asynchronicity to the program, and can work as generators! + +There is no true parallelism, it is difficult to implement safely and efficiently with a global state. + +### Spawning + +*Any procedure can be used as a coroutine.* + +A coroutine is created using the `spawn` instruction, which produces an object handle. + +``` +(spawn r0 do_stuff 1 2 3) +``` + +At this point, the program is evenly divided between the original and the coroutine "thread". + +The spawned coroutine is scheduled to run immediately after being spawned. + +### Task switching + +Coroutines take turns to execute the program. The scheduling interval can be configured. + +Control can be given up using the `yield` instruction; for example, when waiting for a mutex. This happens automatically when +a `sleep` instruction is invoked. + +### Race conditions + +Take care when working with objects, resources and global registers: you can get race conditions +with coroutines. Use atomic instructions (`cas`, `casXX`, `bfcas`…) to implement synchronization. + +The `casXX` instruction is very powerful: you can use one bit of a register as a mutex and the rest of it to store some useful data. +You can also use one register for up to 64 mutexes. + +Remember to only use global registers (or buffer items) as mutexes: `g0`-`g15`. Each coroutine has its own set of *regular* registers. + +### Using coroutines as generators + +A coroutine can "yield a value" by invoking the `yield` instruction with an operand. This can be done any number of times. + +``` +(yield r0) +``` + +The coroutine is blocked until the value is consumed by someone. To consume a yielded value, read the coroutine object handle: + +``` +(spawn r5 foo) + +(ld r0 @r5) ; read a yielded value +``` + +### Joining a coroutine + +Use the `join` instruction with a coroutine object handle to wait for its completion. + +A coroutine completes by calling `ret` at its top level. This naturally means that a coroutine can return values! + +The returned values are placed in the result registers, just like with the `call` instruction. + +``` +(spawn r5 foo) +; ... + +(join @r0) +; res0-res15 now contain return values +``` + # Instruction Set Crsn instruction set is composed of extensions.