wip coroutines and scheduler

coroutines
Ondřej Hruška 4 years ago
parent d0d4f1c15e
commit 8659240d01
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 3
      crsn/src/asm/instr/op.rs
  2. 8
      crsn/src/asm/mod.rs
  3. 2
      crsn/src/asm/parse/parse_data.rs
  4. 9
      crsn/src/builtin/defs.rs
  5. 55
      crsn/src/builtin/exec.rs
  6. 65
      crsn/src/builtin/parse.rs
  7. 22
      crsn/src/module/eval_res.rs
  8. 2
      crsn/src/module/mod.rs
  9. 4
      crsn/src/runtime/exec.rs
  10. 3
      crsn/src/runtime/fault.rs
  11. 6
      crsn/src/runtime/frame.rs
  12. 138
      crsn/src/runtime/run_thread.rs
  13. 2
      crsn/src/runtime/run_thread/info.rs
  14. 127
      crsn/src/runtime/run_thread/state.rs
  15. 4
      launcher/src/main.rs

@ -4,7 +4,7 @@ use sexp::{Atom, Sexp, SourcePosition};
use crate::asm::instr::Cond; use crate::asm::instr::Cond;
use crate::builtin::defs::BuiltinOp; use crate::builtin::defs::BuiltinOp;
use crate::module::{EvalRes, OpTrait}; use crate::module::{EvalRes, OpTrait, SchedSignal};
use crate::runtime::fault::Fault; use crate::runtime::fault::Fault;
use crate::runtime::run_thread::{info::ThreadInfo, state::RunState}; use crate::runtime::run_thread::{info::ThreadInfo, state::RunState};
@ -30,6 +30,7 @@ impl OpTrait for Op {
return Ok(EvalRes { return Ok(EvalRes {
cycles: 0, cycles: 0,
advance: 1, advance: 1,
sched: SchedSignal::Normal
}); });
} }
} }

@ -42,6 +42,7 @@ pub fn assemble(source: &str, uniq : &CrsnUniq, mut parsers: Vec<Box<dyn CrsnExt
uniq: Default::default(), uniq: Default::default(),
program: Program::new(vec![], parsers_arc.clone()).unwrap(), program: Program::new(vec![], parsers_arc.clone()).unwrap(),
cycle_time: Default::default(), cycle_time: Default::default(),
scheduler_interval: Default::default(),
extensions: parsers_arc.clone(), extensions: parsers_arc.clone(),
}); });
@ -57,10 +58,11 @@ pub fn assemble(source: &str, uniq : &CrsnUniq, mut parsers: Vec<Box<dyn CrsnExt
// This allows to evaluate nearly all instructions at compile time. // This allows to evaluate nearly all instructions at compile time.
const_eval: RunState { const_eval: RunState {
thread_info: ti.clone(), thread_info: ti.clone(),
frame: Default::default(), cr: Default::default(),
call_stack: vec![], parked: Default::default(),
global_regs: [0; REG_COUNT], global_regs: [0; REG_COUNT],
ext_data: Default::default() ext_data: Default::default(),
cr_deadline: None
}, },
const_eval_ti: ti, const_eval_ti: ti,
parsing_expr: false parsing_expr: false

@ -139,7 +139,7 @@ pub fn parse_data_disp(tok: Sexp, pcx: &ParserContext) -> Result<DataDisp, CrsnE
state_mut.parsing_expr = false; state_mut.parsing_expr = false;
} }
return Ok(DataDisp::Immediate(state_mut.const_eval.frame.gen[0])); return Ok(DataDisp::Immediate(state_mut.const_eval.cr.frame.gen[0]));
} }
Err(CrsnError::Parse("List not expected here".into(), pos)) Err(CrsnError::Parse("List not expected here".into(), pos))

