Abordagens para projeto low-power (3/3)

6 Abordagens para redução de consumo em RTL

As ferramentas de síntese e back-end detectam oportunidades para salvar energia diretamente do RTL, portanto a microarquitetura e, consequentemente o estilo de codificação tem um grande impacto no consumo. A medida que o fluxo avança, menores são as intervenções que podem ser feitas para reduzir a energia dinâmica e estática. Um bom design low-power começa nos níveis arquiteturais como já foi discutido e, ainda antes da síntese, durante a escrita do código em linguagem de descrição de hardware. É dito que 80% do consumo em um ASIC estará definido no momento em que o RTL é finalizado. Somente os outros 20% dependerão do back-end. [1]

6.1 Máquinas de estado – codificação e decomposição

Minimizar a inversão de bits nas transições entre um estado e outro, naturalmente minimiza o consumo de uma máquina de estados. Codificação Grey, onde somente um bit inverte durante uma transição entre estados, é um cliché para a redução de consumo.

Figura 6.1 Codificação Binária versus Grey em uma FSM de 5 estados (Figura retirada de [1])

Ainda, pode-se considerar uma abordagem mais ad hoc, ao identificar os caminhos mais frequentes exercitados pela máquina, e codificá-los de forma que menos bits transicionem entre um estado e outro.

Na Figura 6.1, numa máquina de 5 estados, há 32 transições possíveis. Neste caso, somente 6 podem acontecer de fato. Na codificação binária, 2 bits invertem nas transições B->C e E->B. Se 30% das transições na operação normal forem E->B (ou B->C), a economia de energia com o Grey code fica em 15% somente na inversão dos bits dos registradores, e ainda haverá redução de consumo na lógica combinatória, na mesma proporção ou ainda mais [1].

Uma outra forma de economizar energia é decompor uma FSM em diferentes sub-máquinas, garantindo que as submáquinas em conjunto tenham o mesmo STG (grafo de transição de estados) da máquina original. Durante a operação, exceto se houver transições entre uma submáquina e outra, somente uma precisa estar ativa. Um problema desta abordagem é que os STGs crescem exponencialmente com o número de registros e as técnicas para decompor a máquina também tornam-se mais complicadas [3].

6.2 Representação binária

Já foi discutido neste site a representação e operação de números negativos em circuitos digitais. Para a maioria das aplicações, complemento de 2 é mais adequada que sua representação por sinal-magnitude [1]. Entretanto, para algumas aplicações bastante especificas, pode ser vantajoso o uso de sinal-magnitude. Numa transição de “0” para “-1”, a representação em complemento de 2 inverte todos os bits, ao contrário da lógica sinal-magnitude:

// complemento de 2:
"0" : 00000000
"-1": 11111111

// sinal-magnitude
"0" : 00000000
"-1": 10000001

6.3 Inferência para clock-gating

Abaixo duas possíveis descrições em Verilog para um banco de flip-flops (reg_bank_ff) registrar um novo dado somente (reg_bank_nxt) quando o sinal de seleção load_sel estiver alto. O próximo valor ainda depende de um sinal de seleção deadbeef_sel:

Código 1. Inferência de clock-gating

No “mau exemplo” o sinal i_load_sel_w não foi colocado diretamente sob o domínio do clock, e sim em um bloco de lógica puramente combinacional. É ainda possível que a ferramenta infira o clock-gating após o flattening, mas não é boa prática contar com isso. No “bom exemplo” a ferramenta de síntese prontamente infere que i_load_sel_w é o controle da célula de clock-gating que será integrada (linha 47), economizando 32 multiplexadores que seriam controlados por i_load_sel_w, como descrito no código de “mau exemplo”.

6.4 Multiplexadores One-Hot

Multiplexadores são descritos/inferidos normalmente a partir de cases e ifs [1]:

// Mux 4:1 (linha de seleção em código binário)  
case (SEL)     
2'b00: out = a;    
2'b01: out = b;    
2'b10: out = c;    
2'b11: out = d; 
endcase  
//Mux 4:1  (linha de seleção em código one-hot)  
case (SEL)    
4'b0001: out = a;    
4'b0010: out = b;    
4'b0100: out = c;    
4'b1000: out = d;    
default: out = out;  
endcase

Se as linhas de seleção de um multiplexador são barramentos de vários bits, o chaveamento pode ser significante.

Na Figura 6.3, à direita, os barramentos não selecionados são prontamente mascarados pela lógica AND na entrada do gate OR, diminuindo ainda o atraso da entrada selecionada para a saída.

Figura 6.3 – Muxes N:1 – Binário x One Hot [1]

A maior parte da lógica em um design pode consistir em multiplexadores, portanto, evitar ou mascarar falsas transições deve diminuir o consumo significativamente [1, 2]. Note que a codificação one-hot também é um bom padrão de escolha para máquinas de estado.

6.5 Evitar transições redundantes

Analise a microarquitetura da Figura 6.4. O sinal load_op ativa os 4 operandos a_in, b_in, c_in e d_in. O sinal sel_in seleciona qual operação terá o resultado carregado na saída. Perceba que se o sinal load_op não precisa – ou não deveria – estar ativo se o sinal load_out não for ativado também, visto que as operações nas nuvens lógicas não terão efeito na saída.

