From 9176ef187227ffb56c249c5f321cd1bf50d4cfcc Mon Sep 17 00:00:00 2001 From: jacqueline Date: Tue, 22 Nov 2022 17:05:02 +1100 Subject: [PATCH] Add cbor wrapper, and chunk streaming util --- src/audio/CMakeLists.txt | 5 +- src/audio/audio_decoder.cpp | 42 +++--- src/{drivers => audio}/audio_playback.cpp | 0 src/audio/audio_task.cpp | 147 +++++++++++++------- src/audio/chunk.cpp | 115 ++++++++++++++++ src/audio/fatfs_audio_input.cpp | 53 ++------ src/audio/include/audio_decoder.hpp | 9 -- src/audio/include/audio_element.hpp | 72 ++-------- src/audio/include/chunk.hpp | 57 ++++++++ src/audio/include/fatfs_audio_input.hpp | 26 +--- src/audio/include/stream_info.hpp | 44 ++++++ src/audio/include/stream_message.hpp | 11 ++ src/audio/stream_info.cpp | 90 ++++++++++++ src/cbor/CMakeLists.txt | 6 + src/cbor/cbor_decoder.cpp | 158 ++++++++++++++++++++++ src/cbor/cbor_encoder.cpp | 53 ++++++++ src/cbor/include/cbor_decoder.hpp | 47 +++++++ src/cbor/include/cbor_encoder.hpp | 30 ++++ src/codecs/include/types.hpp | 12 ++ src/drivers/CMakeLists.txt | 2 +- src/main/main.cpp | 1 - 21 files changed, 772 insertions(+), 208 deletions(-) rename src/{drivers => audio}/audio_playback.cpp (100%) create mode 100644 src/audio/chunk.cpp create mode 100644 src/audio/include/chunk.hpp create mode 100644 src/audio/include/stream_info.hpp create mode 100644 src/audio/include/stream_message.hpp create mode 100644 src/audio/stream_info.cpp create mode 100644 src/cbor/CMakeLists.txt create mode 100644 src/cbor/cbor_decoder.cpp create mode 100644 src/cbor/cbor_encoder.cpp create mode 100644 src/cbor/include/cbor_decoder.hpp create mode 100644 src/cbor/include/cbor_encoder.hpp create mode 100644 src/codecs/include/types.hpp diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt index d98484c6..89a2e54d 100644 --- a/src/audio/CMakeLists.txt +++ b/src/audio/CMakeLists.txt @@ -1,6 +1,7 @@ idf_component_register( - SRCS "audio_decoder.cpp" "fatfs_audio_input.cpp" "audio_task.cpp" + SRCS "audio_decoder.cpp" "audio_task.cpp" "fatfs_audio_input.cpp" "chunk.cpp" + "i2s_audio_output.cpp" "stream_info.cpp" INCLUDE_DIRS "include" - REQUIRES "codecs" "drivers") + REQUIRES "codecs" "drivers" "cbor") target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/audio/audio_decoder.cpp b/src/audio/audio_decoder.cpp index ff9f0d62..7ed67339 100644 --- a/src/audio/audio_decoder.cpp +++ b/src/audio/audio_decoder.cpp @@ -1,12 +1,13 @@ #include "audio_decoder.hpp" #include +#include +#include #include "esp_heap_caps.h" #include "include/audio_element.hpp" #include "include/fatfs_audio_input.hpp" namespace audio { -static const TickType_t kMaxWaitTicks = portMAX_DELAY; // TODO: could this be larger? depends on the codecs i guess static const std::size_t kWorkingBufferSize = kMaxFrameSize; @@ -18,22 +19,14 @@ static const TickType_t kMaxWaitTicks = portMAX_DELAY; free(working_buffer_); } - auto AudioDecoder::InputCommandQueue() -> QueueHandle_t { - return input_queue_; - } - - auto AudioDecoder::SetInputCommandQueue(QueueHandle_t queue) -> void { - input_queue_ = queue; - } - - auto AudioDecoder::SetOutputCommandQueue(QueueHandle_t queue) -> void { - output_queue_ = queue; - } - auto AudioDecoder::InputBuffer() -> StreamBufferHandle_t { return input_buffer_; } + auto AudioDecoder::OutputBuffer() -> StreamBufferHandle_t { + return output_buffer_; + } + auto AudioDecoder::SetInputBuffer(StreamBufferHandle_t buffer) -> void { input_buffer_ = buffer; } @@ -72,12 +65,23 @@ static const TickType_t kMaxWaitTicks = portMAX_DELAY; return OK; } - auto result = current_codec_->Process(data, length, working_buffer_, kWorkingBufferSize); - if (result.has_value()) { - xStreamBufferSend(&output_buffer_, working_buffer_, result.value(), kMaxWaitTicks); - } else { - // TODO: handle i guess - return ERROR; + while (true) { + auto result = current_codec_->Process(data, length, working_buffer_, kWorkingBufferSize); + + if (result.has_error()) { + // TODO: handle i guess + return ERROR; + } + ICodec::Result process_res = result.value(); + + if (process_res.flush_output) { + xStreamBufferSend(&output_buffer_, working_buffer_, process_res.output_written, kMaxWaitTicks); + } + + if (process_res.need_more_input) { + // TODO: wtf do we do about the leftover bytes? + return OK; + } } return OK; diff --git a/src/drivers/audio_playback.cpp b/src/audio/audio_playback.cpp similarity index 100% rename from src/drivers/audio_playback.cpp rename to src/audio/audio_playback.cpp diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index 86eb4f4a..ad0834e2 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -4,6 +4,7 @@ #include +#include "esp-idf/components/cbor/tinycbor/src/cbor.h" #include "esp_heap_caps.h" #include "freertos/portmacro.h" #include "freertos/queue.h" @@ -11,6 +12,7 @@ #include "audio_element.hpp" #include "include/audio_element.hpp" +#include "stream_message.hpp" namespace audio { @@ -22,71 +24,114 @@ void audio_task(void* args) { std::shared_ptr element = real_args->element; delete real_args; - QueueHandle_t commands = element->InputCommandQueue(); - StreamBufferHandle_t stream = element->InputBuffer(); + MessageBufferHandle_t *stream = element->InputBuffer(); - // TODO: think about overflow. - uint8_t current_sequence_number; - uint8_t* frame_buffer = + uint8_t* message_buffer = (uint8_t*)heap_caps_malloc(kFrameSize, MALLOC_CAP_SPIRAM); while (1) { - IAudioElement::Command command; - ProcessResult result; + BaseType_t rtos_res; + IAudioElement::ProcessResult result; - if (!xQueueReceive(commands, &command, kCommandWaitTicks)) { - result = element->ProcessIdle(); - if (result == IAudioElement::ERROR) { - break; - } - if (result == IAudioElement::OUTPUT_FULL) { - vTaskDelay(kIdleTaskDelay); - } - continue; - }; - if (command.type == IAudioElement::SEQUENCE_NUMBER) { - if (command.sequence_number > current_sequence_number) { - current_sequence_number = command.sequence_number; - } - continue; + size_t message_size = 0; + if (message_buffer != nullptr) { + // TODO: tune delay. + message_size = xMessageBufferReceive(stream, &message_buffer, kFrameSize, portMAX_DELAY); } - if (command.type == IAudioElement::READ) { - assert(command.read_size <= kFrameSize); - assert(stream != NULL); - xStreamBufferReceive(stream, &frame_buffer, command.read_size, 0); - - if (command.sequence_number == current_sequence_number) { - result = element->ProcessData(frame_buffer, command.read_size); - if (result == IAudioElement::ERROR) { - break; - } - if (result == IAudioElement::OUTPUT_FULL) { - // TODO: Do we care about this? could just park indefinitely. - } - } - + if (message_size == 0) { + element->ProcessIdle(); continue; } - if (command.type == IAudioElement::ELEMENT) { - assert(command.data != NULL); - if (command.sequence_number == current_sequence_number) { - result = element->ProcessElementCommand(command.data); - if (result == IAudioElement::ERROR) { - break; - } - if (result == IAudioElement::OUTPUT_FULL) { - // TODO: what does this mean lol - } - } else { - element->SkipElementCommand(command.data); + // We got a valid message. Check what kind it is so that we know how to + // process it. + CborParser parser; + CborValue value; + cbor_parser_init(message_buffer, message_size, &parser, &value); + + MessageType message_type; + if (!cbor_value_is_integer(&value) || !cbor_value_get_integer(&value, &message_type)) { + // We weren't able to parse the message type. This is really bad, so just + // abort. + break; // TODO. + } + + if (message_type == STREAM_INFO) { + errs = StreamInfo::Create(message_buffer, message_size).map(element->ProcessStreamInfo); + if (errs.has_error) { + // TODO; } + } else if (message_type == CHUNK_HEADER) { + } else { + // TODO. } - if (command.type == IAudioElement::QUIT) { - break; + cbor_value_ + if (!xQueueReceive(commands, &command, wait_time)) { + if (bytes_in_stream > 0) { + size_t read_length = std::min(kMaxFrameSize - leftover_data, bytes_in_stream); + xStreamBufferReceive(stream, &frame_buffer + leftover_data, read_length, 0); + + uint8_t *data_in = frame_buffer; + result = element->ProcessData(&data_in, read_length); + if (result == IAudioElement::ERROR) { + break; + } + + if (result == IAudioElement::LEFTOVER_DATA) { + leftover_data = frame_buffer + read_length - data_in; + memmove(frame_buffer, data_in, leftover_data); + } else { + leftover_data = 0; + } + } else { + result = element->ProcessIdle(); + if (result == IAudioElement::ERROR) { + break; + } + if (result == IAudioElement::OUTPUT_FULL) { + vTaskDelay(kIdleTaskDelay); + } + } + } else { + if (command.type == IAudioElement::SEQUENCE_NUMBER) { + if (command.sequence_number > current_sequence_number) { + current_sequence_number = command.sequence_number; + bytes_in_stream = 0; + } + } else if (command.type == IAudioElement::READ) { + assert(command.read_size <= kFrameSize); + assert(stream != NULL); + + if (command.sequence_number == current_sequence_number) { + bytes_in_stream += command.read_size; + } else { + // This data is for a different stream, so just discard it. + xStreamBufferReceive(stream, &frame_buffer, command.read_size, 0); + } + } else if (command.type == IAudioElement::ELEMENT) { + assert(command.data != NULL); + if (command.sequence_number == current_sequence_number) { + if (bytes_in_stream > 0) { + // We're not ready to handle this yet, so put it back. + xQueueSendToFront(commands, &command, kMaxWaitTicks); + } else { + result = element->ProcessElementCommand(command.data); + if (result == IAudioElement::ERROR) { + break; + } + if (result == IAudioElement::OUTPUT_FULL) { + // TODO: what does this mean lol + } + } + } else { + element->SkipElementCommand(command.data); + } + } else if (command.type == IAudioElement::QUIT) { + break; + } } } diff --git a/src/audio/chunk.cpp b/src/audio/chunk.cpp new file mode 100644 index 00000000..a8864930 --- /dev/null +++ b/src/audio/chunk.cpp @@ -0,0 +1,115 @@ +#include "chunk.hpp" + +#include "cbor_encoder.hpp" +#include "cbor_decoder.hpp" +#include +#include +#include "esp-idf/components/cbor/tinycbor/src/cbor.h" +#include "stream_message.hpp" + +namespace audio { + +/* + * The maximum size that we expect a header to take up. + */ +// TODO: tune this. +static const size_t kMaxHeaderSize = 64; + +auto WriteChunksToStream(MessageBufferHandle_t *stream, uint8_t *working_buffer, size_t working_buffer_length, std::function callback, TickType_t max_wait) -> EncodeWriteResult { + while (1) { + // First, ask the callback for some data to write. + size_t chunk_size = + callback( + working_buffer + kMaxHeaderSize, + working_buffer_length - kMaxHeaderSize); + + if (chunk_size == 0) { + // They had nothing for us, so bail out. + return CHUNK_OUT_OF_DATA; + } + + // Put together a header. + cbor::Encoder encoder(cbor::CONTAINER_ARRAY, 3, working_buffer, working_buffer_length); + encoder.WriteUnsigned(TYPE_CHUNK_HEADER); + // Note here that we need to write the offset of the chunk into the header. + // We could be smarter here and write the actual header size, allowing us to + // pack slightly more data into each message, but this is hard so I haven't + // done it. Please make my code better for me. + encoder.WriteUnsigned(kMaxHeaderSize); + encoder.WriteUnsigned(chunk_size); + if (encoder.Finish().has_error()) { + return CHUNK_ENCODING_ERROR; + }; + + // Try to write to the buffer. Note the return type here will be either 0 or + // kMaxHeaderSize + chunk_size, as MessageBuffer doesn't allow partial + // writes. + size_t actual_write_size = + xMessageBufferSend( + *stream, working_buffer, kMaxHeaderSize + chunk_size, max_wait); + + if (actual_write_size == 0) { + // We failed to write in time, so bail out. This is techinically data loss + // unless the caller wants to go and parse our working buffer, but we + // assume the caller has a good reason to time us out. + return CHUNK_WRITE_TIMEOUT; + } + } +} + +auto ReadChunksFromStream(MessageBufferHandle_t *stream, uint8_t *working_buffer, size_t working_buffer_length, std::function callback, TickType_t max_wait) -> EncodeReadResult { + // Spillover if the previous iteration did not consume all of the input. + size_t leftover_bytes = 0; + while (1) { + // First, wait for a message to arrive over the buffer. + size_t read_size = + xMessageBufferReceive( + *stream, working_buffer + leftover_bytes, working_buffer_length - leftover_bytes, max_wait); + + if (read_size == 0) { + return CHUNK_READ_TIMEOUT; + } + + auto decoder = cbor::MapDecoder::Create(working_buffer + leftover_bytes, read_size); + if (decoder.has_error()) { + // Weird; this implies someone is shoving invalid data into the buffer. + return CHUNK_DECODING_ERROR; + } + + MessageType type = decoder.value().ParseUnsigned().value_or(TYPE_UNKNOWN); + if (type != TYPE_CHUNK_HEADER) { + // This message wasn't for us, so put it in a consistent place and let the + // caller handle it. + memmove(working_buffer, working_buffer + leftover_bytes, read_size); + return CHUNK_STREAM_ENDED; + } + + // Work the size and position of the chunk (don't assume it's at + // kMaxHeaderSize offset for future-proofing). + header_length = decoder.ParseUnsigned().value_or(0); + chunk_length = decoder.ParseUnsigned().value_or(0); + if (decoder.Failed()) { + return CHUNK_DECODING_ERROR; + } + + // Now we need to stick the end of the last chunk (if it exists) onto the + // front of the new chunk. Do it this way around bc we assume the old chunk + // is shorter, and therefore faster to move. + uint8_t *combined_buffer = working_buffer + header_length - leftover_bytes; + size_t combined_buffer_size = leftover_bytes + chunk_length; + if (leftover_bytes > 0) { + memmove(combined_buffer, working_buffer, leftover_bytes); + } + + // Tell the callback about the new data. + size_t amount_processed = callback(combined_buffer, combined_buffer_size); + + // Prepare for the next iteration. + leftover_bytes = combined_buffer_size - amount_processed; + if (leftover_bytes > 0) { + memmove(working_buffer, combined_buffer + amount_processed, leftover_bytes); + } + } +} + +} // namespace audio diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index df38f1d6..0cf1abe5 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -16,59 +16,30 @@ static const TickType_t kMaxWaitTicks = portMAX_DELAY; // into memory as soon as possible. static constexpr std::size_t kOutputBufferSize = 1024 * 128; static constexpr std::size_t kQueueItemSize = sizeof(IAudioElement::Command); -// Use a large enough command queue size that we can fit reads for the full -// buffer into the queue. -static constexpr std::size_t kOutputQueueItemNumber = kOutputBufferSize / kMaxFrameSize; -static constexpr std::size_t kOutputQueueSize = kOutputQueueItemNumber * kQueueItemSize; - -// This should be a relatively responsive element, so no need for a particularly -// large queue. -static constexpr std::size_t kInputQueueItemNumber = 4; -static constexpr std::size_t kInputQueueSize = kInputQueueItemNumber * kQueueItemSize; FatfsAudioInput::FatfsAudioInput(std::shared_ptr storage) : IAudioElement(), storage_(storage) { working_buffer_ = heap_caps_malloc(kMaxFrameSize, MALLOC_CAP_SPIRAM); - input_queue_memory_ = heap_caps_malloc(kInputQueueSize, MALLOC_CAP_SPIRAM); - input_queue_ = xQueueCreateStatic( - kInputQueueItemNumber, kQueueItemSize, input_queue_memory_, &input_queue_metadata_); - - output_queue_memory_ = heap_caps_malloc(kOutputQueueSize, MALLOC_CAP_SPIRAM); - output_queue_ = - xQueueCreateStatic(kOutputQueueItems, kQueueItemSize, output_queue_memory_, - &output_queue_metadata_); - output_buffer_memory_ = - heap_caps_malloc(kOutputBufferSize, MALLOC_CAP_SPIRAM); + heap_caps_malloc(kOutputBufferSize + 1, MALLOC_CAP_SPIRAM); output_buffer_ = - xStreamBufferCreateStatic(kOutputBufferSize - 1, 1, output_buffer_memory_, + xMessageBufferCreateStatic(kOutputBufferSize, output_buffer_memory_, &output_buffer_metadata_); } FatfsAudioInput::~FatfsAudioInput() { free(working_buffer_); - vStreamBufferDelete(output_buffer_); + vMessageBufferDelete(output_buffer_); free(output_buffer_memory_); - vQueueDelete(output_queue_); - free(output_queue_memory_); - vQueueDelete(input_queue_); - free(input_queue_memory_); } -auto FatfsAudioInput::InputCommandQueue() -> QueueHandle_t { - return input_queue_; -} -auto FatfsAudioInput::OutputCommandQueue() -> QueueHandle_t { - return output_queue_; +auto FatfsAudioInput::InputBuffer() -> MessageBufferHandle_t { + return input_buffer_; } -auto FatfsAudioInput::InputBuffer() -> StreamBufferHandle_t { - return nullptr; -} - -auto FatfsAudioInput::OutputBuffer() -> StreamBufferHandle_t { +auto FatfsAudioInput::OutputBuffer() -> MessageBufferHandle_t { return output_buffer_; } @@ -94,20 +65,16 @@ auto FatfsAudioInput::ProcessElementCommand(void* command) -> ProcessResult { } is_file_open_ = true; - current_sequence_++; - - Command sequence_update; - sequence_update.type = SEQUENCE_NUMBER; - sequence_update.sequence_number = current_sequence_; if (real->interrupt) { + Command sequence_update; + sequence_update.type = SEQUENCE_NUMBER; + sequence_update.sequence_number = current_sequence_++; xQueueSendToFront(output_queue_, &sequence_update, kMaxWaitTicks); - } else { - xQueueSendToBack(output_queue_, &sequence_update, kMaxWaitTicks); } OutputCommand *data = new OutputCommand; - data->extension = "txt"; + data->extension = "mp3"; Command file_info; file_info.type = ELEMENT; file_info.sequence_number = current_sequence_; diff --git a/src/audio/include/audio_decoder.hpp b/src/audio/include/audio_decoder.hpp index 083bd564..2ee43fb7 100644 --- a/src/audio/include/audio_decoder.hpp +++ b/src/audio/include/audio_decoder.hpp @@ -12,13 +12,6 @@ class AudioDecoder : public IAudioElement { AudioDecoder(); ~AudioDecoder(); - auto Pause() -> void; - auto IsPaused() -> bool; - - auto Resume() -> void; - - auto SetInputCommandQueue(QueueHandle_t) -> void; - auto SetOutputCommandQueue(QueueHandle_t) -> void; auto SetInputBuffer(StreamBufferHandle_t) -> void; auto SetOutputBuffer(StreamBufferHandle_t) -> void; @@ -27,8 +20,6 @@ class AudioDecoder : public IAudioElement { uint8_t *working_buffer_; - QueueHandle_t input_queue_; - QueueHandle_t output_queue_; StreamBufferHandle_t input_buffer_; StreamBufferHandle_t output_buffer_; }; diff --git a/src/audio/include/audio_element.hpp b/src/audio/include/audio_element.hpp index 03fefd70..0be58f48 100644 --- a/src/audio/include/audio_element.hpp +++ b/src/audio/include/audio_element.hpp @@ -1,6 +1,10 @@ #pragma once +#include #include +#include "freertos/portmacro.h" +#include "types.hpp" +#include "result.hpp" namespace audio { @@ -10,71 +14,19 @@ class IAudioElement { public: virtual ~IAudioElement(); - enum CommandType { - /* - * Sets the sequence number of the most recent byte stream. Any commands - * received that have a lower sequence number than this will be discarded. - */ - SEQUENCE_NUMBER, - /* - * Instructs this element to read a specific number of bytes from its - * input buffer. - */ - READ_FRAME, - /* - * Represents an element-specific command. This handling of this is - * delegated to element implementations. - */ - ELEMENT, - /* Instructs this element to shut down. */ - QUIT, - }; - - struct Command { - CommandType type; - uint8_t sequence_number; - // TODO: tag data's type - union { - void* data; - std::size_t frame_size; - }; - }; + virtual auto IdleTimeout() -> TickType_t { return portMAX_DELAY; } - /* - * Returns a queue that should be used for all communication with this - * element. - */ - virtual auto InputCommandQueue() -> QueueHandle_t = 0; + virtual auto InputBuffer() -> MessageBufferHandle_t* = 0; - /* - * Returns a buffer that will be used to stream input bytes to this element. - * This may be NULL, if this element represents a source, e.g. a FATFS - * reader. - */ - virtual auto InputBuffer() -> StreamBufferHandle_t = 0; + virtual auto OutputBuffer() -> MessageBufferHandle_t* = 0; - enum ProcessResult { - OK, - OUTPUT_FULL, - ERROR, + enum StreamError { + BAD_FORMAT }; - /* - * Called when an element-specific command has been received. - */ - virtual auto ProcessElementCommand(void* command) -> ProcessResult = 0; - - virtual auto SkipElementCommand(void* command) -> void = 0; - - /* - * Called with the result of a read bytes command. - */ - virtual auto ProcessData(uint8_t* data, uint16_t length) -> ProcessResult = 0; - - /* - * Called periodically when there are no pending commands. - */ - virtual auto ProcessIdle() -> ProcessResult = 0; + virtual auto ProcessStreamInfo(StreamInfo &info) -> cpp::result = 0; + virtual auto ProcessChunk(uint8_t* data, std::size_t length) -> cpp::result = 0; + virtual auto ProcessIdle() -> cpp::result = 0; }; } // namespace audio diff --git a/src/audio/include/chunk.hpp b/src/audio/include/chunk.hpp new file mode 100644 index 00000000..1351ecfb --- /dev/null +++ b/src/audio/include/chunk.hpp @@ -0,0 +1,57 @@ +#pragma once + +#include +#include +#include +#include +#include "esp-idf/components/cbor/tinycbor/src/cbor.h" +#include "freertos/portmacro.h" +#include "result.hpp" + +namespace audio { + +enum ChunkWriteResult { + // Returned when the callback does not write any data. + CHUNK_OUT_OF_DATA, + // Returned when there is an error encoding a chunk header using cbor. + CHUNK_ENCODING_ERROR, + // Returned when max_wait expires without room in the stream buffer becoming + // available. + CHUNK_WRITE_TIMEOUT, +}; + +/* + * Invokes the given callback to receive data, breaks the received data up into + * chunks with headers, and writes those chunks to the given output stream. + * + * The callback will be invoked with a byte buffer and its size. The callback + * should write as much data as it can to this buffer, and then return the + * number of bytes it wrote. Return a value of 0 to indicate that there is no + * more input to read. + */ +auto WriteChunksToStream(MessageBufferHandle_t *stream, uint8_t *working_buffer, size_t working_buffer_length, std::function callback, TickType_t max_wait) -> EncodeWriteResult; + + enum ChunkReadResult { + // Returned an error in parsing the cbor-encoded header. + CHUNK_DECODING_ERROR, + // Returned when max_wait expired before any data was read. + CHUNK_READ_TIMEOUT, + // Returned when a non-chunk message is received. + CHUNK_STREAM_ENDED, + }; + +/* + * Reads chunks of data from the given input stream, and invokes the given + * callback to process each of them in turn. + * + * The callback will be invoked with a byte buffer and its size. The callback + * should process as much data as it can from this buffer, and then return the + * number of bytes it was able to read. Any leftover bytes will be added as a + * prefix to the next chunk. + * + * If this function encounters a message in the stream that is not a chunk, it + * will place the message at the start of the working_buffer and then return. + */ +auto ReadChunksFromStream(MessageBufferHandle_t *stream, uint8_t *working_buffer, size_t working_buffer_length, std::function callback, TickType_t max_wait) -> EncodeReadResult; + +} // namespace audio diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index bf5f150d..5651419d 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -15,42 +15,24 @@ namespace audio { class FatfsAudioInput : public IAudioElement { public: - struct InputCommand { - std::string filename; - size_t seek_to; - bool interrupt; - }; - - struct OutputCommand { - std::string extension; - }; - FatfsAudioInput(std::shared_ptr storage); ~FatfsAudioInput(); - auto OutputCommandQueue() -> QueueHandle_t; - auto OutputBuffer() -> StreamBufferHandle_t; + auto OutputBuffer() -> MessageBufferHandle_t; private: std::shared_ptr storage_; uint8_t *working_buffer_; - uint8_t current_sequence_ = 0; FIL current_file_; bool is_file_open_ = false; - uint8_t* input_queue_memory_; - StaticQueue_t input_queue_metadata_; - QueueHandle_t input_queue_; - - uint8_t* output_queue_memory_; - StaticQueue_t output_queue_metadata_; - QueueHandle_t output_queue_; + MessageBufferHandle_t input_buffer_; uint8_t* output_buffer_memory_; - StaticStreamBuffer_t output_buffer_metadata_; - StreamBufferHandle_t output_buffer_; + StaticMessageBuffer_t output_buffer_metadata_; + MessageBufferHandle_t output_buffer_; }; } // namespace audio diff --git a/src/audio/include/stream_info.hpp b/src/audio/include/stream_info.hpp new file mode 100644 index 00000000..2b1429ea --- /dev/null +++ b/src/audio/include/stream_info.hpp @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include +#include "esp-idf/components/cbor/tinycbor/src/cbor.h" +#include "result.hpp" + +namespace audio { + +class StreamInfo { + public: + enum ParseError { + WRONG_TYPE, + MISSING_MAP, + }; + + static auto Create(const uint8_t *buffer, size_t length) -> cpp::result; + StreamInfo(CborValue& map); + + StreamInfo() = default; + StreamInfo(const StreamInfo&) = default; + + ~StreamInfo() = default; + + auto Path() const -> const std::optional& { return path_; } + auto Channels() const -> const std::optional& { return channels_; } + auto BitsPerSample() const -> const std::optional& { return bits_per_sample_; } + auto SampleRate() const -> const std::optional& { return sample_rate_; } + + enum EncodeError { + OUT_OF_MEMORY, + }; + + auto WriteToStream(CborEncoder encoder) -> cpp::result; + private: + + std::optional path_; + std::optional channels_; + std::optional bits_per_sample_; + std::optional sample_rate_; +}; + +} // namespace audio diff --git a/src/audio/include/stream_message.hpp b/src/audio/include/stream_message.hpp new file mode 100644 index 00000000..f59aba8d --- /dev/null +++ b/src/audio/include/stream_message.hpp @@ -0,0 +1,11 @@ +#pragma once + +namespace audio { + + enum MessageType { + TYPE_UNKNOWN, + TYPE_CHUNK_HEADER, + TYPE_STREAM_INFO, + }; + +} // namespace audio diff --git a/src/audio/stream_info.cpp b/src/audio/stream_info.cpp new file mode 100644 index 00000000..bb9b1fa2 --- /dev/null +++ b/src/audio/stream_info.cpp @@ -0,0 +1,90 @@ +#include "stream_info.hpp" +#include "stream_message.hpp" +#include +#include "esp-idf/components/cbor/tinycbor/src/cbor.h" + +namespace audio { + + static const char* kKeyPath = "p"; + static const char* kKeyChannels = "c"; + static const char* kKeyBitsPerSample = "b"; + static const char* kKeySampleRate = "r"; + + static auto find_uint64(CborValue &map, char *key) -> cpp::optional { + CborValue val; + cbor_value_map_find_value(&map, key, &val); + if (cbor_value_is_unsigned_integer(&val)) { + uint64_t raw_val; + cbor_value_get_uint64(&val, &raw_val); + return raw_val; + } + return {}; + } + + + static auto write_uint64(CborEncoder &map, const char *key, const optional &val) -> cpp::result { + if (val) { + cbor_encode_byte_string(&map, key, 1); + cbor_encode_uint(&map, *val); + } + return {}; + } + +static auto StreamInfo::Create(const uint8_t *buffer, size_t length) -> cpp::result { + CborParser parser; + CborValue value; + + cbor_parser_init(buffer, len, 0, &parser, &value); + + uint8_t type = 0; + if (!cbor_value_is_integer(&value) + || !cbor_value_get_integer(&value, &type) + || type != STREAM_INFO) { + return cpp::fail(WRONG_TYPE); + } + + cbor_value_advance_fixed(&value); + + if (!cbor_value_is_map(&value)) { + return cpp::fail(MISSING_MAP); + } + + return StreamInfo(value); +} + +StreamInfo::StreamInfo(CborValue& map) { + // TODO: this method is n^2, which seems less than ideal. But you don't do it + // that frequently, so maybe it's okay? Needs investigation. + channels_ = find_uint64(map, kKeyChannels); + bits_per_sample_ = find_uint64(map, kKeyBitsPerSample); + sample_rate_ = find_uint64(map, kKeySampleRate); + + CborValue val; + cbor_value_map_find_value(&map, kKeyPath, &val); + if (cbor_value_is_text_string(&val)) { + size_t len; + char *str; + cbor_value_dup_text_string(&val, &str, &len, &val); + path_ = std::string(str, len); + free(str); + } +} + +auto StreamInfo::WriteToStream(CborEncoder encoder) -> cpp::result { + cbor_encode_int(&encoder, STREAM_INFO); + + CborEncoder map; + cbor_encoder_create_map(&encoder, &map, length); + + write_uint64(&map, kKeyChannels, channels_); + write_uint64(&map, kKeyBitsPerSample, bits_per_sample_); + write_uint64(&map, kKeySampleRate, sample_rate_); + + if (path_) { + cbor_encode_text_string(&map, path_->c_str(), path_->size()); + } + + cbor_encoder_close_container(&encoder, &map); +} + +} // namespace audio diff --git a/src/cbor/CMakeLists.txt b/src/cbor/CMakeLists.txt new file mode 100644 index 00000000..cd5186e0 --- /dev/null +++ b/src/cbor/CMakeLists.txt @@ -0,0 +1,6 @@ +idf_component_register( + SRCS "cbor_decoder.cpp" "cbor_encoder.cpp" + INCLUDE_DIRS "include" + REQUIRES "cbor" "result") + +target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/cbor/cbor_decoder.cpp b/src/cbor/cbor_decoder.cpp new file mode 100644 index 00000000..eb43e163 --- /dev/null +++ b/src/cbor/cbor_decoder.cpp @@ -0,0 +1,158 @@ +#include "cbor_decoder.hpp" +#include +#include "esp-idf/components/cbor/tinycbor/src/cbor.h" +#include "include/cbor_decoder.hpp" + +namespace cbor { + +static auto ArrayDecoder::Create(uint8_t *buffer, size_t buffer_len) -> cpp::result, CborError> { + auto decoder = std::make_unique(); + cbor_parser_init(buffer, buffer_len, &decoder->parser_, &decoder->root_); + if (!cbor_value_is_array(&decoder->root_)) { + return cpp::fail(CborErrorIllegalType); + } + CborError err = cbor_value_enter_container(&decoder->root_, &decoder->it_); + if (err != CborNoError) { + return cpp::fail(err); + } + return std::move(decoder); +} + + +auto ArrayDecoder::ParseString() -> cpp::result { + if (error_ != CborNoError) { + return cpp::fail(error_); + } + + if (!cbor_value_is_byte_string(&it_)) { + error_ = CborErrorIllegalType; + return cpp::fail(error_); + } + uint8_t *buf; size_t len; CborValue new_val; + error_ = cbor_value_dup_byte_string(&it_, &buf, &len, &new_val); + if (error_ != CborNoError) { + return cpp::fail(error_); + } + std::string ret(buf, len); + free(buf); + val_ = new_val; + return ret; +} + +auto ArrayDecoder::ParseUnsigned() -> cpp::result { + if (error_ != CborNoError) { + return cpp::fail(error_); + } + + if (!cbor_value_is_unsigned_integer(&it_)) { + error_ = CborErrorIllegalType; + return cpp::fail(error_); + } + uint64_t ret; + error_ = cbor_value_get_uint64(&it_, &ret); + if (error_ != CborNoError) { + return cpp::fail(error_); + } + error_ = cbor_value_advance(&it_); + if (error_ != CborNoError) { + return cpp::fail(error_); + } + return ret; +} + +auto ArrayDecoder::ParseSigned() -> cpp::result { + if (error_ != CborNoError) { + return cpp::fail(error_); + } + if (!cbor_value_is_unsigned_integer(&it_)) { + error_ = CborErrorIllegalType; + return cpp::fail(error_); + } + uint64_t ret; + error_ = cbor_value_get_uint64(&it_, &ret); + if (error_ != CborNoError) { + return cpp::fail(error_); + } + error_ = cbor_value_advance(&it_); + if (error_ != CborNoError) { + return cpp::fail(error_); + } + return ret; +} + +static auto MapDecoder::Create(uint8_t *buffer, size_t buffer_len) -> cpp::result, CborError> { + auto decoder = std::make_unique(); + cbor_parser_init(buffer, buffer_len, &decoder->parser_, &decoder->root_); + if (!cbor_value_is_map(&decoder->root_)) { + return cpp::fail(CborErrorIllegalType); + } + CborError err = cbor_value_enter_container(&decoder->root_, &decoder->it_); + if (err != CborNoError) { + return cpp::fail(err); + } + return std::move(decoder); +} + +auto MapDecoder::FindString(const std::string &key) -> std::optional { + CborValue val; + if (error_ != CborNoError) { + return {}; + } + if (cbor_value_map_find_value(&it_, key.c_str(), &val) != CborNoError) { + return {}; + } + if (!cbor_value_is_byte_string(&val)) { + error_ = CborErrorIllegalType; + return {}; + } + uint8_t *buf; size_t len; + error_ = cbor_value_dup_byte_string(&val, &buf, &len, NULL); + if (error_ != CborNoError) { + return cpp::fail(error_); + } + std::string ret(buf, len); + free(buf); + return ret; +} + +auto MapDecoder::FindUnsigned(const std::string &key) -> std::optional { + CborValue val; + if (error_ != CborNoError) { + return {}; + } + if (cbor_value_map_find_value(&it_, key.c_str(), &val) != CborNoError) { + return {}; + } + if (!cbor_value_is_unsigned_integer(&val)) { + error_ = CborErrorIllegalType; + return {}; + } + uint64_t ret; + error_ = cbor_value_get_uint64(&val, &ret); + if (error_ != CborNoError) { + return cpp::fail(error_); + } + return ret; +} + +auto MapDecoder::FindSigned(const std::string &key) -> std::optional { + CborValue val; + if (error_ != CborNoError) { + return {}; + } + if (cbor_value_map_find_value(&it_, key.c_str(), &val) != CborNoError) { + return {}; + } + if (!cbor_value_is_integer(&val)) { + error_ = CborErrorIllegalType; + return {}; + } + int32_t ret; + error_ = cbor_value_get_int(&val, &ret); + if (error_ != CborNoError) { + return cpp::fail(error_); + } + return ret; +} + +} // namespace cbor diff --git a/src/cbor/cbor_encoder.cpp b/src/cbor/cbor_encoder.cpp new file mode 100644 index 00000000..863597b4 --- /dev/null +++ b/src/cbor/cbor_encoder.cpp @@ -0,0 +1,53 @@ +#include "cbor_encoder.hpp" +#include +#include "esp-idf/components/cbor/tinycbor/src/cbor.h" + +namespace cbor { + + static const int kEncoderFlags = 0; + +Encoder::Encoder(ContainerType type, uint32_t container_len, uint8_t *buffer, size_t buffer_len) { + cbor_encoder_init(&root_encoder, buffer, buffer_len, kEncoderFlags); + switch (type) { + case CONTAINER_ARRAY: + error_ = cbor_encoder_create_array(&encoder, &container_encoder_, container_len); + break; + case CONTAINER_MAP: + error_ = cbor_encoder_create_map(&encoder, &container_encoder_, container_len); + break; + } +} + +auto Encoder::WriteString(const std::string &val) -> void { + if (error_ != CborNoError) { + return; + } + error_ = cbor_encode_byte_string(&container_encoder_, val.c_str(), val.size()); +} + +auto Encoder::WriteUnsigned(uint32_t val) -> void { + if (error_ != CborNoError) { + return; + } + error_ = cbor_encode_uint(&container_encoder_, val); +} + +auto Encoder::WriteSigned(int32_t val) -> void { + if (error_ != CborNoError) { + return; + } + error_ = cbor_encode_int(&container_encoder_, val); +} + +auto Encoder::Finish() -> cpp::result { + if (error_ != CborNoError) { + return cpp::fail(error_); + } + if (CborError final_error = cbor_encoder_close_container(&root_encoder, &container_encoder_) != CborNoError) { + return cpp::fail(final_error); + } + return cbor_encoder_get_buffer_size(&root_encoder); +} + +} // namespace cbor + diff --git a/src/cbor/include/cbor_decoder.hpp b/src/cbor/include/cbor_decoder.hpp new file mode 100644 index 00000000..249db9cc --- /dev/null +++ b/src/cbor/include/cbor_decoder.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include +namespace cbor { + + class ArrayDecoder { + public: + static auto Create(uint8_t *buffer, size_t buffer_len) -> cpp::result, CborError>; + + auto ParseString() -> cpp::result; + auto ParseUnsigned() -> cpp::result; + auto ParseSigned() -> cpp::result; + + auto Failed() -> CborError { return error_; } + + ArrayDecoder(const ArrayDecoder&) = delete; + ArrayDecoder& operator=(const ArrayDecoder&) = delete; + private: + CborParser parser_; + CborValue root_; + + CborValue it_; + CborError error_ = CborNoError; + }; + + class MapDecoder { + public: + static auto Create(uint8_t *buffer, size_t buffer_len) -> cpp::result, CborError>; + + auto FindString(const std::string &key) -> std::optional; + auto FindUnsigned(const std::string &key) -> std::optional; + auto FindSigned(const std::string &key) -> std::optional; + + auto Failed() -> CborError { return error_; } + + MapDecoder(const MapDecoder&) = delete; + MapDecoder& operator=(const MapDecoder&) = delete; + private: + CborParser parser_; + CborValue root_; + + CborValue it_; + CborError error_ = CborNoError; + }; + + +} // namespace cbor diff --git a/src/cbor/include/cbor_encoder.hpp b/src/cbor/include/cbor_encoder.hpp new file mode 100644 index 00000000..0edbbdff --- /dev/null +++ b/src/cbor/include/cbor_encoder.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include +#include "esp-idf/components/cbor/tinycbor/src/cbor.h" +namespace cbor { + + class Encoder { + public: + enum ContainerType { + CONTAINER_ARRAY, + CONTAINER_MAP + }; + Encoder(ContainerType type, uint32_t container_len, uint8_t *buffer, size_t buffer_len); + + auto WriteString(const std::string &val) -> void; + auto WriteUnsigned(uint32_t val) -> void; + auto WriteSigned(int32_t val) -> void; + + auto Finish() -> cpp::result; + + Encoder(const Encoder&) = delete; + Encoder& operator=(const Encoder&) = delete; + private: + CborEncoder root_encoder_; + CborEncoder container_encoder_; + + CborError error_ = CborNoError; + }; + +} // namespace cbor diff --git a/src/codecs/include/types.hpp b/src/codecs/include/types.hpp new file mode 100644 index 00000000..8525a136 --- /dev/null +++ b/src/codecs/include/types.hpp @@ -0,0 +1,12 @@ +#pragma once + +#include + +namespace codecs { + + enum StreamType { + STREAM_MP3, + }; + + auto GetStreamTypeFromFilename(std::string filename); +} diff --git a/src/drivers/CMakeLists.txt b/src/drivers/CMakeLists.txt index 90c1742a..fbecf1c8 100644 --- a/src/drivers/CMakeLists.txt +++ b/src/drivers/CMakeLists.txt @@ -1,6 +1,6 @@ idf_component_register( SRCS "dac.cpp" "gpio_expander.cpp" "battery.cpp" "storage.cpp" "i2c.cpp" - "audio_playback.cpp" "i2s_audio_output.cpp" "display.cpp" "display_init.cpp" "spi.cpp" + "spi.cpp" "display.cpp" "display_init.cpp" INCLUDE_DIRS "include" REQUIRES "esp_adc_cal" "fatfs" "result" "lvgl") target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/main/main.cpp b/src/main/main.cpp index 24d47e9a..62cb430e 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -30,7 +30,6 @@ #include "display_init.hpp" #include "gpio_expander.hpp" #include "i2c.hpp" -#include "i2s_audio_output.hpp" #include "spi.hpp" #include "storage.hpp"