RabbitMQ on RHEL

RabbitMQ needs a decent copy of Erlang.

I pulled the RHEL packages from the web.

You also need

  • socat
  • erlang
  • Java - I used version 8, the same as Elasticsearch

RPM Files

I downloaded the following RPM files for this

  • erlang-20.1.5-1.el6.x86_64.rpm
  • rabbitmq-server-3.6.14-1.suse.noarch.rpm

These files are saved in the 172.28.11.40 admin/tim dev environment.

Install Process

yum install socat
yum install ./erlang-20.1.5-1.el6.x86_64.rpm
yum install ./rabbitmq-server-3.6.14-1.suse.noarch.rpm

Starting and Stopping

To make sure RabbitMQ starts automatically at boot:

chkconfig rabbitmq-server on

As an administrator, start and stop the server as usual using

  • /sbin/service rabbitmq-server stop/start/etc.

Adding Web Console Interface

RabbitMQ, for idiots like me, is easier to use with a web console interface. Note that RabbitMQ must be running to do this.

rabbitmq-plugins enable rabbitmq_management

By default this will appear at

http://server-name:15672/
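
The same port also serves the management plugin's HTTP API, which is handy for scripted checks. Below is a minimal sketch, assuming the admin user set up in these notes and the /api/overview endpoint; the helper names build_request and fetch_overview are made up for illustration, and server-name is the same placeholder as in the URL above.

```python
import base64
import json
import urllib.request


def build_request(host, user, password, path="/api/overview", port=15672):
    """Build a basic-auth GET request for the management HTTP API."""
    url = "http://%s:%d%s" % (host, port, path)
    token = base64.b64encode(("%s:%s" % (user, password)).encode("utf-8"))
    request = urllib.request.Request(url)
    request.add_header("Authorization", "Basic " + token.decode("ascii"))
    return request


def fetch_overview(host, user, password):
    """Return the parsed JSON overview; this needs a running broker."""
    request = build_request(host, user, password)
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))


# Example call (placeholder host, not run here):
# print(fetch_overview("server-name", "admin", "OMAN@123").get("rabbitmq_version"))
```

Keeping the request building separate from the network call makes it easy to point the same helper at other API paths such as /api/nodes or /api/queues.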

Ports needed

Port(s)       Purpose
4369          epmd, a peer discovery service used by RabbitMQ nodes and CLI tools
5672, 5671    AMQP 0-9-1 and 1.0 clients without and with TLS
25672         Erlang distribution for inter-node and CLI tools communication; allocated from a dynamic range (limited to a single port by default, computed as AMQP port + 20000). See the networking guide for details.
15672         HTTP API clients and rabbitmqadmin (only if the management plugin is enabled)
61613, 61614  STOMP clients without and with TLS (only if the STOMP plugin is enabled)
1883, 8883    MQTT clients without and with TLS (only if the MQTT plugin is enabled)
15674         STOMP-over-WebSockets clients (only if the Web STOMP plugin is enabled)
15675         MQTT-over-WebSockets clients (only if the Web MQTT plugin is enabled)

This translates into quite a few mods for the firewall.

firewall-cmd --permanent --add-port=4369/tcp
firewall-cmd --permanent --add-port=5671/tcp
firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=25672/tcp
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --permanent --add-port=15674/tcp
firewall-cmd --permanent --add-port=15675/tcp
firewall-cmd --reload
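
To confirm from another machine that the ports are reachable, a quick TCP probe helps. This is a rough sketch using only the standard library; the names port_open and report are made up for illustration, and the port list simply mirrors the firewall lines above.

```python
import socket

# Ports opened by the firewall-cmd lines above.
RABBIT_PORTS = [4369, 5671, 5672, 25672, 15672, 15674, 15675]


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name lookup failures.
        return False


def report(host, ports=RABBIT_PORTS):
    """Map each port to True/False depending on whether it accepted a connection."""
    return {port: port_open(host, port) for port in ports}


# Example call (placeholder host, not run here):
# print(report("server-name"))
```

A False result does not always mean the firewall is at fault: a port with nothing listening on it (for example 5671 when TLS is not configured) is refused as well.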

Adding a Privileged User

rabbitmqctl add_user admin OMAN@123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

At this point you will have a single-node RabbitMQ server.

HA RabbitMQ

High Availability RabbitMQ nodes.

Whilst having an MQ is better than lots of other options, a single message queue server is also a single point of failure, which is not a good idea. So, as with all BCP, we will duplicate the machines.

So we now have 2 machines. We need a queue that is shared across both machines, so that if one machine fails the data is still on the other.

At this point I am assuming the following

  • 2 machines
  • Same version of
    • Erlang
    • Java (not 100% sure this is needed, but it seems sensible)
    • RabbitMQ
  • We can ping each machine from the other

Nodes used

  • rabbitmaster = 192.168.59.4 (rmq1)
  • rabbitslave = 192.168.59.5 (rmq2)
  • haproxy = 192.168.59.6 (err?? No idea yet)

Please note haproxy is NOT a MACHINE
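
The clustering commands further down address the nodes as rabbit@rmq1 and rabbit@rmq2, so each machine has to be able to resolve those short names. How they get resolved (for example via /etc/hosts) is an assumption on my part; the notes do not say. A small sketch to check resolution, with resolve_all as a made-up helper name:

```python
import socket


def resolve_all(names):
    """Return {name: IP address or None} for each host name."""
    result = {}
    for name in names:
        try:
            result[name] = socket.gethostbyname(name)
        except OSError:
            # socket.gaierror (lookup failure) is a subclass of OSError.
            result[name] = None
    return result


# Example call, to be run on each node (not run here):
# print(resolve_all(["rmq1", "rmq2"]))
```

Any name that maps to None needs fixing before the nodes will be able to find each other.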

Shut Everything Down

  • on rmq1
    • service rabbitmq-server stop
  • on rmq2
    • service rabbitmq-server stop

Copy the Cookie from the Master Node to the Slave Nodes

  • on rmq1

    • scp /var/lib/rabbitmq/.erlang.cookie tim@rmq2:~/
  • on rmq2

    • cd ~tim
    • su root
    • cp .erlang.cookie /var/lib/rabbitmq/

cp: overwrite ‘/var/lib/rabbitmq/.erlang.cookie’? yes

This prompt is normal - answer yes.
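
Nodes can only talk to each other when the cookie is identical on both machines, and Erlang expects the file to be accessible by its owner only. Here is a sketch for comparing cookies without printing the secret itself: run it as root on each node and compare the digests. The function names are made up for illustration.

```python
import hashlib
import os
import stat


def cookie_fingerprint(path="/var/lib/rabbitmq/.erlang.cookie"):
    """Return (sha256 hex digest, permission bits) for the cookie file."""
    with open(path, "rb") as handle:
        digest = hashlib.sha256(handle.read()).hexdigest()
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return digest, mode


def owner_only(mode):
    """True when group and others have no access (e.g. 0o400 or 0o600)."""
    return mode & 0o077 == 0


# Example call on a node (not run here):
# digest, mode = cookie_fingerprint()
# print(digest, oct(mode), owner_only(mode))
```

If the digests differ between rmq1 and rmq2, the copy did not take effect.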

Start All Nodes

On the Master and Slave, start RabbitMQ

service rabbitmq-server start

On the Slave Node ONLY

We need to reset the slave Node now.

 rabbitmqctl stop_app
 rabbitmqctl reset
 rabbitmqctl start_app

On the Master Node ONLY

Just on the Master Node now

rabbitmqctl stop_app
rabbitmqctl reset

Join a Node to the Cluster

On the Master node

rabbitmqctl join_cluster rabbit@rmq2

We see

Clustering node rabbit@rmq1 with rabbit@rmq2

Check the Cluster Status

The command is

rabbitmqctl cluster_status
Cluster status of node rabbit@rmq1
[{nodes,[{disc,[rabbit@rmq1,rabbit@rmq2]}]},{alarms,[{rabbit@rmq2,[]}]}]
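
Once both nodes are up, the same check can be scripted. The sketch below assumes the management plugin is enabled and that the node list comes from its /api/nodes endpoint, where each entry carries "name" and "running" fields. The sample data is hand-written for illustration, not captured output, and missing_or_down is a made-up helper name.

```python
import json

# Node names as used in the join_cluster step.
EXPECTED = {"rabbit@rmq1", "rabbit@rmq2"}


def missing_or_down(nodes, expected=EXPECTED):
    """Return the expected node names that are absent or not running."""
    running = {node["name"] for node in nodes if node.get("running")}
    return sorted(expected - running)


# Hand-written sample shaped like an /api/nodes response.
sample = json.loads(
    '[{"name": "rabbit@rmq1", "running": true},'
    ' {"name": "rabbit@rmq2", "running": false}]'
)
print(missing_or_down(sample))
```

With this sample the script prints ['rabbit@rmq2']; an empty list means every expected node is running.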

Start App on Master

Only now on the master node

rabbitmqctl start_app

Apply HA Rules

Policies are shared across the cluster, so it is enough to run the following on any one node (repeating it on the others is harmless)

rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
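
In that command, ha-all is the policy name, "" is a regular expression matched against queue names (an empty pattern matches every queue) and the JSON is the policy definition. The sketch below spells out the same pieces as the body one would send to the management API at PUT /api/policies/%2F/ha-all. It assumes the management plugin is enabled, and it uses Python's re module only to approximate how the broker matches the pattern; the helper names are made up.

```python
import json
import re

# Same definition as in the rabbitmqctl set_policy line above.
definition = {"ha-mode": "all", "ha-sync-mode": "automatic"}


def policy_body(pattern, definition, apply_to="all"):
    """Build the JSON body for PUT /api/policies/<vhost>/<name>."""
    return json.dumps(
        {"pattern": pattern, "definition": definition, "apply-to": apply_to},
        sort_keys=True,
    )


def policy_applies(pattern, queue_name):
    """Approximate the broker's check: does the pattern match the name?"""
    return re.search(pattern, queue_name) is not None


# "%2F" is the URL-encoded default vhost "/".
print("/api/policies/%2F/ha-all")
print(policy_body("", definition))
print(policy_applies("", "TestQ"))
```

A narrower pattern such as "^ha\." would mirror only queues whose names start with "ha.", which can be cheaper than mirroring everything.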

Create a Privileged User

I think the reset command removed the earlier privileged user, so create admin again

rabbitmqctl add_user admin OMAN@123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

Create a TestQ

Using the Web Interface (rmq1:15672) I create a Queue called TestQ

Check Queue from Slave

On the Slave Machine

 rabbitmqctl list_queues

And I see

 Listing queues
Testq   0

Looks like it has worked!

HA Time

We will install HAProxy

HAProxy is a TCP load balancer: clients connect to it, and it spreads their connections across both nodes.

As this will only be installed on 1 machine - this does not seem like a good HA solution - but it is a starting point.

Install

yum install haproxy

Configure

vi /etc/haproxy/haproxy.cfg

global
    daemon

defaults
    mode tcp
    maxconn 10000
    timeout connect 5s
    timeout client 100s
    timeout server 100s

listen rabbitmq
    # 'bind' is the portable way to set the listening address
    bind 192.168.59.6:5672
    mode tcp
    balance roundrobin
    server rmq1 192.168.59.4:5672 check inter 5s rise 2 fall 3
    server rmq2 192.168.59.5:5672 check inter 5s rise 2 fall 3
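
The check options on the server lines read as: probe every 5 seconds, mark a server down after 3 failed checks in a row, and bring it back after 2 successful checks in a row. The toy model below only illustrates that behaviour together with round-robin selection; it is a simplification I wrote for these notes, not HAProxy's actual implementation.

```python
class ServerHealth:
    """Track up/down state in the spirit of 'rise 2 fall 3'."""

    def __init__(self, rise=2, fall=3, up=True):
        self.rise = rise
        self.fall = fall
        self.up = up
        self.streak = 0  # consecutive checks that disagree with the current state

    def record(self, check_ok):
        """Feed one health check result and return the resulting state."""
        if check_ok == self.up:
            self.streak = 0  # result agrees with the current state
            return self.up
        self.streak += 1
        needed = self.fall if self.up else self.rise
        if self.streak >= needed:
            self.up = not self.up  # enough consecutive results: flip state
            self.streak = 0
        return self.up


def pick_round_robin(servers, counter):
    """Return the next healthy server name, or None if all are down."""
    healthy = [name for name, state in servers if state.up]
    return healthy[counter % len(healthy)] if healthy else None


servers = [("rmq1", ServerHealth()), ("rmq2", ServerHealth())]
for _ in range(3):
    servers[0][1].record(False)  # rmq1 fails three checks in a row
print([pick_round_robin(servers, i) for i in range(4)])
```

After the three failures rmq1 is out of rotation, so every connection in the example goes to rmq2 until rmq1 passes two checks in a row.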

Python

Test Loader

#!/usr/bin/env python
import pika

# Log in with the admin user created above.
credentials = pika.PlainCredentials('admin', 'OMAN@123')
parameters = pika.ConnectionParameters('rmq',
                                       5672,
                                       '/',
                                       credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Declaring is idempotent: the queue is created only if it does not exist yet.
channel.queue_declare(queue='hello')
# The default exchange ('') routes straight to the queue named by routing_key.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Test Reader

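#!/usr/bin/env python
import pika

# Connects to the broker on this machine with pika's default credentials.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declaring is idempotent: the queue is created only if it does not exist yet.
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))


# pika 1.0+ argument names; pika 0.x used basic_consume(callback, queue='hello', no_ack=True).
channel.basic_consume(queue='hello',
                      on_message_callback=callback,
                      auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()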