/*
 * cell/source/scheduler.c — actor scheduler: worker-thread run queues,
 * a global timer min-heap, and a poll()-based I/O watch thread.
 * (repository listing: 2026-02-26 00:56:43 -06:00, 1095 lines, 31 KiB, C)
 */
#include <pthread.h>
#include <sys/time.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <stdatomic.h>
#include <poll.h>
#include <fcntl.h>
#include "stb_ds.h"
#include "cell.h"
#include "pit_internal.h"
#ifdef _WIN32
#include <windows.h>
#include <winsock2.h>
#endif
/* Singly-linked list node for the scheduler's run queues; each entry
 * references one actor that is waiting for a turn. Nodes are heap-allocated
 * at enqueue time and freed by whichever thread dequeues them. */
typedef struct actor_node {
JSContext *actor;        /* the actor awaiting execution */
struct actor_node *next; /* next node in the same priority queue */
} actor_node;
/* Kinds of entries in the global timer min-heap. */
typedef enum {
TIMER_JS,            /* fire a JS callback registered via actor_delay() */
TIMER_NATIVE_REMOVE, /* idle-actor auto-remove deadline (actor_remove_cb) */
TIMER_PAUSE,         /* watchdog: pause a turn that ran past the fast limit */
TIMER_KILL           /* watchdog: kill a turn that ran past the slow limit */
} timer_type;
/* One scheduled deadline in the min-heap, ordered by execute_at_ns. */
typedef struct {
uint64_t execute_at_ns; /* absolute deadline on cell_ns()'s clock */
JSContext *actor;       /* actor the timer belongs to */
uint32_t timer_id;      /* JS timer id or auto-remove token (0 for watchdogs) */
timer_type type;
uint32_t turn_gen; /* generation at registration time */
} timer_node;
/* Global binary min-heap (stb_ds dynamic array) keyed on execute_at_ns.
 * All reads and writes happen under engine.lock. */
static timer_node *timer_heap = NULL;
// Priority queue indices
#define PQ_READY 0     /* actors with fresh mail */
#define PQ_REFRESHED 1
#define PQ_EXHAUSTED 2
#define PQ_SLOW 3      /* actors whose VM is suspended mid-turn */
#define PQ_COUNT 4
// --- 3. The Global Engine State ---
static struct {
pthread_mutex_t lock; // Protects queue, shutdown flag, and timers
pthread_cond_t wake_cond; // Wakes up workers
pthread_cond_t timer_cond; // Wakes up the timer thread
pthread_cond_t main_cond; // Wakes up the main thread
actor_node *q_head[PQ_COUNT], *q_tail[PQ_COUNT]; // worker queues
actor_node *mq_head[PQ_COUNT], *mq_tail[PQ_COUNT]; // main-thread queues
int shutting_down;        // set once; observed by all helper threads
int quiescence_enabled; // set after bootstrap, before actor_loop
_Atomic int quiescent_count; // actors idle with no messages and no timers
pthread_t *worker_threads; // one per CPU, running actor_runner()
int num_workers;
pthread_t timer_thread;    // runs timer_thread_func()
} engine;
/* Map an actor scheduling state to the index of the run queue it belongs
 * in.  Any unrecognized state lands in the READY queue. */
static int state_to_pq(int state) {
    if (state == ACTOR_REFRESHED)
        return PQ_REFRESHED;
    if (state == ACTOR_EXHAUSTED)
        return PQ_EXHAUSTED;
    if (state == ACTOR_SLOW)
        return PQ_SLOW;
    return PQ_READY;
}
/* Detach and return the head node of the highest-priority non-empty queue,
 * or NULL if every queue is empty.  Caller must hold engine.lock and is
 * responsible for freeing the returned node. */
static actor_node *dequeue_priority(actor_node *heads[], actor_node *tails[]) {
    for (int pq = 0; pq < PQ_COUNT; pq++) {
        actor_node *head = heads[pq];
        if (head == NULL)
            continue;
        heads[pq] = head->next;
        if (heads[pq] == NULL)
            tails[pq] = NULL; /* queue drained: tail must follow */
        return head;
    }
    return NULL;
}
/* Non-zero iff at least one of the PQ_COUNT queues has a pending node.
 * Caller must hold engine.lock. */
static int has_any_work(actor_node *heads[]) {
    int found = 0;
    for (int pq = 0; pq < PQ_COUNT && !found; pq++)
        found = (heads[pq] != NULL);
    return found;
}
/* Global actor registry: id → JSContext map (stb_ds string hash),
 * accessed from every thread via the lockless_* locked wrappers below. */
static pthread_mutex_t *actors_mutex;
static struct { char *key; JSContext *value; } *actors = NULL;
/* ============================================================
I/O Watch Thread — poll()-based fd monitoring
============================================================ */
static io_watch *g_io_watches = NULL; /* stb_ds dynamic array */
static pthread_mutex_t io_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t io_thread;
static int io_pipe[2] = {-1, -1}; /* self-pipe to wake poll() */
/* Nudge the I/O thread out of poll() by writing one byte to the self-pipe. */
static void io_wake(void) {
    unsigned char token = 1;
    ssize_t rc = write(io_pipe[1], &token, sizeof token);
    (void)rc; /* pipe is non-blocking; a full pipe already implies a pending wakeup */
}
/* I/O watch thread: snapshots the watch list into a pollfd array, polls
 * with a 500ms timeout (so shutdown is noticed), and fires each ready
 * watch exactly once (one-shot) by enqueueing its callback via actor_clock.
 *
 * Fixes vs. the original:
 *  - malloc() result is checked (OOM no longer dereferences NULL);
 *  - POLLERR/POLLHUP/POLLNVAL now count as "ready": poll() reports these
 *    regardless of the requested events, and ignoring them left dead fds
 *    in the watch list spinning the loop forever;
 *  - after re-taking io_mutex the slot is re-verified by fd, since the
 *    list can change while the lock is dropped to run a callback. */
