slow messags

This commit is contained in:
2026-02-26 00:56:43 -06:00
parent e203700d37
commit eb19b18594
5 changed files with 373 additions and 387 deletions

View File

@@ -5,6 +5,10 @@ var native_mode = false
var _no_warn = (init != null && init.no_warn) ? true : false
var SYSYM = '__SYSTEM__'
var log = function(name, args) {
}
var _cell = {}
var need_stop = false
@@ -908,6 +912,7 @@ $_.delay = function delay(fn, seconds) {
send_messages()
}
var id = actor_mod.delay(delay_turn, _seconds)
log.connection(`$delay: registered timer id=${text(id)} seconds=${text(_seconds)}`)
return function() { actor_mod.removetimer(id) }
}
@@ -959,20 +964,34 @@ $_.couple = function couple(actor) {
}
$_.contact = function(callback, record) {
send(create_actor(record), record, callback)
log.connection(`contact: creating actor for ${record.address}:${text(record.port)}`)
var a = create_actor(record)
log.connection(`contact: actor created, sending contact`)
send(a, record, function(reply) {
var server = null
if (reply && reply.actor_id) {
server = create_actor({id: reply.actor_id, address: record.address, port: record.port})
log.connection(`contact: connected, server id=${reply.actor_id}`)
callback(server)
} else {
log.connection(`contact: connection failed or no reply`)
callback(null)
}
})
}
$_.portal = function(fn, port) {
if (portal) {
log.error(`Already started a portal listening on ${portal.port()}`)
log.error(`Already started a portal listening on ${enet.host_port(portal)}`)
disrupt
}
if (!port) {
log.error("Requires a valid port.")
disrupt
}
log.system(`starting a portal on port ${port}`)
log.connection(`portal: starting on port ${text(port)}`)
portal = enet.create_host({address: "any", port})
log.connection(`portal: created host=${portal}`)
portal_fn = fn
enet_check()
}
@@ -1362,59 +1381,74 @@ function guid(bits)
}
enet = use_core('internal/enet')
enet = use_core('enet')
function peer_connection(peer) {
return {
latency: peer.rtt(),
latency: enet.peer_rtt(peer),
bandwidth: {
incoming: peer.incoming_bandwidth(),
outgoing: peer.outgoing_bandwidth()
incoming: enet.peer_incoming_bandwidth(peer),
outgoing: enet.peer_outgoing_bandwidth(peer)
},
activity: {
last_sent: peer.last_send_time(),
last_received: peer.last_receive_time()
last_sent: enet.peer_last_send_time(peer),
last_received: enet.peer_last_receive_time(peer)
},
mtu: peer.mtu(),
mtu: enet.peer_mtu(peer),
data: {
incoming_total: peer.incoming_data_total(),
outgoing_total: peer.outgoing_data_total(),
reliable_in_transit: peer.reliable_data_in_transit()
incoming_total: enet.peer_incoming_data_total(peer),
outgoing_total: enet.peer_outgoing_data_total(peer),
reliable_in_transit: enet.peer_reliable_data_in_transit(peer)
},
latency_variance: peer.rtt_variance(),
packet_loss: peer.packet_loss(),
state: peer.state()
latency_variance: enet.peer_rtt_variance(peer),
packet_loss: enet.peer_packet_loss(peer),
state: enet.peer_state(peer)
}
}
// Strip ::ffff: prefix from IPv6-mapped IPv4 addresses
function normalize_addr(addr) {
if (starts_with(addr, "::ffff:"))
return text(addr, 7)
return addr
}
function handle_host(e) {
var queue = null
var data = null
var addr = null
var port = null
var pkey = null
log.connection(`handle_host: event type=${e.type}`)
if (e.type == "connect") {
addr = e.peer.address()
port = e.peer.port()
log.system(`connected a new peer: ${addr}:${port}`)
peers[`${addr}:${port}`] = e.peer
queue = peer_queue[e.peer]
addr = normalize_addr(enet.peer_address(e.peer))
port = enet.peer_port(e.peer)
pkey = addr + ":" + text(port)
log.connection(`handle_host: peer connected ${pkey}`)
peers[pkey] = e.peer
queue = peer_queue[pkey]
if (queue) {
arrfor(queue, (msg, index) => e.peer.send(nota.encode(msg)))
log.system(`sent queue out of queue`)
delete peer_queue[e.peer]
log.connection(`handle_host: flushing ${text(length(queue))} queued messages to ${pkey}`)
arrfor(queue, (msg, index) => enet.send(e.peer, nota.encode(msg)))
delete peer_queue[pkey]
} else {
log.connection(`handle_host: no queued messages for ${pkey}`)
}
} else if (e.type == "disconnect") {
delete peer_queue[e.peer]
arrfor(array(peers), function(id, index) {
if (peers[id] == e.peer) delete peers[id]
})
log.system('portal got disconnect from ' + e.peer.address() + ":" + e.peer.port())
addr = normalize_addr(enet.peer_address(e.peer))
port = enet.peer_port(e.peer)
pkey = addr + ":" + text(port)
log.connection(`handle_host: peer disconnected ${pkey}`)
delete peer_queue[pkey]
delete peers[pkey]
} else if (e.type == "receive") {
data = nota.decode(e.data)
if (data.replycc && !data.replycc.address) {
data.replycc[ACTORDATA].address = e.peer.address()
data.replycc[ACTORDATA].port = e.peer.port()
log.connection(`handle_host: received data type=${data.type}`)
if (data.replycc_id && !data.replycc) {
data.replycc = create_actor({id: data.replycc_id, address: normalize_addr(enet.peer_address(e.peer)), port: enet.peer_port(e.peer)})
} else if (data.replycc && !data.replycc.address) {
data.replycc[ACTORDATA].address = normalize_addr(enet.peer_address(e.peer))
data.replycc[ACTORDATA].port = enet.peer_port(e.peer)
}
if (data.data) populate_actor_addresses(data.data, e)
handle_message(data)
@@ -1425,8 +1459,8 @@ function handle_host(e) {
function populate_actor_addresses(obj, e) {
if (!is_object(obj)) return
if (obj[ACTORDATA] && !obj[ACTORDATA].address) {
obj[ACTORDATA].address = e.peer.address()
obj[ACTORDATA].port = e.peer.port()
obj[ACTORDATA].address = normalize_addr(enet.peer_address(e.peer))
obj[ACTORDATA].port = enet.peer_port(e.peer)
}
arrfor(array(obj), function(key, index) {
if (key in obj)
@@ -1447,6 +1481,7 @@ function actor_send_immediate(actor, send) {
function actor_send(actor, message) {
var wota_blob = null
var peer = null
var pkey = null
if (actor[HEADER] && !actor[HEADER].replycc) // attempting to respond to a message but sender is not expecting; silently drop
return
@@ -1463,12 +1498,14 @@ function actor_send(actor, message) {
// message to self
if (actor[ACTORDATA].id == _cell.id) {
log.connection(`actor_send: message to self, type=${message.type}`)
if (receive_fn) receive_fn(message.data)
return
}
// message to actor in same flock
if (actor[ACTORDATA].id && actor_mod.mailbox_exist(actor[ACTORDATA].id)) {
log.connection(`actor_send: local mailbox for ${text(actor[ACTORDATA].id, 0, 8)}`)
wota_blob = wota.encode(message)
actor_mod.mailbox_push(actor[ACTORDATA].id, wota_blob)
return
@@ -1480,23 +1517,27 @@ function actor_send(actor, message) {
else
message.type = "contact"
peer = peers[actor[ACTORDATA].address + ":" + actor[ACTORDATA].port]
pkey = actor[ACTORDATA].address + ":" + text(actor[ACTORDATA].port)
log.connection(`actor_send: remote ${pkey} msg.type=${message.type}`)
peer = peers[pkey]
if (!peer) {
if (!portal) {
log.system(`creating a contactor ...`)
log.connection(`actor_send: no portal, creating contactor`)
portal = enet.create_host({address:"any"})
log.system(`allowing contact to port ${portal.port()}`)
log.connection(`actor_send: contactor on port ${text(enet.host_port(portal))}`)
enet_check()
}
log.system(`no peer! connecting to ${actor[ACTORDATA].address}:${actor[ACTORDATA].port}`)
peer = portal.connect(actor[ACTORDATA].address, actor[ACTORDATA].port)
peer_queue.set(peer, [message])
log.connection(`actor_send: no peer for ${pkey}, connecting...`)
peer = enet.connect(portal, actor[ACTORDATA].address, actor[ACTORDATA].port)
log.connection(`actor_send: connect initiated, peer=${peer}, queuing message`)
peer_queue[pkey] = [message]
} else {
peer.send(nota.encode(message))
log.connection(`actor_send: have peer for ${pkey}, sending directly`)
enet.send(peer, nota.encode(message))
}
return
}
log.system(`Unable to send message to actor ${actor[ACTORDATA].id}`)
log.connection(`actor_send: no route for actor id=${actor[ACTORDATA].id} (no address, not local)`)
}
function send_messages() {
@@ -1509,6 +1550,8 @@ function send_messages() {
var _qi = 0
var _qm = null
if (length(message_queue) > 0)
log.connection(`send_messages: processing ${text(length(message_queue))} queued messages`)
while (_qi < length(message_queue)) {
_qm = message_queue[_qi]
if (_qm.startup) {
@@ -1667,13 +1710,40 @@ function handle_sysym(msg)
function handle_message(msg) {
var letter = null
var fn = null
var conn = null
var pkey = null
var peer = null
var reply_msg = null
log.connection(`handle_message: type=${msg.type}`)
if (msg[SYSYM]) {
handle_sysym(msg[SYSYM])
return
}
if (msg.type == "user") {
if (msg.type == "contact") {
// Remote $contact arrived — create a connection actor for the caller.
// msg.replycc was constructed by handle_host with id + address + port.
conn = msg.replycc
log.connection(`handle_message: contact from ${conn ? conn[ACTORDATA].id : "unknown"}`)
// Reply directly over enet so the client's $contact callback fires
if (conn && msg.reply) {
pkey = conn[ACTORDATA].address + ":" + text(conn[ACTORDATA].port)
peer = peers[pkey]
if (peer) {
reply_msg = {type: "user", data: {type: "connected", actor_id: _cell.id}, return: msg.reply}
log.connection(`handle_message: sending contact reply to ${pkey}`)
enet.send(peer, nota.encode(reply_msg))
}
}
if (portal_fn) {
log.connection(`handle_message: calling portal_fn`)
portal_fn(conn)
}
} else if (msg.type == "user") {
letter = msg.data // what the sender really sent
if (msg.replycc_id) {
msg.replycc = create_actor({id: msg.replycc_id})
@@ -1683,11 +1753,13 @@ function handle_message(msg) {
if (msg.return) {
fn = replies[msg.return]
log.connection(`handle_message: reply callback ${msg.return} fn=${fn ? "yes" : "no"}`)
if (fn) fn(letter)
delete replies[msg.return]
return
}
log.connection(`handle_message: dispatching to receive_fn=${receive_fn ? "yes" : "no"}`)
if (receive_fn) receive_fn(letter)
} else if (msg.type == "stopped") {
handle_actor_disconnect(msg.id)
@@ -1696,8 +1768,12 @@ function handle_message(msg) {
function enet_check()
{
if (portal) portal.service(handle_host)
if (portal) {
log.connection(`enet_check: servicing portal`)
enet.service(portal, handle_host)
} else {
log.connection(`enet_check: no portal`)
}
$_.delay(enet_check, ENETSERVICE);
}