enet portal works
This commit is contained in:
@@ -1143,6 +1143,8 @@ var root = null
|
||||
var receive_fn = null
|
||||
var greeters = {} // Router functions for when messages are received for a specific actor
|
||||
|
||||
var enet = use_core('internal/enet')
|
||||
|
||||
var peers = {}
|
||||
var id_address = {}
|
||||
var peer_queue = {}
|
||||
@@ -1151,24 +1153,24 @@ var portal_fn = null
|
||||
|
||||
function peer_connection(peer) {
|
||||
return {
|
||||
latency: peer.rtt,
|
||||
latency: peer.rtt(),
|
||||
bandwidth: {
|
||||
incoming: peer.incoming_bandwidth,
|
||||
outgoing: peer.outgoing_bandwidth
|
||||
incoming: peer.incoming_bandwidth(),
|
||||
outgoing: peer.outgoing_bandwidth()
|
||||
},
|
||||
activity: {
|
||||
last_sent: peer.last_send_time,
|
||||
last_received: peer.last_receive_time
|
||||
last_sent: peer.last_send_time(),
|
||||
last_received: peer.last_receive_time()
|
||||
},
|
||||
mtu: peer.mtu,
|
||||
mtu: peer.mtu(),
|
||||
data: {
|
||||
incoming_total: peer.incoming_data_total,
|
||||
outgoing_total: peer.outgoing_data_total,
|
||||
reliable_in_transit: peer.reliable_data_in_transit
|
||||
incoming_total: peer.incoming_data_total(),
|
||||
outgoing_total: peer.outgoing_data_total(),
|
||||
reliable_in_transit: peer.reliable_data_in_transit()
|
||||
},
|
||||
latency_variance: peer.rtt_variance,
|
||||
packet_loss: peer.packet_loss,
|
||||
state: peer.state
|
||||
latency_variance: peer.rtt_variance(),
|
||||
packet_loss: peer.packet_loss(),
|
||||
state: peer.state()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1190,7 +1192,7 @@ $_.connection = function(callback, actor, config) {
|
||||
// takes a function input value that will eventually be called with the current time in number form.
|
||||
$_.portal = function(fn, port) {
|
||||
if (portal) {
|
||||
log.error(`Already started a portal listening on ${portal.port}`)
|
||||
log.error(`Already started a portal listening on ${portal.port()}`)
|
||||
disrupt
|
||||
}
|
||||
if (!port) {
|
||||
@@ -1200,43 +1202,49 @@ $_.portal = function(fn, port) {
|
||||
log.system(`starting a portal on port ${port}`)
|
||||
portal = enet.create_host({address: "any", port})
|
||||
portal_fn = fn
|
||||
enet_check()
|
||||
}
|
||||
|
||||
function handle_host(e) {
|
||||
var queue = null
|
||||
var data = null
|
||||
var addr = null
|
||||
var port = null
|
||||
|
||||
if (e.type == "connect") {
|
||||
log.system(`connected a new peer: ${e.peer.address}:${e.peer.port}`)
|
||||
peers[`${e.peer.address}:${e.peer.port}`] = e.peer
|
||||
queue = peer_queue.get(e.peer)
|
||||
addr = e.peer.address()
|
||||
port = e.peer.port()
|
||||
log.system(`connected a new peer: ${addr}:${port}`)
|
||||
peers[`${addr}:${port}`] = e.peer
|
||||
queue = peer_queue[e.peer]
|
||||
if (queue) {
|
||||
arrfor(queue, (msg, index) => e.peer.send(nota.encode(msg)))
|
||||
log.system(`sent queue out of queue`)
|
||||
peer_queue.delete(e.peer)
|
||||
delete peer_queue[e.peer]
|
||||
}
|
||||
} else if (e.type == "disconnect") {
|
||||
peer_queue.delete(e.peer)
|
||||
delete peer_queue[e.peer]
|
||||
arrfor(array(peers), function(id, index) {
|
||||
if (peers[id] == e.peer) delete peers[id]
|
||||
})
|
||||
log.system('portal got disconnect from ' + e.peer.address + ":" + e.peer.port)
|
||||
log.system('portal got disconnect from ' + e.peer.address() + ":" + e.peer.port())
|
||||
} else if (e.type == "receive") {
|
||||
data = nota.decode(e.data)
|
||||
if (data.replycc && !data.replycc.address) {
|
||||
data.replycc[ACTORDATA].address = e.peer.address
|
||||
data.replycc[ACTORDATA].port = e.peer.port
|
||||
data.replycc[ACTORDATA].address = e.peer.address()
|
||||
data.replycc[ACTORDATA].port = e.peer.port()
|
||||
}
|
||||
if (data.data) populate_actor_addresses(data.data, e)
|
||||
turn(data)
|
||||
handle_message(data)
|
||||
send_messages()
|
||||
}
|
||||
}
|
||||
|
||||
function populate_actor_addresses(obj, e) {
|
||||
if (!is_object(obj)) return
|
||||
if (obj[ACTORDATA] && !obj[ACTORDATA].address) {
|
||||
obj[ACTORDATA].address = e.peer.address
|
||||
obj[ACTORDATA].port = e.peer.port
|
||||
obj[ACTORDATA].address = e.peer.address()
|
||||
obj[ACTORDATA].port = e.peer.port()
|
||||
}
|
||||
arrfor(array(obj), function(key, index) {
|
||||
if (key in obj)
|
||||
@@ -1307,8 +1315,6 @@ $_.delay = function delay(fn, seconds) {
|
||||
return function() { actor_mod.removetimer(id) }
|
||||
}
|
||||
|
||||
var enet = use_core('enet')
|
||||
|
||||
// causes this actor to stop when another actor stops.
|
||||
var couplings = {}
|
||||
$_.couple = function couple(actor) {
|
||||
@@ -1368,7 +1374,8 @@ function actor_send(actor, message) {
|
||||
if (!portal) {
|
||||
log.system(`creating a contactor ...`)
|
||||
portal = enet.create_host({address:"any"})
|
||||
log.system(`allowing contact to port ${portal.port}`)
|
||||
log.system(`allowing contact to port ${portal.port()}`)
|
||||
enet_check()
|
||||
}
|
||||
log.system(`no peer! connecting to ${actor[ACTORDATA].address}:${actor[ACTORDATA].port}`)
|
||||
peer = portal.connect(actor[ACTORDATA].address, actor[ACTORDATA].port)
|
||||
@@ -1564,7 +1571,7 @@ function handle_message(msg) {
|
||||
var fn = null
|
||||
|
||||
if (msg[SYSYM]) {
|
||||
handle_sysym(msg[SYSYM], msg.from)
|
||||
handle_sysym(msg[SYSYM])
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1596,7 +1603,7 @@ function enet_check()
|
||||
$_.delay(enet_check, ENETSERVICE);
|
||||
}
|
||||
|
||||
// enet_check();
|
||||
// enet_check started on demand when $portal() is called
|
||||
|
||||
// Finally, run the program
|
||||
actor_mod.setname(_cell.args.program)
|
||||
@@ -1747,8 +1754,6 @@ $_.clock(_ => {
|
||||
}
|
||||
env.args = _cell.args.arg
|
||||
env.log = log
|
||||
os.print(`[debug] env keys: ${text(array(env), ',')}\n`)
|
||||
os.print(`[debug] $stop in env: ${text(env['$stop'] != null)}\n`)
|
||||
env = stone(env)
|
||||
|
||||
var native_build = null
|
||||
|
||||
Reference in New Issue
Block a user