Refactor singleplex handling

ptspec
Andy Wang 2 years ago
parent f30141b388
commit 0a6846fbfc
No known key found for this signature in database
GPG Key ID: 181B49F9F38F3374

@ -123,7 +123,7 @@ upstream proxy server. Zero or negative value disables it. Default is 0 (disable
`UID` is your UID in base64.
`Transport` can be either `direct` or `CDN`. If the server host wishes you to connect to it directly, use `direct`. If
instead a CDN is used, use `CDN`.
instead a CDN is used, use `CDN`. Defaults to `direct`
`PublicKey` is the static curve25519 public key in base64, given by the server admin.
@ -131,7 +131,7 @@ instead a CDN is used, use `CDN`.
server's `ProxyBook` exactly.
`EncryptionMethod` is the name of the encryption algorithm you want Cloak to use. Options are `plain`, `aes-256-gcm` (
synonymous to `aes-gcm`), `aes-128-gcm`, and `chacha20-poly1305`. Note: Cloak isn't intended to provide transport
synonymous to `aes-gcm`), `aes-128-gcm`, and `chacha20-poly1305`. Defaults to `aes-256-gcm` if empty. Note: Cloak isn't intended to provide transport
security. The point of encryption is to hide fingerprints of proxy protocols and render the payload statistically
random-like. **You may only leave it as `plain` if you are certain that your underlying proxy tool already provides BOTH
encryption and authentication (via AEAD or similar techniques).**
@ -141,7 +141,7 @@ match `RedirAddr` in the server's configuration, a major site the censor allows,
`AlternativeNames` is an array used alongside `ServerName` to shuffle between different ServerNames for every new
connection. **This may conflict with `CDN` Transport mode** if the CDN provider prohibits domain fronting and rejects
the alternative domains.
the alternative domains. Default is empty.
Example:
@ -165,8 +165,8 @@ requests under specific url path are forwarded.
`NumConn` is the amount of underlying TCP connections you want to use. The default of 4 should be appropriate for most
people. Setting it too high will hinder the performance. Setting it to 0 will disable connection multiplexing and each
TCP connection will spawn a separate short-lived session that will be closed after it is terminated. This makes it
behave like GoQuiet. This maybe useful for people with unstable connections.
TCP connection will spawn a separate short-lived session that will be closed after it is terminated. This maybe useful
for people with unstable connections.
`BrowserSig` is the browser you want to **appear** to be using. It's not relevant to the browser you are actually using.
Currently, `chrome`, `firefox` and `safari` are supported.

@ -20,6 +20,10 @@ import (
var version string
func main() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
// Should be 127.0.0.1 to listen to a proxy client on this machine
var localHost string
// port used by proxy clients to communicate with cloak client
@ -74,9 +78,6 @@ func main() {
log.Info("Starting standalone mode")
}
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
lvl, err := log.ParseLevel(*verbosity)
if err != nil {
log.Fatal(err)
@ -194,12 +195,12 @@ func main() {
return net.ListenUDP("udp", udpAddr)
}
cli_client.RouteUDP(acceptor, localConfig.Timeout, remoteConfig.Singleplex, seshMaker)
cli_client.RouteUDP(acceptor, localConfig.Timeout, localConfig.Singleplex, seshMaker)
} else {
listener, err := net.Listen("tcp", localConfig.LocalAddr)
if err != nil {
log.Fatal(err)
}
cli_client.RouteTCP(listener, localConfig.Timeout, remoteConfig.Singleplex, seshMaker)
cli_client.RouteTCP(listener, localConfig.Timeout, localConfig.Singleplex, seshMaker)
}
}

