Skip to content

Commit

Permalink
Merge pull request #2 from anhhuu/fix-handle-panic-and-add-work-flow
Browse files Browse the repository at this point in the history
Fix handle panic and add work flow
  • Loading branch information
anhhuu authored Dec 24, 2023
2 parents 2cf940d + 555a026 commit 81aac01
Show file tree
Hide file tree
Showing 8 changed files with 459 additions and 165 deletions.
5 changes: 5 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
coverage:
status:
patch:
default:
target: 90%
54 changes: 54 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: build

on:
push:
branches: ['main']
pull_request:
branches: ['main']

jobs:
build:
name: Build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Go 1.21
uses: actions/setup-go@v4
with:
go-version: '1.21'
id: go

- name: Get dependencies
run: |
go get -v -t -d ./...
- name: Build
run: go build -v ./...

unit-test:
name: Unit Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Go 1.21
uses: actions/setup-go@v4
with:
go-version: '1.21'
id: go

- name: Get dependencies
run: |
go get -v -t -d ./...
- name: Run Unit Test
run: go test -v -coverprofile=coverage.out .

- name: Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.out
27 changes: 27 additions & 0 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: golangci-lint

on:
pull_request:
branches: ['main']

permissions:
contents: read
pull-requests: read
checks: write

jobs:
lint:
name: Check lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Go 1.21
uses: actions/setup-go@v4
with:
go-version: '1.21'

- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.54
43 changes: 32 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# Concurrent Executor for Asynchronous Task Execution
# conexec #

Package name: `anhhuu/conexec`
[![Build](https://github.com/anhhuu/conexec/workflows/build/badge.svg?branch=main)](https://github.com/anhhuu/conexec/actions)
[![Codecov](https://codecov.io/gh/anhhuu/conexec/branch/main/graph/badge.svg)](https://codecov.io/gh/anhhuu/conexec)
[![GoReportCard](https://goreportcard.com/badge/github.com/anhhuu/conexec)](https://goreportcard.com/report/github.com/anhhuu/conexec)
[![MIT License](https://img.shields.io/badge/License-MIT-green.svg)](https://github.com/anhhuu/conexec/blob/main/LICENSE)

## Overview
Package `anhhuu/conexec` (**Concurrent Executor for Asynchronous Task Execution**) provides a Concurrent Executor that facilitates the concurrent execution of multiple tasks, managing its own task queue. It is designed to handle asynchronous task execution with controlled concurrency and a task queue to ensure efficient resource utilization.

The `conexec` package provides a Concurrent Executor that facilitates the concurrent execution of multiple tasks, managing its own task queue. It is designed to handle asynchronous task execution with controlled concurrency and a task queue to ensure efficient resource utilization.
## Installation

## Features

- **Task Execution:** Execute tasks concurrently while respecting the specified maximum concurrent task limit.
- **Task Queue:** Manage a task queue to handle tasks that exceed the current concurrency limit.
- **Error Handling:** Capture and report errors during task execution.
- **Task Response:** Retrieve responses and errors for each completed task.
```bash
go get github.com/anhhuu/conexec
```

## Example

Expand Down Expand Up @@ -55,8 +55,21 @@ func main() {
}
```

## Features

- **Task Execution:** Execute tasks concurrently while respecting the specified maximum concurrent task limit.
- **Task Queue:** Manage a task queue to handle tasks that exceed the current concurrency limit.
- **Error Handling:** Capture and report errors/panics during task execution.
- **Task Response:** Retrieve responses and errors for each completed task.

## Usage

```go
import (
"github.com/anhhuu/conexec"
)
```

### Task Structure

```go
Expand All @@ -69,7 +82,7 @@ type Task struct {
type TaskExecutor func(ctx context.Context, args ...interface{}) (interface{}, error)
```

The `Task` struct represents a task with a unique identifier (`ID`), an executor function (`Executor`), and optional executor arguments (ExecutorArgs). The executor function takes a context and variable arguments and returns a value and an error.
The `Task` struct represents a task with a unique identifier (`ID`), an executor function (`Executor`), and optional executor arguments (`ExecutorArgs`). The executor function takes a context and variable arguments and returns a value and an error.

### Concurrent Executor Initialization

Expand All @@ -78,6 +91,14 @@ func NewConcurrentExecutor(maxConcurrentTasks, maxTaskQueueSize int) *Concurrent
```

Initialize a new Concurrent Executor with the specified maximum concurrent tasks and task queue size.
Or simple to use builder:

```go
concurrentExecutor := conexec.NewConcurrentExecutorBuilder().
WithMaxTaskQueueSize(defautMaxTaskQueueSize).
WithMaxConcurrentTasks(defaultMaxConcurrentTasks).
Build()
```

### Enqueue Task

Expand Down
18 changes: 11 additions & 7 deletions concurrent_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package conexec

import (
"context"
"fmt"
"sync"

"github.com/pkg/errors"
)

/*
// If ExecutorArgs is valid, it will be passed into Executor.
// If ExecutorArgs is valid, it will be passed into Executor func.
{
...
Expand All @@ -21,7 +22,7 @@ import (
task.Executor = func(ctx context.Context, args ...interface{}) (interface{}, error) {
arg0, ok := args[0].(type)
if !ok {
return "", fmt.Errorf("parse error")
return "", errors.Errorf("parse error")
}
// Do something with arg0
Expand Down Expand Up @@ -75,13 +76,16 @@ func (concurrentExecutor *ConcurrentExecutor) runTask(ctx context.Context) {
defer func() {
// Recover from panics and report errors to responseChan
if r := recover(); r != nil {
concurrentExecutor.mutex.Lock()
// Push error panic handling
concurrentExecutor.responseChan <- &TaskResponse{
TaskID: task.ID,
Value: nil,
Error: fmt.Errorf("got panic, recover: %v", r),
Error: errors.Errorf("got panic, recover: %v", r),
}
concurrentExecutor.mutex.Unlock()

// This goroutine is destroyed because panic -> create new to replace it, support handle another task
concurrentExecutor.waitgroup.Add(1)
go concurrentExecutor.runTask(ctx)
}
}()

Expand Down Expand Up @@ -149,7 +153,7 @@ func (concurrentExecutor *ConcurrentExecutor) EnqueueTask(task Task) error {
case concurrentExecutor.taskQueueChan <- task:
return nil
default:
return fmt.Errorf(FullQueueErr)
return errors.Errorf(FullQueueErr)
}
}

Expand Down
Loading

0 comments on commit 81aac01

Please sign in to comment.