What is Producer Consumer in Java?
Producer - A producer in Java is an independently running thread that produces items to be consumed by a single consumer or a group of consumers.
Consumer - A consumer in Java is an independently running thread that consumes items produced by a single producer or a group of producers.
A producer/consumer implementation is useful in Java when, instead of running a complete process in a single thread, the work is divided between threads so that its stages can run concurrently and the process as a whole speeds up.
Consider a banking application scenario where the requirement is to notify all the customers whose balance falls below 5000. Consider the tables below:
1) <ACCOUNT> - It holds all the account numbers and balances.
2) <ACCOUNT_INFO> - It holds all the account numbers in which a withdrawal/deposit took place.
As soon as a financial transaction takes place, a job runs which picks up the account number from the first table and inserts it into the second table.
Now, there is a single thread which runs indefinitely in a while(true) loop, scans the second table (<ACCOUNT_INFO>) and performs the following steps:
1) Pick up the account
2) Check balance < 5000
3) If true notify the user either through email or SMS
Now all three steps run in a single thread. Why not separate the above process using the producer/consumer model as shown below:
Producer thread scans the second table (<ACCOUNT_INFO>) and picks up each account.
Consumer thread picks up the account from the producer, checks the balance and notifies the user.
Both these threads can run independently. This can be achieved using the java.util.concurrent.BlockingQueue interface.
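The split described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class name LowBalanceNotifier, the END marker, the list standing in for the second table and the map standing in for the first table are all hypothetical, and the "notification" is just a list entry instead of a real email or SMS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LowBalanceNotifier {
    // Threshold taken from the scenario in the text.
    static final long THRESHOLD = 5000;
    // Hypothetical marker telling the consumer that no more accounts will arrive.
    static final String END = "END";

    // touchedAccounts stands in for the second table, balances for the first table.
    // Returns the account numbers that would have been notified.
    static List<String> run(List<String> touchedAccounts, Map<String, Long> balances) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        List<String> notified = new ArrayList<>();

        // Producer: hands over every account in which a transaction took place.
        Thread producer = new Thread(() -> {
            try {
                for (String account : touchedAccounts) {
                    queue.put(account);
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer: checks the balance and "notifies" the customer.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String account = queue.take();
                    if (account.equals(END)) {
                        return;
                    }
                    Long balance = balances.get(account);
                    if (balance != null && balance < THRESHOLD) {
                        notified.add(account); // stand-in for sending an email or SMS
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return notified;
    }

    public static void main(String[] args) {
        Map<String, Long> balances = Map.of("A1", 12000L, "A2", 4500L, "A3", 300L);
        System.out.println(run(List.of("A1", "A2", "A3"), balances)); // prints [A2, A3]
    }
}
```

Neither thread contains any explicit locking; the queue is the only point of contact between them. Map.of and List.of assume Java 9 or later.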
Operation | Blocking method |
Insert | put(e) |
Remove | take() |
The advantage of BlockingQueue implementations is that they perform the wait and notify operations internally. There is no need for a synchronized block with wait()/notify().
A BlockingQueue may be capacity bounded, for example with a capacity of 10.
In the above scenario, as soon as the queue size reaches 10, the producer waits until at least one element is consumed. Likewise, if the queue becomes empty, the consumer waits for the producer to produce at least one element.
put(e) - It adds the object to the queue. If the queue is full, it waits until take() has consumed at least one object. Each call to take() frees a slot and signals a waiting put(e) that it can add more.
take() - It removes and returns the object at the head of the queue. If the queue is empty, it waits until put(e) has added at least one object. Each call to put(e) signals a waiting take() that it can start consuming.
So, as shown above, there is no need to perform additional wait() and notify() checks when using a BlockingQueue.
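The full and empty conditions can be observed without hanging a demo by using offer(e) and poll(), the non-blocking counterparts of put(e) and take() on the same interface. The sketch below is only an illustration; the class name BoundedQueueDemo and the capacity of 2 are assumptions chosen to keep it short.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {
    // Records what each call returns on a queue bounded to 2 elements.
    static String describe() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        StringBuilder log = new StringBuilder();

        log.append(queue.offer("a")).append(' '); // true: there is room
        log.append(queue.offer("b")).append(' '); // true: the queue is now full
        log.append(queue.offer("c")).append(' '); // false: put("c") would wait here
        log.append(queue.poll()).append(' ');     // a: frees one slot
        log.append(queue.offer("c")).append(' '); // true: there is room again
        log.append(queue.poll()).append(' ');     // b
        log.append(queue.poll()).append(' ');     // c: the queue is now empty
        log.append(queue.poll());                 // null: take() would wait here
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints: true true false a true b c null
    }
}
```

Wherever offer(e) returns false, put(e) would have waited for a free slot, and wherever poll() returns null, take() would have waited for an element.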
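For comparison, this is roughly the kind of code a BlockingQueue saves you from writing. It is a minimal hand-written bounded buffer using synchronized, wait() and notifyAll(); the class name HandRolledBuffer and the helper sumThroughBuffer are hypothetical, and this is not how the JDK implementations are written internally.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class HandRolledBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    public HandRolledBuffer(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T item) throws InterruptedException {
        // Re-check the condition in a loop: a thread can wake up while the buffer is still full.
        while (items.size() == capacity) {
            wait();
        }
        items.add(item);
        notifyAll(); // wake up consumers waiting for an element
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake up producers waiting for a free slot
        return item;
    }

    // Hypothetical helper: one producer thread pushes 1..n, the caller sums what it takes.
    static int sumThroughBuffer(int n) {
        HandRolledBuffer<Integer> buffer = new HandRolledBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumThroughBuffer(100)); // prints 5050
    }
}
```

Every detail here (the while loops around wait(), the notifyAll() calls, the synchronized methods) is a place where a hand-written version can go wrong, which is why using a ready-made BlockingQueue is usually the safer choice.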
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
Sample Implementation:
This is a sample implementation for email marketing, where the producer loads the email id and puts it in the blocking queue, whereas the consumer takes the email id and sends the email to the user.
EmailProducer.java
package com.blockingqueue;

import java.util.concurrent.BlockingQueue;

public class EmailProducer implements Runnable {

    private final BlockingQueue<ProducerConsumerBean> blockingQueue;

    public EmailProducer(BlockingQueue<ProducerConsumerBean> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                ProducerConsumerBean producerConsumerBean = new ProducerConsumerBean();
                producerConsumerBean.setEmailId("EmailId");
                // put() waits while the queue is full
                this.blockingQueue.put(producerConsumerBean);
                System.out.println("Producer Produced Email Id");
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the thread end instead of looping forever
            Thread.currentThread().interrupt();
        }
    }
}
EmailConsumer.java
package com.blockingqueue;

import java.util.concurrent.BlockingQueue;

public class EmailConsumer implements Runnable {

    private final BlockingQueue<ProducerConsumerBean> blockingQueue;

    public EmailConsumer(BlockingQueue<ProducerConsumerBean> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() waits while the queue is empty
                String emailId = this.blockingQueue.take().getEmailId();
                System.out.println("Consumer Started Sending Email to - " + emailId);
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the thread end instead of looping forever
            Thread.currentThread().interrupt();
        }
    }
}
ProducerConsumerBean.java
package com.blockingqueue;

public class ProducerConsumerBean {

    private String emailId;

    public String getEmailId() {
        return emailId;
    }

    public void setEmailId(String emailId) {
        this.emailId = emailId;
    }
}
BulkEmailMarketing.java
package com.blockingqueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BulkEmailMarketing {

    public static void main(String[] args) {
        // Bounded queue: the producer waits once 10 beans are pending
        BlockingQueue<ProducerConsumerBean> blockingQueue = new ArrayBlockingQueue<>(10);
        EmailProducer producer = new EmailProducer(blockingQueue);
        EmailConsumer consumer = new EmailConsumer(blockingQueue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
Output
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
Producer Produced Email Id
Consumer Started Sending Email to - EmailId
....and so on
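The sample above runs one producer and one consumer. Because BlockingQueue implementations are thread-safe, a group of consumers can take from the same queue without extra locking. The sketch below is one possible way to do that and to stop cleanly; the class name MultiConsumerDemo, the STOP marker and the generated addresses are assumptions for illustration, and counting stands in for actually sending an email.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiConsumerDemo {
    // Hypothetical end marker: each consumer stops after taking one of these.
    private static final String STOP = "STOP";

    // Produces emailCount ids on the calling thread and returns how many were consumed.
    static int sendAll(int emailCount, int consumerCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        AtomicInteger sent = new AtomicInteger();
        Thread[] consumers = new Thread[consumerCount];

        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    while (!queue.take().equals(STOP)) {
                        sent.incrementAndGet(); // stand-in for sending one email
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        try {
            for (int i = 0; i < emailCount; i++) {
                queue.put("user" + i + "@example.com");
            }
            // One marker per consumer, queued after all real items.
            for (int i = 0; i < consumerCount; i++) {
                queue.put(STOP);
            }
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println(sendAll(25, 3)); // prints 25
    }
}
```

Since the queue is first-in, first-out, the STOP markers are only reached after every email id has been taken, so each id is processed exactly once no matter which consumer picks it up.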