jacqueline 2 years ago
parent 01be69eca1
commit 2056cad0ab
  1. 18
      src/audio/audio_decoder.cpp
  2. 10
      src/audio/audio_playback.cpp
  3. 43
      src/audio/audio_task.cpp
  4. 36
      src/audio/chunk.cpp
  5. 22
      src/audio/fatfs_audio_input.cpp
  6. 6
      src/audio/i2s_audio_output.cpp
  7. 2
      src/audio/include/audio_decoder.hpp
  8. 18
      src/audio/include/audio_element.hpp
  9. 9
      src/audio/include/audio_playback.hpp
  10. 4
      src/audio/include/audio_task.hpp
  11. 24
      src/audio/include/chunk.hpp
  12. 2
      src/audio/include/fatfs_audio_input.hpp
  13. 1
      src/audio/include/i2s_audio_output.hpp
  14. 22
      src/audio/include/stream_buffer.hpp

@ -57,8 +57,8 @@ auto AudioDecoder::ProcessChunk(const cpp::span<std::byte>& chunk)
bool has_samples_to_send = false; bool has_samples_to_send = false;
bool needs_more_input = false; bool needs_more_input = false;
std::optional<codecs::ICodec::ProcessingError> error = std::nullopt; std::optional<codecs::ICodec::ProcessingError> error = std::nullopt;
WriteChunksToStream( while (1) {
output_buffer_, ChunkWriteResult res = chunk_writer_.WriteChunkToStream(
[&](cpp::span<std::byte> buffer) -> std::size_t { [&](cpp::span<std::byte> buffer) -> std::size_t {
std::size_t bytes_written = 0; std::size_t bytes_written = 0;
// Continue filling up the output buffer so long as we have samples // Continue filling up the output buffer so long as we have samples
@ -83,10 +83,20 @@ auto AudioDecoder::ProcessChunk(const cpp::span<std::byte>& chunk)
} }
return bytes_written; return bytes_written;
}, },
// This element doesn't support any kind of out of band commands, so we // TODO
// can just suspend the whole task if the output buffer fills up.
portMAX_DELAY); portMAX_DELAY);
switch (res) {
case CHUNK_WRITE_OKAY:
break;
case CHUNK_WRITE_TIMEOUT:
case CHUNK_OUT_OF_DATA:
return {};
default:
return cpp::fail(IO_ERROR);
}
}
if (error) { if (error) {
ESP_LOGE(kTag, "Codec encountered error %d", error.value()); ESP_LOGE(kTag, "Codec encountered error %d", error.value());
return cpp::fail(IO_ERROR); return cpp::fail(IO_ERROR);

@ -43,9 +43,9 @@ auto AudioPlayback::create(drivers::GpioExpander* expander,
playback->ConnectElements(codec.get(), sink.get()); playback->ConnectElements(codec.get(), sink.get());
// Launch! // Launch!
StartAudioTask("src", source); playback->element_handles_.push_back(StartAudioTask("src", source));
StartAudioTask("dec", codec); playback->element_handles_.push_back(StartAudioTask("dec", codec));
StartAudioTask("sink", sink); playback->element_handles_.push_back(StartAudioTask("sink", sink));
return playback; return playback;
} }
@ -55,7 +55,9 @@ AudioPlayback::AudioPlayback()
: stream_start_(128, 128), stream_end_(128, 128) {} : stream_start_(128, 128), stream_end_(128, 128) {}
AudioPlayback::~AudioPlayback() { AudioPlayback::~AudioPlayback() {
// TODO(jacqueline): signal the end of all things, and maybe wait for it? for (auto& element : element_handles_) {
element->Quit();
}
} }
auto AudioPlayback::Play(const std::string& filename) -> void { auto AudioPlayback::Play(const std::string& filename) -> void {

@ -3,9 +3,12 @@
#include <stdlib.h> #include <stdlib.h>
#include <cstdint> #include <cstdint>
#include <memory>
#include "audio_element_handle.hpp"
#include "cbor.h" #include "cbor.h"
#include "esp_heap_caps.h" #include "esp_heap_caps.h"
#include "esp_log.h"
#include "freertos/portmacro.h" #include "freertos/portmacro.h"
#include "freertos/queue.h" #include "freertos/queue.h"
#include "freertos/stream_buffer.h" #include "freertos/stream_buffer.h"
@ -20,10 +23,17 @@
namespace audio { namespace audio {
auto StartAudioTask(const std::string& name, auto StartAudioTask(const std::string& name,
std::shared_ptr<IAudioElement> element) -> void { std::shared_ptr<IAudioElement> element)
-> std::unique_ptr<AudioElementHandle> {
auto task_handle = std::make_unique<TaskHandle_t>();
// Newly created task will free this.
AudioTaskArgs* args = new AudioTaskArgs{.element = element}; AudioTaskArgs* args = new AudioTaskArgs{.element = element};
xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args, xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args,
kTaskPriorityAudio, NULL); kTaskPriorityAudio, task_handle.get());
return std::make_unique<AudioElementHandle>(std::move(task_handle), element);
} }
void AudioTaskMain(void* args) { void AudioTaskMain(void* args) {
@ -32,9 +42,16 @@ void AudioTaskMain(void* args) {
std::shared_ptr<IAudioElement> element = std::move(real_args->element); std::shared_ptr<IAudioElement> element = std::move(real_args->element);
delete real_args; delete real_args;
char tag[] = "task";
ChunkReader chunk_reader = ChunkReader(element->InputBuffer()); ChunkReader chunk_reader = ChunkReader(element->InputBuffer());
while (1) { while (element->ElementState() != STATE_QUIT) {
if (element->ElementState() == STATE_PAUSE) {
// TODO: park with a condition variable or something?
vTaskDelay(100);
continue;
}
cpp::result<size_t, AudioProcessingError> process_res; cpp::result<size_t, AudioProcessingError> process_res;
// If this element has an input stream, then our top priority is // If this element has an input stream, then our top priority is
@ -54,6 +71,7 @@ void AudioTaskMain(void* args) {
if (chunk_res == CHUNK_PROCESSING_ERROR || if (chunk_res == CHUNK_PROCESSING_ERROR ||
chunk_res == CHUNK_DECODING_ERROR) { chunk_res == CHUNK_DECODING_ERROR) {
ESP_LOGE(tag, "failed to process chunk");
break; // TODO. break; // TODO.
} else if (chunk_res == CHUNK_STREAM_ENDED) { } else if (chunk_res == CHUNK_STREAM_ENDED) {
has_received_message = true; has_received_message = true;
@ -65,21 +83,36 @@ void AudioTaskMain(void* args) {
if (type == TYPE_STREAM_INFO) { if (type == TYPE_STREAM_INFO) {
auto parse_res = ReadMessage<StreamInfo>(&StreamInfo::Parse, message); auto parse_res = ReadMessage<StreamInfo>(&StreamInfo::Parse, message);
if (parse_res.has_error()) { if (parse_res.has_error()) {
ESP_LOGE(tag, "failed to parse stream info");
break; // TODO. break; // TODO.
} }
auto info_res = element->ProcessStreamInfo(parse_res.value()); auto info_res = element->ProcessStreamInfo(parse_res.value());
if (info_res.has_error()) { if (info_res.has_error()) {
ESP_LOGE(tag, "failed to process stream info");
break; // TODO. break; // TODO.
} }
} }
} }
// TODO: Do any out of band reading, such a a pause command, here.
// Chunk reading must have timed out, or we don't have an input stream. // Chunk reading must have timed out, or we don't have an input stream.
ElementState state = element->ElementState();
if (state == STATE_PAUSE) {
element->PrepareForPause();
vTaskSuspend(NULL);
// Zzzzzz...
// When we wake up, skip straight to the start of the loop again.
continue;
} else if (state == STATE_QUIT) {
break;
}
// Signal the element to do any of its idle tasks. // Signal the element to do any of its idle tasks.
auto process_error = element->ProcessIdle(); auto process_error = element->ProcessIdle();
if (process_error.has_error()) { if (process_error.has_error()) {
ESP_LOGE(tag, "failed to process idle");
break; // TODO. break; // TODO.
} }
} }

@ -13,11 +13,19 @@
namespace audio { namespace audio {
auto WriteChunksToStream(StreamBuffer* stream, ChunkWriter::ChunkWriter(StreamBuffer* buffer)
: stream_(buffer), leftover_bytes_(0) {}
ChunkWriter::~ChunkWriter() {}
auto ChunkWriter::Reset() -> void {
leftover_bytes_ = 0;
}
auto ChunkWriter::WriteChunkToStream(
std::function<size_t(cpp::span<std::byte>)> callback, std::function<size_t(cpp::span<std::byte>)> callback,
TickType_t max_wait) -> ChunkWriteResult { TickType_t max_wait) -> ChunkWriteResult {
cpp::span<std::byte> write_buffer = stream->WriteBuffer(); cpp::span<std::byte> write_buffer = stream_->WriteBuffer();
while (1) {
// First, write out our chunk header so we know how much space to give to // First, write out our chunk header so we know how much space to give to
// the callback. // the callback.
auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer); auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, write_buffer);
@ -25,11 +33,18 @@ auto WriteChunksToStream(StreamBuffer* stream,
return CHUNK_ENCODING_ERROR; return CHUNK_ENCODING_ERROR;
} }
// Now we can ask the callback to fill the remaining space. // Now we can ask the callback to fill the remaining space. If the previous
size_t chunk_size = std::invoke( // call to this method timed out, then we may already have the data we need
// in our write buffer.
size_t chunk_size;
if (leftover_bytes_ > 0) {
chunk_size = leftover_bytes_;
} else {
chunk_size = std::invoke(
callback, callback,
write_buffer.subspan(header_size.value(), write_buffer.subspan(header_size.value(),
write_buffer.size() - header_size.value())); write_buffer.size() - header_size.value()));
}
if (chunk_size == 0) { if (chunk_size == 0) {
// They had nothing for us, so bail out. // They had nothing for us, so bail out.
@ -39,16 +54,17 @@ auto WriteChunksToStream(StreamBuffer* stream,
// Try to write to the buffer. Note the return type here will be either 0 or // Try to write to the buffer. Note the return type here will be either 0 or
// header_size + chunk_size, as MessageBuffer doesn't allow partial writes. // header_size + chunk_size, as MessageBuffer doesn't allow partial writes.
size_t actual_write_size = size_t actual_write_size =
xMessageBufferSend(stream->Handle(), write_buffer.data(), xMessageBufferSend(stream_->Handle(), write_buffer.data(),
header_size.value() + chunk_size, max_wait); header_size.value() + chunk_size, max_wait);
if (actual_write_size == 0) { if (actual_write_size == 0) {
// We failed to write in time, so bail out. This is techinically data loss leftover_bytes_ = chunk_size;
// unless the caller wants to go and parse our working buffer, but we
// assume the caller has a good reason to time us out.
return CHUNK_WRITE_TIMEOUT; return CHUNK_WRITE_TIMEOUT;
} else {
leftover_bytes_ = 0;
} }
}
return CHUNK_WRITE_OKAY;
} }
ChunkReader::ChunkReader(StreamBuffer* stream) : stream_(stream) {} ChunkReader::ChunkReader(StreamBuffer* stream) : stream_(stream) {}

@ -126,25 +126,25 @@ auto FatfsAudioInput::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
} }
// Now stream data into the output buffer until it's full. // Now stream data into the output buffer until it's full.
pending_read_pos_ = file_buffer_read_pos_; while (1) {
ChunkWriteResult result = WriteChunksToStream( ChunkWriteResult result = chunk_writer_.WriteChunkToStream(
output_buffer_, [&](cpp::span<std::byte> d) { return SendChunk(d); }, [&](cpp::span<std::byte> d) { return SendChunk(d); }, kServiceInterval);
kServiceInterval);
switch (result) { switch (result) {
case CHUNK_WRITE_OKAY:
break;
case CHUNK_WRITE_TIMEOUT: case CHUNK_WRITE_TIMEOUT:
case CHUNK_OUT_OF_DATA: case CHUNK_OUT_OF_DATA:
// Both of these are fine; SendChunk keeps track of where it's up to // Both of these are fine; we will pick back up where we left off in
// internally, so we will pick back up where we left off. // the next idle call.
return {}; return {};
default: default:
return cpp::fail(IO_ERROR); return cpp::fail(IO_ERROR);
} }
} }
}
auto FatfsAudioInput::SendChunk(cpp::span<std::byte> dest) -> size_t { auto FatfsAudioInput::SendChunk(cpp::span<std::byte> dest) -> size_t {
file_buffer_read_pos_ = pending_read_pos_;
if (file_buffer_read_pos_ == file_buffer_write_pos_) { if (file_buffer_read_pos_ == file_buffer_write_pos_) {
return 0; return 0;
} }
@ -159,9 +159,9 @@ auto FatfsAudioInput::SendChunk(cpp::span<std::byte> dest) -> size_t {
cpp::span<std::byte> source(file_buffer_read_pos_, chunk_size); cpp::span<std::byte> source(file_buffer_read_pos_, chunk_size);
std::copy(source.begin(), source.end(), dest.begin()); std::copy(source.begin(), source.end(), dest.begin());
pending_read_pos_ = file_buffer_read_pos_ + chunk_size; file_buffer_read_pos_ = file_buffer_read_pos_ + chunk_size;
if (pending_read_pos_ == file_buffer_.end()) { if (file_buffer_read_pos_ == file_buffer_.end()) {
pending_read_pos_ = file_buffer_.begin(); file_buffer_read_pos_ = file_buffer_.begin();
} }
return chunk_size; return chunk_size;
} }

@ -102,6 +102,12 @@ auto I2SAudioOutput::ProcessIdle() -> cpp::result<void, AudioProcessingError> {
return {}; return {};
} }
auto I2SAudioOutput::PrepareForPause() -> void {
// TODO(jacqueline): We ideally want to ensure we have enough samples in the
// DMA buffer here, so that soft mute can work properly.
SetSoftMute(true);
}
auto I2SAudioOutput::SetVolume(uint8_t volume) -> void { auto I2SAudioOutput::SetVolume(uint8_t volume) -> void {
volume_ = volume; volume_ = volume;
if (!is_soft_muted_) { if (!is_soft_muted_) {

@ -42,6 +42,8 @@ class AudioDecoder : public IAudioElement {
private: private:
std::unique_ptr<codecs::ICodec> current_codec_; std::unique_ptr<codecs::ICodec> current_codec_;
std::optional<StreamInfo> stream_info_; std::optional<StreamInfo> stream_info_;
ChunkWriter chunk_writer_;
}; };
} // namespace audio } // namespace audio

@ -1,10 +1,11 @@
#pragma once #pragma once
#include <atomic>
#include <cstdint> #include <cstdint>
#include "chunk.hpp"
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
#include "chunk.hpp"
#include "freertos/message_buffer.h" #include "freertos/message_buffer.h"
#include "freertos/portmacro.h" #include "freertos/portmacro.h"
#include "result.hpp" #include "result.hpp"
@ -16,6 +17,12 @@
namespace audio { namespace audio {
enum ElementState {
STATE_RUN,
STATE_PAUSE,
STATE_QUIT,
};
/* /*
* Errors that may be returned by any of the Process* methods of an audio * Errors that may be returned by any of the Process* methods of an audio
* element. * element.
@ -42,7 +49,8 @@ enum AudioProcessingError {
*/ */
class IAudioElement { class IAudioElement {
public: public:
IAudioElement() : input_buffer_(nullptr), output_buffer_(nullptr) {} IAudioElement()
: input_buffer_(nullptr), output_buffer_(nullptr), state_(STATE_RUN) {}
virtual ~IAudioElement() {} virtual ~IAudioElement() {}
/* /*
@ -71,6 +79,9 @@ class IAudioElement {
auto OutputBuffer(StreamBuffer* b) -> void { output_buffer_ = b; } auto OutputBuffer(StreamBuffer* b) -> void { output_buffer_ = b; }
auto ElementState() const -> ElementState { return state_; }
auto ElementState(enum ElementState e) -> void { state_ = e; }
/* /*
* Called when a StreamInfo message is received. Used to configure this * Called when a StreamInfo message is received. Used to configure this
* element in preperation for incoming chunks. * element in preperation for incoming chunks.
@ -94,9 +105,12 @@ class IAudioElement {
*/ */
virtual auto ProcessIdle() -> cpp::result<void, AudioProcessingError> = 0; virtual auto ProcessIdle() -> cpp::result<void, AudioProcessingError> = 0;
virtual auto PrepareForPause() -> void{};
protected: protected:
StreamBuffer* input_buffer_; StreamBuffer* input_buffer_;
StreamBuffer* output_buffer_; StreamBuffer* output_buffer_;
std::atomic<enum ElementState> state_;
}; };
} // namespace audio } // namespace audio

@ -6,6 +6,7 @@
#include <vector> #include <vector>
#include "audio_element.hpp" #include "audio_element.hpp"
#include "audio_element_handle.hpp"
#include "esp_err.h" #include "esp_err.h"
#include "gpio_expander.hpp" #include "gpio_expander.hpp"
#include "result.hpp" #include "result.hpp"
@ -16,7 +17,8 @@
namespace audio { namespace audio {
/* /*
* TODO. * Creates and links together audio elements into a pipeline. This is the main
* entrypoint to playing audio on the system.
*/ */
class AudioPlayback { class AudioPlayback {
public: public:
@ -29,6 +31,10 @@ class AudioPlayback {
AudioPlayback(); AudioPlayback();
~AudioPlayback(); ~AudioPlayback();
/*
* Begins playing the file at the given FatFS path. This will interrupt any
* currently in-progress playback.
*/
auto Play(const std::string& filename) -> void; auto Play(const std::string& filename) -> void;
// Not copyable or movable. // Not copyable or movable.
@ -41,6 +47,7 @@ class AudioPlayback {
StreamBuffer stream_start_; StreamBuffer stream_start_;
StreamBuffer stream_end_; StreamBuffer stream_end_;
std::vector<std::unique_ptr<StreamBuffer>> element_buffers_; std::vector<std::unique_ptr<StreamBuffer>> element_buffers_;
std::vector<std::unique_ptr<AudioElementHandle>> element_handles_;
}; };
} // namespace audio } // namespace audio

@ -3,6 +3,7 @@
#include <memory> #include <memory>
#include "audio_element.hpp" #include "audio_element.hpp"
#include "audio_element_handle.hpp"
namespace audio { namespace audio {
@ -11,7 +12,8 @@ struct AudioTaskArgs {
}; };
auto StartAudioTask(const std::string& name, auto StartAudioTask(const std::string& name,
std::shared_ptr<IAudioElement> element) -> void; std::shared_ptr<IAudioElement> element)
-> std::unique_ptr<AudioElementHandle>;
void AudioTaskMain(void* args); void AudioTaskMain(void* args);

@ -18,6 +18,8 @@
namespace audio { namespace audio {
enum ChunkWriteResult { enum ChunkWriteResult {
// Returned when the callback does not write any data.
CHUNK_WRITE_OKAY,
// Returned when the callback does not write any data. // Returned when the callback does not write any data.
CHUNK_OUT_OF_DATA, CHUNK_OUT_OF_DATA,
// Returned when there is an error encoding a chunk header using cbor. // Returned when there is an error encoding a chunk header using cbor.
@ -27,19 +29,33 @@ enum ChunkWriteResult {
CHUNK_WRITE_TIMEOUT, CHUNK_WRITE_TIMEOUT,
}; };
class ChunkWriter {
public:
explicit ChunkWriter(StreamBuffer* buffer);
~ChunkWriter();
auto Reset() -> void;
auto GetLastMessage() -> cpp::span<std::byte>;
/* /*
* Invokes the given callback to receive data, breaks the received data up into * Invokes the given callback to receive data, breaks the received data up
* chunks with headers, and writes those chunks to the given output stream. * into chunks with headers, and writes those chunks to the given output
* stream.
* *
* The callback will be invoked with a byte buffer and its size. The callback * The callback will be invoked with a byte buffer and its size. The callback
* should write as much data as it can to this buffer, and then return the * should write as much data as it can to this buffer, and then return the
* number of bytes it wrote. Return a value of 0 to indicate that there is no * number of bytes it wrote. Return a value of 0 to indicate that there is no
* more input to read. * more input to read.
*/ */
auto WriteChunksToStream(StreamBuffer* stream, auto WriteChunkToStream(std::function<size_t(cpp::span<std::byte>)> callback,
std::function<size_t(cpp::span<std::byte>)> callback,
TickType_t max_wait) -> ChunkWriteResult; TickType_t max_wait) -> ChunkWriteResult;
private:
StreamBuffer* stream_;
std::size_t leftover_bytes_ = 0;
};
enum ChunkReadResult { enum ChunkReadResult {
CHUNK_READ_OKAY, CHUNK_READ_OKAY,
// Returned when the chunk was read successfully, but the consumer did not // Returned when the chunk was read successfully, but the consumer did not

@ -46,6 +46,8 @@ class FatfsAudioInput : public IAudioElement {
FIL current_file_; FIL current_file_;
bool is_file_open_; bool is_file_open_;
ChunkWriter chunk_writer_;
}; };
} // namespace audio } // namespace audio

@ -34,6 +34,7 @@ class I2SAudioOutput : public IAudioElement {
auto ProcessChunk(const cpp::span<std::byte>& chunk) auto ProcessChunk(const cpp::span<std::byte>& chunk)
-> cpp::result<std::size_t, AudioProcessingError> override; -> cpp::result<std::size_t, AudioProcessingError> override;
auto ProcessIdle() -> cpp::result<void, AudioProcessingError> override; auto ProcessIdle() -> cpp::result<void, AudioProcessingError> override;
auto PrepareForPause() -> void override;
I2SAudioOutput(const I2SAudioOutput&) = delete; I2SAudioOutput(const I2SAudioOutput&) = delete;
I2SAudioOutput& operator=(const I2SAudioOutput&) = delete; I2SAudioOutput& operator=(const I2SAudioOutput&) = delete;

@ -10,13 +10,35 @@
namespace audio { namespace audio {
/*
* A collection of the buffers required for two IAudioElement implementations to
* stream data between each other.
*
* Currently, we use a FreeRTOS MessageBuffer to hold the byte stream, and also
* maintain two chunk-sized buffers for the elements to stage their read and
* write operations (as MessageBuffer copies the given data into its memory
* space). A future optimisation here could be to instead post himem memory
* addresses to the message buffer, and then maintain address spaces into which
* we map these messages, rather than 'real' allocated buffers as we do now.
*/
class StreamBuffer { class StreamBuffer {
public: public:
explicit StreamBuffer(std::size_t chunk_size, std::size_t buffer_size); explicit StreamBuffer(std::size_t chunk_size, std::size_t buffer_size);
~StreamBuffer(); ~StreamBuffer();
/* Returns the handle for the underlying message buffer. */
auto Handle() -> MessageBufferHandle_t* { return &handle_; } auto Handle() -> MessageBufferHandle_t* { return &handle_; }
/*
* Returns a chunk-sized staging buffer that should be used *only* by the
* reader (sink) element.
*/
auto ReadBuffer() -> cpp::span<std::byte> { return input_chunk_; } auto ReadBuffer() -> cpp::span<std::byte> { return input_chunk_; }
/*
* Returns a chunk-sized staging buffer that should be used *only* by the
* writer (source) element.
*/
auto WriteBuffer() -> cpp::span<std::byte> { return output_chunk_; } auto WriteBuffer() -> cpp::span<std::byte> { return output_chunk_; }
StreamBuffer(const StreamBuffer&) = delete; StreamBuffer(const StreamBuffer&) = delete;

Loading…
Cancel
Save