Switch from MessageBuffer to Queue for pipeline comms

custom
jacqueline 2 years ago
parent e7f926e2c3
commit f6dcd845fc
  1. 1
      src/audio/CMakeLists.txt
  2. 103
      src/audio/audio_decoder.cpp
  3. 59
      src/audio/audio_element.cpp
  4. 36
      src/audio/audio_playback.cpp
  5. 138
      src/audio/audio_task.cpp
  6. 145
      src/audio/chunk.cpp
  7. 136
      src/audio/fatfs_audio_input.cpp
  8. 12
      src/audio/i2s_audio_output.cpp
  9. 10
      src/audio/include/audio_decoder.hpp
  10. 63
      src/audio/include/audio_element.hpp
  11. 6
      src/audio/include/audio_playback.hpp
  12. 70
      src/audio/include/chunk.hpp
  13. 16
      src/audio/include/fatfs_audio_input.hpp
  14. 11
      src/audio/include/i2s_audio_output.hpp
  15. 57
      src/audio/include/stream_event.hpp
  16. 12
      src/audio/include/stream_info.hpp
  17. 3
      src/audio/stream_buffer.cpp
  18. 75
      src/audio/stream_event.cpp
  19. 1
      src/audio/stream_message.cpp

@ -2,6 +2,7 @@ idf_component_register(
SRCS "audio_decoder.cpp" "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp" SRCS "audio_decoder.cpp" "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp"
"stream_info.cpp" "stream_message.cpp" "i2s_audio_output.cpp" "stream_info.cpp" "stream_message.cpp" "i2s_audio_output.cpp"
"stream_buffer.cpp" "audio_playback.cpp" "audio_element_handle.cpp" "stream_buffer.cpp" "audio_playback.cpp" "audio_element_handle.cpp"
"stream_event.cpp" "audio_element.cpp"
INCLUDE_DIRS "include" INCLUDE_DIRS "include"
REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span") REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span")

@ -4,6 +4,7 @@
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <memory>
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
@ -14,19 +15,30 @@
#include "audio_element.hpp" #include "audio_element.hpp"
#include "chunk.hpp" #include "chunk.hpp"
#include "fatfs_audio_input.hpp" #include "fatfs_audio_input.hpp"
#include "stream_info.hpp"
static const char* kTag = "DEC";
namespace audio { namespace audio {
static const std::size_t kSamplesPerChunk = 256;
AudioDecoder::AudioDecoder() : IAudioElement(), stream_info_({}) {} AudioDecoder::AudioDecoder() : IAudioElement(), stream_info_({}) {}
AudioDecoder::~AudioDecoder() {} AudioDecoder::~AudioDecoder() {}
auto AudioDecoder::HasUnprocessedInput() -> bool {
return !needs_more_input_ || has_samples_to_send_;
}
auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info) auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info)
-> cpp::result<void, AudioProcessingError> { -> cpp::result<void, AudioProcessingError> {
stream_info_ = info; stream_info_ = info;
if (info.ChunkSize()) {
chunk_reader_.emplace(info.ChunkSize().value());
} else {
// TODO.
}
// Reuse the existing codec if we can. This will help with gapless playback, // Reuse the existing codec if we can. This will help with gapless playback,
// since we can potentially just continue to decode as we were before, // since we can potentially just continue to decode as we were before,
// without any setup overhead. // without any setup overhead.
@ -42,71 +54,66 @@ auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info)
return cpp::fail(UNSUPPORTED_STREAM); return cpp::fail(UNSUPPORTED_STREAM);
} }
// TODO: defer until first header read, so we can give better info about
// sample rate, chunk size, etc.
auto downstream_info = StreamEvent::CreateStreamInfo(
input_events_, std::make_unique<StreamInfo>(info));
downstream_info->stream_info->BitsPerSample(32);
downstream_info->stream_info->SampleRate(48'000);
chunk_size_ = 128;
downstream_info->stream_info->ChunkSize(chunk_size_);
SendOrBufferEvent(std::move(downstream_info));
return {}; return {};
} }
auto AudioDecoder::ProcessChunk(const cpp::span<std::byte>& chunk) auto AudioDecoder::ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<size_t, AudioProcessingError> { -> cpp::result<size_t, AudioProcessingError> {
if (current_codec_ == nullptr) { if (current_codec_ == nullptr || !chunk_reader_) {
// Should never happen, but fail explicitly anyway. // Should never happen, but fail explicitly anyway.
return cpp::fail(UNSUPPORTED_STREAM); return cpp::fail(UNSUPPORTED_STREAM);
} }
current_codec_->SetInput(chunk); current_codec_->SetInput(chunk_reader_->HandleNewData(chunk));
bool has_samples_to_send = false; return {};
bool needs_more_input = false;
std::optional<codecs::ICodec::ProcessingError> error = std::nullopt;
while (1) {
ChunkWriteResult res = chunk_writer_->WriteChunkToStream(
[&](cpp::span<std::byte> buffer) -> std::size_t {
std::size_t bytes_written = 0;
// Continue filling up the output buffer so long as we have samples
// leftover, or are able to synthesize more samples from the input.
while (has_samples_to_send || !needs_more_input) {
if (!has_samples_to_send) {
auto result = current_codec_->ProcessNextFrame();
has_samples_to_send = true;
if (result.has_error()) {
error = result.error();
// End our output stream immediately if the codec barfed.
return 0;
} else {
needs_more_input = result.value();
} }
} else {
auto result = current_codec_->WriteOutputSamples( auto AudioDecoder::Process() -> cpp::result<void, AudioProcessingError> {
buffer.last(buffer.size() - bytes_written)); if (has_samples_to_send_) {
bytes_written += result.first; // Writing samples is relatively quick (it's just a bunch of memcopy's), so
has_samples_to_send = !result.second; // do them all at once.
while (has_samples_to_send_ && !IsOverBuffered()) {
auto buffer = StreamEvent::CreateChunkData(input_events_, chunk_size_);
auto write_res =
current_codec_->WriteOutputSamples(buffer->chunk_data.bytes);
buffer->chunk_data.bytes =
buffer->chunk_data.bytes.first(write_res.first);
has_samples_to_send_ = !write_res.second;
if (!SendOrBufferEvent(std::move(buffer))) {
return {};
} }
} }
return bytes_written; // We will process the next frame during the next call to this method.
},
// TODO
portMAX_DELAY);
switch (res) {
case CHUNK_WRITE_OKAY:
break;
case CHUNK_WRITE_TIMEOUT:
case CHUNK_OUT_OF_DATA:
return {}; return {};
default:
return cpp::fail(IO_ERROR);
}
} }
if (error) { if (!needs_more_input_) {
ESP_LOGE(kTag, "Codec encountered error %d", error.value()); auto res = current_codec_->ProcessNextFrame();
return cpp::fail(IO_ERROR); if (res.has_error()) {
// todo
return {};
} }
needs_more_input_ = res.value();
has_samples_to_send_ = true;
return current_codec_->GetInputPosition(); if (needs_more_input_) {
chunk_reader_->HandleLeftovers(current_codec_->GetInputPosition());
}
} }
auto AudioDecoder::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
// Not used; we delay forever when waiting on IO.
return {}; return {};
} }

