April 25, 2024

RabbitMQ + Java + Python + PHP Real-World Example

RabbitMQ is one of the most popular and easy-to-deploy message brokers. It receives messages from producers and delivers them to consumers. It is used to reduce load and response times in applications, and it is widely used in microservice architectures to enable asynchronous communication between services. One reason for its popularity is that easy-to-use client libraries are available for all the major programming languages. In this article I am going to build a simple project in which services written in different technologies communicate with each other via RabbitMQ.

Project Description

We want to build a registration system that sends SMS and email confirmation messages to the user after registration. The registration process has 3 tasks: recording the user’s information in the database, sending an SMS confirmation and sending an email confirmation.

Monolith approach

The simplest approach for implementing this system is to do all the tasks in sequence and synchronously, as a monolithic application. First we store the user’s information in the database. After that step completes, we send an SMS to the user by calling an external third-party API and wait for its response. Next we call the email function to send the confirmation email and wait for it to return. Finally we show the success message to the user in the browser.

Sequential approach for registration system

One disadvantage of this approach is that the user faces an unexpected delay whenever the SMS API or the email function takes longer to respond. Moreover, if the system fails for any reason just after the first step (storing data in the database), no SMS and no email will be sent to the user, because every later task runs only after the previous one has completed.
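
To make the delay concrete, here is a minimal Python sketch of the sequential flow. The names register_user_sequentially and make_step, and the sleep durations, are placeholders I made up for illustration; they are not part of the project. The only point is that the caller waits for the sum of all three steps, and that an error in one step stops the steps after it.

```python
import time


def register_user_sequentially(user, store, send_sms, send_email):
    """Run the three registration steps one after another and time each one."""
    timings = {}
    for name, step in (("store", store), ("sms", send_sms), ("email", send_email)):
        started = time.perf_counter()
        step(user)  # the caller is blocked here until the step returns
        timings[name] = time.perf_counter() - started
    # The user-visible latency is the sum of all steps.
    timings["total"] = sum(timings.values())
    return timings


def make_step(seconds):
    """Hypothetical stand-in for a database call or an external API."""
    def step(user):
        time.sleep(seconds)
    return step


user = {"name": "Alex", "phone": "09205551325", "email": "my_email@yahoo.com"}
timings = register_user_sequentially(
    user, make_step(0.01), make_step(0.05), make_step(0.03)
)
print({name: round(value, 3) for name, value in timings.items()})
```

If send_sms raised an exception here, send_email would never be called, which is the failure mode described above.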

Separated services with asynchronous inter-communication

Separating services allows a system to be divided into a number of smaller, independent and loosely coupled parts. Each part is responsible only for its own tasks; in a microservice architecture in particular, each service has its own exclusive and isolated resources. The services run as autonomous processes and communicate with each other through APIs, either synchronously or asynchronously. In this section I focus on asynchronous communication. Each service can be implemented in a different programming language and technology, on a different platform.

The registration system described above consists of 3 main parts: recording the user’s information in the database, sending an SMS confirmation and sending an email confirmation. We simply assign each task to an individual service named after it.

Turning the monolith project into a number of individual, independent services

As you can see:

  • User Service is in charge of tasks related to users, including user registration.
  • Email Service is responsible for sending email upon request.
  • SMS Service’s job is to send SMS messages upon request.

Although each service is responsible for its own tasks, the services still need to communicate with one another. In this case, the User service needs to ask the two other services to send confirmation messages. As mentioned earlier, we want to try the asynchronous approach to communication. One option for this kind of communication is a message queue. The client service (called the Producer) sends its request as a message to a queue, and the service that serves it (called the Consumer) consumes the messages in the queue in FIFO order and carries out the related tasks. In this project I use RabbitMQ as the message broker, which is responsible for receiving messages from producers and delivering them to consumers.
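
The producer/consumer idea can be sketched without any broker at all. The following Python snippet uses the standard library’s queue.Queue as a stand-in for a message queue; run_demo and the sample requests are names I chose for illustration. It is only an in-process model: RabbitMQ plays the same role between separate processes and machines.

```python
import queue
import threading


def run_demo(requests):
    """A producer enqueues requests; a consumer thread handles them in FIFO order."""
    message_queue = queue.Queue()
    handled = []

    def consumer():
        while True:
            message = message_queue.get()  # blocks until a message arrives
            if message is None:            # sentinel: no more work
                break
            handled.append(message)

    worker = threading.Thread(target=consumer)
    worker.start()

    # The producer only enqueues and moves on; it never waits for the work itself.
    for request in requests:
        message_queue.put(request)
    message_queue.put(None)

    worker.join()
    return handled


print(run_demo(["send SMS to Alex", "send SMS to Sara", "send SMS to Omid"]))
```

The consumer sees the requests in the order they were queued, while the producer is free as soon as each put call returns.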

RabbitMQ installation and configuration

RabbitMQ needs Erlang to work, so we should first download and install Erlang on our system. At the time of this post, the latest version of RabbitMQ is 3.9.12, which is compatible with Erlang 23.2 and later versions.

Erlang 24.1.12 download page : link
RabbitMQ 3.9.12 download page : link

Install RabbitMQ after Erlang. The RabbitMQ service starts once installation is complete. If it does not start, run it manually from the service manager of your operating system. On Windows it is better to add RabbitMQ’s sbin folder to the system path; otherwise you need to run the commands from that directory. By default, RabbitMQ listens on port 5672 on all interfaces.

Then we need to enable the RabbitMQ management plugin, which provides an HTTP API for RabbitMQ management, a web-based interface and a command-line tool. We don’t need to install it separately; it is already included in the RabbitMQ installation package and just needs to be enabled:

rabbitmq-plugins enable rabbitmq_management

The web UI can be accessed at http://localhost:15672, but we need a user with appropriate permissions to log in to the management UI. Let’s create one:

# create a user 
rabbitmqctl add_user rabbitmq_admin 1234@5
# give the user the "administrator" privilege (tag)
rabbitmqctl set_user_tags rabbitmq_admin administrator

Now open http://localhost:15672 in your browser and log in with rabbitmq_admin and its password 1234@5. After login you will be redirected to the Overview page, where you can see server-related information and statistics. First we need to define a new virtual host for our project in the Admin tab > Virtual Hosts > Add a new virtual host section:

Add new virtual host in RabbitMQ for our project

As you can see in the list, I have already added a virtual host named shaeri-vc. Next, go to the Exchanges tab. In the Add a new exchange section, select the virtual host you created in the previous step and choose a name for your exchange in the Name field. I chose “new_registration”; the name is important because we are going to use it in our code in later steps. Select fanout in the Type dropdown and click Add exchange to create the exchange. Creating the exchange in code is more common, but here I want to show you how to work with the management web UI.

Add new Fanout Exchange
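
For comparison, declaring the same exchange from code might look roughly like the sketch below. declare_registration_exchange is a helper name I made up; channel is assumed to be a pika channel (the object returned by connection.channel()), and durable=True is an assumption chosen to match the management UI’s default for new exchanges.

```python
def declare_registration_exchange(channel, name="new_registration"):
    """Declare a durable fanout exchange on an open pika channel.

    Declaring is idempotent: if the exchange already exists with the same
    type and durability, the broker simply confirms it.
    """
    channel.exchange_declare(
        exchange=name,           # the exchange name used throughout this article
        exchange_type="fanout",  # copy each message to every bound queue
        durable=True,            # assumption: keep the exchange across broker restarts
    )
    return name
```

It would be called once at service startup, right after the channel is opened. If the exchange already exists with different settings, the broker refuses the declaration, so the values here have to match whatever was chosen in the UI.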

We need to send a message to both the SMS queue and the Email queue on every user registration event. I created a fanout exchange because it routes messages to all bound queues, which is exactly what we want. Now that our RabbitMQ server is installed, configured and ready to work, we can dive into the code.
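
If the fanout behaviour is new to you, the following toy model in Python may help. FanoutExchange is a class I wrote purely for illustration; it is not how RabbitMQ is implemented, it only mimics the routing rule: every bound queue gets its own copy of each message, and the routing key is ignored.

```python
from collections import deque


class FanoutExchange:
    """Toy in-memory model of a fanout exchange (not RabbitMQ itself)."""

    def __init__(self):
        self.queues = {}  # queue name -> messages waiting in that queue

    def bind(self, queue_name):
        self.queues.setdefault(queue_name, deque())

    def publish(self, body, routing_key=""):
        # A fanout exchange ignores the routing key and copies the
        # message to every queue bound to it.
        for waiting in self.queues.values():
            waiting.append(body)


exchange = FanoutExchange()
exchange.bind("email_queue")
exchange.bind("SMS_Queue")
exchange.publish(b'{"name": "Alex"}')

for name, waiting in exchange.queues.items():
    print(name, list(waiting))
```

Both queues end up holding the same message, so the Email and SMS services can each process the registration independently.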

User Registration Service

The Registration service is the entry point of our tiny project. It accepts the user’s basic information (name, family name, mobile number and email address), stores it in the database and sends a message containing this data to RabbitMQ. The two other services, which listen to the queues bound to the new_registration exchange, consume this message and perform the appropriate actions.

RabbitMQ Fanout Exchange
The User service sends a message to the fanout exchange, and the two other services, which listen to the queues bound to the exchange, consume the message.

We code the Registration service in PHP as a simple API that accepts the user information in JSON format as an HTTP POST request. To communicate with RabbitMQ from PHP we use php-amqplib, the most widely used PHP client for RabbitMQ. To communicate with the message broker, clients need to pass authentication and authorization. So, before we go to the code, we need to create a user for our Registration service in RabbitMQ (Admin tab > Users), click the user name in the list, and give it write permission on the new_registration exchange. Now everything is ready. We need to send the user object to the message broker so it can be delivered to the other services, but RabbitMQ has no knowledge of objects; to it, all messages are just byte arrays. So we need to serialize objects before sending them to RabbitMQ.

<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

// Request sample payload => { "name": "Alex", "family": "Sherif", "phone": "09205551325", "email" : "my_email@yahoo.com"}

// get request payload
$inputJSON = file_get_contents('php://input');
$user_object= json_decode( $inputJSON );

// Store the new user information in Database
$user_id = StoreInDB($user_object);

// set RabbitMQ client parameters
$exchange = "new_registration";
$vhost = "shaeri-vc";
$rabbitmq_username = "register_service";
$rabbitmq_password = "1234";
// create a connection to RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, $rabbitmq_username, $rabbitmq_password, $vhost);

// Create a Channel on the existing connection
$channel = $connection->channel();

// serialize the user object 
$messageBody = json_encode($user_object);
// Prepare the message
$message = new AMQPMessage($messageBody,
                          array('content_type' => 'application/json',
                                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
// send the message to RabbitMQ
$channel->basic_publish($message, $exchange);
 
//Close the Channel & Connection
$channel->close();
$connection->close();

// return result to client
header('Content-Type: application/json; charset=utf-8');
echo json_encode(array('result' => 'OK', 'userid' => $user_id));

?>

In line 22 I create a connection to RabbitMQ, and in line 25 a channel is created on that connection. Next, in lines 28-34, the message is prepared and sent to the message broker. Then I close the channel and the connection in lines 37 and 38.
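
The serialization step does not depend on the language, which is what lets a PHP producer talk to Python and Java consumers. As a rough illustration in Python (the helper names to_message_body and from_message_body are mine, not part of any library), the round trip between an object and the bytes that travel through the broker looks like this:

```python
import json


def to_message_body(user):
    """Serialize a user record to the bytes a producer would publish."""
    return json.dumps(user).encode("utf-8")


def from_message_body(body):
    """Turn a received message body back into a Python dictionary."""
    return json.loads(body.decode("utf-8"))


user = {
    "name": "Alex",
    "family": "Sherif",
    "phone": "09205551325",
    "email": "my_email@yahoo.com",
}

body = to_message_body(user)
print(type(body).__name__)             # bytes
print(from_message_body(body) == user)  # True
```

Any consumer that can decode UTF-8 and parse JSON can rebuild the same record, whatever language the producer was written in.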

Single Active Consumer

Before we go to the next steps, we should decide how to deal with messages in queues, how many instances of each service will listen to a queue at the same time, and how to handle failures in services. Here I prefer to use the Single Active Consumer queue mode. It lets multiple consumers attach to a queue, but the queue delivers messages to only one active consumer at a time and ignores the others. It switches to another registered consumer if the active one is cancelled or dies. This feature can be enabled at queue declaration time.

rabbitmq single active consumer
Single active consumer queue accepts only one consumer at a time and ignores other consumers.
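
The failover behaviour can be modelled in a few lines of Python. SingleActiveConsumerQueue below is a toy class I wrote for illustration, with the simplifying assumption that consumers take over in registration order; it is not RabbitMQ’s implementation and it ignores details such as unacknowledged messages.

```python
class SingleActiveConsumerQueue:
    """Toy model of single-active-consumer failover (not RabbitMQ itself)."""

    def __init__(self):
        self.consumers = []  # registration order; index 0 is the active consumer

    def register(self, name, handler):
        self.consumers.append((name, handler))

    def cancel(self, name):
        # Removing the active consumer promotes the next registered one.
        self.consumers = [c for c in self.consumers if c[0] != name]

    def deliver(self, message):
        if not self.consumers:
            # A real queue would keep the message until a consumer is available.
            return None
        name, handler = self.consumers[0]
        handler(message)
        return name


q = SingleActiveConsumerQueue()
received = []
q.register("email-worker-1", lambda m: received.append(("email-worker-1", m)))
q.register("email-worker-2", lambda m: received.append(("email-worker-2", m)))

q.deliver("first")           # handled by the active consumer
q.cancel("email-worker-1")   # the active consumer goes away
q.deliver("second")          # the standby consumer takes over
print(received)
# [('email-worker-1', 'first'), ('email-worker-2', 'second')]
```

The second worker stays idle while the first one is alive, so running several instances of a service gives redundancy without two instances handling the same queue at once.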

Email service

In the Email service, which I want to write in Python, we need to create a queue and bind it to the new_registration exchange, then start consuming messages from the queue and trigger a task (sending an email) on every received message.

To communicate with RabbitMQ in Python, the pika package must be installed using pip:

pip install pika --upgrade

The complete code for the Email service:

import pika, time, json, smtplib, ssl

def email_function(msg):

  print(" Email service started")

  # Convert the serialized message to a Python dictionary
  user_info = json.loads(msg)

  print(" [x] Sending Email to " + user_info['email'])
  port = 465  # For SSL
  smtp_server = "smtp.gmail.com"
  sender_email = "your_email@gmail.com"  
  receiver_email = user_info['email']  
  password = "YourEmailPassword"
  message = "Welcome to our Application, it is a confirmation email!"
  context = ssl.create_default_context()
  with smtplib.SMTP_SSL(smtp_server, port, context=context) as server:
    server.login(sender_email, password)
    server.sendmail(sender_email, receiver_email, message)

  time.sleep(5) # delays for 5 seconds
  print(" Email service finished")
  return

# set credentials parameters
credentials = pika.PlainCredentials('email_service', '1234')
parameters = pika.ConnectionParameters('localhost',
                                   5672,
                                   'shaeri-vc',
                                   credentials)

# create connection
connection = pika.BlockingConnection(parameters)
# create channel
channel = connection.channel()

# create single active queue if it is not already created
channel.queue_declare(queue='email_queue', arguments = {"x-single-active-consumer": True})
# bind the queue to new_registration exchange
channel.queue_bind(exchange='new_registration', queue='email_queue')

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
  email_function(body)

# set up subscription on the queue
channel.basic_consume('email_queue',
  callback,
  auto_ack=True)

# start listening to the queue (blocks)
channel.start_consuming()
connection.close()

As you can see in the code, lines 27-36 connect to RabbitMQ and create a channel. Line 39 creates the queue and line 41 binds it to the “new_registration” exchange. Next we need a function to be called on incoming messages; I defined the callback function and set up a subscription on the queue with it in line 48. Finally, line 53 starts consuming; it blocks the thread and listens for incoming messages in the queue. As I mentioned, RabbitMQ works with byte arrays, and the User service sent a serialized object to RabbitMQ, so in line 8 I deserialize the message received from RabbitMQ and convert it to a Python dictionary.
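
One thing to keep in mind: the consumer above uses auto_ack=True, so RabbitMQ treats a message as handled as soon as it is delivered, and the message is lost if email_function raises or the process dies midway. A possible alternative, sketched below under the assumption that dropping a message that fails is acceptable, is to acknowledge manually. make_callback is a hypothetical helper of mine; basic_ack and basic_nack are methods of the pika channel.

```python
def make_callback(handle, requeue_on_error=False):
    """Build a pika-style callback that acks only after `handle` succeeds."""

    def callback(ch, method, properties, body):
        try:
            handle(body)
        except Exception:
            # Tell the broker the message was not processed. With
            # requeue=False it is dropped (or dead-lettered, if configured)
            # instead of being redelivered in an endless loop.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=requeue_on_error)
        else:
            # Confirm successful processing so the broker can forget the message.
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

In the Email service it would be wired in with channel.basic_consume('email_queue', make_callback(email_function), auto_ack=False). Whether a failed message should be requeued, dropped or dead-lettered depends on the project, so treat the default here as one option rather than a recommendation.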

SMS Service

Like the Email service, this service listens to a queue bound to the “new_registration” exchange. All the concepts are the same as in the previous section, except that I want to write it in Java. To communicate with the broker I am going to use the Java client provided by RabbitMQ, amqp-client. Although I could use the .jar file of this library directly, I pull it in with Maven for better dependency management. Note that SLF4J API and SLF4J Simple are also included, because the RabbitMQ client relies on SLF4J for logging. You may have noticed org.json, httpclient and httpmime in the dependencies: we need org.json to deserialize the message body, and the other two libraries help us call the third-party SMS provider’s web service:

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.33</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.33</version>
    </dependency>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.14.1</version>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20211205</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpmime</artifactId>
        <version>4.5.1</version>
    </dependency>

</dependencies>

And finally, the code for the SMS service in Java:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.HttpClientBuilder;
import org.json.*;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class app {
    private final static String QUEUE_NAME = "SMS_Queue";
    private final static String EXCHANGE_NAME = "new_registration";

    public static void main(String[] argv) throws Exception {

        // create connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://sms_service:1234@localhost:5672/shaeri-vc");
        Connection connection = factory.newConnection();
        // create channel
        Channel channel = connection.createChannel();

        // enable the single active mode
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-single-active-consumer", true);
        // create queue if it is not already created
        channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
        // bind the queue to new_registration exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        //create a callback function to handle incoming messages
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            try {
                JSONObject userObject = new JSONObject(message);
                System.out.println(" [x] New task received : Sending SMS to " + userObject.getString("phone") );
                HttpClient client = HttpClientBuilder.create().build();
                HttpUriRequest httpUriRequest = new HttpGet("http://URL/to/sms/provider/");
                HttpResponse response = client.execute(httpUriRequest);
                System.out.println(response);
                System.out.println(" [x] SMS sent to : "  + userObject.getString("phone") );
            }catch (Exception err){
                System.out.println(" [x] Error Occurred" + err.toString() );
            }

        };


        System.out.println(" [*] Waiting for new task");
        // start listening to the queue (this call does not block)
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
