Generalise worker tasks, and centralise task priorities + stacks

Includes making the display driver use a worker task for flushes, so
that our double buffering actually does something useful /facepalm
custom
jacqueline 2 years ago
parent b320a6a863
commit 5ac4d3949c
  1. 80
      src/audio/audio_task.cpp
  2. 14
      src/audio/include/audio_task.hpp
  3. 4
      src/database/CMakeLists.txt
  4. 32
      src/database/database.cpp
  5. 91
      src/database/db_task.cpp
  6. 9
      src/database/env_esp.cpp
  7. 6
      src/database/include/database.hpp
  8. 25
      src/database/include/db_task.hpp
  9. 5
      src/database/include/env_esp.hpp
  10. 2
      src/drivers/CMakeLists.txt
  11. 58
      src/drivers/display.cpp
  12. 3
      src/drivers/include/display.hpp
  13. 4
      src/main/main.cpp
  14. 2
      src/tasks/CMakeLists.txt
  15. 205
      src/tasks/tasks.cpp
  16. 106
      src/tasks/tasks.hpp
  17. 4
      src/ui/include/lvgl_task.hpp
  18. 42
      src/ui/lvgl_task.cpp

@ -35,40 +35,8 @@ namespace audio {
namespace task { namespace task {
static const char* kTag = "task"; static const char* kTag = "task";
static const std::size_t kStackSize = 24 * 1024;
static const std::size_t kDrainStackSize = 1024;
auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
// Newly created task will free this.
AudioTaskArgs* args = new AudioTaskArgs{.pipeline = pipeline, .sink = sink};
ESP_LOGI(kTag, "starting audio pipeline task");
xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args,
kTaskPriorityAudioPipeline, NULL, 1);
}
auto StartDrain(IAudioSink* sink) -> void {
auto command = new std::atomic<Command>(PLAY);
// Newly created task will free this.
AudioDrainArgs* drain_args = new AudioDrainArgs{
.sink = sink,
.command = command,
};
ESP_LOGI(kTag, "starting audio drain task");
xTaskCreate(&AudioDrainMain, "drain", kDrainStackSize, drain_args,
kTaskPriorityAudioDrain, NULL);
}
void AudioTaskMain(void* args) {
// Nest the body within an additional scope to ensure that destructors are
// called before the task quits.
{
AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args);
std::unique_ptr<Pipeline> pipeline(real_args->pipeline);
IAudioSink* sink = real_args->sink;
delete real_args;
void AudioTaskMain(std::unique_ptr<Pipeline> pipeline, IAudioSink* sink) {
std::optional<StreamInfo::Format> output_format; std::optional<StreamInfo::Format> output_format;
std::vector<Pipeline*> elements = pipeline->GetIterationOrder(); std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
@ -95,10 +63,7 @@ void AudioTaskMain(void* args) {
assert(alloc.is_valid); assert(alloc.is_valid);
}); });
bool playing = true; while (1) {
bool quit = false;
while (!quit) {
if (playing) {
for (int i = 0; i < elements.size(); i++) { for (int i = 0; i < elements.size(); i++) {
std::vector<RawStream> raw_in_streams; std::vector<RawStream> raw_in_streams;
elements.at(i)->InStreams(&in_regions, &raw_in_streams); elements.at(i)->InStreams(&in_regions, &raw_in_streams);
@ -144,17 +109,14 @@ void AudioTaskMain(void* args) {
// Send through some data. // Send through some data.
if (output_format == sink_stream.info().format && if (output_format == sink_stream.info().format &&
!std::holds_alternative<std::monostate>(*output_format)) { !std::holds_alternative<std::monostate>(*output_format)) {
// TODO: tune the delay on this, as it's currently the only way to std::size_t sent =
// throttle this task's CPU time. Maybe also hold off on the pipeline xStreamBufferSend(sink->buffer(), sink_stream.data().data(),
// if the buffer is already close to full? sink_stream.data().size_bytes(), 0);
std::size_t sent = xStreamBufferSend(
sink->buffer(), sink_stream.data().data(),
sink_stream.data().size_bytes(), pdMS_TO_TICKS(10));
if (sent > 0) { if (sent > 0) {
ESP_LOGI(kTag, "sunk %u bytes out of %u (%d %%)", sent, ESP_LOGI(
kTag, "sunk %u bytes out of %u (%d %%)", sent,
sink_stream.info().bytes_in_stream, sink_stream.info().bytes_in_stream,
(int)(((float)sent / (int)(((float)sent / (float)sink_stream.info().bytes_in_stream) *
(float)sink_stream.info().bytes_in_stream) *
100)); 100));
} }
sink_stream.consume(sent); sink_stream.consume(sent);
@ -163,21 +125,11 @@ void AudioTaskMain(void* args) {
out_region.Unmap(); out_region.Unmap();
} }
} }
}
vTaskDelete(NULL);
}
static std::byte sDrainBuf[8 * 1024]; static std::byte sDrainBuf[8 * 1024];
void AudioDrainMain(void* args) { void AudioDrainMain(IAudioSink* sink) {
{ while (1) {
AudioDrainArgs* real_args = reinterpret_cast<AudioDrainArgs*>(args);
IAudioSink* sink = real_args->sink;
std::atomic<Command>* command = real_args->command;
delete real_args;
// TODO(jacqueline): implement PAUSE without busy-waiting.
while (*command != QUIT) {
std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf, std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf,
sizeof(sDrainBuf), portMAX_DELAY); sizeof(sDrainBuf), portMAX_DELAY);
if (len > 0) { if (len > 0) {
@ -185,7 +137,17 @@ void AudioDrainMain(void* args) {
} }
} }
} }
vTaskDelete(NULL);
auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
ESP_LOGI(kTag, "starting audio pipeline task");
tasks::StartPersistent<tasks::Type::kAudio>(
[=]() { AudioTaskMain(std::unique_ptr<Pipeline>(pipeline), sink); });
}
auto StartDrain(IAudioSink* sink) -> void {
ESP_LOGI(kTag, "starting audio drain task");
tasks::StartPersistent<tasks::Type::kAudioDrain>(
[=]() { AudioDrainMain(sink); });
} }
} // namespace task } // namespace task

