Java concurrent library

The java.util.concurrent package contains classes that are useful for concurrent programming:

  • Executors: The Executor interface represents an object that executes submitted tasks. ExecutorService enables asynchronous processing: it manages a queue and executes the submitted tasks as threads become available.
  • Queues: ConcurrentLinkedQueue, BlockingQueue.
  • Synchronizers: the classic Semaphore, CountDownLatch.
  • Concurrent Collections: e.g. ConcurrentHashMap, or the synchronized wrappers returned by the java.util.Collections methods synchronizedMap(), synchronizedList() and synchronizedSet().
  • Variables that allow non-blocking atomic operations in the java.util.concurrent.atomic package: AtomicBoolean, AtomicInteger, etc.
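As a brief illustration of two of the classes listed above, the following sketch combines an AtomicInteger with a CountDownLatch. The class and method names (AtomicCounterDemo, countWithThreads) are hypothetical and chosen only for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
final class AtomicCounterDemo {

    // Starts `threads` threads that each increment a shared counter
    // `incrementsPerThread` times, then returns the final value.
    static int countWithThreads(int threads, int incrementsPerThread)
            throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();        // non-blocking counter
        CountDownLatch done = new CountDownLatch(threads);  // one count per worker

        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet();              // atomic read-modify-write
                }
                done.countDown();                           // this worker has finished
            }).start();
        }

        done.await();                                       // wait for every worker
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(4, 1000));      // prints 4000
    }
}
```

With a plain int field instead of the AtomicInteger, increments from different threads could be lost, and the result would not be reliable.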

It is generally preferable to use these classes rather than low-level wait/notify synchronization, because they simplify programming, just as it is better to use executors and tasks than to manage threads directly.
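To make the comparison with wait/notify concrete, here is a minimal producer/consumer sketch based on a BlockingQueue, which does the waiting and signalling internally. The class name BlockingQueueDemo, the queue capacity and the POISON_PILL end marker are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; names are illustrative only.
final class BlockingQueueDemo {
    private static final int POISON_PILL = -1; // assumed end-of-stream marker

    // A producer thread puts 0..items-1 on the queue; the calling thread consumes them.
    static List<Integer> produceAndConsume(int items) throws InterruptedException {
        // Small capacity so that the producer really has to wait for the consumer
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i);              // blocks while the queue is full
                }
                queue.put(POISON_PILL);        // tell the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        while (true) {
            int value = queue.take();          // blocks while the queue is empty
            if (value == POISON_PILL) {
                break;
            }
            received.add(value);
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndConsume(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

No synchronized block, wait() or notify() call appears in the code: put() and take() handle the blocking.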

Tasks and executors

Most concurrent applications are organized using tasks. A task performs a specific job. In this way, we can simplify the design and operation.

Let us look at some possible solutions for managing connections to a server. Suppose we have a method, attendRequest(), which serves a web request.

Sequential execution

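    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    class OneThreadWebServer {
        public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                // Requests are accepted and served one at a time
                Socket connection = socket.accept();
                attendRequest(connection);
            }
        }
    }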

One thread for each request

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    class OneThreadPerRequestWebServer {
        public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                final Socket connection = socket.accept();
                Runnable task = new Runnable() {
                    @Override
                    public void run() {
                        attendRequest(connection);
                    }
                };
                // A new thread is created for every request
                new Thread(task).start();
            }
        }
    }

Shared group of threads

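    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    class ExecutionTasksWebServer {
        private static final int NTHREADS = 100;
        private static final Executor executor = Executors.newFixedThreadPool(NTHREADS);

        public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                final Socket connection = socket.accept();
                Runnable task = new Runnable() {
                    @Override
                    public void run() {
                        attendRequest(connection);
                    }
                };
                // The task runs on one of the NTHREADS pooled threads
                executor.execute(task);
            }
        }
    }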

In this solution we introduced the Executor interface:

    public interface Executor {
        void execute(Runnable command);
    }

An Executor is an object that runs Runnables. It decouples submitting a task from the way the task is run: the thread pool used above executes each task asynchronously on one of the pool's threads and returns control immediately to the thread that called execute(). A task goes through four states:

  • Created
  • Submitted
  • Started
  • Completed

Executors can be created with the static factory methods of the Executors class. These methods return an ExecutorService, which is a subinterface of Executor. The implementations they return use the thread pool pattern, which reuses a bounded number of threads to run the tasks waiting in a queue.

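An ExecutorService should always be shut down when it is no longer needed, because its idle pool threads can otherwise keep the JVM from exiting. The shutdown() method stops accepting new tasks and lets the already submitted ones finish, after which the pool threads terminate; shutdownNow() additionally tries to interrupt the tasks that are still running.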
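One common way to stop an ExecutorService is sketched below: call shutdown(), wait a bounded time with awaitTermination(), and fall back to shutdownNow(). The class name ShutdownDemo, the pool size and the five-second limit are arbitrary choices for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
final class ShutdownDemo {

    // Runs `tasks` trivial tasks on a two-thread pool, shuts the pool down,
    // and returns how many tasks were executed.
    static int runAndShutDown(int tasks) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(completed::incrementAndGet);
            }
        } finally {
            executor.shutdown();            // no new tasks; queued tasks still run
        }
        // Wait a bounded time for the queued tasks to finish
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();         // give up and interrupt running tasks
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndShutDown(10)); // prints 10
    }
}
```

Placing shutdown() in a finally block ensures the pool is released even if submitting a task throws.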

Tasks with results

Some tasks return results. To implement them, we can use the interfaces Callable and Future:

    public interface Callable<V> {
        V call() throws Exception;
    }

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        // Both get() variants may also throw the unchecked CancellationException
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

A Callable<V> represents a task that returns a value of type V when it is executed. To execute it, we need an ExecutorService, in particular one of these two methods:

  • Future<?> submit(Runnable task)
  • <T> Future<T> submit(Callable<T> task)

Both methods schedule the Runnable or Callable for execution and return a Future, an object through which the result can be obtained later: get() blocks until the result is available, and get(long timeout, TimeUnit unit) blocks for at most the given time.

We can also cancel the task using cancel(boolean mayInterruptIfRunning): the parameter indicates whether the thread running the task should be interrupted if the task has already started.
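The timed get() and cancel() can be combined as in the sketch below, which gives up on a slow task after a short wait. The class name CancelDemo, the 100 ms timeout and the ten-second sleep that simulates slow work are assumptions made for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names are illustrative only.
final class CancelDemo {

    // Submits a slow task, waits briefly for its result and cancels it on timeout.
    // Returns whether the future ended up cancelled.
    static boolean cancelSlowTask() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(10_000);       // simulates slow work
                return "done";
            });
            try {
                future.get(100, TimeUnit.MILLISECONDS);  // wait at most 100 ms
            } catch (TimeoutException e) {
                future.cancel(true);        // true: interrupt the running task
            }
            return future.isCancelled();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(cancelSlowTask()); // prints true
    }
}
```

After a successful cancel(), calling get() on the same Future throws CancellationException instead of returning a value.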

ExecutorServices can be created using the same class we saw before, Executors.

Below is a working example. How does the execution change if we use Executors.newFixedThreadPool(2) instead?

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class SimpleCallableTest {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            Future<String> f1 = executor.submit(new ToUpperCallable("hello"));
            Future<String> f2 = executor.submit(new ToUpperCallable("world"));
            try {
                long millis = System.currentTimeMillis();
                // get() blocks until each result is available
                System.out.println("main " + f1.get() + " " + f2.get()
                    + " in millis: " + (System.currentTimeMillis() - millis));
            } catch (InterruptedException | ExecutionException ex) {
                ex.printStackTrace();
            }
            executor.shutdown();
        }

        private static final class ToUpperCallable implements Callable<String> {
            private final String word;

            public ToUpperCallable(String word) {
                this.word = word;
            }

            @Override
            public String call() throws Exception {
                String name = Thread.currentThread().getName();
                System.out.println(name + " calling for " + word);
                Thread.sleep(2500); // simulates a slow computation
                String result = word.toUpperCase();
                System.out.println(name + " result " + word + " => " + result);
                return result;
            }
        }
    }

Java 7 introduced the fork/join framework.
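As a rough idea of how the fork/join framework is used, the sketch below sums an array by recursively splitting it into halves. The class name SumTask and the THRESHOLD value are assumptions made for this example, and ForkJoinPool.commonPool() requires Java 8 or later.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo task; names are illustrative only.
final class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cutoff for splitting

    private final int[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(int[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: sum sequentially
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(values, from, mid);
        SumTask right = new SumTask(values, mid, to);
        left.fork();                            // run the left half asynchronously
        return right.compute() + left.join();   // compute the right half here, then join
    }

    // Convenience entry point using the common pool (Java 8+)
    static long sum(int[] values) {
        return ForkJoinPool.commonPool().invoke(new SumTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // prints 50005000
    }
}
```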

Java 8 introduced CompletableFuture, which makes it possible to combine futures and to handle the errors that occur in them more easily. One example is the complete() method, which completes a future from another thread:

    CompletableFuture<String> completableFuture = new CompletableFuture<>();
    // ...
    String result = completableFuture.get(); // blocks until the future is completed

    // while in another thread...
    completableFuture.complete("Hello World!");

Alternatively, a task can be run directly with supplyAsync():

    Supplier<String> supplier = new Supplier<String>() {
        @Override
        public String get() {
            return "Hello, world!";
        }
    };
    Future<String> future = CompletableFuture.supplyAsync(supplier, executor); // executor is optional
    System.out.println(future.get());
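The combining and error handling mentioned above can be sketched with thenCombine() and exceptionally(). The class name CompletableFutureDemo, its method names and the "fallback" value are hypothetical and serve only as an illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; names are illustrative only.
final class CompletableFutureDemo {

    // Runs two suppliers asynchronously and combines their results.
    static String combine() {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> "world");
        return hello
            .thenCombine(world, (h, w) -> h + ", " + w + "!")  // runs once both are done
            .join();                                           // like get(), but unchecked
    }

    // Replaces the result of a failed future with a fallback value.
    static String recover() {
        CompletableFuture<String> failing = CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");           // simulated failure
        });
        return failing
            .exceptionally(ex -> "fallback")                   // called only on failure
            .join();
    }

    public static void main(String[] args) {
        System.out.println(combine());  // prints Hello, world!
        System.out.println(recover());  // prints fallback
    }
}
```

Since no executor is passed to supplyAsync() here, the suppliers run on the common ForkJoinPool.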