The Pipeline abstraction encapsulates multistage processes: the data processing is split into multiple independent stages connected with data queues.
See also demo
uses OtlCommon, OtlCollections, OtlParallel;

var
  sum: integer;

sum := Parallel.Pipeline
  .Stage(
    procedure (const input, output: IOmniBlockingCollection)
    var
      i: integer;
    begin
      for i := 1 to 1000000 do
        output.Add(i);
    end)
  .Stage(
    procedure (const input: TOmniValue; var output: TOmniValue)
    begin
      output := input.AsInteger * 3;
    end)
  .Stage(
    procedure (const input, output: IOmniBlockingCollection)
    var
      sum  : integer;
      value: TOmniValue;
    begin
      sum := 0;
      for value in input do
        Inc(sum, value);
      output.Add(sum);
    end)
  .Run.Output.Next;
This example creates a three-stage pipeline. The first stage generates numbers from 1 to 1,000,000. The second stage multiplies each number by 3. The third stage adds all the numbers together and writes the result to the output queue. This result is then stored in the local variable sum.
The Pipeline abstraction is appropriate for parallelizing processes that can be split into stages (subprocesses) connected with data queues. Data flows from the (optional) input queue into the first stage, where it is partially processed and then emitted into an intermediary queue. The first stage then continues execution, processes more input data and outputs more data. This continues until the complete input is processed. The intermediary queue leads into the next stage, which does its processing in a similar manner, and so on. At the end, the data is written to a queue which can then be read and processed by the program that created this multistage process. As a whole, a multistage process acts as a pipeline: data comes in, data comes out (and a miracle occurs in-between ;)).
What is important here is that no stage shares state with any other stage. The only interaction between stages happens through the data passed along the intermediary queues. The quantity of data, however, doesn't have to be constant. It is entirely possible for a stage to generate more or less data than it received from its input queue.
In a classical single-threaded program the execution plan for a multistage process is very simple.
In a multithreaded environment, however, we can do better than that. Because the stages are largely independent, they can be executed in parallel.
A pipeline is created by calling the Parallel.Pipeline function, which returns an IOmniPipeline interface. There are three overloaded versions of this function. The first creates an unconfigured pipeline. The second prepares one or more stages and optionally sets the input queue. The third does the same for stages with a different method signature.
TPipelineStageDelegate = reference to procedure (
  const input, output: IOmniBlockingCollection);
TPipelineStageDelegateEx = reference to procedure (
  const input, output: IOmniBlockingCollection;
  const task: IOmniTask);

class function Pipeline: IOmniPipeline; overload;
class function Pipeline(
  const stages: array of TPipelineStageDelegate;
  const input: IOmniBlockingCollection = nil): IOmniPipeline; overload;
class function Pipeline(
  const stages: array of TPipelineStageDelegateEx;
  const input: IOmniBlockingCollection = nil): IOmniPipeline; overload;
Stages are implemented as anonymous procedures, procedures or methods taking two queue parameters, one for input and one for output. Except in the first stage, where the input queue may not be defined, both queues are automatically created by the Pipeline implementation and passed to the stage delegate.
Pipeline also supports the concept of simple stages, where the stage method accepts a TOmniValue input and provides a TOmniValue output. In this case, OmniThreadLibrary provides the loop that reads data from the input queue, calls your stage code and writes data to the output queue.
TPipelineSimpleStageDelegate = reference to procedure (
  const input: TOmniValue; var output: TOmniValue);
A simple stage can produce zero or one data element for each input. If the code assigns a value to the output parameter, this value is written to the output queue; if it does not, nothing is written.
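This makes a simple stage a convenient way to express a filter. The following sketch (a hypothetical StageFilterEven, not part of the library) forwards only even numbers; odd inputs produce no output because output is left unassigned:

```pascal
// Hypothetical simple stage: passes even numbers through, drops odd ones.
procedure StageFilterEven(const input: TOmniValue; var output: TOmniValue);
begin
  if input.AsInteger mod 2 = 0 then
    output := input; // assigned -> written to the output queue
  // left unassigned -> nothing is written for this input
end;
```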
All Parallel.Pipeline overloads return the IOmniPipeline interface.
IOmniPipeline = interface
  function  GetInput: IOmniBlockingCollection;
  function  GetOutput: IOmniBlockingCollection;
  //
  procedure Cancel;
  function  From(const queue: IOmniBlockingCollection): IOmniPipeline;
  function  HandleExceptions: IOmniPipeline;
  function  NumTasks(numTasks: integer): IOmniPipeline;
  function  OnStop(const stopCode: TProc): IOmniPipeline;
  function  Run: IOmniPipeline;
  function  Stage(pipelineStage: TPipelineSimpleStageDelegate;
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stage(pipelineStage: TPipelineStageDelegate;
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stage(pipelineStage: TPipelineStageDelegateEx;
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(const pipelineStages: array of TPipelineSimpleStageDelegate;
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(const pipelineStages: array of TPipelineStageDelegate;
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(const pipelineStages: array of TPipelineStageDelegateEx;
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Throttle(numEntries: integer;
    unblockAtCount: integer = 0): IOmniPipeline;
  function  WaitFor(timeout_ms: cardinal): boolean;
  property Input: IOmniBlockingCollection read GetInput;
  property Output: IOmniBlockingCollection read GetOutput;
end;
The Stages overloads can be used to define one or more stages and optionally configure them with a task configuration block.
The Run function does all the hard work: it creates the queues and sets up the OmniThreadLibrary tasks.
The OnStop function assigns a termination handler, code that will be called when all pipeline stages stop working. This termination handler is called from one of the worker threads, not from the main thread! If you need to run code in the main thread, use the task configuration block on the last stage.
Another possibility is to use the variation of OnStop that accepts a delegate with an IOmniTask parameter. You can then use task.Invoke to execute code in the context of the main thread.
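A minimal sketch of both variants follows; the handler bodies are placeholders and StageGenerate is the example stage defined later in this section:

```pascal
// Plain termination handler; executed in one of the worker threads.
Parallel.Pipeline
  .Stage(StageGenerate)
  .OnStop(
    procedure
    begin
      // runs in a worker thread after all stages have stopped
    end)
  .Run;

// Variation with an IOmniTask parameter; per the text above, task.Invoke
// can be used to execute code in the context of the main thread.
Parallel.Pipeline
  .Stage(StageGenerate)
  .OnStop(
    procedure (const task: IOmniTask)
    begin
      task.Invoke(
        procedure
        begin
          // executed in the main thread
        end);
    end)
  .Run;
```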
The From function sets the input queue, which is passed to the first stage in its input parameter. If your code does not call this function, OmniThreadLibrary automatically creates the input queue for you. The input queue can be accessed through the Input property.
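For illustration, a sketch of feeding a pre-created queue into a pipeline (variable names are illustrative; StageMult2 is the example stage defined later in this section):

```pascal
var
  inQueue : IOmniBlockingCollection;
  pipeline: IOmniPipeline;
begin
  inQueue := TOmniBlockingCollection.Create;
  inQueue.Add(1);
  inQueue.Add(2);
  inQueue.CompleteAdding; // signal the end of input
  pipeline := Parallel.Pipeline
    .From(inQueue)        // inQueue becomes the first stage's input
    .Stage(StageMult2)
    .Run;
end;
```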
The output queue is always created automatically and can be accessed through the Output property. Even if the last stage doesn't produce any result, this queue can be used to signal the end of computation. When each stage ends, CompleteAdding is automatically called on its output queue. This allows the next stage to detect the end of input (the blocking collection enumerator will exit, or TryTake will return False). The same goes for the final output queue.
The WaitFor function waits up to timeout_ms milliseconds for the pipeline to finish its work. It returns True if all stages have processed their data before the time interval expires.
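A sketch of a timeout-guarded run (the five-second timeout and the error handling are illustrative; StageGenerate and StageSum are the example stages defined later in this section):

```pascal
var
  pipeline: IOmniPipeline;
  sum     : integer;
begin
  pipeline := Parallel.Pipeline
    .Stage(StageGenerate)
    .Stage(StageSum)
    .Run;
  if pipeline.WaitFor(5000) then
    sum := pipeline.Output.Next // all stages are done; result is ready
  else
    raise Exception.Create('Pipeline did not finish within 5 seconds');
end;
```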
The NumTasks function sets the number of parallel execution tasks for the stage(s) just added with the Stage(s) function (in other words, call Stage followed by NumTasks). If it is called before any stage is added, it specifies the default for all stages. The number of parallel execution tasks for a specific stage can then still be overridden by calling NumTasks after that stage's Stage call. See the Parallel stages section below for more information.
The Throttle function sets the throttling parameters for the stage(s) just added with the Stage(s) function. Just like NumTasks, it affects either the global defaults or only the currently added stage(s). By default, throttling is set to 10240 elements. See the Throttling section below for more information.
The HandleExceptions function changes the stage wrapper code so that it passes exceptions from the input queue to the stage code. Just like NumTasks, it affects either the global defaults or only the currently added stage(s). See the Exceptions section below for more information.
An example will help explain all this.
Parallel.Pipeline
  .Throttle(102400)
  .Stage(StageGenerate)
  .Stage(StageMult2)
  .Stages([StageMinus3, StageMod5])
    .NumTasks(2)
  .Stage(StageSum)
  .Run;
First, a global throttling parameter is set; it will apply to all stages. Two stages are then added, each with a separate call to the Stage function.
Another two stages are then added with one call. They are both set to execute in two parallel tasks. At the end another stage is added and the whole setup is executed.
The complete process will use seven tasks (one for StageGenerate, one for StageMult2, two for StageMinus3, two for StageMod5 and one for StageSum).
Let's take a look at three different examples of multiprocessing stages.
A stage may accept no input and just generate an output. This will only happen in the first stage. (A middle stage accepting no input would render the whole pipeline rather useless.)
procedure StageGenerate(const input, output: IOmniBlockingCollection);
var
  i: integer;
begin
  for i := 1 to 1000000 do
    output.Add(i);
end;
A stage may also read data from the input and generate the output. Zero, one or more elements could be generated for each input.
procedure StageMult2(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    output.Add(2 * value.AsInteger);
end;
The last example is a stage that reads data from the input, performs some operation on it and returns the aggregation of this data.
procedure StageSum(const input, output: IOmniBlockingCollection);
var
  sum  : integer;
  value: TOmniValue;
begin
  sum := 0;
  for value in input do
    Inc(sum, value);
  output.Add(sum);
end;
All examples are just special cases of the general principle: no correlation is required between the amount of input data and the amount of output data. There's also absolutely no requirement that the data must be all numbers. Feel free to pass around anything that can be contained in a TOmniValue.
In some cases, a large amount of data may be passed through the multistage process. If one stage is suspended for some time, or if it performs a calculation that is slower than the calculation in the previous stage, this stage's input queue may fill up with data, which can cause lots of memory to be allocated and later released. To even out the data flow, Pipeline uses throttling.
Throttling sets the maximum size of the blocking collection (in TOmniValue units). When the specified number of data items is stored in the collection, no more data can be added; the Add function simply blocks until the collection is empty enough or CompleteAdding has been called. The collection is deemed empty enough when the data count drops below a threshold, which is either passed as the second parameter to the Throttle function or, if that parameter is not provided, calculated as 3/4 of the maximum size.
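As a sketch, the following sets a custom limit and unblock threshold for one stage only (the specific values 1000 and 250 are illustrative; the stage names are the examples from this section):

```pascal
// Throttle only the stage just added: producers block when 1000 items
// are queued and are unblocked again when the count drops to 250.
Parallel.Pipeline
  .Stage(StageGenerate)
  .Stage(StageMult2)
    .Throttle(1000, 250)
  .Stage(StageSum)
  .Run;
```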
Usually, one thread is started for each stage in the pipeline. In some specialized cases, however, it may be desirable to run more than one parallel task for a stage.
There's always only one queue sitting between stages, even if there are multiple processing units for a stage. This is easily accomplished because IOmniBlockingCollection supports multiple readers and multiple writers in a thread-safe manner.
There's an important caveat, though. If you split a stage into multiple tasks, data will be processed in an indeterminate order. You cannot know how many items will be processed by each task or in which order they will be processed. Even worse, data will exit a multitask stage in an indeterminate order (data output from one task will be interleaved with data from the other tasks). As of this moment, there's no way to enforce the original ordering.
In the Pipeline abstraction, exceptions are also passed along the pipeline (through the interconnecting queues) from stage to stage.
If an unhandled exception occurs in a stage, it is caught by the wrapper code and enqueued to the output queue. When the data element containing the exception is read by the next stage, it automatically raises an exception, which is again caught and passed to that stage's output queue. In this way, the exception progresses through the pipeline and is finally inserted into the pipeline's output queue.
If you want to handle exceptions in one of the stages, call the HandleExceptions function after declaring that stage. (You can also call this function before declaring any stage; that way it changes the behaviour for all stages.) You can then call the IsException and AsException functions on the input value to check whether it contains an exception and to access that exception.
Handling the last (output) stage is slightly different. If you don't want exceptions to be reraised when data is read from the pipeline output, you have to turn the reraise-exceptions flag off on the output queue.
The following example demonstrates the use of exception-handling functions.
procedure StageException1(const input: TOmniValue; var output: TOmniValue);
begin
  output := input.AsInteger * 42;
end;

procedure StageException2(const input, output: IOmniBlockingCollection);
var
  outVal: TOmniValue;
  value : TOmniValue;
begin
  for value in input do begin
    if value.IsException then begin
      value.AsException.Free;
      outVal.Clear;
    end
    else
      outVal := 1 / value.AsInteger;
    if not output.TryAdd(outVal) then
      break; //for
  end;
end;

procedure TfrmOtlParallelExceptions.btnPipeline1Click(Sender: TObject);
var
  pipeline: IOmniPipeline;
  value   : TOmniValue;
begin
  // Stage 2 should accept and correct stage 1 exception
  // (third output will be empty)
  pipeline := Parallel.Pipeline
    .Stage(StageException1)
    .Stage(StageException2)
      .HandleExceptions
    .Run;
  // Provide input
  with pipeline.Input do begin
    // few normal elements
    Add(1);
    Add(2);
    // then trigger the exception in the first stage;
    // this exception should be 'corrected' in the second stage
    Add('three');
    Add(4);
    CompleteAdding;
  end;
  // Process output; there should be no exception in the output collection
  for value in pipeline.Output do
    Log(value.AsString);
end;
See also demo
A practical example of Pipeline usage can be found in the chapter Web Download and Database Storage.