forked from MightyPork/group-actor
parent
1ea3aa7cb0
commit
5a631f785e
@ -0,0 +1,209 @@ |
||||
use std::sync::Arc; |
||||
use std::time::{Duration, Instant}; |
||||
|
||||
use elefren::{FediClient, StatusBuilder}; |
||||
use elefren::debug::EventDisplay; |
||||
use elefren::debug::NotificationDisplay; |
||||
use elefren::entities::event::Event; |
||||
use elefren::entities::notification::{Notification, NotificationType}; |
||||
use elefren::status_builder::Visibility; |
||||
use futures::StreamExt; |
||||
|
||||
use crate::store::{ConfigStore, GroupError}; |
||||
use crate::store::data::GroupConfig; |
||||
use crate::utils::LogError; |
||||
|
||||
/// This is one group's config store capable of persistence
|
||||
#[derive(Debug)] |
||||
pub struct GroupHandle { |
||||
pub(crate) client: FediClient, |
||||
pub(crate) config: GroupConfig, |
||||
pub(crate) store: Arc<ConfigStore>, |
||||
} |
||||
|
||||
impl GroupHandle { |
||||
pub async fn save(&mut self) -> Result<(), GroupError> { |
||||
debug!("Saving group config & status"); |
||||
self.store.set_group_config(self.config.clone()).await?; |
||||
self.config.clear_dirty_status(); |
||||
Ok(()) |
||||
} |
||||
|
||||
pub async fn save_if_needed(&mut self) -> Result<(), GroupError> { |
||||
if self.config.is_dirty() { |
||||
self.save().await?; |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
pub async fn reload(&mut self) -> Result<(), GroupError> { |
||||
if let Some(g) = self.store.get_group_config(self.config.get_acct()).await { |
||||
self.config = g; |
||||
Ok(()) |
||||
} else { |
||||
Err(GroupError::GroupNotExist) |
||||
} |
||||
} |
||||
} |
||||
|
||||
trait NotifTimestamp { |
||||
fn timestamp_millis(&self) -> u64; |
||||
} |
||||
|
||||
impl NotifTimestamp for Notification { |
||||
fn timestamp_millis(&self) -> u64 { |
||||
self.created_at.timestamp_millis().max(0) as u64 |
||||
} |
||||
} |
||||
|
||||
impl GroupHandle { |
||||
pub async fn run(&mut self) -> Result<(), GroupError> { |
||||
const PERIODIC_SAVE: Duration = Duration::from_secs(60); |
||||
const PING_INTERVAL: Duration = Duration::from_secs(15); |
||||
|
||||
assert!(PERIODIC_SAVE >= PING_INTERVAL); |
||||
|
||||
let mut next_save = Instant::now() + PERIODIC_SAVE; // so we save at start
|
||||
|
||||
loop { |
||||
debug!("Opening streaming API socket"); |
||||
let mut events = self.client.streaming_user().await?; |
||||
|
||||
match self.catch_up_with_missed_notifications().await { |
||||
Ok(true) => { |
||||
debug!("Some missed notifs handled"); |
||||
// Save asap!
|
||||
next_save = Instant::now() - PERIODIC_SAVE |
||||
} |
||||
Ok(false) => { |
||||
debug!("No notifs missed"); |
||||
} |
||||
Err(e) => { |
||||
error!("Failed to handle missed notifs: {}", e); |
||||
} |
||||
} |
||||
|
||||
loop { |
||||
if next_save < Instant::now() { |
||||
self.save_if_needed().await |
||||
.log_error("Failed to save group"); |
||||
next_save = Instant::now() + PERIODIC_SAVE; |
||||
} |
||||
|
||||
let timeout = next_save.saturating_duration_since(Instant::now()) |
||||
.min(PING_INTERVAL) |
||||
.max(Duration::from_secs(1)); |
||||
|
||||
match tokio::time::timeout(timeout, events.next()).await { |
||||
Ok(Some(event)) => { |
||||
debug!("(@{}) Event: {}", self.config.get_acct(), EventDisplay(&event)); |
||||
match event { |
||||
Event::Update(_status) => {} |
||||
Event::Notification(n) => { |
||||
self.handle_notification(n).await; |
||||
} |
||||
Event::Delete(_id) => {} |
||||
Event::FiltersChanged => {} |
||||
} |
||||
} |
||||
Ok(None) => { |
||||
warn!("Group @{} socket closed, restarting...", self.config.get_acct()); |
||||
break; |
||||
} |
||||
Err(_) => { |
||||
// Timeout so we can save if needed
|
||||
} |
||||
} |
||||
|
||||
trace!("Pinging"); |
||||
events.send_ping().await.log_error("Fail to send ping"); |
||||
} |
||||
|
||||
warn!("Notif stream closed, will reopen"); |
||||
tokio::time::sleep(Duration::from_millis(1000)).await; |
||||
} |
||||
} |
||||
|
||||
async fn handle_notification(&mut self, n: Notification) { |
||||
debug!("Handling notif #{}", n.id); |
||||
let ts = n.timestamp_millis(); |
||||
self.config.set_last_notif(ts); |
||||
|
||||
match n.notification_type { |
||||
NotificationType::Mention => { |
||||
if let Some(status) = n.status { |
||||
if status.content.contains("/gi") || status.content.contains("\\gi") { |
||||
info!("Mention ignored by gi"); |
||||
} else if status.content.contains("/gb") || status.content.contains("\\gb") { |
||||
if let Some(id) = status.in_reply_to_id { |
||||
info!("Boosting prev post by GB"); |
||||
tokio::time::sleep(Duration::from_millis(500)).await; |
||||
// self.client.reblog(&id).await.log_error("Failed to boost");
|
||||
} |
||||
} else { |
||||
info!("Boosting mention"); |
||||
tokio::time::sleep(Duration::from_millis(500)).await; |
||||
// self.client.reblog(&status.id).await.log_error("Failed to boost");
|
||||
} |
||||
} |
||||
} |
||||
NotificationType::Follow => { |
||||
info!("New follower!"); |
||||
tokio::time::sleep(Duration::from_millis(500)).await; |
||||
|
||||
/* |
||||
let post = StatusBuilder::new() |
||||
.status(format!("@{} welcome to the group!", &n.account.acct)) |
||||
.content_type("text/markdown") |
||||
.visibility(Visibility::Unlisted) |
||||
.build().expect("error build status"); |
||||
|
||||
let _ = self.client.new_status(post).await.log_error("Failed to post"); |
||||
*/ |
||||
} |
||||
_ => {} |
||||
} |
||||
} |
||||
|
||||
/// Catch up with missed notifications, returns true if any were handled
|
||||
async fn catch_up_with_missed_notifications(&mut self) -> Result<bool, GroupError> { |
||||
const MAX_CATCHUP_NOTIFS: usize = 25; |
||||
let last_notif = self.config.get_last_notif(); |
||||
|
||||
let notifications = self.client.notifications().await?; |
||||
let mut iter = notifications.items_iter(); |
||||
|
||||
let mut notifs_to_handle = vec![]; |
||||
|
||||
// They are retrieved newest first, but we want oldest first for chronological handling
|
||||
|
||||
let mut num = 0; |
||||
while let Some(n) = iter.next_item().await { |
||||
let ts = n.timestamp_millis(); |
||||
if ts <= last_notif { |
||||
break; // reached our last seen notif
|
||||
} |
||||
notifs_to_handle.push(n); |
||||
num += 1; |
||||
if num > MAX_CATCHUP_NOTIFS { |
||||
warn!("Too many notifs missed to catch up!"); |
||||
break; |
||||
} |
||||
} |
||||
|
||||
if notifs_to_handle.is_empty() { |
||||
return Ok(false); |
||||
} |
||||
|
||||
notifs_to_handle.reverse(); |
||||
|
||||
debug!("{} notifications to catch up!", notifs_to_handle.len()); |
||||
|
||||
for n in notifs_to_handle { |
||||
debug!("Handling missed notification: {}", NotificationDisplay(&n)); |
||||
self.handle_notification(n).await; |
||||
} |
||||
|
||||
return Ok(true); |
||||
} |
||||
} |
@ -0,0 +1,333 @@ |
||||
use std::collections::{HashMap, HashSet}; |
||||
|
||||
use crate::store; |
||||
use crate::store::GroupError; |
||||
use elefren::AppData; |
||||
|
||||
/// This is the inner data struct holding the config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)] |
||||
pub(crate) struct Config { |
||||
groups: HashMap<String, GroupConfig>, |
||||
} |
||||
|
||||
impl Config { |
||||
pub(crate) fn iter_groups(&self) -> impl Iterator<Item=&GroupConfig>{ |
||||
self.groups.values() |
||||
} |
||||
|
||||
pub(crate) fn get_group_config(&self, acct : &str) -> Option<&GroupConfig> { |
||||
self.groups.get(acct) |
||||
} |
||||
|
||||
pub(crate) fn set_group_config(&mut self, grp : GroupConfig) { |
||||
self.groups.insert(grp.acct.clone(), grp); |
||||
} |
||||
|
||||
} |
||||
|
||||
/// This is the inner data struct holding a group's config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)] |
||||
#[serde(default)] |
||||
pub(crate) struct GroupConfig { |
||||
enabled: bool, |
||||
/// Group actor's acct
|
||||
acct: String, |
||||
/// elefren data
|
||||
appdata : AppData, |
||||
/// List of admin account "acct" names, e.g. piggo@piggo.space
|
||||
admin_users: HashSet<String>, |
||||
/// List of users allowed to post to the group, if it is member-only
|
||||
member_users: HashSet<String>, |
||||
/// List of users banned from posting to the group
|
||||
banned_users: HashSet<String>, |
||||
/// True if only members should be allowed to write
|
||||
member_only: bool, |
||||
/// Banned domain names, e.g. kiwifarms.cc
|
||||
banned_servers: HashSet<String>, |
||||
/// Last seen notification timestamp
|
||||
last_notif_ts: u64, |
||||
|
||||
#[serde(skip)] |
||||
dirty: bool, |
||||
} |
||||
|
||||
impl Default for GroupConfig { |
||||
fn default() -> Self { |
||||
Self { |
||||
enabled: true, |
||||
acct: "".to_string(), |
||||
appdata: AppData { |
||||
base: Default::default(), |
||||
client_id: Default::default(), |
||||
client_secret: Default::default(), |
||||
redirect: Default::default(), |
||||
token: Default::default() |
||||
}, |
||||
admin_users: Default::default(), |
||||
member_users: Default::default(), |
||||
banned_users: Default::default(), |
||||
member_only: false, |
||||
banned_servers: Default::default(), |
||||
last_notif_ts: 0, |
||||
dirty: false, |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl GroupConfig { |
||||
pub(crate) fn new(acct : String, appdata: AppData) -> Self { |
||||
Self { |
||||
acct, |
||||
appdata, |
||||
..Default::default() |
||||
} |
||||
} |
||||
|
||||
pub(crate) fn is_enabled(&self) -> bool { |
||||
self.enabled |
||||
} |
||||
|
||||
pub(crate) fn set_enabled(&mut self, ena: bool){ |
||||
self.enabled = ena; |
||||
self.mark_dirty(); |
||||
} |
||||
|
||||
pub(crate) fn get_appdata(&self) -> &AppData { |
||||
&self.appdata |
||||
} |
||||
|
||||
pub(crate) fn set_appdata(&mut self, appdata: AppData) { |
||||
self.appdata = appdata; |
||||
self.mark_dirty(); |
||||
} |
||||
|
||||
pub(crate) fn set_last_notif(&mut self, ts: u64) { |
||||
self.last_notif_ts = self.last_notif_ts.max(ts); |
||||
self.mark_dirty(); |
||||
} |
||||
|
||||
pub(crate) fn get_last_notif(&self) -> u64 { |
||||
self.last_notif_ts |
||||
} |
||||
|
||||
pub(crate) fn get_acct(&self) -> &str { |
||||
&self.acct |
||||
} |
||||
|
||||
pub(crate) fn is_admin(&self, acct: &str) -> bool { |
||||
self.admin_users.contains(acct) |
||||
} |
||||
|
||||
pub(crate) fn is_member(&self, acct: &str) -> bool { |
||||
self.member_users.contains(acct) |
||||
} |
||||
|
||||
pub(crate) fn is_banned(&self, acct: &str) -> bool { |
||||
self.banned_users.contains(acct) |
||||
|| self.is_users_server_banned(acct) |
||||
} |
||||
|
||||
/// Check if the user's server is banned
|
||||
fn is_users_server_banned(&self, acct: &str) -> bool { |
||||
let server = acct_to_server(acct); |
||||
self.banned_servers.contains(server) |
||||
} |
||||
|
||||
pub(crate) fn can_write(&self, acct: &str) -> bool { |
||||
if self.is_admin(acct) { |
||||
true |
||||
} else { |
||||
!self.is_banned(acct) && ( |
||||
!self.is_member_only() |
||||
|| self.is_member(acct) |
||||
) |
||||
} |
||||
} |
||||
|
||||
pub(crate) fn set_admin(&mut self, acct: &str, admin: bool) -> Result<(), GroupError> { |
||||
if admin { |
||||
if self.is_banned(acct) { |
||||
return Err(GroupError::UserIsBanned); |
||||
} |
||||
self.admin_users.insert(acct.to_owned()); |
||||
} else { |
||||
self.admin_users.remove(acct); |
||||
} |
||||
self.mark_dirty(); |
||||
Ok(()) |
||||
} |
||||
|
||||
pub(crate) fn set_member(&mut self, acct: &str, member: bool) -> Result<(), GroupError> { |
||||
if member { |
||||
if self.is_banned(acct) { |
||||
return Err(GroupError::UserIsBanned); |
||||
} |
||||
self.member_users.insert(acct.to_owned()); |
||||
} else { |
||||
self.member_users.remove(acct); |
||||
} |
||||
self.mark_dirty(); |
||||
Ok(()) |
||||
} |
||||
|
||||
pub(crate) fn ban_user(&mut self, acct: &str, ban: bool) -> Result<(), GroupError> { |
||||
if ban { |
||||
if self.is_admin(acct) { |
||||
return Err(GroupError::UserIsAdmin); |
||||
} |
||||
self.banned_users.insert(acct.to_owned()); |
||||
} else { |
||||
self.banned_users.remove(acct); |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
pub(crate) fn ban_server(&mut self, server: &str, ban: bool) -> Result<(), GroupError> { |
||||
if ban { |
||||
for acct in &self.admin_users { |
||||
let acct_server = acct_to_server(acct); |
||||
if acct_server == server { |
||||
return Err(GroupError::AdminsOnServer); |
||||
} |
||||
} |
||||
self.banned_servers.insert(server.to_owned()); |
||||
} else |