Last active
August 21, 2024 12:15
-
-
Save Yuripetusko/e8e1ccb899d18e6fedbed1eca7efaedc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { EntityClass, FindOneOptions, Store } from '@subsquid/typeorm-store'; | |
import type { FindOptionsRelations, FindOptionsWhere } from 'typeorm'; | |
import { In } from 'typeorm'; | |
/**
 * Splits `array` into consecutive chunks of at most `maxBatchSize` items.
 *
 * The input array is not modified; each chunk is a new array produced by
 * `slice`, and the chunks concatenated in order reproduce the input. An empty
 * input yields an empty result.
 *
 * @param array - Items to partition.
 * @param maxBatchSize - Upper bound on the length of each chunk. Must be a
 *   number greater than or equal to 1 (an integer is expected).
 * @returns The chunks, in input order.
 * @throws RangeError when `maxBatchSize` is NaN or smaller than 1. Without
 *   this check a step of 0 or less would never advance the loop index.
 */
export const splitIntoBatches = <T>(array: T[], maxBatchSize: number): T[][] => {
  // Written as a negated comparison so that NaN is rejected as well.
  if (!(maxBatchSize >= 1)) {
    throw new RangeError(`maxBatchSize must be a number >= 1, received ${maxBatchSize}`);
  }
  const result: T[][] = [];
  for (let start = 0; start < array.length; start += maxBatchSize) {
    result.push(array.slice(start, start + maxBatchSize));
  }
  return result;
};
/**
 * `Store` subclass that declares a public `em` member returning an
 * `EntityManager`. The class is abstract and has no other members; in this
 * file it only serves as the type behind `DbStore`.
 *
 * NOTE(review): the two `@ts-ignore` directives silence the compiler on the
 * class header and on the `em` declaration. Presumably `Store` already
 * declares `em` with narrower visibility, and `EntityManager` is not among the
 * imports visible at the top of this file. Confirm both before removing them.
 */
// @ts-ignore | |
abstract class StoreWithEntityManager extends Store { | |
// Function returning the entity manager; `EntitiesManager.remove` calls it for soft removal.
// @ts-ignore | |
public em: () => EntityManager; | |
} | |
/** Store type used by `EntitiesManager`: the `Store` API plus the `em()` accessor. */
export type DbStore = StoreWithEntityManager; | |
/** Identifier type of managed entities. */
type ID = string; | |
/** Minimal shape an entity must have to be managed by `EntitiesManager`: an `id` of type `ID`. */
interface EntityWithId { | |
id: ID; | |
} | |
/**
 * In-memory cache of entities of one type, backed by the database store.
 *
 * The method set suggests this flow for one batch: queue IDs with
 * `addPrefetchItemId`, load them in bulk with `prefetchEntities`, read and
 * mutate entities through `get` / `getOrCreate`, persist everything with
 * `saveAll`, and finally drop the cache with `resetAll`.
 */
export class EntitiesManager<Entity extends EntityWithId> {
  store: DbStore;
  entity: EntityClass<Entity>;
  /** Cached entities, keyed by the `id` they had when `add` was called. */
  entitiesMap: Map<string, Entity> = new Map();
  /** IDs queued for the next `prefetchEntities` call. */
  prefetchItemIdsList: string[] = [];

  /**
   * @param entity - Entity class passed to every store query.
   * @param store - Store used for lookups, saves and removals.
   */
  constructor(entity: EntityClass<Entity>, store: DbStore) {
    this.entity = entity;
    this.store = store;
  }

  /** Puts `entity` into the local cache, replacing any entry with the same ID. */
  add(entity: Entity): void {
    this.entitiesMap.set(entity.id, entity);
  }

