OmniThreadLibrary forum
News: SMF - Just Installed!
 
*
Welcome, Guest. Please login or register. May 17, 2012, 05:55:29 PM


Login with username, password and session length


Pages: [1] 2   Go Down

Author Topic: Thread pool clarification  (Read 1356 times)

zarkogajic

  • Newbie
  • *
  • Posts: 15
    • View Profile
    • Email
Thread pool clarification
« on: November 04, 2011, 07:27:34 AM »

Primož,

Just to make sure I'm not missing something related to how to take advantage of the GlobalOmniThreadPool.

Here's the idea:
0) 2 * CPUCount tasks to be executed at one time.
1) Tasks are being created and send to the pool for execution
2) A task receives an Object and does something to the Object
3) When the task is done, the Object is read, and the UI (main app thread) is updated

Is the following a correct approach:

Code: [Select]

type
  TMyInteger = class
    Id, SleepTime : integer;
  end;

...

const
  tasksNumber  = 6;
var
  i : integer;
  sleepTime : integer;
  myInt : TMyInteger;
begin
  for i := 0 to tasksNumber do
  begin
    myInt := TMyInteger.Create;
    myInt.Id := i;

    sleepTime := Random(1000);

    CreateTask(
      procedure(const task: IOmniTask)
      var
        myInt : TMyInteger;
        sc : integer;
      begin
        myInt := TMyInteger(task.Param['Object']);
        myInt.SleepTime := task.Param['SleepTime'].AsInteger;

        for sc := 1 to myInt.SleepTime do
        begin
          if task.Terminated then Break;
          Sleep(1);
        end;

        task.Comm.Send(MSG_OBJECT, myInt);
      end)
      .OnMessage(
        procedure(const task: IOmniTaskControl; const msg: TOmniMessage)
        begin
          if msg.MsgID = MSG_OBJECT then
          begin
            Log(Format('%d | value: %d', [TMyInteger(msg.msgData).Id, TMyInteger(msg.msgData).SleepTime]));
            TMyInteger(msg.msgData).Free;
          end;
        end)
      .SetParameter('SleepTime', sleepTime)
      .SetParameter('Object', myInt)
      .Schedule;
  end;
end;

Having

Code: [Select]
  GlobalOmniThreadPool.MaxExecuting := 2 * System.CPUCount;
  GlobalOmniThreadPool.MaxQueued := MaxInt;


Additional questions:

0) what is GlobalOmniThreadPool.MinWorkers used for?

1) How do I read Parameters set using SetParameter from IOmniTaskControl (example: read my Object from the OnTerminated method)?

2) Is the OnMessage the only way to read my Object (is it safe to update the UI from there)?

3) when should "Unobserved" and "Alertable" be used for IOmniTaskControl ?


-žarko
Logged

Primoz Gabrijelcic

  • Administrator
  • Hero Member
  • *****
  • Posts: 569
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #1 on: November 04, 2011, 12:53:45 PM »

-2) Your code seems fine (I didn't test it).
-1) Just set MaxQueued to 0 (this is also the default value) - this will disable wait queue checking.
0) MinWorkers sets the number of worker threads that are always kept alive even if they are not working.
1) Actually, you can't, but there's not a good reason for this behaviour. I could add access to task parameter from the task controller side easily.
2) This is safe only if you keep all normal data access rules for multithreading in mind. (I.e. don't access this object in the worker thread after you have send it to the owner.)
3a) For unobserved, read OmniThreadLibrary patterns – Task controller needs an owner.
3b) Alertable - that's more complicated. If you set it, internal wait in the TOmniWorker object will set MWMO_ALERTABLE wait flag. I'd say this is useful only in very specific circumstances.
Logged

zarkogajic

  • Newbie
  • *
  • Posts: 15
    • View Profile
    • Email
Re: Thread pool clarification / some memory leaks
« Reply #2 on: November 04, 2011, 06:04:23 PM »

Primož,

Thanks!

Quote
1) Actually, you can't, but there's not a good reason for this behaviour. I could add access to task parameter from the task controller side easily.

That would be fantastic!

This way we could easily access my Object (set using SetParameter) or even a class that inherits TOmniWorker (using CreateTask(TMyOmniWorker.Create(myObject))) from the, for example, OnTerminate method.

Btw, when using "CreateTask(TMyOmniWorker.Create()))..." how do I get the IOmniWorker to use the .Implementor and be able to access the instance of TMyOmniWorker created inside CreateTask(), for example in the OnMessage method?


Quote
2) This is safe only if you keep all normal data access rules for multithreading in mind. (I.e. don't access this object in the worker thread after you have send it to the owner.)

Does that mean: do not touch my Object (myInt) after "task.Comm.Send(MSG_OBJECT, myInt)" inside the "procedure(const task: IOmniTask)" ?

Quote
3a) For unobserved, read OmniThreadLibrary patterns – Task controller needs an owner.

