device: remove selects from encrypt/decrypt/inbound/outbound enqueuing
Block instead. Backpressure here is fine, probably preferable. This reduces code complexity. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
This commit is contained in:
		
							parent
							
								
									0cc15e7c7c
								
							
						
					
					
						commit
						2fe19ce54d
					
				| @ -58,23 +58,6 @@ func (elem *QueueInboundElement) IsDropped() bool { | |||||||
| 	return atomic.LoadInt32(&elem.dropped) == AtomicTrue | 	return atomic.LoadInt32(&elem.dropped) == AtomicTrue | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (device *Device) addToInboundAndDecryptionQueues(inboundQueue chan *QueueInboundElement, decryptionQueue chan *QueueInboundElement, elem *QueueInboundElement) bool { |  | ||||||
| 	select { |  | ||||||
| 	case inboundQueue <- elem: |  | ||||||
| 		select { |  | ||||||
| 		case decryptionQueue <- elem: |  | ||||||
| 			return true |  | ||||||
| 		default: |  | ||||||
| 			elem.Drop() |  | ||||||
| 			elem.Unlock() |  | ||||||
| 			return false |  | ||||||
| 		} |  | ||||||
| 	default: |  | ||||||
| 		device.PutInboundElement(elem) |  | ||||||
| 		return false |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (device *Device) addToHandshakeQueue(queue chan QueueHandshakeElement, elem QueueHandshakeElement) bool { | func (device *Device) addToHandshakeQueue(queue chan QueueHandshakeElement, elem QueueHandshakeElement) bool { | ||||||
| 	select { | 	select { | ||||||
| 	case queue <- elem: | 	case queue <- elem: | ||||||
| @ -207,9 +190,9 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) { | |||||||
| 
 | 
 | ||||||
| 			peer.queue.RLock() | 			peer.queue.RLock() | ||||||
| 			if peer.isRunning.Get() { | 			if peer.isRunning.Get() { | ||||||
| 				if device.addToInboundAndDecryptionQueues(peer.queue.inbound, device.queue.decryption.c, elem) { | 				peer.queue.inbound <- elem | ||||||
|  | 				device.queue.decryption.c <- elem | ||||||
| 				buffer = device.GetMessageBuffer() | 				buffer = device.GetMessageBuffer() | ||||||
| 				} |  | ||||||
| 			} else { | 			} else { | ||||||
| 				device.PutInboundElement(elem) | 				device.PutInboundElement(elem) | ||||||
| 			} | 			} | ||||||
|  | |||||||
| @ -97,23 +97,6 @@ func addToNonceQueue(queue chan *QueueOutboundElement, elem *QueueOutboundElemen | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func addToOutboundAndEncryptionQueues(outboundQueue chan *QueueOutboundElement, encryptionQueue chan *QueueOutboundElement, elem *QueueOutboundElement) { |  | ||||||
| 	select { |  | ||||||
| 	case outboundQueue <- elem: |  | ||||||
| 		select { |  | ||||||
| 		case encryptionQueue <- elem: |  | ||||||
| 			return |  | ||||||
| 		default: |  | ||||||
| 			elem.Drop() |  | ||||||
| 			elem.peer.device.PutMessageBuffer(elem.buffer) |  | ||||||
| 			elem.Unlock() |  | ||||||
| 		} |  | ||||||
| 	default: |  | ||||||
| 		elem.peer.device.PutMessageBuffer(elem.buffer) |  | ||||||
| 		elem.peer.device.PutOutboundElement(elem) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /* Queues a keepalive if no packets are queued for peer | /* Queues a keepalive if no packets are queued for peer | ||||||
|  */ |  */ | ||||||
| func (peer *Peer) SendKeepalive() bool { | func (peer *Peer) SendKeepalive() bool { | ||||||
| @ -457,7 +440,8 @@ NextPacket: | |||||||
| 			elem.Lock() | 			elem.Lock() | ||||||
| 
 | 
 | ||||||
| 			// add to parallel and sequential queue
 | 			// add to parallel and sequential queue
 | ||||||
| 			addToOutboundAndEncryptionQueues(peer.queue.outbound, device.queue.encryption.c, elem) | 			peer.queue.outbound <- elem | ||||||
|  | 			device.queue.encryption.c <- elem | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user