본문 바로가기

3.구현/Java or Kotlin

[java] CompletableFuture 사용하기

들어가기

java.util.concurrent 패키지에는 비 동기 처리를 하기 위한 JDK 라이브러리를 제공한다. 그 중에서도 가장 간편하고 기능도 풍부한 CompletableFuture에 대해서 살펴보자.

작성자: ospace114@empal.com, http://ospace.tistory.com/

단순한 예제

CompletableFuture는 완료가능한 작업을 지원하는 Future이다. CompletableFuture 클래스 선언이다.

public class CompletableFuture<T> extends Object implements Future<T>, CompletionStage<T> {...}

Future와 CompletableFuture 인터페이스를 구현하고 있다. 대표적인 메소드인 complete()와 get()를 사용해서 처리할 수 있다.

  • complete(): 결과를 알림
  • get(): 결과를 획득(동기적 호출)

이외에도 다양한 메소드가 있지만 가장 기본적인 두 메소드를 활용한 예를 보자.

CompletableFuture<String> f = new CompletableFuture<>();
f.complete("world");
System.out.printf("hello %s\n", f.get());

생각보다 너무 단순한다. complete()에서 저장된 문자열을 get()으로 추출한다. 이를 가지고 비동기로 활용하기 위해 쓰레드를 사용하도록 확장해보자.

CompletableFuture<String> f=new CompletableFuture<>();
new Thread(()->{
    try {
        TimeUnit.SECONDS.sleep(1);
        f.complete("world");    
    } catch (InterruptedException e) {
        f.completeExceptionally(e);
    }
}).start();
System.out.printf("hello %s\n", f.get());

쓰레드에 의해서 별도로 실행해서 처리해서 complete()로 결과를 저장하고 get()에 의해서 결과를 별도로 획득하고 있다. get()은 비동기로 앞에 결과를 리턴될 때까지 대기하게 된다. 즉, 1초 후에 결과가 완료될 때에 get()에서 리턴된다. get()은 비동기 호출이 된다. 뭔가 그럴듯 하다. ㅡ.ㅡ;;;
이렇게 사용한다면 CompletableFuture를 제대로 활용하고 있지 않다. 이미 내부적으로 비동기 처리할 수 있는 기능이 많이 가지고 있다.

CompletableFuture 비동기 처리

Thread 대신에 CompletableFuture를 사용하면 간단하게 처리할 수 있다.

CompletableFuture<String> f = null;
f = CompletableFuture.supplyAsync(()->{
    return "world";
});
System.out.printf("hello %s\n", f.get());

supplyAsync()은 결과를 반환하는 비동기 처리해준다. 내부적으로 리턴된 결과를 자동으로 CompletableFuture에 저장되고 get()에 의해서 결과를 조회할 수 있다. 다음은 supplyAsync()의 시그니처이다.

static <U> CompletableFuture<U>    supplyAsync(Supplier<U> supplier)

이번에는 runAsync()를 사용하는 다른 예제를 보자.

CompletableFuture.runAsync(()->{
    try {
        TimeUnit.SECONDS.sleep(1);
        System.out.printf("runAsync\n");
    } catch (InterruptedException e) { }
}).get();

runAsync()는 결과가 없이 비동기 처리하고 종료된다. 앞에 supplyAsync()은 리턴결과가 있다면 runAsync()은 리턴타입이 void형으로 리턴결과가 없다.get()에 의해서 비동기 호출이 되면서 실행이 끝날때까지 대기하기 된다.
다음은 runAsync()의 시그니처이다.

static CompletableFuture<Void>    runAsync(Runnable runnable)

CompletableFuture의 다양한 활용법을 살펴보자.

2개 작업을 조합하기

thenCombine()을 사용하여 2개 작업을 조합해서 처리할 수 있다. 예제를 보자.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
    System.out.println("supplyAsync1 running...");
    return "supplyAsync1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
    System.out.println("supplyAsync2 running...");
    return "supplyAsync2";
});

CompletableFuture<String> future = future1.thenCombine(future2, (l,r)-> {
    return l + " and " + r;
});

System.out.println(future.get());

