-
Notifications
You must be signed in to change notification settings - Fork 40
/
r_merge.c
96 lines (82 loc) · 2.4 KB
/
r_merge.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include "r_split.h"
typedef struct block
{
int64_t id;
size_t blockSize;
FILE *inputFile;
} block_t;
//prints to stdout, could be modified to write to a file
void MergeInput(char *inputFileNames[], unsigned int numInputFiles)
{
//ring buffer, this implemntation assumes that each round over the file will have the next complete set of blocks
//(ie in 2 blocks, block 1 will have ids 0..2..4 and block 2 will have 1..3..5)
block_t *blockBuf = malloc(sizeof(block_t) * numInputFiles);
size_t bufLen = BUFLEN; //buffer length, would be resized as needed
char *buffer = malloc(bufLen + 1);
int64_t id;
size_t blockSize;
bool isLast;
for (int i = 0; i < numInputFiles; i++)
{
FILE *inputFile = fopen(inputFileNames[i], "r");
if (!inputFile)
{
perror(LOC);
exit(1);
}
blockBuf[i].inputFile = inputFile;
}
int bufferIdx = 0;
int64_t nextID = 0;
for(;;) {
//read header
readHeader(blockBuf[bufferIdx].inputFile, &id, &blockSize, &isLast);
if (feof(blockBuf[bufferIdx].inputFile)) {
PRINTDBG("r_merge: End of file");
break;
}
blockBuf[bufferIdx].id = id;
blockBuf[bufferIdx].blockSize = blockSize;
size_t tot_read = 0, readSize = 0;
// fprintf(stderr, "start reading block %ld\n", id);
while (tot_read < blockSize) {
readSize = MIN(bufLen, blockSize-tot_read);
readSize = handle_reading(buffer, readSize, blockBuf[bufferIdx].inputFile);
tot_read += readSize;
safeWrite(buffer, 1, readSize, stdout);
}
fflush(stdout); //only need to flush after block ended
assert(tot_read == blockSize);
if (isLast) {
bufferIdx = (bufferIdx + 1) % numInputFiles;
nextID += 1;
}
};
//assert reason while loop terminated is eof
assert(feof(blockBuf[bufferIdx].inputFile));
//clean up
for (int i = 0; i < numInputFiles; i++)
{
fclose(blockBuf[i].inputFile);
}
free(blockBuf);
free(buffer);
}
int main(int argc, char *argv[])
{
// args#2... -> input file names
if (argc < 2)
{
// TODO print usage string
fprintf(stderr, "missing input!\n");
exit(1);
}
char **inputFileNames = (char **)malloc((argc - 1) * sizeof(char *));
for (int i = 1; i < argc; i++)
{
inputFileNames[i - 1] = calloc(strlen(argv[i]) + 1, sizeof(char));
strcpy(inputFileNames[i - 1], argv[i]);
}
MergeInput(inputFileNames, argc - 1);
return 0;
}