Padrões de design para comunicação interprocessos em software embarcado (2/3)

«Primeira parte»

4.3. Queuing Pattern

Quando tasks têm baixo acomplamento temporal, i.e., virtualmente1 o tempo em que um processo consumidor leva para resgatar o resultado de um processo produtor não é condição para diferenciar falha de sucesso, o padrão de Message Queueing é bastante comum.

O processo produtor envia uma mensagem a uma fila (um buffer, normalmente circular) e algum tempo depois o processo consumidor resgata esta mensagem para seguir adiante. A mensagem pode ser desde um simples booleano ou qualquer outra estrutura mais complexa.

1 isto não significa que o sistema não tenha condições temporais. Significa somente que determinadas tasks não têm relação temporal rígida entre si naquele ponto de sincronização (coordenação).

Figura 10. Message Queue para coordenar processos [2]
Figura 11. Generalização do padrão Queuing em UML

4.3.1. Rationale

Se os processos são fracamente acoplados no tempo, podemos coordená-los (ou sincronizá-los) com um mecanismo assíncrono: um processo envia a mensagem e o outro lê “quando puder”.

4.3.2 Elementos

Na Figura 11, diversas tarefas QTask compartilham uma única MessageQueue. Um Mutex agregado a esta fila de mensagens, coordena o acesso de cada tarefa à fila de mensagens. A fila tem QUEUE_SIZE elementos do tipo Message, que podem ser qualquer dado/comando – desde um simples booleano até uma PDU do tipo CAN, por exemplo. Este tamanho deve comportar o pior caso para filas no sistema1. Poderíamos dispensar o Mutex se agregássemos a cada processo a sua própria caixa de mensagens, obviamente ao custo de utilizarmos mais memória.

1 Um dos métodos empregados para calcular o tamanho da fila é o Teorema de Little

4.3.3. Implementação

No que diz respeito à complexidade, este tipo de mecanismo de IPC é relativamente simples, mas existem muitas variantes. Algumas mensagens podem ser mais prioritárias que outras, em sistemas complexos o tamanho da fila é difícil de ser optimizado e algumas implementações fazem uso de alocação dinâmica (o que em embarcados, é controverso) ou até mesmo guardam mensagens pouco prioritárias no sistema de arquivos [1]. A utilização de linked lists para implementação também é uma abordagem que conta com a flexibilidade do próximo item a ser lido ser passado por referência, e com esta flexibilidade também os potenciais problemas que o extensivo uso de ponteiros trás.

Neste artigo um scheduler cooperativo manejava as tarefas em fila, onde aquelas mais urgentes (baseados na deadline) eram colocadas no início da fila. Fundamentalmente isto é um mecanismo de comunicação interprocessos do padrão queueing: a fila coordena o disparo dos processos que concorrem para utilizar um recurso: o microprocessador.

4.3.4. Exemplo

A Figura 12 mostra um sistema, em que sensores de determinados gases compartilham um queue para registrar os valores medidos na estrutura de dados GasData. Um GasDataQueue tem GAS_QUEUE_SIZE elementos do tipo GasData. Este por sua vez contém um enum GAS_TYPE.

Figura 12. Queing Example [1]

O programa GasDataQueue.c é o núcleo deste design pattern, com os métodos para incluir novos dados na fila e remover os antigos (este código mostra uma boa implementação de buffer circular, um workhorse em sistemas embebidos). A Figura 12 nos diz que duas threads, GasProcessingThread e SensorThread, compartilham o recurso GasDataQueue, como consumidor e provedor respectivamente.

A SensorThread atua provendo os dados à fila: atualiza os dados recebidos pelos sensores e os concatena para corretamente alocá-los em uma das estruturas GasData de GasDataQueue. Este processo de alocar dados de um objeto em uma outra estrutura, para transmiti-los, representá-los e/ou armazená-los de forma significativa, em computação é conhecido como marshalling.

Do lado consumidor, a thread GasProcessingThread periodicamente resgata dados da fila. No exemplo em questão, os dados estão somente sendo impressos na tela de um computador.

