add http.cm and probe

This commit is contained in:
2026-02-24 21:04:03 -06:00
parent 3d4c0ec3d3
commit 2b877e6b0c
11 changed files with 846 additions and 7 deletions

233
docs/library/probe.md Normal file
View File

@@ -0,0 +1,233 @@
---
title: "probe"
description: "Runtime observability for actors"
weight: 90
type: "docs"
---
Runtime observability for actors. Register named probe functions on any actor and query them over HTTP while the program runs.
```javascript
var probe = use('probe')
```
The probe server starts automatically on the first `register()` call, listening on `127.0.0.1:9000`.
## Registering Probes
### probe.register(target, probes)
Register a group of probe functions under a target name. Each probe is a function that receives an `args` record and returns a value.
```javascript
var probe = use('probe')
var world = {
entities: [
{id: 1, name: "player", x: 10, y: 20, hp: 100},
{id: 2, name: "goblin", x: 55, y: 30, hp: 40}
],
tick: 0
}
probe.register("game", {
state: function(args) {
return world
},
entities: function(args) {
return world.entities
},
entity: function(args) {
return find(world.entities, function(e) {
return e.id == args.id
})
}
})
probe.register("render", {
info: function(args) {
return {fps: 60, draw_calls: 128, batches: 12}
}
})
```
A target is just a namespace — group related probes under the same target name. Register as many targets as you like; the server starts once and serves them all.
## HTTP Endpoints
All responses are JSON with an `ok` field.
### GET /discover
Lists all registered targets and their probe names. Designed for tooling — an LLM or dashboard can call this first to learn what's available, then query specific probes.
```
$ curl http://127.0.0.1:9000/discover
```
```json
{
"ok": true,
"targets": {
"game": ["state", "entities", "entity"],
"render": ["info"]
}
}
```
### POST /probe
Call a single probe function by target and name. Optionally pass arguments.
```
$ curl -X POST -H "Content-Type: application/json" \
-d '{"target":"game","name":"state"}' \
http://127.0.0.1:9000/probe
```
```json
{
"ok": true,
"result": {
"entities": [
{"id": 1, "name": "player", "x": 10, "y": 20, "hp": 100},
{"id": 2, "name": "goblin", "x": 55, "y": 30, "hp": 40}
],
"tick": 4821
}
}
```
With arguments:
```
$ curl -X POST -H "Content-Type: application/json" \
-d '{"target":"game","name":"entity","args":{"id":1}}' \
http://127.0.0.1:9000/probe
```
```json
{
"ok": true,
"result": {"id": 1, "name": "player", "x": 10, "y": 20, "hp": 100}
}
```
### POST /snapshot
Call multiple probes in one request. Returns all results keyed by `target/name`.
```
$ curl -X POST -H "Content-Type: application/json" \
-d '{"probes":[{"target":"game","name":"state"},{"target":"render","name":"info"}]}' \
http://127.0.0.1:9000/snapshot
```
```json
{
"ok": true,
"results": {
"game/state": {
"entities": [
{"id": 1, "name": "player", "x": 10, "y": 20, "hp": 100},
{"id": 2, "name": "goblin", "x": 55, "y": 30, "hp": 40}
],
"tick": 4821
},
"render/info": {
"fps": 60,
"draw_calls": 128,
"batches": 12
}
}
}
```
### Errors
Unknown paths return 404:
```json
{"ok": false, "error": "not found"}
```
Unknown targets or probe names:
```json
{"ok": false, "error": "unknown probe: game/nonexistent"}
```
If a probe function disrupts:
```json
{"ok": false, "error": "probe failed"}
```
## Example
A game actor with a simulation loop and probe observability:
```javascript
// game.ce
var probe = use('probe')
var state = {
entities: [
{id: 1, name: "player", x: 0, y: 0, hp: 100},
{id: 2, name: "enemy", x: 50, y: 50, hp: 60}
],
frame: 0,
paused: false
}
probe.register("game", {
state: function(args) {
return state
},
entities: function(args) {
return state.entities
},
entity: function(args) {
return find(state.entities, function(e) {
return e.id == args.id
})
}
})
// game loop
def tick = function(_) {
if (!state.paused) {
state.frame = state.frame + 1
// ... update entities, physics, AI ...
}
$delay(tick, 0.016)
}
$delay(tick, 0.016)
```
While the game runs, query it from a terminal:
```
$ curl -s http://127.0.0.1:9000/discover | jq .targets
{
"game": ["state", "entities", "entity"]
}
$ curl -s -X POST -d '{"target":"game","name":"state"}' \
-H "Content-Type: application/json" \
http://127.0.0.1:9000/probe | jq .result.frame
7834
$ curl -s -X POST -d '{"target":"game","name":"entity","args":{"id":1}}' \
-H "Content-Type: application/json" \
http://127.0.0.1:9000/probe | jq .result
{
"id": 1,
"name": "player",
"x": 142,
"y": 87,
"hp": 100
}
```
Probes run inside the actor's normal turn, so the values are always consistent — never a half-updated frame.

