diff --git a/package.json b/package.json index 807d717d2..1d54bcded 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@paraswap/dex-lib", - "version": "3.8.26", + "version": "3.8.27-dualsync.1", "main": "build/index.js", "types": "build/index.d.ts", "repository": "https://github.com/paraswap/paraswap-dex-lib", diff --git a/src/dex-helper/dummy-dex-helper.ts b/src/dex-helper/dummy-dex-helper.ts index c147c4525..0732db4da 100644 --- a/src/dex-helper/dummy-dex-helper.ts +++ b/src/dex-helper/dummy-dex-helper.ts @@ -299,6 +299,8 @@ export class DummyDexHelper implements IDexHelper { getLogger: LoggerConstructor; web3Provider: Web3; getTokenUSDPrice: (token: Token, amount: bigint) => Promise; + dexStatePublisher: any; // FIXME + dexStateSubscriber: any; // FIXME constructor(network: number, rpcUrl?: string) { this.config = new ConfigHelper(false, generateConfig(network), 'is'); diff --git a/src/dex-helper/idex-helper.ts b/src/dex-helper/idex-helper.ts index a66573821..c3302a9d1 100644 --- a/src/dex-helper/idex-helper.ts +++ b/src/dex-helper/idex-helper.ts @@ -24,4 +24,6 @@ export interface IDexHelper { blockManager: IBlockManager; getLogger: LoggerConstructor; getTokenUSDPrice: (token: Token, amount: bigint) => Promise; + dexStateSubscriber: any; // temp ? + dexStatePublisher: any; // temp ? } diff --git a/src/dex-utils.ts b/src/dex-utils.ts new file mode 100644 index 000000000..f9991ed75 --- /dev/null +++ b/src/dex-utils.ts @@ -0,0 +1,5 @@ +export class DexPoolNotFoundError extends Error { + constructor(dexKey: string, poolIdentifier: string) { + super(`Pool ${poolIdentifier} not found for DEX ${dexKey}`); + } +} diff --git a/src/dex/idex.ts b/src/dex/idex.ts index 6479a6e5d..740b2353d 100644 --- a/src/dex/idex.ts +++ b/src/dex/idex.ts @@ -207,6 +207,14 @@ export interface IDexPricing { // useful for RFQ system isBlacklisted?(userAddress?: Address): AsyncOrSync; + addPoolGenerateState?({ + poolIdentifier, + blockNumber, + }: { + poolIdentifier: string; + blockNumber: number; + }): AsyncOrSync; + // blacklist a specific userAddress from exchange setBlacklist?(userAddress?: Address): AsyncOrSync; } diff --git a/src/dex/pancakeswap-v3/pancakeswap-v3-pool.ts b/src/dex/pancakeswap-v3/pancakeswap-v3-pool.ts index fc2fdb9f1..10f7d7ff1 100644 --- a/src/dex/pancakeswap-v3/pancakeswap-v3-pool.ts +++ b/src/dex/pancakeswap-v3/pancakeswap-v3-pool.ts @@ -29,8 +29,9 @@ import { _reduceTickBitmap, _reduceTicks, } from '../uniswap-v3/contract-math/utils'; +import { StatefulDualSynchronizer } from '../../stateful-dual-synchronizer'; -export class PancakeSwapV3EventPool extends StatefulEventSubscriber { +export class PancakeSwapV3EventPool extends StatefulDualSynchronizer { handlers: { [event: string]: ( event: any, @@ -61,7 +62,7 @@ export class PancakeSwapV3EventPool extends StatefulEventSubscriber { constructor( readonly dexHelper: IDexHelper, - parentName: string, + dexKey: string, readonly stateMultiContract: Contract, readonly erc20Interface: Interface, protected readonly factoryAddress: Address, @@ -73,14 +74,8 @@ export class PancakeSwapV3EventPool extends StatefulEventSubscriber { readonly poolInitCodeHash: string, readonly poolDeployer?: string, ) { - super( - parentName, - `${token0}_${token1}_${feeCode}`, - dexHelper, - logger, - true, - mapKey, - ); + const poolIdentifier = `${token0}_${token1}_${feeCode}`; + super(dexKey, poolIdentifier, dexHelper, logger); this.feeCodeAsString = feeCode.toString(); this.token0 = token0.toLowerCase(); this.token1 = token1.toLowerCase(); diff --git a/src/dex/pancakeswap-v3/pancakeswap-v3.ts b/src/dex/pancakeswap-v3/pancakeswap-v3.ts index fc267ad0f..96707aa9d 100644 --- a/src/dex/pancakeswap-v3/pancakeswap-v3.ts +++ b/src/dex/pancakeswap-v3/pancakeswap-v3.ts @@ -62,6 +62,7 @@ import { PancakeswapV3Factory, } from './pancakeswap-v3-factory'; import { extractReturnAmountPosition } from '../../executor/utils'; +import { DexPoolNotFoundError } from '../../dex-utils'; type PoolPairsInfo = { token0: Address; @@ -293,7 +294,10 @@ export class PancakeswapV3 }, }); } catch (e) { - if (e instanceof Error && e.message.endsWith('Pool does not exist')) { + if ( + (e instanceof Error && e.message.endsWith('Pool does not exist')) || + e instanceof DexPoolNotFoundError + ) { // no need to await we want the set to have the pool key but it's not blocking this.dexHelper.cache.zadd( this.notExistingPoolSetKey, @@ -368,6 +372,22 @@ export class PancakeswapV3 return true; } + async addPoolGenerateState({ + poolIdentifier, + blockNumber, + }: { + poolIdentifier: string; + blockNumber: number; + }): Promise { + const [token0, token1, fee] = poolIdentifier.split('_'); + + const pool = await this.getPool(token0, token1, BigInt(fee), blockNumber); + + if (!pool) return null; + + return pool.getState(blockNumber); + } + async getPoolIdentifiers( srcToken: Token, destToken: Token, @@ -661,7 +681,7 @@ export class PancakeswapV3 if (state.liquidity <= 0n) { if (state.liquidity < 0) { this.logger.error( - `${this.dexKey}-${this.network}: ${pool.poolAddress} pool has negative liquidity: ${state.liquidity}. Find with key: ${pool.mapKey}`, + `${this.dexKey}-${this.network}: ${pool.poolAddress} pool has negative liquidity: ${state.liquidity}. Find with key: ${pool.poolIdentifier}`, ); } this.logger.trace(`pool have 0 liquidity`); diff --git a/src/dex/uniswap-v2/uniswap-v2.ts b/src/dex/uniswap-v2/uniswap-v2.ts index 111b1edc4..df742088a 100644 --- a/src/dex/uniswap-v2/uniswap-v2.ts +++ b/src/dex/uniswap-v2/uniswap-v2.ts @@ -3,7 +3,6 @@ import { pack } from '@ethersproject/solidity'; import _ from 'lodash'; import { AsyncOrSync, DeepReadonly } from 'ts-essentials'; import erc20ABI from '../../abi/erc20.json'; -import { StatefulEventSubscriber } from '../../stateful-event-subscriber'; import { AdapterExchangeParam, Address, @@ -69,6 +68,7 @@ import { hexConcat, } from 'ethers/lib/utils'; import { BigNumber } from 'ethers'; +import { StatefulDualSynchronizer } from '../../stateful-dual-synchronizer'; const rebaseTokens = _rebaseTokens as { chainId: number; address: string }[]; @@ -124,11 +124,12 @@ export interface UniswapV2Pair { pool?: UniswapV2EventPool; } -export class UniswapV2EventPool extends StatefulEventSubscriber { +export class UniswapV2EventPool extends StatefulDualSynchronizer { decoder = (log: Log) => this.iface.parseLog(log); constructor( - parentName: string, + public parentName: string, + public poolIdentifier: string, protected dexHelper: IDexHelper, private poolAddress: Address, private token0: Token, @@ -142,15 +143,7 @@ export class UniswapV2EventPool extends StatefulEventSubscriber number, private iface: Interface = uniswapV2PoolIface, ) { - super( - parentName, - (token0.symbol || token0.address) + - '-' + - (token1.symbol || token1.address) + - ' pool', - dexHelper, - logger, - ); + super(parentName, poolIdentifier, dexHelper, logger); } protected processLog( @@ -301,10 +294,11 @@ export class UniswapV2 reserves1: string, feeCode: number, blockNumber: number, - ) { + ): Promise { const { callEntry, callDecoder } = this.getFeesMultiCallData(pair) || {}; pair.pool = new UniswapV2EventPool( this.dexKey, + this._getPoolIdentifier(pair.token0, pair.token1), this.dexHelper, pair.exchange!, pair.token0, @@ -318,9 +312,50 @@ export class UniswapV2 ); pair.pool.addressesSubscribed.push(pair.exchange!); - await pair.pool.initialize(blockNumber, { + try { + await pair.pool.initialize( + blockNumber, + /*, { state: { reserves0, reserves1, feeCode }, - }); + } + */ + ); + return pair.pool; + } catch (e) { + this.logger.error(`Error_addPool could not add pool with error:`, e); + delete pair.pool; + return null; + } + } + + async addPoolGenerateState({ + poolIdentifier, + blockNumber, + }: { + poolIdentifier: string; + blockNumber: number; + }): Promise { + // poolIdentifier is in the format of 'uniswapv2__' + const [, token0, token1] = poolIdentifier.split('_'); + const pair = await this.findPair( + { address: token0, decimals: 0 }, + { address: token1, decimals: 0 }, + ); + if (!(pair && pair.exchange)) return null; + + const [pairState] = await this.getManyPoolReserves([pair], blockNumber); + + const pool = await this.addPool( + pair, + pairState.reserves0, + pairState.reserves1, + pairState.feeCode, + blockNumber, + ); + + if (!pool) return null; + + return pool.getState(blockNumber); } async getBuyPrice( @@ -479,7 +514,9 @@ export class UniswapV2 pairState.feeCode, blockNumber, ); - } else pair.pool.setState(pairState, blockNumber); + } else { + // pair.pool.setState(pairState, blockNumber); // TEMP: temporarily disabled to prevent confusions with new system + } } } @@ -525,6 +562,19 @@ export class UniswapV2 }; } + _getPoolIdentifier(_from: Token, _to: Token) { + const from = this.dexHelper.config.wrapETH(_from); + const to = this.dexHelper.config.wrapETH(_to); + + const tokenAddress = [from.address.toLowerCase(), to.address.toLowerCase()] + .sort((a, b) => (a > b ? 1 : -1)) + .join('_'); + + const poolIdentifier = `${this.dexKey}_${tokenAddress}`; + + return poolIdentifier.toLowerCase(); + } + async getPoolIdentifiers( _from: Token, _to: Token, diff --git a/src/dex/uniswap-v3/uniswap-v3-pool.ts b/src/dex/uniswap-v3/uniswap-v3-pool.ts index 2110470b2..25500b8ea 100644 --- a/src/dex/uniswap-v3/uniswap-v3-pool.ts +++ b/src/dex/uniswap-v3/uniswap-v3-pool.ts @@ -4,10 +4,7 @@ import { Interface } from '@ethersproject/abi'; import { ethers } from 'ethers'; import { assert, DeepReadonly } from 'ts-essentials'; import { Log, Logger, BlockHeader, Address } from '../../types'; -import { - InitializeStateOptions, - StatefulEventSubscriber, -} from '../../stateful-event-subscriber'; +import { InitializeStateOptions } from '../../stateful-event-subscriber'; import { IDexHelper } from '../../dex-helper/idex-helper'; import { PoolState, @@ -27,8 +24,9 @@ import { TickBitMap } from './contract-math/TickBitMap'; import { uint256ToBigInt } from '../../lib/decoders'; import { decodeStateMultiCallResultWithRelativeBitmaps } from './utils'; import { _reduceTickBitmap, _reduceTicks } from './contract-math/utils'; +import { StatefulDualSynchronizer } from '../../stateful-dual-synchronizer'; -export class UniswapV3EventPool extends StatefulEventSubscriber { +export class UniswapV3EventPool extends StatefulDualSynchronizer { handlers: { [event: string]: ( event: any, @@ -79,7 +77,7 @@ export class UniswapV3EventPool extends StatefulEventSubscriber { poolKey = `${poolKey}_${tickSpacing}`; } - super(parentName, poolKey, dexHelper, logger, true, mapKey); + super(parentName, poolKey, dexHelper, logger); this.feeCodeAsString = feeCode.toString(); this.token0 = token0.toLowerCase(); this.token1 = token1.toLowerCase(); diff --git a/src/dex/uniswap-v3/uniswap-v3.ts b/src/dex/uniswap-v3/uniswap-v3.ts index 3c2e3385a..d1bc005dc 100644 --- a/src/dex/uniswap-v3/uniswap-v3.ts +++ b/src/dex/uniswap-v3/uniswap-v3.ts @@ -72,6 +72,7 @@ import { OptimalSwapExchange } from '@paraswap/core'; import { OnPoolCreatedCallback, UniswapV3Factory } from './uniswap-v3-factory'; import { hexConcat, hexlify, hexZeroPad, hexValue } from 'ethers/lib/utils'; import { extractReturnAmountPosition } from '../../executor/utils'; +import { DexPoolNotFoundError } from '../../dex-utils'; type PoolPairsInfo = { token0: Address; @@ -321,7 +322,10 @@ export class UniswapV3 }, }); } catch (e) { - if (e instanceof Error && e.message.endsWith('Pool does not exist')) { + if ( + (e instanceof Error && e.message.endsWith('Pool does not exist')) || + e instanceof DexPoolNotFoundError + ) { // no need to await we want the set to have the pool key but it's not blocking this.dexHelper.cache.zadd( this.notExistingPoolSetKey, @@ -443,6 +447,28 @@ export class UniswapV3 return true; } + async addPoolGenerateState({ + poolIdentifier, + blockNumber, + }: { + poolIdentifier: string; + blockNumber: number; + }): Promise { + const [token0, token1, fee, tickSpacing] = poolIdentifier.split('_'); + + const pool = await this.getPool( + token0, + token1, + BigInt(fee), + blockNumber, + tickSpacing !== undefined ? BigInt(tickSpacing) : undefined, + ); + + if (!pool) return null; + + return pool.getState(blockNumber); + } + async getPoolIdentifiers( srcToken: Token, destToken: Token, @@ -763,7 +789,7 @@ export class UniswapV3 if (state.liquidity <= 0n) { if (state.liquidity < 0) { this.logger.error( - `${this.dexKey}-${this.network}: ${pool.poolAddress} pool has negative liquidity: ${state.liquidity}. Find with key: ${pool.mapKey}`, + `${this.dexKey}-${this.network}: ${pool.poolAddress} pool has negative liquidity: ${state.liquidity}. Find with key: ${pool.poolIdentifier}`, ); } this.logger.trace(`pool have 0 liquidity`); diff --git a/src/stateful-dual-synchronizer.ts b/src/stateful-dual-synchronizer.ts new file mode 100644 index 000000000..179fba761 --- /dev/null +++ b/src/stateful-dual-synchronizer.ts @@ -0,0 +1,415 @@ +import { assert, AsyncOrSync, DeepReadonly } from 'ts-essentials'; +import { Log, Logger } from './types'; +import { BlockHeader } from 'web3-eth'; +import { EventSubscriber } from './dex-helper/iblock-manager'; + +import { MAX_BLOCKS_HISTORY } from './constants'; +import { IDexHelper } from './dex-helper'; +import { Utils } from './utils'; + +export type InitializeStateOptions = { + state?: DeepReadonly; + initCallback?: (state: DeepReadonly) => void; + forceRegenerate?: boolean; // deprecated ? +}; + +type ObjectOrString = DeepReadonly | string; +type ObjectOrStringOrNull = ObjectOrString | null; + +// TODO: make sure that we can get either state object or string +export abstract class StatefulDualSynchronizer + implements EventSubscriber +{ + //The current state and its block number + //Derived classes should not set these directly, and instead use setState() + protected state: ObjectOrStringOrNull = null; + protected stateBlockNumber: number = 0; + + //Derived classes should use setState() to record a new entry + protected stateHistory: { + [blockNumber: number]: ObjectOrString; + } = {}; + + //Invalid flag - indicates that the currently stored state might not be valid + protected invalid: boolean = false; + + isTracking: () => boolean = () => false; + public addressesSubscribed: string[] = []; + + // parentName and Name are imposed by the interface. Prefer dexKey and poolIdentifier + public parentName: string; + public name: string; + public isInitialized = false; + + constructor( + public dexKey: string, + public poolIdentifier: string, + protected dexHelper: IDexHelper, + protected logger: Logger, + ) { + this.dexKey = dexKey.toLowerCase(); + this.poolIdentifier = poolIdentifier.toLowerCase(); + + // parentName and Name are imposed by the interface. Prefer dexKey and poolIdentifier + this.parentName = this.dexKey; + this.name = this.poolIdentifier; + + if (!this.dexHelper.config.isSlave) { + this.dexHelper.dexStatePublisher.registerPool( + this.dexKey, + this.poolIdentifier, + this, + ); + } + } + + getStateBlockNumber(): Readonly { + return this.stateBlockNumber; + } + + //Function which set the initial state and bounded it to blockNumber + //There is multiple possible case: + // 1. You provide a state in options object the function will initialize with the provided state + // with blockNumber and subscribe to logs. + // 2. if you are a master instance of dex-lib and no state is provided in options object + // then the function generate a new state with blockNumber as height and set the state with + // the result. + // 3. if you are a slave instance of dex-lib + // either: + // - If a state is found in the cache and the state is not null we set our state with the + // cache state and cache blockNumber. Subscribe to logs with the cache blockNumber + // or: + // - If no valid state found in cache, we generate a new state with blockNumber + // and se state with blockNumber. Subscribe to logs with blockNumber. The function + // will also publish a message to cache to tell one master version of dex-lib that this slave + // instance subscribed to a pool from dex this.parentName and name this.name. + async initialize( + blockNumber: number, + options?: InitializeStateOptions, + ) { + let masterBn: undefined | number = undefined; + if (options && options.state) { + this.setState(options.state, blockNumber); + } else { + if (this.dexHelper.config.isSlave) { + let updatedState = + await this.dexHelper.dexStateSubscriber.requestDEXPoolState( + this.dexKey, + this.poolIdentifier, + blockNumber, + ); + + this.setState(updatedState, blockNumber); + } else { + // if you are not a slave instance always generate new state + this.logger.info( + `${this.dexKey}: ${this.poolIdentifier}: cache generating state`, + ); + const state = await this.generateState(blockNumber); + this.setState(state, blockNumber); + } + } + + // apply a callback on the state + if (options && options.initCallback) { + if (this.state) { + assert( + typeof this.state !== 'string', + 'LOGIC ERROR: state is serialised', + ); + options.initCallback(this.state); + } + } + + if (this.dexHelper.config.isSlave) { + this.dexHelper.dexStateSubscriber.subscribeToDEXPoolUpdates( + this.dexKey, + this.poolIdentifier, + (state: string, blockNumber: number) => { + this.setState(state, blockNumber); + }, + ); + } else { + this.dexHelper.blockManager.subscribeToLogs( + this, + this.addressesSubscribed, + masterBn || blockNumber, + ); + } + + this.isInitialized = true; + } + + //Function which transforms the given state for the given log event. + //If the provided log does not affect the state, return null. + protected abstract processLog( + state: DeepReadonly, + log: Readonly, + blockHeader: Readonly, + ): AsyncOrSync | null>; + + //This function processes all logs for a single block (the block number is + //contained in each of the logs). It is not allowed to call this function + //with an empty logs array. The default implementation here will just call + //processLog for each of the logs; it may be overridden, if block specific + //handling or handling multiple logs at once is needed. Null should be + //returned if none of the provided logs affect the state. + protected async processBlockLogs( + state: DeepReadonly, + logs: Readonly[], + blockHeader: Readonly, + ): Promise | null> { + let nextState: DeepReadonly | null = null; + for (const log of logs) { + const retState: DeepReadonly | null = await this.processLog( + nextState || state, + log, + blockHeader, + ); + if (retState) nextState = retState; + } + return nextState; + } + + //Function used to generate a state if one is not currently present, which + //must be the state at exactly the given block number, unless one is not + //provided, in which case one should be generated for latest block. This + //function should not use any previous states to derive a new state, it should + //generate one from scratch. + abstract generateState( + blockNumber?: number | 'latest', + ): AsyncOrSync>; + + restart(blockNumber: number): void { + for (const _bn of Object.keys(this.stateHistory)) { + const bn = +_bn; + if (bn >= blockNumber) break; + delete this.stateHistory[bn]; + } + if (this.state && this.stateBlockNumber < blockNumber) { + this.logger.info( + `StatefulDualSynchronizer restart, bn: ${blockNumber}, state_bn: ${this.stateBlockNumber}: ${this.dexKey}: ${this.poolIdentifier}`, + ); + this._setState(null, blockNumber); + } + } + + //Implementation must call setState() for every block in which the state + //changes and must ignore any logs that aren't newer than the oldest state + //stored. If state is not set or null, then the implementation should derive + //the state by another method for the block number of the first log, ignore + //all logs with that block number and then proceed as normal for the remaining + //logs. Remember to clear the invalid flag, even if there are no logs! + //A default implementation is provided here, but could be overridden. + async update( + logs: Readonly[], + blockHeaders: Readonly<{ [blockNumber: number]: Readonly }>, + ): Promise { + if (this.dexHelper.config.isSlave) { + throw new Error( + 'LOGIC ERROR: we should expect to listen to events on slave instances', + ); + } + + let index = 0; + let lastBlockNumber: number | undefined; + while (index < logs.length) { + const blockNumber = logs[index].blockNumber; + if (index && blockNumber <= lastBlockNumber!) { + this.logger.error('update() received blocks out of order!'); + } + const blockHeader = blockHeaders[blockNumber]; + if (!blockHeader) { + this.logger.error('update() missing block header!'); + } + let lastLogIndex = logs[index].logIndex; + let indexBlockEnd = index + 1; + while ( + indexBlockEnd < logs.length && + logs[indexBlockEnd].blockNumber === blockNumber + ) { + if (logs[indexBlockEnd].logIndex <= lastLogIndex) { + this.logger.error('update() received logs out of order!'); + } + lastLogIndex = logs[indexBlockEnd].logIndex; + ++indexBlockEnd; + } + if (!this.state) { + const freshState = await this.generateState(blockNumber); + this.setState(freshState, blockNumber); + } + //Find the last state before the blockNumber of the logs + let stateBeforeLog: DeepReadonly | string | undefined; + for (const _bn of Object.keys(this.stateHistory)) { + const bn = +_bn; + if (bn >= blockNumber) break; + stateBeforeLog = this.stateHistory[bn]; + } + //Ignoring logs if there's no older state to play them onto + if (stateBeforeLog) { + assert( + typeof stateBeforeLog !== 'string', + 'LOGIC ERROR: state is serialised', + ); + const nextState = await this.processBlockLogs( + stateBeforeLog, + logs.slice(index, indexBlockEnd), + blockHeader, + ); + if (nextState) this.setState(nextState, blockNumber); + } + lastBlockNumber = blockNumber; + index = indexBlockEnd; + } + this.invalid = false; + + if (!this.dexHelper.config.isSlave && this.state === null) { + const network = this.dexHelper.config.data.network; + const createNewState = async () => { + if (this.state !== null) { + return true; + } + const latestBlockNumber = + this.dexHelper.blockManager.getLatestBlockNumber(); + this.logger.warn( + `${network}: ${this.dexKey}: ${this.poolIdentifier}: master generate (latest: ${latestBlockNumber}) new state because state is null`, + ); + try { + const state = await this.generateState(latestBlockNumber); + this.setState(state, latestBlockNumber); + return true; + } catch (e) { + this.logger.error( + `${network}: ${this.dexKey} ${this.poolIdentifier}: (${latestBlockNumber}) failed fetch state:`, + e, + ); + } + return false; + }; + this.dexHelper.promiseScheduler.addPromise(createNewState); + } + } + + //Removes all states that are beyond the given block number and sets the + //current state to the latest one that is left, if any, unless the invalid + //flag is not set, in which case the most recent state can be kept. + rollback(blockNumber: number): void { + if (this.invalid) { + let lastBn = undefined; + //loop in the ascending order of the blockNumber. V8 property when object keys are number. + for (const bn of Object.keys(this.stateHistory)) { + const bnAsNumber = +bn; + if (bnAsNumber > blockNumber) { + delete this.stateHistory[+bn]; + } else { + lastBn = bnAsNumber; + } + } + + if (lastBn) { + this._setState(this.stateHistory[lastBn], lastBn); + } else { + this.logger.info( + `StatefulDualSynchronizer rollback, bn: ${blockNumber}: ${this.dexKey}: ${this.poolIdentifier}`, + ); + this._setState(null, blockNumber); + // TODO: explore generating new state on invalidation + } + } else { + //Keep the current state in this.state and in the history + for (const _bn of Object.keys(this.stateHistory)) { + const bn = +_bn; + if (+bn > blockNumber && +bn !== this.stateBlockNumber) { + delete this.stateHistory[bn]; + } + } + } + } + + invalidate(): void { + this.logger.info( + `StatefulDualSynchronizer invalidate: ${this.dexKey}: ${this.poolIdentifier}`, + ); + this.invalid = true; + + // TODO: explore generating new state on invalidation + } + + //May return a state that is more recent than the block number specified, or + //will return null if a recent enough state cannot be found, in which case the + //caller should derive a state using another method (at an exact block + //number), possibly using generateState(), and set it on this object using + //setState. In case isTracking() returns true, it is assumed that the stored + //state is current and so the minBlockNumber will be disregarded. + getState(minBlockNumber: number): DeepReadonly | null { + if (!this.state || this.invalid) return null; + + if (this.dexHelper.config.isSlave) { + if ( + this.dexHelper.dexStateSubscriber.isSynced(minBlockNumber) || + this.stateBlockNumber >= minBlockNumber + ) { + if (typeof this.state === 'string') { + this.state = Utils.Parse(this.state); + } + assert( + typeof this.state !== 'string', + 'LOGIC ERROR: state is serialised', + ); + return this.state; + } + } else { + if (this.isTracking() || this.stateBlockNumber >= minBlockNumber) { + if (typeof this.state === 'string') { + this.state = Utils.Parse(this.state); + } + assert( + typeof this.state !== 'string', + 'LOGIC ERROR: state is serialised', + ); + return this.state; + } + } + + return null; // DEX need to implement an fallback strategy here + } + + // Returns the last set state. The state might be invalid or not updated. + getStaleState(): DeepReadonly | null { + if (typeof this.state === 'string') { + this.state = Utils.Parse(this.state); + } + assert(typeof this.state !== 'string', 'LOGIC ERROR: state is serialised'); + return this.state; + } + + _setState(state: ObjectOrStringOrNull, blockNumber: number) { + this.state = state; + this.stateBlockNumber = blockNumber; + } + + //Saves the state into the stateHistory, and cleans up any old state that is + //no longer needed. If the blockNumber is greater than or equal to the + //current state, then the current state will be updated and the invalid flag + //can be reset. + setState(state: ObjectOrString, blockNumber: number): void { + if (!blockNumber) { + this.logger.error('setState() with blockNumber', blockNumber); + return; + } + this.stateHistory[blockNumber] = state; + if (!this.state || blockNumber >= this.stateBlockNumber) { + this._setState(state, blockNumber); + this.invalid = false; + } + const minBlockNumberToKeep = this.stateBlockNumber - MAX_BLOCKS_HISTORY; + let lastBlockNumber: number | undefined; + for (const bn of Object.keys(this.stateHistory)) { + if (+bn <= minBlockNumberToKeep) { + if (lastBlockNumber) delete this.stateHistory[lastBlockNumber]; + } + if (+bn >= minBlockNumberToKeep) break; + lastBlockNumber = +bn; + } + } +}