Shard searching for new tracks across multiple tasks

This also has the effect of breaking up the enormous 'updateIndexes'
method into one call per file, which means database updates also no
longer monopolise a single background task for their entire duration.

Avg. time per new file is now <140ms for a completely fresh database,
which is pretty good, I think!
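
For reference, here is a minimal, self-contained sketch of the sharding pattern the message describes: several workers pull paths from a shared, lock-protected source, and each file is processed as its own small unit of work, with the last worker to run dry firing a completion callback. Plain std::thread stands in for the firmware's tasks::WorkerPool, and every name below is an illustrative stand-in rather than an actual Tangara class; the real implementation is in the track_finder.cpp changes further down.

// Sketch only: assumes nothing about the real firmware beyond the idea of
// "N workers draining one shared iterator of candidate files".
#include <atomic>
#include <functional>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>

class CandidateSource {
 public:
  explicit CandidateSource(std::vector<std::string> paths)
      : paths_(std::move(paths)) {}

  // Hands out one path at a time; safe to call from several workers at once.
  auto next() -> std::optional<std::string> {
    std::scoped_lock lock{mut_};
    if (paths_.empty()) {
      return std::nullopt;
    }
    std::string p = std::move(paths_.back());
    paths_.pop_back();
    return p;
  }

 private:
  std::mutex mut_;
  std::vector<std::string> paths_;
};

// Each worker repeatedly pulls one candidate and processes it; the last
// worker to find the source empty fires the completion callback exactly once.
auto shard_scan(CandidateSource& source,
                size_t parallelism,
                const std::function<void(const std::string&)>& process_one,
                const std::function<void()>& on_complete) -> void {
  std::atomic<size_t> active{parallelism};
  std::vector<std::thread> workers;
  for (size_t i = 0; i < parallelism; i++) {
    workers.emplace_back([&]() {
      while (auto path = source.next()) {
        process_one(*path);  // one small unit of work per candidate file
      }
      if (active.fetch_sub(1) == 1) {
        on_complete();  // this thread was the last one still scanning
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }
}

int main() {
  CandidateSource source({"/a.flac", "/b.mp3", "/c.ogg", "/d.wav"});
  // Output lines may interleave across the two workers; that is expected.
  shard_scan(
      source, 2,
      [](const std::string& p) { std::cout << "indexed " << p << "\n"; },
      [] { std::cout << "scan complete\n"; });
}
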
jacqueline 8 months ago
parent 28cf749951
commit 2ad83cb210
  1. src/tangara/database/database.cpp (161 lines changed)
  2. src/tangara/database/database.hpp (30 lines changed)
  3. src/tangara/database/track_finder.cpp (44 lines changed)
  4. src/tangara/database/track_finder.hpp (50 lines changed)

src/tangara/database/database.cpp

@@ -6,9 +6,6 @@
 #include "database/database.hpp"
-#include <stdint.h>
-#include <sys/_stdint.h>
 #include <algorithm>
 #include <cstdint>
 #include <functional>

@@ -20,12 +17,8 @@
 #include <string>
 #include <variant>
-#include "collation.hpp"
 #include "cppbor.h"
 #include "cppbor_parse.h"
-#include "database/index.hpp"
-#include "database/track_finder.hpp"
-#include "debug.hpp"
 #include "esp_log.h"
 #include "esp_timer.h"
 #include "ff.h"

@@ -39,12 +32,14 @@
 #include "leveldb/status.h"
 #include "leveldb/write_batch.h"
+#include "collation.hpp"
 #include "database/db_events.hpp"
 #include "database/env_esp.hpp"
+#include "database/index.hpp"
 #include "database/records.hpp"
 #include "database/tag_parser.hpp"
 #include "database/track.hpp"
-#include "drivers/spi.hpp"
+#include "database/track_finder.hpp"
 #include "events/event_queue.hpp"
 #include "memory_resource.hpp"
 #include "result.hpp"

@@ -58,12 +53,16 @@ static SingletonEnv<leveldb::EspEnv> sEnv;
 static const char kDbPath[] = "/.tangara-db";
 static const char kKeyDbVersion[] = "schema_version";
 static const char kKeyCustom[] = "U\0";
 static const char kKeyCollator[] = "collator";
+static constexpr size_t kMaxParallelism = 2;
 static std::atomic<bool> sIsDbOpen(false);
+using std::placeholders::_1;
+using std::placeholders::_2;
 static auto CreateNewDatabase(leveldb::Options& options, locale::ICollator& col)
     -> leveldb::DB* {
   Database::Destroy();

@@ -167,7 +166,8 @@ auto Database::Open(ITagParser& parser,
         }
         ESP_LOGI(kTag, "Database opened successfully");
-        return new Database(db, cache.release(), parser, collator);
+        return new Database(db, cache.release(), bg_worker, parser,
+                            collator);
       })
       .get();
 }

@@ -180,15 +180,20 @@
 Database::Database(leveldb::DB* db,
                    leveldb::Cache* cache,
+                   tasks::WorkerPool& pool,
                    ITagParser& tag_parser,
                    locale::ICollator& collator)
     : db_(db),
       cache_(cache),
+      track_finder_(
+          pool,
+          kMaxParallelism,
+          std::bind(&Database::processCandidateCallback, this, _1, _2),
+          std::bind(&Database::indexingCompleteCallback, this)),
       tag_parser_(tag_parser),
       collator_(collator),
       is_updating_(false) {
   dbCalculateNextTrackId();
-  ESP_LOGI(kTag, "next track id is %lu", next_track_id_.load());
 }

 Database::~Database() {

@@ -243,7 +248,7 @@ auto Database::get(const std::string& key) -> std::optional<std::string> {
 }

 auto Database::getTrackPath(TrackId id) -> std::optional<std::string> {
-  auto track_data = dbGetTrackData(id);
+  auto track_data = dbGetTrackData(leveldb::ReadOptions(), id);
   if (!track_data) {
     return {};
   }

@@ -251,7 +256,7 @@ auto Database::getTrackPath(TrackId id) -> std::optional<std::string> {
 }

 auto Database::getTrack(TrackId id) -> std::shared_ptr<Track> {
-  std::shared_ptr<TrackData> data = dbGetTrackData(id);
+  std::shared_ptr<TrackData> data = dbGetTrackData(leveldb::ReadOptions(), id);
   if (!data || data->is_tombstoned) {
     return {};
   }

@@ -274,34 +279,61 @@ auto Database::getIndexes() -> std::vector<IndexInfo> {
   };
 }

-class UpdateNotifier {
- public:
-  UpdateNotifier(std::atomic<bool>& is_updating) : is_updating_(is_updating) {
+Database::UpdateTracker::UpdateTracker()
+    : num_old_tracks_(0),
+      num_new_tracks_(0),
+      start_time_(esp_timer_get_time()) {
   events::Ui().Dispatch(event::UpdateStarted{});
   events::System().Dispatch(event::UpdateStarted{});
 }
-  ~UpdateNotifier() {
-    is_updating_ = false;
+
+Database::UpdateTracker::~UpdateTracker() {
+  uint64_t end_time = esp_timer_get_time();
+  uint64_t time_per_old = 0;
+  if (num_old_tracks_) {
+    time_per_old = (verification_finish_time_ - start_time_) / num_old_tracks_;
+  }
+  uint64_t time_per_new = 0;
+  if (num_new_tracks_) {
+    time_per_new = (end_time - verification_finish_time_) / num_new_tracks_;
+  }
+
+  ESP_LOGI(
+      kTag,
+      "processed %lu old tracks and %lu new tracks in %llu seconds (%llums "
+      "per old, %llums per new)",
+      num_old_tracks_, num_new_tracks_, (end_time - start_time_) / 1000000,
+      time_per_old / 1000, time_per_new / 1000);
+
   events::Ui().Dispatch(event::UpdateFinished{});
   events::System().Dispatch(event::UpdateFinished{});
 }
-
- private:
-  std::atomic<bool>& is_updating_;
-};
+
+auto Database::UpdateTracker::onTrackVerified() -> void {
+  events::Ui().Dispatch(event::UpdateProgress{
+      .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks,
+      .val = ++num_old_tracks_,
+  });
+}
+
+auto Database::UpdateTracker::onVerificationFinished() -> void {
+  verification_finish_time_ = esp_timer_get_time();
+}
+
+auto Database::UpdateTracker::onTrackAdded() -> void {
+  num_new_tracks_++;
+}

 auto Database::updateIndexes() -> void {
   if (is_updating_.exchange(true)) {
     return;
   }
-  UpdateNotifier notifier{is_updating_};
-
-  uint32_t num_old_tracks = 0;
-  uint32_t num_new_tracks = 0;
-  uint64_t start_time = esp_timer_get_time();
+  update_tracker_ = std::make_unique<UpdateTracker>();

   leveldb::ReadOptions read_options;
-  read_options.fill_cache = true;
+  read_options.fill_cache = false;
+  read_options.verify_checksums = true;

   // Stage 1: verify all existing tracks are still valid.
   ESP_LOGI(kTag, "verifying existing tracks");

@@ -310,11 +342,7 @@ auto Database::updateIndexes() -> void {
   std::string prefix = EncodeDataPrefix();
   for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix);
        it->Next()) {
-    num_old_tracks++;
-    events::Ui().Dispatch(event::UpdateProgress{
-        .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks,
-        .val = num_old_tracks,
-    });
+    update_tracker_->onTrackVerified();

     std::shared_ptr<TrackData> track = ParseDataValue(it->value());
     if (!track) {

@@ -325,7 +353,6 @@
     }

     if (track->is_tombstoned) {
-      ESP_LOGW(kTag, "skipping tombstoned %lx", track->id);
       continue;
     }

@@ -392,40 +419,36 @@
     }
   }

-  uint64_t verify_end_time = esp_timer_get_time();
+  update_tracker_->onVerificationFinished();

   // Stage 2: search for newly added files.
   ESP_LOGI(kTag, "scanning for new tracks");
-  uint64_t num_files = 0;
-  auto track_finder = std::make_shared<TrackFinder>("");
-
-  FILINFO info;
-  while (auto path = track_finder->next(info)) {
-    num_files++;
-    events::Ui().Dispatch(event::UpdateProgress{
-        .stage = event::UpdateProgress::Stage::kScanningForNewTracks,
-        .val = num_files,
-    });
+  track_finder_.launch("");
+};
+
+auto Database::processCandidateCallback(FILINFO& info, std::string_view path)
+    -> void {
+  leveldb::ReadOptions read_options;
+  read_options.fill_cache = true;
+  read_options.verify_checksums = false;

   std::string unused;
-  if (db_->Get(read_options, EncodePathKey(*path), &unused).ok()) {
+  if (db_->Get(read_options, EncodePathKey(path), &unused).ok()) {
     // This file is already in the database; skip it.
-    continue;
+    return;
   }

-  std::shared_ptr<TrackTags> tags = tag_parser_.ReadAndParseTags(*path);
+  std::shared_ptr<TrackTags> tags = tag_parser_.ReadAndParseTags(path);
   if (!tags || tags->encoding() == Container::kUnsupported) {
     // No parseable tags; skip this fiile.
-    continue;
+    return;
   }

   // Check for any existing track with the same hash.
   uint64_t hash = tags->Hash();
   std::optional<TrackId> existing_id;
   std::string raw_entry;
-  if (db_->Get(leveldb::ReadOptions(), EncodeHashKey(hash), &raw_entry)
-          .ok()) {
+  if (db_->Get(read_options, EncodeHashKey(hash), &raw_entry).ok()) {
     existing_id = ParseHashValue(raw_entry);
   }

@@ -434,29 +457,30 @@
     // Do we have any existing data for this track? This could be the case if
     // this is a tombstoned entry. In such as case, we want to reuse the
    // previous TrackData so that any extra metadata is preserved.
-    data = dbGetTrackData(*existing_id);
+    data = dbGetTrackData(read_options, *existing_id);
     if (!data) {
       data = std::make_shared<TrackData>();
       data->id = *existing_id;
-    } else if (std::string_view{data->filepath} != *path) {
+    } else if (data->filepath != path && !data->is_tombstoned) {
       ESP_LOGW(kTag, "hash collision: %s, %s, %s",
                tags->title().value_or("no title").c_str(),
                tags->artist().value_or("no artist").c_str(),
                tags->album().value_or("no album").c_str());
       // Don't commit anything if there's a hash collision, since we're
       // likely to make a big mess.
-      continue;
+      return;
     }
   } else {
-    num_new_tracks++;
+    update_tracker_->onTrackAdded();
     data = std::make_shared<TrackData>();
     data->id = dbMintNewTrackId();
   }

   // Make sure the file-based metadata on the TrackData is up to date.
-  data->filepath = *path;
+  data->filepath = path;
   data->tags_hash = hash;
   data->modified_at = {info.fdate, info.ftime};
+  data->is_tombstoned = false;

   // Apply all the actual database changes as one atomic batch. This makes
   // the whole 'new track' operation atomic, and also reduces the amount of

@@ -467,28 +491,14 @@
   dbCreateIndexesForTrack(*data, *tags, batch);
   batch.Put(EncodeDataKey(data->id), EncodeDataValue(*data));
   batch.Put(EncodeHashKey(data->tags_hash), EncodeHashValue(data->id));
-  batch.Put(EncodePathKey(*path), TrackIdToBytes(data->id));
+  batch.Put(EncodePathKey(path), TrackIdToBytes(data->id));
   db_->Write(leveldb::WriteOptions(), &batch);
-  }
-
-  uint64_t end_time = esp_timer_get_time();
-  uint64_t time_per_old = 0;
-  if (num_old_tracks) {
-    time_per_old = (verify_end_time - start_time) / num_old_tracks;
-  }
-  uint64_t time_per_new = 0;
-  if (num_new_tracks) {
-    time_per_new = (end_time - verify_end_time) / num_new_tracks;
-  }
-
-  ESP_LOGI(
-      kTag,
-      "processed %lu old tracks and %lu new tracks in %llu seconds (%llums "
-      "per old, %llums per new)",
-      num_old_tracks, num_new_tracks, (end_time - start_time) / 1000000,
-      time_per_old / 1000, time_per_new / 1000);
+};
+
+auto Database::indexingCompleteCallback() -> void {
+  update_tracker_.reset();
+  is_updating_ = false;
 }

@@ -536,10 +546,11 @@ auto Database::dbMintNewTrackId() -> TrackId {
   return next_track_id_++;
 }

-auto Database::dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData> {
+auto Database::dbGetTrackData(leveldb::ReadOptions options, TrackId id)
+    -> std::shared_ptr<TrackData> {
   std::string key = EncodeDataKey(id);
   std::string raw_val;
-  if (!db_->Get(leveldb::ReadOptions(), key, &raw_val).ok()) {
+  if (!db_->Get(options, key, &raw_val).ok()) {
     ESP_LOGW(kTag, "no key found for #%lx", id);
     return {};
   }

src/tangara/database/database.hpp

@@ -23,6 +23,8 @@
 #include "database/records.hpp"
 #include "database/tag_parser.hpp"
 #include "database/track.hpp"
+#include "database/track_finder.hpp"
+#include "ff.h"
 #include "leveldb/cache.h"
 #include "leveldb/db.h"
 #include "leveldb/iterator.h"

@@ -93,22 +95,48 @@ class Database {
   leveldb::DB* db_;
   leveldb::Cache* cache_;
+  TrackFinder track_finder_;

   // Not owned.
   ITagParser& tag_parser_;
   locale::ICollator& collator_;

+  /* Internal utility for tracking a currently in-progress index update. */
+  class UpdateTracker {
+   public:
+    UpdateTracker();
+    ~UpdateTracker();
+
+    auto onTrackVerified() -> void;
+    auto onVerificationFinished() -> void;
+    auto onTrackAdded() -> void;
+
+   private:
+    uint32_t num_old_tracks_;
+    uint32_t num_new_tracks_;
+    uint64_t start_time_;
+    uint64_t verification_finish_time_;
+  };
+
   std::atomic<bool> is_updating_;
+  std::unique_ptr<UpdateTracker> update_tracker_;
   std::atomic<TrackId> next_track_id_;

   Database(leveldb::DB* db,
            leveldb::Cache* cache,
+           tasks::WorkerPool& pool,
            ITagParser& tag_parser,
            locale::ICollator& collator);

+  auto processCandidateCallback(FILINFO&, std::string_view) -> void;
+  auto indexingCompleteCallback() -> void;
+
   auto dbCalculateNextTrackId() -> void;
   auto dbMintNewTrackId() -> TrackId;

-  auto dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData>;
+  auto dbGetTrackData(leveldb::ReadOptions, TrackId id)
+      -> std::shared_ptr<TrackData>;

   auto dbCreateIndexesForTrack(const Track&, leveldb::WriteBatch&) -> void;
   auto dbCreateIndexesForTrack(const TrackData&,

src/tangara/database/track_finder.cpp

@@ -24,12 +24,12 @@ namespace database {
 static_assert(sizeof(TCHAR) == sizeof(char), "TCHAR must be CHAR");

-TrackFinder::TrackFinder(std::string_view root)
+CandidateIterator::CandidateIterator(std::string_view root)
     : to_explore_(&memory::kSpiRamResource) {
   to_explore_.push_back({root.data(), root.size()});
 }

-auto TrackFinder::next(FILINFO& out_info) -> std::optional<std::string> {
+auto CandidateIterator::next(FILINFO& info) -> std::optional<std::string> {
   std::scoped_lock<std::mutex> lock{mut_};
   while (!to_explore_.empty() || current_) {
     if (!current_) {

@@ -49,7 +49,6 @@ auto TrackFinder::next(FILINFO& out_info) -> std::optional<std::string> {
       }
     }

-    FILINFO info;
     FRESULT res = f_readdir(&current_->second, &info);
     if (res != FR_OK || info.fname[0] == 0) {
       // No more files in the directory.

@@ -71,14 +70,49 @@ auto TrackFinder::next(FILINFO& out_info) -> std::optional<std::string> {
         to_explore_.push_back(full_path);
       } else {
         // This is a file! We can return now.
-        out_info = info;
         return {{full_path.data(), full_path.size()}};
       }
     }
   }

-  // Out of things to explore.
+  // Out of paths to explore.
   return {};
 }

+TrackFinder::TrackFinder(
+    tasks::WorkerPool& pool,
+    size_t parallelism,
+    std::function<void(FILINFO&, std::string_view)> processor,
+    std::function<void()> complete_cb)
+    : pool_{pool},
+      parallelism_(parallelism),
+      processor_(processor),
+      complete_cb_(complete_cb) {}
+
+auto TrackFinder::launch(std::string_view root) -> void {
+  iterator_ = std::make_unique<CandidateIterator>(root);
+  num_workers_ = parallelism_;
+  for (size_t i = 0; i < parallelism_; i++) {
+    schedule();
+  }
+}
+
+auto TrackFinder::schedule() -> void {
+  pool_.Dispatch<void>([&]() {
+    FILINFO info;
+    auto next = iterator_->next(info);
+    if (next) {
+      std::invoke(processor_, info, *next);
+      schedule();
+    } else {
+      std::scoped_lock<std::mutex> lock{workers_mutex_};
+      num_workers_ -= 1;
+      if (num_workers_ == 0) {
+        iterator_.reset();
+        std::invoke(complete_cb_);
+      }
+    }
+  });
+}
+
 } // namespace database

src/tangara/database/track_finder.hpp

@@ -16,13 +16,27 @@
 #include "ff.h"
+#include "tasks.hpp"

 namespace database {

-class TrackFinder {
+/*
+ * Iterator that recursively stats every file within the given directory root.
+ */
+class CandidateIterator {
  public:
-  TrackFinder(std::string_view root);
+  CandidateIterator(std::string_view root);

-  auto next(FILINFO&) -> std::optional<std::string>;
+  /*
+   * Returns the next file. The stat result is placed within `out`. If the
+   * iterator has finished, returns absent. This method always modifies the
+   * contents of `out`, even if no file is returned.
+   */
+  auto next(FILINFO& out) -> std::optional<std::string>;
+
+  // Cannot be copied or moved.
+  CandidateIterator(const CandidateIterator&) = delete;
+  CandidateIterator& operator=(const CandidateIterator&) = delete;

  private:
   std::mutex mut_;

@@ -30,4 +44,34 @@ class TrackFinder {
   std::optional<std::pair<std::pmr::string, FF_DIR>> current_;
 };

+/*
+ * Utility for iterating through each file within a directory root. Iteration
+ * can be sharded across several tasks.
+ */
+class TrackFinder {
+ public:
+  TrackFinder(tasks::WorkerPool&,
+              size_t parallelism,
+              std::function<void(FILINFO&, std::string_view)> processor,
+              std::function<void()> complete_cb);
+
+  auto launch(std::string_view root) -> void;
+
+  // Cannot be copied or moved.
+  TrackFinder(const TrackFinder&) = delete;
+  TrackFinder& operator=(const TrackFinder&) = delete;
+
+ private:
+  tasks::WorkerPool& pool_;
+  const size_t parallelism_;
+  const std::function<void(FILINFO&, std::string_view)> processor_;
+  const std::function<void()> complete_cb_;
+
+  std::mutex workers_mutex_;
+  std::unique_ptr<CandidateIterator> iterator_;
+  size_t num_workers_;
+
+  auto schedule() -> void;
+};
+
 } // namespace database
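
One design note on TrackFinder::schedule above: rather than looping over every candidate inside a single long-running job, each dispatched job handles one file and then re-queues itself, which is what stops a database update from monopolising a worker task. The toy below is a hedged sketch of that idea only; the single-queue Pool and Scanner classes are illustrative stand-ins, not the firmware's tasks::WorkerPool or TrackFinder.

// Sketch: one dispatch per file, so other jobs on the same queue can
// interleave with an ongoing scan. Single-threaded for clarity.
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>

class Pool {
 public:
  void Dispatch(std::function<void()> job) { jobs_.push_back(std::move(job)); }
  // Drains the queue; a real pool would run these on background tasks.
  void Run() {
    while (!jobs_.empty()) {
      auto job = std::move(jobs_.front());
      jobs_.pop_front();
      job();
    }
  }

 private:
  std::deque<std::function<void()>> jobs_;
};

class Scanner {
 public:
  Scanner(Pool& pool, std::deque<std::string> files)
      : pool_(pool), files_(std::move(files)) {}

  void launch() { schedule(); }

 private:
  void schedule() {
    pool_.Dispatch([this]() {
      auto next = take_next();
      if (!next) {
        std::cout << "scan complete\n";
        return;
      }
      std::cout << "indexed " << *next << "\n";  // one file per dispatched job
      schedule();  // hand the queue back to the pool between files
    });
  }

  std::optional<std::string> take_next() {
    // Mirrors the lock-protected iterator in the real code, even though this
    // toy version is single-threaded.
    std::scoped_lock lock{mut_};
    if (files_.empty()) {
      return std::nullopt;
    }
    auto f = std::move(files_.front());
    files_.pop_front();
    return f;
  }

  Pool& pool_;
  std::mutex mut_;
  std::deque<std::string> files_;
};

int main() {
  Pool pool;
  Scanner scanner(pool, {"/a.flac", "/b.mp3"});
  scanner.launch();
  pool.Dispatch([] { std::cout << "unrelated job ran in between\n"; });
  pool.Run();
}

Running this prints the "unrelated job" line between the two "indexed" lines: the scan yields the queue after every file, which is the behaviour the commit message describes.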
