RabbitMQ

Overview

  • A producer sends messages to an exchange

  • An exchange decides what to do with a message and may forward it to one or more queues

  • A binding links an exchange to a queue

  • A queue stores the message until a consumer receives (and acknowledges) it

  • Get status

rabbitmqctl status
rabbitmqctl list_connections
rabbitmqctl list_exchanges
rabbitmqctl list_queues
rabbitmqctl list_users
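
The exchange / binding / queue model from the overview can be sketched in plain Python without a broker. This is only a toy illustration, not how RabbitMQ is implemented: the class name DirectExchange, the queue names and the routing keys are all made up for the example, and only exact-match routing is modelled.

```python
from collections import defaultdict, deque


class DirectExchange:
    """Toy stand-in for an exchange that routes on an exact routing-key match."""

    def __init__(self):
        # routing key -> list of queues bound with that key
        self.bindings = defaultdict(list)

    def bind(self, queue, routing_key):
        # A binding connects this exchange to a queue.
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        # Forward the message to every bound queue; with no binding it is dropped.
        queues = self.bindings.get(routing_key, [])
        for queue in queues:
            queue.append(body)
        return len(queues)


# Hypothetical queues; a deque holds messages until a consumer takes them.
orders = deque()
audit = deque()

exchange = DirectExchange()
exchange.bind(orders, "order.created")
exchange.bind(audit, "order.created")

delivered = exchange.publish("order.created", "order 42")   # reaches both queues
dropped = exchange.publish("order.cancelled", "order 7")    # no binding, no queue
```

The producer only ever talks to the exchange; which queues receive a copy depends entirely on the bindings.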

User management

  • Add a new user

rabbitmqctl add_user <username> <password>
  • Change password

rabbitmqctl change_password <username> <newpassword>
  • Delete user

rabbitmqctl delete_user <username>
  • Testing access

import pika
parameters = pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials('guest', 'guest'))
connection = pika.BlockingConnection(parameters)
connection.close()

Enable management plugin

/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
  • Restart rabbitmq-server

  • Point your browser to http://localhost:15672 (http://localhost:55672 on RabbitMQ releases before 3.0)
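
Besides the web UI, the management plugin serves an HTTP API on the same port. A minimal sketch of a request for /api/overview follows; the host, the port (15672 is the default since RabbitMQ 3.0, older releases used 55672) and the guest credentials are assumptions, and the function names are made up for this example.

```python
import base64
import json
import urllib.request


def build_overview_request(host="localhost", port=15672, user="guest", password="guest"):
    """Build (but do not send) a GET request for the management API overview."""
    credentials = "{}:{}".format(user, password).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    request = urllib.request.Request("http://{}:{}/api/overview".format(host, port))
    # The management API uses HTTP basic authentication.
    request.add_header("Authorization", "Basic " + token)
    return request


def fetch_overview(request, timeout=5):
    """Send the request and decode the JSON body; needs a running broker."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling fetch_overview(build_overview_request()) against a running server with the plugin enabled should return a dictionary of broker-wide information.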

Troubleshooting

  • RabbitMQ needs at least disk_free_limit of free disk space (default 1 GB); below that limit it won't accept messages
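
A quick way to check the disk condition from Python is to compare the free space on the partition with the limit. This is a rough sketch: the 1 GB value mirrors the default quoted above, the data directory path in the usage note is an assumption about the installation, and the function name is made up.

```python
import shutil

# Assumed limit, matching the default quoted above; adjust to your configuration.
DISK_FREE_LIMIT_BYTES = 1000 * 1000 * 1000


def has_enough_disk(path, limit_bytes=DISK_FREE_LIMIT_BYTES):
    """Return True if the partition holding `path` has at least `limit_bytes` free."""
    return shutil.disk_usage(path).free >= limit_bytes
```

For example, has_enough_disk('/var/lib/rabbitmq') checks the partition of a typical Linux data directory; use the path of your own installation.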

Python example

  • pip install pika

  • Sending

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(delivery_mode=2))
print(" [x] Sent 'Hello World!'")
connection.close()
  • durable=True makes the queue itself survive a server restart or stop

  • delivery_mode=2 marks the message as persistent, so messages in the queue survive as well

  • Receiving

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
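# Declare with the same arguments as the sender; a mismatch is rejected by the server
channel.queue_declare(queue='hello', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))

channel.basic_qos(prefetch_count=1)
# pika 1.0+ argument names; older releases used basic_consume(callback, queue='hello', no_ack=True)
channel.basic_consume(queue='hello',
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()
  • Set auto_ack (no_ack in pika releases before 1.0) to False and acknowledge each message in the callback once the task has been processed; otherwise messages can get lost if the worker dies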
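
A receiver with manual acknowledgements could look roughly like the sketch below. It assumes pika 1.0 or later (where the flag is called auto_ack; older releases call it no_ack) and a broker on localhost; process() is a hypothetical placeholder for the real task. pika is imported inside main() only so that the callback can be exercised without a broker.

```python
def process(body):
    """Hypothetical task; replace with the real work."""
    return body.upper()


def on_message(ch, method, properties, body):
    # Do the work first. If process() raises, no ack is sent and the
    # broker can redeliver the message after the connection closes.
    process(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    import pika  # third-party: pip install pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello', durable=True)
    # Hand each worker at most one unacknowledged message at a time.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='hello',
                          on_message_callback=on_message,
                          auto_ack=False)
    channel.start_consuming()
```

Call main() with a broker running. Acknowledging only after process() returns is what keeps a message from being lost when the worker dies halfway through.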