This part of the book contains practical examples of OmniThreadLibrary usage. Each of them starts with a question that introduces the problem and continues with the discussion of the solution.
The following topics are covered:
Scanning folders and files in a background thread.
How to write ‘blocking’ multithreaded code with a mechanism similar to .NET’s async/await.
Multiple workers downloading data and storing it in a single database.
Redirecting output from a parallel for loop into a structure that doesn’t support multithreaded access.
Using the taskIndex property and a task initializer delegate to provide per-task data storage in Parallel.For.
Writing server-like background processing.
Multiple workers generating data and writing it into a single file.
Using OmniThreadLibrary to create a pool of database connections.
How to sort an array and how to process an array using multiple threads.
Finding data in a tree.
Graphical user interface containing multiple frames where each frame is working as a frontend for a background task.
Using databases from OmniThreadLibrary.
Using COM/OLE from OmniThreadLibrary.
Using OmniThreadLibrary’s TOmniMessageQueue to communicate with a TThread worker.
This solution uses the low-level part of OmniThreadLibrary to implement a file scanner application. It is also available as a demo application.
The user interface is intentionally simple.
The user enters a path and file mask in the edit field and clicks Scan. The application starts a background task which scans the file system and reports found files back to the application, where they are displayed in the listbox.
During the scanning, the main thread stays responsive. You can move the program around, resize it, minimize, maximize and so on.
Besides the components that are visible at runtime, the form contains two non-visual components – an event monitor named OTLMonitor and a timer.
When the user clicks the Scan button, a background task is created.
ScanFolders is the method that will do the scanning (in a background thread). The task will be monitored with the OTLMonitor component, which will receive task messages. OTLMonitor will also tell us when the task has terminated. The input folder and mask are sent to the task as the parameter FolderMask, and the task is started.
The FFileList field is a TStringList that will contain a list of all found files.
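A minimal sketch of what such a Scan handler could look like is shown here. It is not the demo's actual source: the form class and the control names (btnScan, inpFolderMask, lbFiles) are assumptions made for illustration, while CreateTask, MonitorWith, SetParameter and Run are the OmniThreadLibrary calls the text refers to.

```delphi
// Sketch only. Assumes a form with btnScan, inpFolderMask, lbFiles and an
// OTLMonitor (TOmniEventMonitor) component, and these fields:
//   FFileList: TStringList;
//   FScanTask: IOmniTaskControl;
// uses System.Classes, OtlTask, OtlTaskControl, OtlEventMonitor;

procedure TfrmFileScanner.btnScanClick(Sender: TObject);
begin
  FFileList := TStringList.Create;   // will collect all found files
  btnScan.Enabled := false;          // re-enabled when the task terminates
  lbFiles.Clear;
  FScanTask := CreateTask(ScanFolders, 'ScanFolders')
    .MonitorWith(OTLMonitor)         // task messages go to the monitor
    .SetParameter('FolderMask', inpFolderMask.Text)
    .Run;                            // start the background task
end;
```

Keeping the IOmniTaskControl reference in a form field is what later allows the form to terminate the task and to release it when it is done.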
Let’s ignore the scanner details for the moment and skip to the end of the scanning process. When the task has completed its job, OTLMonitor.OnTaskTerminated is called.
At that point, the number of found files is copied to the outFiles edit field and the complete list is assigned to the listbox. The task reference FScanTask is then cleared, which causes the task object to be destroyed, and the Scan button is re-enabled (it was disabled during the scanning process).
We should also handle the possibility of the user closing the program by clicking the ‘X’ button while the background scanner is active. We must catch the OnFormCloseQuery event and tell the task to terminate.
The Terminate method will do two things – tell the task to terminate and then wait for its termination. After that, we simply have to clear the task reference and allow the program to terminate.
Let’s move to the scanning part now. The ScanFolders method (which is the main task method, the one we passed to CreateTask) splits the value of the FolderMask parameter into the folder and mask parts and passes them to the main worker method, ScanFolder.
ScanFolder first finds all subfolders of the selected folder and calls itself recursively for each subfolder. That means that we’ll first process the deepest folders and then proceed to the top of the folder tree.
Then it sends the message MSG_SCAN_FOLDER to the main thread. As a parameter of this message it sends the name of the folder being processed. There’s nothing magical about this message – it is just an arbitrary numeric constant below 65535 (65535 is reserved for internal OmniThreadLibrary use).
ScanFolder then runs the FindFirst/FindNext/FindClose loop for the second time to search for files in the folder. [BTW, if you want to first scan folders nearer to the root, just exchange the two loops and scan for files first and for folders second.] Each file is added to an internal TStringList object which was created just a moment before. When a folder scan is completed, this object is sent to the main thread as a parameter of the MSG_FOLDER_FILES message.
This approach – sending data for one folder – is a compromise between returning the complete set (full scanned tree), which would not provide a good feedback, and returning each file as we detect it, which would unnecessarily put a high load on the system.
Both Find loops test the state of the task.Terminated function and exit immediately if it is True. That allows us to terminate the background task when the user closes the application and OnFormCloseQuery is called.
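The recursive worker described above can be sketched as follows. This is an illustration under stated assumptions rather than the demo's source: the numeric values of the two message constants are arbitrary, and the folder argument is assumed to end with a path delimiter.

```delphi
// uses System.SysUtils, System.Classes, OtlTask, OtlComm;

const
  MSG_SCAN_FOLDER  = 1;  // arbitrary IDs, chosen for this sketch
  MSG_FOLDER_FILES = 2;

procedure ScanFolder(const task: IOmniTask; const folder, mask: string);
var
  err        : integer;
  folderFiles: TStringList;
  S          : TSearchRec;
begin
  // First pass: recurse into subfolders (deepest folders are processed first).
  err := FindFirst(folder + '*.*', faDirectory, S);
  if err = 0 then try
    repeat
      if ((S.Attr and faDirectory) <> 0) and (S.Name <> '.') and (S.Name <> '..') then
        ScanFolder(task, folder + S.Name + PathDelim, mask);
      err := FindNext(S);
    until task.Terminated or (err <> 0);
  finally FindClose(S); end;

  // Tell the main thread which folder is being processed.
  task.Comm.Send(MSG_SCAN_FOLDER, folder);

  // Second pass: collect matching files and send them as one batch.
  folderFiles := TStringList.Create;
  try
    err := FindFirst(folder + mask, 0, S);
    if err = 0 then try
      repeat
        folderFiles.Add(folder + S.Name);
        err := FindNext(S);
      until task.Terminated or (err <> 0);
    finally FindClose(S); end;
  finally
    // Ownership of the list passes to the receiver, which must free it.
    task.Comm.Send(MSG_FOLDER_FILES, folderFiles);
  end;
end;
```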
That’s all that has to be done in the background task, but we still have to process the messages in the main thread. For that, we have to implement the OTLMonitor’s OnTaskMessage event.
If the message is MSG_SCAN_FOLDER, we just copy the folder name to a local field. If the message is MSG_FOLDER_FILES, we copy file names from the parameter (which is a TStringList) to the global FFileList list and destroy the parameter. We also update a local field holding the number of currently found files.
Why don’t we directly update two edit fields on the form (one with the current folder and another with the number of found files)? The background task can send many messages in one second (when processing folders with a small number of files) and there’s no point in displaying them all – the user would never see what was displayed anyway. It would also slow down the GUI because Windows controls would be updated hundreds of times per second, which is never a good idea.
Instead, we just store the strings to be displayed in two form fields and display them from a timer which is triggered three times per second. That will not show all scanned folders and all intermediate file counts, but will still provide the user with sufficient feedback.
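The message handling and the throttled display can be sketched like this. The field names FWaitingFolder and FWaitingCount, the outScanning edit field and the timer name are assumptions; the handler assumes the two-parameter OnTaskMessage signature (task plus message) and the same arbitrary message IDs as in any worker sketch.

```delphi
// Sketch only. Assumed form fields: FFileList: TStringList;
// FWaitingFolder, FWaitingCount: string. The timer interval is assumed
// to be about 333 ms (three updates per second).
// uses System.SysUtils, System.Classes, OtlComm, OtlTaskControl;

procedure TfrmFileScanner.OTLMonitorTaskMessage(const task: IOmniTaskControl;
  const msg: TOmniMessage);
var
  folderFiles: TStringList;
begin
  if msg.MsgID = MSG_SCAN_FOLDER then
    FWaitingFolder := msg.MsgData.AsString      // remember, do not display yet
  else if msg.MsgID = MSG_FOLDER_FILES then begin
    folderFiles := TStringList(msg.MsgData.AsObject);
    FFileList.AddStrings(folderFiles);
    FreeAndNil(folderFiles);                    // the receiver owns the list
    FWaitingCount := IntToStr(FFileList.Count);
  end;
end;

procedure TfrmFileScanner.tmrDisplayStatusTimer(Sender: TObject);
begin
  // Only the most recent values are shown; intermediate ones are dropped.
  if FWaitingFolder <> '' then begin
    outScanning.Text := FWaitingFolder;
    FWaitingFolder := '';
  end;
  if FWaitingCount <> '' then begin
    outFiles.Text := FWaitingCount;
    FWaitingCount := '';
  end;
end;
```

Both methods run in the main thread, so the two string fields need no locking.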
In short – they cannot be implemented, just emulated.
Before showing how to do that, let’s return to the basics and see how async and await could be used if they existed in Delphi.
Let’s assume you’ve inherited this pretty useless code.
Now, your boss says, you have to make it parallel so the user can start three copies of it. (You also have to add two new buttons to the form to start those instances but that’s easy to do.)
There are many ways to solve this problem, some more and some less complicated; I’d like to point out the simplest possible solution. But first, let’s take a detour into .NET waters…
.NET 4.5 introduced a heavily magical concept of ‘async’ and ‘await’. In short, it allows you to write code like this:
[Please note that this is not a supported syntax; that’s just an example of how the .NET syntax could look if Delphi supported it.]
The trick here is that await doesn’t really wait. It relinquishes control back to the main loop, which continues to process events etc. In other words – the rest of the program is running as usual. It may also call another asynchronous function and await on it. Only when an asynchronous function returns (any of them, if there is more than one running) is the control returned to the point of the appropriate await call, and the code continues with the next line. [Carlo Kok wrote a nice article about how await works on the RemObjects Blog.]
Async/await needs extensive compiler support and there’s absolutely no way to write a true async/await clone in Delphi. But … there’s a simple trick which allows us to write the code in almost the same way. It uses OmniThreadLibrary’s Async construct and the magic of anonymous methods.
Async executes its parameter (the delegate containing the Sleep call) in a background thread. When this background task is completed, it executes the second parameter (the OnTerminate delegate) in the main thread. While the background task is working, the main thread spins in its own message loop and runs the user interface – just as it would in the .NET case.
With some syntactic sugar, you can fake a very convincing .NET-like behaviour.
To test, put three buttons on a form and assign the Button1Click handler to all three. Click and enjoy.
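One way to build that syntactic sugar is sketched here. The IAwait/TAwait helper types and the global Async function are illustrative names, not part of OmniThreadLibrary; only Parallel.Async and Parallel.TaskConfig.OnTerminated come from the library.

```delphi
// uses System.SysUtils, OtlParallel, OtlTaskControl;

type
  IAwait = interface
    procedure Await(proc: TProc);
  end;

  TAwait = class(TInterfacedObject, IAwait)
  strict private
    FAsync: TProc;
  public
    constructor Create(async: TProc);
    procedure Await(proc: TProc);
  end;

constructor TAwait.Create(async: TProc);
begin
  inherited Create;
  FAsync := async;
end;

procedure TAwait.Await(proc: TProc);
begin
  // Run FAsync in a background thread; run proc in the calling (main)
  // thread once the background task has terminated.
  Parallel.Async(FAsync,
    Parallel.TaskConfig.OnTerminated(
      procedure
      begin
        proc();
      end));
end;

function Async(proc: TProc): IAwait;
begin
  Result := TAwait.Create(proc);
end;

procedure TForm1.Button1Click(Sender: TObject);
var
  button: TButton;
begin
  button := Sender as TButton;
  button.Caption := 'Working ...';
  button.Enabled := false;
  Async(
    procedure
    begin
      Sleep(5000);                  // stands in for the slow operation
    end).
  Await(
    procedure
    begin
      button.Enabled := true;       // executed back in the main thread
      button.Caption := 'Done!';
    end);
end;
```

Note that, unlike a real await, the handler returns immediately after scheduling the work; everything that must happen afterwards has to live inside the second anonymous method.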
The simplest approach is to create a pipeline with two stages – multiple http retrievers in the first stage and one database writer in the second stage. The number of concurrent http retrievers would have to be determined with some testing. It will depend on the throughput of the internet connection and on the amount of postprocessing done on the retrieved pages.
The first pipeline stage, Retriever, fetches the contents of one page. If the page is fetched correctly, a page description object (not shown in this demo) is created and sent to the output pipeline. Internally (not shown in this demo), TPage.Create could parse the page and extract the data.
As there can be at most one output generated for each input, this stage is implemented as a simple stage, meaning that the Pipeline itself will loop over the input data and call the Retriever for each input.
The second pipeline stage, Inserter, is implemented as a normal stage (so it has to loop internally over all input data). First it establishes a connection to the database, then it loops over all input values (over data from all successfully retrieved pages) and inserts each result into the database. At the end (when there is no more data to process) it closes the database connection.
The main method (ParallelWebRetriever) first sets up and starts the pipeline. Next it feeds URLs to be retrieved into the input pipeline and marks the input pipeline as completed. At the end it waits for the pipeline to complete.
The program will execute as follows:
ParallelWebRetriever starts the pipeline.
ParallelWebRetriever starts inserting URLs into the pipeline’s input queue.
ParallelWebRetriever will run out of URLs and mark the pipeline’s input as completed.
The WaitFor call will exit.
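The two stages and the main method can be sketched as follows. FetchPage is a hypothetical stand-in for a real HTTP client call, TPage is a minimal placeholder for the page description object, and the number of retriever tasks (4) is an arbitrary value that would have to be tuned by testing.

```delphi
// uses Winapi.Windows, System.SysUtils, System.Classes,
//      OtlCommon, OtlCollections, OtlParallel;

type
  TPage = class                      // placeholder page description object
    URL, Contents: string;
    constructor Create(const aURL, aContents: string);
  end;

constructor TPage.Create(const aURL, aContents: string);
begin
  inherited Create;
  URL := aURL;
  Contents := aContents;
end;

// Hypothetical: replace with a real HTTP GET.
function FetchPage(const url: string; var contents: string): boolean;
begin
  contents := '<html>' + url + '</html>';
  Result := true;
end;

// Simple stage: called once per input, produces at most one output.
procedure Retriever(const input: TOmniValue; var output: TOmniValue);
var
  contents: string;
begin
  if FetchPage(input.AsString, contents) then
    output := TPage.Create(input.AsString, contents);
end;

// Normal stage: loops over all input by itself.
procedure Inserter(const input, output: IOmniBlockingCollection);
var
  page   : TOmniValue;
  pageObj: TPage;
begin
  // connect to the database here
  for page in input do begin
    pageObj := TPage(page.AsObject);
    // insert pageObj into the database here
    pageObj.Free;
  end;
  // close the database connection here
end;

procedure ParallelWebRetriever(urlList: TStrings);
var
  pipeline: IOmniPipeline;
  url     : string;
begin
  pipeline := Parallel.Pipeline
    .Stage(Retriever).NumTasks(4)    // several concurrent retrievers
    .Stage(Inserter)                 // a single database writer
    .Run;
  for url in urlList do
    pipeline.Input.Add(url);
  pipeline.Input.CompleteAdding;     // no more URLs will follow
  pipeline.WaitFor(INFINITE);        // wait until Inserter has finished
end;
```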
The best way is to use the built-in capabilities of the For Each abstraction, which allows you to write data to a shared blocking collection. Your program could then read data from this blocking collection and repack it into the target structure.
A solution to this problem is included with the OmniThreadLibrary distribution.
The code first creates a blocking collection that will ‘pipe out’ data from the For Each abstraction.
Next it starts a parallel for loop. It will iterate over all elements in the input list (ForEach), will preserve the order of the original items (PreserveOrder) and will write output into the blocking collection (Into). It will also run in the background without waiting for all input to be processed (NoWait) so that the code in the main thread (for transaction) will continue executing in parallel with the parallel for loop.
The parallel for worker code just creates a TTransaction object from the input line and stores it in the ‘result’ variable. The ForEach code will take this result and store it in the outQueue. If you don’t want to produce a result for the given input value, just don’t set the result.
This code also solves the stopping problem. The for transaction loop will run until all of the parallel for input is processed. Only when the ForEach is truly finished will the for transaction loop exit; ProcessTransaction will then also exit and the object running the parallel for loop will be automatically destroyed.
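Put together, the approach described above could look like the sketch below. TTransaction is a placeholder class whose constructor is assumed to parse one input line, and the output list type (TList<TTransaction>) is an assumption standing in for any structure that is not thread-safe.

```delphi
// uses System.Classes, System.Generics.Collections,
//      OtlCommon, OtlCollections, OtlParallel;

type
  TTransaction = class               // placeholder; parsing is not shown
    Line: string;
    constructor Create(const aLine: string);
  end;

constructor TTransaction.Create(const aLine: string);
begin
  inherited Create;
  Line := aLine;
end;

procedure ProcessTransactions(input: TStringList;
  output: TList<TTransaction>);
var
  outQueue   : IOmniBlockingCollection;
  transaction: TOmniValue;
begin
  outQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(0, input.Count - 1)
    .NoWait                          // do not block the calling thread
    .PreserveOrder                   // keep the order of the input lines
    .Into(outQueue)                  // results are piped into outQueue
    .Execute(
      procedure (const value: integer; var result: TOmniValue)
      begin
        result := TTransaction.Create(input[value]);
      end);
  // Runs in the calling thread only, so 'output' needs no locking.
  for transaction in outQueue do
    output.Add(transaction.AsObject as TTransaction);
end;
```

The worker threads only read from the input list and only write to the blocking collection, which is why the non-thread-safe output list is touched by a single thread.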
If you don’t need the output order to be preserved, you can also run the parallel for loop enumerating directly over the
Below is a full code for a test program, implemented in a single form with a single component – ListBox1.
This can be done by using the Parallel.For taskIndex parameter. The example below also demonstrates the use of a task initializer which is, strictly speaking, not necessary in this case.
A solution to this problem is included with the OmniThreadLibrary distribution.
The code below counts how many numbers in a big array of randomly generated data end in each of the digits 0 to 9 and reports the result at the end. Each worker generates a partial result for its part of the input array, and the results are merged at the end.
Let’s assume we have a big array of randomly generated test data (testData: array of integer).
As we have to prepare data storage for each worker thread, we have to know how many worker threads will be running. Therefore, we set the number explicitly by calling .NumTasks. A good default for the CPU-intensive operation we’ll be executing is to create one worker task for each available core.
Each element of the buckets array will store data for one worker thread.
The for loop is then started with numTasks tasks. For each task, an initializer (a parameter provided to the .Initialize call) is called with the appropriate taskIndex (from 0 to numTasks - 1). The initializer just sets the bucket that is associated with the task to zero. [This could easily be done in the main thread for all tasks at once, but I wanted to show how an initializer can be used.]
Next, .Execute is called and provided with a delegate which accepts two parameters – the task index taskIndex and the current value of the for loop idx. The code determines the last digit of testData[idx] and increments the appropriate slot in the bucket that belongs to the current task.
At the end, partial data is aggregated in the main thread.
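The steps above can be sketched as a single function. The names TBucket and CountLastDigits are illustrative, and the sketch assumes the test data contains only non-negative integers.

```delphi
// uses OtlCommon, OtlParallel;

type
  TBucket = array [0..9] of integer; // one counter per last digit

function CountLastDigits(const data: TArray<integer>): TBucket;
var
  buckets : array of TBucket;        // one bucket per worker task
  testData: TArray<integer>;
  numTasks: integer;
  iTask   : integer;
  digit   : integer;
begin
  testData := data;                  // local reference for the closures
  numTasks := Environment.Process.Affinity.Count;
  SetLength(buckets, numTasks);

  Parallel.For(Low(testData), High(testData))
    .NumTasks(numTasks)
    .Initialize(
      procedure (taskIndex, fromIndex, toIndex: integer)
      begin
        // Clear the per-task storage before the task starts iterating.
        FillChar(buckets[taskIndex], SizeOf(TBucket), 0);
      end)
    .Execute(
      procedure (taskIndex, idx: integer)
      begin
        // Each task writes only to its own bucket, so no locking is needed.
        Inc(buckets[taskIndex][testData[idx] mod 10]);
      end);

  // Merge the partial results in the calling thread.
  FillChar(Result, SizeOf(Result), 0);
  for iTask := 0 to numTasks - 1 do
    for digit := 0 to 9 do
      Inc(Result[digit], buckets[iTask][digit]);
end;
```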
You should keep in mind that this is really just a simplified example, because there is no sense in splitting short strings into characters in multiple threads. A solution to this problem is included with the OmniThreadLibrary distribution.
The solution below implements the master task (although the question mentioned threads I will describe the answer in the context of tasks) as a background worker abstraction because it solves two problems automatically:
The child tasks are implemented as a parallel task abstraction. It allows us to run the same code in multiple parallel tasks at the same time.
To set up a background worker, simply call Parallel.BackgroundWorker and provide it with the code that will process work items (BreakStringHL) and the code that will process the results of the work item processor (ShowResultHL). It is important to keep in mind that the former (BreakStringHL) executes in the background thread while the latter (ShowResultHL) executes in the main thread. [Actually, it executes in the thread which calls Parallel.BackgroundWorker, but in most cases that will be the main thread.]
Tearing it down is also simple. CancelAll is called to cancel any pending work requests, Terminate stops the worker (and waits for it to complete execution), and the final assignment clears the interface variable and destroys the last pieces of the worker.
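A sketch of the setup and teardown, assuming they live in the form's OnCreate and OnDestroy handlers and that the form has a field FBackgroundWorker: IOmniBackgroundWorker (the form class name and the choice of event handlers are assumptions):

```delphi
// uses Winapi.Windows, OtlParallel;
//
// Assumed method signatures on the form:
//   procedure BreakStringHL(const workItem: IOmniWorkItem);
//   procedure ShowResultHL(const Sender: IOmniBackgroundWorker;
//     const workItem: IOmniWorkItem);

procedure TfrmBackgroundWorker.FormCreate(Sender: TObject);
begin
  FBackgroundWorker := Parallel.BackgroundWorker
    .Execute(BreakStringHL)          // runs in a background thread
    .OnRequestDone(ShowResultHL);    // runs in the thread creating the worker
end;

procedure TfrmBackgroundWorker.FormDestroy(Sender: TObject);
begin
  FBackgroundWorker.CancelAll;            // drop pending work requests
  FBackgroundWorker.Terminate(INFINITE);  // stop the worker and wait for it
  FBackgroundWorker := nil;               // release the last reference
end;
```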
BreakStringHL method takes a work item (which will contain the input string), sets up a parallel task abstraction, splits the input string into multiple strings and sends each one to the parallel task.
BreakStringHL is called for each input string that arrives over the communication channel. It first decides how many threads to use (number of cores minus one; the assumption here is that one core is used to run the main thread). One string list is then created for each child subtask. It will contain the results generated from that task.
A parallel task abstraction is then started, running (number of cores minus one) tasks. Each will accept a work unit on an internally created queue, process it and shut down.
Next, the code sends work units to child tasks. Each work unit contains the index of the task (so that it knows where to store the data) and the string to be processed. All child tasks also get the same cancellation token so that they can be cancelled in one go. Child tasks are executed in a thread pool to minimize thread creation overhead.
When all child tasks are completed, partial results are collected into one
TStringList object which is returned as a result of the background worker work item.
The actual string breaking is implemented as a standalone procedure. It checks each input character and signals the cancellation token if the character is an exclamation mark. (This is implemented just as a cancellation testing mechanism.) It exits if the cancellation token is signalled. At the end, Sleep(100) simulates heavy processing and allows the user to click the Cancel button in the GUI before the operation is completed.
The example program uses a simple OnClick handler to send a string for processing.
Results are returned to the ShowResultHL method (as it was passed as a parameter to the OnRequestDone call when creating the background worker).
It receives an IOmniBackgroundWorker interface (useful if you are sharing one method between several background workers) and the work item that was processed (or cancelled). The code simply checks whether the work item was cancelled and, if it was not, displays the result (by using ShowResult from the original code).
The demonstration program also implements a Cancel button which cancels all pending operations.
All not-yet-executing operations will be cancelled automatically. For the string that is currently being processed, a cancellation token will be signalled.
SplitPartialList will notice this token being signalled and will stop processing.
This question comes from StackOverflow. It is reproduced here in a slightly shortened form.
This solution uses Parallel Task abstraction.
The algorithm works as follows:
The tricky part is implementing the third item – ‘find out how many bytes to process in this iteration’ – in a lock-free fashion. What we need is a thread-safe equivalent of the following (completely thread-unsafe) fragment.
OmniThreadLibrary implements a thread-safe version of this pattern in TOmniCounter.Take. If you have a TOmniCounter initialized with some value (say, fileSize) and you call TOmniCounter.Take(numBytes), the code will behave exactly the same as the fragment above, except that it will work correctly if Take is called from multiple threads at the same time. In addition to that, the new value of fileSize will be stored in the TOmniCounter’s counter and returned as a function result.
There’s another version of Take which returns the result in a var parameter and sets its function result to True if the value returned is larger than zero.
This version of Take allows you to write elegant iteration code which also works when multiple tasks are accessing the same counter instance.
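To make the pattern concrete, here is a sketch with two parts: one plausible reading of the thread-unsafe logic (UnsafeTake is a hypothetical name) and an iteration loop built on the two-parameter Take. The block size and the ProcessBlock routine are placeholders.

```delphi
// uses OtlCommon;

// Thread-unsafe reference logic: take at most 'count' units from
// 'remaining' and report how many were actually taken.
function UnsafeTake(var remaining: integer; count: integer): integer;
begin
  if remaining > count then
    Result := count
  else
    Result := remaining;
  remaining := remaining - Result;
end;

// Placeholder for the real per-block work.
procedure ProcessBlock(numBytes: integer);
begin
  Writeln('processing ', numBytes, ' bytes');
end;

// Thread-safe iteration: the same loop can run in several tasks at once
// as long as they all share the same counter instance.
procedure ProcessInBlocks(const counter: IOmniCounter);
const
  CBlockSize = 64 * 1024;            // arbitrary block size
var
  blockBytes: integer;
begin
  while counter.Take(CBlockSize, blockBytes) do
    ProcessBlock(blockBytes);
end;
```

A caller would create the shared counter once, for example with CreateCounter(fileSize), and pass it to every task.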
The solution creates a counter which holds the number of bytes to be generated (unwritten) and a queue that will hold generated data buffers until they are written to a file (outQueue). Then it starts a ParallelTask abstraction on all available cores. While the abstraction is running in the background (because NoWait is used), the main thread continues with the CreateRandomFile execution, reads the data from the outQueue and writes blocks to the file.
The parallel part first creates a random generator in each task. Because the random generator code is not thread-safe, it cannot be shared between the tasks. Next it uses the above-mentioned Take pattern to grab a bunch of work, generates that much random data (inside FillBuffer, which is not shown here) and adds the buffer to the outQueue.
You may be asking yourself how this code will stop. When the unwritten counter drops to zero, Take will fail in every task and the anonymous method running inside the task will exit. When this happens in all tasks, the OnStop handler will be called automatically.
The code above passes Parallel.CompleteQueue to OnStop. This is a special helper which creates a delegate that calls CompleteAdding on its parameter. Therefore, the OnStop handler will call outQueue.CompleteAdding, which will cause the for loop in CreateRandomFile to exit after all data is processed.
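The whole arrangement can be sketched as follows. The FillBuffer shown here is a hypothetical stand-in that uses a trivial per-task linear congruential generator purely for illustration; it is not the generator used by the original solution and is not suitable where real randomness matters.

```delphi
// uses Winapi.Windows, System.SysUtils, System.Classes,
//      OtlCommon, OtlCollections, OtlParallel;
{$Q-}{$R-} // the generator below relies on wrap-around arithmetic

// Hypothetical stand-in: fills 'count' bytes using a per-task seed.
procedure FillBuffer(buf: PByte; count: integer; var seed: cardinal);
var
  i: integer;
begin
  for i := 1 to count do begin
    seed := seed * 1664525 + 1013904223;
    buf^ := seed shr 24;
    Inc(buf);
  end;
end;

procedure CreateRandomFile(fileSize: integer; output: TStream);
const
  CBlockSize = 1 * 1024 * 1024; // 1 MB
var
  buffer   : TOmniValue;
  memStr   : TMemoryStream;
  outQueue : IOmniBlockingCollection;
  unwritten: IOmniCounter;
begin
  outQueue := TOmniBlockingCollection.Create;
  unwritten := CreateCounter(fileSize);
  Parallel.ParallelTask.NoWait
    .NumTasks(Environment.Process.Affinity.Count)
    .OnStop(Parallel.CompleteQueue(outQueue)) // called when all tasks exit
    .Execute(
      procedure
      var
        block       : TMemoryStream;
        bytesToWrite: integer;
        seed        : cardinal;
      begin
        seed := GetCurrentThreadId;           // generator state is per task
        while unwritten.Take(CBlockSize, bytesToWrite) do begin
          block := TMemoryStream.Create;
          block.Size := bytesToWrite;
          FillBuffer(block.Memory, bytesToWrite, seed);
          outQueue.Add(block);
        end;
      end);
  // The calling thread is the only writer to the output stream.
  for buffer in outQueue do begin
    memStr := buffer.AsObject as TMemoryStream;
    output.CopyFrom(memStr, 0);
    FreeAndNil(memStr);
  end;
end;
```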
The thread pool enables connection pooling by providing property
ThreadData: IOtlThreadData to each task. This property is bound to a thread – it is created when a thread is created and is destroyed together with the thread.
To facilitate this, task implements property
ThreadData which contains the user data associated with the thread.
This data is initialized in the thread pool when a new thread is created. It is destroyed automatically when a thread is destroyed.
To initialize the
ThreadData, you have to write a ‘factory’ method, a method that creates thread data interface. Thread pool will call this factory method to create thread data and will then assign the same object to all tasks running in that thread.
You can write two kinds of thread data factory – a ‘normal’ function or a method function (a function that belongs to a class). Both return an IInterface.
Let’s return to the practical part. In the database connection pool scenario, you’d have to write a connection interface, object and factory (see demo application
24_ConnectionPool for the full code).
In the OnCreate event the code creates a thread pool and assigns it a name and a thread data factory. The latter is a function that will create and initialize a new connection for each new thread. In the OnClose event the code terminates all waiting tasks (if any), allowing the application to shut down gracefully.
FConnectionPool is an interface and its lifetime is managed automatically so we don’t have to do anything explicit with it.
CreateThreadData factory just creates a connection object (which would in a real program establish a database connection, for example).
There’s no black magic behind this connection object. It is an object that implements an interface. Any interface. This interface will be used only in your code. In this demo,
TConnectionPoolData contains only one field – unique ID, which will help us follow the program execution.
As this is not a code from a real world application, I didn’t bother connecting it to any specific database.
TConnectionPoolData constructor will just notify the main form that it has begun its job, generate new ID and sleep for five seconds (to emulate establishing a slow connection). The destructor is even simpler, it just sends a notification to the main form.
Creating and running a task is really simple with OmniThreadLibrary.
We are monitoring the task with the TOmniEventMonitor component because a) we want to know when the task will terminate and b) otherwise we would have to store a reference to the task’s IOmniTaskControl interface in a global field.
The task worker procedure TaskProc is again really simple. First it pulls the connection data from the task interface (task.ThreadData as IConnectionPoolData), retrieves the connection ID and sends the task and connection IDs to the main form (for logging purposes), and then it sleeps for three seconds, simulating some heavy database activity.
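The pieces described in this section can be sketched together as shown below. This is an illustration, not the demo's source: the interface GUID is arbitrary, the logging messages are omitted, the form and button names are assumptions, and the use of SetThreadDataFactory and CancelAll on the pool reflects my understanding of the thread pool interface and should be checked against the library version in use.

```delphi
// uses System.SysUtils, System.SyncObjs, OtlTask, OtlTaskControl,
//      OtlThreadPool, OtlEventMonitor;

type
  IConnectionPoolData = interface
    ['{6B1A3F0E-5C2D-4E7A-9F41-2D8C7A5B9E10}'] // arbitrary GUID for 'as'
    function ConnectionID: integer;
  end;

  TConnectionPoolData = class(TInterfacedObject, IConnectionPoolData)
  strict private
    FConnectionID: integer;
  public
    constructor Create;
    function ConnectionID: integer;
  end;

var
  GLastConnectionID: integer; // shared ID generator for this sketch

constructor TConnectionPoolData.Create;
begin
  inherited Create;
  FConnectionID := TInterlocked.Increment(GLastConnectionID);
  Sleep(5000); // emulates establishing a slow database connection
end;

function TConnectionPoolData.ConnectionID: integer;
begin
  Result := FConnectionID;
end;

// Factory: called by the pool once for every new worker thread.
function CreateThreadData: IInterface;
begin
  Result := TConnectionPoolData.Create;
end;

procedure TaskProc(const task: IOmniTask);
var
  connID: integer;
begin
  connID := (task.ThreadData as IConnectionPoolData).ConnectionID;
  // a real task would now use connection 'connID'
  Sleep(3000); // emulates heavy database activity
end;

procedure TfrmConnectionPool.FormCreate(Sender: TObject);
begin
  FConnectionPool := CreateThreadPool('Connection pool');
  FConnectionPool.SetThreadDataFactory(CreateThreadData);
end;

procedure TfrmConnectionPool.btnScheduleClick(Sender: TObject);
begin
  CreateTask(TaskProc).MonitorWith(OTLMonitor).Schedule(FConnectionPool);
end;

procedure TfrmConnectionPool.FormClose(Sender: TObject;
  var Action: TCloseAction);
begin
  FConnectionPool.CancelAll; // terminate tasks still waiting in the queue
end;
```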
Then … but wait! There’s no more! Believe it or not, that’s all. OK, there is some infrastructure code that is used only for logging but that you can look up by yourself.
There is also a code assigned to the second button (Schedule and wait) but it only demonstrates how you can schedule a task and wait on its execution. This is useful if you’re running the task from a background thread.
Let’s run the demo and click the Schedule button.
What happened here?
OK, nothing special. Let’s click the Schedule button again.
Now a new task was created (with ID 4), was scheduled for execution in the same thread as the previous task and reused the connection that was created when the first task was scheduled. There is no 5 second wait, just the 3 second wait implemented in the task worker procedure.
If you now leave the program running for 10 seconds, a message ‘Destroying connection 1’ will appear. The reason for this is that the default thread idle timeout in a thread pool is 10 seconds. In other words, if a thread does nothing for 10 seconds, it will be stopped. You are, of course, free to set this value to any number or even to 0, which would disable the idle thread termination mechanism.
If you now click the Schedule button again, new thread will be created in the thread pool and new connection will be created in our factory function (spending 5 seconds doing nothing).
Let’s try something else. I was running the demo on my laptop with a dual core CPU, which caused the thread pool to limit the maximum number of concurrently executing threads to two. By default, the thread pool uses as many threads as there are cores in the system, but again you can override the value. (In releases [1.0-3.03], you could use at most 60 concurrently executing threads. Starting from release [3.04], this number is only limited by the system resources.)
To recap – when running the demo, the thread pool was limited to two concurrent threads. When I clicked the Schedule button two times in quick succession, the first task was scheduled and the first connection started being established (it entered the Sleep function). Then the second task was created (as the connection is being established from the worker thread, the GUI is not blocked) and the second connection started being established in the second thread. Five seconds later, the connections were created and the tasks started running (each waiting three seconds and then exiting).
Then I clicked the Schedule button two more times. Two tasks were scheduled and they immediately started execution in two worker threads.
For the third demo, I restarted the app and clicked the Schedule button three times. Only two worker threads were created, two connections were established and two tasks started execution. The third task entered the thread pool queue and waited for the first task to terminate, after which it was immediately scheduled.
So here you have it – a very simple way to build a connection pool.
The answer to both parts of the problem is the same – use the Fork/Join abstraction.
Let’s start with a non-optimized single threaded sorter. This simple implementation is very easy to convert to the multithreaded form.
As you can see, the code switches to an insertion sort when the dimension of the array drops below some threshold. This is not really important for the single threaded version (it only brings a small speedup) but it will help immensely with the multithreaded version.
Converting this quicksort to a multithreaded version is quite simple.
Firstly, we have to create a fork/join computation pool. In this example, it is stored in a global field.
Secondly, we have to adapt the QuickSort method.
The code looks much longer but the changes are really simple. Each recursive call to QuickSort is replaced with a call to Compute … and the code Awaits on both subtasks.
Instead of calling
QuickSort directly, parallel version creates
IOmniCompute interface by calling
FForkJoin.Compute. This creates a subtask wrapping the anonymous function which was passed to the
Compute and puts this subtask into the fork/join computation pool.
The subtask is later read from this pool by one of the fork/join workers and is processed in the background thread.
Await checks if the subtask has finished its work. In that case, Await simply returns and the code can proceed. Otherwise (the subtask is still working), Await tries to get one subtask from the computation pool, executes it, and then repeats from the beginning (by checking if the subtask has finished its work). This way, all threads are always busy, either executing their own code or a subtask from the computation pool.
Because the IOmniCompute interfaces are stored on the stack in each QuickSort call, this code uses more stack space than the single threaded version. That is the main reason why the parallel execution is stopped at some level and the simple sequential version is used to sort the remaining fields.
The second part of this how-to finds a maximum element of an array in a parallel way (see demo application
45_Fork-Join max for the full code).
The parallel solution is similar to the quicksort example above, with a few important differences related to the fact that the code must return a value (the quicksort code merely sorted the array, returning nothing).
This directly affects the interface usage – instead of working with IOmniForkJoin and IOmniCompute, the code uses their generic counterparts IOmniForkJoin<T> and IOmniCompute<T>. As our example array contains integers, the parallel code creates IOmniForkJoin<integer> and passes it to the ParallelMax function.
In this example the fork/join computation pool is passed as a parameter. This approach is more flexible but is also slightly slower and – more importantly – uses more stack space.
When the array subrange is small enough, ParallelMax calls the sequential (single threaded) version, just as the parallel QuickSort did, and for the same reason – not to run out of stack space.
With a big subrange, the code creates two IOmniCompute<integer> subtasks, each wrapping a function returning an integer. This function in turn calls back into ParallelMax (but with a smaller range). To get the result of the anonymous function wrapped by Compute, the code calls the Value function. Just as with Await, Value either returns a result (if it was already computed) or executes another fork/join subtask from the computation pool.
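A sketch of this approach is shown below. The global array GData, the helper SequentialMax and the threshold value are assumptions made to keep the example short; the real demo may organize the data differently.

```delphi
// uses System.Math, OtlParallel;

const
  CSeqThreshold = 10000;             // arbitrary cut-off for this sketch

var
  GData: TArray<integer>;            // data to be searched (assumed global)

function SequentialMax(left, right: integer): integer;
var
  i: integer;
begin
  Result := GData[left];
  for i := left + 1 to right do
    if GData[i] > Result then
      Result := GData[i];
end;

function ParallelMax(const forkJoin: IOmniForkJoin<integer>;
  left, right: integer): integer;
var
  computeLeft : IOmniCompute<integer>;
  computeRight: IOmniCompute<integer>;
  mid         : integer;
begin
  if (right - left) < CSeqThreshold then
    Result := SequentialMax(left, right)   // small range: no more forking
  else begin
    mid := (left + right) div 2;
    computeLeft := forkJoin.Compute(
      function: integer
      begin
        Result := ParallelMax(forkJoin, left, mid);
      end);
    computeRight := forkJoin.Compute(
      function: integer
      begin
        Result := ParallelMax(forkJoin, mid + 1, right);
      end);
    // Value waits for (or helps compute) each subtask result.
    Result := Max(computeLeft.Value, computeRight.Value);
  end;
end;
```

A call such as ParallelMax(Parallel.ForkJoin<integer>, Low(GData), High(GData)) would then start the computation on a non-empty array.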
The For Each abstraction can be used to iterate over complicated structures such as trees. The biggest problem is to assure that the code will always stop. The solution below achieves this by using special features of the blocking collection.
The solution to this problem is available as a demo application 35_ParallelFor.
The code in the demo application creates a big tree of TNode nodes. Each node contains a value (Value) and a list of child nodes. TNode also implements a function to return the number of children (NumChild), a function that converts the node into a textual representation (ToString; used for printing out the result) and an enumerator that will allow us to access child nodes in a nice structured fashion (Children). To learn more about the implementation of TNode and its enumerator, see the demo program.
For comparison purposes the demo program implements sequential search function
SeqScan which uses recursion to traverse the tree.
The parallel version of this function is more complicated. It uses a blocking collection that is shared between all
ForEach tasks. This blocking collection contains all nodes that have yet to be traversed. At the beginning, it contains only the root node. Each task executes the following pseudocode:
The real code is more complicated because of two complications. Firstly, when a value is found, all
ForEach tasks must stop, not just the one that had found the value. Secondly, the code must stop if the value we’re searching for is not present in the tree. In the pseudocode above this is automatically achieved by the condition in the
while statement but in reality this is not so easy. At some time the blocking collection may be completely empty when there is still data to be processed. (For example just at the beginning when the first task takes out the root node of the tree. Yes, this does mean that the condition in the
while statement above is not correct.)
The code first creates a cancellation token which will be used to stop the
ForEach loop. Number of tasks is set to number of cores accessible from the process and a blocking collection is created.
Resource count for this collection is initialized to the number of tasks (
numTasks parameter to the
TOmniBlockingCollection.Create). This assures that the blocking collection will be automatically put into the ‘completed’ mode (as if the
CompleteAdding had been called) if
numTasks threads are simultaneously calling
Take and the collection is empty. This prevents the ‘resource exhaustion’ scenario – if all workers are waiting for new data and the collection is empty, then there’s no way for new data to appear and the waiting is stopped by putting the collection into completed state.
The root node of the tree is added to the blocking collection. Then the
Parallel.ForEach is called, enumerating the blocking collection.
The code also passes cancellation token to the
ForEach loop and starts the parallel execution. In each parallel task, the following code is executed (this code is copied from the full
ParaScan example above):
The code is provided with one element from the blocking collection at a time. If the Value field is the value we’re searching for, nodeResult is set, the blocking collection is put into the CompleteAdding state (so that enumerators in other tasks end their blocking wait, if any) and the cancellation token is signalled to stop other tasks that are not blocked.
Otherwise (not the value we’re looking for), all the children of the current node are added to the blocking collection. TryAdd is used (and its return value ignored) because another thread may call CompleteAdding while the for childNode loop is being executed.
The parallel for loop is therefore iterating over a blocking collection into which nodes are put (via the for childNode loop) and from which they are removed (via the ForEach implementation). If child nodes are not provided fast enough, the blocking collection will block on Take and one or more tasks may sleep for some time until new values appear. Only when the value is found are the blocking collection and the ForEach loop completed and cancelled.
The solution to this problem can be split into three parts – the worker, the frame and the binding code in the form unit.
The solution to this problem is available as a demo application 49_FramedWorkers.
In this example (unit test_49_Worker in the demo application), the worker code is intentionally very simple. It implements a timer which, triggered approximately every second, sends a message to the owner. The text of this message is received in the Msg parameter when the task is created. The worker can also respond to a MSG_HELLO message with a response message.
Message IDs (MSG_HELLO and MSG_NOTIFY) are defined in unit test_49_Common as they are shared with the frame implementation.
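The worker listing is not present in this copy. The following is a minimal sketch of a worker that behaves as described, assuming a TOmniWorker descendant with a task timer. The message ID values, the timer interval and the text of the reply are hypothetical, and the SetTimer overload taking a method address should be verified against the OmniThreadLibrary version in use.

```pascal
unit FramedWorkerSketch;

interface

uses
  OtlComm, OtlTask, OtlTaskControl;

const
  // Hypothetical values; the demo defines its IDs in test_49_Common.
  MSG_HELLO  = 1;
  MSG_NOTIFY = 2;

type
  TFramedWorker = class(TOmniWorker)
  strict private
    FMessage: string;
  public
    function  Initialize: boolean; override;
    procedure MsgHello(var msg: TOmniMessage); message MSG_HELLO;
    procedure Timer1;
  end;

implementation

function TFramedWorker.Initialize: boolean;
begin
  Result := inherited Initialize;
  if Result then begin
    // Text to send; set by the owner with SetParameter('Msg', ...).
    FMessage := Task.Param['Msg'].AsString;
    // Fire Timer1 roughly once per second (interval is an assumption).
    Task.SetTimer(1, 1000 + Random(500), @TFramedWorker.Timer1);
  end;
end;

procedure TFramedWorker.MsgHello(var msg: TOmniMessage);
begin
  // Echo the sender's name back so the owner can verify the routing.
  Task.Comm.Send(MSG_NOTIFY, 'Hello, ' + msg.MsgData.AsString);
end;

procedure TFramedWorker.Timer1;
begin
  Task.Comm.Send(MSG_NOTIFY, '... ' + FMessage);
end;

end.
```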
The frame (unit test_49_FrameWithWorker) contains a listbox and a button. It implements a response function for the MSG_NOTIFY message, MsgNotify, and it contains a reference to the worker task. This reference will be set in the main form when the task and the frame are constructed.
The MsgNotify method is automatically called whenever the MSG_NOTIFY message is received by the frame. It merely shows the message contents.
A click on the button sends a MSG_HELLO message to the worker. The name of the frame is sent as a parameter. The worker will include this name in the response so that we can verify that the response is indeed sent to the correct frame.
Five frame/worker pairs are created in the form while it is being created. The code in FormCreate creates and positions each frame and then creates a worker named Frame #%d (where %d is replaced with the sequential number of the frame). Workers are created in the CreateWorker method.
The FormCreate method also creates a task group which is used to terminate all workers when the form is closed.
The final piece of the puzzle is the CreateWorker method. It creates a low-level task and sets its name. The same name is assigned to the Msg parameter so that it will be used in messages sent from the task. The OnMessage call assigns the frame to function as a ‘message processor’ for the task, so all messages from the task will be dispatched to the frame. That’s how the MSG_NOTIFY message ends up being processed by the frame’s MsgNotify method.
The code above also assigns the worker to the frame and adds the worker to the task group.
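As the CreateWorker listing is missing here, the binding step can be sketched as a standalone function. The function name and its parameter list are hypothetical (the demo uses a form method instead), and the frame is passed as a plain TObject so that the sketch does not depend on a form file.

```pascal
unit FramedWorkersBindingSketch;

interface

uses
  OtlTaskControl;

// frame is any object with message-handling methods (for example the frame).
function CreateFramedWorker(const worker: IOmniWorker; const caption: string;
  frame: TObject; const group: IOmniTaskGroup): IOmniTaskControl;

implementation

function CreateFramedWorker(const worker: IOmniWorker; const caption: string;
  frame: TObject; const group: IOmniTaskGroup): IOmniTaskControl;
begin
  Result := CreateTask(worker, caption) // low-level task named after the frame
    .SetParameter('Msg', caption)       // the worker reads this parameter
    .OnMessage(frame)                   // dispatch task messages to the frame
    .Run;
  group.Add(Result);                    // lets the form stop all workers at once
end;

end.
```

A form could call this once per frame from FormCreate, store the returned interface in the frame, and call TerminateAll on the task group when closing.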
For a different approach to the multiple workers problem, see OmniThreadLibrary and Databases.
Using databases with the OmniThreadLibrary can be quite simple at times; on the other hand, it can also be quite tricky. The main problem with databases is that you have to create database components in the thread that will be using them. As visual components (such as TDBGrid) must be initialized from the main thread, this implies that you can’t directly connect database-aware GUI controls to database components.
Because of that you have to devise a mechanism that transfers database data from the task to the main thread (and also, if the database access is not read-only, a mechanism that will send updates to the task so that they can be applied to the database). In most cases this means that you should ignore database-aware components and just build the GUI without them. In some cases, however, you can do a lot by just splitting the existing database infrastructure at the correct point and leaving the GUI part almost unmodified. This example explores such an option.
An example is included with the OmniThreadLibrary distribution in folder
The basis for this article is the well-known Fish Facts demo program, included in Delphi’s Samples folder. This is a simple application that uses database-aware controls to display data from an InterBase database.
I have built a view-only version of Fish Facts called TwoFish which uses two frames, each containing data-aware controls and a background thread which accesses the InterBase data. Both frames are running in parallel and accessing the data at the same time.
To create the TwoFish, I have copied Fish Facts components
IBTable1 into a data module
twoFishDB. This data module contains no code, only these three components. I have also set
Then I created the frame
twoFishDB_GUI which uses the data module
twoFishDB. This frame contains an unconnected
DataSource1 and all data-aware components that are placed on the Fish Facts form –
TDBMemo. They are all connected to the
The main TwoFish program creates two frames. Each frame creates a Background Worker abstraction that (inside the worker task) creates the data module and activates the database and the database table (more details are given below).
When data is to be retrieved, the task creates a TClientDataSet and a TDataSetProvider which ‘pumps’ all data from IBTable1 to the TClientDataSet. This client data set is then sent to the main form which connects it to DataSource1. This automatically causes the data to be displayed in the data-aware controls. To keep the example simple, I have disabled data editing.
The most important points of this example are:
The data module is destroyed before OnDestroy is called (OnCloseQuery is used for this purpose). If you try to destroy the data module from OnDestroy, a deadlock will occur inside the Delphi RTL code.
This example shows a different approach to frame-task interaction than the Multiple Workers with Multiple Frames – here the background worker is managed by the frame itself, not by the main form.
The frame wraps one background task that operates on the database and contains database-aware controls displaying the database data.
The Background Worker abstraction is created in the AfterConstruction method and destroyed in the BeforeDestruction method.
AfterConstruction creates a background worker and specifies the task initializer and finalizer (.Initialize and .Finalize). Delegates provided to these two functions (InitializeDatabase and FinalizeDatabase) are called when the background worker task is created and before it is destroyed.
You may have noticed that no code was provided to execute work items. The reason is that the background worker will execute different types of requests. Instead of writing if ... then tests to detect the work item type and trigger the appropriate code, we’ll pass the executor function together with each request.
BeforeDestruction destroys the background worker and the FDataSet component (we’ll see later why it is used).
The task initializer and finalizer are also very simple: they just create and destroy the data module. The data module is accessible to the background worker through the taskState variable.
Data controls are initially in an unconnected state. They are only connected when the public method OpenConnection is called.
OpenConnection schedules a work request that contains the database name as a parameter. It also sets the executor function (ConnectToDatabase) and an anonymous function that will be executed after the request is processed (OnRequestDone). This anonymous function returns the ‘result’ of the request to the OpenConnection caller by calling the onConnectionOpen parameter. [‘Result’ in this case is exposed as an exception that is triggered if the database connection cannot be established. If the connection can be made, the workItem.FatalException function will return nil.]
The important fact to note is that the OnExecute parameter (ConnectToDatabase) is called from the worker thread and the OnRequestDone parameter (the anonymous function) is called from the thread that created the frame (the main thread).
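The frame's listings are not included in this copy. The sketch below shows the same pattern (task initializer and finalizer, plus a per-request executor and completion handler) on a hypothetical class that stands in for the frame. A TStringList stands in for the data module, and the class, method and callback names are assumptions; the background worker calls follow my understanding of the OtlParallel interface and should be checked against the installed version.

```pascal
unit BackgroundRequestSketch;

interface

uses
  System.SysUtils, System.Classes, OtlCommon, OtlParallel;

type
  // Hypothetical stand-in for the frame; owns one background worker.
  TRequestOwner = class
  strict private
    FWorker: IOmniBackgroundWorker;
    procedure Connect(const workItem: IOmniWorkItem);
  public
    procedure AfterConstruction; override;
    procedure BeforeDestruction; override;
    procedure OpenConnection(const databaseName: string;
      const onConnectionOpen: TProc<Exception>);
  end;

implementation

procedure TRequestOwner.AfterConstruction;
begin
  inherited;
  FWorker := Parallel.BackgroundWorker
    .Initialize(
      procedure (var taskState: TOmniValue)
      begin
        // Runs in the worker thread; a TStringList stands in for the data module.
        taskState.AsObject := TStringList.Create;
      end)
    .Finalize(
      procedure (const taskState: TOmniValue)
      begin
        taskState.AsObject.Free;
      end)
    .Execute; // no global executor; each request brings its own
end;

procedure TRequestOwner.BeforeDestruction;
begin
  if Assigned(FWorker) then begin
    FWorker.Terminate(5000); // timeout value is an arbitrary choice
    FWorker := nil;
  end;
  inherited;
end;

procedure TRequestOwner.Connect(const workItem: IOmniWorkItem);
begin
  // Runs in the worker thread. An exception raised here is captured
  // by the library and later exposed through workItem.FatalException.
  if workItem.Data.AsString = '' then
    raise Exception.Create('Database name is empty');
  TStringList(workItem.TaskState.AsObject).Add(workItem.Data.AsString);
end;

procedure TRequestOwner.OpenConnection(const databaseName: string;
  const onConnectionOpen: TProc<Exception>);
begin
  FWorker.Schedule(
    FWorker.CreateWorkItem(databaseName),
    FWorker.Config
      .OnExecute(Connect)
      .OnRequestDone(
        procedure (const Sender: IOmniBackgroundWorker;
          const workItem: IOmniWorkItem)
        begin
          // Runs in the thread that created the worker.
          if Assigned(onConnectionOpen) then
            onConnectionOpen(workItem.FatalException);
        end));
end;

end.
```

The completion handler is delivered through the owner thread's message loop, so the owner must be created in a thread that processes messages (normally the main thread).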
The data module associated with the worker is accessed through the workItem.TaskState property, which gives you access to the taskState variable initialized in the InitializeDatabase method. The database name is taken from the work item parameter (workItem.Data). The database name is set in the IBDatabase component and the connection is established (Connected := true). If the connection fails, an exception will be raised. This exception is caught by the OmniThreadLibrary and stored in the workItem object, where it is later processed by the anonymous method in OpenConnection.
The Acquire/Release pair is here because of bugs in gds32.dll, the dynamic library that handles connections to InterBase. It turns out that gds32 handles parallel connections to the database perfectly well, as long as they are not established at the same time. In other words, you can communicate with the database on multiple connections at the same time (get data, put data, execute SQL commands …) but you cannot establish connections in parallel. Sometimes it will work, sometimes it will fail with a mysterious access violation error in the gds32 code. That’s why the twoFishDB_GUI unit uses a global critical section to prevent multiple connections from being established at the same time.
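The locking pattern described above can be sketched with the RTL's TCriticalSection alone. The unit, procedure and variable names are hypothetical; the actual connection code is passed in as an anonymous method so that the sketch does not depend on the InterBase components.

```pascal
unit SerializedConnectSketch;

interface

uses
  System.SysUtils, System.SyncObjs;

// Runs connectProc while holding a process-wide lock, so that only one
// thread at a time can be establishing a connection.
procedure ConnectSerialized(const connectProc: TProc);

implementation

var
  GConnectLock: TCriticalSection; // hypothetical name for the global lock

procedure ConnectSerialized(const connectProc: TProc);
begin
  GConnectLock.Acquire;
  try
    connectProc(); // for example: set Connected := true on the database
  finally
    GConnectLock.Release; // released even if the connection attempt raises
  end;
end;

initialization
  GConnectLock := TCriticalSection.Create;

finalization
  GConnectLock.Free;

end.
```

Only the connection step is serialized; once connected, each thread keeps using its own connection without taking the lock.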
To retrieve data from the database, the main unit calls the Reload function. This function is also called inside the frame from the click event on the Reload button.
Reload just schedules a work request without any input. To process the request, LoadData will be called.
LoadData executes in the background worker thread. It uses a temporary TDataSetProvider to copy data to a freshly created TClientDataSet. During this process, a ‘Field not found’ exception is raised twice. If you run the program in the debugger, you’ll see this exception four times (twice for each frame). You can safely ignore the exception as it is handled internally in the Delphi RTL and is not visible to the end-user.
At the end, the TClientDataSet that was created inside LoadData is assigned to workItem.Result. It will be processed (and eventually destroyed) in the main thread.
The DisplayData method executes in the main thread after the request was processed (i.e., the data was retrieved). If there was an exception inside the work item processing code (LoadData), it is displayed. Otherwise, the TClientDataSet is copied from workItem.Result into an internal TfrmTwoFishDB_GUI field and assigned to DataSource1.DataSet. By doing that, all data-aware controls on the frame can access the data.
The main program is fairly simple. In the OnCreate event two frames are created. Frame references are stored in the FFrames form field, declared as array of TfrmTwoFishDB_GUI.
Next, the form is resized to twice the frame size and OpenConnections is called to establish database connections in all frames.
OpenConnections iterates over all frames and calls the OpenConnection method in each one. Two parameters are passed to it: the database name and an anonymous method that will be executed after the connection has been established.
If the connection fails, the FatalException field will contain the exception object raised inside the background worker’s OpenConnection code. In that case, it will be logged. Otherwise, the connection was established successfully and Reload is called to load data into the frame.
Frames are destroyed from OnCloseQuery. It turns out that Delphi (at least XE2) will deadlock if data modules are destroyed in background threads while OnDestroy is running.
To recapitulate, the most important facts about using databases from secondary threads are:
Set TIBDatabase.Connected := true in a critical section because of gds32 bugs.
Destroy frames from OnCloseQuery, not from OnDestroy, if you are using data modules in a secondary thread.
It is actually very simple: you have to remember to call a COM initialization function (CoInitialize or CoInitializeEx) and CoUninitialize from the task code and then you won’t have any problems.
I have put together a simple example that uses SOAP to retrieve VAT info for European companies using the SOAP service at ec.europa.eu. It is included with the OmniThreadLibrary distribution in folder
The program has two input fields, one for the country code (inpCC) and one for the VAT number (inpVat), a button that triggers the SOAP request (btnCheckVat) and a memo that displays the result (outVatInfo).
There’s only one method – the click handler of the btnCheckVat button.
This method first disables the button (so that only one request at a time can be active) and clears the output. Then it uses a Future returning a checkVatResponse (a type defined in the checkVatService unit, which was generated by importing the WSDL specification). This future will execute the SOAP request in a background task and after that the anonymous method in Parallel.TaskConfig.OnTerminated will be called in the main thread. This anonymous method displays the result in the outVatInfo control, destroys the FRequest future object and enables the button.
The main future method looks just as it would if it were executed from the main thread, except that the SOAP code is wrapped in COM initialization and CoUninitialize calls that make sure that everything is correctly initialized for COM/OLE.
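The wrapping can be sketched independently of the SOAP service. The helper below is hypothetical (the demo places the calls directly in the future's code); it assumes the multithreaded COM model via CoInitializeEx and takes the actual request as an anonymous function.

```pascal
unit ComTaskSketch;

interface

uses
  System.SysUtils;

// Runs request with COM initialized on the calling (worker) thread.
function RunWithCom(const request: TFunc<string>): string;

implementation

uses
  Winapi.ActiveX, System.Win.ComObj;

function RunWithCom(const request: TFunc<string>): string;
begin
  // Raises if COM cannot be initialized for this thread.
  OleCheck(CoInitializeEx(nil, COINIT_MULTITHREADED));
  try
    Result := request(); // for example: the SOAP call
  finally
    CoUninitialize; // always paired with the successful initialization
  end;
end;

end.
```

Such a helper is meant to be called from inside the background task, once per task, and not from the main thread.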
The simplest way is to create two TOmniMessageQueue objects, one to send data to a thread and one to receive data. Alternatively, you could create a TOmniTwoWayChannel, which is just a simple pair of two TOmniMessageQueue instances. The solution below uses the former approach.
A solution to this problem is included with the OmniThreadLibrary distribution in folder
We have to handle two very similar but not completely identical parts:
Sending data from a form to a TThread-based worker.
Sending data from a TThread-based worker or from a form to the main thread (to a form).
Let’s deal with them one by one.
To send data from a form to a thread, we need a message queue. This example uses a TOmniMessageQueue object for that purpose. An instance of this object is created in the main thread. All threads (the main thread, the worker threads, and possibly other data-producing threads) use the same shared object, which is written with thread safety in mind.
TOmniMessageQueue constructor takes a maximum queue size for a parameter.
TWorker is just a simple TThread descendant which accepts the instance of the message queue as a parameter so it can read from the queue.
The shutdown sequence is fairly standard. Stop is used instead of Terminate so it can set the internal event which is used to signal the thread to stop.
To put some data into a queue, use its Enqueue method. It accepts a TOmniMessage record. Each TOmniMessage contains an integer message ID (not used in this example) and TOmniValue data which, in turn, can hold any data type.
Enqueue returns False if the queue is full. (A TOmniMessageQueue can only hold as many elements as specified in the constructor call.)
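A minimal sketch of the sending side follows. The SendValue helper and the unit name are hypothetical; the message ID is set to 0 because it is not used in this scenario.

```pascal
unit QueueSendSketch;

interface

uses
  OtlCommon, OtlComm;

// Returns False when the queue is full and the value was not stored.
function SendValue(queue: TOmniMessageQueue; const value: TOmniValue): boolean;

implementation

function SendValue(queue: TOmniMessageQueue; const value: TOmniValue): boolean;
begin
  // Wrap the value into a message; the ID (0) is ignored by the receiver.
  Result := queue.Enqueue(TOmniMessage.Create(0, value));
end;

end.
```

A caller would typically check the result and either retry later or report that the worker is falling behind.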
The example below shows how everything works correctly if two threads are started (almost) at the same time and both write to the message queue.
The Execute method waits on two handles in a loop. If the stop event (an internal event) is signalled, the loop will exit. If the message queue’s GetNewMessageEvent (a THandle-returning method) gets signalled, new data has arrived in the queue. In that case, the code loops to empty the message queue and then waits again for something to happen.
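The worker listing is missing from this copy. A sketch of a TThread descendant that follows the description could look like this; the field names and the Processed counter (a placeholder for real message handling) are assumptions.

```pascal
unit ThreadWorkerSketch;

interface

uses
  Winapi.Windows, System.Classes, System.SyncObjs, OtlComm;

type
  TWorker = class(TThread)
  strict private
    FCommandQueue: TOmniMessageQueue; // owned by the creator, not by the thread
    FProcessed   : integer;
    FStopEvent   : TEvent;
  protected
    procedure Execute; override;
  public
    constructor Create(commandQueue: TOmniMessageQueue);
    destructor Destroy; override;
    procedure Stop;
    property Processed: integer read FProcessed;
  end;

implementation

constructor TWorker.Create(commandQueue: TOmniMessageQueue);
begin
  FCommandQueue := commandQueue;
  FStopEvent := TEvent.Create(nil, true, false, ''); // manual-reset event
  inherited Create(false);
end;

destructor TWorker.Destroy;
begin
  Stop;     // make sure Execute can exit before the thread is waited for
  inherited;
  FStopEvent.Free;
end;

procedure TWorker.Stop;
begin
  FStopEvent.SetEvent;
end;

procedure TWorker.Execute;
var
  handles: array [0..1] of THandle;
  msg    : TOmniMessage;
begin
  handles[0] := FStopEvent.Handle;
  handles[1] := FCommandQueue.GetNewMessageEvent;
  // Index 0 (stop) ends the loop; index 1 means new data is in the queue.
  while WaitForMultipleObjects(2, @handles, false, INFINITE) = (WAIT_OBJECT_0 + 1) do
    while FCommandQueue.TryDequeue(msg) do
      Inc(FProcessed); // placeholder for real message processing
end;

end.
```

The owner would create the queue first, pass it to TWorker.Create, and on shutdown call Stop, wait for the thread, and only then destroy the queue.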
To send messages from a worker thread to a form we need another instance of
TOmniMessageQueue. As we can’t wait on a handle in the main thread (that would
block the user interface), we’ll use a different notification mechanism – a
window message observer.
We create the queue just as in the first part. To use a window message
observer we then just have to assign a message handler to the queue’s
OnMessage event. An observer will be set up automatically in the background.
After that, the event handler will be called once for each message that is inserted into the queue from any thread (or from the form itself).
While shutting down, we just have to destroy the queue.
To send data, we use exactly the same approach as in the first part (sending data to a worker).
On the receiving side (the form) we have to write an event handler that is called for each message.
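The receiving side can be sketched without a form file by using a plain class as a stand-in for the form. The class and handler names are hypothetical, and the handler signature (Sender plus a const TOmniMessage) is my understanding of the queue's OnMessage event type, so it should be checked against OtlComm.

```pascal
unit ResponseQueueSketch;

interface

uses
  System.Classes, OtlComm;

type
  // Hypothetical stand-in for the form; must be created in the main thread.
  TResponseReceiver = class
  strict private
    FLog          : TStringList;
    FResponseQueue: TOmniMessageQueue;
    procedure HandleThreadMessage(Sender: TObject; const msg: TOmniMessage);
  public
    constructor Create;
    destructor Destroy; override;
    property Log: TStringList read FLog;
    property ResponseQueue: TOmniMessageQueue read FResponseQueue;
  end;

implementation

constructor TResponseReceiver.Create;
begin
  inherited Create;
  FLog := TStringList.Create;
  FResponseQueue := TOmniMessageQueue.Create(1000);
  // Assigning the handler sets up the window message observer.
  FResponseQueue.OnMessage := HandleThreadMessage;
end;

destructor TResponseReceiver.Destroy;
begin
  FResponseQueue.Free; // destroying the queue is all the shutdown needed
  FLog.Free;
  inherited;
end;

procedure TResponseReceiver.HandleThreadMessage(Sender: TObject;
  const msg: TOmniMessage);
begin
  // Called in the main thread, once for each message put into the queue.
  FLog.Add(msg.MsgData.AsString);
end;

end.
```

Worker threads would receive a reference to ResponseQueue and call Enqueue on it; delivery depends on the main thread running its message loop.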