// Minimal HTTP/1.1 module: a small blocking server API (serve / accept /
// on_request / respond / sse_*), a blocking client (request), and an
// asynchronous client (fetch) composed from callback-style "requestors".
//
// Requestor convention used below: a requestor is function(callback, value).
// It reports success with callback(result) and failure with
// callback(null, reason), and returns either a cancel function or null.

var socket = use('socket')
var tls = use('net/tls')

def CRLF = "\r\n"

// Reason phrases for the status codes respond() knows about; any other
// status is sent with the phrase "Unknown".
def status_texts = {
  "200": "OK",
  "201": "Created",
  "204": "No Content",
  "301": "Moved Permanently",
  "302": "Found",
  "307": "Temporary Redirect",
  "400": "Bad Request",
  "401": "Unauthorized",
  "403": "Forbidden",
  "404": "Not Found",
  "405": "Method Not Allowed",
  "500": "Internal Server Error"
}

// ============================================================
// Server
// ============================================================

// Create a listening TCP socket bound to 127.0.0.1:port (backlog 16) and
// return its descriptor.
function serve(port) {
  var fd = socket.socket("AF_INET", "SOCK_STREAM")
  socket.setsockopt(fd, "SOL_SOCKET", "SO_REUSEADDR", true)
  socket.bind(fd, {address: "127.0.0.1", port: port})
  socket.listen(fd, 16)
  return fd
}

// Read one HTTP request from conn_fd and return
// {method, path, url, headers, body, _conn}.
//   - headers keys are lower-cased; only "Name: value" lines are kept.
//   - path is url with any "?query" suffix removed.
//   - body is null when there is no Content-Length or it is 0.
// Disrupts when the first recv (up to 65536 bytes) does not contain the
// blank line that ends the header section.
// NOTE(review): a short body is topped up with a single extra recv, so a body
// arriving in more than two reads would be truncated — confirm acceptable.
function parse_request(conn_fd) {
  var data = socket.recv(conn_fd, 65536)
  var raw = text(data)
  var hdr_end = search(raw, CRLF + CRLF)
  if (hdr_end == null) disrupt
  var header_text = text(raw, 0, hdr_end)
  // +4 skips the CRLF CRLF separator
  var body_text = text(raw, hdr_end + 4)
  var lines = array(header_text, CRLF)
  // request line: METHOD SP URL SP VERSION
  var parts = array(lines[0], " ")
  var method = parts[0]
  var url = parts[1]
  var qpos = search(url, "?")
  var path = qpos != null ? text(url, 0, qpos) : url
  var headers = {}
  var i = 1
  var colon = null
  var key = null
  var val = null
  while (i < length(lines)) {
    colon = search(lines[i], ": ")
    if (colon != null) {
      key = lower(text(lines[i], 0, colon))
      val = text(lines[i], colon + 2)
      headers[key] = val
    }
    i = i + 1
  }
  var cl = headers["content-length"]
  var content_length = null
  var remaining = null
  var more = null
  if (cl != null) content_length = number(cl)
  if (content_length != null && length(body_text) < content_length) {
    remaining = content_length - length(body_text)
    more = socket.recv(conn_fd, remaining)
    body_text = body_text + text(more)
  }
  if (content_length == null || content_length == 0) body_text = null
  return {
    method: method,
    path: path,
    url: url,
    headers: headers,
    body: body_text,
    _conn: conn_fd
  }
}

// Blocking accept: take one connection from server_fd and return its parsed
// request. Any disruption from parse_request propagates to the caller.
function accept(server_fd) {
  var conn = socket.accept(server_fd)
  return parse_request(conn.socket)
}

// Event-driven accept loop: each time server_fd becomes readable, accept one
// connection, parse it and pass the request record to handler(req).
// The handler owns req._conn from then on (respond() closes it).
function on_request(server_fd, handler) {
  var _accept = function() {
    var conn = socket.accept(server_fd)
    var req = null
    var _parse = function() {
      req = parse_request(conn.socket)
    } disruption {
      req = null
    }
    // Best-effort close for a connection whose request could not be parsed;
    // nobody else holds this descriptor, so without this it would leak.
    var _drop = function() {
      socket.close(conn.socket)
    } disruption {}
    _parse()
    if (req != null) {
      handler(req)
    } else {
      _drop()
    }
    // re-arm: the watch is registered again after every accept
    socket.on_readable(server_fd, _accept)
  }
  socket.on_readable(server_fd, _accept)
}

