Move DB interactions to a background thread

custom
jacqueline 2 years ago
parent fbe047a35f
commit 5d7cbec34c
  1. 2
      src/database/CMakeLists.txt
  2. 128
      src/database/database.cpp
  3. 91
      src/database/db_task.cpp
  4. 39
      src/database/env_esp.cpp
  5. 77
      src/database/include/database.hpp
  6. 25
      src/database/include/db_task.hpp
  7. 22
      src/database/include/env_esp.hpp
  8. 2
      src/database/include/file_gatherer.hpp
  9. 2
      src/database/include/tag_processor.hpp
  10. 4
      src/database/tag_processor.cpp
  11. 41
      src/main/app_console.cpp
  12. 5
      src/main/app_console.hpp
  13. 41
      src/main/main.cpp

@ -1,5 +1,5 @@
idf_component_register(
SRCS "env_esp.cpp" "database.cpp" "tag_processor.cpp"
SRCS "env_esp.cpp" "database.cpp" "tag_processor.cpp" "db_task.cpp"
INCLUDE_DIRS "include"
REQUIRES "result" "span" "esp_psram" "fatfs")

@ -4,75 +4,123 @@
#include "ff.h"
#include "leveldb/cache.h"
#include "db_task.hpp"
#include "env_esp.hpp"
#include "file_gatherer.hpp"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "result.hpp"
#include "tag_processor.hpp"
#include "env_esp.hpp"
#include "leveldb/options.h"
namespace database {
static SingletonEnv<leveldb::EspEnv> sEnv;
static const char *kTag = "DB";
static const char* kTag = "DB";
static std::atomic<bool> sIsDbOpen(false);
auto Database::Open() -> cpp::result<Database*, DatabaseError> {
leveldb::DB* db;
leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024);
leveldb::Options options;
options.env = sEnv.env();
options.create_if_missing = true;
options.write_buffer_size = 48 * 1024;
options.max_file_size = 32;
options.block_cache = cache;
options.block_size = 512;
auto status = leveldb::DB::Open(options, "/.db", &db);
if (!status.ok()) {
delete cache;
ESP_LOGE(kTag, "failed to open db, status %s", status.ToString().c_str());
return cpp::fail(FAILED_TO_OPEN);
// TODO(jacqueline): Why isn't compare_and_exchange_* available?
if (sIsDbOpen.exchange(true)) {
return cpp::fail(DatabaseError::ALREADY_OPEN);
}
return new Database(db, cache);
if (!StartDbTask()) {
return cpp::fail(DatabaseError::ALREADY_OPEN);
}
return RunOnDbTask<cpp::result<Database*, DatabaseError>>(
[]() -> cpp::result<Database*, DatabaseError> {
leveldb::DB* db;
leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024);
leveldb::Options options;
options.env = sEnv.env();
options.create_if_missing = true;
options.write_buffer_size = 48 * 1024;
options.max_file_size = 32;
options.block_cache = cache;
options.block_size = 512;
auto status = leveldb::DB::Open(options, "/.db", &db);
if (!status.ok()) {
delete cache;
ESP_LOGE(kTag, "failed to open db, status %s",
status.ToString().c_str());
return cpp::fail(FAILED_TO_OPEN);
}
ESP_LOGI(kTag, "Database opened successfully");
return new Database(db, cache);
})
.get();
}
Database::Database(leveldb::DB* db, leveldb::Cache* cache)
: db_(db), cache_(cache) {}
Database::~Database() {}
Database::~Database() {
QuitDbTask();
sIsDbOpen.store(false);
}
auto Database::Initialise() -> void {
leveldb::WriteOptions opt;
opt.sync = true;
FindFiles("", [&](const std::string &path) {
template <typename Parser>
auto IterateAndParse(leveldb::Iterator* it, std::size_t limit, Parser p)
-> void {
for (int i = 0; i < limit; i++) {
if (!it->Valid()) {
break;
}
std::invoke(p, it->key(), it->value());
it->Next();
}
}
auto Database::Populate() -> std::future<void> {
return RunOnDbTask<void>([&]() -> void {
leveldb::WriteOptions opt;
opt.sync = true;
FindFiles("", [&](const std::string& path) {
ESP_LOGI(kTag, "considering %s", path.c_str());
FileInfo info;
if (GetInfo(path, &info)) {
ESP_LOGI(kTag, "added as '%s'", info.title.c_str());
db_->Put(opt, "title:" + info.title, path);
}
});
db_->Put(opt, "title:coolkeywithoutval", leveldb::Slice());
});
db_->Put(opt, "title:coolkeywithoutval", leveldb::Slice());
}
auto Database::ByTitle() -> Iterator {
leveldb::Iterator *it = db_->NewIterator(leveldb::ReadOptions());
it->Seek("title:");
while (it->Valid()) {
ESP_LOGI(kTag, "%s : %s", it->key().ToString().c_str(), it->value().ToString().c_str());
it->Next();
}
return Iterator(it);
auto Database::GetSongs(std::size_t page_size) -> std::future<DbResult<Song>> {
return RunOnDbTask<DbResult<Song>>([&]() -> DbResult<Song> {
std::unique_ptr<leveldb::Iterator> it(
db_->NewIterator(leveldb::ReadOptions()));
it->Seek("title:");
std::vector<Song> results;
IterateAndParse(it.get(), page_size,
[&](const leveldb::Slice& key, const leveldb::Slice& val) {
Song s;
s.title = key.ToString();
results.push_back(s);
});
return DbResult<Song>(results, std::move(it));
});
}
auto Iterator::Next() -> std::optional<std::string> {
if (!it_->Valid()) {
return {};
}
std::string ret = it_->key().ToString();
it_->Next();
return ret;
auto Database::GetMoreSongs(std::size_t page_size, DbResult<Song> continuation)
-> std::future<DbResult<Song>> {
return RunOnDbTask<DbResult<Song>>([&]() -> DbResult<Song> {
std::unique_ptr<leveldb::Iterator> it(continuation.it());
std::vector<Song> results;
IterateAndParse(it.get(), page_size,
[&](const leveldb::Slice& key, const leveldb::Slice& val) {
Song s;
s.title = key.ToString();
results.push_back(s);
});
return DbResult<Song>(results, std::move(it));
});
}
} // namespace database

@ -0,0 +1,91 @@
#include "db_task.hpp"
#include <functional>
#include "esp_heap_caps.h"
#include "freertos/FreeRTOS.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h"
#include "freertos/task.h"
namespace database {
static const std::size_t kDbStackSize = 256 * 1024;
static StaticTask_t sDbStaticTask;
static StackType_t* sDbStack = nullptr;
static std::atomic<bool> sTaskRunning(false);
static QueueHandle_t sWorkQueue;
struct WorkItem {
std::function<void(void)>* fn;
bool quit;
};
auto SendToDbTask(std::function<void(void)> fn) -> void {
WorkItem item{
.fn = new std::function<void(void)>(fn),
.quit = false,
};
xQueueSend(sWorkQueue, &item, portMAX_DELAY);
}
template <>
auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void> {
std::shared_ptr<std::promise<void>> promise =
std::make_shared<std::promise<void>>();
SendToDbTask([=]() {
std::invoke(fn);
promise->set_value();
});
return promise->get_future();
}
void DatabaseTaskMain(void* args) {
while (true) {
WorkItem item;
if (xQueueReceive(sWorkQueue, &item, portMAX_DELAY)) {
if (item.quit) {
break;
}
if (item.fn != nullptr) {
std::invoke(*item.fn);
delete item.fn;
}
}
}
vQueueDelete(sWorkQueue);
sTaskRunning.store(false);
vTaskDelete(NULL);
}
auto StartDbTask() -> bool {
if (sTaskRunning.exchange(true)) {
return false;
}
if (sDbStack == nullptr) {
sDbStack = reinterpret_cast<StackType_t*>(
heap_caps_malloc(kDbStackSize, MALLOC_CAP_SPIRAM));
}
sWorkQueue = xQueueCreate(8, sizeof(std::function<void(void)>*));
xTaskCreateStatic(&DatabaseTaskMain, "DB", kDbStackSize, NULL, 1, sDbStack,
&sDbStaticTask);
return true;
}
auto QuitDbTask() -> void {
if (!sTaskRunning.load()) {
return;
}
WorkItem item{
.fn = nullptr,
.quit = true,
};
xQueueSend(sWorkQueue, &item, portMAX_DELAY);
while (sTaskRunning.load()) {
vTaskDelay(pdMS_TO_TICKS(1));
}
}
} // namespace database

@ -7,6 +7,7 @@
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <limits>
#include <mutex>
#include <queue>
@ -28,6 +29,8 @@
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "db_task.hpp"
namespace leveldb {
std::string ErrToStr(FRESULT err) {
@ -447,43 +450,13 @@ void EspEnv::SleepForMicroseconds(int micros) {
vTaskDelay(pdMS_TO_TICKS(micros / 1000));
}
extern "C" void BackgroundThreadEntryPoint(EspEnv* env) {
env->BackgroundThreadMain();
}
EspEnv::EspEnv()
: background_work_mutex_(),
started_background_thread_(false),
background_work_queue_(xQueueCreate(4, sizeof(BackgroundWorkItem))) {}
EspEnv::EspEnv() {}
void EspEnv::Schedule(
void (*background_work_function)(void* background_work_arg),
void* background_work_arg) {
background_work_mutex_.lock();
// Start the background thread, if we haven't done so already.
if (!started_background_thread_) {
started_background_thread_ = true;
xTaskCreate(reinterpret_cast<TaskFunction_t>(BackgroundThreadEntryPoint),
"LVL_ONEOFF", 16 * 1024, reinterpret_cast<void*>(this), 3, NULL);
}
BackgroundWorkItem item(background_work_function, background_work_arg);
xQueueSend(background_work_queue_, &item, portMAX_DELAY);
background_work_mutex_.unlock();
}
void EspEnv::BackgroundThreadMain() {
while (true) {
uint8_t buf[sizeof(BackgroundWorkItem)];
if (xQueueReceive(background_work_queue_, &buf, portMAX_DELAY)) {
BackgroundWorkItem* item = reinterpret_cast<BackgroundWorkItem*>(buf);
auto background_work_function = item->function;
void* background_work_arg = item->arg;
background_work_function(background_work_arg);
}
}
database::SendToDbTask(
[=]() { std::invoke(background_work_function, background_work_arg); });
}
} // namespace leveldb

@ -1,29 +1,85 @@
#pragma once
#include <string>
#include <cstdint>
#include <future>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "leveldb/cache.h"
#include "leveldb/db.h"
#include "leveldb/iterator.h"
#include "leveldb/slice.h"
#include "result.hpp"
namespace database {
class Iterator;
struct Artist {
std::string name;
};
struct Album {
std::string name;
};
typedef uint64_t SongId_t;
struct Song {
std::string title;
uint64_t id;
};
struct SongMetadata {};
template <typename T>
class DbResult {
public:
DbResult(const std::vector<T>& values, std::unique_ptr<leveldb::Iterator> it)
: values_(values), it_(std::move(it)) {}
auto values() -> std::vector<T> { return values_; }
auto it() -> leveldb::Iterator* { return it_.release(); };
private:
std::vector<T> values_;
std::unique_ptr<leveldb::Iterator> it_;
};
class Database {
public:
enum DatabaseError {
ALREADY_OPEN,
FAILED_TO_OPEN,
};
static auto Open() -> cpp::result<Database*, DatabaseError>;
~Database();
auto Initialise() -> void;
auto ByTitle() -> Iterator;
auto Populate() -> std::future<void>;
auto GetArtists(std::size_t page_size) -> std::future<DbResult<Artist>>;
auto GetMoreArtists(std::size_t page_size, DbResult<Artist> continuation)
-> std::future<DbResult<Artist>>;
auto GetAlbums(std::size_t page_size, std::optional<Artist> artist)
-> std::future<DbResult<Album>>;
auto GetMoreAlbums(std::size_t page_size, DbResult<Album> continuation)
-> std::future<DbResult<Album>>;
auto GetSongs(std::size_t page_size) -> std::future<DbResult<Song>>;
auto GetSongs(std::size_t page_size, std::optional<Artist> artist)
-> std::future<DbResult<Song>>;
auto GetSongs(std::size_t page_size,
std::optional<Artist> artist,
std::optional<Album> album) -> std::future<DbResult<Song>>;
auto GetMoreSongs(std::size_t page_size, DbResult<Song> continuation)
-> std::future<DbResult<Song>>;
auto GetSongIds(std::optional<Artist> artist, std::optional<Album> album)
-> std::future<std::vector<SongId_t>>;
auto GetSongFilePath(SongId_t id) -> std::future<std::optional<std::string>>;
auto GetSongMetadata(SongId_t id) -> std::future<std::optional<SongMetadata>>;
Database(const Database&) = delete;
Database& operator=(const Database&) = delete;
@ -35,17 +91,4 @@ class Database {
Database(leveldb::DB* db, leveldb::Cache* cache);
};
class Iterator {
public:
explicit Iterator(leveldb::Iterator *it) : it_(it) {}
auto Next() -> std::optional<std::string>;
Iterator(const Iterator&) = delete;
Iterator& operator=(const Iterator&) = delete;
private:
std::unique_ptr<leveldb::Iterator> it_;
};
} // namespace database

@ -0,0 +1,25 @@
#pragma once
#include <functional>
#include <future>
#include <memory>
namespace database {
auto StartDbTask() -> bool;
auto QuitDbTask() -> void;
auto SendToDbTask(std::function<void(void)> fn) -> void;
template <typename T>
auto RunOnDbTask(std::function<T(void)> fn) -> std::future<T> {
std::shared_ptr<std::promise<T>> promise =
std::make_shared<std::promise<T>>();
SendToDbTask([=]() { promise->set_value(std::invoke(fn)); });
return promise->get_future();
}
template <>
auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void>;
} // namespace database

@ -2,9 +2,8 @@
#include <mutex>
#include <set>
#include <string>
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
@ -89,25 +88,6 @@ class EspEnv : public leveldb::Env {
void BackgroundThreadMain();
private:
// Stores the work item data in a Schedule() call.
//
// Instances are constructed on the thread calling Schedule() and used on the
// background thread.
//
// This structure is thread-safe beacuse it is immutable.
struct BackgroundWorkItem {
explicit BackgroundWorkItem(void (*f)(void*), void* a)
: function(f), arg(a) {}
void (*const function)(void*);
void* const arg;
};
std::mutex background_work_mutex_;
bool started_background_thread_;
QueueHandle_t background_work_queue_;
InMemoryLockTable locks_; // Thread-safe.
};

@ -45,7 +45,7 @@ auto FindFiles(const std::string& root, Callback cb) -> void {
to_explore.push_back(full_path.str());
} else {
// This is a file! Let the callback know about it.
//std::invoke(cb, full_path.str(), info);
// std::invoke(cb, full_path.str(), info);
std::invoke(cb, full_path.str());
}
}

@ -9,6 +9,6 @@ struct FileInfo {
std::string title;
};
auto GetInfo(const std::string &path, FileInfo *out) -> bool;
auto GetInfo(const std::string& path, FileInfo* out) -> bool;
} // namespace database

@ -2,7 +2,7 @@
namespace database {
auto GetInfo(const std::string &path, FileInfo *out) -> bool {
auto GetInfo(const std::string& path, FileInfo* out) -> bool {
// TODO(jacqueline): bring in taglib for this
if (path.ends_with(".mp3")) {
out->is_playable = true;
@ -12,4 +12,4 @@ auto GetInfo(const std::string &path, FileInfo *out) -> bool {
return false;
}
} // namespace database
} // namespace database

@ -154,48 +154,49 @@ int CmdDbInit(int argc, char** argv) {
return 1;
}
sInstance->database_->Initialise();
sInstance->database_->Populate().get();
return 0;
}
void RegisterDbInit() {
esp_console_cmd_t cmd{.command = "db_init",
.help = "scans for playable files and adds them to the database",
.hint = NULL,
.func = &CmdDbInit,
.argtable = NULL};
esp_console_cmd_t cmd{
.command = "db_init",
.help = "scans for playable files and adds them to the database",
.hint = NULL,
.func = &CmdDbInit,
.argtable = NULL};
esp_console_cmd_register(&cmd);
}
int CmdDbTitles(int argc, char** argv) {
static const std::string usage = "usage: db_titles";
int CmdDbSongs(int argc, char** argv) {
static const std::string usage = "usage: db_songs";
if (argc != 1) {
std::cout << usage << std::endl;
return 1;
}
database::Iterator it = sInstance->database_->ByTitle();
while (true) {
std::optional<std::string> title = it.Next();
if (!title) {
break;
}
std::cout << *title << std::endl;
database::DbResult<database::Song> res =
sInstance->database_->GetSongs(10).get();
for (database::Song s : res.values()) {
std::cout << s.title << std::endl;
}
return 0;
}
void RegisterDbTitles() {
esp_console_cmd_t cmd{.command = "db_titles",
void RegisterDbSongs() {
esp_console_cmd_t cmd{.command = "db_songs",
.help = "lists titles of ALL songs in the database",
.hint = NULL,
.func = &CmdDbTitles,
.func = &CmdDbSongs,
.argtable = NULL};
esp_console_cmd_register(&cmd);
}
AppConsole::AppConsole(audio::AudioPlayback* playback, database::Database *database) : playback_(playback), database_(database) {
AppConsole::AppConsole(audio::AudioPlayback* playback,
database::Database* database)
: playback_(playback), database_(database) {
sInstance = this;
}
AppConsole::~AppConsole() {
@ -209,7 +210,7 @@ auto AppConsole::RegisterExtraComponents() -> void {
RegisterVolume();
RegisterAudioStatus();
RegisterDbInit();
RegisterDbTitles();
RegisterDbSongs();
}
} // namespace console

@ -11,11 +11,12 @@ namespace console {
class AppConsole : public Console {
public:
explicit AppConsole(audio::AudioPlayback* playback, database::Database *database);
explicit AppConsole(audio::AudioPlayback* playback,
database::Database* database);
virtual ~AppConsole();
audio::AudioPlayback* playback_;
database::Database *database_;
database::Database* database_;
protected:
virtual auto RegisterExtraComponents() -> void;

@ -37,29 +37,6 @@
static const char* TAG = "MAIN";
void db_main(void* whatever) {
database::Database **arg_db = reinterpret_cast<database::Database**>(whatever);
ESP_LOGI(TAG, "Init database");
std::unique_ptr<database::Database> db;
auto db_res = database::Database::Open();
if (db_res.has_error()) {
ESP_LOGE(TAG, "database failed :(");
} else {
db.reset(db_res.value());
ESP_LOGI(TAG, "database good :)");
}
*arg_db = db.get();
db->ByTitle();
while (1) {
vTaskDelay(portMAX_DELAY);
}
vTaskDelete(NULL);
}
extern "C" void app_main(void) {
ESP_LOGI(TAG, "Initialising peripherals");
@ -85,15 +62,6 @@ extern "C" void app_main(void) {
ESP_LOGE(TAG, "Failed! Do you have an SD card?");
}
ESP_LOGI(TAG, "Launch database task");
std::size_t db_stack_size = 256 * 1024;
StaticTask_t database_task_buffer = {};
StackType_t* database_stack = reinterpret_cast<StackType_t*>(
heap_caps_malloc(db_stack_size, MALLOC_CAP_SPIRAM));
database::Database *db;
xTaskCreateStatic(&db_main, "LEVELDB", db_stack_size, &db, 1, database_stack,
&database_task_buffer);
ESP_LOGI(TAG, "Init touch wheel");
std::shared_ptr<drivers::TouchWheel> touchwheel =
drivers->AcquireTouchWheel();
@ -108,11 +76,18 @@ extern "C" void app_main(void) {
playback = std::make_unique<audio::AudioPlayback>(drivers.get());
}
ESP_LOGI(TAG, "Init database");
std::unique_ptr<database::Database> db;
auto db_res = database::Database::Open();
if (db_res.has_value()) {
db.reset(db_res.value());
}
ESP_LOGI(TAG, "Waiting for background tasks before launching console...");
vTaskDelay(pdMS_TO_TICKS(1000));
ESP_LOGI(TAG, "Launch console");
console::AppConsole console(playback.get(), db);
console::AppConsole console(playback.get(), db.get());
console.Launch();
uint8_t prev_position = 0;

Loading…
Cancel
Save