CAN od zera #3: Jak zarządzać danymi CAN w RTOS? Porównanie mechanizmów i wybór najlepszego
Na rozgrzewkę:
Dlaczego w systemie CAN gubią się ramki, mimo że magistrala działa poprawnie?
To pytanie pojawia się bardzo często.
Analizator CAN pokazuje, że wszystkie ramki zostały wysłane. Transceiver działa poprawnie. CRC się zgadza. A mimo to mikrokontroler nie widzi części wiadomości.
Problem w większości przypadków nie leży w protokole CAN, lecz w:
– sposobie zarządzania pamięcią,
– komunikacji między ISR a zadaniami RTOS,
– nieoptymalnym przekazywaniu danych.
W tym artykule pokażę:
– dlaczego proste podejście ISR + statyczny bufor prowadzi do utraty ramek,
– jak wygląda profesjonalna implementacja w systemie RTOS,
– dlaczego Memory Pool + kolejki z przekazywaniem wskaźników to wzorzec stosowany w systemach automotive i przemysłowych.
Przykłady bazują na mikrokontrolerze STM32.
Schemat podłączenia:
Połączenie układów jest identyczne jak w poprzednim wpisie, czyli mamy dwa mikrokontrolery połączone ze sobą transceiverami protokołem CAN, co przedstawia poniższy schemat blokowy.
Metody zarządzania pamięcią we wbudowanych systemach operacyjnych:
Statyczny bufor + ISR + FIFO (najprostsza metoda)
To najprostszy model odbioru danych: przerwanie odbioru CAN zapisuje dane do statycznego bufora (np. tablica struktur), a następnie dane są pobierane w kolejności FIFO.
Przykład bufora:
static CANMessage_t rxBuffer[32];
Zarządzanie wymaga ręcznego obsłużenia indeksów, flag zajętości i kolejkowania.
Ta metoda jest:
deterministyczna (czas zawsze taki sam — brak dynamicznej alokacji),
bezpieczna pamięciowo (statycznie zaalokowane tablice),
mało skalowalna, ponieważ: (ISR wykonuje dużo logiki, przekazywanie danych między zadaniami nie jest izolowane, zmiana rozmiaru bufora wymaga rekompilacji)
Użycie: To dobra metoda na małe systemy i prosty ruch CAN.
2. Statyczny bufor + ISR + Semafor (usprawniona metoda)
To rozwinięcie metody nr 1.
ISR zapisuje ramkę do bufora (FIFO), ale zamiast aktywnie odpytywać bufor, zadanie budzi się dzięki semaforowi:
ISR → daje semafor
Zadanie → czeka na semafor (blokująco)
Zalety:
nadal deterministyczne i statyczne,
ISR jest krótsze (tylko zapis + semafor),
zadanie CAN nie pracuje gdy nie ma ramek,
dobre dla systemów z: (stałym rozmiarem ramek, niewielką liczbą typów wiadomości, umiarkowanym obciążeniem.)
Użycie: To najczęściej spotykana metoda w prostych systemach RTOS.
3. Dynamiczna alokacja (malloc/free)
ISR lub zadanie przydziela pamięć każdej ramce osobno:
CANMessage_t *msg = malloc(sizeof(CANMessage_t));
Zalety:
pełna elastyczność — każdy obiekt może mieć inny rozmiar.
Wady:
ryzyko fragmentacji, szczególnie po wielu godzinach/dniach,
konieczność ręcznego zwalniania pamięci,
potencjalne wycieki i zawieszenia systemu,
czas nie jest deterministyczny,
niedopuszczalne w systemach wymagających wysokiej niezawodności.
Użycie: Tę metodę stosuje się rzadko — głównie w systemach niekrytycznych lub eksperymentalnych.
4. Kolejki RTOS (Queue) – pełne struktury vs wskaźniki
Zadania mogą komunikować się poprzez:
a) Przesyłanie całych struktur
Łatwe, czytelne, ale:
– RTOS musi kopiować nawet duże struktury,
– powoduje duże obciążenie CPU,
– kolejki mają mały maksymalny rozmiar (kilkadziesiąt wiadomości).
b) Przesyłanie wskaźników w kolejce (zalecane)
Kolejka zawiera tylko pointer, np. do bufora statycznego lub z memory pool.
Zalety:
- minimalny czas ISR,
- mało kopiowania danych,
- skalowalne i wydajne.
Użycie: To najlepsza forma użycia kolejek RTOS w CAN.
5. Memory Pool (najlepsza metoda w RTOS) to wcześniej przygotowana pula identycznych bloków pamięci, z których system może szybko „wypożyczać” i „oddawać” elementy. Wyobraź sobie, że zamiast dynamicznie budować pamięć za każdym razem (malloc), przygotowujesz na starcie. Potem tylko: bierzesz jeden wolny blok, używasz go, oddajesz go do puli. Bez dzielenia sterty, bez szukania miejsca w pamięci, bez fragmentacji.
Prosta analogia:
Malloc to jak:
– Szukasz miejsca parkingowego w mieście.
– Czasem znajdziesz od razu.
– Czasem krążysz 10 minut.
Memory Pool to jak:
– Masz prywatny parking z 32 miejscami.
– Jeśli jest wolne miejsce — wjeżdżasz natychmiast.
– Jeśli nie ma — od razu wiesz, że parking jest pełny.
ISR odbiera ramkę i rezerwuje blok o stałym rozmiarze z memory pool:
ISR:
– pobiera blok z poola (stały czas)
– zapisuje ramkę do bloku
– wrzuca wskaźnik do kolejki
Zadanie:
– odbiera wskaźnik z kolejki
– dekoduje ramkę
– zwalnia blok do poola
Zalety:
- zero fragmentacji
- deterministyczne czasy alokacji i zwalniania
- idealne do stałych rozmiarów wiadomości CAN
- ISR bardzo szybkie
- zadanie dostaje gotowy wskaźnik — zero kopiowania
Użycie: To zdecydowanie najbardziej profesjonalna metoda w systemach pracujących latami.
Porównanie metod:
Przypadek 1 - Dlaczego gubimy ramki?
Założenia testu:
Mikrokontroler 1:
– wysyła watchdog ID 0x11 co 100 ms,
– wysyła ID 0x12 po naciśnięciu przycisku.
Mikrokontroler 2:
– używa statycznego bufora + ISR,
– przetwarza dane w pętli głównej.
Efekt:
Analizator CAN pokazuje 20 wysłanych ramek ID 0x12.
Mikrokontroler odbiera tylko 5.
Dlaczego?
Bo:
– ISR ustawia tylko flagę,
– przetwarzanie w pętli głównej trwa zbyt długo,
– nowe ramki nadpisują poprzednie dane.
Reasumując:
Magistrala działa poprawnie.
Zarządzanie pamięcią – nie.
Tak wygląda kod do obsługi przerwania:
void HAL_FDCAN_RxFifo0Callback(FDCAN_HandleTypeDef *hfdcan, uint32_t RxFifo0ITs)
{
if((RxFifo0ITs & FDCAN_IT_RX_FIFO0_NEW_MESSAGE) != RESET)
{
if(HAL_FDCAN_GetRxMessage(hfdcan, FDCAN_RX_FIFO0, &HBRxHeader, CAN_HBRxData)!= HAL_OK)
{
Error_Handler();
}
if (HAL_FDCAN_ActivateNotification(hfdcan, FDCAN_IT_RX_FIFO0_NEW_MESSAGE, 0))
{
Error_Handler();
}
if(HBRxHeader.Identifier==0x11)
{
datacheck=1;
}
if(HBRxHeader.Identifier==0x12)
{
datacheck=2;
}
}
}
Główna pętla:
while (1)
{
if(datacheck==1)
{
uint8_t data[] = "Received frame with ID 0x11\r\n";
HAL_UART_Transmit(&hlpuart1, data, sizeof(data), 500);
datacheck=0;
}
if(datacheck==2)
{
counter_0x12++;
uint8_t data[40];
sprintf(data, "Received frame quantity ID 0x12: %d\r\n",counter_0x12);
HAL_UART_Transmit(&hlpuart1, data, sizeof(data), 500);
datacheck=0;
}
}
Wysyłam z Mikrokontrolera 1 ramkę o ID 0x11 jako watchdog do Mikrokontrolera 2 co obserwujemy za pomocą konwertera USB CAN w programie USBCAN. Ramki wysyłane są co 100ms co widać na poniższym rysunku.
Po naciśnięciu przycisku wysyłane są ramki o ID 0x12, aby podsłuchiwać tylko te ramki wprowadziłem do programu USBCAN filtr na to ID.
Jak widać na powyższym rysunku rysunku Mikrokotroler 1 wysłał 20 ramek o ID 0x12, a Mikrokotroler 2 odebrał tylko 5 ramek o ID 0x12.
Przypadek 2 – RTOS + Memory Pool (rozwiązanie profesjonalne)
W tym wariancie w Mikrokontrolerze 2:
– używam dwóch filtrów (FIFO0 i FIFO1),
– tworzę dwie kolejki,
– tworzę dwa memory poole,
– każde FIFO obsługuje osobne zadanie.
ISR trwa bardzo krótko.
Zadania przetwarzają dane niezależnie.
Efekt testu:
20 wysłanych ramek ID 0x12.
20 odebranych ramek ID 0x12.
Reasumując:
Bez strat.
Poniżej umieściłem diagram pokazujący przepływ danych odbieranych po CAN:
W porównaniu do poprzednich wpisów w tym przypadku, użyłem dwóch filtrów:
Dalej włączyłem dwa przerwania od CAN:
Uruchomiłem RTOS i dodałem następujące dwa zadania:
Umieściłem następujące elementy w kodzie programu, handlery do kolejek i puli pamięci:
osMessageQueueId_t CANRxQueue;
osMessageQueueId_t CANWatchDogQueue;
osMemoryPoolId_t CAN_Rx_MemPool;
osMemoryPoolId_t CAN_WatchDog_MemPool;
Struktury do przechowywania odebranych ramek CAN:
typedef struct
{
FDCAN_RxHeaderTypeDef header;
uint8_t data[8];
} CANMessageRx_t;
typedef struct
{
FDCAN_RxHeaderTypeDef header;
uint8_t data[8];
} CANMessageWatchDog_t;
Funkcja, w której tworzone są kolejki:
void queues_creation(void)
{
CANRxQueue = osMessageQueueNew(CAN_RX_QUEUE_LENGTH, sizeof(CANMessageRx_t *), NULL);
if(CANRxQueue == NULL)
{
char *str = "Unable to create Rx queue\r\n";
HAL_UART_Transmit(&hlpuart1, (uint8_t *)str, strlen(str), HAL_MAX_DELAY);
}
else
{
char *str = "Rx Queue created\r\n";
HAL_UART_Transmit(&hlpuart1, (uint8_t *)str, strlen(str), HAL_MAX_DELAY);
}
CANWatchDogQueue = osMessageQueueNew(CAN_RX_QUEUE_LENGTH, sizeof(CANMessageWatchDog_t *), NULL);
if(CANWatchDogQueue == NULL)
{
char *str = "Unable to create WatchDog queue\r\n";
HAL_UART_Transmit(&hlpuart1, (uint8_t *)str, strlen(str), HAL_MAX_DELAY);
}
else
{
char *str = "WatchDog Queue created\r\n";
HAL_UART_Transmit(&hlpuart1, (uint8_t *)str, strlen(str), HAL_MAX_DELAY);
}
}
Funkcja, w której tworzone są banki pamięci:
void memory_pool_creation(void)
{
CAN_Rx_MemPool = osMemoryPoolNew(MEMPOOL_RX_OBJECTS, sizeof(CANMessageRx_t), NULL);
if (CAN_Rx_MemPool == NULL)
{
Error_Handler();
}
CAN_WatchDog_MemPool = osMemoryPoolNew(MEMPOOL_RX_OBJECTS, sizeof(CANMessageWatchDog_t), NULL);
if (CAN_WatchDog_MemPool == NULL)
{
Error_Handler();
}
}
Przerwanie, w którym odbierane są normalne ramki komunikacyjne:
void HAL_FDCAN_RxFifo0Callback(FDCAN_HandleTypeDef *hfdcan, uint32_t RxFifo0ITs)
{
CANMessageRx_t *RxMsg;
osStatus_t CAN_Rx_status;
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
// Allocate buffer from pool
RxMsg = osMemoryPoolAlloc(CAN_Rx_MemPool, 0);
if (RxMsg == NULL)
{
// Pool exhausted — handle error or drop frame
Error_Handler();
}
if((RxFifo0ITs & FDCAN_IT_RX_FIFO0_NEW_MESSAGE) != RESET)
{
// Read data to allocated buffer
if(HAL_FDCAN_GetRxMessage(hfdcan, FDCAN_RX_FIFO0, &RxMsg->header, RxMsg->data) == HAL_OK)
{
//Sending data from IT to queue
CAN_Rx_status = osMessageQueuePut(CANRxQueue, &RxMsg, 0U, 0U);
//Release mempool if problem occured during putting RxMsg buffer pointer to queue,
if(CAN_Rx_status != osOK)
{
osMemoryPoolFree(CAN_Rx_MemPool, RxMsg);
Error_Handler();
}
}
else
{
//Release mempool if problem occured during getting message from FIFO
osMemoryPoolFree(CAN_Rx_MemPool, RxMsg);
Error_Handler();
}
}
}
Przerwanie, w którym odbierane są ramki typu watchdog:
void HAL_FDCAN_RxFifo1Callback(FDCAN_HandleTypeDef *hfdcan, uint32_t RxFifo1ITs)
{
CANMessageRx_t *WatchDogMsg;
osStatus_t CAN_WatchDog_status;
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
// Allocate buffer from pool
WatchDogMsg = osMemoryPoolAlloc(CAN_WatchDog_MemPool, 0);
if (WatchDogMsg == NULL)
{
// Pool exhausted — handle error or drop frame
Error_Handler();
}
if((RxFifo1ITs & FDCAN_IT_RX_FIFO1_NEW_MESSAGE) != RESET)
{
if(HAL_FDCAN_GetRxMessage(hfdcan, FDCAN_RX_FIFO1, &WatchDogMsg->header, WatchDogMsg->data) == HAL_OK)
{
//Sending data from IT to queue
CAN_WatchDog_status = osMessageQueuePut(CANWatchDogQueue, &WatchDogMsg, 0U, 0U);
if(CAN_WatchDog_status != osOK)
{
//Release mempool if problem occured during getting message from FIFO
osMemoryPoolFree(CAN_WatchDog_MemPool, WatchDogMsg);
Error_Handler();
}
}
else
{
osMemoryPoolFree(CAN_WatchDog_MemPool, WatchDogMsg);
Error_Handler();
}
}
}
Funkcja odbioru normalnych ramek komunikacyjnych, która jest umieszczona w zadaniu:
void CAN_comm_receive(void)
{
CANMessageRx_t *RxMsg;
//Get data from memeory pool buffer by queue
if(osMessageQueueGet(CANRxQueue, &RxMsg, NULL, osWaitForever) == osOK)
{
switch(RxMsg->header.Identifier)
{
case 0x12:
{
datacheck=2;
}
break;
}
if(datacheck==2)
{
counter_0x12++;
uint8_t data[40];
sprintf(data, "Received frame quantity ID 0x12: %d\r\n",counter_0x12);
HAL_UART_Transmit(&hlpuart1, data, sizeof(data), 500);
datacheck=0;
}
osMemoryPoolFree(CAN_Rx_MemPool, RxMsg);
}
}
Funkcja odbioru ramek typu watchdog, która jest umieszczona w zadaniu:
void WatchDog(void)
{
CANMessageWatchDog_t *WatchDogMsg;
//Not wait only check that queue will be empty
while(osMessageQueueGet(CANWatchDogQueue, &WatchDogMsg, NULL, 0) == osOK)
{
switch (WatchDogMsg->header.Identifier)
{
case 0x11:
{
datacheck = 1;
}
break;
}
osMemoryPoolFree(CAN_WatchDog_MemPool, WatchDogMsg);
}
if(datacheck==1)
{
uint8_t data[] = "Received frame with ID 0x11\r\n";
HAL_UART_Transmit(&hlpuart1, data, sizeof(data), 100);
datacheck=0;
}
}
W funkcji main() umieściłem tylko start CAN, aktywację przerwań dla pierwszego i drugiego FIFO, nagłówek ramki nadawczej, funkcje tworzące kolejki oraz banki pamięci:
/* USER CODE BEGIN 2 */
HAL_FDCAN_Start(&hfdcan1);
HAL_FDCAN_ActivateNotification(&hfdcan1, FDCAN_IT_RX_FIFO0_NEW_MESSAGE, 0);
HAL_FDCAN_ActivateNotification(&hfdcan1, FDCAN_IT_RX_FIFO1_NEW_MESSAGE, 0);
//Header CAN1
TxHeader.Identifier = 0x13;
//11-bit Identifier
TxHeader.IdType = FDCAN_STANDARD_ID;
TxHeader.TxFrameType = FDCAN_DATA_FRAME;
//8-byte data payload
TxHeader.DataLength = FDCAN_DLC_BYTES_2;
TxHeader.ErrorStateIndicator = FDCAN_ESI_ACTIVE;
TxHeader.BitRateSwitch = FDCAN_BRS_OFF;
//No Bit Rate Switching (for CAN FD)
TxHeader.FDFormat = FDCAN_CLASSIC_CAN;
TxHeader.TxEventFifoControl = FDCAN_NO_TX_EVENTS;
TxHeader.MessageMarker = 0;
queues_creation();
memory_pool_creation();
/* USER CODE END 2 */
Dalej skonfigurowałem następujące filtry dla CAN:
/* USER CODE BEGIN FDCAN1_Init 2 */
FDCAN_FilterTypeDef fdcan1filterconfig1;
fdcan1filterconfig1.FilterConfig = FDCAN_FILTER_TO_RXFIFO0;
fdcan1filterconfig1.FilterID1 = 0x12;
fdcan1filterconfig1.FilterID2 = 0x14;
fdcan1filterconfig1.FilterIndex = 0;
fdcan1filterconfig1.FilterType = FDCAN_FILTER_DUAL;
fdcan1filterconfig1.IdType = FDCAN_STANDARD_ID;
if (HAL_FDCAN_ConfigFilter(&hfdcan1, &fdcan1filterconfig1) != HAL_OK)
{
Error_Handler();
}
FDCAN_FilterTypeDef fdcan1filterconfig2;
fdcan1filterconfig2.FilterConfig = FDCAN_FILTER_TO_RXFIFO1;
fdcan1filterconfig2.FilterID1 = 0x11;
fdcan1filterconfig2.FilterID2 = 0x15;
fdcan1filterconfig2.FilterIndex = 0;
fdcan1filterconfig2.FilterType = FDCAN_FILTER_DUAL;
fdcan1filterconfig2.IdType = FDCAN_STANDARD_ID;
if (HAL_FDCAN_ConfigFilter(&hfdcan1, &fdcan1filterconfig2) != HAL_OK)
{
Error_Handler();
}
/* USER CODE END FDCAN1_Init 2 */
Na końcu umieściłem w osobnych zadaniach funkcje odberające ramki CAN:
void StartCANIntReceive(void *argument)
{
/* USER CODE BEGIN StartCANIntReceive */
/* Infinite loop */
for(;;)
{
CAN_comm_receive();
osDelay(1);
}
/* USER CODE END StartCANIntReceive */
}
void StartCAN_WatchDog(void *argument)
{
/* USER CODE BEGIN StartCAN_WatchDog */
/* Infinite loop */
for(;;)
{
WatchDog();
osDelay(100);
}
/* USER CODE END StartCAN_WatchDog */
}
Po skonfigurowaniu Mikrokontrolera 2 zgodnie z powyższym opisem wykonałem taki sam test. Mikrokontroler 1 wysyła periodycznie ramki watchdog o ID 0x11, a po naciśnięciu przycisku wysyłane są ramki o ID 0x12. Tak samo jak w Przypadku 1 wprowadziłem do programu USBCAN filtr na ID 0x12.
Poniżej znajduje się terminal, który informuje o ramkach odebranych przez Mikrokontroler 2. Jak widać Mikrokontroler 2 odebrał wszystkie 20 ramek o ID 0x12.
Podsumowanie:
W tym artykule pokazałem, że problem utraty ramek CAN rzadko wynika z samej magistrali. Najczęściej jest to konsekwencja nieoptymalnego zarządzania pamięcią między ISR a zadaniami RTOS.
W systemach pracujących latami – jak automotive czy przemysł – deterministyczna alokacja pamięci i brak fragmentacji nie są opcją, lecz koniecznością.
Memory Pool + kolejki z przekazywaniem wskaźników to wzorzec projektowy, który pozwala:
– utrzymać minimalny czas ISR,
– uniknąć kopiowania danych,
– zagwarantować brak fragmentacji,
– skalować system bez utraty stabilności.













