#include #include #include #include #include #include #include #include #include "stb_ds.h" #include "cell.h" #include "cell_internal.h" #ifdef _WIN32 #include #endif typedef struct actor_node { cell_rt *actor; struct actor_node *next; } actor_node; typedef enum { TIMER_JS, TIMER_NATIVE_REMOVE, TIMER_PAUSE, TIMER_KILL } timer_type; typedef struct { uint64_t execute_at_ns; cell_rt *actor; uint32_t timer_id; timer_type type; uint32_t turn_gen; /* generation at registration time */ } timer_node; static timer_node *timer_heap = NULL; // Priority queue indices #define PQ_READY 0 #define PQ_REFRESHED 1 #define PQ_EXHAUSTED 2 #define PQ_SLOW 3 #define PQ_COUNT 4 // --- 3. The Global Engine State --- static struct { pthread_mutex_t lock; // Protects queue, shutdown flag, and timers pthread_cond_t wake_cond; // Wakes up workers pthread_cond_t timer_cond; // Wakes up the timer thread pthread_cond_t main_cond; // Wakes up the main thread actor_node *q_head[PQ_COUNT], *q_tail[PQ_COUNT]; // worker queues actor_node *mq_head[PQ_COUNT], *mq_tail[PQ_COUNT]; // main-thread queues int shutting_down; int quiescence_enabled; // set after bootstrap, before actor_loop _Atomic int quiescent_count; // actors idle with no messages and no timers pthread_t *worker_threads; int num_workers; pthread_t timer_thread; } engine; static int state_to_pq(int state) { switch (state) { case ACTOR_REFRESHED: return PQ_REFRESHED; case ACTOR_EXHAUSTED: return PQ_EXHAUSTED; case ACTOR_SLOW: return PQ_SLOW; default: return PQ_READY; } } static actor_node *dequeue_priority(actor_node *heads[], actor_node *tails[]) { for (int i = 0; i < PQ_COUNT; i++) { if (heads[i]) { actor_node *n = heads[i]; heads[i] = n->next; if (!heads[i]) tails[i] = NULL; return n; } } return NULL; } static int has_any_work(actor_node *heads[]) { for (int i = 0; i < PQ_COUNT; i++) if (heads[i]) return 1; return 0; } static pthread_mutex_t *actors_mutex; static struct { char *key; cell_rt *value; } *actors = NULL; #define lockless_shdel(NAME, KEY) pthread_mutex_lock(NAME##_mutex); shdel(NAME, KEY); pthread_mutex_unlock(NAME##_mutex); #define lockless_shlen(NAME) ({ \ pthread_mutex_lock(NAME##_mutex); \ size_t _len = shlen(NAME); \ pthread_mutex_unlock(NAME##_mutex); \ _len; \ }) #define lockless_shgeti(NAME, KEY) ({ \ pthread_mutex_lock(NAME##_mutex); \ int _idx = shgeti(NAME, KEY); \ pthread_mutex_unlock(NAME##_mutex); \ _idx; \ }) #define lockless_shget(NAME, KEY) ({ \ pthread_mutex_lock(NAME##_mutex); \ cell_rt *_actor = shget(NAME, KEY); \ pthread_mutex_unlock(NAME##_mutex); \ _actor; \ }) #define lockless_shput_unique(NAME, KEY, VALUE) ({ \ pthread_mutex_lock(NAME##_mutex); \ int _exists = shgeti(NAME, KEY) != -1; \ if (!_exists) shput(NAME, KEY, VALUE); \ pthread_mutex_unlock(NAME##_mutex); \ !_exists; \ }) // Forward declarations uint32_t actor_remove_cb(cell_rt *actor, uint32_t id, uint32_t interval); void actor_turn(cell_rt *actor); void heap_push(uint64_t when, cell_rt *actor, uint32_t timer_id, timer_type type) { timer_node node = { .execute_at_ns = when, .actor = actor, .timer_id = timer_id, .type = type, .turn_gen = 0 }; if (type == TIMER_PAUSE || type == TIMER_KILL) node.turn_gen = atomic_load_explicit(&actor->turn_gen, memory_order_relaxed); arrput(timer_heap, node); // Bubble up int i = arrlen(timer_heap) - 1; while (i > 0) { int parent = (i - 1) / 2; if (timer_heap[i].execute_at_ns >= timer_heap[parent].execute_at_ns) break; timer_node tmp = timer_heap[i]; timer_heap[i] = timer_heap[parent]; timer_heap[parent] = tmp; i = parent; } } // Helper: Heap Pop int heap_pop(timer_node *out) { if (arrlen(timer_heap) == 0) return 0; *out = timer_heap[0]; timer_node last = arrpop(timer_heap); if (arrlen(timer_heap) > 0) { timer_heap[0] = last; // Bubble down int i = 0; int n = arrlen(timer_heap); while (1) { int left = 2 * i + 1; int right = 2 * i + 2; int smallest = i; if (left < n && timer_heap[left].execute_at_ns < timer_heap[smallest].execute_at_ns) smallest = left; if (right < n && timer_heap[right].execute_at_ns < timer_heap[smallest].execute_at_ns) smallest = right; if (smallest == i) break; timer_node tmp = timer_heap[i]; timer_heap[i] = timer_heap[smallest]; timer_heap[smallest] = tmp; i = smallest; } } return 1; } void *timer_thread_func(void *arg) { while (1) { pthread_mutex_lock(&engine.lock); if (engine.shutting_down) { pthread_mutex_unlock(&engine.lock); return NULL; } if (arrlen(timer_heap) == 0) { pthread_cond_wait(&engine.timer_cond, &engine.lock); } else { uint64_t now = cell_ns(); if (timer_heap[0].execute_at_ns <= now) { // --- TIMER FIRED --- timer_node t; heap_pop(&t); pthread_mutex_unlock(&engine.lock); if (t.type == TIMER_NATIVE_REMOVE) { actor_remove_cb(t.actor, t.timer_id, 0); } else if (t.type == TIMER_PAUSE || t.type == TIMER_KILL) { /* Only fire if turn_gen still matches (stale timers are ignored) */ uint32_t cur = atomic_load_explicit(&t.actor->turn_gen, memory_order_relaxed); if (cur == t.turn_gen) { if (t.type == TIMER_PAUSE) { JS_SetPauseFlag(t.actor->context, 1); } else { t.actor->disrupt = 1; JS_SetPauseFlag(t.actor->context, 2); } } } else { pthread_mutex_lock(t.actor->msg_mutex); int idx = hmgeti(t.actor->timers, t.timer_id); if (idx != -1) { JSValue cb = t.actor->timers[idx].value; hmdel(t.actor->timers, t.timer_id); actor_clock(t.actor, cb); JS_FreeValue(t.actor->context, cb); } pthread_mutex_unlock(t.actor->msg_mutex); } continue; } else { // --- WAIT FOR DEADLINE --- struct timespec ts; uint64_t wait_ns = timer_heap[0].execute_at_ns - now; // Convert relative wait time to absolute CLOCK_REALTIME struct timespec now_real; clock_gettime(CLOCK_REALTIME, &now_real); uint64_t deadline_real_ns = (uint64_t)now_real.tv_sec * 1000000000ULL + (uint64_t)now_real.tv_nsec + wait_ns; ts.tv_sec = deadline_real_ns / 1000000000ULL; ts.tv_nsec = deadline_real_ns % 1000000000ULL; pthread_cond_timedwait(&engine.timer_cond, &engine.lock, &ts); } } pthread_mutex_unlock(&engine.lock); } return NULL; } void enqueue_actor_priority(cell_rt *actor) { actor_node *n = malloc(sizeof(actor_node)); n->actor = actor; n->next = NULL; int pq = state_to_pq(actor->state); pthread_mutex_lock(&engine.lock); if (actor->main_thread_only) { if (engine.mq_tail[pq]) engine.mq_tail[pq]->next = n; else engine.mq_head[pq] = n; engine.mq_tail[pq] = n; pthread_cond_signal(&engine.main_cond); } else { if (engine.q_tail[pq]) engine.q_tail[pq]->next = n; else engine.q_head[pq] = n; engine.q_tail[pq] = n; pthread_cond_signal(&engine.wake_cond); } pthread_mutex_unlock(&engine.lock); } void *actor_runner(void *arg) { while (1) { pthread_mutex_lock(&engine.lock); while (!has_any_work(engine.q_head) && !engine.shutting_down) { pthread_cond_wait(&engine.wake_cond, &engine.lock); } if (engine.shutting_down && !has_any_work(engine.q_head)) { pthread_mutex_unlock(&engine.lock); break; } actor_node *node = dequeue_priority(engine.q_head, engine.q_tail); pthread_mutex_unlock(&engine.lock); if (node) { actor_turn(node->actor); free(node); } } return NULL; } void actor_initialize(void) { pthread_mutex_init(&engine.lock, NULL); pthread_cond_init(&engine.wake_cond, NULL); pthread_cond_init(&engine.timer_cond, NULL); pthread_cond_init(&engine.main_cond, NULL); engine.shutting_down = 0; for (int i = 0; i < PQ_COUNT; i++) { engine.q_head[i] = NULL; engine.q_tail[i] = NULL; engine.mq_head[i] = NULL; engine.mq_tail[i] = NULL; } actors_mutex = malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(actors_mutex, NULL); // Start Timer Thread pthread_create(&engine.timer_thread, NULL, timer_thread_func, NULL); // Start Workers #ifdef _WIN32 SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); long n = sysinfo.dwNumberOfProcessors; #else long n = sysconf(_SC_NPROCESSORS_ONLN); #endif engine.num_workers = (int)n; engine.worker_threads = malloc(sizeof(pthread_t) * n); for (int i=0; i < n; i++) { pthread_create(&engine.worker_threads[i], NULL, actor_runner, NULL); } } void actor_free(cell_rt *actor) { if (actor->is_quiescent) { actor->is_quiescent = 0; atomic_fetch_sub(&engine.quiescent_count, 1); } lockless_shdel(actors, actor->id); // Purge any pending timers referencing this actor from the timer heap // to prevent the timer thread from accessing freed memory. { pthread_mutex_lock(&engine.lock); int n = arrlen(timer_heap); int w = 0; for (int r = 0; r < n; r++) { if (timer_heap[r].actor != actor) timer_heap[w++] = timer_heap[r]; } arrsetlen(timer_heap, w); for (int i = w / 2 - 1; i >= 0; i--) { int j = i; while (1) { int left = 2 * j + 1, right = 2 * j + 2, smallest = j; if (left < w && timer_heap[left].execute_at_ns < timer_heap[smallest].execute_at_ns) smallest = left; if (right < w && timer_heap[right].execute_at_ns < timer_heap[smallest].execute_at_ns) smallest = right; if (smallest == j) break; timer_node tmp = timer_heap[j]; timer_heap[j] = timer_heap[smallest]; timer_heap[smallest] = tmp; j = smallest; } } pthread_mutex_unlock(&engine.lock); } // Do not go forward with actor destruction until the actor is completely free pthread_mutex_lock(actor->msg_mutex); pthread_mutex_lock(actor->mutex); JSContext *js = actor->context; JS_DeleteGCRef(js, &actor->idx_buffer_ref); JS_DeleteGCRef(js, &actor->on_exception_ref); JS_DeleteGCRef(js, &actor->message_handle_ref); JS_DeleteGCRef(js, &actor->unneeded_ref); JS_DeleteGCRef(js, &actor->actor_sym_ref); for (int i = 0; i < hmlen(actor->timers); i++) { JS_FreeValue(js, actor->timers[i].value); } hmfree(actor->timers); /* Free all letters in the queue */ for (int i = 0; i < arrlen(actor->letters); i++) { if (actor->letters[i].type == LETTER_BLOB) { blob_destroy(actor->letters[i].blob_data); } else if (actor->letters[i].type == LETTER_CALLBACK) { JS_FreeValue(js, actor->letters[i].callback); } } arrfree(actor->letters); JS_FreeContext(js); free(actor->id); pthread_mutex_unlock(actor->mutex); pthread_mutex_destroy(actor->mutex); free(actor->mutex); pthread_mutex_unlock(actor->msg_mutex); pthread_mutex_destroy(actor->msg_mutex); free(actor->msg_mutex); free(actor); int actor_count = lockless_shlen(actors); if (actor_count == 0) { pthread_mutex_lock(&engine.lock); engine.shutting_down = 1; pthread_cond_broadcast(&engine.wake_cond); pthread_cond_broadcast(&engine.timer_cond); pthread_cond_broadcast(&engine.main_cond); pthread_mutex_unlock(&engine.lock); } } int scheduler_actor_count(void) { return (int)lockless_shlen(actors); } void scheduler_enable_quiescence(void) { engine.quiescence_enabled = 1; // Check if all actors are already quiescent int qc = atomic_load(&engine.quiescent_count); int total = (int)lockless_shlen(actors); if (qc >= total && total > 0) { pthread_mutex_lock(&engine.lock); engine.shutting_down = 1; pthread_cond_broadcast(&engine.wake_cond); pthread_cond_broadcast(&engine.timer_cond); pthread_cond_broadcast(&engine.main_cond); pthread_mutex_unlock(&engine.lock); } } void exit_handler(void) { static int already_exiting = 0; if (already_exiting) return; already_exiting = 1; pthread_mutex_lock(&engine.lock); engine.shutting_down = 1; pthread_cond_broadcast(&engine.wake_cond); pthread_cond_broadcast(&engine.timer_cond); pthread_cond_broadcast(&engine.main_cond); pthread_mutex_unlock(&engine.lock); pthread_join(engine.timer_thread, NULL); for (int i=0; i < engine.num_workers; i++) { pthread_join(engine.worker_threads[i], NULL); } free(engine.worker_threads); pthread_mutex_destroy(&engine.lock); pthread_cond_destroy(&engine.wake_cond); pthread_cond_destroy(&engine.timer_cond); pthread_cond_destroy(&engine.main_cond); pthread_mutex_destroy(actors_mutex); free(actors_mutex); arrfree(timer_heap); } int actor_exists(const char *id) { int idx = lockless_shgeti(actors, id); return idx != -1; } void set_actor_state(cell_rt *actor) { if (actor->disrupt) { #ifdef SCHEDULER_DEBUG fprintf(stderr, "set_actor_state: %s disrupted, freeing\n", actor->name ? actor->name : actor->id); #endif actor_free(actor); return; } pthread_mutex_lock(actor->msg_mutex); #ifdef SCHEDULER_DEBUG fprintf(stderr, "set_actor_state: %s state=%d letters=%ld\n", actor->name ? actor->name : actor->id, actor->state, (long)arrlen(actor->letters)); #endif switch(actor->state) { case ACTOR_RUNNING: case ACTOR_READY: if (actor->ar) actor->ar = 0; break; case ACTOR_IDLE: if (arrlen(actor->letters)) { if (actor->is_quiescent) { actor->is_quiescent = 0; atomic_fetch_sub(&engine.quiescent_count, 1); } #ifdef SCHEDULER_DEBUG fprintf(stderr, "set_actor_state: %s IDLE->READY, enqueueing (main=%d)\n", actor->name ? actor->name : actor->id, actor->main_thread_only); #endif actor->state = ACTOR_READY; actor->ar = 0; enqueue_actor_priority(actor); } else if (!hmlen(actor->timers)) { // No messages AND no timers // Only count as quiescent if no $unneeded callback registered int has_unneeded = !JS_IsNull(actor->unneeded_ref.val); if (!actor->is_quiescent && actor->id && !has_unneeded) { actor->is_quiescent = 1; int qc = atomic_fetch_add(&engine.quiescent_count, 1) + 1; int total = (int)lockless_shlen(actors); if (qc >= total && total > 0 && engine.quiescence_enabled) { pthread_mutex_lock(&engine.lock); engine.shutting_down = 1; pthread_cond_broadcast(&engine.wake_cond); pthread_cond_broadcast(&engine.timer_cond); pthread_cond_broadcast(&engine.main_cond); pthread_mutex_unlock(&engine.lock); } } if (!engine.shutting_down) { // Schedule remove timer static uint32_t global_timer_id = 1; uint32_t id = global_timer_id++; actor->ar = id; uint64_t now = cell_ns(); uint64_t execute_at = now + (uint64_t)(actor->ar_secs * 1e9); pthread_mutex_lock(&engine.lock); heap_push(execute_at, actor, id, TIMER_NATIVE_REMOVE); if (timer_heap[0].timer_id == id) { pthread_cond_signal(&engine.timer_cond); } pthread_mutex_unlock(&engine.lock); } } else { // Has timers but no letters — waiting, not quiescent if (actor->is_quiescent) { actor->is_quiescent = 0; atomic_fetch_sub(&engine.quiescent_count, 1); } } break; } pthread_mutex_unlock(actor->msg_mutex); } uint32_t actor_remove_cb(cell_rt *actor, uint32_t id, uint32_t interval) { pthread_mutex_lock(actor->mutex); // Check if this timer is still valid (match actor->ar) if (actor->ar != id && id != 0) { // id 0 means force (optional) pthread_mutex_unlock(actor->mutex); return 0; } actor->disrupt = 1; if (!JS_IsNull(actor->unneeded_ref.val)) { JSValue ret = JS_Call(actor->context, actor->unneeded_ref.val, JS_NULL, 0, NULL); uncaught_exception(actor->context, ret); } int should_free = (actor->state == ACTOR_IDLE); pthread_mutex_unlock(actor->mutex); if (should_free) actor_free(actor); return 0; } void actor_unneeded(cell_rt *actor, JSValue fn, double seconds) { if (actor->disrupt) return; if (!JS_IsFunction(fn)) { actor->unneeded_ref.val = JS_NULL; goto END; } actor->unneeded_ref.val = fn; actor->ar_secs = seconds; END: if (actor->ar) actor->ar = 0; set_actor_state(actor); } cell_rt *get_actor(char *id) { int idx = lockless_shgeti(actors, id); if (idx == -1) { return NULL; } return lockless_shget(actors, id); } void actor_loop() { while (!engine.shutting_down) { pthread_mutex_lock(&engine.lock); while (!has_any_work(engine.mq_head) && !engine.shutting_down) { pthread_cond_wait(&engine.main_cond, &engine.lock); } if (engine.shutting_down && !has_any_work(engine.mq_head)) { pthread_mutex_unlock(&engine.lock); break; } actor_node *node = dequeue_priority(engine.mq_head, engine.mq_tail); pthread_mutex_unlock(&engine.lock); if (node) { actor_turn(node->actor); free(node); } } } cell_rt *create_actor(void *wota) { cell_rt *actor = calloc(sizeof(*actor), 1); #ifdef HAVE_MIMALLOC actor->heap = mi_heap_new(); #endif actor->init_wota = wota; /* GCRef fields are registered after JSContext creation in script_startup. For now, zero-init from calloc is sufficient (val = 0 = JS_MKVAL(JS_TAG_INT,0), which is not a pointer so GC-safe). The actual JS_NULL assignment and JS_AddGCRef happen in script_startup. */ arrsetcap(actor->letters, 5); actor->mutex = malloc(sizeof(pthread_mutex_t)); pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(actor->mutex, &attr); actor->msg_mutex = malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(actor->msg_mutex, &attr); // msg_mutex can be recursive too to be safe pthread_mutexattr_destroy(&attr); /* Lock actor->mutex while initializing JS runtime. */ pthread_mutex_lock(actor->mutex); script_startup(actor); set_actor_state(actor); pthread_mutex_unlock(actor->mutex); return actor; } const char *register_actor(const char *id, cell_rt *actor, int mainthread, double ar) { actor->main_thread_only = mainthread; actor->id = strdup(id); actor->ar_secs = ar; int added = lockless_shput_unique(actors, actor->id, actor); if (!added) { free(actor->id); return "Actor with given ID already exists."; } // Now that actor is in the registry, track its quiescent state if (actor->state == ACTOR_IDLE && !arrlen(actor->letters) && !hmlen(actor->timers) && JS_IsNull(actor->unneeded_ref.val)) { actor->is_quiescent = 1; atomic_fetch_add(&engine.quiescent_count, 1); } return NULL; } const char *send_message(const char *id, void *msg) { cell_rt *target = get_actor(id); if (!target) { blob_destroy((blob *)msg); return "Could not get actor from id."; } letter l; l.type = LETTER_BLOB; l.blob_data = (blob *)msg; pthread_mutex_lock(target->msg_mutex); arrput(target->letters, l); pthread_mutex_unlock(target->msg_mutex); if (target->ar) target->ar = 0; set_actor_state(target); return NULL; } void actor_turn(cell_rt *actor) { pthread_mutex_lock(actor->mutex); #ifdef ACTOR_TRACE int prev_state = actor->state; #endif actor->state = ACTOR_RUNNING; #ifdef ACTOR_TRACE fprintf(stderr, "[ACTOR_TRACE] %s: %d -> RUNNING\n", actor->name ? actor->name : actor->id, prev_state); #endif if (actor->trace_hook) actor->trace_hook(actor->name, CELL_HOOK_ENTER); JSValue result; if (actor->vm_suspended) { /* RESUME path: continue suspended turn under kill timer only */ atomic_fetch_add_explicit(&actor->turn_gen, 1, memory_order_relaxed); JS_SetPauseFlag(actor->context, 0); actor->turn_start_ns = cell_ns(); /* Register kill timer only for resume */ pthread_mutex_lock(&engine.lock); heap_push(actor->turn_start_ns + ACTOR_SLOW_TIMER_NS, actor, 0, TIMER_KILL); pthread_cond_signal(&engine.timer_cond); pthread_mutex_unlock(&engine.lock); result = JS_ResumeRegisterVM(actor->context); actor->vm_suspended = 0; if (JS_IsSuspended(result)) { /* Still suspended after kill timer — shouldn't happen, kill it */ actor->disrupt = 1; goto ENDTURN; } if (JS_IsException(result)) { if (!uncaught_exception(actor->context, result)) actor->disrupt = 1; } actor->slow_strikes++; #ifdef ACTOR_TRACE fprintf(stderr, "[ACTOR_TRACE] %s: slow strike %d/%d\n", actor->name ? actor->name : actor->id, actor->slow_strikes, ACTOR_SLOW_STRIKES_MAX); #endif if (actor->slow_strikes >= ACTOR_SLOW_STRIKES_MAX) { #ifdef ACTOR_TRACE fprintf(stderr, "[ACTOR_TRACE] %s: %d slow strikes, killing\n", actor->name ? actor->name : actor->id, actor->slow_strikes); #endif actor->disrupt = 1; } goto ENDTURN; } /* NORMAL path: pop a message and execute */ pthread_mutex_lock(actor->msg_mutex); int pending = arrlen(actor->letters); if (!pending) { pthread_mutex_unlock(actor->msg_mutex); goto ENDTURN; } #ifdef SCHEDULER_DEBUG fprintf(stderr, "actor_turn: %s has %d letters, type=%d\n", actor->name ? actor->name : actor->id, pending, actor->letters[0].type); #endif letter l = actor->letters[0]; arrdel(actor->letters, 0); pthread_mutex_unlock(actor->msg_mutex); atomic_fetch_add_explicit(&actor->turn_gen, 1, memory_order_relaxed); JS_SetPauseFlag(actor->context, 0); actor->turn_start_ns = cell_ns(); /* Register both pause and kill timers */ pthread_mutex_lock(&engine.lock); heap_push(actor->turn_start_ns + ACTOR_FAST_TIMER_NS, actor, 0, TIMER_PAUSE); heap_push(actor->turn_start_ns + ACTOR_SLOW_TIMER_NS + ACTOR_FAST_TIMER_NS, actor, 0, TIMER_KILL); pthread_cond_signal(&engine.timer_cond); pthread_mutex_unlock(&engine.lock); if (l.type == LETTER_BLOB) { size_t size = blob_length(l.blob_data) / 8; JSValue arg = js_new_blob_stoned_copy(actor->context, (void *)blob_data(l.blob_data), size); blob_destroy(l.blob_data); result = JS_Call(actor->context, actor->message_handle_ref.val, JS_NULL, 1, &arg); if (JS_IsSuspended(result)) { actor->vm_suspended = 1; actor->state = ACTOR_SLOW; JS_FreeValue(actor->context, arg); goto ENDTURN_SLOW; } if (!uncaught_exception(actor->context, result)) actor->disrupt = 1; JS_FreeValue(actor->context, arg); } else if (l.type == LETTER_CALLBACK) { result = JS_Call(actor->context, l.callback, JS_NULL, 0, NULL); if (JS_IsSuspended(result)) { actor->vm_suspended = 1; actor->state = ACTOR_SLOW; JS_FreeValue(actor->context, l.callback); goto ENDTURN_SLOW; } if (!uncaught_exception(actor->context, result)) actor->disrupt = 1; JS_FreeValue(actor->context, l.callback); } if (actor->disrupt) goto ENDTURN; actor->slow_strikes = 0; /* completed within fast timer */ ENDTURN: /* Invalidate any outstanding pause/kill timers for this turn */ atomic_fetch_add_explicit(&actor->turn_gen, 1, memory_order_relaxed); actor->state = ACTOR_IDLE; if (actor->trace_hook) actor->trace_hook(actor->name, CELL_HOOK_EXIT); if (actor->disrupt) { #ifdef SCHEDULER_DEBUG fprintf(stderr, "actor_turn ENDTURN: %s disrupted, freeing\n", actor->name ? actor->name : actor->id); #endif pthread_mutex_unlock(actor->mutex); actor_free(actor); return; } #ifdef SCHEDULER_DEBUG fprintf(stderr, "actor_turn ENDTURN: %s has %ld letters, calling set_actor_state\n", actor->name ? actor->name : actor->id, (long)arrlen(actor->letters)); #endif set_actor_state(actor); pthread_mutex_unlock(actor->mutex); return; ENDTURN_SLOW: #ifdef ACTOR_TRACE fprintf(stderr, "[ACTOR_TRACE] %s: suspended mid-turn -> SLOW\n", actor->name ? actor->name : actor->id); #endif if (actor->trace_hook) actor->trace_hook(actor->name, CELL_HOOK_EXIT); enqueue_actor_priority(actor); pthread_mutex_unlock(actor->mutex); } void actor_clock(cell_rt *actor, JSValue fn) { letter l; l.type = LETTER_CALLBACK; l.callback = JS_DupValue(actor->context, fn); pthread_mutex_lock(actor->msg_mutex); arrput(actor->letters, l); pthread_mutex_unlock(actor->msg_mutex); set_actor_state(actor); } uint32_t actor_delay(cell_rt *actor, JSValue fn, double seconds) { pthread_mutex_lock(actor->msg_mutex); static uint32_t global_timer_id = 1; uint32_t id = global_timer_id++; JSValue cb = JS_DupValue(actor->context, fn); hmput(actor->timers, id, cb); uint64_t now = cell_ns(); uint64_t execute_at = now + (uint64_t)(seconds * 1e9); pthread_mutex_lock(&engine.lock); heap_push(execute_at, actor, id, TIMER_JS); if (timer_heap[0].timer_id == id) { pthread_cond_signal(&engine.timer_cond); } pthread_mutex_unlock(&engine.lock); pthread_mutex_unlock(actor->msg_mutex); return id; } JSValue actor_remove_timer(cell_rt *actor, uint32_t timer_id) { JSValue cb = JS_NULL; pthread_mutex_lock(actor->msg_mutex); int id = hmgeti(actor->timers, timer_id); if (id != -1) { cb = actor->timers[id].value; hmdel(actor->timers, timer_id); } pthread_mutex_unlock(actor->msg_mutex); // Note: We don't remove from heap, it will misfire safely return cb; }