Merge branch 'coroutines'

master
Ondřej Hruška 4 years ago
commit 91939f520b
Signed by untrusted user: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 70
      README.md
  2. 3
      crsn/src/asm/instr/op.rs
  3. 8
      crsn/src/asm/mod.rs
  4. 2
      crsn/src/asm/parse/parse_data.rs
  5. 9
      crsn/src/builtin/defs.rs
  6. 61
      crsn/src/builtin/exec.rs
  7. 65
      crsn/src/builtin/parse.rs
  8. 22
      crsn/src/module/eval_res.rs
  9. 2
      crsn/src/module/mod.rs
  10. 4
      crsn/src/runtime/exec.rs
  11. 3
      crsn/src/runtime/fault.rs
  12. 6
      crsn/src/runtime/frame.rs
  13. 152
      crsn/src/runtime/run_thread.rs
  14. 2
      crsn/src/runtime/run_thread/info.rs
  15. 130
      crsn/src/runtime/run_thread/state.rs
  16. 38
      examples/coroutines1.csn
  17. 44
      examples/coroutines2.csn
  18. 17
      examples/itoa.csn
  19. 4
      launcher/src/main.rs

@ -287,6 +287,76 @@ It can also be written like this:
) )
``` ```
## Coroutines
Croissant implements something that could be called "pre-emptive coroutines". They do not provide any performance gain,
but add asynchronicity to the program, and can work as generators!
There is no true parallelism, it is difficult to implement safely and efficiently with a global state.
### Spawning
*Any procedure can be used as a coroutine.*
A coroutine is created using the `spawn` instruction, which produces an object handle.
```
(spawn r0 do_stuff 1 2 3)
```
At this point, the program is evenly divided between the original and the coroutine "thread".
The spawned coroutine is scheduled to run immediately after being spawned.
### Task switching
Coroutines take turns to execute the program. The scheduling interval can be configured.
Control can be given up using the `yield` instruction; for example, when waiting for a mutex. This happens automatically when
a `sleep` instruction is invoked.
### Race conditions
Take care when working with objects, resources and global registers: you can get race conditions
with coroutines. Use atomic instructions (`cas`, `casXX`, `bfcas`…) to implement synchronization.
The `casXX` instruction is very powerful: you can use one bit of a register as a mutex and the rest of it to store some useful data.
You can also use one register for up to 64 mutexes.
Remember to only use global registers (or buffer items) as mutexes: `g0`-`g15`. Each coroutine has its own set of *regular* registers.
### Using coroutines as generators
A coroutine can "yield a value" by invoking the `yield` instruction with an operand. This can be done any number of times.
```
(yield r0)
```
The coroutine is blocked until the value is consumed by someone. To consume a yielded value, read the coroutine object handle:
```
(spawn r5 foo)
(ld r0 @r5) ; read a yielded value
```
### Joining a coroutine
Use the `join` instruction with a coroutine object handle to wait for its completion.
A coroutine completes by calling `ret` at its top level. This naturally means that a coroutine can return values!
The returned values are placed in the result registers, just like with the `call` instruction.
```
(spawn r5 foo)
; ...
(join @r0)
; res0-res15 now contain return values
```
# Instruction Set # Instruction Set
Crsn instruction set is composed of extensions. Crsn instruction set is composed of extensions.

@ -4,7 +4,7 @@ use sexp::{Atom, Sexp, SourcePosition};
use crate::asm::instr::Cond; use crate::asm::instr::Cond;
use crate::builtin::defs::BuiltinOp; use crate::builtin::defs::BuiltinOp;
use crate::module::{EvalRes, OpTrait}; use crate::module::{EvalRes, OpTrait, SchedSignal};
use crate::runtime::fault::Fault; use crate::runtime::fault::Fault;
use crate::runtime::run_thread::{info::ThreadInfo, state::RunState}; use crate::runtime::run_thread::{info::ThreadInfo, state::RunState};
@ -30,6 +30,7 @@ impl OpTrait for Op {
return Ok(EvalRes { return Ok(EvalRes {
cycles: 0, cycles: 0,
advance: 1, advance: 1,
sched: SchedSignal::Normal
}); });
} }
} }

@ -42,6 +42,7 @@ pub fn assemble(source: &str, uniq : &CrsnUniq, mut parsers: Vec<Box<dyn CrsnExt
uniq: Default::default(), uniq: Default::default(),
program: Program::new(vec![], parsers_arc.clone()).unwrap(), program: Program::new(vec![], parsers_arc.clone()).unwrap(),
cycle_time: Default::default(), cycle_time: Default::default(),
scheduler_interval: Default::default(),
extensions: parsers_arc.clone(), extensions: parsers_arc.clone(),
}); });
@ -57,10 +58,11 @@ pub fn assemble(source: &str, uniq : &CrsnUniq, mut parsers: Vec<Box<dyn CrsnExt
// This allows to evaluate nearly all instructions at compile time. // This allows to evaluate nearly all instructions at compile time.
const_eval: RunState { const_eval: RunState {
thread_info: ti.clone(), thread_info: ti.clone(),
frame: Default::default(), cr: Default::default(),
call_stack: vec![], parked: Default::default(),
global_regs: [0; REG_COUNT], global_regs: [0; REG_COUNT],
ext_data: Default::default() ext_data: Default::default(),
cr_deadline: None
}, },
const_eval_ti: ti, const_eval_ti: ti,
parsing_expr: false parsing_expr: false

@ -139,7 +139,7 @@ pub fn parse_data_disp(tok: Sexp, pcx: &ParserContext) -> Result<DataDisp, CrsnE
state_mut.parsing_expr = false; state_mut.parsing_expr = false;
} }
return Ok(DataDisp::Immediate(state_mut.const_eval.frame.gen[0])); return Ok(DataDisp::Immediate(state_mut.const_eval.cr.frame.gen[0]));
} }
Err(CrsnError::Parse("List not expected here".into(), pos)) Err(CrsnError::Parse("List not expected here".into(), pos))