@ -0,0 +1,59 @@
#include "audio_element.hpp"
namespace audio {
IAudioElement::IAudioElement()
: input_events_(xQueueCreate(kEventQueueSize, sizeof(StreamEvent))),
output_events_(nullptr),
unprocessed_output_chunks_(0),
buffered_output_(),
current_state_(STATE_RUN) {}
IAudioElement::~IAudioElement() {
// Ensure we don't leak any memory from events leftover in the queue.
while (uxQueueSpacesAvailable(input_events_) < kEventQueueSize) {
StreamEvent* event;
if (xQueueReceive(input_events_, &event, 0)) {
free(event);
} else {
break;
}
}
// Technically there's a race here if someone is still adding to the queue,
// but hopefully the whole pipeline is stopped if an element is being
// destroyed.
vQueueDelete(input_events_);
}
auto IAudioElement::SendOrBufferEvent(std::unique_ptr<StreamEvent> event)
-> bool {
if (event->tag == StreamEvent::CHUNK_DATA) {
unprocessed_output_chunks_++;
}
if (!buffered_output_.empty()) {
// To ensure we send data in order, don't try to send if we've already
// failed to send something.
buffered_output_.push_back(std::move(event));
return false;
}
StreamEvent* raw_event = event.release();
if (!xQueueSend(output_events_, raw_event, 0)) {
buffered_output_.emplace_front(raw_event);
return false;
}
return true;
}
auto IAudioElement::FlushBufferedOutput() -> bool {
while (!buffered_output_.empty()) {
StreamEvent* raw_event = buffered_output_.front().release();
buffered_output_.pop_front();
if (!xQueueSend(output_events_, raw_event, 0)) {
buffered_output_.emplace_front(raw_event);
return false;
}
}
return true;
}
} // namespace audio

