Appendix

Demo Applications

The OmniThreadLibrary distribution includes plenty of demo applications that will help you get started. They are stored in the tests subfolder. This chapter lists all tests.

Examples

The OmniThreadLibrary distribution also includes some complex examples, stored in the examples subfolder. This chapter lists all examples; many of them are also explained in the How-to chapter.

Installing with GetIt

Delphi/RAD Studio XE8 and newer come with an integrated package manager called GetIt.

With GetIt you can install OmniThreadLibrary with just a few clicks. Start Delphi and open Tools, GetIt Package Manager. Enter ‘omnithreadlibrary’ into the search bar. Then click on the INSTALL button under the OmniThreadLibrary graphics.

GetIt will download OmniThreadLibrary from Embarcadero’s servers, add the source path to the Library path, and compile and install the design package.

To find the demos, look at the Library path. It will end with something like $(BDSCatalogRepository)\OmniThreadLibrary_3.07.1-Tokyo\src\. To find the actual path, look in Tools, Options, Environment Options, Environment Variables, where BDSCatalogRepository is defined.

Removing OmniThreadLibrary from Delphi is equally simple. Just open GetIt, select the Installed category and click the UNINSTALL button under the OmniThreadLibrary graphics.

Installing with Delphinus

Delphinus is a third-party package manager for Delphi XE and newer, similar to Embarcadero’s own GetIt package manager.

Unlike GetIt, which comes integrated into Delphi, Delphinus has to be installed manually. The recommended installation procedure is:

  1. Stop Delphi/RAD Studio.
  2. Install Delphinus with the web installer.
    1. It is recommended to right-click on the installer and select ‘Run as administrator’. Otherwise Delphinus may not be able to create the installation folder.
  3. Start Delphi/RAD Studio.
  4. (Optional, but highly recommended) Open Tools, Delphinus, click the Settings icon and enter an OAuth token. This will prevent frequent GitHub ‘rate limitation’ errors during operation. Instructions for generating the token can be found on the Delphinus wiki.

Once Delphinus is installed, open Tools, Delphinus and click the green Refresh icon in the Delphinus window. When the package list is refreshed, enter ‘omnithreadlibrary’ into the search bar and press <Enter>. Click on the OmniThreadLibrary list item to display its description in the panel on the right.

To install OmniThreadLibrary, click the Install icon (blue arrow pointing downwards to the disk drive). Be patient as Delphinus may take some time without displaying any progress on the screen.

When installation is complete, click the Show log button and find in the log the path where OmniThreadLibrary was installed (look for the Adding libpathes message). Inside that folder you’ll also find all OmniThreadLibrary demos.

Delphinus will also compile and install the appropriate package, so everything is set up for you.

Removing OmniThreadLibrary from Delphi is equally simple. Just open Delphinus, select the Installed category, select OmniThreadLibrary and click the Remove icon (red circle with white X).

Hooking Into OmniThreadLibrary

The OtlHooks unit allows your code to hook into internal OmniThreadLibrary processes. Currently you can register notification methods which are called when a thread is created/destroyed, a pool is created/destroyed, or an unhandled exception ‘escapes’ from a task.

Exception notifications

Exception filters allow your code to be notified when an unhandled exception occurs in a task. A filter can also prevent the exception from being stored in the IOmniTask.FatalException property.

type
  TExceptionFilterProc = procedure(var e: Exception; var continueProcessing: boolean);
  TExceptionFilterMeth = procedure(var e: Exception; var continueProcessing: boolean)
                         of object;

procedure RegisterExceptionFilter(filterProc: TExceptionFilterProc); overload;
procedure RegisterExceptionFilter(filterMethod: TExceptionFilterMeth); overload;
procedure UnregisterExceptionFilter(filterProc: TExceptionFilterProc); overload;
procedure UnregisterExceptionFilter(filterMethod: TExceptionFilterMeth); overload;

Call RegisterExceptionFilter to register a custom exception filter and UnregisterExceptionFilter to remove it.

An exception filter can use application-specific logging code to log detailed information about the application state. It can also free the exception object e and set it to nil, which prevents the exception from being stored in the FatalException property.

If the filter sets continueProcessing to false, further custom exception filters won’t be called. Filters are always called in the order in which they were registered.
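To illustrate the filter contract without depending on OmniThreadLibrary itself, here is a minimal, self-contained sketch. TFilterProc copies the TExceptionFilterProc signature declared above, while RunFilterChain is a hypothetical stand-in for the dispatch loop inside OtlHooks, written only from the two rules described above (registration order, and stopping when continueProcessing is cleared); it is not OmniThreadLibrary code. LogAndSwallow and CountOnly are hypothetical application filters. In a real program you would pass a procedure like LogAndSwallow to RegisterExceptionFilter instead of calling it yourself.

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program ExceptionFilterSketch;
{$APPTYPE CONSOLE}

uses
  SysUtils;

type
  // Same shape as TExceptionFilterProc in OtlHooks.
  TFilterProc = procedure(var e: Exception; var continueProcessing: boolean);

var
  GLogged: integer = 0;   // exceptions seen by LogAndSwallow
  GCounted: integer = 0;  // calls that reached CountOnly

// Hypothetical filter: logs the exception, then frees it and sets e to nil,
// which in OTL keeps it out of IOmniTask.FatalException.
procedure LogAndSwallow(var e: Exception; var continueProcessing: boolean);
begin
  Inc(GLogged);
  Writeln('Task failed: ', e.ClassName, ': ', e.Message);
  FreeAndNil(e);
  continueProcessing := false; // later filters would only see nil
end;

// Hypothetical second filter; it is skipped once continueProcessing is false.
procedure CountOnly(var e: Exception; var continueProcessing: boolean);
begin
  Inc(GCounted);
end;

// Hypothetical dispatcher (not OTL code): calls the filters in registration
// order and stops as soon as one of them clears continueProcessing.
function RunFilterChain(const filters: array of TFilterProc;
  e: Exception): Exception;
var
  i: integer;
  continueProcessing: boolean;
begin
  continueProcessing := true;
  for i := Low(filters) to High(filters) do begin
    filters[i](e, continueProcessing);
    if not continueProcessing then
      break;
  end;
  Result := e; // nil if a filter swallowed the exception
end;

var
  filters: array[0..1] of TFilterProc;
  remaining: Exception;
begin
  filters[0] := LogAndSwallow;
  filters[1] := CountOnly;
  remaining := RunFilterChain(filters, Exception.Create('division failed'));
  Writeln('Exception left for FatalException: ', Assigned(remaining));
  Writeln('Logged: ', GLogged, ', counted: ', GCounted);
end.
```

Because LogAndSwallow frees the exception and clears continueProcessing, CountOnly is never called and RunFilterChain returns nil, which models the case where nothing is stored in FatalException.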

Thread notifications

Thread notifications allow your code to be notified when a thread is created or destroyed inside OmniThreadLibrary. This allows OmniThreadLibrary to cooperate with application-specific exception-logging code.

type
  TThreadNotificationType = (tntCreate, tntDestroy);
  TThreadNotificationProc = procedure(notifyType: TThreadNotificationType;
                              const threadName: string);
  TThreadNotificationMeth = procedure(notifyType: TThreadNotificationType;
                              const threadName: string) of object;

procedure RegisterThreadNotification(notifyProc: TThreadNotificationProc); overload;
procedure RegisterThreadNotification(notifyMethod: TThreadNotificationMeth); overload;
procedure UnregisterThreadNotification(notifyProc: TThreadNotificationProc); overload;
procedure UnregisterThreadNotification(notifyMethod: TThreadNotificationMeth); overload;

Call RegisterThreadNotification to register a thread notification method and UnregisterThreadNotification to unregister it.

The notification method is always called in the context of the thread being created or destroyed.

For example, the following code fragment registers/unregisters OmniThreadLibrary threads with an application-specific thread logger.

procedure OtlThreadNotify(notifyType: TThreadNotificationType; const threadName: string);
var
  name: string;
begin
  case notifyType of
    tntCreate:
      begin
        if threadName <> '' then
          name := threadName
        else
          name := 'unnamed OTL thread';
        LoggerRegisterThread(name);
      end;
    tntDestroy:
      LoggerUnregisterThread;
    else
      raise Exception.Create('OtlThreadNotify: Unexpected notification type');
  end;
end;

OtlHooks.RegisterThreadNotification(OtlThreadNotify);

Pool notifications

Pool notifications allow your code to be notified when a thread pool is being created or destroyed. This allows the application to modify pool parameters on the fly.

type
  TPoolNotificationType = (pntCreate, pntDestroy);
  TPoolNotificationProc = procedure(notifyType: TPoolNotificationType;
                            const pool: IOmniThreadPool);
  TPoolNotificationMeth = procedure(notifyType: TPoolNotificationType;
                            const pool: IOmniThreadPool) of object;

procedure RegisterPoolNotification(notifyProc: TPoolNotificationProc); overload;
procedure RegisterPoolNotification(notifyMethod: TPoolNotificationMeth); overload;
procedure UnregisterPoolNotification(notifyProc: TPoolNotificationProc); overload;
procedure UnregisterPoolNotification(notifyMethod: TPoolNotificationMeth); overload;

Call RegisterPoolNotification to register a pool notification method and UnregisterPoolNotification to unregister it.

You can, for example, use the pool notification mechanism to set the Asy_OnUnhandledWorkerException property whenever a thread pool is created.

procedure OtlPoolNotify(notifyType: TPoolNotificationType; const pool: IOmniThreadPool);
begin
  case notifyType of
    pntCreate:  pool.Asy_OnUnhandledWorkerException := Asy_LogUnhandledOtlWorkerException;
    pntDestroy: pool.Asy_OnUnhandledWorkerException := nil;
    else        raise Exception.Create('OtlPoolNotify: Unexpected notification type');
  end;
end;

OtlHooks.RegisterPoolNotification(OtlPoolNotify);

For Each Internals

This section explains how ForEach is implemented.

Let’s start with a very simple piece of code.

Parallel.ForEach(1, 1000)
  .Execute(
    procedure (const elem: integer)
    begin
    end);

This simple code iterates from 1 to 1000 on all available cores in parallel and executes a simple procedure that contains no workload. All in all, the code does nothing, but it does it in a very complicated manner.

The ForEach method creates a new TOmniParallelLoop<integer> object (the object that coordinates the parallel tasks) and passes it a source provider: an object that knows how to access the values being enumerated (integers from 1 to 1000 in this example).

The OtlDataManager unit contains four different source providers, one for each type of source that can be passed to the ForEach method. If ForEach ever needs to support a new enumeration source, I would only have to add a few simple methods to the OtlParallel unit and write a new source provider.

class function Parallel.ForEach(low, high: integer; step: integer):
  IOmniParallelLoop<integer>;
begin
  Result := TOmniParallelLoop<integer>.Create(
    CreateSourceProvider(low, high, step), true);
end;

Parallel for tasks are started in InternalExecuteTask. This method first creates a data manager and attaches it to the source provider (compare this with the picture above: there is one source provider and one data manager). Next it creates an appropriate number of tasks and calls the task-specific delegate method from each one. [This delegate wraps your parallel code and provides it with proper input (and sometimes output). There are many calls to InternalExecuteTask in the OtlParallel unit, each with a different taskDelegate and each providing support for a different kind of loop.]

procedure TOmniParallelLoopBase.InternalExecuteTask(
  taskDelegate: TOmniTaskDelegate);
var
  dmOptions    : TOmniDataManagerOptions;
  iTask        : integer;
  numTasks     : integer;
  task         : IOmniTaskControl;
begin
  ...
  oplDataManager := CreateDataManager(oplSourceProvider,
    numTasks, dmOptions);
  ...
  for iTask := 1 to numTasks do begin
    task := CreateTask(
      procedure (const task: IOmniTask)
      begin
        ...
        taskDelegate(task);
        ...
      end,
      ...
    task.Schedule(GParallelPool);
  end;
  ...
end;

The data manager is stored in a field of the TOmniParallelLoop<T> object (oplDataManager), so that every task delegate can access it directly. The simplest possible task delegate (below) just creates a local queue and fetches values from it one by one. This results in many local queues, one per task, all connected to the same data manager.

In case you’re wondering what loopBody is: it is the anonymous method you passed to the Parallel.ForEach.Execute method.

InternalExecuteTask(
  procedure (const task: IOmniTask)
  var
    localQueue: TOmniLocalQueue;
    value     : TOmniValue;
  begin
    localQueue := oplDataManager.CreateLocalQueue;
    try
      while (not Stopped) and localQueue.GetNext(value) do
        loopBody(task, value);
    finally FreeAndNil(localQueue); end;
  end);

Let’s reiterate:

All this was designed to provide fast data access (blocking is limited to the source provider, all other interactions are lock-free), good workload distribution (when a task runs out of work before other tasks, it will steal some work from other tasks) and output ordering (when required).

Source Provider

A source provider is an object that fetches data from the enumeration source (the data that was passed to the parallel for) and repackages it into a format suitable for parallel consumption. Currently there are three source providers defined in the OtlDataManager unit.

All source providers descend from the abstract class TOmniSourceProvider, which provides a common source provider interface. In theory, an interface should be used for that purpose, but source providers are very performance-critical and avoiding interfaces makes the program measurably faster.

TOmniSourceProvider = class abstract
public
  function  Count: int64; virtual; abstract;
  function  CreateDataPackage: TOmniDataPackage; virtual; abstract;
  function  GetCapabilities: TOmniSourceProviderCapabilities;
    virtual; abstract;
  function  GetPackage(dataCount: integer;
    package: TOmniDataPackage): boolean; virtual; abstract;
  function  GetPackageSizeLimit: integer; virtual; abstract;
end;

Not all source providers are created equal, which is why the GetCapabilities function returns the source provider’s capabilities:

TOmniSourceProviderCapability = (
  spcCountable,  // source provider that knows how much data it holds
  spcFast,       // source provider operations are O(1)
  spcDataLimit   // data package can only hold limited amount of data
);

TOmniSourceProviderCapabilities = set of
  TOmniSourceProviderCapability;

TOmniIntegerRangeProvider is both countable (it’s very simple to know how many values lie between 1 and 10, for example) and fast (it takes the same amount of time to fetch 10 values or 10,000 values), while the other two source providers are neither countable nor fast. The third capability, spcDataLimit, is obsolete and not used; it was replaced by the GetPackageSizeLimit method.

The other important part of a source provider is the GetPackage method. It accesses the source (locking it if necessary), retrieves data and returns it in a data package. The implementation depends heavily on the source data. For example, the integer source provider just advances its current low field value and returns a data package that doesn’t contain a bunch of values but just the low and high boundaries (that’s why it is considered fast). The enumerator source provider locks the source, fetches the data and builds the data package value by value. And in the simplest case, the TOmniValueEnumerator source provider just fetches values and builds the data package.

function TOmniValueEnumeratorProvider.GetPackage(dataCount: integer;
  package: TOmniDataPackage): boolean;
var
  iData     : integer;
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  timeout   : cardinal;
  value     : TOmniValue;
begin
  Assert(not StorePositions);
  Result := false;
  dataCount := intPackage.Prepare(dataCount);
  timeout := INFINITE; // wait as long as needed for the first value
  for iData := 1 to dataCount do begin
    if not vepEnumerator.TryTake(value, timeout) then
      break; //for
    intPackage.Add(value);
    timeout := 0; // afterwards, only take values that are already available
    Result := true;
  end;
end;

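By contrast, the integer range provider never touches individual values. The sketch below is a hypothetical, single-threaded TRangeProvider (the name and code are illustrative, not the real TOmniIntegerRangeProvider): its data package holds only two boundaries, so GetPackage does the same constant amount of work whether it hands out 10 or 10,000 values, and Count is a simple subtraction. A provider shared by several tasks would additionally have to protect FNext against concurrent access; that part is left out here.

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program RangeProviderSketch;
{$APPTYPE CONSOLE}

type
  // Hypothetical data package for an integer range: only the boundaries
  // are stored, never the individual values.
  TRangePackage = record
    First, Last: int64;
  end;

  // Hypothetical, single-threaded model of a countable and fast provider.
  TRangeProvider = class
  private
    FNext, FLast: int64; // values FNext..FLast have not been handed out yet
  public
    constructor Create(first, last: int64);
    function Count: int64;
    function GetPackage(dataCount: integer;
      var package: TRangePackage): boolean;
  end;

constructor TRangeProvider.Create(first, last: int64);
begin
  inherited Create;
  FNext := first;
  FLast := last;
end;

// Countable: the number of remaining values is a simple subtraction.
function TRangeProvider.Count: int64;
begin
  Result := FLast - FNext + 1;
  if Result < 0 then
    Result := 0;
end;

// Fast: O(1) regardless of dataCount, because only the lower bound moves.
function TRangeProvider.GetPackage(dataCount: integer;
  var package: TRangePackage): boolean;
begin
  Result := (dataCount > 0) and (FNext <= FLast);
  if not Result then
    Exit;
  package.First := FNext;
  package.Last := FNext + dataCount - 1;
  if package.Last > FLast then
    package.Last := FLast;
  FNext := package.Last + 1;
end;

var
  provider: TRangeProvider;
  pkg: TRangePackage;
begin
  provider := TRangeProvider.Create(1, 1000);
  try
    while provider.GetPackage(300, pkg) do
      Writeln(pkg.First, '..', pkg.Last, ' (', provider.Count, ' left)');
  finally
    provider.Free;
  end;
end.
```

Run on the range 1..1000 with a requested package size of 300, it hands out 1..300, 301..600, 601..900 and finally the shortened package 901..1000.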
Data Manager

The data manager is the central hub in the OtlDataManager hierarchy. It sits between multiple local queues and the single source provider and makes sure that all parallel tasks always have some work to do.

Two different data managers are currently implemented: a countable data manager and a heuristic data manager. The former is used if the source provider is countable and the latter if it is not. Both descend from the abstract class TOmniDataManager.

TOmniDataManager = class abstract
public
  function  CreateLocalQueue: TOmniLocalQueue; virtual; abstract;
  function  AllocateOutputBuffer: TOmniOutputBuffer;
    virtual; abstract;
  function  GetNext(package: TOmniDataPackage): boolean;
    virtual; abstract;
  procedure ReleaseOutputBuffer(buffer: TOmniOutputBuffer);
    virtual; abstract;
  procedure SetOutput(const queue: IOmniBlockingCollection);
    overload; virtual; abstract;
end;

The main difference between them lies in the GetNextFromProvider function, which reads data from the source provider (by calling its GetPackage method). In the countable data manager this function is just a simple forwarder, while in the heuristic data manager it tries to find a package size that allows all parallel tasks to work at full speed.

function TOmniHeuristicDataManager.GetNextFromProvider(
  package: TOmniDataPackage; generation: integer): boolean;
const
  CDataLimit = Trunc(High(integer) / CFetchTimeout_ms);
var
  dataPerMs: cardinal;
  dataSize : integer;
  time     : int64;
begin
  // the goal is to fetch as much (but not exceeding <fetch_limit>)
  // data as possible in <fetch_timeout> milliseconds; highest amount
  // of data is limited by the GetDataCountForGeneration method.
  dataSize := GetDataCountForGeneration(generation);
  if dataSize > hdmEstimatedPackageSize.Value then
    dataSize := hdmEstimatedPackageSize.Value;
  time := DSiTimeGetTime64;
  Result := SourceProvider.GetPackage(dataSize, package);
  time := DSiTimeGetTime64 - time;
  if Result then begin
    if time = 0 then
      dataPerMs := CDataLimit
    else begin
      dataPerMs := Round(dataSize / time);
      if dataPerMs >= CDataLimit then
        dataPerMs := CDataLimit;
    end;
    // average over last four fetches for dynamic adaptation
    hdmEstimatedPackageSize.Value := Round
      ((hdmEstimatedPackageSize.Value / 4 * 3) +
       (dataPerMs / 4) * CFetchTimeout_ms);
  end;
end;

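The last statement of GetNextFromProvider is easier to follow in isolation. The sketch below extracts it into a hypothetical NextEstimate function. CFetchTimeout_ms is defined in OtlDataManager; the value 10 used here is an assumption made only for illustration. Starting from an estimate of 100 and a constant measured speed of 200 values per millisecond, the estimate climbs towards 200 * 10 = 2000 values per package.

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program PackageSizeEstimateSketch;
{$APPTYPE CONSOLE}

const
  // Assumed value for illustration; the real constant lives in OtlDataManager.
  CFetchTimeout_ms = 10;

// Same formula as the last statement of GetNextFromProvider: keep 3/4 of the
// old estimate and add 1/4 of what the measured speed would deliver within
// one fetch timeout.
function NextEstimate(oldEstimate: integer; dataPerMs: cardinal): integer;
begin
  Result := Round((oldEstimate / 4 * 3) + (dataPerMs / 4) * CFetchTimeout_ms);
end;

var
  estimate, fetch: integer;
begin
  estimate := 100;
  for fetch := 1 to 10 do begin
    estimate := NextEstimate(estimate, 200); // 200 values per ms, every time
    Writeln('after fetch ', fetch, ': ', estimate);
  end;
end.
```

Strictly speaking, this is an exponentially weighted moving average rather than an average over exactly four fetches: every older measurement keeps influencing the estimate, but its weight shrinks by a factor of 3/4 with each new fetch. A constant speed therefore leads to a fixed point: NextEstimate(2000, 200) returns 2000.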
Local Queue

Each parallel task reads data from a local queue, which is just a simple interface to the data manager. The most important part of a local queue is its GetNext method which provides the task with the next value.

function TOmniLocalQueueImpl.GetNext(var value: TOmniValue): boolean;
begin
  Result := lqiDataPackage.GetNext(value);
  if not Result then begin
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then
      Result := lqiDataPackage.GetNext(value);
  end;
end;

Each local queue contains a local data package. GetNext first tries to read the next value from that data package. If that fails (the data package is empty because it was already fully processed), it tries to get a new data package from the data manager and, if successful, retries fetching the next value from the refreshed data package.

GetNext in the data manager first tries to get the next package from the source provider (via the private method GetNextFromProvider, which calls the source provider’s GetPackage method). If that fails, it tries to steal part of the workload from another task.

Stealing is the feature that keeps all parallel tasks active up to the last value being enumerated. To implement it, the data manager iterates over all local queues and tries to split each local queue’s data package in half. If that succeeds, half of the data package is left in the original local queue and the other half is returned to the local queue that requested more data.

Package splitting depends heavily on the data type. For example, the integer data package just recalculates its boundaries, while enumerator-based packages must copy data around.

function TOmniValueEnumeratorDataPackage.Split(
  package: TOmniDataPackage): boolean;
var
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  iValue    : integer;
  value     : TOmniValue;
begin
  Result := false;
  for iValue := 1 to intPackage.Prepare(vedpApproxCount.Value div 2)
  do begin
    if not GetNext(value) then
      break; //for
    intPackage.Add(value);
    Result := true;
  end;
end;

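Splitting an integer data package and stealing work can both be modeled with nothing but boundary arithmetic. In this hypothetical sketch (TRangePackage and StealFrom are illustration names, not OmniThreadLibrary types), Split hands the upper half of the remaining range to the caller and keeps the lower half; which half moves is an arbitrary choice made here. StealFrom walks the other tasks’ packages and takes half of the first one that can be split. Synchronization between tasks, which the real implementation has to handle, is left out.

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program RangeStealSketch;
{$APPTYPE CONSOLE}

type
  // Hypothetical integer data package: values First..Last are still unprocessed.
  TRangePackage = record
    First, Last: int64;
    function Split(var stolen: TRangePackage): boolean;
  end;

// Splitting only recalculates boundaries: the upper half goes to 'stolen',
// the lower half stays in this package. A single value cannot be split.
function TRangePackage.Split(var stolen: TRangePackage): boolean;
var
  half: int64;
begin
  half := (Last - First + 1) div 2;
  Result := half > 0;
  if not Result then
    Exit;
  stolen.First := Last - half + 1;
  stolen.Last := Last;
  Last := stolen.First - 1;
end;

// Hypothetical model of stealing: try the other tasks' packages in turn
// and take half of the first one that can be split.
function StealFrom(var packages: array of TRangePackage;
  var stolen: TRangePackage): boolean;
var
  i: integer;
begin
  Result := false;
  for i := Low(packages) to High(packages) do
    if packages[i].Split(stolen) then
      Exit(true);
end;

var
  packages: array[0..1] of TRangePackage;
  mine: TRangePackage;
begin
  packages[0].First := 101; packages[0].Last := 101; // one value left
  packages[1].First := 1;   packages[1].Last := 100;
  if StealFrom(packages, mine) then
    Writeln('stolen ', mine.First, '..', mine.Last,
      ', task 1 keeps ', packages[1].First, '..', packages[1].Last);
end.
```

Task 0’s package holds a single value and cannot be split, so the thief moves on and takes 51..100 from task 1, which keeps 1..50.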
Output Ordering

Ordering (PreserveOrder) is usually used together with the Into modifier. The reason lies in the integration between the Parallel.ForEach infrastructure and your parallel code (the code executed as the Execute payload). In the ‘normal’ ForEach, output from this parallel payload is not defined. You are allowed to generate any output in the payload, but ForEach will know nothing about it. In this case OTL cannot preserve ordering because, at least from the library’s viewpoint, the parallelized code produces no output.

When Into is used, however, your code uses a different signature (different parameters).

Parallel.ForEach(1, CMaxTest)
  .PreserveOrder
  .Into(primeQueue)
  .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
      if IsPrime(value) then
        res := value;
    end);

The parallel payload now takes two parameters. The first is, as in the more common case, the input value, while the second receives the output value. As you can see from the example, the parallelized code can produce zero or one output value, but not more.

This small modification changes everything. As the Parallel infrastructure controls the output parameter, it can manage it internally, associate it with the input and make sure that output is generated in the same order as the input.

Let’s look at the innermost code, the part that schedules parallel tasks. When Into is used, InternalExecuteTask executes the following, quite complicated, code.

InternalExecuteTask(
  procedure (const task: IOmniTask)
  var
    localQueue      : TOmniLocalQueue;
    outputBuffer_ref: TOmniOutputBuffer;
    position        : int64;
    result          : TOmniValue;
    value           : TOmniValue;
  begin
    oplDataManager.SetOutput(oplIntoQueueIntf);
    localQueue := oplDataManager.CreateLocalQueue;
    try
      outputBuffer_ref := oplDataManager.AllocateOutputBuffer;
      try
        localQueue.AssociateBuffer(outputBuffer_ref);
        result := TOmniValue.Null;
        while (not Stopped) and
              localQueue.GetNext(position, value) do
        begin
          loopBody(task, value, result);
          if not result.IsEmpty then begin
            outputBuffer_ref.Submit(position, result);
            result := TOmniValue.Null;
          end;
        end;
      finally
        oplDataManager.ReleaseOutputBuffer(outputBuffer_ref);
      end;
    finally
      FreeAndNil(localQueue);
    end;
  end);

Important points here are:

The interesting part is hidden in the background; inside local queue, data manager and output buffer.

The first modification lies in the data source. When PreserveOrder is used, each data package knows the source position it was read from. To simplify matters, data package splitting is not used in this case. [Because of that, data stealing cannot be used either, which results in slightly less effective use of the CPU than in the simpler ForEach case.]

Each local queue has an output buffer set associated with it.

Each output buffer set manages two output buffers. One is active and the task writes into it, while the other may be either empty or full. Each output buffer is associated with an input position, just as the data package is.

When we look at data reading and writing from the perspective of one task, everything is very simple. The task reads data from a local queue (which reads data from a data package associated with some position) and writes it to an output buffer (associated with the same position).

The tricky part comes up when the data package is exhausted (the if not Result branch in the code below).

function TOmniLocalQueueImpl.GetNext(var position: int64; var value: TOmniValue): boolean;
begin
  Result := lqiDataPackage.GetNext(position, value);
  if not Result then begin
    lqiBufferSet.ActiveBuffer.MarkFull;
    lqiBufferSet.ActivateBuffer;
      // this will block if alternate buffer is also full
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then begin
      Result := lqiDataPackage.GetNext(position, value);
      if Result then
        lqiBufferSet.ActiveBuffer.Range := lqiDataPackage.Range;
    end;
  end;
end;

First, the currently active buffer is marked as full. This causes NotifyBufferFull to be called (see below). Then the alternate buffer is activated. This call (ActivateBuffer) blocks if the alternate buffer is not free; in that case, the current thread waits until one of its buffers is written to the output queue.

From this point on, GetNext proceeds in the same way as in the simple ForEach, except that it sets the active buffer’s range whenever a new data package is read from the data manager.

The other part of the magic happens in NotifyBufferFull, the method that is called from MarkFull. It walks the buffer list and checks whether any output buffers are a) full and b) destined for the current output position. Such buffers are copied to the output and returned to use.

procedure TOmniBaseDataManager.NotifyBufferFull(
  buffer: TOmniOutputBufferImpl);
begin
  // Remove buffer from the list. Check if next buffer is waiting in
  // the list. Copy buffer if it is full and repeat the process.
  dmBufferRangeLock.Acquire;
  try
    while (dmBufferRangeList.Count > 0) and
          (BufferList[0].Range.First = dmNextPosition) and
          BufferList[0].IsFull do
    begin
      buffer := TOmniOutputBufferImpl(
        dmBufferRangeList.ExtractObject(0));
      dmNextPosition := buffer.Range.Last + 1;
      buffer.CopyToOutput;
    end;
  finally dmBufferRangeLock.Release; end;
end;
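The ordering rule in NotifyBufferFull can be reproduced in a small single-threaded model. The names below (TBuffer, GBuffers, GNextPosition, MarkFull) are hypothetical and the lock is omitted; only the loop condition mirrors the method above: a buffer is copied to the output only while the first buffer in the list is full and starts exactly at the next expected position. The three buffers are filled out of order (7..9 first, then 1..3, then 4..6).

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program OrderedOutputSketch;
{$APPTYPE CONSOLE}

type
  // Hypothetical output buffer holding results for input positions First..Last.
  TBuffer = record
    First, Last: int64;
    IsFull: boolean;
    Data: string;
  end;

var
  GBuffers: array of TBuffer;  // kept sorted by First
  GNextPosition: int64 = 1;    // first input position not yet written out
  GOutput: string = '';        // stands in for the output queue

function MakeBuffer(first, last: int64; const data: string): TBuffer;
begin
  Result.First := first;
  Result.Last := last;
  Result.IsFull := false;
  Result.Data := data;
end;

procedure RemoveFirstBuffer;
var
  i: integer;
begin
  for i := 1 to High(GBuffers) do
    GBuffers[i - 1] := GBuffers[i];
  SetLength(GBuffers, Length(GBuffers) - 1);
end;

// Same loop condition as NotifyBufferFull, without the lock.
procedure NotifyBufferFull;
begin
  while (Length(GBuffers) > 0) and
        (GBuffers[0].First = GNextPosition) and
        GBuffers[0].IsFull do
  begin
    GNextPosition := GBuffers[0].Last + 1;
    GOutput := GOutput + GBuffers[0].Data; // plays the role of CopyToOutput
    RemoveFirstBuffer;
  end;
end;

// Marks the buffer starting at 'first' as full, then tries to flush.
procedure MarkFull(first: int64);
var
  i: integer;
begin
  for i := 0 to High(GBuffers) do
    if GBuffers[i].First = first then
      GBuffers[i].IsFull := true;
  NotifyBufferFull;
end;

begin
  SetLength(GBuffers, 3);
  GBuffers[0] := MakeBuffer(1, 3, 'abc');
  GBuffers[1] := MakeBuffer(4, 6, 'def');
  GBuffers[2] := MakeBuffer(7, 9, 'ghi');
  MarkFull(7);
  Writeln('after 7..9: "', GOutput, '"');
  MarkFull(1);
  Writeln('after 1..3: "', GOutput, '"');
  MarkFull(4);
  Writeln('after 4..6: "', GOutput, '"');
end.
```

After the 7..9 buffer is marked full the output is still empty, because position 1 has not been produced yet. Marking 1..3 flushes abc, and marking 4..6 flushes both def and the waiting ghi, so the output always follows the input order.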

To recap: