Implement critical sections and timeslice setting cmd

master
Ondřej Hruška 3 years ago
parent 9cd03ca8e3
commit 735f871ea0
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 33
      README.md
  2. 31
      crsn/src/asm/instr/flatten.rs
  3. 15
      crsn/src/asm/mod.rs
  4. 8
      crsn/src/asm/parse/mod.rs
  5. 11
      crsn/src/asm/parse/parse_instr.rs
  6. 6
      crsn/src/builtin/defs.rs
  7. 31
      crsn/src/builtin/exec.rs
  8. 80
      crsn/src/builtin/mod.rs
  9. 24
      crsn/src/builtin/parse.rs
  10. 9
      crsn/src/runtime/fault.rs
  11. 170
      crsn/src/runtime/run_thread.rs
  12. 3
      crsn/src/runtime/run_thread/info.rs
  13. 4
      crsn/src/runtime/run_thread/state.rs
  14. 0
      examples/coroutines2-gen.csn
  15. 39
      examples/coroutines3-crit.csn

@ -326,6 +326,29 @@ You can also use one register for up to 64 mutexes.
Remember to only use global registers (or buffer items) as mutexes: `g0`-`g15`. Each coroutine has its own set of *regular* registers.
Another way to avoid race conditions is to use **critical sections**.
Context switch (switching between active coroutines) is forbidden in a critical section. Try to keep critical sections as short as possible,
since they can distort sleep times and cause other similar problems.
```
(crit-begin)
...
(crit-end)
```
A safer way is to use the "critical block", which expands to the same, but also detects some common bugs at compile time,
like trying to jump out of the critical section.
```
(crit
...
)
```
Critical section nesting is allowed, but probably a bug.
**Beware deadlocks!**
### Using coroutines as generators
A coroutine can "yield a value" by invoking the `yield` instruction with an operand. This can be done any number of times.
@ -497,6 +520,16 @@ Jumping to a label is always safer than a manual skip.
; Wait for a coroutine to complete, read its return values and delete it.
(join @Obj)
; Begin a critical section (no context switch allowed)
(crit-begin)
; End a critical section
(crit-end)
; Shortcut to define a critical section
(crit
...ops
)
; Generate a run-time fault with a debugger message
(fault)
(fault message)

