diff --git a/scripts/engine.cm b/scripts/engine.cm index f8e83e05..95e2e286 100644 --- a/scripts/engine.cm +++ b/scripts/engine.cm @@ -820,13 +820,10 @@ var prog_script = `(function ${cell.args.program.name()}_start($_, arg) { var ar var startfn = js.eval(cell.args.program, prog_script); $_.clock(_ => { - log.console(`actor %{cell.id} is now running its program.`) var val = startfn($_, cell.args.arg); if (val) throw new Error('Program must not return anything'); }) -log.console("end. set a clock."); - })() \ No newline at end of file diff --git a/source/cell.c b/source/cell.c index c4bb13ed..875a9613 100644 --- a/source/cell.c +++ b/source/cell.c @@ -58,18 +58,21 @@ int tracy_profiling_enabled = 0; #define ENGINE "engine.cm" static cell_rt **ready_queue = NULL; -static SDL_Mutex *queue_mutex = NULL; // for the ready queue -SDL_Condition *queue_cond = NULL; +static SDL_Semaphore *ready_sem; +static SDL_SpinLock queue_lock = 0; + +static cell_rt **main_queue = NULL; +static SDL_Semaphore *main_sem; +static SDL_SpinLock main_queue_lock = 0; + static SDL_Mutex *actors_mutex = NULL; static struct { char *key; cell_rt *value; } *actors = NULL; static unsigned char *zip_buffer_global = NULL; static char *prosperon = NULL; cell_rt *root_cell = NULL; -static SDL_AtomicInt waiting_threads; static SDL_AtomicInt shutting_down; - -static SDL_Thread **runners = NULL; +static SDL_AtomicInt runners_count; static inline uint64_t now_ns() { @@ -78,15 +81,27 @@ static inline uint64_t now_ns() static void exit_handler(void) { - SDL_LockMutex(queue_mutex); /* 1. take the lock */ - SDL_SetAtomicInt(&shutting_down, 1); /* 2. store *inside* CS */ - SDL_BroadcastCondition(queue_cond); /* 3. wake the waiters */ - SDL_UnlockMutex(queue_mutex); /* 4. release – H-B created */ SDL_SetAtomicInt(&shutting_down, 1); + + /* Signal all waiting threads */ + int count = SDL_GetAtomicInt(&runners_count); + for (int i = 0; i < count; i++) + SDL_SignalSemaphore(ready_sem); + + /* Signal main thread in case it's waiting */ + SDL_SignalSemaphore(main_sem); - int status; - for (int i = 0; i < arrlen(runners); i++) - SDL_WaitThread(runners[i], &status); + /* Wait for all runner threads to exit */ + while (SDL_GetAtomicInt(&runners_count) > 0) { + SDL_Delay(10); + } + + if (ready_sem) + SDL_DestroySemaphore(ready_sem); + if (main_sem) + SDL_DestroySemaphore(main_sem); + if (actors_mutex) + SDL_DestroyMutex(actors_mutex); SDL_Quit(); exit(0); @@ -101,14 +116,23 @@ void actor_free(cell_rt *actor) SDL_UnlockMutex(actors_mutex); // If in a queue, remove it - SDL_LockMutex(queue_mutex); + SDL_LockSpinlock(&queue_lock); for (int i = 0; i < arrlen(ready_queue); i++) { if (ready_queue[i] == actor) { arrdel(ready_queue, i); break; } } - SDL_UnlockMutex(queue_mutex); + SDL_UnlockSpinlock(&queue_lock); + + SDL_LockSpinlock(&main_queue_lock); + for (int i = 0; i < arrlen(main_queue); i++) { + if (main_queue[i] == actor) { + arrdel(main_queue, i); + break; + } + } + SDL_UnlockSpinlock(&main_queue_lock); // Do not go forward with actor destruction until the actor is completely free SDL_LockMutex(actor->msg_mutex); @@ -122,7 +146,7 @@ void actor_free(cell_rt *actor) JS_FreeValue(js, actor->unneeded); JS_FreeAtom(js, actor->actor_sym); - remove_timer(actor->ar); + SDL_RemoveTimer(actor->ar); for (int i = 0; i < arrlen(actor->js_swapchains); i++) JS_FreeValue(js, actor->js_swapchains[i]); @@ -328,7 +352,7 @@ void actor_unneeded(cell_rt *actor, JSValue fn, double seconds) END: if (actor->ar) { - remove_timer(actor->ar); + SDL_RemoveTimer(actor->ar); actor->ar = 0; } set_actor_state(actor); @@ -402,7 +426,7 @@ const char *send_message(const char *id, void *msg) arrput(target->letters, l); if (target->ar) { - remove_timer(target->ar); + SDL_RemoveTimer(target->ar); target->ar = 0; } SDL_UnlockMutex(target->msg_mutex); @@ -412,7 +436,7 @@ const char *send_message(const char *id, void *msg) return NULL; } -static Uint32 actor_remove_cb(Uint32 id, Uint32 interval, cell_rt *actor) +static Uint32 actor_remove_cb(cell_rt *actor, Uint32 id, Uint32 interval) { actor->disrupt = 1; @@ -428,19 +452,16 @@ static Uint32 actor_remove_cb(Uint32 id, Uint32 interval, cell_rt *actor) } /* Timer callback adds an event to the queue under evt_mutex. */ -Uint32 actor_delay_cb(SDL_TimerID id, Uint32 interval, cell_rt *actor) +Uint32 actor_delay_cb(cell_rt *actor, SDL_TimerID id, Uint32 interval) { SDL_LockMutex(actor->msg_mutex); int idx = hmgeti(actor->timers, id); if (idx == -1) goto END; - + JSValue cb = actor->timers[idx].value; hmdel(actor->timers, id); - letter l = {0}; - l.type = LETTER_CALLBACK; - l.callback = cb; - arrput(actor->letters, l); - set_actor_state(actor); + actor_clock(actor, cb); + JS_FreeValue(actor->context, cb); END: SDL_UnlockMutex(actor->msg_mutex); @@ -459,7 +480,7 @@ void set_actor_state(cell_rt *actor) case ACTOR_RUNNING: case ACTOR_READY: if (actor->ar) { - remove_timer(actor->ar); + SDL_RemoveTimer(actor->ar); actor->ar = 0; } break; @@ -467,13 +488,19 @@ void set_actor_state(cell_rt *actor) case ACTOR_IDLE: if (arrlen(actor->letters)) { actor->state = ACTOR_READY; - SDL_LockMutex(queue_mutex); - arrput(ready_queue, actor); - SDL_BroadcastCondition(queue_cond); - SDL_UnlockMutex(queue_mutex); + if (actor->main_thread_only) { + SDL_LockSpinlock(&main_queue_lock); + arrput(main_queue, actor); + SDL_UnlockSpinlock(&main_queue_lock); + SDL_SignalSemaphore(main_sem); + } else { + SDL_LockSpinlock(&queue_lock); + arrput(ready_queue, actor); + SDL_UnlockSpinlock(&queue_lock); + SDL_SignalSemaphore(ready_sem); + } } else if (!arrlen(actor->letters) && !hmlen(actor->timers)) { - actor->ar = add_timer_ns(actor->ar_secs*1e9, actor_remove_cb, actor); - SDL_BroadcastCondition(queue_cond); + actor->ar = SDL_AddTimerNS(actor->ar_secs*1e9, actor_remove_cb, actor); } break; } @@ -490,9 +517,10 @@ void actor_turn(cell_rt *actor) TracyCFiberEnter(actor->name); #endif - TAKETURN: actor->state = ACTOR_RUNNING; + TAKETURN: + SDL_LockMutex(actor->msg_mutex); JSValue result; if (!arrlen(actor->letters)) { @@ -514,10 +542,15 @@ void actor_turn(cell_rt *actor) uncaught_exception(actor->context, result); JS_FreeValue(actor->context, l.callback); } + + if (actor->disrupt) goto ENDTURN; // If there are no waiting threads, bail. otherwise, try for another turn - if (SDL_GetAtomicInt(&waiting_threads) > 0) - goto TAKETURN; + SDL_LockSpinlock(&queue_lock); + int someone_else_waiting = (arrlen(ready_queue) > 0); + SDL_UnlockSpinlock(&queue_lock); + + if (!someone_else_waiting) goto TAKETURN; ENDTURN: actor->state = ACTOR_IDLE; @@ -532,6 +565,17 @@ void actor_turn(cell_rt *actor) SDL_UnlockMutex(actor->mutex); } +void actor_clock(cell_rt *actor, JSValue fn) +{ + SDL_LockMutex(actor->msg_mutex); + letter l; + l.type = LETTER_CALLBACK; + l.callback = JS_DupValue(actor->context, fn); + arrput(actor->letters, l); + SDL_UnlockMutex(actor->msg_mutex); + set_actor_state(actor); +} + /* JS function that schedules a timer. */ JSValue js_actor_delay(JSContext *js, JSValue self, int argc, JSValue *argv) { @@ -542,17 +586,12 @@ JSValue js_actor_delay(JSContext *js, JSValue self, int argc, JSValue *argv) double seconds; JS_ToFloat64(js, &seconds, argv[1]); if (seconds <= 0) { - SDL_LockMutex(actor->msg_mutex); - letter l; - l.type = LETTER_CALLBACK; - l.callback = JS_DupValue(js, argv[0]); - arrput(actor->letters, l); - SDL_UnlockMutex(actor->msg_mutex); - return JS_NewInt32(js, -1); + actor_clock(actor, argv[0]); + return JS_UNDEFINED; } SDL_LockMutex(actor->msg_mutex); - uint32_t id = add_timer_ns(seconds*1e9, actor_delay_cb, actor); + uint32_t id = SDL_AddTimerNS(seconds*1e9, actor_delay_cb, actor); JSValue cb = JS_DupValue(js, argv[0]); hmput(actor->timers, id, cb); SDL_UnlockMutex(actor->msg_mutex); @@ -566,7 +605,7 @@ JSValue js_actor_removetimer(JSContext *js, JSValue self, int argc, JSValue *arg JS_ToUint32(js, &timer_id, argv[0]); if (timer_id == -1) return JS_UNDEFINED; - remove_timer(timer_id); + SDL_RemoveTimer(timer_id); JSValue cb = JS_UNDEFINED; @@ -648,7 +687,7 @@ void actor_disrupt(cell_rt *crt) static int actor_interrupt_cb(JSRuntime *rt, cell_rt *crt) { - return crt->disrupt; + return SDL_GetAtomicInt(&shutting_down) || crt->disrupt; } void script_startup(cell_rt *prt) @@ -726,28 +765,25 @@ int uncaught_exception(JSContext *js, JSValue v) static int actor_runner(void *data) { + SDL_AddAtomicInt(&runners_count, 1); + while (!SDL_GetAtomicInt(&shutting_down)) { - SDL_LockMutex(queue_mutex); + SDL_LockSpinlock(&queue_lock); cell_rt *actor = NULL; - for (int i = 0; i < arrlen(ready_queue); i++) { - if (!ready_queue[i]->main_thread_only) { - actor = ready_queue[i]; - arrdel(ready_queue,i); - break; - } + if (arrlen(ready_queue) > 0) { + actor = ready_queue[0]; + arrdel(ready_queue, 0); } + SDL_UnlockSpinlock(&queue_lock); - if (actor) { - SDL_UnlockMutex(queue_mutex); + if (actor) actor_turn(actor); - } else { - SDL_AddAtomicInt(&waiting_threads, 1); - SDL_WaitCondition(queue_cond, queue_mutex); - SDL_AddAtomicInt(&waiting_threads, -1); - SDL_UnlockMutex(queue_mutex); - if (SDL_GetAtomicInt(&shutting_down)) return 0; - } + + SDL_WaitSemaphore(ready_sem); + if (SDL_GetAtomicInt(&shutting_down)) break; } + + SDL_AddAtomicInt(&runners_count, -1); return 0; } @@ -770,41 +806,35 @@ static void signal_handler(int sig) static void add_runners(int n) { /* Launch runner threads */ - for (int i = 0; i < n; i++) { // -1 to keep the main thread free + for (int i = 0; i < n; i++) { char threadname[128]; snprintf(threadname, sizeof(threadname), "actor runner %d", i); SDL_Thread *thread = SDL_CreateThread(actor_runner, threadname, NULL); - arrput(runners, thread); + SDL_DetachThread(thread); + /* Thread is detached, no need to track */ } } static void loop() { /* Initialize synchronization primitives */ - queue_mutex = SDL_CreateMutex(); - queue_cond = SDL_CreateCondition(); + ready_sem = SDL_CreateSemaphore(0); + main_sem = SDL_CreateSemaphore(0); actors_mutex = SDL_CreateMutex(); - SDL_SetAtomicInt(&waiting_threads, 0); SDL_SetAtomicInt(&shutting_down, 0); + SDL_SetAtomicInt(&runners_count, 0); - timer_init(); - - add_runners(SDL_GetNumLogicalCPUCores()-1); + add_runners(SDL_GetNumLogicalCPUCores()); while (!SDL_GetAtomicInt(&shutting_down)) { - process_due_timers(); - - SDL_LockMutex(queue_mutex); + SDL_WaitSemaphore(main_sem); + SDL_LockSpinlock(&main_queue_lock); cell_rt *actor = NULL; - for (int i = 0; i < arrlen(ready_queue); i++) { - if (ready_queue[i]->main_thread_only) { - actor = ready_queue[i]; - printf("picking up actor %s\n", actor->id); - arrdel(ready_queue,i); - break; - } + if (arrlen(main_queue) > 0) { + actor = main_queue[0]; + arrdel(main_queue, 0); } - SDL_UnlockMutex(queue_mutex); + SDL_UnlockSpinlock(&main_queue_lock); if (actor) { printf("running %s\n", actor->id); @@ -812,16 +842,6 @@ static void loop() actor_turn(actor); continue; } - - uint64_t to_ns = next_timeout_ns(); - - if (to_ns == UINT64_MAX) { - // No more timers - hence, no more actors ... exit if single threaded. - } - - SDL_LockMutex(queue_mutex); - SDL_WaitConditionTimeout(queue_cond, queue_mutex, to_ns); - SDL_UnlockMutex(queue_mutex); } } @@ -840,8 +860,6 @@ int main(int argc, char **argv) exit(1); } - printf("main thread %d\n", SDL_GetThreadID(NULL)); - #ifdef TRACY_ENABLE tracy_profiling_enabled = profile_enabled; #endif diff --git a/source/cell.h b/source/cell.h index e326b04f..b2cd9af8 100644 --- a/source/cell.h +++ b/source/cell.h @@ -103,6 +103,7 @@ int uncaught_exception(JSContext *js, JSValue v); int actor_exists(const char *id); cell_rt *get_actor(char *id); void set_actor_state(cell_rt *actor); +void actor_clock(cell_rt *actor, JSValue fn); int JS_ArrayLength(JSContext *js, JSValue a); diff --git a/source/qjs_actor.c b/source/qjs_actor.c index 3e665f65..27f931f2 100644 --- a/source/qjs_actor.c +++ b/source/qjs_actor.c @@ -166,16 +166,9 @@ JSC_CCALL(actor_on_exception, JSC_CCALL(actor_clock, if (!JS_IsFunction(js, argv[0])) return JS_ThrowReferenceError(js, "Argument must be a function."); - + cell_rt *actor = JS_GetContextOpaque(js); - SDL_LockMutex(actor->msg_mutex); - letter l; - l.type = LETTER_CALLBACK; - l.callback = JS_DupValue(js, argv[0]); - arrput(actor->letters, l); - SDL_UnlockMutex(actor->msg_mutex); - printf("actor %s clocked\n", actor->id); - set_actor_state(actor); + actor_clock(actor, argv[0]); ) static const JSCFunctionListEntry js_actor_funcs[] = { diff --git a/source/timer.c b/source/timer.c index c47679ab..a0eb73a5 100644 --- a/source/timer.c +++ b/source/timer.c @@ -3,17 +3,50 @@ #include #include "stb_ds.h" -extern SDL_Condition *queue_cond; - /* Global timer state */ static timer_t *timers = NULL; static Uint32 next_timer_id = 1; static SDL_Mutex *timer_mutex = NULL; +static SDL_Condition *timer_cond = NULL; +static SDL_Thread *timer_thread = NULL; +static SDL_AtomicInt timer_running; + +static int timer_loop(void *data) +{ + while (SDL_GetAtomicInt(&timer_running)) { + SDL_LockMutex(timer_mutex); + + uint64_t to_ns = next_timeout_ns(); + if (to_ns == UINT64_MAX) { + /* No timers, wait indefinitely */ + SDL_WaitCondition(timer_cond, timer_mutex); + } else if (to_ns == 0) { + /* Timer(s) already due, process immediately */ + SDL_UnlockMutex(timer_mutex); + process_due_timers(); + continue; + } else { + /* Wait until next timer is due or a new timer is added */ + Uint32 wait_ms = (Uint32)(to_ns / 1000000); + if (wait_ms == 0) wait_ms = 1; /* Minimum 1ms wait */ + SDL_WaitConditionTimeout(timer_cond, timer_mutex, wait_ms); + } + SDL_UnlockMutex(timer_mutex); + + /* Process any due timers */ + process_due_timers(); + } + + return 0; +} void timer_init(void) { if (!timer_mutex) { timer_mutex = SDL_CreateMutex(); + timer_cond = SDL_CreateCondition(); + SDL_SetAtomicInt(&timer_running, 1); + timer_thread = SDL_CreateThread(timer_loop, "timer thread", NULL); } } @@ -35,7 +68,7 @@ Uint32 add_timer_ns(uint64_t delay_ns, TimerCallback callback, void *param) t.param = param; arrput(timers, t); SDL_UnlockMutex(timer_mutex); - SDL_BroadcastCondition(queue_cond); + SDL_SignalCondition(timer_cond); return t.id; } @@ -97,4 +130,27 @@ uint64_t next_timeout_ns(void) if (min_due <= now) return 0; return min_due - now; +} + +void timer_quit(void) +{ + if (timer_thread) { + SDL_SetAtomicInt(&timer_running, 0); + SDL_SignalCondition(timer_cond); + SDL_WaitThread(timer_thread, NULL); + timer_thread = NULL; + } + + if (timer_cond) { + SDL_DestroyCondition(timer_cond); + timer_cond = NULL; + } + + if (timer_mutex) { + SDL_DestroyMutex(timer_mutex); + timer_mutex = NULL; + } + + arrfree(timers); + timers = NULL; } \ No newline at end of file diff --git a/source/timer.h b/source/timer.h index faee06f8..153833a6 100644 --- a/source/timer.h +++ b/source/timer.h @@ -17,6 +17,9 @@ typedef struct { /* Initialize timer system - must be called once */ void timer_init(void); +/* Shutdown timer system - must be called before exit */ +void timer_quit(void); + /* Schedule a new timer to fire after delay_ns nanoseconds */ Uint32 add_timer_ns(uint64_t delay_ns, TimerCallback callback, void *param); diff --git a/tests/httpget.ce b/tests/httpget.ce index 7635cae8..18a6f4a7 100644 --- a/tests/httpget.ce +++ b/tests/httpget.ce @@ -8,9 +8,15 @@ var start_time = time.number() var host = arg[0] var path = arg[1] || '/' +$_.start(e => { + send(e.actor, { op: 'get', domain: host, port: 80}, addrs => { + log.console(json.encode(addrs[0])) + }) +}, 'dig') +/* var addrs = socket.getaddrinfo(host, '80') var addr = addrs[0] - +log.console(json.encode(addrs)) var sock = socket.socket() socket.connect(sock, addr) @@ -38,3 +44,4 @@ function get_chunk() } get_chunk() +*/ \ No newline at end of file