Files
cell/pronto.cm
2026-02-10 12:13:18 -06:00

367 lines
9.5 KiB
Plaintext

// pronto.cm
// Extremism in the pursuit of parallelism is no vice.
// Based on Douglas Crockford's parseq, adapted for Cell.
// Time is in seconds.
function make_reason(factory, excuse, evidence) {
var msg = 'pronto.' + factory
if (excuse) msg = msg + ': ' + excuse
return { message: msg, evidence: evidence }
}
function is_requestor(fn) {
return is_function(fn) && (length(fn) == 1 || length(fn) == 2)
}
function check_requestors(list, factory) {
if (!is_array(list) || some(list, r => !is_requestor(r))) {
print(make_reason(factory, 'Bad requestor array.', list).message + '\n')
disrupt
}
}
function check_callback(cb, factory) {
if (!is_function(cb) || length(cb) != 2) {
print(make_reason(factory, 'Not a callback.', cb).message + '\n')
disrupt
}
}
// fallback(requestor_array)
// Tries each requestor in order until one succeeds.
function fallback(requestor_array) {
def factory = 'fallback'
if (!is_array(requestor_array) || length(requestor_array) == 0) {
print(make_reason(factory, 'Empty requestor array.').message + '\n')
disrupt
}
check_requestors(requestor_array, factory)
return function fallback_requestor(callback, value) {
check_callback(callback, factory)
var index = 0
var current_cancel = null
var cancelled = false
function cancel(reason) {
var _c = null
cancelled = true
if (current_cancel) {
_c = function() { current_cancel(reason) } disruption {}
_c()
current_cancel = null
}
}
function try_next() {
if (cancelled) return
if (index >= length(requestor_array)) {
callback(null, make_reason(factory, 'All requestors failed.'))
return
}
def requestor = requestor_array[index]
index = index + 1
var _run = function() {
current_cancel = requestor(function(val, reason) {
if (cancelled) return
current_cancel = null
if (val != null) {
callback(val)
} else {
try_next()
}
}, value)
} disruption {
try_next()
}
_run()
}
try_next()
return cancel
}
}
// parallel(requestor_array, throttle, need)
// Runs requestors in parallel, collecting all results.
function parallel(requestor_array, throttle, need) {
def factory = 'parallel'
if (!is_array(requestor_array)) {
print(make_reason(factory, 'Not an array.', requestor_array).message + '\n')
disrupt
}
check_requestors(requestor_array, factory)
def len = length(requestor_array)
if (len == 0)
return function(callback, value) { callback([]) }
var _need = need
if (_need == null) _need = len
if (!is_number(_need) || _need < 0 || _need > len) {
print(make_reason(factory, 'Bad need.', _need).message + '\n')
disrupt
}
if (throttle != null && (!is_number(throttle) || throttle < 1)) {
print(make_reason(factory, 'Bad throttle.', throttle).message + '\n')
disrupt
}
return function parallel_requestor(callback, value) {
check_callback(callback, factory)
def results = array(len)
def cancel_list = array(len)
var next_index = 0
var successes = 0
var failures = 0
var finished = false
function cancel(reason) {
if (finished) return
finished = true
arrfor(cancel_list, c => {
var _c = function() { if (is_function(c)) c(reason) } disruption {}
_c()
})
}
function start_one() {
if (finished || next_index >= len) return
def idx = next_index
next_index = next_index + 1
def requestor = requestor_array[idx]
var _run = function() {
cancel_list[idx] = requestor(function(val, reason) {
if (finished) return
cancel_list[idx] = null
if (val != null) {
results[idx] = val
successes = successes + 1
if (successes >= _need) {
finished = true
cancel(make_reason(factory, 'Finished.'))
callback(results)
return
}
} else {
failures = failures + 1
if (failures > len - _need) {
cancel(reason)
callback(null, reason || make_reason(factory, 'Too many failures.'))
return
}
}
start_one()
}, value)
} disruption {
failures = failures + 1
if (failures > len - _need) {
cancel(make_reason(factory, 'Requestor threw.'))
callback(null, make_reason(factory, 'Requestor threw.'))
return
}
start_one()
}
_run()
}
def concurrent = throttle ? min(throttle, len) : len
var i = 0
while (i < concurrent) { start_one(); i = i + 1 }
return cancel
}
}
// race(requestor_array, throttle, need)
// Runs requestors in parallel, returns first success(es).
function race(requestor_array, throttle, need) {
def factory = 'race'
if (!is_array(requestor_array) || length(requestor_array) == 0) {
print(make_reason(factory, 'Empty requestor array.').message + '\n')
disrupt
}
check_requestors(requestor_array, factory)
def len = length(requestor_array)
var _need = need
if (_need == null) _need = 1
if (!is_number(_need) || _need < 1 || _need > len) {
print(make_reason(factory, 'Bad need.', _need).message + '\n')
disrupt
}
if (throttle != null && (!is_number(throttle) || throttle < 1)) {
print(make_reason(factory, 'Bad throttle.', throttle).message + '\n')
disrupt
}
return function race_requestor(callback, value) {
check_callback(callback, factory)
def results = array(len)
def cancel_list = array(len)
var next_index = 0
var successes = 0
var failures = 0
var finished = false
function cancel(reason) {
if (finished) return
finished = true
arrfor(cancel_list, c => {
var _c = function() { if (is_function(c)) c(reason) } disruption {}
_c()
})
}
function start_one() {
if (finished || next_index >= len) return
def idx = next_index
next_index = next_index + 1
def requestor = requestor_array[idx]
var _run = function() {
cancel_list[idx] = requestor(function(val, reason) {
if (finished) return
cancel_list[idx] = null
if (val != null) {
results[idx] = val
successes = successes + 1
if (successes >= _need) {
cancel(make_reason(factory, 'Winner.'))
if (_need == 1) {
callback(val)
} else {
callback(results)
}
return
}
} else {
failures = failures + 1
if (failures > len - _need) {
cancel(reason)
callback(null, reason || make_reason(factory, 'All failed.'))
return
}
}
start_one()
}, value)
} disruption {
failures = failures + 1
if (failures > len - _need) {
cancel(make_reason(factory, 'Requestor threw.'))
callback(null, make_reason(factory, 'Requestor threw.'))
return
}
start_one()
}
_run()
}
def concurrent = throttle ? min(throttle, len) : len
var i = 0
while (i < concurrent) { start_one(); i = i + 1 }
return cancel
}
}
// sequence(requestor_array)
// Runs requestors one at a time, passing result to next.
function sequence(requestor_array) {
def factory = 'sequence'
if (!is_array(requestor_array)) {
print(make_reason(factory, 'Not an array.', requestor_array).message + '\n')
disrupt
}
check_requestors(requestor_array, factory)
if (length(requestor_array) == 0)
return function(callback, value) { callback(value) }
return function sequence_requestor(callback, value) {
check_callback(callback, factory)
var index = 0
var current_cancel = null
var cancelled = false
function cancel(reason) {
var _c = null
cancelled = true
if (current_cancel) {
_c = function() { current_cancel(reason) } disruption {}
_c()
current_cancel = null
}
}
function run_next(val) {
if (cancelled) return
if (index >= length(requestor_array)) {
callback(val)
return
}
def requestor = requestor_array[index]
index = index + 1
var _run = function() {
current_cancel = requestor(function(result, reason) {
if (cancelled) return
current_cancel = null
if (result == null) {
callback(null, reason)
} else {
run_next(result)
}
}, val)
} disruption {
callback(null, make_reason(factory, 'Requestor threw.'))
}
_run()
}
run_next(value)
return cancel
}
}
// requestorize(unary)
// Converts a unary function into a requestor.
function requestorize(unary) {
def factory = 'requestorize'
if (!is_function(unary)) {
print(make_reason(factory, 'Not a function.', unary).message + '\n')
disrupt
}
return function requestorized(callback, value) {
check_callback(callback, factory)
var _run = function() {
def result = unary(value)
callback(result == null ? true : result)
} disruption {
callback(null, make_reason(factory, 'Function threw.'))
}
_run()
}
}
return {
fallback: fallback,
parallel: parallel,
race: race,
sequence: sequence,
requestorize: requestorize,
is_requestor: is_requestor,
check_callback: check_callback
}