The OmniThreadLibrary distribution includes plenty of demo applications that will help you get started. They are stored in the tests subfolder. This chapter lists all of the demos.
0_Beep - The simplest possible OmniThreadLibrary threading code.
1_HelloWorld - Threaded “Hello, World” with a TOmniEventMonitor component created at runtime.
2_TwoWayHello - Hello, World with bidirectional communication; TOmniEventMonitor created at runtime.
3_HelloWorld_with_package - Threaded “Hello, World” with a TOmniEventMonitor component on the form.
4_TwoWayHello_with_package - Hello, World with bidirectional communication; TOmniEventMonitor component on the form.
5_TwoWayHello_without_loop - Hello, World with bidirectional communication, the OTL way.
6_TwoWayHello_with_object_worker - Obsolete; almost identical to demo 5_TwoWayHello_without_loop.
7_InitTest - Demonstrates .WaitForInit, .ExitCode, .ExitMessage, and .SetPriority.
8_RegisterComm - Demonstrates creation of additional communication channels.
9_Communications - Simple communication subsystem tester.
10_Containers - Full-blown communication subsystem tester; used to verify the correctness of the lock-free code.
11_ThreadPool - Thread pool demo.
13_Exceptions - Demonstrates exception catching.
14_TerminateWhen - Demonstrates .TerminateWhen and .WithCounter.
15_TaskGroup - Task group demo.
17_MsgWait - Demonstrates .MsgWait and Windows message processing inside tasks.
18_StringMsgDispatch - Calling task methods by name and address.
19_StringMsgBenchmark - Benchmarks various ways of task method invocation.
20_QuickSort - Parallel QuickSort demo.
21_Anonymous_methods - Demonstrates the use of anonymous methods as task workers in Delphi 2009 and newer.
22_Termination - Tests for .Terminate and .Terminated.
23_BackgroundFileSearch - Demonstrates file scanning in a background thread.
24_ConnectionPool - Demonstrates how to create a connection pool with OmniThreadLibrary.
25_WaitableComm - Demo for ReceiveWait and SendWait.
26_MultiEventMonitor - How to run multiple event monitors in parallel.
27_RecursiveTree - Parallel tree processing.
28_Hooks - Demo for the new hook system.
29_ImplicitEventMonitor - Demo for OnMessage and OnTerminated, named-method approach.
30_AnonymousEventMonitor - Demo for OnMessage and OnTerminated, anonymous-method approach.
31_WaitableObjects - Demo for the RegisterWaitObject/UnregisterWaitObject API.
32_Queue - Stress test for TOmniBaseQueue and TOmniQueue.
33_BlockingCollection - Stress test for TOmniBlockingCollection; also demonstrates the use of Environment to set process affinity.
34_TreeScan - Parallel tree scan using TOmniBlockingCollection.
35_ParallelFor - Parallel tree scan using Parallel.ForEach (Delphi 2009 and newer).
37_ParallelJoin - Parallel.Join demo.
38_OrderedFor - Ordered parallel for loops.
40_Mandelbrot - Very simple parallel graphics demo.
41_Pipeline - Multistage parallel processes.
42_MessageQueue - Stress test for TOmniMessageQueue.
43_InvokeAnonymous - Demo for IOmniTask.Invoke.
44_Fork-Join QuickSort - QuickSort implemented using Parallel.ForkJoin.
45_Fork-Join max - Max(array) implemented using Parallel.ForkJoin.
46_Async - Demo for Parallel.Async.
47_TaskConfig - Demo for task configuration with Parallel.TaskConfig.
48_OtlParallelExceptions - Exception handling in high-level OTL constructs.
49_FramedWorkers - Multiple frames, each communicating with its own worker task.
50_OmniValueArray - Wrapping arrays, hashes, and records in TOmniValue.
51_PipelineStressTest - Pipeline stress test by Anton Alisov.
52_BackgroundWorker - Demo for the Parallel.BackgroundWorker abstraction.
53_AsyncAwait - Demo for the Async/Await abstraction.
54_LockManager - Lock manager (IOmniLockManager<K>) demo.
55_ForEachProgress - Demonstrates progress bar updating from a ForEach loop.
56_RunInvoke - Simplified ‘run & invoke’ low-level API.
57_For - Simple and fast parallel for.
58_ForVsForEach - Speed comparison between Parallel.ForEach, Parallel.For, and TParallel.For (XE7+).
59_TWaitFor - Demo for the TWaitFor class.
60_Map - Demonstrates the Parallel.Map abstraction.
61_CollectionToArray - Demonstrates the TOmniBlockingCollection.ToArray method.
62_Console - Demonstrates how to use OmniThreadLibrary from a console application.
63_Service - Demonstrates how to use OmniThreadLibrary from a service application.
64_ProcessorGroups_NUMA - Demonstrates how to work with processor groups and NUMA nodes.
65_TimedTask - Demonstrates the Parallel.TimedTask abstraction.
66_ThreadsInThreads - Demonstrates how to start OmniThreadLibrary threads from background threads.
The OmniThreadLibrary distribution includes some complex examples, stored in the examples subfolder. This chapter lists all examples. Many are also explained in the How-to chapter.
Using COM/OLE from OmniThreadLibrary.
Redirecting output from a parallel for loop into a structure that doesn’t support multithreaded access.
Simulation of a report generator, which uses multiple background workers to generate reports; one worker per client.
Writing server-like background processing.
TOmniMessageQueue to communicate with a
Using databases from OmniThreadLibrary.
Delphi/RAD Studio XE8 and newer come with an integrated package manager called GetIt.
With GetIt you can install OmniThreadLibrary with just a few clicks. Start Delphi and open Tools, GetIt Package Manager. Enter ‘omnithreadlibrary’ into the search bar, then click the INSTALL button under the OmniThreadLibrary graphic.
GetIt will download OmniThreadLibrary from Embarcadero’s servers, add the source path to the Library path, and compile and install the design-time package.
To find the demos, look at the Library path. It will contain
something like this at the end:
To find the true path, look into Tools, Options, Environment Options, Environment Variables, where BDSCatalogRepository is defined.
Removing OmniThreadLibrary from Delphi is equally simple. Just open GetIt, select the Installed category and click the UNINSTALL button under the OmniThreadLibrary graphics.
Delphinus is a 3rd party package manager for Delphi XE and newer, similar to Embarcadero’s own GetIt package manager.
Unlike GetIt, which comes integrated into Delphi, you have to install Delphinus manually. Recommended installation procedure is:
Once Delphinus is installed, click Tools, Delphinus and, in the Delphinus window, click the green Refresh icon. When the package list is refreshed, enter ‘omnithreadlibrary’ into the search bar and press <Enter>. Click the OmniThreadLibrary list item to get an additional description in the panel on the right.
To install OmniThreadLibrary, click the Install icon (a blue arrow pointing down towards a disk drive). Be patient, as Delphinus may take some time without displaying any progress on the screen.
When installation is complete, click the Show log button and find in the log the path where OmniThreadLibrary was installed (look for the ‘Adding libpathes’ message). Inside that folder you’ll also find all OmniThreadLibrary demos.
Delphinus will also compile and install the appropriate package, so everything is set up for you.
Removing OmniThreadLibrary from Delphi is equally simple. Just open Delphinus, select the Installed category, select OmniThreadLibrary and click the Remove icon (red circle with white X).
The OtlHooks unit allows your code to hook into internal OmniThreadLibrary processes. Currently you can register notification methods which are called when a thread is created/destroyed, a pool is created/destroyed, or an unhandled exception ‘escapes’ from a task.
Exception filters allow your code to be notified when an unhandled exception occurs in a task. You can also prevent the exception from being stored in the IOmniTask.FatalException property. Use RegisterExceptionFilter to register a custom exception filter and UnregisterExceptionFilter to remove it.
An exception filter can use application-specific logging code to log detailed information about the application state. It can also free the exception object and set it to nil, which prevents the exception from being stored in the IOmniTask.FatalException property. If a filter indicates that filtering is complete (by setting the corresponding parameter to false), further custom exception filters won’t be called. Filters are always called in the order in which they were registered.
Thread notifications allow your code to be notified when a thread is created or destroyed inside the OmniThreadLibrary. This allows OmniThreadLibrary to cooperate with application-specific exception-logging code.
Use RegisterThreadNotification to register a thread notification method and UnregisterThreadNotification to unregister it. The notification method is always called in the context of the thread being created/destroyed.
For example, the following code fragment registers/unregisters OmniThreadLibrary threads with an application-specific thread logger.
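A sketch of such a fragment might look like the following; the callback shape and the ThreadLogger object are assumptions, not the exact OtlHooks API.

```delphi
uses
  Winapi.Windows, OtlHooks;

// Assumed callback shape - see OtlHooks for the real declaration.
procedure NotifyThread(operation: TThreadNotificationType; const threadName: string);
begin
  // Runs in the context of the thread being created/destroyed,
  // so GetCurrentThreadID identifies that thread.
  case operation of
    tntCreate:  ThreadLogger.Attach(GetCurrentThreadID, threadName); // hypothetical logger
    tntDestroy: ThreadLogger.Detach(GetCurrentThreadID);             // hypothetical logger
  end;
end;

// during application startup:
RegisterThreadNotification(NotifyThread);
// during application shutdown:
UnregisterThreadNotification(NotifyThread);
```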
Pool notifications allow your code to be notified when a thread pool is being created or destroyed. This allows the application to modify pool parameters on the fly. Use RegisterPoolNotification to register a pool notification method and UnregisterPoolNotification to unregister it. You can, for example, use the pool notification mechanism to set the Asy_OnUnhandledWorkerException property whenever a thread pool is created.
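In sketch form that could look like this; the notification signature, the popCreate value and the HandleWorkerException handler are assumptions for illustration only.

```delphi
uses
  OtlHooks, OtlThreadPool;

// Assumed callback shape - see OtlHooks for the real declaration.
procedure NotifyPool(const pool: IOmniThreadPool; operation: TOmniPoolOperation);
begin
  if operation = popCreate then // hypothetical enum value
    // attach an application-wide handler to every newly created pool
    pool.Asy_OnUnhandledWorkerException := HandleWorkerException; // hypothetical handler
end;

RegisterPoolNotification(NotifyPool);
```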
This section tries to explain how Parallel.ForEach is implemented.
Let’s start with a very simple piece of code.
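Such code might look like this (a minimal sketch using the OtlParallel unit):

```delphi
uses
  OtlParallel;

Parallel.ForEach(1, 1000).Execute(
  procedure (const value: integer)
  begin
    // empty workload - the loop "does nothing", but in parallel
  end);
```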
This simple code iterates from 1 to 1000 on all available cores in parallel and executes a simple procedure that contains no workload. All in all, the code will do nothing - but it will do it in a very complicated manner.
The ForEach method creates a new TOmniParallelLoop&lt;integer&gt; object (that’s the object that will coordinate parallel tasks) and passes it a source provider - an object that knows how to access the values that are being enumerated (integers from 1 to 1000 in this example).
The OtlDataManager unit contains four different source providers - one for each type of source that can be passed to the ForEach method. If there were a need to extend ForEach with a new enumeration source, I would only have to add a few simple methods to the OtlParallel unit and write a new source provider.
Parallel for tasks are started in InternalExecuteTask. This method first creates a data manager and attaches it to the source provider (compare this with the picture above - there is one source provider and one data manager). Next, it creates an appropriate number of tasks and calls the task-specific delegate method from each one. [This delegate wraps your parallel code and provides it with proper input (and sometimes, output). There are many calls to InternalExecuteTask in the OtlParallel unit, each with a different taskDelegate and each providing support for a different kind of loop.]
The data manager is a field of the TOmniParallelLoop&lt;T&gt; object, so that it can be simply reused from the task delegate. The simplest possible task delegate (below) just creates a local queue and fetches values from the local queue one by one. This results in many local queues - one per task - all connected to the same data manager.
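In sketch form (illustrative names, not the literal OtlParallel source), such a delegate looks like this:

```delphi
// Sketch of the simplest task delegate: one local queue per task,
// all attached to the shared data manager.
procedure TaskDelegate(const task: IOmniTask);
var
  localQueue: TOmniLocalQueue;
  value     : TOmniValue;
begin
  localQueue := oplDataManager.CreateLocalQueue;
  try
    // fetch values one by one until the data manager runs dry
    while (not Stopped) and localQueue.GetNext(value) do
      loopBody(task, value); // the user-supplied anonymous method
  finally
    FreeAndNil(localQueue);
  end;
end;
```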
In case you’re wondering what loopBody is - it is the anonymous method you have passed to the Execute method (or a wrapper around it, created when the PreserveOrder method is called in the high-level code).
All this was designed to provide fast data access (blocking is limited to the source provider, all other interactions are lock-free), good workload distribution (when a task runs out of work before other tasks, it will steal some work from other tasks) and output ordering (when required).
A source provider is an object that fetches data from the enumeration source (the data that was passed to the parallel for) and repackages it into a format suitable for parallel consumption. Currently there are three source providers defined in the OtlDataManager unit.
The first, TOmniIntegerRangeProvider, iterates over integer ranges (just like a ‘normal’ for statement does). As such, it doesn’t really fetch data from an enumeration source but generates it internally. The second iterates over IOmniValueEnumerator, which is a special enumerator that can be accessed from multiple readers and doesn’t require locking; currently this enumerator is only provided by TOmniBlockingCollection. The third iterates over Windows enumerators (IEnumerator) or Delphi enumerators (accessed via GetEnumerator).
All source providers descend from the abstract class TOmniSourceProvider, which provides the common source provider interface. In theory an interface should be used for that purpose, but in practice source providers are very performance-intensive and avoiding interfaces speeds up the program by a measurable amount.
Not all source providers are created equal, and that’s why the function GetCapabilities returns the source provider’s capabilities: TOmniIntegerRangeProvider is both countable (it’s very simple to tell how many values lie between 1 and 10, for example) and fast (it takes the same amount of time to fetch 10 values or 10,000 values), while the other two source providers are neither countable nor fast. The third capability,
spcDataLimit, is obsolete and not used; it was replaced by the spcFast capability.
The other important aspect of a source provider is the GetPackage method. It accesses the source (locking it if necessary), retrieves data and returns it in a data package. The implementation is highly dependent on the source data. For example, the integer source provider just advances its current-low field and returns a data package that doesn’t contain a bunch of values but only the low and high boundaries (and that’s why it is considered fast). The enumerator source provider locks the source, fetches the data and builds a data package value by value. And in the simplest case, the TOmniValueEnumerator source provider just fetches values and builds a data package.
The data manager is the central hub of the OtlDataManager hierarchy. It sits between the multiple local queues and the single source provider and makes sure that all parallel tasks always have some work to do. Two different data managers are implemented at the moment - a countable data manager and a heuristic data manager. The former is used if the source provider is countable and the latter if it is not. Both descend from the abstract class TOmniDataManager.
The main difference between them lies in the function GetNextFromProvider, which reads data from the source provider (by calling its GetPackage method). In the countable data manager this is just a simple forwarder, while in the heuristic data manager this function tries to find a good package size that will allow all parallel tasks to work at full speed.
Each parallel task reads data from a local queue, which is just a simple interface to the data manager. The most important part of a local queue is its GetNext method, which provides the task with the next value.
Each local queue contains a local data package. GetNext first tries to read the next value from that data package. If that fails (the data package is empty - it was already fully processed), it tries to get a new data package from the data manager and (if successful) retries fetching the next value from the (refreshed) data package.
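The logic can be sketched as follows (illustrative names, not the literal OtlDataManager source):

```delphi
// Sketch of the local queue's GetNext
function TOmniLocalQueue.GetNext(var value: TOmniValue): boolean;
begin
  // fast path: take the next value from the local data package
  Result := lqiDataPackage.GetNext(value);
  if not Result then begin
    // package exhausted - ask the data manager for a fresh one
    Result := lqiDataManager.GetNext(lqiDataPackage);
    if Result then
      Result := lqiDataPackage.GetNext(value);
  end;
end;
```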
GetNext in the data manager first tries to get the next package from the source provider (via the private method GetNextFromProvider, which calls the source provider’s GetPackage method). If that fails, it tries to steal part of the workload from another task.
Stealing is the feature that allows all parallel tasks to stay active up to the last value being enumerated. To implement it, the data manager iterates over all local queues and tries to split each local queue’s data package in half. If that succeeds, half of the data package is left in the original local queue and the other half is returned to the local queue that requested more data.
Package splitting is highly dependent on the data type. For example, an integer data package just recalculates its boundaries, while enumerator-based packages must copy data around.
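The data manager side can be sketched like this (again with illustrative names):

```delphi
// Sketch of the data manager's package-level GetNext
function TOmniDataManager.GetNext(package: TOmniDataPackage): boolean;
var
  queue: TOmniLocalQueue;
begin
  // first try to fetch a fresh package from the source provider ...
  Result := GetNextFromProvider(package);
  if not Result then
    // ... then try to steal half of another task's data package
    for queue in dmQueueList do
      if queue.Split(package) then
        Exit(true);
end;
```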
The PreserveOrder modifier is usually used together with the Into modifier. The reason lies in the integration between the Parallel.ForEach infrastructure and your parallel code (the one that is executing as the Execute payload). In the ‘normal’ ForEach, the output from this parallel payload is not defined. You are allowed to generate any output in the payload, but ForEach will know nothing about it. In this case OTL has no way to preserve ordering because - at least from the viewpoint of the library - the parallelized code is producing no output.
When Into is used, however, your code uses a different signature (different parameters). The parallel payload now takes two parameters: the first is - as in the more common case - the input value, while the second receives the output value. As you can see from the example, the parallelized code can produce zero or one output values, but not more.
This small modification changes everything. As the Parallel infrastructure has control over the output parameter, it can manage it internally, associate it with the input, and make sure that the output is generated in the same order as the input.
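For illustration, such a loop might look like this (a sketch; IsPrime is an assumed helper function):

```delphi
uses
  OtlCommon, OtlCollections, OtlParallel;

var
  primeQueue: IOmniBlockingCollection;
begin
  primeQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000)
    .PreserveOrder
    .Into(primeQueue)
    .Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
        if IsPrime(value) then   // assumed helper function
          res := value;          // zero or one output values per input
      end);
  // primeQueue now contains the outputs in input (ascending) order
end;
```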
Let’s look at the innermost code - the part that schedules parallel tasks. When Into is used, InternalExecuteTask executes the following, quite complicated, code.
The important points here are: the oplIntoQueueIntf field contains the value passed to the Into method. The interesting part is hidden in the background: inside the local queue, the data manager and the output buffer.
The first modification lies in the data source. When PreserveOrder is used, each data package knows the source position it was read from. To simplify matters, data package splitting is not used in this case. [And because of that, data stealing cannot be used, causing slightly less effective use of the CPU than in the simpler ForEach case.]
Each local queue has an output buffer set associated with it. Each output buffer set manages two output buffers. One is active and the task is writing into it; the other may be either empty or full. Each output buffer is associated with an input position - just as the data package is.
When we look at data reading/writing from the perspective of one task, everything is very simple. The task reads data from a local queue (which reads it from a data package associated with some position) and writes it to an output buffer (associated with the same position).
The tricky part comes up when the data package is exhausted (the if not Result branch in the code below). First, the currently active buffer is marked as full. This causes NotifyBufferFull to be called (see below). Then, the alternate buffer is activated. This call (ActivateBuffer) will actually block if the alternate buffer is not free. In this case, the current thread is blocked until one of its buffers is written into the output queue.
From this point on, GetNext proceeds in the same way as in the simple ForEach case, except that it sets the active buffer’s position whenever a new data package is read from the data manager.
The other part of the magic happens in NotifyBufferFull, the method that is called from MarkFull. It walks the buffer list and checks if there are any output buffers that are a) full and b) destined for the current output position. Such buffers are copied to the output and returned into use.