Skip to content

Commit

Permalink
perf: use mpsc queue (#420)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Aug 12, 2024
1 parent 93280bb commit fddaedf
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 26 deletions.
11 changes: 2 additions & 9 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ type pid struct {
shutdownTimeout atomic.Duration

// specifies the actor mailbox
mailbox *queue.Queue[ReceiveContext]
mailbox *queue.Mpsc[ReceiveContext]

// receives a shutdown signal. Once the signal is received
// the actor is shut down gracefully.
Expand Down Expand Up @@ -319,7 +319,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
fieldsLocker: &sync.RWMutex{},
stopLocker: &sync.Mutex{},
httpClient: http.NewClient(),
mailbox: queue.New[ReceiveContext](),
mailbox: queue.NewMpsc[ReceiveContext](),
stashBuffer: nil,
stashLocker: &sync.Mutex{},
eventsStream: nil,
Expand Down Expand Up @@ -550,7 +550,6 @@ func (x *pid) Restart(ctx context.Context) error {
ticker.Stop()
}

x.mailbox = queue.New[ReceiveContext]()
x.resetBehavior()
if err := x.init(ctx); err != nil {
return err
Expand Down Expand Up @@ -1314,7 +1313,6 @@ func (x *pid) reset() {
}
}
x.processedCount.Store(0)
x.mailbox.Close()
}

func (x *pid) freeWatchers(ctx context.Context) {
Expand Down Expand Up @@ -1375,11 +1373,6 @@ func (x *pid) receive() {
case <-x.shutdownSignal:
return
default:
// break out of the loop when the channel is closed
if x.mailbox.IsClosed() {
return
}

// fetch the data and continue the loop when there are no records yet
received, ok := x.mailbox.Pop()
if !ok {
Expand Down
28 changes: 18 additions & 10 deletions bench/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ package bench
import (
"context"
"fmt"
"runtime"
"testing"
"time"

. "github.com/klauspost/cpuid/v2" //nolint
"github.com/stretchr/testify/require"

"github.com/tochemey/goakt/v2/actors"
Expand Down Expand Up @@ -167,19 +169,22 @@ func TestBenchmark_BenchTell(t *testing.T) {

actorsCount := 2000
workersCount := 20
duration := 10 * time.Second
duration := 30 * time.Second

benchmark := NewBenchmark(actorsCount, workersCount, duration)
require.NoError(t, benchmark.Start(ctx))

fmt.Printf("starting benchmark for (%v): num workers:(%d)\n", duration, workersCount)
fmt.Printf("Starting benchmark for (%v): num workers:(%d)\n", duration, workersCount)
if err := benchmark.BenchTell(ctx); err != nil {
t.Fatal(err)
}

fmt.Printf("total actors spawned: (%d)\n", actorsCount)
fmt.Printf("total workers: (%d), total messages sent: (%d), total messages received: (%d) - duration: (%v)\n", workersCount, totalSent.Load(), totalRecv.Load(), duration)
fmt.Printf("messages per second: (%d)\n", totalRecv.Load()/int64(duration.Seconds()))
fmt.Printf("Go Version: %s\n", runtime.Version())
fmt.Printf("cpu: %s (Physical Cores: %d)\n", CPU.BrandName, CPU.PhysicalCores)
fmt.Printf("Runtime CPUs: %d\n", runtime.NumCPU())
fmt.Printf("Total actors spawned: (%d)\n", actorsCount)
fmt.Printf("Total workers: (%d), total messages sent: (%d), total messages received: (%d) - duration: (%v)\n", workersCount, totalSent.Load(), totalRecv.Load(), duration)
fmt.Printf("Messages per second: (%d)\n", totalRecv.Load()/int64(duration.Seconds()))
t.Cleanup(func() {
require.NoError(t, benchmark.Stop(ctx))
})
Expand All @@ -190,19 +195,22 @@ func TestBenchmark_BenchAsk(t *testing.T) {

actorsCount := 2000
workersCount := 20
duration := 10 * time.Second
duration := 30 * time.Second

benchmark := NewBenchmark(actorsCount, workersCount, duration)
require.NoError(t, benchmark.Start(ctx))

fmt.Printf("starting benchmark for (%v): num workers:(%d)\n", duration, workersCount)
fmt.Printf("Starting benchmark for (%v): num workers:(%d)\n", duration, workersCount)
if err := benchmark.BenchAsk(ctx); err != nil {
t.Fatal(err)
}

fmt.Printf("total actors spawned: (%d)\n", actorsCount)
fmt.Printf("total workers: (%d), total messages sent: (%d), total messages received: (%d) - duration: (%v)\n", workersCount, totalSent.Load(), totalRecv.Load(), duration)
fmt.Printf("messages per second: (%d)\n", totalRecv.Load()/int64(duration.Seconds()))
fmt.Printf("Go Version: %s\n", runtime.Version())
fmt.Printf("cpu: %s (Physical Cores: %d)\n", CPU.BrandName, CPU.PhysicalCores)
fmt.Printf("Runtime CPUs: %d\n", runtime.NumCPU())
fmt.Printf("Total actors spawned: (%d)\n", actorsCount)
fmt.Printf("Total workers: (%d), total messages sent: (%d), total messages received: (%d) - duration: (%v)\n", workersCount, totalSent.Load(), totalRecv.Load(), duration)
fmt.Printf("Messages per second: (%d)\n", totalRecv.Load()/int64(duration.Seconds()))
t.Cleanup(func() {
require.NoError(t, benchmark.Stop(ctx))
})
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/grandcat/zeroconf v1.0.0
github.com/klauspost/cpuid/v2 v2.2.8
github.com/nats-io/nats-server/v2 v2.10.18
github.com/nats-io/nats.go v1.36.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -69,7 +70,6 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/miekg/dns v1.1.61 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
Expand Down
100 changes: 100 additions & 0 deletions internal/queue/mpsc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package queue

import (
"sync"
"sync/atomic"
"unsafe"
)

// node returns the queue node
type node[T any] struct {
value T
next *node[T]
}

// Mpsc is a Multi-Producer-Single-Consumer Queue
// reference: https://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html
type Mpsc[T any] struct {
head *node[T]
tail *node[T]
length int64
lock sync.Mutex
}

// NewMpsc create an instance of Mpsc
func NewMpsc[T any]() *Mpsc[T] {
item := new(node[T])
return &Mpsc[T]{
head: item,
tail: item,
length: 0,
lock: sync.Mutex{},
}
}

// Push place the given value in the queue head (FIFO). Returns always true
func (q *Mpsc[T]) Push(value T) bool {
tnode := &node[T]{
value: value,
}
previousHead := (*node[T])(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head)), unsafe.Pointer(tnode)))
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&previousHead.next)), unsafe.Pointer(tnode))
atomic.AddInt64(&q.length, 1)
return true
}

// Pop takes the QueueItem from the queue tail. Returns false if the queue is empty. Can be used in a single consumer (goroutine) only.
func (q *Mpsc[T]) Pop() (T, bool) {
var tnil T
next := (*node[T])(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.tail.next))))
if next == nil {
return tnil, false
}

