Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Refactor config and agent #426

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 116 additions & 26 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package agent
import (
"errors"
"fmt"
"log"
"os"
"os/signal"
"runtime"
"strings"
"sync"
"time"

"github.com/namsral/flag"
flag "github.com/spf13/pflag"

"go.ligato.io/cn-infra/v2/config"
"go.ligato.io/cn-infra/v2/infra"
Expand Down Expand Up @@ -52,6 +53,8 @@ type Agent interface {
// close of quit channel (can be set via options) and then stops the agent.
// Returns nil if all the plugins were intialized and closed successfully.
Run() error
// Init applies given options to the agent.
Init(...Option)
// Start starts the agent with all the plugins, calling their Init() and optionally AfterInit().
// Returns nil if all the plugins were initialized successfully.
Start() error
Expand All @@ -60,7 +63,6 @@ type Agent interface {
Stop() error
// Options returns all agent's options configured via constructor.
Options() Options

// Wait waits until agent is stopped and returns same error as Stop().
Wait() error
// After returns a channel that is closed before the agents is stopped.
Expand All @@ -74,22 +76,7 @@ type Agent interface {
// NewAgent creates a new agent using given options and registers all flags
// defined for plugins via config.ForPlugin.
func NewAgent(opts ...Option) Agent {
options := newOptions(opts...)

if !flag.Parsed() {
config.DefineDirFlag()
for _, p := range options.Plugins {
name := p.String()
infraLogger.Debugf("registering flags for: %q", name)
config.DefineFlagsFor(name)
}
flag.Parse()
}

return &agent{
opts: options,
tracer: measure.NewTracer("agent-plugins"),
}
return newAgent(opts...)
}

type agent struct {
Expand All @@ -106,6 +93,31 @@ type agent struct {
curPlugin infra.Plugin
}

func newAgent(opts ...Option) *agent {
options := newOptions()
for _, o := range opts {
o(&options)
}
a := &agent{
opts: options,
tracer: measure.NewTracer("agent-plugins"),
}
return a
}

func (a *agent) Name() string {
return a.opts.Name
}

func (a *agent) Init(opts ...Option) {
for _, o := range opts {
o(&a.opts)
}
if err := a.setup(); err != nil {
agentLogger.Warn("setup failed:", err)
}
}

// Options returns the Options the agent was created with
func (a *agent) Options() Options {
return a.opts
Expand All @@ -130,11 +142,89 @@ func (a *agent) Run() error {
return a.Wait()
}

func (a *agent) setup() error {
if a.opts.initialized {
return nil
}
a.opts.initialized = true

if a.opts.Config == nil {
a.opts.Config = config.DefaultConfig
}

if err := a.parseFlags(); err != nil {
return fmt.Errorf("flags error: %w", err)
}
if err := a.loadConfig(); err != nil {
return fmt.Errorf("config error: %w", err)
}
return nil
}

func (a *agent) parseFlags() error {
flagSet := flag.NewFlagSet(a.Name(), flag.ExitOnError)
flagSet.SortFlags = false
//flags.SetOutput(ioutil.Discard)

infraLogger.Debugf("adding %d flags from CommandLine", strings.Count(flag.CommandLine.FlagUsages(), "\n"))
flagSet.AddFlagSet(flag.CommandLine)

// global flags
AddFlagsTo(flagSet)
//flags.AddGoFlagSet(goflag.CommandLine)
//flags.AddFlagSet(FlagSet)

for _, p := range a.opts.Plugins {
name := p.String()
infraLogger.Debugf("registering flags for: %q", name)
flagSet.AddFlagSet(config.GetFlagSetFor(name))
}

err := flagSet.Parse(os.Args[1:])
if errors.Is(err, flag.ErrHelp) {
os.Exit(2)
} else if err != nil {
//fmt.Fprintf(os.Stderr, "Usage of %s:\n", a.Name())
//fmt.Fprint(os.Stderr, flags.FlagUsages())
return err
}

ver, err := flagSet.GetBool("version")
if ver {
fmt.Fprintf(os.Stdout, "%s %s\n", a.opts.Name, a.opts.Version)
os.Exit(0)
} else if err != nil {
log.Println(err)
}

logLevel, err := flagSet.GetString("log-level")
if err == nil && logLevel != "" {
lvl, _ := logging.ParseLogLevel(logLevel)
agentLogger.Infoln("setting log level to:", lvl)
logging.DefaultLogger.SetLevel(lvl)
}

return nil
}

func (a *agent) loadConfig() error {
conf := a.opts.Config
if err := conf.Read(); err != nil {
return err
}
return nil
}

func (a *agent) starter() error {
if err := a.setup(); err != nil {
return err
}

agentLogger.WithFields(logging.Fields{
"CommitHash": CommitHash,
"BuildDate": BuildDate,
}).Infof("Starting agent version: %v", BuildVersion)
"Version": BuildVersion,
}).Infof("Starting %s", a.opts.Name)

// If we want to properly handle cleanup when a SIG comes in *during*
// agent startup (ie, clean up after its finished) we need to register
Expand All @@ -150,15 +240,15 @@ func (a *agent) starter() error {
go func() {
select {
case s := <-sig:
agentLogger.Infof("Signal %v received during agent start, stopping", s)
agentLogger.Infof("Signal %v received during starting, stopping", s)
os.Exit(1)
case <-started:
// agent started
case <-time.After(timeout):
a.mu.Lock()
curPlugin := a.curPlugin
a.mu.Unlock()
agentLogger.Errorf("Agent failed to start before timeout (%v) last plugin: %s", timeout, curPlugin)
agentLogger.Errorf("Failed to start before timeout (%v) last plugin: %s", timeout, curPlugin)
dumpStacktrace()
os.Exit(1)
}
Expand All @@ -176,8 +266,8 @@ func (a *agent) starter() error {
}
close(started)

agentLogger.Infof("Agent started with %d plugins (took %v)",
len(a.opts.Plugins), time.Since(t).Round(time.Millisecond))
agentLogger.Infof("Started %s with %d plugins (took %v)",
a.Name(), len(a.opts.Plugins), time.Since(t).Round(time.Millisecond))

a.stopCh = make(chan struct{}) // If we are started, we have a stopCh to signal stopping

Expand Down Expand Up @@ -207,7 +297,7 @@ func (a *agent) starter() error {
}

func (a *agent) start() error {
agentLogger.Debugf("starting %d plugins", len(a.opts.Plugins))
agentLogger.Debugf("starting init for %d plugins", len(a.opts.Plugins))

// Init plugins
for _, plugin := range a.opts.Plugins {
Expand Down Expand Up @@ -266,7 +356,7 @@ func (a *agent) start() error {
}

func (a *agent) stopper() error {
agentLogger.Infof("Stopping agent")
agentLogger.Infof("Stopping %s", a.Name())

stopped := make(chan struct{})
defer close(stopped)
Expand All @@ -288,7 +378,7 @@ func (a *agent) stopper() error {
return err
}

agentLogger.Info("Agent stopped")
agentLogger.Infof("Stopped %s", a.Name())

return nil
}
Expand Down
25 changes: 25 additions & 0 deletions agent/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package agent

var (
DefaultAgent Agent = NewAgent()
)

func Init(opts ...Option) { DefaultAgent.Init(opts...) }
func Start() error { return DefaultAgent.Start() }
func Stop() error { return DefaultAgent.Stop() }
func Wait() error { return DefaultAgent.Wait() }
func Run() error { return DefaultAgent.Run() }
52 changes: 52 additions & 0 deletions agent/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package agent

import (
flag "github.com/spf13/pflag"

"go.ligato.io/cn-infra/v2/config"
)

func init() {
config.SetDefault("log-level", "info")
config.BindEnv("log-level", "LOG_LEVEL")
config.SetDefault("config", "")
config.BindEnv("config", "CONFIG_FILE")
}

func AddFlagsTo(set *flag.FlagSet) {
set.StringP("config", "", "", "Config file location.")
set.StringP("log-level", "", "", "Set the logging level (debug|info|warn|error|fatal).")
set.BoolP("debug", "D", false, "Enable debug mode.")
set.BoolP("version", "V", false, "Print version and exit.")
}

/*
var (
FlagSet = flag.NewFlagSet("agent", flag.ExitOnError)

logLevelVal string
configVal string
versionVal bool
debugVal bool
)

func init() {
FlagSet.StringVarP(&logLevelVal, "log-level", "", "info", "Set the logging level (debug|info|warn|error|fatal).")
FlagSet.StringVarP(&configVal, "config", "", "", "Config file location.")
FlagSet.BoolVarP(&versionVal, "version", "V", false, "Print version and exit.")
FlagSet.BoolVarP(&debugVal, "debug", "D", false, "Enable debug mode.")
}*/
Loading