merged job queue with message queue

sipo
Ondřej Hruška 7 years ago
parent c58787c95d
commit 7816d75563
Signed by: MightyPork
GPG Key ID: 2C5FD5035250423D
  1. 8
      USB/usbd_cdc_if.c
  2. 3
      comm/messages.c
  3. 1
      comm/messages.h
  4. 43
      freertos.c
  5. 3
      platform/lock_jumper.c
  6. 10
      platform/plat_compat.h
  7. 11
      tasks/sched_queue.h
  8. 85
      tasks/task_msg.c
  9. 16
      tasks/task_msg.h
  10. 62
      tasks/task_sched.c
  11. 21
      tasks/task_sched.h
  12. 8
      utils/stacksmon.c

@ -266,19 +266,13 @@ static int8_t CDC_Control_FS (uint8_t cmd, uint8_t* pbuf, uint16_t length)
*/
static int8_t CDC_Receive_FS (uint8_t* Buf, uint32_t *Len)
{
static struct rx_que_item rxitem;
/* USER CODE BEGIN 6 */
// this does nothing?!
// the buffer was already assigned in the init function and we got it as argument here?!
// USBD_CDC_SetRxBuffer(&hUsbDeviceFS, &Buf[0]);
assert_param(*Len <= APP_RX_DATA_SIZE);
USBD_CDC_ReceivePacket(&hUsbDeviceFS);
// Post the data chunk on the RX queue to be handled asynchronously.
rxitem.len = *Len;
memcpy(rxitem.data, Buf, rxitem.len);
assert_param(pdPASS == xQueueSend(queRxDataHandle, &rxitem, 100));
rxQuePostMsg(Buf, *Len);
return (USBD_OK);
/* USER CODE END 6 */

@ -110,6 +110,9 @@ static void settings_bulkwrite_cb(BulkWrite *bulk, const uint8_t *chunk, uint32_
return;
}
PUTSN((const char *) chunk, len);
PUTS("\r\n---\r\n");
ini_parse((const char *) chunk, len);
}

