From 3364a9e472d800d0d3884191b34a4cce69e736ba Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:04:50 +0700 Subject: [PATCH 01/10] Fix bug handle panic --- concurrent_executor.go | 18 +++++++++++------- go.mod | 5 ++++- go.sum | 3 +++ 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/concurrent_executor.go b/concurrent_executor.go index 9186bdd..2eebec1 100644 --- a/concurrent_executor.go +++ b/concurrent_executor.go @@ -2,12 +2,13 @@ package conexec import ( "context" - "fmt" "sync" + + "github.com/pkg/errors" ) /* -// If ExecutorArgs is valid, it will be passed into Executor. +// If ExecutorArgs is valid, it will be passed into Executor func. { ... @@ -21,7 +22,7 @@ import ( task.Executor = func(ctx context.Context, args ...interface{}) (interface{}, error) { arg0, ok := args[0].(type) if !ok { - return "", fmt.Errorf("parse error") + return "", errors.Errorf("parse error") } // Do something with arg0 @@ -75,13 +76,16 @@ func (concurrentExecutor *ConcurrentExecutor) runTask(ctx context.Context) { defer func() { // Recover from panics and report errors to responseChan if r := recover(); r != nil { - concurrentExecutor.mutex.Lock() + // Push error panic handling concurrentExecutor.responseChan <- &TaskResponse{ TaskID: task.ID, Value: nil, - Error: fmt.Errorf("got panic, recover: %v", r), + Error: errors.Errorf("got panic, recover: %v", r), } - concurrentExecutor.mutex.Unlock() + + // This goroutine is destroyed because panic -> create new to replace it, support handle another task + concurrentExecutor.waitgroup.Add(1) + go concurrentExecutor.runTask(ctx) } }() @@ -149,7 +153,7 @@ func (concurrentExecutor *ConcurrentExecutor) EnqueueTask(task Task) error { case concurrentExecutor.taskQueueChan <- task: return nil default: - return fmt.Errorf(FullQueueErr) + return errors.Errorf(FullQueueErr) } } diff --git a/go.mod b/go.mod index 55cece3..091350e 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,10 @@ module github.com/anhhuu/conexec go 1.21.5 -require github.com/stretchr/testify v1.8.4 +require ( + github.com/pkg/errors v0.9.1 + github.com/stretchr/testify v1.8.4 +) require ( github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 8cf6655..57d96e6 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From e293e1848dcad29ab3f2b9272bfc35eee539d958 Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:04:59 +0700 Subject: [PATCH 02/10] Add more unit test --- concurrent_executor_test.go | 467 +++++++++++++++++++++++++----------- 1 file changed, 322 insertions(+), 145 deletions(-) diff --git a/concurrent_executor_test.go b/concurrent_executor_test.go index 4cfd873..fee2a0a 100644 --- a/concurrent_executor_test.go +++ b/concurrent_executor_test.go @@ -2,180 +2,357 @@ package conexec import ( "context" - "fmt" + "strconv" "testing" "time" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) -func TestConcurrentExecutor(t *testing.T) { +const ( + numberOfTestingTasks = 50 +) + +func executorForTest(ctx context.Context, args ...interface{}) (interface{}, error) { + if len(args) != 3 { + return "", errors.Errorf("func dummyExecutor need 3 args, got %d", len(args)) + } + + taskID, ok := args[0].(string) + if !ok { + return "", errors.Errorf("parse taskID error") + } + + isReturnError, ok := args[1].(bool) + if !ok { + return "", errors.Errorf("parse isReturnError error") + } + + // Because expectedErr arg can NULL, so need a check to avoid panic + var expectedError error + if args[2] != nil { + if expectedError, ok = args[2].(error); !ok { + return "", errors.Errorf("parse expectedError error") + } + } + + // Simulating task execution + time.Sleep(5 * time.Millisecond) + + if isReturnError { + return "", expectedError + } + return taskID, nil +} + +func executorForPanicTest(ctx context.Context, args ...interface{}) (interface{}, error) { + if len(args) != 3 { + return "", errors.Errorf("func dummyExecutor need 3 args, got %d", len(args)) + } + + taskID, ok := args[0].(string) + if !ok { + return "", errors.Errorf("parse taskID error") + } + + isPanic, ok := args[1].(bool) + if !ok { + return "", errors.Errorf("parse isPanic error") + } + + expectedPanicMsg, ok := args[2].(string) + if !ok { + return "", errors.Errorf("parse expectedPanicMsg error") + } + + // Simulating task execution + time.Sleep(5 * time.Millisecond) + + if isPanic { + panic(expectedPanicMsg) + } + return taskID, nil +} + +func getTaskIDForTest(index int) string { + return "Task_" + strconv.Itoa(index) +} + +func TestConcurrentExecutor_SingleRun(t *testing.T) { t.Parallel() + test := assert.New(t) - t.Run("happy case with 1 time run", func(t *testing.T) { - // Create a ConcurrentExecutor - concurrentExecutor := NewConcurrentExecutorBuilder(). - WithMaxTaskQueueSize(10). - WithMaxConcurrentTasks(5). - Build() + // Create a ConcurrentExecutor + concurrentExecutor := NewConcurrentExecutorBuilder(). + WithMaxTaskQueueSize(defautMaxTaskQueueSize). + WithMaxConcurrentTasks(defaultMaxConcurrentTasks). + Build() - // Adding tasks - for i := 1; i <= 5; i++ { - taskID := fmt.Sprintf("Task_%d", i) - task := Task{ - ID: taskID, - Executor: func(ctx context.Context, args ...interface{}) (interface{}, error) { - taskID, ok := args[0].(string) - if !ok { - return "", fmt.Errorf("parse error") - } - - // Simulating task execution - time.Sleep(10 * time.Millisecond) - return taskID, nil - }, - ExecutorArgs: []interface{}{taskID}, - } - err := concurrentExecutor.EnqueueTask(task) - assert.Nil(t, err) + // Adding tasks + for i := 1; i <= numberOfTestingTasks; i++ { + taskID := getTaskIDForTest(i) + task := Task{ + ID: taskID, + Executor: executorForTest, + ExecutorArgs: []interface{}{taskID, false, nil}, } - concurrentExecutor.StartExecution(context.Background()) - resp := concurrentExecutor.WaitForCompletionAndGetResponse() - - // Assertions - assert.Len(t, resp, 5) - for i := 1; i <= 5; i++ { - taskID := fmt.Sprintf("Task_%d", i) - assert.Contains(t, resp, taskID) - assert.Equal(t, fmt.Sprintf("Task_%d", i), resp[taskID].Value) - assert.Nil(t, resp[taskID].Error) + err := concurrentExecutor.EnqueueTask(task) + test.Nil(err) + } + concurrentExecutor.StartExecution(context.Background()) + resp := concurrentExecutor.WaitForCompletionAndGetResponse() + + // Assertions + test.Len(resp, numberOfTestingTasks) + for i := 1; i <= numberOfTestingTasks; i++ { + taskID := getTaskIDForTest(i) + test.Contains(resp, taskID) + test.Equal(taskID, resp[taskID].Value) + test.Nil(resp[taskID].Error) + } + + concurrentExecutor.Close() +} + +func TestConcurrentExecutor_MultipleRun(t *testing.T) { + t.Parallel() + test := assert.New(t) + + numberOfTestingTasksFirstRun := 10 + numberOfTestingTasksSecondRun := 5 + + // Create a ConcurrentExecutor + concurrentExecutor := NewConcurrentExecutor(defaultMaxConcurrentTasks, defautMaxTaskQueueSize) + // Adding tasks for the first run + for i := 1; i <= numberOfTestingTasksFirstRun; i++ { + taskID := getTaskIDForTest(i) + task := Task{ + ID: taskID, + Executor: executorForTest, + ExecutorArgs: []interface{}{taskID, false, nil}, } + err := concurrentExecutor.EnqueueTask(task) + test.Nil(err) + } + // Start execution tasks first times + concurrentExecutor.StartExecution(context.Background()) + resp := concurrentExecutor.WaitForCompletionAndGetResponse() - concurrentExecutor.Close() - }) + // Assertions for the first run + test.Len(resp, numberOfTestingTasksFirstRun) + for i := 1; i <= numberOfTestingTasksFirstRun; i++ { + taskID := getTaskIDForTest(i) + test.Contains(resp, taskID) + test.Equal(taskID, resp[taskID].Value) + test.Nil(resp[taskID].Error) + } - t.Run("happy case with 2 times run", func(t *testing.T) { - // Create a ConcurrentExecutor - concurrentExecutor := NewConcurrentExecutor(3, 10) - // Adding tasks - for i := 1; i <= 5; i++ { - taskID := fmt.Sprintf("Task_%d", i) - task := Task{ - ID: taskID, - Executor: func(ctx context.Context, args ...interface{}) (interface{}, error) { - taskID, ok := args[0].(string) - if !ok { - return "", fmt.Errorf("parse error") - } - - // Simulating task execution - time.Sleep(10 * time.Millisecond) - return taskID, nil - }, - ExecutorArgs: []interface{}{taskID}, - } - err := concurrentExecutor.EnqueueTask(task) - assert.Nil(t, err) + // Adding tasks for the second run + for i := numberOfTestingTasksFirstRun + 1; i <= numberOfTestingTasksFirstRun+numberOfTestingTasksSecondRun; i++ { + taskID := getTaskIDForTest(i) + task := Task{ + ID: taskID, + Executor: executorForTest, + ExecutorArgs: []interface{}{taskID, false, nil}, } - // First run - concurrentExecutor.StartExecution(context.Background()) - resp := concurrentExecutor.WaitForCompletionAndGetResponse() - - // Assertions - assert.Len(t, resp, 5) - for i := 1; i <= 5; i++ { - taskID := fmt.Sprintf("Task_%d", i) - assert.Contains(t, resp, taskID) - assert.Equal(t, fmt.Sprintf("Task_%d", i), resp[taskID].Value) - assert.Nil(t, resp[taskID].Error) + err := concurrentExecutor.EnqueueTask(task) + test.Nil(err) + } + + // Start execution tasks second times + concurrentExecutor.StartExecution(context.Background()) + resp = concurrentExecutor.WaitForCompletionAndGetResponse() + + // Assertions for the second run + test.Len(resp, numberOfTestingTasksSecondRun) + for i := numberOfTestingTasksFirstRun + 1; i <= numberOfTestingTasksFirstRun+numberOfTestingTasksSecondRun; i++ { + taskID := getTaskIDForTest(i) + test.Contains(resp, taskID) + test.Equal(taskID, resp[taskID].Value) + test.Nil(resp[taskID].Error) + } + concurrentExecutor.Close() +} + +func TestConcurrentExecutor_ErrorHandling(t *testing.T) { + test := assert.New(t) + + // Create a ConcurrentExecutor + concurrentExecutor := NewConcurrentExecutor(defaultMaxConcurrentTasks, defautMaxTaskQueueSize) + mapExpectedDummyError := make(map[string]error) + // Adding tasks + for i := 1; i <= numberOfTestingTasks; i++ { + taskID := getTaskIDForTest(i) + isReturnError := i%2 == 0 + var expectedDummyError error = nil + if isReturnError { + expectedDummyError = errors.Errorf("dummy error %d", i) + } + mapExpectedDummyError[taskID] = expectedDummyError + task := Task{ + ID: taskID, + Executor: executorForTest, + ExecutorArgs: []interface{}{taskID, isReturnError, expectedDummyError}, } + err := concurrentExecutor.EnqueueTask(task) + test.Nil(err) + } - // Adding tasks for the second run - for i := 6; i <= 10; i++ { - taskID := fmt.Sprintf("Task_%d", i) - task := Task{ - ID: taskID, - Executor: func(ctx context.Context, args ...interface{}) (interface{}, error) { - taskID, ok := args[0].(string) - if !ok { - return "", fmt.Errorf("parse error") - } - - // Simulating task execution - time.Sleep(10 * time.Millisecond) - return taskID, nil - }, - ExecutorArgs: []interface{}{taskID}, - } - err := concurrentExecutor.EnqueueTask(task) - assert.Nil(t, err) + // Start execution tasks + concurrentExecutor.StartExecution(context.Background()) + resp := concurrentExecutor.WaitForCompletionAndGetResponse() + + // Assertions + test.Len(resp, numberOfTestingTasks) + for i := 1; i <= numberOfTestingTasks; i++ { + taskID := getTaskIDForTest(i) + test.Contains(resp, taskID) + + res := resp[taskID] + if res.Error != nil { + test.Equal(mapExpectedDummyError[taskID].Error(), res.Error.Error()) + } else { + test.Equal(taskID, resp[taskID].Value) + test.Nil(resp[taskID].Error) } - // Second run - concurrentExecutor.StartExecution(context.Background()) - resp = concurrentExecutor.WaitForCompletionAndGetResponse() - - // Assertions for the second run - assert.Len(t, resp, 5) - for i := 6; i <= 10; i++ { - taskID := fmt.Sprintf("Task_%d", i) - assert.Contains(t, resp, taskID) - assert.Equal(t, fmt.Sprintf("Task_%d", i), resp[taskID].Value) - assert.Nil(t, resp[taskID].Error) + } + + concurrentExecutor.Close() +} + +func TestConcurrentExecutor_PanicHandling(t *testing.T) { + t.Parallel() + test := assert.New(t) + + // Create a ConcurrentExecutor + concurrentExecutor := NewConcurrentExecutor(defaultMaxConcurrentTasks, defautMaxTaskQueueSize) + mapExpectedPanicError := make(map[string]error) + // Adding tasks + for i := 1; i <= numberOfTestingTasks; i++ { + taskID := getTaskIDForTest(i) + isPanic := i%2 == 0 + mapExpectedPanicError[taskID] = nil + expectedDummyPanicMsg := "" + if isPanic { + expectedDummyPanicMsg = "dummy panic " + strconv.Itoa(i) + mapExpectedPanicError[taskID] = errors.Errorf("got panic, recover: %v", expectedDummyPanicMsg) + } + task := Task{ + ID: taskID, + Executor: executorForPanicTest, + ExecutorArgs: []interface{}{taskID, isPanic, expectedDummyPanicMsg}, } + err := concurrentExecutor.EnqueueTask(task) + test.Nil(err) + } + + // Start execution tasks + concurrentExecutor.StartExecution(context.Background()) + resp := concurrentExecutor.WaitForCompletionAndGetResponse() + + // Assertions + test.Len(resp, numberOfTestingTasks) + for i := 1; i <= numberOfTestingTasks; i++ { + taskID := getTaskIDForTest(i) + test.Contains(resp, taskID) + + res := resp[taskID] + if res.Error != nil { + test.Equal(mapExpectedPanicError[taskID].Error(), res.Error.Error()) + } else { + test.Equal(taskID, resp[taskID].Value) + test.Nil(resp[taskID].Error) + } + } + + concurrentExecutor.Close() +} + +func TestConcurrentExecutor_EnqueueTask(t *testing.T) { + t.Run("Panic Enqueue a closed Executor", func(t *testing.T) { + test := assert.New(t) + + // Panic when enqueue a task into a closed ConcurrentExecutor + defer func() { + if r := recover(); r != nil { + test.Equal(r, ClosedPanicMsg) + } + }() + + concurrentExecutor := NewConcurrentExecutor(defaultMaxConcurrentTasks, defautMaxTaskQueueSize) + concurrentExecutor.Close() + concurrentExecutor.EnqueueTask(Task{ + ID: "test", + Executor: func(ctx context.Context, args ...interface{}) (interface{}, error) { + return "", nil + }, + ExecutorArgs: nil, + }) concurrentExecutor.Close() }) - t.Run("error handling", func(t *testing.T) { - // Create a ConcurrentExecutor - concurrentExecutor := NewConcurrentExecutor(10, 5) - expectedError := make(map[string]error) - // Adding tasks - for i := 1; i <= 5; i++ { - taskID := fmt.Sprintf("Task_%d", i) - expectedError[taskID] = fmt.Errorf("dummy error %d", i) + t.Run("Enqueue task into a full queue", func(t *testing.T) { + test := assert.New(t) + + concurrentExecutor := NewConcurrentExecutor(defaultMaxConcurrentTasks, defautMaxTaskQueueSize) + + for i := 1; i <= defautMaxTaskQueueSize; i++ { task := Task{ - ID: taskID, + ID: getTaskIDForTest(i), Executor: func(ctx context.Context, args ...interface{}) (interface{}, error) { - taskID, ok := args[0].(string) - if !ok { - return "", fmt.Errorf("parse task_id error") - } - index, ok := args[1].(int) - if !ok { - return "", fmt.Errorf("parse index error") - } - - // Simulating task execution - time.Sleep(10 * time.Millisecond) - if index%2 == 0 { - return "", expectedError[taskID] - } - - return taskID, nil + return "", nil }, - ExecutorArgs: []interface{}{taskID, i}, + ExecutorArgs: nil, } err := concurrentExecutor.EnqueueTask(task) - assert.Nil(t, err) + test.Nil(err) } - concurrentExecutor.StartExecution(context.Background()) - resp := concurrentExecutor.WaitForCompletionAndGetResponse() - - // Assertions - assert.Len(t, resp, 5) - for i := 1; i <= 5; i++ { - taskID := fmt.Sprintf("Task_%d", i) - assert.Contains(t, resp, taskID) - - res := resp[taskID] - if res.Error != nil { - assert.Equal(t, expectedError[taskID].Error(), res.Error.Error()) - } else { - assert.Equal(t, taskID, resp[taskID].Value) - assert.Nil(t, resp[taskID].Error) + + // Enqueue a task into a full queue + err := concurrentExecutor.EnqueueTask(Task{ + ID: "test-exceeded-queue-size", + Executor: func(ctx context.Context, args ...interface{}) (interface{}, error) { + return "", nil + }, + ExecutorArgs: nil, + }) + + test.Equal(FullQueueErr, err.Error()) + concurrentExecutor.Close() + }) + +} + +func TestConcurrentExecutor_StartExecution(t *testing.T) { + t.Run("Panic StartExecution a closed Executor", func(t *testing.T) { + test := assert.New(t) + + // Panic when enqueue a task into a closed ConcurrentExecutor + defer func() { + if r := recover(); r != nil { + test.Equal(r, ClosedPanicMsg) } - } + }() + concurrentExecutor := NewConcurrentExecutor(defaultMaxConcurrentTasks, defautMaxTaskQueueSize) + concurrentExecutor.EnqueueTask(Task{ + ID: "test", + Executor: func(ctx context.Context, args ...interface{}) (interface{}, error) { + return "", nil + }, + ExecutorArgs: nil, + }) concurrentExecutor.Close() + concurrentExecutor.StartExecution(context.Background()) }) } + +func TestConcurrentExecutor_Close(t *testing.T) { + test := assert.New(t) + + concurrentExecutor := NewConcurrentExecutor(defaultMaxConcurrentTasks, defautMaxTaskQueueSize) + concurrentExecutor.Close() + + test.Equal(true, concurrentExecutor.closed) +} From dfe1a2b38e4d629eacadcb100a6acad6cf8d17d8 Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:05:08 +0700 Subject: [PATCH 03/10] Add workflow build and test --- .codecov.yml | 5 ++++ .github/workflows/go.yml | 54 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 .codecov.yml create mode 100644 .github/workflows/go.yml diff --git a/.codecov.yml b/.codecov.yml new file mode 100644 index 0000000..e795d24 --- /dev/null +++ b/.codecov.yml @@ -0,0 +1,5 @@ +coverage: + status: + patch: + default: + target: 85% \ No newline at end of file diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..acd1fea --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,54 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Go + +on: + push: + branches: ['main'] + pull_request: + branches: ['main'] + +jobs: + build: + name: Build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go 1.20 + uses: actions/setup-go@v4 + with: + go-version: '1.20' + id: go + + - name: Get dependencies + run: | + go get -v -t -d ./... + + - name: Build + run: go build -v ./... + + unit-test: + name: Unit Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go 1.20 + uses: actions/setup-go@v4 + with: + go-version: '1.20' + id: go + + - name: Get dependencies + run: | + go get -v -t -d ./... + + - name: Run Unit Test + run: go test -v -coverprofile=coverage.out . + + - name: Codecov + uses: codecov/codecov-action@v3 + with: + file: ./coverage.out From a7bc77056f09d28747781c6c4b3c205cbd106f0d Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:10:04 +0700 Subject: [PATCH 04/10] Fix go version on go.mod to match with workflow file --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 091350e..859fc21 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/anhhuu/conexec -go 1.21.5 +go 1.20 require ( github.com/pkg/errors v0.9.1 From 829b064f70668a95b1c607b1a2fa3d396006dc84 Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:21:21 +0700 Subject: [PATCH 05/10] Upgrade go 1.21, add golangci-lint workflow --- .github/workflows/go.yml | 8 ++++---- .github/workflows/golangci-lint.yml | 22 ++++++++++++++++++++++ go.mod | 2 +- 3 files changed, 27 insertions(+), 5 deletions(-) create mode 100644 .github/workflows/golangci-lint.yml diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index acd1fea..937caaa 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -16,10 +16,10 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Set up Go 1.20 + - name: Set up Go 1.21 uses: actions/setup-go@v4 with: - go-version: '1.20' + go-version: '1.21' id: go - name: Get dependencies @@ -35,10 +35,10 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Set up Go 1.20 + - name: Set up Go 1.21 uses: actions/setup-go@v4 with: - go-version: '1.20' + go-version: '1.21' id: go - name: Get dependencies diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml new file mode 100644 index 0000000..08eb9ff --- /dev/null +++ b/.github/workflows/golangci-lint.yml @@ -0,0 +1,22 @@ +name: golangci-lint + +on: + pull_request: + branches: ['main'] + +jobs: + lint: + name: Check golangci-lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go 1.21 + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + version: v1.54 diff --git a/go.mod b/go.mod index 859fc21..06af59e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/anhhuu/conexec -go 1.20 +go 1.21 require ( github.com/pkg/errors v0.9.1 From 3776cf2c03b61e7822924f81b34fbcf53952209b Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:24:05 +0700 Subject: [PATCH 06/10] Update linter permission --- .github/workflows/go.yml | 2 +- .github/workflows/golangci-lint.yml | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 937caaa..e25dd65 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -1,7 +1,7 @@ # This workflow will build a golang project # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go -name: Go +name: golang on: push: diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 08eb9ff..35d30c4 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -4,9 +4,14 @@ on: pull_request: branches: ['main'] +permissions: + contents: read + pull-requests: read + checks: write + jobs: lint: - name: Check golangci-lint + name: Check lint runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 From e57b08f6498b1b1c748ea87a23b00a2e8f9f6c47 Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:29:20 +0700 Subject: [PATCH 07/10] Fix lint issue --- concurrent_executor_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/concurrent_executor_test.go b/concurrent_executor_test.go index fee2a0a..26ea249 100644 --- a/concurrent_executor_test.go +++ b/concurrent_executor_test.go @@ -282,14 +282,13 @@ func TestConcurrentExecutor_EnqueueTask(t *testing.T) { concurrentExecutor := NewConcurrentExecutor(defaultMaxConcurrentTasks, defautMaxTaskQueueSize) concurrentExecutor.Close() - concurrentExecutor.EnqueueTask(Task{ + _ = concurrentExecutor.EnqueueTask(Task{ ID: "test", Executor: func(ctx context.Context, args ...interface{}) (interface{}, error) { return "", nil }, ExecutorArgs: nil, }) - concurrentExecutor.Close() }) t.Run("Enqueue task into a full queue", func(t *testing.T) { @@ -336,13 +335,14 @@ func TestConcurrentExecutor_StartExecution(t *testing.T) { }() concurrentExecutor := NewConcurrentExecutor(defaultMaxConcurrentTasks, defautMaxTaskQueueSize) - concurrentExecutor.EnqueueTask(Task{ + err := concurrentExecutor.EnqueueTask(Task{ ID: "test", Executor: func(ctx context.Context, args ...interface{}) (interface{}, error) { return "", nil }, ExecutorArgs: nil, }) + test.Nil(err) concurrentExecutor.Close() concurrentExecutor.StartExecution(context.Background()) }) From 2b2e10929daa5b8117ded7c2cf749a7a5cd69085 Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:31:11 +0700 Subject: [PATCH 08/10] Increase code cov condition --- .codecov.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.codecov.yml b/.codecov.yml index e795d24..2f15125 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -2,4 +2,4 @@ coverage: status: patch: default: - target: 85% \ No newline at end of file + target: 90% \ No newline at end of file From b3b291c3ea5dbf75b0d76e663910ab041fe26ab9 Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:44:46 +0700 Subject: [PATCH 09/10] Update action name to `build` --- .github/workflows/go.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index e25dd65..595ef74 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -1,7 +1,7 @@ # This workflow will build a golang project # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go -name: golang +name: build on: push: From 555a026dbd9c1cc045d20c72a681970a8c379cea Mon Sep 17 00:00:00 2001 From: anhhuu Date: Sun, 24 Dec 2023 18:44:53 +0700 Subject: [PATCH 10/10] Update README.md --- README.md | 43 ++++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 2bed8e3..2fce99a 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,17 @@ -# Concurrent Executor for Asynchronous Task Execution +# conexec # -Package name: `anhhuu/conexec` +[![Build](https://github.com/anhhuu/conexec/workflows/build/badge.svg?branch=main)](https://github.com/anhhuu/conexec/actions) +[![Codecov](https://codecov.io/gh/anhhuu/conexec/branch/main/graph/badge.svg)](https://codecov.io/gh/anhhuu/conexec) +[![GoReportCard](https://goreportcard.com/badge/github.com/anhhuu/conexec)](https://goreportcard.com/report/github.com/anhhuu/conexec) +[![MIT License](https://img.shields.io/badge/License-MIT-green.svg)](https://github.com/anhhuu/conexec/blob/main/LICENSE) -## Overview +Package `anhhuu/conexec` (**Concurrent Executor for Asynchronous Task Execution**) provides a Concurrent Executor that facilitates the concurrent execution of multiple tasks, managing its own task queue. It is designed to handle asynchronous task execution with controlled concurrency and a task queue to ensure efficient resource utilization. -The `conexec` package provides a Concurrent Executor that facilitates the concurrent execution of multiple tasks, managing its own task queue. It is designed to handle asynchronous task execution with controlled concurrency and a task queue to ensure efficient resource utilization. +## Installation -## Features - -- **Task Execution:** Execute tasks concurrently while respecting the specified maximum concurrent task limit. -- **Task Queue:** Manage a task queue to handle tasks that exceed the current concurrency limit. -- **Error Handling:** Capture and report errors during task execution. -- **Task Response:** Retrieve responses and errors for each completed task. +```bash +go get github.com/anhhuu/conexec +``` ## Example @@ -55,8 +55,21 @@ func main() { } ``` +## Features + +- **Task Execution:** Execute tasks concurrently while respecting the specified maximum concurrent task limit. +- **Task Queue:** Manage a task queue to handle tasks that exceed the current concurrency limit. +- **Error Handling:** Capture and report errors/panics during task execution. +- **Task Response:** Retrieve responses and errors for each completed task. + ## Usage +```go +import ( + "github.com/anhhuu/conexec" +) +``` + ### Task Structure ```go @@ -69,7 +82,7 @@ type Task struct { type TaskExecutor func(ctx context.Context, args ...interface{}) (interface{}, error) ``` -The `Task` struct represents a task with a unique identifier (`ID`), an executor function (`Executor`), and optional executor arguments (ExecutorArgs). The executor function takes a context and variable arguments and returns a value and an error. +The `Task` struct represents a task with a unique identifier (`ID`), an executor function (`Executor`), and optional executor arguments (`ExecutorArgs`). The executor function takes a context and variable arguments and returns a value and an error. ### Concurrent Executor Initialization @@ -78,6 +91,14 @@ func NewConcurrentExecutor(maxConcurrentTasks, maxTaskQueueSize int) *Concurrent ``` Initialize a new Concurrent Executor with the specified maximum concurrent tasks and task queue size. +Or simple to use builder: + +```go +concurrentExecutor := conexec.NewConcurrentExecutorBuilder(). + WithMaxTaskQueueSize(defautMaxTaskQueueSize). + WithMaxConcurrentTasks(defaultMaxConcurrentTasks). + Build() +``` ### Enqueue Task