diff --git a/README.md b/README.md index d9d4665..78305ec 100644 --- a/README.md +++ b/README.md @@ -326,6 +326,29 @@ You can also use one register for up to 64 mutexes. Remember to only use global registers (or buffer items) as mutexes: `g0`-`g15`. Each coroutine has its own set of *regular* registers. +Another way to avoid race conditions is to use **critical sections**. +Context switch (switching between active coroutines) is forbidden in a critical section. Try to keep critical sections as short as possible, +since they can distort sleep times and cause other similar problems. + +``` +(crit-begin) +... +(crit-end) +``` + +A safer way is to use the "critical block", which expands to the same, but also detects some common bugs at compile time, +like trying to jump out of the critical section. + +``` +(crit + ... +) +``` + +Critical section nesting is allowed, but probably a bug. + +**Beware deadlocks!** + ### Using coroutines as generators A coroutine can "yield a value" by invoking the `yield` instruction with an operand. This can be done any number of times. @@ -497,6 +520,16 @@ Jumping to a label is always safer than a manual skip. ; Wait for a coroutine to complete, read its return values and delete it. (join @Obj) +; Begin a critical section (no context switch allowed) +(crit-begin) +; End a critical section +(crit-end) + +; Shortcut to define a critical section +(crit + ...ops +) + ; Generate a run-time fault with a debugger message (fault) (fault message) diff --git a/crsn/src/asm/instr/flatten.rs b/crsn/src/asm/instr/flatten.rs index 5af6a87..a180476 100644 --- a/crsn/src/asm/instr/flatten.rs +++ b/crsn/src/asm/instr/flatten.rs @@ -29,6 +29,16 @@ impl Flatten for () { } } +impl Flatten for Op { + fn flatten(self: Box, _label_num: &AtomicU32) -> Result, CrsnError> { + Ok(vec![*self]) + } + + fn pos(&self) -> SourcePosition { + self.pos.clone() + } +} + impl Flatten for InstrWithBranches { fn pos(&self) -> SourcePosition { self.pos.clone() @@ -111,6 +121,23 @@ impl Flatten for Vec> { } } +impl Flatten for Vec { + fn pos(&self) -> SourcePosition { + match self.first() { + None => { + Default::default() + } + Some(f) => { + f.pos() + } + } + } + + fn flatten(self: Box, _label_num: &AtomicU32) -> Result, CrsnError> { + Ok(*self) + } +} + impl Flatten for Routine { fn pos(&self) -> SourcePosition { self.pos.clone() @@ -138,12 +165,12 @@ impl Flatten for Routine { }.into_op(self_pos) ); - labels_to_skips(ops) + jumps_to_skips(ops) } } /// Convert jumps to relative skips -pub fn labels_to_skips(ops: Vec) -> Result, CrsnError> { +pub fn jumps_to_skips(ops: Vec) -> Result, CrsnError> { let mut label_positions = HashMap::::new(); for (n, op) in ops.iter().enumerate() { if let OpKind::BuiltIn(BuiltinOp::Label(name)) = &op.kind { diff --git a/crsn/src/asm/mod.rs b/crsn/src/asm/mod.rs index c880636..e4daa32 100644 --- a/crsn/src/asm/mod.rs +++ b/crsn/src/asm/mod.rs @@ -3,13 +3,14 @@ use std::sync::Arc; use sexp::SourcePosition; -use crate::asm::instr::flatten::labels_to_skips; +use crate::asm::instr::flatten::jumps_to_skips; use crate::asm::parse::{ParserContext, ParserState}; use crate::module::{CrsnExtension, CrsnUniq}; use crate::runtime::program::Program; use crate::builtin::BuiltinOps; use crate::runtime::run_thread::{RunState, ThreadInfo, ThreadToken}; use crate::runtime::frame::REG_COUNT; +use std::sync::atomic::AtomicU32; pub mod data; pub mod error; @@ -46,6 +47,10 @@ pub fn assemble(source: &str, uniq : &CrsnUniq, mut parsers: Vec, } impl ParserState { @@ -67,9 +70,6 @@ impl ParserState { pub fn parse(source: &str, pos: &SourcePosition, parsers: &ParserContext) -> Result, CrsnError> { let (items, _pos) = expect_list(sexp::parse(source)?, true)?; - /* numbered labels start with a weird high number - to avoid conflicts with user-defined numbered labels */ - let label_num = AtomicU32::new(0x7890_0000); parse_instructions(items.into_iter(), pos, parsers)? - .flatten(&label_num) + .flatten(&parsers.state.borrow().label_num) } diff --git a/crsn/src/asm/parse/parse_instr.rs b/crsn/src/asm/parse/parse_instr.rs index adb3eb1..22048f4 100644 --- a/crsn/src/asm/parse/parse_instr.rs +++ b/crsn/src/asm/parse/parse_instr.rs @@ -14,7 +14,7 @@ use super::parse_op::parse_op; pub fn parse_instructions(items: impl Iterator, pos: &SourcePosition, pcx: &ParserContext) -> Result, CrsnError> { let mut parsed = vec![]; - for expr in items { + 'exprs: for expr in items { let (tokens, listpos) = expect_list(expr, false)?; let mut toki = tokens.into_iter(); @@ -40,8 +40,13 @@ pub fn parse_instructions(items: impl Iterator, pos: &SourcePosition, let mut token_parser = TokenParser::new(toki.collect(), &listpos, pcx); for p in pcx.parsers { token_parser = match p.parse_syntax(pos, &name, token_parser) { - Ok(ParseRes::Parsed(op)) => return Ok(op), - Ok(ParseRes::ParsedNone) => return Ok(Box::new(())), + Ok(ParseRes::Parsed(op)) => { + parsed.push(op); + continue 'exprs; + }, + Ok(ParseRes::ParsedNone) => { + continue 'exprs; + }, Ok(ParseRes::Unknown(to_reuse)) => { if to_reuse.parsing_started() { panic!("Module \"{}\" started parsing syntax, but returned Unknown!", p.name()); diff --git a/crsn/src/builtin/defs.rs b/crsn/src/builtin/defs.rs index a97b6ba..c13c47f 100644 --- a/crsn/src/builtin/defs.rs +++ b/crsn/src/builtin/defs.rs @@ -151,6 +151,12 @@ pub enum BuiltinOp { /// Yield control, optionally yielding a value that must be consumed (by reading the task handle) /// before execution can resume Yield { value: Option }, + /// Set runtime option + RuntimeOpt { opt: Rd, value: Rd }, + /// Begin critical section + CriticalBegin, + /// End critical section + CriticalEnd, /// Deny jumps, skips and run across this address, producing a run-time fault. Barrier { kind: Barrier, diff --git a/crsn/src/builtin/exec.rs b/crsn/src/builtin/exec.rs index 9b5793b..f0c794b 100644 --- a/crsn/src/builtin/exec.rs +++ b/crsn/src/builtin/exec.rs @@ -10,6 +10,8 @@ use crate::runtime::frame::StackFrame; use crate::runtime::run_thread::{state::RunState, ThreadInfo}; use crate::asm::instr::cond::Flag; +use super::RT_OPT_TIMESLICE; + impl OpTrait for BuiltinOp { fn execute(&self, info: &ThreadInfo, state: &mut RunState) -> Result { let program = &info.program; @@ -252,6 +254,35 @@ impl OpTrait for BuiltinOp { BuiltinOp::Join(obj) => { res.sched = SchedSignal::Join(obj.read(state)?); } + BuiltinOp::RuntimeOpt { opt, value } => { + let opt = state.read(opt)?; + let val = state.read(value)?; + + match opt { + RT_OPT_TIMESLICE => { + *state.thread_info.scheduler_interval.write() = Duration::from_micros(val); + } + _ => { + warn!("Invalid rt-opt {}", opt); + state.set_flag(Flag::Invalid, true); + } + } + + } + BuiltinOp::CriticalBegin => { + if state.critical_section == Value::MAX { + return Err(Fault::UnbalancedCriticalSection); + } + state.critical_section += 1; + res.cycles = 0; + } + BuiltinOp::CriticalEnd => { + if state.critical_section == 0 { + return Err(Fault::UnbalancedCriticalSection); + } + state.critical_section -= 1; + res.cycles = 0; + } } Ok(res) diff --git a/crsn/src/builtin/mod.rs b/crsn/src/builtin/mod.rs index fdf1ea9..ba3ceba 100644 --- a/crsn/src/builtin/mod.rs +++ b/crsn/src/builtin/mod.rs @@ -4,11 +4,19 @@ use crate::asm::error::CrsnError; use crate::asm::instr::op::OpKind; use crate::asm::parse::arg_parser::TokenParser; use crate::module::{CrsnExtension, ParseRes}; +use crate::asm::data::literal::Value; +use crate::asm::instr::{Flatten, Op}; +use crate::asm::parse::parse_instructions; +use crate::builtin::defs::BuiltinOp; +use crate::asm::instr::flatten::jumps_to_skips; +use crate::asm::data::{Rd, RdData}; pub mod defs; pub mod exec; pub mod parse; +pub(crate) const RT_OPT_TIMESLICE : u64 = 1; + #[derive(Debug, Clone)] pub struct BuiltinOps; @@ -26,4 +34,76 @@ impl CrsnExtension for BuiltinOps { fn parse_op<'a>(&self, pos: &SourcePosition, keyword: &str, args: TokenParser<'a>) -> Result, CrsnError> { parse::parse_op(pos, keyword, args) } + + /// Get value of an extension-provided constant. + /// This constant may be an object handle, or a constant value used as argument in some other instruction. + fn get_constant_value<'a>(&self, name: &str) -> Option { + match name { + "RT_TIMESLICE" => Some(RT_OPT_TIMESLICE), + _ => None, + } + } + + /// Parse a generic S-expression (non-op) that started with the given keyword + /// + /// pcx is available on the arg_tokens parser + fn parse_syntax<'a>(&self, pos: &SourcePosition, keyword: &str, tokens: TokenParser<'a>) + -> Result>, CrsnError> + { + if keyword == "crit" || keyword == "critical" { + let pcx = tokens.pcx; + let opts = parse_instructions(tokens.into_iter(), pos, pcx)?; + + let flattened = jumps_to_skips(opts.flatten(&pcx.state.borrow_mut().label_num)?)?; + + let len = flattened.len(); + + for (n, op) in flattened.iter().enumerate() { + match op.kind { + OpKind::BuiltIn(BuiltinOp::Skip(Rd(RdData::Immediate(skip)))) => { + let signed = i64::from_ne_bytes(skip.to_ne_bytes()); + let target = n as i64 + signed; + if target < 0 || target > len as i64 { + return Err(CrsnError::Parse("Cannot jump out of a critical section!".into(), op.pos())); + } + } + /* Non-constant skip cannot be validated */ + OpKind::BuiltIn(BuiltinOp::Skip(_)) => { + return Err(CrsnError::Parse("Variable skips are not allowed in a critical section".into(), op.pos())); + } + /* Yield in critical makes zero sense */ + OpKind::BuiltIn(BuiltinOp::Yield { .. }) => { + return Err(CrsnError::Parse("Yield in a critical section!".into(), op.pos())); + } + /* This is likely a bug */ + OpKind::BuiltIn(BuiltinOp::Ret(_)) => { + return Err(CrsnError::Parse("Ret in a critical section!".into(), op.pos())); + } + /* Probably also a bug. If someone really wants this, they can start and end the critical section manually. */ + OpKind::BuiltIn(BuiltinOp::FarJump(_)) => { + return Err(CrsnError::Parse("Far jump a critical section!".into(), op.pos())); + } + _ => {} + } + } + + let vec : Vec> = vec![ + Box::new(Op { + kind: OpKind::BuiltIn(BuiltinOp::CriticalBegin), + cond: None, + pos: pos.clone() + }), + Box::new(flattened), + Box::new(Op { + kind: OpKind::BuiltIn(BuiltinOp::CriticalEnd), + cond: None, + pos: pos.clone() + }) + ]; + + return Ok(ParseRes::Parsed(Box::new(vec))); + } + + Ok(ParseRes::Unknown(tokens)) + } } diff --git a/crsn/src/builtin/parse.rs b/crsn/src/builtin/parse.rs index 21ab651..4424a15 100644 --- a/crsn/src/builtin/parse.rs +++ b/crsn/src/builtin/parse.rs @@ -183,6 +183,21 @@ pub(crate) fn parse_op<'a>(op_pos: &SourcePosition, keyword: &str, mut args: Tok }) } + "crit-begin" => { + BuiltinOp::CriticalBegin + } + + "crit-end" => { + BuiltinOp::CriticalEnd + } + + "rt-opt" => { + BuiltinOp::RuntimeOpt { + opt: args.next_rd()?, + value: args.next_rd()?, + } + } + "ld" => { BuiltinOp::Load { dst: args.next_wr()?, @@ -349,6 +364,9 @@ pub(crate) fn parse_routine_name(name: String, pos: &SourcePosition) -> Result Sexp { match op { + BuiltinOp::CriticalBegin => sexp::list(&[A("crit-begin")]), + BuiltinOp::CriticalEnd => sexp::list(&[A("crit-end")]), + BuiltinOp::RuntimeOpt { opt, value } => sexp::list(&[A("rt-opt"), A(opt), A(value)]), BuiltinOp::Nop => sexp::list(&[A("nop")]), BuiltinOp::Halt => sexp::list(&[A("halt")]), BuiltinOp::Sleep { count: micros, unit_us: SleepUnit::Sec } => sexp::list(&[A("sslp"), A(micros)]), @@ -594,10 +612,12 @@ mod test { parked: Default::default(), global_regs: [0; REG_COUNT], ext_data: Default::default(), - cr_deadline: None + cr_deadline: None, + critical_section: false, }, const_eval_ti: ti.clone(), - parsing_expr: false + parsing_expr: false, + label_num: Default::default() }), }; diff --git a/crsn/src/runtime/fault.rs b/crsn/src/runtime/fault.rs index 4903ec2..29f1ef6 100644 --- a/crsn/src/runtime/fault.rs +++ b/crsn/src/runtime/fault.rs @@ -77,6 +77,15 @@ pub enum Fault { #[error("Attempt to read undefined extension data store")] ExtDataNotDefined, + + #[error("Unbalanced critical sections")] + UnbalancedCriticalSection, + + #[error("Deadlock detected")] + Deadlock, + + #[error("Root routine returned or yielded a value")] + RootReturned, } #[derive(Error, Debug)] diff --git a/crsn/src/runtime/run_thread.rs b/crsn/src/runtime/run_thread.rs index 632f403..f16c781 100644 --- a/crsn/src/runtime/run_thread.rs +++ b/crsn/src/runtime/run_thread.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use std::thread::JoinHandle; use std::time::{Duration, Instant}; pub use info::ThreadInfo; @@ -14,6 +13,8 @@ use crate::runtime::program::Program; use crate::runtime::run_thread::state::{CoroutineContext, CoroutineState}; use std::{mem, thread}; use crate::asm::instr::cond::Flag; +use parking_lot::RwLock; +use crate::asm::instr::Flatten; #[derive(Clone, Copy, Eq, PartialEq, Debug, Ord, PartialOrd)] pub struct ThreadToken(pub u32); @@ -40,12 +41,14 @@ impl RunThread { pub fn new(params: ThreadParams) -> Self { let extensions = params.program.extensions.clone(); + // TODO investigate if this division to 2 structs is still needed + let ti = Arc::new(ThreadInfo { id: params.id, uniq: params.uniq, program: params.program, cycle_time: params.cycle_time, - scheduler_interval: params.scheduler_interval, + scheduler_interval: RwLock::new(params.scheduler_interval), extensions, }); @@ -60,7 +63,8 @@ impl RunThread { parked: Default::default(), global_regs: [0; REG_COUNT], ext_data: Default::default(), - cr_deadline: None + cr_deadline: None, + critical_section: 0, }; Self { @@ -69,21 +73,17 @@ impl RunThread { } } - /// Spawn as a thread - pub fn spawn(self) -> JoinHandle<()> { - std::thread::spawn(move || { - self.run(); - }) - } - /// Start synchronously pub fn run(mut self) { let mut loop_helper = spin_sleep::LoopHelper::builder() .build_with_target_rate(1.0/self.info.cycle_time.as_secs_f64()); loop_helper.loop_start(); - 'run: loop { + let mut orig_pc; + + let fault = 'run: loop { let mut want_switch = false; + orig_pc = self.state.cr.frame.pc; match self.eval_op() { Ok(EvalRes { cycles, advance, sched }) => { for _ in 0..cycles { @@ -91,7 +91,6 @@ impl RunThread { loop_helper.loop_start(); } trace!("Step {}; Status = {}", advance, self.state.cr.frame.status); - let orig_pc = self.state.cr.frame.pc; self.state.cr.frame.pc.advance(advance); if let Some(dl) = self.state.cr_deadline { @@ -157,80 +156,125 @@ impl RunThread { // Do not advance, the last instruction will be re-tried want_switch = true; } - Err(Fault::Halt) => { - // TODO implement coordinated shutdown when more threads are running! - break 'run; - } Err(e) => { - error!("Fault: {:?}", e); - error!("Core dump: {:?}", self.state); - break 'run; + break 'run e; } } - if want_switch { - trace!("Switch requested"); - - let now = Instant::now(); + // Resolve the next coroutine to run, or wait a bit... + 'next: loop { + if want_switch { + let now = Instant::now(); - let mut candidate = None; - let mut closest_due = None; - for _ in 0..self.state.parked.len() { - if let Some(mut rt) = self.state.parked.pop_front() { - match rt.cr_state { - CoroutineState::Ready => { - candidate = Some(rt); - break; - } + if self.state.critical_section > 0 { + match self.state.cr.cr_state { + CoroutineState::Ready => {} CoroutineState::Sleep { due } => { - if due <= now { - rt.cr_state = CoroutineState::Ready; + let time = due.saturating_duration_since(now); + trace!("Sleep in critical: {:?}", time); + thread::sleep(time); + continue 'run; + } + CoroutineState::Finished(_) | CoroutineState::YieldedValue(_) => { + // This is not good + break 'run Fault::Deadlock; + } + } + + // This is either a bug, or waiting for IO. + // If it is the latter, try to give the OS a bit of breathing room.. + std::thread::yield_now(); + continue 'run; + } + + trace!("Switch requested"); + + let mut candidate = None; + let mut closest_due = None; + for _ in 0..self.state.parked.len() { + if let Some(mut rt) = self.state.parked.pop_front() { + match rt.cr_state { + CoroutineState::Ready => { candidate = Some(rt); break; - } else { - match closest_due { - Some(d) => { - if d > due { + } + CoroutineState::Sleep { due } => { + if due <= now { + rt.cr_state = CoroutineState::Ready; + candidate = Some(rt); + break; + } else { + match closest_due { + Some(d) => { + if d > due { + closest_due = Some(due); + } + }, + None => { closest_due = Some(due); } - }, - None => { - closest_due = Some(due); } + self.state.parked.push_back(rt); } + } + _ => { self.state.parked.push_back(rt); } } - _ => { - self.state.parked.push_back(rt); - } } } - } - if let Some(cr) = candidate { - trace!("Context switch to {:?}", cr); - - // Do switch - let old = mem::replace(&mut self.state.cr, cr); - self.state.parked.push_back(old); - self.state.cr_deadline = Some(now + self.info.scheduler_interval); - } else if let Some(due) = closest_due { - let time = due.saturating_duration_since(now); - trace!("No thread to switch to, sleep {:?}", time); - thread::sleep(time); + if let Some(cr) = candidate { + trace!("Context switch to {:?}", cr); + + // Do switch + let old = mem::replace(&mut self.state.cr, cr); + self.state.parked.push_back(old); + self.state.cr_deadline = Some(now + *self.info.scheduler_interval.read()); + } else if let Some(due) = closest_due { + let time = due.saturating_duration_since(now); + trace!("No thread to switch to, sleep {:?}", time); + thread::sleep(time); + + continue 'next; + } else { + // Nothing to run? + thread::yield_now(); + trace!("No thread to switch to!"); + } + + let n_alive = self.state.parked.iter() + .filter(|p| p.cr_state.is_alive()).count(); + + if n_alive == 0 { + trace!("Stop task switching, no parked threads are alive"); + self.state.cr_deadline = None; + } + + match self.state.cr.cr_state { + CoroutineState::Finished(_) | CoroutineState::YieldedValue(_) => { + break 'run Fault::RootReturned; + } + _ => {} + } } - let n_alive = self.state.parked.iter() - .filter(|p| p.cr_state.is_alive()).count(); + break 'next; + } + }; - if n_alive == 0 { - trace!("Stop task switching, no parked threads are alive"); - self.state.cr_deadline = None; + match fault { + Fault::Halt => { + debug!("Thread ended."); + } + e => { + if let Some(instr) = self.info.program.ops.get(orig_pc.0 as usize) { + error!("Fault at {}: {}", instr.pos(), e); + } else { + error!("Fault at PC {}: {}", orig_pc, e); } + warn!("Core dump: {:?}", self.state); } } - - debug!("Thread ended."); } } diff --git a/crsn/src/runtime/run_thread/info.rs b/crsn/src/runtime/run_thread/info.rs index ac976e5..b6aff0e 100644 --- a/crsn/src/runtime/run_thread/info.rs +++ b/crsn/src/runtime/run_thread/info.rs @@ -6,6 +6,7 @@ use crate::asm::data::literal::Value; use crate::runtime::program::Program; use crate::runtime::run_thread::ThreadToken; use crate::module::{CrsnExtension, CrsnUniq}; +use parking_lot::RwLock; #[derive(Debug)] pub struct ThreadInfo { @@ -18,7 +19,7 @@ pub struct ThreadInfo { /// Program to run pub(crate) cycle_time: Duration, /// Interval one thread/coroutine is let to run before the context switches - pub(crate) scheduler_interval: Duration, + pub(crate) scheduler_interval: RwLock, /// Extensions pub extensions: Arc>>, } diff --git a/crsn/src/runtime/run_thread/state.rs b/crsn/src/runtime/run_thread/state.rs index 0e0341f..e6c52ea 100644 --- a/crsn/src/runtime/run_thread/state.rs +++ b/crsn/src/runtime/run_thread/state.rs @@ -26,6 +26,8 @@ pub struct RunState { pub ext_data: ExtensionDataStore, /// Execution deadline, if multi-tasked pub cr_deadline: Option, + /// Nonzero if inside a critical section + pub critical_section: Value, } #[derive(Debug,Default)] @@ -93,7 +95,7 @@ impl RunState { }); if self.cr_deadline.is_none() { // start context switching - self.cr_deadline = Some(Instant::now() + self.thread_info.scheduler_interval); + self.cr_deadline = Some(Instant::now() + *self.thread_info.scheduler_interval.read()); } handle } diff --git a/examples/coroutines2.csn b/examples/coroutines2-gen.csn similarity index 100% rename from examples/coroutines2.csn rename to examples/coroutines2-gen.csn diff --git a/examples/coroutines3-crit.csn b/examples/coroutines3-crit.csn new file mode 100644 index 0000000..eed6d59 --- /dev/null +++ b/examples/coroutines3-crit.csn @@ -0,0 +1,39 @@ +( + ; This example shows the use of critical sections. + + (spawn _ unsafe 'A' 'Z') + (spawn _ safe '0' '9') + (ssleep 2) + + (proc unsafe start end + ; This can be interrupted any time + (:x) + (ld r0 start) + (:l) + (msleep 5) + (ld @cout r0) + (cmp r0 end) + (j.eq :x) + (inc r0) + (j :l) + ) + + (proc safe start end + (:again) + (ld r0 start) + ; The sequence will always be complete + (crit + (ld @cout ' ') ; space to make it easier to read + (:l) + (ld @cout r0) + (cmp r0 end) + (j.eq :x) + (inc r0) + (j :l) + (:x) + (ld @cout ' ') ; space to make it easier to read + ) + (yield) + (j :again) + ) +)