@ -136,7 +136,9 @@ pub enum BuiltinOp {
FarJump(Label), FarJump(Label),
/// Call a routine with arguments. /// Call a routine with arguments.
/// The arguments are passed as argX. Return values are stored in resX registers. /// The arguments are passed as argX. Return values are stored in resX registers.
Call(RoutineName, Vec<Rd>), Call { proc: RoutineName, args: Vec<Rd> },
/// Spawn a coroutine. The invocation is similar to (call).
Spawn { handle: Wr, proc: RoutineName, args: Vec<Rd> },
/// Exit the current routine with return values /// Exit the current routine with return values
Ret(Vec<Rd>), Ret(Vec<Rd>),
/// Mark a routine entry point (call target). /// Mark a routine entry point (call target).
@ -144,6 +146,11 @@ pub enum BuiltinOp {
Routine(RoutineName), Routine(RoutineName),
/// Skip backward or forward. The skip count can be defined by an argument. /// Skip backward or forward. The skip count can be defined by an argument.
Skip(Rd), Skip(Rd),
/// Join a coroutine
Join(RdObj),
/// Yield control, optionally yielding a value that must be consumed (by reading the task handle)
/// before execution can resume
Yield { value: Option<Rd> },
/// Deny jumps, skips and run across this address, producing a run-time fault. /// Deny jumps, skips and run across this address, producing a run-time fault.
Barrier { Barrier {
kind: Barrier, kind: Barrier,

@ -4,7 +4,7 @@ use sexp::Sexp;
use crate::asm::data::literal::{Addr, Value}; use crate::asm::data::literal::{Addr, Value};
use crate::builtin::defs::{Barrier, BuiltinOp, LdsValue}; use crate::builtin::defs::{Barrier, BuiltinOp, LdsValue};
use crate::module::{EvalRes, OpTrait}; use crate::module::{EvalRes, OpTrait, SchedSignal};
use crate::runtime::fault::Fault; use crate::runtime::fault::Fault;
use crate::runtime::frame::StackFrame; use crate::runtime::frame::StackFrame;
use crate::runtime::run_thread::{state::RunState, ThreadInfo}; use crate::runtime::run_thread::{state::RunState, ThreadInfo};
@ -63,7 +63,7 @@ impl OpTrait for BuiltinOp {
BuiltinOp::Jump(_name) => { BuiltinOp::Jump(_name) => {
panic!("jump not translated to skip by assembler!"); panic!("jump not translated to skip by assembler!");
} }
BuiltinOp::Call(name, args) => { BuiltinOp::Call { proc: name, args } => {
match program.find_routine(&name) { match program.find_routine(&name) {
Ok(pos) => { Ok(pos) => {
let mut values = Vec::with_capacity(args.len()); let mut values = Vec::with_capacity(args.len());
@ -72,8 +72,8 @@ impl OpTrait for BuiltinOp {
} }
let mut frame2 = StackFrame::new(pos, &values); let mut frame2 = StackFrame::new(pos, &values);
std::mem::swap(&mut state.frame, &mut frame2); std::mem::swap(&mut state.cr.frame, &mut frame2);
state.call_stack.push(frame2); state.cr.call_stack.push(frame2);
res.advance = 0; res.advance = 0;
} }
Err(e) => { Err(e) => {
@ -81,18 +81,40 @@ impl OpTrait for BuiltinOp {
} }
} }
} }
BuiltinOp::Ret(retvals) => { BuiltinOp::Spawn { handle, proc, args } => {
match state.call_stack.pop() { match program.find_routine(&proc) {
Some(previous) => { Ok(pos) => {
let mut values = Vec::with_capacity(retvals.len()); let mut values = Vec::with_capacity(args.len());
for arg in retvals { for arg in args {
values.push(state.read(arg)?); values.push(state.read(arg)?);
} }
state.frame = previous;
state.frame.set_retvals(&values); let mut frame2 = StackFrame::new(pos, &values);
let handle_val = state.add_coroutine(frame2);
state.write(handle, handle_val)?;
// Yield execution so the new thread can start running
res.sched = SchedSignal::Yield(None);
}
Err(e) => {
return Err(e);
}
}
}
BuiltinOp::Ret(retvals) => {
let mut values = Vec::with_capacity(retvals.len());
for arg in retvals {
values.push(state.read(arg)?);
}
match state.cr.call_stack.pop() {
Some(previous) => {
state.cr.frame = previous;
state.cr.frame.set_retvals(&values);
} }
None => { None => {
return Err(Fault::CallStackUnderflow); // Stack underflow - if this is a coroutine, it's finished
res.sched = SchedSignal::Ret(values);
} }
} }
} }
@ -196,15 +218,15 @@ impl OpTrait for BuiltinOp {
} }
} }
BuiltinOp::StoreFlags { dst } => { BuiltinOp::StoreFlags { dst } => {
let packed = state.frame.status.store(); let packed = state.cr.frame.status.store();
state.write(dst, packed)?; state.write(dst, packed)?;
} }
BuiltinOp::LoadFlags { src } => { BuiltinOp::LoadFlags { src } => {
let x = state.read(src)?; let x = state.read(src)?;
state.frame.status.load(x); state.cr.frame.status.load(x);
} }
BuiltinOp::Sleep { count: micros, unit_us } => { BuiltinOp::Sleep { count: micros, unit_us } => {
std::thread::sleep(Duration::from_micros(state.read(micros)? * unit_us.micros())) res.sched = SchedSignal::Sleep(Duration::from_micros(state.read(micros)? * unit_us.micros()));
} }
BuiltinOp::Delete(obj) => { BuiltinOp::Delete(obj) => {
let hv = obj.read(state)?; let hv = obj.read(state)?;
@ -221,6 +243,15 @@ impl OpTrait for BuiltinOp {
state.set_flag(Flag::Invalid, true); state.set_flag(Flag::Invalid, true);
} }
} }
BuiltinOp::Yield { value } => {
res.sched = SchedSignal::Yield(match value {
None => None,
Some(rd) => Some(state.read(rd)?)
});
}
BuiltinOp::Join(obj) => {
res.sched = SchedSignal::Join(obj.read(state)?);
}
} }
Ok(res) Ok(res)

