Files
cell/source/scheduler_threaded.c
2025-12-06 14:54:00 -06:00

685 lines
19 KiB
C

#include <pthread.h>
#include <sys/time.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <stdatomic.h>
#include "stb_ds.h"
#include "cell.h"
#include "cell_internal.h"
#ifdef _WIN32
#include <windows.h>
#endif
/* Singly linked list node for the ready queues (engine.head / engine.main_head).
   A node is malloc'd when set_actor_state() enqueues an actor and is freed by
   the thread that pops it and runs the turn (actor_runner / actor_loop). */
typedef struct actor_node {
cell_rt *actor;
struct actor_node *next;
} actor_node;
/* Kind of entry stored in timer_heap. */
typedef enum {
TIMER_JS, // callback registered by actor_delay(); looked up in actor->timers when it fires
TIMER_NATIVE_REMOVE // "unneeded" timeout armed by set_actor_state(); fires actor_remove_cb()
} timer_type;
/* One pending timer. execute_at_ns is an absolute time on the cell_ns() clock. */
typedef struct {
uint64_t execute_at_ns;
cell_rt *actor;
uint32_t timer_id; // key into actor->timers (TIMER_JS) or value compared against actor->ar (TIMER_NATIVE_REMOVE)
timer_type type;
} timer_node;
/* Binary min-heap of pending timers ordered by execute_at_ns, stored as an
   stb_ds dynamic array. Accessed under engine.lock. */
static timer_node *timer_heap = NULL;
// --- Global engine state ---
/* Scheduler state shared by the timer thread, the worker pool and the main
   thread. */
static struct {
pthread_mutex_t lock; // Protects both ready queues, the timer heap and the waits below
pthread_cond_t wake_cond; // Wakes up workers
pthread_cond_t timer_cond; // Wakes up the timer thread
pthread_cond_t main_cond; // Wakes up the main thread
actor_node *head; // Ready Queue Head (worker threads)
actor_node *tail; // Ready Queue Tail
actor_node *main_head; // Main Thread Queue Head
actor_node *main_tail; // Main Thread Queue Tail
// Written under `lock`, but also read without it (actor_loop's outer loop
// check, actor_interrupt_cb). Atomic so those unlocked reads are not data races.
atomic_int shutting_down;
pthread_t *worker_threads;
int num_workers;
pthread_t timer_thread;
} engine;
// Registry of live actors keyed by actor id; accessed through actors_mutex.
static pthread_mutex_t *actors_mutex;
static struct { char *key; cell_rt *value; } *actors = NULL;
/* Mutex-guarded wrappers around stb_ds string-hash operations on NAME, locking
   NAME##_mutex around each call. Despite the "lockless_" prefix, these macros
   DO lock; the name means callers need not. The value-returning forms use GNU
   statement expressions. */
