Tomasz Nurkiewicz
[...] a very particular set of skills, skills [...] acquired over a very long career. Skills that make me a nightmare for
people like you
other developers
Liam Neeson on reactive programming
				* not really proud about the complex
 part
I wrote a book with the word reactive
 in the title
					(2016)
				May you live in interesting timesChinese curse
May you support interesting codebase
User user = ws.findUserByName(name);
if (!db.contains(user.getSsn())) {
    db.save(user);
}
List<Item> cart = loadCart(user);
double total = cart.stream()
		.mapToDouble(Item::getPrice)
		.sum();
UUID id = pay(total);
cart.forEach(item -> sendEmail(item, id));
					
				
						     User  user = ws.findUserByName(name)
										
					
						Mono<User> user = ws.findUserByName(name)
										
				
     boolean  contains = db.contains(user.getSsn())
										
					
Mono<Boolean> contains = db.contains(user.getSsn())
					
				
						if (!db.contains(user.getSsn())) {
						    db.save(user);
						}
										
					
						user -> db
				        .contains(user.getSsn())      //Mono<Bool>, true/false
				        .filter(contains -> contains) //Mono<Bool>, true/empty
				        .switchIfEmpty(db.save(user)) //if empty,
				                                      //replace with db.save()
					
				
						User user = ws.findUserByName(name);
						if (!db.contains(user.getSsn())) {
						    db.save(user);
						}
						List<Item> cart = loadCart(user);
						double total = cart.stream()
						        .mapToDouble(Item::getPrice)
						        .sum();
						UUID id = pay(total);
						cart.forEach(item -> sendEmail(item, id));
					
				
ws
    .findUserByName(name)
    .flatMap(user -> db
            .contains(user.getSsn())
            .filter(contains -> contains)
            .switchIfEmpty(db.save(user))
    )
    .flatMap(user -> loadCart(user)
            .collectList()
            .flatMap(cart -> {
                double total = cart.stream()
                        .mapToDouble(Item::getPrice)
                        .sum();
                return pay(total)
                        .map(uuid -> Pair.of(cart, uuid));
            }))
    .flatMapMany(pair -> Flux
            .fromIterable(pair.getLeft())
            .map(item -> Pair.of(item, pair.getRight())))
    .flatMap(pair -> sendEmail(pair.getLeft(), pair.getRight()))
    				
				
					
						User user = ws.findUserByName(name);
						if (!db.contains(user.getSsn())) {
						    db.save(user);
						}
						List<Item> cart = loadCart(user);
						double total = cart.stream()
						        .mapToDouble(Item::getPrice)
						        .sum();
						UUID id = pay(total);
						cart.forEach(item -> sendEmail(item, id));
					
				
ws
    .findUserByName(name)
    .flatMap(user -> db
            .contains(user.getSsn())
            .filter(contains -> contains)
            .switchIfEmpty(db.save(user))
    )
    .flatMap(user -> loadCart(user)
            .collectList()
            .flatMap(cart -> {
                double total = cart.stream()
                        .mapToDouble(Item::getPrice)
                        .sum();
                return pay(total)
                        .map(uuid -> Pair.of(cart, uuid));
            }))
    .flatMapMany(pair -> Flux
            .fromIterable(pair.getLeft())
            .map(item -> Pair.of(item, pair.getRight())))
    .flatMap(pair -> sendEmail(pair.getLeft(), pair.getRight()))
    				
				Are Semigroup, Monoid, Monad, Functor, Kleisli, and Yoneda pervasive in your domain model?www.innoq.com/en/blog/the-language-of-maths-is-not-the-language-of-your-business/
Unless your core domain is mathematics, category theory is not the language used by your domain experts.www.innoq.com/en/blog/the-language-of-maths-is-not-the-language-of-your-business/
Good luck getting your domain experts to understand the language introduced by that abstraction.www.innoq.com/en/blog/the-language-of-maths-is-not-the-language-of-your-business/
AreMonoandFluxpervasive in your domain model?
Unless your core domain is infectious diseases, Reactor is not the language used by your domain experts.
							en.wikipedia.org/wiki/Infectious_mononucleosis
							
							en.wikipedia.org/wiki/Dysentery
						
