diff --git a/Cargo.toml b/Cargo.toml index f93340a..797081f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ tokio-util = { version = "0.6", features = [ "io" ] } tokio-stream = "0.1.7" chrono = { version = "0.4", features = ["serde"] } thiserror = "1" +futures-util = "0.3.16" [dev-dependencies] tempfile = "3.0.3" diff --git a/src/streaming.rs b/src/streaming.rs index 8effe91..d865043 100644 --- a/src/streaming.rs +++ b/src/streaming.rs @@ -8,6 +8,7 @@ use std::task::Poll; use tokio_tungstenite::tungstenite::Message; use crate::entities::notification::Notification; use crate::entities::status::Status; +use futures_util::sink::SinkExt; #[derive(Clone, Debug)] pub enum StreamKind<'a> { @@ -88,6 +89,10 @@ impl EventReader { lines: vec![] } } + + pub async fn send_ping(&mut self) -> std::result::Result<(), tokio_tungstenite::tungstenite::Error> { + self.stream.send(Message::Ping("pleroma groups".as_bytes().to_vec())).await + } } impl Stream for EventReader { @@ -114,6 +119,7 @@ impl Stream for EventReader { } Poll::Ready(Some(Ok(Message::Ping(_)))) | Poll::Ready(Some(Ok(Message::Pong(_)))) => { // Discard + trace!("Ping/Pong, discard"); Poll::Pending } Poll::Ready(Some(Ok(Message::Binary(_)))) => { @@ -121,6 +127,7 @@ impl Stream for EventReader { Poll::Pending } Poll::Ready(Some(Ok(Message::Close(_)))) => { + warn!("Websocket close frame!"); Poll::Ready(None) } Poll::Ready(Some(Err(error))) => {