|
|
@ -1,11 +1,13 @@ |
|
|
|
#include "chunk.hpp" |
|
|
|
#include "chunk.hpp" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#include <algorithm> |
|
|
|
#include <cstddef> |
|
|
|
#include <cstddef> |
|
|
|
#include <cstdint> |
|
|
|
#include <cstdint> |
|
|
|
#include <cstring> |
|
|
|
#include <cstring> |
|
|
|
#include <optional> |
|
|
|
#include <optional> |
|
|
|
|
|
|
|
|
|
|
|
#include "cbor.h" |
|
|
|
#include "cbor.h" |
|
|
|
|
|
|
|
#include "psram_allocator.h" |
|
|
|
|
|
|
|
|
|
|
|
#include "stream_message.hpp" |
|
|
|
#include "stream_message.hpp" |
|
|
|
|
|
|
|
|
|
|
@ -17,60 +19,34 @@ const std::size_t kMaxChunkSize = 512; |
|
|
|
// TODO: tune
|
|
|
|
// TODO: tune
|
|
|
|
static const std::size_t kWorkingBufferSize = kMaxChunkSize * 1.5; |
|
|
|
static const std::size_t kWorkingBufferSize = kMaxChunkSize * 1.5; |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
|
|
|
* The amount of space to allocate for the first chunk's header. After the first |
|
|
|
|
|
|
|
* chunk, we have a more concrete idea of the header's size and can allocate |
|
|
|
|
|
|
|
* space for future headers more compactly. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
// TODO: measure how big headers tend to be to pick a better value.
|
|
|
|
|
|
|
|
static const std::size_t kInitialHeaderSize = 32; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
auto WriteChunksToStream(MessageBufferHandle_t* stream, |
|
|
|
auto WriteChunksToStream(MessageBufferHandle_t* stream, |
|
|
|
uint8_t* working_buffer, |
|
|
|
cpp::span<std::byte> working_buffer, |
|
|
|
size_t working_buffer_length, |
|
|
|
std::function<size_t(cpp::span<std::byte>)> callback, |
|
|
|
std::function<size_t(uint8_t*, size_t)> callback, |
|
|
|
|
|
|
|
TickType_t max_wait) -> ChunkWriteResult { |
|
|
|
TickType_t max_wait) -> ChunkWriteResult { |
|
|
|
size_t header_size = kInitialHeaderSize; |
|
|
|
|
|
|
|
while (1) { |
|
|
|
while (1) { |
|
|
|
// First, ask the callback for some data to write.
|
|
|
|
// First, write out our chunk header so we know how much space to give to
|
|
|
|
size_t chunk_size = callback(working_buffer + header_size, |
|
|
|
// the callback.
|
|
|
|
working_buffer_length - header_size); |
|
|
|
auto header_size = WriteTypeOnlyMessage(TYPE_CHUNK_HEADER, working_buffer); |
|
|
|
|
|
|
|
if (header_size.has_error()) { |
|
|
|
|
|
|
|
return CHUNK_ENCODING_ERROR; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Now we can ask the callback to fill the remaining space.
|
|
|
|
|
|
|
|
size_t chunk_size = std::invoke( |
|
|
|
|
|
|
|
callback, |
|
|
|
|
|
|
|
working_buffer.subspan(header_size.value(), |
|
|
|
|
|
|
|
working_buffer.size() - header_size.value())); |
|
|
|
|
|
|
|
|
|
|
|
if (chunk_size == 0) { |
|
|
|
if (chunk_size == 0) { |
|
|
|
// They had nothing for us, so bail out.
|
|
|
|
// They had nothing for us, so bail out.
|
|
|
|
return CHUNK_OUT_OF_DATA; |
|
|
|
return CHUNK_OUT_OF_DATA; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Put together a header.
|
|
|
|
|
|
|
|
CborEncoder arr; |
|
|
|
|
|
|
|
cpp::result<size_t, CborError> encoder_res = WriteMessage( |
|
|
|
|
|
|
|
TYPE_CHUNK_HEADER, |
|
|
|
|
|
|
|
[&](CborEncoder& container) { |
|
|
|
|
|
|
|
cbor_encoder_create_array(&container, &arr, 2); |
|
|
|
|
|
|
|
cbor_encode_uint(&arr, header_size); |
|
|
|
|
|
|
|
cbor_encode_uint(&arr, chunk_size); |
|
|
|
|
|
|
|
cbor_encoder_close_container(&container, &arr); |
|
|
|
|
|
|
|
return std::nullopt; |
|
|
|
|
|
|
|
}, |
|
|
|
|
|
|
|
working_buffer, working_buffer_length); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
size_t new_header_size = header_size; |
|
|
|
|
|
|
|
if (encoder_res.has_error()) { |
|
|
|
|
|
|
|
return CHUNK_ENCODING_ERROR; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
// We can now tune the space to allocate for the header to be closer to
|
|
|
|
|
|
|
|
// its actual size. We pad this by 2 bytes to allow extra space for the
|
|
|
|
|
|
|
|
// chunk size and header size fields to each spill over into another byte
|
|
|
|
|
|
|
|
// each.
|
|
|
|
|
|
|
|
new_header_size = encoder_res.value() + 2; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Try to write to the buffer. Note the return type here will be either 0 or
|
|
|
|
// Try to write to the buffer. Note the return type here will be either 0 or
|
|
|
|
// header_size + chunk_size, as MessageBuffer doesn't allow partial writes.
|
|
|
|
// header_size + chunk_size, as MessageBuffer doesn't allow partial writes.
|
|
|
|
size_t actual_write_size = xMessageBufferSend( |
|
|
|
size_t actual_write_size = |
|
|
|
*stream, working_buffer, header_size + chunk_size, max_wait); |
|
|
|
xMessageBufferSend(*stream, working_buffer.data(), |
|
|
|
|
|
|
|
header_size.value() + chunk_size, max_wait); |
|
|
|
header_size = new_header_size; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (actual_write_size == 0) { |
|
|
|
if (actual_write_size == 0) { |
|
|
|
// We failed to write in time, so bail out. This is techinically data loss
|
|
|
|
// We failed to write in time, so bail out. This is techinically data loss
|
|
|
@ -81,38 +57,39 @@ auto WriteChunksToStream(MessageBufferHandle_t* stream, |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
ChunkReader::ChunkReader(MessageBufferHandle_t* stream) : stream_(stream) { |
|
|
|
ChunkReader::ChunkReader(MessageBufferHandle_t* stream) |
|
|
|
working_buffer_ = static_cast<uint8_t*>( |
|
|
|
: stream_(stream), |
|
|
|
heap_caps_malloc(kWorkingBufferSize, MALLOC_CAP_SPIRAM)); |
|
|
|
raw_working_buffer_(static_cast<std::byte*>( |
|
|
|
}; |
|
|
|
heap_caps_malloc(kWorkingBufferSize, MALLOC_CAP_SPIRAM))), |
|
|
|
|
|
|
|
working_buffer_(raw_working_buffer_, kWorkingBufferSize){}; |
|
|
|
|
|
|
|
|
|
|
|
ChunkReader::~ChunkReader() { |
|
|
|
ChunkReader::~ChunkReader() { |
|
|
|
free(working_buffer_); |
|
|
|
free(raw_working_buffer_); |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
auto ChunkReader::Reset() -> void { |
|
|
|
auto ChunkReader::Reset() -> void { |
|
|
|
leftover_bytes_ = 0; |
|
|
|
leftover_bytes_ = 0; |
|
|
|
last_message_size_ = 0; |
|
|
|
last_message_size_ = 0; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
auto ChunkReader::GetLastMessage() -> std::pair<uint8_t*, size_t> { |
|
|
|
auto ChunkReader::GetLastMessage() -> cpp::span<std::byte> { |
|
|
|
return std::make_pair(working_buffer_ + leftover_bytes_, last_message_size_); |
|
|
|
return working_buffer_.subspan(leftover_bytes_, last_message_size_); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
auto ChunkReader::ReadChunkFromStream( |
|
|
|
auto ChunkReader::ReadChunkFromStream( |
|
|
|
std::function<std::optional<size_t>(uint8_t*, size_t)> callback, |
|
|
|
std::function<std::optional<size_t>(cpp::span<std::byte>)> callback, |
|
|
|
TickType_t max_wait) -> ChunkReadResult { |
|
|
|
TickType_t max_wait) -> ChunkReadResult { |
|
|
|
// First, wait for a message to arrive over the buffer.
|
|
|
|
// First, wait for a message to arrive over the buffer.
|
|
|
|
last_message_size_ = |
|
|
|
last_message_size_ = |
|
|
|
xMessageBufferReceive(*stream_, working_buffer_ + leftover_bytes_, |
|
|
|
xMessageBufferReceive(*stream_, raw_working_buffer_ + leftover_bytes_, |
|
|
|
kWorkingBufferSize - leftover_bytes_, max_wait); |
|
|
|
working_buffer_.size() - leftover_bytes_, max_wait); |
|
|
|
|
|
|
|
|
|
|
|
if (last_message_size_ == 0) { |
|
|
|
if (last_message_size_ == 0) { |
|
|
|
return CHUNK_READ_TIMEOUT; |
|
|
|
return CHUNK_READ_TIMEOUT; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
MessageType type = |
|
|
|
cpp::span<std::byte> new_data = GetLastMessage(); |
|
|
|
ReadMessageType(working_buffer_ + leftover_bytes_, last_message_size_); |
|
|
|
MessageType type = ReadMessageType(new_data); |
|
|
|
|
|
|
|
|
|
|
|
if (type != TYPE_CHUNK_HEADER) { |
|
|
|
if (type != TYPE_CHUNK_HEADER) { |
|
|
|
// This message wasn't for us, so let the caller handle it.
|
|
|
|
// This message wasn't for us, so let the caller handle it.
|
|
|
@ -121,31 +98,30 @@ auto ChunkReader::ReadChunkFromStream( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Work the size and position of the chunk.
|
|
|
|
// Work the size and position of the chunk.
|
|
|
|
size_t header_length = 0, chunk_length = 0; |
|
|
|
auto chunk_data = GetAdditionalData(new_data); |
|
|
|
|
|
|
|
|
|
|
|
// TODO: chunker header type to encapsulate this?
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Now we need to stick the end of the last chunk (if it exists) onto the
|
|
|
|
// Now we need to stick the end of the last chunk (if it exists) onto the
|
|
|
|
// front of the new chunk. Do it this way around bc we assume the old chunk
|
|
|
|
// front of the new chunk. Do it this way around bc we assume the old chunk
|
|
|
|
// is shorter, and therefore faster to move.
|
|
|
|
// is shorter, and therefore faster to move.
|
|
|
|
uint8_t* combined_buffer = working_buffer_ + header_length - leftover_bytes_; |
|
|
|
cpp::span<std::byte> leftover_data = working_buffer_.first(leftover_bytes_); |
|
|
|
size_t combined_buffer_size = leftover_bytes_ + chunk_length; |
|
|
|
cpp::span<std::byte> combined_data(chunk_data.data() - leftover_data.size(), |
|
|
|
|
|
|
|
leftover_data.size() + chunk_data.size()); |
|
|
|
if (leftover_bytes_ > 0) { |
|
|
|
if (leftover_bytes_ > 0) { |
|
|
|
memmove(combined_buffer, working_buffer_, leftover_bytes_); |
|
|
|
std::copy_backward(leftover_data.begin(), leftover_data.end(), |
|
|
|
|
|
|
|
combined_data.begin()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Tell the callback about the new data.
|
|
|
|
// Tell the callback about the new data.
|
|
|
|
std::optional<size_t> amount_processed = |
|
|
|
std::optional<size_t> amount_processed = std::invoke(callback, combined_data); |
|
|
|
callback(combined_buffer, combined_buffer_size); |
|
|
|
|
|
|
|
if (!amount_processed) { |
|
|
|
if (!amount_processed) { |
|
|
|
return CHUNK_PROCESSING_ERROR; |
|
|
|
return CHUNK_PROCESSING_ERROR; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Prepare for the next iteration.
|
|
|
|
// Prepare for the next iteration.
|
|
|
|
leftover_bytes_ = combined_buffer_size - amount_processed.value(); |
|
|
|
leftover_bytes_ = combined_data.size() - amount_processed.value(); |
|
|
|
if (leftover_bytes_ > 0) { |
|
|
|
if (leftover_bytes_ > 0) { |
|
|
|
memmove(working_buffer_, combined_buffer + amount_processed.value(), |
|
|
|
std::copy(combined_data.begin() + amount_processed.value(), |
|
|
|
leftover_bytes_); |
|
|
|
combined_data.end(), working_buffer_.begin()); |
|
|
|
return CHUNK_LEFTOVER_DATA; |
|
|
|
return CHUNK_LEFTOVER_DATA; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|