diff --git a/src/dep/rmr/cluster.c b/src/dep/rmr/cluster.c index 5336ab917..5c13fb915 100644 --- a/src/dep/rmr/cluster.c +++ b/src/dep/rmr/cluster.c @@ -45,6 +45,7 @@ void _MRClsuter_UpdateNodes(MRCluster *cl) { /* See if this is us - if so we need to update the cluster's host and current id */ if (node->flags & MRNode_Self) { cl->myNode = node; + cl->myshard = &cl->topo->shards[sh]; } } } diff --git a/src/dep/rmr/cluster.h b/src/dep/rmr/cluster.h index b8465523b..9f8bdd689 100644 --- a/src/dep/rmr/cluster.h +++ b/src/dep/rmr/cluster.h @@ -70,6 +70,7 @@ typedef struct { MRClusterTopology *topo; /* the current node, detected when updating the topology */ MRClusterNode *myNode; + MRClusterShard *myshard; /* The sharding functino, responsible for transforming keys into slots */ ShardFunc sf; diff --git a/src/dep/rmr/rmr.c b/src/dep/rmr/rmr.c index 0c48978dd..40c9cfb3b 100644 --- a/src/dep/rmr/rmr.c +++ b/src/dep/rmr/rmr.c @@ -372,9 +372,12 @@ size_t MR_NumHosts() { return cluster_g ? MRCluster_NumHosts(cluster_g) : 0; } + +void SetMyPartition(MRClusterTopology *ct, MRClusterShard* myShard); /* on-loop update topology request. This can't be done from the main thread */ static void uvUpdateTopologyRequest(struct MRRequestCtx *mc) { MRCLuster_UpdateTopology(cluster_g, (MRClusterTopology *)mc->ctx); + SetMyPartition((MRClusterTopology *)mc->ctx, cluster_g->myshard); RQ_Done(rq_g); // fprintf(stderr, "topo update: conc requests: %d\n", concurrentRequests_g); free(mc); diff --git a/src/dist_aggregate.c b/src/dist_aggregate.c index f5a46ab5a..d6e001b03 100644 --- a/src/dist_aggregate.c +++ b/src/dist_aggregate.c @@ -289,13 +289,14 @@ int NetworkFetcher_Start(struct netCtx *nc) { int getAggregateFields(FieldList *l, RedisModuleCtx *ctx, CmdArg *cmd); ResultProcessor *AggregatePlan_BuildProcessorChain(AggregatePlan *plan, RedisSearchCtx *sctx, - ResultProcessor *root, char **err); + ResultProcessor *root, QueryError *status); -static ResultProcessor *buildDistributedProcessorChain(QueryPlan *plan, void *ctx, char **err) { +static ResultProcessor *buildDistributedProcessorChain(QueryPlan *plan, void *ctx, QueryError *status) { AggregatePlan *ap = ctx; AggregatePlan *remote = AggregatePlan_MakeDistributed(ap); if (!remote) { - SET_ERR(err, "Could not process plan for distribution"); + // todo: I think this can not happened anyway. + QueryError_SetError(status, QUERY_EGENERIC, "Could not process plan for distribution"); return NULL; } @@ -317,7 +318,7 @@ static ResultProcessor *buildDistributedProcessorChain(QueryPlan *plan, void *ct ResultProcessor *root = NewNetworkFetcher(plan->ctx, xcmd, GetSearchCluster()); root->ctx.qxc = &plan->execCtx; - ResultProcessor *ret = AggregatePlan_BuildProcessorChain(ap, plan->ctx, root, err); + ResultProcessor *ret = AggregatePlan_BuildProcessorChain(ap, plan->ctx, root, status); // err is null if there was a problem building the chain. // If it's not NULL we need to start the network fetcher now if (ret) { @@ -333,6 +334,13 @@ void AggregateCommand_ExecDistAggregate(RedisModuleCtx *ctx, RedisModuleString * .flags = AGGREGATE_REQUEST_NO_CONCURRENT | AGGREGATE_REQUEST_NO_PARSE_QUERY | AGGREGATE_REQUEST_SPECLESS}; - settings.cursorLookupName = RedisModule_StringPtrLen(argv[1], NULL); + size_t originalLookupNameLen; + const char* originalLookupName = RedisModule_StringPtrLen(argv[1], &originalLookupNameLen); + SearchCluster *sc = GetSearchCluster(); + const char *partTag = PartitionTag(&sc->part, sc->myPartition); + size_t taggedLookupNameLen; + char* taggedLookupName = writeTaggedId(originalLookupName, originalLookupNameLen, partTag, strlen(partTag), &taggedLookupNameLen); + settings.cursorLookupName = taggedLookupName; AggregateCommand_ExecAggregateEx(ctx, argv, argc, ccx, &settings); + free(taggedLookupName); } diff --git a/src/partition.c b/src/partition.c index c383c2e8c..356e67ad7 100644 --- a/src/partition.c +++ b/src/partition.c @@ -6,15 +6,18 @@ size_t PartitionForKey(PartitionCtx *ctx, const char *key, size_t len) { return fnv_32a_buf((void *)key, len, 0) % ctx->size; } +int GetSlotByPartition(PartitionCtx *ctx, size_t partition){ + size_t step = ctx->tableSize / ctx->size; + return ((partition + 1) * step - 1) % ctx->tableSize; +} + const char *PartitionTag(PartitionCtx *ctx, size_t partition) { if (partition > ctx->size) { return NULL; } - size_t step = ctx->tableSize / ctx->size; - // printf("parition %d, index %d\n", partition, partition * (sp->tableSize / sp->size)); - return ctx->table[((partition + 1) * step - 1) % ctx->tableSize]; + return ctx->table[GetSlotByPartition(ctx, partition)]; } void PartitionCtx_Init(PartitionCtx *ctx, size_t numPartitions, const char **table, diff --git a/src/partition.h b/src/partition.h index 56b1ffcce..e9bbb7014 100644 --- a/src/partition.h +++ b/src/partition.h @@ -17,6 +17,8 @@ typedef struct { size_t PartitionForKey(PartitionCtx *ctx, const char *key, size_t len); +int GetSlotByPartition(PartitionCtx *ctx, size_t partition); + const char *PartitionTag(PartitionCtx *ctx, size_t partition); void PartitionCtx_Init(PartitionCtx *ctx, size_t numPartitions, const char **table, diff --git a/src/search_cluster.c b/src/search_cluster.c index d2229ba71..7e0130b57 100644 --- a/src/search_cluster.c +++ b/src/search_cluster.c @@ -24,7 +24,7 @@ inline int SearchCluster_Ready(SearchCluster *sc) { return sc != NULL && sc->size != 0 && sc->part.table != NULL; } -static char *writeTaggedId(const char *key, size_t keyLen, const char *tag, size_t tagLen, +char *writeTaggedId(const char *key, size_t keyLen, const char *tag, size_t tagLen, size_t *taggedLen) { size_t total = keyLen + tagLen + 3; // +3 because of '{', '}', and NUL char *tagged = malloc(total); @@ -256,3 +256,14 @@ void SearchCluster_EnsureSize(RedisModuleCtx *ctx, SearchCluster *c, MRClusterTo PartitionCtx_SetSize(&c->part, topo->numShards); } } + +void SetMyPartition(MRClusterTopology *ct, MRClusterShard* myShard){ + SearchCluster *c = GetSearchCluster(); + for (size_t i = 0 ; i < c->size ; ++i){ + int slot = GetSlotByPartition(&c->part, i); + if (myShard->startSlot <= slot && myShard->endSlot >= slot){ + c->myPartition = i; + return; + } + } +} diff --git a/src/search_cluster.h b/src/search_cluster.h index f79c5718e..94a2cc303 100644 --- a/src/search_cluster.h +++ b/src/search_cluster.h @@ -10,6 +10,7 @@ typedef struct { size_t size; PartitionCtx part; + size_t myPartition; } SearchCluster; @@ -48,4 +49,9 @@ int SearchCluster_RewriteCommandArg(SearchCluster *c, MRCommand *cmd, int partit /* Make sure that if the cluster is unaware of its sizing, it will take the size from the topology */ void SearchCluster_EnsureSize(RedisModuleCtx *ctx, SearchCluster *c, MRClusterTopology *topo); + +void SetMyPartition(MRClusterTopology *ct, MRClusterShard* myShard); + +char *writeTaggedId(const char *key, size_t keyLen, const char *tag, size_t tagLen, + size_t *taggedLen); #endif \ No newline at end of file