Skip to content

Commit

Permalink
添加WithMultiple参数
Browse files Browse the repository at this point in the history
  • Loading branch information
lixizan committed Aug 3, 2023
1 parent 1a7966a commit 2c87b86
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 10 deletions.
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ go test -benchmem -run=^$ -bench . github.com/lxzan/concurrency/benchmark
goos: darwin
goarch: arm64
pkg: github.com/lxzan/concurrency/benchmark
Benchmark_Fib-8 1485490 775.0 ns/op 0 B/op 0 allocs/op
Benchmark_StdGo-8 388 3066459 ns/op 160537 B/op 10002 allocs/op
Benchmark_Queues-8 457 2602319 ns/op 324489 B/op 11061 allocs/op
Benchmark_Ants-8 139 7337507 ns/op 160368 B/op 10004 allocs/op
Benchmark_GoPool-8 264 4514672 ns/op 191897 B/op 10569 allocs/op
Benchmark_Fib-8 1534509 775.5 ns/op 0 B/op 0 allocs/op
Benchmark_StdGo-8 390 3078647 ns/op 160585 B/op 10002 allocs/op
Benchmark_QueuesSingle-8 262 4388264 ns/op 345144 B/op 10898 allocs/op
Benchmark_QueuesMultiple-8 470 2630718 ns/op 323923 B/op 10964 allocs/op
Benchmark_Ants-8 178 6708482 ns/op 160374 B/op 10004 allocs/op
Benchmark_GoPool-8 348 3487154 ns/op 194926 B/op 10511 allocs/op
PASS
ok github.com/lxzan/concurrency/benchmark 8.500s
ok github.com/lxzan/concurrency/benchmark 10.107s
```
4 changes: 2 additions & 2 deletions groups/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ func WithTimeout(t time.Duration) Option {
}

// WithConcurrency 设置最大并发
func WithConcurrency(num int64) Option {
func WithConcurrency(n int64) Option {
return func(o *options) {
o.concurrency = num
o.concurrency = n
}
}

Expand Down
2 changes: 1 addition & 1 deletion queues/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type (
Push(job Job)

// 停止
// 停止后调用Push不会产生任何效果
// 注意: 此方法有阻塞等待任务结束逻辑; 停止后调用Push方法不会产生任何效果.
Stop() error
}
)
Expand Down
68 changes: 67 additions & 1 deletion queues/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"
)

func TestNewWorkerQueue(t *testing.T) {
func TestSingleQueue(t *testing.T) {
as := assert.New(t)

t.Run("sum", func(t *testing.T) {
Expand Down Expand Up @@ -56,4 +56,70 @@ func TestNewWorkerQueue(t *testing.T) {
cc.Stop()
assert.Equal(t, int64(1), atomic.LoadInt64(&sum))
})

t.Run("", func(t *testing.T) {
q := New(WithConcurrency(1))
q.Push(func() { time.Sleep(100 * time.Millisecond) })
q.Push(func() {})
q.Stop()
q.Push(func() {})
assert.Equal(t, 0, q.(*singleQueue).len())
})
}

func TestMultiQueue(t *testing.T) {
as := assert.New(t)

t.Run("sum", func(t *testing.T) {
var val = int64(0)
var wg = sync.WaitGroup{}
wg.Add(1000)
w := New(WithConcurrency(16), WithMultiple(8))
for i := 1; i <= 1000; i++ {
args := int64(i)
w.Push(func() {
atomic.AddInt64(&val, args)
wg.Done()
})
}
wg.Wait()
as.Equal(int64(500500), val)
})

t.Run("recover", func(t *testing.T) {
w := New(WithRecovery(), WithLogger(logs.DefaultLogger), WithMultiple(8))
w.Push(func() {
panic("test")
})
})

t.Run("stop timeout", func(t *testing.T) {
cc := New(WithTimeout(time.Millisecond), WithMultiple(8))
sum := int64(0)
cc.Push(func() {
time.Sleep(time.Second)
atomic.AddInt64(&sum, 1)
})
cc.Stop()
assert.Equal(t, int64(0), atomic.LoadInt64(&sum))
})

t.Run("stop graceful", func(t *testing.T) {
cc := New(WithTimeout(time.Second), WithMultiple(8))
sum := int64(0)
cc.Push(func() {
time.Sleep(time.Millisecond)
atomic.AddInt64(&sum, 1)
})
cc.Stop()
assert.Equal(t, int64(1), atomic.LoadInt64(&sum))
})

t.Run("", func(t *testing.T) {
q := New(WithConcurrency(1), WithMultiple(1))
q.Push(func() { time.Sleep(100 * time.Millisecond) })
q.Push(func() {})
q.Stop()
q.Push(func() {})
})
}

0 comments on commit 2c87b86

Please sign in to comment.