From 7816d755636c0db74b8abb14f681a2c90da30eeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Fri, 12 Jan 2018 09:30:02 +0100 Subject: [PATCH] merged job queue with message queue --- USB/usbd_cdc_if.c | 8 +--- comm/messages.c | 3 ++ comm/messages.h | 1 - freertos.c | 43 ++++++--------------- platform/lock_jumper.c | 3 -- platform/plat_compat.h | 10 ++--- tasks/sched_queue.h | 11 +++++- tasks/task_msg.c | 85 ++++++++++++++++++++++++++++++++++++++---- tasks/task_msg.h | 16 ++++++-- tasks/task_sched.c | 62 ------------------------------ tasks/task_sched.h | 21 ----------- utils/stacksmon.c | 8 +--- 12 files changed, 123 insertions(+), 148 deletions(-) delete mode 100644 tasks/task_sched.c delete mode 100644 tasks/task_sched.h diff --git a/USB/usbd_cdc_if.c b/USB/usbd_cdc_if.c index 736aa62..f0457a9 100644 --- a/USB/usbd_cdc_if.c +++ b/USB/usbd_cdc_if.c @@ -266,19 +266,13 @@ static int8_t CDC_Control_FS (uint8_t cmd, uint8_t* pbuf, uint16_t length) */ static int8_t CDC_Receive_FS (uint8_t* Buf, uint32_t *Len) { - static struct rx_que_item rxitem; /* USER CODE BEGIN 6 */ // this does nothing?! // the buffer was already assigned in the init function and we got it as argument here?! // USBD_CDC_SetRxBuffer(&hUsbDeviceFS, &Buf[0]); - assert_param(*Len <= APP_RX_DATA_SIZE); USBD_CDC_ReceivePacket(&hUsbDeviceFS); - - // Post the data chunk on the RX queue to be handled asynchronously. - rxitem.len = *Len; - memcpy(rxitem.data, Buf, rxitem.len); - assert_param(pdPASS == xQueueSend(queRxDataHandle, &rxitem, 100)); + rxQuePostMsg(Buf, *Len); return (USBD_OK); /* USER CODE END 6 */ diff --git a/comm/messages.c b/comm/messages.c index dec78d6..fff2be4 100644 --- a/comm/messages.c +++ b/comm/messages.c @@ -110,6 +110,9 @@ static void settings_bulkwrite_cb(BulkWrite *bulk, const uint8_t *chunk, uint32_ return; } + PUTSN((const char *) chunk, len); + PUTS("\r\n---\r\n"); + ini_parse((const char *) chunk, len); } diff --git a/comm/messages.h b/comm/messages.h index 8e0dcdf..c75acf5 100644 --- a/comm/messages.h +++ b/comm/messages.h @@ -7,7 +7,6 @@ #include "platform.h" #include "sched_queue.h" -#include "task_sched.h" #include "TinyFrame.h" /** diff --git a/freertos.c b/freertos.c index 1b2e0a4..24116d4 100644 --- a/freertos.c +++ b/freertos.c @@ -62,21 +62,13 @@ osThreadId tskMainHandle; uint32_t mainTaskStack[ TSK_STACK_MAIN ]; osStaticThreadDef_t mainTaskControlBlock; -osThreadId tskMsgHandle; -uint32_t msgTaskStack[ TSK_STACK_MSG ]; -osStaticThreadDef_t msgTaskControlBlock; +osThreadId tskMsgJobHandle; +uint32_t msgJobQueTaskStack[ TSK_STACK_MSG ]; +osStaticThreadDef_t msgJobQueTaskControlBlock; -osThreadId tskJobRunnerHandle; -uint32_t jobRunnerStack[ TSK_STACK_JOBRUNNER ]; -osStaticThreadDef_t jobRunnerControlBlock; - -osMessageQId queSchedHandle; -uint8_t queSchedBuffer[ JOB_QUEUE_CAPACITY * sizeof( struct sched_que_item ) ]; -osStaticMessageQDef_t queSchedControlBlock; - -osMessageQId queRxDataHandle; -uint8_t queRxDataBuffer[ RX_QUE_CAPACITY * sizeof( struct rx_que_item ) ]; -osStaticMessageQDef_t queRxDataControlBlock; +osMessageQId queMsgJobHandle; +uint8_t msgJobQueBuffer[ RX_QUE_CAPACITY * sizeof( struct rx_sched_combined_que_item ) ]; +osStaticMessageQDef_t msgJobQueControlBlock; osMutexId mutTinyFrameTxHandle; osStaticMutexDef_t mutTinyFrameTxControlBlock; @@ -93,9 +85,7 @@ osStaticMutexDef_t mutScratchBufferControlBlock; /* Function prototypes -------------------------------------------------------*/ void TaskMain(void const * argument); -extern void TaskSchedLP(void const * argument); -extern void TaskJobQueue(void const *argument); -extern void TaskMessaging(void const * argument); +extern void TaskMsgJob(void const *argument); void MX_FREERTOS_Init(void); /* (MISRA C 2004 rule 8.1) */ @@ -136,8 +126,7 @@ void vApplicationGetIdleTaskMemory( StaticTask_t **ppxIdleTaskTCBBuffer, StackTy void MX_FREERTOS_Init(void) { /* USER CODE BEGIN Init */ stackmon_register("Main", mainTaskStack, sizeof(mainTaskStack)); - stackmon_register("JobRunner", jobRunnerStack, sizeof(jobRunnerStack)); - stackmon_register("Messaging", msgTaskStack, sizeof(msgTaskStack)); + stackmon_register("Job+Msg", msgJobQueTaskStack, sizeof(msgJobQueTaskStack)); /* USER CODE END Init */ /* Create the mutex(es) */ @@ -171,13 +160,9 @@ void MX_FREERTOS_Init(void) { osThreadStaticDef(tskMain, TaskMain, osPriorityHigh, 0, TSK_STACK_MAIN, mainTaskStack, &mainTaskControlBlock); tskMainHandle = osThreadCreate(osThread(tskMain), NULL); - /* definition and creation of tskJobRunner */ - osThreadStaticDef(tskJobRunner, TaskJobQueue, osPriorityAboveNormal, 0, TSK_STACK_JOBRUNNER, jobRunnerStack, &jobRunnerControlBlock); - tskJobRunnerHandle = osThreadCreate(osThread(tskJobRunner), NULL); - /* definition and creation of TaskMessaging */ - osThreadStaticDef(tskMsg, TaskMessaging, osPriorityNormal, 0, TSK_STACK_MSG, msgTaskStack, &msgTaskControlBlock); - tskMsgHandle = osThreadCreate(osThread(tskMsg), NULL); + osThreadStaticDef(tskMsg, TaskMsgJob, osPriorityNormal, 0, TSK_STACK_MSG, msgJobQueTaskStack, &msgJobQueTaskControlBlock); + tskMsgJobHandle = osThreadCreate(osThread(tskMsg), NULL); /* USER CODE BEGIN RTOS_THREADS */ /* add threads, ... */ @@ -185,13 +170,9 @@ void MX_FREERTOS_Init(void) { /* Create the queue(s) */ - /* definition and creation of queSchedHP */ - osMessageQStaticDef(queSchedHP, JOB_QUEUE_CAPACITY, struct sched_que_item, queSchedBuffer, &queSchedControlBlock); - queSchedHandle = osMessageCreate(osMessageQ(queSchedHP), NULL); - /* definition and creation of queRxData */ - osMessageQStaticDef(queRxData, RX_QUE_CAPACITY, struct rx_que_item, queRxDataBuffer, &queRxDataControlBlock); - queRxDataHandle = osMessageCreate(osMessageQ(queRxData), NULL); + osMessageQStaticDef(queMsgJob, RX_QUE_CAPACITY, struct rx_sched_combined_que_item, msgJobQueBuffer, &msgJobQueControlBlock); + queMsgJobHandle = osMessageCreate(osMessageQ(queMsgJob), NULL); /* USER CODE BEGIN RTOS_QUEUES */ /* add queues, ... */ diff --git a/platform/lock_jumper.c b/platform/lock_jumper.c index c59e8b4..e70a419 100644 --- a/platform/lock_jumper.c +++ b/platform/lock_jumper.c @@ -3,9 +3,6 @@ // #include "sched_queue.h" -#include "task_sched.h" -#include "usbd_core.h" -#include "usb_device.h" #include "framework/settings.h" #include "framework/resources.h" diff --git a/platform/plat_compat.h b/platform/plat_compat.h index e571af6..cd6345e 100644 --- a/platform/plat_compat.h +++ b/platform/plat_compat.h @@ -16,15 +16,14 @@ #endif #define TSK_STACK_MSG 220 // TF message handler task stack size (all unit commands run on this thread) -#define TSK_STACK_JOBRUNNER 80 // Job runner task stack size (for async execution of events caught in interrupt) #define BULK_READ_BUF_LEN 256 // Buffer for TF bulk reads #define UNIT_TMP_LEN 512 // Buffer for bulk read and varions internal unit operations #define FLASH_SAVE_BUF_LEN 128 // Static buffer for saving to flash -#define JOB_QUEUE_CAPACITY 4 // Job runner queue size (16 bytes each) -#define RX_QUE_CAPACITY 6 // TinyFrame rx queue size (64 bytes each) +#define MSG_QUE_SLOT_SIZE 64 // FIXME this should be possible to lower, but there's some bug with bulk transfer / INI parser +#define RX_QUE_CAPACITY 8 // TinyFrame rx queue size (64 bytes each) #define TF_MAX_PAYLOAD_RX 512 // TF max Rx payload #define TF_SENDBUF_LEN 64 // TF transmit buffer (can be less than a full frame) @@ -45,8 +44,9 @@ #define IWBUFFER_LEN 80 // Ini writer buffer for sprintf // -------- Timeouts ------------ -#define TF_PARSER_TIMEOUT_TICKS 100 // Timeout for receiving & parsing a frame -#define BULK_LST_TIMEOUT_MS 200 // timeout for the bulk transaction to expire +#define TF_PARSER_TIMEOUT_TICKS 300 // Timeout for receiving & parsing a frame +#define BULK_LST_TIMEOUT_MS 500 // timeout for the bulk transaction to expire +#define MSG_QUE_POST_TIMEOUT 100 // Time to post to the messages / jobs queue // -------- Platform specific includes and defines --------- diff --git a/tasks/sched_queue.h b/tasks/sched_queue.h index 3dab5d9..8317cf0 100644 --- a/tasks/sched_queue.h +++ b/tasks/sched_queue.h @@ -5,6 +5,7 @@ #ifndef GEX_SCHED_QUEUE_H #define GEX_SCHED_QUEUE_H +#include "platform.h" #include typedef struct sched_que_item Job; @@ -32,7 +33,15 @@ struct sched_que_item { // This que is used to stash frames received from TinyFrame for later evaluation on the application thread struct rx_que_item { uint32_t len; - uint8_t data[64]; + uint8_t data[MSG_QUE_SLOT_SIZE]; +}; + +struct rx_sched_combined_que_item { + bool is_job; + union { + struct rx_que_item msg; + struct sched_que_item job; + }; }; #endif //GEX_SCHED_QUEUE_H diff --git a/tasks/task_msg.c b/tasks/task_msg.c index f1fcc2d..4a51a98 100644 --- a/tasks/task_msg.c +++ b/tasks/task_msg.c @@ -2,12 +2,56 @@ // Created by MightyPork on 2017/12/22. // +#include #include "platform.h" #include "comm/messages.h" #include "task_msg.h" volatile uint32_t msgQueHighWaterMark = 0; +static void que_safe_post(struct rx_sched_combined_que_item *slot) +{ + uint32_t count = 0; + assert_param(slot != NULL); + + if (inIRQ()) { + BaseType_t xHigherPriorityTaskWoken = pdFALSE; + assert_param(pdPASS == xQueueSendFromISR(queMsgJobHandle, slot, &xHigherPriorityTaskWoken)); + portYIELD_FROM_ISR(xHigherPriorityTaskWoken); + + #if USE_STACK_MONITOR + count = (uint32_t) uxQueueMessagesWaitingFromISR(queMsgJobHandle); + #endif + } else { + assert_param(pdPASS == xQueueSend(queMsgJobHandle, slot, MSG_QUE_POST_TIMEOUT)); + + #if USE_STACK_MONITOR + count = (uint32_t) uxQueueMessagesWaiting(queMsgJobHandle); + #endif + } + + #if USE_STACK_MONITOR + msgQueHighWaterMark = MAX(msgQueHighWaterMark, count); + #endif +} + +/** + * Schedule a function for later execution in the jobs thread + * + * @param callback - the callback function + */ +void scheduleJob(Job *job) +{ + assert_param(job->cb != NULL); + + struct rx_sched_combined_que_item slot = { + .is_job = true, + .job = *job, // copy content of the struct + }; + + que_safe_post(&slot); +} + /** * Process data received from TinyFrame. * The queue holds received messages or parts of messages, @@ -17,22 +61,47 @@ volatile uint32_t msgQueHighWaterMark = 0; * TF functions (send, respond) can be called immediately without the need for an * intermediate queued job. */ -void TaskMessaging(const void * argument) +void TaskMsgJob(const void *argument) { - dbg("> Message queue task started!"); + dbg("> Job+Msg queue task started!"); - struct rx_que_item slot; + struct rx_sched_combined_que_item slot; while (1) { - xQueueReceive(queRxDataHandle, &slot, osWaitForever); - assert_param(slot.len>0 && slot.len<=64); // check the len is within bounds + xQueueReceive(queMsgJobHandle, &slot, osWaitForever); - // We need thr scratch buffer for many unit command handlers - TF_Accept(comm, slot.data, slot.len); + if (slot.is_job) { + assert_param(slot.job.cb != NULL); + slot.job.cb(&slot.job); + } + else { + assert_param(slot.msg.len > 0 && slot.msg.len <= MSG_QUE_SLOT_SIZE); // check the len is within bounds + TF_Accept(comm, slot.msg.data, slot.msg.len); + } #if USE_STACK_MONITOR uint32_t count; - count = (uint32_t) uxQueueMessagesWaiting(queRxDataHandle); // this seems to return N+1, hence we don't add the +1 for the one just removed. + count = (uint32_t) uxQueueMessagesWaiting(queMsgJobHandle); // this seems to return N+1, hence we don't add the +1 for the one just removed. msgQueHighWaterMark = MAX(msgQueHighWaterMark, count); #endif } } + +void rxQuePostMsg(uint8_t *buf, uint32_t len) +{ + assert_param(buf != NULL); + assert_param(len != 0); + + static struct rx_sched_combined_que_item slot; + + do { + // Post the data chunk on the RX queue to be handled asynchronously. + slot.is_job = false; + slot.msg.len = len > MSG_QUE_SLOT_SIZE ? MSG_QUE_SLOT_SIZE : len; + memcpy(slot.msg.data, buf, slot.msg.len); + + que_safe_post(&slot); + + len -= slot.msg.len; + buf += slot.msg.len; + } while (len > 0); +} diff --git a/tasks/task_msg.h b/tasks/task_msg.h index ef3ffa6..7084575 100644 --- a/tasks/task_msg.h +++ b/tasks/task_msg.h @@ -5,8 +5,18 @@ #ifndef GEX_F072_TASK_MSG_H #define GEX_F072_TASK_MSG_H -extern osMessageQId queRxDataHandle; -extern osThreadId tskMsgHandle; -void TaskMessaging(const void * argument); +#include "platform.h" +#include "sched_queue.h" + +extern osMessageQId queMsgJobHandle; +extern osThreadId tskMsgJobHandle; +void TaskMsgJob(const void *argument); +void scheduleJob(Job *job); + +#if USE_STACK_MONITOR +extern volatile uint32_t msgQueHighWaterMark; +#endif + +void rxQuePostMsg(uint8_t *buf, uint32_t len); #endif //GEX_F072_TASK_MSG_H diff --git a/tasks/task_sched.c b/tasks/task_sched.c deleted file mode 100644 index 274c6be..0000000 --- a/tasks/task_sched.c +++ /dev/null @@ -1,62 +0,0 @@ -// -// Created by MightyPork on 2017/11/21. -// - -#include "platform.h" -#include "task_sched.h" - -extern osMessageQId queSchedHandle; - -volatile uint32_t jobQueHighWaterMark = 0; - -/** - * Schedule a function for later execution in the jobs thread - * - * @param callback - the callback function - */ -void scheduleJob(Job *job) -{ - QueueHandle_t que = queSchedHandle; - - assert_param(que != NULL); - assert_param(job->cb != NULL); - - uint32_t count; - if (inIRQ()) { - BaseType_t xHigherPriorityTaskWoken = pdFALSE; - assert_param(pdPASS == xQueueSendFromISR(que, job, &xHigherPriorityTaskWoken)); - portYIELD_FROM_ISR(xHigherPriorityTaskWoken); - -#if USE_STACK_MONITOR - count = (uint32_t) uxQueueMessagesWaitingFromISR(que); -#endif - } else { - assert_param(pdPASS == xQueueSend(que, job, 100)); - -#if USE_STACK_MONITOR - count = (uint32_t) uxQueueMessagesWaiting(que); -#endif - } - -#if USE_STACK_MONITOR - jobQueHighWaterMark = MAX(jobQueHighWaterMark, count); -#endif -} - -/** - * job queue handler (for use in interrupts to do longer stuff on a thread) - * - * @param argument - */ -void TaskJobQueue(const void *argument) -{ - dbg("> High priority queue task started!"); - - struct sched_que_item job; - while (1) { - xQueueReceive(queSchedHandle, &job, osWaitForever); - - assert_param(job.cb != NULL); - job.cb(&job); - } -} diff --git a/tasks/task_sched.h b/tasks/task_sched.h deleted file mode 100644 index ed9c070..0000000 --- a/tasks/task_sched.h +++ /dev/null @@ -1,21 +0,0 @@ -// -// Created by MightyPork on 2017/11/21. -// - -#ifndef GEX_TASK_SCHED_H -#define GEX_TASK_SCHED_H - -#include "platform.h" -#include "sched_queue.h" - -#if USE_STACK_MONITOR -extern volatile uint32_t jobQueHighWaterMark; -extern volatile uint32_t msgQueHighWaterMark; -#endif - -extern osThreadId tskJobRunnerHandle; -void TaskJobQueue(const void *argument); - -void scheduleJob(Job *job); - -#endif //GEX_TASK_SCHED_H diff --git a/utils/stacksmon.c b/utils/stacksmon.c index 99e4e5a..56e4fed 100644 --- a/utils/stacksmon.c +++ b/utils/stacksmon.c @@ -2,7 +2,7 @@ // Created by MightyPork on 2017/12/04. // -#include +#include "task_msg.h" #include "platform.h" #include "stacksmon.h" @@ -64,11 +64,7 @@ void stackmon_dump(void) ); } - PUTS("\033[36m>> JOB QUEUE\033[m\r\n"); - PRINTF(" Used slots: \033[33m%"PRIu32"\033[m\r\n", - jobQueHighWaterMark); - - PUTS("\033[36m>> MSG QUEUE\033[m\r\n"); + PUTS("\033[36m>> MSG+JOB QUEUE\033[m\r\n"); PRINTF(" Used slots: \033[33m%"PRIu32"\033[m\r\n", msgQueHighWaterMark);