@@ -574,8 +574,6 @@ actor.delay = function(fn, seconds) {
}
actor . delay . doc = ` Call 'fn' after 'seconds' with 'this' set to the actor. `
var act = use ( 'actor' )
actor [ UNDERLINGS ] = new Set ( )
@@ -619,36 +617,10 @@ var nota = use('nota')
var dying = false
var HEADER = Symbol ( )
var ACTORDATA = Symbol ( )
var $actor = {
toString : print _actor
}
function print _actor ( ) {
return json . encode ( this . _ _ACTORDATA _ _ , 1 )
}
function create _actor ( data = { } ) {
var newactor = Object . create ( $actor )
// Store actual address/port values on the data object
data . _address = data . _address || local _address
data . _port = data . _port || local _port
Object . defineProperty ( data , 'address' , {
get : function ( ) { return this . _address || local _address } ,
set : function ( x ) { this . _address = x } ,
enumerable : true
} )
Object . defineProperty ( data , 'port' , {
get : function ( ) { return this . _port || local _port } ,
set : function ( x ) { this . _port = x } ,
enumerable : true
} )
newactor . _ _ACTORDATA _ _ = data
return newactor
function create _actor ( id = util . guid ( ) ) {
return { id }
}
var $ _ = create _actor ( )
@@ -669,7 +641,7 @@ var receive_fn = undefined
var greeters = { }
$ _ . is _actor = function ( actor ) {
return actor . _ _ACTORDATA _ _
return "id" in actor || ACTORDATA in actor
}
function peer _connection ( peer ) {
@@ -696,12 +668,12 @@ function peer_connection(peer) {
}
$ _ . connection = function ( callback , actor , config ) {
var peer = peers [ actor . _ _ACTORDATA _ _ . id]
var peer = peers [ actor . id ]
if ( peer ) {
callback ( peer _connection ( peer ) )
return
}
if ( os . mailbox _exist ( actor . _ _ACTORDATA _ _ . id) ) {
if ( os . mailbox _exist ( actor . id ) ) {
callback ( { type : "local" } )
return
}
@@ -709,15 +681,22 @@ $_.connection = function(callback, actor, config) {
}
$ _ . connection [ prosperon . DOC ] = "takes a callback function, an actor object, and a configuration record..."
var peers = { }
var id _address = { }
var peer _queue = new WeakMap ( )
var portal = undefined
var portal _fn = undefined
var local _address = undefined
var local _port = undefined
// 1) id → peer (live ENet connection)
const peer _by _id = Object . create ( null )
// 2) id → {address,port} (last seen endpoint)
const id _address = Object . create ( null )
// 3) address:port → peer (fast lookup for incoming events)
const peers = Object . create ( null )
var service _delay = 0.01
var peer _queue = new WeakMap ( )
var portal , portal _fn
var local _address , local _port
var service _delay = 0.01 // how often to ping enet
function route _set ( id , peer ) { peer _by _id [ id ] = peer }
function route _hint ( id , address , port ) { id _address [ id ] = { address , port } }
function route _peerString ( peer ) { return ` ${ peer . address } : ${ peer . port } ` }
$ _ . portal = function ( fn , port ) {
if ( portal ) throw new Error ( ` Already started a portal listening on ${ portal . port } ` )
@@ -727,70 +706,138 @@ $_.portal = function(fn, port) {
local _address = 'localhost'
local _port = port
portal _fn = fn
console . log ( ` I am now ${ $ _ } ` )
console . log ( ` Portal initialized with actor ID: ${ $ _ . id } ` )
}
$ _ . portal [ prosperon . DOC ] = "starts a public address that performs introduction services..."
function handle _host ( e ) {
switch ( e . type ) {
case "connect" :
console . log ( ` connected a new peer: ${ e . peer . address } : ${ e . peer . port } ` )
peers [ ` ${ e . peer . address } : ${ e . peer . port } ` ] = e . peer
// Store peer information for future routing
var key = route _peerString ( e . peer )
peers [ key ] = e . peer
console . log ( ` Connected to peer: ${ e . peer . address } : ${ e . peer . port } ` )
// Check if we have queued messages for this peer
var queue = peer _queue . get ( e . peer )
if ( queue ) {
for ( var msg of queue ) e . peer . send ( nota . encode ( msg ) )
console . log ( ` sent ${ json . encode ( msg ) } out of queue ` )
if ( queue && queue . length > 0 ) {
console . log ( ` Sending ${ queue . length } queued messages to newly connected peer ` )
for ( var msg of queue ) {
try {
e . peer . send ( nota . encode ( msg ) )
console . log ( ` Sent queued message: ${ json . encode ( msg ) } ` )
} catch ( err ) {
console . error ( ` Failed to send queued message: ${ err . message } ` )
}
}
// Clear the queue after sending
peer _queue . delete ( e . peer )
}
break
case "disconnect" :
// Clean up peer references
var key = route _peerString ( e . peer )
console . log ( ` Peer disconnected: ${ key } ` )
// Remove from peers map
delete peers [ key ]
// Remove from queue
peer _queue . delete ( e . peer )
for ( var id in peers ) if ( peers [ id ] === e . peer ) delete peers [ id ]
console . log ( 'portal got disconnect' )
// Remove from any ID-based routing
for ( var id in peer _by _id ) {
if ( peer _by _id [ id ] === e . peer ) {
console . log ( ` Removing route for actor ${ id } ` )
delete peer _by _id [ id ]
}
}
break
case "receive" :
var data = nota . decode ( e . data )
if ( data . replycc && ! data . replycc . _ _ACTORDATA _ _ . address ) {
data . replycc . _ _ACTORDATA _ _ . address = e. peer . address
data . replycc . _ _ACTORDATA _ _ . port = e . peer . port
// Store in global lookup for future routing
id _address [ data . replycc . _ _ACTORDATA _ _ . id ] = {
address : e . peer . address ,
port : e . peer . port
// Decode the message
try {
var msg = nota . decod e( e . data )
console . log ( ` Received message from ${ e . peer . address } : ${ e . peer . port } : ` , json . encode ( msg ) )
// Update routing information based on message contents
function touch ( obj ) {
if ( ! obj || typeof obj !== 'object' ) return
if ( typeof obj . id === 'string' ) {
// Set up routing for this actor ID
route _set ( obj . id , e . peer )
// Add address hint if we don't have one
if ( ! id _address [ obj . id ] ) {
route _hint ( obj . id , e . peer . address , e . peer . port )
console . log ( ` Added route hint for ${ obj . id } : ${ e . peer . address } : ${ e . peer . port } ` )
}
}
// Also populate address/port for any actor objects in the message data
function populate _actor _addresses ( obj ) {
i f ( typeof obj !== 'object' || obj === null ) return
if ( obj . _ _ACTORDATA _ _ && ! obj . _ _ACTORDATA _ _ . address ) {
obj . _ _ACTORDATA _ _ . address = e . peer . address
obj . _ _ACTORDATA _ _ . port = e . peer . port
// Store in global lookup for future routing
id _address [ obj . _ _ACTORDATA _ _ . id ] = {
address : e . peer . address ,
port : e . peer . port
// Recursively touch all properties
for ( const k in obj ) {
if ( obj . hasOwnProperty ( k ) ) touch ( obj [ k ] )
}
}
for ( var key in obj ) {
if ( obj . hasOwnProperty ( key ) ) {
populate _actor _addresses ( obj [ key ] )
// Extract routing information
touch ( msg )
// Process the message
handle _message ( msg )
} catch ( err ) {
console . error ( ` Failed to decode or process received message: ${ err . message } ` )
}
}
}
if ( data . data ) populate _actor _addresses ( data . data )
handle _message ( data )
break
}
}
var contactor = undefined
$ _ . contact = function ( callback , record ) {
$ _ . s end ( {
_ _ACTORDATA _ _ : {
address : record . address ,
port : record . port
console . log ( "Contact function called with:" , json . encode ( record ) ) ;
// Create a pseudo-actor for the initial contact
var sendto = { }
sendto [ ACTORDATA ] = record
// Ensure we send a properly formatted contact message
var contactMsg = {
type : "contact" ,
data : record
} ;
// Wrap the original callback to handle the actor reference properly
function wrappedCallback ( response ) {
console . log ( "Contact wrapped callback received:" , json . encode ( response ) ) ;
// First param should be the actor, second param is reason/error
if ( response && typeof response === 'object' ) {
if ( 'id' in response ) {
// This is an actor reference, pass it directly
console . log ( "Identified actor reference in response" ) ;
callback ( response , null ) ;
} else if ( response . data && typeof response . data === 'object' && 'id' in response . data ) {
// The actor reference is in the data field
console . log ( "Identified actor reference in response.data" ) ;
callback ( response . data , null ) ;
} else {
// No actor reference found, must be an error
console . log ( "No actor reference found in response" ) ;
callback ( null , response ) ;
}
} , record , callback )
} else {
// Error case or unexpected response format
console . log ( "Unexpected response format" ) ;
callback ( null , response ) ;
}
}
// Send the contact message with callback for response
$ _ . send ( sendto , contactMsg , wrappedCallback ) ;
}
$ _ . contact [ prosperon . DOC ] = "The contact function sends a message to a portal..."
@@ -822,7 +869,7 @@ $_.stop = function(actor) {
}
if ( ! $ _ . is _actor ( actor ) )
throw new Error ( 'Can only call stop on an actor.' )
if ( ! underlings . has ( actor . _ _ACTORDATA _ _ . id) )
if ( ! underlings . has ( actor . id ) )
throw new Error ( 'Can only call stop on an underling or self.' )
actor _prep ( actor , { type : "stop" , id : prosperon . id } )
@@ -842,8 +889,8 @@ $_.delay[prosperon.DOC] = "used to schedule the invocation of a function..."
var couplings = new Set ( )
$ _ . couple = function ( actor ) {
console . log ( ` coupled to ${ actor . _ _ACTORDATA _ _ . id} ` )
couplings . add ( actor . _ _ACTORDATA _ _ . id)
console . log ( ` coupled to ${ actor . id } ` )
couplings . add ( actor . id )
}
$ _ . couple [ prosperon . DOC ] = "causes this actor to stop when another actor stops."
@@ -851,41 +898,91 @@ function actor_prep(actor, send) {
message _queue . push ( { actor , send } ) ;
}
function actor _send ( actor , message ) {
if ( ! $ _ . is _actor ( actor ) ) throw new Error ( ` Must send to an actor object. Attempted send to ${ json . encode ( actor ) } ` )
if ( typeof message != = 'object' ) throw new Error ( 'Must send an object record.' )
function ensure _route ( actor ) {
// Extract the actor ID
const id = actor . id
if ( actor . _ _ACTORDATA _ _ . id === prosperon . id ) {
if ( receive _fn ) receive _fn ( message . data )
return
}
if ( actor . _ _ACTORDATA _ _ . id && os . mailbox _exist ( actor . _ _ACTORDATA _ _ . id ) ) {
os . mailbox _push ( actor . _ _ACTORDATA _ _ . id , message )
return
// First check if we already have a peer for this actor
if ( peer _by _id [ id ] ) {
console . log ( ` Found existing peer for actor ${ id } ` )
return peer _by _id [ id ]
}
// Use fallback address lookup if actor doesn't have address info
if ( ! actor . _ _ACTORDATA _ _ . address && id _address [ actor . _ _ACTORDATA _ _ . id ] ) {
Object . assign ( actor . _ _ACTORDATA _ _ , id _address [ actor . _ _ACTORDATA _ _ . id ] )
}
// Check if we have a hint for this actor's address
const hint = id _address [ id ]
if ( hint ) {
console . log ( ` Found address hint for ${ id } : ${ hint . address } : ${ hint . port } ` )
if ( actor . _ _ACTORDATA _ _ . address ) {
if ( actor . _ _ACTORDATA _ _ . id ) message . target = actor . _ _ACTORDATA _ _ . id
else message . type = "contact"
var peer = peers [ actor . _ _ACTORDATA _ _ . address + ":" + actor . _ _ACTORDATA _ _ . port ]
if ( ! peer ) {
// Create host if needed
if ( ! contactor && ! portal ) {
console . log ( ` c reating a contactor ... ` )
console . log ( "C reating new contactor host" )
contactor = enet . create _host ( )
}
peer = ( contactor || portal ) . connect ( actor . _ _ACTORDATA _ _ . address , actor . _ _ACTORDATA _ _ . port )
peer _queue . set ( peer , [ message ] )
} else {
peer . send ( nota . encode ( message ) )
// Connect to the peer
const host = contactor || portal
console . log ( ` Connecting to ${ hint . address } : ${ hint . port } via ${ contactor ? 'contactor' : 'portal' } ` )
const p = host . connect ( hint . address , hint . port )
// Initialize queue for this peer
peer _queue . set ( p , [ ] )
return p
}
// Check if we have connection data for this actor
var cnn = actor [ ACTORDATA ]
if ( cnn ) {
console . log ( ` Using ACTORDATA for connection: ${ cnn . address } : ${ cnn . port } ` )
// Create host if needed
if ( ! contactor && ! portal ) {
console . log ( "Creating new contactor host" )
contactor = enet . create _host ( )
}
// Connect to the peer
const host = contactor || portal
console . log ( ` Connecting to ${ cnn . address } : ${ cnn . port } via ${ contactor ? 'contactor' : 'portal' } ` )
var p = host . connect ( cnn . address , cnn . port )
// Initialize queue for this peer
peer _queue . set ( p , [ ] )
return p
}
console . log ( ` No route found for actor ${ id } ` )
return null
}
function actor _send ( actor , send ) {
if ( ! $ _ . is _actor ( actor ) ) throw Error ( 'bad actor: ' + json . encode ( actor ) )
if ( actor . id === prosperon . id ) { // message to self
console . log ( "actor_send: message to self" )
if ( receive _fn )
receive _fn ( send . data ) ;
return
}
throw new Error ( ` Unable to send message to actor ${ json . encode ( actor ) } ` )
if ( os . mailbox _exist ( actor . id ) ) { // message to local mailbox
os . mailbox _push ( actor . id , send ) ;
return
}
const peer = ensure _route ( actor )
if ( peer ) {
console . log ( "actor_send: sending via peer route" , peer . address + ":" + peer . port )
peer _queue . get ( peer ) ? . push ( send ) || peer . send ( nota . encode ( send ) )
return
}
// fallback – forward via parent header if present
if ( actor [ HEADER ] && actor [ HEADER ] . replycc ) {
console . log ( "actor_send: forwarding via parent header" )
const fwd = { type : 'forward' , forward _to : actor . id , payload : send }
actor _send ( actor [ HEADER ] . replycc , fwd ) ; return
}
console . error ( "actor_send: no route to actor" , actor . id )
throw Error ( ` no route to actor ${ actor . id } ` )
}
// Holds all messages queued during the current turn.
@@ -894,19 +991,28 @@ var message_queue = []
function send _messages ( ) {
// Attempt to flush the queued messages. If one fails, keep going anyway.
var errors = [ ]
// Process all queued messages
while ( message _queue . length > 0 ) {
var item = message _queue . shift ( )
var actor = item . actor
var send = item . send
var { actor , send } = message _queue . shift ( )
// console.log("Processing queued message:", json.encode( send))
try {
actor _send ( actor , send )
// console.log("Message sent successfully")
} catch ( err ) {
console . error ( "Failed to send message:" , err . message )
errors . push ( err )
}
}
if ( errors . length > 0 ) {
console . error ( "Some messages failed to send:" , errors)
for ( var i of errors ) console . error ( i )
// Report any send errors
if ( errors . length ) {
console . error ( ` ${ errors . length } messages failed to send ` )
for ( var i = 0 ; i < Math . min ( errors . length , 3 ) ; i ++ ) {
console . error ( ` Error ${ i + 1 } : ${ errors [ i ] . message } ` )
}
}
}
@@ -915,15 +1021,30 @@ var replies = {}
$ _ . send = function ( actor , message , reply ) {
if ( typeof message !== 'object' )
throw new Error ( 'Message must be an object' )
var send = { type : "user" , data : message }
if ( actor [ HEADER ] && actor [ HEADER ] . replycc ) {
// If message already has type, respect it, otherwise wrap it as user message
var send = message . type ? message : { type : "user" , data : message }
// Make sure contact messages have 'data' property
if ( message . type === 'contact' && ! send . data ) {
console . log ( "Fixing contact message structure" )
send . data = message . data || { }
}
if ( actor [ HEADER ] && actor [ HEADER ] . replycc ) { // in this case, it's not a true actor, but a message we're responding to
console . log ( "Send: responding to message with header" , json . encode ( actor [ HEADER ] ) )
var header = actor [ HEADER ]
if ( ! header . replycc || ! $ _ . is _actor ( header . replycc ) )
throw new Error ( ` Supplied actor had a return, but it's not a valid actor! ${ json . encode ( actor [ HEADER ] ) } ` )
actor = header . replycc
send . return = header . reply
// When replying to a contact message, ensure proper data structure
if ( header . type === "contact" && send . type === "user" ) {
// Make sure we're sending the actor ID in a proper format for contact responses
send . data = send . data || $ _ ;
}
}
if ( reply ) {
@@ -931,9 +1052,11 @@ $_.send = function(actor, message, reply) {
replies [ id ] = reply
send . reply = id
send . replycc = $ _
console . log ( "Send: added reply callback with id" , id )
}
// Instead of sending immediately, queue it
// console.log("Send: queuing message", json.encode(send))
actor _prep ( actor , send ) ;
}
$ _ . send [ prosperon . DOC ] = "sends a message to another actor..."
@@ -956,9 +1079,10 @@ os.register_actor(prosperon.id, function(msg) {
}
} , prosperon . args . main )
$ _ . _ _ACTORDATA _ _ . id = prosperon . id
$ _ . id = prosperon . id
if ( prosperon . args . overling ) overling = create _actor ( prosperon . args . overling )
if ( prosperon . args . overling ) overling = { _ _ACTORDATA _ _ : { id : prosperon . args . overling } }
if ( prosperon . args . root ) root = json . decode ( prosperon . args . root )
else root = $ _
@@ -977,12 +1101,8 @@ actor.spawn(prosperon.args.program)
function destroyself ( ) {
console . log ( ` Got the message to destroy self. ` )
dying = true
for ( var i of underlings ) {
var act = {
_ _ACTORDATA _ _ : { id : i }
}
$ _ . stop ( act ) ;
}
for ( var i of underlings )
$ _ . stop ( create _actor ( id ) ) ;
os . destroy ( )
}
@@ -998,52 +1118,90 @@ function handle_actor_disconnect(id) {
}
function handle _message ( msg ) {
// console.log("Handling message:", json.encode(msg));
if ( msg . target ) {
if ( msg . target !== prosperon . id ) {
os . mailbox _push ( msg . target , msg )
return
console . log ( ` Forwarding message to ${ msg . target } ` ) ;
os . mailbox _push ( msg . target , msg ) ;
return ;
}
}
switch ( msg . type ) {
case "user" :
var letter = msg . data
delete msg . data
letter [ HEADER ] = msg
var letter = msg . data ;
delete msg . data ;
letter [ HEADER ] = msg ;
if ( msg . return ) {
console . log ( ` Received a message for the return id ${ msg . return } ` )
var fn = replies [ msg . return ]
if ( ! fn ) throw new Error ( ` Could not find return function for message ${ msg . return } ` )
fn ( letter )
delete replies [ msg . return ]
return
console . log ( ` Received a message for the return id ${ msg . return } ` ) ;
var fn = replies [ msg . return ] ;
if ( ! fn ) {
console . error ( ` Could not find return function for message ${ msg . return } ` ) ;
return ;
}
if ( receive _fn ) receive _fn ( letter )
break
// Handle contact response specially - first parameter is the actor reference
if ( letter && letter [ HEADER ] && letter [ HEADER ] . type === "user" && letter [ HEADER ] . return ) {
// For contact responses, we need to extract the actor reference
// letter.data should contain the actor object from the portal
if ( letter . data && typeof letter . data === 'object' && letter . data . id ) {
console . log ( "Processing contact response with actor data:" , json . encode ( letter . data ) ) ;
fn ( letter . data ) ; // This is the actor reference
} else {
console . log ( "Processing contact response with letter as actor:" , json . encode ( letter ) ) ;
fn ( letter ) ; // Fallback to the whole message
}
} else {
fn ( letter ) ;
}
delete replies [ msg . return ] ;
return ;
}
if ( receive _fn ) receive _fn ( letter ) ;
break ;
case "stop" :
if ( msg . id !== overling . _ _ACTORDATA _ _ . id)
throw new Error ( ` Got a message from an actor ${ msg . id } to stop... ` )
destroyself ( )
break
if ( overling && msg. id !== overling . id )
throw new Error ( ` Got a message from an actor ${ msg . id } to stop... ` ) ;
destroyself ( ) ;
break ;
case "contact" :
if ( portal _fn ) {
var letter2 = msg . data
letter2 [ HEADER ] = msg
de lete msg . data
portal _fn ( letter2 )
} else throw new Error ( 'Got a contact message, but no portal is established.' )
break
case "stopped" :
handle _actor _disconnect ( msg . id )
break
case "greet" :
var greeter = greeters [ msg . id ]
if ( greeter ) greeter ( { type : "actor_started" , actor : create _actor ( msg ) } )
console . log ( "Portal received contact message" ) ;
var letter2 = msg. data ;
lett er2 [ HEADER ] = msg ;
delete msg . data ;
console . log ( "Portal handling contact message:" , json . encode ( letter2 ) ) ;
portal _fn ( letter2 ) ;
} else {
console . error ( 'Got a contact message, but no portal is established.' ) ;
}
break ;
case "stopped" :
handle _actor _disconnect ( msg . id ) ;
break ;
case "greet" :
var greeter = greeters [ msg . id ] ;
if ( greeter ) {
console . log ( "Greeting actor with id:" , msg . id ) ;
greeter ( { type : "actor_started" , actor : create _actor ( msg ) } ) ;
}
break ;
case "ping" :
// Keep-alive ping, no action needed
break ;
default :
if ( receive _fn ) receive _fn ( msg )
// console.log("Default message handler for type:", msg.type);
if ( receive _fn ) receive _fn ( msg ) ;
break ;
}
} ;