From 11e44d2e6c1ce0233357b1e54bb1b18437a265dd Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Sat, 23 Oct 2021 18:16:40 +0300 Subject: [PATCH] Reformat priopool as library package --- .github/logo.svg | 4 ++ LICENSE | 21 ++++++++ README.md | 29 +++++++++++ go.mod | 13 ++++- go.sum | 1 + main.go | 130 --------------------------------------------- pool.go | 110 +++++++++++++++++++++++++++++++++++++++ pool_test.go | 133 +++++++++++++++++++++++++++++++++++++++++++++++ queue.go | 39 +++++++------- 9 files changed, 327 insertions(+), 153 deletions(-) create mode 100644 .github/logo.svg create mode 100644 LICENSE create mode 100644 README.md delete mode 100644 main.go create mode 100644 pool.go create mode 100644 pool_test.go diff --git a/.github/logo.svg b/.github/logo.svg new file mode 100644 index 0000000..e2e947e --- /dev/null +++ b/.github/logo.svg @@ -0,0 +1,4 @@ + + + +Poolμλ \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c3d0af9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Alexey Vanin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..a2f5710 --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +

+ +

+

+ Goroutines pool with priority queue buffer. +

+ +--- + +## Overview + +Package `priopool` provides goroutines pool based on +[panjf2000/ants](https://github.com/panjf2000/ants) library with priority queue +buffer based on [stdlib heap](https://pkg.go.dev/container/heap) package. + +Priority pool: +- is non-blocking, +- prioritizes tasks with higher priority value, +- can be configured with unlimited queue buffer. + +## Install + +```powershell +go get -u github.com/alexvanin/priopool +``` + +## License + +Source code is available under the [MIT License](/LICENSE). diff --git a/go.mod b/go.mod index ea473f6..d48d0b9 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,14 @@ -module priopool +module github.com/alexvanin/priopool go 1.17 -require github.com/panjf2000/ants/v2 v2.4.6 +require ( + github.com/panjf2000/ants/v2 v2.4.6 + github.com/stretchr/testify v1.4.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v2 v2.2.7 // indirect +) diff --git a/go.sum b/go.sum index 88d6b94..da4b33e 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= diff --git a/main.go b/main.go deleted file mode 100644 index 21bc79a..0000000 --- a/main.go +++ /dev/null @@ -1,130 +0,0 @@ -package main - -import ( - "errors" - "fmt" - "log" - "sync" - "time" - - "github.com/panjf2000/ants/v2" -) - -type ( - PriorityPool struct { - mu sync.Mutex // PriorityQueue is not thread safe - pool *ants.Pool - queue PriorityQueue - limit int - } -) - -const ( - PoolCapacity = 2 - QueueCapacity = 4 -) - -func (p *PriorityPool) Submit(pri int, f func()) error { - err := p.pool.Submit(func() { - f() - - for { - // pick the highest priority item from the queue if there is any - p.mu.Lock() - if p.queue.Len() == 0 { - p.mu.Unlock() - return - } - queueF := p.queue.Pop() - p.mu.Unlock() - - queueF.(*Item).value() - } - }) - if err == nil { - return nil - } - - if !errors.Is(err, ants.ErrPoolOverload) { - return err - } - - p.mu.Lock() - defer p.mu.Unlock() - - ln := p.queue.Len() - if ln >= QueueCapacity { - return errors.New("queue capacity is full") - } - - p.queue.Push(&Item{ - value: f, - priority: pri, - index: ln, - }) - - return nil -} - -func NewPriorityPool() (*PriorityPool, error) { - pool, err := ants.NewPool(PoolCapacity, ants.WithNonblocking(true)) - if err != nil { - return nil, fmt.Errorf("new pool: %w", err) - } - - return &PriorityPool{ - pool: pool, - queue: make(PriorityQueue, 0, QueueCapacity), - limit: QueueCapacity, - }, nil -} - -func main() { - p, err := NewPriorityPool() - if err != nil { - log.Fatal(err) - } - - wg := new(sync.WaitGroup) - wg.Add(QueueCapacity + PoolCapacity) - - for i := 0; i < QueueCapacity+PoolCapacity+1; i++ { - if i < 4 { - err = p.Submit(1, func() { - time.Sleep(1 * time.Second) - fmt.Println("Low priority task is done") - wg.Done() - }) - fmt.Printf("id:%d error:%v\n", i+1, err) - } else if i < 6 { - err = p.Submit(10, func() { - time.Sleep(1 * time.Second) - fmt.Println("High priority task is done") - wg.Done() - }) - fmt.Printf("id:%d error:%v\n", i+1, err) - } else { - err = p.Submit(10, func() {}) - fmt.Printf("id:%d error:%v\n", i+1, err) - } - } - fmt.Println() - wg.Wait() - - /* - id:1 error: - id:2 error: - id:3 error: - id:4 error: - id:5 error: - id:6 error: - id:7 error:queue capacity is full - - Low priority task is done - Low priority task is done - High priority task is done - High priority task is done - Low priority task is done - Low priority task is done - */ -} diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..aa14543 --- /dev/null +++ b/pool.go @@ -0,0 +1,110 @@ +package priopool + +import ( + "errors" + "fmt" + "sync" + + "github.com/panjf2000/ants/v2" +) + +type ( + // PriorityPool is a pool of goroutines with priority queue buffer. + // Based on panjf2000/ants and stdlib heap libraries. + PriorityPool struct { + mu sync.Mutex // thread safe access to priority queue + pool *ants.Pool + queue priorityQueue + limit int + } +) + +const defaultQueueCapacity = 10 + +var ( + // ErrQueueOverload will be returned on submit operation + // when both goroutines pool and priority queue are full. + ErrQueueOverload = errors.New("pool and priority queue are full") + + // ErrPoolCapacitySize will be returned when constructor + // provided with non-positive pool capacity. + ErrPoolCapacitySize = errors.New("pool capacity must be positive") +) + +// New creates instance of priority pool. Pool capacity must be positive. +// Zero queue capacity disables priority queue. Negative queue capacity +// disables priority queue length limit. +func New(poolCapacity, queueCapacity int) (*PriorityPool, error) { + if poolCapacity <= 0 { + return nil, ErrPoolCapacitySize + } + + pool, err := ants.NewPool(poolCapacity, ants.WithNonblocking(true)) + if err != nil { + return nil, fmt.Errorf("creating pool instance: %w", err) + } + + var queue priorityQueue + + switch { + case queueCapacity >= 0: + queue = make(priorityQueue, 0, queueCapacity) + case queueCapacity < 0: + queue = make(priorityQueue, 0, defaultQueueCapacity) + } + + return &PriorityPool{ + pool: pool, + queue: queue, + limit: queueCapacity, + }, nil +} + +// Submit sends the task into priority pool. Non-blocking operation. If pool has +// available workers, then task executes immediately. If pool is full, then task +// is stored in priority queue. It will be executed when available worker pops +// the task from priority queue. Tasks from queue do not evict running tasks +// from pool. Tasks with bigger priority number are popped earlier. +// If queue is full, submit returns ErrQueueOverload error. +func (p *PriorityPool) Submit(priority uint32, task func()) error { + p.mu.Lock() // lock from the beginning to avoid starving + defer p.mu.Unlock() + + err := p.pool.Submit(func() { + task() + + // pick the highest priority item from the queue + // process items until queue is empty + for { + p.mu.Lock() + if p.queue.Len() == 0 { + p.mu.Unlock() + return + } + queueF := p.queue.Pop() + p.mu.Unlock() + + queueF.(*priorityQueueTask).value() + } + }) + if err == nil { + return nil + } + + if !errors.Is(err, ants.ErrPoolOverload) { + return fmt.Errorf("pool submit: %w", err) + } + + ln := p.queue.Len() + if p.limit >= 0 && ln >= p.limit { + return ErrQueueOverload + } + + p.queue.Push(&priorityQueueTask{ + value: task, + priority: int(priority), + index: ln, + }) + + return nil +} diff --git a/pool_test.go b/pool_test.go new file mode 100644 index 0000000..ad1c4b8 --- /dev/null +++ b/pool_test.go @@ -0,0 +1,133 @@ +package priopool_test + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/alexvanin/priopool" +) + +type syncList struct { + mu sync.Mutex + list []int +} + +func (s *syncList) append(i int) { + s.mu.Lock() + defer s.mu.Unlock() + + s.list = append(s.list, i) +} + +func TestPriorityPool_New(t *testing.T) { + const pos, zero, neg = 1, 0, -1 + + cases := [...]struct { + pool, queue int + err error + }{ + {pool: pos, queue: pos, err: nil}, + {pool: pos, queue: zero, err: nil}, + {pool: pos, queue: neg, err: nil}, + {pool: zero, queue: pos, err: priopool.ErrPoolCapacitySize}, + {pool: neg, queue: pos, err: priopool.ErrPoolCapacitySize}, + } + + for _, c := range cases { + _, err := priopool.New(c.pool, c.queue) + require.Equal(t, c.err, err, c) + } +} + +func TestPriorityPool_Submit(t *testing.T) { + const ( + poolCap, queueCap = 2, 4 + totalCap = poolCap + queueCap + highPriority = 10 + midPriority = 5 + lowPriority = 1 + ) + + p, err := priopool.New(poolCap, queueCap) + require.NoError(t, err) + + result := new(syncList) + wg := new(sync.WaitGroup) + wg.Add(totalCap) + + for i := 0; i < totalCap; i++ { + var priority uint32 + + switch { + case i < poolCap: + priority = midPriority // first put two middle priority tasks + case i < poolCap+queueCap/2: + priority = lowPriority // then add to queue two low priority tasks + default: + priority = highPriority // in the end fill queue with high priority tasks + } + + err = p.Submit(priority, taskGenerator(int(priority), result, wg)) + require.NoError(t, err) + } + + err = p.Submit(highPriority, func() {}) + require.Error(t, err, priopool.ErrQueueOverload) + + wg.Wait() + + expected := []int{ + midPriority, midPriority, // first tasks that took workers from pool + highPriority, highPriority, // tasks from queue with higher priority + lowPriority, lowPriority, // remaining tasks from queue + } + require.Equal(t, expected, result.list) + + t.Run("disabled queue", func(t *testing.T) { + p, err := priopool.New(poolCap, 0) + require.NoError(t, err) + + wg := new(sync.WaitGroup) + wg.Add(poolCap) + + for i := 0; i < poolCap; i++ { + err = p.Submit(lowPriority, taskGenerator(i, nil, wg)) + require.NoError(t, err) + } + + err = p.Submit(highPriority, func() {}) + require.Error(t, err, priopool.ErrQueueOverload) + + wg.Wait() + }) + + t.Run("disabled queue limit", func(t *testing.T) { + const n = 50 + + p, err := priopool.New(poolCap, -1) + require.NoError(t, err) + + wg := new(sync.WaitGroup) + wg.Add(n) + + for i := 0; i < n; i++ { + err = p.Submit(lowPriority, taskGenerator(i, nil, wg)) + require.NoError(t, err) + } + + wg.Wait() + }) +} + +func taskGenerator(ind int, output *syncList, wg *sync.WaitGroup) func() { + return func() { + time.Sleep(100 * time.Millisecond) + if output != nil { + output.append(ind) + } + wg.Done() + } +} diff --git a/queue.go b/queue.go index 40f7dba..5d137ee 100644 --- a/queue.go +++ b/queue.go @@ -1,54 +1,51 @@ -package main +package priopool -// From https://golang.org/src/container/heap/example_pq_test.go +// Priority pool based on implementation example from heap package. +// Priority queue itself is not thread safe. +// See https://cs.opensource.google/go/go/+/refs/tags/go1.17.2:src/container/heap/example_pq_test.go import ( "container/heap" ) -// An Item is something we manage in a priority queue. -type Item struct { - value func() // The value of the item; arbitrary. - priority int // The priority of the item in the queue. - // The index is needed by update and is maintained by the heap.Interface methods. - index int // The index of the item in the heap. +type priorityQueueTask struct { + value func() + priority int + index int // the index is needed by update and is maintained by the heap.Interface methods } -// A PriorityQueue implements heap.Interface and holds Items. -type PriorityQueue []*Item +type priorityQueue []*priorityQueueTask -func (pq PriorityQueue) Len() int { return len(pq) } +func (pq priorityQueue) Len() int { return len(pq) } -func (pq PriorityQueue) Less(i, j int) bool { - // We want Pop to give us the highest, not lowest, priority so we use greater than here. +func (pq priorityQueue) Less(i, j int) bool { return pq[i].priority > pq[j].priority } -func (pq PriorityQueue) Swap(i, j int) { +func (pq priorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j } -func (pq *PriorityQueue) Push(x interface{}) { +func (pq *priorityQueue) Push(x interface{}) { n := len(*pq) - item := x.(*Item) + item := x.(*priorityQueueTask) item.index = n *pq = append(*pq, item) } -func (pq *PriorityQueue) Pop() interface{} { +func (pq *priorityQueue) Pop() interface{} { old := *pq n := len(old) item := old[n-1] - old[n-1] = nil // avoid memory leak - item.index = -1 // for safety + old[n-1] = nil + item.index = -1 *pq = old[0 : n-1] return item } -// update modifies the priority and value of an Item in the queue. -func (pq *PriorityQueue) update(item *Item, value func(), priority int) { +func (pq *priorityQueue) update(item *priorityQueueTask, value func(), priority int) { item.value = value item.priority = priority heap.Fix(pq, item.index)