working prototype

master
Ondřej Hruška 5 years ago
commit ecaad53beb
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 1
      .gitignore
  2. 1261
      Cargo.lock
  3. 18
      Cargo.toml
  4. 91
      src/config.rs
  5. 400
      src/main.rs

1
.gitignore vendored

@ -0,0 +1 @@
/target

1261
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -0,0 +1,18 @@
[package]
name = "postit"
version = "0.1.0"
authors = ["Ondřej Hruška <ondra@ondrovo.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clappconfig = "0.3.1"
rouille = "3.0.0"
parking_lot = "0.10.2"
serde_json = "1.0.52"
serde = "1.0.106"
serde_derive = "1.0"
log = "0.4.8"
siphasher = "0.3.3"
rand = "0.7.3"

@ -0,0 +1,91 @@
use clappconfig::{AppConfig, anyhow, clap::ArgMatches};
use std::collections::HashMap;
use std::time::Duration;
#[derive(Debug, Serialize, Deserialize)]
#[serde(default)]
#[serde(deny_unknown_fields)]
pub(crate) struct Config {
/// Log level
pub(crate) logging: String,
/// Per-module log levels
pub(crate) log_levels: HashMap<String, String>,
/// Server bind address
pub(crate) host: String,
/// Server port
pub(crate) port: u16,
/// Default expiry time in seconds
#[serde(with = "serde_duration_secs")]
pub(crate) default_expiry: Duration,
/// Max expiry time in seconds
#[serde(with = "serde_duration_secs")]
pub(crate) max_expiry: Duration,
/// Expired post clearing interval (triggered on write)
#[serde(with = "serde_duration_secs")]
pub(crate) expired_gc_interval: Duration,
/// Max uploaded file size in bytes
pub(crate) max_file_size: usize,
}
impl Default for Config {
fn default() -> Self {
Self {
logging: "debug".to_string(),
log_levels: Default::default(),
host: "0.0.0.0".to_string(),
port: 7745,
default_expiry: Duration::from_secs(60 * 10),
max_expiry: Duration::from_secs(60 * 10),
expired_gc_interval: Duration::from_secs(60),
max_file_size: 1 * (1024 * 1024) // 1MB
}
}
}
impl AppConfig for Config {
type Init = Config;
fn logging(&self) -> &str {
&self.logging
}
fn logging_mod_levels(&self) -> Option<&HashMap<String, String>> {
Some(&self.log_levels)
}
fn configure(self, _clap: &ArgMatches) -> anyhow::Result<Self::Init> {
Ok(self)
}
}
mod serde_duration_secs {
use serde::{self, Deserialize, Serializer, Deserializer};
use std::time::Duration;
pub fn serialize<S>(
value: &Duration,
se: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
se.serialize_u64(value.as_secs())
}
pub fn deserialize<'de, D>(
de: D,
) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let s: u64 = u64::deserialize(de)?;
Ok(Duration::from_secs(s))
}
}

