Tuesday 30 January 2018

Multithreading | Deadlock using CountDownLatch

The await() method of CountDownLatch blocks the current thread until the count reaches zero. Here the CountDownLatch is initialized to 10, but only 4 threads call countDown(), so the count drops from 10 to 6 and never reaches zero. The main thread therefore waits in await() forever. Strictly speaking this is an indefinite wait rather than a circular deadlock, but the effect is the same: the main thread never proceeds.

Any statement after the await() call never executes, because the count never drops to zero.

package com.threads.countdown;
import java.util.concurrent.CountDownLatch;

class StopLatchedThread extends Thread {

    private final CountDownLatch stopLatch;
    private int threadId;

    public StopLatchedThread(CountDownLatch stopLatch, int i) {
        this.stopLatch = stopLatch;
        this.threadId = i;
    }

    @Override
    public void run() {
        try {
            System.out.println("inside the run method of " + threadId + " thread");
        } finally {
            System.out.println("Count down by " + threadId + " thread");
            stopLatch.countDown();
        }
    }
}

/**
 * @author rajesh.kumar2
 */
public class DeadLockCoundDownLatch {

    public static void main(String[] args) throws InterruptedException {
        performParallelTask();
    }

    public static void performParallelTask() throws InterruptedException {
        CountDownLatch cdl = new CountDownLatch(10);
        for (int i = 0; i < 4; i++) {
            Thread t = new StopLatchedThread(cdl, i);
            t.start();
        }
       
        System.out.println("Before await method");
        System.out.println("Waiting for the further execution of Main thread...");
        cdl.await();

        /** This statement never executes: cdl.await() blocks until the count reaches 0, and it never does. */
        System.out.println("After await method");
    }
}
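
One way to keep a miscounted latch from blocking forever is the timed overload await(long, TimeUnit), which returns false if the count has not reached zero when the timeout expires. The sketch below is only an illustration under assumed names: the class LatchTimeoutSketch, the helper runAndAwait, and the 500 ms timeout are not part of the example above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {

    /**
     * Hypothetical helper: starts `workers` threads that each count down once on a
     * latch created with `initialCount`, then waits at most `timeoutMillis`.
     * Returns true if the count reached zero, false if the wait timed out.
     */
    public static boolean runAndAwait(int initialCount, int workers, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(initialCount);
        for (int i = 0; i < workers; i++) {
            new Thread(latch::countDown).start();
        }
        // Timed wait: gives up instead of blocking forever.
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Mismatched: 10 expected, only 4 arrive, so the wait times out.
        System.out.println("count 10, 4 workers: opened = " + runAndAwait(10, 4, 500));
        // Matched: the latch can open once the fourth worker counts down.
        System.out.println("count 4, 4 workers: opened = " + runAndAwait(4, 4, 500));
    }
}
```

With the mismatched count the call reports false once the timeout expires, so the caller can log the problem or fail fast instead of hanging.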


Multithreading | CountDownLatch in Java

CountDownLatch is a synchronizer that makes one thread wait until a set of other threads has completed its tasks. A good example is a server whose main task can start only after all the required services have started.


A CountDownLatch is initialized with a given count, and each call to the countDown() method decrements it. The await() method blocks the current thread until the count reaches zero. When it does, every thread waiting on the latch resumes execution. Typically each worker calls countDown() once when it finishes, so a count of zero means all workers are done.

A CountDownLatch is one-shot: the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.
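
The one-shot behaviour can be seen in a few lines. This is a minimal sketch with a hypothetical class name (OneShotLatchSketch) and helper (countPastZero): once the count reaches zero, further countDown() calls do nothing and await() returns immediately.

```java
import java.util.concurrent.CountDownLatch;

public class OneShotLatchSketch {

    /** Hypothetical helper: counts a latch down past zero and returns the final count. */
    public static long countPastZero() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        latch.countDown();   // count is now 0, the latch is open
        latch.countDown();   // no effect: the count never goes below zero
        latch.await();       // returns immediately because the latch stays open
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("count after extra countDown(): " + countPastZero());
    }
}
```

Because there is no way to raise the count again, a new CountDownLatch has to be created for each round of waiting.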

A simple example of CountDownLatch in Java
import java.util.concurrent.CountDownLatch;

/**
 * Worker thread class.
 * @author rajesh.kumar2
 */
class MyWorker extends Thread {

    private int delayTime;
    private CountDownLatch cdLatch;

    public MyWorker(int delay, CountDownLatch latch, String name) {
        super(name);
        this.delayTime = delay;
        this.cdLatch = latch;
    }

    @Override
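    public void run() {
        try {
            Thread.sleep(delayTime);
            System.out.println(Thread.currentThread().getName() + " finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        } finally {
            // Count down even if the sleep is interrupted, so the main thread is not left waiting.
            cdLatch.countDown();
        }
    }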
}

/**
 * Tests the countdown latch functionality.
 * @author rajesh.kumar2
 */
public class TestCountDownLatch {

    public static void main(String args[]) throws InterruptedException {
        /** Latch with a count of three, so the main thread waits for three worker threads to finish. */
        CountDownLatch cdLatch = new CountDownLatch(3);

        /** Creating three worker threads. */
        MyWorker first = new MyWorker(2000, cdLatch, "Worker t1");
        MyWorker second = new MyWorker(6000, cdLatch, "Worker t2");
        MyWorker third = new MyWorker(4000, cdLatch, "Worker t3");

        /** Starting three worker threads. */
        first.start();
        second.start();
        third.start();
       
        /** Main thread waits until all three worker threads have finished. */
        cdLatch.await();

        System.out.println("Main thread has resumed execution");
        System.out.println(Thread.currentThread().getName() + " has completed execution");
    }
}

Example: mocking application server start-up checks using CountDownLatch
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


abstract class IVerifyService implements Runnable {

    private CountDownLatch cdLatch;
    private String serviceName;
    private boolean isServiceUp;

    public IVerifyService(String serviceName, CountDownLatch latch) {
        super();
        this.cdLatch = latch;
        this.serviceName = serviceName;
        this.isServiceUp = false;
    }

    @Override
    public void run() {
        try {
            verifyService();
            isServiceUp = true;
        } catch (Exception e) {
            e.printStackTrace(System.err);
            isServiceUp = false;
        } finally {
            /** Count down whether the verification succeeded or failed. */
            if (cdLatch != null) {
                cdLatch.countDown();
            }
        }
    }

    public String getServiceName() {
        return serviceName;
    }

    public boolean isServiceUp() {
        return isServiceUp;
    }

    /** abstract method needs to be implemented by all service verifiers */
    public abstract void verifyService() throws InterruptedException;
}

class VerifyDatabaseService extends IVerifyService {

    public VerifyDatabaseService(CountDownLatch latch) {
        super("Database", latch);
    }

    @Override
    public void verifyService() throws InterruptedException {
        System.out.println("Verifying the " + this.getServiceName());
        // Simulate a slow check; an InterruptedException propagates to the caller.
        Thread.sleep(7000);
        System.out.println(this.getServiceName() + " is up and running ");
    }
}


class VerifyNetworkService extends IVerifyService {

    public VerifyNetworkService(CountDownLatch latch) {
        super("Network", latch);
    }

    @Override
    public void verifyService() throws InterruptedException {
        System.out.println("Verifying the " + this.getServiceName());
        // Simulate a slow check; an InterruptedException propagates to the caller.
        Thread.sleep(7000);
        System.out.println(this.getServiceName() + " is up and running ");
    }
}

class ApplicationStartupUtil {

    /** List of service verifiers. */
    private static List<IVerifyService> services = new ArrayList<IVerifyService>();

    /** This latch is used to wait on. */
    private static CountDownLatch cdLatch;

    private ApplicationStartupUtil() {
       
    }

    private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();

    public static ApplicationStartupUtil getInstance() {
        return INSTANCE;
    }

    public static boolean checkExternalServices() throws InterruptedException {
       
        /** Initialize the latch with number of service verifiers. */
        cdLatch = new CountDownLatch(2);

        /** Adding checker to lists */
        services.add(new VerifyDatabaseService(cdLatch));
        services.add(new VerifyNetworkService(cdLatch));


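        /** Start service checkers using executor framework (ExecutorService is fully qualified here so that shutdown() is available). */
        java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(services.size());

        for (final IVerifyService service: services) {
            executor.execute(service);
        }

        /** Wait until all services are checked. */
        cdLatch.await();

        /** Release the pool threads; otherwise the non-daemon workers keep the JVM alive. */
        executor.shutdown();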

        /** Check whether all required services are up and running. */
        for (final IVerifyService service: services) {
            if (!service.isServiceUp()) {
                return false;
            }
        }
        return true;
    }
}

public class CountDownLatchDemo {

    public static void main(String[] args) {
        boolean isServicesUp = false;
        try {
            isServicesUp = ApplicationStartupUtil.checkExternalServices();
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
        String validationRes = (isServicesUp ? "" : "not ") + "up";
        System.out.println("All Services are " + validationRes);
    }
}

Usages of CountDownLatch in Java
1. Achieving Maximum Parallelism
Sometimes we want to start a number of threads at the same time to achieve maximum parallelism, for example to test whether a class is a thread-safe singleton. To do this, create a CountDownLatch with an initial count of 1 and make every thread call await() on it before doing its work. A single call to the countDown() method then releases all waiting threads at the same time.

2. Wait for N threads to complete before starting execution
For example, an application start-up class wants to ensure that all N external systems are up and running before it handles user requests.

3. Deadlock detection
You can use N threads to access a shared resource, varying the number of threads in each test phase, and use a latch to release them together in an attempt to provoke a deadlock.
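
The first two usages can be combined in one sketch: a start gate with count 1 releases all workers together, and a second latch with count N lets the caller wait for them to finish. The class name StartGateSketch, the helper runTogether, and the counter standing in for real work are assumptions made for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class StartGateSketch {

    /**
     * Hypothetical helper: holds `workers` threads at a start gate, releases them
     * with a single countDown(), then waits until every worker has finished.
     * Returns how many workers ran their task.
     */
    public static int runTogether(int workers) throws InterruptedException {
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch doneGate = new CountDownLatch(workers);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    startGate.await();             // blocks until the gate opens
                    completed.incrementAndGet();   // stand-in for the real task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    doneGate.countDown();          // always report completion
                }
            }).start();
        }

        startGate.countDown();   // usage 1: one call releases all waiting workers
        doneGate.await();        // usage 2: wait for N threads to complete
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("workers completed: " + runTogether(5));
    }
}
```

Counting down in a finally block keeps the done gate from stalling if a worker is interrupted, which is the same precaution the service verifier example takes.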
