본문 바로가기

2.분석 및 설계/Pattern

ThreadPool 개념과 구조

들어가기

Thread Pool은 여러 개의 쓰레드를 두고 사용자 요청이 오면 쓰레드에 할당하여 작업을 수행하는 프로그래밍 기법을 말한다. Thread 여러 개를 Pool에 놓아두고 필요에 의해 꺼내쓰고, 다 쓰면 다시 Pool에 넣어두는 개념이다. 이런 비슷한 것이 BufferPool이라는 것도 있다. 쓰레드는 생성하는데 자원이 많이 필요하며, Context Switching에 의한 성능 저하가 발생한다. 그렇기 때문에 Thread Pool에 미리 thread를 생성해두고, 생성된 쓰레드로 사용을 제한하면, 시스템 자원을 효율적으로 사용할 수 있게 된다.

작성자: http://ospace.tistory.com/,2011.01.31 (ospace114@empal.com)

ThreadPool 요구사항

ThreadPool를 두는 이유는 각종 쓰레드 처리는 한 곳에 관리하며 시스템 전체적인 성능효율을 향상시킨다. 특히 thread의 반복적인 생성과 해제를 제한시켜 불필요한 작업을 없애는 목적이 크다.
이 글에서는 Java의 thread framework를 기반으로 ThreadPool를 구성하였다. 기본적인 구조만 다루었기 때문에 세부적인 코딩은 별도로 해줘야한다. 여기서는 개념을 잡고 전체적인 구조를 구성하는데 도움을 주기 위해서 작성하였다. 그리고 사용하는 코드는 C++를 기반으로 작성했다.
더 진행하기 전에 필자가 생각하고 있는 Thread Pool의 요구사항을 정리해보았다. 이는 필수는 아니기 때문에 적절한 수준에서 타협할 수 있다.

  • ThreadPool은 사용자로 부터 작업요청을 받아서 처리해야 한다.
  • 사용자는 Thread Pool 관리방식을 선택할 수 있게 해야한다.(ex. 동적, 정적)
  • 사용자가 Thread Pool에 요청한 작업을 우선순위에 의해서 처리할 수 있어야 한다.
  • 사용자가 Thread Pool에 모든 작업을 종료할 수 있어야 한다.
  • 사용자는 Thread Pool의 모든 작업이 중지할 때까지 대기 할 수 있어야 한다.
  • 사용자는 Thread Pool에 모든 작업을 즉시 중지할 수 있어야 한다.
  • 사용자는 Thread Pool에 요청한 작업을 취소 가능하다.
  • 사용자는 Thread Pool에 취소 요청한 작업을 취소되었는지 확인할 수 있어야 한다.

기본구조

처리는 진행하기 전에 기본 구조를 잡고 가자. 여기에 살을 붙이면서 하나씩 기능을 확장해보자.
일단 작없을 수행할 Thread가 있어야 하면 이런 Thread를 보관할 ThreadPool이 있어야 한다. 그럼 간단한 구조를 살펴보면;

Fig.01

일 단 Thread Pool은 ThreadPool로 이름을 사용하고, Thread은 Worker로 작업수행하는 역활로 명명하였다. ThreadPool은 여러 개의 Worker를 가지며, 그래 변수명을 workers로 하였다. 이를 기본으로 하여 확장하도록 하자.

class Worker
{
};

class ThreadPool
{
public:
    ThreadPool(int workerCount)    :
        workerCount_(workerCount),
        workers_(new  Worker[workerCount])
    {}

    ~ThreadPool()
    {
        delete[] workers_;
    }

private:
    Worker *workers_;
    int     workerCount_;
};

자동으로 ThreaddPool 인스탄스될때에 Worker객체를 원하는 개수만큼 생성하도록 하였다. 그리고 ThreadPool이 해제되면 자동으로 Worker도 해제된다.

사용자로부터 처리 요청 받기

사용자로 부터 처리할 로직을 받아서 Worker에 의해서 처리된다. 즉, 데이터가 아니라 로직을 받는다. 데이터는 클래스 인스탄스에 캡슐화되면 된다.
로직이 분리되어 특정 구현에 종속적이지 않고 범용적으로 사용할 수 있게 된다. 그럼 Worker에서 로직 부분을 분리하고, 이를 사용자로부터 받는 구조를 생성해보자.
그리고 당연히 요청들을 임시 보관해두었다가 Worker에 요청을 할당 할 수 있게 해야겠다.
역활을 2가지로 구분할 수 있다. 사용자로부터 처리 요청을 받아서 큐에 저장하는 역활과 처리 요청을 수행하는 역활로 구분할 수 있다. 전자는 ThreadPool에서 수행하며 후자는 Worker에 의해서 수행된다.