Lista 5. Implementação em C do modelo descrito na Figura 12

/*********************************
@file GasDataExample.h
*********************************/
#ifndef QueuingExample_H
#define QueuingExample_H
struct GasController;
struct GasData;
struct GasDataQueue;
struct GasDisplay;
struct GasProcessingThread;
struct HeSensor;
struct N2Sensor;
struct O2Sensor;
struct OSSemaphore;
struct SensorThread;
typedef enum GAS_TYPE {
 O2_GAS,
 N2_GAS,
 HE_GAS,
 UNKNOWN_GAS
} GAS_TYPE;
/* define the size of the queue */
#define GAS_QUEUE_SIZE (10)
/* OS semaphore services */
struct OSSemaphore* OS_create_semaphore(void);
void OS_destroy_semaphore(struct OSSemaphore* sPtr);
void OS_lock_semaphore(struct OSSemaphore* sPtr);
void OS_release_semaphore(struct OSSemaphore* sPtr);
#endif

/*********************************
@file GasData.h
*********************************/
#ifndef GasData_H
#define GasData_H
#include "QueuingExample.h"
typedef struct GasData GasData;
struct GasData {
double conc;
unsigned int flowInCCPerMin;
GAS_TYPE gType;
};
/* Constructors and destructors:*/
void GasData_Init(GasData* const me);
void GasData_Cleanup(GasData* const me);
GasData * GasData_Create(void);
void GasData_Destroy(GasData* const me);
#endif

//EOF
/*********************************
@file GasDataQueue.h
*********************************/
#ifndef GasDataQueue_H
#define GasDataQueue_H
#include "QueuingExample.h"
#include "GasData.h"
#include "OSSemaphore.h"
typedef struct GasDataQueue GasDataQueue;
struct GasDataQueue {
  int head;
  OSSemaphore * sema;
  int size;
  int tail;
  struct GasData itsGasData[GAS_QUEUE_SIZE];
};
/* Constructors and destructors:*/
void GasDataQueue_Init(GasDataQueue* const me);
void GasDataQueue_Cleanup(GasDataQueue* const me);
/* Operations */
int GasDataQueue_insert(GasDataQueue* const me, GasData g);
GasData * GasDataQueue_remove(GasDataQueue* const me);
int GasDataQueue_getItsGasData(const GasDataQueue* const me);
GasDataQueue * GasDataQueue_Create(void);
void GasDataQueue_Destroy(GasDataQueue* const me);
#endif

//EOF
/*********************************
@file GasDataQueue.c
*********************************/
#include "GasDataQueue.h"
#include <stdio.h>
/* private (static) methods */
static void cleanUpRelations(GasDataQueue* const me);
static int getNextIndex(GasDataQueue* const me, int index);
static unsigned char isEmpty(GasDataQueue* const me);
static unsigned char isFull(GasDataQueue* const me);
static void initRelations(GasDataQueue* const me);

