diff --git a/worker.go b/worker.go
index 1fa7680..b349e0f 100644
--- a/worker.go
+++ b/worker.go
@@ -49,7 +49,7 @@ func (w *WorkerConfig) SetDefault() {
 	}
 }
 
-type kafkaWorker struct {
+type KafkaWorker struct {
 	consumer   kafka_consumer.Consumer
 	running    bool
 	error      error
@@ -59,7 +59,7 @@ type kafkaWorker struct {
 	errorCount uint64
 }
 
-func (k *kafkaWorker) Health() error {
+func (k *KafkaWorker) Health() error {
 	if !k.running {
 		return ErrWorkerStopped
 	}
@@ -72,12 +72,12 @@ func (k *kafkaWorker) Health() error {
 }
 
 // resetError
-func (k *kafkaWorker) resetError() {
+func (k *KafkaWorker) resetError() {
 	k.error = nil
 	k.errorCount = 0
 }
 
-func (k *kafkaWorker) handleErrorWithBackoff(err error) {
+func (k *KafkaWorker) handleErrorWithBackoff(err error) {
 	if err == nil {
 		return
 	}
@@ -99,7 +99,7 @@ func (k *kafkaWorker) handleErrorWithBackoff(err error) {
 	}
 }
 
-func (k *kafkaWorker) Start(ctx context.Context) error {
+func (k *KafkaWorker) Start(ctx context.Context) error {
 	if len(k.handlers) == 0 {
 		return ErrNoHandlersDefined
 	}
@@ -143,7 +143,7 @@ func (k *kafkaWorker) Start(ctx context.Context) error {
 	}
 }
 
-func (k *kafkaWorker) pull() ([]kafka.Message, error) {
+func (k *KafkaWorker) pull() ([]kafka.Message, error) {
 	ctx, cancel := context.WithTimeout(context.Background(), k.config.MaxWait)
 	defer cancel()
 
@@ -152,7 +152,7 @@ func (k *kafkaWorker) pull() ([]kafka.Message, error) {
 	for {
 		msg, err := k.consumer.FetchMessage(ctx)
 		if err != nil {
-			if err == context.DeadlineExceeded {
+			if errors.Is(err, context.DeadlineExceeded) {
 				return messages, nil
 			}
 
@@ -167,10 +167,10 @@ func (k *kafkaWorker) pull() ([]kafka.Message, error) {
 	}
 }
 
-func NewKafkaWorkerWithCustomer(workerConfig WorkerConfig, customer *kafka.Reader, handlers ...handler.Handler) *kafkaWorker {
+func NewKafkaWorkerWithCustomer(workerConfig WorkerConfig, customer *kafka.Reader, handlers ...handler.Handler) *KafkaWorker {
 	workerConfig.SetDefault()
 
-	return &kafkaWorker{
+	return &KafkaWorker{
 		consumer: customer,
 		running:  false,
 		error:    nil,
@@ -179,7 +179,7 @@ func NewKafkaWorkerWithCustomer(workerConfig WorkerConfig, customer *kafka.Reade
 	}
 }
 
-func NewKafkaWorker(workerConfig WorkerConfig, handlers ...handler.Handler) *kafkaWorker {
+func NewKafkaWorker(workerConfig WorkerConfig, handlers ...handler.Handler) *KafkaWorker {
 	workerConfig.SetDefault()
 	customer := kafka.NewReader(kafka.ReaderConfig{
 		Brokers:        workerConfig.KafkaBrokers,
@@ -204,7 +204,7 @@ func NewKafkaWorker(workerConfig WorkerConfig, handlers ...handler.Handler) *kaf
 		GroupBalancers: []kafka.GroupBalancer{kafka.RangeGroupBalancer{}, kafka.RoundRobinGroupBalancer{}}, // kafka-go did not support sticky group balancer :(
 	})
 
-	return &kafkaWorker{
+	return &KafkaWorker{
 		consumer: customer,
 		running:  false,
 		error:    nil,
diff --git a/worker_manager.go b/worker_manager.go
index a470069..a7d497f 100644
--- a/worker_manager.go
+++ b/worker_manager.go
@@ -6,10 +6,11 @@ import (
 	"os"
 	"os/signal"
 	"sync"
+	"syscall"
 )
 
 type KafkaWorkerManager struct {
-	workers []*kafkaWorker
+	workers []*KafkaWorker
 }
 
 // Start all workers
@@ -20,7 +21,7 @@ func (k KafkaWorkerManager) Start() []error {
 	// Wait for OS SIGINT/SIGTERM then gracefully shutdown the workers
 	{
 		exit := make(chan os.Signal, 1)
-		signal.Notify(exit, os.Interrupt, os.Kill)
+		signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
 		go func() {
 			<-exit
 			cancel()
@@ -30,7 +31,7 @@ func (k KafkaWorkerManager) Start() []error {
 	errs := make([]error, len(k.workers))
 	for i, worker := range k.workers {
 		wg.Add(1)
-		go func(worker *kafkaWorker, i int) {
+		go func(worker *KafkaWorker, i int) {
 			defer wg.Done()
 			errs[i] = worker.Start(ctx)
 		}(worker, i)
@@ -51,7 +52,7 @@ func (k KafkaWorkerManager) Health() error {
 	return nil
 }
 
-func NewWorkerManager(workers ...*kafkaWorker) *KafkaWorkerManager {
+func NewWorkerManager(workers ...*KafkaWorker) *KafkaWorkerManager {
 	return &KafkaWorkerManager{
 		workers: workers,
 	}
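
Exporting the type is what lets code outside the package name a *KafkaWorker directly, for example to hold it in a variable or struct field before handing it to NewWorkerManager. The sketch below is illustrative only, not part of the diff: the import path and alias are hypothetical, the broker address is a placeholder, KafkaBrokers is assumed to be a []string (as suggested by its use for kafka.ReaderConfig.Brokers above), and handler wiring is elided.

// Illustrative usage of the newly exported KafkaWorker type.
// Assumptions: hypothetical import path, placeholder broker address,
// handlers omitted (Start returns ErrNoHandlersDefined without at least one handler.Handler).
package main

import (
	"log"

	worker "example.com/kafkaworker" // hypothetical import path for this package
)

func main() {
	// The exported type can now be named outside the package.
	var w *worker.KafkaWorker = worker.NewKafkaWorker(worker.WorkerConfig{
		KafkaBrokers: []string{"localhost:9092"},
	} /* , handlers... */)

	manager := worker.NewWorkerManager(w)
	if err := manager.Health(); err != nil {
		log.Println("not healthy yet:", err)
	}

	// Start blocks until SIGINT/SIGTERM triggers a graceful shutdown.
	for _, err := range manager.Start() {
		if err != nil {
			log.Println("worker exited with error:", err)
		}
	}
}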