Skip to content

Commit d668929

Browse files
authored
Merge pull request #134 from High-Performance-IO/memory-integration-speedup
Caches are used to improve the performance of small I/O operations. Therefore, when a read() operation requests more data than fits in a single read cache line, the cache is bypassed and the read is served directly from the shared memory segment.
2 parents 739ae8b + a095f7d commit d668929

4 files changed

Lines changed: 74 additions & 99 deletions

File tree

src/posix/utils/cache/read_request_cache_mem.hpp

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,20 @@ class ReadRequestCacheMEM {
2424
}
2525
}
2626

27-
protected:
27+
protected:
2828
[[nodiscard]] capio_off64_t read_request(const int fd, const capio_off64_t count,
29-
const long tid) {
30-
START_LOG(capio_syscall(SYS_gettid), "call(fd=%ld, count=%llu, tid=%ld)", fd, count, tid);
29+
const long tid, bool use_cache = true) {
30+
START_LOG(capio_syscall(SYS_gettid), "call(fd=%ld, count=%llu, tid=%ld, load_data=%s)", fd,
31+
count, tid, use_cache ? "true" : "false");
3132
char req[CAPIO_REQ_MAX_SIZE];
3233

3334
// send as the last parameter to the server the maximum amount of data that can be read into
3435
// a single line of cache
3536

3637
auto read_begin_offset = get_capio_fd_offset(fd);
3738

38-
sprintf(req, "%04d %ld %llu %llu %llu %s", CAPIO_REQUEST_READ_MEM, tid, read_begin_offset,
39-
count, _max_line_size, get_capio_fd_path(fd).c_str());
39+
sprintf(req, "%04d %ld %llu %llu %llu %d %s", CAPIO_REQUEST_READ_MEM, tid,
40+
read_begin_offset, count, _max_line_size, use_cache, get_capio_fd_path(fd).c_str());
4041
LOG("Sending read request %s", req);
4142
buf_requests->write(req, CAPIO_REQ_MAX_SIZE);
4243
capio_off64_t stc_queue_read = bufs_response->at(tid)->read();
@@ -49,26 +50,21 @@ class ReadRequestCacheMEM {
4950
LOG("File is commited. Actual offset is: %ld", stc_queue_read);
5051
}
5152

52-
// TODO: this code is not needed as the read size is allways at maximum the size of the
53-
// cache line
54-
/*auto read_size = stc_queue_read;
55-
while (read_size > 0) {
56-
const capio_off64_t tmp_read_size =
57-
read_size > _max_line_size ? _max_line_size : read_size;
58-
stc_queue->read(_cache, tmp_read_size);
53+
if (use_cache) {
54+
stc_queue->read(_cache, stc_queue_read);
5955
_cache_offset = 0;
60-
read_size -= tmp_read_size;
61-
}*/
62-
63-
stc_queue->read(_cache, stc_queue_read);
64-
_cache_offset = 0;
65-
66-
LOG("Completed fetch of data from server");
56+
LOG("Completed fetch of data from server");
57+
} else {
58+
_actual_size = 0;
59+
_cache_offset = 0;
60+
LOG("Data has not been loaded from server, as load_data==false."
61+
" Load will occur indipendently");
62+
}
6763

6864
return stc_queue_read;
6965
}
7066