Fig.02

추가된 부분 인터페이스로 IRequestAllocator가 있고, 내부 메소드들이 추가되었다.
Callable 인터페이스에는 call이라는 가상함수를 추가했다. 이를 통해서 해당 요청을 호출해서 처리하게 된다.

struct Callable
{
    virtual void call() = 0;
};

IRequestAllocator 은 Worker가 ThreadPool를 접근하기 위핸 것으로 주로, 작업요청과 작업 시작/끝을 관리한다. 즉 Worker는 IRequestAllocator에 의해서 처리할 작업을 요청하고 작업이 시작과 종료되었음을 알린다.

struct IRequestAllocator
{
    Callable* popRequest() = 0;   
    void      workBegin() = 0;
    void      workEnd() = 0;
};

그럼 IRequestAllocator를 구현한 ThreadPool를 살펴보자. ThreadPool은 IRequestAllocator 기능 외에 사용자로 부터 처리 요청한 작업을 저장하는 기능도 포함되어 있다.

class ThreadPool : public IRequestAllocator
{
public:
    ThreadPool(int workerCount)    :
        workerCount_(workerCount),
        workers_(new  Worker[workerCount]),
        workingCount_(0)
    {}

    ~ThreadPool()
    {
        delete[] workers_;
    }

    void putRequest(const Callable *callable)
    {
        requestLocker_.lock();
        requests.push_back(callable);
        requestLocker_.unlock();
        condVar_.notify();
    }

    Callable* popRequest()
    {
        Callable* request = NULL;
        requestLocker_.lock();
        while( NULL != request ) {
            if( requests.empty() ) {
                condVar_.wait();
            } else {
                request = requests.front();
                requests.pop_front();
            }
        }
        requestLocker_.unlock();

        return request;
    }

    void workBegin()
    {
        workingLocker_.lock();
        ++workingCount_;
        workingLocker_.unlock();
    }

    void workEnd()
    {
        workingLocker_.lock();
        --workingCount_;
        workingLocker_.unlock();
    }

private:
    Worker *workers_;
    int     workerCount_;
    int     workingCount_;
    std::list requests;

    ConvVar condVar_;
    Locker  requestLocker_;
    Locker  workingLocker_;
};

한가지 고려할 점은 popRequest 메소드로 Worker가 작업을 요청하는 부분인데, 현재 큐가 비어있다면, 작업이 들어올때까지 대기하기 된다. 만약 Worker가 종료하게 된다면, popRequest 메소드에서 풀어주지 않은다면 Worker는 영원히 종료되지 않을 지도 모른다. 즉, 대기하는 부분을 Worker로 두고 제어하는 것이 더 직관적이다. 수정한 내용이 다름과 같다.
ThreadPoll 수정한 부분이다.

struct IRequestAllocator
{
    Callable* popRequest() = 0;   
    void      workBegin() = 0;
    void      workEnd() = 0;
    void      waitRequest() = 0;
    void      notifyRequest() = 0;
};

class ThreadPool : public IRequestAllocator
{
public:
    ThreadPool(int workerCount)    :
        workerCount_(workerCount),
        workers_(new  Worker[workerCount]),
        workingCount_(0)
    {}

    ~ThreadPool()
    {
        delete[] workers_;
    }

    void putRequest(const Callable *callable)
    {
        requestLocker_.lock();
        requests.push_back(callable);
        requestLocker_.unlock();
        notifyRequest();
    }

    void waitRequest()
    {
        condVar_.wait();
    }

    void notifyRequest()
    {
        condVar_.notify();
    }

    Callable* popRequest()
    {
        Callable* request = NULL;
        requestLocker_.lock();
            if( !requests.empty() ) {
                request = requests.front();
                requests.pop();
            }
        }
        requestLocker_.unlock();

        return request;
    }

    //...
};

Worker 수정한 내용이다.

class Worker
{
public:
    Worker(IRequestAllocator *allocator) : allocator_(allocator)
    {}

    void run()
    {
        while(1)
        {
            while(NULL == request_ ) {
                request_ = allocator_->popRequest();

                if( NULL == request_ ) {
                    allocator_->waitRequest();
                }
            }

            allocator_->workBegin();
            request_->call();
            allocator_->workEnd();

            request_ = NULL;
        }
    }

private:
    Callable         *request_;
    IRequestAllocator *allocator_;
};

모든 작업 관리

넘어가기 전에 살짝 구조를 변경하였다. 즉, Worker도 Callable를 구현하는 방식으로 변경하였다. 이는 나중에 Thread라는 객체를 만들어서 공통적으로 쓰레드를 구동하는 인터페이스로 사용하기 위해서이다.

