diff --git a/actors/pid.go b/actors/pid.go index ce016e8f..45648ee4 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -225,7 +225,7 @@ type pid struct { shutdownTimeout atomic.Duration // specifies the actor mailbox - mailbox *queue.Queue[ReceiveContext] + mailbox *queue.Mpsc[ReceiveContext] // receives a shutdown signal. Once the signal is received // the actor is shut down gracefully. @@ -319,7 +319,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption fieldsLocker: &sync.RWMutex{}, stopLocker: &sync.Mutex{}, httpClient: http.NewClient(), - mailbox: queue.New[ReceiveContext](), + mailbox: queue.NewMpsc[ReceiveContext](), stashBuffer: nil, stashLocker: &sync.Mutex{}, eventsStream: nil, @@ -550,7 +550,6 @@ func (x *pid) Restart(ctx context.Context) error { ticker.Stop() } - x.mailbox = queue.New[ReceiveContext]() x.resetBehavior() if err := x.init(ctx); err != nil { return err @@ -1314,7 +1313,6 @@ func (x *pid) reset() { } } x.processedCount.Store(0) - x.mailbox.Close() } func (x *pid) freeWatchers(ctx context.Context) { @@ -1375,11 +1373,6 @@ func (x *pid) receive() { case <-x.shutdownSignal: return default: - // break out of the loop when the channel is closed - if x.mailbox.IsClosed() { - return - } - // fetch the data and continue the loop when there are no records yet received, ok := x.mailbox.Pop() if !ok { diff --git a/bench/benchmark_test.go b/bench/benchmark_test.go index dd66d7f1..5ffccdaa 100644 --- a/bench/benchmark_test.go +++ b/bench/benchmark_test.go @@ -27,9 +27,11 @@ package bench import ( "context" "fmt" + "runtime" "testing" "time" + . "github.com/klauspost/cpuid/v2" //nolint "github.com/stretchr/testify/require" "github.com/tochemey/goakt/v2/actors" @@ -167,19 +169,22 @@ func TestBenchmark_BenchTell(t *testing.T) { actorsCount := 2000 workersCount := 20 - duration := 10 * time.Second + duration := 30 * time.Second benchmark := NewBenchmark(actorsCount, workersCount, duration) require.NoError(t, benchmark.Start(ctx)) - fmt.Printf("starting benchmark for (%v): num workers:(%d)\n", duration, workersCount) + fmt.Printf("Starting benchmark for (%v): num workers:(%d)\n", duration, workersCount) if err := benchmark.BenchTell(ctx); err != nil { t.Fatal(err) } - fmt.Printf("total actors spawned: (%d)\n", actorsCount) - fmt.Printf("total workers: (%d), total messages sent: (%d), total messages received: (%d) - duration: (%v)\n", workersCount, totalSent.Load(), totalRecv.Load(), duration) - fmt.Printf("messages per second: (%d)\n", totalRecv.Load()/int64(duration.Seconds())) + fmt.Printf("Go Version: %s\n", runtime.Version()) + fmt.Printf("cpu: %s (Physical Cores: %d)\n", CPU.BrandName, CPU.PhysicalCores) + fmt.Printf("Runtime CPUs: %d\n", runtime.NumCPU()) + fmt.Printf("Total actors spawned: (%d)\n", actorsCount) + fmt.Printf("Total workers: (%d), total messages sent: (%d), total messages received: (%d) - duration: (%v)\n", workersCount, totalSent.Load(), totalRecv.Load(), duration) + fmt.Printf("Messages per second: (%d)\n", totalRecv.Load()/int64(duration.Seconds())) t.Cleanup(func() { require.NoError(t, benchmark.Stop(ctx)) }) @@ -190,19 +195,22 @@ func TestBenchmark_BenchAsk(t *testing.T) { actorsCount := 2000 workersCount := 20 - duration := 10 * time.Second + duration := 30 * time.Second benchmark := NewBenchmark(actorsCount, workersCount, duration) require.NoError(t, benchmark.Start(ctx)) - fmt.Printf("starting benchmark for (%v): num workers:(%d)\n", duration, workersCount) + fmt.Printf("Starting benchmark for (%v): num workers:(%d)\n", duration, workersCount) if err := benchmark.BenchAsk(ctx); err != nil { t.Fatal(err) } - fmt.Printf("total actors spawned: (%d)\n", actorsCount) - fmt.Printf("total workers: (%d), total messages sent: (%d), total messages received: (%d) - duration: (%v)\n", workersCount, totalSent.Load(), totalRecv.Load(), duration) - fmt.Printf("messages per second: (%d)\n", totalRecv.Load()/int64(duration.Seconds())) + fmt.Printf("Go Version: %s\n", runtime.Version()) + fmt.Printf("cpu: %s (Physical Cores: %d)\n", CPU.BrandName, CPU.PhysicalCores) + fmt.Printf("Runtime CPUs: %d\n", runtime.NumCPU()) + fmt.Printf("Total actors spawned: (%d)\n", actorsCount) + fmt.Printf("Total workers: (%d), total messages sent: (%d), total messages received: (%d) - duration: (%v)\n", workersCount, totalSent.Load(), totalRecv.Load(), duration) + fmt.Printf("Messages per second: (%d)\n", totalRecv.Load()/int64(duration.Seconds())) t.Cleanup(func() { require.NoError(t, benchmark.Stop(ctx)) }) diff --git a/go.mod b/go.mod index 49e88332..f867a8cf 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/grandcat/zeroconf v1.0.0 + github.com/klauspost/cpuid/v2 v2.2.8 github.com/nats-io/nats-server/v2 v2.10.18 github.com/nats-io/nats.go v1.36.0 github.com/pkg/errors v0.9.1 @@ -69,7 +70,6 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect - github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/miekg/dns v1.1.61 // indirect github.com/minio/highwayhash v1.0.3 // indirect diff --git a/internal/queue/mpsc.go b/internal/queue/mpsc.go new file mode 100644 index 00000000..9ac2f26b --- /dev/null +++ b/internal/queue/mpsc.go @@ -0,0 +1,100 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package queue + +import ( + "sync" + "sync/atomic" + "unsafe" +) + +// node returns the queue node +type node[T any] struct { + value T + next *node[T] +} + +// Mpsc is a Multi-Producer-Single-Consumer Queue +// reference: https://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html +type Mpsc[T any] struct { + head *node[T] + tail *node[T] + length int64 + lock sync.Mutex +} + +// NewMpsc create an instance of Mpsc +func NewMpsc[T any]() *Mpsc[T] { + item := new(node[T]) + return &Mpsc[T]{ + head: item, + tail: item, + length: 0, + lock: sync.Mutex{}, + } +} + +// Push place the given value in the queue head (FIFO). Returns always true +func (q *Mpsc[T]) Push(value T) bool { + tnode := &node[T]{ + value: value, + } + previousHead := (*node[T])(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head)), unsafe.Pointer(tnode))) + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&previousHead.next)), unsafe.Pointer(tnode)) + atomic.AddInt64(&q.length, 1) + return true +} + +// Pop takes the QueueItem from the queue tail. Returns false if the queue is empty. Can be used in a single consumer (goroutine) only. +func (q *Mpsc[T]) Pop() (T, bool) { + var tnil T + next := (*node[T])(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.tail.next)))) + if next == nil { + return tnil, false + } + + q.lock.Lock() + q.tail = next + q.lock.Unlock() + value := next.value + next.value = tnil + atomic.AddInt64(&q.length, -1) + return value, true +} + +// Len returns queue length +func (q *Mpsc[T]) Len() int64 { + return atomic.LoadInt64(&q.length) +} + +// IsEmpty returns true when the queue is empty +// must be called from a single, consumer goroutine +func (q *Mpsc[T]) IsEmpty() bool { + q.lock.Lock() + tail := q.tail + q.lock.Unlock() + next := (*node[T])(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next)))) + return next == nil +} diff --git a/internal/queue/mpsc_test.go b/internal/queue/mpsc_test.go new file mode 100644 index 00000000..4fd1e6cb --- /dev/null +++ b/internal/queue/mpsc_test.go @@ -0,0 +1,83 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package queue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TODO: add go routine-based tests +func TestMpscQueue(t *testing.T) { + t.Run("With Push/Pop", func(t *testing.T) { + q := NewMpsc[int]() + require.True(t, q.IsEmpty()) + for j := 0; j < 100; j++ { + if q.Len() != 0 { + t.Fatal("expected no elements") + } else if _, ok := q.Pop(); ok { + t.Fatal("expected no elements") + } + + for i := 0; i < j; i++ { + q.Push(i) + } + + for i := 0; i < j; i++ { + if x, ok := q.Pop(); !ok { + t.Fatal("expected an element") + } else if x != i { + t.Fatalf("expected %d got %d", i, x) + } + } + } + + a := 0 + r := 0 + for j := 0; j < 100; j++ { + for i := 0; i < 4; i++ { + q.Push(a) + a++ + } + + for i := 0; i < 2; i++ { + if x, ok := q.Pop(); !ok { + t.Fatal("expected an element") + } else if x != r { + t.Fatalf("expected %d got %d", r, x) + } + r++ + } + } + + if q.Len() != 200 { + t.Fatalf("expected 200 elements have %d", q.Len()) + } + + assert.True(t, q.Len() > 0) + }) +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go index bea7b8c4..ee2b321a 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -31,8 +31,6 @@ import "sync" const minQueueLen = 16 // Queue thread-safe Queue using ring-buffer -// reference: https://blog.dubbelboer.com/2015/04/25/go-faster-queue.html -// https://github.com/eapache/queue type Queue[T any] struct { mu sync.RWMutex cond *sync.Cond @@ -54,10 +52,10 @@ func New[T any]() *Queue[T] { return sq } -// Push adds an item to the back of the queue +// Push adds an QueueItem to the back of the queue // It can be safely called from multiple goroutines // It will return false if the queue is closed. -// In that case the Item is dropped. +// In that case the node is dropped. func (q *Queue[T]) Push(i T) bool { q.mu.Lock() if q.closed { @@ -120,7 +118,7 @@ func (q *Queue[T]) IsClosed() bool { return c } -// Wait for an item to be added. +// Wait for an QueueItem to be added. // If there is items on the queue the first will // be returned immediately. // Will return nil, false if the queue is closed. @@ -141,7 +139,7 @@ func (q *Queue[T]) Wait() (T, bool) { return q.Pop() } -// Pop removes the item from the front of the queue +// Pop removes the QueueItem from the front of the queue // If false is returned, it either means 1) there were no items on the queue // or 2) the queue is closed. func (q *Queue[T]) Pop() (T, bool) {