CompletableFuture 객체인 future1와 future2를 생성하고 future2의 thenCombine()을 호출한다. 처리되는 순서는 상관없이 실행되고 thenCombine()에서 결과는 future1 결과는 l에 future2 결과는 r에 넘겨지게 된다.
다음은 thenCombine()의 시그니처이다.

<U> CompletableFuture<U>    thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

2개 작업을 순서대로 처리

thenCombine()은 두개 작업을 동시에 처리되지만 thenCompose()는 순서대로 작업을 처리한다. 인자로 CompletableFuture를 받아서 비동기 작업 처리한다.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
    System.out.println("supplyAsync1 running...");
    return "supplyAsync1";
});

CompletableFuture<String> future = future1.thenCompose(v->{
    return CompletableFuture.supplyAsync(()->v+"->thenCompose");
});

System.out.println(future.get());

thenCombine()은 순서를 보장하기 때문에 future1를 먼저 실행하고 다음에 CompletableFuture 객체를 실행한다.
다음은 thenCombine()의 시그니처이다.

<U,V> CompletableFuture<V>    thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

3개 이상의 작업을 제어하기

allOf()는 여러 비동기 작업이 모두 끝날때 까지 대기한다.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {}
    System.out.println("supplyAsync1 done");
    return "supplyAsync1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {}
    System.out.println("supplyAsync2 done");
    return "supplyAsync2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(()->{
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {}
    System.out.println("supplyAsync3 done");
    return "supplyAsync3";
});

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.get();

System.out.printf(“Result: %s, %s, %s\n”, future1.get, future2.get(), future3.get());

CompletableFuture인 future1, future2, future3 객체가 있다. 3개 객체를 allOf()에 넘기고 get()을 호출하며 3개 객체가 모두 끝날때까지 대기한다. 그렇기에 allOf()은 단점은 실행시간이 가장 긴 CompletableFuture 객체가 끝날때까지 대기해야 한다.

다음은 allOf()의 시그니처이다.

static CompletableFuture<Void>    allOf(CompletableFuture<?>... cfs)

anyOf()는 여러 작업 중에 임의 작업 하나가 끝나며 대기가 종료된다. 그리고 그 작업의 결과를 리턴받는다.
앞의 예제에서 연속해서 작업해보자.

CompletableFuture<Object> allFutures = CompletableFuture.anyOf(future1, future2, future3);
System.out.println("Done: " + allFutures.get());

다음은 anyOf()의 시그니처이다.

static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

에러 처리는?

CompletableFuture에 의해서 실행하는 중인 비동기 작업에서 발생한 예외는 ExecutionException 예외로 감싸서 예외를 발생한다.

CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
    System.out.println("runAsync running...");
    throw new RuntimeException("ERROR!!!");
});

try {
    future.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
System.out.println("Done");

예외의 메시지를 출력하면 실제 발생한 예외를 확인가능하다.
만약 반환 값이 있는 supplyAsync()을 사용할 경우 exceptionally()을 사용할 수 있다.

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
    System.out.println("sypplyAsync running...");
    int values[] = {};
    return values[0];
}).exceptionally(e->{
    e.printStackTrace();
    return -1;
});

System.out.println("Done: " + future.get());

exceptionally()은 별도 try-catch 문을 사용하지 않도 되고, 각 CompletableFuture 별로 처리할 수도 있다. 그러나 예외처리된다고 해서 리턴값이 없는게 아니다. 에러는 처리하고 별도로 리턴도 해야한다.
다음은 exceptional() 시그니처이다.

CompletableFuture<T>    exceptionally(Function<Throwable,? extends T> fn)

결론

CompletableFuture를 사용해서 쓰레드를 기반한 비동기 처리는 쉽게 적용 가능하다. 다양한 비동기 간에 작업 조합도 쉽게 구현할 수 있다. 자바의 Executor에서도 적용해서 사용할 수 있다. 비동기 작업을 사용할게 된다면 Thread을 사용해서 구현하기보다 CompletableFuturer를 사용하게 더 효율적이고 안정적이라고 말할 수 있다.
부족한 글이지만 도움이 되었으면 하네요. 모두 즐거운 코딩생활하세요.^^ ospace.

참조

[1] Guide To CompletableFuture, http://www.baeldung.com/java-completablefuture
[2] Class CompletableFuture, https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

반응형