This commit is contained in:
tursom 2020-05-27 04:09:26 +08:00
parent cfbeeabf98
commit 29c93fb875
6 changed files with 162 additions and 3 deletions

View File

@ -15,8 +15,8 @@ interface NioProtocol {
@Throws(Throwable::class)
fun exceptionCause(key: SelectionKey, nioThread: NioThread, e: Throwable) {
e.printStackTrace()
key.cancel()
key.channel().close()
e.printStackTrace()
}
}

View File

@ -26,7 +26,7 @@ interface NioThread : Closeable, Executor {
/**
* 将通道注册到线程对应的选择器上
*/
fun register(channel: SelectableChannel, ops: Int, onComplete: (key: SelectionKey) -> Unit) {
fun register(channel: SelectableChannel, ops: Int, onComplete: (key: SelectionKey) -> Unit = {}) {
if (Thread.currentThread() == thread) {
val key = channel.register(selector, ops)
onComplete(key)

View File

@ -36,7 +36,7 @@ class NioLoopServer(
if (started.compareAndSet(false, true)) {
listenChannel.socket().bind(InetSocketAddress(port), backLog)
listenChannel.configureBlocking(false)
bossNioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {}
bossNioThread.register(listenChannel, SelectionKey.OP_ACCEPT)
}
}

View File

@ -0,0 +1,111 @@
package cn.tursom.niothread;
import cn.tursom.core.buffer.ByteBuffer;
import cn.tursom.core.buffer.ByteBufferExtensionKt;
import cn.tursom.core.buffer.impl.DirectByteBuffer;
import cn.tursom.niothread.loophandler.BossLoopHandler;
import kotlin.Unit;
import org.jetbrains.annotations.NotNull;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* AsyncSocket NioThread Java 下实现 Echo 服务器
*/
public class EchoServer implements Closeable, Runnable {
public static void main(String[] args) {
EchoServer server = new EchoServer(12345);
server.run();
}
private final int port;
private final ServerSocketChannel serverSocketChannel = getServerSocketChannel();
private final NioThread nioThread = new WorkerLoopNioThread("nioLoopThread", getSelector(), false, 3000, loopHandler);
private SelectionKey key;
public EchoServer(int port) {
this.port = port;
}
public int getPort() {
return port;
}
@Override
public void close() throws IOException {
if (key != null) {
key.cancel();
}
serverSocketChannel.close();
nioThread.close();
}
@Override
public void run() {
try {
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
} catch (IOException e) {
throw new RuntimeException(e);
}
nioThread.register(serverSocketChannel, SelectionKey.OP_ACCEPT, key -> {
this.key = key;
return Unit.INSTANCE;
});
}
private static final NioProtocol nioProtocol = new NioProtocol() {
@Override
public void exceptionCause(@NotNull SelectionKey key, @NotNull NioThread nioThread, @NotNull Throwable e) throws Throwable {
e.printStackTrace();
key.cancel();
key.channel().close();
}
@Override
public void handleConnect(@NotNull SelectionKey key, @NotNull NioThread nioThread) {
key.interestOps(SelectionKey.OP_READ);
key.attach(new DirectByteBuffer(1024));
}
@Override
public void handleRead(@NotNull SelectionKey key, @NotNull NioThread nioThread) throws Throwable {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
ByteBufferExtensionKt.read(channel, buffer);
key.interestOps(SelectionKey.OP_WRITE);
}
@Override
public void handleWrite(@NotNull SelectionKey key, @NotNull NioThread nioThread) throws Throwable {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
ByteBufferExtensionKt.write(channel, buffer);
key.interestOps(SelectionKey.OP_WRITE);
}
};
private static final BossLoopHandler loopHandler = new BossLoopHandler(nioProtocol, null);
private static Selector getSelector() {
try {
return Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static ServerSocketChannel getServerSocketChannel() {
try {
return ServerSocketChannel.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,43 @@
package cn.tursom.niothread
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.niothread.loophandler.BossLoopHandler
import java.net.InetSocketAddress
import java.nio.channels.SelectionKey
import java.nio.channels.ServerSocketChannel
import java.nio.channels.SocketChannel
/**
* 一个Echo服务器实现
*/
fun main() {
val port = 12345
val memoryPool = DirectMemoryPool()
val protocol = object : NioProtocol {
override fun handleConnect(key: SelectionKey, nioThread: NioThread) {
key.interestOps(SelectionKey.OP_READ)
}
override fun handleRead(key: SelectionKey, nioThread: NioThread) {
val buffer = memoryPool.get()
(key.channel() as SocketChannel).read(buffer)
key.interestOps(SelectionKey.OP_WRITE)
}
override fun handleWrite(key: SelectionKey, nioThread: NioThread) {
(key.attachment() as ByteBuffer).use { buffer ->
(key.channel() as SocketChannel).write(buffer)
}
key.interestOps(SelectionKey.OP_READ)
}
}
val handler = BossLoopHandler(protocol)
val nioThread = WorkerLoopNioThread(workLoop = handler, daemon = false)
val serverChannel = ServerSocketChannel.open()
serverChannel.socket().bind(InetSocketAddress(port))
serverChannel.configureBlocking(false)
nioThread.register(serverChannel, SelectionKey.OP_ACCEPT)
}

View File

@ -0,0 +1,5 @@
package cn.tursom.socket;
public class SocketTest {
}