Skip to content

Commit

Permalink
refactor: minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 26, 2024
1 parent 873690d commit b256c52
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 6 deletions.
7 changes: 5 additions & 2 deletions core/meterer/meterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ type Config struct {

// ChainReadTimeout is the timeout for reading payment state from chain
ChainReadTimeout time.Duration

// UpdateInterval is the interval for refreshing the on-chain state
UpdateInterval time.Duration
}

// Meterer handles payment accounting across different accounts. Disperser API server receives requests from clients and each request contains a blob header
Expand Down Expand Up @@ -47,9 +50,9 @@ func NewMeterer(
}

// Start starts to periodically refreshing the on-chain state
func (m *Meterer) Start(ctx context.Context) {
func (m *Meterer) Start(ctx context.Context, updateInterval time.Duration) {
go func() {
ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()

for {
Expand Down
2 changes: 1 addition & 1 deletion core/meterer/meterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func setup(_ *testing.M) {
// metrics.NewNoopMetrics(),
)

mt.Start(context.Background())
mt.Start(context.Background(), config.UpdateInterval)
}

func teardown() {
Expand Down
4 changes: 2 additions & 2 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
}

// Disperse the blob
reply, err := s.disperseBlob(ctx, blob, authenticatedAddress, "DisperseBlobAuthenticated", &core.PaymentMetadata{})
reply, err := s.disperseBlob(ctx, blob, authenticatedAddress, "DisperseBlobAuthenticated", nil)
if err != nil {
// Note the disperseBlob already updated metrics for this error.
s.logger.Info("failed to disperse blob", "err", err)
Expand Down Expand Up @@ -284,7 +284,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
s.logger.Debug("received a new blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams", strings.Join(securityParamsStrings, ", "))

// If paymentHeader is not empty, we use the meterer, otherwise we use the ratelimiter if the ratelimiter is available
if paymentHeader != nil && *paymentHeader != (core.PaymentMetadata{}) {
if paymentHeader != nil {
err := s.meterer.MeterRequest(ctx, *blob, *paymentHeader)
if err != nil {
return nil, api.NewErrorResourceExhausted(err.Error())
Expand Down
2 changes: 2 additions & 0 deletions disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Config struct {
RateConfig apiserver.RateConfig
EnableRatelimiter bool
EnablePaymentMeterer bool
UpdateInterval int
ChainReadTimeout int
ReservationsTableName string
OnDemandTableName string
GlobalRateTableName string
Expand Down
3 changes: 2 additions & 1 deletion disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ func RunDisperserServer(ctx *cli.Context) error {
var meterer *mt.Meterer
if config.EnablePaymentMeterer {
mtConfig := mt.Config{
ChainReadTimeout: 3 * time.Second,
ChainReadTimeout: time.Duration(config.ChainReadTimeout) * time.Second,
UpdateInterval: time.Duration(config.UpdateInterval) * time.Second,
}

paymentChainState, err := mt.NewOnchainPaymentState(context.Background(), transactor)
Expand Down

0 comments on commit b256c52

Please sign in to comment.