Skip to content

Commit

Permalink
complitly refactor trnasaction confirmation
Browse files Browse the repository at this point in the history
  • Loading branch information
abrzezinski94 committed Jan 12, 2024
1 parent 747b455 commit 1f62d86
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 152 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@blockworks-foundation/mangolana",
"version": "0.0.1-beta.16",
"version": "0.0.2",
"description": "",
"main": "lib/index.js",
"types": "lib/index.d.ts",
Expand Down Expand Up @@ -30,7 +30,7 @@
},
"homepage": "https://github.com/blockworks-foundation/mangolana#readme",
"dependencies": {
"@solana/web3.js": "^1.63.1",
"@solana/web3.js": "^1.88.0",
"bs58": "^5.0.0",
"node-fetch": "^3.2.10"
},
Expand Down
2 changes: 1 addition & 1 deletion src/globalTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class BlockHeightStrategyClass implements BlockHeightStrategy {
block: BlockhashWithExpiryBlockHeight;
getSignatureStatusesPoolIntervalMs: number;
constructor({
startBlockCheckAfterSecs = 90,
startBlockCheckAfterSecs = 10,
block,
getSignatureStatusesPoolIntervalMs = 5000,
}: {
Expand Down
258 changes: 168 additions & 90 deletions src/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
Connection,
Keypair,
RpcResponseAndContext,
SignatureResult,
SignatureStatus,
SimulatedTransactionResponse,
Transaction,
Expand All @@ -17,10 +18,16 @@ import {
getTimeoutConfig,
SequenceType,
TimeStrategy,
TimeStrategyClass,
TransactionInstructionWithSigners,
WalletSigner,
} from './globalTypes';

enum ConfirmationReject {
Timeout = 'Timeout',
Aborted = 'Aborted',
}

export interface TransactionInstructionWithType {
instructionsSet: TransactionInstructionWithSigners[];
sequenceType?: SequenceType;
Expand All @@ -39,6 +46,7 @@ export type awaitTransactionSignatureConfirmationProps = {
config?: {
logFlowInfo?: boolean;
};
abortSignal?: AbortSignal;
};
/**
* waits for transaction confirmation
Expand All @@ -63,115 +71,185 @@ export const awaitTransactionSignatureConfirmation = async ({
connection,
timeoutStrategy,
config,
abortSignal,
}: awaitTransactionSignatureConfirmationProps) => {
const abortController = new AbortController();
const logger = new Logger({ ...config });
const timeoutConfig = getTimeoutConfig(timeoutStrategy);
let timeoutBlockHeight = 0;
let timeout = 0;
if (timeoutConfig instanceof BlockHeightStrategyClass) {
timeoutBlockHeight = timeoutConfig.block.lastValidBlockHeight + MAXIMUM_NUMBER_OF_BLOCKS_FOR_TRANSACTION;
timeout = timeoutConfig.startBlockCheckAfterSecs;
} else {
timeout = timeoutConfig.timeout;
}

let startTimeoutCheck = false;
let done = false;
const confirmLevels: (TransactionConfirmationStatus | null | undefined)[] = ['finalized'];
const confirmLevels: TransactionConfirmationStatus[] = ['finalized'];
if (confirmLevel === 'confirmed') {
confirmLevels.push('confirmed');
} else if (confirmLevel === 'processed') {
confirmLevels.push('confirmed');
confirmLevels.push('processed');
}
let subscriptionId: number | undefined;
try {
const result: RpcResponseAndContext<SignatureStatus> = await Promise.race([
confirmWithSignatureStatuses(
txid,
connection,
confirmLevels,
logger,
timeoutConfig,
abortController.signal,
abortSignal,
),
confirmWithWebSockets(txid, connection, confirmLevel, logger, abortController.signal, abortSignal),
timeoutCheck(txid, timeoutConfig, logger, connection, confirmLevel, abortController.signal, abortSignal),
]);
return result;
} catch (e) {
abortController.abort();
logger.log('await transaction error', e);
throw e;
}
};

const result = await new Promise((resolve, reject) => {
(async () => {
setTimeout(() => {
if (done) {
return;
}
if (timeoutBlockHeight !== 0) {
startTimeoutCheck = true;
const confirmWithSignatureStatuses = (
txid: string,
connection: Connection,
confirmLevels: TransactionConfirmationStatus[],
logger: Logger,
timeoutConfig: TimeStrategyClass | BlockHeightStrategyClass,
internalSignal: AbortSignal,
externalSignal?: AbortSignal,
) => {
return new Promise<RpcResponseAndContext<SignatureStatus>>(async (resolve, reject) => {
let intervalTimeout: NodeJS.Timer | null = null;
const retryTimer = timeoutConfig.getSignatureStatusesPoolIntervalMs || 5000;
const onAbort = () => {
if (intervalTimeout) {
clearInterval(intervalTimeout);
}

reject(ConfirmationReject.Aborted);
};
internalSignal.addEventListener('abort', onAbort);
externalSignal?.addEventListener('abort', onAbort);

intervalTimeout = setInterval(async () => {
try {
const result = await connection.getSignatureStatus(txid);
if (!result) return;

if (result.value?.err) {
logger.log('REST error for', txid, result);
reject({ value: result.value, context: result.context });
} else if (
!(
result.value!.confirmations ||
(result.value!.confirmationStatus && confirmLevels.includes(result.value!.confirmationStatus))
)
) {
logger.log('REST not confirmed', txid, result);
} else {
done = true;
logger.log('Timed out for txid: ', txid);
reject({ timeout: true });
logger.log('REST confirmed', txid, result);
resolve({ value: result.value!, context: result.context });
}
}, timeout);
try {
subscriptionId = connection.onSignature(
txid,
(result, context) => {
subscriptionId = undefined;
done = true;
if (result.err) {
reject(result.err);
} else {
resolve(result);
}
},
confirmLevel,
);
} catch (e) {
done = true;
logger.log('WS error in setup', txid, e);
logger.log('REST connection error: txid', txid, e);
}
const retrySleep = timeoutConfig.getSignatureStatusesPoolIntervalMs || 5000;
while (!done) {
// eslint-disable-next-line no-loop-func
await sleep(retrySleep);
(async () => {
try {
const promises: [Promise<RpcResponseAndContext<(SignatureStatus | null)[]>>, Promise<number>?] = [
connection.getSignatureStatuses([txid]),
];
//if startTimeoutThreshold passed we start to check if
//current blocks are did not passed timeoutBlockHeight threshold
if (startTimeoutCheck) {
promises.push(connection.getBlockHeight('confirmed'));
}
const [signatureStatuses, currentBlockHeight] = await Promise.all(promises);
if (typeof currentBlockHeight !== undefined && timeoutBlockHeight <= currentBlockHeight!) {
logger.log('Timed out for txid: ', txid);
done = true;
reject({ timeout: true });
}
}, retryTimer);
});
};

const result = signatureStatuses && signatureStatuses.value[0];
if (!done) {
if (!result) return;
if (result.err) {
logger.log('REST error for', txid, result);
done = true;
reject(result.err);
} else if (!(result.confirmations || confirmLevels.includes(result.confirmationStatus))) {
logger.log('REST not confirmed', txid, result);
} else {
logger.log('REST confirmed', txid, result);
done = true;
resolve(result);
}
}
} catch (e) {
if (!done) {
logger.log('REST connection error: txid', txid, e);
const confirmWithWebSockets = (
txid: string,
connection: Connection,
confirmLevel: TransactionConfirmationStatus,
logger: Logger,
internalSignal: AbortSignal,
externalSignal?: AbortSignal,
) => {
return new Promise<RpcResponseAndContext<SignatureStatus>>(async (resolve, reject) => {
let subscriptionId: number | undefined;
const onAbort = () => {
if (subscriptionId) {
connection.removeSignatureListener(subscriptionId).catch((e) => {
logger.log('WS error in cleanup', e);
});
}
reject(ConfirmationReject.Aborted);
};
internalSignal.addEventListener('abort', onAbort);
externalSignal?.addEventListener('abort', onAbort);
try {
logger.log('on signature', connection);
subscriptionId = connection.onSignature(
txid,
(result, context) => {
subscriptionId = undefined;
if (result.err) {
reject({ value: result, context });
} else {
//@ts-ignore
resolve({ value: result, context });
}
},
confirmLevel,
);
} catch (e) {
logger.log('WS error in setup', txid, e);
}
if (subscriptionId) {
connection.removeSignatureListener(subscriptionId).catch((e) => {
logger.log('WS error in cleanup', e);
});
}
});
};

const timeoutCheck = (
txid: string,
timeoutConfig: TimeStrategyClass | BlockHeightStrategyClass,
logger: Logger,
connection: Connection,
confirmLevel: TransactionConfirmationStatus,
internalSignal: AbortSignal,
externalSignal?: AbortSignal,
) => {
return new Promise<RpcResponseAndContext<SignatureStatus>>(async (resolve, reject) => {
let intervalTimer: NodeJS.Timeout | null = null;
let setTimeoutTimer: NodeJS.Timeout | null = null;
let timeoutBlockHeight = 0;
let timeout = 0;
if (timeoutConfig instanceof BlockHeightStrategyClass) {
timeoutBlockHeight = timeoutConfig.block.lastValidBlockHeight;
timeout = timeoutConfig.startBlockCheckAfterSecs;
} else {
timeout = timeoutConfig.timeout;
}
const onAbort = () => {
if (intervalTimer) {
clearInterval(intervalTimer);
}
if (setTimeoutTimer) {
clearTimeout(setTimeoutTimer);
}
reject(ConfirmationReject.Aborted);
};
internalSignal.addEventListener('abort', onAbort);
externalSignal?.addEventListener('abort', onAbort);
const retrySleep = timeoutConfig.getSignatureStatusesPoolIntervalMs || 5000;
setTimeoutTimer = setTimeout(async () => {
if (timeoutBlockHeight !== 0) {
intervalTimer = setInterval(async () => {
const currentBlockHeight = await connection.getBlockHeight(confirmLevel);
if (typeof currentBlockHeight !== undefined && timeoutBlockHeight <= currentBlockHeight!) {
logger.log('Timed out for txid: ', txid);
if (intervalTimer) {
clearInterval(intervalTimer);
}
reject(ConfirmationReject.Timeout);
}
})();
}, retrySleep);
} else {
logger.log('Timed out for txid: ', txid);
reject(ConfirmationReject.Timeout);
}
})();
}, timeout);
});

if (subscriptionId) {
connection.removeSignatureListener(subscriptionId).catch((e) => {
logger.log('WS error in cleanup', e);
});
}

done = true;
return result;
};

export type sendAndConfirmSignedTransactionProps = {
Expand Down Expand Up @@ -273,7 +351,7 @@ export const sendAndConfirmSignedTransaction = async ({
}
} catch (err: any) {
logger.log(err);
if (err.timeout) {
if (err === 'Timeout') {
throw { txid };
}
let simulateResult: SimulatedTransactionResponse | null = null;
Expand Down
Loading

0 comments on commit 1f62d86

Please sign in to comment.