static void *io_thread_func(void *arg) {
    (void)arg;
    while (1) {
        pthread_mutex_lock(&io_mutex);
        if (engine.shutting_down) {
            pthread_mutex_unlock(&io_mutex);
            return NULL;
        }
        int n = arrlen(g_io_watches);
        /* +1 for the wakeup pipe in slot 0 */
        struct pollfd *fds = malloc(sizeof(struct pollfd) * (n + 1));
        if (!fds) {
            /* Out of memory: back off briefly rather than crash. */
            pthread_mutex_unlock(&io_mutex);
            poll(NULL, 0, 100);
            continue;
        }
        fds[0].fd = io_pipe[0];
        fds[0].events = POLLIN;
        for (int i = 0; i < n; i++) {
            fds[i + 1].fd = g_io_watches[i].fd;
            fds[i + 1].events = g_io_watches[i].events;
        }
        pthread_mutex_unlock(&io_mutex);
        int ready = poll(fds, n + 1, 500); /* 500ms timeout for shutdown check */
        if (ready <= 0) {
            free(fds);
            continue;
        }
        /* Drain wakeup pipe */
        if (fds[0].revents & POLLIN) {
            char buf[64];
            (void)read(io_pipe[0], buf, sizeof(buf));
        }
        /* Fire callbacks for ready fds.  Iterate high-to-low so arrdel()
           (which shifts later elements down) cannot disturb indices we
           have not visited yet. */
        pthread_mutex_lock(&io_mutex);
        for (int i = n - 1; i >= 0; i--) {
            if (i >= arrlen(g_io_watches)) continue;
            /* The list may have changed while io_mutex was released; only
               trust revents if slot i still refers to the fd we polled. */
            if (g_io_watches[i].fd != fds[i + 1].fd) continue;
            short fired = fds[i + 1].revents;
            /* Error conditions are delivered unconditionally by poll();
               fire the watch so the one-shot entry is removed and the
               callback can observe the failure. */
            if (fired & (g_io_watches[i].events | POLLERR | POLLHUP | POLLNVAL)) {
                io_watch w = g_io_watches[i];
                arrdel(g_io_watches, i); /* one-shot: remove before firing */
                pthread_mutex_unlock(&io_mutex);
                actor_clock(w.actor, w.callback);
                pthread_mutex_lock(&io_mutex);
            }
        }
        pthread_mutex_unlock(&io_mutex);
        free(fds);
    }
    return NULL;
}
/* Register a one-shot watch: when fd becomes ready for `events`, fn is
 * enqueued as a callback letter on the actor and the watch is removed. */
void actor_watch(JSContext *actor, int fd, short events, JSValue fn) {
    io_watch watch;
    watch.fd = fd;
    watch.events = events;
    watch.actor = actor;
    watch.callback = fn;
    pthread_mutex_lock(&io_mutex);
    arrput(g_io_watches, watch);
    pthread_mutex_unlock(&io_mutex);
    io_wake(); /* re-enter poll() with the new fd in the set */
}
/* Convenience wrapper: one-shot watch for readability (POLLIN). */
void actor_watch_readable(JSContext *actor, int fd, JSValue fn) {
    actor_watch(actor, fd, POLLIN, fn);
}
/* Convenience wrapper: one-shot watch for writability (POLLOUT). */
void actor_watch_writable(JSContext *actor, int fd, JSValue fn) {
    actor_watch(actor, fd, POLLOUT, fn);
}
/* Remove every watch this actor holds on fd, then wake the I/O thread so
 * its next poll() set no longer includes the fd. */
void actor_unwatch(JSContext *actor, int fd) {
    pthread_mutex_lock(&io_mutex);
    int i = arrlen(g_io_watches);
    while (i-- > 0) { /* walk downward: arrdel shifts only higher indices */
        if (g_io_watches[i].actor == actor && g_io_watches[i].fd == fd)
            arrdel(g_io_watches, i);
    }
    pthread_mutex_unlock(&io_mutex);
    io_wake();
}
/* Locked wrappers around stb_ds string-hash operations on a shared map.
   NOTE(review): despite the "lockless_" prefix these all take NAME##_mutex;
   the name reads as "lock-free" but means "locks for you" — consider renaming.
   The value-returning macros use GNU statement expressions. */
/* NOTE(review): multi-statement macro without do{}while(0) — unsafe in an
   unbraced if/else; all current call sites use it as a plain statement. */
#define lockless_shdel(NAME, KEY) pthread_mutex_lock(NAME##_mutex); shdel(NAME, KEY); pthread_mutex_unlock(NAME##_mutex);
/* Number of entries in the map, read under the lock. */
#define lockless_shlen(NAME) ({ \
pthread_mutex_lock(NAME##_mutex); \
size_t _len = shlen(NAME); \
pthread_mutex_unlock(NAME##_mutex); \
_len; \
})
/* Index of KEY in the map, or -1 if absent, read under the lock. */
#define lockless_shgeti(NAME, KEY) ({ \
pthread_mutex_lock(NAME##_mutex); \
int _idx = shgeti(NAME, KEY); \
pthread_mutex_unlock(NAME##_mutex); \
_idx; \
})
/* Value stored for KEY (map default when absent), read under the lock. */
#define lockless_shget(NAME, KEY) ({ \
pthread_mutex_lock(NAME##_mutex); \
JSContext *_actor = shget(NAME, KEY); \
pthread_mutex_unlock(NAME##_mutex); \
_actor; \
})
/* Insert KEY→VALUE only if KEY is absent; evaluates to 1 when inserted,
   0 when a duplicate already existed.  Check-and-insert is atomic under
   the lock. */
#define lockless_shput_unique(NAME, KEY, VALUE) ({ \
pthread_mutex_lock(NAME##_mutex); \
int _exists = shgeti(NAME, KEY) != -1; \
if (!_exists) shput(NAME, KEY, VALUE); \
pthread_mutex_unlock(NAME##_mutex); \
!_exists; \
})
// Forward declarations
uint32_t actor_remove_cb(JSContext *actor, uint32_t id, uint32_t interval);
void actor_turn(JSContext *actor);
/* Insert a deadline into the global timer min-heap and sift it up.
 * For watchdog timers (PAUSE/KILL) the actor's current turn generation is
 * recorded so a stale watchdog can be ignored when it fires.
 * Call sites take engine.lock around this call. */
void heap_push(uint64_t when, JSContext *actor, uint32_t timer_id, timer_type type) {
    timer_node node;
    node.execute_at_ns = when;
    node.actor = actor;
    node.timer_id = timer_id;
    node.type = type;
    node.turn_gen = 0;
    if (type == TIMER_PAUSE || type == TIMER_KILL)
        node.turn_gen = atomic_load_explicit(&actor->turn_gen, memory_order_relaxed);
    arrput(timer_heap, node);
    /* Sift the new entry up until the min-heap property holds. */
    int child = arrlen(timer_heap) - 1;
    while (child > 0) {
        int parent = (child - 1) / 2;
        if (timer_heap[child].execute_at_ns >= timer_heap[parent].execute_at_ns)
            break;
        timer_node tmp = timer_heap[parent];
        timer_heap[parent] = timer_heap[child];
        timer_heap[child] = tmp;
        child = parent;
    }
}
/* Pop the minimum-deadline node into *out.  Returns 0 when the heap is
 * empty, 1 on success.  Call sites hold engine.lock. */
