|
|
|
@ -66,6 +66,8 @@ type Session struct {
|
|
|
|
|
|
|
|
|
|
streamsM sync.Mutex
|
|
|
|
|
streams map[uint32]*Stream
|
|
|
|
|
// For accepting new streams
|
|
|
|
|
acceptCh chan *Stream
|
|
|
|
|
|
|
|
|
|
// a pool of heap allocated frame objects so we don't have to allocate a new one each time we receive a frame
|
|
|
|
|
recvFramePool sync.Pool
|
|
|
|
@ -78,9 +80,6 @@ type Session struct {
|
|
|
|
|
// Used for LocalAddr() and RemoteAddr() etc.
|
|
|
|
|
addrs atomic.Value
|
|
|
|
|
|
|
|
|
|
// For accepting new streams
|
|
|
|
|
acceptCh chan *Stream
|
|
|
|
|
|
|
|
|
|
closed uint32
|
|
|
|
|
|
|
|
|
|
terminalMsg atomic.Value
|
|
|
|
@ -181,7 +180,7 @@ func (sesh *Session) Accept() (net.Conn, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (sesh *Session) closeStream(s *Stream, active bool) error {
|
|
|
|
|
if atomic.SwapUint32(&s.closed, 1) == 1 {
|
|
|
|
|
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
|
|
|
|
|
return fmt.Errorf("closing stream %v: %w", s.id, errRepeatStreamClosing)
|
|
|
|
|
}
|
|
|
|
|
_ = s.getRecvBuf().Close() // recvBuf.Close should not return error
|
|
|
|
@ -244,6 +243,10 @@ func (sesh *Session) recvDataFromRemote(data []byte) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sesh.streamsM.Lock()
|
|
|
|
|
if sesh.IsClosed() {
|
|
|
|
|
sesh.streamsM.Unlock()
|
|
|
|
|
return ErrBrokenSession
|
|
|
|
|
}
|
|
|
|
|
existingStream, existing := sesh.streams[frame.StreamID]
|
|
|
|
|
if existing {
|
|
|
|
|
sesh.streamsM.Unlock()
|
|
|
|
@ -255,10 +258,10 @@ func (sesh *Session) recvDataFromRemote(data []byte) error {
|
|
|
|
|
} else {
|
|
|
|
|
newStream := makeStream(sesh, frame.StreamID)
|
|
|
|
|
sesh.streams[frame.StreamID] = newStream
|
|
|
|
|
sesh.acceptCh <- newStream
|
|
|
|
|
sesh.streamsM.Unlock()
|
|
|
|
|
// new stream
|
|
|
|
|
sesh.streamCountIncr()
|
|
|
|
|
sesh.acceptCh <- newStream
|
|
|
|
|
return newStream.recvFrame(frame)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -276,14 +279,14 @@ func (sesh *Session) TerminalMsg() string {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (sesh *Session) closeSession(closeSwitchboard bool) error {
|
|
|
|
|
if atomic.SwapUint32(&sesh.closed, 1) == 1 {
|
|
|
|
|
func (sesh *Session) closeSession() error {
|
|
|
|
|
if !atomic.CompareAndSwapUint32(&sesh.closed, 0, 1) {
|
|
|
|
|
log.Debugf("session %v has already been closed", sesh.id)
|
|
|
|
|
return errRepeatSessionClosing
|
|
|
|
|
}
|
|
|
|
|
sesh.acceptCh <- nil
|
|
|
|
|
|
|
|
|
|
sesh.streamsM.Lock()
|
|
|
|
|
close(sesh.acceptCh)
|
|
|
|
|
for id, stream := range sesh.streams {
|
|
|
|
|
if stream == nil {
|
|
|
|
|
continue
|
|
|
|
@ -294,26 +297,23 @@ func (sesh *Session) closeSession(closeSwitchboard bool) error {
|
|
|
|
|
sesh.streamCountDecr()
|
|
|
|
|
}
|
|
|
|
|
sesh.streamsM.Unlock()
|
|
|
|
|
|
|
|
|
|
if closeSwitchboard {
|
|
|
|
|
sesh.sb.closeAll()
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (sesh *Session) passiveClose() error {
|
|
|
|
|
log.Debugf("attempting to passively close session %v", sesh.id)
|
|
|
|
|
err := sesh.closeSession(true)
|
|
|
|
|
err := sesh.closeSession()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
sesh.sb.closeAll()
|
|
|
|
|
log.Debugf("session %v closed gracefully", sesh.id)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (sesh *Session) Close() error {
|
|
|
|
|
log.Debugf("attempting to actively close session %v", sesh.id)
|
|
|
|
|
err := sesh.closeSession(false)
|
|
|
|
|
err := sesh.closeSession()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -339,7 +339,6 @@ func (sesh *Session) Close() error {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sesh.sb.closeAll()
|
|
|
|
|
log.Debugf("session %v closed gracefully", sesh.id)
|
|
|
|
|
return nil
|
|
|
|
|