Hi,
This is a (long) update on the task packaging and task
synchronization features. I just checked my changes into the
task_packaging branch of my git tree. I haven't run tests against the
new code, and it is probably not working yet. But I believe it will be
soon.
Generally speaking, I implemented:
1) a simple task packaging scheme, as per the description in ticket #74.
   a) Note that referencing a task now requires a fully-qualified name,
like 'some_pkg.SomeTask'.
   b) There can be only ONE tasks_dir, which is "task_cache" by
default. More than one would confuse the task synchronization process.
2) a naive task synchronization mechanism to fix ticket #22
When the scheduler wants a worker to run a task or a workunit, it
also tells the worker the latest version number of that task. If the
worker finds that the version number does not match its local version,
it will initiate a task synchronization request to the master. A
synchronization session will subsequently take place between the
master and the worker. After the session, the task definition at the
worker side will be up-to-date.
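
The version check described above could look roughly like the sketch
below. This is only an illustration, not the code in the branch: the
needs_sync() helper, the dictionary of local versions, and the version
strings are all made up for the example.

```python
def needs_sync(local_versions, task_key, master_version):
    """Return True when the worker's copy of task_key is missing or stale.

    local_versions is a hypothetical dict mapping task keys to the
    version the worker currently has on disk.
    """
    return local_versions.get(task_key) != master_version


# Hypothetical state on a worker: one task package is already cached.
local_versions = {"some_pkg.SomeTask": "v1"}

# The scheduler announces the latest version along with the run request.
if needs_sync(local_versions, "some_pkg.SomeTask", "v2"):
    print("out of date: start a sync session with the master")
```

A task that is not cached at all is treated the same way as a stale
one, so a "cold start" goes through the same path.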
What I haven't finished:
1) Signals TASK_UPDATED, TASK_REMOVED, etc. emitted by the master's
TaskManager are not handled yet. It looks like dealing with these
situations cleanly on the worker side will take considerable effort.
However, missing this feature won't be a big problem, since the worker
will passively synchronize with the master.
2) The worker should hold the execution of a task if the task code
is out-dated and resume it after synchronization. I'm working on this.
Code changes are mostly in tasks/packaging.py and
tasks/task_manager.py. Here are some of my notes during the
development:
= Task Packaging =
I didn't change the way a worker runs a task. However, to
facilitate sandboxed execution, I added an interface to TaskManager
(i.e., TaskManager.get_task()) that returns all the additional module
search paths of a task.
= Task Synchronization =
This feature is much trickier than task packaging. I made several
design decisions.
1) TaskManager is a module used by both the master and the worker.
Though pydra is operated in a master/slave fashion, I want the
TaskManager to rely on neither the master nor the worker. So
TaskManager should not call any remote methods of the master or the
worker, and should be a relatively independent module, except that it
emits TASK_UPDATED, TASK_REMOVED, etc. signals to other modules. This
design will even make it easy to develop peer-to-peer task
synchronization, though we don't need this feature right now.
2) Though TaskManager is independent of other modules, the master and
the worker need access to it to manage available tasks, such as
retrieving tasks and synchronizing tasks. The original architecture
only allows me to use signals for this, which would be a little clumsy.
So I borrowed the concept of "friend class" from C++, and added a
_friends attribute to the Module class. A module can define its friends
as follows:
    self._friends = {
        'friend1' : FriendModule1,
        'friend2' : FriendModule2,
    }
After the initialization process, the module will have two more
attributes, namely friend1 and friend2, as long as FriendModule1 and
FriendModule2 are found in the same ModuleManager as this module. Note
that this is actually the reverse of a friend class in C++: there, a
class grants its friends access to itself, while here a module gains
access to the friends it lists.
With this in hand, I made TaskManager a friend of WorkerTaskControls,
TaskScheduler, and TaskSyncModule (a new module created by me) so that
the latter three can access the TaskManager directly.
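
To make the friend mechanism more concrete, here is a minimal sketch of
how the resolution step might work. It is not the actual pydra code:
the Module and ModuleManager classes below are simplified stand-ins, and
the _resolve_friends() method and the task_manager attribute name are
assumptions made for the example.

```python
class Module:
    """Simplified stand-in for the Module base class."""

    def __init__(self):
        # attribute name -> class of the friend module we want access to
        self._friends = {}


class ModuleManager:
    """Simplified stand-in that only resolves friends."""

    def __init__(self, modules):
        self.modules = list(modules)
        self._resolve_friends()

    def _resolve_friends(self):
        # For every requested friend, look for an instance of that class
        # among the modules registered with this manager. If none is
        # found, the attribute is simply not set.
        for module in self.modules:
            for attr_name, friend_class in module._friends.items():
                for candidate in self.modules:
                    if isinstance(candidate, friend_class):
                        setattr(module, attr_name, candidate)
                        break


class TaskManager(Module):
    pass


class TaskScheduler(Module):
    def __init__(self):
        super().__init__()
        # Hypothetical attribute name; the scheduler asks for direct
        # access to the TaskManager living in the same ModuleManager.
        self._friends = {"task_manager": TaskManager}


task_manager = TaskManager()
scheduler = TaskScheduler()
ModuleManager([task_manager, scheduler])
print(scheduler.task_manager is task_manager)
```

The lookup is by class, so a module only gets the attribute when a
matching module has been registered with the same manager.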
3) Synchronization between A and B may take more than one round of
message passing, depending on the synchronization algorithm used. To
flexibly support various synchronization algorithms, I defined two
interfaces in the TaskPackage class:
# TaskPackage.active_sync(self, response, phase=1)
Generates a sync request, and performs whatever actions are needed to
bring the local task package in line with the remote package. The
"response" parameter is the sync response generated by that remote
task package (see below). "phase" indicates the phase which the sync
process is in. Obviously, when phase is 1, response should be passed
as None.
The return value of this method is a tuple containing the request data
and a boolean flag, which indicates whether this task package expects
a response from the remote side.
# TaskPackage.passive_sync(self, request, phase=1)
Generates a sync response according to the received sync request. The
"request" and "phase" parameters have similar meanings to those in
active_sync().
Subclasses can derive from TaskPackage and implement their own
active_sync() and passive_sync() methods to support other
synchronization mechanisms.
Take the simplistic tar.gz synchronization algorithm for example. The
flow of a successful synchronization is:
1. WorkerTaskControls detects a task is out-dated, so it calls
TaskManager.active_sync() with task_key specified.
2. TaskManager finds the task package which the task belongs to
according to the task_key.
3. TaskManager calls TaskPackage.active_sync(response=None, phase=1).
4. In phase 1 of active_sync, TaskPackage simply returns its current
version, and indicates that it expects a further response from the
master.
5. WorkerTaskControls sends the sync request generated by
active_sync() to the master.
6. TaskSyncModule at the master side receives the sync request, and
then invokes TaskPackage.passive_sync(request, 1).
7. In TaskPackage.passive_sync(), the latest content of the task
package is compressed in tar.gz format, serialized, and returned as a
string.
8. WorkerTaskControls receives the response from the master and
subsequently calls TaskPackage.active_sync(response, 2), which will
decompress the data and result in an up-to-date task package.
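
The flow above can be sketched with the standard tarfile module. This
is a rough illustration under several assumptions, not the code in
tasks/packaging.py: the TarGzTaskPackage class, its folder and version
attributes, the (version, bytes) shape of the response, and the demo()
helper are all invented for the example, and the message passing
between worker and master is replaced by direct method calls.

```python
import io
import os
import tarfile
import tempfile


class TarGzTaskPackage:
    """Hypothetical stand-in for TaskPackage using the tar.gz scheme."""

    def __init__(self, folder, version):
        self.folder = folder
        self.version = version

    def active_sync(self, response, phase=1):
        """Return (request_data, expects_response)."""
        if phase == 1:
            # Phase 1: report the local version and ask for a reply.
            return self.version, True
        # Phase 2: unpack the archive sent by the remote side.
        # Files that no longer exist remotely are not removed here.
        version, payload = response
        with tarfile.open(fileobj=io.BytesIO(payload), mode="r:gz") as archive:
            archive.extractall(self.folder)
        self.version = version
        return None, False

    def passive_sync(self, request, phase=1):
        """Pack the whole package folder, whatever version was sent."""
        buffer = io.BytesIO()
        with tarfile.open(fileobj=buffer, mode="w:gz") as archive:
            for entry in sorted(os.listdir(self.folder)):
                archive.add(os.path.join(self.folder, entry), arcname=entry)
        return self.version, buffer.getvalue()


def demo():
    """Run one sync session between two temporary package folders."""
    master_dir = tempfile.mkdtemp()
    worker_dir = tempfile.mkdtemp()
    with open(os.path.join(master_dir, "demo.py"), "w") as handle:
        handle.write("VALUE = 2\n")
    master = TarGzTaskPackage(master_dir, "v2")
    worker = TarGzTaskPackage(worker_dir, "v1")

    request, expects_response = worker.active_sync(None, 1)   # steps 3-4
    response = master.passive_sync(request, 1)                # steps 6-7
    worker.active_sync(response, 2)                           # step 8

    with open(os.path.join(worker_dir, "demo.py")) as handle:
        return worker.version, handle.read()
```

A scheme based on delta encoding would keep the same two methods but
use more phases, which is the reason for the "phase" parameter.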
For now, sync requests and responses are all represented as strings.
Ideally, we will use Producers and Consumers to achieve better
performance, as we discussed.
Hope I expressed myself clearly. Any comments are appreciated.
Yin QIU wrote:
> b) There can be only ONE tasks_dir, which is "task_cache" by
> default. Otherwise, it will confuse the task synchronization process.

Is it redundant packages (i.e. the same package in two locations) that
is the problem?

This is fine for now, but eventually we'll want to support multiple
directories.

> 1) Signals TASK_UPDATED, TASK_REMOVED, etc. emitted by the master's
> TaskManager are not handled yet. [...]

Are you referring to unloading and reloading code from Python? It can't
really be done currently. Python just doesn't have the capability.
This is one of the reasons to switch to a model where workers are
created specifically for a TaskInstance, and then thrown away after.

The task manager will also work slightly differently to ensure the
tasks it is loading are sandboxed away from the Node. It will have to
run a TaskLoader script in another sandboxed process that returns a
JSON string of task information. TaskManager in this mode will only
have a dictionary of Tasks; it won't have any instances of them.
Removing or updating a task will just be manipulating the dictionary.

> 1) TaskManager is a module used by both the master and the worker.
> [...] This design will even make it easy to develop peer-to-peer task
> synchronization, though we don't need this feature right now.

Great, this is exactly how the module system should be used.

> With this in hand, I made TaskManager friends of WorkerTaskControls,
> TaskScheduler, and TaskSyncModule (a new module created by me) so that
> the latter three can access the TaskManager directly.

I'd like to avoid tightly coupling modules, but you're right that it
does become clumsy to deal with signals even if they are executed
synchronously. This is OK, provided we keep tight control over which
modules are allowed to do this.

> 8. WorkerTaskControls receives the response from the master and
> subsequently calls TaskPackage.active_sync(response, 2),which will
> decompress the data and result in an update-to-date task package.

I have some issues with this workflow.

1) Why use multiple calls with a state variable "phase"? Why not use
specific function calls? Are there large sections of reused code that
can't be methods?

2) It should actually be synchronizing with the Node, since the Node is
responsible for anything that affects all of its workers. Also, once we
have refactored the Node-Worker relationship (see above), the worker
will not always be around for synchronization.

Since this doesn't exist yet, you can just mock up a run_task() method
on the Node that just tries to look up the requested task from the
TaskManager.

3) To play nice with the sandbox, TaskPackage can't be responsible for
sending sync messages or packaging itself. There should be a
TaskSyncServer (master) and TaskSyncClient (node) that encapsulate the
entire workflow on both ends.

This is more what I envisioned:

1) When a task is requested from TaskManager, this should be an
asynchronous call.
   1a) It checks whether a sync is in progress for that task, because
there might be multiple workunits fired at the same time.
   1b) If no sync is in progress, send a TASK_NEEDS_UPDATE signal.
   1c) A deferred is created and registered for the requested task key.
2) TaskSyncClient, or other task management modules (i.e. a directory
scanner), will listen for the signal and attempt to find the task.
3) If found and/or transferred:
   3a) the module calls ADD_TASK, which unpacks it and loads it as an
available task.
4) The task manager fires any callbacks registered for that task (there
might be multiple workunits).
5) The callbacks would cause workers to be spawned and run the task.

TaskManager should handle packaging/unpacking functions to separate
that from synchronization. This will allow task packages to be added to
the filesystem and unpacked by a different module.

-Peter
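
The asynchronous workflow Peter outlines in the message above might be
sketched as follows. This is only a rough illustration: plain callbacks
stand in for Twisted deferreds, and the PendingTaskRegistry class, its
request_task() and add_task() methods, and the emit_signal callable are
hypothetical names, not part of pydra. Version checking is left out.

```python
class PendingTaskRegistry:
    """Hypothetical sketch of steps 1) to 4) of the proposed workflow."""

    def __init__(self, emit_signal):
        self._emit_signal = emit_signal   # callable(signal_name, task_key)
        self._tasks = {}                  # task_key -> loaded task
        self._waiting = {}                # task_key -> pending callbacks

    def request_task(self, task_key, callback):
        """Asynchronous lookup: callback fires once the task is available."""
        if task_key in self._tasks:
            callback(self._tasks[task_key])
            return
        # 1a) several workunits may ask for the same task at once
        sync_in_progress = task_key in self._waiting
        # 1c) remember who is waiting for this task key
        self._waiting.setdefault(task_key, []).append(callback)
        # 1b) only the first request triggers a sync
        if not sync_in_progress:
            self._emit_signal("TASK_NEEDS_UPDATE", task_key)

    def add_task(self, task_key, task):
        """3a) + 4) register the task and fire every pending callback."""
        self._tasks[task_key] = task
        for callback in self._waiting.pop(task_key, []):
            callback(task)


signals = []
started = []
registry = PendingTaskRegistry(lambda name, key: signals.append((name, key)))
registry.request_task("some_pkg.SomeTask", started.append)
registry.request_task("some_pkg.SomeTask", started.append)
registry.add_task("some_pkg.SomeTask", "loaded task")
print(signals, started)
```

The callback is registered before the signal is sent, so a listener
that finds the task immediately and calls add_task() from inside the
signal handler still reaches the waiting workunits.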
On Sun, Aug 16, 2009 at 12:16 AM, Peter Krenesky wrote:
> Is it redundant packages (ie same package in two locations) that is the
> problem?
>
> This is fine for now but eventually we'll want to support multiple
> directories.

Partially. The main problem is: how do we know where to put a specific
task package if multiple tasks_dir are allowed?

Suppose the master has 2 tasks_dir, /folder/one/ and /folder/two/.
There is a task package called "demo" in /folder/one. Does that mean
there should be two folders on each worker, with their names exactly
being /folder/one/ and /folder/two/? If that is the case, I think this
is way too inflexible. And if it is not, we have to map /folder/one/ to
a local directory on each worker. Please correct me if I have
misunderstood you.

> Are you referring to unloading and reloading code from python? It can't
> really be done currently. Python just doesn't have the capability.
> This is one of the reasons to switch to a model where workers are
> created specifically for a TaskInstance, and then thrown away after.

There are two things missing. One is to inform the worker of updates
so that synchronization can be done in an active way rather than a
passive one. However, I don't know yet whether that is desirable.

The other is to stop task execution and re-run the tasks. As you said,
we would change the way that workers are created and disposed of. I
guess this feature can be postponed.

> I'd like to avoid tightly coupling modules but you're right that it does
> become clumsy to deal with signals even if they are executed
> synchronously. This is ok, provided we keep tight control over which
> modules are allowed to do this.

Yeah, tight coupling was my concern about this change too. But I
didn't find a better solution. The main reason is, I guess, that
TaskManager naturally has a tight relationship with the master and
workers, but a single shared "registry" attribute is not enough to
reflect this relationship.

> I have some issues with this workflow.
>
> 1) why use multiple call with a state variable "phase"? Why not use
> specific function calls? Are there large sections of reused code that
> can't be methods?

In my simplistic "tar.gz" scheme, there are 2 phases at the sync
requester side, and 1 phase at the sync server side. That is:

phase 1 at the worker side: worker ---sends local version--> master
phase 1 at the master side: master ---sends compressed data--> worker
phase 2 at the worker side: worker decompresses the data

But other synchronization schemes, which may be based on delta
encoding, could require more phases to do a synchronization. I cannot
offer a set of versatile interfaces for that.

> 2) It should actually be synchronizing with the Node since Node is
> responsible for anything that affects all of its workers. Also once we
> have refactored the Node-Worker relationship (see above) worker will not
> always be around for synchronization.

Oh, I had doubts about this when I first started working on task sync.
Sorry that I later forgot about it.

> Since this doesn't exist yet you can just mock up a run_task() method on
> the Node that just tries to lookup the requested task from the TaskManager.

Simply mocking up the run_task() method is not enough. Right now the
scheduler directly calls the remote interfaces of workers. It seems we
would have to change the way the scheduler works and maintain node
references on the master, which would result in lots of changes. Is
that acceptable for now?

> 3) To play nice with the sandbox TaskPackage can't be responsible for
> sending sync messages or packaging itself. There should be a
> TaskSyncServer (master) and TaskSyncClient (node) that encapsulates the
> entire workflow on both ends.

I totally agree. Actually, it is almost my current design. A
TaskSyncModule at the master side handles sync requests. I put the sync
requesting functionality in WorkerTaskControls without defining a new
module.

> 1) When a task is requested from TaskManager this should be an
> asynchronous call
>    1a) checks for if sync is in progress for that task because there
> might be multiple workunits fired at the same time.

Good point! I did not consider this :(

>    1b) if no sync in progress, send a TASK_NEEDS_UPDATE signal
>    1c) deferred created and registered for the requested task key.
> [...]
> 5) callbacks would cause workers to be spawned and run the task.

In fact I'm planning to adopt this asynchronous design. There are two
reasons that drive me to do this: 1) task synchronization may be
time-consuming, especially in the case of a "cold start"; 2) Twisted's
async nature :-)

There is, however, an embarrassing question: since the pencil-down date
is approaching, I guess I won't be able to finish this before that
date. Can I continue working on it after GSoC?

> TaskManager should handle packaging/unpacking functions to separate that
> from synchronization. This will allow task packages to be added to the
> filesystem and unpacked by a different module.

Could you explain more about this? I'm sorry, I didn't understand it
quite well.
Yin QIU wrote: > On Sun, Aug 16, 2009 at 12:16 AM, Peter Krenesky< wrote: > >> Yin QIU wrote: >> >>> Hi, >>> >>> This is an (long) update on the task packaging and task >>> synchronization features. I just checked in my changes into the >>> task_packaging branch of my git tree. I haven't run tests against the >>> new code, and it is probably not working yet. But I believe it will be >>> quickly. >>> >>> Generally speaking, I implemented: >>> >>> 1) a simple task packaging scheme as per description of ticket #74. >>> a) Note that now referencing a task needs a fully-qualified name, >>> like 'some_pkg.SomeTask'. >>> b) There can be only ONE tasks_dir, which is "task_cache" by >>> default. Otherwise, it will confuse the task synchronization process. >>> >>> >>> >> Is it redundant packages (ie same package in two locations) that is the >> problem? >> >> This is fine for now but eventually we'll want to support multiple >> directories. >> > > Partially. The main problem is: how do we know where to put a specific > task package if multiple tasks_dir are allowed? > > Suppose the master has 2 tasks_dir, /folder/one/ and /folder/two/. > There is a task package called "demo" in /folder/one. Does that mean > there should be two folders on each worker, with their names exactly > being /folder/one/ and /folder/two/? If that is the case, I think this > is way inflexible. And if it is not, we have to map /folder/one/ to a > local directory on each worker. Please correct me if I understand you > right. > > >oh you're right. That is a big problem. One directory is fine for now. We'll have to revisit this later for a better solution>>> 2) a naive task synchronization mechanism to fix ticket #22 >>> When the scheduler wants a worker to run a task or a workunit, it >>> also tells the worker the latest version number of that task. If the >>> worker finds that the version number does not match its local version, >>> it will initiate a task synchronization request to the master. 
A >>> synchronization session will subsequently take place between the >>> master and the worker. After the session, the task definition at the >>> worker side will be up-to-date. >>> >>> What I haven't finished: >>> >>> 1) Signals TASK_UPDATED, TASK_REMOVED, etc. emitted by the master's >>> TaskManager are not handled yet. Looks like it will need much effort >>> to cleanly and neatly deal with these situations on the worker side. >>> However, missing this feature won't be a big problem, since the worker >>> will passively synchronize with the master. >>> >>> >>> >> Are you referring to unloading and reloading code from python? It can't >> really be done currently. Python just doesn't have the capability. >> This is one of the reasons to switch to a model where workers are >> created specifically for a TaskInstance, and then thrown away after. >> >> > > There are two things missing. One is to inform the worker the updates > so that synchronization can be done in an active way rather than > passive. However, I don't know if it is desirable yet. > > The other is to stop task execution and to re-run them. As you said, > we would change the way that workers are created and disposed. I guess > this feature can be postponed. > >Workers should not be updated if they are already running a task. There is no way to ensure that the newer version will be compatible.This is a big problem. We could have two instances of the same task scheduled and the task updated before the first task completes. I think we need to include versioning in the package location and instance tracking so that tasks that are currently running can finish using the same set of code they started with. This would mean some extra calls to ensure that unused versions are removed after they are no longer needed. I think this could be most problematic with dependencies. 
We'd have to know which version of a dependency a task started with.

>> The task manager will also work slightly to ensure the tasks it is
>> loading are sandboxed away from the Node. It will have to run a
>> TaskLoader script in another sandboxxed process that returns a json
>> string of task information. TaskManager in this mode will only have a
>> dictionary of Tasks, it won't have any instances of them. Removing or
>> updating a task will just be manipulating the dictionary.
>>
>>> 2) Should hold the execution of a task on the worker if the task code
>>> is out-dated and resume it after synchronization. I'm working on this.
>>>
>>> Code changes are mostly in tasks/packaging.py and
>>> tasks/task_manager.py. Here are some of my notes during the
>>> development:
>>>
>>> = Task Packaging =
>>> I don't change the behavior that a worker runs a task. However, to
>>> facilitate sandboxed execution, I offered an interface in TaskManager
>>> (i.e., TaskManager.get_task()) to return all the additional module
>>> search paths of a task.
>>>
>>> = Task Synchronization =
>>> This feature is much more trickier than task packaging. I made several
>>> design decisions.
>>>
>>> 1) TaskManager is a module used by both the master and the worker.
>>> Though pydra is operated in a master/slave fashion, I want the
>>> TaskManager to rely on neither the master or the worker. So
>>> TaskManager should not call any remote methods of the master and the
>>> worker, and should be a relatively independent module, except that it
>>> emits TASK_UPDATE, TASK_REMOVED, etc. signals to other modules. This
>>> design will even make it easy to develop peer-to-peer task
>>> synchronization, though we don't need this feature right now.
>>>
>> Great, this is exactly how the module system should be used
>>
>>> 2) Though TaskManager is independent of other modules, the master and
>>> the worker needs access to it to manage available tasks, such as
>>> retrieving tasks and synchronizing tasks. The original architecture
>>> only allows me to use signals. But this would be a little bit clumsy.
>>> So I took the concept of "friend class" in C++, and added a _friend
>>> attribute in the Module class. A module can define its friends as
>>> follows:
>>>
>>> self._friends = {
>>>     'friend1' : FriendModule1,
>>>     'friend2' : FriendModule2,
>>> }
>>>
>>> After the initialization process, the module will have two more
>>> attributes, namely friend1 and friend2, as long as FriendModule1 and
>>> FriendModule2 are found in the same ModuleManager as this module. Note
>>> that this is actually contrary to the concept of friend class in C++.
>>>
>>> With this in hand, I made TaskManager friends of WorkerTaskControls,
>>> TaskScheduler, and TaskSyncModule (a new module created by me) so that
>>> the latter three can access the TaskManager directly.
>>>
>> I'd like to avoid tightly coupling modules but you're right that it does
>> become clumsy to deal with signals even if they are executed
>> synchronously. This is ok, provided we keep tight control over which
>> modules are allowed to do this.
>>
> Yeah, tight coupling was my concern about this change too. But I
> didn't find a better solution. The main reason is, I guess,
> TaskManager naturally has tight relationship with the master and
> workers, but a single shared "registry" attribute is not enough to
> reflect this relationship.
>
>>> 3) Synchronization between A and B may take more than one round of
>>> message passing, depending on the synchronization algorithm used.
>>> To flexibly support various synchronization algorithms, I defined two
>>> interfaces in the TaskPackage class:
>>>
>>> # TaskPackage.active_sync(self, response, phase=1)
>>> Generates a sync request, and perform certain actions to update the
>>> local task package to make it the same as a remote package. The
>>> "response" parameter is the sync response generated by that remote
>>> task package (see below). "phase" indicates the phase which the sync
>>> process is in. Obviously, when phase is 1, response should be passed
>>> as None.
>>>
>>> The return value of this method is a tuple containing the request data
>>> and a boolean flag, which indicates whether this task package expects
>>> a response from the remote side.
>>>
>>> # TaskPackage.passive_sync(self, request, phase=1)
>>> Generates a sync response according to the received sync request. The
>>> "request" and "phase" parameters have similar meanings to those in
>>> active_sync().
>>>
>>> Subclasses can derive from TaskPackage and implement their own
>>> active_sync() and passive_sync() methods to support other
>>> synchronization mechanisms.
>>>
>>> Take the simplistic tar.gz synchronization algorithm for example. The
>>> flow of a successful synchronization is:
>>>
>>> 1. WorkerTaskControls detects a task is out-dated, so it calls
>>> TaskManager.active_sync() with task_key specified.
>>> 2. TaskManager finds the task package which the task belongs to
>>> according to the task_key.
>>> 3. TaskManager calls TaskPackage.active_sync(response=None, phase=1).
>>> 4. In phase 1 of active_sync, TaskPackage simply returns its current
>>> version, and indicates that it expects a further response from the
>>> master.
>>> 5. WorkerTaskControls sends the sync request generated by
>>> active_sync() to the master
>>> 6. TaskSyncModule at the master side receives the sync request, and
>>> then invokes TaskPackage.passive_sync(request, 1).
>>> 7. In TaskPackage.passive_sync(), the latest content of the task
>>> package is compressed in tar.gz format, serialized, and returned as a
>>> string.
>>> 8. WorkerTaskControls receives the response from the master and
>>> subsequently calls TaskPackage.active_sync(response, 2),which will
>>> decompress the data and result in an update-to-date task package.
>>>
>> I have some issues with this workflow.
>>
>> 1) why use multiple call with a state variable "phase"? Why not use
>> specific function calls? Are there large sections of reused code that
>> can't be methods?
>>
> In my simplistic "tar.gz" scheme, there are 2 phases at the sync
> requester side, and 1 phase at the sync server side. That is:
>
> phase 1 at the worker side: worker ---sends local version--> master
> phase 1 at the master side: master (phase 1)---sends compressed data--> worker
> phase 2 at the worker side: worker decompresses the data
>
> but other synchronization schemes, which may be based on delta
> encoding, could require more phases to do a synchronization. I cannot
> offer a set of versatile interfaces for that.
>

I was just saying that phases 1 and 2 seem like distinct calls:
   * node.request_sync() calls master.request_sync()
   * master.request_sync() calls node.receive_sync() with data

It shouldn't matter whether node.receive_sync() is receiving a whole
tarball, deltas, etc. But I don't see the code for this in your
repository, so I couldn't tell for sure if there was more to it.

>> 2) It should actually be synchronizing with the Node since Node is
>> responsible for anything that affects all of its workers. Also once we
>> have refactored the Node-Worker relationship (see above) worker will not
>> always be around for synchronization.
>>
> Oh, I had doubts about this when I first started working on task sync.
> Sorry that I later forgot it.
>
>> Since this doesn't exist yet you can just mock up a run_task() method on
>> the Node that just tries to lookup the requested task from the TaskManager.
>>
> Simply mocking up the run_task() method is not enough. Now the
> scheduler directly calls the remote interfaces of workers. Seems like
> we would change the way that the scheduler works and maintain node
> references on the master, which would result in lots of changes. Is
> that acceptable for now?
>

Correct. Refactoring the node-worker relationship will mean that the
scheduler interacts with the node. For many things the node will act as
a proxy to the worker. The node will have all the remotes (run_task,
send_results, etc.) that the worker currently has, but they will be mostly
proxies to the worker.

Perhaps this is a better solution:
   write the module TaskSyncClient and include it as part of the
Worker. When we move these functions over to the Node we can move the
TaskSyncClient as well. Node will eventually have a NodeTaskControls
module that has the same set of listeners, signals, and remotes that
WorkerTaskControls has. It should be fairly transparent and an easy switch.

>> 3) To play nice with the sandbox TaskPackage can't be responsible for
>> sending sync messages or packaging itself. There should be a
>> TaskSyncServer (master) and TaskSyncClient (node) that encapsulates the
>> entire workflow on both ends.
>>
> I totally agree. Actually it is almost my current design. A
> TaskSyncModule at the master side handles sync requests. I put the
> sync requesting functionality in WorkerTaskControls without defining a
> new module.
>
>> this is more what i envisioned:
>>
>> 1) When a task is requested from TaskManager this should be an
>> asynchronous call
>>    1a) checks for if sync is in progress for that task because there
>> might be multiple workunits fired at the same time.
>>
> Good point!
> I did not consider this :(
>
>>    1b) if no sync in progress, send a TASK_NEEDS_UPDATE signal
>>    1c) deferred created and registered for the requested task key.
>> 2) TaskSyncClient, or other task management modules (ie. directory
>> scanner) will listen for the signal and attempt to find the task.
>> 3) If found and or transfered
>>    3a) the module calls ADD_TASK which unpacks it and loads it as an
>> available task.
>> 4) The task manager fires any callbacks registered for that task (there
>> might be multiple workunits)
>> 5) callbacks would cause workers to be spawned and run the task.
>>
> In fact I'm planning to adopt this asynchronous design. There are two
> reasons that drives me to do this: 1) task synchronization may be
> time-consuming, especially in case of a "cold start"; 2) twisted's
> async nature :-)
>
> There is however an embarrassing question: since the pencil-down date
> is approaching, I guess I won't be able to finish this before that
> date. Can I continue working on it after gsoc?
>

Absolutely. I'd really like for you to remain a part of the community
after GSOC ends. Even though you haven't completed all the code in your
proposal, you've added a lot of value to the project. We've worked
through the logic of some very tough problems that on my own I probably
wouldn't have gotten right.

>> TaskManager should handle packaging/unpacking functions to separate that
>> from synchronization. This will allow task packages to be added to the
>> filesystem and unpacked by a different module.
>>
> Could you explain more on this? I'm sorry I didn't understand it quite well.
>

We'll also want other components to be able to unpack tasks from other
sources:

   1) tarballs or source dropped directly into task_cache
   2) tarballs uploaded to the website

That means the unpacking code must be in a general place.
TaskManager makes the most sense as it handles loading tasks and
maintaining dictionaries of available tasks.

>>> Now sync requests and responses are all represented in strings.
>>> Ideally, we will use Producers and Consumers to achieve better
>>> performance, as we discussed.
>>>
>>> Hope I expressed myself clearly. Any comments are appreciated.
>>>
>> -Peter
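
For readers following the thread, the asynchronous lookup outlined in steps 1) to 5) of this message could be sketched roughly as below. This is only an illustration under stated assumptions, not Pydra code: the class name TaskManagerSketch, its methods, and the emit_signal callable are all hypothetical, and plain Python callbacks stand in for the Twisted deferreds mentioned in step 1c so the sketch runs without Twisted installed.

```python
class TaskManagerSketch:
    """Hypothetical stand-in for TaskManager; not Pydra's actual class."""

    def __init__(self, emit_signal):
        self.tasks = {}      # task_key -> (version, task object)
        self.pending = {}    # task_key -> callbacks waiting on a sync
        self.emit_signal = emit_signal  # assumed signal hook

    def request_task(self, task_key, version, callback):
        """Step 1: asynchronous lookup; callback fires once the task is usable."""
        known = self.tasks.get(task_key)
        if known is not None and known[0] == version:
            callback(known[1])  # already up to date, no sync needed
            return
        # Step 1a: is a sync for this task already in progress?
        first_request = task_key not in self.pending
        # Step 1c: remember who is waiting for this task key.
        self.pending.setdefault(task_key, []).append(callback)
        if first_request:
            # Step 1b: only the first request triggers a sync.
            self.emit_signal('TASK_NEEDS_UPDATE', task_key, version)

    def add_task(self, task_key, version, task):
        """Steps 3a and 4: register the task, then fire every waiting callback."""
        self.tasks[task_key] = (version, task)
        for callback in self.pending.pop(task_key, []):
            callback(task)


signals = []   # records emitted signals instead of dispatching them
started = []   # records tasks handed to waiting work units
manager = TaskManagerSketch(lambda *args: signals.append(args))

# Two work units ask for the same out-of-date task at nearly the same time.
manager.request_task('some_pkg.SomeTask', 2, started.append)
manager.request_task('some_pkg.SomeTask', 2, started.append)

# A sync client (or directory scanner) later delivers the task.
manager.add_task('some_pkg.SomeTask', 2, 'task-object')
```

The callback is registered before the signal is emitted so that a listener which answers synchronously still finds the waiter in place. With real deferreds the same bookkeeping would apply, with one deferred per waiting work unit.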
On Sun, Aug 16, 2009 at 2:51 AM, Peter Krenesky wrote:
> Yin QIU wrote:
>> On Sun, Aug 16, 2009 at 12:16 AM, Peter Krenesky wrote:
>>
>>> Yin QIU wrote:
>>>
>>>> Hi,
>>>>
>>>> This is an (long) update on the task packaging and task
>>>> synchronization features. I just checked in my changes into the
>>>> task_packaging branch of my git tree. I haven't run tests against the
>>>> new code, and it is probably not working yet. But I believe it will be
>>>> quickly.
>>>>
>>>> Generally speaking, I implemented:
>>>>
>>>> 1) a simple task packaging scheme as per description of ticket #74.
>>>>    a) Note that now referencing a task needs a fully-qualified name,
>>>> like 'some_pkg.SomeTask'.
>>>>    b) There can be only ONE tasks_dir, which is "task_cache" by
>>>> default. Otherwise, it will confuse the task synchronization process.
>>>>
>>> Is it redundant packages (ie same package in two locations) that is the
>>> problem?
>>>
>>> This is fine for now but eventually we'll want to support multiple
>>> directories.
>>>
>>
>> Partially. The main problem is: how do we know where to put a specific
>> task package if multiple tasks_dir are allowed?
>>
>> Suppose the master has 2 tasks_dir, /folder/one/ and /folder/two/.
>> There is a task package called "demo" in /folder/one. Does that mean
>> there should be two folders on each worker, with their names exactly
>> being /folder/one/ and /folder/two/? If that is the case, I think this
>> is way inflexible. And if it is not, we have to map /folder/one/ to a
>> local directory on each worker. Please correct me if I understand you
>> right.
>>
> oh you're right. That is a big problem. One directory is fine for
> now. We'll have to revisit this later for a better solution
>
>>>> 2) a naive task synchronization mechanism to fix ticket #22
>>>>    When the scheduler wants a worker to run a task or a workunit, it
>>>> also tells the worker the latest version number of that task.
>>>> If the worker finds that the version number does not match its
>>>> local version,
>>>> it will initiate a task synchronization request to the master. A
>>>> synchronization session will subsequently take place between the
>>>> master and the worker. After the session, the task definition at the
>>>> worker side will be up-to-date.
>>>>
>>>> What I haven't finished:
>>>>
>>>> 1) Signals TASK_UPDATED, TASK_REMOVED, etc. emitted by the master's
>>>> TaskManager are not handled yet. Looks like it will need much effort
>>>> to cleanly and neatly deal with these situations on the worker side.
>>>> However, missing this feature won't be a big problem, since the worker
>>>> will passively synchronize with the master.
>>>>
>>> Are you referring to unloading and reloading code from python? It can't
>>> really be done currently. Python just doesn't have the capability.
>>> This is one of the reasons to switch to a model where workers are
>>> created specifically for a TaskInstance, and then thrown away after.
>>>
>>
>> There are two things missing. One is to inform the worker the updates
>> so that synchronization can be done in an active way rather than
>> passive. However, I don't know if it is desirable yet.
>>
>> The other is to stop task execution and to re-run them. As you said,
>> we would change the way that workers are created and disposed. I guess
>> this feature can be postponed.
>>
> Workers should not be updated if they are already running a task. There
> is no way to ensure that the newer version will be compatible.
>
> This is a big problem. We could have two instances of the same task
> scheduled and the task updated before the first task completes.
>
> I think we need to include versioning in the package location and
> instance tracking so that tasks that are currently running can finish
> using the same set of code they started with.
> This would mean some extra calls to ensure that unused versions are
> removed after they are no longer needed.
>

On second thought, I have a question: do we really need to support
runtime task synchronization? I mean, if the definition of a task is
updated, it is more natural to stop all currently running instances
of that task, as well as those tasks that depend on it; otherwise the
results wouldn't be reliable. Stopping running tasks could be
done either by the user or programmatically.

> I think this could be most problematic with dependencies. We'd have to
> know which version of a dependency a task started with.
>

If this is needed, I can add versions to the dependency graph in
addition to package names.

>>> The task manager will also work slightly to ensure the tasks it is
>>> loading are sandboxed away from the Node. It will have to run a
>>> TaskLoader script in another sandboxxed process that returns a json
>>> string of task information. TaskManager in this mode will only have a
>>> dictionary of Tasks, it won't have any instances of them. Removing or
>>> updating a task will just be manipulating the dictionary.
>>>
>>>> 2) Should hold the execution of a task on the worker if the task code
>>>> is out-dated and resume it after synchronization. I'm working on this.
>>>>
>>>> Code changes are mostly in tasks/packaging.py and
>>>> tasks/task_manager.py. Here are some of my notes during the
>>>> development:
>>>>
>>>> = Task Packaging =
>>>> I don't change the behavior that a worker runs a task. However, to
>>>> facilitate sandboxed execution, I offered an interface in TaskManager
>>>> (i.e., TaskManager.get_task()) to return all the additional module
>>>> search paths of a task.
>>>>
>>>> = Task Synchronization =
>>>> This feature is much more trickier than task packaging. I made several
>>>> design decisions.
>>>>
>>>> 1) TaskManager is a module used by both the master and the worker.
>>>> Though pydra is operated in a master/slave fashion, I want the
>>>> TaskManager to rely on neither the master or the worker. So
>>>> TaskManager should not call any remote methods of the master and the
>>>> worker, and should be a relatively independent module, except that it
>>>> emits TASK_UPDATE, TASK_REMOVED, etc. signals to other modules. This
>>>> design will even make it easy to develop peer-to-peer task
>>>> synchronization, though we don't need this feature right now.
>>>>
>>> Great, this is exactly how the module system should be used
>>>
>>>> 2) Though TaskManager is independent of other modules, the master and
>>>> the worker needs access to it to manage available tasks, such as
>>>> retrieving tasks and synchronizing tasks. The original architecture
>>>> only allows me to use signals. But this would be a little bit clumsy.
>>>> So I took the concept of "friend class" in C++, and added a _friend
>>>> attribute in the Module class. A module can define its friends as
>>>> follows:
>>>>
>>>> self._friends = {
>>>>     'friend1' : FriendModule1,
>>>>     'friend2' : FriendModule2,
>>>> }
>>>>
>>>> After the initialization process, the module will have two more
>>>> attributes, namely friend1 and friend2, as long as FriendModule1 and
>>>> FriendModule2 are found in the same ModuleManager as this module. Note
>>>> that this is actually contrary to the concept of friend class in C++.
>>>>
>>>> With this in hand, I made TaskManager friends of WorkerTaskControls,
>>>> TaskScheduler, and TaskSyncModule (a new module created by me) so that
>>>> the latter three can access the TaskManager directly.
>>>>
>>> I'd like to avoid tightly coupling modules but you're right that it does
>>> become clumsy to deal with signals even if they are executed
>>> synchronously. This is ok, provided we keep tight control over which
>>> modules are allowed to do this.
>>>
>>
>> Yeah, tight coupling was my concern about this change too.
>> But I didn't find a better solution. The main reason is, I guess,
>> TaskManager naturally has tight relationship with the master and
>> workers, but a single shared "registry" attribute is not enough to
>> reflect this relationship.
>>
>>>> 3) Synchronization between A and B may take more than one round of
>>>> message passing, depending on the synchronization algorithm used. To
>>>> flexibly support various synchronization algorithms, I defined two
>>>> interfaces in the TaskPackage class:
>>>>
>>>> # TaskPackage.active_sync(self, response, phase=1)
>>>> Generates a sync request, and perform certain actions to update the
>>>> local task package to make it the same as a remote package. The
>>>> "response" parameter is the sync response generated by that remote
>>>> task package (see below). "phase" indicates the phase which the sync
>>>> process is in. Obviously, when phase is 1, response should be passed
>>>> as None.
>>>>
>>>> The return value of this method is a tuple containing the request data
>>>> and a boolean flag, which indicates whether this task package expects
>>>> a response from the remote side.
>>>>
>>>> # TaskPackage.passive_sync(self, request, phase=1)
>>>> Generates a sync response according to the received sync request. The
>>>> "request" and "phase" parameters have similar meanings to those in
>>>> active_sync().
>>>>
>>>> Subclasses can derive from TaskPackage and implement their own
>>>> active_sync() and passive_sync() methods to support other
>>>> synchronization mechanisms.
>>>>
>>>> Take the simplistic tar.gz synchronization algorithm for example. The
>>>> flow of a successful synchronization is:
>>>>
>>>> 1. WorkerTaskControls detects a task is out-dated, so it calls
>>>> TaskManager.active_sync() with task_key specified.
>>>> 2. TaskManager finds the task package which the task belongs to
>>>> according to the task_key.
>>>> 3. TaskManager calls TaskPackage.active_sync(response=None, phase=1).
>>>> 4. In phase 1 of active_sync, TaskPackage simply returns its current
>>>> version, and indicates that it expects a further response from the
>>>> master.
>>>> 5. WorkerTaskControls sends the sync request generated by
>>>> active_sync() to the master
>>>> 6. TaskSyncModule at the master side receives the sync request, and
>>>> then invokes TaskPackage.passive_sync(request, 1).
>>>> 7. In TaskPackage.passive_sync(), the latest content of the task
>>>> package is compressed in tar.gz format, serialized, and returned as a
>>>> string.
>>>> 8. WorkerTaskControls receives the response from the master and
>>>> subsequently calls TaskPackage.active_sync(response, 2),which will
>>>> decompress the data and result in an update-to-date task package.
>>>>
>>> I have some issues with this workflow.
>>>
>>> 1) why use multiple call with a state variable "phase"? Why not use
>>> specific function calls? Are there large sections of reused code that
>>> can't be methods?
>>>
>>
>> In my simplistic "tar.gz" scheme, there are 2 phases at the sync
>> requester side, and 1 phase at the sync server side. That is:
>>
>> phase 1 at the worker side: worker ---sends local version--> master
>> phase 1 at the master side: master (phase 1)---sends compressed data--> worker
>> phase 2 at the worker side: worker decompresses the data
>>
>> but other synchronization schemes, which may be based on delta
>> encoding, could require more phases to do a synchronization. I cannot
>> offer a set of versatile interfaces for that.
>>
> I was just saying that phase 1 and two seem like distinct calls:
>    * node.request_sync() calls master.request_sync()
>    * master.request_sync() calls node.receive_sync() with data
>
> it shouldn't matter whether node.receive_sync() is receiving a whole
> tarball, deltas, etc. But i don't see the code for this in your
> repository so i couldnt tell for sure if there was more to it.
>

You were right.
active_sync() and passive_sync() are supposed to be implemented
like this:

    if phase == 1:
        ...  # phase 1 actions
    elif phase == 2:
        ...  # phase 2 actions
    # further elif branches if there are more phases

packaging.TaskPackage.active_sync() and
packaging.TaskPackage.passive_sync() contain a simple implementation
for tarball-style synchronization. A bsdiff-based BsdiffTaskPackage
class derived from TaskPackage is also added, but hasn't been
implemented yet.

>>> 2) It should actually be synchronizing with the Node since Node is
>>> responsible for anything that affects all of its workers. Also once we
>>> have refactored the Node-Worker relationship (see above) worker will not
>>> always be around for synchronization.
>>>
>>
>> Oh, I had doubts about this when I first started working on task sync.
>> Sorry that I later forgot it.
>>
>>> Since this doesn't exist yet you can just mock up a run_task() method on
>>> the Node that just tries to lookup the requested task from the TaskManager.
>>>
>>
>> Simply mocking up the run_task() method is not enough. Now the
>> scheduler directly calls the remote interfaces of workers. Seems like
>> we would change the way that the scheduler works and maintain node
>> references on the master, which would result in lots of changes. Is
>> that acceptable for now?
>>
> Correct. refactoring the node-worker relationship will mean that
> scheduler interacts with the node. For many things the node will act as
> a proxy to the worker. The node will have all the remotes (run_task,
> send_results, etc) that worker currently has, but they will be mostly
> proxies to the worker.
>
> perhaps this is a better solution:
>    write the module TaskSyncClient and include it as part of the
> Worker. When we move these functions over to the Node we can move the
> TaskSyncClient as well. Node will eventually have a NodeTaskControls
> module that has the same set of listeners, signals, and remotes that
> WorkerTaskControls has.
> It should be fairly transparent and an easy switch.

Okay. I'll do this.

>>> 3) To play nice with the sandbox TaskPackage can't be responsible for
>>> sending sync messages or packaging itself. There should be a
>>> TaskSyncServer (master) and TaskSyncClient (node) that encapsulates the
>>> entire workflow on both ends.
>>>
>>
>> I totally agree. Actually it is almost my current design. A
>> TaskSyncModule at the master side handles sync requests. I put the
>> sync requesting functionality in WorkerTaskControls without defining a
>> new module.
>>
>>> this is more what i envisioned:
>>>
>>> 1) When a task is requested from TaskManager this should be an
>>> asynchronous call
>>>    1a) checks for if sync is in progress for that task because there
>>> might be multiple workunits fired at the same time.
>>>
>>
>> Good point! I did not consider this :(
>>
>>>    1b) if no sync in progress, send a TASK_NEEDS_UPDATE signal
>>>    1c) deferred created and registered for the requested task key.
>>> 2) TaskSyncClient, or other task management modules (ie. directory
>>> scanner) will listen for the signal and attempt to find the task.
>>> 3) If found and or transfered
>>>    3a) the module calls ADD_TASK which unpacks it and loads it as an
>>> available task.
>>> 4) The task manager fires any callbacks registered for that task (there
>>> might be multiple workunits)
>>> 5) callbacks would cause workers to be spawned and run the task.
>>>
>>
>> In fact I'm planning to adopt this asynchronous design. There are two
>> reasons that drives me to do this: 1) task synchronization may be
>> time-consuming, especially in case of a "cold start"; 2) twisted's
>> async nature :-)
>>
>> There is however an embarrassing question: since the pencil-down date
>> is approaching, I guess I won't be able to finish this before that
>> date. Can I continue working on it after gsoc?
>>
> Absolutely. I'd really like for you to remain a part of the community
> after GSOC ends.
> Even though you haven't completed all the code in your
> proposal, you've added a lot of value to the project. We've worked
> through the logic of some very tough problems that on my own I probably
> wouldn't have gotten right.

Great. Thanks!

>>> TaskManager should handle packaging/unpacking functions to separate that
>>> from synchronization. This will allow task packages to be added to the
>>> filesystem and unpacked by a different module.
>>>
>>
>> Could you explain more on this? I'm sorry I didn't understand it quite well.
>>
> We'll also want another components to be able to unpack tasks from other
> sources:
>
>    1) tarballs or source dropped directly into task_cache
>    2) tarballs uploaded to the website
>
> That means the code for unpacking the code must be in a general place.
> TaskManager makes the most sense as it handles loading tasks and
> maintaining dictionaries of available tasks.

OK, I get it. It won't be hard to do this.

>>>> Now sync requests and responses are all represented in strings.
>>>> Ideally, we will use Producers and Consumers to achieve better
>>>> performance, as we discussed.
>>>>
>>>> Hope I expressed myself clearly. Any comments are appreciated.
>>>>
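
As a rough illustration of the phase-based tarball exchange discussed in this message, the sketch below walks through phase 1 and phase 2 on the requester side and the single phase on the responder side. It is a sketch under assumptions, not the code in packaging.py: TarTaskPackageSketch is a hypothetical class, the package content is held in an in-memory dict rather than in task_cache on disk, the response is passed around as bytes rather than a serialized string, and it uses the Python 3 standard library.

```python
import io
import tarfile


class TarTaskPackageSketch:
    """Hypothetical in-memory package: files maps file name -> bytes."""

    def __init__(self, version, files):
        self.version = version
        self.files = dict(files)

    def active_sync(self, response, phase=1):
        """Requester side; returns (request_data, expects_response)."""
        if phase == 1:
            # Phase 1: announce the local version and wait for the archive.
            return self.version, True
        elif phase == 2:
            # Phase 2: unpack the archive sent back by the remote side.
            version, blob = response
            with tarfile.open(fileobj=io.BytesIO(blob), mode='r:gz') as tar:
                self.files = {member.name: tar.extractfile(member).read()
                              for member in tar.getmembers()
                              if member.isfile()}
            self.version = version
            return None, False
        raise ValueError('unexpected phase: %r' % (phase,))

    def passive_sync(self, request, phase=1):
        """Responder side; returns (version, tar.gz bytes).

        The request (the requester's version) is ignored here because a
        whole-tarball scheme always sends everything; a delta-based
        scheme would use it.
        """
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode='w:gz') as tar:
            for name, data in sorted(self.files.items()):
                info = tarfile.TarInfo(name)
                info.size = len(data)
                tar.addfile(info, io.BytesIO(data))
        return self.version, buf.getvalue()


master = TarTaskPackageSketch(2, {'demo/tasks.py': b'VERSION = 2\n'})
worker = TarTaskPackageSketch(1, {'demo/tasks.py': b'VERSION = 1\n'})

request, expects_response = worker.active_sync(None, phase=1)   # worker, phase 1
response = master.passive_sync(request, phase=1)                # master, phase 1
worker.active_sync(response, phase=2)                           # worker, phase 2
```

The same three calls could equally be split into distinctly named methods, as suggested earlier in the thread; the phase argument only matters if a subclass needs more round trips than this scheme does.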
Yin QIU wrote:
> On Sun, Aug 16, 2009 at 2:51 AM, Peter Krenesky wrote:
>
>> Yin QIU wrote:
>>
>>> On Sun, Aug 16, 2009 at 12:16 AM, Peter Krenesky wrote:
>>>
>>>> Yin QIU wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> This is an (long) update on the task packaging and task
>>>>> synchronization features. I just checked in my changes into the
>>>>> task_packaging branch of my git tree. I haven't run tests against the
>>>>> new code, and it is probably not working yet. But I believe it will be
>>>>> quickly.
>>>>>
>>>>> Generally speaking, I implemented:
>>>>>
>>>>> 1) a simple task packaging scheme as per description of ticket #74.
>>>>>    a) Note that now referencing a task needs a fully-qualified name,
>>>>> like 'some_pkg.SomeTask'.
>>>>>    b) There can be only ONE tasks_dir, which is "task_cache" by
>>>>> default. Otherwise, it will confuse the task synchronization process.
>>>>>
>>>> Is it redundant packages (ie same package in two locations) that is the
>>>> problem?
>>>>
>>>> This is fine for now but eventually we'll want to support multiple
>>>> directories.
>>>>
>>> Partially. The main problem is: how do we know where to put a specific
>>> task package if multiple tasks_dir are allowed?
>>>
>>> Suppose the master has 2 tasks_dir, /folder/one/ and /folder/two/.
>>> There is a task package called "demo" in /folder/one. Does that mean
>>> there should be two folders on each worker, with their names exactly
>>> being /folder/one/ and /folder/two/? If that is the case, I think this
>>> is way inflexible. And if it is not, we have to map /folder/one/ to a
>>> local directory on each worker. Please correct me if I understand you
>>> right.
>>>
>> oh you're right. That is a big problem. One directory is fine for
>> now.
>> We'll have to revisit this later for a better solution
>>
>>>>> 2) a naive task synchronization mechanism to fix ticket #22
>>>>>    When the scheduler wants a worker to run a task or a workunit, it
>>>>> also tells the worker the latest version number of that task. If the
>>>>> worker finds that the version number does not match its local version,
>>>>> it will initiate a task synchronization request to the master. A
>>>>> synchronization session will subsequently take place between the
>>>>> master and the worker. After the session, the task definition at the
>>>>> worker side will be up-to-date.
>>>>>
>>>>> What I haven't finished:
>>>>>
>>>>> 1) Signals TASK_UPDATED, TASK_REMOVED, etc. emitted by the master's
>>>>> TaskManager are not handled yet. Looks like it will need much effort
>>>>> to cleanly and neatly deal with these situations on the worker side.
>>>>> However, missing this feature won't be a big problem, since the worker
>>>>> will passively synchronize with the master.
>>>>>
>>>> Are you referring to unloading and reloading code from python? It can't
>>>> really be done currently. Python just doesn't have the capability.
>>>> This is one of the reasons to switch to a model where workers are
>>>> created specifically for a TaskInstance, and then thrown away after.
>>>>
>>> There are two things missing. One is to inform the worker the updates
>>> so that synchronization can be done in an active way rather than
>>> passive. However, I don't know if it is desirable yet.
>>>
>>> The other is to stop task execution and to re-run them. As you said,
>>> we would change the way that workers are created and disposed. I guess
>>> this feature can be postponed.
>>>
>> Workers should not be updated if they are already running a task. There
>> is no way to ensure that the newer version will be compatible.
>>
>> This is a big problem.
>> We could have two instances of the same task
>> scheduled and the task updated before the first task completes.
>>
>> I think we need to include versioning in the package location and
>> instance tracking so that tasks that are currently running can finish
>> using the same set of code they started with. This would mean some
>> extra calls to ensure that unused versions are removed after they are no
>> longer needed.
>>
>
> A second thought brought me a question: do we really need to support
> runtime task synchronization? I mean, if the definition of a task is
> updated, it is more natural that we stop all current running instances
> of that task, as well as those tasks that depend on it, otherwise the
> running results wouldn't be reliable. Stopping running tasks could be
> done either by the user himself or in a programmed way.
>

Well, I guess it depends on the reason for the upgrade:

 - is it an efficiency improvement?
 - is it a bug fix?
 - is it a new feature?
 - is it a new parameter?

I think all of those can and will happen. Bug fixes make sense. In
some cases efficiency will warrant restarting, but not all the time.
What if you're 99% done with a task and you've just added a newer
version that will run better next time?

You could require jobs running the old version to stop first before any
new jobs could start, giving a window in which to update the task. This
means the cluster would slow down during this time because it wouldn't
be able to start certain jobs while it waits for others to finish.

>> I think this could be most problematic with dependencies. We'd have to
>> know which version of a dependency a task started with.
>>
>
> If this is needed, I can add version to the dependency graph in
> addition to package names.
>
>>>> The task manager will also work slightly to ensure the tasks it is
>>>> loading are sandboxed away from the Node.
It will have to run a >>>> TaskLoader script in another sandboxxed process that returns a json >>>> string of task information. TaskManager in this mode will only have a >>>> dictionary of Tasks, it won't have any instances of them. Removing or >>>> updating a task will just be manipulating the dictionary. >>>> >>>> >>>>> 2) Should hold the execution of a task on the worker if the task code >>>>> is out-dated and resume it after synchronization. I'm working on this. >>>>> >>>>> >>>>> Code changes are mostly in tasks/packaging.py and >>>>> tasks/task_manager.py. Here are some of my notes during the >>>>> development: >>>>> >>>>> = Task Packaging = >>>>> I don't change the behavior that a worker runs a task. However, to >>>>> facilitate sandboxed execution, I offered an interface in TaskManager >>>>> (i.e., TaskManager.get_task()) to return all the additional module >>>>> search paths of a task. >>>>> >>>>> >>>>> = Task Synchronization = >>>>> This feature is much more trickier than task packaging. I made several >>>>> design decisions. >>>>> >>>>> 1) TaskManager is a module used by both the master and the worker. >>>>> Though pydra is operated in a master/slave fashion, I want the >>>>> TaskManager to rely on neither the master or the worker. So >>>>> TaskManager should not call any remote methods of the master and the >>>>> worker, and should be a relatively independent module, except that it >>>>> emits TASK_UPDATE, TASK_REMOVED, etc. signals to other modules. This >>>>> design will even make it easy to develop peer-to-peer task >>>>> synchronization, though we don't need this feature right now. >>>>> >>>>> >>>>> >>>>> >>>> Great, this is exactly how the module system should be used >>>> >>>> >>>>> 2) Though TaskManager is independent of other modules, the master and >>>>> the worker needs access to it to manage available tasks, such as >>>>> retrieving tasks and synchronizing tasks. The original architecture >>>>> only allows me to use signals. 
But this would be a little bit clumsy. >>>>> So I took the concept of "friend class" in C++, and added a _friend >>>>> attribute in the Module class. A module can define its friends as >>>>> follows: >>>>> >>>>> self._friends = { >>>>> 'friend1' : FriendModule1, >>>>> 'friend2' : FriendModule2, >>>>> } >>>>> >>>>> After the initialization process, the module will have two more >>>>> attributes, namely friend1 and friend2, as long as FriendModule1 and >>>>> FriendModule2 are found in the same ModuleManager as this module. Note >>>>> that this is actually contrary to the concept of friend class in C++. >>>>> >>>>> With this in hand, I made TaskManager friends of WorkerTaskControls, >>>>> TaskScheduler, and TaskSyncModule (a new module created by me) so that >>>>> the latter three can access the TaskManager directly. >>>>> >>>>> >>>>> >>>>> >>>> I'd like to avoid tightly coupling modules but you're right that it does >>>> become clumsy to deal with signals even if they are executed >>>> synchronously. This is ok, provided we keep tight control over which >>>> modules are allowed to do this. >>>> >>>> >>>> >>> Yeah, tight coupling was my concern about this change too. But I >>> didn't find a better solution. The main reason is, I guess, >>> TaskManager naturally has tight relationship with the master and >>> workers, but a single shared "registry" attribute is not enough to >>> reflect this relationship. >>> >>> >>> >>>>> 3) Synchronization between A and B may take more than one round of >>>>> message passing, depending on the synchronization algorithm used. To >>>>> flexibly support various synchronization algorithms, I defined two >>>>> interfaces in the TaskPackage class: >>>>> >>>>> # TaskPackage.active_sync(self, response, phase=1) >>>>> Generates a sync request, and perform certain actions to update the >>>>> local task package to make it the same as a remote package. 
The >>>>> "response" parameter is the sync response generated by that remote >>>>> task package (see below). "phase" indicates the phase which the sync >>>>> process is in. Obviously, when phase is 1, response should be passed >>>>> as None. >>>>> >>>>> The return value of this method is a tuple containing the request data >>>>> and a boolean flag, which indicates whether this task package expects >>>>> a response from the remote side. >>>>> >>>>> # TaskPackage.passive_sync(self, request, phase=1) >>>>> Generates a sync response according to the received sync request. The >>>>> "request" and "phase" parameters have similar meanings to those in >>>>> active_sync(). >>>>> >>>>> Subclasses can derive from TaskPackage and implement their own >>>>> active_sync() and passive_sync() methods to support other >>>>> synchronization mechanisms. >>>>> >>>>> Take the simplistic tar.gz synchronization algorithm for example. The >>>>> flow of a successful synchronization is: >>>>> >>>>> 1. WorkerTaskControls detects a task is out-dated, so it calls >>>>> TaskManager.active_sync() with task_key specified. >>>>> 2. TaskManager finds the task package which the task belongs to >>>>> according to the task_key. >>>>> 3. TaskManager calls TaskPackage.active_sync(response=None, phase=1). >>>>> 4. In phase 1 of active_sync, TaskPackage simply returns its current >>>>> version, and indicates that it expects a further response from the >>>>> master. >>>>> 5. WorkerTaskControls sends the sync request generated by >>>>> active_sync() to the master >>>>> 6. TaskSyncModule at the master side receives the sync request, and >>>>> then invokes TaskPackage.passive_sync(request, 1). >>>>> 7. In TaskPackage.passive_sync(), the latest content of the task >>>>> package is compressed in tar.gz format, serialized, and returned as a >>>>> string. >>>>> 8. 
WorkerTaskControls receives the response from the master and >>>>> subsequently calls TaskPackage.active_sync(response, 2),which will >>>>> decompress the data and result in an update-to-date task package. >>>>> >>>>> >>>>> >>>>> >>>>> >>>> I have some issues with this workflow. >>>> >>>> 1) why use multiple call with a state variable "phase"? Why not use >>>> specific function calls? Are there large sections of reused code that >>>> can't be methods? >>>> >>>> >>> In my simplistic "tar.gz" scheme, there are 2 phases at the sync >>> requester side, and 1 phase at the sync server side. That is: >>> >>> phase 1 at the worker side: worker ---sends local version--> master >>> phase 1 at the master side: master (phase 1)---sends compressed data--> worker >>> phase 2 at the worker side: worker decompresses the data >>> >>> but other synchronization schemes, which may be based on delta >>> encoding, could require more phases to do a synchronization. I cannot >>> offer a set of versatile interfaces for that. >>> >>> >>> >> I was just saying that phase 1 and two seem like distinct calls: >> * node.request_sync() calls master.request_sync() >> * master.request_sync() calls node.receive_sync() with data >> >> it shouldn't matter whether node.receive_sync() is receiving a whole >> tarball, deltas, etc. But i don't see the code for this in your >> repository so i couldnt tell for sure if there was more to it. >> >> > > You were right. active_sync() and passive_sync() are supposed to be > implemented in a way like this: > > if phase = 1: > # phase 1 actions > elif phase = 2: > # phase 2 actions. > .... # if there are more phases > > packaging.TaskPackage.active_sync() and > packaging.TaskPackage.passive_sync() contain a simple implementation > for tarball-style synchroniztaion. A bsdiff-based BsdiffTaskPackage > class derived from TaskPackage is also added, but hasn't been > implemented yet. 
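
[Editor's sketch] The phased tarball exchange described above can be
illustrated roughly as follows. This is not the code in
tasks/packaging.py: the class name TarballTaskPackage, its constructor,
and the (version, bytes) response tuple are assumptions made for the
example (the thread says the real response is a serialized string), and
stale files in the target folder are not removed.

```python
import io
import os
import tarfile
import tempfile


class TarballTaskPackage:
    """Hypothetical stand-in for TaskPackage; only the sync interface is sketched."""

    def __init__(self, folder, version):
        self.folder = folder
        self.version = version

    def active_sync(self, response, phase=1):
        """Requester side. Returns (request_data, expects_response)."""
        if phase == 1:
            # Phase 1: report the local version and ask for the package data.
            return self.version, True
        elif phase == 2:
            # Phase 2: unpack the tarball produced by the remote passive_sync().
            remote_version, payload = response
            with tarfile.open(fileobj=io.BytesIO(payload), mode="r:gz") as tar:
                tar.extractall(self.folder)
            self.version = remote_version
            return None, False
        raise ValueError("unsupported phase: %r" % (phase,))

    def passive_sync(self, request, phase=1):
        """Responder side. Returns (version, tar.gz bytes) for a sync request."""
        if phase != 1:
            raise ValueError("unsupported phase: %r" % (phase,))
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode="w:gz") as tar:
            for name in sorted(os.listdir(self.folder)):
                tar.add(os.path.join(self.folder, name), arcname=name)
        return self.version, buf.getvalue()


def demo_sync():
    """Run one worker/master round trip in temporary folders."""
    with tempfile.TemporaryDirectory() as master_dir, \
            tempfile.TemporaryDirectory() as worker_dir:
        with open(os.path.join(master_dir, "task.py"), "w") as f:
            f.write("VALUE = 2\n")
        master = TarballTaskPackage(master_dir, version="v2")
        worker = TarballTaskPackage(worker_dir, version="v1")

        request, expects_response = worker.active_sync(None, phase=1)
        response = master.passive_sync(request, phase=1)
        worker.active_sync(response, phase=2)

        with open(os.path.join(worker_dir, "task.py")) as f:
            return worker.version, f.read()
```

A delta-based scheme would add further phases to the same two methods,
which is the flexibility the phase argument is meant to buy.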
>
>>>> 2) It should actually be synchronizing with the Node since Node is
>>>> responsible for anything that affects all of its workers. Also once we
>>>> have refactored the Node-Worker relationship (see above) worker will not
>>>> always be around for synchronization.
>>>>
>>> Oh, I had doubts about this when I first started working on task sync.
>>> Sorry that I later forgot it.
>>>
>>>> Since this doesn't exist yet you can just mock up a run_task() method on
>>>> the Node that just tries to lookup the requested task from the TaskManager.
>>>>
>>> Simply mocking up the run_task() method is not enough. Now the
>>> scheduler directly calls the remote interfaces of workers. Seems like
>>> we would change the way that the scheduler works and maintain node
>>> references on the master, which would result in lots of changes. Is
>>> that acceptable for now?
>>>
>> Correct. refactoring the node-worker relationship will mean that
>> scheduler interacts with the node. For many things the node will act as
>> a proxy to the worker. The node will have all the remotes (run_task,
>> send_results, etc) that worker currently has, but they will be mostly
>> proxies to the worker.
>>
>> perhaps this is a better solution:
>> write the module TaskSyncClient and include it as part of the
>> Worker. When we move these functions over to the Node we can move the
>> TaskSyncClient as well. Node will eventually have a NodeTaskControls
>> module that has the same set of listeners, signals, and remotes that
>> WorkerTaskControls has. It should be fairly transparent and an easy switch.
>>
>
> Okay. I'll do this.
>
>>>> 3) To play nice with the sandbox TaskPackage can't be responsible for
>>>> sending sync messages or packaging itself. There should be a
>>>> TaskSyncServer (master) and TaskSyncClient (node) that encapsulates the
>>>> entire workflow on both ends.
>>>>
>>> I totally agree. Actually it is almost my current design. A
>>> TaskSyncModule at the master side handles sync requests. I put the
>>> sync requesting functionality in WorkerTaskControls without defining a
>>> new module.
>>>
>>>> this is more what i envisioned:
>>>>
>>>> 1) When a task is requested from TaskManager this should be an
>>>> asynchronous call
>>>> 1a) checks for if sync is in progress for that task because there
>>>> might be multiple workunits fired at the same time.
>>>>
>>> Good point! I did not consider this :(
>>>
>>>> 1b) if no sync in progress, send a TASK_NEEDS_UPDATE signal
>>>> 1c) deferred created and registered for the requested task key.
>>>> 2) TaskSyncClient, or other task management modules (ie. directory
>>>> scanner) will listen for the signal and attempt to find the task.
>>>> 3) If found and/or transferred
>>>> 3a) the module calls ADD_TASK which unpacks it and loads it as an
>>>> available task.
>>>> 4) The task manager fires any callbacks registered for that task (there
>>>> might be multiple workunits)
>>>> 5) callbacks would cause workers to be spawned and run the task.
>>>>
>>> In fact I'm planning to adopt this asynchronous design. There are two
>>> reasons that drive me to do this: 1) task synchronization may be
>>> time-consuming, especially in case of a "cold start"; 2) twisted's
>>> async nature :-)
>>>
>>> There is however an embarrassing question: since the pencil-down date
>>> is approaching, I guess I won't be able to finish this before that
>>> date. Can I continue working on it after gsoc?
>>>
>> Absolutely. I'd really like for you to remain a part of the community
>> after GSOC ends. Even though you haven't completed all the code in your
>> proposal, you've added a lot of value to the project. We've worked
>> through the logic of some very tough problems that on my own I probably
>> wouldn't have gotten right.
>>
>
> Great. Thanks!
>
>>>> TaskManager should handle packaging/unpacking functions to separate that
>>>> from synchronization. This will allow task packages to be added to the
>>>> filesystem and unpacked by a different module.
>>>>
>>> Could you explain more on this? I'm sorry I didn't understand it quite well.
>>>
>> We'll also want other components to be able to unpack tasks from other
>> sources:
>>
>> 1) tarballs or source dropped directly into task_cache
>> 2) tarballs uploaded to the website
>>
>> That means the code for unpacking the code must be in a general place.
>> TaskManager makes the most sense as it handles loading tasks and
>> maintaining dictionaries of available tasks.
>>
>
> ok, I get it. It won't be hard to do this.
>
>>>>> Now sync requests and responses are all represented in strings.
>>>>> Ideally, we will use Producers and Consumers to achieve better
>>>>> performance, as we discussed.
>>>>>
>>>>> Hope I expressed myself clearly. Any comments are appreciated.
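
[Editor's sketch] The asynchronous lookup proposed in the message above
(one TASK_NEEDS_UPDATE signal per task key, with every waiting workunit
called back once the task is available) might look roughly like this.
Plain callbacks stand in for Twisted deferreds so the example needs only
the standard library; PendingTaskRequests and its method names are
hypothetical and not part of pydra.

```python
class PendingTaskRequests:
    """Hypothetical helper: one sync per task key, many waiting callbacks."""

    def __init__(self, emit_signal):
        self._emit = emit_signal   # callable(signal_name, task_key)
        self._waiting = {}         # task_key -> callbacks awaiting that task

    def request(self, task_key, callback):
        """Register interest in a task; signal only if no sync is in progress."""
        sync_in_progress = task_key in self._waiting
        # Register first, in case the signal handler resolves synchronously.
        self._waiting.setdefault(task_key, []).append(callback)
        if not sync_in_progress:
            self._emit("TASK_NEEDS_UPDATE", task_key)

    def task_added(self, task_key, task):
        """Called once the task is unpacked and loaded; fire every waiter."""
        for callback in self._waiting.pop(task_key, []):
            callback(task)


def demo_pending():
    """Two workunits ask for the same task; only one signal is emitted."""
    signals = []
    results = []
    pending = PendingTaskRequests(lambda name, key: signals.append((name, key)))
    pending.request("some_pkg.SomeTask", results.append)
    pending.request("some_pkg.SomeTask", results.append)
    pending.task_added("some_pkg.SomeTask", "task-object")
    return signals, results
```

In a Twisted-based implementation each callback would presumably be a
Deferred returned to the caller, with error handling for a sync that
fails; both are left out here.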
Hi,

I just pushed some changes to my public repo. I managed to add
preliminary support for keeping multiple versions of a task package.

There are now two folders holding the task code, namely tasks_cache
and tasks_cache_internal. The former is publicly known and is for
deployment usage; the latter is used by TaskManager internally and is
thus hidden from the outside world.

tasks_cache always contains the latest code. We can either drop files
to this folder or put contents into it with certain API (not available
yet). TaskManager keeps monitoring tasks_cache, and if it notices
updates, copies the latest task code into tasks_cache_internal, where
it places the code in a subdirectory with the SHA1 hash of the code as
the directory's name.

I've performed a simple test against this new feature. I put a
modified task package while running an older version of the package.
This resulted in two different task packages in tasks_cache_internal.

There is currently no cleanup mechanism yet. That is, once a task
package is created in tasks_cache_internal, there is no automatic way
to remove it after it expires. This issue will be resolved after we
let the scheduler emit TASK_STARTED and TASK_STOPPED signals and
handle these signals in TaskManager.
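
[Editor's sketch] One way the planned cleanup could use TASK_STARTED and
TASK_STOPPED is to count running instances per package version and allow
removal once a superseded version has no instances left. The
VersionTracker class and its method names are assumptions for
illustration; nothing like it exists in the code discussed here yet.

```python
from collections import Counter


class VersionTracker:
    """Hypothetical bookkeeping driven by TASK_STARTED / TASK_STOPPED."""

    def __init__(self):
        self._running = Counter()  # (package, version) -> running instances
        self._latest = {}          # package -> newest version hash

    def set_latest(self, package, version):
        """Record the version currently found in tasks_cache."""
        self._latest[package] = version

    def task_started(self, package, version):
        self._running[(package, version)] += 1

    def task_stopped(self, package, version):
        """Return True when this version's directory may be removed."""
        key = (package, version)
        if self._running[key] > 0:
            self._running[key] -= 1
        if self._running[key] == 0:
            del self._running[key]
            # Keep the newest version even when nothing is running it.
            return self._latest.get(package) != version
        return False


def demo_cleanup():
    tracker = VersionTracker()
    tracker.set_latest("some_pkg", "aaa")
    tracker.task_started("some_pkg", "aaa")
    tracker.task_started("some_pkg", "aaa")
    tracker.set_latest("some_pkg", "bbb")              # package updated mid-run
    first = tracker.task_stopped("some_pkg", "aaa")    # one instance still running
    second = tracker.task_stopped("some_pkg", "aaa")   # last instance of old code
    tracker.task_started("some_pkg", "bbb")
    third = tracker.task_stopped("some_pkg", "bbb")    # newest version is kept
    return first, second, third
```

A superseded version that never ran any instance would not be caught by
this scheme and would need a separate sweep.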
Looks great.
I'm excited to get this merged in. I hope to get enough of the
remaining node refactor code finished tonight that I can merge it back
into master. I'll be merging before all the fault tolerance is added
because I want to consolidate all the branches (sync, node-refactor, dist).
- Peter
The TaskSync code has been merged into master. I haven't thoroughly
tested it yet, but the basic functionality works. I only encountered
minor issues:
    * it was including .pyc files in the hashes, so there were
      mismatches when loading compiled code
    * run_task and _run_task signatures needed to be merged by hand to
      match the changes I had made.
- Peter
On Sat, Aug 29, 2009 at 2:38 PM, Peter Krenesky wrote:
> The TaskSync code has been merged into master. I haven't thoroughly
> tested it yet, but the basic functionality works. I only encountered
> minor issues:
>     * it was including .pyc files in the hashes so there were
> mismatches when loading compiled code

I deliberately did not restrict the hash to .py files, because I
thought not all the files in a task package would be .py files: there
might be dynamic libraries, configuration files, etc. Of course, we can
explicitly exclude .pyc files.

>     * run_task and _run_task signatures needed to be merged by hand to
> match what changes I had made.
>
> - Peter
Yin QIU wrote:
> On Sat, Aug 29, 2009 at 2:38 PM, Peter Krenesky wrote:
>
>> The TaskSync code has been merged into master. I haven't thoroughly
>> tested it yet, but the basic functionality works. I only encountered
>> minor issues:
>> * it was including .pyc files in the hashes so there were
>> mismatches when loading compiled code
>>
>
> I intentionally avoided examining only .py files in computing the
> hash. Because I thought not all the files in a task package were .py
> files - there might be dynamic libraries, configuration files, etc. Of
> course, we can explicitly exclude .pyc files.
>

And that's the right thing to do. There shouldn't be any dynamic
libraries within the package anyway. We can't avoid .pyc files because
that is where the Python runtime puts them.

We could avoid this in a more generic way by just using the directory
name as the hash instead of computing it every time. If users edit the
internal cache manually, it's likely it will break anyway. The only
thing we lose is the ability to determine if the user edited it locally
and tell them not to do it again.

I'm also considering whether we even need task_cache on Nodes, or just
task_cache_internal. Really, you shouldn't be manually placing files
there when there is no way to disable task synchronization. We have the
issue of large packages that might make someone want to avoid the sync
client, but that should eventually be solved by using the
consumer/producer elements of Twisted to transfer the data more
effectively.

>> * run_task and _run_task signatures needed to be merged by hand to
>> match what changes I had made.
>>
>> - Peter
On Sun, Aug 30, 2009 at 12:15 AM, Peter Krenesky wrote:
> Yin QIU wrote:
>> On Sat, Aug 29, 2009 at 2:38 PM, Peter Krenesky wrote:
>>
>>> The TaskSync code has been merged into master. I haven't thoroughly
>>> tested it yet, but the basic functionality works. I only encountered
>>> minor issues:
>>>     * it was including .pyc files in the hashes so there were
>>> mismatches when loading compiled code
>>>
>>
>> I intentionally avoided examining only .py files in computing the
>> hash. Because I thought not all the files in a task package were .py
>> files - there might be dynamic libraries, configuration files, etc. Of
>> course, we can explicitly exclude .pyc files.
>>
> and thats the right thing to do. there shouldn't be any dynamic
> libraries within the package anyways. We can't avoid .pyc files because
> that is where the python runtime puts them.
>
> we could avoid this in a more generic way by just using the directory
> name as the hash instead of computing it every time. If users edit the
> internal cache manually its likely it will break anyways. the only
> thing we lose is the ability to determine if the user edited it locally
> and tell them not to do it. again.
>

If we hash directory names only, we won't be able to tell whether a
task package has code modifications when those modifications change
neither the directory names nor the directory structure. So I think it
would be better to keep computing hashes from the contents while
excluding certain types of files, which can be configured flexibly,
e.g., in settings.py.

> I'm also considering whether we even need task_cache on Nodes, or just
> task_cache_internal. Really you shouldn't be manually placing files
> there when there is no way to disable task synchronization. We have the
> issue of large packages that might make someone want to avoid the sync
> client, but that should eventually be solved by using a
> consumer/producer elements of twisted to transfer the data more effectively.
>

Currently the TaskManagers at the master and the nodes are in fact
identical. But now the situation is: we don't use the master's
TaskManager to request synchronization and we don't use the nodes'
TaskManagers to handle sync requests. That is, we have actually made a
distinction between the two kinds of TaskManager. So task_cache on
nodes seems to be unnecessary.

We can of course implement a mechanism to disable task
synchronization. On the other hand, we can leave this alone, and have
the opportunity to implement a new feature that enables P2P-style
synchronization. For example, in a large cluster, we may update the
task package on an arbitrary node, and expect the other nodes, perhaps
including the master, to sync with this node. I think this would
greatly ease the maintenance of a cluster. But certainly this feature
is far from our current project goal, and is hence just an idea
right now :-)

>>>     * run_task and _run_task signatures needed to be merged by hand to
>>> match what changes I had made.
>>>
>>> - Peter
>
> _______________________________________________
> Pydra mailing list
> Pydra at osuosl.org
> http://lists.osuosl.org/mailman/listinfo/pydr...
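
[Editor's sketch] Content hashing with a configurable exclusion list, as
suggested in the message above, could be done along these lines. The
EXCLUDE_PATTERNS name is a hypothetical stand-in for a settings.py
entry, and the exact bytes fed to SHA1 are an assumption; the real
TaskManager may hash differently.

```python
import fnmatch
import hashlib
import os
import tempfile

# Hypothetical setting; in pydra this would presumably live in settings.py.
EXCLUDE_PATTERNS = ("*.pyc", "*.pyo")


def package_hash(root, exclude=EXCLUDE_PATTERNS):
    """SHA1 over relative paths and contents of all non-excluded files."""
    digest = hashlib.sha1()
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # make the traversal order deterministic
        for name in sorted(filenames):
            if any(fnmatch.fnmatch(name, pattern) for pattern in exclude):
                continue
            path = os.path.join(dirpath, name)
            relative = os.path.relpath(path, root).replace(os.sep, "/")
            digest.update(relative.encode("utf-8"))
            digest.update(b"\0")
            with open(path, "rb") as f:
                digest.update(f.read())
            digest.update(b"\0")
    return digest.hexdigest()


def demo_hash():
    """A stray .pyc leaves the hash alone; a source edit changes it."""
    with tempfile.TemporaryDirectory() as root:
        with open(os.path.join(root, "task.py"), "w") as f:
            f.write("VALUE = 1\n")
        before = package_hash(root)

        with open(os.path.join(root, "task.pyc"), "wb") as f:
            f.write(b"compiled")
        after_pyc = package_hash(root)

        with open(os.path.join(root, "task.py"), "w") as f:
            f.write("VALUE = 2\n")
        after_edit = package_hash(root)
    return before == after_pyc, before == after_edit
```

Because the relative path is hashed along with the contents, renaming a
file also yields a new version.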
Yin QIU wrote:
> If we hash directory names only, we won't be able to tell if a task
> package has code modifications if those modifications change neither
> the directory names nor the directory structure. So I think it would
> be better to keep computing hashes from the contents while excluding
> certain types of files, which can be configured flexibly, e.g., in
> settings.py.
>

I wasn't suggesting hashing names only. I was suggesting that we
shouldn't ever touch the INTERNAL_CACHE.

If you modify *anything* in INTERNAL_CACHE, the task_manager will treat
the files in TASK_CACHE as new files. Once we implement automatic
cleanup of old versions, it would also remove your modified version.
This would not be a bug; that is how it should work.

Since we can't manually do anything to INTERNAL_CACHE, there is no
reason to require it to be hashed repeatedly. It should be expected
that if you modify anything in INTERNAL_CACHE, bad things will happen
and it *will* break. TASK_CACHE of course still needs to be hashed
every time to check for changes.

> Currently TaskManager's at the master and the nodes are in fact
> identical. But now the situation is: we don't use the master's
> TaskManager to request synchronization and we don't use nodes'
> TaskManager's to handle sync requests. That is, we actually made a
> distinction between the two kinds of TaskManager's. So task_cache on
> nodes seems to be unnecessary.
>
> We can of course implement a mechanism to disable task
> synchronization. On the other hand, we can leave this alone, and have
> the opportunity to implement a new feature that enables P2P-style
> synchronization. For example, in a large cluster, we may update the
> task package on an arbitrary node, and expect other node, perhaps
> including the master, to sync with this node. I think this would
> greatly ease the maintenance of a cluster. But certainly this feature
> is far from our current project goal, and is hence just an imagination
> right now :-)
>

Yeah, P2P-style distribution might be worthwhile. I've been pondering
the idea of P2P-style communication in general, since we have overhead
from Nodes always talking through the master. P2P may end up being the
defining feature of 2.0.

I think that we can make synchronization more efficient. We currently
synchronize the TASK_CACHE folder, which TaskManager reads and
processes tasks from. We repeat this for every node that the code is
deployed to. It would be faster to synchronize INTERNAL_CACHE. The
files there have already been processed; you just need to deploy them.

These changes aren't that high on the priority list right now. We have
something that works; I'm just thinking about what the next step is
for it.
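
[Editor's sketch] The policy described above (hash TASK_CACHE every
time, but trust the directory name in INTERNAL_CACHE as the version)
could be sketched as follows. The
<internal cache>/<package>/<sha1> layout and the function names are
assumptions made for the example, not pydra's actual layout.

```python
import hashlib
import os
import shutil
import tempfile


def content_hash(folder):
    """SHA1 over the relative paths and bytes of every file under folder."""
    digest = hashlib.sha1()
    for dirpath, dirnames, filenames in os.walk(folder):
        dirnames.sort()
        for name in sorted(filenames):
            path = os.path.join(dirpath, name)
            digest.update(os.path.relpath(path, folder).encode("utf-8"))
            with open(path, "rb") as f:
                digest.update(f.read())
    return digest.hexdigest()


def publish(package_dir, internal_cache):
    """Copy package_dir into internal_cache unless that version already exists.

    Returns (version, copied). An existing version directory is trusted by
    its name alone and is never re-hashed.
    """
    version = content_hash(package_dir)
    package = os.path.basename(os.path.normpath(package_dir))
    target = os.path.join(internal_cache, package, version)
    if os.path.isdir(target):
        return version, False
    shutil.copytree(package_dir, target)
    return version, True


def demo_publish():
    with tempfile.TemporaryDirectory() as base:
        package_dir = os.path.join(base, "task_cache", "some_pkg")
        internal = os.path.join(base, "task_cache_internal")
        os.makedirs(package_dir)
        source = os.path.join(package_dir, "task.py")

        with open(source, "w") as f:
            f.write("VALUE = 1\n")
        v1, copied_first = publish(package_dir, internal)
        v1_again, copied_again = publish(package_dir, internal)

        with open(source, "w") as f:
            f.write("VALUE = 2\n")
        v2, copied_update = publish(package_dir, internal)

        versions = sorted(os.listdir(os.path.join(internal, "some_pkg")))
    return (copied_first, copied_again, copied_update,
            v1 == v1_again, versions == sorted([v1, v2]))
```

Both versions stay side by side in the internal cache, which matches
the test result reported earlier in the thread.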