Since I'm using .Schedule and GlobalOmniThreadPool, which is monitored, there's no need for Unobserved in my code?

If the above is correct, are problems to be expected if  I do not use TOmniEventMonitor, that is: using .Schedule(myPoll) and myPool := CreateThreadPool() without specifying the Monitor?

I'm asking all this as I've noted lots of memory leaks (ReportMemoryLeaksOnShutdown) with my code (first post in this topic) when "tasksNumber" is rather large (even when using monitored GlobalOmniThreadPool).

Everything works ok if tasksNumber is < 50 or so. When it is > 50 (might be 80), upon closing the application there are lots of OTL related object leaks :(

p.s.
I know there's a memory leak in my sample code if GlobalOmniThreadPool.CancelAll is called before pool is empty (OnMessage not called therefore myInt not fred) - but if you try the code and set tasksNumber to 100, let it run/finish, after closing the application, there are memory leaks.

-žarko
Logged

Primoz Gabrijelcic

  • Administrator
  • Hero Member
  • *****
  • Posts: 569
    • View Profile
    • Email
Re: Thread pool clarification / some memory leaks
« Reply #3 on: November 05, 2011, 04:08:31 AM »

That would be fantastic!

OK, I'll add it.

Btw, when using "CreateTask(TMyOmniWorker.Create()))..." how do I get the IOmniWorker to use the .Implementor and be able to access the instance of TMyOmniWorker created inside CreateTask(), for example in the OnMessage method?

Sorry, I don't understand this.

Does that mean: do not touch my Object (myInt) after "task.Comm.Send(MSG_OBJECT, myInt)" inside the "procedure(const task: IOmniTask)" ?

Yes.

Since I'm using .Schedule and GlobalOmniThreadPool, which is monitored, there's no need for Unobserved in my code?

True.

If the above is correct, are problems to be expected if  I do not use TOmniEventMonitor, that is: using .Schedule(myPoll) and myPool := CreateThreadPool() without specifying the Monitor?

No, as thread pool takes over the task ownership.

I'm asking all this as I've noted lots of memory leaks (ReportMemoryLeaksOnShutdown) with my code (first post in this topic) when "tasksNumber" is rather large (even when using monitored GlobalOmniThreadPool).

Everything works ok if tasksNumber is < 50 or so. When it is > 50 (might be 80), upon closing the application there are lots of OTL related object leaks :(

There's a weird problem if you try to schedule more than (around) 80 tasks without allowing the application to process messages which I can't track down and exterminate. Test 11_ThreadPool demonstrates this problem. For now, the only solution is "Don't do that."
Logged

Primoz Gabrijelcic

  • Administrator
  • Hero Member
  • *****
  • Posts: 569
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #4 on: November 05, 2011, 11:58:35 AM »

Zarko, update to the current SVN state and you'll have IOmniTaskControl.Param at your disposal.
Logged

zarkogajic

  • Newbie
  • *
  • Posts: 15
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #5 on: November 05, 2011, 04:08:04 PM »

Primož,

Quote
There's a weird problem if you try to schedule more than (around) 80 tasks without allowing the application to process messages which I can't track down and exterminate. Test 11_ThreadPool demonstrates this problem. For now, the only solution is "Don't do that."

I have Application.ProcessMessages before each CreateProcess .. but this seems not to help :(

The "do this" approach would than be to have no more than +/-80 tasks in the pool?

p.s.
Thanks much for all the answers (and the updated code) ... the more I get into OTL .. the more I like it and appreciate your work!

-žarko
Logged

Primoz Gabrijelcic

  • Administrator
  • Hero Member
  • *****
  • Posts: 569
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #6 on: November 06, 2011, 03:11:00 AM »

I have Application.ProcessMessages before each CreateProcess .. but this seems not to help :(

Weird. It should. Can you create a small application that fails in this manner so I can look into it?

The "do this" approach would than be to have no more than +/-80 tasks in the pool?

No, I meant "to schedule around 80 tasks in one go without processing messages".
Logged

Primoz Gabrijelcic

  • Administrator
  • Hero Member
  • *****
  • Posts: 569
    • View Profile
    • Email
Re: Thread pool clarification / some memory leaks
« Reply #7 on: November 06, 2011, 05:41:10 AM »

Since I'm using .Schedule and GlobalOmniThreadPool, which is monitored, there's no need for Unobserved in my code?

Correction - it seems that you have to use Unobserved even in this case. (Just noticed that OtlParallel uses .Unobserved when .Scheduling tasks and if I remove .Unobserved, the code crashes.)
Logged

zarkogajic

  • Newbie
  • *
  • Posts: 15
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #8 on: November 07, 2011, 05:05:54 AM »

Primož,

Attached a sample application where even with Application.ProcessMessages before each CreateTask I have leaks upon app exit.


p.s.
Is there a way to get to the Pool used in .Schedule(Pool) from IOmniTaskControl (OnMessage method)?

-žarko
Logged

zarkogajic

  • Newbie
  • *
  • Posts: 15
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #9 on: November 07, 2011, 06:46:06 AM »

Primož,

Regarding my unclear question: "Btw, when using "CreateTask(TMyOmniWorker.Create()))..." how do I get the IOmniWorker to use the .Implementor and be able to access the instance of TMyOmniWorker created inside CreateTask(), for example in the OnMessage method?"

Here's what I meant:

I have a rather simple class which extends TOmniWorker, and am using it for CreateTask:

Code: [Select]
var
  st : cardinal;
begin
  st := Random(100);

  CreateTask(TMyOmniWorker.Create(st),IntToStr(st))
  .Invoke(
    procedure(const task: IOmniTask)
    begin
      TMyOmniWorker(task.Implementor).DoSleep;
    end)
  .OnMessage(
    procedure(const task: IOmniTaskControl; const msg: TOmniMessage)
    var
      myWorker : TMyOmniWorker;
    begin
      //
      // How do I directly get the instance of TMyOmniWorker here ?
      // That is: how do I get IOmniTask from IOmniTaskControl?
      //


      //receiving string
      if msg.MsgID = 1 then Memo1.Lines.Add(msg.MsgData.AsString);

      //receiving TMyOmniWorker instance through Comm
      if msg.MsgID = 2 then
      begin
        myWorker := TMyOmniWorker(msg.MsgData);

        Memo1.Lines.Add(IntToStr(myWorker.SleepTime1));

        myWorker.Task.Invoke(myWorker.DoMoreSleep);
      end;
    end)
  .Unobserved
  .Run;
end;


I was asking: how do I get to the instance of my TMyOmniWorker inside the OnMessage - without using the task.Comm, if possible?

My TMyOmniWorker:

Code: [Select]
 TMyOmniWorker = class(TOmniWorker)
  private
    fSleepTime1: cardinal;
  protected
    procedure Cleanup; override;
  public
    constructor Create(const sleepTime :cardinal);
    property SleepTime1 : cardinal read fSleepTime1;
    procedure DoSleep;
    procedure DoMoreSleep;
  end;

...
procedure TMyOmniWorker.Cleanup;
begin
  task.Comm.Send(1, Format('Task %s signing off from thread %d', [task.Name, GetCurrentThreadID]));
  inherited;
end;

constructor TMyOmniWorker.Create(const sleepTime : cardinal);
begin
  fSleepTime1 := sleepTime;
end;

procedure TMyOmniWorker.DoMoreSleep;
var
  i : integer;
begin
  for i := 1 to SleepTime1 do
  begin
    if self.Task.Terminated then Exit;
    Sleep(1);
  end;

  task.Comm.Send(1, 'DoMoreSleep DONE');
  task.Terminate;
end;

procedure TMyOmniWorker.DoSleep;
var
  i : integer;
begin
  for i := 1 to SleepTime1 do
  begin
    if self.Task.Terminated then Exit;
    Sleep(1);
  end;

  task.Comm.Send(1, 'DoSleep DONE');

  if TMyOmniWorker(task.Implementor).SleepTime1 > 49 then
    task.Comm.Send(2, TMyOmniWorker(task.Implementor))
  else
    task.Terminate;
end;

Basically, I can have TMyOmniWorker work (DoSleep and DoMoreSleep) in main app thread (just check if Task = nil) or in a background thread (using CreateTask...).

Am hoping this is more clear.

-žarko
Logged

Primoz Gabrijelcic

  • Administrator
  • Hero Member
  • *****
  • Posts: 569
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #10 on: November 08, 2011, 07:41:22 AM »

1) First Invoke can be simplified to

  .Invoke(@TMyOmniWorker.DoSleep)

   You can just pass generic method addresses to the worker, it will interpret it correctly.

2) Second Invoke can be accessed through the IOmniTaskControl interface.

   task.Invoke(@TMyOmniWorker.DoMoreSleep);

3) Don't access task data directly from the owner, this is a very bad practice which quickly leads to buggy code. I'm recommending sending SleepTime1 as a parameter of the message '2'.