@ -136,7 +136,9 @@ pub enum BuiltinOp {
FarJump(Label), FarJump(Label),
/// Call a routine with arguments. /// Call a routine with arguments.
/// The arguments are passed as argX. Return values are stored in resX registers. /// The arguments are passed as argX. Return values are stored in resX registers.
Call(RoutineName, Vec<Rd>), Call { proc: RoutineName, args: Vec<Rd> },
/// Spawn a coroutine. The invocation is similar to (call).
Spawn { handle: Wr, proc: RoutineName, args: Vec<Rd> },
/// Exit the current routine with return values /// Exit the current routine with return values
Ret(Vec<Rd>), Ret(Vec<Rd>),
/// Mark a routine entry point (call target). /// Mark a routine entry point (call target).
@ -144,6 +146,11 @@ pub enum BuiltinOp {
Routine(RoutineName), Routine(RoutineName),
/// Skip backward or forward. The skip count can be defined by an argument. /// Skip backward or forward. The skip count can be defined by an argument.
Skip(Rd), Skip(Rd),
/// Join a coroutine
Join(RdObj),
/// Yield control, optionally yielding a value that must be consumed (by reading the task handle)
/// before execution can resume
Yield { value: Option<Rd> },
/// Deny jumps, skips and run across this address, producing a run-time fault. /// Deny jumps, skips and run across this address, producing a run-time fault.
Barrier { Barrier {
kind: Barrier, kind: Barrier,

@ -4,7 +4,7 @@ use sexp::Sexp;
use crate::asm::data::literal::{Addr, Value}; use crate::asm::data::literal::{Addr, Value};
use crate::builtin::defs::{Barrier, BuiltinOp, LdsValue}; use crate::builtin::defs::{Barrier, BuiltinOp, LdsValue};
use crate::module::{EvalRes, OpTrait}; use crate::module::{EvalRes, OpTrait, SchedSignal};
use crate::runtime::fault::Fault; use crate::runtime::fault::Fault;
use crate::runtime::frame::StackFrame; use crate::runtime::frame::StackFrame;
use crate::runtime::run_thread::{state::RunState, ThreadInfo}; use crate::runtime::run_thread::{state::RunState, ThreadInfo};
@ -63,7 +63,7 @@ impl OpTrait for BuiltinOp {
BuiltinOp::Jump(_name) => { BuiltinOp::Jump(_name) => {
panic!("jump not translated to skip by assembler!"); panic!("jump not translated to skip by assembler!");
} }
BuiltinOp::Call(name, args) => { BuiltinOp::Call { proc: name, args } => {
match program.find_routine(&name) { match program.find_routine(&name) {
Ok(pos) => { Ok(pos) => {
let mut values = Vec::with_capacity(args.len()); let mut values = Vec::with_capacity(args.len());
@ -72,8 +72,8 @@ impl OpTrait for BuiltinOp {
} }
let mut frame2 = StackFrame::new(pos, &values); let mut frame2 = StackFrame::new(pos, &values);
std::mem::swap(&mut state.frame, &mut frame2); std::mem::swap(&mut state.cr.frame, &mut frame2);
state.call_stack.push(frame2); state.cr.call_stack.push(frame2);
res.advance = 0; res.advance = 0;
} }
Err(e) => { Err(e) => {
@ -81,18 +81,40 @@ impl OpTrait for BuiltinOp {
} }
} }
} }
BuiltinOp::Spawn { handle, proc, args } => {
match program.find_routine(&proc) {
Ok(pos) => {
let mut values = Vec::with_capacity(args.len());
for arg in args {
values.push(state.read(arg)?);
}
let mut frame2 = StackFrame::new(pos, &values);
let handle_val = state.add_coroutine(frame2);
state.write(handle, handle_val)?;
// Yield execution so the new thread can start running
res.sched = SchedSignal::Yield(None);
}
Err(e) => {
return Err(e);
}
}
}
BuiltinOp::Ret(retvals) => { BuiltinOp::Ret(retvals) => {
match state.call_stack.pop() {
Some(previous) => {
let mut values = Vec::with_capacity(retvals.len()); let mut values = Vec::with_capacity(retvals.len());
for arg in retvals { for arg in retvals {
values.push(state.read(arg)?); values.push(state.read(arg)?);
} }
state.frame = previous;
state.frame.set_retvals(&values); match state.cr.call_stack.pop() {
Some(previous) => {
state.cr.frame = previous;
state.cr.frame.set_retvals(&values);
} }
None => { None => {
return Err(Fault::CallStackUnderflow); // Stack underflow - if this is a coroutine, it's finished
res.sched = SchedSignal::Ret(values);
} }
} }
} }
@ -196,15 +218,15 @@ impl OpTrait for BuiltinOp {
} }
} }
BuiltinOp::StoreFlags { dst } => { BuiltinOp::StoreFlags { dst } => {
let packed = state.frame.status.store(); let packed = state.cr.frame.status.store();
state.write(dst, packed)?; state.write(dst, packed)?;
} }
BuiltinOp::LoadFlags { src } => { BuiltinOp::LoadFlags { src } => {
let x = state.read(src)?; let x = state.read(src)?;
state.frame.status.load(x); state.cr.frame.status.load(x);
} }
BuiltinOp::Sleep { count: micros, unit_us } => { BuiltinOp::Sleep { count: micros, unit_us } => {
std::thread::sleep(Duration::from_micros(state.read(micros)? * unit_us.micros())) res.sched = SchedSignal::Sleep(Duration::from_micros(state.read(micros)? * unit_us.micros()));
} }
BuiltinOp::Delete(obj) => { BuiltinOp::Delete(obj) => {
let hv = obj.read(state)?; let hv = obj.read(state)?;
@ -221,6 +243,15 @@ impl OpTrait for BuiltinOp {
state.set_flag(Flag::Invalid, true); state.set_flag(Flag::Invalid, true);
} }
} }
BuiltinOp::Yield { value } => {
res.sched = SchedSignal::Yield(match value {
None => None,
Some(rd) => Some(state.read(rd)?)
});
}
BuiltinOp::Join(obj) => {
res.sched = SchedSignal::Join(obj.read(state)?);
}
} }
Ok(res) Ok(res)

