diff --git a/docs/library/probe.md b/docs/library/probe.md new file mode 100644 index 00000000..2c635b59 --- /dev/null +++ b/docs/library/probe.md @@ -0,0 +1,233 @@ +--- +title: "probe" +description: "Runtime observability for actors" +weight: 90 +type: "docs" +--- + +Runtime observability for actors. Register named probe functions on any actor and query them over HTTP while the program runs. + +```javascript +var probe = use('probe') +``` + +The probe server starts automatically on the first `register()` call, listening on `127.0.0.1:9000`. + +## Registering Probes + +### probe.register(target, probes) + +Register a group of probe functions under a target name. Each probe is a function that receives an `args` record and returns a value. + +```javascript +var probe = use('probe') + +var world = { + entities: [ + {id: 1, name: "player", x: 10, y: 20, hp: 100}, + {id: 2, name: "goblin", x: 55, y: 30, hp: 40} + ], + tick: 0 +} + +probe.register("game", { + state: function(args) { + return world + }, + entities: function(args) { + return world.entities + }, + entity: function(args) { + return find(world.entities, function(e) { + return e.id == args.id + }) + } +}) + +probe.register("render", { + info: function(args) { + return {fps: 60, draw_calls: 128, batches: 12} + } +}) +``` + +A target is just a namespace — group related probes under the same target name. Register as many targets as you like; the server starts once and serves them all. + +## HTTP Endpoints + +All responses are JSON with an `ok` field. + +### GET /discover + +Lists all registered targets and their probe names. Designed for tooling — an LLM or dashboard can call this first to learn what's available, then query specific probes. + +``` +$ curl http://127.0.0.1:9000/discover +``` + +```json +{ + "ok": true, + "targets": { + "game": ["state", "entities", "entity"], + "render": ["info"] + } +} +``` + +### POST /probe + +Call a single probe function by target and name. Optionally pass arguments. + +``` +$ curl -X POST -H "Content-Type: application/json" \ + -d '{"target":"game","name":"state"}' \ + http://127.0.0.1:9000/probe +``` + +```json +{ + "ok": true, + "result": { + "entities": [ + {"id": 1, "name": "player", "x": 10, "y": 20, "hp": 100}, + {"id": 2, "name": "goblin", "x": 55, "y": 30, "hp": 40} + ], + "tick": 4821 + } +} +``` + +With arguments: + +``` +$ curl -X POST -H "Content-Type: application/json" \ + -d '{"target":"game","name":"entity","args":{"id":1}}' \ + http://127.0.0.1:9000/probe +``` + +```json +{ + "ok": true, + "result": {"id": 1, "name": "player", "x": 10, "y": 20, "hp": 100} +} +``` + +### POST /snapshot + +Call multiple probes in one request. Returns all results keyed by `target/name`. + +``` +$ curl -X POST -H "Content-Type: application/json" \ + -d '{"probes":[{"target":"game","name":"state"},{"target":"render","name":"info"}]}' \ + http://127.0.0.1:9000/snapshot +``` + +```json +{ + "ok": true, + "results": { + "game/state": { + "entities": [ + {"id": 1, "name": "player", "x": 10, "y": 20, "hp": 100}, + {"id": 2, "name": "goblin", "x": 55, "y": 30, "hp": 40} + ], + "tick": 4821 + }, + "render/info": { + "fps": 60, + "draw_calls": 128, + "batches": 12 + } + } +} +``` + +### Errors + +Unknown paths return 404: + +```json +{"ok": false, "error": "not found"} +``` + +Unknown targets or probe names: + +```json +{"ok": false, "error": "unknown probe: game/nonexistent"} +``` + +If a probe function disrupts: + +```json +{"ok": false, "error": "probe failed"} +``` + +## Example + +A game actor with a simulation loop and probe observability: + +```javascript +// game.ce +var probe = use('probe') + +var state = { + entities: [ + {id: 1, name: "player", x: 0, y: 0, hp: 100}, + {id: 2, name: "enemy", x: 50, y: 50, hp: 60} + ], + frame: 0, + paused: false +} + +probe.register("game", { + state: function(args) { + return state + }, + entities: function(args) { + return state.entities + }, + entity: function(args) { + return find(state.entities, function(e) { + return e.id == args.id + }) + } +}) + +// game loop +def tick = function(_) { + if (!state.paused) { + state.frame = state.frame + 1 + // ... update entities, physics, AI ... + } + $delay(tick, 0.016) +} +$delay(tick, 0.016) +``` + +While the game runs, query it from a terminal: + +``` +$ curl -s http://127.0.0.1:9000/discover | jq .targets +{ + "game": ["state", "entities", "entity"] +} + +$ curl -s -X POST -d '{"target":"game","name":"state"}' \ + -H "Content-Type: application/json" \ + http://127.0.0.1:9000/probe | jq .result.frame +7834 + +$ curl -s -X POST -d '{"target":"game","name":"entity","args":{"id":1}}' \ + -H "Content-Type: application/json" \ + http://127.0.0.1:9000/probe | jq .result +{ + "id": 1, + "name": "player", + "x": 142, + "y": 87, + "hp": 100 +} +``` + +Probes run inside the actor's normal turn, so the values are always consistent — never a half-updated frame. diff --git a/http.cm b/http.cm new file mode 100644 index 00000000..7c1548d0 --- /dev/null +++ b/http.cm @@ -0,0 +1,164 @@ +var socket = use('socket') +var c_http = use('net/http') + +def CRLF = "\r\n" + +def status_texts = { + "200": "OK", "201": "Created", "204": "No Content", + "400": "Bad Request", "401": "Unauthorized", "403": "Forbidden", + "404": "Not Found", "405": "Method Not Allowed", "500": "Internal Server Error" +} + +function serve(port) { + var fd = socket.socket("AF_INET", "SOCK_STREAM") + socket.setsockopt(fd, "SOL_SOCKET", "SO_REUSEADDR", true) + socket.bind(fd, {address: "127.0.0.1", port: port}) + socket.listen(fd, 16) + return fd +} + +function parse_request(conn_fd) { + var data = socket.recv(conn_fd, 65536) + var raw = text(data) + + var hdr_end = search(raw, CRLF + CRLF) + if (hdr_end == null) disrupt + var header_text = text(raw, 0, hdr_end) + var body_text = text(raw, hdr_end + 4) + + var lines = array(header_text, CRLF) + var parts = array(lines[0], " ") + var method = parts[0] + var url = parts[1] + var qpos = search(url, "?") + var path = qpos != null ? text(url, 0, qpos) : url + + var headers = {} + var i = 1 + var colon = null + var key = null + var val = null + while (i < length(lines)) { + colon = search(lines[i], ": ") + if (colon != null) { + key = lower(text(lines[i], 0, colon)) + val = text(lines[i], colon + 2) + headers[key] = val + } + i = i + 1 + } + + var cl = headers["content-length"] + var content_length = null + var remaining = null + var more = null + if (cl != null) content_length = number(cl) + if (content_length != null && length(body_text) < content_length) { + remaining = content_length - length(body_text) + more = socket.recv(conn_fd, remaining) + body_text = body_text + text(more) + } + if (content_length == null || content_length == 0) body_text = null + + return { + method: method, path: path, url: url, + headers: headers, body: body_text, _conn: conn_fd + } +} + +function accept(server_fd) { + var conn = socket.accept(server_fd) + return parse_request(conn.socket) +} + +function on_request(server_fd, handler) { + var _accept = function() { + var conn = socket.accept(server_fd) + var req = null + var _parse = function() { + req = parse_request(conn.socket) + } disruption { + req = null + } + _parse() + if (req != null) handler(req) + socket.on_readable(server_fd, _accept) + } + socket.on_readable(server_fd, _accept) +} + +function respond(conn, status, headers, body) { + var st = status_texts[text(status)] + if (st == null) st = "Unknown" + var out = "HTTP/1.1 " + text(status) + " " + st + CRLF + out = out + "Connection: close" + CRLF + + var body_str = "" + var keys = null + var i = 0 + if (body != null) { + if (is_text(body)) body_str = body + else body_str = text(body) + } + + if (headers != null) { + keys = array(headers) + i = 0 + while (i < length(keys)) { + out = out + keys[i] + ": " + headers[keys[i]] + CRLF + i = i + 1 + } + } + + out = out + "Content-Length: " + text(length(body_str)) + CRLF + out = out + CRLF + body_str + + socket.send(conn, out) + socket.close(conn) +} + +function sse_open(conn, headers) { + var out = "HTTP/1.1 200 OK" + CRLF + out = out + "Content-Type: text/event-stream" + CRLF + out = out + "Cache-Control: no-cache" + CRLF + out = out + "Connection: keep-alive" + CRLF + var keys = null + var i = 0 + if (headers != null) { + keys = array(headers) + i = 0 + while (i < length(keys)) { + out = out + keys[i] + ": " + headers[keys[i]] + CRLF + i = i + 1 + } + } + out = out + CRLF + socket.send(conn, out) +} + +function sse_event(conn, event, data) { + var frame = "event: " + event + "\ndata: " + data + "\n\n" + var ok = true + var _send = function() { + socket.send(conn, frame) + } disruption { + ok = false + } + _send() + return ok +} + +function sse_close(conn) { + socket.close(conn) +} + +function close(fd) { + socket.close(fd) +} + +return { + serve: serve, accept: accept, on_request: on_request, + respond: respond, + sse_open: sse_open, sse_event: sse_event, sse_close: sse_close, + close: close, fetch: c_http.fetch +} diff --git a/internal/engine.cm b/internal/engine.cm index 58a5c0d1..4c1b5b3c 100644 --- a/internal/engine.cm +++ b/internal/engine.cm @@ -145,7 +145,7 @@ function load_pipeline_module(name, env) { hash = content_hash(source_blob) cached = pipeline_cache_path(hash) if (cached && fd.is_file(cached)) { - log.system('engine: pipeline ' + name + ' (cached)') + // log.system('engine: pipeline ' + name + ' (cached)') return mach_load(fd.slurp(cached), env) } @@ -169,7 +169,7 @@ function load_pipeline_module(name, env) { compiled = boot_sl(compiled) mcode_json = json.encode(compiled) mach_blob = mach_compile_mcode_bin(name, mcode_json) - log.system('engine: pipeline ' + name + ' (compiled)') + // log.system('engine: pipeline ' + name + ' (compiled)') if (!native_mode && cached) { ensure_build_dir() fd.slurpwrite(cached, mach_blob) @@ -676,13 +676,13 @@ function use_core(path) { if (!source_blob) source_blob = fd.slurp(file_path) cached_path = module_cache_path(source_blob, 'mach') if (cached_path && fd.is_file(cached_path)) { - log.system('engine: cache hit for core/' + path) + // log.system('engine: cache hit for core/' + path) result = mach_load(fd.slurp(cached_path), env) } else { script = text(source_blob) ast = analyze(script, file_path) mach_blob = compile_to_blob('core:' + path, ast) - log.system('engine: compiled core/' + path) + // log.system('engine: compiled core/' + path) if (!native_mode && cached_path) { ensure_build_dir() fd.slurpwrite(cached_path, mach_blob) diff --git a/meson.build b/meson.build index 29221512..dbe26259 100644 --- a/meson.build +++ b/meson.build @@ -82,6 +82,7 @@ scripts = [ 'internal/os.c', 'internal/fd.c', 'net/http.c', + 'net/socket.c', 'internal/enet.c', 'archive/miniz.c', 'source/cJSON.c' diff --git a/net/http.c b/net/http.c index 674ab227..177e5179 100644 --- a/net/http.c +++ b/net/http.c @@ -321,7 +321,7 @@ static const JSCFunctionListEntry js_http_funcs[] = { JS_CFUNC_DEF("fetch", 2, js_fetch_picoparser), }; -JSValue js_core_http_use(JSContext *js) { +JSValue js_core_net_http_use(JSContext *js) { JS_FRAME(js); par_easycurl_init(0); // Initialize platform HTTP backend JS_ROOT(mod, JS_NewObject(js)); diff --git a/net/socket.c b/net/socket.c index ac8ad204..592dcae3 100644 --- a/net/socket.c +++ b/net/socket.c @@ -563,10 +563,26 @@ JSC_CCALL(socket_setsockopt, JSC_CCALL(socket_close, int sockfd = js2fd(js, argv[0]); if (sockfd < 0) return JS_EXCEPTION; - + if (close(sockfd) != 0) return JS_RaiseDisrupt(js, "close failed: %s", strerror(errno)); - + + return JS_NULL; +) + +JSC_CCALL(socket_on_readable, + int sockfd = js2fd(js, argv[0]); + if (sockfd < 0) return JS_EXCEPTION; + if (!JS_IsFunction(argv[1])) + return JS_RaiseDisrupt(js, "on_readable: callback must be a function"); + actor_watch_readable(js, sockfd, argv[1]); + return JS_NULL; +) + +JSC_CCALL(socket_unwatch, + int sockfd = js2fd(js, argv[0]); + if (sockfd < 0) return JS_EXCEPTION; + actor_unwatch(js, sockfd); return JS_NULL; ) @@ -587,6 +603,8 @@ static const JSCFunctionListEntry js_socket_funcs[] = { MIST_FUNC_DEF(socket, gai_strerror, 1), MIST_FUNC_DEF(socket, setsockopt, 4), MIST_FUNC_DEF(socket, close, 1), + MIST_FUNC_DEF(socket, on_readable, 2), + MIST_FUNC_DEF(socket, unwatch, 1), }; JSValue js_core_socket_use(JSContext *js) { diff --git a/probe.ce b/probe.ce new file mode 100644 index 00000000..3583bfd9 --- /dev/null +++ b/probe.ce @@ -0,0 +1,151 @@ +// cell probe - Query a running probe server +// +// Usage: +// cell probe List all targets and probes +// cell probe Query a probe +// cell probe k=v ... Query with arguments +// cell probe --port=8080 game state Use a different port + +var socket = use('socket') +var json = use('json') + +var host = "127.0.0.1" +var port = 9000 + +def CRLF = "\r\n" + +function request(method, path, body) { + var fd = socket.socket("AF_INET", "SOCK_STREAM") + var raw = null + var hdr_end = null + var _do = function() { + socket.connect(fd, {address: host, port: port}) + var req = method + " " + path + " HTTP/1.1" + CRLF + req = req + "Host: " + host + CRLF + req = req + "Connection: close" + CRLF + if (body != null) { + req = req + "Content-Type: application/json" + CRLF + req = req + "Content-Length: " + text(length(body)) + CRLF + } + req = req + CRLF + if (body != null) req = req + body + socket.send(fd, req) + raw = text(socket.recv(fd, 65536)) + } disruption { + raw = null + } + _do() + socket.close(fd) + if (raw == null) return null + hdr_end = search(raw, CRLF + CRLF) + if (hdr_end == null) return null + return text(raw, hdr_end + 4) +} + +function print_targets(targets) { + var keys = array(targets) + var j = 0 + var p = 0 + var probes = null + while (j < length(keys)) { + probes = targets[keys[j]] + log.console(keys[j]) + p = 0 + while (p < length(probes)) { + log.console(" " + probes[p]) + p = p + 1 + } + j = j + 1 + } +} + +function run() { + var target = null + var name = null + var probe_args = {} + var i = 0 + var eq = null + var k = null + var v = null + var n = null + + for (i = 0; i < length(args); i++) { + if (args[i] == '--help' || args[i] == '-h') { + log.console("Usage: cell probe [target] [name] [key=value ...]") + log.console("") + log.console(" cell probe List all targets and probes") + log.console(" cell probe game state Query game/state") + log.console(" cell probe game entity id=1 Query with arguments") + log.console("") + log.console("Options:") + log.console(" --port=N Connect to port N (default 9000)") + return + } else if (starts_with(args[i], '--port=')) { + port = number(text(args[i], 7)) + } else if (target == null) { + target = args[i] + } else if (name == null) { + name = args[i] + } else { + eq = search(args[i], "=") + if (eq != null) { + k = text(args[i], 0, eq) + v = text(args[i], eq + 1) + n = number(v) + if (n != null) { + v = n + } else if (v == "true") { + v = true + } else if (v == "false") { + v = false + } else if (v == "null") { + v = null + } + probe_args[k] = v + } + } + } + + var resp = null + var body = null + var data = null + + if (target == null) { + resp = request("GET", "/discover", null) + } else { + body = {target: target, name: name} + if (length(array(probe_args)) > 0) body.args = probe_args + resp = request("POST", "/probe", json.encode(body, false)) + } + + if (resp == null) { + log.error("could not connect to probe server on port " + text(port)) + return + } + + var _parse = function() { + data = json.decode(resp) + } disruption { + data = null + } + _parse() + + if (data == null) { + log.error("invalid response from server") + return + } + + if (!data.ok) { + log.error(data.error) + return + } + + if (target == null) { + print_targets(data.targets) + } else { + log.console(json.encode(data.result, 2)) + } +} +run() + +$stop() diff --git a/probe.cm b/probe.cm new file mode 100644 index 00000000..91d7b48a --- /dev/null +++ b/probe.cm @@ -0,0 +1,124 @@ +var http = use('http') +var json = use('json') + +var registry = {} +var server_fd = null +var port = 9000 + +function handle_request(req) { + var result = null + var _try = null + + if (req.method == "GET" && req.path == "/discover") { + result = discover() + http.respond(req._conn, 200, {"content-type": "application/json"}, + json.encode(result)) + return + } + + if (req.method == "POST" && req.path == "/probe") { + _try = function() { + result = handle_probe(req) + } disruption { + result = {ok: false, error: "probe failed"} + } + _try() + http.respond(req._conn, 200, {"content-type": "application/json"}, + json.encode(result)) + return + } + + if (req.method == "POST" && req.path == "/snapshot") { + _try = function() { + result = handle_snapshot(req) + } disruption { + result = {ok: false, error: "snapshot failed"} + } + _try() + http.respond(req._conn, 200, {"content-type": "application/json"}, + json.encode(result)) + return + } + + http.respond(req._conn, 404, {"content-type": "application/json"}, + json.encode({ok: false, error: "not found"})) +} + +function discover() { + var targets = {} + var target_keys = array(registry) + var i = 0 + while (i < length(target_keys)) { + targets[target_keys[i]] = array(registry[target_keys[i]]) + i = i + 1 + } + return {ok: true, targets: targets} +} + +function handle_probe(req) { + var body = json.decode(req.body) + var target = body.target + var name = body.name + var args = body.args + + if (target == null || name == null) { + return {ok: false, error: "missing target or name"} + } + + var target_probes = registry[target] + if (target_probes == null) { + return {ok: false, error: "unknown target: " + target} + } + + var probe_fn = target_probes[name] + if (probe_fn == null) { + return {ok: false, error: "unknown probe: " + target + "/" + name} + } + + if (args == null) args = {} + var result = probe_fn(args) + return {ok: true, result: result} +} + +function handle_snapshot(req) { + var body = json.decode(req.body) + var probes = body.probes + if (probes == null) { + return {ok: false, error: "missing probes array"} + } + + var results = {} + var i = 0 + var p = null + var target_probes = null + var probe_fn = null + var key = null + while (i < length(probes)) { + p = probes[i] + key = p.target + "/" + p.name + target_probes = registry[p.target] + if (target_probes != null) { + probe_fn = target_probes[p.name] + if (probe_fn != null) { + results[key] = probe_fn(p.args != null ? p.args : {}) + } + } + i = i + 1 + } + return {ok: true, results: results} +} + +function start_server() { + server_fd = http.serve(port) + http.on_request(server_fd, handle_request) +} + +function register(target, probes) { + registry[target] = probes + if (server_fd == null) start_server() +} + +return { + register: register, + port: port +} diff --git a/source/cell.h b/source/cell.h index 4b847137..d3d0954c 100644 --- a/source/cell.h +++ b/source/cell.h @@ -912,6 +912,14 @@ int js_is_blob(JSContext *js, JSValue v); #include "blob.h" +/* ============================================================ + Actor I/O Watch — event-driven fd monitoring + ============================================================ */ + +void actor_watch_readable(JSContext *actor, int fd, JSValue fn); +void actor_watch_writable(JSContext *actor, int fd, JSValue fn); +void actor_unwatch(JSContext *actor, int fd); + /* ============================================================ Convenience Functions ============================================================ */ diff --git a/source/pit_internal.h b/source/pit_internal.h index 81ea97d8..36c857c8 100644 --- a/source/pit_internal.h +++ b/source/pit_internal.h @@ -859,6 +859,14 @@ typedef struct letter { }; } letter; +/* I/O watch entry — one per watched file descriptor */ +typedef struct { + int fd; + short events; /* POLLIN, POLLOUT */ + JSContext *actor; + JSValue callback; +} io_watch; + /* Actor state machine constants */ #define ACTOR_IDLE 0 #define ACTOR_READY 1 @@ -1049,6 +1057,10 @@ void enqueue_actor_priority(JSContext *actor); void actor_clock(JSContext *actor, JSValue fn); uint32_t actor_delay(JSContext *actor, JSValue fn, double seconds); JSValue actor_remove_timer(JSContext *actor, uint32_t timer_id); +void actor_watch(JSContext *actor, int fd, short events, JSValue fn); +void actor_watch_readable(JSContext *actor, int fd, JSValue fn); +void actor_watch_writable(JSContext *actor, int fd, JSValue fn); +void actor_unwatch(JSContext *actor, int fd); void exit_handler(void); void actor_loop(void); void actor_initialize(void); diff --git a/source/scheduler.c b/source/scheduler.c index 34c96438..0bd62283 100644 --- a/source/scheduler.c +++ b/source/scheduler.c @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include "stb_ds.h" #include "cell.h" @@ -13,6 +15,7 @@ #ifdef _WIN32 #include +#include #endif typedef struct actor_node { @@ -93,6 +96,95 @@ static int has_any_work(actor_node *heads[]) { static pthread_mutex_t *actors_mutex; static struct { char *key; JSContext *value; } *actors = NULL; +/* ============================================================ + I/O Watch Thread — poll()-based fd monitoring + ============================================================ */ +static io_watch *g_io_watches = NULL; /* stb_ds dynamic array */ +static pthread_mutex_t io_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_t io_thread; +static int io_pipe[2] = {-1, -1}; /* self-pipe to wake poll() */ + +static void io_wake(void) { + char c = 1; + (void)write(io_pipe[1], &c, 1); +} + +static void *io_thread_func(void *arg) { + (void)arg; + while (1) { + pthread_mutex_lock(&io_mutex); + if (engine.shutting_down) { + pthread_mutex_unlock(&io_mutex); + return NULL; + } + int n = arrlen(g_io_watches); + /* +1 for the wakeup pipe */ + struct pollfd *fds = malloc(sizeof(struct pollfd) * (n + 1)); + fds[0].fd = io_pipe[0]; + fds[0].events = POLLIN; + for (int i = 0; i < n; i++) { + fds[i + 1].fd = g_io_watches[i].fd; + fds[i + 1].events = g_io_watches[i].events; + } + pthread_mutex_unlock(&io_mutex); + + int ready = poll(fds, n + 1, 500); /* 500ms timeout for shutdown check */ + if (ready <= 0) { + free(fds); + continue; + } + + /* Drain wakeup pipe */ + if (fds[0].revents & POLLIN) { + char buf[64]; + (void)read(io_pipe[0], buf, sizeof(buf)); + } + + /* Fire callbacks for ready fds */ + pthread_mutex_lock(&io_mutex); + for (int i = n - 1; i >= 0; i--) { + if (i >= arrlen(g_io_watches)) continue; + if (fds[i + 1].revents & g_io_watches[i].events) { + io_watch w = g_io_watches[i]; + arrdel(g_io_watches, i); /* one-shot: remove before firing */ + pthread_mutex_unlock(&io_mutex); + actor_clock(w.actor, w.callback); + pthread_mutex_lock(&io_mutex); + } + } + pthread_mutex_unlock(&io_mutex); + free(fds); + } + return NULL; +} + +void actor_watch(JSContext *actor, int fd, short events, JSValue fn) { + io_watch w = { .fd = fd, .events = events, .actor = actor, .callback = fn }; + pthread_mutex_lock(&io_mutex); + arrput(g_io_watches, w); + pthread_mutex_unlock(&io_mutex); + io_wake(); +} + +void actor_watch_readable(JSContext *actor, int fd, JSValue fn) { + actor_watch(actor, fd, POLLIN, fn); +} + +void actor_watch_writable(JSContext *actor, int fd, JSValue fn) { + actor_watch(actor, fd, POLLOUT, fn); +} + +void actor_unwatch(JSContext *actor, int fd) { + pthread_mutex_lock(&io_mutex); + for (int i = arrlen(g_io_watches) - 1; i >= 0; i--) { + if (g_io_watches[i].actor == actor && g_io_watches[i].fd == fd) { + arrdel(g_io_watches, i); + } + } + pthread_mutex_unlock(&io_mutex); + io_wake(); +} + #define lockless_shdel(NAME, KEY) pthread_mutex_lock(NAME##_mutex); shdel(NAME, KEY); pthread_mutex_unlock(NAME##_mutex); #define lockless_shlen(NAME) ({ \ pthread_mutex_lock(NAME##_mutex); \ @@ -307,6 +399,13 @@ void actor_initialize(void) { // Start Timer Thread pthread_create(&engine.timer_thread, NULL, timer_thread_func, NULL); + // Start I/O Watch Thread + if (pipe(io_pipe) == 0) { + fcntl(io_pipe[0], F_SETFL, O_NONBLOCK); + fcntl(io_pipe[1], F_SETFL, O_NONBLOCK); + pthread_create(&io_thread, NULL, io_thread_func, NULL); + } + // Start Workers #ifdef _WIN32 SYSTEM_INFO sysinfo; @@ -359,6 +458,14 @@ void actor_free(JSContext *actor) pthread_mutex_unlock(&engine.lock); } + /* Remove I/O watches for this actor */ + pthread_mutex_lock(&io_mutex); + for (int i = arrlen(g_io_watches) - 1; i >= 0; i--) { + if (g_io_watches[i].actor == actor) + arrdel(g_io_watches, i); + } + pthread_mutex_unlock(&io_mutex); + // Do not go forward with actor destruction until the actor is completely free pthread_mutex_lock(actor->msg_mutex); pthread_mutex_lock(actor->mutex); @@ -438,6 +545,16 @@ void exit_handler(void) { pthread_join(engine.timer_thread, NULL); + /* Shut down I/O thread */ + if (io_pipe[1] >= 0) { + io_wake(); + pthread_join(io_thread, NULL); + close(io_pipe[0]); + close(io_pipe[1]); + io_pipe[0] = io_pipe[1] = -1; + } + arrfree(g_io_watches); + for (int i=0; i < engine.num_workers; i++) { pthread_join(engine.worker_threads[i], NULL); } @@ -883,6 +1000,17 @@ void actor_gc_scan(JSContext *ctx, if (ctx->msg_mutex) pthread_mutex_unlock(ctx->msg_mutex); + + /* Scan I/O watch callbacks belonging to this actor */ + pthread_mutex_lock(&io_mutex); + for (int i = 0; i < arrlen(g_io_watches); i++) { + if (g_io_watches[i].actor == ctx) { + g_io_watches[i].callback = gc_copy_value(ctx, + g_io_watches[i].callback, + from_base, from_end, to_base, to_free, to_end); + } + } + pthread_mutex_unlock(&io_mutex); } void actor_clock(JSContext *actor, JSValue fn)