diff --git a/.github/logo.svg b/.github/logo.svg
new file mode 100644
index 0000000..e2e947e
--- /dev/null
+++ b/.github/logo.svg
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..c3d0af9
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2021 Alexey Vanin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a2f5710
--- /dev/null
+++ b/README.md
@@ -0,0 +1,29 @@
+
+
+
+
+ Goroutines pool with priority queue buffer.
+
+
+---
+
+## Overview
+
+Package `priopool` provides goroutines pool based on
+[panjf2000/ants](https://github.com/panjf2000/ants) library with priority queue
+buffer based on [stdlib heap](https://pkg.go.dev/container/heap) package.
+
+Priority pool:
+- is non-blocking,
+- prioritizes tasks with higher priority value,
+- can be configured with unlimited queue buffer.
+
+## Install
+
+```powershell
+go get -u github.com/alexvanin/priopool
+```
+
+## License
+
+Source code is available under the [MIT License](/LICENSE).
diff --git a/go.mod b/go.mod
index ea473f6..d48d0b9 100644
--- a/go.mod
+++ b/go.mod
@@ -1,5 +1,14 @@
-module priopool
+module github.com/alexvanin/priopool
go 1.17
-require github.com/panjf2000/ants/v2 v2.4.6
+require (
+ github.com/panjf2000/ants/v2 v2.4.6
+ github.com/stretchr/testify v1.4.0
+)
+
+require (
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
+ gopkg.in/yaml.v2 v2.2.7 // indirect
+)
diff --git a/go.sum b/go.sum
index 88d6b94..da4b33e 100644
--- a/go.sum
+++ b/go.sum
@@ -8,6 +8,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
diff --git a/main.go b/main.go
deleted file mode 100644
index 21bc79a..0000000
--- a/main.go
+++ /dev/null
@@ -1,130 +0,0 @@
-package main
-
-import (
- "errors"
- "fmt"
- "log"
- "sync"
- "time"
-
- "github.com/panjf2000/ants/v2"
-)
-
-type (
- PriorityPool struct {
- mu sync.Mutex // PriorityQueue is not thread safe
- pool *ants.Pool
- queue PriorityQueue
- limit int
- }
-)
-
-const (
- PoolCapacity = 2
- QueueCapacity = 4
-)
-
-func (p *PriorityPool) Submit(pri int, f func()) error {
- err := p.pool.Submit(func() {
- f()
-
- for {
- // pick the highest priority item from the queue if there is any
- p.mu.Lock()
- if p.queue.Len() == 0 {
- p.mu.Unlock()
- return
- }
- queueF := p.queue.Pop()
- p.mu.Unlock()
-
- queueF.(*Item).value()
- }
- })
- if err == nil {
- return nil
- }
-
- if !errors.Is(err, ants.ErrPoolOverload) {
- return err
- }
-
- p.mu.Lock()
- defer p.mu.Unlock()
-
- ln := p.queue.Len()
- if ln >= QueueCapacity {
- return errors.New("queue capacity is full")
- }
-
- p.queue.Push(&Item{
- value: f,
- priority: pri,
- index: ln,
- })
-
- return nil
-}
-
-func NewPriorityPool() (*PriorityPool, error) {
- pool, err := ants.NewPool(PoolCapacity, ants.WithNonblocking(true))
- if err != nil {
- return nil, fmt.Errorf("new pool: %w", err)
- }
-
- return &PriorityPool{
- pool: pool,
- queue: make(PriorityQueue, 0, QueueCapacity),
- limit: QueueCapacity,
- }, nil
-}
-
-func main() {
- p, err := NewPriorityPool()
- if err != nil {
- log.Fatal(err)
- }
-
- wg := new(sync.WaitGroup)
- wg.Add(QueueCapacity + PoolCapacity)
-
- for i := 0; i < QueueCapacity+PoolCapacity+1; i++ {
- if i < 4 {
- err = p.Submit(1, func() {
- time.Sleep(1 * time.Second)
- fmt.Println("Low priority task is done")
- wg.Done()
- })
- fmt.Printf("id:%d error:%v\n", i+1, err)
- } else if i < 6 {
- err = p.Submit(10, func() {
- time.Sleep(1 * time.Second)
- fmt.Println("High priority task is done")
- wg.Done()
- })
- fmt.Printf("id:%d error:%v\n", i+1, err)
- } else {
- err = p.Submit(10, func() {})
- fmt.Printf("id:%d error:%v\n", i+1, err)
- }
- }
- fmt.Println()
- wg.Wait()
-
- /*
- id:1 error:
- id:2 error:
- id:3 error:
- id:4 error:
- id:5 error:
- id:6 error:
- id:7 error:queue capacity is full
-
- Low priority task is done
- Low priority task is done
- High priority task is done
- High priority task is done
- Low priority task is done
- Low priority task is done
- */
-}
diff --git a/pool.go b/pool.go
new file mode 100644
index 0000000..aa14543
--- /dev/null
+++ b/pool.go
@@ -0,0 +1,110 @@
+package priopool
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ "github.com/panjf2000/ants/v2"
+)
+
+type (
+ // PriorityPool is a pool of goroutines with priority queue buffer.
+ // Based on panjf2000/ants and stdlib heap libraries.
+ PriorityPool struct {
+ mu sync.Mutex // thread safe access to priority queue
+ pool *ants.Pool
+ queue priorityQueue
+ limit int
+ }
+)
+
+const defaultQueueCapacity = 10
+
+var (
+ // ErrQueueOverload will be returned on submit operation
+ // when both goroutines pool and priority queue are full.
+ ErrQueueOverload = errors.New("pool and priority queue are full")
+
+ // ErrPoolCapacitySize will be returned when constructor
+ // provided with non-positive pool capacity.
+ ErrPoolCapacitySize = errors.New("pool capacity must be positive")
+)
+
+// New creates instance of priority pool. Pool capacity must be positive.
+// Zero queue capacity disables priority queue. Negative queue capacity
+// disables priority queue length limit.
+func New(poolCapacity, queueCapacity int) (*PriorityPool, error) {
+ if poolCapacity <= 0 {
+ return nil, ErrPoolCapacitySize
+ }
+
+ pool, err := ants.NewPool(poolCapacity, ants.WithNonblocking(true))
+ if err != nil {
+ return nil, fmt.Errorf("creating pool instance: %w", err)
+ }
+
+ var queue priorityQueue
+
+ switch {
+ case queueCapacity >= 0:
+ queue = make(priorityQueue, 0, queueCapacity)
+ case queueCapacity < 0:
+ queue = make(priorityQueue, 0, defaultQueueCapacity)
+ }
+
+ return &PriorityPool{
+ pool: pool,
+ queue: queue,
+ limit: queueCapacity,
+ }, nil
+}
+
+// Submit sends the task into priority pool. Non-blocking operation. If pool has
+// available workers, then task executes immediately. If pool is full, then task
+// is stored in priority queue. It will be executed when available worker pops
+// the task from priority queue. Tasks from queue do not evict running tasks
+// from pool. Tasks with bigger priority number are popped earlier.
+// If queue is full, submit returns ErrQueueOverload error.
+func (p *PriorityPool) Submit(priority uint32, task func()) error {
+ p.mu.Lock() // lock from the beginning to avoid starving
+ defer p.mu.Unlock()
+
+ err := p.pool.Submit(func() {
+ task()
+
+ // pick the highest priority item from the queue
+ // process items until queue is empty
+ for {
+ p.mu.Lock()
+ if p.queue.Len() == 0 {
+ p.mu.Unlock()
+ return
+ }
+ queueF := p.queue.Pop()
+ p.mu.Unlock()
+
+ queueF.(*priorityQueueTask).value()
+ }
+ })
+ if err == nil {
+ return nil
+ }
+
+ if !errors.Is(err, ants.ErrPoolOverload) {
+ return fmt.Errorf("pool submit: %w", err)
+ }
+
+ ln := p.queue.Len()
+ if p.limit >= 0 && ln >= p.limit {
+ return ErrQueueOverload
+ }
+
+ p.queue.Push(&priorityQueueTask{
+ value: task,
+ priority: int(priority),
+ index: ln,
+ })
+
+ return nil
+}
diff --git a/pool_test.go b/pool_test.go
new file mode 100644
index 0000000..ad1c4b8
--- /dev/null
+++ b/pool_test.go
@@ -0,0 +1,133 @@
+package priopool_test
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/alexvanin/priopool"
+)
+
+type syncList struct {
+ mu sync.Mutex
+ list []int
+}
+
+func (s *syncList) append(i int) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ s.list = append(s.list, i)
+}
+
+func TestPriorityPool_New(t *testing.T) {
+ const pos, zero, neg = 1, 0, -1
+
+ cases := [...]struct {
+ pool, queue int
+ err error
+ }{
+ {pool: pos, queue: pos, err: nil},
+ {pool: pos, queue: zero, err: nil},
+ {pool: pos, queue: neg, err: nil},
+ {pool: zero, queue: pos, err: priopool.ErrPoolCapacitySize},
+ {pool: neg, queue: pos, err: priopool.ErrPoolCapacitySize},
+ }
+
+ for _, c := range cases {
+ _, err := priopool.New(c.pool, c.queue)
+ require.Equal(t, c.err, err, c)
+ }
+}
+
+func TestPriorityPool_Submit(t *testing.T) {
+ const (
+ poolCap, queueCap = 2, 4
+ totalCap = poolCap + queueCap
+ highPriority = 10
+ midPriority = 5
+ lowPriority = 1
+ )
+
+ p, err := priopool.New(poolCap, queueCap)
+ require.NoError(t, err)
+
+ result := new(syncList)
+ wg := new(sync.WaitGroup)
+ wg.Add(totalCap)
+
+ for i := 0; i < totalCap; i++ {
+ var priority uint32
+
+ switch {
+ case i < poolCap:
+ priority = midPriority // first put two middle priority tasks
+ case i < poolCap+queueCap/2:
+ priority = lowPriority // then add to queue two low priority tasks
+ default:
+ priority = highPriority // in the end fill queue with high priority tasks
+ }
+
+ err = p.Submit(priority, taskGenerator(int(priority), result, wg))
+ require.NoError(t, err)
+ }
+
+ err = p.Submit(highPriority, func() {})
+ require.Error(t, err, priopool.ErrQueueOverload)
+
+ wg.Wait()
+
+ expected := []int{
+ midPriority, midPriority, // first tasks that took workers from pool
+ highPriority, highPriority, // tasks from queue with higher priority
+ lowPriority, lowPriority, // remaining tasks from queue
+ }
+ require.Equal(t, expected, result.list)
+
+ t.Run("disabled queue", func(t *testing.T) {
+ p, err := priopool.New(poolCap, 0)
+ require.NoError(t, err)
+
+ wg := new(sync.WaitGroup)
+ wg.Add(poolCap)
+
+ for i := 0; i < poolCap; i++ {
+ err = p.Submit(lowPriority, taskGenerator(i, nil, wg))
+ require.NoError(t, err)
+ }
+
+ err = p.Submit(highPriority, func() {})
+ require.Error(t, err, priopool.ErrQueueOverload)
+
+ wg.Wait()
+ })
+
+ t.Run("disabled queue limit", func(t *testing.T) {
+ const n = 50
+
+ p, err := priopool.New(poolCap, -1)
+ require.NoError(t, err)
+
+ wg := new(sync.WaitGroup)
+ wg.Add(n)
+
+ for i := 0; i < n; i++ {
+ err = p.Submit(lowPriority, taskGenerator(i, nil, wg))
+ require.NoError(t, err)
+ }
+
+ wg.Wait()
+ })
+}
+
+func taskGenerator(ind int, output *syncList, wg *sync.WaitGroup) func() {
+ return func() {
+ time.Sleep(100 * time.Millisecond)
+ if output != nil {
+ output.append(ind)
+ }
+ wg.Done()
+ }
+}
diff --git a/queue.go b/queue.go
index 40f7dba..5d137ee 100644
--- a/queue.go
+++ b/queue.go
@@ -1,54 +1,51 @@
-package main
+package priopool
-// From https://golang.org/src/container/heap/example_pq_test.go
+// Priority pool based on implementation example from heap package.
+// Priority queue itself is not thread safe.
+// See https://cs.opensource.google/go/go/+/refs/tags/go1.17.2:src/container/heap/example_pq_test.go
import (
"container/heap"
)
-// An Item is something we manage in a priority queue.
-type Item struct {
- value func() // The value of the item; arbitrary.
- priority int // The priority of the item in the queue.
- // The index is needed by update and is maintained by the heap.Interface methods.
- index int // The index of the item in the heap.
+type priorityQueueTask struct {
+ value func()
+ priority int
+ index int // the index is needed by update and is maintained by the heap.Interface methods
}
-// A PriorityQueue implements heap.Interface and holds Items.
-type PriorityQueue []*Item
+type priorityQueue []*priorityQueueTask
-func (pq PriorityQueue) Len() int { return len(pq) }
+func (pq priorityQueue) Len() int { return len(pq) }
-func (pq PriorityQueue) Less(i, j int) bool {
- // We want Pop to give us the highest, not lowest, priority so we use greater than here.
+func (pq priorityQueue) Less(i, j int) bool {
return pq[i].priority > pq[j].priority
}
-func (pq PriorityQueue) Swap(i, j int) {
+func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
-func (pq *PriorityQueue) Push(x interface{}) {
+func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
- item := x.(*Item)
+ item := x.(*priorityQueueTask)
item.index = n
*pq = append(*pq, item)
}
-func (pq *PriorityQueue) Pop() interface{} {
+func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
- old[n-1] = nil // avoid memory leak
- item.index = -1 // for safety
+ old[n-1] = nil
+ item.index = -1
*pq = old[0 : n-1]
return item
}
-// update modifies the priority and value of an Item in the queue.
-func (pq *PriorityQueue) update(item *Item, value func(), priority int) {
+func (pq *priorityQueue) update(item *priorityQueueTask, value func(), priority int) {
item.value = value
item.priority = priority
heap.Fix(pq, item.index)