Initial commit of streaming support

master
Paul Woolcock 5 years ago
parent 5e69ad4b33
commit e2c11f48d6
  1. 14
      src/entities/event.rs
  2. 3
      src/entities/mod.rs
  3. 136
      src/lib.rs
  4. 39
      src/mastodon_client.rs

@ -0,0 +1,14 @@
use entities::{notification::Notification, status::Status};
#[derive(Debug, Clone)]
/// Events that come from the /streaming/user API call
pub enum Event {
/// Update event
Update(Status),
/// Notification event
Notification(Notification),
/// Delete event
Delete(String),
/// FiltersChanged event
FiltersChanged,
}

@ -6,6 +6,8 @@ pub mod attachment;
pub mod card;
/// Data structures for ser/de of contetx-related resources
pub mod context;
/// Data structures for ser/de of streaming events
pub mod event;
/// Data structures for ser/de of filter-related resources
pub mod filter;
/// Data structures for ser/de of instance-related resources
@ -41,6 +43,7 @@ pub mod prelude {
attachment::{Attachment, MediaType},
card::Card,
context::Context,
event::Event,
filter::{Filter, FilterContext},
instance::*,
list::List,

@ -71,7 +71,11 @@ extern crate tempfile;
#[cfg_attr(all(test, any(feature = "toml", feature = "json")), macro_use)]
extern crate indoc;
use std::{borrow::Cow, ops};
use std::{
borrow::Cow,
io::{BufRead, BufReader},
ops,
};
use reqwest::{Client, RequestBuilder, Response};
use tap_reader::Tap;
@ -165,6 +169,8 @@ impl From<Data> for Mastodon<HttpSender> {
}
impl<H: HttpSend> MastodonClient<H> for Mastodon<H> {
type Stream = EventReader<BufReader<Response>>;
paged_routes! {
(get) favourites: "favourites" => Status,
(get) blocks: "blocks" => Account,
@ -419,6 +425,134 @@ impl<H: HttpSend> MastodonClient<H> for Mastodon<H> {
let me = self.verify_credentials()?;
Ok(self.following(&me.id)?)
}
/// returns events that are relevant to the authorized user, i.e. home
/// timeline & notifications
fn streaming_user(&self) -> Result<Self::Stream> {
let response = self.send(self.client.get(&self.route("/api/v1/streaming/user")))?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
}
/// returns all public statuses
fn streaming_public(&self) -> Result<Self::Stream> {
let response = self.send(self.client.get(&self.route("/api/v1/streaming/public")))?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
}
/// Returns all local statuses
fn streaming_local(&self) -> Result<Self::Stream> {
let response = self.send(
self.client
.get(&self.route("/api/v1/streaming/public/local")),
)?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
}
/// Returns all public statuses for a particular hashtag
fn streaming_public_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
let response = self.send(
self.client
.get(&self.route(&format!("/api/v1/streaming/hashtag?tag={}", hashtag))),
)?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
}
/// Returns all local statuses for a particular hashtag
fn streaming_local_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
let response = self.send(
self.client
.get(&self.route(&format!("/api/v1/streaming/hashtag/local?tag={}", hashtag))),
)?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
}
/// Returns statuses for a list
fn streaming_list(&self, list_id: &str) -> Result<Self::Stream> {
let response = self.send(
self.client
.get(&self.route(&format!("/api/v1/streaming/list?list={}", list_id))),
)?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
}
/// Returns all direct messages
fn streaming_direct(&self) -> Result<Self::Stream> {
let response = self.send(self.client.get(&self.route("/api/v1/streaming/direct")))?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
}
}
#[derive(Debug)]
/// Iterator that produces events from a mastodon streaming API event stream
pub struct EventReader<R: BufRead>(R);
impl<R: BufRead> Iterator for EventReader<R> {
type Item = Event;
fn next(&mut self) -> Option<Self::Item> {
let mut lines = Vec::new();
let mut tmp = String::new();
loop {
if let Ok(..) = self.0.read_line(&mut tmp) {
let line = dbg!(tmp.trim().to_string());
tmp.clear();
if dbg!(line.starts_with(":")) {
continue;
}
if dbg!(line.is_empty() && !lines.is_empty()) {
if let Ok(event) = dbg!(self.make_event(&lines)) {
lines.clear();
return Some(event);
} else {
continue;
}
}
lines.push(line);
}
}
}
}
impl<R: BufRead> EventReader<R> {
fn make_event(&self, lines: &[String]) -> Result<Event> {
let event = lines
.iter()
.find(|line| line.starts_with("event:"))
.ok_or_else(|| Error::Other("No `event:` line".to_string()))?;
let event = event[6..].trim();
let data = lines.iter().find(|line| line.starts_with("data:"));
Ok(match event {
"notification" => {
let data = data.ok_or_else(|| {
Error::Other("Missing `data` line for notification".to_string())
})?;
let data = data[5..].trim();
let notification = serde_json::from_str::<Notification>(&data)?;
Event::Notification(notification)
},
"update" => {
let data =
data.ok_or_else(|| Error::Other("Missing `data` line for update".to_string()))?;
let data = data[5..].trim();
let status = serde_json::from_str::<Status>(&data)?;
Event::Update(status)
},
"delete" => {
let data =
data.ok_or_else(|| Error::Other("Missing `data` line for delete".to_string()))?;
let data = data[5..].trim().to_string();
Event::Delete(data)
},
"filters_changed" => Event::FiltersChanged,
_ => return Err(Error::Other(format!("Unknown event `{}`", event))),
})
}
}
impl<H: HttpSend> ops::Deref for Mastodon<H> {

@ -17,6 +17,9 @@ use status_builder::StatusBuilder;
/// implementations might be swapped out for testing
#[allow(unused)]
pub trait MastodonClient<H: HttpSend = HttpSender> {
/// Type that wraps streaming API streams
type Stream: Iterator<Item = Event>;
/// GET /api/v1/favourites
fn favourites(&self) -> Result<Page<Status, H>> {
unimplemented!("This method was not implemented");
@ -316,4 +319,40 @@ pub trait MastodonClient<H: HttpSend = HttpSender> {
fn followed_by_me(&self) -> Result<Page<Account, H>> {
unimplemented!("This method was not implemented");
}
/// Returns events that are relevant to the authorized user, i.e. home
/// timeline and notifications
fn streaming_user(&self) -> Result<Self::Stream> {
unimplemented!("This method was not implemented");
}
/// Returns all public statuses
fn streaming_public(&self) -> Result<Self::Stream> {
unimplemented!("This method was not implemented");
}
/// Returns all local statuses
fn streaming_local(&self) -> Result<Self::Stream> {
unimplemented!("This method was not implemented");
}
/// Returns all public statuses for a particular hashtag
fn streaming_public_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
unimplemented!("This method was not implemented");
}
/// Returns all local statuses for a particular hashtag
fn streaming_local_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
unimplemented!("This method was not implemented");
}
/// Returns statuses for a list
fn streaming_list(&self, list_id: &str) -> Result<Self::Stream> {
unimplemented!("This method was not implemented");
}
/// Returns all direct messages
fn streaming_direct(&self) -> Result<Self::Stream> {
unimplemented!("This method was not implemented");
}
}

Loading…
Cancel
Save