From 499d5a942fc2ad0149b0a16e978e090336dd8319 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Tue, 7 Nov 2023 10:32:07 +1100 Subject: [PATCH] Add a wrapper codec source that does readahead --- src/audio/CMakeLists.txt | 2 +- src/audio/audio_fsm.cpp | 3 +- src/audio/fatfs_audio_input.cpp | 9 +- src/audio/include/fatfs_audio_input.hpp | 4 +- src/audio/include/readahead_source.hpp | 53 +++++++++ src/audio/readahead_source.cpp | 124 +++++++++++++++++++++ src/codecs/source_buffer.cpp | 2 +- src/database/database.cpp | 7 +- src/database/env_esp.cpp | 2 +- src/database/include/database.hpp | 3 +- src/database/include/env_esp.hpp | 2 +- src/system_fsm/booting.cpp | 5 + src/system_fsm/include/service_locator.hpp | 12 ++ src/system_fsm/running.cpp | 5 +- src/tasks/tasks.cpp | 3 +- src/tasks/tasks.hpp | 4 +- 16 files changed, 221 insertions(+), 19 deletions(-) create mode 100644 src/audio/include/readahead_source.hpp create mode 100644 src/audio/readahead_source.cpp diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt index df5622f5..0cf8eacd 100644 --- a/src/audio/CMakeLists.txt +++ b/src/audio/CMakeLists.txt @@ -5,7 +5,7 @@ idf_component_register( SRCS "audio_decoder.cpp" "fatfs_audio_input.cpp" "i2s_audio_output.cpp" "track_queue.cpp" "audio_fsm.cpp" "audio_converter.cpp" "resample.cpp" - "fatfs_source.cpp" "bt_audio_output.cpp" + "fatfs_source.cpp" "bt_audio_output.cpp" "readahead_source.cpp" INCLUDE_DIRS "include" REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory" "tinyfsm" "database" "system_fsm" "playlist" "speexdsp") diff --git a/src/audio/audio_fsm.cpp b/src/audio/audio_fsm.cpp index 09a64db1..f43d0ce2 100644 --- a/src/audio/audio_fsm.cpp +++ b/src/audio/audio_fsm.cpp @@ -105,7 +105,8 @@ void Uninitialised::react(const system_fsm::BootComplete& ev) { StreamBufferHandle_t stream = xStreamBufferCreateWithCaps( kDrainBufferSize, sizeof(sample::Sample) * 2, MALLOC_CAP_DMA); - sFileSource.reset(new FatfsAudioInput(sServices->tag_parser())); + sFileSource.reset( + new FatfsAudioInput(sServices->tag_parser(), sServices->bg_worker())); sI2SOutput.reset(new I2SAudioOutput(stream, sServices->gpios())); sBtOutput.reset(new BluetoothAudioOutput(stream, sServices->bluetooth())); diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index b919a3a8..e13ae793 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -23,6 +23,7 @@ #include "freertos/portmacro.h" #include "freertos/projdefs.h" #include "idf_additions.h" +#include "readahead_source.hpp" #include "span.hpp" #include "audio_events.hpp" @@ -42,9 +43,11 @@ namespace audio { -FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser) +FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser, + tasks::Worker& bg_worker) : IAudioSource(), tag_parser_(tag_parser), + bg_worker_(bg_worker), new_stream_mutex_(), new_stream_(), has_new_stream_(false), @@ -142,7 +145,9 @@ auto FatfsAudioInput::OpenFile(const std::pmr::string& path) -> bool { return false; } - new_stream_.reset(new FatfsSource(stream_type.value(), std::move(file))); + auto source = + std::make_unique(stream_type.value(), std::move(file)); + new_stream_.reset(new ReadaheadSource(bg_worker_, std::move(source))); return true; } diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index 08527350..9b516478 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -19,6 +19,7 @@ #include "codec.hpp" #include "future_fetcher.hpp" #include "tag_parser.hpp" +#include "tasks.hpp" #include "types.hpp" namespace audio { @@ -30,7 +31,7 @@ namespace audio { */ class FatfsAudioInput : public IAudioSource { public: - explicit FatfsAudioInput(database::ITagParser& tag_parser); + explicit FatfsAudioInput(database::ITagParser&, tasks::Worker&); ~FatfsAudioInput(); /* @@ -54,6 +55,7 @@ class FatfsAudioInput : public IAudioSource { -> std::optional; database::ITagParser& tag_parser_; + tasks::Worker& bg_worker_; std::mutex new_stream_mutex_; std::shared_ptr new_stream_; diff --git a/src/audio/include/readahead_source.hpp b/src/audio/include/readahead_source.hpp new file mode 100644 index 00000000..dea3ff3f --- /dev/null +++ b/src/audio/include/readahead_source.hpp @@ -0,0 +1,53 @@ +/* + * Copyright 2023 jacqueline + * + * SPDX-License-Identifier: GPL-3.0-only + */ + +#pragma once + +#include +#include +#include + +#include "freertos/FreeRTOS.h" + +#include "ff.h" +#include "freertos/stream_buffer.h" + +#include "audio_source.hpp" +#include "codec.hpp" +#include "tasks.hpp" + +namespace audio { + +/* + * Wraps another stream, proactively buffering large chunks of it into memory + * at a time. + */ +class ReadaheadSource : public codecs::IStream { + public: + ReadaheadSource(tasks::Worker&, std::unique_ptr); + ~ReadaheadSource(); + + auto Read(cpp::span dest) -> ssize_t override; + + auto CanSeek() -> bool override; + + auto SeekTo(int64_t destination, SeekFrom from) -> void override; + + auto CurrentPosition() -> int64_t override; + + ReadaheadSource(const ReadaheadSource&) = delete; + ReadaheadSource& operator=(const ReadaheadSource&) = delete; + + private: + tasks::Worker& worker_; + std::unique_ptr wrapped_; + + std::atomic is_refilling_; + StreamBufferHandle_t buffer_; + int64_t tell_; +}; + +} // namespace audio \ No newline at end of file diff --git a/src/audio/readahead_source.cpp b/src/audio/readahead_source.cpp new file mode 100644 index 00000000..85922425 --- /dev/null +++ b/src/audio/readahead_source.cpp @@ -0,0 +1,124 @@ +/* + * Copyright 2023 jacqueline + * + * SPDX-License-Identifier: GPL-3.0-only + */ + +#include "readahead_source.hpp" + +#include +#include +#include + +#include "esp_heap_caps.h" +#include "esp_log.h" +#include "ff.h" + +#include "audio_source.hpp" +#include "codec.hpp" +#include "freertos/portmacro.h" +#include "idf_additions.h" +#include "spi.hpp" +#include "tasks.hpp" +#include "types.hpp" + +namespace audio { + +static constexpr char kTag[] = "readahead"; +static constexpr size_t kBufferSize = 1024 * 512; + +ReadaheadSource::ReadaheadSource(tasks::Worker& worker, + std::unique_ptr wrapped) + : IStream(wrapped->type()), + worker_(worker), + wrapped_(std::move(wrapped)), + is_refilling_(false), + buffer_(xStreamBufferCreateWithCaps(kBufferSize, 1, MALLOC_CAP_SPIRAM)), + tell_(wrapped_->CurrentPosition()) {} + +ReadaheadSource::~ReadaheadSource() { + vStreamBufferDeleteWithCaps(buffer_); +} + +auto ReadaheadSource::Read(cpp::span dest) -> ssize_t { + // Optismise for the most frequent case: the buffer already contains enough + // data for this call. + size_t bytes_read = + xStreamBufferReceive(buffer_, dest.data(), dest.size_bytes(), 0); + + tell_ += bytes_read; + if (bytes_read == dest.size_bytes()) { + return bytes_read; + } + + dest = dest.subspan(bytes_read); + + // Are we currently fetching more bytes? + ssize_t extra_bytes = 0; + if (!is_refilling_) { + // No! Pass through directly to the wrapped source for the fastest + // response. + extra_bytes = wrapped_->Read(dest); + } else { + // Yes! Wait for the refill to catch up, then try again. + is_refilling_.wait(true); + extra_bytes = + xStreamBufferReceive(buffer_, dest.data(), dest.size_bytes(), 0); + } + + // No need to check whether the dest buffer is actually filled, since at this + // point we've read as many bytes as were available. + tell_ += extra_bytes; + bytes_read += extra_bytes; + + // Before returning, make sure the readahead task is kicked off again. + ESP_LOGI(kTag, "triggering readahead"); + is_refilling_ = true; + std::function refill = [this]() { + // Try to keep larger than most reasonable FAT sector sizes for more + // efficient disk reads. + constexpr size_t kMaxSingleRead = 1024 * 64; + std::byte working_buf[kMaxSingleRead]; + for (;;) { + size_t bytes_to_read = std::min( + kMaxSingleRead, xStreamBufferSpacesAvailable(buffer_)); + if (bytes_to_read == 0) { + break; + } + size_t read = wrapped_->Read({working_buf, bytes_to_read}); + if (read > 0) { + xStreamBufferSend(buffer_, working_buf, read, 0); + } + if (read < bytes_to_read) { + break; + } + } + is_refilling_ = false; + is_refilling_.notify_all(); + }; + worker_.Dispatch(refill); + + return bytes_read; +} + +auto ReadaheadSource::CanSeek() -> bool { + return wrapped_->CanSeek(); +} + +auto ReadaheadSource::SeekTo(int64_t destination, SeekFrom from) -> void { + // Seeking blows away all of our prefetched data. To do this safely, we + // first need to wait for the refill task to finish. + is_refilling_.wait(true); + // It's now safe to clear out the buffer. + xStreamBufferReset(buffer_); + + wrapped_->SeekTo(destination, from); + + // Make sure our tell is up to date with the new location. + tell_ = wrapped_->CurrentPosition(); +} + +auto ReadaheadSource::CurrentPosition() -> int64_t { + return tell_; +} +} // namespace audio diff --git a/src/codecs/source_buffer.cpp b/src/codecs/source_buffer.cpp index 8e40ba42..bf8951f3 100644 --- a/src/codecs/source_buffer.cpp +++ b/src/codecs/source_buffer.cpp @@ -18,7 +18,7 @@ namespace codecs { [[maybe_unused]] static constexpr char kTag[] = "dec_buf"; -static constexpr size_t kBufferSize = 1024 * 128; +static constexpr size_t kBufferSize = 1024 * 16; static constexpr size_t kReadThreshold = 1024 * 8; SourceBuffer::SourceBuffer() diff --git a/src/database/database.cpp b/src/database/database.cpp index e826f576..e6cb85ed 100644 --- a/src/database/database.cpp +++ b/src/database/database.cpp @@ -117,16 +117,15 @@ static auto CheckDatabase(leveldb::DB& db, locale::ICollator& col) -> bool { auto Database::Open(IFileGatherer& gatherer, ITagParser& parser, - locale::ICollator& collator) + locale::ICollator& collator, + tasks::Worker& bg_worker) -> cpp::result { - // TODO(jacqueline): Why isn't compare_and_exchange_* available? if (sIsDbOpen.exchange(true)) { return cpp::fail(DatabaseError::ALREADY_OPEN); } if (!leveldb::sBackgroundThread) { - leveldb::sBackgroundThread.reset( - tasks::Worker::Start()); + leveldb::sBackgroundThread = &bg_worker; } std::shared_ptr worker( diff --git a/src/database/env_esp.cpp b/src/database/env_esp.cpp index 8cddd09a..8cd11727 100644 --- a/src/database/env_esp.cpp +++ b/src/database/env_esp.cpp @@ -41,7 +41,7 @@ namespace leveldb { -std::shared_ptr sBackgroundThread; +tasks::Worker *sBackgroundThread = nullptr; std::string ErrToStr(FRESULT err) { switch (err) { diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp index 5eb3a8e9..fb58f3e7 100644 --- a/src/database/include/database.hpp +++ b/src/database/include/database.hpp @@ -95,7 +95,8 @@ class Database { }; static auto Open(IFileGatherer& file_gatherer, ITagParser& tag_parser, - locale::ICollator& collator) + locale::ICollator& collator, + tasks::Worker& bg_worker) -> cpp::result; static auto Destroy() -> void; diff --git a/src/database/include/env_esp.hpp b/src/database/include/env_esp.hpp index eba6e8a9..dd0159b2 100644 --- a/src/database/include/env_esp.hpp +++ b/src/database/include/env_esp.hpp @@ -18,7 +18,7 @@ namespace leveldb { -extern std::shared_ptr sBackgroundThread; +extern tasks::Worker* sBackgroundThread; // Tracks the files locked by EspEnv::LockFile(). // diff --git a/src/system_fsm/booting.cpp b/src/system_fsm/booting.cpp index 893a4560..f509c52e 100644 --- a/src/system_fsm/booting.cpp +++ b/src/system_fsm/booting.cpp @@ -32,6 +32,7 @@ #include "spi.hpp" #include "system_events.hpp" #include "tag_parser.hpp" +#include "tasks.hpp" #include "touchwheel.hpp" #include "track_queue.hpp" #include "ui_fsm.hpp" @@ -63,6 +64,10 @@ auto Booting::entry() -> void { return; } + ESP_LOGI(kTag, "starting bg worker"); + sServices->bg_worker(std::unique_ptr{ + tasks::Worker::Start()}); + ESP_LOGI(kTag, "installing remaining drivers"); sServices->samd(std::unique_ptr(drivers::Samd::Create())); vTaskDelay(pdMS_TO_TICKS(1000)); diff --git a/src/system_fsm/include/service_locator.hpp b/src/system_fsm/include/service_locator.hpp index 24dc1eb9..327d0c50 100644 --- a/src/system_fsm/include/service_locator.hpp +++ b/src/system_fsm/include/service_locator.hpp @@ -17,6 +17,7 @@ #include "samd.hpp" #include "storage.hpp" #include "tag_parser.hpp" +#include "tasks.hpp" #include "touchwheel.hpp" #include "track_queue.hpp" @@ -111,6 +112,15 @@ class ServiceLocator { collator_ = std::move(i); } + auto bg_worker() -> tasks::Worker& { + assert(bg_worker_ != nullptr); + return *bg_worker_; + } + + auto bg_worker(std::unique_ptr w) -> void { + bg_worker_ = std::move(w); + } + // Not copyable or movable. ServiceLocator(const ServiceLocator&) = delete; ServiceLocator& operator=(const ServiceLocator&) = delete; @@ -129,6 +139,8 @@ class ServiceLocator { std::unique_ptr tag_parser_; std::unique_ptr collator_; + std::unique_ptr bg_worker_; + drivers::SdState sd_; }; diff --git a/src/system_fsm/running.cpp b/src/system_fsm/running.cpp index 91cd46af..9c556e0a 100644 --- a/src/system_fsm/running.cpp +++ b/src/system_fsm/running.cpp @@ -55,8 +55,9 @@ void Running::entry() { ESP_LOGI(kTag, "opening database"); sFileGatherer = new database::FileGathererImpl(); - auto database_res = database::Database::Open( - *sFileGatherer, sServices->tag_parser(), sServices->collator()); + auto database_res = + database::Database::Open(*sFileGatherer, sServices->tag_parser(), + sServices->collator(), sServices->bg_worker()); if (database_res.has_error()) { ESP_LOGW(kTag, "failed to open!"); events::System().Dispatch(StorageError{}); diff --git a/src/tasks/tasks.cpp b/src/tasks/tasks.cpp index ca93336f..8ff57d4a 100644 --- a/src/tasks/tasks.cpp +++ b/src/tasks/tasks.cpp @@ -195,8 +195,7 @@ Worker::~Worker() { } template <> -auto Worker::Dispatch(const std::function& fn) - -> std::future { +auto Worker::Dispatch(const std::function fn) -> std::future { std::shared_ptr> promise = std::make_shared>(); WorkItem item{ diff --git a/src/tasks/tasks.hpp b/src/tasks/tasks.hpp index 2f008120..06cbd26b 100644 --- a/src/tasks/tasks.hpp +++ b/src/tasks/tasks.hpp @@ -106,7 +106,7 @@ class Worker { * asynchronously returns the result as a future. */ template - auto Dispatch(const std::function& fn) -> std::future { + auto Dispatch(const std::function fn) -> std::future { std::shared_ptr> promise = std::make_shared>(); WorkItem item{ @@ -125,6 +125,6 @@ class Worker { /* Specialisation of Evaluate for functions that return nothing. */ template <> -auto Worker::Dispatch(const std::function& fn) -> std::future; +auto Worker::Dispatch(const std::function fn) -> std::future; } // namespace tasks