Reformat priopool as library package

This commit is contained in:
Alex Vanin 2021-10-23 18:16:40 +03:00
parent 5dcd3cf578
commit 11e44d2e6c
9 changed files with 327 additions and 153 deletions

4
.github/logo.svg vendored Normal file
View file

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Do not edit this file with editors other than diagrams.net -->
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg xmlns="http://www.w3.org/2000/svg" style="background-color: rgb(255, 255, 255);" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="717px" height="201px" viewBox="-0.5 -0.5 717 201" content="&lt;mxfile host=&quot;app.diagrams.net&quot; modified=&quot;2021-10-23T14:49:34.983Z&quot; agent=&quot;5.0 (Macintosh)&quot; etag=&quot;4VGlpcsbnBwwhf7h8wyT&quot; version=&quot;15.5.8&quot; type=&quot;device&quot;&gt;&lt;diagram id=&quot;9-g5vCLfetzEmFAWvmZT&quot; name=&quot;Page-1&quot;&gt;7Zpdk5owFIZ/DZd2+BDUS6XudnamHWecTi87WchC2kCcEFftr2+QIBCgSBeobtmbxTfJibzPIRyIimEHx0cKdv5n4kKs6Kp7VIyPiq5rU93k/2LllCizxSwRPIpc0SkTtugXFKIq1D1yYVToyAjBDO2KokPCEDqsoAFKyaHY7YXg4qw74MGSsHUALqvfkMv8RJ2baqZ/gsjz05k1VbQEIO0shMgHLjnkJGOtGDYlhCVHwdGGODYv9SUZ91DTevliFIbsmgE/vj5az94Tfdp8mUznz9vvaDeZxAPiMK8A78UZi2/LTqkFlOxDF8ZRVMVYHXzE4HYHnLj1wKFzzWcB5p80fviCMLYJJvQ81ng4/3E9YpT8hGlLSEI+fCXmhpTBY+1ZaReveJJBEkBGT7yLGGCkiSLyy0jtPmS0pkLyc6BSDYj88C6RMwv5gXCxjaPav3Q0EdNM1T6Y3bisq80u64O6rN553l7suhVHtXm/js7WS2ut9uno9NYcXfSco9rCXiz6dNS6NUebDYWhu4xv+5kROQOLbtevldAtFQWNnuU8MSs8STUKMWDotRi+yigxw4YgPnHzQpyGiMieOlCMylcDcqB5QyAGqAdZKdCZ2+W037CAjyhllFZXKOVAPaM0RpRNF9O9oJyOKOvuea1RyoEGRmmOKOsKwtYo5UADo7RGlHVPS61RyoEGRqmNJWztG5u2LEuBhmY51rCG2RVLOdDQLMcitvFyuhuWYxXb3RrbdIH3zbJdGetgEEXI+RPOm0cnOy6/TbsanfwsKQfqG127svUdoCs9vf81OvnyHRrd7H9Dp806Qld6qzcwOr1cyGwIwSV8DB5Z3dZBfp9BSAAjL4xRc1yQ66t4YwE5AC9FQ4BcN45dyf1tO7ylV97inpDLDKsiM4ze9h4r7kn2WlnZ78hkfXGdyXI50J3JFUtQbPLqjk3W6oqp7k3mH7NfliRrS/b7HGP9Gw==&lt;/diagram&gt;&lt;/mxfile&gt;"><defs/><g><rect x="480" y="75" width="120" height="120" fill="#ffffff" stroke="none" pointer-events="all"/><rect x="150" y="75" width="60" height="120" fill="#ffffff" stroke="none" pointer-events="all"/><rect x="210" y="75" width="60" height="120" fill="#ffffff" stroke="none" pointer-events="all"/><rect x="270" y="75" width="60" height="120" fill="#7ea6e0" stroke="none" pointer-events="all"/><rect x="330" y="75" width="60" height="120" fill="#f19c99" stroke="none" pointer-events="all"/><path d="M 150 75 L 390 75" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 150 195 L 390 195" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 390 75 L 390 195" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 330 75 L 330 195" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 270 75 L 270 195" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 210 75 L 210 195" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 480 75 L 480 195" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 600 75 L 600 195" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 480 195 L 600 195" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 480 75 L 600 75" fill="none" stroke="#000000" stroke-width="4.5" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 600 135 L 670.9 135" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 686.65 135 L 665.65 145.5 L 670.9 135 L 665.65 124.5 Z" fill="#000000" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="all"/><path d="M 390 135 L 460.9 135" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 476.65 135 L 455.65 145.5 L 460.9 135 L 455.65 124.5 Z" fill="#000000" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="all"/><path d="M 60 135 L 130.9 135" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 146.65 135 L 125.65 145.5 L 130.9 135 L 125.65 124.5 Z" fill="#000000" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="all"/><rect x="450" y="90" width="180" height="90" fill="none" stroke="none" pointer-events="all"/><g fill="#000000" font-family="Helvetica" text-anchor="middle" font-size="36px"><text x="539.5" y="149.5">Pool</text></g><rect x="450" y="0" width="180" height="60" fill="none" stroke="none" pointer-events="all"/><g fill="#000000" font-family="Helvetica" text-anchor="middle" font-size="36px"><text x="539.5" y="44.5">μ</text></g><rect x="0" y="75" width="180" height="60" fill="none" stroke="none" pointer-events="all"/><g fill="#000000" font-family="Helvetica" text-anchor="middle" font-size="36px"><text x="89.5" y="119.5">λ</text></g></g></svg>

