Tuesday 30 January 2018

Multithreading | CountDownLatch in Java

CountDownLatch is a synchronizer that makes one or more threads wait until a set of operations running in other threads has completed. A typical example is a server whose main task can start only after all the required services have started.


A CountDownLatch is initialized with a given count, and each call to the countDown() method decrements that count. The await() method blocks the calling thread until the count reaches zero. When the count reaches zero, all waiting threads are released and resume execution; this normally means that every worker has finished the task it was responsible for.

A CountDownLatch is one-shot: once the count reaches zero it cannot be reset, and any later call to await() returns immediately. If you need a synchronizer that can be reused, consider using a CyclicBarrier.
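The counting behaviour described above can be seen in a minimal single-threaded sketch. The class name LatchBasics and its run() method are made up for this illustration; getCount(), countDown() and the timed await(long, TimeUnit) are standard methods of java.util.concurrent.CountDownLatch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the examples in this post.
class LatchBasics {

    /** Returns the count observed after each step. */
    static long[] run() {
        CountDownLatch latch = new CountDownLatch(2);
        long[] seen = new long[4];

        seen[0] = latch.getCount();   // initial count
        latch.countDown();
        seen[1] = latch.getCount();   // one event still pending
        latch.countDown();
        seen[2] = latch.getCount();   // the latch is now open

        // An extra countDown() has no effect: the count stays at zero
        // and there is no method to reset it.
        latch.countDown();
        seen[3] = latch.getCount();

        try {
            // With the count at zero, await() returns without blocking.
            // The timed variant returns true if the count reached zero in time.
            if (!latch.await(1, TimeUnit.SECONDS)) {
                throw new IllegalStateException("latch should already be open");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        for (long count : run()) {
            System.out.println(count);
        }
    }
}
```

The timed await is worth knowing about in real code, because a plain await() waits forever if one of the workers never calls countDown().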

A simple example of CountDownLatch in Java
import java.util.concurrent.CountDownLatch;

/**
 * Worker thread class.
 * @author rajesh.kumar2
 */
class MyWorker extends Thread {

    private int delayTime;
    private CountDownLatch cdLatch;

    public MyWorker(int delay, CountDownLatch latch, String name) {
        super(name);
        this.delayTime = delay;
        this.cdLatch = latch;
    }

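    @Override
    public void run() {
        try {
            Thread.sleep(delayTime);
            System.out.println(Thread.currentThread().getName() + " finished");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // Count down even if the worker is interrupted, so the main thread is never left waiting.
            cdLatch.countDown();
        }
    }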
}

/**
 * Tests the countdown latch functionality.
 * @author rajesh.kumar2
 */
public class TestCountDownLatch {

    public static void main(String args[]) throws InterruptedException {
        /** Latch with a count of three, so the main thread waits until all three workers have finished. */
        CountDownLatch cdLatch = new CountDownLatch(3);

        /** Creating three worker threads. */
        MyWorker first = new MyWorker(2000, cdLatch, "Worker t1");
        MyWorker second = new MyWorker(6000, cdLatch, "Worker t2");
        MyWorker third = new MyWorker(4000, cdLatch, "Worker t3");

        /** Starting three worker threads. */
        first.start();
        second.start();
        third.start();
       
        /** The main thread waits here until all three worker threads have finished. */
        cdLatch.await();

        System.out.println("Main thread has resumed execution");
        System.out.println(Thread.currentThread().getName() + " has completed execution");
    }
}

An example that mocks application server start-up using CountDownLatch
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


abstract class IVerifyService implements Runnable {

    private CountDownLatch cdLatch;
    private String serviceName;
    private boolean isServiceUp;

    public IVerifyService(String serviceName, CountDownLatch latch) {
        super();
        this.cdLatch = latch;
        this.serviceName = serviceName;
        this.isServiceUp = false;
    }

    @Override
    public void run() {
        try {
            verifyService();
            isServiceUp = true;
        } catch (Exception e) {
            e.printStackTrace(System.err);
            isServiceUp = false;
        } finally {
            /** Count down the latch once verification has finished, whether it succeeded or failed. */
            if (cdLatch != null) {
                cdLatch.countDown();
            }
        }
    }

    public String getServiceName() {
        return serviceName;
    }

    public boolean isServiceUp() {
        return isServiceUp;
    }

    /** abstract method needs to be implemented by all service verifiers */
    public abstract void verifyService() throws InterruptedException;
}

class VerifyDatabaseService extends IVerifyService {

    public VerifyDatabaseService(CountDownLatch latch) {
        super("Database", latch);
    }

    @Override
    public void verifyService() throws InterruptedException {
        System.out.println("Verifying the " + this.getServiceName());
        // Simulate a slow check; an InterruptedException propagates to the caller.
        Thread.sleep(7000);
        System.out.println(this.getServiceName() + " is up and running ");
    }
}


class VerifyNetworkService extends IVerifyService {

    public VerifyNetworkService(CountDownLatch latch) {
        super("Network", latch);
    }

    @Override
    public void verifyService() throws InterruptedException {
        System.out.println("Verifying the " + this.getServiceName());
        // Simulate a slow check; an InterruptedException propagates to the caller.
        Thread.sleep(7000);
        System.out.println(this.getServiceName() + " is up and running ");
    }
}

class ApplicationStartupUtil {

    /** List of service verifiers. */
    private static List<IVerifyService> services = new ArrayList<IVerifyService>();

    /** This latch is used to wait on. */
    private static CountDownLatch cdLatch;

    private ApplicationStartupUtil() {
       
    }

    private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();

    public static ApplicationStartupUtil getInstance() {
        return INSTANCE;
    }

    public static boolean checkExternalServices() throws InterruptedException {
       
        /** Initialize the latch with number of service verifiers. */
        cdLatch = new CountDownLatch(2);

        /** Adding checker to lists */
        services.add(new VerifyDatabaseService(cdLatch));
        services.add(new VerifyNetworkService(cdLatch));


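        /** Start service checkers using executor framework. */
        java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(services.size());

        for (final IVerifyService service: services) {
            executor.execute(service);
        }

        /** Stop accepting new tasks so the pool threads exit after the checks and the JVM can terminate. */
        executor.shutdown();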

        /** Wait until all services are checked. */
        cdLatch.await();

        /** Check whether all required services are up and running. */
        for (final IVerifyService service: services) {
            if (!service.isServiceUp()) {
                return false;
            }
        }
        return true;
    }
}

public class CountDownLatchDemo {

    public static void main(String[] args) {
        boolean isServicesUp = false;
        try {
            isServicesUp = ApplicationStartupUtil.checkExternalServices();
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
        String validationRes = (isServicesUp ? "" : "not ") + "up";
        System.out.println("All services are " + validationRes);
    }
}

Usages of CountDownLatch in Java
1. Achieving Maximum Parallelism
Sometimes we want to start a number of threads at the same time to achieve maximum parallelism, for example when testing whether a class really behaves as a singleton under concurrent access. This is easy to do with a CountDownLatch whose initial count is 1: every thread calls await() on the latch before doing its work, and a single call to countDown() then releases all the waiting threads at the same time.
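The start-gate idea from this first usage can be sketched as follows. StartGateDemo, runWorkers and the second latch named done are assumptions made for the illustration; the second latch is only there so the caller knows when every worker has finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the examples in this post.
class StartGateDemo {

    /** Releases the given number of workers at once; returns how many ran. */
    static int runWorkers(int workers) {
        CountDownLatch startGate = new CountDownLatch(1);    // opened once by the caller
        CountDownLatch done = new CountDownLatch(workers);   // one count per worker
        AtomicInteger ran = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    startGate.await();         // park here until the gate opens
                    ran.incrementAndGet();     // the code under test would go here
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }, "Worker-" + i).start();
        }

        startGate.countDown();                 // one call releases every waiting worker
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("Workers that ran: " + runWorkers(5));
    }
}
```

A worker that reaches await() only after the gate has been opened simply passes straight through, so the sketch stays correct even if some threads are scheduled late.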

2. Waiting for N threads to complete before starting execution
For example, an application start-up class wants to ensure that all N external systems are up and running before it starts handling user requests.

3. Deadlock detection
A handy use case for testing: run N threads against a shared resource, vary the number of threads in each test phase, and try to provoke a deadlock.
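One possible way to build such a test is sketched below, under stated assumptions: DeadlockProbe and runPhase are hypothetical names, and the shared resource is just a counter guarded by a lock, which stands in for the real resource under test. Each phase releases its threads through a start gate and then waits on a second latch with a timeout; if the timeout expires, at least one worker is still blocked, which hints at a deadlock but does not prove one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test harness, not part of the examples in this post.
class DeadlockProbe {

    private static final Object LOCK = new Object();
    private static int counter;   // stand-in for the shared resource under test

    /** Runs one phase; returns false if the workers did not finish in time. */
    static boolean runPhase(int threads, long timeoutSeconds) {
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);

        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    startGate.await();
                    synchronized (LOCK) {   // replace with access to the real resource
                        counter++;
                    }
                    done.countDown();       // only reached if the worker did not get stuck
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.setDaemon(true);         // a stuck worker must not keep the JVM alive
            worker.start();
        }

        startGate.countDown();              // release all workers of this phase together
        try {
            return done.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        for (int threads : new int[] {2, 4, 8}) {
            System.out.println(threads + " threads finished: " + runPhase(threads, 5));
        }
    }
}
```

The timeout value is a judgment call: too short and a slow machine produces false alarms, too long and the test suite stalls whenever a real deadlock occurs.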
