Skip to content
This repository was archived by the owner on Dec 17, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "thirdparty/hiredis"]
path = thirdparty/hiredis
url = https://github.com/redis/hiredis
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500
BUILD = build

SRC = src/event_loop.c src/fling.c src/utils.c

all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example

clean:
rm -r $(BUILD)/*

$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/event_loop.h src/event_loop.c src/fling.h src/fling.c
$(CC) $(CFLAGS) src/plasma_store.c src/event_loop.c src/fling.c -o $(BUILD)/plasma_store
$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/event_loop.h src/fling.h $(SRC)
$(CC) $(CFLAGS) src/plasma_store.c $(SRC) -o $(BUILD)/plasma_store

$(BUILD)/plasma_manager: src/plasma_manager.c src/event_loop.h src/event_loop.c src/plasma.h src/plasma_client.c src/fling.h src/fling.c
$(CC) $(CFLAGS) src/plasma_manager.c src/event_loop.c src/plasma_client.c src/fling.c -o $(BUILD)/plasma_manager
$(BUILD)/plasma_manager: src/plasma_manager.c src/event_loop.h src/plasma.h src/plasma_directory.h src/plasma_directory.c src/plasma_client.c src/fling.h $(SRC)
$(CC) $(CFLAGS) src/plasma_manager.c src/plasma_client.c src/plasma_directory.c $(SRC) -Ithirdparty thirdparty/hiredis/libhiredis.a -o $(BUILD)/plasma_manager

$(BUILD)/plasma_client.so: src/plasma_client.c src/fling.h src/fling.c
$(CC) $(CFLAGS) src/plasma_client.c src/fling.c -fPIC -shared -o $(BUILD)/plasma_client.so
Expand Down
11 changes: 10 additions & 1 deletion lib/python/plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ def __init__(self, socket_name, addr=None, port=None):
self.client.plasma_seal.argtypes = [ctypes.c_int, PlasmaID]
self.client.plasma_seal.restype = None

self.client.plasma_transfer.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_int, PlasmaID]
self.client.plasma_transfer.restype = None

self.client.plasma_link.argtypes = [ctypes.c_int, PlasmaID]
self.client.plasma_link.resype = None

self.buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory
self.buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64]
self.buffer_from_memory.restype = ctypes.py_object
Expand All @@ -70,7 +76,10 @@ def create(self, object_id, size):
size (int): The size in bytes of the created buffer.
"""
data = ctypes.c_void_p()
self.client.plasma_create(self.sock, make_plasma_id(object_id), size, ctypes.byref(data))
plasma_id = make_plasma_id(object_id)
self.client.plasma_create(self.sock, plasma_id, size, ctypes.byref(data))
if self.manager_conn != -1:
self.client.plasma_link(self.manager_conn, plasma_id)
return self.buffer_from_read_write_memory(data, size)

def get(self, object_id):
Expand Down
30 changes: 24 additions & 6 deletions src/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,50 @@
#define LOG_INFO(M, ...) \
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)

#define LOG_REDIS_ERR(context, M, ...) \
fprintf(stderr, "[ERROR] (%s:%d: message: %s) " M "\n", \
__FILE__, __LINE__, context->errstr, ##__VA_ARGS__)

typedef struct {
int64_t size;
int64_t create_time;
int64_t construct_duration;
} plasma_object_info;

/* Size of object ids */
#define PLASMA_SHA1_SIZE 20

/* Represents an object id hash, can hold a full SHA1 hash */
typedef struct {
unsigned char id[20];
unsigned char id[PLASMA_SHA1_SIZE];
} plasma_id;

enum plasma_request_type {
/* Create a new object. */
PLASMA_CREATE,
/* Get an object. */
PLASMA_GET,
/* seal an object */
/* seal an object. */
PLASMA_SEAL,
/* request transfer to another store */
/* request transfer to another store. */
PLASMA_TRANSFER,
/* Header for sending data */
/* Header for sending data. */
PLASMA_DATA,
/* Put object id into the object table. */
PLASMA_LINK
};

typedef struct {
uint8_t addr[4];
int port;
} plasma_addr;

// TODO(pcm): make this platform independent, see http://stackoverflow.com/a/8001279
typedef struct {
int type;
plasma_id object_id;
int64_t size;
uint8_t addr[4];
int port;
plasma_addr addr;
} plasma_request;

