2.11.3. Throttle messages for a slow consumer¶
This example shows how to throttle messages that are published are a rate faster than what a slow consumer (subscribed) can process. In this example, only the newest messages are preserved, messages that cannot be consumed on time are dropped.
import time
import logging
import roslibpy
# Configure logging
fmt = '%(asctime)s %(levelname)8s: %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)
log = logging.getLogger(__name__)
client = roslibpy.Ros(host='127.0.0.1', port=9090)
def to_epoch(stamp):
stamp_secs = stamp['secs']
stamp_nsecs = stamp['nsecs']
return stamp_secs + stamp_nsecs*1e-9
def from_epoch(stamp):
stamp_secs = int(stamp)
stamp_nsecs = (stamp - stamp_secs) * 1e9
return {'secs': stamp_secs, 'nsecs': stamp_nsecs}
def receive_message(msg):
age = time.time() - to_epoch(msg['stamp'])
fmt = 'Age of message (sequence #%d): %6.3f seconds'
log.info(fmt, msg['seq'], age)
# Simulate a very slow consumer
time.sleep(.5)
publisher = roslibpy.Topic(client, '/slow_consumer', 'std_msgs/Header')
publisher.advertise()
# Queue length needs to be used in combination with throttle rate (in ms)
# This value must be tuned to the expected duration of the slow consumer
# and ideally bigger than the max of it,
# otherwise message will be older than expected (up to a limit)
subscriber = roslibpy.Topic(client, '/slow_consumer', 'std_msgs/Header',
queue_length=1, throttle_rate=600)
subscriber.subscribe(receive_message)
seq = 0
def publish_message():
global seq
seq += 1
header = dict(frame_id='', seq=seq, stamp=from_epoch(time.time()))
publisher.publish(header)
client.call_later(.001, publish_message)
client.on_ready(publish_message)
client.run_forever()
In the console, you should see gaps in the sequence of messages, because the publisher is producing messages every 0.001 seconds, but we configure a queue of length 1, with a throttling of 600ms to give time to our slow consumer. Without this throttling, the consumer would process increasingly old messages.