Skip to content

Commit

Permalink
Export worker struct
Browse files Browse the repository at this point in the history
  • Loading branch information
NV4RE committed Oct 11, 2023
1 parent 1cd4bbe commit 1627db8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
22 changes: 11 additions & 11 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (w *WorkerConfig) SetDefault() {
}
}

type kafkaWorker struct {
type KafkaWorker struct {
consumer kafka_consumer.Consumer
running bool
error error
Expand All @@ -59,7 +59,7 @@ type kafkaWorker struct {
errorCount uint64
}

func (k *kafkaWorker) Health() error {
func (k *KafkaWorker) Health() error {
if !k.running {
return ErrWorkerStopped
}
Expand All @@ -72,12 +72,12 @@ func (k *kafkaWorker) Health() error {
}

// resetError
func (k *kafkaWorker) resetError() {
func (k *KafkaWorker) resetError() {
k.error = nil
k.errorCount = 0
}

func (k *kafkaWorker) handleErrorWithBackoff(err error) {
func (k *KafkaWorker) handleErrorWithBackoff(err error) {
if err == nil {
return
}
Expand All @@ -99,7 +99,7 @@ func (k *kafkaWorker) handleErrorWithBackoff(err error) {
}
}

func (k *kafkaWorker) Start(ctx context.Context) error {
func (k *KafkaWorker) Start(ctx context.Context) error {
if len(k.handlers) == 0 {
return ErrNoHandlersDefined
}
Expand Down Expand Up @@ -143,7 +143,7 @@ func (k *kafkaWorker) Start(ctx context.Context) error {
}
}

func (k *kafkaWorker) pull() ([]kafka.Message, error) {
func (k *KafkaWorker) pull() ([]kafka.Message, error) {
ctx, cancel := context.WithTimeout(context.Background(), k.config.MaxWait)
defer cancel()

Expand All @@ -152,7 +152,7 @@ func (k *kafkaWorker) pull() ([]kafka.Message, error) {
for {
msg, err := k.consumer.FetchMessage(ctx)
if err != nil {
if err == context.DeadlineExceeded {
if errors.Is(err, context.DeadlineExceeded) {
return messages, nil
}

Expand All @@ -167,10 +167,10 @@ func (k *kafkaWorker) pull() ([]kafka.Message, error) {
}
}

func NewKafkaWorkerWithCustomer(workerConfig WorkerConfig, customer *kafka.Reader, handlers ...handler.Handler) *kafkaWorker {
func NewKafkaWorkerWithCustomer(workerConfig WorkerConfig, customer *kafka.Reader, handlers ...handler.Handler) *KafkaWorker {
workerConfig.SetDefault()

return &kafkaWorker{
return &KafkaWorker{
consumer: customer,
running: false,
error: nil,
Expand All @@ -179,7 +179,7 @@ func NewKafkaWorkerWithCustomer(workerConfig WorkerConfig, customer *kafka.Reade
}
}

func NewKafkaWorker(workerConfig WorkerConfig, handlers ...handler.Handler) *kafkaWorker {
func NewKafkaWorker(workerConfig WorkerConfig, handlers ...handler.Handler) *KafkaWorker {
workerConfig.SetDefault()
customer := kafka.NewReader(kafka.ReaderConfig{
Brokers: workerConfig.KafkaBrokers,
Expand All @@ -204,7 +204,7 @@ func NewKafkaWorker(workerConfig WorkerConfig, handlers ...handler.Handler) *kaf
GroupBalancers: []kafka.GroupBalancer{kafka.RangeGroupBalancer{}, kafka.RoundRobinGroupBalancer{}}, // kafka-go did not support sticky group balancer :(
})

return &kafkaWorker{
return &KafkaWorker{
consumer: customer,
running: false,
error: nil,
Expand Down
9 changes: 5 additions & 4 deletions worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"os"
"os/signal"
"sync"
"syscall"
)

type KafkaWorkerManager struct {
workers []*kafkaWorker
workers []*KafkaWorker
}

// Start all workers
Expand All @@ -20,7 +21,7 @@ func (k KafkaWorkerManager) Start() []error {
// Wait for OS SIGINT/SIGTERM then gracefully shutdown the workers
{
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, os.Kill)
signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-exit
cancel()
Expand All @@ -30,7 +31,7 @@ func (k KafkaWorkerManager) Start() []error {
errs := make([]error, len(k.workers))
for i, worker := range k.workers {
wg.Add(1)
go func(worker *kafkaWorker, i int) {
go func(worker *KafkaWorker, i int) {
defer wg.Done()
errs[i] = worker.Start(ctx)
}(worker, i)
Expand All @@ -51,7 +52,7 @@ func (k KafkaWorkerManager) Health() error {
return nil
}

func NewWorkerManager(workers ...*kafkaWorker) *KafkaWorkerManager {
func NewWorkerManager(workers ...*KafkaWorker) *KafkaWorkerManager {
return &KafkaWorkerManager{
workers: workers,
}
Expand Down

0 comments on commit 1627db8

Please sign in to comment.