Message Queue

Message queuing makes it possible for applications to communicate asynchronously, by sending messages to each other via a queue. A message queue provides temporary storage between the sender and the receiver so that the sender can keep operating without interruption when the destination program is busy or not connected.

Basic concept

💡 Asynchronous processing: allows a task to call a service, and move on to the next task while the service processes the request at its own pace.

  • A queue is a line of things waiting to be handled — in sequential order starting at the beginning of the line. A message queue is a queue of messages sent between applications. It includes a sequence of work objects that are waiting to be processed.

  • A message is the data transported between the sender and the receiver application; it’s essentially a byte array with some headers on top. An example of a message could be an event. One application tells another application to start processing a specific task via the queue.

  • The basic architecture of a message queue is simple: client applications called producers create messages and deliver them to the message queue. Another application, called a consumer, connects to the queue and retrieves the messages to be processed. Messages placed in the queue are stored until the consumer retrieves them.

  • The queue can provide protection from service outages and failures.

  • Examples of message queues and streaming platforms: Kafka, Amazon SQS, and RabbitMQ. Heron is a related real-time stream-processing engine rather than a queue.
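
To make the producer, queue, and consumer roles concrete, here is a minimal in-memory sketch. It is only an illustration: the InMemoryQueue class and the message fields are hypothetical names, and a real broker adds persistence, acknowledgements, and network transport on top of this idea.

```javascript
// Minimal in-memory FIFO queue (illustration only, not a real broker).
class InMemoryQueue {
  constructor() {
    this.messages = [];
  }

  // Producer side: append a message to the end of the line.
  enqueue(message) {
    this.messages.push(message);
  }

  // Consumer side: take the oldest message, or undefined if the queue is empty.
  dequeue() {
    return this.messages.shift();
  }

  get size() {
    return this.messages.length;
  }
}

const queue = new InMemoryQueue();

// The producer keeps working even though no consumer is connected yet.
queue.enqueue({ task: "create_pdf", id: 1 });
queue.enqueue({ task: "create_pdf", id: 2 });

// Later, a consumer drains the queue in the order the messages arrived.
const processed = [];
while (queue.size > 0) {
  processed.push(queue.dequeue().id);
}
console.log(processed); // [ 1, 2 ]
```

The producer never waits for the consumer here, which is the asynchronous decoupling described above.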

RabbitMQ Exchanges, routing keys and bindings

What is an exchange? What are routing keys and bindings? How are exchanges and queues associated with each other? When should I use them and how? This article explains the different types of exchanges in RabbitMQ and scenarios for how to use them.

Messages are not published directly to a queue. Instead, the producer sends messages to an exchange. Exchanges are message routing agents, defined by the virtual host within RabbitMQ. An exchange is responsible for routing the messages to different queues with the help of header attributes, bindings, and routing keys.

A binding is a "link" that you set up to bind a queue to an exchange.

The routing key is a message attribute the exchange looks at when deciding how to route the message to queues (depending on exchange type).

Exchanges and queues can be configured with parameters such as durable, temporary, and auto-delete when they are created. Durable exchanges survive server restarts and last until they are explicitly deleted. Temporary exchanges exist until RabbitMQ is shut down. Auto-deleted exchanges are removed once the last bound object is unbound from the exchange.

In RabbitMQ, there are four different types of exchanges that route the message differently using different parameters and bindings setups. Clients can create their own exchanges or use the predefined default exchanges which are created when the server starts for the first time.

Standard RabbitMQ message flow

  1. The producer publishes a message to the exchange.

  2. The exchange receives the message and is now responsible for the routing of the message.

  3. Binding must be set up between the queue and the exchange. In this case, we have bindings to two different queues from the exchange. The exchange routes the message into the queues.

  4. The messages stay in the queue until they are handled by a consumer.

  5. The consumer handles the message.

https://www.cloudamqp.com/img/blog/exchanges-bidings-routing-keys.png
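
The five steps above can be modelled in a few lines of plain JavaScript. This is a toy simulation, not RabbitMQ code: the queue names, keys, and the publish and consume helpers are all hypothetical, and exact key matching is assumed for the routing step.

```javascript
// Toy model of the message flow; every name here is hypothetical.
const queues = { queue_a: [], queue_b: [] };

// Step 3: bindings link queues to the exchange.
const bindings = [
  { queue: "queue_a", bindingKey: "key_a" },
  { queue: "queue_b", bindingKey: "key_b" },
];

// Steps 1-2: the producer publishes to the exchange, which then routes
// a copy of the message into every queue whose binding matches.
function publish(routingKey, body) {
  for (const { queue, bindingKey } of bindings) {
    if (bindingKey === routingKey) queues[queue].push(body);
  }
}

// Steps 4-5: messages wait in the queue until a consumer handles them.
function consume(queueName, handler) {
  while (queues[queueName].length > 0) {
    handler(queues[queueName].shift());
  }
}

publish("key_a", "first");
publish("key_b", "second");
publish("key_a", "third");

// Only queue_a has a consumer so far; queue_b keeps holding its message.
const handledByA = [];
consume("queue_a", (msg) => handledByA.push(msg));
```

After this runs, queue_a has been drained in order, while the message for queue_b is still stored and waiting for a consumer.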

Exchange types

Direct Exchange

A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added to the message header by the producer. Think of the routing key as an "address" that the exchange is using to decide how to route the message. A message goes to the queue(s) with the binding key that exactly matches the routing key of the message.

The direct exchange type is useful to distinguish messages published to the same exchange using a simple string identifier.

The pre-declared direct exchange that AMQP brokers must provide is named "amq.direct".

SCENARIO 1

  • Exchange: pdf_events

  • Queue A: create_pdf_queue

  • Binding key between exchange (pdf_events) and Queue A (create_pdf_queue): pdf_create

SCENARIO 2

  • Exchange: pdf_events

  • Queue B: pdf_log_queue

  • Binding key between exchange (pdf_events) and Queue B (pdf_log_queue): pdf_log

EXAMPLE

A message with routing key pdf_log is sent to the exchange pdf_events. The message is routed to pdf_log_queue because the routing key (pdf_log) matches the binding key (pdf_log).

If the message routing key does not match any binding key, the message is discarded.

Direct Exchange: A message goes to the queues whose binding key exactly matches the routing key of the message.
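
The exact-match rule can be sketched as a small function over the bindings from the two scenarios. The routeDirect helper and the pdf_delete routing key are hypothetical and exist only to show the matching logic, including the case where nothing matches.

```javascript
// Bindings from the two scenarios, both on the pdf_events exchange.
const pdfEventsBindings = [
  { queue: "create_pdf_queue", bindingKey: "pdf_create" },
  { queue: "pdf_log_queue", bindingKey: "pdf_log" },
];

// Returns the names of the queues a direct exchange would deliver to:
// those whose binding key is exactly equal to the routing key.
function routeDirect(bindings, routingKey) {
  return bindings
    .filter((b) => b.bindingKey === routingKey)
    .map((b) => b.queue);
}

console.log(routeDirect(pdfEventsBindings, "pdf_log"));    // [ 'pdf_log_queue' ]
console.log(routeDirect(pdfEventsBindings, "pdf_create")); // [ 'create_pdf_queue' ]

// No binding key matches, so the message would be discarded.
console.log(routeDirect(pdfEventsBindings, "pdf_delete")); // []
```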

  1. Default exchange

    The default exchange is a pre-declared direct exchange with no name, usually referred to by an empty string. When you use the default exchange, your message is delivered to the queue whose name equals the routing key of the message. Every queue is automatically bound to the default exchange with a routing key that is the same as the queue name.

  2. Fanout Exchange

    A fanout exchange copies and routes a received message to all queues that are bound to it regardless of routing keys or pattern matching as with direct and topic exchanges. The keys provided will simply be ignored.

    Fanout exchanges can be useful when the same message needs to be sent to one or more queues with consumers who may process the same message in different ways.

    The image to the right (Fanout Exchange) shows an example where a message received by the exchange is copied and routed to all three queues bound to the exchange. It could be sport or weather updates that should be sent out to each connected mobile device when something happens, for instance.

    Fanout Exchange: The received message is routed to all queues that are bound.

  3. Topic Exchange

    Topic exchanges route messages to queues based on wildcard matches between the routing key and the routing pattern, which is specified by the queue binding. Messages are routed to one or many queues based on a matching between a message routing key and this pattern.
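
In RabbitMQ, routing keys and patterns for topic exchanges are dot-separated words, where * stands for exactly one word and # for zero or more words. The function below is a rough sketch of that matching rule in plain JavaScript. The topicMatches name and the booking.* keys are hypothetical, and the broker's own implementation is different and more efficient.

```javascript
// Sketch of topic-exchange matching: '*' matches exactly one word,
// '#' matches zero or more words. Words are separated by dots.
function topicMatches(pattern, routingKey) {
  const p = pattern.split(".");
  const k = routingKey === "" ? [] : routingKey.split(".");

  // match(i, j): does pattern word i onward match routing-key word j onward?
  const match = (i, j) => {
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // Try letting '#' consume 0, 1, 2, ... of the remaining words.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    if (p[i] === "*" || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  };

  return match(0, 0);
}

console.log(topicMatches("booking.*.reminder", "booking.email.reminder")); // true
console.log(topicMatches("booking.*", "booking.email.reminder"));          // false
console.log(topicMatches("booking.#", "booking.email.reminder"));          // true
console.log(topicMatches("booking.#", "booking"));                         // true
```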

Summary:

RabbitMQ is a message broker that accepts and forwards messages.

amqplib is a client library for RabbitMQ that lets you interact with RabbitMQ from JavaScript in Node.js. It allows you to send and receive messages and to perform various other operations with RabbitMQ.

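💡 It's better to share one connection and open only as many channels as you need. Channels are lightweight and multiplexed over a single TCP connection, whereas every additional connection opens a new TCP connection to the broker.
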
  • EXCHANGE_NAME: the name of the exchange to which messages are published. A broker can host many exchanges, each with a unique name, so the producer has to name the one it publishes to. The exchange then routes messages to queues according to its routing rules.

  • REMINDER_BINDING_KEY: the binding key for the reminder queue. Because this queue serves the reminder service, the key is set to REMINDER_SERVICE.

  • MESSAGE_BROKER_URL: the URL used to connect to the message broker, which here is RabbitMQ.

Implementation using nodejs and amqplib:

Our basic env that we will be using:

EXCHANGE_NAME=AIRLINE BOOKING
REMINDER_BINDING_KEY=REMINDER_SERVICE
MESSAGE_BROKER_URL='amqp://127.0.0.1:5672'

const amqplib = require("amqplib");
const { MESSAGE_BROKER_URL, EXCHANGE_NAME } = require("../config/serverConfig");

// This function creates a new channel for communication with the message broker.
const createChannel = async () => {
  try {
    // Connect to the message broker using the URL specified in the configuration.
    const connection = await amqplib.connect(MESSAGE_BROKER_URL);

    // Create a new channel for communication with the message broker.
    const channel = await connection.createChannel();

    // Assert the exchange used for sending messages, using the name specified in the configuration.
    await channel.assertExchange(EXCHANGE_NAME, "direct", { durable: true });
        // assertExchange creates the exchange if none exists with this EXCHANGE_NAME.
        // The type ("direct") is required; the third argument is an optional options
        // object, and durable: true matches amqplib's default.

    // Return the newly created channel for use in other functions.
    return channel;
  } catch (error) {
    throw error;
  }
};

// This function subscribes to messages from the message broker using the specified channel and binding key.
const subscribeMessage = async (channel, service, binding_key) => {
  try {
    // Assert the queue that will receive messages (created if it does not exist).
    const applicationQueue = await channel.assertQueue("REMINDER_QUEUE");

    // Bind the queue to the exchange using the specified binding key.
    await channel.bindQueue(applicationQueue.queue, EXCHANGE_NAME, binding_key);

    // Consume messages from the queue and handle them using the provided service function.
    await channel.consume(applicationQueue.queue, (msg) => {
      // msg is null if the broker cancels this consumer.
      if (msg === null) return;

      console.log("received data");
      console.log(msg.content.toString()); // msg.content is a Buffer

      // Parse the message payload as JSON.
      const payload = JSON.parse(msg.content.toString());

      // Call the service function with the parsed payload.
      service(payload);

      // Acknowledge receipt of the message.
      channel.ack(msg);
    });
  } catch (error) {
    throw error;
  }
};

// This function publishes a message to the message broker using the specified channel and binding key.
const publishMessage = async (channel, binding_key, message) => {
  try {
    // Make sure the queue exists before publishing (a no-op if it already does).
    await channel.assertQueue("REMINDER_QUEUE");

    // The binding key is used as the routing key, which determines the queue
    // the exchange delivers to. Buffer.from converts the string message into
    // the byte buffer that amqplib sends to RabbitMQ.
    // publish() returns a boolean rather than a promise, so it is not awaited.
    channel.publish(EXCHANGE_NAME, binding_key, Buffer.from(message));
  } catch (error) {
    throw error;
  }
};

// Export the functions for use in other modules.
module.exports = {
  subscribeMessage,
  createChannel,
  publishMessage,
};
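
Because publishMessage passes the message to Buffer.from and the consumer reads it back with JSON.parse, the caller needs to hand over a JSON string rather than a plain object. The sketch below shows that round trip in isolation. The encodeMessage and decodeMessage helpers and the payload fields are hypothetical and not part of the module above.

```javascript
// Hypothetical helpers illustrating the wire format used by the module:
// a JSON string wrapped in a Buffer.
function encodeMessage(payload) {
  return Buffer.from(JSON.stringify(payload));
}

function decodeMessage(content) {
  return JSON.parse(content.toString());
}

// Example payload; the field names are made up for illustration.
const reminder = { subject: "Flight reminder", recipient: "user@example.com" };

const wire = encodeMessage(reminder);      // what the publisher would send
const roundTripped = decodeMessage(wire);  // what the consumer would parse

console.log(Buffer.isBuffer(wire));  // true
console.log(roundTripped.subject);   // Flight reminder
```

With the module as written, a call would look like publishMessage(channel, REMINDER_BINDING_KEY, JSON.stringify(reminder)), assuming the channel came from createChannel and the binding key is loaded from the same config file.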

Thanks a lot for reading the article.
Hope you found it helpful.

Linkedin: https://www.linkedin.com/in/tautikk/
Email:
Twitter: https://twitter.com/TautikA
