diff --git a/pkg/mimo/actuator/actuator_test.go b/pkg/mimo/actuator/actuator_test.go index e9ad0efe7cc..27b20605224 100644 --- a/pkg/mimo/actuator/actuator_test.go +++ b/pkg/mimo/actuator/actuator_test.go @@ -10,9 +10,10 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + + "github.com/golang/mock/gomock" "github.com/sirupsen/logrus" "github.com/Azure/ARO-RP/pkg/api" @@ -57,7 +58,6 @@ var _ = Describe("MIMO Actuator", Ordered, func() { }) BeforeAll(func() { - controller = gomock.NewController(nil) _env = mock_env.NewMockInterface(controller) @@ -133,7 +133,6 @@ var _ = Describe("MIMO Actuator", Ordered, func() { errs := checker.CheckMaintenanceManifests(manifestsClient) Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs)) - }) }) diff --git a/pkg/mimo/actuator/controller.go b/pkg/mimo/actuator/controller.go new file mode 100644 index 00000000000..b3314e1c307 --- /dev/null +++ b/pkg/mimo/actuator/controller.go @@ -0,0 +1,59 @@ +package actuator + +// Copyright (c) Microsoft Corporation. +// Licensed under the Apache License 2.0. + +import ( + "context" + + "github.com/Azure/ARO-RP/pkg/api" +) + +// controller updates the monitor document with the list of buckets balanced between +// registered workers +func (s *service) controller(ctx context.Context) error { + var doc *api.BucketServiceDocument + var err error + + // if we know we're not the controller, attempt to gain the lease on the monitor + // document + if !s.isController { + doc, err = s.dbBucketServices.TryLease(ctx, s.serviceName) + if err != nil || doc == nil { + return err + } + s.isController = true + } + + // we know we're not the controller; give up + if !s.isController { + return nil + } + + // we think we're the controller. Gather up all the registered workers + // including ourself, balance buckets between them and write the bucket + // allocations to the database. If it turns out that we're not the controller, + // the patch will fail + _, err = s.dbBucketServices.PatchWithLease(ctx, doc.ID, func(doc *api.BucketServiceDocument) error { + docs, err := s.dbBucketServices.ListBucketServices(ctx, s.serviceName) + if err != nil { + return err + } + + var workers []string + if docs != nil { + workers = make([]string, 0, len(docs.BucketServiceDocuments)) + for _, doc := range docs.BucketServiceDocuments { + workers = append(workers, doc.ID) + } + } + + doc.Buckets = s.b.Balance(workers, doc.Buckets) + + return nil + }) + if err != nil && err.Error() == "lost lease" { + s.isController = false + } + return err +} diff --git a/pkg/mimo/actuator/manager.go b/pkg/mimo/actuator/manager.go index dbcc9a14132..b78531c9f07 100644 --- a/pkg/mimo/actuator/manager.go +++ b/pkg/mimo/actuator/manager.go @@ -5,38 +5,132 @@ package actuator import ( "context" - "sync" + "fmt" + "strings" + "time" + "github.com/Azure/go-autorest/autorest/to" "github.com/sirupsen/logrus" + "k8s.io/client-go/rest" + "github.com/Azure/ARO-RP/pkg/api" "github.com/Azure/ARO-RP/pkg/database" "github.com/Azure/ARO-RP/pkg/env" - "github.com/Azure/ARO-RP/pkg/util/service" ) +type Actuator interface { + Process(context.Context, *api.OpenShiftClusterDocument) (bool, error) + AddTask(string, TaskFunc) +} + type actuator struct { - env env.Interface - log *logrus.Entry + env env.Interface + log *logrus.Entry + restConfig *rest.Config + now func() time.Time + + oc database.OpenShiftClusters + mmf database.MaintenanceManifests - oc database.OpenShiftClusters - q service.Runnable + tasks map[string]TaskFunc } func NewActuator( ctx context.Context, _env env.Interface, log *logrus.Entry, - oc database.OpenShiftClusters) (service.Runnable, error) { + restConfig *rest.Config, + oc database.OpenShiftClusters, + mmf database.MaintenanceManifests) (Actuator, error) { a := &actuator{ - env: _env, - log: log, - oc: oc, + env: _env, + restConfig: restConfig, + log: log, + oc: oc, + mmf: mmf, + tasks: make(map[string]TaskFunc), + + now: time.Now, } - a.q = service.NewWorkerQueue(ctx, log, _env, a.try) - return a.q, nil + return a, nil +} + +func (a *actuator) AddTask(u string, t TaskFunc) { + a.tasks[u] = t } -func (a *actuator) try(ctx context.Context, c *sync.Cond) (bool, error) { - return false, nil +func (a *actuator) Process(ctx context.Context, doc *api.OpenShiftClusterDocument) (bool, error) { + var err error + cID := strings.ToLower(doc.OpenShiftCluster.ID) + + release := func() { + if doc.LeaseOwner == "" { + return + } + doc, err = a.oc.EndLease(ctx, doc.ResourceID, doc.OpenShiftCluster.Properties.ProvisioningState, doc.OpenShiftCluster.Properties.LastProvisioningState, nil) + if err != nil { + a.log.Error(err) + } + } + + didWork := false + + for { + task, err := a.mmf.Dequeue(ctx, cID) + if err != nil { + return didWork, err + } + + if task == nil { + break + } + + // renew/get the lease on the OpenShiftClusterDocument + doc, err = a.oc.Lease(ctx, cID) + if err != nil { + if err.Error() == "lost lease" { + return false, nil + } + return false, err + } + defer release() + + evaluationTime := a.now() + + if evaluationTime.Before(time.Unix(int64(task.MaintenanceManifest.RunAfter), 0)) { + continue + } + + if evaluationTime.After(time.Unix(int64(task.MaintenanceManifest.RunBefore), 0)) { + // timed out, mark as such + a.log.Infof("marking %v as outdated: %v older than %v", task.ID, task.MaintenanceManifest.RunBefore, evaluationTime.UTC()) + + _, err := a.mmf.EndLease(ctx, cID, task.ID, api.MaintenanceManifestStateTimedOut, to.StringPtr(fmt.Sprintf("timed out at %s", evaluationTime.UTC()))) + if err != nil { + return false, err + } + continue + } + + // here + f, ok := a.tasks[task.MaintenanceManifest.MaintenanceSetID] + if !ok { + a.log.Infof("not found %v", task.MaintenanceManifest.MaintenanceSetID) + } + + handler := &th{ + db: a.oc, + env: a.env, + } + state, msg := f(ctx, handler, doc, task) + _, err = a.mmf.EndLease(ctx, cID, task.ID, state, &msg) + if err != nil { + a.log.Error(err) + } + + didWork = true + } + + return didWork, nil } diff --git a/pkg/mimo/actuator/service.go b/pkg/mimo/actuator/service.go new file mode 100644 index 00000000000..94157999df9 --- /dev/null +++ b/pkg/mimo/actuator/service.go @@ -0,0 +1,149 @@ +package actuator + +// Copyright (c) Microsoft Corporation. +// Licensed under the Apache License 2.0. + +import ( + "context" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" + + "github.com/Azure/ARO-RP/pkg/api" + "github.com/Azure/ARO-RP/pkg/database" + "github.com/Azure/ARO-RP/pkg/database/cosmosdb" + "github.com/Azure/ARO-RP/pkg/env" + "github.com/Azure/ARO-RP/pkg/metrics" + "github.com/Azure/ARO-RP/pkg/proxy" + "github.com/Azure/ARO-RP/pkg/util/buckets" + "github.com/Azure/ARO-RP/pkg/util/heartbeat" + "github.com/Azure/ARO-RP/pkg/util/recover" +) + +type Runnable interface { + Run(context.Context, <-chan struct{}, chan<- struct{}) error +} + +type service struct { + dialer proxy.Dialer + baseLog *logrus.Entry + env env.Interface + + dbBucketServices database.BucketServices + dbOpenShiftClusters database.OpenShiftClusters + dbMaintenanceManifests database.MaintenanceManifests + serviceName string + + m metrics.Emitter + mu sync.RWMutex + b buckets.BucketWorker + isController bool + stopping *atomic.Bool + + lastChangefeed atomic.Value //time.Time + lastBucketlist atomic.Value //time.Time + startTime time.Time +} + +func NewService(log *logrus.Entry, dialer proxy.Dialer, dbBucketServices database.BucketServices, dbOpenShiftClusters database.OpenShiftClusters, dbMaintenanceManifests database.MaintenanceManifests, m metrics.Emitter) Runnable { + s := &service{ + baseLog: log, + dialer: dialer, + + dbBucketServices: dbBucketServices, + dbOpenShiftClusters: dbOpenShiftClusters, + + m: m, + serviceName: "actuator", + stopping: &atomic.Bool{}, + + startTime: time.Now(), + } + + s.b = buckets.NewBucketWorker(log, s.worker, &s.mu) + return s +} + +func (s *service) Run(ctx context.Context, stop <-chan struct{}, done chan<- struct{}) error { + defer recover.Panic(s.baseLog) + + _, err := s.dbBucketServices.Create(ctx, &api.BucketServiceDocument{ + ServiceName: s.serviceName, + ServiceRole: "controller", + }) + if err != nil && !cosmosdb.IsErrorStatusCode(err, http.StatusPreconditionFailed) { + return err + } + + // fill the cache from the database change feed + go s.changefeed(ctx, s.baseLog.WithField("component", "changefeed"), nil) + + t := time.NewTicker(10 * time.Second) + defer t.Stop() + + if stop != nil { + go func() { + defer recover.Panic(s.baseLog) + + <-stop + s.baseLog.Print("stopping") + s.stopping.Store(true) + }() + } + + go heartbeat.EmitHeartbeat(s.baseLog, s.m, s.serviceName+".heartbeat", nil, s.checkReady) + + for { + if s.stopping.Load() { + break + } + + // register ourself as a worker + err = s.dbBucketServices.BucketServiceHeartbeat(ctx, s.serviceName) + if err != nil { + s.baseLog.Error(err) + } + + // try to become controller and share buckets across registered monitors + err = s.controller(ctx) + if err != nil { + s.baseLog.Error(err) + } + + // read our bucket allocation from the controller + buckets, err := s.dbBucketServices.ListBuckets(ctx, s.serviceName) + s.b.LoadBuckets(buckets) + if err != nil { + s.baseLog.Error(err) + } else { + s.lastBucketlist.Store(time.Now()) + } + + <-t.C + } + + s.baseLog.Print("exiting") + close(done) + return nil +} + +// checkReady checks the ready status of the monitor to make it consistent +// across the /healthz/ready endpoint and emitted metrics. We wait for 2 +// minutes before indicating health. This ensures that there will be a gap in +// our health metric if we crash or restart. +func (s *service) checkReady() bool { + lastBucketTime, ok := s.lastBucketlist.Load().(time.Time) + if !ok { + return false + } + lastChangefeedTime, ok := s.lastChangefeed.Load().(time.Time) + if !ok { + return false + } + return (time.Since(lastBucketTime) < time.Minute) && // did we list buckets successfully recently? + (time.Since(lastChangefeedTime) < time.Minute) && // did we process the change feed recently? + (time.Since(s.startTime) > 2*time.Minute) // are we running for at least 2 minutes? +} diff --git a/pkg/mimo/actuator/task.go b/pkg/mimo/actuator/task.go new file mode 100644 index 00000000000..ffc25fbc582 --- /dev/null +++ b/pkg/mimo/actuator/task.go @@ -0,0 +1,32 @@ +package actuator + +// Copyright (c) Microsoft Corporation. +// Licensed under the Apache License 2.0. + +import ( + "context" + + "github.com/Azure/ARO-RP/pkg/api" + "github.com/Azure/ARO-RP/pkg/database" + "github.com/Azure/ARO-RP/pkg/env" +) + +type TaskHandler interface { + OpenShiftDatabase() database.OpenShiftClusters + Environment() env.Interface +} + +type th struct { + db database.OpenShiftClusters + env env.Interface +} + +func (t *th) OpenShiftDatabase() database.OpenShiftClusters { + return t.db +} + +func (t *th) Environment() env.Interface { + return t.env +} + +type TaskFunc func(context.Context, TaskHandler, *api.OpenShiftClusterDocument, *api.MaintenanceManifestDocument) (api.MaintenanceManifestState, string) diff --git a/pkg/mimo/actuator/worker.go b/pkg/mimo/actuator/worker.go new file mode 100644 index 00000000000..8d5810dce52 --- /dev/null +++ b/pkg/mimo/actuator/worker.go @@ -0,0 +1,152 @@ +package actuator + +// Copyright (c) Microsoft Corporation. +// Licensed under the Apache License 2.0. + +import ( + "context" + "time" + + "github.com/sirupsen/logrus" + + "github.com/Azure/ARO-RP/pkg/api" + utillog "github.com/Azure/ARO-RP/pkg/util/log" + "github.com/Azure/ARO-RP/pkg/util/recover" + "github.com/Azure/ARO-RP/pkg/util/restconfig" +) + +// changefeed tracks the OpenShiftClusters change feed and keeps mon.docs +// up-to-date. We don't monitor clusters in Creating state, hence we don't add +// them to mon.docs. We also don't monitor clusters in Deleting state; when +// this state is reached we delete from mon.docs +func (s *service) changefeed(ctx context.Context, baseLog *logrus.Entry, stop <-chan struct{}) { + defer recover.Panic(baseLog) + + clustersIterator := s.dbOpenShiftClusters.ChangeFeed() + + // Align this time with the deletion mechanism. + // Go to docs/monitoring.md for the details. + t := time.NewTicker(10 * time.Second) + defer t.Stop() + + for { + successful := true + for { + docs, err := clustersIterator.Next(ctx, -1) + if err != nil { + successful = false + baseLog.Error(err) + break + } + if docs == nil { + break + } + + s.mu.Lock() + + for _, doc := range docs.OpenShiftClusterDocuments { + ps := doc.OpenShiftCluster.Properties.ProvisioningState + fps := doc.OpenShiftCluster.Properties.FailedProvisioningState + + switch { + case ps == api.ProvisioningStateCreating, + ps == api.ProvisioningStateDeleting, + ps == api.ProvisioningStateFailed && + (fps == api.ProvisioningStateCreating || + fps == api.ProvisioningStateDeleting): + s.b.DeleteDoc(doc) + default: + // TODO: improve memory usage by storing a subset of doc in mon.docs + s.b.UpsertDoc(doc) + } + } + + s.mu.Unlock() + } + + if successful { + s.lastChangefeed.Store(time.Now()) + } + + select { + case <-t.C: + case <-stop: + return + } + } +} + +// worker reads clusters to be monitored and monitors them +func (s *service) worker(stop <-chan struct{}, delay time.Duration, id string) { + defer recover.Panic(s.baseLog) + + time.Sleep(delay) + + log := s.baseLog + { + s.mu.RLock() + v := s.b.Doc(id) + s.mu.RUnlock() + + if v == nil { + return + } + + log = utillog.EnrichWithResourceID(log, v.OpenShiftCluster.ID) + } + + log.Debug("starting service") + + t := time.NewTicker(time.Minute) + defer t.Stop() + + h := time.Now().Hour() + +out: + for { + s.mu.RLock() + v := s.b.Doc(id) + s.mu.RUnlock() + + if v == nil { + break + } + + newh := time.Now().Hour() + + s.workOne(context.Background(), log, v, newh != h) + + select { + case <-t.C: + case <-stop: + break out + } + + h = newh + } + + log.Debug("stopping actuator") +} + +// workOne checks the API server health of a cluster +func (s *service) workOne(ctx context.Context, log *logrus.Entry, doc *api.OpenShiftClusterDocument, hourlyRun bool) { + restConfig, err := restconfig.RestConfig(s.dialer, doc.OpenShiftCluster) + if err != nil { + log.Error(err) + return + } + + m, err := NewActuator(ctx, s.env, log, restConfig, s.dbOpenShiftClusters, s.dbMaintenanceManifests) + if err != nil { + log.Error(err) + s.m.EmitGauge("actuator.cluster.failedworker", 1, map[string]string{ + "resourceId": doc.OpenShiftCluster.ID, + }) + return + } + + _, err = m.Process(ctx, doc) + if err != nil { + log.Error(err) + } +} diff --git a/pkg/mimo/cmd/cli.go b/pkg/mimo/cmd/cli.go index 22f750e42dc..7023c164f76 100644 --- a/pkg/mimo/cmd/cli.go +++ b/pkg/mimo/cmd/cli.go @@ -17,6 +17,7 @@ import ( "github.com/Azure/ARO-RP/pkg/metrics/statsd" "github.com/Azure/ARO-RP/pkg/metrics/statsd/golang" "github.com/Azure/ARO-RP/pkg/mimo/actuator" + "github.com/Azure/ARO-RP/pkg/proxy" utillog "github.com/Azure/ARO-RP/pkg/util/log" "github.com/Azure/ARO-RP/pkg/util/service" ) @@ -130,26 +131,38 @@ func main() { return err } + buckets, err := database.NewBucketServices(ctx.Context, dbc, dbName) + if err != nil { + return err + } + clusters, err := database.NewOpenShiftClusters(ctx.Context, dbc, dbName) if err != nil { return err } - a, err := actuator.NewActuator(ctx.Context, _env, _env.Logger(), clusters) + manifests, err := database.NewMaintenanceManifests(ctx.Context, dbc, dbName) if err != nil { return err } + dialer, err := proxy.NewDialer(_env.IsLocalDevelopmentMode()) + if err != nil { + return err + } + + a := actuator.NewService(_env.Logger(), dialer, buckets, clusters, manifests, m) + sigterm := make(chan os.Signal, 1) + done := make(chan struct{}) signal.Notify(sigterm, syscall.SIGTERM) - done := make(chan struct{}) go a.Run(ctx.Context, stop, done) <-sigterm log.Print("received SIGTERM") close(stop) - <-done + //<-done return nil },