Listen for flush in outer select
Now listen for flushNonceQueue signal in outer select during the RoutineNonce routine. This is needed to handle the edge case where the queue is flushed, but no packets are in the nonce queue. Since the signal has capacity 1 this signal will remain and potentially flush the queue at a later time, with packets meant for transmission.
This commit is contained in:
parent
38accea986
commit
2a432523ed
36
send.go
36
send.go
@ -336,6 +336,16 @@ func (peer *Peer) RoutineNonce() {
|
|||||||
peer.routines.stopping.Done()
|
peer.routines.stopping.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
flush := func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-peer.queue.nonce:
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
peer.routines.starting.Done()
|
peer.routines.starting.Done()
|
||||||
logDebug.Println(peer, ": Routine: nonce worker - started")
|
logDebug.Println(peer, ": Routine: nonce worker - started")
|
||||||
|
|
||||||
@ -347,15 +357,22 @@ func (peer *Peer) RoutineNonce() {
|
|||||||
case <-peer.routines.stop:
|
case <-peer.routines.stop:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
case <-peer.signals.flushNonceQueue:
|
||||||
|
flush()
|
||||||
|
goto NextPacket
|
||||||
|
|
||||||
case elem, ok := <-peer.queue.nonce:
|
case elem, ok := <-peer.queue.nonce:
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for key pair
|
// make sure to always pick the newest key
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
||||||
|
// check validity of newest key pair
|
||||||
|
|
||||||
keypair = peer.keypairs.Current()
|
keypair = peer.keypairs.Current()
|
||||||
if keypair != nil && keypair.sendNonce < RejectAfterMessages {
|
if keypair != nil && keypair.sendNonce < RejectAfterMessages {
|
||||||
if time.Now().Sub(keypair.created) < RejectAfterTime {
|
if time.Now().Sub(keypair.created) < RejectAfterTime {
|
||||||
@ -364,6 +381,8 @@ func (peer *Peer) RoutineNonce() {
|
|||||||
}
|
}
|
||||||
peer.queue.packetInNonceQueueIsAwaitingKey = true
|
peer.queue.packetInNonceQueueIsAwaitingKey = true
|
||||||
|
|
||||||
|
// no suitable key pair, request for new handshake
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-peer.signals.newKeypairArrived:
|
case <-peer.signals.newKeypairArrived:
|
||||||
default:
|
default:
|
||||||
@ -371,19 +390,18 @@ func (peer *Peer) RoutineNonce() {
|
|||||||
|
|
||||||
peer.SendHandshakeInitiation(false)
|
peer.SendHandshakeInitiation(false)
|
||||||
|
|
||||||
|
// wait for key to be established
|
||||||
|
|
||||||
logDebug.Println(peer, ": Awaiting keypair")
|
logDebug.Println(peer, ": Awaiting keypair")
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-peer.signals.newKeypairArrived:
|
case <-peer.signals.newKeypairArrived:
|
||||||
logDebug.Println(peer, ": Obtained awaited keypair")
|
logDebug.Println(peer, ": Obtained awaited keypair")
|
||||||
|
|
||||||
case <-peer.signals.flushNonceQueue:
|
case <-peer.signals.flushNonceQueue:
|
||||||
for {
|
flush()
|
||||||
select {
|
|
||||||
case <-peer.queue.nonce:
|
|
||||||
default:
|
|
||||||
goto NextPacket
|
goto NextPacket
|
||||||
}
|
|
||||||
}
|
|
||||||
case <-peer.routines.stop:
|
case <-peer.routines.stop:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -394,10 +412,14 @@ func (peer *Peer) RoutineNonce() {
|
|||||||
|
|
||||||
elem.peer = peer
|
elem.peer = peer
|
||||||
elem.nonce = atomic.AddUint64(&keypair.sendNonce, 1) - 1
|
elem.nonce = atomic.AddUint64(&keypair.sendNonce, 1) - 1
|
||||||
|
|
||||||
// double check in case of race condition added by future code
|
// double check in case of race condition added by future code
|
||||||
|
|
||||||
if elem.nonce >= RejectAfterMessages {
|
if elem.nonce >= RejectAfterMessages {
|
||||||
|
atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages)
|
||||||
goto NextPacket
|
goto NextPacket
|
||||||
}
|
}
|
||||||
|
|
||||||
elem.keypair = keypair
|
elem.keypair = keypair
|
||||||
elem.dropped = AtomicFalse
|
elem.dropped = AtomicFalse
|
||||||
elem.mutex.Lock()
|
elem.mutex.Lock()
|
||||||
|
Loading…
Reference in New Issue
Block a user