After

Width:  |  Height:  |  Size: 4.8 KiB

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 Alexey Vanin
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

29
README.md Normal file
View file

@ -0,0 +1,29 @@
<p align="center">
<img src="/.github/logo.svg" width="500px">
</p>
<p align="center">
Goroutines pool with priority queue buffer.
</p>
---
## Overview
Package `priopool` provides goroutines pool based on
[panjf2000/ants](https://github.com/panjf2000/ants) library with priority queue
buffer based on [stdlib heap](https://pkg.go.dev/container/heap) package.
Priority pool:
- is non-blocking,
- prioritizes tasks with higher priority value,
- can be configured with unlimited queue buffer.
## Install
```powershell
go get -u github.com/alexvanin/priopool
```
## License
Source code is available under the [MIT License](/LICENSE).

13
go.mod
View file

@ -1,5 +1,14 @@
module priopool
module github.com/alexvanin/priopool
go 1.17
require github.com/panjf2000/ants/v2 v2.4.6
require (
github.com/panjf2000/ants/v2 v2.4.6
github.com/stretchr/testify v1.4.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v2 v2.2.7 // indirect
)

1
go.sum
View file

@ -8,6 +8,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=

130
main.go
View file

@ -1,130 +0,0 @@
package main
import (
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/panjf2000/ants/v2"
)
type (
PriorityPool struct {
mu sync.Mutex // PriorityQueue is not thread safe
pool *ants.Pool
queue PriorityQueue
limit int
}
)
const (
PoolCapacity = 2
QueueCapacity = 4
)
func (p *PriorityPool) Submit(pri int, f func()) error {
err := p.pool.Submit(func() {
f()
for {
// pick the highest priority item from the queue if there is any
p.mu.Lock()
if p.queue.Len() == 0 {
p.mu.Unlock()
return
}
queueF := p.queue.Pop()
p.mu.Unlock()
queueF.(*Item).value()
}
})
if err == nil {
return nil
}
if !errors.Is(err, ants.ErrPoolOverload) {
return err
}
p.mu.Lock()
defer p.mu.Unlock()
ln := p.queue.Len()
if ln >= QueueCapacity {
return errors.New("queue capacity is full")
}
p.queue.Push(&Item{
value: f,
priority: pri,
index: ln,
})
return nil
}
func NewPriorityPool() (*PriorityPool, error) {
pool, err := ants.NewPool(PoolCapacity, ants.WithNonblocking(true))
if err != nil {
return nil, fmt.Errorf("new pool: %w", err)
}
return &PriorityPool{
pool: pool,
queue: make(PriorityQueue, 0, QueueCapacity),
limit: QueueCapacity,
}, nil
}
func main() {
p, err := NewPriorityPool()
if err != nil {
log.Fatal(err)
}
wg := new(sync.WaitGroup)
wg.Add(QueueCapacity + PoolCapacity)
for i := 0; i < QueueCapacity+PoolCapacity+1; i++ {
if i < 4 {
err = p.Submit(1, func() {
time.Sleep(1 * time.Second)
fmt.Println("Low priority task is done")
wg.Done()
})
fmt.Printf("id:%d <low priority task> error:%v\n", i+1, err)
} else if i < 6 {
err = p.Submit(10, func() {
time.Sleep(1 * time.Second)
fmt.Println("High priority task is done")
wg.Done()
})
fmt.Printf("id:%d <high priority task> error:%v\n", i+1, err)
} else {
err = p.Submit(10, func() {})
fmt.Printf("id:%d <out of capacity task> error:%v\n", i+1, err)
}
}
fmt.Println()
wg.Wait()
/*
id:1 <low priority task> error:<nil>
id:2 <low priority task> error:<nil>
id:3 <low priority task> error:<nil>
id:4 <low priority task> error:<nil>
id:5 <high priority task> error:<nil>
id:6 <high priority task> error:<nil>
id:7 <out of capacity task> error:queue capacity is full
Low priority task is done
Low priority task is done
High priority task is done
High priority task is done
Low priority task is done
Low priority task is done
*/
}

110
pool.go Normal file
View file

@ -0,0 +1,110 @@
package priopool
import (
"errors"
"fmt"
"sync"
"github.com/panjf2000/ants/v2"
)
type (
// PriorityPool is a pool of goroutines with priority queue buffer.
// Based on panjf2000/ants and stdlib heap libraries.
PriorityPool struct {
mu sync.Mutex // thread safe access to priority queue
pool *ants.Pool
queue priorityQueue
limit int
}
)
const defaultQueueCapacity = 10
var (
// ErrQueueOverload will be returned on submit operation
// when both goroutines pool and priority queue are full.
ErrQueueOverload = errors.New("pool and priority queue are full")
// ErrPoolCapacitySize will be returned when constructor
// provided with non-positive pool capacity.
ErrPoolCapacitySize = errors.New("pool capacity must be positive")
)
// New creates instance of priority pool. Pool capacity must be positive.
// Zero queue capacity disables priority queue. Negative queue capacity
// disables priority queue length limit.
func New(poolCapacity, queueCapacity int) (*PriorityPool, error) {
if poolCapacity <= 0 {
return nil, ErrPoolCapacitySize
}
pool, err := ants.NewPool(poolCapacity, ants.WithNonblocking(true))
if err != nil {
return nil, fmt.Errorf("creating pool instance: %w", err)
}
var queue priorityQueue
switch {
case queueCapacity >= 0:
queue = make(priorityQueue, 0, queueCapacity)
case queueCapacity < 0:
queue = make(priorityQueue, 0, defaultQueueCapacity)
}
return &PriorityPool{
pool: pool,
queue: queue,
limit: queueCapacity,
}, nil
}
// Submit sends the task into priority pool. Non-blocking operation. If pool has
// available workers, then task executes immediately. If pool is full, then task
// is stored in priority queue. It will be executed when available worker pops
// the task from priority queue. Tasks from queue do not evict running tasks
// from pool. Tasks with bigger priority number are popped earlier.
// If queue is full, submit returns ErrQueueOverload error.
func (p *PriorityPool) Submit(priority uint32, task func()) error {
p.mu.Lock() // lock from the beginning to avoid starving
defer p.mu.Unlock()
err := p.pool.Submit(func() {
task()
// pick the highest priority item from the queue
// process items until queue is empty
for {
p.mu.Lock()
if p.queue.Len() == 0 {
p.mu.Unlock()
return
}
queueF := p.queue.Pop()
p.mu.Unlock()
queueF.(*priorityQueueTask).value()
}
})
if err == nil {
return nil
}
if !errors.Is(err, ants.ErrPoolOverload) {
return fmt.Errorf("pool submit: %w", err)
}
ln := p.queue.Len()
if p.limit >= 0 && ln >= p.limit {
return ErrQueueOverload
}
p.queue.Push(&priorityQueueTask{
value: task,
priority: int(priority),
index: ln,
})
return nil
}

133
pool_test.go Normal file
View file

@ -0,0 +1,133 @@
package priopool_test
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/alexvanin/priopool"
)
type syncList struct {
mu sync.Mutex
list []int
}
func (s *syncList) append(i int) {
s.mu.Lock()
defer s.mu.Unlock()
s.list = append(s.list, i)
}
func TestPriorityPool_New(t *testing.T) {
const pos, zero, neg = 1, 0, -1
cases := [...]struct {
pool, queue int
err error
}{
{pool: pos, queue: pos, err: nil},
{pool: pos, queue: zero, err: nil},
{pool: pos, queue: neg, err: nil},
{pool: zero, queue: pos, err: priopool.ErrPoolCapacitySize},
{pool: neg, queue: pos, err: priopool.ErrPoolCapacitySize},
}
for _, c := range cases {
_, err := priopool.New(c.pool, c.queue)
require.Equal(t, c.err, err, c)
}
}
func TestPriorityPool_Submit(t *testing.T) {
const (
poolCap, queueCap = 2, 4
totalCap = poolCap + queueCap
highPriority = 10
midPriority = 5
lowPriority = 1
)
p, err := priopool.New(poolCap, queueCap)
require.NoError(t, err)
result := new(syncList)
wg := new(sync.WaitGroup)
wg.Add(totalCap)
for i := 0; i < totalCap; i++ {
var priority uint32
switch {
case i < poolCap:
priority = midPriority // first put two middle priority tasks
case i < poolCap+queueCap/2:
priority = lowPriority // then add to queue two low priority tasks
default:
priority = highPriority // in the end fill queue with high priority tasks
}
err = p.Submit(priority, taskGenerator(int(priority), result, wg))
require.NoError(t, err)
}
err = p.Submit(highPriority, func() {})
require.Error(t, err, priopool.ErrQueueOverload)
wg.Wait()
expected := []int{
midPriority, midPriority, // first tasks that took workers from pool
highPriority, highPriority, // tasks from queue with higher priority
lowPriority, lowPriority, // remaining tasks from queue
}
require.Equal(t, expected, result.list)
t.Run("disabled queue", func(t *testing.T) {
p, err := priopool.New(poolCap, 0)
require.NoError(t, err)
wg := new(sync.WaitGroup)
wg.Add(poolCap)
for i := 0; i < poolCap; i++ {
err = p.Submit(lowPriority, taskGenerator(i, nil, wg))
require.NoError(t, err)
}
err = p.Submit(highPriority, func() {})
require.Error(t, err, priopool.ErrQueueOverload)
wg.Wait()
})
t.Run("disabled queue limit", func(t *testing.T) {
const n = 50
p, err := priopool.New(poolCap, -1)
require.NoError(t, err)
wg := new(sync.WaitGroup)
wg.Add(n)
for i := 0; i < n; i++ {
err = p.Submit(lowPriority, taskGenerator(i, nil, wg))
require.NoError(t, err)
}
wg.Wait()
})
}
func taskGenerator(ind int, output *syncList, wg *sync.WaitGroup) func() {
return func() {
time.Sleep(100 * time.Millisecond)
if output != nil {
output.append(ind)
}
wg.Done()
}
}

View file

@ -1,54 +1,51 @@
package main
package priopool
// From https://golang.org/src/container/heap/example_pq_test.go
// Priority pool based on implementation example from heap package.
// Priority queue itself is not thread safe.
// See https://cs.opensource.google/go/go/+/refs/tags/go1.17.2:src/container/heap/example_pq_test.go
import (
"container/heap"
)
// An Item is something we manage in a priority queue.
type Item struct {
value func() // The value of the item; arbitrary.
priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
type priorityQueueTask struct {
value func()
priority int
index int // the index is needed by update and is maintained by the heap.Interface methods
}
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue []*Item
type priorityQueue []*priorityQueueTask
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].priority > pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*Item)
item := x.(*priorityQueueTask)
item.index = n
*pq = append(*pq, item)
}
func (pq *PriorityQueue) Pop() interface{} {
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
old[n-1] = nil
item.index = -1
*pq = old[0 : n-1]
return item
}
// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *Item, value func(), priority int) {
func (pq *priorityQueue) update(item *priorityQueueTask, value func(), priority int) {
item.value = value
item.priority = priority
heap.Fix(pq, item.index)