2. High-level Multithreading

Face it – multithreading programming is hard. It is hard to design a multithread program, it is hard to write and test it and it is insanely hard to debug it. To alleviate this problem, OmniThreadLibrary introduces a number of pre-packaged multithreading solutions; so-called abstractions.

The idea behind the high-level abstractions is that the user should just choose appropriate abstraction and write the worker code, while the OmniThreadLibrary provides the framework that implements the tricky multithreaded parts, takes care of synchronisation and so on.

2.1 Introduction

High-level abstractions are implemented in the OtlParallel unit. They are all created through the factory class Parallel. High-level code intensively uses anonymous methods and generics which makes Delphi 2009 the minimum supported version. As the implementation of generics in D2009 is not very stable, I’d recommend using at least Delphi 2010.

2.1.1 A Lifecycle of an Abstraction

Typically, factories from the Parallel class return an interface. For example, Parallel implements five Join overloads which create a Join abstraction.

1 class function  Join: IOmniParallelJoin; overload;
2 class function  Join(const task1, task2: TProc): 
3   IOmniParallelJoin; overload;
4 class function  Join(const task1, task2: TOmniJoinDelegate):
5   IOmniParallelJoin; overload;
6 class function  Join(const tasks: array of TProc): 
7   IOmniParallelJoin; overload;
8 class function  Join(const tasks: array of TOmniJoinDelegate):
9   IOmniParallelJoin; overload;

Important fact to keep in mind is that the Join abstraction (or any other abstraction returned from any of the Parallel factories) will only be alive as long as this interface is not destroyed. For example, the following code fragment will function fine.

 1 procedure Test_OK;
 2 begin
 3   Parallel.Join(
 4     procedure
 5     begin
 6       Sleep(10000);
 7     end,
 8     procedure
 9     begin
10       Sleep(15000);
11     end;
12   end
13   ).Execute;
14 end;

The interface returned from the Parallel.Join call is stored in a hidden variable, which will only be destroyed while executing the end statement of the Test_OK procedure. As the Execute waits for all subtasks to complete, Join will complete its execution before the end is executed and before the interface is destroyed.

Modify the code slightly and it will not work anymore.

 1 procedure Test_Fail;
 2 begin
 3   Parallel.Join(
 4     procedure
 5     begin
 6       Sleep(10000);
 7     end,
 8     procedure
 9     begin
10       Sleep(15000);
11     end;
12   end
13   ).NoWait.Execute;
14 end;

Because of the NoWait modifier, Execute will not wait for subtasks to complete but will return immediately. Next, the code behind the end will be executed and will destroy the abstraction while the subtasks are still running.

In such cases, it is important to keep the interface in a global field (typically it will be stored inside a form or another object) which will stay alive until the abstraction has stopped.

 1 procedure Test_OK_Again;
 2 begin
 3   FJoin := Parallel.Join(
 4     procedure
 5     begin
 6       Sleep(10000);
 7     end,
 8     procedure
 9     begin
10       Sleep(15000);
11     end;
12   end
13   ).NoWait.Execute;
14 end;

This leads to a new problem – when should this interface be destroyed? The answer depends on the abstraction used. Some abstractions provide OnStop method which can be used for this purpose. For other abstractions, you should use termination handler of the task configuration block.

2.1.2 Anonymous Methods, Methods and Procedures

High-level abstractions are very much based on anonymous methods. They are used all over the OtlParallel unit and they will also be used in your own code as they provide the simplest way to interact with the high-level threading. All of delegates (pieces of code that you ‘plug in’ into the high-level infrastructure) are declared as anonymous methods.

That, however, does not force you to write anonymous methods to use high-level multithreading. Thanks to the Delphi compiler, you can always provide a normal function/procedure or a method when an anonymous method is required.

For example, let’s take a look at the IOmniWorkItemConfig interface. It announces (between other stuff) method OnExecute which accepts a delegate of type TOmniBackgroundWorkerDelegate.

1 TOmniBackgroundWorkerDelegate = reference to procedure (
2   const workItem: IOmniWorkItem);
3 
4 IOmniWorkItemConfig = interface
5   function  OnExecute(const aTask: TOmniBackgroundWorkerDelegate):
6     IOmniWorkItemConfig;
7   ...
8 end; 

Let’s assume a variable of the appropriate type, config: IOmniWorkItemConfig. You can then call the OnExecute method using an anonymous method.

1 config.OnExecute(
2   procedure(const workItem: IOmniWorkItem)
3   begin
4     ...
5   end);

Alternatively, you could declare a ‘normal’ procedure with the same signature (with the same parameters) and pass it to the OnExecute call.

1 procedure OnConfigExecute(const workItem: IOmniWorkItem)
2 begin
3   ...
4 end;
5 
6 config.OnExecute(OnConfigExecute);

The third option is to pass a method of some class to the OnExecute.

1 procedure TMyClass.OnConfigExecute(const workItem: IOmniWorkItem)
2 begin
3   ...
4 end;
5 
6 procedure TMyClass.DoConfig;
7 begin
8   config.OnExecute(OnConfigExecute);
9 end;

These three options are valid whenever an anonymous method delegate can be used.

2.1.3 Pooling

Starting a thread is relatively slow operation, which is why threads, used in the high-level abstractions are not constantly created and destroyed. Rather than that, they are allocated from a thread pool.

All high-level abstractions are using the same thread pool, GlobalParallelPool. It has public visibility – just in case you have to configure its parameters.

1 function GlobalParallelPool: IOmniThreadPool;

This behaviour can be overridden by using the task configuration block.

The following rules hold for all tasks created by the high-level abstractions:

2.2 Blocking collection

The blocking collection is a Delphi clone of .NET 4 BlockingCollection. It is a thread-safe collection that provides multiple simultaneous readers and writers. This implementation of blocking collection works only with the TOmniValue elements, which is not a big limitation provided that TOmniValue can store anything, including a class, an interface and a record.

See also demo 33_BlockingCollection.

2.2.1 IOmniBlockingCollection

The blocking collecting is exposed as an interface that lives in the OtlCollections unit.

 1 IOmniBlockingCollection = interface
 2   procedure Add(const value: TOmniValue);
 3   procedure CompleteAdding;
 4   function  GetEnumerator: IOmniValueEnumerator;
 5   function  IsCompleted: boolean;
 6   function  IsEmpty: boolean;
 7   function  IsFinalized: boolean;
 8   function  Next: TOmniValue;
 9   procedure ReraiseExceptions(enable: boolean = true);
10   procedure SetThrottling(highWatermark, lowWatermark: integer);
11   function  Take(var value: TOmniValue): boolean;
12   function  TryAdd(const value: TOmniValue): boolean;
13   function  TryTake(var value: TOmniValue; 
14     timeout_ms: cardinal = 0): boolean;
15   property ContainerSubject: TOmniContainerSubject 
16     read GetContainerSubject;
17   property Count: integer read GetApproxCount;
18 end;

There’s also a class TOmniBlockingCollection which implements this interface. This class is public and can be used or reused in your code.

TOmniBlockingCollection also implements a class function ToArray<T> which extracts all data from a blocking collection and stores it into an array.

1 class function TOmniBlockingCollection.ToArray<T>
2   (coll: IOmniBlockingCollection): TArray<T>;

Demo 61_CollectionToArray shows how to use ToArray<T>.

The blocking collection works in the following way:

Enumerator calls Take in the MoveNext method and returns the value returned from Take. Enumerator will therefore block when there is no data in the collection. The usual way to stop the enumerator is to call CompleteAdding which will unblock all pending MoveNext calls and stop enumeration.

2.2.2 Throttling

Normally, a blocking collection can grow without limits and can fill up the available memory. If the algorithm doesn’t prevent this intrinsically, it is sometimes useful to set up throttling, a mechanism which blocks additions when the blocking collection size reaches some predetermined value (high watermark) and which allows additions again when the size reaches another predetermined value (low watermark).

1 procedure SetThrottling(highWatermark, lowWatermark: integer);

The behaviour of Add, TryAdd, Take and TryTake is modified if the throttling is used.

When Add or TryAdd is called and number of elements in the blocking collection equals highWatermark, the code blocks. It will only continue if the number of elements in the collection falls below the lowWatermark or if the CompleteAdding is called.

When Take or TryTake take an element from the collection and adding is temporarily blocked because of the throttling and new number of elements in the collection is now below the lowWatermark, all waiting Add and TryAdd calls will be unblocked.

2.3 Task Configuration

High-level abstractions will do most of the hard work for you, but sometimes you’ll still have to apply some configuration to the low-level parallel tasks (entities represented with the IOmniTask interface). The mechanism for doing low-level configuration is called task configuration block.

Task configuration block, or IOmniTaskConfig, is an interface returned from the Parallel.TaskConfig factory function.

1 class function Parallel.TaskConfig: IOmniTaskConfig;
2 begin
3   Result := TOmniTaskConfig.Create;
4 end;

This interface contains various functions that set up messaging handlers, termination handlers and so on. All function return interface itself so they can be used in a fluent manner.

 1 IOmniTaskConfig = interface
 2   procedure Apply(const task: IOmniTaskControl);
 3   function  CancelWith(
 4     const token: IOmniCancellationToken): IOmniTaskConfig;
 5   function  MonitorWith(
 6     const monitor: IOmniTaskControlMonitor): IOmniTaskConfig;
 7   function  NoThreadPool: IOmniTaskConfig;
 8   function  OnMessage(
 9     eventDispatcher: TObject): IOmniTaskConfig; overload;
10   function  OnMessage(
11     eventHandler: TOmniTaskMessageEvent): IOmniTaskConfig; overload;
12   function  OnMessage(
13     msgID: word; 
14     eventHandler: TOmniTaskMessageEvent): IOmniTaskConfig; overload;
15   function  OnMessage(
16     msgID: word; 
17     eventHandler: TOmniOnMessageFunction): IOmniTaskConfig; overload;
18   function  OnTerminated(
19     eventHandler: TOmniTaskTerminatedEvent): IOmniTaskConfig; overload;
20   function  OnTerminated(
21     eventHandler: TOmniOnTerminatedFunction): IOmniTaskConfig; overload;
22   function  OnTerminated(
23     eventHandler: TOmniOnTerminatedFunctionSimple): IOmniTaskConfig; 
24     overload;
25   function  SetPriority(threadPriority: TOTLThreadPriority): IOmniTaskConfig;
26   function  ThreadPool(const threadPool: IOmniThreadPool): IOmniTaskConfig;
27   function  WithCounter(
28     const counter: IOmniCounter): IOmniTaskConfig;
29   function  WithLock(
30     const lock: TSynchroObject; 
31     autoDestroyLock: boolean = true): IOmniTaskConfig; overload;
32   function  WithLock(
33     const lock: IOmniCriticalSection): IOmniTaskConfig; overload;
34 end; 

Examples of task configuration block usage are demonstrated in demo 47_TaskConfig. Some examples are also given in sections describing individual abstractions.

2.4 Async

Async is the simplest of high-level abstractions and is typically used for fire and forget scenarios. To create an Async task, call Parallel.Async.

See also demo 46_Async.

Example:

1 Parallel.Async(
2   procedure
3   begin
4     MessageBeep($FFFFFFFF);
5   end);

This simple program creates a background task with a sole purpose to make some noise from it. The task is coded as an anonymous method but you can also use a normal method or a normal procedure for the task code.

The Parallel class defines two Async overloads. The first accepts a parameter-less background task and an optional task configuration block and the second accepts a background task with an IOmniTask parameter and an optional task configuration block.

 1 type
 2   TOmniTaskDelegate = reference to procedure(const task: IOmniTask);
 3 	
 4   Parallel = class
 5     class procedure Async(task: TProc;
 6       taskConfig: IOmniTaskConfig = nil); overload;
 7     class procedure Async(task: TOmniTaskDelegate;
 8       taskConfig: IOmniTaskConfig = nil); overload;
 9     ...
10   end;

The second form is useful if the background code needs access to the IOmniTask interface, for example to send messages to the owner or to execute code in the owner thread (typically that will be the main thread).

The example below uses Async task to fetch the contents of a web page (by calling a mysterious function HttpGet) and then uses Invoke to execute a code that logs the length of the result in the main thread.

 1 Parallel.Async(
 2   procedure (const task: IOmniTask)
 3   var 
 4     page: string;
 5   begin
 6     HttpGet('otl.17slon.com', 80, 'tutorials.htm', page, '');
 7     task.Invoke(
 8       procedure
 9       begin
10         lbLogAsync.Items.Add(Format('Async GET: %d ms; page length = %d', 
11           [time, Length(page)]))
12       end);
13   end);

The same result could be achieved by sending a message from the background thread to the main thread. TaskConfig block is used to configure message handler.

 1 const
 2   WM_RESULT = WM_USER;
 3 	
 4 procedure LogResult(const task: IOmniTaskControl; const msg: TOmniMessage);
 5 begin
 6   lbLogAsync.Items.Add(Format('Async GET: %d ms; page length = %d', 
 7     [time, Length(page)]))
 8 end;
 9 	
10 Parallel.Async(
11   procedure (const task: IOmniTask)
12   var 
13     page: string;
14   begin
15     HttpGet('otl.17slon.com', 80, 'tutorials.htm', page, '');
16     task.Comm.Send(WM_RESULT, page);
17   end,
18   TaskConfig.OnMessage(WM_RESULT, LogResult)
19 );

Let me warn you that in cases where you want to return a result from a background task, Async abstraction is not the most appropriate. You would be better off by using a Future.

2.4.1 Handling Exceptions

If the background code raises unhandled exception, OmniThreadLibrary will catch this exception and re-raise it in the OnTerminated handler. This way the exception will travel from the background thread to the owner thread where it can be processed.

As the OnTerminated handler occurs at the unspecified moment when Windows are processing window messages, there is no good way to catch this message with a try..except block. The caller must install its own OnTerminated handler instead and handle exception there.

The following example uses OnTerminated handler to detach fatal exception from the task, log the exception details and destroy the exception object.

 1 Parallel.Async(
 2   procedure
 3   begin
 4     Sleep(1000);
 5     raise Exception.Create('Exception in Async');
 6   end,
 7   Parallel.TaskConfig.OnTerminated(
 8     procedure (const task: IOmniTaskControl)
 9     var
10       excp: Exception;
11     begin
12       if assigned(task.FatalException) then begin
13         excp := task.DetachException;
14         Log('Caught async exception %s:%s',[excp.ClassName, excp.Message]);
15         FreeAndNil(excp);
16       end;
17     end
18   ));

