|
|
|
@ -310,27 +310,51 @@ func TestClosingStreamsFromProxy(t *testing.T) {
|
|
|
|
|
t.Fatal(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// closing stream on server side
|
|
|
|
|
clientConn, _ := pxyClientD.Dial("", "")
|
|
|
|
|
clientConn.Write(make([]byte, 16))
|
|
|
|
|
serverConn, _ := pxyServerL.Accept()
|
|
|
|
|
serverConn.Close()
|
|
|
|
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
if _, err := clientConn.Read(make([]byte, 16)); err == nil {
|
|
|
|
|
t.Errorf("closing stream on server side is not reflected to the client: %v", err)
|
|
|
|
|
}
|
|
|
|
|
t.Run("closing from server", func(t *testing.T) {
|
|
|
|
|
clientConn, _ := pxyClientD.Dial("", "")
|
|
|
|
|
clientConn.Write(make([]byte, 16))
|
|
|
|
|
serverConn, _ := pxyServerL.Accept()
|
|
|
|
|
serverConn.Close()
|
|
|
|
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
if _, err := clientConn.Read(make([]byte, 16)); err == nil {
|
|
|
|
|
t.Errorf("closing stream on server side is not reflected to the client: %v", err)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// closing stream on client side
|
|
|
|
|
clientConn, _ = pxyClientD.Dial("", "")
|
|
|
|
|
clientConn.Write(make([]byte, 16))
|
|
|
|
|
serverConn, _ = pxyServerL.Accept()
|
|
|
|
|
clientConn.Close()
|
|
|
|
|
t.Run("closing from client", func(t *testing.T) {
|
|
|
|
|
// closing stream on client side
|
|
|
|
|
clientConn, _ := pxyClientD.Dial("", "")
|
|
|
|
|
clientConn.Write(make([]byte, 16))
|
|
|
|
|
serverConn, _ := pxyServerL.Accept()
|
|
|
|
|
clientConn.Close()
|
|
|
|
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
if _, err := serverConn.Read(make([]byte, 16)); err == nil {
|
|
|
|
|
t.Errorf("closing stream on client side is not reflected to the server: %v", err)
|
|
|
|
|
}
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
if _, err := serverConn.Read(make([]byte, 16)); err == nil {
|
|
|
|
|
t.Errorf("closing stream on client side is not reflected to the server: %v", err)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
t.Run("send then close", func(t *testing.T) {
|
|
|
|
|
testData := make([]byte, 24*1024)
|
|
|
|
|
rand.Read(testData)
|
|
|
|
|
clientConn, _ := pxyClientD.Dial("", "")
|
|
|
|
|
go func() {
|
|
|
|
|
clientConn.Write(testData)
|
|
|
|
|
// TODO: this is time dependent. It could be due to the time it took for this
|
|
|
|
|
// connutil.StreamPipe's Close to be reflected on the copy function, instead of inherent bad sync
|
|
|
|
|
// in multiplexer
|
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
|
clientConn.Close()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
readBuf := make([]byte, len(testData))
|
|
|
|
|
serverConn, _ := pxyServerL.Accept()
|
|
|
|
|
_, err := io.ReadFull(serverConn, readBuf)
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Errorf("failed to read data sent before closing: %v", err)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func BenchmarkThroughput(b *testing.B) {
|
|
|
|
@ -357,39 +381,60 @@ func BenchmarkThroughput(b *testing.B) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b.Run("single conn", func(b *testing.B) {
|
|
|
|
|
more := make(chan int, 100)
|
|
|
|
|
go func() {
|
|
|
|
|
writeBuf := make([]byte, bufSize+100)
|
|
|
|
|
serverConn, _ := pxyServerL.Accept()
|
|
|
|
|
io.Copy(ioutil.Discard, serverConn)
|
|
|
|
|
for {
|
|
|
|
|
serverConn.Write(writeBuf)
|
|
|
|
|
<-more
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
clientConn, _ := pxyClientD.Dial("", "")
|
|
|
|
|
writeBuf := make([]byte, bufSize)
|
|
|
|
|
readBuf := make([]byte, bufSize)
|
|
|
|
|
clientConn.Write([]byte{1}) // to make server accept
|
|
|
|
|
b.ResetTimer()
|
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
|
|
|
n, _ := clientConn.Write(writeBuf)
|
|
|
|
|
n, _ := clientConn.Read(readBuf)
|
|
|
|
|
b.SetBytes(int64(n))
|
|
|
|
|
more <- 0
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
b.Run("multi conn", func(b *testing.B) {
|
|
|
|
|
for i := 0; i < numConns; i++ {
|
|
|
|
|
go func() {
|
|
|
|
|
serverConn, _ := pxyServerL.Accept()
|
|
|
|
|
io.Copy(ioutil.Discard, serverConn)
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
conns := make([]net.Conn, numConns)
|
|
|
|
|
for i := 0; i < numConns; i++ {
|
|
|
|
|
conns[i], _ = pxyClientD.Dial("", "")
|
|
|
|
|
}
|
|
|
|
|
writeBuf := make([]byte, bufSize)
|
|
|
|
|
b.ResetTimer()
|
|
|
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
|
|
|
for pb.Next() {
|
|
|
|
|
n, _ := conns[rand.Intn(numConns)].Write(writeBuf)
|
|
|
|
|
b.SetBytes(int64(n))
|
|
|
|
|
/*
|
|
|
|
|
b.Run("multi conn", func(b *testing.B) {
|
|
|
|
|
var connsIds sync.Pool
|
|
|
|
|
conns := make([]net.Conn, numConns)
|
|
|
|
|
more := make([]chan int, numConns)
|
|
|
|
|
for i := 0; i < numConns; i++ {
|
|
|
|
|
conns[i], _ = pxyClientD.Dial("", "")
|
|
|
|
|
conns[i].Write([]byte{1}) // to make server accept
|
|
|
|
|
connsIds.Put(i)
|
|
|
|
|
moreChan := make(chan int, 100)
|
|
|
|
|
more[i] = moreChan
|
|
|
|
|
writeBuf := make([]byte, bufSize + 100)
|
|
|
|
|
go func() {
|
|
|
|
|
serverConn, _ := pxyServerL.Accept()
|
|
|
|
|
for {
|
|
|
|
|
serverConn.Write(writeBuf)
|
|
|
|
|
<- moreChan
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
b.SetParallelism(numConns)
|
|
|
|
|
b.ResetTimer()
|
|
|
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
|
|
|
buf := make([]byte, bufSize)
|
|
|
|
|
connNum := connsIds.Get().(int)
|
|
|
|
|
for pb.Next() {
|
|
|
|
|
n, _ := conns[connNum].Read(buf)
|
|
|
|
|
more[connNum] <- 0
|
|
|
|
|
b.SetBytes(int64(n))
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|