@ -0,0 +1,400 @@
#[macro_use] extern crate serde_derive;
#[macro_use] extern crate log;
use std::time::{Instant, Duration};
use parking_lot::Mutex;
use std::collections::HashMap;
use clappconfig::{AppConfig, anyhow};
use std::io::Read;
use std::hash::{Hash, Hasher};
use rand::rngs::OsRng;
use rand::Rng;
use rouille::{Request, Response, ResponseBody};
use std::borrow::Cow;
use crate::config::Config;
mod config;
fn error_with_text(code : u16, text : impl Into<String>) -> Response {
Response {
status_code: code,
headers: vec![("Content-Type".into(), "text/plain; charset=utf8".into())],
data: rouille::ResponseBody::from_string(text),
upgrade: None,
}
}
fn empty_error(code : u16) -> Response {
Response {
status_code: code,
headers: vec![("Content-Type".into(), "text/plain; charset=utf8".into())],
data: rouille::ResponseBody::empty(),
upgrade: None,
}
}
type PostId = u64;
type Secret = u64;
type DataHash = u64;
#[derive(Debug)]
struct Post {
mime : Cow<'static, str>,
hash : DataHash,
secret : Secret,
expires : Instant,
}
impl Post {
pub fn is_expired(&self) -> bool {
self.expires < Instant::now()
}
}
fn main() -> anyhow::Result<()> {
let config = Config::init("postit", "postit.json", None)?;
let serve_at = format!("{}:{}", config.host, config.port);
let store = Mutex::new(Repository::new(config));
rouille::start_server(serve_at, move |req| {
let mut store_w = store.lock();
let method = req.method();
info!("{} {}", method, req.raw_url());
store_w.gc_expired_posts_if_needed();
match method {
"POST" | "PUT" => {
store_w.serve_post_put(req)
}
"GET" | "HEAD" => {
store_w.serve_get_head(req)
}
"DELETE" => {
store_w.serve_delete(req)
}
_ => {
rouille::Response::empty_400()
}
}
});
}
type PostsMap = HashMap<PostId, Post>;
type DataMap = HashMap<DataHash, (usize, Vec<u8>)>;
struct Repository {
config: Config,
posts: PostsMap,
/// (use_count, data)
data: DataMap,
/// Time of last expired posts GC
last_gc_time: Instant,
}
impl Repository {
fn new(config: Config) -> Self {
Repository {
config,
posts: Default::default(),
data: Default::default(),
last_gc_time: Instant::now(),
}
}
fn serve_delete(&mut self, req : &Request) -> Response {
let post_id = match self.request_to_post_id(req, true) {
Ok(Some(pid)) => pid,
Ok(None) => return error_with_text(400, "Post ID required."),
Err(resp) => return resp
};
self.delete_post(post_id);
Response::text("Deleted.")
}
fn serve_post_put(&mut self, req : &Request) -> Response {
let post_id = match self.request_to_post_id(req, true) {
Ok(pid) => {
if req.method() == "PUT" && pid.is_none() {
return error_with_text(400, "PUT requires a file ID!");
} else if req.method() == "POST" && pid.is_some() {
return error_with_text(400, "Use PUT to update a file!");
}
pid
},
Err(resp) => return resp
};
debug!("Submit new data, post ID: {:?}", post_id);
let mut data = vec![];
if let Some(body) = req.data() {
// Read up to 1 byte past the limit to catch too large uploads.
// We can't reply on the "Length" field, which is not present with chunked encoding.
body.take(self.config.max_file_size as u64 + 1).read_to_end(&mut data).unwrap();
if data.len() > self.config.max_file_size {
return empty_error(413);
}
} else {
return error_with_text(400, "Empty body!");
}
let mime = match req.header("Content-Type") {
None => None,
Some("application/x-www-form-urlencoded") => Some("text/plain"),
Some(v) => Some(v),
};
let expiry = match req.header("X-Expires") {
Some(text) => {
match text.parse() {
Ok(v) => {
let dur = Duration::from_secs(v);
if dur > self.config.max_expiry {
return error_with_text(400,
format!("Expiration time {} out of allowed range 0-{} s",
v,
self.config.max_expiry.as_secs()
));
}
Some(dur)
},
Err(_) => {
return error_with_text(400, "Malformed \"X-Expires\", use relative time in seconds.");
},
}
}
None => None
};
if let Some(id) = post_id {
// UPDATE
self.update(id, data, mime, expiry);
Response::text("Updated.")
} else {
// INSERT
let (id, token) = self.insert(data, mime, expiry.unwrap_or(self.config.default_expiry));
Response::text(format!("{:016x}", id))
.with_additional_header("X-Secret", format!("{:016x}", token))
}
}
fn serve_get_head(&mut self, req : &Request) -> Response {
let post_id = match self.request_to_post_id(req, false) {
Ok(Some(pid)) => pid,
Ok(None) => return error_with_text(400, "Post ID required."),
Err(resp) => return resp
};
if let Some(post) = self.posts.get(&post_id) {
if post.is_expired() {
warn!("GET of expired post!");
Response::empty_404()
} else {
let data = self.data.get(&post.hash);
if data.is_none() {
error!("No matching data!");
return error_with_text(500, "File data lost.");
}
Response {
status_code: 200,
headers: vec![("Content-Type".into(), format!("{}; charset=utf8", post.mime).into())],
data: if req.method() == "HEAD" {
ResponseBody::empty()
} else {
ResponseBody::from_data(data.unwrap().1.clone())
},
upgrade: None,
}
}
} else {
warn!("No such post!");
Response::empty_404()
}
}
fn request_to_post_id(&self, req : &Request, check_secret : bool) -> Result<Option<PostId>, Response> {
let url = req.url();
let stripped = url.trim_matches('/');
if stripped.is_empty() {
// No ID given
return Ok(None);
}
let id = match u64::from_str_radix(stripped, 16) {
Ok(bytes) => bytes,
Err(_) => {
return Err(error_with_text(400, "Bad file ID format!"));
},
};
if check_secret {
// Check the write token
match self.posts.get(&id) {
None/* | Some(_p) if _p.is_expired()*/ => {
return Err(error_with_text(404, "No file with this ID!"));
},
Some(post) => {
if post.is_expired() {
warn!("Access of expired post!");
return Err(error_with_text(404, "No file with this ID!"));
}
let secret: u64 = match req.header("X-Secret").map(|v| u64::from_str_radix(v, 16)) {
Some(Ok(bytes)) => bytes,
None => {
return Err(error_with_text(400, "X-Secret required!"));
}
Some(Err(e)) => {
warn!("{:?}", e);
return Err(error_with_text(400, "Bad secret format!"));
},
};
if post.secret != secret {
return Err(error_with_text(401, "Invalid secret!"));
}
},
}
}
// secret is now validated and we got an ID
Ok(Some(id))
}
fn gc_expired_posts_if_needed(&mut self) {
if self.last_gc_time.elapsed() > self.config.expired_gc_interval {
self.gc_expired_posts();
self.last_gc_time = Instant::now();
}
}
fn gc_expired_posts(&mut self) {
debug!("GC expired posts");
let mut to_rm = vec![];
for post in &self.posts {
if post.1.is_expired() {
to_rm.push(*post.0);
}
}
for id in to_rm {
debug!("Drop post ID {:016x}", id);
if let Some(post) = self.posts.remove(&id) {
Self::drop_data_or_decrement_rc(&mut self.data, post.hash);
}
}
}
fn hash_data(data : &Vec<u8>) -> DataHash {
let mut hasher = siphasher::sip::SipHasher::new();
data.hash(&mut hasher);
hasher.finish()
}
fn store_data_or_increment_rc(data_map : &mut DataMap, hash : u64, data: Vec<u8>) {
match data_map.get_mut(&hash) {
None => {
debug!("Store new data hash #{:016x}", hash);
data_map.insert(hash, (1, data));
},
Some(entry) => {
debug!("Link new use of data hash #{:016x}", hash);
entry.0 += 1; // increment use counter
},
}
}
fn drop_data_or_decrement_rc(data_map : &mut DataMap, hash : u64) {
if let Some(old_data) = data_map.get_mut(&hash) {
if old_data.0 > 1 {
old_data.0 -= 1;
debug!("Unlink use of data hash #{:016x} ({} remain)", hash, old_data.0);
} else {
debug!("Drop data hash #{:016x}", hash);
data_map.remove(&hash);
}
}
}
fn insert(&mut self, data : Vec<u8>, mime : Option<&str>, expires : Duration) -> (PostId, Secret) {
info!("Insert post with data of len {} bytes, mime {}, expiry {:?}",
data.len(), mime.unwrap_or("unspecified"),
expires);
let hash = Self::hash_data(&data);
Self::store_data_or_increment_rc(&mut self.data, hash, data);
let post_id = loop {
let id = OsRng.gen();
if !self.posts.contains_key(&id) {
break id;
}
};
let secret = OsRng.gen();
debug!("Data hash = #{:016x}", hash);
debug!("Post ID = #{:016x}", post_id);
debug!("Secret = #{:016x}", secret);
self.posts.insert(post_id, Post {
mime: mime.map(ToString::to_string).map(Cow::Owned)
.unwrap_or(Cow::Borrowed("application/octet-stream")),
hash,
secret,
expires: Instant::now() + expires
});
(post_id, secret)
}
fn update(&mut self, id : PostId, data : Vec<u8>, mime : Option<&str>, expires : Option<Duration>) {
info!("Update post id #{:016x} with data of len {} bytes, mime {}, expiry {}",
id, data.len(), mime.unwrap_or("unchanged"),
expires.map(|v| Cow::Owned(format!("{:?}", v)))
.unwrap_or("unchanged".into()));
let hash = Self::hash_data(&data);
let post = self.posts.get_mut(&id).unwrap(); // post existence was checked before
if hash != post.hash {
debug!("Data hash = #{:016x} (content changed)", hash);
Self::drop_data_or_decrement_rc(&mut self.data, post.hash);
Self::store_data_or_increment_rc(&mut self.data, hash, data);
post.hash = hash;
} else {
debug!("Data hash = #{:016x} (no change)", hash);
}
if let Some(mime) = mime {
if &post.mime != mime {
debug!("Content type changed to {}", mime);
post.mime = Cow::Owned(mime.to_string());
}
}
if let Some(exp) = expires {
debug!("Expiration changed to {:?} from now", exp);
post.expires = Instant::now() + exp;
}
}
fn delete_post(&mut self, id : PostId) {
info!("Delete post id #{:016x}", id);
let post = self.posts.remove(&id).unwrap(); // post existence was checked before
Self::drop_data_or_decrement_rc(&mut self.data, post.hash);
}
}
Loading…
Cancel
Save