Skip to content

Commit

Permalink
Merge pull request #3410 from OriginTrail/v8/release/devnet
Browse files Browse the repository at this point in the history
OriginTrail Testnet Release v8.1.2-beta
  • Loading branch information
u-hubar authored Nov 12, 2024
2 parents c6b2910 + 3a689bc commit 096b718
Show file tree
Hide file tree
Showing 12 changed files with 437 additions and 289 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
"version": "8.1.1+beta.3",
"version": "8.1.2+beta.0",
"description": "OTNode V8",
"main": "index.js",
"type": "module",
Expand Down
126 changes: 92 additions & 34 deletions src/commands/paranet/paranet-sync-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import {
PARANET_SYNC_FREQUENCY_MILLS,
OPERATION_ID_STATUS,
CONTENT_ASSET_HASH_FUNCTION_ID,
SIMPLE_ASSET_SYNC_PARAMETERS,
PARANET_SYNC_PARAMETERS,
PARANET_SYNC_KA_COUNT,
PARANET_SYNC_RETRIES_LIMIT,
PARANET_SYNC_RETRY_DELAY_MS,
OPERATION_STATUS,
PARANET_NODES_ACCESS_POLICIES,
PARANET_SYNC_SOURCES,
TRIPLE_STORE_REPOSITORIES,
LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS,
LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY,
} from '../../constants/constants.js';

