From 5988b4337d3bd9b1ff6f5674248588d215a8f68d Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Sun, 14 Apr 2024 16:25:54 +0100 Subject: [PATCH] Stop using fixedConnMapping --- internal/multiplex/session.go | 1 + internal/multiplex/switchboard.go | 52 +++++++++++++------------------ 2 files changed, 22 insertions(+), 31 deletions(-) diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index e176343..870346b 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -265,6 +265,7 @@ func (sesh *Session) recvDataFromRemote(data []byte) error { } func (sesh *Session) SetTerminalMsg(msg string) { + log.Debug("terminal message set to " + msg) sesh.terminalMsgSetter.Do(func() { sesh.terminalMsg = msg }) diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 52ee77b..86be66e 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -2,13 +2,12 @@ package multiplex import ( "errors" - "math/rand" + "github.com/cbeuw/Cloak/internal/common" + log "github.com/sirupsen/logrus" + "math/rand/v2" "net" "sync" "sync/atomic" - "time" - - log "github.com/sirupsen/logrus" ) type switchboardStrategy int @@ -39,19 +38,14 @@ type switchboard struct { } func makeSwitchboard(sesh *Session) *switchboard { - var strategy switchboardStrategy - if sesh.Unordered { - log.Debug("Connection is unordered") - strategy = uniformSpread - } else { - strategy = fixedConnMapping - } sb := &switchboard{ session: sesh, - strategy: strategy, + strategy: uniformSpread, valve: sesh.Valve, randPool: sync.Pool{New: func() interface{} { - return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) + var state [32]byte + common.CryptoRandRead(state[:]) + return rand.New(rand.NewChaCha8(state)) }}, } return sb @@ -60,8 +54,8 @@ func makeSwitchboard(sesh *Session) *switchboard { var errBrokenSwitchboard = errors.New("the switchboard is broken") func (sb *switchboard) addConn(conn net.Conn) { - atomic.AddUint32(&sb.connsCount, 1) - sb.conns.Store(conn, conn) + connId := atomic.AddUint32(&sb.connsCount, 1) - 1 + sb.conns.Store(connId, conn) go sb.deplex(conn) } @@ -86,6 +80,9 @@ func (sb *switchboard) send(data []byte, assignedConn *net.Conn) (n int, err err return n, err } case fixedConnMapping: + // FIXME: this strategy has a tendency to cause a TLS conn socket buffer to fill up, + // which is a problem when multiple streams are mapped to the same conn, resulting + // in all such streams being blocked. conn = *assignedConn if conn == nil { conn, err = sb.pickRandConn() @@ -110,7 +107,7 @@ func (sb *switchboard) send(data []byte, assignedConn *net.Conn) (n int, err err return n, nil } -// returns a random connId +// returns a random conn. This function can be called concurrently. func (sb *switchboard) pickRandConn() (net.Conn, error) { if atomic.LoadUint32(&sb.broken) == 1 { return nil, errBrokenSwitchboard @@ -122,22 +119,15 @@ func (sb *switchboard) pickRandConn() (net.Conn, error) { } randReader := sb.randPool.Get().(*rand.Rand) - - r := randReader.Intn(int(connsCount)) + connId := randReader.Uint32N(connsCount) sb.randPool.Put(randReader) - var c int - var ret net.Conn - sb.conns.Range(func(_, conn interface{}) bool { - if r == c { - ret = conn.(net.Conn) - return false - } - c++ - return true - }) - - return ret, nil + ret, ok := sb.conns.Load(connId) + if !ok { + log.Errorf("failed to get conn %d", connId) + return nil, errBrokenSwitchboard + } + return ret.(net.Conn), nil } // actively triggered by session.Close() @@ -145,10 +135,10 @@ func (sb *switchboard) closeAll() { if !atomic.CompareAndSwapUint32(&sb.broken, 0, 1) { return } + atomic.StoreUint32(&sb.connsCount, 0) sb.conns.Range(func(_, conn interface{}) bool { conn.(net.Conn).Close() sb.conns.Delete(conn) - atomic.AddUint32(&sb.connsCount, ^uint32(0)) return true }) }