Skip to content

Commit

Permalink
feat: publish spot price for token swap event (#8780)
Browse files Browse the repository at this point in the history
* publish spot price for swap event

* comments updated

* dummy changes to unblock the stuck automated actions
  • Loading branch information
cryptomatictrader authored Oct 21, 2024
1 parent 56ddf9d commit a110bc5
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 0 deletions.
1 change: 1 addition & 0 deletions ingest/indexer/domain/keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type PoolManagerKeeperI interface {
GetTradingPairTakerFee(ctx sdk.Context, denom0, denom1 string) (osmomath.Dec, error)
GetTotalPoolLiquidity(ctx sdk.Context, poolId uint64) (sdk.Coins, error)
GetPool(ctx sdk.Context, poolId uint64) (types.PoolI, error)
RouteCalculateSpotPrice(ctx sdk.Context, poolId uint64, quoteAssetDenom string, baseAssetDenom string) (price osmomath.BigDec, err error)
}

type Keepers struct {
Expand Down
4 changes: 4 additions & 0 deletions ingest/indexer/service/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ func (s *indexerStreamingService) AdjustTokenInAmountBySpreadFactor(ctx context.
func (s *indexerStreamingService) TrackCreatedPoolID(event abci.Event, blockHeight int64, blockTime time.Time, txHash string) error {
return s.trackCreatedPoolID(event, blockHeight, blockTime, txHash)
}

func (s *indexerStreamingService) SetSpotPrice(ctx context.Context, event *abci.Event) error {
return s.setSpotPrice(ctx, event)
}
63 changes: 63 additions & 0 deletions ingest/indexer/service/indexer_streaming_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,61 @@ func (s *indexerStreamingService) publishBlock(ctx context.Context, req abci.Req
return s.client.PublishBlock(sdkCtx, block)
}

// setSpotPrice sets the spot price for the token swapped event in the event's attributes map
// This approach ensures a reliable and consistent way to provide PriceNative data for token_swapped events.
// Using the event's token amount to provide priceNative may introduce rounding errors, especially with small amounts.
// Additionally, determining PriceNative from pool reserves is not applicable to all pool types (e.g., CL pools).
// Please note the spot price is set in the event's attributes map, keyed by "quote_tokenin_base_tokenout",
// which means it's a quote using tokenin denom using tokenout as the base denom, it may require reversing in the /events endpoint
func (s *indexerStreamingService) setSpotPrice(ctx context.Context, event *abci.Event) error {
var poolId string
var tokensIn sdk.Coin
var tokensOut sdk.Coin
// Find the pool id, tokens in and tokens out from the event attributes
for _, attribute := range event.Attributes {
if attribute.Key == concentratedliquiditytypes.AttributeKeyPoolId {
poolId = attribute.Value
}
if attribute.Key == concentratedliquiditytypes.AttributeKeyTokensIn {
var err error
tokensIn, err = sdk.ParseCoinNormalized(attribute.Value)
if err != nil {
s.logger.Error("Error parsing tokens in", "error", err)
continue
}
}
if attribute.Key == concentratedliquiditytypes.AttributeKeyTokensOut {
var err error
tokensOut, err = sdk.ParseCoinNormalized(attribute.Value)
if err != nil {
s.logger.Error("Error parsing tokens out", "error", err)
continue
}
}
if !tokensIn.IsNil() && !tokensOut.IsNil() && poolId != "" {
break
}
}
if poolId == "" || tokensIn.IsNil() || tokensOut.IsNil() {
return fmt.Errorf("pool ID, tokens in, or tokens out not found in token_swapped event")
}
poolIdUint, err := strconv.ParseUint(poolId, 10, 64)
if err != nil {
return fmt.Errorf("error parsing pool ID %v", err)
}
// Get the spot price from the pool manager keeper
spotPrice, err := s.keepers.PoolManagerKeeper.RouteCalculateSpotPrice(sdk.UnwrapSDKContext(ctx), poolIdUint, tokensIn.Denom, tokensOut.Denom)
if err != nil {
return fmt.Errorf("error getting spot price %v", err)
}
// Set the spot price in the event's attributes map
event.Attributes = append(event.Attributes, abci.EventAttribute{
Key: "quote_tokenin_base_tokenout",
Value: spotPrice.String(),
})
return nil
}

// publishTxn iterates through the transactions in the block and publishes them to the indexer backend.
func (s *indexerStreamingService) publishTxn(ctx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error {
sdkCtx := sdk.UnwrapSDKContext(ctx)
Expand Down Expand Up @@ -158,6 +213,14 @@ func (s *indexerStreamingService) publishTxn(ctx context.Context, req abci.Reque
continue
}
eventType := clonedEvent.Type
if eventType == gammtypes.TypeEvtTokenSwapped {
// Set the spot price for the token swapped event in the event's attributes map
err := s.setSpotPrice(ctx, clonedEvent)
if err != nil {
s.logger.Error("Error setting spot price", "error", err)
continue
}
}
if eventType == gammtypes.TypeEvtTokenSwapped || eventType == gammtypes.TypeEvtPoolJoined || eventType == gammtypes.TypeEvtPoolExited || eventType == concentratedliquiditytypes.TypeEvtCreatePosition || eventType == concentratedliquiditytypes.TypeEvtWithdrawPosition {
includedEvents = append(includedEvents, domain.EventWrapper{Index: i, Event: *clonedEvent})
}
Expand Down
170 changes: 170 additions & 0 deletions ingest/indexer/service/indexer_streaming_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,163 @@ func (s *IndexerServiceTestSuite) TestAddTokenLiquidity() {
}
}

func (s *IndexerServiceTestSuite) TestSetSpotPrice() {
testCases := []struct {
name string // Test case name
eventType string // Event type to be tested. Only gammtypes.TypeEvtTokenSwapped is valid
poolID string // pool_id attribute value
tokenIn string // token_in attribute value
tokenOut string // token_out attribute value
expectedError bool // Expected error flag
expectedSpotPrice bool // Expected spot price flag
}{
{
name: "happy path",
eventType: gammtypes.TypeEvtTokenSwapped,
poolID: "3",
tokenIn: "1000bar",
tokenOut: "1000foo",
expectedError: false,
expectedSpotPrice: true,
},
{
name: "error when no pool_id attribute",
eventType: gammtypes.TypeEvtTokenSwapped,
poolID: "",
tokenIn: "1000bar",
tokenOut: "1000foo",
expectedError: true,
expectedSpotPrice: false,
},
{
name: "error when no token_in attribute",
eventType: gammtypes.TypeEvtTokenSwapped,
poolID: "3",
tokenIn: "",
tokenOut: "1000foo",
expectedError: true,
expectedSpotPrice: false,
},
{
name: "error when no token_out attribute",
eventType: gammtypes.TypeEvtTokenSwapped,
poolID: "3",
tokenIn: "1000bar",
tokenOut: "",
expectedError: true,
expectedSpotPrice: false,
},
}
for _, tc := range testCases {
s.Run(tc.name, func() {
s.Setup()

// This test suite is to test the SetSpotPrice method in the indexer streaming service.
// where it applies to: token_swapped event only, i.e. gammtypes.TypeEvtTokenSwapped
// It then looks for the pool_id, token_in, and token_out attributes, i.e.
// (concentratedliquiditytypes.AttributeKeyPoolId, concentratedliquiditytypes.AttributeKeyTokensIn, and concentratedliquiditytypes.AttributeKeyTokensOut)
// in the event attribute map. With the pool_id, token in and out denom, it then fetches the spot price thru
// keepers.PoolManagerKeeper::RouteCalculateSpotPrice function. The spot price data is then appended
// to the event attribute map with the key "quote_tokenin_base_tokenout", value being the spot price.

// Initialized chain pools
s.PrepareAllSupportedPools()

// Get all chain pools from state for asserting later
concentratedPools, err := s.App.ConcentratedLiquidityKeeper.GetPools(s.Ctx)

s.Require().NoError(err)

cfmmPools, err := s.App.GAMMKeeper.GetPools(s.Ctx)
s.Require().NoError(err)

cosmWasmPools, err := s.App.CosmwasmPoolKeeper.GetPoolsWithWasmKeeper(s.Ctx)
s.Require().NoError(err)

// Initialize a mock block update process utils
blockUpdatesProcessUtilsMock := &sqsmocks.BlockUpdateProcessUtilsMock{}

// Initialize an empty pool tracker
emptyPoolTracker := pooltracker.NewMemory()

// Initialize a mock pool extractor
poolExtractorMock := &sqsmocks.PoolsExtractorMock{
BlockPools: commondomain.BlockPools{
ConcentratedPools: concentratedPools,
CFMMPools: cfmmPools,
CosmWasmPools: cosmWasmPools,
},
}

// Initialize a block process strategy manager
blockProcessStrategyManager := commondomain.NewBlockProcessStrategyManager()

// Initialize keepers
keepers := indexerdomain.Keepers{
PoolManagerKeeper: s.App.PoolManagerKeeper,
}

// Initialize tx decoder and logger
txDecoder := s.App.GetTxConfig().TxDecoder()
logger := s.App.Logger()

// Initialize a mock publisher
publisherMock := &indexermocks.PublisherMock{}

// Initialize the node status checker mock
nodeStatusCheckerMock := &commonmocks.NodeStatusCheckerMock{}

// Initialize an indexer streaming service
indexerStreamingService := indexerservice.New(
blockUpdatesProcessUtilsMock,
blockProcessStrategyManager,
publisherMock,
emptyStoreKeyMap,
poolExtractorMock,
emptyPoolTracker,
keepers,
txDecoder,
nodeStatusCheckerMock,
logger)

// Create the event based on the test cases attributes
event := abcitypes.Event{
Type: tc.eventType,
Attributes: func() []abcitypes.EventAttribute {
attributes := []abcitypes.EventAttribute{
{
Key: concentratedliquiditytypes.AttributeKeyPoolId,
Value: tc.poolID,
Index: false,
},
{
Key: concentratedliquiditytypes.AttributeKeyTokensIn,
Value: tc.tokenIn,
Index: false,
},
{
Key: concentratedliquiditytypes.AttributeKeyTokensOut,
Value: tc.tokenOut,
Index: false,
},
}
return attributes
}(),
}

// Pass the event to the SetSpotPrice method
err = indexerStreamingService.SetSpotPrice(s.Ctx, &event)
s.Require().Equal(tc.expectedError, err != nil)

// Assert the "quote_tokenin_base_tokenout" event attribute
if !tc.expectedError {
s.Require().Equal(tc.expectedSpotPrice, checkIfSpotPriceAttributeExists(event))
}

})
}
}

func (s *IndexerServiceTestSuite) TestTrackCreatedPoolID() {
testCases := []struct {
name string // Test case name
Expand Down Expand Up @@ -646,3 +803,16 @@ func checkIfLiquidityAttributeExists(event abcitypes.Event, denoms []string) boo
}
return foundKey0 && foundKey1
}

// checkIfSpotPriceAttributeExists checks if the spot price attribute exists in the event attributes
// as they should be appended by the SetSpotPrice method in the indexer streaming service.
// i.e. "quote_tokenin_base_tokenout" must exist in the event.Attributes
func checkIfSpotPriceAttributeExists(event abcitypes.Event) bool {
spotPriceKey := "quote_tokenin_base_tokenout"
for _, attribute := range event.Attributes {
if attribute.Key == spotPriceKey {
return true
}
}
return false
}

0 comments on commit a110bc5

Please sign in to comment.