slightly better handling of socket going silent, maybe

master
Ondřej Hruška 3 years ago
parent 60653314cd
commit 81a98c3d3c
Signed by untrusted user: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 2
      Cargo.lock
  2. 2
      Cargo.toml
  3. 36
      src/group_handler/mod.rs

2
Cargo.lock generated

@ -328,7 +328,7 @@ checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]] [[package]]
name = "fedigroups" name = "fedigroups"
version = "0.2.0" version = "0.2.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",

@ -1,6 +1,6 @@
[package] [package]
name = "fedigroups" name = "fedigroups"
version = "0.2.0" version = "0.2.1"
authors = ["Ondřej Hruška <ondra@ondrovo.com>"] authors = ["Ondřej Hruška <ondra@ondrovo.com>"]
edition = "2018" edition = "2018"
publish = false publish = false

@ -35,6 +35,8 @@ const MAX_CATCHUP_NOTIFS: usize = 25;
const MAX_CATCHUP_STATUSES: usize = 50; const MAX_CATCHUP_STATUSES: usize = 50;
// higher because we can expect a lot of non-hashtag statuses here // higher because we can expect a lot of non-hashtag statuses here
const PERIODIC_SAVE: Duration = Duration::from_secs(60); const PERIODIC_SAVE: Duration = Duration::from_secs(60);
const SOCKET_ALIVE_TIMEOUT: Duration = Duration::from_secs(30);
const SOCKET_RETIRE_TIME: Duration = Duration::from_secs(120);
const PING_INTERVAL: Duration = Duration::from_secs(15); // must be < periodic save! const PING_INTERVAL: Duration = Duration::from_secs(15); // must be < periodic save!
impl GroupHandle { impl GroupHandle {
@ -93,7 +95,7 @@ impl GroupHandle {
let mut events = self.client.streaming_user().await?; let mut events = self.client.streaming_user().await?;
let socket_open_time = Instant::now(); let socket_open_time = Instant::now();
let mut last_rx = Instant::now(); let mut last_rx = Instant::now();
let mut last_ping = Instant::now(); // let mut last_ping = Instant::now();
match self.catch_up_with_missed_notifications().await { match self.catch_up_with_missed_notifications().await {
Ok(true) => { Ok(true) => {
@ -131,12 +133,14 @@ impl GroupHandle {
next_save = Instant::now() + PERIODIC_SAVE; next_save = Instant::now() + PERIODIC_SAVE;
} }
if last_rx.elapsed() > PING_INTERVAL * 2 { let remains_to_idle_close = SOCKET_ALIVE_TIMEOUT.saturating_sub(last_rx.elapsed());
let remains_to_retire = SOCKET_RETIRE_TIME.saturating_sub(socket_open_time.elapsed());
if remains_to_idle_close.is_zero() {
warn!("Socket idle too long, close"); warn!("Socket idle too long, close");
break 'rx; break 'rx;
} }
if remains_to_retire.is_zero() {
if socket_open_time.elapsed() > Duration::from_secs(120) {
debug!("Socket open too long, closing"); debug!("Socket open too long, closing");
break 'rx; break 'rx;
} }
@ -144,8 +148,9 @@ impl GroupHandle {
trace!("Waiting for message"); trace!("Waiting for message");
let timeout = next_save let timeout = next_save
.saturating_duration_since(Instant::now()) .saturating_duration_since(Instant::now())
.min(PING_INTERVAL) .min(remains_to_idle_close)
.max(Duration::from_secs(1)); .min(remains_to_retire)
.max(Duration::from_secs(1)); // at least 1s
match tokio::time::timeout(timeout, events.next()).await { match tokio::time::timeout(timeout, events.next()).await {
Ok(Some(event)) => { Ok(Some(event)) => {
@ -172,14 +177,17 @@ impl GroupHandle {
} }
} }
if last_ping.elapsed() > PING_INTERVAL { /* ping is nice, but pleroma still sometimes doesnt send
last_ping = Instant::now(); notifs after a while, just let it expire */
trace!("Pinging");
if events.send_ping() // if last_ping.elapsed() > PING_INTERVAL {
.await.is_err() { // last_ping = Instant::now();
break 'rx; // trace!("Pinging");
} // if events.send_ping()
} // .await.is_err() {
// break 'rx;
// }
// }
} }
warn!("Notif stream closed, will reopen"); warn!("Notif stream closed, will reopen");

Loading…
Cancel
Save