From de3fd4e72927d922592b26e589dcd822b92373d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Tue, 5 Oct 2021 10:39:10 +0200 Subject: [PATCH] improvements, more config, add -q, readme --- .gitignore | 4 +- CHANGELOG.md | 6 + Cargo.lock | 4 +- Cargo.toml | 4 +- README.md | 111 ++++++-- src/command.rs | 16 +- src/error.rs | 3 - src/group_handler/handle_mention.rs | 358 +++++++++++++------------ src/group_handler/mod.rs | 181 ++++++------- src/main.rs | 16 +- src/store/common_config.rs | 39 +++ src/store/{data.rs => group_config.rs} | 173 ++++++------ src/store/mod.rs | 64 +++-- src/utils.rs | 4 +- 14 files changed, 558 insertions(+), 425 deletions(-) create mode 100644 src/store/common_config.rs rename src/store/{data.rs => group_config.rs} (89%) diff --git a/.gitignore b/.gitignore index feee3e1..aff1264 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,6 @@ group-actor-data.toml groups.json fedigroups *.bak -groups.d/ \ No newline at end of file +*.old +/groups +/groups.d diff --git a/CHANGELOG.md b/CHANGELOG.md index b3c5195..13cb2f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v0.3.0 +- Changed config/storage format to directory-based, removed shared config mutex +- Made more options configurable (timeouts, catch-up limits, etc) +- Changed default log level to Debug, added `-q` to reduce it (opposite of `-v`) +- Code cleaning + ## v0.2.8 - fix error processing statuses when a misskey poll has infinite run time diff --git a/Cargo.lock b/Cargo.lock index e1a6059..a7d22d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,7 +276,7 @@ checksum = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e" [[package]] name = "elefren" version = "0.22.0" -source = "git+https://git.ondrovo.com/MightyPork/elefren-fork.git?rev=51e52bc24b33a4a15477b019d062c3a683a38ba4#51e52bc24b33a4a15477b019d062c3a683a38ba4" +source = "git+https://git.ondrovo.com/MightyPork/elefren-fork.git?rev=b10e5935ae32f4756b19e9ca58b78a5382f865d1#b10e5935ae32f4756b19e9ca58b78a5382f865d1" dependencies = [ "chrono", "doc-comment", @@ -328,7 +328,7 @@ checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" [[package]] name = "fedigroups" -version = "0.2.8" +version = "0.3.0" dependencies = [ "anyhow", "clap", diff --git a/Cargo.toml b/Cargo.toml index e57ae17..1444b91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fedigroups" -version = "0.2.8" +version = "0.3.0" authors = ["Ondřej Hruška "] edition = "2018" publish = false @@ -10,7 +10,7 @@ build = "build.rs" [dependencies] #elefren = { path = "../elefren22-fork" } -elefren = { git = "https://git.ondrovo.com/MightyPork/elefren-fork.git", rev = "51e52bc24b33a4a15477b019d062c3a683a38ba4" } +elefren = { git = "https://git.ondrovo.com/MightyPork/elefren-fork.git", rev = "b10e5935ae32f4756b19e9ca58b78a5382f865d1" } env_logger = "0.9.0" diff --git a/README.md b/README.md index 69eaaed..be3acc7 100644 --- a/README.md +++ b/README.md @@ -44,33 +44,92 @@ In case you need to re-authenticate an existing group, do the same but use `-A` ### Editing config -**Do not edit the config while the group service is running, it may overwrite your changes!** +**JSON does not support comments! Remove comments before using examples copied from this guide!** -Each group is stored as a sub-directory in `groups.d`. The sub-directories are normally named after their accounts, -but this is not required. +A typical setup could look like this: -The group's config and state is split into three files: +``` +├── groups.d +│ ├── betty@piggo.space +│ │ ├── config.json +│ │ ├── control.json +│ │ └── state.json +│ └── chatterbox@botsin.space +│ ├── config.json +│ ├── control.json +│ └── state.json +└── groups.json +``` + +#### Common config + +There is one shared config file: `groups.json` -- `config.json` - immutable config, never changed beside when you run the -A command to reauth the group. - This is where the auth token and the `enabled` flag are stored. -- `control.json` - settings and state that changes at runtime or can be set using slash commands. +- If the file does not exist, default settings are used. This is usually good enough. +- This file applies to all groups +- Prior to 0.3, the groups were also configured here. +- Running 0.3+ with the old file will produce an error, you need to update the config before continuing - move groups to subfolders + +``` +{ + // Max number of missed notifs to process after connect + max_catchup_notifs: 30, + // Max number of missed statuses to process after connect + max_catchup_statuses: 50, + // Delay between fetched pages when catching up + delay_fetch_page_s: 0.25, + // Delay after sending a status, making a follow or some other action. + // Set if there are Throttled errors and you need to slow the service down. + delay_after_post_s: 0.0, + // Delay before trying to re-connect after the server closed the socket + delay_reopen_closed_s: 0.5, + // Delay before trying to re-connect after an error + delay_reopen_error_s: 5.0, + // Timeout for a notification/timeline socket to be considered alive. + // If nothing arrives in this interval, reopen it. Some servers have a buggy socket + // implementation where it stays open but no longer works. + socket_alive_timeout_s: 30.0, + // Time after which a socket is always closed, even if seemingly alive. + // This is a work-around for servers that stop sending notifs after a while. + socket_retire_time_s: 120.0, +} +``` + +#### Per-group config + +Each group is stored as a sub-directory of `groups.d/`. The sub-directories are normally named after their accounts, +but this is not required. For example, `groups.d/betty@piggo.space/`. + +The group's config and state is split into three files in a way that minimizes the risk of data loss. + +Only the `config.json` file with credentials is required; the others will be created as needed by the group daemon. + +- `config.json` - immutable config, never changed beside when you run the `-A` command to reauth a group. + This is where the account name, the auth token and the `enabled` flag are stored. +- `control.json` - settings and state that change at runtime or can be set using slash commands. This file is overwritten by the group service when needed. - `state.json` - frequently changing state data. The last-seen status/notification timestamps are kept here. - State is split from Control to reduce the risk of damaging the control file. Timestamps can be updated multiple times + State is split from Control to limit the write frequency of the control file. Timestamps can be updated multiple times per minute. -The JSON files are easily editable, you can e.g. add yourself as an admin (use the e-mail format, e.g. `piggo@piggo.space`). +**Do not edit the control and state files while the group service is running, it may overwrite your changes!** + +The JSON files are easily editable, you can e.g. add yourself as an admin (use the e-mail format, e.g. `piggo@piggo.space`). +Note that changing config externally requires a restart. It's better to use slash commands and update the config at run-time. -When adding hashtags, note that *they must be entered as lowercase*! +When adding hashtags, *they must be entered as lowercase* and without the `#` symbol! -The file format is quite self-explanatory. +The file formats are quite self-explanatory (again, remove comments before copying, JSON does not support comments!) -#### config.json +**config.json** ```json { + // Enable or disable the group service "enabled": true, + // Group account name "acct": "group@myserver.xyz", + // Saved mastodon API credentials "appdata": { "base": "https://myserver.xyz", "client_id": "...", @@ -81,31 +140,35 @@ The file format is quite self-explanatory. } ``` -#### control.json +**control.json** ```json { + // List of group hashtags, lowercase. + // The group reblogs anything with these hashtags if the author is a member. "group_tags": [ "grouptest" ], + // List of admin users (e-mail format) "admin_users": [ "admin@myserver.xyz" ], + // Restrict write access to manually added members "member_only": false, + // List of member users (e-mail format) "member_users": [], + // List of banned users (e-mail format), their posts and actions are ignored by the group "banned_users": [], + // List of banned servers, users from there can't interact with the group and their posts can't be shared. "banned_servers": [ "bad-stuff-here.cc" ] } ``` -- `group_tags` - group hashtags (without the `#`), lowercase! The group reblogs anything with these hashtags if the author is a member. -- `member_users` - group members, used to track whose hashtags should be reblogged; in member-only groups, this is also a user whitelist. -- `banned_users` - can't post or interact with the group service -- `banned_servers` - work like an instance block +**state.json** -#### state.json +Internal use, millisecond timestamps of the last-seen status and notification. ```json { @@ -116,12 +179,15 @@ The file format is quite self-explanatory. ### Running -To run the group service, simply run it with no arguments. It will find groups in `groups.d` and start the service threads for you. +To run the group service, simply run it with no arguments. +It will read the `groups.json` file (if present), find groups in `groups.d/` and start the services for you. -Note that the control and status files must be writable, they are updated at run-time. Config files can have restricted permissions -to avoid accidental overwrite. +Note that the control and status files must be writable, they are updated at run-time. +Config files can have limited permissions to avoid accidental overwrite. -An example systemd service file is included in the repository as well. Make sure to set up the system user/group and file permissions according to your needs. You can use targets in the included `Makefile` to manage the systemd service and look at logs. +An example systemd service file is included in the repository as well. +Make sure to set up the system user/group and file permissions according to your needs. +You can use targets in the included `Makefile` to manage the systemd service and look at logs. ## Group usage @@ -145,7 +211,6 @@ These won't be shared: - `ducks suck` - `@group #ducks /i` (anything with the "ignore" command is ignored) - `@group /remove #ducks` (admin command, even if it includes a group hashtag) -- `@otheruser tell me about #ducks` (in a thread) - `@otheruser @group tell me about ducks` (in a thread) ### Commands diff --git a/src/command.rs b/src/command.rs index ad8717c..9bcb6f9 100644 --- a/src/command.rs +++ b/src/command.rs @@ -1,6 +1,6 @@ +use crate::utils; use once_cell::sync::Lazy; use regex::Regex; -use crate::utils; #[derive(Debug, Clone, PartialEq)] pub enum StatusCommand { @@ -101,7 +101,8 @@ static RE_UNBAN_SERVER: once_cell::sync::Lazy = Lazy::new(|| command!(r"u static RE_ADD_MEMBER: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:add|follow)\s+", p_user!())); -static RE_REMOVE_MEMBER: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:kick|unfollow|remove)\s+", p_user!())); +static RE_REMOVE_MEMBER: once_cell::sync::Lazy = + Lazy::new(|| command!(r"(?:kick|unfollow|remove)\s+", p_user!())); static RE_ADD_TAG: once_cell::sync::Lazy = Lazy::new(|| command!(r"(?:add|follow)\s+", p_hashtag!())); @@ -134,8 +135,7 @@ static RE_PING: once_cell::sync::Lazy = Lazy::new(|| command!(r"ping")); static RE_ANNOUNCE: once_cell::sync::Lazy = Lazy::new(|| Regex::new(r"(?:^|\s|>|\n)[\\/]announce\s+(.*)$").unwrap()); -static RE_A_HASHTAG: once_cell::sync::Lazy = - Lazy::new(|| Regex::new(r"(?:^|\b|\s|>|\n)#(\w+)").unwrap()); +static RE_A_HASHTAG: once_cell::sync::Lazy = Lazy::new(|| Regex::new(r"(?:^|\b|\s|>|\n)#(\w+)").unwrap()); pub static RE_NOBOT_TAG: once_cell::sync::Lazy = Lazy::new(|| Regex::new(r"(?:^|\b|\s|>|\n)#nobot(?:\b|$)").unwrap()); @@ -336,11 +336,15 @@ pub fn parse_slash_commands(content: &str) -> Vec { #[cfg(test)] mod test { - use crate::command::{parse_slash_commands, RE_A_HASHTAG, RE_HASHTAG_TRIGGERING_PLEROMA_BUG, RE_NOBOT_TAG, RE_ADD_TAG, RE_JOIN, StatusCommand}; + use crate::command::{ + parse_slash_commands, StatusCommand, RE_ADD_TAG, RE_A_HASHTAG, RE_HASHTAG_TRIGGERING_PLEROMA_BUG, RE_JOIN, + RE_NOBOT_TAG, + }; use super::{ RE_ADD_MEMBER, RE_ANNOUNCE, RE_BAN_SERVER, RE_BAN_USER, RE_BOOST, RE_CLOSE_GROUP, RE_GRANT_ADMIN, RE_HELP, - RE_IGNORE, RE_LEAVE, RE_OPTOUT, RE_OPTIN, RE_MEMBERS, RE_OPEN_GROUP, RE_REMOVE_MEMBER, RE_REVOKE_ADMIN, RE_TAGS, RE_UNDO, + RE_IGNORE, RE_LEAVE, RE_MEMBERS, RE_OPEN_GROUP, RE_OPTIN, RE_OPTOUT, RE_REMOVE_MEMBER, RE_REVOKE_ADMIN, + RE_TAGS, RE_UNDO, }; #[test] diff --git a/src/error.rs b/src/error.rs index 952ba59..dd3cb2b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,8 +10,6 @@ pub enum GroupError { UserOptedOut, #[error("Server could not be banned because there are admin users on it")] AdminsOnServer, - #[error("Group config is missing in the config store")] - GroupNotExist, #[error("Config error: {0}")] BadConfig(Cow<'static, str>), #[error("API request timed out")] @@ -32,7 +30,6 @@ impl PartialEq for GroupError { (Self::UserIsAdmin, Self::UserIsAdmin) | (Self::UserIsBanned, Self::UserIsBanned) | (Self::AdminsOnServer, Self::AdminsOnServer) - | (Self::GroupNotExist, Self::GroupNotExist) | (Self::BadConfig(_), Self::BadConfig(_)) ) } diff --git a/src/group_handler/handle_mention.rs b/src/group_handler/handle_mention.rs index c266f0a..c560f61 100644 --- a/src/group_handler/handle_mention.rs +++ b/src/group_handler/handle_mention.rs @@ -1,22 +1,25 @@ +use std::cmp::Ordering; use std::collections::HashSet; use std::time::Duration; -use elefren::{FediClient, SearchType, StatusBuilder}; use elefren::entities::account::Account; use elefren::entities::prelude::Status; use elefren::status_builder::Visibility; +use elefren::{FediClient, SearchType, StatusBuilder}; -use crate::command::{RE_NOBOT_TAG, StatusCommand}; +use crate::command::{StatusCommand, RE_NOBOT_TAG}; use crate::error::GroupError; use crate::group_handler::GroupHandle; -use crate::store::data::GroupConfig; -use crate::utils::{LogError, normalize_acct}; +use crate::store::group_config::GroupConfig; +use crate::store::CommonConfig; use crate::utils; +use crate::utils::{normalize_acct, LogError}; pub struct ProcessMention<'a> { status: Status, group_account: &'a Account, config: &'a mut GroupConfig, + cc: &'a CommonConfig, client: &'a mut FediClient, group_acct: String, status_acct: String, @@ -33,13 +36,13 @@ impl<'a> ProcessMention<'a> { async fn lookup_acct_id(&self, acct: &str, followed: bool) -> Result, GroupError> { debug!("Looking up user ID by acct: {}", acct); - match tokio::time::timeout(Duration::from_secs(5), self.client.search_v2( - acct, - !followed, - Some(SearchType::Accounts), - Some(1), - followed, - )).await { + match tokio::time::timeout( + Duration::from_secs(5), + self.client + .search_v2(acct, !followed, Some(SearchType::Accounts), Some(1), followed), + ) + .await + { Err(_) => { warn!("Account lookup timeout!"); Err(GroupError::ApiTimeout) @@ -50,6 +53,7 @@ impl<'a> ProcessMention<'a> { } Ok(Ok(res)) => { for item in res.accounts { + // XXX limit is 1! let acct_normalized = normalize_acct(&item.acct, &self.group_acct)?; if acct_normalized == acct { debug!("Search done, account found: {}", item.acct); @@ -91,12 +95,14 @@ impl<'a> ProcessMention<'a> { async fn follow_user_by_id(&self, id: &str) -> Result<(), GroupError> { debug!("Trying to follow user #{}", id); self.client.follow(id).await?; + self.delay_after_post().await; Ok(()) } async fn unfollow_user_by_id(&self, id: &str) -> Result<(), GroupError> { debug!("Trying to unfollow user #{}", id); self.client.unfollow(id).await?; + self.delay_after_post().await; Ok(()) } @@ -113,6 +119,7 @@ impl<'a> ProcessMention<'a> { group_account: &gh.group_account, status_user_id: status.account.id.to_string(), client: &mut gh.client, + cc: &gh.cc, can_write: gh.config.can_write(&status_acct), is_admin: gh.config.is_admin(&status_acct), replies: vec![], @@ -129,16 +136,15 @@ impl<'a> ProcessMention<'a> { } async fn reblog_status(&self) { - self.client.reblog(&self.status.id) - .await - .log_error("Failed to reblog status") + self.client.reblog(&self.status.id).await.log_error("Failed to reblog status"); + self.delay_after_post().await; } fn add_reply(&mut self, line: impl AsRef) { self.replies.push(line.as_ref().trim_matches(' ').to_string()) } - fn add_announcement<'t>(&mut self, line: impl AsRef) { + fn add_announcement(&mut self, line: impl AsRef) { self.announcements.push(line.as_ref().trim_matches(' ').to_string()) } @@ -156,8 +162,7 @@ impl<'a> ProcessMention<'a> { for cmd in commands { match cmd { StatusCommand::Undo => { - self.cmd_undo().await - .log_error("Error handling undo cmd"); + self.cmd_undo().await.log_error("Error handling undo cmd"); } StatusCommand::Ignore => { unreachable!(); // Handled above @@ -169,12 +174,10 @@ impl<'a> ProcessMention<'a> { self.cmd_boost().await; } StatusCommand::BanUser(u) => { - self.cmd_ban_user(&u).await - .log_error("Error handling ban-user cmd"); + self.cmd_ban_user(&u).await.log_error("Error handling ban-user cmd"); } StatusCommand::UnbanUser(u) => { - self.cmd_unban_user(&u).await - .log_error("Error handling unban-user cmd"); + self.cmd_unban_user(&u).await.log_error("Error handling unban-user cmd"); } StatusCommand::OptOut => { self.cmd_optout().await; @@ -189,12 +192,10 @@ impl<'a> ProcessMention<'a> { self.cmd_unban_server(&s).await; } StatusCommand::AddMember(u) => { - self.cmd_add_member(&u).await - .log_error("Error handling add-member cmd"); + self.cmd_add_member(&u).await.log_error("Error handling add-member cmd"); } StatusCommand::RemoveMember(u) => { - self.cmd_remove_member(&u).await - .log_error("Error handling remove-member cmd"); + self.cmd_remove_member(&u).await.log_error("Error handling remove-member cmd"); } StatusCommand::AddTag(tag) => { self.cmd_add_tag(tag).await; @@ -203,12 +204,10 @@ impl<'a> ProcessMention<'a> { self.cmd_remove_tag(tag).await; } StatusCommand::GrantAdmin(u) => { - self.cmd_grant_admin(&u).await - .log_error("Error handling grant-admin cmd"); + self.cmd_grant_admin(&u).await.log_error("Error handling grant-admin cmd"); } StatusCommand::RemoveAdmin(u) => { - self.cmd_revoke_member(&u).await - .log_error("Error handling grant-admin cmd"); + self.cmd_revoke_member(&u).await.log_error("Error handling grant-admin cmd"); } StatusCommand::OpenGroup => { self.cmd_open_group().await; @@ -239,13 +238,14 @@ impl<'a> ProcessMention<'a> { } if self.do_boost_prev_post { - if let (Some(prev_acct_id), Some(prev_status_id)) = (self.status.in_reply_to_account_id.as_ref(), self.status.in_reply_to_id.as_ref()) { + if let (Some(prev_acct_id), Some(prev_status_id)) = ( + self.status.in_reply_to_account_id.as_ref(), + self.status.in_reply_to_id.as_ref(), + ) { match self.id_to_acct_check_boostable(prev_acct_id).await { Ok(_acct) => { - self.client - .reblog(prev_status_id) - .await - .log_error("Failed to boost"); + self.client.reblog(prev_status_id).await.log_error("Failed to boost"); + self.delay_after_post().await; } Err(e) => { warn!("Can't reblog: {}", e); @@ -281,7 +281,7 @@ impl<'a> ProcessMention<'a> { Ok(()) } - async fn send_reply_multipart(&self, mention : String, msg : String) -> Result<(), GroupError> { + async fn send_reply_multipart(&self, mention: String, msg: String) -> Result<(), GroupError> { let parts = smart_split(&msg, Some(mention), self.config.get_character_limit()); let mut parent = self.status.id.clone(); @@ -298,6 +298,7 @@ impl<'a> ProcessMention<'a> { .build() { let status = self.client.new_status(post).await?; + self.delay_after_post().await; parent = status.id; } @@ -308,27 +309,23 @@ impl<'a> ProcessMention<'a> { Ok(()) } - async fn send_announcement_multipart(&self, msg : &str) -> Result<(), GroupError> { + async fn send_announcement_multipart(&self, msg: &str) -> Result<(), GroupError> { let parts = smart_split(msg, None, self.config.get_character_limit()); let mut parent = None; for p in parts { let mut builder = StatusBuilder::new(); - builder - .status(p) - .content_type("text/markdown") - .visibility(Visibility::Public); + builder.status(p).content_type("text/markdown").visibility(Visibility::Public); if let Some(p) = parent.as_ref() { builder.in_reply_to(p); } - let post = builder - .build() - .expect("error build status"); + let post = builder.build().expect("error build status"); let status = self.client.new_status(post).await?; + self.delay_after_post().await; parent = Some(status.id); // Sleep a bit to avoid throttling @@ -391,26 +388,28 @@ impl<'a> ProcessMention<'a> { } async fn cmd_undo(&mut self) -> Result<(), GroupError> { - if let (Some(ref parent_account_id), Some(ref parent_status_id)) = (&self.status.in_reply_to_account_id, &self.status.in_reply_to_id) { + if let (Some(ref parent_account_id), Some(ref parent_status_id)) = + (&self.status.in_reply_to_account_id, &self.status.in_reply_to_id) + { if parent_account_id == &self.group_account.id { // This is a post sent by the group user, likely an announcement. // Undo here means delete it. if self.is_admin { info!("Deleting group post #{}", parent_status_id); self.client.delete_status(parent_status_id).await?; + self.delay_after_post().await; } else { warn!("Only admin can delete posts made by the group user"); } + } else if self.is_admin || parent_account_id == &self.status_user_id { + info!("Un-reblogging post #{}", parent_status_id); + // User unboosting own post boosted by accident, or admin doing it + self.client.unreblog(parent_status_id).await?; + self.delay_after_post().await; } else { - if self.is_admin || parent_account_id == &self.status_user_id { - info!("Un-reblogging post #{}", parent_status_id); - // User unboosting own post boosted by accident, or admin doing it - self.client.unreblog(parent_status_id).await?; - } else { - warn!("Only the author and admins can undo reblogs"); - // XXX this means when someone /b's someone else's post to a group, - // they then can't reverse that (only admin or the post's author can) - } + warn!("Only the author and admins can undo reblogs"); + // XXX this means when someone /b's someone else's post to a group, + // they then can't reverse that (only admin or the post's author can) } } @@ -424,8 +423,7 @@ impl<'a> ProcessMention<'a> { match self.config.ban_user(&u, true) { Ok(_) => { self.add_reply(format!("User {} banned from group!", u)); - self.unfollow_by_acct(&u).await - .log_error("Failed to unfollow banned user"); + self.unfollow_by_acct(&u).await.log_error("Failed to unfollow banned user"); } Err(e) => { self.add_reply(format!("Failed to ban user {}: {}", u, e)); @@ -507,8 +505,7 @@ impl<'a> ProcessMention<'a> { match self.config.set_member(&u, true) { Ok(_) => { self.add_reply(format!("User {} added to the group!", u)); - self.follow_by_acct(&u) - .await.log_error("Failed to follow"); + self.follow_by_acct(&u).await.log_error("Failed to follow"); } Err(e) => { self.add_reply(format!("Failed to add user {} to group: {}", u, e)); @@ -526,8 +523,7 @@ impl<'a> ProcessMention<'a> { match self.config.set_member(&u, false) { Ok(_) => { self.add_reply(format!("User {} removed from the group.", u)); - self.unfollow_by_acct(&u).await - .log_error("Failed to unfollow removed user"); + self.unfollow_by_acct(&u).await.log_error("Failed to unfollow removed user"); } Err(e) => { self.add_reply(format!("Unexpected error occured: {}", e)); @@ -577,10 +573,7 @@ impl<'a> ProcessMention<'a> { self.add_reply(format!("User {} is now a group admin!", u)); } Err(e) => { - self.add_reply(format!( - "Failed to make user {} a group admin: {}", - u, e - )); + self.add_reply(format!("Failed to make user {} a group admin: {}", u, e)); } } } else { @@ -658,7 +651,8 @@ impl<'a> ProcessMention<'a> { self.add_reply(format!("This is a public-access group. {}", membership_line)); } - self.add_reply("\ + self.add_reply( + "\ To share a post, @ the group user or use a group hashtag.\n\ \n\ **Supported commands:**\n\ @@ -667,7 +661,8 @@ impl<'a> ProcessMention<'a> { `/tags` - show group hashtags\n\ `/join` - (re-)join the group\n\ `/leave` - leave the group\n\ - `/optout` - forbid sharing of your posts"); + `/optout` - forbid sharing of your posts", + ); if self.is_admin { self.add_reply("`/members`, `/who` - show group members / admins"); @@ -680,7 +675,8 @@ impl<'a> ProcessMention<'a> { // XXX when used on instance with small character limit, this won't fit! if self.is_admin { - self.add_reply("\n\ + self.add_reply( + "\n\ **Admin commands:**\n\ `/ping` - check the group works\n\ `/add user` - add a member (user@domain)\n\ @@ -694,7 +690,8 @@ impl<'a> ProcessMention<'a> { `/deadmin user` - revoke admin rights\n\ `/closegroup` - make member-only\n\ `/opengroup` - make public-access\n\ - `/announce x` - make a public announcement"); + `/announce x` - make a public announcement", + ); } } @@ -723,10 +720,13 @@ impl<'a> ProcessMention<'a> { if self.config.is_member_or_admin(&self.status_acct) { // admin can leave but that's a bad idea let _ = self.config.set_member(&self.status_acct, false); - self.add_reply("You're no longer a group member. Unfollow the group user to stop receiving group messages."); + self.add_reply( + "You're no longer a group member. Unfollow the group user to stop receiving group messages.", + ); } - self.unfollow_user_by_id(&self.status_user_id).await + self.unfollow_user_by_id(&self.status_user_id) + .await .log_error("Failed to unfollow"); } @@ -735,34 +735,39 @@ impl<'a> ProcessMention<'a> { debug!("Already member or admin, try to follow-back again"); // Already a member, so let's try to follow the user // again, maybe first time it failed - self.follow_user_by_id(&self.status_user_id).await - .log_error("Failed to follow"); + self.follow_user_by_id(&self.status_user_id).await.log_error("Failed to follow"); } else { // Not a member yet if self.config.is_member_only() { // No you can't - self.add_reply("\ + self.add_reply( + "\ Sorry, this group is closed to new sign-ups.\n\ - Please ask one of the group admins to add you:"); + Please ask one of the group admins to add you:", + ); self.append_admin_list_to_reply(); } else { // Open access, try to follow back - self.follow_user_by_id(&self.status_user_id).await - .log_error("Failed to follow"); + self.follow_user_by_id(&self.status_user_id).await.log_error("Failed to follow"); // This only fails if the user is banned, but that is filtered above let _ = self.config.set_member(&self.status_acct, true); - self.add_reply("\ + self.add_reply( + "\ Welcome to the group! The group user will now follow you to complete the sign-up. \ Make sure you follow back to receive shared posts!\n\n\ - Use /help for more info."); + Use /help for more info.", + ); } } } async fn cmd_ping(&mut self) { - self.add_reply(format!("pong, this is fedigroups service v{}", env!("CARGO_PKG_VERSION"))); + self.add_reply(format!( + "pong, this is fedigroups service v{}", + env!("CARGO_PKG_VERSION") + )); } async fn unfollow_by_acct(&self, acct: &str) -> Result<(), GroupError> { @@ -792,25 +797,29 @@ impl<'a> ProcessMention<'a> { } else { let normalized = normalize_acct(&account.acct, &self.group_acct)?; if self.config.is_banned(&normalized) { - return Err(GroupError::UserIsBanned); + Err(GroupError::UserIsBanned) } else if self.config.is_optout(&normalized) { - return Err(GroupError::UserOptedOut); + Err(GroupError::UserOptedOut) } else { Ok(normalized) } } } + + async fn delay_after_post(&self) { + tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_after_post_s)).await; + } } fn apply_trailing_hashtag_pleroma_bug_workaround(msg: &mut String) { - if crate::command::RE_HASHTAG_TRIGGERING_PLEROMA_BUG.is_match(&msg) { + if crate::command::RE_HASHTAG_TRIGGERING_PLEROMA_BUG.is_match(msg) { // if a status ends with a hashtag, pleroma will fuck it up debug!("Adding \" .\" to fix pleroma hashtag eating bug!"); msg.push_str(" ."); } } -fn smart_split(msg : &str, prefix: Option, limit: usize) -> Vec { +fn smart_split(msg: &str, prefix: Option, limit: usize) -> Vec { let prefix = prefix.unwrap_or_default(); if msg.len() + prefix.len() < limit { @@ -819,52 +828,57 @@ fn smart_split(msg : &str, prefix: Option, limit: usize) -> Vec let mut parts_to_send = vec![]; let mut this_piece = prefix.clone(); - for l in msg.split("\n") { + for l in msg.split('\n') { println!("* Line: {:?}", l); - if this_piece.len() + l.len() == limit { - println!("exactly fits within limit"); - // this line exactly reaches the limit - this_piece.push_str(l); - parts_to_send.push(std::mem::take(&mut this_piece).trim().to_owned()); - this_piece.push_str(&prefix); - } else if this_piece.len() + l.len() > limit { - println!("too long to append (already {} + new {})", this_piece.len(), l.len()); - // line too long to append - if this_piece != prefix { - let trimmed = this_piece.trim(); - if !trimmed.is_empty() { - println!("flush buffer: {:?}", trimmed); - parts_to_send.push(trimmed.to_owned()); - } + + match (this_piece.len() + l.len()).cmp(&limit) { + Ordering::Less => { + println!("append line"); + // this line still fits comfortably + this_piece.push_str(l); + this_piece.push('\n'); } + Ordering::Equal => { + println!("exactly fits within limit"); + // this line exactly reaches the limit + this_piece.push_str(l); + parts_to_send.push(std::mem::take(&mut this_piece).trim().to_owned()); + this_piece.push_str(&prefix); + } + Ordering::Greater => { + println!("too long to append (already {} + new {})", this_piece.len(), l.len()); + // line too long to append + if this_piece != prefix { + let trimmed = this_piece.trim(); + if !trimmed.is_empty() { + println!("flush buffer: {:?}", trimmed); + parts_to_send.push(trimmed.to_owned()); + } + } - // start new piece with the line. If the line is too long, break it up. - this_piece = format!("{}{}", prefix, l); + // start new piece with the line. If the line is too long, break it up. + this_piece = format!("{}{}", prefix, l); - while this_piece.len() > limit { - // line too long, try splitting at the last space, if any - let to_send = if let Some(last_space) = (&this_piece[..=limit]).rfind(' ') { - println!("line split at word boundary"); - let mut p = this_piece.split_off(last_space + 1); - std::mem::swap(&mut p, &mut this_piece); - p - } else { - println!("line split at exact len (no word boundary found)"); - let mut p = this_piece.split_off(limit); - std::mem::swap(&mut p, &mut this_piece); - p - }; - let part_trimmed = to_send.trim(); - println!("flush buffer: {:?}", part_trimmed); - parts_to_send.push(part_trimmed.to_owned()); - this_piece = format!("{}{}", prefix, this_piece.trim()); + while this_piece.len() > limit { + // line too long, try splitting at the last space, if any + let to_send = if let Some(last_space) = (&this_piece[..=limit]).rfind(' ') { + println!("line split at word boundary"); + let mut p = this_piece.split_off(last_space + 1); + std::mem::swap(&mut p, &mut this_piece); + p + } else { + println!("line split at exact len (no word boundary found)"); + let mut p = this_piece.split_off(limit); + std::mem::swap(&mut p, &mut this_piece); + p + }; + let part_trimmed = to_send.trim(); + println!("flush buffer: {:?}", part_trimmed); + parts_to_send.push(part_trimmed.to_owned()); + this_piece = format!("{}{}", prefix, this_piece.trim()); + } + this_piece.push('\n'); } - this_piece.push('\n'); - } else { - println!("append line"); - // this line still fits comfortably - this_piece.push_str(l); - this_piece.push('\n'); } } @@ -886,13 +900,16 @@ mod test { let to_split = "a234567890\nb234567890\nc234567890\nd234\n67890\ne234567890\n"; let parts = super::smart_split(to_split, None, 10); - assert_eq!(vec![ - "a234567890".to_string(), - "b234567890".to_string(), - "c234567890".to_string(), - "d234\n67890".to_string(), - "e234567890".to_string(), - ], parts); + assert_eq!( + vec![ + "a234567890".to_string(), + "b234567890".to_string(), + "c234567890".to_string(), + "d234\n67890".to_string(), + "e234567890".to_string(), + ], + parts + ); } #[test] @@ -900,81 +917,82 @@ mod test { let to_split = "foo\nbar\nbaz"; let parts = super::smart_split(to_split, None, 1000); - assert_eq!(vec![ - "foo\nbar\nbaz".to_string(), - ], parts); + assert_eq!(vec!["foo\nbar\nbaz".to_string(),], parts); } #[test] fn test_smart_split_nosplit_prefix() { let to_split = "foo\nbar\nbaz"; let parts = super::smart_split(to_split, Some("PREFIX".to_string()), 1000); - assert_eq!(vec![ - "PREFIXfoo\nbar\nbaz".to_string(), - ], parts); + assert_eq!(vec!["PREFIXfoo\nbar\nbaz".to_string(),], parts); } #[test] fn test_smart_split_prefix_each() { let to_split = "1234\n56\n7"; let parts = super::smart_split(to_split, Some("PREFIX".to_string()), 10); - assert_eq!(vec![ - "PREFIX1234".to_string(), - "PREFIX56\n7".to_string(), - ], parts); + assert_eq!(vec!["PREFIX1234".to_string(), "PREFIX56\n7".to_string(),], parts); } #[test] fn test_smart_split_words() { let to_split = "one two three four five six seven eight nine ten"; let parts = super::smart_split(to_split, None, 10); - assert_eq!(vec![ - "one two".to_string(), - "three four".to_string(), - "five six".to_string(), - "seven".to_string(), - "eight nine".to_string(), - "ten".to_string(), - ], parts); + assert_eq!( + vec![ + "one two".to_string(), + "three four".to_string(), + "five six".to_string(), + "seven".to_string(), + "eight nine".to_string(), + "ten".to_string(), + ], + parts + ); } #[test] fn test_smart_split_words_multispace() { let to_split = "one two three four five six seven eight nine ten "; let parts = super::smart_split(to_split, None, 10); - assert_eq!(vec![ - "one two".to_string(), - "three four".to_string(), - "five six".to_string(), - "seven".to_string(), - "eight nine".to_string(), - "ten".to_string(), - ], parts); + assert_eq!( + vec![ + "one two".to_string(), + "three four".to_string(), + "five six".to_string(), + "seven".to_string(), + "eight nine".to_string(), + "ten".to_string(), + ], + parts + ); } #[test] fn test_smart_split_words_longword() { let to_split = "one two threefourfive six"; let parts = super::smart_split(to_split, None, 10); - assert_eq!(vec![ - "one two".to_string(), - "threefourf".to_string(), - "ive six".to_string(), - ], parts); + assert_eq!( + vec!["one two".to_string(), "threefourf".to_string(), "ive six".to_string(),], + parts + ); } #[test] fn test_smart_split_words_prefix() { let to_split = "one two three four five six seven eight nine ten"; let parts = super::smart_split(to_split, Some("PREFIX".to_string()), 15); - assert_eq!(vec![ - "PREFIXone two".to_string(), - "PREFIXthree".to_string(), - "PREFIXfour five".to_string(), - "PREFIXsix seven".to_string(), - "PREFIXeight".to_string(), - "PREFIXnine ten".to_string(), - ], parts); + assert_eq!( + vec![ + "PREFIXone two".to_string(), + "PREFIXthree".to_string(), + "PREFIXfour five".to_string(), + "PREFIXsix seven".to_string(), + "PREFIXeight".to_string(), + "PREFIXnine ten".to_string(), + ], + parts + ); } #[test] @@ -998,4 +1016,4 @@ mod test { "@pepa@pig.club Nec varius mauris sem sollicitudin dolor. Nunc porta in urna nec vulputate.".to_string(), ], parts); } -} \ No newline at end of file +} diff --git a/src/group_handler/mod.rs b/src/group_handler/mod.rs index c888401..ef09f2b 100644 --- a/src/group_handler/mod.rs +++ b/src/group_handler/mod.rs @@ -1,24 +1,24 @@ use std::sync::Arc; use std::time::{Duration, Instant}; -use elefren::{FediClient, StatusBuilder}; use elefren::debug::EventDisplay; use elefren::debug::NotificationDisplay; use elefren::debug::StatusDisplay; +use elefren::entities::account::Account; use elefren::entities::event::Event; use elefren::entities::notification::{Notification, NotificationType}; use elefren::entities::status::Status; use elefren::status_builder::Visibility; +use elefren::{FediClient, StatusBuilder}; use futures::StreamExt; use handle_mention::ProcessMention; -use crate::error::GroupError; -use crate::store::ConfigStore; -use crate::store::data::{CommonConfig, GroupConfig}; -use crate::utils::{LogError, normalize_acct, VisExt}; use crate::command::StatusCommand; -use elefren::entities::account::Account; +use crate::error::GroupError; +use crate::store::CommonConfig; +use crate::store::GroupConfig; +use crate::utils::{normalize_acct, LogError, VisExt}; mod handle_mention; @@ -28,18 +28,16 @@ pub struct GroupHandle { pub group_account: Account, pub client: FediClient, pub config: GroupConfig, - pub common_config: Arc, + pub cc: Arc, } // TODO move other options to common_config! -// const DELAY_BEFORE_ACTION: Duration = Duration::from_millis(250); -const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(500); -// higher because we can expect a lot of non-hashtag statuses here -const PERIODIC_SAVE: Duration = Duration::from_secs(60); -const SOCKET_ALIVE_TIMEOUT: Duration = Duration::from_secs(30); -const SOCKET_RETIRE_TIME: Duration = Duration::from_secs(120); -const PING_INTERVAL: Duration = Duration::from_secs(15); // must be < periodic save! +// // const DELAY_BEFORE_ACTION: Duration = Duration::from_millis(250); +// const DELAY_REOPEN_STREAM: Duration = Duration::from_millis(500); +// // higher because we can expect a lot of non-hashtag statuses here +// const SOCKET_ALIVE_TIMEOUT: Duration = Duration::from_secs(30); +// const SOCKET_RETIRE_TIME: Duration = Duration::from_secs(120); macro_rules! grp_debug { ($self:ident, $f:expr) => { @@ -59,6 +57,7 @@ macro_rules! grp_info { }; } +#[allow(unused)] macro_rules! grp_trace { ($self:ident, $f:expr) => { ::log::trace!(concat!("(@{}) ", $f), $self.config.get_acct()); @@ -86,30 +85,21 @@ macro_rules! grp_error { }; } - - impl GroupHandle { + #[allow(unused)] pub async fn save(&mut self) -> Result<(), GroupError> { - grp_debug!(self, "Saving group config & status"); + grp_debug!(self, "Saving group state unconditionally"); self.config.save(false).await?; Ok(()) } pub async fn save_if_needed(&mut self) -> Result<(), GroupError> { - self.config.save_if_needed(false).await?; - Ok(()) - } - - /* - pub async fn reload(&mut self) -> Result<(), GroupError> { - if let Some(g) = self.store.get_group_config(self.config.get_acct()).await { - self.config = g; - Ok(()) - } else { - Err(GroupError::GroupNotExist) + if self.config.is_dirty() { + grp_debug!(self, "Saving group state due to changes"); + self.config.save_if_needed(false).await?; } + Ok(()) } - */ } trait NotifTimestamp { @@ -130,15 +120,6 @@ impl NotifTimestamp for Status { } } -macro_rules! grp_trace { - ($self:ident, $f:expr) => { - ::log::trace!(concat!("(@{}) ", $f), $self.config.get_acct()); - }; - ($self:ident, $f:expr, $($arg:tt)+) => { - ::log::trace!(concat!("(@{}) ", $f), $self.config.get_acct(), $($arg)+); - }; -} - impl GroupHandle { pub async fn run(&mut self) -> Result<(), GroupError> { loop { @@ -150,18 +131,15 @@ impl GroupHandle { } Err(other) => { grp_error!(self, "ERROR in group handler, will restart! {}", other); - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_reopen_error_s)).await; } } } } pub async fn run_internal(&mut self) -> Result<(), GroupError> { - assert!(PERIODIC_SAVE >= PING_INTERVAL); - loop { grp_debug!(self, "Opening streaming API socket"); - let mut next_save = Instant::now() + PERIODIC_SAVE; // so we save at start // wrapped in a timeout, this seems like the only place the group could hang // (https://git.ondrovo.com/MightyPork/group-actor/issues/8) @@ -175,7 +153,6 @@ impl GroupHandle { let socket_open_time = Instant::now(); let mut last_rx = Instant::now(); - // let mut last_ping = Instant::now(); match self.catch_up_with_missed_notifications().await { Ok(true) => { @@ -201,20 +178,14 @@ impl GroupHandle { } } - if self.config.is_dirty() { - // save asap - next_save = Instant::now() - PERIODIC_SAVE - } + self.save_if_needed().await.log_error("Failed to save"); 'rx: loop { - if next_save < Instant::now() { - grp_trace!(self, "Save time elapsed, saving if needed"); - self.save_if_needed().await.log_error("Failed to save group"); - next_save = Instant::now() + PERIODIC_SAVE; - } + let remains_to_idle_close = + Duration::from_secs_f64(self.cc.socket_alive_timeout_s).saturating_sub(last_rx.elapsed()); - let remains_to_idle_close = SOCKET_ALIVE_TIMEOUT.saturating_sub(last_rx.elapsed()); - let remains_to_retire = SOCKET_RETIRE_TIME.saturating_sub(socket_open_time.elapsed()); + let remains_to_retire = + Duration::from_secs_f64(self.cc.socket_retire_time_s).saturating_sub(socket_open_time.elapsed()); if remains_to_idle_close.is_zero() { grp_warn!(self, "Socket idle too long, close"); @@ -225,11 +196,7 @@ impl GroupHandle { break 'rx; } - let timeout = next_save - .saturating_duration_since(Instant::now()) - .min(remains_to_idle_close) - .min(remains_to_retire) - .max(Duration::from_secs(1)); // at least 1s + let timeout = remains_to_idle_close.min(remains_to_retire).max(Duration::from_secs(1)); // at least 1s grp_debug!(self, "Wait for message {:?}", timeout); match tokio::time::timeout(timeout, events.next()).await { @@ -247,6 +214,8 @@ impl GroupHandle { Event::FiltersChanged => {} Event::Heartbeat => {} } + + self.save_if_needed().await.log_error("Failed to save"); } Ok(None) => { grp_warn!(self, "Group @{} socket closed, restarting...", self.config.get_acct()); @@ -259,7 +228,7 @@ impl GroupHandle { } grp_warn!(self, "Notif stream closed, will reopen"); - tokio::time::sleep(DELAY_REOPEN_STREAM).await; + tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_reopen_closed_s)).await; } } @@ -324,8 +293,7 @@ impl GroupHandle { return Ok(()); } - if self.config.is_optout(&status_user) && !member_or_admin - { + if self.config.is_optout(&status_user) && !member_or_admin { grp_debug!(self, "Status author @{} opted out, discard", status_user); return Ok(()); } @@ -340,24 +308,30 @@ impl GroupHandle { let mentioned_user = normalize_acct(&m.acct, group_user)?; if mentioned_user == group_user { let notif_time = self.config.get_last_notif(); + if notif_time <= ts { - grp_debug!(self, "mentioned but status is older than last notif, can't be a valid notif, discard"); + grp_debug!( + self, + "mentioned but status is older than last notif, can't be a valid notif, discard" + ); return Ok(()); - } else { - if !commands.is_empty() { - grp_debug!(self, "Detected commands for this group, handle as notif"); + } + + if !commands.is_empty() { + grp_debug!(self, "Detected commands for this group, handle as notif"); - return self.handle_notification(Notification { + return self + .handle_notification(Notification { id: s.id.clone(), // ??? notification_type: NotificationType::Mention, - created_at: s.created_at.clone(), + created_at: s.created_at, account: s.account.clone(), - status: Some(s) - }).await; - } else if private { - grp_debug!(self, "mention in private without commands, discard, this is nothing"); - return Ok(()); - } + status: Some(s), + }) + .await; + } else if private { + grp_debug!(self, "mention in private without commands, discard, this is nothing"); + return Ok(()); } } } @@ -385,8 +359,8 @@ impl GroupHandle { 'tags: for t in tags { if self.config.is_tag_followed(&t) { grp_info!(self, "REBLOG #{} STATUS", t); - self.client.reblog(&s.id).await - .log_error("Failed to reblog"); + self.client.reblog(&s.id).await.log_error("Failed to reblog"); + self.delay_after_post().await; break 'tags; // do not reblog multiple times! } else { grp_debug!(self, "#{} is not a group tag", t); @@ -398,6 +372,7 @@ impl GroupHandle { async fn follow_user(&mut self, id: &str) -> Result<(), GroupError> { self.client.follow(id).await?; + self.delay_after_post().await; Ok(()) } @@ -413,6 +388,7 @@ impl GroupHandle { // They are retrieved newest first, but we want oldest first for chronological handling let mut num = 0; + let mut old_pn = 0; while let Some(n) = iter.next_item().await { let ts = n.timestamp_millis(); if ts <= last_notif { @@ -422,13 +398,17 @@ impl GroupHandle { grp_debug!(self, "Inspecting notif {}", NotificationDisplay(&n)); notifs_to_handle.push(n); num += 1; - if num > self.common_config.max_catchup_notifs { + if num > self.cc.max_catchup_notifs { grp_warn!(self, "Too many notifs missed to catch up!"); break; } - // sleep so we dont make the api angry - tokio::time::sleep(Duration::from_millis(250)).await; + let pn = iter.page_num(); + if pn != old_pn { + old_pn = pn; + // sleep so we dont make the api angry + tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_fetch_page_s)).await; + } } if notifs_to_handle.is_empty() { @@ -461,6 +441,7 @@ impl GroupHandle { let mut newest_status = None; let mut num = 0; + let mut old_pn = 0; while let Some(s) = iter.next_item().await { let ts = s.timestamp_millis(); if ts <= last_status { @@ -475,13 +456,17 @@ impl GroupHandle { statuses_to_handle.push(s); num += 1; - if num > self.common_config.max_catchup_statuses { + if num > self.cc.max_catchup_statuses { grp_warn!(self, "Too many statuses missed to catch up!"); break; } - // sleep so we dont make the api angry - tokio::time::sleep(Duration::from_millis(250)).await; + let pn = iter.page_num(); + if pn != old_pn { + old_pn = pn; + // sleep so we dont make the api angry + tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_fetch_page_s)).await; + } } if let Some(ts) = newest_status { @@ -499,8 +484,7 @@ impl GroupHandle { for s in statuses_to_handle { grp_debug!(self, "Handling missed status: {}", StatusDisplay(&s)); - self.handle_status(s).await - .log_error("Error handling a status"); + self.handle_status(s).await.log_error("Error handling a status"); } Ok(true) @@ -509,8 +493,7 @@ impl GroupHandle { async fn handle_mention_status(&mut self, status: Status) -> Result<(), GroupError> { let res = ProcessMention::run(self, status).await; - self.save_if_needed().await - .log_error("Failed to save"); + self.save_if_needed().await.log_error("Failed to save"); res } @@ -523,24 +506,25 @@ impl GroupHandle { let mut admins = self.config.get_admins().cloned().collect::>(); admins.sort(); - format!("\ + format!( + "\ @{user} Welcome to the group! This group has posting restricted to members. \ If you'd like to join, please ask one of the group admins:\n\ {admins}", - user = notif_acct, - admins = admins.join(", ") + user = notif_acct, + admins = admins.join(", ") ) } else { follow_back = true; - self.config.set_member(notif_acct, true) - .log_error("Fail add a member"); + self.config.set_member(notif_acct, true).log_error("Fail add a member"); - format!("\ + format!( + "\ @{user} Welcome to the group! The group user will now follow you back to complete the sign-up. \ To share a post, @ the group user or use a group hashtag.\n\n\ Use /help for more info.", - user = notif_acct + user = notif_acct ) }; @@ -551,12 +535,15 @@ impl GroupHandle { .build() .expect("error build status"); - self.client.new_status(post).await - .log_error("Failed to post"); + self.client.new_status(post).await.log_error("Failed to post"); + self.delay_after_post().await; if follow_back { - self.follow_user(notif_user_id).await - .log_error("Failed to follow back"); + self.follow_user(notif_user_id).await.log_error("Failed to follow back"); } } + + async fn delay_after_post(&self) { + tokio::time::sleep(Duration::from_secs_f64(self.cc.delay_after_post_s)).await; + } } diff --git a/src/main.rs b/src/main.rs index c1e4db9..5c31dcc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,12 +31,18 @@ async fn main() -> anyhow::Result<()> { .multiple(true) .help("increase logging, can be repeated"), ) + .arg( + Arg::with_name("quiet") + .short("q") + .multiple(true) + .help("decrease logging, can be repeated"), + ) .arg( Arg::with_name("config") .short("c") .long("config") .takes_value(true) - .help("set custom config directory, defaults to groups.d"), + .help("set custom config directory, defaults to the current folder"), ) .arg( Arg::with_name("auth") @@ -64,9 +70,13 @@ async fn main() -> anyhow::Result<()> { LevelFilter::Trace, ]; - let default_level = 2; + let default_level = 3; - let level = (default_level + args.occurrences_of("verbose") as usize).min(LEVELS.len()); + let level = ( + default_level as isize + + args.occurrences_of("verbose") as isize + - args.occurrences_of("quiet") as isize) + .clamp(0, LEVELS.len() as isize) as usize; env_logger::Builder::new() .filter_level(LEVELS[level]) diff --git a/src/store/common_config.rs b/src/store/common_config.rs new file mode 100644 index 0000000..a40b28a --- /dev/null +++ b/src/store/common_config.rs @@ -0,0 +1,39 @@ +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct CommonConfig { + /// Max number of missed notifs to process after connect + pub max_catchup_notifs: usize, + /// Max number of missed statuses to process after connect + pub max_catchup_statuses: usize, + /// Delay between fetched pages when catching up + pub delay_fetch_page_s: f64, + /// Delay after sending a status, making a follow or some other action. + /// Set if there are Throttled errors and you need to slow the service down. + pub delay_after_post_s: f64, + /// Delay before trying to re-connect after the server closed the socket + pub delay_reopen_closed_s: f64, + /// Delay before trying to re-connect after an error + pub delay_reopen_error_s: f64, + /// Timeout for a notification/timeline socket to be considered alive. + /// If nothing arrives in this interval, reopen it. Some servers have a buggy socket + /// implementation where it stays open but no longer works. + pub socket_alive_timeout_s: f64, + /// Time after which a socket is always closed, even if seemingly alive. + /// This is a work-around for servers that stop sending notifs after a while. + pub socket_retire_time_s: f64, +} + +impl Default for CommonConfig { + fn default() -> Self { + Self { + max_catchup_notifs: 30, + max_catchup_statuses: 50, + delay_fetch_page_s: 0.25, + delay_after_post_s: 0.0, + delay_reopen_closed_s: 0.5, + delay_reopen_error_s: 5.0, + socket_alive_timeout_s: 30.0, + socket_retire_time_s: 120.0, + } + } +} diff --git a/src/store/data.rs b/src/store/group_config.rs similarity index 89% rename from src/store/data.rs rename to src/store/group_config.rs index e4b3b8d..4e56968 100644 --- a/src/store/data.rs +++ b/src/store/group_config.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::path::{Path, PathBuf}; use elefren::AppData; @@ -6,23 +6,7 @@ use elefren::AppData; use crate::error::GroupError; #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct CommonConfig { - pub max_catchup_notifs: usize, - pub max_catchup_statuses: usize, -} - -impl Default for CommonConfig { - fn default() -> Self { - Self { - max_catchup_notifs: 30, - max_catchup_statuses: 50, - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] struct FixedConfig { enabled: bool, /// Group actor's acct @@ -37,43 +21,8 @@ struct FixedConfig { _path: PathBuf, } -macro_rules! impl_change_tracking { - ($struc:ident) => { - impl $struc { - pub(crate) fn mark_dirty(&mut self) { - self._dirty = true; - } - - pub(crate) fn is_dirty(&self) -> bool { - self._dirty - } - - pub(crate) fn clear_dirty_status(&mut self) { - self._dirty = false; - } - - pub(crate) async fn save_if_needed(&mut self) -> Result<(), GroupError> { - if self._dirty { - self.save().await?; - } - Ok(()) - } - - pub(crate) async fn save(&mut self) -> Result<(), GroupError> { - tokio::fs::write(&self._path, serde_json::to_string_pretty(&self)?.as_bytes()).await?; - self._dirty = false; - Ok(()) - } - } - }; -} - -impl_change_tracking!(FixedConfig); -impl_change_tracking!(MutableConfig); -impl_change_tracking!(StateConfig); - #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] struct MutableConfig { /// Hashtags the group will auto-boost from it's members group_tags: HashSet, @@ -96,7 +45,7 @@ struct MutableConfig { } #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] struct StateConfig { /// Last seen notification timestamp (millis) last_notif_ts: u64, @@ -109,8 +58,7 @@ struct StateConfig { } /// This is the inner data struct holding a group's config -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] +#[derive(Debug, Clone)] pub struct GroupConfig { /// Fixed config that we only read config: FixedConfig, @@ -166,6 +114,43 @@ impl Default for StateConfig { } } +macro_rules! impl_change_tracking { + ($struc:ident) => { + impl $struc { + pub(crate) fn mark_dirty(&mut self) { + self._dirty = true; + } + + pub(crate) fn is_dirty(&self) -> bool { + self._dirty + } + + pub(crate) fn clear_dirty_status(&mut self) { + self._dirty = false; + } + + pub(crate) async fn save_if_needed(&mut self) -> Result { + if self.is_dirty() { + self.save().await?; + Ok(true) + } else { + Ok(false) + } + } + + pub(crate) async fn save(&mut self) -> Result<(), GroupError> { + tokio::fs::write(&self._path, serde_json::to_string_pretty(&self)?.as_bytes()).await?; + self.clear_dirty_status(); + Ok(()) + } + } + }; +} + +impl_change_tracking!(FixedConfig); +impl_change_tracking!(MutableConfig); +impl_change_tracking!(StateConfig); + impl Default for GroupConfig { fn default() -> Self { Self { @@ -176,10 +161,10 @@ impl Default for GroupConfig { } } -async fn load_or_create_control_file(control_path : impl AsRef) -> Result { +async fn load_or_create_control_file(control_path: impl AsRef) -> Result { let control_path = control_path.as_ref(); let mut dirty = false; - let mut control : MutableConfig = if control_path.is_file() { + let mut control: MutableConfig = if control_path.is_file() { let f = tokio::fs::read(&control_path).await?; let mut control: MutableConfig = serde_json::from_slice(&f)?; control._path = control_path.to_owned(); @@ -194,15 +179,14 @@ async fn load_or_create_control_file(control_path : impl AsRef) -> Result< }; if dirty { control.save().await?; - // tokio::fs::write(&control._path, serde_json::to_string(&control)?.as_bytes()).await?; } Ok(control) } -async fn load_or_create_state_file(state_path : impl AsRef) -> Result { +async fn load_or_create_state_file(state_path: impl AsRef) -> Result { let state_path = state_path.as_ref(); let mut dirty = false; - let mut state : StateConfig = if state_path.is_file() { + let mut state: StateConfig = if state_path.is_file() { let f = tokio::fs::read(&state_path).await?; let mut control: StateConfig = serde_json::from_slice(&f)?; control._path = state_path.to_owned(); @@ -217,29 +201,42 @@ async fn load_or_create_state_file(state_path : impl AsRef) -> Result bool { - self.config.is_dirty() - || self.control.is_dirty() - || self.state.is_dirty() + self.config.is_dirty() || self.control.is_dirty() || self.state.is_dirty() } /// Save only what changed pub(crate) async fn save_if_needed(&mut self, danger_allow_overwriting_config: bool) -> Result<(), GroupError> { + #[allow(clippy::collapsible_if)] if danger_allow_overwriting_config { - self.config.save_if_needed().await?; + if self.config.save_if_needed().await? { + debug!( + "Written {} config file {}", + self.config.acct, + self.config._path.display() + ); + } + } + if self.control.save_if_needed().await? { + debug!( + "Written {} control file {}", + self.config.acct, + self.control._path.display() + ); + } + if self.state.save_if_needed().await? { + debug!("Written {} state file {}", self.config.acct, self.state._path.display()); } - self.control.save_if_needed().await?; - self.state.save_if_needed().await?; Ok(()) } /// Save all unconditionally + #[allow(unused)] pub(crate) async fn save(&mut self, danger_allow_overwriting_config: bool) -> Result<(), GroupError> { if danger_allow_overwriting_config { self.config.save().await?; @@ -297,11 +294,7 @@ impl GroupConfig { /* state */ let state = load_or_create_state_file(state_path).await?; - let g = GroupConfig { - config, - control, - state - }; + let g = GroupConfig { config, control, state }; g.warn_of_bad_config(); Ok(g) } @@ -324,11 +317,7 @@ impl GroupConfig { /* state */ let state = load_or_create_state_file(state_path).await?; - let g = GroupConfig { - config, - control, - state - }; + let g = GroupConfig { config, control, state }; g.warn_of_bad_config(); Ok(g) } @@ -336,17 +325,26 @@ impl GroupConfig { fn warn_of_bad_config(&self) { for t in &self.control.group_tags { if &t.to_lowercase() != t { - warn!("Group {} hashtag \"{}\" is not lowercase, it won't work!", self.config.acct, t); + warn!( + "Group {} hashtag \"{}\" is not lowercase, it won't work!", + self.config.acct, t + ); } } - for u in self.control.admin_users.iter() + for u in self + .control + .admin_users + .iter() .chain(self.control.member_users.iter()) .chain(self.control.banned_users.iter()) .chain(self.control.optout_users.iter()) { if &u.to_lowercase() != u { - warn!("Group {} config contains a user with non-lowercase name \"{}\", it won't work!", self.config.acct, u); + warn!( + "Group {} config contains a user with non-lowercase name \"{}\", it won't work!", + self.config.acct, u + ); } if u.starts_with('@') || u.chars().filter(|c| *c == '@').count() != 1 { @@ -374,15 +372,15 @@ impl GroupConfig { self.config.appdata = appdata; } - pub(crate) fn get_admins(&self) -> impl Iterator { + pub(crate) fn get_admins(&self) -> impl Iterator { self.control.admin_users.iter() } - pub(crate) fn get_members(&self) -> impl Iterator { + pub(crate) fn get_members(&self) -> impl Iterator { self.control.member_users.iter() } - pub(crate) fn get_tags(&self) -> impl Iterator { + pub(crate) fn get_tags(&self) -> impl Iterator { self.control.group_tags.iter() } @@ -425,8 +423,7 @@ impl GroupConfig { } pub(crate) fn is_member_or_admin(&self, acct: &str) -> bool { - self.is_member(acct) - || self.is_admin(acct) + self.is_member(acct) || self.is_admin(acct) } pub(crate) fn is_banned(&self, acct: &str) -> bool { @@ -563,7 +560,7 @@ fn acct_to_server(acct: &str) -> &str { #[cfg(test)] mod tests { use crate::error::GroupError; - use crate::store::data::{acct_to_server, GroupConfig}; + use crate::store::group_config::{acct_to_server, GroupConfig}; #[test] fn test_acct_to_server() { diff --git a/src/store/mod.rs b/src/store/mod.rs index 4412941..4d05c1c 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -3,20 +3,19 @@ use std::sync::Arc; use elefren::{scopes, FediClient, Registration, Scopes}; use futures::StreamExt; -use tokio::sync::RwLock; - -use data::{GroupConfig}; use crate::error::GroupError; use crate::group_handler::GroupHandle; -use std::time::Duration; -use crate::store::data::CommonConfig; -pub(crate) mod data; +pub mod common_config; +pub mod group_config; +pub use common_config::CommonConfig; +pub use group_config::GroupConfig; #[derive(Debug, Default)] pub struct ConfigStore { store_path: PathBuf, + groups_path: PathBuf, config: Arc, } @@ -37,13 +36,16 @@ impl ConfigStore { pub async fn load_from_fs(options: StoreOptions) -> Result, GroupError> { let given_path: &Path = options.store_dir.as_ref(); - let mut common_file : Option = None; - let base_dir : PathBuf; + let mut common_file: Option = None; + let base_dir: PathBuf; if given_path.is_file() { if given_path.extension().unwrap_or_default().to_string_lossy() == "json" { // this is a groups.json file common_file = Some(given_path.to_owned()); - base_dir = given_path.parent().ok_or_else(|| GroupError::BadConfig("no parent dir".into()))?.to_owned(); + base_dir = given_path + .parent() + .ok_or_else(|| GroupError::BadConfig("no parent dir".into()))? + .to_owned(); } else { return Err(GroupError::BadConfig("bad config file, should be JSON".into())); } @@ -61,15 +63,26 @@ impl ConfigStore { return Err(GroupError::BadConfig("base dir does not exist".into())); } - let config : CommonConfig = if let Some(cf) = &common_file { + let config: CommonConfig = if let Some(cf) = &common_file { + debug!("Loading common config from {}", cf.display()); let f = tokio::fs::read(&cf).await?; serde_json::from_slice(&f)? } else { + debug!("No common config file, using defaults"); CommonConfig::default() }; + debug!("Using common config:\n{:#?}", config); + + let groups_path = base_dir.join("groups.d"); + if !groups_path.exists() { + debug!("Creating groups directory"); + tokio::fs::create_dir_all(&groups_path).await?; + } + Ok(Arc::new(Self { store_path: base_dir.to_owned(), + groups_path, config: Arc::new(config), })) } @@ -87,7 +100,7 @@ impl ConfigStore { let client = elefren::helpers::cli::authenticate(registration).await?; let appdata = client.data.clone(); - let group_dir = self.store_path.join(&opts.acct); + let group_dir = self.groups_path.join(&opts.acct); let data = GroupConfig::from_appdata(opts.acct.clone(), appdata, group_dir).await?; @@ -111,14 +124,13 @@ impl ConfigStore { group_account, client, config: data, - common_config: self.config.clone(), + cc: self.config.clone(), }) } /// Re-auth an existing group pub async fn reauth_group(self: &Arc, acct: &str) -> Result { - - let group_dir = self.store_path.join(&acct); + let group_dir = self.groups_path.join(&acct); let mut config = GroupConfig::from_dir(group_dir).await?; @@ -156,21 +168,21 @@ impl ConfigStore { group_account, client, config, - common_config: self.config.clone(), + cc: self.config.clone(), }) } /// Spawn existing group using saved creds pub async fn spawn_groups(self: Arc) -> Result, GroupError> { - let dirs = std::fs::read_dir(&self.store_path.join("groups.d"))?; + info!("Starting group services for groups in {}", self.groups_path.display()); + let dirs = std::fs::read_dir(&self.groups_path)?; // Connect in parallel Ok(futures::stream::iter(dirs) - .map(|entry_maybe : Result| async { + .map(|entry_maybe: Result| async { match entry_maybe { Ok(entry) => { - let mut gc = GroupConfig::from_dir(entry.path()) - .await.ok()?; + let gc = GroupConfig::from_dir(entry.path()).await.ok()?; if !gc.is_enabled() { debug!("Group @{} is DISABLED", gc.get_acct()); @@ -184,9 +196,9 @@ impl ConfigStore { let my_account = match client.verify_credentials().await { Ok(account) => { info!( - "Group account verified: @{}, \"{}\"", - account.acct, account.display_name - ); + "Group account verified: @{}, \"{}\"", + account.acct, account.display_name + ); account } Err(e) => { @@ -199,7 +211,7 @@ impl ConfigStore { group_account: my_account, client, config: gc, - common_config: self.config.clone(), + cc: self.config.clone(), }) } Err(e) => { @@ -216,10 +228,8 @@ impl ConfigStore { .collect()) } - pub async fn group_exists(&self, acct : &str) -> bool { - self.store_path.join(acct) - .join("config.json") - .is_file() + pub async fn group_exists(&self, acct: &str) -> bool { + self.store_path.join(acct).join("config.json").is_file() } } diff --git a/src/utils.rs b/src/utils.rs index 1536812..01cb258 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -105,9 +105,7 @@ impl VisExt for Visibility { fn make_unlisted(self) -> Self { match self { - Visibility::Public => { - Visibility::Unlisted - } + Visibility::Public => Visibility::Unlisted, other => other, } }