Consuming data from multiple RabbitMQ hosts

While working with RabbitMQ, you might have to deal with critical data that demands high availability and safety. To achieve this, RabbitMQ provides mirrored queues.

The principle is simple: you set up a cluster of rabbit hosts and create a queue that is shared across them. One of the nodes is the master and the others are slaves. Whenever a consumer requests data, the master serves it and the slaves are updated accordingly. If a slave goes down, the system keeps running. If the master goes down, one of the slaves is promoted to master.
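For reference, mirroring is enabled through a cluster policy rather than per-queue options. A sketch of what that looks like with `rabbitmqctl` (the policy name `ha-all` and the `^ha\.` queue-name pattern are just examples, not required values):

```
# Mirror every queue whose name starts with "ha." across all cluster nodes.
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
```

Queues matching the pattern are then mirrored automatically; consumers do not need to do anything special to benefit from the mirroring itself, only from the failover described below.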

This functionality is great, but it requires some work on the consumer side. Say we have a cluster of host1, host2 and host3, with host1 as the master. Our consumer connects to host1, starts fetching data, and then host1 goes down. The cluster reconfigures itself and is ready to go again. However, the consumer cannot reconnect to host1 because it is down, so it has to know about the other hosts.

Unfortunately, I found no documentation on this behavior on the Internet, and Python RMQ libraries such as pika do not seem to support multiple hosts out of the box. However, if you look at the Celery code, you'll see that its configuration accepts a list of RabbitMQ hosts. Digging a bit further, you'll find that Celery uses Kombu as its high-level RMQ library. So let's see how we can use Kombu to build our failover-enabled consumer.

First of all, here is a code snippet that shows how you'd use Kombu to connect to a queue on a single host, then fetch and process messages.

import kombu

# Connect to a single broker host.
connection = kombu.Connection('some-host')
connection.connect()
channel = connection.channel()
exchange = kombu.Exchange('some-exchange')
queue = kombu.Queue(name='some-queue', exchange=exchange)

def callback(body, message):
    # Process the message, then acknowledge it so the broker can discard it.
    print('got msg - %s' % body)
    message.ack()

consumer = kombu.Consumer(channel, queues=[queue], callbacks=[callback])
consumer.consume()

while True:
    # Block until a message arrives and dispatch it to the callback.
    connection.drain_events()

For the multiple-hosts use case, we just need to pass the list of hosts to Kombu. However, things will not automagically work (as yours truly thought at first): you have to handle connection errors and reinitialize the channel and consumer every time Kombu fails over to a new host. This is the bit that I did not find documented and that, fortunately, @asksol helped me with in the #celery IRC channel.

import kombu

hosts = ['some-host-1',
         'some-host-2',
         'some-host-3']
# Passing a list makes Kombu try each host in turn when connecting.
connection = kombu.Connection(hosts)
connection.ensure_connection()  # retries across the host list until one succeeds
connection.connect()
channel = connection.channel()
exchange = kombu.Exchange('some-exchange')
queue = kombu.Queue(name='some-queue', exchange=exchange)

def callback(body, message):
    print('got msg - %s' % body)
    message.ack()

consumer = kombu.Consumer(channel, queues=[queue], callbacks=[callback])
consumer.consume()
while True:
    try:
        connection.drain_events()
    # connection_errors and channel_errors are tuples of exception
    # classes, so their concatenation is a valid `except` target.
    except connection.connection_errors + connection.channel_errors:
        connection.close()
        print("Host down, connecting to the next one.")
        # Reconnect (to the next available host) and rebuild the
        # channel and consumer on top of the fresh connection.
        connection.ensure_connection()
        channel = connection.channel()
        consumer = kombu.Consumer(channel, queues=[queue], callbacks=[callback])
        consumer.consume()
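One detail worth spelling out in the loop above: `connection_errors` and `channel_errors` are tuples of exception classes, so adding them produces another tuple, which is exactly what an `except` clause accepts. A minimal stand-alone sketch of that mechanism, using made-up exception and connection classes rather than Kombu's:

```python
# Stand-ins for a transport's error tuples -- not Kombu API.
class HostDown(Exception):
    pass

class ChannelGone(Exception):
    pass

class FakeConnection:
    connection_errors = (ConnectionError, HostDown)
    channel_errors = (ChannelGone,)

conn = FakeConnection()
caught = []
for exc in (HostDown('host1 down'), ChannelGone('channel closed')):
    try:
        raise exc
    # The concatenated tuple catches members of either group.
    except conn.connection_errors + conn.channel_errors as e:
        caught.append(type(e).__name__)

print(caught)  # ['HostDown', 'ChannelGone']
```

This is why the consumer loop can treat connection-level and channel-level failures uniformly with a single `except` line.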

I hope this short guide will help you achieve the failover functionality that you need for your RabbitMQ consumer. Happy hacking!