@ -18,9 +18,6 @@
namespace audio { namespace audio {
// TODO: idk
static const std::size_t kMinElementBufferSize = 1024;
auto AudioPlayback::create(drivers::GpioExpander* expander, auto AudioPlayback::create(drivers::GpioExpander* expander,
std::shared_ptr<drivers::SdStorage> storage) std::shared_ptr<drivers::SdStorage> storage)
-> cpp::result<std::unique_ptr<AudioPlayback>, Error> { -> cpp::result<std::unique_ptr<AudioPlayback>, Error> {
@ -37,8 +34,6 @@ auto AudioPlayback::create(drivers::GpioExpander* expander,
auto playback = std::make_unique<AudioPlayback>(); auto playback = std::make_unique<AudioPlayback>();
// Configure the pipeline // Configure the pipeline
source->InputBuffer(&playback->stream_start_);
sink->OutputBuffer(&playback->stream_end_);
playback->ConnectElements(source.get(), codec.get()); playback->ConnectElements(source.get(), codec.get());
playback->ConnectElements(codec.get(), sink.get()); playback->ConnectElements(codec.get(), sink.get());
@ -52,9 +47,7 @@ auto AudioPlayback::create(drivers::GpioExpander* expander,
return playback; return playback;
} }
// TODO(jacqueline): think about sizes AudioPlayback::AudioPlayback() {}
AudioPlayback::AudioPlayback()
: stream_start_(128, 256), stream_end_(128, 256) {}
AudioPlayback::~AudioPlayback() { AudioPlayback::~AudioPlayback() {
for (auto& element : element_handles_) { for (auto& element : element_handles_) {
@ -63,33 +56,16 @@ AudioPlayback::~AudioPlayback() {
} }
auto AudioPlayback::Play(const std::string& filename) -> void { auto AudioPlayback::Play(const std::string& filename) -> void {
StreamInfo info; auto info = std::make_unique<StreamInfo>();
info.Path(filename); info->Path(filename);
auto event = StreamEvent::CreateStreamInfo(nullptr, std::move(info));
std::array<std::byte, 128> dest;
auto len = WriteMessage(
TYPE_STREAM_INFO, [&](auto enc) { return info.Encode(enc); }, dest);
if (len.has_error()) { xQueueSend(input_handle_, event.release(), portMAX_DELAY);
// TODO.
return;
}
// TODO: short delay, return error on fail
xMessageBufferSend(*stream_start_.Handle(), dest.data(), len.value(),
portMAX_DELAY);
} }
auto AudioPlayback::ConnectElements(IAudioElement* src, IAudioElement* sink) auto AudioPlayback::ConnectElements(IAudioElement* src, IAudioElement* sink)
-> void { -> void {
std::size_t chunk_size = src->OutputEventQueue(sink->InputEventQueue());
std::max(src->InputMinChunkSize(), sink->InputMinChunkSize());
std::size_t buffer_size = std::max(kMinElementBufferSize, chunk_size * 2);
auto buffer = std::make_unique<StreamBuffer>(chunk_size, buffer_size);
src->OutputBuffer(buffer.get());
sink->OutputBuffer(buffer.get());
element_buffers_.push_back(std::move(buffer));
} }
} // namespace audio } // namespace audio

@ -3,6 +3,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <cstdint> #include <cstdint>
#include <deque>
#include <memory> #include <memory>
#include "audio_element_handle.hpp" #include "audio_element_handle.hpp"
@ -10,14 +11,16 @@
#include "esp_heap_caps.h" #include "esp_heap_caps.h"
#include "esp_log.h" #include "esp_log.h"
#include "freertos/portmacro.h" #include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h" #include "freertos/queue.h"
#include "freertos/stream_buffer.h"
#include "span.hpp" #include "span.hpp"
#include "audio_element.hpp" #include "audio_element.hpp"
#include "chunk.hpp" #include "chunk.hpp"
#include "stream_event.hpp"
#include "stream_info.hpp" #include "stream_info.hpp"
#include "stream_message.hpp" #include "stream_message.hpp"
#include "sys/_stdint.h"
#include "tasks.hpp" #include "tasks.hpp"
namespace audio { namespace audio {
@ -47,84 +50,91 @@ void AudioTaskMain(void* args) {
std::shared_ptr<IAudioElement> element = std::move(real_args->element); std::shared_ptr<IAudioElement> element = std::move(real_args->element);
delete real_args; delete real_args;
ChunkReader chunk_reader = ChunkReader(element->InputBuffer()); // Queue of events that we have received on our input queue, but not yet
// processed.
std::deque<std::unique_ptr<StreamEvent>> pending_events;
while (element->ElementState() != STATE_QUIT) { while (element->ElementState() != STATE_QUIT) {
if (element->ElementState() == STATE_PAUSE) { // First, we pull events from our input queue into pending_events. This
// TODO: park with a condition variable or something? // keeps us responsive to any events that need to be handled immediately.
vTaskDelay(1000); // Then we check if there's any events to flush downstream.
continue; // Then we pass anything requiring processing to the element.
}
bool has_work_to_do =
cpp::result<size_t, AudioProcessingError> process_res; (!pending_events.empty() || element->HasUnflushedOutput() ||
element->HasUnprocessedInput()) &&
// If this element has an input stream, then our top priority is !element->IsOverBuffered();
// processing any chunks from it. Try doing this first, then fall back to
// the other cases. // If we have no new events to process and the element has nothing left to
bool has_received_message = false; // do, then just delay forever waiting for a new event.
ChunkReadResult chunk_res = chunk_reader.ReadChunkFromStream( TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY;
[&](cpp::span<std::byte> data) -> std::optional<size_t> {
process_res = element->ProcessChunk(data); StreamEvent* event_ptr = nullptr;
if (process_res.has_value()) { bool has_event =
return process_res.value(); xQueueReceive(element->InputEventQueue(), &event_ptr, ticks_to_wait);
if (has_event && event_ptr != nullptr) {
std::unique_ptr<StreamEvent> event(event_ptr);
if (event->tag == StreamEvent::CHUNK_NOTIFICATION) {
element->OnChunkProcessed();
} else { } else {
return {}; // This isn't an event that needs to be actioned immediately. Add it
// to our work queue.
pending_events.push_back(std::move(event));
} }
}, // Loop again, so that we service all incoming events before doing our
0); // possibly expensive processing.
continue;
if (chunk_res == CHUNK_PROCESSING_ERROR ||
chunk_res == CHUNK_DECODING_ERROR) {
ESP_LOGE(kTag, "failed to process chunk");
break; // TODO.
} else if (chunk_res == CHUNK_STREAM_ENDED) {
has_received_message = true;
} }
if (has_received_message) { // We have no new events. Next, see if there's anything that needs to be
auto message = chunk_reader.GetLastMessage(); // flushed.
MessageType type = ReadMessageType(message); if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) {
if (type == TYPE_STREAM_INFO) { // We had things to flush, but couldn't send it all. This probably
auto parse_res = ReadMessage<StreamInfo>(&StreamInfo::Parse, message); // implies that the downstream element is having issues servicing its
if (parse_res.has_error()) { // input queue, so hold off for a moment before retrying.
ESP_LOGE(kTag, "failed to parse stream info"); ESP_LOGW(kTag, "failed to flush buffered output");
break; // TODO. vTaskDelay(pdMS_TO_TICKS(100));
} continue;
auto info_res = element->ProcessStreamInfo(parse_res.value());
if (info_res.has_error()) {
ESP_LOGE(kTag, "failed to process stream info");
break; // TODO.
} }
if (element->HasUnprocessedInput()) {
auto process_res = element->Process();
if (!process_res.has_error() || process_res.error() != OUT_OF_DATA) {
// TODO: log!
} }
continue;
} }
// Chunk reading must have timed out, or we don't have an input stream. // The element ran out of data, so now it's time to let it process more
ElementState state = element->ElementState(); // input.
if (state == STATE_PAUSE) { while (!pending_events.empty()) {
element->PrepareForPause(); auto event = std::move(pending_events.front());
pending_events.pop_front();
vTaskSuspend(NULL);
// Zzzzzz... if (event->tag == StreamEvent::STREAM_INFO) {
auto process_res = element->ProcessStreamInfo(*event->stream_info);
// When we wake up, skip straight to the start of the loop again. if (process_res.has_error()) {
// TODO(jacqueline)
ESP_LOGE(kTag, "failed to process stream info");
}
} else if (event->tag == StreamEvent::CHUNK_DATA) {
StreamEvent* callback = new StreamEvent();
callback->source = element->InputEventQueue();
callback->tag = StreamEvent::CHUNK_NOTIFICATION;
if (!xQueueSend(event->source, callback, 0)) {
// TODO: log? crash? hmm.
pending_events.push_front(std::move(event));
continue; continue;
} else if (state == STATE_QUIT) {
break;
} }
// Signal the element to do any of its idle tasks. auto process_chunk_res =
auto process_error = element->ProcessIdle(); element->ProcessChunk(event->chunk_data.bytes);
if (process_error.has_error()) { if (process_chunk_res.has_error()) {
auto err = process_error.error(); // TODO(jacqueline)
if (err == OUT_OF_DATA) { ESP_LOGE(kTag, "failed to process chunk");
// If we ran out of data, then place ourselves into the pause state.
// We will be woken up when there's something to do.
element->ElementState(STATE_PAUSE);
continue; continue;
} else { }
ESP_LOGE(kTag, "failed to process idle");
break; // TODO.
} }
} }
} }

@ -14,130 +14,37 @@
namespace audio { namespace audio {
static const char* kTag = "chunk"; ChunkReader::ChunkReader(std::size_t chunk_size)
: raw_working_buffer_(static_cast<std::byte*>(
ChunkWriter::ChunkWriter(StreamBuffer* buffer) heap_caps_malloc(chunk_size * 1.5, MALLOC_CAP_SPIRAM))),
: stream_(buffer), leftover_bytes_(0) {} working_buffer_(raw_working_buffer_, chunk_size * 1.5) {}
ChunkWriter::~ChunkWriter() {} ChunkReader::~ChunkReader() {
free(raw_working_buffer_);
auto ChunkWriter::Reset() -> void { }
leftover_bytes_ = 0;
} auto ChunkReader::HandleNewData(cpp::span<std::byte> data)
-> cpp::span<std::byte> {
auto ChunkWriter::WriteChunkToStream( // Copy the new data onto the front for anything that was left over from the
std::function<size_t(cpp::span<std::byte>)> callback, // last portion. Note: this could be optimised for the '0 leftover bytes'
TickType_t max_wait) -> ChunkWriteResult { // case, which technically shouldn't need a copy.
cpp::span<std::byte> write_buffer = stream_->WriteBuffer(); cpp::span<std::byte> new_data_dest = working_buffer_.subspan(leftover_bytes_);
// First, write out our chunk header so we know how much space to give to std::copy(data.begin(), data.end(), new_data_dest.begin());
// the callback. last_data_in_working_buffer_ =
auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer); working_buffer_.first(leftover_bytes_ + data.size());
if (header_size.has_error()) {
return CHUNK_ENCODING_ERROR;
}
// Now we can ask the callback to fill the remaining space. If the previous
// call to this method timed out, then we may already have the data we need
// in our write buffer.
size_t chunk_size;
if (leftover_bytes_ > 0) {
chunk_size = leftover_bytes_;
} else {
chunk_size = std::invoke(
callback,
write_buffer.subspan(header_size.value(),
write_buffer.size() - header_size.value()));
}
if (chunk_size == 0) {
// They had nothing for us, so bail out.
return CHUNK_OUT_OF_DATA;
}
// Try to write to the buffer. Note the return type here will be either 0 or
// header_size + chunk_size, as MessageBuffer doesn't allow partial writes.
size_t intended_write_size = header_size.value() + chunk_size;
ESP_LOGI(kTag, "writing chunk of size %d", intended_write_size);
size_t actual_write_size =
xMessageBufferSend(stream_->Handle(), write_buffer.data(),
intended_write_size, max_wait);
if (actual_write_size == 0) {
leftover_bytes_ = chunk_size;
return CHUNK_WRITE_TIMEOUT;
} else {
leftover_bytes_ = 0;
}
return CHUNK_WRITE_OKAY;
}
ChunkReader::ChunkReader(StreamBuffer* stream) : stream_(stream) {}
ChunkReader::~ChunkReader() {}
auto ChunkReader::Reset() -> void {
leftover_bytes_ = 0; leftover_bytes_ = 0;
last_message_size_ = 0; return last_data_in_working_buffer_;
}
auto ChunkReader::GetLastMessage() -> cpp::span<std::byte> {
return stream_->ReadBuffer().subspan(leftover_bytes_, last_message_size_);
}
auto ChunkReader::ReadChunkFromStream(
std::function<std::optional<size_t>(cpp::span<std::byte>)> callback,
TickType_t max_wait) -> ChunkReadResult {
// First, wait for a message to arrive over the buffer.
cpp::span<std::byte> new_data_dest = stream_->ReadBuffer().last(
stream_->ReadBuffer().size() - leftover_bytes_);
ESP_LOGI(kTag, "reading chunk of size %d", new_data_dest.size());
last_message_size_ = xMessageBufferReceive(
stream_->Handle(), new_data_dest.data(), new_data_dest.size(), max_wait);
if (last_message_size_ == 0) {
return CHUNK_READ_TIMEOUT;
}
cpp::span<std::byte> new_data = GetLastMessage();
MessageType type = ReadMessageType(new_data);
if (type != TYPE_CHUNK_HEADER) {
// This message wasn't for us, so let the caller handle it.
Reset();
return CHUNK_STREAM_ENDED;
} }
// Work the size and position of the chunk. auto ChunkReader::HandleLeftovers(std::size_t bytes_used) -> void {
auto chunk_data = GetAdditionalData(new_data); leftover_bytes_ = last_data_in_working_buffer_.size() - bytes_used;
// Now we need to stick the end of the last chunk (if it exists) onto the
// front of the new chunk. Do it this way around bc we assume the old chunk
// is shorter, and therefore faster to move.
cpp::span<std::byte> leftover_data =
stream_->ReadBuffer().first(leftover_bytes_);
cpp::span<std::byte> combined_data(chunk_data.data() - leftover_data.size(),
leftover_data.size() + chunk_data.size());
if (leftover_bytes_ > 0) { if (leftover_bytes_ > 0) {
std::copy_backward(leftover_data.begin(), leftover_data.end(), auto data_to_keep = last_data_in_working_buffer_.last(leftover_bytes_);
combined_data.begin()); // Copy backwards, since if more than half of the data was unused then the
} // source and destination will overlap.
std::copy_backward(data_to_keep.begin(), data_to_keep.end(),
// Tell the callback about the new data. working_buffer_.begin());
std::optional<size_t> amount_processed = std::invoke(callback, combined_data);
if (!amount_processed) {
return CHUNK_PROCESSING_ERROR;
} }
// Prepare for the next iteration.
leftover_bytes_ = combined_data.size() - amount_processed.value();
if (leftover_bytes_ > 0) {
std::copy(combined_data.begin() + amount_processed.value(),
combined_data.end(), stream_->ReadBuffer().begin());
return CHUNK_LEFTOVER_DATA;
}
return CHUNK_READ_OKAY;
} }
} // namespace audio } // namespace audio

@ -1,5 +1,6 @@
#include "fatfs_audio_input.hpp" #include "fatfs_audio_input.hpp"
#include <algorithm>
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <string> #include <string>
@ -10,33 +11,26 @@
#include "audio_element.hpp" #include "audio_element.hpp"
#include "chunk.hpp" #include "chunk.hpp"
#include "stream_buffer.hpp" #include "stream_buffer.hpp"
#include "stream_event.hpp"
#include "stream_message.hpp" #include "stream_message.hpp"
static const char* kTag = "SRC"; static const char* kTag = "SRC";
namespace audio { namespace audio {
static const TickType_t kServiceInterval = pdMS_TO_TICKS(50); // 32KiB to match the minimum himen region size.
static const std::size_t kChunkSize = 1024;
static const std::size_t kFileBufferSize = 1024 * 128;
static const std::size_t kMinFileReadSize = 1024 * 4;
FatfsAudioInput::FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage) FatfsAudioInput::FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage)
: IAudioElement(), : IAudioElement(),
storage_(storage), storage_(storage),
raw_file_buffer_(static_cast<std::byte*>(
heap_caps_malloc(kFileBufferSize, MALLOC_CAP_SPIRAM))),
file_buffer_(raw_file_buffer_, kFileBufferSize),
file_buffer_read_pos_(file_buffer_.begin()),
file_buffer_write_pos_(file_buffer_.begin()),
current_file_(), current_file_(),
is_file_open_(false), is_file_open_(false) {}
chunk_writer_(nullptr) {
// TODO: create our chunk writer whenever the output buffer changes.
}
FatfsAudioInput::~FatfsAudioInput() { FatfsAudioInput::~FatfsAudioInput() {}
free(raw_file_buffer_);
auto FatfsAudioInput::HasUnprocessedInput() -> bool {
return is_file_open_;
} }
auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info) auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info)
@ -57,17 +51,12 @@ auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info)
is_file_open_ = true; is_file_open_ = true;
auto write_size = std::unique_ptr<StreamInfo> new_info = std::make_unique<StreamInfo>(info);
WriteMessage(TYPE_STREAM_INFO, new_info->ChunkSize(kChunkSize);
std::bind(&StreamInfo::Encode, info, std::placeholders::_1),
output_buffer_->WriteBuffer());
if (write_size.has_error()) { auto event =
return cpp::fail(IO_ERROR); StreamEvent::CreateStreamInfo(input_events_, std::move(new_info));
} else { SendOrBufferEvent(std::move(event));
xMessageBufferSend(output_buffer_, output_buffer_->WriteBuffer().data(),
write_size.value(), portMAX_DELAY);
}
return {}; return {};
} }
@ -77,110 +66,29 @@ auto FatfsAudioInput::ProcessChunk(const cpp::span<std::byte>& chunk)
return cpp::fail(UNSUPPORTED_STREAM); return cpp::fail(UNSUPPORTED_STREAM);
} }
auto FatfsAudioInput::GetRingBufferDistance() const -> size_t { auto FatfsAudioInput::Process() -> cpp::result<void, AudioProcessingError> {
if (file_buffer_read_pos_ == file_buffer_write_pos_) {
return 0;
}
if (file_buffer_read_pos_ < file_buffer_write_pos_) {
return file_buffer_write_pos_ - file_buffer_read_pos_;
}
return
// Read position to end of buffer.
(file_buffer_.end() - file_buffer_read_pos_)
// Start of buffer to write position.
+ (file_buffer_write_pos_ - file_buffer_.begin());
}
auto FatfsAudioInput::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
// First, see if we're able to fill up the input buffer with any more of the
// file's contents.
if (is_file_open_) { if (is_file_open_) {
size_t ringbuf_distance = GetRingBufferDistance(); auto dest_event = StreamEvent::CreateChunkData(input_events_, kChunkSize);
if (file_buffer_.size() - ringbuf_distance > kMinFileReadSize) {
size_t read_size;
if (file_buffer_write_pos_ < file_buffer_read_pos_) {
// Don't worry about the start of buffer -> read pos size; we can get to
// it next iteration.
read_size = file_buffer_read_pos_ - file_buffer_write_pos_;
} else {
read_size = file_buffer_.begin() - file_buffer_write_pos_;
}
ESP_LOGI(kTag, "reading up to %d bytes", (int)read_size);
UINT bytes_read = 0; UINT bytes_read = 0;
FRESULT result = FRESULT result =
f_read(&current_file_, std::addressof(file_buffer_write_pos_), f_read(&current_file_, dest_event->chunk_data.raw_bytes.get(),
read_size, &bytes_read); kChunkSize, &bytes_read);
if (result != FR_OK) { if (result != FR_OK) {
ESP_LOGE(kTag, "file I/O error %d", result); ESP_LOGE(kTag, "file I/O error %d", result);
return cpp::fail(IO_ERROR); return cpp::fail(IO_ERROR);
} }
ESP_LOGI(kTag, "actual read size %d bytes", (int)bytes_read); dest_event->chunk_data.bytes =
dest_event->chunk_data.bytes.first(bytes_read);
SendOrBufferEvent(std::move(dest_event));
if (f_eof(&current_file_)) { if (f_eof(&current_file_)) {
f_close(&current_file_); f_close(&current_file_);
is_file_open_ = false; is_file_open_ = false;
// TODO: open the next file?
}
file_buffer_write_pos_ += bytes_read;
if (file_buffer_write_pos_ == file_buffer_.end()) {
file_buffer_write_pos_ = file_buffer_.begin();
}
}
} else if (GetRingBufferDistance() == 0) {
// We have no file open, and no data waiting to be written. We're out of
// stuff to do, so signal a pause.
return cpp::fail(OUT_OF_DATA);
}
// Now stream data into the output buffer until it's full.
while (GetRingBufferDistance() > 0) {
ESP_LOGI(kTag, "writing up to %d bytes", (int)GetRingBufferDistance());
ChunkWriteResult result = chunk_writer_->WriteChunkToStream(
[&](cpp::span<std::byte> d) { return SendChunk(d); }, kServiceInterval);
switch (result) {
case CHUNK_WRITE_OKAY:
break;
case CHUNK_WRITE_TIMEOUT:
case CHUNK_OUT_OF_DATA:
// Both of these are fine; we will pick back up where we left off in
// the next idle call.
return {};
default:
return cpp::fail(IO_ERROR);
} }
} }
// We've finished writing out chunks, but there may be more of the file to
// read. Return, and begin again in the next idle call.
return {}; return {};
} }
auto FatfsAudioInput::SendChunk(cpp::span<std::byte> dest) -> size_t {
if (file_buffer_read_pos_ == file_buffer_write_pos_) {
return 0;
}
std::size_t chunk_size;
if (file_buffer_read_pos_ > file_buffer_write_pos_) {
chunk_size = file_buffer_.end() - file_buffer_read_pos_;
} else {
chunk_size = file_buffer_write_pos_ - file_buffer_read_pos_;
}
chunk_size = std::min(chunk_size, dest.size());
cpp::span<std::byte> source(file_buffer_read_pos_, chunk_size);
std::copy(source.begin(), source.end(), dest.begin());
file_buffer_read_pos_ = file_buffer_read_pos_ + chunk_size;
if (file_buffer_read_pos_ == file_buffer_.end()) {
file_buffer_read_pos_ = file_buffer_.begin();
}
return chunk_size;
}
} // namespace audio } // namespace audio