@ -15,20 +15,6 @@ namespace audio {
namespace task { namespace task {
enum Command { PLAY, PAUSE, QUIT };
struct AudioTaskArgs {
Pipeline* pipeline;
IAudioSink* sink;
};
struct AudioDrainArgs {
IAudioSink* sink;
std::atomic<Command>* command;
};
extern "C" void AudioTaskMain(void* args);
extern "C" void AudioDrainMain(void* args);
auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void; auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void;
auto StartDrain(IAudioSink* sink) -> void; auto StartDrain(IAudioSink* sink) -> void;

@ -1,7 +1,7 @@
idf_component_register( idf_component_register(
SRCS "env_esp.cpp" "database.cpp" "song.cpp" "db_task.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp" SRCS "env_esp.cpp" "database.cpp" "song.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp"
INCLUDE_DIRS "include" INCLUDE_DIRS "include"
REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor") REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor" "tasks")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})

@ -18,13 +18,13 @@
#include "leveldb/slice.h" #include "leveldb/slice.h"
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
#include "db_task.hpp"
#include "env_esp.hpp" #include "env_esp.hpp"
#include "file_gatherer.hpp" #include "file_gatherer.hpp"
#include "records.hpp" #include "records.hpp"
#include "result.hpp" #include "result.hpp"
#include "song.hpp" #include "song.hpp"
#include "tag_parser.hpp" #include "tag_parser.hpp"
#include "tasks.hpp"
namespace database { namespace database {
@ -62,12 +62,12 @@ auto Database::Open(IFileGatherer* gatherer, ITagParser* parser)
return cpp::fail(DatabaseError::ALREADY_OPEN); return cpp::fail(DatabaseError::ALREADY_OPEN);
} }
if (!StartDbTask()) { std::shared_ptr<tasks::Worker> worker(
return cpp::fail(DatabaseError::ALREADY_OPEN); tasks::Worker::Start<tasks::Type::kDatabase>());
} leveldb::sBackgroundThread = std::weak_ptr<tasks::Worker>(worker);
return worker
return RunOnDbTask<cpp::result<Database*, DatabaseError>>( ->Dispatch<cpp::result<Database*, DatabaseError>>(
[=]() -> cpp::result<Database*, DatabaseError> { [&]() -> cpp::result<Database*, DatabaseError> {
leveldb::DB* db; leveldb::DB* db;
leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024); leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024);
leveldb::Options options; leveldb::Options options;
@ -87,7 +87,7 @@ auto Database::Open(IFileGatherer* gatherer, ITagParser* parser)
} }
ESP_LOGI(kTag, "Database opened successfully"); ESP_LOGI(kTag, "Database opened successfully");
return new Database(db, cache, gatherer, parser); return new Database(db, cache, gatherer, parser, worker);
}) })
.get(); .get();
} }
@ -101,9 +101,11 @@ auto Database::Destroy() -> void {
Database::Database(leveldb::DB* db, Database::Database(leveldb::DB* db,
leveldb::Cache* cache, leveldb::Cache* cache,
IFileGatherer* file_gatherer, IFileGatherer* file_gatherer,
ITagParser* tag_parser) ITagParser* tag_parser,
std::shared_ptr<tasks::Worker> worker)
: db_(db), : db_(db),
cache_(cache), cache_(cache),
worker_task_(worker),
file_gatherer_(file_gatherer), file_gatherer_(file_gatherer),
tag_parser_(tag_parser) {} tag_parser_(tag_parser) {}
@ -113,12 +115,13 @@ Database::~Database() {
delete db_; delete db_;
delete cache_; delete cache_;
QuitDbTask(); leveldb::sBackgroundThread = std::weak_ptr<tasks::Worker>();
sIsDbOpen.store(false); sIsDbOpen.store(false);
} }
auto Database::Update() -> std::future<void> { auto Database::Update() -> std::future<void> {
return RunOnDbTask<void>([&]() -> void { return worker_task_->Dispatch<void>([&]() -> void {
// Stage 1: verify all existing songs are still valid. // Stage 1: verify all existing songs are still valid.
ESP_LOGI(kTag, "verifying existing songs"); ESP_LOGI(kTag, "verifying existing songs");
const leveldb::Snapshot* snapshot = db_->GetSnapshot(); const leveldb::Snapshot* snapshot = db_->GetSnapshot();
@ -219,7 +222,7 @@ auto Database::Update() -> std::future<void> {
} }
auto Database::GetSongs(std::size_t page_size) -> std::future<Result<Song>*> { auto Database::GetSongs(std::size_t page_size) -> std::future<Result<Song>*> {
return RunOnDbTask<Result<Song>*>([=, this]() -> Result<Song>* { return worker_task_->Dispatch<Result<Song>*>([=, this]() -> Result<Song>* {
Continuation<Song> c{.iterator = nullptr, Continuation<Song> c{.iterator = nullptr,
.prefix = CreateDataPrefix().data, .prefix = CreateDataPrefix().data,
.start_key = CreateDataPrefix().data, .start_key = CreateDataPrefix().data,
@ -232,7 +235,8 @@ auto Database::GetSongs(std::size_t page_size) -> std::future<Result<Song>*> {
auto Database::GetDump(std::size_t page_size) auto Database::GetDump(std::size_t page_size)
-> std::future<Result<std::string>*> { -> std::future<Result<std::string>*> {
return RunOnDbTask<Result<std::string>*>([=, this]() -> Result<std::string>* { return worker_task_->Dispatch<Result<std::string>*>(
[=, this]() -> Result<std::string>* {
Continuation<std::string> c{.iterator = nullptr, Continuation<std::string> c{.iterator = nullptr,
.prefix = "", .prefix = "",
.start_key = "", .start_key = "",
@ -246,7 +250,7 @@ auto Database::GetDump(std::size_t page_size)
template <typename T> template <typename T>
auto Database::GetPage(Continuation<T>* c) -> std::future<Result<T>*> { auto Database::GetPage(Continuation<T>* c) -> std::future<Result<T>*> {
Continuation<T> copy = *c; Continuation<T> copy = *c;
return RunOnDbTask<Result<T>*>( return worker_task_->Dispatch<Result<T>*>(
[=, this]() -> Result<T>* { return dbGetPage(copy); }); [=, this]() -> Result<T>* { return dbGetPage(copy); });
} }

@ -1,91 +0,0 @@
#include "db_task.hpp"
#include <functional>
#include "esp_heap_caps.h"
#include "freertos/FreeRTOS.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h"
#include "freertos/task.h"
namespace database {
static const std::size_t kDbStackSize = 256 * 1024;
static StaticTask_t sDbStaticTask;
static StackType_t* sDbStack = nullptr;
static std::atomic<bool> sTaskRunning(false);
static QueueHandle_t sWorkQueue;
struct WorkItem {
std::function<void(void)>* fn;
bool quit;
};
auto SendToDbTask(std::function<void(void)> fn) -> void {
WorkItem item{
.fn = new std::function<void(void)>(fn),
.quit = false,
};
xQueueSend(sWorkQueue, &item, portMAX_DELAY);
}
template <>
auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void> {
std::shared_ptr<std::promise<void>> promise =
std::make_shared<std::promise<void>>();
SendToDbTask([=]() {
std::invoke(fn);
promise->set_value();
});
return promise->get_future();
}
void DatabaseTaskMain(void* args) {
while (true) {
WorkItem item;
if (xQueueReceive(sWorkQueue, &item, portMAX_DELAY)) {
if (item.fn != nullptr) {
std::invoke(*item.fn);
delete item.fn;
}
if (item.quit) {
break;
}
}
}
vQueueDelete(sWorkQueue);
sTaskRunning.store(false);
vTaskDelete(NULL);
}
auto StartDbTask() -> bool {
if (sTaskRunning.exchange(true)) {
return false;
}
if (sDbStack == nullptr) {
sDbStack = reinterpret_cast<StackType_t*>(
heap_caps_malloc(kDbStackSize, MALLOC_CAP_SPIRAM));
}
sWorkQueue = xQueueCreate(8, sizeof(WorkItem));
xTaskCreateStatic(&DatabaseTaskMain, "DB", kDbStackSize, NULL, 1, sDbStack,
&sDbStaticTask);
return true;
}
auto QuitDbTask() -> void {
if (!sTaskRunning.load()) {
return;
}
WorkItem item{
.fn = nullptr,
.quit = true,
};
xQueueSend(sWorkQueue, &item, portMAX_DELAY);
while (sTaskRunning.load()) {
vTaskDelay(pdMS_TO_TICKS(1));
}
}
} // namespace database

@ -29,10 +29,12 @@
#include "leveldb/slice.h" #include "leveldb/slice.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "db_task.hpp" #include "tasks.hpp"
namespace leveldb { namespace leveldb {
std::weak_ptr<tasks::Worker> sBackgroundThread;
std::string ErrToStr(FRESULT err) { std::string ErrToStr(FRESULT err) {
switch (err) { switch (err) {
case FR_OK: case FR_OK:
@ -455,8 +457,11 @@ EspEnv::EspEnv() {}
void EspEnv::Schedule( void EspEnv::Schedule(
void (*background_work_function)(void* background_work_arg), void (*background_work_function)(void* background_work_arg),
void* background_work_arg) { void* background_work_arg) {
database::SendToDbTask( auto worker = sBackgroundThread.lock();
if (worker) {
worker->Dispatch<void>(
[=]() { std::invoke(background_work_function, background_work_arg); }); [=]() { std::invoke(background_work_function, background_work_arg); });
} }
}
} // namespace leveldb } // namespace leveldb

@ -19,6 +19,7 @@
#include "result.hpp" #include "result.hpp"
#include "song.hpp" #include "song.hpp"
#include "tag_parser.hpp" #include "tag_parser.hpp"
#include "tasks.hpp"
namespace database { namespace database {
@ -90,6 +91,8 @@ class Database {
leveldb::DB* db_; leveldb::DB* db_;
leveldb::Cache* cache_; leveldb::Cache* cache_;
std::shared_ptr<tasks::Worker> worker_task_;
// Not owned. // Not owned.
IFileGatherer* file_gatherer_; IFileGatherer* file_gatherer_;
ITagParser* tag_parser_; ITagParser* tag_parser_;
@ -97,7 +100,8 @@ class Database {
Database(leveldb::DB* db, Database(leveldb::DB* db,
leveldb::Cache* cache, leveldb::Cache* cache,
IFileGatherer* file_gatherer, IFileGatherer* file_gatherer,
ITagParser* tag_parser); ITagParser* tag_parser,
std::shared_ptr<tasks::Worker> worker);
auto dbMintNewSongId() -> SongId; auto dbMintNewSongId() -> SongId;
auto dbEntomb(SongId song, uint64_t hash) -> void; auto dbEntomb(SongId song, uint64_t hash) -> void;

@ -1,25 +0,0 @@
#pragma once
#include <functional>
#include <future>
#include <memory>
namespace database {
auto StartDbTask() -> bool;
auto QuitDbTask() -> void;
auto SendToDbTask(std::function<void(void)> fn) -> void;
template <typename T>
auto RunOnDbTask(std::function<T(void)> fn) -> std::future<T> {
std::shared_ptr<std::promise<T>> promise =
std::make_shared<std::promise<T>>();
SendToDbTask([=]() { promise->set_value(std::invoke(fn)); });
return promise->get_future();
}
template <>
auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void>;
} // namespace database

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <memory>
#include <mutex> #include <mutex>
#include <set> #include <set>
#include <string> #include <string>
@ -7,8 +8,12 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "tasks.hpp"
namespace leveldb { namespace leveldb {
extern std::weak_ptr<tasks::Worker> sBackgroundThread;
// Tracks the files locked by EspEnv::LockFile(). // Tracks the files locked by EspEnv::LockFile().
// //
// We maintain a separate set instead of relying on fcntl(F_SETLK) because // We maintain a separate set instead of relying on fcntl(F_SETLK) because

@ -2,5 +2,5 @@ idf_component_register(
SRCS "touchwheel.cpp" "dac.cpp" "gpio_expander.cpp" "battery.cpp" "storage.cpp" "i2c.cpp" SRCS "touchwheel.cpp" "dac.cpp" "gpio_expander.cpp" "battery.cpp" "storage.cpp" "i2c.cpp"
"spi.cpp" "display.cpp" "display_init.cpp" "driver_cache.cpp" "samd.cpp" "spi.cpp" "display.cpp" "display_init.cpp" "driver_cache.cpp" "samd.cpp"
INCLUDE_DIRS "include" INCLUDE_DIRS "include"
REQUIRES "esp_adc" "fatfs" "result" "lvgl" "span") REQUIRES "esp_adc" "fatfs" "result" "lvgl" "span" "tasks")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})

@ -21,6 +21,7 @@
#include "display_init.hpp" #include "display_init.hpp"
#include "gpio_expander.hpp" #include "gpio_expander.hpp"
#include "soc/soc.h" #include "soc/soc.h"
#include "tasks.hpp"
static const char* kTag = "DISPLAY"; static const char* kTag = "DISPLAY";
@ -49,6 +50,19 @@ static const int kDisplayBufferSize = (kDisplayWidth * kDisplayHeight) / 10;
DMA_ATTR static lv_color_t sBuffer1[kDisplayBufferSize]; DMA_ATTR static lv_color_t sBuffer1[kDisplayBufferSize];
DMA_ATTR static lv_color_t sBuffer2[kDisplayBufferSize]; DMA_ATTR static lv_color_t sBuffer2[kDisplayBufferSize];
struct RenderTaskArgs {
std::atomic<bool>* quit;
QueueHandle_t work_queue;
};
struct FlushArgs {
lv_disp_drv_t* driver;
const lv_area_t* area;
lv_color_t* color_map;
};
void RenderMain(void* raw_args);
namespace drivers { namespace drivers {
/* /*
@ -138,7 +152,9 @@ auto Display::create(GpioExpander* expander,
} }
Display::Display(GpioExpander* gpio, spi_device_handle_t handle) Display::Display(GpioExpander* gpio, spi_device_handle_t handle)
: gpio_(gpio), handle_(handle) {} : gpio_(gpio),
handle_(handle),
worker_task_(tasks::Worker::Start<tasks::Type::kUiFlush>()) {}
Display::~Display() {} Display::~Display() {}
@ -225,22 +241,26 @@ void Display::SendTransaction(TransactionType type,
void Display::OnLvglFlush(lv_disp_drv_t* disp_drv, void Display::OnLvglFlush(lv_disp_drv_t* disp_drv,
const lv_area_t* area, const lv_area_t* area,
lv_color_t* color_map) { lv_color_t* color_map) {
// Ideally we want to complete a single flush as quickly as possible, so grab // area is stack-allocated, so it isn't safe to reference from the flush
// the bus for this entire transaction sequence. // thread.
lv_area_t area_copy = *area;
worker_task_->Dispatch<void>([=, this]() {
// Ideally we want to complete a single flush as quickly as possible, so
// grab the bus for this entire transaction sequence.
spi_device_acquire_bus(handle_, portMAX_DELAY); spi_device_acquire_bus(handle_, portMAX_DELAY);
// First we need to specify the rectangle of the display we're writing into. // First we need to specify the rectangle of the display we're writing into.
uint16_t data[2] = {0, 0}; uint16_t data[2] = {0, 0};
data[0] = SPI_SWAP_DATA_TX(area->x1, 16); data[0] = SPI_SWAP_DATA_TX(area_copy.x1, 16);
data[1] = SPI_SWAP_DATA_TX(area->x2, 16); data[1] = SPI_SWAP_DATA_TX(area_copy.x2, 16);
SendCommandWithData(displays::ST77XX_CASET, reinterpret_cast<uint8_t*>(data), SendCommandWithData(displays::ST77XX_CASET,
4); reinterpret_cast<uint8_t*>(data), 4);
data[0] = SPI_SWAP_DATA_TX(area->y1, 16); data[0] = SPI_SWAP_DATA_TX(area_copy.y1, 16);
data[1] = SPI_SWAP_DATA_TX(area->y2, 16); data[1] = SPI_SWAP_DATA_TX(area_copy.y2, 16);
SendCommandWithData(displays::ST77XX_RASET, reinterpret_cast<uint8_t*>(data), SendCommandWithData(displays::ST77XX_RASET,
4); reinterpret_cast<uint8_t*>(data), 4);
// Now send the pixels for this region. // Now send the pixels for this region.
uint32_t size = lv_area_get_width(area) * lv_area_get_height(area); uint32_t size = lv_area_get_width(area) * lv_area_get_height(area);
@ -250,6 +270,22 @@ void Display::OnLvglFlush(lv_disp_drv_t* disp_drv,
spi_device_release_bus(handle_); spi_device_release_bus(handle_);
lv_disp_flush_ready(&driver_); lv_disp_flush_ready(&driver_);
});
}
void RenderMain(void* raw_args) {
RenderTaskArgs* args = reinterpret_cast<RenderTaskArgs*>(raw_args);
QueueHandle_t queue = args->work_queue;
std::atomic<bool>* quit = args->quit;
delete args;
while (!quit->load()) {
// TODO: flush data here! Yay speed.
}
vQueueDelete(queue);
delete quit;
vTaskDelete(NULL);
} }
} // namespace drivers } // namespace drivers

@ -6,6 +6,7 @@
#include "driver/spi_master.h" #include "driver/spi_master.h"
#include "lvgl/lvgl.h" #include "lvgl/lvgl.h"
#include "result.hpp" #include "result.hpp"
#include "tasks.hpp"
#include "display_init.hpp" #include "display_init.hpp"
#include "gpio_expander.hpp" #include "gpio_expander.hpp"
@ -37,6 +38,8 @@ class Display {
GpioExpander* gpio_; GpioExpander* gpio_;
spi_device_handle_t handle_; spi_device_handle_t handle_;
std::unique_ptr<tasks::Worker> worker_task_;
lv_disp_draw_buf_t buffers_; lv_disp_draw_buf_t buffers_;
lv_disp_drv_t driver_; lv_disp_drv_t driver_;
lv_disp_t* display_ = nullptr; lv_disp_t* display_ = nullptr;

@ -77,9 +77,7 @@ extern "C" void app_main(void) {
std::shared_ptr<drivers::TouchWheel> touchwheel = std::shared_ptr<drivers::TouchWheel> touchwheel =
drivers->AcquireTouchWheel(); drivers->AcquireTouchWheel();
std::atomic<bool> lvgl_quit; ui::StartLvgl(drivers.get());
TaskHandle_t lvgl_task_handle;
ui::StartLvgl(drivers.get(), &lvgl_quit, &lvgl_task_handle);
std::unique_ptr<audio::AudioPlayback> playback; std::unique_ptr<audio::AudioPlayback> playback;
if (storage) { if (storage) {

@ -1,2 +1,2 @@
idf_component_register(SRCS "tasks.cpp" INCLUDE_DIRS ".") idf_component_register(SRCS "tasks.cpp" INCLUDE_DIRS "." REQUIRES "span")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})

@ -1,5 +1,204 @@
#include "tasks.hpp" #include "tasks.hpp"
#include <functional>
#include "esp_heap_caps.h"
#include "freertos/FreeRTOS.h"
#include "freertos/portmacro.h"
const UBaseType_t kTaskPriorityLvgl = 4; namespace tasks {
const UBaseType_t kTaskPriorityAudioPipeline = 5;
const UBaseType_t kTaskPriorityAudioDrain = 6; template <Type t>
auto Name() -> std::string;
template <>
auto Name<Type::kUi>() -> std::string {
return "LVGL";
}
template <>
auto Name<Type::kUiFlush>() -> std::string {
return "DISPLAY";
}
template <>
auto Name<Type::kAudio>() -> std::string {
return "AUDIO";
}
template <>
auto Name<Type::kAudioDrain>() -> std::string {
return "DRAIN";
}
template <>
auto Name<Type::kDatabase>() -> std::string {
return "DB";
}
template <Type t>
auto AllocateStack() -> cpp::span<StackType_t>;
// Decoders run on the audio task, and these sometimes require a fairly large
// amount of stack space.
template <>
auto AllocateStack<Type::kAudio>() -> cpp::span<StackType_t> {
std::size_t size = 32 * 1024;
return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)),
size};
}
template <>
auto AllocateStack<Type::kAudioDrain>() -> cpp::span<StackType_t> {
std::size_t size = 1024;
return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)),
size};
}
// LVGL requires only a relatively small stack. However, it can be allocated in
// PSRAM so we give it a bit of headroom for safety.
template <>
auto AllocateStack<Type::kUi>() -> cpp::span<StackType_t> {
std::size_t size = 16 * 1024;
return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)),
size};
}
// UI flushes *must* be done from internal RAM. Thankfully, there is very little
// stack required to perform them, and the amount of stack needed is fixed.
template <>
auto AllocateStack<Type::kUiFlush>() -> cpp::span<StackType_t> {
std::size_t size = 1024;
return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)),
size};
}
// Leveldb is designed for non-embedded use cases, where stack space isn't so
// much of a concern. It therefore uses an eye-wateringly large amount of stack.
template <>
auto AllocateStack<Type::kDatabase>() -> cpp::span<StackType_t> {
std::size_t size = 256 * 1024;
return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_SPIRAM)),
size};
}
// 2048 bytes in internal ram
// 302 KiB in external ram.
/*
* Please keep the priorities below in descending order for better readability.
*/
template <Type t>
auto Priority() -> UBaseType_t;
// Realtime audio is the entire point of this device, so give this task the
// highest priority.
template <>
auto Priority<Type::kAudio>() -> UBaseType_t {
return 10;
}
template <>
auto Priority<Type::kAudioDrain>() -> UBaseType_t {
return 10;
}
// After audio issues, UI jank is the most noticeable kind of scheduling-induced
// slowness that the user is likely to notice or care about. Therefore we place
// this task directly below audio in terms of priority.
template <>
auto Priority<Type::kUi>() -> UBaseType_t {
return 9;
}
// UI flushing should use the same priority as the UI task, so as to maximise
// the chance of the happy case: one of our cores is writing to the screen,
// whilst the other is simultaneously preparing the next buffer to be flushed.
template <>
auto Priority<Type::kUiFlush>() -> UBaseType_t {
return 9;
}
// Database interactions are all inherently async already, due to their
// potential for disk access. The user likely won't notice or care about a
// couple of ms extra delay due to scheduling, so give this task the lowest
// priority.
template <>
auto Priority<Type::kDatabase>() -> UBaseType_t {
return 8;
}
template <Type t>
auto WorkerQueueSize() -> std::size_t;
template <>
auto WorkerQueueSize<Type::kDatabase>() -> std::size_t {
return 8;
}
template <>
auto WorkerQueueSize<Type::kUiFlush>() -> std::size_t {
return 2;
}
auto PersistentMain(void* fn) -> void {
auto* function = reinterpret_cast<std::function<void(void)>*>(fn);
std::invoke(*function);
assert("persistent task quit!" == 0);
vTaskDelete(NULL);
}
auto Worker::Main(void* instance) {
Worker* i = reinterpret_cast<Worker*>(instance);
while (1) {
WorkItem item;
if (xQueueReceive(i->queue_, &item, portMAX_DELAY)) {
if (item.quit) {
break;
} else if (item.fn != nullptr) {
std::invoke(*item.fn);
delete item.fn;
}
}
}
i->is_task_running_.store(false);
i->is_task_running_.notify_all();
// Wait for the instance's destructor to delete this task. We do this instead
// of just deleting ourselves so that it's 100% certain that it's safe to
// delete or reuse this task's stack.
while (1) {
vTaskDelay(portMAX_DELAY);
}
}
Worker::Worker(const std::string& name,
cpp::span<StackType_t> stack,
std::size_t queue_size,
UBaseType_t priority)
: stack_(stack.data()),
queue_(xQueueCreate(queue_size, sizeof(WorkItem))),
is_task_running_(true),
task_buffer_(),
task_(xTaskCreateStatic(&Main,
name.c_str(),
stack.size(),
this,
priority,
stack_,
&task_buffer_)) {}
Worker::~Worker() {
WorkItem item{
.fn = nullptr,
.quit = true,
};
xQueueSend(queue_, &item, portMAX_DELAY);
is_task_running_.wait(true);
vTaskDelete(task_);
free(stack_);
}
template <>
auto Worker::Dispatch(const std::function<void(void)>& fn)
-> std::future<void> {
std::shared_ptr<std::promise<void>> promise =
std::make_shared<std::promise<void>>();
WorkItem item{
.fn = new std::function<void(void)>([=]() {
std::invoke(fn);
promise->set_value();
}),
.quit = false,
};
xQueueSend(queue_, &item, portMAX_DELAY);
return promise->get_future();
}
} // namespace tasks

@ -1,7 +1,107 @@
#pragma once #pragma once
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include "freertos/FreeRTOS.h"
#include "freertos/portmacro.h" #include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h"
#include "freertos/task.h"
#include "span.hpp"
namespace tasks {
/*
* Enumeration of every task (basically a thread) started within the firmware.
* These are centralised so that it is easier to reason about the relative
* priorities of tasks, as well as the amount and location of memory allocated
* to each one.
*/
enum class Type {
// The main UI task. This runs the LVGL main loop.
kUi,
// Task for flushing graphics buffers to the display.
kUiFlush,
// The main audio pipeline task.
kAudio,
// Task for flushing PCM samples to the current output.
kAudioDrain,
// Task for running database queries.
kDatabase,
};
template <Type t>
auto Name() -> std::string;
template <Type t>
auto AllocateStack() -> cpp::span<StackType_t>;
template <Type t>
auto Priority() -> UBaseType_t;
template <Type t>
auto WorkerQueueSize() -> std::size_t;
auto PersistentMain(void* fn) -> void;
template <Type t>
auto StartPersistent(const std::function<void(void)>& fn) -> void {
StaticTask_t* task_buffer = new StaticTask_t;
cpp::span<StackType_t> stack = AllocateStack<t>();
xTaskCreateStatic(&PersistentMain, Name<t>().c_str(), stack.size(),
new std::function<void(void)>(fn), Priority<t>(),
stack.data(), task_buffer);
}
class Worker {
private:
Worker(const std::string& name,
cpp::span<StackType_t> stack,
std::size_t queue_size,
UBaseType_t priority);
StackType_t* stack_;
QueueHandle_t queue_;
std::atomic<bool> is_task_running_;
StaticTask_t task_buffer_;
TaskHandle_t task_;
struct WorkItem {
std::function<void(void)>* fn;
bool quit;
};
public:
template <Type t>
static auto Start() -> Worker* {
return new Worker(Name<t>(), AllocateStack<t>(), WorkerQueueSize<t>(),
Priority<t>());
}
static auto Main(void* instance);
/*
* Schedules the given function to be executed on the worker task, and
* asynchronously returns the result as a future.
*/
template <typename T>
auto Dispatch(const std::function<T(void)>& fn) -> std::future<T> {
std::shared_ptr<std::promise<T>> promise =
std::make_shared<std::promise<T>>();
WorkItem item{
.fn = new std::function([=]() { promise->set_value(std::invoke(fn)); }),
.quit = false,
};
xQueueSend(queue_, &item, portMAX_DELAY);
return promise->get_future();
}
~Worker();
};
/* Specialisation of Evaluate for functions that return nothing. */
template <>
auto Worker::Dispatch(const std::function<void(void)>& fn) -> std::future<void>;
extern const UBaseType_t kTaskPriorityLvgl; } // namespace tasks
extern const UBaseType_t kTaskPriorityAudioPipeline;
extern const UBaseType_t kTaskPriorityAudioDrain;

@ -10,8 +10,6 @@
namespace ui { namespace ui {
auto StartLvgl(drivers::DriverCache* drivers, auto StartLvgl(drivers::DriverCache* drivers) -> void;
std::atomic<bool>* quit,
TaskHandle_t* handle) -> bool;
} // namespace ui } // namespace ui

@ -23,6 +23,7 @@
#include "misc/lv_color.h" #include "misc/lv_color.h"
#include "misc/lv_style.h" #include "misc/lv_style.h"
#include "misc/lv_timer.h" #include "misc/lv_timer.h"
#include "tasks.hpp"
#include "widgets/lv_label.h" #include "widgets/lv_label.h"
#include "display.hpp" #include "display.hpp"
@ -37,23 +38,12 @@ auto tick_hook(TimerHandle_t xTimer) -> void {
lv_tick_inc(1); lv_tick_inc(1);
} }
struct LvglArgs { void LvglMain(drivers::DriverCache* drivers) {
drivers::DriverCache* drivers;
std::atomic<bool>* quit;
};
void LvglMain(void* voidArgs) {
LvglArgs* args = reinterpret_cast<LvglArgs*>(voidArgs);
drivers::DriverCache* drivers = args->drivers;
std::atomic<bool>* quit = args->quit;
delete args;
{ {
ESP_LOGI(kTag, "init lvgl"); ESP_LOGI(kTag, "init lvgl");
lv_init(); lv_init();
// LVGL has been initialised, so we can now start reporting ticks to it. // LVGL has been initialised, so we can now start reporting ticks to it.
TimerHandle_t tick_timer =
xTimerCreate("lv_tick", pdMS_TO_TICKS(1), pdTRUE, NULL, &tick_hook); xTimerCreate("lv_tick", pdMS_TO_TICKS(1), pdTRUE, NULL, &tick_hook);
ESP_LOGI(kTag, "init display"); ESP_LOGI(kTag, "init display");
@ -72,37 +62,15 @@ void LvglMain(void* voidArgs) {
lv_obj_center(label); lv_obj_center(label);
lv_scr_load(label); lv_scr_load(label);
while (!quit->load()) { while (1) {
lv_timer_handler(); lv_timer_handler();
vTaskDelay(pdMS_TO_TICKS(10)); vTaskDelay(pdMS_TO_TICKS(10));
} }
// TODO(robin? daniel?): De-init the UI stack here.
lv_obj_del(label);
lv_style_reset(&style);
xTimerDelete(tick_timer, portMAX_DELAY);
lv_deinit();
} }
vTaskDelete(NULL);
} }
static const size_t kLvglStackSize = 8 * 1024; auto StartLvgl(drivers::DriverCache* drivers) -> void {
static StaticTask_t sLvglTaskBuffer = {}; tasks::StartPersistent<tasks::Type::kUi>([=]() { LvglMain(drivers); });
static StackType_t sLvglStack[kLvglStackSize] = {0};
auto StartLvgl(drivers::DriverCache* drivers,
std::atomic<bool>* quit,
TaskHandle_t* handle) -> bool {
LvglArgs* args = new LvglArgs();
args->drivers = drivers;
args->quit = quit;
return xTaskCreateStaticPinnedToCore(&LvglMain, "LVGL", kLvglStackSize,
reinterpret_cast<void*>(args), 1,
sLvglStack, &sLvglTaskBuffer, 1);
} }
} // namespace ui } // namespace ui

Loading…
Cancel
Save