forked from Kimahriman/mpihdfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MPISync.c
157 lines (115 loc) · 4.48 KB
/
MPISync.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/*
* Synchronous I/O functions
*/
#include "MPIHook.h"
/* Section 9.4.2 */
/*
 * MPI_File_read_at interposer.
 *
 * fh is reinterpreted as an hdfsFile_wrapper. If it does not carry the
 * HDFSFILEMAGIC marker, the call is forwarded to the next MPI_File_read_at
 * resolved through dlsym(RTLD_NEXT, ...). Otherwise the data is read from
 * HDFS with hdfsPread() starting at `offset`.
 *
 * The datatype size is currently not consulted (size is fixed to 1), so
 * `count` is effectively a byte count on the HDFS path.
 *
 * Returns MPI_SUCCESS on success (including short reads), or -1 when the
 * real symbol cannot be resolved or hdfsPread() returns -1. On the HDFS
 * path this function does not write through the `status` pointer.
 */
int MPI_File_read_at(MPI_File fh, MPI_Offset offset, void *buf,
                     int count, MPI_Datatype datatype, MPI_Status *status)
{
    hdfsFile_wrapper *fh_w;
    int ret, size;

    fh_w = (hdfsFile_wrapper*)fh;

    if (fh_w->magic != HDFSFILEMAGIC)
    {
        int (*real_MPI_File_read_at)(MPI_File, MPI_Offset, void*, int, MPI_Datatype, MPI_Status*) = NULL;

        real_MPI_File_read_at = dlsym(RTLD_NEXT, "MPI_File_read_at");
        /* The braces matter: without them the return executed on every
           call and the pass-through below was unreachable. */
        if (!real_MPI_File_read_at)
        {
            fprintf(stderr, "Failed to load actual MPI_File_read_at location.\n");
            return -1;
        }

        status("Passing File_read_at to actual MPI function.\n");
        return real_MPI_File_read_at(fh, offset, buf, count, datatype, status);
    }

    status("HDFS file found in File_read_at.\n");

    /* TODO: use MPI_Type_size(datatype, &size) once datatypes are supported;
       until then every element is treated as a single byte. */
    size = 1;
    status("Got type size: %d.\n", size);
    /* MPI_Offset is wider than int on common MPI builds; print it as
       long long so the format matches the argument. */
    status("Offset: %lld\n", (long long)offset);

    ret = hdfsPread(fh_w->fs, fh_w->file, offset, buf, size * count);
    if (ret == -1)
        return -1;

    return MPI_SUCCESS;
}
/* Stub for MPI_File_read_at_all. The whole body is the NOT_IMPLEMENTED
   macro, which is defined outside this file (presumably in MPIHook.h);
   what it returns or does is not visible here. */
int MPI_File_read_at_all(MPI_File fh, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Status *status) { NOT_IMPLEMENTED; }
/* Stub for MPI_File_write_at. The whole body is the NOT_IMPLEMENTED
   macro, which is defined outside this file (presumably in MPIHook.h);
   what it returns or does is not visible here. */
int MPI_File_write_at(MPI_File fh, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Status *status) { NOT_IMPLEMENTED; }
/* Stub for MPI_File_write_at_all. The whole body is the NOT_IMPLEMENTED
   macro, which is defined outside this file (presumably in MPIHook.h);
   what it returns or does is not visible here. */
int MPI_File_write_at_all(MPI_File fh, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Status *status) { NOT_IMPLEMENTED; }
/* Nonblocking calls currently use MPIO_Request, because generalized
requests are not yet implemented. For the same reason, MPIO_Test and
MPIO_Wait are used to test and wait on nonblocking I/O requests. */
/* Stub for MPI_File_iread_at (takes an MPIO_Request rather than an
   MPI_Request). The whole body is the NOT_IMPLEMENTED macro, which is
   defined outside this file; what it returns or does is not visible here. */
int MPI_File_iread_at(MPI_File fh, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPIO_Request *request) { NOT_IMPLEMENTED; }
/* Stub for MPI_File_iwrite_at (takes an MPIO_Request rather than an
   MPI_Request). The whole body is the NOT_IMPLEMENTED macro, which is
   defined outside this file; what it returns or does is not visible here. */
int MPI_File_iwrite_at(MPI_File fh, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPIO_Request *request) { NOT_IMPLEMENTED; }
/* Section 9.4.3 */
/*
 * MPI_File_read interposer.
 *
 * fh is reinterpreted as an hdfsFile_wrapper. If it does not carry the
 * HDFSFILEMAGIC marker, the call is forwarded to the next MPI_File_read
 * resolved through dlsym(RTLD_NEXT, ...). Otherwise the data is read from
 * HDFS with hdfsRead().
 *
 * The datatype size is currently not consulted (size is fixed to 1), so
 * `count` is effectively a byte count on the HDFS path.
 *
 * Returns MPI_SUCCESS on success (including short reads), or -1 when the
 * real symbol cannot be resolved or hdfsRead() returns -1. On the HDFS
 * path this function does not write through the `status` pointer.
 */