@ -92,22 +92,12 @@ auto I2SAudioOutput::ProcessChunk(const cpp::span<std::byte>& chunk)
return dac_->WriteData(chunk, portMAX_DELAY); return dac_->WriteData(chunk, portMAX_DELAY);
} }
auto I2SAudioOutput::IdleTimeout() const -> TickType_t { auto I2SAudioOutput::Process() -> cpp::result<void, AudioProcessingError> {
return kIdleTimeBeforeMute;
}
auto I2SAudioOutput::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
// TODO(jacqueline): Consider powering down the dac completely maybe? // TODO(jacqueline): Consider powering down the dac completely maybe?
SetSoftMute(true); SetSoftMute(true);
return {}; return {};
} }
auto I2SAudioOutput::PrepareForPause() -> void {
// TODO(jacqueline): We ideally want to ensure we have enough samples in the
// DMA buffer here, so that soft mute can work properly.
SetSoftMute(true);
}
auto I2SAudioOutput::SetVolume(uint8_t volume) -> void { auto I2SAudioOutput::SetVolume(uint8_t volume) -> void {
volume_ = volume; volume_ = volume;
if (!is_soft_muted_) { if (!is_soft_muted_) {

@ -4,6 +4,7 @@
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include "chunk.hpp"
#include "ff.h" #include "ff.h"
#include "span.hpp" #include "span.hpp"
@ -30,11 +31,13 @@ class AudioDecoder : public IAudioElement {
return 1024; return 1024;
} }
auto HasUnprocessedInput() -> bool override;
auto ProcessStreamInfo(const StreamInfo& info) auto ProcessStreamInfo(const StreamInfo& info)
-> cpp::result<void, AudioProcessingError> override; -> cpp::result<void, AudioProcessingError> override;
auto ProcessChunk(const cpp::span<std::byte>& chunk) auto ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<std::size_t, AudioProcessingError> override; -> cpp::result<std::size_t, AudioProcessingError> override;
auto ProcessIdle() -> cpp::result<void, AudioProcessingError> override; auto Process() -> cpp::result<void, AudioProcessingError> override;
AudioDecoder(const AudioDecoder&) = delete; AudioDecoder(const AudioDecoder&) = delete;
AudioDecoder& operator=(const AudioDecoder&) = delete; AudioDecoder& operator=(const AudioDecoder&) = delete;
@ -42,8 +45,11 @@ class AudioDecoder : public IAudioElement {
private: private:
std::unique_ptr<codecs::ICodec> current_codec_; std::unique_ptr<codecs::ICodec> current_codec_;
std::optional<StreamInfo> stream_info_; std::optional<StreamInfo> stream_info_;
std::optional<ChunkReader> chunk_reader_;
std::unique_ptr<ChunkWriter> chunk_writer_; std::size_t chunk_size_;
bool has_samples_to_send_;
bool needs_more_input_;
}; };
} // namespace audio } // namespace audio

@ -2,6 +2,8 @@
#include <atomic> #include <atomic>
#include <cstdint> #include <cstdint>
#include <deque>
#include <memory>
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
@ -12,6 +14,7 @@
#include "span.hpp" #include "span.hpp"
#include "stream_buffer.hpp" #include "stream_buffer.hpp"
#include "stream_event.hpp"
#include "stream_info.hpp" #include "stream_info.hpp"
#include "types.hpp" #include "types.hpp"
@ -36,6 +39,8 @@ enum AudioProcessingError {
OUT_OF_DATA, OUT_OF_DATA,
}; };
static const size_t kEventQueueSize = 8;
/* /*
* One indepedentent part of an audio pipeline. Each element has an input and * One indepedentent part of an audio pipeline. Each element has an input and
* output message stream, and is responsible for taking data from the input * output message stream, and is responsible for taking data from the input
@ -51,38 +56,38 @@ enum AudioProcessingError {
*/ */
class IAudioElement { class IAudioElement {
public: public:
IAudioElement() IAudioElement();
: input_buffer_(nullptr), output_buffer_(nullptr), state_(STATE_RUN) {} virtual ~IAudioElement();
virtual ~IAudioElement() {}
/* /*
* Returns the stack size in bytes that this element requires. This should * Returns the stack size in bytes that this element requires. This should
* be tuned according to the observed stack size of each element, as different * be tuned according to the observed stack size of each element, as different
* elements have fairly different stack requirements. * elements have fairly different stack requirements (particular decoders).
*/ */
virtual auto StackSizeBytes() const -> std::size_t { return 2048; }; virtual auto StackSizeBytes() const -> std::size_t { return 2048; };
/*
* How long to wait for new data on the input stream before triggering a call
* to ProcessIdle(). If this is portMAX_DELAY (the default), then ProcessIdle
* will never be called.
*/
virtual auto IdleTimeout() const -> TickType_t { return 10; }
virtual auto InputMinChunkSize() const -> std::size_t { return 0; } virtual auto InputMinChunkSize() const -> std::size_t { return 0; }
/* Returns this element's input buffer. */ /* Returns this element's input buffer. */
auto InputBuffer() const -> StreamBuffer* { return input_buffer_; } auto InputEventQueue() const -> QueueHandle_t { return input_events_; }
/* Returns this element's output buffer. */ /* Returns this element's output buffer. */
auto OutputBuffer() const -> StreamBuffer* { return output_buffer_; } auto OutputEventQueue() const -> QueueHandle_t { return output_events_; }
auto OutputEventQueue(const QueueHandle_t q) -> void { output_events_ = q; }
auto InputBuffer(StreamBuffer* b) -> void { input_buffer_ = b; } auto HasUnflushedOutput() -> bool { return !buffered_output_.empty(); }
auto OutputBuffer(StreamBuffer* b) -> void { output_buffer_ = b; } virtual auto HasUnprocessedInput() -> bool = 0;
auto ElementState() const -> ElementState { return state_; } auto IsOverBuffered() -> bool { return unprocessed_output_chunks_ > 4; }
auto ElementState(enum ElementState e) -> void { state_ = e; }
auto FlushBufferedOutput() -> bool;
auto ElementState() const -> ElementState { return current_state_; }
auto ElementState(enum ElementState e) -> void { current_state_ = e; }
virtual auto OnChunkProcessed() -> void { unprocessed_output_chunks_--; }
/* /*
* Called when a StreamInfo message is received. Used to configure this * Called when a StreamInfo message is received. Used to configure this
@ -105,14 +110,26 @@ class IAudioElement {
* time. This could be used to synthesize output, or to save memory by * time. This could be used to synthesize output, or to save memory by
* releasing unused resources. * releasing unused resources.
*/ */
virtual auto ProcessIdle() -> cpp::result<void, AudioProcessingError> = 0; virtual auto Process() -> cpp::result<void, AudioProcessingError> = 0;
virtual auto PrepareForPause() -> void{};
protected: protected:
StreamBuffer* input_buffer_; auto SendOrBufferEvent(std::unique_ptr<StreamEvent> event) -> bool;
StreamBuffer* output_buffer_;
std::atomic<enum ElementState> state_; // Queue for events coming into this element. Owned by us.
QueueHandle_t input_events_;
// Queue for events going into the next element. Not owned by us, may be null
// if we're not yet in a pipeline.
// FIXME: it would be nicer if this was non-nullable.
QueueHandle_t output_events_;
// The number of output chunks that we have generated, but have not yet been
// processed by the next element in the pipeline. This includes any chunks
// that are currently help in buffered_output_.
int unprocessed_output_chunks_;
// Output events that have been generated, but are yet to be sent downstream.
std::deque<std::unique_ptr<StreamEvent>> buffered_output_;
enum ElementState current_state_;
}; };
} // namespace audio } // namespace audio

