diff --git a/apps/distributor/src/app.ts b/apps/distributor/src/app.ts index 1ba277506..ef9dc4377 100644 --- a/apps/distributor/src/app.ts +++ b/apps/distributor/src/app.ts @@ -1,6 +1,6 @@ import express, { type Request, type Response, Router } from 'express' import pino from 'pino' -import { DistributorWorker } from './distributor' +import { DistributorV1Worker } from './distributor' import { StandardMerkleTree } from '@openzeppelin/merkle-tree' import { selectAll } from 'app/utils/supabase/selectAll' import { supabaseAdmin } from './supabase' @@ -11,7 +11,7 @@ const logger = pino({ }) // Initialize DistributorWorker -const distributorWorker = new DistributorWorker(logger) +const distributorV1Worker = new DistributorV1Worker(logger) // Initialize Express app const app = express() @@ -25,10 +25,10 @@ app.get('/', (req, res) => { const distributorRouter = Router() -distributorRouter.get('/', async (req: Request, res: Response) => { +distributorRouter.get('/v1', async (req: Request, res: Response) => { res.json({ distributor: true, - ...distributorWorker.toJSON(), + ...distributorV1Worker.toJSON(), }) }) @@ -97,11 +97,11 @@ distributorRouter.post('/merkle', checkAuthorization, async (req: Request, res: res.json(result) }) -distributorRouter.post('/', checkAuthorization, async (req, res) => { +distributorRouter.post('/v1', checkAuthorization, async (req, res) => { const { id } = req.body as { id: string } logger.info({ id }, 'Received request to calculate distribution') try { - await distributorWorker.calculateDistribution(id) + await distributorV1Worker.calculateDistribution(id) } catch (err) { logger.error(err, 'Error while calculating distribution') res.status(500).json({ diff --git a/apps/distributor/src/distributor.test.ts b/apps/distributor/src/distributor.test.ts index a905d4af4..9f4ba1eac 100644 --- a/apps/distributor/src/distributor.test.ts +++ b/apps/distributor/src/distributor.test.ts @@ -6,7 +6,7 @@ import request from 'supertest' import app from './app' import { supabaseAdmin } from './supabase' import pino from 'pino' -import { DistributorWorker } from './distributor' +import { DistributorV1Worker } from './distributor' import type { Tables } from '@my/supabase/database.types' describe('Root Route', () => { @@ -20,19 +20,19 @@ describe('Root Route', () => { describe('Distributor Route', () => { it('should reject unauthorized requests', async () => { - const res = await request(app).post('/distributor') + const res = await request(app).post('/distributor/v1') expect(res.statusCode).toBe(401) expect(res.body).toEqual('Unauthorized') }) it('should handle authorization correctly', async () => { - const res = await request(app).get('/distributor') + const res = await request(app).get('/distributor/v1') expect(res.statusCode).toBe(200) expect(res.body).toMatchObject({ distributor: true, - running: true, + running: false, }) }) @@ -58,7 +58,7 @@ describe('Distributor Route', () => { expect(distribution).toBeDefined() const res = await request(app) - .post('/distributor') + .post('/distributor/v1') .set('Content-Type', 'application/json') .set('Authorization', `Bearer ${process.env.SUPABASE_SERVICE_ROLE}`) .send({ id: distribution.number }) @@ -210,7 +210,7 @@ describe('Distributor Worker', () => { const logger = pino({ level: 'silent', }) - const distributor = new DistributorWorker(logger, false) + const distributor = new DistributorV1Worker(logger, false) await distributor.calculateDistribution('4') const expectedShares = [ diff --git a/apps/distributor/src/distributor.ts b/apps/distributor/src/distributor.ts index 7558186ca..3bb33060b 100644 --- a/apps/distributor/src/distributor.ts +++ b/apps/distributor/src/distributor.ts @@ -28,7 +28,7 @@ const jsonBigint = (key, value) => { return value } -export class DistributorWorker { +export class DistributorV1Worker { private log: Logger private running: boolean private id: string diff --git a/apps/distributor/src/distributorv2.ts b/apps/distributor/src/distributorv2.ts new file mode 100644 index 000000000..480136250 --- /dev/null +++ b/apps/distributor/src/distributorv2.ts @@ -0,0 +1,492 @@ +import { cpus } from 'node:os' +import type { Database, Tables } from '@my/supabase/database.types' +import type { Logger } from 'pino' +import { + createDistributionShares, + fetchAllHodlers, + fetchAllVerifications, + fetchDistribution, + supabaseAdmin, +} from './supabase' +import { fetchAllBalances, isMerkleDropActive } from './wagmi' +import { calculateWeights, calculatePercentageWithBips, PERC_DENOM } from './weights' + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) + +const cpuCount = cpus().length + +const inBatches = (array: T[], batchSize = Math.max(8, cpuCount - 1)) => { + return Array.from({ length: Math.ceil(array.length / batchSize) }, (_, i) => + array.slice(i * batchSize, (i + 1) * batchSize) + ) +} + +const jsonBigint = (key, value) => { + if (typeof value === 'bigint') { + return value.toString() + } + return value +} + +export class DistributorV2Worker { + private log: Logger + private running: boolean + private id: string + private lastDistributionId: number | null = null + private workerPromise: Promise + + constructor(log: Logger, start = true) { + this.id = Math.random().toString(36).substring(7) + this.log = log.child({ module: 'distributor', id: this.id }) + if (start) { + this.running = true + this.workerPromise = this.worker() + } else { + this.running = false + this.workerPromise = Promise.resolve() + } + } + + /** + * Calculates distribution shares for distributions in qualification period. + */ + private async calculateDistributions() { + this.log.info('Calculating distributions') + + const { data: distributions, error } = await supabaseAdmin + .from('distributions') + .select( + `*, + distribution_verification_values (*)`, + { count: 'exact' } + ) + .lte('qualification_start', new Date().toISOString()) + .gte('qualification_end', new Date().toISOString()) + + if (error) { + this.log.error({ error: error.message, code: error.code }, 'Error fetching distributions.') + throw error + } + + this.log.debug({ distributions }, `Found ${distributions.length} distributions.`) + + if (distributions.length === 0) { + this.log.info('No distributions found.') + return + } + + if (distributions.length > 1) { + this.log.error(`Found ${distributions.length} distributions. Only one is supported.`) + return + } + + const errors: unknown[] = [] + + for (const distribution of distributions) { + await this._calculateDistributionShares(distribution).catch((error) => errors.push(error)) + } + + if (distributions.length > 0) { + const lastDistribution = distributions[distributions.length - 1] + this.lastDistributionId = lastDistribution?.id ?? null + } else { + this.lastDistributionId = null + } + this.log.info( + { lastDistributionId: this.lastDistributionId }, + 'Finished calculating distributions.' + ) + + if (errors.length > 0) { + this.log.error(`Error calculating distribution shares. Encountered ${errors.length} errors.`) + throw errors[0] + } + } + + /** + * Calculates distribution shares for a single distribution. + */ + private async _calculateDistributionShares( + distribution: Tables<'distributions'> & { + distribution_verification_values: Tables<'distribution_verification_values'>[] + } + ): Promise { + const log = this.log.child({ distribution_id: distribution.id }) + + // verify tranche is not created when in production + if (await isMerkleDropActive(distribution)) { + throw new Error('Tranche is active. Cannot calculate distribution shares.') + } + + log.info({ distribution_id: distribution.id }, 'Calculating distribution shares.') + + const { + data: verifications, + error: verificationsError, + count, + } = await fetchAllVerifications(distribution.id) + + if (verificationsError) { + throw verificationsError + } + + if (verifications === null || verifications.length === 0) { + log.warn('No verifications found. Skipping distribution.') + return + } + + if (count !== verifications.length) { + throw new Error('Verifications count does not match expected count') + } + + log.info(`Found ${verifications.length} verifications.`) + // log.debug({ verifications }) + if (log.isLevelEnabled('debug')) { + await Bun.write( + 'dist/verifications.json', + JSON.stringify(verifications, jsonBigint, 2) + ).catch((e) => { + log.error(e, 'Error writing verifications.json') + }) + } + + const verificationValues = distribution.distribution_verification_values.reduce( + (acc, verification) => { + acc[verification.type] = { + fixedValue: BigInt(verification.fixed_value), + bipsValue: BigInt(verification.bips_value), + } + return acc + }, + {} as Record< + Database['public']['Enums']['verification_type'], + { fixedValue?: bigint; bipsValue?: bigint } + > + ) + const verificationsByUserId = verifications.reduce( + (acc, verification) => { + acc[verification.user_id] = acc[verification.user_id] || [] + acc[verification.user_id]?.push(verification) + return acc + }, + {} as Record + ) + + log.info(`Found ${Object.keys(verificationsByUserId).length} users with verifications.`) + // log.debug({ verificationsByUserId }) + if (log.isLevelEnabled('debug')) { + await Bun.write( + 'dist/verificationsByUserId.json', + JSON.stringify(verificationsByUserId, jsonBigint, 2) + ).catch((e) => { + log.error(e, 'Error writing verificationsByUserId.json') + }) + } + + const { data: hodlerAddresses, error: hodlerAddressesError } = await fetchAllHodlers( + distribution.id + ) + + if (hodlerAddressesError) { + throw hodlerAddressesError + } + + if (hodlerAddresses === null || hodlerAddresses.length === 0) { + throw new Error('No hodler addresses found') + } + + const hodlerAddressesByUserId = hodlerAddresses.reduce( + (acc, address) => { + acc[address.user_id] = address + return acc + }, + {} as Record + ) + const hodlerUserIdByAddress = hodlerAddresses.reduce( + (acc, address) => { + acc[address.address] = address.user_id + return acc + }, + {} as Record + ) + + log.info(`Found ${hodlerAddresses.length} addresses.`) + // log.debug({ hodlerAddresses }) + if (log.isLevelEnabled('debug')) { + await Bun.write( + 'dist/hodlerAddresses.json', + JSON.stringify(hodlerAddresses, jsonBigint, 2) + ).catch((e) => { + log.error(e, 'Error writing hodlerAddresses.json') + }) + } + + // lookup balances of all hodler addresses in qualification period + const batches = inBatches(hodlerAddresses).flatMap(async (addresses) => { + return await Promise.all( + fetchAllBalances({ + addresses, + distribution, + }) + ) + }) + + let minBalanceAddresses: { user_id: string; address: `0x${string}`; balance: string }[] = [] + for await (const batch of batches) { + minBalanceAddresses = minBalanceAddresses.concat(...batch) + } + + log.info(`Found ${minBalanceAddresses.length} balances.`) + // log.debug({ balances }) + + // Filter out hodler with not enough send token balance + minBalanceAddresses = minBalanceAddresses.filter( + ({ balance }) => BigInt(balance) >= BigInt(distribution.hodler_min_balance) + ) + + log.info( + `Found ${minBalanceAddresses.length} balances after filtering hodler_min_balance of ${distribution.hodler_min_balance}` + ) + // log.debug({ balances }) + + if (log.isLevelEnabled('debug')) { + await Bun.write( + 'dist/balances.json', + JSON.stringify(minBalanceAddresses, jsonBigint, 2) + ).catch((e) => { + log.error(e, 'Error writing balances.json') + }) + } + + // Calculate hodler pool share weights + const distAmt = BigInt(distribution.amount) + const hodlerPoolBips = BigInt(distribution.hodler_pool_bips) + const fixedPoolBips = BigInt(distribution.fixed_pool_bips) + const bonusPoolBips = BigInt(distribution.bonus_pool_bips) + const hodlerPoolAvailableAmount = calculatePercentageWithBips(distAmt, hodlerPoolBips) + const minBalanceByAddress: Record = minBalanceAddresses.reduce( + (acc, balance) => { + acc[balance.address] = BigInt(balance.balance) + return acc + }, + {} as Record + ) + const { totalWeight, weightPerSend, poolWeights, weightedShares } = calculateWeights( + minBalanceAddresses, + hodlerPoolAvailableAmount + ) + + log.info( + { totalWeight, hodlerPoolAvailableAmount, weightPerSend }, + `Calculated ${Object.keys(poolWeights).length} weights.` + ) + // log.debug({ poolWeights }) + if (log.isLevelEnabled('debug')) { + await Bun.write('dist/poolWeights.json', JSON.stringify(poolWeights, jsonBigint, 2)).catch( + (e) => { + log.error(e, 'Error writing poolWeights.json') + } + ) + } + + if (totalWeight === 0n) { + log.warn('Total weight is 0. Skipping distribution.') + return + } + + const fixedPoolAvailableAmount = calculatePercentageWithBips(distAmt, fixedPoolBips) + let fixedPoolAllocatedAmount = 0n + const fixedPoolAmountsByAddress: Record = {} + const bonusPoolBipsByAddress: Record = {} + const maxBonusPoolBips = (bonusPoolBips * PERC_DENOM) / hodlerPoolBips // 3500*10000/6500 = 5384.615384615385% 1.53X + + for (const [userId, verifications] of Object.entries(verificationsByUserId)) { + const hodler = hodlerAddressesByUserId[userId] + if (!hodler || !hodler.address) { + continue + } + const { address } = hodler + if (!minBalanceByAddress[address]) { + continue + } + for (const verification of verifications) { + const { fixedValue, bipsValue } = verificationValues[verification.type] + if (fixedValue && fixedPoolAllocatedAmount + fixedValue <= fixedPoolAvailableAmount) { + if (fixedPoolAmountsByAddress[address] === undefined) { + fixedPoolAmountsByAddress[address] = 0n + } + fixedPoolAmountsByAddress[address] += fixedValue + fixedPoolAllocatedAmount += fixedValue + } + if (bipsValue) { + bonusPoolBipsByAddress[address] = (bonusPoolBipsByAddress[address] || 0n) as bigint + bonusPoolBipsByAddress[address] += bipsValue + bonusPoolBipsByAddress[address] = + (bonusPoolBipsByAddress[address] as bigint) > maxBonusPoolBips + ? maxBonusPoolBips + : (bonusPoolBipsByAddress[address] as bigint) // cap at max bonus pool bips + } + } + } + + const hodlerShares = Object.values(weightedShares) + let totalAmount = 0n + let totalHodlerPoolAmount = 0n + let totalBonusPoolAmount = 0n + let totalFixedPoolAmount = 0n + + log.info( + { + maxBonusPoolBips, + }, + 'Calculated fixed & bonus pool amounts.' + ) + // log.debug({ hodlerShares, fixedPoolAmountsByAddress, bonusPoolBipsByAddress }) + if (log.isLevelEnabled('debug')) { + await Bun.write('dist/hodlerShares.json', JSON.stringify(hodlerShares, jsonBigint, 2)).catch( + (e) => { + log.error(e, 'Error writing hodlerShares.json') + } + ) + await Bun.write( + 'dist/fixedPoolAmountsByAddress.json', + JSON.stringify(fixedPoolAmountsByAddress, jsonBigint, 2) + ).catch((e) => { + log.error(e, 'Error writing fixedPoolAmountsByAddress.json') + }) + await Bun.write( + 'dist/bonusPoolBipsByAddress.json', + JSON.stringify(bonusPoolBipsByAddress, jsonBigint, 2) + ).catch((e) => { + log.error(e, 'Error writing bonusPoolBipsByAddress.json') + }) + } + const shares = hodlerShares + .map((share) => { + const userId = hodlerUserIdByAddress[share.address] + const bonusBips = bonusPoolBipsByAddress[share.address] || 0n + const hodlerPoolAmount = share.amount + const bonusPoolAmount = calculatePercentageWithBips(hodlerPoolAmount, bonusBips) + const fixedPoolAmount = fixedPoolAmountsByAddress[share.address] || 0n + const amount = hodlerPoolAmount + bonusPoolAmount + fixedPoolAmount + totalAmount += amount + totalHodlerPoolAmount += hodlerPoolAmount + totalBonusPoolAmount += bonusPoolAmount + totalFixedPoolAmount += fixedPoolAmount + + if (!userId) { + log.debug({ share }, 'Hodler not found for address. Skipping share.') + return null + } + + // log.debug( + // { + // address: share.address, + // balance: balancesByAddress[share.address], + // amount: amount, + // bonusBips, + // hodlerPoolAmount, + // bonusPoolAmount, + // fixedPoolAmount, + // }, + // 'Calculated share.' + // ) + + // @ts-expect-error supabase-js does not support bigint + return { + address: share.address, + distribution_id: distribution.id, + user_id: userId, + amount: amount.toString(), + bonus_pool_amount: bonusPoolAmount.toString(), + fixed_pool_amount: fixedPoolAmount.toString(), + hodler_pool_amount: hodlerPoolAmount.toString(), + } as Tables<'distribution_shares'> + }) + .filter(Boolean) as Tables<'distribution_shares'>[] + + log.info( + { + totalAmount, + totalHodlerPoolAmount, + hodlerPoolAvailableAmount, + totalBonusPoolAmount, + totalFixedPoolAmount, + fixedPoolAllocatedAmount, + fixedPoolAvailableAmount, + maxBonusPoolBips, + name: distribution.name, + shares: shares.length, + }, + 'Distribution totals' + ) + log.info(`Calculated ${shares.length} shares.`) + // log.debug({ shares }) + if (log.isLevelEnabled('debug')) { + await Bun.write('dist/shares.json', JSON.stringify(shares, jsonBigint, 2)).catch((e) => { + log.error(e, 'Error writing shares.json') + }) + } + + if (totalFixedPoolAmount > fixedPoolAvailableAmount) { + log.warn( + 'Fixed pool amount is greater than available amount. This is not a problem, but it means the fixed pool is exhausted.' + ) + } + + // ensure share amounts do not exceed the total distribution amount, ideally this should be done in the database + const totalShareAmounts = shares.reduce((acc, share) => acc + BigInt(share.amount), 0n) + if (totalShareAmounts > distAmt) { + throw new Error('Share amounts exceed total distribution amount') + } + + const { error } = await createDistributionShares(distribution.id, shares) + if (error) { + log.error({ error: error.message, code: error.code }, 'Error saving shares.') + throw error + } + } + + private async worker() { + this.log.info('Starting distributor...', { id: this.id }) + + while (this.running) { + try { + await this.calculateDistributions() + } catch (error) { + this.log.error(error, `Error processing block. ${(error as Error).message}`) + } + await sleep(60_000) // sleep for 1 minute + } + + this.log.info('Distributor stopped.') + } + + public async stop() { + this.log.info('Stopping distributor...') + this.running = false + return await this.workerPromise + } + + public async calculateDistribution(id: string) { + const { data: distribution, error } = await fetchDistribution(id) + if (error) { + this.log.error({ error: error.message, code: error.code }, 'Error fetching distribution.') + throw error + } + try { + return this._calculateDistributionShares(distribution) + } catch (error) { + this.log.error(error, 'Error calculating distribution.') + throw error + } + } + + public toJSON() { + return { + id: this.id, + running: this.running, + lastDistributionId: this.lastDistributionId, + } + } +} diff --git a/packages/snaplet/.snaplet/dataModel.json b/packages/snaplet/.snaplet/dataModel.json index 82febd6a1..e702d5be7 100644 --- a/packages/snaplet/.snaplet/dataModel.json +++ b/packages/snaplet/.snaplet/dataModel.json @@ -9637,4 +9637,4 @@ ] } } -} \ No newline at end of file +}