Spring Boot Messaging Queue RabbitMQ using AMQP - Java @ Desk

Tuesday, June 27, 2017

Spring Boot Messaging Queue RabbitMQ using AMQP

Spring Boot RabbitMQ using AMQP

Below example configures the messaging queue between the sender and the listener. In the example, the scheduler sends the message to the queue every 3 seconds which will be pulled by the Listener class.

The implementation is in the Spring Boot.

AMQP is a protocol that defines communication between systems.

Project Setup
1) pom.xml - Defines the entry for Spring Boot AMPQ library
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 <version>1.5.4.RELEASE</version>
</dependency>


2) Sender.java - This class uses a scheduler implementation that pushes the message into the queue every 3 seconds. RabbitTemplate class helps sending the message to the Listener. It pushes the message into the queue.
package com.accenture.springmessage;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class Sender {

 @Autowired
 private final RabbitTemplate rabbitTemplate;

 @Autowired
 public StoryEnsRequestSender(final RabbitTemplate rabbitTemplate) {
  this.rabbitTemplate = rabbitTemplate;
 }

 @Scheduled(fixedDelay = 3000L)
 public void sendMessage() {
  String message = "Hello";
  System.out.println("Send Message - " + message);
  rabbitTemplate.convertAndSend(App.EXCHANGE_NAME, App.ROUTING_KEY, message);
 }
}


3) Listener.java - This class uses the @RabbitListener annotation that fetches the message from the queue. The queue is identified using the queue names attribute.
package com.accenture.springmessage;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class Listener {

 @RabbitListener(queues = App.QUEUE_SPECIFIC_NAME)
 public void receiveMessage(final String customMessage) {
  System.out.println("Received message - " + customMessage.toString());
 }
}


4) App.java - Configuration class to define the queue name, exchange name and routing key. Class must implement RabbitListenerConfigurer and below annotations are used @SpringBootApplication, @EnableScheduling, @EnableRabbit.
Enable Rabbit annnotation is used to enable the rabbit configuration.
Enable Scheduling pushes the message in the queue every 3 seconds.


package com.accenture.springmessage;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Hello world!
 *
 */
@SpringBootApplication
@EnableScheduling
@EnableRabbit
public class App implements RabbitListenerConfigurer {
 public static void main(String[] args) throws InterruptedException {
  SpringApplication.run(App.class, args);
 }

 public static final String EXCHANGE_NAME = "appExchange";
 public static final String QUEUE_SPECIFIC_NAME = "appSpecificQueue";
 public static final String ROUTING_KEY = "messages.key";

 @Bean
 public TopicExchange appExchange() {
  return new TopicExchange(EXCHANGE_NAME);
 }

 @Bean
 public Queue appQueueSpecific() {
  return new Queue(QUEUE_SPECIFIC_NAME);
 }

 @Bean
 public Binding declareBindingSpecific() {
  return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
 }

 @Bean
 public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
  final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
  return rabbitTemplate;
 }

 @Bean
 public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
  return new Jackson2JsonMessageConverter();
 }

 @Bean
 public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
  return new MappingJackson2MessageConverter();
 }

 @Bean
 public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
  DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
  factory.setMessageConverter(consumerJackson2MessageConverter());
  return factory;
 }

 @Override
 public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
  registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
 }
}


5) Output

Send Message - Hello at - Tue Jun 27 19:50:22 IST 2017
Received message - Hello at - Tue Jun 27 19:50:22 IST 2017
Send Message - Hello at - Tue Jun 27 19:50:25 IST 2017
Received message - Hello at - Tue Jun 27 19:50:25 IST 2017
Send Message - Hello at - Tue Jun 27 19:50:28 IST 2017
Received message - Hello at - Tue Jun 27 19:50:28 IST 2017
Send Message - Hello at - Tue Jun 27 19:50:31 IST 2017
Received message - Hello at - Tue Jun 27 19:50:31 IST 2017
Send Message - Hello at - Tue Jun 27 19:50:34 IST 2017
Received message - Hello at - Tue Jun 27 19:50:34 IST 2017






No comments:

Post a Comment