In this example we have a bounded buffer with operations to Put and Get
characters. A producer thread puts characters into the buffer, and a
consumer thread gets characters.

If the buffer is full, we want the producer to block until there is more
room in the buffer. If the buffer is empty, we want the consumer to
block until there's something in the buffer.

We also want to be able to tell the waiting consumer to stop getting
characters. This requires some way of interrupting the blocked task.

Implementation

The bounded buffer is implemented as an array encapsulated by a protected
type, with a discriminant that indicates the size:

   generic
      type Item_Type is private;
   package Buffers is

      type Item_Array is array (Positive range <>) of Item_Type;

      protected type Buffer_Type (Size : Positive) is
         ...
      private
         Items : Item_Array (1 .. Size);
         Get_Index : Positive := 1;
         Put_Index : Positive := Size;
         Count : Natural := 0;
      end Buffer_Type;

   end Buffers;

We want the producer to block if the buffer is full, so we implement the
Put operation as a protected entry:

   entry Put (Item : in Item_Type);

The entry barrier tests whether the number of items is less than the
buffer size. When it is, the entry body copies the item into the array:

   entry Put (Item : in Item_Type)
      when Count < Size is
   begin
      Put_Index := Put_Index mod Size + 1;
      Items (Put_Index) := Item;
      Count := Count + 1;
   end;

We implement Get using separate operations for waiting until there's
an item, and actually fetching the item:

   entry Wait_For_Item;
   procedure Get (Item : out Item_Type);

We've done it this way because we only want to interrupt waiting for an
item, not retrieving one. If we were to interrupt a combined
wait-and-fetch operation, then an item could get lost.

The entry Wait_For_Item has only a barrier to test whether there's an
item in the buffer:

   entry Wait_For_Item when Count > 0 is
   begin
      null;
   end;

The protected procedure Get copies the item at Get_Index, advances the
index (wrapping around at Size), and decrements the count:

   procedure Get (Item : out Item_Type) is
   begin
      Item := Items (Get_Index);
      Get_Index := Get_Index mod Size + 1;
      Count := Count - 1;
   end;

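To see how the expression Index mod Size + 1 cycles through 1 .. Size,
here is a small stand-alone sketch. The procedure name Index_Wrap_Demo
and the size of 3 are made up for illustration; they are not part of the
example's source.

```ada
with Ada.Text_IO;

procedure Index_Wrap_Demo is
   Size  : constant Positive := 3;   --  hypothetical small buffer size
   Index : Positive := Size;         --  same starting value as Put_Index
begin
   --  Advance the index five times, exactly as Put and Get do.
   for Step in 1 .. 5 loop
      Index := Index mod Size + 1;
      Ada.Text_IO.Put (Positive'Image (Index));
   end loop;
   Ada.Text_IO.New_Line;             --  prints " 1 2 3 1 2"
end Index_Wrap_Demo;
```

Because Put_Index starts at Size, the first Put wraps around and stores
into slot 1, which is where Get_Index starts.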
Our consumer abstraction binds to a character buffer via an access
discriminant, and provides an operation to shut down the thread:

   package Consumers is

      type Consumer_Type (Buffer : access Buffer_Type) is
         limited private;

      procedure Shutdown (Consumer : in out Consumer_Type);

   private
      ...
   end Consumers;

The Consumer_Type has an active thread of control, implemented as a task
component. The type is not implemented as a task directly, because (as
we shall see) it needs another component.

   private

      task type Consumer_Task_Type (Consumer : access Consumer_Type) is
      end;

      ...

      type Consumer_Type (Buffer : access Buffer_Type) is
         limited record
            ...
            Consumer_Task : Consumer_Task_Type (Consumer_Type'Access);
         end record;

   end Consumers;

The Consumer_Task_Type has its own discriminant, to bind to the
enclosing Consumer_Type. This gives the task visibility to the Buffer
and to the other components of the same record.
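
The self-referencing idiom can be tried in isolation. The following is a
minimal sketch, not taken from the example's source: Worker_Type,
Worker_Task_Type, and the Id discriminant are hypothetical names. The
task component receives Worker_Type'Access (the current instance of the
enclosing limited record) and uses it to read another part of the same
object.

```ada
with Ada.Text_IO;

procedure Self_Reference_Demo is

   type Worker_Type;   --  incomplete declaration, so the task can name it

   --  The task binds to its enclosing record through an access discriminant.
   task type Worker_Task_Type (Owner : access Worker_Type);

   type Worker_Type (Id : Positive) is limited record
      Worker_Task : Worker_Task_Type (Worker_Type'Access);
   end record;

   task body Worker_Task_Type is
   begin
      --  Owner designates the record that contains this task.
      Ada.Text_IO.Put_Line
        ("Worker" & Positive'Image (Owner.Id) & " running");
   end Worker_Task_Type;

   W : Worker_Type (Id => 7);   --  the task is activated at the "begin" below

begin
   null;   --  the procedure waits here for W.Worker_Task to terminate
end Self_Reference_Demo;
```

The record must be limited for 'Access of the current instance to be
legal, which is one reason Consumer_Type is declared limited private.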

The consumer task is implemented as a loop that waits for a character,
fetches it, then displays it:

   Main:
   loop
      <wait for character>
      Buffer.Get (C);
      Put_Line ("Consumed '" & C & "'");
   end loop Main;

If we were to implement the wait as a straight call to the buffer's
protected entry:

   Buffer.Wait_For_Item;

then there would be no way to interrupt the entry call once it is in
progress. A task entry can't be used for this purpose, because the task
is not waiting at an accept statement; it's blocked on the buffer call.

In order to interrupt the waiting task, we use Asynchronous Transfer of
Control (ATC). By moving the wait to the abortable region,

   select
      <triggering alternative>
      exit Main;
   then abort
      Buffer.Wait_For_Item;
   end select;

then we can interrupt the call when the triggering statement completes.
The triggering statement can be either a delay statement or an entry
call.
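
As an illustration of the delay form, here is a minimal sketch that gives
up waiting after half a second. It is not part of the example's source:
Empty_Buffer is a hypothetical stand-in for the buffer whose barrier
never opens, and the 0.5 second timeout is arbitrary.

```ada
with Ada.Text_IO;

procedure Timed_Wait_Demo is

   --  Hypothetical stand-in for the buffer: Count stays at zero, so a
   --  caller of Wait_For_Item would otherwise block forever.
   protected Empty_Buffer is
      entry Wait_For_Item;
   private
      Count : Natural := 0;
   end Empty_Buffer;

   protected body Empty_Buffer is
      entry Wait_For_Item when Count > 0 is
      begin
         null;
      end Wait_For_Item;
   end Empty_Buffer;

   Timed_Out : Boolean := False;

begin
   select
      delay 0.5;                    --  triggering statement
      Timed_Out := True;            --  runs only if the delay expires first
   then abort
      Empty_Buffer.Wait_For_Item;   --  abortable part, cancelled on timeout
   end select;

   if Timed_Out then
      Ada.Text_IO.Put_Line ("Timed out waiting for an item");
   else
      Ada.Text_IO.Put_Line ("An item arrived");
   end if;
end Timed_Wait_Demo;
```

With a single entry call in the abortable part, a timed entry call would
have the same effect. The asynchronous form matters in our example
because the trigger is itself an entry call that another task opens.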

The task itself can't handle the shutdown, because it's blocked waiting
for a character from the buffer. So we introduce an internal protected
type with a procedure that receives the shutdown request from a client,
and an entry that acts as the triggering alternative for the task:

   protected type Trigger_Type is
      procedure Shutdown;
      entry Stop_Consuming;
   private
      Stop_Requested : Boolean := False;
   end Trigger_Type;

The protected procedure Shutdown just sets a flag that indicates we're
done:

   procedure Shutdown is
   begin
      Stop_Requested := True;
   end;

The protected entry Stop_Consuming blocks until the Stop_Requested flag
is True:

   entry Stop_Consuming when Stop_Requested is
   begin
      null;
   end;

The complete implementation of the Consumer_Type thus looks like this:

   type Consumer_Type (Buffer : access Buffer_Type) is
      limited record
         Trigger : Trigger_Type;
         Consumer_Task : Consumer_Task_Type (Consumer_Type'Access);
      end record;

The consumer task calls the Stop_Consuming entry of the trigger:

   select
      Consumer.Trigger.Stop_Consuming;
      exit Main;
   then abort
      Buffer.Wait_For_Item;
   end select;

This (asynchronous) select statement will complete either because the
trigger accepts the Stop_Consuming entry, in which case the task exits
the Main loop, or because the Buffer accepts the Wait_For_Item entry, in
which case the task goes on to fetch the item.

Shutdown is implemented by calling the trigger's protected Shutdown
procedure:

   procedure Shutdown (Consumer : in out Consumer_Type) is
   begin
      Consumer.Trigger.Shutdown;
   end;

This sets the Stop_Requested flag to True, which immediately forces the
barrier for Stop_Consuming to be reevaluated. The trigger object accepts
the entry call, which triggers abortion of the Wait_For_Item call already
in progress.

There's one more issue to discuss. Text_IO is not a concurrent
abstraction, and it's possible that simultaneous calls to Put_Line can
produce interleaved output.

The package Message_IO provides a thread-safe version of Put_Line that
synchronizes its callers:

   package Message_IO is
      procedure Put_Line (Message : in String);
   end Message_IO;

The body of Message_IO declares a semaphore that is seized and released
during calls to Put_Line:

   package body Message_IO is

      Semaphore : aliased Semaphore_Type;

      procedure Put_Line (Message : in String) is
         Control : Semaphore_Control (Semaphore'Access);
      begin
         Ada.Text_IO.Put_Line (Message);
      end;

   end Message_IO;

We use a controlled object to automatically seize and release the
semaphore, which guarantees that the semaphore will get released even if
there's an exception.
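
The release-on-exception behavior can be checked with a small stand-alone
sketch. It is a simplification, not the example's source: Guard_Type and
the In_Use flag are hypothetical stand-ins for Semaphore_Control and the
semaphore, and declaring the controlled type inside a procedure assumes
an Ada 2005 or later compiler.

```ada
with Ada.Finalization;
with Ada.Text_IO;

procedure Scoped_Lock_Demo is

   In_Use  : Boolean := False;   --  stands in for the semaphore state
   Failure : exception;          --  hypothetical error raised while locked

   package Guards is
      type Guard_Type is
        new Ada.Finalization.Limited_Controlled with null record;
      overriding procedure Initialize (Guard : in out Guard_Type);
      overriding procedure Finalize   (Guard : in out Guard_Type);
   end Guards;

   package body Guards is
      overriding procedure Initialize (Guard : in out Guard_Type) is
      begin
         In_Use := True;                      --  "seize"
         Ada.Text_IO.Put_Line ("seized");
      end Initialize;

      overriding procedure Finalize (Guard : in out Guard_Type) is
      begin
         In_Use := False;                     --  "release"
         Ada.Text_IO.Put_Line ("released");
      end Finalize;
   end Guards;

   procedure Guarded_Work is
      Guard : Guards.Guard_Type;   --  Initialize runs at this declaration
   begin
      raise Failure;               --  simulate a failure while holding the lock
   end Guarded_Work;               --  Finalize runs as the frame is left

begin
   begin
      Guarded_Work;
   exception
      when Failure =>
         Ada.Text_IO.Put_Line ("exception propagated");
   end;
   Ada.Text_IO.Put_Line ("In_Use = " & Boolean'Image (In_Use));
end Scoped_Lock_Demo;
```

The program prints "seized", "released", "exception propagated", and
"In_Use = FALSE", in that order: the guard is finalized before the
handler in the caller runs.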

Note that we can't just wrap the Text_IO.Put_Line call in a protected
procedure:

   protected body Synchronization is
      procedure Do_Put_Line (S : String) is
      begin
         Text_IO.Put_Line (S);
      end;
   end Synchronization;

because Text_IO.Put_Line is a "potentially blocking operation," and it is
a bounded error to invoke a potentially blocking operation during a
protected action. See RM95 9.5.1 (8 - 18).

For whatever reason, the test program dumps core as it terminates
because of a segmentation violation. I don't know why...

Source

package body Binary_Semaphores.Controls is

   procedure Initialize (Control : in out Semaphore_Control) is
   begin
      Control.Semaphore.Seize;
   end;

   procedure Finalize (Control : in out Semaphore_Control) is
   begin
      Control.Semaphore.Release;
   end;

end Binary_Semaphores.Controls;

with Ada.Finalization;
package Binary_Semaphores.Controls is

   pragma Preelaborate;

   type Semaphore_Control (Semaphore : access Semaphore_Type) is
      limited private;

private

   use Ada.Finalization;

   type Semaphore_Control (Semaphore : access Semaphore_Type) is
      new Limited_Controlled with null record;

   procedure Initialize (Control : in out Semaphore_Control);

   procedure Finalize (Control : in out Semaphore_Control);

end Binary_Semaphores.Controls;

package body Binary_Semaphores is

   protected body Semaphore_Type is

      procedure Release is
      begin
         In_Use := False;
      end;

      entry Seize when not In_Use is
      begin
         In_Use := True;
      end;

   end Semaphore_Type;

end Binary_Semaphores;

package Binary_Semaphores is

   pragma Pure;

   protected type Semaphore_Type is
      procedure Release;
      entry Seize;
   private
      In_Use : Boolean := False;
   end Semaphore_Type;

end Binary_Semaphores;

package body Buffers is

   protected body Buffer_Type is

      entry Put (Item : in Item_Type)
         when Count < Size is
      begin
         Put_Index := Put_Index mod Size + 1;
         Items (Put_Index) := Item;
         Count := Count + 1;
      end;

      entry Wait_For_Item when Count > 0 is
      begin
         null;
      end;

      procedure Get (Item : out Item_Type) is
      begin
         Item := Items (Get_Index);
         Get_Index := Get_Index mod Size + 1;
         Count := Count - 1;
      end;

      function Is_Full return Boolean is
      begin
         return Count = Size;
      end;

      function Is_Empty return Boolean is
      begin
         return Count = 0;
      end;

   end Buffer_Type;

end Buffers;

generic
   type Item_Type is private;
package Buffers is

   type Item_Array is array (Positive range <>) of Item_Type;

   protected type Buffer_Type (Size : Positive) is
      entry Put (Item : in Item_Type);
      entry Wait_For_Item;
      procedure Get (Item : out Item_Type);
      function Is_Full return Boolean;
      function Is_Empty return Boolean;
   private
      Items : Item_Array (1 .. Size);
      Get_Index : Positive := 1;
      Put_Index : Positive := Size;
      Count : Natural := 0;
   end Buffer_Type;

end Buffers;

with Buffers;
package Character_Buffers is new Buffers (Character);

with Message_IO; use Message_IO;
package body Consumers is

   procedure Shutdown (Consumer : in out Consumer_Type) is
   begin
      Consumer.Trigger.Shutdown;
   end;

   protected body Trigger_Type is

      procedure Shutdown is
      begin
         Stop_Requested := True;
      end;

      entry Stop_Consuming when Stop_Requested is
      begin
         null;
      end;

   end Trigger_Type;

   task body Consumer_Task_Type is
      C : Character;
      Buffer : Buffer_Type renames Consumer.Buffer.all;
   begin
      Main:
      loop
         select
            Consumer.Trigger.Stop_Consuming;
            exit Main;
         then abort
            Buffer.Wait_For_Item;
         end select;
         Buffer.Get (C);
         Put_Line ("Consumed '" & C & "'");
      end loop Main;
      while not Buffer.Is_Empty loop
         Buffer.Get (C);
         Put_Line ("Consumed '" & C & "'");
      end loop;
      Put_Line ("Consumer done.");
   end Consumer_Task_Type;

end Consumers;

with Character_Buffers; use Character_Buffers;
package Consumers is

   type Consumer_Type (Buffer : access Buffer_Type) is
      limited private;

   procedure Shutdown (Consumer : in out Consumer_Type);

private

   task type Consumer_Task_Type (Consumer : access Consumer_Type) is
   end;

   protected type Trigger_Type is
      procedure Shutdown;
      entry Stop_Consuming;
   private
      Stop_Requested : Boolean := False;
   end Trigger_Type;

   type Consumer_Type (Buffer : access Buffer_Type) is
      limited record
         Trigger : Trigger_Type;
         Consumer_Task : Consumer_Task_Type (Consumer_Type'Access);
      end record;

end Consumers;

with Binary_Semaphores.Controls;
with Ada.Text_IO;
package body Message_IO is

   use Binary_Semaphores;

   Semaphore : aliased Semaphore_Type;

   procedure Put_Line (Message : in String) is
      use Controls;
      Control : Semaphore_Control (Semaphore'Access);
   begin
      Ada.Text_IO.Put_Line (Message);
   end;

end Message_IO;

package Message_IO is
   procedure Put_Line (Message : in String);
end Message_IO;

with Ada.Text_IO;
with Message_IO; use Message_IO;
with Character_Buffers; use Character_Buffers;
with Consumers; use Consumers;
procedure Test_Buffers is
   Buffer : aliased Buffer_Type (Size => 5);
   Consumer : Consumer_Type (Buffer'Access);
   C : Character;
begin
   Put_Line ("Enter characters, and use '0' to indicate you're done.");
   loop
      Ada.Text_IO.Get (C);
      Buffer.Put (C);
      exit when C = '0';
   end loop;
   Put_Line ("Shutting down consumer");
   Shutdown (Consumer);
   Put_Line ("Shutting down complete; exiting.");
end Test_Buffers;