make stdin async

master
Ondřej Hruška 4 years ago
parent f87d822432
commit fd90480cec
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 3
      crsn/src/runtime/fault.rs
  2. 19
      crsn/src/runtime/run_thread.rs
  3. 156
      crsn_stdio/src/lib.rs
  4. 17
      examples/coroutines4-io.csn

@ -86,6 +86,9 @@ pub enum Fault {
#[error("Root routine returned or yielded a value")] #[error("Root routine returned or yielded a value")]
RootReturned, RootReturned,
#[error("System IO error: {0}")]
IOError(#[from] std::io::Error),
} }
#[derive(Error, Debug)] #[derive(Error, Debug)]

@ -153,7 +153,9 @@ impl RunThread {
} }
} }
Err(Fault::Blocked) => { Err(Fault::Blocked) => {
// Do not advance, the last instruction will be re-tried trace!("Thread reports being blocked!");
self.state.cr.frame.pc = orig_pc; // Retry
self.state.cr.cr_state = CoroutineState::Ready;
want_switch = true; want_switch = true;
} }
Err(e) => { Err(e) => {
@ -191,18 +193,20 @@ impl RunThread {
let mut candidate = None; let mut candidate = None;
let mut closest_due = None; let mut closest_due = None;
for _ in 0..self.state.parked.len() { 'findbest: for _ in 0..self.state.parked.len() {
if let Some(mut rt) = self.state.parked.pop_front() { if let Some(mut rt) = self.state.parked.pop_front() {
match rt.cr_state { match rt.cr_state {
CoroutineState::Ready => { CoroutineState::Ready => {
trace!("Found READY thread to run next");
candidate = Some(rt); candidate = Some(rt);
break; break 'findbest;
} }
CoroutineState::Sleep { due } => { CoroutineState::Sleep { due } => {
if due <= now { if due <= now {
trace!("Found DUE sleeping thread to run next");
rt.cr_state = CoroutineState::Ready; rt.cr_state = CoroutineState::Ready;
candidate = Some(rt); candidate = Some(rt);
break; break 'findbest;
} else { } else {
match closest_due { match closest_due {
Some(d) => { Some(d) => {
@ -234,10 +238,15 @@ impl RunThread {
self.state.start_task_switching(); self.state.start_task_switching();
} else if let Some(due) = closest_due { } else if let Some(due) = closest_due {
if self.state.cr.cr_state == CoroutineState::Ready {
trace!("No candidate found to run, retry same thread.");
// Let it run another quantum, maybe it was just blocked
continue 'run;
}
let time = due.saturating_duration_since(now); let time = due.saturating_duration_since(now);
trace!("No thread to switch to, sleep {:?}", time); trace!("No thread to switch to, sleep {:?}", time);
thread::sleep(time); thread::sleep(time);
continue 'next; continue 'next;
} else { } else {
// Nothing to run? // Nothing to run?

@ -14,6 +14,7 @@ use std::io;
use crsn::asm::instr::cond::Flag; use crsn::asm::instr::cond::Flag;
use std::fmt; use std::fmt;
use crsn::asm::data::Wr; use crsn::asm::data::Wr;
use std::time::Instant;
mod console { mod console {
use std::{io}; use std::{io};
@ -21,13 +22,29 @@ mod console {
use std::ffi::c_void; use std::ffi::c_void;
use std::mem::{self, MaybeUninit}; use std::mem::{self, MaybeUninit};
use crsn::runtime::fault::Fault;
use std::time::{Duration, Instant};
struct ReadCharState {
bytes: [u8; 4],
cursor: usize,
len: usize,
last_timeout : u8,
}
fn setup_fd(fd: RawFd) -> io::Result<libc::termios> { static mut READ_CHAR_STATE: ReadCharState = ReadCharState {
bytes: [0; 4],
cursor: 0,
len: 0,
last_timeout : 0,
};
fn setup_fd(fd: RawFd) -> Result<libc::termios, Fault> {
use libc::*; use libc::*;
let mut tio = MaybeUninit::uninit(); let mut tio = MaybeUninit::uninit();
if 0 != unsafe { tcgetattr(fd, tio.as_mut_ptr()) } { if 0 != unsafe { tcgetattr(fd, tio.as_mut_ptr()) } {
return Err(io::Error::last_os_error()); return Err(Fault::IOError(io::Error::last_os_error()));
} }
let mut tio = unsafe { MaybeUninit::assume_init(tio) }; let mut tio = unsafe { MaybeUninit::assume_init(tio) };
let old_tio : termios = unsafe { mem::transmute_copy(&tio) }; let old_tio : termios = unsafe { mem::transmute_copy(&tio) };
@ -39,36 +56,76 @@ mod console {
tio.c_cc[VMIN] = 1; tio.c_cc[VMIN] = 1;
tio.c_cc[VTIME] = 0; tio.c_cc[VTIME] = 0;
if 0 != unsafe { tcsetattr(fd, TCSANOW, &tio) } { if 0 != unsafe { tcsetattr(fd, TCSANOW, &tio) } {
return Err(io::Error::last_os_error()); return Err(Fault::IOError(io::Error::last_os_error()));
} }
Ok(old_tio) Ok(old_tio)
} }
pub fn init_io() -> io::Result<libc::termios> { pub fn init_io() -> Result<libc::termios, Fault> {
setup_fd(libc::STDIN_FILENO) setup_fd(libc::STDIN_FILENO)
} }
pub fn read_byte() -> io::Result<u8> { pub fn read_byte(deadline : Option<Instant>) -> Result<u8, Fault> {
// Set TIO timeout
let state = unsafe { &mut READ_CHAR_STATE };
if (state.last_timeout == 0) && deadline.is_none() {
// Keep it like that
} else {
let vtime = if let Some(dl) = deadline {
let timeout = dl.saturating_duration_since(Instant::now());
((timeout.as_secs_f32() * 10.0).round() as u32).min(255).max(1) as u8
} else {
0
};
if state.last_timeout != vtime {
// vtime changes
state.last_timeout = vtime;
let mut tio = MaybeUninit::uninit();
if 0 != unsafe { libc::tcgetattr(libc::STDIN_FILENO, tio.as_mut_ptr()) } {
return Err(Fault::IOError(io::Error::last_os_error()));
}
let mut tio = unsafe { MaybeUninit::assume_init(tio) };
if vtime > 0 {
tio.c_cc[libc::VTIME] = vtime; /* unit = 0.1 */
tio.c_cc[libc::VMIN] = 0;
} else {
tio.c_cc[libc::VTIME] = 0; // no counting
tio.c_cc[libc::VMIN] = 1; // want at least one character
}
unsafe { libc::tcsetattr(libc::STDIN_FILENO, libc::TCSANOW, &tio) };
}
}
let mut buf = 0u8; let mut buf = 0u8;
let len = unsafe { libc::read(libc::STDIN_FILENO, &mut buf as *mut u8 as *mut c_void, 1) }; let len = unsafe { libc::read(libc::STDIN_FILENO, &mut buf as *mut u8 as *mut c_void, 1) };
if len == 0 && state.last_timeout != 0 {
return Err(Fault::IOError(io::Error::new(std::io::ErrorKind::TimedOut, "")));
}
if len <= 0 { if len <= 0 {
Err(io::Error::last_os_error()) Err(Fault::IOError(io::Error::last_os_error()))
} else { } else {
Ok(buf as u8) Ok(buf as u8)
} }
} }
pub fn write_byte(b : u8) -> io::Result<()> { pub fn write_byte(b : u8) -> Result<(), Fault> {
let len = unsafe { libc::write(libc::STDOUT_FILENO, &b as *const u8 as *const c_void, 1) }; let len = unsafe { libc::write(libc::STDOUT_FILENO, &b as *const u8 as *const c_void, 1) };
if len <= 0 { if len <= 0 {
Err(io::Error::last_os_error()) Err(Fault::IOError(io::Error::last_os_error()))
} else { } else {
Ok(()) Ok(())
} }
} }
pub fn write_char(c : char) -> io::Result<()> { pub fn write_char(c : char) -> Result<(), Fault> {
let mut buf = [0u8; 4]; let mut buf = [0u8; 4];
for b in c.encode_utf8(&mut buf).as_bytes() { for b in c.encode_utf8(&mut buf).as_bytes() {
write_byte(*b)?; write_byte(*b)?;
@ -76,30 +133,43 @@ mod console {
Ok(()) Ok(())
} }
pub fn read_char() -> io::Result<char> { pub fn read_char(deadline : Option<Instant>) -> Result<char, Fault> {
let first = read_byte()?; let state = unsafe { &mut READ_CHAR_STATE };
if state.cursor == 0 {
let first = read_byte(deadline)?;
if first & 0x80 == 0 { if first & 0x80 == 0 {
return Ok(first as char); return Ok(first as char);
} }
let mut bytes = [first, 0, 0, 0]; state.bytes[0] = first;
state.cursor = 1;
let remain = if first & 0b1110_0000 == 0b1100_0000 { state.len = if first & 0b1110_0000 == 0b1100_0000 {
1
} else if first & 0b1111_0000 == 0b1110_0000 {
2 2
} else /*if first & 0b1111_1000 == 0b1111_0000*/ { } else if first & 0b1111_0000 == 0b1110_0000 {
3 3
} else /*if first & 0b1111_1000 == 0b1111_0000*/ {
4
}; };
}
for n in 1..=remain { let len = state.len;
bytes[n] = read_byte()?;
while state.cursor < len {
let b = read_byte(deadline)?;
state.bytes[state.cursor] = b;
state.cursor += 1;
} }
std::str::from_utf8(&bytes[..=remain]) let rv = std::str::from_utf8(&state.bytes[..=len])
.map(|s| s.chars().nth(0).unwrap()) .map(|s| s.chars().nth(0).unwrap())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) .map_err(|e| Fault::IOError(io::Error::new(io::ErrorKind::InvalidData, e)));
state.cursor = 0;
state.len = 0;
rv
} }
} }
@ -183,35 +253,47 @@ impl CrsnExtension for StdioOps {
fn read_obj(&self, state: &mut RunState, handle: Value) fn read_obj(&self, state: &mut RunState, handle: Value)
-> Result<Option<Value>, Fault> -> Result<Option<Value>, Fault>
{ {
let deadline = state.cr_deadline;
if handle == self.hdl_stdin { if handle == self.hdl_stdin {
match console::read_char() { match console::read_char(deadline) {
Ok(c) => { Ok(c) => {
return Ok(Some(c as Value)); return Ok(Some(c as Value));
} }
Err(e) => { Err(Fault::IOError(e)) => {
if e.kind() == io::ErrorKind::TimedOut {
return Err(Fault::Blocked);
}
state.set_flag(Flag::Invalid, true); state.set_flag(Flag::Invalid, true);
if e.kind() != io::ErrorKind::InvalidData { if e.kind() != io::ErrorKind::InvalidData {
state.set_flag(Flag::Eof, true); state.set_flag(Flag::Eof, true);
} }
return Ok(Some(0)); return Ok(Some(0));
} }
Err(other) => {
return Err(other);
} }
} }
} else if handle == self.hdl_stdin_raw {
if handle == self.hdl_stdin_raw { match console::read_byte(deadline) {
match console::read_byte() {
Ok(b) => { Ok(b) => {
return Ok(Some(b as Value)); return Ok(Some(b as Value));
} }
Err(_e) => { Err(Fault::IOError(e)) => {
if e.kind() == io::ErrorKind::TimedOut {
return Err(Fault::Blocked);
}
state.set_flag(Flag::Invalid, true); state.set_flag(Flag::Invalid, true);
state.set_flag(Flag::Eof, true); state.set_flag(Flag::Eof, true);
return Ok(Some(0)); return Ok(Some(0));
} }
Err(other) => {
return Err(other);
} }
} }
} else if handle == self.hdl_stdout || handle == self.hdl_stdout_raw {
if handle == self.hdl_stdout || handle == self.hdl_stdout_raw {
state.set_flag(Flag::Invalid, true); state.set_flag(Flag::Invalid, true);
return Ok(Some(0)); return Ok(Some(0));
} }
@ -254,13 +336,15 @@ impl CrsnExtension for StdioOps {
fn read_obj_all(&self, state: &mut RunState, whandle: Wr, rhandle: Value) fn read_obj_all(&self, state: &mut RunState, whandle: Wr, rhandle: Value)
-> Result<Option<()>, Fault> -> Result<Option<()>, Fault>
{ {
// XXX This is blocking, there is no sensible way to split it up.
if rhandle == self.hdl_stdin { if rhandle == self.hdl_stdin {
loop { loop {
match console::read_char() { match console::read_char(None) {
Ok(c) => { Ok(c) => {
state.write(whandle, c as Value)?; state.write(whandle, c as Value)?;
} }
Err(e) => { Err(Fault::IOError(e)) => {
if e.kind() != io::ErrorKind::InvalidData { if e.kind() != io::ErrorKind::InvalidData {
state.set_flag(Flag::Eof, true); state.set_flag(Flag::Eof, true);
} else { } else {
@ -268,17 +352,20 @@ impl CrsnExtension for StdioOps {
} }
return Ok(Some(())); return Ok(Some(()));
} }
Err(other) => {
return Err(other);
}
} }
} }
} }
if rhandle == self.hdl_stdin_raw { if rhandle == self.hdl_stdin_raw {
loop { loop {
match console::read_byte() { match console::read_byte(None) {
Ok(c) => { Ok(c) => {
state.write(whandle, c as Value)?; state.write(whandle, c as Value)?;
} }
Err(e) => { Err(Fault::IOError(e)) => {
if e.kind() != io::ErrorKind::InvalidData { if e.kind() != io::ErrorKind::InvalidData {
state.set_flag(Flag::Eof, true); state.set_flag(Flag::Eof, true);
} else { } else {
@ -286,6 +373,9 @@ impl CrsnExtension for StdioOps {
} }
return Ok(Some(())); return Ok(Some(()));
} }
Err(other) => {
return Err(other);
}
} }
} }
} }

@ -0,0 +1,17 @@
(
; This demo shows that stdin is nearly non-blocking
(spawn _ ReadAndEcho)
(:a)
(mslp 2500)
(lds @cout "Tick\n")
(j :a)
(proc ReadAndEcho
(:a)
(ld @cout '?')
(ld @cout @cin)
(j :a)
)
)
Loading…
Cancel
Save