Simple?
Monad transformers are reducing boilerplate
Tested?
Remember that code with badly written tests can be more harmful than code without tests.
Enable 10 other developers
				
				
				
				
 point
				
?Tomcat, 100 threads (L), 100 ms/request (W)
1K request/second (λ)
on a laptop
 confirmed!
![]()  | 
							![]()  | 
						
java.sql.SQLTransientConnectionException: HikariPool-1 -
		Connection is not available,
		request timed out after 30003ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:555) ~[HikariCP-2.4.7.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:188) ~[HikariCP-2.4.7.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:147) ~[HikariCP-2.4.7.jar:na]
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:99) ~[HikariCP-2.4.7.jar:na]
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:211) ~[spring-jdbc-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373) ~[spring-tx-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:447) ~[spring-tx-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:277) ~[spring-tx-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) ~[spring-tx-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    				
    				www.nurkiewicz.com/2017/03/beware-of-slow-transaction-callbacks-in.html
				
"http-nio-9099-exec-2@6415" daemon prio=5 tid=0x28 nid=NA waiting
  java.lang.Thread.State: WAITING
    [...4 frames omitted...]
    at org.apache.activemq.transport.FutureResponse.getResult
    at o.a.a.transport.ResponseCorrelator.request
    at o.a.a.ActiveMQConnection.syncSendPacket
    at o.a.a.ActiveMQConnection.syncSendPacket
    at o.a.a.ActiveMQSession.syncSendPacket
    at o.a.a.ActiveMQMessageProducer.<init>
    at o.a.a.ActiveMQSession.createProducer
    [...5  frames omitted...]
    at org.springframework.jms.core.JmsTemplate.send
    at com.nurkiewicz.Sample$sendMessageAfterCommit$1.afterCommit
    at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit
    at o.s.t.s.TransactionSynchronizationUtils.triggerAfterCommit
    at o.s.t.s.AbstractPlatformTransactionManager.triggerAfterCommit
    at o.s.t.s.AbstractPlatformTransactionManager.processCommit
    at o.s.t.s.AbstractPlatformTransactionManager.commit
    [...73 frames omitted...]
					
				stack trace is meaningless when trying to follow a request.
It is difficult to follow a request as events and callbacks are processed [...]
[...] unhandled exceptions, and incorrectly handled state changes [...] These types of issues have proven to be quite difficult to debug
					
						Hooks.onOperatorDebug()
					
				
flux
	.map(this::foo)
	.map(this::bar)
	.map(this::buzz)
					
				
java.lang.NullPointerException:
		The mapper function returned a null value.
	at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:64)
	at io.reactivex.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:55)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.internal.subscribers.LambdaSubscriber.request(LambdaSubscriber.java:111)
	at io.reactivex.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:219)
	at io.reactivex.internal.operators.flowable.FlowableInternalHelper$RequestMax.accept(FlowableInternalHelper.java:215)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:49)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
	at io.reactivex.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34)
	at io.reactivex.Flowable.subscribe(Flowable.java:12986)
	at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
	at io.reactivex.Flowable.subscribe(Flowable.java:12986)
	at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
	at io.reactivex.Flowable.subscribe(Flowable.java:12986)
	at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
	at io.reactivex.Flowable.subscribe(Flowable.java:12986)
	at io.reactivex.Flowable.subscribe(Flowable.java:12922)
	at io.reactivex.Flowable.subscribe(Flowable.java:12843)
					
				
java.lang.NullPointerException:
		The mapper returned a null value.
	at java.util.Objects.requireNonNull(Objects.java:228)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
	at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:91)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
	at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:89)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6571)
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:6738)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6564)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6528)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6498)
					
				TimeoutException
