The example below configures a message queue between a sender and a listener. A scheduler publishes a message to the queue every 3 seconds, and the Listener class consumes it.
The implementation uses Spring Boot.
AMQP (Advanced Message Queuing Protocol) is a protocol that defines how systems communicate by exchanging messages.
Project Setup
1) pom.xml - Declares the Spring Boot AMQP starter dependency.
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>1.5.4.RELEASE</version>
    </dependency>
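The starter auto-configures the ConnectionFactory that the code in the later steps relies on. As a sketch, assuming a RabbitMQ broker running on the local machine with its default credentials, the connection settings in src/main/resources/application.properties could look like the fragment below. The host, port and credentials are assumptions about the environment, not values taken from this project; they also happen to be Spring Boot's defaults, so the file can be left out in that case.

```properties
# Assumed local broker; adjust these values for your environment.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
```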
2) Sender.java - This class uses a scheduled method that publishes a message every 3 seconds. RabbitTemplate sends the message to the exchange with a routing key, and the broker routes it to the queue that the Listener reads from.
    package com.accenture.springmessage;

    import java.util.Date;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Service;

    @Service
    public class Sender {

        private final RabbitTemplate rabbitTemplate;

        // Constructor injection; the constructor name must match the class name.
        @Autowired
        public Sender(final RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }

        // Runs again 3 seconds after the previous invocation has finished.
        @Scheduled(fixedDelay = 3000L)
        public void sendMessage() {
            // The timestamp is appended so the log lines match the output in step 5.
            String message = "Hello at - " + new Date();
            System.out.println("Send Message - " + message);
            rabbitTemplate.convertAndSend(App.EXCHANGE_NAME, App.ROUTING_KEY, message);
        }
    }
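To see the fixed-delay timing in isolation, here is a minimal sketch in plain Java, with no Spring and no broker. FixedDelayDemo, its run method and the in-memory LinkedBlockingQueue are hypothetical stand-ins for Sender, the RabbitMQ queue and Listener. ScheduledExecutorService.scheduleWithFixedDelay plays the role of @Scheduled(fixedDelay = 3000L): both wait the given delay after one run completes before starting the next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Sender/Listener: an in-memory queue replaces the broker.
class FixedDelayDemo {

    // Starts a fixed-delay producer, consumes `count` messages, then stops the producer.
    static List<String> run(int count, long delayMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // The delay is measured from the end of one run to the start of the next.
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("Send Message - Hello");
            queue.offer("Hello");
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                String message = queue.take(); // blocks until the producer has published
                System.out.println("Received message - " + message);
                received.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early and keep the interrupt flag
        } finally {
            scheduler.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        run(3, 3000L); // three messages, 3 seconds apart
    }
}
```

Unlike this sketch, the real listener does not poll in a loop: Spring AMQP's listener container invokes the @RabbitListener method whenever the broker delivers a message.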
3) Listener.java - This class uses the @RabbitListener annotation to receive messages from the queue. The queue is identified by the annotation's queues attribute.
    package com.accenture.springmessage;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    @Service
    public class Listener {

        // Invoked for every message delivered to the named queue.
        @RabbitListener(queues = App.QUEUE_SPECIFIC_NAME)
        public void receiveMessage(final String customMessage) {
            System.out.println("Received message - " + customMessage);
        }
    }
4) App.java - Configuration class that defines the queue name, exchange name and routing key. The class implements RabbitListenerConfigurer so that it can register a JSON-aware message handler factory, and it uses the annotations @SpringBootApplication, @EnableScheduling and @EnableRabbit.
@EnableRabbit enables processing of @RabbitListener annotations.
@EnableScheduling enables the @Scheduled method in Sender, which publishes a message every 3 seconds.
    package com.accenture.springmessage;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.converter.MappingJackson2MessageConverter;
    import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
    import org.springframework.scheduling.annotation.EnableScheduling;

    /**
     * Application entry point and RabbitMQ configuration.
     */
    @SpringBootApplication
    @EnableScheduling
    @EnableRabbit
    public class App implements RabbitListenerConfigurer {

        public static final String EXCHANGE_NAME = "appExchange";
        public static final String QUEUE_SPECIFIC_NAME = "appSpecificQueue";
        public static final String ROUTING_KEY = "messages.key";

        public static void main(String[] args) {
            SpringApplication.run(App.class, args);
        }

        @Bean
        public TopicExchange appExchange() {
            return new TopicExchange(EXCHANGE_NAME);
        }

        @Bean
        public Queue appQueueSpecific() {
            return new Queue(QUEUE_SPECIFIC_NAME);
        }

        // Binds the queue to the exchange under the routing key used by Sender.
        @Bean
        public Binding declareBindingSpecific() {
            return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
        }

        // Producer side: outgoing messages are serialized as JSON.
        @Bean
        public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
            return rabbitTemplate;
        }

        @Bean
        public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        // Consumer side: incoming JSON is converted to the listener method's parameter type.
        @Bean
        public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
            return new MappingJackson2MessageConverter();
        }

        @Bean
        public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            factory.setMessageConverter(consumerJackson2MessageConverter());
            return factory;
        }

        @Override
        public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
        }
    }
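A topic exchange routes a message to every queue whose binding pattern matches the message's routing key. Words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The binding in App uses the literal key messages.key with no wildcards, so only that exact routing key matches. The sketch below illustrates the matching rule in plain Java; TopicMatcher is a hypothetical helper written for this explanation and is not part of Spring AMQP or RabbitMQ, since the broker does this matching itself.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that mimics topic-exchange pattern matching.
class TopicMatcher {

    // Returns true if the binding pattern accepts the routing key.
    static boolean matches(String pattern, String routingKey) {
        return matches(split(pattern), 0, split(routingKey), 0);
    }

    // Splits a dotted key into words; an empty key has no words.
    private static List<String> split(String s) {
        if (s.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(s.split("\\.", -1));
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size(); // pattern exhausted: the key must be exhausted too
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false; // the pattern expects another word but the key has none left
        }
        // '*' accepts any single word; anything else must match literally.
        return (word.equals("*") || word.equals(k.get(j))) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("messages.key", "messages.key"));     // true
        System.out.println(matches("messages.*", "messages.key"));       // true
        System.out.println(matches("messages.*", "messages.key.extra")); // false
        System.out.println(matches("messages.#", "messages"));           // true
    }
}
```

If the binding in App were changed to a pattern such as messages.*, the same queue would also receive messages published with other keys under the messages prefix.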
5) Output
    Send Message - Hello at - Tue Jun 27 19:50:22 IST 2017
    Received message - Hello at - Tue Jun 27 19:50:22 IST 2017
    Send Message - Hello at - Tue Jun 27 19:50:25 IST 2017
    Received message - Hello at - Tue Jun 27 19:50:25 IST 2017
    Send Message - Hello at - Tue Jun 27 19:50:28 IST 2017
    Received message - Hello at - Tue Jun 27 19:50:28 IST 2017
    Send Message - Hello at - Tue Jun 27 19:50:31 IST 2017
    Received message - Hello at - Tue Jun 27 19:50:31 IST 2017
    Send Message - Hello at - Tue Jun 27 19:50:34 IST 2017
    Received message - Hello at - Tue Jun 27 19:50:34 IST 2017