@ -1,4 +1,4 @@
use sexp::{Atom, Sexp, SourcePosition, atom_qs}; use sexp::{Atom, Sexp, SourcePosition, atom_qs, atom_s};
use crate::asm::data::literal::{RoutineName}; use crate::asm::data::literal::{RoutineName};
use crate::asm::data::reg::parse_reg; use crate::asm::data::reg::parse_reg;
@ -25,21 +25,21 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok
BuiltinOp::Halt BuiltinOp::Halt
} }
"uslp" => { "uslp" | "usleep" => {
BuiltinOp::Sleep { BuiltinOp::Sleep {
count: args.next_rd()?, count: args.next_rd()?,
unit_us: SleepUnit::Usec, unit_us: SleepUnit::Usec,
} }
} }
"mslp" => { "mslp" | "msleep" => {
BuiltinOp::Sleep { BuiltinOp::Sleep {
count: args.next_rd()?, count: args.next_rd()?,
unit_us: SleepUnit::Msec, unit_us: SleepUnit::Msec,
} }
} }
"sslp" => { "sslp" | "ssleep" => {
BuiltinOp::Sleep { BuiltinOp::Sleep {
count: args.next_rd()?, count: args.next_rd()?,
unit_us: SleepUnit::Sec, unit_us: SleepUnit::Sec,
@ -132,7 +132,7 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok
for t in args { for t in args {
call_args.push(parse_rd(t, pcx)?); call_args.push(parse_rd(t, pcx)?);
} }
BuiltinOp::Call(dest, call_args) BuiltinOp::Call { proc: dest, args: call_args }
} }
"ret" => { "ret" => {
@ -258,6 +258,38 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok
} }
} }
"spawn" => {
let handle = args.next_wr()?;
let dest = RoutineName { name: args.next_string()?.0, arity: args.len() as u8 };
let mut call_args = vec![];
for t in args {
call_args.push(parse_rd(t, pcx)?);
}
BuiltinOp::Spawn {
handle,
proc: dest,
args: call_args
}
}
"join" => {
BuiltinOp::Join(args.next_rdobj()?)
}
"yield" => {
if args.have_more() {
BuiltinOp::Yield {
value: Some(args.next_rd()?),
}
} else {
BuiltinOp::Yield {
value: None,
}
}
}
"del" => { "del" => {
BuiltinOp::Delete(args.next_rdobj()?) BuiltinOp::Delete(args.next_rdobj()?)
} }
@ -326,7 +358,7 @@ pub(crate) fn to_sexp(op: &BuiltinOp) -> Sexp {
BuiltinOp::Jump(label) => sexp::list(&[A("j"), A(label)]), BuiltinOp::Jump(label) => sexp::list(&[A("j"), A(label)]),
BuiltinOp::FarLabel(label) => sexp::list(&[A("far"), A(label)]), BuiltinOp::FarLabel(label) => sexp::list(&[A("far"), A(label)]),
BuiltinOp::FarJump(label) => sexp::list(&[A("fj"), A(label)]), BuiltinOp::FarJump(label) => sexp::list(&[A("fj"), A(label)]),
BuiltinOp::Call(name, args) => { BuiltinOp::Call { proc: name, args } => {
if args.is_empty() { if args.is_empty() {
sexp::list(&[A("call"), A(&name.name)]) sexp::list(&[A("call"), A(&name.name)])
} else { } else {
@ -416,6 +448,22 @@ pub(crate) fn to_sexp(op: &BuiltinOp) -> Sexp {
} }
} }
} }
BuiltinOp::Spawn { handle, proc, args } => {
if args.is_empty() {
sexp::list(&[A("spawn"), A(handle), A(proc)])
} else {
let mut v = vec![A("spawn"), A(handle), A(proc)];
v.extend(args.iter().map(|r| A(r)));
sexp::list(&v)
}
}
BuiltinOp::Join(handle) => sexp::list(&[A("join"), A(handle)]),
BuiltinOp::Yield { value } => {
match value {
None => sexp::list(&[A("yield")]),
Some(rd) => sexp::list(&[A("yield"), A(rd)]),
}
}
} }
} }
@ -505,6 +553,11 @@ mod test {
("(far :label)", "(far :label)"), ("(far :label)", "(far :label)"),
("(del @r5)", "(del @r5)"), ("(del @r5)", "(del @r5)"),
("(sym cat r0)(del @cat)", "(del @r0)"), ("(sym cat r0)(del @cat)", "(del @r0)"),
("(spawn r0 foo 1 2 3)", "(spawn r0 foo 1 2 3)"),
("(yield)", "(yield)"),
("(yield -1)", "(yield -1)"),
("(yield r5)", "(yield r5)"),
("(join @r5)", "(join @r5)"),
]; ];
let parser = BuiltinOps::new(); let parser = BuiltinOps::new();

