jobmanager – Job Manager

Ensures things about jobs and spawns the actual tasks

class eventmq.jobmanager.JobManager(*args, **kwargs)

The exposed portion of the worker. The job manager’s main responsibility is to manage the resources on the server it’s running on.

This job manager uses multiprocessing Queues

__init__(*args, **kwargs)

Note

All args are optional unless otherwise noted.

Parameters:
  • name (str) – unique name of this instance. By default a uuid will be generated.
  • queues (tuple) – List of queue names to listen on.
  • skip_signal (bool) – Don’t register the signal handlers. Useful for testing.

check_worker_health()

Checks for any dead processes in the pool and recreates them if necessary
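As a rough illustration of the idea, a health check can walk the pool, keep the live workers, and spawn a replacement for each dead one. This is a minimal sketch, not EventMQ's implementation: `FakeWorker`, `replace_dead_workers`, and the `spawn` callable are hypothetical names, and the only assumption about a worker is that it exposes `is_alive()`, as `multiprocessing.Process` does.

```python
class FakeWorker:
    """Hypothetical stand-in for a worker process; only is_alive() is needed."""

    def __init__(self, alive=True):
        self._alive = alive

    def is_alive(self):
        return self._alive


def replace_dead_workers(workers, spawn):
    """Return (new_pool, replaced_count) with every dead worker replaced.

    workers: list of objects exposing is_alive()
    spawn:   zero-argument callable returning a fresh worker (an assumption)
    """
    new_pool = []
    replaced = 0
    for worker in workers:
        if worker.is_alive():
            new_pool.append(worker)
        else:
            # Dead worker found: put a fresh one in its slot.
            new_pool.append(spawn())
            replaced += 1
    return new_pool, replaced


pool = [FakeWorker(True), FakeWorker(False), FakeWorker(True)]
pool, replaced = replace_dead_workers(pool, lambda: FakeWorker(True))
```

Returning a new list instead of mutating the pool in place keeps the sketch easy to test; a real manager would likely also join or terminate the dead process before dropping it.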

concurrent_jobs = None

Keeps track of workers

handle_response(resp)

Handles a response from a worker process to the jobmanager

Parameters: resp (dict) – Must contain a key ‘callback’ with the desired callback function as a string, e.g. ‘worker_done’, which is then called.

Sample input:

resp = {
    'callback': 'worker_done',         # (str)
    'msgid': 'some_uuid',              # (str)
    'return': 'return value',          # (dict)
    'pid': 'pid_of_worker_process',    # (int)
}

return_value must be a dictionary that can be JSON serialized and formatted like:

{
    "value": 'return value of job goes here'
}

If the ‘return’ value of resp is ‘DEATH’, the worker died, so that is cleaned up as well.
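The callback lookup described above can be sketched with `getattr`. This is an illustrative sketch only, not EventMQ's code: `ResponseDispatcher` is a hypothetical stand-in class, and treating the `death` argument as a boolean derived from the 'DEATH' marker is an assumption, since the reference does not say what `death` holds.

```python
class ResponseDispatcher:
    """Hypothetical stand-in for the job manager; not EventMQ's class."""

    def __init__(self):
        self.calls = []  # records (callback name, msgid, death, pid)

    def worker_done(self, reply, msgid, death, pid):
        self.calls.append(("worker_done", msgid, death, pid))

    def worker_death(self, reply, msgid, death, pid):
        self.calls.append(("worker_death", msgid, death, pid))

    def handle_response(self, resp):
        # Look up the method named by the 'callback' string and call it.
        callback = getattr(self, resp["callback"])
        reply = resp["return"]
        # Assumption: the 'DEATH' marker is turned into a boolean flag.
        death = reply == "DEATH"
        callback(reply, resp["msgid"], death, resp["pid"])


dispatcher = ResponseDispatcher()
dispatcher.handle_response({
    "callback": "worker_done",
    "msgid": "some_uuid",
    "return": {"value": 42},
    "pid": 1234,
})
```

Because the callback name arrives as data, a production dispatcher would normally check it against a set of allowed method names before calling `getattr`.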

jobmanager_main(broker_addr=None)

Kick off jobmanager with logging and settings import

Parameters: broker_addr (str) – The address of the broker to connect to.

jobs_in_flight = None

Stats and monitoring information. Jobs in flight tracks all jobs currently executing. Key: msgid, Value: The message with all the details of the job.

name = None

Define the name of this JobManager instance. Useful to know when referring to the logs.

on_heartbeat(msgid, message)

A placeholder for a no-op command. The actual ‘logic’ for HEARTBEAT is in self.process_message(), as every message is counted as a HEARTBEAT.

on_request(msgid, msg)

Handles a REQUEST command

Messages are formatted like this: [subcmd(str), {...options...}]

Subcommands:
run - run some callable. Options:

{
    'callable': func or method name (e.g. walk),
    'path': module path (e.g. os.path),
    'args': (optional) list of args,
    'kwargs': (optional) dict of kwargs,
    'class_args': (optional) list of args for class instantiation,
    'class_kwargs': (optional) dict of kwargs for class,
}
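To make the `run` options concrete, the sketch below resolves 'path' and 'callable' with `importlib` and calls the result with the optional 'args' and 'kwargs'. It is a simplified assumption about how such a message could be executed, not EventMQ's implementation: `run_subcommand` is a hypothetical helper, and 'class_args' and 'class_kwargs' are deliberately left out because the reference does not describe how the class is selected.

```python
import importlib


def run_subcommand(options):
    """Resolve and call the callable described by a 'run' options dict."""
    module = importlib.import_module(options["path"])
    func = getattr(module, options["callable"])
    args = options.get("args", [])      # optional positional arguments
    kwargs = options.get("kwargs", {})  # optional keyword arguments
    return func(*args, **kwargs)


# A message in the documented shape: [subcmd, {options}]
message = ["run", {
    "callable": "basename",
    "path": "os.path",
    "args": ["/tmp/report.txt"],
}]

subcmd, options = message
if subcmd == "run":
    result = run_subcommand(options)
```

Here `result` is the string `'report.txt'`. The example uses `basename` instead of `walk` because `os.path.walk` no longer exists in Python 3.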

outgoing = None

JobManager starts out by INFORMing the router of its existence, then telling the router that it is READY. The reply will be the unit of work.

pid_distribution = None

Keep track of what pids are servicing our requests. Key: pid, Value: # of jobs completed on the process with that pid
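The two bookkeeping structures, jobs_in_flight (msgid to message) and pid_distribution (pid to completed-job count), could be maintained roughly as follows. This is a sketch under stated assumptions: `job_started` and `job_finished` are hypothetical helper names, and using `collections.Counter` is a choice made here for illustration, not something the reference specifies.

```python
from collections import Counter

jobs_in_flight = {}           # Key: msgid, Value: the job's message
pid_distribution = Counter()  # Key: pid, Value: number of jobs completed


def job_started(msgid, message):
    """Record a job as currently executing."""
    jobs_in_flight[msgid] = message


def job_finished(msgid, pid):
    """Drop the job from the in-flight table and credit the worker's pid."""
    jobs_in_flight.pop(msgid, None)
    pid_distribution[pid] += 1


job_started("uuid-1", ["run", {"callable": "basename", "path": "os.path"}])
job_started("uuid-2", ["run", {"callable": "dirname", "path": "os.path"}])
job_finished("uuid-1", 4321)
```

After these calls one job remains in flight and pid 4321 has one completed job. `Counter` returns 0 for pids it has not seen, which avoids a key check before incrementing.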

premature_death(reply, msgid)

Worker died before running any jobs

queues = None

List of queues that this job manager is listening on

send_ready()

Sends the READY command upstream to indicate that JobManager is ready for another REQUEST message.

send_reply(reply, msgid)

Sends a REPLY response

Parameters:
  • reply – Message to send as the reply
  • msgid – The unique id that we are acknowledging

total_ready_sent = None

Running total number of READY messages sent to the broker

total_requests = None

Running total number of REQUEST messages received from the broker

worker_death(reply, msgid, death, pid)

Worker died of natural causes. Ensures its death and removes it from tracking; it will be replaced on the next heartbeat.

worker_done(reply, msgid, death, pid)

Worker finished a job, notify broker of an additional slot opening

worker_done_with_reply(reply, msgid, death, pid)

Worker finished a job and requested the return value