If you don’t install a OnTerminated handler, exception will be handled by the application-level filter, which will by default cause a message box to appear.

See also demo 48_OtlParallelExceptions.

2.5 Async/Await

Async/Await is a simplified version of the Async abstraction which mimics the .NET Async/Await mechanism.3

In short, Async/Await accepts two parameterless anonymous methods. The first one is executed in a background thread and the second one is executed in the main thread after the background thread has completed its work.

See also demo 53_AsyncAwait.

Using Async/Await you can, for example, create a background operation which is triggered by a click and which re-enables button after the background job has been completed.

 1 procedure TForm1.Button1Click(Sender: TObject);
 2 var
 3   button: TButton;
 4 begin
 5   button := Sender as TButton;
 6   button.Caption := 'Working ...';
 7   button.Enabled := false;
 8   Async( 
 9     // executed in a background thread
10     procedure begin
11       Sleep(5000);
12     end).
13   Await( 
14     // executed in the main thread after 
15     // the anonymous method passed to 
16     // Async has completed its work
17     procedure begin
18       button.Enabled := true;
19       button.Caption := 'Done!';
20     end);
21 end;

Exceptions in the Async part are currently not handled by the OmniThreadLibrary.

2.6 Future

A future is a background calculation that returns a result. To create the task, call Parallel.Future<T> (where T is the type returned from the calculation). This will return a result of type IOmniFuture<T>, which is the interface you will use to work with the background task.

To get the result of the calculation, call the .Value method on the interface returned from the Parallel.Future call.

See also demo 39_Future.

Example:

 1 var
 2   FCalculation: IOmniFuture<integer>;
 3 
 4 procedure StartCalculation;
 5 begin
 6   FCalculation := Parallel.Future<integer>(
 7     function: integer
 8     var
 9       i: integer;
10     begin
11       Result := 0;
12       for i := 1 to 100000 do
13         Result := Result + i;
14     end
15   );
16 end;
17 
18 function GetResult: integer;
19 begin
20   Result := FCalculation.Value;
21   FCalculation := nil;
22 end;

The Parallel class implements two Future<T> overloads. The first accepts a parameter-less background task and an optional task configuration block and the second accepts a background task with an IOmniTask parameter and an optional task configuration block.

 1 type
 2   TOmniFutureDelegate<T> = reference to function: T;
 3   TOmniFutureDelegateEx<T> = 
 4     reference to function(const task: IOmniTask): T;
 5 
 6   Parallel = class
 7     class function Future<T>(action: TOmniFutureDelegate<T>;
 8       taskConfig: IOmniTaskConfig = nil): IOmniFuture<T>; overload;
 9     class function Future<T>(action: TOmniFutureDelegateEx<T>;
10       taskConfig: IOmniTaskConfig = nil): IOmniFuture<T>; overload;
11     ...
12   end;

The second form is useful if the background code needs access to the IOmniTask interface, for example to send messages to the owner or to execute code in the owner thread (typically that will be the main thread).

A Future task always wraps a function of some type. In the example above the function added numbers from 1 to 100000 and returned an integer result. That’s why the Future task was created by calling Parallel.Future<integer> and why the result – the interface that provides a way to manage the task – is declared as IOmniFuture<integer>. But a Future could equally well return result of any type – a string or a date/time or even a record, class or interface.

The following example is a rewrite of the Async example. It uses the same mysterious HttpGet function but it is wrapped in a more flexible way. Function StartHttpGet accepts a url parameter specifying which page to retrieve from the web server. It then creates a future returning a string and passes it a simple code to execute – a one-liner anonymous function that only calls the already known HttpGet.

This example illustrates two important points:

 1 var
 2   FGetFuture: IOmniFuture<string>;
 3 
 4 function HttpGet(const url: string): string;
 5 begin
 6   // this function fetches a page from the web server 
 7   // and returns its contents
 8 end;
 9 
10 procedure StartHttpGet(const url: string);
11 begin
12   FGetFuture := Parallel.Future<string>(
13     function: string
14     begin
15       Result := HttpGet(url);
16     end
17   );
18 end;
19 
20 function GetResult: string;
21 begin
22   Result := FGetFuture.Value;
23   FGetFuture := nil;
24 end;

2.6.1 IOmniFuture<T> Interface

The IOmniFuture<T> interface implements other methods besides the Value.

 1 type
 2   IOmniFuture<T> = interface
 3     procedure Cancel;
 4     function  DetachException: Exception;
 5     function  FatalException: Exception;
 6     function  IsCancelled: boolean;
 7     function  IsDone: boolean;
 8     function  TryValue(timeout_ms: cardinal; var value: T): boolean;
 9     function  Value: T;
10     function  WaitFor(timeout_ms: cardinal): boolean;
11    end;

The interface implements exception-handling functions, cancellation support and functions that check if the background calculation has completed.

2.6.2 Completion Detection

When you call the Value function you don’t know ahead what will happen. If the background code has already calculated the result, the Value call will return immediately. Otherwise, the caller thread will be blocked until the result is available and if you are executing a long calculation (or if the web or database connection did not succeed and is now waiting for a timeout to occur) this may last a while. If you created the future in the main thread, then your whole application will be blocked until Value returns.

There are few ways around this problem. One is to periodically call the IsDone function. It will return False while the background calculation is still working and True one the result is available. Another option is to call WaitFor with some (small) timeout. WaitFor will wait specified number of milliseconds and will return True if result is available. Third way to achieve the same is to call TryValue periodically. TryValue also waits some specified number of milliseconds and returns True if result is available but in addition it will also return the result in the value parameter.

The fourth and completely different way is to specify the termination handler which will notify you when the background calculation is completed. The following example sets the termination handler to get the value of the background calculation into the memo field and then destroy the Future interface.

 1 procedure StartHttpGet(const url: string);
 2 begin
 3   FGetFuture := Parallel.Future<string>(
 4     function: string
 5     begin
 6       Result := HttpGet(url);
 7     end,
 8     Parallel.TaskConfig.OnTerminated(
 9       procedure
10       begin
11         Memo1.Text := FGetFuture.Value;
12         FGetFuture := nil;
13       end
14      )
15   );
16 end;

2.6.3 Cancellation

It is possible to cancel the background execution of the future before it is completed. The Future uses Cancellation token mechanism to achieve this. Cancellation is cooperative – if the background task does not willingly cancel itself, cancellation will fail.

To cancel a background task, the Future owner (the code that called Parallel.Future) has to call the Cancel method on the IOmniFuture<T> interface. This will signal the cancellation token which the background task must check periodically. To get access to the cancellation token, background code must be declared differently – not as parameterless function but as a function accepting a IOmniTask parameter.

The following (pretty much pointless) program illustrates this concept.

 1 var
 2   FCountFuture: IOmniFuture<integer>;
 3 
 4 function CountTo100(const task: IOmniTask): integer;
 5 var
 6   i: integer;
 7 begin
 8   for i := 1 to 100 do begin
 9     Sleep(100);
10     Result := i;
11     if task.CancellationToken.IsSignalled then
12       break; //for
13   end;
14 end;
15 
16 procedure StartCounting;
17 begin
18   FCountFuture := Parallel.Future<integer>(CountTo100);
19   Sleep(100);
20   FCountFuture.Cancel;
21   FCountFuture.WaitFor(INFINITE);
22   FCountFuture := nil;
23 end;

StartCounting creates a Future which executes CountTo100 function in the background. It then sleeps 100 milliseconds, calls the Cancel function, waits for the Future to terminate and clears the Future interface.

CountTo100 function counts from 1 to 100. It sleeps for 100 milliseconds after each number, stores the current counter in the function result and then checks the cancellation token. If it is signaled (meaning that the owner called the Cancel function), it will break out of the for loop.

If you put a breakpoint on the last line of the StartCounting function and run the program, you’ll see that it will be reached almost immediately, proving that the CountTo100 did not take 10 seconds to return a result (100 repeats * 100 milliseconds = 10 seconds).

You cannot call the Value function if the calculation was cancelled as it would raise an EFutureCancelled exception. If you don’t know whether the Cancel was called or not, you can call the IOmniFuture<T>.IsCancelled and check the result (True = calculation was cancelled).

2.6.4 Handling Exceptions

If the background code raises unhandled exception (i.e. the exception was not captured in a try..except block), OmniThreadLibrary will catch this exception and gracefully complete the background task. When you call the Value function, this exception will be raised.

This immensely helps with debugging as the background exceptions (exceptions in background threads) are ignored. Delphi ignores all exceptions in background threads – you can handle them if you write appropriate code but they are not automatically visible – and that can be quite dangerous. As the Future exceptions are re-raised in the main thread when the Value is called this makes them equivalent to other exceptions in the main thread.

There are few different ways to handle exceptions in Futures and they are most simply explained through the code. First example catches the exception by wrapping the Value call in try..except.

 1 procedure FutureException1;
 2 var
 3   future: IOmniFuture<integer>;
 4 begin
 5   future := Parallel.Future<integer>(
 6     function: integer
 7     begin
 8       raise ETestException.Create('Exception in Parallel.Future');
 9     end
10   );
11   Log('Future is executing ...');
12   Sleep(1000);
13   try
14     Log('Future retured: %d', [future.Value]);
15   except
16     on E: Exception do
17       Log('Future raised exception %s:%s', [E.ClassName, E.Message]);
18   end;
19 end;

Second example uses WaitFor to wait on task completion and then checks the result of the FatalException function. It will return Nil if there was no exception or the exception object if there was an exception. Exception object itself will still be owned by the Future task and will be destroyed when the Future is destroyed.

 1 procedure FutureException2;
 2 var
 3   future: IOmniFuture<integer>;
 4 begin
 5   future := Parallel.Future<integer>(
 6     function: integer
 7     begin
 8       raise ETestException.Create('Exception in Parallel.Future');
 9     end
10   );
11   Log('Future is executing ...');
12   future.WaitFor(INFINITE);
13   if assigned(future.FatalException) then
14     Log('Future raised exception %s:%s', 
15       [future.FatalException.ClassName, future.FatalException.Message])
16   else
17     Log('Future retured: %d', [future.Value]);
18 end;

Third example shows how you can detach exception from the future. By calling DetachException you will get the ownership of the exception object and you should destroy it at some appropriate point in time.

 1 procedure FutureException3;
 2 var
 3   excFuture: Exception;
 4   future   : IOmniFuture<integer>;
 5 begin
 6   future := Parallel.Future<integer>(
 7     function: integer
 8     begin
 9       raise ETestException.Create('Exception in Parallel.Future');
10     end
11   );
12   Log('Future is executing ...');
13   future.WaitFor(INFINITE);
14   excFuture := future.DetachException;
15   try
16     if assigned(excFuture) then
17       Log('Future raised exception %s:%s', 
18         [excFuture.ClassName, excFuture.Message])
19     else
20       Log('Future retured: %d', [future.Value]);
21   finally FreeAndNil(excFuture); end;
22 end;

See also demo 48_OtlParallelExceptions.

2.6.5 Examples

Practical examples of Future usage can be found in chapter OmniThreadLibrary and COM/OLE.

2.7 Join

Join abstraction enables you to start multiple background tasks in multiple threads. To create the task, call Parallel.Join.

See also demo 37_ParallelJoin.

Example:

 1 Parallel.Join(
 2   procedure
 3   var
 4     i: integer;
 5   begin
 6     for i := 1 to 8 do begin
 7       Sleep(200);
 8       MessageBeep($FFFFFFFF);
 9     end,
10   procedure
11   var
12     i: integer;
13   begin
14     for i := 1 to 10 do begin
15       Sleep(160);
16       MessageBeep($FFFFFFFF);
17     end;
18   end
19 ).Execute;

This simple program executes two background tasks, each beeping at different frequency. Each task is coded as an anonymous method but you can also use a normal method or a normal procedure for the task code.

The Parallel class defines five Join overloads. The first creates empty IOmniParallelJoin interface. Next two create same interface but configured with two tasks and last two contain this interface configured for any number of tasks. Tasks can be of two different types – parameterless methods and methods containing one parameter of the IOmniJoinState type.

 1 type
 2   TOmniJoinDelegate = reference to procedure (const joinState:
 3                         IOmniJoinState);
 4 
 5   Parallel = class
 6     class function Join: IOmniParallelJoin; overload;
 7     class function Join(const task1, task2: TProc): IOmniParallelJoin;
 8                      overload;
 9     class function Join(const task1, task2: TOmniJoinDelegate): 
10                      IOmniParallelJoin; overload;
11     class function Join(const tasks: array of TProc): IOmniParallelJoin;
12                      overload;
13     class function Join(const tasks: array of TOmniJoinDelegate): 
14                      IOmniParallelJoin; overload;
15     ...
16   end;

2.7.1 IOmniParallelJoin Interface

Parallel.Join returns an IOmniParallelJoin interface which you can use to specify tasks, start and control execution and handle exceptions.

 1 type
 2   IOmniParallelJoin = interface
 3     function Cancel: IOmniParallelJoin;
 4     function DetachException: Exception;
 5     function Execute: IOmniParallelJoin;
 6     function FatalException: Exception;
 7     function IsCancelled: boolean;
 8     function IsExceptional: boolean;
 9     function NumTasks(numTasks: integer): IOmniParallelJoin;
10     function OnStop(const stopCode: TProc): IOmniParallelJoin; overload;
11     function OnStop(const stopCode: TOmniTaskStopDelegate): IOmniParallelJoin; overload;
12     function OnStopInvoke(const stopCode: TProc): IOmniParallelJoin;
13     function Task(const task: TProc): IOmniParallelJoin; overload;
14     function Task(const task: TOmniJoinDelegate): IOmniParallelJoin;
15                overload;
16     function TaskConfig(const config: IOmniTaskConfig): IOmniParallelJoin;
17     function NoWait: IOmniParallelJoin;
18     function WaitFor(timeout_ms: cardinal): boolean;
19   end;

The most important of these functions is Execute. It will start appropriate number of background threads and start executing tasks in those threads. By default, Join uses as many threads as there are tasks but you can override this behaviour by calling the NumTasks function.

