diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/niothread/NioProtocol.kt b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/NioProtocol.kt index c3b0aa2..62df6fb 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/niothread/NioProtocol.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/NioProtocol.kt @@ -15,8 +15,8 @@ interface NioProtocol { @Throws(Throwable::class) fun exceptionCause(key: SelectionKey, nioThread: NioThread, e: Throwable) { + e.printStackTrace() key.cancel() key.channel().close() - e.printStackTrace() } } \ No newline at end of file diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/niothread/NioThread.kt b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/NioThread.kt index 081e73f..7d4195b 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/niothread/NioThread.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/NioThread.kt @@ -26,7 +26,7 @@ interface NioThread : Closeable, Executor { /** * 将通道注册到线程对应的选择器上 */ - fun register(channel: SelectableChannel, ops: Int, onComplete: (key: SelectionKey) -> Unit) { + fun register(channel: SelectableChannel, ops: Int, onComplete: (key: SelectionKey) -> Unit = {}) { if (Thread.currentThread() == thread) { val key = channel.register(selector, ops) onComplete(key) diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioLoopServer.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioLoopServer.kt index 97f3e32..965f93f 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioLoopServer.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioLoopServer.kt @@ -36,7 +36,7 @@ class NioLoopServer( if (started.compareAndSet(false, true)) { listenChannel.socket().bind(InetSocketAddress(port), backLog) listenChannel.configureBlocking(false) - bossNioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {} + bossNioThread.register(listenChannel, SelectionKey.OP_ACCEPT) } } diff --git a/AsyncSocket/src/test/kotlin/cn/tursom/niothread/EchoServer.java b/AsyncSocket/src/test/kotlin/cn/tursom/niothread/EchoServer.java new file mode 100644 index 0000000..5e0ec10 --- /dev/null +++ b/AsyncSocket/src/test/kotlin/cn/tursom/niothread/EchoServer.java @@ -0,0 +1,111 @@ +package cn.tursom.niothread; + +import cn.tursom.core.buffer.ByteBuffer; +import cn.tursom.core.buffer.ByteBufferExtensionKt; +import cn.tursom.core.buffer.impl.DirectByteBuffer; +import cn.tursom.niothread.loophandler.BossLoopHandler; +import kotlin.Unit; +import org.jetbrains.annotations.NotNull; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +/** + * AsyncSocket 的 NioThread 在 Java 下实现 Echo 服务器 + */ +public class EchoServer implements Closeable, Runnable { + public static void main(String[] args) { + EchoServer server = new EchoServer(12345); + server.run(); + } + + private final int port; + private final ServerSocketChannel serverSocketChannel = getServerSocketChannel(); + private final NioThread nioThread = new WorkerLoopNioThread("nioLoopThread", getSelector(), false, 3000, loopHandler); + private SelectionKey key; + + public EchoServer(int port) { + this.port = port; + } + + public int getPort() { + return port; + } + + @Override + public void close() throws IOException { + if (key != null) { + key.cancel(); + } + serverSocketChannel.close(); + nioThread.close(); + } + + @Override + public void run() { + try { + serverSocketChannel.socket().bind(new InetSocketAddress(port)); + serverSocketChannel.configureBlocking(false); + } catch (IOException e) { + throw new RuntimeException(e); + } + nioThread.register(serverSocketChannel, SelectionKey.OP_ACCEPT, key -> { + this.key = key; + return Unit.INSTANCE; + }); + } + + private static final NioProtocol nioProtocol = new NioProtocol() { + @Override + public void exceptionCause(@NotNull SelectionKey key, @NotNull NioThread nioThread, @NotNull Throwable e) throws Throwable { + e.printStackTrace(); + key.cancel(); + key.channel().close(); + } + + @Override + public void handleConnect(@NotNull SelectionKey key, @NotNull NioThread nioThread) { + key.interestOps(SelectionKey.OP_READ); + key.attach(new DirectByteBuffer(1024)); + } + + @Override + public void handleRead(@NotNull SelectionKey key, @NotNull NioThread nioThread) throws Throwable { + ByteBuffer buffer = (ByteBuffer) key.attachment(); + SocketChannel channel = (SocketChannel) key.channel(); + ByteBufferExtensionKt.read(channel, buffer); + key.interestOps(SelectionKey.OP_WRITE); + } + + @Override + public void handleWrite(@NotNull SelectionKey key, @NotNull NioThread nioThread) throws Throwable { + ByteBuffer buffer = (ByteBuffer) key.attachment(); + SocketChannel channel = (SocketChannel) key.channel(); + ByteBufferExtensionKt.write(channel, buffer); + key.interestOps(SelectionKey.OP_WRITE); + } + }; + + private static final BossLoopHandler loopHandler = new BossLoopHandler(nioProtocol, null); + + private static Selector getSelector() { + try { + return Selector.open(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static ServerSocketChannel getServerSocketChannel() { + try { + return ServerSocketChannel.open(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/AsyncSocket/src/test/kotlin/cn/tursom/niothread/Test.kt b/AsyncSocket/src/test/kotlin/cn/tursom/niothread/Test.kt new file mode 100644 index 0000000..e755d37 --- /dev/null +++ b/AsyncSocket/src/test/kotlin/cn/tursom/niothread/Test.kt @@ -0,0 +1,43 @@ +package cn.tursom.niothread + +import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.read +import cn.tursom.core.buffer.write +import cn.tursom.core.pool.DirectMemoryPool +import cn.tursom.niothread.loophandler.BossLoopHandler +import java.net.InetSocketAddress +import java.nio.channels.SelectionKey +import java.nio.channels.ServerSocketChannel +import java.nio.channels.SocketChannel + +/** + * 一个Echo服务器实现 + */ +fun main() { + val port = 12345 + val memoryPool = DirectMemoryPool() + val protocol = object : NioProtocol { + override fun handleConnect(key: SelectionKey, nioThread: NioThread) { + key.interestOps(SelectionKey.OP_READ) + } + + override fun handleRead(key: SelectionKey, nioThread: NioThread) { + val buffer = memoryPool.get() + (key.channel() as SocketChannel).read(buffer) + key.interestOps(SelectionKey.OP_WRITE) + } + + override fun handleWrite(key: SelectionKey, nioThread: NioThread) { + (key.attachment() as ByteBuffer).use { buffer -> + (key.channel() as SocketChannel).write(buffer) + } + key.interestOps(SelectionKey.OP_READ) + } + } + val handler = BossLoopHandler(protocol) + val nioThread = WorkerLoopNioThread(workLoop = handler, daemon = false) + val serverChannel = ServerSocketChannel.open() + serverChannel.socket().bind(InetSocketAddress(port)) + serverChannel.configureBlocking(false) + nioThread.register(serverChannel, SelectionKey.OP_ACCEPT) +} \ No newline at end of file diff --git a/AsyncSocket/src/test/kotlin/cn/tursom/socket/SocketTest.java b/AsyncSocket/src/test/kotlin/cn/tursom/socket/SocketTest.java new file mode 100644 index 0000000..9294c1c --- /dev/null +++ b/AsyncSocket/src/test/kotlin/cn/tursom/socket/SocketTest.java @@ -0,0 +1,5 @@ +package cn.tursom.socket; + +public class SocketTest { + +}