Commit 71f9889

mimo changes

hawkowl committed Oct 25, 2023
1 parent 9135e8d commit 71f9889
Showing 7 changed files with 518 additions and 20 deletions.
5 changes: 2 additions & 3 deletions pkg/mimo/actuator/actuator_test.go
@@ -10,9 +10,10 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/golang/mock/gomock"
"github.com/sirupsen/logrus"

"github.com/Azure/ARO-RP/pkg/api"
@@ -57,7 +58,6 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
})

BeforeAll(func() {

controller = gomock.NewController(nil)
_env = mock_env.NewMockInterface(controller)

@@ -133,7 +133,6 @@ var _ = Describe("MIMO Actuator", Ordered, func() {

errs := checker.CheckMaintenanceManifests(manifestsClient)
Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))

})
})

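An aside on the test wiring above: gomock.NewController(nil) passes a nil TestReporter, so failed mock expectations will not be reported through Ginkgo. The conventional bridge is GinkgoT(), sketched here:

package actuator

import (
	"github.com/golang/mock/gomock"
	. "github.com/onsi/ginkgo/v2"
)

var _ = Describe("example wiring", func() {
	var controller *gomock.Controller

	BeforeEach(func() {
		// GinkgoT() satisfies gomock.TestReporter, so expectation failures
		// surface as Ginkgo test failures instead of being dropped
		controller = gomock.NewController(GinkgoT())
	})

	AfterEach(func() {
		controller.Finish()
	})
})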
59 changes: 59 additions & 0 deletions pkg/mimo/actuator/controller.go
@@ -0,0 +1,59 @@
package actuator

// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.

import (
"context"

"github.com/Azure/ARO-RP/pkg/api"
)

// controller updates the bucket service document with the list of buckets
// balanced between registered workers
func (s *service) controller(ctx context.Context) error {
var doc *api.BucketServiceDocument
var err error

	// if we know we're not the controller, attempt to gain the lease on the
	// bucket service document
	if !s.isController {
		doc, err = s.dbBucketServices.TryLease(ctx, s.serviceName)
		if err != nil || doc == nil {
			return err
		}
		s.isController = true
		// cache the document ID for later passes, which skip this block and
		// leave doc nil
		s.controllerDocID = doc.ID
	}

// we think we're the controller. Gather up all the registered workers
// including ourself, balance buckets between them and write the bucket
// allocations to the database. If it turns out that we're not the controller,
// the patch will fail
	_, err = s.dbBucketServices.PatchWithLease(ctx, s.controllerDocID, func(doc *api.BucketServiceDocument) error {
docs, err := s.dbBucketServices.ListBucketServices(ctx, s.serviceName)
if err != nil {
return err
}

var workers []string
if docs != nil {
workers = make([]string, 0, len(docs.BucketServiceDocuments))
for _, doc := range docs.BucketServiceDocuments {
workers = append(workers, doc.ID)
}
}

doc.Buckets = s.b.Balance(workers, doc.Buckets)

return nil
})
if err != nil && err.Error() == "lost lease" {
s.isController = false
}
return err
}
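For intuition, here is a minimal, self-contained sketch of the kind of balancing the controller drives through s.b.Balance. It is illustrative only: the real logic lives in pkg/util/buckets, and unlike this toy round-robin it preserves existing assignments where possible.

package main

import "fmt"

// balance assigns each of n buckets to one of the workers by round-robin,
// keeping the distribution even. Existing assignments are ignored in this
// toy version.
func balance(workers []string, n int) map[int]string {
	owners := make(map[int]string, n)
	if len(workers) == 0 {
		return owners
	}
	for b := 0; b < n; b++ {
		owners[b] = workers[b%len(workers)]
	}
	return owners
}

func main() {
	fmt.Println(balance([]string{"actuator-a", "actuator-b"}, 4))
	// Output: map[0:actuator-a 1:actuator-b 2:actuator-a 3:actuator-b]
}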
122 changes: 108 additions & 14 deletions pkg/mimo/actuator/manager.go
@@ -5,38 +5,132 @@ package actuator

import (
"context"
"sync"
"fmt"
"strings"
"time"

"github.com/Azure/go-autorest/autorest/to"
"github.com/sirupsen/logrus"
"k8s.io/client-go/rest"

"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database"
"github.com/Azure/ARO-RP/pkg/env"
"github.com/Azure/ARO-RP/pkg/util/service"
)

type Actuator interface {
Process(context.Context, *api.OpenShiftClusterDocument) (bool, error)
AddTask(string, TaskFunc)
}

type actuator struct {
env env.Interface
log *logrus.Entry
env env.Interface
log *logrus.Entry
restConfig *rest.Config
now func() time.Time

oc database.OpenShiftClusters
mmf database.MaintenanceManifests

oc database.OpenShiftClusters
q service.Runnable
tasks map[string]TaskFunc
}

func NewActuator(
ctx context.Context,
_env env.Interface,
log *logrus.Entry,
oc database.OpenShiftClusters) (service.Runnable, error) {
restConfig *rest.Config,
oc database.OpenShiftClusters,
mmf database.MaintenanceManifests) (Actuator, error) {
a := &actuator{
env: _env,
log: log,
oc: oc,
env: _env,
restConfig: restConfig,
log: log,
oc: oc,
mmf: mmf,
tasks: make(map[string]TaskFunc),

now: time.Now,
}

a.q = service.NewWorkerQueue(ctx, log, _env, a.try)
return a.q, nil
return a, nil
}

func (a *actuator) AddTask(u string, t TaskFunc) {
a.tasks[u] = t
}

func (a *actuator) try(ctx context.Context, c *sync.Cond) (bool, error) {
return false, nil
func (a *actuator) Process(ctx context.Context, doc *api.OpenShiftClusterDocument) (bool, error) {
var err error
cID := strings.ToLower(doc.OpenShiftCluster.ID)

release := func() {
if doc.LeaseOwner == "" {
return
}
doc, err = a.oc.EndLease(ctx, doc.ResourceID, doc.OpenShiftCluster.Properties.ProvisioningState, doc.OpenShiftCluster.Properties.LastProvisioningState, nil)
if err != nil {
a.log.Error(err)
}
}

didWork := false

for {
task, err := a.mmf.Dequeue(ctx, cID)
if err != nil {
return didWork, err
}

if task == nil {
break
}

// renew/get the lease on the OpenShiftClusterDocument
doc, err = a.oc.Lease(ctx, cID)
if err != nil {
if err.Error() == "lost lease" {
return false, nil
}
return false, err
}
defer release()

evaluationTime := a.now()

if evaluationTime.Before(time.Unix(int64(task.MaintenanceManifest.RunAfter), 0)) {
continue
}

if evaluationTime.After(time.Unix(int64(task.MaintenanceManifest.RunBefore), 0)) {
// timed out, mark as such
a.log.Infof("marking %v as outdated: %v older than %v", task.ID, task.MaintenanceManifest.RunBefore, evaluationTime.UTC())

_, err := a.mmf.EndLease(ctx, cID, task.ID, api.MaintenanceManifestStateTimedOut, to.StringPtr(fmt.Sprintf("timed out at %s", evaluationTime.UTC())))
if err != nil {
return false, err
}
continue
}

		// look up the registered handler for this manifest's maintenance set
		f, ok := a.tasks[task.MaintenanceManifest.MaintenanceSetID]
		if !ok {
			// no handler registered; skip rather than call a nil TaskFunc
			a.log.Infof("no task handler registered for %v", task.MaintenanceManifest.MaintenanceSetID)
			continue
		}

handler := &th{
db: a.oc,
env: a.env,
}
state, msg := f(ctx, handler, doc, task)
_, err = a.mmf.EndLease(ctx, cID, task.ID, state, &msg)
if err != nil {
a.log.Error(err)
}

didWork = true
}

return didWork, nil
}
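The TaskFunc contract is visible only from its call site above (state, msg := f(ctx, handler, doc, task)). As a rough illustration of the registration-and-dispatch pattern, here is a self-contained toy with simplified stand-in types; none of the type or task names below come from the ARO-RP API, and "restart-mdsd" is a made-up maintenance set ID.

package main

import (
	"context"
	"fmt"
)

// State stands in for api.MaintenanceManifestState.
type State string

const (
	StateCompleted State = "Completed"
	StateFailed    State = "Failed"
)

// TaskFunc mirrors the shape used in Process: do the work, return a
// terminal state plus a human-readable message.
type TaskFunc func(ctx context.Context, clusterID string) (State, string)

type actuator struct {
	tasks map[string]TaskFunc
}

func (a *actuator) AddTask(setID string, t TaskFunc) { a.tasks[setID] = t }

// process looks up the handler for a dequeued manifest, mirroring the
// nil-handler guard in Process above.
func (a *actuator) process(ctx context.Context, setID, clusterID string) (State, string) {
	f, ok := a.tasks[setID]
	if !ok {
		return StateFailed, fmt.Sprintf("no handler registered for %s", setID)
	}
	return f(ctx, clusterID)
}

func main() {
	a := &actuator{tasks: map[string]TaskFunc{}}
	a.AddTask("restart-mdsd", func(ctx context.Context, clusterID string) (State, string) {
		return StateCompleted, "restarted mdsd on " + clusterID
	})
	fmt.Println(a.process(context.Background(), "restart-mdsd", "cluster-1"))
}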
149 changes: 149 additions & 0 deletions pkg/mimo/actuator/service.go
@@ -0,0 +1,149 @@
package actuator

// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.

import (
"context"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"

"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database"
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
"github.com/Azure/ARO-RP/pkg/env"
"github.com/Azure/ARO-RP/pkg/metrics"
"github.com/Azure/ARO-RP/pkg/proxy"
"github.com/Azure/ARO-RP/pkg/util/buckets"
"github.com/Azure/ARO-RP/pkg/util/heartbeat"
"github.com/Azure/ARO-RP/pkg/util/recover"
)

type Runnable interface {
Run(context.Context, <-chan struct{}, chan<- struct{}) error
}

type service struct {
dialer proxy.Dialer
baseLog *logrus.Entry
env env.Interface

dbBucketServices database.BucketServices
dbOpenShiftClusters database.OpenShiftClusters
dbMaintenanceManifests database.MaintenanceManifests
serviceName string

m metrics.Emitter
mu sync.RWMutex
b buckets.BucketWorker
isController bool
// controllerDocID is the ID of the bucket service document we hold the
// controller lease on; set when the lease is first gained
controllerDocID string
stopping *atomic.Bool

[CI: golangci-lint check failure on line 44 in pkg/mimo/actuator/service.go: "Bool not declared by package atomic (typecheck)". sync/atomic.Bool requires Go 1.19+, so the linter's toolchain likely predates it.]

lastChangefeed atomic.Value //time.Time
lastBucketlist atomic.Value //time.Time
startTime time.Time
}

func NewService(log *logrus.Entry, dialer proxy.Dialer, dbBucketServices database.BucketServices, dbOpenShiftClusters database.OpenShiftClusters, dbMaintenanceManifests database.MaintenanceManifests, m metrics.Emitter) Runnable {
s := &service{
baseLog: log,
dialer: dialer,

dbBucketServices: dbBucketServices,
dbOpenShiftClusters: dbOpenShiftClusters,

m: m,
serviceName: "actuator",
stopping: &atomic.Bool{},

[CI: golangci-lint reports the same "Bool not declared by package atomic (typecheck)" failure on line 61.]

startTime: time.Now(),
}

s.b = buckets.NewBucketWorker(log, s.worker, &s.mu)
return s
}

func (s *service) Run(ctx context.Context, stop <-chan struct{}, done chan<- struct{}) error {
defer recover.Panic(s.baseLog)

_, err := s.dbBucketServices.Create(ctx, &api.BucketServiceDocument{
ServiceName: s.serviceName,
ServiceRole: "controller",
})
if err != nil && !cosmosdb.IsErrorStatusCode(err, http.StatusPreconditionFailed) {
return err
}

// fill the cache from the database change feed
go s.changefeed(ctx, s.baseLog.WithField("component", "changefeed"), nil)

t := time.NewTicker(10 * time.Second)
defer t.Stop()

if stop != nil {
go func() {
defer recover.Panic(s.baseLog)

<-stop
s.baseLog.Print("stopping")
s.stopping.Store(true)
}()
}

go heartbeat.EmitHeartbeat(s.baseLog, s.m, s.serviceName+".heartbeat", nil, s.checkReady)

for {
if s.stopping.Load() {
break
}

// register ourself as a worker
err = s.dbBucketServices.BucketServiceHeartbeat(ctx, s.serviceName)
if err != nil {
s.baseLog.Error(err)
}

		// try to become the controller and share buckets across registered workers
err = s.controller(ctx)
if err != nil {
s.baseLog.Error(err)
}

// read our bucket allocation from the controller
buckets, err := s.dbBucketServices.ListBuckets(ctx, s.serviceName)
s.b.LoadBuckets(buckets)
if err != nil {
s.baseLog.Error(err)
} else {
s.lastBucketlist.Store(time.Now())
}

<-t.C
}

s.baseLog.Print("exiting")
close(done)
return nil
}

// checkReady checks the ready status of the actuator to make it consistent
// across the /healthz/ready endpoint and emitted metrics. We wait for 2
// minutes before indicating health. This ensures that there will be a gap in
// our health metric if we crash or restart.
func (s *service) checkReady() bool {
lastBucketTime, ok := s.lastBucketlist.Load().(time.Time)
if !ok {
return false
}
lastChangefeedTime, ok := s.lastChangefeed.Load().(time.Time)
if !ok {
return false
}
return (time.Since(lastBucketTime) < time.Minute) && // did we list buckets successfully recently?
(time.Since(lastChangefeedTime) < time.Minute) && // did we process the change feed recently?
(time.Since(s.startTime) > 2*time.Minute) // are we running for at least 2 minutes?
}
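The readiness gating in checkReady reduces to a small, reusable pattern: freshness timestamps kept in atomic.Value plus a startup delay. A self-contained sketch (names illustrative, not from ARO-RP):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type readiness struct {
	start    time.Time
	lastWork atomic.Value // holds time.Time, like lastBucketlist above
}

// ok mirrors checkReady: recent work and a minimum uptime are both required,
// so a crash or restart leaves a visible gap in the health signal.
func (r *readiness) ok() bool {
	last, ok := r.lastWork.Load().(time.Time)
	if !ok {
		return false // nothing recorded yet
	}
	return time.Since(last) < time.Minute &&
		time.Since(r.start) > 2*time.Minute
}

func main() {
	r := &readiness{start: time.Now()}
	r.lastWork.Store(time.Now())
	fmt.Println(r.ok()) // false: less than two minutes of uptime
}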