CAN od zera #3: Jak zarządzać danymi CAN w RTOS? Porównanie mechanizmów i wybór najlepszego

Na rozgrzewkę:

Dlaczego w systemie CAN gubią się ramki, mimo że magistrala działa poprawnie?
To pytanie pojawia się bardzo często.

Analizator CAN pokazuje, że wszystkie ramki zostały wysłane. Transceiver działa poprawnie. CRC się zgadza. A mimo to mikrokontroler nie widzi części wiadomości.

Problem w większości przypadków nie leży w protokole CAN, lecz w:
– sposobie zarządzania pamięcią,
– komunikacji między ISR a zadaniami RTOS,
– nieoptymalnym przekazywaniu danych.

W tym artykule pokażę:
– dlaczego proste podejście ISR + statyczny bufor prowadzi do utraty ramek,
– jak wygląda profesjonalna implementacja w systemie RTOS,
– dlaczego Memory Pool + kolejki z przekazywaniem wskaźników to wzorzec stosowany w systemach automotive i przemysłowych.

Przykłady bazują na mikrokontrolerze STM32.

Schemat podłączenia:

Połączenie układów jest identyczne jak w poprzednim wpisie, czyli mamy dwa mikrokontrolery połączone ze sobą transceiverami protokołem CAN, co przedstawia poniższy schemat blokowy.

Metody zarządzania pamięcią we wbudowanych systemach operacyjnych:

  1. Statyczny bufor + ISR + FIFO (najprostsza metoda)

    To najprostszy model odbioru danych: przerwanie odbioru CAN zapisuje dane do statycznego bufora (np. tablica struktur), a następnie dane są pobierane w kolejności FIFO.
    Przykład bufora:

static CANMessage_t rxBuffer[32];

Zarządzanie wymaga ręcznego obsłużenia indeksów, flag zajętości i kolejkowania.
Ta metoda jest:

  • deterministyczna (czas zawsze taki sam — brak dynamicznej alokacji),

  • bezpieczna pamięciowo (statycznie zaalokowane tablice),

  • mało skalowalna, ponieważ: (ISR wykonuje dużo logiki, przekazywanie danych między zadaniami nie jest izolowane, zmiana rozmiaru bufora wymaga rekompilacji)

Użycie: To dobra metoda na małe systemy i prosty ruch CAN.

2. Statyczny bufor + ISR + Semafor (usprawniona metoda)

To rozwinięcie metody nr 1.
ISR zapisuje ramkę do bufora (FIFO), ale zamiast aktywnie odpytywać bufor, zadanie budzi się dzięki semaforowi:

  • ISR → daje semafor

  • Zadanie → czeka na semafor (blokująco)

Zalety:

  • nadal deterministyczne i statyczne,

  • ISR jest krótsze (tylko zapis + semafor),

  • zadanie CAN nie pracuje gdy nie ma ramek,

  • dobre dla systemów z: (stałym rozmiarem ramek, niewielką liczbą typów wiadomości, umiarkowanym obciążeniem.)

Użycie: To najczęściej spotykana metoda w prostych systemach RTOS.

3. Dynamiczna alokacja (malloc/free)
ISR lub zadanie przydziela pamięć każdej ramce osobno:

CANMessage_t *msg = malloc(sizeof(CANMessage_t));

Zalety:

  • pełna elastyczność — każdy obiekt może mieć inny rozmiar.

Wady:

  • ryzyko fragmentacji, szczególnie po wielu godzinach/dniach,

  • konieczność ręcznego zwalniania pamięci,

  • potencjalne wycieki i zawieszenia systemu,

  • czas nie jest deterministyczny,

  • niedopuszczalne w systemach wymagających wysokiej niezawodności.

Użycie: Tę metodę stosuje się rzadko — głównie w systemach niekrytycznych lub eksperymentalnych.

4. Kolejki RTOS (Queue) – pełne struktury vs wskaźniki

Zadania mogą komunikować się poprzez:

a) Przesyłanie całych struktur
Łatwe, czytelne, ale:
– RTOS musi kopiować nawet duże struktury,
– powoduje duże obciążenie CPU,
– kolejki mają mały maksymalny rozmiar (kilkadziesiąt wiadomości).

b) Przesyłanie wskaźników w kolejce (zalecane)
Kolejka zawiera tylko pointer, np. do bufora statycznego lub z memory pool.
Zalety:

  • minimalny czas ISR,
  • mało kopiowania danych,
  • skalowalne i wydajne.

