Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 41 additions & 50 deletions capio-tests/multinode/integration/src/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,44 +58,35 @@ static char fmtout[] = "%s/outfile_%05d.dat";
"%s/infile_%05d.dat"; // 5 is the number of digits of maxnumfiles

// reading file data into memory
static inline char *readdata(FILE *fp, char *dataptr, size_t *datalen, size_t *datacapacity) {
char *buffer = (char *) malloc(maxphraselen);
if (!buffer) {
perror("malloc");
return NULL;
}
size_t len = maxphraselen - 1;
static inline std::vector<char> readdata(FILE *fp) {
std::vector<char> data;
data.reserve(REALLOC_BATCH);

if (dataptr == 0) {
dataptr = (char *) realloc(dataptr, REALLOC_BATCH);
if (dataptr == NULL) {
perror("realloc");
return NULL;
}
*datacapacity = REALLOC_BATCH;
*datalen = 0;
}
char *line = nullptr;
size_t len = 0;
ssize_t r;
while (errno = 0, (r = getline(&buffer, &len, fp)) > 0) {
if ((*datalen + r) > *datacapacity) {
*datacapacity += REALLOC_BATCH;
char *tmp = (char *) realloc(dataptr, *datacapacity);
if (tmp == NULL) {
perror("realloc");
free(dataptr);
return NULL;

while (errno = 0, (r = getline(&line, &len, fp)) > 0) {
if (data.capacity() < data.size() + static_cast<size_t>(r)) {
// grow in batches
size_t newCap = data.capacity();
while (newCap < data.size() + static_cast<size_t>(r)) {
newCap += REALLOC_BATCH;
}
dataptr = tmp;
data.reserve(newCap);
}
strncpy(&dataptr[*datalen], buffer, maxphraselen);
*datalen += r;

// append raw bytes
data.insert(data.end(), line, line + r);
}

if (errno != 0) {
perror("getline");
free(dataptr);
return NULL;
data.clear();
}
return dataptr;

free(line); // getline() allocates, must free
return data;
}

static inline double diffmsec(const struct timeval a, const struct timeval b) {
Expand All @@ -109,33 +100,32 @@ static inline double diffmsec(const struct timeval a, const struct timeval b) {
return ((double) (sec * 1000) + ((double) usec) / 1000.0);
}

[[maybe_unused]] static int writedata(char *dataptr, size_t datalen, float percent, char *destdir,
ssize_t dstart, ssize_t dfiles) {
[[maybe_unused]] static int writedata(const std::vector<char> &data, float percent,
const char *destdir, ssize_t dstart, ssize_t dfiles) {
int error = 0;
FILE **fp = (FILE **) calloc(sizeof(FILE *), dfiles);
if (!fp) {
perror("malloc");
return -1;
}
std::vector<FILE *> fps(dfiles, nullptr);

char filepath[2 * PATH_MAX]{0};

// opening (truncating) all files
for (int j = 0, i = 0 + dstart; i < (dfiles + dstart); ++i, ++j) {
// Open (truncate) all files
for (int j = 0, i = dstart; i < (dstart + dfiles); ++i, ++j) {
sprintf(filepath, fmtout, destdir, i);
fp[j] = fopen(filepath, "w");
if (!fp[j]) {
fps[j] = fopen(filepath, "w");
if (!fps[j]) {
perror("fopen");
fprintf(stderr, "cannot create (open) the file %s\n", filepath);
std::cerr << "cannot create (open) the file " << filepath << "\n";
error = -1;
}
}

if (!error) {
size_t nbytes = datalen * percent;
size_t cnt = 0;
size_t nbytes = static_cast<size_t>(data.size() * percent);
size_t cnt = 0;
const char *dataptr = data.data();

while (nbytes > 0) {
size_t chunk = (nbytes > REDUCE_CHUNK) ? REDUCE_CHUNK : nbytes;
if (fwrite(dataptr, 1, chunk, fp[cnt]) != chunk) {
if (fwrite(dataptr, 1, chunk, fps[cnt]) != chunk) {
perror("fwrite");
error = -1;
break;
Expand All @@ -144,13 +134,14 @@ static inline double diffmsec(const struct timeval a, const struct timeval b) {
nbytes -= chunk;
}
}
// closing all files
for (int i = 0; i < dfiles; ++i) {
if (fp[i]) {
fclose(fp[i]);

// Close all files
for (auto f : fps) {
if (f) {
fclose(f);
}
}
free(fp);

return error;
}

Expand Down
54 changes: 28 additions & 26 deletions capio-tests/multinode/integration/src/mapreduce.cpp
Original file line number Diff line number Diff line change
@@ -1,66 +1,68 @@
#include "common.hpp"
#include <future>

int mapReduceFunction(char *sourcedirname, ssize_t sstart, ssize_t sfiles, char *destdirname,
ssize_t dstart, ssize_t dfiles, float percent,
std::promise<int> &&return_value) {
struct timeval before, after;
struct stat statbuf;
char *dataptr = NULL;
size_t datalen = 0;
size_t datacapacity = 0;
gettimeofday(&before, NULL);

int mapReduceFunction(const char *sourcedirname, ssize_t sstart, ssize_t sfiles,
const char *destdirname, ssize_t dstart, ssize_t dfiles, float percent,
int *return_value) {
struct timeval before{}, after{};
struct stat statbuf{};

gettimeofday(&before, nullptr);

EXPECT_NE(stat(sourcedirname, &statbuf), -1);
EXPECT_TRUE(S_ISDIR(statbuf.st_mode));
EXPECT_GE(sstart, 0);
EXPECT_GT(sfiles, 0);

EXPECT_NE(stat(destdirname, &statbuf), -1);
EXPECT_TRUE(S_ISDIR(statbuf.st_mode));
EXPECT_GE(dstart, 0);
EXPECT_GT(dfiles, 0);

EXPECT_GT(percent, 0);
EXPECT_LE(percent, 1);

std::vector<char> merged;

char filepath[2 * PATH_MAX]{0};
// concatenating all files in memory (dataptr)
for (int i = 0 + sstart; i < (sfiles + sstart); ++i) {
// Concatenate all files into `merged`
for (int i = sstart; i < (sstart + sfiles); ++i) {
sprintf(filepath, fmtin, sourcedirname, i);

FILE *fp = fopen(filepath, "r");
EXPECT_NE(fileno(fp), -1);
EXPECT_TRUE(fp != nullptr);

auto chunk = readdata(fp);
EXPECT_FALSE(chunk.empty());

char *ptr = readdata(fp, dataptr, &datalen, &datacapacity);
EXPECT_NE(ptr, nullptr);
merged.insert(merged.end(), chunk.begin(), chunk.end());

dataptr = ptr;
fclose(fp);
}

int r = writedata(dataptr, datalen, percent, destdirname, dstart, dfiles);
free(dataptr);
// Call writedata with the concatenated buffer
int r = writedata(merged, percent, destdirname, dstart, dfiles);

return_value.set_value(r);
*return_value = r;

gettimeofday(&after, NULL);
gettimeofday(&after, nullptr);
double elapsed_time = diffmsec(after, before);
fprintf(stdout, "MAPREDUCE: elapsed time (ms) : %g\n", elapsed_time);

return 0;
}

// Runs two map/reduce workers concurrently on the directory named by the
// CAPIO_DIR environment variable: the first handles file indices 0..4, the
// second indices 1..5, each writing 30% of the data it read. Both workers
// must report 0 through their result slot.
TEST(integrationTests, RunMapReducerTest) {
    // Look the directory up once and stop here if it is missing, rather than
    // handing a null path to both worker threads.
    const char *capio_dir = std::getenv("CAPIO_DIR");
    ASSERT_TRUE(capio_dir != nullptr);

    // Pre-set to a failure value so a worker that never stores a result is
    // detected by the final expectations.
    int ret1 = -1, ret2 = -1;
    std::thread mapReducer1(mapReduceFunction, capio_dir, 0, 5, capio_dir, 0, 5, 0.3, &ret1);
    std::thread mapReducer2(mapReduceFunction, capio_dir, 1, 5, capio_dir, 1, 5, 0.3, &ret2);

    mapReducer1.join();
    mapReducer2.join();

    EXPECT_EQ(ret1, 0);
    EXPECT_EQ(ret2, 0);
}

int main(int argc, char **argv, char **envp) {
Expand Down
28 changes: 15 additions & 13 deletions capio-tests/multinode/integration/src/merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

#include <linux/limits.h>

int mergeFunction(ssize_t nfiles, char *sourcedir, char *destdir) {
int mergeFunction(ssize_t nfiles, const char *sourcedir, const char *destdir) {
timeval before{}, after{};
struct stat statbuf{};

struct timeval before, after;
struct stat statbuf;
char *dataptr = NULL;
size_t datalen = 0, datacapacity = 0;
gettimeofday(&before, NULL);
gettimeofday(&before, nullptr);

EXPECT_GT(nfiles, 0);

Expand All @@ -18,16 +16,20 @@ int mergeFunction(ssize_t nfiles, char *sourcedir, char *destdir) {
EXPECT_NE(stat(destdir, &statbuf), -1);
EXPECT_TRUE(S_ISDIR(statbuf.st_mode));

std::vector<char> merged; // final result buffer

char filepath[2 * PATH_MAX]{0};
for (int i = 0; i < nfiles; ++i) {
sprintf(filepath, fmtout, sourcedir, i);
FILE *fp = fopen(filepath, "r");
EXPECT_NE(fileno(fp), -1);
EXPECT_TRUE(fp != nullptr);

auto chunk = readdata(fp);
EXPECT_FALSE(chunk.empty());

char *ptr = readdata(fp, dataptr, &datalen, &datacapacity);
EXPECT_NE(ptr, nullptr);
// append this file's data to merged
merged.insert(merged.end(), chunk.begin(), chunk.end());

dataptr = ptr;
fclose(fp);
}

Expand All @@ -36,11 +38,11 @@ int mergeFunction(ssize_t nfiles, char *sourcedir, char *destdir) {
FILE *fp = fopen(resultpath, "w");
EXPECT_TRUE(fp);

EXPECT_EQ(fwrite(dataptr, 1, datalen, fp), datalen);
EXPECT_EQ(fwrite(merged.data(), 1, merged.size(), fp), merged.size());

free(dataptr);
fclose(fp);

gettimeofday(&after, NULL);
gettimeofday(&after, nullptr);
double elapsed_time = diffmsec(after, before);
fprintf(stdout, "MERGE: elapsed time (ms) : %g\n", elapsed_time);

Expand Down