Use a single pool of bg workers instead of separate tasks per use case

Also: bump the number of worker tasks up to 3 from 2!
This makes bg db updates + playback work :)
jacqueline 1 year ago
parent fde45dba39
commit dad159dc3c
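
For context, this commit replaces the per-use-case tasks::Worker instances with a single shared tasks::WorkerPool. Callers reach the pool (for example through ServiceLocator::bg_worker(), as in the hunks below) and schedule work with Dispatch, which returns a std::future. A minimal usage sketch; the include path and the example function are illustrative, not taken from the tree:

  #include <future>

  #include "tasks.hpp"  // assumed include path for tasks::WorkerPool

  // Hypothetical caller: anything holding a reference to the shared pool can
  // queue work onto one of its three background worker tasks.
  auto scan_in_background(tasks::WorkerPool& pool) -> std::future<int> {
    // Dispatch copies a pointer to the callable onto the pool's queue;
    // whichever worker picks it up fulfils the returned future.
    return pool.Dispatch<int>([]() -> int {
      // ... slow SD card or database work happens off the calling task ...
      return 42;
    });
  }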
14 changed files, with the number of changed lines in each:

  1. src/audio/fatfs_audio_input.cpp (2)
  2. src/audio/include/fatfs_audio_input.hpp (4)
  3. src/audio/include/readahead_source.hpp (4)
  4. src/audio/include/track_queue.hpp (4)
  5. src/audio/readahead_source.cpp (2)
  6. src/audio/track_queue.cpp (2)
  7. src/database/database.cpp (4)
  8. src/database/env_esp.cpp (2)
  9. src/database/include/database.hpp (2)
  10. src/database/include/env_esp.hpp (2)
  11. src/system_fsm/booting.cpp (3)
  12. src/system_fsm/include/service_locator.hpp (14)
  13. src/tasks/tasks.cpp (119)
  14. src/tasks/tasks.hpp (44)

src/audio/fatfs_audio_input.cpp
@@ -44,7 +44,7 @@
 namespace audio {

 FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser,
-                                 tasks::Worker& bg_worker)
+                                 tasks::WorkerPool& bg_worker)
     : IAudioSource(),
       tag_parser_(tag_parser),
       bg_worker_(bg_worker),

src/audio/include/fatfs_audio_input.hpp
@@ -31,7 +31,7 @@ namespace audio {
  */
 class FatfsAudioInput : public IAudioSource {
  public:
-  explicit FatfsAudioInput(database::ITagParser&, tasks::Worker&);
+  explicit FatfsAudioInput(database::ITagParser&, tasks::WorkerPool&);
   ~FatfsAudioInput();

   /*
@@ -55,7 +55,7 @@ class FatfsAudioInput : public IAudioSource {
       -> std::optional<codecs::StreamType>;

   database::ITagParser& tag_parser_;
-  tasks::Worker& bg_worker_;
+  tasks::WorkerPool& bg_worker_;

   std::mutex new_stream_mutex_;
   std::shared_ptr<TaggedStream> new_stream_;

src/audio/include/readahead_source.hpp
@@ -27,7 +27,7 @@ namespace audio {
  */
 class ReadaheadSource : public codecs::IStream {
  public:
-  ReadaheadSource(tasks::Worker&, std::unique_ptr<codecs::IStream>);
+  ReadaheadSource(tasks::WorkerPool&, std::unique_ptr<codecs::IStream>);
   ~ReadaheadSource();

   auto Read(cpp::span<std::byte> dest) -> ssize_t override;
@@ -48,7 +48,7 @@ class ReadaheadSource : public codecs::IStream {
  private:
   auto BeginReadahead() -> void;

-  tasks::Worker& worker_;
+  tasks::WorkerPool& worker_;
   std::unique_ptr<codecs::IStream> wrapped_;
   bool readahead_enabled_;

src/audio/include/track_queue.hpp
@@ -57,7 +57,7 @@ class RandomIterator {
  */
 class TrackQueue {
  public:
-  TrackQueue(tasks::Worker& bg_worker);
+  TrackQueue(tasks::WorkerPool& bg_worker);

   /* Returns the currently playing track. */
   auto current() const -> std::optional<database::TrackId>;
@@ -105,7 +105,7 @@ class TrackQueue {
  private:
   mutable std::shared_mutex mutex_;

-  tasks::Worker& bg_worker_;
+  tasks::WorkerPool& bg_worker_;

   size_t pos_;
   std::pmr::vector<database::TrackId> tracks_;

src/audio/readahead_source.cpp
@@ -27,7 +27,7 @@ namespace audio {
 static constexpr char kTag[] = "readahead";
 static constexpr size_t kBufferSize = 1024 * 512;

-ReadaheadSource::ReadaheadSource(tasks::Worker& worker,
+ReadaheadSource::ReadaheadSource(tasks::WorkerPool& worker,
                                  std::unique_ptr<codecs::IStream> wrapped)
     : IStream(wrapped->type()),
       worker_(worker),

src/audio/track_queue.cpp
@@ -74,7 +74,7 @@ auto notifyChanged(bool current_changed) -> void {
   events::Audio().Dispatch(ev);
 }

-TrackQueue::TrackQueue(tasks::Worker& bg_worker)
+TrackQueue::TrackQueue(tasks::WorkerPool& bg_worker)
     : mutex_(),
       bg_worker_(bg_worker),
       pos_(0),

src/database/database.cpp
@@ -126,14 +126,14 @@ static auto CheckDatabase(leveldb::DB& db, locale::ICollator& col) -> bool {
 auto Database::Open(IFileGatherer& gatherer,
                     ITagParser& parser,
                     locale::ICollator& collator,
-                    tasks::Worker& bg_worker)
+                    tasks::WorkerPool& bg_worker)
     -> cpp::result<Database*, DatabaseError> {
   if (sIsDbOpen.exchange(true)) {
     return cpp::fail(DatabaseError::ALREADY_OPEN);
   }

   if (!leveldb::sBackgroundThread) {
-    leveldb::sBackgroundThread = tasks::Worker::Start<tasks::Type::kDatabase>();
+    leveldb::sBackgroundThread = &bg_worker;
   }

   return bg_worker

src/database/env_esp.cpp
@@ -41,7 +41,7 @@
 namespace leveldb {

-tasks::Worker *sBackgroundThread = nullptr;
+tasks::WorkerPool *sBackgroundThread = nullptr;

 std::string ErrToStr(FRESULT err) {
   switch (err) {

src/database/include/database.hpp
@@ -56,7 +56,7 @@ class Database {
   static auto Open(IFileGatherer& file_gatherer,
                    ITagParser& tag_parser,
                    locale::ICollator& collator,
-                   tasks::Worker& bg_worker)
+                   tasks::WorkerPool& bg_worker)
       -> cpp::result<Database*, DatabaseError>;
   static auto Destroy() -> void;

src/database/include/env_esp.hpp
@@ -18,7 +18,7 @@
 namespace leveldb {

-extern tasks::Worker* sBackgroundThread;
+extern tasks::WorkerPool* sBackgroundThread;

 // Tracks the files locked by EspEnv::LockFile().
 //

src/system_fsm/booting.cpp
@@ -68,8 +68,7 @@ auto Booting::entry() -> void {
   }

   ESP_LOGI(kTag, "starting bg worker");
-  sServices->bg_worker(std::unique_ptr<tasks::Worker>{
-      tasks::Worker::Start<tasks::Type::kBackgroundWorker>()});
+  sServices->bg_worker(std::make_unique<tasks::WorkerPool>());

   ESP_LOGI(kTag, "installing remaining drivers");
   drivers::spiffs_mount();

src/system_fsm/include/service_locator.hpp
@@ -80,13 +80,9 @@ class ServiceLocator {
     touchwheel_ = std::move(i);
   }

-  auto haptics() -> drivers::Haptics& {
-    return *haptics_;
-  }
+  auto haptics() -> drivers::Haptics& { return *haptics_; }

-  auto haptics(std::unique_ptr<drivers::Haptics> i) {
-    haptics_ = std::move(i);
-  }
+  auto haptics(std::unique_ptr<drivers::Haptics> i) { haptics_ = std::move(i); }

   auto database() -> std::weak_ptr<database::Database> { return database_; }
@@ -121,12 +117,12 @@ class ServiceLocator {
     collator_ = std::move(i);
   }

-  auto bg_worker() -> tasks::Worker& {
+  auto bg_worker() -> tasks::WorkerPool& {
     assert(bg_worker_ != nullptr);
     return *bg_worker_;
   }

-  auto bg_worker(std::unique_ptr<tasks::Worker> w) -> void {
+  auto bg_worker(std::unique_ptr<tasks::WorkerPool> w) -> void {
     bg_worker_ = std::move(w);
   }
@@ -149,7 +145,7 @@ class ServiceLocator {
   std::unique_ptr<database::ITagParser> tag_parser_;
   std::unique_ptr<locale::ICollator> collator_;

-  std::unique_ptr<tasks::Worker> bg_worker_;
+  std::unique_ptr<tasks::WorkerPool> bg_worker_;

   drivers::SdState sd_;
 };

src/tasks/tasks.cpp
@@ -31,14 +31,6 @@ template <>
 auto Name<Type::kAudioConverter>() -> std::pmr::string {
   return "audio_conv";
 }
-template <>
-auto Name<Type::kDatabase>() -> std::pmr::string {
-  return "db_fg";
-}
-template <>
-auto Name<Type::kBackgroundWorker>() -> std::pmr::string {
-  return "bg_worker";
-}

 template <Type t>
 auto AllocateStack() -> cpp::span<StackType_t>;
@@ -68,14 +60,10 @@ auto AllocateStack<Type::kAudioConverter>() -> cpp::span<StackType_t> {
   static StackType_t sStack[size];
   return {sStack, size};
 }
-// Leveldb is designed for non-embedded use cases, where stack space isn't so
-// much of a concern. It therefore uses an eye-wateringly large amount of stack.
-template <>
-auto AllocateStack<Type::kDatabase>() -> cpp::span<StackType_t> {
-  std::size_t size = 256 * 1024;
-  return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_SPIRAM)),
-          size};
-}
+// Background workers receive huge stacks in PSRAM. This is mostly to faciliate
+// use of LevelDB from any bg worker; Leveldb is designed for non-embedded use
+// cases, where large stack usage isn't so much of a concern. It therefore uses
+// an eye-wateringly large amount of stack.
 template <>
 auto AllocateStack<Type::kBackgroundWorker>() -> cpp::span<StackType_t> {
   std::size_t size = 256 * 1024;
@@ -115,26 +103,10 @@ auto Priority<Type::kUi>() -> UBaseType_t {
 // couple of ms extra delay due to scheduling, so give this task the lowest
 // priority.
 template <>
-auto Priority<Type::kDatabase>() -> UBaseType_t {
-  return 2;
-}
-template <>
 auto Priority<Type::kBackgroundWorker>() -> UBaseType_t {
   return 1;
 }
-
-template <Type t>
-auto WorkerQueueSize() -> std::size_t;
-
-template <>
-auto WorkerQueueSize<Type::kDatabase>() -> std::size_t {
-  return 8;
-}
-
-template <>
-auto WorkerQueueSize<Type::kBackgroundWorker>() -> std::size_t {
-  return 8;
-}

 auto PersistentMain(void* fn) -> void {
   auto* function = reinterpret_cast<std::function<void(void)>*>(fn);
   std::invoke(*function);
@@ -142,69 +114,50 @@ auto PersistentMain(void* fn) -> void {
   vTaskDelete(NULL);
 }

-auto Worker::Main(void* instance) {
-  Worker* i = reinterpret_cast<Worker*>(instance);
+auto WorkerPool::Main(void* q) {
+  QueueHandle_t queue = reinterpret_cast<QueueHandle_t>(q);
   while (1) {
     WorkItem item;
-    if (xQueueReceive(i->queue_, &item, portMAX_DELAY)) {
-      if (item.quit) {
-        break;
-      } else if (item.fn != nullptr) {
-        std::invoke(*item.fn);
-        delete item.fn;
-      }
+    if (xQueueReceive(queue, &item, portMAX_DELAY)) {
+      std::invoke(*item);
+      delete item;
     }
   }
-
-  i->is_task_running_.store(false);
-  i->is_task_running_.notify_all();
-
-  // Wait for the instance's destructor to delete this task. We do this instead
-  // of just deleting ourselves so that it's 100% certain that it's safe to
-  // delete or reuse this task's stack.
-  while (1) {
-    vTaskDelay(portMAX_DELAY);
-  }
 }

-Worker::Worker(const std::pmr::string& name,
-               cpp::span<StackType_t> stack,
-               std::size_t queue_size,
-               UBaseType_t priority)
-    : stack_(stack.data()),
-      queue_(xQueueCreate(queue_size, sizeof(WorkItem))),
-      is_task_running_(true),
-      task_buffer_(static_cast<StaticTask_t*>(
-          heap_caps_malloc(sizeof(StaticTask_t),
-                           MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT))),
-      task_(xTaskCreateStatic(&Main,
-                              name.c_str(),
-                              stack.size(),
-                              this,
-                              priority,
-                              stack_,
-                              task_buffer_)) {}
+static constexpr size_t kNumWorkers = 3;
+static constexpr size_t kMaxPendingItems = 8;

-Worker::~Worker() {
-  WorkItem item{
-      .fn = nullptr,
-      .quit = true,
-  };
-  xQueueSend(queue_, &item, portMAX_DELAY);
-  is_task_running_.wait(true);
-  vTaskDelete(task_);
-  free(stack_);
+WorkerPool::WorkerPool()
+    : queue_(xQueueCreate(kMaxPendingItems, sizeof(WorkItem))) {
+  for (size_t i = 0; i < kNumWorkers; i++) {
+    auto stack = AllocateStack<Type::kBackgroundWorker>();
+    // Task buffers must be in internal ram. Thankfully they're fairly small.
+    auto buffer = reinterpret_cast<StaticTask_t*>(heap_caps_malloc(
+        sizeof(StaticTask_t), MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT));
+
+    std::string name = "worker_" + std::to_string(i);
+
+    xTaskCreateStatic(&Main, name.c_str(), stack.size(), queue_,
+                      Priority<Type::kBackgroundWorker>(), stack.data(),
+                      buffer);
+  }
+}
+
+WorkerPool::~WorkerPool() {
+  // This should never happen!
+  assert("worker pool destroyed" == 0);
 }

 template <>
-auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void> {
+auto WorkerPool::Dispatch(const std::function<void(void)> fn)
+    -> std::future<void> {
   std::shared_ptr<std::promise<void>> promise =
       std::make_shared<std::promise<void>>();

-  WorkItem item{
-      .fn = new std::function<void(void)>([=]() {
-        std::invoke(fn);
-        promise->set_value();
-      }),
-      .quit = false,
-  };
+  WorkItem item = new std::function<void(void)>([=]() {
+    std::invoke(fn);
+    promise->set_value();
+  });
   xQueueSend(queue_, &item, portMAX_DELAY);
   return promise->get_future();
 }

src/tasks/tasks.hpp
@@ -48,8 +48,6 @@ template <Type t>
 auto AllocateStack() -> cpp::span<StackType_t>;

 template <Type t>
 auto Priority() -> UBaseType_t;

-template <Type t>
-auto WorkerQueueSize() -> std::size_t;
-
 auto PersistentMain(void* fn) -> void;
@@ -74,32 +72,15 @@ auto StartPersistent(BaseType_t core, const std::function<void(void)>& fn)
                     Priority<t>(), stack.data(), task_buffer, core);
 }

-class Worker {
+class WorkerPool {
  private:
-  Worker(const std::pmr::string& name,
-         cpp::span<StackType_t> stack,
-         std::size_t queue_size,
-         UBaseType_t priority);
-
-  StackType_t* stack_;
   QueueHandle_t queue_;
-  std::atomic<bool> is_task_running_;
-  StaticTask_t *task_buffer_;
-  TaskHandle_t task_;
-
-  struct WorkItem {
-    std::function<void(void)>* fn;
-    bool quit;
-  };
+
+  using WorkItem = std::function<void(void)>*;
+  static auto Main(void* instance);

  public:
-  template <Type t>
-  static auto Start() -> Worker* {
-    return new Worker(Name<t>(), AllocateStack<t>(), WorkerQueueSize<t>(),
-                      Priority<t>());
-  }
-
-  static auto Main(void* instance);
+  WorkerPool();
+  ~WorkerPool();

   /*
    * Schedules the given function to be executed on the worker task, and
@@ -109,22 +90,19 @@ class Worker {
   auto Dispatch(const std::function<T(void)> fn) -> std::future<T> {
     std::shared_ptr<std::promise<T>> promise =
         std::make_shared<std::promise<T>>();

-    WorkItem item{
-        .fn = new std::function([=]() { promise->set_value(std::invoke(fn)); }),
-        .quit = false,
-    };
+    WorkItem item =
+        new std::function([=]() { promise->set_value(std::invoke(fn)); });
     xQueueSend(queue_, &item, portMAX_DELAY);
     return promise->get_future();
   }

-  ~Worker();
-
-  Worker(const Worker&) = delete;
-  Worker& operator=(const Worker&) = delete;
+  WorkerPool(const WorkerPool&) = delete;
+  WorkerPool& operator=(const WorkerPool&) = delete;
 };

 /* Specialisation of Evaluate for functions that return nothing. */
 template <>
-auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void>;
+auto WorkerPool::Dispatch(const std::function<void(void)> fn)
+    -> std::future<void>;

 } // namespace tasks
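
A note on the queueing design shown above: since FreeRTOS queues copy fixed-size items by value, the WorkItem that travels through the queue is just a raw pointer to a heap-allocated std::function; whichever worker receives it invokes the callable and then deletes it. A hedged sketch of the void specialisation from a caller's point of view (the helper name is illustrative, not from the tree):

  #include "tasks.hpp"  // assumed include path for tasks::WorkerPool

  // Hypothetical helper: queue some work and decide whether to wait for it.
  auto flush_later(tasks::WorkerPool& pool) -> void {
    // The void specialisation still returns a future, so a caller can block
    // until one of the workers has actually run the item...
    auto done = pool.Dispatch<void>([]() {
      // ... e.g. write buffered state out to storage ...
    });
    done.wait();

    // ...or simply drop the future for fire-and-forget behaviour.
    pool.Dispatch<void>([]() { /* best-effort cleanup */ });
  }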
