diff --git a/src/drivers/include/drivers/pcm_buffer.hpp b/src/drivers/include/drivers/pcm_buffer.hpp index 6630f720..8f53317e 100644 --- a/src/drivers/include/drivers/pcm_buffer.hpp +++ b/src/drivers/include/drivers/pcm_buffer.hpp @@ -28,8 +28,12 @@ class PcmBuffer { PcmBuffer(size_t size_in_samples); ~PcmBuffer(); - /* Adds samples to the buffer. */ - auto send(std::span) -> void; + /* + * Adds samples to the buffer. Returns the number of samples that were added, + * which may be less than the number of samples given if this PcmBuffer is + * close to full. + */ + auto send(std::span) -> size_t; /* * Fills the given span with samples. If enough samples are available in diff --git a/src/drivers/pcm_buffer.cpp b/src/drivers/pcm_buffer.cpp index 25762c50..071f5cea 100644 --- a/src/drivers/pcm_buffer.cpp +++ b/src/drivers/pcm_buffer.cpp @@ -17,6 +17,7 @@ #include "freertos/FreeRTOS.h" #include "esp_heap_caps.h" +#include "freertos/projdefs.h" #include "freertos/ringbuf.h" #include "portmacro.h" @@ -39,9 +40,13 @@ PcmBuffer::~PcmBuffer() { heap_caps_free(buf_); } -auto PcmBuffer::send(std::span data) -> void { - xRingbufferSend(ringbuf_, data.data(), data.size_bytes(), portMAX_DELAY); +auto PcmBuffer::send(std::span data) -> size_t { + if (!xRingbufferSend(ringbuf_, data.data(), data.size_bytes(), + pdMS_TO_TICKS(100))) { + return 0; + } sent_ += data.size(); + return data.size(); } IRAM_ATTR auto PcmBuffer::receive(std::span dest, bool isr) diff --git a/src/tangara/audio/audio_decoder.cpp b/src/tangara/audio/audio_decoder.cpp index ee06d984..992444f0 100644 --- a/src/tangara/audio/audio_decoder.cpp +++ b/src/tangara/audio/audio_decoder.cpp @@ -48,7 +48,7 @@ static const char* kTag = "decoder"; * increasing its size. */ static constexpr std::size_t kCodecBufferLength = - drivers::kI2SBufferLengthFrames * sizeof(sample::Sample); + drivers::kI2SBufferLengthFrames * 2; auto Decoder::Start(std::shared_ptr sink) -> Decoder* { Decoder* task = new Decoder(sink); @@ -78,11 +78,17 @@ Decoder::Decoder(std::shared_ptr processor) * Main decoding loop. Handles watching for new streams, or continuing to nudge * along the current stream if we have one. */ +IRAM_ATTR void Decoder::Main() { for (;;) { - // Check whether there's a new stream to begin. If we're idle, then we - // simply park and wait forever for a stream to arrive. - TickType_t wait_time = stream_ ? 0 : portMAX_DELAY; + // How long should we spend waiting for a command? By default, assume we're + // idle and wait forever. + TickType_t wait_time = portMAX_DELAY; + if (!leftover_samples_.empty() || stream_) { + // If we have work to do, then don't block waiting for a new stream. + wait_time = 0; + } + NextStream* next; if (xQueueReceive(next_stream_, &next, wait_time)) { // Copy the data out of the queue, then clean up the item. @@ -103,8 +109,15 @@ void Decoder::Main() { // Start decoding the new stream. prepareDecode(new_stream); + + // Keep handling commands until the command queue is empty. + continue; } + // We should always have a stream if we returned from xQueueReceive without + // receiving a new stream. + assert(stream_); + if (!continueDecode()) { finishDecode(false); } @@ -167,16 +180,36 @@ auto Decoder::prepareDecode(std::shared_ptr stream) -> void { } auto Decoder::continueDecode() -> bool { + // First, see if we have any samples from a previous decode that still need + // to be sent. + if (!leftover_samples_.empty()) { + leftover_samples_ = processor_->continueStream(leftover_samples_); + return true; + } + + // We might have already cleaned up the codec if the last decode pass of the + // stream resulted in leftoverr samples. + if (!codec_) { + return false; + } + auto res = codec_->DecodeTo(codec_buffer_); if (res.has_error()) { return false; } if (res->samples_written > 0) { - processor_->continueStream(codec_buffer_.first(res->samples_written)); + leftover_samples_ = + processor_->continueStream(codec_buffer_.first(res->samples_written)); + } + + if (res->is_stream_finished) { + // The codec has finished, so make sure we don't call it again. + codec_.reset(); } - return !res->is_stream_finished; + // We're done iff the codec has finished and we sent everything. + return codec_ || !leftover_samples_.empty(); } auto Decoder::finishDecode(bool cancel) -> void { @@ -191,6 +224,7 @@ auto Decoder::finishDecode(bool cancel) -> void { processor_->endStream(cancel); // Clean up after ourselves. + leftover_samples_ = {}; stream_.reset(); codec_.reset(); track_.reset(); diff --git a/src/tangara/audio/audio_decoder.hpp b/src/tangara/audio/audio_decoder.hpp index 64561d9d..9f20ec59 100644 --- a/src/tangara/audio/audio_decoder.hpp +++ b/src/tangara/audio/audio_decoder.hpp @@ -15,6 +15,7 @@ #include "audio/processor.hpp" #include "codec.hpp" #include "database/track.hpp" +#include "sample.hpp" #include "types.hpp" namespace audio { @@ -55,6 +56,7 @@ class Decoder { std::shared_ptr track_; std::span codec_buffer_; + std::span leftover_samples_; }; } // namespace audio diff --git a/src/tangara/audio/audio_fsm.cpp b/src/tangara/audio/audio_fsm.cpp index 16c16002..54ea5b6c 100644 --- a/src/tangara/audio/audio_fsm.cpp +++ b/src/tangara/audio/audio_fsm.cpp @@ -216,6 +216,7 @@ void AudioState::react(const internal::StreamStarted& ev) { } sStreamCues.addCue(ev.track, ev.cue_at_sample); + sStreamCues.update(sDrainBuffer->totalReceived()); if (!sIsPaused && !is_in_state()) { transit(); diff --git a/src/tangara/audio/processor.cpp b/src/tangara/audio/processor.cpp index 81858110..29124232 100644 --- a/src/tangara/audio/processor.cpp +++ b/src/tangara/audio/processor.cpp @@ -10,60 +10,57 @@ #include #include #include +#include #include #include -#include "audio/audio_events.hpp" -#include "audio/audio_sink.hpp" -#include "drivers/i2s_dac.hpp" -#include "drivers/pcm_buffer.hpp" +#include "assert.h" #include "esp_heap_caps.h" #include "esp_log.h" -#include "events/event_queue.hpp" +#include "esp_timer.h" #include "freertos/portmacro.h" #include "freertos/projdefs.h" +#include "audio/audio_events.hpp" +#include "audio/audio_sink.hpp" +#include "audio/i2s_audio_output.hpp" #include "audio/resample.hpp" +#include "drivers/i2s_dac.hpp" +#include "drivers/pcm_buffer.hpp" +#include "events/event_queue.hpp" #include "sample.hpp" #include "tasks.hpp" [[maybe_unused]] static constexpr char kTag[] = "mixer"; -static constexpr std::size_t kSampleBufferLength = - drivers::kI2SBufferLengthFrames * sizeof(sample::Sample) * 2; -static constexpr std::size_t kSourceBufferLength = kSampleBufferLength * 2; +static const size_t kSampleBufferLength = drivers::kI2SBufferLengthFrames * 2; +static const size_t kSourceBufferLength = kSampleBufferLength * 2; namespace audio { +static const I2SAudioOutput::Format kTargetFormat{ + .sample_rate = 48000, + .num_channels = 2, + .bits_per_sample = 16, +}; + SampleProcessor::SampleProcessor(drivers::PcmBuffer& sink) - : commands_(xQueueCreate(1, sizeof(Args))), - resampler_(nullptr), - source_(xStreamBufferCreateWithCaps(kSourceBufferLength, - sizeof(sample::Sample) * 2, + : commands_(xQueueCreate(2, sizeof(Args))), + source_(xStreamBufferCreateWithCaps(kSourceBufferLength + 1, + sizeof(sample::Sample), MALLOC_CAP_DMA)), sink_(sink), - leftover_bytes_(0) { - input_buffer_ = { - reinterpret_cast(heap_caps_calloc( - kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_DMA)), - kSampleBufferLength}; - input_buffer_as_bytes_ = {reinterpret_cast(input_buffer_.data()), - input_buffer_.size_bytes()}; - - resampled_buffer_ = { - reinterpret_cast(heap_caps_calloc( - kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_DMA)), - kSampleBufferLength}; - + unprocessed_samples_(0) { tasks::StartPersistent([&]() { Main(); }); } SampleProcessor::~SampleProcessor() { vQueueDelete(commands_); - vStreamBufferDelete(source_); + vStreamBufferDeleteWithCaps(source_); } auto SampleProcessor::SetOutput(std::shared_ptr output) -> void { + output->Configure(kTargetFormat); // FIXME: We should add synchronisation here, but we should be careful // about not impacting performance given that the output will change only // very rarely (if ever). @@ -80,15 +77,30 @@ auto SampleProcessor::beginStream(std::shared_ptr track) -> void { xQueueSend(commands_, &args, portMAX_DELAY); } -auto SampleProcessor::continueStream(std::span input) -> void { +auto SampleProcessor::continueStream(std::span input) + -> std::span { + size_t bytes_sent = xStreamBufferSend(source_, input.data(), + input.size_bytes(), pdMS_TO_TICKS(100)); + if (!bytes_sent) { + // If nothing could be sent, then bail out early. We don't want to send a + // samples_available command with zero samples. + return input; + } + + // We should only ever be placing whole samples into the buffer. If half + // samples start being sent, then this indicates a serious bug somewhere. + size_t samples_sent = bytes_sent / sizeof(sample::Sample); + assert(samples_sent * sizeof(sample::Sample) == bytes_sent); + Args args{ .track = nullptr, - .samples_available = input.size(), + .samples_available = samples_sent, .is_end_of_stream = false, .clear_buffers = false, }; xQueueSend(commands_, &args, portMAX_DELAY); - xStreamBufferSend(source_, input.data(), input.size_bytes(), portMAX_DELAY); + + return input.subspan(samples_sent); } auto SampleProcessor::endStream(bool cancelled) -> void { @@ -101,152 +113,281 @@ auto SampleProcessor::endStream(bool cancelled) -> void { xQueueSend(commands_, &args, portMAX_DELAY); } +IRAM_ATTR auto SampleProcessor::Main() -> void { for (;;) { + // Block indefinitely if the processor is idle. Otherwise check briefly for + // new commands, then continue processing. + TickType_t wait = hasPendingWork() ? 0 : portMAX_DELAY; + Args args; - while (!xQueueReceive(commands_, &args, portMAX_DELAY)) { + if (xQueueReceive(commands_, &args, wait)) { + if (args.is_end_of_stream && args.clear_buffers) { + // The new command is telling us to clear our buffers! This includes + // discarding any commands that have backed up without being processed. + // Discard all the old commands, then immediately handle the end of + // stream. + while (!pending_commands_.empty()) { + Args discard = pending_commands_.front(); + pending_commands_.pop_front(); + discardCommand(discard); + } + handleEndStream(true); + } else { + pending_commands_.push_back(args); + } } - if (args.track) { - handleBeginStream(*args.track); - delete args.track; + // We need to finish flushing all processed samples before we can process + // more samples. + if (!output_buffer_.isEmpty() && flushOutputBuffer()) { + continue; } - if (args.samples_available) { - handleContinueStream(args.samples_available); + + // We need to finish processing all the samples we've been told about + // before we handle backed up commands. + if (unprocessed_samples_ && !processSamples(false)) { + continue; } - if (args.is_end_of_stream) { - handleEndStream(args.clear_buffers); + + while (!pending_commands_.empty()) { + args = pending_commands_.front(); + pending_commands_.pop_front(); + + if (args.track) { + handleBeginStream(*args.track); + delete args.track; + } + if (args.samples_available) { + unprocessed_samples_ += args.samples_available; + } + if (args.is_end_of_stream) { + if (processSamples(true) || args.clear_buffers) { + handleEndStream(args.clear_buffers); + } else { + // The output filled up while we were trying to flush the last + // samples of this stream, and we haven't been told to clear our + // buffers. Retry handling this command later. + pending_commands_.push_front(args); + break; + } + } } } } auto SampleProcessor::handleBeginStream(std::shared_ptr track) -> void { - if (track->format != source_format_) { - source_format_ = track->format; - // The new stream has a different format to the previous stream (or there - // was no previous stream). - // First, clean up our filters. - resampler_.reset(); - leftover_bytes_ = 0; - - // If the output is idle, then we can reconfigure it to the closest format - // to our new source. - // If the output *wasn't* idle, then we can't reconfigure without an - // audible gap in playback. So instead, we simply keep the same target - // format and begin resampling. - if (sink_.isEmpty()) { - target_format_ = output_->PrepareFormat(track->format); - output_->Configure(target_format_); + // If the new stream's sample rate doesn't match our canonical sample rate, + // then prepare to start resampling. + if (track->format.sample_rate != kTargetFormat.sample_rate) { + ESP_LOGI(kTag, "resampling %lu -> %lu", track->format.sample_rate, + kTargetFormat.sample_rate); + if (!resampler_ || resampler_->sourceRate() != track->format.sample_rate) { + // If there's already a resampler instance for this source rate, then + // reuse it to help gapless playback work smoothly. + resampler_.reset(new Resampler(track->format.sample_rate, + kTargetFormat.sample_rate, + track->format.num_channels)); } + } else { + resampler_.reset(); } + // If the new stream has only one channel, then we double it to get stereo + // audio. + // FIXME: If the Bluetooth stack allowed us to configure the number of + // channels, we could remove this. + double_samples_ = track->format.num_channels != kTargetFormat.num_channels; + events::Audio().Dispatch(internal::StreamStarted{ .track = track, - .sink_format = target_format_, + .sink_format = kTargetFormat, .cue_at_sample = sink_.totalSent(), }); } -auto SampleProcessor::handleContinueStream(size_t samples_available) -> void { - // Loop until we finish reading all the bytes indicated. There might be - // leftovers from each iteration, and from this process as a whole, - // depending on the resampling stage. - size_t bytes_read = 0; - size_t bytes_to_read = samples_available * sizeof(sample::Sample); - while (bytes_read < bytes_to_read) { - // First top up the input buffer, taking care not to overwrite anything - // remaining from a previous iteration. - size_t bytes_read_this_it = xStreamBufferReceive( - source_, input_buffer_as_bytes_.subspan(leftover_bytes_).data(), - std::min(input_buffer_as_bytes_.size() - leftover_bytes_, - bytes_to_read - bytes_read), - portMAX_DELAY); - bytes_read += bytes_read_this_it; - - // Calculate the number of whole samples that are now in the input buffer. - size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_; - size_t samples_in_buffer = bytes_in_buffer / sizeof(sample::Sample); - - size_t samples_used = handleSamples(input_buffer_.first(samples_in_buffer)); - - // Maybe the resampler didn't consume everything. Maybe the last few - // bytes we read were half a frame. Either way, we need to calculate the - // size of the remainder in bytes, then move it to the front of our - // buffer. - size_t bytes_used = samples_used * sizeof(sample::Sample); - assert(bytes_used <= bytes_in_buffer); - - leftover_bytes_ = bytes_in_buffer - bytes_used; - if (leftover_bytes_ > 0) { - std::memmove(input_buffer_as_bytes_.data(), - input_buffer_as_bytes_.data() + bytes_used, leftover_bytes_); - } - } -} +IRAM_ATTR +auto SampleProcessor::processSamples(bool finalise) -> bool { + for (;;) { + bool out_of_work = true; -auto SampleProcessor::handleSamples(std::span input) -> size_t { - if (source_format_ == target_format_) { - // The happiest possible case: the input format matches the output - // format already. - sink_.send(input); - return input.size(); - } + // First, fill up our input buffer with samples. + if (unprocessed_samples_ > 0) { + out_of_work = false; + auto input = input_buffer_.writeAcquire(); - size_t samples_used = 0; - while (samples_used < input.size()) { - std::span output_source; - if (source_format_.sample_rate != target_format_.sample_rate) { - if (resampler_ == nullptr) { - ESP_LOGI(kTag, "creating new resampler for %lu -> %lu", - source_format_.sample_rate, target_format_.sample_rate); - resampler_.reset(new Resampler(source_format_.sample_rate, - target_format_.sample_rate, - source_format_.num_channels)); - } + size_t bytes_received = xStreamBufferReceive( + source_, input.data(), + std::min(input.size_bytes(), + unprocessed_samples_ * sizeof(sample::Sample)), + 0); - size_t read, written; - std::tie(read, written) = resampler_->Process(input.subspan(samples_used), - resampled_buffer_, false); - samples_used += read; + // We should never receive a half sample. Blow up immediately if we do. + size_t samples_received = bytes_received / sizeof(sample::Sample); + assert(samples_received * sizeof(sample::Sample) == bytes_received); - if (read == 0 && written == 0) { - // Zero samples used or written. We need more input. - break; - } - output_source = resampled_buffer_.first(written); - } else { - output_source = input; - samples_used = input.size(); + unprocessed_samples_ -= samples_received; + input_buffer_.writeCommit(samples_received); } - sink_.send(output_source); - } + // Next, push input samples through the resampler. In the best case, this + // is a simple copy operation. + if (!input_buffer_.isEmpty()) { + out_of_work = false; + auto resample_input = input_buffer_.readAcquire(); + auto resample_output = resampled_buffer_.writeAcquire(); + + size_t read, wrote; + if (resampler_) { + std::tie(read, wrote) = + resampler_->Process(resample_input, resample_output, finalise); + } else { + read = wrote = std::min(resample_input.size(), resample_output.size()); + std::copy_n(resample_input.begin(), read, resample_output.begin()); + } - return samples_used; -} + input_buffer_.readCommit(read); + resampled_buffer_.writeCommit(wrote); + } -auto SampleProcessor::handleEndStream(bool clear_bufs) -> void { - if (resampler_ && !clear_bufs) { - size_t read, written; - std::tie(read, written) = resampler_->Process({}, resampled_buffer_, true); + // Next, we need to make sure the output is in stereo. This is also a simple + // copy in the best case. + if (!resampled_buffer_.isEmpty()) { + out_of_work = false; + auto channels_input = resampled_buffer_.readAcquire(); + auto channels_output = output_buffer_.writeAcquire(); + size_t read, wrote; + if (double_samples_) { + wrote = channels_output.size(); + read = wrote / 2; + if (read > channels_input.size()) { + read = channels_input.size(); + wrote = read * 2; + } + for (size_t i = 0; i < read; i++) { + channels_output[i * 2] = channels_input[i]; + channels_output[(i * 2) + 1] = channels_input[i]; + } + } else { + read = wrote = std::min(channels_input.size(), channels_output.size()); + std::copy_n(channels_input.begin(), read, channels_output.begin()); + } + resampled_buffer_.readCommit(read); + output_buffer_.writeCommit(wrote); + } - if (written > 0) { - sink_.send(resampled_buffer_.first(written)); + // Finally, flush whatever ended up in the output buffer. + if (flushOutputBuffer()) { + if (out_of_work) { + return true; + } + } else { + // The output is congested. Back off of processing for a moment. + return false; } } +} +auto SampleProcessor::handleEndStream(bool clear_bufs) -> void { if (clear_bufs) { sink_.clear(); - } - // FIXME: This discards any leftover samples, but there probably shouldn't be - // any leftover samples. Can this be an assert instead? - leftover_bytes_ = 0; + input_buffer_.clear(); + resampled_buffer_.clear(); + output_buffer_.clear(); + + size_t bytes_discarded = 0; + size_t bytes_to_discard = unprocessed_samples_ * sizeof(sample::Sample); + auto scratch_buf = output_buffer_.writeAcquire(); + while (bytes_discarded < bytes_to_discard) { + size_t bytes_read = + xStreamBufferReceive(source_, scratch_buf.data(), + std::min(scratch_buf.size_bytes(), + bytes_to_discard - bytes_discarded), + 0); + bytes_discarded += bytes_read; + } + unprocessed_samples_ = 0; + } events::Audio().Dispatch(internal::StreamEnded{ .cue_at_sample = sink_.totalSent(), }); } +auto SampleProcessor::hasPendingWork() -> bool { + return !pending_commands_.empty() || unprocessed_samples_ > 0 || + !input_buffer_.isEmpty() || !resampled_buffer_.isEmpty() || + !output_buffer_.isEmpty(); +} + +IRAM_ATTR +auto SampleProcessor::flushOutputBuffer() -> bool { + auto samples = output_buffer_.readAcquire(); + size_t sent = sink_.send(samples); + output_buffer_.readCommit(sent); + return output_buffer_.isEmpty(); +} + +auto SampleProcessor::discardCommand(Args& command) -> void { + if (command.track) { + delete command.track; + } + if (command.samples_available) { + unprocessed_samples_ += command.samples_available; + } + // End of stream commands can just be dropped. Without further actions. +} + +SampleProcessor::Buffer::Buffer() + : buffer_(reinterpret_cast( + heap_caps_calloc(kSampleBufferLength, + sizeof(sample::Sample), + MALLOC_CAP_DMA)), + kSampleBufferLength), + samples_in_buffer_() {} + +SampleProcessor::Buffer::~Buffer() { + heap_caps_free(buffer_.data()); +} + +auto SampleProcessor::Buffer::writeAcquire() -> std::span { + return buffer_.subspan(samples_in_buffer_.size()); +} + +auto SampleProcessor::Buffer::writeCommit(size_t samples) -> void { + if (samples == 0) { + return; + } + samples_in_buffer_ = buffer_.first(samples + samples_in_buffer_.size()); +} + +auto SampleProcessor::Buffer::readAcquire() -> std::span { + return samples_in_buffer_; +} + +auto SampleProcessor::Buffer::readCommit(size_t samples) -> void { + if (samples == 0) { + return; + } + samples_in_buffer_ = samples_in_buffer_.subspan(samples); + + // Move the leftover samples to the front of the buffer, so that we're setup + // for a new write. + if (!samples_in_buffer_.empty()) { + std::memmove(buffer_.data(), samples_in_buffer_.data(), + samples_in_buffer_.size_bytes()); + samples_in_buffer_ = buffer_.first(samples_in_buffer_.size()); + } +} + +auto SampleProcessor::Buffer::isEmpty() -> bool { + return samples_in_buffer_.empty(); +} + +auto SampleProcessor::Buffer::clear() -> void { + samples_in_buffer_ = {}; +} + } // namespace audio diff --git a/src/tangara/audio/processor.hpp b/src/tangara/audio/processor.hpp index 5c4ad0fa..f1b1d921 100644 --- a/src/tangara/audio/processor.hpp +++ b/src/tangara/audio/processor.hpp @@ -8,6 +8,8 @@ #include #include +#include +#include #include #include "audio/audio_events.hpp" @@ -33,18 +35,43 @@ class SampleProcessor { auto SetOutput(std::shared_ptr) -> void; + /* + * Signals to the sample processor that a new discrete stream of audio is now + * being sent. This will typically represent a new track being played. + */ auto beginStream(std::shared_ptr) -> void; - auto continueStream(std::span) -> void; + + /* + * Sends a span of PCM samples to the processor. Returns a subspan of the + * given span containing samples that were not able to be sent during this + * call, e.g. because of congestion downstream from the processor. + */ + auto continueStream(std::span) -> std::span; + + /* + * Signals to the sample processor that the current stream is ending. This + * can either be because the stream has naturally finished, or because it is + * being interrupted. + * If `cancelled` is false, the sample processor will ensure all previous + * samples are processed and sent before communicating the end of the stream + * onwards. If `cancelled` is true, any samples from the current stream that + * have not yet been played will be discarded. + */ auto endStream(bool cancelled) -> void; + SampleProcessor(const SampleProcessor&) = delete; + SampleProcessor& operator=(const SampleProcessor&) = delete; + private: auto Main() -> void; auto handleBeginStream(std::shared_ptr) -> void; - auto handleContinueStream(size_t samples_available) -> void; auto handleEndStream(bool cancel) -> void; - auto handleSamples(std::span) -> size_t; + auto processSamples(bool finalise) -> bool; + + auto hasPendingWork() -> bool; + auto flushOutputBuffer() -> bool; struct Args { std::shared_ptr* track; @@ -53,21 +80,44 @@ class SampleProcessor { bool clear_buffers; }; QueueHandle_t commands_; + std::list pending_commands_; - std::unique_ptr resampler_; + auto discardCommand(Args& command) -> void; StreamBufferHandle_t source_; drivers::PcmBuffer& sink_; - std::span input_buffer_; - std::span input_buffer_as_bytes_; + class Buffer { + public: + Buffer(); + ~Buffer(); + + auto writeAcquire() -> std::span; + auto writeCommit(size_t) -> void; + + auto readAcquire() -> std::span; + auto readCommit(size_t) -> void; - std::span resampled_buffer_; + auto isEmpty() -> bool; + auto clear() -> void; + + Buffer(const Buffer&) = delete; + Buffer& operator=(const Buffer&) = delete; + + private: + std::span buffer_; + std::span samples_in_buffer_; + }; + + Buffer input_buffer_; + Buffer resampled_buffer_; + Buffer output_buffer_; + + std::unique_ptr resampler_; + bool double_samples_; std::shared_ptr output_; - IAudioOutput::Format source_format_; - IAudioOutput::Format target_format_; - size_t leftover_bytes_; + size_t unprocessed_samples_; }; } // namespace audio diff --git a/src/tangara/audio/resample.cpp b/src/tangara/audio/resample.cpp index 143ce230..d6369022 100644 --- a/src/tangara/audio/resample.cpp +++ b/src/tangara/audio/resample.cpp @@ -4,6 +4,7 @@ * SPDX-License-Identifier: GPL-3.0-only */ #include "audio/resample.hpp" +#include #include #include @@ -31,6 +32,7 @@ Resampler::Resampler(uint32_t source_sample_rate, kQuality, &err_)), num_channels_(num_channels) { + speex_resampler_skip_zeros(resampler_); assert(err_ == 0); } @@ -38,18 +40,24 @@ Resampler::~Resampler() { speex_resampler_destroy(resampler_); } +auto Resampler::sourceRate() -> uint32_t { + uint32_t input = 0; + uint32_t output = 0; + speex_resampler_get_rate(resampler_, &input, &output); + return input; +} + auto Resampler::Process(std::span input, std::span output, bool end_of_data) -> std::pair { - uint32_t samples_used = input.size() / num_channels_; - uint32_t samples_produced = output.size() / num_channels_; + uint32_t frames_used = input.size() / num_channels_; + uint32_t frames_produced = output.size() / num_channels_; int err = speex_resampler_process_interleaved_int( - resampler_, input.data(), &samples_used, output.data(), - &samples_produced); + resampler_, input.data(), &frames_used, output.data(), &frames_produced); assert(err == 0); - return {samples_used * num_channels_, samples_produced * num_channels_}; + return {frames_used * num_channels_, frames_produced * num_channels_}; } } // namespace audio diff --git a/src/tangara/audio/resample.hpp b/src/tangara/audio/resample.hpp index 4d48d47f..df285020 100644 --- a/src/tangara/audio/resample.hpp +++ b/src/tangara/audio/resample.hpp @@ -6,6 +6,7 @@ #pragma once +#include #include #include #include @@ -24,6 +25,8 @@ class Resampler { ~Resampler(); + auto sourceRate() -> uint32_t; + auto Process(std::span input, std::span output, bool end_of_data) -> std::pair;