wireguard-go/device/send.go

452 lines
12 KiB
Go
Raw Normal View History

2019-01-02 01:55:51 +01:00
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2022 WireGuard LLC. All Rights Reserved.
*/
2019-03-03 04:04:41 +01:00
package device
2017-06-26 13:14:02 +02:00
import (
"bytes"
"encoding/binary"
"errors"
2017-06-26 13:14:02 +02:00
"net"
"os"
2017-06-26 13:14:02 +02:00
"sync"
"time"
2019-05-14 09:09:52 +02:00
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
2017-06-26 13:14:02 +02:00
)
2017-12-01 23:37:26 +01:00
/* Outbound flow
2017-06-26 13:14:02 +02:00
*
* 1. TUN queue
* 2. Routing (sequential)
* 3. Nonce assignment (sequential)
* 4. Encryption (parallel)
* 5. Transmission (sequential)
2017-06-26 13:14:02 +02:00
*
2017-12-01 23:37:26 +01:00
* The functions in this file occur (roughly) in the order in
* which the packets are processed.
*
* Locking, Producers and Consumers
*
* The order of packets (per peer) must be maintained,
* but encryption of packets happen out-of-order:
*
* The sequential consumers will attempt to take the lock,
* workers release lock when they have completed work (encryption) on the packet.
*
* If the element is inserted into the "encryption queue",
2017-12-01 23:37:26 +01:00
* the content is preceded by enough "junk" to contain the transport header
2017-07-07 13:47:09 +02:00
* (to allow the construction of transport messages in-place)
*/
2017-12-01 23:37:26 +01:00
type QueueOutboundElement struct {
sync.Mutex
buffer *[MaxMessageSize]byte // slice holding the packet data
packet []byte // slice of "buffer" (always!)
nonce uint64 // nonce for encryption
2018-05-13 19:50:58 +02:00
keypair *Keypair // keypair for encryption
peer *Peer // related peer
2017-06-26 13:14:02 +02:00
}
func (device *Device) NewOutboundElement() *QueueOutboundElement {
2018-09-22 06:29:02 +02:00
elem := device.GetOutboundElement()
elem.buffer = device.GetMessageBuffer()
elem.Mutex = sync.Mutex{}
2018-09-22 06:29:02 +02:00
elem.nonce = 0
// keypair and peer were cleared (if necessary) by clearPointers.
return elem
}
// clearPointers clears elem fields that contain pointers.
// This makes the garbage collector's life easier and
// avoids accidentally keeping other objects around unnecessarily.
// It also reduces the possible collateral damage from use-after-free bugs.
func (elem *QueueOutboundElement) clearPointers() {
elem.buffer = nil
elem.packet = nil
2018-09-22 06:29:02 +02:00
elem.keypair = nil
elem.peer = nil
}
/* Queues a keepalive if no packets are queued for peer
*/
func (peer *Peer) SendKeepalive() {
if len(peer.queue.staged) == 0 && peer.isRunning.Load() {
elem := peer.device.NewOutboundElement()
select {
case peer.queue.staged <- elem:
peer.device.log.Verbosef("%v - Sending keepalive packet", peer)
default:
peer.device.PutMessageBuffer(elem.buffer)
peer.device.PutOutboundElement(elem)
}
}
peer.SendStagedPackets()
}
func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
if !isRetry {
peer.timers.handshakeAttempts.Store(0)
}
2018-05-13 23:14:43 +02:00
peer.handshake.mutex.RLock()
if time.Since(peer.handshake.lastSentHandshake) < RekeyTimeout {
2018-05-13 23:14:43 +02:00
peer.handshake.mutex.RUnlock()
return nil
}
peer.handshake.mutex.RUnlock()
peer.handshake.mutex.Lock()
if time.Since(peer.handshake.lastSentHandshake) < RekeyTimeout {
2018-05-13 23:14:43 +02:00
peer.handshake.mutex.Unlock()
return nil
}
2018-05-13 23:14:43 +02:00
peer.handshake.lastSentHandshake = time.Now()
peer.handshake.mutex.Unlock()
peer.device.log.Verbosef("%v - Sending handshake initiation", peer)
msg, err := peer.device.CreateMessageInitiation(peer)
if err != nil {
peer.device.log.Errorf("%v - Failed to create initiation message: %v", peer, err)
return err
}
var buff [MessageInitiationSize]byte
writer := bytes.NewBuffer(buff[:0])
binary.Write(writer, binary.LittleEndian, msg)
packet := writer.Bytes()
2018-05-13 23:14:43 +02:00
peer.cookieGenerator.AddMacs(packet)
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
2018-05-13 23:14:43 +02:00
err = peer.SendBuffer(packet)
if err != nil {
peer.device.log.Errorf("%v - Failed to send handshake initiation: %v", peer, err)
2018-05-13 23:14:43 +02:00
}
peer.timersHandshakeInitiated()
2018-05-13 23:14:43 +02:00
return err
}
func (peer *Peer) SendHandshakeResponse() error {
peer.handshake.mutex.Lock()
peer.handshake.lastSentHandshake = time.Now()
peer.handshake.mutex.Unlock()
peer.device.log.Verbosef("%v - Sending handshake response", peer)
2018-05-13 23:14:43 +02:00
response, err := peer.device.CreateMessageResponse(peer)
if err != nil {
peer.device.log.Errorf("%v - Failed to create response message: %v", peer, err)
2018-05-13 23:14:43 +02:00
return err
}
var buff [MessageResponseSize]byte
writer := bytes.NewBuffer(buff[:0])
binary.Write(writer, binary.LittleEndian, response)
packet := writer.Bytes()
peer.cookieGenerator.AddMacs(packet)
err = peer.BeginSymmetricSession()
if err != nil {
peer.device.log.Errorf("%v - Failed to derive keypair: %v", peer, err)
2018-05-13 23:14:43 +02:00
return err
}
peer.timersSessionDerived()
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
2018-05-13 23:14:43 +02:00
err = peer.SendBuffer(packet)
if err != nil {
peer.device.log.Errorf("%v - Failed to send handshake response: %v", peer, err)
2018-05-13 23:14:43 +02:00
}
return err
}
func (device *Device) SendHandshakeCookie(initiatingElem *QueueHandshakeElement) error {
device.log.Verbosef("Sending cookie response for denied handshake message for %v", initiatingElem.endpoint.DstToString())
2018-05-13 23:14:43 +02:00
sender := binary.LittleEndian.Uint32(initiatingElem.packet[4:8])
reply, err := device.cookieChecker.CreateReply(initiatingElem.packet, sender, initiatingElem.endpoint.DstToBytes())
if err != nil {
device.log.Errorf("Failed to create cookie reply: %v", err)
2018-05-13 23:14:43 +02:00
return err
}
var buff [MessageCookieReplySize]byte
writer := bytes.NewBuffer(buff[:0])
binary.Write(writer, binary.LittleEndian, reply)
device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint)
return nil
}
func (peer *Peer) keepKeyFreshSending() {
2018-05-13 23:14:43 +02:00
keypair := peer.keypairs.Current()
if keypair == nil {
return
}
nonce := keypair.sendNonce.Load()
if nonce > RekeyAfterMessages || (keypair.isInitiator && time.Since(keypair.created) > RekeyAfterTime) {
peer.SendHandshakeInitiation(false)
}
}
/* Reads packets from the TUN and inserts
* into staged queue for peer
*
* Obs. Single instance per TUN device
*/
2017-08-04 16:15:53 +02:00
func (device *Device) RoutineReadFromTUN() {
defer func() {
device.log.Verbosef("Routine: TUN reader - stopped")
device.state.stopping.Done()
device.queue.encryption.wg.Done()
}()
device.log.Verbosef("Routine: TUN reader - started")
2018-09-22 06:29:02 +02:00
var elem *QueueOutboundElement
for {
2018-09-22 06:29:02 +02:00
if elem != nil {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
}
elem = device.NewOutboundElement()
2017-06-26 13:14:02 +02:00
2017-08-25 14:53:23 +02:00
// read packet
offset := MessageTransportHeaderSize
size, err := device.tun.device.Read(elem.buffer[:], offset)
if err != nil {
if !device.isClosed() {
if !errors.Is(err, os.ErrClosed) {
device.log.Errorf("Failed to read packet from TUN device: %v", err)
}
go device.Close()
}
device.PutMessageBuffer(elem.buffer)
2018-09-22 06:29:02 +02:00
device.PutOutboundElement(elem)
return
}
2017-08-25 14:53:23 +02:00
if size == 0 || size > MaxContentSize {
continue
}
2017-06-26 13:14:02 +02:00
elem.packet = elem.buffer[offset : offset+size]
2017-08-04 16:15:53 +02:00
// lookup peer
2017-06-26 13:14:02 +02:00
var peer *Peer
switch elem.packet[0] >> 4 {
case ipv4.Version:
2017-08-04 16:15:53 +02:00
if len(elem.packet) < ipv4.HeaderLen {
continue
}
dst := elem.packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len]
peer = device.allowedips.Lookup(dst)
2017-06-26 13:14:02 +02:00
case ipv6.Version:
2017-08-04 16:15:53 +02:00
if len(elem.packet) < ipv6.HeaderLen {
continue
}
dst := elem.packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len]
peer = device.allowedips.Lookup(dst)
2017-06-26 13:14:02 +02:00
default:
device.log.Verbosef("Received packet with unknown IP version")
}
if peer == nil {
continue
}
if peer.isRunning.Load() {
peer.StagePacket(elem)
2018-09-22 06:29:02 +02:00
elem = nil
peer.SendStagedPackets()
}
2017-06-26 13:14:02 +02:00
}
}
func (peer *Peer) StagePacket(elem *QueueOutboundElement) {
for {
select {
case peer.queue.staged <- elem:
return
default:
}
select {
case tooOld := <-peer.queue.staged:
peer.device.PutMessageBuffer(tooOld.buffer)
peer.device.PutOutboundElement(tooOld)
default:
}
}
}
func (peer *Peer) SendStagedPackets() {
top:
if len(peer.queue.staged) == 0 || !peer.device.isUp() {
return
}
2018-09-24 01:52:02 +02:00
keypair := peer.keypairs.Current()
if keypair == nil || keypair.sendNonce.Load() >= RejectAfterMessages || time.Since(keypair.created) >= RejectAfterTime {
peer.SendHandshakeInitiation(false)
return
}
for {
select {
case elem := <-peer.queue.staged:
elem.peer = peer
elem.nonce = keypair.sendNonce.Add(1) - 1
if elem.nonce >= RejectAfterMessages {
keypair.sendNonce.Store(RejectAfterMessages)
peer.StagePacket(elem) // XXX: Out of order, but we can't front-load go chans
goto top
}
2018-05-13 18:23:40 +02:00
elem.keypair = keypair
elem.Lock()
// add to parallel and sequential queue
if peer.isRunning.Load() {
peer.queue.outbound.c <- elem
peer.device.queue.encryption.c <- elem
} else {
peer.device.PutMessageBuffer(elem.buffer)
peer.device.PutOutboundElement(elem)
}
default:
return
}
}
}
func (peer *Peer) FlushStagedPackets() {
for {
select {
case elem := <-peer.queue.staged:
peer.device.PutMessageBuffer(elem.buffer)
peer.device.PutOutboundElement(elem)
default:
return
2017-06-26 13:14:02 +02:00
}
}
2017-06-26 13:14:02 +02:00
}
func calculatePaddingSize(packetSize, mtu int) int {
lastUnit := packetSize
if mtu == 0 {
return ((lastUnit + PaddingMultiple - 1) & ^(PaddingMultiple - 1)) - lastUnit
}
if lastUnit > mtu {
lastUnit %= mtu
}
paddedSize := ((lastUnit + PaddingMultiple - 1) & ^(PaddingMultiple - 1))
if paddedSize > mtu {
paddedSize = mtu
}
return paddedSize - lastUnit
}
/* Encrypts the elements in the queue
* and marks them for sequential consumption (by releasing the mutex)
*
* Obs. One instance per core
*/
func (device *Device) RoutineEncryption(id int) {
var paddingZeros [PaddingMultiple]byte
var nonce [chacha20poly1305.NonceSize]byte
2017-07-17 16:16:18 +02:00
defer device.log.Verbosef("Routine: encryption worker %d - stopped", id)
device.log.Verbosef("Routine: encryption worker %d - started", id)
2017-07-17 16:16:18 +02:00
device: use channel close to shut down and drain encryption channel The new test introduced in this commit used to deadlock about 1% of the time. I believe that the deadlock occurs as follows: * The test completes, calling device.Close. * device.Close closes device.signals.stop. * RoutineEncryption stops. * The deferred function in RoutineEncryption drains device.queue.encryption. * RoutineEncryption exits. * A peer's RoutineNonce processes an element queued in peer.queue.nonce. * RoutineNonce puts that element into the outbound and encryption queues. * RoutineSequentialSender reads that elements from the outbound queue. * It waits for that element to get Unlocked by RoutineEncryption. * RoutineEncryption has already exited, so RoutineSequentialSender blocks forever. * device.RemoveAllPeers calls peer.Stop on all peers. * peer.Stop waits for peer.routines.stopping, which blocks forever. Rather than attempt to add even more ordering to the already complex centralized shutdown orchestration, this commit moves towards a data-flow-oriented shutdown. The device.queue.encryption gets closed when there will be no more writes to it. All device.queue.encryption readers always read until the channel is closed and then exit. We thus guarantee that any element that enters the encryption queue also exits it. This removes the need for central control of the lifetime of RoutineEncryption, removes the need to drain the encryption queue on shutdown, and simplifies RoutineEncryption. This commit also fixes a data race. When RoutineSequentialSender drains its queue on shutdown, it needs to lock the elem before operating on it, just as the main body does. The new test in this commit passed 50k iterations with the race detector enabled and 150k iterations with the race detector disabled, with no failures. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2020-12-15 00:07:23 +01:00
for elem := range device.queue.encryption.c {
// populate header fields
header := elem.buffer[:MessageTransportHeaderSize]
2017-06-26 13:14:02 +02:00
device: use channel close to shut down and drain encryption channel The new test introduced in this commit used to deadlock about 1% of the time. I believe that the deadlock occurs as follows: * The test completes, calling device.Close. * device.Close closes device.signals.stop. * RoutineEncryption stops. * The deferred function in RoutineEncryption drains device.queue.encryption. * RoutineEncryption exits. * A peer's RoutineNonce processes an element queued in peer.queue.nonce. * RoutineNonce puts that element into the outbound and encryption queues. * RoutineSequentialSender reads that elements from the outbound queue. * It waits for that element to get Unlocked by RoutineEncryption. * RoutineEncryption has already exited, so RoutineSequentialSender blocks forever. * device.RemoveAllPeers calls peer.Stop on all peers. * peer.Stop waits for peer.routines.stopping, which blocks forever. Rather than attempt to add even more ordering to the already complex centralized shutdown orchestration, this commit moves towards a data-flow-oriented shutdown. The device.queue.encryption gets closed when there will be no more writes to it. All device.queue.encryption readers always read until the channel is closed and then exit. We thus guarantee that any element that enters the encryption queue also exits it. This removes the need for central control of the lifetime of RoutineEncryption, removes the need to drain the encryption queue on shutdown, and simplifies RoutineEncryption. This commit also fixes a data race. When RoutineSequentialSender drains its queue on shutdown, it needs to lock the elem before operating on it, just as the main body does. The new test in this commit passed 50k iterations with the race detector enabled and 150k iterations with the race detector disabled, with no failures. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2020-12-15 00:07:23 +01:00
fieldType := header[0:4]
fieldReceiver := header[4:8]
fieldNonce := header[8:16]
2017-07-02 15:28:38 +02:00
device: use channel close to shut down and drain encryption channel The new test introduced in this commit used to deadlock about 1% of the time. I believe that the deadlock occurs as follows: * The test completes, calling device.Close. * device.Close closes device.signals.stop. * RoutineEncryption stops. * The deferred function in RoutineEncryption drains device.queue.encryption. * RoutineEncryption exits. * A peer's RoutineNonce processes an element queued in peer.queue.nonce. * RoutineNonce puts that element into the outbound and encryption queues. * RoutineSequentialSender reads that elements from the outbound queue. * It waits for that element to get Unlocked by RoutineEncryption. * RoutineEncryption has already exited, so RoutineSequentialSender blocks forever. * device.RemoveAllPeers calls peer.Stop on all peers. * peer.Stop waits for peer.routines.stopping, which blocks forever. Rather than attempt to add even more ordering to the already complex centralized shutdown orchestration, this commit moves towards a data-flow-oriented shutdown. The device.queue.encryption gets closed when there will be no more writes to it. All device.queue.encryption readers always read until the channel is closed and then exit. We thus guarantee that any element that enters the encryption queue also exits it. This removes the need for central control of the lifetime of RoutineEncryption, removes the need to drain the encryption queue on shutdown, and simplifies RoutineEncryption. This commit also fixes a data race. When RoutineSequentialSender drains its queue on shutdown, it needs to lock the elem before operating on it, just as the main body does. The new test in this commit passed 50k iterations with the race detector enabled and 150k iterations with the race detector disabled, with no failures. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2020-12-15 00:07:23 +01:00
binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex)
binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
device: use channel close to shut down and drain encryption channel The new test introduced in this commit used to deadlock about 1% of the time. I believe that the deadlock occurs as follows: * The test completes, calling device.Close. * device.Close closes device.signals.stop. * RoutineEncryption stops. * The deferred function in RoutineEncryption drains device.queue.encryption. * RoutineEncryption exits. * A peer's RoutineNonce processes an element queued in peer.queue.nonce. * RoutineNonce puts that element into the outbound and encryption queues. * RoutineSequentialSender reads that elements from the outbound queue. * It waits for that element to get Unlocked by RoutineEncryption. * RoutineEncryption has already exited, so RoutineSequentialSender blocks forever. * device.RemoveAllPeers calls peer.Stop on all peers. * peer.Stop waits for peer.routines.stopping, which blocks forever. Rather than attempt to add even more ordering to the already complex centralized shutdown orchestration, this commit moves towards a data-flow-oriented shutdown. The device.queue.encryption gets closed when there will be no more writes to it. All device.queue.encryption readers always read until the channel is closed and then exit. We thus guarantee that any element that enters the encryption queue also exits it. This removes the need for central control of the lifetime of RoutineEncryption, removes the need to drain the encryption queue on shutdown, and simplifies RoutineEncryption. This commit also fixes a data race. When RoutineSequentialSender drains its queue on shutdown, it needs to lock the elem before operating on it, just as the main body does. The new test in this commit passed 50k iterations with the race detector enabled and 150k iterations with the race detector disabled, with no failures. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2020-12-15 00:07:23 +01:00
// pad content to multiple of 16
paddingSize := calculatePaddingSize(len(elem.packet), int(device.tun.mtu.Load()))
elem.packet = append(elem.packet, paddingZeros[:paddingSize]...)
2017-07-02 15:28:38 +02:00
device: use channel close to shut down and drain encryption channel The new test introduced in this commit used to deadlock about 1% of the time. I believe that the deadlock occurs as follows: * The test completes, calling device.Close. * device.Close closes device.signals.stop. * RoutineEncryption stops. * The deferred function in RoutineEncryption drains device.queue.encryption. * RoutineEncryption exits. * A peer's RoutineNonce processes an element queued in peer.queue.nonce. * RoutineNonce puts that element into the outbound and encryption queues. * RoutineSequentialSender reads that elements from the outbound queue. * It waits for that element to get Unlocked by RoutineEncryption. * RoutineEncryption has already exited, so RoutineSequentialSender blocks forever. * device.RemoveAllPeers calls peer.Stop on all peers. * peer.Stop waits for peer.routines.stopping, which blocks forever. Rather than attempt to add even more ordering to the already complex centralized shutdown orchestration, this commit moves towards a data-flow-oriented shutdown. The device.queue.encryption gets closed when there will be no more writes to it. All device.queue.encryption readers always read until the channel is closed and then exit. We thus guarantee that any element that enters the encryption queue also exits it. This removes the need for central control of the lifetime of RoutineEncryption, removes the need to drain the encryption queue on shutdown, and simplifies RoutineEncryption. This commit also fixes a data race. When RoutineSequentialSender drains its queue on shutdown, it needs to lock the elem before operating on it, just as the main body does. The new test in this commit passed 50k iterations with the race detector enabled and 150k iterations with the race detector disabled, with no failures. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2020-12-15 00:07:23 +01:00
// encrypt content and release to consumer
device: use channel close to shut down and drain encryption channel The new test introduced in this commit used to deadlock about 1% of the time. I believe that the deadlock occurs as follows: * The test completes, calling device.Close. * device.Close closes device.signals.stop. * RoutineEncryption stops. * The deferred function in RoutineEncryption drains device.queue.encryption. * RoutineEncryption exits. * A peer's RoutineNonce processes an element queued in peer.queue.nonce. * RoutineNonce puts that element into the outbound and encryption queues. * RoutineSequentialSender reads that elements from the outbound queue. * It waits for that element to get Unlocked by RoutineEncryption. * RoutineEncryption has already exited, so RoutineSequentialSender blocks forever. * device.RemoveAllPeers calls peer.Stop on all peers. * peer.Stop waits for peer.routines.stopping, which blocks forever. Rather than attempt to add even more ordering to the already complex centralized shutdown orchestration, this commit moves towards a data-flow-oriented shutdown. The device.queue.encryption gets closed when there will be no more writes to it. All device.queue.encryption readers always read until the channel is closed and then exit. We thus guarantee that any element that enters the encryption queue also exits it. This removes the need for central control of the lifetime of RoutineEncryption, removes the need to drain the encryption queue on shutdown, and simplifies RoutineEncryption. This commit also fixes a data race. When RoutineSequentialSender drains its queue on shutdown, it needs to lock the elem before operating on it, just as the main body does. The new test in this commit passed 50k iterations with the race detector enabled and 150k iterations with the race detector disabled, with no failures. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2020-12-15 00:07:23 +01:00
binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
elem.packet = elem.keypair.send.Seal(
header,
nonce[:],
elem.packet,
nil,
)
elem.Unlock()
}
}
/* Sequentially reads packets from queue and sends to endpoint
*
* Obs. Single instance per peer.
* The routine terminates then the outbound queue is closed.
*/
func (peer *Peer) RoutineSequentialSender() {
device := peer.device
defer func() {
defer device.log.Verbosef("%v - Routine: sequential sender - stopped", peer)
peer.stopping.Done()
}()
device.log.Verbosef("%v - Routine: sequential sender - started", peer)
for elem := range peer.queue.outbound.c {
device: remove mutex from Peer send/receive The immediate motivation for this change is an observed deadlock. 1. A goroutine calls peer.Stop. That calls peer.queue.Lock(). 2. Another goroutine is in RoutineSequentialReceiver. It receives an elem from peer.queue.inbound. 3. The peer.Stop goroutine calls close(peer.queue.inbound), close(peer.queue.outbound), and peer.stopping.Wait(). It blocks waiting for RoutineSequentialReceiver and RoutineSequentialSender to exit. 4. The RoutineSequentialReceiver goroutine calls peer.SendStagedPackets(). SendStagedPackets attempts peer.queue.RLock(). That blocks forever because the peer.Stop goroutine holds a write lock on that mutex. A background motivation for this change is that it can be expensive to have a mutex in the hot code path of RoutineSequential*. The mutex was necessary to avoid attempting to send elems on a closed channel. This commit removes that danger by never closing the channel. Instead, we send a sentinel nil value on the channel to indicate to the receiver that it should exit. The only problem with this is that if the receiver exits, we could write an elem into the channel which would never get received. If it never gets received, it cannot get returned to the device pools. To work around this, we use a finalizer. When the channel can be GC'd, the finalizer drains any remaining elements from the channel and restores them to the device pool. After that change, peer.queue.RWMutex no longer makes sense where it is. It is only used to prevent concurrent calls to Start and Stop. Move it to a more sensible location and make it a plain sync.Mutex. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2021-02-08 22:02:52 +01:00
if elem == nil {
return
}
elem.Lock()
if !peer.isRunning.Load() {
// peer has been stopped; return re-usable elems to the shared pool.
// This is an optimization only. It is possible for the peer to be stopped
// immediately after this check, in which case, elem will get processed.
// The timers and SendBuffer code are resilient to a few stragglers.
// TODO: rework peer shutdown order to ensure
// that we never accidentally keep timers alive longer than necessary.
2017-07-27 23:45:37 +02:00
device.PutMessageBuffer(elem.buffer)
2018-09-22 06:29:02 +02:00
device.PutOutboundElement(elem)
continue
}
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
// send message and return buffer to pool
err := peer.SendBuffer(elem.packet)
if len(elem.packet) != MessageKeepaliveSize {
peer.timersDataSent()
}
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
if err != nil {
device.log.Errorf("%v - Failed to send data packet: %v", peer, err)
continue
}
peer.keepKeyFreshSending()
2017-06-26 13:14:02 +02:00
}
}