Tomasz Nurkiewicz
[...] a very particular set of skills, skills [...] acquired over a very long career. Skills that make me a nightmare for
people like you
other developers
Liam Neeson on reactive programming
* not really proud about the complex
part
I wrote a book with the word reactive
in the title
(2016)
May you live in interesting timesChinese curse
May you support interesting codebase
User user = ws.findUserByName(name);
if (!db.contains(user.getSsn())) {
db.save(user);
}
List<Item> cart = loadCart(user);
double total = cart.stream()
.mapToDouble(Item::getPrice)
.sum();
UUID id = pay(total);
cart.forEach(item -> sendEmail(item, id));
User user = ws.findUserByName(name)
Mono<User> user = ws.findUserByName(name)
boolean contains = db.contains(user.getSsn())
Mono<Boolean> contains = db.contains(user.getSsn())
if (!db.contains(user.getSsn())) {
db.save(user);
}
user -> db
.contains(user.getSsn()) //Mono<Bool>, true/false
.filter(contains -> contains) //Mono<Bool>, true/empty
.switchIfEmpty(db.save(user)) //if empty,
//replace with db.save()
User user = ws.findUserByName(name);
if (!db.contains(user.getSsn())) {
db.save(user);
}
List<Item> cart = loadCart(user);
double total = cart.stream()
.mapToDouble(Item::getPrice)
.sum();
UUID id = pay(total);
cart.forEach(item -> sendEmail(item, id));
ws
.findUserByName(name)
.flatMap(user -> db
.contains(user.getSsn())
.filter(contains -> contains)
.switchIfEmpty(db.save(user))
)
.flatMap(user -> loadCart(user)
.collectList()
.flatMap(cart -> {
double total = cart.stream()
.mapToDouble(Item::getPrice)
.sum();
return pay(total)
.map(uuid -> Pair.of(cart, uuid));
}))
.flatMapMany(pair -> Flux
.fromIterable(pair.getLeft())
.map(item -> Pair.of(item, pair.getRight())))
.flatMap(pair -> sendEmail(pair.getLeft(), pair.getRight()))
User user = ws.findUserByName(name);
if (!db.contains(user.getSsn())) {
db.save(user);
}
List<Item> cart = loadCart(user);
double total = cart.stream()
.mapToDouble(Item::getPrice)
.sum();
UUID id = pay(total);
cart.forEach(item -> sendEmail(item, id));
ws
.findUserByName(name)
.flatMap(user -> db
.contains(user.getSsn())
.filter(contains -> contains)
.switchIfEmpty(db.save(user))
)
.flatMap(user -> loadCart(user)
.collectList()
.flatMap(cart -> {
double total = cart.stream()
.mapToDouble(Item::getPrice)
.sum();
return pay(total)
.map(uuid -> Pair.of(cart, uuid));
}))
.flatMapMany(pair -> Flux
.fromIterable(pair.getLeft())
.map(item -> Pair.of(item, pair.getRight())))
.flatMap(pair -> sendEmail(pair.getLeft(), pair.getRight()))
Are Semigroup, Monoid, Monad, Functor, Kleisli, and Yoneda pervasive in your domain model?www.innoq.com/en/blog/the-language-of-maths-is-not-the-language-of-your-business/
Unless your core domain is mathematics, category theory is not the language used by your domain experts.www.innoq.com/en/blog/the-language-of-maths-is-not-the-language-of-your-business/
Good luck getting your domain experts to understand the language introduced by that abstraction.www.innoq.com/en/blog/the-language-of-maths-is-not-the-language-of-your-business/
AreMono
andFlux
pervasive in your domain model?
Unless your core domain is infectious diseases, Reactor is not the language used by your domain experts.
en.wikipedia.org/wiki/Infectious_mononucleosis
en.wikipedia.org/wiki/Dysentery
Simple?
Monad transformers are reducing boilerplate
Tested?
Remember that code with badly written tests can be more harmful than code without tests.
Enable 10 other developers
Tomcat, 100 threads (L), 100 ms/request (W)
1K request/second (λ)
on a laptop
confirmed!
java.sql.SQLTransientConnectionException: HikariPool-1 -
Connection is not available,
request timed out after 30003ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:555) ~[HikariCP-2.4.7.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:188) ~[HikariCP-2.4.7.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:147) ~[HikariCP-2.4.7.jar:na]
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:99) ~[HikariCP-2.4.7.jar:na]
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:211) ~[spring-jdbc-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373) ~[spring-tx-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:447) ~[spring-tx-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:277) ~[spring-tx-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) ~[spring-tx-4.3.4.RELEASE.jar:4.3.4.RELEASE]
www.nurkiewicz.com/2017/03/beware-of-slow-transaction-callbacks-in.html
"http-nio-9099-exec-2@6415" daemon prio=5 tid=0x28 nid=NA waiting
java.lang.Thread.State: WAITING
[...4 frames omitted...]
at org.apache.activemq.transport.FutureResponse.getResult
at o.a.a.transport.ResponseCorrelator.request
at o.a.a.ActiveMQConnection.syncSendPacket
at o.a.a.ActiveMQConnection.syncSendPacket
at o.a.a.ActiveMQSession.syncSendPacket
at o.a.a.ActiveMQMessageProducer.<init>
at o.a.a.ActiveMQSession.createProducer
[...5 frames omitted...]
at org.springframework.jms.core.JmsTemplate.send
at com.nurkiewicz.Sample$sendMessageAfterCommit$1.afterCommit
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit
at o.s.t.s.TransactionSynchronizationUtils.triggerAfterCommit
at o.s.t.s.AbstractPlatformTransactionManager.triggerAfterCommit
at o.s.t.s.AbstractPlatformTransactionManager.processCommit
at o.s.t.s.AbstractPlatformTransactionManager.commit
[...73 frames omitted...]
stack trace is meaningless when trying to follow a request.
It is difficult to follow a request as events and callbacks are processed [...]
[...] unhandled exceptions, and incorrectly handled state changes [...] These types of issues have proven to be quite difficult to debug
Hooks.onOperatorDebug()
flux
.map(this::foo)
.map(this::bar)
.map(this::buzz)
java.lang.NullPointerException:
The mapper function returned a null value.
at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:64)
at io.reactivex.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:55)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscribers.LambdaSubscriber.request(LambdaSubscriber.java:111)
at io.reactivex.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:219)
at io.reactivex.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:215)
at io.reactivex.internal.subscribers.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:49)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
at io.reactivex.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.Flowable.subscribe(Flowable.java:12922)
at io.reactivex.Flowable.subscribe(Flowable.java:12843)
java.lang.NullPointerException:
The mapper returned a null value.
at java.util.Objects.requireNonNull(Objects.java:228)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:91)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:89)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
at reactor.core.publisher.Flux.subscribe(Flux.java:6571)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:6738)
at reactor.core.publisher.Flux.subscribe(Flux.java:6564)
at reactor.core.publisher.Flux.subscribe(Flux.java:6528)
at reactor.core.publisher.Flux.subscribe(Flux.java:6498)
TimeoutException
play.api.http.HttpErrorHandlerExceptions$$anon$1:
Execution exception[[AskTimeoutException:
Ask timed out on
[Actor[akka://application/user/$a#-948062154]]
after [60000 ms]]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:265) ~[play_2.11-2.4.3.jar:2.4.3]
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:191) ~[play_2.11-2.4.3.jar:2.4.3]
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:179) [play_2.11-2.4.3.jar:2.4.3]
at play.api.DefaultGlobal$.onError(GlobalSettings.scala:212) [play_2.11-2.4.3.jar:2.4.3]
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:94) [play_2.11-2.4.3.jar:2.4.3]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:158) [play-netty-server_2.11-2.4.3.jar:2.4.3]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:155) [play-netty-server_2.11-2.4.3.jar:2.4.3]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) [scala-library-2.11.7.jar:na]
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216) [scala-library-2.11.7.jar:na]
at scala.util.Try$.apply(Try.scala:192) [scala-library-2.11.7.jar:na]
at scala.util.Failure.recover(Try.scala:216) [scala-library-2.11.7.jar:na]
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) [scala-library-2.11.7.jar:na]
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) [scala-library-2.11.7.jar:na]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) [scala-library-2.11.7.jar:na]
at play.api.libs.iteratee.Execution$trampoline$.executeScheduled(Execution.scala:109) [play-iteratees_2.11-2.4.3.jar:2.4.3]
at play.api.libs.iteratee.Execution$trampoline$.execute(Execution.scala:71) [play-iteratees_2.11-2.4.3.jar:2.4.3]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) [scala-library-2.11.7.jar:na]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) [scala-library-2.11.7.jar:na]
at scala.concurrent.Promise$class.complete(Promise.scala:55) [scala-library-2.11.7.jar:na]
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) [scala-library-2.11.7.jar:na]
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) [scala-library-2.11.7.jar:na]
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) [scala-library-2.11.7.jar:na]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) [scala-library-2.11.7.jar:na]
at play.core.j.HttpExecutionContext$$anon$2.run(HttpExecutionContext.scala:40) [play_2.11-2.4.3.jar:2.4.3]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [akka-actor_2.11-2.3.13.jar:na]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [akka-actor_2.11-2.3.13.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://application/user/$a#-948062154]] after [60000 ms]
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) ~[akka-actor_2.11-2.3.13.jar:na]
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) ~[akka-actor_2.11-2.3.13.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) ~[scala-library-2.11.7.jar:na]
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) ~[scala-library-2.11.7.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) ~[scala-library-2.11.7.jar:na]
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) ~[akka-actor_2.11-2.3.13.jar:na]
at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) ~[akka-actor_2.11-2.3.13.jar:na]
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) ~[akka-actor_2.11-2.3.13.jar:na]
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) ~[akka-actor_2.11-2.3.13.jar:na]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_31]
github.com/CogComp/open-eval/issues/186
parallel()
vsflatMap()
vsconcatMap()
vsconcatMapEager()
flatMap(..., 128)
an approach to software architecture that decomposes a complex, event-driven application into a set of stages connected by queuesen.wikipedia.org/wiki/Staged_event-driven_architecture
mailboxSize
in Akka 2?slow, inaccurate, impossible, silly
letitcrash.com/post/17707262394/why-no-mailboxsize-in-akka-2
import io.micrometer.core.instrument.Timer;
var timer = Metrics.timer("timer");
//...
User user = timer.recordCallable(this::loadUser);
Mono<User> user = timer.recordCallable(this::loadUserAsync);
Mono<User> Mono
.fromCallable(System::currentTimeMillis)
.flatMap(start ->
loadUserAsync()
.doOnSuccess(response ->
timer.record(
currentTimeMillis() - start, MILLISECONDS))
);
}
loadUserAsync()
.tag("operation", "loadUser")
.metrics()
@GetMapping("/users/{id}")
ResponseEntity<User> get(@PathVariable long id) {
User user = repository.findById(id);
if(user != null) {
return ok(user);
} else {
return notFound();
}
}
@GetMapping("/users/{id}")
ResponseEntity<User> get(@PathVariable long id) {
return repository
.findById(id)
.map(user -> ok(user))
.orElse(notFound());
}
@Bean
RouterFunction<ServerResponse> route() {
return route(
GET("/users/{id}"), request -> Mono
.justOrEmpty(request.pathVariable("id"))
.map(Long::parseLong)
.flatMap(id -> repository.findById(id)
.flatMap(p -> ok().body(fromObject(p))
.switchIfEmpty(notFound().build()))
);
}
the explosion of latency-inducing microserviceswww.infoq.com/articles/Designing-Implementing-Using-Reactive-APIs