Skip to content

Commit

Permalink
fix: synchronize access to lastPingTime in ticker struct (#99)
Browse files Browse the repository at this point in the history
* fix: Synchronize access to lastPingTime in Ticker struct

* fix: Synchronize access to lastPingTime in Ticker struct second pass

* updated lasPingTime visibility, gitignored .vscode/settings.json

* update: updated as specified
  • Loading branch information
abhinandkakkadi authored Aug 31, 2023
1 parent 1eb9eba commit f07f57d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.chglog
.env
.env
.vscode
27 changes: 23 additions & 4 deletions ticker/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
Expand All @@ -28,7 +29,7 @@ type Ticker struct {

url url.URL
callbacks callbacks
lastPingTime time.Time
lastPingTime atomicTime
autoReconnect bool
reconnectMaxRetries int
reconnectMaxDelay time.Duration
Expand All @@ -41,6 +42,22 @@ type Ticker struct {
cancel context.CancelFunc
}

// atomicTime is wrapper over time.Time to safely access
// an updating timestamp concurrently.
type atomicTime struct {
v atomic.Value
}

// Get returns the current timestamp.
func (b *atomicTime) Get() time.Time {
return b.v.Load().(time.Time)
}

// Set sets the current timestamp.
func (b *atomicTime) Set(value time.Time) {
b.v.Store(value)
}

// callbacks represents callbacks available in ticker.
type callbacks struct {
onTick func(models.Tick)
Expand Down Expand Up @@ -311,7 +328,7 @@ func (t *Ticker) ServeWithContext(ctx context.Context) {
t.reconnectAttempt = 0

// Set current time as last ping time
t.lastPingTime = time.Now()
t.lastPingTime.Set(time.Now())

// Set on close handler
t.Conn.SetCloseHandler(t.handleClose)
Expand Down Expand Up @@ -339,6 +356,7 @@ func (t *Ticker) handleClose(code int, reason string) error {
return nil
}


// Trigger callback methods
func (t *Ticker) triggerError(err error) {
if t.callbacks.onError != nil {
Expand Down Expand Up @@ -370,6 +388,7 @@ func (t *Ticker) triggerNoReconnect(attempt int) {
}
}


func (t *Ticker) triggerMessage(messageType int, message []byte) {
if t.callbacks.onMessage != nil {
t.callbacks.onMessage(messageType, message)
Expand Down Expand Up @@ -401,7 +420,7 @@ func (t *Ticker) checkConnection(ctx context.Context, wg *sync.WaitGroup) {

// If last ping time is greater then timeout interval then close the
// existing connection and reconnect
if time.Since(t.lastPingTime) > dataTimeoutInterval {
if time.Since(t.lastPingTime.Get()) > dataTimeoutInterval {
// Close the current connection without waiting for close frame
if t.Conn != nil {
t.Conn.Close()
Expand Down Expand Up @@ -431,7 +450,7 @@ func (t *Ticker) readMessage(ctx context.Context, wg *sync.WaitGroup) {
}

// Update last ping time to check for connection
t.lastPingTime = time.Now()
t.lastPingTime.Set(time.Now())

// Trigger message.
t.triggerMessage(mType, msg)
Expand Down

0 comments on commit f07f57d

Please sign in to comment.