본문 바로가기

3.구현/Java or Kotlin

Reactive Programming 맛보기

Reactive란?

Reactive는 변화가 발생하면 대응하는 프로그래밍 모델이라고 할 수 있다. 예를 들어 네크워크 구성요소는 I/O 이벤트에 대응하고 UI 컨트롤러는 입력 이벤트에 대응한다.
Reactive 프로그래밍은 데이터 처리에서 비동기 데이터 스크림과 이벤트드리븐 방식으로 접근하는 패러다임이다. 보통 관찰자Observer 패턴을 사용해서 감시하고 이벤트를 받는다. 또한 함수형 프로그래밍으로 데이터 스크림 조작 및 처리하는데 사용된다. 논블록킹 배압관리(nonblocking backpressure)로 생산속도와 소비소도를 조절하여 리소스 누수나 성능 문제를 해결한다. backpressure는 push 시나리오에서 생산자가 소비자보다 더 빠른 경우 생산자에게 천천히 보내라는 시그널이다.

작성자: ospace114@empal.com, http://ospace.tistory.com/

Reactive library

대표적인 reactive 라이브러리는 Reactor와 RxJava가 있다.

RxJava는 Netflix가 만든 라이브러리로 오픈소스로 공개했고 현재는 ReactiveX에서 오픈소스로 배포하고 있다. Reactor은 Spring을 만든 팀인 Pivotal에서 만든 프레임워크이다. Java8이상을 사용한다면 Reactor3을 권장하며 Java6을 사용하면 RxJava2을 권장한다. 여기서는 reactor을 사용한다.
Reactor은 Java8을 기반으로 만들어졌다. Reactor에서는 두가지 형태의 publisher가 있다. 하나는 Mono로 0 또는 1개 요소를 반환하고 다른 하나는 Flux로 0 또는 N개 요소를 반환한다. 또한 Java8의 Stream과 CompletableFuture가 Mono와 Flux로 변환되고 반대도 가능하다.

Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.empty();
Flux<String> flux1 = Flux.just("Hello", "World!");
Flux<String> flux2 = Flux.fromArray(new String[]{"Hello", "World!"});
Flux<String> flux3 = Flux.fromIterable(Array.asList("Hello", "World!"));

Reactor을 사용한 예를 살펴보면서 reactive 프로그래밍이 무엇이지 맛을 보자.

Dependency 추가

Reactor 라이브러리를 사용한 간단한 예를 살펴보자. dependency을 추가하자.

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.12</version>
</dependency>

간단한 예1

Flux을 생성하고 출력하는 예이다. 팩토리 메소드인 just()과 fromIterable()에 의해서 Flux을 생성할 수 있다. subscribe()은 producer에서 전송하라는 backpressure이다.

List<String> words = Arrays.asList("the", "qukck", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
Flux<String> fewWords = Flux.just("Hello", "World!");
Flux<String> manyWords = Flux.fromIterable(words);
fewWords.subscribe(System.out::println);
manyWords.subscribe(System.out::println);

간단한 예2

문자열에서 개별 문자을 출력한다. 모든 단어에 “s”을 덧붙이고 distinct()와 sort()을 사용해서 중복 문자를 필터링하고 정렬한다. zipWith()와 range()을 사용해서 개발 문자의 인덱스를 출력한다.

List<String> words = Arrays.asList("the", "qukck", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
Mono<String> missing = Mono.just("s");
Flux<String> allLetters = Flux
  .fromIterable(words)
  .flatMap(word -> Flux.fromArray(word.split("")))
  .concatWith(missing)
  .distinct()
  .sort()
  .zipWith(Flux.range(1, Integer.MAX_VALUE),
      (str, cnt) -> String.format("%2d. %s", cnt, str));

allLetters.subscribe(System.out::println);

간단한 예3

delaySubscriptionMillis()을 사용하면 지연을 가지고 전달되다. 그렇기 때문에 “Hello”만 출력되고 “world”은 지연된다.

Flux<String> helloDelayedWorld = Mono
    .just("Hello")
    .concatWith(
        Mono.just("world")
            .delaySubscription(Duration.ofMillis(500)));

helloDelayedWorld.subscribe(System.out::println);

이를 해결하가 위해 toIterable()과 toStream()을 사용해서 스트림 블럭킹 형태로 변경한다.

Flux<String> helloDelayedWorld = Mono
    .just("Hello")
    .concatWith(Mono
        .just("world")
        .delaySubscription(Duration.ofMillis(500)));
helloDelayedWorld.toStream().forEach(System.out::println);

간단한 예4

firstEmitting()은 첫번째 Flux을 전달한다. a는 450ms 지연이 있고 b는 400ms 지연이 있고서 b가 먼저 발생한다. firEmitting()으로 두개를 같이 사용하면 b가 먼저 발생하므로 출력된다.

Mono<String> a = Mono
    .just("oops I'm late")
    .delaySubscription(Duration.ofMillis(450));
Flux<String> b = Flux
    .just("let's get", "the party", "started")
    .delaySubscription(Duration.ofMillis(400));

Flux.firstWithSignal(a, b).toIterable().forEach(System.out::println);

간단한 예5

1에서 10까지 숫자를 생성하고 제곱해서 출력하는 예이다. reduce()에 의해서 Flux의 모든 항목을 처리해서 Mono로 리턴되기에 업스트림에 Long.MAX_VALUE로 backpressure 신호를 보내며, 완전한 푸시 형태로 동작하게 된다.

Flux.range(1, 10)
    .map(num -> num * num)
    .reduce((pre, val)->pre+val)
    .subscribe(
        val -> System.out.println("Sum: " + val),
        err -> System.err.println("Error: " + err),
        () -> System.out.println("Done"));

결론

간단하게 reactive 프로그래밍을 살펴보았다. 예제에서는 reactor의 극히 일부만 다루었다. 상당히 많은 API가 있고 버전에 따라서 변경되는 사항이 있기 때문에 확인이 필요하다. Deprecated되거나 이름 바뀌는 부분이 많이 때문에 주의가 필요하다.

reactive 프로그래밍이 넌블럭킹을 잘 활용하기 위한 기술이라고 볼 수 있다. 잘 사용하면 최적의 효율을 가져올 수 있지만, 잘 못 사용하며 더 나쁜 성능을 보여줄 수 있기에 사용상에 주의가 필요하다. 특이 이런 비동기 형태의 작업에서는 디버깅이 매우 어려워지기 때문에 에러가 발생할 경우 디버깅을 어떻게 할지에 대한 고민도 함께 필요하다. 도움이 되었기를 바라며 즐프하세요. ospace.

참고

[1] Hyoj, RxJava vs Reactor 뭐가 좋나요, https://hyoj.github.io/blog/java/spring/rxjava-vs-reactor/

[2] https://github.com/ReactiveX/RxJava

[3] Simon Baslé, Reactor by Example, https://www.infoq.com/articles/reactor-by-example/

[4] Class Mono, https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html

[5] Class Flux, https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

[6] Deprecated API, https://projectreactor.io/docs/core/release/api/deprecated-list.html

반응형