Reactive Programming with Reactor 3

01.Reactor 3

Reactor 3 ๋Š” Reactive Streams ๋ฅผ ์ŠคํŒฉ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ๊ตฌ์ถ•๋œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋กœ JVM์—์„œ ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ํŒจ๋Ÿฌ๋‹ค์ž„์„ ์Šน๊ณ„ํ•ฉ๋‹ˆ๋‹ค.

์ด๋Š” ๋ฐ์ดํ„ฐ ํ๋ฆ„ (data flows) ์™€ ๋ณ€ํ™˜ ์ „ํŒŒ์— ์ค‘์ ์„ ๋‘” ๋น„๋™๊ธฐํ”„๋กœ๊ทธ๋ž˜๋ฐ์ด๋‹ค.

๋ฆฌ์•กํ‹ฐ๋ธŒ ์„ ์–ธ๋ฌธ

๋ฆฌ์•กํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ์œผ๋กœ ๊ตฌ์ถ•๋œ ์‹œ์Šคํ…œ์€ ์œ ์—ฐํ•จ, ๋‚ฎ์€ ๊ฒฐํ•ฉ๋„, ํ™•์žฅ์„ฑ์„ ์œ ์ง€ํ•ด์•ผ ํ•œ๋‹ค.
์ด ์‹œ์Šคํ…œ์€ ์žฅ์• ์— ๋Œ€ํ•œ ๊ฐ•ํ•œ ๋‚ด์„ฑ์„ ๊ฐ€์ง€๊ณ , ๋น„๋ก ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•˜๋”๋ผ๋„ ๋›ฐ์–ด๋‚œ ์žฅ์•  ๋ณต๊ตฌ์„ฑ์˜ ํŠน์ง•์„ ๊ฐ€์ง€๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
๋˜ํ•œ ๋†’์€ ์‘๋‹ต์„ฑ์„ ๊ฐ€์ ธ ์‚ฌ์šฉ์ž์—๊ฒŒ ํ•˜์—ฌ๊ธˆ ์ƒํ˜ธ์  ํ”ผ๋“œ๋ฐฑ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

์™œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ธ๊ฐ€?

๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์€ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์„ ๊ตฌ์ถ•ํ•˜๊ธฐ ์œ„ํ•ด ์„ ์–ธ์  ์ฝ”๋“œ(aka. ํ•จ์ˆ˜ํ˜• ํ”„๋กœ๊ทธ๋ž˜๋ฐ) ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์ƒˆ๋กœ์šด ํŒจ๋Ÿฌ๋‹ค์ž„์ด๋‹ค.
๋ฐ์ดํ„ฐ๊ฐ€ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๊ฒŒ ๋” ์ค€๋น„๊ฐ€ ๋˜๋ฉด Consumer ์—๊ฒŒ Push ๋˜๋Š” ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ๊ธฐ๋ฐ˜์˜ ๋ชจ๋ธ์ž…๋‹ˆ๋‹ค. (์ด๋ฒคํŠธ์˜ ๋น„๋™๊ธฐ ์‹œํ€€์Šค๋ฅผ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.)
์ด๊ฒƒ์€ ๋ฆฌ์†Œ์Šค๋ฅผ ๋ณด๋‹ค ํšจ์œจ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด low-level ์˜ ๋™์‹œ์  ์ฒ˜๋ฆฌ ํ˜น์€ ๋ณ‘๋ ฌ ์ฝ”๋“œ๋ฅผ ์ž‘์„ฑํ•˜๋Š”๋ฐ ํฐ ์–ด๋ ค์›€ ์—†์ด
์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜์˜ ์ˆ˜์šฉ ๊ฐ€๋Šฅํ•œ ์ฒ˜๋ฆฌ ๋ฒ”์œ„๋ฅผ ์ฆ๊ฐ€์‹œํ‚ค๋Š” ํšจ๊ณผ๋ฅผ ๊ฐ€์ ธ์˜ต๋‹ˆ๋‹ค.

๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์€ ์™„์ „ํ•œ ๋น„๋™๊ธฐ ํ˜น์€ ๋…ผ ๋ธ”๋กœํ‚น์„ ์ค‘์‹ฌ์„ ๊ธฐ๋ฐ˜์œผ๋กœ JDK ์—์„œ ๋น„๋™๊ธฐ ์ฝ”๋“œ๋ฅผ ์ˆ˜ํ–‰ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•˜๋ฉฐ,
๊ธฐ์กด Callback ๊ธฐ๋ฐ˜์˜ API ์„ค๊ณ„๋‚˜ Future ์‚ฌ์šฉ์— ๋Œ€ํ•œ ๋Œ€์•ˆ์œผ๋กœ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๋‹ค.

๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ

Reactive Stream ์ŠคํŒฉ์€ JVM ์—์„œ Reactive Programming ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ํ‘œ์ค€ํ™” ํ•˜๊ธฐ ์œ„ํ•œ ์ฃผ๋„์  ๋…ธ๋ ฅ์ด๋ฉฐ,

๋” ์ค‘์š”ํ•˜๊ฒŒ๋Š” ์„œ๋กœ ๋‹ค๋ฅธ ํ™˜๊ฒฝ์—์„œ ์ƒํ˜ธ ์šด์˜ ๊ฐ€๋Šฅํ•˜๋„๋ก(ํ˜ธํ™˜ ๊ฐ€๋Šฅ) ์ž๋™ํ•˜๋Š” ๋ฐฉ์‹์„ ์ง€์ •ํ•œ๋‹ค.

๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์€ ๋‹ค์Œ 4๊ฐœ์˜ ์ธํ„ฐํŽ˜์ด์Šค๋กœ ์ •์˜๋˜์–ด ์žˆ๋‹ค.

  • Subscriber
  • Publisher
  • Subscription
  • Processor

๋Œ€ํ‘œ์ ์€ ๊ตฌํ˜„์ฒด๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์€๊ฒƒ๋“ค์ด ์žˆ๋‹ค.

  • Reactor 3
  • RxJava (Version 2~)
  • Akka Streams
  • Vert.x
  • Ratpack

๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์˜ ๋ชฉํ‘œ๋Š” ๋ชจ๋“  ์ƒํ™ฉ์—์„œ ์‚ฌ์šฉ๊ฐ€๋Šฅํ•˜๊ฒŒ ๋” high-level ์˜ API ์ œ๊ณต์„ ๋ชฉํ‘œ๋กœ ํ•ฉ๋‹ˆ๋‹ค.

์ƒํ˜ธ์ž‘์šฉ

๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์€ Publisher ์™€ ๋ฐ์ดํ„ฐ๊ฐ€ ์‹œ์ž‘๋˜๊ณ ,

๊ธฐ๋ณธ์ ์œผ๋กœ๋Š” Subscriber ๊ฐ€ subscribe ๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ์ˆœ๊ฐ„

Publisher ์—์„œ Subscriber ๋กœ ๋ฐ์ดํ„ฐ ์ „๋‹ฌ์ด ์‹œ์ž‘ ๋ฉ๋‹ˆ๋‹ค.

Put them all together

์œ„ ๋‹ค์ด์–ด๊ทธ๋žจ์—์„œ request(n) ์€ backpressure ์„ ์กฐ์ ˆํ•˜๋Š” ๋ถ€๋ถ„์ž…๋‹ˆ๋‹ค.

02.Flux

Flux ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์—ฐ์‚ฐ์ด ์ถ”๊ฐ€๋กœ ์ •์˜๋˜์–ด ์žˆ๋‹ค.

  • ์ƒ์„ฑ (Generate)
  • ๋ณ€ํ™˜ (Transform)
  • ์กฐ์œจ (Orchestrate)

0 ์—์„œ n ๊ฐœ๊นŒ์ง€์˜ <T> ์˜ ์š”์†Œ๋ฅผ ๋ณด๋‚ธ ๋’ค(onNext ์ด๋ฒคํŠธ) ์„ฑ๊ณต(onComplete ๋ฉ”์„œ๋“œ) ํ•˜๊ฑฐ๋‚˜ ์—๋Ÿฌ๋ฅผ ๋ฐœ์ƒ (onError ์ข…๋ฃŒ ๋ฉ”์„œ๋“œ)

