OmniThreadLibrary forum
Topic: Newbie question: how to implement task queue/waiting list in my case?

Primoz Gabrijelcic

  • Administrator
  • Hero Member
« Reply #15 on: February 05, 2012, 09:29:19 AM »

Yes, you should only call CompleteAdding when you want to stop the pipeline.

style-sheets

  • Newbie
« Reply #16 on: February 05, 2012, 02:11:37 PM »

Thank you Primoz :)

Does it hurt if I *only* call CompleteAdding() when my app shuts down (i.e. in the main form's OnDestroy event)? Is it even mandatory to call it at any point at runtime?

I mean it seems the only way to avoid the exception is to avoid calling CompleteAdding, since I'm constantly adding to the queue.

Thanks again,
Khaled.

Primoz Gabrijelcic

« Reply #17 on: February 05, 2012, 02:23:57 PM »

It doesn't hurt and it is entirely possible that your app works correctly even without that.

style-sheets

« Reply #18 on: February 05, 2012, 06:35:27 PM »

Thank you Primoz :)

OK, I'm still having the issue I described in my previous message (i.e. passing variables to tasks):

    http://otl.17slon.com/forum/index.php/topic,354.msg1291.html#msg1291

You suggested using anonymous methods, but I'm pretty sure I didn't explain my issue well, so please allow me to show some code to illustrate the problem:

Code: [Select]
procedure backup_files();
begin
     FPipeline := Parallel.Pipeline.NumTasks(20).Stage(StageProc).Run;

     FPipeline.input.Add('C:\file-1.txt');
     FPipeline.input.Add('file-1.txt Parent Folder ID');
     FPipeline.input.Add('file-1.txt File ID');
     FPipeline.input.Add('file-1.txt Encryption Password');

     FPipeline.input.Add('C:\file-2.txt');
     FPipeline.input.Add('file-2.txt Parent Folder ID');
     FPipeline.input.Add('file-2.txt File ID');
     FPipeline.input.Add('file-2.txt Encryption Password');

     ...
end;
// -------------------------------------------------------------- //
procedure StageProc(const input, output: IOmniBlockingCollection);
var
   FileName, ModifDate, Comments : String;
   BackupFileName, ZipFileName   : String;

    // ------------------------------------------- //
    // Takes the next value from the input queue (blocks until one is available)
    function GetValue() : String;
    var
       Value : TOmniValue;
    begin
         Input.Take(Value);
         Result := Trim(Value.AsString);
    end;
    // ------------------------------------------- //

begin
       // Intended: read the values belonging to one file, one after another
       FileName  := GetValue();
       ModifDate := GetValue();
       Comments  := GetValue();

       if CompareFileModifDate(ModifDate) then
       begin
              // OK, file has been modified but we didn't back it up

              BackupFileName := CopyFile(FileName);
              ZipFileName := ZipFile(BackupFileName);
              EncryptFile(ZipFileName);

              ...
       end;
end;
// -------------------------------------------------------------- //

You see, every file that I need to back up has several pieces of information attached to it, i.e. file ID, parent folder's ID, etc...

So basically, for each execution of StageProc(), I need to pass multiple variables and then execute StageProc(), for example (pseudo-code):


Code: [Select]
procedure backup_files();
begin
     FPipeline := Parallel.Pipeline.NumTasks(20).Stage(StageProc).Run;

     FPipeline.input.Add('C:\file-1.txt');
     FPipeline.input.Add('file-1.txt Parent Folder ID');
     FPipeline.input.Add('file-1.txt File ID');
     FPipeline.input.Add('file-1.txt Encryption Password');
     // MarkAsReady is pseudo-code. Initially I thought CompleteAdding() was responsible
     // for telling the pipeline that the input collection was ready to be used with StageProc()
     FPipeline.input.MarkAsReady;

     FPipeline.input.Add('C:\file-2.txt');
     FPipeline.input.Add('file-2.txt Parent Folder ID');
     FPipeline.input.Add('file-2.txt File ID');
     FPipeline.input.Add('file-2.txt Encryption Password');
     FPipeline.input.MarkAsReady;

     ...
end;

Is this possible? Or do I have to group all the variables & add them to pipeline as one? (AFAIK, OTL won't allow me to pass records here)

I mean, what's the recommended OTL way in my case?

Thanks!
Khaled.
« Last Edit: February 05, 2012, 06:43:00 PM by style-sheets »

style-sheets

« Reply #19 on: February 05, 2012, 07:47:33 PM »

OK, I used TOmniValue's CreateNamed function to pass my values as a named array, i.e.

Code: [Select]
// ------------------------------------------------------------------------------ //
procedure backup_files();
var
   AFile : TOmniValue;
begin
       FPipeline := Parallel.Pipeline.NumTasks(10).Stage(StageProc).Run;

       AFile.CreateNamed(['FileName', 'C:\some-file-1.txt', 'FileSize', 1234]);
       FPipeline.Input.Add(AFile);

       AFile.CreateNamed(['FileName', 'C:\some-file-2.txt', 'FileSize', 56789]);
       FPipeline.Input.Add(AFile);
end;
// ------------------------------------------------------------------------------ //
procedure TForm1.StageProc(const input, output: IOmniBlockingCollection; const Task : IOmniTask);
var
   Value    : TOmniValue;
   FileName : String;
   FileSize : Int64;
begin
     for Value in input do
     begin
          FileName := Value.AsArray['FileName'].AsString;
          FileSize := Value.AsArray['FileSize'].AsInt64;

          // ...
     end;
end;
// ------------------------------------------------------------------------------ //
procedure TForm1.FormDestroy(Sender: TObject);
begin
       FPipeline.Input.CompleteAdding;
       FPipeline.WaitFor(INFINITE);
       FPipeline := nil;
end;

And now it works just fine with multiple vars being passed to StageProc() :)

However, according to my testing, CompleteAdding() must be called on exit, otherwise the EXE will refuse to shut down (I was able to reproduce it in a demo, let me know if you want me to send it to you):

Code: [Select]
    FPipeline.Input.CompleteAdding;

I clearly underestimated TOmniValue!

Primoz Gabrijelcic

« Reply #20 on: February 06, 2012, 01:47:30 AM »

You're correct and I was mistaken in my last statement.

CompleteAdding is needed to shut down the loop in pipeline's stage process.

Sorry for my misinformation.

style-sheets

« Reply #21 on: February 19, 2012, 12:44:49 AM »

Hi Primoz :)

Regarding this code (i.e. the pipeline with many items in the TOmniBlockingCollection queue), how can I know when *all* the tasks have finished?

I know I can communicate using messages (i.e. Task.Comm) to detect that a certain task has finished its execution, but that doesn't give me any info on the queue itself (i.e. the number of remaining tasks).

So my question is: is there some event that tells me that nothing is left in the waiting queue (to be executed)?

Thanks!
Khaled.

Primoz Gabrijelcic

« Reply #22 on: February 19, 2012, 03:44:03 AM »

It's your code that is writing to the pipeline and your code reading from it so you can always know how many tasks are inside - just subtract the two.
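
A minimal sketch of that subtraction, for illustration only. It assumes the stage writes exactly one result to its output for every item it takes, and that only the main thread adds to the input and reads from the output, so the counter needs no locking. The program name, the Pending variable and the pass-through stage are hypothetical and not taken from this thread; the OTL units are assumed to be on the search path.

```delphi
program PendingCounterSketch;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils,
  OtlCommon, OtlCollections, OtlParallel;

// Hypothetical stage: emits exactly one result per input item.
procedure StageProc(const input, output: IOmniBlockingCollection);
var
  Value: TOmniValue;
begin
  for Value in input do
    output.Add(Value); // the real per-file work would go here
end;

var
  Pipeline: IOmniPipeline;
  Pending : Integer;    // items added minus results read; main thread only
  Res     : TOmniValue;
  i       : Integer;
begin
  Pipeline := Parallel.Pipeline.NumTasks(2).Stage(StageProc).Run;
  Pending := 0;

  for i := 1 to 3 do
  begin
    Pipeline.Input.Add(Format('C:\file-%d.txt', [i]));
    Inc(Pending);
  end;

  // Every result read from the output means one queued item has finished.
  while Pending > 0 do
    if Pipeline.Output.TryTake(Res, 100) then
      Dec(Pending);

  Writeln('All queued items have been processed');

  Pipeline.Input.CompleteAdding; // lets the for..in loops in the stage end
  Pipeline.WaitFor(INFINITE);
end.
```

In a GUI application the draining loop would more likely live in a timer or message handler than in a blocking while loop; that part is left out here.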

style-sheets

« Reply #23 on: February 19, 2012, 04:25:21 PM »

That's not what I meant, Primoz  :'( I was looking for an event that fires after all the tasks have finished, rather than having to maintain a counter that would obviously need thread locking (in order to count the tasks and detect when they have all finished).

Also, debugging suggests that for some strange reason the StageProc() function never finishes (unless the application shuts down), so I can't just put the code that I need to execute (after the tasks are done) at the end of StageProc(), i.e. this won't work:

Code: [Select]
// ------------------------------------------------------------------------------ //
procedure TForm1.FormCreate(Sender: TObject);
begin
     FPipeline := Parallel.Pipeline.NumTasks(5).Stage(StageProc).Run;
     FPipeline.Input.Add('C:\file-1.txt');
     FPipeline.Input.Add('C:\file-2.txt');
     FPipeline.Input.Add('C:\file-3.txt');
     ...
end;
// ------------------------------------------------------------------------------ //
procedure StageProc(const Input, Output : IOmniBlockingCollection; const Task : IOmniTask);
var
   Value    : TOmniValue;
   FileName : String;
begin
     // Run task here

     try
        for Value in Input do
        begin
             // Items were queued as plain strings, so read them with AsString
             FileName := Value.AsString;

             // Do some stuff here...
             MyFunction(FileName);
        end;    // for
     finally
            // We're done executing the tasks, update UI

            UpdateUI();
     end;    // try/finally
end;
// ------------------------------------------------------------------------------ //

In this case UpdateUI() is only executed when the application shuts down; why this happens is beyond me  ???

Thanks!
Khaled.

Primoz Gabrijelcic

« Reply #24 on: February 19, 2012, 04:57:42 PM »

There is no such counter/event built into the pipeline.

If you provide your own counter you don't need any locking as both writing into the pipeline and reading from the pipeline happen in the main thread.

The 'for Value in input' loop terminates only when CompleteAdding is called on the pipeline. This happens automatically when you destroy the pipeline. Maybe you should use the TryTake method instead (http://www.thedelphigeek.com/2010/02/three-steps-to-blocking-collection-3.html).
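
A minimal sketch of the difference, for illustration only. It uses a standalone blocking collection rather than a pipeline stage, and it assumes TryTake accepts a timeout in milliseconds as its second argument and returns False when nothing arrives in time. The program name and the 100 ms timeout are arbitrary choices, not values from this thread.

```delphi
program TryTakeSketch;
{$APPTYPE CONSOLE}

uses
  SysUtils,
  OtlCommon, OtlCollections;

var
  Queue: IOmniBlockingCollection;
  Value: TOmniValue;
begin
  Queue := TOmniBlockingCollection.Create;
  Queue.Add('C:\file-1.txt');
  Queue.Add('C:\file-2.txt');
  // CompleteAdding is deliberately not called, so 'for Value in Queue'
  // would keep waiting for more items after these two.

  // TryTake gives up after the timeout and returns False, which lets
  // the loop end while the collection stays open for further Add calls.
  while Queue.TryTake(Value, 100) do
    Writeln(Value.AsString);

  Writeln('Queue is idle');
end.
```

Inside a stage procedure the same call could replace the for..in loop, with the False result used as the point where "nothing left to do" is reported. A stage that exits at that point stops serving items added later, so whether to exit or keep polling depends on how long the queue is meant to live.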

GoustiFruit

  • Newbie
« Reply #25 on: March 03, 2012, 11:10:19 AM »

OK, I used TOmniValue's CreateNamed function to pass my values as an array, ie.

(...)

And now it works just fine with multiple vars being passed to StageProc() :)


I'm using JSON objects to transmit multiple variables through my pipelines!