diff --git a/capio-tests/multinode/integration/src/common.hpp b/capio-tests/multinode/integration/src/common.hpp index dd6ea314d..5b3f4bc36 100644 --- a/capio-tests/multinode/integration/src/common.hpp +++ b/capio-tests/multinode/integration/src/common.hpp @@ -58,44 +58,35 @@ static char fmtout[] = "%s/outfile_%05d.dat"; "%s/infile_%05d.dat"; // 5 is the number of digits of maxnumfiles // reading file data into memory -static inline char *readdata(FILE *fp, char *dataptr, size_t *datalen, size_t *datacapacity) { - char *buffer = (char *) malloc(maxphraselen); - if (!buffer) { - perror("malloc"); - return NULL; - } - size_t len = maxphraselen - 1; +static inline std::vector readdata(FILE *fp) { + std::vector data; + data.reserve(REALLOC_BATCH); - if (dataptr == 0) { - dataptr = (char *) realloc(dataptr, REALLOC_BATCH); - if (dataptr == NULL) { - perror("realloc"); - return NULL; - } - *datacapacity = REALLOC_BATCH; - *datalen = 0; - } + char *line = nullptr; + size_t len = 0; ssize_t r; - while (errno = 0, (r = getline(&buffer, &len, fp)) > 0) { - if ((*datalen + r) > *datacapacity) { - *datacapacity += REALLOC_BATCH; - char *tmp = (char *) realloc(dataptr, *datacapacity); - if (tmp == NULL) { - perror("realloc"); - free(dataptr); - return NULL; + + while (errno = 0, (r = getline(&line, &len, fp)) > 0) { + if (data.capacity() < data.size() + static_cast(r)) { + // grow in batches + size_t newCap = data.capacity(); + while (newCap < data.size() + static_cast(r)) { + newCap += REALLOC_BATCH; } - dataptr = tmp; + data.reserve(newCap); } - strncpy(&dataptr[*datalen], buffer, maxphraselen); - *datalen += r; + + // append raw bytes + data.insert(data.end(), line, line + r); } + if (errno != 0) { perror("getline"); - free(dataptr); - return NULL; + data.clear(); } - return dataptr; + + free(line); // getline() allocates, must free + return data; } static inline double diffmsec(const struct timeval a, const struct timeval b) { @@ -109,33 +100,32 @@ static inline double diffmsec(const struct timeval a, const struct timeval b) { return ((double) (sec * 1000) + ((double) usec) / 1000.0); } -[[maybe_unused]] static int writedata(char *dataptr, size_t datalen, float percent, char *destdir, - ssize_t dstart, ssize_t dfiles) { +[[maybe_unused]] static int writedata(const std::vector &data, float percent, + const char *destdir, ssize_t dstart, ssize_t dfiles) { int error = 0; - FILE **fp = (FILE **) calloc(sizeof(FILE *), dfiles); - if (!fp) { - perror("malloc"); - return -1; - } + std::vector fps(dfiles, nullptr); + char filepath[2 * PATH_MAX]{0}; - // opening (truncating) all files - for (int j = 0, i = 0 + dstart; i < (dfiles + dstart); ++i, ++j) { + // Open (truncate) all files + for (int j = 0, i = dstart; i < (dstart + dfiles); ++i, ++j) { sprintf(filepath, fmtout, destdir, i); - fp[j] = fopen(filepath, "w"); - if (!fp[j]) { + fps[j] = fopen(filepath, "w"); + if (!fps[j]) { perror("fopen"); - fprintf(stderr, "cannot create (open) the file %s\n", filepath); + std::cerr << "cannot create (open) the file " << filepath << "\n"; error = -1; } } if (!error) { - size_t nbytes = datalen * percent; - size_t cnt = 0; + size_t nbytes = static_cast(data.size() * percent); + size_t cnt = 0; + const char *dataptr = data.data(); + while (nbytes > 0) { size_t chunk = (nbytes > REDUCE_CHUNK) ? REDUCE_CHUNK : nbytes; - if (fwrite(dataptr, 1, chunk, fp[cnt]) != chunk) { + if (fwrite(dataptr, 1, chunk, fps[cnt]) != chunk) { perror("fwrite"); error = -1; break; @@ -144,13 +134,14 @@ static inline double diffmsec(const struct timeval a, const struct timeval b) { nbytes -= chunk; } } - // closing all files - for (int i = 0; i < dfiles; ++i) { - if (fp[i]) { - fclose(fp[i]); + + // Close all files + for (auto f : fps) { + if (f) { + fclose(f); } } - free(fp); + return error; } diff --git a/capio-tests/multinode/integration/src/mapreduce.cpp b/capio-tests/multinode/integration/src/mapreduce.cpp index e2afd0356..2f7ad2063 100644 --- a/capio-tests/multinode/integration/src/mapreduce.cpp +++ b/capio-tests/multinode/integration/src/mapreduce.cpp @@ -1,48 +1,50 @@ #include "common.hpp" -#include - -int mapReduceFunction(char *sourcedirname, ssize_t sstart, ssize_t sfiles, char *destdirname, - ssize_t dstart, ssize_t dfiles, float percent, - std::promise &&return_value) { - struct timeval before, after; - struct stat statbuf; - char *dataptr = NULL; - size_t datalen = 0; - size_t datacapacity = 0; - gettimeofday(&before, NULL); + +int mapReduceFunction(const char *sourcedirname, ssize_t sstart, ssize_t sfiles, + const char *destdirname, ssize_t dstart, ssize_t dfiles, float percent, + int *return_value) { + struct timeval before{}, after{}; + struct stat statbuf{}; + + gettimeofday(&before, nullptr); EXPECT_NE(stat(sourcedirname, &statbuf), -1); EXPECT_TRUE(S_ISDIR(statbuf.st_mode)); EXPECT_GE(sstart, 0); EXPECT_GT(sfiles, 0); + EXPECT_NE(stat(destdirname, &statbuf), -1); EXPECT_TRUE(S_ISDIR(statbuf.st_mode)); EXPECT_GE(dstart, 0); EXPECT_GT(dfiles, 0); + EXPECT_GT(percent, 0); EXPECT_LE(percent, 1); + std::vector merged; + char filepath[2 * PATH_MAX]{0}; - // concatenating all files in memory (dataptr) - for (int i = 0 + sstart; i < (sfiles + sstart); ++i) { + // Concatenate all files into `merged` + for (int i = sstart; i < (sstart + sfiles); ++i) { sprintf(filepath, fmtin, sourcedirname, i); FILE *fp = fopen(filepath, "r"); - EXPECT_NE(fileno(fp), -1); + EXPECT_TRUE(fp != nullptr); + + auto chunk = readdata(fp); + EXPECT_FALSE(chunk.empty()); - char *ptr = readdata(fp, dataptr, &datalen, &datacapacity); - EXPECT_NE(ptr, nullptr); + merged.insert(merged.end(), chunk.begin(), chunk.end()); - dataptr = ptr; fclose(fp); } - int r = writedata(dataptr, datalen, percent, destdirname, dstart, dfiles); - free(dataptr); + // Call writedata with the concatenated buffer + int r = writedata(merged, percent, destdirname, dstart, dfiles); - return_value.set_value(r); + *return_value = r; - gettimeofday(&after, NULL); + gettimeofday(&after, nullptr); double elapsed_time = diffmsec(after, before); fprintf(stdout, "MAPREDUCE: elapsed time (ms) : %g\n", elapsed_time); @@ -50,17 +52,17 @@ int mapReduceFunction(char *sourcedirname, ssize_t sstart, ssize_t sfiles, char } TEST(integrationTests, RunMapReducerTest) { - std::promise ret1, ret2; + int ret1 = -1, ret2 = -1; std::thread mapReducer1(mapReduceFunction, std::getenv("CAPIO_DIR"), 0, 5, - std::getenv("CAPIO_DIR"), 0, 5, 0.3, std::move(ret1)); + std::getenv("CAPIO_DIR"), 0, 5, 0.3, &ret1); std::thread mapReducer2(mapReduceFunction, std::getenv("CAPIO_DIR"), 1, 5, - std::getenv("CAPIO_DIR"), 1, 5, 0.3, std::move(ret2)); + std::getenv("CAPIO_DIR"), 1, 5, 0.3, &ret2); mapReducer1.join(); mapReducer2.join(); - EXPECT_EQ(ret1.get_future().get(), 0); - EXPECT_EQ(ret2.get_future().get(), 0); + EXPECT_EQ(ret1, 0); + EXPECT_EQ(ret2, 0); } int main(int argc, char **argv, char **envp) { diff --git a/capio-tests/multinode/integration/src/merge.cpp b/capio-tests/multinode/integration/src/merge.cpp index f76558418..3a7477988 100644 --- a/capio-tests/multinode/integration/src/merge.cpp +++ b/capio-tests/multinode/integration/src/merge.cpp @@ -2,13 +2,11 @@ #include -int mergeFunction(ssize_t nfiles, char *sourcedir, char *destdir) { +int mergeFunction(ssize_t nfiles, const char *sourcedir, const char *destdir) { + timeval before{}, after{}; + struct stat statbuf{}; - struct timeval before, after; - struct stat statbuf; - char *dataptr = NULL; - size_t datalen = 0, datacapacity = 0; - gettimeofday(&before, NULL); + gettimeofday(&before, nullptr); EXPECT_GT(nfiles, 0); @@ -18,16 +16,20 @@ int mergeFunction(ssize_t nfiles, char *sourcedir, char *destdir) { EXPECT_NE(stat(destdir, &statbuf), -1); EXPECT_TRUE(S_ISDIR(statbuf.st_mode)); + std::vector merged; // final result buffer + char filepath[2 * PATH_MAX]{0}; for (int i = 0; i < nfiles; ++i) { sprintf(filepath, fmtout, sourcedir, i); FILE *fp = fopen(filepath, "r"); - EXPECT_NE(fileno(fp), -1); + EXPECT_TRUE(fp != nullptr); + + auto chunk = readdata(fp); + EXPECT_FALSE(chunk.empty()); - char *ptr = readdata(fp, dataptr, &datalen, &datacapacity); - EXPECT_NE(ptr, nullptr); + // append this file's data to merged + merged.insert(merged.end(), chunk.begin(), chunk.end()); - dataptr = ptr; fclose(fp); } @@ -36,11 +38,11 @@ int mergeFunction(ssize_t nfiles, char *sourcedir, char *destdir) { FILE *fp = fopen(resultpath, "w"); EXPECT_TRUE(fp); - EXPECT_EQ(fwrite(dataptr, 1, datalen, fp), datalen); + EXPECT_EQ(fwrite(merged.data(), 1, merged.size(), fp), merged.size()); - free(dataptr); + fclose(fp); - gettimeofday(&after, NULL); + gettimeofday(&after, nullptr); double elapsed_time = diffmsec(after, before); fprintf(stdout, "MERGE: elapsed time (ms) : %g\n", elapsed_time);