Skip to content

Commit

Permalink
fix: fixed concurrency issue with profile collection
Browse files Browse the repository at this point in the history
The GatherAll function was blocking the correct execution of the
procedure because the channel used to collect all the profiles from
a particular pod was not handling errors properly.

Signed-off-by: Gianluca Arbezzano <gianarb92@gmail.com>
  • Loading branch information
Gianluca Arbezzano committed Jan 3, 2020
1 parent a5a0874 commit 1d7dc6f
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 23 deletions.
4 changes: 3 additions & 1 deletion cmd/kubectl-profefe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"os"

"github.com/gianarb/kube-profefe/pkg/cmd"
"go.uber.org/zap"
"k8s.io/cli-runtime/pkg/genericclioptions"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
)

func main() {
rootCmd := cmd.NewProfefeCmd(genericclioptions.IOStreams{
logger, _ := zap.NewDevelopment()
rootCmd := cmd.NewProfefeCmd(logger, genericclioptions.IOStreams{
In: os.Stdin,
Out: os.Stdout,
ErrOut: os.Stderr,
Expand Down
5 changes: 3 additions & 2 deletions pkg/cmd/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/google/pprof/profile"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand All @@ -32,7 +33,7 @@ var (
ProfefeHostPortE string
)

func NewCaptureCmd(configFlag *genericclioptions.ConfigFlags, rbFlags *genericclioptions.ResourceBuilderFlags, streams genericclioptions.IOStreams) *cobra.Command {
func NewCaptureCmd(logger *zap.Logger, configFlag *genericclioptions.ConfigFlags, rbFlags *genericclioptions.ResourceBuilderFlags, streams genericclioptions.IOStreams) *cobra.Command {
captureCmd := &cobra.Command{
Use: "capture",
Short: "Capture gathers profiles for a pod or a set of them. If can filter by namespace and via label selector.",
Expand Down Expand Up @@ -134,7 +135,7 @@ func NewCaptureCmd(configFlag *genericclioptions.ConfigFlags, rbFlags *genericcl

println("gathering profiles for pod: " + target.Name)

profiles, err := pprofutil.GatherAllByPod(context.Background(), DefaultForwardHost, target, localPort)
profiles, err := pprofutil.GatherAllByPod(context.Background(), logger, DefaultForwardHost, target, localPort)
if err != nil {
panic(err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/get_profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func NewGetProfilesCmd() *cobra.Command {
req.To = time.Now().UTC()
req.From = req.To.Add(-from).UTC()

fmt.Printf("FROM: %s - TO: %s\n", req.From.Format(time.RFC1123), req.To.Format(time.RFC1123))

resp, err := pClient.GetProfiles(ctx, req)
if err != nil {
return err
Expand Down
16 changes: 6 additions & 10 deletions pkg/cmd/kprofefe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"net/http"
"sync"
"time"

backoff "github.com/cenkalti/backoff/v3"
"github.com/gianarb/kube-profefe/pkg/kubeutil"
"github.com/gianarb/kube-profefe/pkg/pprofutil"
"github.com/gianarb/kube-profefe/pkg/profefe"
Expand Down Expand Up @@ -112,13 +112,15 @@ func NewKProfefeCmd(logger *zap.Logger, streams genericclioptions.IOStreams) *co
for {
pod, more := <-c
if more == false {
logger.Info("there are not pods to process. Closing goroutine...")
wg.Done()
return
}
ctx, cancel := context.WithTimeout(ctx, time.Second*40)
defer cancel()
do(ctx, logger, pClient, pod)
}
}(poolC)

}

for _, target := range selectedPods {
Expand Down Expand Up @@ -146,21 +148,15 @@ func do(ctx context.Context, l *zap.Logger, pClient *profefe.Client, target core
targetPort := pprofutil.GetProfefePortByPod(target)
var profiles map[pprofutil.Profile]*profile.Profile
var err error
err = backoff.Retry(func() error {
profiles, err = pprofutil.GatherAllByPod(context.Background(), fmt.Sprintf("http://%s", target.Status.PodIP), target, targetPort)
if err != nil {
return err
}
return nil
}, backoff.NewExponentialBackOff())
profiles, err = pprofutil.GatherAllByPod(ctx, logger, fmt.Sprintf("http://%s", target.Status.PodIP), target, targetPort)
if err != nil {
logger.Error("impossible to gather profiles", zap.Error(err))
return
}
for profileType, profile := range profiles {
profefeType := profefe.NewProfileTypeFromString(profileType.String())
if profefeType == profefe.UnknownProfile {
logger.Warn("Unknown profile type it can not be sent to profefe. Skip this profile")
logger.Warn("Unknown profile type it can not be sent to profefe. Skip this profile", zap.String("profile_type", profileType.String()))
continue
}
req := profefe.SavePprofRequest{
Expand Down
5 changes: 3 additions & 2 deletions pkg/cmd/profefe.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.uber.org/zap"
"k8s.io/cli-runtime/pkg/genericclioptions"
)

Expand All @@ -12,7 +13,7 @@ type ProfefeCmdOptions struct {
genericclioptions.IOStreams
}

func NewProfefeCmd(streams genericclioptions.IOStreams) *cobra.Command {
func NewProfefeCmd(logger *zap.Logger, streams genericclioptions.IOStreams) *cobra.Command {
flags := pflag.NewFlagSet("kubectl-profefe", pflag.ExitOnError)
pflag.CommandLine = flags

Expand All @@ -36,7 +37,7 @@ func NewProfefeCmd(streams genericclioptions.IOStreams) *cobra.Command {
kubeResouceBuilderFlags.WithAllNamespaces(false)
kubeResouceBuilderFlags.AddFlags(flags)

captureCmd := NewCaptureCmd(kubeConfigFlags, kubeResouceBuilderFlags, streams)
captureCmd := NewCaptureCmd(logger, kubeConfigFlags, kubeResouceBuilderFlags, streams)
flagsCapture := pflag.NewFlagSet("kubectl-profefe-capture", pflag.ExitOnError)
flagsCapture.StringVar(&OutputDir, "output-dir", "/tmp", "Directory where to place the profiles")
captureCmd.Flags().AddFlagSet(flagsCapture)
Expand Down
18 changes: 10 additions & 8 deletions pkg/pprofutil/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"

"github.com/google/pprof/profile"
Expand All @@ -15,43 +16,44 @@ const (
DefaultProfilePort = 6060
)

func GatherAllByPod(ctx context.Context, host string, pod corev1.Pod, forwardedPort int) (map[Profile]*profile.Profile, error) {
func GatherAllByPod(ctx context.Context, logger *zap.Logger, host string, pod corev1.Pod, forwardedPort int) (map[Profile]*profile.Profile, error) {
path := DefaultProfilePath
if rawPath, ok := pod.Annotations["profefe.com/path"]; ok && rawPath != "" {
path = rawPath
}
return GatherAll(ctx, fmt.Sprintf("%s:%d%s", host, forwardedPort, path))
return GatherAll(ctx, logger, fmt.Sprintf("%s:%d%s", host, forwardedPort, path))
}

// GatherAll downloads all profile types from address.
func GatherAll(ctx context.Context, addr string) (map[Profile]*profile.Profile, error) {
func GatherAll(ctx context.Context, logger *zap.Logger, addr string) (map[Profile]*profile.Profile, error) {
type res struct {
prof *profile.Profile
profileType Profile
err error
}
profileTypes := Profiles()
profiles := make(chan res, len(profileTypes))

for _, p := range profileTypes {
go func(ctx context.Context, addr string, p Profile) {
prof, err := Gather(ctx, addr, p)
if err != nil {
logger.With(zap.String("profefe_profile_type", p.String())).With(zap.Error(err)).Error("Impossible to gather the profile")
}
profiles <- res{prof, p, err}
}(ctx, addr, p)
}

var err error
profs := map[Profile]*profile.Profile{}
for i := 0; i < len(profileTypes); i++ {
p := <-profiles
if p.prof != nil {
profs[p.profileType] = p.prof
}
if p.err != nil {
err = p.err
}
}

return profs, err
close(profiles)
return profs, nil
}

func GetProfefePortByPod(pod corev1.Pod) int {
Expand Down

0 comments on commit 1d7dc6f

Please sign in to comment.