device: distribute crypto work as slice of elements

After reducing UDP stack traversal overhead via GSO and GRO,
runtime.chanrecv() began to account for a high percentage (20% in one
environment) of perf samples during a throughput benchmark. The
individual packet channel ops with the crypto goroutines was the primary
contributor to this overhead.

Updating these channels to pass vectors, which the device package
already handles at its ends, reduced this overhead substantially, and
improved throughput.

The iperf3 results below demonstrate the effect of this commit between
two Linux computers with i5-12400 CPUs. There is roughly ~13us of round
trip latency between them.

The first result is with UDP GSO and GRO, and with single element
channels.

Starting Test: protocol: TCP, 1 streams, 131072 byte blocks
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-10.00  sec  12.3 GBytes  10.6 Gbits/sec  232   3.15 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
Test Complete. Summary Results:
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  12.3 GBytes  10.6 Gbits/sec  232   sender
[  5]   0.00-10.04  sec  12.3 GBytes  10.6 Gbits/sec        receiver

The second result is with channels updated to pass a slice of
elements.

Starting Test: protocol: TCP, 1 streams, 131072 byte blocks
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-10.00  sec  13.2 GBytes  11.3 Gbits/sec  182   3.15 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
Test Complete. Summary Results:
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  13.2 GBytes  11.3 Gbits/sec  182   sender
[  5]   0.00-10.04  sec  13.2 GBytes  11.3 Gbits/sec        receiver

Reviewed-by: Adrian Dewhurst <adrian@tailscale.com>
Signed-off-by: Jordan Whited <jordan@tailscale.com>
Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
This commit is contained in:
Jordan Whited 2023-10-02 14:41:04 -07:00 committed by Jason A. Donenfeld
parent 6a84778f2c
commit 4201e08f1d
3 changed files with 49 additions and 49 deletions

View File

@ -19,13 +19,13 @@ import (
// call wg.Done to remove the initial reference. // call wg.Done to remove the initial reference.
// When the refcount hits 0, the queue's channel is closed. // When the refcount hits 0, the queue's channel is closed.
type outboundQueue struct { type outboundQueue struct {
c chan *QueueOutboundElement c chan *[]*QueueOutboundElement
wg sync.WaitGroup wg sync.WaitGroup
} }
func newOutboundQueue() *outboundQueue { func newOutboundQueue() *outboundQueue {
q := &outboundQueue{ q := &outboundQueue{
c: make(chan *QueueOutboundElement, QueueOutboundSize), c: make(chan *[]*QueueOutboundElement, QueueOutboundSize),
} }
q.wg.Add(1) q.wg.Add(1)
go func() { go func() {
@ -37,13 +37,13 @@ func newOutboundQueue() *outboundQueue {
// A inboundQueue is similar to an outboundQueue; see those docs. // A inboundQueue is similar to an outboundQueue; see those docs.
type inboundQueue struct { type inboundQueue struct {
c chan *QueueInboundElement c chan *[]*QueueInboundElement
wg sync.WaitGroup wg sync.WaitGroup
} }
func newInboundQueue() *inboundQueue { func newInboundQueue() *inboundQueue {
q := &inboundQueue{ q := &inboundQueue{
c: make(chan *QueueInboundElement, QueueInboundSize), c: make(chan *[]*QueueInboundElement, QueueInboundSize),
} }
q.wg.Add(1) q.wg.Add(1)
go func() { go func() {

View File

@ -220,9 +220,7 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
for peer, elems := range elemsByPeer { for peer, elems := range elemsByPeer {
if peer.isRunning.Load() { if peer.isRunning.Load() {
peer.queue.inbound.c <- elems peer.queue.inbound.c <- elems
for _, elem := range *elems { device.queue.decryption.c <- elems
device.queue.decryption.c <- elem
}
} else { } else {
for _, elem := range *elems { for _, elem := range *elems {
device.PutMessageBuffer(elem.buffer) device.PutMessageBuffer(elem.buffer)
@ -241,7 +239,8 @@ func (device *Device) RoutineDecryption(id int) {
defer device.log.Verbosef("Routine: decryption worker %d - stopped", id) defer device.log.Verbosef("Routine: decryption worker %d - stopped", id)
device.log.Verbosef("Routine: decryption worker %d - started", id) device.log.Verbosef("Routine: decryption worker %d - started", id)
for elem := range device.queue.decryption.c { for elems := range device.queue.decryption.c {
for _, elem := range *elems {
// split message into fields // split message into fields
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent] counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
content := elem.packet[MessageTransportOffsetContent:] content := elem.packet[MessageTransportOffsetContent:]
@ -262,6 +261,7 @@ func (device *Device) RoutineDecryption(id int) {
} }
elem.Unlock() elem.Unlock()
} }
}
} }
/* Handles incoming packets related to handshake /* Handles incoming packets related to handshake

View File

@ -385,9 +385,7 @@ top:
// add to parallel and sequential queue // add to parallel and sequential queue
if peer.isRunning.Load() { if peer.isRunning.Load() {
peer.queue.outbound.c <- elems peer.queue.outbound.c <- elems
for _, elem := range *elems { peer.device.queue.encryption.c <- elems
peer.device.queue.encryption.c <- elem
}
} else { } else {
for _, elem := range *elems { for _, elem := range *elems {
peer.device.PutMessageBuffer(elem.buffer) peer.device.PutMessageBuffer(elem.buffer)
@ -447,7 +445,8 @@ func (device *Device) RoutineEncryption(id int) {
defer device.log.Verbosef("Routine: encryption worker %d - stopped", id) defer device.log.Verbosef("Routine: encryption worker %d - stopped", id)
device.log.Verbosef("Routine: encryption worker %d - started", id) device.log.Verbosef("Routine: encryption worker %d - started", id)
for elem := range device.queue.encryption.c { for elems := range device.queue.encryption.c {
for _, elem := range *elems {
// populate header fields // populate header fields
header := elem.buffer[:MessageTransportHeaderSize] header := elem.buffer[:MessageTransportHeaderSize]
@ -474,6 +473,7 @@ func (device *Device) RoutineEncryption(id int) {
) )
elem.Unlock() elem.Unlock()
} }
}
} }
func (peer *Peer) RoutineSequentialSender(maxBatchSize int) { func (peer *Peer) RoutineSequentialSender(maxBatchSize int) {