actors keep running if they have messages, until no threads are available

This commit is contained in:
2025-06-06 18:22:12 -05:00
parent 83c816fd0e
commit 1a61ae6f77
4 changed files with 45 additions and 8 deletions

View File

@@ -6,7 +6,7 @@ ar_timer = 60
actor_memory = 0
net_service = 0.1
reply_timeout = 60
actor_max = "10_000"
actor_max = 10_000
stack_max = 0
[actors]
[actors.prosperon/sdl_video]

View File

@@ -73,6 +73,8 @@ static unsigned char *zip_buffer_global = NULL;
static char *prosperon = NULL;
cell_rt *root_cell = NULL;
SDL_AtomicInt waiting_threads;
static SDL_Thread **runners = NULL;
static inline uint64_t now_ns()
@@ -489,11 +491,15 @@ void actor_turn(cell_rt *actor)
TracyCFiberEnter(actor->name);
#endif
SDL_LockMutex(actor->msg_mutex);
TAKETURN:
actor->state = ACTOR_RUNNING;
SDL_LockMutex(actor->msg_mutex);
JSValue result;
if (!arrlen(actor->letters)) {
SDL_UnlockMutex(actor->msg_mutex);
goto ENDTURN;
}
letter l = actor->letters[0];
arrdel(actor->letters, 0);
SDL_UnlockMutex(actor->msg_mutex);
@@ -505,13 +511,18 @@ void actor_turn(cell_rt *actor)
uncaught_exception(actor->context, result);
JS_FreeValue(actor->context, arg);
} else if (l.type == LETTER_CALLBACK) {
printf("running a callback\n");
result = JS_Call(actor->context, l.callback, JS_UNDEFINED, 0, NULL);
printf("turn is over for %s\n", actor->id);
uncaught_exception(actor->context, result);
JS_FreeValue(actor->context, l.callback);
}
// If there are no waiting threads, bail. otherwise, try for another turn
if (SDL_GetAtomicInt(&waiting_threads) == 0)
goto ENDTURN;
else
goto TAKETURN;
ENDTURN:
actor->state = ACTOR_IDLE;
#ifdef TRACY_ENABLE
@@ -724,7 +735,9 @@ static int actor_runner(void *data)
actor = ready_queue[0];
arrdel(ready_queue, 0);
} else {
SDL_AddAtomicInt(&waiting_threads, 1);
SDL_WaitCondition(queue_cond, queue_mutex);
SDL_AddAtomicInt(&waiting_threads, -1);
SDL_UnlockMutex(queue_mutex);
continue;
}
@@ -769,11 +782,12 @@ static void loop()
queue_mutex = SDL_CreateMutex();
queue_cond = SDL_CreateCondition();
actors_mutex = SDL_CreateMutex();
SDL_SetAtomicInt(&waiting_threads, 0);
timer_init();
add_runners(SDL_GetNumLogicalCPUCores()-1);
while (1) {
process_due_timers();

View File

@@ -6,6 +6,6 @@ var f = fd.open(arg[0], 'r')
var stat = fd.fstat(f)
var data = fd.read(f,stat.size);
fd.close(f)
log.console(text(data))
log.console(`cat took ${time.number()-st}`)
$_.stop()

23
tests/chunkread.ce Normal file
View File

@@ -0,0 +1,23 @@
var fd = use('fd')
var time = use('time')
var blob = use('blob')
var data = new blob
var st = time.number()
var f = fd.open(arg[0], 'r')
var chunksize = 4096
function getchunk()
{
var chunk = fd.read(f,chunksize);
data.write_blob(chunk);
if (chunk.length < chunksize*8) {
// we're done
fd.close(f)
log.console(`read took ${time.number()-st}`)
$_.stop()
} else
$_.clock(getchunk)
}
getchunk()