// Send a complete response on conn and close it.
//   status  - numeric status code (phrase looked up in status_texts)
//   headers - optional record of extra header name -> value
//   body    - optional; non-text values are converted with text()
// Always sends "Connection: close" and a Content-Length header.
// NOTE(review): Content-Length is length() of the body text — confirm that
// this equals the byte count for non-ASCII bodies.
function respond(conn, status, headers, body) {
  var st = status_texts[text(status)]
  if (st == null) st = "Unknown"
  var out = "HTTP/1.1 " + text(status) + " " + st + CRLF
  out = out + "Connection: close" + CRLF
  var body_str = ""
  var keys = null
  var i = 0
  if (body != null) {
    if (is_text(body)) {
      body_str = body
    } else {
      body_str = text(body)
    }
  }
  if (headers != null) {
    keys = array(headers)
    i = 0
    while (i < length(keys)) {
      out = out + keys[i] + ": " + headers[keys[i]] + CRLF
      i = i + 1
    }
  }
  out = out + "Content-Length: " + text(length(body_str)) + CRLF
  out = out + CRLF + body_str
  socket.send(conn, out)
  socket.close(conn)
}

// Start a Server-Sent Events stream on conn: sends the 200 status line, the
// text/event-stream headers plus any extra headers, and leaves conn open.
function sse_open(conn, headers) {
  var out = "HTTP/1.1 200 OK" + CRLF
  out = out + "Content-Type: text/event-stream" + CRLF
  out = out + "Cache-Control: no-cache" + CRLF
  out = out + "Connection: keep-alive" + CRLF
  var keys = null
  var i = 0
  if (headers != null) {
    keys = array(headers)
    i = 0
    while (i < length(keys)) {
      out = out + keys[i] + ": " + headers[keys[i]] + CRLF
      i = i + 1
    }
  }
  out = out + CRLF
  socket.send(conn, out)
}

// Send one SSE event frame. Returns true when the send succeeded and false
// when it disrupted (for example because the peer went away).
function sse_event(conn, event, data) {
  var payload = data
  var frame = null
  var ok = true
  // In the event-stream format every line of the payload needs its own
  // "data:" field; a bare continuation line would be dropped by the client
  // and an empty line would end the event early.
  if (is_text(data) && search(data, "\n") != null) {
    payload = text(array(data, "\n"), "\ndata: ")
  }
  frame = "event: " + event + "\ndata: " + payload + "\n\n"
  var _send = function() {
    socket.send(conn, frame)
  } disruption {
    ok = false
  }
  _send()
  return ok
}

// End an SSE stream by closing the connection.
function sse_close(conn) {
  socket.close(conn)
}

// ============================================================
// Blocking client request (kept for compatibility)
// ============================================================

// Perform one blocking HTTP request and return {status, headers, body}, or
// null when connecting/sending/receiving disrupted or no complete header
// section was received.
//   url is expected in the form scheme://host[:port]/path; the scheme is
//   ignored here (always plain TCP, default port 80).
// NOTE(review): the response is taken from a single recv of up to 65536
// bytes, so larger or slowly arriving responses are truncated.
function request(method, url, headers, body) {
  // "http://host:port/a/b" split on "/" -> ["http:", "", "host:port", "a", "b"]
  var parts = array(url, "/")
  var host_port = parts[2]
  var path = "/" + text(array(parts, 3, length(parts)), "/")
  var hp = array(host_port, ":")
  var host = hp[0]
  var port = length(hp) > 1 ? number(hp[1]) : 80
  var fd = socket.socket("AF_INET", "SOCK_STREAM")
  var raw = null
  var hdr_end = null
  var _do = function() {
    socket.connect(fd, {address: host, port: port})
    var body_str = ""
    if (body != null) {
      if (is_text(body)) {
        body_str = body
      } else {
        body_str = text(body)
      }
    }
    var keys = null
    var i = 0
    var req = method + " " + path + " HTTP/1.1" + CRLF
    req = req + "Host: " + host_port + CRLF
    req = req + "Connection: close" + CRLF
    if (headers != null) {
      keys = array(headers)
      i = 0
      while (i < length(keys)) {
        req = req + keys[i] + ": " + headers[keys[i]] + CRLF
        i = i + 1
      }
    }
    if (length(body_str) > 0) {
      req = req + "Content-Length: " + text(length(body_str)) + CRLF
    }
    req = req + CRLF + body_str
    socket.send(fd, req)
    raw = text(socket.recv(fd, 65536))
  } disruption {
    raw = null
  }
  _do()
  // the socket is closed on both the success and the failure path
  socket.close(fd)
  if (raw == null) return null
  hdr_end = search(raw, CRLF + CRLF)
  if (hdr_end == null) return null
  var header_text = text(raw, 0, hdr_end)
  var lines = array(header_text, CRLF)
  // status line: VERSION SP CODE SP REASON
  var status_parts = array(lines[0], " ")
  var status = number(status_parts[1])
  var resp_headers = {}
  var hi = 1
  var colon = null
  while (hi < length(lines)) {
    colon = search(lines[hi], ": ")
    if (colon != null) {
      resp_headers[lower(text(lines[hi], 0, colon))] = text(lines[hi], colon + 2)
    }
    hi = hi + 1
  }
  return {
    status: status,
    headers: resp_headers,
    body: text(raw, hdr_end + 4)
  }
}

