From 5ac4d3949cd7430e0d4c994bbc528e8e4fa91337 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Mon, 22 May 2023 15:23:51 +1000 Subject: [PATCH] Generalise worker tasks, and centralise task priorities + stacks Includes making the display driver use a worker task for flushes, so that our double buffering actually does something useful /facepalm --- src/audio/audio_task.cpp | 236 +++++++++++++----------------- src/audio/include/audio_task.hpp | 14 -- src/database/CMakeLists.txt | 4 +- src/database/database.cpp | 88 +++++------ src/database/db_task.cpp | 91 ------------ src/database/env_esp.cpp | 11 +- src/database/include/database.hpp | 6 +- src/database/include/db_task.hpp | 25 ---- src/database/include/env_esp.hpp | 5 + src/drivers/CMakeLists.txt | 2 +- src/drivers/display.cpp | 82 ++++++++--- src/drivers/include/display.hpp | 3 + src/main/main.cpp | 4 +- src/tasks/CMakeLists.txt | 2 +- src/tasks/tasks.cpp | 205 +++++++++++++++++++++++++- src/tasks/tasks.hpp | 106 +++++++++++++- src/ui/include/lvgl_task.hpp | 4 +- src/ui/lvgl_task.cpp | 44 +----- 18 files changed, 542 insertions(+), 390 deletions(-) delete mode 100644 src/database/db_task.cpp delete mode 100644 src/database/include/db_task.hpp diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index b2a8062e..f0128bf3 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -35,157 +35,119 @@ namespace audio { namespace task { static const char* kTag = "task"; -static const std::size_t kStackSize = 24 * 1024; -static const std::size_t kDrainStackSize = 1024; -auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void { - // Newly created task will free this. - AudioTaskArgs* args = new AudioTaskArgs{.pipeline = pipeline, .sink = sink}; +void AudioTaskMain(std::unique_ptr pipeline, IAudioSink* sink) { + std::optional output_format; + + std::vector elements = pipeline->GetIterationOrder(); + std::size_t max_inputs = + (*std::max_element(elements.begin(), elements.end(), + [](Pipeline const* first, Pipeline const* second) { + return first->NumInputs() < second->NumInputs(); + })) + ->NumInputs(); + + // We need to be able to simultaneously map all of an element's inputs, plus + // its output. So preallocate that many ranges. + std::vector> in_regions(max_inputs); + MappableRegion out_region; + std::for_each(in_regions.begin(), in_regions.end(), + [](const auto& region) { assert(region.is_valid); }); + assert(out_region.is_valid); + + // Each element has exactly one output buffer. + std::vector> buffers(elements.size()); + std::vector buffer_infos(buffers.size()); + std::for_each(buffers.begin(), buffers.end(), + [](const HimemAlloc& alloc) { + assert(alloc.is_valid); + }); + + while (1) { + for (int i = 0; i < elements.size(); i++) { + std::vector raw_in_streams; + elements.at(i)->InStreams(&in_regions, &raw_in_streams); + RawStream raw_out_stream = elements.at(i)->OutStream(&out_region); + + // Crop the input and output streams to the ranges that are safe to + // touch. For the input streams, this is the region that contains + // data. For the output stream, this is the region that does *not* + // already contain data. + std::vector in_streams; + std::for_each(raw_in_streams.begin(), raw_in_streams.end(), + [&](RawStream& s) { in_streams.emplace_back(&s); }); + OutputStream out_stream(&raw_out_stream); + + elements.at(i)->OutputElement()->Process(in_streams, &out_stream); + + std::for_each(in_regions.begin(), in_regions.end(), + [](auto&& r) { r.Unmap(); }); + out_region.Unmap(); + } - ESP_LOGI(kTag, "starting audio pipeline task"); - xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args, - kTaskPriorityAudioPipeline, NULL, 1); -} + RawStream raw_sink_stream = elements.front()->OutStream(&out_region); + InputStream sink_stream(&raw_sink_stream); -auto StartDrain(IAudioSink* sink) -> void { - auto command = new std::atomic(PLAY); - // Newly created task will free this. - AudioDrainArgs* drain_args = new AudioDrainArgs{ - .sink = sink, - .command = command, - }; + if (sink_stream.info().bytes_in_stream == 0) { + out_region.Unmap(); + vTaskDelay(pdMS_TO_TICKS(100)); + continue; + } - ESP_LOGI(kTag, "starting audio drain task"); - xTaskCreate(&AudioDrainMain, "drain", kDrainStackSize, drain_args, - kTaskPriorityAudioDrain, NULL); -} + if (!output_format || output_format != sink_stream.info().format) { + // The format of the stream within the sink stream has changed. We + // need to reconfigure the sink, but shouldn't do so until we've fully + // drained the current buffer. + if (xStreamBufferIsEmpty(sink->buffer())) { + ESP_LOGI(kTag, "reconfiguring dac"); + output_format = sink_stream.info().format; + sink->Configure(*output_format); + } + } -void AudioTaskMain(void* args) { - // Nest the body within an additional scope to ensure that destructors are - // called before the task quits. - { - AudioTaskArgs* real_args = reinterpret_cast(args); - std::unique_ptr pipeline(real_args->pipeline); - IAudioSink* sink = real_args->sink; - delete real_args; - - std::optional output_format; - - std::vector elements = pipeline->GetIterationOrder(); - std::size_t max_inputs = - (*std::max_element(elements.begin(), elements.end(), - [](Pipeline const* first, Pipeline const* second) { - return first->NumInputs() < second->NumInputs(); - })) - ->NumInputs(); - - // We need to be able to simultaneously map all of an element's inputs, plus - // its output. So preallocate that many ranges. - std::vector> in_regions(max_inputs); - MappableRegion out_region; - std::for_each(in_regions.begin(), in_regions.end(), - [](const auto& region) { assert(region.is_valid); }); - assert(out_region.is_valid); - - // Each element has exactly one output buffer. - std::vector> buffers(elements.size()); - std::vector buffer_infos(buffers.size()); - std::for_each(buffers.begin(), buffers.end(), - [](const HimemAlloc& alloc) { - assert(alloc.is_valid); - }); - - bool playing = true; - bool quit = false; - while (!quit) { - if (playing) { - for (int i = 0; i < elements.size(); i++) { - std::vector raw_in_streams; - elements.at(i)->InStreams(&in_regions, &raw_in_streams); - RawStream raw_out_stream = elements.at(i)->OutStream(&out_region); - - // Crop the input and output streams to the ranges that are safe to - // touch. For the input streams, this is the region that contains - // data. For the output stream, this is the region that does *not* - // already contain data. - std::vector in_streams; - std::for_each(raw_in_streams.begin(), raw_in_streams.end(), - [&](RawStream& s) { in_streams.emplace_back(&s); }); - OutputStream out_stream(&raw_out_stream); - - elements.at(i)->OutputElement()->Process(in_streams, &out_stream); - - std::for_each(in_regions.begin(), in_regions.end(), - [](auto&& r) { r.Unmap(); }); - out_region.Unmap(); - } - - RawStream raw_sink_stream = elements.front()->OutStream(&out_region); - InputStream sink_stream(&raw_sink_stream); - - if (sink_stream.info().bytes_in_stream == 0) { - out_region.Unmap(); - vTaskDelay(pdMS_TO_TICKS(100)); - continue; - } - - if (!output_format || output_format != sink_stream.info().format) { - // The format of the stream within the sink stream has changed. We - // need to reconfigure the sink, but shouldn't do so until we've fully - // drained the current buffer. - if (xStreamBufferIsEmpty(sink->buffer())) { - ESP_LOGI(kTag, "reconfiguring dac"); - output_format = sink_stream.info().format; - sink->Configure(*output_format); - } - } - - // We've reconfigured the sink, or it was already configured correctly. - // Send through some data. - if (output_format == sink_stream.info().format && - !std::holds_alternative(*output_format)) { - // TODO: tune the delay on this, as it's currently the only way to - // throttle this task's CPU time. Maybe also hold off on the pipeline - // if the buffer is already close to full? - std::size_t sent = xStreamBufferSend( - sink->buffer(), sink_stream.data().data(), - sink_stream.data().size_bytes(), pdMS_TO_TICKS(10)); - if (sent > 0) { - ESP_LOGI(kTag, "sunk %u bytes out of %u (%d %%)", sent, - sink_stream.info().bytes_in_stream, - (int)(((float)sent / - (float)sink_stream.info().bytes_in_stream) * - 100)); - } - sink_stream.consume(sent); - } - - out_region.Unmap(); + // We've reconfigured the sink, or it was already configured correctly. + // Send through some data. + if (output_format == sink_stream.info().format && + !std::holds_alternative(*output_format)) { + std::size_t sent = + xStreamBufferSend(sink->buffer(), sink_stream.data().data(), + sink_stream.data().size_bytes(), 0); + if (sent > 0) { + ESP_LOGI( + kTag, "sunk %u bytes out of %u (%d %%)", sent, + sink_stream.info().bytes_in_stream, + (int)(((float)sent / (float)sink_stream.info().bytes_in_stream) * + 100)); } + sink_stream.consume(sent); } + + out_region.Unmap(); } - vTaskDelete(NULL); } static std::byte sDrainBuf[8 * 1024]; -void AudioDrainMain(void* args) { - { - AudioDrainArgs* real_args = reinterpret_cast(args); - IAudioSink* sink = real_args->sink; - std::atomic* command = real_args->command; - delete real_args; - - // TODO(jacqueline): implement PAUSE without busy-waiting. - while (*command != QUIT) { - std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf, - sizeof(sDrainBuf), portMAX_DELAY); - if (len > 0) { - sink->Send({sDrainBuf, len}); - } +void AudioDrainMain(IAudioSink* sink) { + while (1) { + std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf, + sizeof(sDrainBuf), portMAX_DELAY); + if (len > 0) { + sink->Send({sDrainBuf, len}); } } - vTaskDelete(NULL); +} + +auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void { + ESP_LOGI(kTag, "starting audio pipeline task"); + tasks::StartPersistent( + [=]() { AudioTaskMain(std::unique_ptr(pipeline), sink); }); +} + +auto StartDrain(IAudioSink* sink) -> void { + ESP_LOGI(kTag, "starting audio drain task"); + tasks::StartPersistent( + [=]() { AudioDrainMain(sink); }); } } // namespace task diff --git a/src/audio/include/audio_task.hpp b/src/audio/include/audio_task.hpp index a7b7a0fa..8269c8d4 100644 --- a/src/audio/include/audio_task.hpp +++ b/src/audio/include/audio_task.hpp @@ -15,20 +15,6 @@ namespace audio { namespace task { -enum Command { PLAY, PAUSE, QUIT }; - -struct AudioTaskArgs { - Pipeline* pipeline; - IAudioSink* sink; -}; -struct AudioDrainArgs { - IAudioSink* sink; - std::atomic* command; -}; - -extern "C" void AudioTaskMain(void* args); -extern "C" void AudioDrainMain(void* args); - auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void; auto StartDrain(IAudioSink* sink) -> void; diff --git a/src/database/CMakeLists.txt b/src/database/CMakeLists.txt index 897bf029..c769fa33 100644 --- a/src/database/CMakeLists.txt +++ b/src/database/CMakeLists.txt @@ -1,7 +1,7 @@ idf_component_register( - SRCS "env_esp.cpp" "database.cpp" "song.cpp" "db_task.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp" + SRCS "env_esp.cpp" "database.cpp" "song.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp" INCLUDE_DIRS "include" - REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor") + REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor" "tasks") target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/database/database.cpp b/src/database/database.cpp index f5fe5240..65a500d9 100644 --- a/src/database/database.cpp +++ b/src/database/database.cpp @@ -18,13 +18,13 @@ #include "leveldb/slice.h" #include "leveldb/write_batch.h" -#include "db_task.hpp" #include "env_esp.hpp" #include "file_gatherer.hpp" #include "records.hpp" #include "result.hpp" #include "song.hpp" #include "tag_parser.hpp" +#include "tasks.hpp" namespace database { @@ -62,33 +62,33 @@ auto Database::Open(IFileGatherer* gatherer, ITagParser* parser) return cpp::fail(DatabaseError::ALREADY_OPEN); } - if (!StartDbTask()) { - return cpp::fail(DatabaseError::ALREADY_OPEN); - } - - return RunOnDbTask>( - [=]() -> cpp::result { - leveldb::DB* db; - leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024); - leveldb::Options options; - options.env = sEnv.env(); - options.create_if_missing = true; - options.write_buffer_size = 48 * 1024; - options.max_file_size = 32; - options.block_cache = cache; - options.block_size = 512; - - auto status = leveldb::DB::Open(options, "/.db", &db); - if (!status.ok()) { - delete cache; - ESP_LOGE(kTag, "failed to open db, status %s", - status.ToString().c_str()); - return cpp::fail(FAILED_TO_OPEN); - } - - ESP_LOGI(kTag, "Database opened successfully"); - return new Database(db, cache, gatherer, parser); - }) + std::shared_ptr worker( + tasks::Worker::Start()); + leveldb::sBackgroundThread = std::weak_ptr(worker); + return worker + ->Dispatch>( + [&]() -> cpp::result { + leveldb::DB* db; + leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024); + leveldb::Options options; + options.env = sEnv.env(); + options.create_if_missing = true; + options.write_buffer_size = 48 * 1024; + options.max_file_size = 32; + options.block_cache = cache; + options.block_size = 512; + + auto status = leveldb::DB::Open(options, "/.db", &db); + if (!status.ok()) { + delete cache; + ESP_LOGE(kTag, "failed to open db, status %s", + status.ToString().c_str()); + return cpp::fail(FAILED_TO_OPEN); + } + + ESP_LOGI(kTag, "Database opened successfully"); + return new Database(db, cache, gatherer, parser, worker); + }) .get(); } @@ -101,9 +101,11 @@ auto Database::Destroy() -> void { Database::Database(leveldb::DB* db, leveldb::Cache* cache, IFileGatherer* file_gatherer, - ITagParser* tag_parser) + ITagParser* tag_parser, + std::shared_ptr worker) : db_(db), cache_(cache), + worker_task_(worker), file_gatherer_(file_gatherer), tag_parser_(tag_parser) {} @@ -113,12 +115,13 @@ Database::~Database() { delete db_; delete cache_; - QuitDbTask(); + leveldb::sBackgroundThread = std::weak_ptr(); + sIsDbOpen.store(false); } auto Database::Update() -> std::future { - return RunOnDbTask([&]() -> void { + return worker_task_->Dispatch([&]() -> void { // Stage 1: verify all existing songs are still valid. ESP_LOGI(kTag, "verifying existing songs"); const leveldb::Snapshot* snapshot = db_->GetSnapshot(); @@ -219,7 +222,7 @@ auto Database::Update() -> std::future { } auto Database::GetSongs(std::size_t page_size) -> std::future*> { - return RunOnDbTask*>([=, this]() -> Result* { + return worker_task_->Dispatch*>([=, this]() -> Result* { Continuation c{.iterator = nullptr, .prefix = CreateDataPrefix().data, .start_key = CreateDataPrefix().data, @@ -232,21 +235,22 @@ auto Database::GetSongs(std::size_t page_size) -> std::future*> { auto Database::GetDump(std::size_t page_size) -> std::future*> { - return RunOnDbTask*>([=, this]() -> Result* { - Continuation c{.iterator = nullptr, - .prefix = "", - .start_key = "", - .forward = true, - .was_prev_forward = true, - .page_size = page_size}; - return dbGetPage(c); - }); + return worker_task_->Dispatch*>( + [=, this]() -> Result* { + Continuation c{.iterator = nullptr, + .prefix = "", + .start_key = "", + .forward = true, + .was_prev_forward = true, + .page_size = page_size}; + return dbGetPage(c); + }); } template auto Database::GetPage(Continuation* c) -> std::future*> { Continuation copy = *c; - return RunOnDbTask*>( + return worker_task_->Dispatch*>( [=, this]() -> Result* { return dbGetPage(copy); }); } diff --git a/src/database/db_task.cpp b/src/database/db_task.cpp deleted file mode 100644 index 5b4b34b5..00000000 --- a/src/database/db_task.cpp +++ /dev/null @@ -1,91 +0,0 @@ -#include "db_task.hpp" - -#include - -#include "esp_heap_caps.h" -#include "freertos/FreeRTOS.h" -#include "freertos/portmacro.h" -#include "freertos/projdefs.h" -#include "freertos/queue.h" -#include "freertos/task.h" - -namespace database { - -static const std::size_t kDbStackSize = 256 * 1024; -static StaticTask_t sDbStaticTask; -static StackType_t* sDbStack = nullptr; - -static std::atomic sTaskRunning(false); -static QueueHandle_t sWorkQueue; - -struct WorkItem { - std::function* fn; - bool quit; -}; - -auto SendToDbTask(std::function fn) -> void { - WorkItem item{ - .fn = new std::function(fn), - .quit = false, - }; - xQueueSend(sWorkQueue, &item, portMAX_DELAY); -} - -template <> -auto RunOnDbTask(std::function fn) -> std::future { - std::shared_ptr> promise = - std::make_shared>(); - SendToDbTask([=]() { - std::invoke(fn); - promise->set_value(); - }); - return promise->get_future(); -} - -void DatabaseTaskMain(void* args) { - while (true) { - WorkItem item; - if (xQueueReceive(sWorkQueue, &item, portMAX_DELAY)) { - if (item.fn != nullptr) { - std::invoke(*item.fn); - delete item.fn; - } - if (item.quit) { - break; - } - } - } - vQueueDelete(sWorkQueue); - sTaskRunning.store(false); - vTaskDelete(NULL); -} - -auto StartDbTask() -> bool { - if (sTaskRunning.exchange(true)) { - return false; - } - if (sDbStack == nullptr) { - sDbStack = reinterpret_cast( - heap_caps_malloc(kDbStackSize, MALLOC_CAP_SPIRAM)); - } - sWorkQueue = xQueueCreate(8, sizeof(WorkItem)); - xTaskCreateStatic(&DatabaseTaskMain, "DB", kDbStackSize, NULL, 1, sDbStack, - &sDbStaticTask); - return true; -} - -auto QuitDbTask() -> void { - if (!sTaskRunning.load()) { - return; - } - WorkItem item{ - .fn = nullptr, - .quit = true, - }; - xQueueSend(sWorkQueue, &item, portMAX_DELAY); - while (sTaskRunning.load()) { - vTaskDelay(pdMS_TO_TICKS(1)); - } -} - -} // namespace database diff --git a/src/database/env_esp.cpp b/src/database/env_esp.cpp index 71d4fcea..3bc68984 100644 --- a/src/database/env_esp.cpp +++ b/src/database/env_esp.cpp @@ -29,10 +29,12 @@ #include "leveldb/slice.h" #include "leveldb/status.h" -#include "db_task.hpp" +#include "tasks.hpp" namespace leveldb { +std::weak_ptr sBackgroundThread; + std::string ErrToStr(FRESULT err) { switch (err) { case FR_OK: @@ -455,8 +457,11 @@ EspEnv::EspEnv() {} void EspEnv::Schedule( void (*background_work_function)(void* background_work_arg), void* background_work_arg) { - database::SendToDbTask( - [=]() { std::invoke(background_work_function, background_work_arg); }); + auto worker = sBackgroundThread.lock(); + if (worker) { + worker->Dispatch( + [=]() { std::invoke(background_work_function, background_work_arg); }); + } } } // namespace leveldb diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp index da0ed083..ce2ca9fe 100644 --- a/src/database/include/database.hpp +++ b/src/database/include/database.hpp @@ -19,6 +19,7 @@ #include "result.hpp" #include "song.hpp" #include "tag_parser.hpp" +#include "tasks.hpp" namespace database { @@ -90,6 +91,8 @@ class Database { leveldb::DB* db_; leveldb::Cache* cache_; + std::shared_ptr worker_task_; + // Not owned. IFileGatherer* file_gatherer_; ITagParser* tag_parser_; @@ -97,7 +100,8 @@ class Database { Database(leveldb::DB* db, leveldb::Cache* cache, IFileGatherer* file_gatherer, - ITagParser* tag_parser); + ITagParser* tag_parser, + std::shared_ptr worker); auto dbMintNewSongId() -> SongId; auto dbEntomb(SongId song, uint64_t hash) -> void; diff --git a/src/database/include/db_task.hpp b/src/database/include/db_task.hpp deleted file mode 100644 index 39f932b0..00000000 --- a/src/database/include/db_task.hpp +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace database { - -auto StartDbTask() -> bool; -auto QuitDbTask() -> void; - -auto SendToDbTask(std::function fn) -> void; - -template -auto RunOnDbTask(std::function fn) -> std::future { - std::shared_ptr> promise = - std::make_shared>(); - SendToDbTask([=]() { promise->set_value(std::invoke(fn)); }); - return promise->get_future(); -} - -template <> -auto RunOnDbTask(std::function fn) -> std::future; - -} // namespace database diff --git a/src/database/include/env_esp.hpp b/src/database/include/env_esp.hpp index cf5a20e1..6a415ce6 100644 --- a/src/database/include/env_esp.hpp +++ b/src/database/include/env_esp.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -7,8 +8,12 @@ #include "leveldb/env.h" #include "leveldb/status.h" +#include "tasks.hpp" + namespace leveldb { +extern std::weak_ptr sBackgroundThread; + // Tracks the files locked by EspEnv::LockFile(). // // We maintain a separate set instead of relying on fcntl(F_SETLK) because diff --git a/src/drivers/CMakeLists.txt b/src/drivers/CMakeLists.txt index 413e1ea0..30b002a0 100644 --- a/src/drivers/CMakeLists.txt +++ b/src/drivers/CMakeLists.txt @@ -2,5 +2,5 @@ idf_component_register( SRCS "touchwheel.cpp" "dac.cpp" "gpio_expander.cpp" "battery.cpp" "storage.cpp" "i2c.cpp" "spi.cpp" "display.cpp" "display_init.cpp" "driver_cache.cpp" "samd.cpp" INCLUDE_DIRS "include" - REQUIRES "esp_adc" "fatfs" "result" "lvgl" "span") + REQUIRES "esp_adc" "fatfs" "result" "lvgl" "span" "tasks") target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/drivers/display.cpp b/src/drivers/display.cpp index f8594a5a..15c0af75 100644 --- a/src/drivers/display.cpp +++ b/src/drivers/display.cpp @@ -21,6 +21,7 @@ #include "display_init.hpp" #include "gpio_expander.hpp" #include "soc/soc.h" +#include "tasks.hpp" static const char* kTag = "DISPLAY"; @@ -49,6 +50,19 @@ static const int kDisplayBufferSize = (kDisplayWidth * kDisplayHeight) / 10; DMA_ATTR static lv_color_t sBuffer1[kDisplayBufferSize]; DMA_ATTR static lv_color_t sBuffer2[kDisplayBufferSize]; +struct RenderTaskArgs { + std::atomic* quit; + QueueHandle_t work_queue; +}; + +struct FlushArgs { + lv_disp_drv_t* driver; + const lv_area_t* area; + lv_color_t* color_map; +}; + +void RenderMain(void* raw_args); + namespace drivers { /* @@ -138,7 +152,9 @@ auto Display::create(GpioExpander* expander, } Display::Display(GpioExpander* gpio, spi_device_handle_t handle) - : gpio_(gpio), handle_(handle) {} + : gpio_(gpio), + handle_(handle), + worker_task_(tasks::Worker::Start()) {} Display::~Display() {} @@ -225,31 +241,51 @@ void Display::SendTransaction(TransactionType type, void Display::OnLvglFlush(lv_disp_drv_t* disp_drv, const lv_area_t* area, lv_color_t* color_map) { - // Ideally we want to complete a single flush as quickly as possible, so grab - // the bus for this entire transaction sequence. - spi_device_acquire_bus(handle_, portMAX_DELAY); - - // First we need to specify the rectangle of the display we're writing into. - uint16_t data[2] = {0, 0}; - - data[0] = SPI_SWAP_DATA_TX(area->x1, 16); - data[1] = SPI_SWAP_DATA_TX(area->x2, 16); - SendCommandWithData(displays::ST77XX_CASET, reinterpret_cast(data), - 4); - - data[0] = SPI_SWAP_DATA_TX(area->y1, 16); - data[1] = SPI_SWAP_DATA_TX(area->y2, 16); - SendCommandWithData(displays::ST77XX_RASET, reinterpret_cast(data), - 4); + // area is stack-allocated, so it isn't safe to reference from the flush + // thread. + lv_area_t area_copy = *area; + worker_task_->Dispatch([=, this]() { + // Ideally we want to complete a single flush as quickly as possible, so + // grab the bus for this entire transaction sequence. + spi_device_acquire_bus(handle_, portMAX_DELAY); + + // First we need to specify the rectangle of the display we're writing into. + uint16_t data[2] = {0, 0}; + + data[0] = SPI_SWAP_DATA_TX(area_copy.x1, 16); + data[1] = SPI_SWAP_DATA_TX(area_copy.x2, 16); + SendCommandWithData(displays::ST77XX_CASET, + reinterpret_cast(data), 4); + + data[0] = SPI_SWAP_DATA_TX(area_copy.y1, 16); + data[1] = SPI_SWAP_DATA_TX(area_copy.y2, 16); + SendCommandWithData(displays::ST77XX_RASET, + reinterpret_cast(data), 4); + + // Now send the pixels for this region. + uint32_t size = lv_area_get_width(area) * lv_area_get_height(area); + SendCommandWithData(displays::ST77XX_RAMWR, + reinterpret_cast(color_map), size * 2); + + spi_device_release_bus(handle_); + + lv_disp_flush_ready(&driver_); + }); +} - // Now send the pixels for this region. - uint32_t size = lv_area_get_width(area) * lv_area_get_height(area); - SendCommandWithData(displays::ST77XX_RAMWR, - reinterpret_cast(color_map), size * 2); +void RenderMain(void* raw_args) { + RenderTaskArgs* args = reinterpret_cast(raw_args); + QueueHandle_t queue = args->work_queue; + std::atomic* quit = args->quit; + delete args; - spi_device_release_bus(handle_); + while (!quit->load()) { + // TODO: flush data here! Yay speed. + } - lv_disp_flush_ready(&driver_); + vQueueDelete(queue); + delete quit; + vTaskDelete(NULL); } } // namespace drivers diff --git a/src/drivers/include/display.hpp b/src/drivers/include/display.hpp index 9e4a0224..04bdf669 100644 --- a/src/drivers/include/display.hpp +++ b/src/drivers/include/display.hpp @@ -6,6 +6,7 @@ #include "driver/spi_master.h" #include "lvgl/lvgl.h" #include "result.hpp" +#include "tasks.hpp" #include "display_init.hpp" #include "gpio_expander.hpp" @@ -37,6 +38,8 @@ class Display { GpioExpander* gpio_; spi_device_handle_t handle_; + std::unique_ptr worker_task_; + lv_disp_draw_buf_t buffers_; lv_disp_drv_t driver_; lv_disp_t* display_ = nullptr; diff --git a/src/main/main.cpp b/src/main/main.cpp index 29ac2c7f..bd56eb87 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -77,9 +77,7 @@ extern "C" void app_main(void) { std::shared_ptr touchwheel = drivers->AcquireTouchWheel(); - std::atomic lvgl_quit; - TaskHandle_t lvgl_task_handle; - ui::StartLvgl(drivers.get(), &lvgl_quit, &lvgl_task_handle); + ui::StartLvgl(drivers.get()); std::unique_ptr playback; if (storage) { diff --git a/src/tasks/CMakeLists.txt b/src/tasks/CMakeLists.txt index 0503d293..f7d7244f 100644 --- a/src/tasks/CMakeLists.txt +++ b/src/tasks/CMakeLists.txt @@ -1,2 +1,2 @@ -idf_component_register(SRCS "tasks.cpp" INCLUDE_DIRS ".") +idf_component_register(SRCS "tasks.cpp" INCLUDE_DIRS "." REQUIRES "span") target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/tasks/tasks.cpp b/src/tasks/tasks.cpp index b9fce7ec..0d9d7881 100644 --- a/src/tasks/tasks.cpp +++ b/src/tasks/tasks.cpp @@ -1,5 +1,204 @@ #include "tasks.hpp" +#include +#include "esp_heap_caps.h" +#include "freertos/FreeRTOS.h" +#include "freertos/portmacro.h" -const UBaseType_t kTaskPriorityLvgl = 4; -const UBaseType_t kTaskPriorityAudioPipeline = 5; -const UBaseType_t kTaskPriorityAudioDrain = 6; +namespace tasks { + +template +auto Name() -> std::string; + +template <> +auto Name() -> std::string { + return "LVGL"; +} +template <> +auto Name() -> std::string { + return "DISPLAY"; +} +template <> +auto Name() -> std::string { + return "AUDIO"; +} +template <> +auto Name() -> std::string { + return "DRAIN"; +} +template <> +auto Name() -> std::string { + return "DB"; +} + +template +auto AllocateStack() -> cpp::span; + +// Decoders run on the audio task, and these sometimes require a fairly large +// amount of stack space. +template <> +auto AllocateStack() -> cpp::span { + std::size_t size = 32 * 1024; + return {static_cast(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)), + size}; +} +template <> +auto AllocateStack() -> cpp::span { + std::size_t size = 1024; + return {static_cast(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)), + size}; +} +// LVGL requires only a relatively small stack. However, it can be allocated in +// PSRAM so we give it a bit of headroom for safety. +template <> +auto AllocateStack() -> cpp::span { + std::size_t size = 16 * 1024; + return {static_cast(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)), + size}; +} +// UI flushes *must* be done from internal RAM. Thankfully, there is very little +// stack required to perform them, and the amount of stack needed is fixed. +template <> +auto AllocateStack() -> cpp::span { + std::size_t size = 1024; + return {static_cast(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)), + size}; +} +// Leveldb is designed for non-embedded use cases, where stack space isn't so +// much of a concern. It therefore uses an eye-wateringly large amount of stack. +template <> +auto AllocateStack() -> cpp::span { + std::size_t size = 256 * 1024; + return {static_cast(heap_caps_malloc(size, MALLOC_CAP_SPIRAM)), + size}; +} + +// 2048 bytes in internal ram +// 302 KiB in external ram. + +/* + * Please keep the priorities below in descending order for better readability. + */ + +template +auto Priority() -> UBaseType_t; + +// Realtime audio is the entire point of this device, so give this task the +// highest priority. +template <> +auto Priority() -> UBaseType_t { + return 10; +} +template <> +auto Priority() -> UBaseType_t { + return 10; +} +// After audio issues, UI jank is the most noticeable kind of scheduling-induced +// slowness that the user is likely to notice or care about. Therefore we place +// this task directly below audio in terms of priority. +template <> +auto Priority() -> UBaseType_t { + return 9; +} +// UI flushing should use the same priority as the UI task, so as to maximise +// the chance of the happy case: one of our cores is writing to the screen, +// whilst the other is simultaneously preparing the next buffer to be flushed. +template <> +auto Priority() -> UBaseType_t { + return 9; +} +// Database interactions are all inherently async already, due to their +// potential for disk access. The user likely won't notice or care about a +// couple of ms extra delay due to scheduling, so give this task the lowest +// priority. +template <> +auto Priority() -> UBaseType_t { + return 8; +} + +template +auto WorkerQueueSize() -> std::size_t; + +template <> +auto WorkerQueueSize() -> std::size_t { + return 8; +} + +template <> +auto WorkerQueueSize() -> std::size_t { + return 2; +} + +auto PersistentMain(void* fn) -> void { + auto* function = reinterpret_cast*>(fn); + std::invoke(*function); + assert("persistent task quit!" == 0); + vTaskDelete(NULL); +} + +auto Worker::Main(void* instance) { + Worker* i = reinterpret_cast(instance); + while (1) { + WorkItem item; + if (xQueueReceive(i->queue_, &item, portMAX_DELAY)) { + if (item.quit) { + break; + } else if (item.fn != nullptr) { + std::invoke(*item.fn); + delete item.fn; + } + } + } + i->is_task_running_.store(false); + i->is_task_running_.notify_all(); + // Wait for the instance's destructor to delete this task. We do this instead + // of just deleting ourselves so that it's 100% certain that it's safe to + // delete or reuse this task's stack. + while (1) { + vTaskDelay(portMAX_DELAY); + } +} + +Worker::Worker(const std::string& name, + cpp::span stack, + std::size_t queue_size, + UBaseType_t priority) + : stack_(stack.data()), + queue_(xQueueCreate(queue_size, sizeof(WorkItem))), + is_task_running_(true), + task_buffer_(), + task_(xTaskCreateStatic(&Main, + name.c_str(), + stack.size(), + this, + priority, + stack_, + &task_buffer_)) {} + +Worker::~Worker() { + WorkItem item{ + .fn = nullptr, + .quit = true, + }; + xQueueSend(queue_, &item, portMAX_DELAY); + is_task_running_.wait(true); + vTaskDelete(task_); + free(stack_); +} + +template <> +auto Worker::Dispatch(const std::function& fn) + -> std::future { + std::shared_ptr> promise = + std::make_shared>(); + WorkItem item{ + .fn = new std::function([=]() { + std::invoke(fn); + promise->set_value(); + }), + .quit = false, + }; + xQueueSend(queue_, &item, portMAX_DELAY); + return promise->get_future(); +} + +} // namespace tasks diff --git a/src/tasks/tasks.hpp b/src/tasks/tasks.hpp index 47668aea..9f37131e 100644 --- a/src/tasks/tasks.hpp +++ b/src/tasks/tasks.hpp @@ -1,7 +1,107 @@ #pragma once +#include +#include +#include +#include +#include + +#include "freertos/FreeRTOS.h" #include "freertos/portmacro.h" +#include "freertos/projdefs.h" +#include "freertos/queue.h" +#include "freertos/task.h" +#include "span.hpp" + +namespace tasks { + +/* + * Enumeration of every task (basically a thread) started within the firmware. + * These are centralised so that it is easier to reason about the relative + * priorities of tasks, as well as the amount and location of memory allocated + * to each one. + */ +enum class Type { + // The main UI task. This runs the LVGL main loop. + kUi, + // Task for flushing graphics buffers to the display. + kUiFlush, + // The main audio pipeline task. + kAudio, + // Task for flushing PCM samples to the current output. + kAudioDrain, + // Task for running database queries. + kDatabase, +}; + +template +auto Name() -> std::string; +template +auto AllocateStack() -> cpp::span; +template +auto Priority() -> UBaseType_t; +template +auto WorkerQueueSize() -> std::size_t; + +auto PersistentMain(void* fn) -> void; + +template +auto StartPersistent(const std::function& fn) -> void { + StaticTask_t* task_buffer = new StaticTask_t; + cpp::span stack = AllocateStack(); + xTaskCreateStatic(&PersistentMain, Name().c_str(), stack.size(), + new std::function(fn), Priority(), + stack.data(), task_buffer); +} + +class Worker { + private: + Worker(const std::string& name, + cpp::span stack, + std::size_t queue_size, + UBaseType_t priority); + + StackType_t* stack_; + QueueHandle_t queue_; + std::atomic is_task_running_; + StaticTask_t task_buffer_; + TaskHandle_t task_; + + struct WorkItem { + std::function* fn; + bool quit; + }; + + public: + template + static auto Start() -> Worker* { + return new Worker(Name(), AllocateStack(), WorkerQueueSize(), + Priority()); + } + + static auto Main(void* instance); + + /* + * Schedules the given function to be executed on the worker task, and + * asynchronously returns the result as a future. + */ + template + auto Dispatch(const std::function& fn) -> std::future { + std::shared_ptr> promise = + std::make_shared>(); + WorkItem item{ + .fn = new std::function([=]() { promise->set_value(std::invoke(fn)); }), + .quit = false, + }; + xQueueSend(queue_, &item, portMAX_DELAY); + return promise->get_future(); + } + + ~Worker(); +}; + +/* Specialisation of Evaluate for functions that return nothing. */ +template <> +auto Worker::Dispatch(const std::function& fn) -> std::future; -extern const UBaseType_t kTaskPriorityLvgl; -extern const UBaseType_t kTaskPriorityAudioPipeline; -extern const UBaseType_t kTaskPriorityAudioDrain; +} // namespace tasks diff --git a/src/ui/include/lvgl_task.hpp b/src/ui/include/lvgl_task.hpp index ca3fc771..b129e329 100644 --- a/src/ui/include/lvgl_task.hpp +++ b/src/ui/include/lvgl_task.hpp @@ -10,8 +10,6 @@ namespace ui { -auto StartLvgl(drivers::DriverCache* drivers, - std::atomic* quit, - TaskHandle_t* handle) -> bool; +auto StartLvgl(drivers::DriverCache* drivers) -> void; } // namespace ui diff --git a/src/ui/lvgl_task.cpp b/src/ui/lvgl_task.cpp index 12dfd34e..0e96cd41 100644 --- a/src/ui/lvgl_task.cpp +++ b/src/ui/lvgl_task.cpp @@ -23,6 +23,7 @@ #include "misc/lv_color.h" #include "misc/lv_style.h" #include "misc/lv_timer.h" +#include "tasks.hpp" #include "widgets/lv_label.h" #include "display.hpp" @@ -37,24 +38,13 @@ auto tick_hook(TimerHandle_t xTimer) -> void { lv_tick_inc(1); } -struct LvglArgs { - drivers::DriverCache* drivers; - std::atomic* quit; -}; - -void LvglMain(void* voidArgs) { - LvglArgs* args = reinterpret_cast(voidArgs); - drivers::DriverCache* drivers = args->drivers; - std::atomic* quit = args->quit; - delete args; - +void LvglMain(drivers::DriverCache* drivers) { { ESP_LOGI(kTag, "init lvgl"); lv_init(); // LVGL has been initialised, so we can now start reporting ticks to it. - TimerHandle_t tick_timer = - xTimerCreate("lv_tick", pdMS_TO_TICKS(1), pdTRUE, NULL, &tick_hook); + xTimerCreate("lv_tick", pdMS_TO_TICKS(1), pdTRUE, NULL, &tick_hook); ESP_LOGI(kTag, "init display"); std::shared_ptr display = drivers->AcquireDisplay(); @@ -72,37 +62,15 @@ void LvglMain(void* voidArgs) { lv_obj_center(label); lv_scr_load(label); - while (!quit->load()) { + while (1) { lv_timer_handler(); vTaskDelay(pdMS_TO_TICKS(10)); } - - // TODO(robin? daniel?): De-init the UI stack here. - lv_obj_del(label); - lv_style_reset(&style); - - xTimerDelete(tick_timer, portMAX_DELAY); - - lv_deinit(); } - - vTaskDelete(NULL); } -static const size_t kLvglStackSize = 8 * 1024; -static StaticTask_t sLvglTaskBuffer = {}; -static StackType_t sLvglStack[kLvglStackSize] = {0}; - -auto StartLvgl(drivers::DriverCache* drivers, - std::atomic* quit, - TaskHandle_t* handle) -> bool { - LvglArgs* args = new LvglArgs(); - args->drivers = drivers; - args->quit = quit; - - return xTaskCreateStaticPinnedToCore(&LvglMain, "LVGL", kLvglStackSize, - reinterpret_cast(args), 1, - sLvglStack, &sLvglTaskBuffer, 1); +auto StartLvgl(drivers::DriverCache* drivers) -> void { + tasks::StartPersistent([=]() { LvglMain(drivers); }); } } // namespace ui