router
– Router¶
Routes messages to workers (that are in named queues).
-
class
eventmq.router.
Router
(*args, **kwargs)¶ A simple router of messages
-
add_scheduler
(scheduler_id)¶ Adds a scheduler to the queue to receive SCHEDULE commands
Parameters: scheduler_id (str) – unique id of the scheduler to add
-
add_worker
(worker_id, queues=None)¶ Adds a worker to worker queues
Parameters: - worker_id (str) – unique id of the worker to add
- queues – queue or queues this worker should be a member of
-
clean_up_dead_schedulers
()¶ Loops through the list of schedulers and remove any schedulers who the router hasn’t received a heartbeat in HEARTBEAT_TIMEOUT
-
clean_up_dead_workers
()¶ Loops through the worker queues and removes any workers who haven’t responded in HEARTBEAT_TIMEOUT
-
get_available_worker
(queue_name='default')¶ Gets the job manager with the next available worker for the provided queue.
Parameters: queue_name (str) – Name of the queue
Raises: NoAvailableWorkerSlotsError
– Raised when there are no available- slots in any the job managers.
UnknownQueueError
– Raised whenqueue_name
is not found in self.queues
Returns: uuid of the job manager with an available worker slot
Return type: (str)
-
get_status
()¶ - Return
- (str) Serialized information about the current state of the router.
-
job_latencies
= None¶ Latency tracking dictionary Key: msgid of msg each REQUEST received and forwarded to a worker Value: (timestamp, queue_name)
-
on_disconnect
(msgid, msg)¶ Prepare router for disconnecting by removing schedulers, clearing worker queue (if needed), and removing workers.
-
on_heartbeat
(sender, msgid, msg)¶ a placeholder for a no-op command. The actual ‘logic’ for HEARTBEAT is in
self.process_worker_message()
because any message from a worker counts as a HEARTBEAT
-
on_inform
(sender, msgid, msg)¶ Handles an INFORM message. This happens when new worker coming online and announces itself.
-
on_ready
(sender, msgid, msg)¶ A worker that we should already know about is ready for another job
Parameters:
-
on_reply
(sender, msgid, msg)¶ Handles an REPLY message. Replies are sent by the worker for latanecy measurements
-
on_request
(sender, msgid, msg, depth=1)¶ Process a client REQUEST frame
Parameters: - sender –
- msgid –
- msgid –
- depth (int) – The recusion depth in retrying when PeerGoneAwayError is raised.
-
classmethod
prioritize_queue_list
(unprioritized_iterable)¶ Prioritize a given iterable in the format: ((PRIORITY, OBJ),..)
Parameters: unprioritized_iterable (iter) – Any list, tuple, etc where the 0-index key is an integer to use as priority. Largest numbers come first. Raises: IndexError - There was no 0-index element. Returns: decsending order list. E.g. ((20, ‘a’), (14, ‘b’), (12, ‘c’))
-
process_client_message
(original_msg, depth=0)¶ Parameters: - msg – The untouched message from zmq
- depth – The number of times this method has been recursively called. This is used to short circuit message retry attempts.
Raises: InvalidMessageError
– Unable to parse the message
-
process_worker_message
(msg)¶ This method is called when a message comes in from the worker socket. It then calls on_COMMAND.lower(). If on_command isn’t found, then a warning is created.
Parameters: msg – The untouched message from zmq
-
queues
= None¶ JobManager address by queue name. The lists here are Last Recently Used queues where a worker is popped off when given a job, and appeneded when one finishes. There is one entry per available worker slot, so you may see duplicate addresses.
Example
{‘default’: [‘w1’, ‘w2’, ‘w1’, ‘w4’]}
-
received_disconnect
= None¶ Excecuted function tracking dictionary Key: msgid of msg each REQUEST received and forwarded to a worker Value: (function_name, queue_name) Set to True when the router should die.
-
requeue_worker
(worker_id)¶ Add a worker back to the pools for which it is a member of.
-
reset_heartbeat_counters
()¶ Reset all the counters for heartbeats back to 0
-
router_main
()¶ Kick off router with logging and settings import
-
scheduler_queue
= None¶ Queue for schedulers to use
-
schedulers
= None¶ Scheduler clients. Clients are able to send SCHEDULE commands that need to be routed to a scheduler, which will keep track of time and run the job. Contains dictionaries:
- self.schedulers[<scheduler_zmq_id>] = {
- ‘hb’: <last_recv_heartbeat>,
}
-
send_ack
(socket, recipient, msgid)¶ Sends an ACK response
Parameters: Returns: The ID of the ACK message
Return type: msgid
-
send_heartbeat
(socket, recipient)¶ Custom send heartbeat method to take into account the recipient that is needed when building messages
Parameters: Returns: The ID of the HEARTBEAT message
Return type: msgid
-
send_schedulers_heartbeats
()¶ Send HEARTBEATs to all registered schedulers
-
send_workers_heartbeats
()¶ Send HEARTBEATs to all registered workers.
-
sighup_handler
(signum, frame)¶ Reloads the configuration and rebinds the ports. Exectued when the process receives a SIGHUP from the system.
-
start
(frontend_addr='tcp://127.0.0.1:47291', backend_addr='tcp://127.0.0.1:47290', administrative_addr='tcp://127.0.0.1:47293')¶ Begin listening for connections on the provided connection strings
Parameters:
-
waiting_messages
= None¶ Message buffer. When messages can’t be sent because there are no workers available to take the job
-
workers
= None¶ List of queues by workers. Meta data about the worker such as the queue memebership and timestamp of last message received are stored here.
- Keys
queues
: list() of queue names and prioritiess the worker belongs to. e.g. (10, ‘default’)hb
: monotonic timestamp of the last received message from workeravailable_slots
: int count of jobs this manager can still process.
-