big cleanup of new encoder + stream buffer types

custom
jacqueline 2 years ago
parent f94be3db2f
commit 9b1b401dcb
  1. 2
      src/audio/CMakeLists.txt
  2. 265
      src/audio/audio_task.cpp
  3. 70
      src/audio/fatfs_audio_input.cpp
  4. 9
      src/audio/i2s_audio_output.cpp
  5. 2
      src/audio/include/audio_sink.hpp
  6. 20
      src/audio/include/audio_source.hpp
  7. 18
      src/audio/include/audio_task.hpp
  8. 13
      src/audio/include/fatfs_audio_input.hpp
  9. 2
      src/audio/include/i2s_audio_output.hpp
  10. 72
      src/audio/include/stream_info.hpp
  11. 75
      src/audio/stream_info.cpp
  12. 4
      src/main/main.cpp

@ -5,7 +5,7 @@
idf_component_register( idf_component_register(
SRCS "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp" SRCS "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp"
"stream_message.cpp" "i2s_audio_output.cpp" "stream_buffer.cpp" "track_queue.cpp" "stream_message.cpp" "i2s_audio_output.cpp" "stream_buffer.cpp" "track_queue.cpp"
"stream_event.cpp" "pipeline.cpp" "stream_info.cpp" "audio_fsm.cpp" "stream_event.cpp" "stream_info.cpp" "audio_fsm.cpp"
INCLUDE_DIRS "include" INCLUDE_DIRS "include"
REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory" "tinyfsm" "database" "system_fsm" "playlist") REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory" "tinyfsm" "database" "system_fsm" "playlist")

@ -21,6 +21,7 @@
#include "audio_events.hpp" #include "audio_events.hpp"
#include "audio_fsm.hpp" #include "audio_fsm.hpp"
#include "audio_sink.hpp" #include "audio_sink.hpp"
#include "audio_source.hpp"
#include "cbor.h" #include "cbor.h"
#include "codec.hpp" #include "codec.hpp"
#include "esp_err.h" #include "esp_err.h"
@ -43,6 +44,7 @@
#include "stream_message.hpp" #include "stream_message.hpp"
#include "sys/_stdint.h" #include "sys/_stdint.h"
#include "tasks.hpp" #include "tasks.hpp"
#include "types.hpp"
#include "ui_fsm.hpp" #include "ui_fsm.hpp"
namespace audio { namespace audio {
@ -62,7 +64,7 @@ auto Timer::SetLengthSeconds(uint32_t len) -> void {
} }
auto Timer::SetLengthBytes(uint32_t len) -> void { auto Timer::SetLengthBytes(uint32_t len) -> void {
total_duration_seconds_ = 0; total_duration_seconds_ = bytes_to_samples(len) / format_.sample_rate;
} }
auto Timer::AddBytes(std::size_t bytes) -> void { auto Timer::AddBytes(std::size_t bytes) -> void {
@ -84,14 +86,29 @@ auto Timer::AddBytes(std::size_t bytes) -> void {
} }
if (incremented) { if (incremented) {
// ESP_LOGI("timer", "new time %lu", current_seconds_); if (total_duration_seconds_ < current_seconds_) {
total_duration_seconds_ = current_seconds_;
}
events::Audio().Dispatch(PlaybackUpdate{ events::Audio().Dispatch(PlaybackUpdate{
.seconds_elapsed = current_seconds_, .seconds_elapsed = current_seconds_,
.seconds_total = 0, .seconds_total = total_duration_seconds_,
}); });
} }
} }
// Converts a length in bytes of PCM data into the number of per-channel
// samples it holds, using this timer's stream format.
//
// Returns 0 when the format reports zero channels or zero bits per sample,
// rather than dividing by zero; callers then see a zero-length duration.
auto Timer::bytes_to_samples(uint32_t bytes) -> uint32_t {
  if (format_.channels == 0) {
    return 0;
  }
  // Samples must be aligned to 16 bits. The number of actual bytes per
  // sample is therefore the bps divided by 16, rounded up (align to word),
  // times two (convert to bytes).
  const uint32_t bytes_per_sample =
      ((format_.bits_per_sample + 16 - 1) / 16) * 2;
  if (bytes_per_sample == 0) {
    return 0;
  }
  uint32_t samples = bytes;
  samples /= format_.channels;
  samples /= bytes_per_sample;
  return samples;
}
auto AudioTask::Start(IAudioSource* source, IAudioSink* sink) -> AudioTask* { auto AudioTask::Start(IAudioSource* source, IAudioSink* sink) -> AudioTask* {
AudioTask* task = new AudioTask(source, sink); AudioTask* task = new AudioTask(source, sink);
tasks::StartPersistent<tasks::Type::kAudio>([=]() { task->Main(); }); tasks::StartPersistent<tasks::Type::kAudio>([=]() { task->Main(); });
@ -103,7 +120,7 @@ AudioTask::AudioTask(IAudioSource* source, IAudioSink* sink)
sink_(sink), sink_(sink),
codec_(), codec_(),
timer_(), timer_(),
is_new_stream_(false), has_begun_decoding_(false),
current_input_format_(), current_input_format_(),
current_output_format_(), current_output_format_(),
sample_buffer_(reinterpret_cast<std::byte*>( sample_buffer_(reinterpret_cast<std::byte*>(
@ -114,38 +131,72 @@ AudioTask::AudioTask(IAudioSource* source, IAudioSink* sink)
void AudioTask::Main() { void AudioTask::Main() {
for (;;) { for (;;) {
source_->Read( source_->Read(
[this](StreamInfo::Format format) -> bool { [this](IAudioSource::Flags flags, InputStream& stream) -> void {
if (current_input_format_ && format == *current_input_format_) { if (flags.is_start()) {
// This is the continuation of previous data. We can handle it if has_begun_decoding_ = false;
// we are able to decode it, or if it doesn't need decoding. if (!HandleNewStream(stream)) {
return current_output_format_ == format || codec_ != nullptr; return;
}
}
auto pcm = stream.info().format_as<StreamInfo::Pcm>();
if (pcm) {
if (ForwardPcmStream(*pcm, stream.data())) {
stream.consume(stream.data().size_bytes());
}
timer_->SetLengthBytes(
stream.info().total_length_bytes().value_or(0));
return;
}
if (!stream.info().format_as<StreamInfo::Encoded>() || !codec_) {
// Either unknown stream format, or it's encoded but we don't have
// a decoder that supports it. Either way, bail out.
return;
}
if (!has_begun_decoding_) {
if (BeginDecoding(stream)) {
has_begun_decoding_ = true;
} else {
return;
}
}
// At this point the decoder has been initialised, and the sink has
// been correctly configured. All that remains is to throw samples
// into the sink as fast as possible.
if (!ContinueDecoding(stream)) {
codec_.reset();
}
if (flags.is_end()) {
FinishDecoding(stream);
events::Audio().Dispatch(internal::InputFileFinished{});
} }
},
portMAX_DELAY);
}
}
auto AudioTask::HandleNewStream(const InputStream& stream) -> bool {
// This must be a new stream of data. Reset everything to prepare to // This must be a new stream of data. Reset everything to prepare to
// handle it. // handle it.
current_input_format_ = format; current_input_format_ = stream.info().format();
is_new_stream_ = true;
codec_.reset(); codec_.reset();
timer_.reset();
// What kind of data does this new stream contain? // What kind of data does this new stream contain?
if (std::holds_alternative<StreamInfo::Pcm>(format)) { auto pcm = stream.info().format_as<StreamInfo::Pcm>();
// It's already decoded! We can handle this immediately if it auto encoded = stream.info().format_as<StreamInfo::Encoded>();
// matches what we're currently sending to the sink. Otherwise, we if (pcm) {
// will need to wait for the sink to drain before we can reconfigure // It's already decoded! We can always handle this.
// it.
if (current_output_format_ && format == *current_output_format_) {
return true;
} else if (xStreamBufferIsEmpty(sink_->stream())) {
return true; return true;
} else { } else if (encoded) {
return false;
}
} else if (std::holds_alternative<StreamInfo::Encoded>(format)) {
// The stream has some kind of encoding. Whether or not we can // The stream has some kind of encoding. Whether or not we can
// handle it is entirely down to whether or not we have a codec for // handle it is entirely down to whether or not we have a codec for
// it. // it.
auto encoding = std::get<StreamInfo::Encoded>(format); has_begun_decoding_ = false;
auto codec = codecs::CreateCodecForType(encoding.type); auto codec = codecs::CreateCodecForType(encoded->type);
if (codec) { if (codec) {
ESP_LOGI(kTag, "successfully created codec for stream"); ESP_LOGI(kTag, "successfully created codec for stream");
codec_.reset(*codec); codec_.reset(*codec);
@ -157,104 +208,136 @@ void AudioTask::Main() {
} else { } else {
// programmer error / skill issue :( // programmer error / skill issue :(
ESP_LOGE(kTag, "stream has unknown format"); ESP_LOGE(kTag, "stream has unknown format");
current_input_format_ = format;
return false; return false;
} }
}, }
[this](cpp::span<const std::byte> bytes) -> size_t {
// PCM streams are simple, so handle them first.
if (std::holds_alternative<StreamInfo::Pcm>(*current_input_format_)) {
// First we need to reconfigure the sink for this sample format.
// TODO(jacqueline): We should verify whether or not the sink can
// actually deal with this format first.
if (current_input_format_ != current_output_format_) {
current_output_format_ = current_input_format_;
sink_->Configure(*current_output_format_);
timer_.reset(new Timer(
std::get<StreamInfo::Pcm>(*current_output_format_)));
}
// Stream the raw samples directly to the sink.
xStreamBufferSend(sink_->stream(), bytes.data(), bytes.size_bytes(),
portMAX_DELAY);
timer_->AddBytes(bytes.size_bytes());
return bytes.size_bytes();
}
// Else, assume it's an encoded stream.
size_t bytes_used = 0; auto AudioTask::BeginDecoding(InputStream& stream) -> bool {
if (is_new_stream_) { auto res = codec_->BeginStream(stream.data());
// This is a new stream! First order of business is verifying that stream.consume(res.first);
// we can indeed decode it.
auto res = codec_->BeginStream(bytes);
bytes_used += res.first;
if (res.second.has_error()) { if (res.second.has_error()) {
if (res.second.error() != codecs::ICodec::Error::kOutOfInput) { if (res.second.error() == codecs::ICodec::Error::kOutOfInput) {
// Decoding the header failed, so we can't actually deal with // Running out of input is fine; just return and we will try beginning the
// this stream after all. It could be malformed. // stream again when we have more data.
return false;
}
// Decoding the header failed, so we can't actually deal with this stream
// after all. It could be malformed.
ESP_LOGE(kTag, "error beginning stream"); ESP_LOGE(kTag, "error beginning stream");
codec_.reset(); codec_.reset();
return false;
} }
return bytes_used;
}
is_new_stream_ = false;
codecs::ICodec::OutputFormat format = res.second.value(); codecs::ICodec::OutputFormat format = res.second.value();
StreamInfo::Pcm pcm{ StreamInfo::Pcm new_format{
.channels = format.num_channels, .channels = format.num_channels,
.bits_per_sample = format.bits_per_sample, .bits_per_sample = format.bits_per_sample,
.sample_rate = format.sample_rate_hz, .sample_rate = format.sample_rate_hz,
}; };
StreamInfo::Format new_format{pcm};
timer_.reset(new Timer{pcm}); if (!ConfigureSink(new_format)) {
return false;
}
if (format.duration_seconds) { if (format.duration_seconds) {
timer_->SetLengthSeconds(*format.duration_seconds); timer_->SetLengthSeconds(*format.duration_seconds);
} else {
timer_->SetLengthBytes(stream.info().total_length_bytes().value_or(0));
} }
// Now that we have the output format for decoded samples from this return true;
// stream, we need to see if they are compatible with what's already }
// in the sink stream.
if (new_format != current_output_format_) { auto AudioTask::ContinueDecoding(InputStream& stream) -> bool {
// The new format is different to the old one. Wait for the sink while (!stream.data().empty()) {
// to drain before continuing. auto res = codec_->ContinueStream(stream.data(),
while (!xStreamBufferIsEmpty(sink_->stream())) { {sample_buffer_, sample_buffer_len_});
ESP_LOGI(kTag, "waiting for sink stream to drain...");
// TODO(jacqueline): Get the sink drain ISR to notify us of this stream.consume(res.first);
// via semaphore instead of busy-ish waiting.
vTaskDelay(pdMS_TO_TICKS(100)); if (res.second.has_error()) {
if (res.second.error() == codecs::ICodec::Error::kOutOfInput) {
return true;
} else {
return false;
} }
} else {
xStreamBufferSend(sink_->stream(), sample_buffer_,
res.second->bytes_written, portMAX_DELAY);
timer_->AddBytes(res.second->bytes_written);
} }
ESP_LOGI(kTag, "configuring sink");
current_output_format_ = new_format;
sink_->Configure(new_format);
timer_.reset(
new Timer(std::get<StreamInfo::Pcm>(*current_output_format_)));
} }
return true;
}
// At this point the decoder has been initialised, and the sink has auto AudioTask::FinishDecoding(InputStream& stream) -> void {
// been correctly configured. All that remains is to throw samples // HACK: libmad requires each frame passed to it to have an additional
// into the sink as fast as possible. // MAD_HEADER_GUARD (8) bytes after the end of the frame. Without these extra
while (bytes_used < bytes.size_bytes()) { // bytes, it will not decode the frame.
auto res = // This is fine for most of the stream, but at the end of the stream we don't
codec_->ContinueStream(bytes.subspan(bytes_used), // get a trailing 8 bytes for free.
{sample_buffer_, sample_buffer_len_}); if (stream.info().format_as<StreamInfo::Encoded>()->type ==
codecs::StreamType::kMp3) {
ESP_LOGI(kTag, "applying MAD_HEADER_GUARD fix");
std::unique_ptr<RawStream> mad_buffer;
mad_buffer.reset(new RawStream(stream.data().size_bytes() + 8));
bytes_used += res.first; OutputStream writer{mad_buffer.get()};
std::copy(stream.data().begin(), stream.data().end(),
writer.data().begin());
std::fill(writer.data().begin(), writer.data().end(), std::byte{0});
InputStream padded_stream{mad_buffer.get()};
auto res = codec_->ContinueStream(stream.data(),
{sample_buffer_, sample_buffer_len_});
if (res.second.has_error()) { if (res.second.has_error()) {
return bytes_used; return;
} else { }
xStreamBufferSend(sink_->stream(), sample_buffer_, xStreamBufferSend(sink_->stream(), sample_buffer_,
res.second->bytes_written, portMAX_DELAY); res.second->bytes_written, portMAX_DELAY);
timer_->AddBytes(res.second->bytes_written); timer_->AddBytes(res.second->bytes_written);
} }
}
auto AudioTask::ForwardPcmStream(StreamInfo::Pcm& format,
cpp::span<const std::byte> samples) -> bool {
// First we need to reconfigure the sink for this sample format.
if (format != current_output_format_) {
if (!ConfigureSink(format)) {
return false;
}
} }
return bytes_used; // Stream the raw samples directly to the sink.
}, xStreamBufferSend(sink_->stream(), samples.data(), samples.size_bytes(),
portMAX_DELAY); portMAX_DELAY);
timer_->AddBytes(samples.size_bytes());
return true;
}
auto AudioTask::ConfigureSink(const StreamInfo::Pcm& format) -> bool {
if (format != current_output_format_) {
// The new format is different to the old one. Wait for the sink to drain
// before continuing.
while (!xStreamBufferIsEmpty(sink_->stream())) {
ESP_LOGI(kTag, "waiting for sink stream to drain...");
// TODO(jacqueline): Get the sink drain ISR to notify us of this
// via semaphore instead of busy-ish waiting.
vTaskDelay(pdMS_TO_TICKS(100));
} }
ESP_LOGI(kTag, "configuring sink");
if (!sink_->Configure(format)) {
return false;
}
}
current_output_format_ = format;
timer_.reset(new Timer(format));
return true;
} }
} // namespace audio } // namespace audio

@ -145,21 +145,15 @@ FatfsAudioInput::FatfsAudioInput(
has_data_(xSemaphoreCreateBinary()), has_data_(xSemaphoreCreateBinary()),
streamer_buffer_(xStreamBufferCreate(kStreamerBufferSize, 1)), streamer_buffer_(xStreamBufferCreate(kStreamerBufferSize, 1)),
streamer_(new FileStreamer(streamer_buffer_, has_data_)), streamer_(new FileStreamer(streamer_buffer_, has_data_)),
file_buffer_info_(), input_buffer_(new RawStream(kFileBufferSize)),
file_buffer_len_(kFileBufferSize),
file_buffer_(reinterpret_cast<std::byte*>(
heap_caps_malloc(file_buffer_len_,
MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL))),
file_buffer_stream_(&file_buffer_info_, {file_buffer_, file_buffer_len_}),
source_mutex_(), source_mutex_(),
pending_path_(), pending_path_(),
current_format_() {} is_first_read_(false) {}
FatfsAudioInput::~FatfsAudioInput() { FatfsAudioInput::~FatfsAudioInput() {
streamer_.reset(); streamer_.reset();
vStreamBufferDelete(streamer_buffer_); vStreamBufferDelete(streamer_buffer_);
vSemaphoreDelete(has_data_); vSemaphoreDelete(has_data_);
free(file_buffer_);
} }
auto FatfsAudioInput::SetPath(std::future<std::optional<std::string>> fut) auto FatfsAudioInput::SetPath(std::future<std::optional<std::string>> fut)
@ -185,9 +179,7 @@ auto FatfsAudioInput::SetPath() -> void {
CloseCurrentFile(); CloseCurrentFile();
} }
auto FatfsAudioInput::Read( auto FatfsAudioInput::Read(std::function<void(Flags, InputStream&)> read_cb,
std::function<bool(StreamInfo::Format)> can_read,
std::function<size_t(cpp::span<const std::byte>)> read,
TickType_t max_wait) -> void { TickType_t max_wait) -> void {
// Wait until we have data to return. // Wait until we have data to return.
xSemaphoreTake(has_data_, portMAX_DELAY); xSemaphoreTake(has_data_, portMAX_DELAY);
@ -205,7 +197,7 @@ auto FatfsAudioInput::Read(
auto res = pending_path_->Result(); auto res = pending_path_->Result();
pending_path_.reset(); pending_path_.reset();
if (res || *res) { if (res && *res) {
OpenFile(**res); OpenFile(**res);
} }
@ -217,28 +209,22 @@ auto FatfsAudioInput::Read(
// Move data from the file streamer's buffer into our file buffer. We need our // Move data from the file streamer's buffer into our file buffer. We need our
// own buffer so that we can handle concatenating smaller file chunks into // own buffer so that we can handle concatenating smaller file chunks into
// complete frames for the decoder. // complete frames for the decoder.
OutputStream writer{&file_buffer_stream_}; OutputStream writer{input_buffer_.get()};
std::size_t bytes_added = std::size_t bytes_added =
xStreamBufferReceive(streamer_buffer_, writer.data().data(), xStreamBufferReceive(streamer_buffer_, writer.data().data(),
writer.data().size_bytes(), pdMS_TO_TICKS(0)); writer.data().size_bytes(), pdMS_TO_TICKS(0));
writer.add(bytes_added); writer.add(bytes_added);
// HACK: libmad needs at least MAD_HEADER_GUARD (= 8) extra bytes following a bool has_data_remaining = HasDataRemaining();
// frame, or else it refuses to decode it.
if (IsCurrentFormatMp3() && !HasDataRemaining()) {
ESP_LOGI(kTag, "applying MAD_HEADER_GUARD fix");
cpp::span<std::byte> buf = writer.data();
size_t pad_amount = std::min<size_t>(buf.size_bytes(), 8);
std::fill_n(buf.begin(), pad_amount, static_cast<std::byte>(0));
}
InputStream reader{&file_buffer_stream_}; InputStream reader{input_buffer_.get()};
auto data_for_cb = reader.data(); auto data_for_cb = reader.data();
if (!data_for_cb.empty() && std::invoke(can_read, *current_format_)) { if (!data_for_cb.empty()) {
reader.consume(std::invoke(read, reader.data())); std::invoke(read_cb, Flags{is_first_read_, !has_data_remaining}, reader);
is_first_read_ = false;
} }
if (!HasDataRemaining()) { if (!has_data_remaining) {
// Out of data. We're finished. Note we don't care about anything left in // Out of data. We're finished. Note we don't care about anything left in
// the file buffer at this point; the callback has seen it, so if it didn't // the file buffer at this point; the callback has seen it, so if it didn't
// consume it then presumably whatever is left isn't enough to form a // consume it then presumably whatever is left isn't enough to form a
@ -273,18 +259,19 @@ auto FatfsAudioInput::OpenFile(const std::string& path) -> void {
return; return;
} }
if (*stream_type == codecs::StreamType::kPcm && tags.channels && StreamInfo::Format format;
tags.bits_per_sample && tags.channels) { if (*stream_type == codecs::StreamType::kPcm) {
current_format_ = StreamInfo::Pcm{ if (tags.channels && tags.bits_per_sample && tags.channels) {
format = StreamInfo::Pcm{
.channels = static_cast<uint8_t>(*tags.channels), .channels = static_cast<uint8_t>(*tags.channels),
.bits_per_sample = static_cast<uint8_t>(*tags.bits_per_sample), .bits_per_sample = static_cast<uint8_t>(*tags.bits_per_sample),
.sample_rate = static_cast<uint32_t>(*tags.sample_rate), .sample_rate = static_cast<uint32_t>(*tags.sample_rate)};
}; } else {
ESP_LOGW(kTag, "pcm stream missing format info");
return;
}
} else { } else {
current_format_ = StreamInfo::Encoded{ format = StreamInfo::Encoded{.type = *stream_type};
.type = *stream_type,
.duration_bytes = info.fsize,
};
} }
std::unique_ptr<FIL> file = std::make_unique<FIL>(); std::unique_ptr<FIL> file = std::make_unique<FIL>();
@ -294,15 +281,17 @@ auto FatfsAudioInput::OpenFile(const std::string& path) -> void {
return; return;
} }
streamer_->Restart(std::move(file)); OutputStream writer{input_buffer_.get()};
writer.prepare(format, info.fsize);
streamer_->Restart(std::move(file));
is_first_read_ = true;
events::Audio().Dispatch(internal::InputFileOpened{}); events::Audio().Dispatch(internal::InputFileOpened{});
} }
auto FatfsAudioInput::CloseCurrentFile() -> void { auto FatfsAudioInput::CloseCurrentFile() -> void {
streamer_->Restart({}); streamer_->Restart({});
xStreamBufferReset(streamer_buffer_); xStreamBufferReset(streamer_buffer_);
current_format_ = {};
} }
auto FatfsAudioInput::HasDataRemaining() -> bool { auto FatfsAudioInput::HasDataRemaining() -> bool {
@ -327,14 +316,11 @@ auto FatfsAudioInput::ContainerToStreamType(database::Encoding enc)
} }
auto FatfsAudioInput::IsCurrentFormatMp3() -> bool { auto FatfsAudioInput::IsCurrentFormatMp3() -> bool {
if (!current_format_) { auto format = input_buffer_->info().format_as<StreamInfo::Encoded>();
return false; if (!format) {
}
if (!std::holds_alternative<StreamInfo::Encoded>(*current_format_)) {
return false; return false;
} }
return std::get<StreamInfo::Encoded>(*current_format_).type == return format->type == codecs::StreamType::kMp3;
codecs::StreamType::kMp3;
} }
} // namespace audio } // namespace audio

@ -114,14 +114,7 @@ auto I2SAudioOutput::AdjustVolumeDown() -> bool {
return true; return true;
} }
auto I2SAudioOutput::Configure(const StreamInfo::Format& format) -> bool { auto I2SAudioOutput::Configure(const StreamInfo::Pcm& pcm) -> bool {
if (!std::holds_alternative<StreamInfo::Pcm>(format)) {
ESP_LOGI(kTag, "ignoring non-pcm stream (%d)", format.index());
return false;
}
StreamInfo::Pcm pcm = std::get<StreamInfo::Pcm>(format);
if (current_config_ && pcm == *current_config_) { if (current_config_ && pcm == *current_config_) {
ESP_LOGI(kTag, "ignoring unchanged format"); ESP_LOGI(kTag, "ignoring unchanged format");
return true; return true;

@ -38,7 +38,7 @@ class IAudioSink {
virtual auto AdjustVolumeUp() -> bool = 0; virtual auto AdjustVolumeUp() -> bool = 0;
virtual auto AdjustVolumeDown() -> bool = 0; virtual auto AdjustVolumeDown() -> bool = 0;
virtual auto Configure(const StreamInfo::Format& format) -> bool = 0; virtual auto Configure(const StreamInfo::Pcm& format) -> bool = 0;
virtual auto Send(const cpp::span<std::byte>& data) -> void = 0; virtual auto Send(const cpp::span<std::byte>& data) -> void = 0;
auto stream() -> StreamBufferHandle_t { return stream_; } auto stream() -> StreamBufferHandle_t { return stream_; }

@ -8,6 +8,7 @@
#include <stdint.h> #include <stdint.h>
#include <bitset>
#include <memory> #include <memory>
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
@ -22,12 +23,25 @@ class IAudioSource {
public: public:
virtual ~IAudioSource() {} virtual ~IAudioSource() {}
// Positional markers passed alongside each chunk handed to a Read() callback:
// whether the chunk is the start of a stream and/or the end of one.
class Flags {
 public:
  // is_start: this chunk begins a new stream.
  // is_end: no further data will follow this chunk.
  Flags(bool is_start, bool is_end) {
    flags_[0] = is_start;
    // Bit 1 must carry is_end; storing is_start here would make is_end()
    // mirror is_start().
    flags_[1] = is_end;
  }

  auto is_start() const -> bool { return flags_[0]; }
  auto is_end() const -> bool { return flags_[1]; }

 private:
  // Bit 0 = start of stream, bit 1 = end of stream.
  std::bitset<2> flags_;
};
/* /*
* Synchronously fetches data from this source. * Synchronously fetches data from this source.
*/ */
virtual auto Read(std::function<bool(StreamInfo::Format)>, virtual auto Read(std::function<void(Flags, InputStream&)>, TickType_t)
std::function<size_t(cpp::span<const std::byte>)>, -> void = 0;
TickType_t) -> void = 0;
}; };
} // namespace audio } // namespace audio

@ -14,6 +14,7 @@
#include "audio_source.hpp" #include "audio_source.hpp"
#include "codec.hpp" #include "codec.hpp"
#include "pipeline.hpp" #include "pipeline.hpp"
#include "stream_info.hpp"
namespace audio { namespace audio {
@ -27,10 +28,13 @@ class Timer {
auto AddBytes(std::size_t) -> void; auto AddBytes(std::size_t) -> void;
private: private:
auto bytes_to_samples(uint32_t) -> uint32_t;
StreamInfo::Pcm format_; StreamInfo::Pcm format_;
uint32_t current_seconds_; uint32_t current_seconds_;
uint32_t current_sample_in_second_; uint32_t current_sample_in_second_;
uint32_t total_duration_seconds_; uint32_t total_duration_seconds_;
}; };
@ -43,14 +47,24 @@ class AudioTask {
private: private:
AudioTask(IAudioSource* source, IAudioSink* sink); AudioTask(IAudioSource* source, IAudioSink* sink);
auto HandleNewStream(const InputStream&) -> bool;
auto BeginDecoding(InputStream&) -> bool;
auto ContinueDecoding(InputStream&) -> bool;
auto FinishDecoding(InputStream&) -> void;
auto ForwardPcmStream(StreamInfo::Pcm&, cpp::span<const std::byte>) -> bool;
auto ConfigureSink(const StreamInfo::Pcm&) -> bool;
IAudioSource* source_; IAudioSource* source_;
IAudioSink* sink_; IAudioSink* sink_;
std::unique_ptr<codecs::ICodec> codec_; std::unique_ptr<codecs::ICodec> codec_;
std::unique_ptr<Timer> timer_; std::unique_ptr<Timer> timer_;
bool is_new_stream_; bool has_begun_decoding_;
std::optional<StreamInfo::Format> current_input_format_; std::optional<StreamInfo::Format> current_input_format_;
std::optional<StreamInfo::Format> current_output_format_; std::optional<StreamInfo::Pcm> current_output_format_;
std::byte* sample_buffer_; std::byte* sample_buffer_;
std::size_t sample_buffer_len_; std::size_t sample_buffer_len_;

@ -89,9 +89,8 @@ class FatfsAudioInput : public IAudioSource {
auto SetPath(const std::string&) -> void; auto SetPath(const std::string&) -> void;
auto SetPath() -> void; auto SetPath() -> void;
auto Read(std::function<bool(StreamInfo::Format)>, auto Read(std::function<void(Flags, InputStream&)>, TickType_t)
std::function<size_t(cpp::span<const std::byte>)>, -> void override;
TickType_t) -> void override;
FatfsAudioInput(const FatfsAudioInput&) = delete; FatfsAudioInput(const FatfsAudioInput&) = delete;
FatfsAudioInput& operator=(const FatfsAudioInput&) = delete; FatfsAudioInput& operator=(const FatfsAudioInput&) = delete;
@ -118,11 +117,7 @@ class FatfsAudioInput : public IAudioSource {
StreamBufferHandle_t streamer_buffer_; StreamBufferHandle_t streamer_buffer_;
std::unique_ptr<FileStreamer> streamer_; std::unique_ptr<FileStreamer> streamer_;
StreamInfo file_buffer_info_; std::unique_ptr<RawStream> input_buffer_;
std::size_t file_buffer_len_;
std::byte* file_buffer_;
RawStream file_buffer_stream_;
// Mutex guarding the current file/stream associated with this source. Must be // Mutex guarding the current file/stream associated with this source. Must be
// held during readings, and before altering the current file. // held during readings, and before altering the current file.
@ -130,7 +125,7 @@ class FatfsAudioInput : public IAudioSource {
std::unique_ptr<database::FutureFetcher<std::optional<std::string>>> std::unique_ptr<database::FutureFetcher<std::optional<std::string>>>
pending_path_; pending_path_;
std::optional<StreamInfo::Format> current_format_; bool is_first_read_;
}; };
} // namespace audio } // namespace audio

@ -34,7 +34,7 @@ class I2SAudioOutput : public IAudioSink {
auto AdjustVolumeUp() -> bool override; auto AdjustVolumeUp() -> bool override;
auto AdjustVolumeDown() -> bool override; auto AdjustVolumeDown() -> bool override;
auto Configure(const StreamInfo::Format& format) -> bool override; auto Configure(const StreamInfo::Pcm& format) -> bool override;
auto Send(const cpp::span<std::byte>& data) -> void override; auto Send(const cpp::span<std::byte>& data) -> void override;
I2SAudioOutput(const I2SAudioOutput&) = delete; I2SAudioOutput(const I2SAudioOutput&) = delete;

@ -7,6 +7,7 @@
#pragma once #pragma once
#include <stdint.h> #include <stdint.h>
#include <sys/_stdint.h>
#include <cstdint> #include <cstdint>
#include <optional> #include <optional>
#include <string> #include <string>
@ -25,25 +26,26 @@
namespace audio { namespace audio {
struct StreamInfo { class StreamInfo {
public:
StreamInfo() : bytes_in_stream_(0), total_length_bytes_(), format_() {}
// The number of bytes that are available for consumption within this // The number of bytes that are available for consumption within this
// stream's buffer. // stream's buffer.
std::size_t bytes_in_stream{0}; auto bytes_in_stream() -> std::size_t& { return bytes_in_stream_; }
auto bytes_in_stream() const -> std::size_t { return bytes_in_stream_; }
bool is_producer_finished = true;
bool is_consumer_finished = true;
std::optional<std::uint32_t> duration_seconds;
std::optional<std::uint32_t> seek_to_seconds{}; auto total_length_bytes() -> std::optional<std::uint32_t>& {
return total_length_bytes_;
}
auto total_length_bytes() const -> std::optional<std::uint32_t> {
return total_length_bytes_;
}
struct Encoded { struct Encoded {
// The codec that this stream is associated with. // The codec that this stream is associated with.
codecs::StreamType type; codecs::StreamType type;
std::optional<std::size_t> duration_bytes;
bool operator==(const Encoded&) const = default; bool operator==(const Encoded&) const = default;
}; };
@ -59,33 +61,48 @@ struct StreamInfo {
}; };
typedef std::variant<std::monostate, Encoded, Pcm> Format; typedef std::variant<std::monostate, Encoded, Pcm> Format;
Format format{}; auto format() const -> const Format& { return format_; }
auto set_format(Format f) -> void { format_ = f; }
// Returns a copy of the stored format if it currently holds alternative T,
// or an empty optional if it holds any other alternative.
template <typename T>
auto format_as() const -> std::optional<T> {
  if (const T* held = std::get_if<T>(&format_)) {
    return *held;
  }
  return std::nullopt;
}
bool operator==(const StreamInfo&) const = default; bool operator==(const StreamInfo&) const = default;
private:
std::size_t bytes_in_stream_;
std::optional<std::uint32_t> total_length_bytes_;
Format format_{};
}; };
class InputStream;
class OutputStream;
class RawStream { class RawStream {
public: public:
StreamInfo* info; explicit RawStream(std::size_t size);
cpp::span<std::byte> data; ~RawStream();
RawStream(StreamInfo* i, cpp::span<std::byte> d) : info(i), data(d) {} auto info() -> StreamInfo& { return info_; }
auto data() -> cpp::span<std::byte>;
private:
StreamInfo info_;
std::size_t buffer_size_;
std::byte* buffer_;
}; };
/*
* A byte buffer + associated metadata, which is not allowed to modify any of
* the underlying data.
*/
class InputStream { class InputStream {
public: public:
explicit InputStream(RawStream* s) : raw_(s) {} explicit InputStream(RawStream* s) : raw_(s) {}
void consume(std::size_t bytes) const; void consume(std::size_t bytes) const;
bool is_producer_finished() const;
void mark_consumer_finished() const;
const StreamInfo& info() const; const StreamInfo& info() const;
cpp::span<const std::byte> data() const; cpp::span<const std::byte> data() const;
@ -100,18 +117,13 @@ class OutputStream {
void add(std::size_t bytes) const; void add(std::size_t bytes) const;
bool prepare(const StreamInfo::Format& new_format); void prepare(const StreamInfo::Format& new_format,
std::optional<uint32_t> length);
void set_duration(std::size_t);
const StreamInfo& info() const; const StreamInfo& info() const;
cpp::span<std::byte> data() const; cpp::span<std::byte> data() const;
bool is_consumer_finished() const;
void mark_producer_finished() const;
private: private:
RawStream* raw_; RawStream* raw_;
}; };

@ -5,6 +5,7 @@
*/ */
#include "stream_info.hpp" #include "stream_info.hpp"
#include <sys/_stdint.h>
#include <cstdint> #include <cstdint>
#include <optional> #include <optional>
@ -14,77 +15,63 @@
#include <utility> #include <utility>
#include <variant> #include <variant>
#include "esp_heap_caps.h"
#include "result.hpp" #include "result.hpp"
#include "span.hpp" #include "span.hpp"
#include "types.hpp" #include "types.hpp"
namespace audio { namespace audio {
void InputStream::consume(std::size_t bytes) const { RawStream::RawStream(std::size_t size)
assert(raw_->info->bytes_in_stream >= bytes); : info_(),
auto new_data = buffer_size_(size),
raw_->data.subspan(bytes, raw_->info->bytes_in_stream - bytes); buffer_(reinterpret_cast<std::byte*>(
std::move(new_data.begin(), new_data.end(), raw_->data.begin()); heap_caps_malloc(size, MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT))) {
raw_->info->bytes_in_stream = new_data.size_bytes(); assert(buffer_ != NULL);
}
// Frees the backing buffer that the constructor obtained from
// heap_caps_malloc.
RawStream::~RawStream() {
heap_caps_free(buffer_);
}
void InputStream::mark_consumer_finished() const { auto RawStream::data() -> cpp::span<std::byte> {
raw_->info->is_consumer_finished = true; return {buffer_, buffer_size_};
if (is_producer_finished()) {
raw_->info->format = std::monostate();
}
} }
bool InputStream::is_producer_finished() const { void InputStream::consume(std::size_t bytes) const {
return raw_->info->is_producer_finished; assert(raw_->info().bytes_in_stream() >= bytes);
auto new_data =
raw_->data().subspan(bytes, raw_->info().bytes_in_stream() - bytes);
std::move(new_data.begin(), new_data.end(), raw_->data().begin());
raw_->info().bytes_in_stream() = new_data.size_bytes();
} }
const StreamInfo& InputStream::info() const { const StreamInfo& InputStream::info() const {
return *raw_->info; return raw_->info();
} }
cpp::span<const std::byte> InputStream::data() const { cpp::span<const std::byte> InputStream::data() const {
return raw_->data.first(raw_->info->bytes_in_stream); return raw_->data().first(raw_->info().bytes_in_stream());
} }
void OutputStream::add(std::size_t bytes) const { void OutputStream::add(std::size_t bytes) const {
assert(raw_->info->bytes_in_stream + bytes <= raw_->data.size_bytes()); assert(raw_->info().bytes_in_stream() + bytes <= raw_->data().size_bytes());
raw_->info->bytes_in_stream += bytes; raw_->info().bytes_in_stream() += bytes;
} }
bool OutputStream::prepare(const StreamInfo::Format& new_format) { void OutputStream::prepare(const StreamInfo::Format& new_format,
if (std::holds_alternative<std::monostate>(raw_->info->format) || std::optional<uint32_t> length) {
raw_->info->is_consumer_finished) { raw_->info().set_format(new_format);
raw_->info->format = new_format; raw_->info().bytes_in_stream() = 0;
raw_->info->bytes_in_stream = 0; raw_->info().total_length_bytes() = length;
raw_->info->is_producer_finished = false;
raw_->info->is_consumer_finished = false;
return true;
}
return false;
}
void OutputStream::set_duration(std::size_t seconds) {
raw_->info->duration_seconds = seconds;
} }
const StreamInfo& OutputStream::info() const { const StreamInfo& OutputStream::info() const {
return *raw_->info; return raw_->info();
} }
cpp::span<std::byte> OutputStream::data() const { cpp::span<std::byte> OutputStream::data() const {
return raw_->data.subspan(raw_->info->bytes_in_stream); return raw_->data().subspan(raw_->info().bytes_in_stream());
}
void OutputStream::mark_producer_finished() const {
raw_->info->is_producer_finished = true;
if (is_consumer_finished()) {
raw_->info->format = std::monostate();
}
}
bool OutputStream::is_consumer_finished() const {
return raw_->info->is_consumer_finished;
} }
} // namespace audio } // namespace audio

@ -20,6 +20,10 @@ extern "C" void app_main(void) {
ESP_ERROR_CHECK(drivers::init_i2c()); ESP_ERROR_CHECK(drivers::init_i2c());
drivers::Gpios* gpios = system_fsm::SystemState::early_init_gpios(); drivers::Gpios* gpios = system_fsm::SystemState::early_init_gpios();
// Semaphores must be empty before being added to a queue set. Hence all this
// weird early init stuff; by being explicit about initialisation order, we're
// able to handle GPIO ISR notifications + system events from the same task,
// and a little mess is worth not needing to allocate a whole extra stack.
QueueSetHandle_t set = xQueueCreateSet(2); QueueSetHandle_t set = xQueueCreateSet(2);
auto* event_queue = events::queues::SystemAndAudio(); auto* event_queue = events::queues::SystemAndAudio();
xQueueAddToSet(event_queue->has_events(), set); xQueueAddToSet(event_queue->has_events(), set);

Loading…
Cancel
Save