Skip to content

Commit

Permalink
common, agent: switch most timer calls to sequential impl
Browse files Browse the repository at this point in the history
  • Loading branch information
dwerner committed Dec 26, 2024
1 parent 2e336b2 commit 2bc01da
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 209 deletions.
134 changes: 69 additions & 65 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
networkIsL1,
DeploymentManagementMode,
SubgraphStatus,
sequentialTimerMap,
} from '@graphprotocol/indexer-common'

import PQueue from 'p-queue'
Expand Down Expand Up @@ -253,40 +254,41 @@ export class Agent {
const requestIntervalSmall = this.pollingInterval
const requestIntervalLarge = this.pollingInterval * 5
const logger = this.logger.child({ component: 'ReconciliationLoop' })
const currentEpochNumber: Eventual<NetworkMapped<number>> = timer(
requestIntervalLarge,
).tryMap(
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching current epoch number', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.currentEpochNumber()
}),
{
onError: error =>
logger.warn(`Failed to fetch current epoch`, { error }),
},
)
const currentEpochNumber: Eventual<NetworkMapped<number>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching current epoch number', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.currentEpochNumber()
}),
{
onError: error =>
logger.warn(`Failed to fetch current epoch`, { error }),
},
)

const maxAllocationEpochs: Eventual<NetworkMapped<number>> = timer(
requestIntervalLarge,
).tryMap(
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching max allocation epochs', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.contracts.staking.maxAllocationEpochs()
}),
{
onError: error =>
logger.warn(`Failed to fetch max allocation epochs`, { error }),
},
)
const maxAllocationEpochs: Eventual<NetworkMapped<number>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching max allocation epochs', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.contracts.staking.maxAllocationEpochs()
}),
{
onError: error =>
logger.warn(`Failed to fetch max allocation epochs`, { error }),
},
)