Użycie: To najlepsza forma użycia kolejek RTOS w CAN.

5. Memory Pool (najlepsza metoda w RTOS) to wcześniej przygotowana pula identycznych bloków pamięci, z których system może szybko „wypożyczać” i „oddawać” elementy. Wyobraź sobie, że zamiast dynamicznie budować pamięć za każdym razem (malloc), przygotowujesz na starcie. Potem tylko: bierzesz jeden wolny blok, używasz go, oddajesz go do puli. Bez dzielenia sterty, bez szukania miejsca w pamięci, bez fragmentacji.
Prosta analogia:
Malloc to jak:
– Szukasz miejsca parkingowego w mieście.
– Czasem znajdziesz od razu.
– Czasem krążysz 10 minut.

Memory Pool to jak:
– Masz prywatny parking z 32 miejscami.
– Jeśli jest wolne miejsce — wjeżdżasz natychmiast.
– Jeśli nie ma — od razu wiesz, że parking jest pełny.

ISR odbiera ramkę i rezerwuje blok o stałym rozmiarze z memory pool:

ISR:
– pobiera blok z poola (stały czas)
– zapisuje ramkę do bloku
– wrzuca wskaźnik do kolejki

Zadanie:
– odbiera wskaźnik z kolejki
– dekoduje ramkę
– zwalnia blok do poola

Zalety:

  • zero fragmentacji
  • deterministyczne czasy alokacji i zwalniania
  • idealne do stałych rozmiarów wiadomości CAN
  • ISR bardzo szybkie
  • zadanie dostaje gotowy wskaźnik — zero kopiowania

Użycie: To zdecydowanie najbardziej profesjonalna metoda w systemach pracujących latami.

Porównanie metod:

Przypadek 1 - Dlaczego gubimy ramki?

Założenia testu:

Mikrokontroler 1:
– wysyła watchdog ID 0x11 co 100 ms,
– wysyła ID 0x12 po naciśnięciu przycisku.

Mikrokontroler 2:
– używa statycznego bufora + ISR,
– przetwarza dane w pętli głównej.

Efekt:
Analizator CAN pokazuje 20 wysłanych ramek ID 0x12.
Mikrokontroler odbiera tylko 5.

Dlaczego?
Bo:
– ISR ustawia tylko flagę,
– przetwarzanie w pętli głównej trwa zbyt długo,
– nowe ramki nadpisują poprzednie dane.

Reasumując:
Magistrala działa poprawnie.
Zarządzanie pamięcią – nie.

Tak wygląda kod do obsługi przerwania:


void HAL_FDCAN_RxFifo0Callback(FDCAN_HandleTypeDef *hfdcan, uint32_t RxFifo0ITs)
{
	if((RxFifo0ITs & FDCAN_IT_RX_FIFO0_NEW_MESSAGE) != RESET)
	{
		if(HAL_FDCAN_GetRxMessage(hfdcan, FDCAN_RX_FIFO0, &HBRxHeader, CAN_HBRxData)!= HAL_OK)
		{
			Error_Handler();
		}

		if (HAL_FDCAN_ActivateNotification(hfdcan, FDCAN_IT_RX_FIFO0_NEW_MESSAGE, 0))
		{
			Error_Handler();
		}

		if(HBRxHeader.Identifier==0x11)
		{
			datacheck=1;
		}

		if(HBRxHeader.Identifier==0x12)
		{
			datacheck=2;
		}
	}
}

Główna pętla:


while (1)
{
    if(datacheck==1)
	{
	    uint8_t data[] = "Received frame with ID 0x11\r\n";
        HAL_UART_Transmit(&hlpuart1, data, sizeof(data), 500);
        datacheck=0;
    }

    if(datacheck==2)
	{
        counter_0x12++;
        uint8_t data[40];
        sprintf(data, "Received frame quantity ID 0x12: %d\r\n",counter_0x12);
        HAL_UART_Transmit(&hlpuart1, data, sizeof(data), 500);
        datacheck=0;
    }
}

Wysyłam z Mikrokontrolera 1 ramkę o ID 0x11 jako watchdog do Mikrokontrolera 2 co obserwujemy za pomocą konwertera USB CAN w programie USBCAN. Ramki wysyłane są co 100ms co widać na poniższym rysunku.

Po naciśnięciu przycisku wysyłane są ramki o ID 0x12, aby podsłuchiwać tylko te ramki wprowadziłem do programu USBCAN filtr na to ID.

