classes – Utility Classes

Defines some classes to use when implementing ZMQ devices

class eventmq.utils.classes.EMQPService

Helper for devices that connect to brokers.

Implements utility methods for sending EMQP messages for the following EMQP commands.

  • INFORM

Also implements utility methods for managing long-running processes.

To use you must define:
  • self.outgoing - socket where messages can be sent to the Router
  • self.SERVICE_TYPE - defines the service type for INFORM. See send_inform() for more information.
  • self.poller - the poller that self.outgoing will be using. Usually: self.poller = eventmq.poller.Poller()

When messages are received from the router, they are processed in process_message(), which then calls the matching on_COMMAND method. Call process_message() from your event loop. For example, to respond to the SCHEDULE command, define the method on_schedule in your service class.

See the code for Scheduler and JobManager for examples.
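The on_COMMAND naming convention can be illustrated with a minimal, self-contained sketch. It does not import eventmq: the class DispatchSketch and the three-part message layout (command, msgid, payload) are assumptions made for illustration only, and the real process_message() may parse frames differently.

```python
class DispatchSketch:
    """Hypothetical stand-in that mimics only the on_COMMAND naming convention."""

    def __init__(self):
        self.handled = []

    def process_message(self, msg):
        # Assumed layout for this sketch: (command, msgid, payload).
        command, msgid, payload = msg
        # SCHEDULE -> on_schedule, INFORM -> on_inform, and so on.
        handler = getattr(self, "on_%s" % command.lower(), None)
        if handler is None:
            return False  # no handler defined for this command
        handler(msgid, payload)
        return True

    def on_schedule(self, msgid, message):
        # Handlers accept msgid and message as their first arguments.
        self.handled.append((msgid, message))


service = DispatchSketch()
handled = service.process_message(("SCHEDULE", "msg-1", ("run", "job")))
ignored = service.process_message(("UNKNOWN", "msg-2", ()))
```

Returning a boolean for unhandled commands is a choice made for this sketch, not documented behavior of EMQPService.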

__weakref__

list of weak references to the object (if defined)

is_heartbeat_enabled

Property to check if heartbeating is enabled. Useful when certain properties must be updated for heartbeating.

Returns:True if heartbeating is enabled, False if it isn’t
Return type:bool

on_ack(msgid, ackd_msgid)

Sets awaiting_ack to False

process_message(msg)

Processes a message. Processing takes the form of calling an on_EMQP_COMMAND method. The method must accept msgid and message as the first arguments.

Parameters:msg – The message received from the socket to parse and process.
reset()

Resets the current connection by closing and reopening the socket

send_inform(queues=())

Notify the router that this job manager is online and ready for work. This includes a list of queues the router should forward messages for.

Parameters:
  • type (str) – Either ‘worker’ or ‘scheduler’
  • queues (list) –
    • For ‘worker’ type, the queues the worker is listening on and their weights.
      Example:
      ([10, 'default'], [15, 'push_notifications'])
    • Ignored for ‘scheduler’ type
Raises:

ValueError – When type_ does not match a specified type

Returns:

ID of the message

Return type:

str

Note

Passing a single string for queues is supported for backward compatibility and not recommended for new apps.

start(addr, queues='default')

Connect to addr and begin listening for job requests

Parameters:addr (str) – connection string to connect to
class eventmq.utils.classes.EMQdeque(full=None, pfull=None, on_full=None, initial=())

EventMQ deque based on Python’s collections.deque, with a hard size limit (full) and a programmable size limit (pfull).

Note

Because of the programmable full, some of the methods that would normally return None return a boolean value that should be captured and checked to ensure proper error handling.

__init__(full=None, pfull=None, on_full=None, initial=())
Parameters:
  • full (int) – Hard limit on deque size. Rejects adding elements. Default: 0 - no limit
  • pfull (int) – Programmable limit on deque size, defaults to full length
  • on_full (func) – callback to call when full limit is hit
  • initial (iter) – The initial iterable used to construct the deque
__weakref__

list of weak references to the object (if defined)

append(item)

Append item to the right of this deque if the deque isn’t full.

Note

You should check the return value of this call and handle the cases where False is returned.

Returns:
True if item was successfully added, False if the deque is at the self.full limit. If it is, self.on_full is called.
Return type:bool
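Checking the return value of append() might look like the following sketch. It uses a small stand-in built on collections.deque rather than EMQdeque itself: the class name BoundedDequeSketch, the argument-free on_full callback, and the treatment of a falsy limit as "no limit" are all assumptions, not the library's implementation.

```python
from collections import deque


class BoundedDequeSketch:
    """Hypothetical stand-in for a deque whose append() reports success."""

    def __init__(self, full=None, on_full=None, initial=()):
        self.full = full
        self.on_full = on_full
        self._items = deque(initial)

    def is_full(self):
        # A falsy limit (None or 0) is treated as "no limit" in this sketch.
        return bool(self.full) and len(self._items) >= self.full

    def append(self, item):
        if self.is_full():
            # Assumption: the callback takes no arguments.
            if self.on_full is not None:
                self.on_full()
            return False
        self._items.append(item)
        return True


rejected = []
queue = BoundedDequeSketch(full=2, on_full=lambda: rejected.append("full"))

# Capture every return value instead of assuming the item was stored.
results = [queue.append(n) for n in (1, 2, 3)]
```

Here the third append is refused and the callback fires once, which is the case a caller needs to handle (for example by retrying later or dropping the item).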
appendleft(item)

Append item to the left of this deque if the deque isn’t full.

Note

You should check the return value of this call and handle the cases where False is returned.

Returns:
True if item was successfully added, False if the deque is at the self.full limit. If it is, self.on_full is called.
Return type:bool
extend(iterable)

Append iterable to the right (end) of the deque.

Returns:
True if item was successfully added, False if the deque is at the self.full limit. If it is, self.on_full is called.
Return type:bool
is_empty()

Check to see if the deque contains no items.

Returns:True if the deque contains 0 items. False otherwise
Return type:bool
is_full()

Check to see if the deque contains self.full items.

Returns:True if the deque contains at least full items. False otherwise
Return type:bool
is_pfull()

Check to see if the deque contains self.pfull items.

Returns:True if the deque contains at least pfull items. False otherwise
Return type:bool
peek()
Returns:the last (right-most) element of the deque
Return type:object
peekleft()
Returns:the first (left-most) element of the deque
Return type:object
pop()
Returns:the last (right-most) element of the deque
Return type:object
popleft()
Returns:the first (left-most) element of the deque
Return type:object
remove(item)

Remove item from the deque.

Parameters:item (object) – The item to remove from the deque
class eventmq.utils.classes.HeartbeatMixin(*args, **kwargs)

Provides methods for implementing heartbeats

__init__(*args, **kwargs)

Sets up some variables to track the state of heartbeaty things

__weakref__

list of weak references to the object (if defined)

is_dead(now=None)

Checks the heartbeat counters to find out if the thresholds have been met.

Parameters:now (float) – The time to use to check if death has occurred. If this value is None, then utils.timeutils.monotonic() is used.
Returns:
True if the connection to the peer has died, otherwise
False
Return type:bool
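The role of the now parameter can be shown with a minimal sketch. It is not the HeartbeatMixin implementation: the class HeartbeatSketch, its timeout and last_received attributes, and the single-threshold check are assumptions, and the standard library's time.monotonic() stands in for utils.timeutils.monotonic().

```python
import time


class HeartbeatSketch:
    """Hypothetical liveness check driven by an injectable clock value."""

    def __init__(self, timeout=5.0):
        self.timeout = timeout                # assumed threshold, in seconds
        self.last_received = time.monotonic()  # assumed "last heard from peer"

    def is_dead(self, now=None):
        # Fall back to the monotonic clock when no time is supplied.
        if now is None:
            now = time.monotonic()
        return now - self.last_received >= self.timeout


peer = HeartbeatSketch(timeout=5.0)
alive_check = peer.is_dead(now=peer.last_received + 1.0)
dead_check = peer.is_dead(now=peer.last_received + 6.0)
```

Accepting now as an argument lets a caller reuse one clock reading across several checks and makes the logic testable without sleeping.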
reset_heartbeat_counters()

Resets all the counters for heartbeats back to 0

send_heartbeat(socket)

Send a HEARTBEAT command to the specified socket

Parameters:socket (socket) – The eMQP socket to send the message to
Returns:ID of the message
Return type:str
class eventmq.utils.classes.ZMQReceiveMixin

Defines some methods for receiving messages. This class will not work if used on its own.

__weakref__

list of weak references to the object (if defined)

recv()

Receive a message

recv_multipart()

Receive a multipart message

class eventmq.utils.classes.ZMQSendMixin

Defines some methods for sending messages. This class will not work if used on its own.

__weakref__

list of weak references to the object (if defined)

send(message, protocol_version)

Sends a message

Parameters:
  • message – message to send to something
  • protocol_version (str) – Protocol version. Passing it is good practice, but you may explicitly specify None to skip adding the version
send_multipart(message, protocol_version, _recipient_id=None)

Send a message directly to the 0mq socket. Automatically inserts some frames for your convenience. The sent frame ends up looking something like this:

(_recipient_id, '', protocol_version) + (your, tuple)
Parameters:
  • message (tuple) – Raw message to send.
  • protocol_version (str) – Protocol version. Passing it is good practice, but you may explicitly specify None to skip adding the version
  • _recipient_id (object) – When using a zmq.ROUTER you must specify the recipient id of the remote socket
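The frame layout described for send_multipart can be sketched as a pure function. The helper build_frames is hypothetical and does not touch a socket. Two behaviors are assumptions: that the recipient frame is omitted when _recipient_id is None, and that the empty delimiter frame is always present. The values "worker-1" and "v1" are placeholders, and a real socket would carry bytes rather than str.

```python
def build_frames(message, protocol_version, _recipient_id=None):
    """Hypothetical helper: assemble frames in the documented order."""
    frames = []
    if _recipient_id is not None:
        # Needed when the sending socket is a zmq.ROUTER.
        frames.append(_recipient_id)
    frames.append("")  # empty delimiter frame
    if protocol_version is not None:
        # Passing None skips the version frame.
        frames.append(protocol_version)
    return tuple(frames) + tuple(message)


routed = build_frames(("INFORM", "payload"), "v1", _recipient_id="worker-1")
unversioned = build_frames(("INFORM", "payload"), None)
```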