fig.03

이 부분은 모든 작업을 멈추거나 대기하는 부분이다. 즉, threadpool의 모든 작업이 끝나기를 기다리거나, 혹은 강제로 작업을 멈추고 종료하는 것을 말한다. 먼저 Worker를 종료하는 부분부터 수정해보자.
Thread 에 의해서 종료되는 Worker인 경우는 강제로 종료하기는 불안전하기 때문에 로직 내에 반드시 종료할 수 있는 루틴을 넣어주는 방식을 사용했다. 일반적으로 모든 쓰레드는 강제로 인터럽트되는 경우 오류 발생 가능성이 너무 많다. 가급적이면 내부적으로 강제적인 정상 종료 로직을 넣는 것이 좋다고 본다.
그래서 callable 인터페이스 stop 메소드를 추가하고, 개별로 stop 메소드가 호출되었을 경우 강제로 로직을 빠져나오게 한다.

struct Callable
{
    virtual void call() = 0;
    virtual void stop() {}
};

마찬가지로 Worker도 stop 메소드를 구현하면 된다. Worker에서 Callable를 상속한 부분과 stop 메소드 구현은 다음과 같다.

class Worker : public Callable
{
public:
    Worker(IRequestAllocator *allocator) : allocator_(allocator)
    {}

    void call()
    {
        while (false == isStop_)
        {
            while (NULL == request_ && false == isStop_) {
                request_ = allocator_->popRequest();

                if (NULL == request_) {
                    allocator_->waitRequest();
                }               
            }

            if( NULL != request_ ) {
                allocator_->workBegin();
                request_->call();
                allocator_->workEnd();
                request_ = NULL;
            }
        }
    }

    void stop()
    {
        isStop_ = true;
        allocator_->notifyRequest();
    }

private:
    Callable          *request_;
    IRequestAllocator *allocator_;
};

Callable에 대한 구현은 여기서는 살펴보지 않겠다. 단지 Worker가 구현된 내용을 참고하기 바란다. 다음은 ThreadPool에서 모든 작업 관리하는 로직이다.
ThreadPool 에 요청한 작업이 모두 완결되기를 기다리는 waitRequestDone 메소드와 모든 Worker을 바로 종료하는 shutdownNow 메소드와 모든 요청 작업이 완료되면 모든 Worker를 종료하는 shutdown 메소드를 만들어보자.

class ThreadPool : public IRequestAllocator
{
public:
    //...

    // isReset은 강제로 requests의 요청을 모두 제거 여부
    void waitRequestDone(bool isReset = false)
    {
        if (true == isReset) {
            requestLocker_.lock();
            while (!requests.empty()) {
                requests.pop_front();
            }
            requestLocker_.unlock();
        }

        while (!requests.empty()) {
            waitRequest();
        }
    }

    //...

private:
    Worker *workers_;
    int     workerCount_;
    int     workingCount_;
    std::list requests;

    CondVar condVarAvailable_;
    CondVar condVarEmpty_;
    Locker  requestLocker_;
    Locker  workingLocker_;
};

그리고 shutdown 부분을 살펴보자. shutdown을 위해서 waitWorkingDone 메소드를 추가했다. 이 메소드는 모든 작업 중인 Worker가 마칠때까지 대기하는 것이다.

class ThreadPool : public IRequestAllocator
{
public:
    //...

    void waitWorkingDone(bool isStop)
    {
        if (true == isStop) {
            for(int i=0; istop();
            }
        }

        while (0 != workingcount_) {
            condVarEmpty_.wait();
        }
    }

    void shutdown(bool isNow = false)
    {
        if (isNow) {
            waitRequestDone(true);
        } else {
            waitRequestDone(false);
        }

        waitWorkingDone(isNow);
    }

    void shutdownNow()
    {
        shutdown(true);
    }




private:
    Worker *workers_;
    int     workerCount_;
    int     workingCount_;
    std::list requests;

    CondVar condVarAvailable_;
    CondVar condVarEmpty_;
    Locker  requestLocker_;
    Locker  workingLocker_;
};

고려되야할 부분이 몇 가지가 있다.

  • waitRequestDone 호출해서 대기 중에 처리 요청 받을지 여부
  • shutdown 중간에 들어오는 모든 요청 거부 여부
  • shutdown이 되면 모든 Worker가 중지되기 때문에 모든 요청을 거부 여부

이런 조건을 고려하여 중지 로직을 수정하면 될 것 같다.

특정 작업 관리