Jak widać na powyższym rysunku rysunku Mikrokotroler 1 wysłał 20 ramek o ID 0x12, a Mikrokotroler 2 odebrał tylko 5 ramek o ID 0x12.  

Przypadek 2 – RTOS + Memory Pool (rozwiązanie profesjonalne)

W tym wariancie w Mikrokontrolerze 2:
– używam dwóch filtrów (FIFO0 i FIFO1),
– tworzę dwie kolejki,
– tworzę dwa memory poole,
– każde FIFO obsługuje osobne zadanie.

ISR trwa bardzo krótko.
Zadania przetwarzają dane niezależnie.

Efekt testu:
20 wysłanych ramek ID 0x12.
20 odebranych ramek ID 0x12.

Reasumując:
Bez strat.


Poniżej umieściłem diagram pokazujący przepływ danych odbieranych po CAN:

W porównaniu do poprzednich wpisów w tym przypadku, użyłem dwóch filtrów:

Dalej włączyłem dwa przerwania od CAN:

Uruchomiłem RTOS i dodałem następujące dwa zadania:

Umieściłem następujące elementy w kodzie programu, handlery do kolejek i puli pamięci:


osMessageQueueId_t CANRxQueue;
osMessageQueueId_t CANWatchDogQueue;
osMemoryPoolId_t CAN_Rx_MemPool;
osMemoryPoolId_t CAN_WatchDog_MemPool;

Struktury do przechowywania odebranych ramek CAN:


typedef struct 
{
    FDCAN_RxHeaderTypeDef header;
	uint8_t data[8];
} CANMessageRx_t;

typedef struct 
{
    FDCAN_RxHeaderTypeDef header;
	uint8_t data[8];
} CANMessageWatchDog_t;

Funkcja, w której tworzone są kolejki:


void queues_creation(void)
{
    CANRxQueue = osMessageQueueNew(CAN_RX_QUEUE_LENGTH, sizeof(CANMessageRx_t *), NULL);
    if(CANRxQueue == NULL)
    {
        char *str = "Unable to create Rx queue\r\n";
        HAL_UART_Transmit(&hlpuart1, (uint8_t *)str, strlen(str), HAL_MAX_DELAY);
    }
    else
    {
        char *str = "Rx Queue created\r\n";
        HAL_UART_Transmit(&hlpuart1, (uint8_t *)str, strlen(str), HAL_MAX_DELAY);
    }

    CANWatchDogQueue = osMessageQueueNew(CAN_RX_QUEUE_LENGTH, sizeof(CANMessageWatchDog_t *), NULL);
    if(CANWatchDogQueue == NULL)
    {
        char *str = "Unable to create WatchDog queue\r\n";
        HAL_UART_Transmit(&hlpuart1, (uint8_t *)str, strlen(str), HAL_MAX_DELAY);
    }
    else
    {
        char *str = "WatchDog Queue created\r\n";
        HAL_UART_Transmit(&hlpuart1, (uint8_t *)str, strlen(str), HAL_MAX_DELAY);
    }
}

Funkcja, w której tworzone są banki pamięci:


void memory_pool_creation(void)
{
    CAN_Rx_MemPool = osMemoryPoolNew(MEMPOOL_RX_OBJECTS, sizeof(CANMessageRx_t), NULL);
    if (CAN_Rx_MemPool == NULL)
    {
        Error_Handler();
    }
    CAN_WatchDog_MemPool = osMemoryPoolNew(MEMPOOL_RX_OBJECTS, sizeof(CANMessageWatchDog_t), NULL);
    if (CAN_WatchDog_MemPool == NULL)
    {
        Error_Handler();
    }

}

Przerwanie, w którym odbierane są normalne ramki komunikacyjne:


void HAL_FDCAN_RxFifo0Callback(FDCAN_HandleTypeDef *hfdcan, uint32_t RxFifo0ITs)
{
    CANMessageRx_t *RxMsg;
    osStatus_t CAN_Rx_status;
    BaseType_t xHigherPriorityTaskWoken = pdFALSE;

    // Allocate buffer from pool
    RxMsg = osMemoryPoolAlloc(CAN_Rx_MemPool, 0);
    if (RxMsg == NULL)
    {
        // Pool exhausted — handle error or drop frame
        Error_Handler();
    }

    if((RxFifo0ITs & FDCAN_IT_RX_FIFO0_NEW_MESSAGE) != RESET)
    {
        // Read data to allocated buffer
	if(HAL_FDCAN_GetRxMessage(hfdcan, FDCAN_RX_FIFO0, &RxMsg->header, RxMsg->data) == HAL_OK)
	{
	    //Sending data from IT to queue
	    CAN_Rx_status = osMessageQueuePut(CANRxQueue, &RxMsg, 0U, 0U);

	    //Release mempool if problem occured during putting RxMsg buffer pointer to queue,
	    if(CAN_Rx_status != osOK)
	    {
	        osMemoryPoolFree(CAN_Rx_MemPool, RxMsg);
	        Error_Handler();
	    }

        }
        else
        {
	    //Release mempool if problem occured during getting message from FIFO
	    osMemoryPoolFree(CAN_Rx_MemPool, RxMsg);
	    Error_Handler();
        }
    }
}

