From 4c1e1df7bcb3adfc14c033cfeab7606913793c6a Mon Sep 17 00:00:00 2001 From: Zohaib Date: Sun, 23 Jul 2023 03:02:58 +0300 Subject: [PATCH] Optimizing NATS snapshotting for low memory environments --- snapshot/nats_storage.go | 31 ++++++++++++++++++++++++------- stream/embedded_nats.go | 2 +- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/snapshot/nats_storage.go b/snapshot/nats_storage.go index 9a0413f..65c9cf6 100644 --- a/snapshot/nats_storage.go +++ b/snapshot/nats_storage.go @@ -1,11 +1,13 @@ package snapshot import ( + "os" + "time" + "github.com/maxpert/marmot/cfg" "github.com/maxpert/marmot/stream" "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" - "os" ) const hashHeaderKey = "marmot-snapshot-tag" @@ -63,16 +65,31 @@ func (n *natsStorage) Download(filePath, name string) error { return err } - err = blb.GetFile(name, filePath) - if err == nats.ErrObjectNotFound { - return ErrNoSnapshotFound - } + for { + err = blb.GetFile(name, filePath) + if err == nil { + return nil + } + + if err == nats.ErrObjectNotFound { + return ErrNoSnapshotFound + } + + if jsmErr, ok := err.(nats.JetStreamError); ok { + log.Warn(). + Err(err). + Int("Status", jsmErr.APIError().Code). + Msg("Error downloading snapshot, retrying...") + time.Sleep(time.Second) + continue + } - return err + return err + } } func getBlobStore(conn *nats.Conn) (nats.ObjectStore, error) { - js, err := conn.JetStream() + js, err := conn.JetStream(nats.MaxWait(30 * time.Second)) if err != nil { return nil, err } diff --git a/stream/embedded_nats.go b/stream/embedded_nats.go index 7ade339..24eb82f 100644 --- a/stream/embedded_nats.go +++ b/stream/embedded_nats.go @@ -51,7 +51,7 @@ func startEmbeddedServer(nodeName string) (*embeddedNats, error) { Port: -1, NoSigs: true, JetStream: true, - JetStreamMaxMemory: 1 << 25, + JetStreamMaxMemory: 1 << 20, JetStreamMaxStore: 1 << 30, Cluster: server.ClusterOpts{ Name: "e-marmot",