// ============================================================
// Requestor-based async fetch
// ============================================================

// parse_url requestor — sync, extract {scheme, host, port, path} from URL.
// value is either a URL text (method defaults to "GET") or a record
// {url, method, headers, body}. Calls back with
// {scheme, host, port, path, host_port, method, req_headers, req_body}.
// Without an explicit port, 443 is used for "https" and 80 otherwise; a URL
// without "://" is treated as "http".
var parse_url = function(callback, value) {
  var url = null
  var method = "GET"
  var req_headers = null
  var req_body = null
  if (is_text(value)) {
    url = value
  } else {
    url = value.url
    if (value.method != null) method = value.method
    if (value.headers != null) req_headers = value.headers
    if (value.body != null) req_body = value.body
  }
  // strip scheme
  var scheme = "http"
  var rest = url
  var scheme_end = search(url, "://")
  if (scheme_end != null) {
    scheme = lower(text(url, 0, scheme_end))
    rest = text(url, scheme_end + 3, length(url))
  }
  // split host from path
  var slash = search(rest, "/")
  var host_port = rest
  var path = "/"
  if (slash != null) {
    host_port = text(rest, 0, slash)
    path = text(rest, slash, length(rest))
  }
  // split host:port
  var hp = array(host_port, ":")
  var host = hp[0]
  var port = null
  if (length(hp) > 1) {
    port = number(hp[1])
  } else {
    port = scheme == "https" ? 443 : 80
  }
  callback({
    scheme: scheme,
    host: host,
    port: port,
    path: path,
    host_port: host_port,
    method: method,
    req_headers: req_headers,
    req_body: req_body
  })
  return null
}

// resolve_dns requestor — blocking getaddrinfo, swappable later.
// Calls back with state extended by {address} (first result only), or with
// (null, reason) when the lookup disrupts or yields no addresses.
var resolve_dns = function(callback, state) {
  var ok = true
  var addrs = null
  var _resolve = function() {
    addrs = socket.getaddrinfo(state.host, text(state.port))
  } disruption {
    ok = false
  }
  _resolve()
  if (!ok || addrs == null || length(addrs) == 0) {
    callback(null, "dns resolution failed for " + state.host)
    return null
  }
  callback(record(state, {address: addrs[0].address}))
  return null
}

// open_connection requestor — non-blocking connect + optional TLS.
// Calls back with state extended by {fd, tls}; tls is null unless
// state.scheme is "https". Returns a cancel function that unwatches and
// closes the socket (at most once).
var open_connection = function(callback, state) {
  var fd = socket.socket("AF_INET", "SOCK_STREAM")
  var cancelled = false
  var cancel = function() {
    var _close = null
    if (!cancelled) {
      cancelled = true
      // best-effort teardown: disruptions while closing are ignored
      _close = function() {
        socket.unwatch(fd)
        socket.close(fd)
      } disruption {}
      _close()
    }
  }
  socket.setnonblock(fd)
  // Wrap in TLS when needed and hand the connection to the next stage.
  // NOTE(review): callback runs inside the guarded functions below, so a
  // disruption raised by the callback itself would be reported as
  // "connection setup failed" — confirm this is intended.
  var finish_connect = function(the_fd) {
    var ctx = null
    if (state.scheme == "https") {
      ctx = tls.wrap(the_fd, state.host)
    }
    callback(record(state, {fd: the_fd, tls: ctx}))
  }
  // non-blocking connect — EINPROGRESS is expected
  var connect_err = false
  var _connect = function() {
    socket.connect(fd, {address: state.address, port: state.port})
  } disruption {
    connect_err = true
  }
  _connect()
  // if connect succeeded immediately (localhost, etc)
  var _finish_immediate = null
  if (!connect_err && !cancelled) {
    _finish_immediate = function() {
      finish_connect(fd)
    } disruption {
      cancel()
      callback(null, "connection setup failed")
    }
    _finish_immediate()
    return cancel
  }
  // wait for connect to complete
  socket.on_writable(fd, function() {
    if (cancelled) return
    // SO_ERROR carries the outcome of the pending connect; 0 means success
    var err = socket.getsockopt(fd, "SOL_SOCKET", "SO_ERROR")
    if (err != 0) {
      cancel()
      callback(null, "connect failed (errno " + text(err) + ")")
      return
    }
    var _finish = function() {
      finish_connect(fd)
    } disruption {
      cancel()
      callback(null, "connection setup failed")
    }
    _finish()
  })
  return cancel
}