q.lock.Lock()
q.tail = next
q.lock.Unlock()
value := next.value
next.value = tnil
atomic.AddInt64(&q.length, -1)
return value, true
}

// Len returns queue length
func (q *Mpsc[T]) Len() int64 {
return atomic.LoadInt64(&q.length)
}

// IsEmpty returns true when the queue is empty
// must be called from a single, consumer goroutine
func (q *Mpsc[T]) IsEmpty() bool {
q.lock.Lock()
tail := q.tail
q.lock.Unlock()
next := (*node[T])(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next))))
return next == nil
}
83 changes: 83 additions & 0 deletions internal/queue/mpsc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package queue

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TODO: add go routine-based tests
func TestMpscQueue(t *testing.T) {
t.Run("With Push/Pop", func(t *testing.T) {
q := NewMpsc[int]()
require.True(t, q.IsEmpty())
for j := 0; j < 100; j++ {
if q.Len() != 0 {
t.Fatal("expected no elements")
} else if _, ok := q.Pop(); ok {
t.Fatal("expected no elements")
}

for i := 0; i < j; i++ {
q.Push(i)
}

for i := 0; i < j; i++ {
if x, ok := q.Pop(); !ok {
t.Fatal("expected an element")
} else if x != i {
t.Fatalf("expected %d got %d", i, x)
}
}
}

a := 0
r := 0
for j := 0; j < 100; j++ {
for i := 0; i < 4; i++ {
q.Push(a)
a++
}

for i := 0; i < 2; i++ {
if x, ok := q.Pop(); !ok {
t.Fatal("expected an element")
} else if x != r {
t.Fatalf("expected %d got %d", r, x)
}
r++
}
}

if q.Len() != 200 {
t.Fatalf("expected 200 elements have %d", q.Len())
}

assert.True(t, q.Len() > 0)
})
}
10 changes: 4 additions & 6 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import "sync"
const minQueueLen = 16

// Queue thread-safe Queue using ring-buffer
// reference: https://blog.dubbelboer.com/2015/04/25/go-faster-queue.html
// https://github.com/eapache/queue
type Queue[T any] struct {
mu sync.RWMutex
cond *sync.Cond
Expand All @@ -54,10 +52,10 @@ func New[T any]() *Queue[T] {
return sq
}

// Push adds an item to the back of the queue
// Push adds an QueueItem to the back of the queue
// It can be safely called from multiple goroutines
// It will return false if the queue is closed.
// In that case the Item is dropped.
// In that case the node is dropped.
func (q *Queue[T]) Push(i T) bool {
q.mu.Lock()
if q.closed {
Expand Down Expand Up @@ -120,7 +118,7 @@ func (q *Queue[T]) IsClosed() bool {
return c
}

// Wait for an item to be added.
// Wait for an QueueItem to be added.
// If there is items on the queue the first will
// be returned immediately.
// Will return nil, false if the queue is closed.
Expand All @@ -141,7 +139,7 @@ func (q *Queue[T]) Wait() (T, bool) {
return q.Pop()
}

// Pop removes the item from the front of the queue
// Pop removes the QueueItem from the front of the queue
// If false is returned, it either means 1) there were no items on the queue
// or 2) the queue is closed.
func (q *Queue[T]) Pop() (T, bool) {
Expand Down

0 comments on commit fddaedf

Please sign in to comment.