add pronto and tests

This commit is contained in:
2025-12-13 19:14:20 -06:00
parent 33dc6b053c
commit b98cef6e8d
3 changed files with 573 additions and 0 deletions

View File

@@ -223,6 +223,15 @@ globalThis.use = shop.use
globalThis.json = use('json')
var time = use('time')
var pronto = use('pronto')
globalThis.fallback = pronto.fallback
globalThis.parallel = pronto.parallel
globalThis.race = pronto.race
globalThis.sequence = pronto.sequence
globalThis.time_limit = pronto.time_limit
globalThis.requestorize = pronto.requestorize
globalThis.objectify = pronto.objectify
var config = {
ar_timer: 60,
actor_memory:0,

427
pronto.cm Normal file
View File

@@ -0,0 +1,427 @@
// pronto.cm
// Extremism in the pursuit of parallelism is no vice.
// Based on Douglas Crockford's parseq, adapted for Cell.
// Time is in seconds.
var os = use('os')
var $_ = os.$_
function make_reason(factory, excuse, evidence) {
def reason = new Error(`pronto.${factory}${excuse ? ': ' + excuse : ''}`)
reason.evidence = evidence
return reason
}
function is_requestor(fn) {
return typeof fn == 'function' && (fn.length == 1 || fn.length == 2)
}
function check_requestors(list, factory) {
if (!Array.isArray(list) || list.some(r => !is_requestor(r)))
throw make_reason(factory, 'Bad requestor array.', list)
}
function check_callback(cb, factory) {
if (typeof cb != 'function' || cb.length != 2)
throw make_reason(factory, 'Not a callback.', cb)
}
// fallback(requestor_array)
// Tries each requestor in order until one succeeds.
function fallback(requestor_array) {
def factory = 'fallback'
if (!Array.isArray(requestor_array) || requestor_array.length == 0)
throw make_reason(factory, 'Empty requestor array.')
check_requestors(requestor_array, factory)
return function fallback_requestor(callback, value) {
check_callback(callback, factory)
let index = 0
let current_cancel = null
let cancelled = false
function cancel(reason) {
cancelled = true
if (current_cancel) {
try { current_cancel(reason) } catch (_) {}
current_cancel = null
}
}
function try_next() {
if (cancelled) return
if (index >= requestor_array.length) {
callback(null, make_reason(factory, 'All requestors failed.'))
return
}
def requestor = requestor_array[index]
index += 1
try {
current_cancel = requestor(function(val, reason) {
if (cancelled) return
current_cancel = null
if (val != null) {
callback(val)
} else {
try_next()
}
}, value)
} catch (ex) {
try_next()
}
}
try_next()
return cancel
}
}
// parallel(requestor_array, throttle, need)
// Runs requestors in parallel, collecting all results.
function parallel(requestor_array, throttle, need) {
def factory = 'parallel'
if (!Array.isArray(requestor_array))
throw make_reason(factory, 'Not an array.', requestor_array)
check_requestors(requestor_array, factory)
def length = requestor_array.length
if (length == 0)
return function(callback, value) { callback([]) }
if (need == null) need = length
if (typeof need != 'number' || need < 0 || need > length)
throw make_reason(factory, 'Bad need.', need)
if (throttle != null && (typeof throttle != 'number' || throttle < 1))
throw make_reason(factory, 'Bad throttle.', throttle)
return function parallel_requestor(callback, value) {
check_callback(callback, factory)
def results = new Array(length)
def cancel_list = new Array(length)
let next_index = 0
let successes = 0
let failures = 0
let finished = false
function cancel(reason) {
if (finished) return
finished = true
cancel_list.forEach(c => {
try { if (typeof c == 'function') c(reason) } catch (_) {}
})
}
function start_one() {
if (finished || next_index >= length) return
def idx = next_index
next_index += 1
def requestor = requestor_array[idx]
try {
cancel_list[idx] = requestor(function(val, reason) {
if (finished) return
cancel_list[idx] = null
if (val != null) {
results[idx] = val
successes += 1
if (successes >= need) {
finished = true
cancel(make_reason(factory, 'Finished.'))
callback(results)
return
}
} else {
failures += 1
if (failures > length - need) {
cancel(reason)
callback(null, reason || make_reason(factory, 'Too many failures.'))
return
}
}
start_one()
}, value)
} catch (ex) {
failures += 1
if (failures > length - need) {
cancel(ex)
callback(null, ex)
return
}
start_one()
}
}
def concurrent = throttle ? Math.min(throttle, length) : length
for (let i = 0; i < concurrent; i++) start_one()
return cancel
}
}
// race(requestor_array, throttle, need)
// Runs requestors in parallel, returns first success(es).
function race(requestor_array, throttle, need) {
def factory = 'race'
if (!Array.isArray(requestor_array) || requestor_array.length == 0)
throw make_reason(factory, 'Empty requestor array.')
check_requestors(requestor_array, factory)
def length = requestor_array.length
if (need == null) need = 1
if (typeof need != 'number' || need < 1 || need > length)
throw make_reason(factory, 'Bad need.', need)
if (throttle != null && (typeof throttle != 'number' || throttle < 1))
throw make_reason(factory, 'Bad throttle.', throttle)
return function race_requestor(callback, value) {
check_callback(callback, factory)
def results = new Array(length)
def cancel_list = new Array(length)
let next_index = 0
let successes = 0
let failures = 0
let finished = false
function cancel(reason) {
if (finished) return
finished = true
cancel_list.forEach(c => {
try { if (typeof c == 'function') c(reason) } catch (_) {}
})
}
function start_one() {
if (finished || next_index >= length) return
def idx = next_index
next_index += 1
def requestor = requestor_array[idx]
try {
cancel_list[idx] = requestor(function(val, reason) {
if (finished) return
cancel_list[idx] = null
if (val != null) {
results[idx] = val
successes += 1
if (successes >= need) {
cancel(make_reason(factory, 'Winner.'))
if (need == 1) {
callback(val)
} else {
callback(results)
}
return
}
} else {
failures += 1
if (failures > length - need) {
cancel(reason)
callback(null, reason || make_reason(factory, 'All failed.'))
return
}
}
start_one()
}, value)
} catch (ex) {
failures += 1
if (failures > length - need) {
cancel(ex)
callback(null, ex)
return
}
start_one()
}
}
def concurrent = throttle ? Math.min(throttle, length) : length
for (let i = 0; i < concurrent; i++) start_one()
return cancel
}
}
// sequence(requestor_array)
// Runs requestors one at a time, passing result to next.
function sequence(requestor_array) {
def factory = 'sequence'
if (!Array.isArray(requestor_array))
throw make_reason(factory, 'Not an array.', requestor_array)
check_requestors(requestor_array, factory)
if (requestor_array.length == 0)
return function(callback, value) { callback(value) }
return function sequence_requestor(callback, value) {
check_callback(callback, factory)
let index = 0
let current_cancel = null
let cancelled = false
function cancel(reason) {
cancelled = true
if (current_cancel) {
try { current_cancel(reason) } catch (_) {}
current_cancel = null
}
}
function run_next(val) {
if (cancelled) return
if (index >= requestor_array.length) {
callback(val)
return
}
def requestor = requestor_array[index]
index += 1
try {
current_cancel = requestor(function(result, reason) {
if (cancelled) return
current_cancel = null
if (result == null) {
callback(null, reason)
} else {
run_next(result)
}
}, val)
} catch (ex) {
callback(null, ex)
}
}
run_next(value)
return cancel
}
}
// time_limit(requestor, seconds)
// Wraps a requestor with a time limit.
function time_limit(requestor, seconds) {
def factory = 'time_limit'
if (!is_requestor(requestor))
throw make_reason(factory, 'Not a requestor.', requestor)
if (typeof seconds != 'number' || seconds <= 0)
throw make_reason(factory, 'Bad time limit.', seconds)
return function time_limit_requestor(callback, value) {
check_callback(callback, factory)
let finished = false
let requestor_cancel = null
let timer_cancel = null
function cancel(reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
if (requestor_cancel) {
try { requestor_cancel(reason) } catch (_) {}
requestor_cancel = null
}
}
timer_cancel = $_.delay(function() {
if (finished) return
def reason = make_reason(factory, 'Timeout.', seconds)
if (requestor_cancel) {
try { requestor_cancel(reason) } catch (_) {}
requestor_cancel = null
}
finished = true
callback(null, reason)
}, seconds)
try {
requestor_cancel = requestor(function(val, reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
callback(val, reason)
}, value)
} catch (ex) {
cancel(ex)
callback(null, ex)
}
return function(reason) {
if (requestor_cancel) {
try { requestor_cancel(reason) } catch (_) {}
requestor_cancel = null
}
}
}
}
// requestorize(unary)
// Converts a unary function into a requestor.
function requestorize(unary) {
def factory = 'requestorize'
if (typeof unary != 'function')
throw make_reason(factory, 'Not a function.', unary)
return function requestorized(callback, value) {
check_callback(callback, factory)
try {
def result = unary(value)
callback(result == null ? true : result)
} catch (ex) {
callback(null, ex)
}
}
}
// objectify(factory_fn)
// Converts a factory that takes arrays to one that takes objects.
function objectify(factory_fn) {
def factory = 'objectify'
if (typeof factory_fn != 'function')
throw make_reason(factory, 'Not a factory.', factory_fn)
return function objectified_factory(object_of_requestors, ...rest) {
if (typeof object_of_requestors != 'object' || Array.isArray(object_of_requestors))
throw make_reason(factory, 'Expected an object.', object_of_requestors)
def keys = Object.keys(object_of_requestors)
def requestor_array = keys.map(k => object_of_requestors[k])
def inner_requestor = factory_fn(requestor_array, ...rest)
return function(callback, value) {
return inner_requestor(function(results, reason) {
if (results == null) {
callback(null, reason)
} else if (Array.isArray(results)) {
def obj = {}
keys.forEach((k, i) => { obj[k] = results[i] })
callback(obj, reason)
} else {
callback(results, reason)
}
}, value)
}
}
}
return {
fallback,
parallel,
race,
sequence,
time_limit,
requestorize,
objectify
}

137
tests/pronto.cm Normal file
View File

@@ -0,0 +1,137 @@
// Test pronto functions
// Tests for fallback, parallel, race, sequence, time_limit, requestorize, objectify
var test_count = 0
function make_requestor(name, delay_seconds, should_succeed) {
return function(callback, value) {
log.console(`Starting ${name} with value: ${value}`)
if (delay_seconds > 0) {
$_.delay(function() {
if (should_succeed) {
log.console(`${name} succeeded with: ${value + 1}`)
callback(value + 1)
} else {
log.console(`${name} failed`)
callback(null, `${name} error`)
}
}, delay_seconds)
} else {
if (should_succeed) {
log.console(`${name} succeeded immediately with: ${value + 1}`)
callback(value + 1)
} else {
log.console(`${name} failed immediately`)
callback(null, `${name} error`)
}
}
}
}
return {
test_fallback: function() {
log.console("Testing fallback...")
var req1 = make_requestor("fallback_req1", 0.1, false) // fails
var req2 = make_requestor("fallback_req2", 0.1, true) // succeeds
var req3 = make_requestor("fallback_req3", 0.1, true) // should not run
fallback([req1, req2, req3])(function(result, reason) {
if (result != null) {
log.console(`Fallback succeeded: ${result}`)
} else {
log.console(`Fallback failed: ${reason}`)
}
}, 10)
},
test_parallel: function() {
log.console("Testing parallel...")
var req1 = make_requestor("parallel_req1", 0.2, true)
var req2 = make_requestor("parallel_req2", 0.1, true)
var req3 = make_requestor("parallel_req3", 0.3, true)
parallel([req1, req2, req3])(function(results, reason) {
if (results != null) {
log.console(`Parallel results: ${results}`)
} else {
log.console(`Parallel failed: ${reason}`)
}
}, 100)
},
test_race: function() {
log.console("Testing race...")
var req1 = make_requestor("race_req1", 0.3, true) // slow
var req2 = make_requestor("race_req2", 0.1, true) // fast winner
var req3 = make_requestor("race_req3", 0.2, true) // medium
race([req1, req2, req3])(function(result, reason) {
if (result != null) {
log.console(`Race winner: ${result}`)
} else {
log.console(`Race failed: ${reason}`)
}
}, 200)
},
test_sequence: function() {
log.console("Testing sequence...")
var req1 = make_requestor("seq_req1", 0.1, true)
var req2 = make_requestor("seq_req2", 0.1, true)
var req3 = make_requestor("seq_req3", 0.1, true)
sequence([req1, req2, req3])(function(result, reason) {
if (result != null) {
log.console(`Sequence result: ${result}`)
} else {
log.console(`Sequence failed: ${reason}`)
}
}, 1000)
},
test_time_limit: function() {
log.console("Testing time_limit...")
var slow_req = make_requestor("slow_req", 0.5, true) // takes 0.5s
var timed_req = time_limit(slow_req, 0.2) // 0.2s limit
timed_req(function(result, reason) {
if (result != null) {
log.console(`Time limit succeeded: ${result}`)
} else {
log.console(`Time limit failed: ${reason}`)
}
}, 100)
},
test_requestorize: function() {
log.console("Testing requestorize...")
var add_one = function(x) { return x + 1 }
var req = requestorize(add_one)
req(function(result, reason) {
if (result != null) {
log.console(`Requestorize result: ${result}`)
} else {
log.console(`Requestorize failed: ${reason}`)
}
}, 42)
},
test_objectify: function() {
log.console("Testing objectify...")
var req_a = make_requestor("obj_req_a", 0.1, true)
var req_b = make_requestor("obj_req_b", 0.1, true)
var req_c = make_requestor("obj_req_c", 0.1, true)
var parallel_obj = objectify(parallel)
var req = parallel_obj({a: req_a, b: req_b, c: req_c})
req(function(result, reason) {
if (result != null) {
log.console(`Objectify result: ${json.encode(result)}`)
} else {
log.console(`Objectify failed: ${reason}`)
}
}, 1000)
}
}