Skip to content
This repository was archived by the owner on Dec 17, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ typedef struct {
ptrdiff_t data_offset;
/* The offset in the memory mapped file of the metadata. */
ptrdiff_t metadata_offset;
/* The size of the memory mapped file. */
int64_t map_size;
/* The size of the data. */
int64_t data_size;
/* The size of the metadata. */
int64_t metadata_size;
/* Numerical value of the fd of the memory mapped file in the store. */
int store_fd_val;
/* The size of the memory mapped file. */
int64_t map_size;
} plasma_reply;

typedef struct {
Expand Down
61 changes: 46 additions & 15 deletions src/plasma_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,51 @@ plasma_store_conn *plasma_store_connect(const char *socket_name);
/* Connect to a possibly remote plasma manager */
int plasma_manager_connect(const char *addr, int port);

void plasma_create(plasma_store_conn *conn,
plasma_id object_id,
int64_t size,
uint8_t *metadata,
int64_t metadata_size,
uint8_t **data);

void plasma_get(plasma_store_conn *conn,
plasma_id object_id,
int64_t *size,
uint8_t **data,
int64_t *metadata_size,
uint8_t **metadata);

void plasma_seal(plasma_store_conn *conn, plasma_id object_id);
/* Create a new object with an object_id provided by the caller, a given
* object size in bytes and given metadata. A pointer to the newly created
* object will be stored in the "data" out parameter. After this call returns,
* the client process can write to the object. After it is done writing,
* the client should call "seal" to make the object immutable so it can
* be shared with other processes. */
void plasma_create_object(plasma_store_conn *conn,
plasma_id object_id,
int64_t size,
uint8_t *metadata,
int64_t metadata_size,
uint8_t **data);

/* Resize an obect that has been created by the same client and that has
* not been sealed yet. All existing pointers to the object will get
* invalidated. */
void plasma_resize_object(plasma_store_conn *conn,
plasma_id object_id,
int64_t new_size,
uint8_t **data);

/* Get an object. If the object has not been sealed yet, this call will block
* until the object is sealed. All or size, data, metadata_size and metadata
* are out parameters that will be set by this function. */
void plasma_get_object(plasma_store_conn *conn,
plasma_id object_id,
int64_t *size,
uint8_t **data,
int64_t *metadata_size,
uint8_t **metadata);

/* Seal an object. This function makes the object immutable and unblocks
* all processes waiting for the object in a "get_object" call. */
void plasma_seal_object(plasma_store_conn *conn, plasma_id object_id);

/* Delete the object from this plasma store. */
void plasma_delete_object(plasma_store_conn *conn, plasma_id object_id);

/* Mark this object as not used any more by this client. Before a deleted
* object can actually be freed, all clients using the object need to
* unmap it. */
void plasma_unmap_object(plasma_id object_id);

/* Fetch an object from a remote plasma store that has the object stored. */
void plasma_fetch_object(plasma_store_conn *conn, plasma_id object_id);


#endif
19 changes: 19 additions & 0 deletions src/plasma_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@
#include <poll.h>
#include "utarray.h"

/* Start writing the object with id "object_id" to the plasma manager with
* with connection index "conn_index". */
void start_write_object(plasma_manager_state *s, object_id object_id, int64_t conn_index);

/* Start reading the object with id "object_id" from the plasma manager with
* connection index "conn_index". */
void start_read_object(plasma_manager_state *s, object_id object_id, int64_t conn_index);

/* Read the next chunk of the object with id "object_id" from the plasma manager
* that is connected to the connection with index "conn_index". */
void read_object_chunk(plasma_manager_state *s, object_id object_id, int64_t conn_index);

/* Write the next chunk of the object with id "object_id" to the plasma manager
* that is connected to the connection with index "conn_index". */
void write_object_chunk(plasma_manager_state *s, object_id object_id, int64_t conn_index);

/* Fetch object with id object_id from plasma manager at address addr. */
void fetch_object(plasma_manager_state *s, object_id object_id, manager_addr addr);

/* The buffer size in bytes. Data will get transfered in multiples of this */
#define BUFSIZE 4096

Expand Down
42 changes: 20 additions & 22 deletions src/plasma_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,19 @@

void* dlmalloc(size_t);

typedef struct {
/* Event loop for the plasma store. */
event_loop* loop;
} plasma_store_state;

void init_state(plasma_store_state* s) {
s->loop = malloc(sizeof(event_loop));
event_loop_init(s->loop);
}

typedef struct {
/* Object id of this object. */
plasma_id object_id;
/* Object info like size, creation time and owner. */
plasma_object_info info;
/* Memory mapped file containing the object. */
int fd;
/* Size of the underlying map. */
int64_t map_size;
mmap_handle handle;
/* Offset from the base of the mmap. */
ptrdiff_t offset;
/* Handle for the uthash table. */
UT_hash_handle handle;
} object_table_entry;

/* Objects that are still being written by their owner process. */
object_table_entry* open_objects = NULL;

/* Objects that have already been sealed by their owner process and
* can now be shared with other processes. */
object_table_entry* sealed_objects = NULL;

typedef struct {
/* Object id of this object. */
plasma_id object_id;
Expand All @@ -74,8 +55,25 @@ typedef struct {
UT_hash_handle handle;
} object_notify_entry;

/* Objects that processes are waiting for. */
object_notify_entry* objects_notify = NULL;
typedef struct {
/* Event loop for the plasma store. */
event_loop* loop;
/* Objects that are still being written by their owner process. */
object_table_entry* open_objects;
/* Objects that have already been sealed by their owner process and
* can now be shared with other processes. */
object_table_entry* sealed_objects;
/* Objects that processes are waiting for. */
object_notify_entry* objects_notify;
} plasma_store_state;

void init_state(plasma_store_state* s) {
s->loop = malloc(sizeof(event_loop));
event_loop_init(s->loop);
s->open_objects = NULL;
s->sealed_objects = NULL;
s->objects_notify = NULL;
}

/* Create a new object buffer in the hash table. */
void create_object(int conn, plasma_request* req) {
Expand Down
49 changes: 49 additions & 0 deletions src/plasma_store.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef PLASMA_STORE_H
#define PLASMA_STORE_H

/* Handle to access memory mapped file and map it into client address space */
struct {
int fd;
int64_t mmap_size;
} mmap_handle;

struct {
/* Handle for memory mapped file the object is stored in. */
mmap_handle handle;
/* The offset in the memory mapped file of the data. */
ptrdiff_t data_offset;
/* The offset in the memory mapped file of the metadata. */
ptrdiff_t metadata_offset;
/* The size of the data. */
int64_t data_size;
/* The size of the metadata. */
int64_t metadata_size;
} plasma_object;

/* Create a new object:
*
* object_id: Object ID of the object to be created.
* data_size: Size in bytes of the object to be created.
* metadata_size: Size in bytes of the object metadata.
*/
plasma_object create_object(int conn, plasma_id object_id, int64_t data_size, int64_t metadata_size);

/* Get an object:
*
* object_id: Object ID of the object to be gotten.
*
* If the object is available, it should be returned to conn
* via send_fd. Else, conn should be notified when the object
* is available.
*/
plasma_object get_object(int conn, plasma_id object_id);

/* Seal an object:
*
* req->object_id: Object ID of the object to be sealed.
*
* Should notify all the sockets waiting for the object.
*/
void seal_object(int conn, plasma_id object_id);

#endif