From 2056cad0ab7b805f0ed5629b100b50f8ea9e127e Mon Sep 17 00:00:00 2001 From: jacqueline Date: Thu, 12 Jan 2023 14:28:52 +1100 Subject: [PATCH] WIP --- src/audio/audio_decoder.cpp | 62 +++++++++++--------- src/audio/audio_playback.cpp | 10 ++-- src/audio/audio_task.cpp | 43 ++++++++++++-- src/audio/chunk.cpp | 78 +++++++++++++++---------- src/audio/fatfs_audio_input.cpp | 36 ++++++------ src/audio/i2s_audio_output.cpp | 6 ++ src/audio/include/audio_decoder.hpp | 2 + src/audio/include/audio_element.hpp | 18 +++++- src/audio/include/audio_playback.hpp | 9 ++- src/audio/include/audio_task.hpp | 4 +- src/audio/include/chunk.hpp | 40 +++++++++---- src/audio/include/fatfs_audio_input.hpp | 2 + src/audio/include/i2s_audio_output.hpp | 1 + src/audio/include/stream_buffer.hpp | 22 +++++++ 14 files changed, 233 insertions(+), 100 deletions(-) diff --git a/src/audio/audio_decoder.cpp b/src/audio/audio_decoder.cpp index 872b7ead..88ddc323 100644 --- a/src/audio/audio_decoder.cpp +++ b/src/audio/audio_decoder.cpp @@ -57,35 +57,45 @@ auto AudioDecoder::ProcessChunk(const cpp::span& chunk) bool has_samples_to_send = false; bool needs_more_input = false; std::optional error = std::nullopt; - WriteChunksToStream( - output_buffer_, - [&](cpp::span buffer) -> std::size_t { - std::size_t bytes_written = 0; - // Continue filling up the output buffer so long as we have samples - // leftover, or are able to synthesize more samples from the input. - while (has_samples_to_send || !needs_more_input) { - if (!has_samples_to_send) { - auto result = current_codec_->ProcessNextFrame(); - has_samples_to_send = true; - if (result.has_error()) { - error = result.error(); - // End our output stream immediately if the codec barfed. - return 0; + while (1) { + ChunkWriteResult res = chunk_writer_.WriteChunkToStream( + [&](cpp::span buffer) -> std::size_t { + std::size_t bytes_written = 0; + // Continue filling up the output buffer so long as we have samples + // leftover, or are able to synthesize more samples from the input. + while (has_samples_to_send || !needs_more_input) { + if (!has_samples_to_send) { + auto result = current_codec_->ProcessNextFrame(); + has_samples_to_send = true; + if (result.has_error()) { + error = result.error(); + // End our output stream immediately if the codec barfed. + return 0; + } else { + needs_more_input = result.value(); + } } else { - needs_more_input = result.value(); + auto result = current_codec_->WriteOutputSamples( + buffer.last(buffer.size() - bytes_written)); + bytes_written += result.first; + has_samples_to_send = !result.second; } - } else { - auto result = current_codec_->WriteOutputSamples( - buffer.last(buffer.size() - bytes_written)); - bytes_written += result.first; - has_samples_to_send = !result.second; } - } - return bytes_written; - }, - // This element doesn't support any kind of out of band commands, so we - // can just suspend the whole task if the output buffer fills up. - portMAX_DELAY); + return bytes_written; + }, + // TODO + portMAX_DELAY); + + switch (res) { + case CHUNK_WRITE_OKAY: + break; + case CHUNK_WRITE_TIMEOUT: + case CHUNK_OUT_OF_DATA: + return {}; + default: + return cpp::fail(IO_ERROR); + } + } if (error) { ESP_LOGE(kTag, "Codec encountered error %d", error.value()); diff --git a/src/audio/audio_playback.cpp b/src/audio/audio_playback.cpp index 7b8418d7..bcc3ad04 100644 --- a/src/audio/audio_playback.cpp +++ b/src/audio/audio_playback.cpp @@ -43,9 +43,9 @@ auto AudioPlayback::create(drivers::GpioExpander* expander, playback->ConnectElements(codec.get(), sink.get()); // Launch! - StartAudioTask("src", source); - StartAudioTask("dec", codec); - StartAudioTask("sink", sink); + playback->element_handles_.push_back(StartAudioTask("src", source)); + playback->element_handles_.push_back(StartAudioTask("dec", codec)); + playback->element_handles_.push_back(StartAudioTask("sink", sink)); return playback; } @@ -55,7 +55,9 @@ AudioPlayback::AudioPlayback() : stream_start_(128, 128), stream_end_(128, 128) {} AudioPlayback::~AudioPlayback() { - // TODO(jacqueline): signal the end of all things, and maybe wait for it? + for (auto& element : element_handles_) { + element->Quit(); + } } auto AudioPlayback::Play(const std::string& filename) -> void { diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index 3512c96f..079ae852 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -3,9 +3,12 @@ #include #include +#include +#include "audio_element_handle.hpp" #include "cbor.h" #include "esp_heap_caps.h" +#include "esp_log.h" #include "freertos/portmacro.h" #include "freertos/queue.h" #include "freertos/stream_buffer.h" @@ -20,10 +23,17 @@ namespace audio { auto StartAudioTask(const std::string& name, - std::shared_ptr element) -> void { + std::shared_ptr element) + -> std::unique_ptr { + auto task_handle = std::make_unique(); + + // Newly created task will free this. AudioTaskArgs* args = new AudioTaskArgs{.element = element}; + xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args, - kTaskPriorityAudio, NULL); + kTaskPriorityAudio, task_handle.get()); + + return std::make_unique(std::move(task_handle), element); } void AudioTaskMain(void* args) { @@ -32,9 +42,16 @@ void AudioTaskMain(void* args) { std::shared_ptr element = std::move(real_args->element); delete real_args; + char tag[] = "task"; ChunkReader chunk_reader = ChunkReader(element->InputBuffer()); - while (1) { + while (element->ElementState() != STATE_QUIT) { + if (element->ElementState() == STATE_PAUSE) { + // TODO: park with a condition variable or something? + vTaskDelay(100); + continue; + } + cpp::result process_res; // If this element has an input stream, then our top priority is @@ -54,6 +71,7 @@ void AudioTaskMain(void* args) { if (chunk_res == CHUNK_PROCESSING_ERROR || chunk_res == CHUNK_DECODING_ERROR) { + ESP_LOGE(tag, "failed to process chunk"); break; // TODO. } else if (chunk_res == CHUNK_STREAM_ENDED) { has_received_message = true; @@ -65,21 +83,36 @@ void AudioTaskMain(void* args) { if (type == TYPE_STREAM_INFO) { auto parse_res = ReadMessage(&StreamInfo::Parse, message); if (parse_res.has_error()) { + ESP_LOGE(tag, "failed to parse stream info"); break; // TODO. } auto info_res = element->ProcessStreamInfo(parse_res.value()); if (info_res.has_error()) { + ESP_LOGE(tag, "failed to process stream info"); break; // TODO. } } } - // TODO: Do any out of band reading, such a a pause command, here. - // Chunk reading must have timed out, or we don't have an input stream. + ElementState state = element->ElementState(); + if (state == STATE_PAUSE) { + element->PrepareForPause(); + + vTaskSuspend(NULL); + + // Zzzzzz... + + // When we wake up, skip straight to the start of the loop again. + continue; + } else if (state == STATE_QUIT) { + break; + } + // Signal the element to do any of its idle tasks. auto process_error = element->ProcessIdle(); if (process_error.has_error()) { + ESP_LOGE(tag, "failed to process idle"); break; // TODO. } } diff --git a/src/audio/chunk.cpp b/src/audio/chunk.cpp index fbd795d9..5f8f3148 100644 --- a/src/audio/chunk.cpp +++ b/src/audio/chunk.cpp @@ -13,42 +13,58 @@ namespace audio { -auto WriteChunksToStream(StreamBuffer* stream, - std::function)> callback, - TickType_t max_wait) -> ChunkWriteResult { - cpp::span write_buffer = stream->WriteBuffer(); - while (1) { - // First, write out our chunk header so we know how much space to give to - // the callback. - auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer); - if (header_size.has_error()) { - return CHUNK_ENCODING_ERROR; - } - - // Now we can ask the callback to fill the remaining space. - size_t chunk_size = std::invoke( +ChunkWriter::ChunkWriter(StreamBuffer* buffer) + : stream_(buffer), leftover_bytes_(0) {} + +ChunkWriter::~ChunkWriter() {} + +auto ChunkWriter::Reset() -> void { + leftover_bytes_ = 0; +} + +auto ChunkWriter::WriteChunkToStream( + std::function)> callback, + TickType_t max_wait) -> ChunkWriteResult { + cpp::span write_buffer = stream_->WriteBuffer(); + // First, write out our chunk header so we know how much space to give to + // the callback. + auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer); + if (header_size.has_error()) { + return CHUNK_ENCODING_ERROR; + } + + // Now we can ask the callback to fill the remaining space. If the previous + // call to this method timed out, then we may already have the data we need + // in our write buffer. + size_t chunk_size; + if (leftover_bytes_ > 0) { + chunk_size = leftover_bytes_; + } else { + chunk_size = std::invoke( callback, write_buffer.subspan(header_size.value(), write_buffer.size() - header_size.value())); + } - if (chunk_size == 0) { - // They had nothing for us, so bail out. - return CHUNK_OUT_OF_DATA; - } - - // Try to write to the buffer. Note the return type here will be either 0 or - // header_size + chunk_size, as MessageBuffer doesn't allow partial writes. - size_t actual_write_size = - xMessageBufferSend(stream->Handle(), write_buffer.data(), - header_size.value() + chunk_size, max_wait); - - if (actual_write_size == 0) { - // We failed to write in time, so bail out. This is techinically data loss - // unless the caller wants to go and parse our working buffer, but we - // assume the caller has a good reason to time us out. - return CHUNK_WRITE_TIMEOUT; - } + if (chunk_size == 0) { + // They had nothing for us, so bail out. + return CHUNK_OUT_OF_DATA; } + + // Try to write to the buffer. Note the return type here will be either 0 or + // header_size + chunk_size, as MessageBuffer doesn't allow partial writes. + size_t actual_write_size = + xMessageBufferSend(stream_->Handle(), write_buffer.data(), + header_size.value() + chunk_size, max_wait); + + if (actual_write_size == 0) { + leftover_bytes_ = chunk_size; + return CHUNK_WRITE_TIMEOUT; + } else { + leftover_bytes_ = 0; + } + + return CHUNK_WRITE_OKAY; } ChunkReader::ChunkReader(StreamBuffer* stream) : stream_(stream) {} diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index 9e8c5243..bc5be42a 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -126,25 +126,25 @@ auto FatfsAudioInput::ProcessIdle() -> cpp::result { } // Now stream data into the output buffer until it's full. - pending_read_pos_ = file_buffer_read_pos_; - ChunkWriteResult result = WriteChunksToStream( - output_buffer_, [&](cpp::span d) { return SendChunk(d); }, - kServiceInterval); - - switch (result) { - case CHUNK_WRITE_TIMEOUT: - case CHUNK_OUT_OF_DATA: - // Both of these are fine; SendChunk keeps track of where it's up to - // internally, so we will pick back up where we left off. - return {}; - default: - return cpp::fail(IO_ERROR); + while (1) { + ChunkWriteResult result = chunk_writer_.WriteChunkToStream( + [&](cpp::span d) { return SendChunk(d); }, kServiceInterval); + + switch (result) { + case CHUNK_WRITE_OKAY: + break; + case CHUNK_WRITE_TIMEOUT: + case CHUNK_OUT_OF_DATA: + // Both of these are fine; we will pick back up where we left off in + // the next idle call. + return {}; + default: + return cpp::fail(IO_ERROR); + } } } auto FatfsAudioInput::SendChunk(cpp::span dest) -> size_t { - file_buffer_read_pos_ = pending_read_pos_; - if (file_buffer_read_pos_ == file_buffer_write_pos_) { return 0; } @@ -159,9 +159,9 @@ auto FatfsAudioInput::SendChunk(cpp::span dest) -> size_t { cpp::span source(file_buffer_read_pos_, chunk_size); std::copy(source.begin(), source.end(), dest.begin()); - pending_read_pos_ = file_buffer_read_pos_ + chunk_size; - if (pending_read_pos_ == file_buffer_.end()) { - pending_read_pos_ = file_buffer_.begin(); + file_buffer_read_pos_ = file_buffer_read_pos_ + chunk_size; + if (file_buffer_read_pos_ == file_buffer_.end()) { + file_buffer_read_pos_ = file_buffer_.begin(); } return chunk_size; } diff --git a/src/audio/i2s_audio_output.cpp b/src/audio/i2s_audio_output.cpp index a51d6aa5..d853a06f 100644 --- a/src/audio/i2s_audio_output.cpp +++ b/src/audio/i2s_audio_output.cpp @@ -102,6 +102,12 @@ auto I2SAudioOutput::ProcessIdle() -> cpp::result { return {}; } +auto I2SAudioOutput::PrepareForPause() -> void { + // TODO(jacqueline): We ideally want to ensure we have enough samples in the + // DMA buffer here, so that soft mute can work properly. + SetSoftMute(true); +} + auto I2SAudioOutput::SetVolume(uint8_t volume) -> void { volume_ = volume; if (!is_soft_muted_) { diff --git a/src/audio/include/audio_decoder.hpp b/src/audio/include/audio_decoder.hpp index eaef2f8c..9c0626db 100644 --- a/src/audio/include/audio_decoder.hpp +++ b/src/audio/include/audio_decoder.hpp @@ -42,6 +42,8 @@ class AudioDecoder : public IAudioElement { private: std::unique_ptr current_codec_; std::optional stream_info_; + + ChunkWriter chunk_writer_; }; } // namespace audio diff --git a/src/audio/include/audio_element.hpp b/src/audio/include/audio_element.hpp index 590889bd..ec6d6e80 100644 --- a/src/audio/include/audio_element.hpp +++ b/src/audio/include/audio_element.hpp @@ -1,10 +1,11 @@ #pragma once +#include #include -#include "chunk.hpp" #include "freertos/FreeRTOS.h" +#include "chunk.hpp" #include "freertos/message_buffer.h" #include "freertos/portmacro.h" #include "result.hpp" @@ -16,6 +17,12 @@ namespace audio { +enum ElementState { + STATE_RUN, + STATE_PAUSE, + STATE_QUIT, +}; + /* * Errors that may be returned by any of the Process* methods of an audio * element. @@ -42,7 +49,8 @@ enum AudioProcessingError { */ class IAudioElement { public: - IAudioElement() : input_buffer_(nullptr), output_buffer_(nullptr) {} + IAudioElement() + : input_buffer_(nullptr), output_buffer_(nullptr), state_(STATE_RUN) {} virtual ~IAudioElement() {} /* @@ -71,6 +79,9 @@ class IAudioElement { auto OutputBuffer(StreamBuffer* b) -> void { output_buffer_ = b; } + auto ElementState() const -> ElementState { return state_; } + auto ElementState(enum ElementState e) -> void { state_ = e; } + /* * Called when a StreamInfo message is received. Used to configure this * element in preperation for incoming chunks. @@ -94,9 +105,12 @@ class IAudioElement { */ virtual auto ProcessIdle() -> cpp::result = 0; + virtual auto PrepareForPause() -> void{}; + protected: StreamBuffer* input_buffer_; StreamBuffer* output_buffer_; + std::atomic state_; }; } // namespace audio diff --git a/src/audio/include/audio_playback.hpp b/src/audio/include/audio_playback.hpp index 9a7c5fc0..bffc3f02 100644 --- a/src/audio/include/audio_playback.hpp +++ b/src/audio/include/audio_playback.hpp @@ -6,6 +6,7 @@ #include #include "audio_element.hpp" +#include "audio_element_handle.hpp" #include "esp_err.h" #include "gpio_expander.hpp" #include "result.hpp" @@ -16,7 +17,8 @@ namespace audio { /* - * TODO. + * Creates and links together audio elements into a pipeline. This is the main + * entrypoint to playing audio on the system. */ class AudioPlayback { public: @@ -29,6 +31,10 @@ class AudioPlayback { AudioPlayback(); ~AudioPlayback(); + /* + * Begins playing the file at the given FatFS path. This will interrupt any + * currently in-progress playback. + */ auto Play(const std::string& filename) -> void; // Not copyable or movable. @@ -41,6 +47,7 @@ class AudioPlayback { StreamBuffer stream_start_; StreamBuffer stream_end_; std::vector> element_buffers_; + std::vector> element_handles_; }; } // namespace audio diff --git a/src/audio/include/audio_task.hpp b/src/audio/include/audio_task.hpp index ca75fbd2..9a76ea7e 100644 --- a/src/audio/include/audio_task.hpp +++ b/src/audio/include/audio_task.hpp @@ -3,6 +3,7 @@ #include #include "audio_element.hpp" +#include "audio_element_handle.hpp" namespace audio { @@ -11,7 +12,8 @@ struct AudioTaskArgs { }; auto StartAudioTask(const std::string& name, - std::shared_ptr element) -> void; + std::shared_ptr element) + -> std::unique_ptr; void AudioTaskMain(void* args); diff --git a/src/audio/include/chunk.hpp b/src/audio/include/chunk.hpp index d55e5d9d..5c7e73dd 100644 --- a/src/audio/include/chunk.hpp +++ b/src/audio/include/chunk.hpp @@ -18,6 +18,8 @@ namespace audio { enum ChunkWriteResult { + // Returned when the callback does not write any data. + CHUNK_WRITE_OKAY, // Returned when the callback does not write any data. CHUNK_OUT_OF_DATA, // Returned when there is an error encoding a chunk header using cbor. @@ -27,18 +29,32 @@ enum ChunkWriteResult { CHUNK_WRITE_TIMEOUT, }; -/* - * Invokes the given callback to receive data, breaks the received data up into - * chunks with headers, and writes those chunks to the given output stream. - * - * The callback will be invoked with a byte buffer and its size. The callback - * should write as much data as it can to this buffer, and then return the - * number of bytes it wrote. Return a value of 0 to indicate that there is no - * more input to read. - */ -auto WriteChunksToStream(StreamBuffer* stream, - std::function)> callback, - TickType_t max_wait) -> ChunkWriteResult; +class ChunkWriter { + public: + explicit ChunkWriter(StreamBuffer* buffer); + ~ChunkWriter(); + + auto Reset() -> void; + + auto GetLastMessage() -> cpp::span; + + /* + * Invokes the given callback to receive data, breaks the received data up + * into chunks with headers, and writes those chunks to the given output + * stream. + * + * The callback will be invoked with a byte buffer and its size. The callback + * should write as much data as it can to this buffer, and then return the + * number of bytes it wrote. Return a value of 0 to indicate that there is no + * more input to read. + */ + auto WriteChunkToStream(std::function)> callback, + TickType_t max_wait) -> ChunkWriteResult; + + private: + StreamBuffer* stream_; + std::size_t leftover_bytes_ = 0; +}; enum ChunkReadResult { CHUNK_READ_OKAY, diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index 21c729be..040b2b54 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -46,6 +46,8 @@ class FatfsAudioInput : public IAudioElement { FIL current_file_; bool is_file_open_; + + ChunkWriter chunk_writer_; }; } // namespace audio diff --git a/src/audio/include/i2s_audio_output.hpp b/src/audio/include/i2s_audio_output.hpp index 9e59f8fd..75a3be76 100644 --- a/src/audio/include/i2s_audio_output.hpp +++ b/src/audio/include/i2s_audio_output.hpp @@ -34,6 +34,7 @@ class I2SAudioOutput : public IAudioElement { auto ProcessChunk(const cpp::span& chunk) -> cpp::result override; auto ProcessIdle() -> cpp::result override; + auto PrepareForPause() -> void override; I2SAudioOutput(const I2SAudioOutput&) = delete; I2SAudioOutput& operator=(const I2SAudioOutput&) = delete; diff --git a/src/audio/include/stream_buffer.hpp b/src/audio/include/stream_buffer.hpp index cfb4bf9d..3853a53f 100644 --- a/src/audio/include/stream_buffer.hpp +++ b/src/audio/include/stream_buffer.hpp @@ -10,13 +10,35 @@ namespace audio { +/* + * A collection of the buffers required for two IAudioElement implementations to + * stream data between each other. + * + * Currently, we use a FreeRTOS MessageBuffer to hold the byte stream, and also + * maintain two chunk-sized buffers for the elements to stage their read and + * write operations (as MessageBuffer copies the given data into its memory + * space). A future optimisation here could be to instead post himem memory + * addresses to the message buffer, and then maintain address spaces into which + * we map these messages, rather than 'real' allocated buffers as we do now. + */ class StreamBuffer { public: explicit StreamBuffer(std::size_t chunk_size, std::size_t buffer_size); ~StreamBuffer(); + /* Returns the handle for the underlying message buffer. */ auto Handle() -> MessageBufferHandle_t* { return &handle_; } + + /* + * Returns a chunk-sized staging buffer that should be used *only* by the + * reader (sink) element. + */ auto ReadBuffer() -> cpp::span { return input_chunk_; } + + /* + * Returns a chunk-sized staging buffer that should be used *only* by the + * writer (source) element. + */ auto WriteBuffer() -> cpp::span { return output_chunk_; } StreamBuffer(const StreamBuffer&) = delete;