The Join abstraction lets you start multiple background tasks in multiple threads. To create the tasks, call Parallel.Join.
*When you use Parallel.Join, background tasks are started in background threads. By default, Join waits
for all background threads to complete before the control is returned to the caller.*
See also demo 37_ParallelJoin.
Example:
    Parallel.Join(
      procedure
      var
        i: integer;
      begin
        for i := 1 to 8 do begin
          Sleep(200);
          MessageBeep($FFFFFFFF);
        end;
      end,
      procedure
      var
        i: integer;
      begin
        for i := 1 to 10 do begin
          Sleep(160);
          MessageBeep($FFFFFFFF);
        end;
      end
    ).Execute;
This simple program executes two background tasks, each beeping at a different rate. Each task is coded as an anonymous method, but you can also use a normal method or a normal procedure for the task code.
The Parallel class defines five Join overloads. The first creates an empty IOmniParallelJoin interface. The next two create the same interface configured with two tasks, and the last two create it configured for any number of tasks. Tasks can be of two different types: parameterless methods and methods that take one parameter of the IOmniJoinState type.
    type
      TOmniJoinDelegate = reference to procedure (const joinState: IOmniJoinState);

      Parallel = class
        class function Join: IOmniParallelJoin; overload;
        class function Join(const task1, task2: TProc): IOmniParallelJoin; overload;
        class function Join(const task1, task2: TOmniJoinDelegate): IOmniParallelJoin; overload;
        class function Join(const tasks: array of TProc): IOmniParallelJoin; overload;
        class function Join(const tasks: array of TOmniJoinDelegate): IOmniParallelJoin; overload;
        ...
      end;
Parallel.Join returns an IOmniParallelJoin interface which you can use to specify tasks, start and control execution and handle exceptions.
    type
      IOmniParallelJoin = interface
        function Cancel: IOmniParallelJoin;
        function DetachException: Exception;
        function Execute: IOmniParallelJoin;
        function FatalException: Exception;
        function IsCancelled: boolean;
        function IsExceptional: boolean;
        function NumTasks(numTasks: integer): IOmniParallelJoin;
        function OnStop(const stopCode: TProc): IOmniParallelJoin;
        function Task(const task: TProc): IOmniParallelJoin; overload;
        function Task(const task: TOmniJoinDelegate): IOmniParallelJoin; overload;
        function TaskConfig(const config: IOmniTaskConfig): IOmniParallelJoin;
        function NoWait: IOmniParallelJoin;
        function WaitFor(timeout_ms: cardinal): boolean;
      end;
The most important of these functions is Execute. It starts the appropriate number of background threads and begins executing tasks in those threads. By default, Join uses as many threads as there are tasks, but you can override this behaviour by calling the NumTasks function.
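As an illustration of NumTasks, here is a minimal sketch that queues four tasks but limits Join to two worker threads. The program name and the Sleep-only task bodies are placeholders, and the unit names (System.SysUtils, OtlParallel) assume a recent Delphi with OmniThreadLibrary on the search path.

```delphi
program JoinNumTasksSketch; // hypothetical program name

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlParallel; // assumed to declare Parallel and IOmniParallelJoin

begin
  // Four tasks are queued, but NumTasks(2) limits Join to two worker threads.
  Parallel.Join([
    procedure begin Sleep(100); end,
    procedure begin Sleep(100); end,
    procedure begin Sleep(100); end,
    procedure begin Sleep(100); end
  ]).NumTasks(2).Execute;
  // Execute is synchronous here, so all four tasks have completed.
  Writeln('All four tasks completed');
end.
```

Limiting the thread count in this way is mostly useful when there are many more tasks than cores.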
You can add tasks to the Join by calling the Task function. In fact, that's just how the Parallel.Join overloads are implemented.
    class function Parallel.Join(const task1, task2: TProc): IOmniParallelJoin;
    begin
      Result := TOmniParallelJoin.Create.Task(task1).Task(task2);
    end;

    class function Parallel.Join(const tasks: array of TProc): IOmniParallelJoin;
    var
      aTask: TProc;
    begin
      Result := TOmniParallelJoin.Create;
      for aTask in tasks do
        Result.Task(aTask);
    end;
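The same pattern can be used from application code. The sketch below starts from the parameterless Join overload and adds tasks one by one with Task before calling Execute. The counter variable and the task bodies are made up for the example, and TInterlocked comes from the Delphi RTL unit System.SyncObjs rather than from OmniThreadLibrary.

```delphi
program JoinTaskChainSketch; // hypothetical program name

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  System.SyncObjs, // TInterlocked
  OtlParallel;

var
  join: IOmniParallelJoin;
  finished: integer; // illustrative counter, not part of the library

begin
  finished := 0;
  join := Parallel.Join; // empty Join, no tasks yet
  join.Task(
    procedure
    begin
      Sleep(100);
      TInterlocked.Increment(finished);
    end);
  join.Task(
    procedure
    begin
      Sleep(150);
      TInterlocked.Increment(finished);
    end);
  join.Execute; // synchronous: returns after both tasks have run
  Writeln('Tasks finished: ', finished);
end.
```

Building the Join incrementally is handy when the number of tasks is only known at run time.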
To set up a task configuration block, call the TaskConfig function. The following example uses TaskConfig to set up a critical section, which is then used in two parallel tasks to protect a shared resource. Workers use the IOmniJoinState instance to access the ''IOmniTask'' interface and, through it, the ''Lock'' property.
    FSharedValue := 42;
    Parallel.Join(
      procedure (const joinState: IOmniJoinState)
      var
        i: integer;
      begin
        for i := 1 to 1000000 do begin
          joinState.Task.Lock.Acquire;
          FSharedValue := FSharedValue + 17;
          joinState.Task.Lock.Release;
        end;
      end,
      procedure (const joinState: IOmniJoinState)
      var
        i: integer;
      begin
        for i := 1 to 1000000 do begin
          joinState.Task.Lock.Acquire;
          FSharedValue := FSharedValue - 17;
          joinState.Task.Lock.Release;
        end;
      end
    ).TaskConfig(Parallel.TaskConfig.WithLock(CreateOmniCriticalSection))
     .Execute;
By default, Join waits for all background tasks to complete execution. Alternatively, you can call the NoWait function, after which Join just starts the tasks and returns immediately. If you want to be notified when all tasks are finished, assign a termination handler by calling the OnStop function. This termination handler is called from one of the worker threads, not from the main thread! If you need to run code in the main thread, use the task configuration block.
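A minimal sketch of the NoWait and OnStop combination follows. Because the handler runs in a worker thread, it only signals an event here, and the main thread waits on that event. TEvent comes from the Delphi RTL (System.SyncObjs). The variable names are illustrative, and it is an assumption of this sketch that OnStop is delivered without a message loop in a console program.

```delphi
program JoinNoWaitSketch; // hypothetical program name

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  System.SyncObjs, // TEvent
  OtlParallel;

var
  join: IOmniParallelJoin;
  allDone: TEvent; // illustrative: signalled from the OnStop handler

begin
  allDone := TEvent.Create(nil, True, False, '');
  try
    join := Parallel.Join(
      procedure begin Sleep(200); end,
      procedure begin Sleep(300); end
    ).NoWait.OnStop(
      procedure
      begin
        // Runs in one of the worker threads, so only thread-safe work here.
        allDone.SetEvent;
      end
    ).Execute;
    Writeln('Execute returned without waiting for the tasks');
    allDone.WaitFor; // block until the OnStop handler has signalled
    Writeln('OnStop handler was called');
  finally
    allDone.Free;
  end;
end.
```

The join variable keeps the IOmniParallelJoin interface alive while the tasks are running.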
You can also call WaitFor to wait for the Join to finish. WaitFor accepts an optional timeout parameter; by default it will wait as long as needed.
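As a sketch of waiting with a timeout, the loop below polls an asynchronous Join every 100 ms. It assumes that WaitFor returns True once all tasks have completed and False when the timeout expires first. The text above does not spell out the meaning of the result, so treat that as an assumption.

```delphi
program JoinWaitForSketch; // hypothetical program name

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlParallel;

var
  join: IOmniParallelJoin;

begin
  join := Parallel.Join(
    procedure begin Sleep(350); end,
    procedure begin Sleep(250); end
  ).NoWait.Execute;

  // Assumption: WaitFor returns False while tasks are still running.
  while not join.WaitFor(100) do
    Writeln('Still waiting for background tasks');

  Writeln('Join has finished');
end.
```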
Tasks can be defined to accept a parameter of type IOmniJoinState. This allows them to access the ''IOmniTask'' interface, participate in the cooperative cancellation and check for exceptions.
    type
      IOmniJoinState = interface
        function GetTask: IOmniTask;
      //
        procedure Cancel;
        function IsCancelled: boolean;
        function IsExceptional: boolean;
        property Task: IOmniTask read GetTask;
      end;
Join background tasks support cooperative cancellation. If you are using TOmniJoinDelegate tasks (that is, tasks accepting the IOmniJoinState parameter), any task can call the Cancel method of this interface. This, in turn, sets an internal cancellation flag, which can be queried by calling the IsCancelled method. That way, one task can interrupt the other tasks, provided that they test IsCancelled repeatedly.
The main thread can also cancel its subtasks (when using NoWait) by calling IOmniParallelJoin.Cancel and can test the cancellation flag by calling IsCancelled.
The following demo code demonstrates most of the concepts mentioned above.
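Cancellation from inside a task can be sketched as follows: the first task stands in for a worker that finds a result and calls Cancel, while the second task polls IsCancelled and leaves its loop early. The counter and the timings are invented for the illustration.

```delphi
program JoinTaskCancelSketch; // hypothetical program name

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  System.SyncObjs, // TInterlocked
  OtlParallel;

var
  steps: integer; // illustrative counter of loop iterations

begin
  steps := 0;
  Parallel.Join(
    procedure (const joinState: IOmniJoinState)
    begin
      Sleep(300);       // pretend to search for a result
      joinState.Cancel; // ask the other task to stop
    end,
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      for i := 1 to 100 do begin
        if joinState.IsCancelled then
          break; // cooperative cancellation: leave the loop voluntarily
        Sleep(50);
        TInterlocked.Increment(steps);
      end;
    end
  ).Execute;
  Writeln('Second task ran ', steps, ' steps before stopping');
end.
```

A task that never checks IsCancelled is not interrupted; it simply runs to completion.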
    var
      join: IOmniParallelJoin;
      time: int64;
    begin
      FJoinCount.Value := 0;
      FJoinCount2.Value := 0;
      join := Parallel.Join(
        procedure (const joinState: IOmniJoinState)
        var
          i: integer;
        begin
          for i := 1 to 10 do begin
            Sleep(100);
            FJoinCount.Increment;
            if joinState.IsCancelled then
              break; //for
          end;
        end,
        procedure (const joinState: IOmniJoinState)
        var
          i: integer;
        begin
          for i := 1 to 10 do begin
            Sleep(200);
            FJoinCount2.Increment;
            if joinState.IsCancelled then
              break; //for
          end;
        end
      ).NoWait.Execute;
      Sleep(500);
      time := DSiTimeGetTime64;
      join.Cancel.WaitFor(INFINITE);
      Log(Format('Waited %d ms for joins to terminate', [DSiElapsedTime64(time)]));
      Log(Format('Tasks counted up to %d and %d', [FJoinCount.Value, FJoinCount2.Value]));
    end;
The call to Parallel.Join starts two tasks. Because NoWait is used, the call returns immediately and the resulting IOmniParallelJoin interface is stored in the local variable join. The main code then sleeps for half a second, cancels the execution and immediately waits for the background tasks to terminate.
Both tasks execute a simple loop which waits a little, increments a counter and checks the cancellation flag. Because the cancellation flag is set after 500 ms, we would expect five or six repetitions of the first loop and three repetitions of the second loop. Five repetitions of the first loop take exactly 500 ms, so we can't tell which will execute first: Cancel or the fifth IsCancelled check. That is exactly what the program prints out.
Exceptions in background tasks are caught and re-raised in the WaitFor method. If you are using the synchronous version of Join (without the NoWait modifier), WaitFor is called at the end of the Execute method (in other words, Parallel.Join(…).Execute will re-raise task exceptions). If, however, you are using the asynchronous version (by calling Parallel.Join(…).NoWait.Execute), the exception will only be raised when you wait for the background tasks to complete by calling WaitFor.
You can test for the exception by calling the FatalException function. It will first wait for all background tasks to complete (without raising the exception) and then return the exception object. You can also detach the exception object from the Join and handle it yourself by using the DetachException function.
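The sketch below shows one way to combine FatalException and DetachException with an asynchronous Join. It assumes that the caller owns the exception object after DetachException and must free it. The text above implies this but does not state it outright.

```delphi
program JoinDetachExceptionSketch; // hypothetical program name

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlParallel;

var
  join: IOmniParallelJoin;
  exc: Exception;

begin
  join := Parallel.Join(
    procedure begin raise Exception.Create('Task failed'); end,
    procedure begin Sleep(100); end
  ).NoWait.Execute;

  // FatalException waits for the tasks to finish and does not raise.
  if assigned(join.FatalException) then begin
    // Assumption: after DetachException the caller owns the object.
    exc := join.DetachException;
    try
      Writeln(exc.ClassName, ': ', exc.Message);
    finally
      exc.Free;
    end;
  end
  else
    Writeln('No task raised an exception');
end.
```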
There's also an IsExceptional function (available in IOmniParallelJoin and IOmniJoinState interfaces) which tells you if any background task has thrown an exception.
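Inside a task, IsExceptional can be used much like IsCancelled. In this sketch the second task stops working once it notices that another task has failed. How quickly the flag becomes visible to the other tasks is not specified above, so the loop simply polls it. The exception re-raised by the synchronous Execute is caught in the main thread.

```delphi
program JoinIsExceptionalSketch; // hypothetical program name

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlParallel;

begin
  try
    Parallel.Join(
      procedure (const joinState: IOmniJoinState)
      begin
        Sleep(100);
        raise Exception.Create('First task failed');
      end,
      procedure (const joinState: IOmniJoinState)
      var
        i: integer;
      begin
        for i := 1 to 50 do begin
          if joinState.IsExceptional then
            break; // another task has failed, so further work is pointless
          Sleep(50);
        end;
      end
    ).Execute;
  except
    // The synchronous Execute re-raises task exceptions in the caller.
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
end.
```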
There's an additional complication: as Join executes multiple tasks, there can be multiple background exceptions. To give you full access to those exceptions, Join wraps them into an EJoinException object.
    type
      EJoinException = class(Exception)
        constructor Create; reintroduce;
        destructor Destroy; override;
        procedure Add(iTask: integer; taskException: Exception);
        function Count: integer;
        property Inner[idxException: integer]: TJoinInnerException read GetInner; default;
      end;
This exception class contains combined error messages from all background tasks in its Message property and allows you to access exception information for all caught exceptions directly with the Inner[] property. The following code demonstrates this.
    var
      iInnerExc: integer;
    begin
      try
        Parallel.Join([
          procedure begin
            raise ETestException.Create('Exception 1 in Parallel.Join');
          end,
          procedure begin
          end,
          procedure begin
            raise ETestException.Create('Exception 2 in Parallel.Join');
          end]).Execute;
      except
        on E: EJoinException do begin
          Log('Join raised exception %s:%s', [E.ClassName, E.Message]);
          for iInnerExc := 0 to E.Count - 1 do
            Log('  Task #%d raised exception: %s:%s', [E[iInnerExc].TaskNumber,
              E[iInnerExc].FatalException.ClassName,
              E[iInnerExc].FatalException.Message]);
        end;
      end;
    end;
The iInnerExc variable loops over all caught exceptions and for each such exception displays the task number (starting with 0), exception class and exception message.
This approach allows you to either just log the exception or, if you are interested in details, examine specific inner exceptions and handle them appropriately.
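Examining specific inner exceptions could look like the sketch below, which treats one exception class as recoverable and reports everything else. EDataMissing and EDiskFull are hypothetical classes defined only for this example; the TaskNumber and FatalException members are the ones used in the demo code above.

```delphi
program JoinInnerExceptionSketch; // hypothetical program name

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlParallel;

type
  EDataMissing = class(Exception); // hypothetical, for illustration only
  EDiskFull = class(Exception);    // hypothetical, for illustration only

var
  i: integer;

begin
  try
    Parallel.Join([
      procedure begin raise EDataMissing.Create('No input data'); end,
      procedure begin raise EDiskFull.Create('Cannot write output'); end
    ]).Execute;
  except
    on E: EJoinException do
      for i := 0 to E.Count - 1 do
        if E[i].FatalException is EDataMissing then
          // Treated as recoverable in this sketch.
          Writeln('Task #', E[i].TaskNumber, ' had no data, skipping')
        else
          Writeln('Task #', E[i].TaskNumber, ' failed: ',
            E[i].FatalException.ClassName, ': ', E[i].FatalException.Message);
  end;
end.
```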
See also demo 48_OtlParallelExceptions.