Skip to content

Commit

Permalink
Memory leaks fixed. dco interface updated.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michel Schanen committed Mar 12, 2014
1 parent 250e9ae commit ef0eb01
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 47 deletions.
11 changes: 8 additions & 3 deletions interfaces/dco/ampi_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ extern "C" {
//forward declare von AMPI

#ifndef DCO_AMPI
//void ampi_interpret_tape(long int idx) {}
void ampi_interpret_tape(long int idx) {}
#endif
//long int ampi_counter=0;


void ampi_get_val(void *buf, int *i, double *x) {
Expand Down Expand Up @@ -54,7 +55,7 @@ extern "C" {
int idx;
AMPI_data(const int nidx) : idx(nidx) {}
virtual ~AMPI_data() {
std::cout << "ampi_reset_entry with idx=" << idx << std::endl;
//std::cout << "ampi_reset_entry with idx=" << idx << std::endl;
ampi_reset_entry(idx);
}
};
Expand All @@ -67,12 +68,16 @@ extern "C" {

void ampi_create_tape_entry(long int *i) {
if(!global_tape->is_active()) {
std::cout << "tape is passive, not AMPI Callback will be created!" << std::endl;
// std::cout << "tape is passive, not AMPI Callback will be created!" << std::endl;
return;
}
//todo: insert an external function handler!!!
global_tape->register_external_function(&ampi_tape_wrapper, new AMPI_data(*i));
// ampi_counter++;
// std::cout << "ampi_counter: " << ampi_counter << endl;

//this will call ampi_interpret_tape
//std::cout << "i: " << *i << endl;
}

void ampi_create_dummies(void *buf, int *size) {
Expand Down
7 changes: 4 additions & 3 deletions src/ampi.c
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ int AMPI_Allreduce_b(double *sendbuf, double *recvbuf, int count, MPI_Datatype d
double **recvbuf_tmp = 0;
double *minmaxbuf_tmp = 0;
MPI_Request *requests = 0;
MPI_Status *status = 0;
double *s = 0;
double *s_d = 0;
double *r = 0;
Expand All @@ -473,20 +472,22 @@ int AMPI_Allreduce_b(double *sendbuf, double *recvbuf, int count, MPI_Datatype d
if(op == MPI_PROD || op == MPI_SUM) {
if(myid==0) {
requests = (MPI_Request*) malloc(sizeof(MPI_Request)*(numprocs-1));
status = (MPI_Status*) malloc(sizeof(MPI_Status)*(numprocs-1));
recvbuf_tmp = (double**) malloc(sizeof(double*)*numprocs);
recvbuf_tmp[0] = recvbuf;
for(i=1;i<numprocs;i++) {
recvbuf_tmp[i] = (double*) malloc(sizeof(double)*count);
MPI_Irecv(recvbuf_tmp[i], count, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, &requests[i-1]);
}
MPI_Waitall(numprocs-1,requests,status);
MPI_Waitall(numprocs-1,requests,MPI_STATUSES_IGNORE);
free(requests);
for(j=0;j<count;j++) {
for(i=1;i<numprocs;i++) {
recvbuf[j]=recvbuf[j]+recvbuf_tmp[i][j];
}
}
for(j=1;j<numprocs;j++) {
free(recvbuf_tmp[j]);
}
free(recvbuf_tmp);
}
else {
Expand Down
82 changes: 41 additions & 41 deletions src/ampi_tape.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ int AMPI_Isend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, M
}
int i=0;
/*new AMPI request*/
AMPI_Request *request=(AMPI_Request*) malloc(sizeof(AMPI_Request));
AMPI_Request request;
/*new hash element*/
AMPI_ht_el *ht_el=(AMPI_ht_el*) malloc(sizeof(AMPI_ht_el));
ht_el->key=mpi_request;
request->buf = buf;
request->aw=0;
request->request=mpi_request;
request.buf = buf;
request.aw=0;
request.request=mpi_request;
double * tmp = (double*) malloc(sizeof(double)*count);
ampi_check_tape_size(count+1);
ampi_tape[ampi_vac+count].arg=(int*) malloc(sizeof(int)*2);
Expand Down Expand Up @@ -184,13 +184,12 @@ int AMPI_Isend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, M
/*tape[tape_entry::vac+count].request = new AMPI_dco_Request;*/
/*}*/
/*point current request index to this tape entry*/
request->va = ampi_vac+count;
request->v = tmp;
int temp = AMPI_Isend_f(tmp, count, datatype, dest, tag, comm, request);
request.va = ampi_vac+count;
request.v = tmp;
int temp = AMPI_Isend_f(tmp, count, datatype, dest, tag, comm, &request);
ampi_vac+=count+1;
ht_el->request=*request;
ht_el->request=request;
HASH_ADD_PTR(AMPI_ht,key,ht_el);
free(request);
return temp;
}

Expand All @@ -202,13 +201,13 @@ int AMPI_Irecv(void *buf, int count, MPI_Datatype datatype, int dest, int tag, M
return MPI_Irecv(buf, count, datatype, dest, tag, comm, mpi_request);
}
int i=0;
AMPI_Request *request=(AMPI_Request*) malloc(sizeof(AMPI_Request));
AMPI_Request request;
/*INT64 tmp_int64 = 0;*/
AMPI_ht_el *ht_el=(AMPI_ht_el*) malloc(sizeof(AMPI_ht_el));
ht_el->key=mpi_request;
request->buf = buf;
request->aw=0;
request->request=mpi_request;
request.buf = buf;
request.aw=0;
request.request=mpi_request;
double * tmp = (double*) malloc(sizeof(double)*count);
ampi_check_tape_size(count+1);
ampi_tape[ampi_vac].arg=(int*) malloc(sizeof(int)*2);
Expand All @@ -217,7 +216,7 @@ int AMPI_Irecv(void *buf, int count, MPI_Datatype datatype, int dest, int tag, M
ampi_create_dummies(buf, &count);
ampi_create_tape_entry(&new_vac);

int temp = AMPI_Irecv_f(tmp, count, datatype, dest, tag, comm, request);
int temp = AMPI_Irecv_f(tmp, count, datatype, dest, tag, comm, &request);

ampi_tape[ampi_vac].arg[0] = count;
ampi_tape[ampi_vac].arg[1] = dest;
Expand Down Expand Up @@ -246,11 +245,10 @@ int AMPI_Irecv(void *buf, int count, MPI_Datatype datatype, int dest, int tag, M
/*}*/
/*request->r.v = tmp;*/
/*point current request index to this tape entry*/
request->va = ampi_vac;
request.va = ampi_vac;
ampi_vac+=count+1;
ht_el->request=*request;
ht_el->request=request;
HASH_ADD_PTR(AMPI_ht,key,ht_el);
free(request);
return temp;
}

Expand All @@ -261,42 +259,43 @@ int AMPI_Wait(MPI_Request *mpi_request, MPI_Status *status) {
int i=0;
int ret=0;
AMPI_ht_el *ht_req=NULL;
AMPI_Request *request=(AMPI_Request*) malloc(sizeof(AMPI_Request));
void *addr=mpi_request;
HASH_FIND_PTR(AMPI_ht,&addr,ht_req);
if(ht_req) {
*request=ht_req->request;
double * tmp = (double*) request->v;
AMPI_Request request;
/*AMPI_Request *request=(AMPI_Request*) malloc(sizeof(AMPI_Request));*/
request=ht_req->request;
double * tmp = (double*) request.v;
ampi_tape[ampi_vac].arg=(int*) malloc(sizeof(int));
ampi_check_tape_size(1);
long int new_vac = ampi_vac;

ampi_create_tape_entry(&new_vac);
/*get the corresponding isend or irecv tape entry*/

ampi_tape[ampi_vac].arg[0] = request->va;
ampi_tape[ampi_vac].arg[0] = request.va;
ampi_tape[ampi_vac].oc = WAIT;
ret=AMPI_Wait_f(request, status);
ret=AMPI_Wait_f(&request, status);
/*finally copy the request to the tape*/
if(status==MPI_STATUS_IGNORE)
request->tag=ampi_tape[request->va].tag;
request.tag=ampi_tape[request.va].tag;
else
request->tag=status->MPI_TAG;
*ampi_tape[request->va].request = *request;
ampi_tape[ampi_vac].request = ampi_tape[request->va].request;
if(request->oc == AMPI_IR) {
for(i=0 ; i<ampi_tape[request->va].arg[0] ; i=i+1) {
ampi_set_val(request->buf, &i, &tmp[i]);
request.tag=status->MPI_TAG;
*ampi_tape[request.va].request = request;
ampi_tape[ampi_vac].request = ampi_tape[request.va].request;
if(request.oc == AMPI_IR) {
for(i=0 ; i<ampi_tape[request.va].arg[0] ; i=i+1) {
ampi_set_val(request.buf, &i, &tmp[i]);
}

}
/*ampi_tape[ampi_vac].request->a = &ampi_tape[request->va].d;*/
ampi_tape[ampi_vac].request->size = ampi_tape[request->va].arg[0];
ampi_tape[ampi_vac].request->size = ampi_tape[request.va].arg[0];
ampi_vac++;
free(tmp);
HASH_DEL(AMPI_ht,ht_req);
free(ht_req);
return ret;
/*return MPI_Wait(mpi_request,status);*/
}
else {
return MPI_Wait(mpi_request,status);
Expand Down Expand Up @@ -760,14 +759,14 @@ int AMPI_Recv_init(void *buf, int count, MPI_Datatype datatype, int source, int
return MPI_Recv_init(buf, count, datatype, source, tag, comm, mpi_request);
}
ampi_check_tape_size(1);
AMPI_Request *request=(AMPI_Request*) malloc(sizeof(AMPI_Request));
AMPI_Request request;
AMPI_ht_el *ht_el=(AMPI_ht_el*) malloc(sizeof(AMPI_ht_el));
ampi_tape[ampi_vac].arg=(int*) malloc(sizeof(int)*2);
ht_el->key=mpi_request;
request->buf = buf;
request->aw=0;
request->va=ampi_vac;
ht_el->request=*request;
request.buf = buf;
request.aw=0;
request.va=ampi_vac;
ht_el->request=request;
ampi_create_tape_entry(&ampi_vac);
ampi_tape[ampi_vac].oc = RECV_INIT;
ampi_tape[ampi_vac].arg[0] = count;
Expand All @@ -786,14 +785,14 @@ int AMPI_Send_init(void *buf, int count, MPI_Datatype datatype, int dest, int ta
return MPI_Send_init(buf, count, datatype, dest, tag, comm, mpi_request);
}
ampi_check_tape_size(1);
AMPI_Request *request=(AMPI_Request*) malloc(sizeof(AMPI_Request));
AMPI_Request request;
AMPI_ht_el *ht_el=(AMPI_ht_el*) malloc(sizeof(AMPI_ht_el));
ampi_tape[ampi_vac].arg=(int*) malloc(sizeof(int)*2);
ht_el->key=mpi_request;
request->buf = buf;
request->aw=0;
request->va=ampi_vac;
ht_el->request=*request;
request.buf = buf;
request.aw=0;
request.va=ampi_vac;
ht_el->request=request;
ampi_create_tape_entry(&ampi_vac);
ampi_tape[ampi_vac].oc = SEND_INIT;
ampi_tape[ampi_vac].arg[0] = count;
Expand Down Expand Up @@ -1214,6 +1213,7 @@ void ampi_interpret_tape(long int idx){
}
}
void ampi_reset_entry(long int idx){
free(ampi_tape[idx].arg);
ampi_vac=idx;
}
void ampi_print_tape() {
Expand Down

0 comments on commit ef0eb01

Please sign in to comment.