For Each

The For Each abstraction creates a parallel for loop that iterates over a range of data (number range, list, queue, dataset …) in multiple threads. To create a parallel for loop, call Parallel.ForEach.



When you use Parallel.ForEach, OmniThreadLibrary starts multiple background tasks and connects them to the source through a serialization mechanism. The output is optionally sorted in the order of the input. By default, ForEach waits for all background threads to complete before control is returned to the caller.

See also demos 35_ParallelFor and 36_OrderedFor.

Example:

PrimeCount.Value := 0;
Parallel.ForEach(1, 1000000).Execute(
  procedure (const value: integer)
  begin
    if IsPrime(value) then
      PrimeCount.Increment;
  end);

This simple program counts the prime numbers in the range from one to one million. The PrimeCount object must support atomic incrementation (a thread-safe increment), which is simple to achieve with the TGp4AlignedInt record. The ForEach task is coded as an anonymous method, but you can also use a normal method or a normal procedure for the task code.
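For context, here is a minimal complete sketch of the same program. It assumes that TGp4AlignedInt comes from the GpStuff unit (part of the GpDelphiUnits collection bundled with OmniThreadLibrary) and that IsPrime is defined elsewhere:

uses
  GpStuff,     // assumed home of the TGp4AlignedInt record
  OtlParallel;

var
  PrimeCount: TGp4AlignedInt; // a record, so no explicit construction needed

begin
  PrimeCount.Value := 0;
  Parallel.ForEach(1, 1000000).Execute(
    procedure (const value: integer)
    begin
      if IsPrime(value) then
        PrimeCount.Increment; // interlocked increment, safe across threads
    end);
  Writeln(PrimeCount.Value);
end.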

Cooperation

The main point of the ForEach abstraction is cooperation between the parallel tasks. ForEach goes to great lengths to minimize possible clashes between threads when they access the source data. Except in special cases (number range, IOmniBlockingCollection), the source data is not thread-safe and locking must be used to access it.

To minimize this locking, source data is allocated to worker tasks in blocks. ForEach creates a source provider object which accesses the source data in a thread-safe manner. The source provider always returns an appropriately-sized block of source data (the size depends on the number of tasks, the type of the source data and other factors) whenever a task runs out of data to process.

Because the source data is allocated in blocks, it is possible that one task runs out of work while the other tasks are still busy. In that case, the task will steal data from one of the other tasks. This approach keeps all tasks as busy as possible while minimizing contention.

The details of this process are further discussed in section Internals below.

Iterating over ...

The Parallel class defines many ForEach overloads, each supporting a different container type. We will look at them in more detail in the following sections.

... Number Ranges

To iterate over a range, pass the first and the last index to the ForEach call. Optionally, you can pass a step parameter, which defaults to 1. ForEach will then iterate from the first to the last index, advancing by step.

class function ForEach(low, high: integer; step: integer = 1):
  IOmniParallelLoop<integer>; overload;

The pseudocode for numeric ForEach could be written as

i := low;
while ((step > 0) and (i <= high)) or
      ((step < 0) and (i >= high)) do
begin
  // process 'i' in parallel
  Inc(i, step); // the sign of 'step' determines the direction
end;
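A negative step iterates the range downwards. For example, the following sketch offers the values 10, 7, 4 and 1 to the worker tasks (though, as always, not necessarily in that order):

Parallel.ForEach(10, 1, -3).Execute(
  procedure (const value: integer)
  begin
    OutputDebugString(PChar(IntToStr(value)));
  end);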

... Enumerable Collections

If you want to iterate over a collection (say, a TStringList), you have two possibilities.

One is to use an equivalent of for i := 0 to sl.Count-1 do Something(sl[i]).

Parallel.ForEach(0, sl.Count-1).Execute(
  procedure (const value: integer)
  begin
    Something(sl[value]);
  end);

Another is to use an equivalent of for s in sl do Something(s).

Parallel.ForEach(sl).Execute(
  procedure (const value: TOmniValue)
  begin
    Something(value);
  end);

In the second example, value is passed to the task function as a ''TOmniValue'' parameter. In the example above it will be automatically converted into a string, but sometimes you'll have to do the conversion manually by calling value.AsString (or another appropriate conversion function when iterating over a different container).

A variation of the second approach is to tell the ForEach that the container contains strings. OmniThreadLibrary will then do the conversion for you.

Parallel.ForEach<string>(sl).Execute(
  procedure (const value: string)
  begin
    Something(value);
  end);

You may wonder which of these approaches is better. The answer depends on whether different threads may access different items of the container at the same time. In other words, you have to know whether the container is thread-safe for reading. Luckily, all the important Delphi containers (TList, TObjectList, TStringList) fall into this category.

If the container is thread-safe for reading, the numeric approach (ForEach(0, sl.Count-1)) is much faster than the for..in approach (ForEach(sl)). The speed difference comes from locking - in the former example ForEach never locks anything, while in the latter example locking is used to synchronize access to the container.

However, if the container is not thread-safe for reading, you have to use the latter approach.

There are three ways to iterate over enumerable containers. You can provide the ForEach call with an IEnumerable interface, with an IEnumerator interface, or with the enumerable collection itself. In the last case, OmniThreadLibrary uses RTTI to access the collection's enumerator. For this to work, the enumerator itself must be implemented as a class, not as a record or an interface. Luckily, most if not all of the VCL enumerators are implemented in this way.

class function  ForEach(const enumerable: IEnumerable):
  IOmniParallelLoop; overload;
class function  ForEach(const enum: IEnumerator): 
  IOmniParallelLoop; overload;
class function  ForEach(const enumerable: TObject): 
  IOmniParallelLoop; overload;
class function  ForEach<T>(const enumerable: IEnumerable): 
  IOmniParallelLoop<T>; overload;
class function  ForEach<T>(const enum: IEnumerator): 
  IOmniParallelLoop<T>; overload;
class function  ForEach<T>(const enumerable: TEnumerable<T>): 
  IOmniParallelLoop<T>; overload;
class function  ForEach<T>(const enum: TEnumerator<T>): 
  IOmniParallelLoop<T>; overload;
class function  ForEach<T>(const enumerable: TObject): 
  IOmniParallelLoop<T>; overload;
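As an illustration of the generic overloads, the following sketch iterates over a Generics.Collections list. TList<integer> descends from TEnumerable<integer>, so the ForEach<T>(const enumerable: TEnumerable<T>) overload applies and values arrive already typed (ProcessNumber is a hypothetical worker routine):

uses
  Generics.Collections,
  OtlParallel;

var
  numbers: TList<integer>;

numbers := TList<integer>.Create;
try
  numbers.AddRange([1, 2, 3, 5, 8, 13]);
  Parallel.ForEach<integer>(numbers).Execute(
    procedure (const value: integer)
    begin
      ProcessNumber(value); // hypothetical worker routine
    end);
finally
  numbers.Free; // safe; without NoWait, Execute returns after all tasks stop
end;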

... Thread-safe Enumerable Collections

Collection enumeration uses locking to synchronize access to the collection enumerator, which slows down the enumeration. In some special cases a collection may be enumerable without locking. To be enumerated that way, the collection must implement the IOmniValueEnumerable and IOmniValueEnumerator interfaces, which are defined in the OtlCommon unit.

class function  ForEach(const enumerable: IOmniValueEnumerable): 
  IOmniParallelLoop; overload;
class function  ForEach(const enum: IOmniValueEnumerator): 
  IOmniParallelLoop; overload;
class function  ForEach<T>(const enumerable: IOmniValueEnumerable): 
  IOmniParallelLoop<T>; overload;
class function  ForEach<T>(const enum: IOmniValueEnumerator): 
  IOmniParallelLoop<T>; overload;

... Blocking Collections

To simplify enumerating over blocking collections, the Parallel class implements two ForEach overloads that accept a blocking collection. Internally, the blocking collection is enumerated through the IOmniValueEnumerable interface.

class function  ForEach(const source: IOmniBlockingCollection): 
  IOmniParallelLoop; overload;
class function  ForEach<T>(const source: IOmniBlockingCollection): 
  IOmniParallelLoop<T>; overload;

... Anything

As a last resort, the Parallel class implements three ForEach overloads that will (with some help from the programmer) iterate over any data.

The TOmniSourceProvider way is powerful, but complicated.

class function  ForEach(const sourceProvider: TOmniSourceProvider): 
  IOmniParallelLoop; overload;

You must implement a descendant of the TOmniSourceProvider class. All methods must be thread-safe. For more information about the source providers, see the Internals section, below.

TOmniSourceProvider = class abstract
public
  function  Count: int64; virtual; abstract;
  function  CreateDataPackage: TOmniDataPackage; virtual; abstract;
  function  GetCapabilities: TOmniSourceProviderCapabilities; 
    virtual; abstract;
  function  GetPackage(dataCount: integer; package: TOmniDataPackage): 
    boolean; virtual; abstract;
  function  GetPackageSizeLimit: integer; virtual; abstract;
end; 

As this approach is not for the faint of heart, OmniThreadLibrary provides a slower but much simpler version.

class function  ForEach(enumerator: TEnumeratorDelegate): 
  IOmniParallelLoop; overload;
class function  ForEach<T>(enumerator: TEnumeratorDelegate<T>): 
  IOmniParallelLoop<T>; overload;

Here, you must provide a function that returns the next value whenever ForEach asks for it.

TEnumeratorDelegate = reference to function(var next: TOmniValue): boolean;
TEnumeratorDelegate<T> = reference to function(var next: T): boolean;

OmniThreadLibrary provides the synchronisation (locking), so you can be sure this method will only be called from one thread at a time. As you may expect, this slows things down, but parallelization may still give you a reasonable performance increase if the ForEach payload is substantial (i.e. if the method you are executing in the ForEach loop takes some time to execute).

The TEnumeratorDelegate function can also be used as a generator; that is, it can calculate the values that will then be processed in the parallel for loop.
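A minimal sketch of the generator idea, using the ForEach<T>(enumerator: TEnumeratorDelegate<T>) overload shown above. The counter can be modified without locking because OmniThreadLibrary calls the delegate from only one thread at a time (ProcessSquare is a hypothetical worker routine):

var
  counter: integer;

counter := 0;
// generate the first 100 squares and process them in parallel
Parallel.ForEach<integer>(
  function (var next: integer): boolean
  begin
    Inc(counter);
    next := counter * counter;
    Result := counter <= 100; // False ends the enumeration
  end)
  .Execute(
    procedure (const value: integer)
    begin
      ProcessSquare(value); // hypothetical worker routine
    end);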

Providing External Input

Sometimes, especially when you are dealing with datasets, synchronized access to the container is not enough. With database connections, datasets and similar components, you can easily run into thread affinity problems - that is, the inability of a component to work correctly when it is called from a different thread than the one it was created in.

[Always initialize database connections and datasets in the thread that will use them. Your code may work without that precaution, but unless you have extensively tested the database components in multiple threads, you should not assume that they will work correctly unless that condition (initialization and use in the same thread) is met.]

In such cases, the best way is to provide the input directly from the main thread. There are a few different ways to achieve that.

1. Repackage data into another collection that can be easily consumed in ForEach (TObjectList, TStringList, TOmniBlockingCollection).

2. Run the ForEach in NoWait mode, then write the data into the input queue and, when you run out of data, wait for the ForEach loop to terminate. This approach is also useful when you want to push ForEach into the background and provide it with data from some asynchronous event handler.

An example of the second approach will help clarify the idea.

uses
  OtlCommon,
  OtlCollections,
  OtlParallel,
  OtlSync; // IOmniWaitableValue and CreateWaitableValue live here
 
procedure Test;
var
  i    : integer;
  input: IOmniBlockingCollection;
  loop : IOmniParallelLoop<integer>;
  wait : IOmniWaitableValue;
begin
  // create the container
  input := TOmniBlockingCollection.Create;
  // create the 'end of work' signal
  wait := CreateWaitableValue;
  loop := Parallel.ForEach<integer>(input);
  // set up the termination method which will signal 'end of work'
  loop.OnStop(
    procedure
    begin
      wait.Signal;
    end);
  // start the parallel for loop in NoWait mode
  loop.NoWait.Execute(
    procedure (const value: integer)
    begin
      // do something with the input value
      OutputDebugString(PChar(Format('%d', [value])));
    end
  );
  // provide the data to the parallel for loop
  for i := 1 to 1000 do
    input.Add(i);
  // signal to the parallel for loop that there's no more data to process
  input.CompleteAdding;
  // wait for the parallel for loop to stop
  wait.WaitFor;
  // destroy the parallel for loop
  loop := nil;
end;

IOmniParallelLoop

The Parallel.ForEach returns an IOmniParallelLoop interface which is used to configure and run the parallel for loop.

IOmniParallelLoop = interface
  function  Aggregate(defaultAggregateValue: TOmniValue;
    aggregator: TOmniAggregatorDelegate): IOmniParallelAggregatorLoop;
  function  AggregateSum: IOmniParallelAggregatorLoop;
  procedure Execute(loopBody: TOmniIteratorDelegate); overload;
  procedure Execute(loopBody: TOmniIteratorTaskDelegate); overload;
  function  CancelWith(const token: IOmniCancellationToken): 
    IOmniParallelLoop;
  function  Initialize(taskInitializer: TOmniTaskInitializerDelegate): 
    IOmniParallelInitializedLoop;
  function  Into(const queue: IOmniBlockingCollection): 
    IOmniParallelIntoLoop; overload;
  function  NoWait: IOmniParallelLoop;
  function  NumTasks(taskCount : integer): IOmniParallelLoop;
  function  OnMessage(eventDispatcher: TObject): 
    IOmniParallelLoop; overload; deprecated 'use TaskConfig';
  function  OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent):
    IOmniParallelLoop; overload; deprecated 'use TaskConfig';
  function  OnMessage(msgID: word; eventHandler: TOmniOnMessageFunction):
    IOmniParallelLoop; overload; deprecated 'use TaskConfig';
  function  OnTaskCreate(taskCreateDelegate: TOmniTaskCreateDelegate): 
    IOmniParallelLoop; overload;
  function  OnTaskCreate(taskCreateDelegate: 
    TOmniTaskControlCreateDelegate): IOmniParallelLoop; overload;
  function  OnStop(stopCode: TProc): IOmniParallelLoop;
  function  PreserveOrder: IOmniParallelLoop;
  function  TaskConfig(const config: IOmniTaskConfig): IOmniParallelLoop;
end;

ForEach<T> returns an IOmniParallelLoop<T> interface, which is exactly the same as the IOmniParallelLoop except that each method returns the appropriate <T> version of the interface.

Aggregate and AggregateSum are used to implement aggregation. See the Aggregation section, below.

Execute accepts the block of code to be executed for each value in the input container. Two method signatures are supported, each with a <T> variant. One accepts only the iteration value, while the other also accepts an ''IOmniTask'' parameter.

TOmniIteratorDelegate = reference to procedure(const value: TOmniValue);
TOmniIteratorDelegate<T> = reference to procedure(const value: T);
TOmniIteratorTaskDelegate =
  reference to procedure(const task: IOmniTask; const value: TOmniValue);
TOmniIteratorTaskDelegate<T> = 
  reference to procedure(const task: IOmniTask; const value: T);

CancelWith enables the cancellation mechanism.

With Initialize and OnTaskCreate you can initialize per-task data before the task begins execution. See the Task Initialization section, below.

Into sets up the output queue; see Preserving Output Order.

If you call the NoWait function, parallel for will start in the background and control will be returned to the main thread immediately. If NoWait is not called, Execute will only return after all tasks have stopped working.

By calling NumTasks you can set the number of worker tasks. By default, the number of tasks is set to [number of cores available to the process] - 1 if the NoWait or PreserveOrder modifiers are used, and to [number of cores available to the process] in all other cases.
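A minimal sketch of limiting the worker count (ProcessItem is a hypothetical worker routine):

// run on exactly two worker tasks, regardless of the number of cores
Parallel.ForEach(1, 100)
  .NumTasks(2)
  .Execute(
    procedure (const value: integer)
    begin
      ProcessItem(value); // hypothetical worker routine
    end);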

OnMessage functions are deprecated, use TaskConfig instead.

OnStop sets up a termination handler which is called after all parallel for tasks have completed their work. If the NoWait function was called, OnStop will be called from one of the worker threads. If, however, NoWait was not called, OnStop will be called from the thread that created the ForEach abstraction. This behaviour makes it hard to execute VCL code from the OnStop handler, so release 3.02 introduced another variation accepting a delegate with an IOmniTask parameter.

TOmniTaskStopDelegate = reference to procedure (const task: IOmniTask);
IOmniParallelLoop = interface
  function  OnStop(stopCode: TOmniTaskStopDelegate): IOmniParallelLoop;
    overload;
end;

Using this version of OnStop, the termination handler can use ''task.Invoke'' to execute some code in the main thread. This, however, requires the ForEach abstraction to stay alive until the Invoke-d code is executed, so you must store the ForEach result in a global variable (a form field, for example) and destroy it only in the termination handler.

var
  loop: IOmniParallelLoop<integer>;
 
loop := Parallel.ForEach(1, N).NoWait;
loop.OnStop(
  procedure (const task: IOmniTask)
  begin
    task.Invoke(
      procedure
      begin
        // do anything
        loop := nil;
      end);
  end);
loop.Execute(
  procedure (const value: integer)
  begin
    ...
  end);

PreserveOrder modifies the parallel for behaviour so that output values are generated in the order of the corresponding input values. See the Preserving Output Order section, below.

TaskConfig sets up a task configuration block. The same task configuration block is applied to all ForEach worker tasks.

The following example uses TaskConfig to set up a message handler which will receive messages sent from ForEach worker tasks.

FParallel := Parallel.ForEach(1, 17)
  .TaskConfig(Parallel.TaskConfig.OnMessage(Self))
  .NoWait
  .OnStop(procedure begin FParallel := nil; end);
FParallel
  .Execute(
    procedure (const task: IOmniTask; const value: integer)
    begin
      task.Comm.Send(WM_LOG, value);
    end);

Messages sent from the worker tasks are received and dispatched by the IOmniParallelLoop interface. This requires the ForEach abstraction to stay alive until the messages are processed, so you must store the ForEach result in a global variable (a form field, for example) and destroy it only in the OnStop handler.
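A sketch of the receiving side, assuming the usual OmniThreadLibrary convention where OnMessage(Self) dispatches each task message to a Delphi message handler whose message ID equals the OTL message ID (TOmniMessage comes from the OtlComm unit; WM_LOG and lbLog are hypothetical):

const
  WM_LOG = WM_USER; // any user-defined message ID

type
  TfrmMain = class(TForm)
  private
    FParallel: IOmniParallelLoop<integer>;
    procedure WMLog(var msg: TOmniMessage); message WM_LOG;
  end;

procedure TfrmMain.WMLog(var msg: TOmniMessage);
begin
  // msg.MsgData carries the TOmniValue passed to task.Comm.Send
  lbLog.Items.Add(IntToStr(msg.MsgData.AsInteger)); // lbLog: hypothetical listbox
end;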

Some functions return a different interface. Typically, such an interface implements only an Execute function that accepts a different parameter than the 'normal' Execute. For example, Aggregate returns the IOmniParallelAggregatorLoop interface.

TOmniIteratorIntoDelegate = 
  reference to procedure(const value: TOmniValue; var result: TOmniValue);
 
IOmniParallelAggregatorLoop = interface
  function  Execute(loopBody: TOmniIteratorIntoDelegate): TOmniValue;
end;

These variants of the IOmniParallelLoop interface are described in the following sections.

Preserving Output Order

When you run a ForEach loop, you can't tell in advance in which order the elements of the input collection will be processed. For example, the code below will generate all primes from 1 to CMaxPrime and write them into the output queue (primeQueue) in a nondeterministic order.

primeQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, CMaxPrime).Execute(
  procedure (const value: integer)
  begin
    if IsPrime(value) then begin
      primeQueue.Add(value);
    end;
  end);

Sometimes this presents a big problem, and you'd have to write a sorting function to re-sort the output before it can be processed further. To alleviate the problem, IOmniParallelLoop implements the PreserveOrder modifier. When it is used, the ForEach loop internally sorts the results produced by the task method passed to Execute.

Using PreserveOrder also forces you to use the Into method, which returns the IOmniParallelIntoLoop interface. (As you may expect, there's also a <T> version of that interface.)

TOmniIteratorIntoDelegate = 
  reference to procedure(const value: TOmniValue; var result: TOmniValue);
TOmniIteratorIntoTaskDelegate = 
  reference to procedure(const task: IOmniTask; const value: TOmniValue; 
                         var result: TOmniValue);
 
IOmniParallelIntoLoop = interface
  procedure Execute(loopBody: TOmniIteratorIntoDelegate); overload;
  procedure Execute(loopBody: TOmniIteratorIntoTaskDelegate); overload;
end;

As you can see, the Execute method in IOmniParallelIntoLoop takes a different parameter than the 'normal' Execute. Because of that, you'll have to change the code passed to Execute so that it returns a result.

primeQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, CMaxPrime)
  .PreserveOrder
  .Into(primeQueue)
  .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
      if IsPrime(value) then
        res := value;
    end);

When PreserveOrder and Into are used, ForEach calls your worker code for each input value. If the worker code sets the output parameter (res) to any value, that value is inserted into a temporary buffer. Then the magic happens (see the Internals section, below): as soon as the appropriate (sorted) value is available in the temporary buffer, it is moved to the output queue (the one passed to Into).

You can also use Into without PreserveOrder. This gives you the queue management but no ordering.
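For comparison, the same prime generator with only Into applied; the primes still end up in primeQueue, just in nondeterministic order:

primeQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, CMaxPrime)
  .Into(primeQueue)
  .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
      if IsPrime(value) then
        res := value; // only non-empty results reach the queue
    end);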

Aggregation

Aggregation allows you to collect data from the parallel for tasks and calculate one number that is returned to the user.

Let's start with an example - a very bad one! The following code fragment tries to count the primes between 1 and CMaxPrime.

numPrimes := 0;
Parallel.ForEach(1, CMaxPrime).Execute(
  procedure (const value: integer)
  begin
    if IsPrime(value) then 
      Inc(numPrimes);
  end);

Let's say it out loud - this code is wrong! Access to the shared variable is not synchronized between threads, which makes the result indeterminate. One way to solve the problem is to wrap Inc(numPrimes) in a lock; another is to use InterlockedIncrement instead of Inc. Both, however, would slow down the execution a lot.

A solution to this problem is to use the Aggregate function.

procedure SumPrimes(var aggregate: TOmniValue; const value: TOmniValue);
begin
  aggregate := aggregate.AsInt64 + value.AsInt64;
end;
 
procedure CheckPrime(const value: integer; var result: TOmniValue);
begin
  if IsPrime(value) then
    result := 1;
end;
 
numPrimes :=
  Parallel.ForEach(1, CMaxPrime)
  .Aggregate(0, SumPrimes)
  .Execute(CheckPrime);

Aggregate takes two parameters - the first is the initial value for the aggregate and the second is an aggregation function - a piece of code that will take the current aggregate value and update it with the value returned from the parallel for task.

When using Aggregate, parallel for task (the code passed to the Execute function) has the same signature as when used with Into. It takes the current iteration value and optionally produces a result.

The code above is semantically equivalent to this serial loop.

agg := 0;
result.Clear;
for value := 1 to CMaxPrime do begin
  CheckPrime(value, result);
  if not result.IsEmpty then begin
    SumPrimes(agg, result);
    result.Clear;  
  end;
end;
numPrimes := agg;

ForEach executes the aggregation in two stages. While a parallel for task is running, it uses this approach to aggregate data into a task-local variable. When the task runs out of work, it calls the same aggregation method to merge this local variable into the global result. In this second stage, however, locking is used to protect access to the global result.
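In pseudocode, each worker task therefore behaves roughly like this (a sketch using the names from the example above; localQueue and globalLock belong to the ForEach internals):

// stage 1: aggregate into a task-local value, no locking
localAggregate := 0; // the initial value passed to Aggregate
while localQueue.GetNext(value) do begin
  CheckPrime(value, result);
  if not result.IsEmpty then begin
    SumPrimes(localAggregate, result);
    result.Clear;
  end;
end;
// stage 2: merge the local value into the global result, under a lock
globalLock.Acquire;
try
  SumPrimes(globalAggregate, localAggregate);
finally globalLock.Release; end;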

Because summation is the most common use of aggregation, IOmniParallelLoop implements the AggregateSum function, which works exactly like the SumPrimes above.

numPrimes :=
  Parallel.ForEach(1, CMaxPrime)
  .AggregateSum
  .Execute(
    procedure (const value: integer; var result: TOmniValue)
    begin
      if IsPrime(value) then
        Result := 1;
    end
  );

The aggregation function can do other things besides summation. The following code fragment uses aggregation to find the length of the longest line in a file.

function GetLongestLineInFile(const fileName: string): integer;
var
  maxLength: TOmniValue;
  sl       : TStringList;
begin
  sl := TStringList.Create;
  try
    sl.LoadFromFile(fileName);
    maxLength := Parallel.ForEach<string>(sl)
      .Aggregate(0,
        procedure(var aggregate: TOmniValue; const value: TOmniValue)
        begin
          if value.AsInteger > aggregate.AsInteger then
            aggregate := value.AsInteger;
        end)
      .Execute(
        procedure(const value: string; var result: TOmniValue)
        begin
          result := Length(value);
        end);
    Result := maxLength;
  finally FreeAndNil(sl); end;
end;

Cancellation

ForEach has a built-in cancellation mechanism. To use it, create a cancellation token and pass it to the CancelWith function. When the cancellation token gets signalled, all worker loops complete their current iteration and then stop.
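A minimal sketch, assuming the CreateOmniCancellationToken factory from the OtlSync unit and a hypothetical IsWhatWeAreLookingFor test:

var
  cancelToken: IOmniCancellationToken;

cancelToken := CreateOmniCancellationToken;
Parallel.ForEach(1, 1000000)
  .CancelWith(cancelToken)
  .Execute(
    procedure (const value: integer)
    begin
      if IsWhatWeAreLookingFor(value) then // hypothetical test
        cancelToken.Signal; // remaining iterations will be skipped
    end);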

A larger example of using a cancellation token can be found in the chapter Parallel Search in a Tree.

Task Initialization and Finalization

In some cases it would be nice if each parallel task could have some data initialized at the beginning and available to the enumerator code (the code passed to Execute). For those occasions, ForEach implements the Initialize function.

TOmniTaskInitializerDelegate = 
  reference to procedure(var taskState: TOmniValue);
TOmniTaskFinalizerDelegate = 
  reference to procedure(const taskState: TOmniValue);
TOmniIteratorStateDelegate = 
  reference to procedure(const value: TOmniValue; var taskState: TOmniValue);
 
IOmniParallelInitializedLoop = interface
  function  Finalize(taskFinalizer: TOmniTaskFinalizerDelegate):
    IOmniParallelInitializedLoop;
  procedure Execute(loopBody: TOmniIteratorStateDelegate);
end;
 
IOmniParallelLoop = interface
  ...
  function  Initialize(taskInitializer: TOmniTaskInitializerDelegate):
    IOmniParallelInitializedLoop;
end;

You provide Initialize with a task initializer - a procedure that is called in each parallel worker task when it is created, before it starts enumerating values. This procedure can initialize the taskState parameter with any value.

Initialize returns an IOmniParallelInitializedLoop interface which implements two functions - Finalize and Execute. Call Finalize to set up a task finalizer - a procedure that is called after all values have been enumerated, just before the parallel worker task ends its job.

Execute accepts a worker method with two parameters - the first is the usual value from the enumerated container and the second contains the task's private state.

Of course, all those functions and interfaces are implemented in the <T> version, too.

The following example shows how to count the primes from 1 to CHighPrime by using initializers and finalizers.

var
  lockNum  : TOmniCS;
  numPrimes: integer;
begin
  numPrimes := 0;
  Parallel.ForEach(1, CHighPrime)
    .Initialize(
      procedure (var taskState: TOmniValue)
      begin
        taskState.AsInteger := 0;
      end)
    .Finalize(
      procedure (const taskState: TOmniValue)
      begin
        lockNum.Acquire;
        try
          numPrimes := numPrimes + taskState.AsInteger;
        finally lockNum.Release; end;
      end)
    .Execute(
      procedure (const value: integer; var taskState: TOmniValue)
      begin
        if IsPrime(value) then
          taskState.AsInteger := taskState.AsInteger + 1;
      end
    );
end;

Handling Exceptions

The ForEach abstraction does not yet implement any exception handling. You should always wrap the task method (the code passed to Execute) in try..except if you expect the code to raise exceptions.
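A minimal sketch of the recommended pattern; ProcessValue and LogError are hypothetical stand-ins for your own code:

Parallel.ForEach(1, 1000).Execute(
  procedure (const value: integer)
  begin
    try
      ProcessValue(value); // hypothetical worker routine
    except
      on E: Exception do
        // an escaping exception would take down the worker thread;
        // record it instead and carry on with the next value
        LogError(value, E.Message); // hypothetical thread-safe logger
    end;
  end);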

Internals

This section tries to explain how ForEach is implemented. Consider it bonus material for users who want to know more. You don't have to read or understand it to use ForEach, so you may simply skip this part of the book.

Let's start with a very simple code.

Parallel.ForEach(1, 1000)
  .Execute(
    procedure (const elem: integer)
    begin
    end);

This simple code iterates from 1 to 1000 on all available cores in parallel and executes a simple procedure that contains no workload. All in all, the code will do nothing - but it will do it in a very complicated manner.

The ForEach method creates a new TOmniParallelLoop<integer> object (the object that will coordinate the parallel tasks) and passes it a source provider - an object that knows how to access the values being enumerated (integers from 1 to 1000 in this example).

The OtlDataManager unit contains a source provider for each type of source that can be passed to the ForEach method. If there were ever a need to extend ForEach with a new enumeration source, I would only have to add a few simple methods to the OtlParallel unit and write a new source provider.

class function Parallel.ForEach(low, high: integer; step: integer):
  IOmniParallelLoop<integer>;
begin
  Result := TOmniParallelLoop<integer>.Create(
    CreateSourceProvider(low, high, step), true);
end;

Parallel for tasks are started in InternalExecuteTask. This method first creates a data manager and attaches it to the source provider (there is one source provider and one data manager per parallel loop). Next it creates an appropriate number of tasks and calls the task-specific delegate method from each one. [This delegate wraps your parallel code and provides it with proper input (and sometimes, output). There are many calls to InternalExecuteTask in the OtlParallel unit, each with a different taskDelegate and each providing support for a different kind of loop.]

procedure TOmniParallelLoopBase.InternalExecuteTask(
  taskDelegate: TOmniTaskDelegate);
var
  dmOptions: TOmniDataManagerOptions;
  iTask    : integer;
  numTasks : integer;
  task     : IOmniTaskControl;
begin
  ...
  oplDataManager := CreateDataManager(oplSourceProvider,
    numTasks, dmOptions);
  ...
  for iTask := 1 to numTasks do begin
    task := CreateTask(
      procedure (const task: IOmniTask)
      begin
        ...
        taskDelegate(task);
        ...
      end,
      ...);
    task.Schedule(GParallelPool);
  end;
  ...
end;

The data manager is stored in a field of the TOmniParallelLoop<T> object so that it can be easily reused from the task delegate. The simplest possible task delegate (below) just creates a local queue and fetches values from the local queue one by one. This results in many local queues - one per task - all connected to the same data manager.

In case you're wondering what loopBody is - it is the anonymous method you have passed to the Parallel.ForEach.Execute method.

procedure InternalExecuteTask(const task: IOmniTask);
var
  localQueue: TOmniLocalQueue;
  value     : TOmniValue;
begin
  localQueue := oplDataManager.CreateLocalQueue;
  try
    while (not Stopped) and localQueue.GetNext(value) do
      loopBody(task, value);
  finally FreeAndNil(localQueue); end;
end;

Let's reiterate:

- Source provider is created.

- Data manager is created and associated with the source provider.

- Each task creates its own local queue and uses it to access the source data.

- As you'll see in the next section, the local queue retrieves input data in packages (data packages) and sends the results to an output buffer, which makes sure that the output is produced in the correct order (the output buffer part only happens if the PreserveOrder method was called in the high-level code).

- If a task runs out of work, it requests a new data package from the data manager, which gets the data from the source provider (more on that below). If the source provider runs out of data, the data manager will attempt to steal some data from other tasks.

All this was designed to provide fast data access (blocking is limited to the source provider; all other interactions are lock-free), good workload distribution (when a task runs out of work before the other tasks, it steals some work from them) and output ordering (when required).

Source Provider

A source provider is an object that fetches data from the enumeration source (the data that was passed to the parallel for) and repackages it into a format suitable for parallel consumption. Currently there are three source providers defined in the OtlDataManager unit.

- TOmniIntegerRangeProvider

Iterates over integer ranges (just like a 'normal' ''for'' statement does). As such, it doesn't really fetch data from enumeration source but generates it internally. 

- TOmniValueEnumeratorProvider

Iterates over ''IOmniValueEnumerator'', which is a special enumerator that can be accessed from multiple readers and doesn't require locking. Currently it is only provided by the ''IOmniBlockingCollection''. 

- TOmniEnumeratorProvider

Iterates over Windows enumerators (''IEnumerator'') or Delphi enumerators (''GetEnumerator'', wrapped into ''TOmniValueEnumerator'' class). 

All source providers descend from the abstract class TOmniSourceProvider, which defines the common source provider interface. In theory an interface should be used for that purpose, but in practice source providers are very performance-intensive, and not using interfaces speeds the program up by a measurable amount.

TOmniSourceProvider = class abstract
public
  function  Count: int64; virtual; abstract;
  function  CreateDataPackage: TOmniDataPackage; virtual; abstract;
  function  GetCapabilities: TOmniSourceProviderCapabilities;
    virtual; abstract;
  function  GetPackage(dataCount: integer; 
    package: TOmniDataPackage): boolean; virtual; abstract;
  function  GetPackageSizeLimit: integer; virtual; abstract;
end;

Not all source providers are created equal, and that's why the GetCapabilities function returns the source provider's capabilities:

TOmniSourceProviderCapability = (
  spcCountable,  // source provider that knows how much data it holds
  spcFast,       // source provider operations are O(1)
  spcDataLimit   // data package can only hold limited amount of data
); 
 
TOmniSourceProviderCapabilities = set of
  TOmniSourceProviderCapability;

TOmniIntegerRangeProvider is both countable (it's very simple to calculate how many values lie between, say, 1 and 10) and fast (it takes the same amount of time to fetch 10 values or 10,000 values), while the other two source providers are neither countable nor fast. The third capability, spcDataLimit, is obsolete and not used; it was replaced by the GetPackageSizeLimit method.

The other important aspect of a source provider is the GetPackage method. It accesses the source (locking it if necessary), retrieves data and returns it in the data package. The implementation is highly dependent on the source data. For example, the integer source provider just advances the current low field and returns a data package that doesn't contain a bunch of values, only the low and high boundaries (which is why it is considered fast). The enumerator source provider locks the source, fetches the data and builds a data package value by value. And in the simplest case, TOmniValueEnumeratorProvider just fetches values and builds a data package.

function TOmniValueEnumeratorProvider.GetPackage(dataCount: integer; 
  package: TOmniDataPackage): boolean;
var
  iData     : integer;
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  timeout   : cardinal;
  value     : TOmniValue;
begin
  Assert(not StorePositions);
  Result := false;
  dataCount := intPackage.Prepare(dataCount);
  timeout := INFINITE;
  for iData := 1 to dataCount do begin
    if not vepEnumerator.TryTake(value, timeout) then
      break; //for
    intPackage.Add(value);
    timeout := 0;
    Result := true;
  end;
end; 

Data Manager

The data manager is the central hub of the OtlDataManager hierarchy. It sits between the multiple local queues and the single source provider and makes sure that all parallel tasks always have some work to do.

Two different data managers are implemented at the moment - a countable data manager and a heuristic data manager. The former is used if the source provider is countable, the latter if it is not. Both descend from the abstract class TOmniDataManager.

TOmniDataManager = class abstract
public
  function  CreateLocalQueue: TOmniLocalQueue; virtual; abstract;
  function  AllocateOutputBuffer: TOmniOutputBuffer; 
    virtual; abstract;
  function  GetNext(package: TOmniDataPackage): boolean; 
    virtual; abstract;
  procedure ReleaseOutputBuffer(buffer: TOmniOutputBuffer); 
    virtual; abstract;
  procedure SetOutput(const queue: IOmniBlockingCollection);
    overload; virtual; abstract;
end;

The main difference between them lies in the function GetNextFromProvider, which reads data from the source provider (by calling its GetPackage method). In the countable data manager this is just a simple forwarder, while in the heuristic data manager this function tries to find a package size that will allow all parallel tasks to work at full speed.

function TOmniHeuristicDataManager.GetNextFromProvider(
  package: TOmniDataPackage; generation: integer): boolean;
const
  CDataLimit = Trunc(High(integer) / CFetchTimeout_ms);
var
  dataPerMs: cardinal;
  dataSize : integer;
  time     : int64;
begin
  // the goal is to fetch as much (but not exceeding <fetch_limit>)
  // data as possible in <fetch_timeout> milliseconds; highest amount
  // of data is limited by the GetDataCountForGeneration method.
  dataSize := GetDataCountForGeneration(generation);
  if dataSize > hdmEstimatedPackageSize.Value then
    dataSize := hdmEstimatedPackageSize.Value;
  time := DSiTimeGetTime64;
  Result := SourceProvider.GetPackage(dataSize, package);
  time := DSiTimeGetTime64 - time;
  if Result then begin
    if time = 0 then
      dataPerMs := CDataLimit
    else begin
      dataPerMs := Round(dataSize / time);
      if dataPerMs >= CDataLimit then
        dataPerMs := CDataLimit;
    end;
    // average over last four fetches for dynamic adaptation
    hdmEstimatedPackageSize.Value := Round
      ((hdmEstimatedPackageSize.Value / 4 * 3) + 
       (dataPerMs / 4) * CFetchTimeout_ms);
  end;
end;

Local Queue

Each parallel task reads data from a local queue, which is just a simple interface to the data manager. The most important part of the local queue is its GetNext method, which provides the task with the next value.

function TOmniLocalQueueImpl.GetNext(var value: TOmniValue): boolean;
begin
  Result := lqiDataPackage.GetNext(value);
  if not Result then begin
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then
      Result := lqiDataPackage.GetNext(value);
  end;
end;

Each local queue contains a local data package. GetNext first tries to read the next value from that data package. If that fails (the data package is empty - it was already fully processed), it requests a new data package from the data manager and (if successful) retries fetching the next value from the (refreshed) data package.

GetNext in the data manager first tries to get the next package from the source provider (via the private method GetNextFromProvider, which calls the source provider's GetPackage method). If that fails, it tries to steal part of the workload from another task.

Stealing is the feature that keeps all parallel tasks busy until the very last value has been enumerated. To implement it, the data manager iterates over all local queues and tries to split each local queue's data package in half. If that succeeds, half of the data package is left in the original local queue and the other half is returned to the local queue that requested more data.

Package splitting is highly dependent on the data type. For example, an integer data package just recalculates its boundaries, while enumerator-based packages must copy data around.

function TOmniValueEnumeratorDataPackage.Split(
  package: TOmniDataPackage): boolean;
var
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  iValue    : integer;
  value     : TOmniValue;
begin
  Result := false;
  for iValue := 1 to intPackage.Prepare(vedpApproxCount.Value div 2)
  do begin
    if not GetNext(value) then
      break; //for
    intPackage.Add(value);
    Result := true;
  end;
end;

Output Ordering

Ordering (PreserveOrder) is usually used together with the Into modifier. The reason lies in the integration between the Parallel.ForEach infrastructure and your parallel code (the one executing as the Execute payload). In a 'normal' ForEach, the output of this parallel payload is not defined. You are allowed to generate any output in the payload, but ForEach will know nothing about it. In that case the OTL has no way to preserve ordering because - at least from the viewpoint of the library - the parallelized code produces no output.

When Into is used, however, your code uses a different signature (different parameters).

Parallel.ForEach(1, CMaxTest)
  .PreserveOrder
  .Into(primeQueue)
  .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
      if IsPrime(value) then
        res := value;
    end);

The parallel payload now takes two parameters. The first is - as in the more common case - the input value, while the second receives the output value. As you can see from the example, the parallelized code can produce zero or one outputs, but no more.

This small modification changes everything. As the Parallel infrastructure has control over the output parameter, it can manage it internally, associate it with the input and make sure that the output is generated in the same order as the input.

Let's look at the innermost code - the part that is scheduling parallel tasks. When Into is used, InternalExecuteTask executes the following quite complicated code.

InternalExecuteTask(
  procedure (const task: IOmniTask)
  var
    localQueue      : TOmniLocalQueue;
    outputBuffer_ref: TOmniOutputBuffer;
    position        : int64;
    result          : TOmniValue;
    value           : TOmniValue;
  begin
    oplDataManager.SetOutput(oplIntoQueueIntf);
    localQueue := oplDataManager.CreateLocalQueue;
    try
      outputBuffer_ref := oplDataManager.AllocateOutputBuffer;
      try
        localQueue.AssociateBuffer(outputBuffer_ref);
        result := TOmniValue.Null;
        while (not Stopped) and 
              localQueue.GetNext(position, value) do 
        begin
          loopBody(task, value, result);
          if not result.IsEmpty then begin
            outputBuffer_ref.Submit(position, result);
            result := TOmniValue.Null;
          end;
        end;
      finally 
        oplDataManager.ReleaseOutputBuffer(outputBuffer_ref); 
      end;
    finally 
      FreeAndNil(localQueue); 
    end;
  end);

Important points here are:

- The data manager is associated with the output queue. (The oplIntoQueueIntf field contains a value passed to the Into method.)

- A local queue is created, same as when 'normal' ForEach is executed.

- An output buffer is created by the data manager and associated with the local queue.

- For each input value, the user code is executed and every non-empty output value is written into the output buffer.

- The output buffer is released, as is the local queue.

The interesting part is hidden in the background; inside local queue, data manager and output buffer.

The first modification lies in the data source. When PreserveOrder is used, each data package knows the source position it was read from. To simplify matters, data package splitting is not used in this case. [And because of that, data stealing cannot be used, causing slightly less effective use of the CPU than in the simpler ForEach case.]

Each local queue has an output buffer set associated with it.

Each output buffer set manages two output buffers. One is active and the task writes into it; the other may be either empty or full. Each output buffer is associated with an input position - just as the data package is.

When we look at reading and writing from the perspective of one task, everything is very simple. The task reads data from a local queue (which reads data from a data package, associated with some position) and writes it into an output buffer (associated with the same position).

The tricky part comes when the data package is exhausted (the if not Result branch in the code below).

function TOmniLocalQueueImpl.GetNext(var position: int64; 
  var value: TOmniValue): boolean;
begin
  Result := lqiDataPackage.GetNext(position, value);
  if not Result then begin
    lqiBufferSet.ActiveBuffer.MarkFull;
    lqiBufferSet.ActivateBuffer; 
      // this will block if alternate buffer is also full
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then begin
      Result := lqiDataPackage.GetNext(position, value);
      if Result then
        lqiBufferSet.ActiveBuffer.Range := lqiDataPackage.Range;
    end;
  end;
end;

First, the currently active buffer is marked as full. This causes NotifyBufferFull to be called (see below). Then the alternate buffer is activated. This call (ActivateBuffer) will block if the alternate buffer is not free; in that case, the current thread waits until one of its buffers has been written into the output queue.

From this point on, GetNext proceeds in the same way as in the simple ForEach, except that it sets the active buffer's position whenever a new data package is read from the data manager.

The other part of the magic happens in NotifyBufferFull, called from MarkFull. It walks the buffer list and checks whether any output buffers are a) full and b) destined for the current output position. Such buffers are copied to the output and returned to use.

procedure TOmniBaseDataManager.NotifyBufferFull(
  buffer: TOmniOutputBufferImpl);
begin
  // Remove buffer from the list. Check if next buffer is waiting in
  // the list. Copy buffer if it is full and repeat the process.
  dmBufferRangeLock.Acquire;
  try
    while (dmBufferRangeList.Count > 0) and
          (BufferList[0].Range.First = dmNextPosition) and
          BufferList[0].IsFull do
    begin
      buffer := TOmniOutputBufferImpl(
        dmBufferRangeList.ExtractObject(0));
      dmNextPosition := buffer.Range.Last + 1;
      buffer.CopyToOutput;
    end;
  finally dmBufferRangeLock.Release; end;
end;

To recap:

- Each data buffer is associated with a position.

- Each local queue owns two output buffers; one is active, the other is either free or full.

- Each output buffer is also associated with a position.

- Local queue writes data to an output buffer.

- When a buffer is full, it is put into a list of waiting buffers. At that moment all appropriate waiting buffers are copied to output.

Examples

Practical examples of For Each usage can be found in the chapters Parallel For with Synchronized Output and Parallel Search in a Tree.