Figura 6.4 – Microarquitetura que permite transições redundantes (inúteis) [1]

Na figura abaixo, a microarquitetura é modificada para evitar transições inúteis. A adição de portas AND na entrada dos operandos a_in e b_in faz com que estes sejam habilitados somente quando sel_in é ‘0’; c_in e d_in por sua vez, somente quando sel_in é ‘1’.

Figura 6.5 – Supressão das transições redudantes [1]

6.6 Compartilhamento de recursos

Observe as seguintes lógicas combinatórias descrita em Verilog [adaptado de 1]:

// Codigo #1 
// sem compartilhar recursos
always @(*) begin
 case(SEL) 
   3'b000: OUT = 1'b0;
   3'b001: OUT = 1'b1;
   3'b010: OUT = (value1 == value2); //equals
   3'b011: OUT = (value1 != value2);  //different
   3'b100: OUT = (value1 >= value2); // greater or equal
   3'b101: OUT = (value1 <= value2); // less or equal
   3'b110: OUT = (value1 < value2); // less
   3'b111: OUT = (value1 > value2); // greater
endcase
// Codigo #2
// compartilhando recursos
assign cmp_equal = (value1 == value2);
assign cmp_greater = (value1 > value2);
always @(*) begin
 case(SEL) 
   3'b000: OUT = 1'b0;
   3'b001: OUT = 1'b1;
   3'b010: OUT = cmp_equal;
   3'b011: OUT = !cmp_equal;
   3'b100: OUT = cmp_equal || cmp_greater;
   3'b101: OUT = !cmp_greater || cmp_equal ;
   3'b110: OUT = !cmp_greater;
   3'b111: OUT = cmp_greater;
endcase

O Código #1 faz uma operação aritmética para cada seleção de entrada SEL. Estas operações têm em comum a comparação entre 2 operandos, value1 e value2. No código #2, duas lógicas de comparação foram assinaladas às nets cmp_equal e cmp_greater. Estas, por sua vez, foram utilizadas nas saídas da seleção e evitar a replicação de operações aritméticas que envolvem os mesmos operandos e operadores.

6.7 Inversão de barramento

Quando a Distância de Hamming entre o dado atual e o próximo é maior que N/2 (sendo N o tamanho do barramento) é possível invertê-los para minimizar o número de transições:

Figura 6.6 – Para um barramento de 8 bits, quando a distância de hamming entre um dado e o próximo é maior que 4, o número de transições é reduzido ao aplicarmos a inversão (Adaptado de [1]).

Note que um sinal de controle INV é necessário para indicar ao receptor que os dados foram invertidos. Note que mesmo que adicionemos um encoder em Tx e um decoder em Rx, o barramento e portanto as maiores capacitâncias transitaram menos. As maiores correntes são evitadas e a transmissão dos dados consome menos energia.

O texto desta publicação é original. As seguintes fontes foram consultadas: [1] The Art of Hardware Architecture, Mohit Ahora (ISBN 978-1-4614-0397-5)􀀤
[2] Ultra-Low Power Integrated Circuit Design, Niaxiong Nick Tan et al (ISBN 978-1-4419-9973-3)
[3] FSM decomposition by direct circuit manipulation applied to low power design, JC Monteiro et. al. DOI: 10.1109/ASPDAC.2000.835123

A first preemptive kernel on ARM Cortex-M3

1. Introduction

When it comes to operating systems for embedded software, they are generally thought to be an overkill for most solutions. However, between a completely “hosted” multiapplication system (which uses a multi thread and general purpose OS) and a completely “standalone/monolithic” or “bare-metal” application specific system, there are many variations we can go for.

In previous publications I explored the notion of cooperative tasks, from a loop calling tasks from an array of function pointers, to something a little more complex with the processes being handled in a circular buffer, with explicit time criteria. This time I will show a minimal preemptive implementation, to also pave the way for something a little more complex, as in other publications.

2 Preemptive versus Cooperative

In a fully cooperative system, the processor does not interrupt any task to accept another, and the task itself needs to release the processor for the next one to use it. There is nothing wrong with this. A Run-To-Completion scheme is sufficient and/or necessary for many applications, and many embedded systems were deployed this way, including some very complex ones. In the past, even non-embedded systems used a cooperative kernel (Windows 3.x, NetWare 4.x, among others). If a task crashes, the entire system is compromised when we speak in a strictly cooperative way: it keeps the processor from going further (so in a server operating system like NetWare, this does not seem to be a good idea, because multiple clients are a must!).

In preemptive mode, tasks are interrupted and later resumed– i.e., a context (set of states in the processor registers) is saved and then retrieved. This leads to more complexity to the implementation but, if well done, it increases the robustness and the possibility of meeting narrower timing requirements, mainly if used with a priority and/or periodicity criteria to manage the queue.

3 Call stack

