From 52cf8f8e970ec14f1d27c5d07baa4b94937429b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Thu, 26 Aug 2021 22:11:20 +0200 Subject: [PATCH] v0.2, refactor, improve some messages, fix lints --- src/error.rs | 2 + src/group_handle.rs | 911 ---------------------------- src/group_handler/handle_mention.rs | 625 +++++++++++++++++++ src/group_handler/mod.rs | 454 ++++++++++++++ src/main.rs | 2 +- src/store/data.rs | 72 ++- src/store/mod.rs | 8 +- src/utils.rs | 13 + 8 files changed, 1146 insertions(+), 941 deletions(-) delete mode 100644 src/group_handle.rs create mode 100644 src/group_handler/handle_mention.rs create mode 100644 src/group_handler/mod.rs diff --git a/src/error.rs b/src/error.rs index 29f109f..3e0095d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,6 +12,8 @@ pub enum GroupError { GroupNotExist, #[error("Config error: {0}")] BadConfig(Cow<'static, str>), + #[error("API request timed out")] + ApiTimeout, #[error(transparent)] IoError(#[from] std::io::Error), #[error(transparent)] diff --git a/src/group_handle.rs b/src/group_handle.rs deleted file mode 100644 index 82a2bce..0000000 --- a/src/group_handle.rs +++ /dev/null @@ -1,911 +0,0 @@ -use std::collections::HashSet; -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use elefren::{FediClient, StatusBuilder}; -use elefren::debug::EventDisplay; -use elefren::debug::NotificationDisplay; -use elefren::debug::StatusDisplay; -use elefren::entities::event::Event; -use elefren::entities::notification::{Notification, NotificationType}; -use elefren::entities::status::Status; -use elefren::status_builder::Visibility; -use futures::StreamExt; - -use crate::command::StatusCommand; -use crate::error::GroupError; -use crate::store::ConfigStore; -use crate::store::data::GroupConfig; -use crate::utils::{LogError, normalize_acct}; - -/// This is one group's config store capable of persistence -#[derive(Debug)] -pub struct GroupHandle { - pub(crate) client: FediClient, - pub(crate) config: GroupConfig, - pub(crate) store: Arc, -} - -const DELAY_BEFORE_ACTION: Duration = Duration::from_millis(250); -const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(500); -const MAX_CATCHUP_NOTIFS: usize = 25; -// also statuses -const MAX_CATCHUP_STATUSES: usize = 50; -// higher because we can expect a lot of non-hashtag statuses here -const PERIODIC_SAVE: Duration = Duration::from_secs(60); -const PING_INTERVAL: Duration = Duration::from_secs(15); // must be < periodic save! - -impl GroupHandle { - pub async fn save(&mut self) -> Result<(), GroupError> { - debug!("Saving group config & status"); - self.store.set_group_config(self.config.clone()).await?; - debug!("Saved"); - self.config.clear_dirty_status(); - Ok(()) - } - - pub async fn save_if_needed(&mut self) -> Result<(), GroupError> { - if self.config.is_dirty() { - self.save().await?; - } - Ok(()) - } - - /* - pub async fn reload(&mut self) -> Result<(), GroupError> { - if let Some(g) = self.store.get_group_config(self.config.get_acct()).await { - self.config = g; - Ok(()) - } else { - Err(GroupError::GroupNotExist) - } - } - */ -} - -trait NotifTimestamp { - fn timestamp_millis(&self) -> u64; -} - -impl NotifTimestamp for Notification { - fn timestamp_millis(&self) -> u64 { - self.created_at.timestamp_millis().max(0) as u64 - } -} - -impl NotifTimestamp for Status { - fn timestamp_millis(&self) -> u64 { - // this may not work well for unseen status tracking, - // if ancient statuses were to appear in the timeline :( - self.created_at.timestamp_millis().max(0) as u64 - } -} - -impl GroupHandle { - pub async fn run(&mut self) -> Result<(), GroupError> { - assert!(PERIODIC_SAVE >= PING_INTERVAL); - - loop { - debug!("Opening streaming API socket"); - let mut next_save = Instant::now() + PERIODIC_SAVE; // so we save at start - let mut events = self.client.streaming_user().await?; - let socket_open_time = Instant::now(); - let mut last_rx = Instant::now(); - let mut last_ping = Instant::now(); - - match self.catch_up_with_missed_notifications().await { - Ok(true) => { - debug!("Some missed notifs handled"); - } - Ok(false) => { - debug!("No notifs missed"); - } - Err(e) => { - error!("Failed to handle missed notifs: {}", e); - } - } - - match self.catch_up_with_missed_statuses().await { - Ok(true) => { - debug!("Some missed statuses handled"); - } - Ok(false) => { - debug!("No statuses missed"); - } - Err(e) => { - error!("Failed to handle missed statuses: {}", e); - } - } - - if self.config.is_dirty() { - // save asap - next_save = Instant::now() - PERIODIC_SAVE - } - - 'rx: loop { - if next_save < Instant::now() { - debug!("Save time elapsed, saving if needed"); - self.save_if_needed().await.log_error("Failed to save group"); - next_save = Instant::now() + PERIODIC_SAVE; - } - - if last_rx.elapsed() > PING_INTERVAL * 2 { - warn!("Socket idle too long, close"); - break 'rx; - } - - if socket_open_time.elapsed() > Duration::from_secs(120) { - debug!("Socket open too long, closing"); - break 'rx; - } - - debug!("Await msg"); - let timeout = next_save - .saturating_duration_since(Instant::now()) - .min(PING_INTERVAL) - .max(Duration::from_secs(1)); - - match tokio::time::timeout(timeout, events.next()).await { - Ok(Some(event)) => { - last_rx = Instant::now(); - debug!("(@{}) Event: {}", self.config.get_acct(), EventDisplay(&event)); - match event { - Event::Update(status) => { - self.handle_status(status).await.log_error("Error handling a status"); - } - Event::Notification(n) => { - self.handle_notification(n).await.log_error("Error handling a notification"); - } - Event::Delete(_id) => {} - Event::FiltersChanged => {} - Event::Heartbeat => {} - } - } - Ok(None) => { - warn!("Group @{} socket closed, restarting...", self.config.get_acct()); - break 'rx; - } - Err(_) => { - // Timeout so we can save if needed - } - } - - if last_ping.elapsed() > PING_INTERVAL { - last_ping = Instant::now(); - debug!("Pinging"); - if events.send_ping() - .await.is_err() { - break 'rx; - } - } - } - - warn!("Notif stream closed, will reopen"); - tokio::time::sleep(DELAY_REOPEN_STREAM).await; - } - } - - async fn handle_notification(&mut self, n: Notification) -> Result<(), GroupError> { - debug!("Handling notif #{}", n.id); - let ts = n.timestamp_millis(); - self.config.set_last_notif(ts); - - let group_acct = self.config.get_acct().to_string(); - let notif_user_id = &n.account.id; - let notif_acct = normalize_acct(&n.account.acct, &group_acct)?; - - if notif_acct == group_acct { - debug!("This is our post, ignore that"); - return Ok(()); - } - - if self.config.is_banned(¬if_acct) { - warn!("Notification actor {} is banned!", notif_acct); - return Ok(()); - } - - match n.notification_type { - NotificationType::Mention => { - if let Some(status) = n.status { - self.handle_mention_status(status).await?; - } - } - NotificationType::Follow => { - info!("New follower!"); - - if self.config.is_member_or_admin(¬if_acct) { - // Already joined, just doing something silly, ignore this - debug!("User already a member, ignoring"); - } else { - let text = if self.config.is_member_only() { - // Admins are listed without @, so they won't become handles here. - // Tagging all admins would be annoying. - let mut admins = self.config.get_admins().cloned().collect::>(); - admins.sort(); - format!( - "@{user} Hi, this is a member-only group, you won't be \ - able to post. You can still receive group posts though. If you'd like to join, \ - please ask one of the group admins to add you:\n\n\ - {admins}", - user = notif_acct, - admins = admins.join(", ") - ) - } else { - self.follow_user(notif_user_id).await - .log_error("Failed to follow"); - make_welcome_text(¬if_acct) - }; - - let post = StatusBuilder::new() - .status(text) - .content_type("text/markdown") - .visibility(Visibility::Direct) - .build() - .expect("error build status"); - - // tokio::time::sleep(Duration::from_millis(500)).await; - let _ = self.client.new_status(post).await.log_error("Failed to post"); - } - } - NotificationType::Favourite => {} - NotificationType::Reblog => {} - } - - Ok(()) - } - - /// Handle a non-mention status - async fn handle_status(&mut self, s: Status) -> Result<(), GroupError> { - debug!("Handling status #{}", s.id); - let ts = s.timestamp_millis(); - self.config.set_last_status(ts); - - let group_user = self.config.get_acct(); - let status_user = normalize_acct(&s.account.acct, group_user)?; - - if status_user == group_user { - debug!("This is our post, ignore that"); - return Ok(()); - } - - // for m in &s.mentions { - // let ma = normalize_acct(&m.acct, gu)?; - // if ma == gu { - // debug!("Mention detected, handle status as mention notification!"); - // return self.handle_mention_status(s).await; - // } - // } - - if !s.content.contains('#') { - debug!("No tags in status"); - return Ok(()); - } - - if s.visibility.is_private() { - debug!("Status is direct/private, not boosting"); - return Ok(()); - } - - if s.content.contains("/add ") || s.content.contains("/remove ") { - debug!("Discard, looks like a hashtag manipulation command"); - return Ok(()); - } - - - if self.config.is_banned(&status_user) { - debug!("Status author @{} is banned.", status_user); - return Ok(()); - } - - if !self.config.is_member_or_admin(&status_user) { - debug!("Status author @{} is not a member.", status_user); - return Ok(()); - } - - let tags = crate::command::parse_status_tags(&s.content); - debug!("Tags in status: {:?}", tags); - - for t in tags { - if self.config.is_tag_followed(&t) { - self.client.reblog(&s.id).await - .log_error("Failed to reblog"); - break; - } - } - - Ok(()) - } - - async fn follow_user(&mut self, id: &str) -> Result<(), GroupError> { - self.client.follow(id).await?; - Ok(()) - } - - async fn unfollow_user(&mut self, id: &str) -> Result<(), GroupError> { - self.client.unfollow(id).await?; - Ok(()) - } - - /// Catch up with missed notifications, returns true if any were handled - async fn catch_up_with_missed_notifications(&mut self) -> Result { - let last_notif = self.config.get_last_notif(); - - let notifications = self.client.notifications().await?; - let mut iter = notifications.items_iter(); - - let mut notifs_to_handle = vec![]; - - // They are retrieved newest first, but we want oldest first for chronological handling - - let mut num = 0; - while let Some(n) = iter.next_item().await { - let ts = n.timestamp_millis(); - if ts <= last_notif { - break; // reached our last seen notif - } - - debug!("Inspecting notif {}", NotificationDisplay(&n)); - notifs_to_handle.push(n); - num += 1; - if num > MAX_CATCHUP_NOTIFS { - warn!("Too many notifs missed to catch up!"); - break; - } - - // sleep so we dont make the api angry - tokio::time::sleep(Duration::from_millis(250)).await; - } - - if notifs_to_handle.is_empty() { - return Ok(false); - } - - notifs_to_handle.reverse(); - - debug!("{} notifications to catch up!", notifs_to_handle.len()); - - for n in notifs_to_handle { - debug!("Handling missed notification: {}", NotificationDisplay(&n)); - self.handle_notification(n).await.log_error("Error handling a notification"); - } - - Ok(true) - } - - /// Catch up with missed statuses, returns true if any were handled - async fn catch_up_with_missed_statuses(&mut self) -> Result { - let last_status = self.config.get_last_status(); - - let statuses = self.client.get_home_timeline().await?; - let mut iter = statuses.items_iter(); - - let mut statuses_to_handle = vec![]; - - // They are retrieved newest first, but we want oldest first for chronological handling - - let mut newest_status = None; - - let mut num = 0; - while let Some(s) = iter.next_item().await { - let ts = s.timestamp_millis(); - if ts <= last_status { - break; // reached our last seen status (hopefully there arent any retro-bumped) - } - - debug!("Inspecting status {}", StatusDisplay(&s)); - - if newest_status.is_none() { - newest_status = Some(ts); - } - - if s.content.contains('#') && !s.visibility.is_private() { - statuses_to_handle.push(s); - } - num += 1; - if num > MAX_CATCHUP_STATUSES { - warn!("Too many statuses missed to catch up!"); - break; - } - - // sleep so we dont make the api angry - tokio::time::sleep(Duration::from_millis(250)).await; - } - - if let Some(ts) = newest_status { - self.config.set_last_status(ts); - } - - if statuses_to_handle.is_empty() { - return Ok(false); - } - - statuses_to_handle.reverse(); - - debug!("{} statuses to catch up!", statuses_to_handle.len()); - - for s in statuses_to_handle { - debug!("Handling missed status: {}", StatusDisplay(&s)); - self.handle_status(s).await - .log_error("Error handling a status"); - } - - Ok(true) - } - - fn list_admins(&self, replies: &mut Vec) { - let mut admins = self.config.get_admins().collect::>(); - admins.sort(); - for a in admins { - replies.push(a.to_string()); - } - } - - fn list_members(&self, replies: &mut Vec) { - let admins = self.config.get_admins().collect::>(); - let mut members = self.config.get_members().collect::>(); - members.extend(admins.iter()); - members.sort(); - members.dedup(); - for m in members { - if admins.contains(&m) { - replies.push(format!("{} [admin]", m)); - } else { - replies.push(m.to_string()); - } - } - } - async fn handle_mention_status(&mut self, status: Status) -> Result<(), GroupError> { - let group_acct = self.config.get_acct().to_string(); - let status_acct = normalize_acct(&status.account.acct, &group_acct)?; - let status_user_id = &status.account.id; - - let can_write = self.config.can_write(&status_acct); - let is_admin = self.config.is_admin(&status_acct); - - if self.config.is_banned(&status_acct) { - warn!("Status author {} is banned!", status_acct); - return Ok(()); - } - - let commands = crate::command::parse_slash_commands(&status.content); - - let mut replies = vec![]; - let mut announcements = vec![]; - let mut do_boost_prev_post = false; - let mut any_admin_cmd = false; - let mut want_markdown = false; - - if commands.is_empty() { - debug!("No commands in post"); - if status.in_reply_to_id.is_none() { - if can_write { - // Someone tagged the group in OP, boost it. - info!("Boosting OP mention"); - // tokio::time::sleep(DELAY_BEFORE_ACTION).await; - self.client.reblog(&status.id).await.log_error("Failed to boost"); - } else { - replies.push("You are not allowed to post to this group".to_string()); - } - } else { - debug!("Not OP, ignore mention"); - } - } else { - for cmd in commands { - match cmd { - // ignore is first on purpose - StatusCommand::Ignore => { - debug!("Notif ignored because of ignore command"); - return Ok(()); - } - StatusCommand::Announce(a) => { - info!("Sending PSA"); - announcements.push(a); - } - StatusCommand::Boost => { - if can_write { - do_boost_prev_post = status.in_reply_to_id.is_some(); - } else { - replies.push("You are not allowed to share to this group".to_string()); - } - } - StatusCommand::BanUser(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if !self.config.is_banned(&u) { - match self.config.ban_user(&u, true) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} banned from group!", u)); - - // FIXME we need user ID, not handle - get it via API? - // self.unfollow_user(&u).await - // .log_error("Failed to unfollow"); - - // no announcement here - } - Err(e) => { - replies.push(format!("Failed to ban user {}: {}", u, e)); - } - } - } - } else { - replies.push("Only admins can manage user bans".to_string()); - } - } - StatusCommand::UnbanUser(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if self.config.is_banned(&u) { - match self.config.ban_user(&u, false) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} un-banned!", u)); - - // no announcement here - } - Err(_) => { - unreachable!() - } - } - } - } else { - replies.push("Only admins can manage user bans".to_string()); - } - } - StatusCommand::BanServer(s) => { - if is_admin { - if !self.config.is_server_banned(&s) { - match self.config.ban_server(&s, true) { - Ok(_) => { - any_admin_cmd = true; - announcements.push(format!("Server \"{}\" has been banned.", s)); - replies.push(format!("Server {} banned from group!", s)); - } - Err(e) => { - replies.push(format!("Failed to ban server {}: {}", s, e)); - } - } - } - } else { - replies.push("Only admins can manage server bans".to_string()); - } - } - StatusCommand::UnbanServer(s) => { - if is_admin { - if self.config.is_server_banned(&s) { - match self.config.ban_server(&s, false) { - Ok(_) => { - any_admin_cmd = true; - announcements.push(format!("Server \"{}\" has been un-banned.", s)); - replies.push(format!("Server {} un-banned!", s)); - } - Err(_) => { - unreachable!() - } - } - } - } else { - replies.push("Only admins can manage server bans".to_string()); - } - } - StatusCommand::AddMember(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if !self.config.is_member(&u) { - match self.config.set_member(&u, true) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} added to the group!", u)); - self.follow_user(status_user_id).await.log_error("Failed to follow"); - } - Err(e) => { - replies.push(format!("Failed to add user {} to group: {}", u, e)); - } - } - } - } else { - replies.push("Only admins can manage members".to_string()); - } - } - StatusCommand::RemoveMember(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if self.config.is_member(&u) { - match self.config.set_member(&u, false) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} removed from the group.", u)); - - // FIXME we need user ID, not handle - get it via API? - // self.unfollow_user(&u).await - // .log_error("Failed to unfollow"); - } - Err(_) => { - unreachable!() - } - } - } - } else { - replies.push("Only admins can manage members".to_string()); - } - } - StatusCommand::AddTag(tag) => { - if is_admin { - any_admin_cmd = true; - self.config.add_tag(&tag); - replies.push(format!("Tag #{} added to the group!", tag)); - } else { - replies.push("Only admins can manage group tags".to_string()); - } - } - StatusCommand::RemoveTag(tag) => { - if is_admin { - any_admin_cmd = true; - self.config.remove_tag(&tag); - replies.push(format!("Tag #{} removed from the group!", tag)); - } else { - replies.push("Only admins can manage group tags".to_string()); - } - } - StatusCommand::GrantAdmin(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if !self.config.is_admin(&u) { - match self.config.set_admin(&u, true) { - Ok(_) => { - // try to make the config a little more sane, admins should be members - let _ = self.config.set_member(&u, true); - - any_admin_cmd = true; - replies.push(format!("User {} is now a group admin!", u)); - announcements - .push(format!("User @{} can now manage this group!", u)); - } - Err(e) => { - replies.push(format!( - "Failed to make user {} a group admin: {}", - u, e - )); - } - } - } - } else { - replies.push("Only admins can manage admins".to_string()); - } - } - StatusCommand::RemoveAdmin(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if self.config.is_admin(&u) { - match self.config.set_admin(&u, false) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} is no longer a group admin!", u)); - announcements - .push(format!("User @{} no longer manages this group.", u)); - } - Err(e) => { - replies - .push(format!("Failed to revoke {}'s group admin: {}", u, e)); - } - } - } - } else { - replies.push("Only admins can manage admins".to_string()); - } - } - StatusCommand::OpenGroup => { - if is_admin { - if self.config.is_member_only() { - any_admin_cmd = true; - self.config.set_member_only(false); - replies.push("Group changed to open-access".to_string()); - announcements.push("This group is now open-access!".to_string()); - } - } else { - replies.push("Only admins can set group options".to_string()); - } - } - StatusCommand::CloseGroup => { - if is_admin { - if !self.config.is_member_only() { - any_admin_cmd = true; - self.config.set_member_only(true); - replies.push("Group changed to member-only".to_string()); - announcements.push("This group is now member-only!".to_string()); - } - } else { - replies.push("Only admins can set group options".to_string()); - } - } - StatusCommand::Help => { - want_markdown = true; - - if self.config.is_member_only() { - replies.push("This is a member-only group. ".to_string()); - } else { - replies.push("This is a public-access group. ".to_string()); - } - - if self.config.can_write(&status_acct) { - if is_admin { - replies.push("*You are an admin.*".to_string()); - } else { - replies.push("*You are a member.*".to_string()); - } - } else { - if self.config.is_member_only() { - replies.push("*You are not a member, ask one of the admins to add you.*".to_string()); - } else { - replies.push("*You are not a member, follow or use /join to join the group.*".to_string()); - } - } - - replies.push( - "\nTo share an original post, mention the group user.\n\ - Replies and mentions with commands won't be shared.\n\ - \n\ - **Supported commands:**\n\ - `/boost, /b` - boost the replied-to post into the group\n\ - `/ignore, /i` - make the group completely ignore the post\n\ - `/ping` - check that the service is alive\n\ - `/join` - join the group\n\ - `/leave` - leave the group".to_string(), - ); - - if self.config.is_member_only() { - replies.push("`/members, /who` - show group members / admins".to_string()); - } else { - replies.push("`/members, /who` - show group admins".to_string()); - } - - if is_admin { - replies.push( - "\n\ - **Admin commands:**\n\ - `/add user` - add a member (use e-mail style address)\n\ - `/kick, /remove user` - kick a member\n\ - `/ban x` - ban a user or a server\n\ - `/unban x` - lift a ban\n\ - `/op, /admin user` - grant admin rights\n\ - `/deop, /deadmin user` - revoke admin rights\n\ - `/opengroup` - make member-only\n\ - `/closegroup` - make public-access\n\ - `/announce x` - make a public announcement from the rest of the status" - .to_string(), - ); - } - } - StatusCommand::ListMembers => { - let mut show_admins = false; - if is_admin { - replies.push("Group members:".to_string()); - self.list_members(&mut replies); - } else { - replies.push("Group admins:".to_string()); - self.list_admins(&mut replies); - } - } - StatusCommand::ListTags => { - replies.push("Group tags:".to_string()); - let mut tags = self.config.get_tags().collect::>(); - tags.sort(); - for t in tags { - replies.push(format!("#{}", t)); - } - } - StatusCommand::Leave => { - if self.config.is_member_or_admin(&status_acct) { - // admin can leave but that's a bad idea - - any_admin_cmd = true; - let _ = self.config.set_member(&status_acct, false); - replies.push("You're no longer a group member. Unfollow the group user to stop receiving group messages.".to_string()); - - self.unfollow_user(&status_user_id).await - .log_error("Failed to unfollow"); - } - } - StatusCommand::Join => { - if self.config.is_member_or_admin(&status_acct) { - // Already a member, so let's try to follow the user - // again, maybe first time it failed - self.follow_user(status_user_id).await - .log_error("Failed to follow"); - } else { - // Not a member yet - if self.config.is_member_only() { - // No you can't - replies.push(format!( - "Hi, this group is closed to new sign-ups.\n\ - Please ask one of the group admins to add you:")); - self.list_admins(&mut replies); - } else { - // Open access - self.follow_user(status_user_id).await - .log_error("Failed to follow"); - - // This only fails if the user is banned, but that is filtered above - let _ = self.config.set_member(&status_acct, true); - replies.push(format!("\ - Thanks for joining, you are now a member and the group user will \ - follow you so you can use group hashtags. Make sure you follow the \ - group user to receive group messages.")); - } - } - } - StatusCommand::Ping => { - replies.push("Pong".to_string()); - } - } - } - - // tokio::time::sleep(DELAY_BEFORE_ACTION).await; - } - - if do_boost_prev_post { - self.client - .reblog(&status.in_reply_to_id.as_ref().unwrap()) - .await - .log_error("Failed to boost"); - } - - if !replies.is_empty() { - debug!("replies={:?}", replies); - let r = replies.join("\n"); - debug!("r={}", r); - - let post = StatusBuilder::new() - .status(format!("@{user}\n{msg}", user = status_acct, msg = r)) - .content_type(if want_markdown { - "text/markdown" - } else { - "text/plain" - }) - .visibility(Visibility::Direct) - .build() - .expect("error build status"); - - let _ = self.client.new_status(post).await.log_error("Failed to post"); - } - - if !announcements.is_empty() { - let msg = announcements.join("\n"); - let post = StatusBuilder::new() - .status(format!("**📢 Group announcement**\n{msg}", msg = msg)) - .content_type("text/markdown") - .visibility(Visibility::Public) - .build() - .expect("error build status"); - - let _ = self.client.new_status(post).await.log_error("Failed to post"); - } - - if any_admin_cmd { - debug!("Saving after admin cmd"); - self.save_if_needed().await.log_error("Failed to save"); - } - - Ok(()) - } -} - -fn make_welcome_text(user: &str) -> String { - format!( - "@{user} Welcome to the group! To share a post, tag the group user \ - or use one of the group hashtags. Use /help for more info.", - user = user - ) -} - - -trait VisExt: Copy { - /// Check if is private or direct - fn is_private(self) -> bool; -} - -impl VisExt for Visibility { - fn is_private(self) -> bool { - self == Visibility::Direct || self == Visibility::Private - } -} diff --git a/src/group_handler/handle_mention.rs b/src/group_handler/handle_mention.rs new file mode 100644 index 0000000..e145adf --- /dev/null +++ b/src/group_handler/handle_mention.rs @@ -0,0 +1,625 @@ +use std::collections::HashSet; +use std::time::Duration; + +use elefren::{FediClient, SearchType, StatusBuilder}; +use elefren::entities::prelude::Status; +use elefren::status_builder::Visibility; + +use crate::command::StatusCommand; +use crate::error::GroupError; +use crate::group_handler::GroupHandle; +use crate::store::data::GroupConfig; +use crate::utils::{LogError, normalize_acct}; + +pub struct ProcessMention<'a> { + status: Status, + config: &'a mut GroupConfig, + client: &'a mut FediClient, + group_acct: String, + status_acct: String, + status_user_id: String, + can_write: bool, + is_admin: bool, + replies: Vec, + announcements: Vec, + do_boost_prev_post: bool, + want_markdown: bool, +} + +impl<'a> ProcessMention<'a> { + async fn lookup_acct_id(&self, acct: &str, followed: bool) -> Result, GroupError> { + debug!("Looking up user ID by acct: {}", acct); + + match tokio::time::timeout(Duration::from_secs(5), self.client.search_v2( + acct, + !followed, + Some(SearchType::Accounts), + Some(1), + followed, + )).await { + Err(_) => { + warn!("Account lookup timeout!"); + Err(GroupError::ApiTimeout) + } + Ok(Err(e)) => { + // Elefren error + Err(e.into()) + } + Ok(Ok(res)) => { + if let Some(item) = res.accounts.into_iter().next() { + debug!("Search done, account found"); + Ok(Some(item.id)) + } else { + debug!("Search done, nothing found"); + Ok(None) + } + } + } + } + + fn append_admin_list_to_reply(&mut self) { + let mut admins = self.config.get_admins().collect::>(); + admins.sort(); + for a in admins { + self.replies.push(a.to_string()); + } + } + + fn append_member_list_to_reply(&mut self) { + let admins = self.config.get_admins().collect::>(); + let mut members = self.config.get_members().collect::>(); + members.extend(admins.iter()); + members.sort(); + members.dedup(); + for m in members { + self.replies.push(if admins.contains(&m) { + format!("{} [admin]", m) + } else { + m.to_string() + }); + } + } + + async fn follow_user(&self, id: &str) -> Result<(), GroupError> { + self.client.follow(id).await?; + Ok(()) + } + + async fn unfollow_user(&self, id: &str) -> Result<(), GroupError> { + self.client.unfollow(id).await?; + Ok(()) + } + + pub(crate) async fn run(gh: &'a mut GroupHandle, status: Status) -> Result<(), GroupError> { + let group_acct = gh.config.get_acct().to_string(); + let status_acct = normalize_acct(&status.account.acct, &group_acct)?.to_string(); + + if gh.config.is_banned(&status_acct) { + warn!("Status author {} is banned!", status_acct); + return Ok(()); + } + + let pm = Self { + status_user_id: status.account.id.to_string(), + client: &mut gh.client, + can_write: gh.config.can_write(&status_acct), + is_admin: gh.config.is_admin(&status_acct), + replies: vec![], + announcements: vec![], + do_boost_prev_post: false, + want_markdown: false, + group_acct, + status_acct, + status, + config: &mut gh.config, + }; + + pm.handle().await + } + + async fn reblog_status(&self) { + self.client.reblog(&self.status.id) + .await + .log_error("Failed to reblog status") + } + + fn add_reply(&mut self, line: impl ToString) { + self.replies.push(line.to_string()) + } + + fn add_announcement(&mut self, line: impl ToString) { + self.announcements.push(line.to_string()) + } + + async fn handle(mut self) -> Result<(), GroupError> { + let commands = crate::command::parse_slash_commands(&self.status.content); + + if commands.is_empty() { + self.handle_post_with_no_commands().await; + } else { + if commands.contains(&StatusCommand::Ignore) { + debug!("Notif ignored because of ignore command"); + return Ok(()); + } + + for cmd in commands { + match cmd { + StatusCommand::Ignore => { + unreachable!(); // Handled above + } + StatusCommand::Announce(a) => { + self.cmd_announce(a).await; + } + StatusCommand::Boost => { + self.cmd_boost().await; + } + StatusCommand::BanUser(u) => { + self.cmd_ban_user(&u).await + .log_error("Error handling ban-user cmd"); + } + StatusCommand::UnbanUser(u) => { + self.cmd_unban_user(&u).await + .log_error("Error handling unban-user cmd"); + } + StatusCommand::BanServer(s) => { + self.cmd_ban_server(&s).await; + } + StatusCommand::UnbanServer(s) => { + self.cmd_unban_server(&s).await; + } + StatusCommand::AddMember(u) => { + self.cmd_add_member(&u).await + .log_error("Error handling add-member cmd"); + } + StatusCommand::RemoveMember(u) => { + self.cmd_remove_member(&u).await + .log_error("Error handling remove-member cmd"); + } + StatusCommand::AddTag(tag) => { + self.cmd_add_tag(tag).await; + } + StatusCommand::RemoveTag(tag) => { + self.cmd_remove_tag(tag).await; + } + StatusCommand::GrantAdmin(u) => { + self.cmd_grant_member(&u).await + .log_error("Error handling grant-admin cmd"); + } + StatusCommand::RemoveAdmin(u) => { + self.cmd_revoke_member(&u).await + .log_error("Error handling grant-admin cmd"); + } + StatusCommand::OpenGroup => { + self.cmd_open_group().await; + } + StatusCommand::CloseGroup => { + self.cmd_close_group().await; + } + StatusCommand::Help => { + self.cmd_help().await; + } + StatusCommand::ListMembers => { + self.cmd_list_members().await; + } + StatusCommand::ListTags => { + self.cmd_list_tags().await; + } + StatusCommand::Leave => { + self.cmd_leave().await; + } + StatusCommand::Join => { + self.cmd_join().await; + } + StatusCommand::Ping => { + self.cmd_ping().await; + } + } + } + } + + if self.do_boost_prev_post { + self.client + .reblog(self.status.in_reply_to_id.as_ref().unwrap()) + .await + .log_error("Failed to boost"); + } + + if !self.replies.is_empty() { + debug!("replies={:?}", self.replies); + let r = self.replies.join("\n"); + debug!("r={}", r); + + if let Ok(post) = StatusBuilder::new() + .status(format!("@{user}\n{msg}", user = self.status_acct, msg = r)) + .content_type(if self.want_markdown { + "text/markdown" + } else { + "text/plain" + }) + .visibility(self.status.visibility) // Copy visibility + .build() + { + let _ = self.client.new_status(post) + .await.log_error("Failed to post"); + } + } + + if !self.announcements.is_empty() { + let msg = self.announcements.join("\n"); + let post = StatusBuilder::new() + .status(format!("**📢 Group announcement**\n{msg}", msg = msg)) + .content_type("text/markdown") + .visibility(Visibility::Public) + .build() + .expect("error build status"); + + let _ = self.client.new_status(post) + .await.log_error("Failed to post"); + } + + Ok(()) + } + + async fn handle_post_with_no_commands(&mut self) { + debug!("No commands in post"); + if self.status.in_reply_to_id.is_none() { + if self.can_write { + // Someone tagged the group in OP, boost it. + info!("Boosting OP mention"); + // tokio::time::sleep(DELAY_BEFORE_ACTION).await; + self.reblog_status().await; + } else { + self.add_reply("You are not allowed to post to this group"); + } + } else { + debug!("Not OP, ignore mention"); + } + } + + async fn cmd_announce(&mut self, msg: String) { + info!("Sending PSA"); + self.add_announcement(msg); + } + + async fn cmd_boost(&mut self) { + if self.can_write { + self.do_boost_prev_post = self.status.in_reply_to_id.is_some(); + } else { + self.add_reply("You are not allowed to share to this group"); + } + } + + async fn cmd_ban_user(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if !self.config.is_banned(&u) { + match self.config.ban_user(&u, true) { + Ok(_) => { + self.add_reply(format!("User {} banned from group!", u)); + self.unfollow_by_acct(&u).await + .log_error("Failed to unfollow banned user"); + } + Err(e) => { + self.add_reply(format!("Failed to ban user {}: {}", u, e)); + } + } + } + } else { + self.add_reply("Only admins can manage user bans"); + } + Ok(()) + } + + async fn cmd_unban_user(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if self.config.is_banned(&u) { + match self.config.ban_user(&u, false) { + Ok(_) => { + self.add_reply(format!("User {} un-banned!", u)); + // no announcement here + } + Err(_) => { + unreachable!() + } + } + } + } else { + self.add_reply("Only admins can manage user bans"); + } + Ok(()) + } + + async fn cmd_ban_server(&mut self, s: &str) { + if self.is_admin { + if !self.config.is_server_banned(s) { + match self.config.ban_server(s, true) { + Ok(_) => { + self.add_announcement(format!("Server \"{}\" has been banned.", s)); + self.add_reply(format!("Server {} banned from group!", s)); + } + Err(e) => { + self.add_reply(format!("Failed to ban server {}: {}", s, e)); + } + } + } + } else { + self.add_reply("Only admins can manage server bans"); + } + } + + async fn cmd_unban_server(&mut self, s: &str) { + if self.is_admin { + if self.config.is_server_banned(s) { + match self.config.ban_server(s, false) { + Ok(_) => { + self.add_announcement(format!("Server \"{}\" has been un-banned.", s)); + self.add_reply(format!("Server {} un-banned!", s)); + } + Err(_) => { + unreachable!() + } + } + } + } else { + self.add_reply("Only admins can manage server bans"); + } + } + + async fn cmd_add_member(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if !self.config.is_member(&u) { + match self.config.set_member(&u, true) { + Ok(_) => { + self.add_reply(format!("User {} added to the group!", u)); + self.follow_user(&self.status_user_id) + .await.log_error("Failed to follow"); + } + Err(e) => { + self.add_reply(format!("Failed to add user {} to group: {}", u, e)); + } + } + } + } else { + self.add_reply("Only admins can manage members"); + } + Ok(()) + } + + async fn cmd_remove_member(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if self.config.is_member(&u) { + match self.config.set_member(&u, false) { + Ok(_) => { + self.add_reply(format!("User {} removed from the group.", u)); + self.unfollow_by_acct(&u).await + .log_error("Failed to unfollow removed user"); + } + Err(_) => { + unreachable!() + } + } + } + } else { + self.add_reply("Only admins can manage members"); + } + Ok(()) + } + + async fn cmd_add_tag(&mut self, tag: String) { + if self.is_admin { + self.config.add_tag(&tag); + self.add_reply(format!("Tag #{} added to the group!", tag)); + } else { + self.add_reply("Only admins can manage group tags"); + } + } + + async fn cmd_remove_tag(&mut self, tag: String) { + if self.is_admin { + self.config.remove_tag(&tag); + self.add_reply(format!("Tag #{} removed from the group!", tag)); + } else { + self.add_reply("Only admins can manage group tags"); + } + } + + async fn cmd_grant_member(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if !self.config.is_admin(&u) { + match self.config.set_admin(&u, true) { + Ok(_) => { + // try to make the config a little more sane, admins should be members + let _ = self.config.set_member(&u, true); + + self.add_reply(format!("User {} is now a group admin!", u)); + self.add_announcement(format!("User @{} can now manage this group!", u)); + } + Err(e) => { + self.add_reply(format!( + "Failed to make user {} a group admin: {}", + u, e + )); + } + } + } + } else { + self.add_reply("Only admins can manage admins"); + } + Ok(()) + } + + async fn cmd_revoke_member(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if self.config.is_admin(&u) { + match self.config.set_admin(&u, false) { + Ok(_) => { + self.add_reply(format!("User {} is no longer a group admin!", u)); + self.add_announcement(format!("User @{} no longer manages this group.", u)); + } + Err(e) => { + self.add_reply(format!("Failed to revoke {}'s group admin: {}", u, e)); + } + } + } + } else { + self.add_reply("Only admins can manage admins"); + } + Ok(()) + } + + async fn cmd_open_group(&mut self) { + if self.is_admin { + if self.config.is_member_only() { + self.config.set_member_only(false); + self.add_reply("Group changed to open-access"); + self.add_announcement("This group is now open-access!"); + } + } else { + self.add_reply("Only admins can set group options"); + } + } + + async fn cmd_close_group(&mut self) { + if self.is_admin { + if !self.config.is_member_only() { + self.config.set_member_only(true); + self.add_reply("Group changed to member-only"); + self.add_announcement("This group is now member-only!"); + } + } else { + self.add_reply("Only admins can set group options"); + } + } + + async fn cmd_help(&mut self) { + self.want_markdown = true; + + if self.config.is_member_only() { + self.add_reply("This is a member-only group. "); + } else { + self.add_reply("This is a public-access group. "); + } + + if self.config.can_write(&self.status_acct) { + if self.is_admin { + self.add_reply("*You are an admin.*"); + } else { + self.add_reply("*You are a member.*"); + } + } else { + if self.config.is_member_only() { + self.add_reply("*You are not a member, ask one of the admins to add you.*"); + } else { + self.add_reply("*You are not a member, follow or use /join to join the group.*"); + } + } + + self.add_reply("\n\ + To share an original post, mention the group user.\n\ + Replies and mentions with commands won't be shared.\n\ + \n\ + **Supported commands:**\n\ + `/boost, /b` - boost the replied-to post into the group\n\ + `/ignore, /i` - make the group completely ignore the post\n\ + `/ping` - check that the service is alive\n\ + `/join` - join the group\n\ + `/leave` - leave the group"); + + if self.config.is_member_only() { + self.add_reply("`/members, /who` - show group members / admins"); + } else { + self.add_reply("`/members, /who` - show group admins"); + } + + if self.is_admin { + self.add_reply("\n\ + **Admin commands:**\n\ + `/add user` - add a member (use e-mail style address)\n\ + `/kick, /remove user` - kick a member\n\ + `/ban x` - ban a user or a server\n\ + `/unban x` - lift a ban\n\ + `/op, /admin user` - grant admin rights\n\ + `/deop, /deadmin user` - revoke admin rights\n\ + `/opengroup` - make member-only\n\ + `/closegroup` - make public-access\n\ + `/announce x` - make a public announcement from the rest of the status"); + } + } + + async fn cmd_list_members(&mut self) { + if self.is_admin { + self.add_reply("Group members:"); + self.append_member_list_to_reply(); + } else { + self.add_reply("Group admins:"); + self.append_admin_list_to_reply(); + } + } + + async fn cmd_list_tags(&mut self) { + self.add_reply("Group tags:"); + let mut tags = self.config.get_tags().collect::>(); + tags.sort(); + for t in tags { + self.replies.push(format!("#{}", t).to_string()); + } + } + + async fn cmd_leave(&mut self) { + if self.config.is_member_or_admin(&self.status_acct) { + // admin can leave but that's a bad idea + let _ = self.config.set_member(&self.status_acct, false); + self.add_reply("You're no longer a group member. Unfollow the group user to stop receiving group messages."); + self.unfollow_user(&self.status_user_id).await + .log_error("Failed to unfollow"); + } + } + + async fn cmd_join(&mut self) { + if self.config.is_member_or_admin(&self.status_acct) { + debug!("Already member or admin, try to follow-back again"); + // Already a member, so let's try to follow the user + // again, maybe first time it failed + self.follow_user(&self.status_user_id).await + .log_error("Failed to follow"); + } else { + // Not a member yet + if self.config.is_member_only() { + // No you can't + self.add_reply("\ + Sorry, this group is closed to new sign-ups.\n\ + Please ask one of the group admins to add you:"); + + self.append_admin_list_to_reply(); + } else { + // Open access, try to follow back + self.follow_user(&self.status_user_id).await + .log_error("Failed to follow"); + + // This only fails if the user is banned, but that is filtered above + let _ = self.config.set_member(&self.status_acct, true); + self.add_reply("\ + Welcome to the group! The group user will now follow you to complete the sign-up. \ + Make sure you follow back to receive shared posts!\n\n\ + Use /help for more info."); + } + } + } + + async fn cmd_ping(&mut self) { + self.add_reply(format!("pong, this is fedigroups service v{}", env!("CARGO_PKG_VERSION"))); + } + + async fn unfollow_by_acct(&self, acct: &str) -> Result<(), GroupError> { + // Try to unfollow + if let Ok(Some(id)) = self.lookup_acct_id(acct, true).await { + self.unfollow_user(&id).await?; + } + Ok(()) + } +} diff --git a/src/group_handler/mod.rs b/src/group_handler/mod.rs new file mode 100644 index 0000000..d5b260b --- /dev/null +++ b/src/group_handler/mod.rs @@ -0,0 +1,454 @@ +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use elefren::{FediClient, StatusBuilder}; +use elefren::debug::EventDisplay; +use elefren::debug::NotificationDisplay; +use elefren::debug::StatusDisplay; +use elefren::entities::event::Event; +use elefren::entities::notification::{Notification, NotificationType}; +use elefren::entities::status::Status; +use elefren::status_builder::Visibility; +use futures::StreamExt; + +use handle_mention::ProcessMention; + +use crate::error::GroupError; +use crate::store::ConfigStore; +use crate::store::data::GroupConfig; +use crate::utils::{LogError, normalize_acct, VisExt}; + +mod handle_mention; + +/// This is one group's config store capable of persistence +#[derive(Debug)] +pub struct GroupHandle { + pub(crate) client: FediClient, + pub(crate) config: GroupConfig, + pub(crate) store: Arc, +} + +// const DELAY_BEFORE_ACTION: Duration = Duration::from_millis(250); +const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(500); +const MAX_CATCHUP_NOTIFS: usize = 25; +// also statuses +const MAX_CATCHUP_STATUSES: usize = 50; +// higher because we can expect a lot of non-hashtag statuses here +const PERIODIC_SAVE: Duration = Duration::from_secs(60); +const PING_INTERVAL: Duration = Duration::from_secs(15); // must be < periodic save! + +impl GroupHandle { + pub async fn save(&mut self) -> Result<(), GroupError> { + debug!("Saving group config & status"); + self.store.set_group_config(self.config.clone()).await?; + trace!("Saved"); + self.config.clear_dirty_status(); + Ok(()) + } + + pub async fn save_if_needed(&mut self) -> Result<(), GroupError> { + if self.config.is_dirty() { + self.save().await?; + } + Ok(()) + } + + /* + pub async fn reload(&mut self) -> Result<(), GroupError> { + if let Some(g) = self.store.get_group_config(self.config.get_acct()).await { + self.config = g; + Ok(()) + } else { + Err(GroupError::GroupNotExist) + } + } + */ +} + +trait NotifTimestamp { + fn timestamp_millis(&self) -> u64; +} + +impl NotifTimestamp for Notification { + fn timestamp_millis(&self) -> u64 { + self.created_at.timestamp_millis().max(0) as u64 + } +} + +impl NotifTimestamp for Status { + fn timestamp_millis(&self) -> u64 { + // this may not work well for unseen status tracking, + // if ancient statuses were to appear in the timeline :( + self.created_at.timestamp_millis().max(0) as u64 + } +} + +impl GroupHandle { + pub async fn run(&mut self) -> Result<(), GroupError> { + assert!(PERIODIC_SAVE >= PING_INTERVAL); + + loop { + debug!("Opening streaming API socket"); + let mut next_save = Instant::now() + PERIODIC_SAVE; // so we save at start + let mut events = self.client.streaming_user().await?; + let socket_open_time = Instant::now(); + let mut last_rx = Instant::now(); + let mut last_ping = Instant::now(); + + match self.catch_up_with_missed_notifications().await { + Ok(true) => { + debug!("Some missed notifs handled"); + } + Ok(false) => { + debug!("No notifs missed"); + } + Err(e) => { + error!("Failed to handle missed notifs: {}", e); + } + } + + match self.catch_up_with_missed_statuses().await { + Ok(true) => { + debug!("Some missed statuses handled"); + } + Ok(false) => { + debug!("No statuses missed"); + } + Err(e) => { + error!("Failed to handle missed statuses: {}", e); + } + } + + if self.config.is_dirty() { + // save asap + next_save = Instant::now() - PERIODIC_SAVE + } + + 'rx: loop { + if next_save < Instant::now() { + trace!("Save time elapsed, saving if needed"); + self.save_if_needed().await.log_error("Failed to save group"); + next_save = Instant::now() + PERIODIC_SAVE; + } + + if last_rx.elapsed() > PING_INTERVAL * 2 { + warn!("Socket idle too long, close"); + break 'rx; + } + + if socket_open_time.elapsed() > Duration::from_secs(120) { + debug!("Socket open too long, closing"); + break 'rx; + } + + trace!("Waiting for message"); + let timeout = next_save + .saturating_duration_since(Instant::now()) + .min(PING_INTERVAL) + .max(Duration::from_secs(1)); + + match tokio::time::timeout(timeout, events.next()).await { + Ok(Some(event)) => { + last_rx = Instant::now(); + debug!("(@{}) Event: {}", self.config.get_acct(), EventDisplay(&event)); + match event { + Event::Update(status) => { + self.handle_status(status).await.log_error("Error handling a status"); + } + Event::Notification(n) => { + self.handle_notification(n).await.log_error("Error handling a notification"); + } + Event::Delete(_id) => {} + Event::FiltersChanged => {} + Event::Heartbeat => {} + } + } + Ok(None) => { + warn!("Group @{} socket closed, restarting...", self.config.get_acct()); + break 'rx; + } + Err(_) => { + // Timeout so we can save if needed + } + } + + if last_ping.elapsed() > PING_INTERVAL { + last_ping = Instant::now(); + trace!("Pinging"); + if events.send_ping() + .await.is_err() { + break 'rx; + } + } + } + + warn!("Notif stream closed, will reopen"); + tokio::time::sleep(DELAY_REOPEN_STREAM).await; + } + } + + async fn handle_notification(&mut self, n: Notification) -> Result<(), GroupError> { + debug!("Handling notif #{}", n.id); + let ts = n.timestamp_millis(); + self.config.set_last_notif(ts); + + let group_acct = self.config.get_acct().to_string(); + let notif_user_id = &n.account.id; + let notif_acct = normalize_acct(&n.account.acct, &group_acct)?; + + if notif_acct == group_acct { + debug!("This is our post, ignore that"); + return Ok(()); + } + + if self.config.is_banned(¬if_acct) { + warn!("Notification actor {} is banned!", notif_acct); + return Ok(()); + } + + match n.notification_type { + NotificationType::Mention => { + if let Some(status) = n.status { + self.handle_mention_status(status).await?; + } + } + NotificationType::Follow => { + info!("New follower!"); + + // Just greet the user always + self.handle_new_follow(¬if_acct, notif_user_id).await; + + // if self.config.is_member_or_admin(¬if_acct) { + // // Already joined, just doing something silly, ignore this + // debug!("User already a member, ignoring"); + // } else { + // + // } + } + NotificationType::Favourite => {} + NotificationType::Reblog => {} + NotificationType::Other(_) => {} + } + + Ok(()) + } + + /// Handle a non-mention status + async fn handle_status(&mut self, s: Status) -> Result<(), GroupError> { + debug!("Handling status #{}", s.id); + let ts = s.timestamp_millis(); + self.config.set_last_status(ts); + + if s.visibility.is_private() { + debug!("Status is direct/private, discard"); + return Ok(()); + } + + if !s.content.contains('#') { + debug!("No tags in status, discard"); + return Ok(()); + } + + let group_user = self.config.get_acct(); + let status_user = normalize_acct(&s.account.acct, group_user)?; + + if status_user == group_user { + debug!("This is our post, discard"); + return Ok(()); + } + + if s.content.contains("/add ") + || s.content.contains("/remove ") + || s.content.contains("\\add ") + || s.content.contains("\\remove ") + { + debug!("Looks like a hashtag manipulation command, discard"); + return Ok(()); + } + + if self.config.is_banned(&status_user) { + debug!("Status author @{} is banned.", status_user); + return Ok(()); + } + + if !self.config.is_member_or_admin(&status_user) { + debug!("Status author @{} is not a member.", status_user); + return Ok(()); + } + + let tags = crate::command::parse_status_tags(&s.content); + debug!("Tags in status: {:?}", tags); + + 'tags: for t in tags { + if self.config.is_tag_followed(&t) { + info!("REBLOG #{} STATUS", &t); + self.client.reblog(&s.id).await + .log_error("Failed to reblog"); + break 'tags; // do not reblog multiple times! + } + } + + Ok(()) + } + + async fn follow_user(&mut self, id: &str) -> Result<(), GroupError> { + self.client.follow(id).await?; + Ok(()) + } + + /// Catch up with missed notifications, returns true if any were handled + async fn catch_up_with_missed_notifications(&mut self) -> Result { + let last_notif = self.config.get_last_notif(); + + let notifications = self.client.notifications().await?; + let mut iter = notifications.items_iter(); + + let mut notifs_to_handle = vec![]; + + // They are retrieved newest first, but we want oldest first for chronological handling + + let mut num = 0; + while let Some(n) = iter.next_item().await { + let ts = n.timestamp_millis(); + if ts <= last_notif { + break; // reached our last seen notif + } + + debug!("Inspecting notif {}", NotificationDisplay(&n)); + notifs_to_handle.push(n); + num += 1; + if num > MAX_CATCHUP_NOTIFS { + warn!("Too many notifs missed to catch up!"); + break; + } + + // sleep so we dont make the api angry + tokio::time::sleep(Duration::from_millis(250)).await; + } + + if notifs_to_handle.is_empty() { + return Ok(false); + } + + notifs_to_handle.reverse(); + + debug!("{} notifications to catch up!", notifs_to_handle.len()); + + for n in notifs_to_handle { + debug!("Handling missed notification: {}", NotificationDisplay(&n)); + self.handle_notification(n).await.log_error("Error handling a notification"); + } + + Ok(true) + } + + /// Catch up with missed statuses, returns true if any were handled + async fn catch_up_with_missed_statuses(&mut self) -> Result { + let last_status = self.config.get_last_status(); + + let statuses = self.client.get_home_timeline().await?; + let mut iter = statuses.items_iter(); + + let mut statuses_to_handle = vec![]; + + // They are retrieved newest first, but we want oldest first for chronological handling + + let mut newest_status = None; + + let mut num = 0; + while let Some(s) = iter.next_item().await { + let ts = s.timestamp_millis(); + if ts <= last_status { + break; // reached our last seen status (hopefully there arent any retro-bumped) + } + + debug!("Inspecting status {}", StatusDisplay(&s)); + + if newest_status.is_none() { + newest_status = Some(ts); + } + + if s.content.contains('#') && !s.visibility.is_private() { + statuses_to_handle.push(s); + } + num += 1; + if num > MAX_CATCHUP_STATUSES { + warn!("Too many statuses missed to catch up!"); + break; + } + + // sleep so we dont make the api angry + tokio::time::sleep(Duration::from_millis(250)).await; + } + + if let Some(ts) = newest_status { + self.config.set_last_status(ts); + } + + if statuses_to_handle.is_empty() { + return Ok(false); + } + + statuses_to_handle.reverse(); + + debug!("{} statuses to catch up!", statuses_to_handle.len()); + + for s in statuses_to_handle { + debug!("Handling missed status: {}", StatusDisplay(&s)); + self.handle_status(s).await + .log_error("Error handling a status"); + } + + Ok(true) + } + + async fn handle_mention_status(&mut self, status: Status) -> Result<(), GroupError> { + let res = ProcessMention::run(self, status).await; + + self.save_if_needed().await + .log_error("Failed to save"); + + res + } + + async fn handle_new_follow(&mut self, notif_acct: &str, notif_user_id: &str) { + let mut follow_back = false; + let text = if self.config.is_member_only() { + // Admins are listed without @, so they won't become handles here. + // Tagging all admins would be annoying. + let mut admins = self.config.get_admins().cloned().collect::>(); + admins.sort(); + + format!("\ + @{user} Welcome! This group has posting restricted to members. \ + If you'd like to join, please ask one of the group admins:\n\ + {admins}", + user = notif_acct, + admins = admins.join(", ") + ) + } else { + follow_back = true; + format!("\ + @{user} Welcome to the group! The group user will now follow you back to complete the sign-up. \ + To share a post, tag the group user or use one of the group hashtags.\n\n\ + Use /help for more info.", + user = notif_acct + ) + }; + + let post = StatusBuilder::new() + .status(text) + .content_type("text/markdown") + .visibility(Visibility::Direct) + .build() + .expect("error build status"); + + self.client.new_status(post).await + .log_error("Failed to post"); + + if follow_back { + self.follow_user(notif_user_id).await + .log_error("Failed to follow back"); + } + } +} diff --git a/src/main.rs b/src/main.rs index 36721d3..a13c054 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ use crate::utils::acct_to_server; mod command; mod error; -mod group_handle; +mod group_handler; mod store; mod utils; diff --git a/src/store/data.rs b/src/store/data.rs index c921a7f..6e0ac01 100644 --- a/src/store/data.rs +++ b/src/store/data.rs @@ -11,9 +11,9 @@ pub(crate) struct Config { } impl Config { - pub(crate) fn iter_groups(&self) -> impl Iterator { - self.groups.values() - } + // pub(crate) fn iter_groups(&self) -> impl Iterator { + // self.groups.values() + // } pub(crate) fn get_group_config(&self, acct: &str) -> Option<&GroupConfig> { self.groups.get(acct) @@ -103,8 +103,10 @@ impl GroupConfig { } pub(crate) fn set_appdata(&mut self, appdata: AppData) { + if self.appdata != appdata { + self.mark_dirty(); + } self.appdata = appdata; - self.mark_dirty(); } pub(crate) fn get_admins(&self) -> impl Iterator { @@ -120,8 +122,10 @@ impl GroupConfig { } pub(crate) fn set_last_notif(&mut self, ts: u64) { + if self.last_notif_ts != ts { + self.mark_dirty(); + } self.last_notif_ts = self.last_notif_ts.max(ts); - self.mark_dirty(); } pub(crate) fn get_last_notif(&self) -> u64 { @@ -129,8 +133,10 @@ impl GroupConfig { } pub(crate) fn set_last_status(&mut self, ts: u64) { + if self.last_status_ts != ts { + self.mark_dirty(); + } self.last_status_ts = self.last_status_ts.max(ts); - self.mark_dirty(); } pub(crate) fn get_last_status(&self) -> u64 { @@ -177,67 +183,81 @@ impl GroupConfig { } pub(crate) fn set_admin(&mut self, acct: &str, admin: bool) -> Result<(), GroupError> { - if admin { + let change = if admin { if self.is_banned(acct) { return Err(GroupError::UserIsBanned); } - self.admin_users.insert(acct.to_owned()); + self.admin_users.insert(acct.to_owned()) } else { - self.admin_users.remove(acct); + self.admin_users.remove(acct) + }; + if change { + self.mark_dirty(); } - self.mark_dirty(); Ok(()) } pub(crate) fn set_member(&mut self, acct: &str, member: bool) -> Result<(), GroupError> { - if member { + let change = if member { if self.is_banned(acct) { return Err(GroupError::UserIsBanned); } - self.member_users.insert(acct.to_owned()); + self.member_users.insert(acct.to_owned()) } else { - self.member_users.remove(acct); + self.member_users.remove(acct) + }; + if change { + self.mark_dirty(); } - self.mark_dirty(); Ok(()) } pub(crate) fn ban_user(&mut self, acct: &str, ban: bool) -> Result<(), GroupError> { + let mut change = false; if ban { if self.is_admin(acct) { return Err(GroupError::UserIsAdmin); } - self.banned_users.insert(acct.to_owned()); + // Banned user is also kicked + change |= self.member_users.remove(acct); + change |= self.banned_users.insert(acct.to_owned()); } else { - self.banned_users.remove(acct); + change |= self.banned_users.remove(acct); + } + if change { + self.mark_dirty(); } Ok(()) } pub(crate) fn ban_server(&mut self, server: &str, ban: bool) -> Result<(), GroupError> { - if ban { + let changed = if ban { for acct in &self.admin_users { let acct_server = acct_to_server(acct); if acct_server == server { return Err(GroupError::AdminsOnServer); } } - self.banned_servers.insert(server.to_owned()); + self.banned_servers.insert(server.to_owned()) } else { - self.banned_servers.remove(server); + self.banned_servers.remove(server) + }; + if changed { + self.mark_dirty(); } - self.mark_dirty(); Ok(()) } pub(crate) fn add_tag(&mut self, tag: &str) { - self.group_tags.insert(tag.to_string()); - self.mark_dirty(); + if self.group_tags.insert(tag.to_string()) { + self.mark_dirty(); + } } pub(crate) fn remove_tag(&mut self, tag: &str) { - self.group_tags.remove(tag); - self.mark_dirty(); + if self.group_tags.remove(tag) { + self.mark_dirty(); + } } pub(crate) fn is_tag_followed(&self, tag: &str) -> bool { @@ -245,8 +265,10 @@ impl GroupConfig { } pub(crate) fn set_member_only(&mut self, member_only: bool) { + if self.member_only != member_only { + self.mark_dirty(); + } self.member_only = member_only; - self.mark_dirty(); } pub(crate) fn is_member_only(&self) -> bool { diff --git a/src/store/mod.rs b/src/store/mod.rs index 123aab4..9d9ba84 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -8,7 +8,7 @@ use tokio::sync::RwLock; use data::{Config, GroupConfig}; use crate::error::GroupError; -use crate::group_handle::GroupHandle; +use crate::group_handler::GroupHandle; use std::time::Duration; pub(crate) mod data; @@ -162,11 +162,11 @@ impl ConfigStore { //noinspection RsSelfConvention /// Set group config to the store. The store then saved. pub(crate) async fn set_group_config(&self, config: GroupConfig) -> Result<(), GroupError> { - debug!("Locking mutex"); + trace!("Locking mutex"); if let Ok(mut data) = tokio::time::timeout(Duration::from_secs(1), self.data.write()).await { - debug!("Locked"); + trace!("Locked"); data.set_group_config(config); - debug!("Writing file"); + trace!("Writing file"); self.persist(&data).await?; } else { error!("DEADLOCK? Timeout waiting for data RW Lock in settings store"); diff --git a/src/utils.rs b/src/utils.rs index 87f7af4..88ce3b3 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,8 @@ use std::borrow::Cow; use std::error::Error; +use elefren::status_builder::Visibility; + use crate::error::GroupError; pub trait LogError { @@ -82,3 +84,14 @@ mod test { assert_eq!(Err(GroupError::BadConfig("_".into())), normalize_acct("piggo", "uhh")); } } + +pub trait VisExt: Copy { + /// Check if is private or direct + fn is_private(self) -> bool; +} + +impl VisExt for Visibility { + fn is_private(self) -> bool { + self == Visibility::Direct || self == Visibility::Private + } +}