Przerwanie, w którym odbierane są ramki typu watchdog:


void HAL_FDCAN_RxFifo1Callback(FDCAN_HandleTypeDef *hfdcan, uint32_t RxFifo1ITs)
{
    CANMessageRx_t *WatchDogMsg;
    osStatus_t CAN_WatchDog_status;
    BaseType_t xHigherPriorityTaskWoken = pdFALSE;

    // Allocate buffer from pool
    WatchDogMsg = osMemoryPoolAlloc(CAN_WatchDog_MemPool, 0);
    if (WatchDogMsg == NULL)
    {
        // Pool exhausted — handle error or drop frame
        Error_Handler();
    }

    if((RxFifo1ITs & FDCAN_IT_RX_FIFO1_NEW_MESSAGE) != RESET)
    {
        if(HAL_FDCAN_GetRxMessage(hfdcan, FDCAN_RX_FIFO1, &WatchDogMsg->header, WatchDogMsg->data) == HAL_OK)
        {

            //Sending data from IT to queue
            CAN_WatchDog_status = osMessageQueuePut(CANWatchDogQueue, &WatchDogMsg, 0U, 0U);

            if(CAN_WatchDog_status != osOK)
            {
                //Release mempool if problem occured during getting message from FIFO
                osMemoryPoolFree(CAN_WatchDog_MemPool, WatchDogMsg);
                Error_Handler();
            }
		}
		else
		{
		    osMemoryPoolFree(CAN_WatchDog_MemPool, WatchDogMsg);
		    Error_Handler();
                    }
	}

}

Funkcja odbioru normalnych ramek komunikacyjnych, która jest umieszczona w zadaniu:


void CAN_comm_receive(void)
{
    CANMessageRx_t *RxMsg;

    //Get data from memeory pool buffer by queue
    if(osMessageQueueGet(CANRxQueue, &RxMsg, NULL, osWaitForever) == osOK)
    {
        switch(RxMsg->header.Identifier)
        {
            case 0x12:
            {
                datacheck=2;
            }
            break;
        }

        if(datacheck==2)
        {
            counter_0x12++;
            uint8_t data[40];
            sprintf(data, "Received frame quantity ID 0x12: %d\r\n",counter_0x12);
            HAL_UART_Transmit(&hlpuart1, data, sizeof(data), 500);
            datacheck=0;
        }
        osMemoryPoolFree(CAN_Rx_MemPool, RxMsg);
    }
}

Funkcja odbioru ramek typu watchdog, która jest umieszczona w zadaniu:


void WatchDog(void)
{
    CANMessageWatchDog_t *WatchDogMsg;
    //Not wait only check that queue will be empty
    while(osMessageQueueGet(CANWatchDogQueue, &WatchDogMsg, NULL, 0) == osOK)
    {
        switch (WatchDogMsg->header.Identifier)
        {
            case 0x11:
            {
                datacheck = 1;
            }
            break;
        }
        osMemoryPoolFree(CAN_WatchDog_MemPool, WatchDogMsg);
    }
    if(datacheck==1)
    {
        uint8_t data[] = "Received frame with ID 0x11\r\n";
        HAL_UART_Transmit(&hlpuart1, data, sizeof(data), 100);
        datacheck=0;
    }
}

