From b636c9c9f01df9a0ba9bfbc42f7ba3bcb2faef31 Mon Sep 17 00:00:00 2001 From: likawind Date: Tue, 31 Jan 2023 02:24:29 +0000 Subject: [PATCH 1/4] compiles and unit tests --- .../cmd/server/handler/edit_workflow.go | 40 ++++++++++++------- .../cmd/server/handler/register_workflow.go | 2 + src/golang/lib/engine/aq_engine.go | 6 +++ src/golang/lib/engine/engine.go | 6 +++ src/golang/lib/models/shared/workflow.go | 28 ------------- src/golang/lib/models/workflow.go | 16 ++++---- src/golang/lib/repos/sqlite/workflow.go | 5 ++- src/golang/lib/repos/tests/seed.go | 1 + src/golang/lib/repos/tests/workflow.go | 20 ++++++++-- src/golang/lib/repos/workflow.go | 2 + src/golang/lib/workflow/dag/workflow_dag.go | 2 +- src/golang/lib/workflow/utils/database.go | 1 + 12 files changed, 73 insertions(+), 56 deletions(-) diff --git a/src/golang/cmd/server/handler/edit_workflow.go b/src/golang/cmd/server/handler/edit_workflow.go index 3bf0d585c..42b11f43f 100644 --- a/src/golang/cmd/server/handler/edit_workflow.go +++ b/src/golang/cmd/server/handler/edit_workflow.go @@ -10,6 +10,7 @@ import ( aq_context "github.com/aqueducthq/aqueduct/lib/context" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/engine" + "github.com/aqueducthq/aqueduct/lib/models/shared" "github.com/aqueducthq/aqueduct/lib/repos" "github.com/aqueducthq/aqueduct/lib/workflow" "github.com/aqueducthq/aqueduct/lib/workflow/utils" @@ -43,18 +44,25 @@ type EditWorkflowHandler struct { } type editWorkflowInput struct { - WorkflowName string `json:"name"` - WorkflowDescription string `json:"description"` - Schedule *col_workflow.Schedule `json:"schedule"` - RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"` + WorkflowName string `json:"name"` + WorkflowDescription string `json:"description"` + Schedule *col_workflow.Schedule `json:"schedule"` + RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"` + NotificationSettings shared.NotificationSettings `json:"notification_settings"` + // We use an explicit flag to specify if `NotificationSettings` should be updated + // to cover the case where this field is updated to `nil`. + // Overall, we update workflow with `NotificationSettings` only if this flag is turned on. + UpdateNotificationSettings bool `json:"update_notification_settings"` } type editWorkflowArgs struct { - workflowId uuid.UUID - workflowName string - workflowDescription string - schedule *col_workflow.Schedule - retentionPolicy *col_workflow.RetentionPolicy + workflowId uuid.UUID + workflowName string + workflowDescription string + schedule *col_workflow.Schedule + retentionPolicy *col_workflow.RetentionPolicy + notificationSettings shared.NotificationSettings + updateNotificationSettings bool `json:"update_notification_settings"` } func (*EditWorkflowHandler) Name() string { @@ -116,11 +124,13 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) } return &editWorkflowArgs{ - workflowId: workflowID, - workflowName: input.WorkflowName, - workflowDescription: input.WorkflowDescription, - schedule: input.Schedule, - retentionPolicy: input.RetentionPolicy, + workflowId: workflowID, + workflowName: input.WorkflowName, + workflowDescription: input.WorkflowDescription, + schedule: input.Schedule, + retentionPolicy: input.RetentionPolicy, + notificationSettings: input.NotificationSettings, + updateNotificationSettings: input.UpdateNotificationSettings, }, http.StatusOK, nil } @@ -173,6 +183,8 @@ func (h *EditWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfa args.workflowDescription, args.schedule, args.retentionPolicy, + args.notificationSettings, + args.updateNotificationSettings, ) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.") diff --git a/src/golang/cmd/server/handler/register_workflow.go b/src/golang/cmd/server/handler/register_workflow.go index d899c81cd..e56b5f009 100644 --- a/src/golang/cmd/server/handler/register_workflow.go +++ b/src/golang/cmd/server/handler/register_workflow.go @@ -229,6 +229,8 @@ func (h *RegisterWorkflowHandler) Perform(ctx context.Context, interfaceArgs int dbWorkflowDag.Metadata.Description, &dbWorkflowDag.Metadata.Schedule, &dbWorkflowDag.Metadata.RetentionPolicy, + dbWorkflowDag.Metadata.NotificationSettings, + true, // updateNotificationSettings. Set to true to attempt to update. ) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.") diff --git a/src/golang/lib/engine/aq_engine.go b/src/golang/lib/engine/aq_engine.go index c7ade25b9..9d65013f6 100644 --- a/src/golang/lib/engine/aq_engine.go +++ b/src/golang/lib/engine/aq_engine.go @@ -638,6 +638,8 @@ func (eng *aqEngine) EditWorkflow( workflowDescription string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, + notificationSettings mdl_shared.NotificationSettings, + updateNotificationSettings bool, ) error { changes := map[string]interface{}{} if workflowName != "" { @@ -652,6 +654,10 @@ func (eng *aqEngine) EditWorkflow( changes[models.WorkflowRetentionPolicy] = retentionPolicy } + if updateNotificationSettings { + changes[models.WorkflowNotificationSettings] = notificationSettings + } + if schedule.Trigger != "" { cronjobName := shared_utils.AppendPrefix(workflowID.String()) err := eng.updateWorkflowSchedule(ctx, workflowID, cronjobName, schedule) diff --git a/src/golang/lib/engine/engine.go b/src/golang/lib/engine/engine.go index a562209fa..c1cb7073c 100644 --- a/src/golang/lib/engine/engine.go +++ b/src/golang/lib/engine/engine.go @@ -10,6 +10,7 @@ import ( "github.com/aqueducthq/aqueduct/lib/database" exec_env "github.com/aqueducthq/aqueduct/lib/execution_environment" "github.com/aqueducthq/aqueduct/lib/models" + mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared" "github.com/dropbox/godropbox/errors" "github.com/google/uuid" ) @@ -49,7 +50,12 @@ type Engine interface { workflowName string, workflowDescription string, schedule *workflow.Schedule, + // We use an explicit flag to specify if `NotificationSettings` should be updated + // to cover the case where this field is updated to `nil`. + // Overall, we update workflow with `NotificationSettings` only if this flag is turned on. retentionPolicy *workflow.RetentionPolicy, + notificationSettings mdl_shared.NotificationSettings, + updateNotificationSettings bool, ) error // TODO ENG-1444: Used as a wrapper to trigger a workflow via executor binary. diff --git a/src/golang/lib/models/shared/workflow.go b/src/golang/lib/models/shared/workflow.go index 25b1ab754..53f6e0d05 100644 --- a/src/golang/lib/models/shared/workflow.go +++ b/src/golang/lib/models/shared/workflow.go @@ -17,31 +17,3 @@ func (s *NotificationSettings) Value() (driver.Value, error) { func (s *NotificationSettings) Scan(value interface{}) error { return utils.ScanJSONB(value, s) } - -type NullNotificationSettings struct { - NotificationSettings - IsNull bool -} - -func (n *NullNotificationSettings) Value() (driver.Value, error) { - if n.IsNull { - return nil, nil - } - - return (&n.NotificationSettings).Value() -} - -func (n *NullNotificationSettings) Scan(value interface{}) error { - if value == nil { - n.IsNull = true - return nil - } - - s := &NotificationSettings{} - if err := s.Scan(value); err != nil { - return err - } - - n.NotificationSettings, n.IsNull = *s, false - return nil -} diff --git a/src/golang/lib/models/workflow.go b/src/golang/lib/models/workflow.go index 6d39f9d0f..fe26b8c79 100644 --- a/src/golang/lib/models/workflow.go +++ b/src/golang/lib/models/workflow.go @@ -26,14 +26,14 @@ const ( // A Workflow maps to the workflow table. type Workflow struct { - ID uuid.UUID `db:"id" json:"id"` - UserID uuid.UUID `db:"user_id" json:"user_id"` - Name string `db:"name" json:"name"` - Description string `db:"description" json:"description"` - Schedule workflow.Schedule `db:"schedule" json:"schedule"` - CreatedAt time.Time `db:"created_at" json:"created_at"` - RetentionPolicy workflow.RetentionPolicy `db:"retention_policy" json:"retention_policy"` - NotificationSettings shared.NullNotificationSettings `db:"notification_settings" json:"notification_settings"` + ID uuid.UUID `db:"id" json:"id"` + UserID uuid.UUID `db:"user_id" json:"user_id"` + Name string `db:"name" json:"name"` + Description string `db:"description" json:"description"` + Schedule workflow.Schedule `db:"schedule" json:"schedule"` + CreatedAt time.Time `db:"created_at" json:"created_at"` + RetentionPolicy workflow.RetentionPolicy `db:"retention_policy" json:"retention_policy"` + NotificationSettings shared.NotificationSettings `db:"notification_settings" json:"notification_settings"` } // WorkflowCols returns a comma-separated string of all Workflow columns. diff --git a/src/golang/lib/repos/sqlite/workflow.go b/src/golang/lib/repos/sqlite/workflow.go index ffa34c8b8..4d42ce526 100644 --- a/src/golang/lib/repos/sqlite/workflow.go +++ b/src/golang/lib/repos/sqlite/workflow.go @@ -10,6 +10,7 @@ import ( "github.com/aqueducthq/aqueduct/lib/collections/workflow" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/models" + mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared" "github.com/aqueducthq/aqueduct/lib/models/views" "github.com/aqueducthq/aqueduct/lib/repos" "github.com/dropbox/godropbox/errors" @@ -253,6 +254,7 @@ func (*workflowWriter) Create( description string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, + notificationSettings mdl_shared.NotificationSettings, DB database.Database, ) (*models.Workflow, error) { cols := []string{ @@ -263,6 +265,7 @@ func (*workflowWriter) Create( models.WorkflowSchedule, models.WorkflowCreatedAt, models.WorkflowRetentionPolicy, + models.WorkflowNotificationSettings, } query := DB.PrepareInsertWithReturnAllStmt(models.WorkflowTable, cols, models.WorkflowCols()) @@ -271,7 +274,7 @@ func (*workflowWriter) Create( return nil, err } - args := []interface{}{ID, userID, name, description, schedule, time.Now(), retentionPolicy} + args := []interface{}{ID, userID, name, description, schedule, time.Now(), retentionPolicy, notificationSettings} return getWorkflow(ctx, DB, query, args...) } diff --git a/src/golang/lib/repos/tests/seed.go b/src/golang/lib/repos/tests/seed.go index 7b0fa90fb..dd0c12fb1 100644 --- a/src/golang/lib/repos/tests/seed.go +++ b/src/golang/lib/repos/tests/seed.go @@ -187,6 +187,7 @@ func (ts *TestSuite) seedWorkflowWithUser(count int, userIDs []uuid.UUID) []mode description, schedule, retentionPolicy, + shared.NotificationSettings{}, ts.DB, ) require.Nil(ts.T(), err) diff --git a/src/golang/lib/repos/tests/workflow.go b/src/golang/lib/repos/tests/workflow.go index d5e11e328..43d63564c 100644 --- a/src/golang/lib/repos/tests/workflow.go +++ b/src/golang/lib/repos/tests/workflow.go @@ -60,6 +60,7 @@ func (ts *TestSuite) TestWorkflow_GetByScheduleTrigger() { &workflow.RetentionPolicy{ KLatestRuns: 5, }, + nil, ts.DB, ) require.Nil(ts.T(), err) @@ -91,6 +92,7 @@ func (ts *TestSuite) TestWorkflow_GetTargets() { &workflow.RetentionPolicy{ KLatestRuns: 5, }, + nil, ts.DB, ) require.Nil(ts.T(), err) @@ -130,6 +132,7 @@ func (ts *TestSuite) TestWorkflow_ValidateOrg() { func (ts *TestSuite) TestWorkflow_Create() { users := ts.seedUser(1) user := users[0] + notificationIntegrationID := uuid.New() expectedWorkflow := &models.Workflow{ UserID: user.ID, @@ -144,8 +147,9 @@ func (ts *TestSuite) TestWorkflow_Create() { RetentionPolicy: workflow.RetentionPolicy{ KLatestRuns: 10, }, - // Explicitly set `IsNull` to true, otherwise it will be false by default. - NotificationSettings: shared.NullNotificationSettings{IsNull: true}, + NotificationSettings: shared.NotificationSettings{ + notificationIntegrationID: shared.ErrorNotificationLevel, // just need an arbitrary uuid + }, } actualWorkflow, err := ts.workflow.Create( @@ -155,6 +159,7 @@ func (ts *TestSuite) TestWorkflow_Create() { expectedWorkflow.Description, &expectedWorkflow.Schedule, &expectedWorkflow.RetentionPolicy, + expectedWorkflow.NotificationSettings, ts.DB, ) require.Nil(ts.T(), err) @@ -177,6 +182,7 @@ func (ts *TestSuite) TestWorkflow_Delete() { func (ts *TestSuite) TestWorkflow_Update() { workflows := ts.seedWorkflow(1) oldWorkflow := workflows[0] + notificationIntegrationID := uuid.New() newName := "new_workflow_name" newSchedule := workflow.Schedule{ @@ -186,9 +192,14 @@ func (ts *TestSuite) TestWorkflow_Update() { Paused: true, } + newNotificationSettings := shared.NotificationSettings{ + notificationIntegrationID: shared.ErrorNotificationLevel, + } + changes := map[string]interface{}{ - models.WorkflowName: newName, - models.WorkflowSchedule: &newSchedule, + models.WorkflowName: newName, + models.WorkflowSchedule: &newSchedule, + models.WorkflowNotificationSettings: newNotificationSettings, } newWorkflow, err := ts.workflow.Update(ts.ctx, oldWorkflow.ID, changes, ts.DB) @@ -196,4 +207,5 @@ func (ts *TestSuite) TestWorkflow_Update() { requireDeepEqual(ts.T(), newSchedule, newWorkflow.Schedule) require.Equal(ts.T(), newName, newWorkflow.Name) + requireDeepEqual(ts.T(), newWorkflow.NotificationSettings, newNotificationSettings) } diff --git a/src/golang/lib/repos/workflow.go b/src/golang/lib/repos/workflow.go index 4c3b87f6c..bbb760ffa 100644 --- a/src/golang/lib/repos/workflow.go +++ b/src/golang/lib/repos/workflow.go @@ -7,6 +7,7 @@ import ( "github.com/aqueducthq/aqueduct/lib/collections/workflow" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/models" + mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared" "github.com/aqueducthq/aqueduct/lib/models/views" "github.com/google/uuid" ) @@ -63,6 +64,7 @@ type workflowWriter interface { description string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, + notificationSettings mdl_shared.NotificationSettings, DB database.Database, ) (*models.Workflow, error) diff --git a/src/golang/lib/workflow/dag/workflow_dag.go b/src/golang/lib/workflow/dag/workflow_dag.go index 620d22438..bbde280f8 100644 --- a/src/golang/lib/workflow/dag/workflow_dag.go +++ b/src/golang/lib/workflow/dag/workflow_dag.go @@ -86,7 +86,7 @@ func (dag *workflowDagImpl) NotificationSettings() shared.NotificationSettings { return shared.NotificationSettings{} } - return dag.dbDAG.Metadata.NotificationSettings.NotificationSettings + return dag.dbDAG.Metadata.NotificationSettings } // Assumption: all dag's start with operators. diff --git a/src/golang/lib/workflow/utils/database.go b/src/golang/lib/workflow/utils/database.go index 578554328..779a236ed 100644 --- a/src/golang/lib/workflow/utils/database.go +++ b/src/golang/lib/workflow/utils/database.go @@ -41,6 +41,7 @@ func WriteDAGToDatabase( dag.Metadata.Description, &dag.Metadata.Schedule, &dag.Metadata.RetentionPolicy, + dag.Metadata.NotificationSettings, DB, ) if err != nil { From 6dbf43a62169ba2dc0543e61044158646ac6f1d0 Mon Sep 17 00:00:00 2001 From: likawind Date: Tue, 31 Jan 2023 06:22:05 +0000 Subject: [PATCH 2/4] fix scan --- src/golang/lib/models/shared/workflow.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/golang/lib/models/shared/workflow.go b/src/golang/lib/models/shared/workflow.go index 53f6e0d05..961f3cfcb 100644 --- a/src/golang/lib/models/shared/workflow.go +++ b/src/golang/lib/models/shared/workflow.go @@ -15,5 +15,10 @@ func (s *NotificationSettings) Value() (driver.Value, error) { } func (s *NotificationSettings) Scan(value interface{}) error { + if value == nil { + *s = nil + return nil + } + return utils.ScanJSONB(value, s) } From ee85121faa473c30ebc11916fab0d188cc0197e0 Mon Sep 17 00:00:00 2001 From: likawind Date: Tue, 31 Jan 2023 07:31:40 +0000 Subject: [PATCH 3/4] seems to be working with maps --- .../cmd/server/handler/edit_workflow.go | 33 ++++++++----------- .../cmd/server/handler/register_workflow.go | 3 +- src/golang/lib/engine/aq_engine.go | 5 ++- src/golang/lib/engine/engine.go | 6 +--- src/golang/lib/engine/notification.go | 2 +- src/golang/lib/models/shared/workflow.go | 7 ++-- src/golang/lib/repos/sqlite/workflow.go | 2 +- src/golang/lib/repos/tests/seed.go | 2 +- src/golang/lib/repos/tests/workflow.go | 16 +++++---- src/golang/lib/repos/workflow.go | 2 +- src/golang/lib/workflow/utils/database.go | 2 +- 11 files changed, 37 insertions(+), 43 deletions(-) diff --git a/src/golang/cmd/server/handler/edit_workflow.go b/src/golang/cmd/server/handler/edit_workflow.go index 42b11f43f..41ad50f1d 100644 --- a/src/golang/cmd/server/handler/edit_workflow.go +++ b/src/golang/cmd/server/handler/edit_workflow.go @@ -48,21 +48,16 @@ type editWorkflowInput struct { WorkflowDescription string `json:"description"` Schedule *col_workflow.Schedule `json:"schedule"` RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"` - NotificationSettings shared.NotificationSettings `json:"notification_settings"` - // We use an explicit flag to specify if `NotificationSettings` should be updated - // to cover the case where this field is updated to `nil`. - // Overall, we update workflow with `NotificationSettings` only if this flag is turned on. - UpdateNotificationSettings bool `json:"update_notification_settings"` + NotificationSettings *shared.NotificationSettings `json:"notification_settings"` } type editWorkflowArgs struct { - workflowId uuid.UUID - workflowName string - workflowDescription string - schedule *col_workflow.Schedule - retentionPolicy *col_workflow.RetentionPolicy - notificationSettings shared.NotificationSettings - updateNotificationSettings bool `json:"update_notification_settings"` + workflowId uuid.UUID + workflowName string + workflowDescription string + schedule *col_workflow.Schedule + retentionPolicy *col_workflow.RetentionPolicy + notificationSettings *shared.NotificationSettings } func (*EditWorkflowHandler) Name() string { @@ -124,13 +119,12 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) } return &editWorkflowArgs{ - workflowId: workflowID, - workflowName: input.WorkflowName, - workflowDescription: input.WorkflowDescription, - schedule: input.Schedule, - retentionPolicy: input.RetentionPolicy, - notificationSettings: input.NotificationSettings, - updateNotificationSettings: input.UpdateNotificationSettings, + workflowId: workflowID, + workflowName: input.WorkflowName, + workflowDescription: input.WorkflowDescription, + schedule: input.Schedule, + retentionPolicy: input.RetentionPolicy, + notificationSettings: input.NotificationSettings, }, http.StatusOK, nil } @@ -184,7 +178,6 @@ func (h *EditWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfa args.schedule, args.retentionPolicy, args.notificationSettings, - args.updateNotificationSettings, ) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.") diff --git a/src/golang/cmd/server/handler/register_workflow.go b/src/golang/cmd/server/handler/register_workflow.go index e56b5f009..3e168a68e 100644 --- a/src/golang/cmd/server/handler/register_workflow.go +++ b/src/golang/cmd/server/handler/register_workflow.go @@ -229,8 +229,7 @@ func (h *RegisterWorkflowHandler) Perform(ctx context.Context, interfaceArgs int dbWorkflowDag.Metadata.Description, &dbWorkflowDag.Metadata.Schedule, &dbWorkflowDag.Metadata.RetentionPolicy, - dbWorkflowDag.Metadata.NotificationSettings, - true, // updateNotificationSettings. Set to true to attempt to update. + &dbWorkflowDag.Metadata.NotificationSettings, ) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.") diff --git a/src/golang/lib/engine/aq_engine.go b/src/golang/lib/engine/aq_engine.go index 9d65013f6..eef60e2ea 100644 --- a/src/golang/lib/engine/aq_engine.go +++ b/src/golang/lib/engine/aq_engine.go @@ -638,8 +638,7 @@ func (eng *aqEngine) EditWorkflow( workflowDescription string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, - notificationSettings mdl_shared.NotificationSettings, - updateNotificationSettings bool, + notificationSettings *mdl_shared.NotificationSettings, ) error { changes := map[string]interface{}{} if workflowName != "" { @@ -654,7 +653,7 @@ func (eng *aqEngine) EditWorkflow( changes[models.WorkflowRetentionPolicy] = retentionPolicy } - if updateNotificationSettings { + if notificationSettings != nil { changes[models.WorkflowNotificationSettings] = notificationSettings } diff --git a/src/golang/lib/engine/engine.go b/src/golang/lib/engine/engine.go index c1cb7073c..ed0d70d19 100644 --- a/src/golang/lib/engine/engine.go +++ b/src/golang/lib/engine/engine.go @@ -50,12 +50,8 @@ type Engine interface { workflowName string, workflowDescription string, schedule *workflow.Schedule, - // We use an explicit flag to specify if `NotificationSettings` should be updated - // to cover the case where this field is updated to `nil`. - // Overall, we update workflow with `NotificationSettings` only if this flag is turned on. retentionPolicy *workflow.RetentionPolicy, - notificationSettings mdl_shared.NotificationSettings, - updateNotificationSettings bool, + notificationSettings *mdl_shared.NotificationSettings, ) error // TODO ENG-1444: Used as a wrapper to trigger a workflow via executor binary. diff --git a/src/golang/lib/engine/notification.go b/src/golang/lib/engine/notification.go index 240626758..1a80205e2 100644 --- a/src/golang/lib/engine/notification.go +++ b/src/golang/lib/engine/notification.go @@ -53,7 +53,7 @@ func (eng *aqEngine) sendNotifications( } msg := notificationMsg(wfDag, level, contextMsg) - workflowSettings := wfDag.NotificationSettings() + workflowSettings := wfDag.NotificationSettings().Settings for _, notificationObj := range notifications { if len(workflowSettings) > 0 { // send based on settings diff --git a/src/golang/lib/models/shared/workflow.go b/src/golang/lib/models/shared/workflow.go index 961f3cfcb..7a0fcd53c 100644 --- a/src/golang/lib/models/shared/workflow.go +++ b/src/golang/lib/models/shared/workflow.go @@ -8,7 +8,10 @@ import ( ) // `NotificationSettings` maps IntegrationID to NotificationLevel -type NotificationSettings map[uuid.UUID]NotificationLevel +// This has to be a struct since sql driver does not support map type. +type NotificationSettings struct { + Settings map[uuid.UUID]NotificationLevel `json:"settings"` +} func (s *NotificationSettings) Value() (driver.Value, error) { return utils.ValueJSONB(*s) @@ -16,7 +19,7 @@ func (s *NotificationSettings) Value() (driver.Value, error) { func (s *NotificationSettings) Scan(value interface{}) error { if value == nil { - *s = nil + s.Settings = nil return nil } diff --git a/src/golang/lib/repos/sqlite/workflow.go b/src/golang/lib/repos/sqlite/workflow.go index 4d42ce526..62f6b0b33 100644 --- a/src/golang/lib/repos/sqlite/workflow.go +++ b/src/golang/lib/repos/sqlite/workflow.go @@ -254,7 +254,7 @@ func (*workflowWriter) Create( description string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, - notificationSettings mdl_shared.NotificationSettings, + notificationSettings *mdl_shared.NotificationSettings, DB database.Database, ) (*models.Workflow, error) { cols := []string{ diff --git a/src/golang/lib/repos/tests/seed.go b/src/golang/lib/repos/tests/seed.go index dd0c12fb1..967683dd5 100644 --- a/src/golang/lib/repos/tests/seed.go +++ b/src/golang/lib/repos/tests/seed.go @@ -187,7 +187,7 @@ func (ts *TestSuite) seedWorkflowWithUser(count int, userIDs []uuid.UUID) []mode description, schedule, retentionPolicy, - shared.NotificationSettings{}, + &shared.NotificationSettings{}, ts.DB, ) require.Nil(ts.T(), err) diff --git a/src/golang/lib/repos/tests/workflow.go b/src/golang/lib/repos/tests/workflow.go index 43d63564c..46f8190ab 100644 --- a/src/golang/lib/repos/tests/workflow.go +++ b/src/golang/lib/repos/tests/workflow.go @@ -60,7 +60,7 @@ func (ts *TestSuite) TestWorkflow_GetByScheduleTrigger() { &workflow.RetentionPolicy{ KLatestRuns: 5, }, - nil, + &shared.NotificationSettings{}, ts.DB, ) require.Nil(ts.T(), err) @@ -92,7 +92,7 @@ func (ts *TestSuite) TestWorkflow_GetTargets() { &workflow.RetentionPolicy{ KLatestRuns: 5, }, - nil, + &shared.NotificationSettings{}, ts.DB, ) require.Nil(ts.T(), err) @@ -148,7 +148,9 @@ func (ts *TestSuite) TestWorkflow_Create() { KLatestRuns: 10, }, NotificationSettings: shared.NotificationSettings{ - notificationIntegrationID: shared.ErrorNotificationLevel, // just need an arbitrary uuid + Settings: map[uuid.UUID]shared.NotificationLevel{ + notificationIntegrationID: shared.ErrorNotificationLevel, + }, }, } @@ -159,7 +161,7 @@ func (ts *TestSuite) TestWorkflow_Create() { expectedWorkflow.Description, &expectedWorkflow.Schedule, &expectedWorkflow.RetentionPolicy, - expectedWorkflow.NotificationSettings, + &expectedWorkflow.NotificationSettings, ts.DB, ) require.Nil(ts.T(), err) @@ -193,13 +195,15 @@ func (ts *TestSuite) TestWorkflow_Update() { } newNotificationSettings := shared.NotificationSettings{ - notificationIntegrationID: shared.ErrorNotificationLevel, + Settings: map[uuid.UUID]shared.NotificationLevel{ + notificationIntegrationID: shared.ErrorNotificationLevel, + }, } changes := map[string]interface{}{ models.WorkflowName: newName, models.WorkflowSchedule: &newSchedule, - models.WorkflowNotificationSettings: newNotificationSettings, + models.WorkflowNotificationSettings: &newNotificationSettings, } newWorkflow, err := ts.workflow.Update(ts.ctx, oldWorkflow.ID, changes, ts.DB) diff --git a/src/golang/lib/repos/workflow.go b/src/golang/lib/repos/workflow.go index bbb760ffa..0deaa1c30 100644 --- a/src/golang/lib/repos/workflow.go +++ b/src/golang/lib/repos/workflow.go @@ -64,7 +64,7 @@ type workflowWriter interface { description string, schedule *workflow.Schedule, retentionPolicy *workflow.RetentionPolicy, - notificationSettings mdl_shared.NotificationSettings, + notificationSettings *mdl_shared.NotificationSettings, DB database.Database, ) (*models.Workflow, error) diff --git a/src/golang/lib/workflow/utils/database.go b/src/golang/lib/workflow/utils/database.go index 779a236ed..55ea416d0 100644 --- a/src/golang/lib/workflow/utils/database.go +++ b/src/golang/lib/workflow/utils/database.go @@ -41,7 +41,7 @@ func WriteDAGToDatabase( dag.Metadata.Description, &dag.Metadata.Schedule, &dag.Metadata.RetentionPolicy, - dag.Metadata.NotificationSettings, + &dag.Metadata.NotificationSettings, DB, ) if err != nil { From ec0304842cca718384caf7813bcbe8f482907661 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Fri, 3 Feb 2023 16:29:59 -0800 Subject: [PATCH 4/4] [2/2][UI] Eng 2196 m2 allow user to configure wf specific notifications (#936) * Implement actual code to send email notifications (#943) * Implement actual code to send slack notifications (#944) --- src/golang/lib/notification/email.go | 27 +++- src/golang/lib/notification/slack.go | 75 ++++++++++- .../integrations/cards/slackCard.tsx | 2 +- .../WorkflowNotificationSettings.tsx | 124 ++++++++++++++++++ .../components/workflows/WorkflowSettings.tsx | 93 ++++++++++++- .../components/workflows/storageSelector.tsx | 2 +- src/ui/common/src/utils/data.ts | 2 +- src/ui/common/src/utils/workflows.tsx | 6 + 8 files changed, 321 insertions(+), 10 deletions(-) create mode 100644 src/ui/common/src/components/workflows/WorkflowNotificationSettings.tsx diff --git a/src/golang/lib/notification/email.go b/src/golang/lib/notification/email.go index ae9a80bf7..990a8b879 100644 --- a/src/golang/lib/notification/email.go +++ b/src/golang/lib/notification/email.go @@ -3,7 +3,9 @@ package notification import ( "context" "crypto/tls" + "fmt" "net/smtp" + "strings" "github.com/aqueducthq/aqueduct/lib/models" "github.com/aqueducthq/aqueduct/lib/models/shared" @@ -27,9 +29,30 @@ func (e *EmailNotification) Level() shared.NotificationLevel { return e.conf.Level } +func fullMessage(subject string, from string, targets []string, body string) string { + fullMsg := fmt.Sprintf("From: %s\n", from) + fullMsg += fmt.Sprintf("To: %s\n", strings.Join(targets, ",")) + fullMsg += fmt.Sprintf("Subject: %s\n\n", subject) + fullMsg += body + return fullMsg +} + func (e *EmailNotification) Send(ctx context.Context, msg string) error { - // TODO: Implement - return nil + auth := smtp.PlainAuth( + "", // identity + e.conf.User, + e.conf.Password, + e.conf.Host, + ) + + fullMsg := fullMessage("aqueduct notification", e.conf.User, e.conf.Targets, msg) + return smtp.SendMail( + e.conf.FullHost(), + auth, + e.conf.User, + e.conf.Targets, + []byte(fullMsg), + ) } func AuthenticateEmail(conf *shared.EmailConfig) error { diff --git a/src/golang/lib/notification/slack.go b/src/golang/lib/notification/slack.go index ce53f49c7..5c6294ee3 100644 --- a/src/golang/lib/notification/slack.go +++ b/src/golang/lib/notification/slack.go @@ -5,10 +5,13 @@ import ( "github.com/aqueducthq/aqueduct/lib/models" "github.com/aqueducthq/aqueduct/lib/models/shared" + "github.com/dropbox/godropbox/errors" "github.com/google/uuid" "github.com/slack-go/slack" ) +const maxChannelLimit = 2000 + type SlackNotification struct { integration *models.Integration conf *shared.SlackConfig @@ -26,13 +29,83 @@ func (s *SlackNotification) Level() shared.NotificationLevel { return s.conf.Level } +// reference: https://stackoverflow.com/questions/50106263/slack-api-to-find-existing-channel +// We have to use list channel API together with a linear search. +func findChannels(client *slack.Client, names []string) ([]slack.Channel, error) { + channels, _ /* cursor */, err := client.GetConversations(&slack.GetConversationsParameters{ + ExcludeArchived: true, + Limit: maxChannelLimit, + }) + if err != nil { + return nil, err + } + + namesSet := make(map[string]bool, len(names)) + namesTaken := make(map[string]bool, len(names)) + for _, name := range names { + namesSet[name] = true + } + + // Slack channel names should be unique. We will still send notifications to all + // channels matching the given name. + results := make([]slack.Channel, 0, len(names)) + for _, channel := range channels { + _, ok := namesSet[channel.Name] + if ok { + results = append(results, channel) + namesTaken[channel.Name] = true + } + } + + if len(namesTaken) != len(namesSet) { + for name := range namesSet { + _, ok := namesTaken[name] + if !ok { + return nil, errors.Newf("Channel %s does not exist.", name) + } + } + } + + return results, nil +} + func (s *SlackNotification) Send(ctx context.Context, msg string) error { - // TODO: Implement + client := slack.New(s.conf.Token) + channels, err := findChannels(client, s.conf.Channels) + if err != nil { + return err + } + + for _, channel := range channels { + // reference: https://medium.com/@gausha/a-simple-slackbot-with-golang-c5a932d719c7 + _, _, _, err = client.SendMessage(channel.ID, slack.MsgOptionBlocks( + slack.NewSectionBlock( + slack.NewTextBlockObject( + "plain_text", + msg, + false, /* emoji */ + false, /* verbatim */ + ), + nil, + nil, + ), + )) + + if err != nil { + return err + } + } + return nil } func AuthenticateSlack(conf *shared.SlackConfig) error { client := slack.New(conf.Token) _, err := client.AuthTest() + if err != nil { + return err + } + + _, err = findChannels(client, conf.Channels) return err } diff --git a/src/ui/common/src/components/integrations/cards/slackCard.tsx b/src/ui/common/src/components/integrations/cards/slackCard.tsx index 299f35ead..df6380e8d 100644 --- a/src/ui/common/src/components/integrations/cards/slackCard.tsx +++ b/src/ui/common/src/components/integrations/cards/slackCard.tsx @@ -23,7 +23,7 @@ export const SlackCard: React.FC = ({ integration }) => { Level: - {config.level.toUpperCase()} + {config.level[0].toUpperCase() + config.level.slice(1)} ); diff --git a/src/ui/common/src/components/workflows/WorkflowNotificationSettings.tsx b/src/ui/common/src/components/workflows/WorkflowNotificationSettings.tsx new file mode 100644 index 000000000..4b1ee6541 --- /dev/null +++ b/src/ui/common/src/components/workflows/WorkflowNotificationSettings.tsx @@ -0,0 +1,124 @@ +import { faPlusSquare, faTrash } from '@fortawesome/free-solid-svg-icons'; +import { FontAwesomeIcon } from '@fortawesome/react-fontawesome'; +import { Box, MenuItem, Select, Typography } from '@mui/material'; +import React from 'react'; + +import { theme } from '../../styles/theme/theme'; +import { Integration } from '../../utils/integrations'; +import { NotificationLogLevel } from '../../utils/notifications'; +import { NotificationSettingsMap } from '../../utils/workflows'; +import NotificationLevelSelector from '../notifications/NotificationLevelSelector'; + +type SelectedNotificationEntryProps = { + remainingNotificationIntegrations: Integration[]; + selected: Integration; + level: NotificationLogLevel | undefined; + onSelect: ( + id: string, + level: NotificationLogLevel | undefined, + replacingID?: string + ) => void; + onRemove: (id: string) => void; +}; + +type Props = { + notificationIntegrations: Integration[]; + curSettingsMap: NotificationSettingsMap; + onSelect: ( + id: string, + level?: NotificationLogLevel, + replacingID?: string + ) => void; + onRemove: (id: string) => void; +}; + +export const SelectedNotificationEntry: React.FC< + SelectedNotificationEntryProps +> = ({ + remainingNotificationIntegrations, + selected, + level, + onSelect, + onRemove, +}) => { + return ( + + + + + onRemove(selected.id)} + /> + + + + onSelect(selected.id, level)} + /> + + + ); +}; + +const WorkflowNotificationSettings: React.FC = ({ + notificationIntegrations, + curSettingsMap, + onSelect, + onRemove, +}) => { + const selectedIDs = Object.keys(curSettingsMap); + const remainingIntegrations = notificationIntegrations.filter( + (x) => !selectedIDs.includes(x.id) + ); + const integrationsByID: { [id: string]: Integration } = {}; + notificationIntegrations.forEach((x) => (integrationsByID[x.id] = x)); + + const selectedEntries = Object.entries(curSettingsMap).map(([id, level]) => ( + + + + )); + + return ( + + {selectedEntries} + {remainingIntegrations.length > 0 && ( + 0 ? 2 : 1}> + onSelect(remainingIntegrations[0].id, undefined)} + /> + + )} + + ); +}; + +export default WorkflowNotificationSettings; diff --git a/src/ui/common/src/components/workflows/WorkflowSettings.tsx b/src/ui/common/src/components/workflows/WorkflowSettings.tsx index 65410eb6b..451bf93ae 100644 --- a/src/ui/common/src/components/workflows/WorkflowSettings.tsx +++ b/src/ui/common/src/components/workflows/WorkflowSettings.tsx @@ -51,10 +51,15 @@ import { getNextUpdateTime, PeriodUnit, } from '../../utils/cron'; +import { + IntegrationCategories, + SupportedIntegrations, +} from '../../utils/integrations'; import { UpdateMode } from '../../utils/operators'; import ExecutionStatus, { LoadingStatusEnum } from '../../utils/shared'; import { getSavedObjectIdentifier, + NotificationSettingsMap, RetentionPolicy, SavedObject, WorkflowDag, @@ -65,6 +70,7 @@ import { Button } from '../primitives/Button.styles'; import { LoadingButton } from '../primitives/LoadingButton.styles'; import StorageSelector from './storageSelector'; import TriggerSourceSelector from './triggerSourceSelector'; +import WorkflowNotificationSettings from './WorkflowNotificationSettings'; type PeriodicScheduleSelectorProps = { cronString: string; @@ -226,6 +232,28 @@ type WorkflowSettingsProps = { onClose: () => void; }; +// Returns whether `updated` is different from `existing`. +function IsNotificationSettingsMapUpdated( + curSettingsMap: NotificationSettingsMap, + newSettingsMap: NotificationSettingsMap +): boolean { + // Starting here, both `curSettings` and `newSettings` should be non-empty. + if ( + Object.keys(curSettingsMap).length !== Object.keys(newSettingsMap).length + ) { + return true; + } + + // both should have the same key size. Check k-v match + let updated = false; + Object.entries(curSettingsMap).forEach(([k, v]) => { + if (newSettingsMap[k] !== v) { + updated = true; + } + }); + return updated; +} + const WorkflowSettings: React.FC = ({ user, workflowDag, @@ -266,6 +294,16 @@ const WorkflowSettings: React.FC = ({ (state: RootState) => state.listWorkflowReducer.workflows ); + const integrations = useSelector( + (state: RootState) => state.integrationsReducer.integrations + ); + + const notificationIntegrations = Object.values(integrations).filter( + (x) => + SupportedIntegrations[x.service].category === + IntegrationCategories.NOTIFICATION + ); + const [name, setName] = useState(workflowDag.metadata?.name); const [description, setDescription] = useState( workflowDag.metadata?.description @@ -283,10 +321,15 @@ const WorkflowSettings: React.FC = ({ const [retentionPolicy, setRetentionPolicy] = useState( workflowDag.metadata?.retention_policy ); - const retentionPolicyUpdated = - retentionPolicy.k_latest_runs !== - workflowDag.metadata?.retention_policy?.k_latest_runs; + const [notificationSettingsMap, setNotificationSettingsMap] = + useState( + workflowDag.metadata?.notification_settings?.settings ?? {} + ); + // filter out empty key / values + const normalizedNotificationSettingsMap = Object.fromEntries( + Object.entries(notificationSettingsMap).filter(([k, v]) => !!k && !!v) + ); const initialSettings = { name: workflowDag.metadata?.name, description: workflowDag.metadata?.description, @@ -295,8 +338,19 @@ const WorkflowSettings: React.FC = ({ paused: workflowDag.metadata.schedule.paused, retentionPolicy: workflowDag.metadata?.retention_policy, sourceId: workflowDag.metadata?.schedule?.source_id, + notificationSettingsMap: + workflowDag.metadata?.notification_settings?.settings ?? {}, }; + const retentionPolicyUpdated = + retentionPolicy.k_latest_runs !== + workflowDag.metadata?.retention_policy?.k_latest_runs; + + const isNotificationSettingsUpdated = IsNotificationSettingsMapUpdated( + initialSettings.notificationSettingsMap, + normalizedNotificationSettingsMap + ); + const settingsChanged = name !== workflowDag.metadata?.name || // The workflow name has been changed. description !== workflowDag.metadata?.description || // The workflow description has changed. @@ -306,7 +360,8 @@ const WorkflowSettings: React.FC = ({ (triggerType === WorkflowUpdateTrigger.Cascade && // The trigger type is still cascade but the source has changed. sourceId !== workflowDag.metadata?.schedule?.source_id) || paused !== workflowDag.metadata.schedule.paused || // The schedule type is periodic and we've changed the pausedness of the workflow. - retentionPolicyUpdated; // retention policy has changed. + retentionPolicyUpdated || + isNotificationSettingsUpdated; // retention policy has changed. const triggerOptions = [ { label: 'Update Manually', value: WorkflowUpdateTrigger.Manual }, @@ -471,6 +526,9 @@ const WorkflowSettings: React.FC = ({ : '00000000-0000-0000-0000-000000000000', }, retention_policy: retentionPolicyUpdated ? retentionPolicy : undefined, + notification_settings: isNotificationSettingsUpdated + ? { settings: normalizedNotificationSettingsMap } + : undefined, }; fetch(`${apiAddress}/api/workflow/${workflowDag.workflow_id}/edit`, { @@ -911,6 +969,33 @@ const WorkflowSettings: React.FC = ({ + {notificationIntegrations.length > 0 && ( + + + Notifications + + + { + const newSettings = { ...notificationSettingsMap }; + newSettings[id] = level; + if (replacingID) { + delete newSettings[replacingID]; + } + + setNotificationSettingsMap(newSettings); + }} + onRemove={(id) => { + const newSettings = { ...notificationSettingsMap }; + delete newSettings[id]; + setNotificationSettingsMap(newSettings); + }} + /> + + )} + { const getMenuItems = () => { return ( Object.values(StorageType) as Array< - (typeof StorageType)[keyof typeof StorageType] + typeof StorageType[keyof typeof StorageType] > ).map((storageType) => { return ( diff --git a/src/ui/common/src/utils/data.ts b/src/ui/common/src/utils/data.ts index 76e2cc3b1..a077f1939 100644 --- a/src/ui/common/src/utils/data.ts +++ b/src/ui/common/src/utils/data.ts @@ -11,7 +11,7 @@ export const DataColumnTypeNames = [ 'object', ] as const; -export type DataColumnType = (typeof DataColumnTypeNames)[number]; +export type DataColumnType = typeof DataColumnTypeNames[number]; export type DataColumn = { /** diff --git a/src/ui/common/src/utils/workflows.tsx b/src/ui/common/src/utils/workflows.tsx index 5167b9382..2099f4c94 100644 --- a/src/ui/common/src/utils/workflows.tsx +++ b/src/ui/common/src/utils/workflows.tsx @@ -1,5 +1,6 @@ import { Artifact } from './artifacts'; import { EngineConfig } from './engine'; +import { NotificationLogLevel } from './notifications'; import { Load, normalizeOperator, @@ -97,6 +98,10 @@ export type WorkflowDagResultSummary = { workflow_dag_id: string; }; +export type NotificationSettingsMap = { [id: string]: NotificationLogLevel }; + +export type NotificationSettings = { settings: NotificationSettingsMap }; + export type Workflow = { id: string; user_id: string; @@ -105,6 +110,7 @@ export type Workflow = { schedule: WorkflowSchedule; created_at: number; retention_policy?: RetentionPolicy; + notification_settings?: NotificationSettings; }; export type WorkflowDag = {