109 lines
2.7 KiB
Go
109 lines
2.7 KiB
Go
package priopool
|
|
|
|
import (
|
|
"container/heap"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/panjf2000/ants/v2"
|
|
)
|
|
|
|
type (
|
|
// PriorityPool is a pool of goroutines with priority queue buffer.
|
|
// Based on panjf2000/ants and stdlib heap libraries.
|
|
PriorityPool struct {
|
|
mu sync.Mutex // thread safe access to priority queue
|
|
pool *ants.Pool
|
|
queue priorityQueue
|
|
limit int
|
|
}
|
|
)
|
|
|
|
// defaultQueueCapacity is the initial capacity of the queue's backing slice
// when the constructor is given a negative (unlimited) queue capacity.
const defaultQueueCapacity = 10
|
|
|
|
// Sentinel errors returned by the priority pool.
var (
	// ErrPoolCapacitySize is returned by the constructor when it is given
	// a non-positive pool capacity.
	ErrPoolCapacitySize = errors.New("pool capacity must be positive")

	// ErrQueueOverload is returned by a submit operation when both the
	// goroutine pool and the priority queue are full.
	ErrQueueOverload = errors.New("pool and priority queue are full")
)
|
|
|
|
// New creates instance of priority pool. Pool capacity must be positive.
|
|
// Zero queue capacity disables priority queue. Negative queue capacity
|
|
// disables priority queue length limit.
|
|
func New(poolCapacity, queueCapacity int) (*PriorityPool, error) {
|
|
if poolCapacity <= 0 {
|
|
return nil, ErrPoolCapacitySize
|
|
}
|
|
|
|
pool, err := ants.NewPool(poolCapacity, ants.WithNonblocking(true))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating pool instance: %w", err)
|
|
}
|
|
|
|
var queue priorityQueue
|
|
|
|
switch {
|
|
case queueCapacity >= 0:
|
|
queue.tasks = make([]*priorityQueueTask, 0, queueCapacity)
|
|
case queueCapacity < 0:
|
|
queue.tasks = make([]*priorityQueueTask, 0, defaultQueueCapacity)
|
|
}
|
|
|
|
return &PriorityPool{
|
|
pool: pool,
|
|
queue: queue,
|
|
limit: queueCapacity,
|
|
}, nil
|
|
}
|
|
|
|
// Submit sends the task into priority pool. Non-blocking operation. If pool has
|
|
// available workers, then task executes immediately. If pool is full, then task
|
|
// is stored in priority queue. It will be executed when available worker pops
|
|
// the task from priority queue. Tasks from queue do not evict running tasks
|
|
// from pool. Tasks with bigger priority number are popped earlier.
|
|
// If queue is full, submit returns ErrQueueOverload error.
|
|
func (p *PriorityPool) Submit(priority uint32, task func()) error {
|
|
p.mu.Lock() // lock from the beginning to avoid starving
|
|
defer p.mu.Unlock()
|
|
|
|
err := p.pool.Submit(func() {
|
|
task()
|
|
|
|
// pick the highest priority item from the queue
|
|
// process items until queue is empty
|
|
for {
|
|
p.mu.Lock()
|
|
if p.queue.Len() == 0 {
|
|
p.mu.Unlock()
|
|
return
|
|
}
|
|
queueF := heap.Pop(&p.queue)
|
|
p.mu.Unlock()
|
|
|
|
queueF.(*priorityQueueTask).value()
|
|
}
|
|
})
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
if !errors.Is(err, ants.ErrPoolOverload) {
|
|
return fmt.Errorf("pool submit: %w", err)
|
|
}
|
|
|
|
if p.limit >= 0 && p.queue.Len() >= p.limit {
|
|
return ErrQueueOverload
|
|
}
|
|
|
|
heap.Push(&p.queue, &priorityQueueTask{
|
|
value: task,
|
|
priority: int(priority),
|
|
})
|
|
|
|
return nil
|
|
}
|