The ForEach abstraction creates a parallel for loop that iterates over a range of data (number range, list, queue, dataset …) in multiple threads. To create a parallel for loop, call Parallel.ForEach.
When you use Parallel.ForEach, OmniThreadLibrary starts multiple background tasks and connects them to the source through a serialization mechanism. Output is optionally sorted in the order of the input. By default, ForEach waits for all background threads to complete before the control is returned to the caller.
See also the accompanying demos.
PrimeCount.Value := 0;
Parallel.ForEach(1, 1000000).Execute(
  procedure (const value: integer)
  begin
    if IsPrime(value) then
      PrimeCount.Increment;
  end);
This simple program calculates the number of primes in the range from one to one million. The
PrimeCount object must be capable of atomic incrementation (a thread-safe increment), which is simple to achieve with the use of the TGp4AlignedInt record. The ForEach task is coded as an anonymous method, but you can also use a normal method or a normal procedure for the task code.
The main point of the
ForEach abstraction is cooperation between the parallel tasks.
ForEach goes to great lengths to minimize possible clashes between threads when they access the source data. Except in special cases (number range, IOmniBlockingCollection), the source data is not thread-safe and locking must be used for access.
To minimize this locking, source data is allocated to worker tasks in blocks.
ForEach creates a source provider object which will access the source data in a thread-safe manner. This source provider makes sure to always return an appropriately-sized block of source data (size will depend on number of tasks, type of the source data and other factors) when a task runs out of data to process.
Because the source data is allocated in blocks, it is possible that one of the tasks runs out of work while other tasks are still busy. In this case, a task will steal data from one of the other tasks. This approach makes all tasks as busy as possible while minimizing the contention.
The details of this process are further discussed in section Internals below.
The Parallel class defines many
ForEach overloads, each supporting a different container type. We will look at them in more detail in the following sections.
To iterate over a range, pass the low and high bounds to the ForEach call. Optionally, you can pass a step parameter, which defaults to 1. ForEach will then iterate from low to high with the given step.
class function ForEach(low, high: integer; step: integer = 1): IOmniParallelLoop<integer>; overload;
The pseudocode for numeric
ForEach could be written as
i := low;
while ((step > 0) and (i <= high)) or
      ((step < 0) and (i >= high)) do
begin
  // process 'i' in parallel
  Inc(i, step); // a negative step decrements 'i'
end;
If you want to iterate over a collection (say, a
TStringList), you have two possibilities.
One is to use an equivalent of
for i := 0 to sl.Count-1 do Something(sl[i]).
Parallel.ForEach(0, sl.Count-1).Execute(
  procedure (const value: integer)
  begin
    Something(sl[value]);
  end);
Another is to use an equivalent of
for s in sl do Something(s).
Parallel.ForEach(sl).Execute(
  procedure (const value: TOmniValue)
  begin
    Something(value);
  end);
In the second example, value is passed to the task function as a ''TOmniValue'' parameter. In the example above, it will be automatically converted into a string, but sometimes you'll have to do the conversion manually, by calling
value.AsString (or another appropriate casting function when iterating over a different container).
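As a minimal sketch, the explicit conversion would look like this (equivalent to the automatic conversion shown above):

```pascal
Parallel.ForEach(sl).Execute(
  procedure (const value: TOmniValue)
  begin
    // explicit conversion from TOmniValue to string
    Something(value.AsString);
  end);
```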
A variation of the second approach is to tell the
ForEach that the container contains strings. OmniThreadLibrary will then do the conversion for you.
Parallel.ForEach<string>(sl).Execute(
  procedure (const value: string)
  begin
    Something(value);
  end);
You may wonder which of those approaches is better. The answer depends on whether you can access different items in the container from different threads at the same time. In other words, you have to know whether the container is thread-safe for reading. Luckily, all important Delphi containers (for example,
TStringList) fall into this category.
If the container is thread-safe for reading, then the numeric approach (
ForEach(0, sl.Count-1)) is much faster than the for..in approach (
ForEach(sl)). The speed difference comes from the locking - in the former example
ForEach never locks anything and in the latter example locking is used to synchronize access to the container.
However, if the container is not thread-safe for reading, you have to use the latter approach.
There are three ways to iterate over enumerable containers. You can provide the
ForEach call with an
IEnumerable interface, with an
IEnumerator interface or with an enumerable collection itself. In the latter case, OmniThreadLibrary will use RTTI to access the enumerator for the collection. For this to work, enumerator itself must be implemented as an object, not as a record or interface. Luckily, most if not all of the VCL enumerators are implemented in this way.
class function ForEach(const enumerable: IEnumerable): IOmniParallelLoop; overload;
class function ForEach(const enum: IEnumerator): IOmniParallelLoop; overload;
class function ForEach(const enumerable: TObject): IOmniParallelLoop; overload;
class function ForEach<T>(const enumerable: IEnumerable): IOmniParallelLoop<T>; overload;
class function ForEach<T>(const enum: IEnumerator): IOmniParallelLoop<T>; overload;
class function ForEach<T>(const enumerable: TEnumerable<T>): IOmniParallelLoop<T>; overload;
class function ForEach<T>(const enum: TEnumerator<T>): IOmniParallelLoop<T>; overload;
class function ForEach<T>(const enumerable: TObject): IOmniParallelLoop<T>; overload;
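For example, a generic TList<integer> (which descends from TEnumerable<integer>) can be iterated with the ForEach<T> overload. The sketch below assumes a hypothetical per-item ProcessValue routine:

```pascal
uses
  System.Generics.Collections,
  OtlParallel;

procedure ProcessList;
var
  i   : integer;
  list: TList<integer>;
begin
  list := TList<integer>.Create;
  try
    for i := 1 to 1000 do
      list.Add(i);
    // TList<integer> descends from TEnumerable<integer>, so OmniThreadLibrary
    // can use its enumerator directly (with locking, as described above).
    Parallel.ForEach<integer>(list).Execute(
      procedure (const value: integer)
      begin
        ProcessValue(value); // hypothetical per-item workload
      end);
  finally
    list.Free;
  end;
end;
```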
Collection enumeration uses locking to synchronize access to the collection enumerator, which slows down the enumeration process. In some special cases, a collection may be enumerable without locking. Such a collection must implement the
IOmniValueEnumerable and IOmniValueEnumerator interfaces, which are defined in the OtlCommon unit.
class function ForEach(const enumerable: IOmniValueEnumerable): IOmniParallelLoop; overload;
class function ForEach(const enum: IOmniValueEnumerator): IOmniParallelLoop; overload;
class function ForEach<T>(const enumerable: IOmniValueEnumerable): IOmniParallelLoop<T>; overload;
class function ForEach<T>(const enum: IOmniValueEnumerator): IOmniParallelLoop<T>; overload;
To simplify enumerating over blocking collections, the Parallel class implements two ForEach overloads accepting a blocking collection. Internally, the blocking collection is enumerated with its IOmniValueEnumerator.
class function ForEach(const source: IOmniBlockingCollection): IOmniParallelLoop; overload;
class function ForEach<T>(const source: IOmniBlockingCollection): IOmniParallelLoop<T>; overload;
As a last resort, the
Parallel class implements three
ForEach overloads that will (with some help from the programmer) iterate over any data.
The TOmniSourceProvider way is powerful, but complicated.
class function ForEach(const sourceProvider: TOmniSourceProvider): IOmniParallelLoop; overload;
You must implement a descendant of the
TOmniSourceProvider class. All methods must be thread-safe. For more information about the source providers, see the Internals section, below.
TOmniSourceProvider = class abstract
public
  function Count: int64; virtual; abstract;
  function CreateDataPackage: TOmniDataPackage; virtual; abstract;
  function GetCapabilities: TOmniSourceProviderCapabilities; virtual; abstract;
  function GetPackage(dataCount: integer; package: TOmniDataPackage): boolean; virtual; abstract;
  function GetPackageSizeLimit: integer; virtual; abstract;
end;
As this approach is not for the faint of heart, OmniThreadLibrary provides a slower but much simpler version.
class function ForEach(enumerator: TEnumeratorDelegate): IOmniParallelLoop; overload;
class function ForEach<T>(enumerator: TEnumeratorDelegate<T>): IOmniParallelLoop<T>; overload;
Here, you must provide a function that will return the next value whenever ForEach asks for it.
TEnumeratorDelegate = reference to function(var next: TOmniValue): boolean;
TEnumeratorDelegate<T> = reference to function(var next: T): boolean;
OmniThreadLibrary will provide the synchronisation (locking) so you can be sure this method will only be called from one thread at any time. As you may expect, this will slow things down, but parallelization may still give you a reasonable performance increase if
ForEach payload is substantial (i.e. if the method you are executing in the
ForEach loop takes some time to execute).
The TEnumeratorDelegate function can also be used as a generator; that is, it can calculate the values that will then be processed in the parallel for loop.
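As a sketch of the generator idea, the hypothetical fragment below produces Fibonacci numbers on the fly and feeds them to the parallel loop. Because the delegate is only ever called from one thread at a time, the f1/f2 state needs no locking; ProcessValue stands in for real per-item work:

```pascal
var
  f1, f2: integer;
begin
  f1 := 0;
  f2 := 1;
  Parallel.ForEach<integer>(
    function (var next: integer): boolean
    var
      tmp: integer;
    begin
      // generate the next Fibonacci number; stop above 10000
      Result := f2 <= 10000;
      if Result then begin
        next := f2;
        tmp := f1 + f2;
        f1 := f2;
        f2 := tmp;
      end;
    end)
  .Execute(
    procedure (const value: integer)
    begin
      ProcessValue(value); // hypothetical workload
    end);
end;
```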
Sometimes, especially when you are dealing with datasets, synchronized access to the container will not be enough. When you are dealing with database connections, datasets etc. you can easily run into thread affinity problems - that is, the inability of some component to work correctly if it is called from a different thread than the one that it was created in.
[Always initialize database connections and datasets in the thread that will use them. Your code may work without that precaution, but unless you have extensively tested database components in multiple threads, you should not assume that they will work correctly unless that condition (initialization and use in the same thread) is met.]
In such cases, the best way is to provide the input directly from the main thread. There are a few different ways to achieve that.
1. Repackage the data into another collection that can be easily consumed in parallel (an IOmniBlockingCollection, for example).
2. Run the ForEach loop in NoWait mode, then write the data into the input queue and, when you run out of data, wait for the ForEach loop to terminate. This approach is also useful when you want to push ForEach into the background and provide it with data from some asynchronous event handler.
An example of the second approach will help clarify the idea.
uses
  OtlCommon,
  OtlCollections,
  OtlParallel;

procedure Test;
var
  i    : integer;
  input: IOmniBlockingCollection;
  loop : IOmniParallelLoop<integer>;
  wait : IOmniWaitableValue;
begin
  // create the container
  input := TOmniBlockingCollection.Create;
  // create the 'end of work' signal
  wait := CreateWaitableValue;
  loop := Parallel.ForEach<integer>(input);
  // set up the termination method which will signal 'end of work'
  loop.OnStop(
    procedure
    begin
      wait.Signal;
    end);
  // start the parallel for loop in NoWait mode
  loop.NoWait.Execute(
    procedure (const value: integer)
    begin
      // do something with the input value
      OutputDebugString(PChar(Format('%d', [value])));
    end
  );
  // provide the data to the parallel for loop
  for i := 1 to 1000 do
    input.Add(i);
  // signal to the parallel for loop that there's no more data to process
  input.CompleteAdding;
  // wait for the parallel for loop to stop
  wait.WaitFor;
  // destroy the parallel for loop
  loop := nil;
end;
Parallel.ForEach returns an
IOmniParallelLoop interface which is used to configure and run the parallel for loop.
IOmniParallelLoop = interface
  function  Aggregate(defaultAggregateValue: TOmniValue;
    aggregator: TOmniAggregatorDelegate): IOmniParallelAggregatorLoop;
  function  AggregateSum: IOmniParallelAggregatorLoop;
  procedure Execute(loopBody: TOmniIteratorDelegate); overload;
  procedure Execute(loopBody: TOmniIteratorTaskDelegate); overload;
  function  CancelWith(const token: IOmniCancellationToken): IOmniParallelLoop;
  function  Initialize(taskInitializer: TOmniTaskInitializerDelegate):
    IOmniParallelInitializedLoop;
  function  Into(const queue: IOmniBlockingCollection): IOmniParallelIntoLoop; overload;
  function  NoWait: IOmniParallelLoop;
  function  NumTasks(taskCount: integer): IOmniParallelLoop;
  function  OnMessage(eventDispatcher: TObject): IOmniParallelLoop; overload;
    deprecated 'use TaskConfig';
  function  OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent):
    IOmniParallelLoop; overload; deprecated 'use TaskConfig';
  function  OnMessage(msgID: word; eventHandler: TOmniOnMessageFunction):
    IOmniParallelLoop; overload; deprecated 'use TaskConfig';
  function  OnTaskCreate(taskCreateDelegate: TOmniTaskCreateDelegate):
    IOmniParallelLoop; overload;
  function  OnTaskCreate(taskCreateDelegate: TOmniTaskControlCreateDelegate):
    IOmniParallelLoop; overload;
  function  OnStop(stopCode: TProc): IOmniParallelLoop;
  function  PreserveOrder: IOmniParallelLoop;
  function  TaskConfig(const config: IOmniTaskConfig): IOmniParallelLoop;
end;
ForEach<T> returns an
IOmniParallelLoop<T> interface, which is exactly the same as the
IOmniParallelLoop except that each method returns the appropriate
<T> version of the interface.
Aggregate and AggregateSum are used to implement aggregation. See the Aggregation section, below.
Execute accepts the block of code to be executed for each value in the input container. Two method signatures are supported, both with the
<T> variant. One accepts only the iteration value and another accepts the ''IOmniTask'' parameter.
TOmniIteratorDelegate = reference to procedure(const value: TOmniValue);
TOmniIteratorDelegate<T> = reference to procedure(const value: T);
TOmniIteratorTaskDelegate = reference to procedure(const task: IOmniTask;
  const value: TOmniValue);
TOmniIteratorTaskDelegate<T> = reference to procedure(const task: IOmniTask;
  const value: T);
CancelWith enables the cancellation mechanism.
With OnTaskCreate you can initialize per-task data before the task begins execution. See the Task Initialization section, below.
Into sets up the output queue, see Preserving Output Order.
If you call the
NoWait function, parallel for will start in the background and control will be returned to the main thread immediately. If
NoWait is not called,
Execute will only return after all tasks have stopped working.
With NumTasks you can set the number of worker tasks. By default, the number of tasks is set to [number of cores available to the process] - 1 if the PreserveOrder modifier is used and to [number of cores available to the process] in all other cases.
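For example, a loop whose body is I/O-bound can be limited to a fixed number of workers; in this sketch ProcessValue is a hypothetical placeholder for the real workload:

```pascal
// Run at most two worker tasks, regardless of the number of cores.
Parallel.ForEach(1, 1000)
  .NumTasks(2)
  .Execute(
    procedure (const value: integer)
    begin
      ProcessValue(value); // hypothetical I/O-bound workload
    end);
```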
The OnMessage functions are deprecated; use TaskConfig instead.
OnStop sets up a termination handler which will be called after all parallel for tasks have completed their work. If the
NoWait function was called,
OnStop will be called from one of the worker threads. If, however,
NoWait function was not called,
OnStop will be called from the thread that created the
ForEach abstraction. This behaviour makes it hard to execute VCL code from the
OnStop handler, so release 3.02 introduced another variation which accepts a delegate with an IOmniTask parameter.
TOmniTaskStopDelegate = reference to procedure (const task: IOmniTask);

IOmniParallelLoop = interface
  function OnStop(stopCode: TOmniTaskStopDelegate): IOmniParallelLoop; overload;
end;
Using this version of
OnStop, the termination handler can use ''task.Invoke'' to execute some code in the main thread. This, however, requires the
ForEach abstraction to stay alive until the
Invoke-d code is executed so you must store the
ForEach result in a global variable (form field, for example) and destroy it only in the termination handler.
var
  loop: IOmniParallelLoop<integer>;

loop := Parallel.ForEach(1, N).NoWait;
loop.OnStop(
  procedure (const task: IOmniTask)
  begin
    task.Invoke(
      procedure
      begin
        // do anything
        loop := nil;
      end);
  end);
loop.Execute(
  procedure (const value: integer)
  begin
    ...
  end);
PreserveOrder modifies the parallel for behaviour so that output values are generated in the order of the corresponding input values. See the Preserving Output Order section, below.
TaskConfig sets up a task configuration block. The same task configuration block will be applied to all
ForEach worker tasks.
The following example uses
TaskConfig to set up a message handler which will receive messages sent from
ForEach worker tasks.
FParallel := Parallel.ForEach(1, 17)
  .TaskConfig(Parallel.TaskConfig.OnMessage(Self))
  .NoWait
  .OnStop(procedure begin FParallel := nil; end);
FParallel
  .Execute(
    procedure (const task: IOmniTask; const value: integer)
    begin
      task.Comm.Send(WM_LOG, value);
    end);
Messages sent from the worker task are received and dispatched by the
IOmniParallelLoop interface. This requires the
ForEach abstraction to stay alive until the messages are processed so you must store the
ForEach result in a global variable (form field, for example) and destroy it only in the OnStop handler.
Some functions return a different interface. Typically, it only implements the
Execute function accepting a different parameter than the 'normal'
Execute. For example,
Aggregate returns the IOmniParallelAggregatorLoop interface:
TOmniIteratorIntoDelegate = reference to procedure(const value: TOmniValue;
  var result: TOmniValue);

IOmniParallelAggregatorLoop = interface
  function Execute(loopBody: TOmniIteratorIntoDelegate): TOmniValue;
end;
These variants of the
IOmniParallelLoop interface will be described in following sections.
When you run a
ForEach loop, you can't tell in advance the order in which elements from the input collection will be processed. For example, the code below will generate all primes from 1 to
CMaxPrime and write them into the output queue (
primeQueue) in a nondeterministic order.
primeQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, CMaxPrime).Execute(
  procedure (const value: integer)
  begin
    if IsPrime(value) then begin
      primeQueue.Add(value);
    end;
  end);
Sometimes this will represent a big problem and you'll have to write a sorting function that will re-sort the output before it can be processed further. To alleviate the problem,
IOmniParallelLoop implements the
PreserveOrder modifier. When used,
the ForEach loop will internally sort the results produced in the task method passed to Execute. Using
PreserveOrder also forces you to use the
Into method which returns the
IOmniParallelIntoLoop interface. (As you may expect, there's also the
<T> version of that interface.)
TOmniIteratorIntoDelegate = reference to procedure(const value: TOmniValue;
  var result: TOmniValue);
TOmniIteratorIntoTaskDelegate = reference to procedure(const task: IOmniTask;
  const value: TOmniValue; var result: TOmniValue);

IOmniParallelIntoLoop = interface
  procedure Execute(loopBody: TOmniIteratorIntoDelegate); overload;
  procedure Execute(loopBody: TOmniIteratorIntoTaskDelegate); overload;
end;
As you can see, the
Execute method in
IOmniParallelIntoLoop takes a different parameter than the 'normal'
Execute. Because of that, you'll have to change the code that is passed to the
Execute to return a result.
primeQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, CMaxPrime)
  .PreserveOrder
  .Into(primeQueue)
  .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
      if IsPrime(value) then
        res := value;
    end);
ForEach calls your worker code for each input value. If the worker code sets output parameter (
res) to any value, it will be inserted into a temporary buffer. Then the magic will happen (see the Internals section, below) and as soon as the appropriate (sorted) value is available in the temporary buffer, it is inserted into the output queue (the one passed to the Into method).
You can also use
Into without the
PreserveOrder. This will give you queue management but no ordering.
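A minimal sketch of that unordered variant - the same as the previous example, but without PreserveOrder, so values arrive in primeQueue in whatever order the workers produce them:

```pascal
primeQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, CMaxPrime)
  .Into(primeQueue)
  .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
      if IsPrime(value) then
        res := value; // queued in nondeterministic order
    end);
```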
Aggregation allows you to collect data from the parallel for tasks and calculate one number that is returned to the user.
Let's start with an example - and a very bad one! The following code fragment tries to calculate the number of primes between 1 and CMaxPrime.
numPrimes := 0;
Parallel.ForEach(1, CMaxPrime).Execute(
  procedure (const value: integer)
  begin
    if IsPrime(value) then
      Inc(numPrimes);
  end);
Let's say it out loud - this code is wrong! Access to the shared variable is not synchronized between threads and that will make the result indeterminable. One way to solve the problem is to wrap the
Inc(numPrimes) with locking and another is to use
InterlockedIncrement instead of
Inc, but both will slow down the execution a lot.
A solution to this problem is to use the Aggregate function.
procedure SumPrimes(var aggregate: TOmniValue; const value: TOmniValue);
begin
  aggregate := aggregate.AsInt64 + value.AsInt64;
end;

procedure CheckPrime(const value: integer; var result: TOmniValue);
begin
  if IsPrime(value) then
    result := 1;
end;

numPrimes := Parallel.ForEach(1, CMaxPrime)
  .Aggregate(0, SumPrimes)
  .Execute(CheckPrime);
Aggregate takes two parameters - the first is the initial value for the aggregate and the second is an aggregation function - a piece of code that will take the current aggregate value and update it with the value returned from the parallel for task.
When used with Aggregate, the parallel for task (the code passed to the
Execute function) has the same signature as when used with
Into. It takes the current iteration value and optionally produces a result.
We could replace the code above with a sequential equivalent:
agg := 0;
result.Clear;
for value := 1 to CMaxPrime do begin
  CheckPrime(value, result);
  if not result.IsEmpty then begin
    SumPrimes(agg, result);
    result.Clear;
  end;
end;
numPrimes := agg;
ForEach executes the aggregation in two stages. While the parallel for task is running, it will use this approach to aggregate data into a local variable. When it runs out of work, it will call the same aggregation method to aggregate this local variable into a global result. In this second stage, however, locking will be used to protect the access to the global result.
Because summation is the most common usage of aggregation,
IOmniParallelLoop implements function
AggregateSum, which works exactly the same as the example above.
numPrimes := Parallel.ForEach(1, CMaxPrime)
  .AggregateSum
  .Execute(
    procedure (const value: integer; var result: TOmniValue)
    begin
      if IsPrime(value) then
        result := 1;
    end
  );
The aggregation function can do something other than summation. The following code segment uses aggregation to find the length of the longest line in a file.
function GetLongestLineInFile(const fileName: string): integer;
var
  maxLength: TOmniValue;
  sl       : TStringList;
begin
  sl := TStringList.Create;
  try
    sl.LoadFromFile(fileName);
    maxLength := Parallel.ForEach<string>(sl)
      .Aggregate(0,
        procedure(var aggregate: TOmniValue; const value: TOmniValue)
        begin
          if value.AsInteger > aggregate.AsInteger then
            aggregate := value.AsInteger;
        end)
      .Execute(
        procedure(const value: string; var result: TOmniValue)
        begin
          result := Length(value);
        end);
    Result := maxLength;
  finally
    FreeAndNil(sl);
  end;
end;
ForEach has a built-in cancellation mechanism. To use it, create a cancellation token and pass it to the
CancelWith function. When a cancellation token gets signalled, all worker loops will complete the current iteration and then stop.
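A minimal sketch of the mechanism could look like this. It assumes IsPrime from the earlier examples and the CreateOmniCancellationToken factory function (declared, to the best of my knowledge, in the OtlSync unit); the first worker that finds a prime signals the token and all workers then stop after their current iteration:

```pascal
var
  cancelToken: IOmniCancellationToken;
begin
  cancelToken := CreateOmniCancellationToken;
  Parallel.ForEach(1, 1000000)
    .CancelWith(cancelToken)
    .Execute(
      procedure (const value: integer)
      begin
        // stop the whole loop as soon as any worker finds a prime
        if IsPrime(value) then
          cancelToken.Signal;
      end);
end;
```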
An example of using cancellation token can be found in the chapter Parallel Search in a Tree.
In some cases it would be nice if each parallel task could have some data initialized at beginning and available to the enumerator code (the one passed to the
Execute). For those occasions,
ForEach implements an Initialize function.
TOmniTaskInitializerDelegate = reference to procedure(var taskState: TOmniValue);
TOmniTaskFinalizerDelegate = reference to procedure(const taskState: TOmniValue);
TOmniIteratorStateDelegate = reference to procedure(const value: TOmniValue;
  var taskState: TOmniValue);

IOmniParallelInitializedLoop = interface
  function  Finalize(taskFinalizer: TOmniTaskFinalizerDelegate):
    IOmniParallelInitializedLoop;
  procedure Execute(loopBody: TOmniIteratorStateDelegate);
end;

IOmniParallelLoop = interface
  ...
  function Initialize(taskInitializer: TOmniTaskInitializerDelegate):
    IOmniParallelInitializedLoop;
end;
Call Initialize with a task initializer - a procedure that will be called in each parallel worker task when it is created and before it starts enumerating values. This procedure can initialize the
taskState parameter with any value.
Initialize returns an IOmniParallelInitializedLoop interface, which implements two functions - Finalize and Execute. Use
Finalize to set up a task finalizer, a procedure that gets called after all values have been enumerated and before the parallel worker task ends its job.
Execute accepts a worker method with two parameters - the first one is the usual value from the enumerated container and the second contains the per-task state.
Of course, all those functions and interfaces are implemented in the
<T> version, too.
The following example shows how to calculate the number of primes from 1 to
CHighPrime by using initializers and finalizers.
var
  lockNum  : TOmniCS;
  numPrimes: integer;
begin
  numPrimes := 0;
  Parallel.ForEach(1, CHighPrime)
    .Initialize(
      procedure (var taskState: TOmniValue)
      begin
        taskState.AsInteger := 0;
      end)
    .Finalize(
      procedure (const taskState: TOmniValue)
      begin
        lockNum.Acquire;
        try
          numPrimes := numPrimes + taskState.AsInteger;
        finally
          lockNum.Release;
        end;
      end)
    .Execute(
      procedure (const value: integer; var taskState: TOmniValue)
      begin
        if IsPrime(value) then
          taskState.AsInteger := taskState.AsInteger + 1;
      end
    );
end;
The ForEach abstraction does not yet implement any exception handling. You should always wrap the task method (the code passed to Execute) in
try..except if you expect the code to raise exceptions.
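A minimal sketch of such a wrapper follows; ProcessValue and LogError are hypothetical placeholders, and LogError must be thread-safe because it is called from worker threads:

```pascal
Parallel.ForEach(1, 1000).Execute(
  procedure (const value: integer)
  begin
    try
      ProcessValue(value); // hypothetical workload that may raise
    except
      on E: Exception do
        LogError(value, E.Message); // hypothetical thread-safe logger
    end;
  end);
```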
This section tries to explain how the
ForEach is implemented. Consider it as a bonus material for users that want to know more. You don't have to read or understand it to use
ForEach so you may simply skip this part of the book.
Let's start with a very simple code.
Parallel.ForEach(1, 1000)
  .Execute(
    procedure (const elem: integer)
    begin
    end);
This simple code iterates from 1 to 1000 on all available cores in parallel and executes a simple procedure that contains no workload. All in all, the code will do nothing - but it will do it in a very complicated manner.
The ForEach method creates a new
TOmniParallelLoop<integer> object (that's the object that will coordinate parallel tasks) and passes it a source provider - an object that knows how to access values that are being enumerated (integers from
1 to 1000 in this example).
The OtlDataManager unit contains four different source providers - one for each type of source that can be passed to the
ForEach method. If there is a need to extend
ForEach with a new enumeration source, I would only have to add a few simple methods to the OtlParallel unit and write a new source provider.
class function Parallel.ForEach(low, high: integer; step: integer):
  IOmniParallelLoop<integer>;
begin
  Result := TOmniParallelLoop<integer>.Create(
    CreateSourceProvider(low, high, step), true);
end;
Parallel for tasks are started in
InternalExecuteTask. This method first creates a data manager and attaches it to the source provider (compare this with the picture above - there is one source provider and one data manager). Next it creates an appropriate number of tasks and calls the task-specific delegate method from each one. [This delegate wraps your parallel code and provides it with proper input (and sometimes, output). There are many calls to
InternalExecuteTask in the OtlParallel unit, each with a different
taskDelegate and each providing support for a different kind of the loop.]
procedure TOmniParallelLoopBase.InternalExecuteTask(
  taskDelegate: TOmniTaskDelegate);
var
  dmOptions: TOmniDataManagerOptions;
  iTask    : integer;
  numTasks : integer;
  task     : IOmniTaskControl;
begin
  ...
  oplDataManager := CreateDataManager(oplSourceProvider,
    numTasks, dmOptions);
  ...
  for iTask := 1 to numTasks do begin
    task := CreateTask(
      procedure (const task: IOmniTask)
      begin
        ...
        taskDelegate(task);
        ...
      end,
      ...);
    task.Schedule(GParallelPool);
  end;
  ...
end;
The data manager is stored in a field of the
TOmniParallelLoop<T> object so that it can be simply reused from the task delegate. The simplest possible task delegate (below) just creates a local queue and fetches values from the local queue one by one. This results in many local queues - one per task - all connected to the same data manager.
In case you're wondering what
loopBody is - it is the anonymous method you have passed to the Execute method.
procedure InternalExecuteTask(const task: IOmniTask);
var
  localQueue: TOmniLocalQueue;
  value     : TOmniValue;
begin
  localQueue := oplDataManager.CreateLocalQueue;
  try
    while (not Stopped) and localQueue.GetNext(value) do
      loopBody(task, value);
  finally
    FreeAndNil(localQueue);
  end;
end;
- Source provider is created.
- Data manager is created and associated with the source provider.
- Each task creates its own local queue and uses it to access the source data.
- As you'll see in the next section, the local queue retrieves data in packages (data packages) and sends output to an output buffer which makes sure that the output is produced in the correct order (the output buffer part happens only if the PreserveOrder method is called in the high-level code).
- If the task runs out of work, it requests a new data package from the data manager, which gets this data from the source provider (more on that below). If the source provider runs out of data, data manager will attempt to steal some data from other tasks.
All this was designed to provide fast data access (blocking is limited to the source provider, all other interactions are lock-free), good workload distribution (when a task runs out of work before other tasks, it will steal some work from other tasks) and output ordering (when required).
A source provider is an object that fetches data from the enumeration source (the data that was passed to the parallel for) and repackages it into a format suitable for parallel consumption. Currently there are three source providers defined in the OtlDataManager unit.
TOmniIntegerRangeProvider iterates over integer ranges (just like a 'normal' ''for'' statement does). As such, it doesn't really fetch data from an enumeration source but generates it internally.
TOmniValueEnumeratorProvider iterates over ''IOmniValueEnumerator'', which is a special enumerator that can be accessed from multiple readers and doesn't require locking. Currently it is only provided by the ''IOmniBlockingCollection''.
TOmniEnumeratorProvider iterates over Windows enumerators (''IEnumerator'') or Delphi enumerators (''GetEnumerator'', wrapped into the ''TOmniValueEnumerator'' class).
All source providers descend from an abstract class
TOmniSourceProvider which provides common source provider interface. In theory, an interface should be used for that purpose, but in practice source providers are very performance intensive and not using interfaces speeds the program by a measurable amount.
TOmniSourceProvider = class abstract
public
  function Count: int64; virtual; abstract;
  function CreateDataPackage: TOmniDataPackage; virtual; abstract;
  function GetCapabilities: TOmniSourceProviderCapabilities; virtual; abstract;
  function GetPackage(dataCount: integer; package: TOmniDataPackage): boolean; virtual; abstract;
  function GetPackageSizeLimit: integer; virtual; abstract;
end;
Not all source providers are created equal and that's why function
GetCapabilities returns source provider capabilities:
TOmniSourceProviderCapability = (
  spcCountable, // source provider that knows how much data it holds
  spcFast,      // source provider operations are O(1)
  spcDataLimit  // data package can only hold limited amount of data
);

TOmniSourceProviderCapabilities = set of TOmniSourceProviderCapability;
TOmniIntegerRangeProvider is both countable (it's very simple to calculate how many values are between 1 and 10, for example) and fast (it takes the same amount of time to fetch 10 values or 10.000 values), while the other two source providers are neither countable nor fast. The third capability,
spcDataLimit, is obsolete and not used. It was replaced by the GetPackageSizeLimit method.
The other important aspect of a source provider is the
GetPackage method. It accesses the source (locking it if necessary), retrieves data and returns it in the data package. The implementation is highly dependent on the source data. For example, the integer source provider just advances the current low field value and returns a data package that doesn't contain a bunch of values but just the low and high boundaries (which is why it is considered fast). The enumerator source provider locks the source, fetches the data and builds the data package value by value. And in the simplest case, the
TOmniValueEnumerator source provider just fetches values and builds the data package.
function TOmniValueEnumeratorProvider.GetPackage(dataCount: integer;
  package: TOmniDataPackage): boolean;
var
  iData     : integer;
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  timeout   : cardinal;
  value     : TOmniValue;
begin
  Assert(not StorePositions);
  Result := false;
  dataCount := intPackage.Prepare(dataCount);
  timeout := INFINITE;
  for iData := 1 to dataCount do begin
    if not vepEnumerator.TryTake(value, timeout) then
      break; //for
    intPackage.Add(value);
    timeout := 0;
    Result := true;
  end;
end;
The data manager is the central hub in the OtlDataManager hierarchy. It sits between the multiple local queues and the single source provider and makes sure that all parallel tasks always have some work to do.
Two different data managers are implemented at the moment - a countable data manager and a heuristic data manager. The former is used if the source provider is countable, the latter if it is not. Both descend from the abstract class
TOmniDataManager = class abstract
public
  function  CreateLocalQueue: TOmniLocalQueue; virtual; abstract;
  function  AllocateOutputBuffer: TOmniOutputBuffer; virtual; abstract;
  function  GetNext(package: TOmniDataPackage): boolean; virtual; abstract;
  procedure ReleaseOutputBuffer(buffer: TOmniOutputBuffer); virtual; abstract;
  procedure SetOutput(const queue: IOmniBlockingCollection); overload;
    virtual; abstract;
end;
The main difference between them lies in the function
GetNextFromProvider, which reads data from the source provider (by calling its
GetPackage method). In the countable data manager this is just a simple forwarder, while in the heuristic data manager this function tries to find a package size that will allow all parallel tasks to work at full speed.
function TOmniHeuristicDataManager.GetNextFromProvider(
  package: TOmniDataPackage; generation: integer): boolean;
const
  CDataLimit = Trunc(High(integer) / CFetchTimeout_ms);
var
  dataPerMs: cardinal;
  dataSize : integer;
  time     : int64;
begin
  // the goal is to fetch as much (but not exceeding <fetch_limit>)
  // data as possible in <fetch_timeout> milliseconds; highest amount
  // of data is limited by the GetDataCountForGeneration method.
  dataSize := GetDataCountForGeneration(generation);
  if dataSize > hdmEstimatedPackageSize.Value then
    dataSize := hdmEstimatedPackageSize.Value;
  time := DSiTimeGetTime64;
  Result := SourceProvider.GetPackage(dataSize, package);
  time := DSiTimeGetTime64 - time;
  if Result then begin
    if time = 0 then
      dataPerMs := CDataLimit
    else begin
      dataPerMs := Round(dataSize / time);
      if dataPerMs >= CDataLimit then
        dataPerMs := CDataLimit;
    end;
    // average over last four fetches for dynamic adaptation
    hdmEstimatedPackageSize.Value := Round(
      (hdmEstimatedPackageSize.Value / 4 * 3) +
      (dataPerMs / 4) * CFetchTimeout_ms);
  end;
end;
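The adaptation rule itself - keep three quarters of the old estimate and blend in a quarter of the newly measured throughput, scaled by the fetch timeout - can be modeled in a few lines of Python (an illustrative sketch; FETCH_TIMEOUT_MS, update_estimate, and the constants are invented, not OTL's):

```python
FETCH_TIMEOUT_MS = 10   # invented stand-in for OTL's CFetchTimeout_ms

def update_estimate(estimate, data_size, elapsed_ms, data_limit=10 ** 6):
    """Blend the previous package-size estimate with the latest measured
    throughput: 3/4 of the old value plus 1/4 of the new measurement."""
    if elapsed_ms == 0:
        data_per_ms = data_limit          # fetch was 'instant'; cap the rate
    else:
        data_per_ms = min(round(data_size / elapsed_ms), data_limit)
    return round(estimate * 3 / 4 + (data_per_ms / 4) * FETCH_TIMEOUT_MS)

# a fast fetch (1000 values in 2 ms, i.e. 500 values/ms) pushes the estimate up
print(update_estimate(1000, 1000, 2))   # → 2000
```

A slow fetch pulls the estimate down the same way, so the package size converges toward an amount of data that one task can process in roughly one fetch-timeout interval.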
Each parallel task reads data from a local queue, which is just a simple interface to the data manager. The most important part of a local queue is its
GetNext method, which provides the task with the next value.
function TOmniLocalQueueImpl.GetNext(var value: TOmniValue): boolean;
begin
  Result := lqiDataPackage.GetNext(value);
  if not Result then begin
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then
      Result := lqiDataPackage.GetNext(value);
  end;
end;
Each local queue contains a local data package.
GetNext first tries to read the next value from that data package. If this fails (the data package is empty because it was already fully processed), it tries to get a new data package from the data manager and, if successful, retries fetching the next value from the (refreshed) data package.
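The refill-and-retry logic can be mirrored in a small Python sketch (illustrative only; LocalQueue and the fetch callback are invented names standing in for the data-manager interaction):

```python
class LocalQueue:
    """Serves values from a local block; when the block runs dry,
    asks the data manager (here: a callback) for a new one and retries."""

    def __init__(self, fetch_package):
        self._fetch = fetch_package   # callable returning a list (may be empty)
        self._package = []

    def get_next(self):
        """Return (True, value), or (False, None) when all data is gone."""
        if not self._package:
            self._package = self._fetch()   # refill from the data manager
        if not self._package:
            return False, None              # source exhausted
        return True, self._package.pop(0)

blocks = [[1, 2], [3]]
queue = LocalQueue(lambda: blocks.pop(0) if blocks else [])
out = []
ok, v = queue.get_next()
while ok:
    out.append(v)
    ok, v = queue.get_next()
print(out)   # → [1, 2, 3]
```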
GetNext in the data manager first tries to get the next package from the source provider (via the private method
GetNextFromProvider, which calls the source provider's
GetPackage method). If that fails, it tries to steal part of the workload from another task.
Stealing is the feature that allows all parallel tasks to stay active until the last value is enumerated. To implement it, the data manager iterates over all local queues and tries to split each local queue's data package in half. If that succeeds, one half of the data package is left in the original local queue and the other half is returned to the local queue that requested more data.
Package splitting is highly dependent on the data type. For example, an integer data package just recalculates its boundaries, while enumerator-based packages must copy data around.
function TOmniValueEnumeratorDataPackage.Split(
  package: TOmniDataPackage): boolean;
var
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  iValue    : integer;
  value     : TOmniValue;
begin
  Result := false;
  for iValue := 1 to intPackage.Prepare(vedpApproxCount.Value div 2) do begin
    if not GetNext(value) then
      break; //for
    intPackage.Add(value);
    Result := true;
  end;
end;
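For list-like packages, splitting amounts to moving roughly half of the victim's remaining values into a new package for the thief, as this Python sketch shows (split_package is an invented name; the real OTL code splits a TOmniDataPackage under locking). An integer-range package would instead just adjust its boundaries in O(1):

```python
def split_package(package):
    """Move roughly half of the victim's remaining values into a new
    package for the thief; returns None if there is nothing to steal."""
    half = len(package) // 2
    if half == 0:
        return None
    stolen, package[:] = package[:half], package[half:]   # shrink in place
    return stolen

victim = [5, 6, 7, 8, 9]
thief = split_package(victim)
print(thief)    # → [5, 6]
print(victim)   # → [7, 8, 9]
```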
Order preservation (PreserveOrder) is usually used together with the
Into modifier. The reason lies in the integration between the
Parallel.ForEach infrastructure and your parallel code (the one executing as the
Execute payload). In a 'normal'
ForEach, the output of this parallel payload is not defined. You are allowed to generate any output in the payload, but
ForEach will know nothing about it. In that case OTL has no way to preserve ordering because - at least from the viewpoint of the library - the parallelized code produces no output.
When Into is used, however, your code uses a different signature (different parameters).
Parallel.ForEach(1, CMaxTest)
  .PreserveOrder
  .Into(primeQueue)
  .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
      if IsPrime(value) then
        res := value;
    end);
The parallel payload now takes two parameters. The first is - as in the more common case - the input value, while the second receives the output value. As the example shows, the parallelized code can produce zero outputs or one output, but not more.
This small modification changes everything. Because the Parallel infrastructure has control over the output parameter, it can manage it internally, associate it with the input, and make sure that the output is generated in the same order as the input.
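The whole position-tagging idea can be shown in miniature. The following Python sketch (all names invented; the 'parallelism' is simulated by shuffling) tags each input with its position, lets results arrive in arbitrary order, and uses the positions to restore input order while dropping empty outputs:

```python
import random

def is_prime(n):
    return n > 1 and all(n % d for d in range(2, int(n ** 0.5) + 1))

def parallel_map_preserve_order(values, fn):
    """Tag inputs with positions, process in arbitrary order, then use
    the positions to emit results in input order (skipping empty ones)."""
    tagged = list(enumerate(values))
    random.shuffle(tagged)                  # simulate out-of-order completion
    results = [(pos, fn(v)) for pos, v in tagged]
    return [r for _, r in sorted(results) if r is not None]

print(parallel_map_preserve_order(range(1, 20),
                                  lambda v: v if is_prime(v) else None))
# → [2, 3, 5, 7, 11, 13, 17, 19]
```

OTL does this incrementally with position-tagged packages and buffers, as described below, rather than sorting everything at the end.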
Let's look at the innermost code - the part that schedules parallel tasks. When
Into is used,
InternalExecuteTask executes the following, quite complicated, code.
InternalExecuteTask(
  procedure (const task: IOmniTask)
  var
    localQueue      : TOmniLocalQueue;
    outputBuffer_ref: TOmniOutputBuffer;
    position        : int64;
    result          : TOmniValue;
    value           : TOmniValue;
  begin
    oplDataManager.SetOutput(oplIntoQueueIntf);
    localQueue := oplDataManager.CreateLocalQueue;
    try
      outputBuffer_ref := oplDataManager.AllocateOutputBuffer;
      try
        localQueue.AssociateBuffer(outputBuffer_ref);
        result := TOmniValue.Null;
        while (not Stopped) and
              localQueue.GetNext(position, value) do
        begin
          loopBody(task, value, result);
          if not result.IsEmpty then begin
            outputBuffer_ref.Submit(position, result);
            result := TOmniValue.Null;
          end;
        end;
      finally
        oplDataManager.ReleaseOutputBuffer(outputBuffer_ref);
      end;
    finally
      FreeAndNil(localQueue);
    end;
  end);
Important points here are:
- The data manager is associated with the output queue. (The
oplIntoQueueIntf field contains the value passed to the Into method.)
- A local queue is created, the same as when a 'normal'
ForEach is executed.
- An output buffer is created by the data manager and associated with the local queue.
- For each input value the user code is executed, and each non-empty output value is written into the output buffer.
- The output buffer is released, as is the local queue.
The interesting part is hidden in the background: inside the local queue, the data manager, and the output buffer.
The first modification lies in the data source. When
PreserveOrder is used, each data package knows the source position it was read from. To simplify matters, data package splitting is not used in this case. [And because of that, data stealing cannot be used, causing slightly less effective use of the CPU than in the simpler ForEach.]
Each local queue has an output buffer set associated with it.
Each output buffer set manages two output buffers. One is active and the task writes into it; the other may be either empty or full. Each output buffer is associated with an input position - just as a data package is.
When we look at data reading/writing from the perspective of one task, everything is very simple. The task reads data from its local queue (which in turn reads it from a data package associated with some position) and writes it to an output buffer (associated with the same position).
The tricky part comes up when the data package is exhausted (the
if not Result branch in the code below).
function TOmniLocalQueueImpl.GetNext(var position: int64;
  var value: TOmniValue): boolean;
begin
  Result := lqiDataPackage.GetNext(position, value);
  if not Result then begin
    lqiBufferSet.ActiveBuffer.MarkFull;
    lqiBufferSet.ActivateBuffer;
    // this will block if alternate buffer is also full
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then begin
      Result := lqiDataPackage.GetNext(position, value);
      if Result then
        lqiBufferSet.ActiveBuffer.Range := lqiDataPackage.Range;
    end;
  end;
end;
First, the currently active buffer is marked as full. This causes
NotifyBufferFull to be called (see below). Then the alternate buffer is activated. This call (
ActivateBuffer) will block if the alternate buffer is not free. In that case, the current thread is blocked until one of its buffers is written into the output queue.
From this point on,
GetNext proceeds in the same way as in the simple
ForEach, except that it sets the active buffer's position whenever a new data package is read from the data manager.
The other part of the magic happens in
NotifyBufferFull, the method called from MarkFull. It walks the buffer list and checks whether there are any output buffers that are a) full and b) destined for the current output position. Such buffers are copied to the output and returned into use.
procedure TOmniBaseDataManager.NotifyBufferFull(
  buffer: TOmniOutputBufferImpl);
begin
  // Remove buffer from the list. Check if next buffer is waiting in
  // the list. Copy buffer if it is full and repeat the process.
  dmBufferRangeLock.Acquire;
  try
    while (dmBufferRangeList.Count > 0) and
          (BufferList[0].Range.First = dmNextPosition) and
          BufferList[0].IsFull do
    begin
      buffer := TOmniOutputBufferImpl(dmBufferRangeList.ExtractObject(0));
      dmNextPosition := buffer.Range.Last + 1;
      buffer.CopyToOutput;
    end;
  finally
    dmBufferRangeLock.Release;
  end;
end;
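The same flush-in-order logic can be modeled in Python (a sketch, not the OTL implementation; buffers are reduced to (first, last, values) arguments and the buffer list becomes a dictionary keyed by first position):

```python
import threading

class OrderedFlusher:
    """Collects full buffers covering position ranges and flushes them
    to the output strictly in position order."""

    def __init__(self):
        self._lock = threading.Lock()
        self._waiting = {}        # first position -> (last, values)
        self._next_position = 0
        self.output = []

    def notify_buffer_full(self, first, last, values):
        with self._lock:
            self._waiting[first] = (last, values)
            # flush every buffer that starts exactly at the next
            # expected output position, advancing past each one
            while self._next_position in self._waiting:
                last, values = self._waiting.pop(self._next_position)
                self.output.extend(values)
                self._next_position = last + 1

flusher = OrderedFlusher()
flusher.notify_buffer_full(3, 5, ['d', 'e', 'f'])   # arrives early, waits
flusher.notify_buffer_full(0, 2, ['a', 'b', 'c'])   # unblocks both
print(flusher.output)   # → ['a', 'b', 'c', 'd', 'e', 'f']
```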
- Each data package is associated with a position.
- Each local queue has two output buffers; one is active, the other is either free or full.
- Each output buffer is also associated with a position.
- The local queue writes data into an output buffer.
- When a buffer is full, it is put into a list of waiting buffers. At that moment, all appropriate waiting buffers are copied to the output.