The Join abstraction enables you to start multiple background tasks in multiple threads. To create the tasks, call Parallel.Join.
*When you use Parallel.Join, background tasks are started in background threads. By default, Join waits
for all background threads to complete before the control is returned to the caller.*
See also demo
  Parallel.Join(
    procedure
    var
      i: integer;
    begin
      for i := 1 to 8 do begin
        Sleep(200);
        MessageBeep($FFFFFFFF);
      end;
    end,
    procedure
    var
      i: integer;
    begin
      for i := 1 to 10 do begin
        Sleep(160);
        MessageBeep($FFFFFFFF);
      end;
    end
  ).Execute;
This simple program executes two background tasks, each beeping at a different frequency. Each task is coded as an anonymous method, but you can also use a normal method or a normal procedure for the task code.
The Parallel class defines five Join overloads. The first creates an empty IOmniParallelJoin interface. The next two create the same interface configured with two tasks, and the last two create it configured for any number of tasks. Tasks can be of two different types: parameterless methods and methods accepting one parameter of the IOmniJoinState type.
  type
    TOmniJoinDelegate = reference to procedure (const joinState: IOmniJoinState);

    Parallel = class
      class function Join: IOmniParallelJoin; overload;
      class function Join(const task1, task2: TProc): IOmniParallelJoin; overload;
      class function Join(const task1, task2: TOmniJoinDelegate): IOmniParallelJoin; overload;
      class function Join(const tasks: array of TProc): IOmniParallelJoin; overload;
      class function Join(const tasks: array of TOmniJoinDelegate): IOmniParallelJoin; overload;
      ...
    end;
Parallel.Join returns an
IOmniParallelJoin interface which you can use to specify tasks, start and control execution and handle exceptions.
  type
    IOmniParallelJoin = interface
      function Cancel: IOmniParallelJoin;
      function DetachException: Exception;
      function Execute: IOmniParallelJoin;
      function FatalException: Exception;
      function IsCancelled: boolean;
      function IsExceptional: boolean;
      function NumTasks(numTasks: integer): IOmniParallelJoin;
      function OnStop(const stopCode: TProc): IOmniParallelJoin;
      function Task(const task: TProc): IOmniParallelJoin; overload;
      function Task(const task: TOmniJoinDelegate): IOmniParallelJoin; overload;
      function TaskConfig(const config: IOmniTaskConfig): IOmniParallelJoin;
      function NoWait: IOmniParallelJoin;
      function WaitFor(timeout_ms: cardinal): boolean;
    end;
The most important of these functions is
Execute. It will start an appropriate number of background threads and start executing tasks in those threads. By default, Join uses as many threads as there are tasks, but you can override this behaviour by calling the NumTasks function.
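To illustrate limiting the thread count, here is a minimal sketch. It assumes that NumTasks (declared in the IOmniParallelJoin interface above) is the function that sets the number of worker threads, and that tasks are shared among the available threads when there are more tasks than threads. The procedure name RunThreeTasksOnTwoThreads is hypothetical.

```delphi
// In the implementation section of a unit; assumes OmniThreadLibrary is on the
// search path and a Delphi version with unit scope names (XE2 or newer).
uses
  System.SysUtils, OtlParallel;

// Hypothetical example procedure, not part of the library.
procedure RunThreeTasksOnTwoThreads;
begin
  // Three tasks are supplied, but NumTasks(2) asks Join to use only two
  // worker threads instead of the default of one thread per task.
  Parallel.Join([
    procedure begin
      Sleep(300);
    end,
    procedure begin
      Sleep(300);
    end,
    procedure begin
      Sleep(300);
    end
  ]).NumTasks(2).Execute;
end;
```

Because Execute is called without NoWait, the call returns only after all three tasks have completed.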
You can add tasks to the
Join by calling the
Task function. In fact, that's just how the
Parallel.Join overloads are implemented.
  class function Parallel.Join(const task1, task2: TProc): IOmniParallelJoin;
  begin
    Result := TOmniParallelJoin.Create.Task(task1).Task(task2);
  end;

  class function Parallel.Join(const tasks: array of TProc): IOmniParallelJoin;
  var
    aTask: TProc;
  begin
    Result := TOmniParallelJoin.Create;
    for aTask in tasks do
      Result.Task(aTask);
  end;
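The same pattern can be used in application code when the number of tasks is only known at run time. The following sketch starts from the parameterless Parallel.Join overload and adds tasks one by one with Task; the procedure name RunDynamicJoin and its parameter are hypothetical.

```delphi
// In the implementation section of a unit; assumes OmniThreadLibrary is on the
// search path.
uses
  System.SysUtils, OtlParallel;

// Hypothetical example procedure, not part of the library.
procedure RunDynamicJoin(taskCount: integer);
var
  i   : integer;
  join: IOmniParallelJoin;
begin
  join := Parallel.Join;          // empty Join, no tasks yet
  for i := 1 to taskCount do
    join.Task(                    // each call adds one more task
      procedure
      begin
        Sleep(100);               // placeholder for real work
      end);
  join.Execute;                   // runs all tasks and waits for them
end;
```

The anonymous method deliberately does not use the loop variable, because Delphi anonymous methods capture variables rather than values.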
To set up a task configuration block, call the
TaskConfig function. The following example uses
TaskConfig to set up a critical section which is then used in two parallel tasks to protect the shared resource. Workers use the
IOmniJoinState instance to access the ''IOmniTask'' interface and through it the ''Lock'' property.
  FSharedValue := 42;
  Parallel.Join(
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      for i := 1 to 1000000 do begin
        joinState.Task.Lock.Acquire;
        FSharedValue := FSharedValue + 17;
        joinState.Task.Lock.Release;
      end;
    end,
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      for i := 1 to 1000000 do begin
        joinState.Task.Lock.Acquire;
        FSharedValue := FSharedValue - 17;
        joinState.Task.Lock.Release;
      end;
    end
  ).TaskConfig(Parallel.TaskConfig.WithLock(CreateOmniCriticalSection))
   .Execute;
Join will wait for all background tasks to complete execution. Alternatively, you can call the
NoWait function, after which
Join will just start the tasks and return immediately. If you want to be notified when all tasks are finished, you can assign the termination handler by calling the
OnStop function. This termination handler is called from one of the worker threads, not from the main thread! If you need to run code in the main thread, use the task configuration block.
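As a rough sketch of the asynchronous mode, the following combines NoWait with an OnStop handler. The procedure name StartInBackground is hypothetical, and OutputDebugString is used only because it is safe to call from a worker thread; it is not something the library requires.

```delphi
// In the implementation section of a unit; assumes OmniThreadLibrary is on the
// search path and a Windows target.
uses
  Winapi.Windows, System.SysUtils, OtlParallel;

// Hypothetical example procedure, not part of the library.
procedure StartInBackground;
var
  join: IOmniParallelJoin;
begin
  join := Parallel.Join(
    procedure begin
      Sleep(300);
    end,
    procedure begin
      Sleep(500);
    end
  ).OnStop(
    procedure
    begin
      // Runs in one of the worker threads, so it must not touch the UI.
      OutputDebugString('All Join tasks have finished');
    end
  ).NoWait.Execute;

  // Execute has already returned here; the caller could do other work.

  // Wait before 'join' goes out of scope, so the interface stays alive
  // until the background tasks are done.
  join.WaitFor(INFINITE);
end;
```

In a real application the interface would more likely be stored in a field of a form or object, so that it outlives the method that started the tasks.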
You can also call
WaitFor to wait for the
Join to finish.
WaitFor accepts an optional timeout parameter; by default it will wait as long as needed.
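A timeout can be combined with cancellation to give background work a deadline. The sketch below assumes that WaitFor returns True when all tasks finished before the timeout expired and False otherwise; the function name RunWithDeadline is hypothetical.

```delphi
// In the implementation section of a unit; assumes OmniThreadLibrary is on the
// search path and a Windows target.
uses
  Winapi.Windows, System.SysUtils, OtlParallel;

// Hypothetical helper: True if the task finished within timeout_ms.
function RunWithDeadline(timeout_ms: cardinal): boolean;
var
  join: IOmniParallelJoin;
begin
  join := Parallel.Join.Task(
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      // About two seconds of work, checking for cancellation every 100 ms.
      for i := 1 to 20 do begin
        if joinState.IsCancelled then
          break; //for
        Sleep(100);
      end;
    end
  ).NoWait.Execute;

  // Assumption: False means the timeout expired before the task completed.
  Result := join.WaitFor(timeout_ms);
  if not Result then
    // Deadline missed: request cancellation, then wait for the task to exit.
    join.Cancel.WaitFor(INFINITE);
end;
```

Under these assumptions, RunWithDeadline(500) would cancel the task after half a second, while RunWithDeadline(5000) would let it run to completion.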
Tasks can be defined to accept a parameter of type
IOmniJoinState. This allows them to access the ''IOmniTask'' interface, participate in the cooperative cancellation and check for exceptions.
  type
    IOmniJoinState = interface
      function  GetTask: IOmniTask;
    //
      procedure Cancel;
      function  IsCancelled: boolean;
      function  IsExceptional: boolean;
      property Task: IOmniTask read GetTask;
    end;
Join background tasks support cooperative cancellation. If you are using TOmniJoinDelegate tasks (that is, tasks accepting the IOmniJoinState parameter), any task can call the Cancel method of this interface. This, in turn, sets an internal cancellation flag which may be queried by calling the IsCancelled method. That way, one task can interrupt other tasks, provided that they are testing IsCancelled.
The main thread can also cancel its subtasks (when using NoWait) by calling IOmniParallelJoin.Cancel and can test the cancellation flag by calling IOmniParallelJoin.IsCancelled.
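Cancellation initiated by a task itself can be sketched as follows: the first task pretends to find a result and calls Cancel, while the second task polls IsCancelled and stops early. The procedure name FirstOneWins is hypothetical.

```delphi
// In the implementation section of a unit; assumes OmniThreadLibrary is on the
// search path.
uses
  System.SysUtils, OtlParallel;

// Hypothetical example procedure, not part of the library.
procedure FirstOneWins;
begin
  Parallel.Join(
    procedure (const joinState: IOmniJoinState)
    begin
      Sleep(200);           // pretend to find the answer
      joinState.Cancel;     // set the shared cancellation flag
    end,
    procedure (const joinState: IOmniJoinState)
    var
      i: integer;
    begin
      // Up to five seconds of work, abandoned once the flag is set.
      for i := 1 to 100 do begin
        if joinState.IsCancelled then
          break; //for
        Sleep(50);
      end;
    end
  ).Execute;
end;
```

Cancellation is cooperative: a task that never checks IsCancelled simply runs to the end.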
The following demo code demonstrates most of the concepts mentioned above.
  var
    join: IOmniParallelJoin;
    time: int64;
  begin
    FJoinCount.Value := 0;
    FJoinCount2.Value := 0;
    join := Parallel.Join(
      procedure (const joinState: IOmniJoinState)
      var
        i: integer;
      begin
        for i := 1 to 10 do begin
          Sleep(100);
          FJoinCount.Increment;
          if joinState.IsCancelled then
            break; //for
        end;
      end,
      procedure (const joinState: IOmniJoinState)
      var
        i: integer;
      begin
        for i := 1 to 10 do begin
          Sleep(200);
          FJoinCount2.Increment;
          if joinState.IsCancelled then
            break; //for
        end;
      end
    ).NoWait.Execute;
    Sleep(500);
    time := DSiTimeGetTime64;
    join.Cancel.WaitFor(INFINITE);
    Log(Format('Waited %d ms for joins to terminate', [DSiElapsedTime64(time)]));
    Log(Format('Tasks counted up to %d and %d', [FJoinCount.Value, FJoinCount2.Value]));
  end;
The call to Parallel.Join starts two tasks. Because NoWait is used, the call returns immediately and stores the resulting IOmniParallelJoin interface in the local variable join. The main code then sleeps for half a second, cancels the execution and immediately waits for the background tasks to terminate.
Both tasks execute a simple loop which waits a little, increments a counter and checks the cancellation flag. Because the cancellation flag is set after 500 ms, we would expect five or six repetitions of the first loop (five repetitions take exactly 500 ms and we can't tell which will execute first, Cancel or the fifth IsCancelled) and three repetitions of the second loop. That is exactly what the program prints out.
Exceptions in background tasks are caught and re-raised in the WaitFor method. If you are using the synchronous version of Join (without the NoWait modifier), then WaitFor is called at the end of the Execute method (in other words, Parallel.Join(…).Execute will re-raise task exceptions). If, however, you are using the asynchronous version (by calling Parallel.Join(…).NoWait.Execute), the exception will only be raised when you wait for the background tasks to complete by calling WaitFor.
You can test for the exception by calling the
FatalException function. It will first wait for all background tasks to complete (without raising the exception) and then return the exception object. You can also detach the exception object from the Join and handle it yourself by using the DetachException function.
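The following sketch shows one way to inspect a task exception without having it re-raised. It assumes that, after DetachException, the caller owns the exception object and is responsible for freeing it; this ownership rule is an assumption and is not stated above. The procedure name InspectWithoutRaising is hypothetical.

```delphi
// In the implementation section of a unit; assumes OmniThreadLibrary is on the
// search path and a Windows target.
uses
  Winapi.Windows, System.SysUtils, OtlParallel;

// Hypothetical example procedure, not part of the library.
procedure InspectWithoutRaising;
var
  exc : Exception;
  join: IOmniParallelJoin;
begin
  join := Parallel.Join(
    procedure begin
      raise Exception.Create('Task failed');
    end,
    procedure begin
      Sleep(100);
    end
  ).NoWait.Execute;

  // FatalException waits for the tasks to finish and does not raise.
  if assigned(join.FatalException) then begin
    // Assumption: after detaching, the caller must free the object.
    exc := join.DetachException;
    try
      OutputDebugString(PChar(exc.ClassName + ': ' + exc.Message));
    finally
      exc.Free;
    end;
  end;
end;
```

When run under the debugger, the IDE may still break on the exception inside the task; that is debugger behaviour and independent of how Join reports the exception.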
There's also an IsExceptional function (available in both the IOmniParallelJoin and IOmniJoinState interfaces) which tells you whether any background task has thrown an exception.
There's an additional complication: as Join executes multiple tasks, there can be multiple background exceptions. To give you full access to those exceptions, Join wraps them into an EJoinException object.
  type
    EJoinException = class(Exception)
      constructor Create; reintroduce;
      destructor  Destroy; override;
      procedure Add(iTask: integer; taskException: Exception);
      function  Count: integer;
      property Inner[idxException: integer]: TJoinInnerException read GetInner; default;
    end;
This exception class contains combined error messages from all background tasks in its
Message property and allows you to access exception information for all caught exceptions directly with the
Inner property. The following code demonstrates this.
  var
    iInnerExc: integer;
  begin
    try
      Parallel.Join([
        procedure begin
          raise ETestException.Create('Exception 1 in Parallel.Join');
        end,
        procedure begin
        end,
        procedure begin
          raise ETestException.Create('Exception 2 in Parallel.Join');
        end]).Execute;
    except
      on E: EJoinException do begin
        Log('Join raised exception %s:%s', [E.ClassName, E.Message]);
        for iInnerExc := 0 to E.Count - 1 do
          Log(' Task #%d raised exception: %s:%s', [E[iInnerExc].TaskNumber,
            E[iInnerExc].FatalException.ClassName,
            E[iInnerExc].FatalException.Message]);
      end;
    end;
  end;
The iInnerExc variable loops over all caught exceptions and, for each such exception, displays the task number (starting with 0), the exception class and the exception message.
This approach allows you to either just log the exception or, if you are interested in details, examine specific inner exceptions and handle them appropriately.
See also demo