From 2832e96339b4b847172741e9252020fc7bfa59af Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Tue, 15 Dec 2020 15:54:48 -0800 Subject: [PATCH] device: use channel close to shut down and drain outbound channel This is a similar treatment to the handling of the encryption channel found a few commits ago: Use the closing of the channel to manage goroutine lifetime and shutdown. It is considerably simpler because there is only a single writer. Signed-off-by: Josh Bleecher Snyder --- device/peer.go | 3 +- device/send.go | 88 +++++++++++++++++++------------------------------- 2 files changed, 35 insertions(+), 56 deletions(-) diff --git a/device/peer.go b/device/peer.go index 31b75c7..c094160 100644 --- a/device/peer.go +++ b/device/peer.go @@ -17,7 +17,7 @@ import ( ) const ( - PeerRoutineNumber = 3 + PeerRoutineNumber = 2 ) type Peer struct { @@ -287,7 +287,6 @@ func (peer *Peer) Stop() { peer.queue.Lock() close(peer.queue.nonce) - close(peer.queue.outbound) close(peer.queue.inbound) peer.queue.Unlock() diff --git a/device/send.go b/device/send.go index 1b16edd..1f71f79 100644 --- a/device/send.go +++ b/device/send.go @@ -372,6 +372,7 @@ func (peer *Peer) RoutineNonce() { logDebug.Println(peer, "- Routine: nonce worker - stopped") peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) device.queue.encryption.wg.Done() // no more writes from us + close(peer.queue.outbound) // no more writes to this channel peer.routines.stopping.Done() }() @@ -545,64 +546,43 @@ func (peer *Peer) RoutineSequentialSender() { logDebug := device.log.Debug logError := device.log.Error - defer func() { - for { - select { - case elem, ok := <-peer.queue.outbound: - if ok { - elem.Lock() - if !elem.IsDropped() { - device.PutMessageBuffer(elem.buffer) - elem.Drop() - } - device.PutOutboundElement(elem) - } - default: - goto out - } - } - out: - logDebug.Println(peer, "- Routine: sequential sender - stopped") - peer.routines.stopping.Done() - }() - + defer logDebug.Println(peer, "- Routine: sequential sender - stopped") logDebug.Println(peer, "- Routine: sequential sender - started") - for { - select { - - case <-peer.routines.stop: - return - - case elem, ok := <-peer.queue.outbound: - - if !ok { - return - } - - elem.Lock() - if elem.IsDropped() { - device.PutOutboundElement(elem) - continue - } - - peer.timersAnyAuthenticatedPacketTraversal() - peer.timersAnyAuthenticatedPacketSent() - - // send message and return buffer to pool - - err := peer.SendBuffer(elem.packet) - if len(elem.packet) != MessageKeepaliveSize { - peer.timersDataSent() - } + for elem := range peer.queue.outbound { + elem.Lock() + if elem.IsDropped() { + device.PutOutboundElement(elem) + continue + } + if !peer.isRunning.Get() { + // peer has been stopped; return re-usable elems to the shared pool. + // This is an optimization only. It is possible for the peer to be stopped + // immediately after this check, in which case, elem will get processed. + // The timers and SendBuffer code are resilient to a few stragglers. + // TODO(josharian): rework peer shutdown order to ensure + // that we never accidentally keep timers alive longer than necessary. device.PutMessageBuffer(elem.buffer) device.PutOutboundElement(elem) - if err != nil { - logError.Println(peer, "- Failed to send data packet", err) - continue - } - - peer.keepKeyFreshSending() + continue } + + peer.timersAnyAuthenticatedPacketTraversal() + peer.timersAnyAuthenticatedPacketSent() + + // send message and return buffer to pool + + err := peer.SendBuffer(elem.packet) + if len(elem.packet) != MessageKeepaliveSize { + peer.timersDataSent() + } + device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) + if err != nil { + logError.Println(peer, "- Failed to send data packet", err) + continue + } + + peer.keepKeyFreshSending() } }