@ -20,10 +20,11 @@ type CLIConfig struct {
// LocalPort is the port to listen for incomig proxy client connections
LocalPort string // jsonOptional
// AlternativeNames is a list of ServerName Cloak may randomly pick from for different sessions
// Optional
AlternativeNames []string
// StreamTimeout is the duration, in seconds, for an incoming connection to be automatically closed after the last
// piece of incoming data .
// Defaults to 300
// Optional, Defaults to 300
StreamTimeout int
}
@ -108,6 +109,7 @@ type LocalConnConfig struct {
LocalAddr string
Timeout time.Duration
MockDomainList []string
Singleplex bool
}
func (raw *CLIConfig) ProcessCLIConfig(worldState common.WorldState) (local LocalConnConfig, remote client.RemoteConnConfig, auth client.AuthInfo, err error) {
@ -116,15 +118,18 @@ func (raw *CLIConfig) ProcessCLIConfig(worldState common.WorldState) (local Loca
return
}
var filteredAlternativeNames []string
for _, alternativeName := range raw.AlternativeNames {
if len(alternativeName) > 0 {
filteredAlternativeNames = append(filteredAlternativeNames, alternativeName)
if raw.AlternativeNames != nil && len(raw.AlternativeNames) > 0 {
var filteredAlternativeNames []string
for _, alternativeName := range raw.AlternativeNames {
if len(alternativeName) > 0 {
filteredAlternativeNames = append(filteredAlternativeNames, alternativeName)
}
}
local.MockDomainList = raw.AlternativeNames
} else {
local.MockDomainList = []string{}
}
raw.AlternativeNames = filteredAlternativeNames
local.MockDomainList = raw.AlternativeNames
local.MockDomainList = append(local.MockDomainList, auth.MockDomain)
if raw.LocalHost == "" {
@ -143,5 +148,7 @@ func (raw *CLIConfig) ProcessCLIConfig(worldState common.WorldState) (local Loca
local.Timeout = time.Duration(raw.StreamTimeout) * time.Second
}
local.Singleplex = raw.NumConn != nil && *raw.NumConn == 0
return
}

@ -1,10 +1,11 @@
package cli_client
import (
"github.com/cbeuw/Cloak/internal/common"
"github.com/cbeuw/Cloak/libcloak/client"
"github.com/stretchr/testify/assert"
"io/ioutil"
"testing"
"github.com/stretchr/testify/assert"
)
func TestParseConfig(t *testing.T) {
@ -33,5 +34,42 @@ func TestParseConfig(t *testing.T) {
_, err := ParseConfig(tmpConfig.Name())
assert.Error(t, err)
})
}
func TestProcessCLIConfig(t *testing.T) {
config := CLIConfig{
Config: client.Config{
ServerName: "bbc.co.uk",
// ProxyMethod is the name of the underlying proxy you wish
// to connect to, as determined by your server. The value can
// be any string whose UTF-8 ENCODED byte length is no greater than
// 12 bytes
ProxyMethod: "ssh",
// UID is a 16-byte secret string unique to an authorised user
// The same UID can be used by the same user for multiple Cloak connections
UID: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
// PublicKey is the 32-byte public Curve25519 ECDH key of your server
PublicKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
// RemoteHost is the Cloak server's hostname or IP address
RemoteHost: "1.2.3.4",
},
LocalHost: "0.0.0.0",
LocalPort: "1234",
}
t.Run("Zero means singleplex", func(t *testing.T) {
zero := 0
config := config
config.NumConn = &zero
local, _, _, err := config.ProcessCLIConfig(common.RealWorldState)
assert.NoError(t, err)
assert.True(t, local.Singleplex)
})
t.Run("Empty means no singleplex", func(t *testing.T) {
config := config
local, _, _, err := config.ProcessCLIConfig(common.RealWorldState)
assert.NoError(t, err)
assert.False(t, local.Singleplex)
})
}

@ -14,15 +14,13 @@ import (
)
const (
acceptBacklog = 1024
defaultInactivityTimeout = 30 * time.Second
defaultMaxOnWireSize = 1<<14 + 256 // https://tools.ietf.org/html/rfc8446#section-5.2
acceptBacklog = 1024
defaultMaxOnWireSize = 1<<14 + 256 // https://tools.ietf.org/html/rfc8446#section-5.2
)
var ErrBrokenSession = errors.New("broken session")
var errRepeatSessionClosing = errors.New("trying to close a closed session")
var errRepeatStreamClosing = errors.New("trying to close a closed stream")
var errNoMultiplex = errors.New("a singleplexing session can have only one stream")
type SessionConfig struct {
Obfuscator
@ -30,15 +28,15 @@ type SessionConfig struct {
// Valve is used to limit transmission rates, and record and limit usage
Valve
// Unordered determines whether stream packets' order is preserved
Unordered bool
// A Singleplexing session always has just one stream
Singleplex bool
// maximum size of an obfuscated frame, including headers and overhead
// MsgOnWireSizeLimit is maximum size of an obfuscated frame, including headers and overhead
// Optional
MsgOnWireSizeLimit int
// InactivityTimeout sets the duration a Session waits while it has no active streams before it closes itself
// Non-optional. 0 means the session closes immediately after the last stream is closed
InactivityTimeout time.Duration
}
@ -104,9 +102,6 @@ func MakeSession(id uint32, config SessionConfig) *Session {
if config.MsgOnWireSizeLimit <= 0 {
sesh.MsgOnWireSizeLimit = defaultMaxOnWireSize
}
if config.InactivityTimeout == 0 {
sesh.InactivityTimeout = defaultInactivityTimeout
}
sesh.maxStreamUnitWrite = sesh.MsgOnWireSizeLimit - frameHeaderLength - sesh.maxOverhead
sesh.streamSendBufferSize = sesh.MsgOnWireSizeLimit
@ -118,7 +113,13 @@ func MakeSession(id uint32, config SessionConfig) *Session {
}}
sesh.sb = makeSwitchboard(sesh)
time.AfterFunc(sesh.InactivityTimeout, sesh.checkTimeout)
if sesh.InactivityTimeout > 0 {
time.AfterFunc(sesh.InactivityTimeout, sesh.checkTimeout)
} else {
// The user wants session to close immediately after the last stream closes,
// but we still give them some time to start a stream first
time.AfterFunc(10*time.Second, sesh.checkTimeout)
}
return sesh
}
@ -149,12 +150,6 @@ func (sesh *Session) OpenStream() (*Stream, error) {
return nil, ErrBrokenSession
}
id := atomic.AddUint32(&sesh.nextStreamID, 1) - 1
// Because atomic.AddUint32 returns the value after incrementation
if sesh.Singleplex && id > 1 {
// if there are more than one streams, which shouldn't happen if we are
// singleplexing
return nil, errNoMultiplex
}
stream := makeStream(sesh, id)
sesh.streamsM.Lock()
sesh.streams[id] = stream
@ -213,12 +208,8 @@ func (sesh *Session) closeStream(s *Stream, active bool) error {
sesh.streams[s.id] = nil
sesh.streamsM.Unlock()
if sesh.streamCountDecr() == 0 {
if sesh.Singleplex {
return sesh.Close()
} else {
log.Debugf("session %v has no active stream left", sesh.id)
time.AfterFunc(sesh.InactivityTimeout, sesh.checkTimeout)
}
log.Debugf("session %v has no active stream left", sesh.id)
time.AfterFunc(sesh.InactivityTimeout, sesh.checkTimeout)
}
return nil
}

@ -17,8 +17,8 @@ import (
)
var seshConfigs = map[string]SessionConfig{
"ordered": {},
"unordered": {Unordered: true},
"ordered": {InactivityTimeout: 30 * time.Second},
"unordered": {Unordered: true, InactivityTimeout: 30 * time.Second},
}
var encryptionMethods = map[string]byte{
"plain": EncryptionMethodPlain,

@ -80,19 +80,21 @@ func TestStream_WriteSync(t *testing.T) {
// Close calls made after write MUST have a higher seq
var sessionKey [32]byte
rand.Read(sessionKey[:])
clientSesh := setupSesh(false, sessionKey, EncryptionMethodPlain)
serverSesh := setupSesh(false, sessionKey, EncryptionMethodPlain)
w, r := connutil.AsyncPipe()
clientSesh.AddConnection(common.NewTLSConn(w))
serverSesh.AddConnection(common.NewTLSConn(r))
testData := make([]byte, payloadLen)
rand.Read(testData)
t.Run("test single", func(t *testing.T) {
clientSesh := setupSesh(false, sessionKey, EncryptionMethodPlain)
serverSesh := setupSesh(false, sessionKey, EncryptionMethodPlain)
w, r := connutil.AsyncPipe()
clientSesh.AddConnection(common.NewTLSConn(w))
serverSesh.AddConnection(common.NewTLSConn(r))
go func() {
stream, _ := clientSesh.OpenStream()
stream.Write(testData)
stream.Close()
stream, err := clientSesh.OpenStream()
assert.NoError(t, err)
_, err = stream.Write(testData)
assert.NoError(t, err)
}()
recvBuf := make([]byte, payloadLen)
@ -104,12 +106,19 @@ func TestStream_WriteSync(t *testing.T) {
})
t.Run("test multiple", func(t *testing.T) {
clientSesh := setupSesh(false, sessionKey, EncryptionMethodPlain)
serverSesh := setupSesh(false, sessionKey, EncryptionMethodPlain)
w, r := connutil.AsyncPipe()
clientSesh.AddConnection(common.NewTLSConn(w))
serverSesh.AddConnection(common.NewTLSConn(r))
const numStreams = 100
for i := 0; i < numStreams; i++ {
go func() {
stream, _ := clientSesh.OpenStream()
stream.Write(testData)
stream.Close()
stream, err := clientSesh.OpenStream()
assert.NoError(t, err)
_, err = stream.Write(testData)
assert.NoError(t, err)
}()
}
for i := 0; i < numStreams; i++ {

@ -157,7 +157,7 @@ func (m *mockUDPDialer) Dial(network, address string) (net.Conn, error) {
return net.DialUDP("udp", nil, m.raddr)
}
func establishSession(rcc client.RemoteConnConfig, ai client.AuthInfo, serverState *server.State) (common.Dialer, *connutil.PipeListener, common.Dialer, net.Listener, error) {
func establishSession(rcc client.RemoteConnConfig, ai client.AuthInfo, serverState *server.State, singleplex bool) (common.Dialer, *connutil.PipeListener, common.Dialer, net.Listener, error) {
// redirecting web server
// ^
// |
@ -174,6 +174,9 @@ func establishSession(rcc client.RemoteConnConfig, ai client.AuthInfo, serverSta
// |
// whatever connection initiator (including a proper ck-client)
if singleplex && rcc.NumConn != 1 {
log.Fatal("NumConn must be 1 under singleplex")
}
netToCkServerD, ckServerListener := connutil.DialerListener(10 * 1024)
clientSeshMaker := func() *client.CloakClient {
@ -198,12 +201,12 @@ func establishSession(rcc client.RemoteConnConfig, ai client.AuthInfo, serverSta
addrCh <- conn.LocalAddr().(*net.UDPAddr)
return conn, err
}
go cli_client.RouteUDP(acceptor, 300*time.Second, rcc.Singleplex, clientSeshMaker)
go cli_client.RouteUDP(acceptor, 300*time.Second, singleplex, clientSeshMaker)
proxyToCkClientD = mDialer
} else {
var proxyToCkClientL *connutil.PipeListener
proxyToCkClientD, proxyToCkClientL = connutil.DialerListener(10 * 1024)
go cli_client.RouteTCP(proxyToCkClientL, 300*time.Second, rcc.Singleplex, clientSeshMaker)
go cli_client.RouteTCP(proxyToCkClientL, 300*time.Second, singleplex, clientSeshMaker)
}
// set up server
@ -259,7 +262,7 @@ func TestUDP(t *testing.T) {
sta := basicServerState(worldState)
t.Run("simple send", func(t *testing.T) {
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta)
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false)
if err != nil {
t.Fatal(err)
}
@ -296,7 +299,7 @@ func TestUDP(t *testing.T) {
const echoMsgLen = 1024
t.Run("user echo", func(t *testing.T) {
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta)
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false)
if err != nil {
t.Fatal(err)
}
@ -318,7 +321,7 @@ func TestTCPSingleplex(t *testing.T) {
worldState := common.WorldOfTime(time.Unix(10, 0))
rcc, ai := generateClientConfigs(singleplexTCPConfig, worldState)
sta := basicServerState(worldState)
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta)
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, true)
if err != nil {
t.Fatal(err)
}
@ -386,7 +389,7 @@ func TestTCPMultiplex(t *testing.T) {
writeData := make([]byte, dataLen)
rand.Read(writeData)
t.Run(fmt.Sprintf("data length %v", dataLen), func(t *testing.T) {
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta)
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false)
if err != nil {
t.Fatal(err)
}
@ -419,7 +422,7 @@ func TestTCPMultiplex(t *testing.T) {
const echoMsgLen = 16384
t.Run("user echo", func(t *testing.T) {
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta)
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false)
if err != nil {
t.Fatal(err)
}
@ -437,7 +440,7 @@ func TestTCPMultiplex(t *testing.T) {
})
t.Run("redir echo", func(t *testing.T) {
_, _, netToCkServerD, redirFromCkServerL, err := establishSession(rcc, ai, sta)
_, _, netToCkServerD, redirFromCkServerL, err := establishSession(rcc, ai, sta, false)
if err != nil {
t.Fatal(err)
}
@ -458,13 +461,13 @@ func TestClosingStreamsFromProxy(t *testing.T) {
log.SetLevel(log.ErrorLevel)
worldState := common.WorldOfTime(time.Unix(10, 0))
for clientConfigName, clientConfig := range map[string]client.Config{"basic": basicTCPConfig, "singleplex": singleplexTCPConfig} {
clientConfig := clientConfig
for clientConfigName, clientConfig := range map[string]client.Config{"multiplex": basicTCPConfig, "singleplex": singleplexTCPConfig} {
clientConfigName := clientConfigName
clientConfig := clientConfig
t.Run(clientConfigName, func(t *testing.T) {
rcc, ai := generateClientConfigs(clientConfig, worldState)
sta := basicServerState(worldState)
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta)
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, *clientConfig.NumConn == 0)
if err != nil {
t.Fatal(err)
}
@ -538,7 +541,7 @@ func BenchmarkIntegration(b *testing.B) {
for name, method := range encryptionMethods {
b.Run(name, func(b *testing.B) {
ai.EncryptionMethod = method
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta)
proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false)
if err != nil {
b.Fatal(err)
}

@ -72,14 +72,18 @@ type Config struct {
// RemotePort is the port Cloak server is listening to
// Defaults to 443
RemotePort string
// InactivityTimeout is the number of seconds the client keeps the underlying connections to the server
// after the last proxy connection is disconnected.
// Defaults to 30. Always set to 0 under Singleplex mode (NumConn == 0)
InactivityTimeout *int
}
type RemoteConnConfig struct {
Singleplex bool
NumConn int
KeepAlive time.Duration
RemoteAddr string
TransportMaker func() transports.Transport
NumConn int
KeepAlive time.Duration
RemoteAddr string
TransportMaker func() transports.Transport
InactivityTimeout time.Duration
}
type AuthInfo = transports.AuthInfo
@ -142,15 +146,20 @@ func (raw *Config) Process(worldState common.WorldState) (remote RemoteConnConfi
remotePort = raw.RemotePort
}
remote.RemoteAddr = net.JoinHostPort(raw.RemoteHost, remotePort)
if raw.InactivityTimeout == nil {
remote.InactivityTimeout = 30 * time.Second
} else {
remote.InactivityTimeout = time.Duration(*raw.InactivityTimeout) * time.Second
}
if raw.NumConn == nil {
remote.NumConn = 4
remote.Singleplex = false
} else if *raw.NumConn <= 0 {
remote.NumConn = 1
remote.Singleplex = true
remote.InactivityTimeout = 0
} else {
remote.NumConn = *raw.NumConn
remote.Singleplex = false
}
// Transport and (if TLS mode), browser

@ -65,11 +65,11 @@ func NewCloakClient(connConfig RemoteConnConfig, authInfo AuthInfo, dialer commo
}
seshConfig := mux.SessionConfig{
Singleplex: connConfig.Singleplex,
Obfuscator: obfuscator,
Valve: nil,
Unordered: authInfo.Unordered,
MsgOnWireSizeLimit: appDataMaxLength,
InactivityTimeout: connConfig.InactivityTimeout,
}
session := mux.MakeSession(authInfo.SessionId, seshConfig)

Loading…
Cancel
Save