From 5d7cbec34cd5e473d5768b39054d99bc72ddad62 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Thu, 27 Apr 2023 12:55:30 +1000 Subject: [PATCH] Move DB interactions to a background thread --- src/database/CMakeLists.txt | 2 +- src/database/database.cpp | 128 +++++++++++++++++-------- src/database/db_task.cpp | 91 ++++++++++++++++++ src/database/env_esp.cpp | 39 ++------ src/database/include/database.hpp | 77 +++++++++++---- src/database/include/db_task.hpp | 25 +++++ src/database/include/env_esp.hpp | 22 +---- src/database/include/file_gatherer.hpp | 2 +- src/database/include/tag_processor.hpp | 2 +- src/database/tag_processor.cpp | 4 +- src/main/app_console.cpp | 41 ++++---- src/main/app_console.hpp | 5 +- src/main/main.cpp | 41 ++------ 13 files changed, 308 insertions(+), 171 deletions(-) create mode 100644 src/database/db_task.cpp create mode 100644 src/database/include/db_task.hpp diff --git a/src/database/CMakeLists.txt b/src/database/CMakeLists.txt index 3f9ca6fb..0bc6f6b9 100644 --- a/src/database/CMakeLists.txt +++ b/src/database/CMakeLists.txt @@ -1,5 +1,5 @@ idf_component_register( - SRCS "env_esp.cpp" "database.cpp" "tag_processor.cpp" + SRCS "env_esp.cpp" "database.cpp" "tag_processor.cpp" "db_task.cpp" INCLUDE_DIRS "include" REQUIRES "result" "span" "esp_psram" "fatfs") diff --git a/src/database/database.cpp b/src/database/database.cpp index b677f4ba..8ca72771 100644 --- a/src/database/database.cpp +++ b/src/database/database.cpp @@ -4,75 +4,123 @@ #include "ff.h" #include "leveldb/cache.h" +#include "db_task.hpp" +#include "env_esp.hpp" #include "file_gatherer.hpp" #include "leveldb/iterator.h" +#include "leveldb/options.h" #include "leveldb/slice.h" +#include "result.hpp" #include "tag_processor.hpp" -#include "env_esp.hpp" -#include "leveldb/options.h" namespace database { static SingletonEnv sEnv; -static const char *kTag = "DB"; +static const char* kTag = "DB"; + +static std::atomic sIsDbOpen(false); auto Database::Open() -> cpp::result { - leveldb::DB* db; - leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024); - leveldb::Options options; - options.env = sEnv.env(); - options.create_if_missing = true; - options.write_buffer_size = 48 * 1024; - options.max_file_size = 32; - options.block_cache = cache; - options.block_size = 512; - - auto status = leveldb::DB::Open(options, "/.db", &db); - if (!status.ok()) { - delete cache; - ESP_LOGE(kTag, "failed to open db, status %s", status.ToString().c_str()); - return cpp::fail(FAILED_TO_OPEN); + // TODO(jacqueline): Why isn't compare_and_exchange_* available? + if (sIsDbOpen.exchange(true)) { + return cpp::fail(DatabaseError::ALREADY_OPEN); } - return new Database(db, cache); + if (!StartDbTask()) { + return cpp::fail(DatabaseError::ALREADY_OPEN); + } + + return RunOnDbTask>( + []() -> cpp::result { + leveldb::DB* db; + leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024); + leveldb::Options options; + options.env = sEnv.env(); + options.create_if_missing = true; + options.write_buffer_size = 48 * 1024; + options.max_file_size = 32; + options.block_cache = cache; + options.block_size = 512; + + auto status = leveldb::DB::Open(options, "/.db", &db); + if (!status.ok()) { + delete cache; + ESP_LOGE(kTag, "failed to open db, status %s", + status.ToString().c_str()); + return cpp::fail(FAILED_TO_OPEN); + } + + ESP_LOGI(kTag, "Database opened successfully"); + return new Database(db, cache); + }) + .get(); } Database::Database(leveldb::DB* db, leveldb::Cache* cache) : db_(db), cache_(cache) {} -Database::~Database() {} +Database::~Database() { + QuitDbTask(); + sIsDbOpen.store(false); +} -auto Database::Initialise() -> void { - leveldb::WriteOptions opt; - opt.sync = true; - FindFiles("", [&](const std::string &path) { +template +auto IterateAndParse(leveldb::Iterator* it, std::size_t limit, Parser p) + -> void { + for (int i = 0; i < limit; i++) { + if (!it->Valid()) { + break; + } + std::invoke(p, it->key(), it->value()); + it->Next(); + } +} + +auto Database::Populate() -> std::future { + return RunOnDbTask([&]() -> void { + leveldb::WriteOptions opt; + opt.sync = true; + FindFiles("", [&](const std::string& path) { ESP_LOGI(kTag, "considering %s", path.c_str()); FileInfo info; if (GetInfo(path, &info)) { ESP_LOGI(kTag, "added as '%s'", info.title.c_str()); db_->Put(opt, "title:" + info.title, path); } + }); + db_->Put(opt, "title:coolkeywithoutval", leveldb::Slice()); }); - db_->Put(opt, "title:coolkeywithoutval", leveldb::Slice()); } -auto Database::ByTitle() -> Iterator { - leveldb::Iterator *it = db_->NewIterator(leveldb::ReadOptions()); - it->Seek("title:"); - while (it->Valid()) { - ESP_LOGI(kTag, "%s : %s", it->key().ToString().c_str(), it->value().ToString().c_str()); - it->Next(); - } - return Iterator(it); +auto Database::GetSongs(std::size_t page_size) -> std::future> { + return RunOnDbTask>([&]() -> DbResult { + std::unique_ptr it( + db_->NewIterator(leveldb::ReadOptions())); + it->Seek("title:"); + std::vector results; + IterateAndParse(it.get(), page_size, + [&](const leveldb::Slice& key, const leveldb::Slice& val) { + Song s; + s.title = key.ToString(); + results.push_back(s); + }); + return DbResult(results, std::move(it)); + }); } -auto Iterator::Next() -> std::optional { - if (!it_->Valid()) { - return {}; - } - std::string ret = it_->key().ToString(); - it_->Next(); - return ret; +auto Database::GetMoreSongs(std::size_t page_size, DbResult continuation) + -> std::future> { + return RunOnDbTask>([&]() -> DbResult { + std::unique_ptr it(continuation.it()); + std::vector results; + IterateAndParse(it.get(), page_size, + [&](const leveldb::Slice& key, const leveldb::Slice& val) { + Song s; + s.title = key.ToString(); + results.push_back(s); + }); + return DbResult(results, std::move(it)); + }); } } // namespace database diff --git a/src/database/db_task.cpp b/src/database/db_task.cpp new file mode 100644 index 00000000..ce1cd98a --- /dev/null +++ b/src/database/db_task.cpp @@ -0,0 +1,91 @@ +#include "db_task.hpp" + +#include + +#include "esp_heap_caps.h" +#include "freertos/FreeRTOS.h" +#include "freertos/portmacro.h" +#include "freertos/projdefs.h" +#include "freertos/queue.h" +#include "freertos/task.h" + +namespace database { + +static const std::size_t kDbStackSize = 256 * 1024; +static StaticTask_t sDbStaticTask; +static StackType_t* sDbStack = nullptr; + +static std::atomic sTaskRunning(false); +static QueueHandle_t sWorkQueue; + +struct WorkItem { + std::function* fn; + bool quit; +}; + +auto SendToDbTask(std::function fn) -> void { + WorkItem item{ + .fn = new std::function(fn), + .quit = false, + }; + xQueueSend(sWorkQueue, &item, portMAX_DELAY); +} + +template <> +auto RunOnDbTask(std::function fn) -> std::future { + std::shared_ptr> promise = + std::make_shared>(); + SendToDbTask([=]() { + std::invoke(fn); + promise->set_value(); + }); + return promise->get_future(); +} + +void DatabaseTaskMain(void* args) { + while (true) { + WorkItem item; + if (xQueueReceive(sWorkQueue, &item, portMAX_DELAY)) { + if (item.quit) { + break; + } + if (item.fn != nullptr) { + std::invoke(*item.fn); + delete item.fn; + } + } + } + vQueueDelete(sWorkQueue); + sTaskRunning.store(false); + vTaskDelete(NULL); +} + +auto StartDbTask() -> bool { + if (sTaskRunning.exchange(true)) { + return false; + } + if (sDbStack == nullptr) { + sDbStack = reinterpret_cast( + heap_caps_malloc(kDbStackSize, MALLOC_CAP_SPIRAM)); + } + sWorkQueue = xQueueCreate(8, sizeof(std::function*)); + xTaskCreateStatic(&DatabaseTaskMain, "DB", kDbStackSize, NULL, 1, sDbStack, + &sDbStaticTask); + return true; +} + +auto QuitDbTask() -> void { + if (!sTaskRunning.load()) { + return; + } + WorkItem item{ + .fn = nullptr, + .quit = true, + }; + xQueueSend(sWorkQueue, &item, portMAX_DELAY); + while (sTaskRunning.load()) { + vTaskDelay(pdMS_TO_TICKS(1)); + } +} + +} // namespace database diff --git a/src/database/env_esp.cpp b/src/database/env_esp.cpp index 363421a7..71d4fcea 100644 --- a/src/database/env_esp.cpp +++ b/src/database/env_esp.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -28,6 +29,8 @@ #include "leveldb/slice.h" #include "leveldb/status.h" +#include "db_task.hpp" + namespace leveldb { std::string ErrToStr(FRESULT err) { @@ -447,43 +450,13 @@ void EspEnv::SleepForMicroseconds(int micros) { vTaskDelay(pdMS_TO_TICKS(micros / 1000)); } -extern "C" void BackgroundThreadEntryPoint(EspEnv* env) { - env->BackgroundThreadMain(); -} - -EspEnv::EspEnv() - : background_work_mutex_(), - started_background_thread_(false), - background_work_queue_(xQueueCreate(4, sizeof(BackgroundWorkItem))) {} +EspEnv::EspEnv() {} void EspEnv::Schedule( void (*background_work_function)(void* background_work_arg), void* background_work_arg) { - background_work_mutex_.lock(); - - // Start the background thread, if we haven't done so already. - if (!started_background_thread_) { - started_background_thread_ = true; - xTaskCreate(reinterpret_cast(BackgroundThreadEntryPoint), - "LVL_ONEOFF", 16 * 1024, reinterpret_cast(this), 3, NULL); - } - - BackgroundWorkItem item(background_work_function, background_work_arg); - xQueueSend(background_work_queue_, &item, portMAX_DELAY); - - background_work_mutex_.unlock(); -} - -void EspEnv::BackgroundThreadMain() { - while (true) { - uint8_t buf[sizeof(BackgroundWorkItem)]; - if (xQueueReceive(background_work_queue_, &buf, portMAX_DELAY)) { - BackgroundWorkItem* item = reinterpret_cast(buf); - auto background_work_function = item->function; - void* background_work_arg = item->arg; - background_work_function(background_work_arg); - } - } + database::SendToDbTask( + [=]() { std::invoke(background_work_function, background_work_arg); }); } } // namespace leveldb diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp index cb1437df..bc102ca8 100644 --- a/src/database/include/database.hpp +++ b/src/database/include/database.hpp @@ -1,29 +1,85 @@ #pragma once -#include +#include +#include #include #include +#include +#include +#include #include "leveldb/cache.h" #include "leveldb/db.h" #include "leveldb/iterator.h" +#include "leveldb/slice.h" #include "result.hpp" namespace database { -class Iterator; +struct Artist { + std::string name; +}; + +struct Album { + std::string name; +}; + +typedef uint64_t SongId_t; + +struct Song { + std::string title; + uint64_t id; +}; + +struct SongMetadata {}; + +template +class DbResult { + public: + DbResult(const std::vector& values, std::unique_ptr it) + : values_(values), it_(std::move(it)) {} + auto values() -> std::vector { return values_; } + auto it() -> leveldb::Iterator* { return it_.release(); }; + + private: + std::vector values_; + std::unique_ptr it_; +}; class Database { public: enum DatabaseError { + ALREADY_OPEN, FAILED_TO_OPEN, }; static auto Open() -> cpp::result; ~Database(); - auto Initialise() -> void; - auto ByTitle() -> Iterator; + auto Populate() -> std::future; + + auto GetArtists(std::size_t page_size) -> std::future>; + auto GetMoreArtists(std::size_t page_size, DbResult continuation) + -> std::future>; + + auto GetAlbums(std::size_t page_size, std::optional artist) + -> std::future>; + auto GetMoreAlbums(std::size_t page_size, DbResult continuation) + -> std::future>; + + auto GetSongs(std::size_t page_size) -> std::future>; + auto GetSongs(std::size_t page_size, std::optional artist) + -> std::future>; + auto GetSongs(std::size_t page_size, + std::optional artist, + std::optional album) -> std::future>; + auto GetMoreSongs(std::size_t page_size, DbResult continuation) + -> std::future>; + + auto GetSongIds(std::optional artist, std::optional album) + -> std::future>; + auto GetSongFilePath(SongId_t id) -> std::future>; + auto GetSongMetadata(SongId_t id) -> std::future>; Database(const Database&) = delete; Database& operator=(const Database&) = delete; @@ -35,17 +91,4 @@ class Database { Database(leveldb::DB* db, leveldb::Cache* cache); }; -class Iterator { - public: - explicit Iterator(leveldb::Iterator *it) : it_(it) {} - - auto Next() -> std::optional; - - Iterator(const Iterator&) = delete; - Iterator& operator=(const Iterator&) = delete; - - private: - std::unique_ptr it_; -}; - } // namespace database diff --git a/src/database/include/db_task.hpp b/src/database/include/db_task.hpp new file mode 100644 index 00000000..39f932b0 --- /dev/null +++ b/src/database/include/db_task.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include + +namespace database { + +auto StartDbTask() -> bool; +auto QuitDbTask() -> void; + +auto SendToDbTask(std::function fn) -> void; + +template +auto RunOnDbTask(std::function fn) -> std::future { + std::shared_ptr> promise = + std::make_shared>(); + SendToDbTask([=]() { promise->set_value(std::invoke(fn)); }); + return promise->get_future(); +} + +template <> +auto RunOnDbTask(std::function fn) -> std::future; + +} // namespace database diff --git a/src/database/include/env_esp.hpp b/src/database/include/env_esp.hpp index b8819b05..cf5a20e1 100644 --- a/src/database/include/env_esp.hpp +++ b/src/database/include/env_esp.hpp @@ -2,9 +2,8 @@ #include #include +#include -#include "freertos/FreeRTOS.h" -#include "freertos/queue.h" #include "leveldb/env.h" #include "leveldb/status.h" @@ -89,25 +88,6 @@ class EspEnv : public leveldb::Env { void BackgroundThreadMain(); private: - // Stores the work item data in a Schedule() call. - // - // Instances are constructed on the thread calling Schedule() and used on the - // background thread. - // - // This structure is thread-safe beacuse it is immutable. - struct BackgroundWorkItem { - explicit BackgroundWorkItem(void (*f)(void*), void* a) - : function(f), arg(a) {} - - void (*const function)(void*); - void* const arg; - }; - - std::mutex background_work_mutex_; - bool started_background_thread_; - - QueueHandle_t background_work_queue_; - InMemoryLockTable locks_; // Thread-safe. }; diff --git a/src/database/include/file_gatherer.hpp b/src/database/include/file_gatherer.hpp index 7cf00b41..5df5a61b 100644 --- a/src/database/include/file_gatherer.hpp +++ b/src/database/include/file_gatherer.hpp @@ -45,7 +45,7 @@ auto FindFiles(const std::string& root, Callback cb) -> void { to_explore.push_back(full_path.str()); } else { // This is a file! Let the callback know about it. - //std::invoke(cb, full_path.str(), info); + // std::invoke(cb, full_path.str(), info); std::invoke(cb, full_path.str()); } } diff --git a/src/database/include/tag_processor.hpp b/src/database/include/tag_processor.hpp index 88c95b61..fb2201db 100644 --- a/src/database/include/tag_processor.hpp +++ b/src/database/include/tag_processor.hpp @@ -9,6 +9,6 @@ struct FileInfo { std::string title; }; -auto GetInfo(const std::string &path, FileInfo *out) -> bool; +auto GetInfo(const std::string& path, FileInfo* out) -> bool; } // namespace database diff --git a/src/database/tag_processor.cpp b/src/database/tag_processor.cpp index f2d520a4..16dbf160 100644 --- a/src/database/tag_processor.cpp +++ b/src/database/tag_processor.cpp @@ -2,7 +2,7 @@ namespace database { -auto GetInfo(const std::string &path, FileInfo *out) -> bool { +auto GetInfo(const std::string& path, FileInfo* out) -> bool { // TODO(jacqueline): bring in taglib for this if (path.ends_with(".mp3")) { out->is_playable = true; @@ -12,4 +12,4 @@ auto GetInfo(const std::string &path, FileInfo *out) -> bool { return false; } -} // namespace database +} // namespace database diff --git a/src/main/app_console.cpp b/src/main/app_console.cpp index 859700f4..fc2a4fe5 100644 --- a/src/main/app_console.cpp +++ b/src/main/app_console.cpp @@ -154,48 +154,49 @@ int CmdDbInit(int argc, char** argv) { return 1; } - sInstance->database_->Initialise(); + sInstance->database_->Populate().get(); return 0; } void RegisterDbInit() { - esp_console_cmd_t cmd{.command = "db_init", - .help = "scans for playable files and adds them to the database", - .hint = NULL, - .func = &CmdDbInit, - .argtable = NULL}; + esp_console_cmd_t cmd{ + .command = "db_init", + .help = "scans for playable files and adds them to the database", + .hint = NULL, + .func = &CmdDbInit, + .argtable = NULL}; esp_console_cmd_register(&cmd); } -int CmdDbTitles(int argc, char** argv) { - static const std::string usage = "usage: db_titles"; +int CmdDbSongs(int argc, char** argv) { + static const std::string usage = "usage: db_songs"; if (argc != 1) { std::cout << usage << std::endl; return 1; } - database::Iterator it = sInstance->database_->ByTitle(); - while (true) { - std::optional title = it.Next(); - if (!title) { - break; - } - std::cout << *title << std::endl; + database::DbResult res = + sInstance->database_->GetSongs(10).get(); + for (database::Song s : res.values()) { + std::cout << s.title << std::endl; } + return 0; } -void RegisterDbTitles() { - esp_console_cmd_t cmd{.command = "db_titles", +void RegisterDbSongs() { + esp_console_cmd_t cmd{.command = "db_songs", .help = "lists titles of ALL songs in the database", .hint = NULL, - .func = &CmdDbTitles, + .func = &CmdDbSongs, .argtable = NULL}; esp_console_cmd_register(&cmd); } -AppConsole::AppConsole(audio::AudioPlayback* playback, database::Database *database) : playback_(playback), database_(database) { +AppConsole::AppConsole(audio::AudioPlayback* playback, + database::Database* database) + : playback_(playback), database_(database) { sInstance = this; } AppConsole::~AppConsole() { @@ -209,7 +210,7 @@ auto AppConsole::RegisterExtraComponents() -> void { RegisterVolume(); RegisterAudioStatus(); RegisterDbInit(); - RegisterDbTitles(); + RegisterDbSongs(); } } // namespace console diff --git a/src/main/app_console.hpp b/src/main/app_console.hpp index fcefd4d4..3a11d70c 100644 --- a/src/main/app_console.hpp +++ b/src/main/app_console.hpp @@ -11,11 +11,12 @@ namespace console { class AppConsole : public Console { public: - explicit AppConsole(audio::AudioPlayback* playback, database::Database *database); + explicit AppConsole(audio::AudioPlayback* playback, + database::Database* database); virtual ~AppConsole(); audio::AudioPlayback* playback_; - database::Database *database_; + database::Database* database_; protected: virtual auto RegisterExtraComponents() -> void; diff --git a/src/main/main.cpp b/src/main/main.cpp index 6ef7c61b..07547713 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -37,29 +37,6 @@ static const char* TAG = "MAIN"; -void db_main(void* whatever) { - database::Database **arg_db = reinterpret_cast(whatever); - ESP_LOGI(TAG, "Init database"); - std::unique_ptr db; - auto db_res = database::Database::Open(); - if (db_res.has_error()) { - ESP_LOGE(TAG, "database failed :("); - } else { - db.reset(db_res.value()); - ESP_LOGI(TAG, "database good :)"); - } - - *arg_db = db.get(); - - db->ByTitle(); - - while (1) { - vTaskDelay(portMAX_DELAY); - } - - vTaskDelete(NULL); -} - extern "C" void app_main(void) { ESP_LOGI(TAG, "Initialising peripherals"); @@ -85,15 +62,6 @@ extern "C" void app_main(void) { ESP_LOGE(TAG, "Failed! Do you have an SD card?"); } - ESP_LOGI(TAG, "Launch database task"); - std::size_t db_stack_size = 256 * 1024; - StaticTask_t database_task_buffer = {}; - StackType_t* database_stack = reinterpret_cast( - heap_caps_malloc(db_stack_size, MALLOC_CAP_SPIRAM)); - database::Database *db; - xTaskCreateStatic(&db_main, "LEVELDB", db_stack_size, &db, 1, database_stack, - &database_task_buffer); - ESP_LOGI(TAG, "Init touch wheel"); std::shared_ptr touchwheel = drivers->AcquireTouchWheel(); @@ -108,11 +76,18 @@ extern "C" void app_main(void) { playback = std::make_unique(drivers.get()); } + ESP_LOGI(TAG, "Init database"); + std::unique_ptr db; + auto db_res = database::Database::Open(); + if (db_res.has_value()) { + db.reset(db_res.value()); + } + ESP_LOGI(TAG, "Waiting for background tasks before launching console..."); vTaskDelay(pdMS_TO_TICKS(1000)); ESP_LOGI(TAG, "Launch console"); - console::AppConsole console(playback.get(), db); + console::AppConsole console(playback.get(), db.get()); console.Launch(); uint8_t prev_position = 0;