@ -43,11 +43,9 @@ class AudioPlayback {
private: private:
auto ConnectElements(IAudioElement* src, IAudioElement* sink) -> void; auto ConnectElements(IAudioElement* src, IAudioElement* sink) -> void;
StreamBuffer stream_start_;
StreamBuffer stream_end_;
std::vector<std::unique_ptr<StreamBuffer>> element_buffers_;
std::vector<std::unique_ptr<AudioElementHandle>> element_handles_; std::vector<std::unique_ptr<AudioElementHandle>> element_handles_;
QueueHandle_t input_handle_;
}; };
} // namespace audio } // namespace audio

@ -2,6 +2,7 @@
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <memory>
#include <optional> #include <optional>
#include <string> #include <string>
@ -17,68 +18,12 @@
namespace audio { namespace audio {
enum ChunkWriteResult {
// Returned when the callback does not write any data.
CHUNK_WRITE_OKAY,
// Returned when the callback does not write any data.
CHUNK_OUT_OF_DATA,
// Returned when there is an error encoding a chunk header using cbor.
CHUNK_ENCODING_ERROR,
// Returned when max_wait expires without room in the stream buffer becoming
// available.
CHUNK_WRITE_TIMEOUT,
};
class ChunkWriter {
public:
explicit ChunkWriter(StreamBuffer* buffer);
~ChunkWriter();
auto Reset() -> void;
auto GetLastMessage() -> cpp::span<std::byte>;
/*
* Invokes the given callback to receive data, breaks the received data up
* into chunks with headers, and writes those chunks to the given output
* stream.
*
* The callback will be invoked with a byte buffer and its size. The callback
* should write as much data as it can to this buffer, and then return the
* number of bytes it wrote. Return a value of 0 to indicate that there is no
* more input to read.
*/
auto WriteChunkToStream(std::function<size_t(cpp::span<std::byte>)> callback,
TickType_t max_wait) -> ChunkWriteResult;
private:
StreamBuffer* stream_;
std::size_t leftover_bytes_ = 0;
};
enum ChunkReadResult {
CHUNK_READ_OKAY,
// Returned when the chunk was read successfully, but the consumer did not
// use all of the data.
CHUNK_LEFTOVER_DATA,
// Returned an error in parsing the cbor-encoded header.
CHUNK_DECODING_ERROR,
// Returned when max_wait expired before any data was read.
CHUNK_READ_TIMEOUT,
// Returned when a non-chunk message is received.
CHUNK_STREAM_ENDED,
// Returned when the processing callback does not return a value.
CHUNK_PROCESSING_ERROR,
};
class ChunkReader { class ChunkReader {
public: public:
explicit ChunkReader(StreamBuffer* buffer); explicit ChunkReader(std::size_t chunk_size);
~ChunkReader(); ~ChunkReader();
auto Reset() -> void; auto HandleLeftovers(std::size_t bytes_used) -> void;
auto GetLastMessage() -> cpp::span<std::byte>;
/* /*
* Reads chunks of data from the given input stream, and invokes the given * Reads chunks of data from the given input stream, and invokes the given
@ -92,14 +37,13 @@ class ChunkReader {
* If this function encounters a message in the stream that is not a chunk, it * If this function encounters a message in the stream that is not a chunk, it
* will place the message at the start of the working_buffer and then return. * will place the message at the start of the working_buffer and then return.
*/ */
auto ReadChunkFromStream( auto HandleNewData(cpp::span<std::byte> data) -> cpp::span<std::byte>;
std::function<std::optional<std::size_t>(cpp::span<std::byte>)> callback,
TickType_t max_wait) -> ChunkReadResult;
private: private:
StreamBuffer* stream_; std::byte* raw_working_buffer_;
cpp::span<std::byte> working_buffer_;
cpp::span<std::byte> last_data_in_working_buffer_;
std::size_t leftover_bytes_ = 0; std::size_t leftover_bytes_ = 0;
std::size_t last_message_size_ = 0;
}; };
} // namespace audio } // namespace audio

@ -22,32 +22,22 @@ class FatfsAudioInput : public IAudioElement {
explicit FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage); explicit FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage);
~FatfsAudioInput(); ~FatfsAudioInput();
auto HasUnprocessedInput() -> bool override;
auto ProcessStreamInfo(const StreamInfo& info) auto ProcessStreamInfo(const StreamInfo& info)
-> cpp::result<void, AudioProcessingError> override; -> cpp::result<void, AudioProcessingError> override;
auto ProcessChunk(const cpp::span<std::byte>& chunk) auto ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<std::size_t, AudioProcessingError> override; -> cpp::result<std::size_t, AudioProcessingError> override;
auto ProcessIdle() -> cpp::result<void, AudioProcessingError> override; auto Process() -> cpp::result<void, AudioProcessingError> override;
auto SendChunk(cpp::span<std::byte> dest) -> size_t;
FatfsAudioInput(const FatfsAudioInput&) = delete; FatfsAudioInput(const FatfsAudioInput&) = delete;
FatfsAudioInput& operator=(const FatfsAudioInput&) = delete; FatfsAudioInput& operator=(const FatfsAudioInput&) = delete;
private: private:
auto GetRingBufferDistance() const -> size_t;
std::shared_ptr<drivers::SdStorage> storage_; std::shared_ptr<drivers::SdStorage> storage_;
std::byte* raw_file_buffer_;
cpp::span<std::byte> file_buffer_;
cpp::span<std::byte>::iterator file_buffer_read_pos_;
cpp::span<std::byte>::iterator pending_read_pos_;
cpp::span<std::byte>::iterator file_buffer_write_pos_;
FIL current_file_; FIL current_file_;
bool is_file_open_; bool is_file_open_;
std::unique_ptr<ChunkWriter> chunk_writer_;
}; };
} // namespace audio } // namespace audio