  /**
   * Returns the entity with the given ID from the local cache, falling back to
   * the database. An entity loaded from the database is cached before it is
   * returned.
   *
   * `relations` only affects the database lookup: an entity that is already
   * cached is returned as is, whatever relations it was loaded with.
   *
   * @param id - Entity ID.
   * @param relations - Relations to load when the database is queried.
   * @returns The entity, or `null` when it is neither cached nor stored.
   * @throws Error when no store is set.
   */
  async get(id: ID, relations?: FindOptionsRelations<Entity>): Promise<Entity | null> {
    if (!this.store) {
      throw new Error('context is not defined');
    }
    const cached = this.entitiesMap.get(id);
    if (cached) {
      return cached;
    }
    const requestParams = { where: { id } } as FindOneOptions<Entity>;
    if (relations) {
      requestParams.relations = relations;
    }
    const loaded = (await this.store.findOne(this.entity, requestParams)) ?? null;
    if (loaded) {
      this.add(loaded);
    }
    return loaded;
  }

  /**
   * Same as `get`, but a missing entity is an error.
   *
   * @throws Error naming the entity class and ID when nothing is found.
   */
  async getOrThrow(id: ID, relations?: FindOptionsRelations<Entity>): Promise<Entity> {
    const entity = await this.get(id, relations);
    if (!entity) {
      throw new Error(`Entity: ${this.entity.name} with id ${id} expected to exist`);
    }
    return entity;
  }

  /**
   * Returns the entity with the given ID, creating and caching it when it is
   * neither cached nor stored. The created entity's `id` is overwritten with
   * `id`; it is not written to the database until `saveAll` runs.
   *
   * @param id - Entity ID.
   * @param creator - Factory invoked only when the entity does not exist.
   * @param relations - Relations to load when the database is queried.
   * @throws Error when no store is set.
   */
  async getOrCreate(
    id: ID,
    creator: () => Promise<Entity>,
    relations?: FindOptionsRelations<Entity>,
  ): Promise<Entity> {
    if (!this.store) throw new Error('context is not defined');
    let item = await this.get(id, relations);
    if (!item) {
      item = await creator();
      item.id = id;
      this.add(item);
    }
    return item;
  }

  /**
   * Drops the entity from the local cache and, when a row with that ID exists
   * in the database, removes it there as well.
   *
   * The database is queried once. The cached instance, when there is one, is
   * the object handed to the removal call; otherwise the freshly loaded row is
   * used.
   *
   * @param id - Entity ID.
   * @param soft - When `true`, `softRemove` of the entity manager is used
   *   instead of the store's `remove`.
   * @throws Error when no store is set.
   */
  async remove(id: ID, soft = false): Promise<void> {
    if (!this.store) {
      throw new Error('context is not defined');
    }
    const cached = this.entitiesMap.get(id);
    this.entitiesMap.delete(id);
    // Entities that were only ever cached have no row to delete.
    const persisted = await this.store.findOneBy(this.entity, {
      id: cached?.id ?? id,
    } as FindOptionsWhere<Entity>);
    if (!persisted) {
      return;
    }
    const item = cached ?? persisted;
    if (soft) {
      await this.store.em().softRemove(this.entity, item);
    } else {
      await this.store.remove(item);
    }
  }

  /**
   * Saves every cached entity in a single store call. Intended to run at the
   * end of the batch processing flow.
   *
   * @throws Error when no store is set.
   */
  async saveAll(): Promise<void> {
    if (!this.store) {
      throw new Error('context is not defined');
    }
    await this.store.save([...this.entitiesMap.values()]);
  }

  /** Clears the local cache. Nothing is written to the database. */
  async resetAll(): Promise<void> {
    this.entitiesMap.clear();
  }

  /** Queues one ID, or a list of IDs, for the next `prefetchEntities` call. */
  addPrefetchItemId(itemIdOrList: ID | ID[]): void {
    if (Array.isArray(itemIdOrList)) {
      this.prefetchItemIdsList.push(...itemIdOrList);
    } else {
      this.prefetchItemIdsList.push(itemIdOrList);
    }
  }

  /** Empties the queue of IDs waiting to be prefetched. */
  resetPrefetchItemIdsList(): void {
    this.prefetchItemIdsList = [];
  }