const indexingRules: Eventual<NetworkMapped<IndexingRuleAttributes[]>> =
timer(requestIntervalSmall).tryMap(
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () => {
return this.multiNetworks.map(async ({ network, operator }) => {
logger.trace('Fetching indexing rules', {
Expand Down Expand Up @@ -322,24 +324,25 @@ export class Agent {
},
)

const activeDeployments: Eventual<SubgraphDeploymentID[]> = timer(
requestIntervalSmall,
).tryMap(
() => {
logger.trace('Fetching active deployments')
return this.graphNode.subgraphDeployments()
},
{
onError: error =>
logger.warn(
`Failed to obtain active deployments, trying again later`,
{ error },
),
},
)
const activeDeployments: Eventual<SubgraphDeploymentID[]> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
() => {
logger.trace('Fetching active deployments')
return this.graphNode.subgraphDeployments()
},
{
onError: error =>
logger.warn(
`Failed to obtain active deployments, trying again later`,
{ error },
),
},
)

const networkDeployments: Eventual<NetworkMapped<SubgraphDeployment[]>> =
timer(requestIntervalSmall).tryMap(
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching network deployments', {
Expand All @@ -358,7 +361,8 @@ export class Agent {

const eligibleTransferDeployments: Eventual<
NetworkMapped<TransferredSubgraphDeployment[]>
> = timer(requestIntervalLarge).tryMap(
> = sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () => {
// Return early if the auto migration feature is disabled.
if (!this.autoMigrationSupport) {
Expand Down Expand Up @@ -558,23 +562,23 @@ export class Agent {
},
)

const activeAllocations: Eventual<NetworkMapped<Allocation[]>> = timer(
requestIntervalSmall,
).tryMap(
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching active allocations', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.allocations(AllocationStatus.ACTIVE)
}),
{
onError: () =>
logger.warn(
`Failed to obtain active allocations, trying again later`,
),
},
)
const activeAllocations: Eventual<NetworkMapped<Allocation[]>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching active allocations', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.allocations(AllocationStatus.ACTIVE)
}),
{
onError: () =>
logger.warn(
`Failed to obtain active allocations, trying again later`,
),
},
)

// `activeAllocations` is used to trigger this Eventual, but not really needed
// inside.
Expand Down
12 changes: 10 additions & 2 deletions packages/indexer-common/src/allocations/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ import {
indexerError,
IndexerErrorCode,
parseGraphQLAllocation,
sequentialTimerReduce,
} from '@graphprotocol/indexer-common'
import { Allocation, MonitorEligibleAllocationsOptions } from './types'

import gql from 'graphql-tag'

import { Eventual, timer } from '@graphprotocol/common-ts'
import { Eventual } from '@graphprotocol/common-ts'

export const monitorEligibleAllocations = ({
indexer,
Expand Down Expand Up @@ -168,7 +169,14 @@ export const monitorEligibleAllocations = ({
}
}

const allocations = timer(interval).reduce(refreshAllocations, [])
const allocations = sequentialTimerReduce(
{
logger,
milliseconds: interval,
},
refreshAllocations,
[],
)

allocations.pipe((allocations) => {
logger.info(`Eligible allocations`, {
Expand Down
6 changes: 3 additions & 3 deletions packages/indexer-common/src/allocations/query-fees.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Counter, Gauge, Histogram } from 'prom-client'
import axios from 'axios'
import {
Logger,
timer,
BytesWriter,
toAddress,
formatGRT,
Expand All @@ -20,6 +19,7 @@ import {
ensureAllocationSummary,
TransactionManager,
specification as spec,
sequentialTimerMap,
} from '..'
import { DHeap } from '@thi.ng/heaps'
import { BigNumber, BigNumberish, Contract } from 'ethers'
Expand Down Expand Up @@ -264,7 +264,7 @@ export class AllocationReceiptCollector implements ReceiptCollector {
}

// Check if there's another batch of receipts to collect every 10s
timer(10_000).pipe(async () => {
sequentialTimerMap({ logger: this.logger, milliseconds: 10_000 }, async () => {
while (hasReceiptsReadyForCollecting()) {
// Remove the batch from the processing queue
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand All @@ -283,7 +283,7 @@ export class AllocationReceiptCollector implements ReceiptCollector {
}

private startVoucherProcessing() {
timer(30_000).pipe(async () => {
sequentialTimerMap({ logger: this.logger, milliseconds: 30_000 }, async () => {
let pendingVouchers: Voucher[] = []
try {
pendingVouchers = await this.pendingVouchers() // Ordered by value
Expand Down
11 changes: 6 additions & 5 deletions packages/indexer-common/src/allocations/tap-collector.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { Counter, Gauge, Histogram } from 'prom-client'
import {
Logger,
timer,
toAddress,
formatGRT,
Address,
Metrics,
Eventual,
join as joinEventual,
} from '@graphprotocol/common-ts'
import { NetworkContracts as TapContracts } from '@semiotic-labs/tap-contracts-bindings'
import {
Expand All @@ -23,6 +21,7 @@ import {
allocationSigner,
tapAllocationIdProof,
parseGraphQLAllocation,
sequentialTimerMap,
} from '..'
import { BigNumber } from 'ethers'
import pReduce from 'p-reduce'
Expand Down Expand Up @@ -184,9 +183,11 @@ export class TapCollector {
}

private getPendingRAVs(): Eventual<RavWithAllocation[]> {
return joinEventual({
timer: timer(RAV_CHECK_INTERVAL_MS),
}).tryMap(
return sequentialTimerMap(
{
logger: this.logger,
milliseconds: RAV_CHECK_INTERVAL_MS,
},
async () => {
let ravs = await this.pendingRAVs()
if (ravs.length === 0) {
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ export * from './utils'
export * from './parsers'
export * as specification from './network-specification'
export * from './multi-networks'
export * from './sequential-timer'
9 changes: 7 additions & 2 deletions packages/indexer-common/src/indexer-management/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import {
Network,
OrderDirection,
GraphNode,
sequentialTimerMap,
} from '@graphprotocol/indexer-common'

import { Order, Transaction } from 'sequelize'
import { Eventual, join, Logger, timer } from '@graphprotocol/common-ts'
import { Eventual, join, Logger } from '@graphprotocol/common-ts'
import groupBy from 'lodash.groupby'

export class ActionManager {
Expand Down Expand Up @@ -116,7 +117,11 @@ export class ActionManager {

async monitorQueue(): Promise<void> {
const logger = this.logger.child({ component: 'QueueMonitor' })
const approvedActions: Eventual<Action[]> = timer(30_000).tryMap(
const approvedActions: Eventual<Action[]> = sequentialTimerMap(
{
logger,
milliseconds: 30_000,
},
async () => {
logger.trace('Fetching approved actions')
let actions: Action[] = []
Expand Down
Loading

0 comments on commit 2bc01da

Please sign in to comment.