Parallel Task

The Parallel Task abstraction enables you to start a background task in multiple threads. To create the task, call Parallel.ParallelTask.



When you use Parallel.ParallelTask, the same background task is started in multiple background threads. By default, ParallelTask waits for all background threads to complete before control is returned to the caller.

Example:

Parallel.ParallelTask.NumTasks(3).Execute(
  procedure
  begin
    while true do
      ;
  end
);

This simple code fragment starts an infinite loop in three threads. The task is coded as an anonymous method, but you can also use a normal method or a normal procedure for the task code.

The Parallel class implements the function ParallelTask, which returns an IOmniParallelTask interface. All configuration of the parallel task is done via this interface.

type
  Parallel = class
    class function ParallelTask: IOmniParallelTask;
    ...
  end;

IOmniParallelTask Interface

type
  IOmniParallelTask = interface
    function  Execute(const aTask: TProc): IOmniParallelTask; overload;
    function  Execute(
      const aTask: TOmniParallelTaskDelegate): IOmniParallelTask; overload;
    function  NoWait: IOmniParallelTask;
    function  NumTasks(numTasks: integer): IOmniParallelTask;
    function  OnStop(const stopCode: TProc): IOmniParallelTask;
    function  TaskConfig(const config: IOmniTaskConfig): IOmniParallelTask;
    function  WaitFor(timeout_ms: cardinal): boolean;
  end;

The most important of these functions is Execute. It starts the appropriate number of background threads and begins executing the task in those threads. By default, ParallelTask uses as many threads as there are tasks, but you can override this behaviour by calling the NumTasks function.

There are two overloaded versions of Execute. The first accepts a parameter-less background task and the second accepts a background task with an IOmniTask parameter.
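As an illustration of the second overload, the sketch below passes an anonymous method that accepts an IOmniTask parameter. It is a minimal example rather than code from the library: the program name, the ListWorkerIDs procedure, and the unit names in the uses clause are assumptions, and it relies on IOmniTask exposing a UniqueID property.

```pascal
program ParallelTaskWithTaskParam;

{$APPTYPE CONSOLE}

uses
  OtlCommon,       // TOmniValue (assumed unit name)
  OtlTask,         // IOmniTask (assumed unit name)
  OtlCollections,  // IOmniBlockingCollection (assumed unit name)
  OtlParallel;     // Parallel.ParallelTask (assumed unit name)

// Hypothetical helper: collects the ID of every worker task and prints it.
procedure ListWorkerIDs;
var
  ids  : IOmniBlockingCollection;
  value: TOmniValue;
begin
  ids := TOmniBlockingCollection.Create;
  // No NoWait modifier, so Execute returns only after all three tasks finish.
  Parallel.ParallelTask.NumTasks(3).Execute(
    procedure (const task: IOmniTask)
    begin
      // 'task' represents the worker that is running this copy of the code.
      // The blocking collection is used because it is safe to add to it
      // from multiple threads.
      ids.Add(task.UniqueID);
    end);
  // All workers are done; mark the collection complete so for..in can end.
  ids.CompleteAdding;
  for value in ids do
    Writeln('worker task ID: ', value.AsInt64);
end;

begin
  ListWorkerIDs;
end.
```

The order in which the IDs are printed depends on thread scheduling and may differ between runs.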

The OnStop function can be used to set up a termination handler: code that is executed when all background tasks have completed execution. This termination handler is called from one of the worker threads, not from the main thread! If you need to run code in the main thread, use the task configuration block. To set up the task configuration block, call the TaskConfig function.

A call to the WaitFor function waits for up to timeout_ms milliseconds (this value can be set to INFINITE) for all background tasks to terminate. If they terminate within the specified time, WaitFor returns True. Otherwise, it returns False.
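The following sketch shows one way NoWait and WaitFor might be combined. It is illustrative only: the program name, the 500 ms of simulated work, the 5000 ms timeout, and the unit names are assumptions, not values taken from the library.

```pascal
program ParallelTaskWaitFor;

{$APPTYPE CONSOLE}

uses
  SysUtils,     // Sleep (System.SysUtils on compilers with unit scope names)
  OtlParallel;  // Parallel, IOmniParallelTask (assumed unit name)

var
  task: IOmniParallelTask;

begin
  // NoWait makes Execute return as soon as the tasks have been scheduled.
  // The returned interface is stored so that it can be waited on later.
  task := Parallel.ParallelTask.NoWait.NumTasks(2).Execute(
    procedure
    begin
      Sleep(500); // stands in for real work
    end);

  // The main thread could do other work here.

  // Wait up to 5 seconds for both background tasks to terminate.
  if task.WaitFor(5000) then
    Writeln('all tasks finished')
  else
    Writeln('timed out while waiting for tasks');
end.
```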

Example

The following code uses ParallelTask to generate large quantities of pseudorandom data. The data is written to an output stream.

procedure CreateRandomFile(fileSize: integer; output: TStream);
const
  CBlockSize = 1 * 1024 * 1024 {1 MB};
var
  buffer   : TOmniValue;
  memStr   : TMemoryStream;
  outQueue : IOmniBlockingCollection;
  unwritten: IOmniCounter;
begin
  outQueue := TOmniBlockingCollection.Create;
  unwritten := CreateCounter(fileSize);
  Parallel.ParallelTask.NoWait
    .NumTasks(Environment.Process.Affinity.Count)
    .OnStop(Parallel.CompleteQueue(outQueue))
    .Execute(
      procedure
      var
        buffer      : TMemoryStream;
        bytesToWrite: integer;
        randomGen   : TGpRandom;
      begin
        randomGen := TGpRandom.Create;
        try
          while unwritten.Take(CBlockSize, bytesToWrite) do begin
            buffer := TMemoryStream.Create;
            buffer.Size := bytesToWrite;
            FillBuffer(buffer.Memory, bytesToWrite, randomGen);
            outQueue.Add(buffer);
          end;
        finally FreeAndNil(randomGen); end;
      end
    );
  for buffer in outQueue do begin
    memStr := buffer.AsObject as TMemoryStream;
    output.CopyFrom(memStr, 0);
    FreeAndNil(memStr);
  end;
end;

The code creates a blocking collection to hold buffers with pseudorandom data. Then it creates a counter which will hold the count of bytes that have yet to be written.

ParallelTask is used to start parallel workers. Each worker initializes its own pseudorandom data generator and then keeps generating buffers with pseudorandom data until the counter drops to zero. Each buffer is written to the blocking collection.

Because of the NoWait modifier, the main thread continues execution immediately after all threads have been scheduled. The main thread keeps reading buffers from the blocking collection and writes the content of those buffers to the output stream. (As TStream is not thread-safe, we cannot write to the output stream directly from multiple background threads.)

The for..in loop blocks while there is no data in the blocking collection, and it stops looping only after the blocking collection's CompleteAdding method has been called. This is done with the help of the Parallel.CompleteQueue helper, which is called from the termination handler (OnStop).

class function Parallel.CompleteQueue(
  const queue: IOmniBlockingCollection): TProc;
begin
  Result :=
    procedure
    begin
      queue.CompleteAdding;
    end;
end;

Handling Exceptions

Exceptions in background tasks are caught and re-raised in the WaitFor method. If you are using the synchronous version of ParallelTask (without the NoWait modifier), WaitFor is called at the end of the Execute method (in other words, Parallel.ParallelTask.Execute(…) will re-raise task exceptions). If, however, you are using the asynchronous version (by calling Parallel.ParallelTask.NoWait.Execute(…)), exceptions will only be re-raised when you wait for the background tasks to complete by calling WaitFor.
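The asynchronous case might look like the sketch below. It is a minimal illustration under stated assumptions: the program name, the exception message, the 10000 ms timeout, and the unit names are hypothetical, and the handler catches the generic Exception class because the exact class re-raised by WaitFor is not stated here.

```pascal
program ParallelTaskExceptions;

{$APPTYPE CONSOLE}

uses
  SysUtils,     // Exception (System.SysUtils on compilers with unit scope names)
  OtlParallel;  // Parallel, IOmniParallelTask (assumed unit name)

var
  task: IOmniParallelTask;

begin
  // Asynchronous version: Execute returns at once and raises nothing itself.
  task := Parallel.ParallelTask.NoWait.NumTasks(2).Execute(
    procedure
    begin
      raise Exception.Create('failure in background task');
    end);

  try
    // Exceptions raised in the workers are expected to surface here.
    task.WaitFor(10000);
    Writeln('no exception was re-raised');
  except
    on E: Exception do
      // Report the class as well, since it may be a wrapper exception.
      Writeln(E.ClassName, ': ', E.Message);
  end;
end.
```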

For more details on handling ParallelTask exceptions, see the Handling Exception section in the Join chapter.

Examples

Practical examples of Parallel Task usage can be found in the chapters "Background Worker and List Partitioning" and "Parallel Data Production".

book/highlevel/paralleltask.txt · Last modified: 2012/11/14 11:55 by gabr