-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.cpp
More file actions
178 lines (161 loc) · 5.86 KB
/
server.cpp
File metadata and controls
178 lines (161 loc) · 5.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/*
@author: antriksh
Version 0: 3/14/2018
Version 0.1: 4/11/2018
* Documentation updated
* More structure to directories
* #includes optimized
* Ready for Version 0.1
Version 1: Project 3 complete
*/
#include <cerrno>
#include <cstring>
#include <exception>
#include <mutex>

#include "header/Info/Meta/utils.h"
#include "header/Socket.h"
// Server:
// * Manages files and file paths
// * Takes client requests to read and write
// * Does not do anything for maintaining mutual exclusion
class Server : protected Socket {
private:
string directory;
int mserverfd;
string mserverID;
vector<string> files;
public:
Server(int argc, char *argv[]) : Socket(argv) {
if (argc >= 4)
directory = argv[3];
else {
directory = id + "Directory";
Logger("Directory: " + directory);
const int dir_err =
mkdir(directory.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
}
}
// Infinite thread sending heartbeat messages to mserver
void heartBeat() {
while (1) {
ProcessInfo p = mserver[0];
int mserverfd = this->connectTo(p.hostname, p.port);
Logger("[HEARTBEAT]", false);
files = readDirectory(directory);
send(personalfd, mserverfd, "heartbeat", makeFileTuple(files),
p.processID);
close(mserverfd);
sleep(5);
}
}
// Infinite thread to accept connection and detach a thread as
// a receiver and checker of messages
void listener() {
while (1) {
// Accept a connection with the accept() system call
int newsockfd =
accept(personalfd, (struct sockaddr *)&cli_addr, &clilen);
if (newsockfd < 0) {
error("ERROR on accept");
}
std::thread connectedThread(&Server::processMessages, this,
newsockfd);
connectedThread.detach();
}
}
// Starts as a thread which receives a message and checks the message
// @newsockfd - fd socket stream from which message would be received
void processMessages(int newsockfd) {
try {
Message *message = this->receive(newsockfd);
this->checkMessage(message, newsockfd);
} catch (const char *e) {
Logger(e);
close(newsockfd);
}
}
// Checks the message for different types of incoming messages
// 1. hi - just a hello from some client/server (not in use)
// 2. something else (means a request to be granted)
// @m - Message just received
// @newsockfd - socket stream it was received from
void checkMessage(Message *m, int newsockfd) {
if (m->type == "create") {
createEmptyChunk(m);
} else if (m->type == "head") {
int size = getChunkSize(directory + "/" + m->fileName);
cout << size << endl;
writeReply(m, newsockfd, "head", to_string(size));
} else if (m->type == "recover") {
int size = getChunkSize(directory + "/" + m->fileName);
cout << size << endl;
int offset = stoi(m->message);
string line = readFile(directory + "/" + m->fileName, offset,
(size - offset));
writeReply(m, newsockfd, "recover", line);
} else if (m->type == "update") {
writeToFile(directory + "/" + m->fileName, m->message);
writeReply(m, newsockfd, "update", m->message);
} else if (m->type == "twophase") {
if (hasFile(files, m->fileName)) {
connectAndReply(m, "commit", "commit");
} else {
connectAndReply(m, "abort", "abort");
}
} else {
this->checkReadWrite(m, newsockfd);
}
throw "BREAKING CONNECTION";
}
// Checks the request type
// 1 - Read: reads the last line of the file and sends it to the
// process 2 - Write: Writes to the file and replies 3 - Create: replies
// with a list of files
// @m - Message just received
// @newsockfd - socket stream it was received from
void checkReadWrite(Message *m, int newsockfd) {
switch (m->readWrite) {
case 1: {
string line = readFile(directory + "/" + m->fileName, m->offset,
m->byteCount);
writeReply(m, newsockfd, "reply", line);
break;
}
case 2: {
string writeMessage = m->message;
if (m->offset > 0) {
writeMessage = m->message.substr(m->offset, m->byteCount);
} else if (m->byteCount > 0) {
writeMessage = m->message.substr(0, m->byteCount);
}
writeToFile(directory + "/" + m->fileName, writeMessage);
writeReply(m, newsockfd, "reply", m->message);
break;
}
default: {
string line = "UNRECOGNIZED MESSAGE !";
connectAndReply(m, "", line);
break;
}
}
}
// Creates an empty chunk in the server directory
void createEmptyChunk(Message *m) {
string name = m->fileName;
Logger("[CREATING NEW CHUNK]: " + name);
ofstream fs;
fs.open(directory + "/" + name, ios::out);
fs.close();
files.push_back(name);
}
~Server() { close(mserverfd); }
};
// Entry point: `<program> ID port [directory]`.
// Starts the listener and heartbeat threads on one Server and waits for
// them; both loop forever, so the joins normally never return.
int main(int argc, char *argv[]) {
    if (argc < 3) {
        // The third argument is optional; the Server constructor reads it as
        // the directory when present.
        fprintf(stderr, "usage %s ID port [directory]\n", argv[0]);
        exit(1);
    }
    // Automatic storage instead of a raw `new` that was never released.
    Server server(argc, argv);
    std::thread listenerThread(&Server::listener, &server);
    std::thread heartbeatThread(&Server::heartBeat, &server);
    heartbeatThread.join();
    listenerThread.join();
    logger.close();
    return 0;
}