Pipeline

The Pipeline abstraction encapsulates multistage processes in which data processing can be split into multiple independent stages connected with data queues.



*When you call Parallel.Pipeline, background tasks for all stages are started, each in its own thread. The system also creates the input queue, the output queue and the interconnecting queues.*

See also demo 41_Pipeline.

Example:

uses
  OtlCommon,
  OtlCollections,
  OtlParallel;
 
var
  sum: integer;
 
sum := Parallel.Pipeline
  .Stage(
    procedure (const input, output: IOmniBlockingCollection)
    var
      i: integer;
    begin
      for i := 1 to 1000000 do
        output.Add(i);
    end)
  .Stage(
    procedure (const input: TOmniValue; var output: TOmniValue)
    begin
      output := input.AsInteger * 3;
    end)
  .Stage(
    procedure (const input, output: IOmniBlockingCollection)
    var
      sum: integer;
      value: TOmniValue;
    begin
      sum := 0;
      for value in input do
        Inc(sum, value);
      output.Add(sum);
    end)
  .Run.Output.Next;

This example creates a three-stage pipeline. The first stage generates numbers from 1 to 1,000,000. The second stage multiplies each number by 3. The third stage adds all the numbers together and writes the result to the output queue. This result is then stored in the variable sum.

Background

The Pipeline abstraction is appropriate for parallelizing processes that can be split into stages (subprocesses) connected with data queues. Data flows from the (optional) input queue into the first stage, where it is partially processed and then emitted into an intermediary queue. The first stage then continues execution, processes more input data and outputs more data. This continues until the complete input is processed. The intermediary queue leads into the next stage, which does its processing in a similar manner, and so on. At the end, the data is written to a queue which can then be read and processed by the program that created this multistage process. As a whole, the multistage process acts as a pipeline — data comes in, data comes out (and a miracle occurs in-between ;)).

What is important here is that no stage shares state with any other stage. The only interaction between stages is done with the data passed through the intermediary queues. The quantity of data, however, doesn't have to be constant. It is entirely possible for a stage to generate more or less data than it received on the input queue.

In a classical single-threaded program the execution plan for a multistage process is very simple.

In a multithreaded environment, however, we can do better than that. Because the stages are largely independent, they can be executed in parallel.

Basics

A pipeline is created by calling the Parallel.Pipeline function, which returns an IOmniPipeline interface. There are three overloaded versions of this function. The first creates an unconfigured pipeline. The second prepares one or more stages and optionally sets the input queue. The third does the same for stages with a different method signature (one that also receives an IOmniTask parameter).

TPipelineStageDelegate = reference to procedure (const input, output:
  IOmniBlockingCollection);
TPipelineStageDelegateEx = reference to procedure (const input, output:
  IOmniBlockingCollection; const task: IOmniTask);
 
class function Pipeline: IOmniPipeline; overload;
class function Pipeline(const stages: array of TPipelineStageDelegate;
  const input: IOmniBlockingCollection = nil): IOmniPipeline; overload;
class function Pipeline(const stages: array of TPipelineStageDelegateEx;
  const input: IOmniBlockingCollection = nil): IOmniPipeline; overload;
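
For illustration, a minimal sketch using the second overload. StageGenerate, StageMult2 and StageSum are the example stages shown in the Generators, Mutators, and Aggregators section below; the sketch is not taken from the library documentation.

var
  pipeline: IOmniPipeline;
  sum     : integer;
begin
  // All three stages are passed in a single open array;
  // the optional input queue parameter is left at its default (nil).
  pipeline := Parallel.Pipeline([StageGenerate, StageMult2, StageSum]).Run;
  sum := pipeline.Output.Next;  // read the single aggregated result
end;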

Stages are implemented as anonymous procedures, procedures or methods taking two queue parameters — one for input and one for output. Both queues are created automatically by the Pipeline implementation and passed to the stage delegate; the only exception is the first stage's input queue, which may instead be provided by your code (see From below).

Pipeline also supports the concept of simple stages, where the stage method accepts a TOmniValue input and provides a TOmniValue output. In this case, OmniThreadLibrary provides the loop which reads data from the input queue, calls your stage code for each value and writes the result to the output queue.

TPipelineSimpleStageDelegate = reference to procedure(
  const input: TOmniValue; var output: TOmniValue);

A simple stage can produce zero or one data elements for each input. If the code assigns a value to the output parameter, this value is written to the output queue; if it does not, nothing is written.
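
For illustration, a minimal sketch of a simple stage that passes only even numbers through (the stage name is made up for this example):

procedure StageFilterEven(const input: TOmniValue; var output: TOmniValue);
begin
  // Assign 'output' only for even numbers; when 'output' is left
  // unassigned, nothing is written to the output queue.
  if (input.AsInteger mod 2) = 0 then
    output := input;
end;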

IOmniPipeline

All Parallel.Pipeline overloads return the IOmniPipeline interface.

IOmniPipeline = interface
  function  GetInput: IOmniBlockingCollection;
  function  GetOutput: IOmniBlockingCollection;
//
  procedure Cancel;
  function  From(const queue: IOmniBlockingCollection): IOmniPipeline;
  function  HandleExceptions: IOmniPipeline;
  function  NumTasks(numTasks: integer): IOmniPipeline;
  function  OnStop(const stopCode: TProc): IOmniPipeline;
  function  Run: IOmniPipeline;
  function  Stage(
    pipelineStage: TPipelineSimpleStageDelegate; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stage(
    pipelineStage: TPipelineStageDelegate; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stage(
    pipelineStage: TPipelineStageDelegateEx; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(
    const pipelineStages: array of TPipelineSimpleStageDelegate;
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(
    const pipelineStages: array of TPipelineStageDelegate; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(
    const pipelineStages: array of TPipelineStageDelegateEx; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Throttle(numEntries: integer; unblockAtCount: integer = 0): 
    IOmniPipeline;
  function  WaitFor(timeout_ms: cardinal): boolean;
  property Input: IOmniBlockingCollection read GetInput;
  property Output: IOmniBlockingCollection read GetOutput;
end;

Various Stage and Stages overloads can be used to define one or more stages and optionally configure them with a task configuration block.

The Run function does all the hard work. It creates queues and sets up OmniThreadLibrary tasks.

The OnStop function assigns a termination handler, that is, code that will be called when all pipeline stages stop working. This termination handler is called from one of the worker threads, not from the main thread! If you need to run code in the main thread, use the task configuration block on the last stage.

Another possibility is to use the variation of OnStop that accepts a delegate with an IOmniTask parameter. You can then use ''task.Invoke'' to execute code in the context of the main thread.
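
A minimal sketch of this approach, assuming the OnStop overload described above (IOmniTask comes from the OtlTask unit; Log stands in for your own main-thread method):

Parallel.Pipeline
  .Stage(StageGenerate)
  .Stage(StageSum)
  .OnStop(
    procedure (const task: IOmniTask)
    begin
      // Invoke schedules the anonymous method for execution
      // in the context of the main thread.
      task.Invoke(
        procedure
        begin
          Log('Pipeline completed');
        end);
    end)
  .Run;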

The From function sets the input queue, which is passed to the first stage in the input parameter. If your code does not call this function, OmniThreadLibrary automatically creates the input queue for you. The input queue can be accessed through the Input property.
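
A short sketch of providing your own input queue (the queue contents are arbitrary; StageMult2 is shown in the Generators, Mutators, and Aggregators section below):

var
  inputQueue: IOmniBlockingCollection;
  pipeline  : IOmniPipeline;
begin
  // Create and fill the input queue before starting the pipeline
  inputQueue := TOmniBlockingCollection.Create;
  inputQueue.Add(1);
  inputQueue.Add(2);
  inputQueue.Add(3);
  inputQueue.CompleteAdding;
 
  pipeline := Parallel.Pipeline
    .From(inputQueue)    // passed to the first stage as 'input'
    .Stage(StageMult2)
    .Run;
end;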

The output queue is always created automatically. It can be accessed through the Output property. Even if the last stage doesn't produce any result, this queue can be used to signal the end of computation. [When each stage ends, CompleteAdding is automatically called on its output queue. This allows the next stage to detect the end of input (the blocking collection enumerator will exit, or TryTake will return False). The same goes for the pipeline's output queue.]

The WaitFor function waits up to timeout_ms milliseconds for the pipeline to finish its work. It returns True if all stages have processed their data before the time interval expires.
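
A short sketch of waiting for the pipeline to complete (pipeline is an IOmniPipeline variable; the five-second timeout is arbitrary):

pipeline := Parallel.Pipeline
  .Stage(StageGenerate)
  .Stage(StageSum)
  .Run;
 
// Give all stages up to five seconds to finish their work
if not pipeline.WaitFor(5000) then
  pipeline.Cancel;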

The NumTasks function sets the number of parallel execution tasks for the stage(s) just added with the Stage(s) function (in other words, call Stage first and NumTasks immediately after). If it is called before any stage is added, it specifies the default for all stages. The number of parallel execution tasks for a specific stage can then still be overridden by calling NumTasks after that Stage call. See the Parallel stages section below for more information.

The Throttle function sets the throttling parameters for the stage(s) just added with the Stage(s) function. Just like NumTasks, it affects either the global default or just the currently added stage(s). By default, throttling is set to 10240 elements. See the Throttling section below for more information.

The HandleExceptions function changes the stage wrapper code so that it passes exceptions from the input queue to the stage code. Just like NumTasks, it affects either the global default or just the currently added stage(s). See the Exceptions section below for more information.

An example will help explain all this.

Parallel.Pipeline
  .Throttle(102400)
  .Stage(StageGenerate)
  .Stage(StageMult2)
  .Stages([StageMinus3, StageMod5])
    .NumTasks(2)
  .Stage(StageSum)
  .Run;

First, a global throttling parameter is set. It will be applied to all stages. Two stages are then added, each with a separate call to the Stage function.

Another two stages are then added with one call. They are both set to execute in two parallel tasks. At the end another stage is added and the whole setup is executed.

The complete process will use seven tasks (one for StageGenerate, one for StageMult2, two for StageMinus3, two for StageMod5 and one for StageSum).

Generators, Mutators, and Aggregators

Let's take a look at three different examples of multiprocessing stages.

A stage may accept no input and just generate an output. This will only happen in the first stage. (A middle stage accepting no input would render the whole pipeline rather useless.)

procedure StageGenerate(const input, output: IOmniBlockingCollection);
var
  i: integer;
begin
  for i := 1 to 1000000 do
    output.Add(i);
end;

A stage may also read data from the input and generate the output. Zero, one or more elements could be generated for each input.

procedure StageMult2(const input, output: IOmniBlockingCollection);
var
  value: TOmniValue;
begin
  for value in input do
    output.Add(2 * value.AsInteger);
end;

The last example is a stage that reads data from the input, performs some operation on it and returns the aggregation of this data.

procedure StageSum(const input, output: IOmniBlockingCollection);
var
  sum  : integer;
  value: TOmniValue;
begin
  sum := 0;
  for value in input do
    Inc(sum, value);
  output.Add(sum);
end;

All examples are just special cases of the general principle — there is no required correlation between the amount of input data and the amount of output data. There is also absolutely no requirement that the data must all be numbers. Feel free to pass around anything that can be stored in a TOmniValue.

Throttling

In some cases, a large amount of data may be passed through the multistage process. If one stage is suspended for some time — or if it performs a calculation that is slower than the calculation in the previous stage — its input queue may fill up with data, which can cause lots of memory to be allocated and later released. To even out the data flow, Pipeline uses throttling.

Throttling sets the maximum size of the blocking collection (in TOmniValue units). When the specified number of data items is stored in the collection, no more data can be added; the Add function simply blocks until the collection is empty enough or CompleteAdding has been called. The collection is deemed empty enough when the data count drops below a threshold, which can either be passed as the second parameter to the Throttle function or, if that parameter is not provided, is calculated as 3/4 of the maximum size.
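
A sketch of per-stage throttling with an explicit unblock threshold (the numbers are arbitrary):

Parallel.Pipeline
  .Stage(StageGenerate)
  .Stage(StageMult2)
    .Throttle(1000, 500)  // stage just added: block at 1000 queued items,
                          // resume adding when the count drops below 500
  .Stage(StageSum)
  .Run;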

Parallel Stages

Usually, one thread is started for each stage in the pipeline. In some specialized cases, however, it may be desirable to run more than one parallel task for each stage.

There is always only one queue between two stages, even if a stage is processed by multiple tasks. This is possible because IOmniBlockingCollection supports multiple readers and multiple writers in a thread-safe manner.

There's an important caveat, though. If you split a stage into multiple tasks, data will be processed in an indeterminate order. You cannot know how many items will be processed by each task or in which order they will be processed. Even worse — data will exit a multi-task stage in an indeterminate order (output from one task will be interleaved with output from the other tasks). At the moment there is no way to enforce the original ordering.
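
For illustration, a sketch of a middle stage running in four parallel tasks; the summing stage still receives every value, just in an unpredictable order:

Parallel.Pipeline
  .Stage(StageGenerate)
  .Stage(StageMult2)
    .NumTasks(4)  // four tasks share one input queue and one output queue
  .Stage(StageSum)
  .Run;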

Exceptions

In the Pipeline abstraction, exceptions are also passed along the pipeline (through the interconnecting queues) from stage to stage.

If an unhandled exception occurs in a stage, it is caught by the wrapper code and enqueued to the stage's output queue. When a data element containing an exception is read by the next stage, it automatically raises an exception, which is again caught and passed to that stage's output queue. In this way, an exception progresses through the pipeline until it is finally inserted into the pipeline's output queue.

If you want to handle exceptions in one of the stages, call the HandleExceptions function after declaring that stage. (You can also call this function before declaring any stage; that way it will change the behaviour of all stages.) You can then call the IsException and AsException functions on the input value to check whether it contains an exception and to access that exception.

Handling the last (output) stage is slightly different. If you don't want exceptions to be reraised when data is read from the pipeline output, you have to turn the reraise flag off on the output queue by calling pipeline.Output.ReraiseExceptions(false).
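
A minimal sketch of reading the output with reraising turned off (pipeline and value are declared as in the example below; Log stands in for your own logging method):

pipeline.Output.ReraiseExceptions(false);
for value in pipeline.Output do
  if value.IsException then begin
    Log('Exception: ' + value.AsException.Message);
    value.AsException.Free;  // the caught exception object must be released
  end
  else
    Log(value.AsString);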

The following example demonstrates the use of exception-handling functions.

procedure StageException1(const input: TOmniValue; var output: TOmniValue);
begin
  output := input.AsInteger * 42;
end;
 
procedure StageException2(const input, output: IOmniBlockingCollection);
var
  outVal: TOmniValue;
  value : TOmniValue;
begin
  for value in input do begin
    if value.IsException then begin
      value.AsException.Free;
      outVal.Clear;
    end
    else
      outVal := 1 / value.AsInteger;
    if not output.TryAdd(outVal) then
      break; //for
  end;
end;
 
procedure TfrmOtlParallelExceptions.btnPipeline1Click(Sender: TObject);
var
  pipeline: IOmniPipeline;
  value   : TOmniValue;
begin
  //Stage 2 should accept and correct stage 1 exception 
  //(third output will be empty)
  pipeline := Parallel.Pipeline
    .Stage(StageException1)
    .Stage(StageException2)
      .HandleExceptions
    .Run;
 
  // Provide input
  with pipeline.Input do begin
    // few normal elements
    Add(1);
    Add(2);
    // then trigger the exception in the first stage;
    // this exception should be 'corrected' in the second stage
    Add('three');
    Add(4);
    CompleteAdding;
  end;
 
  // Process output; there should be no exception in the output collection
  for value in pipeline.Output do
    Log(value.AsString);
end;

See also demo 48_OtlParallelExceptions.

Examples

A practical example of Pipeline usage can be found in the chapter Web Download and Database Storage.
