diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt index 52d41aa6..80532542 100644 --- a/src/audio/CMakeLists.txt +++ b/src/audio/CMakeLists.txt @@ -4,6 +4,6 @@ idf_component_register( "audio_playback.cpp" "audio_element_handle.cpp" "stream_event.cpp" "audio_element.cpp" INCLUDE_DIRS "include" - REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span") + REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory") target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/audio/audio_decoder.cpp b/src/audio/audio_decoder.cpp index 97f45534..07e05653 100644 --- a/src/audio/audio_decoder.cpp +++ b/src/audio/audio_decoder.cpp @@ -21,10 +21,12 @@ namespace audio { static const char* kTag = "DEC"; -static const std::size_t kSamplesPerChunk = 1024; +static const std::size_t kChunkSize = 1024; +static const std::size_t kReadahead = 8; AudioDecoder::AudioDecoder() : IAudioElement(), + arena_(kChunkSize, kReadahead, MALLOC_CAP_SPIRAM), stream_info_({}), has_samples_to_send_(false), needs_more_input_(true) {} @@ -104,23 +106,25 @@ auto AudioDecoder::Process() -> cpp::result { stream_info_->bits_per_sample = format.bits_per_sample; stream_info_->sample_rate = format.sample_rate_hz; stream_info_->channels = format.num_channels; - - chunk_size_ = kSamplesPerChunk * (*stream_info_->bits_per_sample); - stream_info_->chunk_size = chunk_size_; - ESP_LOGI(kTag, "pcm stream chunk size: %u bytes", chunk_size_); + stream_info_->chunk_size = kChunkSize; auto event = StreamEvent::CreateStreamInfo(input_events_, *stream_info_); SendOrBufferEvent(std::unique_ptr(event)); } - auto chunk = std::unique_ptr( - StreamEvent::CreateChunkData(input_events_, chunk_size_)); + auto block = arena_.Acquire(); + if (!block) { + return {}; + } + auto write_res = - current_codec_->WriteOutputSamples(chunk->chunk_data.bytes); - chunk->chunk_data.bytes = chunk->chunk_data.bytes.first(write_res.first); + current_codec_->WriteOutputSamples({block->start, block->size}); + block->used_size = write_res.first; has_samples_to_send_ = !write_res.second; + auto chunk = std::unique_ptr( + StreamEvent::CreateArenaChunk(input_events_, *block)); if (!SendOrBufferEvent(std::move(chunk))) { return {}; } diff --git a/src/audio/audio_element.cpp b/src/audio/audio_element.cpp index 70a59a51..6b04b892 100644 --- a/src/audio/audio_element.cpp +++ b/src/audio/audio_element.cpp @@ -28,7 +28,7 @@ IAudioElement::~IAudioElement() { auto IAudioElement::SendOrBufferEvent(std::unique_ptr event) -> bool { - if (event->tag == StreamEvent::CHUNK_DATA) { + if (event->tag == StreamEvent::ARENA_CHUNK) { unprocessed_output_chunks_++; } if (!buffered_output_.empty()) { diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index 14f3462d..7c91b8cc 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -6,6 +6,7 @@ #include #include +#include "arena.hpp" #include "audio_element_handle.hpp" #include "cbor.h" #include "esp_heap_caps.h" @@ -96,7 +97,7 @@ void AudioTaskMain(void* args) { } else if (new_event->tag == StreamEvent::LOG_STATUS) { element->ProcessLogStatus(); if (element->OutputEventQueue() != nullptr) { - xQueueSendToFront(element->OutputEventQueue(), &new_event, 0); + xQueueSendToFront(element->OutputEventQueue(), &new_event, 0); } else { delete new_event; } @@ -150,8 +151,9 @@ void AudioTaskMain(void* args) { // TODO(jacqueline) ESP_LOGE(kTag, "failed to process stream info"); } - } else if (event->tag == StreamEvent::CHUNK_DATA) { - ESP_LOGD(kTag, "processing chunk data"); + } else if (event->tag == StreamEvent::ARENA_CHUNK) { + ESP_LOGD(kTag, "processing arena data"); + memory::ArenaRef ref(event->arena_chunk); auto callback = StreamEvent::CreateChunkNotification(element->InputEventQueue()); if (!xQueueSend(event->source, &callback, 0)) { @@ -159,8 +161,10 @@ void AudioTaskMain(void* args) { continue; } + // TODO(jacqueline): Consider giving the element a full ArenaRef here, + // so that it can hang on to it and potentially save an alloc+copy. auto process_chunk_res = - element->ProcessChunk(event->chunk_data.bytes); + element->ProcessChunk({ref.ptr.start, ref.ptr.used_size}); if (process_chunk_res.has_error()) { // TODO(jacqueline) ESP_LOGE(kTag, "failed to process chunk"); diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index fd1c1f3a..829064d8 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -5,6 +5,7 @@ #include #include +#include "arena.hpp" #include "esp_heap_caps.h" #include "freertos/portmacro.h" @@ -19,11 +20,12 @@ static const char* kTag = "SRC"; namespace audio { -// 32KiB to match the minimum himen region size. static const std::size_t kChunkSize = 24 * 1024; +static const std::size_t kChunkReadahead = 2; FatfsAudioInput::FatfsAudioInput(std::shared_ptr storage) : IAudioElement(), + arena_(kChunkSize, kChunkReadahead, MALLOC_CAP_SPIRAM), storage_(storage), current_file_(), is_file_open_(false) {} @@ -80,25 +82,28 @@ auto FatfsAudioInput::ProcessEndOfStream() -> void { auto FatfsAudioInput::Process() -> cpp::result { if (is_file_open_) { - auto dest_event = std::unique_ptr( - StreamEvent::CreateChunkData(input_events_, kChunkSize)); - UINT bytes_read = 0; + auto dest_block = memory::ArenaRef::Acquire(&arena_); + if (!dest_block) { + return {}; + } - FRESULT result = f_read(¤t_file_, dest_event->chunk_data.raw_bytes, - kChunkSize, &bytes_read); + FRESULT result = f_read(¤t_file_, dest_block->ptr.start, + dest_block->ptr.size, &dest_block->ptr.used_size); if (result != FR_OK) { ESP_LOGE(kTag, "file I/O error %d", result); return cpp::fail(IO_ERROR); } - dest_event->chunk_data.bytes = - dest_event->chunk_data.bytes.first(bytes_read); - SendOrBufferEvent(std::move(dest_event)); - - if (bytes_read < kChunkSize || f_eof(¤t_file_)) { + if (dest_block->ptr.used_size < dest_block->ptr.size || + f_eof(¤t_file_)) { f_close(¤t_file_); is_file_open_ = false; } + + auto dest_event = std::unique_ptr( + StreamEvent::CreateArenaChunk(input_events_, dest_block->Release())); + + SendOrBufferEvent(std::move(dest_event)); } return {}; } diff --git a/src/audio/include/audio_decoder.hpp b/src/audio/include/audio_decoder.hpp index aa83825f..9cc40162 100644 --- a/src/audio/include/audio_decoder.hpp +++ b/src/audio/include/audio_decoder.hpp @@ -44,6 +44,7 @@ class AudioDecoder : public IAudioElement { AudioDecoder& operator=(const AudioDecoder&) = delete; private: + memory::Arena arena_; std::unique_ptr current_codec_; std::optional stream_info_; std::optional chunk_reader_; diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index 883441c2..06b0b7ea 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -4,6 +4,7 @@ #include #include +#include "arena.hpp" #include "chunk.hpp" #include "freertos/FreeRTOS.h" @@ -35,6 +36,7 @@ class FatfsAudioInput : public IAudioElement { FatfsAudioInput& operator=(const FatfsAudioInput&) = delete; private: + memory::Arena arena_; std::shared_ptr storage_; FIL current_file_; diff --git a/src/audio/include/stream_event.hpp b/src/audio/include/stream_event.hpp index d42de411..6d270e9d 100644 --- a/src/audio/include/stream_event.hpp +++ b/src/audio/include/stream_event.hpp @@ -2,6 +2,7 @@ #include +#include "arena.hpp" #include "freertos/FreeRTOS.h" #include "freertos/queue.h" @@ -13,7 +14,7 @@ namespace audio { struct StreamEvent { static auto CreateStreamInfo(QueueHandle_t source, const StreamInfo& payload) -> StreamEvent*; - static auto CreateChunkData(QueueHandle_t source, std::size_t chunk_size) + static auto CreateArenaChunk(QueueHandle_t source, memory::ArenaPtr ptr) -> StreamEvent*; static auto CreateChunkNotification(QueueHandle_t source) -> StreamEvent*; static auto CreateEndOfStream(QueueHandle_t source) -> StreamEvent*; @@ -28,7 +29,7 @@ struct StreamEvent { enum { UNINITIALISED, STREAM_INFO, - CHUNK_DATA, + ARENA_CHUNK, CHUNK_NOTIFICATION, END_OF_STREAM, LOG_STATUS, @@ -37,10 +38,7 @@ struct StreamEvent { union { StreamInfo* stream_info; - struct { - std::byte* raw_bytes; - cpp::span bytes; - } chunk_data; + memory::ArenaPtr arena_chunk; // FIXME: It would be nice to also support a pointer to himem data here, to // save a little ordinary heap space. diff --git a/src/audio/stream_event.cpp b/src/audio/stream_event.cpp index af470584..e08f26da 100644 --- a/src/audio/stream_event.cpp +++ b/src/audio/stream_event.cpp @@ -1,6 +1,7 @@ #include "stream_event.hpp" #include #include +#include "arena.hpp" #include "stream_info.hpp" namespace audio { @@ -14,17 +15,12 @@ auto StreamEvent::CreateStreamInfo(QueueHandle_t source, return event; } -auto StreamEvent::CreateChunkData(QueueHandle_t source, std::size_t chunk_size) +auto StreamEvent::CreateArenaChunk(QueueHandle_t source, memory::ArenaPtr ptr) -> StreamEvent* { auto event = new StreamEvent; - event->tag = StreamEvent::CHUNK_DATA; + event->tag = StreamEvent::ARENA_CHUNK; event->source = source; - - auto raw_bytes = - static_cast(heap_caps_malloc(chunk_size, MALLOC_CAP_SPIRAM)); - - event->chunk_data.raw_bytes = raw_bytes; - event->chunk_data.bytes = cpp::span(raw_bytes, chunk_size); + event->arena_chunk = ptr; return event; } @@ -59,8 +55,8 @@ StreamEvent::~StreamEvent() { case STREAM_INFO: delete stream_info; break; - case CHUNK_DATA: - free(chunk_data.raw_bytes); + case ARENA_CHUNK: + arena_chunk.owner->Return(arena_chunk); break; case CHUNK_NOTIFICATION: break; @@ -81,9 +77,10 @@ StreamEvent::StreamEvent(StreamEvent&& other) { stream_info = other.stream_info; other.stream_info = nullptr; break; - case CHUNK_DATA: - chunk_data = other.chunk_data; - other.chunk_data = {}; + case ARENA_CHUNK: + arena_chunk = other.arena_chunk; + other.arena_chunk = { + .owner = nullptr, .start = nullptr, .size = 0, .used_size = 0}; break; case CHUNK_NOTIFICATION: break; diff --git a/src/memory/CMakeLists.txt b/src/memory/CMakeLists.txt new file mode 100644 index 00000000..67e64267 --- /dev/null +++ b/src/memory/CMakeLists.txt @@ -0,0 +1,2 @@ +idf_component_register(SRCS "arena.cpp" INCLUDE_DIRS "include" REQUIRES "span") +target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/memory/arena.cpp b/src/memory/arena.cpp new file mode 100644 index 00000000..450ac4f2 --- /dev/null +++ b/src/memory/arena.cpp @@ -0,0 +1,75 @@ +#include "arena.hpp" + +#include +#include + +#include "esp_heap_caps.h" +#include "freertos/queue.h" +#include "span.hpp" + +namespace memory { + +Arena::Arena(std::size_t block_size, + std::size_t num_blocks, + uint32_t alloc_flags) + : block_size_(block_size) { + pool_ = static_cast( + heap_caps_malloc(block_size * num_blocks, alloc_flags)); + free_blocks_ = xQueueCreate(num_blocks, sizeof(void*)); + for (int i = 0; i < num_blocks; i++) { + std::byte* block = pool_ + (i * block_size); + xQueueSend(free_blocks_, &block, 0); + } +} + +Arena::~Arena() { + // TODO: assert queue is full? + vQueueDelete(free_blocks_); + free(pool_); +} + +auto Arena::Acquire() -> std::optional { + std::byte* block; + bool result = xQueueReceive(free_blocks_, &block, 0); + if (result) { + ArenaPtr ptr{this, block, block_size_, 0}; + return ptr; + } else { + return {}; + } +} + +auto Arena::Return(ArenaPtr ptr) -> void { + assert(ptr.owner == this); + xQueueSend(free_blocks_, &ptr.start, 0); +} + +auto ArenaRef::Acquire(Arena* a) -> std::optional { + auto ptr = a->Acquire(); + if (ptr) { + ArenaRef ref{*ptr}; + return ref; + } + return {}; +} + +ArenaRef::ArenaRef(ArenaPtr p) : ptr(p) {} + +ArenaRef::ArenaRef(ArenaRef&& other) : ptr(other.Release()) {} + +auto ArenaRef::Release() -> ArenaPtr { + auto ret = ptr; + ptr.owner = nullptr; + ptr.start = nullptr; + ptr.size = 0; + ptr.used_size = 0; + return ret; +} + +ArenaRef::~ArenaRef() { + if (ptr.owner != nullptr) { + ptr.owner->Return(ptr); + } +} + +} // namespace memory diff --git a/src/memory/include/arena.hpp b/src/memory/include/arena.hpp new file mode 100644 index 00000000..26d49d27 --- /dev/null +++ b/src/memory/include/arena.hpp @@ -0,0 +1,85 @@ +#pragma once + +#include +#include +#include + +#include "freertos/FreeRTOS.h" +#include "freertos/queue.h" +#include "span.hpp" +#include "sys/_stdint.h" + +namespace memory { + +class Arena; + +/* + * A pointer to data that has been given out by an Arena, plus extra accounting + * information so that it can be returned properly. + */ +struct ArenaPtr { + Arena* owner; + std::byte* start; + std::size_t size; + // A convenience for keeping track of the subset of the block that has had + // data placed within it. + std::size_t used_size; +}; + +/* + * A basic memory arena. This class mediates access to fixed-size blocks of + * memory within a larger contiguous block. This is faster than re-allocating + * smaller blocks every time they're needed, and lets us easily limit the + * maximum size of the memory used. + * + * A single arena instance is safe to be used concurrently by multiple tasks, + * however there is no built in synchronisation of the underlying memory. + */ +class Arena { + public: + Arena(std::size_t block_size, std::size_t num_blocks, uint32_t alloc_flags); + ~Arena(); + + /* + * Attempts to receive an allocation from this arena. Returns absent if + * there are no blocks left. + */ + auto Acquire() -> std::optional; + + /* Returns a previously allocated block to this arena. */ + auto Return(ArenaPtr) -> void; + + /* Returns the number of blocks that are currently free. */ + auto BlocksFree() -> std::size_t; + + Arena(const Arena&) = delete; + Arena& operator=(const Arena&) = delete; + + private: + std::size_t block_size_; + // The large memory allocation that is divided into blocks. + std::byte* pool_; + // A FreeRTOS queue containing the blocks that are currently unused. + QueueHandle_t free_blocks_; +}; + +/* + * Wrapper around an ArenaPtr that handles acquiring and returning the block + * through RAII. + */ +class ArenaRef { + public: + static auto Acquire(Arena* a) -> std::optional; + explicit ArenaRef(ArenaPtr ptr); + ~ArenaRef(); + + auto Release() -> ArenaPtr; + + ArenaRef(ArenaRef&&); + ArenaRef(const ArenaRef&) = delete; + Arena& operator=(const Arena&) = delete; + + ArenaPtr ptr; +}; + +} // namespace memory