Tomasz Nurkiewicz
www.nurkiewicz.com | @tnurkiewicz | nurkiewicz@gmail.comThread thread = new Thread(
() -> doSomeWork(),
"MyWorker");
thread.setDaemon(true);
thread.start();
class Worker implements Runnable {
private final BlockingQueue<Work> queue;
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Work work = queue.take();
work.perform();
}
} catch (InterruptedException e) {
//interrupting Worker
}
}
}
Thread thread = new Thread(new Worker());
//...
thread.interrupt();
ExecutorService
ExecutorService executor = Executors.newSingleThreadExecutor();
Work work = //...
executor.submit(work::perform);
//...
executor.shutdownNow();
@FunctionalInterface
interface Work {
void perform()
};
ExecutorService executor =
Executors.newFixedThreadPool(10);
or
ExecutorService executor = new ThreadPoolExecutor(
10, // core size
10, // max size
0L, MILLISECONDS, // keep alive
new LinkedBlockingQueue<>(100), // work queue
threadFactory,
rejectedHandler
);
new LinkedBlockingQueue<>(100)
RejectedExecution Handler
RejectedExecutionHandler rejectedHandler =
(runnable, executor) ->
log.warn("Rejected task {}", runnable);
ThreadFactory
import com.google.common.util.concurrent.ThreadFactoryBuilder;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("MyPool-%d")
.setDaemon(true)
.build();
Future<Object> f1 = executor.submit(work::perform);
BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>(100);
Gauge<Integer> queueLen = queue::size;
metricRegistry.register("queue", queueLen);
var executor = Executors.newFixedThreadPool(20);
ExecutorServiceMetrics.monitor(
registry,
executor,
"executors",
Tags.of("key", "value"));
parallelStream()
veryLargeList
.parallelStream()
.filter(s -> s.startsWith("John"))
.findAny();
parallelStream()
don'tsForkJoinPool
Divide and conquer
ForkJoinPool
commonPool()
public class ForkJoinPool extends AbstractExecutorService {
static final ForkJoinPool common;
public static ForkJoinPool commonPool() {
return common;
}
//...
}
CompletableFuture
Future<Object> f1 = executor.submit(work::perform);
vs.
CompletableFuture<Object> f2 =
CompletableFuture.supplyAsync(work::perform, executor);
CompletableFuture
CompletableFuture<User> userFuture =
CompletableFuture.supplyAsync(() ->
loadUser(uuid), executorService);
CompletableFuture<GeoLocation> geoFuture =
CompletableFuture.supplyAsync(this::trackDevice);
CompletableFuture<Recommendations> recommendFuture =
userFuture.thenCombine(geoFuture,
(User user, GeoLocation geo) ->
buildRecommendations(user, geo));
CompletableFuture<String> bestFuture =
recommendFuture.thenCompose(
(Recommendations r) -> guessBest(r));
Flowable<Tweet> tweets = allTweets();
Flowable<List<String>> batches = tweets
.filter(tweet -> tweet.hasHashTag("#java"))
.sample(Duration.ofMillis(100))
.map(Tweet::contents)
.buffer(10);
Mono
.fromCallable(this::blockingMethod)
.timeout(ofMillis(300))
.retry(4)
.timeout(ofSeconds(1))
def customerVar = new DataflowVariable<Customer>()
def addressVar = new DataflowVariable<Address>()
def recommendationVar = new DataflowVariable<Recommendation>()
task {
Customer customer = customerVar.val
Address address = addressVar.val
recommendationVar << prepareRecommendation(customer, address)
}
task { customerVar << loadById(42) }
task { addressVar << track() }
println recommendationVar.val
async/await
public CompletableFuture<Boolean> buyItem(
String itemTypeId, int cost)
{
return bank.decrement(cost)
.thenCompose(result -> {
if(!result) {
return completedFuture(false);
}
return inventory.giveItem(itemTypeId)
.thenApply(res -> true);
});
}
public CompletableFuture<Boolean> buyItem(
String itemTypeId, int cost)
{
if(!await(bank.decrement(cost))) {
return completedFuture(false);
}
try {
await(inventory.giveItem(itemTypeId));
return completedFuture(true);
} catch (Exception ex) {
await(bank.refund(cost));
throw new AppException(ex);
}
}
java -javaagent:ea-async-1.2.3.jar
(def agn (agent ()))
(dotimes [i 5]
(send agn #(conj % i)))
(-> agn deref print)
core.async
(Clojure)
(require '[clojure.core.async :as async])
(def ch (async/chan 128))
(async/>!! ch "Foo")
(async/<!! ch) ;; "Foo"
(def accounts [
(ref {:name "Alice", :age 20, :balance 100})
(ref {:name "Bob", :age 23, :balance 200})
(ref {:name "Eve", :age 21, :balance 50})
(ref {:name "Jane", :age 24, :balance 0})
])
(defn change-balance-by [amount account]
(update-in account [:balance] #(+ % amount)))
(defn transfer-cash [from to amount]
(dosync
(alter from (partial change-balance-by (- amount)))
(alter to (partial change-balance-by amount))))
class Counter extends Actor {
private[this] var counter = 0
override def receive = {
case x: Int =>
counter += x
sender() ! counter
}
}
GlobalScope.launch {
delay(1000)
println("Hello")
}
for (i in 1..1_000_000)
GlobalScope.launch {
someWork()
}
suspend fun buyItem(itemTypeId: String, cost: Int): Boolean {
if(bank.decrement(cost)) {
return false
}
try {
inventory.giveItem(itemTypeId)
return true
} catch (ex: Exception) {
bank.refund(cost)
throw AppException(ex);
}
}
suspend fun decrement(...)
suspend fun giveItem(...)
suspend fun refund(...)
java.lang.Continuation
public interface ReactiveUserRepository
extends ReactiveCrudRepository<User, String> {
Mono<User> findById(long id);
}
@Tailable
public interface ReactiveUserRepository
extends ReactiveCrudRepository<User, String> {
@Tailable
Flux<User> findBy();
}
@Bean
RouterFunction<ServerResponse> mapping(
ReactiveUserRepository repository) {
return route(
GET("/users/{id}"), request -> Mono
.justOrEmpty(request.pathVariable("id"))
.map(Long::parseLong)
.flatMap(id -> repository.findByid(id)
.flatMap(p -> ok().body(fromObject(p))
.switchIfEmpty(notFound().build()))
);
}
LongAdder
LongAdder adder = new LongAdder();
adder.increment();
adder.add(42);
adder.sum();
adder.sumThenReset();
AtomicInteger
compareAndSet()
AtomicInteger atomic = new AtomicInteger();
//...
int cur;
do {
cur = atomic.get();
} while (!atomic.compareAndSet(cur, cur * 2));
AtomicInteger
updateAndGet()
AtomicInteger atomic = new AtomicInteger();
//...
atomic.updateAndGet(x -> x * 2);
java.util.Map
merge()
Map<String, Integer> wordCount = new ConcurrentHashMap<>();
wordCount.merge("Foo", 1, (x, y) -> x + y);
wordCount.merge("Bar", 1, (x, y) -> x + y);
wordCount.merge("Foo", 1, (x, y) -> x + y);
wordCount.merge("Foo", 1, (x, y) -> x + y);
Results:
{Foo=3, Bar=1}
java.util.Map
merge()
implementation
V merge(K key, V value, BiFunction<V, V, V> mapping) {
V oldValue = get(key);
V newValue = (oldValue == null) ?
value :
mapping.apply(oldValue, value);
put(key, newValue);
return newValue;
}
ConcurrentHashMap
Do you even need concurrency?
nurkiewicz.github.io/talks/2018/concurrency
www.nurkiewicz.com | @tnurkiewicz | nurkiewicz@gmail.com