이번 기능은 조금 어렵다. 이는 개발 요청에 대한 작업 관리이다. 이런 기능는 다음과 같다.

  • 결과를 획득
  • 요청에 대한 작업 완료 여부
  • 중간에 요청을 취소

말은 쉽지만, 단순 코딩으로 하면 꽤 복잡하고 난해한 문제가 발생한다. 여기서는 Future 패턴을 사용해서 구현하였다.
사 용자가 ThreadPool에 처리 요청(Callable 객체)를 넣으면 ThreadPool은 Token를 반환한다. 사용자는 이 Token을 사용하여 자신이 요청한 작업에 대한 관리를 할 수 있다. 구조를 간단히 그려보면 다음과 같다.

Fig.04

앞으로 모든 요청은 RequestToken에 저장하고 RequestToken이 Callable 객체 대신에 사용된다. 사용자는 IToken 인스탄스를 받아서 이를 처리하게 된다.
그럼 IToken과 RequestToken를 살펴보고 ThreadPool과 Worker에서 변경된 RequestToken으로 처리되는 로직을 살펴보자.

template <typename R>
struct IToken
{
    virtual void cancel() = 0;
    virtual bool isCancelled() = 0;
    virtual bool isDone() = 0;
    virtual R getResult() = 0;
    typedef R ResultType;
};

IToken에는 작업을 취소하는 cancel 메소드와 취소가 완료되었는지 확인하는 isCancelled 메소드와 작업이 완료되었는지 확인하는 isDone 메소드, 마지막으로 결과를 얻는 getResult 메소드가 있다. 구현은 RequestToken에서할 것이다.

enum RequestTokenState
{
    INIT = 0,
    RUN,
    CANCEL,
    CANCELLED,
    STOPPIING,
    STOPPED,
    DONE
};

template <typename R>
class RequestToken : public IToken 
{
public:
    RequestToken(Callable *request) :
    worker_(NULL),
    request_(request),
    state_(INIT)
    {}

    void cancel()
    {
        state_ = CANCEL;
        if( NULL != worker_ ) {
            worker_->stop();
        }
    }

    bool isCancelled()
    {
        return state_ == CANCELLED;
    }

    bool isDone()
    {
        return DONE == state_;
    }

    R getResult()
    {
        if( DONE != state_ ) condVarResult_.wait();
        return result_;
    }

    bool isCancel()
    {
        return (CANCEL == state_ || CANCELLED == state_);
    }

    void setWorker(Worker* worker)
    {
        assert(NULL!=worker);
        worker_ = worker;
    }

    void setResult(R result)
    {
        if (isCancel()) return;

        result_ = result;
        state_ = DONE;
        condVarResult_.notify();
    }

    void cancelled()
    {
        state_ = CANCELLED;
    }

    Callable* getRequest()
    {
        return request_;
    }

private:
    Worker   *worker_
    Callable *request_;
    RequestTokenState state_;
    R         result_;
    CondVar   condVarResult_;
};

추가되는 메소드로 setWorker, setResult, cancelled, isCancel이 있다. setWorker 메소드는 요청이 작업을 시작할때 처리하는 Worker 인스탄스를 저장하기 위한 것이고, setResult는 결과를 저장하기 위한 것이고, cancelled은 취소작업이 완료되었다고 표시하고, isCancel은 현재 취소상태인지 확인하며, getRequest는 저장된 요청을 획득하는 메소드이다. 특히 요청이 완료되지 않은 상태에서 getResult로 결과를 획득할 경우, 요청이 완료될 때까지 멈추게 된다. 즉, 동기적으로 동작한다. 그렇기 때문에 작업이 완료했는지 여부는 isDone 메소드를 확인하여 getResult 메소드로 결과를 획득한다. 혹은 결과를 동기적으로 시퀀스하게 처리하고 싶다면, isDone 메소드 사용없이 바로 getResult 메소드를 사용해도 된다.
다음은 ThreadPool을 수정해보자.

class ThreadPool : public IRequestAllocator
{
public:
    IToken* putRequest(const Callable *callable)
    {
        RequestToken *token = new RequestToken(callable);

        requestLocker_.lock();
        requests.push_back(token);
        requestLocker_.unlock();
        notifyRequest();

        return token;
    }

    RequestToken* popRequest()
    {
        RequestToken* request = NULL;

        requestLocker_.lock();       
        while (NULL != request) {
            if (!requests.empty()) {
                request = requests.front();
                requests.pop();
                if ( request->isCancel()) {
                    request->cancelled();
                    request = NULL;
                }
            } else {
                break;
            }
        }
        requestLocker_.unlock();

        return request;
    }