@ -22,19 +22,14 @@ class I2SAudioOutput : public IAudioElement {
std::unique_ptr<drivers::AudioDac> dac); std::unique_ptr<drivers::AudioDac> dac);
~I2SAudioOutput(); ~I2SAudioOutput();
auto InputMinChunkSize() const -> std::size_t override { // TODO.
// TODO(jacqueline): work out a good value here. Maybe similar to the total auto HasUnprocessedInput() -> bool override { return false; }
// DMA buffer size?
return 128;
}
auto IdleTimeout() const -> TickType_t override;
auto ProcessStreamInfo(const StreamInfo& info) auto ProcessStreamInfo(const StreamInfo& info)
-> cpp::result<void, AudioProcessingError> override; -> cpp::result<void, AudioProcessingError> override;
auto ProcessChunk(const cpp::span<std::byte>& chunk) auto ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<std::size_t, AudioProcessingError> override; -> cpp::result<std::size_t, AudioProcessingError> override;
auto ProcessIdle() -> cpp::result<void, AudioProcessingError> override; auto Process() -> cpp::result<void, AudioProcessingError> override;
auto PrepareForPause() -> void override;
I2SAudioOutput(const I2SAudioOutput&) = delete; I2SAudioOutput(const I2SAudioOutput&) = delete;
I2SAudioOutput& operator=(const I2SAudioOutput&) = delete; I2SAudioOutput& operator=(const I2SAudioOutput&) = delete;

