From 40052d91c3a3105476b9926f9bb0922724d576fd Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 20 Sep 2016 23:12:22 -0700 Subject: [PATCH 1/5] apis --- src/plasma.h | 4 ++-- src/plasma_client.h | 7 +++++++ src/plasma_manager.h | 7 +++++++ src/plasma_store.c | 42 ++++++++++++++++++------------------- src/plasma_store.h | 49 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 85 insertions(+), 24 deletions(-) create mode 100644 src/plasma_store.h diff --git a/src/plasma.h b/src/plasma.h index ddf89ad..78ed82e 100644 --- a/src/plasma.h +++ b/src/plasma.h @@ -75,14 +75,14 @@ typedef struct { ptrdiff_t data_offset; /* The offset in the memory mapped file of the metadata. */ ptrdiff_t metadata_offset; - /* The size of the memory mapped file. */ - int64_t map_size; /* The size of the data. */ int64_t data_size; /* The size of the metadata. */ int64_t metadata_size; /* Numerical value of the fd of the memory mapped file in the store. */ int store_fd_val; + /* The size of the memory mapped file. */ + int64_t map_size; } plasma_reply; typedef struct { diff --git a/src/plasma_client.h b/src/plasma_client.h index 4c7cc00..c2ccacc 100644 --- a/src/plasma_client.h +++ b/src/plasma_client.h @@ -15,6 +15,11 @@ void plasma_create(plasma_store_conn *conn, int64_t metadata_size, uint8_t **data); +void plasma_resize(plasma_store_conn *conn, + plasma_id object_id, + int64_t new_size, + uint8_t **data); + void plasma_get(plasma_store_conn *conn, plasma_id object_id, int64_t *size, @@ -24,4 +29,6 @@ void plasma_get(plasma_store_conn *conn, void plasma_seal(plasma_store_conn *conn, plasma_id object_id); +void plasma_fetch(plasma_store_conn *conn, plasma_id object_id); + #endif diff --git a/src/plasma_manager.h b/src/plasma_manager.h index f7cf6b4..481fde6 100644 --- a/src/plasma_manager.h +++ b/src/plasma_manager.h @@ -4,6 +4,13 @@ #include #include "utarray.h" +void start_write_object(plasma_manager_state *s, object_id object_id); +void start_read_object(plasma_manager_state *s, object_id object_id); +void read_object_chunk(plasma_manager_state *s, object_id object_id); +void write_object_chunk(plasma_manager_state *s, object_id object_id); + +void fetch_object(plasma_manager_state *s, object_id object_id, manager_addr addr); + /* The buffer size in bytes. Data will get transfered in multiples of this */ #define BUFSIZE 4096 diff --git a/src/plasma_store.c b/src/plasma_store.c index a7aef2c..f6b2a71 100644 --- a/src/plasma_store.c +++ b/src/plasma_store.c @@ -31,38 +31,19 @@ void* dlmalloc(size_t); -typedef struct { - /* Event loop for the plasma store. */ - event_loop* loop; -} plasma_store_state; - -void init_state(plasma_store_state* s) { - s->loop = malloc(sizeof(event_loop)); - event_loop_init(s->loop); -} - typedef struct { /* Object id of this object. */ plasma_id object_id; /* Object info like size, creation time and owner. */ plasma_object_info info; /* Memory mapped file containing the object. */ - int fd; - /* Size of the underlying map. */ - int64_t map_size; + mmap_handle handle; /* Offset from the base of the mmap. */ ptrdiff_t offset; /* Handle for the uthash table. */ UT_hash_handle handle; } object_table_entry; -/* Objects that are still being written by their owner process. */ -object_table_entry* open_objects = NULL; - -/* Objects that have already been sealed by their owner process and - * can now be shared with other processes. */ -object_table_entry* sealed_objects = NULL; - typedef struct { /* Object id of this object. */ plasma_id object_id; @@ -74,8 +55,25 @@ typedef struct { UT_hash_handle handle; } object_notify_entry; -/* Objects that processes are waiting for. */ -object_notify_entry* objects_notify = NULL; +typedef struct { + /* Event loop for the plasma store. */ + event_loop* loop; + /* Objects that are still being written by their owner process. */ + object_table_entry* open_objects; + /* Objects that have already been sealed by their owner process and + * can now be shared with other processes. */ + object_table_entry* sealed_objects; + /* Objects that processes are waiting for. */ + object_notify_entry* objects_notify; +} plasma_store_state; + +void init_state(plasma_store_state* s) { + s->loop = malloc(sizeof(event_loop)); + event_loop_init(s->loop); + s->open_objects = NULL; + s->sealed_objects = NULL; + s->objects_notify = NULL; +} /* Create a new object buffer in the hash table. */ void create_object(int conn, plasma_request* req) { diff --git a/src/plasma_store.h b/src/plasma_store.h new file mode 100644 index 0000000..713d34d --- /dev/null +++ b/src/plasma_store.h @@ -0,0 +1,49 @@ +#ifndef PLASMA_STORE_H +#define PLASMA_STORE_H + +/* Handle to access memory mapped file and map it into client address space */ +struct { + int fd; + int64_t mmap_size; +} mmap_handle; + +struct { + /* Handle for memory mapped file the object is stored in. */ + mmap_handle handle; + /* The offset in the memory mapped file of the data. */ + ptrdiff_t data_offset; + /* The offset in the memory mapped file of the metadata. */ + ptrdiff_t metadata_offset; + /* The size of the data. */ + int64_t data_size; + /* The size of the metadata. */ + int64_t metadata_size; +} plasma_object; + +/* Create a new object: + * + * object_id: Object ID of the object to be created. + * data_size: Size in bytes of the object to be created. + * metadata_size: Size in bytes of the object metadata. + */ +plasma_object create_object(int conn, plasma_id object_id, int64_t data_size, int64_t metadata_size); + +/* Get an object: + * + * object_id: Object ID of the object to be gotten. + * + * If the object is available, it should be returned to conn + * via send_fd. Else, conn should be notified when the object + * is available. + */ +plasma_object get_object(int conn, plasma_id object_id); + +/* Seal an object: + * + * req->object_id: Object ID of the object to be sealed. + * + * Should notify all the sockets waiting for the object. + */ +void seal_object(int conn, plasma_id object_id); + +#endif From 63d9c33e01d76002ebea3c63d01accdc9b2dc45d Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 20 Sep 2016 23:41:37 -0700 Subject: [PATCH 2/5] add delete and unmap --- src/plasma_client.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/plasma_client.h b/src/plasma_client.h index c2ccacc..0992c3e 100644 --- a/src/plasma_client.h +++ b/src/plasma_client.h @@ -29,6 +29,11 @@ void plasma_get(plasma_store_conn *conn, void plasma_seal(plasma_store_conn *conn, plasma_id object_id); +void plasma_delete(plasma_store_conn *conn, plasma_id object_id); + +void plasma_unmap(plasma_id object_id); + void plasma_fetch(plasma_store_conn *conn, plasma_id object_id); + #endif From f741024646a5e72111ffbb38df74896bec63c545 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Wed, 21 Sep 2016 14:04:08 -0700 Subject: [PATCH 3/5] fix API --- src/plasma_manager.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/plasma_manager.h b/src/plasma_manager.h index 481fde6..74048cc 100644 --- a/src/plasma_manager.h +++ b/src/plasma_manager.h @@ -4,10 +4,10 @@ #include #include "utarray.h" -void start_write_object(plasma_manager_state *s, object_id object_id); -void start_read_object(plasma_manager_state *s, object_id object_id); -void read_object_chunk(plasma_manager_state *s, object_id object_id); -void write_object_chunk(plasma_manager_state *s, object_id object_id); +void start_write_object(plasma_manager_state *s, int64_t conn_index, object_id object_id); +void start_read_object(plasma_manager_state *s, int64_t conn_index, object_id object_id); +void read_object_chunk(plasma_manager_state *s, int64_t conn_index, object_id object_id); +void write_object_chunk(plasma_manager_state *s, int64_t conn_index, object_id object_id); void fetch_object(plasma_manager_state *s, object_id object_id, manager_addr addr); From fa0a868643587953e305d19430b5435790dfd510 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Thu, 22 Sep 2016 17:05:01 -0700 Subject: [PATCH 4/5] add comments --- src/plasma_client.h | 71 ++++++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/src/plasma_client.h b/src/plasma_client.h index 0992c3e..d70bfb4 100644 --- a/src/plasma_client.h +++ b/src/plasma_client.h @@ -8,32 +8,51 @@ plasma_store_conn *plasma_store_connect(const char *socket_name); /* Connect to a possibly remote plasma manager */ int plasma_manager_connect(const char *addr, int port); -void plasma_create(plasma_store_conn *conn, - plasma_id object_id, - int64_t size, - uint8_t *metadata, - int64_t metadata_size, - uint8_t **data); - -void plasma_resize(plasma_store_conn *conn, - plasma_id object_id, - int64_t new_size, - uint8_t **data); - -void plasma_get(plasma_store_conn *conn, - plasma_id object_id, - int64_t *size, - uint8_t **data, - int64_t *metadata_size, - uint8_t **metadata); - -void plasma_seal(plasma_store_conn *conn, plasma_id object_id); - -void plasma_delete(plasma_store_conn *conn, plasma_id object_id); - -void plasma_unmap(plasma_id object_id); - -void plasma_fetch(plasma_store_conn *conn, plasma_id object_id); +/* Create a new object with an object_id provided by the caller, a given + * object size in bytes and given metadata. A pointer to the newly created + * object will be stored in the "data" out parameter. After this call returns, + * the client process can write to the object. After it is done writing, + * the client should call "seal" to make the object immutable so it can + * be shared with other processes. */ +void plasma_create_object(plasma_store_conn *conn, + plasma_id object_id, + int64_t size, + uint8_t *metadata, + int64_t metadata_size, + uint8_t **data); + +/* Resize an obect that has been created by the same client and that has + * not been sealed yet. All existing pointers to the object will get + * invalidated. */ +void plasma_resize_object(plasma_store_conn *conn, + plasma_id object_id, + int64_t new_size, + uint8_t **data); + +/* Get an object. If the object has not been sealed yet, this call will block + * until the object is sealed. All or size, data, metadata_size and metadata + * are out parameters that will be set by this function. */ +void plasma_get_object(plasma_store_conn *conn, + plasma_id object_id, + int64_t *size, + uint8_t **data, + int64_t *metadata_size, + uint8_t **metadata); + +/* Seal an object. This function makes the object immutable and unblocks + * all processes waiting for the object in a "get_object" call. */ +void plasma_seal_object(plasma_store_conn *conn, plasma_id object_id); + +/* Delete the object from this plasma store. */ +void plasma_delete_object(plasma_store_conn *conn, plasma_id object_id); + +/* Mark this object as not used any more by this client. Before a deleted + * object can actually be freed, all clients using the object need to + * unmap it. */ +void plasma_unmap_object(plasma_id object_id); + +/* Fetch an object from a remote plasma store that has the object stored. */ +void plasma_fetch_object(plasma_store_conn *conn, plasma_id object_id); #endif From 0718798cd87a0c7fdccaf274fce4a0c5de9ea90e Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Thu, 22 Sep 2016 17:53:16 -0700 Subject: [PATCH 5/5] update API --- src/plasma_manager.h | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/plasma_manager.h b/src/plasma_manager.h index 74048cc..cc8ff54 100644 --- a/src/plasma_manager.h +++ b/src/plasma_manager.h @@ -4,11 +4,23 @@ #include #include "utarray.h" -void start_write_object(plasma_manager_state *s, int64_t conn_index, object_id object_id); -void start_read_object(plasma_manager_state *s, int64_t conn_index, object_id object_id); -void read_object_chunk(plasma_manager_state *s, int64_t conn_index, object_id object_id); -void write_object_chunk(plasma_manager_state *s, int64_t conn_index, object_id object_id); +/* Start writing the object with id "object_id" to the plasma manager with + * with connection index "conn_index". */ +void start_write_object(plasma_manager_state *s, object_id object_id, int64_t conn_index); +/* Start reading the object with id "object_id" from the plasma manager with + * connection index "conn_index". */ +void start_read_object(plasma_manager_state *s, object_id object_id, int64_t conn_index); + +/* Read the next chunk of the object with id "object_id" from the plasma manager + * that is connected to the connection with index "conn_index". */ +void read_object_chunk(plasma_manager_state *s, object_id object_id, int64_t conn_index); + +/* Write the next chunk of the object with id "object_id" to the plasma manager + * that is connected to the connection with index "conn_index". */ +void write_object_chunk(plasma_manager_state *s, object_id object_id, int64_t conn_index); + +/* Fetch object with id object_id from plasma manager at address addr. */ void fetch_object(plasma_manager_state *s, object_id object_id, manager_addr addr); /* The buffer size in bytes. Data will get transfered in multiples of this */