@ -1,3 +1,6 @@
use crate::asm::data::literal::Value;
use std::time::Duration;
/// Cycles spent executing an instruction /// Cycles spent executing an instruction
pub type CyclesSpent = usize; pub type CyclesSpent = usize;
@ -6,6 +9,7 @@ pub type CyclesSpent = usize;
pub struct EvalRes { pub struct EvalRes {
pub cycles: CyclesSpent, pub cycles: CyclesSpent,
pub advance: i64, pub advance: i64,
pub sched: SchedSignal,
} }
impl Default for EvalRes { impl Default for EvalRes {
@ -13,6 +17,24 @@ impl Default for EvalRes {
Self { Self {
cycles: 1, cycles: 1,
advance: 1, advance: 1,
sched: SchedSignal::Normal,
} }
} }
} }
/// Signal to the scheduler
#[derive(Debug)]
pub enum SchedSignal {
/// No signal, execution went normally.
Normal,
/// Yield control, optionally with a value.
/// If a value is yielded, it must be consumed through the handle before execution can resume.
Yield(Option<Value>),
/// The routine requests a delay in execution. The actual sleep time can be longer due to task
/// switching overhead.
Sleep(Duration),
/// Return from a coroutine - the thread ends and should be joined.
Ret(Vec<Value>),
/// Request to join a coroutine/thread
Join(Value),
}

@ -2,7 +2,7 @@
use std::fmt::Debug; use std::fmt::Debug;
pub use eval_res::EvalRes; pub use eval_res::*;
use sexp::{Sexp, SourcePosition}; use sexp::{Sexp, SourcePosition};
use crate::asm::data::literal::Value; use crate::asm::data::literal::Value;

@ -8,9 +8,9 @@ impl RunThread {
let state = &mut self.state; let state = &mut self.state;
let info = &self.info; let info = &self.info;
let op = info.program.fetch_instr(state.frame.pc); let op = info.program.fetch_instr(state.cr.frame.pc);
trace!("### {:04} : {:?}", state.frame.pc.0, op); trace!("### {:04} : {:?}", state.cr.frame.pc.0, op);
op.execute(info, state) op.execute(info, state)
} }

@ -26,6 +26,9 @@ pub enum Fault {
#[error("Program ended.")] #[error("Program ended.")]
Halt, Halt,
#[error("Instruction did not finish in time for context switch, retry later")]
Blocked,
#[error("Operation not allowed: {0}")] #[error("Operation not allowed: {0}")]
NotAllowed(Cow<'static, str>), NotAllowed(Cow<'static, str>),

@ -33,10 +33,14 @@ impl StackFrame {
sf sf
} }
#[inline(always)]
pub fn set_retvals(&mut self, vals: &[Value]) { pub fn set_retvals(&mut self, vals: &[Value]) {
for n in 0..(vals.len().min(REG_COUNT)) { for n in 0..(vals.len().min(REG_COUNT)) {
self.res[n] = vals[n]; self.res[n] = vals[n];
} }
// Zero the rest
for n in vals.len()..REG_COUNT {
self.res[n] = 0;
}
} }
} }