void GasDataQueue_Init(GasDataQueue* const me) 
{
  me->head = 0;
  me->sema = NULL;
  me->size = 0;
  me->tail = 0;
  initRelations(me);
  me->sema = OS_create_semaphore();
}
void GasDataQueue_Cleanup(GasDataQueue* const me) 
{
  OS_destroy_semaphore(me->sema);
  cleanUpRelations(me);
}
/*
Insert puts new gas data elements into the queue
if possible. It returns 1 if successful, 0 otherwise.
*/
int GasDataQueue_insert(GasDataQueue* const me, GasData g)
{
  OS_lock_semaphore(me->sema);
  if (!isFull(me)) {
    me->itsGasData[me->head] = g;
    me->head = getNextIndex(me, me->head);
    ++me->size;
    /* instrumentation */
    /* print stuff out, just to visualize the insertions */
    switch (g.gType) 
    {
      case O2_GAS:
      printf("+++ Oxygen ");
      break;
      case N2_GAS:
      printf("+++ Nitrogen ");
      break;
      case HE_GAS:
      printf("+++ Helium ");
      break;
      default:
      printf("UNKNWON ");
      break;
   };
   printf(" at conc %f, flow %d\n",g.conc,g.flowInCCPerMin);
   printf(" Number of elements queued %d, head = %d, tail = %d\n",
   me->size, me->head, me->tail);
   /* end instrumentation */
   OS_release_semaphore(me->sema);
   return 1;
  }
  else 
  { 
    /* return error indication */
    OS_release_semaphore(me->sema);
    return 0;
  }
}
/*
remove creates a new element on the heap, copies
the contents of the oldest data into it, and
returns the pointer. Returns NULL if the queue
is empty
*/
GasData * GasDataQueue_remove(GasDataQueue* const me) 
{
  GasData* gPtr;
  OS_lock_semaphore(me->sema);
  if (!isEmpty(me)) 
  {
    gPtr = (GasData*)malloc(sizeof(GasData));
    gPtr->gType = me->itsGasData[me->tail].gType;
    gPtr->conc = me->itsGasData[me->tail].conc;
    gPtr->flowInCCPerMin = me->itsGasData[me->tail].flowInCCPerMin;
    me->tail = getNextIndex(me, me->tail);
    /* instrumentation */
    switch (gPtr->gType) 
    {
       case O2_GAS:
       printf("— Oxygen ");
       break;
       case N2_GAS:
       printf("— Nitrogen ");
       break;
       case HE_GAS:
       printf("— Helium ");
       break;
       default:
       printf("— UNKNWON ");
       break;
    };
    printf(" at conc %f, flow %d\n",gPtr->conc,gPtr->flowInCCPerMin);
    printf(" Number of elements queued %d, head = %d, tail = %d\n",
    me->size, me->head, me->tail);
 /* end instrumentation */
   OS_release_semaphore(me->sema);
   return gPtr;
  }
  else 
  { /* if empty return NULL ptr */
    OS_release_semaphore(me->sema);
    return NULL;
  }
}
static int getNextIndex(GasDataQueue* const me, int index) 
{
   /* this operation computes the next index from the
   first using modulo arithmetic
   */
   return (index+1) % QUEUE_SIZE;
}
static unsigned char isEmpty(GasDataQueue* const me) 
{
  return (me->size == 0);
}
static unsigned char isFull(GasDataQueue* const me) 
{
  return (me->size == GAS_QUEUE_SIZE);
}
int GasDataQueue_getItsGasData(const GasDataQueue* const me) 
{
  int iter = 0;
  return iter;
}
GasDataQueue * GasDataQueue_Create(void) 
{
  GasDataQueue* me = (GasDataQueue *)
  malloc(sizeof(GasDataQueue));
  if(me!=NULL) 
  {
    GasDataQueue_Init(me);
   }
 return me;
}
void GasDataQueue_Destroy(GasDataQueue* const me)
{
  if(me!=NULL) 
  {
    GasDataQueue_Cleanup(me);
  }
  free(me);
}
static void initRelations(GasDataQueue* const me) 
{
  int iter = 0;
  while (iter < GAS_QUEUE_SIZE)
  {
    GasData_Init(&((me->itsGasData)[iter]));
    iter++;
  }
}
static void cleanUpRelations(GasDataQueue* const me) 
{
  int iter = 0;
  while (iter < GAS_QUEUE_SIZE)
  {
    GasData_Cleanup(&((me->itsGasData)[iter]));
    iter++;
  }
}

De posse destes dados, entretanto, tarefas mais úteis poderiam ser executadas a depender das necessidades da planta, como manter a concentração de determinado gás constante, através de algum tipo de controle retroalimentado, por exemplo.

A Figura 13 mostra o sistema em execução. A tarefa GasProcessingThread é disparada primeiro, com um período de 1000 ms e a a tarefa SensorThread é posteriormente disparada a um período de 500 ms. A fila tem 10 elementos. Note que apesar de cada um dos três sensores terem uma chance de 1/3 de produzir dados neste intervalo, os dados estão sendo mais rapidamente inseridos do que removidos, até que a fila enche. [1]

Figura 13. Queueing exemplo sendo executado