If NumTasks receives a positive parameter (> 0), then the number of worker tasks is set to that number. For example, NumTasks(16) starts 16 worker tasks, even if that is more then number of available cores.

If NumTasks receives a negative parameter (< 0), it specifies number of cores that should be reserved for other use. Number of worker task is then set to <number of available cores> - <number of reserved cores>. If, for example, current process can use 16 cores and NumTasks(-4) is used, only 12 (16-4) worker tasks will be started.

Parameter 0 is not allowed and results in an exception.

You can add tasks to the Join by calling the Task function. In fact, that’s just how the Parallel.Join overloads are implemented.

 1 class function Parallel.Join(const task1, task2: TProc): IOmniParallelJoin;
 2 begin
 3   Result := TOmniParallelJoin.Create.Task(task1).Task(task2);
 4 end;
 5 
 6 class function Parallel.Join(const tasks: array of TProc): IOmniParallelJoin;
 7 var
 8   aTask: TProc;
 9 begin
10   Result := TOmniParallelJoin.Create;
11   for aTask in tasks do
12     Result.Task(aTask);
13 end;

To set up task configuration block, call the TaskConfig function. Following example uses TaskConfig to set up critical section which is then used in two parallel tasks to protect the shared resource. Workers use the IOmniJoinState instance to access the IOmniTask interface and through it the Lock property.

 1 FSharedValue := 42;
 2 Parallel.Join(
 3   procedure (const joinState: IOmniJoinState)
 4   var
 5     i: integer;
 6   begin
 7     for i := 1 to 1000000 do begin
 8       joinState.Task.Lock.Acquire;
 9       FSharedValue := FSharedValue + 17;
10       joinState.Task.Lock.Release;
11     end;
12   end,
13   procedure (const joinState: IOmniJoinState)
14   var
15     i: integer;
16   begin
17     for i := 1 to 1000000 do begin
18       joinState.Task.Lock.Acquire;
19       FSharedValue := FSharedValue - 17;
20       joinState.Task.Lock.Release;
21     end;
22   end
23 ).TaskConfig(Parallel.TaskConfig.WithLock(CreateOmniCriticalSection))
24  .Execute;

By default, Join will wait for all background tasks to complete execution. Alternatively, you can call the NoWait function, after which Join will just start the tasks and return immediately. If you want to be notified when all tasks are finished, you can assign the termination handler by calling the OnStop function. This termination handler is called from one of the worker threads, not from the main thread! If you need to run a code in the main thread, use the task configuration block.

Release [3.07.2] introduced method OnStopInvoke which works like OnStop except that the termination handler is automatically executed in the context of the owner thread via implicit Invoke. For example, see Parallel.ForEach.OnStopInvoke.

You can also call WaitFor to wait for the Join to finish. WaitFor accepts an optional timeout parameter; by default it will wait as long as needed.

2.7.2 IOmniJoinState Interface

Tasks can be defined to accept a parameter of type IOmniJoinState. This allows them to access the IOmniTask interface, participate in the cooperative cancellation and check for exceptions.

1 type
2   IOmniJoinState = interface
3     function  GetTask: IOmniTask;
4   //
5     procedure Cancel;
6     function  IsCancelled: boolean;
7     function  IsExceptional: boolean;
8     property Task: IOmniTask read GetTask;
9   end;

2.7.3 Cancellation

Join background tasks support cooperative cancellation. If you are using TOmniJoinDelegate tasks (that is, tasks accepting the IOmniJoinState parameter), any task can call the Cancel method of this interface. This, in turn, sets internal cancellation flag which may be queried by calling the IsCancelled method. That way, one task can interrupt other tasks provided that they are testing IsCancelled repeatedly.

Main thread can also cancel its subtasks (when using NoWait) by calling IOmniParallelJoin.Cancel and can test the cancellation flag by calling IsCancelled.

The following demo code demonstrates most of concepts mentioned above.

 1 var
 2   join: IOmniParallelJoin;
 3   time: int64;
 4 begin
 5   FJoinCount.Value := 0;
 6   FJoinCount2.Value := 0;
 7   join := Parallel.Join(
 8     procedure (const joinState: IOmniJoinState)
 9     var
10       i: integer;
11     begin
12       for i := 1 to 10 do begin
13         Sleep(100);
14         FJoinCount.Increment;
15         if joinState.IsCancelled then
16           break; //for
17       end;
18     end,
19     procedure (const joinState: IOmniJoinState)
20     var
21       i: integer;
22     begin
23       for i := 1 to 10 do begin
24         Sleep(200);
25         FJoinCount2.Increment;
26         if joinState.IsCancelled then
27           break; //for
28       end;
29     end
30   ).NoWait.Execute;
31   Sleep(500);
32   time := DSiTimeGetTime64;
33   join.Cancel.WaitFor(INFINITE);
34   Log(Format('Waited %d ms for joins to terminate',
35     [DSiElapsedTime64(time)]));
36   Log(Format('Tasks counted up to %d and %d',
37     [FJoinCount.Value, FJoinCount2.Value]));
38 end;

The call to Parallel.Join starts two tasks. Because the NoWait is used, the call returns immediately and stores resulting IOmniParallelJoin interface in the local variable join. Main code then sleeps for half a second, cancels the execution and immediately waits for background tasks to terminate.

Both tasks execute a simple loop which waits a little, increments a counter and checks the cancellation flag. Because the cancellation flag is set after 500 ms, we would expect five or six repetitions of the first loop (five repetitions take exactly 500 ms and we can’t tell exactly what will execute first – Cancel or fifth IsCancelled) and three repetitions of the second loop. That is exactly what the program prints out.

2.7.4 Handling Exceptions

Exceptions in background tasks are caught and re-raised in the WaitFor method. If you are using synchronous version of Join (without the NoWait modifier), then WaitFor is called at the end of the Execute method (in other words, Parallel.Join(…).Execute will re-raise task exceptions). If, however, you are using the asynchronous version (by calling Parallel.Join(…).NoWait.Execute), exception will only be raised when you wait for the background tasks to complete by calling WaitFor.

You can test for the exception by calling the FatalException function. It will first wait for all background tasks to complete (without raising the exception) and then return the exception object. You can also detach the exception object from the Join and handle it yourself by using the DetachException function.

There’s also an IsExceptional function (available in IOmniParallelJoin and IOmniJoinState interfaces) which tells you if any background task has thrown an exception.

There’s an additional complication – as Join executes multiple tasks, there can be multiple background exceptions. To get you full access to those exceptions, Join wraps them into EJoinException object.

1 type
2   EJoinException = class(Exception)
3     constructor Create; reintroduce;
4     destructor  Destroy; override;
5     procedure Add(iTask: integer; taskException: Exception);
6     function  Count: integer;
7     property Inner[idxException: integer]: TJoinInnerException 
8       read GetInner; default;
9   end;

This exception class contains combined error messages from all background tasks in its Message property and allows you to access exception information for all caught exceptions directly with the Inner[] property. The following code demonstrates this.

 1 var
 2   iInnerExc: integer;
 3 begin
 4   try
 5     Parallel.Join([
 6       procedure begin
 7         raise ETestException.Create('Exception 1 in Parallel.Join');
 8       end,
 9       procedure begin
10       end,
11       procedure begin
12         raise ETestException.Create('Exception 2 in Parallel.Join');
13       end]).Execute;
14   except
15     on E: EJoinException do begin
16       Log('Join raised exception %s:%s', [E.ClassName, E.Message]);
17       for iInnerExc := 0 to E.Count - 1 do
18         Log('  Task #%d raised exception: %s:%s', [E[iInnerExc].TaskNumber,
19           E[iInnerExc].FatalException.ClassName,
20           E[iInnerExc].FatalException.Message]);
21     end;
22   end;
23 end;

The iInnerExc variable loops over all caught exceptions and for each such exception displays the task number (starting with 0), exception class and exception message. This approach allows you to either just log the exception or, if you are interested in details, examine specific inner exceptions and handle them appropriately.

See also demo 48_OtlParallelExceptions.

2.8 Parallel Task

Parallel Task abstraction enables you to start a background tasks in multiple threads. To create the task, call Parallel.ParallelTask.

Example:

1 Parallel.ParallelTask.NumTasks(3).Execute(
2   procedure
3   begin
4     while true do
5       ;
6   end
7 );

This simple code fragment starts infinite loop in three threads. The task is coded as an anonymous method but you can also use a normal method or a normal procedure for the task code.

The Parallel class implements function ParallelTask which returns a IOmniParallelTask interface. All configuration of the parallel task is done via this interface.

1 type
2   Parallel = class
3     class function ParallelTask: IOmniParallelTask;
4     ...
5   end;

2.8.1 IOmniParallelTask Interface

 1 type
 2   IOmniParallelTask = interface
 3     function  Execute(const aTask: TProc): IOmniParallelTask; overload;
 4     function  Execute(
 5       const aTask: TOmniParallelTaskDelegate): IOmniParallelTask; overload;
 6     function  NoWait: IOmniParallelTask;
 7     function  NumTasks(numTasks: integer): IOmniParallelTask;
 8     function  OnStop(const stopCode: TProc): IOmniParallelTask; overload;
 9     function  OnStop(const stopCode: TOmniTaskStopDelegate): IOmniParallelTask; overload;
10     function  OnStopInvoke(const stopCode: TProc): IOmniParallelTask;
11     function  TaskConfig(const config: IOmniTaskConfig): IOmniParallelTask;
12     function  WaitFor(timeout_ms: cardinal): boolean;
13   end;

The most important of these functions is Execute. It will start appropriate number of background threads and start executing task in those threads.

There are two overloaded versions of Execute. The first accepts a parameter-less background task and the second accepts a background task with an IOmniTask parameter.

By default, ParallelTask uses as many threads as there are tasks but you can override this behaviour by calling the NumTasks function.

If NumTasks receives a positive parameter (> 0), then the number of worker tasks is set to that number. For example, NumTasks(16) starts 16 worker tasks, even if that is more then number of available cores.

If NumTasks receives a negative parameter (< 0), it specifies number of cores that should be reserved for other use. Number of worker task is then set to <number of available cores> - <number of reserved cores>. If, for example, current process can use 16 cores and NumTasks(-4) is used, only 12 (16-4) worker tasks will be started.

Parameter 0 is not allowed and results in an exception.

The OnStop function can be used to set up a termination handler - a code that will get executed when all background tasks will have complete execution. This termination handler is called from one of the worker threads, not from the main thread! If you need to run a code in the main thread, use the task configuration block. To set up task configuration block, call the TaskConfig function.

Release [3.07.2] introduced method OnStopInvoke which works like OnStop except that the termination handler is automatically executed in the context of the owner thread via implicit Invoke. For example, see Parallel.ForEach.OnStopInvoke.

A call to the WaitFor function will wait for up to timeout_ms milliseconds (this value can be set to INFINITE) for all background tasks to terminate. If the tasks terminate in the specified time, WaitFor will return True. Otherwise, it will return False.

2.8.2 Example

The following code uses ParallelTask to generate large quantities of pseudorandom data. The data is written to an output stream.

 1 procedure CreateRandomFile(fileSize: integer; output: TStream);
 2 const
 3   CBlockSize = 1 * 1024 * 1024 {1 MB};
 4 var
 5   buffer   : TOmniValue;
 6   memStr   : TMemoryStream;
 7   outQueue : IOmniBlockingCollection;
 8   unwritten: IOmniCounter;
 9 begin
10   outQueue := TOmniBlockingCollection.Create;
11   unwritten := CreateCounter(fileSize);
12   Parallel.ParallelTask.NoWait
13     .NumTasks(Environment.Process.Affinity.Count)
14     .OnStop(Parallel.CompleteQueue(outQueue))
15     .Execute(
16       procedure
17       var
18         buffer      : TMemoryStream;
19         bytesToWrite: integer;
20         randomGen   : TGpRandom;
21       begin
22         randomGen := TGpRandom.Create;
23         try
24           while unwritten.Take(CBlockSize, bytesToWrite) do begin
25             buffer := TMemoryStream.Create;
26             buffer.Size := bytesToWrite;
27             FillBuffer(buffer.Memory, bytesToWrite, randomGen);
28             outQueue.Add(buffer);
29           end;
30         finally FreeAndNil(randomGen); end;
31       end
32     );
33   for buffer in outQueue do begin
34     memStr := buffer.AsObject as TMemoryStream;
35     output.CopyFrom(memStr, 0);
36     FreeAndNil(memStr);
37   end;
38 end;

The code creates a blocking collection to hold buffers with pseudorandom data. Then it creates a counter which will hold the count of bytes that have yet to be written.

ParallelTask is used to start parallel workers. Each worker initializes its own pseudorandom data generator and then keeps generating buffers with pseudorandom data until the counter drops to zero. Each buffer is written to the blocking collection.

Because of the NoWait modifier, main thread continues with the execution immediately after all threads have been scheduled. Main thread keeps reading buffers from the blocking collection and writes the content of those buffers into the output stream. (As the TStream is not thread-safe, we cannot write to the output stream directly from multiple background threads.)

For..in loop will block if there is no data in the blocking collection, but it will only stop looping after the blocking collection’s CompleteAdding method is called. This is done with the help of the Parallel.CompleteQueue helper which is called from the termination handler (OnStop).

1 class function Parallel.CompleteQueue(
2   const queue: IOmniBlockingCollection): TProc;
3 begin
4   Result :=
5     procedure
6     begin
7       queue.CompleteAdding;
8     end;
9 end;

2.8.3 Handling Exceptions

Exceptions in background tasks are caught and re-raised in the WaitFor method. If you are using synchronous version of ParallelTask (without the NoWait modifier), then WaitFor is called at the end of the Execute method (in other words, Parallel.ParallelTask.Execute(…) will re-raise task exceptions). If, however, you are using the asynchronous version (by calling Parallel.Paralleltask.NoWait.Execute(…)), exception will only be raised when you wait for the background tasks to complete by calling WaitFor.

For more details on handling ParallelTask exceptions, see the Handling Exception section in the Join chapter.

2.8.4 Examples

Practical examples of Parallel Task usage can be found in chapters Background Worker and List Partitioning and Parallel Data Production.

2.9 Background Worker

Background Worker abstraction implements a client/server relationship. To create a Background Worker server, call Parallel.BackgroundWorker.

