Producer Consumer in Java using BlockingQueue - Java @ Desk

Friday, February 21, 2014

Producer Consumer in Java using BlockingQueue

What is Producer Consumer in Java?
Producer - A producer in Java is an independently running thread that produces items to be consumed by a single consumer or a group of consumers.
Consumer - A consumer in Java is an independently running thread that consumes items produced by a single producer or a group of producers.

A producer/consumer implementation is useful in Java when, instead of running a complete process in a single thread, the work is divided between threads so that its stages run concurrently and the process as a whole speeds up.

Consider a banking application that must notify every customer whose balance falls below 5000. Consider the tables below:
1) <ACCOUNT> - holds all the account numbers and balances
2) <ACCOUNT_INFO> - holds the account numbers in which a withdrawal or deposit took place.

As soon as a financial transaction takes place, a job runs that picks up the account number from the first table and inserts it into the second table.
Now, a single thread runs indefinitely in a while(true) loop, scans the table and, as soon as an update is found, follows this process:
1) Pick up the account
2) Check balance < 5000
3) If true notify the user either through email or SMS

Now all three steps run in a single thread. Why not separate the process using the producer consumer model, as shown below:
The producer thread scans the table and loads a batch of 1000 accounts
The consumer picks up each account from the producer, checks the balance and notifies the user
Both threads can run independently. This can be achieved using the java.util.concurrent.BlockingQueue interface.
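
The split described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the Account class, the END sentinel used to stop the consumer, and the findLowBalances method are hypothetical names, the accounts come from an in-memory list rather than a database table, and the "notification" is just a list of account numbers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class LowBalanceSketch {

    // Hypothetical stand-in for one row of the ACCOUNT table.
    static final class Account {
        final String number;
        final long balance;

        Account(String number, long balance) {
            this.number = number;
            this.balance = balance;
        }
    }

    // Sentinel that tells the consumer nothing more will arrive (an assumption of this sketch).
    static final Account END = new Account("END", 0);

    static final long THRESHOLD = 5000;

    // Runs one producer and one consumer over the given accounts and
    // returns the numbers of the accounts whose balance is below THRESHOLD.
    static List<String> findLowBalances(List<Account> accounts) {
        BlockingQueue<Account> queue = new ArrayBlockingQueue<>(10);
        List<String> notified = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (Account account : accounts) {
                    queue.put(account); // blocks while the queue is full
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Account account = queue.take(); // blocks while the queue is empty
                    if (account == END) {
                        return;
                    }
                    if (account.balance < THRESHOLD) {
                        notified.add(account.number); // placeholder for email/SMS
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes to 'notified' are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the workers", e);
        }
        return notified;
    }

    public static void main(String[] args) {
        List<Account> accounts = Arrays.asList(
                new Account("A-1", 12000),
                new Account("A-2", 4999),
                new Account("A-3", 5000),
                new Account("A-4", 100));
        System.out.println(findLowBalances(accounts)); // prints [A-2, A-4]
    }
}
```

With a single producer and a single consumer the queue preserves insertion order, so the low-balance accounts come out in the order they were scanned.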

BlockingQueue
Insert - put(e)
Remove - take()

The advantage of BlockingQueue implementations is that they perform the wait and notify operations internally. There is no need for a synchronized block and explicit wait/notify calls.
A BlockingQueue may be capacity bounded, for example: BlockingQueue<ProducerConsumerBean> blockingQueue = new ArrayBlockingQueue<ProducerConsumerBean>(10);
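
To see the capacity bound without blocking a thread, one option is the non-blocking offer(e) method of BlockingQueue, which returns false instead of waiting when the queue is full. The sketch below is only an illustration; the class name BoundedQueueSketch and the method acceptedOffers are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueSketch {

    // Tries to add 'attempts' elements to a queue of the given capacity
    // and returns how many of them were accepted.
    static int acceptedOffers(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false immediately when the queue is full,
            // whereas put() would wait for free space.
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedOffers(10, 15)); // prints 10
    }
}
```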

In the above scenario, as soon as the blockingQueue size reaches 10, put(e) waits for at least one element to be consumed. Likewise, if the queue becomes empty, take() waits for the producer to produce at least one.
put(e) - Adds the object to the queue. If the queue is full, it waits until take() has consumed at least one object. As soon as take() is called on the blockingQueue, a waiting put(e) is notified that it can add more.
take() - Removes and returns the object at the head of the queue. If the queue is empty, it waits for put(e) to add at least one object. As soon as an object is added, a waiting take() is notified to start consuming.
So, as shown above, there is no need to perform additional wait() and notify() checks when using a BlockingQueue.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
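
A rough way to observe this thread safety is to let several producers put into the same bounded queue without any extra synchronization and count what the consumer receives. The sketch below assumes hypothetical names (ThreadSafetySketch, produceAndCount) and uses the calling thread as the only consumer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ThreadSafetySketch {

    // Starts 'producers' threads that each put 'perProducer' elements into one
    // shared queue, consumes them on the calling thread, and returns the count.
    static int produceAndCount(int producers, int perProducer) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        Thread[] threads = new Thread[producers];

        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                try {
                    for (int i = 0; i < perProducer; i++) {
                        queue.put(i); // no synchronized block needed
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[p].start();
        }

        int received = 0;
        try {
            int expected = producers * perProducer;
            for (int i = 0; i < expected; i++) {
                queue.take(); // blocks until some producer has added an element
                received++;
            }
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(produceAndCount(4, 1000)); // prints 4000
    }
}
```

No element is lost or duplicated even though four threads call put(e) concurrently on a queue that holds only ten elements at a time.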

Sample Implementation:
This is a sample implementation for email marketing, where the producer loads the email id and puts it in the blocking queue, whereas the consumer takes the email id from the queue and sends the email to the user.



EmailProducer.java
package com.blockingqueue;

import java.util.concurrent.BlockingQueue;

public class EmailProducer implements Runnable {

 private BlockingQueue<ProducerConsumerBean> blockingQueue;

 public EmailProducer(BlockingQueue<ProducerConsumerBean> blockingQueue) {
  super();
  this.blockingQueue = blockingQueue;
 }

 @Override
 public void run() {
  // Loop until this thread is interrupted.
  while (!Thread.currentThread().isInterrupted()) {
   try {
    ProducerConsumerBean producerConsumerBean = new ProducerConsumerBean();
    producerConsumerBean.setEmailId("EmailId");
    // put() waits while the queue is full.
    this.blockingQueue.put(producerConsumerBean);
    System.out.println("Producer Produced Email Id");
    Thread.sleep(100);
   } catch (InterruptedException e) {
    // Restore the interrupt flag so that the loop condition ends the thread.
    Thread.currentThread().interrupt();
   }
  }
 }
}

EmailConsumer.java
package com.blockingqueue;

import java.util.concurrent.BlockingQueue;

public class EmailConsumer implements Runnable {
 private BlockingQueue<ProducerConsumerBean> blockingQueue;

 public EmailConsumer(BlockingQueue<ProducerConsumerBean> blockingQueue) {
  super();
  this.blockingQueue = blockingQueue;
 }

 @Override
 public void run() {
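  // Loop until this thread is interrupted.
  while (!Thread.currentThread().isInterrupted()) {
   try {
    // take() waits while the queue is empty.
    ProducerConsumerBean producerConsumerBean = this.blockingQueue.take();
    System.out.println("Consumer Started Sending Email to - " + producerConsumerBean.getEmailId());
    Thread.sleep(100);
   } catch (InterruptedException e) {
    // Restore the interrupt flag so that the loop condition ends the thread.
    Thread.currentThread().interrupt();
   }
  }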
 }

}

ProducerConsumerBean.java
package com.blockingqueue;

public class ProducerConsumerBean {

 private String emailId;

 public String getEmailId() {
  return emailId;
 }

 public void setEmailId(String emailId) {
  this.emailId = emailId;
 }
}

BulkEmailMarketing.java
package com.blockingqueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BulkEmailMarketing {

 public static void main(String[] args) {
  BlockingQueue<ProducerConsumerBean> blockingQueue = new ArrayBlockingQueue<>(10);
  
  EmailProducer producer = new EmailProducer(blockingQueue);
  EmailConsumer consumer = new EmailConsumer(blockingQueue);
  
  new Thread(producer).start();
  new Thread(consumer).start();
 }
}

Output
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
....and so on
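
The sample never stops on its own. One possible way to end such threads is to interrupt them, because put(e) and take() throw InterruptedException when the calling thread is interrupted. The following is a minimal sketch under that assumption; the class name InterruptShutdownSketch and the method runAndStop are hypothetical and not part of the sample above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class InterruptShutdownSketch {

    // Lets a producer and a consumer run for 'runMillis' milliseconds, then
    // interrupts both. Returns true if both threads have finished afterwards.
    static boolean runAndStop(long runMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            try {
                while (true) {
                    queue.put("EmailId"); // throws InterruptedException when interrupted
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop and finish
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.take(); // throws InterruptedException when interrupted
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Ask both workers to stop, then wait up to one second for each.
        producer.interrupt();
        consumer.interrupt();
        try {
            producer.join(1000);
            consumer.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !producer.isAlive() && !consumer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(runAndStop(200));
    }
}
```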
