Give more descriptive names to multiplex objects

pull/132/head
Andy Wang 4 years ago
parent f00ef43613
commit 3e133090f0

@ -9,10 +9,10 @@ import (
"time"
)
// datagramBuffer is the same as bufferedPipe with the exception that it's message-oriented,
// datagramBufferedPipe is the same as streamBufferedPipe with the exception that it's message-oriented,
// instead of byte-oriented. The integrity of datagrams written into this buffer is preserved.
// it won't get chopped up into individual bytes
type datagramBuffer struct {
type datagramBufferedPipe struct {
pLens []int
buf *bytes.Buffer
closed bool
@ -21,14 +21,14 @@ type datagramBuffer struct {
rDeadline time.Time
}
func NewDatagramBuffer() *datagramBuffer {
d := &datagramBuffer{
func NewDatagramBufferedPipe() *datagramBufferedPipe {
d := &datagramBufferedPipe{
rwCond: sync.NewCond(&sync.Mutex{}),
}
return d
}
func (d *datagramBuffer) Read(target []byte) (int, error) {
func (d *datagramBufferedPipe) Read(target []byte) (int, error) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
if d.buf == nil {
@ -63,7 +63,7 @@ func (d *datagramBuffer) Read(target []byte) (int, error) {
return dataLen, nil
}
func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) {
func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
if d.buf == nil {
@ -104,7 +104,7 @@ func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) {
}
}
func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
func (d *datagramBufferedPipe) Write(f Frame) (toBeClosed bool, err error) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
if d.buf == nil {
@ -135,7 +135,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
return false, nil
}
func (d *datagramBuffer) Close() error {
func (d *datagramBufferedPipe) Close() error {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
@ -144,7 +144,7 @@ func (d *datagramBuffer) Close() error {
return nil
}
func (d *datagramBuffer) SetReadDeadline(t time.Time) {
func (d *datagramBufferedPipe) SetReadDeadline(t time.Time) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
@ -152,7 +152,7 @@ func (d *datagramBuffer) SetReadDeadline(t time.Time) {
d.rwCond.Broadcast()
}
func (d *datagramBuffer) SetWriteToTimeout(t time.Duration) {
func (d *datagramBufferedPipe) SetWriteToTimeout(t time.Duration) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()

@ -9,7 +9,7 @@ import (
func TestDatagramBuffer_RW(t *testing.T) {
b := []byte{0x01, 0x02, 0x03}
t.Run("simple write", func(t *testing.T) {
pipe := NewDatagramBuffer()
pipe := NewDatagramBufferedPipe()
_, err := pipe.Write(Frame{Payload: b})
if err != nil {
t.Error(
@ -21,7 +21,7 @@ func TestDatagramBuffer_RW(t *testing.T) {
})
t.Run("simple read", func(t *testing.T) {
pipe := NewDatagramBuffer()
pipe := NewDatagramBufferedPipe()
_, _ = pipe.Write(Frame{Payload: b})
b2 := make([]byte, len(b))
n, err := pipe.Read(b2)
@ -54,7 +54,7 @@ func TestDatagramBuffer_RW(t *testing.T) {
})
t.Run("writing closing frame", func(t *testing.T) {
pipe := NewDatagramBuffer()
pipe := NewDatagramBufferedPipe()
toBeClosed, err := pipe.Write(Frame{Closing: C_STREAM})
if !toBeClosed {
t.Error("should be to be closed")
@ -73,7 +73,7 @@ func TestDatagramBuffer_RW(t *testing.T) {
}
func TestDatagramBuffer_BlockingRead(t *testing.T) {
pipe := NewDatagramBuffer()
pipe := NewDatagramBufferedPipe()
b := []byte{0x01, 0x02, 0x03}
go func() {
time.Sleep(100 * time.Millisecond)
@ -108,7 +108,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) {
}
func TestDatagramBuffer_CloseThenRead(t *testing.T) {
pipe := NewDatagramBuffer()
pipe := NewDatagramBufferedPipe()
b := []byte{0x01, 0x02, 0x03}
pipe.Write(Frame{Payload: b})
b2 := make([]byte, len(b))

@ -32,11 +32,17 @@ type Obfuscator struct {
// Used in Stream.Write. Add multiplexing headers, encrypt and add TLS header
Obfs Obfser
// Remove TLS header, decrypt and unmarshall frames
Deobfs Deobfser
SessionKey [32]byte
minOverhead int
Deobfs Deobfser
SessionKey [32]byte
maxOverhead int
}
// MakeObfs returns a function of type Obfser. An Obfser takes three arguments:
// a *Frame with all the field set correctly, a []byte as buffer to put encrypted
// message in, and an int called payloadOffsetInBuf to be used when *Frame.payload
// is in the byte slice used as buffer (2nd argument). payloadOffsetInBuf specifies
// the index at which data belonging to *Frame.Payload starts in the buffer.
func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser {
obfs := func(f *Frame, buf []byte, payloadOffsetInBuf int) (int, error) {
// we need the encrypted data to be at least 8 bytes to be used as nonce for salsa20 stream header encryption
@ -48,7 +54,9 @@ func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser {
}
var extraLen int
if payloadCipher == nil {
if extraLen = 8 - payloadLen; extraLen < 0 {
extraLen = 8 - payloadLen
if extraLen < 0 {
// if our payload is already greater than 8 bytes
extraLen = 0
}
} else {
@ -92,6 +100,9 @@ func MakeObfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Obfser {
return obfs
}
// MakeDeobfs returns a function Deobfser. A Deobfser takes in a single byte slice,
// containing the message to be decrypted, and returns a *Frame containing the frame
// information and plaintext
func MakeDeobfs(salsaKey [32]byte, payloadCipher cipher.AEAD) Deobfser {
// stream header length + minimum data size (i.e. nonce size of salsa20)
const minInputLen = HEADER_LEN + 8
@ -151,7 +162,7 @@ func MakeObfuscator(encryptionMethod byte, sessionKey [32]byte) (obfuscator Obfu
switch encryptionMethod {
case E_METHOD_PLAIN:
payloadCipher = nil
obfuscator.minOverhead = 0
obfuscator.maxOverhead = 0
case E_METHOD_AES_GCM:
var c cipher.Block
c, err = aes.NewCipher(sessionKey[:])
@ -162,13 +173,13 @@ func MakeObfuscator(encryptionMethod byte, sessionKey [32]byte) (obfuscator Obfu
if err != nil {
return
}
obfuscator.minOverhead = payloadCipher.Overhead()
obfuscator.maxOverhead = payloadCipher.Overhead()
case E_METHOD_CHACHA20_POLY1305:
payloadCipher, err = chacha20poly1305.New(sessionKey[:])
if err != nil {
return
}
obfuscator.minOverhead = payloadCipher.Overhead()
obfuscator.maxOverhead = payloadCipher.Overhead()
default:
return obfuscator, errors.New("Unknown encryption method")
}

@ -1,15 +1,23 @@
package multiplex
import (
"errors"
"io"
"time"
)
var ErrTimeout = errors.New("deadline exceeded")
type recvBuffer interface {
// Read calls' err must be nil | io.EOF | io.ErrShortBuffer
// Read should NOT return error on a closed streamBuffer with a non-empty buffer.
// Instead, it should behave as if it hasn't been closed. Closure is only relevant
// when the buffer is empty.
io.ReadCloser
io.WriterTo
Write(Frame) (toBeClosed bool, err error)
SetReadDeadline(time time.Time)
// SetWriteToTimeout sets the duration a recvBuffer waits in a WriteTo call when nothing
// has been written for a while. After that duration it should return ErrTimeout
SetWriteToTimeout(d time.Duration)
}

@ -34,7 +34,8 @@ type SessionConfig struct {
Singleplex bool
MaxFrameSize int // maximum size of the frame, including the header
// maximum size of Frame.Payload
MaxFrameSize int
SendBufferSize int
ReceiveBufferSize int
}
@ -64,7 +65,8 @@ type Session struct {
terminalMsg atomic.Value
maxStreamUnitWrite int // the max size passed to Write calls before it splits it into multiple frames
// the max size passed to Write calls before it splits it into multiple frames
maxStreamUnitWrite int
}
func MakeSession(id uint32, config SessionConfig) *Session {
@ -89,7 +91,7 @@ func MakeSession(id uint32, config SessionConfig) *Session {
sesh.MaxFrameSize = defaultSendRecvBufSize - 1024
}
// todo: validation. this must be smaller than the buffer sizes
sesh.maxStreamUnitWrite = sesh.MaxFrameSize - HEADER_LEN - sesh.Obfuscator.minOverhead
sesh.maxStreamUnitWrite = sesh.MaxFrameSize - HEADER_LEN - sesh.Obfuscator.maxOverhead
sbConfig := switchboardConfig{
valve: sesh.Valve,
@ -156,7 +158,7 @@ func (sesh *Session) closeStream(s *Stream, active bool) error {
if atomic.SwapUint32(&s.closed, 1) == 1 {
return fmt.Errorf("closing stream %v: %w", s.id, errRepeatStreamClosing)
}
_ = s.recvBuf.Close() // both datagramBuffer and streamBuffer won't return err on Close()
_ = s.recvBuf.Close() // recvBuf.Close should not return error
if active {
// Notify remote that this stream is closed
@ -291,6 +293,7 @@ func (sesh *Session) Close() error {
return true
})
// we send a notice frame telling remote to close the session
pad := genRandomPadding()
f := &Frame{
StreamID: 0xffffffff,

@ -335,7 +335,7 @@ func TestRecvDataFromRemote_Closing_OutOfOrder(t *testing.T) {
}
}
func TestParallel(t *testing.T) {
func TestParallelStreams(t *testing.T) {
rand.Seed(0)
var sessionKey [32]byte

@ -43,7 +43,7 @@ type Stream struct {
func makeStream(sesh *Session, id uint32) *Stream {
var recvBuf recvBuffer
if sesh.Unordered {
recvBuf = NewDatagramBuffer()
recvBuf = NewDatagramBufferedPipe()
} else {
recvBuf = NewStreamBuffer()
}

@ -48,13 +48,17 @@ type streamBuffer struct {
nextRecvSeq uint64
sh sorterHeap
buf *bufferedPipe
buf *streamBufferedPipe
}
// streamBuffer is a wrapper around streamBufferedPipe.
// Its main function is to sort frames in order, and wait for frames to arrive
// if they have arrived out-of-order. Then it writes the payload of frames into
// a streamBufferedPipe.
func NewStreamBuffer() *streamBuffer {
sb := &streamBuffer{
sh: []*Frame{},
buf: NewBufferedPipe(),
buf: NewStreamBufferedPipe(),
}
return sb
}

@ -4,7 +4,6 @@ package multiplex
import (
"bytes"
"errors"
"io"
"sync"
"time"
@ -12,10 +11,8 @@ import (
const BUF_SIZE_LIMIT = 1 << 20 * 500
var ErrTimeout = errors.New("deadline exceeded")
// The point of a bufferedPipe is that Read() will block until data is available
type bufferedPipe struct {
// The point of a streamBufferedPipe is that Read() will block until data is available
type streamBufferedPipe struct {
// only alloc when on first Read or Write
buf *bytes.Buffer
@ -25,14 +22,14 @@ type bufferedPipe struct {
wtTimeout time.Duration
}
func NewBufferedPipe() *bufferedPipe {
p := &bufferedPipe{
func NewStreamBufferedPipe() *streamBufferedPipe {
p := &streamBufferedPipe{
rwCond: sync.NewCond(&sync.Mutex{}),
}
return p
}
func (p *bufferedPipe) Read(target []byte) (int, error) {
func (p *streamBufferedPipe) Read(target []byte) (int, error) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
if p.buf == nil {
@ -60,7 +57,7 @@ func (p *bufferedPipe) Read(target []byte) (int, error) {
return n, err
}
func (p *bufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
if p.buf == nil {
@ -98,7 +95,7 @@ func (p *bufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
}
}
func (p *bufferedPipe) Write(input []byte) (int, error) {
func (p *streamBufferedPipe) Write(input []byte) (int, error) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
if p.buf == nil {
@ -120,7 +117,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) {
return n, err
}
func (p *bufferedPipe) Close() error {
func (p *streamBufferedPipe) Close() error {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
@ -129,7 +126,7 @@ func (p *bufferedPipe) Close() error {
return nil
}
func (p *bufferedPipe) SetReadDeadline(t time.Time) {
func (p *streamBufferedPipe) SetReadDeadline(t time.Time) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
@ -137,7 +134,7 @@ func (p *bufferedPipe) SetReadDeadline(t time.Time) {
p.rwCond.Broadcast()
}
func (p *bufferedPipe) SetWriteToTimeout(d time.Duration) {
func (p *streamBufferedPipe) SetWriteToTimeout(d time.Duration) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()

@ -8,7 +8,7 @@ import (
)
func TestPipeRW(t *testing.T) {
pipe := NewBufferedPipe()
pipe := NewStreamBufferedPipe()
b := []byte{0x01, 0x02, 0x03}
n, err := pipe.Write(b)
if n != len(b) {
@ -57,7 +57,7 @@ func TestPipeRW(t *testing.T) {
}
func TestReadBlock(t *testing.T) {
pipe := NewBufferedPipe()
pipe := NewStreamBufferedPipe()
b := []byte{0x01, 0x02, 0x03}
go func() {
time.Sleep(100 * time.Millisecond)
@ -92,7 +92,7 @@ func TestReadBlock(t *testing.T) {
}
func TestPartialRead(t *testing.T) {
pipe := NewBufferedPipe()
pipe := NewStreamBufferedPipe()
b := []byte{0x01, 0x02, 0x03}
pipe.Write(b)
b1 := make([]byte, 1)
@ -148,7 +148,7 @@ func TestPartialRead(t *testing.T) {
}
func TestReadAfterClose(t *testing.T) {
pipe := NewBufferedPipe()
pipe := NewStreamBufferedPipe()
b := []byte{0x01, 0x02, 0x03}
pipe.Write(b)
b2 := make([]byte, len(b))
@ -184,7 +184,7 @@ func BenchmarkBufferedPipe_RW(b *testing.B) {
testData := make([]byte, PAYLOAD_LEN)
rand.Read(testData)
pipe := NewBufferedPipe()
pipe := NewStreamBufferedPipe()
smallBuf := make([]byte, PAYLOAD_LEN-10)
go func() {

@ -20,7 +20,12 @@ type switchboardConfig struct {
recvBufferSize int
}
// switchboard is responsible for keeping the reference of TCP connections between client and server
// switchboard is responsible for managing TCP connections between client and server.
// It has several purposes: constantly receiving incoming data from all connections
// and pass them to Session.recvDataFromRemote(); accepting data through
// switchboard.send(), in which it selects a connection according to its
// switchboardStrategy and send the data off using that; and counting, as well as
// rate limiting, data received and sent through its Valve.
type switchboard struct {
session *Session

Loading…
Cancel
Save