noc.core.nsq.topic

Module Contents

class noc.core.nsq.topic.TopicQueue(topic, io_loop=None)

Bases: object

static iter_encode_chunks(message, limit=(config.nsqd.mpub_size) - (8))

Encode data to iterable atomic chunks of up to limit size

Parameters:
  • message – Input data
  • limit – Chunk limit
Returns:

Yields JSON-encoded chunks

Raises:

ValueError – If message cannot be encoded or too big

put(self, message, fifo=True)

Put message into queue. Block if queue is full

Parameters:
  • message – Message of any json-serializable type
  • fifo – Boolean. Append message to the start of queue (LIFO) if False. Append message to the end of queue (FIFO) if True.
Returns:

_notify_all(self)
return_messages(self, messages)

Return messages to the start of the queue

Parameters:messages – List of messages
Returns:
iter_get(self, n=1, size=None, total_overhead=0, message_overhead=0)

Get up to n items up to size size.

Warning queue will be locked until the end of function call.

Parameters:
  • n – Amount of items returned
  • size – None - unlimited, integer - upper size limit
  • total_overhead – Adjust total size to total_overhead octets.
  • message_overhead – Adjust total size to message_overhead per each returned message.
Returns:

Yields items

is_empty(self)

Check if queue is empty

Returns:True if queue is empty, False otherwise
qsize(self)

Returns amount of messages and size of queue

Returns:messages, total size
shutdown(self)

Begin shutdown sequence. Disable queue writes

Returns:
wait(self, timeout=None, rate=None)

Block and wait up to timeout :param timeout: Max. wait in seconds :param rate: Max. rate of publishing in messages per second :return:

apply_metrics(self, data)