int heap_pop(timer_node *out) {
    int count = arrlen(timer_heap);
    if (count == 0)
        return 0;
    *out = timer_heap[0];
    timer_node tail = arrpop(timer_heap);
    count = arrlen(timer_heap);
    if (count > 0) {
        /* Move the old last element to the root and sift it down. */
        timer_heap[0] = tail;
        int pos = 0;
        for (;;) {
            int l = 2 * pos + 1;
            int r = l + 1;
            int min = pos;
            if (l < count && timer_heap[l].execute_at_ns < timer_heap[min].execute_at_ns)
                min = l;
            if (r < count && timer_heap[r].execute_at_ns < timer_heap[min].execute_at_ns)
                min = r;
            if (min == pos)
                break;
            timer_node swap = timer_heap[pos];
            timer_heap[pos] = timer_heap[min];
            timer_heap[min] = swap;
            pos = min;
        }
    }
    return 1;
}
/* Dedicated timer thread: sleeps until the soonest heap deadline, then
 * dispatches the fired node by type.  engine.lock is held only while the
 * heap is inspected; it is dropped before any callback work so workers
 * never block behind timer dispatch. */
void *timer_thread_func(void *arg) {
    while (1) {
        pthread_mutex_lock(&engine.lock);
        if (engine.shutting_down) {
            pthread_mutex_unlock(&engine.lock);
            return NULL;
        }
        if (arrlen(timer_heap) == 0) {
            /* Nothing scheduled: sleep until heap_push() signals timer_cond. */
            pthread_cond_wait(&engine.timer_cond, &engine.lock);
        } else {
            uint64_t now = cell_ns();
            if (timer_heap[0].execute_at_ns <= now) {
                // --- TIMER FIRED ---
                timer_node t;
                heap_pop(&t);
                pthread_mutex_unlock(&engine.lock);
                if (t.type == TIMER_NATIVE_REMOVE) {
                    actor_remove_cb(t.actor, t.timer_id, 0);
                } else if (t.type == TIMER_PAUSE || t.type == TIMER_KILL) {
                    /* Only fire if turn_gen still matches (stale timers are ignored) */
                    uint32_t cur = atomic_load_explicit(&t.actor->turn_gen, memory_order_relaxed);
                    if (cur == t.turn_gen) {
                        /* Can't call JS_Log from timer thread — use fprintf */
                        const char *name = t.actor->name ? t.actor->name : t.actor->id;
                        if (t.type == TIMER_PAUSE) {
                            fprintf(stderr, "[slow] %s: pausing (turn exceeded %llums)\n",
                                name, (unsigned long long)(ACTOR_FAST_TIMER_NS / 1000000));
                            JS_SetPauseFlag(t.actor, 1);
                        } else {
                            fprintf(stderr, "[slow] %s: kill timer fired (turn exceeded %llus)\n",
                                name, (unsigned long long)(ACTOR_SLOW_TIMER_NS / 1000000000));
                            t.actor->disrupt = 1;
                            JS_SetPauseFlag(t.actor, 2);
                        }
                    }
                } else {
                    /* TIMER_JS: look up and enqueue the stored JS callback.
                       msg_mutex is recursive (see create_actor), so the
                       nested lock inside actor_clock() is safe here. */
                    pthread_mutex_lock(t.actor->msg_mutex);
                    int idx = hmgeti(t.actor->timers, t.timer_id);
                    if (idx != -1) {
                        JSValue cb = t.actor->timers[idx].value;
                        hmdel(t.actor->timers, t.timer_id);
                        actor_clock(t.actor, cb);
                    }
                    pthread_mutex_unlock(t.actor->msg_mutex);
                }
                continue;
            } else {
                // --- WAIT FOR DEADLINE ---
                /* Deadlines are on cell_ns()'s clock; pthread_cond_timedwait
                   wants an absolute CLOCK_REALTIME time, so convert the
                   relative wait.  NOTE(review): a realtime clock jump during
                   the wait shifts the effective deadline — confirm this is
                   acceptable. */
                struct timespec ts;
                uint64_t wait_ns = timer_heap[0].execute_at_ns - now;
                // Convert relative wait time to absolute CLOCK_REALTIME
                struct timespec now_real;
                clock_gettime(CLOCK_REALTIME, &now_real);
                uint64_t deadline_real_ns = (uint64_t)now_real.tv_sec * 1000000000ULL +
                    (uint64_t)now_real.tv_nsec + wait_ns;
                ts.tv_sec = deadline_real_ns / 1000000000ULL;
                ts.tv_nsec = deadline_real_ns % 1000000000ULL;
                pthread_cond_timedwait(&engine.timer_cond, &engine.lock, &ts);
            }
        }
        pthread_mutex_unlock(&engine.lock);
    }
    return NULL;
}
/* Append the actor to the tail of the priority queue matching its state,
 * routing to the main-thread queues when main_thread_only is set, and
 * signal the appropriate consumer.
 *
 * Fix vs. the original: malloc() was unchecked; a failed allocation would
 * have dereferenced NULL.  Losing an enqueue silently would strand the
 * actor forever, so OOM on this tiny node is treated as fatal. */
void enqueue_actor_priority(JSContext *actor) {
    actor_node *n = malloc(sizeof *n);
    if (!n) {
        fprintf(stderr, "enqueue_actor_priority: out of memory\n");
        abort();
    }
    n->actor = actor;
    n->next = NULL;
    /* NOTE(review): actor->state is read before engine.lock is taken;
       assumed stable at enqueue time — confirm. */
    int pq = state_to_pq(actor->state);
    pthread_mutex_lock(&engine.lock);
    if (actor->main_thread_only) {
        if (engine.mq_tail[pq]) engine.mq_tail[pq]->next = n;
        else engine.mq_head[pq] = n;
        engine.mq_tail[pq] = n;
        pthread_cond_signal(&engine.main_cond);
    } else {
        if (engine.q_tail[pq]) engine.q_tail[pq]->next = n;
        else engine.q_head[pq] = n;
        engine.q_tail[pq] = n;
        pthread_cond_signal(&engine.wake_cond);
    }
    pthread_mutex_unlock(&engine.lock);
}
/* Worker-thread entry point: repeatedly take the best-priority actor off
 * the shared worker queues and run one turn, until shutdown is flagged
 * and the queues are drained. */
