Skip to content

Commit

Permalink
test timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
lixizan committed Mar 24, 2023
1 parent 7523fbe commit e90131e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
7 changes: 7 additions & 0 deletions worker_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func (c *WorkerGroup[T]) SetTimeout(d time.Duration) *WorkerGroup[T] {
return c
}

func (c *WorkerGroup[T]) clear() {
c.mu.Lock()
c.q = c.q[:0]
c.mu.Unlock()
}

func (c *WorkerGroup[T]) getJob() (v T, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -143,6 +149,7 @@ func (c *WorkerGroup[T]) Start() error {
case <-c.done:
return c.err.ErrorOrNil()
case <-ctx.Done():
c.clear()
return ErrWaitTimeout
}
}
36 changes: 18 additions & 18 deletions worker_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package concurrency
import (
"errors"
"github.com/stretchr/testify/assert"
"sync"
"sync/atomic"
"testing"
"time"
)

func TestNewTaskGroup(t *testing.T) {
Expand Down Expand Up @@ -48,24 +50,22 @@ func TestNewTaskGroup(t *testing.T) {
as.Error(err)
})

//t.Run("timeout", func(t *testing.T) {
// var mu = &sync.Mutex{}
// var list = make([]int, 0)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// ctl := NewWorkerGroup[int]().WithContext(ctx).WithConcurrency(2)
// ctl.Push(1, 3, 5, 7, 9)
// ctl.OnMessage = func(args int) error {
// mu.Lock()
// list = append(list, args)
// mu.Unlock()
// time.Sleep(2 * time.Second)
// return nil
// }
// err := ctl.Start()
// as.NoError(err)
// as.ElementsMatch(list, []int{1, 3})
//})
t.Run("timeout", func(t *testing.T) {
var mu = &sync.Mutex{}
var list = make([]int, 0)
ctl := NewWorkerGroup[int]().SetConcurrency(2).SetTimeout(time.Second)
ctl.Push(1, 3, 5, 7, 9)
ctl.OnMessage = func(args int) error {
mu.Lock()
list = append(list, args)
mu.Unlock()
time.Sleep(2 * time.Second)
return nil
}
err := ctl.Start()
as.Error(err)
as.ElementsMatch(list, []int{1, 3})
})

t.Run("recovery", func(t *testing.T) {
ctl := NewWorkerGroup[int]()
Expand Down

0 comments on commit e90131e

Please sign in to comment.