Full code:

Code: [Select]
type
  TMyOmniWorker = class(TOmniWorker)
  private
    fSleepTime1: cardinal;
  protected
    procedure Cleanup; override;
  public
    constructor Create(const sleepTime :cardinal);
    property SleepTime1 : cardinal read fSleepTime1;
    procedure DoSleep;
    procedure DoMoreSleep;
  end;

procedure TMyOmniWorker.Cleanup;
begin
  task.Comm.Send(1, Format('Task %s signing off from thread %d', [task.Name, GetCurrentThreadID]));
  inherited;
end;

constructor TMyOmniWorker.Create(const sleepTime : cardinal);
begin
  fSleepTime1 := sleepTime;
end;

procedure TMyOmniWorker.DoMoreSleep;
var
  i : integer;
begin
  for i := 1 to SleepTime1 do
  begin
    if self.Task.Terminated then Exit;
    Sleep(1);
  end;

  task.Comm.Send(1, 'DoMoreSleep DONE');
  task.Terminate;
end;

procedure TMyOmniWorker.DoSleep;
var
  i : integer;
begin
  for i := 1 to SleepTime1 do
  begin
    if self.Task.Terminated then Exit;
    Sleep(1);
  end;

  task.Comm.Send(1, 'DoSleep DONE');

  if SleepTime1 > 49 then
    task.Comm.Send(2, SleepTime1)
  else
    task.Terminate;