int MPI_File_read(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status)
{
    hdfsFile_wrapper *fh_w;
    int ret, size;

    fh_w = (hdfsFile_wrapper*)fh;

    if (fh_w->magic != HDFSFILEMAGIC)
    {
        int (*real_MPI_File_read)(MPI_File, void*, int, MPI_Datatype, MPI_Status*) = NULL;

        real_MPI_File_read = dlsym(RTLD_NEXT, "MPI_File_read");
        /* The braces matter: without them the return executed on every
           call and the pass-through below was unreachable. */
        if (!real_MPI_File_read)
        {
            fprintf(stderr, "Failed to load actual MPI_File_read location.\n");
            return -1;
        }

        status("Passing File_read to actual MPI function.\n");
        return real_MPI_File_read(fh, buf, count, datatype, status);
    }

    status("HDFS file found in File_read.\n");

    /* TODO: use MPI_Type_size(datatype, &size) once datatypes are supported;
       until then every element is treated as a single byte. */
    size = 1;
    status("Got type size: %d.\n", size);

    ret = hdfsRead(fh_w->fs, fh_w->file, buf, size * count);
    if (ret == -1)
        return -1;

    return MPI_SUCCESS;
}
/* Stub for MPI_File_read_all. The whole body is the NOT_IMPLEMENTED
   macro, which is defined outside this file (presumably in MPIHook.h);
   what it returns or does is not visible here. */
int MPI_File_read_all(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status) { NOT_IMPLEMENTED; }
/*
 * MPI_File_write interposer.
 *
 * fh is reinterpreted as an hdfsFile_wrapper. If it does not carry the
 * HDFSFILEMAGIC marker, the call is forwarded to the next MPI_File_write
 * resolved through dlsym(RTLD_NEXT, ...).
 *
 * For an HDFS file the write is performed as open / write / close on every
 * call: the wrapper must have no HDFS handle currently open (file == NULL)
 * and its amode must include MPI_MODE_WRONLY. The file is opened in append
 * mode when the path already exists, otherwise with plain O_WRONLY.
 *
 * The datatype size is currently not consulted (size is fixed to 1), so
 * `count` is effectively a byte count on the HDFS path.
 *
 * Returns MPI_SUCCESS on success, or -1 when the real symbol cannot be
 * resolved, the file is not writable, or the HDFS open, write or close
 * fails. On the HDFS path this function does not write through the
 * `status` pointer.
 */
int MPI_File_write(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status)
{
    hdfsFile_wrapper *fh_w;
    int ret, size, mode;
    int close_failed;

    fh_w = (hdfsFile_wrapper*)fh;

    if (fh_w->magic != HDFSFILEMAGIC)
    {
        int (*real_MPI_File_write)(MPI_File, void*, int, MPI_Datatype, MPI_Status*) = NULL;

        real_MPI_File_write = dlsym(RTLD_NEXT, "MPI_File_write");
        /* The braces matter: without them the return executed on every
           call and the pass-through below was unreachable. */
        if (!real_MPI_File_write)
        {
            fprintf(stderr, "Failed to load actual MPI_File_write location.\n");
            return -1;
        }

        status("Passing File_write to actual MPI function.\n");
        return real_MPI_File_write(fh, buf, count, datatype, status);
    }

    status("HDFS file found in File_write.\n");

    if (fh_w->file != NULL || !(fh_w->amode & MPI_MODE_WRONLY))
    {
        fprintf(stderr, "Write called on a file not open for writing.\n");
        return -1;
    }

    /* hdfsExists() returns 0 when the path exists, so a zero result selects
       append mode and anything else a fresh write-only open. */
    if (hdfsExists(fh_w->fs, fh_w->filename) == 0)
        mode = O_WRONLY | O_APPEND;
    else
        mode = O_WRONLY;

    fh_w->file = hdfsOpenFile(fh_w->fs, fh_w->filename, mode, 0, 0, 0);
    if (!fh_w->file)
    {
        fprintf(stderr, "Failed to open hdfs file for writing.\n");
        return -1;
    }

    /* TODO: use MPI_Type_size(datatype, &size) once datatypes are supported;
       until then every element is treated as a single byte. */
    size = 1;
    status("Got type size: %d.\n", size);

    ret = hdfsWrite(fh_w->fs, fh_w->file, buf, size * count);
    if (ret == -1)
        fprintf(stderr, "Failed to write to hdfs file.\n");

    /* Always attempt the close, even after a failed write, so the handle
       opened above is not leaked. */
    close_failed = hdfsCloseFile(fh_w->fs, fh_w->file);

    /* Drop the handle whether or not the close succeeded: it has been handed
       to hdfsCloseFile() and must not be reused. Leaving it set would also
       make every later write fail the file != NULL check above. */
    fh_w->file = NULL;

    if (close_failed)
    {
        fprintf(stderr, "Failed to close file after hdfs write.\n");
        return -1;
    }

    if (ret == -1)
        return -1;

    return MPI_SUCCESS;
}
/* Stub for MPI_File_write_all. The whole body is the NOT_IMPLEMENTED
   macro, which is defined outside this file (presumably in MPIHook.h);
   what it returns or does is not visible here. */
int MPI_File_write_all(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status) { NOT_IMPLEMENTED; }