OmniThreadLibrary forum
Topic: Newbie question: how to implement task queue/waiting list in my case?

style-sheets

Hi!

I just started playing with the OmniThreadLibrary library today and although I like it a lot (kudos for the great job :) ), I'm a bit lost too!

My need is simple. I want to:

1.   Run up to 20 tasks concurrently (for example)
2.   Send any subsequent task to "waiting queue" if the number of running tasks exceeds 20
3.   As soon as we have a "free slot" (ie. one of the running tasks finished its job), run one of the tasks that I put in the waiting list and remove it from the queue list.

Right now I'm doing this using TThread & TStringList.

For obvious reasons I'd like to switch to OmniThreadLibrary, but I'm not sure how to proceed;

I looked at tests\32_Queue\ demo, but the terminology used (reader/forwarder/writer) is a bit confusing to me.

So my question is: assuming this is possible in OmniThreadLibrary, am I looking at the right demo?

If so, how can I implement what I described earlier using OmniThreadLibrary/TOmniQueue?

Also, is there any limit on the queue/waiting list size? In my case, the "waiting queue" can be *huge* (although a bit extreme, the # of entries in the list can be in millions)

Thanks in advance!
Khaled.

style-sheets
« Reply #1 on: January 29, 2012, 06:23:40 PM »

I forgot to mention: given the presented situation, the use of a thread pool would be ideal in my case.

style-sheets
« Reply #2 on: January 29, 2012, 08:52:33 PM »

OK, I spent a few hours reading and studying the code and your pages, and finally came up with the code below.

It's based on the test_0_Beep project. At first glance, it does indeed satisfy my requirements. Please feel free to comment and/or let me know if there's a better way to do things.

One thing I noticed is that if I raise the TASKS_COUNT constant beyond a certain value and run the demo, I get this exception:

    TOmniCommunicationEndpoint.Send: Queue is full

I tried to debug it, but I couldn't determine a "safe limit" beyond which the exception appears. Do I take it that OmniThreadLibrary isn't well suited to large queues?

Thanks!
Khaled.

Code: [Select]
unit test_0_Beep;

interface

uses
  Windows, SysUtils,
  Classes, Controls, StdCtrls, Forms,
  OtlThreadPool,
  OtlTask,
  OtlTaskControl, OtlEventMonitor;

type
  TfrmTestSimple = class(TForm)
    btnBeep: TButton;
    OmniEventMonitor: TOmniEventMonitor;
    procedure btnBeepClick(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure OmniEventMonitorTaskTerminated(const task: IOmniTaskControl);
    procedure OmniEventMonitorPoolWorkItemCompleted(
      const pool: IOmniThreadPool; taskID: Int64);
  private
    procedure Beep(const task: IOmniTask);
  end;

var
   frmTestSimple : TfrmTestSimple;

const
     TASKS_COUNT = 1000;

implementation

uses
    DSiWin32;

{$R *.dfm}

{ TfrmTestOTL }
// ------------------------------------------------------------------------------ //
procedure TfrmTestSimple.btnBeepClick(Sender: TObject);
var
   i : Integer;
begin
     with OmniEventMonitor do
     begin
          for i := 1 to TASKS_COUNT do
              Monitor(CreateTask(Beep, 'Beep-' + IntToStr(i))).Schedule();
     end;    // with
end;
// ------------------------------------------------------------------------------ //
procedure TfrmTestSimple.FormCreate(Sender: TObject);
begin
     GlobalOmniThreadPool.MonitorWith(OmniEventMonitor);
     GlobalOmniThreadPool.MaxExecuting := 20;
     GlobalOmniThreadPool.MaxQueued    := 0;
end;
// ------------------------------------------------------------------------------ //
procedure TfrmTestSimple.OmniEventMonitorPoolWorkItemCompleted(const pool: IOmniThreadPool; taskID: Int64);
begin
     Caption := 'Finished executing ' + IntToStr(TASKS_COUNT) + ' threads!';
end;
// ------------------------------------------------------------------------------ //
procedure TfrmTestSimple.OmniEventMonitorTaskTerminated(const task: IOmniTaskControl);
begin
     with GlobalOmniThreadPool do
          OutputDebugString(PChar('Task terminated: ' + task.Name + '. CountExecuting = ' + IntToStr(CountExecuting) + '. CountQueued = ' + IntToStr(CountQueued)));
end;
// ------------------------------------------------------------------------------ //
procedure TfrmTestSimple.Beep(const task: IOmniTask);
begin
     // Executed in a background thread
     Sleep(10); // placeholder work; Application.ProcessMessages must not be called from a background thread
     // MessageBeep(MB_ICONEXCLAMATION);
end;
// ------------------------------------------------------------------------------ //
initialization
     Randomize;
end.

Primoz Gabrijelcic
« Reply #3 on: January 31, 2012, 02:47:12 AM »

You are overflowing an internal queue. There is no good reason to have 7000 tasks scheduled for execution. Change your application.

style-sheets
« Reply #4 on: January 31, 2012, 06:26:23 AM »

Then I guess I either didn't understand the role of the queue, or the name is a bit misleading:

At first I was thinking the OTL queue is basically some kind of TStringList (or a fast hash list) containing the list of future tasks put in the queue (once a task is completed, OTL gets the next entry in the queue/list, runs it as a task and then deletes it from the queue).

I have a backup application. When a user specifies a folder to back up, I have to scan it, find the files that I need to back up, then back these files up in batches of 20 (for example).

My users can specify a folder containing 25,000 files to back up (or even an entire disk!). The way I do it right now is by using a waiting list:

  • I create 20 threads, each one backing up a file
  • For other files, I save them in an ACID db (in case the app crashes) and then put them in a special WaitingList
  • When a thread is done, I get the next entry from the WaitingList and run it as a new thread
  • I repeat until there are no more files to backup

So I beg to differ: there are situations where I have a large waiting list. There can be no more than 20 tasks at a time, and if the issue is with messages/task communication, my understanding is that the OTL built-in communication should affect neither future nor past/completed tasks.

Admittedly I just started using OTL, so I'm pretty sure I'm missing something regarding the role of the queuing system.

I was able to bypass this limitation yesterday (i.e. to make my application handle such a large waiting list with OTL) by making a queue for the OTL queue!

It was kind of hack-ish, and kinda defeated the purpose of the OTL queuing system.

Thanks!
Khaled.

Primoz Gabrijelcic
« Reply #5 on: January 31, 2012, 08:05:05 AM »

You should create as many tasks as you want parallel backup operations, not more. Then you should use TOmniBlockingCollection, which is the kind of queue that you need (with unlimited size), and pass your 25,000 requests to your tasks through this queue.
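
The suggestion above (a fixed number of worker tasks fed through one TOmniBlockingCollection) could be sketched roughly as follows. This is only an illustration under assumptions, not code from the thread: the unit name, CNumWorkers, BackupOneFile, StartWorkers, QueueFile and StopWorkers are hypothetical names, and the workers are started with the low-level CreateTask call rather than a pipeline.

```pascal
unit BackupQueueSketch; // hypothetical unit name

interface

procedure StartWorkers;
procedure QueueFile(const fileName: string);
procedure StopWorkers;

implementation

uses
  Windows, SysUtils,
  OtlCommon, OtlTask, OtlTaskControl, OtlCollections;

const
  CNumWorkers = 20; // assumed number of parallel backup operations

var
  GQueue  : IOmniBlockingCollection;
  GWorkers: array [1..CNumWorkers] of IOmniTaskControl;

// Hypothetical placeholder for the real backup routine
procedure BackupOneFile(const fileName: string);
begin
  if FileExists(fileName) then
  begin
    // ...
  end;
end;

// Runs in a background thread; every worker drains the same shared queue
procedure BackupWorker(const task: IOmniTask);
var
  value: TOmniValue;
begin
  // The loop waits while the queue is empty and ends once
  // CompleteAdding has been called and all items are taken
  for value in GQueue do
    BackupOneFile(value.AsString);
end;

procedure StartWorkers;
var
  i: integer;
begin
  GQueue := TOmniBlockingCollection.Create;
  for i := 1 to CNumWorkers do
    GWorkers[i] := CreateTask(BackupWorker, 'Backup-' + IntToStr(i)).Run;
end;

procedure QueueFile(const fileName: string);
begin
  // Only the file name is queued, not a task
  GQueue.Add(fileName);
end;

procedure StopWorkers;
var
  i: integer;
begin
  GQueue.CompleteAdding; // no more files will arrive
  for i := 1 to CNumWorkers do
  begin
    GWorkers[i].WaitFor(INFINITE);
    GWorkers[i] := nil;
  end;
end;

end.
```

With this shape only 20 tasks ever exist, and the 25,000 entries are plain values waiting in the collection, so the internal task scheduling queue is never stressed.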

You should probably use the Pipeline abstraction (OtlParallel.Pipeline). Check demo 41_Pipeline and example 'stringlist parser'. Also check blog articles:
http://www.thedelphigeek.com/2010/11/multistage-processes-with.html
http://www.thedelphigeek.com/2011/04/configuring-background-otlparallel.html
http://www.thedelphigeek.com/2011/09/life-after-21-pimp-my-pipeline.html

style-sheets
« Reply #6 on: January 31, 2012, 03:47:46 PM »

Thank you, Primoz :)

It's nice to hear that it's possible.

I looked into the sample project and read the blog entries. I feel like I'm close, but I'm still lost!

So let's say I have this code:

Code: [Select]
// ------------------------------------------------------------------------------ //
procedure TForm1.FormCreate(Sender: TObject);
begin
     BackupFile('C:\file-1.txt');
     BackupFile('C:\file-2.txt');
     BackupFile('C:\file-3.txt');
end;
// ------------------------------------------------------------------------------ //
procedure TForm1.BackupFile(FileName : String);
begin
     with Parallel.Pipeline do
     begin
            // Pass file name to task
            Input.Add(FileName);
            Input.CompleteAdding;

            // No more than 20 tasks at a time
            NumTasks(20).Stage(StageProc);

            Run;
     end;    // with
end;
// ------------------------------------------------------------------------------ //
procedure TForm1.StageProc(const input, output: IOmniBlockingCollection);
var
    Value : TOmniValue;
begin
       // Run task here

       // Retrieve input value (ie. FileName)
       Input.Take(Value);

       if FileExists(Value.AsString) then
       begin
              //...
       end;
end;
// ------------------------------------------------------------------------------ //

Debugging this code suggests that it does indeed run 20 tasks, but...

  • Really all it does is pass the same file name to 20 tasks
  • It's not clear how the 21st, 22nd, etc. tasks get picked and executed. Is this done automatically by OTL? I'm only getting 20 tasks executed

I tried adding this line to StageProc() to start the next batch/series of tasks:

  Parallel.Pipeline.NumTasks(20).Run;

but it didn't work! (I found something similar to it in the app_41 demo -> AddTasks() function)

Any advice would be greatly appreciated,

Thanks!
Khaled.

Primoz Gabrijelcic
« Reply #7 on: February 01, 2012, 02:28:42 AM »

There are numerous problems in your code.

1) You are creating a new pipeline for each file and running 20 tasks to back up that one file.
2) You are not storing the result of Parallel.Pipeline anywhere, so it gets destroyed when BackupFile returns.
3) StageProc only reads one file and then exits. It should read all files that appear on the input.

Corrected example:
Code: [Select]
unit Unit85;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs,
  OtlCommon, OtlParallel, OtlCollections;

type
  TForm85 = class(TForm)
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
  private
    FPipeline: IOmniPipeline;
    procedure BackupFile(FileName: String);
    procedure StageProc(const input, output: IOmniBlockingCollection);
  public
  end;

var
  Form85: TForm85;

implementation

{$R *.dfm}

procedure TForm85.FormDestroy(Sender: TObject);
begin
     FPipeline.WaitFor(INFINITE);
     FPipeline := nil;
end;

// ------------------------------------------------------------------------------ //
procedure TForm85.FormCreate(Sender: TObject);
begin
     FPipeline := Parallel.Pipeline.NumTasks(20).Stage(StageProc).Run;
     BackupFile('C:\file-1.txt');
     BackupFile('C:\file-2.txt');
     BackupFile('C:\file-3.txt');
     FPipeline.Input.CompleteAdding;
end;
// ------------------------------------------------------------------------------ //
procedure TForm85.BackupFile(FileName : String);
begin
  FPipeline.Input.Add(FileName);
end;
// ------------------------------------------------------------------------------ //
procedure TForm85.StageProc(const input, output: IOmniBlockingCollection);
var
    Value : TOmniValue;
begin
       // Run task here

       // Retrieve input value (ie. FileName)
       for Value in input do begin
           if FileExists(Value.AsString) then
           begin
                  //...
           end;
       end;
end;
// ------------------------------------------------------------------------------ //

end.

style-sheets
« Reply #8 on: February 01, 2012, 09:34:11 AM »

You rock, Primoz!  :)

Your code is perfect, thank you, thank you, thank you!!!

style-sheets
« Reply #9 on: February 02, 2012, 01:22:15 PM »

Hi!

OK, I'm having a small but important (and hopefully last) issue when converting my code from TThread to OTL.

You see, I'm used to having variables that are shared across a particular thread, i.e.

Code: [Select]
    TMyThread = class(TThread)
    private
           FFileName : String;
    protected
             procedure DoStuff;
             procedure Execute; override;
    public
             constructor Create(const FileName : String);
    end;    // TMyThread

// -------------------------------------------- //
constructor TMyThread.Create(const FileName : String);
begin
     // A non-suspended thread only starts running after the constructor has finished
     inherited Create(False);
     FFileName := FileName;
end;
// -------------------------------------------- //
procedure TMyThread.Execute;
begin
     FFileName := Trim(FFileName);
     DoStuff();
end;
// -------------------------------------------- //
procedure TMyThread.DoStuff;
begin
     if FileExists(FFileName) then
        // Proceed...
end;
// -------------------------------------------- //

So in this example, any procedure "belonging" to [ TMyThread ] would have access to variable [ FFileName ].

I'm using pipelines as you suggested. I know I have access to (Input : IOmniBlockingCollection). I know I can just pass it to whatever function I want, but:

1.   Input is declared as constant, so I can't change the values
2.   It seems I can't directly access a particular value in the Input collection (i.e. Input['FileName'] or Input[1]). Instead I have to start a loop, which is *really* error-prone and far from being "natural".

I know I could just pass the variables I want with a string list, ie.

Code: [Select]
// ------------------------------------------------------------------------------ //
procedure TForm85.StageProc(const input, output: IOmniBlockingCollection);
var
   Value   : TOmniValue;
   VarList : TStringList;
begin
       VarList := TStringList.Create;
       try
          for Value in input do
              VarList.Add(Value.AsString);

          Function_1(VarList);
       finally
          // Release the list once the helper functions are done with it
          VarList.Free;
       end;
end;
// ------------------------------------------------------------------------------ //
procedure TForm85.Function_1(var VarList : TStringList);
begin
     VarList[0] := Copy(VarList[0], 1, 25);
     Function_2(VarList);
end;
// ------------------------------------------------------------------------------ //
procedure TForm85.Function_2(var VarList : TStringList);
begin
     // ...
end;
// ------------------------------------------------------------------------------ //

But I wonder if there's a better way to do this? In other words, what's the recommended way to share variables across functions when they are specific to one task?

Primoz Gabrijelcic
« Reply #10 on: February 02, 2012, 03:54:36 PM »

The best approach in such case is to use anonymous methods.

For example:

var
  FSharedState: ISharedState;

procedure DoSomethingWithPipeline;
begin
   FSharedState := InitializeSharedState;
   FPipeline := Parallel.Pipeline.NumTasks(20).Stage(
     procedure (const input, output: IOmniBlockingCollection)
     var
        Value : TOmniValue;
     begin
        for Value in input do
          ProcessFile(FSharedState, Value);
     end
     ).Run;
   BackupFile('C:\file-1.txt');
   BackupFile('C:\file-2.txt');
   BackupFile('C:\file-3.txt');
   FPipeline.Input.CompleteAdding;
end;

style-sheets
« Reply #11 on: February 02, 2012, 04:10:55 PM »

Thank you Primoz, but...

I couldn't find any reference to ISharedState & InitializeSharedState!

What unit / where do I get them from? I do have XE2, but this project is currently developed under Delphi 2010

Thanks again!
Khaled.

Primoz Gabrijelcic
« Reply #12 on: February 02, 2012, 04:17:08 PM »

That is just something you have to put together - a class or interface that stores your shared state.
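
As an illustration of what "something you have to put together" might look like, here is a minimal sketch. Everything in it is an assumption made for the example: the unit name, the members of ISharedState (a file counter and a byte counter) and the TSharedState class are hypothetical, and the state is guarded by a critical section because all 20 tasks would call into it concurrently.

```pascal
unit SharedStateSketch; // hypothetical unit name

interface

uses
  SyncObjs;

type
  // Hypothetical contents; store whatever the tasks really need to share
  ISharedState = interface
    procedure FileDone(sizeInBytes: Int64);
    function  FilesDone: integer;
    function  BytesDone: Int64;
  end;

function InitializeSharedState: ISharedState;

implementation

type
  TSharedState = class(TInterfacedObject, ISharedState)
  strict private
    FLock     : TCriticalSection;
    FFilesDone: integer;
    FBytesDone: Int64;
  public
    constructor Create;
    destructor  Destroy; override;
    procedure FileDone(sizeInBytes: Int64);
    function  FilesDone: integer;
    function  BytesDone: Int64;
  end;

constructor TSharedState.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
end;

destructor TSharedState.Destroy;
begin
  FLock.Free;
  inherited;
end;

// Called from worker threads after each file; updates both counters atomically
procedure TSharedState.FileDone(sizeInBytes: Int64);
begin
  FLock.Acquire;
  try
    Inc(FFilesDone);
    Inc(FBytesDone, sizeInBytes);
  finally
    FLock.Release;
  end;
end;

function TSharedState.FilesDone: integer;
begin
  FLock.Acquire;
  try
    Result := FFilesDone;
  finally
    FLock.Release;
  end;
end;

function TSharedState.BytesDone: Int64;
begin
  FLock.Acquire;
  try
    Result := FBytesDone;
  finally
    FLock.Release;
  end;
end;

function InitializeSharedState: ISharedState;
begin
  Result := TSharedState.Create;
end;

end.
```

Values that belong to a single file (such as a trimmed file name) would not need to live here at all; they can stay as local variables inside the stage procedure or be passed as parameters to the routines it calls.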

style-sheets
« Reply #13 on: February 02, 2012, 04:34:01 PM »

Thanks a bunch!  :)

style-sheets
« Reply #14 on: February 05, 2012, 05:56:15 AM »

Sorry to bring this up Primoz, but if I run this (slightly) modified code to add more variables to FPipeline.input *after* calling FPipeline.Input.CompleteAdding:

Code: [Select]
// ------------------------------------------------- //
procedure DoSomethingWithPipeline;
begin
     FPipeline := Parallel.Pipeline.NumTasks(20).Stage(StageProc).Run;
     FPipeline.Input.Add('C:\file-1.txt');
     FPipeline.Input.Add('C:\file-2.txt');
     FPipeline.Input.Add('C:\file-3.txt');
     FPipeline.Input.CompleteAdding;

     FPipeline.Input.Add('C:\file-4.txt');
     FPipeline.Input.CompleteAdding;
end;
// ------------------------------------------------- //

I get this exception:

   
Quote
Project raised exception class ECollectionCompleted with message 'Adding to completed collection'.

If I understood correctly, this line:

Code: [Select]
FPipeline.Input.CompleteAdding;
Basically locks the collection and tells OTL that it's time to execute StageProc in the (previously initialized) threads/tasks(?)

But then, how can I add more variables to the queue once CompleteAdding() has been called?

I mean, that kinda defeats the initial purpose, as my goal is to make an expandable queue of tasks, not just through my code (ie. once I scan folders & find files to backup), but also make it extendable as the user *may* instruct my app to add more files/folders to backup queue *after* CompleteAdding() has been called.

Any advice would be greatly appreciated,

Thanks!
Khaled.
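
One possible way around the exception, sketched here under assumptions and not confirmed anywhere in this part of the thread: since a completed collection rejects further Add calls, CompleteAdding could be postponed until no more files can arrive, for example at form destruction. The sketch below adapts the Unit85 example from reply #7; it assumes the worker loops simply wait while the input is empty, and that shutdown is the natural "no more work" point in the application.

```pascal
unit Unit85;

interface

uses
  Windows, Messages, SysUtils, Classes, Controls, Forms,
  OtlCommon, OtlParallel, OtlCollections;

type
  TForm85 = class(TForm)
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
  private
    FPipeline: IOmniPipeline;
    procedure StageProc(const input, output: IOmniBlockingCollection);
  public
    procedure BackupFile(FileName: String);
  end;

var
  Form85: TForm85;

implementation

{$R *.dfm}

procedure TForm85.FormCreate(Sender: TObject);
begin
  // Start the 20 workers once; the input collection stays open
  FPipeline := Parallel.Pipeline.NumTasks(20).Stage(StageProc).Run;
end;

procedure TForm85.BackupFile(FileName: String);
begin
  // May be called at any time while the collection is still open,
  // e.g. from the folder scanner or when the user adds more folders
  FPipeline.Input.Add(FileName);
end;

procedure TForm85.FormDestroy(Sender: TObject);
begin
  // Only now is the queue closed, which lets the loops in StageProc end
  FPipeline.Input.CompleteAdding;
  FPipeline.WaitFor(INFINITE);
  FPipeline := nil;
end;

procedure TForm85.StageProc(const input, output: IOmniBlockingCollection);
var
  Value: TOmniValue;
begin
  for Value in input do
  begin
    if FileExists(Value.AsString) then
    begin
      //...
    end;
  end;
end;

end.
```

In this arrangement CompleteAdding acts as an "end of input" signal rather than a "start processing" signal: the tasks begin taking file names as soon as they are added.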