enum plasma_reply_type {
Expand All @@ -71,6 +85,10 @@ typedef struct {
int writable;
} plasma_buffer;

/* Convert a 20 byte sha1 hash to a hexdecimal string. This function assumes
* that buffer points to an already allocated char array of size 2 * PLASMA_SHA1_SIZE + 1 */
char *sha1_to_hex(const unsigned char *sha1, char *buffer);

/* Connect to the local plasma store UNIX domain socket */
int plasma_store_connect(const char* socket_name);

Expand Down
10 changes: 8 additions & 2 deletions src/plasma_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,18 @@ int plasma_manager_connect(const char* ip_addr, int port) {
}

void plasma_transfer(int manager, const char* addr, int port, plasma_id object_id) {
plasma_request req = {.type = PLASMA_TRANSFER, .object_id = object_id, .port = port};
plasma_request req = {.type = PLASMA_TRANSFER, .object_id = object_id };
char* end = NULL;
for (int i = 0; i < 4; ++i) {
req.addr[i] = strtol(end ? end : addr, &end, 10);
req.addr.addr[i] = strtol(end ? end : addr, &end, 10);
/* skip the '.' */
end += 1;
}
req.addr.port = port;
plasma_send(manager, &req);
}

void plasma_link(int manager, plasma_id object_id) {
plasma_request req = {.type = PLASMA_LINK, .object_id = object_id};
plasma_send(manager, &req);
}
121 changes: 121 additions & 0 deletions src/plasma_directory.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#include "plasma_directory.h"

static void poll_add_read(void *privdata) {
plasma_directory *directory = (plasma_directory*) privdata;
if (!directory->reading) {
directory->reading = 1;
event_loop_get(directory->loop, 0)->events |= POLLIN;
}
}

static void poll_del_read(void *privdata) {
plasma_directory *directory = (plasma_directory*) privdata;
if (directory->reading) {
directory->reading = 0;
event_loop_get(directory->loop, 0)->events &= ~POLLIN;
}
}

static void poll_add_write(void *privdata) {
plasma_directory *directory = (plasma_directory*) privdata;
if (!directory->writing) {
directory->writing = 1;
event_loop_get(directory->loop, 0)->events |= POLLOUT;
}
}

static void poll_del_write(void *privdata) {
plasma_directory *directory = (plasma_directory*) privdata;
if (directory->writing) {
directory->writing = 0;
event_loop_get(directory->loop, 0)->events &= ~POLLOUT;
}
}

#define PLASMA_CHECK_REDIS_CONNECT(CONTEXT_TYPE, context, M, ...) \
do { \
CONTEXT_TYPE *_context = (context); \
if (!_context) { \
LOG_ERR("could not allocate redis context"); \
exit(-1); \
} \
if (_context->err) { \
LOG_REDIS_ERR(_context, M, ##__VA_ARGS__); \
exit(-1); \
} \
} while (0);

void plasma_directory_link(plasma_directory* directory, plasma_id object_id) {
static char hex_object_id[2 * PLASMA_SHA1_SIZE + 1];
sha1_to_hex(&object_id.id[0], &hex_object_id[0]);
printf("XXX %s\n", &hex_object_id[0]);
redisAsyncCommand(directory->context, NULL, NULL, "SET test value");
redisAsyncCommand(directory->context, NULL, NULL, "SADD obj:%s %d", &hex_object_id[0], directory->manager_id);
if (directory->context->err) {
LOG_REDIS_ERR(directory->context, "could not add directory link");
}
}

void plasma_directory_init(plasma_directory* directory, const char* manager_address, int manager_port, const char* directory_address, int directory_port) {
redisReply *reply;
long long num_managers;
/* First use the synchronous redis API to initialize the connection. */
redisContext *context = redisConnect(directory_address, directory_port);
PLASMA_CHECK_REDIS_CONNECT(redisContext, context, "could not connect to redis %s:%d", directory_address, directory_port);
/* Add new client using optimistic locking. */
while (1) {
reply = redisCommand(context, "WATCH plasma_managers");
freeReplyObject(reply);
reply = redisCommand(context, "HLEN plasma_managers");
num_managers = reply->integer;
freeReplyObject(reply);
reply = redisCommand(context, "MULTI");
freeReplyObject(reply);
reply = redisCommand(context, "HSET plasma_managers %lld %s:%d", num_managers, manager_address, manager_port);
freeReplyObject(reply);
reply = redisCommand(context, "EXEC");
if (reply) {
freeReplyObject(reply);
break;
}
freeReplyObject(reply);
}
redisFree(context);

directory->manager_id = num_managers;

/* Now establish the asynchronous connection. */
directory->context = redisAsyncConnect(directory_address, directory_port);
PLASMA_CHECK_REDIS_CONNECT(redisAsyncContext, directory->context, "could not connect to redis %s:%d", directory_address, directory_port);
directory->context->data = directory;
}

void plasma_directory_event(plasma_directory *directory) {
if (directory->reading) {
redisAsyncHandleRead(directory->context);
}
if (directory->writing) {
redisAsyncHandleWrite(directory->context);
}
}

int plasma_directory_attach(plasma_directory *directory, event_loop *loop) {
directory->loop = loop;

redisAsyncContext *ac = directory->context;
redisContext *c = &(ac->c);

if (ac->ev.data != NULL) {
return REDIS_ERR;
}

ac->ev.addRead = poll_add_read;
ac->ev.delRead = poll_del_read;
ac->ev.addWrite = poll_add_write;
ac->ev.delWrite = poll_del_write;
// TODO(pcm): Implement cleanup function

ac->ev.data = directory;

return c->fd;
}
43 changes: 43 additions & 0 deletions src/plasma_directory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* DATABASE: This file contains the API that lets the object store talk to
* the object table. */

#ifndef PLASMA_DIRECTORY_H
#define PLASMA_DIRECTORY_H

#include <hiredis/hiredis.h>
#include <hiredis/async.h>

#include "plasma.h"
#include "event_loop.h"

typedef struct {
/* Manager ID that this plasma directory is part of (assigned by redis). */
int manager_id;
/* Redis context for this directory. */
redisAsyncContext* context;
/* Which events are we processing (read, write)? */
int reading, writing;
/* The plasma manager event loop. */
event_loop* loop;
} plasma_directory;

/* Waits until the object becomes available and then returns. */
// void plasma_directory_lookup(plasma_manager_state* state, plasma_id object_id, int conn_idx);

/* Link a new object in the directory. */
void plasma_directory_link(plasma_directory *directory, plasma_id object_id);

/* Initialize the plasma directory and connect it to redis. */
void plasma_directory_init(plasma_directory *directory, const char* manager_address, int manager_port, const char *directory_address, int directory_port);

// void plasma_directory_disconnect(plasma_directory *directory);

/* Attach plasma directory to plasma manager run loop. */
int plasma_directory_attach(plasma_directory *directory, event_loop *loop);

/* Should be called by the event loop whenever a new event is triggered on the file descriptor */
void plasma_directory_event(plasma_directory *directory);



#endif
Loading