Skip to content

Commit

Permalink
Merge pull request #2201 from hirosystems/develop
Browse files Browse the repository at this point in the history
release to master
  • Loading branch information
rafaelcr authored Jan 20, 2025
2 parents f00f286 + 26c53dc commit aff882c
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 134 deletions.
87 changes: 87 additions & 0 deletions migrations/1734712921681_drop-nft-custody-unanchored.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/* eslint-disable camelcase */

exports.shorthands = undefined;

exports.up = pgm => {
pgm.dropTable('nft_custody_unanchored');
};

exports.down = pgm => {
pgm.createTable('nft_custody_unanchored', {
asset_identifier: {
type: 'string',
notNull: true,
},
value: {
type: 'bytea',
notNull: true,
},
recipient: {
type: 'text',
},
block_height: {
type: 'integer',
notNull: true,
},
index_block_hash: {
type: 'bytea',
notNull: true,
},
parent_index_block_hash: {
type: 'bytea',
notNull: true,
},
microblock_hash: {
type: 'bytea',
notNull: true,
},
microblock_sequence: {
type: 'integer',
notNull: true,
},
tx_id: {
type: 'bytea',
notNull: true,
},
tx_index: {
type: 'smallint',
notNull: true,
},
event_index: {
type: 'integer',
notNull: true,
},
});
pgm.createConstraint('nft_custody_unanchored', 'nft_custody_unanchored_unique', 'UNIQUE(asset_identifier, value)');
pgm.createIndex('nft_custody_unanchored', ['recipient', 'asset_identifier']);
pgm.createIndex('nft_custody_unanchored', 'value');
pgm.createIndex('nft_custody_unanchored', [
{ name: 'block_height', sort: 'DESC' },
{ name: 'microblock_sequence', sort: 'DESC' },
{ name: 'tx_index', sort: 'DESC' },
{ name: 'event_index', sort: 'DESC' }
]);
pgm.sql(`
INSERT INTO nft_custody_unanchored (asset_identifier, value, recipient, tx_id, block_height, index_block_hash, parent_index_block_hash, microblock_hash, microblock_sequence, tx_index, event_index) (
SELECT
DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient, tx_id, nft.block_height,
nft.index_block_hash, nft.parent_index_block_hash, nft.microblock_hash, nft.microblock_sequence, nft.tx_index, nft.event_index
FROM
nft_events AS nft
INNER JOIN
txs USING (tx_id)
WHERE
txs.canonical = true
AND txs.microblock_canonical = true
AND nft.canonical = true
AND nft.microblock_canonical = true
ORDER BY
asset_identifier,
value,
txs.block_height DESC,
txs.microblock_sequence DESC,
txs.tx_index DESC,
nft.event_index DESC
)
`);
};
3 changes: 0 additions & 3 deletions src/api/routes/tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export const TokenRoutes: FastifyPluginAsync<
),
limit: LimitParam(ResourceType.Token, 'Limit', 'max number of tokens to fetch'),
offset: OffsetParam('Offset', 'index of first tokens to fetch'),
unanchored: UnanchoredParamSchema,
tx_metadata: Type.Boolean({
default: false,
description:
Expand Down Expand Up @@ -95,15 +94,13 @@ export const TokenRoutes: FastifyPluginAsync<

const limit = getPagingQueryLimit(ResourceType.Token, req.query.limit);
const offset = parsePagingQueryInput(req.query.offset ?? 0);
const includeUnanchored = req.query.unanchored ?? false;
const includeTxMetadata = req.query.tx_metadata ?? false;

const { results, total } = await fastify.db.getNftHoldings({
principal: principal,
assetIdentifiers: assetIdentifiers,
offset: offset,
limit: limit,
includeUnanchored: includeUnanchored,
includeTxMetadata: includeTxMetadata,
});
const parsedResults = results.map(result => {
Expand Down
15 changes: 10 additions & 5 deletions src/datastore/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3356,16 +3356,12 @@ export class PgStore extends BasePgStore {
assetIdentifiers?: string[];
limit: number;
offset: number;
includeUnanchored: boolean;
includeTxMetadata: boolean;
}): Promise<{ results: NftHoldingInfoWithTxMetadata[]; total: number }> {
const queryArgs: (string | string[] | number)[] = [args.principal, args.limit, args.offset];
if (args.assetIdentifiers) {
queryArgs.push(args.assetIdentifiers);
}
const nftCustody = args.includeUnanchored
? this.sql(`nft_custody_unanchored`)
: this.sql(`nft_custody`);
const assetIdFilter =
args.assetIdentifiers && args.assetIdentifiers.length > 0
? this.sql`AND nft.asset_identifier IN ${this.sql(args.assetIdentifiers)}`
Expand All @@ -3375,7 +3371,7 @@ export class PgStore extends BasePgStore {
>`
WITH nft AS (
SELECT *, (COUNT(*) OVER())::INTEGER AS count
FROM ${nftCustody} AS nft
FROM nft_custody AS nft
WHERE nft.recipient = ${args.principal}
${assetIdFilter}
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC
Expand Down Expand Up @@ -4528,4 +4524,13 @@ export class PgStore extends BasePgStore {
`;
if (result.count) return result[0];
}

async getStacksBlockCountAtPreviousBurnBlock(): Promise<number> {
const result = await this.sql<{ count: string }[]>`
SELECT COUNT(*) AS count
FROM blocks
WHERE burn_block_height = (SELECT burn_block_height - 1 FROM chain_tip) AND canonical = TRUE
`;
return parseInt(result[0]?.count ?? '0');
}
}
124 changes: 57 additions & 67 deletions src/datastore/pg-write-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1416,10 +1416,9 @@ export class PgWriteStore extends PgStore {
INSERT INTO nft_events ${sql(nftEventInserts)}
`;
if (tx.canonical && tx.microblock_canonical) {
const table = microblock ? sql`nft_custody_unanchored` : sql`nft_custody`;
await sql`
INSERT INTO ${table} ${sql(Array.from(custodyInsertsMap.values()))}
ON CONFLICT ON CONSTRAINT ${table}_unique DO UPDATE SET
INSERT INTO nft_custody ${sql(Array.from(custodyInsertsMap.values()))}
ON CONFLICT ON CONSTRAINT nft_custody_unique DO UPDATE SET
tx_id = EXCLUDED.tx_id,
index_block_hash = EXCLUDED.index_block_hash,
parent_index_block_hash = EXCLUDED.parent_index_block_hash,
Expand All @@ -1431,22 +1430,22 @@ export class PgWriteStore extends PgStore {
block_height = EXCLUDED.block_height
WHERE
(
EXCLUDED.block_height > ${table}.block_height
EXCLUDED.block_height > nft_custody.block_height
)
OR (
EXCLUDED.block_height = ${table}.block_height
AND EXCLUDED.microblock_sequence > ${table}.microblock_sequence
EXCLUDED.block_height = nft_custody.block_height
AND EXCLUDED.microblock_sequence > nft_custody.microblock_sequence
)
OR (
EXCLUDED.block_height = ${table}.block_height
AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence
AND EXCLUDED.tx_index > ${table}.tx_index
EXCLUDED.block_height = nft_custody.block_height
AND EXCLUDED.microblock_sequence = nft_custody.microblock_sequence
AND EXCLUDED.tx_index > nft_custody.tx_index
)
OR (
EXCLUDED.block_height = ${table}.block_height
AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence
AND EXCLUDED.tx_index = ${table}.tx_index
AND EXCLUDED.event_index > ${table}.event_index
EXCLUDED.block_height = nft_custody.block_height
AND EXCLUDED.microblock_sequence = nft_custody.microblock_sequence
AND EXCLUDED.tx_index = nft_custody.tx_index
AND EXCLUDED.event_index > nft_custody.event_index
)
`;
}
Expand Down Expand Up @@ -1781,6 +1780,12 @@ export class PgWriteStore extends PgStore {
});
}

async updateBurnChainBlockHeight(args: { blockHeight: number }): Promise<void> {
await this.sql`
UPDATE chain_tip SET burn_block_height = GREATEST(${args.blockHeight}, burn_block_height)
`;
}

async insertSlotHoldersBatch(sql: PgSqlClient, slotHolders: DbRewardSlotHolder[]): Promise<void> {
const slotValues: RewardSlotHolderInsertValues[] = slotHolders.map(slot => ({
canonical: true,
Expand Down Expand Up @@ -2515,10 +2520,6 @@ export class PgWriteStore extends PgStore {
AND (index_block_hash = ${args.indexBlockHash} OR index_block_hash = '\\x'::bytea)
AND tx_id IN ${sql(txIds)}
`;
await this.updateNftCustodyFromReOrg(sql, {
index_block_hash: args.indexBlockHash,
microblocks: args.microblocks,
});
}

// Update unanchored tx count in `chain_tip` table
Expand All @@ -2539,54 +2540,46 @@ export class PgWriteStore extends PgStore {
sql: PgSqlClient,
args: {
index_block_hash: string;
microblocks: string[];
}
): Promise<void> {
for (const table of [sql`nft_custody`, sql`nft_custody_unanchored`]) {
await sql`
INSERT INTO ${table}
(asset_identifier, value, tx_id, index_block_hash, parent_index_block_hash, microblock_hash,
microblock_sequence, recipient, event_index, tx_index, block_height)
(
SELECT
DISTINCT ON(asset_identifier, value) asset_identifier, value, tx_id, txs.index_block_hash,
txs.parent_index_block_hash, txs.microblock_hash, txs.microblock_sequence, recipient,
nft.event_index, txs.tx_index, txs.block_height
FROM
nft_events AS nft
INNER JOIN
txs USING (tx_id)
WHERE
txs.canonical = true
AND txs.microblock_canonical = true
AND nft.canonical = true
AND nft.microblock_canonical = true
AND nft.index_block_hash = ${args.index_block_hash}
${
args.microblocks.length > 0
? sql`AND nft.microblock_hash IN ${sql(args.microblocks)}`
: sql``
}
ORDER BY
asset_identifier,
value,
txs.block_height DESC,
txs.microblock_sequence DESC,
txs.tx_index DESC,
nft.event_index DESC
)
ON CONFLICT ON CONSTRAINT ${table}_unique DO UPDATE SET
tx_id = EXCLUDED.tx_id,
index_block_hash = EXCLUDED.index_block_hash,
parent_index_block_hash = EXCLUDED.parent_index_block_hash,
microblock_hash = EXCLUDED.microblock_hash,
microblock_sequence = EXCLUDED.microblock_sequence,
recipient = EXCLUDED.recipient,
event_index = EXCLUDED.event_index,
tx_index = EXCLUDED.tx_index,
block_height = EXCLUDED.block_height
`;
}
await sql`
INSERT INTO nft_custody
(asset_identifier, value, tx_id, index_block_hash, parent_index_block_hash, microblock_hash,
microblock_sequence, recipient, event_index, tx_index, block_height)
(
SELECT
DISTINCT ON(asset_identifier, value) asset_identifier, value, tx_id, txs.index_block_hash,
txs.parent_index_block_hash, txs.microblock_hash, txs.microblock_sequence, recipient,
nft.event_index, txs.tx_index, txs.block_height
FROM
nft_events AS nft
INNER JOIN
txs USING (tx_id)
WHERE
txs.canonical = true
AND txs.microblock_canonical = true
AND nft.canonical = true
AND nft.microblock_canonical = true
AND nft.index_block_hash = ${args.index_block_hash}
ORDER BY
asset_identifier,
value,
txs.block_height DESC,
txs.microblock_sequence DESC,
txs.tx_index DESC,
nft.event_index DESC
)
ON CONFLICT ON CONSTRAINT nft_custody_unique DO UPDATE SET
tx_id = EXCLUDED.tx_id,
index_block_hash = EXCLUDED.index_block_hash,
parent_index_block_hash = EXCLUDED.parent_index_block_hash,
microblock_hash = EXCLUDED.microblock_hash,
microblock_sequence = EXCLUDED.microblock_sequence,
recipient = EXCLUDED.recipient,
event_index = EXCLUDED.event_index,
tx_index = EXCLUDED.tx_index,
block_height = EXCLUDED.block_height
`;
}

/**
Expand Down Expand Up @@ -3050,10 +3043,7 @@ export class PgWriteStore extends PgStore {
updatedEntities.markedNonCanonical.nftEvents += nftResult.count;
}
if (nftResult.count)
await this.updateNftCustodyFromReOrg(sql, {
index_block_hash: indexBlockHash,
microblocks: [],
});
await this.updateNftCustodyFromReOrg(sql, { index_block_hash: indexBlockHash });
});
q.enqueue(async () => {
const pox2Result = await sql`
Expand Down
35 changes: 25 additions & 10 deletions src/event-stream/event-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ async function handleBurnBlockMessage(
burnchainBlockHeight: burnBlockMsg.burn_block_height,
slotHolders: slotHolders,
});
await db.updateBurnChainBlockHeight({ blockHeight: burnBlockMsg.burn_block_height });
}

async function handleMempoolTxsMessage(rawTxs: string[], db: PgWriteStore): Promise<void> {
Expand Down Expand Up @@ -631,18 +632,32 @@ interface EventMessageHandler {
handleNewAttachment(msg: CoreNodeAttachmentMessage[], db: PgWriteStore): Promise<void> | void;
}

function createMessageProcessorQueue(): EventMessageHandler {
function createMessageProcessorQueue(db: PgWriteStore): EventMessageHandler {
// Create a promise queue so that only one message is handled at a time.
const processorQueue = new PQueue({ concurrency: 1 });

let eventTimer: prom.Histogram<'event'> | undefined;
let metrics:
| {
eventTimer: prom.Histogram;
blocksInPreviousBurnBlock: prom.Gauge;
}
| undefined;
if (isProdEnv) {
eventTimer = new prom.Histogram({
name: 'stacks_event_ingestion_timers',
help: 'Event ingestion timers',
labelNames: ['event'],
buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes
});
metrics = {
eventTimer: new prom.Histogram({
name: 'stacks_event_ingestion_timers',
help: 'Event ingestion timers',
labelNames: ['event'],
buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes
}),
blocksInPreviousBurnBlock: new prom.Gauge({
name: 'stacks_blocks_in_previous_burn_block',
help: 'Number of Stacks blocks produced in the previous burn block',
async collect() {
this.set(await db.getStacksBlockCountAtPreviousBurnBlock());
},
}),
};
}

const observeEvent = async (event: string, fn: () => Promise<void>) => {
Expand All @@ -651,7 +666,7 @@ function createMessageProcessorQueue(): EventMessageHandler {
await fn();
} finally {
const elapsedMs = timer.getElapsed();
eventTimer?.observe({ event }, elapsedMs);
metrics?.eventTimer.observe({ event }, elapsedMs);
}
};

Expand Down Expand Up @@ -738,7 +753,7 @@ export async function startEventServer(opts: {
serverPort?: number;
}): Promise<EventStreamServer> {
const db = opts.datastore;
const messageHandler = opts.messageHandler ?? createMessageProcessorQueue();
const messageHandler = opts.messageHandler ?? createMessageProcessorQueue(db);

let eventHost = opts.serverHost ?? process.env['STACKS_CORE_EVENT_HOST'];
const eventPort = opts.serverPort ?? parseInt(process.env['STACKS_CORE_EVENT_PORT'] ?? '', 10);
Expand Down
Loading

0 comments on commit aff882c

Please sign in to comment.