@ -1,16 +1,19 @@
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::time::{Duration}; use std::time::{Duration, Instant};
pub use info::ThreadInfo; pub use info::ThreadInfo;
pub use state::RunState; pub use state::RunState;
use crate::asm::data::literal::Addr; use crate::asm::data::literal::Addr;
use crate::module::{EvalRes, CrsnUniq}; use crate::module::{EvalRes, CrsnUniq, SchedSignal};
use crate::runtime::fault::Fault; use crate::runtime::fault::Fault;
use crate::runtime::frame::{StackFrame, REG_COUNT}; use crate::runtime::frame::{StackFrame, REG_COUNT};
use crate::runtime::program::Program; use crate::runtime::program::Program;
use crate::runtime::run_thread::state::{CoroutineContext, CoroutineState};
use std::{mem, thread};
use crate::asm::instr::cond::Flag;
#[derive(Clone, Copy, Eq, PartialEq, Debug, Ord, PartialOrd)] #[derive(Clone, Copy, Eq, PartialEq, Debug, Ord, PartialOrd)]
pub struct ThreadToken(pub u32); pub struct ThreadToken(pub u32);
@ -29,6 +32,7 @@ pub struct ThreadParams<'a> {
pub program: Arc<Program>, pub program: Arc<Program>,
pub pc: Addr, pub pc: Addr,
pub cycle_time: Duration, pub cycle_time: Duration,
pub scheduler_interval: Duration,
pub args: &'a [u64], pub args: &'a [u64],
} }
@ -41,15 +45,22 @@ impl RunThread {
uniq: params.uniq, uniq: params.uniq,
program: params.program, program: params.program,
cycle_time: params.cycle_time, cycle_time: params.cycle_time,
scheduler_interval: params.scheduler_interval,
extensions, extensions,
}); });
let rs = RunState { let rs = RunState {
thread_info: ti.clone(), thread_info: ti.clone(),
cr: CoroutineContext {
handle: 0, // this is the root
frame: StackFrame::new(params.pc, params.args), frame: StackFrame::new(params.pc, params.args),
call_stack: vec![], call_stack: vec![],
cr_state: Default::default(),
},
parked: Default::default(),
global_regs: [0; REG_COUNT], global_regs: [0; REG_COUNT],
ext_data: Default::default(), ext_data: Default::default(),
cr_deadline: None
}; };
Self { Self {
@ -72,14 +83,69 @@ impl RunThread {
loop_helper.loop_start(); loop_helper.loop_start();
'run: loop { 'run: loop {
let mut want_switch = false;
match self.eval_op() { match self.eval_op() {
Ok(EvalRes { cycles, advance }) => { Ok(EvalRes { cycles, advance, sched }) => {
for _ in 0..cycles { for _ in 0..cycles {
loop_helper.loop_sleep(); loop_helper.loop_sleep();
loop_helper.loop_start(); loop_helper.loop_start();
} }
trace!("Step {}; Status = {}", advance, self.state.frame.status); trace!("Step {}; Status = {}", advance, self.state.cr.frame.status);
self.state.frame.pc.advance(advance); self.state.cr.frame.pc.advance(advance);
if let Some(dl) = self.state.cr_deadline {
want_switch = dl <= Instant::now();
}
match sched {
SchedSignal::Normal => {}
SchedSignal::Yield(None) => {
self.state.cr.cr_state = CoroutineState::Ready;
want_switch = true;
}
SchedSignal::Yield(Some(value)) => {
self.state.cr.cr_state = CoroutineState::YieldedValue(value);
want_switch = true;
}
SchedSignal::Sleep(time) => {
self.state.cr.cr_state = CoroutineState::Sleep { due: Instant::now() + time };
want_switch = true;
}
SchedSignal::Ret(results) => {
self.state.cr.cr_state = CoroutineState::Finished(results);
want_switch = true;
// TODO prioritize a thread that is waiting for this return value
}
SchedSignal::Join(handle) => {
let mut found = false;
'find: for (n, th) in self.state.parked.iter_mut().enumerate() {
if th.handle == handle {
if let CoroutineState::Finished(_) = &th.cr_state {
let mut crs = mem::replace(&mut th.cr_state, CoroutineState::Ready);
self.state.parked.remove(n); // delete it
if let CoroutineState::Finished(vals) = crs {
self.state.cr.frame.set_retvals(&vals);
found = true;
break 'find;
} else {
unreachable!();
}
}
}
}
if !found {
self.state.set_flag(Flag::Invalid, true);
warn!("Join with invalid thread handle!");
self.state.cr.frame.set_retvals(&[]);
}
}
}
}
Err(Fault::Blocked) => {
// Do not advance, the last instruction will be re-tried
want_switch = true;
} }
Err(Fault::Halt) => { Err(Fault::Halt) => {
// TODO implement coordinated shutdown when more threads are running! // TODO implement coordinated shutdown when more threads are running!
@ -91,6 +157,68 @@ impl RunThread {
break 'run; break 'run;
} }
} }
if want_switch {
trace!("Switch requested");
let now = Instant::now();
let mut candidate = None;
let mut closest_due = None;
for _ in 0..self.state.parked.len() {
if let Some(mut rt) = self.state.parked.pop_front() {
match rt.cr_state {
CoroutineState::Ready => {
candidate = Some(rt);
break;
}
CoroutineState::Sleep { due } => {
if due <= now {
rt.cr_state = CoroutineState::Ready;
candidate = Some(rt);
break;
} else {
match closest_due {
Some(d) => {
if d > due {
closest_due = Some(due);
}
},
None => {
closest_due = Some(due);
}
}
self.state.parked.push_back(rt);
}
}
_ => {
self.state.parked.push_back(rt);
}
}
}
}
if let Some(cr) = candidate {
trace!("Context switch to {:?}", cr);
// Do switch
let old = mem::replace(&mut self.state.cr, cr);
self.state.parked.push_back(old);
self.state.cr_deadline = Some(now + self.info.scheduler_interval);
} else if let Some(due) = closest_due {
let time = due.saturating_duration_since(now);
trace!("No thread to switch to, sleep {:?}", time);
thread::sleep(time);
}
let n_alive = self.state.parked.iter()
.filter(|p| p.cr_state.is_alive()).count();
if n_alive <= 1 {
trace!("Stop task switching, no parked threads are alive");
self.state.cr_deadline = None;
}
}
} }
debug!("Thread ended."); debug!("Thread ended.");

@ -17,6 +17,8 @@ pub struct ThreadInfo {
pub program: Arc<Program>, pub program: Arc<Program>,
/// Program to run /// Program to run
pub(crate) cycle_time: Duration, pub(crate) cycle_time: Duration,
/// Interval one thread/coroutine is let to run before the context switches
pub(crate) scheduler_interval: Duration,
/// Extensions /// Extensions
pub extensions: Arc<Vec<Box<dyn CrsnExtension>>>, pub extensions: Arc<Vec<Box<dyn CrsnExtension>>>,
} }

@ -1,5 +1,5 @@
use std::any::{Any, TypeId}; use std::any::{Any, TypeId};
use std::collections::HashMap; use std::collections::{HashMap, VecDeque};
use crate::asm::data::{Rd, RdData, RdObj, Register, Wr, WrData}; use crate::asm::data::{Rd, RdData, RdObj, Register, Wr, WrData};
use crate::asm::data::literal::{Addr, Value}; use crate::asm::data::literal::{Addr, Value};
@ -12,34 +12,95 @@ use nudge::{likely};
use crate::asm::instr::cond::Flag; use crate::asm::instr::cond::Flag;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::fmt; use std::fmt;
use std::time::Instant;
pub struct RunState { pub struct RunState {
pub thread_info: Arc<ThreadInfo>, pub thread_info: Arc<ThreadInfo>,
/// Active stack frame /// The active coroutine
pub frame: StackFrame, pub cr: CoroutineContext,
/// Call stack /// Parked coroutines
pub call_stack: CallStack, pub parked: VecDeque<CoroutineContext>,
/// General purpose registers that stay valid for the entire run-time of the thread /// General purpose registers that stay valid for the entire run-time of the thread
pub global_regs: [Value; REG_COUNT], pub global_regs: [Value; REG_COUNT],
/// Extension data /// Extension data
pub ext_data: ExtensionDataStore, pub ext_data: ExtensionDataStore,
/// Execution deadline, if multi-tasked
pub cr_deadline: Option<Instant>,
}
#[derive(Debug,Default)]
pub struct CoroutineContext {
pub handle: Value,
/// Active stack frame
pub frame: StackFrame,
/// Call stack
pub call_stack: CallStack,
/// Coroutine run state
pub cr_state: CoroutineState,
}
/// Execution state of an inactive coroutine
#[derive(Debug, Eq, PartialEq)]
pub enum CoroutineState {
/// Ready to run, just started, or interrupted by the scheduler
Ready,
/// Delay in progress
Sleep { due: Instant },
/// The task finished
Finished(Vec<Value>),
/// The task yielded a value that must be consumed before it can resume.
/// State switches to Ready when the value is read.
YieldedValue(Value),
}
impl CoroutineState {
pub fn is_alive(&self) -> bool {
match self {
CoroutineState::Ready => true,
CoroutineState::Sleep { .. } => true,
CoroutineState::Finished(_) => false,
CoroutineState::YieldedValue(_) => true,
}
}
}
impl Default for CoroutineState {
fn default() -> Self {
Self::Ready
}
} }
impl RunState { impl RunState {
/// Clear everything. Caution: when done at runtime, this effectively reboots the thread /// Clear everything. Caution: when done at runtime, this effectively reboots the thread
pub fn clear_all(&mut self) { pub fn clear_all(&mut self) {
self.frame = Default::default(); self.cr = Default::default();
self.call_stack = Default::default(); self.parked = Default::default();
self.global_regs = Default::default(); self.global_regs = Default::default();
self.ext_data = Default::default(); self.ext_data = Default::default();
} }
/// Add a coroutine, marked as Ready, to run next
pub fn add_coroutine(&mut self, frame : StackFrame) -> Value {
let handle = self.thread_info.unique_handle();
self.parked.push_back(CoroutineContext {
handle,
frame,
call_stack: vec![],
cr_state: Default::default()
});
if self.cr_deadline.is_none() {
// start context switching
self.cr_deadline = Some(Instant::now() + self.thread_info.scheduler_interval);
}
handle
}
} }
impl Debug for RunState { impl Debug for RunState {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("RunState") f.debug_struct("RunState")
.field("frame", &self.frame) .field("cr", &self.cr)
.field("call_stack", &self.call_stack) .field("parked", &self.parked)
.field("global_regs", &self.global_regs) .field("global_regs", &self.global_regs)
//.field("ext_data", &self.ext_data) //.field("ext_data", &self.ext_data)
.finish() .finish()
@ -79,44 +140,44 @@ impl RunState {
#[inline(always)] #[inline(always)]
pub fn set_flag(&mut self, flag: Flag, set: bool) { pub fn set_flag(&mut self, flag: Flag, set: bool) {
trace!("Flag {} = {:?}", flag, set); trace!("Flag {} = {:?}", flag, set);
self.frame.status.set(flag, set); self.cr.frame.status.set(flag, set);
} }
/// Check status flags for a condition /// Check status flags for a condition
#[inline(always)] #[inline(always)]
pub fn test_cond(&self, cond: Cond) -> bool { pub fn test_cond(&self, cond: Cond) -> bool {
self.frame.status.test(cond) self.cr.frame.status.test(cond)
} }
/// Set program counter - address of the next instruction to run /// Set program counter - address of the next instruction to run
pub fn set_pc(&mut self, pc: Addr) { pub fn set_pc(&mut self, pc: Addr) {
trace!("PC := {}", pc); trace!("PC := {}", pc);
self.frame.pc = pc; self.cr.frame.pc = pc;
} }
/// Get program counter - address of the next instruction to run /// Get program counter - address of the next instruction to run
pub fn get_pc(&self) -> Addr { pub fn get_pc(&self) -> Addr {
self.frame.pc self.cr.frame.pc
} }
/// Clear status flags /// Clear status flags
#[inline(always)] #[inline(always)]
pub fn clear_status(&mut self) { pub fn clear_status(&mut self) {
self.frame.status.clear(); self.cr.frame.status.clear();
} }
/// Update status flags using a variable. /// Update status flags using a variable.
/// The update is additive - call `clear_status()` first if desired! /// The update is additive - call `clear_status()` first if desired!
#[inline(always)] #[inline(always)]
pub fn update_status(&mut self, val: Value) { pub fn update_status(&mut self, val: Value) {
self.frame.status.update(val); self.cr.frame.status.update(val);
} }
/// Update status flags using a variable. /// Update status flags using a variable.
/// The update is additive - call `clear_status()` first if desired! /// The update is additive - call `clear_status()` first if desired!
#[inline(always)] #[inline(always)]
pub fn update_status_float(&mut self, val: f64) { pub fn update_status_float(&mut self, val: f64) {
self.frame.status.update_float(val); self.cr.frame.status.update_float(val);
} }
/// Read object handle value /// Read object handle value
@ -133,8 +194,8 @@ impl RunState {
RdData::Immediate(v) => Ok(v), RdData::Immediate(v) => Ok(v),
RdData::Register(Register::Gen(rn)) => { RdData::Register(Register::Gen(rn)) => {
if likely(rn < REG_COUNT as u8) { if likely(rn < REG_COUNT as u8) {
trace!("Rd {:?} = {}", rd, self.frame.gen[rn as usize]); trace!("Rd {:?} = {}", rd, self.cr.frame.gen[rn as usize]);
Ok(self.frame.gen[rn as usize]) Ok(self.cr.frame.gen[rn as usize])
} else { } else {
Err(Fault::RegisterNotExist { reg: Register::Gen(rn) }) Err(Fault::RegisterNotExist { reg: Register::Gen(rn) })
} }
@ -149,16 +210,16 @@ impl RunState {
} }
RdData::Register(Register::Arg(rn)) => { RdData::Register(Register::Arg(rn)) => {
if likely(rn < REG_COUNT as u8) { if likely(rn < REG_COUNT as u8) {
trace!("Rd {:?} = {}", rd, self.frame.arg[rn as usize]); trace!("Rd {:?} = {}", rd, self.cr.frame.arg[rn as usize]);
Ok(self.frame.arg[rn as usize]) Ok(self.cr.frame.arg[rn as usize])
} else { } else {
Err(Fault::RegisterNotExist { reg: Register::Arg(rn) }) Err(Fault::RegisterNotExist { reg: Register::Arg(rn) })
} }
} }
RdData::Register(Register::Res(rn)) => { RdData::Register(Register::Res(rn)) => {
if likely(rn < REG_COUNT as u8) { if likely(rn < REG_COUNT as u8) {
trace!("Rd {:?} = {}", rd, self.frame.res[rn as usize]); trace!("Rd {:?} = {}", rd, self.cr.frame.res[rn as usize]);
Ok(self.frame.res[rn as usize]) Ok(self.cr.frame.res[rn as usize])
} else { } else {
Err(Fault::RegisterNotExist { reg: Register::Res(rn) }) // TODO use match after @ when stabilized https://github.com/rust-lang/rust/issues/65490 Err(Fault::RegisterNotExist { reg: Register::Res(rn) }) // TODO use match after @ when stabilized https://github.com/rust-lang/rust/issues/65490
} }
@ -174,6 +235,26 @@ impl RunState {
} }
fn read_object(&mut self, reference: Value) -> Result<Value, Fault> { fn read_object(&mut self, reference: Value) -> Result<Value, Fault> {
// values yielded from coroutines
for cr in &mut self.parked {
if cr.handle == reference {
match cr.cr_state {
CoroutineState::Ready | CoroutineState::Sleep { .. } => {
return Err(Fault::Blocked);
}
CoroutineState::Finished(_) => {
self.set_flag(Flag::Eof, true);
warn!("Attempt to read yielded value of a finished coroutine");
return Ok(0);
}
CoroutineState::YieldedValue(v) => {
cr.cr_state = CoroutineState::Ready;
return Ok(v);
}
}
}
}
// This is a shitty dirty hack to allow iterating over the extensions while passing a mutable reference // This is a shitty dirty hack to allow iterating over the extensions while passing a mutable reference
// to self to the reading methods. Since the extensions array is in an Arc, it can't be mutated internally // to self to the reading methods. Since the extensions array is in an Arc, it can't be mutated internally
// anyway, and we know it will still live after the method returns - unless someone does something incredibly stupid. // anyway, and we know it will still live after the method returns - unless someone does something incredibly stupid.
@ -213,7 +294,7 @@ impl RunState {
match wr.0 { match wr.0 {
WrData::Register(Register::Gen(rn)) => { WrData::Register(Register::Gen(rn)) => {
if likely(rn < REG_COUNT as u8) { if likely(rn < REG_COUNT as u8) {
self.frame.gen[rn as usize] = val; self.cr.frame.gen[rn as usize] = val;
Ok(()) Ok(())
} else { } else {
Err(Fault::RegisterNotExist { reg: Register::Gen(rn) }) Err(Fault::RegisterNotExist { reg: Register::Gen(rn) })

@ -38,6 +38,8 @@ struct Config {
assemble_debug: bool, assemble_debug: bool,
#[serde(with = "serde_duration_millis")] #[serde(with = "serde_duration_millis")]
cycle_time: Duration, cycle_time: Duration,
#[serde(with = "serde_duration_millis")]
switch_time: Duration,
} }
impl Default for Config { impl Default for Config {
@ -51,6 +53,7 @@ impl Default for Config {
assemble_only: false, assemble_only: false,
assemble_debug: false, assemble_debug: false,
cycle_time: Duration::default(), cycle_time: Duration::default(),
switch_time: Duration::from_millis(10),
} }
} }
} }
@ -177,6 +180,7 @@ fn main() -> anyhow::Result<()> {
program: parsed, program: parsed,
pc: Addr(0), pc: Addr(0),
cycle_time: config.cycle_time, cycle_time: config.cycle_time,
scheduler_interval: config.switch_time,
args args
}); });

Loading…
Cancel
Save