From 7c6fd654f50e6665efa4226c6b927f9762734182 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Sat, 1 Apr 2023 13:22:21 +1100 Subject: [PATCH] New pipeline building, still needs proper control --- src/audio/CMakeLists.txt | 2 +- src/audio/audio_decoder.cpp | 97 +++++++++--------- src/audio/audio_element.cpp | 56 ---------- src/audio/audio_playback.cpp | 48 +++------ src/audio/audio_task.cpp | 129 ++++++++++++++---------- src/audio/fatfs_audio_input.cpp | 20 ++-- src/audio/i2s_audio_output.cpp | 23 ++--- src/audio/include/audio_decoder.hpp | 8 +- src/audio/include/audio_element.hpp | 8 +- src/audio/include/audio_playback.hpp | 10 +- src/audio/include/audio_sink.hpp | 22 ++++ src/audio/include/audio_task.hpp | 34 +++---- src/audio/include/fatfs_audio_input.hpp | 3 +- src/audio/include/i2s_audio_output.hpp | 11 +- src/audio/include/pipeline.hpp | 7 +- src/audio/include/stream_info.hpp | 69 +++++++++++-- src/audio/pipeline.cpp | 14 +-- src/codecs/codec.cpp | 2 +- src/codecs/include/mad.hpp | 4 +- src/codecs/mad.cpp | 7 +- src/drivers/dac.cpp | 6 +- src/drivers/include/dac.hpp | 2 +- src/main/main.cpp | 4 +- src/memory/include/himem.hpp | 2 +- 24 files changed, 296 insertions(+), 292 deletions(-) delete mode 100644 src/audio/audio_element.cpp create mode 100644 src/audio/include/audio_sink.hpp diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt index 6ad35f8f..34fcd8ee 100644 --- a/src/audio/CMakeLists.txt +++ b/src/audio/CMakeLists.txt @@ -1,7 +1,7 @@ idf_component_register( SRCS "audio_decoder.cpp" "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp" "stream_message.cpp" "i2s_audio_output.cpp" "stream_buffer.cpp" - "audio_playback.cpp" "stream_event.cpp" "audio_element.cpp" "pipeline.cpp" + "audio_playback.cpp" "stream_event.cpp" "pipeline.cpp" INCLUDE_DIRS "include" REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory") diff --git a/src/audio/audio_decoder.cpp b/src/audio/audio_decoder.cpp index f8614478..ada1f8f7 100644 --- a/src/audio/audio_decoder.cpp +++ b/src/audio/audio_decoder.cpp @@ -26,22 +26,22 @@ static const char* kTag = "DEC"; AudioDecoder::AudioDecoder() : IAudioElement(), - stream_info_({}), - has_samples_to_send_(false), - needs_more_input_(true) {} + current_codec_(), + current_input_format_(), + current_output_format_(), + has_samples_to_send_(false) {} AudioDecoder::~AudioDecoder() {} auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info) -> bool { - if (!std::holds_alternative(info.data)) { + if (!std::holds_alternative(info.format)) { return false; } - const auto& encoded = std::get(info.data); + const auto& encoded = std::get(info.format); // Reuse the existing codec if we can. This will help with gapless playback, // since we can potentially just continue to decode as we were before, // without any setup overhead. - // TODO: use audio type from stream if (current_codec_ != nullptr && current_codec_->CanHandleType(encoded.type)) { current_codec_->ResetForNewStream(); @@ -60,43 +60,47 @@ auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info) -> bool { return true; } -auto AudioDecoder::Process(std::vector* inputs, MutableStream* output) - -> void { +auto AudioDecoder::Process(const std::vector& inputs, + OutputStream* output) -> void { // We don't really expect multiple inputs, so just pick the first that // contains data. If none of them contain data, then we can still flush // pending samples. - auto input = - std::find_if(inputs->begin(), inputs->end(), - [](const Stream& s) { return s.data.size_bytes() > 0; }); - - if (input != inputs->end()) { - const StreamInfo* info = input->info; - if (!stream_info_ || *stream_info_ != *info) { - // The input stream has changed! Immediately throw everything away and - // start from scratch. - // TODO: special case gapless playback? needs thought. - stream_info_ = *info; - has_samples_to_send_ = false; - has_set_stream_info_ = false; - - ProcessStreamInfo(*info); - } + auto input = std::find_if( + inputs.begin(), inputs.end(), + [](const InputStream& s) { return s.data().size_bytes() > 0; }); + if (input == inputs.end()) { + input = inputs.begin(); + } + + const StreamInfo& info = input->info(); + if (!current_input_format_ || *current_input_format_ != info.format) { + // The input stream has changed! Immediately throw everything away and + // start from scratch. + current_input_format_ = info.format; + has_samples_to_send_ = false; - current_codec_->SetInput(input->data); + ProcessStreamInfo(info); } + current_codec_->SetInput(input->data()); + while (true) { if (has_samples_to_send_) { - if (!has_set_stream_info_) { - has_set_stream_info_ = true; + if (!current_output_format_) { auto format = current_codec_->GetOutputFormat(); - output->info->data.emplace( - format.bits_per_sample, format.sample_rate_hz, format.num_channels); + current_output_format_ = StreamInfo::Pcm{ + .channels = format.num_channels, + .bits_per_sample = format.bits_per_sample, + .sample_rate = format.sample_rate_hz, + }; + } + + if (!output->prepare(*current_output_format_)) { + break; } - auto write_res = current_codec_->WriteOutputSamples( - output->data.subspan(output->info->bytes_in_stream)); - output->info->bytes_in_stream += write_res.first; + auto write_res = current_codec_->WriteOutputSamples(output->data()); + output->add(write_res.first); has_samples_to_send_ = !write_res.second; if (has_samples_to_send_) { @@ -106,26 +110,23 @@ auto AudioDecoder::Process(std::vector* inputs, MutableStream* output) } } - if (input != inputs->end()) { - auto res = current_codec_->ProcessNextFrame(); - if (res.has_error()) { - // TODO(jacqueline): Handle errors. - return; - } - input->data = input->data.subspan(current_codec_->GetInputPosition()); + auto res = current_codec_->ProcessNextFrame(); + if (res.has_error()) { + // TODO(jacqueline): Handle errors. + return; + } - if (res.value()) { - // We're out of data in this buffer. Finish immediately; there's nothing - // to send. - break; - } else { - has_samples_to_send_ = true; - } - } else { - // No input; nothing to do. + if (res.value()) { + // We're out of useable data in this buffer. Finish immediately; there's + // nothing to send. + input->mark_incomplete(); break; + } else { + has_samples_to_send_ = true; } } + + input->consume(current_codec_->GetInputPosition()); } } // namespace audio diff --git a/src/audio/audio_element.cpp b/src/audio/audio_element.cpp deleted file mode 100644 index cef54631..00000000 --- a/src/audio/audio_element.cpp +++ /dev/null @@ -1,56 +0,0 @@ -#include "audio_element.hpp" -#include - -namespace audio { - -IAudioElement::IAudioElement() - : input_events_(xQueueCreate(kEventQueueSize, sizeof(void*))), - output_events_(nullptr), - buffered_output_() {} - -IAudioElement::~IAudioElement() { - // Ensure we don't leak any memory from events leftover in the queue. - while (uxQueueSpacesAvailable(input_events_) < kEventQueueSize) { - StreamEvent* event; - if (xQueueReceive(input_events_, &event, 0)) { - free(event); - } else { - break; - } - } - // Technically there's a race here if someone is still adding to the queue, - // but hopefully the whole pipeline is stopped if an element is being - // destroyed. - vQueueDelete(input_events_); -} - -auto IAudioElement::SendOrBufferEvent(std::unique_ptr event) - -> bool { - if (!buffered_output_.empty()) { - // To ensure we send data in order, don't try to send if we've already - // failed to send something. - buffered_output_.push_back(std::move(event)); - return false; - } - StreamEvent* raw_event = event.release(); - if (!xQueueSend(output_events_, &raw_event, 0)) { - event.reset(raw_event); - buffered_output_.push_back(std::move(event)); - return false; - } - return true; -} - -auto IAudioElement::FlushBufferedOutput() -> bool { - while (!buffered_output_.empty()) { - StreamEvent* raw_event = buffered_output_.front().release(); - buffered_output_.pop_front(); - if (!xQueueSend(output_events_, &raw_event, 0)) { - buffered_output_.emplace_front(raw_event); - return false; - } - } - return true; -} - -} // namespace audio diff --git a/src/audio/audio_playback.cpp b/src/audio/audio_playback.cpp index 613f629c..89139ec4 100644 --- a/src/audio/audio_playback.cpp +++ b/src/audio/audio_playback.cpp @@ -8,6 +8,7 @@ #include "freertos/portmacro.h" #include "audio_decoder.hpp" +#include "audio_element.hpp" #include "audio_task.hpp" #include "chunk.hpp" #include "fatfs_audio_input.hpp" @@ -23,55 +24,30 @@ namespace audio { auto AudioPlayback::create(drivers::GpioExpander* expander) -> cpp::result, Error> { - // Create everything - auto source = std::make_shared(); - auto codec = std::make_shared(); - auto sink_res = I2SAudioOutput::create(expander); if (sink_res.has_error()) { return cpp::fail(ERR_INIT_ELEMENT); } - auto sink = sink_res.value(); - - auto playback = std::make_unique(); - - Pipeline *pipeline = new Pipeline(sink.get()); - pipeline->AddInput(codec.get())->AddInput(source.get()); - - task::Start(pipeline); - - return playback; + return std::make_unique(std::move(sink_res.value())); } -AudioPlayback::AudioPlayback() { - // Create everything - auto source = std::make_shared(); - auto codec = std::make_shared(); +AudioPlayback::AudioPlayback(std::unique_ptr output) + : file_source_(), i2s_output_(std::move(output)) { + AudioDecoder* codec = new AudioDecoder(); + elements_.emplace_back(codec); - auto sink_res = I2SAudioOutput::create(expander); - if (sink_res.has_error()) { - return cpp::fail(ERR_INIT_ELEMENT); - } - auto sink = sink_res.value(); - - auto playback = std::make_unique(); + Pipeline* pipeline = new Pipeline(elements_.front().get()); + pipeline->AddInput(file_source_.get()); - Pipeline *pipeline = new Pipeline(sink.get()); - pipeline->AddInput(codec.get())->AddInput(source.get()); - - task::Start(pipeline); - - return playback; + task::StartPipeline(pipeline, i2s_output_.get()); + task::StartDrain(i2s_output_.get()); } -AudioPlayback::~AudioPlayback() { - pipeline_->Quit(); -} +AudioPlayback::~AudioPlayback() {} auto AudioPlayback::Play(const std::string& filename) -> void { // TODO: concurrency, yo! - file_source->OpenFile(filename); - pipeline_->Play(); + file_source_->OpenFile(filename); } auto AudioPlayback::LogStatus() -> void { diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index 542bada8..e6c7778c 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -8,7 +8,9 @@ #include #include +#include "audio_sink.hpp" #include "cbor.h" +#include "dac.hpp" #include "esp_err.h" #include "esp_heap_caps.h" #include "esp_log.h" @@ -33,22 +35,29 @@ namespace task { static const char* kTag = "task"; static const std::size_t kStackSize = 24 * 1024; +static const std::size_t kDrainStackSize = 1024; static const uint8_t kAudioCore = 0; -auto Start(Pipeline* pipeline) -> Handle* { - auto input_queue = xQueueCreate(8, 1); - +auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void { // Newly created task will free this. - AudioTaskArgs* args = new AudioTaskArgs{ - .pipeline = pipeline, - .input = input_queue, - }; + AudioTaskArgs* args = new AudioTaskArgs{.pipeline = pipeline, .sink = sink}; - ESP_LOGI(kTag, "starting audio task"); + ESP_LOGI(kTag, "starting audio pipeline task"); xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args, kTaskPriorityAudio, NULL, kAudioCore); +} - return new Handle(input_queue); +auto StartDrain(IAudioSink* sink) -> void { + auto command = new std::atomic(PLAY); + // Newly created task will free this. + AudioDrainArgs* drain_args = new AudioDrainArgs{ + .sink = sink, + .command = command, + }; + + ESP_LOGI(kTag, "starting audio drain task"); + xTaskCreatePinnedToCore(&AudioDrainMain, "drain", kDrainStackSize, drain_args, + kTaskPriorityAudio, NULL, kAudioCore); } void AudioTaskMain(void* args) { @@ -57,10 +66,11 @@ void AudioTaskMain(void* args) { { AudioTaskArgs* real_args = reinterpret_cast(args); std::unique_ptr pipeline(real_args->pipeline); - QueueHandle_t input; - StreamBufferHandle_t output; + IAudioSink* sink = real_args->sink; delete real_args; + std::optional output_format; + std::vector elements = pipeline->GetIterationOrder(); std::size_t max_inputs = (*std::max_element(elements.begin(), elements.end(), @@ -74,9 +84,7 @@ void AudioTaskMain(void* args) { std::vector> in_regions(max_inputs); MappableRegion out_region; std::for_each(in_regions.begin(), in_regions.end(), - [](const MappableRegion& region) { - assert(region.is_valid); - }); + [](const auto& region) { assert(region.is_valid); }); assert(out_region.is_valid); // Each element has exactly one output buffer. @@ -90,55 +98,68 @@ void AudioTaskMain(void* args) { bool playing = true; bool quit = false; while (!quit) { - // TODO: full event here? - Command cmd; - bool has_cmd = xQueueReceive(input, &cmd, 0); - if (has_cmd) { - switch (cmd) { - case PLAY: - playing = true; - break; - case PAUSE: - playing = false; - break; - case QUIT: - quit = true; - break; - } - } - if (quit) { - break; - } - if (playing) { for (int i = 0; i < elements.size(); i++) { - std::vector in_streams; - elements.at(i)->InStreams(&in_regions, &in_streams); - MutableStream out_stream = elements.at(i)->OutStream(&out_region); + std::vector raw_in_streams; + elements.at(i)->InStreams(&in_regions, &raw_in_streams); + RawStream raw_out_stream = elements.at(i)->OutStream(&out_region); // Crop the input and output streams to the ranges that are safe to // touch. For the input streams, this is the region that contains // data. For the output stream, this is the region that does *not* // already contain data. - std::vector cropped_in_streams; - std::for_each(in_streams.begin(), in_streams.end(), - [&](MutableStream& s) { - cropped_in_streams.emplace_back( - s.info, s.data.first(s.info->bytes_in_stream)); - }); - - elements.at(i)->OutputElement()->Process(&cropped_in_streams, - &out_stream); - - for (int stream = 0; stream < in_streams.size(); stream++) { - MutableStream& orig_stream = in_streams.at(stream); - Stream& cropped_stream = cropped_in_streams.at(stream); - std::move(cropped_stream.data.begin(), cropped_stream.data.end(), - orig_stream.data.begin()); - orig_stream.info->bytes_in_stream = - cropped_stream.data.size_bytes(); + std::vector in_streams; + std::for_each(raw_in_streams.begin(), raw_in_streams.end(), + [&](RawStream& s) { in_streams.emplace_back(&s); }); + OutputStream out_stream(&raw_out_stream); + + elements.at(i)->OutputElement()->Process(in_streams, &out_stream); + } + + RawStream raw_sink_stream = elements.back()->OutStream(&out_region); + InputStream sink_stream(&raw_sink_stream); + + if (!output_format || output_format != sink_stream.info().format) { + // The format of the stream within the sink stream has changed. We + // need to reconfigure the sink, but shouldn't do so until we've fully + // drained the current buffer. + if (xStreamBufferIsEmpty(sink->buffer())) { + output_format = sink_stream.info().format; + sink->Configure(*output_format); } } + + // We've reconfigured the sink, or it was already configured correctly. + // Send through some data. + if (output_format == sink_stream.info().format) { + // TODO: tune the delay on this, as it's currently the only way to + // throttle this task's CPU time. Maybe also hold off on the pipeline + // if the buffer is already close to full? + std::size_t sent = xStreamBufferSend( + sink->buffer(), sink_stream.data().data(), + sink_stream.data().size_bytes(), pdMS_TO_TICKS(10)); + sink_stream.consume(sent); + } + } + } + } + vTaskDelete(NULL); +} + +void AudioDrainMain(void* args) { + { + AudioDrainArgs* real_args = reinterpret_cast(args); + IAudioSink* sink = real_args->sink; + std::atomic* command = real_args->command; + delete real_args; + + // TODO(jacqueline): implement PAUSE without busy-waiting. + while (*command != QUIT) { + std::byte buf[64]; + std::size_t len = + xStreamBufferReceive(sink->buffer(), buf, sizeof(buf), portMAX_DELAY); + if (len > 0) { + sink->Send({buf, len}); } } } diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index bd8748eb..b9882711 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -44,26 +44,30 @@ auto FatfsAudioInput::OpenFile(const std::string& path) -> void { is_file_open_ = true; } -auto FatfsAudioInput::Process(std::vector* inputs, - MutableStream* output) -> void { +auto FatfsAudioInput::Process(const std::vector& inputs, + OutputStream* output) -> void { if (!is_file_open_) { return; } + StreamInfo::Format format = StreamInfo::Encoded{codecs::STREAM_MP3}; + if (!output->prepare(format)) { + return; + } + + std::size_t max_size = output->data().size_bytes(); + std::size_t size = 0; FRESULT result = - f_read(¤t_file_, output->data.data(), output->data.size_bytes(), - &output->info->bytes_in_stream); + f_read(¤t_file_, output->data().data(), max_size, &size); if (result != FR_OK) { ESP_LOGE(kTag, "file I/O error %d", result); // TODO(jacqueline): Handle errors. return; } - // TODO: read from filename? - output->info->data = StreamInfo::Encoded{codecs::STREAM_MP3}; + output->add(size); - if (output->info->bytes_in_stream < output->data.size_bytes() || - f_eof(¤t_file_)) { + if (size < max_size || f_eof(¤t_file_)) { f_close(¤t_file_); is_file_open_ = false; } diff --git a/src/audio/i2s_audio_output.cpp b/src/audio/i2s_audio_output.cpp index 7766ebed..55d45001 100644 --- a/src/audio/i2s_audio_output.cpp +++ b/src/audio/i2s_audio_output.cpp @@ -1,6 +1,7 @@ #include "i2s_audio_output.hpp" #include +#include #include #include "esp_err.h" @@ -18,7 +19,7 @@ static const char* kTag = "I2SOUT"; namespace audio { auto I2SAudioOutput::create(drivers::GpioExpander* expander) - -> cpp::result, Error> { + -> cpp::result, Error> { // First, we need to perform initial configuration of the DAC chip. auto dac_result = drivers::AudioDac::create(expander); if (dac_result.has_error()) { @@ -32,7 +33,7 @@ auto I2SAudioOutput::create(drivers::GpioExpander* expander) // dac->WriteVolume(255); dac->WriteVolume(120); // for testing - return std::make_shared(expander, std::move(dac)); + return std::make_unique(expander, std::move(dac)); } I2SAudioOutput::I2SAudioOutput(drivers::GpioExpander* expander, @@ -41,18 +42,18 @@ I2SAudioOutput::I2SAudioOutput(drivers::GpioExpander* expander, I2SAudioOutput::~I2SAudioOutput() {} -auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> bool { - if (!std::holds_alternative(info.data)) { +auto I2SAudioOutput::Configure(const StreamInfo::Format& format) -> bool { + if (!std::holds_alternative(format)) { return false; } - StreamInfo::Pcm pcm = std::get(info.data); + StreamInfo::Pcm pcm = std::get(format); if (current_config_ && pcm == *current_config_) { return true; } - ESP_LOGI(kTag, "incoming audio stream: %u bpp @ %u Hz", pcm.bits_per_sample, + ESP_LOGI(kTag, "incoming audio stream: %u bpp @ %lu Hz", pcm.bits_per_sample, pcm.sample_rate); drivers::AudioDac::BitsPerSample bps; @@ -92,14 +93,8 @@ auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> bool { return true; } -auto I2SAudioOutput::Process(std::vector* inputs, MutableStream* output) - -> void { - std::for_each(inputs->begin(), inputs->end(), [&](Stream& s) { - if (ProcessStreamInfo(s.info)) { - std::size_t bytes_written = dac_->WriteData(s.data); - s.data = s.data.subspan(bytes_written); - } - }); +auto I2SAudioOutput::Send(const cpp::span& data) -> void { + dac_->WriteData(data); } auto I2SAudioOutput::SetVolume(uint8_t volume) -> void { diff --git a/src/audio/include/audio_decoder.hpp b/src/audio/include/audio_decoder.hpp index be8daf99..6a1b5177 100644 --- a/src/audio/include/audio_decoder.hpp +++ b/src/audio/include/audio_decoder.hpp @@ -24,7 +24,7 @@ class AudioDecoder : public IAudioElement { AudioDecoder(); ~AudioDecoder(); - auto Process(std::vector* inputs, MutableStream* output) + auto Process(const std::vector& inputs, OutputStream* output) -> void override; AudioDecoder(const AudioDecoder&) = delete; @@ -32,11 +32,9 @@ class AudioDecoder : public IAudioElement { private: std::unique_ptr current_codec_; - std::optional stream_info_; - - bool has_set_stream_info_; + std::optional current_input_format_; + std::optional current_output_format_; bool has_samples_to_send_; - bool needs_more_input_; auto ProcessStreamInfo(const StreamInfo& info) -> bool; }; diff --git a/src/audio/include/audio_element.hpp b/src/audio/include/audio_element.hpp index c9192e4a..5884f7b2 100644 --- a/src/audio/include/audio_element.hpp +++ b/src/audio/include/audio_element.hpp @@ -37,11 +37,11 @@ static const size_t kEventQueueSize = 8; */ class IAudioElement { public: - IAudioElement(); - virtual ~IAudioElement(); + IAudioElement() {} + virtual ~IAudioElement() {} - virtual auto Process(std::vector* inputs, MutableStream* output) - -> void = 0; + virtual auto Process(const std::vector& inputs, + OutputStream* output) -> void = 0; }; } // namespace audio diff --git a/src/audio/include/audio_playback.hpp b/src/audio/include/audio_playback.hpp index 507e6f73..88dc29aa 100644 --- a/src/audio/include/audio_playback.hpp +++ b/src/audio/include/audio_playback.hpp @@ -8,6 +8,7 @@ #include "audio_task.hpp" #include "esp_err.h" #include "fatfs_audio_input.hpp" +#include "i2s_audio_output.hpp" #include "result.hpp" #include "span.hpp" @@ -28,7 +29,7 @@ class AudioPlayback { static auto create(drivers::GpioExpander* expander) -> cpp::result, Error>; - AudioPlayback(FatfsAudioInput *file_input); + explicit AudioPlayback(std::unique_ptr output); ~AudioPlayback(); /* @@ -44,10 +45,9 @@ class AudioPlayback { AudioPlayback& operator=(const AudioPlayback&) = delete; private: - FatfsAudioInput *file_source; - - std::vector> all_elements_; - std::unique_ptr pipeline_; + std::unique_ptr file_source_; + std::unique_ptr i2s_output_; + std::vector> elements_; }; } // namespace audio diff --git a/src/audio/include/audio_sink.hpp b/src/audio/include/audio_sink.hpp new file mode 100644 index 00000000..ed7eb02b --- /dev/null +++ b/src/audio/include/audio_sink.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "audio_element.hpp" +#include "stream_info.hpp" +namespace audio { + +class IAudioSink { + private: + static const std::size_t kDrainBufferSize = 8 * 1024; + StreamBufferHandle_t buffer_; + + public: + IAudioSink() : buffer_(xStreamBufferCreate(kDrainBufferSize, 1)) {} + virtual ~IAudioSink() { vStreamBufferDelete(buffer_); } + + virtual auto Configure(const StreamInfo::Format& format) -> bool = 0; + virtual auto Send(const cpp::span& data) -> void = 0; + + auto buffer() const -> StreamBufferHandle_t { return buffer_; } +}; + +} // namespace audio diff --git a/src/audio/include/audio_task.hpp b/src/audio/include/audio_task.hpp index 8db99850..a7b7a0fa 100644 --- a/src/audio/include/audio_task.hpp +++ b/src/audio/include/audio_task.hpp @@ -5,38 +5,32 @@ #include #include "audio_element.hpp" +#include "audio_sink.hpp" +#include "dac.hpp" #include "freertos/portmacro.h" #include "pipeline.hpp" +#include "stream_buffer.hpp" namespace audio { namespace task { + +enum Command { PLAY, PAUSE, QUIT }; + struct AudioTaskArgs { Pipeline* pipeline; - QueueHandle_t input; + IAudioSink* sink; +}; +struct AudioDrainArgs { + IAudioSink* sink; + std::atomic* command; }; extern "C" void AudioTaskMain(void* args); +extern "C" void AudioDrainMain(void* args); -enum Command { PLAY, PAUSE, QUIT }; - -class Handle { - public: - explicit Handle(QueueHandle_t input); - ~Handle(); - - auto SetStreamInfo() -> void; - auto Play() -> void; - auto Pause() -> void; - auto Quit() -> void; - - auto OutputBuffer() -> StreamBufferHandle_t; - - private: - QueueHandle_t input; -}; - -auto Start(Pipeline* pipeline) -> Handle*; +auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void; +auto StartDrain(IAudioSink* sink) -> void; } // namespace task diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index b3a6d843..24f62e3c 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -16,6 +16,7 @@ #include "audio_element.hpp" #include "stream_buffer.hpp" +#include "stream_info.hpp" namespace audio { @@ -26,7 +27,7 @@ class FatfsAudioInput : public IAudioElement { auto OpenFile(const std::string& path) -> void; - auto Process(std::vector* inputs, MutableStream* output) + auto Process(const std::vector& inputs, OutputStream* output) -> void override; FatfsAudioInput(const FatfsAudioInput&) = delete; diff --git a/src/audio/include/i2s_audio_output.hpp b/src/audio/include/i2s_audio_output.hpp index 57881b35..77019228 100644 --- a/src/audio/include/i2s_audio_output.hpp +++ b/src/audio/include/i2s_audio_output.hpp @@ -5,6 +5,7 @@ #include #include "audio_element.hpp" +#include "audio_sink.hpp" #include "chunk.hpp" #include "result.hpp" @@ -14,18 +15,18 @@ namespace audio { -class I2SAudioOutput : public IAudioElement { +class I2SAudioOutput : public IAudioSink { public: enum Error { DAC_CONFIG, I2S_CONFIG, STREAM_INIT }; static auto create(drivers::GpioExpander* expander) - -> cpp::result, Error>; + -> cpp::result, Error>; I2SAudioOutput(drivers::GpioExpander* expander, std::unique_ptr dac); ~I2SAudioOutput(); - auto Process(std::vector* inputs, MutableStream* output) - -> void override; + auto Configure(const StreamInfo::Format& format) -> bool override; + auto Send(const cpp::span& data) -> void override; I2SAudioOutput(const I2SAudioOutput&) = delete; I2SAudioOutput& operator=(const I2SAudioOutput&) = delete; @@ -37,8 +38,6 @@ class I2SAudioOutput : public IAudioElement { std::unique_ptr dac_; std::optional current_config_; - - auto ProcessStreamInfo(const StreamInfo& info) -> bool; }; } // namespace audio diff --git a/src/audio/include/pipeline.hpp b/src/audio/include/pipeline.hpp index 42f70828..2e9247bc 100644 --- a/src/audio/include/pipeline.hpp +++ b/src/audio/include/pipeline.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "freertos/portmacro.h" @@ -16,7 +17,7 @@ static const std::size_t kPipelineBufferSize = 32 * 1024; class Pipeline { public: - Pipeline(IAudioElement* output); + explicit Pipeline(IAudioElement* output); ~Pipeline(); auto AddInput(IAudioElement* input) -> Pipeline*; @@ -25,9 +26,9 @@ class Pipeline { auto NumInputs() const -> std::size_t; auto InStreams(std::vector>*, - std::vector*) -> void; + std::vector*) -> void; - auto OutStream(MappableRegion*) -> MutableStream; + auto OutStream(MappableRegion*) -> RawStream; auto GetIterationOrder() -> std::vector; diff --git a/src/audio/include/stream_info.hpp b/src/audio/include/stream_info.hpp index 47f65649..5622517f 100644 --- a/src/audio/include/stream_info.hpp +++ b/src/audio/include/stream_info.hpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include "result.hpp" @@ -35,37 +37,82 @@ struct StreamInfo { // Number of bits per sample. uint8_t bits_per_sample; // The sample rate. - uint16_t sample_rate; + uint32_t sample_rate; bool operator==(const Pcm&) const = default; }; - std::variant data; + typedef std::variant Format; + Format format; bool operator==(const StreamInfo&) const = default; }; -class MutableStream { +class RawStream { public: StreamInfo* info; cpp::span data; + bool is_incomplete; - MutableStream(StreamInfo* i, cpp::span d) - : info(i), data(d) {} + RawStream(StreamInfo* i, cpp::span d) + : info(i), data(d), is_incomplete(false) {} }; /* * A byte buffer + associated metadata, which is not allowed to modify any of * the underlying data. */ -class Stream { +class InputStream { public: - explicit Stream(const MutableStream& s) : info(*s.info), data(s.data) {} + explicit InputStream(RawStream* s) : raw_(s) {} - const StreamInfo& info; - // `data` itself left mutable for signalling how much of the stream was - // consumed - cpp::span data; + void consume(std::size_t bytes) const { + auto new_data = raw_->data.subspan(bytes); + std::move(new_data.begin(), new_data.end(), raw_->data.begin()); + raw_->info->bytes_in_stream = new_data.size_bytes(); + } + + void mark_incomplete() const { raw_->is_incomplete = true; } + + const StreamInfo& info() const { return *raw_->info; } + + cpp::span data() const { + return raw_->data.first(raw_->info->bytes_in_stream); + } + + private: + RawStream* raw_; +}; + +class OutputStream { + public: + explicit OutputStream(RawStream* s) : raw_(s) {} + + void add(std::size_t bytes) const { raw_->info->bytes_in_stream += bytes; } + + bool prepare(const StreamInfo::Format& new_format) { + if (new_format == raw_->info->format) { + raw_->info->format = new_format; + return true; + } + if (raw_->is_incomplete) { + raw_->info->format = new_format; + raw_->info->bytes_in_stream = 0; + return true; + } + return false; + } + + const StreamInfo& info() const { return *raw_->info; } + + cpp::span data() const { + return raw_->data.subspan(raw_->info->bytes_in_stream); + } + + bool is_incomplete() const { return raw_->is_incomplete; } + + private: + RawStream* raw_; }; } // namespace audio diff --git a/src/audio/pipeline.cpp b/src/audio/pipeline.cpp index f42e6853..8af8f215 100644 --- a/src/audio/pipeline.cpp +++ b/src/audio/pipeline.cpp @@ -1,4 +1,5 @@ #include "pipeline.hpp" +#include #include "stream_info.hpp" namespace audio { @@ -7,7 +8,7 @@ Pipeline::Pipeline(IAudioElement* output) : root_(output), subtrees_() {} Pipeline::~Pipeline() {} auto Pipeline::AddInput(IAudioElement* input) -> Pipeline* { - subtrees_.emplace_back(input); + subtrees_.push_back(std::make_unique(input)); return subtrees_.back().get(); } @@ -21,15 +22,15 @@ auto Pipeline::NumInputs() const -> std::size_t { auto Pipeline::InStreams( std::vector>* regions, - std::vector* out) -> void { + std::vector* out) -> void { for (int i = 0; i < subtrees_.size(); i++) { - MutableStream s = subtrees_[i]->OutStream(®ions->at(i)); + RawStream s = subtrees_[i]->OutStream(®ions->at(i)); out->push_back(s); } } auto Pipeline::OutStream(MappableRegion* region) - -> MutableStream { + -> RawStream { return {&output_info_, region->Map(output_buffer_)}; } @@ -42,8 +43,9 @@ auto Pipeline::GetIterationOrder() -> std::vector { to_search.pop_back(); found.push_back(current); - to_search.insert(to_search.end(), current->subtrees_.begin(), - current->subtrees_.end()); + for (const auto& i : current->subtrees_) { + to_search.push_back(i.get()); + } } return found; diff --git a/src/codecs/codec.cpp b/src/codecs/codec.cpp index 4e9a6a47..bdc8f51e 100644 --- a/src/codecs/codec.cpp +++ b/src/codecs/codec.cpp @@ -5,7 +5,7 @@ namespace codecs { -auto CreateCodecForFile(const std::string& file) +auto CreateCodecForType(StreamType type) -> cpp::result, CreateCodecError> { return std::make_unique(); // TODO. } diff --git a/src/codecs/include/mad.hpp b/src/codecs/include/mad.hpp index 1f5791b9..074784fb 100644 --- a/src/codecs/include/mad.hpp +++ b/src/codecs/include/mad.hpp @@ -17,10 +17,10 @@ class MadMp3Decoder : public ICodec { MadMp3Decoder(); ~MadMp3Decoder(); - auto CanHandleFile(const std::string& path) -> bool override; + auto CanHandleType(StreamType type) -> bool override; auto GetOutputFormat() -> OutputFormat override; auto ResetForNewStream() -> void override; - auto SetInput(cpp::span input) -> void override; + auto SetInput(cpp::span input) -> void override; auto GetInputPosition() -> std::size_t override; auto ProcessNextFrame() -> cpp::result override; auto WriteOutputSamples(cpp::span output) diff --git a/src/codecs/mad.cpp b/src/codecs/mad.cpp index 1112bd62..eec2d633 100644 --- a/src/codecs/mad.cpp +++ b/src/codecs/mad.cpp @@ -5,6 +5,7 @@ #include "mad.h" #include "codec.hpp" +#include "types.hpp" namespace codecs { @@ -35,8 +36,8 @@ MadMp3Decoder::~MadMp3Decoder() { mad_header_finish(&header_); } -auto MadMp3Decoder::CanHandleFile(const std::string& path) -> bool { - return true; // TODO. +auto MadMp3Decoder::CanHandleType(StreamType type) -> bool { + return type == STREAM_MP3; } auto MadMp3Decoder::GetOutputFormat() -> OutputFormat { @@ -52,7 +53,7 @@ auto MadMp3Decoder::ResetForNewStream() -> void { has_decoded_header_ = false; } -auto MadMp3Decoder::SetInput(cpp::span input) -> void { +auto MadMp3Decoder::SetInput(cpp::span input) -> void { mad_stream_buffer(&stream_, reinterpret_cast(input.data()), input.size()); diff --git a/src/drivers/dac.cpp b/src/drivers/dac.cpp index 4d3aca1d..1f3ba557 100644 --- a/src/drivers/dac.cpp +++ b/src/drivers/dac.cpp @@ -192,15 +192,13 @@ auto AudioDac::Reconfigure(BitsPerSample bps, SampleRate rate) -> void { WriteRegister(Register::POWER_MODE, 0); } -auto AudioDac::WriteData(const cpp::span& data) - -> std::size_t { +auto AudioDac::WriteData(const cpp::span& data) -> void { std::size_t bytes_written = 0; esp_err_t err = i2s_channel_write(i2s_handle_, data.data(), data.size_bytes(), - &bytes_written, 0); + &bytes_written, portMAX_DELAY); if (err != ESP_ERR_TIMEOUT) { ESP_ERROR_CHECK(err); } - return bytes_written; } auto AudioDac::Stop() -> void { diff --git a/src/drivers/include/dac.hpp b/src/drivers/include/dac.hpp index 028d46cb..4a1b2a5b 100644 --- a/src/drivers/include/dac.hpp +++ b/src/drivers/include/dac.hpp @@ -71,7 +71,7 @@ class AudioDac { // TODO(jacqueline): worth supporting channels here as well? auto Reconfigure(BitsPerSample bps, SampleRate rate) -> void; - auto WriteData(const cpp::span& data) -> std::size_t; + auto WriteData(const cpp::span& data) -> void; auto Stop() -> void; auto LogStatus() -> void; diff --git a/src/main/main.cpp b/src/main/main.cpp index a30dc675..98a571b6 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -133,10 +133,10 @@ extern "C" void app_main(void) { (void*)lvglArgs, 1, sLvglStack, &sLvglTaskBuffer, 1); - std::shared_ptr playback; + std::unique_ptr playback; if (storage) { ESP_LOGI(TAG, "Init audio pipeline"); - auto playback_res = audio::AudioPlayback::create(expander, storage); + auto playback_res = audio::AudioPlayback::create(expander); if (playback_res.has_error()) { ESP_LOGE(TAG, "Failed! Playback will not work."); } else { diff --git a/src/memory/include/himem.hpp b/src/memory/include/himem.hpp index c65091d7..f71e912f 100644 --- a/src/memory/include/himem.hpp +++ b/src/memory/include/himem.hpp @@ -63,7 +63,7 @@ class MappableRegion { return {bytes_, size}; } - auto Map(const HimemAlloc &alloc) -> cpp::span { + auto Map(const HimemAlloc& alloc) -> cpp::span { if (bytes_ != nullptr) { ESP_ERROR_CHECK(esp_himem_unmap(range_handle, bytes_, size)); }