W funkcji main() umieściłem tylko start CAN, aktywację przerwań dla pierwszego i drugiego FIFO, nagłówek ramki nadawczej, funkcje tworzące kolejki oraz banki pamięci:


  /* USER CODE BEGIN 2 */

  HAL_FDCAN_Start(&hfdcan1);
  HAL_FDCAN_ActivateNotification(&hfdcan1, FDCAN_IT_RX_FIFO0_NEW_MESSAGE, 0);
  HAL_FDCAN_ActivateNotification(&hfdcan1, FDCAN_IT_RX_FIFO1_NEW_MESSAGE, 0);

  //Header CAN1
   TxHeader.Identifier = 0x13;
   //11-bit Identifier
   TxHeader.IdType = FDCAN_STANDARD_ID; 
   TxHeader.TxFrameType = FDCAN_DATA_FRAME;
   //8-byte data payload
   TxHeader.DataLength = FDCAN_DLC_BYTES_2;
   TxHeader.ErrorStateIndicator = FDCAN_ESI_ACTIVE;
   TxHeader.BitRateSwitch = FDCAN_BRS_OFF;  
   //No Bit Rate Switching (for CAN FD)
   TxHeader.FDFormat = FDCAN_CLASSIC_CAN;
   TxHeader.TxEventFifoControl = FDCAN_NO_TX_EVENTS;
   TxHeader.MessageMarker = 0;

    queues_creation();
    memory_pool_creation();

  /* USER CODE END 2 */

Dalej skonfigurowałem następujące filtry dla CAN:


/* USER CODE BEGIN FDCAN1_Init 2 */
  
    FDCAN_FilterTypeDef fdcan1filterconfig1;
    fdcan1filterconfig1.FilterConfig = FDCAN_FILTER_TO_RXFIFO0;
    fdcan1filterconfig1.FilterID1 = 0x12;
    fdcan1filterconfig1.FilterID2 = 0x14;
    fdcan1filterconfig1.FilterIndex = 0;
    fdcan1filterconfig1.FilterType = FDCAN_FILTER_DUAL;
    fdcan1filterconfig1.IdType = FDCAN_STANDARD_ID;
    if (HAL_FDCAN_ConfigFilter(&hfdcan1, &fdcan1filterconfig1) != HAL_OK)
    {
        Error_Handler();
    }

    FDCAN_FilterTypeDef fdcan1filterconfig2;
    fdcan1filterconfig2.FilterConfig = FDCAN_FILTER_TO_RXFIFO1;
    fdcan1filterconfig2.FilterID1 = 0x11;
    fdcan1filterconfig2.FilterID2 = 0x15;
    fdcan1filterconfig2.FilterIndex = 0;
    fdcan1filterconfig2.FilterType = FDCAN_FILTER_DUAL;
    fdcan1filterconfig2.IdType = FDCAN_STANDARD_ID;
    if (HAL_FDCAN_ConfigFilter(&hfdcan1, &fdcan1filterconfig2) != HAL_OK)
    {
        Error_Handler();
    }
/* USER CODE END FDCAN1_Init 2 */

Na końcu umieściłem w osobnych zadaniach funkcje odberające ramki CAN:


void StartCANIntReceive(void *argument)
{
    /* USER CODE BEGIN StartCANIntReceive */
    /* Infinite loop */
    for(;;)
    {
        CAN_comm_receive();
        osDelay(1);
    }
/* USER CODE END StartCANIntReceive */
}

void StartCAN_WatchDog(void *argument)
{
    /* USER CODE BEGIN StartCAN_WatchDog */
    /* Infinite loop */
    for(;;)
    {
        WatchDog();
        osDelay(100);
    }
    /* USER CODE END StartCAN_WatchDog */
}

Po skonfigurowaniu Mikrokontrolera 2 zgodnie z powyższym opisem wykonałem taki sam test. Mikrokontroler 1 wysyła periodycznie ramki watchdog o ID 0x11, a po naciśnięciu przycisku wysyłane są ramki o ID 0x12. Tak samo jak w Przypadku 1 wprowadziłem do programu USBCAN filtr na ID 0x12.

Poniżej znajduje się terminal, który informuje o ramkach odebranych przez Mikrokontroler 2. Jak widać Mikrokontroler 2 odebrał wszystkie 20 ramek o ID 0x12.

Podsumowanie:

W tym artykule pokazałem, że problem utraty ramek CAN rzadko wynika z samej magistrali. Najczęściej jest to konsekwencja nieoptymalnego zarządzania pamięcią między ISR a zadaniami RTOS.
W systemach pracujących latami – jak automotive czy przemysł – deterministyczna alokacja pamięci i brak fragmentacji nie są opcją, lecz koniecznością.
Memory Pool + kolejki z przekazywaniem wskaźników to wzorzec projektowy, który pozwala:
– utrzymać minimalny czas ISR,
– uniknąć kopiowania danych,
– zagwarantować brak fragmentacji,
– skalować system bez utraty stabilności.

Dodaj komentarz