@ -1,4 +1,4 @@
use sexp::{Atom, Sexp, SourcePosition, atom_qs}; use sexp::{Atom, Sexp, SourcePosition, atom_qs, atom_s};
use crate::asm::data::literal::{RoutineName}; use crate::asm::data::literal::{RoutineName};
use crate::asm::data::reg::parse_reg; use crate::asm::data::reg::parse_reg;
@ -25,21 +25,21 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok
BuiltinOp::Halt BuiltinOp::Halt
} }
"uslp" => { "uslp" | "usleep" => {
BuiltinOp::Sleep { BuiltinOp::Sleep {
count: args.next_rd()?, count: args.next_rd()?,
unit_us: SleepUnit::Usec, unit_us: SleepUnit::Usec,
} }
} }
"mslp" => { "mslp" | "msleep" => {
BuiltinOp::Sleep { BuiltinOp::Sleep {
count: args.next_rd()?, count: args.next_rd()?,
unit_us: SleepUnit::Msec, unit_us: SleepUnit::Msec,
} }
} }
"sslp" => { "sslp" | "ssleep" => {
BuiltinOp::Sleep { BuiltinOp::Sleep {
count: args.next_rd()?, count: args.next_rd()?,
unit_us: SleepUnit::Sec, unit_us: SleepUnit::Sec,
@ -132,7 +132,7 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok
for t in args { for t in args {
call_args.push(parse_rd(t, pcx)?); call_args.push(parse_rd(t, pcx)?);
} }
BuiltinOp::Call(dest, call_args) BuiltinOp::Call { proc: dest, args: call_args }
} }
"ret" => { "ret" => {
@ -258,6 +258,38 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok
} }
} }
"spawn" => {
let handle = args.next_wr()?;
let dest = RoutineName { name: args.next_string()?.0, arity: args.len() as u8 };
let mut call_args = vec![];
for t in args {
call_args.push(parse_rd(t, pcx)?);
}
BuiltinOp::Spawn {
handle,
proc: dest,
args: call_args
}
}
"join" => {
BuiltinOp::Join(args.next_rdobj()?)
}
"yield" => {
if args.have_more() {
BuiltinOp::Yield {
value: Some(args.next_rd()?),
}
} else {
BuiltinOp::Yield {
value: None,
}
}
}
"del" => { "del" => {
BuiltinOp::Delete(args.next_rdobj()?) BuiltinOp::Delete(args.next_rdobj()?)
} }
@ -326,7 +358,7 @@ pub(crate) fn to_sexp(op: &BuiltinOp) -> Sexp {
BuiltinOp::Jump(label) => sexp::list(&[A("j"), A(label)]), BuiltinOp::Jump(label) => sexp::list(&[A("j"), A(label)]),
BuiltinOp::FarLabel(label) => sexp::list(&[A("far"), A(label)]), BuiltinOp::FarLabel(label) => sexp::list(&[A("far"), A(label)]),
BuiltinOp::FarJump(label) => sexp::list(&[A("fj"), A(label)]), BuiltinOp::FarJump(label) => sexp::list(&[A("fj"), A(label)]),
BuiltinOp::Call(name, args) => { BuiltinOp::Call { proc: name, args } => {
if args.is_empty() { if args.is_empty() {
sexp::list(&[A("call"), A(&name.name)]) sexp::list(&[A("call"), A(&name.name)])
} else { } else {
@ -416,6 +448,22 @@ pub(crate) fn to_sexp(op: &BuiltinOp) -> Sexp {
} }
} }
} }
BuiltinOp::Spawn { handle, proc, args } => {
if args.is_empty() {
sexp::list(&[A("spawn"), A(handle), A(proc)])
} else {
let mut v = vec![A("spawn"), A(handle), A(proc)];
v.extend(args.iter().map(|r| A(r)));
sexp::list(&v)
}
}
BuiltinOp::Join(handle) => sexp::list(&[A("join"), A(handle)]),
BuiltinOp::Yield { value } => {
match value {
None => sexp::list(&[A("yield")]),
Some(rd) => sexp::list(&[A("yield"), A(rd)]),
}
}
} }
} }
@ -505,6 +553,11 @@ mod test {
("(far :label)", "(far :label)"), ("(far :label)", "(far :label)"),
("(del @r5)", "(del @r5)"), ("(del @r5)", "(del @r5)"),
("(sym cat r0)(del @cat)", "(del @r0)"), ("(sym cat r0)(del @cat)", "(del @r0)"),
("(spawn r0 foo 1 2 3)", "(spawn r0 foo 1 2 3)"),
("(yield)", "(yield)"),
("(yield -1)", "(yield -1)"),
("(yield r5)", "(yield r5)"),
("(join @r5)", "(join @r5)"),
]; ];
let parser = BuiltinOps::new(); let parser = BuiltinOps::new();

