Reactive Programming with Reactor 3
01.Reactor 3
Reactor 3 ๋ Reactive Streams ๋ฅผ ์คํฉ์ ๊ธฐ๋ฐ์ผ๋ก ๊ตฌ์ถ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ก JVM์์ ๋ฆฌ์ํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ ํจ๋ฌ๋ค์์ ์น๊ณํฉ๋๋ค.
์ด๋ ๋ฐ์ดํฐ ํ๋ฆ (data flows) ์ ๋ณํ ์ ํ์ ์ค์ ์ ๋ ๋น๋๊ธฐํ๋ก๊ทธ๋๋ฐ์ด๋ค.
๋ฆฌ์กํฐ๋ธ ์ ์ธ๋ฌธ
๋ฆฌ์กํฐ๋ธ ์์คํ ์ผ๋ก ๊ตฌ์ถ๋ ์์คํ ์
์ ์ฐํจ
,๋ฎ์ ๊ฒฐํฉ๋
,ํ์ฅ์ฑ
์ ์ ์งํด์ผ ํ๋ค.
์ด ์์คํ ์ ์ฅ์ ์ ๋ํ ๊ฐํ ๋ด์ฑ์ ๊ฐ์ง๊ณ , ๋น๋ก ์ฅ์ ๊ฐ ๋ฐ์ํ๋๋ผ๋ ๋ฐ์ด๋ ์ฅ์ ๋ณต๊ตฌ์ฑ์ ํน์ง์ ๊ฐ์ง๊ณ ์์ต๋๋ค.
๋ํ ๋์ ์๋ต์ฑ์ ๊ฐ์ ธ ์ฌ์ฉ์์๊ฒ ํ์ฌ๊ธ ์ํธ์ ํผ๋๋ฐฑ์ ์ ๊ณตํฉ๋๋ค.
์ ๋ฆฌ์กํฐ๋ธ ์ธ๊ฐ?
๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ๋น๋๊ธฐ ์ฒ๋ฆฌ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถํ๊ธฐ ์ํด ์ ์ธ์ ์ฝ๋(aka. ํจ์ํ ํ๋ก๊ทธ๋๋ฐ) ๋ฅผ ์ฌ์ฉํ๋ ์๋ก์ด ํจ๋ฌ๋ค์์ด๋ค.
๋ฐ์ดํฐ๊ฐ ์ฌ์ฉ ๊ฐ๋ฅํ๊ฒ ๋ ์ค๋น๊ฐ ๋๋ฉด Consumer ์๊ฒ Push ๋๋ ์ด๋ฒคํธ ์ฒ๋ฆฌ ๊ธฐ๋ฐ์ ๋ชจ๋ธ์
๋๋ค. (์ด๋ฒคํธ์ ๋น๋๊ธฐ ์ํ์ค๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค.)
์ด๊ฒ์ ๋ฆฌ์์ค๋ฅผ ๋ณด๋ค ํจ์จ์ ์ผ๋ก ์ฌ์ฉํ๊ธฐ ์ํด low-level ์ ๋์์ ์ฒ๋ฆฌ ํน์ ๋ณ๋ ฌ ์ฝ๋๋ฅผ ์์ฑํ๋๋ฐ ํฐ ์ด๋ ค์ ์์ด
์ดํ๋ฆฌ์ผ์ด์
์ ์์ฉ ๊ฐ๋ฅํ ์ฒ๋ฆฌ ๋ฒ์๋ฅผ ์ฆ๊ฐ์ํค๋ ํจ๊ณผ๋ฅผ ๊ฐ์ ธ์ต๋๋ค.
๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ์์ ํ ๋น๋๊ธฐ ํน์ ๋
ผ ๋ธ๋กํน์ ์ค์ฌ์ ๊ธฐ๋ฐ์ผ๋ก JDK ์์ ๋น๋๊ธฐ ์ฝ๋๋ฅผ ์ํ ๊ฐ๋ฅํ๊ฒ ํ๋ฉฐ,
๊ธฐ์กด Callback ๊ธฐ๋ฐ์ API ์ค๊ณ๋ Future ์ฌ์ฉ์ ๋ํ ๋์์ผ๋ก ์ฌ์ฉ ๊ฐ๋ฅํ๋ค.
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ
Reactive Stream ์คํฉ์ JVM ์์ Reactive Programming ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ํ์คํ ํ๊ธฐ ์ํ ์ฃผ๋์ ๋ ธ๋ ฅ์ด๋ฉฐ,
๋ ์ค์ํ๊ฒ๋ ์๋ก ๋ค๋ฅธ ํ๊ฒฝ์์ ์ํธ ์ด์ ๊ฐ๋ฅํ๋๋ก(ํธํ ๊ฐ๋ฅ) ์๋ํ๋ ๋ฐฉ์์ ์ง์ ํ๋ค.
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๋ค์ 4๊ฐ์ ์ธํฐํ์ด์ค๋ก ์ ์๋์ด ์๋ค.
- Subscriber
- Publisher
- Subscription
- Processor
๋ํ์ ์ ๊ตฌํ์ฒด๋ ๋ค์๊ณผ ๊ฐ์๊ฒ๋ค์ด ์๋ค.
- Reactor 3
- RxJava (Version 2~)
- Akka Streams
- Vert.x
- Ratpack
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๋ชฉํ๋ ๋ชจ๋ ์ํฉ์์ ์ฌ์ฉ๊ฐ๋ฅํ๊ฒ ๋ high-level ์ API ์ ๊ณต์ ๋ชฉํ๋ก ํฉ๋๋ค.
์ํธ์์ฉ
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ Publisher
์ ๋ฐ์ดํฐ๊ฐ ์์๋๊ณ ,
๊ธฐ๋ณธ์ ์ผ๋ก๋ Subscriber
๊ฐ subscribe
๋ฅผ ํธ์ถํ๋ ์๊ฐ
Publisher ์์ Subscriber
๋ก ๋ฐ์ดํฐ ์ ๋ฌ์ด ์์ ๋ฉ๋๋ค.
์ ๋ค์ด์ด๊ทธ๋จ์์ request(n)
์ backpressure ์ ์กฐ์ ํ๋ ๋ถ๋ถ์
๋๋ค.
์ฐธ๊ณ ์๋ฃ
Tech.io
https://www.reactive-streams.org/
What is Reactive Programming
Reactive Manifesto
02.Flux
Flux ๋ ๋ค์๊ณผ ๊ฐ์ ์ฐ์ฐ์ด ์ถ๊ฐ๋ก ์ ์๋์ด ์๋ค.
- ์์ฑ (Generate)
- ๋ณํ (Transform)
- ์กฐ์จ (Orchestrate)
0 ์์ n
๊ฐ๊น์ง์ <T>
์ ์์๋ฅผ ๋ณด๋ธ ๋ค(onNext
์ด๋ฒคํธ) ์ฑ๊ณต(onComplete
๋ฉ์๋) ํ๊ฑฐ๋ ์๋ฌ๋ฅผ ๋ฐ์ (onError
์ข
๋ฃ ๋ฉ์๋)
์ธ์คํด์ค ๋ฉ์๋์ธ ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ฉด ๋น๋๊ธฐ ์ํ์ค๋ฅผ ์์ฑํ๋ ๋น๋๊ธฐ ์ฒ๋ฆฌ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถํ ์ ์๋ค.
์์
// TODO Return an empty Flux
Flux<String> emptyFlux() {
return Flux.empty();
}
// TODO Return a Flux that contains 2 values "foo" and "bar" without using an array or a collection
Flux<String> fooBarFluxFromValues() {
return Flux.just("foo", "bar");
}
// TODO Create a Flux from a List that contains 2 values "foo" and "bar"
Flux<String> fooBarFluxFromList() {
return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
// TODO Create a Flux that emits an IllegalStateException
Flux<String> errorFlux() {
return Flux.error(new IllegalStateException());
}
// TODO Create a Flux that emits increasing values from 0 to 9 each 100ms
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100))
.take(10);
}
์ฐธ๊ณ ์๋ฃ
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
Reactor ์ธ์ ์ด๋ค Operator์ ์จ์ผ ํ ๊น?
03.Mono
์ต๋ 1๊ฐ์ ์์๋ฅผ ๋ฐํํ ์ ์์ต๋๋ค.
์์
// TODO Return an empty Mono
Mono<String> emptyMono() {
return Mono.empty();
}
// TODO Return a Mono that never emits any signal
Mono<String> monoWithNoSignal() {
return Mono.never();
}
// TODO Return a Mono that contains a "foo" value
Mono<String> fooMono() {
return Mono.just("foo");
}
// TODO Create a Mono that emits an IllegalStateException
Mono<String> errorMono() {
return Mono.error(new IllegalStateException());
}
04.StepVerifier
Publisher ๋ฅผ ๊ตฌ๋ ํ๋ฉด์ ์์๊ฐ๊ณผ ์์๋ฅผ ๊ฒ์ฆํ ์ ์์
create
๋ฉ์๋๋ก ์ธ์คํด์ค๋ฅผ ์์ฑ
๋ฐ๋์ verify()
๋ฉ์๋๋ฅผ ํธ์ถํด์ผ ํจ
StepVerifier.create(T<Publisher>)
.{ expectation... }
.verify()
์ฝ๋ ์์
void expectFooBarComplete(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo")
.expectNext("bar")
.verifyComplete();
}
// TODO Use StepVerifier to check that the flux parameter emits "foo" and "bar" elements then a RuntimeException error.
void expectFooBarError(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo")
.expectNext("bar")
.verifyError(RuntimeException.class);
}
// TODO Use StepVerifier to check that the flux parameter emits a User with "swhite"username
// and another one with "jpinkman" then completes successfully.
void expectSkylerJesseComplete(Flux<User> flux) {
StepVerifier.create(flux)
.assertNext( u -> assertThat(u.getUsername()).isEqualTo("swhite"))
.assertNext( u -> assertThat(u.getUsername()).isEqualTo("jpinkman"))
.verifyComplete();
}
// TODO Expect 10 elements then complete and notice how long the test takes.
void expect10Elements(Flux<Long> flux) {
StepVerifier.create(flux)
.expectNextCount(10)
.verifyComplete();
}
// TODO Expect 3600 elements at intervals of 1 second, and verify quicker than 3600s
// by manipulating virtual time thanks to StepVerifier#withVirtualTime, notice how long the test takes
void expect3600Elements(Supplier<Flux<Long>> supplier) {
StepVerifier.withVirtualTime(supplier)
.thenAwait(Duration.ofSeconds(3600))
.expectNextCount(3600)
.verifyComplete();
}
์ฐธ๊ณ ์๋ฃ
05.Transform
Reactor ์์๋ ๋ฐ์ดํฐ๋ฅผ ๋ณํ / ๋ณํ ๊ฐ๋ฅํ ๋ค์ํ ์ฐ์ฐ์๊ฐ ์๋ค.
๋ง์ฝ ๋ค์ ์ฌํญ์ ์ ์ ํด๋ณธ๋ค.
- ๋ฐ์ดํฐ ์ํ์ค๊ฐ (1..5)๊น์ง ์๋ค.
map
์์๋ ๊ฐ ๋ฐ์ดํฐ์ 100์ ๋ํด์ฃผ๋ ์์ ์ ์ฒ๋ฆฌํ๋ค.- ์ด ์์ ์ ๊ฐ 10์ด์ฉ ๊ฑธ๋ฆฐ๋ค. (latency)
์ด๋ฌํ ๊ฒฝ์ฐ๋ผ๋ฉด map ์ ์ฌ์ฉํ ๋๋ ๋๊ธฐ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ธฐ ๋๋ฌธ์ ์ด ์์์๊ฐ์ 500์ด๊ฐ ๊ฑธ๋ฆฐ๋ค.
์ด๋ฅผ ์ํด ๋น๋๊ธฐ์ ์ผ๋ก ์ฒ๋ฆฌ ๊ฐ๋ฅํ ๋ฐฉ๋ฒ์ ๊ณ ๋ คํด์ผ ํ ๊ฒ์ด๋ค.
Flux
๋๋ Mono
๋ฅผ ์ด์ฉํ๊ณ flatMap
์ฐ์ฐ์ ์ด์ฉํ๋ฉด ๋น๋๊ธฐ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๋ค.
flatMap
์ Publisher
๋ฅผ ๋ฐํํ๋ ๋ฉ์๋๊ฐ ๋ด๋ถ์ ์ผ๋ก ์ฌ์ฉ๋๋ค.
map :
T
->U
flatMap:T
->Publisher<U>
Publisher<U>
์ ์๋ต๊ฐ์ ๋ฐ๋ผ ๊ฒฐ๊ณผ ์์๊ฐ ๋ฐ๋์ ์๋ค. (๋๋ถ๋ถ ๋ฐ๋)
์ด๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด ์์๋ฅผ ๋ณด์ฅํ๋ ์ฌ๋ฌ ๋ฐฉ๋ฒ์ด ์๋ค. (flatMapSequential)
06.Merge
์ฌ๋ฌ ์ํ์ค๋ฅผ ๋ณํฉํ์ฌ ํ๋์ Flux ๋ก ์ ๋ฌํ๋ ๋ฐฉ๋ฒ์ด๋ค.
Publisher
์ ์์์ ์๊ด ์์ด ๋ฐ์ดํฐ๊ฐ ๋ฐ์ํ ๋๋ง๋ค Flux
๋ก ์ ๋ฌ
// TODO Merge flux1 and flux2 values with no interleave (flux1 values and then flux2 values)
Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
return Flux.concat(flux1, flux2);
}
Publisher
์ ๋ฐํ์ด ์ข
๋ฃ๋ ๋๊น์ง ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ ํ, ๋ค์ ์ธ์์ Publisher
์ ์ธ์๋ฅผ ์ ๋ฌํจ
Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
return Flux.concat(mono1, mono2);
}
07.Request
08.Error
09.Adapt
10.Other Operations
11.Reactive to Blocking
12.Blocking to Reactive
โ Nested Class