71-
public:
67+
public:
7268
explicit ReadRequestCacheMEM(const long line_size = get_posix_read_cache_line_size())
7369
: _cache(nullptr), _tid(capio_syscall(SYS_gettid)), _fd(-1), _max_line_size(line_size),
7470
_actual_size(0), _cache_offset(0), _last_read_end(-1) {
@@ -85,7 +81,7 @@ class ReadRequestCacheMEM {
8581
if (_cache_offset != _actual_size) {
8682
_actual_size = _cache_offset = 0;
8783
}
88-
committed = false;
84+
committed = false;
8985
_real_file_size_commmitted = -1;
9086
}
9187

@@ -97,7 +93,7 @@ class ReadRequestCacheMEM {
9793
if (_fd != fd) {
9894
LOG("changed fd from %d to %d: flushing", _fd, fd);
9995
flush();
100-
_fd = fd;
96+
_fd = fd;
10197
_last_read_end = get_capio_fd_offset(fd);
10298
}
10399

@@ -116,11 +112,22 @@ class ReadRequestCacheMEM {
116112
return 0;
117113
}
118114

115+
/*
116+
* Check: if read size is greater than the capability of the cache line, bypass
117+
* the cache and perform a read directly to the provided buffer
118+
*/
119+
if (count > _max_line_size) {
120+
LOG("count > _max_line_size. Bypassing cache. Performing read() directly to buffer.");
121+
const auto _read_size = read_request(_fd, count, _tid, false);
122+
stc_queue->read(static_cast<char *>(buffer), _read_size);
123+
return _read_size;
124+
}
125+
119126
// Check if cache is empty or if all the content of the cache has been already consumed
120127
if (_actual_size == 0 || _actual_size == _cache_offset) {
121128
LOG("No data is present locally. performing request.");
122129
const auto size = count < _max_line_size ? count : _max_line_size;
123-
_actual_size = read_request(_fd, size, _tid);
130+
_actual_size = read_request(_fd, size, _tid);
124131

125132
// Update count for current request. If count exceeds _actual_size, resize it to not
126133
// exceeds the available size on posix application
@@ -132,7 +139,7 @@ class ReadRequestCacheMEM {
132139
LOG("The requested amount of data can be served without performing a request");
133140
_read(buffer, count);
134141
actual_read_size = count;
135-
_last_read_end = get_capio_fd_offset(_fd) + count;
142+
_last_read_end = get_capio_fd_offset(_fd) + count;
136143
set_capio_fd_offset(fd, _last_read_end);
137144

138145
} else {
@@ -153,7 +160,7 @@ class ReadRequestCacheMEM {
153160
LOG("actual_read_size incremented to: %ld", actual_read_size);
154161

155162
// Compute the remaining amount of data to send to client
156-
auto remaining_size = count - first_copy_size;
163+
auto remaining_size = count - first_copy_size;
157164
capio_off64_t copy_offset = first_copy_size;
158165

159166
while (copy_offset < count && !committed) {

src/server/client-manager/handlers/read.hpp

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ inline void read_handler(const char *const str) {
5757
inline void read_mem_handler(const char *const str) {
5858
pid_t tid;
5959
capio_off64_t read_size, client_cache_line_size, read_begin_offset;
60+
int use_cache;
6061
char path[PATH_MAX];
61-
sscanf(str, "%ld %llu %llu %llu %s", &tid, &read_begin_offset, &read_size,
62-
&client_cache_line_size, path);
62+
sscanf(str, "%ld %llu %llu %llu %d %s", &tid, &read_begin_offset, &read_size,
63+
&client_cache_line_size, &use_cache, path);
6364
START_LOG(gettid(),
6465
"call(tid=%d, read_begin_offset=%llu, read_size=%llu, client_cache_line_size=%llu, "
65-
"path=%s)",
66-
tid, read_begin_offset, read_size, client_cache_line_size, path);
66+
"use_cache=%s, path=%s)",
67+
tid, read_begin_offset, read_size, client_cache_line_size,
68+
use_cache ? "true" : "false", path);
6769

6870
if (storage_service->sizeOf(path) < read_begin_offset + read_size &&
6971
!file_manager->isCommitted(path)) {
@@ -73,25 +75,30 @@ inline void read_mem_handler(const char *const str) {
7375
return;
7476
}
7577

76-
LOG("Computing size of data to send: minimum between:");
77-
LOG("client_cache_line_size: %llu", client_cache_line_size);
78-
LOG("file_size:%llu - read_begin_offset=%llu = %llu", storage_service->sizeOf(path),
79-
read_begin_offset, storage_service->sizeOf(path) - read_begin_offset);
80-
auto size_to_send =
81-
std::min({client_cache_line_size, (storage_service->sizeOf(path) - read_begin_offset)});
82-
83-
LOG("Need to sent to client %llu bytes, asking storage service to send data", size_to_send);
84-
storage_service->reply_to_client(tid, path, read_begin_offset, size_to_send);
78+
capio_off64_t size_to_send = storage_service->sizeOf(path);
79+
if (use_cache) {
80+
LOG("Computing size of data to send: minimum between:");
81+
LOG("client_cache_line_size: %llu", client_cache_line_size);
82+
LOG("file_size:%llu - read_begin_offset=%llu = %llu", size_to_send,
83+
read_begin_offset, size_to_send - read_begin_offset);
84+
size_to_send =
85+
std::min({client_cache_line_size, (size_to_send - read_begin_offset)});
86+
}
8587

8688
LOG("Sending to posix app the offset up to which read.");
8789
if (file_manager->isCommitted(path) &&
8890
read_begin_offset + size_to_send >= storage_service->sizeOf(path)) {
8991
LOG("File is committed, and end of read >= than file size."
9092
" signaling it to posix application by setting offset MSB to 1");
91-
size_to_send = 0x8000000000000000 | size_to_send;
93+
LOG("Sending offset: %llu", 0x8000000000000000 |size_to_send);
94+
client_manager->reply_to_client(tid, 0x8000000000000000 | size_to_send);
95+
} else {
96+
LOG("File is not committed. Sending offset: %llu", size_to_send);
97+
client_manager->reply_to_client(tid, size_to_send);
9298
}
93-
LOG("Sending offset: %llu", size_to_send);
94-
client_manager->reply_to_client(tid, size_to_send);
99+
100+
LOG("Need to sent to client %llu bytes, asking storage service to send data", size_to_send);
101+
storage_service->reply_to_client(tid, path, read_begin_offset, size_to_send);
95102
}
96103

97104
#endif // READ_HPP

src/server/storage-service/CapioFile/CapioMemoryFile.hpp

Lines changed: 11 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class CapioMemoryFile : public CapioFile {
1616
std::map<std::size_t, std::vector<char>> memoryBlocks;
1717

1818
// Static file sizes of file pages
19-
static constexpr u_int32_t _pageSizeMB = 4;
19+
static constexpr u_int32_t _pageSizeMB = 4;
2020
static constexpr u_int64_t _pageSizeBytes = _pageSizeMB * 1024 * 1024;
2121

2222
char *cross_page_buffer_view;
@@ -59,7 +59,7 @@ class CapioMemoryFile : public CapioFile {
5959
return block;
6060
}
6161

62-
public:
62+
public:
6363
explicit CapioMemoryFile(const std::string &filePath) : CapioFile(filePath) {
6464
cross_page_buffer_view = new char[_pageSizeBytes];
6565
}
@@ -193,66 +193,27 @@ class CapioMemoryFile : public CapioFile {
193193
*/
194194
std::size_t writeToQueue(SPSCQueue &queue, std::size_t offset,
195195
std::size_t length) const override {
196+
START_LOG(gettid(), "call(offset=%llu, length=%llu)", offset, length);
196197
std::size_t bytesRead = 0;
197198

198-
const auto offsets = compute_offsets(offset, length);
199-
unsigned long map_offset = std::get<0>(offsets);
200-
unsigned long mem_block_offset_begin = std::get<1>(offsets);
201-
unsigned long buffer_view_size = std::get<2>(offsets);
202-
203-
if (buffer_view_size != length) {
204-
/*
205-
* In this case, we have requested a view that spansa over different memory blocks and
206-
* cannot be served in a contiguos manner. We allocate a temporary buffer that is used
207-
* to overlap the two memory block region and then write the temporary buffer.
208-
* This requires more memcpy but it is invoked only when the view overlaps and as such
209-
* is not frequent.
210-
*/
199+
while (bytesRead < length) {
200+
const auto [map_offset,mem_block_offset_begin , buffer_view_size] =
201+
compute_offsets(offset, length - bytesRead);
211202

212-
213-
for (auto it = memoryBlocks.lower_bound(map_offset);
214-
it != memoryBlocks.end() && bytesRead < length; ++it) {
203+
if (const auto it = memoryBlocks.lower_bound(map_offset); it != memoryBlocks.end()) {
215204
auto &[blockOffset, block] = *it;
216205

217206
if (blockOffset >= offset + length) {
218-
break; // Past the requested range
207+
return bytesRead; // Past the requested range
219208
}
220209

221-
// Copy the data to the temporary buffer
222-
memcpy(cross_page_buffer_view + bytesRead, block.data() + mem_block_offset_begin,
223-
buffer_view_size);
210+
// Copy the data to the buffer
211+
queue.write(block.data() + mem_block_offset_begin, buffer_view_size);
224212

225213
bytesRead += buffer_view_size;
226-
227-
const auto updated_offsets =
228-
compute_offsets(offset + bytesRead, length - bytesRead);
229-
map_offset = std::get<0>(updated_offsets);
230-
mem_block_offset_begin = std::get<1>(updated_offsets);
231-
buffer_view_size = std::get<2>(updated_offsets);
214+
offset += buffer_view_size;
232215
}
233-
234-
// send the temporary buffer to the application
235-
queue.write(cross_page_buffer_view, bytesRead);
236-
237-
return bytesRead;
238216
}
239-
240-
/*
241-
* Here we have requested a read that spans over a single memory block.
242-
*/
243-
if (const auto it = memoryBlocks.lower_bound(map_offset); it != memoryBlocks.end()) {
244-
auto &[blockOffset, block] = *it;
245-
246-
if (blockOffset >= offset + length) {
247-
return bytesRead; // Past the requested range
248-
}
249-
250-
// Copy the data to the buffer
251-
queue.write(block.data() + mem_block_offset_begin, buffer_view_size);
252-
253-
bytesRead += buffer_view_size;
254-
}
255-
256217
return bytesRead;
257218
}
258219
};

src/server/storage-service/capio_storage_service.hpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class CapioStorageService {
1010
// TODO: put all of this conde on a different thread
1111

1212
std::unordered_map<pid_t, SPSCQueue *> *_client_to_server_queue;
13-
std::unordered_map<pid_t, SPSCQueue *> *_server_to_clien_queue;
13+
std::unordered_map<pid_t, SPSCQueue *> *_server_to_client_queue;
1414
std::unordered_map<std::string, CapioFile *> *_stored_files;
1515

1616
std::unordered_map<std::string, std::vector<std::tuple<capio_off64_t, capio_off64_t, pid_t>>>
@@ -33,7 +33,7 @@ class CapioStorageService {
3333
START_LOG(gettid(), "call()");
3434
_stored_files = new std::unordered_map<std::string, CapioFile *>;
3535
_client_to_server_queue = new std::unordered_map<pid_t, SPSCQueue *>;
36-
_server_to_clien_queue = new std::unordered_map<pid_t, SPSCQueue *>;
36+
_server_to_client_queue = new std::unordered_map<pid_t, SPSCQueue *>;
3737
_threads_waiting_for_memory_data =
3838
new std::unordered_map<std::string,
3939
std::vector<std::tuple<capio_off64_t, capio_off64_t, pid_t>>>;
@@ -45,7 +45,7 @@ class CapioStorageService {
4545
// TODO: dump files to FS
4646
delete _stored_files;
4747
delete _client_to_server_queue;
48-
delete _server_to_clien_queue;
48+
delete _server_to_client_queue;
4949
delete _threads_waiting_for_memory_data;
5050
}
5151

@@ -116,7 +116,7 @@ class CapioStorageService {
116116
_client_to_server_queue->emplace(
117117
pid, new SPSCQueue("queue-" + std::to_string(pid) + +".cts", get_cache_lines(),
118118
get_cache_line_size(), workflow_name, false));
119-
_server_to_clien_queue->emplace(
119+
_server_to_client_queue->emplace(
120120
pid, new SPSCQueue("queue-" + std::to_string(pid) + +".stc", get_cache_lines(),
121121
get_cache_line_size(), workflow_name, false));
122122
LOG("Created communication queues");
@@ -134,7 +134,7 @@ class CapioStorageService {
134134
START_LOG(gettid(), "call(pid=%llu, file=%s, offset=%llu, size=%llu)", pid, file.c_str(),
135135
offset, size);
136136

137-
getFile(file)->writeToQueue(*_server_to_clien_queue->at(pid), offset, size);
137+
getFile(file)->writeToQueue(*_server_to_client_queue->at(pid), offset, size);
138138
}
139139

140140
/**
@@ -155,7 +155,7 @@ class CapioStorageService {
155155

156156
void remove_client(const pid_t pid) const {
157157
_client_to_server_queue->erase(pid);
158-
_server_to_clien_queue->erase(pid);
158+
_server_to_client_queue->erase(pid);
159159
}
160160

161161
/**
@@ -173,7 +173,7 @@ class CapioStorageService {
173173
auto c_dir = get_capio_dir().string();
174174
memcpy(f, c_dir.c_str(), c_dir.length());
175175
memcpy(f + c_dir.size(), "/*", 2);
176-
_server_to_clien_queue->at(pid)->write(f, PATH_MAX);
176+
_server_to_client_queue->at(pid)->write(f, PATH_MAX);
177177
LOG("Return value=%llu", 1);
178178
return 1;
179179
}
@@ -183,7 +183,7 @@ class CapioStorageService {
183183
LOG("Sending file %s", file.c_str());
184184
char f[PATH_MAX + 1]{0};
185185
memcpy(f, file.c_str(), file.size());
186-
_server_to_clien_queue->at(pid)->write(f, PATH_MAX);
186+
_server_to_client_queue->at(pid)->write(f, PATH_MAX);
187187
}
188188

189189
LOG("Return value=%llu", files_to_store_in_mem.size());

0 commit comments

Comments
 (0)