    //...

private:
    std::list requests;
    //...
};

putRequest와 popRequest 메소드가 수정되었으며 requests의 list 템플릿 타입이 RequestToken으로 변경되었다. 한가지 이슈로서 putRequest에서 요청이 들어 올때마다 RequestToken 인스탄스가 생성이 된다. 그러면 생성된 리소스에 대한 해제 위치와 비번한 리소스 생성으로 인한 문제가 있다. 이 부분은 숙제로 남겨두겠다.
특히 popRequest 메소드에서는 꺼낸 RequestToken이 취소상태이며, 최소처리하고 다시 RequestToken을 꺼내게 된다.
마지막으로 Woker를 수정해보자.

class Worker : public Callable
{
public:
    Worker(IRequestAllocator *allocator) : allocator_(allocator)
    {}

    void call()
    {
        while (false == isStop_)
        {
            while (NULL == request_ && false == isStop_) {
                request_ = allocator_->popRequest();

                if (NULL == request_) {
                    allocator_->waitRequest();
                }               
            }

            if( NULL != request_ ) {

                allocator_->workBegin();

                if (request_->isCancel() || isStop_) {
                    request_->cancelled();
                } else {
                    result = request_->call();
                }

                request_->setResult(request_->call(););

                allocator_->workEnd();
                request_ = NULL;
            }
        }
    }

    //...

private:
    RequestToken      *request_;
    IRequestAllocator *allocator_;
};

실제 요청을 수행하기 전에 취소상태인지 확인하고 실행한다. 그리고 결과를 저장하기 전에도 취소상태인지 확인하고 싶었지만, RequestToken의 결과값 테이터형이 Worker가 알지못하기에 결과를 중간에 저장할 수 없었다. 그래서 바로 저장하고 RequestToken의 setResult 메소드 안에서 취소 상태이면 결과를 저장하지않도록 했다.

관련패턴

  • Future Pattern: 쓰레드에서 결과값을 얻는데 사용
  • ACT(Asynchronous Completion Token) Pattern
  • Preactor Pattern
  • Strategy Pattern

결론

겨우 Thread Pool에 대해 정리가 끝났다. 앞에서 다룬 Thread Pool은 로직이 빈번히 변경되는 환경에서 유용하다. 로직은 고정적이고, 데이터만 변하는 경우라면 달라질 것이다. 아마 더 쉽게 구현할 수 있을 거라 본다. 아직 코드가 완벽하지 않기 때문에 실제 컴파일을 하면 오류가 많이 발생할 것이다. 특히 CondVar은 아애 없기 때문에 새로 구현해야할 것이다. CondVar은 여기서 다루지는 않을 것이다. 다른 thread synchronization에서 참고하면 된다. 혹시나 말하는 건데 CondVar은 기본으로 제공되지는 않는다. 위의 Thread Pool을 기반으로 다양한 변형을 할 수 있을 것이다. 필자도 그렬러고 큰 그림에서 Thread Pool을 정리하였다.
앞으로 Thread Pool을 기반으로 ACT(Asynchronous Completion Token) Pattern을 사용하여 Preactor Pattern을 다뤄보고 싶다. 차후 시간이 되면 이부분도 다룰 예정이다.
앞에서 다룬 모든 요구조건을 만족하지 않았지만, 그래서 어느정도 기본적인 부분은 완료했다고 본다. 다양한 Thread Pool 관리 방식 같은 경우는 Strategy Pattern같은 것을 사용해서 구현하면 될 것이다. 그리고 실행시간에 대한 정보도 처리가 가능하다. 요청에 대한 우선순위도 고려한 저장소 설계를 할 수 있다. 다양한 기능을 더 확장이 가능하다. 그리고 여기서는 결과를 Future 패턴을 이용했지만 Event Listener를 이용하여 이벤트 형태로 결과를 얻을 수도 있다. 물론 여기서는 다루지 않을 것이다. ^^;
마지막으로 thread synchronization에 대한 부분은 완벽히 검토하지 못했다. 최대한 고려했지만, 실제 구동할 때에 어떤 버그가 있을지 확인하지 못했기 때문에 사용할때 이부분을 고려하기 바란다. 모두 즐프하길~~ ospace.

참조

[1] JDK, Java.util.Concurrent Package, Executor class

[2] ThreadPool, 소스코드에 저작자표시가 없어서 누가, 어디서 받았는지 확인하기 어려움

[3] Jeff Heaton, ThreadPool, http://www.jeffheaton.com

반응형

'2.분석 및 설계 > Pattern' 카테고리의 다른 글

Chain of responsibility  (0) 2012.07.27