wireguard-go/device/receive.go

632 lines
13 KiB
Go
Raw Normal View History

2019-01-02 01:55:51 +01:00
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2020 WireGuard LLC. All Rights Reserved.
*/
2019-03-03 04:04:41 +01:00
package device
2017-07-01 23:29:22 +02:00
import (
"bytes"
"encoding/binary"
"errors"
2017-07-01 23:29:22 +02:00
"net"
"strconv"
2017-07-01 23:29:22 +02:00
"sync"
"sync/atomic"
"time"
2019-05-14 09:09:52 +02:00
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"golang.zx2c4.com/wireguard/conn"
2017-07-01 23:29:22 +02:00
)
type QueueHandshakeElement struct {
msgType uint32
packet []byte
endpoint conn.Endpoint
buffer *[MaxMessageSize]byte
2017-07-01 23:29:22 +02:00
}
type QueueInboundElement struct {
dropped int32
sync.Mutex
2017-11-14 16:27:53 +01:00
buffer *[MaxMessageSize]byte
packet []byte
counter uint64
2018-05-13 18:23:40 +02:00
keypair *Keypair
endpoint conn.Endpoint
2017-07-01 23:29:22 +02:00
}
// clearPointers clears elem fields that contain pointers.
// This makes the garbage collector's life easier and
// avoids accidentally keeping other objects around unnecessarily.
// It also reduces the possible collateral damage from use-after-free bugs.
func (elem *QueueInboundElement) clearPointers() {
elem.buffer = nil
elem.packet = nil
elem.keypair = nil
elem.endpoint = nil
}
2017-07-01 23:29:22 +02:00
func (elem *QueueInboundElement) Drop() {
2017-07-08 23:51:26 +02:00
atomic.StoreInt32(&elem.dropped, AtomicTrue)
}
func (elem *QueueInboundElement) IsDropped() bool {
2017-07-08 23:51:26 +02:00
return atomic.LoadInt32(&elem.dropped) == AtomicTrue
}
func (device *Device) addToHandshakeQueue(queue chan QueueHandshakeElement, elem QueueHandshakeElement) bool {
select {
case queue <- elem:
return true
default:
return false
2017-07-07 13:47:09 +02:00
}
}
/* Called when a new authenticated message has been received
*
* NOTE: Not thread safe, but called by sequential receiver!
*/
func (peer *Peer) keepKeyFreshReceiving() {
2018-05-20 06:50:07 +02:00
if peer.timers.sentLastMinuteHandshake.Get() {
return
}
2018-05-13 23:14:43 +02:00
keypair := peer.keypairs.Current()
if keypair != nil && keypair.isInitiator && time.Since(keypair.created) > (RejectAfterTime-KeepaliveTimeout-RekeyTimeout) {
2018-05-20 06:50:07 +02:00
peer.timers.sentLastMinuteHandshake.Set(true)
peer.SendHandshakeInitiation(false)
}
}
2017-12-01 23:37:26 +01:00
/* Receives incoming datagrams for the device
*
* Every time the bind is updated a new routine is started for
* IPv4 and IPv6 (separately)
*/
func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) {
2017-07-01 23:29:22 +02:00
2017-07-08 09:23:10 +02:00
logDebug := device.log.Debug
defer func() {
logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - stopped")
device.queue.decryption.wg.Done()
device.net.stopping.Done()
}()
2018-11-01 19:54:25 +01:00
logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - started")
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
// receive datagrams until conn is closed
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
buffer := device.GetMessageBuffer()
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
var (
err error
size int
endpoint conn.Endpoint
deathSpiral int
2017-12-01 00:03:06 +01:00
)
2017-12-01 00:03:06 +01:00
for {
switch IP {
case ipv4.Version:
size, endpoint, err = bind.ReceiveIPv4(buffer[:])
case ipv6.Version:
size, endpoint, err = bind.ReceiveIPv6(buffer[:])
default:
panic("invalid IP version")
2017-12-01 00:03:06 +01:00
}
2017-08-04 16:15:53 +02:00
2017-12-01 00:03:06 +01:00
if err != nil {
device.PutMessageBuffer(buffer)
if errors.Is(err, net.ErrClosed) {
return
}
device.log.Error.Printf("Failed to receive packet: %v", err)
if deathSpiral < 10 {
deathSpiral++
time.Sleep(time.Second / 3)
continue
}
2017-12-01 00:03:06 +01:00
return
}
deathSpiral = 0
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
if size < MinMessageSize {
continue
}
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
// check size of packet
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
packet := buffer[:size]
msgType := binary.LittleEndian.Uint32(packet[:4])
2017-08-04 16:15:53 +02:00
2017-12-01 00:03:06 +01:00
var okay bool
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
switch msgType {
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
// check if transport
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
case MessageTransportType:
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
// check size
2017-07-01 23:29:22 +02:00
if len(packet) < MessageTransportSize {
2017-12-01 00:03:06 +01:00
continue
}
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
// lookup key pair
2017-12-01 00:03:06 +01:00
receiver := binary.LittleEndian.Uint32(
packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter],
)
2018-05-13 18:23:40 +02:00
value := device.indexTable.Lookup(receiver)
keypair := value.keypair
if keypair == nil {
2017-12-01 00:03:06 +01:00
continue
}
2017-07-01 23:29:22 +02:00
2018-05-13 19:50:58 +02:00
// check keypair expiry
2017-07-01 23:29:22 +02:00
2018-05-13 18:23:40 +02:00
if keypair.created.Add(RejectAfterTime).Before(time.Now()) {
2017-12-01 00:03:06 +01:00
continue
}
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
// create work element
peer := value.peer
2018-09-22 06:29:02 +02:00
elem := device.GetInboundElement()
elem.packet = packet
elem.buffer = buffer
elem.keypair = keypair
elem.dropped = AtomicFalse
elem.endpoint = endpoint
elem.counter = 0
elem.Mutex = sync.Mutex{}
elem.Lock()
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
// add to decryption queues
peer.queue.RLock()
if peer.isRunning.Get() {
peer.queue.inbound <- elem
device.queue.decryption.c <- elem
buffer = device.GetMessageBuffer()
} else {
device.PutInboundElement(elem)
}
peer.queue.RUnlock()
2017-12-01 23:37:26 +01:00
2017-12-01 00:03:06 +01:00
continue
2017-07-01 23:29:22 +02:00
2017-12-01 00:03:06 +01:00
// otherwise it is a fixed size & handshake related packet
2017-12-01 00:03:06 +01:00
case MessageInitiationType:
okay = len(packet) == MessageInitiationSize
2017-12-01 00:03:06 +01:00
case MessageResponseType:
okay = len(packet) == MessageResponseSize
2017-12-01 00:03:06 +01:00
case MessageCookieReplyType:
okay = len(packet) == MessageCookieReplySize
default:
logDebug.Println("Received message with unknown type")
2017-12-01 00:03:06 +01:00
}
2017-12-01 00:03:06 +01:00
if okay {
if (device.addToHandshakeQueue(
2017-12-01 00:03:06 +01:00
device.queue.handshake,
QueueHandshakeElement{
msgType: msgType,
buffer: buffer,
packet: packet,
endpoint: endpoint,
},
)) {
buffer = device.GetMessageBuffer()
}
}
2017-07-01 23:29:22 +02:00
}
}
func (device *Device) RoutineDecryption() {
2017-07-01 23:29:22 +02:00
var nonce [chacha20poly1305.NonceSize]byte
logDebug := device.log.Debug
defer func() {
logDebug.Println("Routine: decryption worker - stopped")
device.state.stopping.Done()
}()
logDebug.Println("Routine: decryption worker - started")
for elem := range device.queue.decryption.c {
// check if dropped
if elem.IsDropped() {
continue
}
// split message into fields
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
content := elem.packet[MessageTransportOffsetContent:]
// decrypt and release to consumer
var err error
elem.counter = binary.LittleEndian.Uint64(counter)
// copy counter to nonce
binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter)
elem.packet, err = elem.keypair.receive.Open(
content[:0],
nonce[:],
content,
nil,
)
if err != nil {
elem.Drop()
device.PutMessageBuffer(elem.buffer)
2017-07-01 23:29:22 +02:00
}
elem.Unlock()
2017-07-01 23:29:22 +02:00
}
}
2017-12-01 23:37:26 +01:00
/* Handles incoming packets related to handshake
2017-07-01 23:29:22 +02:00
*/
func (device *Device) RoutineHandshake() {
logInfo := device.log.Info
logError := device.log.Error
logDebug := device.log.Debug
2018-09-17 00:43:23 +02:00
var elem QueueHandshakeElement
var ok bool
defer func() {
logDebug.Println("Routine: handshake worker - stopped")
device.state.stopping.Done()
2018-09-17 00:43:23 +02:00
if elem.buffer != nil {
device.PutMessageBuffer(elem.buffer)
}
}()
logDebug.Println("Routine: handshake worker - started")
2017-07-01 23:29:22 +02:00
for {
2018-09-17 00:43:23 +02:00
if elem.buffer != nil {
device.PutMessageBuffer(elem.buffer)
2018-09-24 01:52:02 +02:00
elem.buffer = nil
2018-09-17 00:43:23 +02:00
}
2017-07-01 23:29:22 +02:00
select {
case elem, ok = <-device.queue.handshake:
case <-device.signals.stop:
2017-07-01 23:29:22 +02:00
return
}
if !ok {
return
}
// handle cookie fields and ratelimiting
2017-07-01 23:29:22 +02:00
switch elem.msgType {
2017-07-08 09:23:10 +02:00
case MessageCookieReplyType:
2017-08-14 17:09:25 +02:00
// unmarshal packet
var reply MessageCookieReply
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &reply)
if err != nil {
logDebug.Println("Failed to decode cookie reply")
2017-07-08 09:23:10 +02:00
return
}
2017-08-14 17:09:25 +02:00
// lookup peer from index
2017-08-14 17:09:25 +02:00
2018-05-13 18:23:40 +02:00
entry := device.indexTable.Lookup(reply.Receiver)
2017-08-14 17:09:25 +02:00
if entry.peer == nil {
2018-01-16 14:57:12 +01:00
continue
2017-08-14 17:09:25 +02:00
}
// consume reply
if peer := entry.peer; peer.isRunning.Get() {
2018-12-19 00:35:53 +01:00
logDebug.Println("Receiving cookie response from ", elem.endpoint.DstToString())
if !peer.cookieGenerator.ConsumeReply(&reply) {
logDebug.Println("Could not decrypt invalid cookie response")
}
}
continue
2017-07-08 09:23:10 +02:00
case MessageInitiationType, MessageResponseType:
2017-07-08 09:23:10 +02:00
2018-05-13 23:14:43 +02:00
// check mac fields and maybe ratelimit
2017-07-08 09:23:10 +02:00
2018-05-13 23:14:43 +02:00
if !device.cookieChecker.CheckMAC1(elem.packet) {
logDebug.Println("Received packet with invalid mac1")
2018-01-16 14:57:12 +01:00
continue
2017-07-08 09:23:10 +02:00
}
// endpoints destination address is the source of the datagram
if device.IsUnderLoad() {
2017-10-08 22:03:32 +02:00
// verify MAC2 field
2018-05-13 23:14:43 +02:00
if !device.cookieChecker.CheckMAC2(elem.packet, elem.endpoint.DstToBytes()) {
device.SendHandshakeCookie(&elem)
continue
}
2017-10-08 22:03:32 +02:00
// check ratelimiter
if !device.rate.limiter.Allow(elem.endpoint.DstIP()) {
continue
}
}
default:
logError.Println("Invalid packet ended up in the handshake queue")
continue
}
2017-07-08 09:23:10 +02:00
2017-12-01 23:37:26 +01:00
// handle handshake initiation/response content
2017-07-01 23:29:22 +02:00
switch elem.msgType {
case MessageInitiationType:
2017-07-01 23:29:22 +02:00
// unmarshal
2017-07-01 23:29:22 +02:00
var msg MessageInitiation
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
logError.Println("Failed to decode initiation message")
continue
}
2017-07-01 23:29:22 +02:00
// consume initiation
2017-07-01 23:29:22 +02:00
peer := device.ConsumeMessageInitiation(&msg)
if peer == nil {
logInfo.Println(
2017-12-01 23:37:26 +01:00
"Received invalid initiation message from",
elem.endpoint.DstToString(),
)
continue
}
2017-08-04 16:15:53 +02:00
// update timers
2017-08-04 16:15:53 +02:00
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketReceived()
2017-07-07 13:47:09 +02:00
// update endpoint
2018-05-26 02:59:26 +02:00
peer.SetEndpointFromPacket(elem.endpoint)
logDebug.Println(peer, "- Received handshake initiation")
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
2018-04-20 07:13:40 +02:00
2018-05-13 23:14:43 +02:00
peer.SendHandshakeResponse()
case MessageResponseType:
2017-07-01 23:29:22 +02:00
// unmarshal
2017-07-01 23:29:22 +02:00
var msg MessageResponse
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
logError.Println("Failed to decode response message")
continue
}
2017-07-01 23:29:22 +02:00
// consume response
2017-07-01 23:29:22 +02:00
peer := device.ConsumeMessageResponse(&msg)
if peer == nil {
logInfo.Println(
2018-05-23 19:00:00 +02:00
"Received invalid response message from",
elem.endpoint.DstToString(),
)
continue
}
2017-07-27 23:45:37 +02:00
2017-11-14 16:27:53 +01:00
// update endpoint
2018-05-26 02:59:26 +02:00
peer.SetEndpointFromPacket(elem.endpoint)
2017-11-14 16:27:53 +01:00
logDebug.Println(peer, "- Received handshake response")
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
// update timers
2017-08-04 16:15:53 +02:00
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketReceived()
2017-08-04 16:15:53 +02:00
2018-05-13 19:50:58 +02:00
// derive keypair
2017-07-01 23:29:22 +02:00
2018-05-13 23:14:43 +02:00
err = peer.BeginSymmetricSession()
if err != nil {
logError.Println(peer, "- Failed to derive keypair:", err)
continue
}
2018-05-13 23:14:43 +02:00
peer.timersSessionDerived()
peer.timersHandshakeComplete()
peer.SendKeepalive()
select {
case peer.signals.newKeypairArrived <- struct{}{}:
default:
}
}
2017-07-01 23:29:22 +02:00
}
}
func (peer *Peer) RoutineSequentialReceiver() {
device := peer.device
logInfo := device.log.Info
logError := device.log.Error
2017-07-01 23:29:22 +02:00
logDebug := device.log.Debug
2018-02-04 19:18:44 +01:00
2018-09-17 00:43:23 +02:00
var elem *QueueInboundElement
defer func() {
logDebug.Println(peer, "- Routine: sequential receiver - stopped")
peer.routines.stopping.Done()
2018-09-22 06:29:02 +02:00
if elem != nil {
2018-09-24 01:52:02 +02:00
if !elem.IsDropped() {
2018-09-22 06:29:02 +02:00
device.PutMessageBuffer(elem.buffer)
}
device.PutInboundElement(elem)
2018-09-17 00:43:23 +02:00
}
2018-02-04 19:18:44 +01:00
}()
logDebug.Println(peer, "- Routine: sequential receiver - started")
2017-07-01 23:29:22 +02:00
for {
2018-09-22 06:29:02 +02:00
if elem != nil {
2018-09-24 01:52:02 +02:00
if !elem.IsDropped() {
2018-09-22 06:29:02 +02:00
device.PutMessageBuffer(elem.buffer)
}
device.PutInboundElement(elem)
2018-09-24 01:52:02 +02:00
elem = nil
2018-09-17 00:43:23 +02:00
}
2017-07-01 23:29:22 +02:00
2019-07-01 15:23:24 +02:00
var elemOk bool
select {
case <-peer.routines.stop:
2017-07-01 23:29:22 +02:00
return
2019-07-01 15:23:24 +02:00
case elem, elemOk = <-peer.queue.inbound:
if !elemOk {
return
}
2019-03-21 21:43:04 +01:00
}
2017-07-01 23:29:22 +02:00
2019-03-21 21:43:04 +01:00
// wait for decryption
2019-03-21 21:43:04 +01:00
elem.Lock()
2019-03-21 21:43:04 +01:00
if elem.IsDropped() {
continue
}
2017-07-10 12:09:19 +02:00
2019-03-21 21:43:04 +01:00
// check for replay
2017-07-08 23:51:26 +02:00
2019-03-21 21:43:04 +01:00
if !elem.keypair.replayFilter.ValidateCounter(elem.counter, RejectAfterMessages) {
continue
}
2017-07-08 23:51:26 +02:00
2019-03-21 21:43:04 +01:00
// update endpoint
peer.SetEndpointFromPacket(elem.endpoint)
2017-07-01 23:29:22 +02:00
2019-03-21 21:43:04 +01:00
// check if using new keypair
if peer.ReceivedWithKeypair(elem.keypair) {
peer.timersHandshakeComplete()
select {
case peer.signals.newKeypairArrived <- struct{}{}:
default:
2017-09-01 14:21:53 +02:00
}
2019-03-21 21:43:04 +01:00
}
2017-07-01 23:29:22 +02:00
2019-03-21 21:43:04 +01:00
peer.keepKeyFreshReceiving()
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketReceived()
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)+MinMessageSize))
2017-07-01 23:29:22 +02:00
2019-03-21 21:43:04 +01:00
// check for keepalive
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
if len(elem.packet) == 0 {
logDebug.Println(peer, "- Receiving keepalive packet")
continue
}
peer.timersDataReceived()
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
// verify source and strip padding
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
switch elem.packet[0] >> 4 {
case ipv4.Version:
2017-07-07 13:47:09 +02:00
2019-03-21 21:43:04 +01:00
// strip padding
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
if len(elem.packet) < ipv4.HeaderLen {
continue
}
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
field := elem.packet[IPv4offsetTotalLength : IPv4offsetTotalLength+2]
length := binary.BigEndian.Uint16(field)
if int(length) > len(elem.packet) || int(length) < ipv4.HeaderLen {
continue
}
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
elem.packet = elem.packet[:length]
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
// verify IPv4 source
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len]
if device.allowedips.LookupIPv4(src) != peer {
logInfo.Println(
"IPv4 packet with disallowed source address from",
peer,
)
continue
}
2017-07-07 13:47:09 +02:00
2019-03-21 21:43:04 +01:00
case ipv6.Version:
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
// strip padding
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
if len(elem.packet) < ipv6.HeaderLen {
continue
}
2017-07-08 09:23:10 +02:00
2019-03-21 21:43:04 +01:00
field := elem.packet[IPv6offsetPayloadLength : IPv6offsetPayloadLength+2]
length := binary.BigEndian.Uint16(field)
length += ipv6.HeaderLen
if int(length) > len(elem.packet) {
continue
}
2017-08-04 16:15:53 +02:00
2019-03-21 21:43:04 +01:00
elem.packet = elem.packet[:length]
2019-03-21 21:43:04 +01:00
// verify IPv6 source
2017-07-01 23:29:22 +02:00
2019-03-21 21:43:04 +01:00
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
if device.allowedips.LookupIPv6(src) != peer {
logInfo.Println(
"IPv6 packet with disallowed source address from",
2019-03-21 21:43:04 +01:00
peer,
)
continue
}
2019-03-21 21:43:04 +01:00
default:
logInfo.Println("Packet with invalid IP version from", peer)
continue
}
// write to tun device
2019-03-21 21:43:04 +01:00
offset := MessageTransportOffsetContent
_, err := device.tun.device.Write(elem.buffer[:offset+len(elem.packet)], offset)
if err != nil && !device.isClosed.Get() {
logError.Println("Failed to write packet to TUN device:", err)
}
2019-07-01 15:23:24 +02:00
if len(peer.queue.inbound) == 0 {
err := device.tun.device.Flush()
2019-07-01 15:23:24 +02:00
if err != nil {
peer.device.log.Error.Printf("Unable to flush packets: %v", err)
}
2019-03-21 21:43:04 +01:00
}
2017-07-01 23:29:22 +02:00
}
}