944 lines
27 KiB
C
944 lines
27 KiB
C
#include <pthread.h>
|
|
#include <sys/time.h>
|
|
#include <stdlib.h>
|
|
#include <errno.h>
|
|
#include <string.h>
|
|
#include <stdio.h>
|
|
#include <unistd.h>
|
|
#include <stdatomic.h>
|
|
|
|
#include "stb_ds.h"
|
|
#include "cell.h"
|
|
#include "quickjs-internal.h"
|
|
#include "cell_internal.h"
|
|
|
|
#ifdef _WIN32
|
|
#include <windows.h>
|
|
#endif
|
|
|
|
typedef struct actor_node {
|
|
cell_rt *actor;
|
|
struct actor_node *next;
|
|
} actor_node;
|
|
|
|
typedef enum {
|
|
TIMER_JS,
|
|
TIMER_NATIVE_REMOVE,
|
|
TIMER_PAUSE,
|
|
TIMER_KILL
|
|
} timer_type;
|
|
|
|
typedef struct {
|
|
uint64_t execute_at_ns;
|
|
cell_rt *actor;
|
|
uint32_t timer_id;
|
|
timer_type type;
|
|
uint32_t turn_gen; /* generation at registration time */
|
|
} timer_node;
|
|
|
|
static timer_node *timer_heap = NULL;
|
|
|
|
// Priority queue indices
|
|
#define PQ_READY 0
|
|
#define PQ_REFRESHED 1
|
|
#define PQ_EXHAUSTED 2
|
|
#define PQ_SLOW 3
|
|
#define PQ_COUNT 4
|
|
|
|
// --- 3. The Global Engine State ---
|
|
static struct {
|
|
pthread_mutex_t lock; // Protects queue, shutdown flag, and timers
|
|
pthread_cond_t wake_cond; // Wakes up workers
|
|
pthread_cond_t timer_cond; // Wakes up the timer thread
|
|
pthread_cond_t main_cond; // Wakes up the main thread
|
|
|
|
actor_node *q_head[PQ_COUNT], *q_tail[PQ_COUNT]; // worker queues
|
|
actor_node *mq_head[PQ_COUNT], *mq_tail[PQ_COUNT]; // main-thread queues
|
|
|
|
int shutting_down;
|
|
int quiescence_enabled; // set after bootstrap, before actor_loop
|
|
_Atomic int quiescent_count; // actors idle with no messages and no timers
|
|
|
|
pthread_t *worker_threads;
|
|
int num_workers;
|
|
pthread_t timer_thread;
|
|
} engine;
|
|
|
|
static int state_to_pq(int state) {
|
|
switch (state) {
|
|
case ACTOR_REFRESHED: return PQ_REFRESHED;
|
|
case ACTOR_EXHAUSTED: return PQ_EXHAUSTED;
|
|
case ACTOR_SLOW: return PQ_SLOW;
|
|
default: return PQ_READY;
|
|
}
|
|
}
|
|
|
|
static actor_node *dequeue_priority(actor_node *heads[], actor_node *tails[]) {
|
|
for (int i = 0; i < PQ_COUNT; i++) {
|
|
if (heads[i]) {
|
|
actor_node *n = heads[i];
|
|
heads[i] = n->next;
|
|
if (!heads[i]) tails[i] = NULL;
|
|
return n;
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
static int has_any_work(actor_node *heads[]) {
|
|
for (int i = 0; i < PQ_COUNT; i++)
|
|
if (heads[i]) return 1;
|
|
return 0;
|
|
}
|
|
|
|
static pthread_mutex_t *actors_mutex;
|
|
static struct { char *key; cell_rt *value; } *actors = NULL;
|
|
|
|
#define lockless_shdel(NAME, KEY) pthread_mutex_lock(NAME##_mutex); shdel(NAME, KEY); pthread_mutex_unlock(NAME##_mutex);
|
|
#define lockless_shlen(NAME) ({ \
|
|
pthread_mutex_lock(NAME##_mutex); \
|
|
size_t _len = shlen(NAME); \
|
|
pthread_mutex_unlock(NAME##_mutex); \
|
|
_len; \
|
|
})
|
|
#define lockless_shgeti(NAME, KEY) ({ \
|
|
pthread_mutex_lock(NAME##_mutex); \
|
|
int _idx = shgeti(NAME, KEY); \
|
|
pthread_mutex_unlock(NAME##_mutex); \
|
|
_idx; \
|
|
})
|
|
#define lockless_shget(NAME, KEY) ({ \
|
|
pthread_mutex_lock(NAME##_mutex); \
|
|
cell_rt *_actor = shget(NAME, KEY); \
|
|
pthread_mutex_unlock(NAME##_mutex); \
|
|
_actor; \
|
|
})
|
|
#define lockless_shput_unique(NAME, KEY, VALUE) ({ \
|
|
pthread_mutex_lock(NAME##_mutex); \
|
|
int _exists = shgeti(NAME, KEY) != -1; \
|
|
if (!_exists) shput(NAME, KEY, VALUE); \
|
|
pthread_mutex_unlock(NAME##_mutex); \
|
|
!_exists; \
|
|
})
|
|
|
|
// Forward declarations
|
|
uint32_t actor_remove_cb(cell_rt *actor, uint32_t id, uint32_t interval);
|
|
void actor_turn(cell_rt *actor);
|
|
|
|
void heap_push(uint64_t when, cell_rt *actor, uint32_t timer_id, timer_type type) {
|
|
timer_node node = { .execute_at_ns = when, .actor = actor, .timer_id = timer_id, .type = type, .turn_gen = 0 };
|
|
if (type == TIMER_PAUSE || type == TIMER_KILL)
|
|
node.turn_gen = atomic_load_explicit(&actor->turn_gen, memory_order_relaxed);
|
|
arrput(timer_heap, node);
|
|
|
|
// Bubble up
|
|
int i = arrlen(timer_heap) - 1;
|
|
while (i > 0) {
|
|
int parent = (i - 1) / 2;
|
|
if (timer_heap[i].execute_at_ns >= timer_heap[parent].execute_at_ns) break;
|
|
|
|
timer_node tmp = timer_heap[i];
|
|
timer_heap[i] = timer_heap[parent];
|
|
timer_heap[parent] = tmp;
|
|
i = parent;
|
|
}
|
|
}
|
|
|
|
// Helper: Heap Pop
|
|
int heap_pop(timer_node *out) {
|
|
if (arrlen(timer_heap) == 0) return 0;
|
|
|
|
*out = timer_heap[0];
|
|
timer_node last = arrpop(timer_heap);
|
|
|
|
if (arrlen(timer_heap) > 0) {
|
|
timer_heap[0] = last;
|
|
// Bubble down
|
|
int i = 0;
|
|
int n = arrlen(timer_heap);
|
|
while (1) {
|
|
int left = 2 * i + 1;
|
|
int right = 2 * i + 2;
|
|
int smallest = i;
|
|
|
|
if (left < n && timer_heap[left].execute_at_ns < timer_heap[smallest].execute_at_ns)
|
|
smallest = left;
|
|
if (right < n && timer_heap[right].execute_at_ns < timer_heap[smallest].execute_at_ns)
|
|
smallest = right;
|
|
|
|
if (smallest == i) break;
|
|
|
|
timer_node tmp = timer_heap[i];
|
|
timer_heap[i] = timer_heap[smallest];
|
|
timer_heap[smallest] = tmp;
|
|
i = smallest;
|
|
}
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
void *timer_thread_func(void *arg) {
|
|
while (1) {
|
|
pthread_mutex_lock(&engine.lock);
|
|
|
|
if (engine.shutting_down) {
|
|
pthread_mutex_unlock(&engine.lock);
|
|
return NULL;
|
|
}
|
|
|
|
if (arrlen(timer_heap) == 0) {
|
|
pthread_cond_wait(&engine.timer_cond, &engine.lock);
|
|
} else {
|
|
uint64_t now = cell_ns();
|
|
if (timer_heap[0].execute_at_ns <= now) {
|
|
// --- TIMER FIRED ---
|
|
timer_node t;
|
|
heap_pop(&t);
|
|
pthread_mutex_unlock(&engine.lock);
|
|
|
|
if (t.type == TIMER_NATIVE_REMOVE) {
|
|
actor_remove_cb(t.actor, t.timer_id, 0);
|
|
} else if (t.type == TIMER_PAUSE || t.type == TIMER_KILL) {
|
|
/* Only fire if turn_gen still matches (stale timers are ignored) */
|
|
uint32_t cur = atomic_load_explicit(&t.actor->turn_gen, memory_order_relaxed);
|
|
if (cur == t.turn_gen) {
|
|
if (t.type == TIMER_PAUSE) {
|
|
JS_SetPauseFlag(t.actor->context, 1);
|
|
} else {
|
|
t.actor->disrupt = 1;
|
|
JS_SetPauseFlag(t.actor->context, 2);
|
|
}
|
|
}
|
|
} else {
|
|
pthread_mutex_lock(t.actor->msg_mutex);
|
|
int idx = hmgeti(t.actor->timers, t.timer_id);
|
|
if (idx != -1) {
|
|
JSValue cb = t.actor->timers[idx].value;
|
|
hmdel(t.actor->timers, t.timer_id);
|
|
actor_clock(t.actor, cb);
|
|
JS_FreeValue(t.actor->context, cb);
|
|
}
|
|
pthread_mutex_unlock(t.actor->msg_mutex);
|
|
}
|
|
|
|
continue;
|
|
} else {
|
|
// --- WAIT FOR DEADLINE ---
|
|
struct timespec ts;
|
|
uint64_t wait_ns = timer_heap[0].execute_at_ns - now;
|
|
|
|
// Convert relative wait time to absolute CLOCK_REALTIME
|
|
struct timespec now_real;
|
|
clock_gettime(CLOCK_REALTIME, &now_real);
|
|
|
|
uint64_t deadline_real_ns = (uint64_t)now_real.tv_sec * 1000000000ULL +
|
|
(uint64_t)now_real.tv_nsec + wait_ns;
|
|
ts.tv_sec = deadline_real_ns / 1000000000ULL;
|
|
ts.tv_nsec = deadline_real_ns % 1000000000ULL;
|
|
|
|
pthread_cond_timedwait(&engine.timer_cond, &engine.lock, &ts);
|
|
}
|
|
}
|
|
pthread_mutex_unlock(&engine.lock);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
void enqueue_actor_priority(cell_rt *actor) {
|
|
actor_node *n = malloc(sizeof(actor_node));
|
|
n->actor = actor;
|
|
n->next = NULL;
|
|
int pq = state_to_pq(actor->state);
|
|
pthread_mutex_lock(&engine.lock);
|
|
if (actor->main_thread_only) {
|
|
if (engine.mq_tail[pq]) engine.mq_tail[pq]->next = n;
|
|
else engine.mq_head[pq] = n;
|
|
engine.mq_tail[pq] = n;
|
|
pthread_cond_signal(&engine.main_cond);
|
|
} else {
|
|
if (engine.q_tail[pq]) engine.q_tail[pq]->next = n;
|
|
else engine.q_head[pq] = n;
|
|
engine.q_tail[pq] = n;
|
|
pthread_cond_signal(&engine.wake_cond);
|
|
}
|
|
pthread_mutex_unlock(&engine.lock);
|
|
}
|
|
|
|
void *actor_runner(void *arg) {
|
|
while (1) {
|
|
pthread_mutex_lock(&engine.lock);
|
|
|
|
while (!has_any_work(engine.q_head) && !engine.shutting_down) {
|
|
pthread_cond_wait(&engine.wake_cond, &engine.lock);
|
|
}
|
|
|
|
if (engine.shutting_down && !has_any_work(engine.q_head)) {
|
|
pthread_mutex_unlock(&engine.lock);
|
|
break;
|
|
}
|
|
|
|
actor_node *node = dequeue_priority(engine.q_head, engine.q_tail);
|
|
pthread_mutex_unlock(&engine.lock);
|
|
|
|
if (node) {
|
|
actor_turn(node->actor);
|
|
free(node);
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
void actor_initialize(void) {
|
|
pthread_mutex_init(&engine.lock, NULL);
|
|
pthread_cond_init(&engine.wake_cond, NULL);
|
|
pthread_cond_init(&engine.timer_cond, NULL);
|
|
pthread_cond_init(&engine.main_cond, NULL);
|
|
|
|
engine.shutting_down = 0;
|
|
for (int i = 0; i < PQ_COUNT; i++) {
|
|
engine.q_head[i] = NULL;
|
|
engine.q_tail[i] = NULL;
|
|
engine.mq_head[i] = NULL;
|
|
engine.mq_tail[i] = NULL;
|
|
}
|
|
|
|
actors_mutex = malloc(sizeof(pthread_mutex_t));
|
|
pthread_mutex_init(actors_mutex, NULL);
|
|
|
|
// Start Timer Thread
|
|
pthread_create(&engine.timer_thread, NULL, timer_thread_func, NULL);
|
|
|
|
// Start Workers
|
|
#ifdef _WIN32
|
|
SYSTEM_INFO sysinfo;
|
|
GetSystemInfo(&sysinfo);
|
|
long n = sysinfo.dwNumberOfProcessors;
|
|
#else
|
|
long n = sysconf(_SC_NPROCESSORS_ONLN);
|
|
#endif
|
|
engine.num_workers = (int)n;
|
|
engine.worker_threads = malloc(sizeof(pthread_t) * n);
|
|
for (int i=0; i < n; i++) {
|
|
pthread_create(&engine.worker_threads[i], NULL, actor_runner, NULL);
|
|
}
|
|
}
|
|
|
|
void actor_free(cell_rt *actor)
|
|
{
|
|
if (actor->is_quiescent) {
|
|
actor->is_quiescent = 0;
|
|
atomic_fetch_sub(&engine.quiescent_count, 1);
|
|
}
|
|
lockless_shdel(actors, actor->id);
|
|
|
|
// Purge any pending timers referencing this actor from the timer heap
|
|
// to prevent the timer thread from accessing freed memory.
|
|
{
|
|
pthread_mutex_lock(&engine.lock);
|
|
int n = arrlen(timer_heap);
|
|
int w = 0;
|
|
for (int r = 0; r < n; r++) {
|
|
if (timer_heap[r].actor != actor)
|
|
timer_heap[w++] = timer_heap[r];
|
|
}
|
|
arrsetlen(timer_heap, w);
|
|
for (int i = w / 2 - 1; i >= 0; i--) {
|
|
int j = i;
|
|
while (1) {
|
|
int left = 2 * j + 1, right = 2 * j + 2, smallest = j;
|
|
if (left < w && timer_heap[left].execute_at_ns < timer_heap[smallest].execute_at_ns)
|
|
smallest = left;
|
|
if (right < w && timer_heap[right].execute_at_ns < timer_heap[smallest].execute_at_ns)
|
|
smallest = right;
|
|
if (smallest == j) break;
|
|
timer_node tmp = timer_heap[j];
|
|
timer_heap[j] = timer_heap[smallest];
|
|
timer_heap[smallest] = tmp;
|
|
j = smallest;
|
|
}
|
|
}
|
|
pthread_mutex_unlock(&engine.lock);
|
|
}
|
|
|
|
// Do not go forward with actor destruction until the actor is completely free
|
|
pthread_mutex_lock(actor->msg_mutex);
|
|
pthread_mutex_lock(actor->mutex);
|
|
|
|
JSContext *js = actor->context;
|
|
|
|
JS_DeleteGCRef(js, &actor->idx_buffer_ref);
|
|
JS_DeleteGCRef(js, &actor->on_exception_ref);
|
|
JS_DeleteGCRef(js, &actor->message_handle_ref);
|
|
JS_DeleteGCRef(js, &actor->unneeded_ref);
|
|
JS_DeleteGCRef(js, &actor->actor_sym_ref);
|
|
|
|
for (int i = 0; i < hmlen(actor->timers); i++) {
|
|
JS_FreeValue(js, actor->timers[i].value);
|
|
}
|
|
|
|
hmfree(actor->timers);
|
|
|
|
/* Free all letters in the queue */
|
|
for (int i = 0; i < arrlen(actor->letters); i++) {
|
|
if (actor->letters[i].type == LETTER_BLOB) {
|
|
blob_destroy(actor->letters[i].blob_data);
|
|
} else if (actor->letters[i].type == LETTER_CALLBACK) {
|
|
JS_FreeValue(js, actor->letters[i].callback);
|
|
}
|
|
}
|
|
|
|
arrfree(actor->letters);
|
|
|
|
JS_FreeContext(js);
|
|
free(actor->id);
|
|
|
|
pthread_mutex_unlock(actor->mutex);
|
|
pthread_mutex_destroy(actor->mutex);
|
|
free(actor->mutex);
|
|
pthread_mutex_unlock(actor->msg_mutex);
|
|
pthread_mutex_destroy(actor->msg_mutex);
|
|
free(actor->msg_mutex);
|
|
|
|
free(actor);
|
|
|
|
int actor_count = lockless_shlen(actors);
|
|
if (actor_count == 0) {
|
|
pthread_mutex_lock(&engine.lock);
|
|
engine.shutting_down = 1;
|
|
pthread_cond_broadcast(&engine.wake_cond);
|
|
pthread_cond_broadcast(&engine.timer_cond);
|
|
pthread_cond_broadcast(&engine.main_cond);
|
|
pthread_mutex_unlock(&engine.lock);
|
|
}
|
|
}
|
|
|
|
int scheduler_actor_count(void) {
|
|
return (int)lockless_shlen(actors);
|
|
}
|
|
|
|
void scheduler_enable_quiescence(void) {
|
|
engine.quiescence_enabled = 1;
|
|
// Check if all actors are already quiescent
|
|
int qc = atomic_load(&engine.quiescent_count);
|
|
int total = (int)lockless_shlen(actors);
|
|
if (qc >= total && total > 0) {
|
|
pthread_mutex_lock(&engine.lock);
|
|
engine.shutting_down = 1;
|
|
pthread_cond_broadcast(&engine.wake_cond);
|
|
pthread_cond_broadcast(&engine.timer_cond);
|
|
pthread_cond_broadcast(&engine.main_cond);
|
|
pthread_mutex_unlock(&engine.lock);
|
|
}
|
|
}
|
|
|
|
void exit_handler(void) {
|
|
static int already_exiting = 0;
|
|
if (already_exiting) return;
|
|
already_exiting = 1;
|
|
|
|
pthread_mutex_lock(&engine.lock);
|
|
engine.shutting_down = 1;
|
|
pthread_cond_broadcast(&engine.wake_cond);
|
|
pthread_cond_broadcast(&engine.timer_cond);
|
|
pthread_cond_broadcast(&engine.main_cond);
|
|
pthread_mutex_unlock(&engine.lock);
|
|
|
|
pthread_join(engine.timer_thread, NULL);
|
|
|
|
for (int i=0; i < engine.num_workers; i++) {
|
|
pthread_join(engine.worker_threads[i], NULL);
|
|
}
|
|
|
|
free(engine.worker_threads);
|
|
pthread_mutex_destroy(&engine.lock);
|
|
pthread_cond_destroy(&engine.wake_cond);
|
|
pthread_cond_destroy(&engine.timer_cond);
|
|
pthread_cond_destroy(&engine.main_cond);
|
|
|
|
pthread_mutex_destroy(actors_mutex);
|
|
free(actors_mutex);
|
|
|
|
arrfree(timer_heap);
|
|
}
|
|
|
|
int actor_exists(const char *id)
|
|
{
|
|
int idx = lockless_shgeti(actors, id);
|
|
return idx != -1;
|
|
}
|
|
|
|
void set_actor_state(cell_rt *actor)
|
|
{
|
|
if (actor->disrupt) {
|
|
#ifdef SCHEDULER_DEBUG
|
|
fprintf(stderr, "set_actor_state: %s disrupted, freeing\n", actor->name ? actor->name : actor->id);
|
|
#endif
|
|
actor_free(actor);
|
|
return;
|
|
}
|
|
pthread_mutex_lock(actor->msg_mutex);
|
|
|
|
#ifdef SCHEDULER_DEBUG
|
|
fprintf(stderr, "set_actor_state: %s state=%d letters=%ld\n", actor->name ? actor->name : actor->id, actor->state, (long)arrlen(actor->letters));
|
|
#endif
|
|
switch(actor->state) {
|
|
case ACTOR_RUNNING:
|
|
case ACTOR_READY:
|
|
if (actor->ar)
|
|
actor->ar = 0;
|
|
break;
|
|
|
|
case ACTOR_IDLE:
|
|
if (arrlen(actor->letters)) {
|
|
if (actor->is_quiescent) {
|
|
actor->is_quiescent = 0;
|
|
atomic_fetch_sub(&engine.quiescent_count, 1);
|
|
}
|
|
#ifdef SCHEDULER_DEBUG
|
|
fprintf(stderr, "set_actor_state: %s IDLE->READY, enqueueing (main=%d)\n", actor->name ? actor->name : actor->id, actor->main_thread_only);
|
|
#endif
|
|
actor->state = ACTOR_READY;
|
|
actor->ar = 0;
|
|
enqueue_actor_priority(actor);
|
|
|
|
} else if (!hmlen(actor->timers)) {
|
|
// No messages AND no timers
|
|
// Only count as quiescent if no $unneeded callback registered
|
|
int has_unneeded = !JS_IsNull(actor->unneeded_ref.val);
|
|
if (!actor->is_quiescent && actor->id && !has_unneeded) {
|
|
actor->is_quiescent = 1;
|
|
int qc = atomic_fetch_add(&engine.quiescent_count, 1) + 1;
|
|
int total = (int)lockless_shlen(actors);
|
|
if (qc >= total && total > 0 && engine.quiescence_enabled) {
|
|
pthread_mutex_lock(&engine.lock);
|
|
engine.shutting_down = 1;
|
|
pthread_cond_broadcast(&engine.wake_cond);
|
|
pthread_cond_broadcast(&engine.timer_cond);
|
|
pthread_cond_broadcast(&engine.main_cond);
|
|
pthread_mutex_unlock(&engine.lock);
|
|
}
|
|
}
|
|
|
|
if (!engine.shutting_down) {
|
|
// Schedule remove timer
|
|
static uint32_t global_timer_id = 1;
|
|
uint32_t id = global_timer_id++;
|
|
actor->ar = id;
|
|
|
|
uint64_t now = cell_ns();
|
|
uint64_t execute_at = now + (uint64_t)(actor->ar_secs * 1e9);
|
|
|
|
pthread_mutex_lock(&engine.lock);
|
|
heap_push(execute_at, actor, id, TIMER_NATIVE_REMOVE);
|
|
if (timer_heap[0].timer_id == id) {
|
|
pthread_cond_signal(&engine.timer_cond);
|
|
}
|
|
pthread_mutex_unlock(&engine.lock);
|
|
}
|
|
} else {
|
|
// Has timers but no letters — waiting, not quiescent
|
|
if (actor->is_quiescent) {
|
|
actor->is_quiescent = 0;
|
|
atomic_fetch_sub(&engine.quiescent_count, 1);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
pthread_mutex_unlock(actor->msg_mutex);
|
|
}
|
|
|
|
uint32_t actor_remove_cb(cell_rt *actor, uint32_t id, uint32_t interval)
|
|
{
|
|
pthread_mutex_lock(actor->mutex);
|
|
// Check if this timer is still valid (match actor->ar)
|
|
if (actor->ar != id && id != 0) { // id 0 means force (optional)
|
|
pthread_mutex_unlock(actor->mutex);
|
|
return 0;
|
|
}
|
|
|
|
actor->disrupt = 1;
|
|
|
|
if (!JS_IsNull(actor->unneeded_ref.val)) {
|
|
JSValue ret = JS_Call(actor->context, actor->unneeded_ref.val, JS_NULL, 0, NULL);
|
|
uncaught_exception(actor->context, ret);
|
|
}
|
|
|
|
int should_free = (actor->state == ACTOR_IDLE);
|
|
pthread_mutex_unlock(actor->mutex);
|
|
|
|
if (should_free) actor_free(actor);
|
|
return 0;
|
|
}
|
|
|
|
void actor_unneeded(cell_rt *actor, JSValue fn, double seconds)
|
|
{
|
|
if (actor->disrupt) return;
|
|
|
|
if (!JS_IsFunction(fn)) {
|
|
actor->unneeded_ref.val = JS_NULL;
|
|
goto END;
|
|
}
|
|
|
|
actor->unneeded_ref.val = fn;
|
|
actor->ar_secs = seconds;
|
|
|
|
END:
|
|
if (actor->ar)
|
|
actor->ar = 0;
|
|
set_actor_state(actor);
|
|
}
|
|
|
|
cell_rt *get_actor(char *id)
|
|
{
|
|
int idx = lockless_shgeti(actors, id);
|
|
if (idx == -1) {
|
|
return NULL;
|
|
}
|
|
|
|
return lockless_shget(actors, id);
|
|
}
|
|
|
|
void actor_loop()
|
|
{
|
|
while (!engine.shutting_down) {
|
|
pthread_mutex_lock(&engine.lock);
|
|
while (!has_any_work(engine.mq_head) && !engine.shutting_down) {
|
|
pthread_cond_wait(&engine.main_cond, &engine.lock);
|
|
}
|
|
|
|
if (engine.shutting_down && !has_any_work(engine.mq_head)) {
|
|
pthread_mutex_unlock(&engine.lock);
|
|
break;
|
|
}
|
|
|
|
actor_node *node = dequeue_priority(engine.mq_head, engine.mq_tail);
|
|
pthread_mutex_unlock(&engine.lock);
|
|
|
|
if (node) {
|
|
actor_turn(node->actor);
|
|
free(node);
|
|
}
|
|
}
|
|
}
|
|
|
|
cell_rt *create_actor(void *wota)
|
|
{
|
|
cell_rt *actor = calloc(sizeof(*actor), 1);
|
|
#ifdef HAVE_MIMALLOC
|
|
actor->heap = mi_heap_new();
|
|
#endif
|
|
actor->init_wota = wota;
|
|
/* GCRef fields are registered after JSContext creation in script_startup.
|
|
For now, zero-init from calloc is sufficient (val = 0 = JS_MKVAL(JS_TAG_INT,0),
|
|
which is not a pointer so GC-safe). The actual JS_NULL assignment and
|
|
JS_AddGCRef happen in script_startup. */
|
|
|
|
arrsetcap(actor->letters, 5);
|
|
|
|
actor->mutex = malloc(sizeof(pthread_mutex_t));
|
|
pthread_mutexattr_t attr;
|
|
pthread_mutexattr_init(&attr);
|
|
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
|
|
pthread_mutex_init(actor->mutex, &attr);
|
|
actor->msg_mutex = malloc(sizeof(pthread_mutex_t));
|
|
pthread_mutex_init(actor->msg_mutex, &attr); // msg_mutex can be recursive too to be safe
|
|
pthread_mutexattr_destroy(&attr);
|
|
|
|
/* Lock actor->mutex while initializing JS runtime. */
|
|
pthread_mutex_lock(actor->mutex);
|
|
script_startup(actor);
|
|
set_actor_state(actor);
|
|
pthread_mutex_unlock(actor->mutex);
|
|
|
|
return actor;
|
|
}
|
|
|
|
const char *register_actor(const char *id, cell_rt *actor, int mainthread, double ar)
|
|
{
|
|
actor->main_thread_only = mainthread;
|
|
actor->id = strdup(id);
|
|
actor->ar_secs = ar;
|
|
int added = lockless_shput_unique(actors, actor->id, actor);
|
|
if (!added) {
|
|
free(actor->id);
|
|
return "Actor with given ID already exists.";
|
|
}
|
|
// Now that actor is in the registry, track its quiescent state
|
|
if (actor->state == ACTOR_IDLE && !arrlen(actor->letters)
|
|
&& !hmlen(actor->timers) && JS_IsNull(actor->unneeded_ref.val)) {
|
|
actor->is_quiescent = 1;
|
|
atomic_fetch_add(&engine.quiescent_count, 1);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
|
|
const char *send_message(const char *id, void *msg)
|
|
{
|
|
cell_rt *target = get_actor(id);
|
|
if (!target) {
|
|
blob_destroy((blob *)msg);
|
|
return "Could not get actor from id.";
|
|
}
|
|
|
|
letter l;
|
|
l.type = LETTER_BLOB;
|
|
l.blob_data = (blob *)msg;
|
|
|
|
pthread_mutex_lock(target->msg_mutex);
|
|
arrput(target->letters, l);
|
|
pthread_mutex_unlock(target->msg_mutex);
|
|
|
|
if (target->ar)
|
|
target->ar = 0;
|
|
|
|
set_actor_state(target);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
void actor_turn(cell_rt *actor)
|
|
{
|
|
pthread_mutex_lock(actor->mutex);
|
|
#ifdef ACTOR_TRACE
|
|
int prev_state = actor->state;
|
|
#endif
|
|
actor->state = ACTOR_RUNNING;
|
|
#ifdef ACTOR_TRACE
|
|
fprintf(stderr, "[ACTOR_TRACE] %s: %d -> RUNNING\n",
|
|
actor->name ? actor->name : actor->id, prev_state);
|
|
#endif
|
|
|
|
if (!actor->trace_hook) {
|
|
cell_hook gh = cell_trace_gethook();
|
|
if (gh) actor->trace_hook = gh;
|
|
}
|
|
|
|
if (actor->trace_hook)
|
|
actor->trace_hook(actor, CELL_HOOK_ENTER);
|
|
|
|
JSValue result;
|
|
|
|
if (actor->vm_suspended) {
|
|
/* RESUME path: continue suspended turn under kill timer only */
|
|
g_crash_ctx = actor->context;
|
|
atomic_fetch_add_explicit(&actor->turn_gen, 1, memory_order_relaxed);
|
|
JS_SetPauseFlag(actor->context, 0);
|
|
actor->turn_start_ns = cell_ns();
|
|
|
|
/* Register kill timer only for resume */
|
|
pthread_mutex_lock(&engine.lock);
|
|
heap_push(actor->turn_start_ns + ACTOR_SLOW_TIMER_NS,
|
|
actor, 0, TIMER_KILL);
|
|
pthread_cond_signal(&engine.timer_cond);
|
|
pthread_mutex_unlock(&engine.lock);
|
|
|
|
result = JS_ResumeRegisterVM(actor->context);
|
|
actor->vm_suspended = 0;
|
|
|
|
if (JS_IsSuspended(result)) {
|
|
/* Still suspended after kill timer — shouldn't happen, kill it */
|
|
actor->disrupt = 1;
|
|
goto ENDTURN;
|
|
}
|
|
if (JS_IsException(result)) {
|
|
if (!uncaught_exception(actor->context, result))
|
|
actor->disrupt = 1;
|
|
}
|
|
actor->slow_strikes++;
|
|
#ifdef ACTOR_TRACE
|
|
fprintf(stderr, "[ACTOR_TRACE] %s: slow strike %d/%d\n",
|
|
actor->name ? actor->name : actor->id,
|
|
actor->slow_strikes, ACTOR_SLOW_STRIKES_MAX);
|
|
#endif
|
|
if (actor->slow_strikes >= ACTOR_SLOW_STRIKES_MAX) {
|
|
#ifdef ACTOR_TRACE
|
|
fprintf(stderr, "[ACTOR_TRACE] %s: %d slow strikes, killing\n",
|
|
actor->name ? actor->name : actor->id, actor->slow_strikes);
|
|
#endif
|
|
actor->disrupt = 1;
|
|
}
|
|
goto ENDTURN;
|
|
}
|
|
|
|
/* NORMAL path: pop a message and execute */
|
|
pthread_mutex_lock(actor->msg_mutex);
|
|
int pending = arrlen(actor->letters);
|
|
if (!pending) {
|
|
pthread_mutex_unlock(actor->msg_mutex);
|
|
goto ENDTURN;
|
|
}
|
|
#ifdef SCHEDULER_DEBUG
|
|
fprintf(stderr, "actor_turn: %s has %d letters, type=%d\n",
|
|
actor->name ? actor->name : actor->id, pending, actor->letters[0].type);
|
|
#endif
|
|
letter l = actor->letters[0];
|
|
arrdel(actor->letters, 0);
|
|
pthread_mutex_unlock(actor->msg_mutex);
|
|
|
|
atomic_fetch_add_explicit(&actor->turn_gen, 1, memory_order_relaxed);
|
|
JS_SetPauseFlag(actor->context, 0);
|
|
actor->turn_start_ns = cell_ns();
|
|
|
|
/* Register both pause and kill timers */
|
|
pthread_mutex_lock(&engine.lock);
|
|
heap_push(actor->turn_start_ns + ACTOR_FAST_TIMER_NS,
|
|
actor, 0, TIMER_PAUSE);
|
|
heap_push(actor->turn_start_ns + ACTOR_SLOW_TIMER_NS + ACTOR_FAST_TIMER_NS,
|
|
actor, 0, TIMER_KILL);
|
|
pthread_cond_signal(&engine.timer_cond);
|
|
pthread_mutex_unlock(&engine.lock);
|
|
|
|
g_crash_ctx = actor->context;
|
|
|
|
if (l.type == LETTER_BLOB) {
|
|
size_t size = blob_length(l.blob_data) / 8;
|
|
JSValue arg = js_new_blob_stoned_copy(actor->context,
|
|
(void *)blob_data(l.blob_data), size);
|
|
blob_destroy(l.blob_data);
|
|
result = JS_Call(actor->context, actor->message_handle_ref.val,
|
|
JS_NULL, 1, &arg);
|
|
if (JS_IsSuspended(result)) {
|
|
actor->vm_suspended = 1;
|
|
actor->state = ACTOR_SLOW;
|
|
JS_FreeValue(actor->context, arg);
|
|
goto ENDTURN_SLOW;
|
|
}
|
|
if (!uncaught_exception(actor->context, result))
|
|
actor->disrupt = 1;
|
|
JS_FreeValue(actor->context, arg);
|
|
} else if (l.type == LETTER_CALLBACK) {
|
|
result = JS_Call(actor->context, l.callback, JS_NULL, 0, NULL);
|
|
if (JS_IsSuspended(result)) {
|
|
actor->vm_suspended = 1;
|
|
actor->state = ACTOR_SLOW;
|
|
JS_FreeValue(actor->context, l.callback);
|
|
goto ENDTURN_SLOW;
|
|
}
|
|
if (!uncaught_exception(actor->context, result))
|
|
actor->disrupt = 1;
|
|
JS_FreeValue(actor->context, l.callback);
|
|
}
|
|
|
|
if (actor->disrupt) goto ENDTURN;
|
|
actor->slow_strikes = 0; /* completed within fast timer */
|
|
|
|
ENDTURN:
|
|
g_crash_ctx = NULL;
|
|
/* Invalidate any outstanding pause/kill timers for this turn */
|
|
atomic_fetch_add_explicit(&actor->turn_gen, 1, memory_order_relaxed);
|
|
actor->state = ACTOR_IDLE;
|
|
|
|
if (actor->trace_hook)
|
|
actor->trace_hook(actor, CELL_HOOK_EXIT);
|
|
|
|
if (actor->disrupt) {
|
|
#ifdef SCHEDULER_DEBUG
|
|
fprintf(stderr, "actor_turn ENDTURN: %s disrupted, freeing\n",
|
|
actor->name ? actor->name : actor->id);
|
|
#endif
|
|
pthread_mutex_unlock(actor->mutex);
|
|
actor_free(actor);
|
|
return;
|
|
}
|
|
|
|
#ifdef SCHEDULER_DEBUG
|
|
fprintf(stderr, "actor_turn ENDTURN: %s has %ld letters, calling set_actor_state\n",
|
|
actor->name ? actor->name : actor->id, (long)arrlen(actor->letters));
|
|
#endif
|
|
set_actor_state(actor);
|
|
pthread_mutex_unlock(actor->mutex);
|
|
return;
|
|
|
|
ENDTURN_SLOW:
|
|
g_crash_ctx = NULL;
|
|
#ifdef ACTOR_TRACE
|
|
fprintf(stderr, "[ACTOR_TRACE] %s: suspended mid-turn -> SLOW\n",
|
|
actor->name ? actor->name : actor->id);
|
|
#endif
|
|
if (actor->trace_hook)
|
|
actor->trace_hook(actor, CELL_HOOK_EXIT);
|
|
enqueue_actor_priority(actor);
|
|
pthread_mutex_unlock(actor->mutex);
|
|
}
|
|
|
|
/* GC callback: scan actor's letters and timers so the copying GC
|
|
can relocate JSValues stored in C-side data structures. */
|
|
void actor_gc_scan(JSContext *ctx,
|
|
uint8_t *from_base, uint8_t *from_end,
|
|
uint8_t *to_base, uint8_t **to_free, uint8_t *to_end)
|
|
{
|
|
cell_rt *actor = JS_GetContextOpaque(ctx);
|
|
if (!actor) return;
|
|
|
|
/* Lock msg_mutex to synchronize with the timer thread, which reads
|
|
timers and writes letters under the same lock. */
|
|
pthread_mutex_lock(actor->msg_mutex);
|
|
|
|
for (int i = 0; i < arrlen(actor->letters); i++) {
|
|
if (actor->letters[i].type == LETTER_CALLBACK) {
|
|
actor->letters[i].callback = gc_copy_value(ctx,
|
|
actor->letters[i].callback,
|
|
from_base, from_end, to_base, to_free, to_end);
|
|
}
|
|
}
|
|
|
|
for (int i = 0; i < hmlen(actor->timers); i++) {
|
|
actor->timers[i].value = gc_copy_value(ctx,
|
|
actor->timers[i].value,
|
|
from_base, from_end, to_base, to_free, to_end);
|
|
}
|
|
|
|
pthread_mutex_unlock(actor->msg_mutex);
|
|
}
|
|
|
|
void actor_clock(cell_rt *actor, JSValue fn)
|
|
{
|
|
letter l;
|
|
l.type = LETTER_CALLBACK;
|
|
l.callback = JS_DupValue(actor->context, fn);
|
|
pthread_mutex_lock(actor->msg_mutex);
|
|
arrput(actor->letters, l);
|
|
pthread_mutex_unlock(actor->msg_mutex);
|
|
set_actor_state(actor);
|
|
}
|
|
|
|
uint32_t actor_delay(cell_rt *actor, JSValue fn, double seconds)
|
|
{
|
|
pthread_mutex_lock(actor->msg_mutex);
|
|
|
|
static uint32_t global_timer_id = 1;
|
|
uint32_t id = global_timer_id++;
|
|
|
|
JSValue cb = JS_DupValue(actor->context, fn);
|
|
hmput(actor->timers, id, cb);
|
|
|
|
uint64_t now = cell_ns();
|
|
uint64_t execute_at = now + (uint64_t)(seconds * 1e9);
|
|
|
|
pthread_mutex_lock(&engine.lock);
|
|
heap_push(execute_at, actor, id, TIMER_JS);
|
|
if (timer_heap[0].timer_id == id) {
|
|
pthread_cond_signal(&engine.timer_cond);
|
|
}
|
|
pthread_mutex_unlock(&engine.lock);
|
|
|
|
pthread_mutex_unlock(actor->msg_mutex);
|
|
return id;
|
|
}
|
|
|
|
JSValue actor_remove_timer(cell_rt *actor, uint32_t timer_id)
|
|
{
|
|
JSValue cb = JS_NULL;
|
|
pthread_mutex_lock(actor->msg_mutex);
|
|
int id = hmgeti(actor->timers, timer_id);
|
|
if (id != -1) {
|
|
cb = actor->timers[id].value;
|
|
hmdel(actor->timers, timer_id);
|
|
}
|
|
pthread_mutex_unlock(actor->msg_mutex);
|
|
// Note: We don't remove from heap, it will misfire safely
|
|
return cb;
|
|
}
|