์ธ์Šคํ„ด์Šค ๋ฉ”์„œ๋“œ์ธ ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋น„๋™๊ธฐ ์‹œํ€€์Šค๋ฅผ ์ƒ์„ฑํ•˜๋Š” ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ํŒŒ์ดํ”„๋ผ์ธ์„ ๊ตฌ์ถ•ํ•  ์ˆ˜ ์žˆ๋‹ค.

flux diagram

์˜ˆ์ œ

// TODO Return an empty Flux
Flux<String> emptyFlux() {
    return Flux.empty();
}

// TODO Return a Flux that contains 2 values "foo" and "bar" without using an array or a collection
Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
}

// TODO Create a Flux from a List that contains 2 values "foo" and "bar"
Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
}

// TODO Create a Flux that emits an IllegalStateException
Flux<String> errorFlux() {
    return Flux.error(new IllegalStateException());
}

// TODO Create a Flux that emits increasing values from 0 to 9 each 100ms
Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100))
        .take(10);
}

03.Mono

์ตœ๋Œ€ 1๊ฐœ์˜ ์š”์†Œ๋ฅผ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

mono diagram

์˜ˆ์ œ

// TODO Return an empty Mono
Mono<String> emptyMono() {
    return Mono.empty();
}

// TODO Return a Mono that never emits any signal
Mono<String> monoWithNoSignal() {
    return Mono.never();
}

// TODO Return a Mono that contains a "foo" value
Mono<String> fooMono() {
    return Mono.just("foo");
}

// TODO Create a Mono that emits an IllegalStateException
Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
}

04.StepVerifier

Publisher ๋ฅผ ๊ตฌ๋…ํ•˜๋ฉด์„œ ์˜ˆ์ƒ๊ฐ’๊ณผ ์ˆœ์„œ๋ฅผ ๊ฒ€์ฆํ•  ์ˆ˜ ์žˆ์Œ

create ๋ฉ”์„œ๋“œ๋กœ ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑ

๋ฐ˜๋“œ์‹œ verify() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ด์•ผ ํ•จ

StepVerifier.create(T<Publisher>)
    .{ expectation... }
    .verify()

์ฝ”๋“œ ์˜ˆ์‹œ

void expectFooBarComplete(Flux<String> flux) {
    StepVerifier.create(flux)
        .expectNext("foo")
        .expectNext("bar")
        .verifyComplete();
}

// TODO Use StepVerifier to check that the flux parameter emits "foo" and "bar" elements then a RuntimeException error.
void expectFooBarError(Flux<String> flux) {
    StepVerifier.create(flux)
        .expectNext("foo")
        .expectNext("bar")
        .verifyError(RuntimeException.class);
}

// TODO Use StepVerifier to check that the flux parameter emits a User with "swhite"username
// and another one with "jpinkman" then completes successfully.
void expectSkylerJesseComplete(Flux<User> flux) {
    StepVerifier.create(flux)
        .assertNext( u -> assertThat(u.getUsername()).isEqualTo("swhite"))
        .assertNext( u -> assertThat(u.getUsername()).isEqualTo("jpinkman"))
        .verifyComplete();
}

// TODO Expect 10 elements then complete and notice how long the test takes.
void expect10Elements(Flux<Long> flux) {
    StepVerifier.create(flux)
        .expectNextCount(10)
        .verifyComplete();
}

// TODO Expect 3600 elements at intervals of 1 second, and verify quicker than 3600s
// by manipulating virtual time thanks to StepVerifier#withVirtualTime, notice how long the test takes
void expect3600Elements(Supplier<Flux<Long>> supplier) {
    StepVerifier.withVirtualTime(supplier)
        .thenAwait(Duration.ofSeconds(3600))
        .expectNextCount(3600)
        .verifyComplete();
}

์ฐธ๊ณ ์ž๋ฃŒ

