Skip to content

Instantly share code, notes, and snippets.

@Yuripetusko
Last active August 21, 2024 12:15
Show Gist options
  • Save Yuripetusko/e8e1ccb899d18e6fedbed1eca7efaedc to your computer and use it in GitHub Desktop.
Save Yuripetusko/e8e1ccb899d18e6fedbed1eca7efaedc to your computer and use it in GitHub Desktop.
import { EntityClass, FindOneOptions, Store } from '@subsquid/typeorm-store';
import type { FindOptionsRelations, FindOptionsWhere } from 'typeorm';
import { In } from 'typeorm';
export const splitIntoBatches = <T>(array: T[], maxBatchSize: number): T[][] => {
const result: T[][] = [];
for (let i = 0; i < array.length; i += maxBatchSize) {
const chunk = array.slice(i, i + maxBatchSize);
result.push(chunk);
}
return result;
};
// @ts-ignore
abstract class StoreWithEntityManager extends Store {
// @ts-ignore
public em: () => EntityManager;
}
export type DbStore = StoreWithEntityManager;
type ID = string;
interface EntityWithId {
id: ID;
}
export class EntitiesManager<Entity extends EntityWithId> {
store: DbStore;
entity: EntityClass<Entity>;
entitiesMap: Map<string, Entity> = new Map();
prefetchItemIdsList: string[] = [];
constructor(entity: EntityClass<Entity>, store: DbStore) {
this.entity = entity;
this.store = store;
}
add(entity: Entity): void {
this.entitiesMap.set(entity.id, entity);
}
/**
* Get entity by ID either from local cache or DB, if it's not existing in
* local cache ("entitiesMap").
*
* @param id: string
* @param relations?: FindOptionsRelations<Entity>
*/
async get(id: ID, relations?: FindOptionsRelations<Entity>): Promise<Entity | null> {
if (!this.store) {
throw new Error('context is not defined');
}
let item = this.entitiesMap.get(id) || null;
if (!item) {
const requestParams = {
where: { id },
} as FindOneOptions<Entity>;
if (relations) {
requestParams.relations = relations;
}
item = (await this.store.findOne(this.entity, requestParams)) || null;
if (item) {
this.add(item);
}
}
return item;
}
async getOrThrow(id: ID, relations?: FindOptionsRelations<Entity>): Promise<Entity> {
const entity = await this.get(id, relations);
if (!entity) {
throw new Error(`Entity: ${this.entity.name} with id ${id} expected to exist`);
}
return entity;
}
async getOrCreate(
id: ID,
creator: () => Promise<Entity>,
relations?: FindOptionsRelations<Entity>,
): Promise<Entity> {
if (!this.store) throw new Error('context is not defined');
let item = await this.get(id, relations);
if (!item) {
item = await creator();
item.id = id;
this.add(item);
}
return item;
}
//TODO: optimize deletion
async remove(id: ID, soft = false): Promise<void> {
if (!this.store) {
throw new Error('context is not defined');
}
const item = await this.get(id);
if (item) {
this.entitiesMap.delete(id);
if (await this.store.findOneBy(this.entity, { id: item.id } as any)) {
if (soft) {
await this.store.em().softRemove(this.entity, item);
} else {
await this.store.remove(item);
}
}
}
}
/**
* Save all entities from local cache at once.
* This action should be evoked in the end of batch data processing flow.
*/
async saveAll(): Promise<void> {
if (!this.store) {
throw new Error('context is not defined');
}
const entitiesList = [...this.entitiesMap.values()];
await this.store.save(entitiesList);
}
/**
* Clears current cache
*/
async resetAll(): Promise<void> {
this.entitiesMap.clear();
}
/**
* Add entity ID to the list for prefetch process.
*/
addPrefetchItemId(itemIdOrList: ID | ID[]): void {
if (Array.isArray(itemIdOrList)) {
this.prefetchItemIdsList.push(...itemIdOrList);
} else {
this.prefetchItemIdsList.push(itemIdOrList);
}
}
/**
* Clear collected list of entity IDs for prefetch process.
*/
resetPrefetchItemIdsList(): void {
this.prefetchItemIdsList = [];
}
/**
* Prefetch entities which are collected in the beginning of the batch
* data processing flow.
*
* @param relations
*/
async prefetchEntities(relations?: FindOptionsRelations<Entity>): Promise<void> {
if (!this.store) throw new Error('context is not defined');
if (!this.prefetchItemIdsList || this.prefetchItemIdsList.length === 0) {
return;
}
for (const chunk of splitIntoBatches(this.prefetchItemIdsList, 1000)) {
const ids = chunk.map((cid) => cid);
const idsNotInCache = ids.filter((id) => !this.entitiesMap.has(id));
const chunkRes = await this.store.find(this.entity, {
where: {
id: In(idsNotInCache),
} as FindOptionsWhere<Entity>,
...(!!relations && { relations }),
});
for (const chunkResItem of chunkRes) {
this.add(chunkResItem);
}
}
this.resetPrefetchItemIdsList();
}
}
type EventArgs = {
[key: string]: Codec<any> & { indexed?: boolean };
};
export type Logger = SquidLogger;
export const logger: Logger = createLogger('sqd:processor');
export type FuncContext = {
log: Logger;
store: DbStore;
};
export type ContainsCid = (ipfsUrl?: string | null) => {
containsCid: boolean;
cid: string | undefined;
};
export type FetchIpfsNftData = (
metadataUri?: string | null,
ipfsGateway?: string,
) => Promise<{ data: Metadata; provider?: string } | null>;
export type IpfsUtils = {
containsCID: ContainsCid;
fetchIpfsNftData: FetchIpfsNftData;
};
export const questsHelperContractTopics = [
questsHelperAbi.events.QuestConfigured.topic,
questsHelperAbiOld.events.QuestConfigured.topic,
];
export const landSaleContractTopics = [skybreachLands.events.CustomAssetSet.topic];
export const skybreachContractTopics = [
skybreachAbi.events.FinishedLandSurvey.topic,
skybreachAbi.events.StakedNft.topic,
skybreachAbi.events.FinishedSettlementSurvey.topic,
skybreachAbi.events.MovingNft.topic,
skybreachAbi.events.StartedSurvey.topic,
skybreachAbi.events.UnstakedNft.topic,
skybreachAbi.events.StartedQuest.topic,
skybreachAbi.events.FinishedQuest.topic,
skybreachAbi.events.BreachDungeonFinished.topic,
// skybreachAbi.events.SpentResources.topic,
// skybreachAbi.events.StartedBuilding.topic,
// skybreachAbi.events.StartedGatheringResources.topic,
skybreachAbi.events.AbandonedQuest.topic,
skybreachAbi.events.ActiveCommonLandSet.topic,
];
export const nftContractTopics = [rmrkNftAbi.events.Transfer.topic];
export const allSupportedTopics = [
...landSaleContractTopics,
...questsHelperContractTopics,
...skybreachContractTopics,
...nftContractTopics,
];
export const runProcessor = async (
blocks: BlockData<Fields>[],
ctx: FuncContext,
batchState: BatchState,
ipfsUtils: IpfsUtils,
) => {
const filteredBlocks = filterRelevantBlocks(blocks);
for (const block of filteredBlocks) {
const blockEntity = createDbBlock(block);
batchState.cachedState.blocks.add(blockEntity);
}
const filteredLogs = filteredBlocks.flatMap((b) => b.logs);
await processNftLogs(filteredLogs, ctx, batchState, ipfsUtils);
await processLandDeedLogs(filteredLogs, ctx, batchState, ipfsUtils);
await processQuestsHelperLogs(filteredLogs, ctx, batchState, ipfsUtils);
await processSkybreachLogs(filteredLogs, ctx, batchState, ipfsUtils);
await saveAll(batchState.cachedState);
await batchState.eventFacade.pushEvents();
await resetAll(batchState.cachedState);
};
export const getLogHandlerContext = (
log: Log,
ctx: FuncContext,
batchState: BatchState,
ipfsUtils: IpfsUtils,
blockEntity: Block,
) => {
const handlerContext: HandlerContext = {
dbBlock: blockEntity,
ctx,
batchState,
ipfsUtils,
eventFacade: {
addEvent(event: Event) {
event.id = log.id;
event.block = blockEntity;
event.transactionHash = log.transaction.hash;
event.from = log.transaction.from;
batchState.eventFacade.addEvent(event);
},
},
};
return handlerContext;
};
export type HandlerContext = {
ctx: FuncContext;
dbBlock: Block;
batchState: BatchState;
ipfsUtils: IpfsUtils;
eventFacade: Pick<EventFacade, 'addEvent'>;
};
export type Handler = (event: Log, handlerCtx: HandlerContext) => Promise<any>;
export function generateHandlerFromProcessor<
E extends EventArgs,
T extends { [key: string]: AbiEvent<E> },
K extends keyof T,
>(
event: T[K],
processor: (
decoded: ReturnType<typeof event.decode>,
item: Log,
ctx: HandlerContext,
) => Promise<any>,
): [string, Handler] {
return [event.topic, (item, ctx): Promise<void> => processor(event.decode(item), item, ctx)];
}
export type Processor<T> = (
decodedEvent: T,
originalEvent: Log,
handlerCtx: HandlerContext,
) => Promise<unknown>;
import { Log } from '../../main';
import {
FuncContext,
generateHandlerFromProcessor,
getLogHandlerContext,
Handler,
IpfsUtils,
Processor,
questsHelperContractTopics,
} from '../../processor';
import { BatchState } from '../../batch-state';
import * as questsHelperAbi from '../../abi/QuestsHelperAbi';
import * as questsHelperAbiOld from '../../abi/QuestsHelperAbi_old';
import { questsHelperPrefetch } from './quests-helper-prefetchers';
import {
Erc20,
Event,
EventType,
LandResources,
NftByAsset,
Plot,
Quest,
QuestConfiguredPayload,
QuestInputs,
QuestLimits,
QuestRewards,
Settlement,
} from '../../model';
import {
getQuestId,
getTokenDatabaseId,
safeBigintIsoTimestampToJsTimestamp,
} from '../../utils/helpers';
import { config } from '../../config';
import { LAND_AS_SETTLEMENT_OFFSET } from '../../constants';
export const questsHelperEvents = [
questsHelperAbi.events.QuestConfigured,
questsHelperAbiOld.events.QuestConfigured,
];
export type QuestsHelperEvent = (typeof questsHelperEvents)[number];
export const processQuestConfigured: Processor<
ReturnType<(typeof questsHelperAbi.events)['QuestConfigured']['decode']>
> = async (decodedEvent, originalEvent, ctx) => {
const {
config: { questId, landId, questType, duration },
seed,
difficulty,
} = decodedEvent;
const settlementId = getTokenDatabaseId(
config.SETTLEMENT_CONTRACT_ADDRESS,
LAND_AS_SETTLEMENT_OFFSET + landId,
);
const settlement = await ctx.batchState.cachedState.settlements.getOrCreate(
settlementId,
async () =>
new Settlement({
id: settlementId,
metadataUri: '',
owner: config.SKYBREACH_CONTRACT_ADDRESSES,
}),
);
let plot = await ctx.batchState.cachedState.plots.get(
getTokenDatabaseId(config.LAND_SALE_CONTRACT_ADDRESSES, landId),
);
if (landId !== 0n && !plot) {
plot = new Plot({
id: getTokenDatabaseId(config.LAND_SALE_CONTRACT_ADDRESSES, landId),
});
ctx.batchState.cachedState.plots.add(plot);
}
const questInputs = new QuestInputs({
rmrk: decodedEvent.config.inputs.rmrk,
sila: decodedEvent.config.inputs.sila,
otherResources: decodedEvent.config.inputs.otherResources.map(
(resource) => new Erc20(resource),
),
skybreachResources: new LandResources(decodedEvent.config.inputs.skybreachResources),
});
const firstTimeRewards = new QuestRewards({
...decodedEvent.config.firstTimeRewards,
nfts: decodedEvent.config.firstTimeRewards.nfts.map((nft) => new NftByAsset(nft)),
otherResources: decodedEvent.config.firstTimeRewards.otherResources.map(
(resource) => new Erc20(resource),
),
skybreachResources: new LandResources(decodedEvent.config.firstTimeRewards.skybreachResources),
});
const recurrentRewards = new QuestRewards({
...decodedEvent.config.recurrentRewards,
nfts: decodedEvent.config.recurrentRewards.nfts.map((nft) => new NftByAsset(nft)),
otherResources: decodedEvent.config.recurrentRewards.otherResources.map(
(resource) => new Erc20(resource),
),
skybreachResources: new LandResources(decodedEvent.config.recurrentRewards.skybreachResources),
});
const questLimits = new QuestLimits({
...decodedEvent.config.limits,
minStartTime:
decodedEvent.config.limits.minStartTime > 0n
? safeBigintIsoTimestampToJsTimestamp(decodedEvent.config.limits.minStartTime)
: ctx.dbBlock.timestamp,
maxStartTime: safeBigintIsoTimestampToJsTimestamp(decodedEvent.config.limits.maxStartTime),
});
const quest = await ctx.batchState.cachedState.quests.getOrCreate(
getQuestId(questId),
async () =>
new Quest({
id: getQuestId(questId),
}),
);
quest.seed = seed;
quest.duration = duration;
quest.questType = questType;
quest.inputs = questInputs;
quest.firstTimeRewards = firstTimeRewards;
quest.recurrentRewards = recurrentRewards;
quest.limits = questLimits;
quest.plot = plot;
quest.settlement = settlement;
quest.difficulty = difficulty;
ctx.batchState.cachedState.quests.add(quest);
ctx.eventFacade.addEvent(
new Event({
eventType: EventType.QuestConfigured,
plot,
quest,
payload: new QuestConfiguredPayload({
...decodedEvent.config,
inputs: questInputs,
firstTimeRewards,
recurrentRewards,
limits: questLimits,
seed,
difficulty,
}),
}),
);
};
export const processQuestConfiguredOld: Processor<
ReturnType<(typeof questsHelperAbiOld.events)['QuestConfigured']['decode']>
> = async (decodedEvent, originalEvent, ctx) => {
const {
config: { questId, landId, questType, duration },
seed,
} = decodedEvent;
const settlementId = getTokenDatabaseId(
config.SETTLEMENT_CONTRACT_ADDRESS,
LAND_AS_SETTLEMENT_OFFSET + landId,
);
const settlement = await ctx.batchState.cachedState.settlements.getOrCreate(
settlementId,
async () =>
new Settlement({
id: settlementId,
metadataUri: '',
owner: config.SKYBREACH_CONTRACT_ADDRESSES,
}),
);
let plot = await ctx.batchState.cachedState.plots.get(
getTokenDatabaseId(config.LAND_SALE_CONTRACT_ADDRESSES, landId),
);
if (landId !== 0n && !plot) {
plot = new Plot({
id: getTokenDatabaseId(config.LAND_SALE_CONTRACT_ADDRESSES, landId),
});
ctx.batchState.cachedState.plots.add(plot);
}
const questInputs = new QuestInputs({
rmrk: decodedEvent.config.inputs.rmrk,
sila: decodedEvent.config.inputs.sila,
otherResources: decodedEvent.config.inputs.otherResources.map(
(resource) => new Erc20(resource),
),
skybreachResources: new LandResources(decodedEvent.config.inputs.skybreachResources),
});
const firstTimeRewards = new QuestRewards({
...decodedEvent.config.firstTimeRewards,
nfts: decodedEvent.config.firstTimeRewards.nfts.map((nft) => new NftByAsset(nft)),
otherResources: decodedEvent.config.firstTimeRewards.otherResources.map(
(resource) => new Erc20(resource),
),
skybreachResources: new LandResources(decodedEvent.config.firstTimeRewards.skybreachResources),
});
const recurrentRewards = new QuestRewards({
...decodedEvent.config.recurrentRewards,
nfts: decodedEvent.config.recurrentRewards.nfts.map((nft) => new NftByAsset(nft)),
otherResources: decodedEvent.config.recurrentRewards.otherResources.map(
(resource) => new Erc20(resource),
),
skybreachResources: new LandResources(decodedEvent.config.recurrentRewards.skybreachResources),
});
const questLimits = new QuestLimits({
...decodedEvent.config.limits,
minStartTime:
decodedEvent.config.limits.minStartTime > 0n
? safeBigintIsoTimestampToJsTimestamp(decodedEvent.config.limits.minStartTime)
: ctx.dbBlock.timestamp,
maxStartTime: safeBigintIsoTimestampToJsTimestamp(decodedEvent.config.limits.maxStartTime),
});
const quest = await ctx.batchState.cachedState.quests.getOrCreate(
getQuestId(questId),
async () =>
new Quest({
id: getQuestId(questId),
}),
);
quest.seed = seed;
quest.duration = duration;
quest.questType = questType;
quest.inputs = questInputs;
quest.firstTimeRewards = firstTimeRewards;
quest.recurrentRewards = recurrentRewards;
quest.limits = questLimits;
quest.plot = plot;
quest.settlement = settlement;
quest.difficulty = 4;
ctx.batchState.cachedState.quests.add(quest);
ctx.eventFacade.addEvent(
new Event({
eventType: EventType.QuestConfigured,
plot,
quest,
payload: new QuestConfiguredPayload({
...decodedEvent.config,
inputs: questInputs,
firstTimeRewards,
recurrentRewards,
limits: questLimits,
seed,
difficulty: 4,
}),
}),
);
};
export const questsHelperHandlerProcessor: [QuestsHelperEvent, any][] = [
[questsHelperAbi.events.QuestConfigured, processQuestConfigured],
[questsHelperAbiOld.events.QuestConfigured, processQuestConfiguredOld],
];
export const questsHelperHandlers: { [key: string]: Handler } = Object.fromEntries(
questsHelperHandlerProcessor.map((args) => generateHandlerFromProcessor(...args)),
);
export const processQuestsHelperLog = async (
log: Log,
ctx: FuncContext,
batchState: BatchState,
ipfsUtils: IpfsUtils,
) => {
const topic = log.topics[0];
const blockEntity = await batchState.cachedState.blocks.getOrThrow(`${log.block.id}`);
const handlerContext = getLogHandlerContext(log, ctx, batchState, ipfsUtils, blockEntity);
switch (true) {
case questsHelperContractTopics.includes(topic): {
await questsHelperHandlers[topic](log, handlerContext);
break;
}
}
};
async function processLogPrefetch(
log: Log,
ctx: FuncContext,
batchState: BatchState,
ipfsUtils: IpfsUtils,
) {
const blockEntity = await batchState.cachedState.blocks.getOrThrow(`${log.block.id}`);
const topic = log.topics[0];
const handlerContext = getLogHandlerContext(log, ctx, batchState, ipfsUtils, blockEntity);
switch (true) {
case !!questsHelperContractTopics[topic]: {
await questsHelperPrefetch(topic, log, handlerContext);
break;
}
}
}
export const processQuestsHelperLogs = async (
logs: Log[],
ctx: FuncContext,
batchState: BatchState,
ipfsUtils: IpfsUtils,
) => {
for (const log of logs) {
await processLogPrefetch(log, ctx, batchState, ipfsUtils);
}
await batchState.cachedState.plots.prefetchEntities();
for (const log of logs) {
await processQuestsHelperLog(log, ctx, batchState, ipfsUtils);
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment