A first preemptive kernel on ARM Cortex-M3

1. Introduction

When it comes to operating systems for embedded software, they are generally thought to be an overkill for most solutions. However, between a completely “hosted” multiapplication system (which uses a multi thread and general purpose OS) and a completely “standalone/monolithic” or “bare-metal” application specific system, there are many variations we can go for.

In previous publications I explored the notion of cooperative tasks, from a loop calling tasks from an array of function pointers, to something a little more complex with the processes being handled in a circular buffer, with explicit time criteria. This time I will show a minimal preemptive implementation, to also pave the way for something a little more complex, as in other publications.

2 Preemptive versus Cooperative

In a fully cooperative system, the processor does not interrupt any task to accept another, and the task itself needs to release the processor for the next one to use it. There is nothing wrong with this. A Run-To-Completion scheme is sufficient and / or necessary for many applications, and many embedded systems were deployed this way, including some very complex ones. In the past, even non-embedded systems used a cooperative kernel (Windows 3.x, NetWare 4.x, among others). If a task crashes, the entire system is compromised when we speak in a strictly cooperative way: it keeps the processor from going further (so in a server operating system like NetWare, this does not seem to be a good idea, because multiple clients are a must!).

In preemptive mode, tasks are interrupted and later resumed– i.e., a context (set of states in the processor registers) is saved and then retrieved. This leads to more complexity to the implementation but, if well done, it increases the robustness and the possibility of meeting narrower timing requirements, mainly if used with a priority and/or periodicity criteria to manage the queue.

3 Call stack

A processor is, in fact, a programmable finite-state machine. With some simplification, each state of our program can be defined within the set of core register values . This set dictates which program point is active. Therefore, activating a task means pushing values ​​to the call stack so that this task will be processed. This set of values is called context. To resume the task afterwards, it is necessary to save the call stack data at that point in the program. This “frozen” data represents a program state and is therefore called a stack frame. For every saved stackframe there is a context related. To resume a task, a previously saved stack frame is loaded back into the call stack.

In the ARM Cortex-M3, the 32-bit registers that define the active state of the processor are: R0-R12 for general use and R13-R15 registers for special use, in addition to the Program Status Register (xPSR) – its value is on the top of any stackframe, and it is not actually a single physical register, but a composition of three (Application, Interrupt and Execution: APSR, IPSR e EPSR).

Esta imagem possuí um atributo alt vazio; O nome do arquivo é armcortexm3arch-1.png

3.1. Load-store architectures

A load-store architecture is a processor architecture in which data from memory needs to be loaded to the core registers before being processed. Also, the result of this processing before being stored in memory must be in a register.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é registersm3-1.png

The two basic memory access operations on Cortex-M3:

// reads the data contained in the address indicated by Rn + offset and places it in Rn. 
LDR Rd, [Rn, #offset]
// stores data contained in Rn at the address pointed by Rd + offset
STR Rn, [Rd, #offset]

It is important to understand at least the Cortex-M3 instructions shown below. I suggest sources [1] and [2] as good references, in addition to this or this link.

MOV Rd, Rn // Rd = Rn
MOV Rd, #M // Rd = M, the immediate being a 32-bit value (here represented by M)
ADD Rd, Rn, Rm // Rd = Rn + Rm
ADD Rd, Rn, #M // Rd = Rn + M
SUB Rd, Rn, Rm // Rd = Rn - Rm
SUB Rd, Rn, #M // Rd = Rn - M
// pseudo-instruction to save Rn in a memory location.
// After a PUSH, the value of the stack pointer is decreased by 4 bytes
PUSH {Rn}
// POP increases SP by 4 bytes after loading data into Rn.
// this increase-decrease is based on the current address the SP is pointing to
POP {Rn}
B label // jump to routine label
BX Rm // jump to routine specified indirectly by Rm
BL label // jump to label and moves the caller address to LR

CPSID I // enable interrupts
CPSIE I // disable interrupts

We will operate the M3 in Thumb mode , where the instructions are actually 16 bits. According to ARM , this is done to improve code density while maintaining the benefits of a 32-bit architecture. Bit 24 of the PSR is always 1.

3.2. Stacks and stack pointer (SP)

Stack is a memory usage model [1]. It works in the Last In – First Out format (last to enter, first to leave). It is as if I organized a pile of documents to read. It is convenient that the first document to be read is at the top of the stack, and the last at the end.

We usually divide the memory between heap and stack . As said, the “call stack” will contain that temporary data that determines the next state of the processor. The heap contains data which nature is not temporary in the course of the program (this does not mean “non-volatile”). The stack pointer is a kind of pivot that keeps control of the program flow, by pointing to some position of the stack.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é stackusage.png
Figure 3. Model for using a stack. When saving data before processing (transforming) it saves the previous information. (Figure from [1])
Esta imagem possuí um atributo alt vazio; O nome do arquivo é annotation-2020-02-06-001056.png
Figure 4. Regions of memory mapped on a Cortex M3. The region available for the stack is confined between the addresses 0x20008000 and 0 0x20007C00. [1]

4 Multitasking on the ARM Cortex M3

The M3 offers two stack pointers (Main Stack Pointer and Process Stack Pointer) to isolate user processes of the process kernel. Every interrupt service runs in kernel mode . It is not possible to go from user mode to kernel mode (actually called thread mode and privileged mode) without going through an interruption – but it is possible to go from privileged mode to user mode by changing the control register.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é so.png
Figure 5. Changing context on an OS that isolates the kernel application [1]

The core also has dedicated hardware for switching tasks. The SysTick interrupt service can be used to implement synchronous context switching. There are still other asynchronous software interruptions (traps) like PendSV and SVC. Thus, SysTick is used for synchronous tasks in the kernel, while SVC serves asynchronous interruptions, when the application makes a call to the system. The PendSV  is a software interruption which by default can only be triggered in privileged mode. It is usually suggested [1] to trigger it within SysTick service, because it is possible to keep track of the ticks to meet the time criteria. The interruption by SysTick is soon served, with no risk of losing any tick of the clock. A secure OS implementation would use both stack pointers to separate user and kernel threads and separate memory domains if an MPU (Memory Protection Unit) is available. At first, we will only use the MSP in privileged mode.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é 2sps.png
Figure 6. Memory layout on an OS with two stack pointers and protected memory [1]

5. Building the kernel

Kernel is a somewhat broad concept, but I believe that there is no OS which kernel is not responsible for scheduling tasks. In addition, there must be IPC (inter-process communication) mechanisms. It is interesting to note the strong hardware-dependency of the scheduler that will be shown, due to its low-level nature .

5.1. Stackframes and context switching

Remember: call stack = the registers of the core ; stack or stackframe = state (values) of these registers saved in memory.

When a SysTick is served, part of the call stack is saved by the hardware (R0, R1, R2, R3, R12, R14 (LR) and R15 (PC) and PSR). Let’s call this portion saved by the hardware stackframe. The remaining is the software stackframe [3], which we must explicitly save and retrieve with the PUSH and POP instructions .

To think about our system, we can outline a complete context switch depicting the key positions the stack pointer assumes during the operation (in the figure below the memory addresses increase, from bottom to top. When SP points to R4 it is aligned with an address lower than the PC on the stack)

Esta imagem possuí um atributo alt vazio; O nome do arquivo é ctxtswitch-1.png
Figure 7. Switching contexts. The active task is saved by the hardware and the kernel. The stackpointer is re-assigned, according to pre-established criteria, to the R4 of the next stackframe to be activated. The data is rescued. The task is performed. (Figure based on [3]) (“Salvo pelo hardware/kernel” translates to “Saved /pushed by hardware/kernel”; “Resgatado pelo hardware/kernel” translates to “retrieved/popped by hardware/kernel”)

When an interruption takes place, SP will be pointing to the top of the stack (SP (O)) to be saved. This is inevitable because this is how the M3 works. In an interruption the hardware will save the first 8 highest registers in the call stack at the 8 addresses below the stack pointer, stopping at (SP (1)). When we save the remaining registers, the SP will now be pointing to the R4 of the current stack (SP (2)). When we reassign the SP to the address that points to the R4 of the next stack (SP (3)), the POP throws the values of R4-R11 to the call stack and the stack pointer is now at (SP (4)). Finally, the return from the interrupt pops the remaining stackframe, and the SP (5) is at the top of the stack that has just been activated. (If you’re wondering where R13 is: it stores the value of the stack pointer)

The context switching routine is written in assembly and implements exactly what is described in Figure 7.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é systick-2.png
Figure 8. Context switcher

PS: When an interruption occurs, the LR takes on a special code. 0xFFFFFFF9 , if the interrupted thread was using MSP or 0xFFFFFFFD if the interrupted thread was using PSP.

5.1 Initializing the stacks for each task

For the above strategy to work, we need to initialize the stacks for each task accordingly. The sp starts by pointing to R4. This is by definition the starting stack pointer of a task, as it is the lowest address in a frame .

In addition, we need to create a data structure that correctly points to the stacks that will be activated for each SysTick service . We usually call this structure a TCB (thread control block). For the time being we do not use any selection criteria and therefore there are no control parameters other than next: when a task is interrupted, the next one in the queue will be resumed and executed.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é struct.png
Figure 9. Thread control block
Esta imagem possuí um atributo alt vazio; O nome do arquivo é stackinit-3.png
Figure 10. Initializing the stack (the values representing the registers, like 0x07070707 are for debugging purposes)

The kSetInitStack function initializes the stack for each “i” thread . The stack pointer in the associated TCB points to the data relating to R4. The stack data is initialized with the record number that must be loaded to facilitate debugging. The PSR only needs to be initialized with bit 24 as 1, which is the bit that identifies Thumb mode . The tasks have the signature void Task (void * args) .

To add a task to the stack, we initially need the address of the main function of the task. In addition, we will also pass an argument. The first argument is in R0. If more arguments are needed, other registers can be used, according to AAPCS (ARM Application Procedure Call Standard).

Esta imagem possuí um atributo alt vazio; O nome do arquivo é addthreads-1.png
Figure 11. Routine for adding tasks and their arguments to the initial stackframe

5.3. Kernel start-up

It is not enough to initialize the stacks and wait for the SysTick. The TCB structure sp will only hold a valid stack pointer value when the task is interrupted. We have two types of threads running: background and foreground threads. The background includes the kernel routines, including the context switcher. At each SysTick, it is the kernel’s turn to use the processor. In the foreground are the applications.

If the task has not already been executed, the stack pointer saved in sp will not be valid. So we need to make it looks like the task has been executed, interrupted and saved – to be reactivated later. I used the following strategy:

  1. An interruption is forced (PendSV). Initial hardware stackframe is saved.
  2. tcb [0].sp is loaded in SP
  3. The R4 – R11 of the core are loaded with the values ​​of the initialized stackframe.
  4. ISR returns, retrieves the hardware stack frame and the SP will be at the top of the stack. The PC is now loaded with the address of the first call to be made, and the program follows the flow.
Esta imagem possuí um atributo alt vazio; O nome do arquivo é pendsv.png
Figure 12. PendSV interrupt service to boot the kernel

In [2] a very smarter way of starting up the kernel is suggested:

Esta imagem possuí um atributo alt vazio; O nome do arquivo é kstart-1.png
Figure 13. Routine for booting the kernel

The interruption is dispensed and the call stack is loaded by activating the LR with the PC value of the stack . After finally taking SP to the top of the stack, BX LR executes the task and returns.

If we use the first method presented, kStart is simply:

// using CMSIS lib 
void kStart(void) { SCB->ICSR |= SCB_ICSR_PENDSVSET_Msk; }

6. Putting it all together

To illustrate, we will perform, in Round-Robin 3 tasks that switch the output of 3 different pins and increment three different counters. The time between a change of context and another will be 1000 cycles of the main clock. Note that these functions run within a “while (1) {}”. It is like we have several main programs running on the foreground . Each stack has 64 x 4-byte elements (256 bytes).

Esta imagem possuí um atributo alt vazio; O nome do arquivo é tasks-1.png
Figure 14. System Tasks

Below the main function of the system. The hardware is initialized. Tasks are added and the stacks are initialized. The RunPtr receives the address of the thread.  After setting the SysTick to trigger every 1000 clock cycles, boot up the kernel . After executing the first task and being interrupted, the system is switching between one task and another, with the context switcher running in the background .

Esta imagem possuí um atributo alt vazio; O nome do arquivo é main.png
Figure 15. Main program

6.1. Debug

You will need at least a simulator to implement the system more easily, as you will need to access the core registers and see the data moving in the stacks. If the system is working, each time the debugger is paused, the counters should have almost the same value.

In the photo below, I use an Arduino Due board with an Atmel SAM3X8E processor and an Atmel ICE debugger connected to the board’s JTAG. On the oscilloscope you can see the waveforms of the outputs switching for each of the 3 tasks.

Esta imagem possuí um atributo alt vazio; O nome do arquivo é 20200209_163936.jpg
Figure 16. Debug bench
Esta imagem possuí um atributo alt vazio; O nome do arquivo é ds1z_quickprint1-1.png
Figure 17. Tasks 1, 2, 3 on the oscilloscope.

7 Conclusions

The implementation of a preemptive kernel requires reasonable knowledge of the processor architecture to be used. Loading the call stack registers and saving them in a “handmade” way allows us to have greater control of the system at the expense of the complexity of handling the stacks.

The example presented here is a minimal example where each task is given the same amount of time to be performed. After that time, the task is interrupted and suspended – the data from the call stack is saved. This saved set is called a stackframe – a “photograph” of the point at the program was. The next task to be performed is loaded at the point it was interrupted and resumed. The code was written in order to explain the concepts.

In the next publication we will split the threads between user mode and privileged mode – using two stack pointers – a fundamental requirement for a more secure system.

References

The text of this post as well as the non-referenced figures are from the author.
[1] The definitive guide to ARM Cortex-M3, Joseph Yiu
[2] Real-Time Operating Systems for the ARM Cortex-M, Jonathan Valvano
[3] https://www.embedded.com/taking-advantage-of-the-cortex-m3s-pre-emptive-context-switches/

Separating user space from kernel space on ARM Cortex-M3

1 . Introduction

ARM Cortex-M processors are in SoCs of several application domains, especially in much of what we call smart devices. This publication continues the previous one, in which I had demonstrated the implementation of a minimum preemptive scheduling mechanism for an ARM Cortex-M3, taking advantage of the special hardware resources for context switching.

Some important features of this architecture will now be used to improve the kernel development, such as the separation of user and kernel threads – with 2 stack pointers: MSP and PSP, besides the use of Supervisor Calls to implement system calls.

Although some concepts about operating systems are addressed because they are inherent to the subject, the objective is to explore the ARM Cortex-M3, a relatively inexpensive processor with wide applicability, and how to take advantage of it to develop more robust systems.

2. Special registers

There are 3 special registers on the ARM Cortex-M3. You can consult the ARM documentation to understand the role of each of them in the processor. The most important in this publication is CONTROL .

  • Program Status Registers (APSR, IPSR, and EPSR)
  • Interrupt Mask Registers (PRIMASK, FAULTMASK and BASEPRI)
  • Control (CONTROL) .

Special registers can only be accessed through the privileged instructions MRS (ARM Read Special Register) and MSR (ARM Set Special Register):

// Load in R0 the current value contained in the special register 
MRS R0, SPECIAL
// Load in the special register the value contained in R0)
MSR SPECIAL, R0

The CONTROL register has only 2 configurable bits. When an exception handler (e.g, SysTick_Handler) is running, the processor will be in privileged mode and using the main stack pointer (MSP) , and CONTROL [1] = 0, CONTROL [0] = 0 . In other routines that are not handlers, this register can assume different values ​​depending on the software implementation (Table 1).

In the small kernel shown before, the application tasks (Task1, Task2 and Task3) were also executed in privileged mode and using the main stack pointer (MSP). Thus, an application program could change the special registers of the core if it wanted to.

3. Kernel and user domains

In the last publication I highlighted the fact that register R13 is not part of the stackframe, as it stores the address of the current stack pointer. The R13 is a “banked” register, meaning that it is physically replicated, and takes a value or another depending on the state of the core.

CTRL [1] (0 = MSP / 1 = PSP)CTRL [0] (0 = Priv, 1 = Non priv )state
00Privileged handler * / Base mode
01Unprivileged
10Privileged thread
11User thread
Table 1. Possible states of the CONTROL register
* in exception handlers, this mode will always be active even if CTRL [0] = 1.

With two stack pointers, one for application and another for the kernel, means that a user thread can not easily corrupt the kernel stack by a programming application error or malicious code. According to the ARM manuals, a robust operating system typically has the following characteristics:

  • interrupt handlers use MSP (by default)
  • kernel routines are activated through SysTick at regular intervals to perform task scheduling and system management in privileged mode
  • user applications use PSP in non-privileged mode
  • memory for kernel routines can only be accessed in privileged mode* and use MSP

* for now we will not isolate the memory spaces

4. System Calls

Putting it simple, a system call is a method in which a software requests a service from the kernel or OS on which it is running. If we intend to separate our system into privilege levels, it is inevitable that the application level needs call the kernel to have access to, for example, hardware services or whatever else we think is critical to the security and stability of our system.

A common way to implement system calls in ARM Cortex-M3 (and in other ARMv7) is to use the software interrupt Supervisor Call (SVC). The SVC acts as an entry point for a service that requires privileges to run. The only input parameter of an SVC is its number (ASM instruction: SVC #N), which we associate with a function call (callback). Unlike other exceptions triggered via software available, like PendSV (Pendable Supervisor Call), the SVC can be triggered in user mode by default.

Figure 2. Block diagram of a possible operating system architecture. Supervisor Calls act as an entry point for privileged services. [2]

5. Design

5.1 Using two stack pointers

To use the two available stack pointers (MSP and PSP) it is essential to understand 2 things:

  • The control register manipulation: it is only possible to write or read the CONTROL register in handler mode (within an exception handler) or in privileged threads.
  • The exceptions mechanism: when an interrupt takes place, the processor saves the contents of registers R0-R3, LR, PC and xPSR, as explained in the previous publication. The value of the LR when we enter an exception indicates the mode the processor was running, when the thread was interrupted. We can manipulate this value of LR together with the manipulation of the stack pointer to control the program flow.
LRBX LR
0xFFFFFFF9Returns to “base” mode, privileged MSP. (CONTROL = 0b00)
0xFFFFFFFDReturns to user mode (PSP, with the privilege level of entry) (Control = 0b1x)
0xFFFFFFF1Returns to the previous interruption, in case a higher priority interruption occurs during a lower priority.
Table 2. Exception return values

5.1.1. One kernel stack for each user stack

Each user stack will have a correspondent kernel stack (one kernel stack per thread). Thus, each Task is associated to a kernel stack and a user stack. Another approach would be only one kernel stack for the entire the system (one kernel stack per processor). The advantage of using the first approach is that from the point of view of who implements the system, the programs that run in the kernel follow the same development pattern as the application programs. The advantage of the second approach is less memory overhead and less latency in context switching.

Figure 3. Each user stack has an associated kernel stack

5.2 Kernel entry and exit mechanisms

In the previous publication, the interruption triggered by SysTick handled the context switching, i.e., it interrupted the running thread, saved its stackframe, searched for the next thread pointed to by the next field in the TCB (thread control block) structure and resumed it.

With the separation between user and supervisor spaces, we will have two mechanisms to get in and out the kernel, the system calls, explicitly called in code, and the interruption by SysTick that implements the scheduling routine. Although still using a round-robin scheme in which each task has the same time slice, the threads of the kernel also work cooperatively with a user thread that evoked it, that is: when there is nothing more to be done, the kernel can explicitly return. If the thread of the kernel takes longer than the time between one tick and another, it will be interrupted and rescheduled. User tasks could also use a similar mechanism, however, for simplicity of exposure, I chose to leave user tasks only in a fixed round-robin scheme, with no cooperative mechanisms.

5.2.1. Scheduler

The flowchart of the preemptive scheduler to be implemented is in Figure 4. The start-up of the kernel and user application is also shown for clarity. The kernel starts upo and voluntarily triggers the first user task. At every SysTick interruption, the thread has its state saved and the next scheduled task is resumed according to the state in which it was interrupted: kernel or user mode.

Figura 4. Scheduler flowchart

5.2.2 System Calls

System Calls are needed when the user requests access to a privileged service. In addition, I also use the same mechanism for a kernel thread to cooperatively return to the user thread.

Note that I chose not to run the kernel threads within the SVC handler, which would be more intuitive, as well as more usual. The reasons for this are because I wanted to take advantage of the processor’s own interrupt mechanism that when returning POPs registers RO-R3, LR, PC and xPSR, and also to avoid the interruption nesting during the preemption of kernel tasks.

If I had chosen to use only one kernel stack for all threads, the implementation within the handler itself I think would be better. Choices, design choices…

Figure 5. System Call flowchart

6. Implementation

Below I explain the codes created to implement previously described features proof of concept. Most of the kernel itself is written in assembly, except for a portion of the supervisor calls handler that is written in C with some inline assembly. In my opinion, more cumbersome and susceptible to errors than writing in assembly is to embed assembly in C code. The toolchain used is the GNU ARM.

6.1. Stacks

There is nothing special here, except that now in addition to the stack user declare another array of integers for the stack of the kernel . These will be associated in the Thread Control Block.

int32_t p_stacks[NTHREADS][STACK_SIZE]; // user stack
int32_t k_stacks[NTHREADS][STACK_SIZE]; // kernel stack

6.2. Task Scheduler

The main difference from this to the scheduler shown in the last publication is that we will now handle two different stack pointers: the MSP and the PSP. Thus, when entering an exception handler, the portion of the stackframe saved automatically depends on the stack pointer used when the exception took place. However, in the exception routine, the active stack pointer is always the MSP. Thus, in order to be able to handle a stack pointer when we are operating with another, we will cannot use the PUSH and POP pseudo-instructions because they have the active stack pointer as their base address . We will have to replace them with the instructions LDMIA (load multiple and increment after) for POP, and STMDB (store multiple decrement before) for PUSH, with the writeback sign “!” at the base address [1] .

// Example of POP 
MRS R12, PSP // reads the value of the process stack pointer in R12
LDMIA R12!, {R4-R11} // R12 contains the base address (PSP)
/ * the address contained in R12 now stores the value from R4; [R12] + 0x4
contains the value of R5, and so on until [R12] + 0x20 contains the
value of R11.
the initial value of R12 is also increased by 32 bytes
* /
MSR PSP, R12 // PSP is updated to the new value of R12

// Example of PUSH
MSR R12, MSP
STMDB R12!, {R4-R11}
/ * [R12] - 0x20 contains R4, [R12] - 0x16 contains R5, ..., [R12] contains R4
the initial value of R12 is decremented by 32 bytes * /
MSR MSP, R12 // MSP is updated to the new value of R12

Another difference is that the TCB structure now needs to contain a pointer to each of the stack pointers of the thread it controls, and also a flag indicating whether the task to be resumed was using MSP or PSP when it was interrupted.

// thread control block
struct tcb 
{
  int32_t*  psp; //psp saved from the last interrupted thread
  int32_t*  ksp; //ksp saved from the last interrupted kernel thread
  struct tcb    *next; //points to next tcb
  int32_t   pid; //task id
  int32_t   kernel_flag; // 0=kernel, 1=user    
};
typedef struct tcb tcb_t;
tcb_t tcb[NTHREADS]; //tcb array
tcb_t* RunPtr; 

The scheduler routines are shown below. The code was written so it is clear in its intention, without trying to save instructions. Note that in line 5 the value of LR at the entry of the exception is only compared with 0xFFFFFFFD, if false it is assumed that it is 0xFFFFFFFF9, this is because I guarantee that there will be no nested interrupts (SysTick never interrupts an SVC, for example), so the LR should never assume 0xFFFFFFF1. If other than a proof of concept, the test should be considered.

.global SysTick_Handler
.type SysTick_Handler, %function
SysTick_Handler:            
CPSID   I // atomic begins
CMP LR, #0xFFFFFFFD // were we at an user thread?
BEQ SaveUserCtxt    // if yes
B   SaveKernelCtxt  //if no
 
SaveKernelCtxt:
MRS R12, MSP 
STMDB   R12!, {R4-R11}  //push R4-R11
MSR MSP, R12
LDR R0,=RunPtr      
LDR R1, [R0]
LDR R2, [R1,#4]
STR R12, [R2]  //saves stack pointer
B   Schedule
 
SaveUserCtxt:
MRS R12, PSP
STMB    R12!, {R4-R11}
MSR PSP, R12
LDR R0,=RunPtr
LDR R1, [R0]
STR R12, [R1]       
B   Schedule
 
Schedule:
LDR R1, =RunPtr //R1 <- RunPtr
LDR R2, [R1]    
LDR R2, [R2,#8] //R2 <- RunPtr.next
STR R2, [R1]    //updates RunPtr
LDR R0, =RunPtr
LDR R1, [R0]
LDR R2, [R1,#16]
CMP R2, #1       //if kernel_flag=1
BEQ ResumeUser   //yes, resume user thread
B   ResumeKernel //no, resume kernel thread
 
ResumeUser:
LDR R1, =RunPtr  //R1 <- RunPtr
LDR R2, [R1]
LDR R2, [R2]
LDMIA   R2!, {R4-R11} //Pop sw stackframe 
MSR PSP, R2     
MOV LR, #0xFFFFFFFD //LR=return to user thread
CPSIE   I       //atomica end
BX  LR 
 
ResumeKernel:
LDR R1, =RunPtr   //R1 <- RunPtr
LDR R2, [R1]
LDR R2, [R2, #4]
MSR MSP, R2
LDMIA   R2!, {R4-R11} //Retrieves sw stackframe 
MSR MSP, R2
MOV LR, #0xFFFFFFF9  //LR=return to kernel thread
CPSIE   I        //atomic end
BX  LR 

6.3 System Calls

The implementation of system calls uses the SVC Handler. As stated, SVC has a unique input parameter (ARM makes it sounds like an advantage…), that is the number we associate with a callback. But then how do we pass the arguments forward to the callback, if we can make system calls with only one input parameter? They need to be retrieved from the stack. The AAPCS (ARM Application Procedure Call Standard) which is followed by compilers, says that when a function (caller) calls another function (callee), the callee expects its arguments to be in R0-R3. Likewise, the caller expects the callee return value to be in R0. R4-R11 must be preserved between calls. R12 is the scratch register and can be freely used.

No wonder that when an exception takes place the core saves (PUSH) the registers R0-R3, LR, PC and xPSR from the interrupted function, and when returning put them (POP) again in the core registers. It is fully prepared to get back to the same point when it was interrupted. But if we change the context, that is, after the interruption we do not return to the same point we were before, there will be a need to explicitly save the remaining stackframe so this thread can be resumed properly later. It is essential to follow the AAPCS if we want to evoke functions written in assembly from C code and vice-versa.

To system calls, I defined a macro function in C that receives the SVC code and the arguments for the callback (the syntax of inline assembly depends on the compiler used).

#define SysCall(svc_number, args) {                                       
                                                                        
    __ASM volatile ("MOV R0, %0 "     :: "r"            (args) );     
    __ASM volatile ("svc %[immediate]"::[immediate] "I" (svc_number) : );   
}

(There is a reason I created a macro and not a common function: it has to do with the return point to the user thread and the fact that the kernel callbacks are not executed within the exception, which requires changing the context. If I had created a common function for the system call, the user stack pointer would be saved within the call, and upon returning from the kernel, the SVC would be executed again. If you know how to run the system call outside the handler routine without using a macro, please let me know!)

The args value is stored in R0. The SVC call is made with the immediate “svc_number”. When the SVC is triggered, R0-R3 will be automatically saved to the stack. The code was written as follows, without saving instructions, for clarity:

global SVC_Handler
.type   SVC_Handler, %function
 SVC_Handler:
 MRS R12, PSP        //saves psp
 CMP LR, #0xFFFFFFFD
 BEQ KernelEntry
 B   KernelExit
 
//saves user context
KernelEntry: 
MRS R3, PSP
STMDB   R3!, {R4-R11}
MSR PSP, R3
LDR R1,=RunPtr
LDR R2, [R1]
STR R3, [R2]    
LDR R3, =#0  
STR R3, [R1, #16] //kernel flag = 0
MOV R0, R12   //gets r0 from CORE to retrieve SVC number
B svchandler_main //branch to C routine
KernelExit:
//retrieves user context
LDR R0, =RunPtr
LDR R1, [R0]
LDR R2, [R1]
LDMIA   R2!, {R4-R11}
MOV LR, #0xFFFFFFFD
MSR PSP, R2
LDR R12, =#1 //kernel flag = 1
STR R12, [R1, #16]
BX  LR 

The rest of the routine for entering the kernel is written in C [2, 3]. Note that in the routine written in assembly a simple branch occurs (line 20) and therefore we have not yet returned from the exception handler .

The svc_number, in turn, is retrieved by walking two bytes (hence the cast to char) out of the address of the PC that is 6 positions above R0 in the stack [1, 2, 3]. Note that it was necessary to assign to R0 the value contained in PSP shortly after entering the interrupt, before saving the rest of the stack (lines 4 and 19 of the assembly code).

After retrieving the system call number and its arguments, the MSP is overwritten with the value stored in the TCB. Then we change the value of LR so the exception returns to the base mode. The callback does not run within the handler. When the BX LR instruction is executed, the remaining of the stackframe is automatically activated onto the core registers.

#define SysCall_GPIO_Toggle  1 //svc number for gpio toggle
#define SysCall_Uart_PrintLn 2 //svc number for uart print line
 
void svchandler_main(uint32_t * svc_args)
{       
    uint32_t svc_number;
    uint32_t svc_arg0;
    uint32_t svc_arg1;
    svc_number = ((char *) svc_args[6])[-2]; // recupera o imediato 
    svc_arg0 = svc_args[0];
    svc_arg1 = svc_args[1]; 
  
 switch(svc_number)
 {
 case SysCall_GPIO_Toggle: 
    k_stacks[RunPtr->pid][STACK_SIZE-2] = (int32_t)SysCallGPIO_Toggle_; //PC
    k_stacks[RunPtr->pid][STACK_SIZE-8] = (int32_t)svc_arg0; //R0
    k_stacks[RunPtr->pid][STACK_SIZE-1] = (1 << 24); // T=1 (xPSR)
    __ASM volatile ("MSR MSP, %0" : : "r" (RunPtr->ksp) : );
    __ASM volatile ("POP {R4-R11}");
    __ASM volatile ("MOV LR, #0xFFFFFFF9");
    __ASM volatile ("BX LR"); //returns from exception
    break;
 case SysCall_Uart_PrintLn: 
    k_stacks[RunPtr->pid][STACK_SIZE-2] = (int32_t)SysCallUART_PrintLn_; 
    k_stacks[RunPtr->pid][STACK_SIZE-8] = (int32_t)svc_arg0;
    k_stacks[RunPtr->pid][STACK_SIZE-1] = (1 << 24); // T=1
    __ASM volatile ("MSR MSP, %0" : : "r" (RunPtr->ksp) : );
    __ASM volatile ("POP {R4-R11}");
    __ASM volatile ("MOV LR, #0xFFFFFFF9");
    __ASM volatile ("BX LR"); //returns from exception
    break;
 default:
    __ASM volatile("B SysCall_Dummy");
    break;
 break;
 }
}

A callback looks like this:

static void SysCall_CallBack_(void* args)
{
    BSP_Function((int32_t*) args); //BSP function with one argument int32
    exitKernel_(); // leaves cooperatively
}

6.4. Start-up

The start-up is a critical point. The system starts in base mode. The stacks are assembled. The first task to be performed by the kernel after booting the system is to configure SysTick, switch to user mode and trigger the first user thread .

The assembly routines for the star-up are as follows:

.equ SYSTICK_CTRL, 0xE000E010 
.equ TIME_SLICE,    999
 
.global kStart 
.type kStart, %function
kStart:
LDR R0, =RunPtrStart
LDR R1, [R0]
LDR R2, [R1,#4]
MSR MSP, R2   // MSP <- RunPtr.ksp
POP {R4-R11}  //loads stackframe 0 at call stack
POP {R0-R3}
POP {R12}
ADD SP, SP, #4
POP {LR}     //LR <- PC = UsrAppStart
ADD SP, SP, #4
BX  LR // branches to UsrAppStart
 
//this function manages the stack to run the first user thread
.global UsrAppStart 
.type   UsrAppStart, %function
UsrAppStart:                
LDR R1, =RunPtr //R1 <- RunPtr
LDR R2, [R1]        
LDR R2, [R2]
MSR PSP, R2
BL  SysTickConf //configures systick
MOV R0, #0x3
MSR CONTROL, R0 //thread unprivileged mode
ISB         // inst set barrier: guarantees CONTROL is updated before going
POP {R4-R11}   //loads stackframe 0
POP {R0-R3}
POP {R12}
ADD SP, SP, #4
POP {LR}       //LR <- PC
ADD SP, SP, #4
BX LR
     
SysTickConf:
LDR R0, =SYSTICK_CTRL 
MOV R1, #0
STR R1, [R0]  // resets counter
LDR R1, =TIME_SLICE  
STR R1, [R0,#4] // RELOAD <- TIME_SLICE
STR R1, [R0,#8] // CURR_VALUE <- TIME_SLICE
MOV R1, #0x7   // 0b111:
            // 1: Clock source = core clock 
            // 1: Enables irq
            // 1: Enables counter
STR R1, [R0]        
BX  LR      //get back to caller

7. Test

For a little demonstration, we will write on the PC screen via UART. The callback for the system call was written as follows:

static void SysCallUART_PrintLn_(const char* args)
{
    __disable_irq();
    uart_write_line(UART, args);        
// waits until transmission is done
    while (uart_get_status(UART) != UART_SR_TXRDY); 
    __enable_irq();
    exitKernel_(); // exit kernel cooperatively
}

It is necessary to be careful when using multitasking to use any shared resource, since we have not yet inserted any inter-process communication mechanism. However, the operation is at a “Guarded Region”, and it will not be interrupted by SysTick. The main program is as follows:

#include <commondefs.h> //board support package, std libs, etc.
#include <kernel.h>  
#include <tasks.h>
 
int main(void)
{
  kHardwareInit(); 
  kAddThreads(Task1, (void*)"Task1nr", Task2, (void*)"Task2nr", Task3, (void*)"Task3nr");
  RunPtrStart = &tcbs[0]; 
  RunPtr = &tcbs[1];
  uart_write_line(UART, "Inicializando kernel...nr");
  delay_ms(500); //delay to print screen  : P
  kStart(); 
  while(1);
}

The tasks (main threads) look like this:

void Task1(void* args)
{
    const char* string = (char*)args;
    while(1)
    {
        SysCall(SysCall_Uart_PrintLn, string);
    }
}

In the figure below, the running system:

8. Conclusions

The use of two stack pointers, one for application and another for the kernel isolates these spaces not allowing the application to corrupt the kernel stack. The privileges prevent the user from overwriting special registers, keeping the kernel safe from application programming errors or malicious code.

Adding another stack pointer to the system required changes to the scheduling routine because we now manipulate two stacks in the domain of two different stack pointers, and both can be preempted. In addition, a cooperative mechanism has also been added for kernel exiting.

The one kernel stack per user stack approach makes the development of kernel or application routines to follow the same pattern from the perspective of who is writing the system. The price to pay is memory overhead and more latency when switching contexts. To mitigate the last, cooperative mechanisms can be added as shown. To mitigate the memory overhead more attention should be put when modeling the tasks (or concurrent units), so they are efficiently allocated.

The system call mechanism is used as an entry point to hardware services, or whatever else we deem critical for the security and stability of the system. This will make even more sense by separating not only the stacks at privilege levels but also the memory regions with the MPU .

For the next publications we will: 1) create IPC mechanisms, 2) add priority levels to the tasks that are running on a fixed round-robin scheme 3) configure the MPU

9. References

[1] http://infocenter.arm.com/help/topic/com.arm.doc.ddi0337e/DDI0337E_cortex_m3_r1p1_trm.pdf

[2] The definitive Guide to ARM Cortex M3, Joseph Yiu

[3] https://developer.arm.com/docs/dui0471/j/handling-processor-exceptions/svc-handlers-in-c-and-assembly-language

Padrões de design para comunicação interprocessos em software embarcado (2/3)

«Primeira parte»

4.3. Queuing Pattern

Quando tasks têm baixo acomplamento temporal, i.e., virtualmente1 o tempo em que um processo consumidor leva para resgatar o resultado de um processo produtor não é condição para diferenciar falha de sucesso, o padrão de Message Queueing é bastante comum.

O processo produtor envia uma mensagem a uma fila (um buffer, normalmente circular) e algum tempo depois o processo consumidor resgata esta mensagem para seguir adiante. A mensagem pode ser desde um simples booleano ou qualquer outra estrutura mais complexa.

1 isto não significa que o sistema não tenha condições temporais. Significa somente que determinadas tasks não têm relação temporal rígida entre si naquele ponto de sincronização (coordenação).

Figura 10. Message Queue para coordenar processos [2]
Figura 11. Generalização do padrão Queuing em UML

4.3.1. Rationale

Se os processos são fracamente acoplados no tempo, podemos coordená-los (ou sincronizá-los) com um mecanismo assíncrono: um processo envia a mensagem e o outro lê “quando puder”.

4.3.2 Elementos

Na Figura 11, diversas tarefas QTask compartilham uma única MessageQueue. Um Mutex agregado a esta fila de mensagens, coordena o acesso de cada tarefa à fila de mensagens. A fila tem QUEUE_SIZE elementos do tipo Message, que podem ser qualquer dado/comando – desde um simples booleano até uma PDU do tipo CAN, por exemplo. Este tamanho deve comportar o pior caso para filas no sistema1. Poderíamos dispensar o Mutex se agregássemos a cada processo a sua própria caixa de mensagens, obviamente ao custo de utilizarmos mais memória.

1 Um dos métodos empregados para calcular o tamanho da fila é o Teorema de Little

4.3.3. Implementação

No que diz respeito à complexidade, este tipo de mecanismo de IPC é relativamente simples, mas existem muitas variantes. Algumas mensagens podem ser mais prioritárias que outras, em sistemas complexos o tamanho da fila é difícil de ser optimizado e algumas implementações fazem uso de alocação dinâmica (o que em embarcados, é controverso) ou até mesmo guardam mensagens pouco prioritárias no sistema de arquivos [1]. A utilização de linked lists para implementação também é uma abordagem que conta com a flexibilidade do próximo item a ser lido ser passado por referência, e com esta flexibilidade também os potenciais problemas que o extensivo uso de ponteiros trás.

Neste artigo um scheduler cooperativo manejava as tarefas em fila, onde aquelas mais urgentes (baseados na deadline) eram colocadas no início da fila. Fundamentalmente isto é um mecanismo de comunicação interprocessos do padrão queueing: a fila coordena o disparo dos processos que concorrem para utilizar um recurso: o microprocessador.

4.3.4. Exemplo

A Figura 12 mostra um sistema, em que sensores de determinados gases compartilham um queue para registrar os valores medidos na estrutura de dados GasData. Um GasDataQueue tem GAS_QUEUE_SIZE elementos do tipo GasData. Este por sua vez contém um enum GAS_TYPE.

Figura 12. Queing Example [1]

O programa GasDataQueue.c é o núcleo deste design pattern, com os métodos para incluir novos dados na fila e remover os antigos (este código mostra uma boa implementação de buffer circular, um workhorse em sistemas embebidos). A Figura 12 nos diz que duas threads, GasProcessingThread e SensorThread, compartilham o recurso GasDataQueue, como consumidor e provedor respectivamente.

A SensorThread atua provendo os dados à fila: atualiza os dados recebidos pelos sensores e os concatena para corretamente alocá-los em uma das estruturas GasData de GasDataQueue. Este processo de alocar dados de um objeto em uma outra estrutura, para transmiti-los, representá-los e/ou armazená-los de forma significativa, em computação é conhecido como marshalling.

Do lado consumidor, a thread GasProcessingThread periodicamente resgata dados da fila. No exemplo em questão, os dados estão somente sendo impressos na tela de um computador.

Lista 5. Implementação em C do modelo descrito na Figura 12

/*********************************
@file GasDataExample.h
*********************************/
#ifndef QueuingExample_H
#define QueuingExample_H
struct GasController;
struct GasData;
struct GasDataQueue;
struct GasDisplay;
struct GasProcessingThread;
struct HeSensor;
struct N2Sensor;
struct O2Sensor;
struct OSSemaphore;
struct SensorThread;
typedef enum GAS_TYPE {
 O2_GAS,
 N2_GAS,
 HE_GAS,
 UNKNOWN_GAS
} GAS_TYPE;
/* define the size of the queue */
#define GAS_QUEUE_SIZE (10)
/* OS semaphore services */
struct OSSemaphore* OS_create_semaphore(void);
void OS_destroy_semaphore(struct OSSemaphore* sPtr);
void OS_lock_semaphore(struct OSSemaphore* sPtr);
void OS_release_semaphore(struct OSSemaphore* sPtr);
#endif

/*********************************
@file GasData.h
*********************************/
#ifndef GasData_H
#define GasData_H
#include "QueuingExample.h"
typedef struct GasData GasData;
struct GasData {
double conc;
unsigned int flowInCCPerMin;
GAS_TYPE gType;
};
/* Constructors and destructors:*/
void GasData_Init(GasData* const me);
void GasData_Cleanup(GasData* const me);
GasData * GasData_Create(void);
void GasData_Destroy(GasData* const me);
#endif

//EOF
/*********************************
@file GasDataQueue.h
*********************************/
#ifndef GasDataQueue_H
#define GasDataQueue_H
#include "QueuingExample.h"
#include "GasData.h"
#include "OSSemaphore.h"
typedef struct GasDataQueue GasDataQueue;
struct GasDataQueue {
  int head;
  OSSemaphore * sema;
  int size;
  int tail;
  struct GasData itsGasData[GAS_QUEUE_SIZE];
};
/* Constructors and destructors:*/
void GasDataQueue_Init(GasDataQueue* const me);
void GasDataQueue_Cleanup(GasDataQueue* const me);
/* Operations */
int GasDataQueue_insert(GasDataQueue* const me, GasData g);
GasData * GasDataQueue_remove(GasDataQueue* const me);
int GasDataQueue_getItsGasData(const GasDataQueue* const me);
GasDataQueue * GasDataQueue_Create(void);
void GasDataQueue_Destroy(GasDataQueue* const me);
#endif

//EOF
/*********************************
@file GasDataQueue.c
*********************************/
#include "GasDataQueue.h"
#include <stdio.h>
/* private (static) methods */
static void cleanUpRelations(GasDataQueue* const me);
static int getNextIndex(GasDataQueue* const me, int index);
static unsigned char isEmpty(GasDataQueue* const me);
static unsigned char isFull(GasDataQueue* const me);
static void initRelations(GasDataQueue* const me);

void GasDataQueue_Init(GasDataQueue* const me) 
{
  me->head = 0;
  me->sema = NULL;
  me->size = 0;
  me->tail = 0;
  initRelations(me);
  me->sema = OS_create_semaphore();
}
void GasDataQueue_Cleanup(GasDataQueue* const me) 
{
  OS_destroy_semaphore(me->sema);
  cleanUpRelations(me);
}
/*
Insert puts new gas data elements into the queue
if possible. It returns 1 if successful, 0 otherwise.
*/
int GasDataQueue_insert(GasDataQueue* const me, GasData g)
{
  OS_lock_semaphore(me->sema);
  if (!isFull(me)) {
    me->itsGasData[me->head] = g;
    me->head = getNextIndex(me, me->head);
    ++me->size;
    /* instrumentation */
    /* print stuff out, just to visualize the insertions */
    switch (g.gType) 
    {
      case O2_GAS:
      printf("+++ Oxygen ");
      break;
      case N2_GAS:
      printf("+++ Nitrogen ");
      break;
      case HE_GAS:
      printf("+++ Helium ");
      break;
      default:
      printf("UNKNWON ");
      break;
   };
   printf(" at conc %f, flow %d\n",g.conc,g.flowInCCPerMin);
   printf(" Number of elements queued %d, head = %d, tail = %d\n",
   me->size, me->head, me->tail);
   /* end instrumentation */
   OS_release_semaphore(me->sema);
   return 1;
  }
  else 
  { 
    /* return error indication */
    OS_release_semaphore(me->sema);
    return 0;
  }
}
/*
remove creates a new element on the heap, copies
the contents of the oldest data into it, and
returns the pointer. Returns NULL if the queue
is empty
*/
GasData * GasDataQueue_remove(GasDataQueue* const me) 
{
  GasData* gPtr;
  OS_lock_semaphore(me->sema);
  if (!isEmpty(me)) 
  {
    gPtr = (GasData*)malloc(sizeof(GasData));
    gPtr->gType = me->itsGasData[me->tail].gType;
    gPtr->conc = me->itsGasData[me->tail].conc;
    gPtr->flowInCCPerMin = me->itsGasData[me->tail].flowInCCPerMin;
    me->tail = getNextIndex(me, me->tail);
    /* instrumentation */
    switch (gPtr->gType) 
    {
       case O2_GAS:
       printf("— Oxygen ");
       break;
       case N2_GAS:
       printf("— Nitrogen ");
       break;
       case HE_GAS:
       printf("— Helium ");
       break;
       default:
       printf("— UNKNWON ");
       break;
    };
    printf(" at conc %f, flow %d\n",gPtr->conc,gPtr->flowInCCPerMin);
    printf(" Number of elements queued %d, head = %d, tail = %d\n",
    me->size, me->head, me->tail);
 /* end instrumentation */
   OS_release_semaphore(me->sema);
   return gPtr;
  }
  else 
  { /* if empty return NULL ptr */
    OS_release_semaphore(me->sema);
    return NULL;
  }
}
static int getNextIndex(GasDataQueue* const me, int index) 
{
   /* this operation computes the next index from the
   first using modulo arithmetic
   */
   return (index+1) % QUEUE_SIZE;
}
static unsigned char isEmpty(GasDataQueue* const me) 
{
  return (me->size == 0);
}
static unsigned char isFull(GasDataQueue* const me) 
{
  return (me->size == GAS_QUEUE_SIZE);
}
int GasDataQueue_getItsGasData(const GasDataQueue* const me) 
{
  int iter = 0;
  return iter;
}
GasDataQueue * GasDataQueue_Create(void) 
{
  GasDataQueue* me = (GasDataQueue *)
  malloc(sizeof(GasDataQueue));
  if(me!=NULL) 
  {
    GasDataQueue_Init(me);
   }
 return me;
}
void GasDataQueue_Destroy(GasDataQueue* const me)
{
  if(me!=NULL) 
  {
    GasDataQueue_Cleanup(me);
  }
  free(me);
}
static void initRelations(GasDataQueue* const me) 
{
  int iter = 0;
  while (iter < GAS_QUEUE_SIZE)
  {
    GasData_Init(&((me->itsGasData)[iter]));
    iter++;
  }
}
static void cleanUpRelations(GasDataQueue* const me) 
{
  int iter = 0;
  while (iter < GAS_QUEUE_SIZE)
  {
    GasData_Cleanup(&((me->itsGasData)[iter]));
    iter++;
  }
}

De posse destes dados, entretanto, tarefas mais úteis poderiam ser executadas a depender das necessidades da planta, como manter a concentração de determinado gás constante, através de algum tipo de controle retroalimentado, por exemplo.

A Figura 13 mostra o sistema em execução. A tarefa GasProcessingThread é disparada primeiro, com um período de 1000 ms e a a tarefa SensorThread é posteriormente disparada a um período de 500 ms. A fila tem 10 elementos. Note que apesar de cada um dos três sensores terem uma chance de 1/3 de produzir dados neste intervalo, os dados estão sendo mais rapidamente inseridos do que removidos, até que a fila enche. [1]

Figura 13. Queueing exemplo sendo executado

4.4. Rendez-Vous

Se as condições para coordenar uma task são mais complexas que as apresentadas anteriormente, quando fundamentalmente estávamos a proteger um recurso de acesso mútuo, podemos “concretizar” estas condições em um objeto de fato. O padrão Rende-Vouz modela as pré-condições para a coordenação de tasks, na forma de um objeto explicitamente separado com seus próprios dados e funções. É um padrão generalista aplicado para garantir que um conjunto de pré-condições arbitrárias sejam atendidas em tempo de execução. [1]

Figura 14. Rende-vouz pattern modelado em UML [1]

4.4.1 Rationale

Duas ou mais tarefas podem ser sincronizadas utilizando-se de uma estratégia a sua escolha que será codificada na classe Rendevouz. O padrão é facilmente adaptável. Quando uma thread encontra um ponto de sincronização, ela registra-se em um objeto da classe Rendevouz, e bloqueia-se até que este objeto a libere para ser executado, usando qualquer que seja a política de scheduling do sistema. É como se um veículo parasse em algum ponto de inspeção na estrada – o fiscal, i.e., o objeto Rendevouz, só a deixa seguir adiante se as condições (papelada, pneus, etc.), estiverem todas satisfeitas.

4.4.2. Elementos

A classe Rendezvous coordena as tarefas através de duas funções primárias:

void reset(void): reseta os critérios de sincronização, i.e., coloca-os de volta em suas condições iniciais.

void synchronize(void): este método é chamado quando uma task quer ser sincronizada. Se os critérios não estão satisfeitos, esta task será de alguma forma bloqueada. A estratégia para pode ser através de um Observer Pattern ou um Guarded Call Pattern, por exemplo. A complexidade deste padrão concentra-se primariamente neste método que avalia se as condições estão satisfeitas. Estas condições podem ser internas (como a Thread Barrier do padrão), externas, ou qualquer combinação das duas.

Normalmente o objeto RendeVouz é agnóstico em relação às threads que coordena. Se não o for, o método synchronize() terá um mais parâmetros para identificação das threads. [1]

O Semaphore é um semáforo comum, com as operações de lock e release.

SynchronizingThread representa uma task que utiliza o objeto Rendezvous para ser sincronizada: quando atinge seu ponto de sincronização, deve explicitamente chamar o método synchronize().

4.4.3 Implementação

Se o padrão for implementado com auxílio de um Observer Pattern, então as tasks precisam registrarem-se com o endereço de um callback a ser chamado quando os critérios de sincronização foram atendidos. Se o padrão Guarded Call Pattern (veja publicação anterior) for utilizado, então cada objeto RendezVous tem somente um único semáforo, que maneja uma fila de tasks que registraram-se naquele objeto RendezVous. Note que neste último caso, o padrão acaba por ser uma solução custosa (overkill), mas que simplifica a implementação de guarded calls, ao concentrá-las em um único componente do programa. É o típico trade-off reusabilidade/custo, que normalmente nos deparamos nas escolhas de design.

4.4.4 Exemplo

No exemplo da Figura 15, uma forma específica do padrão Rendezvous, conhecida como Thread Barrier Pattern [1] é implementada.

Figura 15. Rendezvous implementando uma Thread Barrier

Note que neste snippet, a única informação que o objeto recebe, fora do seu escopo, é o número de tarefas a serem sincronizadas, através do atributo expectedCount. Quando o número de tasks que invocaram o método synchronize() atinge 3, as tarefas são liberadas para o kernel manejá-las com a política de agendamento que utiliza. Os objetos do tipo semáforo e barrier são ponteiros que assumem a referência passada pela task que chamou o ThreadBarrier. O snippet não mostra nenhuma lógica que avalie informações externas, portanto estamos simplesmente a implementar um Guarded Call Pattern, de uma maneira bastante reutilizável. O importante é perceber o forte caráter extensível deste padrão.

Vale aqui replicar a Figura 3 da primeira parte do artigo:

Figura 16. Representação dinâmica de uma Thread Barrier; neste caso expectedCount = 4

Lista 6. Implementação em C do modelo descrito na Figura 15

/******************************
@file ThreadBarrier.h
******************************/
#ifndef ThreadBarrier_H
#define ThreadBarrier_H
/*# # auto_generated */
#include <oxf/Ric.h>
/*# # auto_generated */
#include "RendezvousExample.h"
/*# # auto_generated */
#include <oxf/RiCTask.h>
/*# # package RendezvousPattern::RendezvousExample */
/*# # class ThreadBarrier */
typedef struct ThreadBarrier ThreadBarrier;
struct ThreadBarrier {
  int currentCount;
  int expectedCount;
  OSSemaphore* barrier;
  OSSemaphore* mutex;
};
/* Constructors and destructors:*/
void ThreadBarrier_Init(ThreadBarrier* const me);
void ThreadBarrier_Cleanup(ThreadBarrier* const me);
/* Operations */
void ThreadBarrier_reset(ThreadBarrier* const me, int x);
void ThreadBarrier_synchronize(ThreadBarrier* const me);
ThreadBarrier * ThreadBarrier_Create(void);
void ThreadBarrier_Destroy(ThreadBarrier* const me);
#endif
// EOF
/******************************
@file ThreadBarrier.c
******************************/
#include "ThreadBarrier.h"
void ThreadBarrier_Init(ThreadBarrier* const me) 
{
  me->currentCount = 0;
  me->expectedCount = 3;
  if (me->barrier) 
  {
    OSSemaphore_lock(me->barrier);
    printf("BARRIER IS LOCKED FIRST TIME\n");
 }
}
void ThreadBarrier_Cleanup(ThreadBarrier* const me) 
{
  OSSemaphore_Destroy(me->barrier);
  OSSemaphore_Destroy(me->mutex);
}
void ThreadBarrier_reset(ThreadBarrier* const me, int x) 
{
  me->expectedCount = x;
  me->currentCount = 0;
}
void ThreadBarrier_synchronize(ThreadBarrier* const me) 
{
/*
protect the critical region around
the currentCount
*/
  OSSemaphore_lock(me->mutex);
  ++me->currentCount; /* critical region */
  OSSemaphore_release(me->mutex);
/*
are conditions for unblocking all threads met?
if so, then release the first blocked
thread or the highest priority blocked
thread (depending on the OS)
*/
  if (me->currentCount == me->expectedCount) 
  {
   printf("Conditions met\n");
   OSSemaphore_release(me->barrier);
   //let the scheduler do its job
  }
/*
lock the semaphore and when condition met
then release it for the next blocked thread
*/
  OSSemaphore_lock(me->barrier);
  /* code to check if condition met */
  OSSemaphore_release(me->barrier);
}
ThreadBarrier * ThreadBarrier_Create(void) 
{
  ThreadBarrier* me = (ThreadBarrier *) malloc(sizeof(ThreadBarrier));
  if(me!=NULL)
  {
    ThreadBarrier_Init(me);
    return me;
  }
void ThreadBarrier_Destroy(ThreadBarrier* const me) 
{
   if(me!=NULL)
   ThreadBarrier_Cleanup(me);
   free(me);
}

Fim da segunda parte.

Todos os padrões e exemplos aqui apresentados são de: 
[1] Douglass, Bruce Powel. Design patterns for embedded C: an embedded software engineering toolkit, 1st ed. ISBN 978-1-85617-707-8

Padrões de design para comunicação interprocessos em software embarcado (1/3)

A quem interessar: A depender do dispositivo em que escrevi parte dos textos, algumas palavras estão grafadas em português brasileiro (o meu nativo) ou português europeu (quando o auto corretor assim desejou).

1. Introdução

Nos dois últimos artigos demonstrei um pequeno kernel com escalonador de tarefas cíclico, porém preemptivo (round-robin) implementado em um ARM Cortex-M3. Além do escalonamento de tarefas também separavam-se os processos de usuário dos processos de supervisor, o que trouxe a necessidade da implementação de mecanismos de System Calls.

Durante os artigos, muitas vezes mencionei que o kernel ainda não estava preparado para a sincronização de tarefas e gerenciamento de recursos, ou de forma geral, não havia mecanismos para comunicação interprocessos.

Se quisermos generalizar o uso deste kernel – i.e., reutilizá-lo para uma solução em que as tasks não atuam isoladamente, um ou mais mecanismos de IPC é fundamental. Se por um lado os processos precisam cooperar para atingir determinados objetivos, por outro eles concorrem pelo uso limitado de recursos. [4]

2. IPCs: proteger e coordenar

O fenômeno que nos leva à necessidade de comunicação interprocessos é o da  Concorrência: os recursos finitos de um computador faz com que seja necessário coordenar as diversas actividades que concorrem simultaneamente ou paralelamente. Isto nos leva a ter que proteger uma atividade dos efeitos de outra e sincronizar atividades que são mutualmente dependentes. [4]

Figura 1. Multithreading e multiprocessing, conceitualmente [2]

Na publicação passada em que três tasks acessavam o mesmo hardware para imprimir na tela do PC, a coordenação para compartilhar o recurso foi feita com o sinal de TX_READY lido no registro do próprio dispositivo. Independente da quota de tempo que cada task tinha para ser executada, o kernel só liberava o processador após garantir que os dados da UART haviam sido transmitidos ao computador. Uma forma de IPC que não abstrai o hardware (UART) ou a aplicação (transmitir dados).

static void SysCallUART_PrintLn_(const char* args)
{
    __disable_irq(); // sessão crítica inicio
    uart_write_line(UART, args); // transmite dados        
    while (uart_get_status(UART) != UART_SR_TXRDY); //aguarda fim da transmissão
    __enable_irq(); // sessão crítica fim
    exitKernel_();
}

Lista 0. System Call lendo diretamente o sinal de TX_READY em uma critical region para coordenar as tarefas de acesso à UART

2.1. A Unidade Concorrente

O design do modelo de concorrência é uma parte crítica na arquitetura do sistema a ser desenvolvido. Vamos definir Process, Task e Thread.

Entendo um Process como algo estático (previamente definido) e encapsulado. Uma Task é um processo em execução. As Threads são tasks que acabam por ser dependentes entre si: uma consequência da topologia do sistema em que estes processos são executados [2, 3]. Se analisarmos a Figura 1, é intuitivo notar que a multithreading implica em complexidade de implementação em troca de economia/limitação de recursos.

Para efetivamente entender estes conceitos e modelar a arquitectura mais adequada ao gerenciamento da concorrência de tarefas em um dado sistema, a melhor definição que eu conheço é a de Unidade Concorrente [1]: 

«Unidade Concorrente é um conjunto de ações seqüenciais no mesmo segmento de execução. Também conhecido como task ou thread.» Em [2] uma Thread é definida como o «contexto de um processo em execução» – i.e., o contexto de uma Task – o que não se sobrepõe ou contradiz a definição dada por [1], e complementa a ideia de que uma thread é essencialmente dinâmica. Não à toa, a estrutura de dados em que salvamos o contexto de uma task é classicamente chamada de Thread Control Block.

Powell em [1] ainda observa: «muitos fazem uma distinção entre Task, Thread e Process. Entretanto, todos são o mesmo conceito em diferentes escopos.» Novamente analise a Figura 1. Se o “escopo”for a granularidade, diante do que consta na literatura apresentada, podemos dizer sem danos que: a thread é a menor unidade concorrente de um sistema quando em execução.

Figura 2. A, B, C e D são Unidades Concorrentes: a) A compartilhar um único core (pseudoconcorrentes)  b) Cada unidade tem seu próprio core c) Em execução, somente uma unidade concorrente estará activa em um dado instante de tempo (adaptado de [3])

2.2 A necessidade de sincronização

Antes de prosseguirmos, mais uma definição:

«Pontos de sincronização são aqueles em que as tasks compartilharão informações e/ou garantirão que determinadas pré-condições estejam satisfeitas antes  de prosseguirem.» [1]

A Figura 3 ilustra esta ideia. A barreira de sincronização só permite que o programa siga adiante quando todos os processos tiverem atingido aquele ponto – i.e., as condições para que todas as ações do sistema estejam corretamente sincronizados são verdadeiras.

Figura 3. a) Processos em execução b) Todos os processos exceto C estão prontos c) Assim que o processo C atinge o ponto de sincronização, o programa segue adiante. [3]

Eu prefiro o termo coordenação a sincronização, porque sincronia não é necessariamente um fenômeno desejável ou relacionado entre duas partes, portanto, gosto da ideia de pontos de coordenação. A literatura entretanto normalmente utiliza o termo sincronizado como sinônimo de coordenado.

A Figura 4 mostra unidades concorrentes, modeladas em três tasks. O que sabemos sobre elas? Se a analisarmos individualmente fica claro que na Task 1 a Acção A ocorre antes da B, que percorre um caminho condicional até chegar a E. O mesmo raciocínio  vale para as Tasks 2 e 3.

De que forma as acções individuais das Tasks 1, 2, e 3 relacionam-se entre si? Se a resposta (verdadeira) for “não se relacionam, ou não importa“, não há nenhum problema de concorrência a ser resolvido. E provavelmente o teu sistema é muito simples ou  muito bem desenhado.

Figura 4. Unidades concorrentes representadas como fluxogramas [1]

Entretanto, se em algum ponto é necessário garantir, por exemplo, que as Acções F, X e Zeta sejam executadas somente depois que as Ações E, Z e Gamma atingirem determinadas condições, precisamos de alguma forma reunir (join) cada um dos estados destas tasks em logicamente uma única thread, a partir de um ponto de sincronização. Após as condições necessárias terem sido atingidas no ponto de sincronização, as threads são novamente separadas (fork), e o programa segue.

Isto significa que sempre que houver condições entre dois ou mais processos para atingir determinado objectivo, precisamos criar mecanismos para que estes se comuniquem.

2.2.1. Mecanismos de Exclusão mútua

Recursos podem ser classificados como compartilháveis ou não compartilháveis. Isto depende da natureza física ou lógica de determinado recurso [4]. Por exemplo, a menor unidade de memória do sistema não pode ser lida e escrita ao mesmo tempo por dois processos distintos. Já uma estrutura de dados que guarda um resultado para ser consumido por um processo, precisa ser protegida para não ser sobrescrita antes de consumida (ou enquanto consumida, se pensarmos em pseudo-paralelismo), porque isto causará uma falha lógica no sistema. A exclusão mútua coordena o acesso de várias unidades concorrentes a um recurso.

2.2.2. Falha por deadlock

Quando processos concorrem por recursos, podemos chegar em uma situação em que nenhum deles consegue seguir adiante. Suponhamos dois processos A e B. O processo A está bloqueado, a espera de que um recurso seja liberado para seguir adiante. O processo B por sua vez só vai liberar este recurso quando determinada condição for satisfeita. Se esta condição depender do processo A, que já está bloqueado, temos um deadlock

3. Modelagem de Tasks em UML

Em UML há 3 maneiras de modelar concorrência [1]: objetos com o estereótipo «active» em diagramas estruturais, forks e joins em diagramas de seqüência, e regiões ortogonais em diagramas de máquinas de estado. Um objeto com o estereótipo «active» em geral é uma estrutura de outros objetos. O objeto ativo tem uma thread associada, que executa a ações sequenciais que correm em um contexto (contexto este que pode ou não estar representado ou inferido do diagrama). O símbolo de um objeto ativo pode tanto dar-se como um objeto comum com o estereótipo «active», ou com as bordas esquerda e direita destacadas. Na Figura 5, uma Task é modelada com cinco objetos ativos, um recurso (database) compartilhado e protegido por um semáforo e comentários com os metaparâmetros de cada thread associada (prioridade, período e deadline – parâmetros estes que dependem escalonador utilizado). Objetos «active» que não são um conjunto de outros objetos, como itsBuiltInTestThread  ou itsControlThread correm na sua própria thread. Por fim, em alguns casos utilizamos portas nas Tasks para conectarem-se diretamente com objetos – o que geralmente indica que passaremos estes valores por cópia, e em outros utilizamos links, o que normalmente indica que estes valores serão passados por referência.

4. Padrões para exclusão mútua

Quatro padrões de design serão aqui apresentados com os seus respectivos modelos UML: Critical Region, Guarded Call, Message Queue e Rendez-vous. Todos estes padrões estão descritos em [1].

4.1. Critical Region Pattern

Esta é a forma mais primitiva de coordenar tasks: desligar os mecanismos que poderiam interromper um trecho de execução, para evitar que a task ceda lugar a outra antes de as condições necessárias serem atendidas, ou para evitar a concorrência de acesso a um recurso.

Figura 6.Generalização do Padrão Critical Section em UML [1]

4.1.1. Rationale

Este padrão é utilizado em sistemas com escalonamento preemptivo, quando múltiplas unidades concorrentes podem acessar um mesmo recurso ou interromper uma tarefa em andamento. Ao desabilitarmos a troca de tarefas em determinado trecho de execução, garantimos a atomicidade da operação.

4.1.2 Elementos

CRSharedResource: O recurso a ser protegido é o  atributo value. Qualquer trecho de código que utilize os métodos setValue e getValue precisam ser colocados em regiões críticas.

TaskWithSharedResource: Este elemento simboliza o conjunto de tasks que acessam o recurso anteriormente mencionado (por isso a multiplicidade ‘*’ na associação de dependência). As tarefas não têm conhecimento de que recurso esta a ser utilizado; ele está encapsulado dentro do objeto CRSharedResource.

4.1.3. Implementação

Usualmente o kernel provê uma API com funções como osDisableTaskSwitch()/osEnableTaskSwitch(). Caso não, alguma diretiva em C/ASM pode ser utilizada para desabilitar as interrupções diretamente em hardware – no caso da API de abstração de hardware CMSIS para processadores ARM, esta diretiva seria __disable_irq()/__enable_irq(), e estamos na verdade a desabilitar toda e qualquer interrupção do sistema. Na publicação anterior, vários trechos do código do kernel estavam em critical regions porque necessitavam ser atômicos.

4.1.4. Exemplo

Na Figura 6, um hipotético sistema para mover o braço de um robô. Uma única Task move este braço (CRRobotArmManager). As entradas do usuário são as coordenadas (x, y, z) no espaço,  passadas por referência à Task.

É importante notar a estratégia para desacoplamento entre a Task e os objetos a ela associados: as regiões críticas são implementadas dentro dos métodos da Task, e não no método moveTo, por exemplo. A definição de regiões críticas dentro do método moveTo fatalmente acoplaria uma entidade que não é da natureza do kernel, o braço do robô - que deve poder ser movimentado por outros sistemas operacionais - à uma entidade totalmente acoplada ao kernel, a Task. Se não houver justificativa, o contrário é uma má escolha de design.
Figura 7. Exemplo de Task modelada que necessita do uso de sessão crítica [1]

Abaixo, o cabeçalho do programa, e a implementação de uma operação para facilitar a compreensão da linguagem UML [1]:

Lista 1. Cabeçalho da Task CRRobotArmManager [1]

#ifndef CRRobotArmManager_H
#define CRRobotArmManager_H
struct CRDisplay;
struct RobotArm;
struct UserInput;
typedef struct CRRobotArmManager CRRobotArmManager;
struct CRRobotArmManager 
{
  struct CRDisplay* itsCRDisplay;
  struct RobotArm* itsRobotArm;
  struct UserInput* itsUserInput;
};
/* Constructors and destructors:*/
void CRRobotArmManager_Init(CRRobotArmManager* const me);
void CRRobotArmManager_Cleanup(CRRobotArmManager* const me);
/* Operations */
void CRRobotArmManager_motorZero(CRRobotArmManager* const me);
void CRRobotArmManager_moveRobotArm(CRRobotArmManager* const me);
struct CRDisplay* CRRobotArmManager_getItsCRDisplay(const
CRRobotArmManager* const me);
void CRRobotArmManager_setItsCRDisplay(CRRobotArmManager* const
me, struct CRDisplay* p_CRDisplay);
struct RobotArm* CRRobotArmManager_getItsRobotArm(const
CRRobotArmManager* const me);
void CRRobotArmManager_setItsRobotArm(CRRobotArmManager* const me,
struct RobotArm* p_RobotArm);
struct UserInput* CRRobotArmManager_getItsUserInput(const
CRRobotArmManager* const me);
void CRRobotArmManager_setItsUserInput(CRRobotArmManager* const me,
struct UserInput* p_UserInput);
CRRobotArmManager * CRRobotArmManager_Create(void);
void CRRobotArmManager_Destroy(CRRobotArmManager* const me);
#endif

Lista 2. Método com o uso de critical regions [1]

void CRRobotArmManager_moveRobotArm(CRRobotArmManager* const me) 
{
 /* local stack variable declarations */
 int x, y, z, success = 1; 
 /*noncritical region code */
 /* note that the function below has its own critical region and so cannot be
 called inside of the critical region of this function
 */
 CRRobotArmManager_motorZero(me);
 x = UserInput_getX(me->itsUserInput);
 y = UserInput_getY(me->itsUserInput);
 z = UserInput_getX(me->itsUserInput);
 /* critical region begins */
 OS_disable_task_switching();
 /* critical region code */
 success = RobotArm_moveTo(me->itsRobotArm, x, y, z);
 /* critical region ends */
 OS_enable_task_switching();
 /* more noncritical region code */
 CRDisplay_printInt(me->itsCRDisplay, "Result is ", success);
}

4.2. Guarded Call Pattern

O padrão Guarded Call, serializa o acesso a um conjunto de serviços que potencialmente causam danos se chamados simultaneamente. Neste padrão, o acesso serializado é implementado através de um semáforo que previne que outras threads invoquem este serviço enquanto ele estiver em uso.

Figura 8. Generalização do Guarded Call Pattern modelado em UML [1]

4.2.1 Rationale

Múltiplas tarefas preemptivas acessam um recurso protegido invocando os métodos deste. O escalonador de tarefas por sua vez deve colocar a task que tentou acessar um recurso ocupado em uma fila de tasks bloqueadas, desbloqueando-a assim que o recurso for liberado.

4.2.2. Elementos

GuardedResource é um recurso compartilhado que emprega um Semaphore do tipo mutex para forçar a exclusão mútua entre múltiplas PreemptiveTasks (note que existe um único semáforo agregado a um único recurso, e portanto é um semáforo para um elemento, ou seja, um mutex1). Quando uma tarefa quer utilizar este recurso, as funções relevantes deste chamarão ao semáforo para colocá-lo em lock. Caso o recurso esteja livre, ele é locked e a tarefa pode utilizá-lo, e liberá-lo ao final. Caso o recurso já esteja em locked, este sinaliza ao escalonador para colocar a tarefa atual em estado blocked até que ele seja liberado. Outras tarefas que tentam acessar o recurso enquanto ele está bloqueado também tornar-se-ão blocked. Cabe ao escalonador do sistema fazer o manejo desta fila de tarefas bloqueadas a espera do recurso; ou seja, quais critérios serão utilizados para cada tarefa na fila – pode ser a prioridade, por exemplo.2

1 a implementação de semáforos/ mutexes é um assunto que será abordado na continuidade da construção do kernelzito já apresentado.

2 um problema comum nesta implementação é quando ocorre a chamada “inversão de prioridade”

4.2.3 Implementação

A parte mais complicada é a implementação dos semáforos. Normalmente o sistema operacional provê semáforos, acessados através de uma API do tipo:

  • Semaphore* osCreateSemaphore(): cria um semáforo e retorna um ponteiro para a instância.
  • void osDestroySemaphore(Semaphore*): destrói a instância semáforo para o qual aponta.
  • void osLockSemaphore(Semaphore*): trava o dispositivo associado ao semáforo se ele estiver livre. Caso não esteja, o contexto da tarefa que o requisitou é salvo, bem como a ID do semáforo do recurso requisitado, e a tarefa é colocada em blocked.
  • void osUnlockSemaphore(Semaphore*): destrava o dispositivo associado ao semáforo, e libera a tarefa que estava a sua espera.

4.2.4 Exemplo

A Figura 9 mostra a porção de um hipotético controlador de vôo. A classe KinematicData é compartilhada por diversos clientes. Para garantir a serialização de acesso um semáforo é instanciado neste recurso.

Figura 9. Padrão Guarded Pattern aplicado a um controlador de vôo [1]

Isto evita que o método FlightDataDisplay::ShowFlightData()seja interrompido enquanto disponibiliza os dados de vôo, pelo método Navigator::updatePosition() por exemplo. Perceba que as Tasks não estão modeladas. A utilização do semáforo está acoplada ao escalonador – o seu estado quando a thread é disparada influencia no manejo da fila de threads no qual cada método em espera será executado. Abaixo snippets de parte da implementação para facilitar o entendimento.

Lista 3. Cabeçalho da classe KinematicData

#ifndef KinematicData_H
#define KinematicData_H
#include "GuardedCallExample.h"
#include "Attitude.h"
#include "OSSemaphore.h"
#include "Position.h"
typedef struct KinematicData KinematicData;
struct KinematicData 
{
 struct Attitude attitude;
 struct Position position;
 OSSemaphore* sema; /*## mutex semaphore */
};
/* Constructors and destructors:*/
void KinematicData_Init(KinematicData* const me);
void KinematicData_Cleanup(KinematicData* const me);
/* Operations */
Attitude KinematicData_getAttitude(KinematicData* const me);
struct Position KinematicData_getPosition(KinematicData* const me);
void KinematicData_setAttitude(KinematicData* const me, Attitude a);
void KinematicData_setPosition(KinematicData* const me, Position p);
KinematicData * KinematicData_Create(void);
void KinematicData_Destroy(KinematicData* const me);
#endif

Lista 4. Implementação da classe KinematicData

#include "KinematicData.h"
void KinematicData_Init(KinematicData* const me) 
{
  Attitude_Init(&(me->attitude));
  Position_Init(&(me->position));
  me->sema = OS_create_semaphore();
}
void KinematicData_Cleanup(KinematicData* const me) 
{
  OS_destroy_semaphore(me->sema);
}
struct Position KinematicData_getPosition(KinematicData* const me) 
{
  Position p;
  /* engage the lock */
  OS_lock_semaphore(me->sema);
  p = me->position;
  /* release the lock */
  OS_release_semaphore(me->sema);
  return p;
}
void KinematicData_setAttitude(KinematicData* const me, Attitude a) 
{
/* engage the lock */
  OS_lock_semaphore(me->sema);
  me->attitude = a;
  /* release the lock */
  OS_release_semaphore(me->sema);
}
void KinematicData_setPosition(KinematicData* const me, Position p) 
{
  /* engage the lock */
  OS_lock_semaphore(me->sema);
  me->position = p;
  /* release the lock */
  OS_release_semaphore(me->sema);
}
KinematicData * KinematicData_Create(void) 
{
  KinematicData* me = (KinematicData *)
  malloc(sizeof(KinematicData));
  if(me!=NULL) 
  {
   KinematicData_Init(me);
  }
  return me;
}
void KinematicData_Destroy(KinematicData* const me) 
{
  if(me!=NULL) 
  {
   KinematicData_Cleanup(me);
  }
  free(me);
}
Attitude KinematicData_getAttitude(KinematicData* const me) 
{
  Attitude a;
  /* engage the lock */
  OS_lock_semaphore(me->sema);
  a = me->attitude;
  /* release the lock */
  OS_release_semaphore(me->sema);
  return a;
}

Fim da primeira parte

Referências:
[1] Design Patterns for Embedded Systems in C (Douglass Powell, 2009)
[2] Embedded Systems: Real-Time Operating Systems for Arm Cortex-M MCUs (Jonathan Valvanno, 2012)
[3] Modern Operating Systems (Andrew Taneunbaum, Herbert Boss, 2014)
[4] Fundamentals of Operating Systems (A. M. Lister, 1984)

Separando espaços de supervisor e usuário no ARM Cortex-M3

In English

1. Introdução

Os processadores ARM Cortex-M estão integrados a SoCs para domínios diversos, principalmente em muito daquilo que chamamos de dispositivos smart. Esta publicação é uma continuação da anterior, em que demonstrei a implementação de um mecanismo preemptivo mínimo para um ARM Cortex-M3, aproveitando-se dos recursos de hardware especiais para a troca de contexto.

Alguns recursos importantes desta arquitetura serão agora demonstrados como a separação de threads de usuário e kernel – com 2 stack pointers: MSP e PSP e a utilização de Supervisor Calls para implementar chamadas ao sistema.

Apesar de alguns conceitos sobre sistemas operacionais serem abordados porque são inerentes ao assunto, o objetivo é a exploração do ARM Cortex-M3, um processador relativamente barato e de larga aplicabilidade, e como aproveitá-lo para desenvolver sistemas mais robustos.

2. Registradores especiais

São 3 os registradores especiais no ARM Cortex-M3. Você pode consultar a documentação da ARM para entender o papel de cada um deles no processador. O mais importante nesta publicação é o CONTROL.

  • Program Status Registers (APSR, IPSR, e EPSR)
  • Interrupt Mask Registers (PRIMASK, FAULTMASK e BASEPRI)
  • Control (CONTROL).

Registradores especiais somente podem ser acessado através das instruções privilegiadas MRS (ARM Read Special Register) e MSR (ARM Set Special Register):

//Carrega em R0 o atual valor contido no registrador especial
MRS R0, SPECIAL 
//Carrega no registrador especial o valor contido em R0)
MSR SPECIAL, R0 

O registrador CONTROL tem somente 2 bits configuráveis. Quando um serviço de excepção (e.g., SysTick_Handler) estiver a ser executado, o processador estará em modo privilegiado e utilizando o main stack pointer (MSP), e CONTROL[1] = 0, CONTROL[0] = 0. Em outras rotinas que não sejam handlers, este registrador pode assumir diferentes valores a depender da implementação do software (Tabela 1).

No pequeno kernel mostrado anteriormente, as tasks da aplicação (Task1, Task2 e Task3) também eram executadas em modo privilegiado e utilizando o stack pointer principal (MSP). Assim, um programa da aplicação poderia alterar os registradores especiais do core se quisesse.

3. Sistema com 2 stack pointers e diferentes privilégios

Na última publicação ressaltei o fato de o registrador R13 não fazer parte do stackframe, pois é ele justamente que guarda o endereço do stack pointer. O R13 é um registrador do tipo “banked” (não conheço uma boa tradução para português), significando que ele é fisicamente replicado, e assume um valor ou outro a depender do estado do core.

CTRL[1] (0=MSP/1=PSP)CTRL[0] (0=Priv, 1=Non priv)Estado
00Privileged handler* / Base mode
01Unprivileged
10Privileged thread
11User thread
Tabela 1. Estados possíveis do registrador CONTROL
*em rotinas que atendem às exceções este modo estará sempre ativo mesmo que o CTRL[0] = 1.

Com dois stack pointers, um para aplicação e outro para o kernel, significa que uma thread de usuário não poderá facilmente corromper o stack pointer do kernel por um erro de programação na aplicação. Os privilégios por sua vez evitam que a aplicação sobrescreva registradores especiais. De acordo com os manuais da ARM, um sistema operacional robusto tipicamente tem as seguintes características:

  • serviços de interrupção utilizam o MSP (por default da arquitetura)
  • rotinas do kernel são ativadas através do SysTick em intervalos regulares para executar, em modo privilegiado, o escalonamento das tarefas e gerenciamento do sistema
  • aplicações de usuário são executadas como threads, usam o PSP em modo não-privilegiado
  • a memória para as stacks do kernel e handlers são apontadas pelo MSP, sendo que os dados stack só podem ser acessados em modo privilegiado*
  • a memória para a stack das aplicações é apontada pelo PSP

*por enquanto não vamos isolar os espaços de memória

4. Chamadas ao sistema (System Calls)

Em uma perspectiva simples, uma chamada de sistema é um método no qual um software requisita um serviço/mecanismo do kernel ou SO sobre o qual está rodando: gerenciamento de processos (escalonamento, mecanismos de IPC), acesso a algum recurso de hardware, sistema de arquivos, entre outros, a depender da arquitetura do sistema operacional.

Se pretendemos separar nosso sistema em níveis de privilégio é inevitável que o nível da aplicação precise fazer chamadas ao kernel para ter acesso a, por exemplo, serviços de hardware ou o que mais julgarmos crítico para a segurança e estabilidade do sistema.

Uma maneira comum de implementar system calls no ARM Cortex-M3 (e em outros cores baseados em ARMv7) é a utilização da interrupção por software chamada Supervisor Call (SVC). O SVC funciona como um ponto de entrada para um serviço que necessita de privilégios para ser executado. O único parâmetro de entrada de um SVC é o seu número (instrução ASM: SVC #N), ao qual associamos a uma chamada de função (callback). Ao contrário da outra excepção disparada via software disponível, que é o PendSV (Pendable Supervisor Call), o SVC pode ser disparado em modo usuário. Apesar de serem destinadas a diferentes usos – o PendSV é tipicamente usado como forma de “agendar” tarefas do kernel menos críticas e não perder ticks do sistema, é possível configurar o core para que usuários também possam disparar o PendSV.

Figura 2. Diagrama de blocos de uma possível arquitetura de sistema operacional no M3. Supervisor Calls funcionam como ponto de entrada para serviços privilegiados. [2]

5. Design

5.1 Utilização de dois stack pointers

Para a utilizar os dois stack pointers disponíveis (MSP e PSP) é fundamental entender 2 coisas:

  • A manipulação do registrador de controle: só é possível escrever ou ler o registrador de controle em modo handler (na rotina que atende à uma exceção) ou em threads privilegiadas.
  • O mecanismo de exceções: quando uma interrupção ocorre, o processador guarda na stack o conteúdo dos registradores R0-R3, LR, PC e xPSR, como explicado na publicação anterior. O valor do LR quando entramos em uma exceção indica o modo que o processador estava a rodar, quando a thread foi interrompida. Podemos manipular este valor de LR juntamente com a manipulação dos stack pointer para controlar o fluxo do programa.
LRBX LR
0xFFFFFFF9Volta para modo “base”, MSP privilegiado. (CONTROL=0b00)
0xFFFFFFFDVolta para user mode (PSP, com o nível de privilégio da entrada) (Control = 0b1x)
0xFFFFFFF1Volta para a interrupção anterior, no caso de uma interrupção de maior prioridade ocorrer durante uma de menor prioridade.
Tabela 2. Valores para retorno de exceção

5.1.1. Um kernel stack para cada user stack

Cada stack dedicada ao usuário terá uma stack de kernel correspondente (one kernel stack per thread). Assim, cada Task (thread principal) terá stacks de kernel e usuário associadas. Uma outra abordagem seria uma stack somente de kernel no sistema (one kernel stack per processor). A vantagem de utilizar a primeira abordagem é que além das threads do kernel poderem ser preemptivas, do ponto de vista de quem implementa o sistema, os programas que rodam no kernel seguem o mesmo padrão de desenvolvimento dos programas de aplicação. A vantagem da segunda abordagem é menor overhead de memória e menor latência nas trocas de contexto.

Figura 3. Cada user stack tem uma kernel stack associada

5.2 Mecanismos de entrada e saída do kernel

Na publicação anterior a interrupção feita pelo SysTick manejava a troca de contexto, i.e., interrompia a thread em execução, salvava seu stackframe, buscava a próxima thread apontada pelo campo next na estrutura TCB (thread control block) e a resumia.

Com a separação dos espaços de usuário e supervisor, teremos dois mecanismos para entrada e saída do kernel: as system calls, explicitamente chamadas em código, e a interrupção por SysTick que implementa a lógica de escalonamento (scheduling). Outro ponto a destacar, é que apesar de continuar utilizando um esquema round-robin simples, em que cada tarefa tem o mesmo tempo de execução, as threads do kernel também funcionarão de forma cooperativa com a a thread do usuário que a evocou, isto é: quando não houver mais nada a ser feito, o kernel pode explicitamente retornar. Se a thread do kernel demorar mais que o tempo entre um tick e outro, será interrompida e reagendada. As tarefas do usuário também poderiam utilizar-se de mecanismo similar, entretanto, por simplicidade para exposição, optei por deixar as tarefas de usuário somente em esquema round-robin fixo.

5.2.1. Escalonador (task scheduler)

O fluxograma do escalonador preemptivo a ser implementado está na Figura 4. O start-up do kernel e aplicação do usuário é também mostrado para maior clareza. O kernel é inicializado e voluntariamente inicia a primeira task de usuário. A cada interrupção por SysTick, a thread tem seu estado salvo e a próxima task agendada é resumida consoante ao estado em que foi interrompida: kernel ou user mode.

Figura 4. Fluxograma do escalonador

5.2.2 Chamadas ao sistema (System Calls)

As chamadas ao sistema ocorrem quando o usuário requisita acesso a um serviço privilegiado. Além disso, também utilizo o mesmo mecanismo para que tarefas privilegiadas do kernel retornem cooperativamente à tarefa do usuário.

Perceba que optei por não executar o as threads de kernel dentro do próprio handler, o que seria mais intuitivo, além de usual. As razões para isto são porque quis aproveitar o próprio mecanismo de interrupção do processador, que no seu retorno faz o POP dos registradores RO-R3, LR, PC e xPSR, e também para evitar o aninhamento de interrupções durante a preempção de tarefas do kernel. Caso eu tivesse optado por utilizar somente uma kernel stack para todas as threads, a implementação dentro do próprio handler julgo que seria melhor.

Figura 5. Fluxograma das system calls

6. Implementação

Abaixo explico os códigos criados para implementar as provas de conceito das funcionalidades anteriormente descritas. A maior parte do kernel propriamente dito é escrito em assembly, exceto para uma porção do handler de supervisor calls que é escrita em C com algum inline assembly. Na minha opinião, mais trabalhoso e suscetível a erros que escrever em assembly é embutir assembly no código C. A toolchain utilizada é a GNU ARM.

6.1. Stacks

Não há nada em especial aqui, exceto que agora além da stack de usuário, declaramos outro array de inteiros para a stack do kernel. Estes serão associados no Thread Control Block.

int32_t p_stacks[NTHREADS][STACK_SIZE]; // stack de usuário
int32_t k_stacks[NTHREADS][STACK_SIZE]; // stack do kernel

6.2. Task Scheduler

A principal diferença deste para o escalonador mostrado na última publicação é que agora manejaremos 2 stack pointers distintos: o MSP e o PSP. Assim, quando entramos em um handler de exceção, a porção do stackframe salvo automaticamente depende do stack pointer utilizado quando a exceção foi disparada. Entretanto na rotina de exceção o stack pointer ativo é sempre o MSP. Desta forma, para podermos manejar um stack pointer quando estamos a operar com outro, não poderemos utilizar as instruções PUSH e POP porque estas têm como endereço base o stack pointer ativo. Teremos de substituí-las pelas instruções LDMIA (load multiple and increment after, i.e, após o primeiro load incrementa-se o endereço base) para o POP, e STMDB (store multiple decrement before, antes do primeiro store decrementa-se o endereço base) para o PUSH, com o sinal de writeback “!” no endereço base [1].

// Exemplo de POP 
MRS R12, PSP // lê o valor do process stack pointer em R12
LDMIA R12!, {R4-R11} // R12 contém o endereço base (PSP)
/* o endereço contido em R12 agora armazena o valor de R4, [R12] + 0x4  
 contém o valor de R5, e assim por diante até que [R12] + 0x20 contém o 
 valor de R11.
 o valor inicial de R12 também é incrementado em 32 bytes 
*/
MSR PSP, R12 // PSP é atualizado para o novo valor de R12

// Exemplo de PUSH
MSR R12, MSP 
STMDB R12!, {R4-R11} 
/* [R12] - 0x20 contém R4, [R12] - 0x16 contém R5, ..., [R12] contém R4 
 o valor inicial de R12 é decrementado em 32 bytes */
MSR MSP, R12 // MSP é atualizado para o novo valor de R12 

Outra diferença é que agora a estrutura TCB precisa conter um ponteiro para cada um dos stack pointers da thread que controla, e também uma flag indicando se a tarefa a ser resumida estava utilizando o MSP ou o PSP quando foi interrompida.

// thread control block
struct tcb 
{
  int32_t*	psp; //psp salvo da ultima user thread interrompida
  int32_t*	ksp; //ksp salvo da ultima kernel thread interrompida	
  struct tcb	*next; //aponta para o proximo tcb
  int32_t	pid; //id da task (é o "i" do kSetInitStack(i))
  int32_t	kernel_flag; // 0=kernel, 1=user	
};
typedef struct tcb tcb_t;
tcb_t tcb[NTHREADS]; //array de tcbs
tcb_t* RunPtr; 

Abaixo a rotina que escrevi para implementação. O código foi escrito de forma a ficar claro em sua intenção, sem tentar economizar instruções. Perceba que na linha 5 o valor de LR assumido na entrada da excepção só é comparado com 0xFFFFFFFD, caso falso assume-se que ele é 0xFFFFFFFF9, isto porque garanto que não haverá interrupções aninhadas (o SysTick nunca interrompe um SVC, por exemplo), assim o LR nunca deve assumir 0xFFFFFFF1. Para propósitos que não uma prova de conceito, o teste deveria ser considerado.

.global SysTick_Handler
.type SysTick_Handler, %function
SysTick_Handler:			
CPSID	I //atomica inicio
CMP	LR, #0xFFFFFFFD //verifica se retornou de uma user thread
BEQ	SaveUserCtxt	//se sim, branch para save user context
B	SaveKernelCtxt  //se nao

SaveKernelCtxt:
MRS	R12, MSP 
STMDB	R12!, {R4-R11}  //push R4-R11
MSR	MSP, R12
LDR	R0,=RunPtr      //RunPtr aponta para a tcb atual
LDR	R1, [R0]
LDR	R2, [R1,#4]
STR	R12, [R2]  //salva stack pointer
B	Schedule

SaveUserCtxt:
MRS	R12, PSP
STMB	R12!, {R4-R11}
MSR	PSP, R12
LDR	R0,=RunPtr
LDR	R1, [R0]
STR	R12, [R1]		
B	Schedule

Schedule:
LDR	R1, =RunPtr //R1 <- RunPtr
LDR	R2, [R1]	
LDR	R2, [R2,#8] //R2 <- RunPtr.next
STR	R2, [R1]    //atualiza valor de RunPtr
LDR	R0, =RunPtr
LDR	R1, [R0]
LDR	R2, [R1,#16]
CMP	R2, #1	     //verifica se kernel_flag=1
BEQ	ResumeUser   //sim, resume user thread
B	ResumeKernel //nao, resume kernel thread

ResumeUser:
LDR	R1, =RunPtr  //R1 <- RunPtr
LDR	R2, [R1]
LDR	R2, [R2]
LDMIA	R2!, {R4-R11} //Resgata sw stackframe 
MSR	PSP, R2		
MOV	LR, #0xFFFFFFFD	//LR=return to user thread
CPSIE	I		//atomica fim
BX	LR 

ResumeKernel:
LDR	R1, =RunPtr   //R1 <- RunPtr atualizado
LDR	R2, [R1]
LDR	R2, [R2, #4]
MSR	MSP, R2
LDMIA	R2!, {R4-R11} //Resgata sw stackframe 
MSR	MSP, R2
MOV	LR, #0xFFFFFFF9	 //LR=return to kernel thread
CPSIE	I		 //atomic fim
BX	LR 

6.3 System Calls

A implementação dos system calls utiliza o SVC Handler. Como dito, o único parâmetro de entrada do SVC é o número que associamos a um callback. Mas então como passamos os argumentos para o callback? Eles precisam ser buscados na stack. O padrão AAPCS (ARM Application Procedure Call Standard), que é seguido pelos compiladores, diz que quando uma função (caller) chama outra função (callee), o callee espera que seus argumentos estejam em R0-R3. Da mesma forma, o caller espera que o retorno do callee esteja em R0. R4-R11 precisam ser preservados entre uma chamada e outra. R12 é o scratch register e pode ser usado livremente.

Não à toa, quando uma exceção ocorre o core salva (PUSH) na stack os registradores R0-R3, LR, PC e xPSR da função que foi interrompida, e no retorno os lança (POP) novamente nos registradores do core. Se trocarmos de contexto, isto é, após a interrupção não retornarmos ao mesmo ponto do programa que foi interrompido, precisamos explicitamente salvar o restante da stackframe para que a thread seja resumida de forma íntegra. É fundamental seguir o AAPCS se quisermos evocar funções escritas em assembly em código C e vice-versa.

Para executar chamadas ao sistema defini uma função macro em C que recebe o código do SVC e os argumentos para a callback (a sintaxe de assembly inline depende do compilador utilizado).

(Há uma razão de eu ter criado uma macro e não uma função comum: tem a ver com o ponto de retorno à user thread e o fato de os callbacks do kernel não serem executados dentro da exceção, o que exige a troca de contexto. Se criasse uma função comum para o system call, o stack pointer do usuário seria salvo dentro da chamada, e ao retornar do kernel, o SVC seria novamente executado.)

#define SysCall(svc_number, args) {					   				      
                                                                        \
	__ASM volatile ("MOV R0, %0 "     :: "r"            (args) );     \
	__ASM volatile ("svc %[immediate]"::[immediate] "I" (svc_number) : );   \
}

O valor de args é armazenado em R0. A chamada do SVC é feita com o imediato “svc_number”. Quando o SVC é disparado R0-R3 serão automaticamente salvos na stack. O código foi escrito da seguinte forma, sem economizar instruções, para clareza:

.global SVC_Handler
.type	SVC_Handler, %function
 SVC_Handler:
 MRS R12, PSP		 //salva psp
 CMP LR, #0xFFFFFFFD
 BEQ KernelEntry
 B   KernelExit

//salva contexto do usuário
KernelEntry: 
MRS	R3, PSP
STMDB	R3!, {R4-R11}
MSR	PSP, R3
LDR	R1,=RunPtr
LDR	R2, [R1]
STR	R3, [R2]	
LDR	R3, =#0  
STR	R3, [R1, #16] //kernel flag = 0
MOV	R0, R12   //psp da chamada no r0 do CORE pra recuperar svc number
B svchandler_main //branch para rotina em C 
KernelExit:
//recarrega frame do usuário
LDR	R0, =RunPtr
LDR	R1, [R0]
LDR	R2, [R1]
LDMIA	R2!, {R4-R11}
MOV	LR, #0xFFFFFFFD
MSR	PSP, R2
LDR	R12, =#1 //kernel flag = 1
STR	R12, [R1, #16]
BX	LR 

O restante da rotina para entrada no kernel é escrito em C [2, 3]. Perceba que na rotina escrita em assembly um branch simples ocorre (linha 20) e portanto ainda não retornamos do handler de exceção.

#define SysCall_GPIO_Toggle  1 //codigo svc para gpio toggle
#define SysCall_Uart_PrintLn 2 //codigo svc para uart print line

void svchandler_main(uint32_t * svc_args)
{		
    uint32_t svc_number;
    uint32_t svc_arg0;
    uint32_t svc_arg1;
    svc_number = ((char *) svc_args[6])[-2]; // recupera o imediato 
    svc_arg0 = svc_args[0];
    svc_arg1 = svc_args[1]; 
 
 switch(svc_number)
 {
 case SysCall_GPIO_Toggle: 
	k_stacks[RunPtr->pid][STACK_SIZE-2] = (int32_t)SysCallGPIO_Toggle_; //PC
	k_stacks[RunPtr->pid][STACK_SIZE-8] = (int32_t)svc_arg0; //R0
	k_stacks[RunPtr->pid][STACK_SIZE-1] = (1 << 24); // T=1 (xPSR)
	__ASM volatile ("MSR MSP, %0" : : "r" (RunPtr->ksp) : );
	__ASM volatile ("POP {R4-R11}");
	__ASM volatile ("MOV LR, #0xFFFFFFF9");
	__ASM volatile ("BX LR"); //retorna da excecao
	break;
 case SysCall_Uart_PrintLn: 
	k_stacks[RunPtr->pid][STACK_SIZE-2] = (int32_t)SysCallUART_PrintLn_; 
	k_stacks[RunPtr->pid][STACK_SIZE-8] = (int32_t)svc_arg0;
	k_stacks[RunPtr->pid][STACK_SIZE-1] = (1 << 24); // T=1
	__ASM volatile ("MSR MSP, %0" : : "r" (RunPtr->ksp) : );
	__ASM volatile ("POP {R4-R11}");
	__ASM volatile ("MOV LR, #0xFFFFFFF9");
	__ASM volatile ("BX LR"); //retorna da excecao
	break;
 default:
	__ASM volatile("B SysCall_Dummy");
	break;
 break;
 }
}

O svc_number, por sua vez, é recuperado ao andarmos dois bytes (por isso o cast para char) descrescentes a partir endereço do PC que está 6 posições acima de R0 na stack [1, 2, 3]. Note que foi preciso assinalar a R0 o valor contido em PSP logo após a entrada na interrupção, antes de salvarmos o restante da stack (linhas 4 e 19 do código assembly).

Após recuperar o número do SVC e os argumentos, inicializamos a stack do kernel, MSP é sobrescrito com o valor armazenado no TCB, mudamos o valor de LR para que a exceção ao retornar vá para o modo base, pois não iremos executar o callback dentro do handler. Quando a instrução BX LR é executada o restante do stackframe é automaticamente ativado nos registradores do core.

Um callback tem a seguinte cara:

static void SysCall_CallBack_(void* args)
{
	BSP_Function((int32_t) args); //funcao do BSB com unico argumento int32
	exitKernel_(); // //qualquer chamada ao svc aqui ira sair do kernel (Figura 5)
}

6.4. Start-up

O start-up é um ponto crítico. O sistema inicializa em modo base. As stacks são montadas. A primeira tarefa a ser executada pelo kernel após a inicialização do sistema é configurar o SysTick, mudar para o modo de usuário e disparar a primeira thread de usuário.

//inicializacao da primeira stack
tcb_t* RunPtrStart;
RunPtrStart = tcb[0];
void kFirstThreadInit(void)
{
	kSetInitStack(0); /* esta função segue o mesmo padrão da última postagem */
	k_stacks[0][STACK_SIZE-2] = (int32_t) UsrAppStart; // LR = UsrAppStart
}
// RunPtr e o restante dos tcbs é configurado da mesma forma da última publicacao 

As rotinas em assembly para o startup são as seguintes:

.equ SYSTICK_CTRL, 0xE000E010 
.equ TIME_SLICE,	999

.global kStart  // esta é a função principal de init 
.type kStart, %function
kStart:
LDR	R0, =RunPtrStart
LDR	R1, [R0]
LDR	R2, [R1,#4]
MSR	MSP, R2	  // MSP <- RunPtr.ksp
POP	{R4-R11}  //carrega stackframe 0 na callstack
POP	{R0-R3}
POP	{R12}
ADD	SP, SP, #4
POP	{LR}	 //LR <- PC = UsrAppStart
ADD	SP, SP, #4
BX	LR // vai para UsrAppStart

//esta função prepara a stack para disparar a primeira user thread
.global UsrAppStart 
.type	UsrAppStart, %function
UsrAppStart:				
LDR	R1, =RunPtr //R1 <- RunPtr
LDR	R2, [R1]		
LDR	R2, [R2]
MSR	PSP, R2
BL	SysTickConf //configura systick
MOV	R0, #0x3
MSR	CONTROL, R0 //habilita thread unprivileged mode
ISB		    /* inst sect barrier: garante que CONTROL 
                     estara atualizado nas proximas instrucoes*/
POP	{R4-R11}   //carrega stackframe 0 na callstack
POP	{R0-R3}
POP	{R12}
ADD	SP, SP, #4
POP	{LR}	   //LR <- PC
ADD	SP, SP, #4
BX LR
	
SysTickConf:
LDR	R0, =SYSTICK_CTRL 
MOV	R1, #0
STR	R1, [R0]  // zera contador
LDR	R1, =TIME_SLICE  
STR	R1, [R0,#4] // RELOAD <- TIME_SLICE
STR	R1, [R0,#8] // CURR_VALUE <- TIME_SLICE (mas tanto faz)
MOV	R1, #0x7   // 0b111:
		    // 1: Clock source = core clock 
		    // 1: Habilita irq
		    // 1: Habilita contador
STR	R1, [R0]		
BX	LR	    //volta pro caller

7. Teste

Para fazer um pequeno teste, vamos escrever na tela do PC através da UART. O callback para chamada de sistema foi escrito da seguinte forma:

static void SysCallUART_PrintLn_(const char* args)
{
	__disable_irq();
	uart_write_line(UART, args);		
	while (uart_get_status(UART) != UART_SR_TXRDY); //espera fim da transmissão
	__enable_irq();
	exitKernel_();
}

É preciso tomar cuidado na hora de utilizar multithreading concorrendo para utilizar serviços de hardware, já que ainda não inserimos nenhum mecanismo de semáforo. Entretanto, fiz a operação atômica e não será interrompida pelo SysTick. O programa principal é o seguinte:

#include <commondefs.h> //board support package, libs padrão, etc.
#include <kernel.h>  
#include <tasks.h>

int main(void)
{
  kHardwareInit(); //configura clock, interrupcoes, uart, entre outros
  kAddThreads(Task1, (void*)"Task1\n\r", Task2, (void*)"Task2\n\r", Task3, (void*)"Task3\n\r");
  RunPtrStart = &tcbs[0]; 
  RunPtr = &tcbs[1];
  uart_write_line(UART, "Inicializando kernel...\n\r");
  delay_ms(500); //delay para dar tempo de tirar o printscreen da tela 😛
  kStart();	
  while(1);
}

As tasks (threads principais) têm a seguinte cara:

void Task1(void* args)
{
	const char* string = (char*)args;
	while(1)
	{
		SysCall(SysCall_Uart_PrintLn, string);
	}
}

Na figura abaixo o sisteminha em execução:

8. Conclusões

A utilização de dois stack pointers, um para aplicação e outro para o kernel isola estes espaços não permitindo que a aplicação corrompa a stack do kernel. Os privilégios por sua vez evitam que a aplicação sobrescreva registradores especiais. Adicionar mais um stack pointer ao sistema exigiu mudanças na rotina do escalonamento porque agora manipulamos duas stacks no domínio de 2 stack pointers distintos, em que ambos estão sujeitos à preempção. Além disso, também foi adicionado um mecanismo cooperativo para que as tarefas do kernel liberem o processador para o usuário.

O mecanismo de system calls é utilizado como ponto de entrada a serviços de hardware, ou ao que mais julgarmos crítico para a segurança e estabilidade do sistema. Isto fará ainda mais sentido ao separarmos não só as stacks em níveis de privilégio mas também as regiões da memória com a MPU.

Para as próximas publicações ainda iremos: 1) incluir mecanismos de semáforos e IPCs, 2) configurar a MPU e 3) adicionar níveis de prioridades às tarefas que até então rodam em um esquema round-robin fixo.

9. Referências

[1] http://infocenter.arm.com/help/topic/com.arm.doc.ddi0337e/DDI0337E_cortex_m3_r1p1_trm.pdf

[2] The definitive Guide to ARM Cortex M3, Joseph Yiu

[3] https://developer.arm.com/docs/dui0471/j/handling-processor-exceptions/svc-handlers-in-c-and-assembly-language

Um primeiro kernel preemptivo no ARM Cortex-M3

In English

1 Introdução

Quando se fala em sistemas operativos para software embarcado, em geral pensa-se que eles são um overkill (implementação cujo custo não compensa) para a maioria das soluções. No entanto entre uma aplicação completamente “hosted” (que faz uso de um SO multithread e de propósito geral) e outra completamente “standalone” ou “bare-metal” há muitas variações.

Em publicações anteriores eu explorei a noção de tarefas cooperativas, desde um loop fazendo chamadas a partir de um vetor de ponteiros para funções, até algo um pouco mais complexo com os processos sendo manejados em um buffer circular, com critérios temporais explícitos. Desta vez vou mostrar uma implementação preemptiva mínima, para também abrir o caminho para algo um pouco mais complexo, como nas outras publicações.

2 Preemptivo versus Cooperativo

Em um sistema cooperativo o processador não realiza a troca de contexto entre uma tarefa e outra, sendo necessário que a tarefa por si só libere o processador para a próxima ocupá-lo. Não há nada de errado nisto, este esquema Run-To-Completion é suficiente e/ou necessário para muitas aplicações, e muitos sistemas embarcados são assim executados, inclusive alguns muito complexos. Mais antigamente até sistemas não-embarcados utilizavam-se de kernel cooperativo (Windows 3.x, NetWare 4.x, entre outros). Se uma tarefa travar, o sistema todo fica comprometido quando falamos de um modo estritamente cooperativo (portanto em um sistema operacional para servidores como era o NetWare, isto não parece bom).

No modo preemptivo, tarefas são interrompidas e posteriormente resumidas – i.e, um contexto (conjunto de estados dos registradores do processador) é salvo e depois recuperado. Isto leva a maior complexidade na implementação do sistema mas, se bem implementado, aumenta a robustez e a possibilidade de atender a requisitos de tempo mais estreitos.

3 Call stack

Um processador é, para todos os efeitos, uma máquina de estados. Com alguma simplificação, cada estado do nosso programa pode ser definido como o conjunto de valores dos registradoresdo core. Este conjunto dita qual rotina do programa está ativa. Portanto, ativar uma tarefa significa lançar valores na call stack de forma que esta tarefa será processada. Para trocar de contexto é necessário salvar os dados da call stack naquele ponto do programa. Estes dados “congelados” representam um estado do programa e portanto são chamados de stack frame.

Para resumir uma tarefa, um stack frame previamente salvo é carregado novamente na call stack – os registradores reais do core.

No ARM Cortex-M3, registadores de 32-bit que definem o estado ativo do processador são: R0-R12 para uso geral e R13-R15 registos de uso especial.

3.1. Arquiteturas load-store

Uma arquitetura do tipo load-store é aquela em que um dado da memória precisa de ser carregado (load) aos registradores do core antes de ser processado. Também, o resultado deste processamento antes de ser armazenado (store) na memória deve estar em um registo. Na figura abaixo (retirada de[1]) a arquitetura do processador: pipeline de 3 estágios, barramentos de dados e controle distintos, e seu banco de registros representado abaixo.

As duas operações básicas de acesso a memória no Cortex-M3:

// lê o dado contido no endereço apontado por Rn + offset e o coloca em Rn.
LDR Rd, [Rn, #offset]
//armazena dado contido em Rn no endereço apontado por Rd + offset
STR Rn, [Rd, #offset]

É importante ainda compreender no mínimo as instruções assembly do Cortex-M3 mostradas abaixo. Sugiro as fontes [1] e [2] como boas referências, além deste ou este link.

MOV Rd, Rn // Rd = Rn
MOV Rd, #M// Rd = M, sendo o imediato um valor de 32 bit (aqui representado por M)
ADD Rd, Rn, Rm  //Rd = Rn + Rm
ADD Rd, Rn, #M //Rd = Rn + M
SUB Rd, Rn, Rm //Rd = Rn - Rm
SUB Rd, Rn, #M //Rd = Rn - M
// pseudo-instrucao para salvar Rn em uma posicão da memória.
// Após um PUSH, o valor do stack pointer é decrementado em 4 bytes, e o
PUSH {Rn} 
//Ao contrário do PUSH, o POP incrementa o SP após carregar o dado em Rn.
POP  {Rn} 
B label //pula para a rotina label
BX Rm //pula para a rotina especificada indiretamente por Rm
BL label //pula para label e passa endereco da prox. instrucao ao LR
         //para retornar LR=Link Register 
CPSID I //habilita interrupções
CPSIE I //desabilita interrupções

Vamos operar o M3 no modo Thumb, onde as instruções têm na verdade 16 bits. Segundo a ARM, isto é feito para melhorar a densidade de código mantendo os benefícios de uma arquitetura 32-bit. O bit 24 do PSR é sempre 1.

3.2. Stacks e o stack pointer (SP)

Stack é um modelo de uso da memória. Seu funcionamento se dá no formato Last InFirst Out (último a entrar, primeiro a sair). É como se eu organizasse uma pilha de documentos para ler. É conveniente que o primeiro documento a ser lido esteja no topo da pilha, e o último ao final.

Normalmente dividimos a memória entre heap e stack. Como dito, na “call stack” estarão contidos aqueles dados temporários que determinam o próximo estado do processador. No heap estão armazenados dados cuja natureza não é temporária no curso do programa (isto não significa “não-volátil”). O stack pointer é uma espécie de pivô que mantém o controle do fluxo do programa, ao apontar para alguma posição da stack.

Figura 3. Modelo de uso de uma stack. Ao salvarmos dados antes de processá-los (transformá-los) guardamos a informação anterior. (Figura de [1])
Figura 4. Regiões da memória mapeada em um Cortex M3. A região disponível para a stack está confinada entre os endereços 0x20008000 e 0 0x20007C00. [1]

4 Multitarefas no ARM Cortex M3

O M3 oferece dois stack pointers (Main Stack Pointer e Process Stack Pointer) para isolar os processos do usuário dos processos do kernel. Todo serviço de interrupção é executado no modo kernel. Não é possível ir do modo usuário ao modo kernel (na verdade chamados de thread mode e privileged mode) sem passar por uma interrupção – mas é possível ir do modo privilegiado ao modo usuário alterando o registro de controle.

Figura 5. Troca de contexto em um SO que isola a aplicação do kernel [1]

O core também tem hardware dedicado para a troca de tarefas. O serviço de interrupção SysTick pode ser usado para implementar a troca de contextos síncrona. Ainda existem outras interrupções assíncronas por software (traps) como o PendSV e o SVC. Assim, o SysTick é utilizado para as tarefas síncronas no kernel, enquanto o SVC serve às interrupções assíncronas, quando a aplicação faz uma chamada ao sistema. O PendSV é uma trap (interrupção via software) que por padrão só pode ser disparada também no modo privilegiado. Normalmente sugere-se [1] ativá-la no SysTick, porque assim é possível manter o controle dos ticks para atender aos critérios de tempo. A interrupção por SysTick é logo servida, não correndo-se riscos de perder algum tick do relógio. Uma implementação de S.O. seguro, utilizaria os dois stack pointers para separar threads de usuário e kernel, além de também separar os domínios de memória se houver uma MPU (Memory Protection Unit) disponível.

Figura 6. Leiaute da memória de dados em um SO com dois stack pointers e memória protegida [1]

Em um primeiro momento iremos utilizar somente o MSP em modo privilegiado.

5. Construção do kernel

Kernel é um conceito um tanto amplo, mas creio que não exista um S.O. cujo kernel não seja o responsável pelo escalonamento das tarefas. Além disso, minimamente deve haver também mecanismos de IPC (comunicação inter-processos). É interessante notar a forte hardware-dependência do escalonador que será mostrado, devido à sua natureza low-level.

5.1. Stackframes e troca de contexto

Lembre-se: call stack = valores armazenados no banco do core; stack ou stackframe = estado (valores) destes registradores salvos na memória.

Quando um SysTick é atendido, parte da call stack é salva pelo hardware (R0, R1, R2, R3, R12, R14 (LR) e R15 (PC) e PSR). Vamos chamar esta porção salva pelo hardware de hardware stackframe. O restante é o software stackframe [3], que devemos explicitamente salvar e recuperar com as instruções PUSH e POP.

Para pensar nosso sistema, podemos esquematizar uma troca de contexto completa, delineando as posições-chave que o stack pointer assume durante a operação (na figura abaixo os endereços de memória aumentam, de baixo pra cima. Quando SP aponta para R4 está alinhado com um endereço menor que o PC da sua stack)

Figura 7. Troca de contexto. A tarefa ativa é salva pelo hardware e pelo kernel. O stackpointer é reassinalado conforme critérios, apontado para o R4 do próximo stackframe a ser ativado. Os dados são resgatados. A tarefa é executada. (Figura baseada em [3])

Quando uma interrupção ocorre, SP estará apontando para o topo do stack (SP(O)) a ser salvo. É inevitável que seja assim porque é assim que o M3 funciona. Numa interrupção o hardware irá a salvar os primeiro 8 registradores mais altos da call stack nos 8 endereços abaixo do stack pointer, parando em (SP(1)). Quando salvarmos os registradores que restam, o SP agora estará apontando para o R4 da stack atual (SP(2)). Ao reassinalarmos o SP para o endereço que aponta ao R4 do próximo stack (SP(3)), o POP joga os valores de R4-R11 à call stack e o stack pointer agora está em (SP(4)). Finalmente, o retorno da interrupção resgata os valores do hardware stackframe à call stack, e o SP(5) está no topo da stack que acabou de ser ativada. (Se estiver se perguntando onde está o R13: ele armazena o valor do stack pointer )

A rotina para troca de contexto é escrita em assembly e implementa exatamente o que está descrito na Figura 7.

Figura 8. Troca de contexto

PS: Quando uma interrupção ocorre, o LR assume um código especial. 0xFFFFFFF9, se a thread interrompida estava a utilizar o MSP ou 0xFFFFFFFD se a thread interrompida utilizava o PSP.

5.1 Inicializando as stacks para cada tarefa

Para que a estratégia acima funcione, precisamos inicializar as stacks de cada tarefa, de acordo. O sp começa apontando para R4. Este é por definição o stack pointer inicial de uma task, pois é o endereço mais baixo de um frame.

Além disso, precisamos criar uma estrutura de dados que aponte corretamente para as stacks que serão ativadas a cada serviço de SysTick. Normalmente chamamos esta estrutura de TCB (thread control block). Por enquanto não utilizamos nenhum critério de escolha e portanto não há parâmetros de controle além do next: quando uma tarefa é interrompida, a próxima da fila será resumida e executada.

Figura 9. Estruturas para controle das threads

A função kSetInitStack inicializa a stack de cada thread i“. O stack pointer na TCB associada aponta para o dado relativo ao R4. Os dados da stack são inicializados com o número do registro a que devem ser carregados para facilitar o debug. O PSR só precisa ser inicializado com o bit 24 em 1, que é o bit que identifica o modo Thumb. As tarefas são do tipo void Task(void* args).

Figura 10. Inicializando a stack

Para adicionarmos uma tarefa à stack, precisamos inicialmente do endereço da função principal da task. Além disso, também passaremos um argumento. O primeiro argumento fica em R0. Se mais argumentos forem necessários outros registradores podem ser usados, conforme o AAPCS (ARM Application Procedure Call Standard).

Figura 11. Rotina para adicionar as tasks e seus argumentos no stackframe inicial

5.3. Inicializando o kernel

Não basta inicializar as stacks e esperar a interrupção do SysTick. O sp da estrutura TCB só guardará um valor válido de stack pointer quando a tarefa for interrompida.

Num sistema preemptivo, temos dois tipos de threads a rodar: background e foreground. O background comporta as rotinas do kernel, entre elas, a troca de contexto. A cada SysTick, é vez do kernel utilizar o processador. No foreground estarão as aplicações.

Se a tarefa já não tiver sido executada, o stack pointer guardado em sp não será válido. Assim precisamos fazer parecer que a tarefa foi executada, interrompida e guardada – para ser reativada depois. Eu usei a seguinte estratégia:

  1. Uma interrupção é forçada (PendSV). Hardware stackframe inicial é salvo.
  2. tcb[0].sp é carregado em SP. SP agora tem o endereço do dado R4 da stackframe
  3. O R4R11 do core são carregados com os valores da stackframe inicializada.
  4. ISR retorna, recupera o hardware stack frame e o SP estará no topo da stack. O PC agora está carregado com o endereço da primeira chamada a ser feita, e o programa segue o fluxo.
Figura 12. Serviço de interrupção do PendSV para inicializar kernel

Em [2] é sugerido uma outra forma de inicializar o kernel, bem mais didática:

Figura 13. Rotina para inicializar o kernel

Dispensa-se a interrupção e a call stack é carregada ativando o LR com o valor do PC da stack. Após finalmente levar o SP ao topo da stack, o BX LR executa a task e retorna.

Se utilizarmos o primeiro método apresentado, kStart fica simplesmente:

// com a lib CMSIS
void kStart(void) 
{ 
   SCB->ICSR |= SCB_ICSR_PENDSVSET_Msk; 
} 

6. Juntando as peças

Para ilustrar vamos executar, em Round-Robin, 3 tarefas que chaveiam a saída de 3 diferentes pinos e incrementam um contador cada. O tempo entre uma troca de contexto e outro será de 1000 ciclos do relógio principal. Perceba que estas funções rodam dentro de um “while(1) { }”. É como se tivéssemos vários programas principais sendo executados no foreground. Cada stack tem 64 elementos de 4 bytes (256 bytes).

Figura 14. Tasks do sistema

Abaixo a função main do sistema. O hardware é inicializado. As tarefas são adicionadas e as stacks inicializadas com a função kAddThreads. O RunPtr recebe o endereço da thread 0. Após configurar o SysTick para disparar a cada 1000 ciclos de clock, inicializa-se o kernel. Depois de executar a primeira tarefa e ser interrompido, o sistema fica quicando entre uma tarefa e outra, com a troca de contexto a rodar no background.

Figura 15. Programa principal

6.1. Debug

Você vai precisar pelo menos de um simulador para implementar mais facilmente o sistema, pois precisará acessar os registradores do core (call stack) e ver os dados movimentando-se nas stacks. Se o sistema estiver a funcionar, a cada pausa do debugger os contadores devem ter praticamente o mesmo valor.

Na foto abaixo, utilizo uma placa Arduino Due com o processador Atmel SAM3X8E e um debugger Atmel ICE conectado na JTAG da placa. No osciloscópio pode-se ver as formas de onda das saídas chaveando de cada uma das 3 tasks.

Figura 16. Bancada para debug
Figura 17. Tasks 1, 2, 3 no osciloscópio.

7 Considerações finais

A implementação de um kernel preemptivo exige razoável conhecimento da arquitetura do processador a ser utilizado. Carregar os registradores da call stack e salvá-los de forma “artesanal” nos permite ter um maior controle do sistema às custas da complexidade de manejarmos as stacks.

O exemplo aqui apresentado é um exemplo mínimo onde cada tarefa recebe o mesmo tempo para ser executada. Após esse tempo, a tarefa é interrompida e suspensa – os dados da call stack são salvos. Este conjunto salvo chamamos de stackframe – uma “fotografia” da ponto do programa que estava a ser executado. A próxima tarefa a ser executada é carregada do ponto de onde parou e resumida. O código foi escrito de forma a explicitar os conceitos.

Na próxima publicação iremos adicionar prioridade às tarefas e separar as threads entre modo usuário e modo privilegiado – utilizando os dois stack pointers – requisito fundamental para um sistema mais seguro.

Referências

O texto desta postagem bem como as figuras não referenciadas são do autor.
[1] The definitive guide to ARM Cortex-M3 , Joseph Yiu
[2] Real-Time Operating Systems for the ARM Cortex-M, Jonathan Valvano
[3] https://www.embedded.com/taking-advantage-of-the-cortex-m3s-pre-emptive-context-switches/

SOA para diminuição da hardware-dependência em C

aa480021.aj1soa01(en-us,msdn.10)
(MS Software Architecture Journal, 2004)

1. Impacto do software hardware-dependente (HdS)

É dito na literatura que o custo do software embarcado tem dominado o projeto de sistemas eletrônicos [1]. Sistemas embarcados, tradicionalmente limitados em memória e processamento, conseguem hoje rodar aplicações mais complexas fruto do avanço no projeto e fabricação de circuitos integrados. Ora, a introdução de hardware mais complexo permite a utilização de recursos de programação também mais complexos. De fato, mais de 90% das inovações na indústria automotiva da última década são decorrência direta do desenvolvimento em software embarcado. A especialização do dispositivo leva à especialização do software e ao conceito de software hardware-dependente:

  • é especialmente desenvolvido para um bloco de hardware específico: o software é inútil sem aquele hardware
  • software e hardware juntos implementam uma funcionalidade: isto é, o hardware é inútil sem aquele software

Muito provavelmente ao ler esta descrição pensaste em um sistema operacional para determinada plataforma e seus controladores (drivers). Abaixo segue um diagrama em camadas de um típico sistema embarcado. Conforme subimos as camadas, menos hardware-dependente será o nosso software, de forma que para escrevermos em linguagem interpretada, digamos, eu não preciso tomar conhecimento do microprocessador, e muito menos das tensões de operação dos meus dispositivos. O HAL (camada de abstração de hardware) é onde encontra-se o software mais hardware-dependente, i.e., mudanças no hardware invariavelmente implicarão em mudanças no software desta camada (Figura 1).

hal

Figura 1. Componentes e camadas de um típico sistema embarcado [ECKER]

2. Orientação a objetos para aumentar a coesão

O paradigma de orientação a objetos parte de um conceito muito intuitivo que é particionar um sistema em objetos classificados consoante à sua natureza. Não é necessário muito para perceber como isto aumenta o reuso e a portabilidade. A utilização do paradigma aumenta a coesão dos componentes de nossa arquitetura na medida da nossa habilidade em pensar de maneira orientada a objetos. Ao contrário do domínio de software aplicativo, em software embarcado sobretudo quando os requisitos são bastante específicos e o hardware limitado, a portabilidade e reuso esbarram na hardware-dependência. Se a OO é um bom paradigma para escrever software coeso, muitas vezes nestes domínio este recurso está indisponível. Podemos utilizar o C para escrever em um (meta-)padrão orientado a objetos, se soubermos fazer bom uso de ponteiros.

3. Exemplo de arquitetura orientada a serviços em C

Como exemplo vou descrever uma arquitetura para desacoplar a aplicação do hardware e que se beneficia dos conceitos de OO, entre eles o polimorfismo. O objetivo é escrever software para comunicação serial coeso o suficiente para quando o hardware mudar (ou for definido), somente o código mais fortemente hardware-dependente precise ser adaptado (ou escrito). O conceito de arquitetura orientada a serviços está presente em sistemas operativos baseados em microkernel (dois exemplos extremos: Windows NT e MicroC/OS). A arquitetura proposta está representada na Figura 2. Todo o software representado daqui em diante refere-se somente à camada de Serviços. A camada Board Support Package desacopla (ou diminui o acoplamento) entre o HAL e os Serviços; é a API do HAL para o programador dos Serviços.

layers

Figura 2. Layers em uma arquitetura orientada a serviços [do autor]

É uma boa ideia permitir que a aplicação utilize dois únicos métodos para transferir e receber um número de dados via serial, aplicados sob diferentes componentes de hardware: enviar(interface, x_bytes) e receber(interface, x_bytes). Estes são métodos que assumem uma forma ou outra a depender do objeto. Assim chegamos aos conceitos de interface (Classe abstrata), herança e polimorfismo. Abaixo o diagrama de classes da proposta. A assinatura completa dos métodos foi omitida.

class
Figura 3. Diagrama de classes da proposta

O padrão de design Proxy/Server permite que o hardware seja configurado de forma genérica abstraindo seus detalhes específicos. As mudanças no BSP são isoladas pela proxy ao programador da aplicação. O servidor por sua vez sustenta-se nos dados de configuração da proxy e na API do HAL para configurar e inicializar os serviços (this_proxy->this_service).

(Errata: na figura 3 os métodos de construção/inicialização e enviar/receber da classe SPI_proxy estão representados como se fossem iguais ao da UART, porém são métodos distintos.)

3.1. Implementação da Classe Abstrata

Uma interface pode ser percebida como um barramento de funções virtuais que aponta para um método ou outro em tempo de execução. Este apontamento segue por um caminho de endereços até chegar ao método para ser executado naquele contexto (etapa do fluxo do programa). Poderíamos também atribuir através de setters os métodos de cada objeto, i.e., ainda em tempo de compilação.

Começaremos por implementar a estrutura de dados que compreende a classe. Esta estrutura implementa 2 métodos que recebem e enviam frames de bytes. Uma classe Serial_Comm, portanto dá cria a um objeto que contém uma tabela de funções genéricas de enviar e receber. Os métodos públicos da interface são somente construtores e desconstrutores. Os métodos de enviar e receber ficam privados, haja vista que o objeto a ser utilizado pelo programador da aplicação é que vai defini-los. A estrutura com as funções virtuais Serial_Table é declarada mas não definida inicialmente. Mais adiante declaramos que a estrutura é somente leitura (definida em tempo de compilação) contém dois ponteiros para funções, e finalmente as definimos de forma limitada ao escopo. Isto é necessário para que ocorra o alinhamento do endereço desta tabela ao endereço da tabela do objeto que a utiliza em tempo de execução.

Abaixo o cabeçalho do Serial_Comm.

serial_comm_h-e1569800116804.png
Figura 4. Cabeçalho da classe Serial_Comm

No programa .c da interface, uma estrutura SerialTable é inicializada com endereço de funções dummy. A definição destes métodos com o assert(0) é uma forma de acusar um erro de programação em tempo de execução, se estas funções forem chamadas sem a devida sobrescrita.

serial_comm_c
Figura 5. Programa da Classe Virtual

3.2. Implementação do serviço UART

As classes de comunicação serial seguirão este padrão de design. O construtor inicialmente define uma estrutura SerialTable. Perceba que esta estrutura só pode ser definida uma vez, mas os elementos de vtbl podem assumir qualquer valor; i.e., se construímos um objeto que herda esta classe podemos estender os parâmetros e definir o endereço dos métodos, que ficarão ligados a uma e somente uma interface de funções virtuais.

Na estrutura de dados de uma classe herdeira a sua superclasse precisa ser declarada primeiramente. De outro modo não poderemos estender os atributos desta.

Na construção da proxy, uma tabela de funções definidas localmente é o endereço destino que contém a definição dos métodos virtuais da interface.

uart_proxy_hhh.png
Figura 6. Cabeçalho da classe Uart_proxy
uart_proxy_c-e1569704959728.png
uart_proxy_c_2
Figura 7. Programa uart_proxy.c

Um ponto chave é o downcast (linhas 51 e 57) feito nos métodos privados Uart_Send_ e Uart_Get_. Um ponteiro para Uart_proxy é inicializado com o endereço de uma interface Serial_Comm. O alinhamento da memória permite que acessemos os métodos desta proxy a partir desta interface. Além disso como servidor e proxy estão agregados, os metodos do BSP podem ser chamados na proxy. As funções que começam com “_” (_uart_init, _uart_get …) fazem parte deste BSP, cujo cabeçalho uart_driver_api.h está incluído no programa. No BSP podem-se utilizar as mesmas técnicas, para permitir por exemplo, que a definição do endereço do dispositivo a ser construído se dê também por uma interface abstrata que não muda com as características do dispositivo (mapeado em porta ou memória, plataforma alvo, etc.)

uart_server_hh
Figura 8. Cabeçalho da classe Uart_Server

Logicamente, a implementação do serviço para SPI segue o mesmo padrão de design.

4. Utilização dos serviços

Para a utilização dos serviços pela camada de aplicação, basta construir uma proxy com parâmetros de inicialização do dispositivo, neste exemplo UART ou SPI, e utilizar os métodos SendFrame e GetFrame que acessam a interface de cada objeto proxy.

main2.png
Figura 9. Um programa utilizando os Serviços e seus métodos polimórficos
run
Figura 10. Programa em execução

5. Conclusões

Conceitos do paradigma de programação orientado a objetos podem ser implementados, virtualmente, em qualquer linguagem se partirmos da ideia de que classes são estruturas de dados agregadas a um conjunto de funções que fornecem uma interface externa comum. Tanto estes métodos quanto estes dados podem ser privados ou não. No primeiro caso está o conceito de encapsulamento. A alocação destas estruturas com seus valores e funções, é a instância de um objeto. A instanciação de um objeto de uma classe dentro de outra classe implementa a herança. Uma interface que não implementa métodos pode ser utilizada para conectar-se à interface de uma outra classe, o que chamamos de funções virtuais. As funções virtuais podem então assumir uma forma ou outra a depender do fluxo do programa, o que caracteriza o polimorfismo.

Uma arquitetura orientada à serviço eleva a coesão dos componentes e a consequente reutilização e portabilidade, mitigando os problemas gerados pela natural hardware-dependência do software para sistemas embarcados. A orientação a objetos por sua vez é o paradigma natural para a construção de sistemas orientado a serviços. Este artigo demonstrou uma forma de aplicar estes conceitos em sistemas embarcados quando linguagens OO (comumente neste domínio C++ ou Ada) não estão disponíveis.


O texto desta postagem bem como as figuras não referenciadas são do autor.
As seguintes referências foram consultadas:
[1] Hardware-Dependent Software: Principles and Practices. Ecker, Muller, Dommer. Springer, 2009.
[2] Design Patterns for Embedded Systems in C. Bruce Douglass. Elsevier, 2001.
[3] Object-Oriented Programming in C: Application Note. Quantum Leaps. April, 2019.

Abordagens para projeto low power (2/3)

17zhswf3uasrj.jpg

(Um PDA da Nokia com acesso à internet, 1995. Fonte: gizmodo.com)

5 Abordagens low power no nível arquitetural

Penso que desenhar a arquitetura de um projeto é, para todos os efeitos, definir como cada módulo do sistema se comunica com os outros módulos para trabalharem em conjunto, e através de análises, seja pela experiência do projetista ou por resultados de modelos, elaborar a melhor forma de construí-los. Abaixo seguem algumas abordagens utilizadas na indústria para elaborar arquiteturas eficientes em consumo.

5.1 Redução dinâmica da tensão de alimentação e frequência (DVFS)

Apesar dos avanços, o problema entre o poder computacional disponível e o tempo de bateria ainda são os principais desafios da indústria. O seu smartphone aguenta 1 único de dia de uso pesado com navegação, músicas, redes sociais e sinal intermitente? O meu não.

Mas como disse, uso pesado. Quer dizer, o seu smartphone sabe quando mais e quando menos energia serão necessárias para aplicação, reduzindo ou aumentado a tensão e/ou a frequência fornecidas dinamicamente para os periféricos do microprocessador.

Os próprios microprocessadores em geral são especificados de forma que a frequência de operação máxima depende da tensão fornecida. Assim quando a performance não estiver crítica a tensão (e a frequência) podem ser reduzidas. Infelizmente isto não vale para sistemas de tempo real cujo principal requisito é atender aos deadline. Existem soluções que incorporam DVFS entre o escalonador e o disparador do RTOS.

5.2 Múltiplas tensões

Projetar um chip utilizando uma única tensão de alimentação é comum e também tem a penalidade de que os piores caminhos de tempo a serem cumpridos terão a mesma força de drive dos demais. Ou seja, haverá slack times bastante altos para poder compensar os caminhos críticos. Claro que um bom projeto e uma boa ferramenta tentam encontrar um compromisso.

O ponto chave é que se todos os caminhos forem críticos, o consumo global vai ser o menor possível. A forma mais simples é: operar todos os caminhos que não são críticos em tensão mais baixa, e os críticos em tensão mais alta. A penalidade se dará principalmente em área pela adição de layers para comunicar blocos em diferentes tensões.

Formas mais eficientes adicionam inteligência nas ferramentas de projeto para escolher as tensões de cada caminhos. Falando de forma muito simplificada, um algoritmo calcula o slack de uma célula, e decide se ela pode ou não ser substituída por uma célula low voltage. Caso os demais caminhos continuem positivos esta célula é então substituída. Se não, será preciso sacrificar o consumo em detrimento da performance naquela célula.

Ainda, alguns autores sugerem que o caminho crítico de um circuito não é o caminho mais longo, mas sim o caminho mais longo e mais demandado na operação – uma variável a mais. Esta técnica demonstrou alguns ganhos em relação a anterior, e é chamada de PVCS (path-oriented clustered voltage scaling).

É preciso dizer que os layers adicionados para comunicar módulos em diferentes tensões também consumirão energia tanto estática quanto dinâmica. Existem pesquisas com as mais diversas soluções para a utilização de múltiplas tensões, inclusive com técnicas livres de conversores de nível de tensão.

5.3 Clock-gating

Em primeiro lugar é importante dizer que a árvore de clock consome aproximadamente 50% da energia em um circuito integrado digital. Depois, cada vez que um gate é chaveado, energia dinâmica será consumida. Se o dado a ser disponibilizado após o chaveamento é o mesmo que lá está, por que então consumir esta energia? Condições para habilitar ou não o clock são as técnicas chamadas de clock gating.

Está apresentada aqui em nível arquitetural pois, assim como as outras técnicas neste nível, é implementada no projeto através das diretivas de síntese que o projetista lança e das células disponíveis no PDK (Process Design Kit). Entretanto o código em que o hardware é descrito também tem forte influência no que a ferramenta vai conseguir fazer, pois é a partir desta descrição que as melhores ‘condições’ para habilitar ou não o clock serão inferidas.

A forma mais comum de clock-gating, simplesmente compara se a entrada D de um banco de registradores é igual ou não à saída Q.

clockcombinacional

Figura 1: Clock gating combinacional (figura retirada de Mohit Arora)

Na figura acima, uma condição de enable permite ou não que aquele registrador seja ‘clockado’, e o dado segue adiante na pipeline. (perceba que o segundo registrador não tem uma lógica de clock gating representada, e muitos menos relacionada com a primeira)Estima-se que 5-10% de energia dinâmica é salva com essa técnica se implementada combinacionalmente.

Pensando num pipeline onde lógicas são encadeadas entre um banco de registro e outros podemos também reduzir o chaveamento redundante na porção do circuito que está conectada à saída banco de registradores que estão sob clock gating, se toda a cadeia subsequente da pipeline for chaveada levando em conta as condições de enable da anterior. Na literatura costumam chamar esta técnica de “clock-gating avançado”.

clocksequencial

Figura 2: Clock gating sequencial (de Morit Ahora)

5.4 Power gating

Desabilitar completamente um módulo quando ele não está em uso, é consideravelmente importante nas atuais tecnologias onde a componente de consumo estático é dominante. Os módulos são habilitados ou desabilitados conforme a necessidade da aplicação. As chaves utilizadas para habilitar ou desabilitar passagem de corrente para o módulo são comumente chamadas de sleep transistors. As chaves conectadas entre Vdd e o módulo são os chamados ‘headers’ e entre o módulo de Vss são os ‘footers’. A inserção destes sleep transistors insere agora dois grandes domínios de tensão no sistema: um permanente, conectado à fonte de alimentação, e um virtual que é o visto pelo módulo de fato. O maior desafio é projetar uma chave que permita que o domínio de tensão real e virtual sejam muito similares em todas as suas características.

header-footer-swithces

Figura 3: Diagrama de blocos de dois circuitos utilizando sleep transitors, (a) header e (b) footer. (de Mohit Arora)

Não é difícil imaginar que o tamanho do sleep transistor seja bastante considerável (no dedão: ~3x a capacitância a ser driveada). Devido ao seu tamanho, ligar ou desligar um módulo através do seu chaveamento leva um tempo. Assim, não parece uma boa política adicionar estas chaves a módulos que ficarão pouco tempo em idle na operação típica do sistema. Assim, ou fazemos um sistema de power gating de baixa granularidade ou alta granularidade.

No modo fine-grain (alta granularidade) cada módulo tem um sleep transistors que são construídos como parte do PDK e adicionados durante a síntese, o que traz imensas vantagens na facilidade de projeto. A economia de corrente de leakage pode chegar a 10X.

No sistema de baixa granularidade, menos sleep-transitors são disponibilizados na forma de ‘grid’ que ligam ou desligam os domínios de módulos conforme a aplicação. Isto implica em menor overhead de área (e por consequência, menor variação de processo). Nesta abordagem os sleep-transistors são de fato parte das linhas de alimentação do circuito, não células adicionadas na síntese lógica, e portanto mais próximos do projetista de back-end.

Por outro lado, nesta abordagem de grid, com menos sleep-transistors chaveando mais circuitos, teremos mais domínios de power, com maiores variações de IR entre eles, maiores correntes de pico de power-up, que podem, entre outros problemas, ocasionar transições indesejadas em módulos vizinhos, o que vai exigir contra-medidas no back-end. Além de é claro, fritar um circuito cujas linhas foram subdimensionadas.

coarsepower

Figura 4: Representação de um esquema de power-gating de baixa granularidade com sleep transistor do tipo footer (de Mohit Arora)

Na prática não existe uma linha bem definida entre fine-grain e power-grain, e um misto de ambos é utilizado em um projeto real.

5.5 Sub-threshold/near-threshold design

A diminuição da tensão de alimentação (mantendo uma tensão de threshold fixa) resulta na diminuição quadrática do consumo dinâmico, às custas de performance, o que a depender da área de aplicação (sensores biomédicos, para dar um único exemplo) não é um problema.

Indo um pouco além, podemos pensar em utilizar o que seria a corrente de leakage desperdiçada para de fato implementar a lógica do sistema, o que é atingido quando se reduz a tensão de alimentação para um valor menor ou muito próximo de Vth. A corrente de sub-threshold é exponencialmente dependente da tensão no gate. A literatura demonstra algumas reduções de até 20X quando comparados com circuitos operando com (super)-Vth.

O grande problema? A pequeníssima diferença na corrente de um transistor ligado e um desligado, faz com que as variações de processo sejam muito impactantes no circuito construído. As contra medidas partem da arquitetura do sistema e chegam ao nível de transistor.

* * *

A próxima e última publicação vai falar das técnicas em nivel RTL.

Se não concorda, não entendeu, achou muito bom ou muito ruim, comentários são muito bem vindos.

O texto desta publicação é original. As seguintes fontes foram consultadas: The Art of Hardware Architecture, Mohit Ahora Ultra-Low Power Integrated Circuit Design, Niaxiong Nick Tan et al.

Abordagens para projeto low-power (1/3)

iphone-battery

1 Eficiência

Na última publicação sugeri que a Compaq teria percebido a portabilidade como requisito mais que desejável para a computação pessoal. Apesar de os primeiros computadores da companhia terem ~12 kg (!) e serem considerados “portáteis” simplesmente por serem apresentados em uma maleta com alças, ainda precisavam de uma tomada de corrente para funcionar.

Um pouco mais tarde, com a popularização das tecnologias multimídias e dos PDAs (personal digital assistants, ou “palmtops”), a computação portátil começa a precisar de cada vez mais performance, e aí a otimização de consumo passou a ser critério determinante em um projeto. O objetivo é ser eficiente: realizando tanto ou mais trabalho com menos energia, as baterias duram mais. Não sei dizer exatamente quando e nem quem começou as pesquisas no assunto, mas um dos trabalhos acadêmicos mais relevantes na área é datado de 1992 (Low-power CMOS Digital Design, CHANDRAKASAN et. al).

Nesta publicação quero falar um pouco sobre técnicas de baixo consumo e a oportunidade de aplicá-las nas diferentes fases e camadas de abstração de um projeto. Aqui estou assumindo que os circuitos são essencialmente digitais. O projeto low-power para circuitos (integrados) analógicos tem técnicas distintas que podem ser assunto para outra postagem.

2 Consumo estático e dinâmico

Dois componentes gerais de consumo são o consumo estático e o consumo dinâmico. Este refere-se ao consumo ocasionado pelas transições de uma porta lógica e é proporcional à corrente circulante, à frequência de transições e às capacitâncias que são carregadas e descarregadas nessas transições. O consumo estático se refere às correntes que circulam entre Vdd e Gnd mesmo quando os módulos de um dispositivo estão desligados. Com os comprimentos de canais dos transistores cada vez menores, as correntes de leakage passaram a ser uma significativa parcela do consumo total, quando não a componente dominante. Quanto menor o canal de um transistor, menor será a sua tensão de threshold e menor será a diferença entre a corrente de operação e a corrente de leakage. Na verdade, a corrente de leakage aumenta exponencialmente com a diminuição da tensão de threshold.

De maneira geral, para um gate lógico:

Potência total = Potência dinâmica + Potência estática [W]

Potência dinamica = Vdd^2 * Freq * Cl* p [W]

onde p é a probabilidade de uma transição lógica ocorrer.

3 Níveis de abstração de um projeto digital

Fazendo uma analogia com carros, uma Lamborghini Diablo 1991 é um sistema igual a um Renault Clio 2010. Ambos são automóveis, do tipo “carro”, compostos por carroceria, 4 rodas, motor, diferencial, volante, transmissão e etc. Porém arquiteturalmente são projetos radicalmente distintos.

Um sistema é concebido para prover uma solução. No caso dos automóveis o problema a ser resolvido é como deslocar algo de um lugar a outro. Se eu preciso levar uma pessoa de um ponto ao outro, eu posso escolher construir um carro, uma moto ou um patinete (que não é um automóvel!). Enfim, as escolhas em nível de sistema, são aquelas que irão definir o que é o meu projeto e quais são suas características. Estas escolhas terão como ponto de partida os requisitos do produto. Se o propósito é deslocar até 5 pessoas de um ponto ao outro e com economia de energia, o Clio faz muito mais sentido que a Lamborghini, se o requisito for um carro.

De maneira geral, podemos representar a estrutura de um projeto, nos seguintes níveis de abstração:

Nível de Sistema: refere-se a definição do conjunto de módulos de um projeto e suas conexões (microprocessador+RAM+NVM+I/O, etc.)

Nivel Arquitetural: refere-se a forma como são construído os módulos definidos no sistema e como eles interagem: a definição de interfaces, dos protocolos de controle, comunicação, etc. (ex.: microprocessador RISC-V 32-bit, 8KB de RAM, 64KB de memória NAND Flash, transreceptor compatível com NFC, camada MAC comunica-se com a PHY através de uma interface AMBA-PB, etc.)

Nível de Registradores (Register Transfer Level): representação do circuito digital como um conjunto de registros, ULAs, Muxes, contadores, etc. Pode ser chamado de “microarquitetura”.

Nível lógico: mapeamento do RTL como um conjunto de portas lógicas, latches e flip-flops.

Nível de Circuito: a representação elétrica do sistema, através de um esquemático de transistores e outros componentes elétricos.

4 Abordagens low-power no nível de sistema

Quando pensamos em sistemas digitais o consumo geralmente estará relacionado à área e performance. Uma maior performance exige uma operação em alta freqüência e com suficiente força de drive. A área relaciona-secom o tamanho dos dispositivos que por sua vez dita o tamanho das capacitâncias a serem carregadas/descarregadas.

4.1 Faça um SoC

O advento dos SoCs, sistemas inteiramente construídos em um único circuito integrado, possibilitou drástico aumento na eficiência energética. Se falamos de um sistema que está construído na forma de chips e componentes discretos conectados em uma placa, a transformação em SoC pode ser vista como uma técnica para redução de consumo. E das mais eficientes.

4.2 Capriche no co-projeto de hardware+software

O que implementar em hardware e o que implementar em software? Tipicamente as implementações em hardware consumirão menos energia que as suas contrapartes em software.

Podemos simplesmente identificar aquelas rotinas que consomem mais processamento e escolher implementá-las em hardware.

Entretanto em um fluxo de projeto que realmente aproveite a oportunidade de se poder implementar qualquer parte de um sistema em hardware ou software, uma abordagem baseada em modelos deve ser considerada. Existem linguagens como o SystemC ou SystemVerilog que permite a confecção de modelos em alto nível de abstração. Outras ferramentas como SystemVue, Matlab/Simulink são dedicadas a modelar sistemas. Depois de o sistema modelado, as simulações são utilizadas para fazer estimativas da performance e consumo das funcionalidades, e pode-se fazer escolhas direcionadas sobre o que será implementado em hardware ou software. As funcionalidades que consomem mais recursos beneficiarão todo o sistema se forem construídas com arquitetura visando baixo consumo. Ou seja, as análises no nível do sistema guiam nossas escolhas arquiteturais mais adiante.

A validação da arquitetura pode ser feita através de uma abordagem TLM (Transaction-Level Modeling).

lowpower_hwswcodesignflow

4.3 Software eficiente

Idealmente o seu compilador deve conseguir produzir um código objeto otimizado, mas ele não tem nenhuma outra informação a não ser o código que você entrega a ele, puro e duro. Compiladores “system aware” só amanhã.

Quantas instruções tem a task mais executada do sistema? Quantos ciclos de clock cada instrução consome? Se um sistema rodando a 5 MHz acorda o processador a cada segundo para executar 500 instruções, admitindo 1 instrução/ciclo, ao diminuírmos somente uma instrução desta task, estaremos dando uma sobrevida a bateria de ~ 6,5 segundos por ano, num sistema 24/7. Escalone isso para mais MHz de operação e mais instruções economizadas, e veja por si só.

Assim, o nosso código, como regra geral deve evitar primitivas complexas e ser otimizado em desempenho. Considero que conhecer o processador e o compilador, e a utilização de boas práticas de engenharia de software como a refatoração, são as melhores forma de escrever códigos eficientes. Abaixo alguns exemplos:

Código original

Código otimizado

if ((i % 10) == 0 )
{
   // faça algo
}
i++;
If (counter == 10)
{
   // faça algo
   counter=0;
}
else
{ 
   i ++
}
counter++;

A operação ‘módulo’ usualmente toma mais ciclos de instrução. É mais econômico reproduzir o mesmo efeito com operações mais baratas.

Código original

Código otimizado

for(i=0;i<10;i++)
{
   // faça algo 1
}
for(i=0;i<10;i++)
{
   // faça algo 2
}
for(i=0;i<10;i++)
{
   // faça algo 1
   // faça algo 2
}

Uma chamada de loop com inicialização, incremento e comparação é economizada.

Código original

Código otimizado

unsigned int x;
 for (x = 0; x < 100; x++)
 {
     A[x] = B[x];
 }
unsigned int x; 
 for (x = 0; x < 100; x += 5 )
 {
     A[x]   = B[x];
     A[x+1] = B[x+1];
     A[x+2] = B[x+2];
     A[x+3] = B[x+3];
     A[x+4] = B[x+4];
     
 }

No código acima, 100 elementos do vetor A serão copiados para as primeiras 100 posições do vetor B. A segunda implementação faz com que o loop precise ser rodado 20 vezes, ao invés de 100.

(Link externo: este AN da Atmel indica no capítulo 9 algumas formas de otimizar tamanho e tempo de execução para AVRs 8-bit)


 

Na parte 2 vou falar sobre técnicas aplicadas no nível arquitetural (power-gating, clock gating, multi Vdd, multi Vth, DVFS…).

Até mais!

O texto desta publicação é original. As seguintes fontes foram consultadas:
The Art of Hardware Architecture, Mohit Ahora
Ultra-Low Power Integrated Circuit Design, Niaxiong Nick Tan et al.