void *actor_runner(void *arg) {
    for (;;) {
        pthread_mutex_lock(&engine.lock);
        while (!engine.shutting_down && !has_any_work(engine.q_head))
            pthread_cond_wait(&engine.wake_cond, &engine.lock);
        if (engine.shutting_down && !has_any_work(engine.q_head)) {
            pthread_mutex_unlock(&engine.lock);
            break;
        }
        actor_node *job = dequeue_priority(engine.q_head, engine.q_tail);
        pthread_mutex_unlock(&engine.lock);
        if (job == NULL)
            continue;
        actor_turn(job->actor);
        free(job);
    }
    return NULL;
}
/* Bring up the scheduler: engine mutex/conds, registry lock, the timer
 * thread, the I/O watch thread, and one worker thread per online CPU
 * (minimum 1).
 *
 * Fix vs. the original: sysconf(_SC_NPROCESSORS_ONLN) can return -1 on
 * failure (and 0 on some platforms); the old code then created zero
 * workers, so non-main-thread actors could never run. */
void actor_initialize(void) {
    pthread_mutex_init(&engine.lock, NULL);
    pthread_cond_init(&engine.wake_cond, NULL);
    pthread_cond_init(&engine.timer_cond, NULL);
    pthread_cond_init(&engine.main_cond, NULL);
    engine.shutting_down = 0;
    for (int i = 0; i < PQ_COUNT; i++) {
        engine.q_head[i] = NULL;
        engine.q_tail[i] = NULL;
        engine.mq_head[i] = NULL;
        engine.mq_tail[i] = NULL;
    }
    actors_mutex = malloc(sizeof(pthread_mutex_t));
    pthread_mutex_init(actors_mutex, NULL);
    // Start Timer Thread
    pthread_create(&engine.timer_thread, NULL, timer_thread_func, NULL);
    // Start I/O Watch Thread (skipped if the self-pipe cannot be created)
    if (pipe(io_pipe) == 0) {
        fcntl(io_pipe[0], F_SETFL, O_NONBLOCK);
        fcntl(io_pipe[1], F_SETFL, O_NONBLOCK);
        pthread_create(&io_thread, NULL, io_thread_func, NULL);
    }
    // Start Workers
#ifdef _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    long n = sysinfo.dwNumberOfProcessors;
#else
    long n = sysconf(_SC_NPROCESSORS_ONLN);
#endif
    if (n < 1)
        n = 1; /* sysconf failure or bogus CPU count: run at least one worker */
    engine.num_workers = (int)n;
    engine.worker_threads = malloc(sizeof(pthread_t) * n);
    for (int i = 0; i < n; i++) {
        pthread_create(&engine.worker_threads[i], NULL, actor_runner, NULL);
    }
}
/* Tear down an actor completely: deregister it, purge timer-heap nodes and
 * I/O watches that reference it, drain its mailbox, free its JS context,
 * and — if it was the last registered actor — initiate engine shutdown. */
void actor_free(JSContext *actor)
{
    /* Leave quiescence first so quiescent_count stays consistent. */
    if (actor->is_quiescent) {
        actor->is_quiescent = 0;
        atomic_fetch_sub(&engine.quiescent_count, 1);
    }
    lockless_shdel(actors, actor->id);
    // Purge any pending timers referencing this actor from the timer heap
    // to prevent the timer thread from accessing freed memory.
    {
        pthread_mutex_lock(&engine.lock);
        int n = arrlen(timer_heap);
        int w = 0;
        /* Compact in place: keep only nodes belonging to other actors. */
        for (int r = 0; r < n; r++) {
            if (timer_heap[r].actor != actor)
                timer_heap[w++] = timer_heap[r];
        }
        arrsetlen(timer_heap, w);
        /* Bottom-up heapify to restore the min-heap property after compaction. */
        for (int i = w / 2 - 1; i >= 0; i--) {
            int j = i;
            while (1) {
                int left = 2 * j + 1, right = 2 * j + 2, smallest = j;
                if (left < w && timer_heap[left].execute_at_ns < timer_heap[smallest].execute_at_ns)
                    smallest = left;
                if (right < w && timer_heap[right].execute_at_ns < timer_heap[smallest].execute_at_ns)
                    smallest = right;
                if (smallest == j) break;
                timer_node tmp = timer_heap[j];
                timer_heap[j] = timer_heap[smallest];
                timer_heap[smallest] = tmp;
                j = smallest;
            }
        }
        pthread_mutex_unlock(&engine.lock);
    }
    /* Remove I/O watches for this actor */
    pthread_mutex_lock(&io_mutex);
    for (int i = arrlen(g_io_watches) - 1; i >= 0; i--) {
        if (g_io_watches[i].actor == actor)
            arrdel(g_io_watches, i);
    }
    pthread_mutex_unlock(&io_mutex);
    // Do not go forward with actor destruction until the actor is completely free
    pthread_mutex_lock(actor->msg_mutex);
    pthread_mutex_lock(actor->mutex);
    /* NOTE(review): loop body is intentionally empty — presumably a
       placeholder for per-timer cleanup; confirm nothing needs freeing. */
    for (int i = 0; i < hmlen(actor->timers); i++) {
    }
    hmfree(actor->timers);
    /* Free all letters in the queue */
    for (int i = 0; i < arrlen(actor->letters); i++) {
        if (actor->letters[i].type == LETTER_BLOB) {
            blob_destroy(actor->letters[i].blob_data);
        } else if (actor->letters[i].type == LETTER_CALLBACK) {
            /* Nothing freed here: callbacks are JSValues managed by the GC
               (see actor_gc_scan). */
        }
    }
    arrfree(actor->letters);
    free(actor->id);
    /* Save mutex pointers: JS_FreeContext() releases the actor struct, so
       actor->mutex / actor->msg_mutex must not be read afterwards. */
    pthread_mutex_t *m = actor->mutex;
    pthread_mutex_t *mm = actor->msg_mutex;
    JS_FreeContext(actor);
    pthread_mutex_unlock(m);
    pthread_mutex_destroy(m);
    free(m);
    pthread_mutex_unlock(mm);
    pthread_mutex_destroy(mm);
    free(mm);
    /* Last registered actor gone → flag shutdown and wake every thread. */
    int actor_count = lockless_shlen(actors);
    if (actor_count == 0) {
        pthread_mutex_lock(&engine.lock);
        engine.shutting_down = 1;
        pthread_cond_broadcast(&engine.wake_cond);
        pthread_cond_broadcast(&engine.timer_cond);
        pthread_cond_broadcast(&engine.main_cond);
        pthread_mutex_unlock(&engine.lock);
    }
}
int scheduler_actor_count(void) {
return (int)lockless_shlen(actors);
}
/* Arm quiescence-based shutdown (called after bootstrap).  If every
 * registered actor is already quiescent at this moment, shut down now. */
