Skip to content

Commit

Permalink
feat(observability): utilise new logger in indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
forbesus committed Dec 11, 2024
1 parent 26ce923 commit 9400dc9
Show file tree
Hide file tree
Showing 25 changed files with 143 additions and 79 deletions.
3 changes: 2 additions & 1 deletion apps/indexer/mvm.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"dependencies": {
"@akashnetwork/database": "1.0.0",
"@akashnetwork/env-loader": "1.0.1"
"@akashnetwork/env-loader": "1.0.1",
"@akashnetwork/logging": "2.0.2"
},
"devDependencies": {
"@akashnetwork/dev-config": "1.0.0"
Expand Down
1 change: 1 addition & 0 deletions apps/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"@akashnetwork/akash-api": "^1.3.0",
"@akashnetwork/database": "*",
"@akashnetwork/env-loader": "*",
"@akashnetwork/logging": "*",
"@cosmjs/crypto": "^0.32.4",
"@cosmjs/encoding": "^0.32.4",
"@cosmjs/math": "^0.32.4",
Expand Down
27 changes: 14 additions & 13 deletions apps/indexer/src/chain/chainSync.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { Block, Message } from "@akashnetwork/database/dbSchemas";
import { Day, Transaction } from "@akashnetwork/database/dbSchemas/base";
import { LoggerService } from "@akashnetwork/logging";
import { fromBase64 } from "@cosmjs/encoding";
import { decodeTxRaw } from "@cosmjs/proto-signing";
import { asyncify, eachLimit } from "async";
Expand All @@ -25,6 +26,8 @@ import {
import { nodeAccessor } from "./nodeAccessor";
import { statsProcessor } from "./statsProcessor";

const logger = LoggerService.forContext("ChainSync");

export const setMissingBlock = (height: number) => (missingBlock = height);
let missingBlock: number;

Expand Down Expand Up @@ -83,13 +86,13 @@ export async function syncBlocks() {
const latestHeightInCache = await getLatestHeightInCache();

if (latestHeightInCache >= latestBlockToDownload) {
console.log("No blocks to download");
logger.info("No blocks to download");
} else {
let startHeight = !env.KEEP_CACHE ? latestInsertedHeight + 1 : Math.max(latestHeightInCache, 1);

// If database is empty
if (latestInsertedHeight === 0) {
console.log("Starting from scratch");
logger.info("Starting from scratch");
startHeight = activeChain.startHeight || 1;
}

Expand All @@ -101,13 +104,11 @@ export async function syncBlocks() {

const maxDownloadGroupSize = 1_000;
if (latestBlockToDownload - startHeight > maxDownloadGroupSize) {
console.log("Limiting download to " + maxDownloadGroupSize + " blocks");
logger.info("Limiting download to " + maxDownloadGroupSize + " blocks");
latestBlockToDownload = startHeight + maxDownloadGroupSize;
}

console.log("Starting download at block #" + startHeight);
console.log("Will end download at block #" + latestBlockToDownload);
console.log(latestBlockToDownload - startHeight + 1 + " blocks to download");
const blocksCount = latestBlockToDownload - startHeight + 1
logger.info({ event: 'DOWNLOAD', startHeight, latestBlockToDownload, blocksCount });

await benchmark.measureAsync("downloadBlocks", async () => {
await downloadBlocks(startHeight, latestBlockToDownload);
Expand Down Expand Up @@ -151,7 +152,7 @@ export async function syncBlocks() {

async function insertBlocks(startHeight: number, endHeight: number) {
const blockCount = endHeight - startHeight + 1;
console.log("Inserting " + blockCount + " blocks into database");
logger.info("Inserting " + blockCount + " blocks into database");

let lastInsertedBlock = (await Block.findOne({
include: [
Expand Down Expand Up @@ -242,7 +243,7 @@ async function insertBlocks(startHeight: number, endHeight: number) {
const blockDate = new Date(Date.UTC(blockDatetime.getUTCFullYear(), blockDatetime.getUTCMonth(), blockDatetime.getUTCDate()));

if (!lastInsertedBlock || !isEqual(blockDate, lastInsertedBlock.day.date)) {
console.log("Creating day: ", blockDate, i);
logger.info(`Creating day: ${blockDate} ${i}`);
const [newDay, created] = await Day.findOrCreate({
where: {
date: blockDate
Expand All @@ -256,7 +257,7 @@ async function insertBlocks(startHeight: number, endHeight: number) {
});

if (!created) {
console.warn(`Day ${blockDate} already exists in database`);
logger.warn(`Day ${blockDate} already exists in database`);
}

blockEntry.dayId = newDay.id;
Expand Down Expand Up @@ -287,15 +288,15 @@ async function insertBlocks(startHeight: number, endHeight: number) {
blocksToAdd = [];
txsToAdd = [];
msgsToAdd = [];
console.log(`Blocks added to db: ${i - startHeight + 1} / ${blockCount} (${(((i - startHeight + 1) * 100) / blockCount).toFixed(2)}%)`);
logger.info(`Blocks added to db: ${i - startHeight + 1} / ${blockCount} (${(((i - startHeight + 1) * 100) / blockCount).toFixed(2)}%)`);

if (lastInsertedBlock) {
lastInsertedBlock.day.lastBlockHeightYet = lastInsertedBlock.height;
await lastInsertedBlock.day.save({ transaction: insertDbTransaction });
}
});
} catch (error) {
console.log(error, txsToAdd);
logger.info(`${error}, ${txsToAdd}`);
}
}
}
Expand All @@ -319,7 +320,7 @@ async function downloadBlocks(startHeight: number, endHeight: number) {
if (Date.now() - lastLogDate > 500) {
lastLogDate = Date.now();
console.clear();
console.log("Progress: " + ((downloadedCount * 100) / missingBlockCount).toFixed(2) + "%");
logger.info("Progress: " + ((downloadedCount * 100) / missingBlockCount).toFixed(2) + "%");

if (!isProd) {
nodeAccessor.displayTable();
Expand Down
6 changes: 4 additions & 2 deletions apps/indexer/src/chain/dataStore.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { LoggerService } from "@akashnetwork/logging";
import fs from "fs";
import { Level } from "level";
import path from "path";
Expand All @@ -6,6 +7,7 @@ import { dataFolderPath } from "@src/shared/constants";
import { bytesToHumanReadableSize } from "@src/shared/utils/files";

const LevelNotFoundCode = "LEVEL_NOT_FOUND";
const logger = LoggerService.forContext("DataStore");

if (!fs.existsSync(dataFolderPath)) {
fs.mkdirSync(dataFolderPath, { recursive: true });
Expand Down Expand Up @@ -37,10 +39,10 @@ export const getCacheSize = async function () {
};

export const deleteCache = async function () {
console.log("Deleting cache...");
logger.info("Deleting cache...");
await blocksDb.clear();
await blockResultsDb.clear();
console.log("Deleted");
logger.info("Deleted");
};

export async function getCachedBlockByHeight(height: number) {
Expand Down
7 changes: 5 additions & 2 deletions apps/indexer/src/chain/genesisImporter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { LoggerService } from "@akashnetwork/logging";
import fs from "fs";
import { ungzip } from "node-gzip";
import path from "path";
Expand All @@ -7,19 +8,21 @@ import { dataFolderPath } from "@src/shared/constants";
import { download } from "@src/shared/utils/download";
import { IGenesis } from "./genesisTypes";

const logger = LoggerService.forContext("GenesisImports");

export async function getGenesis(): Promise<IGenesis> {
const ext = path.extname(activeChain.genesisFileUrl);
const filename = path.basename(activeChain.genesisFileUrl);

let genesisLocalPath = dataFolderPath + "/" + filename;

if (!fs.existsSync(genesisLocalPath)) {
console.log("Downloading genesis file: " + activeChain.genesisFileUrl);
logger.info("Downloading genesis file: " + activeChain.genesisFileUrl);
await download(activeChain.genesisFileUrl, genesisLocalPath);
}

if (ext === ".gz") {
console.log("Extracting genesis file...");
logger.info("Extracting genesis file...");
const decompressed = await ungzip(fs.readFileSync(genesisLocalPath).buffer);
genesisLocalPath = genesisLocalPath.replace(".gz", "");
fs.writeFileSync(genesisLocalPath, decompressed);
Expand Down
9 changes: 5 additions & 4 deletions apps/indexer/src/chain/nodeAccessor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { LoggerService } from "@akashnetwork/logging";
import fs from "fs";

import { concurrentNodeQuery, dataFolderPath } from "@src/shared/constants";
Expand All @@ -10,7 +11,7 @@ interface NodeAccessorSettings {
}

const savedNodeInfoPath = dataFolderPath + "/nodeStatus.json";

const logger = LoggerService.forContext("NodeAccessor");
class NodeAccessor {
private nodes: NodeInfo[];
private settings: NodeAccessorSettings;
Expand All @@ -21,7 +22,7 @@ class NodeAccessor {
}

private async saveNodeStatus() {
console.log("Saving node status...");
logger.info("Saving node status...");
const statuses = this.nodes.map(x => x.getSavedNodeInfo());

await fs.promises.writeFile(savedNodeInfoPath, JSON.stringify(statuses, null, 2));
Expand All @@ -35,13 +36,13 @@ class NodeAccessor {

public async loadNodeStatus() {
if (!fs.existsSync(savedNodeInfoPath)) {
console.log("No saved node status found");
logger.info("No saved node status found");
await this.refetchNodeStatus();
await this.saveNodeStatus();
return;
}

console.log("Loading saved node status...");
logger.info("Loading saved node status...");
const file = await fs.promises.readFile(savedNodeInfoPath, "utf-8");
const savedNodes = JSON.parse(file) as SavedNodeInfo[];

Expand Down
15 changes: 9 additions & 6 deletions apps/indexer/src/chain/statsProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { Block, Message } from "@akashnetwork/database/dbSchemas";
import { AkashMessage } from "@akashnetwork/database/dbSchemas/akash";
import { Transaction } from "@akashnetwork/database/dbSchemas/base";
import { LoggerService } from "@akashnetwork/logging";
import { fromBase64 } from "@cosmjs/encoding";
import { decodeTxRaw } from "@cosmjs/proto-signing";
import { sha256 } from "js-sha256";
Expand All @@ -16,11 +17,13 @@ import { decodeMsg } from "@src/shared/utils/protobuf";
import { setMissingBlock } from "./chainSync";
import { getGenesis } from "./genesisImporter";

const logger = LoggerService.forContext("StatsProcessor");

class StatsProcessor {
private cacheInitialized: boolean = false;

public async rebuildStatsTables() {
console.log('Setting "isProcessed" to false');
logger.info('Setting "isProcessed" to false');
await Message.update(
{
isProcessed: false,
Expand All @@ -41,7 +44,7 @@ class StatsProcessor {
{ where: { isProcessed: true } }
);

console.log("Rebuilding stats tables...");
logger.info("Rebuilding stats tables...");

for (const indexer of activeIndexers) {
await indexer.recreateTables();
Expand All @@ -58,7 +61,7 @@ class StatsProcessor {
}

public async processMessages() {
console.log("Querying unprocessed messages...");
logger.info("Querying unprocessed messages...");

const shouldProcessEveryBlocks = activeIndexers.some(indexer => indexer.runForEveryBlocks);

Expand All @@ -78,7 +81,7 @@ class StatsProcessor {
const hasNewBlocks = !previousProcessedBlock || maxDbHeight > previousProcessedBlock.height;

if (!hasNewBlocks) {
console.log("No new blocks to process");
logger.info("No new blocks to process");
return;
}

Expand All @@ -94,7 +97,7 @@ class StatsProcessor {
let firstBlockToProcess = firstUnprocessedHeight;
let lastBlockToProcess = Math.min(maxDbHeight, firstBlockToProcess + groupSize, lastBlockToSync);
while (firstBlockToProcess <= Math.min(maxDbHeight, lastBlockToSync)) {
console.log(`Loading blocks ${firstBlockToProcess} to ${lastBlockToProcess}`);
logger.info(`Loading blocks ${firstBlockToProcess} to ${lastBlockToProcess}`);

const getBlocksTimer = benchmark.startTimer("getBlocks");
const blocks = await Block.findAll({
Expand Down Expand Up @@ -150,7 +153,7 @@ class StatsProcessor {
decodeTimer.end();

for (const msg of transaction.messages) {
console.log(`Processing message ${msg.type} - Block #${block.height}`);
logger.info(`Processing message ${msg.type} - Block #${block.height}`);

const encodedMessage = decodedTx.body.messages[msg.index].value;

Expand Down
9 changes: 6 additions & 3 deletions apps/indexer/src/db/buildDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { Block, Message } from "@akashnetwork/database/dbSchemas";
import { Day, Transaction } from "@akashnetwork/database/dbSchemas/base";
import { MonitoredValue } from "@akashnetwork/database/dbSchemas/base/monitoredValue";
import { LoggerService } from "@akashnetwork/logging";

import { getGenesis } from "@src/chain/genesisImporter";
import { indexers } from "@src/indexers";
import { ExecutionMode, executionMode } from "@src/shared/constants";
import { sequelize } from "./dbConnection";

const logger = LoggerService.forContext("BuildDatabase");

/**
* Initiate database schema
*/
export const initDatabase = async () => {
console.log(`Connecting to db (${sequelize.config.host}/${sequelize.config.database})...`);
logger.info(`Connecting to db (${sequelize.config.host}/${sequelize.config.database})...`);
await sequelize.authenticate();
console.log("Connection has been established successfully.");
logger.info("Connection has been established successfully.");

if (executionMode === ExecutionMode.RebuildAll) {
await Day.drop({ cascade: true });
Expand All @@ -41,7 +44,7 @@ export const initDatabase = async () => {
if (!activeChain.startHeight) {
const firstBlock = await Block.findOne();
if (!firstBlock) {
console.log("First time syncing, seeding from genesis file...");
logger.info("First time syncing, seeding from genesis file...");

const genesis = await getGenesis();
for (const indexer of indexers) {
Expand Down
9 changes: 6 additions & 3 deletions apps/indexer/src/db/keybaseProvider.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { Validator } from "@akashnetwork/database/dbSchemas/base";
import { LoggerService } from "@akashnetwork/logging";
import fetch from "node-fetch";
import { Op } from "sequelize";

const logger = LoggerService.forContext("KeybaseProvider");

export async function fetchValidatorKeybaseInfos() {
const validators = await Validator.findAll({
where: {
Expand All @@ -12,11 +15,11 @@ export async function fetchValidatorKeybaseInfos() {
const requests = validators.map(async validator => {
try {
if (!/^[A-F0-9]{16}$/.test(validator.identity)) {
console.warn("Invalid identity " + validator.identity + " for validator " + validator.operatorAddress);
logger.warn("Invalid identity " + validator.identity + " for validator " + validator.operatorAddress);
return Promise.resolve();
}

console.log("Fetching keybase info for " + validator.operatorAddress);
logger.info("Fetching keybase info for " + validator.operatorAddress);
const response = await fetch(`https://keybase.io/_/api/1.0/user/lookup.json?key_suffix=${validator.identity}`);

if (response.status === 200) {
Expand All @@ -31,7 +34,7 @@ export async function fetchValidatorKeybaseInfos() {

await validator.save();
} catch (err) {
console.error("Error while fetching keybase info for " + validator.operatorAddress);
logger.error("Error while fetching keybase info for " + validator.operatorAddress);
throw err;
}
});
Expand Down
9 changes: 6 additions & 3 deletions apps/indexer/src/db/priceHistoryProvider.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { Day } from "@akashnetwork/database/dbSchemas/base";
import { LoggerService } from "@akashnetwork/logging";
import { isSameDay } from "date-fns";
import fetch from "node-fetch";

Expand All @@ -9,15 +10,17 @@ interface PriceHistoryResponse {
total_volumes: Array<Array<number>>;
}

const logger = LoggerService.forContext("PriceHistoryProvider");

export const syncPriceHistory = async () => {
if (!activeChain.coinGeckoId) {
console.log("No coin gecko id defined for this chain. Skipping price history sync.");
logger.info("No coin gecko id defined for this chain. Skipping price history sync.");
return;
}

const endpointUrl = `https://api.coingecko.com/api/v3/coins/${activeChain.coinGeckoId}/market_chart?vs_currency=usd&days=360`;

console.log("Fetching latest market data from " + endpointUrl);
logger.info("Fetching latest market data from " + endpointUrl);

const response = await fetch(endpointUrl);
const data: PriceHistoryResponse = await response.json();
Expand All @@ -26,7 +29,7 @@ export const syncPriceHistory = async () => {
price: pDate[1]
}));

console.log(`There are ${apiPrices.length} prices to update.`);
logger.info(`There are ${apiPrices.length} prices to update.`);

const days = await Day.findAll();

Expand Down
Loading

0 comments on commit 9400dc9

Please sign in to comment.