Tour around concurrency in Java

Tomasz Nurkiewicz

www.nurkiewicz.com | @tnurkiewicz | nurkiewicz@gmail.com

Plain threads

Thread thread = new Thread(
    () -> doSomeWork(), 
    "MyWorker");
thread.setDaemon(true);
thread.start();

When JVM terminates?

Worker queue

class Worker implements Runnable {
	
  private final BlockingQueue<Work> queue;
  
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        Work work = queue.take();
        work.perform();
      }
    } catch (InterruptedException e) {
      //interrupting Worker
    }
  }
}

Interrupting worker queue


Thread thread = new Thread(new Worker());
//...
thread.interrupt();
					

ExecutorService

ExecutorService executor = Executors.newSingleThreadExecutor();
Work work = //...
executor.submit(work::perform);
//...
executor.shutdownNow();
@FunctionalInterface
interface Work {
    void perform()
};

Configuring thread pool - defaults

ExecutorService executor =
  Executors.newFixedThreadPool(10);

Inline

++N

or

Ctrl+Alt+N

Configuring thread pool

ExecutorService executor = new ThreadPoolExecutor(
  10,                             // core size
  10,                             // max size
  0L, MILLISECONDS,               // keep alive
  new LinkedBlockingQueue<>(100), // work queue
  threadFactory,
  rejectedHandler
);

Always bound queue

new LinkedBlockingQueue<>(100)

Custom RejectedExecution Handler

RejectedExecutionHandler rejectedHandler =
        (runnable, executor) ->
            log.warn("Rejected task {}", runnable);

Custom ThreadFactory

import com.google.common.util.concurrent.ThreadFactoryBuilder;

ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat("MyPool-%d")
    .setDaemon(true)
    .build();

Don't swallow exceptions


Future<Object> f1 = executor.submit(work::perform);
					

Monitor queue length

BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>(100);

Gauge<Integer> queueLen = queue::size;
metricRegistry.register("queue", queueLen);

Micrometer


var executor = Executors.newFixedThreadPool(20);
ExecutorServiceMetrics.monitor(
		registry, 
		executor, 
		"executors", 
		Tags.of("key", "value"));
					

Other metrics

  • Thread utilization
  • Execution latency
  • Execution duration

parallelStream()


veryLargeList
  .parallelStream()
  .filter(s -> s.startsWith("John"))
  .findAny();
					

parallelStream() don'ts

  • Don't run any I/O
  • Don't use in shared JVM, like application server
  • Don't... just don't use it

ForkJoinPool

Divide and conquer

ForkJoinPool

commonPool()


public class ForkJoinPool extends AbstractExecutorService {

  static final ForkJoinPool common;

  public static ForkJoinPool commonPool() {
    return common;
  }

  //...
}
					

CompletableFuture


						Future<Object> f1 = executor.submit(work::perform);
					
vs.

CompletableFuture<Object> f2 =
  CompletableFuture.supplyAsync(work::perform, executor);
					

CompletableFuture


CompletableFuture<User> userFuture =
  CompletableFuture.supplyAsync(() ->
    loadUser(uuid), executorService);

CompletableFuture<GeoLocation> geoFuture =
  CompletableFuture.supplyAsync(this::trackDevice);
					

CompletableFuture<Recommendations> recommendFuture =
  userFuture.thenCombine(geoFuture,
    (User user, GeoLocation geo) ->
      buildRecommendations(user, geo));
					

CompletableFuture<String> bestFuture =
     recommendFuture.thenCompose(
       (Recommendations r) -> guessBest(r));
					

RxJava / Reactor


Flowable<Tweet> tweets = allTweets();

Flowable<List<String>> batches = tweets
  .filter(tweet -> tweet.hasHashTag("#java"))
  .sample(Duration.ofMillis(100))
  .map(Tweet::contents)
  .buffer(10);
  					

RxJava / Reactor (cont.)


Mono
	.fromCallable(this::blockingMethod)
	.timeout(ofMillis(300))
	.retry(4)
	.timeout(ofSeconds(1))
                	

GPars - data flow


def customerVar = new DataflowVariable<Customer>()
def addressVar = new DataflowVariable<Address>()
def recommendationVar = new DataflowVariable<Recommendation>()

task {
  Customer customer = customerVar.val
  Address address = addressVar.val
  recommendationVar << prepareRecommendation(customer, address)
}

task { customerVar << loadById(42) }

task { addressVar << track() }

println recommendationVar.val
  					

async/await


public CompletableFuture<Boolean> buyItem(
			String itemTypeId, int cost)
    {
        return bank.decrement(cost)
            .thenCompose(result -> {
                if(!result) {
                    return completedFuture(false);
                }
                return inventory.giveItem(itemTypeId)
                	.thenApply(res -> true);
            });
    }
					