A processor is, in fact, a programmable finite-state machine. With some simplification, each state of our program can be defined within the set of core register values . This set dictates which program point is active. Therefore, activating a task means pushing values ​​to the call stack so that this task will be processed. This set of values is called context. To resume the task afterwards, it is necessary to save the call stack data at that point in the program. This “frozen” data represents a program state and is therefore called a stack frame. For every saved stackframe there is a context related. To resume a task, a previously saved stack frame is loaded back into the call stack.

In the ARM Cortex-M3, the 32-bit registers that define the active state of the processor are: R0-R12 for general use and R13-R15 registers for special use, in addition to the Program Status Register (xPSR) – its value is on the top of any stackframe, and it is not actually a single physical register, but a composition of three (Application, Interrupt and Execution: APSR, IPSR e EPSR).

Esta imagem possuí um atributo alt vazio; O nome do arquivo é armcortexm3arch-1.png

3.1. Load-store architectures

A load-store architecture is a processor architecture in which data from memory needs to be loaded to the core registers before being processed. Also, the result of this processing before being stored in memory must be in a register.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é registersm3-1.png

The two basic memory access operations on Cortex-M3:

// reads the data contained in the address indicated by Rn + offset and places it in Rn. 
LDR Rd, [Rn, #offset]
// stores data contained in Rn at the address pointed by Rd + offset
STR Rn, [Rd, #offset]

It is important to understand at least the Cortex-M3 instructions shown below. I suggest sources [1] and [2] as good references, in addition to this or this link.

MOV Rd, Rn // Rd = Rn
MOV Rd, #M // Rd = M, the immediate being a 32-bit value (here represented by M)
ADD Rd, Rn, Rm // Rd = Rn + Rm
ADD Rd, Rn, #M // Rd = Rn + M
SUB Rd, Rn, Rm // Rd = Rn - Rm
SUB Rd, Rn, #M // Rd = Rn - M
// pseudo-instruction to save Rn in a memory location.
// After a PUSH, the value of the stack pointer is decreased by 4 bytes
PUSH {Rn}
// POP increases SP by 4 bytes after loading data into Rn.
// this increase-decrease is based on the current address the SP is pointing to
POP {Rn}
B label // jump to routine label
BX Rm // jump to routine specified indirectly by Rm
BL label // jump to label and moves the caller address to LR

CPSID I // enable interrupts
CPSIE I // disable interrupts

We will operate the M3 in Thumb mode , where the instructions are actually 16 bits. According to ARM , this is done to improve code density while maintaining the benefits of a 32-bit architecture. Bit 24 of the PSR is always 1.

3.2. Stacks and stack pointer (SP)

Stack is a memory usage model [1]. It works in the Last In – First Out format (last to enter, first to leave). It is as if I organized a pile of documents to read. It is convenient that the first document to be read is at the top of the stack, and the last at the end.

We usually divide the memory between heap and stack . As said, the “call stack” will contain that temporary data that determines the next state of the processor. The heap contains data which nature is not temporary in the course of the program (this does not mean “non-volatile”). The stack pointer is a kind of pivot that keeps control of the program flow, by pointing to some position of the stack.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é stackusage.png
Figure 3. Model for using a stack. When saving data before processing (transforming) it saves the previous information. (Figure from [1])
Esta imagem possuí um atributo alt vazio; O nome do arquivo é annotation-2020-02-06-001056.png
Figure 4. Regions of memory mapped on a Cortex M3. The region available for the stack is confined between the addresses 0x20008000 and 0 0x20007C00. [1]

4 Multitasking on the ARM Cortex M3

The M3 offers two stack pointers (Main Stack Pointer and Process Stack Pointer) to isolate user processes from the kernel processes. Every interrupt service runs in kernel mode. It is not possible to go from user mode to kernel mode (actually called thread mode and privileged mode, respectively) without going through an interruption – but it is possible to go from privileged mode to user mode by changing the control register.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é so.png
Figure 5. Changing context on an OS that isolates the kernel application [1]

The core also has dedicated hardware for switching tasks. The SysTick interrupt service can be used to implement synchronous context switching. There are still other asynchronous software interruptions (traps) like PendSV and SVC. Thus, SysTick is used for synchronous tasks in the kernel, while SVC serves asynchronous interruptions, when the application makes a call to the system. The PendSV  is a software interruption which by default can only be triggered in privileged mode. It is usually suggested [1] to trigger it within SysTick service, because it is possible to keep track of the ticks to meet the time criteria. The interruption by SysTick is soon served, with no risk of losing any tick of the clock. A secure OS implementation would use both stack pointers to separate user and kernel threads and separate memory domains if an MPU (Memory Protection Unit) is available. At first, we will only use the MSP in privileged mode.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é 2sps.png
Figure 6. Memory layout on an OS with two stack pointers and protected memory [1]

5. Building the kernel

Kernel is a somewhat broad concept, but I believe that there is no OS which the kernel is not responsible for scheduling tasks. In addition, there must be IPC (inter-process communication) mechanisms. It is interesting to note the strong hardware-dependency of the scheduler that will be shown, due to its low-level nature .

5.1. Stackframes and context switching

When a SysTick is served, part of the call stack is saved by the hardware (R0, R1, R2, R3, R12, R14 (LR) and R15 (PC) and PSR). Let’s call this portion saved by the hardware stackframe. The remaining is the software stackframe [3], which we must explicitly save and retrieve with the PUSH and POP instructions .

To think about our system, we can outline a complete context switch depicting the key positions the stack pointer assumes during the operation (in the figure below the memory addresses increase, from bottom to top. When SP points to R4 it is aligned with an address lower than the PC on the stack)

Esta imagem possuí um atributo alt vazio; O nome do arquivo é ctxtswitch-1.png
Figure 7. Switching contexts. The active task is saved by the hardware and the kernel. The stackpointer is re-assigned, according to pre-established criteria, to the R4 of the next stackframe to be activated. The data is rescued. The task is performed. (Figure based on [3]) (“Salvo pelo hardware/kernel” translates to “Saved /pushed by hardware/kernel”; “Resgatado pelo hardware/kernel” translates to “retrieved/popped by hardware/kernel”)

When an interruption takes place, SP will be pointing to the top of the stack (SP (O)) to be saved. This is inevitable because this is how the M3 works. In an interruption the hardware will save the first 8 highest registers in the call stack at the 8 addresses below the stack pointer, stopping at (SP (1)). When we save the remaining registers, the SP will now be pointing to the R4 of the current stack (SP (2)). When we reassign the SP to the address that points to the R4 of the next stack (SP (3)), the POP throws the values of R4-R11 to the call stack and the stack pointer is now at (SP (4)). Finally, the return from the interrupt pops the remaining stackframe, and the SP (5) is at the top of the stack that has just been activated. (If you’re wondering where R13 is: it stores the value of the stack pointer)

The context switching routine is written in assembly and implements exactly what is described in Figure 7.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é systick-2.png
Figure 8. Context switcher

PS: When an interruption occurs, the LR takes on a special code. 0xFFFFFFF9 , if the interrupted thread was using MSP or 0xFFFFFFFD if the interrupted thread was using PSP.

5.1 Initializing the stacks for each task

For the above strategy to work, we need to initialize the stacks for each task accordingly. The sp starts by pointing to R4. This is by definition the starting stack pointer of a task, as it is the lowest address in a frame .

In addition, we need to create a data structure that correctly points to the stacks that will be activated for each SysTick service . We usually call this structure a TCB (thread control block). For the time being we do not use any selection criteria and therefore there are no control parameters other than next: when a task is interrupted, the next one in the queue will be resumed and executed.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é struct.png
Figure 9. Thread control block
Esta imagem possuí um atributo alt vazio; O nome do arquivo é stackinit-3.png
Figure 10. Initializing the stack (the values representing the registers, like 0x07070707 are for debugging purposes)

The kSetInitStack function initializes the stack for each “i” thread . The stack pointer in the associated TCB points to the data relating to R4. The stack data is initialized with the register number, to facilitate debugging. The PSR only needs to be initialized with bit 24 as 1, which is the bit that identifies Thumb mode . The tasks have the signature void Task (void * args) .

To add a task to the stack, we initially need the address of the main function of the task. In addition, we will also pass an argument. The first argument is in R0. If more arguments are needed, other registers can be used, according to AAPCS (ARM Application Procedure Call Standard).

Esta imagem possuí um atributo alt vazio; O nome do arquivo é addthreads-1.png
Figure 11. Routine for adding tasks and their arguments to the initial stackframe

5.3. Kernel start-up

It is not enough to initialize the stacks and wait for the SysTick. The TCB structure sp will only hold a valid stack pointer value when the task is interrupted. We have two types of threads running: background and foreground threads. The background includes the kernel routines, including the context switcher. At each SysTick, it is the kernel’s turn to use the processor. In the foreground are the applications.

In [2] a start-up routine is suggested:

Esta imagem possuí um atributo alt vazio; O nome do arquivo é kstart-1.png
Figure 13. Routine for booting the kernel

6. Putting it all together

To illustrate, we will perform, in Round-Robin 3 tasks that switch the output of 3 different pins and increment three different counters. The time between a change of context and another will be 1000 cycles of the main clock. Note that these functions run within a “while (1) {}”. It is like we have several main programs running on the foreground . Each stack has 64 x 4-byte elements (256 bytes).

Esta imagem possuí um atributo alt vazio; O nome do arquivo é tasks-1.png
Figure 14. System Tasks

Below the main function of the system. The hardware is initialized. Tasks are added and the stacks are initialized. The RunPtr receives the address of the thread.  After setting the SysTick to trigger every 1000 clock cycles, boot up the kernel . After executing the first task and being interrupted, the system is switching between one task and another, with the context switcher running in the background .

Esta imagem possuí um atributo alt vazio; O nome do arquivo é main.png
Figure 15. Main program

6.1. Debug

You will need at least a simulator to implement the system more easily, as you will need to access the core registers and see the data moving in the stacks. If the system is working, each time the debugger is paused, the counters should have almost the same value.

In the photo below, I use an Arduino Due board with an Atmel SAM3X8E processor and an Atmel ICE debugger connected to the board’s JTAG. On the oscilloscope you can see the waveforms of the outputs switching for each of the 3 tasks.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é 20200209_163936.jpg
Figure 16. Debug bench
Esta imagem possuí um atributo alt vazio; O nome do arquivo é ds1z_quickprint1-1.png
Figure 17. Tasks 1, 2, 3 on the oscilloscope.

7 Conclusions

The implementation of a preemptive kernel requires reasonable knowledge of the processor architecture to be used. Loading the call stack registers and saving them in a “handmade” way allows us to have greater control of the system at the expense of the complexity of handling the stacks.

The example presented here is a minimal example where each task is given the same amount of time to be performed. After that time, the task is interrupted and suspended – the data from the call stack is saved. This saved set is called a stackframe – a “photograph” of the point at the program was. The next task to be performed is loaded at the point it was interrupted and resumed. The code was written in order to explain the concepts.

In the next publication we will split the threads between user mode and privileged mode – using two stack pointers – a fundamental requirement for a more secure system.

References

The text of this post as well as the non-referenced figures are from the author.
[1] The definitive guide to ARM Cortex-M3, Joseph Yiu
[2] Real-Time Operating Systems for the ARM Cortex-M, Jonathan Valvano
[3] https://www.embedded.com/taking-advantage-of-the-cortex-m3s-pre-emptive-context-switches/

Padrões de design para comunicação interprocessos em software embarcado (2/3)

«Primeira parte»

4.3. Queuing Pattern

Quando tasks têm baixo acomplamento temporal, i.e., virtualmente1 o tempo em que um processo consumidor leva para resgatar o resultado de um processo produtor não é condição para diferenciar falha de sucesso, o padrão de Message Queueing é bastante comum.

O processo produtor envia uma mensagem a uma fila (um buffer, normalmente circular) e algum tempo depois o processo consumidor resgata esta mensagem para seguir adiante. A mensagem pode ser desde um simples booleano ou qualquer outra estrutura mais complexa.

1 isto não significa que o sistema não tenha condições temporais. Significa somente que determinadas tasks não têm relação temporal rígida entre si naquele ponto de sincronização (coordenação).

Figura 10. Message Queue para coordenar processos [2]
Figura 11. Generalização do padrão Queuing em UML

4.3.1. Rationale

Se os processos são fracamente acoplados no tempo, podemos coordená-los (ou sincronizá-los) com um mecanismo assíncrono: um processo envia a mensagem e o outro lê “quando puder”.

4.3.2 Elementos

Na Figura 11, diversas tarefas QTask compartilham uma única MessageQueue. Um Mutex agregado a esta fila de mensagens, coordena o acesso de cada tarefa à fila de mensagens. A fila tem QUEUE_SIZE elementos do tipo Message, que podem ser qualquer dado/comando – desde um simples booleano até uma PDU do tipo CAN, por exemplo. Este tamanho deve comportar o pior caso para filas no sistema1. Poderíamos dispensar o Mutex se agregássemos a cada processo a sua própria caixa de mensagens, obviamente ao custo de utilizarmos mais memória.

1 Um dos métodos empregados para calcular o tamanho da fila é o Teorema de Little

4.3.3. Implementação

No que diz respeito à complexidade, este tipo de mecanismo de IPC é relativamente simples, mas existem muitas variantes. Algumas mensagens podem ser mais prioritárias que outras, em sistemas complexos o tamanho da fila é difícil de ser optimizado e algumas implementações fazem uso de alocação dinâmica (o que em embarcados, é controverso) ou até mesmo guardam mensagens pouco prioritárias no sistema de arquivos [1]. A utilização de linked lists para implementação também é uma abordagem que conta com a flexibilidade do próximo item a ser lido ser passado por referência, e com esta flexibilidade também os potenciais problemas que o extensivo uso de ponteiros trás.

Neste artigo um scheduler cooperativo manejava as tarefas em fila, onde aquelas mais urgentes (baseados na deadline) eram colocadas no início da fila. Fundamentalmente isto é um mecanismo de comunicação interprocessos do padrão queueing: a fila coordena o disparo dos processos que concorrem para utilizar um recurso: o microprocessador.

4.3.4. Exemplo

A Figura 12 mostra um sistema, em que sensores de determinados gases compartilham um queue para registrar os valores medidos na estrutura de dados GasData. Um GasDataQueue tem GAS_QUEUE_SIZE elementos do tipo GasData. Este por sua vez contém um enum GAS_TYPE.

Figura 12. Queing Example [1]

O programa GasDataQueue.c é o núcleo deste design pattern, com os métodos para incluir novos dados na fila e remover os antigos (este código mostra uma boa implementação de buffer circular, um workhorse em sistemas embebidos). A Figura 12 nos diz que duas threads, GasProcessingThread e SensorThread, compartilham o recurso GasDataQueue, como consumidor e provedor respectivamente.

A SensorThread atua provendo os dados à fila: atualiza os dados recebidos pelos sensores e os concatena para corretamente alocá-los em uma das estruturas GasData de GasDataQueue. Este processo de alocar dados de um objeto em uma outra estrutura, para transmiti-los, representá-los e/ou armazená-los de forma significativa, em computação é conhecido como marshalling.

Do lado consumidor, a thread GasProcessingThread periodicamente resgata dados da fila. No exemplo em questão, os dados estão somente sendo impressos na tela de um computador.

Lista 5. Implementação em C do modelo descrito na Figura 12

/*********************************
@file GasDataExample.h
*********************************/
#ifndef QueuingExample_H
#define QueuingExample_H
struct GasController;
struct GasData;
struct GasDataQueue;
struct GasDisplay;
struct GasProcessingThread;
struct HeSensor;
struct N2Sensor;
struct O2Sensor;
struct OSSemaphore;
struct SensorThread;
typedef enum GAS_TYPE {
 O2_GAS,
 N2_GAS,
 HE_GAS,
 UNKNOWN_GAS
} GAS_TYPE;
/* define the size of the queue */
#define GAS_QUEUE_SIZE (10)
/* OS semaphore services */
struct OSSemaphore* OS_create_semaphore(void);
void OS_destroy_semaphore(struct OSSemaphore* sPtr);
void OS_lock_semaphore(struct OSSemaphore* sPtr);
void OS_release_semaphore(struct OSSemaphore* sPtr);
#endif

/*********************************
@file GasData.h
*********************************/
#ifndef GasData_H
#define GasData_H
#include "QueuingExample.h"
typedef struct GasData GasData;
struct GasData {
double conc;
unsigned int flowInCCPerMin;
GAS_TYPE gType;
};
/* Constructors and destructors:*/
void GasData_Init(GasData* const me);
void GasData_Cleanup(GasData* const me);
GasData * GasData_Create(void);
void GasData_Destroy(GasData* const me);
#endif

//EOF
/*********************************
@file GasDataQueue.h
*********************************/
#ifndef GasDataQueue_H
#define GasDataQueue_H
#include "QueuingExample.h"
#include "GasData.h"
#include "OSSemaphore.h"
typedef struct GasDataQueue GasDataQueue;
struct GasDataQueue {
  int head;
  OSSemaphore * sema;
  int size;
  int tail;
  struct GasData itsGasData[GAS_QUEUE_SIZE];
};
/* Constructors and destructors:*/
void GasDataQueue_Init(GasDataQueue* const me);
void GasDataQueue_Cleanup(GasDataQueue* const me);
/* Operations */
int GasDataQueue_insert(GasDataQueue* const me, GasData g);
GasData * GasDataQueue_remove(GasDataQueue* const me);
int GasDataQueue_getItsGasData(const GasDataQueue* const me);
GasDataQueue * GasDataQueue_Create(void);
void GasDataQueue_Destroy(GasDataQueue* const me);
#endif

//EOF
/*********************************
@file GasDataQueue.c
*********************************/
#include "GasDataQueue.h"
#include <stdio.h>
/* private (static) methods */
static void cleanUpRelations(GasDataQueue* const me);
static int getNextIndex(GasDataQueue* const me, int index);
static unsigned char isEmpty(GasDataQueue* const me);
static unsigned char isFull(GasDataQueue* const me);
static void initRelations(GasDataQueue* const me);

void GasDataQueue_Init(GasDataQueue* const me) 
{
  me->head = 0;
  me->sema = NULL;
  me->size = 0;
  me->tail = 0;
  initRelations(me);
  me->sema = OS_create_semaphore();
}
void GasDataQueue_Cleanup(GasDataQueue* const me) 
{
  OS_destroy_semaphore(me->sema);
  cleanUpRelations(me);
}
/*
Insert puts new gas data elements into the queue
if possible. It returns 1 if successful, 0 otherwise.
*/
int GasDataQueue_insert(GasDataQueue* const me, GasData g)
{
  OS_lock_semaphore(me->sema);
  if (!isFull(me)) {
    me->itsGasData[me->head] = g;
    me->head = getNextIndex(me, me->head);
    ++me->size;
    /* instrumentation */
    /* print stuff out, just to visualize the insertions */
    switch (g.gType) 
    {
      case O2_GAS:
      printf("+++ Oxygen ");
      break;
      case N2_GAS:
      printf("+++ Nitrogen ");
      break;
      case HE_GAS:
      printf("+++ Helium ");
      break;
      default:
      printf("UNKNWON ");
      break;
   };
   printf(" at conc %f, flow %d\n",g.conc,g.flowInCCPerMin);
   printf(" Number of elements queued %d, head = %d, tail = %d\n",
   me->size, me->head, me->tail);
   /* end instrumentation */
   OS_release_semaphore(me->sema);
   return 1;
  }
  else 
  { 
    /* return error indication */
    OS_release_semaphore(me->sema);
    return 0;
  }
}
/*
remove creates a new element on the heap, copies
the contents of the oldest data into it, and
returns the pointer. Returns NULL if the queue
is empty
*/
GasData * GasDataQueue_remove(GasDataQueue* const me) 
{
  GasData* gPtr;
  OS_lock_semaphore(me->sema);
  if (!isEmpty(me)) 
  {
    gPtr = (GasData*)malloc(sizeof(GasData));
    gPtr->gType = me->itsGasData[me->tail].gType;
    gPtr->conc = me->itsGasData[me->tail].conc;
    gPtr->flowInCCPerMin = me->itsGasData[me->tail].flowInCCPerMin;
    me->tail = getNextIndex(me, me->tail);
    /* instrumentation */
    switch (gPtr->gType) 
    {
       case O2_GAS:
       printf("— Oxygen ");
       break;
       case N2_GAS:
       printf("— Nitrogen ");
       break;
       case HE_GAS:
       printf("— Helium ");
       break;
       default:
       printf("— UNKNWON ");
       break;
    };
    printf(" at conc %f, flow %d\n",gPtr->conc,gPtr->flowInCCPerMin);
    printf(" Number of elements queued %d, head = %d, tail = %d\n",
    me->size, me->head, me->tail);
 /* end instrumentation */
   OS_release_semaphore(me->sema);
   return gPtr;
  }
  else 
  { /* if empty return NULL ptr */
    OS_release_semaphore(me->sema);
    return NULL;
  }
}
static int getNextIndex(GasDataQueue* const me, int index) 
{
   /* this operation computes the next index from the
   first using modulo arithmetic
   */
   return (index+1) % QUEUE_SIZE;
}
static unsigned char isEmpty(GasDataQueue* const me) 
{
  return (me->size == 0);
}
static unsigned char isFull(GasDataQueue* const me) 
{
  return (me->size == GAS_QUEUE_SIZE);
}
int GasDataQueue_getItsGasData(const GasDataQueue* const me) 
{
  int iter = 0;
  return iter;
}
GasDataQueue * GasDataQueue_Create(void) 
{
  GasDataQueue* me = (GasDataQueue *)
  malloc(sizeof(GasDataQueue));
  if(me!=NULL) 
  {
    GasDataQueue_Init(me);
   }
 return me;
}
void GasDataQueue_Destroy(GasDataQueue* const me)
{
  if(me!=NULL) 
  {
    GasDataQueue_Cleanup(me);
  }
  free(me);
}
static void initRelations(GasDataQueue* const me) 
{
  int iter = 0;
  while (iter < GAS_QUEUE_SIZE)
  {
    GasData_Init(&((me->itsGasData)[iter]));
    iter++;
  }
}
static void cleanUpRelations(GasDataQueue* const me) 
{
  int iter = 0;
  while (iter < GAS_QUEUE_SIZE)
  {
    GasData_Cleanup(&((me->itsGasData)[iter]));
    iter++;
  }
}

De posse destes dados, entretanto, tarefas mais úteis poderiam ser executadas a depender das necessidades da planta, como manter a concentração de determinado gás constante, através de algum tipo de controle retroalimentado, por exemplo.

A Figura 13 mostra o sistema em execução. A tarefa GasProcessingThread é disparada primeiro, com um período de 1000 ms e a a tarefa SensorThread é posteriormente disparada a um período de 500 ms. A fila tem 10 elementos. Note que apesar de cada um dos três sensores terem uma chance de 1/3 de produzir dados neste intervalo, os dados estão sendo mais rapidamente inseridos do que removidos, até que a fila enche. [1]

Figura 13. Queueing exemplo sendo executado

4.4. Rendez-Vous

Se as condições para coordenar uma task são mais complexas que as apresentadas anteriormente, quando fundamentalmente estávamos a proteger um recurso de acesso mútuo, podemos “concretizar” estas condições em um objeto de fato. O padrão Rende-Vouz modela as pré-condições para a coordenação de tasks, na forma de um objeto explicitamente separado com seus próprios dados e funções. É um padrão generalista aplicado para garantir que um conjunto de pré-condições arbitrárias sejam atendidas em tempo de execução. [1]

Figura 14. Rende-vouz pattern modelado em UML [1]

4.4.1 Rationale

Duas ou mais tarefas podem ser sincronizadas utilizando-se de uma estratégia a sua escolha que será codificada na classe Rendevouz. O padrão é facilmente adaptável. Quando uma thread encontra um ponto de sincronização, ela registra-se em um objeto da classe Rendevouz, e bloqueia-se até que este objeto a libere para ser executado, usando qualquer que seja a política de scheduling do sistema. É como se um veículo parasse em algum ponto de inspeção na estrada – o fiscal, i.e., o objeto Rendevouz, só a deixa seguir adiante se as condições (papelada, pneus, etc.), estiverem todas satisfeitas.

4.4.2. Elementos

A classe Rendezvous coordena as tarefas através de duas funções primárias:

void reset(void): reseta os critérios de sincronização, i.e., coloca-os de volta em suas condições iniciais.

void synchronize(void): este método é chamado quando uma task quer ser sincronizada. Se os critérios não estão satisfeitos, esta task será de alguma forma bloqueada. A estratégia para pode ser através de um Observer Pattern ou um Guarded Call Pattern, por exemplo. A complexidade deste padrão concentra-se primariamente neste método que avalia se as condições estão satisfeitas. Estas condições podem ser internas (como a Thread Barrier do padrão), externas, ou qualquer combinação das duas.

Normalmente o objeto RendeVouz é agnóstico em relação às threads que coordena. Se não o for, o método synchronize() terá um mais parâmetros para identificação das threads. [1]

O Semaphore é um semáforo comum, com as operações de lock e release.

SynchronizingThread representa uma task que utiliza o objeto Rendezvous para ser sincronizada: quando atinge seu ponto de sincronização, deve explicitamente chamar o método synchronize().

4.4.3 Implementação

Se o padrão for implementado com auxílio de um Observer Pattern, então as tasks precisam registrarem-se com o endereço de um callback a ser chamado quando os critérios de sincronização foram atendidos. Se o padrão Guarded Call Pattern (veja publicação anterior) for utilizado, então cada objeto RendezVous tem somente um único semáforo, que maneja uma fila de tasks que registraram-se naquele objeto RendezVous. Note que neste último caso, o padrão acaba por ser uma solução custosa (overkill), mas que simplifica a implementação de guarded calls, ao concentrá-las em um único componente do programa. É o típico trade-off reusabilidade/custo, que normalmente nos deparamos nas escolhas de design.

4.4.4 Exemplo

No exemplo da Figura 15, uma forma específica do padrão Rendezvous, conhecida como Thread Barrier Pattern [1] é implementada.

Figura 15. Rendezvous implementando uma Thread Barrier

Note que neste snippet, a única informação que o objeto recebe, fora do seu escopo, é o número de tarefas a serem sincronizadas, através do atributo expectedCount. Quando o número de tasks que invocaram o método synchronize() atinge 3, as tarefas são liberadas para o kernel manejá-las com a política de agendamento que utiliza. Os objetos do tipo semáforo e barrier são ponteiros que assumem a referência passada pela task que chamou o ThreadBarrier. O snippet não mostra nenhuma lógica que avalie informações externas, portanto estamos simplesmente a implementar um Guarded Call Pattern, de uma maneira bastante reutilizável. O importante é perceber o forte caráter extensível deste padrão.

Vale aqui replicar a Figura 3 da primeira parte do artigo:

Figura 16. Representação dinâmica de uma Thread Barrier; neste caso expectedCount = 4

Lista 6. Implementação em C do modelo descrito na Figura 15

/******************************
@file ThreadBarrier.h
******************************/
#ifndef ThreadBarrier_H
#define ThreadBarrier_H
/*# # auto_generated */
#include <oxf/Ric.h>
/*# # auto_generated */
#include "RendezvousExample.h"
/*# # auto_generated */
#include <oxf/RiCTask.h>
/*# # package RendezvousPattern::RendezvousExample */
/*# # class ThreadBarrier */
typedef struct ThreadBarrier ThreadBarrier;
struct ThreadBarrier {
  int currentCount;
  int expectedCount;
  OSSemaphore* barrier;
  OSSemaphore* mutex;
};
/* Constructors and destructors:*/
void ThreadBarrier_Init(ThreadBarrier* const me);
void ThreadBarrier_Cleanup(ThreadBarrier* const me);
/* Operations */
void ThreadBarrier_reset(ThreadBarrier* const me, int x);
void ThreadBarrier_synchronize(ThreadBarrier* const me);
ThreadBarrier * ThreadBarrier_Create(void);
void ThreadBarrier_Destroy(ThreadBarrier* const me);
#endif
// EOF
/******************************
@file ThreadBarrier.c
******************************/
#include "ThreadBarrier.h"
void ThreadBarrier_Init(ThreadBarrier* const me) 
{
  me->currentCount = 0;
  me->expectedCount = 3;
  if (me->barrier) 
  {
    OSSemaphore_lock(me->barrier);
    printf("BARRIER IS LOCKED FIRST TIME\n");
 }
}
void ThreadBarrier_Cleanup(ThreadBarrier* const me) 
{
  OSSemaphore_Destroy(me->barrier);
  OSSemaphore_Destroy(me->mutex);
}
void ThreadBarrier_reset(ThreadBarrier* const me, int x) 
{
  me->expectedCount = x;
  me->currentCount = 0;
}
void ThreadBarrier_synchronize(ThreadBarrier* const me) 
{
/*
protect the critical region around
the currentCount
*/
  OSSemaphore_lock(me->mutex);
  ++me->currentCount; /* critical region */
  OSSemaphore_release(me->mutex);
/*
are conditions for unblocking all threads met?
if so, then release the first blocked
thread or the highest priority blocked
thread (depending on the OS)
*/
  if (me->currentCount == me->expectedCount) 
  {
   printf("Conditions met\n");
   OSSemaphore_release(me->barrier);
   //let the scheduler do its job
  }
/*
lock the semaphore and when condition met
then release it for the next blocked thread
*/
  OSSemaphore_lock(me->barrier);
  /* code to check if condition met */
  OSSemaphore_release(me->barrier);
}
ThreadBarrier * ThreadBarrier_Create(void) 
{
  ThreadBarrier* me = (ThreadBarrier *) malloc(sizeof(ThreadBarrier));
  if(me!=NULL)
  {
    ThreadBarrier_Init(me);
    return me;
  }
void ThreadBarrier_Destroy(ThreadBarrier* const me) 
{
   if(me!=NULL)
   ThreadBarrier_Cleanup(me);
   free(me);
}

Fim da segunda parte.

Todos os padrões e exemplos aqui apresentados são de: 
[1] Douglass, Bruce Powel. Design patterns for embedded C: an embedded software engineering toolkit, 1st ed. ISBN 978-1-85617-707-8