Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/2][backend] Eng 2196 m2 allow user to configure notifications #935

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions src/golang/cmd/server/handler/edit_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
aq_context "github.com/aqueducthq/aqueduct/lib/context"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/engine"
"github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/aqueducthq/aqueduct/lib/repos"
"github.com/aqueducthq/aqueduct/lib/workflow"
"github.com/aqueducthq/aqueduct/lib/workflow/utils"
Expand Down Expand Up @@ -43,18 +44,20 @@ type EditWorkflowHandler struct {
}

type editWorkflowInput struct {
WorkflowName string `json:"name"`
WorkflowDescription string `json:"description"`
Schedule *col_workflow.Schedule `json:"schedule"`
RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"`
WorkflowName string `json:"name"`
WorkflowDescription string `json:"description"`
Schedule *col_workflow.Schedule `json:"schedule"`
RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"`
NotificationSettings *shared.NotificationSettings `json:"notification_settings"`
}

type editWorkflowArgs struct {
workflowId uuid.UUID
workflowName string
workflowDescription string
schedule *col_workflow.Schedule
retentionPolicy *col_workflow.RetentionPolicy
workflowId uuid.UUID
workflowName string
workflowDescription string
schedule *col_workflow.Schedule
retentionPolicy *col_workflow.RetentionPolicy
notificationSettings *shared.NotificationSettings
}

func (*EditWorkflowHandler) Name() string {
Expand Down Expand Up @@ -116,11 +119,12 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error)
}

return &editWorkflowArgs{
workflowId: workflowID,
workflowName: input.WorkflowName,
workflowDescription: input.WorkflowDescription,
schedule: input.Schedule,
retentionPolicy: input.RetentionPolicy,
workflowId: workflowID,
workflowName: input.WorkflowName,
workflowDescription: input.WorkflowDescription,
schedule: input.Schedule,
retentionPolicy: input.RetentionPolicy,
notificationSettings: input.NotificationSettings,
}, http.StatusOK, nil
}

