Batch up the db operations associated with adding new tracks

This is ostensibly yet another 'prepare for multithreaded updates'
commit; however, it does actually save us another 60(!!)-odd milliseconds
per track.
custom
jacqueline 9 months ago
parent b5dc53670a
commit 30aaefca64
  1. 183
      src/tangara/database/database.cpp
  2. 14
      src/tangara/database/database.hpp
  3. 57
      src/tangara/database/index.cpp
  4. 3
      src/tangara/database/index.hpp
  5. 11
      src/tangara/database/track.cpp
  6. 2
      src/tangara/database/track.hpp

@ -352,11 +352,19 @@ auto Database::updateIndexes() -> void {
// We couldn't read the tags for this track. Either they were // We couldn't read the tags for this track. Either they were
// malformed, or perhaps the file is missing. Either way, tombstone // malformed, or perhaps the file is missing. Either way, tombstone
// this record. // this record.
ESP_LOGW(kTag, "entombing missing #%lx", track->id); ESP_LOGI(kTag, "entombing missing #%lx", track->id);
// Remove the indexes first, so that interrupted operations don't leave
// dangling index records.
dbRemoveIndexes(track); dbRemoveIndexes(track);
// Do the rest of the tombstoning as one atomic write.
leveldb::WriteBatch batch;
track->is_tombstoned = true; track->is_tombstoned = true;
dbPutTrackData(*track); batch.Put(EncodeDataKey(track->id), EncodeDataValue(*track));
db_->Delete(leveldb::WriteOptions{}, EncodePathKey(track->filepath)); batch.Delete(EncodePathKey(track->filepath));
db_->Write(leveldb::WriteOptions(), &batch);
continue; continue;
} }
@ -370,12 +378,20 @@ auto Database::updateIndexes() -> void {
// database. // database.
ESP_LOGI(kTag, "updating hash (%llx -> %llx)", track->tags_hash, ESP_LOGI(kTag, "updating hash (%llx -> %llx)", track->tags_hash,
new_hash); new_hash);
// Again, we remove the old index records first so as to avoid
// dangling references.
dbRemoveIndexes(track); dbRemoveIndexes(track);
// Atomically correct the hash + create the new index records.
leveldb::WriteBatch batch;
track->tags_hash = new_hash; track->tags_hash = new_hash;
dbIngestTagHashes(*tags, track->individual_tag_hashes); dbIngestTagHashes(*tags, track->individual_tag_hashes, batch);
dbPutTrackData(*track);
dbPutHash(new_hash, track->id); dbCreateIndexesForTrack(*track, *tags, batch);
batch.Put(EncodeDataKey(track->id), EncodeDataValue(*track));
batch.Put(EncodeHashKey(new_hash), EncodeHashValue(track->id));
db_->Write(leveldb::WriteOptions(), &batch);
} }
} }
} }
@ -404,72 +420,56 @@ auto Database::updateIndexes() -> void {
return; return;
} }
// Check for any existing record with the same hash. // Check for any existing track with the same hash.
uint64_t hash = tags->Hash(); uint64_t hash = tags->Hash();
std::string key = EncodeHashKey(hash); std::optional<TrackId> existing_id;
std::optional<TrackId> existing_hash;
std::string raw_entry; std::string raw_entry;
if (db_->Get(leveldb::ReadOptions(), key, &raw_entry).ok()) { if (db_->Get(leveldb::ReadOptions(), EncodeHashKey(hash), &raw_entry)
existing_hash = ParseHashValue(raw_entry); .ok()) {
existing_id = ParseHashValue(raw_entry);
} }
std::pair<uint16_t, uint16_t> modified{info.fdate, info.ftime}; std::shared_ptr<TrackData> data;
if (!existing_hash) { if (existing_id) {
// We've never met this track before! Or we have, but the entry is // Do we have any existing data for this track? This could be the case if
// malformed. Either way, record this as a new track. // this is a tombstoned entry. In such a case, we want to reuse the
TrackId id = dbMintNewTrackId(); // previous TrackData so that any extra metadata is preserved.
ESP_LOGD(kTag, "recording new 0x%lx", id); data = dbGetTrackData(*existing_id);
if (!data) {
data = std::make_shared<TrackData>();
data->id = *existing_id;
} else if (data->filepath != path) {
ESP_LOGW(kTag, "hash collision: %s, %s, %s",
tags->title().value_or("no title").c_str(),
tags->artist().value_or("no artist").c_str(),
tags->album().value_or("no album").c_str());
// Don't commit anything if there's a hash collision, since we're
// likely to make a big mess.
return;
}
} else {
num_new_tracks++; num_new_tracks++;
data = std::make_shared<TrackData>();
auto data = std::make_shared<TrackData>(); data->id = dbMintNewTrackId();
data->id = id;
data->filepath = path;
data->tags_hash = hash;
data->modified_at = modified;
dbIngestTagHashes(*tags, data->individual_tag_hashes);
dbPutTrackData(*data);
dbPutHash(hash, id);
auto t = std::make_shared<Track>(data, tags);
dbCreateIndexesForTrack(*t);
db_->Put(leveldb::WriteOptions{}, EncodePathKey(path),
TrackIdToBytes(id));
return;
} }
std::shared_ptr<TrackData> existing_data = dbGetTrackData(*existing_hash); // Make sure the file-based metadata on the TrackData is up to date.
if (!existing_data) { data->filepath = path;
// We found a hash that matches, but there's no data record? Weird. data->tags_hash = hash;
auto new_data = std::make_shared<TrackData>(); data->modified_at = {info.fdate, info.ftime};
new_data->id = dbMintNewTrackId();
new_data->filepath = path;
new_data->tags_hash = hash;
new_data->modified_at = modified;
dbIngestTagHashes(*tags, new_data->individual_tag_hashes);
dbPutTrackData(*new_data);
auto t = std::make_shared<Track>(new_data, tags);
dbCreateIndexesForTrack(*t);
db_->Put(leveldb::WriteOptions{}, EncodePathKey(path),
TrackIdToBytes(new_data->id));
return;
}
if (existing_data->is_tombstoned) { // Apply all the actual database changes as one atomic batch. This makes
ESP_LOGI(kTag, "exhuming track %lu", existing_data->id); // the whole 'new track' operation atomic, and also reduces the amount of
existing_data->is_tombstoned = false; // lock contention when adding many tracks at once.
existing_data->modified_at = modified; leveldb::WriteBatch batch;
dbPutTrackData(*existing_data); dbIngestTagHashes(*tags, data->individual_tag_hashes, batch);
auto t = std::make_shared<Track>(existing_data, tags);
dbCreateIndexesForTrack(*t); dbCreateIndexesForTrack(*data, *tags, batch);
db_->Put(leveldb::WriteOptions{}, EncodePathKey(path), batch.Put(EncodeDataKey(data->id), EncodeDataValue(*data));
TrackIdToBytes(existing_data->id)); batch.Put(EncodeHashKey(data->tags_hash), EncodeHashValue(data->id));
} else if (existing_data->filepath != batch.Put(EncodePathKey(path), TrackIdToBytes(data->id));
std::pmr::string{path.data(), path.size()}) {
ESP_LOGW(kTag, "hash collision: %s, %s, %s", db_->Write(leveldb::WriteOptions(), &batch);
tags->title().value_or("no title").c_str(),
tags->artist().value_or("no artist").c_str(),
tags->album().value_or("no album").c_str());
}
}); });
uint64_t end_time = esp_timer_get_time(); uint64_t end_time = esp_timer_get_time();
@ -536,22 +536,6 @@ auto Database::dbMintNewTrackId() -> TrackId {
return next_track_id_++; return next_track_id_++;
} }
auto Database::dbEntomb(TrackId id, uint64_t hash) -> void {
std::string key = EncodeHashKey(hash);
std::string val = EncodeHashValue(id);
if (!db_->Put(leveldb::WriteOptions(), key, val).ok()) {
ESP_LOGE(kTag, "failed to entomb #%llx (id #%lx)", hash, id);
}
}
auto Database::dbPutTrackData(const TrackData& s) -> void {
std::string key = EncodeDataKey(s.id);
std::string val = EncodeDataValue(s);
if (!db_->Put(leveldb::WriteOptions(), key, val).ok()) {
ESP_LOGE(kTag, "failed to write data for #%lx", s.id);
}
}
auto Database::dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData> { auto Database::dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData> {
std::string key = EncodeDataKey(id); std::string key = EncodeDataKey(id);
std::string raw_val; std::string raw_val;
@ -562,33 +546,19 @@ auto Database::dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData> {
return ParseDataValue(raw_val); return ParseDataValue(raw_val);
} }
auto Database::dbPutHash(const uint64_t& hash, TrackId i) -> void { auto Database::dbCreateIndexesForTrack(const Track& track,
std::string key = EncodeHashKey(hash); leveldb::WriteBatch& batch) -> void {
std::string val = EncodeHashValue(i); dbCreateIndexesForTrack(track.data(), track.tags(), batch);
if (!db_->Put(leveldb::WriteOptions(), key, val).ok()) {
ESP_LOGE(kTag, "failed to write hash for #%lx", i);
}
}
auto Database::dbGetHash(const uint64_t& hash) -> std::optional<TrackId> {
std::string key = EncodeHashKey(hash);
std::string raw_val;
if (!db_->Get(leveldb::ReadOptions(), key, &raw_val).ok()) {
ESP_LOGW(kTag, "no key found for hash #%llx", hash);
return {};
}
return ParseHashValue(raw_val);
} }
auto Database::dbCreateIndexesForTrack(const Track& track) -> void { auto Database::dbCreateIndexesForTrack(const TrackData& data,
const TrackTags& tags,
leveldb::WriteBatch& batch) -> void {
for (const IndexInfo& index : getIndexes()) { for (const IndexInfo& index : getIndexes()) {
leveldb::WriteBatch writes; auto entries = Index(collator_, index, data, tags);
auto entries = Index(collator_, index, track);
for (const auto& it : entries) { for (const auto& it : entries) {
writes.Put(EncodeIndexKey(it.first), batch.Put(EncodeIndexKey(it.first), {it.second.data(), it.second.size()});
{it.second.data(), it.second.size()});
} }
db_->Write(leveldb::WriteOptions(), &writes);
} }
} }
@ -597,9 +567,8 @@ auto Database::dbRemoveIndexes(std::shared_ptr<TrackData> data) -> void {
if (!tags) { if (!tags) {
return; return;
} }
Track track{data, tags};
for (const IndexInfo& index : getIndexes()) { for (const IndexInfo& index : getIndexes()) {
auto entries = Index(collator_, index, track); auto entries = Index(collator_, index, *data, *tags);
for (auto it = entries.rbegin(); it != entries.rend(); it++) { for (auto it = entries.rbegin(); it != entries.rend(); it++) {
auto key = EncodeIndexKey(it->first); auto key = EncodeIndexKey(it->first);
auto status = db_->Delete(leveldb::WriteOptions{}, key); auto status = db_->Delete(leveldb::WriteOptions{}, key);
@ -626,16 +595,14 @@ auto Database::dbRemoveIndexes(std::shared_ptr<TrackData> data) -> void {
} }
auto Database::dbIngestTagHashes(const TrackTags& tags, auto Database::dbIngestTagHashes(const TrackTags& tags,
std::pmr::unordered_map<Tag, uint64_t>& out) std::pmr::unordered_map<Tag, uint64_t>& out,
-> void { leveldb::WriteBatch& batch) -> void {
leveldb::WriteBatch batch{};
for (const auto& tag : tags.allPresent()) { for (const auto& tag : tags.allPresent()) {
auto val = tags.get(tag); auto val = tags.get(tag);
auto hash = tagHash(val); auto hash = tagHash(val);
batch.Put(EncodeTagHashKey(hash), tagToString(val)); batch.Put(EncodeTagHashKey(hash), tagToString(val));
out[tag] = hash; out[tag] = hash;
} }
db_->Write(leveldb::WriteOptions{}, &batch);
} }
auto Database::dbRecoverTagsFromHashes( auto Database::dbRecoverTagsFromHashes(

@ -29,6 +29,7 @@
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "leveldb/options.h" #include "leveldb/options.h"
#include "leveldb/slice.h" #include "leveldb/slice.h"
#include "leveldb/write_batch.h"
#include "memory_resource.hpp" #include "memory_resource.hpp"
#include "result.hpp" #include "result.hpp"
#include "tasks.hpp" #include "tasks.hpp"
@ -111,17 +112,18 @@ class Database {
auto dbCalculateNextTrackId() -> void; auto dbCalculateNextTrackId() -> void;
auto dbMintNewTrackId() -> TrackId; auto dbMintNewTrackId() -> TrackId;
auto dbEntomb(TrackId track, uint64_t hash) -> void;
auto dbPutTrackData(const TrackData& s) -> void;
auto dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData>; auto dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData>;
auto dbPutHash(const uint64_t& hash, TrackId i) -> void;
auto dbGetHash(const uint64_t& hash) -> std::optional<TrackId>;
auto dbCreateIndexesForTrack(const Track& track) -> void; auto dbCreateIndexesForTrack(const Track&, leveldb::WriteBatch&) -> void;
auto dbCreateIndexesForTrack(const TrackData&,
const TrackTags&,
leveldb::WriteBatch&) -> void;
auto dbRemoveIndexes(std::shared_ptr<TrackData>) -> void; auto dbRemoveIndexes(std::shared_ptr<TrackData>) -> void;
auto dbIngestTagHashes(const TrackTags&, auto dbIngestTagHashes(const TrackTags&,
std::pmr::unordered_map<Tag, uint64_t>&) -> void; std::pmr::unordered_map<Tag, uint64_t>&,
leveldb::WriteBatch&) -> void;
auto dbRecoverTagsFromHashes(const std::pmr::unordered_map<Tag, uint64_t>&) auto dbRecoverTagsFromHashes(const std::pmr::unordered_map<Tag, uint64_t>&)
-> std::shared_ptr<TrackTags>; -> std::shared_ptr<TrackTags>;

@ -52,10 +52,29 @@ const IndexInfo kAllAlbums{
.components = {Tag::kAlbum, Tag::kAlbumOrder}, .components = {Tag::kAlbum, Tag::kAlbumOrder},
}; };
// Chooses the display/sort text for a track. The title tag is used when it is
// present; otherwise the text after the final '/' of the track's filepath is
// used, or the whole filepath when it contains no '/'.
static auto titleOrFilename(const TrackData& data, const TrackTags& tags)
    -> std::pmr::string {
  if (auto tagged_title = tags.title(); tagged_title) {
    return *tagged_title;
  }
  // No title tag; fall back to the last path component.
  const auto last_slash = data.filepath.find_last_of('/');
  if (last_slash != std::pmr::string::npos) {
    return data.filepath.substr(last_slash + 1);
  }
  return data.filepath;
}
class Indexer { class Indexer {
public: public:
Indexer(locale::ICollator& collator, const Track& t, const IndexInfo& idx) Indexer(locale::ICollator& collator,
: collator_(collator), track_(t), index_(idx) {} const IndexInfo& idx,
const TrackData& data,
const TrackTags& tags)
: collator_(collator),
index_(idx),
track_data_(data),
track_tags_(tags) {}
auto index() -> std::vector<std::pair<IndexKey, std::string>>; auto index() -> std::vector<std::pair<IndexKey, std::string>>;
@ -70,14 +89,13 @@ class Indexer {
auto missing_value(Tag tag) -> TagValue { auto missing_value(Tag tag) -> TagValue {
switch (tag) { switch (tag) {
case Tag::kTitle: case Tag::kTitle:
return track_.TitleOrFilename(); return titleOrFilename(track_data_, track_tags_);
case Tag::kArtist: case Tag::kArtist:
return "Unknown Artist"; return "Unknown Artist";
case Tag::kAlbum: case Tag::kAlbum:
return "Unknown Album"; return "Unknown Album";
case Tag::kAlbumArtist: case Tag::kAlbumArtist:
return track_.tags().artist().value_or("Unknown Artist"); return track_tags_.artist().value_or("Unknown Artist");
return "Unknown Album";
case Tag::kGenres: case Tag::kGenres:
return std::pmr::vector<std::pmr::string>{}; return std::pmr::vector<std::pmr::string>{};
case Tag::kDisc: case Tag::kDisc:
@ -91,8 +109,9 @@ class Indexer {
} }
locale::ICollator& collator_; locale::ICollator& collator_;
const Track& track_;
const IndexInfo index_; const IndexInfo index_;
const TrackData& track_data_;
const TrackTags& track_tags_;
std::vector<std::pair<IndexKey, std::string>> out_; std::vector<std::pair<IndexKey, std::string>> out_;
}; };
@ -113,7 +132,7 @@ auto Indexer::index() -> std::vector<std::pair<IndexKey, std::string>> {
auto Indexer::handleLevel(const IndexKey::Header& header, auto Indexer::handleLevel(const IndexKey::Header& header,
std::span<const Tag> components) -> void { std::span<const Tag> components) -> void {
Tag component = components.front(); Tag component = components.front();
TagValue value = track_.tags().get(component); TagValue value = track_tags_.get(component);
if (std::holds_alternative<std::monostate>(value)) { if (std::holds_alternative<std::monostate>(value)) {
value = missing_value(component); value = missing_value(component);
} }
@ -157,21 +176,17 @@ auto Indexer::handleItem(const IndexKey::Header& header,
auto xfrm = collator_.Transform(value); auto xfrm = collator_.Transform(value);
key.item = {xfrm.data(), xfrm.size()}; key.item = {xfrm.data(), xfrm.size()};
} else if constexpr (std::is_same_v<T, uint32_t>) { } else if constexpr (std::is_same_v<T, uint32_t>) {
value = std::to_string(arg); // CBOR's varint encoding actually works great for lexicographical
// FIXME: this sucks lol. we should just write the number directly, // sorting.
// LSB-first, but then we need to be able to parse it back properly. key.item = cppbor::Uint{arg}.toString();
std::ostringstream str;
str << std::setw(8) << std::setfill('0') << arg;
std::string encoded = str.str();
key.item = {encoded.data(), encoded.size()};
} }
}, },
item); item);
std::optional<IndexKey::Header> next_level; std::optional<IndexKey::Header> next_level;
if (components.size() == 1) { if (components.size() == 1) {
value = track_.TitleOrFilename(); value = titleOrFilename(track_data_, track_tags_);
key.track = track_.data().id; key.track = track_data_.id;
} else { } else {
next_level = ExpandHeader(key.header, key.item); next_level = ExpandHeader(key.header, key.item);
} }
@ -183,10 +198,12 @@ auto Indexer::handleItem(const IndexKey::Header& header,
} }
} }
auto Index(locale::ICollator& c, auto Index(locale::ICollator& collator,
const IndexInfo& i, const IndexInfo& index,
const Track& t) -> std::vector<std::pair<IndexKey, std::string>> { const TrackData& data,
Indexer indexer{c, t, i}; const TrackTags& tags)
-> std::vector<std::pair<IndexKey, std::string>> {
Indexer indexer{collator, index, data, tags};
return indexer.index(); return indexer.index();
} }

@ -63,7 +63,8 @@ struct IndexKey {
auto Index(locale::ICollator&, auto Index(locale::ICollator&,
const IndexInfo&, const IndexInfo&,
const Track&) -> std::vector<std::pair<IndexKey, std::string>>; const TrackData&,
const TrackTags&) -> std::vector<std::pair<IndexKey, std::string>>;
auto ExpandHeader(const IndexKey::Header&, auto ExpandHeader(const IndexKey::Header&,
const std::optional<std::pmr::string>&) -> IndexKey::Header; const std::optional<std::pmr::string>&) -> IndexKey::Header;

@ -293,15 +293,4 @@ auto TrackTags::Hash() const -> uint64_t {
return komihash_stream_final(&stream); return komihash_stream_final(&stream);
} }
auto Track::TitleOrFilename() const -> std::pmr::string {
auto title = tags().title();
if (title) {
return *title;
}
auto start = data().filepath.find_last_of('/');
if (start == std::pmr::string::npos) {
return data().filepath;
}
return data().filepath.substr(start + 1);
}
} // namespace database } // namespace database

@ -195,8 +195,6 @@ class Track {
auto data() const -> const TrackData& { return *data_; } auto data() const -> const TrackData& { return *data_; }
auto tags() const -> const TrackTags& { return *tags_; } auto tags() const -> const TrackTags& { return *tags_; }
auto TitleOrFilename() const -> std::pmr::string;
private: private:
std::shared_ptr<const TrackData> data_; std::shared_ptr<const TrackData> data_;
std::shared_ptr<TrackTags> tags_; std::shared_ptr<TrackTags> tags_;

Loading…
Cancel
Save