In an earlier article, we implemented a bounded buffer using a protected
type with an entry to put an item, and another entry to get an item.
Here we present another bounded buffer, without the entry for getting an
item, implemented using suspension objects.
The basic idea is that we're going to implement the reader queue
ourselves, instead of letting the runtime system do it.
A reader "registers" with the buffer in order to state his intention to
read, passing in a reference to a suspension object. He then
immediately waits for the suspension object to become true. When it
does, the reader calls a protected procedure to get the item.
Reading looks like this:

   Buffer.Wait_To_Get (My_Suspension_Object'Access);
   Suspend_Until_True (My_Suspension_Object);
   Buffer.Get (Item);

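For readers who haven't used suspension objects, here is a minimal, self-contained sketch of the primitive on its own, independent of the buffer. The names Suspension_Demo, Ready and Reader are made up for illustration; the operations themselves come from Ada.Synchronous_Task_Control.

```ada
with Ada.Synchronous_Task_Control; use Ada.Synchronous_Task_Control;
with Ada.Text_IO;

procedure Suspension_Demo is

   Ready : Suspension_Object;   --  the initial state is False

   task Reader;

   task body Reader is
   begin
      --  Blocks until some other task calls Set_True (Ready).  If the
      --  state is already True, the call returns immediately.
      Suspend_Until_True (Ready);
      Ada.Text_IO.Put_Line ("reader resumed");

      --  Suspend_Until_True resets the state to False on return.
      Ada.Text_IO.Put_Line
        ("state after resume: " & Boolean'Image (Current_State (Ready)));
   end Reader;

begin
   delay 0.1;          --  let the reader reach its suspension point
   Set_True (Ready);   --  resume the reader
end Suspension_Demo;
```

The demo works whether Set_True is called before or after the reader suspends, which is what lets the buffer resume a reader from inside Wait_To_Get.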
The writer's interaction with the buffer is unchanged. He calls a
protected entry to put an item in the queue. If the item queue is full,
then the writer blocks until it becomes not full (because a reader came
along and read one of the items).
Writing thus looks like this:

   Buffer.Put (Item);

It's up to us as implementors to decide whether to place an upper limit
on the number of readers that may be waiting to get an item, or to have
no limit at all. However, this implementation would typically be used
in a restricted runtime (a la Ravenscar), so we decide to bound the
maximum number of readers.
Therefore we pass in the size of the item queue and the size of the
reader queue as discriminants of the protected type. The spec of the
buffer type looks like this:

   generic
      type Item_Type is private;
   package Buffers is
      ...
      protected type Buffer_Type
        (Size        : Positive;
         Max_Readers : Positive) is

         entry Put
           (Item : in Item_Type);

         procedure Wait_To_Get
           (SO : access Suspension_Object);

         procedure Get
           (Item : out Item_Type);

      private

         Item_Queue   : Item_Queues.Queue_Type (Size);
         Reader_Queue : SO_Queues.Queue_Type (Max_Readers);

      end Buffer_Type;

   end Buffers;

The Item_Queue, unchanged from our earlier example, is a bounded queue
of items. We keep a writer blocked if the item queue is full, and keep
a reader suspended if the item queue is empty.
The Reader_Queue is a bounded queue of suspension object pointers, which
we use to keep track of who's waiting to get an item.
That's it for the spec. Next up is the implementation of the protected
operations. Obviously it's going to be a little more complex than our
earlier example, because we have to manage the reader queue ourselves.
The protected operation Put is implemented using an entry, which means
it has a barrier expression. Here the barrier is closed if the item
queue is full.
When the barrier opens (the item queue becomes "not full"), we add the
item to the item queue, and then check to see if there are any waiting
readers. If so, we resume the reader at the front of the reader queue.
The protected entry Put looks like this:

   entry Put (Item : in Item_Type)
     when not Is_Full (Item_Queue) is
   begin
      Add (Item, To => Item_Queue);

      if not Is_Empty (Reader_Queue) then
         Set_True (Get_Front (Reader_Queue).all);
         Pop (Reader_Queue);
      end if;
   end Put;

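The barrier behaviour can be seen in isolation with a one-slot buffer. This is only an illustrative sketch: Barrier_Demo, Slot, Take and Writer are invented names, and the reader side is reduced to a plain protected procedure so that the only blocking comes from the entry barrier.

```ada
with Ada.Text_IO;

procedure Barrier_Demo is

   protected Slot is
      entry Put (Item : in Integer);        --  blocks while the slot is full
      procedure Take (Item : out Integer);  --  empties the slot
   private
      Value : Integer := 0;
      Full  : Boolean := False;
   end Slot;

   protected body Slot is

      entry Put (Item : in Integer) when not Full is
      begin
         Value := Item;
         Full  := True;
      end Put;

      procedure Take (Item : out Integer) is
      begin
         Item := Value;
         Full := False;   --  the barrier is re-evaluated when Take completes
      end Take;

   end Slot;

   Item : Integer;

   task Writer;

   task body Writer is
   begin
      Slot.Put (1);
      Slot.Put (2);   --  queued on the entry until Take reopens the barrier
      Ada.Text_IO.Put_Line ("writer done");
   end Writer;

begin
   delay 0.1;         --  give the writer time to fill the slot and block
   Slot.Take (Item);
   Ada.Text_IO.Put_Line ("took" & Integer'Image (Item));
end Barrier_Demo;
```

The second call to Put cannot proceed until the main program calls Take, at which point the runtime re-evaluates the barrier and lets the queued writer in.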
A reader states his intention to read by calling procedure Wait_To_Get,
passing in a pointer to a suspension object. No blocking is performed
directly by the protected object, because this is a protected procedure,
not an entry.
If the item queue is empty, we add the reader (really, the reader's
suspension object) to the reader queue and set the suspension object to
false, so that the reader's subsequent call to Suspend_Until_True
blocks. Otherwise, we just set the suspension object to true, so he
doesn't actually do any waiting. Like this:

   procedure Wait_To_Get
     (SO : access Suspension_Object) is
   begin
      if Is_Empty (Item_Queue) then
         Add (Item => SO_Access (SO), To => Reader_Queue);
         Set_False (SO.all);
      else
         Set_True (SO.all);
      end if;
   end Wait_To_Get;

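The conversion SO_Access (SO) deserves a closer look, because converting an access parameter to a named access type carries a run-time accessibility check: the designated object must not be declared at a deeper level than the access type, or Program_Error is raised. The sketch below shows the pattern outside the buffer. Register_Demo, Register, Stored and Mine are hypothetical names, and a single variable stands in for the reader queue.

```ada
with Ada.Synchronous_Task_Control; use Ada.Synchronous_Task_Control;
with Ada.Text_IO;

procedure Register_Demo is

   type SO_Access is access all Suspension_Object;

   Stored : SO_Access;                  --  stands in for the reader queue
   Mine   : aliased Suspension_Object;  --  declared at the same level as SO_Access

   procedure Register (SO : access Suspension_Object) is
   begin
      --  The accessibility check happens on this conversion.  It passes
      --  here because Mine is not more deeply nested than SO_Access.
      Stored := SO_Access (SO);
      Set_False (SO.all);
   end Register;

begin
   Register (Mine'Access);   --  'Access requires the object to be aliased
   Set_True (Stored.all);    --  what Put does for the reader at the front
   Ada.Text_IO.Put_Line (Boolean'Image (Current_State (Mine)));
end Register_Demo;
```

In practice this suggests declaring each reader's suspension object at the same level as the Buffers instantiation (typically library level) rather than inside a nested task body.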
The procedure Get returns the item at the front of the item queue, and
pops the queue. As in Put, some reader queue management is then
performed. If there's a reader waiting to get an item, and there's an
item, then we resume the front-most reader.

   procedure Get
     (Item : out Item_Type) is
   begin
      Item := Get_Front (Item_Queue);
      Pop (Item_Queue);

      if not Is_Empty (Reader_Queue)
        and not Is_Empty (Item_Queue)
      then
         Set_True (Get_Front (Reader_Queue).all);
         Pop (Reader_Queue);
      end if;
   end Get;

Note that readers never queue on an entry: each one waits on its own
suspension object, so the buffer needs only a single entry, Put. This
means it can be used in restricted runtimes like Ravenscar, which limit
the maximum number of callers queued on an entry to one (so at most one
writer can be blocked on Put at a time). (The Ravenscar profile also
limits the number of entries per protected object to one.)
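For reference, the two restrictions mentioned above can be written as configuration pragmas. This is only a fragment: the Ravenscar profile bundles further restrictions that are not shown here, and pragma Profile (Ravenscar) is the usual way to request all of them at once.

```ada
--  Configuration pragmas, placed at the start of a compilation.
pragma Restrictions (Max_Protected_Entries  => 1);  --  one entry per protected object
pragma Restrictions (Max_Entry_Queue_Length => 1);  --  one caller queued per entry
```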
Like our earlier buffer example, this buffer suffers from the problem of
not being able to let a writer announce that no more items will be
placed in the queue. Suspended readers will therefore remain suspended
forever.
However, in a not-as-early article we showed how to solve this problem, by
providing another procedure to announce "end of file," and passing back
an EOF flag to the reader. We should be able to use the same technique
here.
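The EOF article isn't reproduced here, so what follows is only a guess at how the technique might carry over, not the code from that article. To stay self-contained it makes several simplifying assumptions: items are Integers, the item queue is a small inline ring buffer, only one reader may wait at a time (so a single pointer replaces the reader queue), and the names Put_EOF, Done, Resume_Reader and EOF_Demo are invented.

```ada
with Ada.Synchronous_Task_Control; use Ada.Synchronous_Task_Control;
with Ada.Text_IO;

procedure EOF_Demo is

   type SO_Access is access all Suspension_Object;
   type Item_Array is array (Positive range <>) of Integer;

   protected type Buffer_Type (Size : Positive) is
      entry Put (Item : in Integer);
      procedure Put_EOF;   --  hypothetical: announce that no more items follow
      procedure Wait_To_Get (SO : access Suspension_Object);
      procedure Get (Item : out Integer; EOF : out Boolean);
   private
      Items   : Item_Array (1 .. Size);
      F, B    : Positive := 1;
      Length  : Natural := 0;
      Waiting : SO_Access;          --  assumption: at most one waiting reader
      Done    : Boolean := False;
   end Buffer_Type;

   protected body Buffer_Type is

      procedure Resume_Reader is
      begin
         if Waiting /= null then
            Set_True (Waiting.all);
            Waiting := null;
         end if;
      end Resume_Reader;

      entry Put (Item : in Integer) when Length < Size is
      begin
         Items (B) := Item;
         B := B mod Size + 1;
         Length := Length + 1;
         Resume_Reader;
      end Put;

      procedure Put_EOF is
      begin
         Done := True;
         Resume_Reader;   --  wake a reader that would otherwise wait forever
      end Put_EOF;

      procedure Wait_To_Get (SO : access Suspension_Object) is
      begin
         if Length = 0 and not Done then
            Waiting := SO_Access (SO);
            Set_False (SO.all);
         else
            Set_True (SO.all);   --  an item or EOF is already available
         end if;
      end Wait_To_Get;

      procedure Get (Item : out Integer; EOF : out Boolean) is
      begin
         --  A resumed reader finds the queue empty only after Put_EOF.
         EOF := Length = 0;
         if EOF then
            Item := 0;
         else
            Item := Items (F);
            F := F mod Size + 1;
            Length := Length - 1;
         end if;
      end Get;

   end Buffer_Type;

   Buffer    : Buffer_Type (Size => 2);
   Reader_SO : aliased Suspension_Object;   --  same level as SO_Access

   task Reader;

   task body Reader is
      Item : Integer;
      EOF  : Boolean;
      Sum  : Integer := 0;
   begin
      loop
         Buffer.Wait_To_Get (Reader_SO'Access);
         Suspend_Until_True (Reader_SO);
         Buffer.Get (Item, EOF);
         exit when EOF;
         Sum := Sum + Item;
      end loop;
      Ada.Text_IO.Put_Line ("sum =" & Integer'Image (Sum));
   end Reader;

begin
   for I in 1 .. 5 loop
      Buffer.Put (I);
   end loop;
   Buffer.Put_EOF;
end EOF_Demo;
```

With the five items written above, the reader should report a sum of 15 and then terminate instead of remaining suspended. Extending the sketch to several readers would mean resuming every queued suspension object in Put_EOF.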
--STX
package body Buffers is

   use Item_Queues;
   use SO_Queues;

   protected body Buffer_Type is

      entry Put (Item : in Item_Type)
        when not Is_Full (Item_Queue) is
      begin
         Add (Item, To => Item_Queue);

         if not Is_Empty (Reader_Queue) then
            Set_True (Get_Front (Reader_Queue).all);
            Pop (Reader_Queue);
         end if;
      end Put;

      procedure Wait_To_Get
        (SO : access Suspension_Object) is
      begin
         if Is_Empty (Item_Queue) then
            Add (Item => SO_Access (SO), To => Reader_Queue);
            Set_False (SO.all);
         else
            Set_True (SO.all);
         end if;
      end Wait_To_Get;

      procedure Get
        (Item : out Item_Type) is
      begin
         Item := Get_Front (Item_Queue);
         Pop (Item_Queue);

         if not Is_Empty (Reader_Queue)
           and not Is_Empty (Item_Queue)
         then
            Set_True (Get_Front (Reader_Queue).all);
            Pop (Reader_Queue);
         end if;
      end Get;

   end Buffer_Type;

end Buffers;

with Ada.Synchronous_Task_Control;  use Ada.Synchronous_Task_Control;
with Queues.Bounded;
pragma Elaborate (Queues.Bounded);

generic
   type Item_Type is private;
package Buffers is

   pragma Elaborate_Body;

   package Item_Queues is
     new Queues.Bounded (Item_Type);

   type SO_Access is
     access all Suspension_Object;
   for SO_Access'Storage_Size use 0;

   package SO_Queues is
     new Queues.Bounded (SO_Access);

   protected type Buffer_Type
     (Size        : Positive;
      Max_Readers : Positive) is

      entry Put
        (Item : in Item_Type);

      procedure Wait_To_Get
        (SO : access Suspension_Object);

      procedure Get
        (Item : out Item_Type);

   private

      Item_Queue   : Item_Queues.Queue_Type (Size);
      Reader_Queue : SO_Queues.Queue_Type (Max_Readers);

   end Buffer_Type;

end Buffers;

package body Queues.Bounded is

   procedure Add
     (Item : in     Item_Type;
      To   : in out Queue_Type) is
   begin
      pragma Assert (To.Length < To.Size);
      To.Items (To.B) := Item;
      To.B := To.B mod To.Size + 1;
      To.Length := To.Length + 1;
   end Add;

   procedure Pop
     (Queue : in out Queue_Type) is
   begin
      pragma Assert (Queue.Length > 0);
      Queue.F := Queue.F mod Queue.Size + 1;
      Queue.Length := Queue.Length - 1;
   end Pop;

   function Get_Front
     (Queue : Queue_Type) return Item_Type is
   begin
      pragma Assert (Queue.Length > 0);
      return Queue.Items (Queue.F);
   end Get_Front;

   function Is_Empty
     (Queue : Queue_Type) return Boolean is
   begin
      return Queue.Length = 0;
   end Is_Empty;

   function Is_Full
     (Queue : Queue_Type) return Boolean is
   begin
      return Queue.Length = Queue.Size;
   end Is_Full;

end Queues.Bounded;

generic
   type Item_Type is private;
package Queues.Bounded is

   type Queue_Type (Size : Positive) is limited private;

   procedure Add
     (Item : in     Item_Type;
      To   : in out Queue_Type);

   procedure Pop
     (Queue : in out Queue_Type);

   function Get_Front
     (Queue : Queue_Type) return Item_Type;

   function Is_Empty
     (Queue : Queue_Type) return Boolean;

   function Is_Full
     (Queue : Queue_Type) return Boolean;

private

   type Item_Array is array (Positive range <>) of Item_Type;

   type Queue_Type (Size : Positive) is
      limited record
         Items  : Item_Array (1 .. Size);
         Length : Natural := 0;
         F, B   : Positive := 1;
      end record;

end Queues.Bounded;

package Queues is
   pragma Pure;
end Queues;