Skip to content

Commit

Permalink
Eng 2195 Implement application logic to send notifications (#926)
Browse files Browse the repository at this point in the history
* [2/2][UI] Eng 2193 Allow connecting to email as integration (#897)

* [1/2][Backend] Eng 2191 add slack integration to backend (#911)

* [2/2][UI] Eng 2191 Allow user to connect to slack integration (#910)

* Eng 2194 m2 implement go interface for (#923)

* [1/2][backend] Eng 2196 m2 allow user to configure notifications (#935)

* [2/2][UI] Eng 2196 m2 allow user to configure wf specific notifications (#936)

* Implement actual code to send email notifications (#943)

* Implement actual code to send slack notifications (#944)
  • Loading branch information
likawind authored Feb 6, 2023
1 parent 96b8a48 commit 17567bc
Show file tree
Hide file tree
Showing 26 changed files with 865 additions and 87 deletions.
3 changes: 3 additions & 0 deletions src/golang/cmd/executor/executor/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Repos struct {
DAGEdgeRepo repos.DAGEdge
DAGResultRepo repos.DAGResult
ExecutionEnvironmentRepo repos.ExecutionEnvironment
IntegrationRepo repos.Integration
NotificationRepo repos.Notification
OperatorRepo repos.Operator
OperatorResultRepo repos.OperatorResult
Expand All @@ -28,6 +29,7 @@ func createRepos() *Repos {
DAGEdgeRepo: sqlite.NewDAGEdgeRepo(),
DAGResultRepo: sqlite.NewDAGResultRepo(),
ExecutionEnvironmentRepo: sqlite.NewExecutionEnvironmentRepo(),
IntegrationRepo: sqlite.NewIntegrationRepo(),
NotificationRepo: sqlite.NewNotificationRepo(),
OperatorRepo: sqlite.NewOperatorRepo(),
OperatorResultRepo: sqlite.NewOperatorResultRepo(),
Expand All @@ -44,6 +46,7 @@ func getEngineRepos(repos *Repos) *engine.Repos {
DAGEdgeRepo: repos.DAGEdgeRepo,
DAGResultRepo: repos.DAGResultRepo,
ExecutionEnvironmentRepo: repos.ExecutionEnvironmentRepo,
IntegrationRepo: repos.IntegrationRepo,
NotificationRepo: repos.NotificationRepo,
OperatorRepo: repos.OperatorRepo,
OperatorResultRepo: repos.OperatorResultRepo,
Expand Down
33 changes: 19 additions & 14 deletions src/golang/cmd/server/handler/edit_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
aq_context "github.com/aqueducthq/aqueduct/lib/context"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/engine"
"github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/aqueducthq/aqueduct/lib/repos"
"github.com/aqueducthq/aqueduct/lib/workflow"
"github.com/aqueducthq/aqueduct/lib/workflow/utils"
Expand Down Expand Up @@ -43,18 +44,20 @@ type EditWorkflowHandler struct {
}

type editWorkflowInput struct {
WorkflowName string `json:"name"`
WorkflowDescription string `json:"description"`
Schedule *col_workflow.Schedule `json:"schedule"`
RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"`
WorkflowName string `json:"name"`
WorkflowDescription string `json:"description"`
Schedule *col_workflow.Schedule `json:"schedule"`
RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"`
NotificationSettings *shared.NotificationSettings `json:"notification_settings"`
}

type editWorkflowArgs struct {
workflowId uuid.UUID
workflowName string
workflowDescription string
schedule *col_workflow.Schedule
retentionPolicy *col_workflow.RetentionPolicy
workflowId uuid.UUID
workflowName string
workflowDescription string
schedule *col_workflow.Schedule
retentionPolicy *col_workflow.RetentionPolicy
notificationSettings *shared.NotificationSettings
}

func (*EditWorkflowHandler) Name() string {
Expand Down Expand Up @@ -116,11 +119,12 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error)
}

return &editWorkflowArgs{
workflowId: workflowID,
workflowName: input.WorkflowName,
workflowDescription: input.WorkflowDescription,
schedule: input.Schedule,
retentionPolicy: input.RetentionPolicy,
workflowId: workflowID,
workflowName: input.WorkflowName,
workflowDescription: input.WorkflowDescription,
schedule: input.Schedule,
retentionPolicy: input.RetentionPolicy,
notificationSettings: input.NotificationSettings,
}, http.StatusOK, nil
}

Expand Down Expand Up @@ -173,6 +177,7 @@ func (h *EditWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfa
args.workflowDescription,
args.schedule,
args.retentionPolicy,
args.notificationSettings,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.")
Expand Down
1 change: 1 addition & 0 deletions src/golang/cmd/server/handler/register_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (h *RegisterWorkflowHandler) Perform(ctx context.Context, interfaceArgs int
dbWorkflowDag.Metadata.Description,
&dbWorkflowDag.Metadata.Schedule,
&dbWorkflowDag.Metadata.RetentionPolicy,
&dbWorkflowDag.Metadata.NotificationSettings,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.")
Expand Down
2 changes: 2 additions & 0 deletions src/golang/cmd/server/request/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func isUserOnlyIntegration(svc integration.Service) bool {
integration.GoogleSheets,
integration.Github,
integration.Conda,
integration.Email,
integration.Slack,
}
for _, s := range userSpecific {
if s == svc {
Expand Down
1 change: 1 addition & 0 deletions src/golang/cmd/server/server/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func GetEngineRepos(repos *Repos) *engine.Repos {
DAGEdgeRepo: repos.DAGEdgeRepo,
DAGResultRepo: repos.DAGResultRepo,
ExecutionEnvironmentRepo: repos.ExecutionEnvironmentRepo,
IntegrationRepo: repos.IntegrationRepo,
NotificationRepo: repos.NotificationRepo,
OperatorRepo: repos.OperatorRepo,
OperatorResultRepo: repos.OperatorResultRepo,
Expand Down
99 changes: 95 additions & 4 deletions src/golang/lib/engine/aq_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Repos struct {
DAGEdgeRepo repos.DAGEdge
DAGResultRepo repos.DAGResult
ExecutionEnvironmentRepo repos.ExecutionEnvironment
IntegrationRepo repos.Integration
NotificationRepo repos.Notification
OperatorRepo repos.Operator
OperatorResultRepo repos.OperatorResult
Expand Down Expand Up @@ -658,6 +659,7 @@ func (eng *aqEngine) EditWorkflow(
workflowDescription string,
schedule *workflow.Schedule,
retentionPolicy *workflow.RetentionPolicy,
notificationSettings *mdl_shared.NotificationSettings,
) error {
changes := map[string]interface{}{}
if workflowName != "" {
Expand All @@ -672,6 +674,10 @@ func (eng *aqEngine) EditWorkflow(
changes[models.WorkflowRetentionPolicy] = retentionPolicy
}

if notificationSettings != nil {
changes[models.WorkflowNotificationSettings] = notificationSettings
}

if schedule.Trigger != "" {
cronjobName := shared_utils.AppendPrefix(workflowID.String())
err := eng.updateWorkflowSchedule(ctx, workflowID, cronjobName, schedule)
Expand Down Expand Up @@ -811,32 +817,77 @@ func (eng *aqEngine) executeWithEngine(
timeConfig,
opExecMode,
databricksJobManager,
vaultObject,
eng.IntegrationRepo,
eng.Database,
)
default:
return eng.execute(

ctx,
dag,
workflowRunMetadata,
timeConfig,
vaultObject,
opExecMode,
)
}
}

// onFinishExecution is the shared teardown step for a workflow run. It first
// waits for any operators that are still in flight, and then, if there is
// something to report and the run was executed in Publish mode, dispatches
// notifications for the given dag.
//
// When the caller did not prepare any notification content but the run ended
// with curErr, an error-level notification carrying curErr's message is
// synthesized. A failure to send notifications is only logged; it is never
// propagated, since this function has no return value.
func onFinishExecution(
	ctx context.Context,
	inProgressOps map[uuid.UUID]operator.Operator,
	pollInterval time.Duration,
	cleanupTimeout time.Duration,
	curErr error,
	notificationContent *notificationContentStruct,
	dag dag_utils.WorkflowDag,
	execMode operator.ExecutionMode,
	vaultObject vault.Vault,
	integrationRepo repos.Integration,
	DB database.Database,
) {
	// Give active operators a short grace period to finish before we wrap up.
	waitForInProgressOperators(ctx, inProgressOps, pollInterval, cleanupTimeout)

	// Fall back to an error notification built from curErr when nothing more
	// specific was supplied by the caller.
	content := notificationContent
	if content == nil && curErr != nil {
		content = &notificationContentStruct{
			level:      mdl_shared.ErrorNotificationLevel,
			contextMsg: curErr.Error(),
		}
	}

	// Nothing to report, or the run was not executed in Publish mode.
	if content == nil || execMode != operator.Publish {
		return
	}

	sendErr := sendNotifications(ctx, dag, content, vaultObject, integrationRepo, DB)
	if sendErr != nil {
		log.Errorf("Error sending notifications: %s", sendErr)
	}
}

func (eng *aqEngine) execute(
ctx context.Context,
workflowDag dag_utils.WorkflowDag,
workflowRunMetadata *WorkflowRunMetadata,
timeConfig *AqueductTimeConfig,
vaultObject vault.Vault,
opExecMode operator.ExecutionMode,
) error {
) (err error) {
// These are the operators of immediate interest. They either need to be scheduled or polled on.
inProgressOps := workflowRunMetadata.InProgressOps
completedOps := workflowRunMetadata.CompletedOps
dag := workflowDag
opToDependencyCount := workflowRunMetadata.OpToDependencyCount

var notificationContent *notificationContentStruct = nil
err = nil

// Kick off execution by starting all operators that don't have any inputs.
for _, op := range dag.Operators() {
if opToDependencyCount[op.ID()] == 0 {
Expand All @@ -848,8 +899,21 @@ func (eng *aqEngine) execute(
return errors.Newf("No initial operators to schedule.")
}

// Wait a little bit for all active operators to finish before exiting on failure.
defer waitForInProgressOperators(ctx, inProgressOps, timeConfig.OperatorPollInterval, timeConfig.CleanupTimeout)
defer func() {
onFinishExecution(
ctx,
inProgressOps,
timeConfig.OperatorPollInterval,
timeConfig.CleanupTimeout,
err,
notificationContent,
workflowDag,
opExecMode,
vaultObject,
eng.IntegrationRepo,
eng.Database,
)
}()

start := time.Now()

Expand Down Expand Up @@ -909,7 +973,27 @@ func (eng *aqEngine) execute(
}
}

notificationCtxMsg := ""
if execState.Error != nil {
notificationCtxMsg = fmt.Sprintf("%s\nContext:\n%s", execState.Error.Tip, execState.Error.Context)
}

notificationContent = &notificationContentStruct{
level: mdl_shared.ErrorNotificationLevel,
contextMsg: notificationCtxMsg,
}

return opFailureError(*execState.FailureType, op)
} else if execState.Status == shared.FailedExecutionStatus {
notificationCtxMsg := ""
if execState.Error != nil {
notificationCtxMsg = fmt.Sprintf("%s\nContext:\n%s", execState.Error.Tip, execState.Error.Context)
}

notificationContent = &notificationContentStruct{
level: mdl_shared.WarningNotificationLevel,
contextMsg: notificationCtxMsg,
}
}

// Add the operator to the completed stack, and remove it from the in-progress one.
Expand Down Expand Up @@ -961,6 +1045,13 @@ func (eng *aqEngine) execute(
return errors.Newf("Internal error: operator %s has a non-zero dep count %d.", opID, depCount)
}
}

// avoid overriding an existing notification (in practice, this is a warning)
if notificationContent == nil {
notificationContent = &notificationContentStruct{
level: mdl_shared.SuccessNotificationLevel,
}
}
return nil
}

Expand Down
63 changes: 58 additions & 5 deletions src/golang/lib/engine/databricks_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (
"time"

"github.com/aqueducthq/aqueduct/lib/collections/shared"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/job"
mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/aqueducthq/aqueduct/lib/repos"
"github.com/aqueducthq/aqueduct/lib/vault"
dag_utils "github.com/aqueducthq/aqueduct/lib/workflow/dag"
"github.com/aqueducthq/aqueduct/lib/workflow/operator"
"github.com/databricks/databricks-sdk-go/service/jobs"
Expand All @@ -28,9 +32,14 @@ func ExecuteDatabricks(
timeConfig *AqueductTimeConfig,
opExecMode operator.ExecutionMode,
databricksJobManager *job.DatabricksJobManager,
) error {
vaultObject vault.Vault,
integrationRepo repos.Integration,
DB database.Database,
) (err error) {
inProgressOps := workflowRunMetadata.InProgressOps
completedOps := workflowRunMetadata.CompletedOps
var notificationContent *notificationContentStruct = nil
err = nil

// Convert the operators into tasks
taskList, err := CreateTaskList(ctx, dag, workflowName, databricksJobManager)
Expand All @@ -56,8 +65,21 @@ func ExecuteDatabricks(
return errors.Newf("No initial operators to schedule.")
}

// Wait a little bit for all active operators to finish before exiting on failure.
defer waitForInProgressOperators(ctx, inProgressOps, timeConfig.OperatorPollInterval, timeConfig.CleanupTimeout)
defer func() {
onFinishExecution(
ctx,
inProgressOps,
timeConfig.OperatorPollInterval,
timeConfig.CleanupTimeout,
err,
notificationContent,
dag,
opExecMode,
vaultObject,
integrationRepo,
DB,
)
}()

start := time.Now()
var operatorError error
Expand All @@ -81,10 +103,32 @@ func ExecuteDatabricks(
return errors.Wrapf(err, "Error when finishing execution of operator %s", op.Name())
}
}

// Capture the first failed operator.
if shouldStopExecution(execState) {
if operatorError == nil {
operatorError = opFailureError(*execState.FailureType, op)
}

if shouldStopExecution(execState) && operatorError == nil {
operatorError = opFailureError(*execState.FailureType, op)
notificationCtxMsg := ""
if execState.Error != nil {
notificationCtxMsg = fmt.Sprintf("%s\nContext:\n%s", execState.Error.Tip, execState.Error.Context)
}

notificationContent = &notificationContentStruct{
level: mdl_shared.ErrorNotificationLevel,
contextMsg: notificationCtxMsg,
}
} else if execState.Status == shared.FailedExecutionStatus {
notificationCtxMsg := ""
if execState.Error != nil {
notificationCtxMsg = fmt.Sprintf("%s\nContext:\n%s", execState.Error.Tip, execState.Error.Context)
}

notificationContent = &notificationContentStruct{
level: mdl_shared.WarningNotificationLevel,
contextMsg: notificationCtxMsg,
}
}

// Add the operator to the completed stack, and remove it from the in-progress one.
Expand All @@ -100,9 +144,18 @@ func ExecuteDatabricks(
if len(completedOps) != len(dag.Operators()) {
return errors.Newf("Internal error: %d operators were provided but only %d completed.", len(dag.Operators()), len(completedOps))
}

if operatorError != nil {
return operatorError
}

// avoid overriding an existing notification (in practice, this is a warning)
if notificationContent == nil {
notificationContent = &notificationContentStruct{
level: mdl_shared.SuccessNotificationLevel,
}
}

return nil
}

Expand Down
Loading

0 comments on commit 17567bc

Please sign in to comment.