본문 바로가기

3.구현/C or C++

[c++] std::async와 std::thread 사용하기

들어가기

이 글은 C++에서 비동기 전체를 다루지는 않는다. 이미 thread 작동 방식이나 사용에 대해서는 어느정도 알고 있다고 가정한다. std::thread와 std::async에 대한 간단한 사용법과 각 자원 해제시 주의점, 몇 가지 고려할 점을 정리했다.

작성자: ospace114@empal.com, http://ospace.tistory.com/

맛보기

비동기 처리에 있어서 std::thread와 std::async을 사용하는 방법이 있다. std::thread와 std::async은 결과는 같지만 실제 동작과 인터페이스가 조금씩 다르다. std::thread는 스레드 기반(thread-based) 프로그래밍이고 std::async는 과제 기반(task-based) 프로그래밍이다. std::thread는 낮은 수준의 인터페이스를 지원하며 std::async가 좀더 높은 수준의 추상화를 지원한다. 그렇기 때문에 std::thread는 단순 쓰레드만 생성하고 동기화나 쓰레드풀 등의 모든 처리는 직접해줘야 한다. std::async은 동기화나 쓰레드 풀등의 많은 처리를 해주기 때문에 신경쓸 일이 줄어든다. std::thread의 장점은 직접 처리하기에 성능이나 반응성에 최적화에 좀더 유리하다. 또한 저수준 API을 접근할 수 있기 때문에 하드웨어 특성에 종속적인 부분을 개발할 때 유리하다.

간단하게 사용하는 예제를 보자. 먼저 std::thread를 살펴보자.

#include <iostream>
#include <thread>

int run() {
    std::cout << "thread run!\n";
    return 1;
}

int main() {
    std::thread t(run);
    t.join();
}

std::thread는 쓰레드를 새로 생성해서 run()을 실행한다. 그리고 join()을 통해서 쓰레드 종료를 대기한다.

다음으로 std::async 예제를 살펴보자.

#include <iostream>
#include <future>

int run() {
    std::cout << "async run!\n";
    return 1;
}

int main() {
  auto fut = std::async(run);
  fut.get();
  std::cout << "fut is " << fut.get() << ".\n";
}

std::async는 future 객체를 리턴하는데 wait()을 통해서 실행 종료를 기다릴 수 있다. get()을 통해서 반환 결과를 획득할 수 있다.
std::async을 반드시 쓰레드를 생성해서 실행하지 않는다. 설정에 따라 실행되는데, 기본은 쓰레드 또는 넌블럭킹 형태로 동작할 수 있다. 물론 강제로 쓰레드로 실행하도록 지정할 수 있다.

async 실행정책 지정하기

std::async는 동기와 비동기 실행을 허용한다. 그러나 어떤 기준을 실행될지 명확하지 않기 때문에 실행 정책(launch policy)을 선택할 수 있다.

  • std::launch::async - 반드시 스레드를 사용한 비동기로 실행
  • std::launch::deferred - future 객체의 get() 또는 wait()을 호출할 때에 실행

std::async의 기본 실행 정책은 위의 두가지가 같이 구성되었다. 즉, 아래와 같은 형태이다.

int f();
auto fut = std::async(std::launch::async | std::launch::deferred, f);

그렇기 때문에 std::async는 어떻게 실행될지 모르기 때문에 미리 예측해서 작업하기에는 문제가 발생할 수 있다. 예를 들어 스레드라고 가정하고 thread_local 변수를 만들어서 사용하게 된다면 예측 불가능 행동을 할 수 있다.

만약 비동기 모드로 사용하고 싶다면 다음 처럼 실행하면 된다.

auto fut = std::async(std::launch::async, f);

이를 범용적인 함수로 만든다면 아래 처럼 만들 수 있다.

template<typename F, typename... Ts>
inline
std::future<typename std::result_of<F(Ts...)>::type>
reallyAsync(F&& f, Ts&&... params) {
  return std::async(
    std::launch::async,
    std::forward<F>(f), 
    std::forward<Ts>(params)...
  );
}

