diff --git a/src/entities/event.rs b/src/entities/event.rs new file mode 100644 index 0000000..101c677 --- /dev/null +++ b/src/entities/event.rs @@ -0,0 +1,14 @@ +use entities::{notification::Notification, status::Status}; + +#[derive(Debug, Clone)] +/// Events that come from the /streaming/user API call +pub enum Event { + /// Update event + Update(Status), + /// Notification event + Notification(Notification), + /// Delete event + Delete(String), + /// FiltersChanged event + FiltersChanged, +} diff --git a/src/entities/mod.rs b/src/entities/mod.rs index 69e13bf..d5ad373 100644 --- a/src/entities/mod.rs +++ b/src/entities/mod.rs @@ -6,6 +6,8 @@ pub mod attachment; pub mod card; /// Data structures for ser/de of contetx-related resources pub mod context; +/// Data structures for ser/de of streaming events +pub mod event; /// Data structures for ser/de of filter-related resources pub mod filter; /// Data structures for ser/de of instance-related resources @@ -41,6 +43,7 @@ pub mod prelude { attachment::{Attachment, MediaType}, card::Card, context::Context, + event::Event, filter::{Filter, FilterContext}, instance::*, list::List, diff --git a/src/lib.rs b/src/lib.rs index 0b1956a..0db5db1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,7 +71,11 @@ extern crate tempfile; #[cfg_attr(all(test, any(feature = "toml", feature = "json")), macro_use)] extern crate indoc; -use std::{borrow::Cow, ops}; +use std::{ + borrow::Cow, + io::{BufRead, BufReader}, + ops, +}; use reqwest::{Client, RequestBuilder, Response}; use tap_reader::Tap; @@ -165,6 +169,8 @@ impl From for Mastodon { } impl MastodonClient for Mastodon { + type Stream = EventReader>; + paged_routes! { (get) favourites: "favourites" => Status, (get) blocks: "blocks" => Account, @@ -419,6 +425,134 @@ impl MastodonClient for Mastodon { let me = self.verify_credentials()?; Ok(self.following(&me.id)?) } + + /// returns events that are relevant to the authorized user, i.e. home + /// timeline & notifications + fn streaming_user(&self) -> Result { + let response = self.send(self.client.get(&self.route("/api/v1/streaming/user")))?; + let reader = BufReader::new(response); + Ok(EventReader(reader)) + } + + /// returns all public statuses + fn streaming_public(&self) -> Result { + let response = self.send(self.client.get(&self.route("/api/v1/streaming/public")))?; + let reader = BufReader::new(response); + Ok(EventReader(reader)) + } + + /// Returns all local statuses + fn streaming_local(&self) -> Result { + let response = self.send( + self.client + .get(&self.route("/api/v1/streaming/public/local")), + )?; + let reader = BufReader::new(response); + Ok(EventReader(reader)) + } + + /// Returns all public statuses for a particular hashtag + fn streaming_public_hashtag(&self, hashtag: &str) -> Result { + let response = self.send( + self.client + .get(&self.route(&format!("/api/v1/streaming/hashtag?tag={}", hashtag))), + )?; + let reader = BufReader::new(response); + Ok(EventReader(reader)) + } + + /// Returns all local statuses for a particular hashtag + fn streaming_local_hashtag(&self, hashtag: &str) -> Result { + let response = self.send( + self.client + .get(&self.route(&format!("/api/v1/streaming/hashtag/local?tag={}", hashtag))), + )?; + let reader = BufReader::new(response); + Ok(EventReader(reader)) + } + + /// Returns statuses for a list + fn streaming_list(&self, list_id: &str) -> Result { + let response = self.send( + self.client + .get(&self.route(&format!("/api/v1/streaming/list?list={}", list_id))), + )?; + let reader = BufReader::new(response); + Ok(EventReader(reader)) + } + + /// Returns all direct messages + fn streaming_direct(&self) -> Result { + let response = self.send(self.client.get(&self.route("/api/v1/streaming/direct")))?; + let reader = BufReader::new(response); + Ok(EventReader(reader)) + } +} + +#[derive(Debug)] +/// Iterator that produces events from a mastodon streaming API event stream +pub struct EventReader(R); +impl Iterator for EventReader { + type Item = Event; + + fn next(&mut self) -> Option { + let mut lines = Vec::new(); + let mut tmp = String::new(); + loop { + if let Ok(..) = self.0.read_line(&mut tmp) { + let line = dbg!(tmp.trim().to_string()); + tmp.clear(); + if dbg!(line.starts_with(":")) { + continue; + } + if dbg!(line.is_empty() && !lines.is_empty()) { + if let Ok(event) = dbg!(self.make_event(&lines)) { + lines.clear(); + return Some(event); + } else { + continue; + } + } + lines.push(line); + } + } + } +} + +impl EventReader { + fn make_event(&self, lines: &[String]) -> Result { + let event = lines + .iter() + .find(|line| line.starts_with("event:")) + .ok_or_else(|| Error::Other("No `event:` line".to_string()))?; + let event = event[6..].trim(); + let data = lines.iter().find(|line| line.starts_with("data:")); + Ok(match event { + "notification" => { + let data = data.ok_or_else(|| { + Error::Other("Missing `data` line for notification".to_string()) + })?; + let data = data[5..].trim(); + let notification = serde_json::from_str::(&data)?; + Event::Notification(notification) + }, + "update" => { + let data = + data.ok_or_else(|| Error::Other("Missing `data` line for update".to_string()))?; + let data = data[5..].trim(); + let status = serde_json::from_str::(&data)?; + Event::Update(status) + }, + "delete" => { + let data = + data.ok_or_else(|| Error::Other("Missing `data` line for delete".to_string()))?; + let data = data[5..].trim().to_string(); + Event::Delete(data) + }, + "filters_changed" => Event::FiltersChanged, + _ => return Err(Error::Other(format!("Unknown event `{}`", event))), + }) + } } impl ops::Deref for Mastodon { diff --git a/src/mastodon_client.rs b/src/mastodon_client.rs index 559f079..c5fee0e 100644 --- a/src/mastodon_client.rs +++ b/src/mastodon_client.rs @@ -17,6 +17,9 @@ use status_builder::StatusBuilder; /// implementations might be swapped out for testing #[allow(unused)] pub trait MastodonClient { + /// Type that wraps streaming API streams + type Stream: Iterator; + /// GET /api/v1/favourites fn favourites(&self) -> Result> { unimplemented!("This method was not implemented"); @@ -316,4 +319,40 @@ pub trait MastodonClient { fn followed_by_me(&self) -> Result> { unimplemented!("This method was not implemented"); } + + /// Returns events that are relevant to the authorized user, i.e. home + /// timeline and notifications + fn streaming_user(&self) -> Result { + unimplemented!("This method was not implemented"); + } + + /// Returns all public statuses + fn streaming_public(&self) -> Result { + unimplemented!("This method was not implemented"); + } + + /// Returns all local statuses + fn streaming_local(&self) -> Result { + unimplemented!("This method was not implemented"); + } + + /// Returns all public statuses for a particular hashtag + fn streaming_public_hashtag(&self, hashtag: &str) -> Result { + unimplemented!("This method was not implemented"); + } + + /// Returns all local statuses for a particular hashtag + fn streaming_local_hashtag(&self, hashtag: &str) -> Result { + unimplemented!("This method was not implemented"); + } + + /// Returns statuses for a list + fn streaming_list(&self, list_id: &str) -> Result { + unimplemented!("This method was not implemented"); + } + + /// Returns all direct messages + fn streaming_direct(&self) -> Result { + unimplemented!("This method was not implemented"); + } }