  /**
   * Loads the queued entities into the local cache and empties the queue.
   *
   * Queued IDs are de-duplicated and IDs that are already cached are skipped,
   * so no query is issued when nothing is missing. The remaining IDs are
   * fetched in batches of at most 1000.
   *
   * @param relations - Relations to load together with each entity.
   * @throws Error when no store is set.
   */
  async prefetchEntities(relations?: FindOptionsRelations<Entity>): Promise<void> {
    if (!this.store) throw new Error('context is not defined');
    if (!this.prefetchItemIdsList || this.prefetchItemIdsList.length === 0) {
      return;
    }
    const missingIds = [...new Set(this.prefetchItemIdsList)].filter(
      (id) => !this.entitiesMap.has(id),
    );
    for (const chunk of splitIntoBatches(missingIds, 1000)) {
      const found = await this.store.find(this.entity, {
        where: {
          id: In(chunk),
        } as FindOptionsWhere<Entity>,
        ...(relations ? { relations } : {}),
      });
      for (const entity of found) {
        this.add(entity);
      }
    }
    this.resetPrefetchItemIdsList();
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Argument table of an ABI event: each argument name maps to a `Codec`,
 * optionally carrying an `indexed` flag.
 */
type EventArgs = { | |
[key: string]: Codec<any> & { indexed?: boolean }; | |
}; | |
/** Logger type used by the processor; an alias of `SquidLogger`. */
export type Logger = SquidLogger; | |
/** Module-level logger, created with the name `'sqd:processor'`. */
export const logger: Logger = createLogger('sqd:processor'); | |
/** Context handed to processing functions: a logger and the database store. */
export type FuncContext = { | |
log: Logger; | |
store: DbStore; | |
}; | |
/**
 * Function type taking an optional (possibly `null`) IPFS URL and returning a
 * `containsCid` flag together with a `cid` that may be `undefined`.
 */
export type ContainsCid = (ipfsUrl?: string | null) => { | |
containsCid: boolean; | |
cid: string | undefined; | |
}; | |
/**
 * Async function type taking an optional metadata URI and an optional IPFS
 * gateway. Resolves to the fetched `Metadata` with an optional `provider`
 * string, or to `null`.
 */
export type FetchIpfsNftData = ( | |
metadataUri?: string | null, | |
ipfsGateway?: string, | |
) => Promise<{ data: Metadata; provider?: string } | null>; | |
/** Bundle of the two IPFS helpers that is passed through the processing functions. */
export type IpfsUtils = { | |
containsCID: ContainsCid; | |
fetchIpfsNftData: FetchIpfsNftData; | |
}; | |
/** `QuestConfigured` topics of the current and of the old quests-helper ABI. */
export const questsHelperContractTopics = [ | |
questsHelperAbi.events.QuestConfigured.topic, | |
questsHelperAbiOld.events.QuestConfigured.topic, | |
]; | |
/** Topics taken from the `skybreachLands` ABI: only `CustomAssetSet`. */
export const landSaleContractTopics = [skybreachLands.events.CustomAssetSet.topic]; | |
/**
 * Topics taken from the `skybreachAbi` ABI. Three entries are commented out
 * (`SpentResources`, `StartedBuilding`, `StartedGatheringResources`) and are
 * therefore not part of the list.
 */
export const skybreachContractTopics = [ | |
skybreachAbi.events.FinishedLandSurvey.topic, | |
skybreachAbi.events.StakedNft.topic, | |
skybreachAbi.events.FinishedSettlementSurvey.topic, | |
skybreachAbi.events.MovingNft.topic, | |
skybreachAbi.events.StartedSurvey.topic, | |
skybreachAbi.events.UnstakedNft.topic, | |
skybreachAbi.events.StartedQuest.topic, | |
skybreachAbi.events.FinishedQuest.topic, | |
skybreachAbi.events.BreachDungeonFinished.topic, | |
// skybreachAbi.events.SpentResources.topic, | |
// skybreachAbi.events.StartedBuilding.topic, | |
// skybreachAbi.events.StartedGatheringResources.topic, | |
skybreachAbi.events.AbandonedQuest.topic, | |
skybreachAbi.events.ActiveCommonLandSet.topic, | |
]; | |
/** Topics taken from the `rmrkNftAbi` ABI: only `Transfer`. */
export const nftContractTopics = [rmrkNftAbi.events.Transfer.topic]; | |
/** Concatenation of the four topic lists above, in the order listed here. */
export const allSupportedTopics = [ | |
...landSaleContractTopics, | |
...questsHelperContractTopics, | |
...skybreachContractTopics, | |
...nftContractTopics, | |
]; | |
/**
 * Processes one batch of blocks from start to finish.
 *
 * Steps, in order:
 * 1. keep only the blocks `filterRelevantBlocks` lets through and register a
 *    DB block entity for each of them in the cache;
 * 2. hand the logs of those blocks to the NFT, land-deed, quests-helper and
 *    Skybreach log processors, one after another;
 * 3. save the cached state, push the collected events, then reset the cache.
 *
 * @param blocks - Blocks of the current batch.
 * @param ctx - Logger and store.
 * @param batchState - Cached entities and event facade shared by the batch.
 * @param ipfsUtils - IPFS helpers forwarded to the log processors.
 */
export const runProcessor = async (
  blocks: BlockData<Fields>[],
  ctx: FuncContext,
  batchState: BatchState,
  ipfsUtils: IpfsUtils,
) => {
  const relevantBlocks = filterRelevantBlocks(blocks);

  // Block entities go into the cache first; the log processors look them up.
  relevantBlocks.forEach((block) => {
    batchState.cachedState.blocks.add(createDbBlock(block));
  });

  const relevantLogs = relevantBlocks.flatMap((block) => block.logs);

  // Each processor receives the full log list and runs strictly in sequence.
  await processNftLogs(relevantLogs, ctx, batchState, ipfsUtils);
  await processLandDeedLogs(relevantLogs, ctx, batchState, ipfsUtils);
  await processQuestsHelperLogs(relevantLogs, ctx, batchState, ipfsUtils);
  await processSkybreachLogs(relevantLogs, ctx, batchState, ipfsUtils);

  await saveAll(batchState.cachedState);
  await batchState.eventFacade.pushEvents();
  await resetAll(batchState.cachedState);
};
/**
 * Builds the context object passed to the handler of a single log.
 *
 * The returned `eventFacade.addEvent` stamps every event with data taken from
 * the log before forwarding it to the batch-wide event facade: the log's ID
 * becomes the event ID, and the block entity, transaction hash and
 * transaction sender are attached.
 *
 * @param log - Log being handled.
 * @param ctx - Logger and store.
 * @param batchState - Batch state whose event facade receives the events.
 * @param ipfsUtils - IPFS helpers made available to the handler.
 * @param blockEntity - DB block entity the log belongs to.
 * @returns The handler context for this log.
 */
export const getLogHandlerContext = (
  log: Log,
  ctx: FuncContext,
  batchState: BatchState,
  ipfsUtils: IpfsUtils,
  blockEntity: Block,
) => {
  const handlerContext: HandlerContext = {
    dbBlock: blockEntity,
    ctx,
    batchState,
    ipfsUtils,
    eventFacade: {
      addEvent(event: Event) {
        // Read lazily, as before, but fail with a message that names the log
        // instead of an anonymous "cannot read properties of undefined", and
        // do so before the event has been touched.
        const { transaction } = log;
        if (!transaction) {
          throw new TypeError(
            `Log ${log.id} has no transaction attached; cannot set transactionHash/from on the event`,
          );
        }
        event.id = log.id;
        event.block = blockEntity;
        event.transactionHash = transaction.hash;
        event.from = transaction.from;
        batchState.eventFacade.addEvent(event);
      },
    },
  };
  return handlerContext;
};
/**
 * Context a log handler receives alongside the log: the logger/store pair,
 * the DB block entity, the batch state, the IPFS helpers, and an event facade
 * narrowed to its `addEvent` method.
 */
export type HandlerContext = { | |
ctx: FuncContext; | |
dbBlock: Block; | |
batchState: BatchState; | |
ipfsUtils: IpfsUtils; | |
eventFacade: Pick<EventFacade, 'addEvent'>; | |
}; | |
/** Handler of a raw log: receives the log and its handler context. */
export type Handler = (event: Log, handlerCtx: HandlerContext) => Promise<any>; | |
/**
 * Wraps a processor of decoded events into a handler of raw logs.
 *
 * The handler decodes the log with `event.decode` and calls `processor` with
 * the decoded value, the raw log and the handler context, returning whatever
 * promise the processor returns.
 *
 * @param event - ABI event providing `topic` and `decode`.
 * @param processor - Function that handles the decoded event.
 * @returns A `[topic, handler]` pair, usable as an `Object.fromEntries` entry.
 */
export function generateHandlerFromProcessor<
  E extends EventArgs,
  T extends { [key: string]: AbiEvent<E> },
  K extends keyof T,
>(
  event: T[K],
  processor: (
    decoded: ReturnType<typeof event.decode>,
    item: Log,
    ctx: HandlerContext,
  ) => Promise<any>,
): [string, Handler] {
  const handler: Handler = (item, ctx) => {
    const decoded = event.decode(item);
    return processor(decoded, item, ctx);
  };
  return [event.topic, handler];
}
/**
 * Function that handles an already decoded event of type `T`. It receives the
 * decoded value, the original log and the handler context.
 */
export type Processor<T> = ( | |
decodedEvent: T, | |
originalEvent: Log, | |
handlerCtx: HandlerContext, | |
) => Promise<unknown>; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Log } from '../../main'; | |
import { | |
FuncContext, | |
generateHandlerFromProcessor, | |
getLogHandlerContext, | |
Handler, | |
IpfsUtils, | |
Processor, | |
questsHelperContractTopics, | |
} from '../../processor'; | |
import { BatchState } from '../../batch-state'; | |
import * as questsHelperAbi from '../../abi/QuestsHelperAbi'; | |
import * as questsHelperAbiOld from '../../abi/QuestsHelperAbi_old'; | |
import { questsHelperPrefetch } from './quests-helper-prefetchers'; | |
import { | |
Erc20, | |
Event, | |
EventType, | |
LandResources, | |
NftByAsset, | |
Plot, | |
Quest, | |
QuestConfiguredPayload, | |
QuestInputs, | |
QuestLimits, | |
QuestRewards, | |
Settlement, | |
} from '../../model'; | |
import { | |
getQuestId, | |
getTokenDatabaseId, | |
safeBigintIsoTimestampToJsTimestamp, | |
} from '../../utils/helpers'; | |
import { config } from '../../config'; | |
import { LAND_AS_SETTLEMENT_OFFSET } from '../../constants'; | |
/** The `QuestConfigured` event of the current and of the old quests-helper ABI. */
export const questsHelperEvents = [ | |
questsHelperAbi.events.QuestConfigured, | |
questsHelperAbiOld.events.QuestConfigured, | |
]; | |
/** Union of the element types of `questsHelperEvents`. */
export type QuestsHelperEvent = (typeof questsHelperEvents)[number]; | |
/**
 * Handles `QuestConfigured` as decoded with the current quests-helper ABI.
 *
 * Resolves the settlement, the plot and the quest the event refers to
 * (creating missing ones in the cache), copies the decoded configuration onto
 * the quest, and records a `QuestConfigured` event whose payload carries the
 * same configuration.
 *
 * A plot is only created when `landId` is not `0n`; for `landId === 0n` the
 * quest and the event keep whatever the plot lookup returned.
 *
 * @param decodedEvent - Decoded event: `config`, `seed` and `difficulty`.
 * @param originalEvent - Raw log; not used by this processor.
 * @param ctx - Handler context: cached entities, DB block and event facade.
 */
export const processQuestConfigured: Processor<
  ReturnType<(typeof questsHelperAbi.events)['QuestConfigured']['decode']>
> = async (decodedEvent, originalEvent, ctx) => {
  const { seed, difficulty, config: questConfig } = decodedEvent;
  const { questId, landId, questType, duration } = questConfig;
  const cache = ctx.batchState.cachedState;

  // Settlement: token ID is the land ID shifted by LAND_AS_SETTLEMENT_OFFSET.
  const settlementId = getTokenDatabaseId(
    config.SETTLEMENT_CONTRACT_ADDRESS,
    LAND_AS_SETTLEMENT_OFFSET + landId,
  );
  const settlement = await cache.settlements.getOrCreate(settlementId, async () => {
    return new Settlement({
      id: settlementId,
      metadataUri: '',
      owner: config.SKYBREACH_CONTRACT_ADDRESSES,
    });
  });

  // Plot: looked up by land ID, created only for a non-zero land ID.
  const plotId = getTokenDatabaseId(config.LAND_SALE_CONTRACT_ADDRESSES, landId);
  let plot = await cache.plots.get(plotId);
  if (!plot && landId !== 0n) {
    plot = new Plot({ id: plotId });
    cache.plots.add(plot);
  }

  // Wrap the raw decoded structs into their model classes.
  const rawInputs = questConfig.inputs;
  const questInputs = new QuestInputs({
    rmrk: rawInputs.rmrk,
    sila: rawInputs.sila,
    otherResources: rawInputs.otherResources.map((token) => new Erc20(token)),
    skybreachResources: new LandResources(rawInputs.skybreachResources),
  });

  const rawFirstTime = questConfig.firstTimeRewards;
  const firstTimeRewards = new QuestRewards({
    ...rawFirstTime,
    nfts: rawFirstTime.nfts.map((reward) => new NftByAsset(reward)),
    otherResources: rawFirstTime.otherResources.map((token) => new Erc20(token)),
    skybreachResources: new LandResources(rawFirstTime.skybreachResources),
  });

  const rawRecurrent = questConfig.recurrentRewards;
  const recurrentRewards = new QuestRewards({
    ...rawRecurrent,
    nfts: rawRecurrent.nfts.map((reward) => new NftByAsset(reward)),
    otherResources: rawRecurrent.otherResources.map((token) => new Erc20(token)),
    skybreachResources: new LandResources(rawRecurrent.skybreachResources),
  });

  // A minStartTime that is not positive falls back to the block timestamp.
  const rawLimits = questConfig.limits;
  const questLimits = new QuestLimits({
    ...rawLimits,
    minStartTime:
      rawLimits.minStartTime > 0n
        ? safeBigintIsoTimestampToJsTimestamp(rawLimits.minStartTime)
        : ctx.dbBlock.timestamp,
    maxStartTime: safeBigintIsoTimestampToJsTimestamp(rawLimits.maxStartTime),
  });

  const questDbId = getQuestId(questId);
  const quest = await cache.quests.getOrCreate(
    questDbId,
    async () => new Quest({ id: questDbId }),
  );
  quest.seed = seed;
  quest.duration = duration;
  quest.questType = questType;
  quest.inputs = questInputs;
  quest.firstTimeRewards = firstTimeRewards;
  quest.recurrentRewards = recurrentRewards;
  quest.limits = questLimits;
  quest.plot = plot;
  quest.settlement = settlement;
  quest.difficulty = difficulty;
  cache.quests.add(quest);

  const payload = new QuestConfiguredPayload({
    ...questConfig,
    inputs: questInputs,
    firstTimeRewards,
    recurrentRewards,
    limits: questLimits,
    seed,
    difficulty,
  });
  ctx.eventFacade.addEvent(
    new Event({
      eventType: EventType.QuestConfigured,
      plot,
      quest,
      payload,
    }),
  );
};
/**
 * Handles `QuestConfigured` as decoded with the old quests-helper ABI.
 *
 * Works like the processor for the current ABI, except that no difficulty is
 * read from the event: the quest and the event payload both get the fixed
 * difficulty `4`.
 *
 * Resolves the settlement, the plot and the quest the event refers to
 * (creating missing ones in the cache), copies the decoded configuration onto
 * the quest, and records a `QuestConfigured` event. A plot is only created
 * when `landId` is not `0n`.
 *
 * @param decodedEvent - Decoded event: `config` and `seed`.
 * @param originalEvent - Raw log; not used by this processor.
 * @param ctx - Handler context: cached entities, DB block and event facade.
 */
export const processQuestConfiguredOld: Processor<
  ReturnType<(typeof questsHelperAbiOld.events)['QuestConfigured']['decode']>
> = async (decodedEvent, originalEvent, ctx) => {
  const { seed, config: questConfig } = decodedEvent;
  const { questId, landId, questType, duration } = questConfig;
  const cache = ctx.batchState.cachedState;

  // Settlement: token ID is the land ID shifted by LAND_AS_SETTLEMENT_OFFSET.
  const settlementId = getTokenDatabaseId(
    config.SETTLEMENT_CONTRACT_ADDRESS,
    LAND_AS_SETTLEMENT_OFFSET + landId,
  );
  const settlement = await cache.settlements.getOrCreate(settlementId, async () => {
    return new Settlement({
      id: settlementId,
      metadataUri: '',
      owner: config.SKYBREACH_CONTRACT_ADDRESSES,
    });
  });

  // Plot: looked up by land ID, created only for a non-zero land ID.
  const plotId = getTokenDatabaseId(config.LAND_SALE_CONTRACT_ADDRESSES, landId);
  let plot = await cache.plots.get(plotId);
  if (!plot && landId !== 0n) {
    plot = new Plot({ id: plotId });
    cache.plots.add(plot);
  }

  // Wrap the raw decoded structs into their model classes.
  const rawInputs = questConfig.inputs;
  const questInputs = new QuestInputs({
    rmrk: rawInputs.rmrk,
    sila: rawInputs.sila,
    otherResources: rawInputs.otherResources.map((token) => new Erc20(token)),
    skybreachResources: new LandResources(rawInputs.skybreachResources),
  });

  const rawFirstTime = questConfig.firstTimeRewards;
  const firstTimeRewards = new QuestRewards({
    ...rawFirstTime,
    nfts: rawFirstTime.nfts.map((reward) => new NftByAsset(reward)),
    otherResources: rawFirstTime.otherResources.map((token) => new Erc20(token)),
    skybreachResources: new LandResources(rawFirstTime.skybreachResources),
  });

  const rawRecurrent = questConfig.recurrentRewards;
  const recurrentRewards = new QuestRewards({
    ...rawRecurrent,
    nfts: rawRecurrent.nfts.map((reward) => new NftByAsset(reward)),
    otherResources: rawRecurrent.otherResources.map((token) => new Erc20(token)),
    skybreachResources: new LandResources(rawRecurrent.skybreachResources),
  });

  // A minStartTime that is not positive falls back to the block timestamp.
  const rawLimits = questConfig.limits;
  const questLimits = new QuestLimits({
    ...rawLimits,
    minStartTime:
      rawLimits.minStartTime > 0n
        ? safeBigintIsoTimestampToJsTimestamp(rawLimits.minStartTime)
        : ctx.dbBlock.timestamp,
    maxStartTime: safeBigintIsoTimestampToJsTimestamp(rawLimits.maxStartTime),
  });

  const questDbId = getQuestId(questId);
  const quest = await cache.quests.getOrCreate(
    questDbId,
    async () => new Quest({ id: questDbId }),
  );
  quest.seed = seed;
  quest.duration = duration;
  quest.questType = questType;
  quest.inputs = questInputs;
  quest.firstTimeRewards = firstTimeRewards;
  quest.recurrentRewards = recurrentRewards;
  quest.limits = questLimits;
  quest.plot = plot;
  quest.settlement = settlement;
  // Fixed value: this processor takes no difficulty from the event.
  quest.difficulty = 4;
  cache.quests.add(quest);

  const payload = new QuestConfiguredPayload({
    ...questConfig,
    inputs: questInputs,
    firstTimeRewards,
    recurrentRewards,
    limits: questLimits,
    seed,
    difficulty: 4,
  });
  ctx.eventFacade.addEvent(
    new Event({
      eventType: EventType.QuestConfigured,
      plot,
      quest,
      payload,
    }),
  );
};
/**
 * Pairs each quests-helper ABI event with the processor that handles it. The
 * processor slot is typed `any`, so the pairing is not checked by the compiler.
 */
export const questsHelperHandlerProcessor: [QuestsHelperEvent, any][] = [ | |
[questsHelperAbi.events.QuestConfigured, processQuestConfigured], | |
[questsHelperAbiOld.events.QuestConfigured, processQuestConfiguredOld], | |
]; | |
/**
 * Lookup from event topic to log handler, built by turning every pair above
 * into a `[topic, handler]` entry via `generateHandlerFromProcessor`.
 *
 * NOTE(review): `Object.fromEntries` keeps the last entry for a repeated key,
 * so the two `QuestConfigured` events must have different topics for both
 * handlers to be reachable. Presumably they do; confirm against the two ABIs.
 */
export const questsHelperHandlers: { [key: string]: Handler } = Object.fromEntries( | |
questsHelperHandlerProcessor.map((args) => generateHandlerFromProcessor(...args)), | |
); | |
/**
 * Dispatches a single log to its quests-helper handler.
 *
 * The log's first topic selects the handler. Logs whose first topic is not
 * listed in `questsHelperContractTopics` are ignored, but the block lookup
 * still runs for them and throws when the block is not cached.
 *
 * @param log - Log to handle.
 * @param ctx - Logger and store.
 * @param batchState - Cached entities and event facade of the batch.
 * @param ipfsUtils - IPFS helpers passed on to the handler context.
 */
export const processQuestsHelperLog = async (
  log: Log,
  ctx: FuncContext,
  batchState: BatchState,
  ipfsUtils: IpfsUtils,
) => {
  const [topic] = log.topics;
  const dbBlock = await batchState.cachedState.blocks.getOrThrow(`${log.block.id}`);
  const handlerCtx = getLogHandlerContext(log, ctx, batchState, ipfsUtils, dbBlock);

  if (questsHelperContractTopics.includes(topic)) {
    await questsHelperHandlers[topic](log, handlerCtx);
  }
};
/**
 * Runs the prefetch step for a single log.
 *
 * When the log's first topic is one of `questsHelperContractTopics`, the log
 * is passed to `questsHelperPrefetch` together with its handler context. Logs
 * with any other first topic are ignored.
 *
 * `questsHelperContractTopics` is an array of topic strings, so membership is
 * tested with `includes`. Indexing the array with the topic is always
 * `undefined` and would skip the prefetch for every log.
 *
 * @param log - Log to inspect.
 * @param ctx - Logger and store.
 * @param batchState - Cached entities and event facade of the batch.
 * @param ipfsUtils - IPFS helpers passed on to the handler context.
 * @throws Whatever `blocks.getOrThrow` throws when the log's block is not
 *   available; this lookup runs for every log, matching or not.
 */
async function processLogPrefetch(
  log: Log,
  ctx: FuncContext,
  batchState: BatchState,
  ipfsUtils: IpfsUtils,
) {
  const blockEntity = await batchState.cachedState.blocks.getOrThrow(`${log.block.id}`);
  const topic = log.topics[0];
  const handlerContext = getLogHandlerContext(log, ctx, batchState, ipfsUtils, blockEntity);
  if (questsHelperContractTopics.includes(topic)) {
    await questsHelperPrefetch(topic, log, handlerContext);
  }
}
/**
 * Processes quests-helper logs in two passes over the same list.
 *
 * Pass 1 runs the prefetch step for every log. After it, the queued plots are
 * loaded into the cache with one `prefetchEntities` call. Pass 2 then
 * dispatches every log to its handler. Logs are handled strictly one at a
 * time and in list order in both passes.
 *
 * @param logs - Logs of the current batch.
 * @param ctx - Logger and store.
 * @param batchState - Cached entities and event facade of the batch.
 * @param ipfsUtils - IPFS helpers passed on to the handlers.
 */
export const processQuestsHelperLogs = async (
  logs: Log[],
  ctx: FuncContext,
  batchState: BatchState,
  ipfsUtils: IpfsUtils,
) => {
  // Pass 1: prefetch step for each log.
  for (let index = 0; index < logs.length; index += 1) {
    await processLogPrefetch(logs[index], ctx, batchState, ipfsUtils);
  }

  await batchState.cachedState.plots.prefetchEntities();

  // Pass 2: run the handlers.
  for (let index = 0; index < logs.length; index += 1) {
    await processQuestsHelperLog(logs[index], ctx, batchState, ipfsUtils);
  }
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment