Skip to content

Commit

Permalink
Merge pull request #36 from RedisLabsModules/fixed_cursor_leak
Browse files Browse the repository at this point in the history
fixed cursor leak on flush command
  • Loading branch information
mnunberg authored Oct 16, 2018
2 parents e5af54b + f256e77 commit bb4681f
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/dep/rmr/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void _MRClsuter_UpdateNodes(MRCluster *cl) {
/* See if this is us - if so we need to update the cluster's host and current id */
if (node->flags & MRNode_Self) {
cl->myNode = node;
cl->myshard = &cl->topo->shards[sh];
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/dep/rmr/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ typedef struct {
MRClusterTopology *topo;
/* the current node, detected when updating the topology */
MRClusterNode *myNode;
MRClusterShard *myshard;
/* The sharding functino, responsible for transforming keys into slots */
ShardFunc sf;

Expand Down
3 changes: 3 additions & 0 deletions src/dep/rmr/rmr.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,12 @@ size_t MR_NumHosts() {
return cluster_g ? MRCluster_NumHosts(cluster_g) : 0;
}


void SetMyPartition(MRClusterTopology *ct, MRClusterShard* myShard);
/* on-loop update topology request. This can't be done from the main thread */
static void uvUpdateTopologyRequest(struct MRRequestCtx *mc) {
MRCLuster_UpdateTopology(cluster_g, (MRClusterTopology *)mc->ctx);
SetMyPartition((MRClusterTopology *)mc->ctx, cluster_g->myshard);
RQ_Done(rq_g);
// fprintf(stderr, "topo update: conc requests: %d\n", concurrentRequests_g);
free(mc);
Expand Down
18 changes: 13 additions & 5 deletions src/dist_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,14 @@ int NetworkFetcher_Start(struct netCtx *nc) {
int getAggregateFields(FieldList *l, RedisModuleCtx *ctx, CmdArg *cmd);

ResultProcessor *AggregatePlan_BuildProcessorChain(AggregatePlan *plan, RedisSearchCtx *sctx,
ResultProcessor *root, char **err);
ResultProcessor *root, QueryError *status);

static ResultProcessor *buildDistributedProcessorChain(QueryPlan *plan, void *ctx, char **err) {
static ResultProcessor *buildDistributedProcessorChain(QueryPlan *plan, void *ctx, QueryError *status) {
AggregatePlan *ap = ctx;
AggregatePlan *remote = AggregatePlan_MakeDistributed(ap);
if (!remote) {
SET_ERR(err, "Could not process plan for distribution");
// todo: I think this can not happened anyway.
QueryError_SetError(status, QUERY_EGENERIC, "Could not process plan for distribution");
return NULL;
}

Expand All @@ -317,7 +318,7 @@ static ResultProcessor *buildDistributedProcessorChain(QueryPlan *plan, void *ct

ResultProcessor *root = NewNetworkFetcher(plan->ctx, xcmd, GetSearchCluster());
root->ctx.qxc = &plan->execCtx;
ResultProcessor *ret = AggregatePlan_BuildProcessorChain(ap, plan->ctx, root, err);
ResultProcessor *ret = AggregatePlan_BuildProcessorChain(ap, plan->ctx, root, status);
// err is null if there was a problem building the chain.
// If it's not NULL we need to start the network fetcher now
if (ret) {
Expand All @@ -333,6 +334,13 @@ void AggregateCommand_ExecDistAggregate(RedisModuleCtx *ctx, RedisModuleString *
.flags = AGGREGATE_REQUEST_NO_CONCURRENT |
AGGREGATE_REQUEST_NO_PARSE_QUERY |
AGGREGATE_REQUEST_SPECLESS};
settings.cursorLookupName = RedisModule_StringPtrLen(argv[1], NULL);
size_t originalLookupNameLen;
const char* originalLookupName = RedisModule_StringPtrLen(argv[1], &originalLookupNameLen);
SearchCluster *sc = GetSearchCluster();
const char *partTag = PartitionTag(&sc->part, sc->myPartition);
size_t taggedLookupNameLen;
char* taggedLookupName = writeTaggedId(originalLookupName, originalLookupNameLen, partTag, strlen(partTag), &taggedLookupNameLen);
settings.cursorLookupName = taggedLookupName;
AggregateCommand_ExecAggregateEx(ctx, argv, argc, ccx, &settings);
free(taggedLookupName);
}
9 changes: 6 additions & 3 deletions src/partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ size_t PartitionForKey(PartitionCtx *ctx, const char *key, size_t len) {
return fnv_32a_buf((void *)key, len, 0) % ctx->size;
}

int GetSlotByPartition(PartitionCtx *ctx, size_t partition){
size_t step = ctx->tableSize / ctx->size;
return ((partition + 1) * step - 1) % ctx->tableSize;
}

const char *PartitionTag(PartitionCtx *ctx, size_t partition) {

if (partition > ctx->size) {
return NULL;
}

size_t step = ctx->tableSize / ctx->size;
// printf("parition %d, index %d\n", partition, partition * (sp->tableSize / sp->size));
return ctx->table[((partition + 1) * step - 1) % ctx->tableSize];
return ctx->table[GetSlotByPartition(ctx, partition)];
}

void PartitionCtx_Init(PartitionCtx *ctx, size_t numPartitions, const char **table,
Expand Down
2 changes: 2 additions & 0 deletions src/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ typedef struct {

size_t PartitionForKey(PartitionCtx *ctx, const char *key, size_t len);

int GetSlotByPartition(PartitionCtx *ctx, size_t partition);

const char *PartitionTag(PartitionCtx *ctx, size_t partition);

void PartitionCtx_Init(PartitionCtx *ctx, size_t numPartitions, const char **table,
Expand Down
13 changes: 12 additions & 1 deletion src/search_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ inline int SearchCluster_Ready(SearchCluster *sc) {
return sc != NULL && sc->size != 0 && sc->part.table != NULL;
}

static char *writeTaggedId(const char *key, size_t keyLen, const char *tag, size_t tagLen,
char *writeTaggedId(const char *key, size_t keyLen, const char *tag, size_t tagLen,
size_t *taggedLen) {
size_t total = keyLen + tagLen + 3; // +3 because of '{', '}', and NUL
char *tagged = malloc(total);
Expand Down Expand Up @@ -256,3 +256,14 @@ void SearchCluster_EnsureSize(RedisModuleCtx *ctx, SearchCluster *c, MRClusterTo
PartitionCtx_SetSize(&c->part, topo->numShards);
}
}

void SetMyPartition(MRClusterTopology *ct, MRClusterShard* myShard){
SearchCluster *c = GetSearchCluster();
for (size_t i = 0 ; i < c->size ; ++i){
int slot = GetSlotByPartition(&c->part, i);
if (myShard->startSlot <= slot && myShard->endSlot >= slot){
c->myPartition = i;
return;
}
}
}
6 changes: 6 additions & 0 deletions src/search_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
typedef struct {
size_t size;
PartitionCtx part;
size_t myPartition;

} SearchCluster;

Expand Down Expand Up @@ -48,4 +49,9 @@ int SearchCluster_RewriteCommandArg(SearchCluster *c, MRCommand *cmd, int partit
/* Make sure that if the cluster is unaware of its sizing, it will take the size from the topology
*/
void SearchCluster_EnsureSize(RedisModuleCtx *ctx, SearchCluster *c, MRClusterTopology *topo);

void SetMyPartition(MRClusterTopology *ct, MRClusterShard* myShard);

char *writeTaggedId(const char *key, size_t keyLen, const char *tag, size_t tagLen,
size_t *taggedLen);
#endif

0 comments on commit bb4681f

Please sign in to comment.