@ -10,60 +10,57 @@
# include <algorithm>
# include <algorithm>
# include <cmath>
# include <cmath>
# include <cstdint>
# include <cstdint>
# include <cstring>
# include <limits>
# include <limits>
# include <span>
# include <span>
# include "audio/audio_events.hpp"
# include "assert.h"
# include "audio/audio_sink.hpp"
# include "drivers/i2s_dac.hpp"
# include "drivers/pcm_buffer.hpp"
# include "esp_heap_caps.h"
# include "esp_heap_caps.h"
# include "esp_log.h"
# include "esp_log.h"
# include "events/event_queue.hpp "
# include "esp_timer.h"
# include "freertos/portmacro.h"
# include "freertos/portmacro.h"
# include "freertos/projdefs.h"
# include "freertos/projdefs.h"
# include "audio/audio_events.hpp"
# include "audio/audio_sink.hpp"
# include "audio/i2s_audio_output.hpp"
# include "audio/resample.hpp"
# include "audio/resample.hpp"
# include "drivers/i2s_dac.hpp"
# include "drivers/pcm_buffer.hpp"
# include "events/event_queue.hpp"
# include "sample.hpp"
# include "sample.hpp"
# include "tasks.hpp"
# include "tasks.hpp"
[[maybe_unused]] static constexpr char kTag [ ] = " mixer " ;
[[maybe_unused]] static constexpr char kTag [ ] = " mixer " ;
static constexpr std : : size_t kSampleBufferLength =
static const size_t kSampleBufferLength = drivers : : kI2SBufferLengthFrames * 2 ;
drivers : : kI2SBufferLengthFrames * sizeof ( sample : : Sample ) * 2 ;
static const size_t kSourceBufferLength = kSampleBufferLength * 2 ;
static constexpr std : : size_t kSourceBufferLength = kSampleBufferLength * 2 ;
namespace audio {
namespace audio {
static const I2SAudioOutput : : Format kTargetFormat {
. sample_rate = 48000 ,
. num_channels = 2 ,
. bits_per_sample = 16 ,
} ;
SampleProcessor : : SampleProcessor ( drivers : : PcmBuffer & sink )
SampleProcessor : : SampleProcessor ( drivers : : PcmBuffer & sink )
: commands_ ( xQueueCreate ( 1 , sizeof ( Args ) ) ) ,
: commands_ ( xQueueCreate ( 2 , sizeof ( Args ) ) ) ,
resampler_ ( nullptr ) ,
source_ ( xStreamBufferCreateWithCaps ( kSourceBufferLength + 1 ,
source_ ( xStreamBufferCreateWithCaps ( kSourceBufferLength ,
sizeof ( sample : : Sample ) ,
sizeof ( sample : : Sample ) * 2 ,
MALLOC_CAP_DMA ) ) ,
MALLOC_CAP_DMA ) ) ,
sink_ ( sink ) ,
sink_ ( sink ) ,
leftover_bytes_ ( 0 ) {
unprocessed_samples_ ( 0 ) {
input_buffer_ = {
reinterpret_cast < sample : : Sample * > ( heap_caps_calloc (
kSampleBufferLength , sizeof ( sample : : Sample ) , MALLOC_CAP_DMA ) ) ,
kSampleBufferLength } ;
input_buffer_as_bytes_ = { reinterpret_cast < std : : byte * > ( input_buffer_ . data ( ) ) ,
input_buffer_ . size_bytes ( ) } ;
resampled_buffer_ = {
reinterpret_cast < sample : : Sample * > ( heap_caps_calloc (
kSampleBufferLength , sizeof ( sample : : Sample ) , MALLOC_CAP_DMA ) ) ,
kSampleBufferLength } ;
tasks : : StartPersistent < tasks : : Type : : kAudioConverter > ( [ & ] ( ) { Main ( ) ; } ) ;
tasks : : StartPersistent < tasks : : Type : : kAudioConverter > ( [ & ] ( ) { Main ( ) ; } ) ;
}
}
SampleProcessor : : ~ SampleProcessor ( ) {
SampleProcessor : : ~ SampleProcessor ( ) {
vQueueDelete ( commands_ ) ;
vQueueDelete ( commands_ ) ;
vStreamBufferDelete ( source_ ) ;
vStreamBufferDeleteWithCaps ( source_ ) ;
}
}
auto SampleProcessor : : SetOutput ( std : : shared_ptr < IAudioOutput > output ) - > void {
auto SampleProcessor : : SetOutput ( std : : shared_ptr < IAudioOutput > output ) - > void {
output - > Configure ( kTargetFormat ) ;
// FIXME: We should add synchronisation here, but we should be careful
// FIXME: We should add synchronisation here, but we should be careful
// about not impacting performance given that the output will change only
// about not impacting performance given that the output will change only
// very rarely (if ever).
// very rarely (if ever).
@ -80,15 +77,30 @@ auto SampleProcessor::beginStream(std::shared_ptr<TrackInfo> track) -> void {
xQueueSend ( commands_ , & args , portMAX_DELAY ) ;
xQueueSend ( commands_ , & args , portMAX_DELAY ) ;
}
}
auto SampleProcessor : : continueStream ( std : : span < sample : : Sample > input ) - > void {
auto SampleProcessor : : continueStream ( std : : span < sample : : Sample > input )
- > std : : span < sample : : Sample > {
size_t bytes_sent = xStreamBufferSend ( source_ , input . data ( ) ,
input . size_bytes ( ) , pdMS_TO_TICKS ( 100 ) ) ;
if ( ! bytes_sent ) {
// If nothing could be sent, then bail out early. We don't want to send a
// samples_available command with zero samples.
return input ;
}
// We should only ever be placing whole samples into the buffer. If half
// samples start being sent, then this indicates a serious bug somewhere.
size_t samples_sent = bytes_sent / sizeof ( sample : : Sample ) ;
assert ( samples_sent * sizeof ( sample : : Sample ) = = bytes_sent ) ;
Args args {
Args args {
. track = nullptr ,
. track = nullptr ,
. samples_available = input . size ( ) ,
. samples_available = samples_sent ,
. is_end_of_stream = false ,
. is_end_of_stream = false ,
. clear_buffers = false ,
. clear_buffers = false ,
} ;
} ;
xQueueSend ( commands_ , & args , portMAX_DELAY ) ;
xQueueSend ( commands_ , & args , portMAX_DELAY ) ;
xStreamBufferSend ( source_ , input . data ( ) , input . size_bytes ( ) , portMAX_DELAY ) ;
return input . subspan ( samples_sent ) ;
}
}
auto SampleProcessor : : endStream ( bool cancelled ) - > void {
auto SampleProcessor : : endStream ( bool cancelled ) - > void {
@ -101,152 +113,281 @@ auto SampleProcessor::endStream(bool cancelled) -> void {
xQueueSend ( commands_ , & args , portMAX_DELAY ) ;
xQueueSend ( commands_ , & args , portMAX_DELAY ) ;
}
}
IRAM_ATTR
auto SampleProcessor : : Main ( ) - > void {
auto SampleProcessor : : Main ( ) - > void {
for ( ; ; ) {
for ( ; ; ) {
// Block indefinitely if the processor is idle. Otherwise check briefly for
// new commands, then continue processing.
TickType_t wait = hasPendingWork ( ) ? 0 : portMAX_DELAY ;
Args args ;
Args args ;
while ( ! xQueueReceive ( commands_ , & args , portMAX_DELAY ) ) {
if ( xQueueReceive ( commands_ , & args , wait ) ) {
if ( args . is_end_of_stream & & args . clear_buffers ) {
// The new command is telling us to clear our buffers! This includes
// discarding any commands that have backed up without being processed.
// Discard all the old commands, then immediately handle the end of
// stream.
while ( ! pending_commands_ . empty ( ) ) {
Args discard = pending_commands_ . front ( ) ;
pending_commands_ . pop_front ( ) ;
discardCommand ( discard ) ;
}
handleEndStream ( true ) ;
} else {
pending_commands_ . push_back ( args ) ;
}
}
}
// We need to finish flushing all processed samples before we can process
// more samples.
if ( ! output_buffer_ . isEmpty ( ) & & flushOutputBuffer ( ) ) {
continue ;
}
// We need to finish processing all the samples we've been told about
// before we handle backed up commands.
if ( unprocessed_samples_ & & ! processSamples ( false ) ) {
continue ;
}
while ( ! pending_commands_ . empty ( ) ) {
args = pending_commands_ . front ( ) ;
pending_commands_ . pop_front ( ) ;
if ( args . track ) {
if ( args . track ) {
handleBeginStream ( * args . track ) ;
handleBeginStream ( * args . track ) ;
delete args . track ;
delete args . track ;
}
}
if ( args . samples_available ) {
if ( args . samples_available ) {
handleContinueStream ( args . samples_available ) ;
unprocessed_samples_ + = args . samples_available ;
}
}
if ( args . is_end_of_stream ) {
if ( args . is_end_of_stream ) {
if ( processSamples ( true ) | | args . clear_buffers ) {
handleEndStream ( args . clear_buffers ) ;
handleEndStream ( args . clear_buffers ) ;
} else {
// The output filled up while we were trying to flush the last
// samples of this stream, and we haven't been told to clear our
// buffers. Retry handling this command later.
pending_commands_ . push_front ( args ) ;
break ;
}
}
}
}
}
}
}
}
auto SampleProcessor : : handleBeginStream ( std : : shared_ptr < TrackInfo > track )
auto SampleProcessor : : handleBeginStream ( std : : shared_ptr < TrackInfo > track )
- > void {
- > void {
if ( track - > format ! = source_format_ ) {
// If the new stream's sample rate doesn't match our canonical sample rate,
source_format_ = track - > format ;
// then prepare to start resampling.
// The new stream has a different format to the previous stream (or there
if ( track - > format . sample_rate ! = kTargetFormat . sample_rate ) {
// was no previous stream).
ESP_LOGI ( kTag , " resampling %lu -> %lu " , track - > format . sample_rate ,
// First, clean up our filters.
kTargetFormat . sample_rate ) ;
resampler_ . reset ( ) ;
if ( ! resampler_ | | resampler_ - > sourceRate ( ) ! = track - > format . sample_rate ) {
leftover_bytes_ = 0 ;
// If there's already a resampler instance for this source rate, then
// reuse it to help gapless playback work smoothly.
// If the output is idle, then we can reconfigure it to the closest format
resampler_ . reset ( new Resampler ( track - > format . sample_rate ,
// to our new source.
kTargetFormat . sample_rate ,
// If the output *wasn't* idle, then we can't reconfigure without an
track - > format . num_channels ) ) ;
// audible gap in playback. So instead, we simply keep the same target
// format and begin resampling.
if ( sink_ . isEmpty ( ) ) {
target_format_ = output_ - > PrepareFormat ( track - > format ) ;
output_ - > Configure ( target_format_ ) ;
}
}
} else {
resampler_ . reset ( ) ;
}
}
// If the new stream has only one channel, then we double it to get stereo
// audio.
// FIXME: If the Bluetooth stack allowed us to configure the number of
// channels, we could remove this.
double_samples_ = track - > format . num_channels ! = kTargetFormat . num_channels ;
events : : Audio ( ) . Dispatch ( internal : : StreamStarted {
events : : Audio ( ) . Dispatch ( internal : : StreamStarted {
. track = track ,
. track = track ,
. sink_format = target_format_ ,
. sink_format = kTargetFormat ,
. cue_at_sample = sink_ . totalSent ( ) ,
. cue_at_sample = sink_ . totalSent ( ) ,
} ) ;
} ) ;
}
}
auto SampleProcessor : : handleContinueStream ( size_t samples_available ) - > void {
IRAM_ATTR
// Loop until we finish reading all the bytes indicated. There might be
auto SampleProcessor : : processSamples ( bool finalise ) - > bool {
// leftovers from each iteration, and from this process as a whole,
for ( ; ; ) {
// depending on the resampling stage.
bool out_of_work = true ;
size_t bytes_read = 0 ;
size_t bytes_to_read = samples_available * sizeof ( sample : : Sample ) ;
// First, fill up our input buffer with samples.
while ( bytes_read < bytes_to_read ) {
if ( unprocessed_samples_ > 0 ) {
// First top up the input buffer, taking care not to overwrite anything
out_of_work = false ;
// remaining from a previous iteration.
auto input = input_buffer_ . writeAcquire ( ) ;
size_t bytes_read_this_it = xStreamBufferReceive (
source_ , input_buffer_as_bytes_ . subspan ( leftover_bytes_ ) . data ( ) ,
size_t bytes_received = xStreamBufferReceive (
std : : min ( input_buffer_as_bytes_ . size ( ) - leftover_bytes_ ,
source_ , input . data ( ) ,
bytes_to_read - bytes_read ) ,
std : : min ( input . size_bytes ( ) ,
portMAX_DELAY ) ;
unprocessed_samples_ * sizeof ( sample : : Sample ) ) ,
bytes_read + = bytes_read_this_it ;
0 ) ;
// Calculate the number of whole samples that are now in the input buffer.
// We should never receive a half sample. Blow up immediately if we do.
size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_ ;
size_t samples_received = bytes_received / sizeof ( sample : : Sample ) ;
size_t samples_in_buffer = bytes_in_buffer / sizeof ( sample : : Sample ) ;
assert ( samples_received * sizeof ( sample : : Sample ) = = bytes_received ) ;
size_t samples_used = handleSamples ( input_buffer_ . first ( samples_in_buffer ) ) ;
unprocessed_samples_ - = samples_received ;
input_buffer_ . writeCommit ( samples_received ) ;
// Maybe the resampler didn't consume everything. Maybe the last few
// bytes we read were half a frame. Either way, we need to calculate the
// size of the remainder in bytes, then move it to the front of our
// buffer.
size_t bytes_used = samples_used * sizeof ( sample : : Sample ) ;
assert ( bytes_used < = bytes_in_buffer ) ;
leftover_bytes_ = bytes_in_buffer - bytes_used ;
if ( leftover_bytes_ > 0 ) {
std : : memmove ( input_buffer_as_bytes_ . data ( ) ,
input_buffer_as_bytes_ . data ( ) + bytes_used , leftover_bytes_ ) ;
}
}
}
auto SampleProcessor : : handleSamples ( std : : span < sample : : Sample > input ) - > size_t {
if ( source_format_ = = target_format_ ) {
// The happiest possible case: the input format matches the output
// format already.
sink_ . send ( input ) ;
return input . size ( ) ;
}
size_t samples_used = 0 ;
while ( samples_used < input . size ( ) ) {
std : : span < sample : : Sample > output_source ;
if ( source_format_ . sample_rate ! = target_format_ . sample_rate ) {
if ( resampler_ = = nullptr ) {
ESP_LOGI ( kTag , " creating new resampler for %lu -> %lu " ,
source_format_ . sample_rate , target_format_ . sample_rate ) ;
resampler_ . reset ( new Resampler ( source_format_ . sample_rate ,
target_format_ . sample_rate ,
source_format_ . num_channels ) ) ;
}
size_t read , written ;
std : : tie ( read , written ) = resampler_ - > Process ( input . subspan ( samples_used ) ,
resampled_buffer_ , false ) ;
samples_used + = read ;
if ( read = = 0 & & written = = 0 ) {
// Zero samples used or written. We need more input.
break ;
}
}
output_source = resampled_buffer_ . first ( written ) ;
// Next, push input samples through the resampler. In the best case, this
// is a simple copy operation.
if ( ! input_buffer_ . isEmpty ( ) ) {
out_of_work = false ;
auto resample_input = input_buffer_ . readAcquire ( ) ;
auto resample_output = resampled_buffer_ . writeAcquire ( ) ;
size_t read , wrote ;
if ( resampler_ ) {
std : : tie ( read , wrote ) =
resampler_ - > Process ( resample_input , resample_output , finalise ) ;
} else {
} else {
output_source = input ;
read = wrote = std : : min ( resample_input . size ( ) , resample_output . size ( ) ) ;
samples_used = input . size ( ) ;
std : : copy_n ( resample_input . begin ( ) , read , resample_output . begin ( ) ) ;
}
}
sink_ . send ( output_source ) ;
input_buffer_ . readCommit ( read ) ;
resampled_buffer_ . writeCommit ( wrote ) ;
}
}
return samples_used ;
// Next, we need to make sure the output is in stereo. This is also a simple
// copy in the best case.
if ( ! resampled_buffer_ . isEmpty ( ) ) {
out_of_work = false ;
auto channels_input = resampled_buffer_ . readAcquire ( ) ;
auto channels_output = output_buffer_ . writeAcquire ( ) ;
size_t read , wrote ;
if ( double_samples_ ) {
wrote = channels_output . size ( ) ;
read = wrote / 2 ;
if ( read > channels_input . size ( ) ) {
read = channels_input . size ( ) ;
wrote = read * 2 ;
}
for ( size_t i = 0 ; i < read ; i + + ) {
channels_output [ i * 2 ] = channels_input [ i ] ;
channels_output [ ( i * 2 ) + 1 ] = channels_input [ i ] ;
}
} else {
read = wrote = std : : min ( channels_input . size ( ) , channels_output . size ( ) ) ;
std : : copy_n ( channels_input . begin ( ) , read , channels_output . begin ( ) ) ;
}
resampled_buffer_ . readCommit ( read ) ;
output_buffer_ . writeCommit ( wrote ) ;
}
}
auto SampleProcessor : : handleEndStream ( bool clear_bufs ) - > void {
// Finally, flush whatever ended up in the output buffer.
if ( resampler_ & & ! clear_bufs ) {
if ( flushOutputBuffer ( ) ) {
size_t read , written ;
if ( out_of_work ) {
std : : tie ( read , written ) = resampler_ - > Process ( { } , resampled_buffer_ , true ) ;
return true ;
}
if ( written > 0 ) {
} else {
sink_ . send ( resampled_buffer_ . first ( written ) ) ;
// The output is congested. Back off of processing for a moment.
return false ;
}
}
}
}
}
auto SampleProcessor : : handleEndStream ( bool clear_bufs ) - > void {
if ( clear_bufs ) {
if ( clear_bufs ) {
sink_ . clear ( ) ;
sink_ . clear ( ) ;
}
// FIXME: This discards any leftover samples, but there probably shouldn't be
input_buffer_ . clear ( ) ;
// any leftover samples. Can this be an assert instead?
resampled_buffer_ . clear ( ) ;
leftover_bytes_ = 0 ;
output_buffer_ . clear ( ) ;
size_t bytes_discarded = 0 ;
size_t bytes_to_discard = unprocessed_samples_ * sizeof ( sample : : Sample ) ;
auto scratch_buf = output_buffer_ . writeAcquire ( ) ;
while ( bytes_discarded < bytes_to_discard ) {
size_t bytes_read =
xStreamBufferReceive ( source_ , scratch_buf . data ( ) ,
std : : min ( scratch_buf . size_bytes ( ) ,
bytes_to_discard - bytes_discarded ) ,
0 ) ;
bytes_discarded + = bytes_read ;
}
unprocessed_samples_ = 0 ;
}
events : : Audio ( ) . Dispatch ( internal : : StreamEnded {
events : : Audio ( ) . Dispatch ( internal : : StreamEnded {
. cue_at_sample = sink_ . totalSent ( ) ,
. cue_at_sample = sink_ . totalSent ( ) ,
} ) ;
} ) ;
}
}
auto SampleProcessor : : hasPendingWork ( ) - > bool {
return ! pending_commands_ . empty ( ) | | unprocessed_samples_ > 0 | |
! input_buffer_ . isEmpty ( ) | | ! resampled_buffer_ . isEmpty ( ) | |
! output_buffer_ . isEmpty ( ) ;
}
IRAM_ATTR
auto SampleProcessor : : flushOutputBuffer ( ) - > bool {
auto samples = output_buffer_ . readAcquire ( ) ;
size_t sent = sink_ . send ( samples ) ;
output_buffer_ . readCommit ( sent ) ;
return output_buffer_ . isEmpty ( ) ;
}
auto SampleProcessor : : discardCommand ( Args & command ) - > void {
if ( command . track ) {
delete command . track ;
}
if ( command . samples_available ) {
unprocessed_samples_ + = command . samples_available ;
}
// End of stream commands can just be dropped. Without further actions.
}
SampleProcessor : : Buffer : : Buffer ( )
: buffer_ ( reinterpret_cast < sample : : Sample * > (
heap_caps_calloc ( kSampleBufferLength ,
sizeof ( sample : : Sample ) ,
MALLOC_CAP_DMA ) ) ,
kSampleBufferLength ) ,
samples_in_buffer_ ( ) { }
SampleProcessor : : Buffer : : ~ Buffer ( ) {
heap_caps_free ( buffer_ . data ( ) ) ;
}
auto SampleProcessor : : Buffer : : writeAcquire ( ) - > std : : span < sample : : Sample > {
return buffer_ . subspan ( samples_in_buffer_ . size ( ) ) ;
}
auto SampleProcessor : : Buffer : : writeCommit ( size_t samples ) - > void {
if ( samples = = 0 ) {
return ;
}
samples_in_buffer_ = buffer_ . first ( samples + samples_in_buffer_ . size ( ) ) ;
}
auto SampleProcessor : : Buffer : : readAcquire ( ) - > std : : span < sample : : Sample > {
return samples_in_buffer_ ;
}
auto SampleProcessor : : Buffer : : readCommit ( size_t samples ) - > void {
if ( samples = = 0 ) {
return ;
}
samples_in_buffer_ = samples_in_buffer_ . subspan ( samples ) ;
// Move the leftover samples to the front of the buffer, so that we're setup
// for a new write.
if ( ! samples_in_buffer_ . empty ( ) ) {
std : : memmove ( buffer_ . data ( ) , samples_in_buffer_ . data ( ) ,
samples_in_buffer_ . size_bytes ( ) ) ;
samples_in_buffer_ = buffer_ . first ( samples_in_buffer_ . size ( ) ) ;
}
}
auto SampleProcessor : : Buffer : : isEmpty ( ) - > bool {
return samples_in_buffer_ . empty ( ) ;
}
auto SampleProcessor : : Buffer : : clear ( ) - > void {
samples_in_buffer_ = { } ;
}
} // namespace audio
} // namespace audio