164
http.cm Normal file
View File

@@ -0,0 +1,164 @@
var socket = use('socket')
var c_http = use('net/http')
def CRLF = "\r\n"
def status_texts = {
"200": "OK", "201": "Created", "204": "No Content",
"400": "Bad Request", "401": "Unauthorized", "403": "Forbidden",
"404": "Not Found", "405": "Method Not Allowed", "500": "Internal Server Error"
}
function serve(port) {
var fd = socket.socket("AF_INET", "SOCK_STREAM")
socket.setsockopt(fd, "SOL_SOCKET", "SO_REUSEADDR", true)
socket.bind(fd, {address: "127.0.0.1", port: port})
socket.listen(fd, 16)
return fd
}
function parse_request(conn_fd) {
var data = socket.recv(conn_fd, 65536)
var raw = text(data)
var hdr_end = search(raw, CRLF + CRLF)
if (hdr_end == null) disrupt
var header_text = text(raw, 0, hdr_end)
var body_text = text(raw, hdr_end + 4)
var lines = array(header_text, CRLF)
var parts = array(lines[0], " ")
var method = parts[0]
var url = parts[1]
var qpos = search(url, "?")
var path = qpos != null ? text(url, 0, qpos) : url
var headers = {}
var i = 1
var colon = null
var key = null
var val = null
while (i < length(lines)) {
colon = search(lines[i], ": ")
if (colon != null) {
key = lower(text(lines[i], 0, colon))
val = text(lines[i], colon + 2)
headers[key] = val
}
i = i + 1
}
var cl = headers["content-length"]
var content_length = null
var remaining = null
var more = null
if (cl != null) content_length = number(cl)
if (content_length != null && length(body_text) < content_length) {
remaining = content_length - length(body_text)
more = socket.recv(conn_fd, remaining)
body_text = body_text + text(more)
}
if (content_length == null || content_length == 0) body_text = null
return {
method: method, path: path, url: url,
headers: headers, body: body_text, _conn: conn_fd
}
}
function accept(server_fd) {
var conn = socket.accept(server_fd)
return parse_request(conn.socket)
}
function on_request(server_fd, handler) {
var _accept = function() {
var conn = socket.accept(server_fd)
var req = null
var _parse = function() {
req = parse_request(conn.socket)
} disruption {
req = null
}
_parse()
if (req != null) handler(req)
socket.on_readable(server_fd, _accept)
}
socket.on_readable(server_fd, _accept)
}
function respond(conn, status, headers, body) {
var st = status_texts[text(status)]
if (st == null) st = "Unknown"
var out = "HTTP/1.1 " + text(status) + " " + st + CRLF
out = out + "Connection: close" + CRLF
var body_str = ""
var keys = null
var i = 0
if (body != null) {
if (is_text(body)) body_str = body
else body_str = text(body)
}
if (headers != null) {
keys = array(headers)
i = 0
while (i < length(keys)) {
out = out + keys[i] + ": " + headers[keys[i]] + CRLF
i = i + 1
}
}
out = out + "Content-Length: " + text(length(body_str)) + CRLF
out = out + CRLF + body_str
socket.send(conn, out)
socket.close(conn)
}
function sse_open(conn, headers) {
var out = "HTTP/1.1 200 OK" + CRLF
out = out + "Content-Type: text/event-stream" + CRLF
out = out + "Cache-Control: no-cache" + CRLF
out = out + "Connection: keep-alive" + CRLF
var keys = null
var i = 0
if (headers != null) {
keys = array(headers)
i = 0
while (i < length(keys)) {
out = out + keys[i] + ": " + headers[keys[i]] + CRLF
i = i + 1
}
}
out = out + CRLF
socket.send(conn, out)
}
function sse_event(conn, event, data) {
var frame = "event: " + event + "\ndata: " + data + "\n\n"
var ok = true
var _send = function() {
socket.send(conn, frame)
} disruption {
ok = false
}
_send()
return ok
}
function sse_close(conn) {
socket.close(conn)
}
function close(fd) {
socket.close(fd)
}
return {
serve: serve, accept: accept, on_request: on_request,
respond: respond,
sse_open: sse_open, sse_event: sse_event, sse_close: sse_close,
close: close, fetch: c_http.fetch
}