// send_request requestor — format + send HTTP/1.1 request.
// Sends over state.tls when present, otherwise over state.fd. Calls back
// with state unchanged on success or (null, "send request failed") when
// building/sending disrupts. Always sends Connection: close, User-Agent and
// Accept; Content-Length is added only for a non-empty body.
var send_request = function(callback, state) {
  var cancelled = false
  var cancel = function() {
    cancelled = true
  }
  var _send = function() {
    var body_str = ""
    var keys = null
    var i = 0
    if (state.req_body != null) {
      if (is_text(state.req_body)) {
        body_str = state.req_body
      } else {
        body_str = text(state.req_body)
      }
    }
    var req = state.method + " " + state.path + " HTTP/1.1" + CRLF
    req = req + "Host: " + state.host_port + CRLF
    req = req + "Connection: close" + CRLF
    req = req + "User-Agent: cell/1.0" + CRLF
    req = req + "Accept: */*" + CRLF
    if (state.req_headers != null) {
      keys = array(state.req_headers)
      i = 0
      while (i < length(keys)) {
        req = req + keys[i] + ": " + state.req_headers[keys[i]] + CRLF
        i = i + 1
      }
    }
    if (length(body_str) > 0) {
      req = req + "Content-Length: " + text(length(body_str)) + CRLF
    }
    req = req + CRLF + body_str
    if (state.tls != null) {
      tls.send(state.tls, req)
    } else {
      socket.send(state.fd, req)
    }
  } disruption {
    if (!cancelled) callback(null, "send request failed")
    return cancel
  }
  _send()
  if (!cancelled) callback(state)
  return cancel
}

// parse response headers from raw text.
// Returns null until raw contains the blank line ending the header section;
// otherwise {status, headers, body_start}, where headers keys are lower-cased
// and body_start is the offset of the first body character in raw.
function parse_headers(raw) {
  var hdr_end = search(raw, CRLF + CRLF)
  if (hdr_end == null) return null
  var header_text = text(raw, 0, hdr_end)
  var lines = array(header_text, CRLF)
  var status_parts = array(lines[0], " ")
  var status_code = number(status_parts[1])
  var headers = {}
  var i = 1
  var colon = null
  while (i < length(lines)) {
    colon = search(lines[i], ": ")
    if (colon != null) {
      headers[lower(text(lines[i], 0, colon))] = text(lines[i], colon + 2)
    }
    i = i + 1
  }
  return {
    status: status_code,
    headers: headers,
    body_start: hdr_end + 4
  }
}

// decode chunked transfer encoding.
// Walks "<hex size>CRLF<data>CRLF" chunks and concatenates the data. Stops
// and returns what has been decoded so far at the zero-size chunk, at a size
// line that number(..., 16) cannot parse, or when no further CRLF is found.
function decode_chunked(body_text) {
  var result = ""
  var pos = 0
  var chunk_end = null
  var chunk_size = null
  while (pos < length(body_text)) {
    // chunk_end is relative to pos because the search runs on the tail
    chunk_end = search(text(body_text, pos), CRLF)
    if (chunk_end == null) return result
    chunk_size = number(text(body_text, pos, pos + chunk_end), 16)
    if (chunk_size == null || chunk_size == 0) return result
    // skip the size line and its CRLF
    pos = pos + chunk_end + 2
    result = result + text(body_text, pos, pos + chunk_size)
    // skip the chunk data and its trailing CRLF
    pos = pos + chunk_size + 2
  }
  return result
}

