Make it easy to restrict queue sizes more
This commit is contained in:
		
							parent
							
								
									ebc7541953
								
							
						
					
					
						commit
						70bcf9ecb8
					
				
							
								
								
									
										14
									
								
								constants.go
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								constants.go
									
									
									
									
									
								
							@ -26,18 +26,14 @@ const (
 | 
			
		||||
	PaddingMultiple         = 16
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/* Implementation specific constants */
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	QueueOutboundSize  = 1024
 | 
			
		||||
	QueueInboundSize   = 1024
 | 
			
		||||
	QueueHandshakeSize = 1024
 | 
			
		||||
	MaxSegmentSize     = (1 << 16) - 1                         // largest possible UDP datagram
 | 
			
		||||
	MinMessageSize     = MessageKeepaliveSize                  // minimum size of transport message (keepalive)
 | 
			
		||||
	MaxMessageSize     = MaxSegmentSize                        // maximum size of transport message
 | 
			
		||||
	MaxContentSize     = MaxSegmentSize - MessageTransportSize // maximum size of transport message content
 | 
			
		||||
	MinMessageSize = MessageKeepaliveSize                  // minimum size of transport message (keepalive)
 | 
			
		||||
	MaxMessageSize = MaxSegmentSize                        // maximum size of transport message
 | 
			
		||||
	MaxContentSize = MaxSegmentSize - MessageTransportSize // maximum size of transport message content
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/* Implementation constants */
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	UnderLoadQueueSize = QueueHandshakeSize / 8
 | 
			
		||||
	UnderLoadAfterTime = time.Second // how long does the device remain under load after detected
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										28
									
								
								pools.go
									
									
									
									
									
								
							
							
						
						
									
										28
									
								
								pools.go
									
									
									
									
									
								
							@ -7,10 +7,8 @@ package main
 | 
			
		||||
 | 
			
		||||
import "sync"
 | 
			
		||||
 | 
			
		||||
var preallocatedBuffers = 0
 | 
			
		||||
 | 
			
		||||
func (device *Device) PopulatePools() {
 | 
			
		||||
	if preallocatedBuffers == 0 {
 | 
			
		||||
	if PreallocatedBuffersPerPool == 0 {
 | 
			
		||||
		device.pool.messageBufferPool = &sync.Pool{
 | 
			
		||||
			New: func() interface{} {
 | 
			
		||||
				return new([MaxMessageSize]byte)
 | 
			
		||||
@ -27,23 +25,23 @@ func (device *Device) PopulatePools() {
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		device.pool.messageBufferReuseChan = make(chan *[MaxMessageSize]byte, preallocatedBuffers)
 | 
			
		||||
		for i := 0; i < preallocatedBuffers; i += 1 {
 | 
			
		||||
		device.pool.messageBufferReuseChan = make(chan *[MaxMessageSize]byte, PreallocatedBuffersPerPool)
 | 
			
		||||
		for i := 0; i < PreallocatedBuffersPerPool; i += 1 {
 | 
			
		||||
			device.pool.messageBufferReuseChan <- new([MaxMessageSize]byte)
 | 
			
		||||
		}
 | 
			
		||||
		device.pool.inboundElementReuseChan = make(chan *QueueInboundElement, preallocatedBuffers)
 | 
			
		||||
		for i := 0; i < preallocatedBuffers; i += 1 {
 | 
			
		||||
		device.pool.inboundElementReuseChan = make(chan *QueueInboundElement, PreallocatedBuffersPerPool)
 | 
			
		||||
		for i := 0; i < PreallocatedBuffersPerPool; i += 1 {
 | 
			
		||||
			device.pool.inboundElementReuseChan <- new(QueueInboundElement)
 | 
			
		||||
		}
 | 
			
		||||
		device.pool.outboundElementReuseChan = make(chan *QueueOutboundElement, preallocatedBuffers)
 | 
			
		||||
		for i := 0; i < preallocatedBuffers; i += 1 {
 | 
			
		||||
		device.pool.outboundElementReuseChan = make(chan *QueueOutboundElement, PreallocatedBuffersPerPool)
 | 
			
		||||
		for i := 0; i < PreallocatedBuffersPerPool; i += 1 {
 | 
			
		||||
			device.pool.outboundElementReuseChan <- new(QueueOutboundElement)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (device *Device) GetMessageBuffer() *[MaxMessageSize]byte {
 | 
			
		||||
	if preallocatedBuffers == 0 {
 | 
			
		||||
	if PreallocatedBuffersPerPool == 0 {
 | 
			
		||||
		return device.pool.messageBufferPool.Get().(*[MaxMessageSize]byte)
 | 
			
		||||
	} else {
 | 
			
		||||
		return <-device.pool.messageBufferReuseChan
 | 
			
		||||
@ -51,7 +49,7 @@ func (device *Device) GetMessageBuffer() *[MaxMessageSize]byte {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (device *Device) PutMessageBuffer(msg *[MaxMessageSize]byte) {
 | 
			
		||||
	if preallocatedBuffers == 0 {
 | 
			
		||||
	if PreallocatedBuffersPerPool == 0 {
 | 
			
		||||
		device.pool.messageBufferPool.Put(msg)
 | 
			
		||||
	} else {
 | 
			
		||||
		device.pool.messageBufferReuseChan <- msg
 | 
			
		||||
@ -59,7 +57,7 @@ func (device *Device) PutMessageBuffer(msg *[MaxMessageSize]byte) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (device *Device) GetInboundElement() *QueueInboundElement {
 | 
			
		||||
	if preallocatedBuffers == 0 {
 | 
			
		||||
	if PreallocatedBuffersPerPool == 0 {
 | 
			
		||||
		return device.pool.inboundElementPool.Get().(*QueueInboundElement)
 | 
			
		||||
	} else {
 | 
			
		||||
		return <-device.pool.inboundElementReuseChan
 | 
			
		||||
@ -67,7 +65,7 @@ func (device *Device) GetInboundElement() *QueueInboundElement {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (device *Device) PutInboundElement(msg *QueueInboundElement) {
 | 
			
		||||
	if preallocatedBuffers == 0 {
 | 
			
		||||
	if PreallocatedBuffersPerPool == 0 {
 | 
			
		||||
		device.pool.inboundElementPool.Put(msg)
 | 
			
		||||
	} else {
 | 
			
		||||
		device.pool.inboundElementReuseChan <- msg
 | 
			
		||||
@ -75,7 +73,7 @@ func (device *Device) PutInboundElement(msg *QueueInboundElement) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (device *Device) GetOutboundElement() *QueueOutboundElement {
 | 
			
		||||
	if preallocatedBuffers == 0 {
 | 
			
		||||
	if PreallocatedBuffersPerPool == 0 {
 | 
			
		||||
		return device.pool.outboundElementPool.Get().(*QueueOutboundElement)
 | 
			
		||||
	} else {
 | 
			
		||||
		return <-device.pool.outboundElementReuseChan
 | 
			
		||||
@ -83,7 +81,7 @@ func (device *Device) GetOutboundElement() *QueueOutboundElement {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (device *Device) PutOutboundElement(msg *QueueOutboundElement) {
 | 
			
		||||
	if preallocatedBuffers == 0 {
 | 
			
		||||
	if PreallocatedBuffersPerPool == 0 {
 | 
			
		||||
		device.pool.outboundElementPool.Put(msg)
 | 
			
		||||
	} else {
 | 
			
		||||
		device.pool.outboundElementReuseChan <- msg
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user