tls and http
This commit is contained in:
427
http.cm
427
http.cm
@@ -1,14 +1,19 @@
|
||||
var socket = use('socket')
|
||||
var c_http = use('net/http')
|
||||
var tls = use('net/tls')
|
||||
|
||||
def CRLF = "\r\n"
|
||||
|
||||
def status_texts = {
|
||||
"200": "OK", "201": "Created", "204": "No Content",
|
||||
"301": "Moved Permanently", "302": "Found", "307": "Temporary Redirect",
|
||||
"400": "Bad Request", "401": "Unauthorized", "403": "Forbidden",
|
||||
"404": "Not Found", "405": "Method Not Allowed", "500": "Internal Server Error"
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Server (unchanged)
|
||||
// ============================================================
|
||||
|
||||
function serve(port) {
|
||||
var fd = socket.socket("AF_INET", "SOCK_STREAM")
|
||||
socket.setsockopt(fd, "SOL_SOCKET", "SO_REUSEADDR", true)
|
||||
@@ -152,6 +157,10 @@ function sse_close(conn) {
|
||||
socket.close(conn)
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Blocking client request (kept for compatibility)
|
||||
// ============================================================
|
||||
|
||||
function request(method, url, headers, body) {
|
||||
var parts = array(url, "/")
|
||||
var host_port = parts[2]
|
||||
@@ -221,13 +230,425 @@ function request(method, url, headers, body) {
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Requestor-based async fetch
|
||||
// ============================================================
|
||||
|
||||
// parse_url requestor — sync, extract {scheme, host, port, path} from URL
|
||||
var parse_url = function(callback, value) {
|
||||
var url = null
|
||||
var method = "GET"
|
||||
var req_headers = null
|
||||
var req_body = null
|
||||
log.console("value type=" + text(is_text(value)) + " val=" + text(value))
|
||||
if (is_text(value)) {
|
||||
url = value
|
||||
log.console("url after assign=" + text(is_text(url)) + " url=" + text(url))
|
||||
} else {
|
||||
url = value.url
|
||||
if (value.method != null) method = value.method
|
||||
if (value.headers != null) req_headers = value.headers
|
||||
if (value.body != null) req_body = value.body
|
||||
}
|
||||
|
||||
// strip scheme
|
||||
var scheme = "http"
|
||||
var rest = url
|
||||
var scheme_end = search(url, "://")
|
||||
log.console("A: url_type=" + text(is_text(url)) + " scheme_end=" + text(scheme_end))
|
||||
if (scheme_end != null) {
|
||||
scheme = lower(text(url, 0, scheme_end))
|
||||
rest = text(url, scheme_end + 3, length(url))
|
||||
log.console("B: scheme=" + scheme + " rest=" + rest + " rest_type=" + text(is_text(rest)))
|
||||
}
|
||||
|
||||
// split host from path
|
||||
var slash = search(rest, "/")
|
||||
var host_port = rest
|
||||
var path = "/"
|
||||
log.console("C: slash=" + text(slash))
|
||||
if (slash != null) {
|
||||
host_port = text(rest, 0, slash)
|
||||
path = text(rest, slash, length(rest))
|
||||
}
|
||||
// split host:port
|
||||
var hp = array(host_port, ":")
|
||||
var host = hp[0]
|
||||
var port = null
|
||||
if (length(hp) > 1) {
|
||||
port = number(hp[1])
|
||||
} else {
|
||||
port = scheme == "https" ? 443 : 80
|
||||
}
|
||||
|
||||
callback({
|
||||
scheme: scheme, host: host, port: port, path: path,
|
||||
host_port: host_port, method: method,
|
||||
req_headers: req_headers, req_body: req_body
|
||||
})
|
||||
return null
|
||||
}
|
||||
|
||||
// resolve_dns requestor — blocking getaddrinfo, swappable later
|
||||
var resolve_dns = function(callback, state) {
|
||||
var ok = true
|
||||
var addrs = null
|
||||
var _resolve = function() {
|
||||
addrs = socket.getaddrinfo(state.host, text(state.port))
|
||||
} disruption {
|
||||
ok = false
|
||||
}
|
||||
_resolve()
|
||||
if (!ok || addrs == null || length(addrs) == 0) {
|
||||
callback(null, "dns resolution failed for " + state.host)
|
||||
return null
|
||||
}
|
||||
callback(record(state, {address: addrs[0].address}))
|
||||
return null
|
||||
}
|
||||
|
||||
// open_connection requestor — non-blocking connect + optional TLS
|
||||
var open_connection = function(callback, state) {
|
||||
var fd = socket.socket("AF_INET", "SOCK_STREAM")
|
||||
var cancelled = false
|
||||
|
||||
var cancel = function() {
|
||||
var _close = null
|
||||
if (!cancelled) {
|
||||
cancelled = true
|
||||
_close = function() {
|
||||
socket.unwatch(fd)
|
||||
socket.close(fd)
|
||||
} disruption {}
|
||||
_close()
|
||||
}
|
||||
}
|
||||
|
||||
socket.setnonblock(fd)
|
||||
|
||||
var finish_connect = function(the_fd) {
|
||||
var ctx = null
|
||||
if (state.scheme == "https") {
|
||||
ctx = tls.wrap(the_fd, state.host)
|
||||
}
|
||||
callback(record(state, {fd: the_fd, tls: ctx}))
|
||||
}
|
||||
|
||||
// non-blocking connect — EINPROGRESS is expected
|
||||
var connect_err = false
|
||||
var _connect = function() {
|
||||
socket.connect(fd, {address: state.address, port: state.port})
|
||||
} disruption {
|
||||
connect_err = true
|
||||
}
|
||||
_connect()
|
||||
|
||||
// if connect succeeded immediately (localhost, etc)
|
||||
var _finish_immediate = null
|
||||
if (!connect_err && !cancelled) {
|
||||
_finish_immediate = function() {
|
||||
finish_connect(fd)
|
||||
} disruption {
|
||||
cancel()
|
||||
callback(null, "connection setup failed")
|
||||
}
|
||||
_finish_immediate()
|
||||
return cancel
|
||||
}
|
||||
|
||||
// wait for connect to complete
|
||||
socket.on_writable(fd, function() {
|
||||
if (cancelled) return
|
||||
var err = socket.getsockopt(fd, "SOL_SOCKET", "SO_ERROR")
|
||||
if (err != 0) {
|
||||
cancel()
|
||||
callback(null, "connect failed (errno " + text(err) + ")")
|
||||
return
|
||||
}
|
||||
var _finish = function() {
|
||||
finish_connect(fd)
|
||||
} disruption {
|
||||
cancel()
|
||||
callback(null, "connection setup failed")
|
||||
}
|
||||
_finish()
|
||||
})
|
||||
|
||||
return cancel
|
||||
}
|
||||
|
||||
// send_request requestor — format + send HTTP/1.1 request
|
||||
var send_request = function(callback, state) {
|
||||
var cancelled = false
|
||||
var cancel = function() {
|
||||
cancelled = true
|
||||
}
|
||||
|
||||
var _send = function() {
|
||||
var body_str = ""
|
||||
var keys = null
|
||||
var i = 0
|
||||
if (state.req_body != null) {
|
||||
if (is_text(state.req_body)) body_str = state.req_body
|
||||
else body_str = text(state.req_body)
|
||||
}
|
||||
|
||||
var req = state.method + " " + state.path + " HTTP/1.1" + CRLF
|
||||
req = req + "Host: " + state.host_port + CRLF
|
||||
req = req + "Connection: close" + CRLF
|
||||
req = req + "User-Agent: cell/1.0" + CRLF
|
||||
req = req + "Accept: */*" + CRLF
|
||||
|
||||
if (state.req_headers != null) {
|
||||
keys = array(state.req_headers)
|
||||
i = 0
|
||||
while (i < length(keys)) {
|
||||
req = req + keys[i] + ": " + state.req_headers[keys[i]] + CRLF
|
||||
i = i + 1
|
||||
}
|
||||
}
|
||||
|
||||
if (length(body_str) > 0) {
|
||||
req = req + "Content-Length: " + text(length(body_str)) + CRLF
|
||||
}
|
||||
|
||||
req = req + CRLF + body_str
|
||||
|
||||
if (state.tls != null) {
|
||||
tls.send(state.tls, req)
|
||||
} else {
|
||||
socket.send(state.fd, req)
|
||||
}
|
||||
} disruption {
|
||||
if (!cancelled) callback(null, "send request failed")
|
||||
return cancel
|
||||
}
|
||||
|
||||
_send()
|
||||
if (!cancelled) callback(state)
|
||||
return cancel
|
||||
}
|
||||
|
||||
// parse response headers from raw text
|
||||
function parse_headers(raw) {
|
||||
var hdr_end = search(raw, CRLF + CRLF)
|
||||
if (hdr_end == null) return null
|
||||
|
||||
var header_text = text(raw, 0, hdr_end)
|
||||
var lines = array(header_text, CRLF)
|
||||
var status_parts = array(lines[0], " ")
|
||||
var status_code = number(status_parts[1])
|
||||
|
||||
var headers = {}
|
||||
var i = 1
|
||||
var colon = null
|
||||
while (i < length(lines)) {
|
||||
colon = search(lines[i], ": ")
|
||||
if (colon != null) {
|
||||
headers[lower(text(lines[i], 0, colon))] = text(lines[i], colon + 2)
|
||||
}
|
||||
i = i + 1
|
||||
}
|
||||
|
||||
return {
|
||||
status: status_code, headers: headers,
|
||||
body_start: hdr_end + 4
|
||||
}
|
||||
}
|
||||
|
||||
// decode chunked transfer encoding
|
||||
function decode_chunked(body_text) {
|
||||
var result = ""
|
||||
var pos = 0
|
||||
var chunk_end = null
|
||||
var chunk_size = null
|
||||
while (pos < length(body_text)) {
|
||||
chunk_end = search(text(body_text, pos), CRLF)
|
||||
if (chunk_end == null) return result
|
||||
chunk_size = number(text(body_text, pos, pos + chunk_end), 16)
|
||||
if (chunk_size == null || chunk_size == 0) return result
|
||||
pos = pos + chunk_end + 2
|
||||
result = result + text(body_text, pos, pos + chunk_size)
|
||||
pos = pos + chunk_size + 2
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// receive_response requestor — async incremental receive
|
||||
var receive_response = function(callback, state) {
|
||||
var cancelled = false
|
||||
var buffer = ""
|
||||
var parsed = null
|
||||
var content_length = null
|
||||
var is_chunked = false
|
||||
var body_complete = false
|
||||
|
||||
var cancel = function() {
|
||||
var _cleanup = null
|
||||
if (!cancelled) {
|
||||
cancelled = true
|
||||
_cleanup = function() {
|
||||
if (state.tls != null) {
|
||||
tls.close(state.tls)
|
||||
} else {
|
||||
socket.unwatch(state.fd)
|
||||
socket.close(state.fd)
|
||||
}
|
||||
} disruption {}
|
||||
_cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
var finish = function() {
|
||||
if (cancelled) return
|
||||
var body_text = text(buffer, parsed.body_start)
|
||||
|
||||
if (is_chunked) {
|
||||
body_text = decode_chunked(body_text)
|
||||
}
|
||||
|
||||
// clean up connection
|
||||
var _cleanup = function() {
|
||||
if (state.tls != null) {
|
||||
tls.close(state.tls)
|
||||
} else {
|
||||
socket.close(state.fd)
|
||||
}
|
||||
} disruption {}
|
||||
_cleanup()
|
||||
|
||||
callback({
|
||||
status: parsed.status,
|
||||
headers: parsed.headers,
|
||||
body: body_text
|
||||
})
|
||||
}
|
||||
|
||||
var check_complete = function() {
|
||||
var te = null
|
||||
var cl = null
|
||||
var body_text = null
|
||||
// still waiting for headers
|
||||
if (parsed == null) {
|
||||
parsed = parse_headers(buffer)
|
||||
if (parsed == null) return false
|
||||
te = parsed.headers["transfer-encoding"]
|
||||
if (te != null && search(lower(te), "chunked") != null) {
|
||||
is_chunked = true
|
||||
}
|
||||
cl = parsed.headers["content-length"]
|
||||
if (cl != null) content_length = number(cl)
|
||||
}
|
||||
|
||||
body_text = text(buffer, parsed.body_start)
|
||||
|
||||
if (is_chunked) {
|
||||
// chunked: look for the terminating 0-length chunk
|
||||
if (search(body_text, CRLF + "0" + CRLF) != null) return true
|
||||
if (starts_with(body_text, "0" + CRLF)) return true
|
||||
return false
|
||||
}
|
||||
|
||||
if (content_length != null) {
|
||||
return length(body_text) >= content_length
|
||||
}
|
||||
|
||||
// connection: close — we read until EOF (handled by recv returning 0 bytes)
|
||||
return false
|
||||
}
|
||||
|
||||
var on_data = function() {
|
||||
if (cancelled) return
|
||||
var chunk = null
|
||||
var got_data = false
|
||||
var eof = false
|
||||
|
||||
var _recv = function() {
|
||||
if (state.tls != null) {
|
||||
chunk = tls.recv(state.tls, 16384)
|
||||
} else {
|
||||
chunk = socket.recv(state.fd, 16384)
|
||||
}
|
||||
} disruption {
|
||||
// recv error — treat as EOF
|
||||
eof = true
|
||||
}
|
||||
_recv()
|
||||
|
||||
var chunk_text = null
|
||||
if (!eof && chunk != null) {
|
||||
stone(chunk)
|
||||
chunk_text = text(chunk)
|
||||
if (length(chunk_text) > 0) {
|
||||
buffer = buffer + chunk_text
|
||||
got_data = true
|
||||
} else {
|
||||
eof = true
|
||||
}
|
||||
}
|
||||
|
||||
if (got_data && check_complete()) {
|
||||
finish()
|
||||
return
|
||||
}
|
||||
|
||||
if (eof) {
|
||||
// connection closed — if we have headers, deliver what we have
|
||||
if (parsed != null || parse_headers(buffer) != null) {
|
||||
if (parsed == null) parsed = parse_headers(buffer)
|
||||
finish()
|
||||
} else {
|
||||
cancel()
|
||||
callback(null, "connection closed before headers received")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// re-register for more data (one-shot watches)
|
||||
if (!cancelled) {
|
||||
if (state.tls != null) {
|
||||
tls.on_readable(state.tls, on_data)
|
||||
} else {
|
||||
socket.on_readable(state.fd, on_data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// start reading
|
||||
if (state.tls != null) {
|
||||
tls.on_readable(state.tls, on_data)
|
||||
} else {
|
||||
socket.on_readable(state.fd, on_data)
|
||||
}
|
||||
|
||||
return cancel
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// fetch — composed requestor pipeline
|
||||
// ============================================================
|
||||
|
||||
var fetch = function(callback, value) {
|
||||
def pipeline = runtime_env.sequence([
|
||||
parse_url,
|
||||
resolve_dns,
|
||||
open_connection,
|
||||
send_request,
|
||||
receive_response
|
||||
])
|
||||
return pipeline(callback, value)
|
||||
}
|
||||
|
||||
function close(fd) {
|
||||
socket.close(fd)
|
||||
}
|
||||
|
||||
return {
|
||||
// server
|
||||
serve: serve, accept: accept, on_request: on_request,
|
||||
respond: respond, request: request,
|
||||
respond: respond, close: close,
|
||||
sse_open: sse_open, sse_event: sse_event, sse_close: sse_close,
|
||||
close: close, fetch: c_http.fetch
|
||||
// client
|
||||
fetch: fetch,
|
||||
request: request
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user