void scheduler_enable_quiescence(void) {
    engine.quiescence_enabled = 1;
    // Check if all actors are already quiescent
    int idle = atomic_load(&engine.quiescent_count);
    int registered = (int)lockless_shlen(actors);
    if (registered > 0 && idle >= registered) {
        pthread_mutex_lock(&engine.lock);
        engine.shutting_down = 1;
        pthread_cond_broadcast(&engine.wake_cond);
        pthread_cond_broadcast(&engine.timer_cond);
        pthread_cond_broadcast(&engine.main_cond);
        pthread_mutex_unlock(&engine.lock);
    }
}
/* Engine teardown: flag shutdown, wake and join every helper thread, then
 * release engine-global resources.  Idempotent.
 *
 * Fix vs. the original: the re-entry guard was a plain static int, which
 * is racy if two threads reach shutdown simultaneously; atomic_exchange
 * makes exactly one caller proceed.  Freed globals are also nulled to
 * defend against accidental reuse. */
void exit_handler(void) {
    static atomic_int already_exiting = 0;
    if (atomic_exchange(&already_exiting, 1))
        return;
    pthread_mutex_lock(&engine.lock);
    engine.shutting_down = 1;
    pthread_cond_broadcast(&engine.wake_cond);
    pthread_cond_broadcast(&engine.timer_cond);
    pthread_cond_broadcast(&engine.main_cond);
    pthread_mutex_unlock(&engine.lock);
    pthread_join(engine.timer_thread, NULL);
    /* Shut down I/O thread (only started if the pipe was created) */
    if (io_pipe[1] >= 0) {
        io_wake();
        pthread_join(io_thread, NULL);
        close(io_pipe[0]);
        close(io_pipe[1]);
        io_pipe[0] = io_pipe[1] = -1;
    }
    arrfree(g_io_watches);
    for (int i = 0; i < engine.num_workers; i++) {
        pthread_join(engine.worker_threads[i], NULL);
    }
    free(engine.worker_threads);
    engine.worker_threads = NULL;
    pthread_mutex_destroy(&engine.lock);
    pthread_cond_destroy(&engine.wake_cond);
    pthread_cond_destroy(&engine.timer_cond);
    pthread_cond_destroy(&engine.main_cond);
    pthread_mutex_destroy(actors_mutex);
    free(actors_mutex);
    actors_mutex = NULL;
    arrfree(timer_heap);
}
/* Returns 1 iff an actor with this id is currently registered, else 0. */
int actor_exists(const char *id)
{
    return lockless_shgeti(actors, id) != -1;
}
/* Re-evaluate an actor's scheduling state after activity: requeue it if
 * mail is waiting, mark it quiescent (possibly triggering engine shutdown)
 * when it has nothing at all, or arm the auto-remove timer when idle. */
void set_actor_state(JSContext *actor)
{
    /* A disrupted actor is torn down instead of rescheduled. */
    if (actor->disrupt) {
        actor_free(actor);
        return;
    }
    pthread_mutex_lock(actor->msg_mutex);
    switch(actor->state) {
    case ACTOR_RUNNING:
    case ACTOR_READY:
        /* Active actor: cancel any pending auto-remove token. */
        if (actor->ar)
            actor->ar = 0;
        break;
    case ACTOR_IDLE:
        if (arrlen(actor->letters)) {
            /* Mail waiting: leave quiescence and queue a turn. */
            if (actor->is_quiescent) {
                actor->is_quiescent = 0;
                atomic_fetch_sub(&engine.quiescent_count, 1);
            }
            actor->state = ACTOR_READY;
            actor->ar = 0;
            enqueue_actor_priority(actor);
        } else if (!hmlen(actor->timers)) {
            // No messages AND no timers
            // Only count as quiescent if no $unneeded callback registered
            int has_unneeded = !JS_IsNull(actor->unneeded_ref.val);
            if (!actor->is_quiescent && actor->id && !has_unneeded) {
                actor->is_quiescent = 1;
                int qc = atomic_fetch_add(&engine.quiescent_count, 1) + 1;
                int total = (int)lockless_shlen(actors);
                /* All registered actors quiescent → shut the engine down. */
                if (qc >= total && total > 0 && engine.quiescence_enabled) {
                    pthread_mutex_lock(&engine.lock);
                    engine.shutting_down = 1;
                    pthread_cond_broadcast(&engine.wake_cond);
                    pthread_cond_broadcast(&engine.timer_cond);
                    pthread_cond_broadcast(&engine.main_cond);
                    pthread_mutex_unlock(&engine.lock);
                }
            }
            if (!engine.shutting_down) {
                // Schedule remove timer
                /* NOTE(review): this static counter is guarded only by the
                   per-actor msg_mutex, so increments from different actors
                   race on it; actor_delay() keeps a second, independent
                   counter with the same name.  Confirm ids never need to
                   be globally unique across the two. */
                static uint32_t global_timer_id = 1;
                uint32_t id = global_timer_id++;
                actor->ar = id;
                uint64_t now = cell_ns();
                uint64_t execute_at = now + (uint64_t)(actor->ar_secs * 1e9);
                pthread_mutex_lock(&engine.lock);
                heap_push(execute_at, actor, id, TIMER_NATIVE_REMOVE);
                /* Wake the timer thread only if this is the new soonest deadline. */
                if (timer_heap[0].timer_id == id) {
                    pthread_cond_signal(&engine.timer_cond);
                }
                pthread_mutex_unlock(&engine.lock);
            }
        } else {
            // Has timers but no letters — waiting, not quiescent
            if (actor->is_quiescent) {
                actor->is_quiescent = 0;
                atomic_fetch_sub(&engine.quiescent_count, 1);
            }
        }
        break;
    }
    pthread_mutex_unlock(actor->msg_mutex);
}
/* Fired when an idle actor's auto-remove (TIMER_NATIVE_REMOVE) deadline
 * passes.  id must match the actor's current ar token (0 forces removal).
 * Invokes the JS $unneeded hook if set, marks the actor disrupted, and
 * frees it immediately if no turn is in flight.
 * NOTE(review): `interval` is unused — confirm it can be dropped. */
