commit 5dcd3cf57889c94be82885a50c56c04f95a5b8a0 Author: Alex Vanin Date: Wed Oct 20 17:02:55 2021 +0300 Initial commit diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ea473f6 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module priopool + +go 1.17 + +require github.com/panjf2000/ants/v2 v2.4.6 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..88d6b94 --- /dev/null +++ b/go.sum @@ -0,0 +1,14 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/panjf2000/ants/v2 v2.4.6 h1:drmj9mcygn2gawZ155dRbo+NfXEfAssjZNU1qoIb4gQ= +github.com/panjf2000/ants/v2 v2.4.6/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 0000000..21bc79a --- /dev/null +++ b/main.go @@ -0,0 +1,130 @@ +package main + +import ( + "errors" + "fmt" + "log" + "sync" + "time" + + "github.com/panjf2000/ants/v2" +) + +type ( + PriorityPool struct { + mu sync.Mutex // PriorityQueue is not thread safe + pool *ants.Pool + queue PriorityQueue + limit int + } +) + +const ( + PoolCapacity = 2 + QueueCapacity = 4 +) + +func (p *PriorityPool) Submit(pri int, f func()) error { + err := p.pool.Submit(func() { + f() + + for { + // pick the highest priority item from the queue if there is any + p.mu.Lock() + if p.queue.Len() == 0 { + p.mu.Unlock() + return + } + queueF := p.queue.Pop() + p.mu.Unlock() + + queueF.(*Item).value() + } + }) + if err == nil { + return nil + } + + if !errors.Is(err, ants.ErrPoolOverload) { + return err + } + + p.mu.Lock() + defer p.mu.Unlock() + + ln := p.queue.Len() + if ln >= QueueCapacity { + return errors.New("queue capacity is full") + } + + p.queue.Push(&Item{ + value: f, + priority: pri, + index: ln, + }) + + return nil +} + +func NewPriorityPool() (*PriorityPool, error) { + pool, err := ants.NewPool(PoolCapacity, ants.WithNonblocking(true)) + if err != nil { + return nil, fmt.Errorf("new pool: %w", err) + } + + return &PriorityPool{ + pool: pool, + queue: make(PriorityQueue, 0, QueueCapacity), + limit: QueueCapacity, + }, nil +} + +func main() { + p, err := NewPriorityPool() + if err != nil { + log.Fatal(err) + } + + wg := new(sync.WaitGroup) + wg.Add(QueueCapacity + PoolCapacity) + + for i := 0; i < QueueCapacity+PoolCapacity+1; i++ { + if i < 4 { + err = p.Submit(1, func() { + time.Sleep(1 * time.Second) + fmt.Println("Low priority task is done") + wg.Done() + }) + fmt.Printf("id:%d error:%v\n", i+1, err) + } else if i < 6 { + err = p.Submit(10, func() { + time.Sleep(1 * time.Second) + fmt.Println("High priority task is done") + wg.Done() + }) + fmt.Printf("id:%d error:%v\n", i+1, err) + } else { + err = p.Submit(10, func() {}) + fmt.Printf("id:%d error:%v\n", i+1, err) + } + } + fmt.Println() + wg.Wait() + + /* + id:1 error: + id:2 error: + id:3 error: + id:4 error: + id:5 error: + id:6 error: + id:7 error:queue capacity is full + + Low priority task is done + Low priority task is done + High priority task is done + High priority task is done + Low priority task is done + Low priority task is done + */ +} diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..40f7dba --- /dev/null +++ b/queue.go @@ -0,0 +1,55 @@ +package main + +// From https://golang.org/src/container/heap/example_pq_test.go + +import ( + "container/heap" +) + +// An Item is something we manage in a priority queue. +type Item struct { + value func() // The value of the item; arbitrary. + priority int // The priority of the item in the queue. + // The index is needed by update and is maintained by the heap.Interface methods. + index int // The index of the item in the heap. +} + +// A PriorityQueue implements heap.Interface and holds Items. +type PriorityQueue []*Item + +func (pq PriorityQueue) Len() int { return len(pq) } + +func (pq PriorityQueue) Less(i, j int) bool { + // We want Pop to give us the highest, not lowest, priority so we use greater than here. + return pq[i].priority > pq[j].priority +} + +func (pq PriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *PriorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*Item) + item.index = n + *pq = append(*pq, item) +} + +func (pq *PriorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// update modifies the priority and value of an Item in the queue. +func (pq *PriorityQueue) update(item *Item, value func(), priority int) { + item.value = value + item.priority = priority + heap.Fix(pq, item.index) +}