See also demo 52_BackgroundWorker.

Example:

Start the worker

 1 FBackgroundWorker := Parallel.BackgroundWorker.NumTasks(2)
 2   .Execute(
 3     procedure (const workItem: IOmniWorkItem)
 4     begin
 5       workItem.Result := workItem.Data.AsInteger * 3;
 6     end
 7   )
 8   .OnRequestDone(
 9     procedure (const Sender: IOmniBackgroundWorker;
10       const workItem: IOmniWorkItem)
11     begin
12       lbLogBW.ItemIndex := lbLogBW.Items.Add(Format('%d * 3 = %d',
13         [workItem.Data.AsInteger, workItem.Result.AsInteger]));
14     end
15   );

Schedule a work item

1 FBackgroundWorker.Schedule(
2   FBackgroundWorker.CreateWorkItem(Random(100)));

Stop the worker

1 FBackgroundWorker.Terminate(INFINITE);
2 FBackgroundWorker := nil;

2.9.1 Basics

Background worker is designed around the concept of a work item. You create a worker, which spawns one or more background threads, and then schedule work items to it. When they are processed, background worker notifies you so you can process the result. Work items are queued so you can schedule many work items at once and background threads will then process them one by one.

Background worker is created by calling Parallel.BackgroundWorker factory. Usually you’ll also set the main work item processing method and completion method by calling Execute and OnRequestDone, respectively. As usual, you can provide OTL with a method, a procedure, or an anonymous method.

1 FWorker := Parallel.BackgroundWorker
2   .OnRequestDone(StringRequestDone)
3   .Execute(StringProcessorHL);

To close the background worker, call the Terminate method and set the reference (FWorker) to nil.

To create a work item, call the CreateWorkItem factory and pass the result to the Schedule method. You can pass any data to the work item by passing a parameter to the CreateWorkItem method. If you have to pass multiple parameters, you can collect them in a record and wrap it with a TOmniValue.FromRecord<T>, pass them as an array of TOmniValues or pass them as an object or an interface.

2.9.2 IOmniBackgroundWorker Interface

 1 type
 2   TOmniTaskInitializerDelegate = 
 3     reference to procedure(var taskState: TOmniValue);
 4   TOmniTaskFinalizerDelegate = 
 5     reference to procedure(const taskState: TOmniValue);
 6 
 7   IOmniBackgroundWorker = interface
 8     function  CreateWorkItem(const data: TOmniValue): IOmniWorkItem;
 9     procedure CancelAll; overload;
10     procedure CancelAll(upToUniqueID: int64); overload;
11     function  Config: IOmniWorkItemConfig;
12     function  Execute(const aTask: TOmniBackgroundWorkerDelegate = nil): 
13       IOmniBackgroundWorker;
14     function  Finalize(taskFinalizer: 
15       TOmniTaskFinalizerDelegate): IOmniBackgroundWorker;
16     function  Initialize(taskInitializer: 
17       TOmniTaskInitializerDelegate): IOmniBackgroundWorker;      
18     function  NumTasks(numTasks: integer): IOmniBackgroundWorker;
19     function  OnRequestDone(const aTask: TOmniWorkItemDoneDelegate): 
20       IOmniBackgroundWorker;
21     function  OnRequestDone_Asy(const aTask: TOmniWorkItemDoneDelegate): 
22       IOmniBackgroundWorker;
23     function  OnStop(stopCode: TProc): IOmniBackgroundWorker; overload;
24     function  OnStop(stopCode: TOmniTaskStopDelegate): IOmniBackgroundWorker; overload;
25     function  OnStopInvoke(stopCode: TProc): IOmniBackgroundWorker;
26     procedure Schedule(const workItem: IOmniWorkItem; 
27       const workItemConfig: IOmniWorkItemConfig = nil); 
28     function  TaskConfig(const config: IOmniTaskConfig): 
29       IOmniBackgroundWorker;
30     function  Terminate(maxWait_ms: cardinal): boolean;
31     function  WaitFor(maxWait_ms: cardinal): boolean;
32   end;

Background worker supports two notification mechanisms. By calling OnRequestDone, you are setting a synchronous handler, which will be executed in the context of the thread that created the background worker (usually a main thread). In other words – if you call OnRequestDone, you don’t have to worry about thread synchronisation issues. On the other hand, OnRequestDone_Asy handler is executed asynchronously, in the context of the thread that processed the work item.

For performance reasons (for example when terminating the application), the code can prevent execution of event handlers on per-task basis by setting IOmniWorkItem.SkipCompletionHandler to True.

By calling NumTasks, you can set the degree of parallelism. By default, background worker uses only one background task but you can override this behaviour.

If NumTasks receives a positive parameter (> 0), then the number of worker tasks is set to that number. For example, NumTasks(16) starts 16 worker tasks, even if that is more then number of available cores.

If NumTasks receives a negative parameter (< 0), it specifies number of cores that should be reserved for other use. Number of worker task is then set to <number of available cores> - <number of reserved cores>. If, for example, current process can use 16 cores and NumTasks(-4) is used, only 12 (16-4) worker tasks will be started.

Parameter 0 is not allowed and results in an exception.

The OnStop function can be used to set up a termination handler - a code that will get executed when all background tasks will have complete execution. This termination handler is called from one of the worker threads, not from the main thread! If you need to run a code in the main thread, use the task configuration block. To set up task configuration block, call the TaskConfig function.

Release [3.07.2] introduced method OnStopInvoke which works like OnStop except that the termination handler is automatically executed in the context of the owner thread via implicit Invoke. For example, see Parallel.ForEach.OnStopInvoke.

Calling Terminate will stop background workers. If they stop in maxWait_ms, True will be returned, False otherwise. WaitFor waits for workers to stop (without commanding them to stop beforehand so you would have to call Terminate before WaitFor) and returns True/False just as Terminate does.

2.9.3 Task Initialization

Background worker implements mechanism that can be used by worker tasks to intialize and destroy task-specific structures.

By calling Initialize you can provide a task initializer which is executed once for each worker task before it begins processing work items. Similarly, by calling Finalize you provide the background worker with a task finalizer which is called just before the background task is destroyed.

Both initializer and finalizer will receive a taskState variable where you can store any task-specific data (for example, a class containing multiple task-specific structures). This task state is also available to the work item processor throught the property IOmniWorkItem.TaskState.

2.9.4 Work Item Configuration

You can pass additional configuration parameters to the Schedule method by providing a configuration block, which can be created by calling the Config method. By using this approach, you can set a custom executor method or a custom completion method for each separate work item.

1 type
2   IOmniWorkItemConfig = interface
3     function  OnExecute(const aTask: TOmniBackgroundWorkerDelegate): 
4       IOmniWorkItemConfig;
5     function  OnRequestDone(const aTask: TOmniWorkItemDoneDelegate): 
6       IOmniWorkItemConfig;
7     function  OnRequestDone_Asy(const aTask: TOmniWorkItemDoneDelegate): 
8       IOmniWorkItemConfig;
9   end;

2.9.5 Work Item Interface

CreateWorkItem method returns an IOmniWorkItem interface.

 1 type
 2   IOmniWorkItem = interface
 3     function  DetachException: Exception;
 4     function  FatalException: Exception;
 5     function  IsExceptional: boolean;
 6     property CancellationToken: IOmniCancellationToken
 7       read GetCancellationToken;
 8     property Data: TOmniValue read GetData;
 9     property Result: TOmniValue read GetResult write SetResult;
10     property SkipCompletionHandler: boolean read GetSkipCompletionHandler write
11       SetSkipCompletionHandler;
12     property Task: IOmniTask read GetTask;
13     property TaskState: TOmniValue read GetTaskState;    
14     property UniqueID: int64 read GetUniqueID;
15   end;

It contains input data (Data property), result (Result property) and a unique ID, which is assigned in the CreateWorkItem call. First work item gets ID 1, second ID 2 and so on. This allows for some flexibility when you want to cancel work items. You can cancel one specific item by calling workItem.CancellationToken.Signal or multiple items by calling backgroundWorker.CancelAll( highestIDToBeCancelled) or all items by calling backgroundWorker.CancelAll.

Cancellation is partly automatic and partly cooperative. If the work item that is to be cancelled has not yet reached the execution, the system will prevent it from ever being executed. If, however, work item is already being processed, your code must occasionally check workItem.CancellationToken.IsSignalled and exit if that happens (provided that you want to support cancellation at all). Regardless of how the work item was cancelled, completion handler will still be called and it can check workItem.CancellationToken.IsSignalled to check whether the work item was cancelled prematurely or not.

Any uncaught exception will be stored in the FatalException property. You can detach (and take ownership of) the exception by calling the DetachException and you can test if there was an exception by calling IsExceptional. If IsExceptional returns True, any access to the Result property will raise exception stored in the FatalException property. [In other words – if an unhandled exception occurs in the executor code (in the background thread), it will propagate to the place where you access workItem.Result.]

Property Task provides access to the task executing the work item. Property TaskState returns the value initialized in the task initializer.

If SkipCompletionHandler is set to True when work item is created or during its execution, request handlers for that work item won’t be called.

If SkipCompletionHandler is set to True in the OnRequestDone_Asy handler, then OnRequestDone handler won’t be called.

2.9.6 Examples

Practical examples of Background Worker usage can be found in chapters Background Worker and List Partitioning and OmniThreadLibrary and Databases.

2.10 Pipeline

Pipeline abstraction encapsulates multistage processes in which the data processing can be split into multiple independent stages connected with data queues.

See also demo 41_Pipeline.

Example:

 1 uses
 2   OtlCommon,
 3   OtlCollections,
 4   OtlParallel;
 5 
 6 var
 7   sum: integer;
 8 
 9 sum := Parallel.Pipeline
10   .Stage(
11     procedure (const input, output: IOmniBlockingCollection)
12     var
13       i: integer;
14     begin
15       for i := 1 to 1000000 do
16         output.Add(i);
17     end)
18   .Stage(
19     procedure (const input: TOmniValue; var output: TOmniValue)
20     begin
21       output := input.AsInteger * 3;
22     end)
23   .Stage(
24     procedure (const input, output: IOmniBlockingCollection)
25     var
26       sum: integer;
27       value: TOmniValue;
28     begin
29       sum := 0;
30       for value in input do
31         Inc(sum, value);
32       output.Add(sum);
33     end)
34   .Run.Output.Next;

This example creates a three-stage pipeline. First stage generates numbers from 1 to 1.000,000. Second stage multiplies each number by 3. Third stage adds all numbers together and writes result to the output queue. This result is then stored in the variable sum.

2.10.1 Background

The Pipeline abstraction is appropriate for parallelizing processes that can be split into stages (subprocesses), connected with data queues. Data flows from the (optional) input queue into the first stage, where it is partially processed and then emitted into intermediary queue. First stage then continues execution, processes more input data and outputs more output data. This continues until complete input is processed. Intermediary queue leads into the next stage which does the processing in a similar manner and so on and on. At the end, the data is output into a queue which can be then read and processed by the program that created this multistage process. As a whole, a multistage process acts as a pipeline – data comes in, data comes out (and a miracle occurs in-between ;)).

What is important here is that no stage shares state with any other stage. The only interaction between stages is done with the data passed through the intermediary queues. The quantity of data, however, doesn’t have to be constant. It is entirely possible for a stage to generate more or less data than it received on the input queue.

In a classical single-threaded program the execution plan for a multistage process is very simple.

In a multithreaded environment, however, we can do better than that. Because the stages are largely independent, they can be executed in parallel.

2.10.2 Basics

A pipeline is created by calling Parallel.Pipeline function which returns an IOmniPipeline interface. There are three overloaded versions of this function. The first creates an unconfigured pipeline. The second prepares one or more stages and optionally sets the input queue. The third prepares one or more stages with a different method signature.

 1 TPipelineStageDelegate = reference to procedure (const input, output:
 2   IOmniBlockingCollection);
 3 TPipelineStageDelegateEx = reference to procedure (const input, output:
 4   IOmniBlockingCollection; const task: IOmniTask);
 5 
 6 class function Pipeline: IOmniPipeline; overload;
 7 class function Pipeline(const stages: array of TPipelineStageDelegate;
 8   const input: IOmniBlockingCollection = nil): IOmniPipeline; overload;
 9 class function Pipeline(const stages: array of TPipelineStageDelegateEx;
10   const input: IOmniBlockingCollection = nil): IOmniPipeline; overload;

Stages are implemented as anonymous procedures, procedures or methods taking two queue parameters – one for input and one for output. Except in the first stage where the input queue may not be defined, both are automatically created by the Pipeline implementation and passed to the stage delegate.

Pipeline also supports concept of simple stages where stage method accepts a TOmniValue input and provides a TOmniValue output. In this case, OmniThreadLibrary provides the loop which reads data from the input queue, calls your stage code and writes data to the output queue.

1 TPipelineSimpleStageDelegate = reference to procedure(
2   const input: TOmniValue; var output: TOmniValue);

Simple stage can produce zero or one data element for each input. If the code assigns a value to the output parameter, this value will be written to the output queue. But if the code does not assign a value to this parameter, nothing will be written.

2.10.3 IOmniPipeline Interface

All Parallel.Pipeline overloads return the IOmniPipeline interface.

 1 IOmniPipeline = interface
 2   procedure Cancel;
 3   function  From(const queue: IOmniBlockingCollection): IOmniPipeline;
 4   function  HandleExceptions: IOmniPipeline;
 5   function  NoThrottling: IOmniPipeline;
 6   function  NumTasks(numTasks: integer): IOmniPipeline;
 7   function  OnStop(stopCode: TProc): IOmniPipeline; overload;
 8   function  OnStop(stopCode: TOmniTaskStopDelegate): IOmniPipeline; overload;
 9   function  OnStopInvoke(stopCode: TProc): IOmniPipeline;
