pass stream start/update/end events through the whole pipeline

custom
jacqueline 1 year ago
parent 175bfc4e3e
commit 078b77d0f7
  1. 2
      dependencies.lock
  2. 113
      src/audio/audio_converter.cpp
  3. 22
      src/audio/audio_decoder.cpp
  4. 31
      src/audio/audio_fsm.cpp
  5. 18
      src/audio/include/audio_converter.hpp
  6. 15
      src/audio/include/audio_events.hpp
  7. 9
      src/audio/include/audio_fsm.hpp
  8. 4
      src/ui/ui_fsm.cpp

@ -4,6 +4,6 @@ dependencies:
source: source:
type: idf type: idf
version: 5.1.1 version: 5.1.1
manifest_hash: 9e4320e6f25503854c6c93bcbfa9b80f780485bcf066bdbad31a820544492538 manifest_hash: b9761e0028130d307b778c710e5dd39fb3c942d8084ed429d448d938957fb0e6
target: esp32 target: esp32
version: 1.0.0 version: 1.0.0

@ -28,7 +28,7 @@
[[maybe_unused]] static constexpr char kTag[] = "mixer"; [[maybe_unused]] static constexpr char kTag[] = "mixer";
static constexpr std::size_t kSampleBufferLength = static constexpr std::size_t kSampleBufferLength =
drivers::kI2SBufferLengthFrames * sizeof(sample::Sample); drivers::kI2SBufferLengthFrames * sizeof(sample::Sample) * 2;
static constexpr std::size_t kSourceBufferLength = kSampleBufferLength * 2; static constexpr std::size_t kSourceBufferLength = kSampleBufferLength * 2;
namespace audio { namespace audio {
@ -68,24 +68,32 @@ auto SampleConverter::SetOutput(std::shared_ptr<IAudioOutput> output) -> void {
sink_ = output; sink_ = output;
} }
auto SampleConverter::ConvertSamples(cpp::span<sample::Sample> input, auto SampleConverter::beginStream(std::shared_ptr<TrackInfo> track) -> void {
const IAudioOutput::Format& format,
bool is_eos) -> void {
Args args{ Args args{
.format = format, .track = new std::shared_ptr<TrackInfo>(track),
.samples_available = input.size(), .samples_available = 0,
.is_end_of_stream = is_eos, .is_end_of_stream = false,
}; };
xQueueSend(commands_, &args, portMAX_DELAY); xQueueSend(commands_, &args, portMAX_DELAY);
}
cpp::span<std::byte> input_as_bytes = { auto SampleConverter::continueStream(cpp::span<sample::Sample> input) -> void {
reinterpret_cast<std::byte*>(input.data()), input.size_bytes()}; Args args{
size_t bytes_sent = 0; .track = nullptr,
while (bytes_sent < input_as_bytes.size()) { .samples_available = input.size(),
bytes_sent += xStreamBufferSend( .is_end_of_stream = false,
source_, input_as_bytes.subspan(bytes_sent).data(), };
input_as_bytes.size() - bytes_sent, pdMS_TO_TICKS(100)); xQueueSend(commands_, &args, portMAX_DELAY);
xStreamBufferSend(source_, input.data(), input.size_bytes(), portMAX_DELAY);
} }
auto SampleConverter::endStream() -> void {
Args args{
.track = nullptr,
.samples_available = 0,
.is_end_of_stream = true,
};
xQueueSend(commands_, &args, portMAX_DELAY);
} }
auto SampleConverter::Main() -> void { auto SampleConverter::Main() -> void {
@ -93,12 +101,28 @@ auto SampleConverter::Main() -> void {
Args args; Args args;
while (!xQueueReceive(commands_, &args, portMAX_DELAY)) { while (!xQueueReceive(commands_, &args, portMAX_DELAY)) {
} }
if (args.format != source_format_) {
if (args.track) {
handleBeginStream(*args.track);
delete args.track;
}
if (args.samples_available) {
handleContinueStream(args.samples_available);
}
if (args.is_end_of_stream) {
handleEndStream();
}
}
}
auto SampleConverter::handleBeginStream(std::shared_ptr<TrackInfo> track)
-> void {
if (track->format != source_format_) {
resampler_.reset(); resampler_.reset();
source_format_ = args.format; source_format_ = track->format;
leftover_bytes_ = 0; leftover_bytes_ = 0;
auto new_target = sink_->PrepareFormat(args.format); auto new_target = sink_->PrepareFormat(track->format);
if (new_target != target_format_) { if (new_target != target_format_) {
// The new format is different to the old one. Wait for the sink to // The new format is different to the old one. Wait for the sink to
// drain before continuing. // drain before continuing.
@ -112,26 +136,22 @@ auto SampleConverter::Main() -> void {
sink_->Configure(new_target); sink_->Configure(new_target);
} }
target_format_ = new_target; target_format_ = new_target;
// Send a final sample count for the previous sample rate.
if (samples_sunk_ > 0) {
events::Audio().Dispatch(internal::ConverterProgress{
.samples_sunk = samples_sunk_,
});
} }
samples_sunk_ = 0; samples_sunk_ = 0;
events::Audio().Dispatch(internal::ConverterConfigurationChanged{ events::Audio().Dispatch(internal::StreamStarted{
.track = track,
.src_format = source_format_, .src_format = source_format_,
.dst_format = target_format_, .dst_format = target_format_,
}); });
} }
auto SampleConverter::handleContinueStream(size_t samples_available) -> void {
// Loop until we finish reading all the bytes indicated. There might be // Loop until we finish reading all the bytes indicated. There might be
// leftovers from each iteration, and from this process as a whole, // leftovers from each iteration, and from this process as a whole,
// depending on the resampling stage. // depending on the resampling stage.
size_t bytes_read = 0; size_t bytes_read = 0;
size_t bytes_to_read = args.samples_available * sizeof(sample::Sample); size_t bytes_to_read = samples_available * sizeof(sample::Sample);
while (bytes_read < bytes_to_read) { while (bytes_read < bytes_to_read) {
// First top up the input buffer, taking care not to overwrite anything // First top up the input buffer, taking care not to overwrite anything
// remaining from a previous iteration. // remaining from a previous iteration.
@ -146,9 +166,7 @@ auto SampleConverter::Main() -> void {
size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_; size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_;
size_t samples_in_buffer = bytes_in_buffer / sizeof(sample::Sample); size_t samples_in_buffer = bytes_in_buffer / sizeof(sample::Sample);
size_t samples_used = size_t samples_used = handleSamples(input_buffer_.first(samples_in_buffer));
HandleSamples(input_buffer_.first(samples_in_buffer),
args.is_end_of_stream && bytes_read == bytes_to_read);
// Maybe the resampler didn't consume everything. Maybe the last few // Maybe the resampler didn't consume everything. Maybe the last few
// bytes we read were half a frame. Either way, we need to calculate the // bytes we read were half a frame. Either way, we need to calculate the
@ -160,19 +178,16 @@ auto SampleConverter::Main() -> void {
leftover_bytes_ = bytes_in_buffer - bytes_used; leftover_bytes_ = bytes_in_buffer - bytes_used;
if (leftover_bytes_ > 0) { if (leftover_bytes_ > 0) {
std::memmove(input_buffer_as_bytes_.data(), std::memmove(input_buffer_as_bytes_.data(),
input_buffer_as_bytes_.data() + bytes_used, input_buffer_as_bytes_.data() + bytes_used, leftover_bytes_);
leftover_bytes_);
}
} }
} }
} }
auto SampleConverter::HandleSamples(cpp::span<sample::Sample> input, auto SampleConverter::handleSamples(cpp::span<sample::Sample> input) -> size_t {
bool is_eos) -> size_t {
if (source_format_ == target_format_) { if (source_format_ == target_format_) {
// The happiest possible case: the input format matches the output // The happiest possible case: the input format matches the output
// format already. // format already.
SendToSink(input); sendToSink(input);
return input.size(); return input.size();
} }
@ -190,7 +205,7 @@ auto SampleConverter::HandleSamples(cpp::span<sample::Sample> input,
size_t read, written; size_t read, written;
std::tie(read, written) = resampler_->Process(input.subspan(samples_used), std::tie(read, written) = resampler_->Process(input.subspan(samples_used),
resampled_buffer_, is_eos); resampled_buffer_, false);
samples_used += read; samples_used += read;
if (read == 0 && written == 0) { if (read == 0 && written == 0) {
@ -203,18 +218,40 @@ auto SampleConverter::HandleSamples(cpp::span<sample::Sample> input,
samples_used = input.size(); samples_used = input.size();
} }
SendToSink(output_source); sendToSink(output_source);
} }
return samples_used; return samples_used;
} }
auto SampleConverter::SendToSink(cpp::span<sample::Sample> samples) -> void { auto SampleConverter::handleEndStream() -> void {
if (resampler_) {
size_t read, written;
std::tie(read, written) = resampler_->Process({}, resampled_buffer_, true);
if (written > 0) {
sendToSink(resampled_buffer_.first(written));
}
}
// Send a final update to finish off this stream's samples.
if (samples_sunk_ > 0) {
events::Audio().Dispatch(internal::StreamUpdate{
.samples_sunk = samples_sunk_,
});
samples_sunk_ = 0;
}
events::Audio().Dispatch(internal::StreamEnded{});
}
auto SampleConverter::sendToSink(cpp::span<sample::Sample> samples) -> void {
// Update the number of samples sunk so far *before* actually sinking them, // Update the number of samples sunk so far *before* actually sinking them,
// since writing to the stream buffer will block when the buffer gets full. // since writing to the stream buffer will block when the buffer gets full.
samples_sunk_ += samples.size(); samples_sunk_ += samples.size();
if (samples_sunk_ >= if (samples_sunk_ >=
target_format_.sample_rate * target_format_.num_channels) { target_format_.sample_rate * target_format_.num_channels) {
events::Audio().Dispatch(internal::ConverterProgress{ events::Audio().Dispatch(internal::StreamUpdate{
.samples_sunk = samples_sunk_, .samples_sunk = samples_sunk_,
}); });
samples_sunk_ = 0; samples_sunk_ = 0;

@ -72,7 +72,6 @@ void Decoder::Main() {
for (;;) { for (;;) {
if (source_->HasNewStream() || !stream_) { if (source_->HasNewStream() || !stream_) {
std::shared_ptr<TaggedStream> new_stream = source_->NextStream(); std::shared_ptr<TaggedStream> new_stream = source_->NextStream();
ESP_LOGI(kTag, "decoder has new stream");
if (new_stream && BeginDecoding(new_stream)) { if (new_stream && BeginDecoding(new_stream)) {
stream_ = new_stream; stream_ = new_stream;
} else { } else {
@ -91,8 +90,7 @@ auto Decoder::BeginDecoding(std::shared_ptr<TaggedStream> stream) -> bool {
codec_.reset(); codec_.reset();
codec_.reset(codecs::CreateCodecForType(stream->type()).value_or(nullptr)); codec_.reset(codecs::CreateCodecForType(stream->type()).value_or(nullptr));
if (!codec_) { if (!codec_) {
ESP_LOGE(kTag, "no codec found"); ESP_LOGE(kTag, "no codec found for stream");
events::Audio().Dispatch(internal::DecoderError{});
return false; return false;
} }
@ -100,7 +98,6 @@ auto Decoder::BeginDecoding(std::shared_ptr<TaggedStream> stream) -> bool {
if (open_res.has_error()) { if (open_res.has_error()) {
ESP_LOGE(kTag, "codec failed to start: %s", ESP_LOGE(kTag, "codec failed to start: %s",
codecs::ICodec::ErrorString(open_res.error()).c_str()); codecs::ICodec::ErrorString(open_res.error()).c_str());
events::Audio().Dispatch(internal::DecoderError{});
return false; return false;
} }
stream->SetPreambleFinished(); stream->SetPreambleFinished();
@ -110,24 +107,21 @@ auto Decoder::BeginDecoding(std::shared_ptr<TaggedStream> stream) -> bool {
.bits_per_sample = 16, .bits_per_sample = 16,
}; };
ESP_LOGI(kTag, "stream started ok");
std::optional<uint32_t> duration; std::optional<uint32_t> duration;
if (open_res->total_samples) { if (open_res->total_samples) {
duration = open_res->total_samples.value() / open_res->num_channels / duration = open_res->total_samples.value() / open_res->num_channels /
open_res->sample_rate_hz; open_res->sample_rate_hz;
} }
events::Audio().Dispatch(internal::DecoderOpened{ converter_->beginStream(std::make_shared<TrackInfo>(TrackInfo{
.track = std::make_shared<TrackInfo>(TrackInfo{
.tags = stream->tags(), .tags = stream->tags(),
.uri = stream->Filepath(), .uri = stream->Filepath(),
.duration = duration, .duration = duration,
.start_offset = stream->Offset(), .start_offset = stream->Offset(),
.bitrate_kbps = open_res->sample_rate_hz, .bitrate_kbps = open_res->sample_rate_hz,
.encoding = stream->type(), .encoding = stream->type(),
}), .format = *current_sink_format_,
}); }));
return true; return true;
} }
@ -135,18 +129,16 @@ auto Decoder::BeginDecoding(std::shared_ptr<TaggedStream> stream) -> bool {
auto Decoder::ContinueDecoding() -> bool { auto Decoder::ContinueDecoding() -> bool {
auto res = codec_->DecodeTo(codec_buffer_); auto res = codec_->DecodeTo(codec_buffer_);
if (res.has_error()) { if (res.has_error()) {
events::Audio().Dispatch(internal::DecoderError{}); converter_->endStream();
return true; return true;
} }
if (res->samples_written > 0) { if (res->samples_written > 0) {
converter_->ConvertSamples(codec_buffer_.first(res->samples_written), converter_->continueStream(codec_buffer_.first(res->samples_written));
current_sink_format_.value(),
res->is_stream_finished);
} }
if (res->is_stream_finished) { if (res->is_stream_finished) {
events::Audio().Dispatch(internal::DecoderClosed{}); converter_->endStream();
codec_.reset(); codec_.reset();
} }

@ -55,9 +55,9 @@ std::shared_ptr<BluetoothAudioOutput> AudioState::sBtOutput;
std::shared_ptr<IAudioOutput> AudioState::sOutput; std::shared_ptr<IAudioOutput> AudioState::sOutput;
// Two seconds of samples for two channels, at a representative sample rate. // Two seconds of samples for two channels, at a representative sample rate.
constexpr size_t kDrainLatencySamples = 48000; constexpr size_t kDrainLatencySamples = 48000 * 2 * 2;
constexpr size_t kDrainBufferSize = constexpr size_t kDrainBufferSize =
sizeof(sample::Sample) * kDrainLatencySamples * 4; sizeof(sample::Sample) * kDrainLatencySamples;
StreamBufferHandle_t AudioState::sDrainBuffer; StreamBufferHandle_t AudioState::sDrainBuffer;
@ -151,33 +151,24 @@ void AudioState::react(const TogglePlayPause& ev) {
} }
} }
void AudioState::react(const internal::DecoderOpened& ev) { void AudioState::react(const internal::StreamStarted& ev) {
ESP_LOGI(kTag, "decoder opened %s", ev.track->uri.c_str()); sCurrentFormat = ev.dst_format;
sIsResampling = ev.src_format != ev.dst_format;
sNextTrack = ev.track; sNextTrack = ev.track;
sNextTrackCueSamples = sCurrentSamples + kDrainLatencySamples; sNextTrackCueSamples = sCurrentSamples + kDrainLatencySamples;
}
void AudioState::react(const internal::DecoderClosed&) { ESP_LOGI(kTag, "new stream %s %u ch @ %lu hz (resample=%i)",
ESP_LOGI(kTag, "decoder closed"); ev.track->uri.c_str(), sCurrentFormat->num_channels,
// FIXME: only when we were playing the current track sCurrentFormat->sample_rate, sIsResampling);
sServices->track_queue().finish();
} }
void AudioState::react(const internal::DecoderError&) { void AudioState::react(const internal::StreamEnded&) {
ESP_LOGW(kTag, "decoder errored"); ESP_LOGI(kTag, "stream ended");
// FIXME: only when we were playing the current track // FIXME: only when we were playing the current track
sServices->track_queue().finish(); sServices->track_queue().finish();
} }
void AudioState::react(const internal::ConverterConfigurationChanged& ev) { void AudioState::react(const internal::StreamUpdate& ev) {
sCurrentFormat = ev.dst_format;
sIsResampling = ev.src_format != ev.dst_format;
ESP_LOGI(kTag, "output format now %u ch @ %lu hz (resample=%i)",
sCurrentFormat->num_channels, sCurrentFormat->sample_rate,
sIsResampling);
}
void AudioState::react(const internal::ConverterProgress& ev) {
ESP_LOGI(kTag, "sample converter sunk %lu samples", ev.samples_sunk); ESP_LOGI(kTag, "sample converter sunk %lu samples", ev.samples_sunk);
sCurrentSamples += ev.samples_sunk; sCurrentSamples += ev.samples_sunk;

@ -10,6 +10,7 @@
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include "audio_events.hpp"
#include "audio_sink.hpp" #include "audio_sink.hpp"
#include "audio_source.hpp" #include "audio_source.hpp"
#include "codec.hpp" #include "codec.hpp"
@ -31,20 +32,23 @@ class SampleConverter {
auto SetOutput(std::shared_ptr<IAudioOutput>) -> void; auto SetOutput(std::shared_ptr<IAudioOutput>) -> void;
auto ConvertSamples(cpp::span<sample::Sample>, auto beginStream(std::shared_ptr<TrackInfo>) -> void;
const IAudioOutput::Format& format, auto continueStream(cpp::span<sample::Sample>) -> void;
bool is_eos) -> void; auto endStream() -> void;
private: private:
auto Main() -> void; auto Main() -> void;
auto SetTargetFormat(const IAudioOutput::Format& format) -> void; auto handleBeginStream(std::shared_ptr<TrackInfo>) -> void;
auto HandleSamples(cpp::span<sample::Sample>, bool) -> size_t; auto handleContinueStream(size_t samples_available) -> void;
auto handleEndStream() -> void;
auto SendToSink(cpp::span<sample::Sample>) -> void; auto handleSamples(cpp::span<sample::Sample>) -> size_t;
auto sendToSink(cpp::span<sample::Sample>) -> void;
struct Args { struct Args {
IAudioOutput::Format format; std::shared_ptr<TrackInfo>* track;
size_t samples_available; size_t samples_available;
bool is_end_of_stream; bool is_end_of_stream;
}; };

@ -51,6 +51,8 @@ struct TrackInfo {
/* The encoded format of the this track. */ /* The encoded format of the this track. */
codecs::StreamType encoding; codecs::StreamType encoding;
IAudioOutput::Format format;
}; };
/* /*
@ -136,23 +138,18 @@ struct OutputModeChanged : tinyfsm::Event {};
namespace internal { namespace internal {
struct DecoderOpened : tinyfsm::Event { struct StreamStarted : tinyfsm::Event {
std::shared_ptr<TrackInfo> track; std::shared_ptr<TrackInfo> track;
};
struct DecoderClosed : tinyfsm::Event {};
struct DecoderError : tinyfsm::Event {};
struct ConverterConfigurationChanged : tinyfsm::Event {
IAudioOutput::Format src_format; IAudioOutput::Format src_format;
IAudioOutput::Format dst_format; IAudioOutput::Format dst_format;
}; };
struct ConverterProgress : tinyfsm::Event { struct StreamUpdate : tinyfsm::Event {
uint32_t samples_sunk; uint32_t samples_sunk;
}; };
struct StreamEnded : tinyfsm::Event {};
} // namespace internal } // namespace internal
} // namespace audio } // namespace audio

@ -46,12 +46,9 @@ class AudioState : public tinyfsm::Fsm<AudioState> {
void react(const SetTrack&); void react(const SetTrack&);
void react(const TogglePlayPause&); void react(const TogglePlayPause&);
void react(const internal::DecoderOpened&); void react(const internal::StreamStarted&);
void react(const internal::DecoderClosed&); void react(const internal::StreamUpdate&);
void react(const internal::DecoderError&); void react(const internal::StreamEnded&);
void react(const internal::ConverterConfigurationChanged&);
void react(const internal::ConverterProgress&);
void react(const StepUpVolume&); void react(const StepUpVolume&);
void react(const StepDownVolume&); void react(const StepDownVolume&);

@ -392,7 +392,11 @@ void UiState::react(const audio::QueueUpdate&) {
} }
void UiState::react(const audio::PlaybackUpdate& ev) { void UiState::react(const audio::PlaybackUpdate& ev) {
if (ev.current_track) {
sPlaybackTrack.Update(*ev.current_track); sPlaybackTrack.Update(*ev.current_track);
} else {
sPlaybackTrack.Update(std::monostate{});
}
sPlaybackPlaying.Update(!ev.paused); sPlaybackPlaying.Update(!ev.paused);
sPlaybackPosition.Update(static_cast<int>(ev.track_position.value_or(0))); sPlaybackPosition.Update(static_cast<int>(ev.track_position.value_or(0)));
} }

Loading…
Cancel
Save