Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer-agent: Improve batch action preparation #817

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 79 additions & 37 deletions packages/indexer-common/src/indexer-management/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
Action,
ActionFailure,
ActionType,
Allocation,
allocationIdProof,
AllocationResult,
AllocationStatus,
Expand All @@ -22,6 +23,7 @@ import {
IndexerManagementModels,
IndexingDecisionBasis,
IndexingRuleAttributes,
IndexingStatus,
isActionFailure,
isDeploymentWorthAllocatingTowards,
Network,
Expand All @@ -42,6 +44,12 @@ import {
import { BytesLike } from '@ethersproject/bytes'
import pMap from 'p-map'

export interface TransactionPreparationContext {
activeAllocations: Allocation[]
currentEpoch: BigNumber
indexingStatuses: IndexingStatus[]
}

export interface AllocateTransactionParams {
indexer: string
subgraphDeploymentID: BytesLike
Expand Down Expand Up @@ -228,15 +236,26 @@ export class AllocationManager {
}

async prepareTransactions(actions: Action[]): Promise<PopulateTransactionResult[]> {
const context: TransactionPreparationContext = {
activeAllocations: await this.network.networkMonitor.allocations(
AllocationStatus.ACTIVE,
),
currentEpoch: await this.network.contracts.epochManager.currentEpoch(),
indexingStatuses: await this.graphNode.indexingStatus([]),
}
return await pMap(
actions,
async (action: Action) => await this.prepareTransaction(action),
async (action: Action) => await this.prepareTransaction(action, context),
{
stopOnError: false,
},
)
}
async prepareTransaction(action: Action): Promise<PopulateTransactionResult> {

async prepareTransaction(
action: Action,
context: TransactionPreparationContext,
): Promise<PopulateTransactionResult> {
const logger = this.logger.child({ action: action.id })
logger.trace('Preparing transaction', {
action,
Expand All @@ -246,6 +265,7 @@ export class AllocationManager {
case ActionType.ALLOCATE:
return await this.prepareAllocate(
logger,
context,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
new SubgraphDeploymentID(action.deploymentID!),
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand All @@ -254,6 +274,7 @@ export class AllocationManager {
case ActionType.UNALLOCATE:
return await this.prepareUnallocate(
logger,
context,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
action.allocationID!,
action.poi === null ? undefined : action.poi,
Expand All @@ -262,6 +283,7 @@ export class AllocationManager {
case ActionType.REALLOCATE:
return await this.prepareReallocate(
logger,
context,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
action.allocationID!,
action.poi === null ? undefined : action.poi,
Expand All @@ -287,6 +309,7 @@ export class AllocationManager {

async prepareAllocateParams(
logger: Logger,
context: TransactionPreparationContext,
deployment: SubgraphDeploymentID,
amount: BigNumber,
): Promise<AllocateTransactionParams> {
Expand All @@ -295,10 +318,7 @@ export class AllocationManager {
amount: amount.toString(),
})

const activeAllocations = await this.network.networkMonitor.allocations(
AllocationStatus.ACTIVE,
)
const allocation = activeAllocations.find(
const allocation = context.activeAllocations.find(
(allocation) =>
allocation.subgraphDeployment.id.toString() === deployment.toString(),
)
Expand All @@ -323,22 +343,32 @@ export class AllocationManager {
)
}

const currentEpoch = await this.network.contracts.epochManager.currentEpoch()

// Ensure subgraph is deployed before allocating
await this.graphNode.ensure(
`indexer-agent/${deployment.ipfsHash.slice(-10)}`,
deployment,
// Check that the subgraph is syncing and healthy before allocating
// Throw error if:
// - subgraph deployment is not syncing,
// - subgraph deployment is failed
const status = context.indexingStatuses.find(
(status) => status.subgraphDeployment.ipfsHash == deployment.ipfsHash,
)
if (!status) {
throw indexerError(
IndexerErrorCode.IE020,
`Subgraph deployment, '${deployment.ipfsHash}', is not syncing`,
)
}
if (status && status.health == 'failed') {
throw indexerError(
IndexerErrorCode.IE020,
`Subgraph deployment, '${deployment.ipfsHash}', failed during syncing`,
)
}

logger.debug('Obtain a unique Allocation ID')

// Obtain a unique allocation ID
const { allocationSigner, allocationId } = uniqueAllocationID(
this.network.transactionManager.wallet.mnemonic.phrase,
currentEpoch.toNumber(),
context.currentEpoch.toNumber(),
deployment,
activeAllocations.map((allocation) => allocation.id),
context.activeAllocations.map((allocation) => allocation.id),
)

// Double-check whether the allocationID already exists on chain, to
Expand Down Expand Up @@ -460,10 +490,11 @@ export class AllocationManager {

async prepareAllocate(
logger: Logger,
context: TransactionPreparationContext,
deployment: SubgraphDeploymentID,
amount: BigNumber,
): Promise<PopulatedTransaction> {
const params = await this.prepareAllocateParams(logger, deployment, amount)
const params = await this.prepareAllocateParams(logger, context, deployment, amount)
logger.debug(`Populating allocateFrom transaction`, {
indexer: params.indexer,
subgraphDeployment: params.subgraphDeploymentID,
Expand All @@ -483,6 +514,7 @@ export class AllocationManager {

async prepareUnallocateParams(
logger: Logger,
context: TransactionPreparationContext,
allocationID: string,
poi: string | undefined,
force: boolean,
Expand All @@ -492,12 +524,14 @@ export class AllocationManager {
poi: poi || 'none provided',
})
const allocation = await this.network.networkMonitor.allocation(allocationID)

// Ensure allocation is old enough to close
const currentEpoch = await this.network.contracts.epochManager.currentEpoch()
if (BigNumber.from(allocation.createdAtEpoch).eq(currentEpoch)) {
if (BigNumber.from(allocation.createdAtEpoch).eq(context.currentEpoch)) {
throw indexerError(
IndexerErrorCode.IE064,
`Allocation '${allocation.id}' cannot be closed until epoch ${currentEpoch.add(
`Allocation '${
allocation.id
}' cannot be closed until epoch ${context.currentEpoch.add(
1,
)}. (Allocations cannot be closed in the same epoch they were created)`,
)
Expand Down Expand Up @@ -635,16 +669,24 @@ export class AllocationManager {

async prepareUnallocate(
logger: Logger,
context: TransactionPreparationContext,
allocationID: string,
poi: string | undefined,
force: boolean,
): Promise<PopulatedTransaction> {
const params = await this.prepareUnallocateParams(logger, allocationID, poi, force)
const params = await this.prepareUnallocateParams(
logger,
context,
allocationID,
poi,
force,
)
return await this.populateUnallocateTransaction(logger, params)
}

async prepareReallocateParams(
logger: Logger,
context: TransactionPreparationContext,
allocationID: string,
poi: string | undefined,
amount: BigNumber,
Expand All @@ -657,14 +699,9 @@ export class AllocationManager {
force,
})

/* Fetch all active allocations and search for our input parameter `allocationID`.
* We don't call `fetchAllocations` here because all allocations will be required
* later when generating a new `uniqueAllocationID`. */
const activeAllocations = await this.network.networkMonitor.allocations(
AllocationStatus.ACTIVE,
)
// Validate that the allocation exists and is old enough to close
const allocationAddress = toAddress(allocationID)
const allocation = activeAllocations.find((allocation) => {
const allocation = context.activeAllocations.find((allocation) => {
return allocation.id === allocationAddress
})
if (!allocation) {
Expand All @@ -674,25 +711,28 @@ export class AllocationManager {
`Reallocation failed: No active allocation with id '${allocationID}' found`,
)
}

// Ensure allocation is old enough to close
const currentEpoch = await this.network.contracts.epochManager.currentEpoch()
if (BigNumber.from(allocation.createdAtEpoch).eq(currentEpoch)) {
if (BigNumber.from(allocation.createdAtEpoch).eq(context.currentEpoch)) {
throw indexerError(
IndexerErrorCode.IE064,
`Allocation '${allocation.id}' cannot be closed until epoch ${currentEpoch.add(
`Allocation '${
allocation.id
}' cannot be closed until epoch ${context.currentEpoch.add(
1,
)}. (Allocations cannot be closed in the same epoch they were created)`,
)
}

logger.debug('Resolving POI')
logger.debug('Resolving POI', {
allocation: allocationID,
deployment: allocation.subgraphDeployment.id.ipfsHash,
})
const allocationPOI = await this.network.networkMonitor.resolvePOI(
allocation,
poi,
force,
)
logger.debug('POI resolved', {
deployment: allocation.subgraphDeployment.id.ipfsHash,
userProvidedPOI: poi,
poi: allocationPOI,
})
Expand Down Expand Up @@ -723,9 +763,9 @@ export class AllocationManager {
logger.debug('Generating a new unique Allocation ID')
const { allocationSigner, allocationId: newAllocationId } = uniqueAllocationID(
this.network.transactionManager.wallet.mnemonic.phrase,
currentEpoch.toNumber(),
context.currentEpoch.toNumber(),
allocation.subgraphDeployment.id,
activeAllocations.map((allocation) => allocation.id),
context.activeAllocations.map((allocation) => allocation.id),
)

logger.debug('New unique Allocation ID generated', {
Expand Down Expand Up @@ -774,7 +814,7 @@ export class AllocationManager {
deployment: allocation.subgraphDeployment.id.toString(),
poi: allocationPOI,
proof,
epoch: currentEpoch.toString(),
epoch: context.currentEpoch.toString(),
})

return {
Expand Down Expand Up @@ -921,13 +961,15 @@ export class AllocationManager {

async prepareReallocate(
logger: Logger,
context: TransactionPreparationContext,
allocationID: string,
poi: string | undefined,
amount: BigNumber,
force: boolean,
): Promise<PopulatedTransaction[]> {
const params = await this.prepareReallocateParams(
logger,
context,
allocationID,
poi,
amount,
Expand Down
Loading