diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt index 4986a7da..57dae1cd 100644 --- a/src/audio/CMakeLists.txt +++ b/src/audio/CMakeLists.txt @@ -2,6 +2,7 @@ idf_component_register( SRCS "audio_decoder.cpp" "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp" "stream_info.cpp" "stream_message.cpp" "i2s_audio_output.cpp" "stream_buffer.cpp" "audio_playback.cpp" "audio_element_handle.cpp" + "stream_event.cpp" "audio_element.cpp" INCLUDE_DIRS "include" REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span") diff --git a/src/audio/audio_decoder.cpp b/src/audio/audio_decoder.cpp index 0b3d9878..8ef90905 100644 --- a/src/audio/audio_decoder.cpp +++ b/src/audio/audio_decoder.cpp @@ -4,6 +4,7 @@ #include #include +#include #include "freertos/FreeRTOS.h" @@ -14,19 +15,30 @@ #include "audio_element.hpp" #include "chunk.hpp" #include "fatfs_audio_input.hpp" - -static const char* kTag = "DEC"; +#include "stream_info.hpp" namespace audio { +static const std::size_t kSamplesPerChunk = 256; + AudioDecoder::AudioDecoder() : IAudioElement(), stream_info_({}) {} AudioDecoder::~AudioDecoder() {} +auto AudioDecoder::HasUnprocessedInput() -> bool { + return !needs_more_input_ || has_samples_to_send_; +} + auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info) -> cpp::result { stream_info_ = info; + if (info.ChunkSize()) { + chunk_reader_.emplace(info.ChunkSize().value()); + } else { + // TODO. + } + // Reuse the existing codec if we can. This will help with gapless playback, // since we can potentially just continue to decode as we were before, // without any setup overhead. @@ -42,71 +54,66 @@ auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info) return cpp::fail(UNSUPPORTED_STREAM); } + // TODO: defer until first header read, so we can give better info about + // sample rate, chunk size, etc. + auto downstream_info = StreamEvent::CreateStreamInfo( + input_events_, std::make_unique(info)); + downstream_info->stream_info->BitsPerSample(32); + downstream_info->stream_info->SampleRate(48'000); + chunk_size_ = 128; + downstream_info->stream_info->ChunkSize(chunk_size_); + + SendOrBufferEvent(std::move(downstream_info)); + return {}; } auto AudioDecoder::ProcessChunk(const cpp::span& chunk) -> cpp::result { - if (current_codec_ == nullptr) { + if (current_codec_ == nullptr || !chunk_reader_) { // Should never happen, but fail explicitly anyway. return cpp::fail(UNSUPPORTED_STREAM); } - current_codec_->SetInput(chunk); - - bool has_samples_to_send = false; - bool needs_more_input = false; - std::optional error = std::nullopt; - while (1) { - ChunkWriteResult res = chunk_writer_->WriteChunkToStream( - [&](cpp::span buffer) -> std::size_t { - std::size_t bytes_written = 0; - // Continue filling up the output buffer so long as we have samples - // leftover, or are able to synthesize more samples from the input. - while (has_samples_to_send || !needs_more_input) { - if (!has_samples_to_send) { - auto result = current_codec_->ProcessNextFrame(); - has_samples_to_send = true; - if (result.has_error()) { - error = result.error(); - // End our output stream immediately if the codec barfed. - return 0; - } else { - needs_more_input = result.value(); - } - } else { - auto result = current_codec_->WriteOutputSamples( - buffer.last(buffer.size() - bytes_written)); - bytes_written += result.first; - has_samples_to_send = !result.second; - } - } - return bytes_written; - }, - // TODO - portMAX_DELAY); - - switch (res) { - case CHUNK_WRITE_OKAY: - break; - case CHUNK_WRITE_TIMEOUT: - case CHUNK_OUT_OF_DATA: + current_codec_->SetInput(chunk_reader_->HandleNewData(chunk)); + + return {}; +} + +auto AudioDecoder::Process() -> cpp::result { + if (has_samples_to_send_) { + // Writing samples is relatively quick (it's just a bunch of memcopy's), so + // do them all at once. + while (has_samples_to_send_ && !IsOverBuffered()) { + auto buffer = StreamEvent::CreateChunkData(input_events_, chunk_size_); + auto write_res = + current_codec_->WriteOutputSamples(buffer->chunk_data.bytes); + buffer->chunk_data.bytes = + buffer->chunk_data.bytes.first(write_res.first); + has_samples_to_send_ = !write_res.second; + + if (!SendOrBufferEvent(std::move(buffer))) { return {}; - default: - return cpp::fail(IO_ERROR); + } } + // We will process the next frame during the next call to this method. + return {}; } - if (error) { - ESP_LOGE(kTag, "Codec encountered error %d", error.value()); - return cpp::fail(IO_ERROR); - } + if (!needs_more_input_) { + auto res = current_codec_->ProcessNextFrame(); + if (res.has_error()) { + // todo + return {}; + } + needs_more_input_ = res.value(); + has_samples_to_send_ = true; - return current_codec_->GetInputPosition(); -} + if (needs_more_input_) { + chunk_reader_->HandleLeftovers(current_codec_->GetInputPosition()); + } + } -auto AudioDecoder::ProcessIdle() -> cpp::result { - // Not used; we delay forever when waiting on IO. return {}; } diff --git a/src/audio/audio_element.cpp b/src/audio/audio_element.cpp new file mode 100644 index 00000000..e1623c36 --- /dev/null +++ b/src/audio/audio_element.cpp @@ -0,0 +1,59 @@ +#include "audio_element.hpp" + +namespace audio { + +IAudioElement::IAudioElement() + : input_events_(xQueueCreate(kEventQueueSize, sizeof(StreamEvent))), + output_events_(nullptr), + unprocessed_output_chunks_(0), + buffered_output_(), + current_state_(STATE_RUN) {} + +IAudioElement::~IAudioElement() { + // Ensure we don't leak any memory from events leftover in the queue. + while (uxQueueSpacesAvailable(input_events_) < kEventQueueSize) { + StreamEvent* event; + if (xQueueReceive(input_events_, &event, 0)) { + free(event); + } else { + break; + } + } + // Technically there's a race here if someone is still adding to the queue, + // but hopefully the whole pipeline is stopped if an element is being + // destroyed. + vQueueDelete(input_events_); +} + +auto IAudioElement::SendOrBufferEvent(std::unique_ptr event) + -> bool { + if (event->tag == StreamEvent::CHUNK_DATA) { + unprocessed_output_chunks_++; + } + if (!buffered_output_.empty()) { + // To ensure we send data in order, don't try to send if we've already + // failed to send something. + buffered_output_.push_back(std::move(event)); + return false; + } + StreamEvent* raw_event = event.release(); + if (!xQueueSend(output_events_, raw_event, 0)) { + buffered_output_.emplace_front(raw_event); + return false; + } + return true; +} + +auto IAudioElement::FlushBufferedOutput() -> bool { + while (!buffered_output_.empty()) { + StreamEvent* raw_event = buffered_output_.front().release(); + buffered_output_.pop_front(); + if (!xQueueSend(output_events_, raw_event, 0)) { + buffered_output_.emplace_front(raw_event); + return false; + } + } + return true; +} + +} // namespace audio diff --git a/src/audio/audio_playback.cpp b/src/audio/audio_playback.cpp index 7462b4f6..fae096d0 100644 --- a/src/audio/audio_playback.cpp +++ b/src/audio/audio_playback.cpp @@ -18,9 +18,6 @@ namespace audio { -// TODO: idk -static const std::size_t kMinElementBufferSize = 1024; - auto AudioPlayback::create(drivers::GpioExpander* expander, std::shared_ptr storage) -> cpp::result, Error> { @@ -37,8 +34,6 @@ auto AudioPlayback::create(drivers::GpioExpander* expander, auto playback = std::make_unique(); // Configure the pipeline - source->InputBuffer(&playback->stream_start_); - sink->OutputBuffer(&playback->stream_end_); playback->ConnectElements(source.get(), codec.get()); playback->ConnectElements(codec.get(), sink.get()); @@ -52,9 +47,7 @@ auto AudioPlayback::create(drivers::GpioExpander* expander, return playback; } -// TODO(jacqueline): think about sizes -AudioPlayback::AudioPlayback() - : stream_start_(128, 256), stream_end_(128, 256) {} +AudioPlayback::AudioPlayback() {} AudioPlayback::~AudioPlayback() { for (auto& element : element_handles_) { @@ -63,33 +56,16 @@ AudioPlayback::~AudioPlayback() { } auto AudioPlayback::Play(const std::string& filename) -> void { - StreamInfo info; - info.Path(filename); - - std::array dest; - auto len = WriteMessage( - TYPE_STREAM_INFO, [&](auto enc) { return info.Encode(enc); }, dest); + auto info = std::make_unique(); + info->Path(filename); + auto event = StreamEvent::CreateStreamInfo(nullptr, std::move(info)); - if (len.has_error()) { - // TODO. - return; - } - - // TODO: short delay, return error on fail - xMessageBufferSend(*stream_start_.Handle(), dest.data(), len.value(), - portMAX_DELAY); + xQueueSend(input_handle_, event.release(), portMAX_DELAY); } auto AudioPlayback::ConnectElements(IAudioElement* src, IAudioElement* sink) -> void { - std::size_t chunk_size = - std::max(src->InputMinChunkSize(), sink->InputMinChunkSize()); - std::size_t buffer_size = std::max(kMinElementBufferSize, chunk_size * 2); - - auto buffer = std::make_unique(chunk_size, buffer_size); - src->OutputBuffer(buffer.get()); - sink->OutputBuffer(buffer.get()); - element_buffers_.push_back(std::move(buffer)); + src->OutputEventQueue(sink->InputEventQueue()); } } // namespace audio diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index e3641973..f0a01b6c 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include "audio_element_handle.hpp" @@ -10,19 +11,21 @@ #include "esp_heap_caps.h" #include "esp_log.h" #include "freertos/portmacro.h" +#include "freertos/projdefs.h" #include "freertos/queue.h" -#include "freertos/stream_buffer.h" #include "span.hpp" #include "audio_element.hpp" #include "chunk.hpp" +#include "stream_event.hpp" #include "stream_info.hpp" #include "stream_message.hpp" +#include "sys/_stdint.h" #include "tasks.hpp" namespace audio { - static const char *kTag = "task"; +static const char* kTag = "task"; auto StartAudioTask(const std::string& name, std::shared_ptr element) @@ -47,84 +50,91 @@ void AudioTaskMain(void* args) { std::shared_ptr element = std::move(real_args->element); delete real_args; - ChunkReader chunk_reader = ChunkReader(element->InputBuffer()); + // Queue of events that we have received on our input queue, but not yet + // processed. + std::deque> pending_events; while (element->ElementState() != STATE_QUIT) { - if (element->ElementState() == STATE_PAUSE) { - // TODO: park with a condition variable or something? - vTaskDelay(1000); + // First, we pull events from our input queue into pending_events. This + // keeps us responsive to any events that need to be handled immediately. + // Then we check if there's any events to flush downstream. + // Then we pass anything requiring processing to the element. + + bool has_work_to_do = + (!pending_events.empty() || element->HasUnflushedOutput() || + element->HasUnprocessedInput()) && + !element->IsOverBuffered(); + + // If we have no new events to process and the element has nothing left to + // do, then just delay forever waiting for a new event. + TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY; + + StreamEvent* event_ptr = nullptr; + bool has_event = + xQueueReceive(element->InputEventQueue(), &event_ptr, ticks_to_wait); + + if (has_event && event_ptr != nullptr) { + std::unique_ptr event(event_ptr); + if (event->tag == StreamEvent::CHUNK_NOTIFICATION) { + element->OnChunkProcessed(); + } else { + // This isn't an event that needs to be actioned immediately. Add it + // to our work queue. + pending_events.push_back(std::move(event)); + } + // Loop again, so that we service all incoming events before doing our + // possibly expensive processing. continue; } - cpp::result process_res; - - // If this element has an input stream, then our top priority is - // processing any chunks from it. Try doing this first, then fall back to - // the other cases. - bool has_received_message = false; - ChunkReadResult chunk_res = chunk_reader.ReadChunkFromStream( - [&](cpp::span data) -> std::optional { - process_res = element->ProcessChunk(data); - if (process_res.has_value()) { - return process_res.value(); - } else { - return {}; - } - }, - 0); - - if (chunk_res == CHUNK_PROCESSING_ERROR || - chunk_res == CHUNK_DECODING_ERROR) { - ESP_LOGE(kTag, "failed to process chunk"); - break; // TODO. - } else if (chunk_res == CHUNK_STREAM_ENDED) { - has_received_message = true; + // We have no new events. Next, see if there's anything that needs to be + // flushed. + if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) { + // We had things to flush, but couldn't send it all. This probably + // implies that the downstream element is having issues servicing its + // input queue, so hold off for a moment before retrying. + ESP_LOGW(kTag, "failed to flush buffered output"); + vTaskDelay(pdMS_TO_TICKS(100)); + continue; } - if (has_received_message) { - auto message = chunk_reader.GetLastMessage(); - MessageType type = ReadMessageType(message); - if (type == TYPE_STREAM_INFO) { - auto parse_res = ReadMessage(&StreamInfo::Parse, message); - if (parse_res.has_error()) { - ESP_LOGE(kTag, "failed to parse stream info"); - break; // TODO. - } - auto info_res = element->ProcessStreamInfo(parse_res.value()); - if (info_res.has_error()) { - ESP_LOGE(kTag, "failed to process stream info"); - break; // TODO. - } + if (element->HasUnprocessedInput()) { + auto process_res = element->Process(); + if (!process_res.has_error() || process_res.error() != OUT_OF_DATA) { + // TODO: log! } + continue; } - // Chunk reading must have timed out, or we don't have an input stream. - ElementState state = element->ElementState(); - if (state == STATE_PAUSE) { - element->PrepareForPause(); - - vTaskSuspend(NULL); + // The element ran out of data, so now it's time to let it process more + // input. + while (!pending_events.empty()) { + auto event = std::move(pending_events.front()); + pending_events.pop_front(); - // Zzzzzz... - - // When we wake up, skip straight to the start of the loop again. - continue; - } else if (state == STATE_QUIT) { - break; - } + if (event->tag == StreamEvent::STREAM_INFO) { + auto process_res = element->ProcessStreamInfo(*event->stream_info); + if (process_res.has_error()) { + // TODO(jacqueline) + ESP_LOGE(kTag, "failed to process stream info"); + } + } else if (event->tag == StreamEvent::CHUNK_DATA) { + StreamEvent* callback = new StreamEvent(); + callback->source = element->InputEventQueue(); + callback->tag = StreamEvent::CHUNK_NOTIFICATION; + if (!xQueueSend(event->source, callback, 0)) { + // TODO: log? crash? hmm. + pending_events.push_front(std::move(event)); + continue; + } - // Signal the element to do any of its idle tasks. - auto process_error = element->ProcessIdle(); - if (process_error.has_error()) { - auto err = process_error.error(); - if (err == OUT_OF_DATA) { - // If we ran out of data, then place ourselves into the pause state. - // We will be woken up when there's something to do. - element->ElementState(STATE_PAUSE); - continue; - } else { - ESP_LOGE(kTag, "failed to process idle"); - break; // TODO. + auto process_chunk_res = + element->ProcessChunk(event->chunk_data.bytes); + if (process_chunk_res.has_error()) { + // TODO(jacqueline) + ESP_LOGE(kTag, "failed to process chunk"); + continue; + } } } } diff --git a/src/audio/chunk.cpp b/src/audio/chunk.cpp index b37fdfed..baf2aba5 100644 --- a/src/audio/chunk.cpp +++ b/src/audio/chunk.cpp @@ -14,130 +14,37 @@ namespace audio { -static const char* kTag = "chunk"; +ChunkReader::ChunkReader(std::size_t chunk_size) + : raw_working_buffer_(static_cast( + heap_caps_malloc(chunk_size * 1.5, MALLOC_CAP_SPIRAM))), + working_buffer_(raw_working_buffer_, chunk_size * 1.5) {} -ChunkWriter::ChunkWriter(StreamBuffer* buffer) - : stream_(buffer), leftover_bytes_(0) {} - -ChunkWriter::~ChunkWriter() {} - -auto ChunkWriter::Reset() -> void { - leftover_bytes_ = 0; -} - -auto ChunkWriter::WriteChunkToStream( - std::function)> callback, - TickType_t max_wait) -> ChunkWriteResult { - cpp::span write_buffer = stream_->WriteBuffer(); - // First, write out our chunk header so we know how much space to give to - // the callback. - auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer); - if (header_size.has_error()) { - return CHUNK_ENCODING_ERROR; - } - - // Now we can ask the callback to fill the remaining space. If the previous - // call to this method timed out, then we may already have the data we need - // in our write buffer. - size_t chunk_size; - if (leftover_bytes_ > 0) { - chunk_size = leftover_bytes_; - } else { - chunk_size = std::invoke( - callback, - write_buffer.subspan(header_size.value(), - write_buffer.size() - header_size.value())); - } - - if (chunk_size == 0) { - // They had nothing for us, so bail out. - return CHUNK_OUT_OF_DATA; - } - - // Try to write to the buffer. Note the return type here will be either 0 or - // header_size + chunk_size, as MessageBuffer doesn't allow partial writes. - size_t intended_write_size = header_size.value() + chunk_size; - ESP_LOGI(kTag, "writing chunk of size %d", intended_write_size); - size_t actual_write_size = - xMessageBufferSend(stream_->Handle(), write_buffer.data(), - intended_write_size, max_wait); - - if (actual_write_size == 0) { - leftover_bytes_ = chunk_size; - return CHUNK_WRITE_TIMEOUT; - } else { - leftover_bytes_ = 0; - } - - return CHUNK_WRITE_OKAY; +ChunkReader::~ChunkReader() { + free(raw_working_buffer_); } -ChunkReader::ChunkReader(StreamBuffer* stream) : stream_(stream) {} - -ChunkReader::~ChunkReader() {} - -auto ChunkReader::Reset() -> void { +auto ChunkReader::HandleNewData(cpp::span data) + -> cpp::span { + // Copy the new data onto the front for anything that was left over from the + // last portion. Note: this could be optimised for the '0 leftover bytes' + // case, which technically shouldn't need a copy. + cpp::span new_data_dest = working_buffer_.subspan(leftover_bytes_); + std::copy(data.begin(), data.end(), new_data_dest.begin()); + last_data_in_working_buffer_ = + working_buffer_.first(leftover_bytes_ + data.size()); leftover_bytes_ = 0; - last_message_size_ = 0; -} - -auto ChunkReader::GetLastMessage() -> cpp::span { - return stream_->ReadBuffer().subspan(leftover_bytes_, last_message_size_); + return last_data_in_working_buffer_; } -auto ChunkReader::ReadChunkFromStream( - std::function(cpp::span)> callback, - TickType_t max_wait) -> ChunkReadResult { - // First, wait for a message to arrive over the buffer. - cpp::span new_data_dest = stream_->ReadBuffer().last( - stream_->ReadBuffer().size() - leftover_bytes_); - ESP_LOGI(kTag, "reading chunk of size %d", new_data_dest.size()); - last_message_size_ = xMessageBufferReceive( - stream_->Handle(), new_data_dest.data(), new_data_dest.size(), max_wait); - - if (last_message_size_ == 0) { - return CHUNK_READ_TIMEOUT; - } - - cpp::span new_data = GetLastMessage(); - MessageType type = ReadMessageType(new_data); - - if (type != TYPE_CHUNK_HEADER) { - // This message wasn't for us, so let the caller handle it. - Reset(); - return CHUNK_STREAM_ENDED; - } - - // Work the size and position of the chunk. - auto chunk_data = GetAdditionalData(new_data); - - // Now we need to stick the end of the last chunk (if it exists) onto the - // front of the new chunk. Do it this way around bc we assume the old chunk - // is shorter, and therefore faster to move. - cpp::span leftover_data = - stream_->ReadBuffer().first(leftover_bytes_); - cpp::span combined_data(chunk_data.data() - leftover_data.size(), - leftover_data.size() + chunk_data.size()); - if (leftover_bytes_ > 0) { - std::copy_backward(leftover_data.begin(), leftover_data.end(), - combined_data.begin()); - } - - // Tell the callback about the new data. - std::optional amount_processed = std::invoke(callback, combined_data); - if (!amount_processed) { - return CHUNK_PROCESSING_ERROR; - } - - // Prepare for the next iteration. - leftover_bytes_ = combined_data.size() - amount_processed.value(); +auto ChunkReader::HandleLeftovers(std::size_t bytes_used) -> void { + leftover_bytes_ = last_data_in_working_buffer_.size() - bytes_used; if (leftover_bytes_ > 0) { - std::copy(combined_data.begin() + amount_processed.value(), - combined_data.end(), stream_->ReadBuffer().begin()); - return CHUNK_LEFTOVER_DATA; + auto data_to_keep = last_data_in_working_buffer_.last(leftover_bytes_); + // Copy backwards, since if more than half of the data was unused then the + // source and destination will overlap. + std::copy_backward(data_to_keep.begin(), data_to_keep.end(), + working_buffer_.begin()); } - - return CHUNK_READ_OKAY; } } // namespace audio diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index 7f11805e..29e03784 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -1,5 +1,6 @@ #include "fatfs_audio_input.hpp" +#include #include #include #include @@ -10,33 +11,26 @@ #include "audio_element.hpp" #include "chunk.hpp" #include "stream_buffer.hpp" +#include "stream_event.hpp" #include "stream_message.hpp" static const char* kTag = "SRC"; namespace audio { -static const TickType_t kServiceInterval = pdMS_TO_TICKS(50); - -static const std::size_t kFileBufferSize = 1024 * 128; -static const std::size_t kMinFileReadSize = 1024 * 4; +// 32KiB to match the minimum himen region size. +static const std::size_t kChunkSize = 1024; FatfsAudioInput::FatfsAudioInput(std::shared_ptr storage) : IAudioElement(), storage_(storage), - raw_file_buffer_(static_cast( - heap_caps_malloc(kFileBufferSize, MALLOC_CAP_SPIRAM))), - file_buffer_(raw_file_buffer_, kFileBufferSize), - file_buffer_read_pos_(file_buffer_.begin()), - file_buffer_write_pos_(file_buffer_.begin()), current_file_(), - is_file_open_(false), - chunk_writer_(nullptr) { - // TODO: create our chunk writer whenever the output buffer changes. -} + is_file_open_(false) {} + +FatfsAudioInput::~FatfsAudioInput() {} -FatfsAudioInput::~FatfsAudioInput() { - free(raw_file_buffer_); +auto FatfsAudioInput::HasUnprocessedInput() -> bool { + return is_file_open_; } auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info) @@ -57,17 +51,12 @@ auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info) is_file_open_ = true; - auto write_size = - WriteMessage(TYPE_STREAM_INFO, - std::bind(&StreamInfo::Encode, info, std::placeholders::_1), - output_buffer_->WriteBuffer()); + std::unique_ptr new_info = std::make_unique(info); + new_info->ChunkSize(kChunkSize); - if (write_size.has_error()) { - return cpp::fail(IO_ERROR); - } else { - xMessageBufferSend(output_buffer_, output_buffer_->WriteBuffer().data(), - write_size.value(), portMAX_DELAY); - } + auto event = + StreamEvent::CreateStreamInfo(input_events_, std::move(new_info)); + SendOrBufferEvent(std::move(event)); return {}; } @@ -77,110 +66,29 @@ auto FatfsAudioInput::ProcessChunk(const cpp::span& chunk) return cpp::fail(UNSUPPORTED_STREAM); } -auto FatfsAudioInput::GetRingBufferDistance() const -> size_t { - if (file_buffer_read_pos_ == file_buffer_write_pos_) { - return 0; - } - if (file_buffer_read_pos_ < file_buffer_write_pos_) { - return file_buffer_write_pos_ - file_buffer_read_pos_; - } - return - // Read position to end of buffer. - (file_buffer_.end() - file_buffer_read_pos_) - // Start of buffer to write position. - + (file_buffer_write_pos_ - file_buffer_.begin()); -} - -auto FatfsAudioInput::ProcessIdle() -> cpp::result { - // First, see if we're able to fill up the input buffer with any more of the - // file's contents. +auto FatfsAudioInput::Process() -> cpp::result { if (is_file_open_) { - size_t ringbuf_distance = GetRingBufferDistance(); - if (file_buffer_.size() - ringbuf_distance > kMinFileReadSize) { - size_t read_size; - if (file_buffer_write_pos_ < file_buffer_read_pos_) { - // Don't worry about the start of buffer -> read pos size; we can get to - // it next iteration. - read_size = file_buffer_read_pos_ - file_buffer_write_pos_; - } else { - read_size = file_buffer_.begin() - file_buffer_write_pos_; - } - - ESP_LOGI(kTag, "reading up to %d bytes", (int)read_size); - - UINT bytes_read = 0; - FRESULT result = - f_read(¤t_file_, std::addressof(file_buffer_write_pos_), - read_size, &bytes_read); - if (result != FR_OK) { - ESP_LOGE(kTag, "file I/O error %d", result); - return cpp::fail(IO_ERROR); - } - - ESP_LOGI(kTag, "actual read size %d bytes", (int)bytes_read); - - if (f_eof(¤t_file_)) { - f_close(¤t_file_); - is_file_open_ = false; - - // TODO: open the next file? - } - - file_buffer_write_pos_ += bytes_read; - if (file_buffer_write_pos_ == file_buffer_.end()) { - file_buffer_write_pos_ = file_buffer_.begin(); - } + auto dest_event = StreamEvent::CreateChunkData(input_events_, kChunkSize); + UINT bytes_read = 0; + + FRESULT result = + f_read(¤t_file_, dest_event->chunk_data.raw_bytes.get(), + kChunkSize, &bytes_read); + if (result != FR_OK) { + ESP_LOGE(kTag, "file I/O error %d", result); + return cpp::fail(IO_ERROR); } - } else if (GetRingBufferDistance() == 0) { - // We have no file open, and no data waiting to be written. We're out of - // stuff to do, so signal a pause. - return cpp::fail(OUT_OF_DATA); - } - // Now stream data into the output buffer until it's full. - while (GetRingBufferDistance() > 0) { - ESP_LOGI(kTag, "writing up to %d bytes", (int)GetRingBufferDistance()); - ChunkWriteResult result = chunk_writer_->WriteChunkToStream( - [&](cpp::span d) { return SendChunk(d); }, kServiceInterval); - - switch (result) { - case CHUNK_WRITE_OKAY: - break; - case CHUNK_WRITE_TIMEOUT: - case CHUNK_OUT_OF_DATA: - // Both of these are fine; we will pick back up where we left off in - // the next idle call. - return {}; - default: - return cpp::fail(IO_ERROR); + dest_event->chunk_data.bytes = + dest_event->chunk_data.bytes.first(bytes_read); + SendOrBufferEvent(std::move(dest_event)); + + if (f_eof(¤t_file_)) { + f_close(¤t_file_); + is_file_open_ = false; } } - - // We've finished writing out chunks, but there may be more of the file to - // read. Return, and begin again in the next idle call. return {}; } -auto FatfsAudioInput::SendChunk(cpp::span dest) -> size_t { - if (file_buffer_read_pos_ == file_buffer_write_pos_) { - return 0; - } - std::size_t chunk_size; - if (file_buffer_read_pos_ > file_buffer_write_pos_) { - chunk_size = file_buffer_.end() - file_buffer_read_pos_; - } else { - chunk_size = file_buffer_write_pos_ - file_buffer_read_pos_; - } - chunk_size = std::min(chunk_size, dest.size()); - - cpp::span source(file_buffer_read_pos_, chunk_size); - std::copy(source.begin(), source.end(), dest.begin()); - - file_buffer_read_pos_ = file_buffer_read_pos_ + chunk_size; - if (file_buffer_read_pos_ == file_buffer_.end()) { - file_buffer_read_pos_ = file_buffer_.begin(); - } - return chunk_size; -} - } // namespace audio diff --git a/src/audio/i2s_audio_output.cpp b/src/audio/i2s_audio_output.cpp index d853a06f..d041f726 100644 --- a/src/audio/i2s_audio_output.cpp +++ b/src/audio/i2s_audio_output.cpp @@ -92,22 +92,12 @@ auto I2SAudioOutput::ProcessChunk(const cpp::span& chunk) return dac_->WriteData(chunk, portMAX_DELAY); } -auto I2SAudioOutput::IdleTimeout() const -> TickType_t { - return kIdleTimeBeforeMute; -} - -auto I2SAudioOutput::ProcessIdle() -> cpp::result { +auto I2SAudioOutput::Process() -> cpp::result { // TODO(jacqueline): Consider powering down the dac completely maybe? SetSoftMute(true); return {}; } -auto I2SAudioOutput::PrepareForPause() -> void { - // TODO(jacqueline): We ideally want to ensure we have enough samples in the - // DMA buffer here, so that soft mute can work properly. - SetSoftMute(true); -} - auto I2SAudioOutput::SetVolume(uint8_t volume) -> void { volume_ = volume; if (!is_soft_muted_) { diff --git a/src/audio/include/audio_decoder.hpp b/src/audio/include/audio_decoder.hpp index 0a2df76d..a2591d25 100644 --- a/src/audio/include/audio_decoder.hpp +++ b/src/audio/include/audio_decoder.hpp @@ -4,6 +4,7 @@ #include #include +#include "chunk.hpp" #include "ff.h" #include "span.hpp" @@ -30,11 +31,13 @@ class AudioDecoder : public IAudioElement { return 1024; } + auto HasUnprocessedInput() -> bool override; + auto ProcessStreamInfo(const StreamInfo& info) -> cpp::result override; auto ProcessChunk(const cpp::span& chunk) -> cpp::result override; - auto ProcessIdle() -> cpp::result override; + auto Process() -> cpp::result override; AudioDecoder(const AudioDecoder&) = delete; AudioDecoder& operator=(const AudioDecoder&) = delete; @@ -42,8 +45,11 @@ class AudioDecoder : public IAudioElement { private: std::unique_ptr current_codec_; std::optional stream_info_; + std::optional chunk_reader_; - std::unique_ptr chunk_writer_; + std::size_t chunk_size_; + bool has_samples_to_send_; + bool needs_more_input_; }; } // namespace audio diff --git a/src/audio/include/audio_element.hpp b/src/audio/include/audio_element.hpp index eb700180..8827a0c3 100644 --- a/src/audio/include/audio_element.hpp +++ b/src/audio/include/audio_element.hpp @@ -2,6 +2,8 @@ #include #include +#include +#include #include "freertos/FreeRTOS.h" @@ -12,6 +14,7 @@ #include "span.hpp" #include "stream_buffer.hpp" +#include "stream_event.hpp" #include "stream_info.hpp" #include "types.hpp" @@ -36,6 +39,8 @@ enum AudioProcessingError { OUT_OF_DATA, }; +static const size_t kEventQueueSize = 8; + /* * One indepedentent part of an audio pipeline. Each element has an input and * output message stream, and is responsible for taking data from the input @@ -51,38 +56,38 @@ enum AudioProcessingError { */ class IAudioElement { public: - IAudioElement() - : input_buffer_(nullptr), output_buffer_(nullptr), state_(STATE_RUN) {} - virtual ~IAudioElement() {} + IAudioElement(); + virtual ~IAudioElement(); /* * Returns the stack size in bytes that this element requires. This should * be tuned according to the observed stack size of each element, as different - * elements have fairly different stack requirements. + * elements have fairly different stack requirements (particular decoders). */ virtual auto StackSizeBytes() const -> std::size_t { return 2048; }; - /* - * How long to wait for new data on the input stream before triggering a call - * to ProcessIdle(). If this is portMAX_DELAY (the default), then ProcessIdle - * will never be called. - */ - virtual auto IdleTimeout() const -> TickType_t { return 10; } - virtual auto InputMinChunkSize() const -> std::size_t { return 0; } /* Returns this element's input buffer. */ - auto InputBuffer() const -> StreamBuffer* { return input_buffer_; } + auto InputEventQueue() const -> QueueHandle_t { return input_events_; } /* Returns this element's output buffer. */ - auto OutputBuffer() const -> StreamBuffer* { return output_buffer_; } + auto OutputEventQueue() const -> QueueHandle_t { return output_events_; } + + auto OutputEventQueue(const QueueHandle_t q) -> void { output_events_ = q; } - auto InputBuffer(StreamBuffer* b) -> void { input_buffer_ = b; } + auto HasUnflushedOutput() -> bool { return !buffered_output_.empty(); } - auto OutputBuffer(StreamBuffer* b) -> void { output_buffer_ = b; } + virtual auto HasUnprocessedInput() -> bool = 0; - auto ElementState() const -> ElementState { return state_; } - auto ElementState(enum ElementState e) -> void { state_ = e; } + auto IsOverBuffered() -> bool { return unprocessed_output_chunks_ > 4; } + + auto FlushBufferedOutput() -> bool; + + auto ElementState() const -> ElementState { return current_state_; } + auto ElementState(enum ElementState e) -> void { current_state_ = e; } + + virtual auto OnChunkProcessed() -> void { unprocessed_output_chunks_--; } /* * Called when a StreamInfo message is received. Used to configure this @@ -105,14 +110,26 @@ class IAudioElement { * time. This could be used to synthesize output, or to save memory by * releasing unused resources. */ - virtual auto ProcessIdle() -> cpp::result = 0; - - virtual auto PrepareForPause() -> void{}; + virtual auto Process() -> cpp::result = 0; protected: - StreamBuffer* input_buffer_; - StreamBuffer* output_buffer_; - std::atomic state_; + auto SendOrBufferEvent(std::unique_ptr event) -> bool; + + // Queue for events coming into this element. Owned by us. + QueueHandle_t input_events_; + // Queue for events going into the next element. Not owned by us, may be null + // if we're not yet in a pipeline. + // FIXME: it would be nicer if this was non-nullable. + QueueHandle_t output_events_; + + // The number of output chunks that we have generated, but have not yet been + // processed by the next element in the pipeline. This includes any chunks + // that are currently help in buffered_output_. + int unprocessed_output_chunks_; + // Output events that have been generated, but are yet to be sent downstream. + std::deque> buffered_output_; + + enum ElementState current_state_; }; } // namespace audio diff --git a/src/audio/include/audio_playback.hpp b/src/audio/include/audio_playback.hpp index bffc3f02..f05ca327 100644 --- a/src/audio/include/audio_playback.hpp +++ b/src/audio/include/audio_playback.hpp @@ -43,11 +43,9 @@ class AudioPlayback { private: auto ConnectElements(IAudioElement* src, IAudioElement* sink) -> void; - - StreamBuffer stream_start_; - StreamBuffer stream_end_; - std::vector> element_buffers_; std::vector> element_handles_; + + QueueHandle_t input_handle_; }; } // namespace audio diff --git a/src/audio/include/chunk.hpp b/src/audio/include/chunk.hpp index 5c7e73dd..6154ab25 100644 --- a/src/audio/include/chunk.hpp +++ b/src/audio/include/chunk.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -17,68 +18,12 @@ namespace audio { -enum ChunkWriteResult { - // Returned when the callback does not write any data. - CHUNK_WRITE_OKAY, - // Returned when the callback does not write any data. - CHUNK_OUT_OF_DATA, - // Returned when there is an error encoding a chunk header using cbor. - CHUNK_ENCODING_ERROR, - // Returned when max_wait expires without room in the stream buffer becoming - // available. - CHUNK_WRITE_TIMEOUT, -}; - -class ChunkWriter { - public: - explicit ChunkWriter(StreamBuffer* buffer); - ~ChunkWriter(); - - auto Reset() -> void; - - auto GetLastMessage() -> cpp::span; - - /* - * Invokes the given callback to receive data, breaks the received data up - * into chunks with headers, and writes those chunks to the given output - * stream. - * - * The callback will be invoked with a byte buffer and its size. The callback - * should write as much data as it can to this buffer, and then return the - * number of bytes it wrote. Return a value of 0 to indicate that there is no - * more input to read. - */ - auto WriteChunkToStream(std::function)> callback, - TickType_t max_wait) -> ChunkWriteResult; - - private: - StreamBuffer* stream_; - std::size_t leftover_bytes_ = 0; -}; - -enum ChunkReadResult { - CHUNK_READ_OKAY, - // Returned when the chunk was read successfully, but the consumer did not - // use all of the data. - CHUNK_LEFTOVER_DATA, - // Returned an error in parsing the cbor-encoded header. - CHUNK_DECODING_ERROR, - // Returned when max_wait expired before any data was read. - CHUNK_READ_TIMEOUT, - // Returned when a non-chunk message is received. - CHUNK_STREAM_ENDED, - // Returned when the processing callback does not return a value. - CHUNK_PROCESSING_ERROR, -}; - class ChunkReader { public: - explicit ChunkReader(StreamBuffer* buffer); + explicit ChunkReader(std::size_t chunk_size); ~ChunkReader(); - auto Reset() -> void; - - auto GetLastMessage() -> cpp::span; + auto HandleLeftovers(std::size_t bytes_used) -> void; /* * Reads chunks of data from the given input stream, and invokes the given @@ -92,14 +37,13 @@ class ChunkReader { * If this function encounters a message in the stream that is not a chunk, it * will place the message at the start of the working_buffer and then return. */ - auto ReadChunkFromStream( - std::function(cpp::span)> callback, - TickType_t max_wait) -> ChunkReadResult; + auto HandleNewData(cpp::span data) -> cpp::span; private: - StreamBuffer* stream_; + std::byte* raw_working_buffer_; + cpp::span working_buffer_; + cpp::span last_data_in_working_buffer_; std::size_t leftover_bytes_ = 0; - std::size_t last_message_size_ = 0; }; } // namespace audio diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index f3704f1d..5625d941 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -22,32 +22,22 @@ class FatfsAudioInput : public IAudioElement { explicit FatfsAudioInput(std::shared_ptr storage); ~FatfsAudioInput(); + auto HasUnprocessedInput() -> bool override; + auto ProcessStreamInfo(const StreamInfo& info) -> cpp::result override; auto ProcessChunk(const cpp::span& chunk) -> cpp::result override; - auto ProcessIdle() -> cpp::result override; - - auto SendChunk(cpp::span dest) -> size_t; + auto Process() -> cpp::result override; FatfsAudioInput(const FatfsAudioInput&) = delete; FatfsAudioInput& operator=(const FatfsAudioInput&) = delete; private: - auto GetRingBufferDistance() const -> size_t; - std::shared_ptr storage_; - std::byte* raw_file_buffer_; - cpp::span file_buffer_; - cpp::span::iterator file_buffer_read_pos_; - cpp::span::iterator pending_read_pos_; - cpp::span::iterator file_buffer_write_pos_; - FIL current_file_; bool is_file_open_; - - std::unique_ptr chunk_writer_; }; } // namespace audio diff --git a/src/audio/include/i2s_audio_output.hpp b/src/audio/include/i2s_audio_output.hpp index 75a3be76..4fbcad49 100644 --- a/src/audio/include/i2s_audio_output.hpp +++ b/src/audio/include/i2s_audio_output.hpp @@ -22,19 +22,14 @@ class I2SAudioOutput : public IAudioElement { std::unique_ptr dac); ~I2SAudioOutput(); - auto InputMinChunkSize() const -> std::size_t override { - // TODO(jacqueline): work out a good value here. Maybe similar to the total - // DMA buffer size? - return 128; - } + // TODO. + auto HasUnprocessedInput() -> bool override { return false; } - auto IdleTimeout() const -> TickType_t override; auto ProcessStreamInfo(const StreamInfo& info) -> cpp::result override; auto ProcessChunk(const cpp::span& chunk) -> cpp::result override; - auto ProcessIdle() -> cpp::result override; - auto PrepareForPause() -> void override; + auto Process() -> cpp::result override; I2SAudioOutput(const I2SAudioOutput&) = delete; I2SAudioOutput& operator=(const I2SAudioOutput&) = delete; diff --git a/src/audio/include/stream_event.hpp b/src/audio/include/stream_event.hpp new file mode 100644 index 00000000..4dfdab41 --- /dev/null +++ b/src/audio/include/stream_event.hpp @@ -0,0 +1,57 @@ +#pragma once + +#include + +#include "freertos/FreeRTOS.h" +#include "freertos/queue.h" + +#include "span.hpp" +#include "stream_info.hpp" + +namespace audio { + +struct StreamEvent { + static auto CreateStreamInfo(QueueHandle_t source, + std::unique_ptr payload) + -> std::unique_ptr; + static auto CreateChunkData(QueueHandle_t source, std::size_t chunk_size) + -> std::unique_ptr; + static auto CreateChunkNotification(QueueHandle_t source) + -> std::unique_ptr; + + StreamEvent(); + ~StreamEvent(); + StreamEvent(StreamEvent&&); + + QueueHandle_t source; + + enum { + UNINITIALISED, + STREAM_INFO, + CHUNK_DATA, + CHUNK_NOTIFICATION, + } tag; + + union { + std::unique_ptr stream_info; + + // Scott Meyers says: + // `About the only situation I can conceive of when a std::unique_ptr + // would make sense would be when you’re using a C-like API that returns a + // raw pointer to a heap array that you assume ownership of.` + // :-) + + struct { + std::unique_ptr raw_bytes; + cpp::span bytes; + } chunk_data; + + // FIXME: It would be nice to also support a pointer to himem data here, to + // save a little ordinary heap space. + }; + + StreamEvent(const StreamEvent&) = delete; + StreamEvent& operator=(const StreamEvent&) = delete; +}; + +} // namespace audio diff --git a/src/audio/include/stream_info.hpp b/src/audio/include/stream_info.hpp index 45f10fc6..ed3096bb 100644 --- a/src/audio/include/stream_info.hpp +++ b/src/audio/include/stream_info.hpp @@ -7,6 +7,7 @@ #include "cbor.h" #include "result.hpp" +#include "sys/_stdint.h" namespace audio { @@ -24,14 +25,24 @@ class StreamInfo { auto Channels() const -> const std::optional& { return channels_; } + auto BitsPerSample(uint8_t bpp) -> void { bits_per_sample_ = bpp; } + auto BitsPerSample() const -> const std::optional& { return bits_per_sample_; } + auto SampleRate(uint16_t rate) -> void { sample_rate_ = rate; } + auto SampleRate() const -> const std::optional& { return sample_rate_; } + auto ChunkSize() const -> const std::optional& { + return chunk_size_; + } + + auto ChunkSize(std::size_t s) -> void { chunk_size_ = s; } + auto Encode(CborEncoder& enc) -> std::optional; private: @@ -39,6 +50,7 @@ class StreamInfo { std::optional channels_; std::optional bits_per_sample_; std::optional sample_rate_; + std::optional chunk_size_; }; } // namespace audio diff --git a/src/audio/stream_buffer.cpp b/src/audio/stream_buffer.cpp index 9bdeaa72..71ab3965 100644 --- a/src/audio/stream_buffer.cpp +++ b/src/audio/stream_buffer.cpp @@ -17,10 +17,11 @@ StreamBuffer::StreamBuffer(std::size_t chunk_size, std::size_t buffer_size) raw_output_chunk_(static_cast( heap_caps_malloc(chunk_size, MALLOC_CAP_SPIRAM))), output_chunk_(raw_output_chunk_, chunk_size) { - assert(input_chunk_.size() <= buffer_size); - assert(output_chunk_.size() <= buffer_size); - ESP_LOGI("streambuf", "created buffer of chunk size %d, total size %d", chunk_size, buffer_size); - } + assert(input_chunk_.size() <= buffer_size); + assert(output_chunk_.size() <= buffer_size); + ESP_LOGI("streambuf", "created buffer of chunk size %d, total size %d", + chunk_size, buffer_size); +} StreamBuffer::~StreamBuffer() { vMessageBufferDelete(handle_); diff --git a/src/audio/stream_event.cpp b/src/audio/stream_event.cpp new file mode 100644 index 00000000..0a116297 --- /dev/null +++ b/src/audio/stream_event.cpp @@ -0,0 +1,75 @@ +#include "stream_event.hpp" +#include +#include + +namespace audio { + +auto StreamEvent::CreateStreamInfo(QueueHandle_t source, + std::unique_ptr payload) + -> std::unique_ptr { + auto event = std::make_unique(); + event->tag = StreamEvent::STREAM_INFO; + event->source = source; + event->stream_info = std::move(payload); + return event; +} + +auto StreamEvent::CreateChunkData(QueueHandle_t source, std::size_t chunk_size) + -> std::unique_ptr { + auto event = std::make_unique(); + event->tag = StreamEvent::CHUNK_DATA; + event->source = source; + + auto raw_bytes = + static_cast(heap_caps_malloc(chunk_size, MALLOC_CAP_SPIRAM)); + + event->chunk_data.raw_bytes = std::make_unique(raw_bytes); + event->chunk_data.bytes = cpp::span(raw_bytes, chunk_size); + + return event; +} + +auto StreamEvent::CreateChunkNotification(QueueHandle_t source) + -> std::unique_ptr { + auto event = std::make_unique(); + event->tag = StreamEvent::CHUNK_NOTIFICATION; + event->source = source; + return event; +} + +StreamEvent::StreamEvent() : tag(StreamEvent::UNINITIALISED) {} + +StreamEvent::~StreamEvent() { + switch (tag) { + case UNINITIALISED: + break; + case STREAM_INFO: + stream_info.reset(); + break; + case CHUNK_DATA: + chunk_data.raw_bytes.reset(); + break; + case CHUNK_NOTIFICATION: + break; + } +} + +StreamEvent::StreamEvent(StreamEvent&& other) { + tag = other.tag; + source = other.source; + switch (tag) { + case UNINITIALISED: + break; + case STREAM_INFO: + stream_info = std::move(other.stream_info); + break; + case CHUNK_DATA: + chunk_data = std::move(other.chunk_data); + break; + case CHUNK_NOTIFICATION: + break; + } + other.tag = StreamEvent::UNINITIALISED; +} + +} // namespace audio diff --git a/src/audio/stream_message.cpp b/src/audio/stream_message.cpp index 055e7e96..0f3b56e9 100644 --- a/src/audio/stream_message.cpp +++ b/src/audio/stream_message.cpp @@ -3,7 +3,6 @@ #include #include "cbor.h" -#include "esp-idf/components/cbor/tinycbor/src/cbor.h" #include "span.hpp" namespace audio {