본문 바로가기

3.구현/Java or Kotlin

Reactor 패턴의 예제 코드

Reactor 패턴 예제 코드

(http://ospace.tistory.com/(ospace114엣empal.컴)) 2008.07.18

Reactor 패턴는 잘알거라 생각합니다. 그래서 그림은 생략합니다. 그냥 귀찮아서 ^^;
다음은 Scalable IO in Java라는 문서로 Doug Lea분의 예제 코드입니다. 앞에 타이틀로 찾아보시면 쉽게 문서를 구하실 수 있을 겁니다.
예제 코드가 정말 기막히게 작성했더군요. 생각을 많이 하게하는 코드 입니다.
문서 내용도 좋으니깐 꼭 보도록 하세요. 시간이 된다면 여기에 추가로 작성해서 올리도록 하지요.

[Reactor Part]

클라이언트로부터 접속 요청이 들어왔들때 접속 처리하고 Acceptor로 처리를 넘김다. Acceptor는 이를 받아서 실제 데이터를 처리하는 Handler로 넘기는 역활을 가진다.

class Reactor implements Runnable {
 final Selector selector;
 final ServerSocketChannel serverSocket;

 Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    serverSocket.socket().bind(new INetSocketAddress(port));
    serversocket.configureBlocking(false); //넌블럭킹상태로 변경
    SlectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // OP_ACCEPT(접속) 이벤트 발생시 selector로 알림
    sk.attach(new Acceptor()); // OP_ACCEPT의 SelectionKey객체에 Acceptor 인스탄스를 저장해둠. 이 것이 이벤트 핸들러 객체이다.
 }
 /* alternatively, use explicit SPI provider: SelectorProvider p = SelectorProvider.provider(); selector = p.openSelector(); serveSocket = p.openServerSocketChannel(); */
 public void run() {
    try {
    while(!Thread.interrupted()) {
       selector.select(); // 이벤트 발생을 대기
       Set selected = selector.selectedKeys();
       Iterator<SelectionKey> it = selected.iterator();

       while(it.hasNext()) { // 발생한 모든 이벤트 루핑
          dispatch(it.next()); // 이벤트 디스패칭
          selected.clear();
       }
    }
 } catch(IOException ex) { /*...*/ } }

 // SelectionKey에 attach된 객체가 이벤트 핸들러이다. 해당 이벤트 핸들러를 추철해서 실행해준다.
 private dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());
    if(r!=null) r.run();
 }

 // 접속 요청을 처리하는 이벤트 핸들러. 접속이 되면 실제 데이터를 처리하는 Handler라는 이벤트 핸들러로 넘겨준다.
 private class Acceptor implements Runnable {
    public void run() {
       try {
          SocketChannel c = serverSocket.accept();
          if(c != null) new Handler(selector, c);
       } catch (IOException ex) { /* ... */ }
    }
 }
 }

[Handler Part]

접속이 완료된 후 실제 데이터를 처리하는 역활을 가진다.

final class Handler implements Runnable {
 final SocketChannel socket; //접속된 소켓 채널
 final SelectionKey sk; // 이벤트 발생시 넘겨받은 SelectionKey
 ByteBuffer input = ByteBuffer.allocate(MAXIN);
 ByteBuffer output = ByteBuffer.allocate(MAXOUT);
 static final int READING = 0, SENDING = 1;
 int state = READING; // 현재 작업 상태 (READING-수신)

 // Handler는 생성자에서 해당 연결 채널 설정과 이벤트 등록을 한다.
 public Handler(Selector sel, SocketChannel c) throws IOException {
    socket = c; c.configureBlocking(false); // 넌블럭킹
    sk = socket.register(sel,c); // selector에 현재 채널 등록
    sk.attach(this); // 현 Handler객체를 저장
    sk.interestOps(SelectionKey.OP_READ); // 관심 이벤트(OP_READ) 등록
    sel.wakeup(); // 잠자고 있는 selector를 깨우고 자신 이벤트 처리 대기
 }

 boolean inputIsComplete() { /* ... */ }
 boolean outputIsComplete() { /* ... */ }
 void process() { /* ... */ }

 // 현 상태가 READING인 경우 수신처리, SENDING인 경우 송신 처리함
 public void run() {
    try {
       if (state == READING) read();
       else if (state == SENDING) send();
    } catch (IOException ex) { /* ... */ }
 }

 // 소켓에서 데이터 수신하고 처리를 process()로 넘기고, 현재 상태를 처리 결과를 송신하는 상태로 변경하고 현재 SelectionKey의 관심 이벤트(OP_WRITE)를 변경한다.
 void read() throws IOException {
    socket.read(input);
    if (inputIsComplete()) {
       process();
       state = SENDING;
       sk.interestOps(SelectionKey.OP_WRITE);
    }
 }

 // 소켓에 데이터를 송신한다.
 void send() throws IOException {
    socket.write(output);
    if(outputIsComplete()) // 송신이 완료 되면 해당 SelectinKey를 취소.
       sk.cancel();
 }
}

[Handler State version]

현재 Handler에서 송신 핸들링이 상태 변수로 처리하는데 이를 State Pattern에 기반하여 Sender로 따로 핸들러를 분리해서 처리하는 형태

class Handler {
    //... 앞의 Handler와 같고, 바뀐 부분만 표시했다.
    public void run() {
       socket.read(input);
       if (inputIsComplete()) {
          process();
          sk.attach(new Sender()); // 송신시 처리할 이벤트 핸들러를 Sender객체를 등록한다.
          sk.interest(SelectionKey.OP_WRITE);
          sk.selector().wakeup();
       }
    }

    // Sender 클래스를 Inner class로 정의해서 사용.
    class Sender implements Runnable {
       public void run() {
          socket.write(output);
          if(outputIsComplete())
          sk.cancel();
       }
    }
 }

[Handler with Thread pool]

처리중 상태 하나를 더 추가했다. 처리 중이라는 상태는 쓰레드 형태로 독립적으로 처리를 한다.

class Handler implements Runnable {
 static PooledExcutor pool = new PooledExecutor(...);
 static final int PROCESSING = 3;
 // ...

 synchronized void read() throws IOException {
    socket.read(input);
    if (inputIsComplete()) {
       // ... process(); // 이부분은 필요가 없음
       state = PROCESSING;
       pool.execute(new Processer());
    }
 }

 // 데이터 처리 부분을 따로 분리해서 독립젇으로 처리하기 위한 루틴
 synchronized void processAndHandOff() {
    process();
    state = SENDING;
    sk.interest(SelectionKey.OP_WRITE);
 }

 // 처리상태 핸들러. State pattern으로 작성됨
 class Processer implements Runnable {
    public void run() {
       processAndHandoff();
    }
 }
}

[Multiple Reactor Threads]

여러 selector를 가지고 여러 이벤트처리에 대한 부분.

Selector[] selectors;
 int next = 0;
 class Acceptor {
 //...
     // 여러 selector들 마다 접속 후 데이터 처리 핸들링 Handler 클래스를 각각 생성해주면 된다.
     public synchronized void run() {
       Socket connection = serverSocket.accept(); 
        if(connection != null)
          new Handler(selections[next], connection);
       if( ++next == selectors.length)
          next = 0;
    }
 }

이상으로 Reactor Pattern를 마치겠다. 위의 코드는 아이디어를 얻기위한 코드이지 코드 완성도를 따지기 위한 부분은 아니다. 위의 코드에서 유용한 정보를 얻어가기를 바란다.

참고:

[1] Scable IO in Java, Doug Lea, State University of New York at Oswego

반응형