Electronic Arts?

github.com/electronicarts/ea-async


    public CompletableFuture<Boolean> buyItem(
    			String itemTypeId, int cost)
    {
        if(!await(bank.decrement(cost))) {
            return completedFuture(false);
        }
        try {
            await(inventory.giveItem(itemTypeId));
            return completedFuture(true);
        } catch (Exception ex) {
            await(bank.refund(cost));
            throw new AppException(ex);
        }
    }
					

😢

java -javaagent:ea-async-1.2.3.jar

Agents (Clojure)


(def agn (agent ()))
(dotimes [i 5]
  (send agn #(conj % i)))
(-> agn deref print)
  					

core.async (Clojure)


(require '[clojure.core.async :as async])
(def ch (async/chan 128))
(async/>!! ch "Foo")
(async/<!! ch)  ;; "Foo"
  					

STM (Clojure)


(def accounts [
    (ref {:name "Alice", :age 20, :balance 100})
    (ref {:name "Bob",   :age 23, :balance 200})
    (ref {:name "Eve",   :age 21, :balance 50})
    (ref {:name "Jane",  :age 24, :balance 0})
])
  					

STM cont.


(defn change-balance-by [amount account]
  (update-in account [:balance] #(+ % amount)))
					

(defn transfer-cash [from to amount]
  (dosync
    (alter from (partial change-balance-by (- amount)))
    (alter to (partial change-balance-by amount))))
					

Actors - example


class Counter extends Actor {

  private[this] var counter = 0
 
  override def receive = {
    case x: Int =>
    	counter += x
        sender() ! counter
  }
}
						

Kotlin - coroutines


GlobalScope.launch {
    delay(1000)
    println("Hello")
}
					

Lightweight threads


for (i in 1..1_000_000)
    GlobalScope.launch {
        someWork()
    }
					

suspend fun buyItem(itemTypeId: String, cost: Int): Boolean {
    if(bank.decrement(cost)) {
        return false
    }
    try {
        inventory.giveItem(itemTypeId)
        return true
    } catch (ex: Exception) {
        bank.refund(cost)
        throw AppException(ex);
    }
}
					

suspend fun decrement(...)
suspend fun giveItem(...)
suspend fun refund(...)
					

Project Loom

wiki.openjdk.java.net/display/loom/Main

java.lang.Continuation

Let's get practical!

Spring Boot 2.0


public interface ReactiveUserRepository
		extends ReactiveCrudRepository<User, String> {

    Mono<User> findById(long id);

}
					

Infinite stream with @Tailable


public interface ReactiveUserRepository
		extends ReactiveCrudRepository<User, String> {

    @Tailable
    Flux<User> findBy();

}
					

Spring WebFlux


@Bean
RouterFunction<ServerResponse> mapping(
			ReactiveUserRepository repository) {
    return route(
            GET("/users/{id}"), request -> Mono
                    .justOrEmpty(request.pathVariable("id"))
                    .map(Long::parseLong)
                    .flatMap(id -> repository.findByid(id)
                    .flatMap(p -> ok().body(fromObject(p))
                    .switchIfEmpty(notFound().build()))
    );
}
					

Useful utilities

LongAdder


LongAdder adder = new LongAdder();
adder.increment();
adder.add(42);
adder.sum();
adder.sumThenReset();
					

AtomicInteger

Good ol' compareAndSet()


AtomicInteger atomic = new AtomicInteger();
//...
int cur;
do {
    cur = atomic.get();
} while (!atomic.compareAndSet(cur, cur * 2));
					

AtomicInteger

updateAndGet()


AtomicInteger atomic = new AtomicInteger();
//...
atomic.updateAndGet(x -> x * 2);
					

java.util.Map

merge()


Map<String, Integer> wordCount = new ConcurrentHashMap<>();

wordCount.merge("Foo", 1, (x, y) -> x + y);
wordCount.merge("Bar", 1, (x, y) -> x + y);
wordCount.merge("Foo", 1, (x, y) -> x + y);
wordCount.merge("Foo", 1, (x, y) -> x + y);
					

Results:


{Foo=3, Bar=1}
					

java.util.Map

merge() implementation


V merge(K key, V value, BiFunction<V, V, V> mapping) {
    V oldValue = get(key);
    V newValue = (oldValue == null) ?
	   value :
	   mapping.apply(oldValue, value);
    put(key, newValue);
    return newValue;
}
					

ConcurrentHashMap

Summary

Do you even need concurrency?

Thank you


nurkiewicz.github.io/talks/2018/concurrency

www.nurkiewicz.com | @tnurkiewicz | nurkiewicz@gmail.com