Join

The Join abstraction lets you start multiple background tasks in multiple threads. To create the tasks, call Parallel.Join.



*When you use Parallel.Join, the tasks are started in background threads. By default, Join waits
for all background threads to complete before control is returned to the caller.*

See also demo 37_ParallelJoin.

Example:

Parallel.Join(
  procedure
  var
    i: integer;
  begin
    for i := 1 to 8 do begin
      Sleep(200);
      MessageBeep($FFFFFFFF);
    end;
  end,
  procedure
  var
    i: integer;
  begin
    for i := 1 to 10 do begin
      Sleep(160);
      MessageBeep($FFFFFFFF);
    end;
  end
).Execute;

This simple program executes two background tasks, each beeping at a different rate (every 200 ms and every 160 ms). Each task is coded as an anonymous method, but you can also use a normal method or a normal procedure for the task code.

The Parallel class defines five Join overloads. The first creates an empty IOmniParallelJoin interface. The next two create the same interface configured with two tasks, and the last two create it configured for any number of tasks. Tasks can be of two different types: parameterless methods and methods taking one parameter of the IOmniJoinState type.

type
  TOmniJoinDelegate = reference to procedure (const joinState:
                        IOmniJoinState);
 
  Parallel = class
    class function Join: IOmniParallelJoin; overload;
    class function Join(const task1, task2: TProc): IOmniParallelJoin;
                     overload;
    class function Join(const task1, task2: TOmniJoinDelegate): 
                     IOmniParallelJoin; overload;
    class function Join(const tasks: array of TProc): IOmniParallelJoin;
                     overload;
    class function Join(const tasks: array of TOmniJoinDelegate): 
                     IOmniParallelJoin; overload;
    ...
  end;

IOmniParallelJoin Interface

Parallel.Join returns an IOmniParallelJoin interface which you can use to specify tasks, start and control execution and handle exceptions.

type
  IOmniParallelJoin = interface
    function Cancel: IOmniParallelJoin;
    function DetachException: Exception;
    function Execute: IOmniParallelJoin;
    function FatalException: Exception;
    function IsCancelled: boolean;
    function IsExceptional: boolean;
    function NumTasks(numTasks: integer): IOmniParallelJoin;
    function OnStop(const stopCode: TProc): IOmniParallelJoin;
    function Task(const task: TProc): IOmniParallelJoin; overload;
    function Task(const task: TOmniJoinDelegate): IOmniParallelJoin;
               overload;
    function TaskConfig(const config: IOmniTaskConfig): IOmniParallelJoin;
    function NoWait: IOmniParallelJoin;
    function WaitFor(timeout_ms: cardinal): boolean;
  end;

The most important of these functions is Execute. It starts an appropriate number of background threads and begins executing tasks in those threads. By default, Join uses as many threads as there are tasks, but you can override this behaviour by calling the NumTasks function.
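As a minimal sketch of overriding the thread count, the fragment below runs three tasks while asking for two threads. The procedure name is hypothetical, the Sleep calls stand in for real work, and the comment about how tasks are distributed over the threads is an assumption based on the description above, not a statement about the library's scheduling.

```delphi
// Assumes OtlParallel (and Windows or SysUtils for Sleep) in the uses clause.
// RunThreeTasksOnTwoThreads is a hypothetical name used only in this sketch.
procedure RunThreeTasksOnTwoThreads;
begin
  Parallel.Join([
    procedure begin Sleep(300); end,  // stand-in for real work
    procedure begin Sleep(300); end,
    procedure begin Sleep(300); end
  ]).NumTasks(2)  // ask for two threads instead of the default three
    .Execute;     // synchronous: returns after all three tasks have completed
end;
```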

You can add tasks to the Join by calling the Task function. In fact, that's just how the Parallel.Join overloads are implemented.

class function Parallel.Join(const task1, task2: TProc): IOmniParallelJoin;
begin
  Result := TOmniParallelJoin.Create.Task(task1).Task(task2);
end;
 
class function Parallel.Join(const tasks: array of TProc): IOmniParallelJoin;
var
  aTask: TProc;
begin
  Result := TOmniParallelJoin.Create;
  for aTask in tasks do
    Result.Task(aTask);
end;
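The same builder style can be used directly in application code. The sketch below starts from the parameterless Parallel.Join overload and adds tasks one at a time with Task before calling Execute. The procedure name and the task bodies are placeholders.

```delphi
// Assumes OtlParallel (and Windows or SysUtils for Sleep) in the uses clause.
// BuildJoinStepByStep is a hypothetical name used only in this sketch.
procedure BuildJoinStepByStep;
var
  join: IOmniParallelJoin;
  i: integer;
begin
  join := Parallel.Join;    // empty IOmniParallelJoin, no tasks yet
  for i := 1 to 3 do
    join.Task(
      procedure
      begin
        Sleep(100);         // stand-in for real work
      end);
  join.Execute;             // runs the three tasks and waits for them
end;
```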

To set up a task configuration block, call the TaskConfig function. The following example uses TaskConfig to set up a critical section which is then used in two parallel tasks to protect a shared resource. Workers use the IOmniJoinState instance to access the IOmniTask interface and, through it, the Lock property.

FSharedValue := 42;
Parallel.Join(
  procedure (const joinState: IOmniJoinState)
  var
    i: integer;
  begin
    for i := 1 to 1000000 do begin
      joinState.Task.Lock.Acquire;
      FSharedValue := FSharedValue + 17;
      joinState.Task.Lock.Release;
    end;
  end,
  procedure (const joinState: IOmniJoinState)
  var
    i: integer;
  begin
    for i := 1 to 1000000 do begin
      joinState.Task.Lock.Acquire;
      FSharedValue := FSharedValue - 17;
      joinState.Task.Lock.Release;
    end;
  end
).TaskConfig(Parallel.TaskConfig.WithLock(CreateOmniCriticalSection))
 .Execute;

By default, Join will wait for all background tasks to complete execution. Alternatively, you can call the NoWait function, after which Join will just start the tasks and return immediately. If you want to be notified when all tasks are finished, you can assign a termination handler by calling the OnStop function. This termination handler is called from one of the worker threads, not from the main thread! If you need to run code in the main thread, use the task configuration block.
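A minimal sketch of the asynchronous form with a termination handler follows. TMainForm and its FJoin field (of type IOmniParallelJoin) are hypothetical; the field is there so that the Join stays referenced after the method returns. The handler only writes to the debugger output, because it runs in a worker thread.

```delphi
// Assumes OtlParallel and Windows in the uses clause.
// TMainForm and FJoin are hypothetical names used only in this sketch.
procedure TMainForm.StartBackgroundWork;
begin
  FJoin := Parallel.Join(
    procedure
    begin
      Sleep(1000);  // stand-in for real work
    end,
    procedure
    begin
      Sleep(1500);  // stand-in for real work
    end
  ).NoWait          // Execute will return immediately
   .OnStop(
    procedure
    begin
      // Called from a worker thread: do not touch the UI here.
      OutputDebugString('All Join tasks have finished');
    end)
   .Execute;
end;
```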

You can also call WaitFor to wait for the Join to finish. WaitFor takes a timeout in milliseconds; pass INFINITE to wait as long as needed.
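The fragment below sketches a bounded wait on a Join that was started with NoWait. It assumes that WaitFor returns True when all tasks have completed and False when the timeout expires first; the interface only shows that the result is a boolean. The procedure name is hypothetical, and the Cancel call only has an effect if the tasks poll IsCancelled.

```delphi
// Assumes OtlParallel and Windows (for INFINITE) in the uses clause.
// WaitWithTimeout is a hypothetical name used only in this sketch.
procedure WaitWithTimeout(const join: IOmniParallelJoin);
begin
  // Assumption: False means the two-second timeout expired first.
  if not join.WaitFor(2000) then
    join.Cancel.WaitFor(INFINITE);  // request cancellation, then wait
end;
```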

IOmniJoinState Interface

Tasks can be defined to accept a parameter of type IOmniJoinState. This allows them to access the IOmniTask interface, participate in cooperative cancellation and check for exceptions.

type
  IOmniJoinState = interface
    function  GetTask: IOmniTask;
  //
    procedure Cancel;
    function  IsCancelled: boolean;
    function  IsExceptional: boolean;
    property Task: IOmniTask read GetTask;
  end;

Cancellation

Join background tasks support cooperative cancellation. If you are using TOmniJoinDelegate tasks (that is, tasks accepting the IOmniJoinState parameter), any task can call the Cancel method of this interface. This, in turn, sets an internal cancellation flag which can be queried by calling the IsCancelled method. That way, one task can interrupt the other tasks, provided that they test IsCancelled repeatedly.
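As a small illustration of one task interrupting another, the sketch below has the first task call Cancel once it is done while the second task polls IsCancelled between slices of work. The procedure name and the timings are made up for the example.

```delphi
// Assumes OtlParallel (and Windows or SysUtils for Sleep) in the uses clause.
// FirstFinisherCancelsTheRest is a hypothetical name used only in this sketch.
procedure FirstFinisherCancelsTheRest;
begin
  Parallel.Join(
    procedure (const joinState: IOmniJoinState)
    begin
      Sleep(300);        // pretend this task found what it was looking for
      joinState.Cancel;  // set the shared cancellation flag
    end,
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      for i := 1 to 100 do begin
        if joinState.IsCancelled then
          break; //for
        Sleep(50);       // one slice of work
      end;
    end
  ).Execute;
end;
```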

The main thread can also cancel its subtasks (when using NoWait) by calling IOmniParallelJoin.Cancel and can test the cancellation flag by calling IsCancelled.

The following demo code demonstrates most of the concepts mentioned above.

var
  join: IOmniParallelJoin;
  time: int64;
begin
  FJoinCount.Value := 0;
  FJoinCount2.Value := 0;
  join := Parallel.Join(
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      for i := 1 to 10 do begin
        Sleep(100);
        FJoinCount.Increment;
        if joinState.IsCancelled then
          break; //for
      end;
    end,
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      for i := 1 to 10 do begin
        Sleep(200);
        FJoinCount2.Increment;
        if joinState.IsCancelled then
          break; //for
      end;
    end
  ).NoWait.Execute;
  Sleep(500);
  time := DSiTimeGetTime64;
  join.Cancel.WaitFor(INFINITE);
  Log(Format('Waited %d ms for joins to terminate',
    [DSiElapsedTime64(time)]));
  Log(Format('Tasks counted up to %d and %d',
    [FJoinCount.Value, FJoinCount2.Value]));
end;

The call to Parallel.Join starts two tasks. Because NoWait is used, the call returns immediately and stores the resulting IOmniParallelJoin interface in the local variable join. The main code then sleeps for half a second, cancels the execution and immediately waits for the background tasks to terminate.

Both tasks execute a simple loop which waits a little, increments a counter and checks the cancellation flag. Because the cancellation flag is set after 500 ms, we would expect five or six repetitions of the first loop and three repetitions of the second loop. The first loop finishes its fifth repetition at almost exactly 500 ms, so we can't tell which will execute first, Cancel or the fifth IsCancelled check. The second loop is in the middle of its third Sleep(200) when the flag is set, so it completes that repetition at about 600 ms and then exits. That is exactly what the program prints out.

Handling Exceptions

Exceptions in background tasks are caught and re-raised in the WaitFor method. If you are using the synchronous version of Join (without the NoWait modifier), WaitFor is called at the end of the Execute method (in other words, Parallel.Join(…).Execute will re-raise task exceptions). If, however, you are using the asynchronous version (by calling Parallel.Join(…).NoWait.Execute), the exception will only be raised when you wait for the background tasks to complete by calling WaitFor.

You can test for the exception by calling the FatalException function. It will first wait for all background tasks to complete (without raising the exception) and then return the exception object. You can also detach the exception object from the Join and handle it yourself by using the DetachException function.
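The sketch below shows one way these two functions might be combined with an asynchronous Join. It assumes that the caller owns the object returned by DetachException and is responsible for freeing it, which the text above does not state explicitly. Log is the logging helper used in the other listings, and the procedure name is hypothetical.

```delphi
// Assumes OtlParallel and SysUtils in the uses clause.
// InspectJoinException is a hypothetical name used only in this sketch.
procedure InspectJoinException;
var
  join: IOmniParallelJoin;
  exc: Exception;
begin
  join := Parallel.Join(
    procedure begin raise Exception.Create('Task failed'); end,
    procedure begin Sleep(100); end
  ).NoWait.Execute;
  // FatalException waits for the tasks to complete without re-raising.
  if assigned(join.FatalException) then begin
    exc := join.DetachException;
    try
      Log(Format('Join failed with %s: %s', [exc.ClassName, exc.Message]));
    finally
      exc.Free;  // assumption: a detached exception is owned by the caller
    end;
  end;
end;
```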

There's also an IsExceptional function (available in IOmniParallelJoin and IOmniJoinState interfaces) which tells you if any background task has thrown an exception.
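A task can use IsExceptional to stop early once a sibling has failed, in the same way IsCancelled is polled. The following is a sketch under that reading; how quickly the flag becomes visible to other tasks is not specified here. The procedure name is hypothetical and Log is the helper used in the other listings.

```delphi
// Assumes OtlParallel and SysUtils in the uses clause.
// StopWhenSiblingFails is a hypothetical name used only in this sketch.
procedure StopWhenSiblingFails;
begin
  try
    Parallel.Join(
      procedure (const joinState: IOmniJoinState)
      var
        i: integer;
      begin
        for i := 1 to 20 do begin
          if joinState.IsExceptional then
            break; //for
          Sleep(50);  // one slice of work
        end;
      end,
      procedure (const joinState: IOmniJoinState)
      begin
        Sleep(200);
        raise Exception.Create('Second task failed');
      end
    ).Execute;  // synchronous Execute re-raises task exceptions
  except
    on E: EJoinException do
      Log(Format('Join failed: %s', [E.Message]));
  end;
end;
```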

There's an additional complication: because Join executes multiple tasks, there can be multiple background exceptions. To give you full access to those exceptions, Join wraps them in an EJoinException object.

type
  EJoinException = class(Exception)
    constructor Create; reintroduce;
    destructor  Destroy; override;
    procedure Add(iTask: integer; taskException: Exception);
    function  Count: integer;
    property Inner[idxException: integer]: TJoinInnerException 
      read GetInner; default;
  end;

This exception class contains combined error messages from all background tasks in its Message property and allows you to access exception information for all caught exceptions directly with the Inner[] property. The following code demonstrates this.

var
  iInnerExc: integer;
begin
  try
    Parallel.Join([
      procedure begin
        raise ETestException.Create('Exception 1 in Parallel.Join');
      end,
      procedure begin
      end,
      procedure begin
        raise ETestException.Create('Exception 2 in Parallel.Join');
      end]).Execute;
  except
    on E: EJoinException do begin
      Log('Join raised exception %s:%s', [E.ClassName, E.Message]);
      for iInnerExc := 0 to E.Count - 1 do
        Log('  Task #%d raised exception: %s:%s', [E[iInnerExc].TaskNumber,
          E[iInnerExc].FatalException.ClassName,
          E[iInnerExc].FatalException.Message]);
    end;
  end;
end;

The iInnerExc variable loops over all caught exceptions and for each such exception displays the task number (starting with 0), exception class and exception message. This approach allows you to either just log the exception or, if you are interested in details, examine specific inner exceptions and handle them appropriately.
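To illustrate examining specific inner exceptions, the sketch below treats ETestException (the class raised in the example above) differently from any other exception class. The procedure name and the log messages are made up for the example.

```delphi
// Assumes OtlParallel and SysUtils in the uses clause.
// LogInnerExceptions is a hypothetical name used only in this sketch.
procedure LogInnerExceptions(E: EJoinException);
var
  iInnerExc: integer;
begin
  for iInnerExc := 0 to E.Count - 1 do
    if E[iInnerExc].FatalException is ETestException then
      Log(Format('Task #%d raised a test exception: %s',
        [E[iInnerExc].TaskNumber, E[iInnerExc].FatalException.Message]))
    else
      Log(Format('Task #%d raised unexpected %s',
        [E[iInnerExc].TaskNumber, E[iInnerExc].FatalException.ClassName]));
end;
```

It could be called from the `on E: EJoinException do` handler shown above.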

See also demo 48_OtlParallelExceptions.

book/highlevel/join.txt · Last modified: 2012/11/14 11:55 by gabr