Skip to content

Commit f41b561

Browse files
committed
feat(heap): allow create/use VM/heaps on custom threads
1 parent 3fa6cc7 commit f41b561

10 files changed

Lines changed: 324 additions & 12 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ CORE_OBJECTS = core/poll.o core/ipc.o core/runtime.o core/sys.o core/os.o core/p
6060
core/sock.o core/error.o core/math.o core/cmp.o core/items.o core/logic.o core/compose.o core/order.o core/io.o\
6161
core/misc.o core/freelist.o core/update.o core/join.o core/query.o core/cond.o\
6262
core/iter.o core/dynlib.o core/aggr.o core/index.o core/group.o core/filter.o core/atomic.o\
63-
core/thread.o core/pool.o core/progress.o core/fdmap.o core/signal.o core/log.o core/pivot.o
63+
core/thread.o core/pool.o core/ctx.o core/progress.o core/fdmap.o core/signal.o core/log.o core/pivot.o
6464
APP_COMMON = app/repl.o app/term.o
6565
APP_OBJECTS = app/main.o $(APP_COMMON)
6666
TESTS_OBJECTS = tests/main.o

core/ctx.c

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
#include "ctx.h"
#include "heap.h"
#include "eval.h"
#include "pool.h"
#include "runtime.h"
#include "mmap.h"
#include "log.h"

#include <assert.h>
#include <string.h> // memset (used by ctx_registry_init)
10+
11+
ctx_registry_t __ctx_registry;
12+
13+
// Reset the global custom-thread context registry to an empty state and
// create the mutex that guards it.  Called once during runtime startup.
nil_t ctx_registry_init(nil_t) {
    memset(__ctx_registry.entries, 0, sizeof(__ctx_registry.entries));
    __ctx_registry.count = 0;
    __ctx_registry.lock = mutex_create();
}
18+
19+
nil_t ctx_registry_destroy(nil_t) {
20+
i64_t i;
21+
ray_ctx_p ctx;
22+
23+
mutex_lock(&__ctx_registry.lock);
24+
for (i = 0; i < __ctx_registry.count; i++) {
25+
ctx = __ctx_registry.entries[i];
26+
if (ctx != NULL && ctx->heap != NULL) {
27+
LOG_WARN("Leaked custom thread context id=%lld — force merging heap", ctx->id);
28+
heap_push_pending(ctx->heap);
29+
}
30+
if (ctx != NULL)
31+
mmap_free(ctx, sizeof(struct ray_ctx_t));
32+
__ctx_registry.entries[i] = NULL;
33+
}
34+
__ctx_registry.count = 0;
35+
mutex_unlock(&__ctx_registry.lock);
36+
37+
// Drain any pending heaps so they get merged before final cleanup
38+
heap_drain_pending();
39+
40+
mutex_destroy(&__ctx_registry.lock);
41+
}
42+
43+
ray_ctx_p ray_ctx_create(nil_t) {
44+
ray_ctx_p ctx;
45+
i64_t id;
46+
vm_p vm;
47+
48+
assert(__VM == NULL && "ray_ctx_create: thread already has a VM");
49+
50+
id = heap_next_id();
51+
52+
// Create VM + heap for this thread (sets __VM)
53+
vm = vm_create(id, __RUNTIME->pool);
54+
vm->rc_sync = 1; // Always use atomic RC for custom threads
55+
56+
// Register in global registry
57+
mutex_lock(&__ctx_registry.lock);
58+
if (__ctx_registry.count >= MAX_CUSTOM_THREADS) {
59+
mutex_unlock(&__ctx_registry.lock);
60+
LOG_ERROR("ray_ctx_create: too many custom threads (max %d)", MAX_CUSTOM_THREADS);
61+
vm_destroy(vm);
62+
return NULL;
63+
}
64+
assert(__ctx_registry.count < MAX_CUSTOM_THREADS && "ray_ctx_create: too many custom threads");
65+
66+
ctx = (ray_ctx_p)mmap_alloc(sizeof(struct ray_ctx_t));
67+
if (ctx == NULL) {
68+
mutex_unlock(&__ctx_registry.lock);
69+
LOG_ERROR("ray_ctx_create: failed to allocate context");
70+
vm_destroy(vm);
71+
return NULL;
72+
}
73+
74+
ctx->id = id;
75+
ctx->heap = vm->heap;
76+
ctx->vm = vm;
77+
__ctx_registry.entries[__ctx_registry.count++] = ctx;
78+
mutex_unlock(&__ctx_registry.lock);
79+
80+
return ctx;
81+
}
82+
83+
// Destroy a context previously created with ray_ctx_create.
// MUST be called on the owning custom thread: tears down the thread-local
// VM, hands the thread's heap to the pending-merge queue (the main thread
// reclaims it via heap_drain_pending), and unregisters the context.
nil_t ray_ctx_destroy(ray_ctx_p ctx) {
    i64_t i;
    vm_p vm;
    heap_p heap;
    b8_t found = B8_FALSE;

    assert(__VM == ctx->vm && "ray_ctx_destroy: called from wrong thread");

    vm = ctx->vm;
    heap = ctx->heap;

    // Flush own foreign blocks
    heap_flush_foreign(heap);

    // Flush slab caches to freelists
    heap_flush_slabs(heap);

    // Clear VM stack objects
    while (vm->sp > 0)
        drop_obj(vm->ps[--vm->sp]);

    // Free VM-owned resources
    if (vm->timeit) {
        heap_free(vm->timeit);
        vm->timeit = NULL;
    }

    // Push heap to pending merge queue (main thread will drain it)
    heap_push_pending(heap);

    // Detach heap from VM before destroying VM (heap stays alive for main to drain)
    vm->heap = NULL;
    __VM = NULL;
    mmap_free(vm, sizeof(struct vm_t));

    // Unregister from global registry
    mutex_lock(&__ctx_registry.lock);
    for (i = 0; i < __ctx_registry.count; i++) {
        if (__ctx_registry.entries[i] == ctx) {
            // Swap with last entry
            __ctx_registry.entries[i] = __ctx_registry.entries[--__ctx_registry.count];
            found = B8_TRUE;
            break;
        }
    }
    mutex_unlock(&__ctx_registry.lock);

    if (!found)
        LOG_WARN("ray_ctx_destroy: context not found in registry (id=%lld)", ctx->id);

    // Fix: free the ctx struct even when it was missing from the registry —
    // the previous early return leaked it.
    mmap_free(ctx, sizeof(struct ray_ctx_t));
}