// receive_response requestor — async incremental receive.
// Accumulates data from state.tls or state.fd until the response is complete
// (Content-Length satisfied, terminating chunk seen, or EOF), closes the
// connection and calls back with {status, headers, body}. Calls back with
// (null, reason) when the connection ends before a full header section.
// Returns a cancel function that closes the connection (at most once).
var receive_response = function(callback, state) {
  var cancelled = false
  var buffer = ""
  var parsed = null
  var content_length = null
  var is_chunked = false
  var cancel = function() {
    var _cleanup = null
    if (!cancelled) {
      cancelled = true
      _cleanup = function() {
        if (state.tls != null) {
          tls.close(state.tls)
        } else {
          socket.unwatch(state.fd)
          socket.close(state.fd)
        }
      } disruption {}
      _cleanup()
    }
  }
  // Deliver the response built from buffer/parsed and release the connection.
  var finish = function() {
    if (cancelled) return
    var body_text = text(buffer, parsed.body_start)
    if (is_chunked) {
      body_text = decode_chunked(body_text)
    }
    // clean up connection
    var _cleanup = function() {
      if (state.tls != null) {
        tls.close(state.tls)
      } else {
        socket.close(state.fd)
      }
    } disruption {}
    _cleanup()
    callback({
      status: parsed.status,
      headers: parsed.headers,
      body: body_text
    })
  }
  // Returns true once buffer holds a complete response. Parses the headers
  // the first time they are fully available and remembers the framing mode.
  var check_complete = function() {
    var te = null
    var cl = null
    var body_text = null
    // still waiting for headers
    if (parsed == null) {
      parsed = parse_headers(buffer)
      if (parsed == null) return false
      te = parsed.headers["transfer-encoding"]
      if (te != null && search(lower(te), "chunked") != null) {
        is_chunked = true
      }
      cl = parsed.headers["content-length"]
      if (cl != null) content_length = number(cl)
    }
    body_text = text(buffer, parsed.body_start)
    if (is_chunked) {
      // chunked: look for the terminating 0-length chunk
      if (search(body_text, CRLF + "0" + CRLF) != null) return true
      if (starts_with(body_text, "0" + CRLF)) return true
      return false
    }
    if (content_length != null) {
      return length(body_text) >= content_length
    }
    // connection: close — we read until EOF (handled by recv returning 0 bytes)
    return false
  }
  // Readable handler: pull up to 16384 bytes, append, then finish, fail or
  // re-arm the watch.
  var on_data = function() {
    if (cancelled) return
    var chunk = null
    var got_data = false
    var eof = false
    var _recv = function() {
      if (state.tls != null) {
        chunk = tls.recv(state.tls, 16384)
      } else {
        chunk = socket.recv(state.fd, 16384)
      }
    } disruption {
      // recv error — treat as EOF
      eof = true
    }
    _recv()
    var chunk_text = null
    if (!eof && chunk != null) {
      // NOTE(review): presumably stone() is required before text() can read
      // the received blob — confirm against the runtime documentation.
      stone(chunk)
      chunk_text = text(chunk)
      if (length(chunk_text) > 0) {
        buffer = buffer + chunk_text
        got_data = true
      } else {
        eof = true
      }
    }
    if (got_data && check_complete()) {
      finish()
      return
    }
    if (eof) {
      // connection closed — if we have headers, deliver what we have
      if (parsed != null || parse_headers(buffer) != null) {
        if (parsed == null) parsed = parse_headers(buffer)
        finish()
      } else {
        cancel()
        callback(null, "connection closed before headers received")
      }
      return
    }
    // re-register for more data (one-shot watches)
    if (!cancelled) {
      if (state.tls != null) {
        tls.on_readable(state.tls, on_data)
      } else {
        socket.on_readable(state.fd, on_data)
      }
    }
  }
  // start reading
  if (state.tls != null) {
    tls.on_readable(state.tls, on_data)
  } else {
    socket.on_readable(state.fd, on_data)
  }
  return cancel
}

// ============================================================
// fetch — composed requestor pipeline
// ============================================================

// fetch requestor: value is a URL text or {url, method, headers, body}.
// Runs parse_url -> resolve_dns -> open_connection -> send_request ->
// receive_response through runtime_env.sequence and returns whatever the
// composed requestor returns.
var fetch = function(callback, value) {
  def pipeline = runtime_env.sequence([
    parse_url,
    resolve_dns,
    open_connection,
    send_request,
    receive_response
  ])
  return pipeline(callback, value)
}

// Close a socket descriptor (for example the one returned by serve).
function close(fd) {
  socket.close(fd)
}

return {
  // server
  serve: serve,
  accept: accept,
  on_request: on_request,
  respond: respond,
  close: close,
  sse_open: sse_open,
  sse_event: sse_event,
  sse_close: sse_close,
  // client
  fetch: fetch,
  request: request
}