use std::sync::Arc; use std::time::{Duration, Instant}; use elefren::debug::EventDisplay; use elefren::debug::NotificationDisplay; use elefren::debug::StatusDisplay; use elefren::entities::account::Account; use elefren::entities::event::Event; use elefren::entities::notification::{Notification, NotificationType}; use elefren::entities::status::Status; use elefren::status_builder::Visibility; use elefren::{FediClient, StatusBuilder}; use futures::StreamExt; use handle_mention::ProcessMention; use crate::command::StatusCommand; use crate::error::GroupError; use crate::store::CommonConfig; use crate::store::GroupConfig; use crate::utils::{normalize_acct, LogError, VisExt}; mod handle_mention; /// This is one group's config store capable of persistence #[derive(Debug)] pub struct GroupHandle { pub group_account: Account, pub client: FediClient, pub config: GroupConfig, pub cc: Arc, } // TODO move other options to common_config! // // const DELAY_BEFORE_ACTION: Duration = Duration::from_millis(250); // const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(500); // // higher because we can expect a lot of non-hashtag statuses here // const SOCKET_ALIVE_TIMEOUT: Duration = Duration::from_secs(30); // const SOCKET_RETIRE_TIME: Duration = Duration::from_secs(120); macro_rules! grp_debug { ($self:ident, $f:expr) => { ::log::debug!(concat!("(@{}) ", $f), $self.config.get_acct()); }; ($self:ident, $f:expr, $($arg:tt)+) => { ::log::debug!(concat!("(@{}) ", $f), $self.config.get_acct(), $($arg)+); }; } macro_rules! grp_info { ($self:ident, $f:expr) => { ::log::info!(concat!("(@{}) ", $f), $self.config.get_acct()); }; ($self:ident, $f:expr, $($arg:tt)+) => { ::log::info!(concat!("(@{}) ", $f), $self.config.get_acct(), $($arg)+); }; } #[allow(unused)] macro_rules! grp_trace { ($self:ident, $f:expr) => { ::log::trace!(concat!("(@{}) ", $f), $self.config.get_acct()); }; ($self:ident, $f:expr, $($arg:tt)+) => { ::log::trace!(concat!("(@{}) ", $f), $self.config.get_acct(), $($arg)+); }; } macro_rules! grp_warn { ($self:ident, $f:expr) => { ::log::warn!(concat!("(@{}) ", $f), $self.config.get_acct()); }; ($self:ident, $f:expr, $($arg:tt)+) => { ::log::warn!(concat!("(@{}) ", $f), $self.config.get_acct(), $($arg)+); }; } macro_rules! grp_error { ($self:ident, $f:expr) => { ::log::error!(concat!("(@{}) ", $f), $self.config.get_acct()); }; ($self:ident, $f:expr, $($arg:tt)+) => { ::log::error!(concat!("(@{}) ", $f), $self.config.get_acct(), $($arg)+); }; } impl GroupHandle { #[allow(unused)] pub async fn save(&mut self) -> Result<(), GroupError> { grp_debug!(self, "Saving group state unconditionally"); self.config.save(false).await?; Ok(()) } pub async fn save_if_needed(&mut self) -> Result<(), GroupError> { if self.config.is_dirty() { grp_debug!(self, "Saving group state due to changes"); self.config.save_if_needed(false).await?; } Ok(()) } } trait NotifTimestamp { fn timestamp_millis(&self) -> u64; } impl NotifTimestamp for Notification { fn timestamp_millis(&self) -> u64 { self.created_at.timestamp_millis().max(0) as u64 } } impl NotifTimestamp for Status { fn timestamp_millis(&self) -> u64 { // this may not work well for unseen status tracking, // if ancient statuses were to appear in the timeline :( self.created_at.timestamp_millis().max(0) as u64 } } impl GroupHandle { pub async fn run(&mut self) -> Result<(), GroupError> { loop { match self.run_internal().await { Ok(()) => unreachable!(), Err(e @ GroupError::BadConfig(_)) => { grp_error!(self, "ERROR in group handler, aborting! {}", e); return Err(e); } Err(other) => { grp_error!(self, "ERROR in group handler, will restart! {}", other); tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_reopen_error_s)).await; } } } } pub async fn run_internal(&mut self) -> Result<(), GroupError> { loop { grp_debug!(self, "Opening streaming API socket"); // wrapped in a timeout, this seems like the only place the group could hang // (https://git.ondrovo.com/MightyPork/group-actor/issues/8) let mut events = match tokio::time::timeout(Duration::from_secs(3), self.client.streaming_user()).await { Ok(Ok(events)) => events, Ok(Err(e)) => return Err(e.into()), Err(_) => { return Err(GroupError::ApiTimeout); } }; let socket_open_time = Instant::now(); let mut last_rx = Instant::now(); match self.catch_up_with_missed_notifications().await { Ok(true) => { grp_debug!(self, "Some missed notifs handled"); } Ok(false) => { grp_debug!(self, "No notifs missed"); } Err(e) => { grp_error!(self, "Failed to handle missed notifs: {}", e); } } match self.catch_up_with_missed_statuses().await { Ok(true) => { grp_debug!(self, "Some missed statuses handled"); } Ok(false) => { grp_debug!(self, "No statuses missed"); } Err(e) => { grp_error!(self, "Failed to handle missed statuses: {}", e); } } self.save_if_needed().await.log_error("Failed to save"); 'rx: loop { let remains_to_idle_close = Duration::from_secs_f64(self.cc.socket_alive_timeout_s).saturating_sub(last_rx.elapsed()); let remains_to_retire = Duration::from_secs_f64(self.cc.socket_retire_time_s).saturating_sub(socket_open_time.elapsed()); if remains_to_idle_close.is_zero() { grp_warn!(self, "Socket idle too long, close"); break 'rx; } if remains_to_retire.is_zero() { grp_debug!(self, "Socket open too long, closing"); break 'rx; } let timeout = remains_to_idle_close.min(remains_to_retire).max(Duration::from_secs(1)); // at least 1s grp_debug!(self, "Wait for message {:?}", timeout); match tokio::time::timeout(timeout, events.next()).await { Ok(Some(event)) => { last_rx = Instant::now(); grp_debug!(self, "(@{}) Event: {}", self.config.get_acct(), EventDisplay(&event)); match event { Event::Update(status) => { self.handle_status(status).await.log_error("Error handling a status"); } Event::Notification(n) => { self.handle_notification(n).await.log_error("Error handling a notification"); } Event::Delete(_id) => {} Event::FiltersChanged => {} Event::Heartbeat => {} } self.save_if_needed().await.log_error("Failed to save"); } Ok(None) => { grp_warn!(self, "Group @{} socket closed, restarting...", self.config.get_acct()); break 'rx; } Err(_) => { // Timeout so we can save if needed } } } grp_warn!(self, "Notif stream closed, will reopen"); tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_reopen_closed_s)).await; } } async fn handle_notification(&mut self, n: Notification) -> Result<(), GroupError> { grp_debug!(self, "Handling notif #{}", n.id); let ts = n.timestamp_millis(); self.config.set_last_notif(ts); let group_acct = self.config.get_acct().to_string(); let notif_user_id = &n.account.id; let notif_acct = normalize_acct(&n.account.acct, &group_acct)?; if notif_acct == group_acct { grp_debug!(self, "This is our post, ignore that"); return Ok(()); } if self.config.is_banned(¬if_acct) { grp_warn!(self, "Notification actor {} is banned!", notif_acct); return Ok(()); } match n.notification_type { NotificationType::Mention => { if let Some(status) = n.status { self.handle_mention_status(status).await?; } } NotificationType::Follow => { grp_info!(self, "New follower!"); // Just greet the user always self.handle_new_follow(¬if_acct, notif_user_id).await; } NotificationType::Favourite => {} NotificationType::Reblog => {} NotificationType::Other(_) => {} } Ok(()) } /// Handle a non-mention status for tags async fn handle_status(&mut self, s: Status) -> Result<(), GroupError> { grp_debug!(self, "Handling status #{}", s.id); let ts = s.timestamp_millis(); self.config.set_last_status(ts); let private = s.visibility.is_private(); let has_hashtags = s.content.contains('#'); let group_user = self.config.get_acct(); let status_user = normalize_acct(&s.account.acct, group_user)?; let member_or_admin = self.config.is_member_or_admin(&status_user); let commands = crate::command::parse_slash_commands(&s.content); if status_user == group_user { grp_debug!(self, "This is our post, discard"); return Ok(()); } if self.config.is_banned(&status_user) { grp_debug!(self, "Status author @{} is banned, discard", status_user); return Ok(()); } if self.config.is_optout(&status_user) && !member_or_admin { grp_debug!(self, "Status author @{} opted out, discard", status_user); return Ok(()); } if commands.contains(&StatusCommand::Ignore) { grp_debug!(self, "Post has IGNORE command, discard"); return Ok(()); } // Sometimes notifications don't work, but we see the mentions as statuses for m in &s.mentions { let mentioned_user = normalize_acct(&m.acct, group_user)?; if mentioned_user == group_user { let notif_time = self.config.get_last_notif(); if notif_time <= ts { grp_debug!( self, "mentioned but status is older than last notif, can't be a valid notif, discard" ); return Ok(()); } if !commands.is_empty() { grp_debug!(self, "Detected commands for this group, handle as notif"); return self .handle_notification(Notification { id: s.id.clone(), // ??? notification_type: NotificationType::Mention, created_at: s.created_at, account: s.account.clone(), status: Some(s), }) .await; } else if private { grp_debug!(self, "mention in private without commands, discard, this is nothing"); return Ok(()); } } } // optout does not work for members and admins, so don't check it if !member_or_admin { grp_debug!(self, "Status author @{} is not a member, discard", status_user); return Ok(()); } if private { grp_debug!(self, "Status is private, discard"); return Ok(()); } if !has_hashtags { grp_debug!(self, "No hashtags, discard"); return Ok(()); } let tags = crate::command::parse_status_tags(&s.content); grp_debug!(self, "Tags in status: {:?}", tags); 'tags: for t in tags { if self.config.is_tag_followed(&t) { grp_info!(self, "REBLOG #{} STATUS", t); self.client.reblog(&s.id).await.log_error("Failed to reblog"); self.delay_after_post().await; break 'tags; // do not reblog multiple times! } else { grp_debug!(self, "#{} is not a group tag", t); } } Ok(()) } async fn follow_user(&mut self, id: &str) -> Result<(), GroupError> { self.client.follow(id).await?; self.delay_after_post().await; Ok(()) } /// Catch up with missed notifications, returns true if any were handled async fn catch_up_with_missed_notifications(&mut self) -> Result { let last_notif = self.config.get_last_notif(); let notifications = self.client.notifications().await?; let mut iter = notifications.items_iter(); let mut notifs_to_handle = vec![]; // They are retrieved newest first, but we want oldest first for chronological handling let mut num = 0; let mut old_pn = 0; while let Some(n) = iter.next_item().await { let ts = n.timestamp_millis(); if ts <= last_notif { break; // reached our last seen notif } grp_debug!(self, "Inspecting notif {}", NotificationDisplay(&n)); notifs_to_handle.push(n); num += 1; if num > self.cc.max_catchup_notifs { grp_warn!(self, "Too many notifs missed to catch up!"); break; } let pn = iter.page_num(); if pn != old_pn { old_pn = pn; // sleep so we dont make the api angry tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_fetch_page_s)).await; } } if notifs_to_handle.is_empty() { return Ok(false); } notifs_to_handle.reverse(); grp_debug!(self, "{} notifications to catch up!", notifs_to_handle.len()); for n in notifs_to_handle { grp_debug!(self, "Handling missed notification: {}", NotificationDisplay(&n)); self.handle_notification(n).await.log_error("Error handling a notification"); } Ok(true) } /// Catch up with missed statuses, returns true if any were handled async fn catch_up_with_missed_statuses(&mut self) -> Result { let last_status = self.config.get_last_status(); let statuses = self.client.get_home_timeline().await?; let mut iter = statuses.items_iter(); let mut statuses_to_handle = vec![]; // They are retrieved newest first, but we want oldest first for chronological handling let mut newest_status = None; let mut num = 0; let mut old_pn = 0; while let Some(s) = iter.next_item().await { let ts = s.timestamp_millis(); if ts <= last_status { break; // reached our last seen status (hopefully there arent any retro-bumped) } grp_debug!(self, "Inspecting status {}", StatusDisplay(&s)); if newest_status.is_none() { newest_status = Some(ts); } statuses_to_handle.push(s); num += 1; if num > self.cc.max_catchup_statuses { grp_warn!(self, "Too many statuses missed to catch up!"); break; } let pn = iter.page_num(); if pn != old_pn { old_pn = pn; // sleep so we dont make the api angry tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_fetch_page_s)).await; } } if let Some(ts) = newest_status { self.config.set_last_status(ts); } if statuses_to_handle.is_empty() { grp_debug!(self, "No statuses to handle"); return Ok(false); } statuses_to_handle.reverse(); grp_debug!(self, "{} statuses to catch up!", statuses_to_handle.len()); for s in statuses_to_handle { grp_debug!(self, "Handling missed status: {}", StatusDisplay(&s)); self.handle_status(s).await.log_error("Error handling a status"); } Ok(true) } async fn handle_mention_status(&mut self, status: Status) -> Result<(), GroupError> { let res = ProcessMention::run(self, status).await; self.save_if_needed().await.log_error("Failed to save"); res } async fn handle_new_follow(&mut self, notif_acct: &str, notif_user_id: &str) { let mut follow_back = false; let text = if self.config.is_member_only() { // Admins are listed without @, so they won't become handles here. // Tagging all admins would be annoying. let mut admins = self.config.get_admins().cloned().collect::>(); admins.sort(); format!( "\ @{user} Welcome to the group! This group has posting restricted to members. \ If you'd like to join, please ask one of the group admins:\n\ {admins}", user = notif_acct, admins = admins.join(", ") ) } else { follow_back = true; self.config.set_member(notif_acct, true).log_error("Fail add a member"); format!( "\ @{user} Welcome to the group! The group user will now follow you back to complete the sign-up. \ To share a post, @ the group user or use a group hashtag.\n\n\ Use /help for more info.", user = notif_acct ) }; let post = StatusBuilder::new() .status(text) .content_type("text/markdown") .visibility(Visibility::Direct) .build() .expect("error build status"); self.client.new_status(post).await.log_error("Failed to post"); self.delay_after_post().await; if follow_back { self.follow_user(notif_user_id).await.log_error("Failed to follow back"); } } async fn delay_after_post(&self) { tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_after_post_s)).await; } }