Make lua db iterators async

custom
jacqueline 1 year ago
parent 5afdb89f9a
commit cd46d7bd20
  1. 54
      src/database/database.cpp
  2. 14
      src/database/include/database.hpp
  3. 17
      src/lua/lua_database.cpp

@ -858,7 +858,7 @@ auto IndexRecord::Expand(std::size_t page_size) const
}
Iterator::Iterator(std::weak_ptr<Database> db, const IndexInfo& idx)
: db_(db), prev_pos_(), current_pos_() {
: db_(db), pos_mutex_(), current_pos_(), prev_pos_() {
std::string prefix = EncodeIndexPrefix(
IndexKey::Header{.id = idx.id, .depth = 0, .components_hash = 0});
current_pos_ = Continuation{.prefix = {prefix.data(), prefix.size()},
@ -869,36 +869,50 @@ Iterator::Iterator(std::weak_ptr<Database> db, const IndexInfo& idx)
}
Iterator::Iterator(std::weak_ptr<Database> db, const Continuation& c)
: db_(db), prev_pos_(), current_pos_(c) {}
: db_(db), pos_mutex_(), current_pos_(c), prev_pos_() {}
auto Iterator::Prev() -> std::optional<IndexRecord> {
if (!prev_pos_) {
return {};
}
auto Iterator::Next(Callback cb) -> void {
auto db = db_.lock();
if (!db) {
return {};
InvokeNull(cb);
return;
}
db->worker_task_->Dispatch<void>([=]() {
std::lock_guard lock{pos_mutex_};
if (!current_pos_) {
InvokeNull(cb);
return;
}
std::unique_ptr<Result<IndexRecord>> res{
db->GetPage<IndexRecord>(&*prev_pos_).get()};
prev_pos_ = res->prev_page();
current_pos_ = prev_pos_;
return *res->values()[0];
db->dbGetPage<IndexRecord>(*current_pos_)};
prev_pos_ = current_pos_;
current_pos_ = res->next_page();
std::invoke(cb, *res->values()[0]);
});
}
auto Iterator::Next() -> std::optional<IndexRecord> {
if (!current_pos_) {
return {};
}
auto Iterator::Prev(Callback cb) -> void {
auto db = db_.lock();
if (!db) {
return {};
InvokeNull(cb);
return;
}
db->worker_task_->Dispatch<void>([=]() {
std::lock_guard lock{pos_mutex_};
if (!prev_pos_) {
InvokeNull(cb);
return;
}
std::unique_ptr<Result<IndexRecord>> res{
db->GetPage<IndexRecord>(&*current_pos_).get()};
prev_pos_ = current_pos_;
current_pos_ = res->next_page();
return *res->values()[0];
db->dbGetPage<IndexRecord>(*current_pos_)};
current_pos_ = prev_pos_;
prev_pos_ = res->prev_page();
std::invoke(cb, *res->values()[0]);
});
}
auto Iterator::InvokeNull(Callback cb) -> void {
std::invoke(cb, std::optional<IndexRecord>{});
}
} // namespace database

@ -129,6 +129,8 @@ class Database {
Database& operator=(const Database&) = delete;
private:
friend class Iterator;
// Owned. Dumb pointers because destruction needs to be done in an explicit
// order.
leveldb::DB* db_;
@ -191,13 +193,19 @@ class Iterator {
Iterator(std::weak_ptr<Database>, const IndexInfo&);
Iterator(std::weak_ptr<Database>, const Continuation&);
auto Prev() -> std::optional<IndexRecord>;
auto Next() -> std::optional<IndexRecord>;
using Callback = std::function<void(std::optional<IndexRecord>)>;
auto Next(Callback) -> void;
auto Prev(Callback) -> void;
private:
auto InvokeNull(Callback) -> void;
std::weak_ptr<Database> db_;
std::optional<Continuation> prev_pos_;
std::mutex pos_mutex_;
std::optional<Continuation> current_pos_;
std::optional<Continuation> prev_pos_;
};
} // namespace database

@ -56,17 +56,28 @@ static const struct luaL_Reg kDatabaseFuncs[] = {{"indexes", indexes},
{NULL, NULL}};
static auto db_iterate(lua_State* state) -> int {
luaL_checktype(state, 1, LUA_TFUNCTION);
int callback_ref = luaL_ref(state, LUA_REGISTRYINDEX);
database::Iterator* it = *reinterpret_cast<database::Iterator**>(
lua_touserdata(state, lua_upvalueindex(1)));
auto res = it->Next();
it->Next([=](std::optional<database::IndexRecord> res) {
events::Ui().RunOnTask([=]() {
lua_rawgeti(state, LUA_REGISTRYINDEX, callback_ref);
if (res) {
database::IndexRecord** record = reinterpret_cast<database::IndexRecord**>(
database::IndexRecord** record =
reinterpret_cast<database::IndexRecord**>(
lua_newuserdata(state, sizeof(uintptr_t)));
*record = new database::IndexRecord(*res);
luaL_setmetatable(state, kDbRecordMetatable);
return 1;
} else {
lua_pushnil(state);
}
lua_call(state, 1, 0);
luaL_unref(state, LUA_REGISTRYINDEX, callback_ref);
});
});
return 0;
}

Loading…
Cancel
Save