uint32_t actor_remove_cb(JSContext *actor, uint32_t id, uint32_t interval)
{
    pthread_mutex_lock(actor->mutex);
    // Check if this timer is still valid (match actor->ar)
    if (actor->ar != id && id != 0) { // id 0 means force (optional)
        pthread_mutex_unlock(actor->mutex);
        return 0;
    }
    actor->disrupt = 1;
    /* Give the actor a last chance to clean up via its $unneeded hook. */
    if (!JS_IsNull(actor->unneeded_ref.val)) {
        JSValue ret = JS_Call(actor, actor->unneeded_ref.val, JS_NULL, 0, NULL);
        uncaught_exception(actor, ret);
    }
    /* Free only if no turn is running; a running turn observes disrupt at
       its end and frees the actor itself (see actor_turn). */
    int should_free = (actor->state == ACTOR_IDLE);
    pthread_mutex_unlock(actor->mutex);
    if (should_free) actor_free(actor);
    return 0;
}
/* Install (or clear, when fn is not a function) the actor's $unneeded
 * callback and its auto-remove delay, then re-evaluate scheduling state.
 * No-op for disrupted actors. */
void actor_unneeded(JSContext *actor, JSValue fn, double seconds)
{
    if (actor->disrupt)
        return;
    if (JS_IsFunction(fn)) {
        actor->unneeded_ref.val = fn;
        actor->ar_secs = seconds;
    } else {
        actor->unneeded_ref.val = JS_NULL;
    }
    /* Cancel any outstanding auto-remove token before rescheduling. */
    if (actor->ar)
        actor->ar = 0;
    set_actor_state(actor);
}
JSContext *get_actor(char *id)
{
int idx = lockless_shgeti(actors, id);
if (idx == -1) {
return NULL;
}
return lockless_shget(actors, id);
}
/* Main-thread scheduling loop: serves the main-thread-only queues (mirrors
 * actor_runner) until shutdown is flagged and the queues are drained. */
void actor_loop()
{
    while (!engine.shutting_down) {
        pthread_mutex_lock(&engine.lock);
        while (!engine.shutting_down && !has_any_work(engine.mq_head))
            pthread_cond_wait(&engine.main_cond, &engine.lock);
        if (engine.shutting_down && !has_any_work(engine.mq_head)) {
            pthread_mutex_unlock(&engine.lock);
            break;
        }
        actor_node *job = dequeue_priority(engine.mq_head, engine.mq_tail);
        pthread_mutex_unlock(&engine.lock);
        if (job != NULL) {
            actor_turn(job->actor);
            free(job);
        }
    }
}
extern JSRuntime *g_runtime;
/* Allocate a fresh actor context on the shared runtime, set up its two
 * recursive mutexes, install GC scanning and the heap limit, run the
 * startup script, and leave it in its initial scheduling state.
 * Returns NULL if the JS context cannot be created. */
JSContext *create_actor(void *wota)
{
    JSContext *actor = JS_NewContext(g_runtime);
    if (!actor) return NULL;
    actor->init_wota = wota;
    arrsetcap(actor->letters, 5);
    /* NOTE(review): malloc results are unchecked; OOM here crashes inside
       pthread_mutex_init — consider handling. */
    actor->mutex = malloc(sizeof(pthread_mutex_t));
    /* Both mutexes are RECURSIVE: several paths re-lock them (e.g. the
       timer thread holds msg_mutex while actor_clock() locks it again). */
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(actor->mutex, &attr);
    actor->msg_mutex = malloc(sizeof(pthread_mutex_t));
    pthread_mutex_init(actor->msg_mutex, &attr);
    pthread_mutexattr_destroy(&attr);
    JS_SetGCScanExternal(actor, actor_gc_scan);
    JS_SetHeapMemoryLimit(actor, ACTOR_MEMORY_LIMIT);
    /* Lock actor->mutex while initializing JS runtime. */
    pthread_mutex_lock(actor->mutex);
    script_startup(actor);
    set_actor_state(actor);
    pthread_mutex_unlock(actor->mutex);
    return actor;
}
/* Bind an actor to a unique id in the global registry.  mainthread routes
 * its turns to the main-thread queues; ar is the auto-remove delay in
 * seconds.  Returns NULL on success, or a static error string (the actor
 * itself stays alive either way).
 *
 * Fixes vs. the original: on a duplicate id, actor->id was freed but left
 * dangling, so later paths (actor_free, logging) would read or double-free
 * it; strdup() failure was also unchecked. */
const char *register_actor(const char *id, JSContext *actor, int mainthread, double ar)
{
    actor->main_thread_only = mainthread;
    actor->id = strdup(id);
    if (!actor->id)
        return "Out of memory duplicating actor ID.";
    actor->ar_secs = ar;
    int added = lockless_shput_unique(actors, actor->id, actor);
    if (!added) {
        free(actor->id);
        actor->id = NULL; /* don't leave a dangling pointer behind */
        return "Actor with given ID already exists.";
    }
    // Now that actor is in the registry, track its quiescent state
    if (actor->state == ACTOR_IDLE && !arrlen(actor->letters)
        && !hmlen(actor->timers) && JS_IsNull(actor->unneeded_ref.val)) {
        actor->is_quiescent = 1;
        atomic_fetch_add(&engine.quiescent_count, 1);
    }
    return NULL;
}
/* Deliver an encoded blob message to the actor registered under id.
 * Ownership of msg transfers here: it is destroyed on failure, or consumed
 * by the target's mailbox on success.  Returns NULL on success or a static
 * error string. */
const char *send_message(const char *id, void *msg)
{
    JSContext *target = get_actor(id);
    if (target == NULL) {
        blob_destroy((blob *)msg);
        return "Could not get actor from id.";
    }
    letter envelope;
    envelope.type = LETTER_BLOB;
    envelope.blob_data = (blob *)msg;
    pthread_mutex_lock(target->msg_mutex);
    arrput(target->letters, envelope);
    pthread_mutex_unlock(target->msg_mutex);
    /* The actor has work again: cancel any pending auto-remove token. */
    if (target->ar)
        target->ar = 0;
    set_actor_state(target);
    return NULL;
}
/* Execute one scheduling turn for an actor: either resume a previously
 * suspended VM (slow path, kill watchdog only) or pop one letter from the
 * mailbox and run its handler (normal path, pause + kill watchdogs).
 * Afterwards the actor is freed if disrupted, re-queued if it suspended
 * again, or routed through set_actor_state(). */