class ParanetSyncCommand extends Command {
Expand Down Expand Up @@ -64,7 +67,11 @@ class ParanetSyncCommand extends Command {
PARANET_SYNC_RETRY_DELAY_MS,
);

this.logger.info(`Paranet sync: Total amount of missed assets: ${totalMissedAssetsCount}`);
const paranetRepository = this.paranetService.getParanetRepositoryName(paranetUAL);

this.logger.info(
`Paranet sync: Paranet: ${paranetUAL} (${paranetId}) Total count of Paranet KAs in the contract: ${contractKaCount}; Synced KAs count: ${syncedAssetsCount}; Local Stored KAs count: ${localStoredAssetsCount}; Total count of missed KAs: ${totalMissedAssetsCount}`,
);

// First, attempt to sync missed KAs if any exist
if (missedAssetsCount > 0) {
Expand All @@ -81,8 +88,8 @@ class ParanetSyncCommand extends Command {
const [successulMissedSyncsCount, failedMissedSyncsCount] = await this.syncMissedKAs(
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
paranetRepository,
operationId,
);

Expand Down Expand Up @@ -122,8 +129,8 @@ class ParanetSyncCommand extends Command {
contractKaCount,
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
paranetRepository,
operationId,
);

Expand Down Expand Up @@ -162,14 +169,14 @@ class ParanetSyncCommand extends Command {
blockchain,
contract,
tokenId,
keyword,
assertionIds,
stateIndex,
paranetId,
paranetTokenId,
latestAsset,
latestState,
paranetUAL,
paranetNodesAccessPolicy,
paranetMetadata,
paranetRepository,
) {
const assertionId = assertionIds[stateIndex];

Expand Down Expand Up @@ -210,12 +217,6 @@ class ParanetSyncCommand extends Command {
state: assertionId,
hashFunctionId: CONTENT_ASSET_HASH_FUNCTION_ID,
assertionId,
assetSync: true,
stateIndex,
paranetSync: true,
paranetTokenId,
paranetLatestAsset: latestAsset,
paranetMetadata,
},
transactional: false,
});
Expand All @@ -233,14 +234,6 @@ class ParanetSyncCommand extends Command {
state: assertionId,
hashFunctionId: CONTENT_ASSET_HASH_FUNCTION_ID,
assertionId,
assetSync: true,
stateIndex,
paranetSync: true,
paranetTokenId,
paranetLatestAsset: latestAsset,
paranetUAL,
paranetId,
paranetMetadata,
},
transactional: false,
});
Expand All @@ -255,16 +248,16 @@ class ParanetSyncCommand extends Command {
let attempt = 0;
let getResult;
do {
await setTimeout(SIMPLE_ASSET_SYNC_PARAMETERS.GET_RESULT_POLLING_INTERVAL_MILLIS);
await setTimeout(PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_INTERVAL_MILLIS);
getResult = await this.operationIdService.getOperationIdRecord(getOperationId);
attempt += 1;
} while (
attempt < SIMPLE_ASSET_SYNC_PARAMETERS.GET_RESULT_POLLING_MAX_ATTEMPTS &&
attempt < PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_MAX_ATTEMPTS &&
getResult?.status !== OPERATION_ID_STATUS.FAILED &&
getResult?.status !== OPERATION_ID_STATUS.COMPLETED
);

if (!getResult || getResult?.status === OPERATION_ID_STATUS.FAILED) {
if (getResult?.status !== OPERATION_ID_STATUS.COMPLETED) {
this.logger.warn(
`Paranet sync: Unable to sync tokenId: ${tokenId}, for contract: ${contract} state index: ${stateIndex} blockchain: ${blockchain}, GET result: ${JSON.stringify(
getResult,
Expand All @@ -276,12 +269,72 @@ class ParanetSyncCommand extends Command {
ual,
paranetUal: paranetUAL,
});

return false;
}

const data = await this.operationIdService.getCachedOperationIdData(getOperationId);

this.logger.debug(
`Paranet sync: ${data.nquads.length} nquads found for asset with ual: ${ual}, state index: ${stateIndex}, assertionId: ${assertionId}`,
);

let repository;
if (latestState) {
repository = paranetRepository;
} else if (paranetNodesAccessPolicy === 'OPEN') {
repository = TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY;
} else if (paranetNodesAccessPolicy === 'CURATED') {
repository = TRIPLE_STORE_REPOSITORIES.PRIVATE_HISTORY;
} else {
throw new Error('Unsupported access policy');
}

await this.tripleStoreService.localStoreAsset(
repository,
assertionId,
data.nquads,
blockchain,
contract,
tokenId,
keyword,
LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS,
LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY,
);
if (paranetNodesAccessPolicy === 'CURATED' && data.privateNquads) {
await this.tripleStoreService.localStoreAsset(
repository,
data.syncedAssetRecord.privateAssertionId,
data.privateNquads,
blockchain,
contract,
tokenId,
keyword,
);
}
const privateAssertionId =
paranetNodesAccessPolicy === 'CURATED'
? data.syncedAssetRecord?.privateAssertionId
: null;

await this.repositoryModuleManager.incrementParanetKaCount(paranetId, blockchain);
await this.repositoryModuleManager.createParanetSyncedAssetRecord(
blockchain,
ual,
paranetUAL,
assertionId,
privateAssertionId,
data.syncedAssetRecord?.sender,
data.syncedAssetRecord?.transactionHash,
PARANET_SYNC_SOURCES.SYNC,
);

return true;
} catch (error) {
this.logger.warn(
`Paranet sync: Unable to sync tokenId: ${tokenId}, for contract: ${contract} state index: ${stateIndex} blockchain: ${blockchain}, error: ${error}`,
);

await this.repositoryModuleManager.createMissedParanetAssetRecord({
blockchainId: blockchain,
ual,
Expand All @@ -290,8 +343,6 @@ class ParanetSyncCommand extends Command {

return false;
}

return true;
}

async syncAsset(
Expand All @@ -301,8 +352,8 @@ class ParanetSyncCommand extends Command {
tokenId,
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
paranetRepository,
operationId,
removeMissingAssetRecord = false,
) {
Expand All @@ -316,7 +367,14 @@ class ParanetSyncCommand extends Command {
contract,
tokenId,
);
const { tokenId: paranetTokenId } = this.ualService.resolveUAL(paranetUAL);

const keyword = await this.ualService.calculateLocationKeyword(
blockchain,
contract,
tokenId,
assertionIds[0],
);

let isSuccessful = true;
for (let stateIndex = 0; stateIndex < assertionIds.length; stateIndex += 1) {
isSuccessful =
Expand All @@ -326,14 +384,14 @@ class ParanetSyncCommand extends Command {
blockchain,
contract,
tokenId,
keyword,
assertionIds,
stateIndex,
paranetId,
paranetTokenId,
stateIndex === assertionIds.length - 1,
paranetUAL,
paranetNodesAccessPolicy,
paranetMetadata,
paranetRepository,
));
}

Expand All @@ -359,8 +417,8 @@ class ParanetSyncCommand extends Command {
async syncMissedKAs(
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
paranetRepository,
operationId,
) {
const missedParanetAssets =
Expand Down Expand Up @@ -392,8 +450,8 @@ class ParanetSyncCommand extends Command {
knowledgeAssetTokenId,
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
paranetRepository,
operationId,
true, // removeMissingAssetRecord
);
Expand All @@ -417,8 +475,8 @@ class ParanetSyncCommand extends Command {
contractKaCount,
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
paranetRepository,
operationId,
) {
let i = Number(startIndex);
Expand Down Expand Up @@ -488,8 +546,8 @@ class ParanetSyncCommand extends Command {
syncKATokenId,
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
paranetRepository,
operationId,
false, // removeMissingAssetRecord
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,7 @@ class FindCuratedParanetNodesCommand extends Command {
);
return Command.empty();
}
// Stop command sequence if no nodes in paranet
if (paranetNodes.length === 0) {
await this.operationService.markOperationAsCompleted(operationId, blockchain, null, [
OPERATION_ID_STATUS.COMPLETED,
]);
return Command.empty();
}

return this.continueSequence(
{
...command.data,
Expand Down
7 changes: 6 additions & 1 deletion src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ export const SIMPLE_ASSET_SYNC_PARAMETERS = {
GET_RESULT_POLLING_MAX_ATTEMPTS: 30,
};

export const PARANET_SYNC_PARAMETERS = {
GET_RESULT_POLLING_INTERVAL_MILLIS: 1 * 1000,
GET_RESULT_POLLING_MAX_ATTEMPTS: 30,
};

export const COMMAND_TX_GAS_INCREASE_FACTORS = {
SUBMIT_COMMIT: 1.2,
SUBMIT_UPDATE_COMMIT: 1.2,
Expand Down Expand Up @@ -553,7 +558,7 @@ export const COMMAND_STATUS = {

export const OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER = 100;

export const REPOSITORY_ROWS_FOR_REMOVAL_MAX_NUMBER = 1000;
export const REPOSITORY_ROWS_FOR_REMOVAL_MAX_NUMBER = 10_000;

export const ARCHIVE_COMMANDS_FOLDER = 'commands';

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export async function up({ context: { queryInterface, Sequelize } }) {
await queryInterface.changeColumn('paranet_synced_asset', 'data_source', {
type: Sequelize.ENUM('sync', 'local_store'),
allowNull: true,
});
}

export async function down({ context: { queryInterface, Sequelize } }) {
await queryInterface.changeColumn('paranet_synced_asset', 'data_source', {
type: Sequelize.TEXT,
allowNull: true,
});
}
Loading

0 comments on commit 096b718

Please sign in to comment.