end;

procedure TForm53.FormCreate(Sender: TObject);
var
  st : cardinal;
begin
  st := Random(100);

  CreateTask(TMyOmniWorker.Create(st),IntToStr(st))
  .Invoke(@TMyOmniWorker.DoSleep)
  .OnMessage(
    procedure(const task: IOmniTaskControl; const msg: TOmniMessage)
    begin
      //receiving string
      if msg.MsgID = 1 then Memo1.Lines.Add(msg.MsgData.AsString);

      //receiving TMyOmniWorker instance through Comm
      if msg.MsgID = 2 then
      begin
        Memo1.Lines.Add(IntToStr(msg.MsgData));
        task.Invoke(@TMyOmniWorker.DoMoreSleep);
      end;
    end)
  .Unobserved
  .Run;
end;
Logged

Primoz Gabrijelcic

  • Administrator
  • Hero Member
  • *****
  • Posts: 569
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #11 on: November 08, 2011, 08:36:09 AM »

Attached a sample application where even with Application.ProcessMessages before each CreateTask I have leaks upon app exit.

Seems to be working fine here (start, click "do tasks", wait, close). Delphi XE.
Logged

zarkogajic

  • Newbie
  • *
  • Posts: 15
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #12 on: November 08, 2011, 10:46:49 AM »

Primož, thanks.

Quote
Seems to be working fine here (start, click "do tasks", wait, close). Delphi XE.

Ah :( I though this will be like that. Leaks on my side .. am not sure why. Would it help if I send you in the FastMM log?

Btw, can you also answer this one:

Quote
Is there a way to get to the Pool used in .Schedule(Pool) from IOmniTaskControl (OnMessage method)?

Finally,
Quote
3) Don't access task data directly from the owner, this is a very bad practice which quickly leads to buggy code.
Can you be more specific here - who is the owner here and in what way have I accessed the task directly from the owner?

Does that mean "don't send TMyOmniWorker(task.Implementor) through task.Comm to the OnMessage"?

Logged

Primoz Gabrijelcic

  • Administrator
  • Hero Member
  • *****
  • Posts: 569
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #13 on: November 08, 2011, 11:21:25 AM »

Ah :( I though this will be like that. Leaks on my side .. am not sure why. Would it help if I send you in the FastMM log?

Send it, I'll see if I can guess the reason.

Quote
Finally,Can you be more specific here - who is the owner here and in what way have I accessed the task directly from the owner?

The owner is the thread that created the task and owns IOmniThreadControl. You were accessing task's SleepTime1 directyle from the main thread.

Quote
Does that mean "don't send TMyOmniWorker(task.Implementor) through task.Comm to the OnMessage"?

That is definitely not a good idea.
Logged

zarkogajic

  • Newbie
  • *
  • Posts: 15
    • View Profile
    • Email
Re: Thread pool clarification
« Reply #14 on: November 08, 2011, 02:38:38 PM »

Primož,

Ok, thanks for answers, all clear now.

I'll see to send the memory leak details tomorrow.

How about:

Quote
Is there a way to get to the Pool used in .Schedule(Pool) from IOmniTaskControl (OnMessage method)?

That is: I'm not using the GlobalOmniThreadPool, rather I'm creating my own using the CreateThreadPool. Using it when .Schedule(MyPool).

The question is: how do I get the MyPool inside OnMessage (to "print out" the CountExecuting and CountQueued for example)?

-žarko
Logged
Pages: [1] 2   Go Up
 
 

Powered by MySQL Powered by PHP Powered by SMF 2.0.2 | SMF © 2006-2009, Simple Machines LLC

Valid XHTML 1.0! Valid CSS! Dilber MC Theme by HarzeM