Squashed commit of the following:
commit f1e2476
Author: sonpham96 <sonpham1996@gmail.com>
Date:   Sat Mar 18 05:32:01 2023 +0700

    Upgrade Golang base image to 1.18 to remediate CVEs (#5035)

    Co-authored-by: David Porter <david.porter@uber.com>

commit 1519ace
Author: charlese-instaclustr <76502507+charlese-instaclustr@users.noreply.github.com>
Date:   Fri Mar 17 22:11:27 2023 +0000

    Fix type validation in configstore DC client value updating (#5110)

    * Remove misleading type check, add a more detailed log message

    * Remove debug logging

    * Handle nil update edge case

    ---------

    Co-authored-by: allenchen2244 <102192478+allenchen2244@users.noreply.github.com>
    Co-authored-by: Zijian <Shaddoll@users.noreply.github.com>

commit a3e2774
Author: charlese-instaclustr <76502507+charlese-instaclustr@users.noreply.github.com>
Date:   Fri Mar 17 19:02:40 2023 +0000

    Add Canary TLS support (#5086)

    * add support for TLS connections by Canary, add development config for Canary with TLS

    * update README to include new config option

    * remove testing config

    ---------

    Co-authored-by: David Porter <david.porter@uber.com>
    Co-authored-by: Shijie Sheng <shengs@uber.com>
    Co-authored-by: Zijian <Shaddoll@users.noreply.github.com>

commit ff4eab2
Author: Shijie Sheng <shengs@uber.com>
Date:   Thu Mar 16 20:10:54 2023 -0700

    [history] Be more cautious about domain state when deciding whether to drop queued tasks (#5164)

    What changed?

    When the domain cache returns an entity-not-found error, don't drop queued tasks; stay conservative.

    Why?

    In cases where the cache result is dubious, we shouldn't drop queued tasks.
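
For illustration only, here is a minimal, self-contained Go sketch of the conservative rule described above; `domainCache`, `domainEntry`, and `shouldDropTask` are hypothetical stand-ins, not Cadence's actual cache or task-processor code:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins; not Cadence's real cache or task types.
var errEntityNotFound = errors.New("entity not found")

type domainEntry struct{ deprecated bool }

type domainCache map[string]*domainEntry

func (c domainCache) getDomainByID(id string) (*domainEntry, error) {
	if e, ok := c[id]; ok {
		return e, nil
	}
	return nil, errEntityNotFound
}

// shouldDropTask only drops a queued task when the domain lookup succeeds and
// positively says dropping is safe; a cache miss ("entity not found") keeps the
// task so it can be retried once the cache is trustworthy again.
func shouldDropTask(cache domainCache, domainID string) (bool, error) {
	entry, err := cache.getDomainByID(domainID)
	if err != nil {
		return false, err // includes "entity not found": keep the task
	}
	return entry.deprecated, nil
}

func main() {
	cache := domainCache{"d1": {deprecated: true}}
	fmt.Println(shouldDropTask(cache, "d1"))      // true <nil>
	fmt.Println(shouldDropTask(cache, "missing")) // false entity not found
}
```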

commit 55a8d93
Author: neil-xie <104041627+neil-xie@users.noreply.github.com>
Date:   Thu Mar 16 14:18:35 2023 -0700

    Add Pinot docker files, table config and schema (#5163)

    * Initial checkin for pinot config files

commit 1304570
Author: Mantas Šidlauskas <mantass@netapp.com>
Date:   Thu Mar 16 15:20:29 2023 +0200

    Set poll interval for filebased dynamic config if not set (#5160)

    * Set poll interval for filebased dynamic config if not set

    * update unit test

commit 42a14b1
Author: Mantas Šidlauskas <mantass@netapp.com>
Date:   Thu Mar 16 10:49:21 2023 +0200

    Elasticsearch: reduce code duplication (#5137)

    * Elasticsearch: reduce code duplication

    * address comments

    ---------

    Co-authored-by: Zijian <Shaddoll@users.noreply.github.com>

commit cbf0d14
Author: bowen xiao <xbowen@uber.com>
Date:   Wed Mar 15 10:19:34 2023 -0700

    fix samples documentation (#5088)

commit ba19a29
Author: Mantas Šidlauskas <mantass@netapp.com>
Date:   Wed Mar 15 12:52:29 2023 +0200

    Add ShardID to valid attributes (#5161)

commit a25cba8
Author: Mantas Šidlauskas <mantass@netapp.com>
Date:   Wed Mar 15 10:56:50 2023 +0200

    ES: single interface for different ES/OpenSearch versions (#5158)

    * ES: single interface for different ES/OpenSearch versions

    * make fmt

commit e3ac246
Author: Ketsia <115650494+ketsiambaku@users.noreply.github.com>
Date:   Tue Mar 14 12:47:40 2023 -0700

    added logging with workflow/domain tags (#5159)

commit 9581488
Author: Ketsia <115650494+ketsiambaku@users.noreply.github.com>
Date:   Mon Mar 13 16:56:45 2023 -0700

    Consistent query per-shard metric (#5143)

    * Add and update consistent query per-shard metric

    * Test per-shard metric

    * Move sample logger into persistence metric client for cleanliness

    * fix test

    * fix lint

    * fix test again

    * fix lint

    * sample logging with workflowid tag

    * added domain tag to logger

    * metric completed

    * addressing comments

    * fix lint

    * Revert "fix lint"

    This reverts commit 1e96944.

    * fix lint second attempt

    ---------

    Co-authored-by: Allen Chen <allenchen2244@uber.com>
davidporter-id-au committed Mar 18, 2023
1 parent 596e076 commit a31ccba
Showing 25 changed files with 1,001 additions and 678 deletions.
3 changes: 3 additions & 0 deletions CONTRIBUTING.md
@@ -119,6 +119,9 @@ Then register a domain:
./cadence --do samples-domain domain register
```

### Sample Repo
The sample code is available in the [Sample repo](https://github.com/uber-common/cadence-samples).

Then run a helloworld from [Go Client Sample](https://github.com/uber-common/cadence-samples/) or [Java Client Sample](https://github.com/uber/cadence-java-samples)

```
2 changes: 1 addition & 1 deletion Dockerfile
@@ -4,7 +4,7 @@ ARG TARGET=server
ARG GOPROXY

# Build Cadence binaries
-FROM golang:1.17.13-alpine3.15 AS builder
+FROM golang:1.18.8-alpine3.15 AS builder

ARG RELEASE_VERSION

1 change: 1 addition & 0 deletions canary/README.md
@@ -84,6 +84,7 @@ cadence:
  service: "cadence-frontend" # frontend service name
  address: "127.0.0.1:7833" # frontend address
  #host: "127.0.0.1:7933" # replace address with host if using Thrift for compatibility
+  #tlsCaFile: "path/to/file" # give file path to TLS CA file if TLS is enabled on the Cadence server
  #metrics: ... # optional detailed client side metrics like workflow latency. But for monitoring, simply use server side metrics `workflow_success` is enough.
```
- **Metrics**: metrics configuration. Similar to server metric emitter, only M3/Statsd/Prometheus is supported.
2 changes: 2 additions & 0 deletions canary/config.go
@@ -87,6 +87,8 @@ type (
		ThriftHostNameAndPort string `yaml:"host"`
		// gRPC host name and port
		GRPCHostNameAndPort string `yaml:"address"`
+		// TLS cert file if TLS is enabled on the Cadence server
+		TLSCAFile string `yaml:"tlsCaFile"`
	}
)

30 changes: 29 additions & 1 deletion canary/runner.go
@@ -25,12 +25,20 @@ import (
	"sync"
	"time"

+	"crypto/tls"
+	"crypto/x509"
+	"io/ioutil"
+
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/compatibility"
	"go.uber.org/yarpc"
+	"go.uber.org/yarpc/api/transport"
+	"go.uber.org/yarpc/peer"
+	"go.uber.org/yarpc/peer/hostport"
	"go.uber.org/yarpc/transport/grpc"
	"go.uber.org/yarpc/transport/tchannel"
	"go.uber.org/zap"
+	"google.golang.org/grpc/credentials"

	apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"

@@ -59,10 +67,30 @@ func NewCanaryRunner(cfg *Config) (Runnable, error) {
	var dispatcher *yarpc.Dispatcher
	var runtimeContext *RuntimeContext
	if cfg.Cadence.GRPCHostNameAndPort != "" {
+		var outbounds transport.Outbounds
+		if cfg.Cadence.TLSCAFile != "" {
+			caCert, err := ioutil.ReadFile(cfg.Cadence.TLSCAFile)
+			if err != nil {
+				logger.Fatal("Failed to load server CA certificate", zap.Error(err))
+			}
+			caCertPool := x509.NewCertPool()
+			if !caCertPool.AppendCertsFromPEM(caCert) {
+				logger.Fatal("Failed to add server CA certificate", zap.Error(err))
+			}
+			tlsConfig := tls.Config{
+				RootCAs: caCertPool,
+			}
+			tlsCreds := credentials.NewTLS(&tlsConfig)
+			grpcTransport := grpc.NewTransport()
+			tlsChooser := peer.NewSingle(hostport.Identify(cfg.Cadence.GRPCHostNameAndPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds)))
+			outbounds = transport.Outbounds{Unary: grpcTransport.NewOutbound(tlsChooser)}
+		} else {
+			outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(cfg.Cadence.GRPCHostNameAndPort)}
+		}
		dispatcher = yarpc.NewDispatcher(yarpc.Config{
			Name: CanaryServiceName,
			Outbounds: yarpc.Outbounds{
-				cfg.Cadence.ServiceName: {Unary: grpc.NewTransport().NewSingleOutbound(cfg.Cadence.GRPCHostNameAndPort)},
+				cfg.Cadence.ServiceName: outbounds,
			},
		})
		clientConfig := dispatcher.ClientConfig(cfg.Cadence.ServiceName)
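The diff above builds the TLS credentials from the CA bundle with the stdlib `crypto/x509` pool API. A small standalone sketch (not part of Cadence) that reuses the same calls to sanity-check a PEM file before pointing the canary's `tlsCaFile` option at it might look like this:

```go
// ca_check.go — a hedged helper sketch, not Cadence code: verifies that a PEM
// file parses into a certificate pool using the same stdlib calls as above.
package main

import (
	"crypto/x509"
	"fmt"
	"log"
	"os"
)

func main() {
	if len(os.Args) != 2 {
		log.Fatal("usage: ca_check <path/to/ca.pem>")
	}
	pem, err := os.ReadFile(os.Args[1])
	if err != nil {
		log.Fatalf("failed to read CA file: %v", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pem) {
		log.Fatal("no valid certificates found in file")
	}
	fmt.Println("CA file parsed successfully")
}
```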
2 changes: 1 addition & 1 deletion common/dynamicconfig/configstore/config_store_client.go
@@ -252,7 +252,7 @@ func (csc *configStoreClient) GetListValue(name dc.ListKey, filters map[dc.Filte

func (csc *configStoreClient) UpdateValue(name dc.Key, value interface{}) error {
	dcValues, ok := value.([]*types.DynamicConfigValue)
-	if !ok && dcValues != nil {
+	if !ok && value != nil {
		return errors.New("invalid value")
	}
	return csc.updateValue(name, dcValues, csc.config.UpdateRetryAttempts)
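The one-line change above is subtle: with Go's two-value type assertion, a failed assertion leaves `dcValues` nil, so the old guard `!ok && dcValues != nil` could never fire. A minimal standalone sketch (with `dynamicConfigValue` standing in for `types.DynamicConfigValue`) demonstrating the corrected check:

```go
package main

import "fmt"

// dynamicConfigValue stands in for types.DynamicConfigValue; illustrative only.
type dynamicConfigValue struct{}

func validate(value interface{}) error {
	dcValues, ok := value.([]*dynamicConfigValue)
	// Old check: `!ok && dcValues != nil` never triggers, because a failed
	// assertion always leaves dcValues nil.
	// New check: reject any non-nil value of the wrong type, while still
	// allowing an explicit nil update.
	if !ok && value != nil {
		return fmt.Errorf("invalid value of type %T", value)
	}
	_ = dcValues // the real client passes dcValues on to updateValue
	return nil
}

func main() {
	fmt.Println(validate("wrong type"))            // invalid value of type string
	fmt.Println(validate(nil))                     // <nil>
	fmt.Println(validate([]*dynamicConfigValue{})) // <nil>
}
```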
4 changes: 3 additions & 1 deletion common/dynamicconfig/file_based_client.go
@@ -367,8 +367,10 @@ func validateConfig(config *FileBasedClientConfig) error {
	if _, err := os.Stat(config.Filepath); err != nil {
		return fmt.Errorf("error checking dynamic config file at path %s, error: %v", config.Filepath, err)
	}
+
+	// check if poll interval needs to be adjusted
	if config.PollInterval < minPollInterval {
-		return fmt.Errorf("poll interval should be at least %v", minPollInterval)
+		config.PollInterval = minPollInterval
	}
	return nil
}
9 changes: 6 additions & 3 deletions common/dynamicconfig/file_based_client_test.go
@@ -226,11 +226,14 @@ func (s *fileBasedClientSuite) TestValidateConfig_FileNotExist() {
}

func (s *fileBasedClientSuite) TestValidateConfig_ShortPollInterval() {
-	_, err := NewFileBasedClient(&FileBasedClientConfig{
+	cfg := &FileBasedClientConfig{
		Filepath:     "config/testConfig.yaml",
		PollInterval: time.Second,
-	}, nil, nil)
-	s.Error(err)
+	}
+	_, err := NewFileBasedClient(cfg, log.NewNoop(), nil)
+	s.NoError(err)
+	s.Equal(minPollInterval, cfg.PollInterval, "fallback to default poll interval")
+
}

func (s *fileBasedClientSuite) TestMatch() {
63 changes: 63 additions & 0 deletions common/elasticsearch/client/client.go
@@ -0,0 +1,63 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package client

import (
	"context"
	"encoding/json"

	"github.com/uber/cadence/common/elasticsearch"
)

// Client is a generic ES client implementation.
// This interface allows to use different Elasticsearch and OpenSearch versions
// without exposing implementation details and structs
type Client interface {
	// ClearScroll clears the search context and results for a scrolling search.
	ClearScroll(ctx context.Context, scrollID string) error
	// Count returns number of document matches by given query
	Count(ctx context.Context, index, body string) (int64, error)
	// CreateIndex creates index with given name
	CreateIndex(ctx context.Context, index string) error
	// IsNotFoundError checks if error is a "not found"
	IsNotFoundError(err error) bool
	// PutMapping updates Client with new field mapping
	PutMapping(ctx context.Context, index, body string) error
	// RunBulkProcessor starts bulk indexing processor
	// @TODO consider to extract Bulk Processor as a separate entity
	RunBulkProcessor(ctx context.Context, p *elasticsearch.BulkProcessorParameters) (elasticsearch.GenericBulkProcessor, error)
	// Scroll retrieves the next batch of results for a scrolling search.
	Scroll(ctx context.Context, index, body, scrollID string) (*Response, error)
	// Search returns Elasticsearch hit bytes and additional metadata
	Search(ctx context.Context, index, body string) (*Response, error)
}

// Response is used to pass data retrieved from Elasticsearch/OpenSearch to upper layer
type Response struct {
	TookInMillis int64
	TotalHits    int64
	Hits         [][]byte // response from ES server as bytes, used to unmarshal to internal structs
	Aggregations map[string]json.RawMessage
	Sort         []interface{}
	ScrollID     string
}
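
To make the intended use of the new interface concrete, here is a hedged caller-side sketch. It compiles against the `Client` and `Response` definitions above, but the scroll semantics (an empty `scrollID` opening a new scroll, an empty `Hits` slice or a "not found" error ending it) are assumptions, not guarantees documented by the interface:

```go
package example

import (
	"context"
	"fmt"

	esclient "github.com/uber/cadence/common/elasticsearch/client"
)

// dumpMatches pages through all documents matching body in index using the
// version-agnostic client and prints the raw hit bytes.
func dumpMatches(ctx context.Context, c esclient.Client, index, body string) error {
	resp, err := c.Scroll(ctx, index, body, "") // assumed: empty scrollID starts a scroll
	for err == nil && len(resp.Hits) > 0 {
		for _, hit := range resp.Hits {
			fmt.Printf("%s\n", hit)
		}
		resp, err = c.Scroll(ctx, index, body, resp.ScrollID)
	}
	if resp != nil && resp.ScrollID != "" {
		// Release server-side scroll resources regardless of how the loop ended.
		if cerr := c.ClearScroll(ctx, resp.ScrollID); cerr != nil {
			return cerr
		}
	}
	if err != nil && !c.IsNotFoundError(err) { // assumed: "not found" marks scroll exhaustion
		return err
	}
	return nil
}
```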
