Stop using fixedConnMapping

master
Andy Wang 4 weeks ago
parent de4dab6bf3
commit 5988b4337d
No known key found for this signature in database
GPG Key ID: 181B49F9F38F3374

@ -265,6 +265,7 @@ func (sesh *Session) recvDataFromRemote(data []byte) error {
}
func (sesh *Session) SetTerminalMsg(msg string) {
log.Debug("terminal message set to " + msg)
sesh.terminalMsgSetter.Do(func() {
sesh.terminalMsg = msg
})

@ -2,13 +2,12 @@ package multiplex
import (
"errors"
"math/rand"
"github.com/cbeuw/Cloak/internal/common"
log "github.com/sirupsen/logrus"
"math/rand/v2"
"net"
"sync"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
)
type switchboardStrategy int
@ -39,19 +38,14 @@ type switchboard struct {
}
func makeSwitchboard(sesh *Session) *switchboard {
var strategy switchboardStrategy
if sesh.Unordered {
log.Debug("Connection is unordered")
strategy = uniformSpread
} else {
strategy = fixedConnMapping
}
sb := &switchboard{
session: sesh,
strategy: strategy,
strategy: uniformSpread,
valve: sesh.Valve,
randPool: sync.Pool{New: func() interface{} {
return rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
var state [32]byte
common.CryptoRandRead(state[:])
return rand.New(rand.NewChaCha8(state))
}},
}
return sb
@ -60,8 +54,8 @@ func makeSwitchboard(sesh *Session) *switchboard {
var errBrokenSwitchboard = errors.New("the switchboard is broken")
func (sb *switchboard) addConn(conn net.Conn) {
atomic.AddUint32(&sb.connsCount, 1)
sb.conns.Store(conn, conn)
connId := atomic.AddUint32(&sb.connsCount, 1) - 1
sb.conns.Store(connId, conn)
go sb.deplex(conn)
}
@ -86,6 +80,9 @@ func (sb *switchboard) send(data []byte, assignedConn *net.Conn) (n int, err err
return n, err
}
case fixedConnMapping:
// FIXME: this strategy has a tendency to cause a TLS conn socket buffer to fill up,
// which is a problem when multiple streams are mapped to the same conn, resulting
// in all such streams being blocked.
conn = *assignedConn
if conn == nil {
conn, err = sb.pickRandConn()
@ -110,7 +107,7 @@ func (sb *switchboard) send(data []byte, assignedConn *net.Conn) (n int, err err
return n, nil
}
// returns a random connId
// returns a random conn. This function can be called concurrently.
func (sb *switchboard) pickRandConn() (net.Conn, error) {
if atomic.LoadUint32(&sb.broken) == 1 {
return nil, errBrokenSwitchboard
@ -122,22 +119,15 @@ func (sb *switchboard) pickRandConn() (net.Conn, error) {
}
randReader := sb.randPool.Get().(*rand.Rand)
r := randReader.Intn(int(connsCount))
connId := randReader.Uint32N(connsCount)
sb.randPool.Put(randReader)
var c int
var ret net.Conn
sb.conns.Range(func(_, conn interface{}) bool {
if r == c {
ret = conn.(net.Conn)
return false
}
c++
return true
})
return ret, nil
ret, ok := sb.conns.Load(connId)
if !ok {
log.Errorf("failed to get conn %d", connId)
return nil, errBrokenSwitchboard
}
return ret.(net.Conn), nil
}
// actively triggered by session.Close()
@ -145,10 +135,10 @@ func (sb *switchboard) closeAll() {
if !atomic.CompareAndSwapUint32(&sb.broken, 0, 1) {
return
}
atomic.StoreUint32(&sb.connsCount, 0)
sb.conns.Range(func(_, conn interface{}) bool {
conn.(net.Conn).Close()
sb.conns.Delete(conn)
atomic.AddUint32(&sb.connsCount, ^uint32(0))
return true
})
}

Loading…
Cancel
Save