jobmanager – Job Manager

Ensures things about jobs and spawns the actual tasks

class eventmq.jobmanager.JobManager(*args, **kwargs)

The exposed portion of the worker. The job manager’s main responsibility is to manage the resources on the server it’s running.

This job manager uses multiprocessing Queues

__init__(*args, **kwargs)


All args are optional unless otherwise noted.

  • name (str) – unique name of this instance. By default a uuid will be generated.
  • queues (tuple) – List of queue names to listen on.
  • skip_signal (bool) – Don’t register the signal handlers. Useful for testing.

Checks for any dead processes in the pool and recreates them if necessary

concurrent_jobs = None

keep track of workers


Handles a response from a worker process to the jobmanager

Parameters:resp (dict) – Must contain a key ‘callback’ with the desired callback

function as a string, i.e. ‘worker_done’ which is then called

Sample Input resp = {

‘callback’: ‘worker_done’, (str) ‘msgid’: ‘some_uuid’, (str) ‘return’: ‘return value’, (dict) ‘pid’: ‘pid_of_worker_process’ (int)


return_value must be a dictionary that can be json serialized and formatted like:

“value”: ‘return value of job goes here’


if the ‘return’ value of resp is ‘DEATH’, the worker died so we clean that up as well


Kick off jobmanager with logging and settings import

Parameters:broker_addr (str) – The address of the broker to connect to.
jobs_in_flight = None

Stats and monitoring information Jobs in flight tracks all jobs currently executing. Key: msgid, Value: The message with all the details of the job

name = None

Define the name of this JobManager instance. Useful to know when referring to the logs.

on_heartbeat(msgid, message)

a placeholder for a noop command. The actual ‘logic’ for HEARTBEAT is in self.process_message() as every message is counted as a HEARTBEAT

on_request(msgid, msg)

Handles a REQUEST command

Messages are formatted like this: [subcmd(str), {



run - run some callable. Options:

‘callable’: func or method name (eg. walk), ‘path’: module path (eg. os.path), ‘args’: (optional) list of args, ‘kwargs’: (optional) dict of kwargs, ‘class_args’: (optional) list of args for class


‘class_kwargs’: (optional) dict of kwargs for class,


outgoing = None

JobManager starts out by INFORMing the router of it’s existence, then telling the router that it is READY. The reply will be the unit of work.

pid_distribution = None

Keep track of what pids are servicing our requests Key: pid, Value: # of jobs completed on the process with that pid

premature_death(reply, msgid)

Worker died before running any jobs

queues = None

List of queues that this job manager is listening on


send the READY command upstream to indicate that JobManager is ready for another REQUEST message.

send_reply(reply, msgid)

Sends an REPLY response

  • reply – Message to send as the reply
  • msgid – The unique id that we are acknowledging
total_ready_sent = None

Running total number of READY messages sent to the broker

total_requests = None

Running total number of REQUEST messages received on the broker

worker_death(reply, msgid, death, pid)

Worker died of natural causes, ensure its death and remove from tracking, will be replaced on next heartbeat

worker_done(reply, msgid, death, pid)

Worker finished a job, notify broker of an additional slot opening

worker_done_with_reply(reply, msgid, death, pid)

Worker finished a job and requested the return value