From 791bc83387bbac4fb6ff46f4cf1153e37837b626 Mon Sep 17 00:00:00 2001 From: leo60228 Date: Mon, 25 May 2020 09:03:28 -0400 Subject: [PATCH] Use WebSockets for events --- Cargo.toml | 4 +- src/errors.rs | 10 ++ src/lib.rs | 230 ++++++++++++++++++++++++++++++++------------ src/page.rs | 2 +- src/registration.rs | 2 +- 5 files changed, 184 insertions(+), 64 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f18c2bf..4f03ffb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,13 +18,15 @@ serde_derive = "1" serde_json = "1" serde_urlencoded = "0.6.1" serde_qs = "0.4.5" -url = "1" tap-reader = "1" try_from = "0.3.2" toml = { version = "0.5.0", optional = true } hyper-old-types = "0.11.0" envy = { version = "0.4.0", optional = true } log = "0.4.6" +tungstenite = "0.10.1" +url = "2.1.1" +url1x = { version = "1", package = "url" } [dependencies.chrono] version = "0.4" diff --git a/src/errors.rs b/src/errors.rs index b48cb0c..65e5fcc 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -12,6 +12,8 @@ use tomlcrate::de::Error as TomlDeError; #[cfg(feature = "toml")] use tomlcrate::ser::Error as TomlSerError; use url::ParseError as UrlError; +use url1x::ParseError as ReqwestUrlError; +use tungstenite::error::Error as WebSocketError; /// Convience type over `std::result::Result` with `Error` as the error type. pub type Result = ::std::result::Result; @@ -33,6 +35,8 @@ pub enum Error { Io(IoError), /// Wrapper around the `url::ParseError` struct. Url(UrlError), + /// Wrapper around the `url::ParseError` struct. + ReqwestUrl(ReqwestUrlError), /// Missing Client Id. ClientIdRequired, /// Missing Client Secret. @@ -60,6 +64,8 @@ pub enum Error { Envy(EnvyError), /// Error serializing to a query string SerdeQs(SerdeQsError), + /// WebSocket error + WebSocket(WebSocketError), /// Other errors Other(String), } @@ -79,6 +85,7 @@ impl error::Error for Error { Error::Http(ref e) => e, Error::Io(ref e) => e, Error::Url(ref e) => e, + Error::ReqwestUrl(ref e) => e, #[cfg(feature = "toml")] Error::TomlSer(ref e) => e, #[cfg(feature = "toml")] @@ -88,6 +95,7 @@ impl error::Error for Error { #[cfg(feature = "env")] Error::Envy(ref e) => e, Error::SerdeQs(ref e) => e, + Error::WebSocket(ref e) => e, Error::Client(..) | Error::Server(..) => { return None @@ -138,6 +146,7 @@ from! { SerdeError, Serde, UrlEncodedError, UrlEncoded, UrlError, Url, + ReqwestUrlError, ReqwestUrl, ApiError, Api, #[cfg(feature = "toml")] TomlSerError, TomlSer, #[cfg(feature = "toml")] TomlDeError, TomlDe, @@ -145,6 +154,7 @@ from! { HeaderParseError, HeaderParseError, #[cfg(feature = "env")] EnvyError, Envy, SerdeQsError, SerdeQs, + WebSocketError, WebSocket, String, Other, } diff --git a/src/lib.rs b/src/lib.rs index 4661784..b82099c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,8 @@ extern crate serde_urlencoded; extern crate tap_reader; extern crate try_from; extern crate url; +extern crate url1x; +extern crate tungstenite; #[cfg(feature = "env")] extern crate envy; @@ -105,12 +107,13 @@ extern crate indoc; use std::{ borrow::Cow, - io::{BufRead, BufReader}, + io::BufRead, ops, }; use reqwest::{Client, RequestBuilder, Response}; use tap_reader::Tap; +use tungstenite::client::AutoStream; use entities::prelude::*; use http_send::{HttpSend, HttpSender}; @@ -202,7 +205,7 @@ impl From for Mastodon { } impl MastodonClient for Mastodon { - type Stream = EventReader>; + type Stream = EventReader; paged_routes! { (get) favourites: "favourites" => Status, @@ -491,124 +494,229 @@ impl MastodonClient for Mastodon { /// # } /// ``` fn streaming_user(&self) -> Result { - let response = self.send(self.client.get(&self.route("/api/v1/streaming/user")))?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "user"); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// returns all public statuses fn streaming_public(&self) -> Result { - let response = self.send(self.client.get(&self.route("/api/v1/streaming/public")))?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "public"); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns all local statuses fn streaming_local(&self) -> Result { - let response = self.send( - self.client - .get(&self.route("/api/v1/streaming/public/local")), - )?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "public:local"); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns all public statuses for a particular hashtag fn streaming_public_hashtag(&self, hashtag: &str) -> Result { - let response = self.send( - self.client - .get(&self.route(&format!("/api/v1/streaming/hashtag?tag={}", hashtag))), - )?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "hashtag") + .append_pair("tag", hashtag); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns all local statuses for a particular hashtag fn streaming_local_hashtag(&self, hashtag: &str) -> Result { - let response = self.send( - self.client - .get(&self.route(&format!("/api/v1/streaming/hashtag/local?tag={}", hashtag))), - )?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "hashtag:local") + .append_pair("tag", hashtag); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns statuses for a list fn streaming_list(&self, list_id: &str) -> Result { - let response = self.send( - self.client - .get(&self.route(&format!("/api/v1/streaming/list?list={}", list_id))), - )?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "list") + .append_pair("list", list_id); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns all direct messages fn streaming_direct(&self) -> Result { - let response = self.send(self.client.get(&self.route("/api/v1/streaming/direct")))?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "direct"); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) + } +} + +#[derive(Debug)] +/// WebSocket newtype so that EventStream can be implemented without coherency issues +pub struct WebSocket(tungstenite::protocol::WebSocket); + +/// A type that streaming events can be read from +pub trait EventStream { + /// Read a message from this stream + fn read_message(&mut self) -> Result; +} + +impl EventStream for R { + fn read_message(&mut self) -> Result { + let mut buf = String::new(); + self.read_line(&mut buf)?; + Ok(buf) + } +} + +impl EventStream for WebSocket { + fn read_message(&mut self) -> Result { + Ok(self.0.read_message()?.into_text()?) } } #[derive(Debug)] /// Iterator that produces events from a mastodon streaming API event stream -pub struct EventReader(R); -impl Iterator for EventReader { +pub struct EventReader(R); +impl Iterator for EventReader { type Item = Event; fn next(&mut self) -> Option { let mut lines = Vec::new(); - let mut tmp = String::new(); loop { - if let Ok(..) = self.0.read_line(&mut tmp) { - let line = tmp.trim().to_string(); - tmp.clear(); - if line.starts_with(":") { + if let Ok(line) = self.0.read_message() { + let line = line.trim().to_string(); + if line.starts_with(":") || line.is_empty() { continue; } - if line.is_empty() && !lines.is_empty() { - if let Ok(event) = self.make_event(&lines) { - lines.clear(); - return Some(event); - } else { - continue; - } - } lines.push(line); + if let Ok(event) = self.make_event(&lines) { + lines.clear(); + return Some(event); + } else { + continue; + } } } } } -impl EventReader { +impl EventReader { fn make_event(&self, lines: &[String]) -> Result { - let event = lines + let event; + let data; + if let Some(event_line) = lines .iter() .find(|line| line.starts_with("event:")) - .ok_or_else(|| Error::Other("No `event:` line".to_string()))?; - let event = event[6..].trim(); - let data = lines.iter().find(|line| line.starts_with("data:")); + { + event = event_line[6..].trim().to_string(); + data = lines.iter().find(|line| line.starts_with("data:")).map(|x| x[5..].trim().to_string()); + } else { + #[derive(Deserialize)] + struct Message { + pub event: String, + pub payload: Option, + } + let message = serde_json::from_str::(&lines[0])?; + event = message.event; + data = message.payload; + } + let event: &str = &event; Ok(match event { "notification" => { let data = data.ok_or_else(|| { Error::Other("Missing `data` line for notification".to_string()) })?; - let data = data[5..].trim(); let notification = serde_json::from_str::(&data)?; Event::Notification(notification) }, "update" => { let data = data.ok_or_else(|| Error::Other("Missing `data` line for update".to_string()))?; - let data = data[5..].trim(); let status = serde_json::from_str::(&data)?; Event::Update(status) }, "delete" => { let data = data.ok_or_else(|| Error::Other("Missing `data` line for delete".to_string()))?; - let data = data[5..].trim().to_string(); Event::Delete(data) }, "filters_changed" => Event::FiltersChanged, @@ -668,7 +776,7 @@ impl MastodonBuilder { pub struct MastodonUnauth { client: Client, http_sender: H, - base: url::Url, + base: reqwest::Url, } impl MastodonUnauth { @@ -682,13 +790,13 @@ impl MastodonUnauth { Ok(MastodonUnauth { client: Client::new(), http_sender: HttpSender, - base: url::Url::parse(&base)?, + base: reqwest::Url::parse(&base)?, }) } } impl MastodonUnauth { - fn route(&self, url: &str) -> Result { + fn route(&self, url: &str) -> Result { Ok(self.base.join(url)?) } diff --git a/src/page.rs b/src/page.rs index 3a47ce6..09a2dca 100644 --- a/src/page.rs +++ b/src/page.rs @@ -3,7 +3,7 @@ use entities::itemsiter::ItemsIter; use hyper_old_types::header::{parsing, Link, RelationType}; use reqwest::{header::LINK, Response}; use serde::Deserialize; -use url::Url; +use reqwest::Url; use http_send::HttpSend; diff --git a/src/registration.rs b/src/registration.rs index 7aba7c5..8417b40 100644 --- a/src/registration.rs +++ b/src/registration.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use reqwest::{Client, RequestBuilder, Response}; use try_from::TryInto; -use url::percent_encoding::{utf8_percent_encode, DEFAULT_ENCODE_SET}; +use url1x::percent_encoding::{utf8_percent_encode, DEFAULT_ENCODE_SET}; use apps::{App, AppBuilder}; use http_send::{HttpSend, HttpSender};