@ -29,6 +29,16 @@ impl Flatten for () {
}
}
impl Flatten for Op {
fn flatten(self: Box<Self>, _label_num: &AtomicU32) -> Result<Vec<Op>, CrsnError> {
Ok(vec![*self])
}
fn pos(&self) -> SourcePosition {
self.pos.clone()
}
}
impl Flatten for InstrWithBranches {
fn pos(&self) -> SourcePosition {
self.pos.clone()
@ -111,6 +121,23 @@ impl Flatten for Vec<Box<dyn Flatten>> {
}
}
impl Flatten for Vec<Op> {
fn pos(&self) -> SourcePosition {
match self.first() {
None => {
Default::default()
}
Some(f) => {
f.pos()
}
}
}
fn flatten(self: Box<Self>, _label_num: &AtomicU32) -> Result<Vec<Op>, CrsnError> {
Ok(*self)
}
}
impl Flatten for Routine {
fn pos(&self) -> SourcePosition {
self.pos.clone()
@ -138,12 +165,12 @@ impl Flatten for Routine {
}.into_op(self_pos)
);
labels_to_skips(ops)
jumps_to_skips(ops)
}
}
/// Convert jumps to relative skips
pub fn labels_to_skips(ops: Vec<Op>) -> Result<Vec<Op>, CrsnError> {
pub fn jumps_to_skips(ops: Vec<Op>) -> Result<Vec<Op>, CrsnError> {
let mut label_positions = HashMap::<Label, usize>::new();
for (n, op) in ops.iter().enumerate() {
if let OpKind::BuiltIn(BuiltinOp::Label(name)) = &op.kind {

@ -3,13 +3,14 @@ use std::sync::Arc;
use sexp::SourcePosition;
use crate::asm::instr::flatten::labels_to_skips;
use crate::asm::instr::flatten::jumps_to_skips;
use crate::asm::parse::{ParserContext, ParserState};
use crate::module::{CrsnExtension, CrsnUniq};
use crate::runtime::program::Program;
use crate::builtin::BuiltinOps;
use crate::runtime::run_thread::{RunState, ThreadInfo, ThreadToken};
use crate::runtime::frame::REG_COUNT;
use std::sync::atomic::AtomicU32;
pub mod data;
pub mod error;
@ -46,6 +47,10 @@ pub fn assemble(source: &str, uniq : &CrsnUniq, mut parsers: Vec<Box<dyn CrsnExt
extensions: parsers_arc.clone(),
});
/* numbered labels start with a weird high number
to avoid conflicts with user-defined numbered labels */
let label_num = Arc::new(AtomicU32::new(0x7890_0000));
let pcx = ParserContext {
parsers: &parsers_arc,
state: RefCell::new(ParserState {
@ -62,15 +67,17 @@ pub fn assemble(source: &str, uniq : &CrsnUniq, mut parsers: Vec<Box<dyn CrsnExt
parked: Default::default(),
global_regs: [0; REG_COUNT],
ext_data: Default::default(),
cr_deadline: None
cr_deadline: None,
critical_section: 0,
},
const_eval_ti: ti,
parsing_expr: false
parsing_expr: false,
label_num: label_num.clone()
}),
};
let ops = parse::parse(source, &SourcePosition::default(), &pcx)?;
let ops = labels_to_skips(ops)?;
let ops = jumps_to_skips(ops)?;
Ok(Program::new(ops, parsers_arc)?)
}

@ -51,6 +51,9 @@ pub struct ParserState {
/// True if we are in an expression parser context
pub parsing_expr : bool,
/// Label numberer
pub label_num : Arc<AtomicU32>,
}
impl ParserState {
@ -67,9 +70,6 @@ impl ParserState {
pub fn parse(source: &str, pos: &SourcePosition, parsers: &ParserContext) -> Result<Vec<Op>, CrsnError> {
let (items, _pos) = expect_list(sexp::parse(source)?, true)?;
/* numbered labels start with a weird high number
to avoid conflicts with user-defined numbered labels */
let label_num = AtomicU32::new(0x7890_0000);
parse_instructions(items.into_iter(), pos, parsers)?
.flatten(&label_num)
.flatten(&parsers.state.borrow().label_num)
}

@ -14,7 +14,7 @@ use super::parse_op::parse_op;
pub fn parse_instructions(items: impl Iterator<Item=Sexp>, pos: &SourcePosition, pcx: &ParserContext) -> Result<Box<dyn Flatten>, CrsnError> {
let mut parsed = vec![];
for expr in items {
'exprs: for expr in items {
let (tokens, listpos) = expect_list(expr, false)?;
let mut toki = tokens.into_iter();
@ -40,8 +40,13 @@ pub fn parse_instructions(items: impl Iterator<Item=Sexp>, pos: &SourcePosition,
let mut token_parser = TokenParser::new(toki.collect(), &listpos, pcx);
for p in pcx.parsers {
token_parser = match p.parse_syntax(pos, &name, token_parser) {
Ok(ParseRes::Parsed(op)) => return Ok(op),
Ok(ParseRes::ParsedNone) => return Ok(Box::new(())),
Ok(ParseRes::Parsed(op)) => {
parsed.push(op);
continue 'exprs;
},
Ok(ParseRes::ParsedNone) => {
continue 'exprs;
},
Ok(ParseRes::Unknown(to_reuse)) => {
if to_reuse.parsing_started() {
panic!("Module \"{}\" started parsing syntax, but returned Unknown!", p.name());

@ -151,6 +151,12 @@ pub enum BuiltinOp {
/// Yield control, optionally yielding a value that must be consumed (by reading the task handle)
/// before execution can resume
Yield { value: Option<Rd> },
/// Set runtime option
RuntimeOpt { opt: Rd, value: Rd },
/// Begin critical section
CriticalBegin,
/// End critical section
CriticalEnd,
/// Deny jumps, skips and run across this address, producing a run-time fault.
Barrier {
kind: Barrier,

@ -10,6 +10,8 @@ use crate::runtime::frame::StackFrame;
use crate::runtime::run_thread::{state::RunState, ThreadInfo};
use crate::asm::instr::cond::Flag;
use super::RT_OPT_TIMESLICE;
impl OpTrait for BuiltinOp {
fn execute(&self, info: &ThreadInfo, state: &mut RunState) -> Result<EvalRes, Fault> {
let program = &info.program;
@ -252,6 +254,35 @@ impl OpTrait for BuiltinOp {
BuiltinOp::Join(obj) => {
res.sched = SchedSignal::Join(obj.read(state)?);
}
BuiltinOp::RuntimeOpt { opt, value } => {
let opt = state.read(opt)?;
let val = state.read(value)?;
match opt {
RT_OPT_TIMESLICE => {
*state.thread_info.scheduler_interval.write() = Duration::from_micros(val);
}
_ => {
warn!("Invalid rt-opt {}", opt);
state.set_flag(Flag::Invalid, true);
}
}
}
BuiltinOp::CriticalBegin => {
if state.critical_section == Value::MAX {
return Err(Fault::UnbalancedCriticalSection);
}
state.critical_section += 1;
res.cycles = 0;
}
BuiltinOp::CriticalEnd => {
if state.critical_section == 0 {
return Err(Fault::UnbalancedCriticalSection);
}
state.critical_section -= 1;
res.cycles = 0;
}
}
Ok(res)

@ -4,11 +4,19 @@ use crate::asm::error::CrsnError;
use crate::asm::instr::op::OpKind;
use crate::asm::parse::arg_parser::TokenParser;
use crate::module::{CrsnExtension, ParseRes};
use crate::asm::data::literal::Value;
use crate::asm::instr::{Flatten, Op};
use crate::asm::parse::parse_instructions;
use crate::builtin::defs::BuiltinOp;
use crate::asm::instr::flatten::jumps_to_skips;
use crate::asm::data::{Rd, RdData};
pub mod defs;
pub mod exec;
pub mod parse;
pub(crate) const RT_OPT_TIMESLICE : u64 = 1;
#[derive(Debug, Clone)]
pub struct BuiltinOps;
@ -26,4 +34,76 @@ impl CrsnExtension for BuiltinOps {
fn parse_op<'a>(&self, pos: &SourcePosition, keyword: &str, args: TokenParser<'a>) -> Result<ParseRes<'a, OpKind>, CrsnError> {
parse::parse_op(pos, keyword, args)
}
/// Get value of an extension-provided constant.
/// This constant may be an object handle, or a constant value used as argument in some other instruction.
fn get_constant_value<'a>(&self, name: &str) -> Option<Value> {
match name {
"RT_TIMESLICE" => Some(RT_OPT_TIMESLICE),
_ => None,
}
}
/// Parse a generic S-expression (non-op) that started with the given keyword
///
/// pcx is available on the arg_tokens parser
fn parse_syntax<'a>(&self, pos: &SourcePosition, keyword: &str, tokens: TokenParser<'a>)
-> Result<ParseRes<'a, Box<dyn Flatten>>, CrsnError>
{
if keyword == "crit" || keyword == "critical" {
let pcx = tokens.pcx;
let opts = parse_instructions(tokens.into_iter(), pos, pcx)?;
let flattened = jumps_to_skips(opts.flatten(&pcx.state.borrow_mut().label_num)?)?;
let len = flattened.len();
for (n, op) in flattened.iter().enumerate() {
match op.kind {
OpKind::BuiltIn(BuiltinOp::Skip(Rd(RdData::Immediate(skip)))) => {
let signed = i64::from_ne_bytes(skip.to_ne_bytes());
let target = n as i64 + signed;
if target < 0 || target > len as i64 {
return Err(CrsnError::Parse("Cannot jump out of a critical section!".into(), op.pos()));
}
}
/* Non-constant skip cannot be validated */
OpKind::BuiltIn(BuiltinOp::Skip(_)) => {
return Err(CrsnError::Parse("Variable skips are not allowed in a critical section".into(), op.pos()));
}
/* Yield in critical makes zero sense */
OpKind::BuiltIn(BuiltinOp::Yield { .. }) => {
return Err(CrsnError::Parse("Yield in a critical section!".into(), op.pos()));
}
/* This is likely a bug */
OpKind::BuiltIn(BuiltinOp::Ret(_)) => {
return Err(CrsnError::Parse("Ret in a critical section!".into(), op.pos()));
}
/* Probably also a bug. If someone really wants this, they can start and end the critical section manually. */
OpKind::BuiltIn(BuiltinOp::FarJump(_)) => {
return Err(CrsnError::Parse("Far jump a critical section!".into(), op.pos()));
}
_ => {}
}
}
let vec : Vec<Box<dyn Flatten>> = vec![
Box::new(Op {
kind: OpKind::BuiltIn(BuiltinOp::CriticalBegin),
cond: None,
pos: pos.clone()
}),
Box::new(flattened),
Box::new(Op {
kind: OpKind::BuiltIn(BuiltinOp::CriticalEnd),
cond: None,
pos: pos.clone()
})
];
return Ok(ParseRes::Parsed(Box::new(vec)));
}
Ok(ParseRes::Unknown(tokens))
}
}

@ -183,6 +183,21 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok
})
}
"crit-begin" => {
BuiltinOp::CriticalBegin
}
"crit-end" => {
BuiltinOp::CriticalEnd
}
"rt-opt" => {
BuiltinOp::RuntimeOpt {
opt: args.next_rd()?,
value: args.next_rd()?,
}
}
"ld" => {
BuiltinOp::Load {
dst: args.next_wr()?,
@ -349,6 +364,9 @@ pub(crate) fn parse_routine_name(name: String, pos: &SourcePosition) -> Result<R
pub(crate) fn to_sexp(op: &BuiltinOp) -> Sexp {
match op {
BuiltinOp::CriticalBegin => sexp::list(&[A("crit-begin")]),
BuiltinOp::CriticalEnd => sexp::list(&[A("crit-end")]),
BuiltinOp::RuntimeOpt { opt, value } => sexp::list(&[A("rt-opt"), A(opt), A(value)]),
BuiltinOp::Nop => sexp::list(&[A("nop")]),
BuiltinOp::Halt => sexp::list(&[A("halt")]),
BuiltinOp::Sleep { count: micros, unit_us: SleepUnit::Sec } => sexp::list(&[A("sslp"), A(micros)]),
@ -594,10 +612,12 @@ mod test {
parked: Default::default(),
global_regs: [0; REG_COUNT],
ext_data: Default::default(),
cr_deadline: None
cr_deadline: None,
critical_section: false,
},
const_eval_ti: ti.clone(),
parsing_expr: false
parsing_expr: false,
label_num: Default::default()
}),
};

@ -77,6 +77,15 @@ pub enum Fault {
#[error("Attempt to read undefined extension data store")]
ExtDataNotDefined,
#[error("Unbalanced critical sections")]
UnbalancedCriticalSection,
#[error("Deadlock detected")]
Deadlock,
#[error("Root routine returned or yielded a value")]
RootReturned,
}
#[derive(Error, Debug)]

@ -1,6 +1,5 @@
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
pub use info::ThreadInfo;
@ -14,6 +13,8 @@ use crate::runtime::program::Program;
use crate::runtime::run_thread::state::{CoroutineContext, CoroutineState};
use std::{mem, thread};
use crate::asm::instr::cond::Flag;
use parking_lot::RwLock;
use crate::asm::instr::Flatten;
#[derive(Clone, Copy, Eq, PartialEq, Debug, Ord, PartialOrd)]
pub struct ThreadToken(pub u32);
@ -40,12 +41,14 @@ impl RunThread {
pub fn new(params: ThreadParams) -> Self {
let extensions = params.program.extensions.clone();
// TODO investigate if this division to 2 structs is still needed
let ti = Arc::new(ThreadInfo {
id: params.id,
uniq: params.uniq,
program: params.program,
cycle_time: params.cycle_time,
scheduler_interval: params.scheduler_interval,
scheduler_interval: RwLock::new(params.scheduler_interval),
extensions,
});
@ -60,7 +63,8 @@ impl RunThread {
parked: Default::default(),
global_regs: [0; REG_COUNT],
ext_data: Default::default(),
cr_deadline: None
cr_deadline: None,
critical_section: 0,
};
Self {
@ -69,21 +73,17 @@ impl RunThread {
}
}
/// Spawn as a thread
pub fn spawn(self) -> JoinHandle<()> {
std::thread::spawn(move || {
self.run();
})
}
/// Start synchronously
pub fn run(mut self) {
let mut loop_helper = spin_sleep::LoopHelper::builder()
.build_with_target_rate(1.0/self.info.cycle_time.as_secs_f64());
loop_helper.loop_start();
'run: loop {
let mut orig_pc;
let fault = 'run: loop {
let mut want_switch = false;
orig_pc = self.state.cr.frame.pc;
match self.eval_op() {
Ok(EvalRes { cycles, advance, sched }) => {
for _ in 0..cycles {
@ -91,7 +91,6 @@ impl RunThread {
loop_helper.loop_start();
}
trace!("Step {}; Status = {}", advance, self.state.cr.frame.status);
let orig_pc = self.state.cr.frame.pc;
self.state.cr.frame.pc.advance(advance);
if let Some(dl) = self.state.cr_deadline {
@ -157,80 +156,125 @@ impl RunThread {
// Do not advance, the last instruction will be re-tried
want_switch = true;
}
Err(Fault::Halt) => {
// TODO implement coordinated shutdown when more threads are running!
break 'run;
}
Err(e) => {
error!("Fault: {:?}", e);
error!("Core dump: {:?}", self.state);
break 'run;
break 'run e;
}
}
if want_switch {
trace!("Switch requested");
let now = Instant::now();
// Resolve the next coroutine to run, or wait a bit...
'next: loop {
if want_switch {
let now = Instant::now();
let mut candidate = None;
let mut closest_due = None;
for _ in 0..self.state.parked.len() {
if let Some(mut rt) = self.state.parked.pop_front() {
match rt.cr_state {
CoroutineState::Ready => {
candidate = Some(rt);
break;
}
if self.state.critical_section > 0 {
match self.state.cr.cr_state {
CoroutineState::Ready => {}
CoroutineState::Sleep { due } => {
if due <= now {
rt.cr_state = CoroutineState::Ready;
let time = due.saturating_duration_since(now);
trace!("Sleep in critical: {:?}", time);
thread::sleep(time);
continue 'run;
}
CoroutineState::Finished(_) | CoroutineState::YieldedValue(_) => {
// This is not good
break 'run Fault::Deadlock;
}
}
// This is either a bug, or waiting for IO.
// If it is the latter, try to give the OS a bit of breathing room..
std::thread::yield_now();
continue 'run;
}
trace!("Switch requested");
let mut candidate = None;
let mut closest_due = None;
for _ in 0..self.state.parked.len() {
if let Some(mut rt) = self.state.parked.pop_front() {
match rt.cr_state {
CoroutineState::Ready => {
candidate = Some(rt);
break;
} else {
match closest_due {
Some(d) => {
if d > due {
}
CoroutineState::Sleep { due } => {
if due <= now {
rt.cr_state = CoroutineState::Ready;
candidate = Some(rt);
break;
} else {
match closest_due {
Some(d) => {
if d > due {
closest_due = Some(due);
}
},
None => {
closest_due = Some(due);
}
},
None => {
closest_due = Some(due);
}
self.state.parked.push_back(rt);
}
}
_ => {
self.state.parked.push_back(rt);
}
}
_ => {
self.state.parked.push_back(rt);
}
}
}
}
if let Some(cr) = candidate {
trace!("Context switch to {:?}", cr);
// Do switch
let old = mem::replace(&mut self.state.cr, cr);
self.state.parked.push_back(old);
self.state.cr_deadline = Some(now + self.info.scheduler_interval);
} else if let Some(due) = closest_due {
let time = due.saturating_duration_since(now);
trace!("No thread to switch to, sleep {:?}", time);
thread::sleep(time);
if let Some(cr) = candidate {
trace!("Context switch to {:?}", cr);
// Do switch
let old = mem::replace(&mut self.state.cr, cr);
self.state.parked.push_back(old);
self.state.cr_deadline = Some(now + *self.info.scheduler_interval.read());
} else if let Some(due) = closest_due {
let time = due.saturating_duration_since(now);
trace!("No thread to switch to, sleep {:?}", time);
thread::sleep(time);
continue 'next;
} else {
// Nothing to run?
thread::yield_now();
trace!("No thread to switch to!");
}
let n_alive = self.state.parked.iter()
.filter(|p| p.cr_state.is_alive()).count();
if n_alive == 0 {
trace!("Stop task switching, no parked threads are alive");
self.state.cr_deadline = None;
}
match self.state.cr.cr_state {
CoroutineState::Finished(_) | CoroutineState::YieldedValue(_) => {
break 'run Fault::RootReturned;
}
_ => {}
}
}
let n_alive = self.state.parked.iter()
.filter(|p| p.cr_state.is_alive()).count();
break 'next;
}
};
if n_alive == 0 {
trace!("Stop task switching, no parked threads are alive");
self.state.cr_deadline = None;
match fault {
Fault::Halt => {
debug!("Thread ended.");
}
e => {
if let Some(instr) = self.info.program.ops.get(orig_pc.0 as usize) {
error!("Fault at {}: {}", instr.pos(), e);
} else {
error!("Fault at PC {}: {}", orig_pc, e);
}
warn!("Core dump: {:?}", self.state);
}
}
debug!("Thread ended.");
}
}

@ -6,6 +6,7 @@ use crate::asm::data::literal::Value;
use crate::runtime::program::Program;
use crate::runtime::run_thread::ThreadToken;
use crate::module::{CrsnExtension, CrsnUniq};
use parking_lot::RwLock;
#[derive(Debug)]
pub struct ThreadInfo {
@ -18,7 +19,7 @@ pub struct ThreadInfo {
/// Program to run
pub(crate) cycle_time: Duration,
/// Interval one thread/coroutine is let to run before the context switches
pub(crate) scheduler_interval: Duration,
pub(crate) scheduler_interval: RwLock<Duration>,
/// Extensions
pub extensions: Arc<Vec<Box<dyn CrsnExtension>>>,
}

@ -26,6 +26,8 @@ pub struct RunState {
pub ext_data: ExtensionDataStore,
/// Execution deadline, if multi-tasked
pub cr_deadline: Option<Instant>,
/// Nonzero if inside a critical section
pub critical_section: Value,
}
#[derive(Debug,Default)]
@ -93,7 +95,7 @@ impl RunState {
});
if self.cr_deadline.is_none() {
// start context switching
self.cr_deadline = Some(Instant::now() + self.thread_info.scheduler_interval);
self.cr_deadline = Some(Instant::now() + *self.thread_info.scheduler_interval.read());
}
handle
}

@ -0,0 +1,39 @@
(
; This example shows the use of critical sections.
(spawn _ unsafe 'A' 'Z')
(spawn _ safe '0' '9')
(ssleep 2)
(proc unsafe start end
; This can be interrupted any time
(:x)
(ld r0 start)
(:l)
(msleep 5)
(ld @cout r0)
(cmp r0 end)
(j.eq :x)
(inc r0)
(j :l)
)
(proc safe start end
(:again)
(ld r0 start)
; The sequence will always be complete
(crit
(ld @cout ' ') ; space to make it easier to read
(:l)
(ld @cout r0)
(cmp r0 end)
(j.eq :x)
(inc r0)
(j :l)
(:x)
(ld @cout ' ') ; space to make it easier to read
)
(yield)
(j :again)
)
)
Loading…
Cancel
Save