From 17567bc85c296027376ebe57f2f2e62c7aea571a Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Mon, 6 Feb 2023 12:22:23 -0800 Subject: [PATCH] Eng 2195 Implement application logic to send notifications (#926) * [2/2][UI] Eng 2193 Allow connecting to email as integration (#897) * [1/2][Backend] Eng 2191 add slack integration to backend (#911) * [2/2][UI] Eng 2191 Allow user to connect to slack integration (#910) * Eng 2194 m2 implement go interface for (#923) * [1/2][backend] Eng 2196 m2 allow user to configure notifications (#935) * [2/2][UI] Eng 2196 m2 allow user to configure wf specific notifications (#936) * Implement actual code to send email notifications (#943) * Implement actual code to send slack notifications (#944) --- src/golang/cmd/executor/executor/database.go | 3 + .../cmd/server/handler/edit_workflow.go | 33 +++-- .../cmd/server/handler/register_workflow.go | 1 + src/golang/cmd/server/request/integration.go | 2 + src/golang/cmd/server/server/database.go | 1 + src/golang/lib/engine/aq_engine.go | 99 +++++++++++++- src/golang/lib/engine/databricks_executor.go | 63 ++++++++- src/golang/lib/engine/engine.go | 2 + src/golang/lib/engine/notification.go | 103 +++++++++++++++ src/golang/lib/engine/notification_test.go | 44 +++++++ src/golang/lib/models/shared/workflow.go | 32 +---- src/golang/lib/models/workflow.go | 16 +-- src/golang/lib/notification/email.go | 47 ++++++- src/golang/lib/notification/notification.go | 87 ++++++++++-- .../lib/notification/notification_test.go | 36 +++++ src/golang/lib/notification/slack.go | 96 +++++++++++++- src/golang/lib/repos/sqlite/workflow.go | 5 +- src/golang/lib/repos/tests/seed.go | 1 + src/golang/lib/repos/tests/workflow.go | 24 +++- src/golang/lib/repos/workflow.go | 2 + src/golang/lib/workflow/dag/workflow_dag.go | 29 ++++ src/golang/lib/workflow/utils/database.go | 1 + .../integrations/cards/slackCard.tsx | 2 +- .../WorkflowNotificationSettings.tsx | 124 ++++++++++++++++++ .../components/workflows/WorkflowSettings.tsx | 93 ++++++++++++- src/ui/common/src/utils/workflows.tsx | 6 + 26 files changed, 865 insertions(+), 87 deletions(-) create mode 100644 src/golang/lib/engine/notification.go create mode 100644 src/golang/lib/engine/notification_test.go create mode 100644 src/golang/lib/notification/notification_test.go create mode 100644 src/ui/common/src/components/workflows/WorkflowNotificationSettings.tsx diff --git a/src/golang/cmd/executor/executor/database.go b/src/golang/cmd/executor/executor/database.go index 28e24882e..6c8f7c16a 100644 --- a/src/golang/cmd/executor/executor/database.go +++ b/src/golang/cmd/executor/executor/database.go @@ -13,6 +13,7 @@ type Repos struct { DAGEdgeRepo repos.DAGEdge DAGResultRepo repos.DAGResult ExecutionEnvironmentRepo repos.ExecutionEnvironment + IntegrationRepo repos.Integration NotificationRepo repos.Notification OperatorRepo repos.Operator OperatorResultRepo repos.OperatorResult @@ -28,6 +29,7 @@ func createRepos() *Repos { DAGEdgeRepo: sqlite.NewDAGEdgeRepo(), DAGResultRepo: sqlite.NewDAGResultRepo(), ExecutionEnvironmentRepo: sqlite.NewExecutionEnvironmentRepo(), + IntegrationRepo: sqlite.NewIntegrationRepo(), NotificationRepo: sqlite.NewNotificationRepo(), OperatorRepo: sqlite.NewOperatorRepo(), OperatorResultRepo: sqlite.NewOperatorResultRepo(), @@ -44,6 +46,7 @@ func getEngineRepos(repos *Repos) *engine.Repos { DAGEdgeRepo: repos.DAGEdgeRepo, DAGResultRepo: repos.DAGResultRepo, ExecutionEnvironmentRepo: repos.ExecutionEnvironmentRepo, + IntegrationRepo: repos.IntegrationRepo, NotificationRepo: repos.NotificationRepo, OperatorRepo: repos.OperatorRepo, OperatorResultRepo: repos.OperatorResultRepo, diff --git a/src/golang/cmd/server/handler/edit_workflow.go b/src/golang/cmd/server/handler/edit_workflow.go index 3bf0d585c..41ad50f1d 100644 --- a/src/golang/cmd/server/handler/edit_workflow.go +++ b/src/golang/cmd/server/handler/edit_workflow.go @@ -10,6 +10,7 @@ import ( aq_context "github.com/aqueducthq/aqueduct/lib/context" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/engine" + "github.com/aqueducthq/aqueduct/lib/models/shared" "github.com/aqueducthq/aqueduct/lib/repos" "github.com/aqueducthq/aqueduct/lib/workflow" "github.com/aqueducthq/aqueduct/lib/workflow/utils" @@ -43,18 +44,20 @@ type EditWorkflowHandler struct { } type editWorkflowInput struct { - WorkflowName string `json:"name"` - WorkflowDescription string `json:"description"` - Schedule *col_workflow.Schedule `json:"schedule"` - RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"` + WorkflowName string `json:"name"` + WorkflowDescription string `json:"description"` + Schedule *col_workflow.Schedule `json:"schedule"` + RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"` + NotificationSettings *shared.NotificationSettings `json:"notification_settings"` } type editWorkflowArgs struct { - workflowId uuid.UUID - workflowName string - workflowDescription string - schedule *col_workflow.Schedule - retentionPolicy *col_workflow.RetentionPolicy + workflowId uuid.UUID + workflowName string + workflowDescription string + schedule *col_workflow.Schedule + retentionPolicy *col_workflow.RetentionPolicy + notificationSettings *shared.NotificationSettings } func (*EditWorkflowHandler) Name() string { @@ -116,11 +119,12 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) } return &editWorkflowArgs{ - workflowId: workflowID, - workflowName: input.WorkflowName, - workflowDescription: input.WorkflowDescription, - schedule: input.Schedule, - retentionPolicy: input.RetentionPolicy, + workflowId: workflowID, + workflowName: input.WorkflowName, + workflowDescription: input.WorkflowDescription, + schedule: input.Schedule, + retentionPolicy: input.RetentionPolicy, + notificationSettings: input.NotificationSettings, }, http.StatusOK, nil } @@ -173,6 +177,7 @@ func (h *EditWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfa args.workflowDescription, args.schedule, args.retentionPolicy, + args.notificationSettings, ) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.") diff --git a/src/golang/cmd/server/handler/register_workflow.go b/src/golang/cmd/server/handler/register_workflow.go index b2281198e..637bbcb68 100644 --- a/src/golang/cmd/server/handler/register_workflow.go +++ b/src/golang/cmd/server/handler/register_workflow.go @@ -227,6 +227,7 @@ func (h *RegisterWorkflowHandler) Perform(ctx context.Context, interfaceArgs int dbWorkflowDag.Metadata.Description, &dbWorkflowDag.Metadata.Schedule, &dbWorkflowDag.Metadata.RetentionPolicy, + &dbWorkflowDag.Metadata.NotificationSettings, ) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.") diff --git a/src/golang/cmd/server/request/integration.go b/src/golang/cmd/server/request/integration.go index f5968cd13..8e8e494db 100644 --- a/src/golang/cmd/server/request/integration.go +++ b/src/golang/cmd/server/request/integration.go @@ -47,6 +47,8 @@ func isUserOnlyIntegration(svc integration.Service) bool { integration.GoogleSheets, integration.Github, integration.Conda, + integration.Email, + integration.Slack, } for _, s := range userSpecific { if s == svc { diff --git a/src/golang/cmd/server/server/database.go b/src/golang/cmd/server/server/database.go index 06ea75e18..412f0a135 100644 --- a/src/golang/cmd/server/server/database.go +++ b/src/golang/cmd/server/server/database.go @@ -50,6 +50,7 @@ func GetEngineRepos(repos *Repos) *engine.Repos { DAGEdgeRepo: repos.DAGEdgeRepo, DAGResultRepo: repos.DAGResultRepo, ExecutionEnvironmentRepo: repos.ExecutionEnvironmentRepo, + IntegrationRepo: repos.IntegrationRepo, NotificationRepo: repos.NotificationRepo, OperatorRepo: repos.OperatorRepo, OperatorResultRepo: repos.OperatorResultRepo, diff --git a/src/golang/lib/engine/aq_engine.go b/src/golang/lib/engine/aq_engine.go index 8cd948ce6..a602cd39e 100644 --- a/src/golang/lib/engine/aq_engine.go +++ b/src/golang/lib/engine/aq_engine.go @@ -54,6 +54,7 @@ type Repos struct { DAGEdgeRepo repos.DAGEdge DAGResultRepo repos.DAGResult ExecutionEnvironmentRepo repos.ExecutionEnvironment + IntegrationRepo repos.Integration NotificationRepo repos.Notification OperatorRepo repos.Operator OperatorResultRepo repos.OperatorResult @@ -658,6 +659,7 @@ func (eng *aqEngine) EditWorkflow( workflowDescription string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, + notificationSettings *mdl_shared.NotificationSettings, ) error { changes := map[string]interface{}{} if workflowName != "" { @@ -672,6 +674,10 @@ func (eng *aqEngine) EditWorkflow( changes[models.WorkflowRetentionPolicy] = retentionPolicy } + if notificationSettings != nil { + changes[models.WorkflowNotificationSettings] = notificationSettings + } + if schedule.Trigger != "" { cronjobName := shared_utils.AppendPrefix(workflowID.String()) err := eng.updateWorkflowSchedule(ctx, workflowID, cronjobName, schedule) @@ -811,32 +817,77 @@ func (eng *aqEngine) executeWithEngine( timeConfig, opExecMode, databricksJobManager, + vaultObject, + eng.IntegrationRepo, + eng.Database, ) default: return eng.execute( - ctx, dag, workflowRunMetadata, timeConfig, + vaultObject, opExecMode, ) } } +func onFinishExecution( + ctx context.Context, + inProgressOps map[uuid.UUID]operator.Operator, + pollInterval time.Duration, + cleanupTimeout time.Duration, + curErr error, + notificationContent *notificationContentStruct, + dag dag_utils.WorkflowDag, + execMode operator.ExecutionMode, + vaultObject vault.Vault, + integrationRepo repos.Integration, + DB database.Database, +) { + // Wait a little bit for all active operators to finish before exiting on failure. + waitForInProgressOperators(ctx, inProgressOps, pollInterval, cleanupTimeout) + if curErr != nil && notificationContent == nil { + notificationContent = ¬ificationContentStruct{ + level: mdl_shared.ErrorNotificationLevel, + contextMsg: curErr.Error(), + } + } + + // Send notifications + if notificationContent != nil && execMode == operator.Publish { + err := sendNotifications( + ctx, + dag, + notificationContent, + vaultObject, + integrationRepo, + DB, + ) + if err != nil { + log.Errorf("Error sending notifications: %s", err) + } + } +} + func (eng *aqEngine) execute( ctx context.Context, workflowDag dag_utils.WorkflowDag, workflowRunMetadata *WorkflowRunMetadata, timeConfig *AqueductTimeConfig, + vaultObject vault.Vault, opExecMode operator.ExecutionMode, -) error { +) (err error) { // These are the operators of immediate interest. They either need to be scheduled or polled on. inProgressOps := workflowRunMetadata.InProgressOps completedOps := workflowRunMetadata.CompletedOps dag := workflowDag opToDependencyCount := workflowRunMetadata.OpToDependencyCount + var notificationContent *notificationContentStruct = nil + err = nil + // Kick off execution by starting all operators that don't have any inputs. for _, op := range dag.Operators() { if opToDependencyCount[op.ID()] == 0 { @@ -848,8 +899,21 @@ func (eng *aqEngine) execute( return errors.Newf("No initial operators to schedule.") } - // Wait a little bit for all active operators to finish before exiting on failure. - defer waitForInProgressOperators(ctx, inProgressOps, timeConfig.OperatorPollInterval, timeConfig.CleanupTimeout) + defer func() { + onFinishExecution( + ctx, + inProgressOps, + timeConfig.OperatorPollInterval, + timeConfig.CleanupTimeout, + err, + notificationContent, + workflowDag, + opExecMode, + vaultObject, + eng.IntegrationRepo, + eng.Database, + ) + }() start := time.Now() @@ -909,7 +973,27 @@ func (eng *aqEngine) execute( } } + notificationCtxMsg := "" + if execState.Error != nil { + notificationCtxMsg = fmt.Sprintf("%s\nContext:\n%s", execState.Error.Tip, execState.Error.Context) + } + + notificationContent = ¬ificationContentStruct{ + level: mdl_shared.ErrorNotificationLevel, + contextMsg: notificationCtxMsg, + } + return opFailureError(*execState.FailureType, op) + } else if execState.Status == shared.FailedExecutionStatus { + notificationCtxMsg := "" + if execState.Error != nil { + notificationCtxMsg = fmt.Sprintf("%s\nContext:\n%s", execState.Error.Tip, execState.Error.Context) + } + + notificationContent = ¬ificationContentStruct{ + level: mdl_shared.WarningNotificationLevel, + contextMsg: notificationCtxMsg, + } } // Add the operator to the completed stack, and remove it from the in-progress one. @@ -961,6 +1045,13 @@ func (eng *aqEngine) execute( return errors.Newf("Internal error: operator %s has a non-zero dep count %d.", opID, depCount) } } + + // avoid overriding an existing notification (in practice, this is a warning) + if notificationContent == nil { + notificationContent = ¬ificationContentStruct{ + level: mdl_shared.SuccessNotificationLevel, + } + } return nil } diff --git a/src/golang/lib/engine/databricks_executor.go b/src/golang/lib/engine/databricks_executor.go index 8caf8b02e..0c3f53585 100644 --- a/src/golang/lib/engine/databricks_executor.go +++ b/src/golang/lib/engine/databricks_executor.go @@ -6,7 +6,11 @@ import ( "time" "github.com/aqueducthq/aqueduct/lib/collections/shared" + "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/job" + mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared" + "github.com/aqueducthq/aqueduct/lib/repos" + "github.com/aqueducthq/aqueduct/lib/vault" dag_utils "github.com/aqueducthq/aqueduct/lib/workflow/dag" "github.com/aqueducthq/aqueduct/lib/workflow/operator" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -28,9 +32,14 @@ func ExecuteDatabricks( timeConfig *AqueductTimeConfig, opExecMode operator.ExecutionMode, databricksJobManager *job.DatabricksJobManager, -) error { + vaultObject vault.Vault, + integrationRepo repos.Integration, + DB database.Database, +) (err error) { inProgressOps := workflowRunMetadata.InProgressOps completedOps := workflowRunMetadata.CompletedOps + var notificationContent *notificationContentStruct = nil + err = nil // Convert the operators into tasks taskList, err := CreateTaskList(ctx, dag, workflowName, databricksJobManager) @@ -56,8 +65,21 @@ func ExecuteDatabricks( return errors.Newf("No initial operators to schedule.") } - // Wait a little bit for all active operators to finish before exiting on failure. - defer waitForInProgressOperators(ctx, inProgressOps, timeConfig.OperatorPollInterval, timeConfig.CleanupTimeout) + defer func() { + onFinishExecution( + ctx, + inProgressOps, + timeConfig.OperatorPollInterval, + timeConfig.CleanupTimeout, + err, + notificationContent, + dag, + opExecMode, + vaultObject, + integrationRepo, + DB, + ) + }() start := time.Now() var operatorError error @@ -81,10 +103,32 @@ func ExecuteDatabricks( return errors.Wrapf(err, "Error when finishing execution of operator %s", op.Name()) } } + // Capture the first failed operator. + if shouldStopExecution(execState) { + if operatorError == nil { + operatorError = opFailureError(*execState.FailureType, op) + } - if shouldStopExecution(execState) && operatorError == nil { - operatorError = opFailureError(*execState.FailureType, op) + notificationCtxMsg := "" + if execState.Error != nil { + notificationCtxMsg = fmt.Sprintf("%s\nContext:\n%s", execState.Error.Tip, execState.Error.Context) + } + + notificationContent = ¬ificationContentStruct{ + level: mdl_shared.ErrorNotificationLevel, + contextMsg: notificationCtxMsg, + } + } else if execState.Status == shared.FailedExecutionStatus { + notificationCtxMsg := "" + if execState.Error != nil { + notificationCtxMsg = fmt.Sprintf("%s\nContext:\n%s", execState.Error.Tip, execState.Error.Context) + } + + notificationContent = ¬ificationContentStruct{ + level: mdl_shared.WarningNotificationLevel, + contextMsg: notificationCtxMsg, + } } // Add the operator to the completed stack, and remove it from the in-progress one. @@ -100,9 +144,18 @@ func ExecuteDatabricks( if len(completedOps) != len(dag.Operators()) { return errors.Newf("Internal error: %d operators were provided but only %d completed.", len(dag.Operators()), len(completedOps)) } + if operatorError != nil { return operatorError } + + // avoid overriding an existing notification (in practice, this is a warning) + if notificationContent == nil { + notificationContent = ¬ificationContentStruct{ + level: mdl_shared.SuccessNotificationLevel, + } + } + return nil } diff --git a/src/golang/lib/engine/engine.go b/src/golang/lib/engine/engine.go index a562209fa..ed0d70d19 100644 --- a/src/golang/lib/engine/engine.go +++ b/src/golang/lib/engine/engine.go @@ -10,6 +10,7 @@ import ( "github.com/aqueducthq/aqueduct/lib/database" exec_env "github.com/aqueducthq/aqueduct/lib/execution_environment" "github.com/aqueducthq/aqueduct/lib/models" + mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" ) @@ -50,6 +51,7 @@ type Engine interface { workflowDescription string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, + notificationSettings *mdl_shared.NotificationSettings, ) error // TODO ENG-1444: Used as a wrapper to trigger a workflow via executor binary. diff --git a/src/golang/lib/engine/notification.go b/src/golang/lib/engine/notification.go new file mode 100644 index 000000000..69a051418 --- /dev/null +++ b/src/golang/lib/engine/notification.go @@ -0,0 +1,103 @@ +package engine + +import ( + "context" + "fmt" + + "github.com/aqueducthq/aqueduct/lib/database" + "github.com/aqueducthq/aqueduct/lib/models/shared" + mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared" + "github.com/aqueducthq/aqueduct/lib/notification" + "github.com/aqueducthq/aqueduct/lib/repos" + "github.com/aqueducthq/aqueduct/lib/vault" + "github.com/aqueducthq/aqueduct/lib/workflow/dag" +) + +type notificationContentStruct struct { + level mdl_shared.NotificationLevel + contextMsg string +} + +func getNotifications( + ctx context.Context, + wfDag dag.WorkflowDag, + vaultObject vault.Vault, + integrationRepo repos.Integration, + DB database.Database, +) ([]notification.Notification, error) { + return notification.GetNotificationsFromUser( + ctx, + wfDag.UserID(), + integrationRepo, + vaultObject, + DB, + ) +} + +func notificationMsg(dagName string, level shared.NotificationLevel, contextMsg string) string { + // Full message will look like "Workflow my_churn succeeded with warning: some context ." + statusMsg := "" + contextSuffix := "." + if len(contextMsg) > 0 { + contextSuffix = fmt.Sprintf(": %s .", contextMsg) + } + if level == shared.SuccessNotificationLevel { + statusMsg = fmt.Sprintf("succeeded%s", contextSuffix) + } else if level == shared.WarningNotificationLevel { + statusMsg = fmt.Sprintf("succeeded with warning%s", contextSuffix) + } else if level == shared.ErrorNotificationLevel { + statusMsg = fmt.Sprintf("failed%s", contextSuffix) + } else { + // For now, no caller will send message other than success, warning, or error. + // This line is in case of future use cases. + statusMsg = fmt.Sprintf("has a message: %s .", contextMsg) + } + + return fmt.Sprintf("Workflow %s %s", dagName, statusMsg) +} + +func sendNotifications( + ctx context.Context, + wfDag dag.WorkflowDag, + content *notificationContentStruct, + vaultObject vault.Vault, + integrationRepo repos.Integration, + DB database.Database, +) error { + if content == nil { + return nil + } + + notifications, err := getNotifications(ctx, wfDag, vaultObject, integrationRepo, DB) + if err != nil { + return err + } + + msg := notificationMsg(wfDag.Name(), content.level, content.contextMsg) + workflowSettings := wfDag.NotificationSettings().Settings + for _, notificationObj := range notifications { + if len(workflowSettings) > 0 { + // send based on settings + thresholdLevel, ok := workflowSettings[notificationObj.ID()] + if ok { + if notification.ShouldSend(thresholdLevel, content.level) { + err = notificationObj.Send(ctx, msg) + if err != nil { + return err + } + } + } + } else { + // Otherwise we send based on global settings. + // ENG-2341 will allow user to configure if a notification applies to all workflows. + if notification.ShouldSend(notificationObj.Level(), content.level) { + err = notificationObj.Send(ctx, msg) + if err != nil { + return err + } + } + } + } + + return nil +} diff --git a/src/golang/lib/engine/notification_test.go b/src/golang/lib/engine/notification_test.go new file mode 100644 index 000000000..219882a7d --- /dev/null +++ b/src/golang/lib/engine/notification_test.go @@ -0,0 +1,44 @@ +// Note about package and file structure for testings: +// https://medium.com/@butterv/go-how-to-implement-tests-of-private-methods-e34d1cc2bc31 +package engine + +import ( + "testing" + + "github.com/aqueducthq/aqueduct/lib/models/shared" + "github.com/stretchr/testify/require" +) + +func TestGetNotification(t *testing.T) { + expectedMsg := "Workflow example_dag succeeded." + actualMsg := notificationMsg("example_dag", shared.SuccessNotificationLevel, "") + require.Equal(t, expectedMsg, actualMsg) + + expectedMsg = "Workflow example_dag succeeded: example_ctx ." + actualMsg = notificationMsg("example_dag", shared.SuccessNotificationLevel, "example_ctx") + require.Equal(t, expectedMsg, actualMsg) + + expectedMsg = "Workflow example_dag succeeded with warning." + actualMsg = notificationMsg("example_dag", shared.WarningNotificationLevel, "") + require.Equal(t, expectedMsg, actualMsg) + + expectedMsg = "Workflow example_dag succeeded with warning: example_ctx ." + actualMsg = notificationMsg("example_dag", shared.WarningNotificationLevel, "example_ctx") + require.Equal(t, expectedMsg, actualMsg) + + expectedMsg = "Workflow example_dag failed." + actualMsg = notificationMsg("example_dag", shared.ErrorNotificationLevel, "") + require.Equal(t, expectedMsg, actualMsg) + + expectedMsg = "Workflow example_dag failed: example_ctx ." + actualMsg = notificationMsg("example_dag", shared.ErrorNotificationLevel, "example_ctx") + require.Equal(t, expectedMsg, actualMsg) + + expectedMsg = "Workflow example_dag has a message: example_ctx ." + actualMsg = notificationMsg("example_dag", shared.NeutralNotificationLevel, "example_ctx") + require.Equal(t, expectedMsg, actualMsg) + + expectedMsg = "Workflow example_dag has a message: example_ctx ." + actualMsg = notificationMsg("example_dag", shared.InfoNotificationLevel, "example_ctx") + require.Equal(t, expectedMsg, actualMsg) +} diff --git a/src/golang/lib/models/shared/workflow.go b/src/golang/lib/models/shared/workflow.go index 25b1ab754..7a0fcd53c 100644 --- a/src/golang/lib/models/shared/workflow.go +++ b/src/golang/lib/models/shared/workflow.go @@ -8,40 +8,20 @@ import ( ) // `NotificationSettings` maps IntegrationID to NotificationLevel -type NotificationSettings map[uuid.UUID]NotificationLevel +// This has to be a struct since sql driver does not support map type. +type NotificationSettings struct { + Settings map[uuid.UUID]NotificationLevel `json:"settings"` +} func (s *NotificationSettings) Value() (driver.Value, error) { return utils.ValueJSONB(*s) } func (s *NotificationSettings) Scan(value interface{}) error { - return utils.ScanJSONB(value, s) -} - -type NullNotificationSettings struct { - NotificationSettings - IsNull bool -} - -func (n *NullNotificationSettings) Value() (driver.Value, error) { - if n.IsNull { - return nil, nil - } - - return (&n.NotificationSettings).Value() -} - -func (n *NullNotificationSettings) Scan(value interface{}) error { if value == nil { - n.IsNull = true + s.Settings = nil return nil } - s := &NotificationSettings{} - if err := s.Scan(value); err != nil { - return err - } - - n.NotificationSettings, n.IsNull = *s, false - return nil + return utils.ScanJSONB(value, s) } diff --git a/src/golang/lib/models/workflow.go b/src/golang/lib/models/workflow.go index 6d39f9d0f..fe26b8c79 100644 --- a/src/golang/lib/models/workflow.go +++ b/src/golang/lib/models/workflow.go @@ -26,14 +26,14 @@ const ( // A Workflow maps to the workflow table. type Workflow struct { - ID uuid.UUID `db:"id" json:"id"` - UserID uuid.UUID `db:"user_id" json:"user_id"` - Name string `db:"name" json:"name"` - Description string `db:"description" json:"description"` - Schedule workflow.Schedule `db:"schedule" json:"schedule"` - CreatedAt time.Time `db:"created_at" json:"created_at"` - RetentionPolicy workflow.RetentionPolicy `db:"retention_policy" json:"retention_policy"` - NotificationSettings shared.NullNotificationSettings `db:"notification_settings" json:"notification_settings"` + ID uuid.UUID `db:"id" json:"id"` + UserID uuid.UUID `db:"user_id" json:"user_id"` + Name string `db:"name" json:"name"` + Description string `db:"description" json:"description"` + Schedule workflow.Schedule `db:"schedule" json:"schedule"` + CreatedAt time.Time `db:"created_at" json:"created_at"` + RetentionPolicy workflow.RetentionPolicy `db:"retention_policy" json:"retention_policy"` + NotificationSettings shared.NotificationSettings `db:"notification_settings" json:"notification_settings"` } // WorkflowCols returns a comma-separated string of all Workflow columns. diff --git a/src/golang/lib/notification/email.go b/src/golang/lib/notification/email.go index bbbc8389b..990a8b879 100644 --- a/src/golang/lib/notification/email.go +++ b/src/golang/lib/notification/email.go @@ -1,23 +1,58 @@ package notification import ( + "context" "crypto/tls" + "fmt" "net/smtp" + "strings" + "github.com/aqueducthq/aqueduct/lib/models" "github.com/aqueducthq/aqueduct/lib/models/shared" + "github.com/google/uuid" ) type EmailNotification struct { - conf *shared.EmailConfig + integration *models.Integration + conf *shared.EmailConfig } -func newEmailNotification(conf *shared.EmailConfig) *EmailNotification { - return &EmailNotification{conf: conf} +func newEmailNotification(integration *models.Integration, conf *shared.EmailConfig) *EmailNotification { + return &EmailNotification{integration: integration, conf: conf} } -func (e *EmailNotification) Send(msg string, level shared.NotificationLevel) error { - // TODO: Implement - return nil +func (e *EmailNotification) ID() uuid.UUID { + return e.integration.ID +} + +func (e *EmailNotification) Level() shared.NotificationLevel { + return e.conf.Level +} + +func fullMessage(subject string, from string, targets []string, body string) string { + fullMsg := fmt.Sprintf("From: %s\n", from) + fullMsg += fmt.Sprintf("To: %s\n", strings.Join(targets, ",")) + fullMsg += fmt.Sprintf("Subject: %s\n\n", subject) + fullMsg += body + return fullMsg +} + +func (e *EmailNotification) Send(ctx context.Context, msg string) error { + auth := smtp.PlainAuth( + "", // identity + e.conf.User, + e.conf.Password, + e.conf.Host, + ) + + fullMsg := fullMessage("aqueduct notification", e.conf.User, e.conf.Targets, msg) + return smtp.SendMail( + e.conf.FullHost(), + auth, + e.conf.User, + e.conf.Targets, + []byte(fullMsg), + ) } func AuthenticateEmail(conf *shared.EmailConfig) error { diff --git a/src/golang/lib/notification/notification.go b/src/golang/lib/notification/notification.go index a2503878c..2e1993d11 100644 --- a/src/golang/lib/notification/notification.go +++ b/src/golang/lib/notification/notification.go @@ -4,25 +4,70 @@ import ( "context" "github.com/aqueducthq/aqueduct/lib/collections/integration" + "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/lib_utils" "github.com/aqueducthq/aqueduct/lib/models" "github.com/aqueducthq/aqueduct/lib/models/shared" + "github.com/aqueducthq/aqueduct/lib/repos" "github.com/aqueducthq/aqueduct/lib/vault" "github.com/aqueducthq/aqueduct/lib/workflow/operator/connector/auth" "github.com/dropbox/godropbox/errors" + "github.com/google/uuid" ) var ErrIntegrationTypeIsNotNotification = errors.New("Integration type is not a notification.") type Notification interface { - // `Send()` sends a notification with `level`, and the content is `msg`. + // `ID()` is the unique identifier, typically mapped to the integration ID. + ID() uuid.UUID + + // `Level()` is the global default severity level threshold beyond which a notification should send. + // For example, 'warning' threshold allows 'error' and 'warning' level notifications, + // but blocking 'success' notifications. // - // The caller always call `Send()` when a notification is generated. - // There could be a level preference associated with the notification integration. - // For example, slack and email has `level` field in config, - // and only notifications beyond this level will be sent. - // In such cases, the implementation of `Send()` should reflect the level preference. - Send(msg string, level shared.NotificationLevel) error + // This behavior is controlled by caller calling `ShouldSend()` function. + // This field is a 'global default' as we allow overriding this behavior in, + // for example, workflow specific settings. + Level() shared.NotificationLevel + + // `Send()` sends a notification. + // The caller should decide, based on `Level()` and any other context, if `Send()` + // should be called. + Send(ctx context.Context, msg string) error +} + +func GetNotificationsFromUser( + ctx context.Context, + userID uuid.UUID, + integrationRepo repos.Integration, + vaultObject vault.Vault, + DB database.Database, +) ([]Notification, error) { + emailIntegrations, err := integrationRepo.GetByServiceAndUser(ctx, integration.Email, userID, DB) + if err != nil { + return nil, err + } + + slackIntegrations, err := integrationRepo.GetByServiceAndUser(ctx, integration.Slack, userID, DB) + if err != nil { + return nil, err + } + + allIntegrations := make([]models.Integration, 0, len(emailIntegrations)+len(slackIntegrations)) + allIntegrations = append(allIntegrations, emailIntegrations...) + allIntegrations = append(allIntegrations, slackIntegrations...) + notifications := make([]Notification, 0, len(allIntegrations)) + for _, integrationObj := range allIntegrations { + integrationCopied := integrationObj + notification, err := NewNotificationFromIntegration(ctx, &integrationCopied, vaultObject) + if err != nil { + return nil, err + } + + notifications = append(notifications, notification) + } + + return notifications, nil } func NewNotificationFromIntegration( @@ -41,7 +86,7 @@ func NewNotificationFromIntegration( return nil, err } - return newEmailNotification(emailConf), nil + return newEmailNotification(integrationObject, emailConf), nil } if integrationObject.Service == integration.Slack { @@ -55,8 +100,32 @@ func NewNotificationFromIntegration( return nil, err } - return newSlackNotification(slackConf), nil + return newSlackNotification(integrationObject, slackConf), nil } return nil, ErrIntegrationTypeIsNotNotification } + +// `ShouldSend` determines if a notification at 'level' passes configuration +// specified by `thresholdLevel`. +// 'info' and 'neutral' will get through regardless of threshold. +// And 'info' or 'neutral' threshold lets everything through. +// Other states will follow the severity ordering. +func ShouldSend( + thresholdLevel shared.NotificationLevel, + level shared.NotificationLevel, +) bool { + if thresholdLevel == shared.InfoNotificationLevel || thresholdLevel == shared.NeutralNotificationLevel { + return true + } + + levelSeverityMap := map[shared.NotificationLevel]int{ + shared.SuccessNotificationLevel: 0, + shared.WarningNotificationLevel: 1, + shared.ErrorNotificationLevel: 2, + shared.InfoNotificationLevel: 3, + shared.NeutralNotificationLevel: 3, + } + + return levelSeverityMap[level] >= levelSeverityMap[thresholdLevel] +} diff --git a/src/golang/lib/notification/notification_test.go b/src/golang/lib/notification/notification_test.go new file mode 100644 index 000000000..1e6658e3f --- /dev/null +++ b/src/golang/lib/notification/notification_test.go @@ -0,0 +1,36 @@ +package notification + +import ( + "testing" + + "github.com/aqueducthq/aqueduct/lib/models/shared" + "github.com/stretchr/testify/require" +) + +func TestShouldSend(t *testing.T) { + severities := []shared.NotificationLevel{ + shared.SuccessNotificationLevel, + shared.WarningNotificationLevel, + shared.ErrorNotificationLevel, + shared.InfoNotificationLevel, + shared.NeutralNotificationLevel, + } + + for _, severity := range severities { + require.Equal(t, true, ShouldSend(severity, severity)) + + // success, info and neutral thresholds always let through regardless of level + require.Equal(t, true, ShouldSend(shared.InfoNotificationLevel, severity)) + require.Equal(t, true, ShouldSend(shared.NeutralNotificationLevel, severity)) + require.Equal(t, true, ShouldSend(shared.SuccessNotificationLevel, severity)) + + // info and neutral always get through regardless of threshold + require.Equal(t, true, ShouldSend(severity, shared.InfoNotificationLevel)) + require.Equal(t, true, ShouldSend(severity, shared.NeutralNotificationLevel)) + } + + require.Equal(t, false, ShouldSend(shared.ErrorNotificationLevel, shared.WarningNotificationLevel)) + require.Equal(t, false, ShouldSend(shared.ErrorNotificationLevel, shared.SuccessNotificationLevel)) + require.Equal(t, true, ShouldSend(shared.WarningNotificationLevel, shared.ErrorNotificationLevel)) + require.Equal(t, false, ShouldSend(shared.WarningNotificationLevel, shared.SuccessNotificationLevel)) +} diff --git a/src/golang/lib/notification/slack.go b/src/golang/lib/notification/slack.go index 814ac66c3..5c6294ee3 100644 --- a/src/golang/lib/notification/slack.go +++ b/src/golang/lib/notification/slack.go @@ -1,25 +1,111 @@ package notification import ( + "context" + + "github.com/aqueducthq/aqueduct/lib/models" "github.com/aqueducthq/aqueduct/lib/models/shared" + "github.com/dropbox/godropbox/errors" + "github.com/google/uuid" "github.com/slack-go/slack" ) +const maxChannelLimit = 2000 + type SlackNotification struct { - conf *shared.SlackConfig + integration *models.Integration + conf *shared.SlackConfig +} + +func newSlackNotification(integration *models.Integration, conf *shared.SlackConfig) *SlackNotification { + return &SlackNotification{integration: integration, conf: conf} +} + +func (s *SlackNotification) ID() uuid.UUID { + return s.integration.ID +} + +func (s *SlackNotification) Level() shared.NotificationLevel { + return s.conf.Level } -func newSlackNotification(conf *shared.SlackConfig) *SlackNotification { - return &SlackNotification{conf: conf} +// reference: https://stackoverflow.com/questions/50106263/slack-api-to-find-existing-channel +// We have to use list channel API together with a linear search. +func findChannels(client *slack.Client, names []string) ([]slack.Channel, error) { + channels, _ /* cursor */, err := client.GetConversations(&slack.GetConversationsParameters{ + ExcludeArchived: true, + Limit: maxChannelLimit, + }) + if err != nil { + return nil, err + } + + namesSet := make(map[string]bool, len(names)) + namesTaken := make(map[string]bool, len(names)) + for _, name := range names { + namesSet[name] = true + } + + // Slack channel names should be unique. We will still send notifications to all + // channels matching the given name. + results := make([]slack.Channel, 0, len(names)) + for _, channel := range channels { + _, ok := namesSet[channel.Name] + if ok { + results = append(results, channel) + namesTaken[channel.Name] = true + } + } + + if len(namesTaken) != len(namesSet) { + for name := range namesSet { + _, ok := namesTaken[name] + if !ok { + return nil, errors.Newf("Channel %s does not exist.", name) + } + } + } + + return results, nil } -func (e *SlackNotification) Send(msg string, level shared.NotificationLevel) error { - // TODO: Implement +func (s *SlackNotification) Send(ctx context.Context, msg string) error { + client := slack.New(s.conf.Token) + channels, err := findChannels(client, s.conf.Channels) + if err != nil { + return err + } + + for _, channel := range channels { + // reference: https://medium.com/@gausha/a-simple-slackbot-with-golang-c5a932d719c7 + _, _, _, err = client.SendMessage(channel.ID, slack.MsgOptionBlocks( + slack.NewSectionBlock( + slack.NewTextBlockObject( + "plain_text", + msg, + false, /* emoji */ + false, /* verbatim */ + ), + nil, + nil, + ), + )) + + if err != nil { + return err + } + } + return nil } func AuthenticateSlack(conf *shared.SlackConfig) error { client := slack.New(conf.Token) _, err := client.AuthTest() + if err != nil { + return err + } + + _, err = findChannels(client, conf.Channels) return err } diff --git a/src/golang/lib/repos/sqlite/workflow.go b/src/golang/lib/repos/sqlite/workflow.go index ffa34c8b8..62f6b0b33 100644 --- a/src/golang/lib/repos/sqlite/workflow.go +++ b/src/golang/lib/repos/sqlite/workflow.go @@ -10,6 +10,7 @@ import ( "github.com/aqueducthq/aqueduct/lib/collections/workflow" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/models" + mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared" "github.com/aqueducthq/aqueduct/lib/models/views" "github.com/aqueducthq/aqueduct/lib/repos" "github.com/dropbox/godropbox/errors" @@ -253,6 +254,7 @@ func (*workflowWriter) Create( description string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, + notificationSettings *mdl_shared.NotificationSettings, DB database.Database, ) (*models.Workflow, error) { cols := []string{ @@ -263,6 +265,7 @@ func (*workflowWriter) Create( models.WorkflowSchedule, models.WorkflowCreatedAt, models.WorkflowRetentionPolicy, + models.WorkflowNotificationSettings, } query := DB.PrepareInsertWithReturnAllStmt(models.WorkflowTable, cols, models.WorkflowCols()) @@ -271,7 +274,7 @@ func (*workflowWriter) Create( return nil, err } - args := []interface{}{ID, userID, name, description, schedule, time.Now(), retentionPolicy} + args := []interface{}{ID, userID, name, description, schedule, time.Now(), retentionPolicy, notificationSettings} return getWorkflow(ctx, DB, query, args...) } diff --git a/src/golang/lib/repos/tests/seed.go b/src/golang/lib/repos/tests/seed.go index 7b0fa90fb..967683dd5 100644 --- a/src/golang/lib/repos/tests/seed.go +++ b/src/golang/lib/repos/tests/seed.go @@ -187,6 +187,7 @@ func (ts *TestSuite) seedWorkflowWithUser(count int, userIDs []uuid.UUID) []mode description, schedule, retentionPolicy, + &shared.NotificationSettings{}, ts.DB, ) require.Nil(ts.T(), err) diff --git a/src/golang/lib/repos/tests/workflow.go b/src/golang/lib/repos/tests/workflow.go index d5e11e328..46f8190ab 100644 --- a/src/golang/lib/repos/tests/workflow.go +++ b/src/golang/lib/repos/tests/workflow.go @@ -60,6 +60,7 @@ func (ts *TestSuite) TestWorkflow_GetByScheduleTrigger() { &workflow.RetentionPolicy{ KLatestRuns: 5, }, + &shared.NotificationSettings{}, ts.DB, ) require.Nil(ts.T(), err) @@ -91,6 +92,7 @@ func (ts *TestSuite) TestWorkflow_GetTargets() { &workflow.RetentionPolicy{ KLatestRuns: 5, }, + &shared.NotificationSettings{}, ts.DB, ) require.Nil(ts.T(), err) @@ -130,6 +132,7 @@ func (ts *TestSuite) TestWorkflow_ValidateOrg() { func (ts *TestSuite) TestWorkflow_Create() { users := ts.seedUser(1) user := users[0] + notificationIntegrationID := uuid.New() expectedWorkflow := &models.Workflow{ UserID: user.ID, @@ -144,8 +147,11 @@ func (ts *TestSuite) TestWorkflow_Create() { RetentionPolicy: workflow.RetentionPolicy{ KLatestRuns: 10, }, - // Explicitly set `IsNull` to true, otherwise it will be false by default. - NotificationSettings: shared.NullNotificationSettings{IsNull: true}, + NotificationSettings: shared.NotificationSettings{ + Settings: map[uuid.UUID]shared.NotificationLevel{ + notificationIntegrationID: shared.ErrorNotificationLevel, + }, + }, } actualWorkflow, err := ts.workflow.Create( @@ -155,6 +161,7 @@ func (ts *TestSuite) TestWorkflow_Create() { expectedWorkflow.Description, &expectedWorkflow.Schedule, &expectedWorkflow.RetentionPolicy, + &expectedWorkflow.NotificationSettings, ts.DB, ) require.Nil(ts.T(), err) @@ -177,6 +184,7 @@ func (ts *TestSuite) TestWorkflow_Delete() { func (ts *TestSuite) TestWorkflow_Update() { workflows := ts.seedWorkflow(1) oldWorkflow := workflows[0] + notificationIntegrationID := uuid.New() newName := "new_workflow_name" newSchedule := workflow.Schedule{ @@ -186,9 +194,16 @@ func (ts *TestSuite) TestWorkflow_Update() { Paused: true, } + newNotificationSettings := shared.NotificationSettings{ + Settings: map[uuid.UUID]shared.NotificationLevel{ + notificationIntegrationID: shared.ErrorNotificationLevel, + }, + } + changes := map[string]interface{}{ - models.WorkflowName: newName, - models.WorkflowSchedule: &newSchedule, + models.WorkflowName: newName, + models.WorkflowSchedule: &newSchedule, + models.WorkflowNotificationSettings: &newNotificationSettings, } newWorkflow, err := ts.workflow.Update(ts.ctx, oldWorkflow.ID, changes, ts.DB) @@ -196,4 +211,5 @@ func (ts *TestSuite) TestWorkflow_Update() { requireDeepEqual(ts.T(), newSchedule, newWorkflow.Schedule) require.Equal(ts.T(), newName, newWorkflow.Name) + requireDeepEqual(ts.T(), newWorkflow.NotificationSettings, newNotificationSettings) } diff --git a/src/golang/lib/repos/workflow.go b/src/golang/lib/repos/workflow.go index 4c3b87f6c..0deaa1c30 100644 --- a/src/golang/lib/repos/workflow.go +++ b/src/golang/lib/repos/workflow.go @@ -7,6 +7,7 @@ import ( "github.com/aqueducthq/aqueduct/lib/collections/workflow" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/models" + mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared" "github.com/aqueducthq/aqueduct/lib/models/views" "github.com/google/uuid" ) @@ -63,6 +64,7 @@ type workflowWriter interface { description string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, + notificationSettings *mdl_shared.NotificationSettings, DB database.Database, ) (*models.Workflow, error) diff --git a/src/golang/lib/workflow/dag/workflow_dag.go b/src/golang/lib/workflow/dag/workflow_dag.go index b12f6a5e5..4328d8dab 100644 --- a/src/golang/lib/workflow/dag/workflow_dag.go +++ b/src/golang/lib/workflow/dag/workflow_dag.go @@ -7,6 +7,7 @@ import ( "github.com/aqueducthq/aqueduct/lib/database" exec_env "github.com/aqueducthq/aqueduct/lib/execution_environment" "github.com/aqueducthq/aqueduct/lib/models" + "github.com/aqueducthq/aqueduct/lib/models/shared" "github.com/aqueducthq/aqueduct/lib/repos" "github.com/aqueducthq/aqueduct/lib/vault" "github.com/aqueducthq/aqueduct/lib/workflow/artifact" @@ -18,6 +19,10 @@ import ( ) type WorkflowDag interface { + UserID() uuid.UUID + Name() string + NotificationSettings() shared.NotificationSettings + Operators() map[uuid.UUID]operator.Operator Artifacts() map[uuid.UUID]artifact.Artifact @@ -68,6 +73,30 @@ type workflowDagImpl struct { artifactToInputOp map[uuid.UUID]uuid.UUID } +func (dag *workflowDagImpl) UserID() uuid.UUID { + if dag.dbDAG.Metadata == nil { + return uuid.UUID{} + } + + return dag.dbDAG.Metadata.UserID +} + +func (dag *workflowDagImpl) Name() string { + if dag.dbDAG.Metadata == nil { + return "" + } + + return dag.dbDAG.Metadata.Name +} + +func (dag *workflowDagImpl) NotificationSettings() shared.NotificationSettings { + if dag.dbDAG.Metadata == nil { + return shared.NotificationSettings{} + } + + return dag.dbDAG.Metadata.NotificationSettings +} + // Assumption: all dag's start with operators. // computeArtifactSignatures traverses over the entire dag structure from beginning to end, // computing the signatures for each artifact. These signatures are returned in a map keyed diff --git a/src/golang/lib/workflow/utils/database.go b/src/golang/lib/workflow/utils/database.go index 578554328..55ea416d0 100644 --- a/src/golang/lib/workflow/utils/database.go +++ b/src/golang/lib/workflow/utils/database.go @@ -41,6 +41,7 @@ func WriteDAGToDatabase( dag.Metadata.Description, &dag.Metadata.Schedule, &dag.Metadata.RetentionPolicy, + &dag.Metadata.NotificationSettings, DB, ) if err != nil { diff --git a/src/ui/common/src/components/integrations/cards/slackCard.tsx b/src/ui/common/src/components/integrations/cards/slackCard.tsx index 299f35ead..df6380e8d 100644 --- a/src/ui/common/src/components/integrations/cards/slackCard.tsx +++ b/src/ui/common/src/components/integrations/cards/slackCard.tsx @@ -23,7 +23,7 @@ export const SlackCard: React.FC = ({ integration }) => { Level: - {config.level.toUpperCase()} + {config.level[0].toUpperCase() + config.level.slice(1)} ); diff --git a/src/ui/common/src/components/workflows/WorkflowNotificationSettings.tsx b/src/ui/common/src/components/workflows/WorkflowNotificationSettings.tsx new file mode 100644 index 000000000..4b1ee6541 --- /dev/null +++ b/src/ui/common/src/components/workflows/WorkflowNotificationSettings.tsx @@ -0,0 +1,124 @@ +import { faPlusSquare, faTrash } from '@fortawesome/free-solid-svg-icons'; +import { FontAwesomeIcon } from '@fortawesome/react-fontawesome'; +import { Box, MenuItem, Select, Typography } from '@mui/material'; +import React from 'react'; + +import { theme } from '../../styles/theme/theme'; +import { Integration } from '../../utils/integrations'; +import { NotificationLogLevel } from '../../utils/notifications'; +import { NotificationSettingsMap } from '../../utils/workflows'; +import NotificationLevelSelector from '../notifications/NotificationLevelSelector'; + +type SelectedNotificationEntryProps = { + remainingNotificationIntegrations: Integration[]; + selected: Integration; + level: NotificationLogLevel | undefined; + onSelect: ( + id: string, + level: NotificationLogLevel | undefined, + replacingID?: string + ) => void; + onRemove: (id: string) => void; +}; + +type Props = { + notificationIntegrations: Integration[]; + curSettingsMap: NotificationSettingsMap; + onSelect: ( + id: string, + level?: NotificationLogLevel, + replacingID?: string + ) => void; + onRemove: (id: string) => void; +}; + +export const SelectedNotificationEntry: React.FC< + SelectedNotificationEntryProps +> = ({ + remainingNotificationIntegrations, + selected, + level, + onSelect, + onRemove, +}) => { + return ( + + + + + onRemove(selected.id)} + /> + + + + onSelect(selected.id, level)} + /> + + + ); +}; + +const WorkflowNotificationSettings: React.FC = ({ + notificationIntegrations, + curSettingsMap, + onSelect, + onRemove, +}) => { + const selectedIDs = Object.keys(curSettingsMap); + const remainingIntegrations = notificationIntegrations.filter( + (x) => !selectedIDs.includes(x.id) + ); + const integrationsByID: { [id: string]: Integration } = {}; + notificationIntegrations.forEach((x) => (integrationsByID[x.id] = x)); + + const selectedEntries = Object.entries(curSettingsMap).map(([id, level]) => ( + + + + )); + + return ( + + {selectedEntries} + {remainingIntegrations.length > 0 && ( + 0 ? 2 : 1}> + onSelect(remainingIntegrations[0].id, undefined)} + /> + + )} + + ); +}; + +export default WorkflowNotificationSettings; diff --git a/src/ui/common/src/components/workflows/WorkflowSettings.tsx b/src/ui/common/src/components/workflows/WorkflowSettings.tsx index 65410eb6b..451bf93ae 100644 --- a/src/ui/common/src/components/workflows/WorkflowSettings.tsx +++ b/src/ui/common/src/components/workflows/WorkflowSettings.tsx @@ -51,10 +51,15 @@ import { getNextUpdateTime, PeriodUnit, } from '../../utils/cron'; +import { + IntegrationCategories, + SupportedIntegrations, +} from '../../utils/integrations'; import { UpdateMode } from '../../utils/operators'; import ExecutionStatus, { LoadingStatusEnum } from '../../utils/shared'; import { getSavedObjectIdentifier, + NotificationSettingsMap, RetentionPolicy, SavedObject, WorkflowDag, @@ -65,6 +70,7 @@ import { Button } from '../primitives/Button.styles'; import { LoadingButton } from '../primitives/LoadingButton.styles'; import StorageSelector from './storageSelector'; import TriggerSourceSelector from './triggerSourceSelector'; +import WorkflowNotificationSettings from './WorkflowNotificationSettings'; type PeriodicScheduleSelectorProps = { cronString: string; @@ -226,6 +232,28 @@ type WorkflowSettingsProps = { onClose: () => void; }; +// Returns whether `updated` is different from `existing`. +function IsNotificationSettingsMapUpdated( + curSettingsMap: NotificationSettingsMap, + newSettingsMap: NotificationSettingsMap +): boolean { + // Starting here, both `curSettings` and `newSettings` should be non-empty. + if ( + Object.keys(curSettingsMap).length !== Object.keys(newSettingsMap).length + ) { + return true; + } + + // both should have the same key size. Check k-v match + let updated = false; + Object.entries(curSettingsMap).forEach(([k, v]) => { + if (newSettingsMap[k] !== v) { + updated = true; + } + }); + return updated; +} + const WorkflowSettings: React.FC = ({ user, workflowDag, @@ -266,6 +294,16 @@ const WorkflowSettings: React.FC = ({ (state: RootState) => state.listWorkflowReducer.workflows ); + const integrations = useSelector( + (state: RootState) => state.integrationsReducer.integrations + ); + + const notificationIntegrations = Object.values(integrations).filter( + (x) => + SupportedIntegrations[x.service].category === + IntegrationCategories.NOTIFICATION + ); + const [name, setName] = useState(workflowDag.metadata?.name); const [description, setDescription] = useState( workflowDag.metadata?.description @@ -283,10 +321,15 @@ const WorkflowSettings: React.FC = ({ const [retentionPolicy, setRetentionPolicy] = useState( workflowDag.metadata?.retention_policy ); - const retentionPolicyUpdated = - retentionPolicy.k_latest_runs !== - workflowDag.metadata?.retention_policy?.k_latest_runs; + const [notificationSettingsMap, setNotificationSettingsMap] = + useState( + workflowDag.metadata?.notification_settings?.settings ?? {} + ); + // filter out empty key / values + const normalizedNotificationSettingsMap = Object.fromEntries( + Object.entries(notificationSettingsMap).filter(([k, v]) => !!k && !!v) + ); const initialSettings = { name: workflowDag.metadata?.name, description: workflowDag.metadata?.description, @@ -295,8 +338,19 @@ const WorkflowSettings: React.FC = ({ paused: workflowDag.metadata.schedule.paused, retentionPolicy: workflowDag.metadata?.retention_policy, sourceId: workflowDag.metadata?.schedule?.source_id, + notificationSettingsMap: + workflowDag.metadata?.notification_settings?.settings ?? {}, }; + const retentionPolicyUpdated = + retentionPolicy.k_latest_runs !== + workflowDag.metadata?.retention_policy?.k_latest_runs; + + const isNotificationSettingsUpdated = IsNotificationSettingsMapUpdated( + initialSettings.notificationSettingsMap, + normalizedNotificationSettingsMap + ); + const settingsChanged = name !== workflowDag.metadata?.name || // The workflow name has been changed. description !== workflowDag.metadata?.description || // The workflow description has changed. @@ -306,7 +360,8 @@ const WorkflowSettings: React.FC = ({ (triggerType === WorkflowUpdateTrigger.Cascade && // The trigger type is still cascade but the source has changed. sourceId !== workflowDag.metadata?.schedule?.source_id) || paused !== workflowDag.metadata.schedule.paused || // The schedule type is periodic and we've changed the pausedness of the workflow. - retentionPolicyUpdated; // retention policy has changed. + retentionPolicyUpdated || + isNotificationSettingsUpdated; // retention policy has changed. const triggerOptions = [ { label: 'Update Manually', value: WorkflowUpdateTrigger.Manual }, @@ -471,6 +526,9 @@ const WorkflowSettings: React.FC = ({ : '00000000-0000-0000-0000-000000000000', }, retention_policy: retentionPolicyUpdated ? retentionPolicy : undefined, + notification_settings: isNotificationSettingsUpdated + ? { settings: normalizedNotificationSettingsMap } + : undefined, }; fetch(`${apiAddress}/api/workflow/${workflowDag.workflow_id}/edit`, { @@ -911,6 +969,33 @@ const WorkflowSettings: React.FC = ({ + {notificationIntegrations.length > 0 && ( + + + Notifications + + + { + const newSettings = { ...notificationSettingsMap }; + newSettings[id] = level; + if (replacingID) { + delete newSettings[replacingID]; + } + + setNotificationSettingsMap(newSettings); + }} + onRemove={(id) => { + const newSettings = { ...notificationSettingsMap }; + delete newSettings[id]; + setNotificationSettingsMap(newSettings); + }} + /> + + )} +