device: use channel close to shut down and drain outbound channel
This is a similar treatment to the handling of the encryption channel found a few commits ago: Use the closing of the channel to manage goroutine lifetime and shutdown. It is considerably simpler because there is only a single writer. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
This commit is contained in:
		
							parent
							
								
									63066ce406
								
							
						
					
					
						commit
						2832e96339
					
				@ -17,7 +17,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	PeerRoutineNumber = 3
 | 
			
		||||
	PeerRoutineNumber = 2
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Peer struct {
 | 
			
		||||
@ -287,7 +287,6 @@ func (peer *Peer) Stop() {
 | 
			
		||||
 | 
			
		||||
	peer.queue.Lock()
 | 
			
		||||
	close(peer.queue.nonce)
 | 
			
		||||
	close(peer.queue.outbound)
 | 
			
		||||
	close(peer.queue.inbound)
 | 
			
		||||
	peer.queue.Unlock()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -372,6 +372,7 @@ func (peer *Peer) RoutineNonce() {
 | 
			
		||||
		logDebug.Println(peer, "- Routine: nonce worker - stopped")
 | 
			
		||||
		peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
 | 
			
		||||
		device.queue.encryption.wg.Done() // no more writes from us
 | 
			
		||||
		close(peer.queue.outbound)        // no more writes to this channel
 | 
			
		||||
		peer.routines.stopping.Done()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
@ -545,64 +546,43 @@ func (peer *Peer) RoutineSequentialSender() {
 | 
			
		||||
	logDebug := device.log.Debug
 | 
			
		||||
	logError := device.log.Error
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case elem, ok := <-peer.queue.outbound:
 | 
			
		||||
				if ok {
 | 
			
		||||
					elem.Lock()
 | 
			
		||||
					if !elem.IsDropped() {
 | 
			
		||||
						device.PutMessageBuffer(elem.buffer)
 | 
			
		||||
						elem.Drop()
 | 
			
		||||
					}
 | 
			
		||||
					device.PutOutboundElement(elem)
 | 
			
		||||
				}
 | 
			
		||||
			default:
 | 
			
		||||
				goto out
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	out:
 | 
			
		||||
		logDebug.Println(peer, "- Routine: sequential sender - stopped")
 | 
			
		||||
		peer.routines.stopping.Done()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	defer logDebug.Println(peer, "- Routine: sequential sender - stopped")
 | 
			
		||||
	logDebug.Println(peer, "- Routine: sequential sender - started")
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
 | 
			
		||||
		case <-peer.routines.stop:
 | 
			
		||||
			return
 | 
			
		||||
 | 
			
		||||
		case elem, ok := <-peer.queue.outbound:
 | 
			
		||||
 | 
			
		||||
			if !ok {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			elem.Lock()
 | 
			
		||||
			if elem.IsDropped() {
 | 
			
		||||
				device.PutOutboundElement(elem)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			peer.timersAnyAuthenticatedPacketTraversal()
 | 
			
		||||
			peer.timersAnyAuthenticatedPacketSent()
 | 
			
		||||
 | 
			
		||||
			// send message and return buffer to pool
 | 
			
		||||
 | 
			
		||||
			err := peer.SendBuffer(elem.packet)
 | 
			
		||||
			if len(elem.packet) != MessageKeepaliveSize {
 | 
			
		||||
				peer.timersDataSent()
 | 
			
		||||
			}
 | 
			
		||||
	for elem := range peer.queue.outbound {
 | 
			
		||||
		elem.Lock()
 | 
			
		||||
		if elem.IsDropped() {
 | 
			
		||||
			device.PutOutboundElement(elem)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if !peer.isRunning.Get() {
 | 
			
		||||
			// peer has been stopped; return re-usable elems to the shared pool.
 | 
			
		||||
			// This is an optimization only. It is possible for the peer to be stopped
 | 
			
		||||
			// immediately after this check, in which case, elem will get processed.
 | 
			
		||||
			// The timers and SendBuffer code are resilient to a few stragglers.
 | 
			
		||||
			// TODO(josharian): rework peer shutdown order to ensure
 | 
			
		||||
			// that we never accidentally keep timers alive longer than necessary.
 | 
			
		||||
			device.PutMessageBuffer(elem.buffer)
 | 
			
		||||
			device.PutOutboundElement(elem)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				logError.Println(peer, "- Failed to send data packet", err)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			peer.keepKeyFreshSending()
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		peer.timersAnyAuthenticatedPacketTraversal()
 | 
			
		||||
		peer.timersAnyAuthenticatedPacketSent()
 | 
			
		||||
 | 
			
		||||
		// send message and return buffer to pool
 | 
			
		||||
 | 
			
		||||
		err := peer.SendBuffer(elem.packet)
 | 
			
		||||
		if len(elem.packet) != MessageKeepaliveSize {
 | 
			
		||||
			peer.timersDataSent()
 | 
			
		||||
		}
 | 
			
		||||
		device.PutMessageBuffer(elem.buffer)
 | 
			
		||||
		device.PutOutboundElement(elem)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			logError.Println(peer, "- Failed to send data packet", err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		peer.keepKeyFreshSending()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user