void actor_turn(JSContext *actor)
{
    pthread_mutex_lock(actor->mutex);
#ifdef ACTOR_TRACE
    int prev_state = actor->state;
#endif
    actor->state = ACTOR_RUNNING;
#ifdef ACTOR_TRACE
    fprintf(stderr, "[ACTOR_TRACE] %s: %d -> RUNNING\n",
        actor->name ? actor->name : actor->id, prev_state);
#endif
    /* Lazily adopt a globally-installed trace hook. */
    if (!actor->actor_trace_hook) {
        cell_hook gh = cell_trace_gethook();
        if (gh) actor->actor_trace_hook = gh;
    }
    if (actor->actor_trace_hook)
        actor->actor_trace_hook(actor, CELL_HOOK_ENTER);
    JSValue result;
    if (actor->vm_suspended) {
        /* RESUME path: continue suspended turn under kill timer only */
        g_crash_ctx = actor;
        /* Bump turn_gen so watchdogs from the previous turn are stale. */
        atomic_fetch_add_explicit(&actor->turn_gen, 1, memory_order_relaxed);
        JS_SetPauseFlag(actor, 0);
        actor->turn_start_ns = cell_ns();
        /* Register kill timer only for resume */
        pthread_mutex_lock(&engine.lock);
        heap_push(actor->turn_start_ns + ACTOR_SLOW_TIMER_NS,
            actor, 0, TIMER_KILL);
        pthread_cond_signal(&engine.timer_cond);
        pthread_mutex_unlock(&engine.lock);
        result = JS_ResumeRegisterVM(actor);
        actor->vm_suspended = 0;
        if (JS_IsSuspended(result)) {
            /* Still suspended after kill timer — shouldn't happen, kill it */
            fprintf(stderr, "[slow] %s: still suspended after resume, killing\n",
                actor->name ? actor->name : actor->id);
            actor->disrupt = 1;
            goto ENDTURN;
        }
        if (JS_IsException(result)) {
            if (!uncaught_exception(actor, result))
                actor->disrupt = 1;
        }
        /* Every resume counts as one slow strike; too many in a row kills. */
        actor->slow_strikes++;
        JS_Log(actor, "slow", "%s: slow strike %d/%d",
            actor->name ? actor->name : actor->id,
            actor->slow_strikes, ACTOR_SLOW_STRIKES_MAX);
        if (actor->slow_strikes >= ACTOR_SLOW_STRIKES_MAX) {
            JS_Log(actor, "slow", "%s: killed after %d consecutive slow turns",
                actor->name ? actor->name : actor->id, actor->slow_strikes);
            actor->disrupt = 1;
        }
        goto ENDTURN;
    }
    /* NORMAL path: pop a message and execute */
    pthread_mutex_lock(actor->msg_mutex);
    int pending = arrlen(actor->letters);
    if (!pending) {
        pthread_mutex_unlock(actor->msg_mutex);
        goto ENDTURN;
    }
    letter l = actor->letters[0];
    arrdel(actor->letters, 0);
    pthread_mutex_unlock(actor->msg_mutex);
    atomic_fetch_add_explicit(&actor->turn_gen, 1, memory_order_relaxed);
    JS_SetPauseFlag(actor, 0);
    actor->turn_start_ns = cell_ns();
    /* Register both pause and kill timers */
    pthread_mutex_lock(&engine.lock);
    heap_push(actor->turn_start_ns + ACTOR_FAST_TIMER_NS,
        actor, 0, TIMER_PAUSE);
    heap_push(actor->turn_start_ns + ACTOR_SLOW_TIMER_NS + ACTOR_FAST_TIMER_NS,
        actor, 0, TIMER_KILL);
    pthread_cond_signal(&engine.timer_cond);
    pthread_mutex_unlock(&engine.lock);
    g_crash_ctx = actor;
    if (l.type == LETTER_BLOB) {
        /* NOTE(review): blob_length() appears to be in bits — it is divided
           by 8 here and JS_SendMessage multiplies by 8; confirm the unit. */
        size_t size = blob_length(l.blob_data) / 8;
        JSValue arg = js_new_blob_stoned_copy(actor,
            (void *)blob_data(l.blob_data), size);
        blob_destroy(l.blob_data);
        result = JS_Call(actor, actor->message_handle_ref.val,
            JS_NULL, 1, &arg);
        if (JS_IsSuspended(result)) {
            actor->vm_suspended = 1;
            actor->state = ACTOR_SLOW;
            goto ENDTURN_SLOW;
        }
        if (!uncaught_exception(actor, result))
            actor->disrupt = 1;
    } else if (l.type == LETTER_CALLBACK) {
        result = JS_Call(actor, l.callback, JS_NULL, 0, NULL);
        if (JS_IsSuspended(result)) {
            actor->vm_suspended = 1;
            actor->state = ACTOR_SLOW;
            goto ENDTURN_SLOW;
        }
        if (!uncaught_exception(actor, result))
            actor->disrupt = 1;
    }
    if (actor->disrupt) goto ENDTURN;
    actor->slow_strikes = 0; /* completed within fast timer */
ENDTURN:
    g_crash_ctx = NULL;
    /* Invalidate any outstanding pause/kill timers for this turn */
    atomic_fetch_add_explicit(&actor->turn_gen, 1, memory_order_relaxed);
    actor->state = ACTOR_IDLE;
    if (actor->actor_trace_hook)
        actor->actor_trace_hook(actor, CELL_HOOK_EXIT);
    if (actor->disrupt) {
        pthread_mutex_unlock(actor->mutex);
        actor_free(actor);
        return;
    }
    set_actor_state(actor);
    pthread_mutex_unlock(actor->mutex);
    return;
ENDTURN_SLOW:
    g_crash_ctx = NULL;
    /* VM suspended mid-turn — can't call JS_Log, use fprintf.
       Print stack trace while frames are still intact. */
    fprintf(stderr, "[slow] %s: suspended mid-turn, entering slow queue (strike %d/%d)\n",
        actor->name ? actor->name : actor->id,
        actor->slow_strikes + 1, ACTOR_SLOW_STRIKES_MAX);
    if (actor->actor_trace_hook)
        actor->actor_trace_hook(actor, CELL_HOOK_EXIT);
    /* Re-queue immediately: the suspended VM resumes on its next turn. */
    enqueue_actor_priority(actor);
    pthread_mutex_unlock(actor->mutex);
}
/* GC callback: scan actor's letters and timers so the copying GC
can relocate JSValues stored in C-side data structures. */
void actor_gc_scan(JSContext *ctx,
    uint8_t *from_base, uint8_t *from_end,
    uint8_t *to_base, uint8_t **to_free, uint8_t *to_end)
{
    /* Lock msg_mutex to synchronize with the timer thread, which reads
    timers and writes letters under the same lock. */
    if (ctx->msg_mutex)
        pthread_mutex_lock(ctx->msg_mutex);
    /* Relocate callback letters sitting in the mailbox. */
    for (int i = 0; i < arrlen(ctx->letters); i++) {
        if (ctx->letters[i].type == LETTER_CALLBACK) {
            ctx->letters[i].callback = gc_copy_value(ctx,
                ctx->letters[i].callback,
                from_base, from_end, to_base, to_free, to_end);
        }
    }
    /* Relocate JS timer callbacks held in the id→callback map. */
    for (int i = 0; i < hmlen(ctx->timers); i++) {
        ctx->timers[i].value = gc_copy_value(ctx,
            ctx->timers[i].value,
            from_base, from_end, to_base, to_free, to_end);
    }
    if (ctx->msg_mutex)
        pthread_mutex_unlock(ctx->msg_mutex);
    /* Scan I/O watch callbacks belonging to this actor */
    pthread_mutex_lock(&io_mutex);
    for (int i = 0; i < arrlen(g_io_watches); i++) {
        if (g_io_watches[i].actor == ctx) {
            g_io_watches[i].callback = gc_copy_value(ctx,
                g_io_watches[i].callback,
                from_base, from_end, to_base, to_free, to_end);
        }
    }
    pthread_mutex_unlock(&io_mutex);
}
/* Enqueue a JS callback as a mailbox letter on the actor, then let the
 * scheduler decide whether the actor needs a turn. */