Expand Down Expand Up @@ -173,6 +177,7 @@ func (h *EditWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfa
args.workflowDescription,
args.schedule,
args.retentionPolicy,
args.notificationSettings,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.")
Expand Down
1 change: 1 addition & 0 deletions src/golang/cmd/server/handler/register_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (h *RegisterWorkflowHandler) Perform(ctx context.Context, interfaceArgs int
dbWorkflowDag.Metadata.Description,
&dbWorkflowDag.Metadata.Schedule,
&dbWorkflowDag.Metadata.RetentionPolicy,
&dbWorkflowDag.Metadata.NotificationSettings,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.")
Expand Down
5 changes: 5 additions & 0 deletions src/golang/lib/engine/aq_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ func (eng *aqEngine) EditWorkflow(
workflowDescription string,
schedule *workflow.Schedule,
retentionPolicy *workflow.RetentionPolicy,
notificationSettings *mdl_shared.NotificationSettings,
) error {
changes := map[string]interface{}{}
if workflowName != "" {
Expand All @@ -673,6 +674,10 @@ func (eng *aqEngine) EditWorkflow(
changes[models.WorkflowRetentionPolicy] = retentionPolicy
}

if notificationSettings != nil {
changes[models.WorkflowNotificationSettings] = notificationSettings
}

if schedule.Trigger != "" {
cronjobName := shared_utils.AppendPrefix(workflowID.String())
err := eng.updateWorkflowSchedule(ctx, workflowID, cronjobName, schedule)
Expand Down
2 changes: 2 additions & 0 deletions src/golang/lib/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/aqueducthq/aqueduct/lib/database"
exec_env "github.com/aqueducthq/aqueduct/lib/execution_environment"
"github.com/aqueducthq/aqueduct/lib/models"
mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/dropbox/godropbox/errors"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -50,6 +51,7 @@ type Engine interface {
workflowDescription string,
schedule *workflow.Schedule,
retentionPolicy *workflow.RetentionPolicy,
notificationSettings *mdl_shared.NotificationSettings,
) error

// TODO ENG-1444: Used as a wrapper to trigger a workflow via executor binary.
Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/engine/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func sendNotifications(
}

msg := notificationMsg(wfDag.Name(), content.level, content.contextMsg)
workflowSettings := wfDag.NotificationSettings()
workflowSettings := wfDag.NotificationSettings().Settings
for _, notificationObj := range notifications {
if len(workflowSettings) > 0 {
// send based on settings
Expand Down
32 changes: 6 additions & 26 deletions src/golang/lib/models/shared/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,20 @@ import (
)

// `NotificationSettings` maps IntegrationID to NotificationLevel
type NotificationSettings map[uuid.UUID]NotificationLevel
// This has to be a struct since sql driver does not support map type.
type NotificationSettings struct {
Settings map[uuid.UUID]NotificationLevel `json:"settings"`
}

func (s *NotificationSettings) Value() (driver.Value, error) {
return utils.ValueJSONB(*s)
}

func (s *NotificationSettings) Scan(value interface{}) error {
return utils.ScanJSONB(value, s)
}

type NullNotificationSettings struct {
NotificationSettings
IsNull bool
}

func (n *NullNotificationSettings) Value() (driver.Value, error) {
if n.IsNull {
return nil, nil
}

return (&n.NotificationSettings).Value()
}

func (n *NullNotificationSettings) Scan(value interface{}) error {
if value == nil {
n.IsNull = true
s.Settings = nil
return nil
}

s := &NotificationSettings{}
if err := s.Scan(value); err != nil {
return err
}

n.NotificationSettings, n.IsNull = *s, false
return nil
return utils.ScanJSONB(value, s)
}
16 changes: 8 additions & 8 deletions src/golang/lib/models/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ const (

// A Workflow maps to the workflow table.
type Workflow struct {
ID uuid.UUID `db:"id" json:"id"`
UserID uuid.UUID `db:"user_id" json:"user_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Schedule workflow.Schedule `db:"schedule" json:"schedule"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
RetentionPolicy workflow.RetentionPolicy `db:"retention_policy" json:"retention_policy"`
NotificationSettings shared.NullNotificationSettings `db:"notification_settings" json:"notification_settings"`
ID uuid.UUID `db:"id" json:"id"`
UserID uuid.UUID `db:"user_id" json:"user_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Schedule workflow.Schedule `db:"schedule" json:"schedule"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
RetentionPolicy workflow.RetentionPolicy `db:"retention_policy" json:"retention_policy"`
NotificationSettings shared.NotificationSettings `db:"notification_settings" json:"notification_settings"`
}

// WorkflowCols returns a comma-separated string of all Workflow columns.
Expand Down
27 changes: 25 additions & 2 deletions src/golang/lib/notification/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package notification
import (
"context"
"crypto/tls"
"fmt"
"net/smtp"
"strings"

"github.com/aqueducthq/aqueduct/lib/models"
"github.com/aqueducthq/aqueduct/lib/models/shared"
Expand All @@ -27,9 +29,30 @@ func (e *EmailNotification) Level() shared.NotificationLevel {
return e.conf.Level
}

func fullMessage(subject string, from string, targets []string, body string) string {
fullMsg := fmt.Sprintf("From: %s\n", from)
fullMsg += fmt.Sprintf("To: %s\n", strings.Join(targets, ","))
fullMsg += fmt.Sprintf("Subject: %s\n\n", subject)
fullMsg += body
return fullMsg
}

func (e *EmailNotification) Send(ctx context.Context, msg string) error {
// TODO: Implement
return nil
auth := smtp.PlainAuth(
"", // identity
e.conf.User,
e.conf.Password,
e.conf.Host,
)

fullMsg := fullMessage("aqueduct notification", e.conf.User, e.conf.Targets, msg)
return smtp.SendMail(
e.conf.FullHost(),
auth,
e.conf.User,
e.conf.Targets,
[]byte(fullMsg),
)
}

func AuthenticateEmail(conf *shared.EmailConfig) error {
Expand Down
75 changes: 74 additions & 1 deletion src/golang/lib/notification/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (

"github.com/aqueducthq/aqueduct/lib/models"
"github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/dropbox/godropbox/errors"
"github.com/google/uuid"
"github.com/slack-go/slack"
)

const maxChannelLimit = 2000

type SlackNotification struct {
integration *models.Integration
conf *shared.SlackConfig
Expand All @@ -26,13 +29,83 @@ func (s *SlackNotification) Level() shared.NotificationLevel {
return s.conf.Level
}

// reference: https://stackoverflow.com/questions/50106263/slack-api-to-find-existing-channel
// We have to use list channel API together with a linear search.
func findChannels(client *slack.Client, names []string) ([]slack.Channel, error) {
channels, _ /* cursor */, err := client.GetConversations(&slack.GetConversationsParameters{
ExcludeArchived: true,
Limit: maxChannelLimit,
})
if err != nil {
return nil, err
}

namesSet := make(map[string]bool, len(names))
namesTaken := make(map[string]bool, len(names))
for _, name := range names {
namesSet[name] = true
}

// Slack channel names should be unique. We will still send notifications to all
// channels matching the given name.
results := make([]slack.Channel, 0, len(names))
for _, channel := range channels {
_, ok := namesSet[channel.Name]
if ok {
results = append(results, channel)
namesTaken[channel.Name] = true
}
}

if len(namesTaken) != len(namesSet) {
for name := range namesSet {
_, ok := namesTaken[name]
if !ok {
return nil, errors.Newf("Channel %s does not exist.", name)
}
}
}

return results, nil
}

func (s *SlackNotification) Send(ctx context.Context, msg string) error {
// TODO: Implement
client := slack.New(s.conf.Token)
channels, err := findChannels(client, s.conf.Channels)
if err != nil {
return err
}

for _, channel := range channels {
// reference: https://medium.com/@gausha/a-simple-slackbot-with-golang-c5a932d719c7
_, _, _, err = client.SendMessage(channel.ID, slack.MsgOptionBlocks(
slack.NewSectionBlock(
slack.NewTextBlockObject(
"plain_text",
msg,
false, /* emoji */
false, /* verbatim */
),
nil,
nil,
),
))

if err != nil {
return err
}
}

return nil
}

func AuthenticateSlack(conf *shared.SlackConfig) error {
client := slack.New(conf.Token)
_, err := client.AuthTest()
if err != nil {
return err
}

_, err = findChannels(client, conf.Channels)
return err
}
5 changes: 4 additions & 1 deletion src/golang/lib/repos/sqlite/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/aqueducthq/aqueduct/lib/collections/workflow"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/models"
mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/aqueducthq/aqueduct/lib/models/views"
"github.com/aqueducthq/aqueduct/lib/repos"
"github.com/dropbox/godropbox/errors"
Expand Down Expand Up @@ -253,6 +254,7 @@ func (*workflowWriter) Create(
description string,
schedule *workflow.Schedule,
retentionPolicy *workflow.RetentionPolicy,
notificationSettings *mdl_shared.NotificationSettings,
DB database.Database,
) (*models.Workflow, error) {
cols := []string{
Expand All @@ -263,6 +265,7 @@ func (*workflowWriter) Create(
models.WorkflowSchedule,
models.WorkflowCreatedAt,
models.WorkflowRetentionPolicy,
models.WorkflowNotificationSettings,
}
query := DB.PrepareInsertWithReturnAllStmt(models.WorkflowTable, cols, models.WorkflowCols())

Expand All @@ -271,7 +274,7 @@ func (*workflowWriter) Create(
return nil, err
}

args := []interface{}{ID, userID, name, description, schedule, time.Now(), retentionPolicy}
args := []interface{}{ID, userID, name, description, schedule, time.Now(), retentionPolicy, notificationSettings}
return getWorkflow(ctx, DB, query, args...)
}

Expand Down
1 change: 1 addition & 0 deletions src/golang/lib/repos/tests/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (ts *TestSuite) seedWorkflowWithUser(count int, userIDs []uuid.UUID) []mode
description,
schedule,
retentionPolicy,
&shared.NotificationSettings{},
ts.DB,
)
require.Nil(ts.T(), err)
Expand Down
Loading