core/ctx.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#ifndef CTX_H
#define CTX_H

#include "rayforce.h"
#include "thread.h"

// Upper bound on concurrently registered custom-thread contexts.
#define MAX_CUSTOM_THREADS 64

struct heap_t;
struct vm_t;

// Per-custom-thread context: ties a heap id to the thread's heap and VM.
typedef struct ray_ctx_t {
    i64_t id;            // unique heap id (from heap_next_id)
    struct heap_t *heap; // thread-local heap; merged back on destroy
    struct vm_t *vm;     // thread-local VM; owns `heap` until destroy
} *ray_ctx_p;

// Fixed-size table of live custom-thread contexts, guarded by `lock`.
typedef struct ctx_registry_t {
    mutex_t lock;
    i64_t count;
    ray_ctx_p entries[MAX_CUSTOM_THREADS];
} ctx_registry_t;

// Global registry (initialized during runtime_create)
extern ctx_registry_t __ctx_registry;

nil_t ctx_registry_init(nil_t);
nil_t ctx_registry_destroy(nil_t);

// Public API — call ON the custom thread
ray_ctx_p ray_ctx_create(nil_t);
nil_t ray_ctx_destroy(ray_ctx_p ctx);

#endif // CTX_H

core/heap.c

Lines changed: 118 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,62 @@
3535
#include "eval.h"
3636
#include "error.h"
3737

