Wednesday, February 1, 2017

BlockingQueue and Producer Consumer Problem

BlockingQueue and Sharing Data between two threads (Producer Consumer issue):
  Sharing of Data between threads should be minimized if not prevented completely as this will open bugs like thread-safety.
  However if required this can be done using shared object or shared data structures like Queue. One good API provided by Java
  is concurrent collection BlockingQueue. Here we can easily share data without being bothered about thread safety and inter-thread 
  communication. BlockingQueue doesn't allow null to be stored in Queue, will throw NullPointerException.
  There are many implementations of this Interface: 
 ArrayBlockingQueue  (Most Used)
 DelayQueue
 LinkedBlockingQueue  (Most Used)
 PriorityBlockingQueue  (Most Used)
 SynchronousQueue
 
  BlockingQueue is a unique collection type which not only store elements but also supports flow control by introducing 
  blocking if either BlockingQueue is full or empty. "take()" method of BlockingQueue will block if Queue is empty and 
  "put()" method of BlockingQueue will block if Queue is full. This property makes BlockingQueue an ideal choice for implementing 
  Producer consumer design pattern where one thread insert elements into BlockingQueue and other thread consumes it. 
  
  All queuing method uses concurrency control and internal locks to perform operation atomically. Since BlockingQueue also extend 
  Collection, bulk Collection operations like addAll(), containsAll() are not performed atomically until any BlockingQueue 
  implementation specifically supports it. So call to addAll() may fail after inserting couple of elements. BlockingQueue can 
  be bounded or unbounded. A bounded BlockingQueue is one which is initialized with initial capacity and call to put() will be 
  blocked if BlockingQueue is full and size is equal to capacity. This bounding nature makes it ideal to use a shared queue between 
  multiple threads like in most common Producer consumer solutions in Java. An unbounded Queue is one which is initialized without 
  capacity, actually by default it initialized with Integer.MAX_VALUE. 
  Most common example of BlockingQueue uses bounded BlockingQueue as shown below:

 BlockingQueue bQueue = new ArrayBlockingQueue(2);  //Size is 2
 bQueue.put("Java");
 System.out.println("Item 1 inserted into BlockingQueue");
 bQueue.put("JDK");
 System.out.println("Item 2 is inserted on BlockingQueue");
 bQueue.put("J2SE");    //This insertion is not done. BlockingQueue will block here for further adding of items as size is only 2.
 System.out.println("Done");

 Output:
 Item 1 inserted into BlockingQueue
 Item 2 inserted on BlockingQueue


ArrayBlockingQueue and LinkedBlockingQueue are common implementation of BlockingQueue interface. 
ArrayBlockingQueue is backed by array and Queue impose orders as FIFO. Head of the queue is the oldest element in terms of time and 
tail of the queue is youngest element. ArrayBlockingQueue is also fixed size bounded buffer on the other hand LinkedBlockingQueue is 
an optionally bounded queue built on top of Linked nodes. In terms of throughput LinkedBlockingQueue provides higher throughput than 
ArrayBlockingQueue in Java.
-------------------------------------------------------------
Complete Example:
//BlockingQueueExample.java
package concurrency;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
 public static void main(String[] args) throws Exception {
        BlockingQueue queue = new ArrayBlockingQueue(2); //Size is 2.
        ProducerNew producer = new ProducerNew(queue);
        ConsumerNew consumer = new ConsumerNew(queue);

        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(2000);
    }
}

//ProducerNew.java
package concurrency;
import java.util.concurrent.BlockingQueue;
public class ProducerNew implements Runnable{
    protected BlockingQueue queue = null;    //This declaration can be avoided if ProducerNew extends BlockingQueueExample
    public ProducerNew(BlockingQueue queue) { //This can be avoided if ProducerNew extends BlockingQueueExample
        this.queue = queue;
    }
    public void run() {
        try {
         System.out.println("Inserted: 1");
            queue.put("1");
            //queue.put(null);  //Null Pointer Exception
            Thread.sleep(500);
            System.out.println("Inserted: 2");
            queue.put("2");
            Thread.sleep(500);
            System.out.println("Inserted: 3");
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//ConsumerNew.java
package concurrency;
import java.util.concurrent.BlockingQueue;
public class ConsumerNew implements Runnable{
    protected BlockingQueue queue = null;      //This declaration can be avoided if ConsumerNew extends BlockingQueueExample
    public ConsumerNew(BlockingQueue queue) {  //This can be avoided if ConsumerNew extends BlockingQueueExample
        this.queue = queue;
    }
    public void run() {
        try {
            System.out.println("Consumed: "+queue.take());
            System.out.println("Consumed: "+queue.take());
            System.out.println("Consumed: "+queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
----------------------------------------------------------------------------------
Producer Consumer Using Executor Service and BlockingQueue:

//ProducerConsumer_ExecutorService.java
package concurrency;
public class ProducerConsumer_ExecutorService {
 ExecutorServiceThreadPool ex;  //This Class is defined below.
 public static void main(String[] args) {
  ProducerConsumer_ExecutorService prodconsumer = new ProducerConsumer_ExecutorService();
  prodconsumer.init();
 }
 private void init() {
  ex = new ExecutorServiceThreadPool();
  for(int i = 0; i < 10; i++){
   ex.addThread(new Producer(i));   
   //ex.addThread(new Producer(i)); //Adding more Producer, Once Queue is full, it will be blocked for more insertions.
   ex.addThread(new Consumer());
  }
  ex.finish();
 }
 
 private class Producer implements Runnable {
  int data;
  public Producer(int datatoput) {
   data = datatoput;
  }
  @Override
  public void run() {         
   System.out.println("Inserting Element " + data);
   try {
    ex.queue.put(data);
    Thread.sleep(100);
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
 
 private class Consumer implements Runnable {
  int datatake;         
  @Override
  public void run() {                                 
   try {
    datatake = ex.queue.take();
    System.out.println("Fetching Element " + datatake);
    Thread.sleep(100);
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
}

//ExecutorServiceThreadPool.java
package concurrency;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ExecutorServiceThreadPool {
 final BlockingQueue queue = new ArrayBlockingQueue(10); 
 ExecutorService executor = Executors.newFixedThreadPool(10); 
 
 public void addThread(Runnable r){  
  Future f = executor.submit(r);
  try {
   System.out.println("Status: "+f.get());  //null means successful.
  }catch(Exception e) {
   e.printStackTrace();
  }
 }
 public void finish(){
  try {
   executor.shutdown();
   executor.awaitTermination(50, TimeUnit.SECONDS);
  } catch (InterruptedException ex) {
   Logger.getLogger(ExecutorServiceThreadPool.class.getName()).log(Level.SEVERE, null, ex);
  }      
  System.out.println("Finished all threads");
 }
}
------------------------------------------------------------------

No comments:

Post a Comment