// pronto.cm
// Extremism in the pursuit of parallelism is no vice.
// Based on Douglas Crockford's parseq, adapted for Cell.
// Time is in seconds.
|
|
|
|
// Builds the Error value that pronto factories throw or pass to callbacks.
//   factory  - name of the factory reporting the problem; prefixed with 'pronto.'
//   excuse   - optional explanation, appended to the message after ': '
//   evidence - optional value stored on the error as `evidence`
// Returns the Error.
function make_reason(factory, excuse, evidence) {
  var detail = ''
  if (excuse) detail = ': ' + excuse
  def reason = Error(`pronto.${factory}${detail}`)
  reason.evidence = evidence
  return reason
}
|
|
|
|
// Reports whether fn has the shape of a requestor: a function whose
// length() is 1 or 2 (presumably its declared parameter count, i.e.
// (callback) or (callback, value)).
function is_requestor(fn) {
  if (!is_function(fn)) return false
  def arity = length(fn)
  return arity == 1 || arity == 2
}
|
|
|
|
// Validates that list is an array containing only requestors.
//   list    - value to validate
//   factory - factory name used in the thrown reason
// Throws a 'Bad requestor array.' reason carrying list as evidence.
function check_requestors(list, factory) {
  def acceptable = is_array(list) && !some(list, candidate => !is_requestor(candidate))
  if (acceptable) return
  throw make_reason(factory, 'Bad requestor array.', list)
}
|
|
|
|
// Validates that cb is a function whose length() is exactly 2,
// matching the (value, reason) callback shape used in this file.
//   cb      - value to validate
//   factory - factory name used in the thrown reason
// Throws a 'Not a callback.' reason carrying cb as evidence.
function check_callback(cb, factory) {
  if (is_function(cb) && length(cb) == 2) return
  throw make_reason(factory, 'Not a callback.', cb)
}
|
|
|
|
// fallback(requestor_array)
// Returns a requestor that tries each requestor in order until one succeeds.
// Every requestor receives the same initial value. The first non-null result
// is delivered to the callback; a null result or a thrown exception moves on
// to the next requestor. When the list is exhausted the callback receives
// (null, reason) with the message 'All requestors failed.'.
// Throws a pronto.fallback reason if requestor_array is not a non-empty
// array of requestors.
// The returned requestor yields a cancel(reason) function.
function fallback(requestor_array) {
  def factory = 'fallback'
  if (!is_array(requestor_array) || length(requestor_array) == 0)
    throw make_reason(factory, 'Empty requestor array.')
  check_requestors(requestor_array, factory)

  return function fallback_requestor(callback, value) {
    check_callback(callback, factory)
    var index = 0
    var current_cancel = null
    var cancelled = false

    // Stops the fallback chain and forwards the reason to the requestor that
    // is currently running. Errors thrown by its cancel function are ignored.
    function cancel(reason) {
      cancelled = true
      if (current_cancel) {
        try { current_cancel(reason) } catch (_) {}
        current_cancel = null
      }
    }

    // Starts the next untried requestor, or reports overall failure.
    function try_next() {
      if (cancelled) return
      if (index >= length(requestor_array)) {
        callback(null, make_reason(factory, 'All requestors failed.'))
        return
      }

      def requestor = requestor_array[index]
      index += 1

      // Becomes true once this particular requestor has called back.
      var settled = false

      try {
        def canceller = requestor(function(val, reason) {
          if (cancelled) return
          settled = true
          current_cancel = null
          if (val != null) {
            callback(val)
          } else {
            try_next()
          }
        }, value)
        // A requestor that called back synchronously has already handed
        // control onward; a later requestor may have stored its own cancel
        // function by now. Only record this one while it is still pending,
        // so cancel() always targets the requestor that is actually running.
        if (!settled) current_cancel = canceller
      } catch (ex) {
        try_next()
      }
    }

    try_next()
    return cancel
  }
}
|
|
|
|
// parallel(requestor_array, throttle, need)
// Returns a requestor that runs the requestors concurrently and delivers an
// array of results indexed like requestor_array.
//   requestor_array - array of requestors; an empty array yields a requestor
//                     that immediately delivers []
//   throttle        - optional number >= 1; caps how many requestors are
//                     started up front, each completion starting one more
//   need            - optional number of successes required, 0..length;
//                     defaults to the number of requestors
// The callback fires as soon as `need` requestors have produced a non-null
// value; the remaining requestors are then cancelled. It fails with
// (null, reason) once more than length - need requestors have failed.
// Throws a pronto.parallel reason on a bad array, need or throttle.
// NOTE(review): with need == 0 and every requestor failing, neither threshold
// is ever crossed, so the callback is never invoked. Confirm whether
// need == 0 should be accepted.
function parallel(requestor_array, throttle, need) {
  def factory = 'parallel'
  if (!is_array(requestor_array))
    throw make_reason(factory, 'Not an array.', requestor_array)
  check_requestors(requestor_array, factory)

  // Named `count`, not `length`: a local called `length` would shadow the
  // length() function inside its own initializer.
  def count = length(requestor_array)
  if (count == 0)
    return function(callback, value) { callback([]) }

  if (need == null) need = count
  if (!is_number(need) || need < 0 || need > count)
    throw make_reason(factory, 'Bad need.', need)

  if (throttle != null && (!is_number(throttle) || throttle < 1))
    throw make_reason(factory, 'Bad throttle.', throttle)

  return function parallel_requestor(callback, value) {
    check_callback(callback, factory)
    def results = array(count)
    def cancel_list = array(count)
    var next_index = 0
    var successes = 0
    var failures = 0
    var finished = false

    // Marks the run as finished and forwards the reason to every recorded
    // cancel function. Errors thrown by a cancel function are ignored.
    function cancel(reason) {
      if (finished) return
      finished = true
      arrfor(cancel_list, c => {
        try { if (is_function(c)) c(reason) } catch (_) {}
      })
    }

    // Starts the next requestor that has not been started yet, if any.
    function start_one() {
      if (finished || next_index >= count) return
      def idx = next_index
      next_index += 1
      def requestor = requestor_array[idx]

      try {
        cancel_list[idx] = requestor(function(val, reason) {
          if (finished) return
          cancel_list[idx] = null

          if (val != null) {
            results[idx] = val
            successes += 1
            if (successes >= need) {
              // cancel() sets `finished` itself. Setting it beforehand would
              // make cancel() return early and leave the still-running
              // requestors uncancelled.
              cancel(make_reason(factory, 'Finished.'))
              callback(results)
              return
            }
          } else {
            failures += 1
            if (failures > count - need) {
              cancel(reason)
              callback(null, reason || make_reason(factory, 'Too many failures.'))
              return
            }
          }

          start_one()
        }, value)
      } catch (ex) {
        // A requestor that throws counts as a failure.
        failures += 1
        if (failures > count - need) {
          cancel(ex)
          callback(null, ex)
          return
        }
        start_one()
      }
    }

    def concurrent = throttle ? min(throttle, count) : count
    for (var i = 0; i < concurrent; i++) start_one()

    return cancel
  }
}
|
|
|
|
// race(requestor_array, throttle, need)
// Returns a requestor that runs the requestors concurrently and finishes as
// soon as enough of them have succeeded.
//   requestor_array - non-empty array of requestors
//   throttle        - optional number >= 1; caps how many requestors are
//                     started up front, each completion starting one more
//   need            - optional number of successes required, 1..length;
//                     defaults to 1
// When `need` successes are reached the remaining requestors are cancelled.
// With need == 1 the callback receives the winning value itself; otherwise it
// receives the results array indexed like requestor_array. It fails with
// (null, reason) once more than length - need requestors have failed.
// Throws a pronto.race reason on a bad array, need or throttle.
function race(requestor_array, throttle, need) {
  def factory = 'race'
  if (!is_array(requestor_array) || length(requestor_array) == 0)
    throw make_reason(factory, 'Empty requestor array.')
  check_requestors(requestor_array, factory)

  // Named `count`, not `length`: a local called `length` would shadow the
  // length() function inside its own initializer.
  def count = length(requestor_array)
  if (need == null) need = 1
  if (!is_number(need) || need < 1 || need > count)
    throw make_reason(factory, 'Bad need.', need)

  if (throttle != null && (!is_number(throttle) || throttle < 1))
    throw make_reason(factory, 'Bad throttle.', throttle)

  return function race_requestor(callback, value) {
    check_callback(callback, factory)
    def results = array(count)
    def cancel_list = array(count)
    var next_index = 0
    var successes = 0
    var failures = 0
    var finished = false

    // Marks the race as finished and forwards the reason to every recorded
    // cancel function. Errors thrown by a cancel function are ignored.
    function cancel(reason) {
      if (finished) return
      finished = true
      arrfor(cancel_list, c => {
        try { if (is_function(c)) c(reason) } catch (_) {}
      })
    }

    // Starts the next requestor that has not been started yet, if any.
    function start_one() {
      if (finished || next_index >= count) return
      def idx = next_index
      next_index += 1
      def requestor = requestor_array[idx]

      try {
        cancel_list[idx] = requestor(function(val, reason) {
          if (finished) return
          cancel_list[idx] = null

          if (val != null) {
            results[idx] = val
            successes += 1
            if (successes >= need) {
              cancel(make_reason(factory, 'Winner.'))
              if (need == 1) {
                callback(val)
              } else {
                callback(results)
              }
              return
            }
          } else {
            failures += 1
            if (failures > count - need) {
              cancel(reason)
              callback(null, reason || make_reason(factory, 'All failed.'))
              return
            }
          }

          start_one()
        }, value)
      } catch (ex) {
        // A requestor that throws counts as a failure.
        failures += 1
        if (failures > count - need) {
          cancel(ex)
          callback(null, ex)
          return
        }
        start_one()
      }
    }

    def concurrent = throttle ? min(throttle, count) : count
    for (var i = 0; i < concurrent; i++) start_one()

    return cancel
  }
}
|
|
|
|
// sequence(requestor_array)
// Returns a requestor that runs the requestors one at a time, passing each
// non-null result to the next requestor as its value. The last result is
// delivered to the callback. A null result or a thrown exception ends the
// sequence and is reported to the callback as (null, reason).
// An empty array yields a requestor that passes its value straight through.
// Throws a pronto.sequence reason if requestor_array is not an array of
// requestors.
// The returned requestor yields a cancel(reason) function.
function sequence(requestor_array) {
  def factory = 'sequence'
  if (!is_array(requestor_array))
    throw make_reason(factory, 'Not an array.', requestor_array)
  check_requestors(requestor_array, factory)

  if (length(requestor_array) == 0)
    return function(callback, value) { callback(value) }

  return function sequence_requestor(callback, value) {
    check_callback(callback, factory)
    var index = 0
    var current_cancel = null
    var cancelled = false

    // Stops the sequence and forwards the reason to the requestor that is
    // currently running. Errors thrown by its cancel function are ignored.
    function cancel(reason) {
      cancelled = true
      if (current_cancel) {
        try { current_cancel(reason) } catch (_) {}
        current_cancel = null
      }
    }

    // Runs the next requestor with val, or delivers val when none remain.
    function run_next(val) {
      if (cancelled) return
      if (index >= length(requestor_array)) {
        callback(val)
        return
      }

      def requestor = requestor_array[index]
      index += 1

      // Becomes true once this particular requestor has called back.
      var settled = false

      try {
        def canceller = requestor(function(result, reason) {
          if (cancelled) return
          settled = true
          current_cancel = null
          if (result == null) {
            callback(null, reason)
          } else {
            run_next(result)
          }
        }, val)
        // A requestor that called back synchronously (for example one made
        // by requestorize) has already started the next step, which may have
        // stored its own cancel function. Only record this one while it is
        // still pending, so cancel() targets the step that is actually
        // running.
        if (!settled) current_cancel = canceller
      } catch (ex) {
        callback(null, ex)
      }
    }

    run_next(value)
    return cancel
  }
}
|
|
|
|
// requestorize(unary)
// Wraps a one-argument function as a requestor.
// The requestor calls unary(value) and passes the result to the callback;
// a null result is replaced by true so it is not read as a failure.
// An exception thrown inside the try block (including one thrown by the
// callback itself) is reported as callback(null, exception).
// Throws a pronto.requestorize reason if unary is not a function.
// The requestor returns no cancel function.
function requestorize(unary) {
  def factory = 'requestorize'
  if (!is_function(unary))
    throw make_reason(factory, 'Not a function.', unary)

  function requestorized(callback, value) {
    check_callback(callback, factory)
    try {
      var outcome = unary(value)
      if (outcome == null) outcome = true
      callback(outcome)
    } catch (ex) {
      callback(null, ex)
    }
  }

  return requestorized
}
|
|
|
|
// Module result: the requestor factories plus the two validation helpers
// that are shared with callers. make_reason and check_requestors are not
// part of the returned object.
return {
  fallback,
  parallel,
  race,
  sequence,
  requestorize,
  is_requestor,
  check_callback
}
|