From 8cbe566f957bad21fc81b371c1d2bf62b69fd952 Mon Sep 17 00:00:00 2001 From: Fmar Date: Tue, 15 Oct 2024 18:05:32 +0200 Subject: [PATCH] feat: adding subscription WiP --- .idea/.gitignore | 8 +++++ .idea/hub.iml | 9 ++++++ .idea/modules.xml | 8 +++++ .idea/vcs.xml | 6 ++++ api/api.go | 10 +++++- api/models.go | 2 +- http/http_service.go | 2 +- service/models.go | 3 ++ service/service.go | 2 +- service/start.go | 76 ++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 122 insertions(+), 4 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/hub.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 00000000..13566b81 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/hub.iml b/.idea/hub.iml new file mode 100644 index 00000000..5e764c4f --- /dev/null +++ b/.idea/hub.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 00000000..8d1428ff --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 00000000..35eb1ddf --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/api/api.go b/api/api.go index f19056ca..fcc6798e 100644 --- a/api/api.go +++ b/api/api.go @@ -57,7 +57,7 @@ func NewAPI(svc service.Service, gormDB *gorm.DB, config config.Config, keys key } } -func (api *api) CreateApp(createAppRequest *CreateAppRequest) (*CreateAppResponse, error) { +func (api *api) CreateApp(ctx context.Context, createAppRequest *CreateAppRequest) (*CreateAppResponse, error) { expiresAt, err := api.parseExpiresAt(createAppRequest.ExpiresAt) if err != nil { return nil, fmt.Errorf("invalid expiresAt: %v", err) @@ -128,6 +128,14 @@ func (api *api) CreateApp(createAppRequest *CreateAppRequest) (*CreateAppRespons lud16 = fmt.Sprintf("&lud16=%s", lightningAddress) } responseBody.PairingUri = fmt.Sprintf("nostr+walletconnect://%s?relay=%s&secret=%s%s", appWalletPubKey, relayUrl, pairingSecretKey, lud16) + + fmt.Println("~+~+~+~+~+~+~+~+~+ GOING TO SUBSCRIBE TO NEW APP WALLET!!!!!!!!!!!!!! ") + err = api.svc.SubscribeToAppRequests(ctx, appWalletPubKey) + if err != nil { + fmt.Println("error subscribing to new app wallet key: ", err) + return nil, err + } + return responseBody, nil } diff --git a/api/models.go b/api/models.go index 5d2e1645..1d58fe2f 100644 --- a/api/models.go +++ b/api/models.go @@ -11,7 +11,7 @@ import ( ) type API interface { - CreateApp(createAppRequest *CreateAppRequest) (*CreateAppResponse, error) + CreateApp(ctx context.Context, createAppRequest *CreateAppRequest) (*CreateAppResponse, error) UpdateApp(userApp *db.App, updateAppRequest *UpdateAppRequest) error DeleteApp(userApp *db.App) error GetApp(userApp *db.App) *App diff --git a/http/http_service.go b/http/http_service.go index 32e61baf..31a09bad 100644 --- a/http/http_service.go +++ b/http/http_service.go @@ -837,7 +837,7 @@ func (httpSvc *HttpService) appsCreateHandler(c echo.Context) error { }) } - responseBody, err := httpSvc.api.CreateApp(&requestData) + responseBody, err := httpSvc.api.CreateApp(c.Request().Context(), &requestData) if err != nil { logger.Logger.WithField("requestData", requestData).WithError(err).Error("Failed to save app") diff --git a/service/models.go b/service/models.go index edb41bd7..27688f77 100644 --- a/service/models.go +++ b/service/models.go @@ -1,6 +1,8 @@ package service import ( + "context" + "github.com/getAlby/hub/alby" "github.com/getAlby/hub/config" "github.com/getAlby/hub/events" @@ -14,6 +16,7 @@ type Service interface { StartApp(encryptionKey string) error StopApp() Shutdown() + SubscribeToAppRequests(ctx context.Context, appWalletPubKey string) error // TODO: remove getters (currently used by http / wails services) GetAlbyOAuthSvc() alby.AlbyOAuthService diff --git a/service/service.go b/service/service.go index 351d000e..34cd0b10 100644 --- a/service/service.go +++ b/service/service.go @@ -138,7 +138,7 @@ func NewService(ctx context.Context) (*service, error) { func (svc *service) createFilters(identityPubkey string) nostr.Filters { filter := nostr.Filter{ - // Tags: nostr.TagMap{"p": []string{identityPubkey}}, + Tags: nostr.TagMap{"p": []string{identityPubkey}}, Kinds: []int{models.REQUEST_KIND}, } return []nostr.Filter{filter} diff --git a/service/start.go b/service/start.go index b3bd21a6..877918d1 100644 --- a/service/start.go +++ b/service/start.go @@ -119,6 +119,82 @@ func (svc *service) startNostr(ctx context.Context, encryptionKey string) error return nil } +func (svc *service) SubscribeToAppRequests(ctx context.Context, appWalletPubKey string) error { + relayUrl := svc.cfg.GetRelayUrl() + go func() { + // ensure the relay is properly disconnected before exiting + defer svc.wg.Done() + //Start infinite loop which will be only broken by canceling ctx (SIGINT) + var relay *nostr.Relay + waitToReconnectSeconds := 0 + + for i := 0; ; i++ { + + // wait for a delay if any before retrying + if waitToReconnectSeconds > 0 { + contextCancelled := false + + select { + case <-ctx.Done(): //context cancelled + logger.Logger.Info("service context cancelled while waiting for retry") + contextCancelled = true + case <-time.After(time.Duration(waitToReconnectSeconds) * time.Second): //timeout + } + if contextCancelled { + break + } + } + + closeRelay(relay) + + //connect to the relay + logger.Logger.WithFields(logrus.Fields{ + "relay_url": relayUrl, + "iteration": i, + }).Info("Connecting to the relay") + + relay, err := nostr.RelayConnect(ctx, relayUrl, nostr.WithNoticeHandler(svc.noticeHandler)) + if err != nil { + // exponential backoff from 2 - 60 seconds + waitToReconnectSeconds = max(waitToReconnectSeconds, 1) + waitToReconnectSeconds *= 2 + waitToReconnectSeconds = min(waitToReconnectSeconds, 60) + logger.Logger.WithFields(logrus.Fields{ + "iteration": i, + "retry_seconds": waitToReconnectSeconds, + }).WithError(err).Error("Failed to connect to relay") + continue + } + + waitToReconnectSeconds = 0 + + //publish event with NIP-47 info + err = svc.nip47Service.PublishNip47Info(ctx, relay, svc.lnClient) + if err != nil { + logger.Logger.WithError(err).Error("Could not publish NIP47 info") + } + + logger.Logger.Info("Subscribing to events") + sub, err := relay.Subscribe(ctx, svc.createFilters(appWalletPubKey)) + if err != nil { + logger.Logger.WithError(err).Error("Failed to subscribe to events") + continue + } + err = svc.StartSubscription(sub.Context, sub) + if err != nil { + //err being non-nil means that we have an error on the websocket error channel. In this case we just try to reconnect. + logger.Logger.WithError(err).Error("Got an error from the relay while listening to subscription.") + continue + } + //err being nil means that the context was canceled and we should exit the program. + break + } + closeRelay(relay) + logger.Logger.Info("Relay subroutine ended") + }() + return nil +} + func (svc *service) StartApp(encryptionKey string) error { if svc.lnClient != nil { return errors.New("app already started")