wireguard-go/device/send.go

547 lines
15 KiB
Go
Raw Normal View History

2019-01-01 19:55:51 -05:00
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/
2019-03-02 22:04:41 -05:00
package device
2017-06-26 07:14:02 -04:00
import (
"bytes"
"encoding/binary"
"errors"
2017-06-26 07:14:02 -04:00
"net"
"os"
2017-06-26 07:14:02 -04:00
"sync"
"time"
2019-05-14 03:09:52 -04:00
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
2024-01-07 14:03:11 -05:00
"gitea.hbanafa.com/hesham/wireguard-go/conn"
"gitea.hbanafa.com/hesham/wireguard-go/tun"
2017-06-26 07:14:02 -04:00
)
/* Outbound flow
 *
 * 1. TUN queue
 * 2. Routing (sequential)
 * 3. Nonce assignment (sequential)
 * 4. Encryption (parallel)
 * 5. Transmission (sequential)
 *
 * The functions in this file occur (roughly) in the order in
 * which the packets are processed.
 *
 * Locking, Producers and Consumers
 *
 * The order of packets (per peer) must be maintained,
 * but encryption of packets happens out-of-order:
 *
 * The sequential consumers will attempt to take the lock,
 * workers release the lock when they have completed work (encryption) on the packet.
 *
 * If the element is inserted into the "encryption queue",
 * the content is preceded by enough "junk" to contain the transport header
 * (to allow the construction of transport messages in-place)
 */