https://godekdls.github.io/Reactor Core/testing/

05.Transform

Reactor ์—์„œ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ณ€ํ˜• / ๋ณ€ํ™˜ ๊ฐ€๋Šฅํ•œ ๋‹ค์–‘ํ•œ ์—ฐ์‚ฐ์ž๊ฐ€ ์žˆ๋‹ค.

Flux Map

๋งŒ์•ฝ ๋‹ค์Œ ์‚ฌํ•ญ์„ ์ •์˜ ํ•ด๋ณธ๋‹ค.

  1. ๋ฐ์ดํ„ฐ ์‹œํ€€์Šค๊ฐ€ (1..5)๊นŒ์ง€ ์žˆ๋‹ค.
  2. map ์—์„œ๋Š” ๊ฐ ๋ฐ์ดํ„ฐ์— 100์„ ๋”ํ•ด์ฃผ๋Š” ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•œ๋‹ค.
  3. ์ด ์ž‘์—…์€ ๊ฐ 10์ดˆ์”ฉ ๊ฑธ๋ฆฐ๋‹ค. (latency)

์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ๋ผ๋ฉด map ์„ ์‚ฌ์šฉํ• ๋•Œ๋Š” ๋™๊ธฐ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์ด ์†Œ์š”์‹œ๊ฐ„์€ 500์ดˆ๊ฐ€ ๊ฑธ๋ฆฐ๋‹ค.

์ด๋ฅผ ์œ„ํ•ด ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅํ•œ ๋ฐฉ๋ฒ•์„ ๊ณ ๋ คํ•ด์•ผ ํ• ๊ฒƒ์ด๋‹ค.

Flux Flatmap

Flux ๋˜๋Š” Mono ๋ฅผ ์ด์šฉํ•˜๊ณ  flatMap ์—ฐ์‚ฐ์„ ์ด์šฉํ•˜๋ฉด ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.

flatMap ์€ Publisher ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š” ๋ฉ”์„œ๋“œ๊ฐ€ ๋‚ด๋ถ€์ ์œผ๋กœ ์‚ฌ์šฉ๋œ๋‹ค.

map : T -> U flatMap: T -> Publisher<U>

Publisher<U> ์˜ ์‘๋‹ต๊ฐ’์— ๋”ฐ๋ผ ๊ฒฐ๊ณผ ์ˆœ์„œ๊ฐ€ ๋ฐ”๋€”์ˆ˜ ์žˆ๋‹ค. (๋Œ€๋ถ€๋ถ„ ๋ฐ”๋€œ)

์ด๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด ์ˆœ์„œ๋ฅผ ๋ณด์žฅํ•˜๋Š” ์—ฌ๋Ÿฌ ๋ฐฉ๋ฒ•์ด ์žˆ๋‹ค. (flatMapSequential)

06.Merge

์—ฌ๋Ÿฌ ์‹œํ€€์Šค๋ฅผ ๋ณ‘ํ•ฉํ•˜์—ฌ ํ•˜๋‚˜์˜ Flux ๋กœ ์ „๋‹ฌํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค.

merge.png

Publisher ์˜ ์ˆœ์„œ์™€ ์ƒ๊ด€ ์—†์ด ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœ์ƒํ• ๋•Œ๋งˆ๋‹ค Flux ๋กœ ์ „๋‹ฌ

// TODO Merge flux1 and flux2 values with no interleave (flux1 values and then flux2 values) 
Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
    return Flux.concat(flux1, flux2); 
}

concat.png

Publisher ์˜ ๋ฐœํ–‰์ด ์ข…๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๋ฐ์ดํ„ฐ๋ฅผ ์ ์žฌํ•œ ํ›„, ๋‹ค์Œ ์ธ์ž์˜ Publisher ์˜ ์ธ์ž๋ฅผ ์ „๋‹ฌํ•จ

Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
    return Flux.concat(mono1, mono2); 
}

07.Request

08.Error

09.Adapt

10.Other Operations

11.Reactive to Blocking

12.Blocking to Reactive