add Snowflake

This commit is contained in:
tursom 2020-07-03 17:27:16 +08:00
parent ba1b964e24
commit bf51a17bad
6 changed files with 61 additions and 3 deletions

View File

@ -4,6 +4,7 @@ import cn.tursom.channel.BufferedAsyncChannel
import cn.tursom.core.log
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.datagram.AsyncDatagramClient
import cn.tursom.socket.server.BuffedNioServer
import kotlinx.coroutines.runBlocking
val echoHandler: suspend BufferedAsyncChannel.() -> Unit = {
@ -17,7 +18,7 @@ val echoHandler: suspend BufferedAsyncChannel.() -> Unit = {
fun main() {
val port = 12345
val pool = DirectMemoryPool(1024, 16)
val server = BufferedAsyncDatagramServer(port, pool, echoHandler)
val server = BuffedNioServer(port, pool, handler = echoHandler)
//val server = LoopDatagramServer(port, protocol = object : NioProtocol {
// override fun handleRead(key: SelectionKey, nioThread: NioThread) {
// val datagramChannel = key.channel() as DatagramChannel

View File

@ -87,7 +87,7 @@ public class EchoServer implements Closeable, Runnable {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
ByteBufferExtensionKt.write(channel, buffer);
key.interestOps(SelectionKey.OP_WRITE);
key.interestOps(SelectionKey.OP_READ);
}
};

View File

@ -0,0 +1,50 @@
package cn.tursom.core
import java.util.concurrent.atomic.AtomicLong
/**
* 被改造过的雪花ID
* 时间戳仅在初始化时使用, 与序列号接壤, 这样做可以避免某一时间内大量请求导致的ID爆炸
* 当前方案在ID溢出时, 溢出的数据会让时间戳 +1.
* 这样做, 只要节点不重启或者在重启前平均QPS没有超标, 重启后分配的ID仍能唯一
*
* 当前被调试为平均每毫秒可以相应 128 个消息
* 如果平均每个人 10 秒发一条消息, 1 128 条消息大约要 1280 , 1 毫秒 128 条消息就大约需要 128W 用户了
* 单点无法应付如此巨量的并发, ID生成器保证性能过剩
*
* 当前最多支持 8192 个节点同时上线, 未来如果节点数超过了 8192 , 也可以以ID生成的最晚时间为代价提升节点最高数量
*/
class Snowflake(
@Suppress("MemberVisibilityCanBePrivate") val nodeId: Int
) {
constructor(workerId: String) : this(parseId(workerId))
@Suppress("MemberVisibilityCanBePrivate")
val timestamp = System.currentTimeMillis().and(0x00_00_07_ff__ff_ff_ff_ff).shl(7)
private val seed = AtomicLong((nodeId and 0x1fff).toLong().shl(50) or timestamp)
val id: Long get() = seed.incrementAndGet()
override fun toString() = "Snowflake(workerId=${nodeId
}, timestamp=0x${timestamp.toByteArray().toHexString()
}, seed=0x${seed.get().toByteArray().toHexString()})"
companion object {
fun parseId(workerId: String): Int {
var id = 0
var step = 1
for (index in workerId.length - 1 downTo 0) {
if (Character.isDigit(workerId[index])) {
id += (workerId[index] - '0') * step
step *= 10
} else {
break
}
}
if (step != 1) {
return id
} else {
throw NumberFormatException(workerId)
}
}
}
}

View File

@ -62,7 +62,7 @@ object Yaml {
var appended = 0
obj.forEach {
it ?: return@forEach
stringBuilder.append("${if (appended == 0) '\n' else ""}$indentation- ")
stringBuilder.append("${if (appended == 0) "\n" else ""}$indentation- ")
appended++
toYaml(it, stringBuilder, "$indentation ", true)
if (!stringBuilder.endsWith('\n')) {

View File

@ -15,6 +15,11 @@ class NettyWebSocketHandler(
) : SimpleChannelInboundHandler<WebSocketFrame>() {
private val webSocketContext = NettyWebSocketContent(channel)
override fun channelActive(ctx: ChannelHandlerContext?) {
handler.connected(webSocketContext)
super.channelActive(ctx)
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: WebSocketFrame) {
when (msg) {
is TextWebSocketFrame -> {

View File

@ -3,6 +3,8 @@ package cn.tursom.web
import cn.tursom.core.buffer.ByteBuffer
interface WebSocketHandler<in T : WebSocketContent> {
fun connected(context: T)
fun recvText(str: String, context: T)
fun recvText(byteBuffer: ByteBuffer, context: T) = recvText(byteBuffer.getString(), context)