mirror of
https://github.com/fatedier/frp.git
synced 2026-01-11 22:23:12 +00:00
update vendors
This commit is contained in:
56
vendor/github.com/fatedier/kcp-go/README.md
generated
vendored
56
vendor/github.com/fatedier/kcp-go/README.md
generated
vendored
@@ -34,6 +34,7 @@ This library intents to provide a **smooth, resilient, ordered, error-checked an
|
||||
1. Packet level encryption support with [AES](https://en.wikipedia.org/wiki/Advanced_Encryption_Standard), [TEA](https://en.wikipedia.org/wiki/Tiny_Encryption_Algorithm), [3DES](https://en.wikipedia.org/wiki/Triple_DES), [Blowfish](https://en.wikipedia.org/wiki/Blowfish_(cipher)), [Cast5](https://en.wikipedia.org/wiki/CAST-128), [Salsa20]( https://en.wikipedia.org/wiki/Salsa20), etc. in [CFB](https://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Cipher_Feedback_.28CFB.29) mode, which generates completely anonymous packet.
|
||||
1. Only **A fixed number of goroutines** will be created for the entire server application, costs in **context switch** between goroutines have been taken into consideration.
|
||||
1. Compatible with [skywind3000's](https://github.com/skywind3000) C version with various improvements.
|
||||
1. Platform-dependent optimizations: [sendmmsg](http://man7.org/linux/man-pages/man2/sendmmsg.2.html) and [recvmmsg](http://man7.org/linux/man-pages/man2/recvmmsg.2.html) were expoloited for linux.
|
||||
|
||||
## Documentation
|
||||
|
||||
@@ -43,6 +44,24 @@ For complete documentation, see the associated [Godoc](https://godoc.org/github.
|
||||
|
||||
<img src="frame.png" alt="Frame Format" height="109px" />
|
||||
|
||||
```
|
||||
NONCE:
|
||||
16bytes cryptographically secure random number, nonce changes for every packet.
|
||||
|
||||
CRC32:
|
||||
CRC-32 checksum of data using the IEEE polynomial
|
||||
|
||||
FEC TYPE:
|
||||
typeData = 0xF1
|
||||
typeParity = 0xF2
|
||||
|
||||
FEC SEQID:
|
||||
monotonically increasing in range: [0, (0xffffffff/shardSize) * shardSize - 1]
|
||||
|
||||
SIZE:
|
||||
The size of KCP frame plus 2
|
||||
```
|
||||
|
||||
```
|
||||
+-----------------+
|
||||
| SESSION |
|
||||
@@ -65,16 +84,11 @@ For complete documentation, see the associated [Godoc](https://godoc.org/github.
|
||||
```
|
||||
|
||||
|
||||
## Usage
|
||||
## Examples
|
||||
|
||||
Client: [full demo](https://github.com/xtaci/kcptun/blob/master/client/main.go)
|
||||
```go
|
||||
kcpconn, err := kcp.DialWithOptions("192.168.0.1:10000", nil, 10, 3)
|
||||
```
|
||||
Server: [full demo](https://github.com/xtaci/kcptun/blob/master/server/main.go)
|
||||
```go
|
||||
lis, err := kcp.ListenWithOptions(":10000", nil, 10, 3)
|
||||
```
|
||||
1. [simple examples](https://github.com/xtaci/kcp-go/tree/master/examples)
|
||||
2. [kcptun client](https://github.com/xtaci/kcptun/blob/master/client/main.go)
|
||||
3. [kcptun server](https://github.com/xtaci/kcptun/blob/master/server/main.go)
|
||||
|
||||
## Benchmark
|
||||
```
|
||||
@@ -128,6 +142,10 @@ PASS
|
||||
ok github.com/xtaci/kcp-go 50.349s
|
||||
```
|
||||
|
||||
|
||||
## Typical Flame Graph
|
||||

|
||||
|
||||
## Key Design Considerations
|
||||
|
||||
1. slice vs. container/list
|
||||
@@ -159,6 +177,18 @@ BenchmarkNow-4 100000000 15.6 ns/op
|
||||
|
||||
In kcp-go, after each `kcp.output()` function call, current clock time will be updated upon return, and for a single `kcp.flush()` operation, current time will be queried from system once. For most of the time, 5000 connections costs 5000 * 15.6ns = 78us(a fixed cost while no packet needs to be sent), as for 10MB/s data transfering with 1400 MTU, `kcp.output()` will be called around 7500 times and costs 117us for `time.Now()` in **every second**.
|
||||
|
||||
3. Memory management
|
||||
|
||||
Primary memory allocation are done from a global buffer pool xmit.Buf, in kcp-go, when we need to allocate some bytes, we can get from that pool, and a fixed-capacity 1500 bytes(mtuLimit) will be returned, the rx queue, tx queue and fec queue all receive bytes from there, and they will return the bytes to the pool after using to prevent unnecessary zer0ing of bytes. The pool mechanism maintained a high watermark for slice objects, these in-flight objects from the pool will survive from the perodical garbage collection, meanwhile the pool kept the ability to return the memory to runtime if in idle.
|
||||
|
||||
4. Information security
|
||||
|
||||
kcp-go is shipped with builtin packet encryption powered by various block encryption algorithms and works in [Cipher Feedback Mode](https://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Cipher_Feedback_(CFB)), for each packet to be sent, the encryption process will start from encrypting a [nonce](https://en.wikipedia.org/wiki/Cryptographic_nonce) from the [system entropy](https://en.wikipedia.org/wiki//dev/random), so encryption to same plaintexts never leads to a same ciphertexts thereafter.
|
||||
|
||||
The contents of the packets are completely anonymous with encryption, including the headers(FEC,KCP), checksums and contents. Note that, no matter which encryption method you choose on you upper layer, if you disable encryption, the transmit will be insecure somehow, since the header is ***PLAINTEXT*** to everyone it would be susceptible to header tampering, such as jamming the *sliding window size*, *round-trip time*, *FEC property* and *checksums*. ```AES-128``` is suggested for minimal encryption since modern CPUs are shipped with [AES-NI](https://en.wikipedia.org/wiki/AES_instruction_set) instructions and performs even better than `salsa20`(check the table above).
|
||||
|
||||
Other possible attacks to kcp-go includes: a) [traffic analysis](https://en.wikipedia.org/wiki/Traffic_analysis), dataflow on specific websites may have pattern while interchanging data, but this type of eavesdropping has been mitigated by adapting [smux](https://github.com/xtaci/smux) to mix data streams so as to introduce noises, perfect solution to this has not appeared yet, theroretically by shuffling/mixing messages on larger scale network may mitigate this problem. b) [replay attack](https://en.wikipedia.org/wiki/Replay_attack), since the asymmetrical encryption has not been introduced into kcp-go for some reason, capturing the packets and replay them on a different machine is possible, (notice: hijacking the session and decrypting the contents is still *impossible*), so upper layers should contain a asymmetrical encryption system to guarantee the authenticity of each message(to process message exactly once), such as HTTPS/OpenSSL/LibreSSL, only by signing the requests with private keys can eliminate this type of attack.
|
||||
|
||||
## Connection Termination
|
||||
|
||||
Control messages like **SYN/FIN/RST** in TCP **are not defined** in KCP, you need some **keepalive/heartbeat mechanism** in the application-level. A real world example is to use some **multiplexing** protocol over session, such as [smux](https://github.com/xtaci/smux)(with embedded keepalive mechanism), see [kcptun](https://github.com/xtaci/kcptun) for example.
|
||||
@@ -169,6 +199,14 @@ Q: I'm handling >5K connections on my server, the CPU utilization is so high.
|
||||
|
||||
A: A standalone `agent` or `gate` server for running kcp-go is suggested, not only for CPU utilization, but also important to the **precision** of RTT measurements(timing) which indirectly affects retransmission. By increasing update `interval` with `SetNoDelay` like `conn.SetNoDelay(1, 40, 1, 1)` will dramatically reduce system load, but lower the performance.
|
||||
|
||||
Q: When should I enable FEC?
|
||||
|
||||
A: Forward error correction is critical to long-distance transmission, because a packet loss will lead to a huge penalty in time. And for the complicated packet routing network in modern world, round-trip time based loss check will not always be efficient, the big deviation of RTT samples in the long way usually leads to a larger RTO value in typical rtt estimator, which in other words, slows down the transmission.
|
||||
|
||||
Q: Should I enable encryption?
|
||||
|
||||
A: Yes, for the safety of protocol, even if the upper layer has encrypted.
|
||||
|
||||
## Who is using this?
|
||||
|
||||
1. https://github.com/xtaci/kcptun -- A Secure Tunnel Based On KCP over UDP.
|
||||
|
||||
12
vendor/github.com/fatedier/kcp-go/batchconn.go
generated
vendored
Normal file
12
vendor/github.com/fatedier/kcp-go/batchconn.go
generated
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
package kcp
|
||||
|
||||
import "golang.org/x/net/ipv4"
|
||||
|
||||
const (
|
||||
batchSize = 16
|
||||
)
|
||||
|
||||
type batchConn interface {
|
||||
WriteBatch(ms []ipv4.Message, flags int) (int, error)
|
||||
ReadBatch(ms []ipv4.Message, flags int) (int, error)
|
||||
}
|
||||
115
vendor/github.com/fatedier/kcp-go/fec.go
generated
vendored
115
vendor/github.com/fatedier/kcp-go/fec.go
generated
vendored
@@ -11,36 +11,34 @@ const (
|
||||
fecHeaderSize = 6
|
||||
fecHeaderSizePlus2 = fecHeaderSize + 2 // plus 2B data size
|
||||
typeData = 0xf1
|
||||
typeFEC = 0xf2
|
||||
typeParity = 0xf2
|
||||
)
|
||||
|
||||
type (
|
||||
// fecPacket is a decoded FEC packet
|
||||
fecPacket struct {
|
||||
seqid uint32
|
||||
flag uint16
|
||||
data []byte
|
||||
}
|
||||
// fecPacket is a decoded FEC packet
|
||||
type fecPacket []byte
|
||||
|
||||
// fecDecoder for decoding incoming packets
|
||||
fecDecoder struct {
|
||||
rxlimit int // queue size limit
|
||||
dataShards int
|
||||
parityShards int
|
||||
shardSize int
|
||||
rx []fecPacket // ordered receive queue
|
||||
func (bts fecPacket) seqid() uint32 { return binary.LittleEndian.Uint32(bts) }
|
||||
func (bts fecPacket) flag() uint16 { return binary.LittleEndian.Uint16(bts[4:]) }
|
||||
func (bts fecPacket) data() []byte { return bts[6:] }
|
||||
|
||||
// caches
|
||||
decodeCache [][]byte
|
||||
flagCache []bool
|
||||
// fecDecoder for decoding incoming packets
|
||||
type fecDecoder struct {
|
||||
rxlimit int // queue size limit
|
||||
dataShards int
|
||||
parityShards int
|
||||
shardSize int
|
||||
rx []fecPacket // ordered receive queue
|
||||
|
||||
// zeros
|
||||
zeros []byte
|
||||
// caches
|
||||
decodeCache [][]byte
|
||||
flagCache []bool
|
||||
|
||||
// RS decoder
|
||||
codec reedsolomon.Encoder
|
||||
}
|
||||
)
|
||||
// zeros
|
||||
zeros []byte
|
||||
|
||||
// RS decoder
|
||||
codec reedsolomon.Encoder
|
||||
}
|
||||
|
||||
func newFECDecoder(rxlimit, dataShards, parityShards int) *fecDecoder {
|
||||
if dataShards <= 0 || parityShards <= 0 {
|
||||
@@ -66,33 +64,24 @@ func newFECDecoder(rxlimit, dataShards, parityShards int) *fecDecoder {
|
||||
return dec
|
||||
}
|
||||
|
||||
// decodeBytes a fec packet
|
||||
func (dec *fecDecoder) decodeBytes(data []byte) fecPacket {
|
||||
var pkt fecPacket
|
||||
pkt.seqid = binary.LittleEndian.Uint32(data)
|
||||
pkt.flag = binary.LittleEndian.Uint16(data[4:])
|
||||
// allocate memory & copy
|
||||
buf := xmitBuf.Get().([]byte)[:len(data)-6]
|
||||
copy(buf, data[6:])
|
||||
pkt.data = buf
|
||||
return pkt
|
||||
}
|
||||
|
||||
// decode a fec packet
|
||||
func (dec *fecDecoder) decode(pkt fecPacket) (recovered [][]byte) {
|
||||
func (dec *fecDecoder) decode(in fecPacket) (recovered [][]byte) {
|
||||
// insertion
|
||||
n := len(dec.rx) - 1
|
||||
insertIdx := 0
|
||||
for i := n; i >= 0; i-- {
|
||||
if pkt.seqid == dec.rx[i].seqid { // de-duplicate
|
||||
xmitBuf.Put(pkt.data)
|
||||
if in.seqid() == dec.rx[i].seqid() { // de-duplicate
|
||||
return nil
|
||||
} else if _itimediff(pkt.seqid, dec.rx[i].seqid) > 0 { // insertion
|
||||
} else if _itimediff(in.seqid(), dec.rx[i].seqid()) > 0 { // insertion
|
||||
insertIdx = i + 1
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// make a copy
|
||||
pkt := fecPacket(xmitBuf.Get().([]byte)[:len(in)])
|
||||
copy(pkt, in)
|
||||
|
||||
// insert into ordered rx queue
|
||||
if insertIdx == n+1 {
|
||||
dec.rx = append(dec.rx, pkt)
|
||||
@@ -103,11 +92,11 @@ func (dec *fecDecoder) decode(pkt fecPacket) (recovered [][]byte) {
|
||||
}
|
||||
|
||||
// shard range for current packet
|
||||
shardBegin := pkt.seqid - pkt.seqid%uint32(dec.shardSize)
|
||||
shardBegin := pkt.seqid() - pkt.seqid()%uint32(dec.shardSize)
|
||||
shardEnd := shardBegin + uint32(dec.shardSize) - 1
|
||||
|
||||
// max search range in ordered queue for current shard
|
||||
searchBegin := insertIdx - int(pkt.seqid%uint32(dec.shardSize))
|
||||
searchBegin := insertIdx - int(pkt.seqid()%uint32(dec.shardSize))
|
||||
if searchBegin < 0 {
|
||||
searchBegin = 0
|
||||
}
|
||||
@@ -130,21 +119,21 @@ func (dec *fecDecoder) decode(pkt fecPacket) (recovered [][]byte) {
|
||||
|
||||
// shard assembly
|
||||
for i := searchBegin; i <= searchEnd; i++ {
|
||||
seqid := dec.rx[i].seqid
|
||||
seqid := dec.rx[i].seqid()
|
||||
if _itimediff(seqid, shardEnd) > 0 {
|
||||
break
|
||||
} else if _itimediff(seqid, shardBegin) >= 0 {
|
||||
shards[seqid%uint32(dec.shardSize)] = dec.rx[i].data
|
||||
shards[seqid%uint32(dec.shardSize)] = dec.rx[i].data()
|
||||
shardsflag[seqid%uint32(dec.shardSize)] = true
|
||||
numshard++
|
||||
if dec.rx[i].flag == typeData {
|
||||
if dec.rx[i].flag() == typeData {
|
||||
numDataShard++
|
||||
}
|
||||
if numshard == 1 {
|
||||
first = i
|
||||
}
|
||||
if len(dec.rx[i].data) > maxlen {
|
||||
maxlen = len(dec.rx[i].data)
|
||||
if len(dec.rx[i].data()) > maxlen {
|
||||
maxlen = len(dec.rx[i].data())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -159,11 +148,14 @@ func (dec *fecDecoder) decode(pkt fecPacket) (recovered [][]byte) {
|
||||
dlen := len(shards[k])
|
||||
shards[k] = shards[k][:maxlen]
|
||||
copy(shards[k][dlen:], dec.zeros)
|
||||
} else {
|
||||
shards[k] = xmitBuf.Get().([]byte)[:0]
|
||||
}
|
||||
}
|
||||
if err := dec.codec.ReconstructData(shards); err == nil {
|
||||
for k := range shards[:dec.dataShards] {
|
||||
if !shardsflag[k] {
|
||||
// recovered data should be recycled
|
||||
recovered = append(recovered, shards[k])
|
||||
}
|
||||
}
|
||||
@@ -174,7 +166,7 @@ func (dec *fecDecoder) decode(pkt fecPacket) (recovered [][]byte) {
|
||||
|
||||
// keep rxlimit
|
||||
if len(dec.rx) > dec.rxlimit {
|
||||
if dec.rx[0].flag == typeData { // track the unrecoverable data
|
||||
if dec.rx[0].flag() == typeData { // track the unrecoverable data
|
||||
atomic.AddUint64(&DefaultSnmp.FECShortShards, 1)
|
||||
}
|
||||
dec.rx = dec.freeRange(0, 1, dec.rx)
|
||||
@@ -182,15 +174,16 @@ func (dec *fecDecoder) decode(pkt fecPacket) (recovered [][]byte) {
|
||||
return
|
||||
}
|
||||
|
||||
// free a range of fecPacket, and zero for GC recycling
|
||||
// free a range of fecPacket
|
||||
func (dec *fecDecoder) freeRange(first, n int, q []fecPacket) []fecPacket {
|
||||
for i := first; i < first+n; i++ { // recycle buffer
|
||||
xmitBuf.Put(q[i].data)
|
||||
xmitBuf.Put([]byte(q[i]))
|
||||
}
|
||||
|
||||
if first == 0 && n < cap(q)/2 {
|
||||
return q[n:]
|
||||
}
|
||||
copy(q[first:], q[first+n:])
|
||||
for i := 0; i < n; i++ { // dereference data
|
||||
q[len(q)-1-i].data = nil
|
||||
}
|
||||
return q[:len(q)-n]
|
||||
}
|
||||
|
||||
@@ -229,7 +222,7 @@ func newFECEncoder(dataShards, parityShards, offset int) *fecEncoder {
|
||||
enc.dataShards = dataShards
|
||||
enc.parityShards = parityShards
|
||||
enc.shardSize = dataShards + parityShards
|
||||
enc.paws = (0xffffffff/uint32(enc.shardSize) - 1) * uint32(enc.shardSize)
|
||||
enc.paws = 0xffffffff / uint32(enc.shardSize) * uint32(enc.shardSize)
|
||||
enc.headerOffset = offset
|
||||
enc.payloadOffset = enc.headerOffset + fecHeaderSize
|
||||
|
||||
@@ -252,13 +245,16 @@ func newFECEncoder(dataShards, parityShards, offset int) *fecEncoder {
|
||||
// encodes the packet, outputs parity shards if we have collected quorum datashards
|
||||
// notice: the contents of 'ps' will be re-written in successive calling
|
||||
func (enc *fecEncoder) encode(b []byte) (ps [][]byte) {
|
||||
// The header format:
|
||||
// | FEC SEQID(4B) | FEC TYPE(2B) | SIZE (2B) | PAYLOAD(SIZE-2) |
|
||||
// |<-headerOffset |<-payloadOffset
|
||||
enc.markData(b[enc.headerOffset:])
|
||||
binary.LittleEndian.PutUint16(b[enc.payloadOffset:], uint16(len(b[enc.payloadOffset:])))
|
||||
|
||||
// copy data to fec datashards
|
||||
// copy data from payloadOffset to fec shard cache
|
||||
sz := len(b)
|
||||
enc.shardCache[enc.shardCount] = enc.shardCache[enc.shardCount][:sz]
|
||||
copy(enc.shardCache[enc.shardCount], b)
|
||||
copy(enc.shardCache[enc.shardCount][enc.payloadOffset:], b[enc.payloadOffset:])
|
||||
enc.shardCount++
|
||||
|
||||
// track max datashard length
|
||||
@@ -285,7 +281,7 @@ func (enc *fecEncoder) encode(b []byte) (ps [][]byte) {
|
||||
if err := enc.codec.Encode(cache); err == nil {
|
||||
ps = enc.shardCache[enc.dataShards:]
|
||||
for k := range ps {
|
||||
enc.markFEC(ps[k][enc.headerOffset:])
|
||||
enc.markParity(ps[k][enc.headerOffset:])
|
||||
ps[k] = ps[k][:enc.maxSize]
|
||||
}
|
||||
}
|
||||
@@ -304,8 +300,9 @@ func (enc *fecEncoder) markData(data []byte) {
|
||||
enc.next++
|
||||
}
|
||||
|
||||
func (enc *fecEncoder) markFEC(data []byte) {
|
||||
func (enc *fecEncoder) markParity(data []byte) {
|
||||
binary.LittleEndian.PutUint32(data, enc.next)
|
||||
binary.LittleEndian.PutUint16(data[4:], typeFEC)
|
||||
binary.LittleEndian.PutUint16(data[4:], typeParity)
|
||||
// sequence wrap will only happen at parity shard
|
||||
enc.next = (enc.next + 1) % enc.paws
|
||||
}
|
||||
|
||||
BIN
vendor/github.com/fatedier/kcp-go/flame.png
generated
vendored
Normal file
BIN
vendor/github.com/fatedier/kcp-go/flame.png
generated
vendored
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 56 KiB |
147
vendor/github.com/fatedier/kcp-go/kcp.go
generated
vendored
147
vendor/github.com/fatedier/kcp-go/kcp.go
generated
vendored
@@ -1,9 +1,9 @@
|
||||
// Package kcp - A Fast and Reliable ARQ Protocol
|
||||
package kcp
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -30,6 +30,12 @@ const (
|
||||
IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window
|
||||
)
|
||||
|
||||
// monotonic reference time point
|
||||
var refTime time.Time = time.Now()
|
||||
|
||||
// currentMs returns current elasped monotonic milliseconds since program startup
|
||||
func currentMs() uint32 { return uint32(time.Now().Sub(refTime) / time.Millisecond) }
|
||||
|
||||
// output_callback is a prototype which ought capture conn and call conn.Write
|
||||
type output_callback func(buf []byte, size int)
|
||||
|
||||
@@ -145,8 +151,9 @@ type KCP struct {
|
||||
|
||||
acklist []ackItem
|
||||
|
||||
buffer []byte
|
||||
output output_callback
|
||||
buffer []byte
|
||||
reserved int
|
||||
output output_callback
|
||||
}
|
||||
|
||||
type ackItem struct {
|
||||
@@ -154,8 +161,11 @@ type ackItem struct {
|
||||
ts uint32
|
||||
}
|
||||
|
||||
// NewKCP create a new kcp control object, 'conv' must equal in two endpoint
|
||||
// from the same connection.
|
||||
// NewKCP create a new kcp state machine
|
||||
//
|
||||
// 'conv' must be equal in the connection peers, or else data will be silently rejected.
|
||||
//
|
||||
// 'output' function will be called whenever these is data to be sent on wire.
|
||||
func NewKCP(conv uint32, output output_callback) *KCP {
|
||||
kcp := new(KCP)
|
||||
kcp.conv = conv
|
||||
@@ -164,7 +174,7 @@ func NewKCP(conv uint32, output output_callback) *KCP {
|
||||
kcp.rmt_wnd = IKCP_WND_RCV
|
||||
kcp.mtu = IKCP_MTU_DEF
|
||||
kcp.mss = kcp.mtu - IKCP_OVERHEAD
|
||||
kcp.buffer = make([]byte, (kcp.mtu+IKCP_OVERHEAD)*3)
|
||||
kcp.buffer = make([]byte, kcp.mtu)
|
||||
kcp.rx_rto = IKCP_RTO_DEF
|
||||
kcp.rx_minrto = IKCP_RTO_MIN
|
||||
kcp.interval = IKCP_INTERVAL
|
||||
@@ -189,6 +199,19 @@ func (kcp *KCP) delSegment(seg *segment) {
|
||||
}
|
||||
}
|
||||
|
||||
// ReserveBytes keeps n bytes untouched from the beginning of the buffer,
|
||||
// the output_callback function should be aware of this.
|
||||
//
|
||||
// Return false if n >= mss
|
||||
func (kcp *KCP) ReserveBytes(n int) bool {
|
||||
if n >= int(kcp.mtu-IKCP_OVERHEAD) || n < 0 {
|
||||
return false
|
||||
}
|
||||
kcp.reserved = n
|
||||
kcp.mss = kcp.mtu - IKCP_OVERHEAD - uint32(n)
|
||||
return true
|
||||
}
|
||||
|
||||
// PeekSize checks the size of next message in the recv queue
|
||||
func (kcp *KCP) PeekSize() (length int) {
|
||||
if len(kcp.rcv_queue) == 0 {
|
||||
@@ -214,19 +237,21 @@ func (kcp *KCP) PeekSize() (length int) {
|
||||
return
|
||||
}
|
||||
|
||||
// Recv is user/upper level recv: returns size, returns below zero for EAGAIN
|
||||
// Receive data from kcp state machine
|
||||
//
|
||||
// Return number of bytes read.
|
||||
//
|
||||
// Return -1 when there is no readable data.
|
||||
//
|
||||
// Return -2 if len(buffer) is smaller than kcp.PeekSize().
|
||||
func (kcp *KCP) Recv(buffer []byte) (n int) {
|
||||
if len(kcp.rcv_queue) == 0 {
|
||||
peeksize := kcp.PeekSize()
|
||||
if peeksize < 0 {
|
||||
return -1
|
||||
}
|
||||
|
||||
peeksize := kcp.PeekSize()
|
||||
if peeksize < 0 {
|
||||
return -2
|
||||
}
|
||||
|
||||
if peeksize > len(buffer) {
|
||||
return -3
|
||||
return -2
|
||||
}
|
||||
|
||||
var fast_recover bool
|
||||
@@ -255,7 +280,7 @@ func (kcp *KCP) Recv(buffer []byte) (n int) {
|
||||
count = 0
|
||||
for k := range kcp.rcv_buf {
|
||||
seg := &kcp.rcv_buf[k]
|
||||
if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
|
||||
if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue)+count < int(kcp.rcv_wnd) {
|
||||
kcp.rcv_nxt++
|
||||
count++
|
||||
} else {
|
||||
@@ -386,6 +411,10 @@ func (kcp *KCP) parse_ack(sn uint32) {
|
||||
for k := range kcp.snd_buf {
|
||||
seg := &kcp.snd_buf[k]
|
||||
if sn == seg.sn {
|
||||
// mark and free space, but leave the segment here,
|
||||
// and wait until `una` to delete this, then we don't
|
||||
// have to shift the segments behind forward,
|
||||
// which is an expensive operation for large window
|
||||
seg.acked = 1
|
||||
kcp.delSegment(seg)
|
||||
break
|
||||
@@ -474,7 +503,7 @@ func (kcp *KCP) parse_data(newseg segment) bool {
|
||||
count := 0
|
||||
for k := range kcp.rcv_buf {
|
||||
seg := &kcp.rcv_buf[k]
|
||||
if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
|
||||
if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue)+count < int(kcp.rcv_wnd) {
|
||||
kcp.rcv_nxt++
|
||||
count++
|
||||
} else {
|
||||
@@ -489,8 +518,12 @@ func (kcp *KCP) parse_data(newseg segment) bool {
|
||||
return repeat
|
||||
}
|
||||
|
||||
// Input when you received a low level packet (eg. UDP packet), call it
|
||||
// regular indicates a regular packet has received(not from FEC)
|
||||
// Input a packet into kcp state machine.
|
||||
//
|
||||
// 'regular' indicates it's a real data packet from remote, and it means it's not generated from ReedSolomon
|
||||
// codecs.
|
||||
//
|
||||
// 'ackNoDelay' will trigger immediate ACK, but surely it will not be efficient in bandwidth
|
||||
func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
|
||||
snd_una := kcp.snd_una
|
||||
if len(data) < IKCP_OVERHEAD {
|
||||
@@ -634,14 +667,28 @@ func (kcp *KCP) flush(ackOnly bool) uint32 {
|
||||
seg.una = kcp.rcv_nxt
|
||||
|
||||
buffer := kcp.buffer
|
||||
// flush acknowledges
|
||||
ptr := buffer
|
||||
for i, ack := range kcp.acklist {
|
||||
ptr := buffer[kcp.reserved:] // keep n bytes untouched
|
||||
|
||||
// makeSpace makes room for writing
|
||||
makeSpace := func(space int) {
|
||||
size := len(buffer) - len(ptr)
|
||||
if size+IKCP_OVERHEAD > int(kcp.mtu) {
|
||||
if size+space > int(kcp.mtu) {
|
||||
kcp.output(buffer, size)
|
||||
ptr = buffer
|
||||
ptr = buffer[kcp.reserved:]
|
||||
}
|
||||
}
|
||||
|
||||
// flush bytes in buffer if there is any
|
||||
flushBuffer := func() {
|
||||
size := len(buffer) - len(ptr)
|
||||
if size > kcp.reserved {
|
||||
kcp.output(buffer, size)
|
||||
}
|
||||
}
|
||||
|
||||
// flush acknowledges
|
||||
for i, ack := range kcp.acklist {
|
||||
makeSpace(IKCP_OVERHEAD)
|
||||
// filter jitters caused by bufferbloat
|
||||
if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
|
||||
seg.sn, seg.ts = ack.sn, ack.ts
|
||||
@@ -651,10 +698,7 @@ func (kcp *KCP) flush(ackOnly bool) uint32 {
|
||||
kcp.acklist = kcp.acklist[0:0]
|
||||
|
||||
if ackOnly { // flash remain ack segments
|
||||
size := len(buffer) - len(ptr)
|
||||
if size > 0 {
|
||||
kcp.output(buffer, size)
|
||||
}
|
||||
flushBuffer()
|
||||
return kcp.interval
|
||||
}
|
||||
|
||||
@@ -685,22 +729,14 @@ func (kcp *KCP) flush(ackOnly bool) uint32 {
|
||||
// flush window probing commands
|
||||
if (kcp.probe & IKCP_ASK_SEND) != 0 {
|
||||
seg.cmd = IKCP_CMD_WASK
|
||||
size := len(buffer) - len(ptr)
|
||||
if size+IKCP_OVERHEAD > int(kcp.mtu) {
|
||||
kcp.output(buffer, size)
|
||||
ptr = buffer
|
||||
}
|
||||
makeSpace(IKCP_OVERHEAD)
|
||||
ptr = seg.encode(ptr)
|
||||
}
|
||||
|
||||
// flush window probing commands
|
||||
if (kcp.probe & IKCP_ASK_TELL) != 0 {
|
||||
seg.cmd = IKCP_CMD_WINS
|
||||
size := len(buffer) - len(ptr)
|
||||
if size+IKCP_OVERHEAD > int(kcp.mtu) {
|
||||
kcp.output(buffer, size)
|
||||
ptr = buffer
|
||||
}
|
||||
makeSpace(IKCP_OVERHEAD)
|
||||
ptr = seg.encode(ptr)
|
||||
}
|
||||
|
||||
@@ -779,20 +815,14 @@ func (kcp *KCP) flush(ackOnly bool) uint32 {
|
||||
}
|
||||
|
||||
if needsend {
|
||||
current = currentMs() // time update for a blocking call
|
||||
current = currentMs()
|
||||
segment.xmit++
|
||||
segment.ts = current
|
||||
segment.wnd = seg.wnd
|
||||
segment.una = seg.una
|
||||
|
||||
size := len(buffer) - len(ptr)
|
||||
need := IKCP_OVERHEAD + len(segment.data)
|
||||
|
||||
if size+need > int(kcp.mtu) {
|
||||
kcp.output(buffer, size)
|
||||
ptr = buffer
|
||||
}
|
||||
|
||||
makeSpace(need)
|
||||
ptr = segment.encode(ptr)
|
||||
copy(ptr, segment.data)
|
||||
ptr = ptr[len(segment.data):]
|
||||
@@ -809,10 +839,7 @@ func (kcp *KCP) flush(ackOnly bool) uint32 {
|
||||
}
|
||||
|
||||
// flash remain segments
|
||||
size := len(buffer) - len(ptr)
|
||||
if size > 0 {
|
||||
kcp.output(buffer, size)
|
||||
}
|
||||
flushBuffer()
|
||||
|
||||
// counter updates
|
||||
sum := lostSegs
|
||||
@@ -864,6 +891,8 @@ func (kcp *KCP) flush(ackOnly bool) uint32 {
|
||||
return uint32(minrto)
|
||||
}
|
||||
|
||||
// (deprecated)
|
||||
//
|
||||
// Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
|
||||
// ikcp_check when to call it again (without ikcp_input/_send calling).
|
||||
// 'current' - current timestamp in millisec.
|
||||
@@ -892,6 +921,8 @@ func (kcp *KCP) Update() {
|
||||
}
|
||||
}
|
||||
|
||||
// (deprecated)
|
||||
//
|
||||
// Check determines when should you invoke ikcp_update:
|
||||
// returns when you should invoke ikcp_update in millisec, if there
|
||||
// is no ikcp_input/_send calling. you can call ikcp_update in that
|
||||
@@ -947,12 +978,16 @@ func (kcp *KCP) SetMtu(mtu int) int {
|
||||
if mtu < 50 || mtu < IKCP_OVERHEAD {
|
||||
return -1
|
||||
}
|
||||
buffer := make([]byte, (mtu+IKCP_OVERHEAD)*3)
|
||||
if kcp.reserved >= int(kcp.mtu-IKCP_OVERHEAD) || kcp.reserved < 0 {
|
||||
return -1
|
||||
}
|
||||
|
||||
buffer := make([]byte, mtu)
|
||||
if buffer == nil {
|
||||
return -2
|
||||
}
|
||||
kcp.mtu = uint32(mtu)
|
||||
kcp.mss = kcp.mtu - IKCP_OVERHEAD
|
||||
kcp.mss = kcp.mtu - IKCP_OVERHEAD - uint32(kcp.reserved)
|
||||
kcp.buffer = buffer
|
||||
return 0
|
||||
}
|
||||
@@ -1006,7 +1041,13 @@ func (kcp *KCP) WaitSnd() int {
|
||||
}
|
||||
|
||||
// remove front n elements from queue
|
||||
// if the number of elements to remove is more than half of the size.
|
||||
// just shift the rear elements to front, otherwise just reslice q to q[n:]
|
||||
// then the cost of runtime.growslice can always be less than n/2
|
||||
func (kcp *KCP) remove_front(q []segment, n int) []segment {
|
||||
newn := copy(q, q[n:])
|
||||
return q[:newn]
|
||||
if n > cap(q)/2 {
|
||||
newn := copy(q, q[n:])
|
||||
return q[:newn]
|
||||
}
|
||||
return q[n:]
|
||||
}
|
||||
|
||||
48
vendor/github.com/fatedier/kcp-go/readloop.go
generated
vendored
Normal file
48
vendor/github.com/fatedier/kcp-go/readloop.go
generated
vendored
Normal file
@@ -0,0 +1,48 @@
|
||||
package kcp
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func (s *UDPSession) defaultReadLoop() {
|
||||
buf := make([]byte, mtuLimit)
|
||||
var src string
|
||||
for {
|
||||
if n, addr, err := s.conn.ReadFrom(buf); err == nil {
|
||||
// make sure the packet is from the same source
|
||||
if src == "" { // set source address
|
||||
src = addr.String()
|
||||
} else if addr.String() != src {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
continue
|
||||
}
|
||||
|
||||
if n >= s.headerSize+IKCP_OVERHEAD {
|
||||
s.packetInput(buf[:n])
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
}
|
||||
} else {
|
||||
s.notifyReadError(errors.WithStack(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Listener) defaultMonitor() {
|
||||
buf := make([]byte, mtuLimit)
|
||||
for {
|
||||
if n, from, err := l.conn.ReadFrom(buf); err == nil {
|
||||
if n >= l.headerSize+IKCP_OVERHEAD {
|
||||
l.packetInput(buf[:n], from)
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
}
|
||||
} else {
|
||||
l.notifyReadError(errors.WithStack(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
11
vendor/github.com/fatedier/kcp-go/readloop_generic.go
generated
vendored
Normal file
11
vendor/github.com/fatedier/kcp-go/readloop_generic.go
generated
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
// +build !linux
|
||||
|
||||
package kcp
|
||||
|
||||
func (s *UDPSession) readLoop() {
|
||||
s.defaultReadLoop()
|
||||
}
|
||||
|
||||
func (l *Listener) monitor() {
|
||||
l.defaultMonitor()
|
||||
}
|
||||
120
vendor/github.com/fatedier/kcp-go/readloop_linux.go
generated
vendored
Normal file
120
vendor/github.com/fatedier/kcp-go/readloop_linux.go
generated
vendored
Normal file
@@ -0,0 +1,120 @@
|
||||
// +build linux
|
||||
|
||||
package kcp
|
||||
|
||||
import (
|
||||
"net"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/ipv4"
|
||||
"golang.org/x/net/ipv6"
|
||||
)
|
||||
|
||||
// the read loop for a client session
|
||||
func (s *UDPSession) readLoop() {
|
||||
// default version
|
||||
if s.xconn == nil {
|
||||
s.defaultReadLoop()
|
||||
return
|
||||
}
|
||||
|
||||
// x/net version
|
||||
var src string
|
||||
msgs := make([]ipv4.Message, batchSize)
|
||||
for k := range msgs {
|
||||
msgs[k].Buffers = [][]byte{make([]byte, mtuLimit)}
|
||||
}
|
||||
|
||||
for {
|
||||
if count, err := s.xconn.ReadBatch(msgs, 0); err == nil {
|
||||
for i := 0; i < count; i++ {
|
||||
msg := &msgs[i]
|
||||
// make sure the packet is from the same source
|
||||
if src == "" { // set source address if nil
|
||||
src = msg.Addr.String()
|
||||
} else if msg.Addr.String() != src {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
continue
|
||||
}
|
||||
|
||||
if msg.N < s.headerSize+IKCP_OVERHEAD {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
continue
|
||||
}
|
||||
|
||||
// source and size has validated
|
||||
s.packetInput(msg.Buffers[0][:msg.N])
|
||||
}
|
||||
} else {
|
||||
// compatibility issue:
|
||||
// for linux kernel<=2.6.32, support for sendmmsg is not available
|
||||
// an error of type os.SyscallError will be returned
|
||||
if operr, ok := err.(*net.OpError); ok {
|
||||
if se, ok := operr.Err.(*os.SyscallError); ok {
|
||||
if se.Syscall == "recvmmsg" {
|
||||
s.defaultReadLoop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
s.notifyReadError(errors.WithStack(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// monitor incoming data for all connections of server
|
||||
func (l *Listener) monitor() {
|
||||
var xconn batchConn
|
||||
if _, ok := l.conn.(*net.UDPConn); ok {
|
||||
addr, err := net.ResolveUDPAddr("udp", l.conn.LocalAddr().String())
|
||||
if err == nil {
|
||||
if addr.IP.To4() != nil {
|
||||
xconn = ipv4.NewPacketConn(l.conn)
|
||||
} else {
|
||||
xconn = ipv6.NewPacketConn(l.conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// default version
|
||||
if xconn == nil {
|
||||
l.defaultMonitor()
|
||||
return
|
||||
}
|
||||
|
||||
// x/net version
|
||||
msgs := make([]ipv4.Message, batchSize)
|
||||
for k := range msgs {
|
||||
msgs[k].Buffers = [][]byte{make([]byte, mtuLimit)}
|
||||
}
|
||||
|
||||
for {
|
||||
if count, err := xconn.ReadBatch(msgs, 0); err == nil {
|
||||
for i := 0; i < count; i++ {
|
||||
msg := &msgs[i]
|
||||
if msg.N >= l.headerSize+IKCP_OVERHEAD {
|
||||
l.packetInput(msg.Buffers[0][:msg.N], msg.Addr)
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// compatibility issue:
|
||||
// for linux kernel<=2.6.32, support for sendmmsg is not available
|
||||
// an error of type os.SyscallError will be returned
|
||||
if operr, ok := err.(*net.OpError); ok {
|
||||
if se, ok := operr.Err.(*os.SyscallError); ok {
|
||||
if se.Syscall == "recvmmsg" {
|
||||
l.defaultMonitor()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
l.notifyReadError(errors.WithStack(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
602
vendor/github.com/fatedier/kcp-go/sess.go
generated
vendored
602
vendor/github.com/fatedier/kcp-go/sess.go
generated
vendored
@@ -1,9 +1,17 @@
|
||||
// Package kcp-go is a Reliable-UDP library for golang.
|
||||
//
|
||||
// This library intents to provide a smooth, resilient, ordered,
|
||||
// error-checked and anonymous delivery of streams over UDP packets.
|
||||
//
|
||||
// The interfaces of this package aims to be compatible with
|
||||
// net.Conn in standard library, but offers powerful features for advanced users.
|
||||
package kcp
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -14,14 +22,6 @@ import (
|
||||
"golang.org/x/net/ipv6"
|
||||
)
|
||||
|
||||
type errTimeout struct {
|
||||
error
|
||||
}
|
||||
|
||||
func (errTimeout) Timeout() bool { return true }
|
||||
func (errTimeout) Temporary() bool { return true }
|
||||
func (errTimeout) Error() string { return "i/o timeout" }
|
||||
|
||||
const (
|
||||
// 16-bytes nonce for each packet
|
||||
nonceSize = 16
|
||||
@@ -42,9 +42,9 @@ const (
|
||||
acceptBacklog = 128
|
||||
)
|
||||
|
||||
const (
|
||||
errBrokenPipe = "broken pipe"
|
||||
errInvalidOperation = "invalid operation"
|
||||
var (
|
||||
errInvalidOperation = errors.New("invalid operation")
|
||||
errTimeout = errors.New("timeout")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -72,8 +72,6 @@ type (
|
||||
// recvbuf turns packets into stream
|
||||
recvbuf []byte
|
||||
bufptr []byte
|
||||
// header extended output buffer, if has header
|
||||
ext []byte
|
||||
|
||||
// FEC codec
|
||||
fecDecoder *fecDecoder
|
||||
@@ -90,16 +88,27 @@ type (
|
||||
|
||||
// notifications
|
||||
die chan struct{} // notify current session has Closed
|
||||
dieOnce sync.Once
|
||||
chReadEvent chan struct{} // notify Read() can be called without blocking
|
||||
chWriteEvent chan struct{} // notify Write() can be called without blocking
|
||||
chReadError chan error // notify PacketConn.Read() have an error
|
||||
chWriteError chan error // notify PacketConn.Write() have an error
|
||||
|
||||
// socket error handling
|
||||
socketReadError atomic.Value
|
||||
socketWriteError atomic.Value
|
||||
chSocketReadError chan struct{}
|
||||
chSocketWriteError chan struct{}
|
||||
socketReadErrorOnce sync.Once
|
||||
socketWriteErrorOnce sync.Once
|
||||
|
||||
// nonce generator
|
||||
nonce Entropy
|
||||
|
||||
isClosed bool // flag the session has Closed
|
||||
mu sync.Mutex
|
||||
// packets waiting to be sent on wire
|
||||
txqueue []ipv4.Message
|
||||
xconn batchConn // for x/net
|
||||
xconnWriteError error
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
setReadBuffer interface {
|
||||
@@ -119,14 +128,26 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
|
||||
sess.nonce.Init()
|
||||
sess.chReadEvent = make(chan struct{}, 1)
|
||||
sess.chWriteEvent = make(chan struct{}, 1)
|
||||
sess.chReadError = make(chan error, 1)
|
||||
sess.chWriteError = make(chan error, 1)
|
||||
sess.chSocketReadError = make(chan struct{})
|
||||
sess.chSocketWriteError = make(chan struct{})
|
||||
sess.remote = remote
|
||||
sess.conn = conn
|
||||
sess.l = l
|
||||
sess.block = block
|
||||
sess.recvbuf = make([]byte, mtuLimit)
|
||||
|
||||
// cast to writebatch conn
|
||||
if _, ok := conn.(*net.UDPConn); ok {
|
||||
addr, err := net.ResolveUDPAddr("udp", conn.LocalAddr().String())
|
||||
if err == nil {
|
||||
if addr.IP.To4() != nil {
|
||||
sess.xconn = ipv4.NewPacketConn(conn)
|
||||
} else {
|
||||
sess.xconn = ipv6.NewPacketConn(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FEC codec initialization
|
||||
sess.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
|
||||
if sess.block != nil {
|
||||
@@ -143,17 +164,12 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
|
||||
sess.headerSize += fecHeaderSizePlus2
|
||||
}
|
||||
|
||||
// we only need to allocate extended packet buffer if we have the additional header
|
||||
if sess.headerSize > 0 {
|
||||
sess.ext = make([]byte, mtuLimit)
|
||||
}
|
||||
|
||||
sess.kcp = NewKCP(conv, func(buf []byte, size int) {
|
||||
if size >= IKCP_OVERHEAD {
|
||||
if size >= IKCP_OVERHEAD+sess.headerSize {
|
||||
sess.output(buf[:size])
|
||||
}
|
||||
})
|
||||
sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
|
||||
sess.kcp.ReserveBytes(sess.headerSize)
|
||||
|
||||
// register current session to the global updater,
|
||||
// which call sess.update() periodically.
|
||||
@@ -165,6 +181,7 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.PassiveOpens, 1)
|
||||
}
|
||||
|
||||
currestab := atomic.AddUint64(&DefaultSnmp.CurrEstab, 1)
|
||||
maxconn := atomic.LoadUint64(&DefaultSnmp.MaxConn)
|
||||
if currestab > maxconn {
|
||||
@@ -186,11 +203,6 @@ func (s *UDPSession) Read(b []byte) (n int, err error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
if s.isClosed {
|
||||
s.mu.Unlock()
|
||||
return 0, errors.New(errBrokenPipe)
|
||||
}
|
||||
|
||||
if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
|
||||
if len(b) >= size { // receive data into 'b' directly
|
||||
s.kcp.Recv(b)
|
||||
@@ -220,7 +232,7 @@ func (s *UDPSession) Read(b []byte) (n int, err error) {
|
||||
if !s.rd.IsZero() {
|
||||
if time.Now().After(s.rd) {
|
||||
s.mu.Unlock()
|
||||
return 0, errTimeout{}
|
||||
return 0, errors.WithStack(errTimeout)
|
||||
}
|
||||
|
||||
delay := s.rd.Sub(time.Now())
|
||||
@@ -229,63 +241,66 @@ func (s *UDPSession) Read(b []byte) (n int, err error) {
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// wait for read event or timeout
|
||||
// wait for read event or timeout or error
|
||||
select {
|
||||
case <-s.chReadEvent:
|
||||
case <-c:
|
||||
case <-s.die:
|
||||
case err = <-s.chReadError:
|
||||
if timeout != nil {
|
||||
timeout.Stop()
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
if timeout != nil {
|
||||
timeout.Stop()
|
||||
case <-c:
|
||||
return 0, errors.WithStack(errTimeout)
|
||||
case <-s.chSocketReadError:
|
||||
return 0, s.socketReadError.Load().(error)
|
||||
case <-s.die:
|
||||
return 0, errors.WithStack(io.ErrClosedPipe)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write implements net.Conn
|
||||
func (s *UDPSession) Write(b []byte) (n int, err error) {
|
||||
func (s *UDPSession) Write(b []byte) (n int, err error) { return s.WriteBuffers([][]byte{b}) }
|
||||
|
||||
// WriteBuffers write a vector of byte slices to the underlying connection
|
||||
func (s *UDPSession) WriteBuffers(v [][]byte) (n int, err error) {
|
||||
for {
|
||||
s.mu.Lock()
|
||||
if s.isClosed {
|
||||
s.mu.Unlock()
|
||||
return 0, errors.New(errBrokenPipe)
|
||||
select {
|
||||
case <-s.chSocketWriteError:
|
||||
return 0, s.socketWriteError.Load().(error)
|
||||
case <-s.die:
|
||||
return 0, errors.WithStack(io.ErrClosedPipe)
|
||||
default:
|
||||
}
|
||||
|
||||
// controls how much data will be sent to kcp core
|
||||
// to prevent the memory from exhuasting
|
||||
s.mu.Lock()
|
||||
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
|
||||
n = len(b)
|
||||
for {
|
||||
if len(b) <= int(s.kcp.mss) {
|
||||
s.kcp.Send(b)
|
||||
break
|
||||
} else {
|
||||
s.kcp.Send(b[:s.kcp.mss])
|
||||
b = b[s.kcp.mss:]
|
||||
for _, b := range v {
|
||||
n += len(b)
|
||||
for {
|
||||
if len(b) <= int(s.kcp.mss) {
|
||||
s.kcp.Send(b)
|
||||
break
|
||||
} else {
|
||||
s.kcp.Send(b[:s.kcp.mss])
|
||||
b = b[s.kcp.mss:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flush immediately if the queue is full
|
||||
if s.kcp.WaitSnd() >= int(s.kcp.snd_wnd) || !s.writeDelay {
|
||||
s.kcp.flush(false)
|
||||
s.uncork()
|
||||
}
|
||||
s.mu.Unlock()
|
||||
atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// deadline for current writing operation
|
||||
var timeout *time.Timer
|
||||
var c <-chan time.Time
|
||||
if !s.wd.IsZero() {
|
||||
if time.Now().After(s.wd) {
|
||||
s.mu.Unlock()
|
||||
return 0, errTimeout{}
|
||||
return 0, errors.WithStack(errTimeout)
|
||||
}
|
||||
delay := s.wd.Sub(time.Now())
|
||||
timeout = time.NewTimer(delay)
|
||||
@@ -293,44 +308,52 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// wait for write event or timeout
|
||||
select {
|
||||
case <-s.chWriteEvent:
|
||||
case <-c:
|
||||
case <-s.die:
|
||||
case err = <-s.chWriteError:
|
||||
if timeout != nil {
|
||||
timeout.Stop()
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
if timeout != nil {
|
||||
timeout.Stop()
|
||||
case <-c:
|
||||
return 0, errors.WithStack(errTimeout)
|
||||
case <-s.chSocketWriteError:
|
||||
return 0, s.socketWriteError.Load().(error)
|
||||
case <-s.die:
|
||||
return 0, errors.WithStack(io.ErrClosedPipe)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// uncork sends data in txqueue if there is any
|
||||
func (s *UDPSession) uncork() {
|
||||
if len(s.txqueue) > 0 {
|
||||
s.tx(s.txqueue)
|
||||
s.txqueue = s.txqueue[:0]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Close closes the connection.
|
||||
func (s *UDPSession) Close() error {
|
||||
// remove current session from updater & listener(if necessary)
|
||||
updater.removeSession(s)
|
||||
if s.l != nil { // notify listener
|
||||
s.l.closeSession(s.remote)
|
||||
}
|
||||
var once bool
|
||||
s.dieOnce.Do(func() {
|
||||
close(s.die)
|
||||
once = true
|
||||
})
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.isClosed {
|
||||
return errors.New(errBrokenPipe)
|
||||
if once {
|
||||
// remove from updater
|
||||
updater.removeSession(s)
|
||||
atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0))
|
||||
|
||||
if s.l != nil { // belongs to listener
|
||||
s.l.closeSession(s.remote)
|
||||
return nil
|
||||
} else { // client socket close
|
||||
return s.conn.Close()
|
||||
}
|
||||
} else {
|
||||
return errors.WithStack(io.ErrClosedPipe)
|
||||
}
|
||||
close(s.die)
|
||||
s.isClosed = true
|
||||
atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0))
|
||||
if s.l == nil { // client socket close
|
||||
return s.conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
|
||||
@@ -390,7 +413,7 @@ func (s *UDPSession) SetMtu(mtu int) bool {
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.kcp.SetMtu(mtu - s.headerSize)
|
||||
s.kcp.SetMtu(mtu)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -412,7 +435,9 @@ func (s *UDPSession) SetACKNoDelay(nodelay bool) {
|
||||
s.ackNoDelay = nodelay
|
||||
}
|
||||
|
||||
// SetDUP duplicates udp packets for kcp output, for testing purpose only
|
||||
// (deprecated)
|
||||
//
|
||||
// SetDUP duplicates udp packets for kcp output.
|
||||
func (s *UDPSession) SetDUP(dup int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
@@ -427,19 +452,29 @@ func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
|
||||
s.kcp.NoDelay(nodelay, interval, resend, nc)
|
||||
}
|
||||
|
||||
// SetDSCP sets the 6bit DSCP field of IP header, no effect if it's accepted from Listener
|
||||
// SetDSCP sets the 6bit DSCP field in IPv4 header, or 8bit Traffic Class in IPv6 header.
|
||||
//
|
||||
// It has no effect if it's accepted from Listener.
|
||||
func (s *UDPSession) SetDSCP(dscp int) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.l == nil {
|
||||
if nc, ok := s.conn.(net.Conn); ok {
|
||||
if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err != nil {
|
||||
return ipv6.NewConn(nc).SetTrafficClass(dscp)
|
||||
}
|
||||
if s.l != nil {
|
||||
return errInvalidOperation
|
||||
}
|
||||
if nc, ok := s.conn.(net.Conn); ok {
|
||||
var succeed bool
|
||||
if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err == nil {
|
||||
succeed = true
|
||||
}
|
||||
if err := ipv6.NewConn(nc).SetTrafficClass(dscp); err == nil {
|
||||
succeed = true
|
||||
}
|
||||
|
||||
if succeed {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return errors.New(errInvalidOperation)
|
||||
return errInvalidOperation
|
||||
}
|
||||
|
||||
// SetReadBuffer sets the socket read buffer, no effect if it's accepted from Listener
|
||||
@@ -451,7 +486,7 @@ func (s *UDPSession) SetReadBuffer(bytes int) error {
|
||||
return nc.SetReadBuffer(bytes)
|
||||
}
|
||||
}
|
||||
return errors.New(errInvalidOperation)
|
||||
return errInvalidOperation
|
||||
}
|
||||
|
||||
// SetWriteBuffer sets the socket write buffer, no effect if it's accepted from Listener
|
||||
@@ -463,37 +498,29 @@ func (s *UDPSession) SetWriteBuffer(bytes int) error {
|
||||
return nc.SetWriteBuffer(bytes)
|
||||
}
|
||||
}
|
||||
return errors.New(errInvalidOperation)
|
||||
return errInvalidOperation
|
||||
}
|
||||
|
||||
// post-processing for sending a packet from kcp core
|
||||
// steps:
|
||||
// 0. Header extending
|
||||
// 1. FEC packet generation
|
||||
// 2. CRC32 integrity
|
||||
// 3. Encryption
|
||||
// 4. WriteTo kernel
|
||||
// 4. TxQueue
|
||||
func (s *UDPSession) output(buf []byte) {
|
||||
var ecc [][]byte
|
||||
|
||||
// 0. extend buf's header space(if necessary)
|
||||
ext := buf
|
||||
if s.headerSize > 0 {
|
||||
ext = s.ext[:s.headerSize+len(buf)]
|
||||
copy(ext[s.headerSize:], buf)
|
||||
}
|
||||
|
||||
// 1. FEC encoding
|
||||
if s.fecEncoder != nil {
|
||||
ecc = s.fecEncoder.encode(ext)
|
||||
ecc = s.fecEncoder.encode(buf)
|
||||
}
|
||||
|
||||
// 2&3. crc32 & encryption
|
||||
if s.block != nil {
|
||||
s.nonce.Fill(ext[:nonceSize])
|
||||
checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
|
||||
binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
|
||||
s.block.Encrypt(ext, ext)
|
||||
s.nonce.Fill(buf[:nonceSize])
|
||||
checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:])
|
||||
binary.LittleEndian.PutUint32(buf[nonceSize:], checksum)
|
||||
s.block.Encrypt(buf, buf)
|
||||
|
||||
for k := range ecc {
|
||||
s.nonce.Fill(ecc[k][:nonceSize])
|
||||
@@ -503,28 +530,23 @@ func (s *UDPSession) output(buf []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
// 4. WriteTo kernel
|
||||
nbytes := 0
|
||||
npkts := 0
|
||||
// 4. TxQueue
|
||||
var msg ipv4.Message
|
||||
for i := 0; i < s.dup+1; i++ {
|
||||
if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
|
||||
nbytes += n
|
||||
npkts++
|
||||
} else {
|
||||
s.notifyWriteError(err)
|
||||
}
|
||||
bts := xmitBuf.Get().([]byte)[:len(buf)]
|
||||
copy(bts, buf)
|
||||
msg.Buffers = [][]byte{bts}
|
||||
msg.Addr = s.remote
|
||||
s.txqueue = append(s.txqueue, msg)
|
||||
}
|
||||
|
||||
for k := range ecc {
|
||||
if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
|
||||
nbytes += n
|
||||
npkts++
|
||||
} else {
|
||||
s.notifyWriteError(err)
|
||||
}
|
||||
bts := xmitBuf.Get().([]byte)[:len(ecc[k])]
|
||||
copy(bts, ecc[k])
|
||||
msg.Buffers = [][]byte{bts}
|
||||
msg.Addr = s.remote
|
||||
s.txqueue = append(s.txqueue, msg)
|
||||
}
|
||||
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
|
||||
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
|
||||
}
|
||||
|
||||
// kcp update, returns interval for next calling
|
||||
@@ -535,6 +557,7 @@ func (s *UDPSession) update() (interval time.Duration) {
|
||||
if s.kcp.WaitSnd() < waitsnd {
|
||||
s.notifyWriteEvent()
|
||||
}
|
||||
s.uncork()
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -556,10 +579,39 @@ func (s *UDPSession) notifyWriteEvent() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *UDPSession) notifyReadError(err error) {
|
||||
s.socketReadErrorOnce.Do(func() {
|
||||
s.socketReadError.Store(err)
|
||||
close(s.chSocketReadError)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *UDPSession) notifyWriteError(err error) {
|
||||
select {
|
||||
case s.chWriteError <- err:
|
||||
default:
|
||||
s.socketWriteErrorOnce.Do(func() {
|
||||
s.socketWriteError.Store(err)
|
||||
close(s.chSocketWriteError)
|
||||
})
|
||||
}
|
||||
|
||||
// packet input stage
|
||||
func (s *UDPSession) packetInput(data []byte) {
|
||||
dataValid := false
|
||||
if s.block != nil {
|
||||
s.block.Decrypt(data, data)
|
||||
data = data[nonceSize:]
|
||||
checksum := crc32.ChecksumIEEE(data[crcSize:])
|
||||
if checksum == binary.LittleEndian.Uint32(data) {
|
||||
data = data[crcSize:]
|
||||
dataValid = true
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
|
||||
}
|
||||
} else if s.block == nil {
|
||||
dataValid = true
|
||||
}
|
||||
|
||||
if dataValid {
|
||||
s.kcpInput(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -568,16 +620,16 @@ func (s *UDPSession) kcpInput(data []byte) {
|
||||
|
||||
if s.fecDecoder != nil {
|
||||
if len(data) > fecHeaderSize { // must be larger than fec header size
|
||||
f := s.fecDecoder.decodeBytes(data)
|
||||
if f.flag == typeData || f.flag == typeFEC { // header check
|
||||
if f.flag == typeFEC {
|
||||
f := fecPacket(data)
|
||||
if f.flag() == typeData || f.flag() == typeParity { // header check
|
||||
if f.flag() == typeParity {
|
||||
fecParityShards++
|
||||
}
|
||||
recovers := s.fecDecoder.decode(f)
|
||||
|
||||
s.mu.Lock()
|
||||
waitsnd := s.kcp.WaitSnd()
|
||||
if f.flag == typeData {
|
||||
if f.flag() == typeData {
|
||||
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
|
||||
kcpInErrors++
|
||||
}
|
||||
@@ -598,6 +650,8 @@ func (s *UDPSession) kcpInput(data []byte) {
|
||||
} else {
|
||||
fecErrs++
|
||||
}
|
||||
// recycle the recovers
|
||||
xmitBuf.Put(r)
|
||||
}
|
||||
|
||||
// to notify the readers to receive the data
|
||||
@@ -608,6 +662,7 @@ func (s *UDPSession) kcpInput(data []byte) {
|
||||
if s.kcp.WaitSnd() < waitsnd {
|
||||
s.notifyWriteEvent()
|
||||
}
|
||||
s.uncork()
|
||||
s.mu.Unlock()
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
@@ -627,6 +682,7 @@ func (s *UDPSession) kcpInput(data []byte) {
|
||||
if s.kcp.WaitSnd() < waitsnd {
|
||||
s.notifyWriteEvent()
|
||||
}
|
||||
s.uncork()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -644,50 +700,7 @@ func (s *UDPSession) kcpInput(data []byte) {
|
||||
if fecRecovered > 0 {
|
||||
atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
|
||||
}
|
||||
}
|
||||
|
||||
// the read loop for a client session
|
||||
func (s *UDPSession) readLoop() {
|
||||
buf := make([]byte, mtuLimit)
|
||||
var src string
|
||||
for {
|
||||
if n, addr, err := s.conn.ReadFrom(buf); err == nil {
|
||||
// make sure the packet is from the same source
|
||||
if src == "" { // set source address
|
||||
src = addr.String()
|
||||
} else if addr.String() != src {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
continue
|
||||
}
|
||||
|
||||
if n >= s.headerSize+IKCP_OVERHEAD {
|
||||
data := buf[:n]
|
||||
dataValid := false
|
||||
if s.block != nil {
|
||||
s.block.Decrypt(data, data)
|
||||
data = data[nonceSize:]
|
||||
checksum := crc32.ChecksumIEEE(data[crcSize:])
|
||||
if checksum == binary.LittleEndian.Uint32(data) {
|
||||
data = data[crcSize:]
|
||||
dataValid = true
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
|
||||
}
|
||||
} else if s.block == nil {
|
||||
dataValid = true
|
||||
}
|
||||
|
||||
if dataValid {
|
||||
s.kcpInput(data)
|
||||
}
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
}
|
||||
} else {
|
||||
s.chReadError <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type (
|
||||
@@ -704,98 +717,91 @@ type (
|
||||
chAccepts chan *UDPSession // Listen() backlog
|
||||
chSessionClosed chan net.Addr // session close queue
|
||||
headerSize int // the additional header to a KCP frame
|
||||
die chan struct{} // notify the listener has closed
|
||||
rd atomic.Value // read deadline for Accept()
|
||||
wd atomic.Value
|
||||
|
||||
die chan struct{} // notify the listener has closed
|
||||
dieOnce sync.Once
|
||||
|
||||
// socket error handling
|
||||
socketReadError atomic.Value
|
||||
chSocketReadError chan struct{}
|
||||
socketReadErrorOnce sync.Once
|
||||
|
||||
rd atomic.Value // read deadline for Accept()
|
||||
}
|
||||
)
|
||||
|
||||
// monitor incoming data for all connections of server
|
||||
func (l *Listener) monitor() {
|
||||
// a cache for session object last used
|
||||
var lastAddr string
|
||||
var lastSession *UDPSession
|
||||
buf := make([]byte, mtuLimit)
|
||||
for {
|
||||
if n, from, err := l.conn.ReadFrom(buf); err == nil {
|
||||
if n >= l.headerSize+IKCP_OVERHEAD {
|
||||
data := buf[:n]
|
||||
dataValid := false
|
||||
if l.block != nil {
|
||||
l.block.Decrypt(data, data)
|
||||
data = data[nonceSize:]
|
||||
checksum := crc32.ChecksumIEEE(data[crcSize:])
|
||||
if checksum == binary.LittleEndian.Uint32(data) {
|
||||
data = data[crcSize:]
|
||||
dataValid = true
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
|
||||
// packet input stage
|
||||
func (l *Listener) packetInput(data []byte, addr net.Addr) {
|
||||
dataValid := false
|
||||
if l.block != nil {
|
||||
l.block.Decrypt(data, data)
|
||||
data = data[nonceSize:]
|
||||
checksum := crc32.ChecksumIEEE(data[crcSize:])
|
||||
if checksum == binary.LittleEndian.Uint32(data) {
|
||||
data = data[crcSize:]
|
||||
dataValid = true
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
|
||||
}
|
||||
} else if l.block == nil {
|
||||
dataValid = true
|
||||
}
|
||||
|
||||
if dataValid {
|
||||
l.sessionLock.Lock()
|
||||
s, ok := l.sessions[addr.String()]
|
||||
l.sessionLock.Unlock()
|
||||
|
||||
if !ok { // new address:port
|
||||
if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue
|
||||
var conv uint32
|
||||
convValid := false
|
||||
if l.fecDecoder != nil {
|
||||
isfec := binary.LittleEndian.Uint16(data[4:])
|
||||
if isfec == typeData {
|
||||
conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
|
||||
convValid = true
|
||||
}
|
||||
} else if l.block == nil {
|
||||
dataValid = true
|
||||
} else {
|
||||
conv = binary.LittleEndian.Uint32(data)
|
||||
convValid = true
|
||||
}
|
||||
|
||||
if dataValid {
|
||||
addr := from.String()
|
||||
var s *UDPSession
|
||||
var ok bool
|
||||
|
||||
// the packets received from an address always come in batch,
|
||||
// cache the session for next packet, without querying map.
|
||||
if addr == lastAddr {
|
||||
s, ok = lastSession, true
|
||||
} else {
|
||||
l.sessionLock.Lock()
|
||||
if s, ok = l.sessions[addr]; ok {
|
||||
lastSession = s
|
||||
lastAddr = addr
|
||||
}
|
||||
l.sessionLock.Unlock()
|
||||
}
|
||||
|
||||
if !ok { // new session
|
||||
if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue
|
||||
var conv uint32
|
||||
convValid := false
|
||||
if l.fecDecoder != nil {
|
||||
isfec := binary.LittleEndian.Uint16(data[4:])
|
||||
if isfec == typeData {
|
||||
conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
|
||||
convValid = true
|
||||
}
|
||||
} else {
|
||||
conv = binary.LittleEndian.Uint32(data)
|
||||
convValid = true
|
||||
}
|
||||
|
||||
if convValid { // creates a new session only if the 'conv' field in kcp is accessible
|
||||
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
|
||||
s.kcpInput(data)
|
||||
l.sessionLock.Lock()
|
||||
l.sessions[addr] = s
|
||||
l.sessionLock.Unlock()
|
||||
l.chAccepts <- s
|
||||
}
|
||||
}
|
||||
} else {
|
||||
s.kcpInput(data)
|
||||
}
|
||||
if convValid { // creates a new session only if the 'conv' field in kcp is accessible
|
||||
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, addr, l.block)
|
||||
s.kcpInput(data)
|
||||
l.sessionLock.Lock()
|
||||
l.sessions[addr.String()] = s
|
||||
l.sessionLock.Unlock()
|
||||
l.chAccepts <- s
|
||||
}
|
||||
} else {
|
||||
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
|
||||
}
|
||||
} else {
|
||||
return
|
||||
s.kcpInput(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Listener) notifyReadError(err error) {
|
||||
l.socketReadErrorOnce.Do(func() {
|
||||
l.socketReadError.Store(err)
|
||||
close(l.chSocketReadError)
|
||||
|
||||
// propagate read error to all sessions
|
||||
l.sessionLock.Lock()
|
||||
for _, s := range l.sessions {
|
||||
s.notifyReadError(err)
|
||||
}
|
||||
l.sessionLock.Unlock()
|
||||
})
|
||||
}
|
||||
|
||||
// SetReadBuffer sets the socket read buffer for the Listener
|
||||
func (l *Listener) SetReadBuffer(bytes int) error {
|
||||
if nc, ok := l.conn.(setReadBuffer); ok {
|
||||
return nc.SetReadBuffer(bytes)
|
||||
}
|
||||
return errors.New(errInvalidOperation)
|
||||
return errInvalidOperation
|
||||
}
|
||||
|
||||
// SetWriteBuffer sets the socket write buffer for the Listener
|
||||
@@ -803,18 +809,25 @@ func (l *Listener) SetWriteBuffer(bytes int) error {
|
||||
if nc, ok := l.conn.(setWriteBuffer); ok {
|
||||
return nc.SetWriteBuffer(bytes)
|
||||
}
|
||||
return errors.New(errInvalidOperation)
|
||||
return errInvalidOperation
|
||||
}
|
||||
|
||||
// SetDSCP sets the 6bit DSCP field of IP header
|
||||
// SetDSCP sets the 6bit DSCP field in IPv4 header, or 8bit Traffic Class in IPv6 header.
|
||||
func (l *Listener) SetDSCP(dscp int) error {
|
||||
if nc, ok := l.conn.(net.Conn); ok {
|
||||
if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err != nil {
|
||||
return ipv6.NewConn(nc).SetTrafficClass(dscp)
|
||||
var succeed bool
|
||||
if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err == nil {
|
||||
succeed = true
|
||||
}
|
||||
if err := ipv6.NewConn(nc).SetTrafficClass(dscp); err == nil {
|
||||
succeed = true
|
||||
}
|
||||
|
||||
if succeed {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return errors.New(errInvalidOperation)
|
||||
return errInvalidOperation
|
||||
}
|
||||
|
||||
// Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
|
||||
@@ -831,11 +844,13 @@ func (l *Listener) AcceptKCP() (*UDPSession, error) {
|
||||
|
||||
select {
|
||||
case <-timeout:
|
||||
return nil, &errTimeout{}
|
||||
return nil, errors.WithStack(errTimeout)
|
||||
case c := <-l.chAccepts:
|
||||
return c, nil
|
||||
case <-l.chSocketReadError:
|
||||
return nil, l.socketReadError.Load().(error)
|
||||
case <-l.die:
|
||||
return nil, errors.New(errBrokenPipe)
|
||||
return nil, errors.WithStack(io.ErrClosedPipe)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -853,15 +868,21 @@ func (l *Listener) SetReadDeadline(t time.Time) error {
|
||||
}
|
||||
|
||||
// SetWriteDeadline implements the Conn SetWriteDeadline method.
|
||||
func (l *Listener) SetWriteDeadline(t time.Time) error {
|
||||
l.wd.Store(t)
|
||||
return nil
|
||||
}
|
||||
func (l *Listener) SetWriteDeadline(t time.Time) error { return errInvalidOperation }
|
||||
|
||||
// Close stops listening on the UDP address. Already Accepted connections are not closed.
|
||||
// Close stops listening on the UDP address, and closes the socket
|
||||
func (l *Listener) Close() error {
|
||||
close(l.die)
|
||||
return l.conn.Close()
|
||||
var once bool
|
||||
l.dieOnce.Do(func() {
|
||||
close(l.die)
|
||||
once = true
|
||||
})
|
||||
|
||||
if once {
|
||||
return l.conn.Close()
|
||||
} else {
|
||||
return errors.WithStack(io.ErrClosedPipe)
|
||||
}
|
||||
}
|
||||
|
||||
// closeSession notify the listener that a session has closed
|
||||
@@ -881,16 +902,21 @@ func (l *Listener) Addr() net.Addr { return l.conn.LocalAddr() }
|
||||
// Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
|
||||
func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }
|
||||
|
||||
// ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption,
|
||||
// dataShards, parityShards defines Reed-Solomon Erasure Coding parameters
|
||||
// ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption.
|
||||
//
|
||||
// 'block' is the block encryption algorithm to encrypt packets.
|
||||
//
|
||||
// 'dataShards', 'parityShards' specifiy how many parity packets will be generated following the data packets.
|
||||
//
|
||||
// Check https://github.com/klauspost/reedsolomon for details
|
||||
func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
|
||||
udpaddr, err := net.ResolveUDPAddr("udp", laddr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "net.ResolveUDPAddr")
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
conn, err := net.ListenUDP("udp", udpaddr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "net.ListenUDP")
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return ServeConn(block, dataShards, parityShards, conn)
|
||||
@@ -908,6 +934,7 @@ func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketCo
|
||||
l.parityShards = parityShards
|
||||
l.block = block
|
||||
l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
|
||||
l.chSocketReadError = make(chan struct{})
|
||||
|
||||
// calculate header size
|
||||
if l.block != nil {
|
||||
@@ -921,15 +948,21 @@ func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketCo
|
||||
return l, nil
|
||||
}
|
||||
|
||||
// Dial connects to the remote address "raddr" on the network "udp"
|
||||
// Dial connects to the remote address "raddr" on the network "udp" without encryption and FEC
|
||||
func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
|
||||
|
||||
// DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
|
||||
//
|
||||
// 'block' is the block encryption algorithm to encrypt packets.
|
||||
//
|
||||
// 'dataShards', 'parityShards' specifiy how many parity packets will be generated following the data packets.
|
||||
//
|
||||
// Check https://github.com/klauspost/reedsolomon for details
|
||||
func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
|
||||
// network type detection
|
||||
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "net.ResolveUDPAddr")
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
network := "udp4"
|
||||
if udpaddr.IP.To4() == nil {
|
||||
@@ -938,30 +971,33 @@ func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards in
|
||||
|
||||
conn, err := net.ListenUDP(network, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "net.DialUDP")
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return NewConn(raddr, block, dataShards, parityShards, conn)
|
||||
}
|
||||
|
||||
// NewConn3 establishes a session and talks KCP protocol over a packet connection.
|
||||
func NewConn3(convid uint32, raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
|
||||
return newUDPSession(convid, dataShards, parityShards, nil, conn, raddr, block), nil
|
||||
}
|
||||
|
||||
// NewConn2 establishes a session and talks KCP protocol over a packet connection.
|
||||
func NewConn2(raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
|
||||
var convid uint32
|
||||
binary.Read(rand.Reader, binary.LittleEndian, &convid)
|
||||
return NewConn3(convid, raddr, block, dataShards, parityShards, conn)
|
||||
}
|
||||
|
||||
// NewConn establishes a session and talks KCP protocol over a packet connection.
|
||||
func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
|
||||
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "net.ResolveUDPAddr")
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
var convid uint32
|
||||
binary.Read(rand.Reader, binary.LittleEndian, &convid)
|
||||
return newUDPSession(convid, dataShards, parityShards, nil, conn, udpaddr, block), nil
|
||||
return NewConn2(udpaddr, block, dataShards, parityShards, conn)
|
||||
}
|
||||
|
||||
// monotonic reference time point
|
||||
var refTime time.Time = time.Now()
|
||||
|
||||
// currentMs returns current elasped monotonic milliseconds since program startup
|
||||
func currentMs() uint32 { return uint32(time.Now().Sub(refTime) / time.Millisecond) }
|
||||
|
||||
func NewConnEx(convid uint32, connected bool, raddr string, block BlockCrypt, dataShards, parityShards int, conn *net.UDPConn) (*UDPSession, error) {
|
||||
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
|
||||
if err != nil {
|
||||
|
||||
25
vendor/github.com/fatedier/kcp-go/tx.go
generated
vendored
Normal file
25
vendor/github.com/fatedier/kcp-go/tx.go
generated
vendored
Normal file
@@ -0,0 +1,25 @@
|
||||
package kcp
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/ipv4"
|
||||
)
|
||||
|
||||
func (s *UDPSession) defaultTx(txqueue []ipv4.Message) {
|
||||
nbytes := 0
|
||||
npkts := 0
|
||||
for k := range txqueue {
|
||||
if n, err := s.conn.WriteTo(txqueue[k].Buffers[0], txqueue[k].Addr); err == nil {
|
||||
nbytes += n
|
||||
npkts++
|
||||
xmitBuf.Put(txqueue[k].Buffers[0])
|
||||
} else {
|
||||
s.notifyWriteError(errors.WithStack(err))
|
||||
break
|
||||
}
|
||||
}
|
||||
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
|
||||
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
|
||||
}
|
||||
11
vendor/github.com/fatedier/kcp-go/tx_generic.go
generated
vendored
Normal file
11
vendor/github.com/fatedier/kcp-go/tx_generic.go
generated
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
// +build !linux
|
||||
|
||||
package kcp
|
||||
|
||||
import (
|
||||
"golang.org/x/net/ipv4"
|
||||
)
|
||||
|
||||
func (s *UDPSession) tx(txqueue []ipv4.Message) {
|
||||
s.defaultTx(txqueue)
|
||||
}
|
||||
52
vendor/github.com/fatedier/kcp-go/tx_linux.go
generated
vendored
Normal file
52
vendor/github.com/fatedier/kcp-go/tx_linux.go
generated
vendored
Normal file
@@ -0,0 +1,52 @@
|
||||
// +build linux
|
||||
|
||||
package kcp
|
||||
|
||||
import (
|
||||
"net"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/ipv4"
|
||||
)
|
||||
|
||||
func (s *UDPSession) tx(txqueue []ipv4.Message) {
|
||||
// default version
|
||||
if s.xconn == nil || s.xconnWriteError != nil {
|
||||
s.defaultTx(txqueue)
|
||||
return
|
||||
}
|
||||
|
||||
// x/net version
|
||||
nbytes := 0
|
||||
npkts := 0
|
||||
for len(txqueue) > 0 {
|
||||
if n, err := s.xconn.WriteBatch(txqueue, 0); err == nil {
|
||||
for k := range txqueue[:n] {
|
||||
nbytes += len(txqueue[k].Buffers[0])
|
||||
xmitBuf.Put(txqueue[k].Buffers[0])
|
||||
}
|
||||
npkts += n
|
||||
txqueue = txqueue[n:]
|
||||
} else {
|
||||
// compatibility issue:
|
||||
// for linux kernel<=2.6.32, support for sendmmsg is not available
|
||||
// an error of type os.SyscallError will be returned
|
||||
if operr, ok := err.(*net.OpError); ok {
|
||||
if se, ok := operr.Err.(*os.SyscallError); ok {
|
||||
if se.Syscall == "sendmmsg" {
|
||||
s.xconnWriteError = se
|
||||
s.defaultTx(txqueue)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
s.notifyWriteError(errors.WithStack(err))
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
|
||||
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
|
||||
}
|
||||
8
vendor/github.com/fatedier/kcp-go/updater.go
generated
vendored
8
vendor/github.com/fatedier/kcp-go/updater.go
generated
vendored
@@ -76,10 +76,10 @@ func (h *updateHeap) wakeup() {
|
||||
}
|
||||
|
||||
func (h *updateHeap) updateTask() {
|
||||
var timer <-chan time.Time
|
||||
timer := time.NewTimer(0)
|
||||
for {
|
||||
select {
|
||||
case <-timer:
|
||||
case <-timer.C:
|
||||
case <-h.chWakeUp:
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ func (h *updateHeap) updateTask() {
|
||||
hlen := h.Len()
|
||||
for i := 0; i < hlen; i++ {
|
||||
entry := &h.entries[0]
|
||||
if time.Now().After(entry.ts) {
|
||||
if !time.Now().Before(entry.ts) {
|
||||
interval := entry.s.update()
|
||||
entry.ts = time.Now().Add(interval)
|
||||
heap.Fix(h, 0)
|
||||
@@ -97,7 +97,7 @@ func (h *updateHeap) updateTask() {
|
||||
}
|
||||
|
||||
if hlen > 0 {
|
||||
timer = time.After(h.entries[0].ts.Sub(time.Now()))
|
||||
timer.Reset(h.entries[0].ts.Sub(time.Now()))
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user