이 함수는 C++11 버전이고 C++14는 auto 사용으로 더 단순해진다. 음~~ 더 복잡(?)….

std::thread 해제하기

std::thread는 소멸될 때에 다른 객체와 다르게 join 또는 detach가 필요하다. 만약 join과 detach가 없이 소멸되는 경우는 가장 나쁜 상황이 발생할 수 있다. 그러나 std::thread에서 join과 detach 사용에도 주의가 필요하다. std::thread은 join 가능한(joinable) 상태이거나 join 불가능한(unjoinable) 상태로 구분된다. 다음의 join 불가능한 상태가 아니면 join 가능한 상태가 된다.

  • 실행 함수 없는 기본 std::thread
  • 다른 std::thread로 이동 후의 std::thread
  • join된 std::thread
  • detach한 std::thread

앞의 경우에 해당하는 std::thread는 대응되는 스레드가 없는 상태이다. 그렇기 때문에 join이 불가능한 상태가 된다. join가능한 상태인 경우 적절한 처리를 해줘야 한다. 생각하기에는 std::thread 소멸자에서 알아서 join() 또는 detach()을 해주면 좋은데 상황에 따라 달라질 수 있어서 알아서 처리해주지 않는다. 프로그래머가 이런 부분을 상황에 맞게 처리해줘야 한다.

간단한 예를 보자. 다음과 같은 실행 함수가 있다고 하자. 스레드를 실행하고 조건에 따라 결과에 대해 추가적인 처리하는 함수이다.

bool doWork(std::function<bool(int)> filter, int size) {
    std::vector<int> res;
    std::thread t([&filter, size, &res] {
        for(int i=0; i<size; ++i) {
            if (filter(i)) res.push_back(i);
        }
    });

    if (isSatisfied()) {
        t.join();
        process(res);
        return true;
    }
    return false;
}

std::thread에서 filter()조건에 따라 res에 추가한다. 그리고 isSatisified()에 결과가 true이면 join()하고 res을 처리한다.

여기서 std::thread는 isSatisfied()가 true인 경우 정상적으로 처리되고 완료된다. 그렇지 않다면 std::thread는 doWork()가 종료될 때에 지역 변수가 파괴되고 스레드는 잘못된 접근이 발생할 수 있다.

RAII(Resource Acquisition Is Initialization)을 도입하여 std::thread용 RAII 클래스인 ThreadRAII 클래스를 작성해보자.

class ThreadRAII {
public:
    ThreadRAII(std::thread&& t, bool isJoin) : isJoin(isJoin), t(std::move(t)) {}
    ~ThreadRAII() {
        if (t.joinable()) {
            if (isJoin) {
                t.join();
            } else {
                t.detach();
            }
        }
    }
    std::thread& get() { return t; }

private:
    bool isJoin;
    std::thread t;
};

이 클래스의 핵심은 소멸자에 있다. std::thread가 join 가능한지 여부를 확인하고 join 적용인 경우 join()을 호출하고 아닌 경우 detach()을 호출한다. 즉, ThreadRAII 변수가 범위를 벗어날 경우 자동으로 std::thread을 관리한다. 추가로 ThreadRAII 생성자는 std::thread 오른값만 받는다. 이는 std::thread 객체를 이동하기 위해서이다.

ThreadRAII 클래스를 doWork()에 적용해보자.

bool doWork(std::function<bool(int)> filter, int size) {
    std::vector<int> res;
    ThreadRAII t(std::thread([&filter, size, &res] {
        for(int i=0; i<size; ++i) {
            if (filter(i)) res.push_back(i);
        }
    }), true);

    if (isSatisfied()) {
        t.get().join();
        process(res);
        return true;
    }
    return false;
}

조금더 안전하게 변경되었다. 더 개선할 수 있지만, 배보다 배꼽이 커지는 것 같아서 다루지 않겠다.

std::future 이해하기

