Tuesday, February 14, 2017

Executor Framework in Java

Issues with Multithreading design:
Before Java 1.5, multithreaded applications were built directly on Thread and ThreadGroup objects or on hand-written thread pools. 
All thread management was the programmer's responsibility, including:
    Thread synchronization
    Thread waiting
    Thread joining
    Thread locking
    Thread notification
    Handling deadlock

Thread behavior depends on the environment in which the application is deployed and run. The same application 
may behave differently across deployments depending on processor speed, RAM size, bandwidth and so on, 
all of which directly affect a multithreaded application. 

What is the Executor Framework:
The Executor framework (built around the java.util.concurrent.Executor interface) runs Runnable objects without creating a new thread 
every time, mostly by reusing threads that already exist. It gives multithreaded applications a simple abstraction layer 
that hides the mechanics of concurrent execution, so the programmer can concentrate on the 
business logic. 

In the Executor framework, parallel work is modeled as tasks rather than as threads. The application 
creates instances of Runnable (the tasks) and passes them to an Executor to process. 
The ExecutorService interface extends the minimal Executor interface and represents an asynchronous 
execution mechanism capable of running tasks in the background. In practice an ExecutorService is usually backed by a thread pool. 

ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

ExecutorService executorService1 = Executors.newSingleThreadExecutor(); //Single thread to execute commands    
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newScheduledThreadPool(20);
ExecutorService executorService4 = Executors.newCachedThreadPool();

newFixedThreadPool(int): returns a ThreadPoolExecutor with a fixed number of threads and an unbounded queue. 
        No threads beyond the configured number are ever created, so if 
        no thread is free, a new task waits in the queue until one becomes available.
newCachedThreadPool(): returns a ThreadPoolExecutor with no fixed upper limit on the number of threads and a direct 
        hand-off queue (a SynchronousQueue) that holds no waiting tasks. An idle thread is reused if one is available; 
        otherwise a new one is created and added to the pool to run the task. Threads that have been idle for 60 seconds 
        are removed from the pool automatically.
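
To make the fixed pool behavior concrete, here is a minimal sketch. The class name PoolReuseDemo and the method workerNames are made up for this illustration. It pushes more tasks through a fixed pool than the pool has threads and records which worker threads actually ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolReuseDemo {
    // Runs taskCount short tasks on a fixed pool and returns the names of the worker threads used.
    static Set<String> workerNames(int poolSize, int taskCount) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                              // no new tasks, queued ones still run
        pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for them to finish
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = workerNames(2, 10);
        // Ten tasks were run, but never by more than two distinct worker threads.
        System.out.println("Distinct worker threads: " + names.size());
    }
}
```

Swapping in Executors.newCachedThreadPool() should let the number of distinct threads vary with how quickly the tasks finish, since that pool creates a thread whenever none is idle.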

Different methods to delegate tasks for execution to an ExecutorService:
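
The ExecutorService interface offers execute(Runnable), submit(...), invokeAll(...) and invokeAny(...) for this. The first two appear throughout this post, so the sketch below covers only the last two. The class and method names (DelegationDemo, runAll, runAny) are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DelegationDemo {
    // invokeAll blocks until every task is done and returns the futures in task order.
    static List<String> runAll(List<Callable<String>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<String> results = new ArrayList<String>();
            for (Future<String> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // invokeAny returns the result of one task that completed successfully.
    static String runAny(List<Callable<String>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return pool.invokeAny(tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(() -> "a");
        tasks.add(() -> "b");
        tasks.add(() -> "c");
        System.out.println("invokeAll: " + runAll(tasks));
        System.out.println("invokeAny: " + runAny(tasks));
    }
}
```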

Q. What is the difference between "Executors.newSingleThreadExecutor().execute(command)" and "new Thread(command).start()"?
A. Once you have an Executor instance, you can submit multiple tasks to it and have them executed one after another 
   on the same thread. A raw Thread runs only the one Runnable it was started with.
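
A small sketch of that answer, under the assumption that each task only records its own number (SingleThreadOrderDemo and runInOrder are illustrative names): several tasks are handed to one single-thread executor and run strictly in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderDemo {
    // Submits taskCount tasks to a single-thread executor and returns the order in which they ran.
    static List<Integer> runInOrder(int taskCount) throws InterruptedException {
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            executor.execute(() -> order.add(id)); // queued behind the earlier tasks
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```
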
The Executor framework represents tasks as instances of Runnable or Callable. A Runnable's run() method cannot 
return a value or throw a checked exception. Callable is the more capable variant: its call() 
method returns a result (of the generic type V) that can be used in later processing, and it may throw a checked exception.
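
The following sketch shows both halves of that difference. CallableDemo and describe are names invented for the example. One Callable returns a value through its Future, and another throws a checked exception, which get() reports wrapped in an ExecutionException.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableDemo {
    // Runs the task on a single-thread executor and describes its outcome as text.
    static String describe(Callable<Integer> task) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(task);
            return "result=" + future.get();
        } catch (ExecutionException e) {
            // The exception thrown inside call() is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describe(() -> 6 * 7));
        // prints "result=42"
        System.out.println(describe(() -> { throw new IOException("disk unavailable"); }));
        // prints "failed: disk unavailable"
    }
}
```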

The FutureTask class is another important component; it is used to obtain the result and status of a task once it has run. An 
instance can wrap either a Callable or a Runnable. The submit() method of an ExecutorService returns a Future, 
which in the standard thread pool implementations is backed by a FutureTask. You can also wrap a task in a FutureTask yourself and pass it to execute().
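
As a rough illustration of the manual route (FutureTaskDemo and sumTo are hypothetical names), the sketch below wraps a Callable in a FutureTask, hands it to execute() and reads the result back from the same object.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    // Computes 1 + 2 + ... + n on a worker thread via a manually created FutureTask.
    static int sumTo(int n) throws Exception {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> {
            int sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(task); // FutureTask is a Runnable, so execute() accepts it
            return task.get();      // blocks until the worker has finished
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Sum: " + sumTo(100)); // prints "Sum: 5050"
    }
}
```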

Apart from the Executors factory methods above, a ThreadPoolExecutor can be configured directly. It works as follows:
    A pool of worker threads is created.
    A queue holds the tasks that have been submitted but not yet assigned to a thread from the pool.
    A rejection handler deals with tasks that can be neither run nor placed in the queue. 
    The default rejection policy throws a RejectedExecutionException, a runtime exception, which the 
        application can catch and then handle or drop the task.
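
The rejection step can be sketched as follows. The names RejectionDemo and countRejected are made up, and the pool is deliberately tiny (one thread, one queue slot) so that rejection is easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    // Submits taskCount blocking tasks to a pool with one thread and a one-slot queue
    // and returns how many of them the default rejection policy refused.
    static int countRejected(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < taskCount; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // thread busy and queue full
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the remaining three are rejected.
        System.out.println("Rejected: " + countRejected(5)); // prints "Rejected: 3"
    }
}
```

Other policies, such as ThreadPoolExecutor.CallerRunsPolicy or ThreadPoolExecutor.DiscardPolicy, can be passed as an extra constructor argument when throwing is not the desired behavior.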

Creating Executors:    
Executor is an interface with a single method, "void execute(java.lang.Runnable)", which is used to submit a new task.

ExecutorService is a sub-interface of Executor. It adds methods such as "shutdown(), shutdownNow(), isTerminated(),
    Future submit(Callable), Future submit(Runnable, Object), Future submit(Runnable)" etc.    
    Runnable myCommand1 = ...
    Callable<String> myCommand2 = ...
    ExecutorService executorService = ... // Build an executorService
    executorService.submit(myCommand1);
    //submit also accepts a Callable
    Future<String> resultFromMyCommand2 = executorService.submit(myCommand2);   
    //Lets myCommand1 and myCommand2 finish, but accepts no new tasks
    executorService.shutdown();
    Runnable myCommand3 = ...;
    //Will throw a RejectedExecutionException because no new task can be submitted
    executorService.submit(myCommand3);
ScheduledExecutorService is a sub-interface of ExecutorService that adds the "schedule(), scheduleAtFixedRate(), 
    scheduleWithFixedDelay()" methods, used to execute commands after a given delay or periodically.
    ScheduledExecutorService executor = ...;
    Runnable command1 = ...;
    Runnable command2 = ...;
    Runnable command3 = ...;
    //Will start command1 after 50 seconds
    executor.schedule(command1, 50L, TimeUnit.SECONDS);
    //Will start command 2 after 20 seconds, 25 seconds, 30 seconds ...
    executor.scheduleAtFixedRate(command2, 20L, 5L, TimeUnit.SECONDS);
    //Will start command 3 after 10 seconds and if command3 takes 2 seconds to be executed also after 17, 24, 31, 38 seconds...
    executor.scheduleWithFixedDelay(command3, 10L, 5L, TimeUnit.SECONDS);
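
The fragment above leaves the executor and the commands elided. As a self-contained sketch of the one-shot schedule() variant, the example below uses a much shorter delay and made-up names (DelayedTaskDemo, elapsedMillis). It schedules a Callable and measures how long the scheduler actually waited before running it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DelayedTaskDemo {
    // Schedules a task that reads the clock after delayMillis and returns the real waiting time.
    static long elapsedMillis(long delayMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            Callable<Long> readClock = () -> System.nanoTime();
            ScheduledFuture<Long> future =
                    scheduler.schedule(readClock, delayMillis, TimeUnit.MILLISECONDS);
            // get() blocks until the delayed task has run and returns its result.
            return TimeUnit.NANOSECONDS.toMillis(future.get() - start);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Waited at least 200 ms: " + (elapsedMillis(200) >= 200));
    }
}
```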

Executors is a class with a number of static factory methods that create ExecutorService and 
    ScheduledExecutorService objects to suit the requirements of the application. 
        ExecutorService ex3 = Executors.newSingleThreadExecutor();
        Future<String> future = ex3.submit(new Callable<String>(){
            public String call() {
                for(int i=20;i<=23;i++)
                    System.out.println("Asynchronous Callable: "+i);
                return "My Result";
            }
        });
        try {
            //get() blocks until the Callable has finished
            System.out.println("Callable: "+future.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        ex3.shutdown();
        System.out.println("All Executors are Shutdown...");        
        if(!ex3.isTerminated()) //Recheck if not shut down.
            ex3.shutdownNow();

ThreadPool Executor:
    private static final Executor executor = new ThreadPoolExecutor(6, 12, 5000L, TimeUnit.MILLISECONDS, 
                                             new LinkedBlockingQueue<Runnable>(250));
    The parameter values depend on the needs of the application. Here the core pool has 6 threads that can run concurrently 
    and the maximum is 12. The queue can hold 250 waiting tasks; threads beyond the core 6 are started only once the queue is full. 
    The pool and queue sizes should be large enough for the expected load, otherwise tasks are rejected. The idle time limit is 
    5 seconds (5000 ms), after which idle threads above the core size are terminated.

Submit the task to the Executor: After creating the ExecutorService and the tasks, submit each task to the 
executor with either submit() or execute(). The tasks are then picked up from the queue 
and run concurrently according to the configuration. For example, with a pool of 5 threads, up to 5 tasks are taken from the queue 
and run in parallel at a time. This continues until the queue is empty.

Execute the task: The actual execution of the tasks is managed by the framework. The executor is responsible for 
managing task execution, the thread pool, synchronization and the queue. If the pool has fewer threads than its configured 
core size, a new thread is created for each submitted task until that size is reached. Once the core size is reached, 
new tasks are queued until a thread is free to process them. Only when the queue is full does the pool start 
additional threads, up to the configured maximum; beyond that, tasks are rejected. 
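
The growth rule can be observed with a deliberately small pool. In this sketch the names PoolGrowthDemo and poolSizesAfterEachSubmit are invented, and the sizes (core 1, maximum 2, queue capacity 1) are chosen only to keep the output short. Three blocking tasks are submitted and the pool size is read after each one.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {
    // Returns the pool size observed after each of three blocking submissions.
    static int[] poolSizesAfterEachSubmit() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 5L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // occupy the thread until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[3];
        for (int i = 0; i < sizes.length; i++) {
            pool.execute(blocker);
            sizes[i] = pool.getPoolSize();
        }
        release.countDown();
        pool.shutdown();
        return sizes;
    }

    public static void main(String[] args) {
        // 1st task starts the core thread, 2nd is queued, 3rd finds the queue full
        // and makes the pool grow to its maximum.
        System.out.println(Arrays.toString(poolSizesAfterEachSubmit())); // prints [1, 1, 2]
    }
}
```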

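And finally, shut down the Executor: An executor is terminated by calling its shutdown() or shutdownNow() method. 
shutdown() terminates gracefully: no new tasks are accepted, but tasks already submitted are completed. shutdownNow() 
terminates abruptly: it attempts to stop running tasks by interrupting them and returns the tasks that were still waiting in the queue.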
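
A minimal sketch of the abrupt variant, with hypothetical names (ShutdownDemo, pendingAfterShutdownNow): one long-running task occupies the only thread while a few more wait in the queue, and shutdownNow() hands the waiting ones back instead of running them.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    // Returns the number of queued tasks that shutdownNow() handed back unexecuted.
    static int pendingAfterShutdownNow(int queuedTasks) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        executor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // long-running work
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(); finish early
            }
        });
        for (int i = 0; i < queuedTasks; i++) {
            executor.execute(() -> { }); // waits behind the long-running task
        }
        started.await(); // make sure the first task is actually running
        List<Runnable> neverRun = executor.shutdownNow();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return neverRun.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Tasks never run: " + pendingAfterShutdownNow(3)); // prints "Tasks never run: 3"
    }
}
```

With shutdown() in place of shutdownNow(), the long-running task would not be interrupted and the queued tasks would still be executed before the executor terminates.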

Some of the theory above is taken from: http://mrbool.com/working-with-java-executor-framework-in-multithreaded-application/27560

Working Example:
package concurrency;
public class MyRunnable implements Runnable {
    public void run(){
        System.out.println("Run method");
    }
}
//Sample ThreadPool Class to understand Executor Framework
package concurrency;
public class ThreadPool {
    public static void main(String ar[]){
        Thread worker[] = new Thread[3];
        Runnable r = new MyRunnable();
        System.out.println("Running ThreadPool Task..");
        //Create and start one thread per job
        for(int i=0;i<worker.length;i++){
            worker[i]=new Thread(r);
            worker[i].start();
        }
        //Wait for every thread to finish
        for(int i=0;i<worker.length;i++){
            try {
                worker[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
//Above ThreadPool code is same as below Executor Framework
package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorTask {
    public static void main(String[] args) {
        int poolSize = 3;
        int jobCount = 3;
        Runnable r = new MyRunnable();
        System.out.println("Running Executor Task..");        
        ExecutorService ex = Executors.newFixedThreadPool(poolSize);
        for(int i=0;i<jobCount;i++){
            ex.execute(r);
        }
        ex.shutdown();

        System.out.println("Running Executor Task for Callable..");
        List<Future<String>> list = new ArrayList<Future<String>>();
        //ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        ExecutorService executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(15));
                //CorePoolSize, MaxPoolSize, keep-alive time for threads above the core size (none here, as core == max), BlockingQueue

        for(int i=0;i<jobCount;i++){
            list.add(executor.submit(new MyCallable()));
        }
        try {
            for(Future<String> f: list)
                System.out.println("Future Returned get: "+f.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }
}
package concurrency;
import java.util.concurrent.Callable;
public class MyCallable implements Callable<String> {
    public String call(){
        System.out.println("Call method..");
        return "Server msg is Hi";
    }
}