4.4. Rendez-Vous

Se as condições para coordenar uma task são mais complexas que as apresentadas anteriormente, quando fundamentalmente estávamos a proteger um recurso de acesso mútuo, podemos “concretizar” estas condições em um objeto de fato. O padrão Rende-Vouz modela as pré-condições para a coordenação de tasks, na forma de um objeto explicitamente separado com seus próprios dados e funções. É um padrão generalista aplicado para garantir que um conjunto de pré-condições arbitrárias sejam atendidas em tempo de execução. [1]

Figura 14. Rende-vouz pattern modelado em UML [1]

4.4.1 Rationale

Duas ou mais tarefas podem ser sincronizadas utilizando-se de uma estratégia a sua escolha que será codificada na classe Rendevouz. O padrão é facilmente adaptável. Quando uma thread encontra um ponto de sincronização, ela registra-se em um objeto da classe Rendevouz, e bloqueia-se até que este objeto a libere para ser executado, usando qualquer que seja a política de scheduling do sistema. É como se um veículo parasse em algum ponto de inspeção na estrada – o fiscal, i.e., o objeto Rendevouz, só a deixa seguir adiante se as condições (papelada, pneus, etc.), estiverem todas satisfeitas.

4.4.2. Elementos

A classe Rendezvous coordena as tarefas através de duas funções primárias:

void reset(void): reseta os critérios de sincronização, i.e., coloca-os de volta em suas condições iniciais.

void synchronize(void): este método é chamado quando uma task quer ser sincronizada. Se os critérios não estão satisfeitos, esta task será de alguma forma bloqueada. A estratégia para pode ser através de um Observer Pattern ou um Guarded Call Pattern, por exemplo. A complexidade deste padrão concentra-se primariamente neste método que avalia se as condições estão satisfeitas. Estas condições podem ser internas (como a Thread Barrier do padrão), externas, ou qualquer combinação das duas.

Normalmente o objeto RendeVouz é agnóstico em relação às threads que coordena. Se não o for, o método synchronize() terá um mais parâmetros para identificação das threads. [1]

O Semaphore é um semáforo comum, com as operações de lock e release.

SynchronizingThread representa uma task que utiliza o objeto Rendezvous para ser sincronizada: quando atinge seu ponto de sincronização, deve explicitamente chamar o método synchronize().

4.4.3 Implementação

Se o padrão for implementado com auxílio de um Observer Pattern, então as tasks precisam registrarem-se com o endereço de um callback a ser chamado quando os critérios de sincronização foram atendidos. Se o padrão Guarded Call Pattern (veja publicação anterior) for utilizado, então cada objeto RendezVous tem somente um único semáforo, que maneja uma fila de tasks que registraram-se naquele objeto RendezVous. Note que neste último caso, o padrão acaba por ser uma solução custosa (overkill), mas que simplifica a implementação de guarded calls, ao concentrá-las em um único componente do programa. É o típico trade-off reusabilidade/custo, que normalmente nos deparamos nas escolhas de design.

4.4.4 Exemplo

No exemplo da Figura 15, uma forma específica do padrão Rendezvous, conhecida como Thread Barrier Pattern [1] é implementada.

Figura 15. Rendezvous implementando uma Thread Barrier

Note que neste snippet, a única informação que o objeto recebe, fora do seu escopo, é o número de tarefas a serem sincronizadas, através do atributo expectedCount. Quando o número de tasks que invocaram o método synchronize() atinge 3, as tarefas são liberadas para o kernel manejá-las com a política de agendamento que utiliza. Os objetos do tipo semáforo e barrier são ponteiros que assumem a referência passada pela task que chamou o ThreadBarrier. O snippet não mostra nenhuma lógica que avalie informações externas, portanto estamos simplesmente a implementar um Guarded Call Pattern, de uma maneira bastante reutilizável. O importante é perceber o forte caráter extensível deste padrão.

Vale aqui replicar a Figura 3 da primeira parte do artigo:

Figura 16. Representação dinâmica de uma Thread Barrier; neste caso expectedCount = 4

Lista 6. Implementação em C do modelo descrito na Figura 15

/******************************
@file ThreadBarrier.h
******************************/
#ifndef ThreadBarrier_H
#define ThreadBarrier_H
/*# # auto_generated */
#include <oxf/Ric.h>
/*# # auto_generated */
#include "RendezvousExample.h"
/*# # auto_generated */
#include <oxf/RiCTask.h>
/*# # package RendezvousPattern::RendezvousExample */
/*# # class ThreadBarrier */
typedef struct ThreadBarrier ThreadBarrier;
struct ThreadBarrier {
  int currentCount;
  int expectedCount;
  OSSemaphore* barrier;
  OSSemaphore* mutex;
};
/* Constructors and destructors:*/
void ThreadBarrier_Init(ThreadBarrier* const me);
void ThreadBarrier_Cleanup(ThreadBarrier* const me);
/* Operations */
void ThreadBarrier_reset(ThreadBarrier* const me, int x);
void ThreadBarrier_synchronize(ThreadBarrier* const me);
ThreadBarrier * ThreadBarrier_Create(void);
void ThreadBarrier_Destroy(ThreadBarrier* const me);
#endif
// EOF
/******************************
@file ThreadBarrier.c
******************************/
#include "ThreadBarrier.h"
void ThreadBarrier_Init(ThreadBarrier* const me) 
{
  me->currentCount = 0;
  me->expectedCount = 3;
  if (me->barrier) 
  {
    OSSemaphore_lock(me->barrier);
    printf("BARRIER IS LOCKED FIRST TIME\n");
 }
}
void ThreadBarrier_Cleanup(ThreadBarrier* const me) 
{
  OSSemaphore_Destroy(me->barrier);
  OSSemaphore_Destroy(me->mutex);
}
void ThreadBarrier_reset(ThreadBarrier* const me, int x) 
{
  me->expectedCount = x;
  me->currentCount = 0;
}
void ThreadBarrier_synchronize(ThreadBarrier* const me) 
{
/*
protect the critical region around
the currentCount
*/
  OSSemaphore_lock(me->mutex);
  ++me->currentCount; /* critical region */
  OSSemaphore_release(me->mutex);
/*
are conditions for unblocking all threads met?
if so, then release the first blocked
thread or the highest priority blocked
thread (depending on the OS)
*/
  if (me->currentCount == me->expectedCount) 
  {
   printf("Conditions met\n");
   OSSemaphore_release(me->barrier);
   //let the scheduler do its job
  }
/*
lock the semaphore and when condition met
then release it for the next blocked thread
*/
  OSSemaphore_lock(me->barrier);
  /* code to check if condition met */
  OSSemaphore_release(me->barrier);
}
ThreadBarrier * ThreadBarrier_Create(void) 
{
  ThreadBarrier* me = (ThreadBarrier *) malloc(sizeof(ThreadBarrier));
  if(me!=NULL)
  {
    ThreadBarrier_Init(me);
    return me;
  }
void ThreadBarrier_Destroy(ThreadBarrier* const me) 
{
   if(me!=NULL)
   ThreadBarrier_Cleanup(me);
   free(me);
}

Fim da segunda parte.

Todos os padrões e exemplos aqui apresentados são de: 
[1] Douglass, Bruce Powel. Design patterns for embedded C: an embedded software engineering toolkit, 1st ed. ISBN 978-1-85617-707-8

Autor: Antonio Giacomelli de Oliveira

Engenheiro Eletrônico

Deixe uma Resposta

Preencha os seus detalhes abaixo ou clique num ícone para iniciar sessão:

Logótipo da WordPress.com

Está a comentar usando a sua conta WordPress.com Terminar Sessão /  Alterar )

Google photo

Está a comentar usando a sua conta Google Terminar Sessão /  Alterar )

Imagem do Twitter

Está a comentar usando a sua conta Twitter Terminar Sessão /  Alterar )

Facebook photo

Está a comentar usando a sua conta Facebook Terminar Sessão /  Alterar )

Connecting to %s