Add a memory arena for the audio pipeline

custom
jacqueline 2 years ago
parent 12d2ffdab7
commit 941bafca17
  1. 2
      src/audio/CMakeLists.txt
  2. 22
      src/audio/audio_decoder.cpp
  3. 2
      src/audio/audio_element.cpp
  4. 12
      src/audio/audio_task.cpp
  5. 27
      src/audio/fatfs_audio_input.cpp
  6. 1
      src/audio/include/audio_decoder.hpp
  7. 2
      src/audio/include/fatfs_audio_input.hpp
  8. 10
      src/audio/include/stream_event.hpp
  9. 23
      src/audio/stream_event.cpp
  10. 2
      src/memory/CMakeLists.txt
  11. 75
      src/memory/arena.cpp
  12. 85
      src/memory/include/arena.hpp

@ -4,6 +4,6 @@ idf_component_register(
"audio_playback.cpp" "audio_element_handle.cpp" "stream_event.cpp" "audio_playback.cpp" "audio_element_handle.cpp" "stream_event.cpp"
"audio_element.cpp" "audio_element.cpp"
INCLUDE_DIRS "include" INCLUDE_DIRS "include"
REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span") REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})

@ -21,10 +21,12 @@ namespace audio {
static const char* kTag = "DEC"; static const char* kTag = "DEC";
static const std::size_t kSamplesPerChunk = 1024; static const std::size_t kChunkSize = 1024;
static const std::size_t kReadahead = 8;
AudioDecoder::AudioDecoder() AudioDecoder::AudioDecoder()
: IAudioElement(), : IAudioElement(),
arena_(kChunkSize, kReadahead, MALLOC_CAP_SPIRAM),
stream_info_({}), stream_info_({}),
has_samples_to_send_(false), has_samples_to_send_(false),
needs_more_input_(true) {} needs_more_input_(true) {}
@ -104,23 +106,25 @@ auto AudioDecoder::Process() -> cpp::result<void, AudioProcessingError> {
stream_info_->bits_per_sample = format.bits_per_sample; stream_info_->bits_per_sample = format.bits_per_sample;
stream_info_->sample_rate = format.sample_rate_hz; stream_info_->sample_rate = format.sample_rate_hz;
stream_info_->channels = format.num_channels; stream_info_->channels = format.num_channels;
stream_info_->chunk_size = kChunkSize;
chunk_size_ = kSamplesPerChunk * (*stream_info_->bits_per_sample);
stream_info_->chunk_size = chunk_size_;
ESP_LOGI(kTag, "pcm stream chunk size: %u bytes", chunk_size_);
auto event = auto event =
StreamEvent::CreateStreamInfo(input_events_, *stream_info_); StreamEvent::CreateStreamInfo(input_events_, *stream_info_);
SendOrBufferEvent(std::unique_ptr<StreamEvent>(event)); SendOrBufferEvent(std::unique_ptr<StreamEvent>(event));
} }
auto chunk = std::unique_ptr<StreamEvent>( auto block = arena_.Acquire();
StreamEvent::CreateChunkData(input_events_, chunk_size_)); if (!block) {
return {};
}
auto write_res = auto write_res =
current_codec_->WriteOutputSamples(chunk->chunk_data.bytes); current_codec_->WriteOutputSamples({block->start, block->size});
chunk->chunk_data.bytes = chunk->chunk_data.bytes.first(write_res.first); block->used_size = write_res.first;
has_samples_to_send_ = !write_res.second; has_samples_to_send_ = !write_res.second;
auto chunk = std::unique_ptr<StreamEvent>(
StreamEvent::CreateArenaChunk(input_events_, *block));
if (!SendOrBufferEvent(std::move(chunk))) { if (!SendOrBufferEvent(std::move(chunk))) {
return {}; return {};
} }

@ -28,7 +28,7 @@ IAudioElement::~IAudioElement() {
auto IAudioElement::SendOrBufferEvent(std::unique_ptr<StreamEvent> event) auto IAudioElement::SendOrBufferEvent(std::unique_ptr<StreamEvent> event)
-> bool { -> bool {
if (event->tag == StreamEvent::CHUNK_DATA) { if (event->tag == StreamEvent::ARENA_CHUNK) {
unprocessed_output_chunks_++; unprocessed_output_chunks_++;
} }
if (!buffered_output_.empty()) { if (!buffered_output_.empty()) {

@ -6,6 +6,7 @@
#include <deque> #include <deque>
#include <memory> #include <memory>
#include "arena.hpp"
#include "audio_element_handle.hpp" #include "audio_element_handle.hpp"
#include "cbor.h" #include "cbor.h"
#include "esp_heap_caps.h" #include "esp_heap_caps.h"
@ -96,7 +97,7 @@ void AudioTaskMain(void* args) {
} else if (new_event->tag == StreamEvent::LOG_STATUS) { } else if (new_event->tag == StreamEvent::LOG_STATUS) {
element->ProcessLogStatus(); element->ProcessLogStatus();
if (element->OutputEventQueue() != nullptr) { if (element->OutputEventQueue() != nullptr) {
xQueueSendToFront(element->OutputEventQueue(), &new_event, 0); xQueueSendToFront(element->OutputEventQueue(), &new_event, 0);
} else { } else {
delete new_event; delete new_event;
} }
@ -150,8 +151,9 @@ void AudioTaskMain(void* args) {
// TODO(jacqueline) // TODO(jacqueline)
ESP_LOGE(kTag, "failed to process stream info"); ESP_LOGE(kTag, "failed to process stream info");
} }
} else if (event->tag == StreamEvent::CHUNK_DATA) { } else if (event->tag == StreamEvent::ARENA_CHUNK) {
ESP_LOGD(kTag, "processing chunk data"); ESP_LOGD(kTag, "processing arena data");
memory::ArenaRef ref(event->arena_chunk);
auto callback = auto callback =
StreamEvent::CreateChunkNotification(element->InputEventQueue()); StreamEvent::CreateChunkNotification(element->InputEventQueue());
if (!xQueueSend(event->source, &callback, 0)) { if (!xQueueSend(event->source, &callback, 0)) {
@ -159,8 +161,10 @@ void AudioTaskMain(void* args) {
continue; continue;
} }
// TODO(jacqueline): Consider giving the element a full ArenaRef here,
// so that it can hang on to it and potentially save an alloc+copy.
auto process_chunk_res = auto process_chunk_res =
element->ProcessChunk(event->chunk_data.bytes); element->ProcessChunk({ref.ptr.start, ref.ptr.used_size});
if (process_chunk_res.has_error()) { if (process_chunk_res.has_error()) {
// TODO(jacqueline) // TODO(jacqueline)
ESP_LOGE(kTag, "failed to process chunk"); ESP_LOGE(kTag, "failed to process chunk");

@ -5,6 +5,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "arena.hpp"
#include "esp_heap_caps.h" #include "esp_heap_caps.h"
#include "freertos/portmacro.h" #include "freertos/portmacro.h"
@ -19,11 +20,12 @@ static const char* kTag = "SRC";
namespace audio { namespace audio {
// 32KiB to match the minimum himen region size.
static const std::size_t kChunkSize = 24 * 1024; static const std::size_t kChunkSize = 24 * 1024;
static const std::size_t kChunkReadahead = 2;
FatfsAudioInput::FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage) FatfsAudioInput::FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage)
: IAudioElement(), : IAudioElement(),
arena_(kChunkSize, kChunkReadahead, MALLOC_CAP_SPIRAM),
storage_(storage), storage_(storage),
current_file_(), current_file_(),
is_file_open_(false) {} is_file_open_(false) {}
@ -80,25 +82,28 @@ auto FatfsAudioInput::ProcessEndOfStream() -> void {
auto FatfsAudioInput::Process() -> cpp::result<void, AudioProcessingError> { auto FatfsAudioInput::Process() -> cpp::result<void, AudioProcessingError> {
if (is_file_open_) { if (is_file_open_) {
auto dest_event = std::unique_ptr<StreamEvent>( auto dest_block = memory::ArenaRef::Acquire(&arena_);
StreamEvent::CreateChunkData(input_events_, kChunkSize)); if (!dest_block) {
UINT bytes_read = 0; return {};
}
FRESULT result = f_read(&current_file_, dest_event->chunk_data.raw_bytes, FRESULT result = f_read(&current_file_, dest_block->ptr.start,
kChunkSize, &bytes_read); dest_block->ptr.size, &dest_block->ptr.used_size);
if (result != FR_OK) { if (result != FR_OK) {
ESP_LOGE(kTag, "file I/O error %d", result); ESP_LOGE(kTag, "file I/O error %d", result);
return cpp::fail(IO_ERROR); return cpp::fail(IO_ERROR);
} }
dest_event->chunk_data.bytes = if (dest_block->ptr.used_size < dest_block->ptr.size ||
dest_event->chunk_data.bytes.first(bytes_read); f_eof(&current_file_)) {
SendOrBufferEvent(std::move(dest_event));
if (bytes_read < kChunkSize || f_eof(&current_file_)) {
f_close(&current_file_); f_close(&current_file_);
is_file_open_ = false; is_file_open_ = false;
} }
auto dest_event = std::unique_ptr<StreamEvent>(
StreamEvent::CreateArenaChunk(input_events_, dest_block->Release()));
SendOrBufferEvent(std::move(dest_event));
} }
return {}; return {};
} }

@ -44,6 +44,7 @@ class AudioDecoder : public IAudioElement {
AudioDecoder& operator=(const AudioDecoder&) = delete; AudioDecoder& operator=(const AudioDecoder&) = delete;
private: private:
memory::Arena arena_;
std::unique_ptr<codecs::ICodec> current_codec_; std::unique_ptr<codecs::ICodec> current_codec_;
std::optional<StreamInfo> stream_info_; std::optional<StreamInfo> stream_info_;
std::optional<ChunkReader> chunk_reader_; std::optional<ChunkReader> chunk_reader_;

@ -4,6 +4,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "arena.hpp"
#include "chunk.hpp" #include "chunk.hpp"
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
@ -35,6 +36,7 @@ class FatfsAudioInput : public IAudioElement {
FatfsAudioInput& operator=(const FatfsAudioInput&) = delete; FatfsAudioInput& operator=(const FatfsAudioInput&) = delete;
private: private:
memory::Arena arena_;
std::shared_ptr<drivers::SdStorage> storage_; std::shared_ptr<drivers::SdStorage> storage_;
FIL current_file_; FIL current_file_;

@ -2,6 +2,7 @@
#include <memory> #include <memory>
#include "arena.hpp"
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
#include "freertos/queue.h" #include "freertos/queue.h"
@ -13,7 +14,7 @@ namespace audio {
struct StreamEvent { struct StreamEvent {
static auto CreateStreamInfo(QueueHandle_t source, const StreamInfo& payload) static auto CreateStreamInfo(QueueHandle_t source, const StreamInfo& payload)
-> StreamEvent*; -> StreamEvent*;
static auto CreateChunkData(QueueHandle_t source, std::size_t chunk_size) static auto CreateArenaChunk(QueueHandle_t source, memory::ArenaPtr ptr)
-> StreamEvent*; -> StreamEvent*;
static auto CreateChunkNotification(QueueHandle_t source) -> StreamEvent*; static auto CreateChunkNotification(QueueHandle_t source) -> StreamEvent*;
static auto CreateEndOfStream(QueueHandle_t source) -> StreamEvent*; static auto CreateEndOfStream(QueueHandle_t source) -> StreamEvent*;
@ -28,7 +29,7 @@ struct StreamEvent {
enum { enum {
UNINITIALISED, UNINITIALISED,
STREAM_INFO, STREAM_INFO,
CHUNK_DATA, ARENA_CHUNK,
CHUNK_NOTIFICATION, CHUNK_NOTIFICATION,
END_OF_STREAM, END_OF_STREAM,
LOG_STATUS, LOG_STATUS,
@ -37,10 +38,7 @@ struct StreamEvent {
union { union {
StreamInfo* stream_info; StreamInfo* stream_info;
struct { memory::ArenaPtr arena_chunk;
std::byte* raw_bytes;
cpp::span<std::byte> bytes;
} chunk_data;
// FIXME: It would be nice to also support a pointer to himem data here, to // FIXME: It would be nice to also support a pointer to himem data here, to
// save a little ordinary heap space. // save a little ordinary heap space.

@ -1,6 +1,7 @@
#include "stream_event.hpp" #include "stream_event.hpp"
#include <cstddef> #include <cstddef>
#include <memory> #include <memory>
#include "arena.hpp"
#include "stream_info.hpp" #include "stream_info.hpp"
namespace audio { namespace audio {
@ -14,17 +15,12 @@ auto StreamEvent::CreateStreamInfo(QueueHandle_t source,
return event; return event;
} }
auto StreamEvent::CreateChunkData(QueueHandle_t source, std::size_t chunk_size) auto StreamEvent::CreateArenaChunk(QueueHandle_t source, memory::ArenaPtr ptr)
-> StreamEvent* { -> StreamEvent* {
auto event = new StreamEvent; auto event = new StreamEvent;
event->tag = StreamEvent::CHUNK_DATA; event->tag = StreamEvent::ARENA_CHUNK;
event->source = source; event->source = source;
event->arena_chunk = ptr;
auto raw_bytes =
static_cast<std::byte*>(heap_caps_malloc(chunk_size, MALLOC_CAP_SPIRAM));
event->chunk_data.raw_bytes = raw_bytes;
event->chunk_data.bytes = cpp::span<std::byte>(raw_bytes, chunk_size);
return event; return event;
} }
@ -59,8 +55,8 @@ StreamEvent::~StreamEvent() {
case STREAM_INFO: case STREAM_INFO:
delete stream_info; delete stream_info;
break; break;
case CHUNK_DATA: case ARENA_CHUNK:
free(chunk_data.raw_bytes); arena_chunk.owner->Return(arena_chunk);
break; break;
case CHUNK_NOTIFICATION: case CHUNK_NOTIFICATION:
break; break;
@ -81,9 +77,10 @@ StreamEvent::StreamEvent(StreamEvent&& other) {
stream_info = other.stream_info; stream_info = other.stream_info;
other.stream_info = nullptr; other.stream_info = nullptr;
break; break;
case CHUNK_DATA: case ARENA_CHUNK:
chunk_data = other.chunk_data; arena_chunk = other.arena_chunk;
other.chunk_data = {}; other.arena_chunk = {
.owner = nullptr, .start = nullptr, .size = 0, .used_size = 0};
break; break;
case CHUNK_NOTIFICATION: case CHUNK_NOTIFICATION:
break; break;

@ -0,0 +1,2 @@
idf_component_register(SRCS "arena.cpp" INCLUDE_DIRS "include" REQUIRES "span")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})

@ -0,0 +1,75 @@
#include "arena.hpp"
#include <cstdint>
#include <optional>
#include "esp_heap_caps.h"
#include "freertos/queue.h"
#include "span.hpp"
namespace memory {
Arena::Arena(std::size_t block_size,
std::size_t num_blocks,
uint32_t alloc_flags)
: block_size_(block_size) {
pool_ = static_cast<std::byte*>(
heap_caps_malloc(block_size * num_blocks, alloc_flags));
free_blocks_ = xQueueCreate(num_blocks, sizeof(void*));
for (int i = 0; i < num_blocks; i++) {
std::byte* block = pool_ + (i * block_size);
xQueueSend(free_blocks_, &block, 0);
}
}
Arena::~Arena() {
// TODO: assert queue is full?
vQueueDelete(free_blocks_);
free(pool_);
}
auto Arena::Acquire() -> std::optional<ArenaPtr> {
std::byte* block;
bool result = xQueueReceive(free_blocks_, &block, 0);
if (result) {
ArenaPtr ptr{this, block, block_size_, 0};
return ptr;
} else {
return {};
}
}
auto Arena::Return(ArenaPtr ptr) -> void {
assert(ptr.owner == this);
xQueueSend(free_blocks_, &ptr.start, 0);
}
auto ArenaRef::Acquire(Arena* a) -> std::optional<ArenaRef> {
auto ptr = a->Acquire();
if (ptr) {
ArenaRef ref{*ptr};
return ref;
}
return {};
}
ArenaRef::ArenaRef(ArenaPtr p) : ptr(p) {}
ArenaRef::ArenaRef(ArenaRef&& other) : ptr(other.Release()) {}
auto ArenaRef::Release() -> ArenaPtr {
auto ret = ptr;
ptr.owner = nullptr;
ptr.start = nullptr;
ptr.size = 0;
ptr.used_size = 0;
return ret;
}
ArenaRef::~ArenaRef() {
if (ptr.owner != nullptr) {
ptr.owner->Return(ptr);
}
}
} // namespace memory

@ -0,0 +1,85 @@
#pragma once
#include <cstdint>
#include <optional>
#include <utility>
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "span.hpp"
#include "sys/_stdint.h"
namespace memory {
class Arena;
/*
* A pointer to data that has been given out by an Arena, plus extra accounting
* information so that it can be returned properly.
*/
struct ArenaPtr {
Arena* owner;
std::byte* start;
std::size_t size;
// A convenience for keeping track of the subset of the block that has had
// data placed within it.
std::size_t used_size;
};
/*
* A basic memory arena. This class mediates access to fixed-size blocks of
* memory within a larger contiguous block. This is faster than re-allocating
* smaller blocks every time they're needed, and lets us easily limit the
* maximum size of the memory used.
*
* A single arena instance is safe to be used concurrently by multiple tasks,
* however there is no built in synchronisation of the underlying memory.
*/
class Arena {
public:
Arena(std::size_t block_size, std::size_t num_blocks, uint32_t alloc_flags);
~Arena();
/*
* Attempts to receive an allocation from this arena. Returns absent if
* there are no blocks left.
*/
auto Acquire() -> std::optional<ArenaPtr>;
/* Returns a previously allocated block to this arena. */
auto Return(ArenaPtr) -> void;
/* Returns the number of blocks that are currently free. */
auto BlocksFree() -> std::size_t;
Arena(const Arena&) = delete;
Arena& operator=(const Arena&) = delete;
private:
std::size_t block_size_;
// The large memory allocation that is divided into blocks.
std::byte* pool_;
// A FreeRTOS queue containing the blocks that are currently unused.
QueueHandle_t free_blocks_;
};
/*
* Wrapper around an ArenaPtr that handles acquiring and returning the block
* through RAII.
*/
class ArenaRef {
public:
static auto Acquire(Arena* a) -> std::optional<ArenaRef>;
explicit ArenaRef(ArenaPtr ptr);
~ArenaRef();
auto Release() -> ArenaPtr;
ArenaRef(ArenaRef&&);
ArenaRef(const ArenaRef&) = delete;
Arena& operator=(const Arena&) = delete;
ArenaPtr ptr;
};
} // namespace memory
Loading…
Cancel
Save