diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt index d4c89214..6ad35f8f 100644 --- a/src/audio/CMakeLists.txt +++ b/src/audio/CMakeLists.txt @@ -1,7 +1,7 @@ idf_component_register( SRCS "audio_decoder.cpp" "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp" "stream_message.cpp" "i2s_audio_output.cpp" "stream_buffer.cpp" - "audio_playback.cpp" "stream_event.cpp" "audio_element.cpp" + "audio_playback.cpp" "stream_event.cpp" "audio_element.cpp" "pipeline.cpp" INCLUDE_DIRS "include" REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory") diff --git a/src/audio/audio_decoder.cpp b/src/audio/audio_decoder.cpp index 9879b042..f8614478 100644 --- a/src/audio/audio_decoder.cpp +++ b/src/audio/audio_decoder.cpp @@ -2,13 +2,16 @@ #include +#include #include #include #include +#include "cbor/tinycbor/src/cborinternal_p.h" #include "freertos/FreeRTOS.h" #include "esp_heap_caps.h" +#include "esp_log.h" #include "freertos/message_buffer.h" #include "freertos/portmacro.h" @@ -21,126 +24,106 @@ namespace audio { static const char* kTag = "DEC"; -static const std::size_t kChunkSize = 1024; -static const std::size_t kReadahead = 8; - AudioDecoder::AudioDecoder() : IAudioElement(), - arena_(kChunkSize, kReadahead, MALLOC_CAP_SPIRAM), stream_info_({}), has_samples_to_send_(false), needs_more_input_(true) {} AudioDecoder::~AudioDecoder() {} -auto AudioDecoder::HasUnprocessedInput() -> bool { - return !needs_more_input_ || has_samples_to_send_; -} - -auto AudioDecoder::IsOverBuffered() -> bool { - return arena_.BlocksFree() == 0; -} - -auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info) -> void { - stream_info_ = info; - - if (info.chunk_size) { - chunk_reader_.emplace(info.chunk_size.value()); - } else { - ESP_LOGE(kTag, "no chunk size given"); - return; +auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info) -> bool { + if (!std::holds_alternative(info.data)) { + return false; } + const auto& encoded = std::get(info.data); // Reuse the existing codec if we can. This will help with gapless playback, // since we can potentially just continue to decode as we were before, // without any setup overhead. + // TODO: use audio type from stream if (current_codec_ != nullptr && - current_codec_->CanHandleFile(info.path.value_or(""))) { + current_codec_->CanHandleType(encoded.type)) { current_codec_->ResetForNewStream(); - return; + return true; } - auto result = codecs::CreateCodecForFile(info.path.value_or("")); + // TODO: use audio type from stream + auto result = codecs::CreateCodecForType(encoded.type); if (result.has_value()) { current_codec_ = std::move(result.value()); } else { ESP_LOGE(kTag, "no codec for this file"); - return; - } - - stream_info_ = info; - has_sent_stream_info_ = false; -} - -auto AudioDecoder::ProcessChunk(const cpp::span& chunk) -> void { - if (current_codec_ == nullptr || !chunk_reader_) { - // Should never happen, but fail explicitly anyway. - ESP_LOGW(kTag, "received chunk without chunk size or codec"); - return; + return false; } - ESP_LOGI(kTag, "received new chunk (size %u)", chunk.size()); - current_codec_->SetInput(chunk_reader_->HandleNewData(chunk)); - needs_more_input_ = false; + return true; } -auto AudioDecoder::ProcessEndOfStream() -> void { - has_samples_to_send_ = false; - needs_more_input_ = true; - current_codec_.reset(); +auto AudioDecoder::Process(std::vector* inputs, MutableStream* output) + -> void { + // We don't really expect multiple inputs, so just pick the first that + // contains data. If none of them contain data, then we can still flush + // pending samples. + auto input = + std::find_if(inputs->begin(), inputs->end(), + [](const Stream& s) { return s.data.size_bytes() > 0; }); + + if (input != inputs->end()) { + const StreamInfo* info = input->info; + if (!stream_info_ || *stream_info_ != *info) { + // The input stream has changed! Immediately throw everything away and + // start from scratch. + // TODO: special case gapless playback? needs thought. + stream_info_ = *info; + has_samples_to_send_ = false; + has_set_stream_info_ = false; + + ProcessStreamInfo(*info); + } - SendOrBufferEvent(std::unique_ptr( - StreamEvent::CreateEndOfStream(input_events_))); -} + current_codec_->SetInput(input->data); + } -auto AudioDecoder::Process() -> void { - if (has_samples_to_send_) { - // Writing samples is relatively quick (it's just a bunch of memcopy's), so - // do them all at once. - while (has_samples_to_send_ && !IsOverBuffered()) { - if (!has_sent_stream_info_) { - has_sent_stream_info_ = true; + while (true) { + if (has_samples_to_send_) { + if (!has_set_stream_info_) { + has_set_stream_info_ = true; auto format = current_codec_->GetOutputFormat(); - stream_info_->bits_per_sample = format.bits_per_sample; - stream_info_->sample_rate = format.sample_rate_hz; - stream_info_->channels = format.num_channels; - stream_info_->chunk_size = kChunkSize; - - auto event = - StreamEvent::CreateStreamInfo(input_events_, *stream_info_); - SendOrBufferEvent(std::unique_ptr(event)); - } - - auto block = arena_.Acquire(); - if (!block) { - return; + output->info->data.emplace( + format.bits_per_sample, format.sample_rate_hz, format.num_channels); } - auto write_res = - current_codec_->WriteOutputSamples({block->start, block->size}); - block->used_size = write_res.first; + auto write_res = current_codec_->WriteOutputSamples( + output->data.subspan(output->info->bytes_in_stream)); + output->info->bytes_in_stream += write_res.first; has_samples_to_send_ = !write_res.second; - auto chunk = std::unique_ptr( - StreamEvent::CreateArenaChunk(input_events_, *block)); - if (!SendOrBufferEvent(std::move(chunk))) { - return; + if (has_samples_to_send_) { + // We weren't able to fit all the generated samples into the output + // buffer. Stop trying; we'll finish up during the next pass. + break; } } - // We will process the next frame during the next call to this method. - } - - if (!needs_more_input_) { - auto res = current_codec_->ProcessNextFrame(); - if (res.has_error()) { - // TODO(jacqueline): Handle errors. - return; - } - needs_more_input_ = res.value(); - has_samples_to_send_ = true; - if (needs_more_input_) { - chunk_reader_->HandleBytesUsed(current_codec_->GetInputPosition()); + if (input != inputs->end()) { + auto res = current_codec_->ProcessNextFrame(); + if (res.has_error()) { + // TODO(jacqueline): Handle errors. + return; + } + input->data = input->data.subspan(current_codec_->GetInputPosition()); + + if (res.value()) { + // We're out of data in this buffer. Finish immediately; there's nothing + // to send. + break; + } else { + has_samples_to_send_ = true; + } + } else { + // No input; nothing to do. + break; } } } diff --git a/src/audio/audio_playback.cpp b/src/audio/audio_playback.cpp index c95a5d63..613f629c 100644 --- a/src/audio/audio_playback.cpp +++ b/src/audio/audio_playback.cpp @@ -13,6 +13,7 @@ #include "fatfs_audio_input.hpp" #include "gpio_expander.hpp" #include "i2s_audio_output.hpp" +#include "pipeline.hpp" #include "storage.hpp" #include "stream_buffer.hpp" #include "stream_info.hpp" @@ -20,11 +21,10 @@ namespace audio { -auto AudioPlayback::create(drivers::GpioExpander* expander, - std::shared_ptr storage) +auto AudioPlayback::create(drivers::GpioExpander* expander) -> cpp::result, Error> { // Create everything - auto source = std::make_shared(storage); + auto source = std::make_shared(); auto codec = std::make_shared(); auto sink_res = I2SAudioOutput::create(expander); @@ -35,41 +35,47 @@ auto AudioPlayback::create(drivers::GpioExpander* expander, auto playback = std::make_unique(); - // Configure the pipeline - playback->ConnectElements(source.get(), codec.get()); - playback->ConnectElements(codec.get(), sink.get()); + Pipeline *pipeline = new Pipeline(sink.get()); + pipeline->AddInput(codec.get())->AddInput(source.get()); - // Launch! - StartAudioTask("src", {}, source); - StartAudioTask("dec", {}, codec); - StartAudioTask("sink", 0, sink); - - playback->input_handle_ = source->InputEventQueue(); + task::Start(pipeline); return playback; } -AudioPlayback::AudioPlayback() {} +AudioPlayback::AudioPlayback() { + // Create everything + auto source = std::make_shared(); + auto codec = std::make_shared(); -AudioPlayback::~AudioPlayback() {} + auto sink_res = I2SAudioOutput::create(expander); + if (sink_res.has_error()) { + return cpp::fail(ERR_INIT_ELEMENT); + } + auto sink = sink_res.value(); -auto AudioPlayback::Play(const std::string& filename) -> void { - StreamInfo info; - info.path = filename; - auto event = StreamEvent::CreateStreamInfo(input_handle_, info); - xQueueSend(input_handle_, &event, portMAX_DELAY); - event = StreamEvent::CreateEndOfStream(input_handle_); - xQueueSend(input_handle_, &event, portMAX_DELAY); + auto playback = std::make_unique(); + + Pipeline *pipeline = new Pipeline(sink.get()); + pipeline->AddInput(codec.get())->AddInput(source.get()); + + task::Start(pipeline); + + return playback; } -auto AudioPlayback::LogStatus() -> void { - auto event = StreamEvent::CreateLogStatus(); - xQueueSendToFront(input_handle_, &event, portMAX_DELAY); +AudioPlayback::~AudioPlayback() { + pipeline_->Quit(); } -auto AudioPlayback::ConnectElements(IAudioElement* src, IAudioElement* sink) - -> void { - src->OutputEventQueue(sink->InputEventQueue()); +auto AudioPlayback::Play(const std::string& filename) -> void { + // TODO: concurrency, yo! + file_source->OpenFile(filename); + pipeline_->Play(); +} + +auto AudioPlayback::LogStatus() -> void { + // TODO. } } // namespace audio diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index ce6d724e..542bada8 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -2,16 +2,20 @@ #include +#include +#include #include #include #include #include "cbor.h" +#include "esp_err.h" #include "esp_heap_caps.h" #include "esp_log.h" #include "freertos/portmacro.h" #include "freertos/projdefs.h" #include "freertos/queue.h" +#include "pipeline.hpp" #include "span.hpp" #include "arena.hpp" @@ -25,25 +29,26 @@ namespace audio { +namespace task { + static const char* kTag = "task"; +static const std::size_t kStackSize = 24 * 1024; +static const uint8_t kAudioCore = 0; -auto StartAudioTask(const std::string& name, - std::optional core_id, - std::shared_ptr element) -> void { - auto task_handle = std::make_unique(); +auto Start(Pipeline* pipeline) -> Handle* { + auto input_queue = xQueueCreate(8, 1); // Newly created task will free this. - AudioTaskArgs* args = new AudioTaskArgs{.element = element}; - - ESP_LOGI(kTag, "starting audio task %s", name.c_str()); - if (core_id) { - xTaskCreatePinnedToCore(&AudioTaskMain, name.c_str(), - element->StackSizeBytes(), args, kTaskPriorityAudio, - task_handle.get(), *core_id); - } else { - xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args, - kTaskPriorityAudio, task_handle.get()); - } + AudioTaskArgs* args = new AudioTaskArgs{ + .pipeline = pipeline, + .input = input_queue, + }; + + ESP_LOGI(kTag, "starting audio task"); + xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args, + kTaskPriorityAudio, NULL, kAudioCore); + + return new Handle(input_queue); } void AudioTaskMain(void* args) { @@ -51,113 +56,88 @@ void AudioTaskMain(void* args) { // called before the task quits. { AudioTaskArgs* real_args = reinterpret_cast(args); - std::shared_ptr element = std::move(real_args->element); + std::unique_ptr pipeline(real_args->pipeline); + QueueHandle_t input; + StreamBufferHandle_t output; delete real_args; - // Queue of events that we have received on our input queue, but not yet - // processed. - std::deque> pending_events; - - // TODO(jacqueline): quit event - while (true) { - // First, we pull events from our input queue into pending_events. This - // keeps us responsive to any events that need to be handled immediately. - // Then we check if there's any events to flush downstream. - // Then we pass anything requiring processing to the element. - - bool has_work_to_do = - (!pending_events.empty() || element->HasUnflushedOutput() || - element->HasUnprocessedInput()) && - !element->IsOverBuffered(); - - if (has_work_to_do) { - ESP_LOGD(kTag, "checking for events"); - } else { - ESP_LOGD(kTag, "waiting for events"); - } - - // If we have no new events to process and the element has nothing left to - // do, then just delay forever waiting for a new event. - TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY; - - StreamEvent* new_event = nullptr; - bool has_event = - xQueueReceive(element->InputEventQueue(), &new_event, ticks_to_wait); - - if (has_event) { - if (new_event->tag == StreamEvent::UNINITIALISED) { - ESP_LOGE(kTag, "discarding invalid event!!"); - } else if (new_event->tag == StreamEvent::CHUNK_NOTIFICATION) { - delete new_event; - } else if (new_event->tag == StreamEvent::LOG_STATUS) { - element->ProcessLogStatus(); - if (element->OutputEventQueue() != nullptr) { - xQueueSendToFront(element->OutputEventQueue(), &new_event, 0); - } else { - delete new_event; - } - } else { - // This isn't an event that needs to be actioned immediately. Add it - // to our work queue. - pending_events.emplace_back(new_event); - ESP_LOGD(kTag, "deferring event"); + std::vector elements = pipeline->GetIterationOrder(); + std::size_t max_inputs = + (*std::max_element(elements.begin(), elements.end(), + [](Pipeline const* first, Pipeline const* second) { + return first->NumInputs() < second->NumInputs(); + })) + ->NumInputs(); + + // We need to be able to simultaneously map all of an element's inputs, plus + // its output. So preallocate that many ranges. + std::vector> in_regions(max_inputs); + MappableRegion out_region; + std::for_each(in_regions.begin(), in_regions.end(), + [](const MappableRegion& region) { + assert(region.is_valid); + }); + assert(out_region.is_valid); + + // Each element has exactly one output buffer. + std::vector> buffers(elements.size()); + std::vector buffer_infos(buffers.size()); + std::for_each(buffers.begin(), buffers.end(), + [](const HimemAlloc& alloc) { + assert(alloc.is_valid); + }); + + bool playing = true; + bool quit = false; + while (!quit) { + // TODO: full event here? + Command cmd; + bool has_cmd = xQueueReceive(input, &cmd, 0); + if (has_cmd) { + switch (cmd) { + case PLAY: + playing = true; + break; + case PAUSE: + playing = false; + break; + case QUIT: + quit = true; + break; } - // Loop again, so that we service all incoming events before doing our - // possibly expensive processing. - continue; } - - if (element->HasUnflushedOutput()) { - ESP_LOGD(kTag, "flushing output"); - } - - // We have no new events. Next, see if there's anything that needs to be - // flushed. - if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) { - // We had things to flush, but couldn't send it all. This probably - // implies that the downstream element is having issues servicing its - // input queue, so hold off for a moment before retrying. - ESP_LOGW(kTag, "failed to flush buffered output"); - vTaskDelay(pdMS_TO_TICKS(100)); - continue; + if (quit) { + break; } - if (element->HasUnprocessedInput()) { - ESP_LOGD(kTag, "processing input events"); - element->Process(); - continue; - } - - // The element ran out of data, so now it's time to let it process more - // input. - while (!pending_events.empty()) { - std::unique_ptr event; - pending_events.front().swap(event); - pending_events.pop_front(); - ESP_LOGD(kTag, "processing event, tag %i", event->tag); - - if (event->tag == StreamEvent::STREAM_INFO) { - ESP_LOGD(kTag, "processing stream info"); - - element->ProcessStreamInfo(*event->stream_info); - - } else if (event->tag == StreamEvent::ARENA_CHUNK) { - ESP_LOGD(kTag, "processing arena data"); - - memory::ArenaRef ref(event->arena_chunk); - auto callback = - StreamEvent::CreateChunkNotification(element->InputEventQueue()); - if (!xQueueSend(event->source, &callback, 0)) { - ESP_LOGW(kTag, "failed to send chunk notif"); - continue; + if (playing) { + for (int i = 0; i < elements.size(); i++) { + std::vector in_streams; + elements.at(i)->InStreams(&in_regions, &in_streams); + MutableStream out_stream = elements.at(i)->OutStream(&out_region); + + // Crop the input and output streams to the ranges that are safe to + // touch. For the input streams, this is the region that contains + // data. For the output stream, this is the region that does *not* + // already contain data. + std::vector cropped_in_streams; + std::for_each(in_streams.begin(), in_streams.end(), + [&](MutableStream& s) { + cropped_in_streams.emplace_back( + s.info, s.data.first(s.info->bytes_in_stream)); + }); + + elements.at(i)->OutputElement()->Process(&cropped_in_streams, + &out_stream); + + for (int stream = 0; stream < in_streams.size(); stream++) { + MutableStream& orig_stream = in_streams.at(stream); + Stream& cropped_stream = cropped_in_streams.at(stream); + std::move(cropped_stream.data.begin(), cropped_stream.data.end(), + orig_stream.data.begin()); + orig_stream.info->bytes_in_stream = + cropped_stream.data.size_bytes(); } - - // TODO(jacqueline): Consider giving the element a full ArenaRef here, - // so that it can hang on to it and potentially save an alloc+copy. - element->ProcessChunk({ref.ptr.start, ref.ptr.used_size}); - - // TODO: think about whether to do the whole queue - break; } } } @@ -165,4 +145,6 @@ void AudioTaskMain(void* args) { vTaskDelete(NULL); } +} // namespace task + } // namespace audio diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index 5354c5fd..bd8748eb 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -7,6 +7,8 @@ #include "arena.hpp" #include "esp_heap_caps.h" +#include "esp_log.h" +#include "ff.h" #include "freertos/portmacro.h" #include "audio_element.hpp" @@ -15,43 +17,23 @@ #include "stream_event.hpp" #include "stream_info.hpp" #include "stream_message.hpp" +#include "types.hpp" static const char* kTag = "SRC"; namespace audio { -static const std::size_t kChunkSize = 24 * 1024; -static const std::size_t kChunkReadahead = 2; - -FatfsAudioInput::FatfsAudioInput(std::shared_ptr storage) - : IAudioElement(), - arena_(kChunkSize, kChunkReadahead, MALLOC_CAP_SPIRAM), - storage_(storage), - current_file_(), - is_file_open_(false) {} +FatfsAudioInput::FatfsAudioInput() + : IAudioElement(), current_file_(), is_file_open_(false) {} FatfsAudioInput::~FatfsAudioInput() {} -auto FatfsAudioInput::HasUnprocessedInput() -> bool { - return is_file_open_; -} - -auto FatfsAudioInput::IsOverBuffered() -> bool { - return arena_.BlocksFree() == 0; -} - -auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info) -> void { +auto FatfsAudioInput::OpenFile(const std::string& path) -> void { if (is_file_open_) { f_close(¤t_file_); is_file_open_ = false; } - - if (!info.path) { - // TODO(jacqueline): Handle errors. - return; - } - ESP_LOGI(kTag, "opening file %s", info.path->c_str()); - std::string path = *info.path; + ESP_LOGI(kTag, "opening file %s", path.c_str()); FRESULT res = f_open(¤t_file_, path.c_str(), FA_READ); if (res != FR_OK) { ESP_LOGE(kTag, "failed to open file! res: %i", res); @@ -60,51 +42,30 @@ auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info) -> void { } is_file_open_ = true; - - StreamInfo new_info(info); - new_info.chunk_size = kChunkSize; - ESP_LOGI(kTag, "chunk size: %u bytes", kChunkSize); - - auto event = StreamEvent::CreateStreamInfo(input_events_, new_info); - SendOrBufferEvent(std::unique_ptr(event)); } -auto FatfsAudioInput::ProcessChunk(const cpp::span& chunk) -> void {} - -auto FatfsAudioInput::ProcessEndOfStream() -> void { - if (is_file_open_) { - f_close(¤t_file_); - is_file_open_ = false; - SendOrBufferEvent(std::unique_ptr( - StreamEvent::CreateEndOfStream(input_events_))); +auto FatfsAudioInput::Process(std::vector* inputs, + MutableStream* output) -> void { + if (!is_file_open_) { + return; } -} - -auto FatfsAudioInput::Process() -> void { - if (is_file_open_) { - auto dest_block = memory::ArenaRef::Acquire(&arena_); - if (!dest_block) { - return; - } - FRESULT result = f_read(¤t_file_, dest_block->ptr.start, - dest_block->ptr.size, &dest_block->ptr.used_size); - if (result != FR_OK) { - ESP_LOGE(kTag, "file I/O error %d", result); - // TODO(jacqueline): Handle errors. - return; - } - - if (dest_block->ptr.used_size < dest_block->ptr.size || - f_eof(¤t_file_)) { - f_close(¤t_file_); - is_file_open_ = false; - } + FRESULT result = + f_read(¤t_file_, output->data.data(), output->data.size_bytes(), + &output->info->bytes_in_stream); + if (result != FR_OK) { + ESP_LOGE(kTag, "file I/O error %d", result); + // TODO(jacqueline): Handle errors. + return; + } - auto dest_event = std::unique_ptr( - StreamEvent::CreateArenaChunk(input_events_, dest_block->Release())); + // TODO: read from filename? + output->info->data = StreamInfo::Encoded{codecs::STREAM_MP3}; - SendOrBufferEvent(std::move(dest_event)); + if (output->info->bytes_in_stream < output->data.size_bytes() || + f_eof(¤t_file_)) { + f_close(¤t_file_); + is_file_open_ = false; } } diff --git a/src/audio/i2s_audio_output.cpp b/src/audio/i2s_audio_output.cpp index 110227cf..7766ebed 100644 --- a/src/audio/i2s_audio_output.cpp +++ b/src/audio/i2s_audio_output.cpp @@ -1,6 +1,7 @@ #include "i2s_audio_output.hpp" #include +#include #include "esp_err.h" #include "freertos/portmacro.h" @@ -10,14 +11,12 @@ #include "freertos/projdefs.h" #include "gpio_expander.hpp" #include "result.hpp" +#include "stream_info.hpp" -static const TickType_t kIdleTimeBeforeMute = pdMS_TO_TICKS(1000); static const char* kTag = "I2SOUT"; namespace audio { -static const std::size_t kDmaQueueLength = 8; - auto I2SAudioOutput::create(drivers::GpioExpander* expander) -> cpp::result, Error> { // First, we need to perform initial configuration of the DAC chip. @@ -38,40 +37,26 @@ auto I2SAudioOutput::create(drivers::GpioExpander* expander) I2SAudioOutput::I2SAudioOutput(drivers::GpioExpander* expander, std::unique_ptr dac) - : expander_(expander), - dac_(std::move(dac)), - chunk_reader_(), - latest_chunk_() {} + : expander_(expander), dac_(std::move(dac)), current_config_() {} I2SAudioOutput::~I2SAudioOutput() {} -auto I2SAudioOutput::HasUnprocessedInput() -> bool { - return latest_chunk_.size() > 0; -} - -auto I2SAudioOutput::IsOverBuffered() -> bool { - return false; -} - -auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> void { - // TODO(jacqueline): probs do something with the channel hey - - if (!info.bits_per_sample || !info.sample_rate) { - ESP_LOGE(kTag, "audio stream missing bits or sample rate"); - return; +auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> bool { + if (!std::holds_alternative(info.data)) { + return false; } - if (!info.chunk_size) { - ESP_LOGE(kTag, "audio stream missing chunk size"); - return; + StreamInfo::Pcm pcm = std::get(info.data); + + if (current_config_ && pcm == *current_config_) { + return true; } - chunk_reader_.emplace(*info.chunk_size); - ESP_LOGI(kTag, "incoming audio stream: %u bpp @ %u Hz", *info.bits_per_sample, - *info.sample_rate); + ESP_LOGI(kTag, "incoming audio stream: %u bpp @ %u Hz", pcm.bits_per_sample, + pcm.sample_rate); drivers::AudioDac::BitsPerSample bps; - switch (*info.bits_per_sample) { + switch (pcm.bits_per_sample) { case 16: bps = drivers::AudioDac::BPS_16; break; @@ -83,11 +68,11 @@ auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> void { break; default: ESP_LOGE(kTag, "dropping stream with unknown bps"); - return; + return false; } drivers::AudioDac::SampleRate sample_rate; - switch (*info.sample_rate) { + switch (pcm.sample_rate) { case 44100: sample_rate = drivers::AudioDac::SAMPLE_RATE_44_1; break; @@ -96,37 +81,25 @@ auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> void { break; default: ESP_LOGE(kTag, "dropping stream with unknown rate"); - return; + return false; } - dac_->Reconfigure(bps, sample_rate); -} - -auto I2SAudioOutput::ProcessChunk(const cpp::span& chunk) -> void { - latest_chunk_ = chunk_reader_->HandleNewData(chunk); -} + // TODO(jacqueline): probs do something with the channel hey -auto I2SAudioOutput::ProcessEndOfStream() -> void { - dac_->Stop(); - SendOrBufferEvent(std::unique_ptr( - StreamEvent::CreateEndOfStream(input_events_))); -} + dac_->Reconfigure(bps, sample_rate); + current_config_ = pcm; -auto I2SAudioOutput::ProcessLogStatus() -> void { - dac_->LogStatus(); + return true; } -auto I2SAudioOutput::Process() -> void { - // Note: avoid logging here! We need to get bytes from the chunk buffer into - // the I2S DMA buffer as fast as possible, to avoid running out of samples. - std::size_t bytes_written = dac_->WriteData(latest_chunk_); - if (bytes_written == latest_chunk_.size_bytes()) { - latest_chunk_ = cpp::span(); - chunk_reader_->HandleBytesLeftOver(0); - } else { - latest_chunk_ = latest_chunk_.subspan(bytes_written); - } - return; +auto I2SAudioOutput::Process(std::vector* inputs, MutableStream* output) + -> void { + std::for_each(inputs->begin(), inputs->end(), [&](Stream& s) { + if (ProcessStreamInfo(s.info)) { + std::size_t bytes_written = dac_->WriteData(s.data); + s.data = s.data.subspan(bytes_written); + } + }); } auto I2SAudioOutput::SetVolume(uint8_t volume) -> void { diff --git a/src/audio/include/audio_decoder.hpp b/src/audio/include/audio_decoder.hpp index 47642469..be8daf99 100644 --- a/src/audio/include/audio_decoder.hpp +++ b/src/audio/include/audio_decoder.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "chunk.hpp" #include "ff.h" @@ -10,6 +11,7 @@ #include "audio_element.hpp" #include "codec.hpp" +#include "stream_info.hpp" namespace audio { @@ -22,28 +24,21 @@ class AudioDecoder : public IAudioElement { AudioDecoder(); ~AudioDecoder(); - auto StackSizeBytes() const -> std::size_t override { return 10 * 1024; }; - - auto HasUnprocessedInput() -> bool override; - auto IsOverBuffered() -> bool override; - - auto ProcessStreamInfo(const StreamInfo& info) -> void override; - auto ProcessChunk(const cpp::span& chunk) -> void override; - auto ProcessEndOfStream() -> void override; - auto Process() -> void override; + auto Process(std::vector* inputs, MutableStream* output) + -> void override; AudioDecoder(const AudioDecoder&) = delete; AudioDecoder& operator=(const AudioDecoder&) = delete; private: - memory::Arena arena_; std::unique_ptr current_codec_; std::optional stream_info_; - std::optional chunk_reader_; - bool has_sent_stream_info_; + bool has_set_stream_info_; bool has_samples_to_send_; bool needs_more_input_; + + auto ProcessStreamInfo(const StreamInfo& info) -> bool; }; } // namespace audio diff --git a/src/audio/include/audio_element.hpp b/src/audio/include/audio_element.hpp index 91036348..c9192e4a 100644 --- a/src/audio/include/audio_element.hpp +++ b/src/audio/include/audio_element.hpp @@ -40,62 +40,8 @@ class IAudioElement { IAudioElement(); virtual ~IAudioElement(); - /* - * Returns the stack size in bytes that this element requires. This should - * be tuned according to the observed stack size of each element, as different - * elements have fairly different stack requirements (particular decoders). - */ - virtual auto StackSizeBytes() const -> std::size_t { return 4096; }; - - /* Returns this element's input buffer. */ - auto InputEventQueue() const -> QueueHandle_t { return input_events_; } - /* Returns this element's output buffer. */ - auto OutputEventQueue() const -> QueueHandle_t { return output_events_; } - auto OutputEventQueue(const QueueHandle_t q) -> void { output_events_ = q; } - - virtual auto HasUnprocessedInput() -> bool = 0; - - virtual auto IsOverBuffered() -> bool { return false; } - - auto HasUnflushedOutput() -> bool { return !buffered_output_.empty(); } - auto FlushBufferedOutput() -> bool; - - /* - * Called when a StreamInfo message is received. Used to configure this - * element in preperation for incoming chunks. - */ - virtual auto ProcessStreamInfo(const StreamInfo& info) -> void = 0; - - /* - * Called when a ChunkHeader message is received. Includes the data associated - * with this chunk of stream data. This method should return the number of - * bytes in this chunk that were actually used; leftover bytes will be - * prepended to the next call. - */ - virtual auto ProcessChunk(const cpp::span& chunk) -> void = 0; - - virtual auto ProcessEndOfStream() -> void = 0; - - virtual auto ProcessLogStatus() -> void {} - - /* - * Called when there has been no data received over the input buffer for some - * time. This could be used to synthesize output, or to save memory by - * releasing unused resources. - */ - virtual auto Process() -> void = 0; - - protected: - auto SendOrBufferEvent(std::unique_ptr event) -> bool; - - // Queue for events coming into this element. Owned by us. - QueueHandle_t input_events_; - // Queue for events going into the next element. Not owned by us, may be null - // if we're not yet in a pipeline. - // FIXME: it would be nicer if this was non-nullable. - QueueHandle_t output_events_; - // Output events that have been generated, but are yet to be sent downstream. - std::deque> buffered_output_; + virtual auto Process(std::vector* inputs, MutableStream* output) + -> void = 0; }; } // namespace audio diff --git a/src/audio/include/audio_playback.hpp b/src/audio/include/audio_playback.hpp index a1bea64f..507e6f73 100644 --- a/src/audio/include/audio_playback.hpp +++ b/src/audio/include/audio_playback.hpp @@ -5,7 +5,9 @@ #include #include +#include "audio_task.hpp" #include "esp_err.h" +#include "fatfs_audio_input.hpp" #include "result.hpp" #include "span.hpp" @@ -23,12 +25,10 @@ namespace audio { class AudioPlayback { public: enum Error { ERR_INIT_ELEMENT, ERR_MEM }; - static auto create(drivers::GpioExpander* expander, - std::shared_ptr storage) + static auto create(drivers::GpioExpander* expander) -> cpp::result, Error>; - // TODO(jacqueline): configure on the fly once we have things to configure. - AudioPlayback(); + AudioPlayback(FatfsAudioInput *file_input); ~AudioPlayback(); /* @@ -44,9 +44,10 @@ class AudioPlayback { AudioPlayback& operator=(const AudioPlayback&) = delete; private: - auto ConnectElements(IAudioElement* src, IAudioElement* sink) -> void; + FatfsAudioInput *file_source; - QueueHandle_t input_handle_; + std::vector> all_elements_; + std::unique_ptr pipeline_; }; } // namespace audio diff --git a/src/audio/include/audio_task.hpp b/src/audio/include/audio_task.hpp index df70ebaa..8db99850 100644 --- a/src/audio/include/audio_task.hpp +++ b/src/audio/include/audio_task.hpp @@ -6,17 +6,38 @@ #include "audio_element.hpp" #include "freertos/portmacro.h" +#include "pipeline.hpp" namespace audio { +namespace task { struct AudioTaskArgs { - std::shared_ptr& element; + Pipeline* pipeline; + QueueHandle_t input; }; -auto StartAudioTask(const std::string& name, - std::optional core_id, - std::shared_ptr element) -> void; +extern "C" void AudioTaskMain(void* args); -void AudioTaskMain(void* args); +enum Command { PLAY, PAUSE, QUIT }; + +class Handle { + public: + explicit Handle(QueueHandle_t input); + ~Handle(); + + auto SetStreamInfo() -> void; + auto Play() -> void; + auto Pause() -> void; + auto Quit() -> void; + + auto OutputBuffer() -> StreamBufferHandle_t; + + private: + QueueHandle_t input; +}; + +auto Start(Pipeline* pipeline) -> Handle*; + +} // namespace task } // namespace audio diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index 9f2d676c..b3a6d843 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -3,41 +3,36 @@ #include #include #include +#include #include "arena.hpp" #include "chunk.hpp" #include "freertos/FreeRTOS.h" +#include "ff.h" #include "freertos/message_buffer.h" #include "freertos/queue.h" #include "span.hpp" #include "audio_element.hpp" -#include "storage.hpp" #include "stream_buffer.hpp" namespace audio { class FatfsAudioInput : public IAudioElement { public: - explicit FatfsAudioInput(std::shared_ptr storage); + explicit FatfsAudioInput(); ~FatfsAudioInput(); - auto HasUnprocessedInput() -> bool override; - auto IsOverBuffered() -> bool override; + auto OpenFile(const std::string& path) -> void; - auto ProcessStreamInfo(const StreamInfo& info) -> void override; - auto ProcessChunk(const cpp::span& chunk) -> void override; - auto ProcessEndOfStream() -> void override; - auto Process() -> void override; + auto Process(std::vector* inputs, MutableStream* output) + -> void override; FatfsAudioInput(const FatfsAudioInput&) = delete; FatfsAudioInput& operator=(const FatfsAudioInput&) = delete; private: - memory::Arena arena_; - std::shared_ptr storage_; - FIL current_file_; bool is_file_open_; }; diff --git a/src/audio/include/i2s_audio_output.hpp b/src/audio/include/i2s_audio_output.hpp index 2bea091b..57881b35 100644 --- a/src/audio/include/i2s_audio_output.hpp +++ b/src/audio/include/i2s_audio_output.hpp @@ -2,6 +2,7 @@ #include #include +#include #include "audio_element.hpp" #include "chunk.hpp" @@ -9,6 +10,7 @@ #include "dac.hpp" #include "gpio_expander.hpp" +#include "stream_info.hpp" namespace audio { @@ -22,14 +24,8 @@ class I2SAudioOutput : public IAudioElement { std::unique_ptr dac); ~I2SAudioOutput(); - auto HasUnprocessedInput() -> bool override; - auto IsOverBuffered() -> bool override; - - auto ProcessStreamInfo(const StreamInfo& info) -> void override; - auto ProcessChunk(const cpp::span& chunk) -> void override; - auto ProcessEndOfStream() -> void override; - auto ProcessLogStatus() -> void override; - auto Process() -> void override; + auto Process(std::vector* inputs, MutableStream* output) + -> void override; I2SAudioOutput(const I2SAudioOutput&) = delete; I2SAudioOutput& operator=(const I2SAudioOutput&) = delete; @@ -40,8 +36,9 @@ class I2SAudioOutput : public IAudioElement { drivers::GpioExpander* expander_; std::unique_ptr dac_; - std::optional chunk_reader_; - cpp::span latest_chunk_; + std::optional current_config_; + + auto ProcessStreamInfo(const StreamInfo& info) -> bool; }; } // namespace audio diff --git a/src/audio/include/pipeline.hpp b/src/audio/include/pipeline.hpp new file mode 100644 index 00000000..42f70828 --- /dev/null +++ b/src/audio/include/pipeline.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include + +#include "freertos/portmacro.h" + +#include "audio_element.hpp" +#include "himem.hpp" +#include "stream_info.hpp" + +namespace audio { + +static const std::size_t kPipelineBufferSize = 32 * 1024; + +class Pipeline { + public: + Pipeline(IAudioElement* output); + ~Pipeline(); + auto AddInput(IAudioElement* input) -> Pipeline*; + + auto OutputElement() const -> IAudioElement*; + + auto NumInputs() const -> std::size_t; + + auto InStreams(std::vector>*, + std::vector*) -> void; + + auto OutStream(MappableRegion*) -> MutableStream; + + auto GetIterationOrder() -> std::vector; + + private: + IAudioElement* root_; + std::vector> subtrees_; + + HimemAlloc output_buffer_; + StreamInfo output_info_; +}; + +} // namespace audio diff --git a/src/audio/include/stream_info.hpp b/src/audio/include/stream_info.hpp index bf67364f..47f65649 100644 --- a/src/audio/include/stream_info.hpp +++ b/src/audio/include/stream_info.hpp @@ -4,19 +4,68 @@ #include #include #include +#include -#include "cbor.h" #include "result.hpp" -#include "sys/_stdint.h" +#include "span.hpp" +#include "types.hpp" namespace audio { struct StreamInfo { - std::optional path; - std::optional channels; - std::optional bits_per_sample; - std::optional sample_rate; - std::optional chunk_size; + // The number of bytes that are available for consumption within this + // stream's buffer. + std::size_t bytes_in_stream; + + // The total length of this stream, in case its source is finite (e.g. a + // file on disk). May be absent for endless streams (internet streams, + // generated audio, etc.) + std::optional length_bytes; + + struct Encoded { + // The codec that this stream is associated with. + codecs::StreamType type; + + bool operator==(const Encoded&) const = default; + }; + + struct Pcm { + // Number of channels in this stream. + uint8_t channels; + // Number of bits per sample. + uint8_t bits_per_sample; + // The sample rate. + uint16_t sample_rate; + + bool operator==(const Pcm&) const = default; + }; + + std::variant data; + + bool operator==(const StreamInfo&) const = default; +}; + +class MutableStream { + public: + StreamInfo* info; + cpp::span data; + + MutableStream(StreamInfo* i, cpp::span d) + : info(i), data(d) {} +}; + +/* + * A byte buffer + associated metadata, which is not allowed to modify any of + * the underlying data. + */ +class Stream { + public: + explicit Stream(const MutableStream& s) : info(*s.info), data(s.data) {} + + const StreamInfo& info; + // `data` itself left mutable for signalling how much of the stream was + // consumed + cpp::span data; }; } // namespace audio diff --git a/src/audio/pipeline.cpp b/src/audio/pipeline.cpp new file mode 100644 index 00000000..f42e6853 --- /dev/null +++ b/src/audio/pipeline.cpp @@ -0,0 +1,52 @@ +#include "pipeline.hpp" +#include "stream_info.hpp" + +namespace audio { + +Pipeline::Pipeline(IAudioElement* output) : root_(output), subtrees_() {} +Pipeline::~Pipeline() {} + +auto Pipeline::AddInput(IAudioElement* input) -> Pipeline* { + subtrees_.emplace_back(input); + return subtrees_.back().get(); +} + +auto Pipeline::OutputElement() const -> IAudioElement* { + return root_; +} + +auto Pipeline::NumInputs() const -> std::size_t { + return subtrees_.size(); +} + +auto Pipeline::InStreams( + std::vector>* regions, + std::vector* out) -> void { + for (int i = 0; i < subtrees_.size(); i++) { + MutableStream s = subtrees_[i]->OutStream(®ions->at(i)); + out->push_back(s); + } +} + +auto Pipeline::OutStream(MappableRegion* region) + -> MutableStream { + return {&output_info_, region->Map(output_buffer_)}; +} + +auto Pipeline::GetIterationOrder() -> std::vector { + std::vector to_search{this}; + std::vector found; + + while (!to_search.empty()) { + Pipeline* current = to_search.back(); + to_search.pop_back(); + found.push_back(current); + + to_search.insert(to_search.end(), current->subtrees_.begin(), + current->subtrees_.end()); + } + + return found; +} + +} // namespace audio diff --git a/src/codecs/include/codec.hpp b/src/codecs/include/codec.hpp index 6897acf2..4595f877 100644 --- a/src/codecs/include/codec.hpp +++ b/src/codecs/include/codec.hpp @@ -10,6 +10,7 @@ #include "result.hpp" #include "span.hpp" +#include "types.hpp" namespace codecs { @@ -17,7 +18,7 @@ class ICodec { public: virtual ~ICodec() {} - virtual auto CanHandleFile(const std::string& path) -> bool = 0; + virtual auto CanHandleType(StreamType type) -> bool = 0; struct OutputFormat { uint8_t num_channels; @@ -31,7 +32,7 @@ class ICodec { virtual auto ResetForNewStream() -> void = 0; - virtual auto SetInput(cpp::span input) -> void = 0; + virtual auto SetInput(cpp::span input) -> void = 0; /* * Returns the codec's next read position within the input buffer. If the @@ -63,7 +64,7 @@ class ICodec { enum CreateCodecError { UNKNOWN_EXTENSION }; -auto CreateCodecForFile(const std::string& file) +auto CreateCodecForType(StreamType type) -> cpp::result, CreateCodecError>; } // namespace codecs diff --git a/src/drivers/dac.cpp b/src/drivers/dac.cpp index c9af0d99..4d3aca1d 100644 --- a/src/drivers/dac.cpp +++ b/src/drivers/dac.cpp @@ -192,7 +192,8 @@ auto AudioDac::Reconfigure(BitsPerSample bps, SampleRate rate) -> void { WriteRegister(Register::POWER_MODE, 0); } -auto AudioDac::WriteData(cpp::span data) -> std::size_t { +auto AudioDac::WriteData(const cpp::span& data) + -> std::size_t { std::size_t bytes_written = 0; esp_err_t err = i2s_channel_write(i2s_handle_, data.data(), data.size_bytes(), &bytes_written, 0); diff --git a/src/drivers/include/dac.hpp b/src/drivers/include/dac.hpp index 06808a78..028d46cb 100644 --- a/src/drivers/include/dac.hpp +++ b/src/drivers/include/dac.hpp @@ -71,7 +71,7 @@ class AudioDac { // TODO(jacqueline): worth supporting channels here as well? auto Reconfigure(BitsPerSample bps, SampleRate rate) -> void; - auto WriteData(cpp::span data) -> std::size_t; + auto WriteData(const cpp::span& data) -> std::size_t; auto Stop() -> void; auto LogStatus() -> void; diff --git a/src/memory/CMakeLists.txt b/src/memory/CMakeLists.txt index 67e64267..69afc454 100644 --- a/src/memory/CMakeLists.txt +++ b/src/memory/CMakeLists.txt @@ -1,2 +1,2 @@ -idf_component_register(SRCS "arena.cpp" INCLUDE_DIRS "include" REQUIRES "span") +idf_component_register(SRCS "arena.cpp" INCLUDE_DIRS "include" REQUIRES "span" "esp_psram") target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/memory/include/himem.hpp b/src/memory/include/himem.hpp new file mode 100644 index 00000000..c65091d7 --- /dev/null +++ b/src/memory/include/himem.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include +#include + +#include "esp32/himem.h" +#include "span.hpp" + +/* + * Wrapper around an ESP-IDF himem allocation, which uses RAII to clean up after + * itself. + */ +template +class HimemAlloc { + public: + esp_himem_handle_t handle; + const bool is_valid; + + HimemAlloc() : is_valid(esp_himem_alloc(size, &handle) == ESP_OK) {} + + ~HimemAlloc() { + if (is_valid) { + esp_himem_free(handle); + } + } + + // Not copyable or movable. + HimemAlloc(const HimemAlloc&) = delete; + HimemAlloc& operator=(const HimemAlloc&) = delete; +}; + +/* + * Wrapper around an ESP-IDF himem allocation, which maps a HimemAlloc into the + * usable address space. Instances always contain the last memory region that + * was mapped within them. + */ +template +class MappableRegion { + private: + std::byte* bytes_; + + public: + esp_himem_rangehandle_t range_handle; + const bool is_valid; + + MappableRegion() + : bytes_(nullptr), + is_valid(esp_himem_alloc_map_range(size, &range_handle) == ESP_OK) {} + + ~MappableRegion() { + if (bytes_ != nullptr) { + esp_himem_unmap(range_handle, bytes_, size); + } + if (is_valid) { + esp_himem_free_map_range(range_handle); + } + } + + auto Get() -> cpp::span { + if (bytes_ != nullptr) { + return {}; + } + return {bytes_, size}; + } + + auto Map(const HimemAlloc &alloc) -> cpp::span { + if (bytes_ != nullptr) { + ESP_ERROR_CHECK(esp_himem_unmap(range_handle, bytes_, size)); + } + ESP_ERROR_CHECK(esp_himem_map(alloc.handle, range_handle, 0, 0, size, 0, + reinterpret_cast(&bytes_))); + return Get(); + } + + // Not copyable or movable. + MappableRegion(const MappableRegion&) = delete; + MappableRegion& operator=(const MappableRegion&) = delete; +};