jobmanager – Job Manager
Ensures things about jobs and spawns the actual tasks
-
class eventmq.jobmanager.JobManager(*args, **kwargs)
The exposed portion of the worker. The job manager’s main responsibility is to manage the resources on the server it’s running on.
This job manager uses multiprocessing Queues.
-
__init__(*args, **kwargs)
Note: All args are optional unless otherwise noted.
Parameters:
-
check_worker_health()
Checks for any dead processes in the pool and recreates them if necessary.
-
concurrent_jobs = None
Keeps track of workers.
-
handle_response(resp)
Handles a response from a worker process to the jobmanager.
Parameters: resp (dict) – Must contain a key ‘callback’ whose value is the name of the desired callback function as a string, e.g. ‘worker_done’, which is then called.
Sample input:
resp = {
‘callback’: ‘worker_done’, (str)
‘msgid’: ‘some_uuid’, (str)
‘return’: ‘return value’, (dict)
‘pid’: ‘pid_of_worker_process’ (int)
}
return_value must be a dictionary that can be JSON serialized and is formatted like:
{
“value”: ‘return value of job goes here’
}
If the ‘return’ value of resp is ‘DEATH’, the worker died, so we clean that up as well.
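The name-based dispatch described above can be sketched as follows. This is a minimal illustration, not eventmq's implementation: `ResponseDispatcher` is a hypothetical stand-in class, and the way the `death` argument is derived from the ‘return’ value is an assumption made for the example.

```python
class ResponseDispatcher(object):
    """Hypothetical stand-in for JobManager; it only shows name-based dispatch."""

    def __init__(self):
        # Record every callback invocation so the flow is easy to inspect.
        self.calls = []

    def worker_done(self, reply, msgid, death, pid):
        self.calls.append(("worker_done", reply, msgid, death, pid))

    def worker_death(self, reply, msgid, death, pid):
        self.calls.append(("worker_death", reply, msgid, death, pid))

    def handle_response(self, resp):
        # Resolve the method whose name is stored under the 'callback' key.
        callback = getattr(self, resp["callback"])
        # Assumption: 'death' is a flag set when the worker reported 'DEATH'.
        death = resp["return"] == "DEATH"
        callback(resp["return"], resp["msgid"], death, resp["pid"])


dispatcher = ResponseDispatcher()
dispatcher.handle_response({
    "callback": "worker_done",
    "msgid": "some_uuid",
    "return": {"value": "return value of job goes here"},
    "pid": 1234,
})
```

Looking the callback up with `getattr` keeps the worker-to-manager message plain data (a string name), which is convenient when responses travel over a multiprocessing queue.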
-
jobmanager_main(broker_addr=None)
Kicks off the job manager with logging and settings import.
Parameters: broker_addr (str) – The address of the broker to connect to.
-
jobs_in_flight = None
Stats and monitoring information. Jobs in flight tracks all jobs currently executing. Key: msgid. Value: the message with all the details of the job.
-
name = None
Defines the name of this JobManager instance. Useful for identifying the instance in the logs.
-
on_heartbeat(msgid, message)
A placeholder for a no-op command. The actual ‘logic’ for HEARTBEAT is in self.process_message(), as every message is counted as a HEARTBEAT.
-
on_request(msgid, msg)
Handles a REQUEST command.
Messages are formatted like this: [subcmd(str), {...options...}]
Subcommands:
- run - run some callable. Options:
{
‘callable’: func or method name (e.g. walk),
‘path’: module path (e.g. os.path),
‘args’: (optional) list of args,
‘kwargs’: (optional) dict of kwargs,
‘class_args’: (optional) list of args for class instantiation,
‘class_kwargs’: (optional) dict of kwargs for class,
}
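As a rough illustration of the `run` message format above, the sketch below builds a message and resolves the callable from its module path. The helper `run_from_options` is hypothetical and is not part of eventmq; it ignores ‘class_args’ and ‘class_kwargs’ and only handles module-level functions.

```python
import importlib


def run_from_options(options):
    """Hypothetical helper: import options['path'] and call options['callable']."""
    module = importlib.import_module(options["path"])
    func = getattr(module, options["callable"])
    # 'args' and 'kwargs' are optional, so fall back to empty values.
    return func(*options.get("args", []), **options.get("kwargs", {}))


# A message in the documented shape: [subcmd(str), {...options...}]
message = ["run", {
    "callable": "basename",
    "path": "os.path",
    "args": ["/tmp/file.txt"],
}]

subcmd, options = message
if subcmd == "run":
    result = run_from_options(options)
```

Because the options name the callable by module path and function name rather than passing the function object, the message can be serialized and sent to a job manager on another host.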
-
outgoing = None
JobManager starts out by INFORMing the router of its existence, then telling the router that it is READY. The reply will be the unit of work.
-
pid_distribution = None
Keeps track of which pids are servicing our requests. Key: pid. Value: number of jobs completed on the process with that pid.
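The bookkeeping implied by jobs_in_flight and pid_distribution can be pictured with two plain dictionaries. This is only a sketch of the documented key/value shapes; the functions `job_started` and `job_finished` are hypothetical names and do not exist in eventmq.

```python
jobs_in_flight = {}    # msgid -> message with all the details of the job
pid_distribution = {}  # pid -> number of jobs completed by that process


def job_started(msgid, message):
    """Hypothetical hook: remember a job while it is executing."""
    jobs_in_flight[msgid] = message


def job_finished(msgid, pid):
    """Hypothetical hook: forget the job and credit the worker pid."""
    jobs_in_flight.pop(msgid, None)
    pid_distribution[pid] = pid_distribution.get(pid, 0) + 1


job_started("some_uuid", ["run", {"callable": "walk", "path": "os"}])
job_finished("some_uuid", 4242)
```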
-
premature_death(reply, msgid)
Worker died before running any jobs.
-
queues = None
List of queues that this job manager is listening on.
-
send_ready()
Sends the READY command upstream to indicate that JobManager is ready for another REQUEST message.
-
send_reply(reply, msgid)
Sends a REPLY response.
Parameters:
- reply – Message to send as the reply
- msgid – The unique id that we are acknowledging
-
total_ready_sent = None
Running total number of READY messages sent to the broker.
-
total_requests = None
Running total number of REQUEST messages received on the broker.
-
worker_death(reply, msgid, death, pid)
Worker died of natural causes. Ensures its death and removes it from tracking; it will be replaced on the next heartbeat.
-
worker_done(reply, msgid, death, pid)
Worker finished a job. Notifies the broker of an additional slot opening.
-
worker_done_with_reply(reply, msgid, death, pid)
Worker finished a job and requested the return value.
-