Improve decoder's interface to accept streams

custom
jacqueline 12 months ago
parent 344a46d066
commit b242ba9986
  1. 151
      src/tangara/audio/audio_decoder.cpp
  2. 36
      src/tangara/audio/audio_decoder.hpp
  3. 12
      src/tangara/audio/audio_events.hpp
  4. 54
      src/tangara/audio/audio_fsm.cpp
  5. 10
      src/tangara/audio/audio_fsm.hpp
  6. 163
      src/tangara/audio/fatfs_audio_input.cpp
  7. 66
      src/tangara/audio/fatfs_audio_input.hpp
  8. 104
      src/tangara/audio/fatfs_stream_factory.cpp
  9. 53
      src/tangara/audio/fatfs_stream_factory.hpp
  10. 27
      src/tangara/audio/processor.cpp
  11. 6
      src/tangara/audio/processor.hpp

@ -6,7 +6,7 @@
#include "audio/audio_decoder.hpp" #include "audio/audio_decoder.hpp"
#include <algorithm> #include <cassert>
#include <cmath> #include <cmath>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
@ -23,14 +23,12 @@
#include "freertos/portmacro.h" #include "freertos/portmacro.h"
#include "freertos/projdefs.h" #include "freertos/projdefs.h"
#include "freertos/queue.h" #include "freertos/queue.h"
#include "freertos/ringbuf.h"
#include "audio/audio_converter.hpp"
#include "audio/audio_events.hpp" #include "audio/audio_events.hpp"
#include "audio/audio_fsm.hpp" #include "audio/audio_fsm.hpp"
#include "audio/audio_sink.hpp" #include "audio/audio_sink.hpp"
#include "audio/audio_source.hpp" #include "audio/audio_source.hpp"
#include "audio/fatfs_audio_input.hpp" #include "audio/processor.hpp"
#include "codec.hpp" #include "codec.hpp"
#include "database/track.hpp" #include "database/track.hpp"
#include "drivers/i2s_dac.hpp" #include "drivers/i2s_dac.hpp"
@ -42,21 +40,33 @@
namespace audio { namespace audio {
[[maybe_unused]] static const char* kTag = "audio_dec"; static const char* kTag = "decoder";
/*
* The size of the buffer used for holding decoded samples. This buffer is
* allocated in internal memory for greater speed, so be careful when
* increasing its size.
*/
static constexpr std::size_t kCodecBufferLength = static constexpr std::size_t kCodecBufferLength =
drivers::kI2SBufferLengthFrames * sizeof(sample::Sample); drivers::kI2SBufferLengthFrames * sizeof(sample::Sample);
auto Decoder::Start(std::shared_ptr<IAudioSource> source, auto Decoder::Start(std::shared_ptr<SampleProcessor> sink) -> Decoder* {
std::shared_ptr<SampleConverter> sink) -> Decoder* { Decoder* task = new Decoder(sink);
Decoder* task = new Decoder(source, sink);
tasks::StartPersistent<tasks::Type::kAudioDecoder>([=]() { task->Main(); }); tasks::StartPersistent<tasks::Type::kAudioDecoder>([=]() { task->Main(); });
return task; return task;
} }
Decoder::Decoder(std::shared_ptr<IAudioSource> source, auto Decoder::open(std::shared_ptr<TaggedStream> stream) -> void {
std::shared_ptr<SampleConverter> mixer) NextStream* next = new NextStream();
: source_(source), converter_(mixer), codec_(), current_format_() { next->stream = stream;
// The decoder services its queue very quickly, so blocking on this write
// should be fine. If we discover contention here, then adding more space for
// items to next_stream_ should be fine too.
xQueueSend(next_stream_, &next, portMAX_DELAY);
}
Decoder::Decoder(std::shared_ptr<SampleProcessor> processor)
: processor_(processor), next_stream_(xQueueCreate(1, sizeof(void*))) {
ESP_LOGI(kTag, "allocating codec buffer, %u KiB", kCodecBufferLength / 1024); ESP_LOGI(kTag, "allocating codec buffer, %u KiB", kCodecBufferLength / 1024);
codec_buffer_ = { codec_buffer_ = {
reinterpret_cast<sample::Sample*>(heap_caps_calloc( reinterpret_cast<sample::Sample*>(heap_caps_calloc(
@ -64,81 +74,122 @@ Decoder::Decoder(std::shared_ptr<IAudioSource> source,
kCodecBufferLength}; kCodecBufferLength};
} }
/*
* Main decoding loop. Handles watching for new streams, or continuing to nudge
* along the current stream if we have one.
*/
void Decoder::Main() { void Decoder::Main() {
for (;;) { for (;;) {
if (source_->HasNewStream() || !stream_) { // Check whether there's a new stream to begin. If we're idle, then we
std::shared_ptr<TaggedStream> new_stream = source_->NextStream(); // simply park and wait forever for a stream to arrive.
if (new_stream && BeginDecoding(new_stream)) { TickType_t wait_time = stream_ ? 0 : portMAX_DELAY;
stream_ = new_stream; NextStream* next;
} else { if (xQueueReceive(next_stream_, &next, wait_time)) {
// Copy the data out of the queue, then clean up the item.
std::shared_ptr<TaggedStream> new_stream = next->stream;
delete next;
// If we were already decoding, then make sure we finish up the current
// file gracefully.
if (stream_) {
finishDecode();
}
// Ensure there's actually stream data; we might have been given nullptr
// as a signal to stop.
if (!new_stream) {
continue; continue;
} }
// Start decoding the new stream.
prepareDecode(new_stream);
} }
if (ContinueDecoding()) { if (!continueDecode()) {
stream_.reset(); finishDecode();
} }
} }
} }
auto Decoder::BeginDecoding(std::shared_ptr<TaggedStream> stream) -> bool { auto Decoder::prepareDecode(std::shared_ptr<TaggedStream> stream) -> void {
// Ensure any previous codec is freed before creating a new one. auto stub_track = std::make_shared<TrackInfo>(TrackInfo{
codec_.reset(); .tags = stream->tags(),
.uri = stream->Filepath(),
.duration = {},
.start_offset = {},
.bitrate_kbps = {},
.encoding = stream->type(),
.format = {},
});
codec_.reset(codecs::CreateCodecForType(stream->type()).value_or(nullptr)); codec_.reset(codecs::CreateCodecForType(stream->type()).value_or(nullptr));
if (!codec_) { if (!codec_) {
ESP_LOGE(kTag, "no codec found for stream"); ESP_LOGE(kTag, "no codec found for stream");
return false; events::Audio().Dispatch(
internal::DecodingFailedToStart{.track = stub_track});
return;
} }
auto open_res = codec_->OpenStream(stream, stream->Offset()); auto open_res = codec_->OpenStream(stream, stream->Offset());
if (open_res.has_error()) { if (open_res.has_error()) {
ESP_LOGE(kTag, "codec failed to start: %s", ESP_LOGE(kTag, "codec failed to start: %s",
codecs::ICodec::ErrorString(open_res.error()).c_str()); codecs::ICodec::ErrorString(open_res.error()).c_str());
return false; events::Audio().Dispatch(
} internal::DecodingFailedToStart{.track = stub_track});
stream->SetPreambleFinished(); return;
current_sink_format_ = IAudioOutput::Format{
.sample_rate = open_res->sample_rate_hz,
.num_channels = open_res->num_channels,
.bits_per_sample = 16,
};
std::optional<uint32_t> duration;
if (open_res->total_samples) {
duration = open_res->total_samples.value() / open_res->num_channels /
open_res->sample_rate_hz;
} }
converter_->beginStream(std::make_shared<TrackInfo>(TrackInfo{ // Decoding started okay! Fill out the rest of the track info for this
// stream.
stream_ = stream;
track_ = std::make_shared<TrackInfo>(TrackInfo{
.tags = stream->tags(), .tags = stream->tags(),
.uri = stream->Filepath(), .uri = stream->Filepath(),
.duration = duration, .duration = {},
.start_offset = stream->Offset(), .start_offset = stream->Offset(),
.bitrate_kbps = open_res->sample_rate_hz, .bitrate_kbps = {},
.encoding = stream->type(), .encoding = stream->type(),
.format = *current_sink_format_, .format =
})); {
.sample_rate = open_res->sample_rate_hz,
.num_channels = open_res->num_channels,
.bits_per_sample = 16,
},
});
if (open_res->total_samples) {
track_->duration = open_res->total_samples.value() /
open_res->num_channels / open_res->sample_rate_hz;
}
return true; events::Audio().Dispatch(internal::DecodingStarted{.track = track_});
processor_->beginStream(track_);
} }
auto Decoder::ContinueDecoding() -> bool { auto Decoder::continueDecode() -> bool {
auto res = codec_->DecodeTo(codec_buffer_); auto res = codec_->DecodeTo(codec_buffer_);
if (res.has_error()) { if (res.has_error()) {
converter_->endStream(); return false;
return true;
} }
if (res->samples_written > 0) { if (res->samples_written > 0) {
converter_->continueStream(codec_buffer_.first(res->samples_written)); processor_->continueStream(codec_buffer_.first(res->samples_written));
} }
if (res->is_stream_finished) { return !res->is_stream_finished;
converter_->endStream(); }
codec_.reset();
}
return res->is_stream_finished; auto Decoder::finishDecode() -> void {
assert(track_);
// Tell everyone we're finished.
events::Audio().Dispatch(internal::DecodingFinished{.track = track_});
processor_->endStream();
// Clean up after ourselves.
stream_.reset();
codec_.reset();
track_.reset();
} }
} // namespace audio } // namespace audio

@ -9,10 +9,10 @@
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include "audio/audio_converter.hpp"
#include "audio/audio_events.hpp" #include "audio/audio_events.hpp"
#include "audio/audio_sink.hpp" #include "audio/audio_sink.hpp"
#include "audio/audio_source.hpp" #include "audio/audio_source.hpp"
#include "audio/processor.hpp"
#include "codec.hpp" #include "codec.hpp"
#include "database/track.hpp" #include "database/track.hpp"
#include "types.hpp" #include "types.hpp"
@ -20,35 +20,39 @@
namespace audio { namespace audio {
/* /*
* Handle to a persistent task that takes bytes from the given source, decodes * Handle to a persistent task that takes encoded bytes from arbitrary sources,
* them into sample::Sample (normalised to 16 bit signed PCM), and then * decodes them into sample::Sample (normalised to 16 bit signed PCM), and then
* forwards the resulting stream to the given converter. * streams them onward to the sample processor.
*/ */
class Decoder { class Decoder {
public: public:
static auto Start(std::shared_ptr<IAudioSource> source, static auto Start(std::shared_ptr<SampleProcessor>) -> Decoder*;
std::shared_ptr<SampleConverter> converter) -> Decoder*;
auto Main() -> void; auto open(std::shared_ptr<TaggedStream>) -> void;
Decoder(const Decoder&) = delete; Decoder(const Decoder&) = delete;
Decoder& operator=(const Decoder&) = delete; Decoder& operator=(const Decoder&) = delete;
private: private:
Decoder(std::shared_ptr<IAudioSource> source, Decoder(std::shared_ptr<SampleProcessor>);
std::shared_ptr<SampleConverter> converter);
auto Main() -> void;
auto BeginDecoding(std::shared_ptr<TaggedStream>) -> bool; auto prepareDecode(std::shared_ptr<TaggedStream>) -> void;
auto ContinueDecoding() -> bool; auto continueDecode() -> bool;
auto finishDecode() -> void;
std::shared_ptr<IAudioSource> source_; std::shared_ptr<SampleProcessor> processor_;
std::shared_ptr<SampleConverter> converter_;
// Struct used with the next_stream_ queue.
struct NextStream {
std::shared_ptr<TaggedStream> stream;
};
QueueHandle_t next_stream_;
std::shared_ptr<codecs::IStream> stream_; std::shared_ptr<codecs::IStream> stream_;
std::unique_ptr<codecs::ICodec> codec_; std::unique_ptr<codecs::ICodec> codec_;
std::shared_ptr<TrackInfo> track_;
std::optional<codecs::ICodec::OutputFormat> current_format_;
std::optional<IAudioOutput::Format> current_sink_format_;
std::span<sample::Sample> codec_buffer_; std::span<sample::Sample> codec_buffer_;
}; };

@ -138,6 +138,18 @@ struct OutputModeChanged : tinyfsm::Event {};
namespace internal { namespace internal {
struct DecodingStarted : tinyfsm::Event {
std::shared_ptr<TrackInfo> track;
};
struct DecodingFailedToStart : tinyfsm::Event {
std::shared_ptr<TrackInfo> track;
};
struct DecodingFinished : tinyfsm::Event {
std::shared_ptr<TrackInfo> track;
};
struct StreamStarted : tinyfsm::Event { struct StreamStarted : tinyfsm::Event {
std::shared_ptr<TrackInfo> track; std::shared_ptr<TrackInfo> track;
IAudioOutput::Format src_format; IAudioOutput::Format src_format;

@ -11,29 +11,28 @@
#include <memory> #include <memory>
#include <variant> #include <variant>
#include "audio/audio_sink.hpp"
#include "cppbor.h" #include "cppbor.h"
#include "cppbor_parse.h" #include "cppbor_parse.h"
#include "drivers/bluetooth_types.hpp"
#include "drivers/storage.hpp"
#include "esp_heap_caps.h" #include "esp_heap_caps.h"
#include "esp_log.h" #include "esp_log.h"
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
#include "freertos/portmacro.h" #include "freertos/portmacro.h"
#include "freertos/projdefs.h" #include "freertos/projdefs.h"
#include "audio/audio_converter.hpp"
#include "audio/audio_decoder.hpp" #include "audio/audio_decoder.hpp"
#include "audio/audio_events.hpp" #include "audio/audio_events.hpp"
#include "audio/audio_sink.hpp"
#include "audio/bt_audio_output.hpp" #include "audio/bt_audio_output.hpp"
#include "audio/fatfs_audio_input.hpp" #include "audio/fatfs_stream_factory.hpp"
#include "audio/i2s_audio_output.hpp" #include "audio/i2s_audio_output.hpp"
#include "audio/track_queue.hpp" #include "audio/track_queue.hpp"
#include "database/future_fetcher.hpp" #include "database/future_fetcher.hpp"
#include "database/track.hpp" #include "database/track.hpp"
#include "drivers/bluetooth.hpp" #include "drivers/bluetooth.hpp"
#include "drivers/bluetooth_types.hpp"
#include "drivers/i2s_dac.hpp" #include "drivers/i2s_dac.hpp"
#include "drivers/nvs.hpp" #include "drivers/nvs.hpp"
#include "drivers/storage.hpp"
#include "drivers/wm8523.hpp" #include "drivers/wm8523.hpp"
#include "events/event_queue.hpp" #include "events/event_queue.hpp"
#include "sample.hpp" #include "sample.hpp"
@ -47,9 +46,9 @@ namespace audio {
std::shared_ptr<system_fsm::ServiceLocator> AudioState::sServices; std::shared_ptr<system_fsm::ServiceLocator> AudioState::sServices;
std::shared_ptr<FatfsAudioInput> AudioState::sFileSource; std::shared_ptr<FatfsStreamFactory> AudioState::sStreamFactory;
std::unique_ptr<Decoder> AudioState::sDecoder; std::unique_ptr<Decoder> AudioState::sDecoder;
std::shared_ptr<SampleConverter> AudioState::sSampleConverter; std::shared_ptr<SampleProcessor> AudioState::sSampleProcessor;
std::shared_ptr<I2SAudioOutput> AudioState::sI2SOutput; std::shared_ptr<I2SAudioOutput> AudioState::sI2SOutput;
std::shared_ptr<BluetoothAudioOutput> AudioState::sBtOutput; std::shared_ptr<BluetoothAudioOutput> AudioState::sBtOutput;
std::shared_ptr<IAudioOutput> AudioState::sOutput; std::shared_ptr<IAudioOutput> AudioState::sOutput;
@ -143,7 +142,7 @@ void AudioState::react(const SetTrack& ev) {
if (std::holds_alternative<std::monostate>(ev.new_track)) { if (std::holds_alternative<std::monostate>(ev.new_track)) {
ESP_LOGI(kTag, "playback finished, awaiting drain"); ESP_LOGI(kTag, "playback finished, awaiting drain");
sFileSource->SetPath(); sDecoder->open({});
awaitEmptyDrainBuffer(); awaitEmptyDrainBuffer();
sCurrentTrack.reset(); sCurrentTrack.reset();
sDrainFormat.reset(); sDrainFormat.reset();
@ -158,26 +157,20 @@ void AudioState::react(const SetTrack& ev) {
auto new_track = ev.new_track; auto new_track = ev.new_track;
uint32_t seek_to = ev.seek_to_second.value_or(0); uint32_t seek_to = ev.seek_to_second.value_or(0);
sServices->bg_worker().Dispatch<void>([=]() { sServices->bg_worker().Dispatch<void>([=]() {
std::optional<std::string> path; std::shared_ptr<TaggedStream> stream;
if (std::holds_alternative<database::TrackId>(new_track)) { if (std::holds_alternative<database::TrackId>(new_track)) {
auto db = sServices->database().lock(); stream = sStreamFactory->create(std::get<database::TrackId>(new_track),
if (db) { seek_to);
path = db->getTrackPath(std::get<database::TrackId>(new_track));
}
} else if (std::holds_alternative<std::string>(new_track)) { } else if (std::holds_alternative<std::string>(new_track)) {
path = std::get<std::string>(new_track); stream =
sStreamFactory->create(std::get<std::string>(new_track), seek_to);
} }
if (path) { // This was a seek or replay within the same track; don't forget where
if (*path == prev_uri) { // the track originally came from.
// This was a seek or replay within the same track; don't forget where // FIXME:
// the track originally came from. // sNextTrackIsFromQueue = prev_from_queue;
sNextTrackIsFromQueue = prev_from_queue; sDecoder->open(stream);
}
sFileSource->SetPath(*path, seek_to);
} else {
sFileSource->SetPath();
}
}); });
} }
@ -350,7 +343,7 @@ void AudioState::react(const OutputModeChanged& ev) {
break; break;
} }
sOutput->mode(IAudioOutput::Modes::kOnPaused); sOutput->mode(IAudioOutput::Modes::kOnPaused);
sSampleConverter->SetOutput(sOutput); sSampleProcessor->SetOutput(sOutput);
// Bluetooth volume isn't 'changed' until we've connected to a device. // Bluetooth volume isn't 'changed' until we've connected to a device.
if (new_mode == drivers::NvsStorage::Output::kHeadphones) { if (new_mode == drivers::NvsStorage::Output::kHeadphones) {
@ -365,7 +358,7 @@ auto AudioState::clearDrainBuffer() -> void {
// Tell the decoder to stop adding new samples. This might not take effect // Tell the decoder to stop adding new samples. This might not take effect
// immediately, since the decoder might currently be stuck waiting for space // immediately, since the decoder might currently be stuck waiting for space
// to become available in the drain buffer. // to become available in the drain buffer.
sFileSource->SetPath(); sDecoder->open({});
auto mode = sOutput->mode(); auto mode = sOutput->mode();
if (mode == IAudioOutput::Modes::kOnPlaying) { if (mode == IAudioOutput::Modes::kOnPlaying) {
@ -428,8 +421,7 @@ void Uninitialised::react(const system_fsm::BootComplete& ev) {
sDrainBuffer = xStreamBufferCreateStatic( sDrainBuffer = xStreamBufferCreateStatic(
kDrainBufferSize, sizeof(sample::Sample), storage, meta); kDrainBufferSize, sizeof(sample::Sample), storage, meta);
sFileSource.reset( sStreamFactory.reset(new FatfsStreamFactory(*sServices));
new FatfsAudioInput(sServices->tag_parser(), sServices->bg_worker()));
sI2SOutput.reset(new I2SAudioOutput(sDrainBuffer, sServices->gpios())); sI2SOutput.reset(new I2SAudioOutput(sDrainBuffer, sServices->gpios()));
sBtOutput.reset(new BluetoothAudioOutput(sDrainBuffer, sServices->bluetooth(), sBtOutput.reset(new BluetoothAudioOutput(sDrainBuffer, sServices->bluetooth(),
sServices->bg_worker())); sServices->bg_worker()));
@ -463,10 +455,10 @@ void Uninitialised::react(const system_fsm::BootComplete& ev) {
.left_bias = nvs.AmpLeftBias(), .left_bias = nvs.AmpLeftBias(),
}); });
sSampleConverter.reset(new SampleConverter()); sSampleProcessor.reset(new SampleProcessor());
sSampleConverter->SetOutput(sOutput); sSampleProcessor->SetOutput(sOutput);
Decoder::Start(sFileSource, sSampleConverter); sDecoder.reset(Decoder::Start(sSampleProcessor));
transit<Standby>(); transit<Standby>();
} }

@ -11,14 +11,13 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include "audio/audio_sink.hpp"
#include "system_fsm/service_locator.hpp"
#include "tinyfsm.hpp" #include "tinyfsm.hpp"
#include "audio/audio_decoder.hpp" #include "audio/audio_decoder.hpp"
#include "audio/audio_events.hpp" #include "audio/audio_events.hpp"
#include "audio/audio_sink.hpp"
#include "audio/bt_audio_output.hpp" #include "audio/bt_audio_output.hpp"
#include "audio/fatfs_audio_input.hpp" #include "audio/fatfs_stream_factory.hpp"
#include "audio/i2s_audio_output.hpp" #include "audio/i2s_audio_output.hpp"
#include "audio/track_queue.hpp" #include "audio/track_queue.hpp"
#include "database/database.hpp" #include "database/database.hpp"
@ -28,6 +27,7 @@
#include "drivers/gpios.hpp" #include "drivers/gpios.hpp"
#include "drivers/i2s_dac.hpp" #include "drivers/i2s_dac.hpp"
#include "drivers/storage.hpp" #include "drivers/storage.hpp"
#include "system_fsm/service_locator.hpp"
#include "system_fsm/system_events.hpp" #include "system_fsm/system_events.hpp"
namespace audio { namespace audio {
@ -74,9 +74,9 @@ class AudioState : public tinyfsm::Fsm<AudioState> {
static std::shared_ptr<system_fsm::ServiceLocator> sServices; static std::shared_ptr<system_fsm::ServiceLocator> sServices;
static std::shared_ptr<FatfsAudioInput> sFileSource; static std::shared_ptr<FatfsStreamFactory> sStreamFactory;
static std::unique_ptr<Decoder> sDecoder; static std::unique_ptr<Decoder> sDecoder;
static std::shared_ptr<SampleConverter> sSampleConverter; static std::shared_ptr<SampleProcessor> sSampleProcessor;
static std::shared_ptr<I2SAudioOutput> sI2SOutput; static std::shared_ptr<I2SAudioOutput> sI2SOutput;
static std::shared_ptr<BluetoothAudioOutput> sBtOutput; static std::shared_ptr<BluetoothAudioOutput> sBtOutput;
static std::shared_ptr<IAudioOutput> sOutput; static std::shared_ptr<IAudioOutput> sOutput;

@ -1,163 +0,0 @@
/*
* Copyright 2023 jacqueline <me@jacqueline.id.au>
*
* SPDX-License-Identifier: GPL-3.0-only
*/
#include "audio/fatfs_audio_input.hpp"
#include <algorithm>
#include <climits>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <span>
#include <string>
#include <variant>
#include "audio/readahead_source.hpp"
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "ff.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "audio/audio_events.hpp"
#include "audio/audio_fsm.hpp"
#include "audio/audio_source.hpp"
#include "audio/fatfs_source.hpp"
#include "codec.hpp"
#include "database/future_fetcher.hpp"
#include "database/tag_parser.hpp"
#include "database/track.hpp"
#include "drivers/spi.hpp"
#include "events/event_queue.hpp"
#include "tasks.hpp"
#include "types.hpp"
[[maybe_unused]] static const char* kTag = "SRC";
namespace audio {
FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser,
tasks::WorkerPool& bg_worker)
: IAudioSource(),
tag_parser_(tag_parser),
bg_worker_(bg_worker),
new_stream_mutex_(),
new_stream_(),
has_new_stream_(false) {}
FatfsAudioInput::~FatfsAudioInput() {}
auto FatfsAudioInput::SetPath(std::optional<std::string> path) -> void {
if (path) {
SetPath(*path);
} else {
SetPath();
}
}
auto FatfsAudioInput::SetPath(const std::string& path,
uint32_t offset) -> void {
std::lock_guard<std::mutex> guard{new_stream_mutex_};
if (OpenFile(path, offset)) {
has_new_stream_ = true;
has_new_stream_.notify_one();
}
}
auto FatfsAudioInput::SetPath() -> void {
std::lock_guard<std::mutex> guard{new_stream_mutex_};
new_stream_.reset();
has_new_stream_ = true;
has_new_stream_.notify_one();
}
auto FatfsAudioInput::HasNewStream() -> bool {
return has_new_stream_;
}
auto FatfsAudioInput::NextStream() -> std::shared_ptr<TaggedStream> {
while (true) {
has_new_stream_.wait(false);
{
std::lock_guard<std::mutex> guard{new_stream_mutex_};
if (!has_new_stream_.exchange(false)) {
// If the new stream went away, then we need to go back to waiting.
continue;
}
if (new_stream_ == nullptr) {
continue;
}
auto stream = new_stream_;
new_stream_ = nullptr;
return stream;
}
}
}
auto FatfsAudioInput::OpenFile(const std::string& path,
uint32_t offset) -> bool {
ESP_LOGI(kTag, "opening file %s", path.c_str());
auto tags = tag_parser_.ReadAndParseTags(path);
if (!tags) {
ESP_LOGE(kTag, "failed to read tags");
return false;
}
if (!tags->title()) {
tags->title(path);
}
auto stream_type = ContainerToStreamType(tags->encoding());
if (!stream_type.has_value()) {
ESP_LOGE(kTag, "couldn't match container to stream");
return false;
}
std::unique_ptr<FIL> file = std::make_unique<FIL>();
FRESULT res;
{
auto lock = drivers::acquire_spi();
res = f_open(file.get(), path.c_str(), FA_READ);
}
if (res != FR_OK) {
ESP_LOGE(kTag, "failed to open file! res: %i", res);
return false;
}
auto source =
std::make_unique<FatfsSource>(stream_type.value(), std::move(file));
new_stream_.reset(new TaggedStream(tags, std::move(source), path, offset));
return true;
}
auto FatfsAudioInput::ContainerToStreamType(database::Container enc)
-> std::optional<codecs::StreamType> {
switch (enc) {
case database::Container::kMp3:
return codecs::StreamType::kMp3;
case database::Container::kWav:
return codecs::StreamType::kWav;
case database::Container::kOgg:
return codecs::StreamType::kVorbis;
case database::Container::kFlac:
return codecs::StreamType::kFlac;
case database::Container::kOpus:
return codecs::StreamType::kOpus;
case database::Container::kUnsupported:
default:
return {};
}
}
} // namespace audio

@ -1,66 +0,0 @@
/*
* Copyright 2023 jacqueline <me@jacqueline.id.au>
*
* SPDX-License-Identifier: GPL-3.0-only
*/
#pragma once
#include <cstddef>
#include <cstdint>
#include <future>
#include <memory>
#include <string>
#include "ff.h"
#include "freertos/portmacro.h"
#include "audio/audio_source.hpp"
#include "codec.hpp"
#include "database/future_fetcher.hpp"
#include "database/tag_parser.hpp"
#include "tasks.hpp"
#include "types.hpp"
namespace audio {
/*
* Audio source that fetches data from a FatFs (or exfat i guess) filesystem.
*
* All public methods are safe to call from any task.
*/
class FatfsAudioInput : public IAudioSource {
public:
explicit FatfsAudioInput(database::ITagParser&, tasks::WorkerPool&);
~FatfsAudioInput();
/*
* Immediately cease reading any current source, and begin reading from the
* given file path.
*/
auto SetPath(std::optional<std::string>) -> void;
auto SetPath(const std::string&, uint32_t offset = 0) -> void;
auto SetPath() -> void;
auto HasNewStream() -> bool override;
auto NextStream() -> std::shared_ptr<TaggedStream> override;
FatfsAudioInput(const FatfsAudioInput&) = delete;
FatfsAudioInput& operator=(const FatfsAudioInput&) = delete;
private:
auto OpenFile(const std::string& path, uint32_t offset) -> bool;
auto ContainerToStreamType(database::Container)
-> std::optional<codecs::StreamType>;
database::ITagParser& tag_parser_;
tasks::WorkerPool& bg_worker_;
std::mutex new_stream_mutex_;
std::shared_ptr<TaggedStream> new_stream_;
std::atomic<bool> has_new_stream_;
};
} // namespace audio

@ -0,0 +1,104 @@
/*
* Copyright 2023 jacqueline <me@jacqueline.id.au>
*
* SPDX-License-Identifier: GPL-3.0-only
*/
#include "audio/fatfs_stream_factory.hpp"
#include <cstdint>
#include <memory>
#include <string>
#include "database/database.hpp"
#include "esp_log.h"
#include "ff.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "audio/audio_source.hpp"
#include "audio/fatfs_source.hpp"
#include "codec.hpp"
#include "database/tag_parser.hpp"
#include "database/track.hpp"
#include "drivers/spi.hpp"
#include "system_fsm/service_locator.hpp"
#include "tasks.hpp"
#include "types.hpp"
[[maybe_unused]] static const char* kTag = "SRC";
namespace audio {
FatfsStreamFactory::FatfsStreamFactory(system_fsm::ServiceLocator& services)
: services_(services) {}
auto FatfsStreamFactory::create(database::TrackId id, uint32_t offset)
-> std::shared_ptr<TaggedStream> {
auto db = services_.database().lock();
if (!db) {
return {};
}
auto path = db->getTrackPath(id);
if (!path) {
return {};
}
return create(*path, offset);
}
auto FatfsStreamFactory::create(std::string path, uint32_t offset)
-> std::shared_ptr<TaggedStream> {
auto tags = services_.tag_parser().ReadAndParseTags(path);
if (!tags) {
ESP_LOGE(kTag, "failed to read tags");
return {};
}
if (!tags->title()) {
tags->title(path);
}
auto stream_type = ContainerToStreamType(tags->encoding());
if (!stream_type.has_value()) {
ESP_LOGE(kTag, "couldn't match container to stream");
return {};
}
std::unique_ptr<FIL> file = std::make_unique<FIL>();
FRESULT res;
{
auto lock = drivers::acquire_spi();
res = f_open(file.get(), path.c_str(), FA_READ);
}
if (res != FR_OK) {
ESP_LOGE(kTag, "failed to open file! res: %i", res);
return {};
}
return std::make_shared<TaggedStream>(
tags, std::make_unique<FatfsSource>(stream_type.value(), std::move(file)),
path, offset);
}
auto FatfsStreamFactory::ContainerToStreamType(database::Container enc)
-> std::optional<codecs::StreamType> {
switch (enc) {
case database::Container::kMp3:
return codecs::StreamType::kMp3;
case database::Container::kWav:
return codecs::StreamType::kWav;
case database::Container::kOgg:
return codecs::StreamType::kVorbis;
case database::Container::kFlac:
return codecs::StreamType::kFlac;
case database::Container::kOpus:
return codecs::StreamType::kOpus;
case database::Container::kUnsupported:
default:
return {};
}
}
} // namespace audio

@ -0,0 +1,53 @@
/*
* Copyright 2023 jacqueline <me@jacqueline.id.au>
*
* SPDX-License-Identifier: GPL-3.0-only
*/
#pragma once
#include <stdint.h>
#include <cstddef>
#include <cstdint>
#include <future>
#include <memory>
#include <string>
#include "database/database.hpp"
#include "database/track.hpp"
#include "ff.h"
#include "freertos/portmacro.h"
#include "audio/audio_source.hpp"
#include "codec.hpp"
#include "database/future_fetcher.hpp"
#include "database/tag_parser.hpp"
#include "system_fsm/service_locator.hpp"
#include "tasks.hpp"
#include "types.hpp"
namespace audio {
/*
* Utility to create streams that read from files on the sd card.
*/
class FatfsStreamFactory {
public:
explicit FatfsStreamFactory(system_fsm::ServiceLocator&);
auto create(database::TrackId, uint32_t offset = 0)
-> std::shared_ptr<TaggedStream>;
auto create(std::string, uint32_t offset = 0)
-> std::shared_ptr<TaggedStream>;
FatfsStreamFactory(const FatfsStreamFactory&) = delete;
FatfsStreamFactory& operator=(const FatfsStreamFactory&) = delete;
private:
auto ContainerToStreamType(database::Container)
-> std::optional<codecs::StreamType>;
system_fsm::ServiceLocator& services_;
};
} // namespace audio

@ -4,8 +4,7 @@
* SPDX-License-Identifier: GPL-3.0-only * SPDX-License-Identifier: GPL-3.0-only
*/ */
#include "audio/audio_converter.hpp" #include "audio/processor.hpp"
#include <stdint.h>
#include <algorithm> #include <algorithm>
#include <cmath> #include <cmath>
@ -32,7 +31,7 @@ static constexpr std::size_t kSourceBufferLength = kSampleBufferLength * 2;
namespace audio { namespace audio {
SampleConverter::SampleConverter() SampleProcessor::SampleProcessor()
: commands_(xQueueCreate(1, sizeof(Args))), : commands_(xQueueCreate(1, sizeof(Args))),
resampler_(nullptr), resampler_(nullptr),
source_(xStreamBufferCreateWithCaps(kSourceBufferLength, source_(xStreamBufferCreateWithCaps(kSourceBufferLength,
@ -55,19 +54,19 @@ SampleConverter::SampleConverter()
tasks::StartPersistent<tasks::Type::kAudioConverter>([&]() { Main(); }); tasks::StartPersistent<tasks::Type::kAudioConverter>([&]() { Main(); });
} }
SampleConverter::~SampleConverter() { SampleProcessor::~SampleProcessor() {
vQueueDelete(commands_); vQueueDelete(commands_);
vStreamBufferDelete(source_); vStreamBufferDelete(source_);
} }
auto SampleConverter::SetOutput(std::shared_ptr<IAudioOutput> output) -> void { auto SampleProcessor::SetOutput(std::shared_ptr<IAudioOutput> output) -> void {
// FIXME: We should add synchronisation here, but we should be careful about // FIXME: We should add synchronisation here, but we should be careful about
// not impacting performance given that the output will change only very // not impacting performance given that the output will change only very
// rarely (if ever). // rarely (if ever).
sink_ = output; sink_ = output;
} }
auto SampleConverter::beginStream(std::shared_ptr<TrackInfo> track) -> void { auto SampleProcessor::beginStream(std::shared_ptr<TrackInfo> track) -> void {
Args args{ Args args{
.track = new std::shared_ptr<TrackInfo>(track), .track = new std::shared_ptr<TrackInfo>(track),
.samples_available = 0, .samples_available = 0,
@ -76,7 +75,7 @@ auto SampleConverter::beginStream(std::shared_ptr<TrackInfo> track) -> void {
xQueueSend(commands_, &args, portMAX_DELAY); xQueueSend(commands_, &args, portMAX_DELAY);
} }
auto SampleConverter::continueStream(std::span<sample::Sample> input) -> void { auto SampleProcessor::continueStream(std::span<sample::Sample> input) -> void {
Args args{ Args args{
.track = nullptr, .track = nullptr,
.samples_available = input.size(), .samples_available = input.size(),
@ -86,7 +85,7 @@ auto SampleConverter::continueStream(std::span<sample::Sample> input) -> void {
xStreamBufferSend(source_, input.data(), input.size_bytes(), portMAX_DELAY); xStreamBufferSend(source_, input.data(), input.size_bytes(), portMAX_DELAY);
} }
auto SampleConverter::endStream() -> void { auto SampleProcessor::endStream() -> void {
Args args{ Args args{
.track = nullptr, .track = nullptr,
.samples_available = 0, .samples_available = 0,
@ -95,7 +94,7 @@ auto SampleConverter::endStream() -> void {
xQueueSend(commands_, &args, portMAX_DELAY); xQueueSend(commands_, &args, portMAX_DELAY);
} }
auto SampleConverter::Main() -> void { auto SampleProcessor::Main() -> void {
for (;;) { for (;;) {
Args args; Args args;
while (!xQueueReceive(commands_, &args, portMAX_DELAY)) { while (!xQueueReceive(commands_, &args, portMAX_DELAY)) {
@ -114,7 +113,7 @@ auto SampleConverter::Main() -> void {
} }
} }
auto SampleConverter::handleBeginStream(std::shared_ptr<TrackInfo> track) auto SampleProcessor::handleBeginStream(std::shared_ptr<TrackInfo> track)
-> void { -> void {
if (track->format != source_format_) { if (track->format != source_format_) {
resampler_.reset(); resampler_.reset();
@ -145,7 +144,7 @@ auto SampleConverter::handleBeginStream(std::shared_ptr<TrackInfo> track)
}); });
} }
auto SampleConverter::handleContinueStream(size_t samples_available) -> void { auto SampleProcessor::handleContinueStream(size_t samples_available) -> void {
// Loop until we finish reading all the bytes indicated. There might be // Loop until we finish reading all the bytes indicated. There might be
// leftovers from each iteration, and from this process as a whole, // leftovers from each iteration, and from this process as a whole,
// depending on the resampling stage. // depending on the resampling stage.
@ -182,7 +181,7 @@ auto SampleConverter::handleContinueStream(size_t samples_available) -> void {
} }
} }
auto SampleConverter::handleSamples(std::span<sample::Sample> input) -> size_t { auto SampleProcessor::handleSamples(std::span<sample::Sample> input) -> size_t {
if (source_format_ == target_format_) { if (source_format_ == target_format_) {
// The happiest possible case: the input format matches the output // The happiest possible case: the input format matches the output
// format already. // format already.
@ -223,7 +222,7 @@ auto SampleConverter::handleSamples(std::span<sample::Sample> input) -> size_t {
return samples_used; return samples_used;
} }
auto SampleConverter::handleEndStream() -> void { auto SampleProcessor::handleEndStream() -> void {
if (resampler_) { if (resampler_) {
size_t read, written; size_t read, written;
std::tie(read, written) = resampler_->Process({}, resampled_buffer_, true); std::tie(read, written) = resampler_->Process({}, resampled_buffer_, true);
@ -245,7 +244,7 @@ auto SampleConverter::handleEndStream() -> void {
events::Audio().Dispatch(internal::StreamEnded{}); events::Audio().Dispatch(internal::StreamEnded{});
} }
auto SampleConverter::sendToSink(std::span<sample::Sample> samples) -> void { auto SampleProcessor::sendToSink(std::span<sample::Sample> samples) -> void {
// Update the number of samples sunk so far *before* actually sinking them, // Update the number of samples sunk so far *before* actually sinking them,
// since writing to the stream buffer will block when the buffer gets full. // since writing to the stream buffer will block when the buffer gets full.
samples_sunk_ += samples.size(); samples_sunk_ += samples.size();

@ -25,10 +25,10 @@ namespace audio {
* format of the current output device. The resulting samples are forwarded * format of the current output device. The resulting samples are forwarded
* to the output device's sink stream. * to the output device's sink stream.
*/ */
class SampleConverter { class SampleProcessor {
public: public:
SampleConverter(); SampleProcessor();
~SampleConverter(); ~SampleProcessor();
auto SetOutput(std::shared_ptr<IAudioOutput>) -> void; auto SetOutput(std::shared_ptr<IAudioOutput>) -> void;
Loading…
Cancel
Save