@ -0,0 +1,57 @@
#pragma once
#include <memory>
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "span.hpp"
#include "stream_info.hpp"
namespace audio {
struct StreamEvent {
static auto CreateStreamInfo(QueueHandle_t source,
std::unique_ptr<StreamInfo> payload)
-> std::unique_ptr<StreamEvent>;
static auto CreateChunkData(QueueHandle_t source, std::size_t chunk_size)
-> std::unique_ptr<StreamEvent>;
static auto CreateChunkNotification(QueueHandle_t source)
-> std::unique_ptr<StreamEvent>;
StreamEvent();
~StreamEvent();
StreamEvent(StreamEvent&&);
QueueHandle_t source;
enum {
UNINITIALISED,
STREAM_INFO,
CHUNK_DATA,
CHUNK_NOTIFICATION,
} tag;
union {
std::unique_ptr<StreamInfo> stream_info;
// Scott Meyers says:
// `About the only situation I can conceive of when a std::unique_ptr<T[]>
// would make sense would be when you’re using a C-like API that returns a
// raw pointer to a heap array that you assume ownership of.`
// :-)
struct {
std::unique_ptr<std::byte*> raw_bytes;
cpp::span<std::byte> bytes;
} chunk_data;
// FIXME: It would be nice to also support a pointer to himem data here, to
// save a little ordinary heap space.
};
StreamEvent(const StreamEvent&) = delete;
StreamEvent& operator=(const StreamEvent&) = delete;
};
} // namespace audio

