kcp/ikcp.c

1307 lines
32 KiB
C

//=====================================================================
//
// KCP - A Better ARQ Protocol Implementation
// skywind3000 (at) gmail.com, 2010-2011
//
// Features:
// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp.
// + Maximum RTT reduce three times vs tcp.
// + Lightweight, distributed as a single source file.
//
//=====================================================================
#include "ikcp.h"
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <stdio.h>
#define IKCP_FASTACK_CONSERVE
//=====================================================================
// KCP BASIC
//=====================================================================
const IUINT32 IKCP_RTO_NDL = 30; // no delay min rto
const IUINT32 IKCP_RTO_MIN = 100; // normal min rto
const IUINT32 IKCP_RTO_DEF = 200;
const IUINT32 IKCP_RTO_MAX = 60000;
const IUINT32 IKCP_CMD_PUSH = 81; // cmd: push data
const IUINT32 IKCP_CMD_ACK = 82; // cmd: ack
const IUINT32 IKCP_CMD_WASK = 83; // cmd: window probe (ask)
const IUINT32 IKCP_CMD_WINS = 84; // cmd: window size (tell)
const IUINT32 IKCP_ASK_SEND = 1; // need to send IKCP_CMD_WASK
const IUINT32 IKCP_ASK_TELL = 2; // need to send IKCP_CMD_WINS
const IUINT32 IKCP_WND_SND = 32;
const IUINT32 IKCP_WND_RCV = 128; // must >= max fragment size
const IUINT32 IKCP_MTU_DEF = 1400;
const IUINT32 IKCP_ACK_FAST = 3;
const IUINT32 IKCP_INTERVAL = 100;
const IUINT32 IKCP_OVERHEAD = 24;
const IUINT32 IKCP_DEADLINK = 20;
const IUINT32 IKCP_THRESH_INIT = 2;
const IUINT32 IKCP_THRESH_MIN = 2;
const IUINT32 IKCP_PROBE_INIT = 7000; // 7 secs to probe window size
const IUINT32 IKCP_PROBE_LIMIT = 120000; // up to 120 secs to probe window
const IUINT32 IKCP_FASTACK_LIMIT = 5; // max times to trigger fastack
//---------------------------------------------------------------------
// encode / decode
//---------------------------------------------------------------------
/* encode 8 bits unsigned int */
static inline char *ikcp_encode8u(char *p, unsigned char c)
{
*(unsigned char*)p++ = c;
return p;
}
/* decode 8 bits unsigned int */
static inline const char *ikcp_decode8u(const char *p, unsigned char *c)
{
*c = *(unsigned char*)p++;
return p;
}
/* encode 16 bits unsigned int (lsb) */
static inline char *ikcp_encode16u(char *p, unsigned short w)
{
#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
*(unsigned char*)(p + 0) = (w & 255);
*(unsigned char*)(p + 1) = (w >> 8);
#else
memcpy(p, &w, 2);
#endif
p += 2;
return p;
}
/* decode 16 bits unsigned int (lsb) */
static inline const char *ikcp_decode16u(const char *p, unsigned short *w)
{
#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
*w = *(const unsigned char*)(p + 1);
*w = *(const unsigned char*)(p + 0) + (*w << 8);
#else
memcpy(w, p, 2);
#endif
p += 2;
return p;
}
/* encode 32 bits unsigned int (lsb) */
static inline char *ikcp_encode32u(char *p, IUINT32 l)
{
#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
*(unsigned char*)(p + 0) = (unsigned char)((l >> 0) & 0xff);
*(unsigned char*)(p + 1) = (unsigned char)((l >> 8) & 0xff);
*(unsigned char*)(p + 2) = (unsigned char)((l >> 16) & 0xff);
*(unsigned char*)(p + 3) = (unsigned char)((l >> 24) & 0xff);
#else
memcpy(p, &l, 4);
#endif
p += 4;
return p;
}
/* decode 32 bits unsigned int (lsb) */
static inline const char *ikcp_decode32u(const char *p, IUINT32 *l)
{
#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
*l = *(const unsigned char*)(p + 3);
*l = *(const unsigned char*)(p + 2) + (*l << 8);
*l = *(const unsigned char*)(p + 1) + (*l << 8);
*l = *(const unsigned char*)(p + 0) + (*l << 8);
#else
memcpy(l, p, 4);
#endif
p += 4;
return p;
}
static inline IUINT32 _imin_(IUINT32 a, IUINT32 b) {
return a <= b ? a : b;
}
static inline IUINT32 _imax_(IUINT32 a, IUINT32 b) {
return a >= b ? a : b;
}
static inline IUINT32 _ibound_(IUINT32 lower, IUINT32 middle, IUINT32 upper)
{
return _imin_(_imax_(lower, middle), upper);
}
static inline long _itimediff(IUINT32 later, IUINT32 earlier)
{
return ((IINT32)(later - earlier));
}
//---------------------------------------------------------------------
// manage segment
//---------------------------------------------------------------------
typedef struct IKCPSEG IKCPSEG;
static void* (*ikcp_malloc_hook)(size_t) = NULL;
static void (*ikcp_free_hook)(void *) = NULL;
// internal malloc
static void* ikcp_malloc(size_t size) {
if (ikcp_malloc_hook)
return ikcp_malloc_hook(size);
return malloc(size);
}
// internal free
static void ikcp_free(void *ptr) {
if (ikcp_free_hook) {
ikcp_free_hook(ptr);
} else {
free(ptr);
}
}
// redefine allocator
void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*))
{
ikcp_malloc_hook = new_malloc;
ikcp_free_hook = new_free;
}
// allocate a new kcp segment
static IKCPSEG* ikcp_segment_new(ikcpcb *kcp, int size)
{
return (IKCPSEG*)ikcp_malloc(sizeof(IKCPSEG) + size);
}
// delete a segment
static void ikcp_segment_delete(ikcpcb *kcp, IKCPSEG *seg)
{
ikcp_free(seg);
}
// write log
void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...)
{
char buffer[1024];
va_list argptr;
if ((mask & kcp->logmask) == 0 || kcp->writelog == 0) return;
va_start(argptr, fmt);
vsprintf(buffer, fmt, argptr);
va_end(argptr);
kcp->writelog(buffer, kcp, kcp->user);
}
// check log mask
static int ikcp_canlog(const ikcpcb *kcp, int mask)
{
if ((mask & kcp->logmask) == 0 || kcp->writelog == NULL) return 0;
return 1;
}
// output segment
static int ikcp_output(ikcpcb *kcp, const void *data, int size)
{
assert(kcp);
assert(kcp->output);
if (ikcp_canlog(kcp, IKCP_LOG_OUTPUT)) {
ikcp_log(kcp, IKCP_LOG_OUTPUT, "[RO] %ld bytes", (long)size);
}
if (size == 0) return 0;
return kcp->output((const char*)data, size, kcp, kcp->user);
}
// output queue
void ikcp_qprint(const char *name, const struct IQUEUEHEAD *head)
{
#if 0
const struct IQUEUEHEAD *p;
printf("<%s>: [", name);
for (p = head->next; p != head; p = p->next) {
const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node);
printf("(%lu %d)", (unsigned long)seg->sn, (int)(seg->ts % 10000));
if (p->next != head) printf(",");
}
printf("]\n");
#endif
}
//---------------------------------------------------------------------
// create a new kcpcb
//---------------------------------------------------------------------
ikcpcb* ikcp_create(IUINT32 conv, void *user)
{
ikcpcb *kcp = (ikcpcb*)ikcp_malloc(sizeof(struct IKCPCB));
if (kcp == NULL) return NULL;
kcp->conv = conv;
kcp->user = user;
kcp->snd_una = 0;
kcp->snd_nxt = 0;
kcp->rcv_nxt = 0;
kcp->ts_recent = 0;
kcp->ts_lastack = 0;
kcp->ts_probe = 0;
kcp->probe_wait = 0;
kcp->snd_wnd = IKCP_WND_SND;
kcp->rcv_wnd = IKCP_WND_RCV;
kcp->rmt_wnd = IKCP_WND_RCV;
kcp->cwnd = 0;
kcp->incr = 0;
kcp->probe = 0;
kcp->mtu = IKCP_MTU_DEF;
kcp->mss = kcp->mtu - IKCP_OVERHEAD;
kcp->stream = 0;
kcp->buffer = (char*)ikcp_malloc((kcp->mtu + IKCP_OVERHEAD) * 3);
if (kcp->buffer == NULL) {
ikcp_free(kcp);
return NULL;
}
iqueue_init(&kcp->snd_queue);
iqueue_init(&kcp->rcv_queue);
iqueue_init(&kcp->snd_buf);
iqueue_init(&kcp->rcv_buf);
kcp->nrcv_buf = 0;
kcp->nsnd_buf = 0;
kcp->nrcv_que = 0;
kcp->nsnd_que = 0;
kcp->state = 0;
kcp->acklist = NULL;
kcp->ackblock = 0;
kcp->ackcount = 0;
kcp->rx_srtt = 0;
kcp->rx_rttval = 0;
kcp->rx_rto = IKCP_RTO_DEF;
kcp->rx_minrto = IKCP_RTO_MIN;
kcp->current = 0;
kcp->interval = IKCP_INTERVAL;
kcp->ts_flush = IKCP_INTERVAL;
kcp->nodelay = 0;
kcp->updated = 0;
kcp->logmask = 0;
kcp->ssthresh = IKCP_THRESH_INIT;
kcp->fastresend = 0;
kcp->fastlimit = IKCP_FASTACK_LIMIT;
kcp->nocwnd = 0;
kcp->xmit = 0;
kcp->dead_link = IKCP_DEADLINK;
kcp->output = NULL;
kcp->writelog = NULL;
return kcp;
}
//---------------------------------------------------------------------
// release a new kcpcb
//---------------------------------------------------------------------
void ikcp_release(ikcpcb *kcp)
{
assert(kcp);
if (kcp) {
IKCPSEG *seg;
while (!iqueue_is_empty(&kcp->snd_buf)) {
seg = iqueue_entry(kcp->snd_buf.next, IKCPSEG, node);
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
}
while (!iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
}
while (!iqueue_is_empty(&kcp->snd_queue)) {
seg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
}
while (!iqueue_is_empty(&kcp->rcv_queue)) {
seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node);
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
}
if (kcp->buffer) {
ikcp_free(kcp->buffer);
}
if (kcp->acklist) {
ikcp_free(kcp->acklist);
}
kcp->nrcv_buf = 0;
kcp->nsnd_buf = 0;
kcp->nrcv_que = 0;
kcp->nsnd_que = 0;
kcp->ackcount = 0;
kcp->buffer = NULL;
kcp->acklist = NULL;
ikcp_free(kcp);
}
}
//---------------------------------------------------------------------
// set output callback, which will be invoked by kcp
//---------------------------------------------------------------------
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user))
{
kcp->output = output;
}
//---------------------------------------------------------------------
// user/upper level recv: returns size, returns below zero for EAGAIN
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
struct IQUEUEHEAD *p;
int ispeek = (len < 0)? 1 : 0;
int peeksize;
int recover = 0;
IKCPSEG *seg;
assert(kcp);
if (iqueue_is_empty(&kcp->rcv_queue))
return -1;
if (len < 0) len = -len;
peeksize = ikcp_peeksize(kcp);
if (peeksize < 0)
return -2;
if (peeksize > len)
return -3;
if (kcp->nrcv_que >= kcp->rcv_wnd)
recover = 1;
// merge fragment
for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
int fragment;
seg = iqueue_entry(p, IKCPSEG, node);
p = p->next;
if (buffer) {
memcpy(buffer, seg->data, seg->len);
buffer += seg->len;
}
len += seg->len;
fragment = seg->frg;
if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
}
if (ispeek == 0) {
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
kcp->nrcv_que--;
}
if (fragment == 0)
break;
}
assert(len == peeksize);
// move available data from rcv_buf -> rcv_queue
while (! iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}
// fast recover
if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
kcp->probe |= IKCP_ASK_TELL;
}
return len;
}
//---------------------------------------------------------------------
// peek data size
//---------------------------------------------------------------------
int ikcp_peeksize(const ikcpcb *kcp)
{
struct IQUEUEHEAD *p;
IKCPSEG *seg;
int length = 0;
assert(kcp);
if (iqueue_is_empty(&kcp->rcv_queue)) return -1;
seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node);
if (seg->frg == 0) return seg->len;
if (kcp->nrcv_que < seg->frg + 1) return -1;
for (p = kcp->rcv_queue.next; p != &kcp->rcv_queue; p = p->next) {
seg = iqueue_entry(p, IKCPSEG, node);
length += seg->len;
if (seg->frg == 0) break;
}
return length;
}
//---------------------------------------------------------------------
// user/upper level send, returns below zero for error
//---------------------------------------------------------------------
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
IKCPSEG *seg;
int count, i;
int sent = 0;
assert(kcp->mss > 0);
if (len < 0) return -1;
// append to previous segment in streaming mode (if possible)
if (kcp->stream != 0) {
if (!iqueue_is_empty(&kcp->snd_queue)) {
IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
if (old->len < kcp->mss) {
int capacity = kcp->mss - old->len;
int extend = (len < capacity)? len : capacity;
seg = ikcp_segment_new(kcp, old->len + extend);
assert(seg);
if (seg == NULL) {
return -2;
}
iqueue_add_tail(&seg->node, &kcp->snd_queue);
memcpy(seg->data, old->data, old->len);
if (buffer) {
memcpy(seg->data + old->len, buffer, extend);
buffer += extend;
}
seg->len = old->len + extend;
seg->frg = 0;
len -= extend;
iqueue_del_init(&old->node);
ikcp_segment_delete(kcp, old);
sent = extend;
}
}
if (len <= 0) {
return sent;
}
}
if (len <= (int)kcp->mss) count = 1;
else count = (len + kcp->mss - 1) / kcp->mss;
if (count >= (int)IKCP_WND_RCV) {
if (kcp->stream != 0 && sent > 0)
return sent;
return -2;
}
if (count == 0) count = 1;
// fragment
for (i = 0; i < count; i++) {
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size);
assert(seg);
if (seg == NULL) {
return -2;
}
if (buffer && len > 0) {
memcpy(seg->data, buffer, size);
}
seg->len = size;
seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
iqueue_init(&seg->node);
iqueue_add_tail(&seg->node, &kcp->snd_queue);
kcp->nsnd_que++;
if (buffer) {
buffer += size;
}
len -= size;
sent += size;
}
return sent;
}
//---------------------------------------------------------------------
// parse ack
//---------------------------------------------------------------------
static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt)
{
IINT32 rto = 0;
if (kcp->rx_srtt == 0) {
kcp->rx_srtt = rtt;
kcp->rx_rttval = rtt / 2;
} else {
long delta = rtt - kcp->rx_srtt;
if (delta < 0) delta = -delta;
kcp->rx_rttval = (3 * kcp->rx_rttval + delta) / 4;
kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8;
if (kcp->rx_srtt < 1) kcp->rx_srtt = 1;
}
rto = kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval);
kcp->rx_rto = _ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX);
}
static void ikcp_shrink_buf(ikcpcb *kcp)
{
struct IQUEUEHEAD *p = kcp->snd_buf.next;
if (p != &kcp->snd_buf) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
kcp->snd_una = seg->sn;
} else {
kcp->snd_una = kcp->snd_nxt;
}
}
static void ikcp_parse_ack(ikcpcb *kcp, IUINT32 sn)
{
struct IQUEUEHEAD *p, *next;
if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0)
return;
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
next = p->next;
if (sn == seg->sn) {
iqueue_del(p);
ikcp_segment_delete(kcp, seg);
kcp->nsnd_buf--;
break;
}
if (_itimediff(sn, seg->sn) < 0) {
break;
}
}
}
static void ikcp_parse_una(ikcpcb *kcp, IUINT32 una)
{
struct IQUEUEHEAD *p, *next;
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
next = p->next;
if (_itimediff(una, seg->sn) > 0) {
iqueue_del(p);
ikcp_segment_delete(kcp, seg);
kcp->nsnd_buf--;
} else {
break;
}
}
}
static void ikcp_parse_fastack(ikcpcb *kcp, IUINT32 sn, IUINT32 ts)
{
struct IQUEUEHEAD *p, *next;
if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0)
return;
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
next = p->next;
if (_itimediff(sn, seg->sn) < 0) {
break;
}
else if (sn != seg->sn) {
#ifndef IKCP_FASTACK_CONSERVE
seg->fastack++;
#else
if (_itimediff(ts, seg->ts) >= 0)
seg->fastack++;
#endif
}
}
}
//---------------------------------------------------------------------
// ack append
//---------------------------------------------------------------------
static void ikcp_ack_push(ikcpcb *kcp, IUINT32 sn, IUINT32 ts)
{
IUINT32 newsize = kcp->ackcount + 1;
IUINT32 *ptr;
if (newsize > kcp->ackblock) {
IUINT32 *acklist;
IUINT32 newblock;
for (newblock = 8; newblock < newsize; newblock <<= 1);
acklist = (IUINT32*)ikcp_malloc(newblock * sizeof(IUINT32) * 2);
if (acklist == NULL) {
assert(acklist != NULL);
abort();
}
if (kcp->acklist != NULL) {
IUINT32 x;
for (x = 0; x < kcp->ackcount; x++) {
acklist[x * 2 + 0] = kcp->acklist[x * 2 + 0];
acklist[x * 2 + 1] = kcp->acklist[x * 2 + 1];
}
ikcp_free(kcp->acklist);
}
kcp->acklist = acklist;
kcp->ackblock = newblock;
}
ptr = &kcp->acklist[kcp->ackcount * 2];
ptr[0] = sn;
ptr[1] = ts;
kcp->ackcount++;
}
static void ikcp_ack_get(const ikcpcb *kcp, int p, IUINT32 *sn, IUINT32 *ts)
{
if (sn) sn[0] = kcp->acklist[p * 2 + 0];
if (ts) ts[0] = kcp->acklist[p * 2 + 1];
}
//---------------------------------------------------------------------
// parse data
//---------------------------------------------------------------------
void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
{
struct IQUEUEHEAD *p, *prev;
IUINT32 sn = newseg->sn;
int repeat = 0;
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 ||
_itimediff(sn, kcp->rcv_nxt) < 0) {
ikcp_segment_delete(kcp, newseg);
return;
}
for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
prev = p->prev;
if (seg->sn == sn) {
repeat = 1;
break;
}
if (_itimediff(sn, seg->sn) > 0) {
break;
}
}
if (repeat == 0) {
iqueue_init(&newseg->node);
iqueue_add(&newseg->node, p);
kcp->nrcv_buf++;
} else {
ikcp_segment_delete(kcp, newseg);
}
#if 0
ikcp_qprint("rcvbuf", &kcp->rcv_buf);
printf("rcv_nxt=%lu\n", kcp->rcv_nxt);
#endif
// move available data from rcv_buf -> rcv_queue
while (! iqueue_is_empty(&kcp->rcv_buf)) {
IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}
#if 0
ikcp_qprint("queue", &kcp->rcv_queue);
printf("rcv_nxt=%lu\n", kcp->rcv_nxt);
#endif
#if 1
// printf("snd(buf=%d, queue=%d)\n", kcp->nsnd_buf, kcp->nsnd_que);
// printf("rcv(buf=%d, queue=%d)\n", kcp->nrcv_buf, kcp->nrcv_que);
#endif
}
//---------------------------------------------------------------------
// input data
//---------------------------------------------------------------------
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
IUINT32 prev_una = kcp->snd_una;
IUINT32 maxack = 0, latest_ts = 0;
int flag = 0;
if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) {
ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size);
}
if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;
while (1) {
IUINT32 ts, sn, len, una, conv;
IUINT16 wnd;
IUINT8 cmd, frg;
IKCPSEG *seg;
if (size < (int)IKCP_OVERHEAD) break;
data = ikcp_decode32u(data, &conv);
if (conv != kcp->conv) return -1;
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
size -= IKCP_OVERHEAD;
if ((long)size < (long)len || (int)len < 0) return -2;
if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS)
return -3;
kcp->rmt_wnd = wnd;
ikcp_parse_una(kcp, una);
ikcp_shrink_buf(kcp);
if (cmd == IKCP_CMD_ACK) {
if (_itimediff(kcp->current, ts) >= 0) {
ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
}
ikcp_parse_ack(kcp, sn);
ikcp_shrink_buf(kcp);
if (flag == 0) {
flag = 1;
maxack = sn;
latest_ts = ts;
} else {
if (_itimediff(sn, maxack) > 0) {
#ifndef IKCP_FASTACK_CONSERVE
maxack = sn;
latest_ts = ts;
#else
if (_itimediff(ts, latest_ts) > 0) {
maxack = sn;
latest_ts = ts;
}
#endif
}
}
if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) {
ikcp_log(kcp, IKCP_LOG_IN_ACK,
"input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn,
(long)_itimediff(kcp->current, ts),
(long)kcp->rx_rto);
}
}
else if (cmd == IKCP_CMD_PUSH) {
if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
ikcp_log(kcp, IKCP_LOG_IN_DATA,
"input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
}
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
ikcp_ack_push(kcp, sn, ts);
if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
seg = ikcp_segment_new(kcp, len);
seg->conv = conv;
seg->cmd = cmd;
seg->frg = frg;
seg->wnd = wnd;
seg->ts = ts;
seg->sn = sn;
seg->una = una;
seg->len = len;
if (len > 0) {
memcpy(seg->data, data, len);
}
ikcp_parse_data(kcp, seg);
}
}
}
else if (cmd == IKCP_CMD_WASK) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
kcp->probe |= IKCP_ASK_TELL;
if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) {
ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe");
}
}
else if (cmd == IKCP_CMD_WINS) {
// do nothing
if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) {
ikcp_log(kcp, IKCP_LOG_IN_WINS,
"input wins: %lu", (unsigned long)(wnd));
}
}
else {
return -3;
}
data += len;
size -= len;
}
if (flag != 0) {
ikcp_parse_fastack(kcp, maxack, latest_ts);
}
if (_itimediff(kcp->snd_una, prev_una) > 0) {
if (kcp->cwnd < kcp->rmt_wnd) {
IUINT32 mss = kcp->mss;
if (kcp->cwnd < kcp->ssthresh) {
kcp->cwnd++;
kcp->incr += mss;
} else {
if (kcp->incr < mss) kcp->incr = mss;
kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
if ((kcp->cwnd + 1) * mss <= kcp->incr) {
#if 1
kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
#else
kcp->cwnd++;
#endif
}
}
if (kcp->cwnd > kcp->rmt_wnd) {
kcp->cwnd = kcp->rmt_wnd;
kcp->incr = kcp->rmt_wnd * mss;
}
}
}
return 0;
}
//---------------------------------------------------------------------
// ikcp_encode_seg
//---------------------------------------------------------------------
static char *ikcp_encode_seg(char *ptr, const IKCPSEG *seg)
{
ptr = ikcp_encode32u(ptr, seg->conv);
ptr = ikcp_encode8u(ptr, (IUINT8)seg->cmd);
ptr = ikcp_encode8u(ptr, (IUINT8)seg->frg);
ptr = ikcp_encode16u(ptr, (IUINT16)seg->wnd);
ptr = ikcp_encode32u(ptr, seg->ts);
ptr = ikcp_encode32u(ptr, seg->sn);
ptr = ikcp_encode32u(ptr, seg->una);
ptr = ikcp_encode32u(ptr, seg->len);
return ptr;
}
static int ikcp_wnd_unused(const ikcpcb *kcp)
{
if (kcp->nrcv_que < kcp->rcv_wnd) {
return kcp->rcv_wnd - kcp->nrcv_que;
}
return 0;
}
//---------------------------------------------------------------------
// ikcp_flush
//---------------------------------------------------------------------
void ikcp_flush(ikcpcb *kcp)
{
IUINT32 current = kcp->current;
char *buffer = kcp->buffer;
char *ptr = buffer;
int count, size, i;
IUINT32 resent, cwnd;
IUINT32 rtomin;
struct IQUEUEHEAD *p;
int change = 0;
int lost = 0;
IKCPSEG seg;
// 'ikcp_update' haven't been called.
if (kcp->updated == 0) return;
seg.conv = kcp->conv;
seg.cmd = IKCP_CMD_ACK;
seg.frg = 0;
seg.wnd = ikcp_wnd_unused(kcp);
seg.una = kcp->rcv_nxt;
seg.len = 0;
seg.sn = 0;
seg.ts = 0;
// flush acknowledges
count = kcp->ackcount;
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->ackcount = 0;
// probe window size (if remote window size equals zero)
if (kcp->rmt_wnd == 0) {
if (kcp->probe_wait == 0) {
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
}
else {
if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
if (kcp->probe_wait < IKCP_PROBE_INIT)
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->probe_wait += kcp->probe_wait / 2;
if (kcp->probe_wait > IKCP_PROBE_LIMIT)
kcp->probe_wait = IKCP_PROBE_LIMIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
kcp->probe |= IKCP_ASK_SEND;
}
}
} else {
kcp->ts_probe = 0;
kcp->probe_wait = 0;
}
// flush window probing commands
if (kcp->probe & IKCP_ASK_SEND) {
seg.cmd = IKCP_CMD_WASK;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
// flush window probing commands
if (kcp->probe & IKCP_ASK_TELL) {
seg.cmd = IKCP_CMD_WINS;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->probe = 0;
// calculate window size
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
// move data from snd_queue to snd_buf
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;
newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;
newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}
// calculate resent
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;
// flush data segments
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
if (segment->xmit == 0) {
needsend = 1;
segment->xmit++;
segment->rto = kcp->rx_rto;
segment->resendts = current + segment->rto + rtomin;
}
else if (_itimediff(current, segment->resendts) >= 0) {
needsend = 1;
segment->xmit++;
kcp->xmit++;
if (kcp->nodelay == 0) {
segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
} else {
IINT32 step = (kcp->nodelay < 2)?
((IINT32)(segment->rto)) : kcp->rx_rto;
segment->rto += step / 2;
}
segment->resendts = current + segment->rto;
lost = 1;
}
else if (segment->fastack >= resent) {
if ((int)segment->xmit <= kcp->fastlimit ||
kcp->fastlimit <= 0) {
needsend = 1;
segment->xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
change++;
}
}
if (needsend) {
int need;
segment->ts = current;
segment->wnd = seg.wnd;
segment->una = kcp->rcv_nxt;
size = (int)(ptr - buffer);
need = IKCP_OVERHEAD + segment->len;
if (size + need > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, segment);
if (segment->len > 0) {
memcpy(ptr, segment->data, segment->len);
ptr += segment->len;
}
if (segment->xmit >= kcp->dead_link) {
kcp->state = (IUINT32)-1;
}
}
}
// flash remain segments
size = (int)(ptr - buffer);
if (size > 0) {
ikcp_output(kcp, buffer, size);
}
// update ssthresh
if (change) {
IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
kcp->ssthresh = inflight / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = kcp->ssthresh + resent;
kcp->incr = kcp->cwnd * kcp->mss;
}
if (lost) {
kcp->ssthresh = cwnd / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
if (kcp->cwnd < 1) {
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
}
//---------------------------------------------------------------------
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
//---------------------------------------------------------------------
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
IINT32 slap;
kcp->current = current;
if (kcp->updated == 0) {
kcp->updated = 1;
kcp->ts_flush = kcp->current;
}
slap = _itimediff(kcp->current, kcp->ts_flush);
if (slap >= 10000 || slap < -10000) {
kcp->ts_flush = kcp->current;
slap = 0;
}
if (slap >= 0) {
kcp->ts_flush += kcp->interval;
if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
kcp->ts_flush = kcp->current + kcp->interval;
}
ikcp_flush(kcp);
}
}
//---------------------------------------------------------------------
// Determine when should you invoke ikcp_update:
// returns when you should invoke ikcp_update in millisec, if there
// is no ikcp_input/_send calling. you can call ikcp_update in that
// time, instead of call update repeatly.
// Important to reduce unnacessary ikcp_update invoking. use it to
// schedule ikcp_update (eg. implementing an epoll-like mechanism,
// or optimize ikcp_update when handling massive kcp connections)
//---------------------------------------------------------------------
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current)
{
IUINT32 ts_flush = kcp->ts_flush;
IINT32 tm_flush = 0x7fffffff;
IINT32 tm_packet = 0x7fffffff;
IUINT32 minimal = 0;
struct IQUEUEHEAD *p;
if (kcp->updated == 0) {
return current;
}
if (_itimediff(current, ts_flush) >= 10000 ||
_itimediff(current, ts_flush) < -10000) {
ts_flush = current;
}
if (_itimediff(current, ts_flush) >= 0) {
return current;
}
tm_flush = _itimediff(ts_flush, current);
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node);
IINT32 diff = _itimediff(seg->resendts, current);
if (diff <= 0) {
return current;
}
if (diff < tm_packet) tm_packet = diff;
}
minimal = (IUINT32)(tm_packet < tm_flush ? tm_packet : tm_flush);
if (minimal >= kcp->interval) minimal = kcp->interval;
return current + minimal;
}
int ikcp_setmtu(ikcpcb *kcp, int mtu)
{
char *buffer;
if (mtu < 50 || mtu < (int)IKCP_OVERHEAD)
return -1;
buffer = (char*)ikcp_malloc((mtu + IKCP_OVERHEAD) * 3);
if (buffer == NULL)
return -2;
kcp->mtu = mtu;
kcp->mss = kcp->mtu - IKCP_OVERHEAD;
ikcp_free(kcp->buffer);
kcp->buffer = buffer;
return 0;
}
int ikcp_interval(ikcpcb *kcp, int interval)
{
if (interval > 5000) interval = 5000;
else if (interval < 10) interval = 10;
kcp->interval = interval;
return 0;
}
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc)
{
if (nodelay >= 0) {
kcp->nodelay = nodelay;
if (nodelay) {
kcp->rx_minrto = IKCP_RTO_NDL;
}
else {
kcp->rx_minrto = IKCP_RTO_MIN;
}
}
if (interval >= 0) {
if (interval > 5000) interval = 5000;
else if (interval < 10) interval = 10;
kcp->interval = interval;
}
if (resend >= 0) {
kcp->fastresend = resend;
}
if (nc >= 0) {
kcp->nocwnd = nc;
}
return 0;
}
int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd)
{
if (kcp) {
if (sndwnd > 0) {
kcp->snd_wnd = sndwnd;
}
if (rcvwnd > 0) { // must >= max fragment size
kcp->rcv_wnd = _imax_(rcvwnd, IKCP_WND_RCV);
}
}
return 0;
}
int ikcp_waitsnd(const ikcpcb *kcp)
{
return kcp->nsnd_buf + kcp->nsnd_que;
}
// read conv
IUINT32 ikcp_getconv(const void *ptr)
{
IUINT32 conv;
ikcp_decode32u((const char*)ptr, &conv);
return conv;
}