BlockingQueue and Sharing Data between two threads (Producer Consumer issue):
Sharing of Data between threads should be minimized if not prevented completely as this will open bugs like thread-safety.
However if required this can be done using shared object or shared data structures like Queue. One good API provided by Java
is concurrent collection BlockingQueue. Here we can easily share data without being bothered about thread safety and inter-thread
communication. BlockingQueue doesn't allow null to be stored in Queue, will throw NullPointerException.
There are many implementations of this Interface:
ArrayBlockingQueue (Most Used)
DelayQueue
LinkedBlockingQueue (Most Used)
PriorityBlockingQueue (Most Used)
SynchronousQueue
BlockingQueue is a unique collection type which not only store elements but also supports flow control by introducing
blocking if either BlockingQueue is full or empty. "take()" method of BlockingQueue will block if Queue is empty and
"put()" method of BlockingQueue will block if Queue is full. This property makes BlockingQueue an ideal choice for implementing
Producer consumer design pattern where one thread insert elements into BlockingQueue and other thread consumes it.
All queuing method uses concurrency control and internal locks to perform operation atomically. Since BlockingQueue also extend
Collection, bulk Collection operations like addAll(), containsAll() are not performed atomically until any BlockingQueue
implementation specifically supports it. So call to addAll() may fail after inserting couple of elements. BlockingQueue can
be bounded or unbounded. A bounded BlockingQueue is one which is initialized with initial capacity and call to put() will be
blocked if BlockingQueue is full and size is equal to capacity. This bounding nature makes it ideal to use a shared queue between
multiple threads like in most common Producer consumer solutions in Java. An unbounded Queue is one which is initialized without
capacity, actually by default it initialized with Integer.MAX_VALUE.
Most common example of BlockingQueue uses bounded BlockingQueue as shown below:
BlockingQueue bQueue = new ArrayBlockingQueue(2); //Size is 2
bQueue.put("Java");
System.out.println("Item 1 inserted into BlockingQueue");
bQueue.put("JDK");
System.out.println("Item 2 is inserted on BlockingQueue");
bQueue.put("J2SE"); //This insertion is not done. BlockingQueue will block here for further adding of items as size is only 2.
System.out.println("Done");
Output:
Item 1 inserted into BlockingQueue
Item 2 inserted on BlockingQueue
ArrayBlockingQueue and LinkedBlockingQueue are common implementation of BlockingQueue interface.
ArrayBlockingQueue is backed by array and Queue impose orders as FIFO. Head of the queue is the oldest element in terms of time and
tail of the queue is youngest element. ArrayBlockingQueue is also fixed size bounded buffer on the other hand LinkedBlockingQueue is
an optionally bounded queue built on top of Linked nodes. In terms of throughput LinkedBlockingQueue provides higher throughput than
ArrayBlockingQueue in Java.
-------------------------------------------------------------
Complete Example:
//BlockingQueueExample.java
package concurrency;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(2); //Size is 2.
ProducerNew producer = new ProducerNew(queue);
ConsumerNew consumer = new ConsumerNew(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(2000);
}
}
//ProducerNew.java
package concurrency;
import java.util.concurrent.BlockingQueue;
public class ProducerNew implements Runnable{
protected BlockingQueue queue = null; //This declaration can be avoided if ProducerNew extends BlockingQueueExample
public ProducerNew(BlockingQueue queue) { //This can be avoided if ProducerNew extends BlockingQueueExample
this.queue = queue;
}
public void run() {
try {
System.out.println("Inserted: 1");
queue.put("1");
//queue.put(null); //Null Pointer Exception
Thread.sleep(500);
System.out.println("Inserted: 2");
queue.put("2");
Thread.sleep(500);
System.out.println("Inserted: 3");
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//ConsumerNew.java
package concurrency;
import java.util.concurrent.BlockingQueue;
public class ConsumerNew implements Runnable{
protected BlockingQueue queue = null; //This declaration can be avoided if ConsumerNew extends BlockingQueueExample
public ConsumerNew(BlockingQueue queue) { //This can be avoided if ConsumerNew extends BlockingQueueExample
this.queue = queue;
}
public void run() {
try {
System.out.println("Consumed: "+queue.take());
System.out.println("Consumed: "+queue.take());
System.out.println("Consumed: "+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
----------------------------------------------------------------------------------
Producer Consumer Using Executor Service and BlockingQueue:
//ProducerConsumer_ExecutorService.java
package concurrency;
public class ProducerConsumer_ExecutorService {
ExecutorServiceThreadPool ex; //This Class is defined below.
public static void main(String[] args) {
ProducerConsumer_ExecutorService prodconsumer = new ProducerConsumer_ExecutorService();
prodconsumer.init();
}
private void init() {
ex = new ExecutorServiceThreadPool();
for(int i = 0; i < 10; i++){
ex.addThread(new Producer(i));
//ex.addThread(new Producer(i)); //Adding more Producer, Once Queue is full, it will be blocked for more insertions.
ex.addThread(new Consumer());
}
ex.finish();
}
private class Producer implements Runnable {
int data;
public Producer(int datatoput) {
data = datatoput;
}
@Override
public void run() {
System.out.println("Inserting Element " + data);
try {
ex.queue.put(data);
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private class Consumer implements Runnable {
int datatake;
@Override
public void run() {
try {
datatake = ex.queue.take();
System.out.println("Fetching Element " + datatake);
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
//ExecutorServiceThreadPool.java
package concurrency;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ExecutorServiceThreadPool {
final BlockingQueue queue = new ArrayBlockingQueue(10);
ExecutorService executor = Executors.newFixedThreadPool(10);
public void addThread(Runnable r){
Future f = executor.submit(r);
try {
System.out.println("Status: "+f.get()); //null means successful.
}catch(Exception e) {
e.printStackTrace();
}
}
public void finish(){
try {
executor.shutdown();
executor.awaitTermination(50, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
Logger.getLogger(ExecutorServiceThreadPool.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println("Finished all threads");
}
}
------------------------------------------------------------------
Wednesday, February 1, 2017
BlockingQueue and Producer Consumer Problem
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment