From d8fc77101dcf80a3643a00b3446dca1e390ce997 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Thu, 10 Aug 2023 15:33:00 +1000 Subject: [PATCH] Give codecs complete control of their input files --- src/audio/CMakeLists.txt | 1 + src/audio/audio_fsm.cpp | 4 + src/audio/audio_task.cpp | 274 ++++------------------ src/audio/fatfs_audio_input.cpp | 294 +++++------------------- src/audio/fatfs_source.cpp | 70 ++++++ src/audio/fatfs_source.hpp | 44 ++++ src/audio/i2s_audio_output.cpp | 25 +- src/audio/include/audio_fsm.hpp | 1 + src/audio/include/audio_sink.hpp | 13 +- src/audio/include/audio_source.hpp | 24 +- src/audio/include/audio_task.hpp | 27 +-- src/audio/include/fatfs_audio_input.hpp | 79 +------ src/audio/include/i2s_audio_output.hpp | 6 +- src/audio/include/sink_mixer.hpp | 38 ++- src/audio/sink_mixer.cpp | 263 +++++++++++---------- src/codecs/CMakeLists.txt | 1 + src/codecs/codec.cpp | 4 +- src/codecs/foxenflac.cpp | 77 ++++--- src/codecs/include/codec.hpp | 46 +++- src/codecs/include/foxenflac.hpp | 19 +- src/codecs/include/mad.hpp | 31 ++- src/codecs/include/opus.hpp | 33 +-- src/codecs/include/source_buffer.hpp | 37 +++ src/codecs/include/vorbis.hpp | 32 +-- src/codecs/mad.cpp | 245 +++++++++----------- src/codecs/opus.cpp | 135 +++++++---- src/codecs/source_buffer.cpp | 75 ++++++ src/codecs/vorbis.cpp | 103 ++++----- src/database/database.cpp | 4 +- src/database/include/track.hpp | 10 +- src/database/tag_parser.cpp | 12 +- src/tasks/tasks.cpp | 4 +- 32 files changed, 923 insertions(+), 1108 deletions(-) create mode 100644 src/audio/fatfs_source.cpp create mode 100644 src/audio/fatfs_source.hpp create mode 100644 src/codecs/include/source_buffer.hpp create mode 100644 src/codecs/source_buffer.cpp diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt index bd4ba32d..02e84c3f 100644 --- a/src/audio/CMakeLists.txt +++ b/src/audio/CMakeLists.txt @@ -6,6 +6,7 @@ idf_component_register( SRCS "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp" "stream_message.cpp" "i2s_audio_output.cpp" "stream_buffer.cpp" "track_queue.cpp" "stream_event.cpp" "stream_info.cpp" "audio_fsm.cpp" "sink_mixer.cpp" "resample.cpp" + "fatfs_source.cpp" INCLUDE_DIRS "include" REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory" "tinyfsm" "database" "system_fsm" "playlist") diff --git a/src/audio/audio_fsm.cpp b/src/audio/audio_fsm.cpp index 79080c9a..617272b3 100644 --- a/src/audio/audio_fsm.cpp +++ b/src/audio/audio_fsm.cpp @@ -97,6 +97,10 @@ void Standby::react(const PlayFile& ev) { sFileSource->SetPath(ev.filename); } +void Playback::react(const PlayFile& ev) { + sFileSource->SetPath(ev.filename); +} + void Standby::react(const internal::InputFileOpened& ev) { transit(); } diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index 046df378..d880e6b1 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -46,6 +46,7 @@ #include "stream_message.hpp" #include "sys/_stdint.h" #include "tasks.hpp" +#include "track.hpp" #include "types.hpp" #include "ui_fsm.hpp" @@ -53,7 +54,7 @@ namespace audio { static const char* kTag = "audio_dec"; -static constexpr std::size_t kSampleBufferSize = 16 * 1024; +static constexpr std::size_t kCodecBufferLength = 240 * 4; Timer::Timer(const StreamInfo::Pcm& format, const Duration& duration) : format_(format), current_seconds_(0), current_sample_in_second_(0) { @@ -120,260 +121,69 @@ AudioTask::AudioTask(IAudioSource* source, IAudioSink* sink) : source_(source), sink_(sink), codec_(), - mixer_(new SinkMixer(sink->stream())), + mixer_(new SinkMixer(sink)), timer_(), - has_begun_decoding_(false), - current_input_format_(), - current_output_format_(), - codec_buffer_(new RawStream(kSampleBufferSize, MALLOC_CAP_8BIT)) {} + current_format_() { + codec_buffer_ = { + reinterpret_cast(heap_caps_calloc( + kCodecBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)), + kCodecBufferLength}; +} void AudioTask::Main() { for (;;) { - source_->Read( - [this](IAudioSource::Flags flags, InputStream& stream) -> void { - if (flags.is_start()) { - has_begun_decoding_ = false; - if (!HandleNewStream(stream)) { - return; - } - } - - auto pcm = stream.info().format_as(); - if (pcm) { - if (ForwardPcmStream(*pcm, stream.data())) { - stream.consume(stream.data().size_bytes()); - } - return; - } - - if (!stream.info().format_as() || !codec_) { - // Either unknown stream format, or it's encoded but we don't have - // a decoder that supports it. Either way, bail out. - return; - } - - if (!has_begun_decoding_) { - if (BeginDecoding(stream)) { - has_begun_decoding_ = true; - } else { - return; - } - } - - // At this point the decoder has been initialised, and the sink has - // been correctly configured. All that remains is to throw samples - // into the sink as fast as possible. - if (!ContinueDecoding(stream)) { - codec_.reset(); - } + if (source_->HasNewStream() || !stream_) { + std::shared_ptr new_stream = source_->NextStream(); + if (new_stream && BeginDecoding(new_stream)) { + stream_ = new_stream; + } else { + continue; + } + } - if (flags.is_end()) { - FinishDecoding(stream); - events::Audio().Dispatch(internal::InputFileFinished{}); - } - }, - portMAX_DELAY); + if (ContinueDecoding()) { + events::Audio().Dispatch(internal::InputFileFinished{}); + stream_.reset(); + } } } -auto AudioTask::HandleNewStream(const InputStream& stream) -> bool { - // This must be a new stream of data. Reset everything to prepare to - // handle it. - current_input_format_ = stream.info().format(); - codec_.reset(); - - // What kind of data does this new stream contain? - auto pcm = stream.info().format_as(); - auto encoded = stream.info().format_as(); - if (pcm) { - // It's already decoded! We can always handle this. - return true; - } else if (encoded) { - // The stream has some kind of encoding. Whether or not we can - // handle it is entirely down to whether or not we have a codec for - // it. - has_begun_decoding_ = false; - auto codec = codecs::CreateCodecForType(encoded->type); - if (codec) { - ESP_LOGI(kTag, "successfully created codec for stream"); - codec_.reset(*codec); - return true; - } else { - ESP_LOGE(kTag, "stream has unknown encoding"); - return false; - } - } else { - // programmer error / skill issue :( - ESP_LOGE(kTag, "stream has unknown format"); +auto AudioTask::BeginDecoding(std::shared_ptr stream) -> bool { + codec_.reset(codecs::CreateCodecForType(stream->type()).value_or(nullptr)); + if (!codec_) { + ESP_LOGE(kTag, "no codec found"); return false; } -} - -auto AudioTask::BeginDecoding(InputStream& stream) -> bool { - auto res = codec_->BeginStream(stream.data()); - stream.consume(res.first); - if (res.second.has_error()) { - if (res.second.error() == codecs::ICodec::Error::kOutOfInput) { - // Running out of input is fine; just return and we will try beginning the - // stream again when we have more data. - return false; - } - // Decoding the header failed, so we can't actually deal with this stream - // after all. It could be malformed. - ESP_LOGE(kTag, "error beginning stream"); - codec_.reset(); + auto open_res = codec_->OpenStream(stream); + if (open_res.has_error()) { + ESP_LOGE(kTag, "codec failed to start: %s", + codecs::ICodec::ErrorString(open_res.error()).c_str()); return false; } - codecs::ICodec::OutputFormat format = res.second.value(); - StreamInfo::Pcm new_format{ - .channels = format.num_channels, + current_sink_format_ = IAudioSink::Format{ + .sample_rate = open_res->sample_rate_hz, + .num_channels = open_res->num_channels, .bits_per_sample = 32, - .sample_rate = format.sample_rate_hz, }; - - Duration duration; - if (format.duration_seconds) { - duration.src = Duration::Source::kCodec; - duration.duration = *format.duration_seconds; - } else if (stream.info().total_length_seconds()) { - duration.src = Duration::Source::kLibTags; - duration.duration = *stream.info().total_length_seconds(); - } else { - duration.src = Duration::Source::kFileSize; - duration.duration = *stream.info().total_length_bytes(); - } - - if (!ConfigureSink(new_format, duration)) { - return false; - } - - OutputStream writer{codec_buffer_.get()}; - writer.prepare(new_format, {}); - - return true; -} - -auto AudioTask::ContinueDecoding(InputStream& stream) -> bool { - while (!stream.data().empty()) { - OutputStream writer{codec_buffer_.get()}; - - auto res = - codec_->ContinueStream(stream.data(), writer.data_as()); - - stream.consume(res.first); - - if (res.second.has_error()) { - if (res.second.error() == codecs::ICodec::Error::kOutOfInput) { - return true; - } else { - return false; - } - } else { - writer.add(res.second->samples_written * sizeof(sample::Sample)); - - InputStream reader{codec_buffer_.get()}; - SendToSink(reader); - } - } + ESP_LOGI(kTag, "stream started ok"); + events::Audio().Dispatch(internal::InputFileOpened{}); return true; } -auto AudioTask::FinishDecoding(InputStream& stream) -> void { - // HACK: libmad requires each frame passed to it to have an additional - // MAD_HEADER_GUARD (8) bytes after the end of the frame. Without these extra - // bytes, it will not decode the frame. - // The is fine for most of the stream, but at the end of the stream we don't - // get a trailing 8 bytes for free. - if (stream.info().format_as()->type == - codecs::StreamType::kMp3) { - ESP_LOGI(kTag, "applying MAD_HEADER_GUARD fix"); - - std::unique_ptr mad_buffer; - mad_buffer.reset(new RawStream(stream.data().size_bytes() + 8)); - - OutputStream mad_writer{mad_buffer.get()}; - std::copy(stream.data().begin(), stream.data().end(), - mad_writer.data().begin()); - std::fill(mad_writer.data().begin(), mad_writer.data().end(), std::byte{0}); - InputStream padded_stream{mad_buffer.get()}; - - OutputStream writer{codec_buffer_.get()}; - auto res = - codec_->ContinueStream(stream.data(), writer.data_as()); - if (res.second.has_error()) { - return; - } - - writer.add(res.second->samples_written * sizeof(sample::Sample)); - - InputStream reader{codec_buffer_.get()}; - SendToSink(reader); - } -} - -auto AudioTask::ForwardPcmStream(StreamInfo::Pcm& format, - cpp::span samples) -> bool { - // First we need to reconfigure the sink for this sample format. - if (format != current_output_format_) { - Duration d{ - .src = Duration::Source::kFileSize, - .duration = samples.size_bytes(), - }; - if (!ConfigureSink(format, d)) { - return false; - } +auto AudioTask::ContinueDecoding() -> bool { + auto res = codec_->DecodeTo(codec_buffer_); + if (res.has_error()) { + return true; } - // Stream the raw samples directly to the sink. - xStreamBufferSend(sink_->stream(), samples.data(), samples.size_bytes(), - portMAX_DELAY); - timer_->AddBytes(samples.size_bytes()); - InputStream reader{codec_buffer_.get()}; - SendToSink(reader); - - return true; -} - -auto AudioTask::ConfigureSink(const StreamInfo::Pcm& format, - const Duration& duration) -> bool { - if (format != current_output_format_) { - current_output_format_ = format; - StreamInfo::Pcm new_sink_format = sink_->PrepareFormat(format); - if (new_sink_format != current_sink_format_) { - current_sink_format_ = new_sink_format; - - // The new format is different to the old one. Wait for the sink to drain - // before continuing. - while (!xStreamBufferIsEmpty(sink_->stream())) { - ESP_LOGI(kTag, "waiting for sink stream to drain..."); - // TODO(jacqueline): Get the sink drain ISR to notify us of this - // via semaphore instead of busy-ish waiting. - vTaskDelay(pdMS_TO_TICKS(10)); - } - - ESP_LOGI(kTag, "configuring sink"); - sink_->Configure(new_sink_format); - } + if (res->samples_written > 0) { + mixer_->MixAndSend(codec_buffer_.first(res->samples_written), + current_sink_format_.value(), res->is_stream_finished); } - current_output_format_ = format; - timer_.reset(new Timer(format, duration)); - return true; -} - -auto AudioTask::SendToSink(InputStream& stream) -> void { - std::size_t bytes_to_send = stream.data().size_bytes(); - std::size_t bytes_sent; - if (stream.info().format_as() == current_sink_format_) { - bytes_sent = xStreamBufferSend(sink_->stream(), stream.data().data(), - bytes_to_send, portMAX_DELAY); - stream.consume(bytes_sent); - } else { - bytes_sent = mixer_->MixAndSend(stream, current_sink_format_.value()); - } - timer_->AddBytes(bytes_sent); + return res->is_stream_finished; } } // namespace audio diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index d5194821..cae552db 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -19,8 +19,10 @@ #include #include +#include "codec.hpp" #include "esp_heap_caps.h" #include "esp_log.h" +#include "fatfs_source.hpp" #include "ff.h" #include "audio_events.hpp" @@ -41,294 +43,124 @@ static const char* kTag = "SRC"; namespace audio { -static constexpr UINT kFileBufferSize = 8 * 1024; -static constexpr UINT kStreamerBufferSize = 64 * 1024; - -static StreamBufferHandle_t sForwardDest = nullptr; - -auto forward_cb(const BYTE* buf, UINT buf_length) -> UINT { - if (buf_length == 0) { - return !xStreamBufferIsFull(sForwardDest); - } else { - return xStreamBufferSend(sForwardDest, buf, buf_length, 0); - } -} - -FileStreamer::FileStreamer(StreamBufferHandle_t dest, - SemaphoreHandle_t data_was_read) - : control_(xQueueCreate(1, sizeof(Command))), - destination_(dest), - data_was_read_(data_was_read), - has_data_(false), - file_(), - next_file_() { - assert(sForwardDest == nullptr); - sForwardDest = dest; - tasks::StartPersistent([this]() { Main(); }); -} - -FileStreamer::~FileStreamer() { - sForwardDest = nullptr; - Command quit = kQuit; - xQueueSend(control_, &quit, portMAX_DELAY); - vQueueDelete(control_); -} - -auto FileStreamer::Main() -> void { - for (;;) { - Command cmd; - xQueueReceive(control_, &cmd, portMAX_DELAY); - - if (cmd == kQuit) { - break; - } else if (cmd == kRestart) { - CloseFile(); - xStreamBufferReset(destination_); - file_ = std::move(next_file_); - has_data_ = file_ != nullptr; - } else if (cmd == kRefillBuffer && file_) { - UINT bytes_sent = 0; // Unused. - // Use f_forward to push bytes directly from FATFS internal buffers into - // the destination. This has the nice side effect of letting FATFS decide - // the most efficient way to pull in data from disk; usually one whole - // sector at a time. Consult the FATFS lib application notes if changing - // this to use f_read. - FRESULT res = f_forward(file_.get(), forward_cb, UINT_MAX, &bytes_sent); - if (res != FR_OK || f_eof(file_.get())) { - CloseFile(); - has_data_ = false; - } - if (bytes_sent > 0) { - xSemaphoreGive(data_was_read_); - } - } - } - - ESP_LOGW(kTag, "quit file streamer"); - CloseFile(); - vTaskDelete(NULL); -} - -auto FileStreamer::Fetch() -> void { - if (!has_data_.load()) { - return; - } - Command refill = kRefillBuffer; - xQueueSend(control_, &refill, portMAX_DELAY); -} - -auto FileStreamer::HasFinished() -> bool { - return !has_data_.load(); -} - -auto FileStreamer::Restart(std::unique_ptr new_file) -> void { - next_file_ = std::move(new_file); - Command restart = kRestart; - xQueueSend(control_, &restart, portMAX_DELAY); - Command fill = kRefillBuffer; - xQueueSend(control_, &fill, portMAX_DELAY); -} - -auto FileStreamer::CloseFile() -> void { - if (!file_) { - return; - } - ESP_LOGI(kTag, "closing file"); - f_close(file_.get()); - file_ = {}; - events::Audio().Dispatch(internal::InputFileClosed{}); -} - FatfsAudioInput::FatfsAudioInput( std::shared_ptr tag_parser) : IAudioSource(), tag_parser_(tag_parser), - has_data_(xSemaphoreCreateBinary()), - streamer_buffer_(xStreamBufferCreateWithCaps(kStreamerBufferSize, - 1, - MALLOC_CAP_SPIRAM)), - streamer_(new FileStreamer(streamer_buffer_, has_data_)), - input_buffer_(new RawStream(kFileBufferSize)), - source_mutex_(), - pending_path_(), - is_first_read_(false) {} + new_stream_mutex_(), + new_stream_(), + has_new_stream_(xSemaphoreCreateBinary()), + pending_path_() {} FatfsAudioInput::~FatfsAudioInput() { - streamer_.reset(); - vStreamBufferDelete(streamer_buffer_); - vSemaphoreDelete(has_data_); + vSemaphoreDelete(has_new_stream_); } auto FatfsAudioInput::SetPath(std::future> fut) -> void { - std::lock_guard lock{source_mutex_}; - - CloseCurrentFile(); + std::lock_guard guard{new_stream_mutex_}; pending_path_.reset( new database::FutureFetcher>(std::move(fut))); - xSemaphoreGive(has_data_); + xSemaphoreGive(has_new_stream_); } auto FatfsAudioInput::SetPath(const std::string& path) -> void { - std::lock_guard lock{source_mutex_}; - - CloseCurrentFile(); - OpenFile(path); + std::lock_guard guard{new_stream_mutex_}; + if (OpenFile(path)) { + xSemaphoreGive(has_new_stream_); + } } auto FatfsAudioInput::SetPath() -> void { - std::lock_guard lock{source_mutex_}; - CloseCurrentFile(); + std::lock_guard guard{new_stream_mutex_}; + new_stream_.reset(); + xSemaphoreGive(has_new_stream_); } -auto FatfsAudioInput::Read(std::function read_cb, - TickType_t max_wait) -> void { - // Wait until we have data to return. - xSemaphoreTake(has_data_, portMAX_DELAY); - - // Ensure the file doesn't change whilst we're trying to get data about it. - std::lock_guard source_lock{source_mutex_}; - - // If the path is a future, then wait for it to complete. - // TODO(jacqueline): We should really make some kind of FreeRTOS-integrated - // way to block a task whilst awaiting a future. - if (pending_path_) { - while (!pending_path_->Finished()) { - vTaskDelay(pdMS_TO_TICKS(100)); - } - auto res = pending_path_->Result(); - pending_path_.reset(); - - if (res && *res) { - OpenFile(**res); - } - - // Bail out now that we've resolved the future. If we end up successfully - // readinig from the path, then has_data will be flagged again. - return; +auto FatfsAudioInput::HasNewStream() -> bool { + bool res = xSemaphoreTake(has_new_stream_, 0); + if (res) { + xSemaphoreGive(has_new_stream_); } + return res; +} + +auto FatfsAudioInput::NextStream() -> std::shared_ptr { + while (true) { + xSemaphoreTake(has_new_stream_, portMAX_DELAY); + + { + std::lock_guard guard{new_stream_mutex_}; + // If the path is a future, then wait for it to complete. + // TODO(jacqueline): We should really make some kind of + // FreeRTOS-integrated way to block a task whilst awaiting a future. + if (pending_path_) { + while (!pending_path_->Finished()) { + vTaskDelay(pdMS_TO_TICKS(100)); + } + auto res = pending_path_->Result(); + pending_path_.reset(); + + if (res && *res) { + OpenFile(**res); + } + } - // Move data from the file streamer's buffer into our file buffer. We need our - // own buffer so that we can handle concatenating smaller file chunks into - // complete frames for the decoder. - OutputStream writer{input_buffer_.get()}; - std::size_t bytes_added = - xStreamBufferReceive(streamer_buffer_, writer.data().data(), - writer.data().size_bytes(), pdMS_TO_TICKS(0)); - writer.add(bytes_added); - - bool has_data_remaining = HasDataRemaining(); - - InputStream reader{input_buffer_.get()}; - auto data_for_cb = reader.data(); - if (!data_for_cb.empty()) { - std::invoke(read_cb, Flags{is_first_read_, !has_data_remaining}, reader); - is_first_read_ = false; - } + if (new_stream_ == nullptr) { + continue; + } - if (!has_data_remaining) { - // Out of data. We're finished. Note we don't care about anything left in - // the file buffer at this point; the callback as seen it, so if it didn't - // consume it then presumably whatever is left isn't enough to form a - // complete frame. - ESP_LOGI(kTag, "finished streaming file"); - CloseCurrentFile(); - } else { - // There is still data to be read, or sitting in the buffer. - streamer_->Fetch(); - xSemaphoreGive(has_data_); + auto stream = new_stream_; + new_stream_ = nullptr; + return stream; + } } } -auto FatfsAudioInput::OpenFile(const std::string& path) -> void { +auto FatfsAudioInput::OpenFile(const std::string& path) -> bool { ESP_LOGI(kTag, "opening file %s", path.c_str()); - FILINFO info; - if (f_stat(path.c_str(), &info) != FR_OK) { - ESP_LOGE(kTag, "failed to stat file"); - return; - } - database::TrackTags tags; if (!tag_parser_->ReadAndParseTags(path, &tags)) { ESP_LOGE(kTag, "failed to read tags"); - return; + return false; } auto stream_type = ContainerToStreamType(tags.encoding()); if (!stream_type.has_value()) { ESP_LOGE(kTag, "couldn't match container to stream"); - return; - } - - StreamInfo::Format format; - if (*stream_type == codecs::StreamType::kPcm) { - if (tags.channels && tags.bits_per_sample && tags.channels) { - format = StreamInfo::Pcm{ - .channels = static_cast(*tags.channels), - .bits_per_sample = static_cast(*tags.bits_per_sample), - .sample_rate = static_cast(*tags.sample_rate)}; - } else { - ESP_LOGW(kTag, "pcm stream missing format info"); - return; - } - } else { - format = StreamInfo::Encoded{.type = *stream_type}; + return false; } std::unique_ptr file = std::make_unique(); FRESULT res = f_open(file.get(), path.c_str(), FA_READ); if (res != FR_OK) { ESP_LOGE(kTag, "failed to open file! res: %i", res); - return; - } - - OutputStream writer{input_buffer_.get()}; - writer.prepare(format, info.fsize); - if (tags.duration) { - writer.info().total_length_seconds() = *tags.duration; + return false; } - streamer_->Restart(std::move(file)); - is_first_read_ = true; - events::Audio().Dispatch(internal::InputFileOpened{}); + new_stream_.reset(new FatfsSource(stream_type.value(), std::move(file))); + return true; } -auto FatfsAudioInput::CloseCurrentFile() -> void { - streamer_->Restart({}); - xStreamBufferReset(streamer_buffer_); -} - -auto FatfsAudioInput::HasDataRemaining() -> bool { - return !streamer_->HasFinished() || !xStreamBufferIsEmpty(streamer_buffer_); -} - -auto FatfsAudioInput::ContainerToStreamType(database::Encoding enc) +auto FatfsAudioInput::ContainerToStreamType(database::Container enc) -> std::optional { switch (enc) { - case database::Encoding::kMp3: + case database::Container::kMp3: return codecs::StreamType::kMp3; - case database::Encoding::kWav: + case database::Container::kWav: return codecs::StreamType::kPcm; - case database::Encoding::kOgg: + case database::Container::kOgg: return codecs::StreamType::kVorbis; - case database::Encoding::kFlac: + case database::Container::kFlac: return codecs::StreamType::kFlac; - case database::Encoding::kOpus: + case database::Container::kOpus: return codecs::StreamType::kOpus; - case database::Encoding::kUnsupported: + case database::Container::kUnsupported: default: return {}; } } -auto FatfsAudioInput::IsCurrentFormatMp3() -> bool { - auto format = input_buffer_->info().format_as(); - if (!format) { - return false; - } - return format->type == codecs::StreamType::kMp3; -} - } // namespace audio diff --git a/src/audio/fatfs_source.cpp b/src/audio/fatfs_source.cpp new file mode 100644 index 00000000..6a9aea47 --- /dev/null +++ b/src/audio/fatfs_source.cpp @@ -0,0 +1,70 @@ +/* + * Copyright 2023 jacqueline + * + * SPDX-License-Identifier: GPL-3.0-only + */ + +#include "fatfs_source.hpp" +#include + +#include +#include +#include + +#include "esp_log.h" +#include "ff.h" + +#include "audio_source.hpp" +#include "codec.hpp" +#include "types.hpp" + +namespace audio { + +static constexpr char kTag[] = "fatfs_src"; + +FatfsSource::FatfsSource(codecs::StreamType t, std::unique_ptr file) + : IStream(t), file_(std::move(file)) {} + +FatfsSource::~FatfsSource() { + f_close(file_.get()); +} + +auto FatfsSource::Read(cpp::span dest) -> ssize_t { + if (f_eof(file_.get())) { + ESP_LOGI(kTag, "read from empty file"); + return 0; + } + UINT bytes_read = 0; + FRESULT res = f_read(file_.get(), dest.data(), dest.size(), &bytes_read); + if (res != FR_OK) { + ESP_LOGE(kTag, "error reading from file"); + return -1; + } + ESP_LOGI(kTag, "read %u bytes into %p (%u)", bytes_read, dest.data(), + dest.size_bytes()); + return bytes_read; +} + +auto FatfsSource::CanSeek() -> bool { + return true; +} + +auto FatfsSource::SeekTo(int64_t destination, SeekFrom from) -> void { + ESP_LOGI(kTag, "seeking to %llu", destination); + switch (from) { + case SeekFrom::kStartOfStream: + f_lseek(file_.get(), destination); + break; + case SeekFrom::kEndOfStream: + f_lseek(file_.get(), f_size(file_.get()) + destination); + break; + case SeekFrom::kCurrentPosition: + f_lseek(file_.get(), f_tell(file_.get()) + destination); + break; + } +} + +auto FatfsSource::CurrentPosition() -> int64_t { + return f_tell(file_.get()); +} +} // namespace audio diff --git a/src/audio/fatfs_source.hpp b/src/audio/fatfs_source.hpp new file mode 100644 index 00000000..e4187d60 --- /dev/null +++ b/src/audio/fatfs_source.hpp @@ -0,0 +1,44 @@ +/* + * Copyright 2023 jacqueline + * + * SPDX-License-Identifier: GPL-3.0-only + */ + +#pragma once + +#include +#include +#include + +#include "codec.hpp" +#include "ff.h" + +#include "audio_source.hpp" + +namespace audio { + +/* + * Handles coordination with a persistent background task to asynchronously + * read files from disk into a StreamBuffer. + */ +class FatfsSource : public codecs::IStream { + public: + FatfsSource(codecs::StreamType, std::unique_ptr file); + ~FatfsSource(); + + auto Read(cpp::span dest) -> ssize_t override; + + auto CanSeek() -> bool override; + + auto SeekTo(int64_t destination, SeekFrom from) -> void override; + + auto CurrentPosition() -> int64_t override; + + FatfsSource(const FatfsSource&) = delete; + FatfsSource& operator=(const FatfsSource&) = delete; + + private: + std::unique_ptr file_; +}; + +} // namespace audio \ No newline at end of file diff --git a/src/audio/i2s_audio_output.cpp b/src/audio/i2s_audio_output.cpp index e53dbe2a..b7fcf104 100644 --- a/src/audio/i2s_audio_output.cpp +++ b/src/audio/i2s_audio_output.cpp @@ -115,26 +115,25 @@ auto I2SAudioOutput::AdjustVolumeDown() -> bool { return true; } -auto I2SAudioOutput::PrepareFormat(const StreamInfo::Pcm& orig) - -> StreamInfo::Pcm { - return StreamInfo::Pcm{ - .channels = std::min(orig.channels, 2), - .bits_per_sample = std::clamp(orig.bits_per_sample, 16, 32), +auto I2SAudioOutput::PrepareFormat(const Format& orig) -> Format { + return Format{ .sample_rate = std::clamp(orig.sample_rate, 8000, 96000), + .num_channels = std::min(orig.num_channels, 2), + .bits_per_sample = std::clamp(orig.bits_per_sample, 16, 32), }; } -auto I2SAudioOutput::Configure(const StreamInfo::Pcm& pcm) -> void { - if (current_config_ && pcm == *current_config_) { +auto I2SAudioOutput::Configure(const Format& fmt) -> void { + if (current_config_ && fmt == *current_config_) { ESP_LOGI(kTag, "ignoring unchanged format"); return; } - ESP_LOGI(kTag, "incoming audio stream: %u ch %u bpp @ %lu Hz", pcm.channels, - pcm.bits_per_sample, pcm.sample_rate); + ESP_LOGI(kTag, "incoming audio stream: %u ch %u bpp @ %lu Hz", + fmt.num_channels, fmt.bits_per_sample, fmt.sample_rate); drivers::I2SDac::Channels ch; - switch (pcm.channels) { + switch (fmt.num_channels) { case 1: ch = drivers::I2SDac::CHANNELS_MONO; break; @@ -147,7 +146,7 @@ auto I2SAudioOutput::Configure(const StreamInfo::Pcm& pcm) -> void { } drivers::I2SDac::BitsPerSample bps; - switch (pcm.bits_per_sample) { + switch (fmt.bits_per_sample) { case 16: bps = drivers::I2SDac::BPS_16; break; @@ -163,7 +162,7 @@ auto I2SAudioOutput::Configure(const StreamInfo::Pcm& pcm) -> void { } drivers::I2SDac::SampleRate sample_rate; - switch (pcm.sample_rate) { + switch (fmt.sample_rate) { case 8000: sample_rate = drivers::I2SDac::SAMPLE_RATE_8; break; @@ -188,7 +187,7 @@ auto I2SAudioOutput::Configure(const StreamInfo::Pcm& pcm) -> void { } dac_->Reconfigure(ch, bps, sample_rate); - current_config_ = pcm; + current_config_ = fmt; } } // namespace audio diff --git a/src/audio/include/audio_fsm.hpp b/src/audio/include/audio_fsm.hpp index cc3dae0e..d10f31e1 100644 --- a/src/audio/include/audio_fsm.hpp +++ b/src/audio/include/audio_fsm.hpp @@ -95,6 +95,7 @@ class Playback : public AudioState { void entry() override; void exit() override; + void react(const PlayFile&) override; void react(const QueueUpdate&) override; void react(const PlaybackUpdate&) override; diff --git a/src/audio/include/audio_sink.hpp b/src/audio/include/audio_sink.hpp index 28acdc31..2fb4bf63 100644 --- a/src/audio/include/audio_sink.hpp +++ b/src/audio/include/audio_sink.hpp @@ -7,6 +7,7 @@ #pragma once #include +#include #include "audio_element.hpp" #include "esp_heap_caps.h" #include "freertos/FreeRTOS.h" @@ -37,8 +38,16 @@ class IAudioSink { virtual auto AdjustVolumeUp() -> bool = 0; virtual auto AdjustVolumeDown() -> bool = 0; - virtual auto PrepareFormat(const StreamInfo::Pcm&) -> StreamInfo::Pcm = 0; - virtual auto Configure(const StreamInfo::Pcm& format) -> void = 0; + struct Format { + uint32_t sample_rate; + uint_fast8_t num_channels; + uint_fast8_t bits_per_sample; + + bool operator==(const Format&) const = default; + }; + + virtual auto PrepareFormat(const Format&) -> Format = 0; + virtual auto Configure(const Format& format) -> void = 0; auto stream() -> StreamBufferHandle_t { return stream_; } }; diff --git a/src/audio/include/audio_source.hpp b/src/audio/include/audio_source.hpp index 055a92cd..6c54a882 100644 --- a/src/audio/include/audio_source.hpp +++ b/src/audio/include/audio_source.hpp @@ -15,7 +15,10 @@ #include "freertos/portmacro.h" #include "freertos/semphr.h" +#include "codec.hpp" #include "stream_info.hpp" +#include "track.hpp" +#include "types.hpp" namespace audio { @@ -23,25 +26,8 @@ class IAudioSource { public: virtual ~IAudioSource() {} - class Flags { - public: - Flags(bool is_start, bool is_end) { - flags_[0] = is_start; - flags_[1] = is_end; - } - - auto is_start() -> bool { return flags_[0]; } - auto is_end() -> bool { return flags_[1]; } - - private: - std::bitset<2> flags_; - }; - - /* - * Synchronously fetches data from this source. - */ - virtual auto Read(std::function, TickType_t) - -> void = 0; + virtual auto HasNewStream() -> bool = 0; + virtual auto NextStream() -> std::shared_ptr = 0; }; } // namespace audio diff --git a/src/audio/include/audio_task.hpp b/src/audio/include/audio_task.hpp index b27aa039..48f5502c 100644 --- a/src/audio/include/audio_task.hpp +++ b/src/audio/include/audio_task.hpp @@ -16,6 +16,8 @@ #include "pipeline.hpp" #include "sink_mixer.hpp" #include "stream_info.hpp" +#include "track.hpp" +#include "types.hpp" namespace audio { @@ -52,32 +54,27 @@ class AudioTask { auto Main() -> void; + AudioTask(const AudioTask&) = delete; + AudioTask& operator=(const AudioTask&) = delete; + private: AudioTask(IAudioSource* source, IAudioSink* sink); - auto HandleNewStream(const InputStream&) -> bool; - - auto BeginDecoding(InputStream&) -> bool; - auto ContinueDecoding(InputStream&) -> bool; - auto FinishDecoding(InputStream&) -> void; - - auto ForwardPcmStream(StreamInfo::Pcm&, cpp::span) -> bool; - - auto ConfigureSink(const StreamInfo::Pcm&, const Duration&) -> bool; - auto SendToSink(InputStream&) -> void; + auto BeginDecoding(std::shared_ptr) -> bool; + auto ContinueDecoding() -> bool; IAudioSource* source_; IAudioSink* sink_; + + std::shared_ptr stream_; std::unique_ptr codec_; std::unique_ptr mixer_; std::unique_ptr timer_; - bool has_begun_decoding_; - std::optional current_input_format_; - std::optional current_output_format_; - std::optional current_sink_format_; + std::optional current_format_; + std::optional current_sink_format_; - std::unique_ptr codec_buffer_; + cpp::span codec_buffer_; }; } // namespace audio diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index e13e49e2..df40696a 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -12,6 +12,7 @@ #include #include +#include "codec.hpp" #include "ff.h" #include "audio_source.hpp" @@ -23,54 +24,6 @@ namespace audio { -/* - * Handles coordination with a persistent background task to asynchronously - * read files from disk into a StreamBuffer. - */ -class FileStreamer { - public: - FileStreamer(StreamBufferHandle_t dest, SemaphoreHandle_t first_read); - ~FileStreamer(); - - /* - * Continues reading data into the destination buffer until the destination - * is full. - */ - auto Fetch() -> void; - - /* Returns true if the streamer has run out of data from the current file. */ - auto HasFinished() -> bool; - - /* - * Clears any remaining buffered data, and begins reading again from the - * given file. This function respects any seeking/reading that has already - * been done on the new source file. - */ - auto Restart(std::unique_ptr) -> void; - - FileStreamer(const FileStreamer&) = delete; - FileStreamer& operator=(const FileStreamer&) = delete; - - private: - // Note: private methods here should only be called from the streamer's task. - - auto Main() -> void; - auto CloseFile() -> void; - - enum Command { - kRestart, - kRefillBuffer, - kQuit, - }; - QueueHandle_t control_; - StreamBufferHandle_t destination_; - SemaphoreHandle_t data_was_read_; - - std::atomic has_data_; - std::unique_ptr file_; - std::unique_ptr next_file_; -}; - /* * Audio source that fetches data from a FatFs (or exfat i guess) filesystem. * @@ -89,43 +42,27 @@ class FatfsAudioInput : public IAudioSource { auto SetPath(const std::string&) -> void; auto SetPath() -> void; - auto Read(std::function, TickType_t) - -> void override; + auto HasNewStream() -> bool override; + auto NextStream() -> std::shared_ptr override; FatfsAudioInput(const FatfsAudioInput&) = delete; FatfsAudioInput& operator=(const FatfsAudioInput&) = delete; private: - // Note: private methods assume that the appropriate locks have already been - // acquired. - - auto OpenFile(const std::string& path) -> void; - auto CloseCurrentFile() -> void; - auto HasDataRemaining() -> bool; + auto OpenFile(const std::string& path) -> bool; - auto ContainerToStreamType(database::Encoding) + auto ContainerToStreamType(database::Container) -> std::optional; - auto IsCurrentFormatMp3() -> bool; std::shared_ptr tag_parser_; - // Semaphore used to block when this source is out of data. This should be - // acquired before attempting to read data, and returned after each incomplete - // read. - SemaphoreHandle_t has_data_; - - StreamBufferHandle_t streamer_buffer_; - std::unique_ptr streamer_; - - std::unique_ptr input_buffer_; + std::mutex new_stream_mutex_; + std::shared_ptr new_stream_; - // Mutex guarding the current file/stream associated with this source. Must be - // held during readings, and before altering the current file. - std::mutex source_mutex_; + SemaphoreHandle_t has_new_stream_; std::unique_ptr>> pending_path_; - bool is_first_read_; }; } // namespace audio diff --git a/src/audio/include/i2s_audio_output.hpp b/src/audio/include/i2s_audio_output.hpp index e0f791c5..717e6519 100644 --- a/src/audio/include/i2s_audio_output.hpp +++ b/src/audio/include/i2s_audio_output.hpp @@ -35,8 +35,8 @@ class I2SAudioOutput : public IAudioSink { auto AdjustVolumeUp() -> bool override; auto AdjustVolumeDown() -> bool override; - auto PrepareFormat(const StreamInfo::Pcm&) -> StreamInfo::Pcm override; - auto Configure(const StreamInfo::Pcm& format) -> void override; + auto PrepareFormat(const Format&) -> Format override; + auto Configure(const Format& format) -> void override; I2SAudioOutput(const I2SAudioOutput&) = delete; I2SAudioOutput& operator=(const I2SAudioOutput&) = delete; @@ -45,7 +45,7 @@ class I2SAudioOutput : public IAudioSink { drivers::IGpios* expander_; std::shared_ptr dac_; - std::optional current_config_; + std::optional current_config_; int_fast8_t left_difference_; uint16_t current_volume_; uint16_t max_volume_; diff --git a/src/audio/include/sink_mixer.hpp b/src/audio/include/sink_mixer.hpp index d1e9aa8a..b4d25781 100644 --- a/src/audio/include/sink_mixer.hpp +++ b/src/audio/include/sink_mixer.hpp @@ -28,44 +28,42 @@ namespace audio { */ class SinkMixer { public: - SinkMixer(StreamBufferHandle_t dest); + SinkMixer(IAudioSink* sink); ~SinkMixer(); - auto MixAndSend(InputStream&, const StreamInfo::Pcm&) -> std::size_t; + auto MixAndSend(cpp::span, + const IAudioSink::Format& format, + bool is_eos) -> void; private: auto Main() -> void; auto SetTargetFormat(const StreamInfo::Pcm& format) -> void; - auto HandleBytes() -> void; + auto HandleSamples(cpp::span, bool) -> size_t; - auto Resample(InputStream&, OutputStream&) -> bool; auto ApplyDither(cpp::span samples, uint_fast8_t bits) -> void; - auto Downscale(cpp::span, cpp::span) -> void; - - enum class Command { - kReadBytes, - kSetSourceFormat, - kSetTargetFormat, - }; struct Args { - Command cmd; - StreamInfo::Pcm format; + IAudioSink::Format format; + size_t samples_available; + bool is_end_of_stream; }; - QueueHandle_t commands_; - SemaphoreHandle_t is_idle_; std::unique_ptr resampler_; - std::unique_ptr input_stream_; - std::unique_ptr resampled_stream_; - - StreamInfo::Pcm target_format_; StreamBufferHandle_t source_; - StreamBufferHandle_t sink_; + cpp::span input_buffer_; + cpp::span input_buffer_as_bytes_; + + cpp::span resampled_buffer_; + + IAudioSink* sink_; + IAudioSink::Format source_format_; + IAudioSink::Format target_format_; + size_t leftover_bytes_; + size_t leftover_offset_; }; } // namespace audio diff --git a/src/audio/sink_mixer.cpp b/src/audio/sink_mixer.cpp index 6c72c8b0..9f973d4b 100644 --- a/src/audio/sink_mixer.cpp +++ b/src/audio/sink_mixer.cpp @@ -7,6 +7,7 @@ #include "sink_mixer.hpp" #include +#include #include #include "esp_heap_caps.h" @@ -23,20 +24,28 @@ static constexpr char kTag[] = "mixer"; static constexpr std::size_t kSourceBufferLength = 8 * 1024; -static constexpr std::size_t kSampleBufferLength = 240 * 2 * sizeof(int32_t); +static constexpr std::size_t kSampleBufferLength = 240 * 2; namespace audio { -SinkMixer::SinkMixer(StreamBufferHandle_t dest) +SinkMixer::SinkMixer(IAudioSink* sink) : commands_(xQueueCreate(1, sizeof(Args))), - is_idle_(xSemaphoreCreateBinary()), resampler_(nullptr), source_(xStreamBufferCreateWithCaps(kSourceBufferLength, 1, MALLOC_CAP_SPIRAM)), - sink_(dest) { - input_stream_.reset(new RawStream(kSampleBufferLength)); - resampled_stream_.reset(new RawStream(kSampleBufferLength)); + sink_(sink) { + input_buffer_ = { + reinterpret_cast(heap_caps_calloc( + kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)), + kSampleBufferLength}; + input_buffer_as_bytes_ = {reinterpret_cast(input_buffer_.data()), + input_buffer_.size_bytes()}; + + resampled_buffer_ = { + reinterpret_cast(heap_caps_calloc( + kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)), + kSampleBufferLength}; // Pin to CORE0 because we need the FPU. // FIXME: A fixed point implementation could run freely on either core, @@ -46,168 +55,156 @@ SinkMixer::SinkMixer(StreamBufferHandle_t dest) SinkMixer::~SinkMixer() { vQueueDelete(commands_); - vSemaphoreDelete(is_idle_); vStreamBufferDelete(source_); } -auto SinkMixer::MixAndSend(InputStream& input, const StreamInfo::Pcm& target) - -> std::size_t { - if (input.info().format_as() != - input_stream_->info().format_as()) { - xSemaphoreTake(is_idle_, portMAX_DELAY); - Args args{ - .cmd = Command::kSetSourceFormat, - .format = input.info().format_as().value(), - }; - xQueueSend(commands_, &args, portMAX_DELAY); - xSemaphoreGive(is_idle_); - } - if (target_format_ != target) { - xSemaphoreTake(is_idle_, portMAX_DELAY); - Args args{ - .cmd = Command::kSetTargetFormat, - .format = target, - }; - xQueueSend(commands_, &args, portMAX_DELAY); - xSemaphoreGive(is_idle_); - } - +auto SinkMixer::MixAndSend(cpp::span input, + const IAudioSink::Format& format, + bool is_eos) -> void { Args args{ - .cmd = Command::kReadBytes, - .format = {}, + .format = format, + .samples_available = input.size(), + .is_end_of_stream = is_eos, }; xQueueSend(commands_, &args, portMAX_DELAY); - auto buf = input.data(); - std::size_t bytes_sent = - xStreamBufferSend(source_, buf.data(), buf.size_bytes(), portMAX_DELAY); - input.consume(bytes_sent); - return bytes_sent; + cpp::span input_as_bytes = { + reinterpret_cast(input.data()), input.size_bytes()}; + size_t bytes_sent = 0; + while (bytes_sent < input_as_bytes.size()) { + bytes_sent += + xStreamBufferSend(source_, input_as_bytes.subspan(bytes_sent).data(), + input_as_bytes.size() - bytes_sent, portMAX_DELAY); + } } auto SinkMixer::Main() -> void { - OutputStream input_receiver{input_stream_.get()}; - xSemaphoreGive(is_idle_); - for (;;) { Args args; while (!xQueueReceive(commands_, &args, portMAX_DELAY)) { } - switch (args.cmd) { - case Command::kSetSourceFormat: - ESP_LOGI(kTag, "setting source format"); - input_receiver.prepare(args.format, {}); - resampler_.reset(); - break; - case Command::kSetTargetFormat: - ESP_LOGI(kTag, "setting target format"); - target_format_ = args.format; - resampler_.reset(); - break; - case Command::kReadBytes: - xSemaphoreTake(is_idle_, 0); - while (!xStreamBufferIsEmpty(source_)) { - auto buf = input_receiver.data(); - std::size_t bytes_received = xStreamBufferReceive( - source_, buf.data(), buf.size_bytes(), portMAX_DELAY); - input_receiver.add(bytes_received); - HandleBytes(); + if (args.format != source_format_) { + resampler_.reset(); + source_format_ = args.format; + leftover_bytes_ = 0; + leftover_offset_ = 0; + + auto new_target = sink_->PrepareFormat(args.format); + if (new_target != target_format_) { + // The new format is different to the old one. Wait for the sink to + // drain before continuing. + while (!xStreamBufferIsEmpty(sink_->stream())) { + ESP_LOGI(kTag, "waiting for sink stream to drain..."); + // TODO(jacqueline): Get the sink drain ISR to notify us of this + // via semaphore instead of busy-ish waiting. + vTaskDelay(pdMS_TO_TICKS(10)); } - xSemaphoreGive(is_idle_); - break; + + ESP_LOGI(kTag, "configuring sink"); + sink_->Configure(new_target); + } + target_format_ = new_target; } - } -} -auto SinkMixer::HandleBytes() -> void { - InputStream input{input_stream_.get()}; - auto pcm = input.info().format_as(); - if (!pcm) { - ESP_LOGE(kTag, "mixer got unsupported data"); - return; + // Loop until we finish reading all the bytes indicated. There might be + // leftovers from each iteration, and from this process as a whole, + // depending on the resampling stage. + size_t bytes_read = 0; + size_t bytes_to_read = args.samples_available * sizeof(sample::Sample); + while (bytes_read < bytes_to_read) { + // First top up the input buffer, taking care not to overwrite anything + // remaining from a previous iteration. + size_t bytes_read_this_it = xStreamBufferReceive( + source_, + input_buffer_as_bytes_.subspan(leftover_offset_ + leftover_bytes_) + .data(), + std::min(input_buffer_as_bytes_.size() - leftover_offset_ - + leftover_bytes_, + bytes_to_read - bytes_read), + portMAX_DELAY); + bytes_read += bytes_read_this_it; + + // Calculate the number of whole samples that are now in the input buffer. + size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_; + size_t samples_in_buffer = bytes_in_buffer / sizeof(sample::Sample); + + size_t samples_used = HandleSamples( + input_buffer_.subspan(leftover_offset_).first(samples_in_buffer), + args.is_end_of_stream && bytes_read == bytes_to_read); + + // Maybe the resampler didn't consume everything. Maybe the last few + // bytes we read were half a frame. Either way, we need to calculate the + // size of the remainder in bytes. + size_t bytes_used = samples_used * sizeof(sample::Sample); + leftover_bytes_ = bytes_in_buffer - bytes_used; + if (leftover_bytes_ == 0) { + leftover_offset_ = 0; + } else { + leftover_offset_ += bytes_used; + } + } } +} - if (*pcm == target_format_) { +auto SinkMixer::HandleSamples(cpp::span input, bool is_eos) + -> size_t { + if (source_format_ == target_format_) { // The happiest possible case: the input format matches the output - // format already. Streams like this should probably have bypassed the - // mixer. - // TODO(jacqueline): Make this an error; it's slow to use the mixer in this - // case, compared to just writing directly to the sink. - auto buf = input.data(); - std::size_t bytes_sent = - xStreamBufferSend(sink_, buf.data(), buf.size_bytes(), portMAX_DELAY); - input.consume(bytes_sent); - return; + // format already. + std::size_t bytes_sent = xStreamBufferSend( + sink_->stream(), input.data(), input.size_bytes(), portMAX_DELAY); + return bytes_sent / sizeof(sample::Sample); } - while (input_stream_->info().bytes_in_stream() >= sizeof(sample::Sample)) { - RawStream* output_source; - if (pcm->sample_rate != target_format_.sample_rate) { - OutputStream resampled_writer{resampled_stream_.get()}; - if (Resample(input, resampled_writer)) { + size_t samples_used = 0; + while (input.size() < samples_used) { + cpp::span output_source; + if (source_format_.sample_rate != target_format_.sample_rate) { + if (resampler_ == nullptr) { + ESP_LOGI(kTag, "creating new resampler"); + resampler_.reset(new Resampler(source_format_.sample_rate, + target_format_.sample_rate, + source_format_.num_channels)); + } + + size_t read, written; + std::tie(read, written) = + resampler_->Process(input, resampled_buffer_, is_eos); + samples_used += read; + if (read == 0 && written == 0) { // Zero samples used or written. We need more input. break; } - output_source = resampled_stream_.get(); + output_source = resampled_buffer_.first(written); } else { - output_source = input_stream_.get(); + output_source = input; + samples_used = input.size(); } - size_t bytes_consumed = output_source->info().bytes_in_stream(); - size_t bytes_to_send = output_source->info().bytes_in_stream(); - if (target_format_.bits_per_sample == 16) { - // This is slightly scary; we're basically reaching into the internals of - // the stream buffer to do in-place conversion of samples. Saving an - // extra buffer + copy into that buffer is certainly worth it however. - cpp::span src = - output_source->data_as().first( - output_source->info().bytes_in_stream() / sizeof(sample::Sample)); - cpp::span dest{reinterpret_cast(src.data()), - src.size()}; - - ApplyDither(src, 16); - Downscale(src, dest); - - bytes_consumed = src.size_bytes(); - bytes_to_send = src.size_bytes() / 2; - } + // FIXME: The source should have some kind of hint indicating whether it + // needs dither, since some codecs (e.g. opus) apply their own dither. + ApplyDither(output_source, 16); + + cpp::span dest{reinterpret_cast(output_source.data()), + output_source.size()}; + for (size_t i = 0; i < output_source.size(); i++) { + dest[i] = sample::ToSigned16Bit(output_source[i]); + } - InputStream output{output_source}; - cpp::span buf = output.data(); + output_source = output_source.first(output_source.size() / 2); + } size_t bytes_sent = 0; + size_t bytes_to_send = output_source.size_bytes(); while (bytes_sent < bytes_to_send) { - auto cropped = buf.subspan(bytes_sent, bytes_to_send - bytes_sent); - bytes_sent += xStreamBufferSend(sink_, cropped.data(), - cropped.size_bytes(), portMAX_DELAY); + bytes_sent += xStreamBufferSend( + sink_->stream(), + reinterpret_cast(output_source.data()) + bytes_sent, + bytes_to_send - bytes_sent, portMAX_DELAY); } - output.consume(bytes_consumed); - } -} - -auto SinkMixer::Resample(InputStream& in, OutputStream& out) -> bool { - if (resampler_ == nullptr) { - ESP_LOGI(kTag, "creating new resampler"); - auto format = in.info().format_as(); - resampler_.reset(new Resampler( - format->sample_rate, target_format_.sample_rate, format->channels)); - } - - auto res = resampler_->Process(in.data_as(), - out.data_as(), false); - - in.consume(res.first * sizeof(sample::Sample)); - out.add(res.second * sizeof(sample::Sample)); - - return res.first == 0 && res.second == 0; -} - -auto SinkMixer::Downscale(cpp::span samples, - cpp::span output) -> void { - for (size_t i = 0; i < samples.size(); i++) { - output[i] = sample::ToSigned16Bit(samples[i]); } + return samples_used; } auto SinkMixer::ApplyDither(cpp::span samples, diff --git a/src/codecs/CMakeLists.txt b/src/codecs/CMakeLists.txt index 91d3f319..2d98198b 100644 --- a/src/codecs/CMakeLists.txt +++ b/src/codecs/CMakeLists.txt @@ -4,6 +4,7 @@ idf_component_register( SRCS "codec.cpp" "mad.cpp" "foxenflac.cpp" "opus.cpp" "vorbis.cpp" + "source_buffer.cpp" INCLUDE_DIRS "include" REQUIRES "result" "span" "libmad" "libfoxenflac" "tremor" "opusfile") diff --git a/src/codecs/codec.cpp b/src/codecs/codec.cpp index 9ac20097..a4c1a5cf 100644 --- a/src/codecs/codec.cpp +++ b/src/codecs/codec.cpp @@ -10,10 +10,10 @@ #include #include "foxenflac.hpp" -#include "opus.hpp" #include "mad.hpp" -#include "vorbis.hpp" +#include "opus.hpp" #include "types.hpp" +#include "vorbis.hpp" namespace codecs { diff --git a/src/codecs/foxenflac.cpp b/src/codecs/foxenflac.cpp index b676f82a..cc110920 100644 --- a/src/codecs/foxenflac.cpp +++ b/src/codecs/foxenflac.cpp @@ -19,23 +19,34 @@ namespace codecs { static const char kTag[] = "flac"; FoxenFlacDecoder::FoxenFlacDecoder() - : flac_(FX_FLAC_ALLOC(FLAC_MAX_BLOCK_SIZE, 2)) {} + : input_(), buffer_(), flac_(FX_FLAC_ALLOC(FLAC_MAX_BLOCK_SIZE, 2)) {} FoxenFlacDecoder::~FoxenFlacDecoder() { free(flac_); } -auto FoxenFlacDecoder::BeginStream(const cpp::span input) - -> Result { - uint32_t bytes_used = input.size_bytes(); - fx_flac_state_t state = - fx_flac_process(flac_, reinterpret_cast(input.data()), - &bytes_used, NULL, NULL); +auto FoxenFlacDecoder::OpenStream(std::shared_ptr input) + -> cpp::result { + input_ = input; + + bool eof = false; + fx_flac_state_t state; + do { + eof = buffer_.Refill(input_.get()); + buffer_.ConsumeBytes([&](cpp::span buf) -> size_t { + uint32_t bytes_used = buf.size(); + state = + fx_flac_process(flac_, reinterpret_cast(buf.data()), + &bytes_used, NULL, NULL); + return bytes_used; + }); + } while (state != FLAC_END_OF_METADATA && !eof); + if (state != FLAC_END_OF_METADATA) { if (state == FLAC_ERR) { - return {bytes_used, cpp::fail(Error::kMalformedData)}; + return cpp::fail(Error::kMalformedData); } else { - return {bytes_used, cpp::fail(Error::kOutOfInput)}; + return cpp::fail(Error::kOutOfInput); } } @@ -43,14 +54,12 @@ auto FoxenFlacDecoder::BeginStream(const cpp::span input) int64_t fs = fx_flac_get_streaminfo(flac_, FLAC_KEY_SAMPLE_RATE); if (channels == FLAC_INVALID_METADATA_KEY || fs == FLAC_INVALID_METADATA_KEY) { - return {bytes_used, cpp::fail(Error::kMalformedData)}; + return cpp::fail(Error::kMalformedData); } OutputFormat format{ .num_channels = static_cast(channels), .sample_rate_hz = static_cast(fs), - .duration_seconds = {}, - .bits_per_second = {}, }; uint64_t num_samples = fx_flac_get_streaminfo(flac_, FLAC_KEY_N_SAMPLES); @@ -58,38 +67,32 @@ auto FoxenFlacDecoder::BeginStream(const cpp::span input) format.duration_seconds = num_samples / fs; } - return {bytes_used, format}; + return format; } -auto FoxenFlacDecoder::ContinueStream(cpp::span input, - cpp::span output) - -> Result { - cpp::span output_as_samples{ - reinterpret_cast(output.data()), output.size_bytes() / 4}; - uint32_t bytes_read = input.size_bytes(); - uint32_t samples_written = output_as_samples.size(); - - fx_flac_state_t state = - fx_flac_process(flac_, reinterpret_cast(input.data()), - &bytes_read, output_as_samples.data(), &samples_written); - if (state == FLAC_ERR) { - return {bytes_read, cpp::fail(Error::kMalformedData)}; - } +auto FoxenFlacDecoder::DecodeTo(cpp::span output) + -> cpp::result { + bool is_eof = buffer_.Refill(input_.get()); - if (samples_written > 0) { - return {bytes_read, - OutputInfo{.samples_written = samples_written, - .is_finished_writing = state == FLAC_END_OF_FRAME}}; + fx_flac_state_t state; + uint32_t samples_written = output.size(); + + buffer_.ConsumeBytes([&](cpp::span buf) -> size_t { + uint32_t bytes_read = buf.size_bytes(); + state = fx_flac_process(flac_, reinterpret_cast(buf.data()), + &bytes_read, output.data(), &samples_written); + return bytes_read; + }); + if (state == FLAC_ERR) { + return cpp::fail(Error::kMalformedData); } - // No error, but no samples written. We must be out of data. - return {bytes_read, cpp::fail(Error::kOutOfInput)}; + return OutputInfo{.samples_written = samples_written, + .is_stream_finished = samples_written == 0 && is_eof}; } -auto FoxenFlacDecoder::SeekStream(cpp::span input, - std::size_t target_sample) -> Result { - // TODO(jacqueline): Implement me. - return {0, {}}; +auto FoxenFlacDecoder::SeekTo(size_t target) -> cpp::result { + return {}; } } // namespace codecs diff --git a/src/codecs/include/codec.hpp b/src/codecs/include/codec.hpp index 32ebef69..ece3d4fe 100644 --- a/src/codecs/include/codec.hpp +++ b/src/codecs/include/codec.hpp @@ -23,6 +23,34 @@ namespace codecs { +/* + * Interface for an abstract source of file-like data. + */ +class IStream { + public: + IStream(StreamType t) : t_(t) {} + virtual ~IStream() {} + + auto type() -> StreamType { return t_; } + + virtual auto Read(cpp::span dest) -> ssize_t = 0; + + virtual auto CanSeek() -> bool = 0; + + enum class SeekFrom { + kStartOfStream, + kEndOfStream, + kCurrentPosition, + }; + + virtual auto SeekTo(int64_t destination, SeekFrom from) -> void = 0; + + virtual auto CurrentPosition() -> int64_t = 0; + + protected: + StreamType t_; +}; + /* * Common interface to be implemented by all audio decoders. */ @@ -63,32 +91,30 @@ class ICodec { struct OutputFormat { uint8_t num_channels; uint32_t sample_rate_hz; - std::optional duration_seconds; - std::optional bits_per_second; + + bool operator==(const OutputFormat&) const = default; }; /* * Decodes metadata or headers from the given input stream, and returns the * format for the samples that will be decoded from it. */ - virtual auto BeginStream(cpp::span input) - -> Result = 0; + virtual auto OpenStream(std::shared_ptr input) + -> cpp::result = 0; struct OutputInfo { std::size_t samples_written; - bool is_finished_writing; + bool is_stream_finished; }; /* * Writes PCM samples to the given output buffer. */ - virtual auto ContinueStream(cpp::span input, - cpp::span output) - -> Result = 0; + virtual auto DecodeTo(cpp::span destination) + -> cpp::result = 0; - virtual auto SeekStream(cpp::span input, - std::size_t target_sample) -> Result = 0; + virtual auto SeekTo(size_t target_sample) -> cpp::result = 0; }; auto CreateCodecForType(StreamType type) -> std::optional; diff --git a/src/codecs/include/foxenflac.hpp b/src/codecs/include/foxenflac.hpp index abfa6d80..7522d967 100644 --- a/src/codecs/include/foxenflac.hpp +++ b/src/codecs/include/foxenflac.hpp @@ -15,6 +15,7 @@ #include "foxen/flac.h" #include "sample.hpp" +#include "source_buffer.hpp" #include "span.hpp" #include "codec.hpp" @@ -26,13 +27,21 @@ class FoxenFlacDecoder : public ICodec { FoxenFlacDecoder(); ~FoxenFlacDecoder(); - auto BeginStream(cpp::span) -> Result override; - auto ContinueStream(cpp::span, cpp::span) - -> Result override; - auto SeekStream(cpp::span input, std::size_t target_sample) - -> Result override; + auto OpenStream(std::shared_ptr input) + -> cpp::result override; + + auto DecodeTo(cpp::span destination) + -> cpp::result override; + + auto SeekTo(std::size_t target_sample) -> cpp::result override; + + FoxenFlacDecoder(const FoxenFlacDecoder&) = delete; + FoxenFlacDecoder& operator=(const FoxenFlacDecoder&) = delete; private: + std::shared_ptr input_; + SourceBuffer buffer_; + fx_flac_t* flac_; }; diff --git a/src/codecs/include/mad.hpp b/src/codecs/include/mad.hpp index b81e4acb..2a8813e9 100644 --- a/src/codecs/include/mad.hpp +++ b/src/codecs/include/mad.hpp @@ -14,6 +14,7 @@ #include "mad.h" #include "sample.hpp" +#include "source_buffer.hpp" #include "span.hpp" #include "codec.hpp" @@ -25,33 +26,31 @@ class MadMp3Decoder : public ICodec { MadMp3Decoder(); ~MadMp3Decoder(); - /* - * Returns the output format for the next frame in the stream. MP3 streams - * may represent multiple distinct tracks, with different bitrates, and so we - * handle the stream only on a frame-by-frame basis. - */ - auto BeginStream(cpp::span) -> Result override; + auto OpenStream(std::shared_ptr input) + -> cpp::result override; - /* - * Writes samples for the current frame. - */ - auto ContinueStream(cpp::span input, - cpp::span output) - -> Result override; + auto DecodeTo(cpp::span destination) + -> cpp::result override; - auto SeekStream(cpp::span input, std::size_t target_sample) - -> Result override; + auto SeekTo(std::size_t target_sample) -> cpp::result override; + + MadMp3Decoder(const MadMp3Decoder&) = delete; + MadMp3Decoder& operator=(const MadMp3Decoder&) = delete; private: auto GetVbrLength(const mad_header& header) -> std::optional; + auto GetBytesUsed() -> std::size_t; + + std::shared_ptr input_; + SourceBuffer buffer_; mad_stream stream_; mad_frame frame_; mad_synth synth_; int current_sample_; - - auto GetBytesUsed(std::size_t) -> std::size_t; + bool is_eof_; + bool is_eos_; }; } // namespace codecs diff --git a/src/codecs/include/opus.hpp b/src/codecs/include/opus.hpp index 051cd0b9..45b1b07a 100644 --- a/src/codecs/include/opus.hpp +++ b/src/codecs/include/opus.hpp @@ -26,30 +26,21 @@ class XiphOpusDecoder : public ICodec { XiphOpusDecoder(); ~XiphOpusDecoder(); - /* - * Returns the output format for the next frame in the stream. MP3 streams - * may represent multiple distinct tracks, with different bitrates, and so we - * handle the stream only on a frame-by-frame basis. - */ - auto BeginStream(cpp::span) -> Result override; - - /* - * Writes samples for the current frame. - */ - auto ContinueStream(cpp::span input, - cpp::span output) - -> Result override; - - auto SeekStream(cpp::span input, std::size_t target_sample) - -> Result override; - - auto ReadCallback() -> cpp::span; - auto AfterReadCallback(size_t bytes_read) -> void; + auto OpenStream(std::shared_ptr input) + -> cpp::result override; + + auto DecodeTo(cpp::span destination) + -> cpp::result override; + + auto SeekTo(std::size_t target_sample) -> cpp::result override; + + XiphOpusDecoder(const XiphOpusDecoder&) = delete; + XiphOpusDecoder& operator=(const XiphOpusDecoder&) = delete; private: + std::shared_ptr input_; OggOpusFile* opus_; - cpp::span input_; - size_t pos_in_input_; + uint8_t num_channels_; }; } // namespace codecs diff --git a/src/codecs/include/source_buffer.hpp b/src/codecs/include/source_buffer.hpp new file mode 100644 index 00000000..d0d7635a --- /dev/null +++ b/src/codecs/include/source_buffer.hpp @@ -0,0 +1,37 @@ +/* + * Copyright 2023 jacqueline + * + * SPDX-License-Identifier: GPL-3.0-only + */ + +#pragma once + +#include +#include +#include + +#include "span.hpp" + +#include "codec.hpp" + +namespace codecs { + +class SourceBuffer { + public: + SourceBuffer(); + ~SourceBuffer(); + + auto Refill(IStream* src) -> bool; + auto AddBytes(std::function)> writer) -> void; + auto ConsumeBytes(std::function)> reader) -> void; + + SourceBuffer(const SourceBuffer&) = delete; + SourceBuffer& operator=(const SourceBuffer&) = delete; + + private: + const cpp::span buffer_; + size_t bytes_in_buffer_; + size_t offset_of_bytes_; +}; + +} // namespace codecs diff --git a/src/codecs/include/vorbis.hpp b/src/codecs/include/vorbis.hpp index ab15af19..2f93c37e 100644 --- a/src/codecs/include/vorbis.hpp +++ b/src/codecs/include/vorbis.hpp @@ -28,30 +28,20 @@ class TremorVorbisDecoder : public ICodec { TremorVorbisDecoder(); ~TremorVorbisDecoder(); - /* - * Returns the output format for the next frame in the stream. MP3 streams - * may represent multiple distinct tracks, with different bitrates, and so we - * handle the stream only on a frame-by-frame basis. - */ - auto BeginStream(cpp::span) -> Result override; - - /* - * Writes samples for the current frame. - */ - auto ContinueStream(cpp::span input, - cpp::span output) - -> Result override; - - auto SeekStream(cpp::span input, std::size_t target_sample) - -> Result override; - - auto ReadCallback() -> cpp::span; - auto AfterReadCallback(size_t bytes_read) -> void; + auto OpenStream(std::shared_ptr input) + -> cpp::result override; + + auto DecodeTo(cpp::span destination) + -> cpp::result override; + + auto SeekTo(std::size_t target_sample) -> cpp::result override; + + TremorVorbisDecoder(const TremorVorbisDecoder&) = delete; + TremorVorbisDecoder& operator=(const TremorVorbisDecoder&) = delete; private: + std::shared_ptr input_; OggVorbis_File vorbis_; - cpp::span input_; - size_t pos_in_input_; }; } // namespace codecs diff --git a/src/codecs/mad.cpp b/src/codecs/mad.cpp index a2739bcd..ce3a9cac 100644 --- a/src/codecs/mad.cpp +++ b/src/codecs/mad.cpp @@ -22,7 +22,10 @@ namespace codecs { -MadMp3Decoder::MadMp3Decoder() { +static constexpr char kTag[] = "mad"; + +MadMp3Decoder::MadMp3Decoder() + : input_(), buffer_(), current_sample_(-1), is_eof_(false), is_eos_(false) { mad_stream_init(&stream_); mad_frame_init(&frame_); mad_synth_init(&synth_); @@ -33,185 +36,145 @@ MadMp3Decoder::~MadMp3Decoder() { mad_synth_finish(&synth_); } -auto MadMp3Decoder::GetBytesUsed(std::size_t buffer_size) -> std::size_t { +auto MadMp3Decoder::GetBytesUsed() -> std::size_t { if (stream_.next_frame) { - std::size_t remaining = stream_.bufend - stream_.next_frame; - return buffer_size - remaining; + return stream_.next_frame - stream_.buffer; } else { return stream_.bufend - stream_.buffer; } } -auto MadMp3Decoder::BeginStream(const cpp::span input) - -> Result { - mad_stream_buffer(&stream_, - reinterpret_cast(input.data()), - input.size_bytes()); - // Whatever was last synthesized is now invalid, so ensure we don't try to - // send it. - current_sample_ = -1; +auto MadMp3Decoder::OpenStream(std::shared_ptr input) + -> cpp::result { + input_ = input; // To get the output format for MP3 streams, we simply need to decode the // first frame header. mad_header header; mad_header_init(&header); - while (mad_header_decode(&header, &stream_) < 0) { - if (MAD_RECOVERABLE(stream_.error)) { - // Recoverable errors are usually malformed parts of the stream. - // We can recover from them by just retrying the decode. - continue; - } - if (stream_.error == MAD_ERROR_BUFLEN) { - return {GetBytesUsed(input.size_bytes()), cpp::fail(Error::kOutOfInput)}; - } - return {GetBytesUsed(input.size_bytes()), cpp::fail(Error::kMalformedData)}; + bool eof = false; + bool got_header = false; + while (!eof && !got_header) { + eof = buffer_.Refill(input_.get()); + + buffer_.ConsumeBytes([&](cpp::span buf) -> size_t { + mad_stream_buffer(&stream_, + reinterpret_cast(buf.data()), + buf.size_bytes()); + + while (mad_header_decode(&header, &stream_) < 0) { + if (MAD_RECOVERABLE(stream_.error)) { + // Recoverable errors are usually malformed parts of the stream. + // We can recover from them by just retrying the decode. + continue; + } + if (stream_.error == MAD_ERROR_BUFLEN) { + return GetBytesUsed(); + } + eof = true; + return 0; + } + + got_header = true; + return GetBytesUsed(); + }); + } + + if (!got_header) { + return cpp::fail(ICodec::Error::kMalformedData); } uint8_t channels = MAD_NCHANNELS(&header); OutputFormat output{ .num_channels = channels, .sample_rate_hz = header.samplerate, - .duration_seconds = {}, - .bits_per_second = {}, }; auto vbr_length = GetVbrLength(header); if (vbr_length) { output.duration_seconds = vbr_length; - } else { - output.bits_per_second = header.bitrate; } - - return {GetBytesUsed(input.size_bytes()), output}; + return output; } -auto MadMp3Decoder::ContinueStream(cpp::span input, - cpp::span output) - -> Result { - std::size_t bytes_read = 0; - if (current_sample_ < 0) { - mad_stream_buffer(&stream_, - reinterpret_cast(input.data()), - input.size()); - - // Decode the next frame. To signal errors, this returns -1 and - // stashes an error code in the stream structure. - while (mad_frame_decode(&frame_, &stream_) < 0) { - if (MAD_RECOVERABLE(stream_.error)) { - // Recoverable errors are usually malformed parts of the stream. - // We can recover from them by just retrying the decode. - continue; - } - if (stream_.error == MAD_ERROR_BUFLEN) { - // The decoder ran out of bytes before it completed a frame. We - // need to return back to the caller to give us more data. - return {GetBytesUsed(input.size_bytes()), - cpp::fail(Error::kOutOfInput)}; +auto MadMp3Decoder::DecodeTo(cpp::span output) + -> cpp::result { + if (current_sample_ < 0 && !is_eos_) { + if (!is_eof_) { + is_eof_ = buffer_.Refill(input_.get()); + if (is_eof_) { + buffer_.AddBytes([&](cpp::span buf) -> size_t { + if (buf.size() < 8) { + is_eof_ = false; + return 0; + } + ESP_LOGI(kTag, "adding MAD_HEADER_GUARD"); + std::fill_n(buf.begin(), 8, std::byte(0)); + return 8; + }); } - // The error is unrecoverable. Give up. - return {GetBytesUsed(input.size_bytes()), - cpp::fail(Error::kMalformedData)}; } - // We've successfully decoded a frame! Now synthesize samples to write out. - mad_synth_frame(&synth_, &frame_); - current_sample_ = 0; - bytes_read = GetBytesUsed(input.size_bytes()); + buffer_.ConsumeBytes([&](cpp::span buf) -> size_t { + mad_stream_buffer(&stream_, + reinterpret_cast(buf.data()), + buf.size()); + + // Decode the next frame. To signal errors, this returns -1 and + // stashes an error code in the stream structure. + while (mad_frame_decode(&frame_, &stream_) < 0) { + if (MAD_RECOVERABLE(stream_.error)) { + // Recoverable errors are usually malformed parts of the stream. + // We can recover from them by just retrying the decode. + continue; + } + if (stream_.error == MAD_ERROR_BUFLEN) { + if (is_eof_) { + ESP_LOGI(kTag, "BUFLEN while eof; this is eos"); + is_eos_ = true; + } + return GetBytesUsed(); + } + // The error is unrecoverable. Give up. + is_eof_ = true; + is_eos_ = true; + return 0; + } + + // We've successfully decoded a frame! Now synthesize samples to write + // out. + mad_synth_frame(&synth_, &frame_); + current_sample_ = 0; + return GetBytesUsed(); + }); } size_t output_sample = 0; - while (current_sample_ < synth_.pcm.length) { - if (output_sample + synth_.pcm.channels >= output.size()) { - // We can't fit the next full frame into the buffer. - return {bytes_read, OutputInfo{.samples_written = output_sample, - .is_finished_writing = false}}; - } + if (current_sample_ >= 0) { + while (current_sample_ < synth_.pcm.length) { + if (output_sample + synth_.pcm.channels >= output.size()) { + // We can't fit the next full frame into the buffer. + return OutputInfo{.samples_written = output_sample, + .is_stream_finished = false}; + } - for (int channel = 0; channel < synth_.pcm.channels; channel++) { - output[output_sample++] = - sample::FromMad(synth_.pcm.samples[channel][current_sample_]); + for (int channel = 0; channel < synth_.pcm.channels; channel++) { + output[output_sample++] = + sample::FromMad(synth_.pcm.samples[channel][current_sample_]); + } + current_sample_++; } - current_sample_++; } // We wrote everything! Reset, ready for the next frame. current_sample_ = -1; - return {bytes_read, OutputInfo{.samples_written = output_sample, - .is_finished_writing = true}}; + return OutputInfo{.samples_written = output_sample, + .is_stream_finished = is_eos_}; } -auto MadMp3Decoder::SeekStream(cpp::span input, - std::size_t target_sample) -> Result { - mad_stream_buffer(&stream_, - reinterpret_cast(input.data()), - input.size()); - std::size_t current_sample = 0; - std::size_t samples_per_frame = 0; - while (true) { - current_sample += samples_per_frame; - - // First, decode the header for this frame. - mad_header header; - mad_header_init(&header); - while (mad_header_decode(&header, &stream_) < 0) { - if (MAD_RECOVERABLE(stream_.error)) { - // Recoverable errors are usually malformed parts of the stream. - // We can recover from them by just retrying the decode. - continue; - } else { - // Don't bother checking for other errors; if the first part of the - // stream doesn't even contain a header then something's gone wrong. - return {GetBytesUsed(input.size_bytes()), - cpp::fail(Error::kMalformedData)}; - } - } - - // Calculate samples per frame if we haven't already. - if (samples_per_frame == 0) { - samples_per_frame = 32 * MAD_NSBSAMPLES(&header); - } - - // Work out how close we are to the target. - std::size_t samples_to_go = target_sample - current_sample; - std::size_t frames_to_go = samples_to_go / samples_per_frame; - if (frames_to_go > 3) { - // The target is far in the distance. Keep skipping through headers only. - continue; - } - - // The target is within the next few frames. We should decode these, as per - // the LAME FAQ (https://lame.sourceforge.io/tech-FAQ.txt): - // > The MP3 data for frame N is not stored in frame N, but can be spread - // > over several frames. In a typical case, the data for frame N will - // > have 20% of it stored in frame N-1 and 80% stored in frame N. - while (mad_frame_decode(&frame_, &stream_) < 0) { - if (MAD_RECOVERABLE(stream_.error)) { - continue; - } - if (stream_.error == MAD_ERROR_BUFLEN) { - return {GetBytesUsed(input.size_bytes()), - cpp::fail(Error::kOutOfInput)}; - } - // The error is unrecoverable. Give up. - return {GetBytesUsed(input.size_bytes()), - cpp::fail(Error::kMalformedData)}; - } - - if (frames_to_go <= 1) { - // The target is within the next couple of frames. We should start - // synthesizing a frame early because this guy says so: - // https://lists.mars.org/hyperkitty/list/mad-dev@lists.mars.org/message/UZSHXZTIZEF7FZ4KFOR65DUCKAY2OCUT/ - mad_synth_frame(&synth_, &frame_); - } - - if (frames_to_go == 0) { - // The target is actually within this frame! Set up for the ContinueStream - // call. - current_sample_ = - (target_sample > current_sample) ? target_sample - current_sample : 0; - return {GetBytesUsed(input.size_bytes()), {}}; - } - } +auto MadMp3Decoder::SeekTo(std::size_t target_sample) + -> cpp::result { + return {}; } /* diff --git a/src/codecs/opus.cpp b/src/codecs/opus.cpp index a71c5fc0..70ec9e45 100644 --- a/src/codecs/opus.cpp +++ b/src/codecs/opus.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -27,23 +28,49 @@ namespace codecs { static constexpr char kTag[] = "opus"; -int read_cb(void* instance, unsigned char* ptr, int nbytes) { - XiphOpusDecoder* dec = reinterpret_cast(instance); - auto input = dec->ReadCallback(); - size_t amount_to_read = std::min(nbytes, input.size_bytes()); - std::memcpy(ptr, input.data(), amount_to_read); - dec->AfterReadCallback(amount_to_read); - return amount_to_read; +static int read_cb(void* src, unsigned char* ptr, int nbytes) { + IStream* source = reinterpret_cast(src); + return source->Read( + {reinterpret_cast(ptr), static_cast(nbytes)}); +} + +static int seek_cb(void* src, int64_t offset, int whence) { + IStream* source = reinterpret_cast(src); + if (!source->CanSeek()) { + return -1; + } + IStream::SeekFrom from; + switch (whence) { + case SEEK_CUR: + from = IStream::SeekFrom::kCurrentPosition; + break; + case SEEK_END: + from = IStream::SeekFrom::kEndOfStream; + break; + case SEEK_SET: + from = IStream::SeekFrom::kStartOfStream; + break; + default: + return -1; + } + source->SeekTo(offset, from); + return 0; +} + +static int64_t tell_cb(void* src) { + IStream* source = reinterpret_cast(src); + return source->CurrentPosition(); } static const OpusFileCallbacks kCallbacks{ .read = read_cb, - .seek = NULL, - .tell = NULL, // Not seekable + .seek = seek_cb, + .tell = tell_cb, .close = NULL, }; -XiphOpusDecoder::XiphOpusDecoder() : opus_(nullptr) {} +XiphOpusDecoder::XiphOpusDecoder() + : input_(nullptr), opus_(nullptr), num_channels_() {} XiphOpusDecoder::~XiphOpusDecoder() { if (opus_ != nullptr) { @@ -51,12 +78,12 @@ XiphOpusDecoder::~XiphOpusDecoder() { } } -auto XiphOpusDecoder::BeginStream(const cpp::span input) - -> Result { +auto XiphOpusDecoder::OpenStream(std::shared_ptr input) + -> cpp::result { + input_ = input; + int res; - opus_ = op_open_callbacks( - this, &kCallbacks, reinterpret_cast(input.data()), - input.size(), &res); + opus_ = op_open_callbacks(input.get(), &kCallbacks, nullptr, 0, &res); if (res < 0) { std::string err; @@ -64,60 +91,72 @@ auto XiphOpusDecoder::BeginStream(const cpp::span input) case OP_EREAD: err = "OP_EREAD"; break; + case OP_EFAULT: + err = "OP_EFAULT"; + break; + case OP_EIMPL: + err = "OP_EIMPL"; + break; + case OP_EINVAL: + err = "OP_EINVAL"; + break; + case OP_ENOTFORMAT: + err = "OP_ENOTFORMAT"; + break; + case OP_EBADHEADER: + err = "OP_EBADHEADER"; + break; + case OP_EVERSION: + err = "OP_EVERSION"; + break; + case OP_EBADLINK: + err = "OP_EBADLINK"; + break; + case OP_EBADTIMESTAMP: + err = "OP_BADTIMESTAMP"; + break; default: err = "unknown"; } ESP_LOGE(kTag, "error beginning stream: %s", err.c_str()); - return {input.size(), cpp::fail(Error::kMalformedData)}; + return cpp::fail(Error::kMalformedData); } - return {input.size(), OutputFormat{ - .num_channels = 2, - .sample_rate_hz = 48000, - }}; + num_channels_ = std::min(2, op_channel_count(opus_, -1)); + + return OutputFormat{ + .num_channels = num_channels_, + .sample_rate_hz = 48000, + }; } -auto XiphOpusDecoder::ContinueStream(cpp::span input, - cpp::span output) - -> Result { +auto XiphOpusDecoder::DecodeTo(cpp::span output) + -> cpp::result { cpp::span staging_buffer{ reinterpret_cast(output.subspan(output.size() / 2).data()), output.size_bytes() / 2}; - input_ = input; - pos_in_input_ = 0; - - int bytes_written = + int samples_written = op_read_stereo(opus_, staging_buffer.data(), staging_buffer.size()); - if (bytes_written < 0) { - ESP_LOGE(kTag, "read failed %i", bytes_written); - return {pos_in_input_, cpp::fail(Error::kMalformedData)}; - } else if (bytes_written == 0) { - return {pos_in_input_, cpp::fail(Error::kOutOfInput)}; + + if (samples_written < 0) { + ESP_LOGE(kTag, "read failed %i", samples_written); + return cpp::fail(Error::kMalformedData); } - for (int i = 0; i < bytes_written / 2; i++) { + samples_written *= num_channels_; + for (int i = 0; i < samples_written; i++) { output[i] = sample::FromSigned(staging_buffer[i], 16); } - return {pos_in_input_, - OutputInfo{ - .samples_written = static_cast(bytes_written / 2), - .is_finished_writing = bytes_written == 0, - }}; + return OutputInfo{ + .samples_written = static_cast(samples_written / 2), + .is_stream_finished = samples_written == 0, + }; } -auto XiphOpusDecoder::SeekStream(cpp::span input, - std::size_t target_sample) -> Result { +auto XiphOpusDecoder::SeekTo(size_t target) -> cpp::result { return {}; } -auto XiphOpusDecoder::ReadCallback() -> cpp::span { - return input_.subspan(pos_in_input_); -} - -auto XiphOpusDecoder::AfterReadCallback(size_t bytes_read) -> void { - pos_in_input_ += bytes_read; -} - } // namespace codecs diff --git a/src/codecs/source_buffer.cpp b/src/codecs/source_buffer.cpp new file mode 100644 index 00000000..5955523e --- /dev/null +++ b/src/codecs/source_buffer.cpp @@ -0,0 +1,75 @@ +/* + * Copyright 2023 jacqueline + * + * SPDX-License-Identifier: GPL-3.0-only + */ + +#include "source_buffer.hpp" +#include + +#include +#include + +#include "esp_heap_caps.h" +#include "esp_log.h" + +#include "codec.hpp" + +namespace codecs { + +static constexpr char kTag[] = "dec_buf"; +static constexpr size_t kBufferSize = 1024 * 8; + +SourceBuffer::SourceBuffer() + : buffer_(reinterpret_cast( + heap_caps_malloc(kBufferSize, MALLOC_CAP_SPIRAM)), + kBufferSize), + bytes_in_buffer_(0), + offset_of_bytes_(0) { + assert(buffer_.data() != nullptr); +} + +SourceBuffer::~SourceBuffer() { + free(buffer_.data()); +} + +auto SourceBuffer::Refill(IStream* src) -> bool { + if (bytes_in_buffer_ == buffer_.size_bytes()) { + return false; + } + bool eof = false; + AddBytes([&](cpp::span buf) -> size_t { + size_t bytes_read = src->Read(buf); + eof = bytes_read == 0; + return bytes_read; + }); + return eof; +} + +auto SourceBuffer::AddBytes(std::function)> writer) + -> void { + if (offset_of_bytes_ > 0) { + std::memmove(buffer_.data(), buffer_.data() + offset_of_bytes_, + bytes_in_buffer_); + offset_of_bytes_ = 0; + } + size_t added_bytes = std::invoke(writer, buffer_.subspan(bytes_in_buffer_)); + assert(bytes_in_buffer_ + added_bytes <= buffer_.size_bytes()); + bytes_in_buffer_ += added_bytes; +} + +auto SourceBuffer::ConsumeBytes( + std::function)> reader) -> void { + size_t bytes_consumed = std::invoke( + reader, buffer_.subspan(offset_of_bytes_).first(bytes_in_buffer_)); + assert(bytes_consumed <= bytes_in_buffer_); + + bytes_in_buffer_ -= bytes_consumed; + if (bytes_in_buffer_ == 0) { + offset_of_bytes_ = 0; + } else { + offset_of_bytes_ += bytes_consumed; + } +} + +} // namespace codecs diff --git a/src/codecs/vorbis.cpp b/src/codecs/vorbis.cpp index 88ffbec4..6fa3256a 100644 --- a/src/codecs/vorbis.cpp +++ b/src/codecs/vorbis.cpp @@ -34,43 +34,59 @@ namespace codecs { static constexpr char kTag[] = "vorbis"; -size_t read_cb(void* ptr, size_t size, size_t nmemb, void* instance) { - TremorVorbisDecoder* dec = reinterpret_cast(instance); - auto input = dec->ReadCallback(); - size_t amount_to_read = std::min(size * nmemb, input.size_bytes()); - std::memcpy(ptr, input.data(), amount_to_read); - dec->AfterReadCallback(amount_to_read); - return amount_to_read; +static size_t read_cb(void* ptr, size_t size, size_t nmemb, void* instance) { + IStream* source = reinterpret_cast(instance); + return source->Read({reinterpret_cast(ptr), size * nmemb}); } -int seek_cb(void* instance, ogg_int64_t offset, int whence) { - // Seeking is handled separately. - return -1; +static int seek_cb(void* instance, ogg_int64_t offset, int whence) { + IStream* source = reinterpret_cast(instance); + if (!source->CanSeek()) { + return -1; + } + IStream::SeekFrom from; + switch (whence) { + case SEEK_CUR: + from = IStream::SeekFrom::kCurrentPosition; + break; + case SEEK_END: + from = IStream::SeekFrom::kEndOfStream; + break; + case SEEK_SET: + from = IStream::SeekFrom::kStartOfStream; + break; + default: + return -1; + } + source->SeekTo(offset, from); + return 0; } -int close_cb(void* instance) { +static int close_cb(void* src) { return 0; } +static long tell_cb(void* src) { + IStream* source = reinterpret_cast(src); + return source->CurrentPosition(); +} + static const ov_callbacks kCallbacks{ .read_func = read_cb, .seek_func = seek_cb, .close_func = close_cb, - .tell_func = NULL, // Not seekable + .tell_func = tell_cb, // Not seekable }; -TremorVorbisDecoder::TremorVorbisDecoder() - : vorbis_(), input_(), pos_in_input_(0) {} +TremorVorbisDecoder::TremorVorbisDecoder() : input_(), vorbis_() {} TremorVorbisDecoder::~TremorVorbisDecoder() { ov_clear(&vorbis_); } -auto TremorVorbisDecoder::BeginStream(const cpp::span input) - -> Result { - int res = ov_open_callbacks(this, &vorbis_, - reinterpret_cast(input.data()), - input.size(), kCallbacks); +auto TremorVorbisDecoder::OpenStream(std::shared_ptr input) + -> cpp::result { + int res = ov_open_callbacks(input.get(), &vorbis_, NULL, 0, kCallbacks); if (res < 0) { std::string err; switch (res) { @@ -93,70 +109,51 @@ auto TremorVorbisDecoder::BeginStream(const cpp::span input) err = "unknown"; } ESP_LOGE(kTag, "error beginning stream: %s", err.c_str()); - return {input.size(), cpp::fail(Error::kMalformedData)}; + return cpp::fail(Error::kMalformedData); } vorbis_info* info = ov_info(&vorbis_, -1); if (info == NULL) { ESP_LOGE(kTag, "failed to get stream info"); - return {input.size(), cpp::fail(Error::kMalformedData)}; + return cpp::fail(Error::kMalformedData); } - return {input.size(), - OutputFormat{ - .num_channels = static_cast(info->channels), - .sample_rate_hz = static_cast(info->rate), - .bits_per_second = info->bitrate_nominal, - }}; + return OutputFormat{ + .num_channels = static_cast(info->channels), + .sample_rate_hz = static_cast(info->rate), + }; } -auto TremorVorbisDecoder::ContinueStream(cpp::span input, - cpp::span output) - -> Result { +auto TremorVorbisDecoder::DecodeTo(cpp::span output) + -> cpp::result { cpp::span staging_buffer{ reinterpret_cast(output.subspan(output.size() / 2).data()), output.size_bytes() / 2}; - input_ = input; - pos_in_input_ = 0; - int bitstream; long bytes_written = ov_read(&vorbis_, reinterpret_cast(staging_buffer.data()), staging_buffer.size_bytes(), &bitstream); if (bytes_written == OV_HOLE) { ESP_LOGE(kTag, "got OV_HOLE"); - return {pos_in_input_, cpp::fail(Error::kMalformedData)}; + return cpp::fail(Error::kMalformedData); } else if (bytes_written == OV_EBADLINK) { ESP_LOGE(kTag, "got OV_EBADLINK"); - return {pos_in_input_, cpp::fail(Error::kMalformedData)}; - } else if (bytes_written == 0) { - return {pos_in_input_, cpp::fail(Error::kOutOfInput)}; + return cpp::fail(Error::kMalformedData); } for (int i = 0; i < bytes_written / 2; i++) { output[i] = sample::FromSigned(staging_buffer[i], 16); } - return {pos_in_input_, - OutputInfo{ - .samples_written = static_cast(bytes_written / 2), - .is_finished_writing = bytes_written == 0, - }}; + return OutputInfo{ + .samples_written = static_cast(bytes_written / 2), + .is_stream_finished = bytes_written == 0, + }; } -auto TremorVorbisDecoder::SeekStream(cpp::span input, - std::size_t target_sample) - -> Result { +auto TremorVorbisDecoder::SeekTo(size_t target) -> cpp::result { return {}; } -auto TremorVorbisDecoder::ReadCallback() -> cpp::span { - return input_.subspan(pos_in_input_); -} - -auto TremorVorbisDecoder::AfterReadCallback(size_t bytes_read) -> void { - pos_in_input_ += bytes_read; -} - } // namespace codecs diff --git a/src/database/database.cpp b/src/database/database.cpp index 61e292ff..e6341e43 100644 --- a/src/database/database.cpp +++ b/src/database/database.cpp @@ -173,7 +173,7 @@ auto Database::Update() -> std::future { TrackTags tags{}; if (!tag_parser_->ReadAndParseTags(track->filepath(), &tags) || - tags.encoding() == Encoding::kUnsupported) { + tags.encoding() == Container::kUnsupported) { // We couldn't read the tags for this track. Either they were // malformed, or perhaps the file is missing. Either way, tombstone // this record. @@ -209,7 +209,7 @@ auto Database::Update() -> std::future { file_gatherer_->FindFiles("", [&](const std::string& path) { TrackTags tags; if (!tag_parser_->ReadAndParseTags(path, &tags) || - tags.encoding() == Encoding::kUnsupported) { + tags.encoding() == Container::kUnsupported) { // No parseable tags; skip this fiile. return; } diff --git a/src/database/include/track.hpp b/src/database/include/track.hpp index d4f01b71..41f552d2 100644 --- a/src/database/include/track.hpp +++ b/src/database/include/track.hpp @@ -37,7 +37,7 @@ typedef uint32_t TrackId; * Values of this enum are persisted in this database, so it is probably never a * good idea to change the int representation of an existing value. */ -enum class Encoding { +enum class Container { kUnsupported = 0, kMp3 = 1, kWav = 2, @@ -61,10 +61,10 @@ enum class Tag { */ class TrackTags { public: - auto encoding() const -> Encoding { return encoding_; }; - auto encoding(Encoding e) -> void { encoding_ = e; }; + auto encoding() const -> Container { return encoding_; }; + auto encoding(Container e) -> void { encoding_ = e; }; - TrackTags() : encoding_(Encoding::kUnsupported) {} + TrackTags() : encoding_(Container::kUnsupported) {} std::optional channels; std::optional sample_rate; @@ -89,7 +89,7 @@ class TrackTags { TrackTags(const TrackTags&) = default; private: - Encoding encoding_; + Container encoding_; std::unordered_map tags_; }; diff --git a/src/database/tag_parser.cpp b/src/database/tag_parser.cpp index 2f1fe337..fc6c95f2 100644 --- a/src/database/tag_parser.cpp +++ b/src/database/tag_parser.cpp @@ -142,22 +142,22 @@ auto TagParserImpl::ReadAndParseTags(const std::string& path, TrackTags* out) switch (ctx.format) { case Fmp3: - out->encoding(Encoding::kMp3); + out->encoding(Container::kMp3); break; case Fogg: - out->encoding(Encoding::kOgg); + out->encoding(Container::kOgg); break; case Fflac: - out->encoding(Encoding::kFlac); + out->encoding(Container::kFlac); break; case Fwav: - out->encoding(Encoding::kWav); + out->encoding(Container::kWav); break; case Fopus: - out->encoding(Encoding::kOpus); + out->encoding(Container::kOpus); break; default: - out->encoding(Encoding::kUnsupported); + out->encoding(Container::kUnsupported); } if (ctx.channels > 0) { diff --git a/src/tasks/tasks.cpp b/src/tasks/tasks.cpp index 34c690f3..ce5f17dc 100644 --- a/src/tasks/tasks.cpp +++ b/src/tasks/tasks.cpp @@ -53,8 +53,8 @@ auto AllocateStack() -> cpp::span; // amount of stack space. template <> auto AllocateStack() -> cpp::span { - std::size_t size = 48 * 1024; - return {static_cast(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)), + std::size_t size = 64 * 1024; + return {static_cast(heap_caps_malloc(size, MALLOC_CAP_SPIRAM)), size}; } // LVGL requires only a relatively small stack. However, it can be allocated in