View File

@@ -145,7 +145,7 @@ function load_pipeline_module(name, env) {
hash = content_hash(source_blob)
cached = pipeline_cache_path(hash)
if (cached && fd.is_file(cached)) {
log.system('engine: pipeline ' + name + ' (cached)')
// log.system('engine: pipeline ' + name + ' (cached)')
return mach_load(fd.slurp(cached), env)
}
@@ -169,7 +169,7 @@ function load_pipeline_module(name, env) {
compiled = boot_sl(compiled)
mcode_json = json.encode(compiled)
mach_blob = mach_compile_mcode_bin(name, mcode_json)
log.system('engine: pipeline ' + name + ' (compiled)')
// log.system('engine: pipeline ' + name + ' (compiled)')
if (!native_mode && cached) {
ensure_build_dir()
fd.slurpwrite(cached, mach_blob)
@@ -676,13 +676,13 @@ function use_core(path) {
if (!source_blob) source_blob = fd.slurp(file_path)
cached_path = module_cache_path(source_blob, 'mach')
if (cached_path && fd.is_file(cached_path)) {
log.system('engine: cache hit for core/' + path)
// log.system('engine: cache hit for core/' + path)
result = mach_load(fd.slurp(cached_path), env)
} else {
script = text(source_blob)
ast = analyze(script, file_path)
mach_blob = compile_to_blob('core:' + path, ast)
log.system('engine: compiled core/' + path)
// log.system('engine: compiled core/' + path)
if (!native_mode && cached_path) {
ensure_build_dir()
fd.slurpwrite(cached_path, mach_blob)

View File

@@ -82,6 +82,7 @@ scripts = [
'internal/os.c',
'internal/fd.c',
'net/http.c',
'net/socket.c',
'internal/enet.c',
'archive/miniz.c',
'source/cJSON.c'

View File

@@ -321,7 +321,7 @@ static const JSCFunctionListEntry js_http_funcs[] = {
JS_CFUNC_DEF("fetch", 2, js_fetch_picoparser),
};
JSValue js_core_http_use(JSContext *js) {
JSValue js_core_net_http_use(JSContext *js) {
JS_FRAME(js);
par_easycurl_init(0); // Initialize platform HTTP backend
JS_ROOT(mod, JS_NewObject(js));

View File

@@ -563,10 +563,26 @@ JSC_CCALL(socket_setsockopt,
JSC_CCALL(socket_close,
int sockfd = js2fd(js, argv[0]);
if (sockfd < 0) return JS_EXCEPTION;
if (close(sockfd) != 0)
return JS_RaiseDisrupt(js, "close failed: %s", strerror(errno));
return JS_NULL;
)
JSC_CCALL(socket_on_readable,
int sockfd = js2fd(js, argv[0]);
if (sockfd < 0) return JS_EXCEPTION;
if (!JS_IsFunction(argv[1]))
return JS_RaiseDisrupt(js, "on_readable: callback must be a function");
actor_watch_readable(js, sockfd, argv[1]);
return JS_NULL;
)
JSC_CCALL(socket_unwatch,
int sockfd = js2fd(js, argv[0]);
if (sockfd < 0) return JS_EXCEPTION;
actor_unwatch(js, sockfd);
return JS_NULL;
)
@@ -587,6 +603,8 @@ static const JSCFunctionListEntry js_socket_funcs[] = {
MIST_FUNC_DEF(socket, gai_strerror, 1),
MIST_FUNC_DEF(socket, setsockopt, 4),
MIST_FUNC_DEF(socket, close, 1),
MIST_FUNC_DEF(socket, on_readable, 2),
MIST_FUNC_DEF(socket, unwatch, 1),
};
JSValue js_core_socket_use(JSContext *js) {

151
probe.ce Normal file
View File

@@ -0,0 +1,151 @@
// cell probe - Query a running probe server
//
// Usage:
// cell probe List all targets and probes
// cell probe <target> <name> Query a probe
// cell probe <target> <name> k=v ... Query with arguments
// cell probe --port=8080 game state Use a different port
var socket = use('socket')
var json = use('json')
var host = "127.0.0.1"
var port = 9000
def CRLF = "\r\n"
function request(method, path, body) {
var fd = socket.socket("AF_INET", "SOCK_STREAM")
var raw = null
var hdr_end = null
var _do = function() {
socket.connect(fd, {address: host, port: port})
var req = method + " " + path + " HTTP/1.1" + CRLF
req = req + "Host: " + host + CRLF
req = req + "Connection: close" + CRLF
if (body != null) {
req = req + "Content-Type: application/json" + CRLF
req = req + "Content-Length: " + text(length(body)) + CRLF
}
req = req + CRLF
if (body != null) req = req + body
socket.send(fd, req)
raw = text(socket.recv(fd, 65536))
} disruption {
raw = null
}
_do()
socket.close(fd)
if (raw == null) return null
hdr_end = search(raw, CRLF + CRLF)
if (hdr_end == null) return null
return text(raw, hdr_end + 4)
}
function print_targets(targets) {
var keys = array(targets)
var j = 0
var p = 0
var probes = null
while (j < length(keys)) {
probes = targets[keys[j]]
log.console(keys[j])
p = 0
while (p < length(probes)) {
log.console(" " + probes[p])
p = p + 1
}
j = j + 1
}
}
function run() {
var target = null
var name = null
var probe_args = {}
var i = 0
var eq = null
var k = null
var v = null
var n = null
for (i = 0; i < length(args); i++) {
if (args[i] == '--help' || args[i] == '-h') {
log.console("Usage: cell probe [target] [name] [key=value ...]")
log.console("")
log.console(" cell probe List all targets and probes")
log.console(" cell probe game state Query game/state")
log.console(" cell probe game entity id=1 Query with arguments")
log.console("")
log.console("Options:")
log.console(" --port=N Connect to port N (default 9000)")
return
} else if (starts_with(args[i], '--port=')) {
port = number(text(args[i], 7))
} else if (target == null) {
target = args[i]
} else if (name == null) {
name = args[i]
} else {
eq = search(args[i], "=")
if (eq != null) {
k = text(args[i], 0, eq)
v = text(args[i], eq + 1)
n = number(v)
if (n != null) {
v = n
} else if (v == "true") {
v = true
} else if (v == "false") {
v = false
} else if (v == "null") {
v = null
}
probe_args[k] = v
}
}
}
var resp = null
var body = null
var data = null
if (target == null) {
resp = request("GET", "/discover", null)
} else {
body = {target: target, name: name}
if (length(array(probe_args)) > 0) body.args = probe_args
resp = request("POST", "/probe", json.encode(body, false))
}
if (resp == null) {
log.error("could not connect to probe server on port " + text(port))
return
}
var _parse = function() {
data = json.decode(resp)
} disruption {
data = null
}
_parse()
if (data == null) {
log.error("invalid response from server")
return
}
if (!data.ok) {
log.error(data.error)
return
}
if (target == null) {
print_targets(data.targets)
} else {
log.console(json.encode(data.result, 2))
}
}
run()
$stop()

124
probe.cm Normal file
View File

@@ -0,0 +1,124 @@
var http = use('http')
var json = use('json')
var registry = {}
var server_fd = null
var port = 9000
function handle_request(req) {
var result = null
var _try = null
if (req.method == "GET" && req.path == "/discover") {
result = discover()
http.respond(req._conn, 200, {"content-type": "application/json"},
json.encode(result))
return
}
if (req.method == "POST" && req.path == "/probe") {
_try = function() {
result = handle_probe(req)
} disruption {
result = {ok: false, error: "probe failed"}
}
_try()
http.respond(req._conn, 200, {"content-type": "application/json"},
json.encode(result))
return
}
if (req.method == "POST" && req.path == "/snapshot") {
_try = function() {
result = handle_snapshot(req)
} disruption {
result = {ok: false, error: "snapshot failed"}
}
_try()
http.respond(req._conn, 200, {"content-type": "application/json"},
json.encode(result))
return
}
http.respond(req._conn, 404, {"content-type": "application/json"},
json.encode({ok: false, error: "not found"}))
}
function discover() {
var targets = {}
var target_keys = array(registry)
var i = 0
while (i < length(target_keys)) {
targets[target_keys[i]] = array(registry[target_keys[i]])
i = i + 1
}
return {ok: true, targets: targets}
}
function handle_probe(req) {
var body = json.decode(req.body)
var target = body.target
var name = body.name
var args = body.args
if (target == null || name == null) {
return {ok: false, error: "missing target or name"}
}
var target_probes = registry[target]
if (target_probes == null) {
return {ok: false, error: "unknown target: " + target}
}
var probe_fn = target_probes[name]
if (probe_fn == null) {
return {ok: false, error: "unknown probe: " + target + "/" + name}
}
if (args == null) args = {}
var result = probe_fn(args)
return {ok: true, result: result}
}
function handle_snapshot(req) {
var body = json.decode(req.body)
var probes = body.probes
if (probes == null) {
return {ok: false, error: "missing probes array"}
}
var results = {}
var i = 0
var p = null
var target_probes = null
var probe_fn = null
var key = null
while (i < length(probes)) {
p = probes[i]
key = p.target + "/" + p.name
target_probes = registry[p.target]
if (target_probes != null) {
probe_fn = target_probes[p.name]
if (probe_fn != null) {
results[key] = probe_fn(p.args != null ? p.args : {})
}
}
i = i + 1
}
return {ok: true, results: results}
}
function start_server() {
server_fd = http.serve(port)
http.on_request(server_fd, handle_request)
}
function register(target, probes) {
registry[target] = probes
if (server_fd == null) start_server()
}
return {
register: register,
port: port
}

View File

@@ -912,6 +912,14 @@ int js_is_blob(JSContext *js, JSValue v);
#include "blob.h"
/* ============================================================
Actor I/O Watch — event-driven fd monitoring
============================================================ */
void actor_watch_readable(JSContext *actor, int fd, JSValue fn);
void actor_watch_writable(JSContext *actor, int fd, JSValue fn);
void actor_unwatch(JSContext *actor, int fd);
/* ============================================================
Convenience Functions
============================================================ */

View File

@@ -859,6 +859,14 @@ typedef struct letter {
};
} letter;
/* I/O watch entry — one per watched file descriptor */
typedef struct {
int fd;
short events; /* POLLIN, POLLOUT */
JSContext *actor;
JSValue callback;
} io_watch;
/* Actor state machine constants */
#define ACTOR_IDLE 0
#define ACTOR_READY 1
@@ -1049,6 +1057,10 @@ void enqueue_actor_priority(JSContext *actor);
void actor_clock(JSContext *actor, JSValue fn);
uint32_t actor_delay(JSContext *actor, JSValue fn, double seconds);
JSValue actor_remove_timer(JSContext *actor, uint32_t timer_id);
void actor_watch(JSContext *actor, int fd, short events, JSValue fn);
void actor_watch_readable(JSContext *actor, int fd, JSValue fn);
void actor_watch_writable(JSContext *actor, int fd, JSValue fn);
void actor_unwatch(JSContext *actor, int fd);
void exit_handler(void);
void actor_loop(void);
void actor_initialize(void);

View File

@@ -6,6 +6,8 @@
#include <stdio.h>
#include <unistd.h>
#include <stdatomic.h>
#include <poll.h>
#include <fcntl.h>
#include "stb_ds.h"
#include "cell.h"
@@ -13,6 +15,7 @@
#ifdef _WIN32
#include <windows.h>
#include <winsock2.h>
#endif
typedef struct actor_node {
@@ -93,6 +96,95 @@ static int has_any_work(actor_node *heads[]) {
static pthread_mutex_t *actors_mutex;
static struct { char *key; JSContext *value; } *actors = NULL;
/* ============================================================
I/O Watch Thread — poll()-based fd monitoring
============================================================ */
static io_watch *g_io_watches = NULL; /* stb_ds dynamic array */
static pthread_mutex_t io_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t io_thread;
static int io_pipe[2] = {-1, -1}; /* self-pipe to wake poll() */
static void io_wake(void) {
char c = 1;
(void)write(io_pipe[1], &c, 1);
}
static void *io_thread_func(void *arg) {
(void)arg;
while (1) {
pthread_mutex_lock(&io_mutex);
if (engine.shutting_down) {
pthread_mutex_unlock(&io_mutex);
return NULL;
}
int n = arrlen(g_io_watches);
/* +1 for the wakeup pipe */
struct pollfd *fds = malloc(sizeof(struct pollfd) * (n + 1));
fds[0].fd = io_pipe[0];
fds[0].events = POLLIN;
for (int i = 0; i < n; i++) {
fds[i + 1].fd = g_io_watches[i].fd;
fds[i + 1].events = g_io_watches[i].events;
}
pthread_mutex_unlock(&io_mutex);
int ready = poll(fds, n + 1, 500); /* 500ms timeout for shutdown check */
if (ready <= 0) {
free(fds);
continue;
}
/* Drain wakeup pipe */
if (fds[0].revents & POLLIN) {
char buf[64];
(void)read(io_pipe[0], buf, sizeof(buf));
}
/* Fire callbacks for ready fds */
pthread_mutex_lock(&io_mutex);
for (int i = n - 1; i >= 0; i--) {
if (i >= arrlen(g_io_watches)) continue;
if (fds[i + 1].revents & g_io_watches[i].events) {
io_watch w = g_io_watches[i];
arrdel(g_io_watches, i); /* one-shot: remove before firing */
pthread_mutex_unlock(&io_mutex);
actor_clock(w.actor, w.callback);
pthread_mutex_lock(&io_mutex);
}
}
pthread_mutex_unlock(&io_mutex);
free(fds);
}
return NULL;
}
void actor_watch(JSContext *actor, int fd, short events, JSValue fn) {
io_watch w = { .fd = fd, .events = events, .actor = actor, .callback = fn };
pthread_mutex_lock(&io_mutex);
arrput(g_io_watches, w);
pthread_mutex_unlock(&io_mutex);
io_wake();
}
void actor_watch_readable(JSContext *actor, int fd, JSValue fn) {
actor_watch(actor, fd, POLLIN, fn);
}
void actor_watch_writable(JSContext *actor, int fd, JSValue fn) {
actor_watch(actor, fd, POLLOUT, fn);
}
void actor_unwatch(JSContext *actor, int fd) {
pthread_mutex_lock(&io_mutex);
for (int i = arrlen(g_io_watches) - 1; i >= 0; i--) {
if (g_io_watches[i].actor == actor && g_io_watches[i].fd == fd) {
arrdel(g_io_watches, i);
}
}
pthread_mutex_unlock(&io_mutex);
io_wake();
}
#define lockless_shdel(NAME, KEY) pthread_mutex_lock(NAME##_mutex); shdel(NAME, KEY); pthread_mutex_unlock(NAME##_mutex);
#define lockless_shlen(NAME) ({ \
pthread_mutex_lock(NAME##_mutex); \
@@ -307,6 +399,13 @@ void actor_initialize(void) {
// Start Timer Thread
pthread_create(&engine.timer_thread, NULL, timer_thread_func, NULL);
// Start I/O Watch Thread
if (pipe(io_pipe) == 0) {
fcntl(io_pipe[0], F_SETFL, O_NONBLOCK);
fcntl(io_pipe[1], F_SETFL, O_NONBLOCK);
pthread_create(&io_thread, NULL, io_thread_func, NULL);
}
// Start Workers
#ifdef _WIN32
SYSTEM_INFO sysinfo;
@@ -359,6 +458,14 @@ void actor_free(JSContext *actor)
pthread_mutex_unlock(&engine.lock);
}
/* Remove I/O watches for this actor */
pthread_mutex_lock(&io_mutex);
for (int i = arrlen(g_io_watches) - 1; i >= 0; i--) {
if (g_io_watches[i].actor == actor)
arrdel(g_io_watches, i);
}
pthread_mutex_unlock(&io_mutex);
// Do not go forward with actor destruction until the actor is completely free
pthread_mutex_lock(actor->msg_mutex);
pthread_mutex_lock(actor->mutex);
@@ -438,6 +545,16 @@ void exit_handler(void) {
pthread_join(engine.timer_thread, NULL);
/* Shut down I/O thread */
if (io_pipe[1] >= 0) {
io_wake();
pthread_join(io_thread, NULL);
close(io_pipe[0]);
close(io_pipe[1]);
io_pipe[0] = io_pipe[1] = -1;
}
arrfree(g_io_watches);
for (int i=0; i < engine.num_workers; i++) {
pthread_join(engine.worker_threads[i], NULL);
}
@@ -883,6 +1000,17 @@ void actor_gc_scan(JSContext *ctx,
if (ctx->msg_mutex)
pthread_mutex_unlock(ctx->msg_mutex);
/* Scan I/O watch callbacks belonging to this actor */
pthread_mutex_lock(&io_mutex);
for (int i = 0; i < arrlen(g_io_watches); i++) {
if (g_io_watches[i].actor == ctx) {
g_io_watches[i].callback = gc_copy_value(ctx,
g_io_watches[i].callback,
from_base, from_end, to_base, to_free, to_end);
}
}
pthread_mutex_unlock(&io_mutex);
}
void actor_clock(JSContext *actor, JSValue fn)