Use WebSockets for events

master
leo60228 5 years ago committed by Paul Woolcock
parent b877d79831
commit 791bc83387
  1. 4
      Cargo.toml
  2. 10
      src/errors.rs
  3. 220
      src/lib.rs
  4. 2
      src/page.rs
  5. 2
      src/registration.rs

@ -18,13 +18,15 @@ serde_derive = "1"
serde_json = "1"
serde_urlencoded = "0.6.1"
serde_qs = "0.4.5"
url = "1"
tap-reader = "1"
try_from = "0.3.2"
toml = { version = "0.5.0", optional = true }
hyper-old-types = "0.11.0"
envy = { version = "0.4.0", optional = true }
log = "0.4.6"
tungstenite = "0.10.1"
url = "2.1.1"
url1x = { version = "1", package = "url" }
[dependencies.chrono]
version = "0.4"

@ -12,6 +12,8 @@ use tomlcrate::de::Error as TomlDeError;
#[cfg(feature = "toml")]
use tomlcrate::ser::Error as TomlSerError;
use url::ParseError as UrlError;
use url1x::ParseError as ReqwestUrlError;
use tungstenite::error::Error as WebSocketError;
/// Convience type over `std::result::Result` with `Error` as the error type.
pub type Result<T> = ::std::result::Result<T, Error>;
@ -33,6 +35,8 @@ pub enum Error {
Io(IoError),
/// Wrapper around the `url::ParseError` struct.
Url(UrlError),
/// Wrapper around the `url::ParseError` struct.
ReqwestUrl(ReqwestUrlError),
/// Missing Client Id.
ClientIdRequired,
/// Missing Client Secret.
@ -60,6 +64,8 @@ pub enum Error {
Envy(EnvyError),
/// Error serializing to a query string
SerdeQs(SerdeQsError),
/// WebSocket error
WebSocket(WebSocketError),
/// Other errors
Other(String),
}
@ -79,6 +85,7 @@ impl error::Error for Error {
Error::Http(ref e) => e,
Error::Io(ref e) => e,
Error::Url(ref e) => e,
Error::ReqwestUrl(ref e) => e,
#[cfg(feature = "toml")]
Error::TomlSer(ref e) => e,
#[cfg(feature = "toml")]
@ -88,6 +95,7 @@ impl error::Error for Error {
#[cfg(feature = "env")]
Error::Envy(ref e) => e,
Error::SerdeQs(ref e) => e,
Error::WebSocket(ref e) => e,
Error::Client(..) | Error::Server(..) => {
return None
@ -138,6 +146,7 @@ from! {
SerdeError, Serde,
UrlEncodedError, UrlEncoded,
UrlError, Url,
ReqwestUrlError, ReqwestUrl,
ApiError, Api,
#[cfg(feature = "toml")] TomlSerError, TomlSer,
#[cfg(feature = "toml")] TomlDeError, TomlDe,
@ -145,6 +154,7 @@ from! {
HeaderParseError, HeaderParseError,
#[cfg(feature = "env")] EnvyError, Envy,
SerdeQsError, SerdeQs,
WebSocketError, WebSocket,
String, Other,
}

@ -89,6 +89,8 @@ extern crate serde_urlencoded;
extern crate tap_reader;
extern crate try_from;
extern crate url;
extern crate url1x;
extern crate tungstenite;
#[cfg(feature = "env")]
extern crate envy;
@ -105,12 +107,13 @@ extern crate indoc;
use std::{
borrow::Cow,
io::{BufRead, BufReader},
io::BufRead,
ops,
};
use reqwest::{Client, RequestBuilder, Response};
use tap_reader::Tap;
use tungstenite::client::AutoStream;
use entities::prelude::*;
use http_send::{HttpSend, HttpSender};
@ -202,7 +205,7 @@ impl From<Data> for Mastodon<HttpSender> {
}
impl<H: HttpSend> MastodonClient<H> for Mastodon<H> {
type Stream = EventReader<BufReader<Response>>;
type Stream = EventReader<WebSocket>;
paged_routes! {
(get) favourites: "favourites" => Status,
@ -491,83 +494,180 @@ impl<H: HttpSend> MastodonClient<H> for Mastodon<H> {
/// # }
/// ```
fn streaming_user(&self) -> Result<Self::Stream> {
let response = self.send(self.client.get(&self.route("/api/v1/streaming/user")))?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
url.query_pairs_mut()
.append_pair("access_token", &self.token)
.append_pair("stream", "user");
let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
let new_scheme = match url.scheme() {
"http" => "ws",
"https" => "wss",
x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
};
url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
let client = tungstenite::connect(url.as_str())?.0;
Ok(EventReader(WebSocket(client)))
}
/// returns all public statuses
fn streaming_public(&self) -> Result<Self::Stream> {
let response = self.send(self.client.get(&self.route("/api/v1/streaming/public")))?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
url.query_pairs_mut()
.append_pair("access_token", &self.token)
.append_pair("stream", "public");
let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
let new_scheme = match url.scheme() {
"http" => "ws",
"https" => "wss",
x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
};
url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
let client = tungstenite::connect(url.as_str())?.0;
Ok(EventReader(WebSocket(client)))
}
/// Returns all local statuses
fn streaming_local(&self) -> Result<Self::Stream> {
let response = self.send(
self.client
.get(&self.route("/api/v1/streaming/public/local")),
)?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
url.query_pairs_mut()
.append_pair("access_token", &self.token)
.append_pair("stream", "public:local");
let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
let new_scheme = match url.scheme() {
"http" => "ws",
"https" => "wss",
x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
};
url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
let client = tungstenite::connect(url.as_str())?.0;
Ok(EventReader(WebSocket(client)))
}
/// Returns all public statuses for a particular hashtag
fn streaming_public_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
let response = self.send(
self.client
.get(&self.route(&format!("/api/v1/streaming/hashtag?tag={}", hashtag))),
)?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
url.query_pairs_mut()
.append_pair("access_token", &self.token)
.append_pair("stream", "hashtag")
.append_pair("tag", hashtag);
let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
let new_scheme = match url.scheme() {
"http" => "ws",
"https" => "wss",
x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
};
url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
let client = tungstenite::connect(url.as_str())?.0;
Ok(EventReader(WebSocket(client)))
}
/// Returns all local statuses for a particular hashtag
fn streaming_local_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
let response = self.send(
self.client
.get(&self.route(&format!("/api/v1/streaming/hashtag/local?tag={}", hashtag))),
)?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
url.query_pairs_mut()
.append_pair("access_token", &self.token)
.append_pair("stream", "hashtag:local")
.append_pair("tag", hashtag);
let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
let new_scheme = match url.scheme() {
"http" => "ws",
"https" => "wss",
x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
};
url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
let client = tungstenite::connect(url.as_str())?.0;
Ok(EventReader(WebSocket(client)))
}
/// Returns statuses for a list
fn streaming_list(&self, list_id: &str) -> Result<Self::Stream> {
let response = self.send(
self.client
.get(&self.route(&format!("/api/v1/streaming/list?list={}", list_id))),
)?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
url.query_pairs_mut()
.append_pair("access_token", &self.token)
.append_pair("stream", "list")
.append_pair("list", list_id);
let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
let new_scheme = match url.scheme() {
"http" => "ws",
"https" => "wss",
x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
};
url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
let client = tungstenite::connect(url.as_str())?.0;
Ok(EventReader(WebSocket(client)))
}
/// Returns all direct messages
fn streaming_direct(&self) -> Result<Self::Stream> {
let response = self.send(self.client.get(&self.route("/api/v1/streaming/direct")))?;
let reader = BufReader::new(response);
Ok(EventReader(reader))
let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
url.query_pairs_mut()
.append_pair("access_token", &self.token)
.append_pair("stream", "direct");
let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
let new_scheme = match url.scheme() {
"http" => "ws",
"https" => "wss",
x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
};
url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
let client = tungstenite::connect(url.as_str())?.0;
Ok(EventReader(WebSocket(client)))
}
}
#[derive(Debug)]
/// WebSocket newtype so that EventStream can be implemented without coherency issues
pub struct WebSocket(tungstenite::protocol::WebSocket<AutoStream>);
/// A type that streaming events can be read from
pub trait EventStream {
/// Read a message from this stream
fn read_message(&mut self) -> Result<String>;
}
impl<R: BufRead> EventStream for R {
fn read_message(&mut self) -> Result<String> {
let mut buf = String::new();
self.read_line(&mut buf)?;
Ok(buf)
}
}
impl EventStream for WebSocket {
fn read_message(&mut self) -> Result<String> {
Ok(self.0.read_message()?.into_text()?)
}
}
#[derive(Debug)]
/// Iterator that produces events from a mastodon streaming API event stream
pub struct EventReader<R: BufRead>(R);
impl<R: BufRead> Iterator for EventReader<R> {
pub struct EventReader<R: EventStream>(R);
impl<R: EventStream> Iterator for EventReader<R> {
type Item = Event;
fn next(&mut self) -> Option<Self::Item> {
let mut lines = Vec::new();
let mut tmp = String::new();
loop {
if let Ok(..) = self.0.read_line(&mut tmp) {
let line = tmp.trim().to_string();
tmp.clear();
if line.starts_with(":") {
if let Ok(line) = self.0.read_message() {
let line = line.trim().to_string();
if line.starts_with(":") || line.is_empty() {
continue;
}
if line.is_empty() && !lines.is_empty() {
lines.push(line);
if let Ok(event) = self.make_event(&lines) {
lines.clear();
return Some(event);
@ -575,40 +675,48 @@ impl<R: BufRead> Iterator for EventReader<R> {
continue;
}
}
lines.push(line);
}
}
}
}
impl<R: BufRead> EventReader<R> {
impl<R: EventStream> EventReader<R> {
fn make_event(&self, lines: &[String]) -> Result<Event> {
let event = lines
let event;
let data;
if let Some(event_line) = lines
.iter()
.find(|line| line.starts_with("event:"))
.ok_or_else(|| Error::Other("No `event:` line".to_string()))?;
let event = event[6..].trim();
let data = lines.iter().find(|line| line.starts_with("data:"));
{
event = event_line[6..].trim().to_string();
data = lines.iter().find(|line| line.starts_with("data:")).map(|x| x[5..].trim().to_string());
} else {
#[derive(Deserialize)]
struct Message {
pub event: String,
pub payload: Option<String>,
}
let message = serde_json::from_str::<Message>(&lines[0])?;
event = message.event;
data = message.payload;
}
let event: &str = &event;
Ok(match event {
"notification" => {
let data = data.ok_or_else(|| {
Error::Other("Missing `data` line for notification".to_string())
})?;
let data = data[5..].trim();
let notification = serde_json::from_str::<Notification>(&data)?;
Event::Notification(notification)
},
"update" => {
let data =
data.ok_or_else(|| Error::Other("Missing `data` line for update".to_string()))?;
let data = data[5..].trim();
let status = serde_json::from_str::<Status>(&data)?;
Event::Update(status)
},
"delete" => {
let data =
data.ok_or_else(|| Error::Other("Missing `data` line for delete".to_string()))?;
let data = data[5..].trim().to_string();
Event::Delete(data)
},
"filters_changed" => Event::FiltersChanged,
@ -668,7 +776,7 @@ impl<H: HttpSend> MastodonBuilder<H> {
pub struct MastodonUnauth<H: HttpSend = HttpSender> {
client: Client,
http_sender: H,
base: url::Url,
base: reqwest::Url,
}
impl MastodonUnauth<HttpSender> {
@ -682,13 +790,13 @@ impl MastodonUnauth<HttpSender> {
Ok(MastodonUnauth {
client: Client::new(),
http_sender: HttpSender,
base: url::Url::parse(&base)?,
base: reqwest::Url::parse(&base)?,
})
}
}
impl<H: HttpSend> MastodonUnauth<H> {
fn route(&self, url: &str) -> Result<url::Url> {
fn route(&self, url: &str) -> Result<reqwest::Url> {
Ok(self.base.join(url)?)
}

@ -3,7 +3,7 @@ use entities::itemsiter::ItemsIter;
use hyper_old_types::header::{parsing, Link, RelationType};
use reqwest::{header::LINK, Response};
use serde::Deserialize;
use url::Url;
use reqwest::Url;
use http_send::HttpSend;

@ -2,7 +2,7 @@ use std::borrow::Cow;
use reqwest::{Client, RequestBuilder, Response};
use try_from::TryInto;
use url::percent_encoding::{utf8_percent_encode, DEFAULT_ENCODE_SET};
use url1x::percent_encoding::{utf8_percent_encode, DEFAULT_ENCODE_SET};
use apps::{App, AppBuilder};
use http_send::{HttpSend, HttpSender};

Loading…
Cancel
Save