From a2bd9000e061bbb6e76c2b81841691eddfec6fcb Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 9 Sep 2016 17:15:57 -0700 Subject: [PATCH 1/4] plasma manager object table --- .gitmodules | 3 +++ Makefile | 10 +++++--- lib/python/plasma.py | 11 ++++++++- src/plasma.h | 29 +++++++++++++++++----- src/plasma_client.c | 10 ++++++-- src/plasma_manager.c | 59 ++++++++++++++++++++++++++++++-------------- test/test.py | 4 +-- thirdparty/hiredis | 1 + 8 files changed, 94 insertions(+), 33 deletions(-) create mode 100644 .gitmodules create mode 160000 thirdparty/hiredis diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..4026f82 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "thirdparty/hiredis"] + path = thirdparty/hiredis + url = https://github.com/redis/hiredis diff --git a/Makefile b/Makefile index 64f66c9..d6f0b01 100644 --- a/Makefile +++ b/Makefile @@ -2,16 +2,18 @@ CC = gcc CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 BUILD = build +SRC = src/event_loop.c src/fling.c src/utils.c + all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example clean: rm -r $(BUILD)/* -$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/event_loop.h src/event_loop.c src/fling.h src/fling.c - $(CC) $(CFLAGS) src/plasma_store.c src/event_loop.c src/fling.c -o $(BUILD)/plasma_store +$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/event_loop.h src/fling.h $(SRC) + $(CC) $(CFLAGS) src/plasma_store.c $(SRC) -o $(BUILD)/plasma_store -$(BUILD)/plasma_manager: src/plasma_manager.c src/event_loop.h src/event_loop.c src/plasma.h src/plasma_client.c src/fling.h src/fling.c - $(CC) $(CFLAGS) src/plasma_manager.c src/event_loop.c src/plasma_client.c src/fling.c -o $(BUILD)/plasma_manager +$(BUILD)/plasma_manager: src/plasma_manager.c src/event_loop.h src/plasma.h src/plasma_directory.h src/plasma_directory.c src/plasma_client.c src/fling.h $(SRC) + $(CC) $(CFLAGS) src/plasma_manager.c src/plasma_client.c src/plasma_directory.c $(SRC) -Ithirdparty thirdparty/hiredis/libhiredis.a -o $(BUILD)/plasma_manager $(BUILD)/plasma_client.so: src/plasma_client.c src/fling.h src/fling.c $(CC) $(CFLAGS) src/plasma_client.c src/fling.c -fPIC -shared -o $(BUILD)/plasma_client.so diff --git a/lib/python/plasma.py b/lib/python/plasma.py index 5d18b06..61de7fd 100644 --- a/lib/python/plasma.py +++ b/lib/python/plasma.py @@ -45,6 +45,12 @@ def __init__(self, socket_name, addr=None, port=None): self.client.plasma_seal.argtypes = [ctypes.c_int, PlasmaID] self.client.plasma_seal.restype = None + self.client.plasma_transfer.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_int, PlasmaID] + self.client.plasma_transfer.restype = None + + self.client.plasma_link.argtypes = [ctypes.c_int, PlasmaID] + self.client.plasma_link.resype = None + self.buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory self.buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64] self.buffer_from_memory.restype = ctypes.py_object @@ -70,7 +76,10 @@ def create(self, object_id, size): size (int): The size in bytes of the created buffer. """ data = ctypes.c_void_p() - self.client.plasma_create(self.sock, make_plasma_id(object_id), size, ctypes.byref(data)) + plasma_id = make_plasma_id(object_id) + self.client.plasma_create(self.sock, plasma_id, size, ctypes.byref(data)) + if self.manager_conn != -1: + self.client.plasma_link(self.manager_conn, plasma_id) return self.buffer_from_read_write_memory(data, size) def get(self, object_id): diff --git a/src/plasma.h b/src/plasma.h index eda883a..1fdb612 100644 --- a/src/plasma.h +++ b/src/plasma.h @@ -20,15 +20,22 @@ #define LOG_INFO(M, ...) \ fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__) +#define LOG_REDIS_ERR(context, M, ...) \ + fprintf(stderr, "[ERROR] (%s:%d: message: %s) " M "\n", \ + __FILE__, __LINE__, context->errstr, ##__VA_ARGS__) + typedef struct { int64_t size; int64_t create_time; int64_t construct_duration; } plasma_object_info; +/* Size of object ids */ +#define PLASMA_SHA1_SIZE 20 + /* Represents an object id hash, can hold a full SHA1 hash */ typedef struct { - unsigned char id[20]; + unsigned char id[PLASMA_SHA1_SIZE]; } plasma_id; enum plasma_request_type { @@ -36,20 +43,26 @@ enum plasma_request_type { PLASMA_CREATE, /* Get an object. */ PLASMA_GET, - /* seal an object */ + /* seal an object. */ PLASMA_SEAL, - /* request transfer to another store */ + /* request transfer to another store. */ PLASMA_TRANSFER, - /* Header for sending data */ + /* Header for sending data. */ PLASMA_DATA, + /* Put object id into the object table. */ + PLASMA_LINK }; +typedef struct { + uint8_t addr[4]; + int port; +} plasma_addr; + typedef struct { int type; plasma_id object_id; int64_t size; - uint8_t addr[4]; - int port; + plasma_addr addr; } plasma_request; enum plasma_reply_type { @@ -71,6 +84,10 @@ typedef struct { int writable; } plasma_buffer; +/* Convert a 20 byte sha1 hash to a hexdecimal string. This function assumes + * that buffer points to an already allocated char array of size 2 * PLASMA_SHA1_SIZE + 1 */ +char *sha1_to_hex(const unsigned char *sha1, char *buffer); + /* Connect to the local plasma store UNIX domain socket */ int plasma_store_connect(const char* socket_name); diff --git a/src/plasma_client.c b/src/plasma_client.c index d4f2d20..9a7f5c9 100644 --- a/src/plasma_client.c +++ b/src/plasma_client.c @@ -121,12 +121,18 @@ int plasma_manager_connect(const char* ip_addr, int port) { } void plasma_transfer(int manager, const char* addr, int port, plasma_id object_id) { - plasma_request req = {.type = PLASMA_TRANSFER, .object_id = object_id, .port = port}; + plasma_request req = {.type = PLASMA_TRANSFER, .object_id = object_id }; char* end = NULL; for (int i = 0; i < 4; ++i) { - req.addr[i] = strtol(end ? end : addr, &end, 10); + req.addr.addr[i] = strtol(end ? end : addr, &end, 10); /* skip the '.' */ end += 1; } + req.addr.port = port; + plasma_send(manager, &req); +} + +void plasma_link(int manager, plasma_id object_id) { + plasma_request req = {.type = PLASMA_LINK, .object_id = object_id}; plasma_send(manager, &req); } diff --git a/src/plasma_manager.c b/src/plasma_manager.c index 321f409..d5d1875 100644 --- a/src/plasma_manager.c +++ b/src/plasma_manager.c @@ -23,12 +23,15 @@ #include "event_loop.h" #include "plasma.h" #include "plasma_manager.h" +#include "plasma_directory.h" typedef struct { /* Name of the socket connecting to local plasma store. */ - const char* store_socket_name; + const char *store_socket_name; /* Event loop. */ - event_loop* loop; + event_loop *loop; + /* Plasma directory */ + plasma_directory directory; } plasma_manager_state; /* Initialize the plasma manager. This function initializes the event loop @@ -50,10 +53,10 @@ void initiate_transfer(plasma_manager_state* s, plasma_request* req) { char ip_addr[16]; snprintf(ip_addr, 32, "%d.%d.%d.%d", - req->addr[0], req->addr[1], - req->addr[2], req->addr[3]); + req->addr.addr[0], req->addr.addr[1], + req->addr.addr[2], req->addr.addr[3]); - int fd = plasma_manager_connect(&ip_addr[0], req->port); + int fd = plasma_manager_connect(&ip_addr[0], req->addr.port); data_connection conn = { .type = DATA_CONNECTION_WRITE, .store_conn = store_conn, .buf = buf, .cursor = 0 }; event_loop_attach(s->loop, CONNECTION_DATA, &conn, fd, POLLOUT); @@ -77,13 +80,17 @@ void start_reading_data(int64_t index, plasma_manager_state* s, plasma_request* void process_command(int64_t id, plasma_manager_state* state, plasma_request* req) { switch (req->type) { case PLASMA_TRANSFER: - LOG_INFO("transfering object to manager with port %d", req->port); + LOG_INFO("transfering object to manager with port %d", req->addr.port); initiate_transfer(state, req); break; case PLASMA_DATA: LOG_INFO("starting to stream data"); start_reading_data(id, state, req); break; + case PLASMA_LINK: + LOG_INFO("putting link into the plasma directory"); + plasma_directory_link(&state->directory, req->object_id); + break; default: LOG_ERR("invalid request %d", req->type); exit(-1); @@ -154,7 +161,6 @@ void read_from_socket(plasma_manager_state* state, struct pollfd *waiting, int64 /* Main event loop of the plasma manager. */ void run_event_loop(int sock, plasma_manager_state* s) { /* Add listening socket. */ - event_loop_attach(s->loop, CONNECTION_LISTENER, NULL, sock, POLLIN); plasma_request req; while (1) { int num_ready = event_loop_poll(s->loop); @@ -166,7 +172,9 @@ void run_event_loop(int sock, plasma_manager_state* s) { struct pollfd *waiting = event_loop_get(s->loop, i); if (waiting->revents == 0) continue; - if (waiting->fd == sock) { + if (i == 0) { + plasma_directory_event(&s->directory); + } else if (waiting->fd == sock) { /* Handle new incoming connections. */ int new_socket = accept(sock, NULL, NULL); if (new_socket < 0) { @@ -186,7 +194,7 @@ void run_event_loop(int sock, plasma_manager_state* s) { } } -void start_server(const char *store_socket_name, const char* master_addr, int port) { +void start_server(const char *store_socket_name, const char* manager_addr, int manager_port, const char* redis_addr, int redis_port) { struct sockaddr_in name; int sock = socket(PF_INET, SOCK_STREAM, 0); if (sock < 0) { @@ -194,7 +202,7 @@ void start_server(const char *store_socket_name, const char* master_addr, int po exit(-1); } name.sin_family = AF_INET; - name.sin_port = htons(port); + name.sin_port = htons(manager_port); name.sin_addr.s_addr = htonl(INADDR_ANY); int on = 1; /* TODO(pcm): http://stackoverflow.com/q/1150635 */ @@ -208,34 +216,43 @@ void start_server(const char *store_socket_name, const char* master_addr, int po LOG_ERR("could not bind socket"); exit(-1); } - LOG_INFO("listening on port %d", port); + LOG_INFO("listening on port %d", manager_port); if (listen(sock, 5) == -1) { LOG_ERR("could not listen to socket"); exit(-1); } plasma_manager_state state; init_plasma_manager(&state, store_socket_name); + plasma_directory_init(&state.directory, manager_addr, manager_port, redis_addr, redis_port); + event_loop_attach(state.loop, CONNECTION_LISTENER, NULL, sock, POLLIN); + int redis_conn = plasma_directory_attach(&state.directory); + event_loop_attach(state.loop, CONNECTION_REDIS, NULL, redis_conn, 0); run_event_loop(sock, &state); } int main(int argc, char* argv[]) { /* Socket name of the plasma store this manager is connected to. */ char *store_socket_name = NULL; + /* IP address and port of redis. */ + char *redis_addr_port = NULL; /* IP address of this node. */ - char *master_addr = NULL; + char *manager_addr = NULL; /* Port number the manager should use. */ - int port; + int manager_port; int c; - while ((c = getopt(argc, argv, "s:m:p:")) != -1) { + while ((c = getopt(argc, argv, "s:m:p:r:")) != -1) { switch (c) { case 's': store_socket_name = optarg; break; case 'm': - master_addr = optarg; + manager_addr = optarg; break; case 'p': - port = atoi(optarg); + manager_port = atoi(optarg); + break; + case 'r': + redis_addr_port = optarg; break; default: LOG_ERR("unknown option %c", c); @@ -246,9 +263,15 @@ int main(int argc, char* argv[]) { LOG_ERR("please specify socket for connecting to the plasma store with -s switch"); exit(-1); } - if (!master_addr) { + if (!manager_addr) { LOG_ERR("please specify ip address of the current host in the format 123.456.789.10 with -m switch"); exit(-1); } - start_server(store_socket_name, master_addr, port); + char redis_addr[16] = { 0 }; + char redis_port[6] = { 0 }; + if(!redis_addr_port || sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) != 2) { + LOG_ERR("need to specify redis address like 127.0.0.1:6379 with -r switch"); + exit(-1); + } + start_server(store_socket_name, manager_addr, manager_port, &redis_addr[0], atoi(redis_port)); } diff --git a/test/test.py b/test/test.py index 0fb54d3..f5cb4f9 100644 --- a/test/test.py +++ b/test/test.py @@ -76,8 +76,8 @@ def setUp(self): self.port1 = random.randint(10000, 50000) self.port2 = random.randint(10000, 50000) plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_manager") - self.p4 = subprocess.Popen([plasma_manager_executable, "-s", store_name1, "-m", "127.0.0.1", "-p", str(self.port1)]) - self.p5 = subprocess.Popen([plasma_manager_executable, "-s", store_name2, "-m", "127.0.0.1", "-p", str(self.port2)]) + self.p4 = subprocess.Popen([plasma_manager_executable, "-s", store_name1, "-m", "127.0.0.1", "-p", str(self.port1), "-r", "127.0.0.1:6379"]) + self.p5 = subprocess.Popen([plasma_manager_executable, "-s", store_name2, "-m", "127.0.0.1", "-p", str(self.port2), "-r", "127.0.0.1:6379"]) time.sleep(0.1) # Connect two PlasmaClients. self.client1 = plasma.PlasmaClient(store_name1, "127.0.0.1", self.port1) diff --git a/thirdparty/hiredis b/thirdparty/hiredis new file mode 160000 index 0000000..5f98e1d --- /dev/null +++ b/thirdparty/hiredis @@ -0,0 +1 @@ +Subproject commit 5f98e1d35dcf00a026793ada2662f6e1ba77eb17 From 6b161b922739fdf6851f2e9d64fb1d16dab7137c Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 10 Sep 2016 10:10:47 -0700 Subject: [PATCH 2/4] fix redis event loop polling --- src/plasma.h | 1 + src/plasma_manager.c | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/plasma.h b/src/plasma.h index 1fdb612..3e157dc 100644 --- a/src/plasma.h +++ b/src/plasma.h @@ -58,6 +58,7 @@ typedef struct { int port; } plasma_addr; +// TODO(pcm): make this platform independent, see http://stackoverflow.com/a/8001279 typedef struct { int type; plasma_id object_id; diff --git a/src/plasma_manager.c b/src/plasma_manager.c index d5d1875..44a9f10 100644 --- a/src/plasma_manager.c +++ b/src/plasma_manager.c @@ -224,9 +224,10 @@ void start_server(const char *store_socket_name, const char* manager_addr, int m plasma_manager_state state; init_plasma_manager(&state, store_socket_name); plasma_directory_init(&state.directory, manager_addr, manager_port, redis_addr, redis_port); + int redis_conn = plasma_directory_attach(&state.directory, state.loop); + int64_t index = event_loop_attach(state.loop, CONNECTION_REDIS, NULL, redis_conn, POLLIN | POLLOUT); + assert(index == 0); event_loop_attach(state.loop, CONNECTION_LISTENER, NULL, sock, POLLIN); - int redis_conn = plasma_directory_attach(&state.directory); - event_loop_attach(state.loop, CONNECTION_REDIS, NULL, redis_conn, 0); run_event_loop(sock, &state); } From 9ffc3653564d061f8e928fcb58456e9ae1ec1a3c Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 10 Sep 2016 10:12:40 -0700 Subject: [PATCH 3/4] commit plasma directory --- src/plasma_directory.c | 121 +++++++++++++++++++++++++++++++++++++++++ src/plasma_directory.h | 43 +++++++++++++++ 2 files changed, 164 insertions(+) create mode 100644 src/plasma_directory.c create mode 100644 src/plasma_directory.h diff --git a/src/plasma_directory.c b/src/plasma_directory.c new file mode 100644 index 0000000..75438f3 --- /dev/null +++ b/src/plasma_directory.c @@ -0,0 +1,121 @@ +#include "plasma_directory.h" + +static void poll_add_read(void *privdata) { + plasma_directory *directory = (plasma_directory*) privdata; + if (!directory->reading) { + directory->reading = 1; + event_loop_get(directory->loop, 0)->events |= POLLIN; + } +} + +static void poll_del_read(void *privdata) { + plasma_directory *directory = (plasma_directory*) privdata; + if (directory->reading) { + directory->reading = 0; + event_loop_get(directory->loop, 0)->events &= ~POLLIN; + } +} + +static void poll_add_write(void *privdata) { + plasma_directory *directory = (plasma_directory*) privdata; + if (!directory->writing) { + directory->writing = 1; + event_loop_get(directory->loop, 0)->events |= POLLOUT; + } +} + +static void poll_del_write(void *privdata) { + plasma_directory *directory = (plasma_directory*) privdata; + if (directory->writing) { + directory->writing = 0; + event_loop_get(directory->loop, 0)->events &= ~POLLOUT; + } +} + +#define PLASMA_CHECK_REDIS_CONNECT(CONTEXT_TYPE, context, M, ...) \ + do { \ + CONTEXT_TYPE *_context = (context); \ + if (!_context) { \ + LOG_ERR("could not allocate redis context"); \ + exit(-1); \ + } \ + if (_context->err) { \ + LOG_REDIS_ERR(_context, M, ##__VA_ARGS__); \ + exit(-1); \ + } \ + } while (0); + +void plasma_directory_link(plasma_directory* directory, plasma_id object_id) { + static char hex_object_id[2 * PLASMA_SHA1_SIZE + 1]; + sha1_to_hex(&object_id.id[0], &hex_object_id[0]); + printf("XXX %s\n", &hex_object_id[0]); + redisAsyncCommand(directory->context, NULL, NULL, "SET test value"); + redisAsyncCommand(directory->context, NULL, NULL, "SADD obj:%s %d", &hex_object_id[0], directory->manager_id); + if (directory->context->err) { + LOG_REDIS_ERR(directory->context, "could not add directory link"); + } +} + +void plasma_directory_init(plasma_directory* directory, const char* manager_address, int manager_port, const char* directory_address, int directory_port) { + redisReply *reply; + long long num_managers; + /* First use the synchronous redis API to initialize the connection. */ + redisContext *context = redisConnect(directory_address, directory_port); + PLASMA_CHECK_REDIS_CONNECT(redisContext, context, "could not connect to redis %s:%d", directory_address, directory_port); + /* Add new client using optimistic locking. */ + while (1) { + reply = redisCommand(context, "WATCH plasma_managers"); + freeReplyObject(reply); + reply = redisCommand(context, "HLEN plasma_managers"); + num_managers = reply->integer; + freeReplyObject(reply); + reply = redisCommand(context, "MULTI"); + freeReplyObject(reply); + reply = redisCommand(context, "HSET plasma_managers %lld %s:%d", num_managers, manager_address, manager_port); + freeReplyObject(reply); + reply = redisCommand(context, "EXEC"); + if (reply) { + freeReplyObject(reply); + break; + } + freeReplyObject(reply); + } + redisFree(context); + + directory->manager_id = num_managers; + + /* Now establish the asynchronous connection. */ + directory->context = redisAsyncConnect(directory_address, directory_port); + PLASMA_CHECK_REDIS_CONNECT(redisAsyncContext, directory->context, "could not connect to redis %s:%d", directory_address, directory_port); + directory->context->data = directory; +} + +void plasma_directory_event(plasma_directory *directory) { + if (directory->reading) { + redisAsyncHandleRead(directory->context); + } + if (directory->writing) { + redisAsyncHandleWrite(directory->context); + } +} + +int plasma_directory_attach(plasma_directory *directory, event_loop *loop) { + directory->loop = loop; + + redisAsyncContext *ac = directory->context; + redisContext *c = &(ac->c); + + if (ac->ev.data != NULL) { + return REDIS_ERR; + } + + ac->ev.addRead = poll_add_read; + ac->ev.delRead = poll_del_read; + ac->ev.addWrite = poll_add_write; + ac->ev.delWrite = poll_del_write; + // TODO(pcm): Implement cleanup function + + ac->ev.data = directory; + + return c->fd; +} diff --git a/src/plasma_directory.h b/src/plasma_directory.h new file mode 100644 index 0000000..53b5a78 --- /dev/null +++ b/src/plasma_directory.h @@ -0,0 +1,43 @@ +/* DATABASE: This file contains the API that lets the object store talk to + * the object table. */ + +#ifndef PLASMA_DIRECTORY_H +#define PLASMA_DIRECTORY_H + +#include +#include + +#include "plasma.h" +#include "event_loop.h" + +typedef struct { + /* Manager ID that this plasma directory is part of (assigned by redis). */ + int manager_id; + /* Redis context for this directory. */ + redisAsyncContext* context; + /* Which events are we processing (read, write)? */ + int reading, writing; + /* The plasma manager event loop. */ + event_loop* loop; +} plasma_directory; + +/* Waits until the object becomes available and then returns. */ +// void plasma_directory_lookup(plasma_manager_state* state, plasma_id object_id, int conn_idx); + +/* Link a new object in the directory. */ +void plasma_directory_link(plasma_directory *directory, plasma_id object_id); + +/* Initialize the plasma directory and connect it to redis. */ +void plasma_directory_init(plasma_directory *directory, const char* manager_address, int manager_port, const char *directory_address, int directory_port); + +// void plasma_directory_disconnect(plasma_directory *directory); + +/* Attach plasma directory to plasma manager run loop. */ +int plasma_directory_attach(plasma_directory *directory, event_loop *loop); + +/* Should be called by the event loop whenever a new event is triggered on the file descriptor */ +void plasma_directory_event(plasma_directory *directory); + + + +#endif From 004ef49ac9d4efe3be36a75b4ecfd77ca8890b48 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 10 Sep 2016 10:15:28 -0700 Subject: [PATCH 4/4] add utils.c --- src/utils.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 src/utils.c diff --git a/src/utils.c b/src/utils.c new file mode 100644 index 0000000..51b77ae --- /dev/null +++ b/src/utils.c @@ -0,0 +1,15 @@ +#include "plasma.h" + +char *sha1_to_hex(const unsigned char *sha1, char *buffer) { + static const char hex[] = "0123456789abcdef"; + char *buf = buffer; + + for (int i = 0; i < PLASMA_SHA1_SIZE; i++) { + unsigned int val = *sha1++; + *buf++ = hex[val >> 4]; + *buf++ = hex[val & 0xf]; + } + *buf = '\0'; + + return buffer; +}