type QueueOutboundElement struct {
buffer *[MaxMessageSize]byte // slice holding the packet data
packet []byte // slice of "buffer" (always!)
nonce uint64 // nonce for encryption
2018-05-13 13:50:58 -04:00
keypair *Keypair // keypair for encryption
peer *Peer // related peer
2017-06-26 07:14:02 -04:00
}
// QueueOutboundElementsContainer is a batch of outbound elements that is
// passed through the staged, encryption and outbound queues as one unit.
//
// The embedded mutex orders encryption before transmission:
// SendStagedPackets locks the container before queueing it,
// RoutineEncryption unlocks it once every element has been sealed, and
// RoutineSequentialSender blocks on Lock until that has happened.
type QueueOutboundElementsContainer struct {
	sync.Mutex
	elems []*QueueOutboundElement
}
func (device *Device) NewOutboundElement() *QueueOutboundElement {
2018-09-22 00:29:02 -04:00
elem := device.GetOutboundElement()
elem.buffer = device.GetMessageBuffer()
elem.nonce = 0
// keypair and peer were cleared (if necessary) by clearPointers.
return elem
}
// clearPointers clears elem fields that contain pointers.
// This makes the garbage collector's life easier and
// avoids accidentally keeping other objects around unnecessarily.
// It also reduces the possible collateral damage from use-after-free bugs.
func (elem *QueueOutboundElement) clearPointers() {
elem.buffer = nil
elem.packet = nil
2018-09-22 00:29:02 -04:00
elem.keypair = nil
elem.peer = nil
}
// SendKeepalive stages a keepalive for the peer when nothing else is staged
// and the peer is running, then always kicks SendStagedPackets.
//
// The keepalive is a fresh outbound element whose packet is left unset here
// (presumably encrypted as an empty payload; see RoutineEncryption).
func (peer *Peer) SendKeepalive() {
	if len(peer.queue.staged) == 0 && peer.isRunning.Load() {
		dev := peer.device
		keepalive := dev.NewOutboundElement()
		batch := dev.GetOutboundElementsContainer()
		batch.elems = append(batch.elems, keepalive)

		select {
		case peer.queue.staged <- batch:
			dev.log.Verbosef("%v - Sending keepalive packet", peer)
		default:
			// The staged queue cannot take the batch right now:
			// recycle everything that was just acquired.
			dev.PutMessageBuffer(keepalive.buffer)
			dev.PutOutboundElement(keepalive)
			dev.PutOutboundElementsContainer(batch)
		}
	}

	peer.SendStagedPackets()
}
func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
if !isRetry {
peer.timers.handshakeAttempts.Store(0)
}
2018-05-13 17:14:43 -04:00
peer.handshake.mutex.RLock()
if time.Since(peer.handshake.lastSentHandshake) < RekeyTimeout {
2018-05-13 17:14:43 -04:00
peer.handshake.mutex.RUnlock()
return nil
}
peer.handshake.mutex.RUnlock()
peer.handshake.mutex.Lock()
if time.Since(peer.handshake.lastSentHandshake) < RekeyTimeout {
2018-05-13 17:14:43 -04:00
peer.handshake.mutex.Unlock()
return nil
}
2018-05-13 17:14:43 -04:00
peer.handshake.lastSentHandshake = time.Now()
peer.handshake.mutex.Unlock()
peer.device.log.Verbosef("%v - Sending handshake initiation", peer)
msg, err := peer.device.CreateMessageInitiation(peer)
if err != nil {
peer.device.log.Errorf("%v - Failed to create initiation message: %v", peer, err)
return err
}
var buf [MessageInitiationSize]byte
writer := bytes.NewBuffer(buf[:0])
binary.Write(writer, binary.LittleEndian, msg)
packet := writer.Bytes()
2018-05-13 17:14:43 -04:00
peer.cookieGenerator.AddMacs(packet)
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
2018-05-13 17:14:43 -04:00
err = peer.SendBuffers([][]byte{packet})
2018-05-13 17:14:43 -04:00
if err != nil {
peer.device.log.Errorf("%v - Failed to send handshake initiation: %v", peer, err)
2018-05-13 17:14:43 -04:00
}
peer.timersHandshakeInitiated()
2018-05-13 17:14:43 -04:00
return err
}
func (peer *Peer) SendHandshakeResponse() error {
peer.handshake.mutex.Lock()
peer.handshake.lastSentHandshake = time.Now()
peer.handshake.mutex.Unlock()
peer.device.log.Verbosef("%v - Sending handshake response", peer)
2018-05-13 17:14:43 -04:00
response, err := peer.device.CreateMessageResponse(peer)
if err != nil {
peer.device.log.Errorf("%v - Failed to create response message: %v", peer, err)
2018-05-13 17:14:43 -04:00
return err
}
var buf [MessageResponseSize]byte
writer := bytes.NewBuffer(buf[:0])
2018-05-13 17:14:43 -04:00
binary.Write(writer, binary.LittleEndian, response)
packet := writer.Bytes()
peer.cookieGenerator.AddMacs(packet)
err = peer.BeginSymmetricSession()
if err != nil {
peer.device.log.Errorf("%v - Failed to derive keypair: %v", peer, err)
2018-05-13 17:14:43 -04:00
return err
}
peer.timersSessionDerived()
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
2018-05-13 17:14:43 -04:00
// TODO: allocation could be avoided
err = peer.SendBuffers([][]byte{packet})
2018-05-13 17:14:43 -04:00
if err != nil {
peer.device.log.Errorf("%v - Failed to send handshake response: %v", peer, err)
2018-05-13 17:14:43 -04:00
}
return err
}
func (device *Device) SendHandshakeCookie(initiatingElem *QueueHandshakeElement) error {
device.log.Verbosef("Sending cookie response for denied handshake message for %v", initiatingElem.endpoint.DstToString())
2018-05-13 17:14:43 -04:00
sender := binary.LittleEndian.Uint32(initiatingElem.packet[4:8])
reply, err := device.cookieChecker.CreateReply(initiatingElem.packet, sender, initiatingElem.endpoint.DstToBytes())
if err != nil {
device.log.Errorf("Failed to create cookie reply: %v", err)
2018-05-13 17:14:43 -04:00
return err
}
var buf [MessageCookieReplySize]byte
writer := bytes.NewBuffer(buf[:0])
2018-05-13 17:14:43 -04:00
binary.Write(writer, binary.LittleEndian, reply)
// TODO: allocation could be avoided
device.net.bind.Send([][]byte{writer.Bytes()}, initiatingElem.endpoint)
return nil
}
func (peer *Peer) keepKeyFreshSending() {
2018-05-13 17:14:43 -04:00
keypair := peer.keypairs.Current()
if keypair == nil {
return
}
nonce := keypair.sendNonce.Load()
if nonce > RekeyAfterMessages || (keypair.isInitiator && time.Since(keypair.created) > RekeyAfterTime) {
peer.SendHandshakeInitiation(false)
}
}
2017-08-04 10:15:53 -04:00
func (device *Device) RoutineReadFromTUN() {
defer func() {
device.log.Verbosef("Routine: TUN reader - stopped")
device.state.stopping.Done()
device.queue.encryption.wg.Done()
}()
device.log.Verbosef("Routine: TUN reader - started")
var (
batchSize = device.BatchSize()
readErr error
elems = make([]*QueueOutboundElement, batchSize)
bufs = make([][]byte, batchSize)
elemsByPeer = make(map[*Peer]*QueueOutboundElementsContainer, batchSize)
count = 0
sizes = make([]int, batchSize)
offset = MessageTransportHeaderSize
)
for i := range elems {
elems[i] = device.NewOutboundElement()
bufs[i] = elems[i].buffer[:]
}
2018-09-22 00:29:02 -04:00
defer func() {
for _, elem := range elems {
if elem != nil {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
}
2018-09-22 00:29:02 -04:00
}
}()
for {
// read packets
count, readErr = device.tun.device.Read(bufs, sizes, offset)
for i := 0; i < count; i++ {
if sizes[i] < 1 {
continue
}
elem := elems[i]
elem.packet = bufs[i][offset : offset+sizes[i]]
2017-06-26 07:14:02 -04:00
// lookup peer
var peer *Peer
switch elem.packet[0] >> 4 {
case 4:
if len(elem.packet) < ipv4.HeaderLen {
continue
}
dst := elem.packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len]
peer = device.allowedips.Lookup(dst)
2017-08-04 10:15:53 -04:00
case 6:
if len(elem.packet) < ipv6.HeaderLen {
continue
}
dst := elem.packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len]
peer = device.allowedips.Lookup(dst)
2017-06-26 07:14:02 -04:00
default:
device.log.Verbosef("Received packet with unknown IP version")
2017-08-04 10:15:53 -04:00
}
2017-06-26 07:14:02 -04:00
if peer == nil {
2017-08-04 10:15:53 -04:00
continue
}
elemsForPeer, ok := elemsByPeer[peer]
if !ok {
elemsForPeer = device.GetOutboundElementsContainer()
elemsByPeer[peer] = elemsForPeer
}
elemsForPeer.elems = append(elemsForPeer.elems, elem)
elems[i] = device.NewOutboundElement()
bufs[i] = elems[i].buffer[:]
}
for peer, elemsForPeer := range elemsByPeer {
if peer.isRunning.Load() {
peer.StagePackets(elemsForPeer)
peer.SendStagedPackets()
} else {
for _, elem := range elemsForPeer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
}
device.PutOutboundElementsContainer(elemsForPeer)
}
delete(elemsByPeer, peer)
}
if readErr != nil {
if errors.Is(readErr, tun.ErrTooManySegments) {
// TODO: record stat for this
// This will happen if MSS is surprisingly small (< 576)
// coincident with reasonably high throughput.
device.log.Verbosef("Dropped some packets from multi-segment read: %v", readErr)
continue
}
if !device.isClosed() {
if !errors.Is(readErr, os.ErrClosed) {
device.log.Errorf("Failed to read packet from TUN device: %v", readErr)
}
go device.Close()
}
return
}
2017-06-26 07:14:02 -04:00
}
}
// StagePackets places elems on the peer's staged queue without blocking.
// Whenever the queue cannot accept the batch, one already-staged batch is
// taken off the queue and recycled, and the send is retried until it succeeds.
func (peer *Peer) StagePackets(elems *QueueOutboundElementsContainer) {
	dev := peer.device
	for {
		// Non-blocking attempt to stage the new batch.
		select {
		case peer.queue.staged <- elems:
			return
		default:
		}

		// No room: evict one batch from the head of the queue, if any.
		select {
		case evicted := <-peer.queue.staged:
			for _, stale := range evicted.elems {
				dev.PutMessageBuffer(stale.buffer)
				dev.PutOutboundElement(stale)
			}
			dev.PutOutboundElementsContainer(evicted)
		default:
		}
	}
}
func (peer *Peer) SendStagedPackets() {
top:
if len(peer.queue.staged) == 0 || !peer.device.isUp() {
return
}
2018-09-23 19:52:02 -04:00
keypair := peer.keypairs.Current()
if keypair == nil || keypair.sendNonce.Load() >= RejectAfterMessages || time.Since(keypair.created) >= RejectAfterTime {
peer.SendHandshakeInitiation(false)
return
}
for {
var elemsContainerOOO *QueueOutboundElementsContainer
select {
case elemsContainer := <-peer.queue.staged:
i := 0
for _, elem := range elemsContainer.elems {
elem.peer = peer
elem.nonce = keypair.sendNonce.Add(1) - 1
if elem.nonce >= RejectAfterMessages {
keypair.sendNonce.Store(RejectAfterMessages)
if elemsContainerOOO == nil {
elemsContainerOOO = peer.device.GetOutboundElementsContainer()
}
elemsContainerOOO.elems = append(elemsContainerOOO.elems, elem)
continue
} else {
elemsContainer.elems[i] = elem
i++
}
elem.keypair = keypair
}
elemsContainer.Lock()
elemsContainer.elems = elemsContainer.elems[:i]
if elemsContainerOOO != nil {
peer.StagePackets(elemsContainerOOO) // XXX: Out of order, but we can't front-load go chans
}
if len(elemsContainer.elems) == 0 {
peer.device.PutOutboundElementsContainer(elemsContainer)
goto top
}
// add to parallel and sequential queue
if peer.isRunning.Load() {
peer.queue.outbound.c <- elemsContainer
peer.device.queue.encryption.c <- elemsContainer
} else {
for _, elem := range elemsContainer.elems {
peer.device.PutMessageBuffer(elem.buffer)
peer.device.PutOutboundElement(elem)
}
peer.device.PutOutboundElementsContainer(elemsContainer)
}
if elemsContainerOOO != nil {
goto top
}
default:
return
}
}
}
func (peer *Peer) FlushStagedPackets() {
for {
select {
case elemsContainer := <-peer.queue.staged:
for _, elem := range elemsContainer.elems {
peer.device.PutMessageBuffer(elem.buffer)
peer.device.PutOutboundElement(elem)
}
peer.device.PutOutboundElementsContainer(elemsContainer)
default:
return
2017-06-26 07:14:02 -04:00
}
}
2017-06-26 07:14:02 -04:00
}
// calculatePaddingSize returns how many padding bytes must be appended to a
// packet of packetSize bytes so that its length becomes a multiple of
// PaddingMultiple.
//
// With a non-zero mtu only the trailing partial chunk (packetSize modulo mtu,
// for packets larger than mtu) is considered, and the padded length of that
// chunk is capped at mtu. With mtu == 0 no cap is applied.
func calculatePaddingSize(packetSize, mtu int) int {
	const mask = PaddingMultiple - 1

	if mtu == 0 {
		return ((packetSize + mask) &^ mask) - packetSize
	}

	tail := packetSize
	if tail > mtu {
		tail %= mtu
	}

	padded := (tail + mask) &^ mask
	if padded > mtu {
		padded = mtu
	}
	return padded - tail
}
/* Encrypts the elements in the queue
* and marks them for sequential consumption (by releasing the mutex)
*
* Obs. One instance per core
*/
func (device *Device) RoutineEncryption(id int) {
var paddingZeros [PaddingMultiple]byte
var nonce [chacha20poly1305.NonceSize]byte
2017-07-17 10:16:18 -04:00
defer device.log.Verbosef("Routine: encryption worker %d - stopped", id)
device.log.Verbosef("Routine: encryption worker %d - started", id)
2017-07-17 10:16:18 -04:00
for elemsContainer := range device.queue.encryption.c {
for _, elem := range elemsContainer.elems {
device: distribute crypto work as slice of elements After reducing UDP stack traversal overhead via GSO and GRO, runtime.chanrecv() began to account for a high percentage (20% in one environment) of perf samples during a throughput benchmark. The individual packet channel ops with the crypto goroutines was the primary contributor to this overhead. Updating these channels to pass vectors, which the device package already handles at its ends, reduced this overhead substantially, and improved throughput. The iperf3 results below demonstrate the effect of this commit between two Linux computers with i5-12400 CPUs. There is roughly ~13us of round trip latency between them. The first result is with UDP GSO and GRO, and with single element channels. Starting Test: protocol: TCP, 1 streams, 131072 byte blocks [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-10.00 sec 12.3 GBytes 10.6 Gbits/sec 232 3.15 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - Test Complete. Summary Results: [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 12.3 GBytes 10.6 Gbits/sec 232 sender [ 5] 0.00-10.04 sec 12.3 GBytes 10.6 Gbits/sec receiver The second result is with channels updated to pass a slice of elements. Starting Test: protocol: TCP, 1 streams, 131072 byte blocks [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-10.00 sec 13.2 GBytes 11.3 Gbits/sec 182 3.15 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - Test Complete. Summary Results: [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 13.2 GBytes 11.3 Gbits/sec 182 sender [ 5] 0.00-10.04 sec 13.2 GBytes 11.3 Gbits/sec receiver Reviewed-by: Adrian Dewhurst <adrian@tailscale.com> Signed-off-by: Jordan Whited <jordan@tailscale.com> Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
2023-10-02 17:41:04 -04:00
// populate header fields
header := elem.buffer[:MessageTransportHeaderSize]
fieldType := header[0:4]
fieldReceiver := header[4:8]
fieldNonce := header[8:16]
binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex)
binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
// pad content to multiple of 16
paddingSize := calculatePaddingSize(len(elem.packet), int(device.tun.mtu.Load()))
elem.packet = append(elem.packet, paddingZeros[:paddingSize]...)
// encrypt content and release to consumer
binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
elem.packet = elem.keypair.send.Seal(
header,
nonce[:],
elem.packet,
nil,
)
}
elemsContainer.Unlock()
}
}
func (peer *Peer) RoutineSequentialSender(maxBatchSize int) {
device := peer.device
defer func() {
defer device.log.Verbosef("%v - Routine: sequential sender - stopped", peer)
peer.stopping.Done()
}()
device.log.Verbosef("%v - Routine: sequential sender - started", peer)
bufs := make([][]byte, 0, maxBatchSize)
for elemsContainer := range peer.queue.outbound.c {
bufs = bufs[:0]
if elemsContainer == nil {
device: remove mutex from Peer send/receive The immediate motivation for this change is an observed deadlock. 1. A goroutine calls peer.Stop. That calls peer.queue.Lock(). 2. Another goroutine is in RoutineSequentialReceiver. It receives an elem from peer.queue.inbound. 3. The peer.Stop goroutine calls close(peer.queue.inbound), close(peer.queue.outbound), and peer.stopping.Wait(). It blocks waiting for RoutineSequentialReceiver and RoutineSequentialSender to exit. 4. The RoutineSequentialReceiver goroutine calls peer.SendStagedPackets(). SendStagedPackets attempts peer.queue.RLock(). That blocks forever because the peer.Stop goroutine holds a write lock on that mutex. A background motivation for this change is that it can be expensive to have a mutex in the hot code path of RoutineSequential*. The mutex was necessary to avoid attempting to send elems on a closed channel. This commit removes that danger by never closing the channel. Instead, we send a sentinel nil value on the channel to indicate to the receiver that it should exit. The only problem with this is that if the receiver exits, we could write an elem into the channel which would never get received. If it never gets received, it cannot get returned to the device pools. To work around this, we use a finalizer. When the channel can be GC'd, the finalizer drains any remaining elements from the channel and restores them to the device pool. After that change, peer.queue.RWMutex no longer makes sense where it is. It is only used to prevent concurrent calls to Start and Stop. Move it to a more sensible location and make it a plain sync.Mutex. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2021-02-08 16:02:52 -05:00
return
}
if !peer.isRunning.Load() {
// peer has been stopped; return re-usable elems to the shared pool.
// This is an optimization only. It is possible for the peer to be stopped
// immediately after this check, in which case, elem will get processed.
// The timers and SendBuffers code are resilient to a few stragglers.
// TODO: rework peer shutdown order to ensure
// that we never accidentally keep timers alive longer than necessary.
elemsContainer.Lock()
for _, elem := range elemsContainer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
}
continue
}
dataSent := false
elemsContainer.Lock()
for _, elem := range elemsContainer.elems {
if len(elem.packet) != MessageKeepaliveSize {
dataSent = true
}
bufs = append(bufs, elem.packet)
}
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
err := peer.SendBuffers(bufs)
if dataSent {
peer.timersDataSent()
}
for _, elem := range elemsContainer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
}
device.PutOutboundElementsContainer(elemsContainer)
conn, device: use UDP GSO and GRO on Linux StdNetBind probes for UDP GSO and GRO support at runtime. UDP GSO is dependent on checksum offload support on the egress netdev. UDP GSO will be disabled in the event sendmmsg() returns EIO, which is a strong signal that the egress netdev does not support checksum offload. The iperf3 results below demonstrate the effect of this commit between two Linux computers with i5-12400 CPUs. There is roughly ~13us of round trip latency between them. The first result is from commit 052af4a without UDP GSO or GRO. Starting Test: protocol: TCP, 1 streams, 131072 byte blocks [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-10.00 sec 9.85 GBytes 8.46 Gbits/sec 1139 3.01 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - Test Complete. Summary Results: [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 9.85 GBytes 8.46 Gbits/sec 1139 sender [ 5] 0.00-10.04 sec 9.85 GBytes 8.42 Gbits/sec receiver The second result is with UDP GSO and GRO. Starting Test: protocol: TCP, 1 streams, 131072 byte blocks [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-10.00 sec 12.3 GBytes 10.6 Gbits/sec 232 3.15 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - Test Complete. Summary Results: [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 12.3 GBytes 10.6 Gbits/sec 232 sender [ 5] 0.00-10.04 sec 12.3 GBytes 10.6 Gbits/sec receiver Reviewed-by: Adrian Dewhurst <adrian@tailscale.com> Signed-off-by: Jordan Whited <jordan@tailscale.com> Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
2023-10-02 16:53:07 -04:00
if err != nil {
var errGSO conn.ErrUDPGSODisabled
if errors.As(err, &errGSO) {
device.log.Verbosef(err.Error())
err = errGSO.RetryErr
}
}
if err != nil {
device.log.Errorf("%v - Failed to send data packets: %v", peer, err)
continue
}
peer.keepKeyFreshSending()
2017-06-26 07:14:02 -04:00
}
}