From 3b7700a4b1da676a20867f621048bac4a0a389e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Tue, 24 Aug 2021 22:13:44 +0200 Subject: [PATCH 1/4] untested hashtag boost --- Cargo.lock | 1 - Cargo.toml | 4 +- src/command.rs | 178 +++++++++++++++++++--- src/group_handle.rs | 354 ++++++++++++++++++++++++++++++++++++-------- src/store/data.rs | 43 +++++- src/store/mod.rs | 2 + 6 files changed, 494 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9a1f5ee..bb2bd64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,7 +276,6 @@ checksum = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e" [[package]] name = "elefren" version = "0.22.0" -source = "git+https://git.ondrovo.com/MightyPork/elefren-fork.git#de38639fd178ae8ae47adb880ed965437a3d608e" dependencies = [ "chrono", "doc-comment", diff --git a/Cargo.toml b/Cargo.toml index c924c39..8fec97f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,8 @@ build = "build.rs" [dependencies] #elefren = { version = "0.22.0", features = ["toml"] } -#elefren = { path = "../elefren22-fork" } -elefren = { git = "https://git.ondrovo.com/MightyPork/elefren-fork.git" } +elefren = { path = "../elefren22-fork" } +#elefren = { git = "https://git.ondrovo.com/MightyPork/elefren-fork.git" } env_logger = "0.9.0" diff --git a/src/command.rs b/src/command.rs index cba20f0..394f73e 100644 --- a/src/command.rs +++ b/src/command.rs @@ -3,22 +3,54 @@ use regex::Regex; #[derive(Debug, Clone, PartialEq)] pub enum StatusCommand { - Boost, + /// Ignore this post Ignore, + /// Boost the previous post in the thread + Boost, + /// Admin: Ban a user BanUser(String), + /// Admin: Un-ban a server UnbanUser(String), + /// Admin: Ban a server BanServer(String), + /// Admin: Un-ban a server UnbanServer(String), + /// Admin: Add a member to a closed group (or force join) AddMember(String), + /// Admin: Remove a user from the group, also unfollow RemoveMember(String), + /// Admin: Add a hashtag to the group + AddTag(String), + /// Admin: Remove a hashtag from the group + RemoveTag(String), + /// Admin: Give admin to a user GrantAdmin(String), + /// Admin: Revoke admin to a user RemoveAdmin(String), + /// Admin: Send a public announcement Announce(String), + /// Admin: Make the group open-access OpenGroup, + /// Admin: Make the group member-only, this effectively disables posting from non-members + /// and disables /join and follow-back CloseGroup, + /// Show help. The content varies by group params (open/closed access), the user's privileges + /// and membership status. Help, + /// Show members. Non-admins will only see a list of admins. ListMembers, + /// Show tags. + ListTags, + /// Leave the group, this asks the group to unfollow the user and also revokes their membership. Leave, + /// Join a public group. This is normally not needed, as the group follows back and adds followers as members. + /// Manual join is useful when the follow somehow fails, or when the user wants to be able to + /// post without receiving the group's posts (naughty!) + /// + /// In member-only groups, this will just DM the user some info on how to get added. + Join, + /// The group will DM "Pong" back, this is to test that the daemon is running and also that the + /// user is not banned and federates. Ping, } @@ -27,12 +59,19 @@ macro_rules! p_user { r"(@?[a-zA-Z0-9_.-]+@[a-zA-Z0-9_.-]+\.[a-z0-9_-]+|@[a-zA-Z0-9_.-]+)" }; } + macro_rules! p_server { () => { r"([a-zA-Z0-9_.-]+\.[a-zA-Z0-9_-]+)" }; } +macro_rules! p_hashtag { + () => { + r"#(\w+)" + }; +} + macro_rules! command { ($($val:expr),+) => { Regex::new(concat!(r"(?:^|\s|>|\n)[\\/]", $($val,)+ r"(?:$|[!,]|\W)")).unwrap() @@ -55,6 +94,10 @@ static RE_ADD_MEMBER: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?: static RE_REMOVE_MEMBER: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:kick|remove)\s+", p_user!())); +static RE_ADD_TAG: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:add)\s+", p_hashtag!())); + +static RE_REMOVE_TAG: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:remove)\s+", p_hashtag!())); + static RE_GRANT_ADMIN: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:op|admin)\s+", p_user!())); static RE_REVOKE_ADMIN: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:deop|deadmin)\s+", p_user!())); @@ -67,14 +110,37 @@ static RE_HELP: once_cell::sync::Lazy = Lazy::new(|| command!(r"help")); static RE_MEMBERS: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:members|who)")); +static RE_TAGS: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:hashtags|tags)")); + static RE_LEAVE: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:leave)")); +static RE_JOIN: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:join)")); + static RE_PING: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:ping)")); static RE_ANNOUNCE: once_cell::sync::Lazy = Lazy::new(|| Regex::new(concat!(r"(?:^|\s|>|\n)[\\/]announce\s+(.*)$")).unwrap()); -pub fn parse_status(content: &str) -> Vec { +static RE_A_HASHTAG: once_cell::sync::Lazy = + Lazy::new(|| Regex::new(concat!(r"\b#(\w+)")).unwrap()); + +pub fn parse_status_tags(content: &str) -> Vec { + debug!("Raw content: {}", content); + let content = content.replace("
", " "); + let content = voca_rs::strip::strip_tags(&content); + debug!("Stripped tags: {}", content); + + let mut tags = vec![]; + for c in RE_A_HASHTAG.captures_iter(&content) { + if let Some(s) = c.get(1) { + tags.push(s.as_str().to_string()) + } + } + + tags +} + +pub fn parse_slash_commands(content: &str) -> Vec { debug!("Raw content: {}", content); let content = content.replace("
", " "); @@ -111,6 +177,9 @@ pub fn parse_status(content: &str) -> Vec { if RE_LEAVE.is_match(&content) { debug!("LEAVE"); commands.push(StatusCommand::Leave); + } else if RE_JOIN.is_match(&content) { + debug!("JOIN"); + commands.push(StatusCommand::Join); } if RE_PING.is_match(&content) { @@ -123,6 +192,11 @@ pub fn parse_status(content: &str) -> Vec { commands.push(StatusCommand::ListMembers); } + if RE_TAGS.is_match(&content) { + debug!("TAGS"); + commands.push(StatusCommand::ListTags); + } + if RE_OPEN_GROUP.is_match(&content) { debug!("OPEN GROUP"); commands.push(StatusCommand::OpenGroup); @@ -188,11 +262,27 @@ pub fn parse_status(content: &str) -> Vec { if let Some(s) = c.get(1) { let s = s.as_str(); let s = s.trim_start_matches('@'); - debug!("UNBAN USER: {}", s); + debug!("REMOVE USER: {}", s); commands.push(StatusCommand::RemoveMember(s.to_owned())); } } + for c in RE_ADD_TAG.captures_iter(&content) { + if let Some(s) = c.get(1) { + let s = s.as_str(); + debug!("ADD TAG: {}", s); + commands.push(StatusCommand::AddTag(s.to_owned())); + } + } + + for c in RE_REMOVE_TAG.captures_iter(&content) { + if let Some(s) = c.get(1) { + let s = s.as_str(); + debug!("REMOVE TAG: {}", s); + commands.push(StatusCommand::RemoveTag(s.to_owned())); + } + } + for c in RE_GRANT_ADMIN.captures_iter(&content) { if let Some(s) = c.get(1) { let s = s.as_str(); @@ -216,11 +306,11 @@ pub fn parse_status(content: &str) -> Vec { #[cfg(test)] mod test { - use crate::command::{parse_status, StatusCommand}; + use crate::command::{parse_slash_commands, StatusCommand, RE_JOIN, RE_ADD_TAG, RE_A_HASHTAG}; use super::{ RE_ADD_MEMBER, RE_ANNOUNCE, RE_BAN_SERVER, RE_BAN_USER, RE_BOOST, RE_CLOSE_GROUP, RE_GRANT_ADMIN, RE_HELP, - RE_IGNORE, RE_LEAVE, RE_MEMBERS, RE_OPEN_GROUP, RE_REMOVE_MEMBER, RE_REVOKE_ADMIN, + RE_IGNORE, RE_LEAVE, RE_MEMBERS, RE_TAGS, RE_OPEN_GROUP, RE_REMOVE_MEMBER, RE_REVOKE_ADMIN, }; #[test] @@ -276,23 +366,18 @@ mod test { assert!(RE_BAN_USER.is_match("/ban @LAIN@PleromA.soykaf.com")); let c = RE_BAN_USER.captures("/ban lain@pleroma.soykaf.com"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "lain@pleroma.soykaf.com"); let c = RE_BAN_USER.captures("/ban lain@pleroma.soykaf.com xx"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "lain@pleroma.soykaf.com"); let c = RE_BAN_USER.captures("/ban @lain@pleroma.soykaf.com"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "@lain@pleroma.soykaf.com"); let c = RE_BAN_USER.captures("/ban @lain"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "@lain"); let c = RE_BAN_USER.captures("/ban @lain xx"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "@lain"); } @@ -306,11 +391,9 @@ mod test { assert!(!RE_BAN_SERVER.is_match("/ban @pleroma.soykaf.com")); let c = RE_BAN_SERVER.captures("/ban pleroma.soykaf.com"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "pleroma.soykaf.com"); let c = RE_BAN_SERVER.captures("/ban pleroma.soykaf.com xx"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "pleroma.soykaf.com"); } @@ -321,7 +404,6 @@ mod test { assert!(RE_ADD_MEMBER.is_match("\\add @lain")); let c = RE_ADD_MEMBER.captures("/add @lain"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "@lain"); } @@ -337,10 +419,34 @@ mod test { assert!(RE_REMOVE_MEMBER.is_match("/remove @lain")); let c = RE_REMOVE_MEMBER.captures("/kick lain@pleroma.soykaf.com"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "lain@pleroma.soykaf.com"); } + #[test] + fn test_add_member() { + assert!(RE_ADD_TAG.is_match("/add #breadposting")); + assert!(RE_ADD_TAG.is_match("/add #čučkaři")); + assert!(RE_ADD_TAG.is_match("/add #χαλβάς")); + assert!(RE_ADD_TAG.is_match("\\add #ласточка")); + assert!(RE_ADD_TAG.is_match("/add #nya.")); + assert!(RE_ADD_TAG.is_match("/add #nya)")); + + let c = RE_ADD_TAG.captures("/add #breadposting"); + assert_eq!(c.unwrap().get(1).unwrap().as_str(), "breadposting"); + + let c = RE_ADD_TAG.captures("/add #χαλβάς"); + assert_eq!(c.unwrap().get(1).unwrap().as_str(), "χαλβάς"); + + let c = RE_ADD_TAG.captures("/add #ласточка"); + assert_eq!(c.unwrap().get(1).unwrap().as_str(), "ласточка"); + + let c = RE_ADD_TAG.captures("#nya."); + assert_eq!(c.unwrap().get(1).unwrap().as_str(), "nya"); + + let c = RE_ADD_TAG.captures("#nya)"); + assert_eq!(c.unwrap().get(1).unwrap().as_str(), "nya"); + } + #[test] fn test_add_admin() { assert!(!RE_GRANT_ADMIN.is_match("/expel lain@pleroma.soykaf.com")); @@ -350,7 +456,6 @@ mod test { assert!(RE_GRANT_ADMIN.is_match("\\op @lain")); let c = RE_GRANT_ADMIN.captures("/op @lain@pleroma.soykaf.com"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "@lain@pleroma.soykaf.com"); } @@ -362,7 +467,6 @@ mod test { assert!(RE_REVOKE_ADMIN.is_match("/deadmin @lain")); let c = RE_REVOKE_ADMIN.captures("/deadmin @lain"); - assert!(c.is_some()); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "@lain"); } @@ -405,6 +509,27 @@ mod test { assert!(RE_MEMBERS.is_match("/who")); } + #[test] + fn test_members() { + assert!(!RE_TAGS.is_match("/members")); + assert!(RE_TAGS.is_match("/hashtags")); + assert!(RE_TAGS.is_match("dsfsd /tags dfgd d")); + } + + #[test] + fn test_match_tag() { + assert!(!RE_A_HASHTAG.is_match("banana sdfsdf sdfsd fdsf sd")); + assert!(RE_A_HASHTAG.is_match("#banana")); + assert!(RE_A_HASHTAG.is_match("#ласточка")); + assert!(RE_A_HASHTAG.is_match("#χαλβάς")); + assert!(RE_A_HASHTAG.is_match("foo #banana gfdfgd")); + + let c = RE_GRANT_ADMIN.captures("foo #banana #χαλβάς #ласточка."); + assert_eq!(c.unwrap().get(1).unwrap().as_str(), "banana"); + assert_eq!(c.unwrap().get(2).unwrap().as_str(), "χαλβάς"); + assert_eq!(c.unwrap().get(3).unwrap().as_str(), "ласточка"); + } + #[test] fn test_leave() { assert!(!RE_LEAVE.is_match("/list")); @@ -414,6 +539,15 @@ mod test { assert!(RE_LEAVE.is_match("/leave z")); } + #[test] + fn test_leave() { + assert!(!RE_JOIN.is_match("/list")); + assert!(RE_JOIN.is_match("/join")); + assert!(RE_JOIN.is_match("/join")); + assert!(RE_JOIN.is_match("x /join")); + assert!(RE_JOIN.is_match("/join z")); + } + #[test] fn test_announce() { assert!(!RE_ANNOUNCE.is_match("/list")); @@ -435,15 +569,15 @@ mod test { fn test_real_post() { assert_eq!( Vec::::new(), - parse_status("Hello there is nothing here /fake command") + parse_slash_commands("Hello there is nothing here /fake command") ); assert_eq!( vec![StatusCommand::Help], - parse_status("lets see some \\help and /ban @lain") + parse_slash_commands("lets see some \\help and /ban @lain") ); assert_eq!( vec![StatusCommand::Ignore], - parse_status("lets see some /ignore and /ban @lain") + parse_slash_commands("lets see some /ignore and /ban @lain") ); assert_eq!( vec![ @@ -451,7 +585,7 @@ mod test { StatusCommand::BanUser("piggo@piggo.space".to_string()), StatusCommand::BanServer("soykaf.com".to_string()) ], - parse_status("let's /ban @lain! /ban @piggo@piggo.space and also /ban soykaf.com") + parse_slash_commands("let's /ban @lain! /ban @piggo@piggo.space and also /ban soykaf.com") ); } @@ -459,13 +593,13 @@ mod test { fn test_strip() { assert_eq!( vec![StatusCommand::BanUser("betty".to_string())], - parse_status( + parse_slash_commands( r#"Let's bad the naughty bot: /ban @betty"# ) ); assert_eq!( vec![StatusCommand::BanUser("betty@abstarbauze.com".to_string())], - parse_status( + parse_slash_commands( r#"Let's bad the naughty bot: /ban @betty@abstarbauze.com"# ) ); diff --git a/src/group_handle.rs b/src/group_handle.rs index 1f262f4..23d935c 100644 --- a/src/group_handle.rs +++ b/src/group_handle.rs @@ -2,19 +2,21 @@ use std::collections::HashSet; use std::sync::Arc; use std::time::{Duration, Instant}; +use elefren::{FediClient, StatusBuilder}; use elefren::debug::EventDisplay; use elefren::debug::NotificationDisplay; +use elefren::debug::StatusDisplay; use elefren::entities::event::Event; use elefren::entities::notification::{Notification, NotificationType}; +use elefren::entities::status::Status; use elefren::status_builder::Visibility; -use elefren::{FediClient, StatusBuilder}; use futures::StreamExt; use crate::command::StatusCommand; use crate::error::GroupError; -use crate::store::data::GroupConfig; use crate::store::ConfigStore; -use crate::utils::{normalize_acct, LogError}; +use crate::store::data::GroupConfig; +use crate::utils::{LogError, normalize_acct}; /// This is one group's config store capable of persistence #[derive(Debug)] @@ -27,6 +29,9 @@ pub struct GroupHandle { const DELAY_BEFORE_ACTION: Duration = Duration::from_millis(250); const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(1000); const MAX_CATCHUP_NOTIFS: usize = 25; +// also statuses +const MAX_CATCHUP_STATUSES: usize = 100; +// higher because we can expect a lot of non-hashtag statuses here const PERIODIC_SAVE: Duration = Duration::from_secs(60); const PING_INTERVAL: Duration = Duration::from_secs(15); // must be < periodic save! @@ -45,6 +50,7 @@ impl GroupHandle { Ok(()) } + /* pub async fn reload(&mut self) -> Result<(), GroupError> { if let Some(g) = self.store.get_group_config(self.config.get_acct()).await { self.config = g; @@ -53,6 +59,7 @@ impl GroupHandle { Err(GroupError::GroupNotExist) } } + */ } trait NotifTimestamp { @@ -65,6 +72,14 @@ impl NotifTimestamp for Notification { } } +impl NotifTimestamp for Status { + fn timestamp_millis(&self) -> u64 { + // this may not work well for unseen status tracking, + // if ancient statuses were to appear in the timeline :( + self.created_at.timestamp_millis().max(0) as u64 + } +} + impl GroupHandle { pub async fn run(&mut self) -> Result<(), GroupError> { assert!(PERIODIC_SAVE >= PING_INTERVAL); @@ -89,6 +104,20 @@ impl GroupHandle { } } + match self.catch_up_with_missed_statuses().await { + Ok(true) => { + debug!("Some missed statuses handled"); + // Save asap! + next_save = Instant::now() - PERIODIC_SAVE + } + Ok(false) => { + debug!("No statuses missed"); + } + Err(e) => { + error!("Failed to handle missed statuses: {}", e); + } + } + loop { if next_save < Instant::now() { self.save_if_needed().await.log_error("Failed to save group"); @@ -104,7 +133,9 @@ impl GroupHandle { Ok(Some(event)) => { debug!("(@{}) Event: {}", self.config.get_acct(), EventDisplay(&event)); match event { - Event::Update(_status) => {} + Event::Update(status) => { + self.handle_status(status).await.log_error("Error handling a status"); + } Event::Notification(n) => { self.handle_notification(n).await.log_error("Error handling a notification"); } @@ -156,7 +187,7 @@ impl GroupHandle { return Ok(()); } - let commands = crate::command::parse_status(&status.content); + let commands = crate::command::parse_slash_commands(&status.content); let mut replies = vec![]; let mut announcements = vec![]; @@ -205,6 +236,9 @@ impl GroupHandle { any_admin_cmd = true; replies.push(format!("User {} banned from group!", u)); + self.unfollow_user(&u).await + .log_error("Failed to unfollow"); + // no announcement here } Err(e) => { @@ -280,11 +314,7 @@ impl GroupHandle { Ok(_) => { any_admin_cmd = true; replies.push(format!("User {} added to the group!", u)); - - if self.config.is_member_only() { - announcements - .push(format!("Welcome new member @{} to the group!", u)); - } + self.follow_user(&u).await.log_error("Failed to follow"); } Err(e) => { replies.push(format!("Failed to add user {} to group: {}", u, e)); @@ -303,6 +333,9 @@ impl GroupHandle { Ok(_) => { any_admin_cmd = true; replies.push(format!("User {} removed from the group.", u)); + + self.unfollow_user(&u).await + .log_error("Failed to unfollow"); } Err(_) => { unreachable!() @@ -313,12 +346,33 @@ impl GroupHandle { replies.push("Only admins can manage members".to_string()); } } + StatusCommand::AddTag(tag) => { + if is_admin { + any_admin_cmd = true; + self.config.add_tag(&tag); + replies.push(format!("Tag #{} added to the group!", tag)); + } else { + replies.push("Only admins can manage group tags".to_string()); + } + } + StatusCommand::RemoveTag(tag) => { + if is_admin { + any_admin_cmd = true; + self.config.remove_tag(&tag); + replies.push(format!("Tag #{} removed from the group!", tag)); + } else { + replies.push("Only admins can manage group tags".to_string()); + } + } StatusCommand::GrantAdmin(u) => { let u = normalize_acct(&u, &group_acct)?; if is_admin { if !self.config.is_admin(&u) { match self.config.set_admin(&u, true) { Ok(_) => { + // try to make the config a little more sane, admins should be members + let _ = self.config.set_member(&u, true); + any_admin_cmd = true; replies.push(format!("User {} is now a group admin!", u)); announcements @@ -409,12 +463,13 @@ impl GroupHandle { **Supported commands:**\n\ `/boost, /b` - boost the replied-to post into the group\n\ `/ignore, /i` - make the group completely ignore the post\n\ - `/ping` - check that the service is alive".to_string(), + `/ping` - check that the service is alive\n\ + `/join` - join the group\n\ + `/leave` - leave the group".to_string(), ); if self.config.is_member_only() { replies.push("`/members, /who` - show group members / admins".to_string()); - replies.push("`/leave` - leave the group".to_string()); } else { replies.push("`/members, /who` - show group admins".to_string()); } @@ -441,18 +496,7 @@ impl GroupHandle { if is_admin { if self.config.is_member_only() { replies.push("Group members:".to_string()); - let admins = self.config.get_admins().collect::>(); - let mut members = self.config.get_members().collect::>(); - members.extend(admins.iter()); - members.sort(); - members.dedup(); - for m in members { - if admins.contains(&m) { - replies.push(format!("{} [admin]", m)); - } else { - replies.push(m.to_string()); - } - } + self.list_members(&mut replies); } else { show_admins = true; } @@ -462,18 +506,55 @@ impl GroupHandle { if show_admins { replies.push("Group admins:".to_string()); - let mut admins = self.config.get_admins().collect::>(); - admins.sort(); - for a in admins { - replies.push(a.to_string()); - } + self.list_admins(&mut replies); + } + } + StatusCommand::ListTags => { + replies.push("Group tags:".to_string()); + let mut tags = self.config.get_tags().collect::>(); + tags.sort(); + for t in tags { + replies.push(format!("#{}", t)); } } StatusCommand::Leave => { - if self.config.is_member(¬if_acct) { + if self.config.is_member_or_admin(¬if_acct) { + // admin can leave but that's a bad idea + any_admin_cmd = true; let _ = self.config.set_member(¬if_acct, false); - replies.push("You left the group.".to_string()); + replies.push("You're no longer a group member. Unfollow the group user to stop receiving group messages.".to_string()); + + self.unfollow_user(¬if_acct).await + .log_error("Failed to unfollow"); + } + } + StatusCommand::Join => { + if self.config.is_member_or_admin(¬if_acct) { + // Already a member, so let's try to follow the user + // again, maybe first time it failed + self.follow_user(¬if_acct).await + .log_error("Failed to follow"); + } else { + // Not a member yet + if self.config.is_member_only() { + // No you can't + replies.push(format!( + "Hi, this group is closed to new sign-ups.\n\ + Please ask one of the group admins to add you:")); + self.list_admins(&mut replies); + } else { + // Open access + self.follow_user(¬if_acct).await + .log_error("Failed to follow"); + + // This only fails if the user is banned, but that is filtered above + let _ = self.config.set_member(¬if_acct, true); + replies.push(format!("\ + Thanks for joining, you are now a member and the group user will \ + follow you so you can use group hashtags. Make sure you follow the \ + group user to receive group messages.")); + } } } StatusCommand::Ping => { @@ -524,43 +605,106 @@ impl GroupHandle { } NotificationType::Follow => { info!("New follower!"); - tokio::time::sleep(Duration::from_millis(500)).await; - - let text = if self.config.is_member_only() { - // Admins are listed without @, so they won't become handles here. - // Tagging all admins would be annoying. - let mut admins = self.config.get_admins().cloned().collect::>(); - admins.sort(); - format!( - "@{user} welcome to the group! This is a member-only group, you won't be \ - able to post. Ask the group admins if you wish to join!\n\n\ - Admins: {admins}", - user = notif_acct, - admins = admins.join(", ") - ) + + if self.config.is_member_or_admin(¬if_acct) { + // Already joined, just doing something silly, ignore this + debug!("User already a member, ignoring"); } else { - format!( - "@{user} welcome to the group! \ - To share a post, tag the group user. Use /help for more info.", - user = notif_acct - ) - }; - - let post = StatusBuilder::new() - .status(text) - .content_type("text/markdown") - .visibility(Visibility::Direct) - .build() - .expect("error build status"); - - let _ = self.client.new_status(post).await.log_error("Failed to post"); + let text = if self.config.is_member_only() { + // Admins are listed without @, so they won't become handles here. + // Tagging all admins would be annoying. + let mut admins = self.config.get_admins().cloned().collect::>(); + admins.sort(); + format!( + "@{user} Hi, this is a member-only group, you won't be \ + able to post. You can still receive group posts though. If you'd like to join, \ + please ask one of the group admins to add you:\n\n\ + {admins}", + user = notif_acct, + admins = admins.join(", ") + ) + } else { + self.follow_user(¬if_acct).await + .log_error("Failed to follow"); + make_welcome_text(¬if_acct) + }; + + let post = StatusBuilder::new() + .status(text) + .content_type("text/markdown") + .visibility(Visibility::Direct) + .build() + .expect("error build status"); + + tokio::time::sleep(Duration::from_millis(500)).await; + let _ = self.client.new_status(post).await.log_error("Failed to post"); + } } - _ => {} + NotificationType::Favourite => {} + NotificationType::Reblog => {} } Ok(()) } + /// Handle a non-mention status + async fn handle_status(&mut self, s: Status) -> Result<(), GroupError> { + debug!("Handling status #{}", s.id); + let ts = s.timestamp_millis(); + self.config.set_last_status(ts); + + if !s.content.contains('#') { + debug!("No tags in status"); + return Ok(()); + } + + if s.visibility.is_private() { + debug!("Status is direct/private, not boosting"); + return Ok(()); + } + + if s.content.contains("/add ") || s.content.contains("/remove ") { + debug!("Discard, looks like a hashtag manipulation command"); + return Ok(()); + } + + let gu = self.config.get_acct(); + let su = normalize_acct(&s.account.acct, gu)?; + + if self.config.is_banned(&su) { + debug!("Status author @{} is banned.", su); + return Ok(()); + } + + if !self.config.is_member_or_admin(&su) { + debug!("Status author @{} is not a member.", su); + return Ok(()); + } + + let tags = crate::command::parse_status_tags(&s.content); + debug!("Tags in status: {:?}", tags); + + for t in tags { + if self.config.is_tag_followed(&t) { + self.client.reblog(&s.id).await + .log_error("Failed to reblog"); + break; + } + } + + Ok(()) + } + + async fn follow_user(&mut self, acct: &str) -> Result<(), GroupError> { + self.client.follow(acct).await?; + Ok(()) + } + + async fn unfollow_user(&mut self, acct: &str) -> Result<(), GroupError> { + self.client.unfollow(acct).await?; + Ok(()) + } + /// Catch up with missed notifications, returns true if any were handled async fn catch_up_with_missed_notifications(&mut self) -> Result { let last_notif = self.config.get_last_notif(); @@ -601,4 +745,92 @@ impl GroupHandle { Ok(true) } + + /// Catch up with missed statuses, returns true if any were handled + async fn catch_up_with_missed_statuses(&mut self) -> Result { + let last_status = self.config.get_last_status(); + + let notifications = self.client.get_home_timeline().await?; + let mut iter = notifications.items_iter(); + + let mut statuses_to_handle = vec![]; + + // They are retrieved newest first, but we want oldest first for chronological handling + + let mut num = 0; + while let Some(s) = iter.next_item().await { + let ts = s.timestamp_millis(); + + if ts <= last_status { + break; // reached our last seen status (hopefully there arent any retro-bumped) + } + if s.content.contains('#') && !s.visibility.is_private() { + statuses_to_handle.push(s); + } + num += 1; + if num > MAX_CATCHUP_STATUSES { + warn!("Too many statuses missed to catch up!"); + break; + } + } + + if statuses_to_handle.is_empty() { + return Ok(false); + } + + statuses_to_handle.reverse(); + + debug!("{} statuses to catch up!", statuses_to_handle.len()); + + for s in statuses_to_handle { + debug!("Handling missed status: {}", StatusDisplay(&s)); + self.handle_status(s).await + .log_error("Error handling a status"); + } + + Ok(true) + } + + fn list_admins(&self, replies: &mut Vec) { + let mut admins = self.config.get_admins().collect::>(); + admins.sort(); + for a in admins { + replies.push(a.to_string()); + } + } + + fn list_members(&self, replies: &mut Vec) { + let admins = self.config.get_admins().collect::>(); + let mut members = self.config.get_members().collect::>(); + members.extend(admins.iter()); + members.sort(); + members.dedup(); + for m in members { + if admins.contains(&m) { + replies.push(format!("{} [admin]", m)); + } else { + replies.push(m.to_string()); + } + } + } +} + +fn make_welcome_text(user: &str) -> String { + format!( + "@{user} Welcome to the group! To share a post, tag the group user \ + or use one of the group hashtags. Use /help for more info.", + user = user + ) +} + + +trait VisExt: Copy { + /// Check if is private or direct + fn is_private(self) -> bool; +} + +impl VisExt for Visibility { + fn is_private(self) -> bool { + self == Visibility::Direct || self == Visibility::Private + } } diff --git a/src/store/data.rs b/src/store/data.rs index db90519..495fa6a 100644 --- a/src/store/data.rs +++ b/src/store/data.rs @@ -33,6 +33,8 @@ pub(crate) struct GroupConfig { acct: String, /// elefren data appdata: AppData, + /// Hashtags the group will auto-boost from it's members + group_tags: HashSet, /// List of admin account "acct" names, e.g. piggo@piggo.space admin_users: HashSet, /// List of users allowed to post to the group, if it is member-only @@ -43,9 +45,10 @@ pub(crate) struct GroupConfig { member_only: bool, /// Banned domain names, e.g. kiwifarms.cc banned_servers: HashSet, - /// Last seen notification timestamp + /// Last seen notification timestamp (millis) last_notif_ts: u64, - + /// Last seen status timestamp (millis) + last_status_ts: u64, #[serde(skip)] dirty: bool, } @@ -62,12 +65,14 @@ impl Default for GroupConfig { redirect: Default::default(), token: Default::default(), }, + group_tags: Default::default(), admin_users: Default::default(), member_users: Default::default(), banned_users: Default::default(), member_only: false, banned_servers: Default::default(), last_notif_ts: 0, + last_status_ts: 0, dirty: false, } } @@ -86,10 +91,12 @@ impl GroupConfig { self.enabled } + /* pub(crate) fn set_enabled(&mut self, ena: bool) { self.enabled = ena; self.mark_dirty(); } + */ pub(crate) fn get_appdata(&self) -> &AppData { &self.appdata @@ -108,6 +115,10 @@ impl GroupConfig { self.member_users.iter() } + pub(crate) fn get_tags(&self) -> impl Iterator { + self.group_tags.iter() + } + pub(crate) fn set_last_notif(&mut self, ts: u64) { self.last_notif_ts = self.last_notif_ts.max(ts); self.mark_dirty(); @@ -117,6 +128,15 @@ impl GroupConfig { self.last_notif_ts } + pub(crate) fn set_last_status(&mut self, ts: u64) { + self.last_status_ts = self.last_status_ts.max(ts); + self.mark_dirty(); + } + + pub(crate) fn get_last_status(&self) -> u64 { + self.last_status_ts + } + pub(crate) fn get_acct(&self) -> &str { &self.acct } @@ -129,6 +149,11 @@ impl GroupConfig { self.member_users.contains(acct) } + pub(crate) fn is_member_or_admin(&self, acct: &str) -> bool { + self.is_member(acct) + || self.is_admin(acct) + } + pub(crate) fn is_banned(&self, acct: &str) -> bool { self.banned_users.contains(acct) || self.is_users_server_banned(acct) } @@ -205,6 +230,20 @@ impl GroupConfig { Ok(()) } + pub(crate) fn add_tag(&mut self, tag: &str) { + self.group_tags.insert(tag.to_string()); + self.mark_dirty(); + } + + pub(crate) fn remove_tag(&mut self, tag: &str) { + self.group_tags.remove(tag); + self.mark_dirty(); + } + + pub(crate) fn is_tag_followed(&self, tag: &str) -> bool { + self.group_tags.contains(tag) + } + pub(crate) fn set_member_only(&mut self, member_only: bool) { self.member_only = member_only; self.mark_dirty(); diff --git a/src/store/mod.rs b/src/store/mod.rs index f6d1e03..9d03306 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -148,10 +148,12 @@ impl ConfigStore { .collect() } + /* pub(crate) async fn get_group_config(&self, group: &str) -> Option { let c = self.data.read().await; c.get_group_config(group).cloned() } + */ //noinspection RsSelfConvention /// Set group config to the store. The store then saved. From 8afc77dd60d46c65301468df94ba72b0869b0b48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Wed, 25 Aug 2021 21:06:12 +0200 Subject: [PATCH 2/4] tags wip --- .gitignore | 1 + Cargo.lock | 2 +- Cargo.toml | 2 +- src/command.rs | 37 +- src/group_handle.rs | 975 ++++++++++++++++++++++++-------------------- src/store/data.rs | 2 +- src/store/mod.rs | 21 +- 7 files changed, 567 insertions(+), 473 deletions(-) diff --git a/.gitignore b/.gitignore index 5a62f49..1758c24 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ group-actor-data.toml .idea/ groups.json fedigroups +*.bak diff --git a/Cargo.lock b/Cargo.lock index bb2bd64..8cf9def 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -327,7 +327,7 @@ checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" [[package]] name = "fedigroups" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "clap", diff --git a/Cargo.toml b/Cargo.toml index 8fec97f..8449619 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fedigroups" -version = "0.1.0" +version = "0.2.0" authors = ["Ondřej Hruška "] edition = "2018" publish = false diff --git a/src/command.rs b/src/command.rs index 394f73e..97046bb 100644 --- a/src/command.rs +++ b/src/command.rs @@ -122,11 +122,12 @@ static RE_ANNOUNCE: once_cell::sync::Lazy = Lazy::new(|| Regex::new(concat!(r"(?:^|\s|>|\n)[\\/]announce\s+(.*)$")).unwrap()); static RE_A_HASHTAG: once_cell::sync::Lazy = - Lazy::new(|| Regex::new(concat!(r"\b#(\w+)")).unwrap()); + Lazy::new(|| Regex::new(concat!(r"(?:^|\b|\s|>|\n)#(\w+)")).unwrap()); pub fn parse_status_tags(content: &str) -> Vec { debug!("Raw content: {}", content); - let content = content.replace("
", " "); + let content = content.replace("
", "
"); + let content = content.replace("

