Initial commit

This commit is contained in:
Alex Vanin 2021-10-20 17:02:55 +03:00
commit 5dcd3cf578
4 changed files with 204 additions and 0 deletions

5
go.mod Normal file
View file

@ -0,0 +1,5 @@
module priopool
go 1.17
require github.com/panjf2000/ants/v2 v2.4.6

14
go.sum Normal file
View file

@ -0,0 +1,14 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/panjf2000/ants/v2 v2.4.6 h1:drmj9mcygn2gawZ155dRbo+NfXEfAssjZNU1qoIb4gQ=
github.com/panjf2000/ants/v2 v2.4.6/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

130
main.go Normal file
View file

@ -0,0 +1,130 @@
package main
import (
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/panjf2000/ants/v2"
)
type (
PriorityPool struct {
mu sync.Mutex // PriorityQueue is not thread safe
pool *ants.Pool
queue PriorityQueue
limit int
}
)
const (
PoolCapacity = 2
QueueCapacity = 4
)
func (p *PriorityPool) Submit(pri int, f func()) error {
err := p.pool.Submit(func() {
f()
for {
// pick the highest priority item from the queue if there is any
p.mu.Lock()
if p.queue.Len() == 0 {
p.mu.Unlock()
return
}
queueF := p.queue.Pop()
p.mu.Unlock()
queueF.(*Item).value()
}
})
if err == nil {
return nil
}
if !errors.Is(err, ants.ErrPoolOverload) {
return err
}
p.mu.Lock()
defer p.mu.Unlock()
ln := p.queue.Len()
if ln >= QueueCapacity {
return errors.New("queue capacity is full")
}
p.queue.Push(&Item{
value: f,
priority: pri,
index: ln,
})
return nil
}
func NewPriorityPool() (*PriorityPool, error) {
pool, err := ants.NewPool(PoolCapacity, ants.WithNonblocking(true))
if err != nil {
return nil, fmt.Errorf("new pool: %w", err)
}
return &PriorityPool{
pool: pool,
queue: make(PriorityQueue, 0, QueueCapacity),
limit: QueueCapacity,
}, nil
}
func main() {
p, err := NewPriorityPool()
if err != nil {
log.Fatal(err)
}
wg := new(sync.WaitGroup)
wg.Add(QueueCapacity + PoolCapacity)
for i := 0; i < QueueCapacity+PoolCapacity+1; i++ {
if i < 4 {
err = p.Submit(1, func() {
time.Sleep(1 * time.Second)
fmt.Println("Low priority task is done")
wg.Done()
})
fmt.Printf("id:%d <low priority task> error:%v\n", i+1, err)
} else if i < 6 {
err = p.Submit(10, func() {
time.Sleep(1 * time.Second)
fmt.Println("High priority task is done")
wg.Done()
})
fmt.Printf("id:%d <high priority task> error:%v\n", i+1, err)
} else {
err = p.Submit(10, func() {})
fmt.Printf("id:%d <out of capacity task> error:%v\n", i+1, err)
}
}
fmt.Println()
wg.Wait()
/*
id:1 <low priority task> error:<nil>
id:2 <low priority task> error:<nil>
id:3 <low priority task> error:<nil>
id:4 <low priority task> error:<nil>
id:5 <high priority task> error:<nil>
id:6 <high priority task> error:<nil>
id:7 <out of capacity task> error:queue capacity is full
Low priority task is done
Low priority task is done
High priority task is done
High priority task is done
Low priority task is done
Low priority task is done
*/
}

55
queue.go Normal file
View file

@ -0,0 +1,55 @@
package main
// From https://golang.org/src/container/heap/example_pq_test.go
import (
"container/heap"
)
// An Item is something we manage in a priority queue.
type Item struct {
value func() // The value of the item; arbitrary.
priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue []*Item
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
return pq[i].priority > pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*Item)
item.index = n
*pq = append(*pq, item)
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *Item, value func(), priority int) {
item.value = value
item.priority = priority
heap.Fix(pq, item.index)
}