@ -7,6 +7,7 @@
#include "cbor.h" #include "cbor.h"
#include "result.hpp" #include "result.hpp"
#include "sys/_stdint.h"
namespace audio { namespace audio {
@ -24,14 +25,24 @@ class StreamInfo {
auto Channels() const -> const std::optional<uint8_t>& { return channels_; } auto Channels() const -> const std::optional<uint8_t>& { return channels_; }
auto BitsPerSample(uint8_t bpp) -> void { bits_per_sample_ = bpp; }
auto BitsPerSample() const -> const std::optional<uint8_t>& { auto BitsPerSample() const -> const std::optional<uint8_t>& {
return bits_per_sample_; return bits_per_sample_;
} }
auto SampleRate(uint16_t rate) -> void { sample_rate_ = rate; }
auto SampleRate() const -> const std::optional<uint16_t>& { auto SampleRate() const -> const std::optional<uint16_t>& {
return sample_rate_; return sample_rate_;
} }
auto ChunkSize() const -> const std::optional<std::size_t>& {
return chunk_size_;
}
auto ChunkSize(std::size_t s) -> void { chunk_size_ = s; }
auto Encode(CborEncoder& enc) -> std::optional<CborError>; auto Encode(CborEncoder& enc) -> std::optional<CborError>;
private: private:
@ -39,6 +50,7 @@ class StreamInfo {
std::optional<uint8_t> channels_; std::optional<uint8_t> channels_;
std::optional<uint8_t> bits_per_sample_; std::optional<uint8_t> bits_per_sample_;
std::optional<uint16_t> sample_rate_; std::optional<uint16_t> sample_rate_;
std::optional<size_t> chunk_size_;
}; };
} // namespace audio } // namespace audio

@ -19,7 +19,8 @@ StreamBuffer::StreamBuffer(std::size_t chunk_size, std::size_t buffer_size)
output_chunk_(raw_output_chunk_, chunk_size) { output_chunk_(raw_output_chunk_, chunk_size) {
assert(input_chunk_.size() <= buffer_size); assert(input_chunk_.size() <= buffer_size);
assert(output_chunk_.size() <= buffer_size); assert(output_chunk_.size() <= buffer_size);
ESP_LOGI("streambuf", "created buffer of chunk size %d, total size %d", chunk_size, buffer_size); ESP_LOGI("streambuf", "created buffer of chunk size %d, total size %d",
chunk_size, buffer_size);
} }
StreamBuffer::~StreamBuffer() { StreamBuffer::~StreamBuffer() {

@ -0,0 +1,75 @@
#include "stream_event.hpp"
#include <cstddef>
#include <memory>
namespace audio {
auto StreamEvent::CreateStreamInfo(QueueHandle_t source,
std::unique_ptr<StreamInfo> payload)
-> std::unique_ptr<StreamEvent> {
auto event = std::make_unique<StreamEvent>();
event->tag = StreamEvent::STREAM_INFO;
event->source = source;
event->stream_info = std::move(payload);
return event;
}
auto StreamEvent::CreateChunkData(QueueHandle_t source, std::size_t chunk_size)
-> std::unique_ptr<StreamEvent> {
auto event = std::make_unique<StreamEvent>();
event->tag = StreamEvent::CHUNK_DATA;
event->source = source;
auto raw_bytes =
static_cast<std::byte*>(heap_caps_malloc(chunk_size, MALLOC_CAP_SPIRAM));
event->chunk_data.raw_bytes = std::make_unique<std::byte*>(raw_bytes);
event->chunk_data.bytes = cpp::span<std::byte>(raw_bytes, chunk_size);
return event;
}
auto StreamEvent::CreateChunkNotification(QueueHandle_t source)
-> std::unique_ptr<StreamEvent> {
auto event = std::make_unique<StreamEvent>();
event->tag = StreamEvent::CHUNK_NOTIFICATION;
event->source = source;
return event;
}
StreamEvent::StreamEvent() : tag(StreamEvent::UNINITIALISED) {}
StreamEvent::~StreamEvent() {
switch (tag) {
case UNINITIALISED:
break;
case STREAM_INFO:
stream_info.reset();
break;
case CHUNK_DATA:
chunk_data.raw_bytes.reset();
break;
case CHUNK_NOTIFICATION:
break;
}
}
StreamEvent::StreamEvent(StreamEvent&& other) {
tag = other.tag;
source = other.source;
switch (tag) {
case UNINITIALISED:
break;
case STREAM_INFO:
stream_info = std::move(other.stream_info);
break;
case CHUNK_DATA:
chunk_data = std::move(other.chunk_data);
break;
case CHUNK_NOTIFICATION:
break;
}
other.tag = StreamEvent::UNINITIALISED;
}
} // namespace audio

@ -3,7 +3,6 @@
#include <cstdint> #include <cstdint>
#include "cbor.h" #include "cbor.h"
#include "esp-idf/components/cbor/tinycbor/src/cbor.h"
#include "span.hpp" #include "span.hpp"
namespace audio { namespace audio {

Loading…
Cancel
Save