@ -7,7 +7,6 @@
#include "platform.h"
#include "sched_queue.h"
#include "task_sched.h"
#include "TinyFrame.h"
/**

@ -62,21 +62,13 @@ osThreadId tskMainHandle;
uint32_t mainTaskStack[ TSK_STACK_MAIN ];
osStaticThreadDef_t mainTaskControlBlock;
osThreadId tskMsgHandle;
uint32_t msgTaskStack[ TSK_STACK_MSG ];
osStaticThreadDef_t msgTaskControlBlock;
osThreadId tskMsgJobHandle;
uint32_t msgJobQueTaskStack[ TSK_STACK_MSG ];
osStaticThreadDef_t msgJobQueTaskControlBlock;
osThreadId tskJobRunnerHandle;
uint32_t jobRunnerStack[ TSK_STACK_JOBRUNNER ];
osStaticThreadDef_t jobRunnerControlBlock;
osMessageQId queSchedHandle;
uint8_t queSchedBuffer[ JOB_QUEUE_CAPACITY * sizeof( struct sched_que_item ) ];
osStaticMessageQDef_t queSchedControlBlock;
osMessageQId queRxDataHandle;
uint8_t queRxDataBuffer[ RX_QUE_CAPACITY * sizeof( struct rx_que_item ) ];
osStaticMessageQDef_t queRxDataControlBlock;
osMessageQId queMsgJobHandle;
uint8_t msgJobQueBuffer[ RX_QUE_CAPACITY * sizeof( struct rx_sched_combined_que_item ) ];
osStaticMessageQDef_t msgJobQueControlBlock;
osMutexId mutTinyFrameTxHandle;
osStaticMutexDef_t mutTinyFrameTxControlBlock;
@ -93,9 +85,7 @@ osStaticMutexDef_t mutScratchBufferControlBlock;
/* Function prototypes -------------------------------------------------------*/
void TaskMain(void const * argument);
extern void TaskSchedLP(void const * argument);
extern void TaskJobQueue(void const *argument);
extern void TaskMessaging(void const * argument);
extern void TaskMsgJob(void const *argument);
void MX_FREERTOS_Init(void); /* (MISRA C 2004 rule 8.1) */
@ -136,8 +126,7 @@ void vApplicationGetIdleTaskMemory( StaticTask_t **ppxIdleTaskTCBBuffer, StackTy
void MX_FREERTOS_Init(void) {
/* USER CODE BEGIN Init */
stackmon_register("Main", mainTaskStack, sizeof(mainTaskStack));
stackmon_register("JobRunner", jobRunnerStack, sizeof(jobRunnerStack));
stackmon_register("Messaging", msgTaskStack, sizeof(msgTaskStack));
stackmon_register("Job+Msg", msgJobQueTaskStack, sizeof(msgJobQueTaskStack));
/* USER CODE END Init */
/* Create the mutex(es) */
@ -171,13 +160,9 @@ void MX_FREERTOS_Init(void) {
osThreadStaticDef(tskMain, TaskMain, osPriorityHigh, 0, TSK_STACK_MAIN, mainTaskStack, &mainTaskControlBlock);
tskMainHandle = osThreadCreate(osThread(tskMain), NULL);
/* definition and creation of tskJobRunner */
osThreadStaticDef(tskJobRunner, TaskJobQueue, osPriorityAboveNormal, 0, TSK_STACK_JOBRUNNER, jobRunnerStack, &jobRunnerControlBlock);
tskJobRunnerHandle = osThreadCreate(osThread(tskJobRunner), NULL);
/* definition and creation of TaskMessaging */
osThreadStaticDef(tskMsg, TaskMessaging, osPriorityNormal, 0, TSK_STACK_MSG, msgTaskStack, &msgTaskControlBlock);
tskMsgHandle = osThreadCreate(osThread(tskMsg), NULL);
osThreadStaticDef(tskMsg, TaskMsgJob, osPriorityNormal, 0, TSK_STACK_MSG, msgJobQueTaskStack, &msgJobQueTaskControlBlock);
tskMsgJobHandle = osThreadCreate(osThread(tskMsg), NULL);
/* USER CODE BEGIN RTOS_THREADS */
/* add threads, ... */
@ -185,13 +170,9 @@ void MX_FREERTOS_Init(void) {
/* Create the queue(s) */
/* definition and creation of queSchedHP */
osMessageQStaticDef(queSchedHP, JOB_QUEUE_CAPACITY, struct sched_que_item, queSchedBuffer, &queSchedControlBlock);
queSchedHandle = osMessageCreate(osMessageQ(queSchedHP), NULL);
/* definition and creation of queRxData */
osMessageQStaticDef(queRxData, RX_QUE_CAPACITY, struct rx_que_item, queRxDataBuffer, &queRxDataControlBlock);
queRxDataHandle = osMessageCreate(osMessageQ(queRxData), NULL);
osMessageQStaticDef(queMsgJob, RX_QUE_CAPACITY, struct rx_sched_combined_que_item, msgJobQueBuffer, &msgJobQueControlBlock);
queMsgJobHandle = osMessageCreate(osMessageQ(queMsgJob), NULL);
/* USER CODE BEGIN RTOS_QUEUES */
/* add queues, ... */

@ -3,9 +3,6 @@
//
#include "sched_queue.h"
#include "task_sched.h"
#include "usbd_core.h"
#include "usb_device.h"
#include "framework/settings.h"
#include "framework/resources.h"

@ -16,15 +16,14 @@
#endif
#define TSK_STACK_MSG 220 // TF message handler task stack size (all unit commands run on this thread)
#define TSK_STACK_JOBRUNNER 80 // Job runner task stack size (for async execution of events caught in interrupt)
#define BULK_READ_BUF_LEN 256 // Buffer for TF bulk reads
#define UNIT_TMP_LEN 512 // Buffer for bulk read and varions internal unit operations
#define FLASH_SAVE_BUF_LEN 128 // Static buffer for saving to flash
#define JOB_QUEUE_CAPACITY 4 // Job runner queue size (16 bytes each)
#define RX_QUE_CAPACITY 6 // TinyFrame rx queue size (64 bytes each)
#define MSG_QUE_SLOT_SIZE 64 // FIXME this should be possible to lower, but there's some bug with bulk transfer / INI parser
#define RX_QUE_CAPACITY 8 // TinyFrame rx queue size (64 bytes each)
#define TF_MAX_PAYLOAD_RX 512 // TF max Rx payload
#define TF_SENDBUF_LEN 64 // TF transmit buffer (can be less than a full frame)
@ -45,8 +44,9 @@
#define IWBUFFER_LEN 80 // Ini writer buffer for sprintf
// -------- Timeouts ------------
#define TF_PARSER_TIMEOUT_TICKS 100 // Timeout for receiving & parsing a frame
#define BULK_LST_TIMEOUT_MS 200 // timeout for the bulk transaction to expire
#define TF_PARSER_TIMEOUT_TICKS 300 // Timeout for receiving & parsing a frame
#define BULK_LST_TIMEOUT_MS 500 // timeout for the bulk transaction to expire
#define MSG_QUE_POST_TIMEOUT 100 // Time to post to the messages / jobs queue
// -------- Platform specific includes and defines ---------

@ -5,6 +5,7 @@
#ifndef GEX_SCHED_QUEUE_H
#define GEX_SCHED_QUEUE_H
#include "platform.h"
#include <TinyFrame.h>
typedef struct sched_que_item Job;
@ -32,7 +33,15 @@ struct sched_que_item {
// This que is used to stash frames received from TinyFrame for later evaluation on the application thread
struct rx_que_item {
uint32_t len;
uint8_t data[64];
uint8_t data[MSG_QUE_SLOT_SIZE];
};
struct rx_sched_combined_que_item {
bool is_job;
union {
struct rx_que_item msg;
struct sched_que_item job;
};
};
#endif //GEX_SCHED_QUEUE_H

@ -2,12 +2,56 @@
// Created by MightyPork on 2017/12/22.
//
#include <utils/hexdump.h>
#include "platform.h"
#include "comm/messages.h"
#include "task_msg.h"
volatile uint32_t msgQueHighWaterMark = 0;
static void que_safe_post(struct rx_sched_combined_que_item *slot)
{
uint32_t count = 0;
assert_param(slot != NULL);
if (inIRQ()) {
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
assert_param(pdPASS == xQueueSendFromISR(queMsgJobHandle, slot, &xHigherPriorityTaskWoken));
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
#if USE_STACK_MONITOR
count = (uint32_t) uxQueueMessagesWaitingFromISR(queMsgJobHandle);
#endif
} else {
assert_param(pdPASS == xQueueSend(queMsgJobHandle, slot, MSG_QUE_POST_TIMEOUT));
#if USE_STACK_MONITOR
count = (uint32_t) uxQueueMessagesWaiting(queMsgJobHandle);
#endif
}
#if USE_STACK_MONITOR
msgQueHighWaterMark = MAX(msgQueHighWaterMark, count);
#endif
}
/**
* Schedule a function for later execution in the jobs thread
*
* @param callback - the callback function
*/
void scheduleJob(Job *job)
{
assert_param(job->cb != NULL);
struct rx_sched_combined_que_item slot = {
.is_job = true,
.job = *job, // copy content of the struct
};
que_safe_post(&slot);
}
/**
* Process data received from TinyFrame.
* The queue holds received messages or parts of messages,
@ -17,22 +61,47 @@ volatile uint32_t msgQueHighWaterMark = 0;
* TF functions (send, respond) can be called immediately without the need for an
* intermediate queued job.
*/
void TaskMessaging(const void * argument)
void TaskMsgJob(const void *argument)
{
dbg("> Message queue task started!");
dbg("> Job+Msg queue task started!");
struct rx_que_item slot;
struct rx_sched_combined_que_item slot;
while (1) {
xQueueReceive(queRxDataHandle, &slot, osWaitForever);
assert_param(slot.len>0 && slot.len<=64); // check the len is within bounds
xQueueReceive(queMsgJobHandle, &slot, osWaitForever);
// We need thr scratch buffer for many unit command handlers
TF_Accept(comm, slot.data, slot.len);
if (slot.is_job) {
assert_param(slot.job.cb != NULL);
slot.job.cb(&slot.job);
}
else {
assert_param(slot.msg.len > 0 && slot.msg.len <= MSG_QUE_SLOT_SIZE); // check the len is within bounds
TF_Accept(comm, slot.msg.data, slot.msg.len);
}
#if USE_STACK_MONITOR
uint32_t count;
count = (uint32_t) uxQueueMessagesWaiting(queRxDataHandle); // this seems to return N+1, hence we don't add the +1 for the one just removed.
count = (uint32_t) uxQueueMessagesWaiting(queMsgJobHandle); // this seems to return N+1, hence we don't add the +1 for the one just removed.
msgQueHighWaterMark = MAX(msgQueHighWaterMark, count);
#endif
}
}
void rxQuePostMsg(uint8_t *buf, uint32_t len)
{
assert_param(buf != NULL);
assert_param(len != 0);
static struct rx_sched_combined_que_item slot;
do {
// Post the data chunk on the RX queue to be handled asynchronously.
slot.is_job = false;
slot.msg.len = len > MSG_QUE_SLOT_SIZE ? MSG_QUE_SLOT_SIZE : len;
memcpy(slot.msg.data, buf, slot.msg.len);
que_safe_post(&slot);
len -= slot.msg.len;
buf += slot.msg.len;
} while (len > 0);
}

@ -5,8 +5,18 @@
#ifndef GEX_F072_TASK_MSG_H
#define GEX_F072_TASK_MSG_H
extern osMessageQId queRxDataHandle;
extern osThreadId tskMsgHandle;
void TaskMessaging(const void * argument);
#include "platform.h"
#include "sched_queue.h"
extern osMessageQId queMsgJobHandle;
extern osThreadId tskMsgJobHandle;
void TaskMsgJob(const void *argument);
void scheduleJob(Job *job);
#if USE_STACK_MONITOR
extern volatile uint32_t msgQueHighWaterMark;
#endif
void rxQuePostMsg(uint8_t *buf, uint32_t len);
#endif //GEX_F072_TASK_MSG_H

@ -1,62 +0,0 @@
//
// Created by MightyPork on 2017/11/21.
//
#include "platform.h"
#include "task_sched.h"
extern osMessageQId queSchedHandle;
volatile uint32_t jobQueHighWaterMark = 0;
/**
* Schedule a function for later execution in the jobs thread
*
* @param callback - the callback function
*/
void scheduleJob(Job *job)
{
QueueHandle_t que = queSchedHandle;
assert_param(que != NULL);
assert_param(job->cb != NULL);
uint32_t count;
if (inIRQ()) {
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
assert_param(pdPASS == xQueueSendFromISR(que, job, &xHigherPriorityTaskWoken));
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
#if USE_STACK_MONITOR
count = (uint32_t) uxQueueMessagesWaitingFromISR(que);
#endif
} else {
assert_param(pdPASS == xQueueSend(que, job, 100));
#if USE_STACK_MONITOR
count = (uint32_t) uxQueueMessagesWaiting(que);
#endif
}
#if USE_STACK_MONITOR
jobQueHighWaterMark = MAX(jobQueHighWaterMark, count);
#endif
}
/**
* job queue handler (for use in interrupts to do longer stuff on a thread)
*
* @param argument
*/
void TaskJobQueue(const void *argument)
{
dbg("> High priority queue task started!");
struct sched_que_item job;
while (1) {
xQueueReceive(queSchedHandle, &job, osWaitForever);
assert_param(job.cb != NULL);
job.cb(&job);
}
}

@ -1,21 +0,0 @@
//
// Created by MightyPork on 2017/11/21.
//
#ifndef GEX_TASK_SCHED_H
#define GEX_TASK_SCHED_H
#include "platform.h"
#include "sched_queue.h"
#if USE_STACK_MONITOR
extern volatile uint32_t jobQueHighWaterMark;
extern volatile uint32_t msgQueHighWaterMark;
#endif
extern osThreadId tskJobRunnerHandle;
void TaskJobQueue(const void *argument);
void scheduleJob(Job *job);
#endif //GEX_TASK_SCHED_H

@ -2,7 +2,7 @@
// Created by MightyPork on 2017/12/04.
//
#include <task_sched.h>
#include "task_msg.h"
#include "platform.h"
#include "stacksmon.h"
@ -64,11 +64,7 @@ void stackmon_dump(void)
);
}
PUTS("\033[36m>> JOB QUEUE\033[m\r\n");
PRINTF(" Used slots: \033[33m%"PRIu32"\033[m\r\n",
jobQueHighWaterMark);
PUTS("\033[36m>> MSG QUEUE\033[m\r\n");
PUTS("\033[36m>> MSG+JOB QUEUE\033[m\r\n");
PRINTF(" Used slots: \033[33m%"PRIu32"\033[m\r\n",
msgQueHighWaterMark);

Loading…
Cancel
Save