std::async 수행에 의한 future 객체는 내부적으로 std::thread처럼 스레드 핸들이 있을 수 있다. 이 경우 해제할 때에 std::thread와는 차이가 있다. std::async 수행 결과를 promise 객체에서 future 객체로 전달해야하다. 결과를 저장할 장소로 promise내는 작업이 종료되고 get() 호출시 유효하지 않을 수 있고 future 내에 저장하기에는 저장할 때에 future가 소멸되서 유효하지 않아서 적당하지 않다. 그래서 외부에 공유상태(shared state)로 저장한다. 이 공유 상태로 인해 future의 소멸 작업 형태를 결정한다.

std::launch::async 정책으로 실행된 비지연 std::async를 마지막으로 참조하는 future 객체 소멸자는 작업 완료될 때까지 차단되는 암묵적 join을 수행하고 이외의 다른 future 객체는 소멸시 자신 개체를 정상 제거한다.

이런게 만든 이유는 표준위원회에서 암묵적 detach 문제를 피할려고 암묵적 join 타협안을 선택했다. 그러나 future 객체 소멸자가 차단 조건을 알수 없다. 이런 원인 중에 하나로 공유상태를 참조하는지 알 수 없는데 있다.(?)

std::package_task 예제를 살펴보자. 이는 주어진 함수를 비동기적으로 실행할 수 있게 포장한다. 실행결과는 공유상태에 저장하고 공유상태에서 future 객체를 얻기 위해 get_future()를 호출한다. 실제 실행은 std::thread이 수행한다.

int calcValue();
std;:packaged_task<int()> pt(calcValue);
auto fut = pt.get_future();
std::thread t(std::move(pt));

여기서 future은 std::async의 공유상태를 참조하지 않으므로 소멸자는 정상 행동한다. 그리고 std::packaged_task는 복사할 수 없으므로 이동해야 한다.

주의할 부분은 생성된 std::thread를 어떻게 처리할지이다. 이미 std::thread에 대해서 어떻게 처리할지 프로그래머가 결정하기 때문에 fut 객체에서는 특별한 작업이 필요 없다.

스레드간 통신

스레드간 통신은 조건 변수(condition variable)을 사용하고 조건을 검출하는 작업을 검출 작업(detecting task)라고 하고 이에 반응하는 작업를 반응 작업(reaction task)이라고 한다. 검출 작업이 메시지를 보내는 측이고 반응 작업이 메시지를 받아서 처리하는 측이다. 필요로 하는 객체는 아래와 같다.

std::condition_variable cv;
std::mutex m;

검출 작업 코드는 아래와 같다.

cv.notify_one();

반응 작업 코드는 다음과 같다.

{ // mutex 범위 한정
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock);
    ...
}

코드 냄새(code smell)가 난다. 이는 mutex로 인한 부분으로 이를 통해 공유 데이터에 대한 접근 제어에 사용되지만 서로 논리적으로 충돌이 없으면 사용할 필요가 없다. 조건 변수에서 mutex를 사용한다면 뭔가 문제가 있다. 추가적인 문제로 wait 실행 전에 검축 작업이 조건 변수를 notify하면 반응 작업이 notify를 놓치고 영원히 wait하게 된다. 또한 wait는 notify 없이도 깨어날 수 있기에 재확인이 필요하다. 그래서 wait 호출할 때에 조건 판정용 람다식을 인자로 넘겨줄 수 있다.

cv.wait(lock, []{ return 사건발생여부; });

대안으로서 조건 변수보다 플래그를 고려한다.

std::atomic<bool> flag(false);
...
flat = true;

반응 작업에서 플래그를 풀링한다.

while(!flag) {
  ...
}

이는 앞의 문제점을 모두 해결하지만 풀링 비용이 발생한다. 대기 중인 상태여도 계속 실행되며 문맥 전환 비용이 발생한다. 그래서 조건 변수와 플래그를 결합하기 한다.

