blobpats reply

master
Ondřej Hruška 5 years ago
parent 176bde9d01
commit 2b7114f2da
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 69
      src/ele.rs
  2. 79
      src/main.rs

@ -8,8 +8,10 @@ use elefren::{Registration, Mastodon};
use elefren::http_send::HttpSender;
use elefren::helpers::cli;
use elefren::scopes::Scopes;
use elefren::entities::prelude::{Status, Notification, Event};
use std::str::FromStr;
use failure::Fallible;
use websocket::{WebSocketResult, OwnedMessage};
const KEY_OAUTH_REGISTRATION: &str = "oauth.registration";
const KEY_OAUTH_SESSION: &str = "oauth.session";
@ -86,3 +88,70 @@ pub fn open_stream_websocket(session : &EleSession, stream_name : &str) -> Falli
.expect("Error create ClientBuilder")
.connect_secure(Some(connector))?)
}
#[derive(Debug)]
pub enum ParsedSocketMsg {
/// The socket closed
WsClose,
/// Server pings us
WsPing(Vec<u8>),
/// Update event
Event(Event),
}
#[derive(Debug,Deserialize)]
pub struct StreamingApiRawMessage {
pub event : String,
pub payload: String,
}
pub fn parse_stream_socket_msg(msg_result : WebSocketResult<OwnedMessage>) -> Fallible<Option<ParsedSocketMsg>> {
Ok(match msg_result {
Ok(OwnedMessage::Text(msg)) => {
if msg.is_empty() {
None
} else {
trace!("WS text msg: {}", msg);
Some(parse_raw_msg(serde_json::from_str::<StreamingApiRawMessage>(&msg)?)?)
}
}
Ok(OwnedMessage::Ping(pld)) => {
trace!("WS ping request");
Some(ParsedSocketMsg::WsPing(pld))
}
Ok(OwnedMessage::Close(..)) => {
error!("WS closed by server.");
Some(ParsedSocketMsg::WsClose)
}
Ok(x) => {
warn!("Unhandled WS message: {:?}", x);
None
}
Err(e) => {
error!("WS error: {}", e);
Some(ParsedSocketMsg::WsClose)
}
})
}
fn parse_raw_msg(rawmsg : StreamingApiRawMessage) -> Fallible<ParsedSocketMsg> {
Ok(match &rawmsg.event[..] {
"notification" => {
let notification = serde_json::from_str::<Notification>(&rawmsg.payload)?;
ParsedSocketMsg::Event(Event::Notification(notification))
},
"update" => {
let status = serde_json::from_str::<Status>(&rawmsg.payload)?;
ParsedSocketMsg::Event(Event::Update(status))
},
"delete" => {
ParsedSocketMsg::Event(Event::Delete(rawmsg.payload))
},
"filters_changed" => {
ParsedSocketMsg::Event(Event::FiltersChanged)
},
_ => {
return Err(failure::format_err!("Invalid event type from streaming API: {}", &rawmsg.event))
}
})
}

@ -17,8 +17,12 @@ mod ele;
use crate::bootstrap::handle_cli_args_and_load_config;
use crate::store::Store;
use crate::ele::{EleSession, EleStreamSocket};
use crate::ele::{EleSession, EleStreamSocket, parse_stream_socket_msg, ParsedSocketMsg};
use std::time::Duration;
use elefren::entities::prelude::*;
use elefren::entities::notification::NotificationType;
use elefren::{MastodonClient, StatusBuilder};
use elefren::status_builder::Visibility;
#[derive(SmartDefault,Serialize,Deserialize,Debug)]
#[serde(default)]
@ -49,42 +53,79 @@ fn main() {
debug!("Listening to events");
let m_session = session.clone();
let handle = thread::spawn(move || handle_stream(m_session));
let handle = thread::spawn(move || job_listen_to_stream(m_session, "user"));
let _ = handle.join();
}
fn handle_stream(session : EleSession) -> ! {
/// Listen to a streaming API channel, reconnecting on failure.
fn job_listen_to_stream(session : EleSession, kind : &str) -> ! {
loop {
debug!("Trying to open websocket");
match ele::open_stream_websocket(&session, "user") {
Ok(socket) => process_incoming_events(socket),
match ele::open_stream_websocket(&session, kind) {
Ok(socket) => process_incoming_events(&session, socket),
Err(e) => error!("Error opening socket: {}", e)
}
debug!("Delay before reconnecting...");
thread::sleep(Duration::from_secs(10));
thread::sleep(Duration::from_secs(5));
}
}
fn process_incoming_events(mut socket : EleStreamSocket) {
'listen: for m in socket.incoming_messages() {
match m {
Ok(OwnedMessage::Text(text)) => {
debug!("Got msg: {}", text);
},
Ok(OwnedMessage::Close(_text)) => {
/// Process events received from a streaming API websocket
fn process_incoming_events(session : &EleSession, mut socket : EleStreamSocket) {
'listen: loop {
let recv_result = socket.recv_message();
if let Ok(Some(msg)) = parse_stream_socket_msg(recv_result) {
match msg {
ParsedSocketMsg::WsClose => {
debug!("Close");
break 'listen;
},
Ok(any) => {
debug!("Unhandled msg: {:?}", any);
}
Err(e) => {
error!("Error reading from socket: {}", e);
break 'listen;
ParsedSocketMsg::WsPing(data) => {
debug!("Sending Pong");
let _ = socket.send_message(&OwnedMessage::Pong(data));
},
ParsedSocketMsg::Event(event) => {
handle_received_event(session, event);
}
}
}
}
}
/// Build the special syntax for a @ mention
fn make_mention(user : &Account) -> String {
format!(r#"<span class="h-card"><a class="u-url mention" href="{url}">@<span>{handle}</span></a></span>"#,
url=user.url, handle=user.acct
)
}
/// Do something with an event we received from the API
fn handle_received_event(session : &EleSession, event: Event) {
debug!("*** Event received:\n{:?}", event);
match event {
Event::Notification(Notification {
notification_type: NotificationType::Mention,
account, status: Some(status),
..
}) => {
debug!("Sending a reply.");
let poste = StatusBuilder::new()
.status(format!("Hello {} :blobpats:", &make_mention(&account)))
.content_type("text/html")
.in_reply_to(status.id)
.visibility(Visibility::Unlisted)
.build().expect("error build status");
session.new_status(poste).expect("error send.");
}
_ => {
//
}
}
}

Loading…
Cancel
Save