use spinlocks and other fixes

This commit is contained in:
2025-06-07 17:09:03 -05:00
parent c02bd06ec0
commit d039e2cfe6
7 changed files with 183 additions and 108 deletions

View File

@@ -820,13 +820,10 @@ var prog_script = `(function ${cell.args.program.name()}_start($_, arg) { var ar
var startfn = js.eval(cell.args.program, prog_script);
$_.clock(_ => {
log.console(`actor %{cell.id} is now running its program.`)
var val = startfn($_, cell.args.arg);
if (val)
throw new Error('Program must not return anything');
})
log.console("end. set a clock.");
})()

View File

@@ -58,18 +58,21 @@ int tracy_profiling_enabled = 0;
#define ENGINE "engine.cm"
static cell_rt **ready_queue = NULL;
static SDL_Mutex *queue_mutex = NULL; // for the ready queue
SDL_Condition *queue_cond = NULL;
static SDL_Semaphore *ready_sem;
static SDL_SpinLock queue_lock = 0;
static cell_rt **main_queue = NULL;
static SDL_Semaphore *main_sem;
static SDL_SpinLock main_queue_lock = 0;
static SDL_Mutex *actors_mutex = NULL;
static struct { char *key; cell_rt *value; } *actors = NULL;
static unsigned char *zip_buffer_global = NULL;
static char *prosperon = NULL;
cell_rt *root_cell = NULL;
static SDL_AtomicInt waiting_threads;
static SDL_AtomicInt shutting_down;
static SDL_Thread **runners = NULL;
static SDL_AtomicInt runners_count;
static inline uint64_t now_ns()
{
@@ -78,15 +81,27 @@ static inline uint64_t now_ns()
static void exit_handler(void)
{
SDL_LockMutex(queue_mutex); /* 1. take the lock */
SDL_SetAtomicInt(&shutting_down, 1); /* 2. store *inside* CS */
SDL_BroadcastCondition(queue_cond); /* 3. wake the waiters */
SDL_UnlockMutex(queue_mutex); /* 4. release H-B created */
SDL_SetAtomicInt(&shutting_down, 1);
int status;
for (int i = 0; i < arrlen(runners); i++)
SDL_WaitThread(runners[i], &status);
/* Signal all waiting threads */
int count = SDL_GetAtomicInt(&runners_count);
for (int i = 0; i < count; i++)
SDL_SignalSemaphore(ready_sem);
/* Signal main thread in case it's waiting */
SDL_SignalSemaphore(main_sem);
/* Wait for all runner threads to exit */
while (SDL_GetAtomicInt(&runners_count) > 0) {
SDL_Delay(10);
}
if (ready_sem)
SDL_DestroySemaphore(ready_sem);
if (main_sem)
SDL_DestroySemaphore(main_sem);
if (actors_mutex)
SDL_DestroyMutex(actors_mutex);
SDL_Quit();
exit(0);
@@ -101,14 +116,23 @@ void actor_free(cell_rt *actor)
SDL_UnlockMutex(actors_mutex);
// If in a queue, remove it
SDL_LockMutex(queue_mutex);
SDL_LockSpinlock(&queue_lock);
for (int i = 0; i < arrlen(ready_queue); i++) {
if (ready_queue[i] == actor) {
arrdel(ready_queue, i);
break;
}
}
SDL_UnlockMutex(queue_mutex);
SDL_UnlockSpinlock(&queue_lock);
SDL_LockSpinlock(&main_queue_lock);
for (int i = 0; i < arrlen(main_queue); i++) {
if (main_queue[i] == actor) {
arrdel(main_queue, i);
break;
}
}
SDL_UnlockSpinlock(&main_queue_lock);
// Do not go forward with actor destruction until the actor is completely free
SDL_LockMutex(actor->msg_mutex);
@@ -122,7 +146,7 @@ void actor_free(cell_rt *actor)
JS_FreeValue(js, actor->unneeded);
JS_FreeAtom(js, actor->actor_sym);
remove_timer(actor->ar);
SDL_RemoveTimer(actor->ar);
for (int i = 0; i < arrlen(actor->js_swapchains); i++)
JS_FreeValue(js, actor->js_swapchains[i]);
@@ -328,7 +352,7 @@ void actor_unneeded(cell_rt *actor, JSValue fn, double seconds)
END:
if (actor->ar) {
remove_timer(actor->ar);
SDL_RemoveTimer(actor->ar);
actor->ar = 0;
}
set_actor_state(actor);
@@ -402,7 +426,7 @@ const char *send_message(const char *id, void *msg)
arrput(target->letters, l);
if (target->ar) {
remove_timer(target->ar);
SDL_RemoveTimer(target->ar);
target->ar = 0;
}
SDL_UnlockMutex(target->msg_mutex);
@@ -412,7 +436,7 @@ const char *send_message(const char *id, void *msg)
return NULL;
}
static Uint32 actor_remove_cb(Uint32 id, Uint32 interval, cell_rt *actor)
static Uint32 actor_remove_cb(cell_rt *actor, Uint32 id, Uint32 interval)
{
actor->disrupt = 1;
@@ -428,7 +452,7 @@ static Uint32 actor_remove_cb(Uint32 id, Uint32 interval, cell_rt *actor)
}
/* Timer callback adds an event to the queue under evt_mutex. */
Uint32 actor_delay_cb(SDL_TimerID id, Uint32 interval, cell_rt *actor)
Uint32 actor_delay_cb(cell_rt *actor, SDL_TimerID id, Uint32 interval)
{
SDL_LockMutex(actor->msg_mutex);
int idx = hmgeti(actor->timers, id);
@@ -436,11 +460,8 @@ Uint32 actor_delay_cb(SDL_TimerID id, Uint32 interval, cell_rt *actor)
JSValue cb = actor->timers[idx].value;
hmdel(actor->timers, id);
letter l = {0};
l.type = LETTER_CALLBACK;
l.callback = cb;
arrput(actor->letters, l);
set_actor_state(actor);
actor_clock(actor, cb);
JS_FreeValue(actor->context, cb);
END:
SDL_UnlockMutex(actor->msg_mutex);
@@ -459,7 +480,7 @@ void set_actor_state(cell_rt *actor)
case ACTOR_RUNNING:
case ACTOR_READY:
if (actor->ar) {
remove_timer(actor->ar);
SDL_RemoveTimer(actor->ar);
actor->ar = 0;
}
break;
@@ -467,13 +488,19 @@ void set_actor_state(cell_rt *actor)
case ACTOR_IDLE:
if (arrlen(actor->letters)) {
actor->state = ACTOR_READY;
SDL_LockMutex(queue_mutex);
if (actor->main_thread_only) {
SDL_LockSpinlock(&main_queue_lock);
arrput(main_queue, actor);
SDL_UnlockSpinlock(&main_queue_lock);
SDL_SignalSemaphore(main_sem);
} else {
SDL_LockSpinlock(&queue_lock);
arrput(ready_queue, actor);
SDL_BroadcastCondition(queue_cond);
SDL_UnlockMutex(queue_mutex);
SDL_UnlockSpinlock(&queue_lock);
SDL_SignalSemaphore(ready_sem);
}
} else if (!arrlen(actor->letters) && !hmlen(actor->timers)) {
actor->ar = add_timer_ns(actor->ar_secs*1e9, actor_remove_cb, actor);
SDL_BroadcastCondition(queue_cond);
actor->ar = SDL_AddTimerNS(actor->ar_secs*1e9, actor_remove_cb, actor);
}
break;
}
@@ -490,9 +517,10 @@ void actor_turn(cell_rt *actor)
TracyCFiberEnter(actor->name);
#endif
TAKETURN:
actor->state = ACTOR_RUNNING;
TAKETURN:
SDL_LockMutex(actor->msg_mutex);
JSValue result;
if (!arrlen(actor->letters)) {
@@ -515,9 +543,14 @@ void actor_turn(cell_rt *actor)
JS_FreeValue(actor->context, l.callback);
}
if (actor->disrupt) goto ENDTURN;
// If there are no waiting threads, bail. otherwise, try for another turn
if (SDL_GetAtomicInt(&waiting_threads) > 0)
goto TAKETURN;
SDL_LockSpinlock(&queue_lock);
int someone_else_waiting = (arrlen(ready_queue) > 0);
SDL_UnlockSpinlock(&queue_lock);
if (!someone_else_waiting) goto TAKETURN;
ENDTURN:
actor->state = ACTOR_IDLE;
@@ -532,6 +565,17 @@ void actor_turn(cell_rt *actor)
SDL_UnlockMutex(actor->mutex);
}
void actor_clock(cell_rt *actor, JSValue fn)
{
SDL_LockMutex(actor->msg_mutex);
letter l;
l.type = LETTER_CALLBACK;
l.callback = JS_DupValue(actor->context, fn);
arrput(actor->letters, l);
SDL_UnlockMutex(actor->msg_mutex);
set_actor_state(actor);
}
/* JS function that schedules a timer. */
JSValue js_actor_delay(JSContext *js, JSValue self, int argc, JSValue *argv)
{
@@ -542,17 +586,12 @@ JSValue js_actor_delay(JSContext *js, JSValue self, int argc, JSValue *argv)
double seconds;
JS_ToFloat64(js, &seconds, argv[1]);
if (seconds <= 0) {
SDL_LockMutex(actor->msg_mutex);
letter l;
l.type = LETTER_CALLBACK;
l.callback = JS_DupValue(js, argv[0]);
arrput(actor->letters, l);
SDL_UnlockMutex(actor->msg_mutex);
return JS_NewInt32(js, -1);
actor_clock(actor, argv[0]);
return JS_UNDEFINED;
}
SDL_LockMutex(actor->msg_mutex);
uint32_t id = add_timer_ns(seconds*1e9, actor_delay_cb, actor);
uint32_t id = SDL_AddTimerNS(seconds*1e9, actor_delay_cb, actor);
JSValue cb = JS_DupValue(js, argv[0]);
hmput(actor->timers, id, cb);
SDL_UnlockMutex(actor->msg_mutex);
@@ -566,7 +605,7 @@ JSValue js_actor_removetimer(JSContext *js, JSValue self, int argc, JSValue *arg
JS_ToUint32(js, &timer_id, argv[0]);
if (timer_id == -1) return JS_UNDEFINED;
remove_timer(timer_id);
SDL_RemoveTimer(timer_id);
JSValue cb = JS_UNDEFINED;
@@ -648,7 +687,7 @@ void actor_disrupt(cell_rt *crt)
static int actor_interrupt_cb(JSRuntime *rt, cell_rt *crt)
{
return crt->disrupt;
return SDL_GetAtomicInt(&shutting_down) || crt->disrupt;
}
void script_startup(cell_rt *prt)
@@ -726,28 +765,25 @@ int uncaught_exception(JSContext *js, JSValue v)
static int actor_runner(void *data)
{
SDL_AddAtomicInt(&runners_count, 1);
while (!SDL_GetAtomicInt(&shutting_down)) {
SDL_LockMutex(queue_mutex);
SDL_LockSpinlock(&queue_lock);
cell_rt *actor = NULL;
for (int i = 0; i < arrlen(ready_queue); i++) {
if (!ready_queue[i]->main_thread_only) {
actor = ready_queue[i];
arrdel(ready_queue,i);
break;
if (arrlen(ready_queue) > 0) {
actor = ready_queue[0];
arrdel(ready_queue, 0);
}
SDL_UnlockSpinlock(&queue_lock);
if (actor)
actor_turn(actor);
SDL_WaitSemaphore(ready_sem);
if (SDL_GetAtomicInt(&shutting_down)) break;
}
if (actor) {
SDL_UnlockMutex(queue_mutex);
actor_turn(actor);
} else {
SDL_AddAtomicInt(&waiting_threads, 1);
SDL_WaitCondition(queue_cond, queue_mutex);
SDL_AddAtomicInt(&waiting_threads, -1);
SDL_UnlockMutex(queue_mutex);
if (SDL_GetAtomicInt(&shutting_down)) return 0;
}
}
SDL_AddAtomicInt(&runners_count, -1);
return 0;
}
@@ -770,41 +806,35 @@ static void signal_handler(int sig)
static void add_runners(int n)
{
/* Launch runner threads */
for (int i = 0; i < n; i++) { // -1 to keep the main thread free
for (int i = 0; i < n; i++) {
char threadname[128];
snprintf(threadname, sizeof(threadname), "actor runner %d", i);
SDL_Thread *thread = SDL_CreateThread(actor_runner, threadname, NULL);
arrput(runners, thread);
SDL_DetachThread(thread);
/* Thread is detached, no need to track */
}
}
static void loop()
{
/* Initialize synchronization primitives */
queue_mutex = SDL_CreateMutex();
queue_cond = SDL_CreateCondition();
ready_sem = SDL_CreateSemaphore(0);
main_sem = SDL_CreateSemaphore(0);
actors_mutex = SDL_CreateMutex();
SDL_SetAtomicInt(&waiting_threads, 0);
SDL_SetAtomicInt(&shutting_down, 0);
SDL_SetAtomicInt(&runners_count, 0);
timer_init();
add_runners(SDL_GetNumLogicalCPUCores()-1);
add_runners(SDL_GetNumLogicalCPUCores());
while (!SDL_GetAtomicInt(&shutting_down)) {
process_due_timers();
SDL_LockMutex(queue_mutex);
SDL_WaitSemaphore(main_sem);
SDL_LockSpinlock(&main_queue_lock);
cell_rt *actor = NULL;
for (int i = 0; i < arrlen(ready_queue); i++) {
if (ready_queue[i]->main_thread_only) {
actor = ready_queue[i];
printf("picking up actor %s\n", actor->id);
arrdel(ready_queue,i);
break;
if (arrlen(main_queue) > 0) {
actor = main_queue[0];
arrdel(main_queue, 0);
}
}
SDL_UnlockMutex(queue_mutex);
SDL_UnlockSpinlock(&main_queue_lock);
if (actor) {
printf("running %s\n", actor->id);
@@ -812,16 +842,6 @@ static void loop()
actor_turn(actor);
continue;
}
uint64_t to_ns = next_timeout_ns();
if (to_ns == UINT64_MAX) {
// No more timers - hence, no more actors ... exit if single threaded.
}
SDL_LockMutex(queue_mutex);
SDL_WaitConditionTimeout(queue_cond, queue_mutex, to_ns);
SDL_UnlockMutex(queue_mutex);
}
}
@@ -840,8 +860,6 @@ int main(int argc, char **argv)
exit(1);
}
printf("main thread %d\n", SDL_GetThreadID(NULL));
#ifdef TRACY_ENABLE
tracy_profiling_enabled = profile_enabled;
#endif

View File

@@ -103,6 +103,7 @@ int uncaught_exception(JSContext *js, JSValue v);
int actor_exists(const char *id);
cell_rt *get_actor(char *id);
void set_actor_state(cell_rt *actor);
void actor_clock(cell_rt *actor, JSValue fn);
int JS_ArrayLength(JSContext *js, JSValue a);

View File

@@ -168,14 +168,7 @@ JSC_CCALL(actor_clock,
return JS_ThrowReferenceError(js, "Argument must be a function.");
cell_rt *actor = JS_GetContextOpaque(js);
SDL_LockMutex(actor->msg_mutex);
letter l;
l.type = LETTER_CALLBACK;
l.callback = JS_DupValue(js, argv[0]);
arrput(actor->letters, l);
SDL_UnlockMutex(actor->msg_mutex);
printf("actor %s clocked\n", actor->id);
set_actor_state(actor);
actor_clock(actor, argv[0]);
)
static const JSCFunctionListEntry js_actor_funcs[] = {

View File

@@ -3,17 +3,50 @@
#include <limits.h>
#include "stb_ds.h"
extern SDL_Condition *queue_cond;
/* Global timer state */
static timer_t *timers = NULL;
static Uint32 next_timer_id = 1;
static SDL_Mutex *timer_mutex = NULL;
static SDL_Condition *timer_cond = NULL;
static SDL_Thread *timer_thread = NULL;
static SDL_AtomicInt timer_running;
static int timer_loop(void *data)
{
while (SDL_GetAtomicInt(&timer_running)) {
SDL_LockMutex(timer_mutex);
uint64_t to_ns = next_timeout_ns();
if (to_ns == UINT64_MAX) {
/* No timers, wait indefinitely */
SDL_WaitCondition(timer_cond, timer_mutex);
} else if (to_ns == 0) {
/* Timer(s) already due, process immediately */
SDL_UnlockMutex(timer_mutex);
process_due_timers();
continue;
} else {
/* Wait until next timer is due or a new timer is added */
Uint32 wait_ms = (Uint32)(to_ns / 1000000);
if (wait_ms == 0) wait_ms = 1; /* Minimum 1ms wait */
SDL_WaitConditionTimeout(timer_cond, timer_mutex, wait_ms);
}
SDL_UnlockMutex(timer_mutex);
/* Process any due timers */
process_due_timers();
}
return 0;
}
void timer_init(void)
{
if (!timer_mutex) {
timer_mutex = SDL_CreateMutex();
timer_cond = SDL_CreateCondition();
SDL_SetAtomicInt(&timer_running, 1);
timer_thread = SDL_CreateThread(timer_loop, "timer thread", NULL);
}
}
@@ -35,7 +68,7 @@ Uint32 add_timer_ns(uint64_t delay_ns, TimerCallback callback, void *param)
t.param = param;
arrput(timers, t);
SDL_UnlockMutex(timer_mutex);
SDL_BroadcastCondition(queue_cond);
SDL_SignalCondition(timer_cond);
return t.id;
}
@@ -98,3 +131,26 @@ uint64_t next_timeout_ns(void)
return 0;
return min_due - now;
}
void timer_quit(void)
{
if (timer_thread) {
SDL_SetAtomicInt(&timer_running, 0);
SDL_SignalCondition(timer_cond);
SDL_WaitThread(timer_thread, NULL);
timer_thread = NULL;
}
if (timer_cond) {
SDL_DestroyCondition(timer_cond);
timer_cond = NULL;
}
if (timer_mutex) {
SDL_DestroyMutex(timer_mutex);
timer_mutex = NULL;
}
arrfree(timers);
timers = NULL;
}

View File

@@ -17,6 +17,9 @@ typedef struct {
/* Initialize timer system - must be called once */
void timer_init(void);
/* Shutdown timer system - must be called before exit */
void timer_quit(void);
/* Schedule a new timer to fire after delay_ns nanoseconds */
Uint32 add_timer_ns(uint64_t delay_ns, TimerCallback callback, void *param);

View File

@@ -8,9 +8,15 @@ var start_time = time.number()
var host = arg[0]
var path = arg[1] || '/'
$_.start(e => {
send(e.actor, { op: 'get', domain: host, port: 80}, addrs => {
log.console(json.encode(addrs[0]))
})
}, 'dig')
/*
var addrs = socket.getaddrinfo(host, '80')
var addr = addrs[0]
log.console(json.encode(addrs))
var sock = socket.socket()
socket.connect(sock, addr)
@@ -38,3 +44,4 @@ function get_chunk()
}
get_chunk()
*/