SynchronousQueue Example in Java - Producer Consumer Model - Java @ Desk

Tuesday, January 6, 2015

SynchronousQueue Example in Java - Producer Consumer Model

In previous posts we covered Producer Consumer in Java using BlockingQueue and Producer Consumer Using PriorityBlockingQueue. In the BlockingQueue version, the behaviour of the producer and the consumer was governed by the capacity of the queue.

In that implementation, the queue is initialized with a capacity. The capacity ensures that the producer can put an element in the queue without waiting only if the current size of the queue is less than the capacity defined at initialization.

For example, if the queue is initialized as:
BlockingQueue<ProducerConsumerBean> blockingQueue = new ArrayBlockingQueue<ProducerConsumerBean>(10);

then, once blockingQueue.size() == 10, put() blocks and the producer thread waits. As soon as the consumer removes an element with take() (blockingQueue.take()), the producer is signalled and can put further elements. In the above example, the maximum number of elements the producer can put before it has to wait is 10.
In other words, put() and take() proceed independently unless the queue is empty (take() waits) or full (put() waits).
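
The capacity limit described above can be observed without blocking a thread. The following is a minimal sketch, not part of the original example: the class name BoundedQueueDemo and the helper fillUntilFull are made up for illustration, and it uses offer(), the non-blocking counterpart of put(), which returns false when the queue is full instead of waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original post.
class BoundedQueueDemo {

    // Fills a queue of the given capacity with non-blocking offer() calls
    // and returns how many elements were accepted before the queue was full.
    static int fillUntilFull(int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        // offer() returns false once the queue is full; put() would block here instead.
        while (queue.offer("email-" + accepted)) {
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        // prints "Accepted before full: 10"
        System.out.println("Accepted before full: " + fillUntilFull(10));
    }
}
```

With put() in place of offer(), the eleventh call would simply wait until a consumer removed an element.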

On the other hand, SynchronousQueue works in a similar fashion, with the following major differences:
1) The capacity of SynchronousQueue is 0. It never holds an element, so size() always returns 0.
2) put() blocks until another thread calls take() to receive the element, so an insert completes only as a direct handoff. If the consumer needs some time before it calls take(), the producer waits in put() for that long.

SynchronousQueue - Insert only when someone will receive it at that moment itself.
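
Both differences can be checked in a few lines. This is a small sketch under stated assumptions: the class HandoffDemo and its two helper methods are hypothetical names introduced here, and plain String elements stand in for ProducerConsumerBean to keep the example self-contained.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original post.
class HandoffDemo {

    // With no consumer waiting in take(), a non-blocking offer() is rejected.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("EmailId");
    }

    // Starts a consumer that calls take(), then hands one element over with put().
    static String handoff(String value) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put(value);  // returns only after the consumer has taken the element
            consumer.join();   // makes the consumer's write to received[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("offer() without consumer: " + offerWithoutConsumer()); // false
        System.out.println("size(): " + new SynchronousQueue<String>().size());    // 0
        System.out.println("handed over: " + handoff("EmailId"));                  // EmailId
    }
}
```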

ProducerConsumerBean.java
package com.synchronousqueue;

public class ProducerConsumerBean {

 private String emailId;

 public String getEmailId() {
  return emailId;
 }

 public void setEmailId(String emailId) {
  this.emailId = emailId;
 }
}


EmailProducer.java
package com.synchronousqueue;

import java.util.concurrent.SynchronousQueue;

public class EmailProducer implements Runnable {

 private SynchronousQueue<ProducerConsumerBean> synchronousQueue;

 public EmailProducer(SynchronousQueue<ProducerConsumerBean> synchronousQueue) {
  super();
  this.synchronousQueue = synchronousQueue;
 }

 @Override
 public void run() {
  while(true) {
   try {
    ProducerConsumerBean producerConsumerBean = new ProducerConsumerBean();
    producerConsumerBean.setEmailId("EmailId");
    // put() blocks until a consumer thread calls take()
    this.synchronousQueue.put(producerConsumerBean);
    System.out.println("Producer Produced Email Id");
   } catch (InterruptedException e) {
    // Restore the interrupt flag and stop producing
    Thread.currentThread().interrupt();
    return;
   }
  }
 }
}


EmailConsumer.java
package com.synchronousqueue;

import java.util.concurrent.SynchronousQueue;

public class EmailConsumer implements Runnable {
 private SynchronousQueue<ProducerConsumerBean> synchronousQueue;

 public EmailConsumer(SynchronousQueue<ProducerConsumerBean> synchronousQueue) {
  super();
  this.synchronousQueue = synchronousQueue;
 }

 @Override
 public void run() {
  while (true) {
   try {
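    //Thread.sleep(10000); // 1st sleep: delays take() by 10 seconds
    System.out.println("Consumer Started Sending Email to - "
      + this.synchronousQueue.take().getEmailId());
    //Thread.sleep(10000); // 2nd sleep: pauses for 10 seconds after each take()
   } catch (InterruptedException e) {
    // Restore the interrupt flag and stop consuming
    Thread.currentThread().interrupt();
    return;
   }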
  }
 }

}




BulkEmailMarketing.java
package com.synchronousqueue;

import java.util.concurrent.SynchronousQueue;

public class BulkEmailMarketing {

 public static void main(String[] args) {
  SynchronousQueue<ProducerConsumerBean> synchronousQueue = new SynchronousQueue<ProducerConsumerBean>();

  EmailProducer producer = new EmailProducer(synchronousQueue);
  EmailConsumer consumer = new EmailConsumer(synchronousQueue);
  
  new Thread(producer).start();
  new Thread(consumer).start();
  
 }
}


Run the above client in the following 2 cases:

1) Uncomment the 2nd Thread.sleep(10000); in EmailConsumer.java and run it
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
is printed as soon as you run it (the two lines come from different threads, so their order may vary). The consumer then sleeps for 10000 ms (10 seconds), and during that time the producer is blocked in put(). The next element is handed over only when the consumer calls take() again.

2) Uncomment the 1st Thread.sleep(10000); in EmailConsumer.java and run it
Nothing is printed at first. The consumer sleeps for 10000 ms (10 seconds) before its first take(), so the producer is blocked in put() until then. The first output therefore appears after about 10 seconds:
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
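
The waiting in the two cases can also be measured rather than watched on the console. The sketch below is an illustration only: PutBlockingDemo and millisBlockedInPut are hypothetical names, String elements replace ProducerConsumerBean, and a 500 ms delay is used instead of 10 seconds so it finishes quickly. It times how long the producer spends in put() while the consumer sleeps before calling take().

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original post.
class PutBlockingDemo {

    // Returns roughly how many milliseconds the producer waited in put()
    // when the consumer sleeps consumerDelayMillis before calling take().
    // Returns -1 if the calling thread was interrupted.
    static long millisBlockedInPut(long consumerDelayMillis) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(consumerDelayMillis); // consumer is not ready yet
                queue.take();                      // now the handoff can happen
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        long elapsedMillis = -1;
        long start = System.nanoTime();
        consumer.start();
        try {
            queue.put("EmailId"); // blocks until the consumer reaches take()
            elapsedMillis = (System.nanoTime() - start) / 1_000_000;
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return elapsedMillis;
    }

    public static void main(String[] args) {
        // The printed value should be close to the consumer delay of 500 ms.
        System.out.println("put() waited about " + millisBlockedInPut(500) + " ms");
    }
}
```

The measured time tracks the consumer's delay because the queue has no slot in which the element could be parked while the consumer is busy.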




