#include "stb_ds.h"
#include "cell.h"
#include "cell_internal.h"

/* --- Data Structures --- */

/* Singly linked node of the ready queue (FIFO of actors awaiting a turn). */
typedef struct actor_node {
    cell_rt *actor;
    struct actor_node *next;
} actor_node;

/* One pending timer; stored in a binary min-heap keyed on execute_at_ns. */
typedef struct {
    uint64_t execute_at_ns;
    cell_rt *actor;
    uint32_t timer_id;
    int is_native; /* 1 for native remove timer, 0 for JS timer */
} timer_node;

/* --- Global State --- */

static actor_node *ready_head = NULL;
static actor_node *ready_tail = NULL;

static timer_node *timer_heap = NULL; /* stb_ds dynamic array */

static struct { char *key; cell_rt *value; } *actors = NULL; /* stb_ds string hashmap */
static int shutting_down = 0;

/* --- Forward Declarations --- */

void actor_turn(cell_rt *actor);
uint32_t actor_remove_cb(cell_rt *actor, uint32_t id, uint32_t interval);

/* --- Heap Helpers --- */

/*
 * Insert a timer into timer_heap, keeping the earliest deadline at index 0.
 *
 * The new entry is appended and then floated towards the root: each parent
 * with a strictly later deadline is moved down one level, and the new entry
 * is written once into the slot where it comes to rest.
 */
static void heap_push(uint64_t when, cell_rt *actor, uint32_t timer_id, int is_native)
{
    timer_node fresh;
    fresh.execute_at_ns = when;
    fresh.actor = actor;
    fresh.timer_id = timer_id;
    fresh.is_native = is_native;

    arrput(timer_heap, fresh);

    int slot = arrlen(timer_heap) - 1;
    while (slot > 0) {
        int up = (slot - 1) / 2;
        if (fresh.execute_at_ns >= timer_heap[up].execute_at_ns)
            break;
        timer_heap[slot] = timer_heap[up];
        slot = up;
    }
    timer_heap[slot] = fresh;
}

/*
 * Remove the timer with the earliest deadline and copy it to *out.
 *
 * Returns 1 when a timer was written to *out, 0 when the heap is empty
 * (in which case *out is left untouched).
 */
