diff --git a/crsn/src/runtime/fault.rs b/crsn/src/runtime/fault.rs index 29f1ef6..d6f9fdf 100644 --- a/crsn/src/runtime/fault.rs +++ b/crsn/src/runtime/fault.rs @@ -86,6 +86,9 @@ pub enum Fault { #[error("Root routine returned or yielded a value")] RootReturned, + + #[error("System IO error: {0}")] + IOError(#[from] std::io::Error), } #[derive(Error, Debug)] diff --git a/crsn/src/runtime/run_thread.rs b/crsn/src/runtime/run_thread.rs index 7ed8f1d..89798d5 100644 --- a/crsn/src/runtime/run_thread.rs +++ b/crsn/src/runtime/run_thread.rs @@ -153,7 +153,9 @@ impl RunThread { } } Err(Fault::Blocked) => { - // Do not advance, the last instruction will be re-tried + trace!("Thread reports being blocked!"); + self.state.cr.frame.pc = orig_pc; // Retry + self.state.cr.cr_state = CoroutineState::Ready; want_switch = true; } Err(e) => { @@ -191,18 +193,20 @@ impl RunThread { let mut candidate = None; let mut closest_due = None; - for _ in 0..self.state.parked.len() { + 'findbest: for _ in 0..self.state.parked.len() { if let Some(mut rt) = self.state.parked.pop_front() { match rt.cr_state { CoroutineState::Ready => { + trace!("Found READY thread to run next"); candidate = Some(rt); - break; + break 'findbest; } CoroutineState::Sleep { due } => { if due <= now { + trace!("Found DUE sleeping thread to run next"); rt.cr_state = CoroutineState::Ready; candidate = Some(rt); - break; + break 'findbest; } else { match closest_due { Some(d) => { @@ -234,10 +238,15 @@ impl RunThread { self.state.start_task_switching(); } else if let Some(due) = closest_due { + if self.state.cr.cr_state == CoroutineState::Ready { + trace!("No candidate found to run, retry same thread."); + // Let it run another quantum, maybe it was just blocked + continue 'run; + } + let time = due.saturating_duration_since(now); trace!("No thread to switch to, sleep {:?}", time); thread::sleep(time); - continue 'next; } else { // Nothing to run? diff --git a/crsn_stdio/src/lib.rs b/crsn_stdio/src/lib.rs index 126cec4..c9af344 100644 --- a/crsn_stdio/src/lib.rs +++ b/crsn_stdio/src/lib.rs @@ -14,6 +14,7 @@ use std::io; use crsn::asm::instr::cond::Flag; use std::fmt; use crsn::asm::data::Wr; +use std::time::Instant; mod console { use std::{io}; @@ -21,13 +22,29 @@ mod console { use std::ffi::c_void; use std::mem::{self, MaybeUninit}; + use crsn::runtime::fault::Fault; + use std::time::{Duration, Instant}; + + struct ReadCharState { + bytes: [u8; 4], + cursor: usize, + len: usize, + last_timeout : u8, + } + + static mut READ_CHAR_STATE: ReadCharState = ReadCharState { + bytes: [0; 4], + cursor: 0, + len: 0, + last_timeout : 0, + }; - fn setup_fd(fd: RawFd) -> io::Result { + fn setup_fd(fd: RawFd) -> Result { use libc::*; let mut tio = MaybeUninit::uninit(); if 0 != unsafe { tcgetattr(fd, tio.as_mut_ptr()) } { - return Err(io::Error::last_os_error()); + return Err(Fault::IOError(io::Error::last_os_error())); } let mut tio = unsafe { MaybeUninit::assume_init(tio) }; let old_tio : termios = unsafe { mem::transmute_copy(&tio) }; @@ -39,36 +56,76 @@ mod console { tio.c_cc[VMIN] = 1; tio.c_cc[VTIME] = 0; if 0 != unsafe { tcsetattr(fd, TCSANOW, &tio) } { - return Err(io::Error::last_os_error()); + return Err(Fault::IOError(io::Error::last_os_error())); } Ok(old_tio) } - pub fn init_io() -> io::Result { + pub fn init_io() -> Result { setup_fd(libc::STDIN_FILENO) } - pub fn read_byte() -> io::Result { + pub fn read_byte(deadline : Option) -> Result { + // Set TIO timeout + let state = unsafe { &mut READ_CHAR_STATE }; + + if (state.last_timeout == 0) && deadline.is_none() { + // Keep it like that + } else { + let vtime = if let Some(dl) = deadline { + let timeout = dl.saturating_duration_since(Instant::now()); + ((timeout.as_secs_f32() * 10.0).round() as u32).min(255).max(1) as u8 + } else { + 0 + }; + + if state.last_timeout != vtime { + // vtime changes + state.last_timeout = vtime; + + let mut tio = MaybeUninit::uninit(); + if 0 != unsafe { libc::tcgetattr(libc::STDIN_FILENO, tio.as_mut_ptr()) } { + return Err(Fault::IOError(io::Error::last_os_error())); + } + let mut tio = unsafe { MaybeUninit::assume_init(tio) }; + + if vtime > 0 { + tio.c_cc[libc::VTIME] = vtime; /* unit = 0.1 */ + tio.c_cc[libc::VMIN] = 0; + } else { + tio.c_cc[libc::VTIME] = 0; // no counting + tio.c_cc[libc::VMIN] = 1; // want at least one character + } + + unsafe { libc::tcsetattr(libc::STDIN_FILENO, libc::TCSANOW, &tio) }; + } + } + let mut buf = 0u8; let len = unsafe { libc::read(libc::STDIN_FILENO, &mut buf as *mut u8 as *mut c_void, 1) }; + + if len == 0 && state.last_timeout != 0 { + return Err(Fault::IOError(io::Error::new(std::io::ErrorKind::TimedOut, ""))); + } + if len <= 0 { - Err(io::Error::last_os_error()) + Err(Fault::IOError(io::Error::last_os_error())) } else { Ok(buf as u8) } } - pub fn write_byte(b : u8) -> io::Result<()> { + pub fn write_byte(b : u8) -> Result<(), Fault> { let len = unsafe { libc::write(libc::STDOUT_FILENO, &b as *const u8 as *const c_void, 1) }; if len <= 0 { - Err(io::Error::last_os_error()) + Err(Fault::IOError(io::Error::last_os_error())) } else { Ok(()) } } - pub fn write_char(c : char) -> io::Result<()> { + pub fn write_char(c : char) -> Result<(), Fault> { let mut buf = [0u8; 4]; for b in c.encode_utf8(&mut buf).as_bytes() { write_byte(*b)?; @@ -76,30 +133,43 @@ mod console { Ok(()) } - pub fn read_char() -> io::Result { - let first = read_byte()?; + pub fn read_char(deadline : Option) -> Result { + let state = unsafe { &mut READ_CHAR_STATE }; - if first & 0x80 == 0 { - return Ok(first as char); - } + if state.cursor == 0 { + let first = read_byte(deadline)?; - let mut bytes = [first, 0, 0, 0]; + if first & 0x80 == 0 { + return Ok(first as char); + } - let remain = if first & 0b1110_0000 == 0b1100_0000 { - 1 - } else if first & 0b1111_0000 == 0b1110_0000 { - 2 - } else /*if first & 0b1111_1000 == 0b1111_0000*/ { - 3 - }; + state.bytes[0] = first; + state.cursor = 1; + state.len = if first & 0b1110_0000 == 0b1100_0000 { + 2 + } else if first & 0b1111_0000 == 0b1110_0000 { + 3 + } else /*if first & 0b1111_1000 == 0b1111_0000*/ { + 4 + }; + } - for n in 1..=remain { - bytes[n] = read_byte()?; + let len = state.len; + + while state.cursor < len { + let b = read_byte(deadline)?; + state.bytes[state.cursor] = b; + state.cursor += 1; } - std::str::from_utf8(&bytes[..=remain]) + let rv = std::str::from_utf8(&state.bytes[..=len]) .map(|s| s.chars().nth(0).unwrap()) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + .map_err(|e| Fault::IOError(io::Error::new(io::ErrorKind::InvalidData, e))); + + state.cursor = 0; + state.len = 0; + + rv } } @@ -183,35 +253,47 @@ impl CrsnExtension for StdioOps { fn read_obj(&self, state: &mut RunState, handle: Value) -> Result, Fault> { + let deadline = state.cr_deadline; + if handle == self.hdl_stdin { - match console::read_char() { + match console::read_char(deadline) { Ok(c) => { return Ok(Some(c as Value)); } - Err(e) => { + Err(Fault::IOError(e)) => { + if e.kind() == io::ErrorKind::TimedOut { + return Err(Fault::Blocked); + } + state.set_flag(Flag::Invalid, true); if e.kind() != io::ErrorKind::InvalidData { state.set_flag(Flag::Eof, true); } return Ok(Some(0)); } + Err(other) => { + return Err(other); + } } - } - - if handle == self.hdl_stdin_raw { - match console::read_byte() { + } else if handle == self.hdl_stdin_raw { + match console::read_byte(deadline) { Ok(b) => { return Ok(Some(b as Value)); } - Err(_e) => { + Err(Fault::IOError(e)) => { + if e.kind() == io::ErrorKind::TimedOut { + return Err(Fault::Blocked); + } + state.set_flag(Flag::Invalid, true); state.set_flag(Flag::Eof, true); return Ok(Some(0)); } + Err(other) => { + return Err(other); + } } - } - - if handle == self.hdl_stdout || handle == self.hdl_stdout_raw { + } else if handle == self.hdl_stdout || handle == self.hdl_stdout_raw { state.set_flag(Flag::Invalid, true); return Ok(Some(0)); } @@ -254,13 +336,15 @@ impl CrsnExtension for StdioOps { fn read_obj_all(&self, state: &mut RunState, whandle: Wr, rhandle: Value) -> Result, Fault> { + // XXX This is blocking, there is no sensible way to split it up. + if rhandle == self.hdl_stdin { loop { - match console::read_char() { + match console::read_char(None) { Ok(c) => { state.write(whandle, c as Value)?; } - Err(e) => { + Err(Fault::IOError(e)) => { if e.kind() != io::ErrorKind::InvalidData { state.set_flag(Flag::Eof, true); } else { @@ -268,17 +352,20 @@ impl CrsnExtension for StdioOps { } return Ok(Some(())); } + Err(other) => { + return Err(other); + } } } } if rhandle == self.hdl_stdin_raw { loop { - match console::read_byte() { + match console::read_byte(None) { Ok(c) => { state.write(whandle, c as Value)?; } - Err(e) => { + Err(Fault::IOError(e)) => { if e.kind() != io::ErrorKind::InvalidData { state.set_flag(Flag::Eof, true); } else { @@ -286,6 +373,9 @@ impl CrsnExtension for StdioOps { } return Ok(Some(())); } + Err(other) => { + return Err(other); + } } } } diff --git a/examples/coroutines4-io.csn b/examples/coroutines4-io.csn new file mode 100644 index 0000000..66088fc --- /dev/null +++ b/examples/coroutines4-io.csn @@ -0,0 +1,17 @@ +( + ; This demo shows that stdin is nearly non-blocking + + (spawn _ ReadAndEcho) + + (:a) + (mslp 2500) + (lds @cout "Tick\n") + (j :a) + + (proc ReadAndEcho + (:a) + (ld @cout '?') + (ld @cout @cin) + (j :a) + ) +)