@ -1,3 +1,6 @@
use crate::asm::data::literal::Value;
use std::time::Duration;
/// Cycles spent executing an instruction /// Cycles spent executing an instruction
pub type CyclesSpent = usize; pub type CyclesSpent = usize;
@ -6,6 +9,7 @@ pub type CyclesSpent = usize;
pub struct EvalRes { pub struct EvalRes {
pub cycles: CyclesSpent, pub cycles: CyclesSpent,
pub advance: i64, pub advance: i64,
pub sched: SchedSignal,
} }
impl Default for EvalRes { impl Default for EvalRes {
@ -13,6 +17,24 @@ impl Default for EvalRes {
Self { Self {
cycles: 1, cycles: 1,
advance: 1, advance: 1,
sched: SchedSignal::Normal,
} }
} }
} }
/// Signal to the scheduler
#[derive(Debug)]
pub enum SchedSignal {
/// No signal, execution went normally.
Normal,
/// Yield control, optionally with a value.
/// If a value is yielded, it must be consumed through the handle before execution can resume.
Yield(Option<Value>),
/// The routine requests a delay in execution. The actual sleep time can be longer due to task
/// switching overhead.
Sleep(Duration),
/// Return from a coroutine - the thread ends and should be joined.
Ret(Vec<Value>),
/// Request to join a coroutine/thread
Join(Value),
}

@ -2,7 +2,7 @@
use std::fmt::Debug; use std::fmt::Debug;
pub use eval_res::EvalRes; pub use eval_res::*;
use sexp::{Sexp, SourcePosition}; use sexp::{Sexp, SourcePosition};
use crate::asm::data::literal::Value; use crate::asm::data::literal::Value;

@ -8,9 +8,9 @@ impl RunThread {
let state = &mut self.state; let state = &mut self.state;
let info = &self.info; let info = &self.info;
let op = info.program.fetch_instr(state.frame.pc); let op = info.program.fetch_instr(state.cr.frame.pc);
trace!("### {:04} : {:?}", state.frame.pc.0, op); trace!("### {:04} : {:?}", state.cr.frame.pc.0, op);
op.execute(info, state) op.execute(info, state)
} }

@ -26,6 +26,9 @@ pub enum Fault {
#[error("Program ended.")] #[error("Program ended.")]
Halt, Halt,
#[error("Instruction did not finish in time for context switch, retry later")]
Blocked,
#[error("Operation not allowed: {0}")] #[error("Operation not allowed: {0}")]
NotAllowed(Cow<'static, str>), NotAllowed(Cow<'static, str>),

@ -33,10 +33,14 @@ impl StackFrame {
sf sf
} }
#[inline(always)]
pub fn set_retvals(&mut self, vals: &[Value]) { pub fn set_retvals(&mut self, vals: &[Value]) {
for n in 0..(vals.len().min(REG_COUNT)) { for n in 0..(vals.len().min(REG_COUNT)) {
self.res[n] = vals[n]; self.res[n] = vals[n];
} }
// Zero the rest
for n in vals.len()..REG_COUNT {
self.res[n] = 0;
}
} }
} }

