From 2b7114f2da954cb5a39f35d1721d2ea15e9c974e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Thu, 3 Oct 2019 11:56:39 +0200 Subject: [PATCH] blobpats reply --- src/ele.rs | 69 +++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 85 +++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 132 insertions(+), 22 deletions(-) diff --git a/src/ele.rs b/src/ele.rs index 58c9eac..af2a6a3 100644 --- a/src/ele.rs +++ b/src/ele.rs @@ -8,8 +8,10 @@ use elefren::{Registration, Mastodon}; use elefren::http_send::HttpSender; use elefren::helpers::cli; use elefren::scopes::Scopes; +use elefren::entities::prelude::{Status, Notification, Event}; use std::str::FromStr; use failure::Fallible; +use websocket::{WebSocketResult, OwnedMessage}; const KEY_OAUTH_REGISTRATION: &str = "oauth.registration"; const KEY_OAUTH_SESSION: &str = "oauth.session"; @@ -86,3 +88,70 @@ pub fn open_stream_websocket(session : &EleSession, stream_name : &str) -> Falli .expect("Error create ClientBuilder") .connect_secure(Some(connector))?) } + +#[derive(Debug)] +pub enum ParsedSocketMsg { + /// The socket closed + WsClose, + /// Server pings us + WsPing(Vec), + /// Update event + Event(Event), +} + +#[derive(Debug,Deserialize)] +pub struct StreamingApiRawMessage { + pub event : String, + pub payload: String, +} + +pub fn parse_stream_socket_msg(msg_result : WebSocketResult) -> Fallible> { + Ok(match msg_result { + Ok(OwnedMessage::Text(msg)) => { + if msg.is_empty() { + None + } else { + trace!("WS text msg: {}", msg); + Some(parse_raw_msg(serde_json::from_str::(&msg)?)?) + } + } + Ok(OwnedMessage::Ping(pld)) => { + trace!("WS ping request"); + Some(ParsedSocketMsg::WsPing(pld)) + } + Ok(OwnedMessage::Close(..)) => { + error!("WS closed by server."); + Some(ParsedSocketMsg::WsClose) + } + Ok(x) => { + warn!("Unhandled WS message: {:?}", x); + None + } + Err(e) => { + error!("WS error: {}", e); + Some(ParsedSocketMsg::WsClose) + } + }) +} + +fn parse_raw_msg(rawmsg : StreamingApiRawMessage) -> Fallible { + Ok(match &rawmsg.event[..] { + "notification" => { + let notification = serde_json::from_str::(&rawmsg.payload)?; + ParsedSocketMsg::Event(Event::Notification(notification)) + }, + "update" => { + let status = serde_json::from_str::(&rawmsg.payload)?; + ParsedSocketMsg::Event(Event::Update(status)) + }, + "delete" => { + ParsedSocketMsg::Event(Event::Delete(rawmsg.payload)) + }, + "filters_changed" => { + ParsedSocketMsg::Event(Event::FiltersChanged) + }, + _ => { + return Err(failure::format_err!("Invalid event type from streaming API: {}", &rawmsg.event)) + } + }) +} diff --git a/src/main.rs b/src/main.rs index 4c3d784..c5fc8a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,8 +17,12 @@ mod ele; use crate::bootstrap::handle_cli_args_and_load_config; use crate::store::Store; -use crate::ele::{EleSession, EleStreamSocket}; +use crate::ele::{EleSession, EleStreamSocket, parse_stream_socket_msg, ParsedSocketMsg}; use std::time::Duration; +use elefren::entities::prelude::*; +use elefren::entities::notification::NotificationType; +use elefren::{MastodonClient, StatusBuilder}; +use elefren::status_builder::Visibility; #[derive(SmartDefault,Serialize,Deserialize,Debug)] #[serde(default)] @@ -49,42 +53,79 @@ fn main() { debug!("Listening to events"); let m_session = session.clone(); - let handle = thread::spawn(move || handle_stream(m_session)); + let handle = thread::spawn(move || job_listen_to_stream(m_session, "user")); let _ = handle.join(); } -fn handle_stream(session : EleSession) -> ! { +/// Listen to a streaming API channel, reconnecting on failure. +fn job_listen_to_stream(session : EleSession, kind : &str) -> ! { loop { debug!("Trying to open websocket"); - match ele::open_stream_websocket(&session, "user") { - Ok(socket) => process_incoming_events(socket), + match ele::open_stream_websocket(&session, kind) { + Ok(socket) => process_incoming_events(&session, socket), Err(e) => error!("Error opening socket: {}", e) } debug!("Delay before reconnecting..."); - thread::sleep(Duration::from_secs(10)); + thread::sleep(Duration::from_secs(5)); } } -fn process_incoming_events(mut socket : EleStreamSocket) { - 'listen: for m in socket.incoming_messages() { - match m { - Ok(OwnedMessage::Text(text)) => { - debug!("Got msg: {}", text); - }, - Ok(OwnedMessage::Close(_text)) => { - debug!("Close"); - break 'listen; - }, - Ok(any) => { - debug!("Unhandled msg: {:?}", any); +/// Process events received from a streaming API websocket +fn process_incoming_events(session : &EleSession, mut socket : EleStreamSocket) { + 'listen: loop { + let recv_result = socket.recv_message(); + + if let Ok(Some(msg)) = parse_stream_socket_msg(recv_result) { + match msg { + ParsedSocketMsg::WsClose => { + debug!("Close"); + break 'listen; + }, + ParsedSocketMsg::WsPing(data) => { + debug!("Sending Pong"); + let _ = socket.send_message(&OwnedMessage::Pong(data)); + }, + ParsedSocketMsg::Event(event) => { + handle_received_event(session, event); + } } - Err(e) => { - error!("Error reading from socket: {}", e); - break 'listen; - }, + } + } +} + +/// Build the special syntax for a @ mention +fn make_mention(user : &Account) -> String { + format!(r#"@{handle}"#, + url=user.url, handle=user.acct + ) +} + +/// Do something with an event we received from the API +fn handle_received_event(session : &EleSession, event: Event) { + debug!("*** Event received:\n{:?}", event); + + match event { + Event::Notification(Notification { + notification_type: NotificationType::Mention, + account, status: Some(status), + .. + }) => { + debug!("Sending a reply."); + + let poste = StatusBuilder::new() + .status(format!("Hello {} :blobpats:", &make_mention(&account))) + .content_type("text/html") + .in_reply_to(status.id) + .visibility(Visibility::Unlisted) + .build().expect("error build status"); + + session.new_status(poste).expect("error send."); + } + _ => { + // } } }