play.api.http.HttpErrorHandlerExceptions$$anon$1:
		Execution exception[[AskTimeoutException: 
		Ask timed out on
		[Actor[akka://application/user/$a#-948062154]]
		after [60000 ms]]]
    at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:265) ~[play_2.11-2.4.3.jar:2.4.3]
    at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:191) ~[play_2.11-2.4.3.jar:2.4.3]
    at play.api.GlobalSettings$class.onError(GlobalSettings.scala:179) [play_2.11-2.4.3.jar:2.4.3]
    at play.api.DefaultGlobal$.onError(GlobalSettings.scala:212) [play_2.11-2.4.3.jar:2.4.3]
    at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:94) [play_2.11-2.4.3.jar:2.4.3]
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:158) [play-netty-server_2.11-2.4.3.jar:2.4.3]
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:155) [play-netty-server_2.11-2.4.3.jar:2.4.3]
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) [scala-library-2.11.7.jar:na]
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216) [scala-library-2.11.7.jar:na]
    at scala.util.Try$.apply(Try.scala:192) [scala-library-2.11.7.jar:na]
    at scala.util.Failure.recover(Try.scala:216) [scala-library-2.11.7.jar:na]
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) [scala-library-2.11.7.jar:na]
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) [scala-library-2.11.7.jar:na]
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) [scala-library-2.11.7.jar:na]
    at play.api.libs.iteratee.Execution$trampoline$.executeScheduled(Execution.scala:109) [play-iteratees_2.11-2.4.3.jar:2.4.3]
    at play.api.libs.iteratee.Execution$trampoline$.execute(Execution.scala:71) [play-iteratees_2.11-2.4.3.jar:2.4.3]
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) [scala-library-2.11.7.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) [scala-library-2.11.7.jar:na]
    at scala.concurrent.Promise$class.complete(Promise.scala:55) [scala-library-2.11.7.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) [scala-library-2.11.7.jar:na]
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) [scala-library-2.11.7.jar:na]
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) [scala-library-2.11.7.jar:na]
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) [scala-library-2.11.7.jar:na]
    at play.core.j.HttpExecutionContext$$anon$2.run(HttpExecutionContext.scala:40) [play_2.11-2.4.3.jar:2.4.3]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [akka-actor_2.11-2.3.13.jar:na]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [akka-actor_2.11-2.3.13.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://application/user/$a#-948062154]] after [60000 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) ~[akka-actor_2.11-2.3.13.jar:na]
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) ~[akka-actor_2.11-2.3.13.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) ~[scala-library-2.11.7.jar:na]
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) ~[scala-library-2.11.7.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) ~[scala-library-2.11.7.jar:na]
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) ~[akka-actor_2.11-2.3.13.jar:na]
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) ~[akka-actor_2.11-2.3.13.jar:na]
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) ~[akka-actor_2.11-2.3.13.jar:na]
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) ~[akka-actor_2.11-2.3.13.jar:na]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_31]
					
					github.com/CogComp/open-eval/issues/186
				
				parallel() vsflatMap() vsconcatMap() vsconcatMapEager()
				flatMap(..., 128)
				an approach to software architecture that decomposes a complex, event-driven application into a set of stages connected by queuesen.wikipedia.org/wiki/Staged_event-driven_architecture
mailboxSize in Akka 2?slow, inaccurate, impossible, silly
letitcrash.com/post/17707262394/why-no-mailboxsize-in-akka-2
					
import io.micrometer.core.instrument.Timer;
var timer = Metrics.timer("timer");
//...
User user = timer.recordCallable(this::loadUser);
					
				
						Mono<User> user = timer.recordCallable(this::loadUserAsync);
					
				
Mono<User> Mono
    .fromCallable(System::currentTimeMillis)
    .flatMap(start ->
        loadUserAsync()
            .doOnSuccess(response ->
                timer.record(
                    currentTimeMillis() - start, MILLISECONDS))
    );
}
					
				
						loadUserAsync()
							.tag("operation", "loadUser")
							.metrics()
					
				
@GetMapping("/users/{id}")
ResponseEntity<User> get(@PathVariable long id) {
    User user = repository.findById(id);
    if(user != null) {
        return ok(user);
    } else {
        return notFound();
    }
}
					
				
@GetMapping("/users/{id}")
ResponseEntity<User> get(@PathVariable long id) {
    return repository
    		.findById(id)
    		.map(user -> ok(user))
    		.orElse(notFound());
}
					
				
@Bean
RouterFunction<ServerResponse> route() {
    return route(
            GET("/users/{id}"), request -> Mono
                    .justOrEmpty(request.pathVariable("id"))
                    .map(Long::parseLong)
                    .flatMap(id -> repository.findById(id)
                    .flatMap(p -> ok().body(fromObject(p))
                    .switchIfEmpty(notFound().build()))
    );
}
					
				the explosion of latency-inducing microserviceswww.infoq.com/articles/Designing-Implementing-Using-Reactive-APIs