Add a wrapper codec source that does readahead

custom
jacqueline 1 year ago
parent d36fe9be6b
commit 499d5a942f
  1. 2
      src/audio/CMakeLists.txt
  2. 3
      src/audio/audio_fsm.cpp
  3. 9
      src/audio/fatfs_audio_input.cpp
  4. 4
      src/audio/include/fatfs_audio_input.hpp
  5. 53
      src/audio/include/readahead_source.hpp
  6. 124
      src/audio/readahead_source.cpp
  7. 2
      src/codecs/source_buffer.cpp
  8. 7
      src/database/database.cpp
  9. 2
      src/database/env_esp.cpp
  10. 3
      src/database/include/database.hpp
  11. 2
      src/database/include/env_esp.hpp
  12. 5
      src/system_fsm/booting.cpp
  13. 12
      src/system_fsm/include/service_locator.hpp
  14. 5
      src/system_fsm/running.cpp
  15. 3
      src/tasks/tasks.cpp
  16. 4
      src/tasks/tasks.hpp

@ -5,7 +5,7 @@
idf_component_register(
SRCS "audio_decoder.cpp" "fatfs_audio_input.cpp" "i2s_audio_output.cpp"
"track_queue.cpp" "audio_fsm.cpp" "audio_converter.cpp" "resample.cpp"
"fatfs_source.cpp" "bt_audio_output.cpp"
"fatfs_source.cpp" "bt_audio_output.cpp" "readahead_source.cpp"
INCLUDE_DIRS "include"
REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory" "tinyfsm"
"database" "system_fsm" "playlist" "speexdsp")

@ -105,7 +105,8 @@ void Uninitialised::react(const system_fsm::BootComplete& ev) {
StreamBufferHandle_t stream = xStreamBufferCreateWithCaps(
kDrainBufferSize, sizeof(sample::Sample) * 2, MALLOC_CAP_DMA);
sFileSource.reset(new FatfsAudioInput(sServices->tag_parser()));
sFileSource.reset(
new FatfsAudioInput(sServices->tag_parser(), sServices->bg_worker()));
sI2SOutput.reset(new I2SAudioOutput(stream, sServices->gpios()));
sBtOutput.reset(new BluetoothAudioOutput(stream, sServices->bluetooth()));

@ -23,6 +23,7 @@
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "idf_additions.h"
#include "readahead_source.hpp"
#include "span.hpp"
#include "audio_events.hpp"
@ -42,9 +43,11 @@
namespace audio {
FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser)
FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser,
tasks::Worker& bg_worker)
: IAudioSource(),
tag_parser_(tag_parser),
bg_worker_(bg_worker),
new_stream_mutex_(),
new_stream_(),
has_new_stream_(false),
@ -142,7 +145,9 @@ auto FatfsAudioInput::OpenFile(const std::pmr::string& path) -> bool {
return false;
}
new_stream_.reset(new FatfsSource(stream_type.value(), std::move(file)));
auto source =
std::make_unique<FatfsSource>(stream_type.value(), std::move(file));
new_stream_.reset(new ReadaheadSource(bg_worker_, std::move(source)));
return true;
}