void actor_clock(JSContext *actor, JSValue fn)
{
    letter envelope;
    envelope.type = LETTER_CALLBACK;
    envelope.callback = fn;
    pthread_mutex_lock(actor->msg_mutex);
    arrput(actor->letters, envelope);
    pthread_mutex_unlock(actor->msg_mutex);
    set_actor_state(actor);
}
/* Schedule fn to run on this actor after `seconds`.  Stores the callback
 * in the actor's timer map and pushes a TIMER_JS node onto the global
 * heap; returns the timer id (usable with actor_remove_timer).
 *
 * Fix vs. the original: the static id counter was a plain uint32_t
 * incremented while holding only the PER-ACTOR msg_mutex, so two actors
 * calling actor_delay concurrently raced on it (a data race that could
 * also mint duplicate ids).  The counter is now atomic. */
uint32_t actor_delay(JSContext *actor, JSValue fn, double seconds)
{
    static _Atomic uint32_t global_timer_id = 1;
    pthread_mutex_lock(actor->msg_mutex);
    uint32_t id = atomic_fetch_add_explicit(&global_timer_id, 1,
        memory_order_relaxed);
    hmput(actor->timers, id, fn);
    uint64_t now = cell_ns();
    uint64_t execute_at = now + (uint64_t)(seconds * 1e9);
    pthread_mutex_lock(&engine.lock);
    heap_push(execute_at, actor, id, TIMER_JS);
    /* Wake the timer thread only if this became the soonest deadline. */
    if (timer_heap[0].timer_id == id) {
        pthread_cond_signal(&engine.timer_cond);
    }
    pthread_mutex_unlock(&engine.lock);
    pthread_mutex_unlock(actor->msg_mutex);
    return id;
}
/* Cancel a pending JS timer.  Returns the stored callback, or JS_NULL when
 * the id is unknown (already fired or cancelled).  The heap node is left
 * in place; once the map entry is gone it misfires harmlessly. */
JSValue actor_remove_timer(JSContext *actor, uint32_t timer_id)
{
    JSValue cb = JS_NULL;
    pthread_mutex_lock(actor->msg_mutex);
    int slot = hmgeti(actor->timers, timer_id);
    if (slot >= 0) {
        cb = actor->timers[slot].value;
        hmdel(actor->timers, timer_id);
    }
    pthread_mutex_unlock(actor->msg_mutex);
    return cb;
}
/* Public API wrapper: non-zero iff an actor with this id is registered. */
int JS_ActorExists(const char *actor_id)
{
    return actor_exists(actor_id) ? 1 : 0;
}
/* Wrap a caller-encoded WOTA payload in the engine's {type:"user", data:…}
 * envelope and deliver it to this context's actor as a stone blob.
 * Returns NULL on success (wb's internals are consumed/freed) or a static
 * error string (caller still owns wb).
 *
 * Fix vs. the original: the growth path did
 * `envelope.data = realloc(envelope.data, …)` without checking the result,
 * which dereferences NULL in the following memcpy and leaks the old buffer
 * on OOM.  The realloc result is now checked via a temporary. */
const char *JS_SendMessage(JSContext *ctx, WotaBuffer *wb)
{
    if (!wb || !wb->data || wb->size == 0)
        return "Empty WOTA buffer";
    /* Wrap the caller's payload in the engine protocol envelope:
    {type: "user", data: <payload>}
    The header takes ~6 words; pre-allocate enough for header + payload. */
    WotaBuffer envelope;
    wota_buffer_init(&envelope, wb->size + 8);
    wota_write_record(&envelope, 2);
    wota_write_text(&envelope, "type");
    wota_write_text(&envelope, "user");
    wota_write_text(&envelope, "data");
    /* Append the caller's pre-encoded WOTA payload words directly. */
    size_t need = envelope.size + wb->size;
    if (need > envelope.capacity) {
        size_t new_cap = envelope.capacity ? envelope.capacity * 2 : 8;
        while (new_cap < need) new_cap *= 2;
        void *grown = realloc(envelope.data, new_cap * sizeof(uint64_t));
        if (!grown) {
            wota_buffer_free(&envelope); /* old buffer still valid — free it */
            return "Could not grow envelope buffer";
        }
        envelope.data = grown;
        envelope.capacity = new_cap;
    }
    memcpy(envelope.data + envelope.size, wb->data,
        wb->size * sizeof(uint64_t));
    envelope.size += wb->size;
    size_t byte_len = envelope.size * sizeof(uint64_t);
    blob *msg = blob_new(byte_len * 8);
    if (!msg) {
        wota_buffer_free(&envelope);
        return "Could not allocate blob";
    }
    blob_write_bytes(msg, envelope.data, byte_len);
    blob_make_stone(msg);
    wota_buffer_free(&envelope);
    const char *err = send_message(ctx->id, msg);
    if (!err) {
        /* Success — send_message took ownership of the blob.
        Free the WotaBuffer internals since we consumed them. */
        wota_buffer_free(wb);
    }
    /* On failure, send_message already destroyed the blob.
    Caller still owns wb and must free it. */
    return err;
}