Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/remove get init #3445

Merged
merged 11 commits into from
Nov 29, 2024
80 changes: 28 additions & 52 deletions src/commands/local-store/local-store-command.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { OPERATION_ID_STATUS, ERROR_TYPE, LOCAL_STORE_TYPES } from '../../constants/constants.js';
import {
OPERATION_ID_STATUS,
ERROR_TYPE,
LOCAL_STORE_TYPES,
OPERATION_REQUEST_STATUS,
NETWORK_MESSAGE_TYPES,
} from '../../constants/constants.js';
import Command from '../command.js';

class LocalStoreCommand extends Command {
Expand All @@ -8,6 +14,7 @@ class LocalStoreCommand extends Command {
this.paranetService = ctx.paranetService;
this.pendingStorageService = ctx.pendingStorageService;
this.operationIdService = ctx.operationIdService;
this.operationService = ctx.publishService;
this.dataService = ctx.dataService;
this.ualService = ctx.ualService;
this.serviceAgreementService = ctx.serviceAgreementService;
Expand All @@ -24,6 +31,7 @@ class LocalStoreCommand extends Command {
blockchain,
storeType = LOCAL_STORE_TYPES.TRIPLE,
paranetId,
datasetRoot,
} = command.data;

try {
Expand All @@ -32,32 +40,27 @@ class LocalStoreCommand extends Command {
blockchain,
OPERATION_ID_STATUS.LOCAL_STORE.LOCAL_STORE_START,
);

const cachedData = await this.operationIdService.getCachedOperationIdData(operationId);
const dataset = await this.operationIdService.getCachedOperationIdData(operationId);

if (storeType === LOCAL_STORE_TYPES.TRIPLE) {
const storePromises = [];
if (cachedData.public.dataset && cachedData.public.datasetRoot) {
storePromises.push(
this.pendingStorageService.cacheDataset(
blockchain,
cachedData.public.datasetRoot,
cachedData.public.dataset,
operationId,
),
);
}
// if (cachedData.private?.assertion && cachedData.private?.assertionId) {
// storePromises.push(
// this.pendingStorageService.cacheDataset(
// blockchain,
// datasetRoot,
// dataset,
// operationId,
// ),
// );
// }
await Promise.all(storePromises);
await this.pendingStorageService.cacheDataset(
blockchain,
datasetRoot,
dataset,
operationId,
);
await this.operationService.processResponse(
command,
OPERATION_REQUEST_STATUS.COMPLETED,
{
messageType: NETWORK_MESSAGE_TYPES.RESPONSES.ACK,
messageData: {
signature: `signature-${Math.floor(Math.random() * 1000000) + 1}`,
},
},
null,
true,
);
} else if (storeType === LOCAL_STORE_TYPES.TRIPLE_PARANET) {
const paranetMetadata = await this.blockchainModuleManager.getParanetMetadata(
blockchain,
Expand All @@ -73,33 +76,6 @@ class LocalStoreCommand extends Command {
await this.tripleStoreModuleManager.initializeParanetRepository(paranetRepository);
await this.paranetService.initializeParanetRecord(blockchain, paranetId);

if (cachedData.public.dataset && cachedData.public.datasetRoot) {
// await this.tripleStoreService.localStoreAsset(
// paranetRepository,
// cachedData.public.assertionId,
// cachedData.public.assertion,
// blockchain,
// contract,
// tokenId,
// keyword,
// LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS,
// LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY,
// );
}
if (cachedData.private?.assertion && cachedData.private?.assertionId) {
// await this.tripleStoreService.localStoreAsset(
// paranetRepository,
// cachedData.private.assertionId,
// cachedData.private.assertion,
// blockchain,
// contract,
// tokenId,
// keyword,
// LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS,
// LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY,
// );
}

await this.repositoryModuleManager.incrementParanetKaCount(paranetId, blockchain);
// await this.repositoryModuleManager.createParanetSyncedAssetRecord(
// blockchain,
Expand Down
133 changes: 0 additions & 133 deletions src/commands/protocols/common/find-nodes-command.js

This file was deleted.

46 changes: 33 additions & 13 deletions src/commands/protocols/common/find-shard-command.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Command from '../../command.js';
import { OPERATION_ID_STATUS } from '../../../constants/constants.js';
import { OPERATION_ID_STATUS, ERROR_TYPE } from '../../../constants/constants.js';

class FindShardCommand extends Command {
constructor(ctx) {
Expand All @@ -14,28 +14,44 @@ class FindShardCommand extends Command {
* @param command
*/
async execute(command) {
const { operationId, blockchain, errorType, networkProtocols, minAckResponses } =
command.data;

this.errorType = errorType;
this.logger.debug(`Searching for shard for operationId: ${operationId}`);

const { operationId, blockchain, datasetRoot } = command.data;
this.errorType = ERROR_TYPE.FIND_SHARD.FIND_SHARD_ERROR;
this.logger.debug(
`Searching for shard for operationId: ${operationId}, dataset root: ${datasetRoot}`,
);
await this.operationIdService.updateOperationIdStatus(
operationId,
blockchain,
OPERATION_ID_STATUS.FIND_NODES_START,
);

// TODO: protocol selection
this.minAckResponses = await this.operationService.getMinAckResponses(blockchain);

const networkProtocols = this.operationService.getNetworkProtocols();

const shardNodes = [];
let nodePartOfShard = false;
const currentPeerId = this.networkModuleManager.getPeerId().toB58String();

const foundNodes = await this.findShardNodes(blockchain);
for (const node of foundNodes) {
if (node.id !== this.networkModuleManager.getPeerId().toB58String()) {
if (node.id === currentPeerId) {
nodePartOfShard = true;
} else {
shardNodes.push({ id: node.id, protocol: networkProtocols[0] });
}
}

this.logger.debug(`Found ${shardNodes.length} node(s) for operationId: ${operationId}`);
const commandSequence = this.getOperationCommandSequence(nodePartOfShard);

command.sequence.push(...commandSequence);

this.logger.debug(
`Found ${
shardNodes.length + nodePartOfShard ? 1 : 0
} node(s) for operationId: ${operationId}`,
);
// TODO: Log local node
this.logger.trace(
`Found shard: ${JSON.stringify(
shardNodes.map((node) => node.id),
Expand All @@ -44,11 +60,11 @@ class FindShardCommand extends Command {
)}`,
);

if (shardNodes.length < minAckResponses) {
if (shardNodes.length + (nodePartOfShard ? 1 : 0) < this.minAckResponses) {
await this.handleError(
operationId,
blockchain,
`Unable to find enough nodes for operationId: ${operationId}. Minimum number of nodes required: ${minAckResponses}`,
`Unable to find enough nodes for operationId: ${operationId}. Minimum number of nodes required: ${this.minAckResponses}`,
this.errorType,
true,
);
Expand All @@ -65,7 +81,7 @@ class FindShardCommand extends Command {
{
...command.data,
leftoverNodes: shardNodes,
numberOfShardNodes: shardNodes.length,
numberOfShardNodes: shardNodes.length + nodePartOfShard ? 1 : 0,
},
command.sequence,
);
Expand All @@ -87,6 +103,10 @@ class FindShardCommand extends Command {
return nodesFound;
}

getOperationCommandSequence() {
throw Error('getOperationCommandSequence() is not defined');
}

/**
* Builds default findShardCommand
* @param map
Expand Down
Loading