#define lockless_shdel(NAME, KEY) do { \
  pthread_mutex_lock(NAME##_mutex); \
  shdel(NAME, KEY); \
  pthread_mutex_unlock(NAME##_mutex); \
} while (0)
#define lockless_shlen(NAME) ({ \
  pthread_mutex_lock(NAME##_mutex); \
  size_t _len = shlen(NAME); \
  pthread_mutex_unlock(NAME##_mutex); \
  _len; \
})
#define lockless_shgeti(NAME, KEY) ({ \
  pthread_mutex_lock(NAME##_mutex); \
  int _idx = shgeti(NAME, KEY); \
  pthread_mutex_unlock(NAME##_mutex); \
  _idx; \
})
#define lockless_shget(NAME, KEY) ({ \
  pthread_mutex_lock(NAME##_mutex); \
  cell_rt *_actor = shget(NAME, KEY); \
  pthread_mutex_unlock(NAME##_mutex); \
  _actor; \
})
/* Insert KEY -> VALUE only if KEY is absent; evaluates to 1 if inserted, 0 if
   the key already existed. The check and the insert share one critical section. */
#define lockless_shput_unique(NAME, KEY, VALUE) ({ \
  pthread_mutex_lock(NAME##_mutex); \
  int _exists = shgeti(NAME, KEY) != -1; \
  if (!_exists) shput(NAME, KEY, VALUE); \
  pthread_mutex_unlock(NAME##_mutex); \
  !_exists; \
})
// Forward declarations
uint32_t actor_remove_cb(cell_rt *actor, uint32_t id, uint32_t interval);
void actor_turn(cell_rt *actor);
/* Insert a timer into timer_heap and restore the min-heap order by sifting the
   new entry towards the root. Callers in this file (set_actor_state,
   actor_delay) hold engine.lock while calling it. */
void heap_push(uint64_t when, cell_rt *actor, uint32_t timer_id, timer_type type) {
  timer_node entry = {
    .execute_at_ns = when,
    .actor = actor,
    .timer_id = timer_id,
    .type = type,
  };
  arrput(timer_heap, entry);

  int child = (int)arrlen(timer_heap) - 1;
  while (child > 0) {
    int up = (child - 1) / 2;
    // Stop once the parent is not later than the child.
    if (timer_heap[up].execute_at_ns <= timer_heap[child].execute_at_ns)
      break;
    timer_node swap = timer_heap[up];
    timer_heap[up] = timer_heap[child];
    timer_heap[child] = swap;
    child = up;
  }
}
/* Remove the earliest timer from timer_heap into *out, then move the former
   last element to the root and sift it down. Returns 1 when an entry was
   removed, 0 when the heap was empty (and *out is left untouched).
   Called by the timer thread with engine.lock held. */
int heap_pop(timer_node *out) {
  if (arrlen(timer_heap) == 0)
    return 0;

  *out = timer_heap[0];
  timer_node moved = arrpop(timer_heap);
  int size = (int)arrlen(timer_heap);
  if (size == 0)
    return 1;

  timer_heap[0] = moved;
  int pos = 0;
  for (;;) {
    int lhs = 2 * pos + 1;
    int rhs = lhs + 1;
    int best = pos;
    if (lhs < size && timer_heap[lhs].execute_at_ns < timer_heap[best].execute_at_ns)
      best = lhs;
    if (rhs < size && timer_heap[rhs].execute_at_ns < timer_heap[best].execute_at_ns)
      best = rhs;
    if (best == pos)
      break;
    timer_node swap = timer_heap[pos];
    timer_heap[pos] = timer_heap[best];
    timer_heap[best] = swap;
    pos = best;
  }
  return 1;
}
void *timer_thread_func(void *arg) {
while (1) {
pthread_mutex_lock(&engine.lock);
if (engine.shutting_down) {
pthread_mutex_unlock(&engine.lock);
return NULL;
}
if (arrlen(timer_heap) == 0) {
pthread_cond_wait(&engine.timer_cond, &engine.lock);
} else {
uint64_t now = cell_ns();
if (timer_heap[0].execute_at_ns <= now) {
// --- TIMER FIRED ---
timer_node t;
heap_pop(&t);
pthread_mutex_unlock(&engine.lock);
if (t.type == TIMER_NATIVE_REMOVE) {
// Execute native remove callback
actor_remove_cb(t.actor, t.timer_id, 0);
} else {
// Inject event into Actor
pthread_mutex_lock(t.actor->msg_mutex);
int idx = hmgeti(t.actor->timers, t.timer_id);
if (idx != -1) {
JSValue cb = t.actor->timers[idx].value;
hmdel(t.actor->timers, t.timer_id);
actor_clock(t.actor, cb);
JS_FreeValue(t.actor->context, cb);
}
pthread_mutex_unlock(t.actor->msg_mutex);
}
// Loop immediately to check for other expired timers
continue;
} else {
// --- WAIT FOR DEADLINE ---
struct timespec ts;
uint64_t ns = timer_heap[0].execute_at_ns;
ts.tv_sec = ns / 1000000000ULL;
ts.tv_nsec = ns % 1000000000ULL;
pthread_cond_timedwait(&engine.timer_cond, &engine.lock, &ts);
}
}
pthread_mutex_unlock(&engine.lock);
}
return NULL;
}
/* Worker thread body: repeatedly pops the head of the shared ready queue and
   runs one actor_turn() for it. During shutdown it keeps taking jobs until the
   queue is empty, then returns. */
void *actor_runner(void *arg) {
  (void)arg;
  for (;;) {
    pthread_mutex_lock(&engine.lock);
    while (!engine.shutting_down && engine.head == NULL)
      pthread_cond_wait(&engine.wake_cond, &engine.lock);

    actor_node *job = engine.head;
    if (job == NULL) {
      // Only reachable when shutting down with nothing left to run.
      pthread_mutex_unlock(&engine.lock);
      return NULL;
    }
    engine.head = job->next;
    if (!engine.head)
      engine.tail = NULL;
    pthread_mutex_unlock(&engine.lock);

    actor_turn(job->actor);
    free(job);
  }
}
/* Initialise the scheduler: the engine mutex and condition variables, the actor
   registry mutex, the timer thread, and one worker thread per online CPU (at
   least one). Aborts if the registry mutex or worker table cannot be allocated,
   or if the timer thread or every worker thread fails to start, because the
   scheduler cannot make progress without them. */
void actor_initialize(void) {
  pthread_mutex_init(&engine.lock, NULL);
  pthread_cond_init(&engine.wake_cond, NULL);
  pthread_cond_init(&engine.timer_cond, NULL);
  pthread_cond_init(&engine.main_cond, NULL);
  engine.shutting_down = 0;
  engine.head = NULL;
  engine.tail = NULL;
  engine.main_head = NULL;
  engine.main_tail = NULL;

  actors_mutex = malloc(sizeof *actors_mutex);
  if (!actors_mutex) {
    fprintf(stderr, "actor_initialize: out of memory\n");
    abort();
  }
  pthread_mutex_init(actors_mutex, NULL);

  // Start Timer Thread
  if (pthread_create(&engine.timer_thread, NULL, timer_thread_func, NULL) != 0) {
    fprintf(stderr, "actor_initialize: could not start timer thread\n");
    abort();
  }

  // Start Workers
#ifdef _WIN32
  SYSTEM_INFO sysinfo;
  GetSystemInfo(&sysinfo);
  long n = (long)sysinfo.dwNumberOfProcessors;
#else
  long n = sysconf(_SC_NPROCESSORS_ONLN);
#endif
  // sysconf() returns -1 on failure; always run at least one worker so queued
  // actors still get turns.
  if (n < 1)
    n = 1;

  engine.worker_threads = malloc(sizeof *engine.worker_threads * (size_t)n);
  if (!engine.worker_threads) {
    fprintf(stderr, "actor_initialize: out of memory\n");
    abort();
  }
  // Count only threads that actually started, so exit_handler joins valid handles.
  engine.num_workers = 0;
  for (long i = 0; i < n; i++) {
    if (pthread_create(&engine.worker_threads[engine.num_workers], NULL, actor_runner, NULL) == 0)
      engine.num_workers++;
  }
  if (engine.num_workers == 0) {
    fprintf(stderr, "actor_initialize: could not start any worker thread\n");
    abort();
  }
}
/* Tear down an actor. It is removed from the registry, then every JS value,
   timer callback and queued letter it owns is released. Its JS context and
   runtime, both mutexes and (with HAVE_MIMALLOC) its heap are destroyed, and
   the struct is freed. The process exits with status 0 when no registered
   actors remain.

   NOTE(review): ready-queue nodes and timer_heap entries that still point at
   this actor are not removed here; later consumers rely on the disrupt flag
   and may still dereference the freed actor. */
void actor_free(cell_rt *actor)
{
  lockless_shdel(actors, actor->id);

  // Wait until the actor is completely free. Lock in the same order as
  // actor_turn() (actor->mutex, then actor->msg_mutex): the reverse order can
  // deadlock against a turn that holds actor->mutex and then takes msg_mutex.
  pthread_mutex_lock(actor->mutex);
  pthread_mutex_lock(actor->msg_mutex);

  JSContext *js = actor->context;
  JS_FreeValue(js, actor->idx_buffer);
  JS_FreeValue(js, actor->message_handle);
  JS_FreeValue(js, actor->on_exception);
  JS_FreeValue(js, actor->unneeded);
  JS_FreeAtom(js, actor->actor_sym);

  for (int i = 0; i < hmlen(actor->timers); i++) {
    JS_FreeValue(js, actor->timers[i].value);
  }
  hmfree(actor->timers);

  /* Free all letters in the queue */
  for (int i = 0; i < arrlen(actor->letters); i++) {
    if (actor->letters[i].type == LETTER_BLOB) {
      blob_destroy(actor->letters[i].blob_data);
    } else if (actor->letters[i].type == LETTER_CALLBACK) {
      JS_FreeValue(js, actor->letters[i].callback);
    }
  }
  arrfree(actor->letters);

  JSRuntime *rt = JS_GetRuntime(js);
  JS_SetInterruptHandler(rt, NULL, NULL);
  JS_FreeContext(js);
  JS_FreeRuntime(rt);
  free(actor->id);

  // Release in reverse acquisition order; drop our lock on each mutex before
  // destroying and freeing it.
  pthread_mutex_unlock(actor->msg_mutex);
  pthread_mutex_destroy(actor->msg_mutex);
  free(actor->msg_mutex);
  pthread_mutex_unlock(actor->mutex);
  pthread_mutex_destroy(actor->mutex);
  free(actor->mutex);
#ifdef HAVE_MIMALLOC
  mi_heap_destroy(actor->heap);
#endif
  free(actor);

  int actor_count = lockless_shlen(actors);
  if (actor_count == 0) exit(0);
}
/* Stop the scheduler and terminate the process. Sets the shutdown flag, wakes
   every thread that may be waiting on an engine condition, joins the timer
   thread and all workers, releases engine resources, then calls exit(0). */
void exit_handler(void) {
  pthread_mutex_lock(&engine.lock);
  engine.shutting_down = 1;
  // Wake all sleepers so each one re-checks the flag.
  pthread_cond_broadcast(&engine.wake_cond);
  pthread_cond_broadcast(&engine.timer_cond);
  pthread_cond_broadcast(&engine.main_cond);
  pthread_mutex_unlock(&engine.lock);

  pthread_join(engine.timer_thread, NULL);
  for (int w = 0; w < engine.num_workers; ++w)
    pthread_join(engine.worker_threads[w], NULL);
  free(engine.worker_threads);

  // No other engine thread is left; tear down the synchronisation objects.
  pthread_mutex_destroy(&engine.lock);
  pthread_cond_destroy(&engine.wake_cond);
  pthread_cond_destroy(&engine.timer_cond);
  pthread_cond_destroy(&engine.main_cond);

  pthread_mutex_destroy(actors_mutex);
  free(actors_mutex);
  arrfree(timer_heap);
  exit(0);
}
/* Returns 1 when an actor is registered under `id`, 0 otherwise. */
int actor_exists(const char *id)
{
  return lockless_shgeti(actors, id) != -1;
}
/* Re-evaluate an actor's scheduling after something changed (new letter,
   finished turn, new unneeded handler, ...):
   - a disrupted actor is freed immediately;
   - a RUNNING or READY actor only has any pending unneeded timer invalidated
     (actor_remove_cb ignores timers whose id no longer matches actor->ar);
   - an IDLE actor with letters is marked READY and appended to the main-thread
     queue (main_thread_only) or the worker queue, waking one waiter;
   - an IDLE actor with no letters and no JS timers gets a TIMER_NATIVE_REMOVE
     timer armed actor->ar_secs seconds from now.
   The state check runs under actor->msg_mutex; engine.lock is taken to touch
   the queues and the timer heap. */
void set_actor_state(cell_rt *actor)
{
  if (actor->disrupt) {
    actor_free(actor);
    return;
  }

  pthread_mutex_lock(actor->msg_mutex);
  switch (actor->state) {
  case ACTOR_RUNNING:
  case ACTOR_READY:
    if (actor->ar)
      actor->ar = 0;
    break;

  case ACTOR_IDLE:
    if (arrlen(actor->letters)) {
      actor_node *n = malloc(sizeof *n);
      if (!n) {
        // Out of memory: leave the actor idle with its letters queued; a later
        // set_actor_state() call (e.g. from send_message) may retry.
        break;
      }
      actor->state = ACTOR_READY;
      actor->ar = 0;
      n->actor = actor;
      n->next = NULL;

      pthread_mutex_lock(&engine.lock);
      if (actor->main_thread_only) {
        if (engine.main_tail) {
          engine.main_tail->next = n;
        } else {
          engine.main_head = n;
        }
        engine.main_tail = n;
        pthread_cond_signal(&engine.main_cond);
      } else {
        if (engine.tail) {
          engine.tail->next = n;
        } else {
          engine.head = n;
        }
        engine.tail = n;
        pthread_cond_signal(&engine.wake_cond);
      }
      pthread_mutex_unlock(&engine.lock);
    } else if (!hmlen(actor->timers)) {
      // Schedule remove timer.
      // Different actors reach this concurrently under different msg_mutexes,
      // so the counter must be atomic. 0 is never handed out: actor->ar == 0
      // means "no pending timer" and actor_remove_cb treats id 0 as forced.
      static _Atomic uint32_t next_remove_id = 1;
      uint32_t id;
      do {
        id = atomic_fetch_add(&next_remove_id, 1);
      } while (id == 0);
      actor->ar = id;

      // Converting a negative, NaN or out-of-range double to uint64_t is
      // undefined behaviour, so clamp the delay first.
      static const double max_delay_secs = 1e10; // ~317 years
      double secs = actor->ar_secs;
      if (!(secs > 0))
        secs = 0;
      else if (secs > max_delay_secs)
        secs = max_delay_secs;
      uint64_t execute_at = cell_ns() + (uint64_t)(secs * 1e9);

      pthread_mutex_lock(&engine.lock);
      heap_push(execute_at, actor, id, TIMER_NATIVE_REMOVE);
      // Wake the timer thread only if this became the earliest deadline.
      if (timer_heap[0].timer_id == id) {
        pthread_cond_signal(&engine.timer_cond);
      }
      pthread_mutex_unlock(&engine.lock);
    }
    break;

  default:
    break;
  }
  pthread_mutex_unlock(actor->msg_mutex);
}
/* Runs when an actor's "unneeded" timeout elapses (called by the timer thread).
   The timer is stale, and nothing happens, unless `id` still equals actor->ar;
   an id of 0 forces removal regardless. Otherwise the actor is marked
   disrupted and its unneeded handler, if set, is called. If the actor is idle
   it is freed right away; otherwise freeing is left to the code that next sees
   the disrupt flag (actor_turn / set_actor_state). `interval` is unused.
   Always returns 0. */
uint32_t actor_remove_cb(cell_rt *actor, uint32_t id, uint32_t interval)
{
  (void)interval;
  pthread_mutex_lock(actor->mutex);

  int stale = (id != 0 && actor->ar != id);
  if (stale) {
    pthread_mutex_unlock(actor->mutex);
    return 0;
  }

  actor->disrupt = 1;
  if (!JS_IsNull(actor->unneeded)) {
    JSValue rv = JS_Call(actor->context, actor->unneeded, JS_NULL, 0, NULL);
    uncaught_exception(actor->context, rv);
  }

  int idle_now = (actor->state == ACTOR_IDLE);
  pthread_mutex_unlock(actor->mutex);
  if (idle_now)
    actor_free(actor);
  return 0;
}
/* Install or clear the actor's "unneeded" handler. A function `fn` is stored
   (with its own reference) together with the `seconds` timeout. Anything else
   clears the handler and leaves ar_secs unchanged. In both cases any pending
   unneeded timer is invalidated and scheduling is re-evaluated via
   set_actor_state(). Does nothing once the actor is disrupted. */
void actor_unneeded(cell_rt *actor, JSValue fn, double seconds)
{
  if (actor->disrupt)
    return;

  JSContext *ctx = actor->context;
  JS_FreeValue(ctx, actor->unneeded);
  if (JS_IsFunction(ctx, fn)) {
    actor->unneeded = JS_DupValue(ctx, fn);
    actor->ar_secs = seconds;
  } else {
    actor->unneeded = JS_NULL;
  }

  if (actor->ar)
    actor->ar = 0;
  set_actor_state(actor);
}
cell_rt *get_actor(char *id)
{
int idx = lockless_shgeti(actors, id);
if (idx == -1) {
return NULL;
}
return lockless_shget(actors, id);
}
/* Main-thread scheduler loop. Runs turns for actors queued on the main-thread
   queue (engine.main_head, used for main_thread_only actors) and returns once
   engine.shutting_down has been set. */
void actor_loop()
{
  while (!engine.shutting_down) {
    pthread_mutex_lock(&engine.lock);
    while (!engine.shutting_down && engine.main_head == NULL)
      pthread_cond_wait(&engine.main_cond, &engine.lock);

    actor_node *job = engine.main_head;
    if (job == NULL) {
      // Shutting down and nothing is queued for the main thread.
      pthread_mutex_unlock(&engine.lock);
      return;
    }
    engine.main_head = job->next;
    if (!engine.main_head)
      engine.main_tail = NULL;
    pthread_mutex_unlock(&engine.lock);

    actor_turn(job->actor);
    free(job);
  }
}
/* Allocate and initialise a new actor around the init payload `wota`.
   JS handle fields start as JS_NULL / JS_ATOM_NULL, the letter queue gets an
   initial capacity of 5, and both per-actor mutexes are recursive.
   script_startup() runs under actor->mutex, and the actor's scheduling state
   is evaluated afterwards. Returns NULL if the actor struct cannot be allocated.
   NOTE(review): if script_startup() leaves the actor disrupted,
   set_actor_state() frees it and the returned pointer dangles. */
cell_rt *create_actor(void *wota)
{
  cell_rt *actor = calloc(1, sizeof *actor);
  if (!actor)
    return NULL;
#ifdef HAVE_MIMALLOC
  actor->heap = mi_heap_new();
#endif
  actor->init_wota = wota;
  actor->idx_buffer = JS_NULL;
  actor->message_handle = JS_NULL;
  actor->unneeded = JS_NULL;
  actor->on_exception = JS_NULL;
  actor->actor_sym = JS_ATOM_NULL;
  arrsetcap(actor->letters, 5);

  pthread_mutexattr_t attr;
  pthread_mutexattr_init(&attr);
  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
  actor->mutex = malloc(sizeof(pthread_mutex_t));
  pthread_mutex_init(actor->mutex, &attr);
  actor->msg_mutex = malloc(sizeof(pthread_mutex_t));
  pthread_mutex_init(actor->msg_mutex, &attr); // msg_mutex is recursive too
  pthread_mutexattr_destroy(&attr);

  /* Lock actor->mutex while initializing JS runtime. */
  pthread_mutex_lock(actor->mutex);
  script_startup(actor);
  pthread_mutex_unlock(actor->mutex);

  // Evaluate scheduling only after releasing actor->mutex. A disrupted actor
  // is freed by set_actor_state() (mutex included), which must not happen
  // while this function still holds that mutex.
  set_actor_state(actor);
  return actor;
}
/* Register `actor` under `id` and record its scheduling options: main-thread
   affinity and the unneeded timeout `ar` in seconds.
   The registry key is the actor's own strdup'd copy (actor->id), not the
   caller's `id` buffer. stb_ds string maps created without sh_new_strdup keep
   the key pointer, so the key must live as long as the entry. actor_free()
   removes the entry before freeing actor->id.
   Returns NULL on success, or a static error message on failure. */
const char *register_actor(const char *id, cell_rt *actor, int mainthread, double ar)
{
  actor->main_thread_only = mainthread;
  actor->id = strdup(id);
  if (!actor->id)
    return "Could not allocate actor id.";
  actor->ar_secs = ar;

  int added = lockless_shput_unique(actors, actor->id, actor);
  if (!added) {
    free(actor->id);
    actor->id = NULL; // do not leave a dangling id behind
    return "Actor with given ID already exists.";
  }
  return NULL;
}
/* Interrupt poll for a running script. Returns 1 once the scheduler is
   shutting down or this actor has been disrupted, 0 otherwise.
   Presumably installed as the JS runtime's interrupt handler (a non-zero
   result requests interruption); confirm at the registration site.
   engine.shutting_down is read without engine.lock to keep this frequently
   called path cheap. `rt` is unused. */
int actor_interrupt_cb(JSRuntime *rt, cell_rt *crt)
{
  (void)rt;
  if (engine.shutting_down)
    return 1;
  return crt->disrupt ? 1 : 0;
}
const char *send_message(const char *id, void *msg)
{
cell_rt *target = get_actor(id);
if (!target) {
blob_destroy((blob *)msg);
return "Could not get actor from id.";
}
letter l;
l.type = LETTER_BLOB;
l.blob_data = (blob *)msg;
pthread_mutex_lock(target->msg_mutex);
arrput(target->letters, l);
pthread_mutex_unlock(target->msg_mutex);
if (target->ar)
target->ar = 0;
set_actor_state(target);
return NULL;
}
void actor_turn(cell_rt *actor)
{
pthread_mutex_lock(actor->mutex);
#ifdef TRACY_ENABLE
int entered = 0;
if (tracy_profiling_enabled && TracyCIsConnected) {
TracyCFiberEnter(actor->name);
entered = 1;
}
#endif
actor->state = ACTOR_RUNNING;
TAKETURN:
pthread_mutex_lock(actor->msg_mutex);
JSValue result;
if (!arrlen(actor->letters)) {
pthread_mutex_unlock(actor->msg_mutex);
goto ENDTURN;
}
letter l = actor->letters[0];
arrdel(actor->letters, 0); // O(N) but we kept array as requested
pthread_mutex_unlock(actor->msg_mutex);
if (l.type == LETTER_BLOB) {
// Create a JS blob from the C blob
size_t size = blob_length(l.blob_data) / 8; // Convert bits to bytes
JSValue arg = js_new_blob_stoned_copy(actor->context, (void*)blob_data(l.blob_data), size);
blob_destroy(l.blob_data);
result = JS_Call(actor->context, actor->message_handle, JS_NULL, 1, &arg);
uncaught_exception(actor->context, result);
JS_FreeValue(actor->context, arg);
} else if (l.type == LETTER_CALLBACK) {
result = JS_Call(actor->context, l.callback, JS_NULL, 0, NULL);
uncaught_exception(actor->context, result);
JS_FreeValue(actor->context, l.callback);
}
if (actor->disrupt) goto ENDTURN;
// Check if anyone else is waiting?
// In the new system, checking the global queue is expensive (lock).
// And "someone else waiting" logic was to yield.
// With threads, we don't need to yield as much, let the OS schedule.
// But we might want to prevent one actor hogging the worker?
// For now, remove the yield check or implement it with try_lock or simple check.
// int someone_else_waiting = (lockless_arrlen(ready_queue) > 0);
// We'll just remove the optimization/yield for now to simplify.
// if (!someone_else_waiting) goto TAKETURN;
ENDTURN:
actor->state = ACTOR_IDLE;
#ifdef TRACY_ENABLE
if (tracy_profiling_enabled && entered)
TracyCFiberLeave(actor->name);
#endif
set_actor_state(actor);
pthread_mutex_unlock(actor->mutex);
}
/* Queue `fn` to run on the actor's next turn as a LETTER_CALLBACK letter.
   The letter holds its own reference to fn; the caller keeps its reference.
   Scheduling is re-evaluated afterwards. */
void actor_clock(cell_rt *actor, JSValue fn)
{
  letter job;
  job.type = LETTER_CALLBACK;
  job.callback = JS_DupValue(actor->context, fn);

  pthread_mutex_lock(actor->msg_mutex);
  arrput(actor->letters, job);
  pthread_mutex_unlock(actor->msg_mutex);

  set_actor_state(actor);
}
/* Schedule `fn` to run on the actor after `seconds`. A new reference to fn is
   stored in actor->timers under a fresh id, and a TIMER_JS entry is pushed onto
   timer_heap. When it is due, the timer thread queues the callback as a letter
   for the actor. Returns the timer id (never 0), which actor_remove_timer()
   accepts. */
uint32_t actor_delay(cell_rt *actor, JSValue fn, double seconds)
{
  // Shared by all actors. Actors on different threads hold different
  // msg_mutexes, so the counter must be atomic to avoid a data race.
  // 0 is never handed out (the counter starts at 1); skip it on wrap-around.
  static _Atomic uint32_t next_timer_id = 1;
  uint32_t id;
  do {
    id = atomic_fetch_add(&next_timer_id, 1);
  } while (id == 0);

  // Converting a negative, NaN or out-of-range double to uint64_t is
  // undefined behaviour, so clamp the delay first.
  static const double max_delay_secs = 1e10; // ~317 years
  if (!(seconds > 0))
    seconds = 0;
  else if (seconds > max_delay_secs)
    seconds = max_delay_secs;
  uint64_t execute_at = cell_ns() + (uint64_t)(seconds * 1e9);

  pthread_mutex_lock(actor->msg_mutex);
  JSValue cb = JS_DupValue(actor->context, fn);
  hmput(actor->timers, id, cb);

  pthread_mutex_lock(&engine.lock);
  heap_push(execute_at, actor, id, TIMER_JS);
  // Wake the timer thread only if this became the earliest deadline.
  if (timer_heap[0].timer_id == id) {
    pthread_cond_signal(&engine.timer_cond);
  }
  pthread_mutex_unlock(&engine.lock);
  pthread_mutex_unlock(actor->msg_mutex);
  return id;
}
/* Cancel a JS timer. Its callback is detached from actor->timers and returned,
   with ownership passing to the caller; JS_NULL is returned if the id is
   unknown. The matching timer_heap entry is left in place: when it fires, the
   timer thread finds no callback for the id and does nothing. */
JSValue actor_remove_timer(cell_rt *actor, uint32_t timer_id)
{
  JSValue detached = JS_NULL;

  pthread_mutex_lock(actor->msg_mutex);
  int slot = hmgeti(actor->timers, timer_id);
  if (slot >= 0) {
    detached = actor->timers[slot].value;
    hmdel(actor->timers, timer_id);
  }
  pthread_mutex_unlock(actor->msg_mutex);

  return detached;
}