Getting Started
Prerequisites
This text is a continuation of How To Use RabbitMQ and Python's Puka to Deliver Messages to Multiple Consumers and requires the same software bundle to be up and running. The same definitions are used throughout the article, and we assume that the reader is familiar with the subjects covered in the former text.
Exchanges
We have already described the fanout exchange, which delivers messages to every queue bound to it with no additional rules in place. It is a very useful mechanism, but it lacks flexibility: it is often undesirable to receive everything a producer emits to the exchange. RabbitMQ offers other exchange types that can be used to implement more complex scenarios. One of these is the direct exchange.
Direct exchange
Introduction
The direct exchange offers a simple, key-based routing mechanism in RabbitMQ. It is somewhat similar to the nameless exchange used in the very first example, in which a message was delivered to the queue whose name was equal to the routing key of the message. However, whereas the nameless exchange needed no explicit queue bindings, with a direct exchange the bindings are crucial and mandatory.
When using direct exchange, each message produced to that exchange must have a routing key specified, which is an arbitrary name string, e.g. *Texas*. The message will then be delivered to all queues that have been bound to this exchange with the same routing key (all queues that were explicitly declared as interested in messages with *Texas* routing key).
The biggest difference between the basic nameless exchange and a direct exchange is that the latter requires explicit bindings: no queue receives messages from the exchange until it has been bound to it. That in turn brings three advantages:
- One queue can be bound to listen to many different routing keys on the same exchange
- One queue can be bound to listen on many different exchanges at once
- Many queues can be bound to listen to the same routing key on an exchange
Let's imagine a big city hub: a rail and bus station in one, with many destinations reachable by both means of transportation. And let's imagine that the station wants to dispatch departure notifications using RabbitMQ. The task is to inform everyone interested that a bus or train to *Seattle*, *Tooele*, or *Boston* departs soon.
Such a program would define a direct exchange named departures, to which all interested customers could bind their queues. Messages containing the departure time would then be published to that exchange, with the destination as the routing key. For example:
- Message to the departures exchange with routing key *Tooele* and body 2014-01-03 15:23
- Message to the departures exchange with routing key *Boston* and body 2014-01-03 15:41
- Message to the departures exchange with routing key *Seattle* and body 2014-01-03 15:55
Since one queue may be bound to many routing keys at once, and many queues can be bound to the same key, we could easily have:
- One customer interested in *Tooele* only
- One customer interested in *Boston* only
- Another customer interested in *Tooele* and *Boston* at the same time
All waiting for information at the same time. They would receive proper messages using our direct exchange.
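The routing rule described above can be illustrated without a broker. The following is only a toy in-memory model, not RabbitMQ or puka code: the class name `DirectExchangeModel`, its methods, and the queue names `customer_1` to `customer_3` are hypothetical and exist purely for this sketch. It assumes that a message whose routing key has no binding is not delivered to anyone.

```python
from collections import defaultdict


class DirectExchangeModel(object):
    """Toy in-memory stand-in for a direct exchange (not a RabbitMQ API)."""

    def __init__(self):
        # routing key -> set of queue names bound with that key
        self.bindings = defaultdict(set)
        # queue name -> list of (routing_key, body) messages delivered so far
        self.queues = defaultdict(list)

    def bind(self, queue, routing_key):
        self.queues[queue]  # touching the key creates the empty queue
        self.bindings[routing_key].add(queue)

    def publish(self, routing_key, body):
        # Deliver a copy to every queue bound with exactly this key.
        # If no queue is bound with the key, nothing is delivered.
        for queue in self.bindings.get(routing_key, ()):
            self.queues[queue].append((routing_key, body))


departures = DirectExchangeModel()
departures.bind("customer_1", "Tooele")   # interested in Tooele only
departures.bind("customer_2", "Boston")   # interested in Boston only
departures.bind("customer_3", "Tooele")   # interested in both
departures.bind("customer_3", "Boston")

departures.publish("Tooele", "2014-01-03 15:23")
departures.publish("Boston", "2014-01-03 15:41")
departures.publish("Seattle", "2014-01-03 15:55")  # no binding for Seattle

for name in sorted(departures.queues):
    print("%s received %r" % (name, departures.queues[name]))
```

In this model the third customer ends up with both the *Tooele* and the *Boston* message, while the *Seattle* message reaches nobody, which mirrors the behaviour expected from the real exchange.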
Producer
To simplify the task slightly for the example, let's write a basic notification dispatcher that will accept one command line parameter. It will specify the destination and the application will send the current time to all interested consumers.
Create a sample python script named direct_notify.py
~~~~
vim direct_notify.py
~~~~
and paste the script contents:
~~~~
import puka
import datetime
import sys

# Connect to the local RabbitMQ instance
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)

# Declare the direct exchange (the routing key is supplied per message)
exchange_promise = producer.exchange_declare(exchange='departures', type='direct')
producer.wait(exchange_promise)

# Publish the current time, using the first command line argument as the routing key
message = "%s" % datetime.datetime.now()
message_promise = producer.basic_publish(exchange='departures', routing_key=sys.argv[1], body=message)
producer.wait(message_promise)

print("Departure to %s at %s" % (sys.argv[1], message))

producer.close()
~~~~
Type :wq and press Enter to save the file and quit.
Running the script with one parameter should print the current time and the destination used. The output should look like:
~~~~
root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~#
~~~~
Let's go through the script step by step:
- A producer client is created and connected to the local RabbitMQ instance. From now on it can communicate with RabbitMQ freely.
- A direct exchange named departures is created. It does not need a routing key at creation, as every message published to that exchange can carry a different key. After this step the exchange exists on the RabbitMQ server, so queues can be bound to it and messages sent through it.
- A message containing the current time is published to that exchange, using the command line parameter as the routing key. In the sample run, *Tooele* is used as the parameter, and hence as the departure destination (the routing key).
Note: for simplicity, the script does not check whether the mandatory command line argument is supplied! It will not work properly if executed without parameters.
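One possible way to add the missing check is sketched below. The helper names `destination_from_args` and `main`, the usage text, and the exit status 2 are assumptions made for this illustration and are not part of the original script. The RabbitMQ part is left out so the sketch runs on its own.

```python
import sys


def destination_from_args(argv):
    """Return the destination given on the command line, or None if absent."""
    # argv[0] is the script name, so the destination is expected at argv[1].
    if len(argv) < 2 or not argv[1].strip():
        return None
    return argv[1]


def main(argv):
    destination = destination_from_args(argv)
    if destination is None:
        sys.stderr.write("Usage: python direct_notify.py <destination>\n")
        return 2  # non-zero exit status signals incorrect usage
    # ... connect, declare the exchange and publish as in direct_notify.py ...
    return 0


# Demonstration with explicit argument lists instead of the real sys.argv:
print(main(["direct_notify.py", "Tooele"]))
print(main(["direct_notify.py"]))
```

In a real script the last two lines would typically be replaced with `sys.exit(main(sys.argv))`, so that a missing argument produces a usage message instead of an `IndexError`.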
Consumer
This example consumer application will act as a public transport customer interested in one or more of the destinations reachable from the station.
Create a sample python script named direct_watch.py
~~~~
vim direct_watch.py
~~~~
and paste the script contents:
~~~~
import puka
import sys

# Connect to the local RabbitMQ instance
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)

# Declare a temporary, exclusive queue; RabbitMQ generates its name
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']

# Bind the queue to the exchange once per destination given on the command line
for destination in sys.argv[1:]:
    print("Watching departure times for %s" % destination)
    bind_promise = consumer.queue_bind(exchange='departures', queue=queue, routing_key=destination)
    consumer.wait(bind_promise)

# Wait for matching messages and print each one
message_promise = consumer.basic_consume(queue=queue, no_ack=True)
while True:
    message = consumer.wait(message_promise)
    print("Departure for %s at %s" % (message['routing_key'], message['body']))

consumer.close()
~~~~
Type :wq and press Enter to save the file and quit.
Running the script with one parameter *Tooele* should announce that the script watches departure times for *Tooele*, whereas running it with more than one parameter should announce watching departure times for many destinations.
~~~~
root@rabbitmq:~# python direct_watch.py Tooele
Watching departure times for Tooele
(…)
root@rabbitmq:~# python direct_watch.py Tooele Boston
Watching departure times for Tooele
Watching departure times for Boston
(…)
root@rabbitmq:~#
~~~~
Let's go through the script step by step to explain what it does:
- A consumer client is created and connected to the local RabbitMQ instance. From now on it can communicate with RabbitMQ freely.
- A temporary queue for this particular consumer is created, with a name auto-generated by RabbitMQ. The queue will be destroyed after the script ends.
- The queue is bound to the departures exchange on all routing keys (destinations) specified as command line parameters, and each destination is printed on the screen for information.
- The script starts waiting for messages on the queue. It receives every message whose routing key matches one of the bindings: only *Tooele* departures when run with *Tooele* as the single parameter, and both *Tooele* and *Boston* departures when run with both. Each departure time is printed on the screen.
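Because the consumer loops forever, the `consumer.close()` call at the end of the script is not reached when the script is stopped with Ctrl+C. A minimal sketch of one way to close the connection on interruption follows. The function name `watch` and the `FakeConsumer` class are hypothetical: the fake stands in for `puka.Client` only so that the sketch can run without RabbitMQ, and it simulates Ctrl+C once its messages run out.

```python
def watch(consumer, message_promise):
    """Print departures until interrupted, then close the connection."""
    try:
        while True:
            message = consumer.wait(message_promise)
            print("Departure for %s at %s" % (message['routing_key'], message['body']))
    except KeyboardInterrupt:
        print("Stopping on user request")
    finally:
        # Runs whether the loop ended by interruption or by an error.
        consumer.close()


class FakeConsumer(object):
    """Hypothetical stand-in for puka.Client, used only for this demonstration."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.closed = False

    def wait(self, promise):
        if not self.messages:
            raise KeyboardInterrupt  # simulate the user pressing Ctrl+C
        return self.messages.pop(0)

    def close(self):
        self.closed = True


fake = FakeConsumer([{'routing_key': 'Tooele', 'body': '2014-02-18 15:57:29.035000'}])
watch(fake, message_promise=None)
```

In direct_watch.py, the same function could be called as `watch(consumer, message_promise)` in place of the `while True` loop, assuming the `consumer` and `message_promise` variables from the script.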
Testing
To check whether both scripts work as expected, open three terminal windows to the server. One will be used as a public transport station to send notifications. Another two will serve as customers waiting for departures.
In the first terminal, run the direct_notify.py script once with any parameter:
~~~~
root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~#
~~~~
Important: the direct_notify.py script must be executed at least once before any consumers, as the exchange must be created before binding queues to it. After execution the exchange stays on the RabbitMQ server and can be used freely.
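The ordering requirement could probably be avoided by letting the consumer declare the exchange as well, since in AMQP re-declaring an existing exchange with the same type is expected to leave it unchanged. The sketch below assumes this. It reuses only the puka calls already shown in the scripts above, while the function name `prepare_queue` and the `RecordingClient` stub are hypothetical. The stub records calls instead of talking to RabbitMQ, so the example runs without a broker.

```python
def prepare_queue(client, destinations, exchange='departures'):
    """Declare the exchange and a private queue, bind it, return the queue name."""
    # Assumption: declaring an already existing exchange with the same type
    # is harmless, so the consumer no longer depends on the producer running first.
    client.wait(client.exchange_declare(exchange=exchange, type='direct'))
    queue = client.wait(client.queue_declare(exclusive=True))['queue']
    for destination in destinations:
        client.wait(client.queue_bind(exchange=exchange, queue=queue, routing_key=destination))
    return queue


class RecordingClient(object):
    """Hypothetical stub that records calls instead of contacting RabbitMQ."""

    def __init__(self):
        self.calls = []

    def exchange_declare(self, **kwargs):
        self.calls.append(('exchange_declare', kwargs))
        return 'promise'

    def queue_declare(self, **kwargs):
        self.calls.append(('queue_declare', kwargs))
        return 'queue-promise'

    def queue_bind(self, **kwargs):
        self.calls.append(('queue_bind', kwargs))
        return 'promise'

    def wait(self, promise):
        # Only the queue_declare result is inspected by prepare_queue.
        return {'queue': 'generated-name'} if promise == 'queue-promise' else {}


stub = RecordingClient()
queue = prepare_queue(stub, ['Tooele', 'Boston'])
print([name for name, _ in stub.calls])
```

With a real connection, `prepare_queue(consumer, sys.argv[1:])` would take the place of the separate declare and bind steps in direct_watch.py.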
In the second terminal, run the direct_watch.py script with one parameter – *Tooele*:
~~~~
root@rabbitmq:~# python direct_watch.py Tooele
Watching departure times for Tooele
(…)
root@rabbitmq:~#
~~~~
In the third terminal, run the direct_watch.py script with two parameters – *Tooele* and *Boston*:
~~~~
root@rabbitmq:~# python direct_watch.py Tooele Boston
Watching departure times for Tooele
Watching departure times for Boston
(…)
root@rabbitmq:~#
~~~~
Then, back in the first terminal, send three departure notifications. One to *Tooele*, one to *Boston* and one to *Chicago*:
~~~~
root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~# python direct_notify.py Boston
Departure to Boston at 2014-02-18 15:57:31.035000
root@rabbitmq:~# python direct_notify.py Chicago
Departure to Chicago at 2014-02-18 15:57:35.035000
root@rabbitmq:~#
~~~~
The first notification should be received by both consumers, since both are waiting for departures to Tooele. The second one should reach only the consumer that is also waiting for departures to Boston. The third one should not be received by either consumer, since neither is waiting for departures to Chicago.
This is the expected behaviour. These simple examples illustrate how to dispatch messages so that only the consumers bound to a given routing key receive them.
Further reading
Direct routing does not offer complete control over where messages are delivered, but it is a big step up from the fanout exchange used in the previous examples, which blindly delivers messages everywhere. A direct exchange can serve many real world messaging scenarios, and the process is not terribly difficult.
The primary goal of this text was to introduce basic direct routing using a simple, real world situation. Many other uses are covered in detail in the official RabbitMQ documentation, which is a great resource for RabbitMQ users and administrators.