10   function  Run: IOmniPipeline;
11   function  Stage(
12     pipelineStage: TPipelineSimpleStageDelegate; 
13     taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
14   function  Stage(
15     pipelineStage: TPipelineStageDelegate; 
16     taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
17   function  Stage(
18     pipelineStage: TPipelineStageDelegateEx; 
19     taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
20   function  Stages(
21     const pipelineStages: array of TPipelineSimpleStageDelegate;
22     taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
23   function  Stages(
24     const pipelineStages: array of TPipelineStageDelegate; 
25     taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
26   function  Stages(
27     const pipelineStages: array of TPipelineStageDelegateEx; 
28     taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
29   function  Throttle(numEntries: integer; unblockAtCount: integer = 0): 
30     IOmniPipeline;
31   function  WaitFor(timeout_ms: cardinal): boolean;
32   property Input: IOmniBlockingCollection read GetInput;
33   property Output: IOmniBlockingCollection read GetOutput;
34   property PipelineStage[idxStage: integer]: IOmniPipelineStage read GetPipelineStage;
35 end;

Various Stage and Stages overloads can be used to define one or more stages and optionally configure them with a task configuration block.

The Run function does all the hard work. It creates queues and sets up OmniThreadLibrary tasks.

The OnStop function assigns a termination handler - a code that will be called when all pipeline stages stop working. This termination handler is called from one of the worker threads, not from the main thread! If you need to run a code in the main thread, use the task configuration block on the last stage.

Another possibility is to use another variation of OnStop that accepts a delegate with an IOmniTask parameter. You can then use task.Invoke to execute a code in the context of the main thread.

Release [3.07.2] introduced method OnStopInvoke which works like OnStop except that the termination handler is automatically executed in the context of the owner thread via implicit Invoke. For example, see Parallel.ForEach.OnStopInvoke.

The From function sets the input queue. It will be passed to the first stage in the input parameter. If your code does not call this function, OmniThreadLibrary will automatically create the input queue for you. Input queue can be accessed through the Input property.

The output queue is always created automatically. It can be accessed through the Output property. Even if the last stage doesn’t produce any result this queue can be used to signal the end of computation. [When each stage ends, CompleteAdding is automatically called on the output queue. This allows the next stage to detect the end of input (blocking collection enumerator will exit or TryTake will return false). Same goes on for the output queue.]

The WaitFor function waits for up to timeout_ms milliseconds for the pipeline to finish the work. It returns True if all stages have processed the data before the time interval expires.

The NumTasks function sets the number of parallel execution tasks for the stage(s) just added with the Stage(s) function (IOW, call Stage followed by NumTasks to do that). If it is called before any stage is added, it will specify the default for all stages. Number of parallel execution tasks for a specific stage can then still be overridden by calling NumTasks after the Stage is called. [See the Parallel stages section below for more information.]

If NumTasks receives a positive parameter (> 0), then the number of worker tasks is set to that number. For example, NumTasks(16) starts 16 worker tasks, even if that is more then number of available cores.

If NumTasks receives a negative parameter (< 0), it specifies number of cores that should be reserved for other use. Number of worker task is then set to <number of available cores> - <number of reserved cores>. If, for example, current process can use 16 cores and NumTasks(-4) is used, only 12 (16-4) worker tasks will be started.

Parameter 0 is not allowed and results in an exception.

The Throttle functions sets the throttling parameters for stage(s) just added with the Stage(s) function. Just as the NumTask it affects either the global defaults or just currently added stage(s). By default, throttling is set to 10240 elements. [See the Throttling section below for more info.]

Up to version [3.07] it was not possible to fully disable the throttling mechanism. To fix this problem, version [3.07] introduced method NoThrottle which disables throttling on stage(s) just added with the Stage(s) function.

The HandleExceptions function changes the stage wrapper code so that it will pass exceptions from the input queue to the stage code. Just as the NumTask it affects either the global defaults or just currently added stage(s). [See the Exceptions section below for more info.]

The PipelineStages[] index property allows the program to access input and output pipeline of each stage. This allows the code to insert messages at any point in the pipeline.

1 IOmniPipelineStage = interface
2   property Input: IOmniBlockingCollection read GetInput;
3   property Output: IOmniBlockingCollection read GetOutput;
4 end;
2.10.3.1 Example

An example will help explain all this.

1 Parallel.Pipeline
2   .Throttle(102400)
3   .Stage(StageGenerate)
4   .Stage(StageMult2)
5   .Stages([StageMinus3, StageMod5])
6     .NumTasks(2)
7   .Stage(StageSum)
8   .Run;

First, a global throttling parameter is set. It will be applied to all stages. Two stages are then added, each with a separate call to the Stage function.

Another two stages are then added with one call. They are both set to execute in two parallel tasks. At the end another stage is added and the whole setup is executed.

The complete process will use seven tasks (one for StageGenerate, one for StageMult2, two for StageMinus3, two for StageMod5 and one for StageSum).

2.10.4 Generators, Mutators, and Aggregators

Let’s take a look at three different examples of multiprocessing stages.

A stage may accept no input and just generate an output. This will only happen in the first stage. (A middle stage accepting no input would render the whole pipeline rather useless.)

1 procedure StageGenerate(const input, output: IOmniBlockingCollection);
2 var
3   i: integer;
4 begin
5   for i := 1 to 1000000 do
6     output.Add(i);
7 end;

A stage may also read data from the input and generate the output. Zero, one or more elements could be generated for each input.

1 procedure StageMult2(const input, output: IOmniBlockingCollection);
2 var
3   value: TOmniValue;
4 begin
5   for value in input do
6     output.Add(2 * value.AsInteger);
7 end;

The last example is a stage that reads data from the input, performs some operation on it and returns the aggregation of this data.

 1 procedure StageSum(const input, output: IOmniBlockingCollection);
 2 var
 3   sum  : integer;
 4   value: TOmniValue;
 5 begin
 6   sum := 0;
 7   for value in input do
 8     Inc(sum, value);
 9   output.Add(sum);
10 end;

All examples are just special cases of the general principle – there’s no correlation required between the amount of input data and the amount of output data. There’s also absolutely no requirement that data must be all numbers. Feel free to pass around anything that can be contained in TOmniValue.

2.10.5 Throttling

In some cases, large amount of data may be passed through the multistage process. If one stage is suspended for some time – or if it performs a calculation that is slower than the calculation in the previous stage – this stage’s input queue may fill up with data which can cause lots of memory to be allocated and later released. To even out the data flow, Pipeline uses throttling.

Throttling sets the maximum size of the blocking collection (in TOmniValue units). When the specified quantity of data items is stored in the collection, no more data can be added. The Add function will simply block until the collection is empty enough or CompleteAdding has been called. Collection is deemed to be empty enough when the data count drops below some value which can be either passed as a second parameter to the Throttle function or is calculated as a 3/4 of the maximum size limit if the second parameter is not provided.

2.10.6 Parallel Stages

Usually, one thread is started for each stage in the pipeline. In some specialized cases, however, it may be desirable to run more than one parallel task for each stage.

There’s always only one queue sitting between stages even if there are multiple processing units for a stage. This is easily accomplished by IOmniBlockingCollection supporting multiple readers and multiple writers in a threadesafe manner.

There’s an important caveat, though. If you split a stage into multiple tasks, data will be processed in an indeterminate order. You cannot know how many items will be processed by each task and in which order they will be processed. Even worse – data will exit multitask stage in an indeterminate order (data output from one task will be interleaved with the data from the other task). As of this moment there’s no way to enforce original ordering.

2.10.7 Exceptions

In the Pipeline abstraction, exceptions are also passed over the pipeline (through the interconnecting queues) from a stage to a stage.

If an unhandled exception occurs in a stage, it gets caught by the wrapper code and is enqueued to the output queue. When data element containing exception will be read by the next stage, it will automatically generate an exception which will get passed to the next output queue. In this way, exception will progress through the pipeline and will be inserted into the output queue.

If you want to handle exceptions in one of the stages, call the HandleExceptions function after declaring the stage. (You can also call this function before declaring any stage - that way it will change behaviour for all stages.) You can then call the IsException and AsException functions on the input value to check whether a value contains an exception and to access this exception.

Handling the last (output) stage is slightly different. If you don’t want to reraise exceptions when data is read from the pipeline output, you have to turn the reraise exception flag off on the output queue by calling pipeline.Output.ReraiseExceptions(false).

The following example demonstrates the use of exception-handling functions.

 1 procedure StageException1(const input: TOmniValue; var output: TOmniValue);
 2 begin
 3   output := input.AsInteger * 42;
 4 end;
 5 
 6 procedure StageException2(const input, output: IOmniBlockingCollection);
 7 var
 8   outVal: TOmniValue;
 9   value : TOmniValue;
10 begin
11   for value in input do begin
12     if value.IsException then begin
13       value.AsException.Free;
14       outVal.Clear;
15     end
16     else
17       outVal := 1 / value.AsInteger;
18     if not output.TryAdd(outVal) then
19       break; //for
20   end;
21 end;
22 
23 procedure TfrmOtlParallelExceptions.btnPipeline1Click(Sender: TObject);
24 var
25   pipeline: IOmniPipeline;
26   value   : TOmniValue;
27 begin
28   //Stage 2 should accept and correct stage 1 exception 
29   //(third output will be empty)
30   pipeline := Parallel.Pipeline
31     .Stage(StageException1)
32     .Stage(StageException2)
33       .HandleExceptions
34     .Run;
35 
36   // Provide input
37   with pipeline.Input do begin
38     // few normal elements
39     Add(1);
40     Add(2);
41     // then trigger the exception in the first stage;
42     // this exception should be 'corrected' in the second stage
43     Add('three');
44     Add(4);
45     CompleteAdding;
46   end;
47 
48   // Process output; there should be no exception in the output collection
49   for value in pipeline.Output do
50     Log(value.AsString);
51 end;

See also demo 48_OtlParallelExceptions.

2.10.8 Examples

Practical example of Pipeline usage can be found in chapter Web Download and Database Storage.

2.11 For

For abstraction creates a parallel for loop that iterates over an integer range in multiple threads. To create a parallel for loop, call Parallel.For. Since version [3.06] for abstraction can also iterate over elements of an array. This kind of parallel for loop is created by calling Parallel.For<T>.

See also demos 57_For and 58_ForVsForEach.

Example:

1 PrimeCount.Value := 0;
2 Parallel.For(1, 1000000).Execute(
3   procedure (value: integer)
4   begin
5     if IsPrime(value) then 
6       PrimeCount.Increment;
7     end;
8   end);

This simple program calculates number of prime numbers in the range from one to one million. The PrimeCount object must be capable of atomic increment (a thread-safe increment), which is simple to achieve with the use of the TOmniAlignedInt32 record. The ForEach task is coded as an anonymous method but you can also use a normal method or a normal procedure for the task code.

2.11.1 IOmniParallelSimpleLoop Interface

The Parallel.For function returns an IOmniParallelSimpleLoop interface which is used to configure and run the parallel for loop.

 1 type
 2   TOmniIteratorSimpleSimpleDelegate = reference to procedure(value: integer);
 3   TOmniIteratorSimpleDelegate = reference to procedure(taskIndex, value: integer);
 4   TOmniIteratorSimpleFullDelegate = reference to procedure(const task: IOmniTask; 
 5     taskIndex, value: integer);
 6   TOmniSimpleTaskInitializerDelegate = reference to procedure(taskIndex, fromIndex, 
 7     toIndex: integer);
 8   TOmniSimpleTaskInitializerTaskDelegate = reference to procedure(const task: IOmniTask; 
 9     taskIndex, fromIndex, toIndex: integer);
10   TOmniSimpleTaskFinalizerDelegate = reference to procedure(taskIndex, fromIndex, 
11     toIndex: integer);
12   TOmniSimpleTaskFinalizerTaskDelegate = reference to procedure(const task: IOmniTask; 
13     taskIndex, fromIndex, toIndex: integer);
14 
15   IOmniParallelSimpleLoop = interface
16     function  CancelWith(const token: IOmniCancellationToken): IOmniParallelSimpleLoop;
17     function  NoWait: IOmniParallelSimpleLoop;
18     function  NumTasks(taskCount : integer): IOmniParallelSimpleLoop;
19     function  OnStop(stopCode: TProc): IOmniParallelSimpleLoop; overload;
20     function  OnStop(stopCode: TOmniTaskStopDelegate): IOmniParallelSimpleLoop; overload;
21     function  OnStopInvoke(stopCode: TProc): IOmniParallelSimpleLoop;
22     function  TaskConfig(const config: IOmniTaskConfig): IOmniParallelSimpleLoop;
23     procedure Execute(loopBody: TOmniIteratorSimpleSimpleDelegate); overload;
24     procedure Execute(loopBody: TOmniIteratorSimpleDelegate); overload;
25     procedure Execute(loopBody: TOmniIteratorSimpleFullDelegate); overload;
26     function  Initialize(taskInitializer: TOmniSimpleTaskInitializerDelegate): 
27       IOmniParallelSimpleLoop; overload;
28     function  Initialize(taskInitializer: TOmniSimpleTaskInitializerTaskDelegate): 
29       IOmniParallelSimpleLoop; overload;
30     function  Finalize(taskFinalizer: TOmniSimpleTaskFinalizerDelegate): 
31       IOmniParallelSimpleLoop; overload;
32     function  Finalize(taskFinalizer: TOmniSimpleTaskFinalizerTaskDelegate): 
33       IOmniParallelSimpleLoop; overload;
34     function  WaitFor(maxWait_ms: cardinal): boolean;
35   end; 

Execute accepts the block of code to be executed for each value in the input container. Three method signatures are supported.

During execution, input data is split into N sequential ranges (where N is number of concurrently running tasks). Each tasks is assigned a task index (from 0 to N-1). This task index can be used for task-specific initialization and is accessible from Initialize, Finalize and Execute delegates.

CancelWith enables the cancellation mechanism.

With Initialize you can initialize per-task data before the task begins execution. Such data can be finalized (cleaned up) with Finalize.

If you call the NoWait function, parallel for will start in the background and control will be returned to the main thread immediately. If NoWait is not called, Execute will only return after all tasks have stopped working.

By calling NumTasks you can set up the number of worker tasks. By default, number of tasks is set to [number of cores available to the process] - 1 if NoWait or PreserveOrder modifiers are used and to [number of cores available to the process] in all other cases.

If NumTasks receives a positive parameter (> 0), then the number of worker tasks is set to that number. For example, NumTasks(16) starts 16 worker tasks, even if that is more then number of available cores.

If NumTasks receives a negative parameter (< 0), it specifies number of cores that should be reserved for other use. Number of worker task is then set to <number of available cores> - <number of reserved cores>. If, for example, current process can use 16 cores and NumTasks(-4) is used, only 12 (16-4) worker tasks will be started.

Parameter 0 is not allowed and results in an exception.

OnStop sets up a termination handler which will be called after all parallel for tasks will have completed their work. If NoWait function was called, OnStop will be called from one of the worker threads. If, however, NoWait function was not called, OnStop will be called from the thread that created the For abstraction.

Release [3.07.2] introduced method OnStopInvoke which works like OnStop except that the termination handler is automatically executed in the context of the owner thread via implicit Invoke. For example, see Parallel.ForEach.OnStopInvoke.

TaskConfig sets up a task configuration block. Same task configuration block will be applied to all worker tasks.

The following example uses TaskConfig to set up a message handler which will receive messages sent from For worker tasks.

 1 FParallel := Parallel.For(1, 17)
 2   .TaskConfig(Parallel.TaskConfig.OnMessage(Self))
 3   .NoWait
 4   .OnStop(procedure begin FParallel := nil; end);
 5   
 6 FParallel
 7   .Execute(
 8     procedure (const task: IOmniTask; taskIndex, value: integer)
 9     begin
10       task.Comm.Send(WM_LOG, value);
11     end);

A call to the WaitFor function will wait for up to timeout_ms milliseconds (this value can be set to INFINITE) for all background tasks to terminate. If the tasks terminate in the specified time, WaitFor will return True. Otherwise, it will return False.

2.11.2 Iterating over an array

Since version [3.06] you can use Parallel.For<T>(const arr: TArray<T>) to iterate over elements of an array. Parallel.For<T> returns a IOmniParallelSimpleLoop<T> interface which is almost the same as the IOmniParallelSimpleLoop interface returned from Parallel.For except that it operates on elements of the array (value: T) and not on indexes (value: integer).

 1 type
 2   TOmniIteratorSimpleSimpleDelegate<T> = reference to procedure(var value: T);
 3   TOmniIteratorSimpleDelegate<T> = reference to procedure(taskIndex: integer; var value: T\
 4 );
 5   TOmniIteratorSimpleFullDelegate<T> = reference to procedure(const task: IOmniTask; taskI\
 6 ndex: integer; var value: T);
 7 
 8   IOmniParallelSimpleLoop<T> = interface
 9     function  CancelWith(const token: IOmniCancellationToken): IOmniParallelSimpleLoop<T>;
10     function  NoWait: IOmniParallelSimpleLoop<T>;
11     function  NumTasks(taskCount : integer): IOmniParallelSimpleLoop<T>;
12     function  OnStop(stopCode: TProc): IOmniParallelSimpleLoop<T>; overload;
13     function  OnStop(stopCode: TOmniTaskStopDelegate): IOmniParallelSimpleLoop<T>; overloa\
14 d;
15     function  TaskConfig(const config: IOmniTaskConfig): IOmniParallelSimpleLoop<T>;
16     procedure Execute(loopBody: TOmniIteratorSimpleSimpleDelegate<T>); overload;
17     procedure Execute(loopBody: TOmniIteratorSimpleDelegate<T>); overload;
18     procedure Execute(loopBody: TOmniIteratorSimpleFullDelegate<T>); overload;
19     function  Initialize(taskInitializer: TOmniSimpleTaskInitializerDelegate): IOmniParall\
20 elSimpleLoop<T>; overload;
21     function  Initialize(taskInitializer: TOmniSimpleTaskInitializerTaskDelegate): IOmniPa\
22 rallelSimpleLoop<T>; overload;
23     function  Finalize(taskFinalizer: TOmniSimpleTaskFinalizerDelegate): IOmniParallelSimp\
24 leLoop<T>; overload;
25     function  Finalize(taskFinalizer: TOmniSimpleTaskFinalizerTaskDelegate): IOmniParallel\
26 SimpleLoop<T>; overload;
27     function  WaitFor(maxWait_ms: cardinal): boolean;
28   end;

2.11.3 Examples

Practical example of Parallel.For usage can be found in chapter Using Task Initializers in Parallel For.

2.12 For Each

For Each abstraction creates a parallel for loop that iterates over a range of data (number range, list, queue, dataset …) in multiple threads. To create a parallel for loop, call Parallel.ForEach.

See also demos 35_ParallelFor and 36_OrderedFor.

Example:

1 PrimeCount.Value := 0;
2 Parallel.ForEach(1, 1000000).Execute(
3   procedure (const value: integer)
4   begin
5     if IsPrime(value) then 
6       PrimeCount.Increment;
7     end;
8   end);

This simple program calculates number of prime numbers in the range from one to one million. The PrimeCount object must be capable of atomic increment (a thread-safe increment), which is simple to achieve with the use of the TOmniAlignedInt32 record. The ForEach task is coded as an anonymous method but you can also use a normal method or a normal procedure for the task code.

2.12.1 Cooperation

The main point of the ForEach abstraction is cooperation between the parallel tasks. ForEach goes to great lengths to minimize possible clashes between threads when they access the source data. Except in special occasions (number range, IOmniBlockingCollection), source data is not thread-safe and locking must be used for access.

To minimize this locking, source data is allocated to worker tasks in blocks. ForEach creates a source provider object which will access the source data in a thread-safe manner. This source provider makes sure to always return an appropriately-sized block of source data (size will depend on number of tasks, type of the source data and other factors) when a task runs out of data to process.

Because the source data is allocated in blocks, it is possible that one of the tasks runs out of work while other tasks are still busy. In this case, a task will steal data from one of the other tasks. This approach makes all tasks as busy as possible while minimizing the contention.

The details of this process are further discussed in section Internals below.

2.12.2 Iterating over …

The Parallel class defines many ForEach overloads, each supporting different container type. We will look at them in more detail in the following sections.

2.12.2.1 … Number Ranges

To iterate over a range, pass first and last index to the ForEach call. Optionally, you can pass a step parameter, which defaults to 1. ForEach will then iterate from first to last with a step increment.

1 class function ForEach(low, high: integer; step: integer = 1):
2   IOmniParallelLoop<integer>; overload;

The pseudocode for numeric ForEach could be written as

1 i := low;
2 while ((step > 0) and (i <= high)) or
3       ((step < 0) and (i >= high)) do
4 begin
5   // process 'i' in parallel
6   if low < high then Inc(i, step)
7                 else Dec(i, step);
8 end;
2.12.2.2 … Enumerable Collections

If you want to iterate over a collection (say, a TStringList), you have two possibilities.

One is to use an equivalent of for i := 0 to sl.Count-1 do Something(sl[i]).

1 Parallel.ForEach(0, sl.Count-1).Execute(
2   procedure (const value: integer)
3   begin
4     Something(sl[value]);
5   end);

Another is to use an equivalent of for s in sl do Something(s).

1 Parallel.ForEach(sl).Execute(
2   procedure (const value: TOmniValue)
3   begin
4     Something(value);
5   end);

In the second example, value is passed to the task function as a TOmniValue parameter. In the example above, it will be automatically converted into a string, but sometimes you’ll have to do it manually, by calling value.AsString (or use other appropriate casting function when iterating over a different container).

A variation of the second approach is to tell the ForEach that the container contains strings. OmniThreadLibrary will then do the conversion for you.

1 Parallel.ForEach<string>(sl).Execute(
2   procedure (const value: string)
3   begin
4     Something(value);
5   end);

You may wonder which of those approaches is better. The answer depends on whether you can simultaneously access different items in the container from different threads at the same time. In other words, you have to know whether the container is thread-safe for reading. Luckily, all important Delphi containers (TList, TObjectList, TStringList) fall into this category.

If the container is thread-safe for reading, then the numeric approach (ForEach(0, sl.Count-1)) is much faster than the for..in approach (ForEach(sl)). The speed difference comes from the locking - in the former example ForEach never locks anything and in the latter example locking is used to synchronize access to the container.

However, if the container is not thread-safe for reading, you have to use the latter approach.

There are three ways to iterate over enumerable containers. You can provide the ForEach call with an IEnumerable interface, with an IEnumerator interface or with an enumerable collection itself. In the latter case, OmniThreadLibrary will use RTTI to access the enumerator for the collection. For this to work, enumerator itself must be implemented as an object, not as a record or interface. Luckily, most if not all of the VCL enumerators are implemented in this way.

 1 class function  ForEach(const enumerable: IEnumerable):
 2   IOmniParallelLoop; overload;
 3 class function  ForEach(const enum: IEnumerator): 
 4   IOmniParallelLoop; overload;
 5 class function  ForEach(const enumerable: TObject): 
 6   IOmniParallelLoop; overload;
 7 class function  ForEach<T>(const enumerable: IEnumerable): 
 8   IOmniParallelLoop<T>; overload;
 9 class function  ForEach<T>(const enum: IEnumerator): 
10   IOmniParallelLoop<T>; overload;
11 class function  ForEach<T>(const enumerable: TEnumerable<T>): 
12   IOmniParallelLoop<T>; overload;
13 class function  ForEach<T>(const enum: TEnumerator<T>): 
14   IOmniParallelLoop<T>; overload;
15 class function  ForEach<T>(const enumerable: TObject): 
16   IOmniParallelLoop<T>; overload;
2.12.2.3 … Thread-safe Enumerable Collections

Collection enumeration uses locking to synchronize access to the collection enumerator, which slows down the enumeration process. In some special cases, collection may be enumerable without the locking. To enumerate over such collection, it must implement IOmniValueEnumerable and IOmniValueEnumerator interfaces, which are defined in the OtlCommon unit.

1 class function  ForEach(const enumerable: IOmniValueEnumerable): 
2   IOmniParallelLoop; overload;
3 class function  ForEach(const enum: IOmniValueEnumerator): 
4   IOmniParallelLoop; overload;
5 class function  ForEach<T>(const enumerable: IOmniValueEnumerable): 
6   IOmniParallelLoop<T>; overload;
7 class function  ForEach<T>(const enum: IOmniValueEnumerator): 
8   IOmniParallelLoop<T>; overload;
2.12.2.4 … Blocking Collections

To simplify enumerating over blocking collections, the Parallel class implements two ForEach overloads accepting a blocking collection. Internally, blocking collection is enumerated with the IOmniValueEnumerable interface.

1 class function  ForEach(const source: IOmniBlockingCollection): 
2   IOmniParallelLoop; overload;
3 class function  ForEach<T>(const source: IOmniBlockingCollection): 
4   IOmniParallelLoop<T>; overload;
2.12.2.5 … Anything

As a last resort, the Parallel class implements three ForEach overloads that will (with some help from the programmer) iterate over any data.

The TOmniSourceProvider way is powerful, but complicated.

1 class function  ForEach(const sourceProvider: TOmniSourceProvider): 
2   IOmniParallelLoop; overload;

You must implement a descendant of the TOmniSourceProvider class. All methods must be thread-safe. For more information about the source providers, see the Internals section, below.

 1 TOmniSourceProvider = class abstract
 2 public
 3   function  Count: int64; virtual; abstract;
 4   function  CreateDataPackage: TOmniDataPackage; virtual; abstract;
 5   function  GetCapabilities: TOmniSourceProviderCapabilities; 
 6     virtual; abstract;
 7   function  GetPackage(dataCount: integer; package: TOmniDataPackage): 
 8     boolean; virtual; abstract;
 9   function  GetPackageSizeLimit: integer; virtual; abstract;
10 end; 

As this approach is not for the faint of heart, OmniThreadLibrary provides a slower but much simpler version.

1 class function  ForEach(enumerator: TEnumeratorDelegate): 
2   IOmniParallelLoop; overload;
3 class function  ForEach<T>(enumerator: TEnumeratorDelegate<T>): 
4   IOmniParallelLoop<T>; overload;

Here, you must provide a function that will return next data whenever the ForEach asks for it.

1 TEnumeratorDelegate = reference to function(var next: TOmniValue): boolean;
2 TEnumeratorDelegate<T> = reference to function(var next: T): boolean;

OmniThreadLibrary will provide the synchronisation (locking) so you can be sure this method will only be called from one thread at any time. As you may expect, this will slow things down, but parallelization may still give you a reasonable performance increase if ForEach payload is substantial (i.e. if the method you are executing in the ForEach loop takes some time to execute).

The TEnumeratorDelegate function can also be used as a generator; that is it can calculate the values that will then be processed in the parallel for loop.

2.12.3 Providing External Input

Sometimes, especially when you are dealing with datasets, synchronized access to the container will not be enough. When you are dealing with database connections, datasets etc you can easily run into thread affinity problems - that is the inability of some component to work correctly if it is called from a different thread than the one that it was created in.

[Always initialize database connections and datasets in the thread that will use them. You code may work without that precaution but unless you have extensively tested database components in multiple threads, you should not assume that they will work correctly unless that condition (initialization and use in the same thread) is met.]

In such case, the best way is to provide the input directly from the main thread. There are few different ways to achieve that.

  1. Repackage data into another collection that can be easily consumed in ForEach (TObjectList, TStringList, TOmniBlockingCollection).
  2. Run the ForEach in NoWait mode, then write the data into the input queue and when you run out of data, wait for the ForEach loop to terminate. This approach is also useful, when you want to push ForEach into background and provide it with data from some asynchronous event handler.

An example of the second approach will help clarify the idea.

 1 uses
 2   OtlCommon,
 3   OtlCollections,
 4   OtlParallel;
 5 
 6 procedure Test;
 7 var
 8   i    : integer;
 9   input: IOmniBlockingCollection;
10   loop : IOmniParallelLoop<integer>;
11   wait : IOmniWaitableValue;
12 begin
13   // create the container
14   input := TOmniBlockingCollection.Create;
15   // create the 'end of work' signal
16   wait := CreateWaitableValue;
17   loop := Parallel.ForEach<integer>(input);
18   // set up the termination method which will signal 'end of work'
19   loop.OnStop(
20     procedure
21     begin
22       wait.Signal;
23     end);
24   // start the parallel for loop in NoWait mode
25   loop.NoWait.Execute(
26     procedure (const value: integer)
27     begin
28       // do something with the input value
29       OutputDebugString(PChar(Format('%d', [value])));
30     end
31   );
32   // provide the data to the parallel for loop
33   for i := 1 to 1000 do
34     input.Add(i);
35   // signal to the parallel for loop that there's no more data to process
36   input.CompleteAdding;
37   // wait for the parallel for loop to stop
38   wait.WaitFor;
39   // destroy the parallel for loop
40   loop := nil;
41 end;

2.12.4 IOmniParallelLoop Interface

The Parallel.ForEach returns an IOmniParallelLoop interface which is used to configure and run the parallel for loop.

 1 IOmniParallelLoop = interface
 2   function  Aggregate(defaultAggregateValue: TOmniValue;
 3     aggregator: TOmniAggregatorDelegate): IOmniParallelAggregatorLoop;
 4   function  AggregateSum: IOmniParallelAggregatorLoop;
 5   procedure Execute(loopBody: TOmniIteratorDelegate); overload;
 6   procedure Execute(loopBody: TOmniIteratorTaskDelegate); overload;
 7   function  CancelWith(const token: IOmniCancellationToken): 
 8     IOmniParallelLoop;
 9   function  Initialize(taskInitializer: TOmniTaskInitializerDelegate): 
10     IOmniParallelInitializedLoop;
11   function  Into(const queue: IOmniBlockingCollection): 
12     IOmniParallelIntoLoop; overload;
13   function  NoWait: IOmniParallelLoop;
14   function  NumTasks(taskCount : integer): IOmniParallelLoop;
15   function  OnMessage(eventDispatcher: TObject): 
16     IOmniParallelLoop; overload; deprecated 'use TaskConfig';
17   function  OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent):
18     IOmniParallelLoop; overload; deprecated 'use TaskConfig';
19   function  OnMessage(msgID: word; eventHandler: TOmniOnMessageFunction):
20     IOmniParallelLoop; overload; deprecated 'use TaskConfig';
21   function  OnTaskCreate(taskCreateDelegate: TOmniTaskCreateDelegate): 
22     IOmniParallelLoop; overload;
23   function  OnTaskCreate(taskCreateDelegate: 
24     TOmniTaskControlCreateDelegate): IOmniParallelLoop; overload;
25   function  OnStop(stopCode: TProc): IOmniParallelLoop;
26   function  OnStop(stopCode: TOmniTaskStopDelegate): IOmniParallelLoop; overload;
27   function  OnStopInvoke(stopCode: TProc): IOmniParallelLoop;
28   function  PreserveOrder: IOmniParallelLoop;
29   function  TaskConfig(const config: IOmniTaskConfig): IOmniParallelLoop;
30 end;

ForEach<T> returns an IOmniParallelLoop<T> interface, which is exactly the same as the IOmniParallelLoop except that each method returns the appropriate <T> version of the interface.

Aggregate and AggregateSum are used to implement aggregation. See the Aggregation section, below.

Execute accepts the block of code to be executed for each value in the input container. Two method signatures are supported, both with the <T> variant. One accepts only the iteration value and another accepts the IOmniTask parameter.

1 TOmniIteratorDelegate = reference to procedure(const value: TOmniValue);
2 TOmniIteratorDelegate<T> = reference to procedure(const value: T);
3 TOmniIteratorTaskDelegate =
4   reference to procedure(const task: IOmniTask; const value: TOmniValue);
5 TOmniIteratorTaskDelegate<T> = 
6   reference to procedure(const task: IOmniTask; const value: T);

CancelWith enables the cancellation mechanism.

With Initialize and OnTaskCreate you can initialize per-task data before the task begins execution. See the Task Initialization section, below.

Into sets up the output queue, see Preserving Output Order.

If you call the NoWait function, parallel for will start in the background and control will be returned to the main thread immediately. If NoWait is not called, Execute will only return after all tasks have stopped working.

By calling NumTasks you can set up the number of worker tasks. By default, number of tasks is set to [number of cores available to the process] - 1 if NoWait or PreserveOrder modifiers are used and to [number of cores available to the process] in all other cases.

If NumTasks receives a positive parameter (> 0), then the number of worker tasks is set to that number. For example, NumTasks(16) starts 16 worker tasks, even if that is more then number of available cores.

If NumTasks receives a negative parameter (< 0), it specifies number of cores that should be reserved for other use. Number of worker task is then set to <number of available cores> - <number of reserved cores>. If, for example, current process can use 16 cores and NumTasks(-4) is used, only 12 (16-4) worker tasks will be started.

Parameter 0 is not allowed and results in an exception.

OnMessage functions are deprecated, use TaskConfig instead.

OnStop sets up a termination handler which will be called after all parallel for tasks will have completed their work. If NoWait function was called, OnStop will be called from one of the worker threads. If, however, NoWait function was not called, OnStop will be called from the thread that created the ForEach abstraction. This behaviour makes it hard to execute VCL code from the OnStop so release 3.02 introduced another variation accepting a delegate with an IOmniTask parameter.

1 TOmniTaskStopDelegate = reference to procedure (const task: IOmniTask);
2 IOmniParallelLoop = interface
3   function  OnStop(stopCode: TOmniTaskStopDelegate): IOmniParallelLoop;
4     overload;
5 end;

Using this version of OnStop, the termination handler can use task.Invoke to execute some code in the main thread. This, however, requires the ForEach abstraction to stay alive until the Invoke-d code is executed so you must store the ForEach result in a global variable (form field, for example) and destroy it only in the termination handler.

 1 var
 2   loop: IOmniParallelLoop<integer>;
 3 
 4 loop := Parallel.ForEach(1, N).NoWait;
 5 loop.OnStop(
 6   procedure (const task: IOmniTask)
 7   begin
 8     task.Invoke(
 9       procedure
10       begin
11         // do anything
12         loop := nil;
13       end);
14   end);
15 loop.Execute(
16   procedure (const value: integer)
17   begin
18     ...
19   end);

Release [3.07.2] introduced method OnStopInvoke which works like OnStop except that the termination handler is automatically executed in the context of the owner thread via implicit Invoke.

The code fragment above can be rewritten using OnStopInvoke as follows.

 1 var
 2   loop: IOmniParallelLoop<integer>;
 3 
 4 loop := Parallel.ForEach(1, N).NoWait;
 5 loop.OnStopInvoke(
 6   procedure
 7   begin
 8     // do anything
 9     loop := nil;
10   end);
11 loop.Execute(
12   procedure (const value: integer)
13   begin
14     ...
15   end);

PreserveOrder modifies the parallel for behaviour so that output values are generated in the order of the corresponding input values. See the Preserving Output Order section, below.

TaskConfig sets up a task configuration block. Same task configuration block will be applied to all worker tasks.

The following example uses TaskConfig to set up a message handler which will receive messages sent from ForEach worker tasks.

 1 FParallel := Parallel.ForEach(1, 17)
 2   .TaskConfig(Parallel.TaskConfig.OnMessage(Self))
 3   .NoWait
 4   .OnStop(procedure begin FParallel := nil; end);
 5   
 6 FParallel
 7   .Execute(
 8     procedure (const task: IOmniTask; const value: integer)
 9     begin
10       task.Comm.Send(WM_LOG, value);
11     end);

Messages sent from the worker task are received and dispatched by the IOmniParallelLoop interface. This requires the ForEach abstraction to stay alive until the messages are processed so you must store the ForEach result in a global variable (form field, for example) and destroy it only in the OnStop handler.

Some functions return a different interface. Typically, it only implements the Execute function accepting a different parameter than the ‘normal’ Execute. For example, Aggregate returns the IOmniParallelAggregatorLoop interface.

1 TOmniIteratorIntoDelegate = 
2   reference to procedure(const value: TOmniValue; var result: TOmniValue);
3 
4 IOmniParallelAggregatorLoop = interface
5   function  Execute(loopBody: TOmniIteratorIntoDelegate): TOmniValue;
6 end;

These variants of the IOmniParallelLoop interface will be described in following sections.

2.12.5 Preserving Output Order

When you run a ForEach loop, you can’t tell in advance in which order elements from the input collection will be processed in. For example, the code below will generate all primes from 1 to CMaxPrime and write them into the output queue (primeQueue) in a nondeterministic order.

1 primeQueue := TOmniBlockingCollection.Create;
2 Parallel.ForEach(1, CMaxPrime).Execute(
3   procedure (const value: integer)
4   begin
5     if IsPrime(value) then begin
6       primeQueue.Add(value);
7     end;
8   end);

Sometimes this will represent a big problem and you’ll have to write a sorting function that will resort the output before it can be processed further. To alleviate the problem, IOmniParallelLoop implements the PreserveOrder modifier. When used, ForEach loop will internally sort the results produced in the task method passed to the Execute method.

Using PreserveOrder also forces you to use the Into method which returns the IOmniParallelIntoLoop interface. (As you may expect, there’s also the <T> version of that interface.)

 1 TOmniIteratorIntoDelegate = 
 2   reference to procedure(const value: TOmniValue; var result: TOmniValue);
 3 TOmniIteratorIntoTaskDelegate = 
 4   reference to procedure(const task: IOmniTask; const value: TOmniValue; 
 5                          var result: TOmniValue);
 6 
 7 IOmniParallelIntoLoop = interface
 8   procedure Execute(loopBody: TOmniIteratorIntoDelegate); overload;
 9   procedure Execute(loopBody: TOmniIteratorIntoTaskDelegate); overload;
10 end;

As you can see, the Execute method in IOmniParallelIntoLoop takes a different parameter than the ‘normal’ Execute. Because of that, you’ll have to change a code that is passed to the Execute to return a result.

 1 primeQueue := TOmniBlockingCollection.Create;
 2 Parallel.ForEach(1, CMaxPrime)
 3   .PreserveOrder
 4   .Into(primeQueue)
 5   .Execute(
 6     procedure (const value: integer; var res: TOmniValue)
 7     begin
 8       if IsPrime(value) then
 9         res := value;
10     end);

When using PreserveOrder and Into, ForEach calls your worker code for each input value. If the worker code sets output parameter (res) to any value, it will be inserted into a temporary buffer. Then the magic will happen (see the Internals section, below) and as soon as the appropriate (sorted) value is available in the temporary buffer, it is inserted into the output queue (the one passed to the Into parameter).

You can also use Into without the PreserveOrder. This will give you queue management but no ordering.

2.12.6 Aggregation

Aggregation allows you to collect data from the parallel for tasks and calculate one number that is returned to the user.

Let’s start with an example - and a very bad one! The following code fragment tries to calculate a number of prime numbers between 1 and CMaxPrime.

1 numPrimes := 0;
2 Parallel.ForEach(1, CMaxPrime).Execute(
3   procedure (const value: integer)
4   begin
5     if IsPrime(value) then 
6       Inc(numPrimes);
7   end);

Let’s say it out loud - this code is wrong! Access to the shared variable is not synchronized between threads and that will make the result indeterminable. One way to solve the problem is to wrap the Inc(numPrimes) with locking and another is to use InterlockedIncrement instead of Inc, but both will slow down the execution a lot.

A solution to this problem is to use the Aggregate function.

 1 procedure SumPrimes(var aggregate: TOmniValue; const value: TOmniValue)
 2 begin
 3   aggregate := aggregate.AsInt64 + value.AsInt64;
 4 end;
 5 
 6 procedure CheckPrime(const value: integer; var result: TOmniValue)
 7 begin
 8   if IsPrime(value) then
 9     Result := 1;
10 end;  
11 
12 numPrimes :=
13   Parallel.ForEach(1, CMaxPrime)
14   .Aggregate(0, SumPrimes)
15   .Execute(CheckPrime);

Aggregate takes two parameters - the first is the initial value for the aggregate and the second is an aggregation function - a piece of code that will take the current aggregate value and update it with the value returned from the parallel for task.

When using Aggregate, parallel for task (the code passed to the Execute function) has the same signature as when used with Into. It takes the current iteration value and optionally produces a result.

We could replace the code above with a for loop.

 1 agg := 0;
 2 result.Clear;
 3 for value := 1 to CMaxPrime do begin
 4   CheckPrime(value, result);
 5   if not result.IsEmpty then begin
 6     SumPrimes(agg, result);
 7     result.Clear;  
 8   end;
 9 end;
10 numPrimes := agg;

ForEach executes the aggregation in two stages. While the parallel for task is running, it will use this approach to aggregate data into a local variable. When it runs out of work, it will call the same aggregation method to aggregate this local variable into a global result. In this second stage, however, locking will be used to protect the access to the global result.

Because summation is the most common usage of aggregation, IOmniParallelLoop implements function AggregateSum, which works exactly the same as the SumPrimes above.

 1 numPrimes :=
 2   Parallel.ForEach(1, CMaxPrime)
 3   .AggregateSum
 4   .Execute(
 5     procedure (const value: integer; var result: TOmniValue)
 6     begin
 7       if IsPrime(value) then
 8         Result := 1;
 9     end
10   );

Aggregation function can do something else but the summation. The following code segment uses aggregation to find the length of the longest line in a file.

 1 function GetLongestLineInFile(const fileName: string): integer;
 2 var
 3   maxLength: TOmniValue;
 4   sl       : TStringList;
 5 begin
 6   sl := TStringList.Create;
 7   try
 8     sl.LoadFromFile(fileName);
 9     maxLength := Parallel.ForEach<string>(sl)
10       .Aggregate(0,
11         procedure(var aggregate: TOmniValue; const value: TOmniValue)
12         begin
13           if value.AsInteger > aggregate.AsInteger then
14             aggregate := value.AsInteger;
15         end)
16       .Execute(
17         procedure(const value: string; var result: TOmniValue)
18         begin
19           result := Length(value);
20         end);
21 	Result := maxLength;
22   finally FreeAndNil(sl); end;
23 end;

2.12.7 Cancellation

ForEach has a built-in cancellation mechanism. To use it, create a cancellation token and pass it to the CancelWith function. When a cancellation token gets signalled, all worker loops will complete the current iteration and then stop.

An example of using cancellation token can be found in the chapter Parallel Search in a Tree.

2.12.8 Task Initialization and Finalization

In some cases it would be nice if each parallel task could have some data initialized at beginning and available to the enumerator code (the one passed to the Execute). For those occasions, ForEach implements an Initialize function.

 1 TOmniTaskInitializerDelegate = 
 2   reference to procedure(var taskState: TOmniValue);
 3 TOmniTaskFinalizerDelegate = 
 4   reference to procedure(const taskState: TOmniValue);
 5 TOmniIteratorStateDelegate = 
 6   reference to procedure(const value: TOmniValue; var taskState: TOmniValue);
 7 
 8 IOmniParallelInitializedLoop = interface
 9   function  Finalize(taskFinalizer: TOmniTaskFinalizerDelegate):
10     IOmniParallelInitializedLoop;
11   procedure Execute(loopBody: TOmniIteratorStateDelegate);
12 end;
13 
14 IOmniParallelLoop = interface
15   ...
16   function  Initialize(taskInitializer: TOmniTaskInitializerDelegate):
17     IOmniParallelInitializedLoop;
18 end;

You provide Initialize with task initializer, a procedure that will be called in each parallel worker task when it is created and before it starts enumerating values. This procedure can initialize the taskState parameter with any value.

Initialize returns a IOmniParallelInitializedLoop interface which implements two functions - Finalize and Execute. Call Finalize to set up task finalizer, a procedure that gets called after all values have been enumerated and before the parallel worker task ends its job.

Execute accepts a worker method with two parameters - the first one is the usual value from the enumerated container and the second contains the shared task state.

Of course, all those functions and interfaces are implemented in the <T> version, too.

The following example shows how to calculate number of primes from 1 to CHighPrime by using initializers and finalizers.

 1 var
 2   lockNum  : TOmniCS;
 3   numPrimes: integer;
 4 begin
 5   numPrimes := 0;
 6   Parallel.ForEach(1, CHighPrime)
 7     .Initialize(
 8       procedure (var taskState: TOmniValue)
 9       begin
10         taskState.AsInteger := 0;
11       end)
12     .Finalize(
13       procedure (const taskState: TOmniValue)
14       begin
15         lockNum.Acquire;
16         try
17           numPrimes := numPrimes + taskState.AsInteger;
18         finally lockNum.Release; end;
19       end)
20     .Execute(
21       procedure (const value: integer; var taskState: TOmniValue)
22       begin
23         if IsPrime(value) then
24           taskState.AsInteger := taskState.AsInteger + 1;
25       end
26     );
27 end;

2.12.9 Handling Exceptions

ForEach abstraction does not yet implement any exception handling. You should always wrap task method (code passed to the Execute) in try..except if you expect the code to raise exceptions.

2.12.10 Examples

Practical example of For Each usage can be found in chapters Parallel For with Synchronized Output and Parallel Search in a Tree.

2.13 Fork/Join

Fork/Join abstraction creates a framework for solving divide and conquer algorithms.

See also demos 44_Fork-Join Quicksort and 45_Fork-Join max.

A typical fork/join usage pattern is:

The trick here is that subtasks may spawn new subtasks and so on ad infinitum (probably a little less, or you’ll run out of stack ;) ). For optimum execution, fork/join must therefore guarantee that the code is never running on too many background threads (an optimal value is usually equal to the number of cores in the system) and that those threads don’t run out of work.

To achieve this, ForkJoin creates multiple worker threads and connects them to a computation pool. Computation requests (i.e. subtasks) are added to this pool. They are removed by worker threads, processed and optional new subtasks are added back to the pool.

2.13.1 IOmniForkJoin Interface

Fork/join computation pool is implemented by the IOmniForkJoin interface and is created in the Parallel.ForkJoin factory function. There are two ForkJoin overloads – one is used for computations that don’t return a result and another is used for computations that return a result of some type T.

1 class function ForkJoin: IOmniForkJoin; overload;
2 class function ForkJoin<T>: IOmniForkJoin<T>; overload;

Both interfaces declare just few methods.

 1 IOmniForkJoin = interface
 2   function  Compute(action: TOmniForkJoinDelegate): IOmniCompute;
 3   function  NumTasks(numTasks: integer): IOmniForkJoin;
 4   function  TaskConfig(const config: IOmniTaskConfig): IOmniForkJoin;
 5 end; 
 6 
 7 IOmniForkJoin<T> = interface
 8   function  Compute(action: TOmniForkJoinDelegate<T>): IOmniCompute<T>;
 9   function  NumTasks(numTasks: integer): IOmniForkJoin<T>;
10   function  TaskConfig(const config: IOmniTaskConfig): IOmniForkJoin<T>;
11 end; 

Compute creates new subtasks executing the action and returns control interface IOmniCompute (or IOmniCompute<T>).

By calling NumTasks, you can set the degree of parallelism. By default, fork/join uses as many threads as there are cores accessible by the process.

If NumTasks receives a positive parameter (> 0), then the number of worker tasks is set to that number. For example, NumTasks(16) starts 16 worker tasks, even if that is more then number of available cores.

If NumTasks receives a negative parameter (< 0), it specifies number of cores that should be reserved for other use. Number of worker task is then set to <number of available cores> - <number of reserved cores>. If, for example, current process can use 16 cores and NumTasks(-4) is used, only 12 (16-4) worker tasks will be started.

Parameter 0 is not allowed and results in an exception.

TaskConfig method is used to set up a task configuration block, applied to each worker task.

2.13.2 IOmniCompute Interface

The IOmniCompute interface provides interaction with the computation unit that doesn’t return a result.

1 IOmniCompute = interface
2   procedure Execute;
3   function  IsDone: boolean;
4   procedure Await;
5 end; 

Execute executes the action that was provided to the Compute method. This method is used internally and should not be called from the user code.

IsDone verifies if the computation unit has completed the work.

Await waits for the computation unit to complete the work.

2.13.3 IOmniCompute<T> Interface

The IOmniCompute<T> interface provides interaction with the computation unit that returns a result.

1 IOmniCompute<T> = interface
2   procedure Execute;
3   function  IsDone: boolean;
4   function  TryValue(timeout_ms: cardinal; var value: T): boolean;
5   function  Value: T;
6 end;

Execute executes the action that was provided to the Compute method. This method is used internally and should not be called from the user code.

IsDone verifies if the computation unit has completed the work.

TryValue waits for up to timeout_ms milliseconds (or as long as needed if INFINITE is passed for this parameter) for the computation to complete and returns the result (if available) in the value parameter. Returns True if result is known, False otherwise.

Value returns the computation unit result. This function will block until the result is available.

2.13.4 Exceptions

There’s no special exception handling built into the fork/join abstraction at the moment. You should always catch and handle exceptions inside the action passed to the Compute method.

2.13.5 Examples

Practical examples of Fork/Join usage can be found in chapter QuickSort and Parallel Max.

2.14 Map

Map abstraction creates a parallellized mapping function which iterates over a source data array. To create a map abstraction, call Parallel.Map.

See also demos 60_Map.

Example:

1 odds := Parallel.Map<integer,string>(numbers,
2   function (const source: integer; var dest: string): boolean
3   begin
4     Result := Odd(source);
5     if Result then
6       dest := IntTostr(source);
7   end);

This simple code fragment takes an array of integers numbers: TArray<integer>, converts all odd values in that array to string and assigns resulting TArray<string> to odds. Worker code runs in parallel on all available cores. The filtering function is coded as an anonymous method but you can also use a normal method or a normal procedure.

The Parallel class defines two Map overloads. The first creates an IOmniParallelMapper<T1,T2> interface which you can then further configure before workers are started.

1 type
2   TMapProc<T1,T2> = reference to function(const source: T1; var target: T2): boolean;
3 	
4   Parallel = class
5     class function Map<T1,T2>: IOmniParallelMapper<T1,T2>; overload;
6     class function Map<T1,T2>(const source: TArray<T1>;
7       mapper: TMapProc<T1,T2>): TArray<T2>; overload;
8     ...
9   end;

The second overload is just a shorthand which runs mapping function on all cores and waits for the result. It is defined in the OtlParallel as foollows.

 1 class function Parallel.Map<T1, T2>(const source: TArray<T1>; mapper: TMapProc<T1,T2>):
 2   TArray<T2>;
 3 var
 4   map: IOmniParallelMapper<T1,T2>;
 5 begin
 6   map := Parallel.Map<T1,T2>.Source(source);
 7   map.Execute(mapper);
 8   map.WaitFor(INFINITE);
 9   Result := map.Result;
10 end;

2.14.1 IOmniParallelMapper<T1,T2> interface

The IOmniParallelMapper<T1,T2> interface provides methods that configure and control the parallel mapper.

 1 IOmniParallelMapper<T1,T2> = interface
 2   function  Execute(mapper: TMapProc<T1,T2>): IOmniParallelMapper<T1,T2>;
 3   function  NoWait: IOmniParallelMapper<T1,T2>;
 4   function  NumTasks(numTasks: integer): IOmniParallelMapper<T1,T2>;
 5   function  OnStop(stopCode: TProc): IOmniParallelMapper<T1,T2>; overload;
 6   function  OnStop(stopCode: TOmniTaskStopDelegate): IOmniParallelMapper<T1,T2>; overload;
 7   function  OnStopInvoke(stopCode: TProc): IOmniParallelMapper<T1,T2>;
 8   function  Result: TArray<T2>;
 9   function  Source(const data: TArray<T1>; 
10     makeCopy: boolean = false): IOmniParallelMapper<T1,T2>;
11   function  TaskConfig(const config: IOmniTaskConfig): IOmniParallelMapper<T1,T2>;
12   function  WaitFor(maxWait_ms: cardinal): boolean;
13 end;

Source sets the input data array. If makeCopy parameter is set, data is copied to an internal array. If not (default), original data is referenced from all worker threads.

TaskConfig sets up a task configuration block. Same task configuration block will be applied to all worker tasks.

By calling NumTasks you can set up the number of worker tasks. By default, number of tasks is set to [number of cores available to the process] - 1 if NoWait modifier is used and to [number of cores available to the process] if NoWait is not used.

If NumTasks receives a positive parameter (> 0), then the number of worker tasks is set to that number. For example, NumTasks(16) starts 16 worker tasks, even if that is more then number of available cores.

If NumTasks receives a negative parameter (< 0), it specifies number of cores that should be reserved for other use. Number of worker task is then set to <number of available cores> - <number of reserved cores>. If, for example, current process can use 16 cores and NumTasks(-4) is used, only 12 (16-4) worker tasks will be started.

Parameter 0 is not allowed and results in an exception.

Execute starts the worker tasks. As a parameter it takes a mapping function which is executed in worker tasks.

If you call the NoWait function, parallel for will start in the background and control will be returned to the main thread immediately. If NoWait is not called, Execute will only return after all tasks have stopped working.

OnStop sets up a termination handler which will be called after all parallel tasks will have completed their work. If NoWait function was called, OnStop will be called from one of the worker threads. If, however, NoWait function was not called, OnStop will be called from the thread that created the Map abstraction.

Release [3.07.2] introduced method OnStopInvoke which works like OnStop except that the termination handler is automatically executed in the context of the owner thread via implicit Invoke. For example, see Parallel.ForEach.OnStopInvoke.

A call to the WaitFor function will wait for up to timeout_ms milliseconds (this value can be set to INFINITE) for all background tasks to terminate. If the tasks terminate in the specified time, WaitFor will return True. Otherwise, it will return False.

The Result function returns the resulting array. This function should only be called after all worker threads have finished their work (i.e. from OnStop, after successfull WaitFor, or after Execute if NoWait is not used).

2.15 Timed Task

Timed Task abstraction creates a threaded timer. To create a timed task, call Parallel.TimedTask.

See also demos 65_TimedTask.

Example:

1 FTimedTask := Parallel.TimedTask.Every(5000).Execute(
2   procedure (const task: IOmniTask)
3   begin
4     task.Comm.Send(MSG_WATCHDOG, 'Still alive ...');
5   end;

This code fragment creates a timed task which every 5 seconds (5000 milliseconds) sends a message MSG_WATCHDOG to its owner.

2.15.1 IOmniTimedTask interface

The Parallel.TimedTask function returns an IOmniTimedTask interface which is used to configure and control timed task.

 1 type
 2   TOmniTaskDelegate = reference to procedure(const task: IOmniTask);
 3 
 4   IOmniTimedTask = interface
 5     function  Every(interval_ms: integer): IOmniTimedTask;
 6     function  Execute(const aTask: TProc): IOmniTimedTask; overload;
 7     function  Execute(const aTask: TOmniTaskDelegate): IOmniTimedTask; overload;
 8     procedure ExecuteNow;
 9     procedure Start;
10     procedure Stop;
11     function  TaskConfig(const config: IOmniTaskConfig): IOmniTimedTask;
12     function  Terminate(maxWait_ms: cardinal): boolean;
13     function  WaitFor(maxWait_ms: cardinal): boolean;
14     property Active: boolean;
15     property Interval: integer;
16   end;

Every sets the timer interval (in milliseconds). Interval must be a positive number. If you set it to a value which is less than or equal to zero, timer will be disabled. (It is, however, advisable to use Stop or Active := false for this purpose as that more clearly states the intention of the code.)

When a timed task is created, its interval is set to 0 ms.

You can inspect and modify the interval by using the Interval property. Setting an interval by assigning a value to this property (Interval := value;) is equivalent to calling Every(value).

Execute specifies the code that will be called each Interval milliseconds (timer event handler code). This can be a parameterless anonymous method (or normal procedure, or an object method) or procedure/method/anonymous method accepting one parameter of type IOmniTask which holds the interface of the background task executing the timer method.

After you set an interval (even if the current value is equal to the previous value), the background task will wait for Interval milliseconds before calling the timer event handler code.

If you want to execute timer event handler immediately, call the ExecuteNow method. This will also reset the timer so that next automatic invocation of the timer event handler will occur Interval millisconds from now.

Start will start (enable) the timer. To stop (disable) the timer call the Stop method. Current state of the timer is available via the Active property.

You can also start/stop a timer by changing the Active property. Setting Active := true is equivalent to calling Start and setting Active := false is equivalent to calling Stop.

A timer is automatically started (enabled) when you call Execute – if and only if the Interval has already been set.

If you want to create a stopped timer with a very small Interval value, it is advisable to use the following pattern:

1 timedTask := Parallel.TimedTask.Execute(SomeCode);
2 timedTask.Stop;
3 timedTask.Interval := 1;  

TaskConfig sets up a task configuration block.

Calling Terminate will stop the timed task. If it stops in maxWait_ms, True will be returned, False otherwise. WaitFor waits for the task to stop (without commanding it to stop beforehand so you would have to call Terminate before WaitFor) and returns True/False just as Terminate does.

It is usually enough to just set the IOmniTimedTask instance to nil as that effectively calls the Terminate(INFINITE).