static int heap_pop(timer_node *out)
{
    if (arrlen(timer_heap) == 0)
        return 0;

    *out = timer_heap[0];

    timer_node moved = arrpop(timer_heap);
    int count = arrlen(timer_heap);
    if (count <= 0)
        return 1;

    /* Sink the former last element from the root until both children
       have deadlines that are not earlier than it. */
    int slot = 0;
    for (;;) {
        int lhs = 2 * slot + 1;
        int rhs = lhs + 1;
        int pick = slot;
        uint64_t best = moved.execute_at_ns;

        if (lhs < count && timer_heap[lhs].execute_at_ns < best) {
            pick = lhs;
            best = timer_heap[lhs].execute_at_ns;
        }
        if (rhs < count && timer_heap[rhs].execute_at_ns < best) {
            pick = rhs;
        }
        if (pick == slot)
            break;

        timer_heap[slot] = timer_heap[pick];
        slot = pick;
    }
    timer_heap[slot] = moved;

    return 1;
}
// --- Implementation --- void actor_initialize(void) { } void actor_free(cell_rt *actor) { shdel(actors, actor->id); // Remove from ready queue if present (O(N)) if (ready_head) { if (ready_head->actor == actor) { actor_node *next = ready_head->next; free(ready_head); ready_head = next; if (!ready_head) ready_tail = NULL; } else { actor_node *curr = ready_head; while (curr->next) { if (curr->next->actor == actor) { actor_node *to_free = curr->next; curr->next = to_free->next; if (to_free == ready_tail) ready_tail = curr; free(to_free); break; } curr = curr->next; } } } JSContext *js = actor->context; JS_FreeValue(js, actor->idx_buffer); JS_FreeValue(js, actor->message_handle); JS_FreeValue(js, actor->on_exception); JS_FreeValue(js, actor->unneeded); JS_FreeValue(js, actor->actor_sym); /* Free timer callbacks stored in actor */ for (int i = 0; i < hmlen(actor->timers); i++) { JS_FreeValue(js, actor->timers[i].value); } hmfree(actor->timers); /* Free all letters in the queue */ for (int i = 0; i < arrlen(actor->letters); i++) { if (actor->letters[i].type == LETTER_BLOB) { blob_destroy(actor->letters[i].blob_data); } else if (actor->letters[i].type == LETTER_CALLBACK) { JS_FreeValue(js, actor->letters[i].callback); } } arrfree(actor->letters); JSRuntime *rt = JS_GetRuntime(js); JS_SetInterruptHandler(rt, NULL, NULL); JS_FreeContext(js); JS_FreeRuntime(rt); free(actor->id); free(actor); int actor_count = shlen(actors); if (actor_count == 0) exit(0); } void actor_unneeded(cell_rt *actor, JSValue fn, double seconds) { if (actor->disrupt) return; JS_FreeValue(actor->context, actor->unneeded); if (!JS_IsFunction(actor->context, fn)) { actor->unneeded = JS_NULL; goto END; } actor->unneeded = JS_DupValue(actor->context, fn); actor->ar_secs = seconds; END: // If there was an existing unneeded timer, it will be handled/ignored in set_actor_state logic // or we can explicitly invalidate it if we tracked the ID. // For now, set_actor_state will schedule a new one if idle. 
if (actor->ar) { // In single threaded, we can't easily remove from heap O(N), // but we can just let it fire and check ID match. actor->ar = 0; } set_actor_state(actor); } void exit_handler(void) { shutting_down = 1; shfree(actors); arrfree(timer_heap); // Clean up queue? while(ready_head) { actor_node *n = ready_head; ready_head = n->next; free(n); } exit(0); } int actor_exists(const char *id) { int idx = shgeti(actors, id); return idx != -1; } void set_actor_state(cell_rt *actor) { if (actor->disrupt) { actor_free(actor); return; } // No mutex needed in single threaded switch(actor->state) { case ACTOR_RUNNING: case ACTOR_READY: if (actor->ar) { // Invalidate existing unneeded timer actor->ar = 0; } break; case ACTOR_IDLE: if (arrlen(actor->letters)) { actor->state = ACTOR_READY; // Add to ready queue actor_node *n = malloc(sizeof(actor_node)); n->actor = actor; n->next = NULL; if (ready_tail) { ready_tail->next = n; } else { ready_head = n; } ready_tail = n; } else if (!arrlen(actor->letters) && !hmlen(actor->timers)) { // Schedule remove timer static uint32_t global_timer_id = 1; uint32_t id = global_timer_id++; actor->ar = id; uint64_t now = cell_ns(); uint64_t execute_at = now + (uint64_t)(actor->ar_secs * 1e9); heap_push(execute_at, actor, id, 1); // 1 = native remove } break; } } uint32_t actor_remove_cb(cell_rt *actor, uint32_t id, uint32_t interval) { // Check if this timer is still valid (match actor->ar) if (actor->ar != id && id != 0) { return 0; } actor->disrupt = 1; if (!JS_IsNull(actor->unneeded)) { JSValue ret = JS_Call(actor->context, actor->unneeded, JS_NULL, 0, NULL); uncaught_exception(actor->context, ret); } int should_free = (actor->state == ACTOR_IDLE); if (should_free) actor_free(actor); return 0; } cell_rt *get_actor(char *id) { int idx = shgeti(actors, id); if (idx == -1) { return NULL; } return shget(actors, id); } void actor_loop() { while (!shutting_down) { // 1. 
Check Ready Queue if (ready_head) { actor_node *node = ready_head; ready_head = node->next; if (!ready_head) ready_tail = NULL; actor_turn(node->actor); free(node); continue; // Loop again to check for more work or timers } // 2. Check Timers uint64_t now = cell_ns(); if (arrlen(timer_heap) > 0) { if (timer_heap[0].execute_at_ns <= now) { timer_node t; heap_pop(&t); if (t.is_native) { actor_remove_cb(t.actor, t.timer_id, 0); } else { // JS Timer int idx = hmgeti(t.actor->timers, t.timer_id); if (idx != -1) { JSValue cb = t.actor->timers[idx].value; hmdel(t.actor->timers, t.timer_id); actor_clock(t.actor, cb); JS_FreeValue(t.actor->context, cb); } } continue; // Loop again } else { // Wait until next timer uint64_t diff = timer_heap[0].execute_at_ns - now; double seconds = (double)diff / 1e9; cell_sleep(seconds); } } else { exit(0); } } } cell_rt *create_actor(void *wota) { cell_rt *actor = calloc(sizeof(*actor), 1); actor->init_wota = wota; actor->idx_buffer = JS_NULL; actor->message_handle = JS_NULL; actor->unneeded = JS_NULL; actor->on_exception = JS_NULL; actor->actor_sym = JS_NULL; arrsetcap(actor->letters, 5); // No mutexes needed actor->mutex = NULL; actor->msg_mutex = NULL; script_startup(actor); set_actor_state(actor); return actor; } const char *register_actor(const char *id, cell_rt *actor, int mainthread, double ar) { actor->main_thread_only = mainthread; actor->id = strdup(id); actor->ar_secs = ar; if (shgeti(actors, id) != -1) { free(actor->id); return "Actor with given ID already exists."; } shput(actors, id, actor); return NULL; } int actor_interrupt_cb(JSRuntime *rt, cell_rt *crt) { return shutting_down || crt->disrupt; } const char *send_message(const char *id, void *msg) { cell_rt *target = get_actor(id); if (!target) { blob_destroy((blob *)msg); return "Could not get actor from id."; } letter l; l.type = LETTER_BLOB; l.blob_data = (blob *)msg; arrput(target->letters, l); if (target->ar) { // Invalidate unneeded timer target->ar = 0; } 
set_actor_state(target); return NULL; } void actor_turn(cell_rt *actor) { actor->state = ACTOR_RUNNING; JSValue result; TAKETURN: if (!arrlen(actor->letters)) { goto ENDTURN; } letter l = actor->letters[0]; arrdel(actor->letters, 0); if (l.type == LETTER_BLOB) { // Create a JS blob from the C blob size_t size = blob_length(l.blob_data) / 8; // Convert bits to bytes JSValue arg = js_new_blob_stoned_copy(actor->context, (void *)blob_data(l.blob_data), size); blob_destroy(l.blob_data); result = JS_Call(actor->context, actor->message_handle, JS_NULL, 1, &arg); uncaught_exception(actor->context, result); JS_FreeValue(actor->context, arg); } else if (l.type == LETTER_CALLBACK) { result = JS_Call(actor->context, l.callback, JS_NULL, 0, NULL); uncaught_exception(actor->context, result); JS_FreeValue(actor->context, l.callback); } if (actor->disrupt) goto ENDTURN; ENDTURN: actor->state = ACTOR_IDLE; set_actor_state(actor); } void actor_clock(cell_rt *actor, JSValue fn) { letter l; l.type = LETTER_CALLBACK; l.callback = JS_DupValue(actor->context, fn); arrput(actor->letters, l); set_actor_state(actor); } uint32_t actor_delay(cell_rt *actor, JSValue fn, double seconds) { static uint32_t global_timer_id = 1; uint32_t id = global_timer_id++; JSValue cb = JS_DupValue(actor->context, fn); hmput(actor->timers, id, cb); uint64_t now = cell_ns(); uint64_t execute_at = now + (uint64_t)(seconds * 1e9); heap_push(execute_at, actor, id, 0); // 0 = JS timer return id; } JSValue actor_remove_timer(cell_rt *actor, uint32_t timer_id) { JSValue cb = JS_NULL; int id = hmgeti(actor->timers, timer_id); if (id != -1) { cb = actor->timers[id].value; hmdel(actor->timers, timer_id); } return cb; }