wireguard-go/device/receive.go

541 lines
13 KiB
Go
Raw Normal View History

2019-01-02 01:55:51 +01:00
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/
2019-03-03 04:04:41 +01:00
package device
2017-07-01 23:29:22 +02:00
import (
"bytes"
"encoding/binary"
"errors"
2017-07-01 23:29:22 +02:00
"net"
"sync"
"time"
2019-05-14 09:09:52 +02:00
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
2024-01-07 20:03:11 +01:00
"gitea.hbanafa.com/hesham/wireguard-go/conn"
2017-07-01 23:29:22 +02:00
)
type QueueHandshakeElement struct {
msgType uint32
packet []byte
endpoint conn.Endpoint
buffer *[MaxMessageSize]byte
2017-07-01 23:29:22 +02:00
}
type QueueInboundElement struct {
2017-11-14 16:27:53 +01:00
buffer *[MaxMessageSize]byte
packet []byte
counter uint64
2018-05-13 18:23:40 +02:00
keypair *Keypair
endpoint conn.Endpoint
2017-07-01 23:29:22 +02:00
}
// QueueInboundElementsContainer is a batch of inbound elements that all
// belong to the same peer and are queued together.
//
// The embedded mutex hands the batch over between stages in this file:
// RoutineReceiveIncoming locks it when the container is first populated,
// RoutineDecryption unlocks it once every element has been processed, and
// RoutineSequentialReceiver acquires it before reading the elements, so the
// receiver waits until decryption of the batch has finished.
type QueueInboundElementsContainer struct {
	sync.Mutex
	elems []*QueueInboundElement
}
// clearPointers resets every pointer-carrying field of elem to nil.
// Dropping these references lets the garbage collector reclaim the objects
// they point to, and limits the damage a use-after-free bug can cause.
// The counter field holds no pointer and is left as is.
func (elem *QueueInboundElement) clearPointers() {
	elem.buffer, elem.packet, elem.keypair, elem.endpoint = nil, nil, nil, nil
}
/* Called when a new authenticated message has been received
*
* NOTE: Not thread safe, but called by sequential receiver!
*/
func (peer *Peer) keepKeyFreshReceiving() {
if peer.timers.sentLastMinuteHandshake.Load() {
return
}
2018-05-13 23:14:43 +02:00
keypair := peer.keypairs.Current()
if keypair != nil && keypair.isInitiator && time.Since(keypair.created) > (RejectAfterTime-KeepaliveTimeout-RekeyTimeout) {
peer.timers.sentLastMinuteHandshake.Store(true)
peer.SendHandshakeInitiation(false)
}
}
2017-12-01 23:37:26 +01:00
/* Receives incoming datagrams for the device
*
* Every time the bind is updated a new routine is started for
* IPv4 and IPv6 (separately)
*/
func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.ReceiveFunc) {
recvName := recv.PrettyName()
defer func() {
device.log.Verbosef("Routine: receive incoming %s - stopped", recvName)
device.queue.decryption.wg.Done()
device.queue.handshake.wg.Done()
device.net.stopping.Done()
}()
device.log.Verbosef("Routine: receive incoming %s - started", recvName)
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
// receive datagrams until conn is closed
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
var (
bufsArrs = make([]*[MaxMessageSize]byte, maxBatchSize)
bufs = make([][]byte, maxBatchSize)
err error
sizes = make([]int, maxBatchSize)
count int
endpoints = make([]conn.Endpoint, maxBatchSize)
deathSpiral int
elemsByPeer = make(map[*Peer]*QueueInboundElementsContainer, maxBatchSize)
2017-12-01 00:03:06 +01:00
)
for i := range bufsArrs {
bufsArrs[i] = device.GetMessageBuffer()
bufs[i] = bufsArrs[i][:]
}
defer func() {
for i := 0; i < maxBatchSize; i++ {
if bufsArrs[i] != nil {
device.PutMessageBuffer(bufsArrs[i])
}
}
}()
2017-08-04 16:15:53 +02:00
for {
count, err = recv(bufs, sizes, endpoints)
2017-12-01 00:03:06 +01:00
if err != nil {
if errors.Is(err, net.ErrClosed) {
return
}
device.log.Verbosef("Failed to receive %s packet: %v", recvName, err)
if neterr, ok := err.(net.Error); ok && !neterr.Temporary() {
return
}
if deathSpiral < 10 {
deathSpiral++
time.Sleep(time.Second / 3)
continue
}
2017-12-01 00:03:06 +01:00
return
}
deathSpiral = 0
2017-07-01 23:29:22 +02:00
// handle each packet in the batch
for i, size := range sizes[:count] {
if size < MinMessageSize {
continue
}
2017-07-01 23:29:22 +02:00
// check size of packet
2017-07-01 23:29:22 +02:00
packet := bufsArrs[i][:size]
msgType := binary.LittleEndian.Uint32(packet[:4])
2017-08-04 16:15:53 +02:00
switch msgType {
2017-07-01 23:29:22 +02:00
// check if transport
2017-07-01 23:29:22 +02:00
case MessageTransportType:
2017-07-01 23:29:22 +02:00
// check size
2017-07-01 23:29:22 +02:00
if len(packet) < MessageTransportSize {
continue
}
2017-07-01 23:29:22 +02:00
// lookup key pair
2017-07-01 23:29:22 +02:00
receiver := binary.LittleEndian.Uint32(
packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter],
)
value := device.indexTable.Lookup(receiver)
keypair := value.keypair
if keypair == nil {
continue
}
// check keypair expiry
2017-07-01 23:29:22 +02:00
if keypair.created.Add(RejectAfterTime).Before(time.Now()) {
continue
}
2017-07-01 23:29:22 +02:00
// create work element
peer := value.peer
elem := device.GetInboundElement()
elem.packet = packet
elem.buffer = bufsArrs[i]
elem.keypair = keypair
elem.endpoint = endpoints[i]
elem.counter = 0
elemsForPeer, ok := elemsByPeer[peer]
if !ok {
elemsForPeer = device.GetInboundElementsContainer()
elemsForPeer.Lock()
elemsByPeer[peer] = elemsForPeer
}
elemsForPeer.elems = append(elemsForPeer.elems, elem)
bufsArrs[i] = device.GetMessageBuffer()
bufs[i] = bufsArrs[i][:]
2017-12-01 00:03:06 +01:00
continue
2017-07-01 23:29:22 +02:00
// otherwise it is a fixed size & handshake related packet
2017-07-01 23:29:22 +02:00
case MessageInitiationType:
if len(packet) != MessageInitiationSize {
continue
}
case MessageResponseType:
if len(packet) != MessageResponseSize {
continue
}
case MessageCookieReplyType:
if len(packet) != MessageCookieReplySize {
continue
}
default:
device.log.Verbosef("Received message with unknown type")
continue
}
select {
case device.queue.handshake.c <- QueueHandshakeElement{
msgType: msgType,
buffer: bufsArrs[i],
packet: packet,
endpoint: endpoints[i],
}:
bufsArrs[i] = device.GetMessageBuffer()
bufs[i] = bufsArrs[i][:]
default:
}
}
for peer, elemsContainer := range elemsByPeer {
if peer.isRunning.Load() {
peer.queue.inbound.c <- elemsContainer
device.queue.decryption.c <- elemsContainer
} else {
for _, elem := range elemsContainer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutInboundElement(elem)
}
device.PutInboundElementsContainer(elemsContainer)
}
delete(elemsByPeer, peer)
}
2017-07-01 23:29:22 +02:00
}
}
func (device *Device) RoutineDecryption(id int) {
2017-07-01 23:29:22 +02:00
var nonce [chacha20poly1305.NonceSize]byte
defer device.log.Verbosef("Routine: decryption worker %d - stopped", id)
device.log.Verbosef("Routine: decryption worker %d - started", id)
for elemsContainer := range device.queue.decryption.c {
for _, elem := range elemsContainer.elems {
device: distribute crypto work as slice of elements After reducing UDP stack traversal overhead via GSO and GRO, runtime.chanrecv() began to account for a high percentage (20% in one environment) of perf samples during a throughput benchmark. The individual packet channel ops with the crypto goroutines was the primary contributor to this overhead. Updating these channels to pass vectors, which the device package already handles at its ends, reduced this overhead substantially, and improved throughput. The iperf3 results below demonstrate the effect of this commit between two Linux computers with i5-12400 CPUs. There is roughly ~13us of round trip latency between them. The first result is with UDP GSO and GRO, and with single element channels. Starting Test: protocol: TCP, 1 streams, 131072 byte blocks [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-10.00 sec 12.3 GBytes 10.6 Gbits/sec 232 3.15 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - Test Complete. Summary Results: [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 12.3 GBytes 10.6 Gbits/sec 232 sender [ 5] 0.00-10.04 sec 12.3 GBytes 10.6 Gbits/sec receiver The second result is with channels updated to pass a slice of elements. Starting Test: protocol: TCP, 1 streams, 131072 byte blocks [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-10.00 sec 13.2 GBytes 11.3 Gbits/sec 182 3.15 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - Test Complete. Summary Results: [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 13.2 GBytes 11.3 Gbits/sec 182 sender [ 5] 0.00-10.04 sec 13.2 GBytes 11.3 Gbits/sec receiver Reviewed-by: Adrian Dewhurst <adrian@tailscale.com> Signed-off-by: Jordan Whited <jordan@tailscale.com> Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
2023-10-02 23:41:04 +02:00
// split message into fields
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
content := elem.packet[MessageTransportOffsetContent:]
// decrypt and release to consumer
var err error
elem.counter = binary.LittleEndian.Uint64(counter)
// copy counter to nonce
binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter)
elem.packet, err = elem.keypair.receive.Open(
content[:0],
nonce[:],
content,
nil,
)
if err != nil {
elem.packet = nil
}
2017-07-01 23:29:22 +02:00
}
elemsContainer.Unlock()
2017-07-01 23:29:22 +02:00
}
}
2017-12-01 23:37:26 +01:00
/* Handles incoming packets related to handshake
2017-07-01 23:29:22 +02:00
*/
func (device *Device) RoutineHandshake(id int) {
defer func() {
device.log.Verbosef("Routine: handshake worker %d - stopped", id)
device.queue.encryption.wg.Done()
}()
device.log.Verbosef("Routine: handshake worker %d - started", id)
2017-07-01 23:29:22 +02:00
for elem := range device.queue.handshake.c {
// handle cookie fields and ratelimiting
2017-07-01 23:29:22 +02:00
switch elem.msgType {
2017-07-08 09:23:10 +02:00
case MessageCookieReplyType:
2017-08-14 17:09:25 +02:00
// unmarshal packet
var reply MessageCookieReply
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &reply)
if err != nil {
device.log.Verbosef("Failed to decode cookie reply")
goto skip
2017-07-08 09:23:10 +02:00
}
2017-08-14 17:09:25 +02:00
// lookup peer from index
2017-08-14 17:09:25 +02:00
2018-05-13 18:23:40 +02:00
entry := device.indexTable.Lookup(reply.Receiver)
2017-08-14 17:09:25 +02:00
if entry.peer == nil {
goto skip
2017-08-14 17:09:25 +02:00
}
// consume reply
if peer := entry.peer; peer.isRunning.Load() {
device.log.Verbosef("Receiving cookie response from %s", elem.endpoint.DstToString())
2018-12-19 00:35:53 +01:00
if !peer.cookieGenerator.ConsumeReply(&reply) {
device.log.Verbosef("Could not decrypt invalid cookie response")
2018-12-19 00:35:53 +01:00
}
}
goto skip
2017-07-08 09:23:10 +02:00
case MessageInitiationType, MessageResponseType:
2017-07-08 09:23:10 +02:00
2018-05-13 23:14:43 +02:00
// check mac fields and maybe ratelimit
2017-07-08 09:23:10 +02:00
2018-05-13 23:14:43 +02:00
if !device.cookieChecker.CheckMAC1(elem.packet) {
device.log.Verbosef("Received packet with invalid mac1")
goto skip
2017-07-08 09:23:10 +02:00
}
// endpoints destination address is the source of the datagram
if device.IsUnderLoad() {
2017-10-08 22:03:32 +02:00
// verify MAC2 field
2018-05-13 23:14:43 +02:00
if !device.cookieChecker.CheckMAC2(elem.packet, elem.endpoint.DstToBytes()) {
device.SendHandshakeCookie(&elem)
goto skip
}
2017-10-08 22:03:32 +02:00
// check ratelimiter
if !device.rate.limiter.Allow(elem.endpoint.DstIP()) {
goto skip
}
}
default:
device.log.Errorf("Invalid packet ended up in the handshake queue")
goto skip
}
2017-07-08 09:23:10 +02:00
2017-12-01 23:37:26 +01:00
// handle handshake initiation/response content
2017-07-01 23:29:22 +02:00
switch elem.msgType {
case MessageInitiationType:
2017-07-01 23:29:22 +02:00
// unmarshal
2017-07-01 23:29:22 +02:00
var msg MessageInitiation
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
device.log.Errorf("Failed to decode initiation message")
goto skip
}
2017-07-01 23:29:22 +02:00
// consume initiation
2017-07-01 23:29:22 +02:00
peer := device.ConsumeMessageInitiation(&msg)
if peer == nil {
device.log.Verbosef("Received invalid initiation message from %s", elem.endpoint.DstToString())
goto skip
}
2017-08-04 16:15:53 +02:00
// update timers
2017-08-04 16:15:53 +02:00
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketReceived()
2017-07-07 13:47:09 +02:00
// update endpoint
2018-05-26 02:59:26 +02:00
peer.SetEndpointFromPacket(elem.endpoint)
device.log.Verbosef("%v - Received handshake initiation", peer)
peer.rxBytes.Add(uint64(len(elem.packet)))
2018-04-20 07:13:40 +02:00
2018-05-13 23:14:43 +02:00
peer.SendHandshakeResponse()
case MessageResponseType:
2017-07-01 23:29:22 +02:00
// unmarshal
2017-07-01 23:29:22 +02:00
var msg MessageResponse
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
device.log.Errorf("Failed to decode response message")
goto skip
}
2017-07-01 23:29:22 +02:00
// consume response
2017-07-01 23:29:22 +02:00
peer := device.ConsumeMessageResponse(&msg)
if peer == nil {
device.log.Verbosef("Received invalid response message from %s", elem.endpoint.DstToString())
goto skip
}
2017-07-27 23:45:37 +02:00
2017-11-14 16:27:53 +01:00
// update endpoint
2018-05-26 02:59:26 +02:00
peer.SetEndpointFromPacket(elem.endpoint)
2017-11-14 16:27:53 +01:00
device.log.Verbosef("%v - Received handshake response", peer)
peer.rxBytes.Add(uint64(len(elem.packet)))
// update timers
2017-08-04 16:15:53 +02:00
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketReceived()
2017-08-04 16:15:53 +02:00
2018-05-13 19:50:58 +02:00
// derive keypair
2017-07-01 23:29:22 +02:00
2018-05-13 23:14:43 +02:00
err = peer.BeginSymmetricSession()
if err != nil {
device.log.Errorf("%v - Failed to derive keypair: %v", peer, err)
goto skip
}
2018-05-13 23:14:43 +02:00
peer.timersSessionDerived()
peer.timersHandshakeComplete()
peer.SendKeepalive()
}
skip:
device.PutMessageBuffer(elem.buffer)
2017-07-01 23:29:22 +02:00
}
}
func (peer *Peer) RoutineSequentialReceiver(maxBatchSize int) {
2017-07-01 23:29:22 +02:00
device := peer.device
defer func() {
device.log.Verbosef("%v - Routine: sequential receiver - stopped", peer)
peer.stopping.Done()
2018-02-04 19:18:44 +01:00
}()
device.log.Verbosef("%v - Routine: sequential receiver - started", peer)
bufs := make([][]byte, 0, maxBatchSize)
for elemsContainer := range peer.queue.inbound.c {
if elemsContainer == nil {
device: remove mutex from Peer send/receive The immediate motivation for this change is an observed deadlock. 1. A goroutine calls peer.Stop. That calls peer.queue.Lock(). 2. Another goroutine is in RoutineSequentialReceiver. It receives an elem from peer.queue.inbound. 3. The peer.Stop goroutine calls close(peer.queue.inbound), close(peer.queue.outbound), and peer.stopping.Wait(). It blocks waiting for RoutineSequentialReceiver and RoutineSequentialSender to exit. 4. The RoutineSequentialReceiver goroutine calls peer.SendStagedPackets(). SendStagedPackets attempts peer.queue.RLock(). That blocks forever because the peer.Stop goroutine holds a write lock on that mutex. A background motivation for this change is that it can be expensive to have a mutex in the hot code path of RoutineSequential*. The mutex was necessary to avoid attempting to send elems on a closed channel. This commit removes that danger by never closing the channel. Instead, we send a sentinel nil value on the channel to indicate to the receiver that it should exit. The only problem with this is that if the receiver exits, we could write an elem into the channel which would never get received. If it never gets received, it cannot get returned to the device pools. To work around this, we use a finalizer. When the channel can be GC'd, the finalizer drains any remaining elements from the channel and restores them to the device pool. After that change, peer.queue.RWMutex no longer makes sense where it is. It is only used to prevent concurrent calls to Start and Stop. Move it to a more sensible location and make it a plain sync.Mutex. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2021-02-08 22:02:52 +01:00
return
}
elemsContainer.Lock()
validTailPacket := -1
dataPacketReceived := false
rxBytesLen := uint64(0)
for i, elem := range elemsContainer.elems {
if elem.packet == nil {
// decryption failed
continue
}
2017-07-10 12:09:19 +02:00
if !elem.keypair.replayFilter.ValidateCounter(elem.counter, RejectAfterMessages) {
continue
}
2017-07-08 23:51:26 +02:00
validTailPacket = i
if peer.ReceivedWithKeypair(elem.keypair) {
peer.SetEndpointFromPacket(elem.endpoint)
peer.timersHandshakeComplete()
peer.SendStagedPackets()
}
rxBytesLen += uint64(len(elem.packet) + MinMessageSize)
2017-07-01 23:29:22 +02:00
if len(elem.packet) == 0 {
device.log.Verbosef("%v - Receiving keepalive packet", peer)
continue
}
dataPacketReceived = true
2017-07-01 23:29:22 +02:00
switch elem.packet[0] >> 4 {
case 4:
if len(elem.packet) < ipv4.HeaderLen {
continue
}
field := elem.packet[IPv4offsetTotalLength : IPv4offsetTotalLength+2]
length := binary.BigEndian.Uint16(field)
if int(length) > len(elem.packet) || int(length) < ipv4.HeaderLen {
continue
}
elem.packet = elem.packet[:length]
src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len]
if device.allowedips.Lookup(src) != peer {
device.log.Verbosef("IPv4 packet with disallowed source address from %v", peer)
continue
}
2017-07-08 09:23:10 +02:00
case 6:
if len(elem.packet) < ipv6.HeaderLen {
continue
}
field := elem.packet[IPv6offsetPayloadLength : IPv6offsetPayloadLength+2]
length := binary.BigEndian.Uint16(field)
length += ipv6.HeaderLen
if int(length) > len(elem.packet) {
continue
}
elem.packet = elem.packet[:length]
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
if device.allowedips.Lookup(src) != peer {
device.log.Verbosef("IPv6 packet with disallowed source address from %v", peer)
continue
}
2017-07-07 13:47:09 +02:00
default:
device.log.Verbosef("Packet with invalid IP version from %v", peer)
continue
}
bufs = append(bufs, elem.buffer[:MessageTransportOffsetContent+len(elem.packet)])
2019-03-21 21:43:04 +01:00
}
peer.rxBytes.Add(rxBytesLen)
if validTailPacket >= 0 {
peer.SetEndpointFromPacket(elemsContainer.elems[validTailPacket].endpoint)
peer.keepKeyFreshReceiving()
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketReceived()
}
if dataPacketReceived {
peer.timersDataReceived()
}
if len(bufs) > 0 {
_, err := device.tun.device.Write(bufs, MessageTransportOffsetContent)
if err != nil && !device.isClosed() {
device.log.Errorf("Failed to write packets to TUN device: %v", err)
2019-07-01 15:23:24 +02:00
}
2019-03-21 21:43:04 +01:00
}
for _, elem := range elemsContainer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutInboundElement(elem)
}
bufs = bufs[:0]
device.PutInboundElementsContainer(elemsContainer)
2017-07-01 23:29:22 +02:00
}
}