Reactor 패턴 예제 코드
(http://ospace.tistory.com/(ospace114엣empal.컴)) 2008.07.18
Reactor 패턴는 잘알거라 생각합니다. 그래서 그림은 생략합니다. 그냥 귀찮아서 ^^;
다음은 Scalable IO in Java라는 문서로 Doug Lea분의 예제 코드입니다. 앞에 타이틀로 찾아보시면 쉽게 문서를 구하실 수 있을 겁니다.
예제 코드가 정말 기막히게 작성했더군요. 생각을 많이 하게하는 코드 입니다.
문서 내용도 좋으니깐 꼭 보도록 하세요. 시간이 된다면 여기에 추가로 작성해서 올리도록 하지요.
[Reactor Part]
클라이언트로부터 접속 요청이 들어왔들때 접속 처리하고 Acceptor로 처리를 넘김다. Acceptor는 이를 받아서 실제 데이터를 처리하는 Handler로 넘기는 역활을 가진다.
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new INetSocketAddress(port));
serversocket.configureBlocking(false); //넌블럭킹상태로 변경
SlectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // OP_ACCEPT(접속) 이벤트 발생시 selector로 알림
sk.attach(new Acceptor()); // OP_ACCEPT의 SelectionKey객체에 Acceptor 인스탄스를 저장해둠. 이 것이 이벤트 핸들러 객체이다.
}
/* alternatively, use explicit SPI provider: SelectorProvider p = SelectorProvider.provider(); selector = p.openSelector(); serveSocket = p.openServerSocketChannel(); */
public void run() {
try {
while(!Thread.interrupted()) {
selector.select(); // 이벤트 발생을 대기
Set selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while(it.hasNext()) { // 발생한 모든 이벤트 루핑
dispatch(it.next()); // 이벤트 디스패칭
selected.clear();
}
}
} catch(IOException ex) { /*...*/ } }
// SelectionKey에 attach된 객체가 이벤트 핸들러이다. 해당 이벤트 핸들러를 추철해서 실행해준다.
private dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if(r!=null) r.run();
}
// 접속 요청을 처리하는 이벤트 핸들러. 접속이 되면 실제 데이터를 처리하는 Handler라는 이벤트 핸들러로 넘겨준다.
private class Acceptor implements Runnable {
public void run() {
try {
SocketChannel c = serverSocket.accept();
if(c != null) new Handler(selector, c);
} catch (IOException ex) { /* ... */ }
}
}
}
[Handler Part]
접속이 완료된 후 실제 데이터를 처리하는 역활을 가진다.
final class Handler implements Runnable {
final SocketChannel socket; //접속된 소켓 채널
final SelectionKey sk; // 이벤트 발생시 넘겨받은 SelectionKey
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING; // 현재 작업 상태 (READING-수신)
// Handler는 생성자에서 해당 연결 채널 설정과 이벤트 등록을 한다.
public Handler(Selector sel, SocketChannel c) throws IOException {
socket = c; c.configureBlocking(false); // 넌블럭킹
sk = socket.register(sel,c); // selector에 현재 채널 등록
sk.attach(this); // 현 Handler객체를 저장
sk.interestOps(SelectionKey.OP_READ); // 관심 이벤트(OP_READ) 등록
sel.wakeup(); // 잠자고 있는 selector를 깨우고 자신 이벤트 처리 대기
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
// 현 상태가 READING인 경우 수신처리, SENDING인 경우 송신 처리함
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
// 소켓에서 데이터 수신하고 처리를 process()로 넘기고, 현재 상태를 처리 결과를 송신하는 상태로 변경하고 현재 SelectionKey의 관심 이벤트(OP_WRITE)를 변경한다.
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
}
}
// 소켓에 데이터를 송신한다.
void send() throws IOException {
socket.write(output);
if(outputIsComplete()) // 송신이 완료 되면 해당 SelectinKey를 취소.
sk.cancel();
}
}
[Handler State version]
현재 Handler에서 송신 핸들링이 상태 변수로 처리하는데 이를 State Pattern에 기반하여 Sender로 따로 핸들러를 분리해서 처리하는 형태
class Handler {
//... 앞의 Handler와 같고, 바뀐 부분만 표시했다.
public void run() {
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender()); // 송신시 처리할 이벤트 핸들러를 Sender객체를 등록한다.
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
// Sender 클래스를 Inner class로 정의해서 사용.
class Sender implements Runnable {
public void run() {
socket.write(output);
if(outputIsComplete())
sk.cancel();
}
}
}
[Handler with Thread pool]
처리중 상태 하나를 더 추가했다. 처리 중이라는 상태는 쓰레드 형태로 독립적으로 처리를 한다.
class Handler implements Runnable {
static PooledExcutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
// ... process(); // 이부분은 필요가 없음
state = PROCESSING;
pool.execute(new Processer());
}
}
// 데이터 처리 부분을 따로 분리해서 독립젇으로 처리하기 위한 루틴
synchronized void processAndHandOff() {
process();
state = SENDING;
sk.interest(SelectionKey.OP_WRITE);
}
// 처리상태 핸들러. State pattern으로 작성됨
class Processer implements Runnable {
public void run() {
processAndHandoff();
}
}
}
[Multiple Reactor Threads]
여러 selector를 가지고 여러 이벤트처리에 대한 부분.
Selector[] selectors;
int next = 0;
class Acceptor {
//...
// 여러 selector들 마다 접속 후 데이터 처리 핸들링 Handler 클래스를 각각 생성해주면 된다.
public synchronized void run() {
Socket connection = serverSocket.accept();
if(connection != null)
new Handler(selections[next], connection);
if( ++next == selectors.length)
next = 0;
}
}
이상으로 Reactor Pattern를 마치겠다. 위의 코드는 아이디어를 얻기위한 코드이지 코드 완성도를 따지기 위한 부분은 아니다. 위의 코드에서 유용한 정보를 얻어가기를 바란다.
참고:
[1] Scable IO in Java, Doug Lea, State University of New York at Oswego
'3.구현 > Java or Kotlin' 카테고리의 다른 글
Java 쓰레드 상태 (0) | 2008.07.24 |
---|---|
Jar 패키징(Packaging) (0) | 2008.07.21 |
SLF4J simple tutorial (0) | 2007.12.20 |
Java thread에서 IllegalMonitorStateException 예외 발생문제 (0) | 2007.11.28 |
MS-Sql 서버에서 iBatis 사용 강좌 (0) | 2007.10.26 |