work on bulk writes and reads

master
Ondřej Hruška 6 years ago
parent a40e32d322
commit ffd95a175d
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 2
      CMakeLists.txt
  2. 8
      gex/TF_Integration.c
  3. 18
      gex/gex_defines.h
  4. 5
      gex/gex_helpers.h
  5. 13
      gex/gex_internal.h
  6. 10
      gex/gex_message_types.h
  7. 220
      gex/gex_unit.c
  8. 24
      gex/gex_unit.h
  9. 4
      main.c

@ -3,6 +3,8 @@ project(gex_client)
set(CMAKE_C_STANDARD 99)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -Werror")
set(SOURCE_FILES
main.c
gex/serial/serial.c

@ -6,7 +6,7 @@
#include "TinyFrame.h"
#define GEX_H // to allow including other headers
#include "gex.h"
#include "gex_internal.h"
void TF_WriteImpl(TinyFrame *tf, const uint8_t *buff, size_t len)
@ -15,7 +15,7 @@ void TF_WriteImpl(TinyFrame *tf, const uint8_t *buff, size_t len)
assert(gc->acm_fd != 0);
ssize_t rv = write(gc->acm_fd, buff, len);
if (rv != len) {
if (rv != (ssize_t)len) {
fprintf(stderr, "ERROR %d in TF write: %s\n", errno, strerror(errno));
}
}
@ -23,11 +23,11 @@ void TF_WriteImpl(TinyFrame *tf, const uint8_t *buff, size_t len)
/** Claim the TX interface before composing and sending a frame */
void TF_ClaimTx(TinyFrame *tf)
{
//
(void)tf;
}
/** Free the TX interface after composing and sending a frame */
void TF_ReleaseTx(TinyFrame *tf)
{
//
(void)tf;
}

@ -19,6 +19,7 @@ typedef TF_ID GexSession;
typedef struct gex_client GexClient;
typedef struct gex_unit GexUnit;
typedef struct gex_msg GexMsg;
typedef struct gex_bulk GexBulk;
/** Callback for spontaneous reports from units */
typedef void (*GexEventListener)(GexMsg msg);
@ -29,6 +30,7 @@ typedef void (*GexEventListener)(GexMsg msg);
*/
struct gex_msg {
GexUnit *unit; //!< Unit this message belongs to
GexSession session; //!< TinyFrame session used in the message
uint8_t *payload; //!< Useful payload
uint32_t len; //!< Payload length
uint8_t type; //!< Message type (e.g. MSG_ERROR), or report type in event handler
@ -46,4 +48,20 @@ struct gex_unit {
struct gex_unit *next; //!< Pointer to the next entry in this linked list, or NULL if none
};
/**
* Bulk transport info struct
*/
struct gex_bulk {
uint8_t req_cmd; //!< Bulk request command
const uint8_t *req_data; //!< Bulk request data
uint32_t req_len; //!< Bulk request data len
uint8_t *buffer; //!< Transfer buffer (rx or tx)
// those are in a union to make it more obvious they shouldn't be used at once
union {
uint32_t capacity; //!< Receive buffer capacity
uint32_t len; //!< Transfer buffer length
};
};
#endif //GEX_CLIENT_GEX_DEFINES_H

@ -9,17 +9,16 @@
#error "Include gex.h instead!"
#endif
#include "gex_internal.h"
#include "gex_defines.h"
/** Delete recursively all GEX callsign look-up table entries */
void gex_destroy_unit_lookup(GexClient *gex);
/** Get lookup entry for unit name */
struct gex_unit *gex_find_unit_by_callsign(GexClient *gex, uint8_t callsign);
GexUnit *gex_find_unit_by_callsign(GexClient *gex, uint8_t callsign);
/** Get lookup entry for unit name */
struct gex_unit *gex_find_unit_by_name(GexClient *gex, const char *name);
GexUnit *gex_find_unit_by_name(GexClient *gex, const char *name);
/** Get callsign for unit name */
uint8_t gex_find_callsign_by_name(GexClient *gex, const char *name);

@ -9,6 +9,13 @@
#include <stdbool.h>
#include "gex_client.h"
#ifndef MAX
#define MAX(a, b) ((a)>(b)?(a):(b))
#endif
#ifndef MIN
#define MIN(a, b) ((a)<(b)?(a):(b))
#endif
struct gex_client {
TinyFrame *tf; //!< TinyFrame instance
const char *acm_device; //!< Comport device name, might be used to reconnect (?)
@ -16,9 +23,9 @@ struct gex_client {
bool connected; //!< Flag that we're currently connected to the comport
// synchronous query "hacks"
bool sync_query_ok; //!< flag that the query response was received
GexMsg sync_query_response; //!< response message, copied here
uint8_t sync_query_buffer[TF_MAX_PAYLOAD_RX]; //!< buffer for the rx payload to be copied here
bool squery_ok; //!< flag that the query response was received
GexMsg squery_msg; //!< response message, copied here
uint8_t squery_buffer[TF_MAX_PAYLOAD_RX]; //!< buffer for the rx payload to be copied here
GexEventListener fallback_report_handler;

@ -17,9 +17,19 @@ enum TF_Types_ {
MSG_SUCCESS = 0x00, //!< Generic success response; used by default in all responses; payload is transaction-specific
MSG_PING = 0x01, //!< Ping request (or response), used to test connection
MSG_ERROR = 0x02, //!< Generic failure response (when a request fails to execute)
MSG_BULK_READ_OFFER = 0x03, //!< Offer of data to read. Payload: u32 total len
MSG_BULK_READ_POLL = 0x04, //!< Request to read a previously announced chunk.
MSG_BULK_WRITE_OFFER = 0x05, //!< Offer to receive data in a write transaction. Payload: u32 max size, u32 max chunk
MSG_BULK_DATA = 0x06, //!< Writing a chunk, or sending a chunk to master.
MSG_BULK_END = 0x07, //!< Bulk transfer is done, no more data to read or write.
//!< Recipient shall check total len and discard it on mismatch. There could be a checksum ...
MSG_BULK_ABORT = 0x08, //!< Discard the ongoing transfer
// Unit messages
MSG_UNIT_REQUEST = 0x10, //!< Command addressed to a particular unit
MSG_UNIT_REPORT = 0x11, //!< Spontaneous report from a unit
// System messages
MSG_LIST_UNITS = 0x20, //!< Get all unit call-signs and names
};

@ -4,11 +4,15 @@
#include <malloc.h>
#include <assert.h>
#include <utils/payload_parser.h>
#include <utils/payload_builder.h>
#define GEX_H // to allow including other headers
#include "gex_defines.h"
#include "gex_helpers.h"
#include "gex_message_types.h"
#include "gex_unit.h"
#include "gex_internal.h"
/**
* Low level query
@ -20,7 +24,11 @@
* @param lst - TF listener to handle the response, can be NULL
* @param userdata2 userdata2 argument for the TF listener's message
*/
static void GEX_LL_Query(GexUnit *unit, uint8_t cmd, const uint8_t *payload, uint32_t len, TF_Listener lst, void *userdata2)
static void GEX_LL_Query(GexUnit *unit,
uint8_t cmd,
const uint8_t *payload, uint32_t len,
GexSession session, bool is_reply,
TF_Listener lst, void *userdata2)
{
assert(unit != NULL);
assert(unit->gex != NULL);
@ -45,6 +53,8 @@ static void GEX_LL_Query(GexUnit *unit, uint8_t cmd, const uint8_t *payload, uin
msg.len = (TF_LEN) (len + 2);
msg.userdata = unit;
msg.userdata2 = userdata2;
msg.frame_id = session;
msg.is_response = is_reply; // This ensures the frame_id is not re-generated
TF_Query(gex->tf, &msg, lst, 0);
}
free(pld);
@ -56,7 +66,24 @@ void GEX_Send(GexUnit *unit, uint8_t cmd, const uint8_t *payload, uint32_t len)
assert(unit != NULL);
assert(unit->gex != NULL);
GEX_LL_Query(unit, cmd, payload, len, NULL, NULL);
GEX_LL_Query(unit, cmd,
payload, len,
0, false,
NULL, NULL);
}
/** Send with no listener, don't wait for response */
void GEX_SendEx(GexUnit *unit, uint8_t cmd,
const uint8_t *payload, uint32_t len,
GexSession session, bool is_reply)
{
assert(unit != NULL);
assert(unit->gex != NULL);
GEX_LL_Query(unit, cmd,
payload, len,
session, is_reply,
NULL, NULL);
}
/** listener for the synchronous query functionality */
@ -66,60 +93,82 @@ static TF_Result sync_query_lst(TinyFrame *tf, TF_Msg *msg)
assert(gex != NULL);
// clone the message
gex->sync_query_response.len = msg->len;
gex->sync_query_response.unit = msg->userdata;
gex->sync_query_response.type = msg->type;
gex->squery_msg.len = msg->len;
gex->squery_msg.unit = msg->userdata;
gex->squery_msg.type = msg->type;
gex->squery_msg.session = msg->frame_id;
// clone the buffer
if (msg->len > 0) memcpy(gex->sync_query_buffer, msg->data, msg->len);
if (msg->len > 0) memcpy(gex->squery_buffer, msg->data, msg->len);
// re-link the buffer
gex->sync_query_response.payload = gex->sync_query_buffer;
gex->sync_query_ok = true;
gex->squery_msg.payload = gex->squery_buffer;
gex->squery_ok = true;
return TF_CLOSE;
}
/** Static query */
GexMsg GEX_Query(GexUnit *unit, uint8_t cmd, const uint8_t *payload, uint32_t len)
static GexMsg GEX_QueryEx(GexUnit *unit, uint8_t cmd,
const uint8_t *payload, uint32_t len,
GexSession session, bool is_reply)
{
assert(unit != NULL);
assert(unit->gex != NULL);
GexClient *gex = unit->gex;
gex->sync_query_ok = false;
gex->squery_ok = false;
// Default response that will be used if nothing is received
gex->sync_query_response.unit = unit;
gex->sync_query_response.type = MSG_ERROR;
sprintf((char *) gex->sync_query_buffer, "TIMEOUT");
gex->sync_query_response.len = (uint32_t) strlen("TIMEOUT");
gex->sync_query_response.payload = gex->sync_query_buffer;
gex->squery_msg.unit = unit;
gex->squery_msg.type = MSG_ERROR;
gex->squery_msg.session = 0;
sprintf((char *) gex->squery_buffer, "TIMEOUT");
gex->squery_msg.len = (uint32_t) strlen("TIMEOUT");
gex->squery_msg.payload = gex->squery_buffer;
GEX_LL_Query(unit, cmd,
payload, len,
session, is_reply,
sync_query_lst, NULL);
GEX_LL_Query(unit, cmd, payload, len, sync_query_lst, NULL);
GEX_Poll(gex);
if (!gex->sync_query_ok) {
if (!gex->squery_ok) {
fprintf(stderr, "No response to query of unit %s!", unit->name);
}
return gex->sync_query_response;
return gex->squery_msg;
}
/** Static query */
GexMsg GEX_Query(GexUnit *unit, uint8_t cmd, const uint8_t *payload, uint32_t len)
{
assert(unit != NULL);
assert(unit->gex != NULL);
return GEX_QueryEx(unit, cmd, payload, len, 0, false);
}
/** listener for the synchronous query functionality */
static TF_Result async_query_lst(TinyFrame *tf, TF_Msg *msg)
{
(void)tf;
GexMsg gexMsg;
// clone the message
gexMsg.len = msg->len;
gexMsg.session = msg->frame_id;
gexMsg.unit = msg->userdata; // gex is accessible via the unit
gexMsg.unit = msg->userdata; // Unit passed to "userdata" in GEX_LL_Query
gexMsg.type = msg->type;
gexMsg.session = msg->frame_id;
gexMsg.payload = (uint8_t *) msg->data;
assert( gexMsg.unit != NULL);
GexEventListener lst = msg->userdata2;
assert(lst != NULL);
lst(gexMsg);
return TF_CLOSE;
}
/** Sync query, without poll */
@ -128,14 +177,129 @@ void GEX_QueryAsync(GexUnit *unit, uint8_t cmd, const uint8_t *payload, uint32_t
assert(unit != NULL);
assert(unit->gex != NULL);
GexClient *gex = unit->gex;
gex->sync_query_ok = false;
memset(&gex->sync_query_response, 0, sizeof(GexMsg));
// Async query does not poll, instead the user listener is attached to the message
GEX_LL_Query(unit, cmd,
payload, len,
0, false,
async_query_lst, lst);
}
// Default response that will be used if nothing is received
gex->sync_query_response.type = MSG_ERROR;
sprintf((char *) gex->sync_query_buffer, "TIMEOUT");
gex->sync_query_response.len = (uint32_t) strlen("TIMEOUT");
// ---------------------------- BULK ----------------------------
/**
* Bulk read from a unit, synchronous
*
* @param unit - the unit to target
* @param bulk - the bulk transport info struct
* @return actual number of bytes received.
*/
uint32_t GEX_BulkRead(GexUnit *unit, GexBulk *bulk)
{
// We ask for the transfer. This is unit specific and can contain information that determines the transferred data
GexMsg resp0 = GEX_Query(unit, bulk->req_cmd, bulk->req_data, bulk->req_len);
if (resp0.type == MSG_ERROR) {
fprintf(stderr, "Bulk read rejected! %.*s", resp0.len, (char*)resp0.payload);
return 0;
}
if (resp0.type != MSG_BULK_READ_OFFER) {
fprintf(stderr, "Bulk read failed, response not BULK_READ_OFFER!");
return 0;
}
// Check how much data is available
PayloadParser pp = pp_start(resp0.payload, resp0.len, NULL);
uint32_t total = pp_u32(&pp);
assert(pp.ok);
total = MIN(total, bulk->capacity); // clamp...
uint32_t at = 0;
while (at < total) {
GexMsg resp = GEX_QueryEx(unit, MSG_BULK_READ_POLL,
NULL, 0,
resp0.session, true);
if (resp.type == MSG_ERROR) {
fprintf(stderr, "Bulk read failed! %.*s", resp.len, (char *) resp.payload);
return 0;
}
if (resp.type == MSG_BULK_END) {
// No more data
return at;
}
if (resp.type == MSG_BULK_DATA) {
memcpy(bulk->buffer+at, resp.payload, resp.len);
at += resp.len;
} else {
fprintf(stderr, "Bulk read failed! Bad response type.");
return 0;
}
}
return at;
}
/**
* Bulk write to a unit, synchronous
*
* @param unit - the unit to target
* @param bulk - the bulk transport info struct
* @return true on success
*/
bool GEX_BulkWrite(GexUnit *unit, GexBulk *bulk)
{
// We ask for the transfer. This is unit specific
GexMsg resp0 = GEX_Query(unit, bulk->req_cmd, bulk->req_data, bulk->req_len);
if (resp0.type == MSG_ERROR) {
fprintf(stderr, "Bulk write rejected! %.*s", resp0.len, (char*)resp0.payload);
return false;
}
if (resp0.type != MSG_BULK_WRITE_OFFER) {
fprintf(stderr, "Bulk write failed, response not MSG_BULK_WRITE_OFFER!");
return false;
}
PayloadParser pp = pp_start(resp0.payload, resp0.len, NULL);
uint32_t max_size = pp_u32(&pp);
uint32_t max_chunk = pp_u32(&pp);
assert(pp.ok);
if (max_size < bulk->len) {
fprintf(stderr, "Write not possible, not enough space.");
// Inform GEX we're not going to do it
GEX_SendEx(unit, MSG_BULK_ABORT, NULL, 0, resp0.session, true);
return false;
}
uint32_t at = 0;
while (at < bulk->len) {
uint32_t chunk = MIN(max_chunk, bulk->len - at);
GexMsg resp = GEX_QueryEx(unit, MSG_BULK_DATA,
bulk->buffer+at, chunk,
resp0.session, true);
at += chunk;
if (resp.type == MSG_ERROR) {
fprintf(stderr, "Bulk write failed! %.*s", resp.len, (char *) resp.payload);
return false;
}
if (resp.type != MSG_SUCCESS) {
fprintf(stderr, "Bulk write failed! Bad response type.");
return false;
}
}
// Conclude the transfer
GEX_SendEx(unit, MSG_BULK_END, NULL, 0, resp0.session, true);
GEX_LL_Query(unit, cmd, payload, len, async_query_lst, lst);
return true;
}

@ -13,6 +13,8 @@
#include <stdint.h>
#include <stdbool.h>
// TODO proper descriptions
/** Send a command with no listener */
void GEX_Send(GexUnit *unit, uint8_t cmd, const uint8_t *payload, uint32_t len);
@ -28,36 +30,22 @@ GexMsg GEX_Query(GexUnit *unit, uint8_t cmd, const uint8_t *payload, uint32_t le
/** Asynchronous query with an async listener */
void GEX_QueryAsync(GexUnit *unit, uint8_t cmd, const uint8_t *payload, uint32_t len, GexEventListener lst);
/**
* Bulk read from a unit, synchronous
*
* @param unit - the unit to target
* @param cmd - initial request command
* @param payload - initial request payload
* @param len - initial request payload length
* @param buffer - destination buffer
* @param capacity - size of the buffer, max nr of bytes to receive
* @param bulk - the bulk transport info struct
* @return actual number of bytes received.
*/
uint32_t GEX_BulkRead(GexUnit *unit, uint8_t cmd,
const uint8_t *payload, uint32_t len,
uint8_t *buffer, uint32_t capacity);
uint32_t GEX_BulkRead(GexUnit *unit, GexBulk *bulk);
/**
* Bulk write to a unit, synchronous
*
* @param unit - the unit to target
* @param cmd - initial request command
* @param payload - initial request payload
* @param len - initial request payload length
* @param buffer - destination buffer
* @param capacity - size of the buffer, max nr of bytes to receive
* @param bulk - the bulk transport info struct
* @return true on success
*/
bool GEX_BulkWrite(GexUnit *unit, uint8_t cmd,
const uint8_t *payload, uint32_t len,
const uint8_t *buffer, uint32_t capacity);
bool GEX_BulkWrite(GexUnit *unit, GexBulk *bulk);
#endif //GEX_CLIENT_GEX_UNIT_H

@ -10,6 +10,8 @@ static GexClient *gex;
/** ^C handler to close it gracefully */
static void sigintHandler(int sig)
{
(void)sig;
if (gex != NULL) {
GEX_DeInit(gex);
}
@ -18,7 +20,7 @@ static void sigintHandler(int sig)
#define LED_CMD_TOGGLE 0x02
int main()
int main(void)
{
// Bind ^C handler for safe shutdown
signal(SIGINT, sigintHandler);

Loading…
Cancel
Save