38+
// Global heap ID bitmap (u16 range)
39+
#define HEAP_ID_WORDS 1024 // 1024 * 64 = 65536 IDs
40+
#define HEAP_ID_BITS (HEAP_ID_WORDS * 64ull)
41+
static u64_t __heap_id_bitmap[HEAP_ID_WORDS];
42+
static u64_t __heap_id_cursor = 0;
43+
44+
static i64_t heap_id_acquire(nil_t) {
45+
u64_t start = __atomic_fetch_add(&__heap_id_cursor, 1, __ATOMIC_RELAXED);
46+
for (u64_t off = 0; off < HEAP_ID_WORDS; off++) {
47+
u64_t idx = (start + off) % HEAP_ID_WORDS;
48+
u64_t word = __atomic_load_n(&__heap_id_bitmap[idx], __ATOMIC_RELAXED);
49+
if (~word == 0ull)
50+
continue;
51+
52+
for (;;) {
53+
u64_t free_bits = ~word;
54+
if (free_bits == 0ull)
55+
break;
56+
57+
u64_t bit = (u64_t)__builtin_ctzll(free_bits);
58+
u64_t mask = 1ull << bit;
59+
u64_t new_word = word | mask;
60+
61+
if (__atomic_compare_exchange_n(&__heap_id_bitmap[idx], &word, new_word, 0,
62+
__ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
63+
return (i64_t)(idx * 64ull + bit);
64+
}
65+
66+
if (~word == 0ull)
67+
break;
68+
}
69+
}
70+
71+
return -1;
72+
}
73+
74+
// Return a previously acquired heap id to the global bitmap.
// Out-of-range ids are ignored (defensive no-op).
static nil_t heap_id_release(i64_t id) {
    if (id >= 0 && id < (i64_t)HEAP_ID_BITS) {
        u64_t word_idx = (u64_t)id / 64ull;
        u64_t bit_idx = (u64_t)id % 64ull;
        // Clear the bit with release ordering so prior heap teardown is
        // visible before the id can be re-acquired.
        __atomic_fetch_and(&__heap_id_bitmap[word_idx], ~(1ull << bit_idx), __ATOMIC_RELEASE);
    }
}
83+
84+
// Pending merge queue head (lock-free LIFO).
// Heaps of destroyed custom threads are pushed here via heap_push_pending
// and later merged/destroyed by heap_drain_pending on another thread.
heap_p __heap_pending_merge = NULL;

// Allocate the next unique heap id from the global bitmap.
// Panics (process abort) when all HEAP_ID_BITS ids are in use.
i64_t heap_next_id(nil_t) {
    i64_t id = heap_id_acquire();
    if (UNLIKELY(id < 0))
        PANIC("heap id pool exhausted");
    return id;
}
93+
3894
// Slab cache helpers
3995
#define SLAB_ORDER_MIN MIN_BLOCK_ORDER
4096
#define SLAB_ORDER_MAX (MIN_BLOCK_ORDER + SLAB_ORDERS - 1)
@@ -103,7 +159,7 @@ heap_p heap_get(nil_t) {
103159

104160
#ifdef SYS_MALLOC
105161

106-
static nil_t heap_flush_slabs(heap_p heap) { UNUSED(heap); } // No-op for system malloc
162+
nil_t heap_flush_slabs(heap_p heap) { UNUSED(heap); } // No-op for system malloc
107163

108164
raw_p heap_alloc(i64_t size) { return malloc(size); }
109165
raw_p heap_mmap(i64_t size) { return mmap_alloc(size); }
@@ -117,6 +173,9 @@ nil_t heap_unmap(raw_p ptr, i64_t size) { mmap_free(ptr, size); }
117173
i64_t heap_gc(nil_t) { return 0; }
118174
nil_t heap_borrow(heap_p heap) { UNUSED(heap); }
119175
nil_t heap_merge(heap_p heap) { UNUSED(heap); }
176+
nil_t heap_flush_foreign(heap_p heap) { UNUSED(heap); }
177+
nil_t heap_push_pending(heap_p heap) { UNUSED(heap); }
178+
nil_t heap_drain_pending(nil_t) {}
120179
memstat_t heap_memstat(nil_t) { return (memstat_t){0}; }
121180

122181
#else
@@ -223,7 +282,7 @@ inline __attribute__((always_inline)) nil_t heap_split_block(heap_p heap, block_
223282
}
224283

225284
// Flush slab caches back to freelists for coalescing
226-
static nil_t heap_flush_slabs(heap_p heap) {
285+
nil_t heap_flush_slabs(heap_p heap) {
227286
i64_t i;
228287
block_p block;
229288

@@ -291,6 +350,14 @@ raw_p __attribute__((hot)) heap_alloc(i64_t size) {
291350
// no free block found for this size, so mmap it directly if it is bigger than pool size or
292351
// add a new pool and split as well
293352
if (UNLIKELY(i == 0)) {
353+
// Try reclaiming memory from pending heaps and foreign blocks before mmap
354+
heap_drain_pending();
355+
heap_flush_foreign(heap);
356+
i = (AVAIL_MASK << order) & heap->avail;
357+
358+
if (i != 0)
359+
goto found;
360+
294361
if (order >= MAX_BLOCK_ORDER) {
295362
LOG_TRACE("Adding pool of size %lld requested size %lld", BSIZEOF(order), size);
296363
size = BSIZEOF(order);
@@ -315,8 +382,10 @@ raw_p __attribute__((hot)) heap_alloc(i64_t size) {
315382

316383
i = MAX_BLOCK_ORDER;
317384
heap_insert_block(heap, block, i);
318-
} else
385+
} else {
386+
found:
319387
i = __builtin_ctzll(i);
388+
}
320389

321390
// remove the block out of list
322391
block = heap->freelist[i];
@@ -377,15 +446,15 @@ __attribute__((hot)) nil_t heap_free(raw_p ptr) {
377446

378447
// Fast path: push to slab cache for small blocks (same heap only)
379448
if (heap != NULL && order >= MIN_BLOCK_ORDER && IS_SLAB_ORDER(order) &&
380-
(heap->id == 0 || block->heap_id == heap->id)) {
449+
(block->heap_id == heap->id)) {
381450
i64_t idx = SLAB_INDEX(order);
382451
if (heap->slabs[idx].count < SLAB_CACHE_SIZE) {
383452
heap->slabs[idx].stack[heap->slabs[idx].count++] = block;
384453
return;
385454
}
386455
}
387456

388-
if (UNLIKELY(heap->id != 0 && block->heap_id != heap->id)) {
457+
if (UNLIKELY(block->heap_id != heap->id)) {
389458
block->next = heap->foreign_blocks;
390459
heap->foreign_blocks = block;
391460
return;
@@ -430,7 +499,7 @@ __attribute__((hot)) raw_p heap_realloc(raw_p ptr, i64_t new_size) {
430499
return ptr;
431500

432501
// grow or block is not in the same heap
433-
if (order > block->order || (heap->id != 0 && block->heap_id != heap->id) || block->backed) {
502+
if (order > block->order || (block->heap_id != heap->id) || block->backed) {
434503
new_ptr = heap_alloc(new_size);
435504

436505
if (new_ptr == NULL) {
@@ -463,6 +532,12 @@ i64_t heap_gc(nil_t) {
463532
block_p block, next;
464533
heap_p h = VM->heap; // Cache heap pointer
465534

535+
// Drain pending heaps from destroyed custom threads
536+
heap_drain_pending();
537+
538+
// Flush foreign blocks into own freelist
539+
heap_flush_foreign(h);
540+
466541
// Flush slab caches to allow coalescing
467542
heap_flush_slabs(h);
468543

@@ -615,16 +690,51 @@ nil_t heap_print_blocks(heap_p heap) {
615690
}
616691
}
617692

693+
// Adopt all blocks freed into this heap by other threads: stamp each
// foreign block with our heap id and move it onto our own freelists,
// then clear the foreign list.
nil_t heap_flush_foreign(heap_p heap) {
    block_p cur, nxt;

    for (cur = heap->foreign_blocks; cur != NULL; cur = nxt) {
        nxt = cur->next;
        cur->heap_id = heap->id;
        heap_insert_block(heap, cur, cur->order);
    }
    heap->foreign_blocks = NULL;
}
705+
706+
// Push `heap` onto the global pending-merge LIFO (lock-free stack push).
// Called by a thread giving up its heap; heap_drain_pending reclaims it.
// On CAS failure the builtin refreshes `heap->pending_next` with the
// current head, so the empty loop body just retries the exchange.
nil_t heap_push_pending(heap_p heap) {
    heap->pending_next = __atomic_load_n(&__heap_pending_merge, __ATOMIC_RELAXED);
    while (!__atomic_compare_exchange_n(&__heap_pending_merge, &heap->pending_next, heap,
                                        1, __ATOMIC_RELEASE, __ATOMIC_RELAXED))
        ;
}
712+
713+
nil_t heap_drain_pending(nil_t) {
714+
heap_p pending, next;
715+
716+
pending = __atomic_exchange_n(&__heap_pending_merge, NULL, __ATOMIC_ACQUIRE);
717+
while (pending) {
718+
next = pending->pending_next;
719+
heap_merge(pending);
720+
heap_destroy(pending);
721+
pending = next;
722+
}
723+
}
724+
618725
#endif
619726

620727
// heap_destroy defined after #ifdef blocks to use heap_flush_slabs
621728
nil_t heap_destroy(heap_p heap) {
622729
i64_t i;
623730
block_p block, next;
731+
i64_t heap_id;
624732

625733
if (heap == NULL)
626734
return;
627735

736+
heap_id = heap->id;
737+
628738
LOG_INFO("Destroying heap");
629739

630740
// Flush slab caches first
@@ -653,5 +763,7 @@ nil_t heap_destroy(heap_p heap) {
653763
// munmap heap
654764
mmap_free(heap, sizeof(struct heap_t));
655765

766+
heap_id_release(heap_id);
767+
656768
LOG_DEBUG("Heap destroyed successfully");
657769
}

0 commit comments

Comments
 (0)