From 939cf2474b82290f1980156c158ae67c61229a3a Mon Sep 17 00:00:00 2001 From: lixizan Date: Thu, 3 Aug 2023 09:39:09 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=B7=BB=E5=8A=A0WithMultiple=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/benchmark_test.go | 18 +++++- queues/multiple_queue.go | 117 +++++++++++++++++++++++++++++++++++ queues/options.go | 11 +++- queues/queue.go | 118 +++--------------------------------- queues/single_queue.go | 102 +++++++++++++++++++++++++++++++ 5 files changed, 254 insertions(+), 112 deletions(-) create mode 100644 queues/multiple_queue.go create mode 100644 queues/single_queue.go diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go index c8df770..e39c344 100644 --- a/benchmark/benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -34,7 +34,7 @@ func Benchmark_StdGo(b *testing.B) { } } -func Benchmark_Queues(b *testing.B) { +func Benchmark_QueuesSingle(b *testing.B) { q := queues.New(queues.WithConcurrency(Concurrency)) for i := 0; i < b.N; i++ { @@ -50,6 +50,22 @@ func Benchmark_Queues(b *testing.B) { } } +func Benchmark_QueuesMultiple(b *testing.B) { + q := queues.New(queues.WithConcurrency(Concurrency), queues.WithMultiple()) + + for i := 0; i < b.N; i++ { + wg := &sync.WaitGroup{} + wg.Add(M) + for j := 0; j < M; j++ { + q.Push(func() { + fib(N) + wg.Done() + }) + } + wg.Wait() + } +} + func Benchmark_Ants(b *testing.B) { q, _ := ants.NewPool(Concurrency) defer q.Release() diff --git a/queues/multiple_queue.go b/queues/multiple_queue.go new file mode 100644 index 0000000..c50c157 --- /dev/null +++ b/queues/multiple_queue.go @@ -0,0 +1,117 @@ +package queues + +import ( + "context" + "github.com/lxzan/concurrency/internal" + "github.com/lxzan/concurrency/logs" + "sync" + "sync/atomic" + "time" +) + +type ( + multipleQueue struct { + options *options + serial int64 + size int64 + qs []*multipleQueueChild + } + + multipleQueueChild struct { + mu *sync.Mutex // 锁 + q []Job // 任务队列 + maxConcurrency int64 // 最大并发 + curConcurrency int64 // 当前并发 + caller Caller // 异常处理 + logger logs.Logger // 日志 + } +) + +// 创建N条并发度为1的任务队列 +func newMultipleQueue(o *options) *multipleQueue { + size := internal.ToBinaryNumber(o.concurrency) + qs := make([]*multipleQueueChild, size) + for i := int64(0); i < size; i++ { + qs[i] = &multipleQueueChild{ + mu: &sync.Mutex{}, + maxConcurrency: 1, + curConcurrency: 0, + caller: o.caller, + logger: o.logger, + } + } + return &multipleQueue{options: o, qs: qs, size: size} +} + +// Push 追加任务 +func (c *multipleQueue) Push(job Job) { + index := atomic.AddInt64(&c.serial, 1) & (c.size - 1) + c.qs[index].push(job) +} + +// Stop 停止 +// 可能需要等待一段时间, 直到所有任务执行完成或者超时 +func (c *multipleQueue) Stop() { + ctx, cancel := context.WithTimeout(context.Background(), c.options.timeout) + ticker := time.NewTicker(50 * time.Millisecond) + defer func() { + cancel() + ticker.Stop() + }() + + for { + select { + case <-ticker.C: + sum := 0 + for _, item := range c.qs { + sum += item.len() + } + if sum == 0 { + return + } + case <-ctx.Done(): + return + } + } +} + +func (c *multipleQueueChild) len() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.q) +} + +// 获取一个任务 +func (c *multipleQueueChild) getJob(delta int64) Job { + c.mu.Lock() + defer c.mu.Unlock() + c.curConcurrency += delta + if c.curConcurrency >= c.maxConcurrency { + return nil + } + if n := len(c.q); n == 0 { + return nil + } + var result = c.q[0] + c.q = c.q[1:] + c.curConcurrency++ + return result +} + +// 循环执行任务 +func (c *multipleQueueChild) do(job Job) { + for job != nil { + c.caller(c.logger, job) + job = c.getJob(-1) + } +} + +// push 追加任务, 有资源空闲的话会立即执行 +func (c *multipleQueueChild) push(job Job) { + c.mu.Lock() + c.q = append(c.q, job) + c.mu.Unlock() + if item := c.getJob(0); item != nil { + go c.do(item) + } +} diff --git a/queues/options.go b/queues/options.go index 03b7ba2..d9a3dde 100644 --- a/queues/options.go +++ b/queues/options.go @@ -1,7 +1,6 @@ package queues import ( - "github.com/lxzan/concurrency/internal" "github.com/lxzan/concurrency/logs" "runtime" "time" @@ -13,7 +12,7 @@ type Option func(o *options) // WithConcurrency 设置最大并发 func WithConcurrency(num int64) Option { return func(o *options) { - o.concurrency = internal.ToBinaryNumber(num) + o.concurrency = num } } @@ -31,6 +30,14 @@ func WithLogger(logger logs.Logger) Option { } } +// WithMultiple 设置多重队列, 即n条并发度为1的队列 +// 默认是1条并发度为n的队列 +func WithMultiple() Option { + return func(o *options) { + o.multiple = true + } +} + // WithRecovery 设置恢复程序 func WithRecovery() Option { return func(o *options) { diff --git a/queues/queue.go b/queues/queue.go index 7410d39..8fd530e 100644 --- a/queues/queue.go +++ b/queues/queue.go @@ -1,10 +1,7 @@ package queues import ( - "context" "github.com/lxzan/concurrency/logs" - "sync" - "sync/atomic" "time" ) @@ -13,10 +10,9 @@ const ( defaultTimeout = 30 * time.Second ) -var DefaultQueue = New(WithConcurrency(16), WithRecovery()) - type ( options struct { + multiple bool concurrency int64 timeout time.Duration caller Caller @@ -25,28 +21,17 @@ type ( Caller func(logger logs.Logger, f func()) - queue struct { - mu *sync.Mutex // 锁 - q []Job // 任务队列 - maxConcurrency int64 // 最大并发 - curConcurrency int64 // 当前并发 - caller Caller // 异常处理 - logger logs.Logger // 日志 - } - Job func() - Queue struct { - options *options - serial int64 - qs []*queue + Queue interface { + Push(job Job) + Stop() } ) -// New -// 创建N条并发度为1的任务队列 -func New(opts ...Option) *Queue { +func New(opts ...Option) Queue { o := &options{ + multiple: false, concurrency: defaultConcurrency, timeout: defaultTimeout, caller: func(logger logs.Logger, f func()) { f() }, @@ -56,93 +41,8 @@ func New(opts ...Option) *Queue { f(o) } - qs := make([]*queue, o.concurrency) - for i := int64(0); i < o.concurrency; i++ { - qs[i] = newQueue(o) - } - return &Queue{options: o, qs: qs} -} - -// Push 追加任务 -func (c *Queue) Push(job Job) { - index := atomic.AddInt64(&c.serial, 1) & (c.options.concurrency - 1) - c.qs[index].push(job) -} - -// Stop 停止 -// 可能需要等待一段时间, 直到所有任务执行完成或者超时 -func (c *Queue) Stop() { - ctx, cancel := context.WithTimeout(context.Background(), c.options.timeout) - ticker := time.NewTicker(50 * time.Millisecond) - defer func() { - cancel() - ticker.Stop() - }() - - for { - select { - case <-ticker.C: - sum := 0 - for _, item := range c.qs { - sum += item.len() - } - if sum == 0 { - return - } - case <-ctx.Done(): - return - } - } -} - -// newQueue 创建一个任务队列 -func newQueue(o *options) *queue { - return &queue{ - mu: &sync.Mutex{}, - maxConcurrency: 1, - curConcurrency: 0, - caller: o.caller, - logger: o.logger, - } -} - -func (c *queue) len() int { - c.mu.Lock() - defer c.mu.Unlock() - return len(c.q) -} - -// 获取一个任务 -func (c *queue) getJob(delta int64) Job { - c.mu.Lock() - defer c.mu.Unlock() - c.curConcurrency += delta - if c.curConcurrency >= c.maxConcurrency { - return nil - } - if n := len(c.q); n == 0 { - return nil - } - var result = c.q[0] - c.q = c.q[1:] - c.curConcurrency++ - return result -} - -// 循环执行任务 -func (c *queue) do(job Job) { - for job != nil { - c.caller(c.logger, job) - job = c.getJob(-1) - } -} - -// push 追加任务, 有资源空闲的话会立即执行 -func (c *queue) push(job Job) { - c.mu.Lock() - c.q = append(c.q, job) - c.mu.Unlock() - if item := c.getJob(0); item != nil { - go c.do(item) + if o.multiple { + return newMultipleQueue(o) } + return newSingleQueue(o) } diff --git a/queues/single_queue.go b/queues/single_queue.go new file mode 100644 index 0000000..aac4601 --- /dev/null +++ b/queues/single_queue.go @@ -0,0 +1,102 @@ +package queues + +import ( + "context" + "github.com/lxzan/concurrency/logs" + "sync" + "time" +) + +// 创建一条任务队列 +func newSingleQueue(o *options) *singleQueue { + return &singleQueue{ + mu: &sync.Mutex{}, + maxConcurrency: o.concurrency, + curConcurrency: 0, + caller: o.caller, + logger: o.logger, + timeout: o.timeout, + } +} + +type singleQueue struct { + mu *sync.Mutex // 锁 + q []Job // 任务队列 + maxConcurrency int64 // 最大并发 + curConcurrency int64 // 当前并发 + caller Caller // 异常处理 + logger logs.Logger // 日志 + timeout time.Duration // 退出超时 + stopped bool // 是否关闭 +} + +func (c *singleQueue) Stop() { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + ticker := time.NewTicker(50 * time.Millisecond) + defer func() { + cancel() + ticker.Stop() + }() + + for { + select { + case <-ticker.C: + if c.doStop() { + return + } + case <-ctx.Done(): + c.doStop() + return + } + } +} + +func (c *singleQueue) doStop() bool { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.q) == 0 { + c.stopped = true + return true + } + return false +} + +// 获取一个任务 +func (c *singleQueue) getJob(delta int64) Job { + c.mu.Lock() + defer c.mu.Unlock() + c.curConcurrency += delta + if c.curConcurrency >= c.maxConcurrency { + return nil + } + if n := len(c.q); n == 0 { + return nil + } + var result = c.q[0] + c.q = c.q[1:] + c.curConcurrency++ + return result +} + +// 循环执行任务 +func (c *singleQueue) do(job Job) { + for job != nil { + c.caller(c.logger, job) + job = c.getJob(-1) + } +} + +// push 追加任务, 有资源空闲的话会立即执行 +func (c *singleQueue) Push(job Job) { + c.mu.Lock() + if c.stopped { + c.mu.Unlock() + return + } + + c.q = append(c.q, job) + c.mu.Unlock() + if item := c.getJob(0); item != nil { + go c.do(item) + } +} From 1a7966a0c69db2c62795bb07a2ac056a57fc7048 Mon Sep 17 00:00:00 2001 From: lixizan Date: Thu, 3 Aug 2023 13:54:25 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=B7=BB=E5=8A=A0WithMultiple=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/benchmark_test.go | 2 +- queues/multiple_queue.go | 110 +++++++++++------------------------- queues/options.go | 12 ++-- queues/queue.go | 10 +++- queues/single_queue.go | 20 ++++--- 5 files changed, 63 insertions(+), 91 deletions(-) diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go index e39c344..3e7fb99 100644 --- a/benchmark/benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -51,7 +51,7 @@ func Benchmark_QueuesSingle(b *testing.B) { } func Benchmark_QueuesMultiple(b *testing.B) { - q := queues.New(queues.WithConcurrency(Concurrency), queues.WithMultiple()) + q := queues.New(queues.WithConcurrency(1), queues.WithMultiple(Concurrency)) for i := 0; i < b.N; i++ { wg := &sync.WaitGroup{} diff --git a/queues/multiple_queue.go b/queues/multiple_queue.go index c50c157..c625189 100644 --- a/queues/multiple_queue.go +++ b/queues/multiple_queue.go @@ -2,56 +2,38 @@ package queues import ( "context" - "github.com/lxzan/concurrency/internal" - "github.com/lxzan/concurrency/logs" - "sync" "sync/atomic" "time" ) -type ( - multipleQueue struct { - options *options - serial int64 - size int64 - qs []*multipleQueueChild - } - - multipleQueueChild struct { - mu *sync.Mutex // 锁 - q []Job // 任务队列 - maxConcurrency int64 // 最大并发 - curConcurrency int64 // 当前并发 - caller Caller // 异常处理 - logger logs.Logger // 日志 - } -) +type multipleQueue struct { + options *options // 参数 + serial int64 // 序列号 + size int64 // 队列大小 + stopped atomic.Uint32 // 是否关闭 + qs []*singleQueue // 子队列 +} -// 创建N条并发度为1的任务队列 +// 创建多重队列 func newMultipleQueue(o *options) *multipleQueue { - size := internal.ToBinaryNumber(o.concurrency) - qs := make([]*multipleQueueChild, size) - for i := int64(0); i < size; i++ { - qs[i] = &multipleQueueChild{ - mu: &sync.Mutex{}, - maxConcurrency: 1, - curConcurrency: 0, - caller: o.caller, - logger: o.logger, - } + qs := make([]*singleQueue, o.size) + for i := int64(0); i < o.size; i++ { + qs[i] = newSingleQueue(o) } - return &multipleQueue{options: o, qs: qs, size: size} + return &multipleQueue{options: o, qs: qs, size: o.size} } // Push 追加任务 func (c *multipleQueue) Push(job Job) { - index := atomic.AddInt64(&c.serial, 1) & (c.size - 1) - c.qs[index].push(job) + if c.stopped.Load() == 0 { + index := atomic.AddInt64(&c.serial, 1) & (c.size - 1) + c.qs[index].Push(job) + } } // Stop 停止 // 可能需要等待一段时间, 直到所有任务执行完成或者超时 -func (c *multipleQueue) Stop() { +func (c *multipleQueue) Stop() error { ctx, cancel := context.WithTimeout(context.Background(), c.options.timeout) ticker := time.NewTicker(50 * time.Millisecond) defer func() { @@ -62,56 +44,30 @@ func (c *multipleQueue) Stop() { for { select { case <-ticker.C: - sum := 0 - for _, item := range c.qs { - sum += item.len() - } - if sum == 0 { - return + if c.doStop(false) { + return nil } case <-ctx.Done(): - return + c.doStop(true) + return ErrStopTimeout } } } -func (c *multipleQueueChild) len() int { - c.mu.Lock() - defer c.mu.Unlock() - return len(c.q) -} - -// 获取一个任务 -func (c *multipleQueueChild) getJob(delta int64) Job { - c.mu.Lock() - defer c.mu.Unlock() - c.curConcurrency += delta - if c.curConcurrency >= c.maxConcurrency { - return nil +func (c *multipleQueue) doStop(force bool) bool { + if force { + c.stopped.Store(1) + return true } - if n := len(c.q); n == 0 { - return nil - } - var result = c.q[0] - c.q = c.q[1:] - c.curConcurrency++ - return result -} -// 循环执行任务 -func (c *multipleQueueChild) do(job Job) { - for job != nil { - c.caller(c.logger, job) - job = c.getJob(-1) + sum := 0 + for _, item := range c.qs { + sum += item.len() } -} - -// push 追加任务, 有资源空闲的话会立即执行 -func (c *multipleQueueChild) push(job Job) { - c.mu.Lock() - c.q = append(c.q, job) - c.mu.Unlock() - if item := c.getJob(0); item != nil { - go c.do(item) + if sum == 0 { + c.stopped.Store(1) + return true } + + return false } diff --git a/queues/options.go b/queues/options.go index d9a3dde..e711372 100644 --- a/queues/options.go +++ b/queues/options.go @@ -1,6 +1,7 @@ package queues import ( + "github.com/lxzan/concurrency/internal" "github.com/lxzan/concurrency/logs" "runtime" "time" @@ -10,9 +11,9 @@ import ( type Option func(o *options) // WithConcurrency 设置最大并发 -func WithConcurrency(num int64) Option { +func WithConcurrency(n int64) Option { return func(o *options) { - o.concurrency = num + o.concurrency = n } } @@ -30,11 +31,12 @@ func WithLogger(logger logs.Logger) Option { } } -// WithMultiple 设置多重队列, 即n条并发度为1的队列 -// 默认是1条并发度为n的队列 -func WithMultiple() Option { +// WithMultiple 设置多重队列, 降低锁竞争开销 +// 注意: n会被转化为pow(2,x) +func WithMultiple(n int64) Option { return func(o *options) { o.multiple = true + o.size = internal.ToBinaryNumber(n) } } diff --git a/queues/queue.go b/queues/queue.go index 8fd530e..2c9d0f7 100644 --- a/queues/queue.go +++ b/queues/queue.go @@ -2,6 +2,7 @@ package queues import ( "github.com/lxzan/concurrency/logs" + "github.com/pkg/errors" "time" ) @@ -10,9 +11,12 @@ const ( defaultTimeout = 30 * time.Second ) +var ErrStopTimeout = errors.New("stop timeout") + type ( options struct { multiple bool + size int64 concurrency int64 timeout time.Duration caller Caller @@ -24,8 +28,12 @@ type ( Job func() Queue interface { + // 追加任务 Push(job Job) - Stop() + + // 停止 + // 停止后调用Push不会产生任何效果 + Stop() error } ) diff --git a/queues/single_queue.go b/queues/single_queue.go index aac4601..c06eb9a 100644 --- a/queues/single_queue.go +++ b/queues/single_queue.go @@ -30,7 +30,7 @@ type singleQueue struct { stopped bool // 是否关闭 } -func (c *singleQueue) Stop() { +func (c *singleQueue) Stop() error { ctx, cancel := context.WithTimeout(context.Background(), c.timeout) ticker := time.NewTicker(50 * time.Millisecond) defer func() { @@ -41,20 +41,20 @@ func (c *singleQueue) Stop() { for { select { case <-ticker.C: - if c.doStop() { - return + if c.doStop(false) { + return nil } case <-ctx.Done(): - c.doStop() - return + c.doStop(true) + return ErrStopTimeout } } } -func (c *singleQueue) doStop() bool { +func (c *singleQueue) doStop(force bool) bool { c.mu.Lock() defer c.mu.Unlock() - if len(c.q) == 0 { + if force || len(c.q) == 0 { c.stopped = true return true } @@ -100,3 +100,9 @@ func (c *singleQueue) Push(job Job) { go c.do(item) } } + +func (c *singleQueue) len() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.q) +} From 2c87b8628d995a71e5f51fa0fcd5d21a642b9c43 Mon Sep 17 00:00:00 2001 From: lixizan Date: Thu, 3 Aug 2023 15:50:21 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=B7=BB=E5=8A=A0WithMultiple=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 14 +++++---- groups/options.go | 4 +-- queues/queue.go | 2 +- queues/queue_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 78 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index e84e6d6..7bb2fc5 100644 --- a/README.md +++ b/README.md @@ -83,11 +83,13 @@ go test -benchmem -run=^$ -bench . github.com/lxzan/concurrency/benchmark goos: darwin goarch: arm64 pkg: github.com/lxzan/concurrency/benchmark -Benchmark_Fib-8 1485490 775.0 ns/op 0 B/op 0 allocs/op -Benchmark_StdGo-8 388 3066459 ns/op 160537 B/op 10002 allocs/op -Benchmark_Queues-8 457 2602319 ns/op 324489 B/op 11061 allocs/op -Benchmark_Ants-8 139 7337507 ns/op 160368 B/op 10004 allocs/op -Benchmark_GoPool-8 264 4514672 ns/op 191897 B/op 10569 allocs/op +Benchmark_Fib-8 1534509 775.5 ns/op 0 B/op 0 allocs/op +Benchmark_StdGo-8 390 3078647 ns/op 160585 B/op 10002 allocs/op +Benchmark_QueuesSingle-8 262 4388264 ns/op 345144 B/op 10898 allocs/op +Benchmark_QueuesMultiple-8 470 2630718 ns/op 323923 B/op 10964 allocs/op +Benchmark_Ants-8 178 6708482 ns/op 160374 B/op 10004 allocs/op +Benchmark_GoPool-8 348 3487154 ns/op 194926 B/op 10511 allocs/op PASS -ok github.com/lxzan/concurrency/benchmark 8.500s +ok github.com/lxzan/concurrency/benchmark 10.107s + ``` \ No newline at end of file diff --git a/groups/options.go b/groups/options.go index 7125516..14ba1c8 100644 --- a/groups/options.go +++ b/groups/options.go @@ -17,9 +17,9 @@ func WithTimeout(t time.Duration) Option { } // WithConcurrency 设置最大并发 -func WithConcurrency(num int64) Option { +func WithConcurrency(n int64) Option { return func(o *options) { - o.concurrency = num + o.concurrency = n } } diff --git a/queues/queue.go b/queues/queue.go index 2c9d0f7..c27b3a8 100644 --- a/queues/queue.go +++ b/queues/queue.go @@ -32,7 +32,7 @@ type ( Push(job Job) // 停止 - // 停止后调用Push不会产生任何效果 + // 注意: 此方法有阻塞等待任务结束逻辑; 停止后调用Push方法不会产生任何效果. Stop() error } ) diff --git a/queues/queue_test.go b/queues/queue_test.go index b8590f7..8cdf97b 100644 --- a/queues/queue_test.go +++ b/queues/queue_test.go @@ -9,7 +9,7 @@ import ( "time" ) -func TestNewWorkerQueue(t *testing.T) { +func TestSingleQueue(t *testing.T) { as := assert.New(t) t.Run("sum", func(t *testing.T) { @@ -56,4 +56,70 @@ func TestNewWorkerQueue(t *testing.T) { cc.Stop() assert.Equal(t, int64(1), atomic.LoadInt64(&sum)) }) + + t.Run("", func(t *testing.T) { + q := New(WithConcurrency(1)) + q.Push(func() { time.Sleep(100 * time.Millisecond) }) + q.Push(func() {}) + q.Stop() + q.Push(func() {}) + assert.Equal(t, 0, q.(*singleQueue).len()) + }) +} + +func TestMultiQueue(t *testing.T) { + as := assert.New(t) + + t.Run("sum", func(t *testing.T) { + var val = int64(0) + var wg = sync.WaitGroup{} + wg.Add(1000) + w := New(WithConcurrency(16), WithMultiple(8)) + for i := 1; i <= 1000; i++ { + args := int64(i) + w.Push(func() { + atomic.AddInt64(&val, args) + wg.Done() + }) + } + wg.Wait() + as.Equal(int64(500500), val) + }) + + t.Run("recover", func(t *testing.T) { + w := New(WithRecovery(), WithLogger(logs.DefaultLogger), WithMultiple(8)) + w.Push(func() { + panic("test") + }) + }) + + t.Run("stop timeout", func(t *testing.T) { + cc := New(WithTimeout(time.Millisecond), WithMultiple(8)) + sum := int64(0) + cc.Push(func() { + time.Sleep(time.Second) + atomic.AddInt64(&sum, 1) + }) + cc.Stop() + assert.Equal(t, int64(0), atomic.LoadInt64(&sum)) + }) + + t.Run("stop graceful", func(t *testing.T) { + cc := New(WithTimeout(time.Second), WithMultiple(8)) + sum := int64(0) + cc.Push(func() { + time.Sleep(time.Millisecond) + atomic.AddInt64(&sum, 1) + }) + cc.Stop() + assert.Equal(t, int64(1), atomic.LoadInt64(&sum)) + }) + + t.Run("", func(t *testing.T) { + q := New(WithConcurrency(1), WithMultiple(1)) + q.Push(func() { time.Sleep(100 * time.Millisecond) }) + q.Push(func() {}) + q.Stop() + q.Push(func() {}) + }) }