From 5a631f785e4a08ef5c690765546712195db59640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Sat, 21 Aug 2021 22:35:01 +0200 Subject: [PATCH] config store & event loop with reconn --- .gitignore | 1 + Cargo.lock | 424 +++++++++++++++++++++++++++++++++++++------- Cargo.toml | 11 +- src/group_handle.rs | 209 ++++++++++++++++++++++ src/main.rs | 117 +++++++++--- src/store/data.rs | 333 ++++++++++++++++++++++++++++++++++ src/store/mod.rs | 225 +++++++++++++++++++++++ src/utils.rs | 16 ++ 8 files changed, 1249 insertions(+), 87 deletions(-) create mode 100644 src/group_handle.rs create mode 100644 src/store/data.rs create mode 100644 src/store/mod.rs create mode 100644 src/utils.rs diff --git a/.gitignore b/.gitignore index 09bf281..88bf4b8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ **/*.rs.bk group-actor-data.toml .idea/ +groups.json diff --git a/Cargo.lock b/Cargo.lock index dad3b85..def3ea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -267,10 +276,10 @@ checksum = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e" [[package]] name = "elefren" version = "0.22.0" -source = "git+https://git.ondrovo.com/MightyPork/elefren-fork.git#3c7a84a12423ee69a2b45442e9a6d1f9a4818fd8" dependencies = [ "chrono", "doc-comment", + "futures-util", "hyper-old-types", "isolang", "log 0.4.14", @@ -279,9 +288,12 @@ dependencies = [ "serde_json", "serde_qs", "serde_urlencoded 0.6.1", - "tap-reader", + "thiserror", + "tokio", + "tokio-stream", + "tokio-tungstenite", + "tokio-util", "toml", - "tungstenite", "url 2.2.2", ] @@ -294,6 +306,19 @@ dependencies = [ "cfg-if 0.1.10", ] +[[package]] +name = "env_logger" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3" +dependencies = [ + "atty", + "humantime", + "log 0.4.14", + "regex", + "termcolor", +] + [[package]] name = "fake-simd" version = "0.1.2" @@ -359,6 +384,21 @@ version = "0.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" +[[package]] +name = "futures" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1adc00f486adfc9ce99f77d717836f0c5aa84965eb0b4f051f4e83f7cab53f8b" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.16" @@ -366,6 +406,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74ed2411805f6e4e3d9bc904c95d5d423b89b3b25dc0250aa74729de20629ff9" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -374,12 +415,36 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af51b1b4a7fdff033703db39de8802c673eb91855f2e0d47dcf3bf2c0ef01f99" +[[package]] +name = "futures-executor" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d0d535a57b87e1ae31437b892713aee90cd2d7b0ee48727cd11fc72ef54761c" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b0e06c393068f3a6ef246c75cdca793d6a46347e75286933e5e75fd2fd11582" +[[package]] +name = "futures-macro" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c54913bae956fb8df7f4dc6fc90362aa72e69148e3f39041fbe8742d21e0ac57" +dependencies = [ + "autocfg 1.0.1", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.16" @@ -399,12 +464,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67eb846bfd58e44a8481a00049e82c43e0ccb5d61f8dc071057cb19249dd4d78" dependencies = [ "autocfg 1.0.1", + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", "pin-utils", + "proc-macro-hack", + "proc-macro-nested", "slab", ] @@ -456,12 +526,16 @@ dependencies = [ "anyhow", "clap", "elefren", + "env_logger", + "futures 0.3.16", "log 0.4.14", "native-tls", "serde", "serde_json", - "simple-logging", "smart-default", + "thiserror", + "tokio", + "tokio-stream", "websocket", ] @@ -524,6 +598,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.10.16" @@ -585,16 +665,18 @@ dependencies = [ ] [[package]] -name = "hyper-tls" -version = "0.5.0" +name = "hyper-rustls" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" dependencies = [ - "bytes 1.0.1", + "futures-util", "hyper 0.14.4", - "native-tls", + "log 0.4.14", + "rustls", "tokio", - "tokio-native-tls", + "tokio-rustls", + "webpki", ] [[package]] @@ -629,6 +711,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "instant" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "iovec" version = "0.1.4" @@ -705,7 +796,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c" dependencies = [ "owning_ref", - "scopeguard", + "scopeguard 0.3.3", +] + +[[package]] +name = "lock_api" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb" +dependencies = [ + "scopeguard 1.1.0", ] [[package]] @@ -948,8 +1048,19 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab41b4aed082705d1056416ae4468b6ea99d52599ecf3169b00088d43113e337" dependencies = [ - "lock_api", - "parking_lot_core", + "lock_api 0.1.5", + "parking_lot_core 0.4.0", +] + +[[package]] +name = "parking_lot" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +dependencies = [ + "instant", + "lock_api 0.4.4", + "parking_lot_core 0.8.3", ] [[package]] @@ -961,7 +1072,21 @@ dependencies = [ "libc", "rand 0.6.5", "rustc_version", - "smallvec", + "smallvec 0.6.10", + "winapi 0.3.8", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" +dependencies = [ + "cfg-if 1.0.0", + "instant", + "libc", + "redox_syscall 0.2.10", + "smallvec 1.6.1", "winapi 0.3.8", ] @@ -1059,6 +1184,18 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + [[package]] name = "proc-macro2" version = "1.0.28" @@ -1279,6 +1416,32 @@ version = "0.1.56" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" +[[package]] +name = "redox_syscall" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + [[package]] name = "remove_dir_all" version = "0.5.2" @@ -1302,28 +1465,44 @@ dependencies = [ "http", "http-body", "hyper 0.14.4", - "hyper-tls", + "hyper-rustls", "ipnet", "js-sys", "lazy_static", "log 0.4.14", "mime 0.3.16", "mime_guess", - "native-tls", "percent-encoding 2.1.0", "pin-project-lite", + "rustls", "serde", "serde_json", "serde_urlencoded 0.7.0", "tokio", - "tokio-native-tls", + "tokio-rustls", "url 2.2.2", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", "winreg", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi 0.3.8", +] + [[package]] name = "rustc_version" version = "0.2.3" @@ -1333,6 +1512,31 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" +dependencies = [ + "base64 0.13.0", + "log 0.4.14", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" +dependencies = [ + "openssl-probe", + "rustls", + "schannel", + "security-framework", +] + [[package]] name = "ryu" version = "1.0.0" @@ -1361,6 +1565,22 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27" +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "sct" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.3.1" @@ -1491,14 +1711,12 @@ dependencies = [ ] [[package]] -name = "simple-logging" -version = "2.0.2" +name = "signal-hook-registry" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b00d48e85675326bb182a2286ea7c1a0b264333ae10f27a937a72be08628b542" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" dependencies = [ - "lazy_static", - "log 0.4.14", - "thread-id", + "libc", ] [[package]] @@ -1519,6 +1737,12 @@ version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab606a9c5e214920bb66c458cd7be8ef094f813f20fe77a54cc7dbfff220d4b7" +[[package]] +name = "smallvec" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" + [[package]] name = "smart-default" version = "0.6.0" @@ -1541,6 +1765,12 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "stable_deref_trait" version = "1.1.1" @@ -1564,12 +1794,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "tap-reader" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f13fc301d415a8cd4529ba679720c59f07369bcff573618a6e8d5afebefb6f3" - [[package]] name = "tempfile" version = "3.1.0" @@ -1579,11 +1803,20 @@ dependencies = [ "cfg-if 0.1.10", "libc", "rand 0.7.2", - "redox_syscall", + "redox_syscall 0.1.56", "remove_dir_all", "winapi 0.3.8", ] +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -1613,17 +1846,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread-id" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fbf4c9d56b320106cd64fd024dadfa0be7cb4706725fc44a7d7ce952d820c1" -dependencies = [ - "libc", - "redox_syscall", - "winapi 0.3.8", -] - [[package]] name = "time" version = "0.1.42" @@ -1631,7 +1853,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" dependencies = [ "libc", - "redox_syscall", + "redox_syscall 0.1.56", "winapi 0.3.8", ] @@ -1647,7 +1869,11 @@ dependencies = [ "memchr", "mio 0.7.13", "num_cpus", + "once_cell", + "parking_lot 0.11.1", "pin-project-lite", + "signal-hook-registry", + "tokio-macros", "winapi 0.3.8", ] @@ -1658,7 +1884,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "tokio-io", ] @@ -1669,7 +1895,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f27ee0e6db01c5f0b2973824547ce7e637b2ed79b891a9677b0de9bd532b6ac" dependencies = [ "crossbeam-utils", - "futures", + "futures 0.1.29", ] [[package]] @@ -1679,18 +1905,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "log 0.4.14", ] [[package]] -name = "tokio-native-tls" -version = "0.3.0" +name = "tokio-macros" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110" dependencies = [ - "native-tls", - "tokio", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1700,18 +1927,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6af16bfac7e112bea8b0442542161bfc41cbfa4466b580bdda7d18cb88b911ce" dependencies = [ "crossbeam-utils", - "futures", + "futures 0.1.29", "lazy_static", "log 0.4.14", "mio 0.6.23", "num_cpus", - "parking_lot", + "parking_lot 0.7.1", "slab", "tokio-executor", "tokio-io", "tokio-sync", ] +[[package]] +name = "tokio-rustls" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + +[[package]] +name = "tokio-stream" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-sync" version = "0.1.6" @@ -1719,7 +1968,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2162248ff317e2bc713b261f242b69dbb838b85248ed20bb21df56d60ea4cae7" dependencies = [ "fnv", - "futures", + "futures 0.1.29", ] [[package]] @@ -1729,7 +1978,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "iovec", "mio 0.6.23", "tokio-io", @@ -1742,11 +1991,28 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "354b8cd83825b3c20217a9dc174d6a0c67441a2fae5c41bcb1ea6679f6ae0f7c" dependencies = [ - "futures", + "futures 0.1.29", "native-tls", "tokio-io", ] +[[package]] +name = "tokio-tungstenite" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "511de3f85caf1c98983545490c3d09685fa8eb634e57eec22bb4db271f46cbd8" +dependencies = [ + "futures-util", + "log 0.4.14", + "pin-project", + "rustls", + "tokio", + "tokio-rustls", + "tungstenite", + "webpki", + "webpki-roots", +] + [[package]] name = "tokio-util" version = "0.6.7" @@ -1810,9 +2076,9 @@ checksum = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" [[package]] name = "tungstenite" -version = "0.15.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "983d40747bce878d2fb67d910dcb8bd3eca2b2358540c3cc1b98c027407a3ae3" +checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5" dependencies = [ "base64 0.13.0", "byteorder", @@ -1820,12 +2086,14 @@ dependencies = [ "http", "httparse", "log 0.4.14", - "native-tls", "rand 0.8.4", + "rustls", + "rustls-native-certs", "sha-1 0.9.7", "thiserror", "url 2.2.2", "utf-8", + "webpki", ] [[package]] @@ -1873,7 +2141,7 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "141339a08b982d942be2ca06ff8b076563cbe223d1befd5450716790d44e2426" dependencies = [ - "smallvec", + "smallvec 0.6.10", ] [[package]] @@ -1888,6 +2156,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "1.7.2" @@ -2041,6 +2315,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" +dependencies = [ + "webpki", +] + [[package]] name = "websocket" version = "0.26.2" @@ -2048,7 +2341,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "723abe6b75286edc51d8ecabb38a2353f62a9e9b0588998b59111474f1dcd637" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.29", "hyper 0.10.16", "native-tls", "rand 0.6.5", @@ -2072,7 +2365,7 @@ dependencies = [ "bitflags", "byteorder", "bytes 0.4.12", - "futures", + "futures 0.1.29", "native-tls", "rand 0.6.5", "sha-1 0.8.2", @@ -2110,6 +2403,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi 0.3.8", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 8893d06..d6d169d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,10 +10,11 @@ build = "build.rs" [dependencies] #elefren = { version = "0.22.0", features = ["toml"] } -#elefren = { path = "../elefren22-fork", features = ["toml"] } -elefren = { git = "https://git.ondrovo.com/MightyPork/elefren-fork.git", features = ["toml"] } +elefren = { path = "../elefren22-fork" } +#elefren = { git = "https://git.ondrovo.com/MightyPork/elefren-fork.git", features = ["toml"] } + +env_logger = "0.9.0" -simple-logging = "2.0.2" #elefren = { path = "../elefren-fork" } log = "0.4.14" serde = "1" @@ -21,6 +22,10 @@ serde_json = "1" smart-default = "0.6.0" anyhow = "1" clap = "2.33.0" +tokio = { version = "1", features = ["full"] } +tokio-stream = "0.1.7" +thiserror = "1.0.26" +futures = "0.3" native-tls = "0.2.8" websocket = "0.26.2" diff --git a/src/group_handle.rs b/src/group_handle.rs new file mode 100644 index 0000000..6029187 --- /dev/null +++ b/src/group_handle.rs @@ -0,0 +1,209 @@ +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use elefren::{FediClient, StatusBuilder}; +use elefren::debug::EventDisplay; +use elefren::debug::NotificationDisplay; +use elefren::entities::event::Event; +use elefren::entities::notification::{Notification, NotificationType}; +use elefren::status_builder::Visibility; +use futures::StreamExt; + +use crate::store::{ConfigStore, GroupError}; +use crate::store::data::GroupConfig; +use crate::utils::LogError; + +/// This is one group's config store capable of persistence +#[derive(Debug)] +pub struct GroupHandle { + pub(crate) client: FediClient, + pub(crate) config: GroupConfig, + pub(crate) store: Arc, +} + +impl GroupHandle { + pub async fn save(&mut self) -> Result<(), GroupError> { + debug!("Saving group config & status"); + self.store.set_group_config(self.config.clone()).await?; + self.config.clear_dirty_status(); + Ok(()) + } + + pub async fn save_if_needed(&mut self) -> Result<(), GroupError> { + if self.config.is_dirty() { + self.save().await?; + } + Ok(()) + } + + pub async fn reload(&mut self) -> Result<(), GroupError> { + if let Some(g) = self.store.get_group_config(self.config.get_acct()).await { + self.config = g; + Ok(()) + } else { + Err(GroupError::GroupNotExist) + } + } +} + +trait NotifTimestamp { + fn timestamp_millis(&self) -> u64; +} + +impl NotifTimestamp for Notification { + fn timestamp_millis(&self) -> u64 { + self.created_at.timestamp_millis().max(0) as u64 + } +} + +impl GroupHandle { + pub async fn run(&mut self) -> Result<(), GroupError> { + const PERIODIC_SAVE: Duration = Duration::from_secs(60); + const PING_INTERVAL: Duration = Duration::from_secs(15); + + assert!(PERIODIC_SAVE >= PING_INTERVAL); + + let mut next_save = Instant::now() + PERIODIC_SAVE; // so we save at start + + loop { + debug!("Opening streaming API socket"); + let mut events = self.client.streaming_user().await?; + + match self.catch_up_with_missed_notifications().await { + Ok(true) => { + debug!("Some missed notifs handled"); + // Save asap! + next_save = Instant::now() - PERIODIC_SAVE + } + Ok(false) => { + debug!("No notifs missed"); + } + Err(e) => { + error!("Failed to handle missed notifs: {}", e); + } + } + + loop { + if next_save < Instant::now() { + self.save_if_needed().await + .log_error("Failed to save group"); + next_save = Instant::now() + PERIODIC_SAVE; + } + + let timeout = next_save.saturating_duration_since(Instant::now()) + .min(PING_INTERVAL) + .max(Duration::from_secs(1)); + + match tokio::time::timeout(timeout, events.next()).await { + Ok(Some(event)) => { + debug!("(@{}) Event: {}", self.config.get_acct(), EventDisplay(&event)); + match event { + Event::Update(_status) => {} + Event::Notification(n) => { + self.handle_notification(n).await; + } + Event::Delete(_id) => {} + Event::FiltersChanged => {} + } + } + Ok(None) => { + warn!("Group @{} socket closed, restarting...", self.config.get_acct()); + break; + } + Err(_) => { + // Timeout so we can save if needed + } + } + + trace!("Pinging"); + events.send_ping().await.log_error("Fail to send ping"); + } + + warn!("Notif stream closed, will reopen"); + tokio::time::sleep(Duration::from_millis(1000)).await; + } + } + + async fn handle_notification(&mut self, n: Notification) { + debug!("Handling notif #{}", n.id); + let ts = n.timestamp_millis(); + self.config.set_last_notif(ts); + + match n.notification_type { + NotificationType::Mention => { + if let Some(status) = n.status { + if status.content.contains("/gi") || status.content.contains("\\gi") { + info!("Mention ignored by gi"); + } else if status.content.contains("/gb") || status.content.contains("\\gb") { + if let Some(id) = status.in_reply_to_id { + info!("Boosting prev post by GB"); + tokio::time::sleep(Duration::from_millis(500)).await; +// self.client.reblog(&id).await.log_error("Failed to boost"); + } + } else { + info!("Boosting mention"); + tokio::time::sleep(Duration::from_millis(500)).await; +// self.client.reblog(&status.id).await.log_error("Failed to boost"); + } + } + } + NotificationType::Follow => { + info!("New follower!"); + tokio::time::sleep(Duration::from_millis(500)).await; + + /* + let post = StatusBuilder::new() + .status(format!("@{} welcome to the group!", &n.account.acct)) + .content_type("text/markdown") + .visibility(Visibility::Unlisted) + .build().expect("error build status"); + + let _ = self.client.new_status(post).await.log_error("Failed to post"); + */ + } + _ => {} + } + } + + /// Catch up with missed notifications, returns true if any were handled + async fn catch_up_with_missed_notifications(&mut self) -> Result { + const MAX_CATCHUP_NOTIFS: usize = 25; + let last_notif = self.config.get_last_notif(); + + let notifications = self.client.notifications().await?; + let mut iter = notifications.items_iter(); + + let mut notifs_to_handle = vec![]; + + // They are retrieved newest first, but we want oldest first for chronological handling + + let mut num = 0; + while let Some(n) = iter.next_item().await { + let ts = n.timestamp_millis(); + if ts <= last_notif { + break; // reached our last seen notif + } + notifs_to_handle.push(n); + num += 1; + if num > MAX_CATCHUP_NOTIFS { + warn!("Too many notifs missed to catch up!"); + break; + } + } + + if notifs_to_handle.is_empty() { + return Ok(false); + } + + notifs_to_handle.reverse(); + + debug!("{} notifications to catch up!", notifs_to_handle.len()); + + for n in notifs_to_handle { + debug!("Handling missed notification: {}", NotificationDisplay(&n)); + self.handle_notification(n).await; + } + + return Ok(true); + } +} diff --git a/src/main.rs b/src/main.rs index f005ae0..00fe785 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,33 +1,97 @@ +#![deny(unused_must_use)] + #[macro_use] extern crate log; #[macro_use] extern crate smart_default; #[macro_use] extern crate serde; +#[macro_use] +extern crate thiserror; extern crate elefren; -use elefren::prelude::*; -use elefren::helpers::{cli, toml}; use elefren::entities::event::Event; use log::LevelFilter; use elefren::entities::notification::NotificationType; use elefren::entities::account::Account; use elefren::status_builder::Visibility; +use elefren::{Registration, Scopes, StatusBuilder, FediClient}; +use tokio_stream::{Stream, StreamExt}; +use elefren::scopes; +use crate::store::{NewGroupOptions, StoreOptions}; +use elefren::debug::NotificationDisplay; + +mod store; +mod group_handle; +mod utils; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::Builder::new() + .filter_level(LevelFilter::Debug) + .write_style(env_logger::WriteStyle::Always) + .filter_module("rustls", LevelFilter::Warn) + .filter_module("reqwest", LevelFilter::Warn) + .init(); + + let store = store::ConfigStore::new(StoreOptions { + store_path: "groups.json".to_string(), + save_pretty: true + }).await?; + + /* + let mut new_group = store.auth_new_group(NewGroupOptions { + server: "https://botsin.space".to_string(), + acct: "betty@botsin.space".to_string() + }).await?; + // */ -fn main() -> anyhow::Result<()> { - simple_logging::log_to_stderr(LevelFilter::Trace); + let mut groups = store.spawn_groups().await; + //groups.push(new_group); + + +/* + let mut betty = groups.remove(0); + + let notifications = betty.client.notifications().await?; + let mut iter = notifications.items_iter(); + + let mut num = 0; + while let Some(n) = iter.next_item().await { + debug!("A notification: {}", NotificationDisplay(&n)); + num += 1; + if num > 10 { + break; + } + } - let client = if let Ok(data) = toml::from_file("group-actor-data.toml") { - Mastodon::from(data) + return Ok(()); + */ + + let mut handles = vec![]; + + for mut g in groups { + handles.push(tokio::spawn(async move { + g.run().await + })); + } + + futures::future::join_all(handles).await; + + /* + + let client = if let Ok(data) = elefren::helpers::toml::from_file("group-actor-data.toml") { + FediClient::from(data) } else { - register()? + register().await? }; - let you = client.verify_credentials()?; - + let you = client.verify_credentials().await?; println!("{:#?}", you); - for event in client.streaming_user()? { + let mut events = client.streaming_user().await?; + + while let Some(event) = events.next().await { match event { Event::Update(status) => { info!("Status: {:?}", status); @@ -36,6 +100,7 @@ fn main() -> anyhow::Result<()> { info!("Notification: {:?}", notification.notification_type); debug!("{:?}", notification); + /* match notification.notification_type { NotificationType::Mention => { if let Some(status) = notification.status { @@ -72,10 +137,11 @@ fn main() -> anyhow::Result<()> { .visibility(Visibility::Unlisted) .build().expect("error build status"); - let _ = client.new_status(post); + let _ = client.new_status(post).await; } _ => {} } + */ }, Event::Delete(id) => { info!("Delete: {}", id); @@ -87,24 +153,29 @@ fn main() -> anyhow::Result<()> { } } + */ + println!("Main loop ended!"); Ok(()) } -fn register() -> anyhow::Result { +/* +async fn register() -> anyhow::Result { let registration = Registration::new("https://piggo.space") .client_name("group-actor") - .scopes(Scopes::all()) - .build()?; - let client = cli::authenticate(registration)?; - toml::to_file(&*client, "group-actor-data.toml")?; + .force_login(true) + .scopes( + Scopes::read(scopes::Read::Accounts) + | Scopes::read(scopes::Read::Notifications) + | Scopes::read(scopes::Read::Statuses) + | Scopes::read(scopes::Read::Follows) + | Scopes::write(scopes::Write::Statuses) + | Scopes::write(scopes::Write::Media) + ) + .build().await?; + let client = elefren::helpers::cli::authenticate(registration).await?; + elefren::helpers::toml::to_file(&*client, "group-actor-data.toml")?; Ok(client) } - -fn make_mention(user : &Account) -> String { - format!(r#"@{handle}"#, - id=user.id, - url=user.url, handle=user.acct - ) -} +*/ diff --git a/src/store/data.rs b/src/store/data.rs new file mode 100644 index 0000000..e8eaf55 --- /dev/null +++ b/src/store/data.rs @@ -0,0 +1,333 @@ +use std::collections::{HashMap, HashSet}; + +use crate::store; +use crate::store::GroupError; +use elefren::AppData; + +/// This is the inner data struct holding the config +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub(crate) struct Config { + groups: HashMap, +} + +impl Config { + pub(crate) fn iter_groups(&self) -> impl Iterator{ + self.groups.values() + } + + pub(crate) fn get_group_config(&self, acct : &str) -> Option<&GroupConfig> { + self.groups.get(acct) + } + + pub(crate) fn set_group_config(&mut self, grp : GroupConfig) { + self.groups.insert(grp.acct.clone(), grp); + } + +} + +/// This is the inner data struct holding a group's config +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub(crate) struct GroupConfig { + enabled: bool, + /// Group actor's acct + acct: String, + /// elefren data + appdata : AppData, + /// List of admin account "acct" names, e.g. piggo@piggo.space + admin_users: HashSet, + /// List of users allowed to post to the group, if it is member-only + member_users: HashSet, + /// List of users banned from posting to the group + banned_users: HashSet, + /// True if only members should be allowed to write + member_only: bool, + /// Banned domain names, e.g. kiwifarms.cc + banned_servers: HashSet, + /// Last seen notification timestamp + last_notif_ts: u64, + + #[serde(skip)] + dirty: bool, +} + +impl Default for GroupConfig { + fn default() -> Self { + Self { + enabled: true, + acct: "".to_string(), + appdata: AppData { + base: Default::default(), + client_id: Default::default(), + client_secret: Default::default(), + redirect: Default::default(), + token: Default::default() + }, + admin_users: Default::default(), + member_users: Default::default(), + banned_users: Default::default(), + member_only: false, + banned_servers: Default::default(), + last_notif_ts: 0, + dirty: false, + } + } +} + +impl GroupConfig { + pub(crate) fn new(acct : String, appdata: AppData) -> Self { + Self { + acct, + appdata, + ..Default::default() + } + } + + pub(crate) fn is_enabled(&self) -> bool { + self.enabled + } + + pub(crate) fn set_enabled(&mut self, ena: bool){ + self.enabled = ena; + self.mark_dirty(); + } + + pub(crate) fn get_appdata(&self) -> &AppData { + &self.appdata + } + + pub(crate) fn set_appdata(&mut self, appdata: AppData) { + self.appdata = appdata; + self.mark_dirty(); + } + + pub(crate) fn set_last_notif(&mut self, ts: u64) { + self.last_notif_ts = self.last_notif_ts.max(ts); + self.mark_dirty(); + } + + pub(crate) fn get_last_notif(&self) -> u64 { + self.last_notif_ts + } + + pub(crate) fn get_acct(&self) -> &str { + &self.acct + } + + pub(crate) fn is_admin(&self, acct: &str) -> bool { + self.admin_users.contains(acct) + } + + pub(crate) fn is_member(&self, acct: &str) -> bool { + self.member_users.contains(acct) + } + + pub(crate) fn is_banned(&self, acct: &str) -> bool { + self.banned_users.contains(acct) + || self.is_users_server_banned(acct) + } + + /// Check if the user's server is banned + fn is_users_server_banned(&self, acct: &str) -> bool { + let server = acct_to_server(acct); + self.banned_servers.contains(server) + } + + pub(crate) fn can_write(&self, acct: &str) -> bool { + if self.is_admin(acct) { + true + } else { + !self.is_banned(acct) && ( + !self.is_member_only() + || self.is_member(acct) + ) + } + } + + pub(crate) fn set_admin(&mut self, acct: &str, admin: bool) -> Result<(), GroupError> { + if admin { + if self.is_banned(acct) { + return Err(GroupError::UserIsBanned); + } + self.admin_users.insert(acct.to_owned()); + } else { + self.admin_users.remove(acct); + } + self.mark_dirty(); + Ok(()) + } + + pub(crate) fn set_member(&mut self, acct: &str, member: bool) -> Result<(), GroupError> { + if member { + if self.is_banned(acct) { + return Err(GroupError::UserIsBanned); + } + self.member_users.insert(acct.to_owned()); + } else { + self.member_users.remove(acct); + } + self.mark_dirty(); + Ok(()) + } + + pub(crate) fn ban_user(&mut self, acct: &str, ban: bool) -> Result<(), GroupError> { + if ban { + if self.is_admin(acct) { + return Err(GroupError::UserIsAdmin); + } + self.banned_users.insert(acct.to_owned()); + } else { + self.banned_users.remove(acct); + } + Ok(()) + } + + pub(crate) fn ban_server(&mut self, server: &str, ban: bool) -> Result<(), GroupError> { + if ban { + for acct in &self.admin_users { + let acct_server = acct_to_server(acct); + if acct_server == server { + return Err(GroupError::AdminsOnServer); + } + } + self.banned_servers.insert(server.to_owned()); + } else { + self.banned_servers.remove(server); + } + self.mark_dirty(); + Ok(()) + } + + pub(crate) fn set_member_only(&mut self, member_only: bool) { + self.member_only = member_only; + self.mark_dirty(); + } + + pub(crate) fn is_member_only(&self) -> bool { + self.member_only + } + + pub(crate) fn mark_dirty(&mut self) { + self.dirty = true; + } + + pub(crate) fn is_dirty(&self) -> bool { + self.dirty + } + + pub(crate) fn clear_dirty_status(&mut self) { + self.dirty = false; + } +} + +fn acct_to_server(acct: &str) -> &str { + acct.split('@').nth(1) + .unwrap_or_default() +} + +#[cfg(test)] +mod tests { + use crate::store::{GroupError}; + use crate::store::data::{GroupConfig, acct_to_server}; + + #[test] + fn test_acct_to_server() { + assert_eq!("pikachu.rocks", acct_to_server("raichu@pikachu.rocks")); + assert_eq!("pikachu.rocks", acct_to_server("m@pikachu.rocks")); + assert_eq!("", acct_to_server("what")); + } + + #[test] + fn test_default_rules() { + let mut group = GroupConfig::default(); + assert!(!group.is_member_only()); + assert!(!group.is_member("piggo@piggo.space")); + assert!(!group.is_admin("piggo@piggo.space")); + assert!(group.can_write("piggo@piggo.space"), "anyone can post by default"); + } + + #[test] + fn test_member_only() { + let mut group = GroupConfig::default(); + assert!(group.can_write("piggo@piggo.space"), "rando can write in public group"); + + group.set_member_only(true); + assert!(!group.can_write("piggo@piggo.space"), "rando can't write in member-only group"); + + // Admin in member only + group.set_admin("piggo@piggo.space", true).unwrap(); + assert!(group.can_write("piggo@piggo.space"), "admin non-member can write in member-only group"); + group.set_admin("piggo@piggo.space", false).unwrap(); + assert!(!group.can_write("piggo@piggo.space"), "removed admin removes privileged write access"); + + // Member in member only + group.set_member("piggo@piggo.space", true).unwrap(); + assert!(group.can_write("piggo@piggo.space"), "member can post in member-only group"); + group.set_admin("piggo@piggo.space", true).unwrap(); + assert!(group.can_write("piggo@piggo.space"), "member+admin can post in member-only group"); + } + + #[test] + fn test_banned_users() { + // Banning single user + let mut group = GroupConfig::default(); + group.ban_user("piggo@piggo.space", true).unwrap(); + assert!(!group.can_write("piggo@piggo.space"), "banned user can't post"); + group.ban_user("piggo@piggo.space", false).unwrap(); + assert!(group.can_write("piggo@piggo.space"), "un-ban works"); + } + + #[test] + fn test_banned_members() { + // Banning single user + let mut group = GroupConfig::default(); + group.set_member_only(true); + + group.set_member("piggo@piggo.space", true).unwrap(); + assert!(group.can_write("piggo@piggo.space"), "member can write"); + assert!(group.is_member("piggo@piggo.space"), "member is member"); + assert!(!group.is_banned("piggo@piggo.space"), "user not banned by default"); + + group.ban_user("piggo@piggo.space", true).unwrap(); + assert!(group.is_member("piggo@piggo.space"), "still member even if banned"); + assert!(group.is_banned("piggo@piggo.space"), "banned user is banned"); + + assert!(!group.can_write("piggo@piggo.space"), "banned member can't post"); + + // unban + group.ban_user("piggo@piggo.space", false).unwrap(); + assert!(group.can_write("piggo@piggo.space"), "un-ban works"); + } + + #[test] + fn test_server_ban() { + let mut group = GroupConfig::default(); + assert!(group.can_write("hitler@nazi.camp"), "randos can write"); + + group.ban_server("nazi.camp", true).unwrap(); + assert!(!group.can_write("hitler@nazi.camp"), "users from banned server can't write"); + assert!(!group.can_write("1488@nazi.camp"), "users from banned server can't write"); + assert!(group.can_write("troll@freezepeach.xyz"), "other users can still write"); + + group.ban_server("nazi.camp", false).unwrap(); + assert!(group.can_write("hitler@nazi.camp"), "server unban works"); + } + + #[test] + fn test_sanity() { + let mut group = GroupConfig::default(); + + group.set_admin("piggo@piggo.space", true).unwrap(); + assert_eq!(Err(GroupError::UserIsAdmin), group.ban_user("piggo@piggo.space", true), "can't bad admin users"); + group.ban_user("piggo@piggo.space", false).expect("can unbad admin"); + + group.ban_user("hitler@nazi.camp", true).unwrap(); + assert_eq!(Err(GroupError::UserIsBanned), group.set_admin("hitler@nazi.camp", true), "can't make banned users admins"); + + group.ban_server("freespeechextremist.com", true).unwrap(); + assert_eq!(Err(GroupError::UserIsBanned), group.set_admin("nibber@freespeechextremist.com", true), "can't make server-banned users admins"); + + assert!(group.is_admin("piggo@piggo.space")); + assert_eq!(Err(GroupError::AdminsOnServer), group.ban_server("piggo.space", true), "can't bad server with admins"); + } +} diff --git a/src/store/mod.rs b/src/store/mod.rs new file mode 100644 index 0000000..c87edc5 --- /dev/null +++ b/src/store/mod.rs @@ -0,0 +1,225 @@ +use std::collections::{HashMap, HashSet}; +use std::hash::{Hash, Hasher}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use elefren::{FediClient, Registration, Scopes, scopes}; +use elefren::entities::event::Event; +use futures::StreamExt; +use tokio::sync::RwLock; + +use data::{Config, GroupConfig}; + +use crate::group_handle::GroupHandle; + +pub(crate) mod data; + +#[derive(Debug, Default)] +pub struct ConfigStore { + store_path: PathBuf, + save_pretty: bool, + data: tokio::sync::RwLock, +} + +#[derive(Debug)] +pub struct NewGroupOptions { + pub server: String, + pub acct: String, +} + +#[derive(Debug)] +pub struct StoreOptions { + pub store_path: String, + pub save_pretty: bool, +} + +impl ConfigStore { + /// Create a new instance of the store. + /// If a path is given, it will try to load the content from a file. + pub async fn new(options: StoreOptions) -> Result, GroupError> { + let path: &Path = options.store_path.as_ref(); + + let config = if path.is_file() { + let f = tokio::fs::read(path).await?; + serde_json::from_slice(&f)? + } else { + let empty = Config::default(); + tokio::fs::write(path, serde_json::to_string(&empty)?.as_bytes()).await?; + empty + }; + + Ok(Arc::new(Self { + store_path: path.to_owned(), + save_pretty: options.save_pretty, + data: RwLock::new(config), + })) + } + + /// Spawn a new group + pub async fn auth_new_group(self: &Arc, opts: NewGroupOptions) -> Result { + let registration = Registration::new(&opts.server) + .client_name("group-actor") + .force_login(true) + .scopes(make_scopes()) + .build().await?; + + println!("--- Authenticating NEW bot user @{} ---", opts.acct); + let client = elefren::helpers::cli::authenticate(registration).await?; + let appdata = client.data.clone(); + + let data = GroupConfig::new(opts.acct, appdata); + + // save & persist + self.set_group_config(data.clone()).await?; + + Ok(GroupHandle { + client, + config: data, + store: self.clone(), + }) + } + + /// Re-auth an existing group + pub async fn reauth_group(self: &Arc, acct: &str) -> Result { + let groups = self.data.read().await; + let mut config = groups.get_group_config(acct).ok_or(GroupError::GroupNotExist)?.clone(); + + println!("--- Re-authenticating bot user @{} ---", acct); + let registration = Registration::new(config.get_appdata().base.to_string()) + .client_name("group-actor") + .force_login(true) + .scopes(make_scopes()) + .build().await?; + + let client = elefren::helpers::cli::authenticate(registration).await?; + let appdata = client.data.clone(); + + config.set_appdata(appdata); + self.set_group_config(config.clone()).await?; + + Ok(GroupHandle { + client, + config, + store: self.clone(), + }) + } + + /// Spawn existing group using saved creds + pub async fn spawn_groups(self: Arc) -> Vec { + let groups = self.data.read().await; + let groups_iter = groups.iter_groups().cloned(); + + // Connect in parallel + futures::stream::iter(groups_iter).map(|gc| async { + if !gc.is_enabled() { + debug!("Group @{} is DISABLED", gc.get_acct()); + return None; + } + + debug!("Connecting to @{}", gc.get_acct()); + + let client = FediClient::from(gc.get_appdata().clone()); + + match client.verify_credentials().await { + Ok(account) => { + info!("Group account verified: @{}, {}", account.acct, account.display_name); + } + Err(e) => { + error!("Group @{} auth error: {}", gc.get_acct(), e); + return None; + } + }; + + Some(GroupHandle { + client, + config: gc, + store: self.clone(), + }) + }).buffer_unordered(8).collect::>().await + .into_iter().flatten().collect() + } + + pub(crate) async fn get_group_config(&self, group: &str) -> Option { + let c = self.data.read().await; + c.get_group_config(group).map(|inner| { + inner.clone() + }) + } + + //noinspection RsSelfConvention + /// Set group config to the store. The store then saved. + pub(crate) async fn set_group_config<'a>(&'a self, config: GroupConfig) -> Result<(), GroupError> { + let mut data = self.data.write().await; + data.set_group_config(config); + self.persist(&data).await?; + Ok(()) + } + + /// Persist the store + async fn persist(&self, data: &Config) -> Result<(), GroupError> { + tokio::fs::write(&self.store_path, + if self.save_pretty { + serde_json::to_string_pretty(&data) + } else { + serde_json::to_string(&data) + }?.as_bytes()) + .await?; + Ok(()) + } +} + +#[derive(Debug, Error)] +pub enum GroupError { + #[error("Operation refused because the user is admin")] + UserIsAdmin, + #[error("Operation refused because the user is banned")] + UserIsBanned, + #[error("Server could not be banned because there are admin users on it")] + AdminsOnServer, + #[error("Group config is missing in the config store")] + GroupNotExist, + #[error(transparent)] + IoError(#[from] std::io::Error), + #[error(transparent)] + Serializer(#[from] serde_json::Error), + #[error(transparent)] + Elefren(#[from] elefren::Error), +} + +// this is for tests +impl PartialEq for GroupError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::UserIsAdmin, Self::UserIsAdmin) => true, + (Self::UserIsBanned, Self::UserIsBanned) => true, + (Self::AdminsOnServer, Self::AdminsOnServer) => true, + (Self::GroupNotExist, Self::GroupNotExist) => true, + _ => false, + } + } +} + +fn make_scopes() -> Scopes { + Scopes::read(scopes::Read::Accounts) + | Scopes::read(scopes::Read::Notifications) + | Scopes::read(scopes::Read::Statuses) + | Scopes::read(scopes::Read::Follows) + | Scopes::write(scopes::Write::Statuses) + | Scopes::write(scopes::Write::Media) +} + +// trait TapOk { +// fn tap_ok(self, f: F) -> Self; +// } +// +// impl TapOk for Result { +// fn tap_ok(self, f: F) -> Self { +// match self { +// Ok(v) => { +// f(&v); +// Ok(v) +// } +// Err(e) => Err(e) +// } +// } +// } diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..a3fbf25 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,16 @@ +use std::error::Error; + +pub trait LogError { + fn log_error>(self, msg: S); +} + +impl LogError for Result { + fn log_error>(self, msg: S) { + match self { + Ok(_) => {} + Err(e) => { + error!("{}: {}", msg.as_ref(), e); + } + } + } +}