package priopool

import (
	"container/heap"
	"errors"
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

type (
	// PriorityPool is a pool of goroutines with priority queue buffer.
	// Based on panjf2000/ants and stdlib heap libraries.
	PriorityPool struct {
		mu    sync.Mutex    // thread safe access to priority queue
		pool  *ants.Pool    // worker pool, created in non-blocking mode by New
		queue priorityQueue // tasks waiting for a free worker
		limit int           // queue length limit; negative means unlimited
	}
)

// defaultQueueCapacity is the initial capacity of the queue when it has no
// length limit.
const defaultQueueCapacity = 10

var (
	// ErrQueueOverload will be returned on submit operation
	// when both goroutines pool and priority queue are full.
	ErrQueueOverload = errors.New("pool and priority queue are full")

	// ErrPoolCapacitySize will be returned when constructor
	// provided with non-positive pool capacity.
	ErrPoolCapacitySize = errors.New("pool capacity must be positive")
)

// New creates instance of priority pool. Pool capacity must be positive,
// otherwise ErrPoolCapacitySize is returned. Zero queue capacity disables
// priority queue. Negative queue capacity disables priority queue length
// limit.
func New(poolCapacity, queueCapacity int) (*PriorityPool, error) {
	if poolCapacity <= 0 {
		return nil, ErrPoolCapacitySize
	}

	pool, err := ants.NewPool(poolCapacity, ants.WithNonblocking(true))
	if err != nil {
		return nil, fmt.Errorf("creating pool instance: %w", err)
	}

	// An unlimited queue still gets a small initial allocation; it grows
	// on demand afterwards.
	initialCap := queueCapacity
	if initialCap < 0 {
		initialCap = defaultQueueCapacity
	}

	return &PriorityPool{
		pool:  pool,
		queue: make(priorityQueue, 0, initialCap),
		limit: queueCapacity,
	}, nil
}

// Submit sends the task into priority pool. Non-blocking operation. If pool has
// available workers, then task executes immediately. If pool is full, then task
// is stored in priority queue. It will be executed when available worker pops
// the task from priority queue. Tasks from queue do not evict running tasks
// from pool. Tasks with bigger priority number are popped earlier.
// If queue is full, submit returns ErrQueueOverload error. Any other error
// reported by the underlying pool is returned wrapped.
func (p *PriorityPool) Submit(priority uint32, task func()) error {
	p.mu.Lock() // lock from the beginning to avoid starving
	defer p.mu.Unlock()

	err := p.pool.Submit(func() {
		task()

		// pick the highest priority item from the queue
		// process items until queue is empty
		for {
			p.mu.Lock()
			if p.queue.Len() == 0 {
				p.mu.Unlock()
				return
			}
			// Go through container/heap rather than calling the queue's
			// own Pop: heap.Pop moves the top item into place and
			// restores heap order before removing it.
			// NOTE(review): priorityQueue is declared outside this file;
			// this assumes it implements heap.Interface - confirm.
			next := heap.Pop(&p.queue)
			p.mu.Unlock()

			// Run the queued task outside the lock so that other
			// submitters and workers are not blocked by it.
			next.(*priorityQueueTask).value()
		}
	})
	if err == nil {
		return nil
	}

	if !errors.Is(err, ants.ErrPoolOverload) {
		return fmt.Errorf("pool submit: %w", err)
	}

	// All workers are busy: buffer the task unless the queue is at its limit.
	// A zero limit rejects every task here, a negative limit never rejects.
	ln := p.queue.Len()
	if p.limit >= 0 && ln >= p.limit {
		return ErrQueueOverload
	}

	// heap.Push appends the task and sifts it up, keeping the queue ordered.
	heap.Push(&p.queue, &priorityQueueTask{
		value:    task,
		priority: int(priority),
		index:    ln,
	})

	return nil
}