router – Router

Routes messages to workers (that are in named queues).

class eventmq.router.Router(*args, **kwargs)

A simple router of messages

add_scheduler(scheduler_id)

Adds a scheduler to the queue to receive SCHEDULE commands

Parameters:scheduler_id (str) – unique id of the scheduler to add
add_worker(worker_id, queues=None)

Adds a worker to worker queues

Parameters:
  • worker_id (str) – unique id of the worker to add
  • queues – queue or queues this worker should be a member of
clean_up_dead_schedulers()

Loops through the list of schedulers and remove any schedulers who the router hasn’t received a heartbeat in HEARTBEAT_TIMEOUT

clean_up_dead_workers()

Loops through the worker queues and removes any workers who haven’t responded in HEARTBEAT_TIMEOUT

get_available_worker(queue_name='default')

Gets the job manager with the next available worker for the provided queue.

Parameters:

queue_name (str) – Name of the queue

Raises:
  • NoAvailableWorkerSlotsError – Raised when there are no available
  • slots in any the job managers.
  • UnknownQueueError – Raised when queue_name is not found in self.queues
Returns:

uuid of the job manager with an available worker slot

Return type:

(str)

get_status()
Return
(str) Serialized information about the current state of the router.
job_latencies = None

Latency tracking dictionary Key: msgid of msg each REQUEST received and forwarded to a worker Value: (timestamp, queue_name)

on_disconnect(msgid, msg)

Prepare router for disconnecting by removing schedulers, clearing worker queue (if needed), and removing workers.

on_heartbeat(sender, msgid, msg)

a placeholder for a no-op command. The actual ‘logic’ for HEARTBEAT is in self.process_worker_message() because any message from a worker counts as a HEARTBEAT

on_inform(sender, msgid, msg)

Handles an INFORM message. This happens when new worker coming online and announces itself.

on_ready(sender, msgid, msg)

A worker that we should already know about is ready for another job

Parameters:
  • sender (str) – The id of the sender
  • msgid (str) – Unique identifier for this message
  • msg – The actual message that was sent
on_reply(sender, msgid, msg)

Handles an REPLY message. Replies are sent by the worker for latanecy measurements

on_request(sender, msgid, msg, depth=1)

Process a client REQUEST frame

Parameters:
  • sender
  • msgid
  • msgid
  • depth (int) – The recusion depth in retrying when PeerGoneAwayError is raised.
classmethod prioritize_queue_list(unprioritized_iterable)

Prioritize a given iterable in the format: ((PRIORITY, OBJ),..)

Parameters:unprioritized_iterable (iter) – Any list, tuple, etc where the 0-index key is an integer to use as priority. Largest numbers come first.
Raises:IndexError - There was no 0-index element.
Returns:decsending order list. E.g. ((20, ‘a’), (14, ‘b’), (12, ‘c’))
process_client_message(original_msg, depth=0)
Parameters:
  • msg – The untouched message from zmq
  • depth – The number of times this method has been recursively called. This is used to short circuit message retry attempts.
Raises:

InvalidMessageError – Unable to parse the message

process_worker_message(msg)

This method is called when a message comes in from the worker socket. It then calls on_COMMAND.lower(). If on_command isn’t found, then a warning is created.

Parameters:msg – The untouched message from zmq
queues = None

JobManager address by queue name. The lists here are Last Recently Used queues where a worker is popped off when given a job, and appeneded when one finishes. There is one entry per available worker slot, so you may see duplicate addresses.

Example

{‘default’: [‘w1’, ‘w2’, ‘w1’, ‘w4’]}

received_disconnect = None

Excecuted function tracking dictionary Key: msgid of msg each REQUEST received and forwarded to a worker Value: (function_name, queue_name) Set to True when the router should die.

requeue_worker(worker_id)

Add a worker back to the pools for which it is a member of.

reset_heartbeat_counters()

Reset all the counters for heartbeats back to 0

router_main()

Kick off router with logging and settings import

scheduler_queue = None

Queue for schedulers to use

schedulers = None

Scheduler clients. Clients are able to send SCHEDULE commands that need to be routed to a scheduler, which will keep track of time and run the job. Contains dictionaries:

self.schedulers[<scheduler_zmq_id>] = {
‘hb’: <last_recv_heartbeat>,

}

send_ack(socket, recipient, msgid)

Sends an ACK response

Parameters:
  • socket (socket) – The socket to use for this ack
  • recipient (str) – The recipient id for the ack
  • msgid – The unique id that we are acknowledging
Returns:

The ID of the ACK message

Return type:

msgid

send_heartbeat(socket, recipient)

Custom send heartbeat method to take into account the recipient that is needed when building messages

Parameters:
  • socket (socket) – the socket to send the heartbeat with
  • recipient (str) – Worker I
Returns:

The ID of the HEARTBEAT message

Return type:

msgid

send_schedulers_heartbeats()

Send HEARTBEATs to all registered schedulers

send_workers_heartbeats()

Send HEARTBEATs to all registered workers.

sighup_handler(signum, frame)

Reloads the configuration and rebinds the ports. Exectued when the process receives a SIGHUP from the system.

start(frontend_addr='tcp://127.0.0.1:47291', backend_addr='tcp://127.0.0.1:47290', administrative_addr='tcp://127.0.0.1:47293')

Begin listening for connections on the provided connection strings

Parameters:
  • frontend_addr (str) – connection string to listen for requests
  • backend_addr (str) – connection string to listen for workers
  • administrative_addr (str) – connection string to listen for emq-cli commands on.
waiting_messages = None

Message buffer. When messages can’t be sent because there are no workers available to take the job

workers = None

List of queues by workers. Meta data about the worker such as the queue memebership and timestamp of last message received are stored here.

Keys
  • queues: list() of queue names and prioritiess the worker belongs to. e.g. (10, ‘default’)
  • hb: monotonic timestamp of the last received message from worker
  • available_slots: int count of jobs this manager can still process.