|
|
|
@ -6,7 +6,6 @@ import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"io"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -16,7 +15,7 @@ import (
|
|
|
|
|
type datagramBuffer struct {
|
|
|
|
|
pLens []int
|
|
|
|
|
buf *bytes.Buffer
|
|
|
|
|
closed uint32
|
|
|
|
|
closed bool
|
|
|
|
|
rwCond *sync.Cond
|
|
|
|
|
rDeadline time.Time
|
|
|
|
|
}
|
|
|
|
@ -35,7 +34,7 @@ func (d *datagramBuffer) Read(target []byte) (int, error) {
|
|
|
|
|
d.buf = new(bytes.Buffer)
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
if atomic.LoadUint32(&d.closed) == 1 && len(d.pLens) == 0 {
|
|
|
|
|
if d.closed && len(d.pLens) == 0 {
|
|
|
|
|
return 0, io.EOF
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -70,7 +69,7 @@ func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) {
|
|
|
|
|
d.buf = new(bytes.Buffer)
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
if atomic.LoadUint32(&d.closed) == 1 && len(d.pLens) == 0 {
|
|
|
|
|
if d.closed && len(d.pLens) == 0 {
|
|
|
|
|
return 0, io.EOF
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -104,7 +103,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
|
|
|
|
|
d.buf = new(bytes.Buffer)
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
if atomic.LoadUint32(&d.closed) == 1 {
|
|
|
|
|
if d.closed {
|
|
|
|
|
return true, io.ErrClosedPipe
|
|
|
|
|
}
|
|
|
|
|
if d.buf.Len() <= BUF_SIZE_LIMIT {
|
|
|
|
@ -115,7 +114,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if f.Closing != C_NOOP {
|
|
|
|
|
atomic.StoreUint32(&d.closed, 1)
|
|
|
|
|
d.closed = true
|
|
|
|
|
d.rwCond.Broadcast()
|
|
|
|
|
return true, nil
|
|
|
|
|
}
|
|
|
|
@ -129,7 +128,10 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (d *datagramBuffer) Close() error {
|
|
|
|
|
atomic.StoreUint32(&d.closed, 1)
|
|
|
|
|
d.rwCond.L.Lock()
|
|
|
|
|
defer d.rwCond.L.Unlock()
|
|
|
|
|
|
|
|
|
|
d.closed = true
|
|
|
|
|
d.rwCond.Broadcast()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|