device: remove starting waitgroups
In each case, the starting waitgroup did nothing but ensure that the goroutine has launched. Nothing downstream depends on the order in which goroutines launch, and if the Go runtime scheduler is so broken that goroutines don't get launched reasonably promptly, we have much deeper problems. Given all that, simplify the code. Passed a race-enabled stress test 25,000 times without failure. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
This commit is contained in:
		
							parent
							
								
									3591acba76
								
							
						
					
					
						commit
						c9e4a859ae
					
				| @ -27,7 +27,6 @@ type Device struct { | |||||||
| 	// synchronized resources (locks acquired in order)
 | 	// synchronized resources (locks acquired in order)
 | ||||||
| 
 | 
 | ||||||
| 	state struct { | 	state struct { | ||||||
| 		starting sync.WaitGroup |  | ||||||
| 		stopping sync.WaitGroup | 		stopping sync.WaitGroup | ||||||
| 		sync.Mutex | 		sync.Mutex | ||||||
| 		changing AtomicBool | 		changing AtomicBool | ||||||
| @ -35,7 +34,6 @@ type Device struct { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	net struct { | 	net struct { | ||||||
| 		starting sync.WaitGroup |  | ||||||
| 		stopping sync.WaitGroup | 		stopping sync.WaitGroup | ||||||
| 		sync.RWMutex | 		sync.RWMutex | ||||||
| 		bind          conn.Bind // bind interface
 | 		bind          conn.Bind // bind interface
 | ||||||
| @ -297,23 +295,18 @@ func NewDevice(tunDevice tun.Device, logger *Logger) *Device { | |||||||
| 	// start workers
 | 	// start workers
 | ||||||
| 
 | 
 | ||||||
| 	cpus := runtime.NumCPU() | 	cpus := runtime.NumCPU() | ||||||
| 	device.state.starting.Wait() |  | ||||||
| 	device.state.stopping.Wait() | 	device.state.stopping.Wait() | ||||||
| 	for i := 0; i < cpus; i += 1 { | 	for i := 0; i < cpus; i += 1 { | ||||||
| 		device.state.starting.Add(3) |  | ||||||
| 		device.state.stopping.Add(3) | 		device.state.stopping.Add(3) | ||||||
| 		go device.RoutineEncryption() | 		go device.RoutineEncryption() | ||||||
| 		go device.RoutineDecryption() | 		go device.RoutineDecryption() | ||||||
| 		go device.RoutineHandshake() | 		go device.RoutineHandshake() | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	device.state.starting.Add(2) |  | ||||||
| 	device.state.stopping.Add(2) | 	device.state.stopping.Add(2) | ||||||
| 	go device.RoutineReadFromTUN() | 	go device.RoutineReadFromTUN() | ||||||
| 	go device.RoutineTUNEventReader() | 	go device.RoutineTUNEventReader() | ||||||
| 
 | 
 | ||||||
| 	device.state.starting.Wait() |  | ||||||
| 
 |  | ||||||
| 	return device | 	return device | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -370,8 +363,6 @@ func (device *Device) Close() { | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	device.state.starting.Wait() |  | ||||||
| 
 |  | ||||||
| 	device.log.Info.Println("Device closing") | 	device.log.Info.Println("Device closing") | ||||||
| 	device.state.changing.Set(true) | 	device.state.changing.Set(true) | ||||||
| 	device.state.Lock() | 	device.state.Lock() | ||||||
| @ -527,11 +518,9 @@ func (device *Device) BindUpdate() error { | |||||||
| 
 | 
 | ||||||
| 		// start receiving routines
 | 		// start receiving routines
 | ||||||
| 
 | 
 | ||||||
| 		device.net.starting.Add(2) |  | ||||||
| 		device.net.stopping.Add(2) | 		device.net.stopping.Add(2) | ||||||
| 		go device.RoutineReceiveIncoming(ipv4.Version, netc.bind) | 		go device.RoutineReceiveIncoming(ipv4.Version, netc.bind) | ||||||
| 		go device.RoutineReceiveIncoming(ipv6.Version, netc.bind) | 		go device.RoutineReceiveIncoming(ipv6.Version, netc.bind) | ||||||
| 		device.net.starting.Wait() |  | ||||||
| 
 | 
 | ||||||
| 		device.log.Debug.Println("UDP bind has been updated") | 		device.log.Debug.Println("UDP bind has been updated") | ||||||
| 	} | 	} | ||||||
|  | |||||||
| @ -66,8 +66,7 @@ type Peer struct { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	routines struct { | 	routines struct { | ||||||
| 		sync.Mutex                // held when stopping / starting routines
 | 		sync.Mutex                // held when stopping routines
 | ||||||
| 		starting   sync.WaitGroup // routines pending start
 |  | ||||||
| 		stopping   sync.WaitGroup // routines pending stop
 | 		stopping   sync.WaitGroup // routines pending stop
 | ||||||
| 		stop       chan struct{}  // size 0, stop all go routines in peer
 | 		stop       chan struct{}  // size 0, stop all go routines in peer
 | ||||||
| 	} | 	} | ||||||
| @ -189,10 +188,8 @@ func (peer *Peer) Start() { | |||||||
| 
 | 
 | ||||||
| 	// reset routine state
 | 	// reset routine state
 | ||||||
| 
 | 
 | ||||||
| 	peer.routines.starting.Wait() |  | ||||||
| 	peer.routines.stopping.Wait() | 	peer.routines.stopping.Wait() | ||||||
| 	peer.routines.stop = make(chan struct{}) | 	peer.routines.stop = make(chan struct{}) | ||||||
| 	peer.routines.starting.Add(PeerRoutineNumber) |  | ||||||
| 	peer.routines.stopping.Add(PeerRoutineNumber) | 	peer.routines.stopping.Add(PeerRoutineNumber) | ||||||
| 
 | 
 | ||||||
| 	// prepare queues
 | 	// prepare queues
 | ||||||
| @ -213,7 +210,6 @@ func (peer *Peer) Start() { | |||||||
| 	go peer.RoutineSequentialSender() | 	go peer.RoutineSequentialSender() | ||||||
| 	go peer.RoutineSequentialReceiver() | 	go peer.RoutineSequentialReceiver() | ||||||
| 
 | 
 | ||||||
| 	peer.routines.starting.Wait() |  | ||||||
| 	peer.isRunning.Set(true) | 	peer.isRunning.Set(true) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -270,8 +266,6 @@ func (peer *Peer) Stop() { | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	peer.routines.starting.Wait() |  | ||||||
| 
 |  | ||||||
| 	peer.routines.Lock() | 	peer.routines.Lock() | ||||||
| 	defer peer.routines.Unlock() | 	defer peer.routines.Unlock() | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -111,7 +111,6 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) { | |||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| 	logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - started") | 	logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - started") | ||||||
| 	device.net.starting.Done() |  | ||||||
| 
 | 
 | ||||||
| 	// receive datagrams until conn is closed
 | 	// receive datagrams until conn is closed
 | ||||||
| 
 | 
 | ||||||
| @ -246,7 +245,6 @@ func (device *Device) RoutineDecryption() { | |||||||
| 		device.state.stopping.Done() | 		device.state.stopping.Done() | ||||||
| 	}() | 	}() | ||||||
| 	logDebug.Println("Routine: decryption worker - started") | 	logDebug.Println("Routine: decryption worker - started") | ||||||
| 	device.state.starting.Done() |  | ||||||
| 
 | 
 | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| @ -321,7 +319,6 @@ func (device *Device) RoutineHandshake() { | |||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| 	logDebug.Println("Routine: handshake worker - started") | 	logDebug.Println("Routine: handshake worker - started") | ||||||
| 	device.state.starting.Done() |  | ||||||
| 
 | 
 | ||||||
| 	for { | 	for { | ||||||
| 		if elem.buffer != nil { | 		if elem.buffer != nil { | ||||||
| @ -521,8 +518,6 @@ func (peer *Peer) RoutineSequentialReceiver() { | |||||||
| 
 | 
 | ||||||
| 	logDebug.Println(peer, "- Routine: sequential receiver - started") | 	logDebug.Println(peer, "- Routine: sequential receiver - started") | ||||||
| 
 | 
 | ||||||
| 	peer.routines.starting.Done() |  | ||||||
| 
 |  | ||||||
| 	for { | 	for { | ||||||
| 		if elem != nil { | 		if elem != nil { | ||||||
| 			if !elem.IsDropped() { | 			if !elem.IsDropped() { | ||||||
|  | |||||||
| @ -262,7 +262,6 @@ func (device *Device) RoutineReadFromTUN() { | |||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| 	logDebug.Println("Routine: TUN reader - started") | 	logDebug.Println("Routine: TUN reader - started") | ||||||
| 	device.state.starting.Done() |  | ||||||
| 
 | 
 | ||||||
| 	var elem *QueueOutboundElement | 	var elem *QueueOutboundElement | ||||||
| 
 | 
 | ||||||
| @ -372,7 +371,6 @@ func (peer *Peer) RoutineNonce() { | |||||||
| 		peer.routines.stopping.Done() | 		peer.routines.stopping.Done() | ||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| 	peer.routines.starting.Done() |  | ||||||
| 	logDebug.Println(peer, "- Routine: nonce worker - started") | 	logDebug.Println(peer, "- Routine: nonce worker - started") | ||||||
| 
 | 
 | ||||||
| NextPacket: | NextPacket: | ||||||
| @ -507,7 +505,6 @@ func (device *Device) RoutineEncryption() { | |||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| 	logDebug.Println("Routine: encryption worker - started") | 	logDebug.Println("Routine: encryption worker - started") | ||||||
| 	device.state.starting.Done() |  | ||||||
| 
 | 
 | ||||||
| 	for { | 	for { | ||||||
| 
 | 
 | ||||||
| @ -596,8 +593,6 @@ func (peer *Peer) RoutineSequentialSender() { | |||||||
| 
 | 
 | ||||||
| 	logDebug.Println(peer, "- Routine: sequential sender - started") | 	logDebug.Println(peer, "- Routine: sequential sender - started") | ||||||
| 
 | 
 | ||||||
| 	peer.routines.starting.Done() |  | ||||||
| 
 |  | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -20,7 +20,6 @@ func (device *Device) RoutineTUNEventReader() { | |||||||
| 	logError := device.log.Error | 	logError := device.log.Error | ||||||
| 
 | 
 | ||||||
| 	logDebug.Println("Routine: event worker - started") | 	logDebug.Println("Routine: event worker - started") | ||||||
| 	device.state.starting.Done() |  | ||||||
| 
 | 
 | ||||||
| 	for event := range device.tun.device.Events() { | 	for event := range device.tun.device.Events() { | ||||||
| 		if event&tun.EventMTUUpdate != 0 { | 		if event&tun.EventMTUUpdate != 0 { | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user