", "

"); let content = voca_rs::strip::strip_tags(&content); debug!("Stripped tags: {}", content); @@ -143,10 +144,8 @@ pub fn parse_status_tags(content: &str) -> Vec { pub fn parse_slash_commands(content: &str) -> Vec { debug!("Raw content: {}", content); - let content = content.replace("
", " "); - // let content = content.replace("
", " "); - // let content = content.replace("
", " "); - // let content = content.replace("
", " "); + let content = content.replace("
", "
"); + let content = content.replace("

", "

"); let content = voca_rs::strip::strip_tags(&content); debug!("Stripped tags: {}", content); @@ -423,13 +422,14 @@ mod test { } #[test] - fn test_add_member() { + fn test_add_tag() { assert!(RE_ADD_TAG.is_match("/add #breadposting")); assert!(RE_ADD_TAG.is_match("/add #čučkaři")); assert!(RE_ADD_TAG.is_match("/add #χαλβάς")); assert!(RE_ADD_TAG.is_match("\\add #ласточка")); assert!(RE_ADD_TAG.is_match("/add #nya.")); assert!(RE_ADD_TAG.is_match("/add #nya)")); + assert!(RE_ADD_TAG.is_match("/add #nya and more)")); let c = RE_ADD_TAG.captures("/add #breadposting"); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "breadposting"); @@ -440,10 +440,10 @@ mod test { let c = RE_ADD_TAG.captures("/add #ласточка"); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "ласточка"); - let c = RE_ADD_TAG.captures("#nya."); + let c = RE_ADD_TAG.captures("/add #nya."); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "nya"); - let c = RE_ADD_TAG.captures("#nya)"); + let c = RE_ADD_TAG.captures("/add #nya)"); assert_eq!(c.unwrap().get(1).unwrap().as_str(), "nya"); } @@ -510,7 +510,7 @@ mod test { } #[test] - fn test_members() { + fn test_tags() { assert!(!RE_TAGS.is_match("/members")); assert!(RE_TAGS.is_match("/hashtags")); assert!(RE_TAGS.is_match("dsfsd /tags dfgd d")); @@ -524,10 +524,17 @@ mod test { assert!(RE_A_HASHTAG.is_match("#χαλβάς")); assert!(RE_A_HASHTAG.is_match("foo #banana gfdfgd")); - let c = RE_GRANT_ADMIN.captures("foo #banana #χαλβάς #ласточка."); - assert_eq!(c.unwrap().get(1).unwrap().as_str(), "banana"); - assert_eq!(c.unwrap().get(2).unwrap().as_str(), "χαλβάς"); - assert_eq!(c.unwrap().get(3).unwrap().as_str(), "ласточка"); + for (i, c) in RE_A_HASHTAG.captures_iter("foo #banana #χαλβάς #ласточка").enumerate() { + if i == 0 { + assert_eq!(c.get(1).unwrap().as_str(), "banana"); + } + else if i == 1 { + assert_eq!(c.get(1).unwrap().as_str(), "χαλβάς"); + } + else if i == 2 { + assert_eq!(c.get(1).unwrap().as_str(), "ласточка"); + } + } } #[test] @@ -540,7 +547,7 @@ mod test { } #[test] - fn test_leave() { + fn test_join() { assert!(!RE_JOIN.is_match("/list")); assert!(RE_JOIN.is_match("/join")); assert!(RE_JOIN.is_match("/join")); diff --git a/src/group_handle.rs b/src/group_handle.rs index 23d935c..82a2bce 100644 --- a/src/group_handle.rs +++ b/src/group_handle.rs @@ -27,10 +27,10 @@ pub struct GroupHandle { } const DELAY_BEFORE_ACTION: Duration = Duration::from_millis(250); -const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(1000); +const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(500); const MAX_CATCHUP_NOTIFS: usize = 25; // also statuses -const MAX_CATCHUP_STATUSES: usize = 100; +const MAX_CATCHUP_STATUSES: usize = 50; // higher because we can expect a lot of non-hashtag statuses here const PERIODIC_SAVE: Duration = Duration::from_secs(60); const PING_INTERVAL: Duration = Duration::from_secs(15); // must be < periodic save! @@ -39,6 +39,7 @@ impl GroupHandle { pub async fn save(&mut self) -> Result<(), GroupError> { debug!("Saving group config & status"); self.store.set_group_config(self.config.clone()).await?; + debug!("Saved"); self.config.clear_dirty_status(); Ok(()) } @@ -84,17 +85,17 @@ impl GroupHandle { pub async fn run(&mut self) -> Result<(), GroupError> { assert!(PERIODIC_SAVE >= PING_INTERVAL); - let mut next_save = Instant::now() + PERIODIC_SAVE; // so we save at start - loop { debug!("Opening streaming API socket"); + let mut next_save = Instant::now() + PERIODIC_SAVE; // so we save at start let mut events = self.client.streaming_user().await?; + let socket_open_time = Instant::now(); + let mut last_rx = Instant::now(); + let mut last_ping = Instant::now(); match self.catch_up_with_missed_notifications().await { Ok(true) => { debug!("Some missed notifs handled"); - // Save asap! - next_save = Instant::now() - PERIODIC_SAVE } Ok(false) => { debug!("No notifs missed"); @@ -107,8 +108,6 @@ impl GroupHandle { match self.catch_up_with_missed_statuses().await { Ok(true) => { debug!("Some missed statuses handled"); - // Save asap! - next_save = Instant::now() - PERIODIC_SAVE } Ok(false) => { debug!("No statuses missed"); @@ -118,12 +117,29 @@ impl GroupHandle { } } - loop { + if self.config.is_dirty() { + // save asap + next_save = Instant::now() - PERIODIC_SAVE + } + + 'rx: loop { if next_save < Instant::now() { + debug!("Save time elapsed, saving if needed"); self.save_if_needed().await.log_error("Failed to save group"); next_save = Instant::now() + PERIODIC_SAVE; } + if last_rx.elapsed() > PING_INTERVAL * 2 { + warn!("Socket idle too long, close"); + break 'rx; + } + + if socket_open_time.elapsed() > Duration::from_secs(120) { + debug!("Socket open too long, closing"); + break 'rx; + } + + debug!("Await msg"); let timeout = next_save .saturating_duration_since(Instant::now()) .min(PING_INTERVAL) @@ -131,6 +147,7 @@ impl GroupHandle { match tokio::time::timeout(timeout, events.next()).await { Ok(Some(event)) => { + last_rx = Instant::now(); debug!("(@{}) Event: {}", self.config.get_acct(), EventDisplay(&event)); match event { Event::Update(status) => { @@ -141,19 +158,26 @@ impl GroupHandle { } Event::Delete(_id) => {} Event::FiltersChanged => {} + Event::Heartbeat => {} } } Ok(None) => { warn!("Group @{} socket closed, restarting...", self.config.get_acct()); - break; + break 'rx; } Err(_) => { // Timeout so we can save if needed } } - trace!("Pinging"); - events.send_ping().await.log_error("Fail to send ping"); + if last_ping.elapsed() > PING_INTERVAL { + last_ping = Instant::now(); + debug!("Pinging"); + if events.send_ping() + .await.is_err() { + break 'rx; + } + } } warn!("Notif stream closed, will reopen"); @@ -167,10 +191,13 @@ impl GroupHandle { self.config.set_last_notif(ts); let group_acct = self.config.get_acct().to_string(); + let notif_user_id = &n.account.id; let notif_acct = normalize_acct(&n.account.acct, &group_acct)?; - let can_write = self.config.can_write(¬if_acct); - let is_admin = self.config.is_admin(¬if_acct); + if notif_acct == group_acct { + debug!("This is our post, ignore that"); + return Ok(()); + } if self.config.is_banned(¬if_acct) { warn!("Notification actor {} is banned!", notif_acct); @@ -180,427 +207,7 @@ impl GroupHandle { match n.notification_type { NotificationType::Mention => { if let Some(status) = n.status { - let status_acct = normalize_acct(&status.account.acct, &group_acct)?; - - if self.config.is_banned(&status_acct) { - warn!("Status author {} is banned!", status_acct); - return Ok(()); - } - - let commands = crate::command::parse_slash_commands(&status.content); - - let mut replies = vec![]; - let mut announcements = vec![]; - let mut do_boost_prev_post = false; - let mut any_admin_cmd = false; - - if commands.is_empty() { - debug!("No commands in post"); - if status.in_reply_to_id.is_none() { - if can_write { - // Someone tagged the group in OP, boost it. - info!("Boosting OP mention"); - tokio::time::sleep(DELAY_BEFORE_ACTION).await; - self.client.reblog(&status.id).await.log_error("Failed to boost"); - } else { - replies.push("You are not allowed to post to this group".to_string()); - } - } else { - debug!("Not OP, ignore mention"); - } - } else { - for cmd in commands { - match cmd { - // ignore is first on purpose - StatusCommand::Ignore => { - debug!("Notif ignored because of ignore command"); - return Ok(()); - } - StatusCommand::Announce(a) => { - info!("Sending PSA"); - announcements.push(a); - } - StatusCommand::Boost => { - if can_write { - do_boost_prev_post = status.in_reply_to_id.is_some(); - } else { - replies.push("You are not allowed to share to this group".to_string()); - } - } - StatusCommand::BanUser(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if !self.config.is_banned(&u) { - match self.config.ban_user(&u, true) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} banned from group!", u)); - - self.unfollow_user(&u).await - .log_error("Failed to unfollow"); - - // no announcement here - } - Err(e) => { - replies.push(format!("Failed to ban user {}: {}", u, e)); - } - } - } - } else { - replies.push("Only admins can manage user bans".to_string()); - } - } - StatusCommand::UnbanUser(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if self.config.is_banned(&u) { - match self.config.ban_user(&u, false) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} un-banned!", u)); - - // no announcement here - } - Err(_) => { - unreachable!() - } - } - } - } else { - replies.push("Only admins can manage user bans".to_string()); - } - } - StatusCommand::BanServer(s) => { - if is_admin { - if !self.config.is_server_banned(&s) { - match self.config.ban_server(&s, true) { - Ok(_) => { - any_admin_cmd = true; - announcements.push(format!("Server \"{}\" has been banned.", s)); - replies.push(format!("Server {} banned from group!", s)); - } - Err(e) => { - replies.push(format!("Failed to ban server {}: {}", s, e)); - } - } - } - } else { - replies.push("Only admins can manage server bans".to_string()); - } - } - StatusCommand::UnbanServer(s) => { - if is_admin { - if self.config.is_server_banned(&s) { - match self.config.ban_server(&s, false) { - Ok(_) => { - any_admin_cmd = true; - announcements.push(format!("Server \"{}\" has been un-banned.", s)); - replies.push(format!("Server {} un-banned!", s)); - } - Err(_) => { - unreachable!() - } - } - } - } else { - replies.push("Only admins can manage server bans".to_string()); - } - } - StatusCommand::AddMember(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if !self.config.is_member(&u) { - match self.config.set_member(&u, true) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} added to the group!", u)); - self.follow_user(&u).await.log_error("Failed to follow"); - } - Err(e) => { - replies.push(format!("Failed to add user {} to group: {}", u, e)); - } - } - } - } else { - replies.push("Only admins can manage members".to_string()); - } - } - StatusCommand::RemoveMember(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if self.config.is_member(&u) { - match self.config.set_member(&u, false) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} removed from the group.", u)); - - self.unfollow_user(&u).await - .log_error("Failed to unfollow"); - } - Err(_) => { - unreachable!() - } - } - } - } else { - replies.push("Only admins can manage members".to_string()); - } - } - StatusCommand::AddTag(tag) => { - if is_admin { - any_admin_cmd = true; - self.config.add_tag(&tag); - replies.push(format!("Tag #{} added to the group!", tag)); - } else { - replies.push("Only admins can manage group tags".to_string()); - } - } - StatusCommand::RemoveTag(tag) => { - if is_admin { - any_admin_cmd = true; - self.config.remove_tag(&tag); - replies.push(format!("Tag #{} removed from the group!", tag)); - } else { - replies.push("Only admins can manage group tags".to_string()); - } - } - StatusCommand::GrantAdmin(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if !self.config.is_admin(&u) { - match self.config.set_admin(&u, true) { - Ok(_) => { - // try to make the config a little more sane, admins should be members - let _ = self.config.set_member(&u, true); - - any_admin_cmd = true; - replies.push(format!("User {} is now a group admin!", u)); - announcements - .push(format!("User @{} can now manage this group!", u)); - } - Err(e) => { - replies.push(format!( - "Failed to make user {} a group admin: {}", - u, e - )); - } - } - } - } else { - replies.push("Only admins can manage admins".to_string()); - } - } - StatusCommand::RemoveAdmin(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if self.config.is_admin(&u) { - match self.config.set_admin(&u, false) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} is no longer a group admin!", u)); - announcements - .push(format!("User @{} no longer manages this group.", u)); - } - Err(e) => { - replies - .push(format!("Failed to revoke {}'s group admin: {}", u, e)); - } - } - } - } else { - replies.push("Only admins can manage admins".to_string()); - } - } - StatusCommand::OpenGroup => { - if is_admin { - if self.config.is_member_only() { - any_admin_cmd = true; - self.config.set_member_only(false); - replies.push("Group changed to open-access".to_string()); - announcements.push("This group is now open-access!".to_string()); - } - } else { - replies.push("Only admins can set group options".to_string()); - } - } - StatusCommand::CloseGroup => { - if is_admin { - if !self.config.is_member_only() { - any_admin_cmd = true; - self.config.set_member_only(true); - replies.push("Group changed to member-only".to_string()); - announcements.push("This group is now member-only!".to_string()); - } - } else { - replies.push("Only admins can set group options".to_string()); - } - } - StatusCommand::Help => { - if self.config.is_member_only() { - let mut s = "This is a member-only group. ".to_string(); - if self.config.can_write(¬if_acct) { - if is_admin { - s.push_str("*You are an admin.*"); - } else { - s.push_str("*You are a member.*"); - } - } else { - s.push_str("*You are not a member, ask one of the admins to add you.*"); - } - replies.push(s); - } else { - let mut s = "This is a public-access group. ".to_string(); - if is_admin { - s.push_str("*You are an admin.*"); - } - replies.push(s); - } - - replies.push( - "\nTo share an original post, mention the group user.\n\ - Replies and mentions with commands won't be shared.\n\ - \n\ - **Supported commands:**\n\ - `/boost, /b` - boost the replied-to post into the group\n\ - `/ignore, /i` - make the group completely ignore the post\n\ - `/ping` - check that the service is alive\n\ - `/join` - join the group\n\ - `/leave` - leave the group".to_string(), - ); - - if self.config.is_member_only() { - replies.push("`/members, /who` - show group members / admins".to_string()); - } else { - replies.push("`/members, /who` - show group admins".to_string()); - } - - if is_admin { - replies.push( - "\n\ - **Admin commands:**\n\ - `/add user` - add a member (use e-mail style address)\n\ - `/kick, /remove user` - kick a member\n\ - `/ban x` - ban a user or a server\n\ - `/unban x` - lift a ban\n\ - `/op, /admin user` - grant admin rights\n\ - `/deop, /deadmin user` - revoke admin rights\n\ - `/opengroup` - make member-only\n\ - `/closegroup` - make public-access\n\ - `/announce x` - make a public announcement from the rest of the status" - .to_string(), - ); - } - } - StatusCommand::ListMembers => { - let mut show_admins = false; - if is_admin { - if self.config.is_member_only() { - replies.push("Group members:".to_string()); - self.list_members(&mut replies); - } else { - show_admins = true; - } - } else { - show_admins = true; - } - - if show_admins { - replies.push("Group admins:".to_string()); - self.list_admins(&mut replies); - } - } - StatusCommand::ListTags => { - replies.push("Group tags:".to_string()); - let mut tags = self.config.get_tags().collect::>(); - tags.sort(); - for t in tags { - replies.push(format!("#{}", t)); - } - } - StatusCommand::Leave => { - if self.config.is_member_or_admin(¬if_acct) { - // admin can leave but that's a bad idea - - any_admin_cmd = true; - let _ = self.config.set_member(¬if_acct, false); - replies.push("You're no longer a group member. Unfollow the group user to stop receiving group messages.".to_string()); - - self.unfollow_user(¬if_acct).await - .log_error("Failed to unfollow"); - } - } - StatusCommand::Join => { - if self.config.is_member_or_admin(¬if_acct) { - // Already a member, so let's try to follow the user - // again, maybe first time it failed - self.follow_user(¬if_acct).await - .log_error("Failed to follow"); - } else { - // Not a member yet - if self.config.is_member_only() { - // No you can't - replies.push(format!( - "Hi, this group is closed to new sign-ups.\n\ - Please ask one of the group admins to add you:")); - self.list_admins(&mut replies); - } else { - // Open access - self.follow_user(¬if_acct).await - .log_error("Failed to follow"); - - // This only fails if the user is banned, but that is filtered above - let _ = self.config.set_member(¬if_acct, true); - replies.push(format!("\ - Thanks for joining, you are now a member and the group user will \ - follow you so you can use group hashtags. Make sure you follow the \ - group user to receive group messages.")); - } - } - } - StatusCommand::Ping => { - replies.push("Pong".to_string()); - } - } - } - - tokio::time::sleep(DELAY_BEFORE_ACTION).await; - } - - if do_boost_prev_post { - self.client - .reblog(&status.in_reply_to_id.as_ref().unwrap()) - .await - .log_error("Failed to boost"); - } - - if !replies.is_empty() { - let r = replies.join("\n"); - - let post = StatusBuilder::new() - .status(format!("@{user}\n{msg}", user = notif_acct, msg = r)) - .content_type("text/markdown") - .visibility(Visibility::Direct) - .build() - .expect("error build status"); - - let _ = self.client.new_status(post).await.log_error("Failed to post"); - } - - if !announcements.is_empty() { - let msg = announcements.join("\n"); - let post = StatusBuilder::new() - .status(format!("**📢 Group announcement**\n{msg}", msg = msg)) - .content_type("text/markdown") - .visibility(Visibility::Public) - .build() - .expect("error build status"); - - let _ = self.client.new_status(post).await.log_error("Failed to post"); - } - - if any_admin_cmd { - self.save_if_needed().await.log_error("Failed to save"); - } + self.handle_mention_status(status).await?; } } NotificationType::Follow => { @@ -624,7 +231,7 @@ impl GroupHandle { admins = admins.join(", ") ) } else { - self.follow_user(¬if_acct).await + self.follow_user(notif_user_id).await .log_error("Failed to follow"); make_welcome_text(¬if_acct) }; @@ -636,7 +243,7 @@ impl GroupHandle { .build() .expect("error build status"); - tokio::time::sleep(Duration::from_millis(500)).await; + // tokio::time::sleep(Duration::from_millis(500)).await; let _ = self.client.new_status(post).await.log_error("Failed to post"); } } @@ -653,6 +260,22 @@ impl GroupHandle { let ts = s.timestamp_millis(); self.config.set_last_status(ts); + let group_user = self.config.get_acct(); + let status_user = normalize_acct(&s.account.acct, group_user)?; + + if status_user == group_user { + debug!("This is our post, ignore that"); + return Ok(()); + } + + // for m in &s.mentions { + // let ma = normalize_acct(&m.acct, gu)?; + // if ma == gu { + // debug!("Mention detected, handle status as mention notification!"); + // return self.handle_mention_status(s).await; + // } + // } + if !s.content.contains('#') { debug!("No tags in status"); return Ok(()); @@ -668,16 +291,14 @@ impl GroupHandle { return Ok(()); } - let gu = self.config.get_acct(); - let su = normalize_acct(&s.account.acct, gu)?; - if self.config.is_banned(&su) { - debug!("Status author @{} is banned.", su); + if self.config.is_banned(&status_user) { + debug!("Status author @{} is banned.", status_user); return Ok(()); } - if !self.config.is_member_or_admin(&su) { - debug!("Status author @{} is not a member.", su); + if !self.config.is_member_or_admin(&status_user) { + debug!("Status author @{} is not a member.", status_user); return Ok(()); } @@ -695,13 +316,13 @@ impl GroupHandle { Ok(()) } - async fn follow_user(&mut self, acct: &str) -> Result<(), GroupError> { - self.client.follow(acct).await?; + async fn follow_user(&mut self, id: &str) -> Result<(), GroupError> { + self.client.follow(id).await?; Ok(()) } - async fn unfollow_user(&mut self, acct: &str) -> Result<(), GroupError> { - self.client.unfollow(acct).await?; + async fn unfollow_user(&mut self, id: &str) -> Result<(), GroupError> { + self.client.unfollow(id).await?; Ok(()) } @@ -722,12 +343,17 @@ impl GroupHandle { if ts <= last_notif { break; // reached our last seen notif } + + debug!("Inspecting notif {}", NotificationDisplay(&n)); notifs_to_handle.push(n); num += 1; if num > MAX_CATCHUP_NOTIFS { warn!("Too many notifs missed to catch up!"); break; } + + // sleep so we dont make the api angry + tokio::time::sleep(Duration::from_millis(250)).await; } if notifs_to_handle.is_empty() { @@ -750,20 +376,28 @@ impl GroupHandle { async fn catch_up_with_missed_statuses(&mut self) -> Result { let last_status = self.config.get_last_status(); - let notifications = self.client.get_home_timeline().await?; - let mut iter = notifications.items_iter(); + let statuses = self.client.get_home_timeline().await?; + let mut iter = statuses.items_iter(); let mut statuses_to_handle = vec![]; // They are retrieved newest first, but we want oldest first for chronological handling + let mut newest_status = None; + let mut num = 0; while let Some(s) = iter.next_item().await { let ts = s.timestamp_millis(); - if ts <= last_status { break; // reached our last seen status (hopefully there arent any retro-bumped) } + + debug!("Inspecting status {}", StatusDisplay(&s)); + + if newest_status.is_none() { + newest_status = Some(ts); + } + if s.content.contains('#') && !s.visibility.is_private() { statuses_to_handle.push(s); } @@ -772,6 +406,13 @@ impl GroupHandle { warn!("Too many statuses missed to catch up!"); break; } + + // sleep so we dont make the api angry + tokio::time::sleep(Duration::from_millis(250)).await; + } + + if let Some(ts) = newest_status { + self.config.set_last_status(ts); } if statuses_to_handle.is_empty() { @@ -813,6 +454,440 @@ impl GroupHandle { } } } + async fn handle_mention_status(&mut self, status: Status) -> Result<(), GroupError> { + let group_acct = self.config.get_acct().to_string(); + let status_acct = normalize_acct(&status.account.acct, &group_acct)?; + let status_user_id = &status.account.id; + + let can_write = self.config.can_write(&status_acct); + let is_admin = self.config.is_admin(&status_acct); + + if self.config.is_banned(&status_acct) { + warn!("Status author {} is banned!", status_acct); + return Ok(()); + } + + let commands = crate::command::parse_slash_commands(&status.content); + + let mut replies = vec![]; + let mut announcements = vec![]; + let mut do_boost_prev_post = false; + let mut any_admin_cmd = false; + let mut want_markdown = false; + + if commands.is_empty() { + debug!("No commands in post"); + if status.in_reply_to_id.is_none() { + if can_write { + // Someone tagged the group in OP, boost it. + info!("Boosting OP mention"); + // tokio::time::sleep(DELAY_BEFORE_ACTION).await; + self.client.reblog(&status.id).await.log_error("Failed to boost"); + } else { + replies.push("You are not allowed to post to this group".to_string()); + } + } else { + debug!("Not OP, ignore mention"); + } + } else { + for cmd in commands { + match cmd { + // ignore is first on purpose + StatusCommand::Ignore => { + debug!("Notif ignored because of ignore command"); + return Ok(()); + } + StatusCommand::Announce(a) => { + info!("Sending PSA"); + announcements.push(a); + } + StatusCommand::Boost => { + if can_write { + do_boost_prev_post = status.in_reply_to_id.is_some(); + } else { + replies.push("You are not allowed to share to this group".to_string()); + } + } + StatusCommand::BanUser(u) => { + let u = normalize_acct(&u, &group_acct)?; + if is_admin { + if !self.config.is_banned(&u) { + match self.config.ban_user(&u, true) { + Ok(_) => { + any_admin_cmd = true; + replies.push(format!("User {} banned from group!", u)); + + // FIXME we need user ID, not handle - get it via API? + // self.unfollow_user(&u).await + // .log_error("Failed to unfollow"); + + // no announcement here + } + Err(e) => { + replies.push(format!("Failed to ban user {}: {}", u, e)); + } + } + } + } else { + replies.push("Only admins can manage user bans".to_string()); + } + } + StatusCommand::UnbanUser(u) => { + let u = normalize_acct(&u, &group_acct)?; + if is_admin { + if self.config.is_banned(&u) { + match self.config.ban_user(&u, false) { + Ok(_) => { + any_admin_cmd = true; + replies.push(format!("User {} un-banned!", u)); + + // no announcement here + } + Err(_) => { + unreachable!() + } + } + } + } else { + replies.push("Only admins can manage user bans".to_string()); + } + } + StatusCommand::BanServer(s) => { + if is_admin { + if !self.config.is_server_banned(&s) { + match self.config.ban_server(&s, true) { + Ok(_) => { + any_admin_cmd = true; + announcements.push(format!("Server \"{}\" has been banned.", s)); + replies.push(format!("Server {} banned from group!", s)); + } + Err(e) => { + replies.push(format!("Failed to ban server {}: {}", s, e)); + } + } + } + } else { + replies.push("Only admins can manage server bans".to_string()); + } + } + StatusCommand::UnbanServer(s) => { + if is_admin { + if self.config.is_server_banned(&s) { + match self.config.ban_server(&s, false) { + Ok(_) => { + any_admin_cmd = true; + announcements.push(format!("Server \"{}\" has been un-banned.", s)); + replies.push(format!("Server {} un-banned!", s)); + } + Err(_) => { + unreachable!() + } + } + } + } else { + replies.push("Only admins can manage server bans".to_string()); + } + } + StatusCommand::AddMember(u) => { + let u = normalize_acct(&u, &group_acct)?; + if is_admin { + if !self.config.is_member(&u) { + match self.config.set_member(&u, true) { + Ok(_) => { + any_admin_cmd = true; + replies.push(format!("User {} added to the group!", u)); + self.follow_user(status_user_id).await.log_error("Failed to follow"); + } + Err(e) => { + replies.push(format!("Failed to add user {} to group: {}", u, e)); + } + } + } + } else { + replies.push("Only admins can manage members".to_string()); + } + } + StatusCommand::RemoveMember(u) => { + let u = normalize_acct(&u, &group_acct)?; + if is_admin { + if self.config.is_member(&u) { + match self.config.set_member(&u, false) { + Ok(_) => { + any_admin_cmd = true; + replies.push(format!("User {} removed from the group.", u)); + + // FIXME we need user ID, not handle - get it via API? + // self.unfollow_user(&u).await + // .log_error("Failed to unfollow"); + } + Err(_) => { + unreachable!() + } + } + } + } else { + replies.push("Only admins can manage members".to_string()); + } + } + StatusCommand::AddTag(tag) => { + if is_admin { + any_admin_cmd = true; + self.config.add_tag(&tag); + replies.push(format!("Tag #{} added to the group!", tag)); + } else { + replies.push("Only admins can manage group tags".to_string()); + } + } + StatusCommand::RemoveTag(tag) => { + if is_admin { + any_admin_cmd = true; + self.config.remove_tag(&tag); + replies.push(format!("Tag #{} removed from the group!", tag)); + } else { + replies.push("Only admins can manage group tags".to_string()); + } + } + StatusCommand::GrantAdmin(u) => { + let u = normalize_acct(&u, &group_acct)?; + if is_admin { + if !self.config.is_admin(&u) { + match self.config.set_admin(&u, true) { + Ok(_) => { + // try to make the config a little more sane, admins should be members + let _ = self.config.set_member(&u, true); + + any_admin_cmd = true; + replies.push(format!("User {} is now a group admin!", u)); + announcements + .push(format!("User @{} can now manage this group!", u)); + } + Err(e) => { + replies.push(format!( + "Failed to make user {} a group admin: {}", + u, e + )); + } + } + } + } else { + replies.push("Only admins can manage admins".to_string()); + } + } + StatusCommand::RemoveAdmin(u) => { + let u = normalize_acct(&u, &group_acct)?; + if is_admin { + if self.config.is_admin(&u) { + match self.config.set_admin(&u, false) { + Ok(_) => { + any_admin_cmd = true; + replies.push(format!("User {} is no longer a group admin!", u)); + announcements + .push(format!("User @{} no longer manages this group.", u)); + } + Err(e) => { + replies + .push(format!("Failed to revoke {}'s group admin: {}", u, e)); + } + } + } + } else { + replies.push("Only admins can manage admins".to_string()); + } + } + StatusCommand::OpenGroup => { + if is_admin { + if self.config.is_member_only() { + any_admin_cmd = true; + self.config.set_member_only(false); + replies.push("Group changed to open-access".to_string()); + announcements.push("This group is now open-access!".to_string()); + } + } else { + replies.push("Only admins can set group options".to_string()); + } + } + StatusCommand::CloseGroup => { + if is_admin { + if !self.config.is_member_only() { + any_admin_cmd = true; + self.config.set_member_only(true); + replies.push("Group changed to member-only".to_string()); + announcements.push("This group is now member-only!".to_string()); + } + } else { + replies.push("Only admins can set group options".to_string()); + } + } + StatusCommand::Help => { + want_markdown = true; + + if self.config.is_member_only() { + replies.push("This is a member-only group. ".to_string()); + } else { + replies.push("This is a public-access group. ".to_string()); + } + + if self.config.can_write(&status_acct) { + if is_admin { + replies.push("*You are an admin.*".to_string()); + } else { + replies.push("*You are a member.*".to_string()); + } + } else { + if self.config.is_member_only() { + replies.push("*You are not a member, ask one of the admins to add you.*".to_string()); + } else { + replies.push("*You are not a member, follow or use /join to join the group.*".to_string()); + } + } + + replies.push( + "\nTo share an original post, mention the group user.\n\ + Replies and mentions with commands won't be shared.\n\ + \n\ + **Supported commands:**\n\ + `/boost, /b` - boost the replied-to post into the group\n\ + `/ignore, /i` - make the group completely ignore the post\n\ + `/ping` - check that the service is alive\n\ + `/join` - join the group\n\ + `/leave` - leave the group".to_string(), + ); + + if self.config.is_member_only() { + replies.push("`/members, /who` - show group members / admins".to_string()); + } else { + replies.push("`/members, /who` - show group admins".to_string()); + } + + if is_admin { + replies.push( + "\n\ + **Admin commands:**\n\ + `/add user` - add a member (use e-mail style address)\n\ + `/kick, /remove user` - kick a member\n\ + `/ban x` - ban a user or a server\n\ + `/unban x` - lift a ban\n\ + `/op, /admin user` - grant admin rights\n\ + `/deop, /deadmin user` - revoke admin rights\n\ + `/opengroup` - make member-only\n\ + `/closegroup` - make public-access\n\ + `/announce x` - make a public announcement from the rest of the status" + .to_string(), + ); + } + } + StatusCommand::ListMembers => { + let mut show_admins = false; + if is_admin { + replies.push("Group members:".to_string()); + self.list_members(&mut replies); + } else { + replies.push("Group admins:".to_string()); + self.list_admins(&mut replies); + } + } + StatusCommand::ListTags => { + replies.push("Group tags:".to_string()); + let mut tags = self.config.get_tags().collect::>(); + tags.sort(); + for t in tags { + replies.push(format!("#{}", t)); + } + } + StatusCommand::Leave => { + if self.config.is_member_or_admin(&status_acct) { + // admin can leave but that's a bad idea + + any_admin_cmd = true; + let _ = self.config.set_member(&status_acct, false); + replies.push("You're no longer a group member. Unfollow the group user to stop receiving group messages.".to_string()); + + self.unfollow_user(&status_user_id).await + .log_error("Failed to unfollow"); + } + } + StatusCommand::Join => { + if self.config.is_member_or_admin(&status_acct) { + // Already a member, so let's try to follow the user + // again, maybe first time it failed + self.follow_user(status_user_id).await + .log_error("Failed to follow"); + } else { + // Not a member yet + if self.config.is_member_only() { + // No you can't + replies.push(format!( + "Hi, this group is closed to new sign-ups.\n\ + Please ask one of the group admins to add you:")); + self.list_admins(&mut replies); + } else { + // Open access + self.follow_user(status_user_id).await + .log_error("Failed to follow"); + + // This only fails if the user is banned, but that is filtered above + let _ = self.config.set_member(&status_acct, true); + replies.push(format!("\ + Thanks for joining, you are now a member and the group user will \ + follow you so you can use group hashtags. Make sure you follow the \ + group user to receive group messages.")); + } + } + } + StatusCommand::Ping => { + replies.push("Pong".to_string()); + } + } + } + + // tokio::time::sleep(DELAY_BEFORE_ACTION).await; + } + + if do_boost_prev_post { + self.client + .reblog(&status.in_reply_to_id.as_ref().unwrap()) + .await + .log_error("Failed to boost"); + } + + if !replies.is_empty() { + debug!("replies={:?}", replies); + let r = replies.join("\n"); + debug!("r={}", r); + + let post = StatusBuilder::new() + .status(format!("@{user}\n{msg}", user = status_acct, msg = r)) + .content_type(if want_markdown { + "text/markdown" + } else { + "text/plain" + }) + .visibility(Visibility::Direct) + .build() + .expect("error build status"); + + let _ = self.client.new_status(post).await.log_error("Failed to post"); + } + + if !announcements.is_empty() { + let msg = announcements.join("\n"); + let post = StatusBuilder::new() + .status(format!("**📢 Group announcement**\n{msg}", msg = msg)) + .content_type("text/markdown") + .visibility(Visibility::Public) + .build() + .expect("error build status"); + + let _ = self.client.new_status(post).await.log_error("Failed to post"); + } + + if any_admin_cmd { + debug!("Saving after admin cmd"); + self.save_if_needed().await.log_error("Failed to save"); + } + + Ok(()) + } } fn make_welcome_text(user: &str) -> String { diff --git a/src/store/data.rs b/src/store/data.rs index 495fa6a..c921a7f 100644 --- a/src/store/data.rs +++ b/src/store/data.rs @@ -7,7 +7,7 @@ use crate::error::GroupError; /// This is the inner data struct holding the config #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub(crate) struct Config { - groups: HashMap, + pub(crate) groups: HashMap, } impl Config { diff --git a/src/store/mod.rs b/src/store/mod.rs index 9d03306..123aab4 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -9,6 +9,7 @@ use data::{Config, GroupConfig}; use crate::error::GroupError; use crate::group_handle::GroupHandle; +use std::time::Duration; pub(crate) mod data; @@ -82,6 +83,7 @@ impl ConfigStore { pub async fn reauth_group(self: &Arc, acct: &str) -> Result { let groups = self.data.read().await; let mut config = groups.get_group_config(acct).ok_or(GroupError::GroupNotExist)?.clone(); + drop(groups); println!("--- Re-authenticating bot user @{} ---", acct); let registration = Registration::new(config.get_appdata().base.to_string()) @@ -92,6 +94,8 @@ impl ConfigStore { .await?; let client = elefren::helpers::cli::authenticate(registration).await?; + println!("Auth complete"); + let appdata = client.data.clone(); config.set_appdata(appdata); @@ -106,8 +110,8 @@ impl ConfigStore { /// Spawn existing group using saved creds pub async fn spawn_groups(self: Arc) -> Vec { - let groups = self.data.read().await; - let groups_iter = groups.iter_groups().cloned(); + let groups = self.data.read().await.clone(); + let groups_iter = groups.groups.into_values(); // Connect in parallel futures::stream::iter(groups_iter) @@ -158,9 +162,15 @@ impl ConfigStore { //noinspection RsSelfConvention /// Set group config to the store. The store then saved. pub(crate) async fn set_group_config(&self, config: GroupConfig) -> Result<(), GroupError> { - let mut data = self.data.write().await; - data.set_group_config(config); - self.persist(&data).await?; + debug!("Locking mutex"); + if let Ok(mut data) = tokio::time::timeout(Duration::from_secs(1), self.data.write()).await { + debug!("Locked"); + data.set_group_config(config); + debug!("Writing file"); + self.persist(&data).await?; + } else { + error!("DEADLOCK? Timeout waiting for data RW Lock in settings store"); + } Ok(()) } @@ -187,6 +197,7 @@ fn make_scopes() -> Scopes { | Scopes::read(scopes::Read::Follows) | Scopes::write(scopes::Write::Statuses) | Scopes::write(scopes::Write::Media) + | Scopes::write(scopes::Write::Follows) } // trait TapOk { From 52cf8f8e970ec14f1d27c5d07baa4b94937429b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Thu, 26 Aug 2021 22:11:20 +0200 Subject: [PATCH 3/4] v0.2, refactor, improve some messages, fix lints --- src/error.rs | 2 + src/group_handle.rs | 911 ---------------------------- src/group_handler/handle_mention.rs | 625 +++++++++++++++++++ src/group_handler/mod.rs | 454 ++++++++++++++ src/main.rs | 2 +- src/store/data.rs | 72 ++- src/store/mod.rs | 8 +- src/utils.rs | 13 + 8 files changed, 1146 insertions(+), 941 deletions(-) delete mode 100644 src/group_handle.rs create mode 100644 src/group_handler/handle_mention.rs create mode 100644 src/group_handler/mod.rs diff --git a/src/error.rs b/src/error.rs index 29f109f..3e0095d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,6 +12,8 @@ pub enum GroupError { GroupNotExist, #[error("Config error: {0}")] BadConfig(Cow<'static, str>), + #[error("API request timed out")] + ApiTimeout, #[error(transparent)] IoError(#[from] std::io::Error), #[error(transparent)] diff --git a/src/group_handle.rs b/src/group_handle.rs deleted file mode 100644 index 82a2bce..0000000 --- a/src/group_handle.rs +++ /dev/null @@ -1,911 +0,0 @@ -use std::collections::HashSet; -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use elefren::{FediClient, StatusBuilder}; -use elefren::debug::EventDisplay; -use elefren::debug::NotificationDisplay; -use elefren::debug::StatusDisplay; -use elefren::entities::event::Event; -use elefren::entities::notification::{Notification, NotificationType}; -use elefren::entities::status::Status; -use elefren::status_builder::Visibility; -use futures::StreamExt; - -use crate::command::StatusCommand; -use crate::error::GroupError; -use crate::store::ConfigStore; -use crate::store::data::GroupConfig; -use crate::utils::{LogError, normalize_acct}; - -/// This is one group's config store capable of persistence -#[derive(Debug)] -pub struct GroupHandle { - pub(crate) client: FediClient, - pub(crate) config: GroupConfig, - pub(crate) store: Arc, -} - -const DELAY_BEFORE_ACTION: Duration = Duration::from_millis(250); -const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(500); -const MAX_CATCHUP_NOTIFS: usize = 25; -// also statuses -const MAX_CATCHUP_STATUSES: usize = 50; -// higher because we can expect a lot of non-hashtag statuses here -const PERIODIC_SAVE: Duration = Duration::from_secs(60); -const PING_INTERVAL: Duration = Duration::from_secs(15); // must be < periodic save! - -impl GroupHandle { - pub async fn save(&mut self) -> Result<(), GroupError> { - debug!("Saving group config & status"); - self.store.set_group_config(self.config.clone()).await?; - debug!("Saved"); - self.config.clear_dirty_status(); - Ok(()) - } - - pub async fn save_if_needed(&mut self) -> Result<(), GroupError> { - if self.config.is_dirty() { - self.save().await?; - } - Ok(()) - } - - /* - pub async fn reload(&mut self) -> Result<(), GroupError> { - if let Some(g) = self.store.get_group_config(self.config.get_acct()).await { - self.config = g; - Ok(()) - } else { - Err(GroupError::GroupNotExist) - } - } - */ -} - -trait NotifTimestamp { - fn timestamp_millis(&self) -> u64; -} - -impl NotifTimestamp for Notification { - fn timestamp_millis(&self) -> u64 { - self.created_at.timestamp_millis().max(0) as u64 - } -} - -impl NotifTimestamp for Status { - fn timestamp_millis(&self) -> u64 { - // this may not work well for unseen status tracking, - // if ancient statuses were to appear in the timeline :( - self.created_at.timestamp_millis().max(0) as u64 - } -} - -impl GroupHandle { - pub async fn run(&mut self) -> Result<(), GroupError> { - assert!(PERIODIC_SAVE >= PING_INTERVAL); - - loop { - debug!("Opening streaming API socket"); - let mut next_save = Instant::now() + PERIODIC_SAVE; // so we save at start - let mut events = self.client.streaming_user().await?; - let socket_open_time = Instant::now(); - let mut last_rx = Instant::now(); - let mut last_ping = Instant::now(); - - match self.catch_up_with_missed_notifications().await { - Ok(true) => { - debug!("Some missed notifs handled"); - } - Ok(false) => { - debug!("No notifs missed"); - } - Err(e) => { - error!("Failed to handle missed notifs: {}", e); - } - } - - match self.catch_up_with_missed_statuses().await { - Ok(true) => { - debug!("Some missed statuses handled"); - } - Ok(false) => { - debug!("No statuses missed"); - } - Err(e) => { - error!("Failed to handle missed statuses: {}", e); - } - } - - if self.config.is_dirty() { - // save asap - next_save = Instant::now() - PERIODIC_SAVE - } - - 'rx: loop { - if next_save < Instant::now() { - debug!("Save time elapsed, saving if needed"); - self.save_if_needed().await.log_error("Failed to save group"); - next_save = Instant::now() + PERIODIC_SAVE; - } - - if last_rx.elapsed() > PING_INTERVAL * 2 { - warn!("Socket idle too long, close"); - break 'rx; - } - - if socket_open_time.elapsed() > Duration::from_secs(120) { - debug!("Socket open too long, closing"); - break 'rx; - } - - debug!("Await msg"); - let timeout = next_save - .saturating_duration_since(Instant::now()) - .min(PING_INTERVAL) - .max(Duration::from_secs(1)); - - match tokio::time::timeout(timeout, events.next()).await { - Ok(Some(event)) => { - last_rx = Instant::now(); - debug!("(@{}) Event: {}", self.config.get_acct(), EventDisplay(&event)); - match event { - Event::Update(status) => { - self.handle_status(status).await.log_error("Error handling a status"); - } - Event::Notification(n) => { - self.handle_notification(n).await.log_error("Error handling a notification"); - } - Event::Delete(_id) => {} - Event::FiltersChanged => {} - Event::Heartbeat => {} - } - } - Ok(None) => { - warn!("Group @{} socket closed, restarting...", self.config.get_acct()); - break 'rx; - } - Err(_) => { - // Timeout so we can save if needed - } - } - - if last_ping.elapsed() > PING_INTERVAL { - last_ping = Instant::now(); - debug!("Pinging"); - if events.send_ping() - .await.is_err() { - break 'rx; - } - } - } - - warn!("Notif stream closed, will reopen"); - tokio::time::sleep(DELAY_REOPEN_STREAM).await; - } - } - - async fn handle_notification(&mut self, n: Notification) -> Result<(), GroupError> { - debug!("Handling notif #{}", n.id); - let ts = n.timestamp_millis(); - self.config.set_last_notif(ts); - - let group_acct = self.config.get_acct().to_string(); - let notif_user_id = &n.account.id; - let notif_acct = normalize_acct(&n.account.acct, &group_acct)?; - - if notif_acct == group_acct { - debug!("This is our post, ignore that"); - return Ok(()); - } - - if self.config.is_banned(¬if_acct) { - warn!("Notification actor {} is banned!", notif_acct); - return Ok(()); - } - - match n.notification_type { - NotificationType::Mention => { - if let Some(status) = n.status { - self.handle_mention_status(status).await?; - } - } - NotificationType::Follow => { - info!("New follower!"); - - if self.config.is_member_or_admin(¬if_acct) { - // Already joined, just doing something silly, ignore this - debug!("User already a member, ignoring"); - } else { - let text = if self.config.is_member_only() { - // Admins are listed without @, so they won't become handles here. - // Tagging all admins would be annoying. - let mut admins = self.config.get_admins().cloned().collect::>(); - admins.sort(); - format!( - "@{user} Hi, this is a member-only group, you won't be \ - able to post. You can still receive group posts though. If you'd like to join, \ - please ask one of the group admins to add you:\n\n\ - {admins}", - user = notif_acct, - admins = admins.join(", ") - ) - } else { - self.follow_user(notif_user_id).await - .log_error("Failed to follow"); - make_welcome_text(¬if_acct) - }; - - let post = StatusBuilder::new() - .status(text) - .content_type("text/markdown") - .visibility(Visibility::Direct) - .build() - .expect("error build status"); - - // tokio::time::sleep(Duration::from_millis(500)).await; - let _ = self.client.new_status(post).await.log_error("Failed to post"); - } - } - NotificationType::Favourite => {} - NotificationType::Reblog => {} - } - - Ok(()) - } - - /// Handle a non-mention status - async fn handle_status(&mut self, s: Status) -> Result<(), GroupError> { - debug!("Handling status #{}", s.id); - let ts = s.timestamp_millis(); - self.config.set_last_status(ts); - - let group_user = self.config.get_acct(); - let status_user = normalize_acct(&s.account.acct, group_user)?; - - if status_user == group_user { - debug!("This is our post, ignore that"); - return Ok(()); - } - - // for m in &s.mentions { - // let ma = normalize_acct(&m.acct, gu)?; - // if ma == gu { - // debug!("Mention detected, handle status as mention notification!"); - // return self.handle_mention_status(s).await; - // } - // } - - if !s.content.contains('#') { - debug!("No tags in status"); - return Ok(()); - } - - if s.visibility.is_private() { - debug!("Status is direct/private, not boosting"); - return Ok(()); - } - - if s.content.contains("/add ") || s.content.contains("/remove ") { - debug!("Discard, looks like a hashtag manipulation command"); - return Ok(()); - } - - - if self.config.is_banned(&status_user) { - debug!("Status author @{} is banned.", status_user); - return Ok(()); - } - - if !self.config.is_member_or_admin(&status_user) { - debug!("Status author @{} is not a member.", status_user); - return Ok(()); - } - - let tags = crate::command::parse_status_tags(&s.content); - debug!("Tags in status: {:?}", tags); - - for t in tags { - if self.config.is_tag_followed(&t) { - self.client.reblog(&s.id).await - .log_error("Failed to reblog"); - break; - } - } - - Ok(()) - } - - async fn follow_user(&mut self, id: &str) -> Result<(), GroupError> { - self.client.follow(id).await?; - Ok(()) - } - - async fn unfollow_user(&mut self, id: &str) -> Result<(), GroupError> { - self.client.unfollow(id).await?; - Ok(()) - } - - /// Catch up with missed notifications, returns true if any were handled - async fn catch_up_with_missed_notifications(&mut self) -> Result { - let last_notif = self.config.get_last_notif(); - - let notifications = self.client.notifications().await?; - let mut iter = notifications.items_iter(); - - let mut notifs_to_handle = vec![]; - - // They are retrieved newest first, but we want oldest first for chronological handling - - let mut num = 0; - while let Some(n) = iter.next_item().await { - let ts = n.timestamp_millis(); - if ts <= last_notif { - break; // reached our last seen notif - } - - debug!("Inspecting notif {}", NotificationDisplay(&n)); - notifs_to_handle.push(n); - num += 1; - if num > MAX_CATCHUP_NOTIFS { - warn!("Too many notifs missed to catch up!"); - break; - } - - // sleep so we dont make the api angry - tokio::time::sleep(Duration::from_millis(250)).await; - } - - if notifs_to_handle.is_empty() { - return Ok(false); - } - - notifs_to_handle.reverse(); - - debug!("{} notifications to catch up!", notifs_to_handle.len()); - - for n in notifs_to_handle { - debug!("Handling missed notification: {}", NotificationDisplay(&n)); - self.handle_notification(n).await.log_error("Error handling a notification"); - } - - Ok(true) - } - - /// Catch up with missed statuses, returns true if any were handled - async fn catch_up_with_missed_statuses(&mut self) -> Result { - let last_status = self.config.get_last_status(); - - let statuses = self.client.get_home_timeline().await?; - let mut iter = statuses.items_iter(); - - let mut statuses_to_handle = vec![]; - - // They are retrieved newest first, but we want oldest first for chronological handling - - let mut newest_status = None; - - let mut num = 0; - while let Some(s) = iter.next_item().await { - let ts = s.timestamp_millis(); - if ts <= last_status { - break; // reached our last seen status (hopefully there arent any retro-bumped) - } - - debug!("Inspecting status {}", StatusDisplay(&s)); - - if newest_status.is_none() { - newest_status = Some(ts); - } - - if s.content.contains('#') && !s.visibility.is_private() { - statuses_to_handle.push(s); - } - num += 1; - if num > MAX_CATCHUP_STATUSES { - warn!("Too many statuses missed to catch up!"); - break; - } - - // sleep so we dont make the api angry - tokio::time::sleep(Duration::from_millis(250)).await; - } - - if let Some(ts) = newest_status { - self.config.set_last_status(ts); - } - - if statuses_to_handle.is_empty() { - return Ok(false); - } - - statuses_to_handle.reverse(); - - debug!("{} statuses to catch up!", statuses_to_handle.len()); - - for s in statuses_to_handle { - debug!("Handling missed status: {}", StatusDisplay(&s)); - self.handle_status(s).await - .log_error("Error handling a status"); - } - - Ok(true) - } - - fn list_admins(&self, replies: &mut Vec) { - let mut admins = self.config.get_admins().collect::>(); - admins.sort(); - for a in admins { - replies.push(a.to_string()); - } - } - - fn list_members(&self, replies: &mut Vec) { - let admins = self.config.get_admins().collect::>(); - let mut members = self.config.get_members().collect::>(); - members.extend(admins.iter()); - members.sort(); - members.dedup(); - for m in members { - if admins.contains(&m) { - replies.push(format!("{} [admin]", m)); - } else { - replies.push(m.to_string()); - } - } - } - async fn handle_mention_status(&mut self, status: Status) -> Result<(), GroupError> { - let group_acct = self.config.get_acct().to_string(); - let status_acct = normalize_acct(&status.account.acct, &group_acct)?; - let status_user_id = &status.account.id; - - let can_write = self.config.can_write(&status_acct); - let is_admin = self.config.is_admin(&status_acct); - - if self.config.is_banned(&status_acct) { - warn!("Status author {} is banned!", status_acct); - return Ok(()); - } - - let commands = crate::command::parse_slash_commands(&status.content); - - let mut replies = vec![]; - let mut announcements = vec![]; - let mut do_boost_prev_post = false; - let mut any_admin_cmd = false; - let mut want_markdown = false; - - if commands.is_empty() { - debug!("No commands in post"); - if status.in_reply_to_id.is_none() { - if can_write { - // Someone tagged the group in OP, boost it. - info!("Boosting OP mention"); - // tokio::time::sleep(DELAY_BEFORE_ACTION).await; - self.client.reblog(&status.id).await.log_error("Failed to boost"); - } else { - replies.push("You are not allowed to post to this group".to_string()); - } - } else { - debug!("Not OP, ignore mention"); - } - } else { - for cmd in commands { - match cmd { - // ignore is first on purpose - StatusCommand::Ignore => { - debug!("Notif ignored because of ignore command"); - return Ok(()); - } - StatusCommand::Announce(a) => { - info!("Sending PSA"); - announcements.push(a); - } - StatusCommand::Boost => { - if can_write { - do_boost_prev_post = status.in_reply_to_id.is_some(); - } else { - replies.push("You are not allowed to share to this group".to_string()); - } - } - StatusCommand::BanUser(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if !self.config.is_banned(&u) { - match self.config.ban_user(&u, true) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} banned from group!", u)); - - // FIXME we need user ID, not handle - get it via API? - // self.unfollow_user(&u).await - // .log_error("Failed to unfollow"); - - // no announcement here - } - Err(e) => { - replies.push(format!("Failed to ban user {}: {}", u, e)); - } - } - } - } else { - replies.push("Only admins can manage user bans".to_string()); - } - } - StatusCommand::UnbanUser(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if self.config.is_banned(&u) { - match self.config.ban_user(&u, false) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} un-banned!", u)); - - // no announcement here - } - Err(_) => { - unreachable!() - } - } - } - } else { - replies.push("Only admins can manage user bans".to_string()); - } - } - StatusCommand::BanServer(s) => { - if is_admin { - if !self.config.is_server_banned(&s) { - match self.config.ban_server(&s, true) { - Ok(_) => { - any_admin_cmd = true; - announcements.push(format!("Server \"{}\" has been banned.", s)); - replies.push(format!("Server {} banned from group!", s)); - } - Err(e) => { - replies.push(format!("Failed to ban server {}: {}", s, e)); - } - } - } - } else { - replies.push("Only admins can manage server bans".to_string()); - } - } - StatusCommand::UnbanServer(s) => { - if is_admin { - if self.config.is_server_banned(&s) { - match self.config.ban_server(&s, false) { - Ok(_) => { - any_admin_cmd = true; - announcements.push(format!("Server \"{}\" has been un-banned.", s)); - replies.push(format!("Server {} un-banned!", s)); - } - Err(_) => { - unreachable!() - } - } - } - } else { - replies.push("Only admins can manage server bans".to_string()); - } - } - StatusCommand::AddMember(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if !self.config.is_member(&u) { - match self.config.set_member(&u, true) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} added to the group!", u)); - self.follow_user(status_user_id).await.log_error("Failed to follow"); - } - Err(e) => { - replies.push(format!("Failed to add user {} to group: {}", u, e)); - } - } - } - } else { - replies.push("Only admins can manage members".to_string()); - } - } - StatusCommand::RemoveMember(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if self.config.is_member(&u) { - match self.config.set_member(&u, false) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} removed from the group.", u)); - - // FIXME we need user ID, not handle - get it via API? - // self.unfollow_user(&u).await - // .log_error("Failed to unfollow"); - } - Err(_) => { - unreachable!() - } - } - } - } else { - replies.push("Only admins can manage members".to_string()); - } - } - StatusCommand::AddTag(tag) => { - if is_admin { - any_admin_cmd = true; - self.config.add_tag(&tag); - replies.push(format!("Tag #{} added to the group!", tag)); - } else { - replies.push("Only admins can manage group tags".to_string()); - } - } - StatusCommand::RemoveTag(tag) => { - if is_admin { - any_admin_cmd = true; - self.config.remove_tag(&tag); - replies.push(format!("Tag #{} removed from the group!", tag)); - } else { - replies.push("Only admins can manage group tags".to_string()); - } - } - StatusCommand::GrantAdmin(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if !self.config.is_admin(&u) { - match self.config.set_admin(&u, true) { - Ok(_) => { - // try to make the config a little more sane, admins should be members - let _ = self.config.set_member(&u, true); - - any_admin_cmd = true; - replies.push(format!("User {} is now a group admin!", u)); - announcements - .push(format!("User @{} can now manage this group!", u)); - } - Err(e) => { - replies.push(format!( - "Failed to make user {} a group admin: {}", - u, e - )); - } - } - } - } else { - replies.push("Only admins can manage admins".to_string()); - } - } - StatusCommand::RemoveAdmin(u) => { - let u = normalize_acct(&u, &group_acct)?; - if is_admin { - if self.config.is_admin(&u) { - match self.config.set_admin(&u, false) { - Ok(_) => { - any_admin_cmd = true; - replies.push(format!("User {} is no longer a group admin!", u)); - announcements - .push(format!("User @{} no longer manages this group.", u)); - } - Err(e) => { - replies - .push(format!("Failed to revoke {}'s group admin: {}", u, e)); - } - } - } - } else { - replies.push("Only admins can manage admins".to_string()); - } - } - StatusCommand::OpenGroup => { - if is_admin { - if self.config.is_member_only() { - any_admin_cmd = true; - self.config.set_member_only(false); - replies.push("Group changed to open-access".to_string()); - announcements.push("This group is now open-access!".to_string()); - } - } else { - replies.push("Only admins can set group options".to_string()); - } - } - StatusCommand::CloseGroup => { - if is_admin { - if !self.config.is_member_only() { - any_admin_cmd = true; - self.config.set_member_only(true); - replies.push("Group changed to member-only".to_string()); - announcements.push("This group is now member-only!".to_string()); - } - } else { - replies.push("Only admins can set group options".to_string()); - } - } - StatusCommand::Help => { - want_markdown = true; - - if self.config.is_member_only() { - replies.push("This is a member-only group. ".to_string()); - } else { - replies.push("This is a public-access group. ".to_string()); - } - - if self.config.can_write(&status_acct) { - if is_admin { - replies.push("*You are an admin.*".to_string()); - } else { - replies.push("*You are a member.*".to_string()); - } - } else { - if self.config.is_member_only() { - replies.push("*You are not a member, ask one of the admins to add you.*".to_string()); - } else { - replies.push("*You are not a member, follow or use /join to join the group.*".to_string()); - } - } - - replies.push( - "\nTo share an original post, mention the group user.\n\ - Replies and mentions with commands won't be shared.\n\ - \n\ - **Supported commands:**\n\ - `/boost, /b` - boost the replied-to post into the group\n\ - `/ignore, /i` - make the group completely ignore the post\n\ - `/ping` - check that the service is alive\n\ - `/join` - join the group\n\ - `/leave` - leave the group".to_string(), - ); - - if self.config.is_member_only() { - replies.push("`/members, /who` - show group members / admins".to_string()); - } else { - replies.push("`/members, /who` - show group admins".to_string()); - } - - if is_admin { - replies.push( - "\n\ - **Admin commands:**\n\ - `/add user` - add a member (use e-mail style address)\n\ - `/kick, /remove user` - kick a member\n\ - `/ban x` - ban a user or a server\n\ - `/unban x` - lift a ban\n\ - `/op, /admin user` - grant admin rights\n\ - `/deop, /deadmin user` - revoke admin rights\n\ - `/opengroup` - make member-only\n\ - `/closegroup` - make public-access\n\ - `/announce x` - make a public announcement from the rest of the status" - .to_string(), - ); - } - } - StatusCommand::ListMembers => { - let mut show_admins = false; - if is_admin { - replies.push("Group members:".to_string()); - self.list_members(&mut replies); - } else { - replies.push("Group admins:".to_string()); - self.list_admins(&mut replies); - } - } - StatusCommand::ListTags => { - replies.push("Group tags:".to_string()); - let mut tags = self.config.get_tags().collect::>(); - tags.sort(); - for t in tags { - replies.push(format!("#{}", t)); - } - } - StatusCommand::Leave => { - if self.config.is_member_or_admin(&status_acct) { - // admin can leave but that's a bad idea - - any_admin_cmd = true; - let _ = self.config.set_member(&status_acct, false); - replies.push("You're no longer a group member. Unfollow the group user to stop receiving group messages.".to_string()); - - self.unfollow_user(&status_user_id).await - .log_error("Failed to unfollow"); - } - } - StatusCommand::Join => { - if self.config.is_member_or_admin(&status_acct) { - // Already a member, so let's try to follow the user - // again, maybe first time it failed - self.follow_user(status_user_id).await - .log_error("Failed to follow"); - } else { - // Not a member yet - if self.config.is_member_only() { - // No you can't - replies.push(format!( - "Hi, this group is closed to new sign-ups.\n\ - Please ask one of the group admins to add you:")); - self.list_admins(&mut replies); - } else { - // Open access - self.follow_user(status_user_id).await - .log_error("Failed to follow"); - - // This only fails if the user is banned, but that is filtered above - let _ = self.config.set_member(&status_acct, true); - replies.push(format!("\ - Thanks for joining, you are now a member and the group user will \ - follow you so you can use group hashtags. Make sure you follow the \ - group user to receive group messages.")); - } - } - } - StatusCommand::Ping => { - replies.push("Pong".to_string()); - } - } - } - - // tokio::time::sleep(DELAY_BEFORE_ACTION).await; - } - - if do_boost_prev_post { - self.client - .reblog(&status.in_reply_to_id.as_ref().unwrap()) - .await - .log_error("Failed to boost"); - } - - if !replies.is_empty() { - debug!("replies={:?}", replies); - let r = replies.join("\n"); - debug!("r={}", r); - - let post = StatusBuilder::new() - .status(format!("@{user}\n{msg}", user = status_acct, msg = r)) - .content_type(if want_markdown { - "text/markdown" - } else { - "text/plain" - }) - .visibility(Visibility::Direct) - .build() - .expect("error build status"); - - let _ = self.client.new_status(post).await.log_error("Failed to post"); - } - - if !announcements.is_empty() { - let msg = announcements.join("\n"); - let post = StatusBuilder::new() - .status(format!("**📢 Group announcement**\n{msg}", msg = msg)) - .content_type("text/markdown") - .visibility(Visibility::Public) - .build() - .expect("error build status"); - - let _ = self.client.new_status(post).await.log_error("Failed to post"); - } - - if any_admin_cmd { - debug!("Saving after admin cmd"); - self.save_if_needed().await.log_error("Failed to save"); - } - - Ok(()) - } -} - -fn make_welcome_text(user: &str) -> String { - format!( - "@{user} Welcome to the group! To share a post, tag the group user \ - or use one of the group hashtags. Use /help for more info.", - user = user - ) -} - - -trait VisExt: Copy { - /// Check if is private or direct - fn is_private(self) -> bool; -} - -impl VisExt for Visibility { - fn is_private(self) -> bool { - self == Visibility::Direct || self == Visibility::Private - } -} diff --git a/src/group_handler/handle_mention.rs b/src/group_handler/handle_mention.rs new file mode 100644 index 0000000..e145adf --- /dev/null +++ b/src/group_handler/handle_mention.rs @@ -0,0 +1,625 @@ +use std::collections::HashSet; +use std::time::Duration; + +use elefren::{FediClient, SearchType, StatusBuilder}; +use elefren::entities::prelude::Status; +use elefren::status_builder::Visibility; + +use crate::command::StatusCommand; +use crate::error::GroupError; +use crate::group_handler::GroupHandle; +use crate::store::data::GroupConfig; +use crate::utils::{LogError, normalize_acct}; + +pub struct ProcessMention<'a> { + status: Status, + config: &'a mut GroupConfig, + client: &'a mut FediClient, + group_acct: String, + status_acct: String, + status_user_id: String, + can_write: bool, + is_admin: bool, + replies: Vec, + announcements: Vec, + do_boost_prev_post: bool, + want_markdown: bool, +} + +impl<'a> ProcessMention<'a> { + async fn lookup_acct_id(&self, acct: &str, followed: bool) -> Result, GroupError> { + debug!("Looking up user ID by acct: {}", acct); + + match tokio::time::timeout(Duration::from_secs(5), self.client.search_v2( + acct, + !followed, + Some(SearchType::Accounts), + Some(1), + followed, + )).await { + Err(_) => { + warn!("Account lookup timeout!"); + Err(GroupError::ApiTimeout) + } + Ok(Err(e)) => { + // Elefren error + Err(e.into()) + } + Ok(Ok(res)) => { + if let Some(item) = res.accounts.into_iter().next() { + debug!("Search done, account found"); + Ok(Some(item.id)) + } else { + debug!("Search done, nothing found"); + Ok(None) + } + } + } + } + + fn append_admin_list_to_reply(&mut self) { + let mut admins = self.config.get_admins().collect::>(); + admins.sort(); + for a in admins { + self.replies.push(a.to_string()); + } + } + + fn append_member_list_to_reply(&mut self) { + let admins = self.config.get_admins().collect::>(); + let mut members = self.config.get_members().collect::>(); + members.extend(admins.iter()); + members.sort(); + members.dedup(); + for m in members { + self.replies.push(if admins.contains(&m) { + format!("{} [admin]", m) + } else { + m.to_string() + }); + } + } + + async fn follow_user(&self, id: &str) -> Result<(), GroupError> { + self.client.follow(id).await?; + Ok(()) + } + + async fn unfollow_user(&self, id: &str) -> Result<(), GroupError> { + self.client.unfollow(id).await?; + Ok(()) + } + + pub(crate) async fn run(gh: &'a mut GroupHandle, status: Status) -> Result<(), GroupError> { + let group_acct = gh.config.get_acct().to_string(); + let status_acct = normalize_acct(&status.account.acct, &group_acct)?.to_string(); + + if gh.config.is_banned(&status_acct) { + warn!("Status author {} is banned!", status_acct); + return Ok(()); + } + + let pm = Self { + status_user_id: status.account.id.to_string(), + client: &mut gh.client, + can_write: gh.config.can_write(&status_acct), + is_admin: gh.config.is_admin(&status_acct), + replies: vec![], + announcements: vec![], + do_boost_prev_post: false, + want_markdown: false, + group_acct, + status_acct, + status, + config: &mut gh.config, + }; + + pm.handle().await + } + + async fn reblog_status(&self) { + self.client.reblog(&self.status.id) + .await + .log_error("Failed to reblog status") + } + + fn add_reply(&mut self, line: impl ToString) { + self.replies.push(line.to_string()) + } + + fn add_announcement(&mut self, line: impl ToString) { + self.announcements.push(line.to_string()) + } + + async fn handle(mut self) -> Result<(), GroupError> { + let commands = crate::command::parse_slash_commands(&self.status.content); + + if commands.is_empty() { + self.handle_post_with_no_commands().await; + } else { + if commands.contains(&StatusCommand::Ignore) { + debug!("Notif ignored because of ignore command"); + return Ok(()); + } + + for cmd in commands { + match cmd { + StatusCommand::Ignore => { + unreachable!(); // Handled above + } + StatusCommand::Announce(a) => { + self.cmd_announce(a).await; + } + StatusCommand::Boost => { + self.cmd_boost().await; + } + StatusCommand::BanUser(u) => { + self.cmd_ban_user(&u).await + .log_error("Error handling ban-user cmd"); + } + StatusCommand::UnbanUser(u) => { + self.cmd_unban_user(&u).await + .log_error("Error handling unban-user cmd"); + } + StatusCommand::BanServer(s) => { + self.cmd_ban_server(&s).await; + } + StatusCommand::UnbanServer(s) => { + self.cmd_unban_server(&s).await; + } + StatusCommand::AddMember(u) => { + self.cmd_add_member(&u).await + .log_error("Error handling add-member cmd"); + } + StatusCommand::RemoveMember(u) => { + self.cmd_remove_member(&u).await + .log_error("Error handling remove-member cmd"); + } + StatusCommand::AddTag(tag) => { + self.cmd_add_tag(tag).await; + } + StatusCommand::RemoveTag(tag) => { + self.cmd_remove_tag(tag).await; + } + StatusCommand::GrantAdmin(u) => { + self.cmd_grant_member(&u).await + .log_error("Error handling grant-admin cmd"); + } + StatusCommand::RemoveAdmin(u) => { + self.cmd_revoke_member(&u).await + .log_error("Error handling grant-admin cmd"); + } + StatusCommand::OpenGroup => { + self.cmd_open_group().await; + } + StatusCommand::CloseGroup => { + self.cmd_close_group().await; + } + StatusCommand::Help => { + self.cmd_help().await; + } + StatusCommand::ListMembers => { + self.cmd_list_members().await; + } + StatusCommand::ListTags => { + self.cmd_list_tags().await; + } + StatusCommand::Leave => { + self.cmd_leave().await; + } + StatusCommand::Join => { + self.cmd_join().await; + } + StatusCommand::Ping => { + self.cmd_ping().await; + } + } + } + } + + if self.do_boost_prev_post { + self.client + .reblog(self.status.in_reply_to_id.as_ref().unwrap()) + .await + .log_error("Failed to boost"); + } + + if !self.replies.is_empty() { + debug!("replies={:?}", self.replies); + let r = self.replies.join("\n"); + debug!("r={}", r); + + if let Ok(post) = StatusBuilder::new() + .status(format!("@{user}\n{msg}", user = self.status_acct, msg = r)) + .content_type(if self.want_markdown { + "text/markdown" + } else { + "text/plain" + }) + .visibility(self.status.visibility) // Copy visibility + .build() + { + let _ = self.client.new_status(post) + .await.log_error("Failed to post"); + } + } + + if !self.announcements.is_empty() { + let msg = self.announcements.join("\n"); + let post = StatusBuilder::new() + .status(format!("**📢 Group announcement**\n{msg}", msg = msg)) + .content_type("text/markdown") + .visibility(Visibility::Public) + .build() + .expect("error build status"); + + let _ = self.client.new_status(post) + .await.log_error("Failed to post"); + } + + Ok(()) + } + + async fn handle_post_with_no_commands(&mut self) { + debug!("No commands in post"); + if self.status.in_reply_to_id.is_none() { + if self.can_write { + // Someone tagged the group in OP, boost it. + info!("Boosting OP mention"); + // tokio::time::sleep(DELAY_BEFORE_ACTION).await; + self.reblog_status().await; + } else { + self.add_reply("You are not allowed to post to this group"); + } + } else { + debug!("Not OP, ignore mention"); + } + } + + async fn cmd_announce(&mut self, msg: String) { + info!("Sending PSA"); + self.add_announcement(msg); + } + + async fn cmd_boost(&mut self) { + if self.can_write { + self.do_boost_prev_post = self.status.in_reply_to_id.is_some(); + } else { + self.add_reply("You are not allowed to share to this group"); + } + } + + async fn cmd_ban_user(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if !self.config.is_banned(&u) { + match self.config.ban_user(&u, true) { + Ok(_) => { + self.add_reply(format!("User {} banned from group!", u)); + self.unfollow_by_acct(&u).await + .log_error("Failed to unfollow banned user"); + } + Err(e) => { + self.add_reply(format!("Failed to ban user {}: {}", u, e)); + } + } + } + } else { + self.add_reply("Only admins can manage user bans"); + } + Ok(()) + } + + async fn cmd_unban_user(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if self.config.is_banned(&u) { + match self.config.ban_user(&u, false) { + Ok(_) => { + self.add_reply(format!("User {} un-banned!", u)); + // no announcement here + } + Err(_) => { + unreachable!() + } + } + } + } else { + self.add_reply("Only admins can manage user bans"); + } + Ok(()) + } + + async fn cmd_ban_server(&mut self, s: &str) { + if self.is_admin { + if !self.config.is_server_banned(s) { + match self.config.ban_server(s, true) { + Ok(_) => { + self.add_announcement(format!("Server \"{}\" has been banned.", s)); + self.add_reply(format!("Server {} banned from group!", s)); + } + Err(e) => { + self.add_reply(format!("Failed to ban server {}: {}", s, e)); + } + } + } + } else { + self.add_reply("Only admins can manage server bans"); + } + } + + async fn cmd_unban_server(&mut self, s: &str) { + if self.is_admin { + if self.config.is_server_banned(s) { + match self.config.ban_server(s, false) { + Ok(_) => { + self.add_announcement(format!("Server \"{}\" has been un-banned.", s)); + self.add_reply(format!("Server {} un-banned!", s)); + } + Err(_) => { + unreachable!() + } + } + } + } else { + self.add_reply("Only admins can manage server bans"); + } + } + + async fn cmd_add_member(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if !self.config.is_member(&u) { + match self.config.set_member(&u, true) { + Ok(_) => { + self.add_reply(format!("User {} added to the group!", u)); + self.follow_user(&self.status_user_id) + .await.log_error("Failed to follow"); + } + Err(e) => { + self.add_reply(format!("Failed to add user {} to group: {}", u, e)); + } + } + } + } else { + self.add_reply("Only admins can manage members"); + } + Ok(()) + } + + async fn cmd_remove_member(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if self.config.is_member(&u) { + match self.config.set_member(&u, false) { + Ok(_) => { + self.add_reply(format!("User {} removed from the group.", u)); + self.unfollow_by_acct(&u).await + .log_error("Failed to unfollow removed user"); + } + Err(_) => { + unreachable!() + } + } + } + } else { + self.add_reply("Only admins can manage members"); + } + Ok(()) + } + + async fn cmd_add_tag(&mut self, tag: String) { + if self.is_admin { + self.config.add_tag(&tag); + self.add_reply(format!("Tag #{} added to the group!", tag)); + } else { + self.add_reply("Only admins can manage group tags"); + } + } + + async fn cmd_remove_tag(&mut self, tag: String) { + if self.is_admin { + self.config.remove_tag(&tag); + self.add_reply(format!("Tag #{} removed from the group!", tag)); + } else { + self.add_reply("Only admins can manage group tags"); + } + } + + async fn cmd_grant_member(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if !self.config.is_admin(&u) { + match self.config.set_admin(&u, true) { + Ok(_) => { + // try to make the config a little more sane, admins should be members + let _ = self.config.set_member(&u, true); + + self.add_reply(format!("User {} is now a group admin!", u)); + self.add_announcement(format!("User @{} can now manage this group!", u)); + } + Err(e) => { + self.add_reply(format!( + "Failed to make user {} a group admin: {}", + u, e + )); + } + } + } + } else { + self.add_reply("Only admins can manage admins"); + } + Ok(()) + } + + async fn cmd_revoke_member(&mut self, user: &str) -> Result<(), GroupError> { + let u = normalize_acct(user, &self.group_acct)?; + if self.is_admin { + if self.config.is_admin(&u) { + match self.config.set_admin(&u, false) { + Ok(_) => { + self.add_reply(format!("User {} is no longer a group admin!", u)); + self.add_announcement(format!("User @{} no longer manages this group.", u)); + } + Err(e) => { + self.add_reply(format!("Failed to revoke {}'s group admin: {}", u, e)); + } + } + } + } else { + self.add_reply("Only admins can manage admins"); + } + Ok(()) + } + + async fn cmd_open_group(&mut self) { + if self.is_admin { + if self.config.is_member_only() { + self.config.set_member_only(false); + self.add_reply("Group changed to open-access"); + self.add_announcement("This group is now open-access!"); + } + } else { + self.add_reply("Only admins can set group options"); + } + } + + async fn cmd_close_group(&mut self) { + if self.is_admin { + if !self.config.is_member_only() { + self.config.set_member_only(true); + self.add_reply("Group changed to member-only"); + self.add_announcement("This group is now member-only!"); + } + } else { + self.add_reply("Only admins can set group options"); + } + } + + async fn cmd_help(&mut self) { + self.want_markdown = true; + + if self.config.is_member_only() { + self.add_reply("This is a member-only group. "); + } else { + self.add_reply("This is a public-access group. "); + } + + if self.config.can_write(&self.status_acct) { + if self.is_admin { + self.add_reply("*You are an admin.*"); + } else { + self.add_reply("*You are a member.*"); + } + } else { + if self.config.is_member_only() { + self.add_reply("*You are not a member, ask one of the admins to add you.*"); + } else { + self.add_reply("*You are not a member, follow or use /join to join the group.*"); + } + } + + self.add_reply("\n\ + To share an original post, mention the group user.\n\ + Replies and mentions with commands won't be shared.\n\ + \n\ + **Supported commands:**\n\ + `/boost, /b` - boost the replied-to post into the group\n\ + `/ignore, /i` - make the group completely ignore the post\n\ + `/ping` - check that the service is alive\n\ + `/join` - join the group\n\ + `/leave` - leave the group"); + + if self.config.is_member_only() { + self.add_reply("`/members, /who` - show group members / admins"); + } else { + self.add_reply("`/members, /who` - show group admins"); + } + + if self.is_admin { + self.add_reply("\n\ + **Admin commands:**\n\ + `/add user` - add a member (use e-mail style address)\n\ + `/kick, /remove user` - kick a member\n\ + `/ban x` - ban a user or a server\n\ + `/unban x` - lift a ban\n\ + `/op, /admin user` - grant admin rights\n\ + `/deop, /deadmin user` - revoke admin rights\n\ + `/opengroup` - make member-only\n\ + `/closegroup` - make public-access\n\ + `/announce x` - make a public announcement from the rest of the status"); + } + } + + async fn cmd_list_members(&mut self) { + if self.is_admin { + self.add_reply("Group members:"); + self.append_member_list_to_reply(); + } else { + self.add_reply("Group admins:"); + self.append_admin_list_to_reply(); + } + } + + async fn cmd_list_tags(&mut self) { + self.add_reply("Group tags:"); + let mut tags = self.config.get_tags().collect::>(); + tags.sort(); + for t in tags { + self.replies.push(format!("#{}", t).to_string()); + } + } + + async fn cmd_leave(&mut self) { + if self.config.is_member_or_admin(&self.status_acct) { + // admin can leave but that's a bad idea + let _ = self.config.set_member(&self.status_acct, false); + self.add_reply("You're no longer a group member. Unfollow the group user to stop receiving group messages."); + self.unfollow_user(&self.status_user_id).await + .log_error("Failed to unfollow"); + } + } + + async fn cmd_join(&mut self) { + if self.config.is_member_or_admin(&self.status_acct) { + debug!("Already member or admin, try to follow-back again"); + // Already a member, so let's try to follow the user + // again, maybe first time it failed + self.follow_user(&self.status_user_id).await + .log_error("Failed to follow"); + } else { + // Not a member yet + if self.config.is_member_only() { + // No you can't + self.add_reply("\ + Sorry, this group is closed to new sign-ups.\n\ + Please ask one of the group admins to add you:"); + + self.append_admin_list_to_reply(); + } else { + // Open access, try to follow back + self.follow_user(&self.status_user_id).await + .log_error("Failed to follow"); + + // This only fails if the user is banned, but that is filtered above + let _ = self.config.set_member(&self.status_acct, true); + self.add_reply("\ + Welcome to the group! The group user will now follow you to complete the sign-up. \ + Make sure you follow back to receive shared posts!\n\n\ + Use /help for more info."); + } + } + } + + async fn cmd_ping(&mut self) { + self.add_reply(format!("pong, this is fedigroups service v{}", env!("CARGO_PKG_VERSION"))); + } + + async fn unfollow_by_acct(&self, acct: &str) -> Result<(), GroupError> { + // Try to unfollow + if let Ok(Some(id)) = self.lookup_acct_id(acct, true).await { + self.unfollow_user(&id).await?; + } + Ok(()) + } +} diff --git a/src/group_handler/mod.rs b/src/group_handler/mod.rs new file mode 100644 index 0000000..d5b260b --- /dev/null +++ b/src/group_handler/mod.rs @@ -0,0 +1,454 @@ +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use elefren::{FediClient, StatusBuilder}; +use elefren::debug::EventDisplay; +use elefren::debug::NotificationDisplay; +use elefren::debug::StatusDisplay; +use elefren::entities::event::Event; +use elefren::entities::notification::{Notification, NotificationType}; +use elefren::entities::status::Status; +use elefren::status_builder::Visibility; +use futures::StreamExt; + +use handle_mention::ProcessMention; + +use crate::error::GroupError; +use crate::store::ConfigStore; +use crate::store::data::GroupConfig; +use crate::utils::{LogError, normalize_acct, VisExt}; + +mod handle_mention; + +/// This is one group's config store capable of persistence +#[derive(Debug)] +pub struct GroupHandle { + pub(crate) client: FediClient, + pub(crate) config: GroupConfig, + pub(crate) store: Arc, +} + +// const DELAY_BEFORE_ACTION: Duration = Duration::from_millis(250); +const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(500); +const MAX_CATCHUP_NOTIFS: usize = 25; +// also statuses +const MAX_CATCHUP_STATUSES: usize = 50; +// higher because we can expect a lot of non-hashtag statuses here +const PERIODIC_SAVE: Duration = Duration::from_secs(60); +const PING_INTERVAL: Duration = Duration::from_secs(15); // must be < periodic save! + +impl GroupHandle { + pub async fn save(&mut self) -> Result<(), GroupError> { + debug!("Saving group config & status"); + self.store.set_group_config(self.config.clone()).await?; + trace!("Saved"); + self.config.clear_dirty_status(); + Ok(()) + } + + pub async fn save_if_needed(&mut self) -> Result<(), GroupError> { + if self.config.is_dirty() { + self.save().await?; + } + Ok(()) + } + + /* + pub async fn reload(&mut self) -> Result<(), GroupError> { + if let Some(g) = self.store.get_group_config(self.config.get_acct()).await { + self.config = g; + Ok(()) + } else { + Err(GroupError::GroupNotExist) + } + } + */ +} + +trait NotifTimestamp { + fn timestamp_millis(&self) -> u64; +} + +impl NotifTimestamp for Notification { + fn timestamp_millis(&self) -> u64 { + self.created_at.timestamp_millis().max(0) as u64 + } +} + +impl NotifTimestamp for Status { + fn timestamp_millis(&self) -> u64 { + // this may not work well for unseen status tracking, + // if ancient statuses were to appear in the timeline :( + self.created_at.timestamp_millis().max(0) as u64 + } +} + +impl GroupHandle { + pub async fn run(&mut self) -> Result<(), GroupError> { + assert!(PERIODIC_SAVE >= PING_INTERVAL); + + loop { + debug!("Opening streaming API socket"); + let mut next_save = Instant::now() + PERIODIC_SAVE; // so we save at start + let mut events = self.client.streaming_user().await?; + let socket_open_time = Instant::now(); + let mut last_rx = Instant::now(); + let mut last_ping = Instant::now(); + + match self.catch_up_with_missed_notifications().await { + Ok(true) => { + debug!("Some missed notifs handled"); + } + Ok(false) => { + debug!("No notifs missed"); + } + Err(e) => { + error!("Failed to handle missed notifs: {}", e); + } + } + + match self.catch_up_with_missed_statuses().await { + Ok(true) => { + debug!("Some missed statuses handled"); + } + Ok(false) => { + debug!("No statuses missed"); + } + Err(e) => { + error!("Failed to handle missed statuses: {}", e); + } + } + + if self.config.is_dirty() { + // save asap + next_save = Instant::now() - PERIODIC_SAVE + } + + 'rx: loop { + if next_save < Instant::now() { + trace!("Save time elapsed, saving if needed"); + self.save_if_needed().await.log_error("Failed to save group"); + next_save = Instant::now() + PERIODIC_SAVE; + } + + if last_rx.elapsed() > PING_INTERVAL * 2 { + warn!("Socket idle too long, close"); + break 'rx; + } + + if socket_open_time.elapsed() > Duration::from_secs(120) { + debug!("Socket open too long, closing"); + break 'rx; + } + + trace!("Waiting for message"); + let timeout = next_save + .saturating_duration_since(Instant::now()) + .min(PING_INTERVAL) + .max(Duration::from_secs(1)); + + match tokio::time::timeout(timeout, events.next()).await { + Ok(Some(event)) => { + last_rx = Instant::now(); + debug!("(@{}) Event: {}", self.config.get_acct(), EventDisplay(&event)); + match event { + Event::Update(status) => { + self.handle_status(status).await.log_error("Error handling a status"); + } + Event::Notification(n) => { + self.handle_notification(n).await.log_error("Error handling a notification"); + } + Event::Delete(_id) => {} + Event::FiltersChanged => {} + Event::Heartbeat => {} + } + } + Ok(None) => { + warn!("Group @{} socket closed, restarting...", self.config.get_acct()); + break 'rx; + } + Err(_) => { + // Timeout so we can save if needed + } + } + + if last_ping.elapsed() > PING_INTERVAL { + last_ping = Instant::now(); + trace!("Pinging"); + if events.send_ping() + .await.is_err() { + break 'rx; + } + } + } + + warn!("Notif stream closed, will reopen"); + tokio::time::sleep(DELAY_REOPEN_STREAM).await; + } + } + + async fn handle_notification(&mut self, n: Notification) -> Result<(), GroupError> { + debug!("Handling notif #{}", n.id); + let ts = n.timestamp_millis(); + self.config.set_last_notif(ts); + + let group_acct = self.config.get_acct().to_string(); + let notif_user_id = &n.account.id; + let notif_acct = normalize_acct(&n.account.acct, &group_acct)?; + + if notif_acct == group_acct { + debug!("This is our post, ignore that"); + return Ok(()); + } + + if self.config.is_banned(¬if_acct) { + warn!("Notification actor {} is banned!", notif_acct); + return Ok(()); + } + + match n.notification_type { + NotificationType::Mention => { + if let Some(status) = n.status { + self.handle_mention_status(status).await?; + } + } + NotificationType::Follow => { + info!("New follower!"); + + // Just greet the user always + self.handle_new_follow(¬if_acct, notif_user_id).await; + + // if self.config.is_member_or_admin(¬if_acct) { + // // Already joined, just doing something silly, ignore this + // debug!("User already a member, ignoring"); + // } else { + // + // } + } + NotificationType::Favourite => {} + NotificationType::Reblog => {} + NotificationType::Other(_) => {} + } + + Ok(()) + } + + /// Handle a non-mention status + async fn handle_status(&mut self, s: Status) -> Result<(), GroupError> { + debug!("Handling status #{}", s.id); + let ts = s.timestamp_millis(); + self.config.set_last_status(ts); + + if s.visibility.is_private() { + debug!("Status is direct/private, discard"); + return Ok(()); + } + + if !s.content.contains('#') { + debug!("No tags in status, discard"); + return Ok(()); + } + + let group_user = self.config.get_acct(); + let status_user = normalize_acct(&s.account.acct, group_user)?; + + if status_user == group_user { + debug!("This is our post, discard"); + return Ok(()); + } + + if s.content.contains("/add ") + || s.content.contains("/remove ") + || s.content.contains("\\add ") + || s.content.contains("\\remove ") + { + debug!("Looks like a hashtag manipulation command, discard"); + return Ok(()); + } + + if self.config.is_banned(&status_user) { + debug!("Status author @{} is banned.", status_user); + return Ok(()); + } + + if !self.config.is_member_or_admin(&status_user) { + debug!("Status author @{} is not a member.", status_user); + return Ok(()); + } + + let tags = crate::command::parse_status_tags(&s.content); + debug!("Tags in status: {:?}", tags); + + 'tags: for t in tags { + if self.config.is_tag_followed(&t) { + info!("REBLOG #{} STATUS", &t); + self.client.reblog(&s.id).await + .log_error("Failed to reblog"); + break 'tags; // do not reblog multiple times! + } + } + + Ok(()) + } + + async fn follow_user(&mut self, id: &str) -> Result<(), GroupError> { + self.client.follow(id).await?; + Ok(()) + } + + /// Catch up with missed notifications, returns true if any were handled + async fn catch_up_with_missed_notifications(&mut self) -> Result { + let last_notif = self.config.get_last_notif(); + + let notifications = self.client.notifications().await?; + let mut iter = notifications.items_iter(); + + let mut notifs_to_handle = vec![]; + + // They are retrieved newest first, but we want oldest first for chronological handling + + let mut num = 0; + while let Some(n) = iter.next_item().await { + let ts = n.timestamp_millis(); + if ts <= last_notif { + break; // reached our last seen notif + } + + debug!("Inspecting notif {}", NotificationDisplay(&n)); + notifs_to_handle.push(n); + num += 1; + if num > MAX_CATCHUP_NOTIFS { + warn!("Too many notifs missed to catch up!"); + break; + } + + // sleep so we dont make the api angry + tokio::time::sleep(Duration::from_millis(250)).await; + } + + if notifs_to_handle.is_empty() { + return Ok(false); + } + + notifs_to_handle.reverse(); + + debug!("{} notifications to catch up!", notifs_to_handle.len()); + + for n in notifs_to_handle { + debug!("Handling missed notification: {}", NotificationDisplay(&n)); + self.handle_notification(n).await.log_error("Error handling a notification"); + } + + Ok(true) + } + + /// Catch up with missed statuses, returns true if any were handled + async fn catch_up_with_missed_statuses(&mut self) -> Result { + let last_status = self.config.get_last_status(); + + let statuses = self.client.get_home_timeline().await?; + let mut iter = statuses.items_iter(); + + let mut statuses_to_handle = vec![]; + + // They are retrieved newest first, but we want oldest first for chronological handling + + let mut newest_status = None; + + let mut num = 0; + while let Some(s) = iter.next_item().await { + let ts = s.timestamp_millis(); + if ts <= last_status { + break; // reached our last seen status (hopefully there arent any retro-bumped) + } + + debug!("Inspecting status {}", StatusDisplay(&s)); + + if newest_status.is_none() { + newest_status = Some(ts); + } + + if s.content.contains('#') && !s.visibility.is_private() { + statuses_to_handle.push(s); + } + num += 1; + if num > MAX_CATCHUP_STATUSES { + warn!("Too many statuses missed to catch up!"); + break; + } + + // sleep so we dont make the api angry + tokio::time::sleep(Duration::from_millis(250)).await; + } + + if let Some(ts) = newest_status { + self.config.set_last_status(ts); + } + + if statuses_to_handle.is_empty() { + return Ok(false); + } + + statuses_to_handle.reverse(); + + debug!("{} statuses to catch up!", statuses_to_handle.len()); + + for s in statuses_to_handle { + debug!("Handling missed status: {}", StatusDisplay(&s)); + self.handle_status(s).await + .log_error("Error handling a status"); + } + + Ok(true) + } + + async fn handle_mention_status(&mut self, status: Status) -> Result<(), GroupError> { + let res = ProcessMention::run(self, status).await; + + self.save_if_needed().await + .log_error("Failed to save"); + + res + } + + async fn handle_new_follow(&mut self, notif_acct: &str, notif_user_id: &str) { + let mut follow_back = false; + let text = if self.config.is_member_only() { + // Admins are listed without @, so they won't become handles here. + // Tagging all admins would be annoying. + let mut admins = self.config.get_admins().cloned().collect::>(); + admins.sort(); + + format!("\ + @{user} Welcome! This group has posting restricted to members. \ + If you'd like to join, please ask one of the group admins:\n\ + {admins}", + user = notif_acct, + admins = admins.join(", ") + ) + } else { + follow_back = true; + format!("\ + @{user} Welcome to the group! The group user will now follow you back to complete the sign-up. \ + To share a post, tag the group user or use one of the group hashtags.\n\n\ + Use /help for more info.", + user = notif_acct + ) + }; + + let post = StatusBuilder::new() + .status(text) + .content_type("text/markdown") + .visibility(Visibility::Direct) + .build() + .expect("error build status"); + + self.client.new_status(post).await + .log_error("Failed to post"); + + if follow_back { + self.follow_user(notif_user_id).await + .log_error("Failed to follow back"); + } + } +} diff --git a/src/main.rs b/src/main.rs index 36721d3..a13c054 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ use crate::utils::acct_to_server; mod command; mod error; -mod group_handle; +mod group_handler; mod store; mod utils; diff --git a/src/store/data.rs b/src/store/data.rs index c921a7f..6e0ac01 100644 --- a/src/store/data.rs +++ b/src/store/data.rs @@ -11,9 +11,9 @@ pub(crate) struct Config { } impl Config { - pub(crate) fn iter_groups(&self) -> impl Iterator { - self.groups.values() - } + // pub(crate) fn iter_groups(&self) -> impl Iterator { + // self.groups.values() + // } pub(crate) fn get_group_config(&self, acct: &str) -> Option<&GroupConfig> { self.groups.get(acct) @@ -103,8 +103,10 @@ impl GroupConfig { } pub(crate) fn set_appdata(&mut self, appdata: AppData) { + if self.appdata != appdata { + self.mark_dirty(); + } self.appdata = appdata; - self.mark_dirty(); } pub(crate) fn get_admins(&self) -> impl Iterator { @@ -120,8 +122,10 @@ impl GroupConfig { } pub(crate) fn set_last_notif(&mut self, ts: u64) { + if self.last_notif_ts != ts { + self.mark_dirty(); + } self.last_notif_ts = self.last_notif_ts.max(ts); - self.mark_dirty(); } pub(crate) fn get_last_notif(&self) -> u64 { @@ -129,8 +133,10 @@ impl GroupConfig { } pub(crate) fn set_last_status(&mut self, ts: u64) { + if self.last_status_ts != ts { + self.mark_dirty(); + } self.last_status_ts = self.last_status_ts.max(ts); - self.mark_dirty(); } pub(crate) fn get_last_status(&self) -> u64 { @@ -177,67 +183,81 @@ impl GroupConfig { } pub(crate) fn set_admin(&mut self, acct: &str, admin: bool) -> Result<(), GroupError> { - if admin { + let change = if admin { if self.is_banned(acct) { return Err(GroupError::UserIsBanned); } - self.admin_users.insert(acct.to_owned()); + self.admin_users.insert(acct.to_owned()) } else { - self.admin_users.remove(acct); + self.admin_users.remove(acct) + }; + if change { + self.mark_dirty(); } - self.mark_dirty(); Ok(()) } pub(crate) fn set_member(&mut self, acct: &str, member: bool) -> Result<(), GroupError> { - if member { + let change = if member { if self.is_banned(acct) { return Err(GroupError::UserIsBanned); } - self.member_users.insert(acct.to_owned()); + self.member_users.insert(acct.to_owned()) } else { - self.member_users.remove(acct); + self.member_users.remove(acct) + }; + if change { + self.mark_dirty(); } - self.mark_dirty(); Ok(()) } pub(crate) fn ban_user(&mut self, acct: &str, ban: bool) -> Result<(), GroupError> { + let mut change = false; if ban { if self.is_admin(acct) { return Err(GroupError::UserIsAdmin); } - self.banned_users.insert(acct.to_owned()); + // Banned user is also kicked + change |= self.member_users.remove(acct); + change |= self.banned_users.insert(acct.to_owned()); } else { - self.banned_users.remove(acct); + change |= self.banned_users.remove(acct); + } + if change { + self.mark_dirty(); } Ok(()) } pub(crate) fn ban_server(&mut self, server: &str, ban: bool) -> Result<(), GroupError> { - if ban { + let changed = if ban { for acct in &self.admin_users { let acct_server = acct_to_server(acct); if acct_server == server { return Err(GroupError::AdminsOnServer); } } - self.banned_servers.insert(server.to_owned()); + self.banned_servers.insert(server.to_owned()) } else { - self.banned_servers.remove(server); + self.banned_servers.remove(server) + }; + if changed { + self.mark_dirty(); } - self.mark_dirty(); Ok(()) } pub(crate) fn add_tag(&mut self, tag: &str) { - self.group_tags.insert(tag.to_string()); - self.mark_dirty(); + if self.group_tags.insert(tag.to_string()) { + self.mark_dirty(); + } } pub(crate) fn remove_tag(&mut self, tag: &str) { - self.group_tags.remove(tag); - self.mark_dirty(); + if self.group_tags.remove(tag) { + self.mark_dirty(); + } } pub(crate) fn is_tag_followed(&self, tag: &str) -> bool { @@ -245,8 +265,10 @@ impl GroupConfig { } pub(crate) fn set_member_only(&mut self, member_only: bool) { + if self.member_only != member_only { + self.mark_dirty(); + } self.member_only = member_only; - self.mark_dirty(); } pub(crate) fn is_member_only(&self) -> bool { diff --git a/src/store/mod.rs b/src/store/mod.rs index 123aab4..9d9ba84 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -8,7 +8,7 @@ use tokio::sync::RwLock; use data::{Config, GroupConfig}; use crate::error::GroupError; -use crate::group_handle::GroupHandle; +use crate::group_handler::GroupHandle; use std::time::Duration; pub(crate) mod data; @@ -162,11 +162,11 @@ impl ConfigStore { //noinspection RsSelfConvention /// Set group config to the store. The store then saved. pub(crate) async fn set_group_config(&self, config: GroupConfig) -> Result<(), GroupError> { - debug!("Locking mutex"); + trace!("Locking mutex"); if let Ok(mut data) = tokio::time::timeout(Duration::from_secs(1), self.data.write()).await { - debug!("Locked"); + trace!("Locked"); data.set_group_config(config); - debug!("Writing file"); + trace!("Writing file"); self.persist(&data).await?; } else { error!("DEADLOCK? Timeout waiting for data RW Lock in settings store"); diff --git a/src/utils.rs b/src/utils.rs index 87f7af4..88ce3b3 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,8 @@ use std::borrow::Cow; use std::error::Error; +use elefren::status_builder::Visibility; + use crate::error::GroupError; pub trait LogError { @@ -82,3 +84,14 @@ mod test { assert_eq!(Err(GroupError::BadConfig("_".into())), normalize_acct("piggo", "uhh")); } } + +pub trait VisExt: Copy { + /// Check if is private or direct + fn is_private(self) -> bool; +} + +impl VisExt for Visibility { + fn is_private(self) -> bool { + self == Visibility::Direct || self == Visibility::Private + } +} From d02b47f75052de2c72ef8146ee79339c74da30f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Thu, 26 Aug 2021 22:15:41 +0200 Subject: [PATCH 4/4] change elefren linking --- CHANGELOG.md | 15 +++++++++++++++ Cargo.lock | 1 + Cargo.toml | 9 ++------- 3 files changed, 18 insertions(+), 7 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..9df49b5 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,15 @@ +# Changelog + +## v0.2 + +- Add hashtag boosting and back-follow/unfollow +- Add hashtag commands +- Code reorganization +- Improve greeting messages +- Do not crash on unknown notification types (elefren patch) + +## v0.1 + +This is the initial test release + +. diff --git a/Cargo.lock b/Cargo.lock index 8cf9def..561e833 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,6 +276,7 @@ checksum = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e" [[package]] name = "elefren" version = "0.22.0" +source = "git+https://git.ondrovo.com/MightyPork/elefren-fork.git?rev=54a0e55#54a0e55964784368864f36580c5630f730bf72dc" dependencies = [ "chrono", "doc-comment", diff --git a/Cargo.toml b/Cargo.toml index 8449619..9bec613 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,17 +9,14 @@ build = "build.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -#elefren = { version = "0.22.0", features = ["toml"] } -elefren = { path = "../elefren22-fork" } -#elefren = { git = "https://git.ondrovo.com/MightyPork/elefren-fork.git" } +#elefren = { path = "../elefren22-fork" } +elefren = { git = "https://git.ondrovo.com/MightyPork/elefren-fork.git", rev = "54a0e55" } env_logger = "0.9.0" -#elefren = { path = "../elefren-fork" } log = "0.4.14" serde = "1" serde_json = "1" -#smart-default = "0.6.0" anyhow = "1" clap = "2.33.0" tokio = { version = "1", features = ["full"] } @@ -32,5 +29,3 @@ once_cell = "1.8.0" native-tls = "0.2.8" websocket = "0.26.2" - -#tokio = { version = "0.2.22", features = ["full"] }