http downloading in chunks
This commit is contained in:
51
CLAUDE.md
51
CLAUDE.md
@@ -223,10 +223,36 @@ meson test -C build_dbg
|
||||
|
||||
### Actor Pattern Usage
|
||||
- Create actors with `actor.spawn(script, config)`
|
||||
- Start actors with `$_.start(callback, script)` - the system automatically sends a greeting, callback receives {type: 'greet', actor: actor_ref}
|
||||
- No need to manually send greetings - `$_.start` handles this automatically
|
||||
- Manage actor hierarchy with overlings and underlings
|
||||
- Schedule actor tasks with `delay()` method
|
||||
- Schedule actor tasks with `$_.delay()` method
|
||||
- Clean up with `kill()` and `garbage()`
|
||||
|
||||
### Actor Messaging with Callbacks
|
||||
When sending a message with a callback, respond by sending to the message itself:
|
||||
```javascript
|
||||
// Sender side:
|
||||
send(actor, {type: 'status'}, response => {
|
||||
console.log(response); // Handle the response
|
||||
});
|
||||
|
||||
// Receiver side:
|
||||
$_.receiver(msg => {
|
||||
if (msg.type === 'status') {
|
||||
send(msg, {status: 'ok'}); // Send response to the message itself
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
**Critical Rules for Message Callbacks**:
|
||||
- **A message can only be used ONCE as a send target** - after sending a response to a message, it cannot be used again
|
||||
- If you need to send multiple updates (like progress), only the download request message should be used for the final response
|
||||
- Status requests should each get their own individual response
|
||||
- Actor objects and message headers are completely opaque - never try to access internal properties
|
||||
- Never access `msg.__HEADER__` or similar - the actor system handles routing internally
|
||||
- Use `$_.delay()` to schedule work and avoid blocking the message receiver
|
||||
|
||||
### Game Loop Registration
|
||||
- Register functions like `update`, `draw`, `gui`, etc.
|
||||
- Set function.layer property to control execution order
|
||||
@@ -289,21 +315,24 @@ Proper Misty networking follows a two-phase pattern:
|
||||
- Normal bidirectional messaging begins
|
||||
- Application logic handles game/service initialization
|
||||
|
||||
### Message Header Management
|
||||
Messages contain `__HEADER__` information that can cause issues:
|
||||
### Message Handling Best Practices
|
||||
Messages should be treated as opaque objects with your application data:
|
||||
|
||||
```javascript
|
||||
// CORRECT: Extract clean actor reference
|
||||
$_.receiver(e => {
|
||||
if (e.type === 'join_game') {
|
||||
var opponent = e.__HEADER__.replycc; // Clean actor reference
|
||||
send(opponent, {type: 'game_start'});
|
||||
// CORRECT: Store actor references separately
|
||||
var players = {};
|
||||
$_.receiver(msg => {
|
||||
if (msg.type === 'join_game' && msg.player_id) {
|
||||
// Store the message for later response
|
||||
players[msg.player_id] = msg;
|
||||
// Later, respond to the stored message
|
||||
send(players[msg.player_id], {type: 'game_start'});
|
||||
}
|
||||
});
|
||||
|
||||
// WRONG: Using message object directly
|
||||
$_.receiver(e => {
|
||||
opponent = e; // Contains return headers that pollute future sends
|
||||
// WRONG: Trying to access internal message properties
|
||||
$_.receiver(msg => {
|
||||
var sender = msg.__HEADER__.replycc; // Never do this!
|
||||
});
|
||||
```
|
||||
|
||||
|
||||
234
examples/http_download_actor.js
Normal file
234
examples/http_download_actor.js
Normal file
@@ -0,0 +1,234 @@
|
||||
// HTTP Download Actor
|
||||
// Handles download requests and progress queries
|
||||
var http = use('http');
|
||||
var os = use('os');
|
||||
|
||||
// Actor state
|
||||
var state = {
|
||||
downloading: false,
|
||||
current_url: null,
|
||||
total_bytes: 0,
|
||||
downloaded_bytes: 0,
|
||||
start_time: 0,
|
||||
error: null,
|
||||
connection: null,
|
||||
download_msg: null,
|
||||
chunks: []
|
||||
};
|
||||
|
||||
// Helper to calculate progress percentage
|
||||
function get_progress() {
|
||||
if (state.total_bytes === 0) {
|
||||
return 0;
|
||||
}
|
||||
return Math.round((state.downloaded_bytes / state.total_bytes) * 100);
|
||||
}
|
||||
|
||||
// Helper to format status response
|
||||
function get_status() {
|
||||
if (!state.downloading) {
|
||||
return {
|
||||
status: 'idle',
|
||||
error: state.error
|
||||
};
|
||||
}
|
||||
|
||||
var elapsed = os.now() - state.start_time;
|
||||
var bytes_per_sec = elapsed > 0 ? state.downloaded_bytes / elapsed : 0;
|
||||
|
||||
return {
|
||||
status: 'downloading',
|
||||
url: state.current_url,
|
||||
progress: get_progress(),
|
||||
downloaded_bytes: state.downloaded_bytes,
|
||||
total_bytes: state.total_bytes,
|
||||
elapsed_seconds: elapsed,
|
||||
bytes_per_second: Math.round(bytes_per_sec)
|
||||
};
|
||||
}
|
||||
|
||||
// Main message receiver
|
||||
$_.receiver(function(msg) {
|
||||
switch (msg.type) {
|
||||
case 'download':
|
||||
if (state.downloading) {
|
||||
send(msg, {
|
||||
type: 'error',
|
||||
error: 'Already downloading',
|
||||
current_url: state.current_url
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (!msg.url) {
|
||||
send(msg, {
|
||||
type: 'error',
|
||||
error: 'No URL provided'
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Start download
|
||||
state.downloading = true;
|
||||
state.current_url = msg.url;
|
||||
state.total_bytes = 0;
|
||||
state.downloaded_bytes = 0;
|
||||
state.start_time = os.now();
|
||||
state.error = null;
|
||||
state.download_msg = msg;
|
||||
state.chunks = [];
|
||||
|
||||
try {
|
||||
// Start the connection
|
||||
state.connection = http.fetch_start(msg.url, msg.options || {});
|
||||
if (!state.connection) {
|
||||
throw new Error('Failed to start download');
|
||||
}
|
||||
|
||||
// Schedule the first chunk read
|
||||
$_.delay(read_next_chunk, 0);
|
||||
|
||||
} catch (e) {
|
||||
state.error = e.toString();
|
||||
state.downloading = false;
|
||||
|
||||
send(msg, {
|
||||
type: 'error',
|
||||
error: state.error,
|
||||
url: msg.url
|
||||
});
|
||||
}
|
||||
break;
|
||||
|
||||
case 'status':
|
||||
console.log(`got status request. current is ${json.encode(get_status())}`)
|
||||
send(msg, {
|
||||
type: 'status_response',
|
||||
...get_status()
|
||||
});
|
||||
break;
|
||||
|
||||
case 'cancel':
|
||||
if (state.downloading) {
|
||||
// Cancel the download
|
||||
if (state.connection) {
|
||||
http.fetch_close(state.connection);
|
||||
state.connection = null;
|
||||
}
|
||||
state.downloading = false;
|
||||
state.current_url = null;
|
||||
state.download_msg = null;
|
||||
state.chunks = [];
|
||||
|
||||
send(msg, {
|
||||
type: 'cancelled',
|
||||
message: 'Download cancelled',
|
||||
url: state.current_url
|
||||
});
|
||||
} else {
|
||||
send(msg, {
|
||||
type: 'error',
|
||||
error: 'No download in progress'
|
||||
});
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
send(msg, {
|
||||
type: 'error',
|
||||
error: 'Unknown message type: ' + msg.type
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Non-blocking chunk reader
|
||||
function read_next_chunk() {
|
||||
if (!state.downloading || !state.connection) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
var chunk = http.fetch_read_chunk(state.connection);
|
||||
|
||||
if (chunk === null) {
|
||||
// Download complete
|
||||
finish_download();
|
||||
return;
|
||||
}
|
||||
|
||||
// Store chunk
|
||||
state.chunks.push(chunk);
|
||||
|
||||
// Update progress
|
||||
var info = http.fetch_info(state.connection);
|
||||
state.downloaded_bytes = info.bytes_read;
|
||||
if (info.headers_complete && info.content_length > 0) {
|
||||
state.total_bytes = info.content_length;
|
||||
}
|
||||
|
||||
// Schedule next chunk read
|
||||
$_.delay(read_next_chunk, 0);
|
||||
|
||||
} catch (e) {
|
||||
// Error during download
|
||||
state.error = e.toString();
|
||||
if (state.connection) {
|
||||
http.fetch_close(state.connection);
|
||||
}
|
||||
|
||||
if (state.download_msg) {
|
||||
send(state.download_msg, {
|
||||
type: 'error',
|
||||
error: state.error,
|
||||
url: state.current_url
|
||||
});
|
||||
}
|
||||
|
||||
// Reset state
|
||||
state.downloading = false;
|
||||
state.connection = null;
|
||||
state.download_msg = null;
|
||||
state.chunks = [];
|
||||
}
|
||||
}
|
||||
|
||||
// Complete the download and send result
|
||||
function finish_download() {
|
||||
if (state.connection) {
|
||||
http.fetch_close(state.connection);
|
||||
}
|
||||
|
||||
// Combine all chunks into single ArrayBuffer
|
||||
var total_size = 0;
|
||||
for (var i = 0; i < state.chunks.length; i++) {
|
||||
total_size += state.chunks[i].byteLength;
|
||||
}
|
||||
|
||||
var result = new ArrayBuffer(total_size);
|
||||
var view = new Uint8Array(result);
|
||||
var offset = 0;
|
||||
|
||||
for (var i = 0; i < state.chunks.length; i++) {
|
||||
var chunk_view = new Uint8Array(state.chunks[i]);
|
||||
view.set(chunk_view, offset);
|
||||
offset += state.chunks[i].byteLength;
|
||||
}
|
||||
|
||||
// Send complete message
|
||||
if (state.download_msg) {
|
||||
send(state.download_msg, {
|
||||
type: 'complete',
|
||||
url: state.current_url,
|
||||
data: result,
|
||||
size: result.byteLength,
|
||||
duration: os.now() - state.start_time
|
||||
});
|
||||
}
|
||||
|
||||
// Reset state
|
||||
state.downloading = false;
|
||||
state.connection = null;
|
||||
state.current_url = null;
|
||||
state.download_msg = null;
|
||||
state.chunks = [];
|
||||
}
|
||||
@@ -73,7 +73,7 @@ prosperon.PATH = [
|
||||
var res_cache = {}
|
||||
|
||||
function console_rec(category, priority, line, file, msg) {
|
||||
return `[${file}:${line}: [${category} ${priority}]: ${msg}\n`
|
||||
return `[${prosperon.id.slice(0,5)}] [${file}:${line}: [${category} ${priority}]: ${msg}\n`
|
||||
|
||||
var now = time.now()
|
||||
|
||||
@@ -336,6 +336,8 @@ function is_actor(actor) {
|
||||
return actor.__ACTORDATA__
|
||||
}
|
||||
|
||||
globalThis.is_actor = is_actor;
|
||||
|
||||
function peer_connection(peer) {
|
||||
return {
|
||||
latency: peer.rtt,
|
||||
@@ -405,11 +407,11 @@ function handle_host(e) {
|
||||
case "disconnect":
|
||||
peer_queue.delete(e.peer)
|
||||
for (var id in peers) if (peers[id] === e.peer) delete peers[id]
|
||||
console.log('portal got disconnect')
|
||||
console.log('portal got disconnect from ' + e.peer.address + ":" + e.peer.port)
|
||||
break
|
||||
case "receive":
|
||||
var data = nota.decode(e.data)
|
||||
console.log(`got message ${json.encode(data)} over the wire`)
|
||||
// console.log(`got message ${json.encode(data)} over the wire`)
|
||||
if (data.replycc && !data.replycc.address) {
|
||||
data.replycc.__ACTORDATA__.address = e.peer.address
|
||||
data.replycc.__ACTORDATA__.port = e.peer.port
|
||||
@@ -428,13 +430,12 @@ function handle_host(e) {
|
||||
}
|
||||
}
|
||||
if (data.data) populate_actor_addresses(data.data)
|
||||
console.log(`turned it into ${json.encode(data)} over the wire`)
|
||||
// console.log(`turned it into ${json.encode(data)} over the wire`)
|
||||
handle_message(data)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
var contactor = undefined
|
||||
$_.contact = function(callback, record) {
|
||||
send(create_actor(record), record, callback)
|
||||
}
|
||||
@@ -454,7 +455,7 @@ $_.start = function(cb, prg, arg) {
|
||||
}
|
||||
var id = util.guid()
|
||||
greeters[id] = cb
|
||||
var argv = ["./prosperon", "spawn", "--id", id, "--overling", prosperon.id, "--root", root]
|
||||
var argv = ["./prosperon", "spawn", "--id", id, "--overling", json.encode($_), "--root", json.encode(root)]
|
||||
if (prg) argv = argv.concat(['--program', prg])
|
||||
if (arg) argv = argv.concat(cmd.encode(arg))
|
||||
underlings.add(id)
|
||||
@@ -502,24 +503,33 @@ function actor_send(actor, message) {
|
||||
if (!is_actor(actor)) throw new Error(`Must send to an actor object. Attempted send to ${json.encode(actor)}`)
|
||||
if (typeof message !== 'object') throw new Error('Must send an object record.')
|
||||
|
||||
// message to self
|
||||
if (actor.__ACTORDATA__.id === prosperon.id) {
|
||||
if (receive_fn) receive_fn(message.data)
|
||||
return
|
||||
}
|
||||
|
||||
// message to actor in same flock
|
||||
if (actor.__ACTORDATA__.id && actor_mod.mailbox_exist(actor.__ACTORDATA__.id)) {
|
||||
actor_mod.mailbox_push(actor.__ACTORDATA__.id, message)
|
||||
return
|
||||
}
|
||||
|
||||
if (actor.__ACTORDATA__.address) {
|
||||
if (actor.__ACTORDATA__.id) message.target = actor.__ACTORDATA__.id
|
||||
else message.type = "contact"
|
||||
if (actor.__ACTORDATA__.id)
|
||||
message.target = actor.__ACTORDATA__.id
|
||||
else
|
||||
message.type = "contact"
|
||||
|
||||
var peer = peers[actor.__ACTORDATA__.address + ":" + actor.__ACTORDATA__.port]
|
||||
if (!peer) {
|
||||
if (!contactor && !portal) {
|
||||
if (!portal) {
|
||||
console.log(`creating a contactor ...`)
|
||||
contactor = enet.create_host()
|
||||
portal = enet.create_host({address:"any"})
|
||||
console.log(`allowing contact to port ${portal.port}`)
|
||||
}
|
||||
peer = (contactor || portal).connect(actor.__ACTORDATA__.address, actor.__ACTORDATA__.port)
|
||||
console.log(`no peer! connecting to ${actor.__ACTORDATA__.address}:${actor.__ACTORDATA__.port}`)
|
||||
peer = portal.connect(actor.__ACTORDATA__.address, actor.__ACTORDATA__.port)
|
||||
peer_queue.set(peer, [message])
|
||||
} else {
|
||||
peer.send(nota.encode(message))
|
||||
@@ -603,12 +613,12 @@ actor_mod.register_actor(prosperon.id, function(msg) {
|
||||
}
|
||||
}, prosperon.args.main)
|
||||
|
||||
if (prosperon.args.overling) overling = create_actor({id:prosperon.args.overling})
|
||||
if (prosperon.args.overling) overling = json.decode(prosperon.args.overling)
|
||||
|
||||
if (prosperon.args.root) root = json.decode(prosperon.args.root)
|
||||
else root = $_
|
||||
|
||||
if (overling) actor_prep(overling, {type:'greet', id: prosperon.id})
|
||||
if (overling) actor_prep(overling, {type:'greet', actor: $_})
|
||||
|
||||
if (!prosperon.args.program)
|
||||
os.exit(1)
|
||||
@@ -675,8 +685,8 @@ function handle_message(msg) {
|
||||
handle_actor_disconnect(msg.id)
|
||||
break
|
||||
case "greet":
|
||||
var greeter = greeters[msg.id]
|
||||
if (greeter) greeter({type: "actor_started", actor: create_actor(msg)})
|
||||
var greeter = greeters[msg.actor.__ACTORDATA__.id]
|
||||
if (greeter) greeter(msg)
|
||||
break;
|
||||
default:
|
||||
if (receive_fn) receive_fn(msg)
|
||||
@@ -687,7 +697,6 @@ function handle_message(msg) {
|
||||
function enet_check()
|
||||
{
|
||||
if (portal) portal.service(handle_host)
|
||||
if (contactor) contactor.service(handle_host)
|
||||
|
||||
send_messages();
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
#include <mbedtls/entropy.h>
|
||||
#include <mbedtls/ctr_drbg.h>
|
||||
#include <mbedtls/error.h>
|
||||
#include <ctype.h>
|
||||
|
||||
typedef struct {
|
||||
char *data;
|
||||
@@ -14,6 +15,23 @@ typedef struct {
|
||||
size_t capacity;
|
||||
} buffer_t;
|
||||
|
||||
typedef struct {
|
||||
mbedtls_net_context server_fd;
|
||||
mbedtls_ssl_context ssl;
|
||||
mbedtls_ssl_config conf;
|
||||
mbedtls_entropy_context entropy;
|
||||
mbedtls_ctr_drbg_context ctr_drbg;
|
||||
int use_ssl;
|
||||
int headers_complete;
|
||||
buffer_t header_buf;
|
||||
size_t content_length;
|
||||
size_t bytes_read;
|
||||
int chunked_encoding;
|
||||
size_t chunk_size;
|
||||
size_t chunk_remaining;
|
||||
int chunk_state; // 0: size, 1: data, 2: trailer
|
||||
} http_connection_t;
|
||||
|
||||
static void buffer_init(buffer_t *buf) {
|
||||
buf->data = NULL;
|
||||
buf->size = 0;
|
||||
@@ -247,6 +265,451 @@ static char *extract_body(const char *response, size_t response_len, size_t *bod
|
||||
return (char *)body_start;
|
||||
}
|
||||
|
||||
// Parse HTTP headers
|
||||
static int parse_headers(http_connection_t *conn) {
|
||||
char *headers_end = strstr(conn->header_buf.data, "\r\n\r\n");
|
||||
if (!headers_end) {
|
||||
return 0; // Headers not complete
|
||||
}
|
||||
|
||||
*headers_end = '\0';
|
||||
|
||||
// Parse Content-Length
|
||||
char *content_length = strstr(conn->header_buf.data, "Content-Length:");
|
||||
if (!content_length) {
|
||||
content_length = strstr(conn->header_buf.data, "content-length:");
|
||||
}
|
||||
if (content_length) {
|
||||
content_length += 15;
|
||||
while (*content_length == ' ') content_length++;
|
||||
conn->content_length = strtoul(content_length, NULL, 10);
|
||||
}
|
||||
|
||||
// Check for chunked encoding
|
||||
char *transfer_encoding = strstr(conn->header_buf.data, "Transfer-Encoding:");
|
||||
if (!transfer_encoding) {
|
||||
transfer_encoding = strstr(conn->header_buf.data, "transfer-encoding:");
|
||||
}
|
||||
if (transfer_encoding) {
|
||||
transfer_encoding += 18;
|
||||
while (*transfer_encoding == ' ') transfer_encoding++;
|
||||
if (strncmp(transfer_encoding, "chunked", 7) == 0) {
|
||||
conn->chunked_encoding = 1;
|
||||
}
|
||||
}
|
||||
|
||||
conn->headers_complete = 1;
|
||||
|
||||
// Move any body data to the beginning of the buffer
|
||||
headers_end += 4;
|
||||
size_t body_len = conn->header_buf.size - (headers_end - conn->header_buf.data);
|
||||
if (body_len > 0) {
|
||||
memmove(conn->header_buf.data, headers_end, body_len);
|
||||
conn->header_buf.size = body_len;
|
||||
} else {
|
||||
conn->header_buf.size = 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Initialize HTTP connection
|
||||
static void http_connection_init(http_connection_t *conn) {
|
||||
memset(conn, 0, sizeof(http_connection_t));
|
||||
mbedtls_net_init(&conn->server_fd);
|
||||
mbedtls_ssl_init(&conn->ssl);
|
||||
mbedtls_ssl_config_init(&conn->conf);
|
||||
mbedtls_entropy_init(&conn->entropy);
|
||||
mbedtls_ctr_drbg_init(&conn->ctr_drbg);
|
||||
buffer_init(&conn->header_buf);
|
||||
}
|
||||
|
||||
// Free HTTP connection
|
||||
static void http_connection_free(http_connection_t *conn) {
|
||||
if (conn->use_ssl) {
|
||||
mbedtls_ssl_close_notify(&conn->ssl);
|
||||
}
|
||||
mbedtls_net_free(&conn->server_fd);
|
||||
mbedtls_ssl_free(&conn->ssl);
|
||||
mbedtls_ssl_config_free(&conn->conf);
|
||||
mbedtls_ctr_drbg_free(&conn->ctr_drbg);
|
||||
mbedtls_entropy_free(&conn->entropy);
|
||||
buffer_free(&conn->header_buf);
|
||||
}
|
||||
|
||||
// Start HTTP connection
|
||||
static int http_connection_start(http_connection_t *conn, const char *host, const char *port,
|
||||
const char *request, size_t request_len, int use_ssl) {
|
||||
conn->use_ssl = use_ssl;
|
||||
int ret;
|
||||
|
||||
// Connect to server
|
||||
if ((ret = mbedtls_net_connect(&conn->server_fd, host, port, MBEDTLS_NET_PROTO_TCP)) != 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (use_ssl) {
|
||||
const char *pers = "qjs_https_client";
|
||||
|
||||
// Seed RNG
|
||||
if ((ret = mbedtls_ctr_drbg_seed(&conn->ctr_drbg, mbedtls_entropy_func, &conn->entropy,
|
||||
(const unsigned char *)pers, strlen(pers))) != 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Configure SSL
|
||||
if ((ret = mbedtls_ssl_config_defaults(&conn->conf, MBEDTLS_SSL_IS_CLIENT,
|
||||
MBEDTLS_SSL_TRANSPORT_STREAM,
|
||||
MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
mbedtls_ssl_conf_rng(&conn->conf, mbedtls_ctr_drbg_random, &conn->ctr_drbg);
|
||||
mbedtls_ssl_conf_authmode(&conn->conf, MBEDTLS_SSL_VERIFY_OPTIONAL);
|
||||
|
||||
if ((ret = mbedtls_ssl_setup(&conn->ssl, &conn->conf)) != 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if ((ret = mbedtls_ssl_set_hostname(&conn->ssl, host)) != 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
mbedtls_ssl_set_bio(&conn->ssl, &conn->server_fd, mbedtls_net_send, mbedtls_net_recv, NULL);
|
||||
|
||||
// Handshake
|
||||
while ((ret = mbedtls_ssl_handshake(&conn->ssl)) != 0) {
|
||||
if (ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
// Send request
|
||||
size_t written = 0;
|
||||
while (written < request_len) {
|
||||
ret = mbedtls_ssl_write(&conn->ssl, (const unsigned char *)request + written, request_len - written);
|
||||
if (ret < 0) {
|
||||
if (ret != MBEDTLS_ERR_SSL_WANT_WRITE && ret != MBEDTLS_ERR_SSL_WANT_READ) {
|
||||
return ret;
|
||||
}
|
||||
} else {
|
||||
written += ret;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Send request for plain HTTP
|
||||
size_t written = 0;
|
||||
while (written < request_len) {
|
||||
ret = mbedtls_net_send(&conn->server_fd, (unsigned char *)request + written, request_len - written);
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
}
|
||||
written += ret;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Read chunk from connection
|
||||
static int http_connection_read_chunk(http_connection_t *conn, void *buf, size_t max_len, size_t *actual_len) {
|
||||
int ret;
|
||||
*actual_len = 0;
|
||||
|
||||
// First, read headers if not complete
|
||||
if (!conn->headers_complete) {
|
||||
unsigned char temp[1024];
|
||||
|
||||
if (conn->use_ssl) {
|
||||
ret = mbedtls_ssl_read(&conn->ssl, temp, sizeof(temp));
|
||||
} else {
|
||||
ret = mbedtls_net_recv(&conn->server_fd, temp, sizeof(temp));
|
||||
}
|
||||
|
||||
if (ret > 0) {
|
||||
buffer_append(&conn->header_buf, temp, ret);
|
||||
if (parse_headers(conn)) {
|
||||
// Headers complete, check if there's body data in the buffer
|
||||
if (conn->header_buf.size > 0 && max_len > 0) {
|
||||
size_t to_copy = conn->header_buf.size > max_len ? max_len : conn->header_buf.size;
|
||||
memcpy(buf, conn->header_buf.data, to_copy);
|
||||
*actual_len = to_copy;
|
||||
conn->bytes_read += to_copy;
|
||||
|
||||
// Remove copied data from buffer
|
||||
if (to_copy < conn->header_buf.size) {
|
||||
memmove(conn->header_buf.data, conn->header_buf.data + to_copy,
|
||||
conn->header_buf.size - to_copy);
|
||||
conn->header_buf.size -= to_copy;
|
||||
} else {
|
||||
conn->header_buf.size = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (ret < 0 && ret != MBEDTLS_ERR_SSL_WANT_READ) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Headers are complete, read body data
|
||||
if (conn->chunked_encoding) {
|
||||
// Handle chunked encoding - simplified for now
|
||||
// In a full implementation, we'd need to parse chunk sizes
|
||||
if (conn->use_ssl) {
|
||||
ret = mbedtls_ssl_read(&conn->ssl, buf, max_len);
|
||||
} else {
|
||||
ret = mbedtls_net_recv(&conn->server_fd, buf, max_len);
|
||||
}
|
||||
} else {
|
||||
// Regular content-length based reading
|
||||
size_t remaining = conn->content_length - conn->bytes_read;
|
||||
if (remaining == 0) {
|
||||
return 0; // EOF
|
||||
}
|
||||
|
||||
size_t to_read = remaining > max_len ? max_len : remaining;
|
||||
|
||||
if (conn->use_ssl) {
|
||||
ret = mbedtls_ssl_read(&conn->ssl, buf, to_read);
|
||||
} else {
|
||||
ret = mbedtls_net_recv(&conn->server_fd, buf, to_read);
|
||||
}
|
||||
}
|
||||
|
||||
if (ret > 0) {
|
||||
*actual_len = ret;
|
||||
conn->bytes_read += ret;
|
||||
} else if (ret == 0 || ret == MBEDTLS_ERR_SSL_PEER_CLOSE_NOTIFY) {
|
||||
return 0; // EOF
|
||||
} else if (ret == MBEDTLS_ERR_SSL_WANT_READ || ret == MBEDTLS_ERR_SSL_WANT_WRITE) {
|
||||
return 0; // Try again
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
// JS function: fetch_start(url, options) - starts a chunked download
|
||||
static JSValue js_fetch_start(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv) {
|
||||
if (argc < 1 || !JS_IsString(argv[0])) {
|
||||
return JS_ThrowTypeError(ctx, "fetch_start expects a URL string");
|
||||
}
|
||||
|
||||
const char *url = JS_ToCString(ctx, argv[0]);
|
||||
if (!url) {
|
||||
return JS_ThrowTypeError(ctx, "Invalid URL");
|
||||
}
|
||||
|
||||
char *host = NULL;
|
||||
char *port = NULL;
|
||||
char *path = NULL;
|
||||
int use_ssl = 0;
|
||||
buffer_t request_buf;
|
||||
JSValue result = JS_EXCEPTION;
|
||||
|
||||
buffer_init(&request_buf);
|
||||
|
||||
// Parse URL
|
||||
if (parse_url(url, &host, &port, &path, &use_ssl) < 0) {
|
||||
JS_FreeCString(ctx, url);
|
||||
return JS_ThrowTypeError(ctx, "Invalid URL format");
|
||||
}
|
||||
|
||||
// Build request
|
||||
buffer_append(&request_buf, "GET ", 4);
|
||||
buffer_append(&request_buf, path, strlen(path));
|
||||
buffer_append(&request_buf, " HTTP/1.1\r\n", 11);
|
||||
buffer_append(&request_buf, "Host: ", 6);
|
||||
buffer_append(&request_buf, host, strlen(host));
|
||||
buffer_append(&request_buf, "\r\n", 2);
|
||||
|
||||
// Add headers from options if provided
|
||||
if (argc >= 2 && JS_IsObject(argv[1])) {
|
||||
JSValue headers = JS_GetPropertyStr(ctx, argv[1], "headers");
|
||||
if (JS_IsObject(headers)) {
|
||||
JSPropertyEnum *tab;
|
||||
uint32_t len;
|
||||
if (JS_GetOwnPropertyNames(ctx, &tab, &len, headers, JS_GPN_STRING_MASK | JS_GPN_ENUM_ONLY) == 0) {
|
||||
for (uint32_t i = 0; i < len; i++) {
|
||||
JSValue key = JS_AtomToString(ctx, tab[i].atom);
|
||||
JSValue val = JS_GetProperty(ctx, headers, tab[i].atom);
|
||||
|
||||
const char *key_str = JS_ToCString(ctx, key);
|
||||
const char *val_str = JS_ToCString(ctx, val);
|
||||
|
||||
if (key_str && val_str) {
|
||||
buffer_append(&request_buf, key_str, strlen(key_str));
|
||||
buffer_append(&request_buf, ": ", 2);
|
||||
buffer_append(&request_buf, val_str, strlen(val_str));
|
||||
buffer_append(&request_buf, "\r\n", 2);
|
||||
}
|
||||
|
||||
JS_FreeCString(ctx, key_str);
|
||||
JS_FreeCString(ctx, val_str);
|
||||
JS_FreeValue(ctx, key);
|
||||
JS_FreeValue(ctx, val);
|
||||
}
|
||||
js_free(ctx, tab);
|
||||
}
|
||||
}
|
||||
JS_FreeValue(ctx, headers);
|
||||
}
|
||||
|
||||
buffer_append(&request_buf, "Connection: close\r\n\r\n", 21);
|
||||
|
||||
// Create connection object
|
||||
http_connection_t *conn = js_mallocz(ctx, sizeof(http_connection_t));
|
||||
if (!conn) {
|
||||
result = JS_ThrowOutOfMemory(ctx);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
http_connection_init(conn);
|
||||
|
||||
// Start connection
|
||||
int ret = http_connection_start(conn, host, port, request_buf.data, request_buf.size, use_ssl);
|
||||
if (ret != 0) {
|
||||
char error_buf[256];
|
||||
mbedtls_strerror(ret, error_buf, sizeof(error_buf));
|
||||
http_connection_free(conn);
|
||||
js_free(ctx, conn);
|
||||
result = JS_ThrowInternalError(ctx, "Connection failed: %s", error_buf);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
// Create JS object to represent the connection
|
||||
result = JS_NewObjectClass(ctx, JS_CLASS_OBJECT);
|
||||
JS_SetOpaque(result, conn);
|
||||
|
||||
// Add properties
|
||||
JS_DefinePropertyValueStr(ctx, result, "url", JS_NewString(ctx, url), JS_PROP_C_W_E);
|
||||
JS_DefinePropertyValueStr(ctx, result, "_connection", JS_NewInt64(ctx, (intptr_t)conn), JS_PROP_C_W_E);
|
||||
|
||||
cleanup:
|
||||
JS_FreeCString(ctx, url);
|
||||
free(host);
|
||||
free(port);
|
||||
free(path);
|
||||
buffer_free(&request_buf);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// JS function: fetch_read_chunk(connection, maxBytes) - reads a chunk
|
||||
static JSValue js_fetch_read_chunk(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv) {
|
||||
if (argc < 1) {
|
||||
return JS_ThrowTypeError(ctx, "fetch_read_chunk expects a connection object");
|
||||
}
|
||||
|
||||
JSValue conn_val = JS_GetPropertyStr(ctx, argv[0], "_connection");
|
||||
if (JS_IsUndefined(conn_val)) {
|
||||
return JS_ThrowTypeError(ctx, "Invalid connection object");
|
||||
}
|
||||
|
||||
int64_t conn_ptr;
|
||||
if (JS_ToInt64(ctx, &conn_ptr, conn_val) < 0) {
|
||||
JS_FreeValue(ctx, conn_val);
|
||||
return JS_ThrowTypeError(ctx, "Invalid connection pointer");
|
||||
}
|
||||
JS_FreeValue(ctx, conn_val);
|
||||
|
||||
http_connection_t *conn = (http_connection_t *)(intptr_t)conn_ptr;
|
||||
|
||||
size_t max_bytes = 8192;
|
||||
if (argc >= 2) {
|
||||
int64_t mb;
|
||||
if (JS_ToInt64(ctx, &mb, argv[1]) == 0 && mb > 0) {
|
||||
max_bytes = mb;
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t *buf = js_malloc(ctx, max_bytes);
|
||||
if (!buf) {
|
||||
return JS_ThrowOutOfMemory(ctx);
|
||||
}
|
||||
|
||||
size_t actual_len;
|
||||
int ret = http_connection_read_chunk(conn, buf, max_bytes, &actual_len);
|
||||
|
||||
JSValue result;
|
||||
if (ret < 0 && ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE) {
|
||||
char error_buf[256];
|
||||
mbedtls_strerror(ret, error_buf, sizeof(error_buf));
|
||||
js_free(ctx, buf);
|
||||
return JS_ThrowInternalError(ctx, "Read failed: %s", error_buf);
|
||||
}
|
||||
|
||||
if (actual_len == 0 && ret == 0) {
|
||||
// EOF or would block
|
||||
js_free(ctx, buf);
|
||||
return JS_NULL;
|
||||
}
|
||||
|
||||
// Return chunk as ArrayBuffer
|
||||
result = JS_NewArrayBufferCopy(ctx, buf, actual_len);
|
||||
js_free(ctx, buf);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// JS function: fetch_info(connection) - gets connection info
|
||||
static JSValue js_fetch_info(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv) {
|
||||
if (argc < 1) {
|
||||
return JS_ThrowTypeError(ctx, "fetch_info expects a connection object");
|
||||
}
|
||||
|
||||
JSValue conn_val = JS_GetPropertyStr(ctx, argv[0], "_connection");
|
||||
if (JS_IsUndefined(conn_val)) {
|
||||
return JS_ThrowTypeError(ctx, "Invalid connection object");
|
||||
}
|
||||
|
||||
int64_t conn_ptr;
|
||||
if (JS_ToInt64(ctx, &conn_ptr, conn_val) < 0) {
|
||||
JS_FreeValue(ctx, conn_val);
|
||||
return JS_ThrowTypeError(ctx, "Invalid connection pointer");
|
||||
}
|
||||
JS_FreeValue(ctx, conn_val);
|
||||
|
||||
http_connection_t *conn = (http_connection_t *)(intptr_t)conn_ptr;
|
||||
|
||||
JSValue result = JS_NewObject(ctx);
|
||||
JS_DefinePropertyValueStr(ctx, result, "headers_complete", JS_NewBool(ctx, conn->headers_complete), JS_PROP_C_W_E);
|
||||
JS_DefinePropertyValueStr(ctx, result, "content_length", JS_NewInt64(ctx, conn->content_length), JS_PROP_C_W_E);
|
||||
JS_DefinePropertyValueStr(ctx, result, "bytes_read", JS_NewInt64(ctx, conn->bytes_read), JS_PROP_C_W_E);
|
||||
JS_DefinePropertyValueStr(ctx, result, "chunked_encoding", JS_NewBool(ctx, conn->chunked_encoding), JS_PROP_C_W_E);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// JS function: fetch_close(connection) - closes the connection
|
||||
static JSValue js_fetch_close(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv) {
|
||||
if (argc < 1) {
|
||||
return JS_ThrowTypeError(ctx, "fetch_close expects a connection object");
|
||||
}
|
||||
|
||||
JSValue conn_val = JS_GetPropertyStr(ctx, argv[0], "_connection");
|
||||
if (JS_IsUndefined(conn_val)) {
|
||||
return JS_ThrowTypeError(ctx, "Invalid connection object");
|
||||
}
|
||||
|
||||
int64_t conn_ptr;
|
||||
if (JS_ToInt64(ctx, &conn_ptr, conn_val) < 0) {
|
||||
JS_FreeValue(ctx, conn_val);
|
||||
return JS_ThrowTypeError(ctx, "Invalid connection pointer");
|
||||
}
|
||||
JS_FreeValue(ctx, conn_val);
|
||||
|
||||
http_connection_t *conn = (http_connection_t *)(intptr_t)conn_ptr;
|
||||
http_connection_free(conn);
|
||||
js_free(ctx, conn);
|
||||
|
||||
// Clear the connection pointer
|
||||
JS_DefinePropertyValueStr(ctx, argv[0], "_connection", JS_UNDEFINED, JS_PROP_C_W_E);
|
||||
|
||||
return JS_UNDEFINED;
|
||||
}
|
||||
|
||||
// JS function: fetch(url, options)
|
||||
static JSValue js_fetch(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv) {
|
||||
if (argc < 1 || !JS_IsString(argv[0])) {
|
||||
@@ -358,6 +821,10 @@ static JSValue js_fetch(JSContext *ctx, JSValueConst this_val, int argc, JSValue
|
||||
// Module exports
|
||||
static const JSCFunctionListEntry js_http_funcs[] = {
|
||||
JS_CFUNC_DEF("fetch", 2, js_fetch),
|
||||
JS_CFUNC_DEF("fetch_start", 2, js_fetch_start),
|
||||
JS_CFUNC_DEF("fetch_read_chunk", 2, js_fetch_read_chunk),
|
||||
JS_CFUNC_DEF("fetch_info", 1, js_fetch_info),
|
||||
JS_CFUNC_DEF("fetch_close", 1, js_fetch_close),
|
||||
};
|
||||
|
||||
JSValue js_http_use(JSContext *js)
|
||||
|
||||
@@ -1,6 +1,49 @@
|
||||
var http = use('http')
|
||||
var os = use('os')
|
||||
|
||||
var res = http.fetch("https://dictionary.ink/find?word=palm")
|
||||
var downloader
|
||||
var download_complete = false
|
||||
|
||||
function checkin()
|
||||
{
|
||||
if (download_complete) return
|
||||
|
||||
send(downloader, {type:'status'}, e => {
|
||||
console.log("Status:", json.encode(e))
|
||||
|
||||
// Check if download is complete or error
|
||||
if (e.type === 'error' || (e.type === 'status_response' && e.status === 'idle')) {
|
||||
// Stop checking if no download in progress
|
||||
return
|
||||
}
|
||||
|
||||
// Continue checking
|
||||
$_.delay(checkin, 0.5)
|
||||
})
|
||||
}
|
||||
|
||||
$_.start(e => {
|
||||
console.log(json.encode(e))
|
||||
if (e.type === 'greet') {
|
||||
downloader = e.actor
|
||||
|
||||
// Start download
|
||||
send(downloader, {
|
||||
type:'download',
|
||||
url: 'https://dictionary.ink/find?word=palm'
|
||||
}, e => {
|
||||
console.log("Download response:", json.encode(e))
|
||||
download_complete = true
|
||||
|
||||
if (e.type === 'complete') {
|
||||
console.log("Download complete! Size:", e.size, "bytes")
|
||||
} else if (e.type === 'error') {
|
||||
console.log("Download failed:", e.error)
|
||||
}
|
||||
})
|
||||
|
||||
// Start status checking after a small delay
|
||||
$_.delay(checkin, 0.01)
|
||||
}
|
||||
}, "examples/http_download_actor.js")
|
||||
|
||||
console.log(os.buffer2string(res))
|
||||
Reference in New Issue
Block a user