std::condition_variable cv;
std::mutex m;
bool flag(false;

검출 작업 코드이다.

{
  std::lock_guard<std::mutex> lock(m);
    flag = true;
}
cv.notify_one();

반응 작업 코드이다.

{
    std::unique_lock<std::mutex> lock(m);
  cv.wait(lock, []{ return flag; });
  ...
}

이 방식은 문제가 없지만 깔끔하지는 않다. 더 낳은 방법이 std::promise와 이에 대응하는 std::future 객체를 사용한다.

std::promise<void> p;

검출 작업 코드 이다.

p.set_value();

반응 작업 코드이다.

p.get_future().wait();

완벽해 보이지만 공유상태 할당/해제 비용 고려가 필요하다. std::promise는 단발성(one-shot) 매커니즘으로 여러 번 사용하도록 만들어야 한다.

std::promise<void> p;

void detect() {
  std::thread t([]{
    // 반응 작업
    p.get_future().wait();
    react();
  });
  ...
  p.set_value(); //검출 작업
  ...
  t.join();
}

std::thread에 의해서 새 future에 대해 wait()을 수행한다. 그리고 p에 대해 set_value()로 알려준다. detect() 외부에서 t를 합류 불가능하게 해야 한다.

여기서 반응 과제를 여러 개를 처리하려면 std::shared_future을 사용해야 한다. 이를 사용하기 위해 share()함수로 해당 객체를 획득할 수 있다.

std::promise<void> p;

void detect() {
  auto sf = p.get_future().shared();

  std::vector<std::thread> ts;
  for(int i=0; i < nThreads; ++i) {
    ts.emplace_back([sf]{
      sf.wait();
      react();
    });
  }
  ...
  p.set_value(); //검출 작업
  ...
  for (auto& t : ts) t.join();
}

std::automic과 volatile

C++에서 volatile는 스레드에서 별다른 의미가 없다. volatile 변수는 두 스레드에서 변수를 최적화 없이 그대로 접근 가능하며, atomic은 동기적으로 접근 가능하다. 다음 예제로 차이점을 확인해보자.

std::atomic<int> ac(0);
volatile int vc(0);

// thread 1
++ac;
++vc;
// thread 2
++ac;
++vc;

두개 스레드에서 ac와 vc를 동시에 접근한다. 각각 두번씩 더하고 있다. 실행이 끝나면 ac는 2가 보장되지만 vc는 2가 아닐 수 있다. 위의 접근에서 RMW(읽기, 변경,쓰기)인 3가지 작업이 하나의 원자적 실행을 보장하지 않는다. std::atomic에서 원자적 실행을 보장하지만 volatile 변수는 보장하지 않는다.

std::atomic의 모든 연산은 원자성을 보장한다. 주의할 부분은 복사 생성을 지원하지 않기 때문에 할당 연산도 사용할 수 없다. 그렇기 때문에 대안으로 load()와 store()를 사용해야 한다.

std::atomic<int> y(x.load());
y.store(x.load());

volatile 변수 활용에 대해 살펴보자. volatile은 컴파일러에서 해당 변수는 특수한 변수라고 알려준다. 컴파일러 최적화에 의해 변경없이 여러 번 읽거나 변경된 값이 읽기 없이 변경할 때에 이전 변경은 제거할 수 있다.

auto y = x;
y = x;  // 변경없이 읽음(최적화 대상)
x = 10; // 변경되었지만 읽지 않음(최적화 대상)
x = 20; // 변경

쓸모없는 읽기(redundant reads)와 불필요한 저장(superfluous writes)이다. 메모리 대응 입출력(memory-mapped I/O)같은 특별한 메모리는 여러번 저장 작업은 외부 장치로 전송하는 명령일 수 있다. 이럴 경우 해당 변수를 volatile로 지정해야 앞의 최적화 작업으로 인해 불필요한 저장이 제거되지 않고 반복적인 작업을 수행한다.

마무리

std::thread와 std::async 내용이 전체적으로 내용이 매끄럽지 못했지만 최대한 정리해보았다. 부족한 글이지만 여러분에게 도움이 되었으면 합니다. 즐거운 코딩생활 되세요. ospace.

참고

[1] 스콧 마이어스, Effective Modern C++, 인사이트

반응형