@ -1,16 +1,19 @@
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::time::{Duration}; use std::time::{Duration, Instant};
pub use info::ThreadInfo; pub use info::ThreadInfo;
pub use state::RunState; pub use state::RunState;
use crate::asm::data::literal::Addr; use crate::asm::data::literal::Addr;
use crate::module::{EvalRes, CrsnUniq}; use crate::module::{EvalRes, CrsnUniq, SchedSignal};
use crate::runtime::fault::Fault; use crate::runtime::fault::Fault;
use crate::runtime::frame::{StackFrame, REG_COUNT}; use crate::runtime::frame::{StackFrame, REG_COUNT};
use crate::runtime::program::Program; use crate::runtime::program::Program;
use crate::runtime::run_thread::state::{CoroutineContext, CoroutineState};
use std::{mem, thread};
use crate::asm::instr::cond::Flag;
#[derive(Clone, Copy, Eq, PartialEq, Debug, Ord, PartialOrd)] #[derive(Clone, Copy, Eq, PartialEq, Debug, Ord, PartialOrd)]
pub struct ThreadToken(pub u32); pub struct ThreadToken(pub u32);
@ -29,6 +32,7 @@ pub struct ThreadParams<'a> {
pub program: Arc<Program>, pub program: Arc<Program>,
pub pc: Addr, pub pc: Addr,
pub cycle_time: Duration, pub cycle_time: Duration,
pub scheduler_interval: Duration,
pub args: &'a [u64], pub args: &'a [u64],
} }
@ -41,15 +45,22 @@ impl RunThread {
uniq: params.uniq, uniq: params.uniq,
program: params.program, program: params.program,
cycle_time: params.cycle_time, cycle_time: params.cycle_time,
scheduler_interval: params.scheduler_interval,
extensions, extensions,
}); });
let rs = RunState { let rs = RunState {
thread_info: ti.clone(), thread_info: ti.clone(),
frame: StackFrame::new(params.pc, params.args), cr: CoroutineContext {
call_stack: vec![], handle: 0, // this is the root
frame: StackFrame::new(params.pc, params.args),
call_stack: vec![],
cr_state: Default::default(),
},
parked: Default::default(),
global_regs: [0; REG_COUNT], global_regs: [0; REG_COUNT],
ext_data: Default::default(), ext_data: Default::default(),
cr_deadline: None
}; };
Self { Self {
@ -72,14 +83,79 @@ impl RunThread {
loop_helper.loop_start(); loop_helper.loop_start();
'run: loop { 'run: loop {
let mut want_switch = false;
match self.eval_op() { match self.eval_op() {
Ok(EvalRes { cycles, advance }) => { Ok(EvalRes { cycles, advance, sched }) => {
for _ in 0..cycles { for _ in 0..cycles {
loop_helper.loop_sleep(); loop_helper.loop_sleep();
loop_helper.loop_start(); loop_helper.loop_start();
} }
trace!("Step {}; Status = {}", advance, self.state.frame.status); trace!("Step {}; Status = {}", advance, self.state.cr.frame.status);
self.state.frame.pc.advance(advance); let orig_pc = self.state.cr.frame.pc;
self.state.cr.frame.pc.advance(advance);
if let Some(dl) = self.state.cr_deadline {
want_switch = dl <= Instant::now();
}
match sched {
SchedSignal::Normal => {}
SchedSignal::Yield(None) => {
trace!("yield");
self.state.cr.cr_state = CoroutineState::Ready;
want_switch = true;
}
SchedSignal::Yield(Some(value)) => {
trace!("yield {}", value);
self.state.cr.cr_state = CoroutineState::YieldedValue(value);
want_switch = true;
}
SchedSignal::Sleep(time) => {
trace!("sleep {:?}", time);
self.state.cr.cr_state = CoroutineState::Sleep { due: Instant::now() + time };
want_switch = true;
}
SchedSignal::Ret(results) => {
self.state.cr.cr_state = CoroutineState::Finished(results);
want_switch = true;
// TODO prioritize a thread that is waiting for this return value
}
SchedSignal::Join(handle) => {
trace!("Join cr {:#}", handle);
let mut found = false;
'find: for (n, th) in self.state.parked.iter_mut().enumerate() {
if th.handle == handle {
if let CoroutineState::Finished(_) = &th.cr_state {
let mut crs = mem::replace(&mut th.cr_state, CoroutineState::Ready);
self.state.parked.remove(n); // delete it
found = true;
if let CoroutineState::Finished(vals) = crs {
self.state.cr.frame.set_retvals(&vals);
break 'find;
} else {
unreachable!();
}
} else {
self.state.cr.frame.pc = orig_pc; // Retry
self.state.cr.cr_state = CoroutineState::Ready;
want_switch = true;
found = true;
}
}
}
if !found {
self.state.set_flag(Flag::Invalid, true);
warn!("Join with invalid thread handle {:#}!", handle);
self.state.cr.frame.set_retvals(&[]);
}
}
}
}
Err(Fault::Blocked) => {
// Do not advance, the last instruction will be re-tried
want_switch = true;
} }
Err(Fault::Halt) => { Err(Fault::Halt) => {
// TODO implement coordinated shutdown when more threads are running! // TODO implement coordinated shutdown when more threads are running!
@ -91,6 +167,68 @@ impl RunThread {
break 'run; break 'run;
} }
} }
if want_switch {
trace!("Switch requested");
let now = Instant::now();
let mut candidate = None;
let mut closest_due = None;
for _ in 0..self.state.parked.len() {
if let Some(mut rt) = self.state.parked.pop_front() {
match rt.cr_state {
CoroutineState::Ready => {
candidate = Some(rt);
break;
}
CoroutineState::Sleep { due } => {
if due <= now {
rt.cr_state = CoroutineState::Ready;
candidate = Some(rt);
break;
} else {
match closest_due {
Some(d) => {
if d > due {
closest_due = Some(due);
}
},
None => {
closest_due = Some(due);
}
}
self.state.parked.push_back(rt);
}
}
_ => {
self.state.parked.push_back(rt);
}
}
}
}
if let Some(cr) = candidate {
trace!("Context switch to {:?}", cr);
// Do switch
let old = mem::replace(&mut self.state.cr, cr);
self.state.parked.push_back(old);
self.state.cr_deadline = Some(now + self.info.scheduler_interval);
} else if let Some(due) = closest_due {
let time = due.saturating_duration_since(now);
trace!("No thread to switch to, sleep {:?}", time);
thread::sleep(time);
}
let n_alive = self.state.parked.iter()
.filter(|p| p.cr_state.is_alive()).count();
if n_alive == 0 {
trace!("Stop task switching, no parked threads are alive");
self.state.cr_deadline = None;
}
}
} }
debug!("Thread ended."); debug!("Thread ended.");

@ -17,6 +17,8 @@ pub struct ThreadInfo {
pub program: Arc<Program>, pub program: Arc<Program>,
/// Program to run /// Program to run
pub(crate) cycle_time: Duration, pub(crate) cycle_time: Duration,
/// Interval one thread/coroutine is let to run before the context switches
pub(crate) scheduler_interval: Duration,
/// Extensions /// Extensions
pub extensions: Arc<Vec<Box<dyn CrsnExtension>>>, pub extensions: Arc<Vec<Box<dyn CrsnExtension>>>,
} }

@ -1,5 +1,5 @@
use std::any::{Any, TypeId}; use std::any::{Any, TypeId};
use std::collections::HashMap; use std::collections::{HashMap, VecDeque};
use crate::asm::data::{Rd, RdData, RdObj, Register, Wr, WrData}; use crate::asm::data::{Rd, RdData, RdObj, Register, Wr, WrData};
use crate::asm::data::literal::{Addr, Value}; use crate::asm::data::literal::{Addr, Value};
@ -12,34 +12,98 @@ use nudge::{likely};
use crate::asm::instr::cond::Flag; use crate::asm::instr::cond::Flag;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::fmt; use std::fmt;
use std::time::Instant;
pub struct RunState { pub struct RunState {
pub thread_info: Arc<ThreadInfo>, pub thread_info: Arc<ThreadInfo>,
/// Active stack frame /// The active coroutine
pub frame: StackFrame, pub cr: CoroutineContext,
/// Call stack /// Parked coroutines
pub call_stack: CallStack, pub parked: VecDeque<CoroutineContext>,
/// General purpose registers that stay valid for the entire run-time of the thread /// General purpose registers that stay valid for the entire run-time of the thread
pub global_regs: [Value; REG_COUNT], pub global_regs: [Value; REG_COUNT],
/// Extension data /// Extension data
pub ext_data: ExtensionDataStore, pub ext_data: ExtensionDataStore,
/// Execution deadline, if multi-tasked
pub cr_deadline: Option<Instant>,
}
#[derive(Debug,Default)]
pub struct CoroutineContext {
pub handle: Value,
/// Active stack frame
pub frame: StackFrame,
/// Call stack
pub call_stack: CallStack,
/// Coroutine run state
pub cr_state: CoroutineState,
}
/// Execution state of an inactive coroutine
#[derive(Debug, Eq, PartialEq)]
pub enum CoroutineState {
/// Ready to run, just started, or interrupted by the scheduler
Ready,
/// Delay in progress
Sleep { due: Instant },
/// The task finished
Finished(Vec<Value>),
/// The task yielded a value that must be consumed before it can resume.
/// State switches to Ready when the value is read.
YieldedValue(Value),
}
impl CoroutineState {
pub fn is_alive(&self) -> bool {
match self {
CoroutineState::Ready => true,
CoroutineState::Sleep { .. } => true,
CoroutineState::Finished(_) => false,
CoroutineState::YieldedValue(_) => true,
}
}
}
impl Default for CoroutineState {
fn default() -> Self {
Self::Ready
}
} }
impl RunState { impl RunState {
/// Clear everything. Caution: when done at runtime, this effectively reboots the thread /// Clear everything. Caution: when done at runtime, this effectively reboots the thread
pub fn clear_all(&mut self) { pub fn clear_all(&mut self) {
self.frame = Default::default(); self.cr = Default::default();
self.call_stack = Default::default(); self.parked = Default::default();
self.global_regs = Default::default(); self.global_regs = Default::default();
self.ext_data = Default::default(); self.ext_data = Default::default();
} }
/// Add a coroutine, marked as Ready, to run next
pub fn add_coroutine(&mut self, frame : StackFrame) -> Value {
let handle = self.thread_info.unique_handle();
trace!("Spawn cr {:#}", handle);
// front - so it runs ASAP
self.parked.push_front(CoroutineContext {
handle,
frame,
call_stack: vec![],
cr_state: Default::default()
});
if self.cr_deadline.is_none() {
// start context switching
self.cr_deadline = Some(Instant::now() + self.thread_info.scheduler_interval);
}
handle
}
} }
impl Debug for RunState { impl Debug for RunState {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("RunState") f.debug_struct("RunState")
.field("frame", &self.frame) .field("cr", &self.cr)
.field("call_stack", &self.call_stack) .field("parked", &self.parked)
.field("global_regs", &self.global_regs) .field("global_regs", &self.global_regs)
//.field("ext_data", &self.ext_data) //.field("ext_data", &self.ext_data)
.finish() .finish()
@ -79,44 +143,44 @@ impl RunState {
#[inline(always)] #[inline(always)]
pub fn set_flag(&mut self, flag: Flag, set: bool) { pub fn set_flag(&mut self, flag: Flag, set: bool) {
trace!("Flag {} = {:?}", flag, set); trace!("Flag {} = {:?}", flag, set);
self.frame.status.set(flag, set); self.cr.frame.status.set(flag, set);
} }
/// Check status flags for a condition /// Check status flags for a condition
#[inline(always)] #[inline(always)]
pub fn test_cond(&self, cond: Cond) -> bool { pub fn test_cond(&self, cond: Cond) -> bool {
self.frame.status.test(cond) self.cr.frame.status.test(cond)
} }
/// Set program counter - address of the next instruction to run /// Set program counter - address of the next instruction to run
pub fn set_pc(&mut self, pc: Addr) { pub fn set_pc(&mut self, pc: Addr) {
trace!("PC := {}", pc); trace!("PC := {}", pc);
self.frame.pc = pc; self.cr.frame.pc = pc;
} }
/// Get program counter - address of the next instruction to run /// Get program counter - address of the next instruction to run
pub fn get_pc(&self) -> Addr { pub fn get_pc(&self) -> Addr {
self.frame.pc self.cr.frame.pc
} }
/// Clear status flags /// Clear status flags
#[inline(always)] #[inline(always)]
pub fn clear_status(&mut self) { pub fn clear_status(&mut self) {
self.frame.status.clear(); self.cr.frame.status.clear();
} }
/// Update status flags using a variable. /// Update status flags using a variable.
/// The update is additive - call `clear_status()` first if desired! /// The update is additive - call `clear_status()` first if desired!
#[inline(always)] #[inline(always)]
pub fn update_status(&mut self, val: Value) { pub fn update_status(&mut self, val: Value) {
self.frame.status.update(val); self.cr.frame.status.update(val);
} }
/// Update status flags using a variable. /// Update status flags using a variable.
/// The update is additive - call `clear_status()` first if desired! /// The update is additive - call `clear_status()` first if desired!
#[inline(always)] #[inline(always)]
pub fn update_status_float(&mut self, val: f64) { pub fn update_status_float(&mut self, val: f64) {
self.frame.status.update_float(val); self.cr.frame.status.update_float(val);
} }
/// Read object handle value /// Read object handle value
@ -133,8 +197,8 @@ impl RunState {
RdData::Immediate(v) => Ok(v), RdData::Immediate(v) => Ok(v),
RdData::Register(Register::Gen(rn)) => { RdData::Register(Register::Gen(rn)) => {
if likely(rn < REG_COUNT as u8) { if likely(rn < REG_COUNT as u8) {
trace!("Rd {:?} = {}", rd, self.frame.gen[rn as usize]); trace!("Rd {:?} = {}", rd, self.cr.frame.gen[rn as usize]);
Ok(self.frame.gen[rn as usize]) Ok(self.cr.frame.gen[rn as usize])
} else { } else {
Err(Fault::RegisterNotExist { reg: Register::Gen(rn) }) Err(Fault::RegisterNotExist { reg: Register::Gen(rn) })
} }
@ -149,16 +213,16 @@ impl RunState {
} }
RdData::Register(Register::Arg(rn)) => { RdData::Register(Register::Arg(rn)) => {
if likely(rn < REG_COUNT as u8) { if likely(rn < REG_COUNT as u8) {
trace!("Rd {:?} = {}", rd, self.frame.arg[rn as usize]); trace!("Rd {:?} = {}", rd, self.cr.frame.arg[rn as usize]);
Ok(self.frame.arg[rn as usize]) Ok(self.cr.frame.arg[rn as usize])
} else { } else {
Err(Fault::RegisterNotExist { reg: Register::Arg(rn) }) Err(Fault::RegisterNotExist { reg: Register::Arg(rn) })
} }
} }
RdData::Register(Register::Res(rn)) => { RdData::Register(Register::Res(rn)) => {
if likely(rn < REG_COUNT as u8) { if likely(rn < REG_COUNT as u8) {
trace!("Rd {:?} = {}", rd, self.frame.res[rn as usize]); trace!("Rd {:?} = {}", rd, self.cr.frame.res[rn as usize]);
Ok(self.frame.res[rn as usize]) Ok(self.cr.frame.res[rn as usize])
} else { } else {
Err(Fault::RegisterNotExist { reg: Register::Res(rn) }) // TODO use match after @ when stabilized https://github.com/rust-lang/rust/issues/65490 Err(Fault::RegisterNotExist { reg: Register::Res(rn) }) // TODO use match after @ when stabilized https://github.com/rust-lang/rust/issues/65490
} }
@ -174,6 +238,26 @@ impl RunState {
} }
fn read_object(&mut self, reference: Value) -> Result<Value, Fault> { fn read_object(&mut self, reference: Value) -> Result<Value, Fault> {
// values yielded from coroutines
for cr in &mut self.parked {
if cr.handle == reference {
match cr.cr_state {
CoroutineState::Ready | CoroutineState::Sleep { .. } => {
return Err(Fault::Blocked);
}
CoroutineState::Finished(_) => {
self.set_flag(Flag::Eof, true);
warn!("Attempt to read yielded value of a finished coroutine");
return Ok(0);
}
CoroutineState::YieldedValue(v) => {
cr.cr_state = CoroutineState::Ready;
return Ok(v);
}
}
}
}
// This is a shitty dirty hack to allow iterating over the extensions while passing a mutable reference // This is a shitty dirty hack to allow iterating over the extensions while passing a mutable reference
// to self to the reading methods. Since the extensions array is in an Arc, it can't be mutated internally // to self to the reading methods. Since the extensions array is in an Arc, it can't be mutated internally
// anyway, and we know it will still live after the method returns - unless someone does something incredibly stupid. // anyway, and we know it will still live after the method returns - unless someone does something incredibly stupid.
@ -213,7 +297,7 @@ impl RunState {
match wr.0 { match wr.0 {
WrData::Register(Register::Gen(rn)) => { WrData::Register(Register::Gen(rn)) => {
if likely(rn < REG_COUNT as u8) { if likely(rn < REG_COUNT as u8) {
self.frame.gen[rn as usize] = val; self.cr.frame.gen[rn as usize] = val;
Ok(()) Ok(())
} else { } else {
Err(Fault::RegisterNotExist { reg: Register::Gen(rn) }) Err(Fault::RegisterNotExist { reg: Register::Gen(rn) })

@ -0,0 +1,38 @@
(
(lds @cout "main\n")
(msleep 500)
(lds @cout "main\n")
(msleep 500)
(lds @cout "Spawnign bg\n")
(spawn r15 bg 10)
(msleep 500)
(lds @cout "FG\n")
(msleep 500)
(lds @cout "FG\n")
(msleep 500)
(lds @cout "FG\n")
(msleep 1000)
(lds @cout "Wait for BG\n")
(join @r15)
(lds @cout "Joined\n")
(msleep 500)
(lds @cout "main\n")
(msleep 500)
(lds @cout "main\n")
(msleep 500)
(proc bg times
(ld r0 times)
(:x)
(msleep 500)
(lds @cout "***BG\n")
(dec r0 (nz? (j :x)))
(lds @cout "***BG done.\n")
(ret)
)
)

@ -0,0 +1,44 @@
(
; using a coroutine as a generator
(spawn r15 fibo)
(ld r0 20)
(:next)
(ld r1 @r15) ; Read a yielded value
(call printnum r1)
(lds @cout ", ")
(dec r0)
(j.nz :next)
(ld @cout '\n')
(halt)
(proc fibo
(ld r0 0)
(ld r1 1)
(:x)
(yield r1)
(add r2 r0 r1)
(ld r0 r1)
(ld r1 r2)
(j :x)
)
(proc printnum num
(mkbf r15)
(ld r1 num)
(tst r1 (<0? (mul r1 -1)))
(:next)
(mod r0 r1 10)
(add r0 '0')
(bfrpush @r15 r0)
(div r1 10 (z?
(tst num (<0? (bfrpush @r15 '-')))
(lds @cout @r15)
(del @r15)
(ret)))
(j :next)
)
)

@ -21,4 +21,21 @@
(ret))) (ret)))
(j :next) (j :next)
) )
; other version that prints it
(proc printnum num
(mkbf r15)
(ld r1 num)
(tst r1 (<0? (mul r1 -1)))
(:next)
(mod r0 r1 10)
(add r0 '0')
(bfrpush @r15 r0)
(div r1 10 (z?
(tst num (<0? (bfrpush @r15 '-')))
(lds @cout @r15)
(del @r15)
(ret)))
(j :next)
)
) )

@ -38,6 +38,8 @@ struct Config {
assemble_debug: bool, assemble_debug: bool,
#[serde(with = "serde_duration_millis")] #[serde(with = "serde_duration_millis")]
cycle_time: Duration, cycle_time: Duration,
#[serde(with = "serde_duration_millis")]
switch_time: Duration,
} }
impl Default for Config { impl Default for Config {
@ -51,6 +53,7 @@ impl Default for Config {
assemble_only: false, assemble_only: false,
assemble_debug: false, assemble_debug: false,
cycle_time: Duration::default(), cycle_time: Duration::default(),
switch_time: Duration::from_millis(10),
} }
} }
} }
@ -177,6 +180,7 @@ fn main() -> anyhow::Result<()> {
program: parsed, program: parsed,
pc: Addr(0), pc: Addr(0),
cycle_time: config.cycle_time, cycle_time: config.cycle_time,
scheduler_interval: config.switch_time,
args args
}); });

Loading…
Cancel
Save