Files
cell/http.cm
2026-02-25 23:29:37 -06:00

784 lines
20 KiB
Plaintext

var socket = use('socket')
var tls = use('net/tls')
var Blob = use('blob')
// Line terminator used when building and splitting HTTP/1.1 messages.
def CRLF = "\r\n"
// Reason phrases keyed by status code (as text). respond() looks codes up
// here and falls back to "Unknown" for anything not listed.
def status_texts = {
"200": "OK", "201": "Created", "204": "No Content",
"301": "Moved Permanently", "302": "Found", "307": "Temporary Redirect",
"400": "Bad Request", "401": "Unauthorized", "403": "Forbidden",
"404": "Not Found", "405": "Method Not Allowed", "500": "Internal Server Error"
}
// ============================================================
// Server (unchanged)
// ============================================================
// Create a listening TCP socket bound to 127.0.0.1:<port> and return its
// descriptor. SO_REUSEADDR is set before binding; the listen backlog is 16.
function serve(port) {
  var listener = socket.socket("AF_INET", "SOCK_STREAM")
  var local_addr = {address: "127.0.0.1", port: port}
  socket.setsockopt(listener, "SOL_SOCKET", "SO_REUSEADDR", true)
  socket.bind(listener, local_addr)
  socket.listen(listener, 16)
  return listener
}
// Read one HTTP request from conn_fd and return a record
// {method, path, url, headers, body, _conn}.
//   - headers: keys are lower-cased; only "Name: value" lines are kept
//   - path:    url with any "?query" suffix removed
//   - body:    text, or null when Content-Length is absent or 0
//   - _conn:   the connection descriptor the request was read from
// Disrupts when the first read does not contain the blank line that ends
// the header block.
function parse_request(conn_fd) {
  var data = socket.recv(conn_fd, 65536)
  var raw = text(data)
  var hdr_end = search(raw, CRLF + CRLF)
  // the whole header block must be present in the first read
  if (hdr_end == null) disrupt
  var header_text = text(raw, 0, hdr_end)
  // skip the 4-character CRLF CRLF separator
  var body_text = text(raw, hdr_end + 4)
  var lines = array(header_text, CRLF)
  // request line: METHOD SP URL SP VERSION
  var parts = array(lines[0], " ")
  var method = parts[0]
  var url = parts[1]
  var qpos = search(url, "?")
  var path = qpos != null ? text(url, 0, qpos) : url
  var headers = {}
  var i = 1
  var colon = null
  while (i < length(lines)) {
    colon = search(lines[i], ": ")
    if (colon != null) {
      headers[lower(text(lines[i], 0, colon))] = text(lines[i], colon + 2)
    }
    i = i + 1
  }
  var cl = headers["content-length"]
  var content_length = null
  var more = null
  var more_text = null
  if (cl != null) content_length = number(cl)
  if (content_length != null) {
    // A single recv may return fewer bytes than requested, so keep reading
    // until the declared length is reached or the peer stops sending.
    while (length(body_text) < content_length) {
      more = socket.recv(conn_fd, content_length - length(body_text))
      if (more == null) break
      more_text = text(more)
      if (length(more_text) == 0) break
      body_text = body_text + more_text
    }
  }
  if (content_length == null || content_length == 0) body_text = null
  return {
    method: method, path: path, url: url,
    headers: headers, body: body_text, _conn: conn_fd
  }
}
// Accept one connection on server_fd and return its parsed request
// (see parse_request). If the request cannot be parsed, the accepted
// socket is closed before the disruption is passed on, so a malformed
// request does not leak a descriptor.
function accept(server_fd) {
  var conn = socket.accept(server_fd)
  var req = null
  var _parse = function() {
    req = parse_request(conn.socket)
  } disruption {
    req = null
  }
  // best-effort close; a failure here must not mask the parse failure
  var _drop = function() {
    socket.close(conn.socket)
  } disruption {}
  _parse()
  if (req == null) {
    _drop()
    disrupt
  }
  return req
}
// Register handler to be called with each parsed request arriving on
// server_fd. The readable watch is re-registered after every accept.
// Connections whose request cannot be parsed are closed and skipped;
// handler is responsible for the connection of every request it receives
// (req._conn).
function on_request(server_fd, handler) {
  var _accept = function() {
    var conn = socket.accept(server_fd)
    var req = null
    var _parse = function() {
      req = parse_request(conn.socket)
    } disruption {
      req = null
    }
    // best-effort close of a connection nobody will answer
    var _drop = function() {
      socket.close(conn.socket)
    } disruption {}
    _parse()
    if (req != null) {
      handler(req)
    } else {
      // parse failed: without this the accepted socket would leak
      _drop()
    }
    socket.on_readable(server_fd, _accept)
  }
  socket.on_readable(server_fd, _accept)
}
// Write a complete HTTP/1.1 response to conn and close it.
//   status:  status code; its reason phrase comes from status_texts,
//            or "Unknown" when the code is not listed
//   headers: optional record of extra header fields
//   body:    optional; non-text values are converted with text()
// "Connection: close" and "Content-Length" are always emitted.
function respond(conn, status, headers, body) {
  var code = text(status)
  var reason = status_texts[code]
  var payload = ""
  var names = null
  var n = 0
  var head = null
  if (reason == null) reason = "Unknown"
  head = "HTTP/1.1 " + code + " " + reason + CRLF + "Connection: close" + CRLF
  if (body != null) payload = is_text(body) ? body : text(body)
  if (headers != null) {
    names = array(headers)
    while (n < length(names)) {
      head = head + names[n] + ": " + headers[names[n]] + CRLF
      n = n + 1
    }
  }
  // Content-Length goes last, followed by the blank line that ends the head
  head = head + "Content-Length: " + text(length(payload)) + CRLF + CRLF
  socket.send(conn, head + payload)
  socket.close(conn)
}
// Start a server-sent-events stream on conn: send a 200 response head with
// the event-stream content type, no-cache and keep-alive, plus any extra
// header fields from the optional headers record. The connection stays
// open for later sse_event calls.
function sse_open(conn, headers) {
  var names = null
  var n = 0
  var head = "HTTP/1.1 200 OK" + CRLF
    + "Content-Type: text/event-stream" + CRLF
    + "Cache-Control: no-cache" + CRLF
    + "Connection: keep-alive" + CRLF
  if (headers != null) {
    names = array(headers)
    while (n < length(names)) {
      head = head + names[n] + ": " + headers[names[n]] + CRLF
      n = n + 1
    }
  }
  // blank line terminates the response head
  socket.send(conn, head + CRLF)
}
// Send one server-sent event on conn.
//   event: event name, written as the "event:" field
//   data:  payload; text containing "\n" is written as one "data:" line per
//          line so the client reassembles it, instead of the text after the
//          first newline being parsed as a separate (bogus) field
// Returns true when the frame was handed to the socket, false when the
// send disrupted (typically because the client has gone away).
function sse_event(conn, event, data) {
  var frame = "event: " + event + "\n"
  var data_lines = null
  var i = 0
  var ok = true
  var _send = function() {
    socket.send(conn, frame)
  } disruption {
    ok = false
  }
  if (is_text(data)) data_lines = array(data, "\n")
  // non-text or empty payloads still get exactly one data line
  if (data_lines == null || length(data_lines) == 0) data_lines = [data]
  while (i < length(data_lines)) {
    frame = frame + "data: " + data_lines[i] + "\n"
    i = i + 1
  }
  // blank line dispatches the event
  frame = frame + "\n"
  _send()
  return ok
}
// End a server-sent-events stream by closing its connection.
function sse_close(conn) {
socket.close(conn)
}
// ============================================================
// Blocking client request (kept for compatibility)
// ============================================================
// Blocking plain-HTTP request.
//   method:  request method text, e.g. "GET"
//   url:     "http://host[:port]/path"; the port defaults to 80
//   headers: optional record of extra request header fields
//   body:    optional; non-text values are converted with text()
// Returns {status, headers, body} with lower-cased header names, or null
// when nothing could be received or the response has no header terminator.
// The request is sent with "Connection: close" and the response is read
// until the peer closes, so responses that arrive in several reads are no
// longer truncated. A receive failure after some data has arrived is
// treated as end of stream.
function request(method, url, headers, body) {
  // "scheme:", "", "host:port", path segments...
  var parts = array(url, "/")
  var host_port = parts[2]
  var path = "/" + text(array(parts, 3, length(parts)), "/")
  var hp = array(host_port, ":")
  var host = hp[0]
  var port = length(hp) > 1 ? number(hp[1]) : 80
  var fd = socket.socket("AF_INET", "SOCK_STREAM")
  var buf = Blob()
  var raw = null
  var hdr_end = null
  var _do = function() {
    var body_str = ""
    var keys = null
    var i = 0
    var req = null
    var chunk = null
    socket.connect(fd, {address: host, port: port})
    if (body != null) {
      if (is_text(body)) body_str = body
      else body_str = text(body)
    }
    req = method + " " + path + " HTTP/1.1" + CRLF
    req = req + "Host: " + host_port + CRLF
    req = req + "Connection: close" + CRLF
    if (headers != null) {
      keys = array(headers)
      i = 0
      while (i < length(keys)) {
        req = req + keys[i] + ": " + headers[keys[i]] + CRLF
        i = i + 1
      }
    }
    if (length(body_str) > 0) {
      req = req + "Content-Length: " + text(length(body_str)) + CRLF
    }
    req = req + CRLF + body_str
    socket.send(fd, req)
    // read until the server closes the connection
    while (true) {
      chunk = socket.recv(fd, 65536)
      if (chunk == null) break
      stone(chunk)
      if (length(chunk) == 0) break
      buf.write_blob(chunk)
    }
  } disruption {
    // keep whatever was received; an empty buffer is reported as null below
  }
  _do()
  socket.close(fd)
  stone(buf)
  if (length(buf) == 0) return null
  raw = text(buf)
  hdr_end = search(raw, CRLF + CRLF)
  if (hdr_end == null) return null
  var header_text = text(raw, 0, hdr_end)
  var lines = array(header_text, CRLF)
  // status line: VERSION SP CODE SP REASON
  var status_parts = array(lines[0], " ")
  var status = number(status_parts[1])
  var resp_headers = {}
  var hi = 1
  var colon = null
  while (hi < length(lines)) {
    colon = search(lines[hi], ": ")
    if (colon != null) {
      resp_headers[lower(text(lines[hi], 0, colon))] = text(lines[hi], colon + 2)
    }
    hi = hi + 1
  }
  return {
    status: status,
    headers: resp_headers,
    body: text(raw, hdr_end + 4)
  }
}
// ============================================================
// Requestor-based async fetch
// ============================================================
// parse_url requestor — synchronous; turns the pipeline input into the
// state record used by the later stages.
//   value: either a URL text, or a record {url, method, headers, body}
//          (method defaults to "GET")
// On success calls callback with
//   {scheme, host, port, path, host_port, method, req_headers, req_body}
// where scheme defaults to "http", path to "/", and port to 443 for https
// and 80 otherwise. When no text URL is supplied, calls
// callback(null, reason) instead of disrupting.
var parse_url = function(callback, value) {
  var url = null
  var method = "GET"
  var req_headers = null
  var req_body = null
  if (is_text(value)) {
    url = value
  } else {
    if (value != null) {
      url = value.url
      if (value.method != null) method = value.method
      if (value.headers != null) req_headers = value.headers
      if (value.body != null) req_body = value.body
    }
  }
  if (!is_text(url)) {
    callback(null, "parse_url: url must be text")
    return null
  }
  // strip scheme
  var scheme = "http"
  var rest = url
  var scheme_end = search(url, "://")
  if (scheme_end != null) {
    scheme = lower(text(url, 0, scheme_end))
    rest = text(url, scheme_end + 3, length(url))
  }
  // split host from path
  var slash = search(rest, "/")
  var host_port = rest
  var path = "/"
  if (slash != null) {
    host_port = text(rest, 0, slash)
    path = text(rest, slash, length(rest))
  }
  // split host:port
  var hp = array(host_port, ":")
  var host = hp[0]
  var port = null
  if (length(hp) > 1) {
    port = number(hp[1])
  } else {
    port = scheme == "https" ? 443 : 80
  }
  callback({
    scheme: scheme, host: host, port: port, path: path,
    host_port: host_port, method: method,
    req_headers: req_headers, req_body: req_body
  })
  return null
}
// resolve_dns requestor — looks up state.host with a blocking getaddrinfo
// call (swappable later). On success the first returned address is passed
// on as `address` via record(state, {...}); on a disruption or an empty
// result the callback receives (null, reason).
var resolve_dns = function(callback, state) {
  var addrs = null
  var failed = false
  var _lookup = function() {
    addrs = socket.getaddrinfo(state.host, text(state.port))
  } disruption {
    failed = true
  }
  _lookup()
  if (!failed && addrs != null && length(addrs) != 0) {
    callback(record(state, {address: addrs[0].address}))
    return null
  }
  callback(null, "dns resolution failed for " + state.host)
  return null
}
// open_connection requestor — non-blocking connect + optional TLS
// Reads state.address, state.port, state.scheme and state.host.
// On success calls callback(record(state, {fd, tls})), where tls is the
// tls.wrap context for https and null otherwise. On failure calls
// callback(null, reason). Returns a cancel function.
var open_connection = function(callback, state) {
var fd = socket.socket("AF_INET", "SOCK_STREAM")
var cancelled = false
// cancel: runs at most once; unwatches and closes the socket, ignoring
// any disruption raised while doing so.
var cancel = function() {
var _close = null
if (!cancelled) {
cancelled = true
_close = function() {
socket.unwatch(fd)
socket.close(fd)
} disruption {}
_close()
}
}
socket.setnonblock(fd)
// finish_connect: wrap the connected socket in TLS when the scheme is
// https, then hand the extended state to the next stage.
var finish_connect = function(the_fd) {
var ctx = null
if (state.scheme == "https") {
ctx = tls.wrap(the_fd, state.host)
}
callback(record(state, {fd: the_fd, tls: ctx}))
}
// non-blocking connect — EINPROGRESS is expected
// (a disruption here is therefore not treated as fatal; the outcome is
// checked via SO_ERROR once the socket becomes writable)
var connect_err = false
var _connect = function() {
socket.connect(fd, {address: state.address, port: state.port})
} disruption {
connect_err = true
}
_connect()
// if connect succeeded immediately (localhost, etc)
var _finish_immediate = null
if (!connect_err && !cancelled) {
_finish_immediate = function() {
finish_connect(fd)
} disruption {
// covers a disruption anywhere in finish_connect (e.g. tls.wrap)
cancel()
callback(null, "connection setup failed")
}
_finish_immediate()
return cancel
}
// wait for connect to complete
socket.on_writable(fd, function() {
if (cancelled) return
// SO_ERROR carries the result of the pending connect; non-zero is failure
var err = socket.getsockopt(fd, "SOL_SOCKET", "SO_ERROR")
if (err != 0) {
cancel()
callback(null, "connect failed (errno " + text(err) + ")")
return
}
var _finish = function() {
finish_connect(fd)
} disruption {
cancel()
callback(null, "connection setup failed")
}
_finish()
})
return cancel
}
// send_request requestor — format and send the HTTP/1.1 request described
// by state (method, path, host_port, req_headers, req_body) over state.tls
// when present, otherwise over state.fd.
// Calls callback(state) once the request has been sent, or
// callback(null, reason) when sending disrupts. The callback fires exactly
// once: a failed send no longer falls through to the success callback, and
// the connection is closed on failure because no later stage will own it.
// Returns a cancel function.
var send_request = function(callback, state) {
  var cancelled = false
  var sent = true
  var cancel = function() {
    cancelled = true
  }
  var _send = function() {
    var body_str = ""
    var keys = null
    var i = 0
    if (state.req_body != null) {
      if (is_text(state.req_body)) body_str = state.req_body
      else body_str = text(state.req_body)
    }
    var req = state.method + " " + state.path + " HTTP/1.1" + CRLF
    req = req + "Host: " + state.host_port + CRLF
    req = req + "Connection: close" + CRLF
    req = req + "User-Agent: cell/1.0" + CRLF
    req = req + "Accept: */*" + CRLF
    if (state.req_headers != null) {
      keys = array(state.req_headers)
      i = 0
      while (i < length(keys)) {
        req = req + keys[i] + ": " + state.req_headers[keys[i]] + CRLF
        i = i + 1
      }
    }
    if (length(body_str) > 0) {
      req = req + "Content-Length: " + text(length(body_str)) + CRLF
    }
    req = req + CRLF + body_str
    if (state.tls != null) {
      tls.send(state.tls, req)
    } else {
      socket.send(state.fd, req)
    }
  } disruption {
    sent = false
  }
  // best-effort release of the connection after a failed send
  var _abandon = function() {
    if (state.tls != null) {
      tls.close(state.tls)
    } else {
      socket.close(state.fd)
    }
  } disruption {}
  _send()
  if (!sent) {
    _abandon()
    if (!cancelled) callback(null, "send request failed")
    return cancel
  }
  if (!cancelled) callback(state)
  return cancel
}
// Parse the status line and header fields at the start of a raw response.
// Returns null until the blank line ending the header block is present;
// otherwise {status, headers, body_start}, where header names are
// lower-cased and body_start is the offset just past the blank line.
function parse_headers(raw) {
  var split_at = search(raw, CRLF + CRLF)
  var lines = null
  var status_fields = null
  var code = null
  var fields = {}
  var line = null
  var sep = null
  var n = 1
  if (split_at == null) return null
  lines = array(text(raw, 0, split_at), CRLF)
  // status line: VERSION SP CODE SP REASON
  status_fields = array(lines[0], " ")
  code = number(status_fields[1])
  while (n < length(lines)) {
    line = lines[n]
    sep = search(line, ": ")
    if (sep != null) {
      fields[lower(text(line, 0, sep))] = text(line, sep + 2)
    }
    n = n + 1
  }
  return {
    status: code, headers: fields,
    body_start: split_at + 4
  }
}
// Decode a chunked transfer-encoded body held as text (used for async
// responses). Chunks are concatenated until the zero-size chunk, an
// unparsable size line, or the end of the input; whatever has been decoded
// so far is returned in every case.
function decode_chunked(body_text) {
  var decoded = ""
  var cursor = 0
  var total = length(body_text)
  var line_len = null
  var size = null
  var data_start = null
  while (cursor < total) {
    // length of the hex size line, measured from the cursor
    line_len = search(text(body_text, cursor), CRLF)
    if (line_len == null) return decoded
    size = number(text(body_text, cursor, cursor + line_len), 16)
    if (size == null || size == 0) return decoded
    data_start = cursor + line_len + 2
    decoded = decoded + text(body_text, data_start, data_start + size)
    // skip the chunk data and the CRLF that follows it
    cursor = data_start + size + 2
  }
  return decoded
}
// Decode a chunked transfer-encoded body directly from a blob, so binary
// data is preserved.
//   buf:              blob holding the raw response
//   body_start_bytes: byte offset of the first chunk-size line
// Blob lengths and read offsets are in bits, hence the factors of 8.
// Returns a stoned blob with the concatenated chunk data.
function decode_chunked_blob(buf, body_start_bytes) {
  var out = Blob()
  var cursor = body_start_bytes
  var total_bytes = length(buf) / 8
  var window_end = null
  var size_window = null
  var size_text = null
  var line_len = null
  var size = null
  var piece = null
  while (cursor < total_bytes) {
    // inspect at most 20 bytes to find the chunk-size line
    window_end = cursor + 20 > total_bytes ? total_bytes : cursor + 20
    size_window = buf.read_blob(cursor * 8, window_end * 8)
    stone(size_window)
    size_text = text(size_window)
    line_len = search(size_text, CRLF)
    if (line_len == null) break
    size = number(text(size_text, 0, line_len), 16)
    if (size == null || size == 0) break
    cursor = cursor + line_len + 2
    piece = buf.read_blob(cursor * 8, (cursor + size) * 8)
    stone(piece)
    out.write_blob(piece)
    // skip the chunk data and its trailing CRLF
    cursor = cursor + size + 2
  }
  stone(out)
  return out
}
// receive_response requestor — async incremental receive.
// Reads from state.tls when present, otherwise from state.fd, accumulating
// text until the response is complete: the terminating zero-size chunk for
// chunked bodies, Content-Length characters of body, or end of stream.
// Calls callback({status, headers, body}) on completion, or
// callback(null, reason) when the connection ends before any headers
// arrive. Returns a cancel function that closes the connection; calling it
// after the response has been delivered is a no-op, so the connection is
// never closed twice.
var receive_response = function(callback, state) {
  var cancelled = false
  var finished = false
  var buffer = ""
  var parsed = null
  var content_length = null
  var is_chunked = false
  var cancel = function() {
    var _cleanup = null
    // once finish() has run it has already closed the connection
    if (!cancelled && !finished) {
      cancelled = true
      _cleanup = function() {
        if (state.tls != null) {
          tls.close(state.tls)
        } else {
          socket.unwatch(state.fd)
          socket.close(state.fd)
        }
      } disruption {}
      _cleanup()
    }
  }
  // deliver the response and release the connection (runs at most once)
  var finish = function() {
    if (cancelled || finished) return
    finished = true
    var body_text = text(buffer, parsed.body_start)
    if (is_chunked) {
      body_text = decode_chunked(body_text)
    }
    // clean up connection
    var _cleanup = function() {
      if (state.tls != null) {
        tls.close(state.tls)
      } else {
        socket.close(state.fd)
      }
    } disruption {}
    _cleanup()
    callback({
      status: parsed.status,
      headers: parsed.headers,
      body: body_text
    })
  }
  // true once the buffered data holds a complete response
  var check_complete = function() {
    var te = null
    var cl = null
    var body_text = null
    // still waiting for headers
    if (parsed == null) {
      parsed = parse_headers(buffer)
      if (parsed == null) return false
      te = parsed.headers["transfer-encoding"]
      if (te != null && search(lower(te), "chunked") != null) {
        is_chunked = true
      }
      cl = parsed.headers["content-length"]
      if (cl != null) content_length = number(cl)
    }
    body_text = text(buffer, parsed.body_start)
    if (is_chunked) {
      // chunked: look for the terminating 0-length chunk
      if (search(body_text, CRLF + "0" + CRLF) != null) return true
      if (starts_with(body_text, "0" + CRLF)) return true
      return false
    }
    if (content_length != null) {
      return length(body_text) >= content_length
    }
    // connection: close — we read until EOF (handled by recv returning 0 bytes)
    return false
  }
  var on_data = function() {
    if (cancelled || finished) return
    var chunk = null
    var got_data = false
    var eof = false
    var _recv = function() {
      if (state.tls != null) {
        chunk = tls.recv(state.tls, 16384)
      } else {
        chunk = socket.recv(state.fd, 16384)
      }
    } disruption {
      // recv error — treat as EOF
      eof = true
    }
    _recv()
    var chunk_text = null
    if (!eof && chunk != null) {
      stone(chunk)
      chunk_text = text(chunk)
      if (length(chunk_text) > 0) {
        buffer = buffer + chunk_text
        got_data = true
      } else {
        eof = true
      }
    }
    if (got_data && check_complete()) {
      finish()
      return
    }
    if (eof) {
      // connection closed — if we have headers, deliver what we have
      if (parsed == null) parsed = parse_headers(buffer)
      if (parsed != null) {
        finish()
      } else {
        cancel()
        callback(null, "connection closed before headers received")
      }
      return
    }
    // re-register for more data (one-shot watches)
    if (!cancelled) {
      if (state.tls != null) {
        tls.on_readable(state.tls, on_data)
      } else {
        socket.on_readable(state.fd, on_data)
      }
    }
  }
  // start reading
  if (state.tls != null) {
    tls.on_readable(state.tls, on_data)
  } else {
    socket.on_readable(state.fd, on_data)
  }
  return cancel
}
// ============================================================
// fetch — synchronous HTTP(S) GET, returns response body (stoned blob)
// ============================================================
// fetch(url): blocking GET of an http:// or https:// URL.
// Returns the response body as a stoned blob (chunked bodies are decoded).
// Returns null when address lookup yields nothing, when connecting,
// sending or receiving disrupts, or when the response has no header
// terminator. Logs the status line and disrupts when the status is not
// 2xx. The getaddrinfo call itself is not wrapped in a disruption handler.
var fetch = function(url) {
var scheme = "http"
var rest = url
var scheme_end = search(url, "://")
var slash = null
var host_port = null
var path = "/"
var hp = null
var host = null
var port = null
var fd = null
var ctx = null
var buf = Blob()
var raw_text = null
var hdr_end = null
var header_text = null
var body_start_bits = null
var body = null
var addrs = null
var address = null
var ok = true
var status_line = null
var status_code = null
// split "scheme://host[:port]/path"; scheme defaults to http, path to "/"
if (scheme_end != null) {
scheme = lower(text(url, 0, scheme_end))
rest = text(url, scheme_end + 3, length(url))
}
slash = search(rest, "/")
host_port = rest
if (slash != null) {
host_port = text(rest, 0, slash)
path = text(rest, slash, length(rest))
}
hp = array(host_port, ":")
host = hp[0]
// explicit port wins; otherwise 443 for https and 80 for everything else
port = length(hp) > 1 ? number(hp[1]) : (scheme == "https" ? 443 : 80)
addrs = socket.getaddrinfo(host, text(port))
if (addrs == null || length(addrs) == 0) return null
address = addrs[0].address
fd = socket.socket("AF_INET", "SOCK_STREAM")
// connect, send the request, and read the whole response into buf
var _do = function() {
var req = null
var chunk = null
socket.connect(fd, {address: address, port: port})
if (scheme == "https") ctx = tls.wrap(fd, host)
req = "GET " + path + " HTTP/1.1" + CRLF
req = req + "Host: " + host_port + CRLF
req = req + "Connection: close" + CRLF
req = req + "User-Agent: cell/1.0" + CRLF
req = req + "Accept: */*" + CRLF + CRLF
if (ctx != null) tls.send(ctx, req)
else socket.send(fd, req)
// read until recv returns null or an empty chunk
while (true) {
if (ctx != null) chunk = tls.recv(ctx, 16384)
else chunk = socket.recv(fd, 16384)
if (chunk == null) break
stone(chunk)
if (length(chunk) == 0) break
buf.write_blob(chunk)
}
} disruption {
ok = false
}
_do()
// always release the connection, ignoring disruptions while closing
var _cleanup = function() {
if (ctx != null) tls.close(ctx)
else socket.close(fd)
} disruption {}
_cleanup()
if (!ok) return null
stone(buf)
raw_text = text(buf)
hdr_end = search(raw_text, CRLF + CRLF)
if (hdr_end == null) return null
header_text = text(raw_text, 0, hdr_end)
status_line = text(header_text, 0, search(header_text, CRLF) || length(header_text))
// characters 9..12 of the status line hold the code, which assumes a
// 9-character version prefix such as "HTTP/1.1 "
status_code = number(text(status_line, 9, 12))
if (status_code == null || status_code < 200 || status_code >= 300) {
log.error("fetch: " + status_line)
disrupt
}
if (search(lower(header_text), "transfer-encoding: chunked") != null) {
body = decode_chunked_blob(buf, hdr_end + 4)
return body
}
// Headers are ASCII so char offset = byte offset
// (blob offsets and lengths are in bits, hence the factor of 8)
body_start_bits = (hdr_end + 4) * 8
body = buf.read_blob(body_start_bits, length(buf))
stone(body)
return body
}
// ============================================================
// fetch_requestor — async requestor pipeline for fetch
// ============================================================
// Stages in order: parse the URL, resolve the host, connect (plus TLS for
// https), send the request, receive the response.
// NOTE(review): `sequence` is not defined in this file; presumably a
// runtime-provided requestor combinator that feeds each stage's result to
// the next — confirm.
var fetch_requestor = sequence([
parse_url,
resolve_dns,
open_connection,
send_request,
receive_response
])
// Close a socket descriptor (exported as `close`; thin wrapper over socket.close).
function close(fd) {
socket.close(fd)
}
// Module value: the record of functions this file exposes.
return {
// server
serve: serve, accept: accept, on_request: on_request,
respond: respond, close: close,
sse_open: sse_open, sse_event: sse_event, sse_close: sse_close,
// client
fetch: fetch,
fetch_requestor: fetch_requestor,
request: request
}