单 Reactor

在单 Reactor 模型中, 直接使用 Reactor 注册 OP_ACCEPT (与客户端连接建立完成) 事件, 触发后 attach 的 Acceptor 被回调。

Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port)); // 绑定端口
serverSocket.configureBlocking(false); // 设置成非阻塞模式
// 注册并关注一个 IO 事件
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// 关联事件的处理程序
sk.attach(new Acceptor());

后续的步骤都是围绕 Selector 和从 ServerSocketChannel 中 accept 出来的 SocketChannel sc 做文章。

Acceptor, new 一个 Handler, Handler 在 Selector 上绑定事件 OP_READ, 并 attach 自身,持有 SelectionKey。

SocketChannel sc = serverSocket.accept(); // 接收连接,非阻塞模式下,没有连接直接返回 null
if (sc != null) {
	// 把提示发到界面
	sc.write(ByteBuffer.wrap("Implementation of Reactor Design Partten by xu.zeng\\r\\nreactor> ".getBytes()));
	new BasicHandler(selector, sc); // 单线程处理连接
}

public BasicHandler(Selector sel, SocketChannel sc) throws IOException {
	socket = sc;
	sc.configureBlocking(false); // 设置非阻塞
	// Optionally try first read now
	sk = socket.register(sel, 0); // 注册通道
	sk.interestOps(SelectionKey.OP_READ); // 绑定要处理的事件
	sk.attach(this); // 管理事件的处理程序
	sel.wakeup(); // 唤醒 select() 方法
}

客户端写入后,Handler 被 Reactor 回调,处理读事件,Handler 走 decode-compute-encode 后,绑定事件 OP_WRITE, Handler 再度被回调后拿着 socket 写即可, send 完成后,再绑定事件 OP_READ。

protected void read() throws IOException {
	input.clear(); // 清空接收缓冲区
	int n = socket.read(input);
	if (inputIsComplete(n)) {// 如果读取了完整的数据
		process();
		// 待发送的数据已经放入发送缓冲区中
		
		// 更改服务的逻辑状态以及要处理的事件类型
		sk.interestOps(SelectionKey.OP_WRITE);
	}
}

protected void send() throws IOException {
	int written = -1;
	output.flip();// 切换到读取模式,判断是否有数据要发送
	if (output.hasRemaining()) {
		written = socket.write(output);
	}
	
	// 检查连接是否处理完毕,是否断开连接
	if (outputIsComplete(written)) {
		sk.channel().close();
	} else {
		// 否则继续读取
		state = READING;
		// 把提示发到界面
		socket.write(ByteBuffer.wrap("\\r\\nreactor> ".getBytes()));
		sk.interestOps(SelectionKey.OP_READ);
	}	
}

很容想起,HTTP2 模型中一个 TCP dipatch multi Stream 的情况,读写事件会源源不断。这时对 read send sync/lock 做同步处理,decode, compute, encode 线程池处理即可。

线性读写是可能会成为响应速度瓶颈的,使用多 Reactor 模型,主 Reactor 只负责 accpt, 多个 subReactor 负责 read 和 write,可以提高处理海量客户端的能力。

(曾注: 但是对单个网站来说,Socket 只有一个,读写肯定还是线性的,可以 batch HTTP Stream?)

多 Reactor

多 Reactor 模型下,MainReactor 只负责 accept (通过 attach Acceptor 做到), SubReacot 负责 socket 读写, Worker 负责请求处理。所有 Reactor 仍共享一个 Selector。

不同的是,单 Reactor 模式下,Acceptor 直接注册 Handler (也就是在 mainReactor中)。多 Reactor 模式下,Acceptor 选择 subReactor 注册 Handler, 由 subReactor 处理请求。