mostly single task pipeline

custom
jacqueline 2 years ago
parent 2a46eecdc6
commit a9531c86a4
  1. 2
      src/audio/CMakeLists.txt
  2. 137
      src/audio/audio_decoder.cpp
  3. 60
      src/audio/audio_playback.cpp
  4. 212
      src/audio/audio_task.cpp
  5. 73
      src/audio/fatfs_audio_input.cpp
  6. 79
      src/audio/i2s_audio_output.cpp
  7. 19
      src/audio/include/audio_decoder.hpp
  8. 58
      src/audio/include/audio_element.hpp
  9. 13
      src/audio/include/audio_playback.hpp
  10. 31
      src/audio/include/audio_task.hpp
  11. 17
      src/audio/include/fatfs_audio_input.hpp
  12. 17
      src/audio/include/i2s_audio_output.hpp
  13. 42
      src/audio/include/pipeline.hpp
  14. 63
      src/audio/include/stream_info.hpp
  15. 52
      src/audio/pipeline.cpp
  16. 7
      src/codecs/include/codec.hpp
  17. 3
      src/drivers/dac.cpp
  18. 2
      src/drivers/include/dac.hpp
  19. 2
      src/memory/CMakeLists.txt
  20. 78
      src/memory/include/himem.hpp

@ -1,7 +1,7 @@
idf_component_register(
SRCS "audio_decoder.cpp" "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp"
"stream_message.cpp" "i2s_audio_output.cpp" "stream_buffer.cpp"
"audio_playback.cpp" "stream_event.cpp" "audio_element.cpp"
"audio_playback.cpp" "stream_event.cpp" "audio_element.cpp" "pipeline.cpp"
INCLUDE_DIRS "include"
REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory")

@ -2,13 +2,16 @@
#include <string.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include "cbor/tinycbor/src/cborinternal_p.h"
#include "freertos/FreeRTOS.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "freertos/message_buffer.h"
#include "freertos/portmacro.h"
@ -21,126 +24,106 @@ namespace audio {
static const char* kTag = "DEC";
static const std::size_t kChunkSize = 1024;
static const std::size_t kReadahead = 8;
AudioDecoder::AudioDecoder()
: IAudioElement(),
arena_(kChunkSize, kReadahead, MALLOC_CAP_SPIRAM),
stream_info_({}),
has_samples_to_send_(false),
needs_more_input_(true) {}
AudioDecoder::~AudioDecoder() {}
auto AudioDecoder::HasUnprocessedInput() -> bool {
return !needs_more_input_ || has_samples_to_send_;
}
auto AudioDecoder::IsOverBuffered() -> bool {
return arena_.BlocksFree() == 0;
}
auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info) -> void {
stream_info_ = info;
if (info.chunk_size) {
chunk_reader_.emplace(info.chunk_size.value());
} else {
ESP_LOGE(kTag, "no chunk size given");
return;
auto AudioDecoder::ProcessStreamInfo(const StreamInfo& info) -> bool {
if (!std::holds_alternative<StreamInfo::Encoded>(info.data)) {
return false;
}
const auto& encoded = std::get<StreamInfo::Encoded>(info.data);
// Reuse the existing codec if we can. This will help with gapless playback,
// since we can potentially just continue to decode as we were before,
// without any setup overhead.
// TODO: use audio type from stream
if (current_codec_ != nullptr &&
current_codec_->CanHandleFile(info.path.value_or(""))) {
current_codec_->CanHandleType(encoded.type)) {
current_codec_->ResetForNewStream();
return;
return true;
}
auto result = codecs::CreateCodecForFile(info.path.value_or(""));
// TODO: use audio type from stream
auto result = codecs::CreateCodecForType(encoded.type);
if (result.has_value()) {
current_codec_ = std::move(result.value());
} else {
ESP_LOGE(kTag, "no codec for this file");
return;
}
stream_info_ = info;
has_sent_stream_info_ = false;
}
auto AudioDecoder::ProcessChunk(const cpp::span<std::byte>& chunk) -> void {
if (current_codec_ == nullptr || !chunk_reader_) {
// Should never happen, but fail explicitly anyway.
ESP_LOGW(kTag, "received chunk without chunk size or codec");
return;
return false;
}
ESP_LOGI(kTag, "received new chunk (size %u)", chunk.size());
current_codec_->SetInput(chunk_reader_->HandleNewData(chunk));
needs_more_input_ = false;
return true;
}
auto AudioDecoder::ProcessEndOfStream() -> void {
auto AudioDecoder::Process(std::vector<Stream>* inputs, MutableStream* output)
-> void {
// We don't really expect multiple inputs, so just pick the first that
// contains data. If none of them contain data, then we can still flush
// pending samples.
auto input =
std::find_if(inputs->begin(), inputs->end(),
[](const Stream& s) { return s.data.size_bytes() > 0; });
if (input != inputs->end()) {
const StreamInfo* info = input->info;
if (!stream_info_ || *stream_info_ != *info) {
// The input stream has changed! Immediately throw everything away and
// start from scratch.
// TODO: special case gapless playback? needs thought.
stream_info_ = *info;
has_samples_to_send_ = false;
needs_more_input_ = true;
current_codec_.reset();
has_set_stream_info_ = false;
SendOrBufferEvent(std::unique_ptr<StreamEvent>(
StreamEvent::CreateEndOfStream(input_events_)));
}
ProcessStreamInfo(*info);
}
auto AudioDecoder::Process() -> void {
if (has_samples_to_send_) {
// Writing samples is relatively quick (it's just a bunch of memcopy's), so
// do them all at once.
while (has_samples_to_send_ && !IsOverBuffered()) {
if (!has_sent_stream_info_) {
has_sent_stream_info_ = true;
auto format = current_codec_->GetOutputFormat();
stream_info_->bits_per_sample = format.bits_per_sample;
stream_info_->sample_rate = format.sample_rate_hz;
stream_info_->channels = format.num_channels;
stream_info_->chunk_size = kChunkSize;
auto event =
StreamEvent::CreateStreamInfo(input_events_, *stream_info_);
SendOrBufferEvent(std::unique_ptr<StreamEvent>(event));
current_codec_->SetInput(input->data);
}
auto block = arena_.Acquire();
if (!block) {
return;
while (true) {
if (has_samples_to_send_) {
if (!has_set_stream_info_) {
has_set_stream_info_ = true;
auto format = current_codec_->GetOutputFormat();
output->info->data.emplace<StreamInfo::Pcm>(
format.bits_per_sample, format.sample_rate_hz, format.num_channels);
}
auto write_res =
current_codec_->WriteOutputSamples({block->start, block->size});
block->used_size = write_res.first;
auto write_res = current_codec_->WriteOutputSamples(
output->data.subspan(output->info->bytes_in_stream));
output->info->bytes_in_stream += write_res.first;
has_samples_to_send_ = !write_res.second;
auto chunk = std::unique_ptr<StreamEvent>(
StreamEvent::CreateArenaChunk(input_events_, *block));
if (!SendOrBufferEvent(std::move(chunk))) {
return;
}
if (has_samples_to_send_) {
// We weren't able to fit all the generated samples into the output
// buffer. Stop trying; we'll finish up during the next pass.
break;
}
// We will process the next frame during the next call to this method.
}
if (!needs_more_input_) {
if (input != inputs->end()) {
auto res = current_codec_->ProcessNextFrame();
if (res.has_error()) {
// TODO(jacqueline): Handle errors.
return;
}
needs_more_input_ = res.value();
has_samples_to_send_ = true;
input->data = input->data.subspan(current_codec_->GetInputPosition());
if (needs_more_input_) {
chunk_reader_->HandleBytesUsed(current_codec_->GetInputPosition());
if (res.value()) {
// We're out of data in this buffer. Finish immediately; there's nothing
// to send.
break;
} else {
has_samples_to_send_ = true;
}
} else {
// No input; nothing to do.
break;
}
}
}

@ -13,6 +13,7 @@
#include "fatfs_audio_input.hpp"
#include "gpio_expander.hpp"
#include "i2s_audio_output.hpp"
#include "pipeline.hpp"
#include "storage.hpp"
#include "stream_buffer.hpp"
#include "stream_info.hpp"
@ -20,11 +21,10 @@
namespace audio {
auto AudioPlayback::create(drivers::GpioExpander* expander,
std::shared_ptr<drivers::SdStorage> storage)
auto AudioPlayback::create(drivers::GpioExpander* expander)
-> cpp::result<std::unique_ptr<AudioPlayback>, Error> {
// Create everything
auto source = std::make_shared<FatfsAudioInput>(storage);
auto source = std::make_shared<FatfsAudioInput>();
auto codec = std::make_shared<AudioDecoder>();
auto sink_res = I2SAudioOutput::create(expander);
@ -35,41 +35,47 @@ auto AudioPlayback::create(drivers::GpioExpander* expander,
auto playback = std::make_unique<AudioPlayback>();
// Configure the pipeline
playback->ConnectElements(source.get(), codec.get());
playback->ConnectElements(codec.get(), sink.get());
Pipeline *pipeline = new Pipeline(sink.get());
pipeline->AddInput(codec.get())->AddInput(source.get());
// Launch!
StartAudioTask("src", {}, source);
StartAudioTask("dec", {}, codec);
StartAudioTask("sink", 0, sink);
playback->input_handle_ = source->InputEventQueue();
task::Start(pipeline);
return playback;
}
AudioPlayback::AudioPlayback() {}
AudioPlayback::AudioPlayback() {
// Create everything
auto source = std::make_shared<FatfsAudioInput>();
auto codec = std::make_shared<AudioDecoder>();
AudioPlayback::~AudioPlayback() {}
auto sink_res = I2SAudioOutput::create(expander);
if (sink_res.has_error()) {
return cpp::fail(ERR_INIT_ELEMENT);
}
auto sink = sink_res.value();
auto AudioPlayback::Play(const std::string& filename) -> void {
StreamInfo info;
info.path = filename;
auto event = StreamEvent::CreateStreamInfo(input_handle_, info);
xQueueSend(input_handle_, &event, portMAX_DELAY);
event = StreamEvent::CreateEndOfStream(input_handle_);
xQueueSend(input_handle_, &event, portMAX_DELAY);
auto playback = std::make_unique<AudioPlayback>();
Pipeline *pipeline = new Pipeline(sink.get());
pipeline->AddInput(codec.get())->AddInput(source.get());
task::Start(pipeline);
return playback;
}
auto AudioPlayback::LogStatus() -> void {
auto event = StreamEvent::CreateLogStatus();
xQueueSendToFront(input_handle_, &event, portMAX_DELAY);
AudioPlayback::~AudioPlayback() {
pipeline_->Quit();
}
auto AudioPlayback::ConnectElements(IAudioElement* src, IAudioElement* sink)
-> void {
src->OutputEventQueue(sink->InputEventQueue());
auto AudioPlayback::Play(const std::string& filename) -> void {
// TODO: concurrency, yo!
file_source->OpenFile(filename);
pipeline_->Play();
}
auto AudioPlayback::LogStatus() -> void {
// TODO.
}
} // namespace audio

@ -2,16 +2,20 @@
#include <stdlib.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include "cbor.h"
#include "esp_err.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h"
#include "pipeline.hpp"
#include "span.hpp"
#include "arena.hpp"
@ -25,25 +29,26 @@
namespace audio {
namespace task {
static const char* kTag = "task";
static const std::size_t kStackSize = 24 * 1024;
static const uint8_t kAudioCore = 0;
auto StartAudioTask(const std::string& name,
std::optional<BaseType_t> core_id,
std::shared_ptr<IAudioElement> element) -> void {
auto task_handle = std::make_unique<TaskHandle_t>();
auto Start(Pipeline* pipeline) -> Handle* {
auto input_queue = xQueueCreate(8, 1);
// Newly created task will free this.
AudioTaskArgs* args = new AudioTaskArgs{.element = element};
ESP_LOGI(kTag, "starting audio task %s", name.c_str());
if (core_id) {
xTaskCreatePinnedToCore(&AudioTaskMain, name.c_str(),
element->StackSizeBytes(), args, kTaskPriorityAudio,
task_handle.get(), *core_id);
} else {
xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args,
kTaskPriorityAudio, task_handle.get());
}
AudioTaskArgs* args = new AudioTaskArgs{
.pipeline = pipeline,
.input = input_queue,
};
ESP_LOGI(kTag, "starting audio task");
xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args,
kTaskPriorityAudio, NULL, kAudioCore);
return new Handle(input_queue);
}
void AudioTaskMain(void* args) {
@ -51,113 +56,88 @@ void AudioTaskMain(void* args) {
// called before the task quits.
{
AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args);
std::shared_ptr<IAudioElement> element = std::move(real_args->element);
std::unique_ptr<Pipeline> pipeline(real_args->pipeline);
QueueHandle_t input;
StreamBufferHandle_t output;
delete real_args;
// Queue of events that we have received on our input queue, but not yet
// processed.
std::deque<std::unique_ptr<StreamEvent>> pending_events;
// TODO(jacqueline): quit event
while (true) {
// First, we pull events from our input queue into pending_events. This
// keeps us responsive to any events that need to be handled immediately.
// Then we check if there's any events to flush downstream.
// Then we pass anything requiring processing to the element.
bool has_work_to_do =
(!pending_events.empty() || element->HasUnflushedOutput() ||
element->HasUnprocessedInput()) &&
!element->IsOverBuffered();
if (has_work_to_do) {
ESP_LOGD(kTag, "checking for events");
} else {
ESP_LOGD(kTag, "waiting for events");
}
// If we have no new events to process and the element has nothing left to
// do, then just delay forever waiting for a new event.
TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY;
StreamEvent* new_event = nullptr;
bool has_event =
xQueueReceive(element->InputEventQueue(), &new_event, ticks_to_wait);
if (has_event) {
if (new_event->tag == StreamEvent::UNINITIALISED) {
ESP_LOGE(kTag, "discarding invalid event!!");
} else if (new_event->tag == StreamEvent::CHUNK_NOTIFICATION) {
delete new_event;
} else if (new_event->tag == StreamEvent::LOG_STATUS) {
element->ProcessLogStatus();
if (element->OutputEventQueue() != nullptr) {
xQueueSendToFront(element->OutputEventQueue(), &new_event, 0);
} else {
delete new_event;
}
} else {
// This isn't an event that needs to be actioned immediately. Add it
// to our work queue.
pending_events.emplace_back(new_event);
ESP_LOGD(kTag, "deferring event");
}
// Loop again, so that we service all incoming events before doing our
// possibly expensive processing.
continue;
}
if (element->HasUnflushedOutput()) {
ESP_LOGD(kTag, "flushing output");
std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
std::size_t max_inputs =
(*std::max_element(elements.begin(), elements.end(),
[](Pipeline const* first, Pipeline const* second) {
return first->NumInputs() < second->NumInputs();
}))
->NumInputs();
// We need to be able to simultaneously map all of an element's inputs, plus
// its output. So preallocate that many ranges.
std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs);
MappableRegion<kPipelineBufferSize> out_region;
std::for_each(in_regions.begin(), in_regions.end(),
[](const MappableRegion<kBufferSize>& region) {
assert(region.is_valid);
});
assert(out_region.is_valid);
// Each element has exactly one output buffer.
std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size());
std::vector<StreamInfo> buffer_infos(buffers.size());
std::for_each(buffers.begin(), buffers.end(),
[](const HimemAlloc<kPipelineBufferSize>& alloc) {
assert(alloc.is_valid);
});
bool playing = true;
bool quit = false;
while (!quit) {
// TODO: full event here?
Command cmd;
bool has_cmd = xQueueReceive(input, &cmd, 0);
if (has_cmd) {
switch (cmd) {
case PLAY:
playing = true;
break;
case PAUSE:
playing = false;
break;
case QUIT:
quit = true;
break;
}
// We have no new events. Next, see if there's anything that needs to be
// flushed.
if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) {
// We had things to flush, but couldn't send it all. This probably
// implies that the downstream element is having issues servicing its
// input queue, so hold off for a moment before retrying.
ESP_LOGW(kTag, "failed to flush buffered output");
vTaskDelay(pdMS_TO_TICKS(100));
continue;
}
if (element->HasUnprocessedInput()) {
ESP_LOGD(kTag, "processing input events");
element->Process();
continue;
if (quit) {
break;
}
// The element ran out of data, so now it's time to let it process more
// input.
while (!pending_events.empty()) {
std::unique_ptr<StreamEvent> event;
pending_events.front().swap(event);
pending_events.pop_front();
ESP_LOGD(kTag, "processing event, tag %i", event->tag);
if (event->tag == StreamEvent::STREAM_INFO) {
ESP_LOGD(kTag, "processing stream info");
element->ProcessStreamInfo(*event->stream_info);
} else if (event->tag == StreamEvent::ARENA_CHUNK) {
ESP_LOGD(kTag, "processing arena data");
memory::ArenaRef ref(event->arena_chunk);
auto callback =
StreamEvent::CreateChunkNotification(element->InputEventQueue());
if (!xQueueSend(event->source, &callback, 0)) {
ESP_LOGW(kTag, "failed to send chunk notif");
continue;
if (playing) {
for (int i = 0; i < elements.size(); i++) {
std::vector<MutableStream> in_streams;
elements.at(i)->InStreams(&in_regions, &in_streams);
MutableStream out_stream = elements.at(i)->OutStream(&out_region);
// Crop the input and output streams to the ranges that are safe to
// touch. For the input streams, this is the region that contains
// data. For the output stream, this is the region that does *not*
// already contain data.
std::vector<Stream> cropped_in_streams;
std::for_each(in_streams.begin(), in_streams.end(),
[&](MutableStream& s) {
cropped_in_streams.emplace_back(
s.info, s.data.first(s.info->bytes_in_stream));
});
elements.at(i)->OutputElement()->Process(&cropped_in_streams,
&out_stream);
for (int stream = 0; stream < in_streams.size(); stream++) {
MutableStream& orig_stream = in_streams.at(stream);
Stream& cropped_stream = cropped_in_streams.at(stream);
std::move(cropped_stream.data.begin(), cropped_stream.data.end(),
orig_stream.data.begin());
orig_stream.info->bytes_in_stream =
cropped_stream.data.size_bytes();
}
// TODO(jacqueline): Consider giving the element a full ArenaRef here,
// so that it can hang on to it and potentially save an alloc+copy.
element->ProcessChunk({ref.ptr.start, ref.ptr.used_size});
// TODO: think about whether to do the whole queue
break;
}
}
}
@ -165,4 +145,6 @@ void AudioTaskMain(void* args) {
vTaskDelete(NULL);
}
} // namespace task
} // namespace audio

@ -7,6 +7,8 @@
#include "arena.hpp"
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "ff.h"
#include "freertos/portmacro.h"
#include "audio_element.hpp"
@ -15,43 +17,23 @@
#include "stream_event.hpp"
#include "stream_info.hpp"
#include "stream_message.hpp"
#include "types.hpp"
static const char* kTag = "SRC";
namespace audio {
static const std::size_t kChunkSize = 24 * 1024;
static const std::size_t kChunkReadahead = 2;
FatfsAudioInput::FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage)
: IAudioElement(),
arena_(kChunkSize, kChunkReadahead, MALLOC_CAP_SPIRAM),
storage_(storage),
current_file_(),
is_file_open_(false) {}
FatfsAudioInput::FatfsAudioInput()
: IAudioElement(), current_file_(), is_file_open_(false) {}
FatfsAudioInput::~FatfsAudioInput() {}
auto FatfsAudioInput::HasUnprocessedInput() -> bool {
return is_file_open_;
}
auto FatfsAudioInput::IsOverBuffered() -> bool {
return arena_.BlocksFree() == 0;
}
auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info) -> void {
auto FatfsAudioInput::OpenFile(const std::string& path) -> void {
if (is_file_open_) {
f_close(&current_file_);
is_file_open_ = false;
}
if (!info.path) {
// TODO(jacqueline): Handle errors.
return;
}
ESP_LOGI(kTag, "opening file %s", info.path->c_str());
std::string path = *info.path;
ESP_LOGI(kTag, "opening file %s", path.c_str());
FRESULT res = f_open(&current_file_, path.c_str(), FA_READ);
if (res != FR_OK) {
ESP_LOGE(kTag, "failed to open file! res: %i", res);
@ -60,52 +42,31 @@ auto FatfsAudioInput::ProcessStreamInfo(const StreamInfo& info) -> void {
}
is_file_open_ = true;
StreamInfo new_info(info);
new_info.chunk_size = kChunkSize;
ESP_LOGI(kTag, "chunk size: %u bytes", kChunkSize);
auto event = StreamEvent::CreateStreamInfo(input_events_, new_info);
SendOrBufferEvent(std::unique_ptr<StreamEvent>(event));
}
auto FatfsAudioInput::ProcessChunk(const cpp::span<std::byte>& chunk) -> void {}
auto FatfsAudioInput::ProcessEndOfStream() -> void {
if (is_file_open_) {
f_close(&current_file_);
is_file_open_ = false;
SendOrBufferEvent(std::unique_ptr<StreamEvent>(
StreamEvent::CreateEndOfStream(input_events_)));
}
}
auto FatfsAudioInput::Process() -> void {
if (is_file_open_) {
auto dest_block = memory::ArenaRef::Acquire(&arena_);
if (!dest_block) {
auto FatfsAudioInput::Process(std::vector<Stream>* inputs,
MutableStream* output) -> void {
if (!is_file_open_) {
return;
}
FRESULT result = f_read(&current_file_, dest_block->ptr.start,
dest_block->ptr.size, &dest_block->ptr.used_size);
FRESULT result =
f_read(&current_file_, output->data.data(), output->data.size_bytes(),
&output->info->bytes_in_stream);
if (result != FR_OK) {
ESP_LOGE(kTag, "file I/O error %d", result);
// TODO(jacqueline): Handle errors.
return;
}
if (dest_block->ptr.used_size < dest_block->ptr.size ||
// TODO: read from filename?
output->info->data = StreamInfo::Encoded{codecs::STREAM_MP3};
if (output->info->bytes_in_stream < output->data.size_bytes() ||
f_eof(&current_file_)) {
f_close(&current_file_);
is_file_open_ = false;
}
auto dest_event = std::unique_ptr<StreamEvent>(
StreamEvent::CreateArenaChunk(input_events_, dest_block->Release()));
SendOrBufferEvent(std::move(dest_event));
}
}
} // namespace audio

@ -1,6 +1,7 @@
#include "i2s_audio_output.hpp"
#include <algorithm>
#include <variant>
#include "esp_err.h"
#include "freertos/portmacro.h"
@ -10,14 +11,12 @@
#include "freertos/projdefs.h"
#include "gpio_expander.hpp"
#include "result.hpp"
#include "stream_info.hpp"
static const TickType_t kIdleTimeBeforeMute = pdMS_TO_TICKS(1000);
static const char* kTag = "I2SOUT";
namespace audio {
static const std::size_t kDmaQueueLength = 8;
auto I2SAudioOutput::create(drivers::GpioExpander* expander)
-> cpp::result<std::shared_ptr<I2SAudioOutput>, Error> {
// First, we need to perform initial configuration of the DAC chip.
@ -38,40 +37,26 @@ auto I2SAudioOutput::create(drivers::GpioExpander* expander)
I2SAudioOutput::I2SAudioOutput(drivers::GpioExpander* expander,
std::unique_ptr<drivers::AudioDac> dac)
: expander_(expander),
dac_(std::move(dac)),
chunk_reader_(),
latest_chunk_() {}
: expander_(expander), dac_(std::move(dac)), current_config_() {}
I2SAudioOutput::~I2SAudioOutput() {}
auto I2SAudioOutput::HasUnprocessedInput() -> bool {
return latest_chunk_.size() > 0;
}
auto I2SAudioOutput::IsOverBuffered() -> bool {
auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> bool {
if (!std::holds_alternative<StreamInfo::Pcm>(info.data)) {
return false;
}
auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> void {
// TODO(jacqueline): probs do something with the channel hey
if (!info.bits_per_sample || !info.sample_rate) {
ESP_LOGE(kTag, "audio stream missing bits or sample rate");
return;
}
if (!info.chunk_size) {
ESP_LOGE(kTag, "audio stream missing chunk size");
return;
StreamInfo::Pcm pcm = std::get<StreamInfo::Pcm>(info.data);
if (current_config_ && pcm == *current_config_) {
return true;
}
chunk_reader_.emplace(*info.chunk_size);
ESP_LOGI(kTag, "incoming audio stream: %u bpp @ %u Hz", *info.bits_per_sample,
*info.sample_rate);
ESP_LOGI(kTag, "incoming audio stream: %u bpp @ %u Hz", pcm.bits_per_sample,
pcm.sample_rate);
drivers::AudioDac::BitsPerSample bps;
switch (*info.bits_per_sample) {
switch (pcm.bits_per_sample) {
case 16:
bps = drivers::AudioDac::BPS_16;
break;
@ -83,11 +68,11 @@ auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> void {
break;
default:
ESP_LOGE(kTag, "dropping stream with unknown bps");
return;
return false;
}
drivers::AudioDac::SampleRate sample_rate;
switch (*info.sample_rate) {
switch (pcm.sample_rate) {
case 44100:
sample_rate = drivers::AudioDac::SAMPLE_RATE_44_1;
break;
@ -96,37 +81,25 @@ auto I2SAudioOutput::ProcessStreamInfo(const StreamInfo& info) -> void {
break;
default:
ESP_LOGE(kTag, "dropping stream with unknown rate");
return;
return false;
}
dac_->Reconfigure(bps, sample_rate);
}
auto I2SAudioOutput::ProcessChunk(const cpp::span<std::byte>& chunk) -> void {
latest_chunk_ = chunk_reader_->HandleNewData(chunk);
}
// TODO(jacqueline): probs do something with the channel hey
auto I2SAudioOutput::ProcessEndOfStream() -> void {
dac_->Stop();
SendOrBufferEvent(std::unique_ptr<StreamEvent>(
StreamEvent::CreateEndOfStream(input_events_)));
}
dac_->Reconfigure(bps, sample_rate);
current_config_ = pcm;
auto I2SAudioOutput::ProcessLogStatus() -> void {
dac_->LogStatus();
return true;
}
auto I2SAudioOutput::Process() -> void {
// Note: avoid logging here! We need to get bytes from the chunk buffer into
// the I2S DMA buffer as fast as possible, to avoid running out of samples.
std::size_t bytes_written = dac_->WriteData(latest_chunk_);
if (bytes_written == latest_chunk_.size_bytes()) {
latest_chunk_ = cpp::span<std::byte>();
chunk_reader_->HandleBytesLeftOver(0);
} else {
latest_chunk_ = latest_chunk_.subspan(bytes_written);
auto I2SAudioOutput::Process(std::vector<Stream>* inputs, MutableStream* output)
-> void {
std::for_each(inputs->begin(), inputs->end(), [&](Stream& s) {
if (ProcessStreamInfo(s.info)) {
std::size_t bytes_written = dac_->WriteData(s.data);
s.data = s.data.subspan(bytes_written);
}
return;
});
}
auto I2SAudioOutput::SetVolume(uint8_t volume) -> void {

@ -3,6 +3,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>
#include "chunk.hpp"
#include "ff.h"
@ -10,6 +11,7 @@
#include "audio_element.hpp"
#include "codec.hpp"
#include "stream_info.hpp"
namespace audio {
@ -22,28 +24,21 @@ class AudioDecoder : public IAudioElement {
AudioDecoder();
~AudioDecoder();
auto StackSizeBytes() const -> std::size_t override { return 10 * 1024; };
auto HasUnprocessedInput() -> bool override;
auto IsOverBuffered() -> bool override;
auto ProcessStreamInfo(const StreamInfo& info) -> void override;
auto ProcessChunk(const cpp::span<std::byte>& chunk) -> void override;
auto ProcessEndOfStream() -> void override;
auto Process() -> void override;
auto Process(std::vector<Stream>* inputs, MutableStream* output)
-> void override;
AudioDecoder(const AudioDecoder&) = delete;
AudioDecoder& operator=(const AudioDecoder&) = delete;
private:
memory::Arena arena_;
std::unique_ptr<codecs::ICodec> current_codec_;
std::optional<StreamInfo> stream_info_;
std::optional<ChunkReader> chunk_reader_;
bool has_sent_stream_info_;
bool has_set_stream_info_;
bool has_samples_to_send_;
bool needs_more_input_;
auto ProcessStreamInfo(const StreamInfo& info) -> bool;
};
} // namespace audio

@ -40,62 +40,8 @@ class IAudioElement {
IAudioElement();
virtual ~IAudioElement();
/*
* Returns the stack size in bytes that this element requires. This should
* be tuned according to the observed stack size of each element, as different
* elements have fairly different stack requirements (particular decoders).
*/
virtual auto StackSizeBytes() const -> std::size_t { return 4096; };
/* Returns this element's input buffer. */
auto InputEventQueue() const -> QueueHandle_t { return input_events_; }
/* Returns this element's output buffer. */
auto OutputEventQueue() const -> QueueHandle_t { return output_events_; }
auto OutputEventQueue(const QueueHandle_t q) -> void { output_events_ = q; }
virtual auto HasUnprocessedInput() -> bool = 0;
virtual auto IsOverBuffered() -> bool { return false; }
auto HasUnflushedOutput() -> bool { return !buffered_output_.empty(); }
auto FlushBufferedOutput() -> bool;
/*
* Called when a StreamInfo message is received. Used to configure this
* element in preperation for incoming chunks.
*/
virtual auto ProcessStreamInfo(const StreamInfo& info) -> void = 0;
/*
* Called when a ChunkHeader message is received. Includes the data associated
* with this chunk of stream data. This method should return the number of
* bytes in this chunk that were actually used; leftover bytes will be
* prepended to the next call.
*/
virtual auto ProcessChunk(const cpp::span<std::byte>& chunk) -> void = 0;
virtual auto ProcessEndOfStream() -> void = 0;
virtual auto ProcessLogStatus() -> void {}
/*
* Called when there has been no data received over the input buffer for some
* time. This could be used to synthesize output, or to save memory by
* releasing unused resources.
*/
virtual auto Process() -> void = 0;
protected:
auto SendOrBufferEvent(std::unique_ptr<StreamEvent> event) -> bool;
// Queue for events coming into this element. Owned by us.
QueueHandle_t input_events_;
// Queue for events going into the next element. Not owned by us, may be null
// if we're not yet in a pipeline.
// FIXME: it would be nicer if this was non-nullable.
QueueHandle_t output_events_;
// Output events that have been generated, but are yet to be sent downstream.
std::deque<std::unique_ptr<StreamEvent>> buffered_output_;
virtual auto Process(std::vector<Stream>* inputs, MutableStream* output)
-> void = 0;
};
} // namespace audio

@ -5,7 +5,9 @@
#include <string>
#include <vector>
#include "audio_task.hpp"
#include "esp_err.h"
#include "fatfs_audio_input.hpp"
#include "result.hpp"
#include "span.hpp"
@ -23,12 +25,10 @@ namespace audio {
class AudioPlayback {
public:
enum Error { ERR_INIT_ELEMENT, ERR_MEM };
static auto create(drivers::GpioExpander* expander,
std::shared_ptr<drivers::SdStorage> storage)
static auto create(drivers::GpioExpander* expander)
-> cpp::result<std::unique_ptr<AudioPlayback>, Error>;
// TODO(jacqueline): configure on the fly once we have things to configure.
AudioPlayback();
AudioPlayback(FatfsAudioInput *file_input);
~AudioPlayback();
/*
@ -44,9 +44,10 @@ class AudioPlayback {
AudioPlayback& operator=(const AudioPlayback&) = delete;
private:
auto ConnectElements(IAudioElement* src, IAudioElement* sink) -> void;
FatfsAudioInput *file_source;
QueueHandle_t input_handle_;
std::vector<std::unique_ptr<IAudioElement>> all_elements_;
std::unique_ptr<task::Handle> pipeline_;
};
} // namespace audio

@ -6,17 +6,38 @@
#include "audio_element.hpp"
#include "freertos/portmacro.h"
#include "pipeline.hpp"
namespace audio {
namespace task {
struct AudioTaskArgs {
std::shared_ptr<IAudioElement>& element;
Pipeline* pipeline;
QueueHandle_t input;
};
auto StartAudioTask(const std::string& name,
std::optional<BaseType_t> core_id,
std::shared_ptr<IAudioElement> element) -> void;
extern "C" void AudioTaskMain(void* args);
void AudioTaskMain(void* args);
enum Command { PLAY, PAUSE, QUIT };
class Handle {
public:
explicit Handle(QueueHandle_t input);
~Handle();
auto SetStreamInfo() -> void;
auto Play() -> void;
auto Pause() -> void;
auto Quit() -> void;
auto OutputBuffer() -> StreamBufferHandle_t;
private:
QueueHandle_t input;
};
auto Start(Pipeline* pipeline) -> Handle*;
} // namespace task
} // namespace audio

@ -3,41 +3,36 @@
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include "arena.hpp"
#include "chunk.hpp"
#include "freertos/FreeRTOS.h"
#include "ff.h"
#include "freertos/message_buffer.h"
#include "freertos/queue.h"
#include "span.hpp"
#include "audio_element.hpp"
#include "storage.hpp"
#include "stream_buffer.hpp"
namespace audio {
class FatfsAudioInput : public IAudioElement {
public:
explicit FatfsAudioInput(std::shared_ptr<drivers::SdStorage> storage);
explicit FatfsAudioInput();
~FatfsAudioInput();
auto HasUnprocessedInput() -> bool override;
auto IsOverBuffered() -> bool override;
auto OpenFile(const std::string& path) -> void;
auto ProcessStreamInfo(const StreamInfo& info) -> void override;
auto ProcessChunk(const cpp::span<std::byte>& chunk) -> void override;
auto ProcessEndOfStream() -> void override;
auto Process() -> void override;
auto Process(std::vector<Stream>* inputs, MutableStream* output)
-> void override;
FatfsAudioInput(const FatfsAudioInput&) = delete;
FatfsAudioInput& operator=(const FatfsAudioInput&) = delete;
private:
memory::Arena arena_;
std::shared_ptr<drivers::SdStorage> storage_;
FIL current_file_;
bool is_file_open_;
};

@ -2,6 +2,7 @@
#include <cstdint>
#include <memory>
#include <vector>
#include "audio_element.hpp"
#include "chunk.hpp"
@ -9,6 +10,7 @@
#include "dac.hpp"
#include "gpio_expander.hpp"
#include "stream_info.hpp"
namespace audio {
@ -22,14 +24,8 @@ class I2SAudioOutput : public IAudioElement {
std::unique_ptr<drivers::AudioDac> dac);
~I2SAudioOutput();
auto HasUnprocessedInput() -> bool override;
auto IsOverBuffered() -> bool override;
auto ProcessStreamInfo(const StreamInfo& info) -> void override;
auto ProcessChunk(const cpp::span<std::byte>& chunk) -> void override;
auto ProcessEndOfStream() -> void override;
auto ProcessLogStatus() -> void override;
auto Process() -> void override;
auto Process(std::vector<Stream>* inputs, MutableStream* output)
-> void override;
I2SAudioOutput(const I2SAudioOutput&) = delete;
I2SAudioOutput& operator=(const I2SAudioOutput&) = delete;
@ -40,8 +36,9 @@ class I2SAudioOutput : public IAudioElement {
drivers::GpioExpander* expander_;
std::unique_ptr<drivers::AudioDac> dac_;
std::optional<ChunkReader> chunk_reader_;
cpp::span<std::byte> latest_chunk_;
std::optional<StreamInfo::Pcm> current_config_;
auto ProcessStreamInfo(const StreamInfo& info) -> bool;
};
} // namespace audio

@ -0,0 +1,42 @@
#pragma once
#include <memory>
#include <optional>
#include <string>
#include "freertos/portmacro.h"
#include "audio_element.hpp"
#include "himem.hpp"
#include "stream_info.hpp"
namespace audio {
static const std::size_t kPipelineBufferSize = 32 * 1024;
class Pipeline {
public:
Pipeline(IAudioElement* output);
~Pipeline();
auto AddInput(IAudioElement* input) -> Pipeline*;
auto OutputElement() const -> IAudioElement*;
auto NumInputs() const -> std::size_t;
auto InStreams(std::vector<MappableRegion<kPipelineBufferSize>>*,
std::vector<MutableStream>*) -> void;
auto OutStream(MappableRegion<kPipelineBufferSize>*) -> MutableStream;
auto GetIterationOrder() -> std::vector<Pipeline*>;
private:
IAudioElement* root_;
std::vector<std::unique_ptr<Pipeline>> subtrees_;
HimemAlloc<kPipelineBufferSize> output_buffer_;
StreamInfo output_info_;
};
} // namespace audio

@ -4,19 +4,68 @@
#include <optional>
#include <string>
#include <string_view>
#include <variant>
#include "cbor.h"
#include "result.hpp"
#include "sys/_stdint.h"
#include "span.hpp"
#include "types.hpp"
namespace audio {
struct StreamInfo {
std::optional<std::string> path;
std::optional<uint8_t> channels;
std::optional<uint8_t> bits_per_sample;
std::optional<uint16_t> sample_rate;
std::optional<size_t> chunk_size;
// The number of bytes that are available for consumption within this
// stream's buffer.
std::size_t bytes_in_stream;
// The total length of this stream, in case its source is finite (e.g. a
// file on disk). May be absent for endless streams (internet streams,
// generated audio, etc.)
std::optional<std::size_t> length_bytes;
struct Encoded {
// The codec that this stream is associated with.
codecs::StreamType type;
bool operator==(const Encoded&) const = default;
};
struct Pcm {
// Number of channels in this stream.
uint8_t channels;
// Number of bits per sample.
uint8_t bits_per_sample;
// The sample rate.
uint16_t sample_rate;
bool operator==(const Pcm&) const = default;
};
std::variant<Encoded, Pcm> data;
bool operator==(const StreamInfo&) const = default;
};
class MutableStream {
public:
StreamInfo* info;
cpp::span<std::byte> data;
MutableStream(StreamInfo* i, cpp::span<std::byte> d)
: info(i), data(d) {}
};
/*
* A byte buffer + associated metadata, which is not allowed to modify any of
* the underlying data.
*/
class Stream {
public:
explicit Stream(const MutableStream& s) : info(*s.info), data(s.data) {}
const StreamInfo& info;
// `data` itself left mutable for signalling how much of the stream was
// consumed
cpp::span<const std::byte> data;
};
} // namespace audio

@ -0,0 +1,52 @@
#include "pipeline.hpp"
#include "stream_info.hpp"
namespace audio {
Pipeline::Pipeline(IAudioElement* output) : root_(output), subtrees_() {}
Pipeline::~Pipeline() {}
auto Pipeline::AddInput(IAudioElement* input) -> Pipeline* {
subtrees_.emplace_back(input);
return subtrees_.back().get();
}
auto Pipeline::OutputElement() const -> IAudioElement* {
return root_;
}
auto Pipeline::NumInputs() const -> std::size_t {
return subtrees_.size();
}
auto Pipeline::InStreams(
std::vector<MappableRegion<kPipelineBufferSize>>* regions,
std::vector<MutableStream>* out) -> void {
for (int i = 0; i < subtrees_.size(); i++) {
MutableStream s = subtrees_[i]->OutStream(&regions->at(i));
out->push_back(s);
}
}
auto Pipeline::OutStream(MappableRegion<kPipelineBufferSize>* region)
-> MutableStream {
return {&output_info_, region->Map(output_buffer_)};
}
auto Pipeline::GetIterationOrder() -> std::vector<Pipeline*> {
std::vector<Pipeline*> to_search{this};
std::vector<Pipeline*> found;
while (!to_search.empty()) {
Pipeline* current = to_search.back();
to_search.pop_back();
found.push_back(current);
to_search.insert(to_search.end(), current->subtrees_.begin(),
current->subtrees_.end());
}
return found;
}
} // namespace audio

@ -10,6 +10,7 @@
#include "result.hpp"
#include "span.hpp"
#include "types.hpp"
namespace codecs {
@ -17,7 +18,7 @@ class ICodec {
public:
virtual ~ICodec() {}
virtual auto CanHandleFile(const std::string& path) -> bool = 0;
virtual auto CanHandleType(StreamType type) -> bool = 0;
struct OutputFormat {
uint8_t num_channels;
@ -31,7 +32,7 @@ class ICodec {
virtual auto ResetForNewStream() -> void = 0;
virtual auto SetInput(cpp::span<std::byte> input) -> void = 0;
virtual auto SetInput(cpp::span<const std::byte> input) -> void = 0;
/*
* Returns the codec's next read position within the input buffer. If the
@ -63,7 +64,7 @@ class ICodec {
enum CreateCodecError { UNKNOWN_EXTENSION };
auto CreateCodecForFile(const std::string& file)
auto CreateCodecForType(StreamType type)
-> cpp::result<std::unique_ptr<ICodec>, CreateCodecError>;
} // namespace codecs

@ -192,7 +192,8 @@ auto AudioDac::Reconfigure(BitsPerSample bps, SampleRate rate) -> void {
WriteRegister(Register::POWER_MODE, 0);
}
auto AudioDac::WriteData(cpp::span<std::byte> data) -> std::size_t {
auto AudioDac::WriteData(const cpp::span<const std::byte>& data)
-> std::size_t {
std::size_t bytes_written = 0;
esp_err_t err = i2s_channel_write(i2s_handle_, data.data(), data.size_bytes(),
&bytes_written, 0);

@ -71,7 +71,7 @@ class AudioDac {
// TODO(jacqueline): worth supporting channels here as well?
auto Reconfigure(BitsPerSample bps, SampleRate rate) -> void;
auto WriteData(cpp::span<std::byte> data) -> std::size_t;
auto WriteData(const cpp::span<const std::byte>& data) -> std::size_t;
auto Stop() -> void;
auto LogStatus() -> void;

@ -1,2 +1,2 @@
idf_component_register(SRCS "arena.cpp" INCLUDE_DIRS "include" REQUIRES "span")
idf_component_register(SRCS "arena.cpp" INCLUDE_DIRS "include" REQUIRES "span" "esp_psram")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})

@ -0,0 +1,78 @@
#pragma once
#include <cstddef>
#include <cstdint>
#include "esp32/himem.h"
#include "span.hpp"
/*
* Wrapper around an ESP-IDF himem allocation, which uses RAII to clean up after
* itself.
*/
template <std::size_t size>
class HimemAlloc {
public:
esp_himem_handle_t handle;
const bool is_valid;
HimemAlloc() : is_valid(esp_himem_alloc(size, &handle) == ESP_OK) {}
~HimemAlloc() {
if (is_valid) {
esp_himem_free(handle);
}
}
// Not copyable or movable.
HimemAlloc(const HimemAlloc&) = delete;
HimemAlloc& operator=(const HimemAlloc&) = delete;
};
/*
* Wrapper around an ESP-IDF himem allocation, which maps a HimemAlloc into the
* usable address space. Instances always contain the last memory region that
* was mapped within them.
*/
template <std::size_t size>
class MappableRegion {
private:
std::byte* bytes_;
public:
esp_himem_rangehandle_t range_handle;
const bool is_valid;
MappableRegion()
: bytes_(nullptr),
is_valid(esp_himem_alloc_map_range(size, &range_handle) == ESP_OK) {}
~MappableRegion() {
if (bytes_ != nullptr) {
esp_himem_unmap(range_handle, bytes_, size);
}
if (is_valid) {
esp_himem_free_map_range(range_handle);
}
}
auto Get() -> cpp::span<std::byte> {
if (bytes_ != nullptr) {
return {};
}
return {bytes_, size};
}
auto Map(const HimemAlloc<size> &alloc) -> cpp::span<std::byte> {
if (bytes_ != nullptr) {
ESP_ERROR_CHECK(esp_himem_unmap(range_handle, bytes_, size));
}
ESP_ERROR_CHECK(esp_himem_map(alloc.handle, range_handle, 0, 0, size, 0,
reinterpret_cast<void**>(&bytes_)));
return Get();
}
// Not copyable or movable.
MappableRegion(const MappableRegion&) = delete;
MappableRegion& operator=(const MappableRegion&) = delete;
};
Loading…
Cancel
Save