@ -19,6 +19,7 @@
#include "codec.hpp"
#include "future_fetcher.hpp"
#include "tag_parser.hpp"
#include "tasks.hpp"
#include "types.hpp"
namespace audio {
@ -30,7 +31,7 @@ namespace audio {
*/
class FatfsAudioInput : public IAudioSource {
public:
explicit FatfsAudioInput(database::ITagParser& tag_parser);
explicit FatfsAudioInput(database::ITagParser&, tasks::Worker&);
~FatfsAudioInput();
/*
@ -54,6 +55,7 @@ class FatfsAudioInput : public IAudioSource {
-> std::optional<codecs::StreamType>;
database::ITagParser& tag_parser_;
tasks::Worker& bg_worker_;
std::mutex new_stream_mutex_;
std::shared_ptr<codecs::IStream> new_stream_;

@ -0,0 +1,53 @@
/*
* Copyright 2023 jacqueline <me@jacqueline.id.au>
*
* SPDX-License-Identifier: GPL-3.0-only
*/
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include "freertos/FreeRTOS.h"
#include "ff.h"
#include "freertos/stream_buffer.h"
#include "audio_source.hpp"
#include "codec.hpp"
#include "tasks.hpp"
namespace audio {
/*
* Wraps another stream, proactively buffering large chunks of it into memory
* at a time.
*/
class ReadaheadSource : public codecs::IStream {
public:
ReadaheadSource(tasks::Worker&, std::unique_ptr<codecs::IStream>);
~ReadaheadSource();
auto Read(cpp::span<std::byte> dest) -> ssize_t override;
auto CanSeek() -> bool override;
auto SeekTo(int64_t destination, SeekFrom from) -> void override;
auto CurrentPosition() -> int64_t override;
ReadaheadSource(const ReadaheadSource&) = delete;
ReadaheadSource& operator=(const ReadaheadSource&) = delete;
private:
tasks::Worker& worker_;
std::unique_ptr<codecs::IStream> wrapped_;
std::atomic<bool> is_refilling_;
StreamBufferHandle_t buffer_;
int64_t tell_;
};
} // namespace audio

@ -0,0 +1,124 @@
/*
* Copyright 2023 jacqueline <me@jacqueline.id.au>
*
* SPDX-License-Identifier: GPL-3.0-only
*/
#include "readahead_source.hpp"
#include <cstddef>
#include <cstdint>
#include <memory>
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "ff.h"
#include "audio_source.hpp"
#include "codec.hpp"
#include "freertos/portmacro.h"
#include "idf_additions.h"
#include "spi.hpp"
#include "tasks.hpp"
#include "types.hpp"
namespace audio {
static constexpr char kTag[] = "readahead";
static constexpr size_t kBufferSize = 1024 * 512;
ReadaheadSource::ReadaheadSource(tasks::Worker& worker,
std::unique_ptr<codecs::IStream> wrapped)
: IStream(wrapped->type()),
worker_(worker),
wrapped_(std::move(wrapped)),
is_refilling_(false),
buffer_(xStreamBufferCreateWithCaps(kBufferSize, 1, MALLOC_CAP_SPIRAM)),
tell_(wrapped_->CurrentPosition()) {}
ReadaheadSource::~ReadaheadSource() {
vStreamBufferDeleteWithCaps(buffer_);
}
auto ReadaheadSource::Read(cpp::span<std::byte> dest) -> ssize_t {
// Optismise for the most frequent case: the buffer already contains enough
// data for this call.
size_t bytes_read =
xStreamBufferReceive(buffer_, dest.data(), dest.size_bytes(), 0);
tell_ += bytes_read;
if (bytes_read == dest.size_bytes()) {
return bytes_read;
}
dest = dest.subspan(bytes_read);
// Are we currently fetching more bytes?
ssize_t extra_bytes = 0;
if (!is_refilling_) {
// No! Pass through directly to the wrapped source for the fastest
// response.
extra_bytes = wrapped_->Read(dest);
} else {
// Yes! Wait for the refill to catch up, then try again.
is_refilling_.wait(true);
extra_bytes =
xStreamBufferReceive(buffer_, dest.data(), dest.size_bytes(), 0);
}
// No need to check whether the dest buffer is actually filled, since at this
// point we've read as many bytes as were available.
tell_ += extra_bytes;
bytes_read += extra_bytes;
// Before returning, make sure the readahead task is kicked off again.
ESP_LOGI(kTag, "triggering readahead");
is_refilling_ = true;
std::function<void(void)> refill = [this]() {
// Try to keep larger than most reasonable FAT sector sizes for more
// efficient disk reads.
constexpr size_t kMaxSingleRead = 1024 * 64;
std::byte working_buf[kMaxSingleRead];
for (;;) {
size_t bytes_to_read = std::min<size_t>(
kMaxSingleRead, xStreamBufferSpacesAvailable(buffer_));
if (bytes_to_read == 0) {
break;
}
size_t read = wrapped_->Read({working_buf, bytes_to_read});
if (read > 0) {
xStreamBufferSend(buffer_, working_buf, read, 0);
}
if (read < bytes_to_read) {
break;
}
}
is_refilling_ = false;
is_refilling_.notify_all();
};
worker_.Dispatch(refill);
return bytes_read;
}
auto ReadaheadSource::CanSeek() -> bool {
return wrapped_->CanSeek();
}
auto ReadaheadSource::SeekTo(int64_t destination, SeekFrom from) -> void {
// Seeking blows away all of our prefetched data. To do this safely, we
// first need to wait for the refill task to finish.
is_refilling_.wait(true);
// It's now safe to clear out the buffer.
xStreamBufferReset(buffer_);
wrapped_->SeekTo(destination, from);
// Make sure our tell is up to date with the new location.
tell_ = wrapped_->CurrentPosition();
}
auto ReadaheadSource::CurrentPosition() -> int64_t {
return tell_;
}
} // namespace audio

@ -18,7 +18,7 @@
namespace codecs {
[[maybe_unused]] static constexpr char kTag[] = "dec_buf";
static constexpr size_t kBufferSize = 1024 * 128;
static constexpr size_t kBufferSize = 1024 * 16;
static constexpr size_t kReadThreshold = 1024 * 8;
SourceBuffer::SourceBuffer()

@ -117,16 +117,15 @@ static auto CheckDatabase(leveldb::DB& db, locale::ICollator& col) -> bool {
auto Database::Open(IFileGatherer& gatherer,
ITagParser& parser,
locale::ICollator& collator)
locale::ICollator& collator,
tasks::Worker& bg_worker)
-> cpp::result<Database*, DatabaseError> {
// TODO(jacqueline): Why isn't compare_and_exchange_* available?
if (sIsDbOpen.exchange(true)) {
return cpp::fail(DatabaseError::ALREADY_OPEN);
}
if (!leveldb::sBackgroundThread) {
leveldb::sBackgroundThread.reset(
tasks::Worker::Start<tasks::Type::kDatabaseBackground>());
leveldb::sBackgroundThread = &bg_worker;
}
std::shared_ptr<tasks::Worker> worker(

@ -41,7 +41,7 @@
namespace leveldb {
std::shared_ptr<tasks::Worker> sBackgroundThread;
tasks::Worker *sBackgroundThread = nullptr;
std::string ErrToStr(FRESULT err) {
switch (err) {

@ -95,7 +95,8 @@ class Database {
};
static auto Open(IFileGatherer& file_gatherer,
ITagParser& tag_parser,
locale::ICollator& collator)
locale::ICollator& collator,
tasks::Worker& bg_worker)
-> cpp::result<Database*, DatabaseError>;
static auto Destroy() -> void;

@ -18,7 +18,7 @@
namespace leveldb {
extern std::shared_ptr<tasks::Worker> sBackgroundThread;
extern tasks::Worker* sBackgroundThread;
// Tracks the files locked by EspEnv::LockFile().
//

@ -32,6 +32,7 @@
#include "spi.hpp"
#include "system_events.hpp"
#include "tag_parser.hpp"
#include "tasks.hpp"
#include "touchwheel.hpp"
#include "track_queue.hpp"
#include "ui_fsm.hpp"
@ -63,6 +64,10 @@ auto Booting::entry() -> void {
return;
}
ESP_LOGI(kTag, "starting bg worker");
sServices->bg_worker(std::unique_ptr<tasks::Worker>{
tasks::Worker::Start<tasks::Type::kDatabaseBackground>()});
ESP_LOGI(kTag, "installing remaining drivers");
sServices->samd(std::unique_ptr<drivers::Samd>(drivers::Samd::Create()));
vTaskDelay(pdMS_TO_TICKS(1000));

@ -17,6 +17,7 @@
#include "samd.hpp"
#include "storage.hpp"
#include "tag_parser.hpp"
#include "tasks.hpp"
#include "touchwheel.hpp"
#include "track_queue.hpp"
@ -111,6 +112,15 @@ class ServiceLocator {
collator_ = std::move(i);
}
auto bg_worker() -> tasks::Worker& {
assert(bg_worker_ != nullptr);
return *bg_worker_;
}
auto bg_worker(std::unique_ptr<tasks::Worker> w) -> void {
bg_worker_ = std::move(w);
}
// Not copyable or movable.
ServiceLocator(const ServiceLocator&) = delete;
ServiceLocator& operator=(const ServiceLocator&) = delete;
@ -129,6 +139,8 @@ class ServiceLocator {
std::unique_ptr<database::ITagParser> tag_parser_;
std::unique_ptr<locale::ICollator> collator_;
std::unique_ptr<tasks::Worker> bg_worker_;
drivers::SdState sd_;
};

@ -55,8 +55,9 @@ void Running::entry() {
ESP_LOGI(kTag, "opening database");
sFileGatherer = new database::FileGathererImpl();
auto database_res = database::Database::Open(
*sFileGatherer, sServices->tag_parser(), sServices->collator());
auto database_res =
database::Database::Open(*sFileGatherer, sServices->tag_parser(),
sServices->collator(), sServices->bg_worker());
if (database_res.has_error()) {
ESP_LOGW(kTag, "failed to open!");
events::System().Dispatch(StorageError{});

@ -195,8 +195,7 @@ Worker::~Worker() {
}
template <>
auto Worker::Dispatch(const std::function<void(void)>& fn)
-> std::future<void> {
auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void> {
std::shared_ptr<std::promise<void>> promise =
std::make_shared<std::promise<void>>();
WorkItem item{

@ -106,7 +106,7 @@ class Worker {
* asynchronously returns the result as a future.
*/
template <typename T>
auto Dispatch(const std::function<T(void)>& fn) -> std::future<T> {
auto Dispatch(const std::function<T(void)> fn) -> std::future<T> {
std::shared_ptr<std::promise<T>> promise =
std::make_shared<std::promise<T>>();
WorkItem item{
@ -125,6 +125,6 @@ class Worker {
/* Specialisation of Evaluate for functions that return nothing. */
template <>
auto Worker::Dispatch(const std::function<void(void)>& fn) -> std::future<void>;
auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void>;
} // namespace tasks

Loading…
Cancel
Save