// pronto.cm
// Extremism in the pursuit of parallelism is no vice.
// Based on Douglas Crockford's parseq, adapted for Cell.
// Time is in seconds.
//
// Conventions used throughout this file:
//   requestor: function(callback, value). Starts a unit of work and may
//              return a cancel function, cancel(reason).
//   callback:  function(value, reason). A non-null value means success;
//              a null value means failure and reason says why.

// make_reason(factory, excuse, evidence)
// Builds the failure record handed to callbacks: a message of the form
// 'pronto.<factory>[: <excuse>]' plus the caller-supplied evidence.
function make_reason(factory, excuse, evidence) {
  var msg = 'pronto.' + factory
  if (excuse) msg = msg + ': ' + excuse
  return { message: msg, evidence: evidence }
}

// is_requestor(fn)
// True when fn is a function declaring one or two parameters.
function is_requestor(fn) {
  return is_function(fn) && (length(fn) == 1 || length(fn) == 2)
}

// check_requestors(list, factory)
// Prints a diagnostic and disrupts unless list is an array whose elements
// all pass is_requestor.
function check_requestors(list, factory) {
  if (!is_array(list) || some(list, r => !is_requestor(r))) {
    print(make_reason(factory, 'Bad requestor array.', list).message + '\n')
    disrupt
  }
}

// check_callback(cb, factory)
// Prints a diagnostic and disrupts unless cb is a function declaring
// exactly two parameters.
function check_callback(cb, factory) {
  if (!is_function(cb) || length(cb) != 2) {
    print(make_reason(factory, 'Not a callback.', cb).message + '\n')
    disrupt
  }
}

// fallback(requestor_array)
// Tries each requestor in order until one succeeds. Every requestor is
// given the same initial value. The callback receives the first non-null
// result, or a failure reason once every requestor has failed.
// Disrupts if requestor_array is not a non-empty array of requestors.
function fallback(requestor_array) {
  def factory = 'fallback'
  if (!is_array(requestor_array) || length(requestor_array) == 0) {
    print(make_reason(factory, 'Empty requestor array.').message + '\n')
    disrupt
  }
  check_requestors(requestor_array, factory)

  return function fallback_requestor(callback, value) {
    check_callback(callback, factory)
    var index = 0
    var current_cancel = null
    var cancelled = false

    // Stops the fallback: no further requestor is started and the one in
    // flight (if any) is asked to cancel. A disruption raised by that
    // cancel function is deliberately ignored (best effort).
    function cancel(reason) {
      cancelled = true
      if (current_cancel) {
        var _c = function() { current_cancel(reason) } disruption {}
        _c()
        current_cancel = null
      }
    }

    function try_next() {
      if (cancelled) return
      if (index >= length(requestor_array)) {
        callback(null, make_reason(factory, 'All requestors failed.'))
        return
      }
      def requestor = requestor_array[index]
      index = index + 1

      // Set once this particular requestor has reported (or disrupted).
      // It makes repeated callbacks harmless and tells the code below
      // whether the requestor completed synchronously.
      var settled = false

      var _run = function() {
        def cancel_fn = requestor(function(val, reason) {
          if (cancelled || settled) return
          settled = true
          current_cancel = null
          if (val != null) {
            callback(val)
          } else {
            try_next()
          }
        }, value)
        // If the requestor already called back, a later requestor may have
        // stored its own cancel function by now; storing this one would
        // overwrite it with a stale (or null) value.
        if (!settled) current_cancel = cancel_fn
      } disruption {
        // A disruption after the requestor reported came from downstream
        // (the callback or a later step), not from this requestor, so it
        // must not be counted as this requestor failing.
        if (settled) return
        settled = true
        current_cancel = null
        try_next()
      }
      _run()
    }

    try_next()
    return cancel
  }
}

// parallel(requestor_array, throttle, need)
// Runs requestors in parallel, collecting all results.
//   throttle: optional maximum number of requestors in flight (>= 1).
//   need:     number of successes required; defaults to all of them.
// The callback receives an array with each successful result stored at
// its requestor's index as soon as `need` successes have arrived, or a
// failure reason once `need` can no longer be reached. Requestors still
// running at that point are cancelled.
// Disrupts on a bad requestor array, need or throttle.
function parallel(requestor_array, throttle, need) {
  def factory = 'parallel'
  if (!is_array(requestor_array)) {
    print(make_reason(factory, 'Not an array.', requestor_array).message + '\n')
    disrupt
  }
  check_requestors(requestor_array, factory)

  def len = length(requestor_array)
  // Nothing to run: succeed at once with an empty result array.
  if (len == 0) return function(callback, value) { callback([]) }

  var _need = need
  if (_need == null) _need = len
  if (!is_number(_need) || _need < 0 || _need > len) {
    print(make_reason(factory, 'Bad need.', _need).message + '\n')
    disrupt
  }
  if (throttle != null && (!is_number(throttle) || throttle < 1)) {
    print(make_reason(factory, 'Bad throttle.', throttle).message + '\n')
    disrupt
  }

  return function parallel_requestor(callback, value) {
    check_callback(callback, factory)
    def results = array(len)
    def cancel_list = array(len)
    var next_index = 0
    var successes = 0
    var failures = 0
    var finished = false

    // Marks the run finished and asks every requestor still in flight to
    // cancel. Disruptions raised by individual cancel functions are
    // deliberately ignored (best effort). Calling it again is a no-op.
    function cancel(reason) {
      if (finished) return
      finished = true
      arrfor(cancel_list, c => {
        var _c = function() { if (is_function(c)) c(reason) } disruption {}
        _c()
      })
    }

    function start_one() {
      if (finished) return
      if (successes + failures == len) {
        // Everything settled without reaching either threshold. That only
        // happens when need is 0 and nothing succeeded; zero successes
        // were required, so report success instead of never calling back.
        finished = true
        callback(results)
        return
      }
      if (next_index >= len) return
      def idx = next_index
      next_index = next_index + 1
      def requestor = requestor_array[idx]

      // Set once this requestor has reported (or disrupted); guards
      // against repeated callbacks and detects synchronous completion.
      var settled = false

      var _run = function() {
        def cancel_fn = requestor(function(val, reason) {
          if (finished || settled) return
          settled = true
          cancel_list[idx] = null
          if (val != null) {
            results[idx] = val
            successes = successes + 1
            if (successes >= _need) {
              // cancel() sets `finished` itself. Setting it beforehand
              // would turn cancel() into a no-op and leave the remaining
              // requestors running.
              cancel(make_reason(factory, 'Finished.'))
              callback(results)
              return
            }
          } else {
            failures = failures + 1
            if (failures > len - _need) {
              cancel(reason)
              callback(null, reason || make_reason(factory, 'Too many failures.'))
              return
            }
          }
          start_one()
        }, value)
        // Do not keep a cancel function for a requestor that has already
        // completed synchronously.
        if (!settled) cancel_list[idx] = cancel_fn
      } disruption {
        // Ignore disruptions that come from downstream of a requestor that
        // already reported, or that arrive after the run has finished.
        if (finished || settled) return
        settled = true
        cancel_list[idx] = null
        failures = failures + 1
        if (failures > len - _need) {
          cancel(make_reason(factory, 'Requestor threw.'))
          callback(null, make_reason(factory, 'Requestor threw.'))
          return
        }
        start_one()
      }
      _run()
    }

    // Start the initial batch; each completion starts the next requestor.
    def concurrent = throttle ? min(throttle, len) : len
    var i = 0
    while (i < concurrent) { start_one(); i = i + 1 }

    return cancel
  }
}

// race(requestor_array, throttle, need)
// Runs requestors in parallel, returns first success(es).
//   throttle: optional maximum number of requestors in flight (>= 1).
//   need:     number of winners required; defaults to 1.
// With need == 1 the callback receives the winning value itself;
// otherwise it receives an array with each winner stored at its
// requestor's index. Requestors still running when the race is decided
// are cancelled. The callback receives a failure reason once `need`
// successes can no longer be reached.
// Disrupts on an empty or bad requestor array, bad need or bad throttle.
function race(requestor_array, throttle, need) {
  def factory = 'race'
  if (!is_array(requestor_array) || length(requestor_array) == 0) {
    print(make_reason(factory, 'Empty requestor array.').message + '\n')
    disrupt
  }
  check_requestors(requestor_array, factory)

  def len = length(requestor_array)
  var _need = need
  if (_need == null) _need = 1
  if (!is_number(_need) || _need < 1 || _need > len) {
    print(make_reason(factory, 'Bad need.', _need).message + '\n')
    disrupt
  }
  if (throttle != null && (!is_number(throttle) || throttle < 1)) {
    print(make_reason(factory, 'Bad throttle.', throttle).message + '\n')
    disrupt
  }

  return function race_requestor(callback, value) {
    check_callback(callback, factory)
    def results = array(len)
    def cancel_list = array(len)
    var next_index = 0
    var successes = 0
    var failures = 0
    var finished = false

    // Marks the race finished and asks every requestor still in flight to
    // cancel. Disruptions raised by individual cancel functions are
    // deliberately ignored (best effort). Calling it again is a no-op.
    function cancel(reason) {
      if (finished) return
      finished = true
      arrfor(cancel_list, c => {
        var _c = function() { if (is_function(c)) c(reason) } disruption {}
        _c()
      })
    }

    function start_one() {
      if (finished || next_index >= len) return
      def idx = next_index
      next_index = next_index + 1
      def requestor = requestor_array[idx]

      // Set once this requestor has reported (or disrupted); guards
      // against repeated callbacks and detects synchronous completion.
      var settled = false

      var _run = function() {
        def cancel_fn = requestor(function(val, reason) {
          if (finished || settled) return
          settled = true
          cancel_list[idx] = null
          if (val != null) {
            results[idx] = val
            successes = successes + 1
            if (successes >= _need) {
              cancel(make_reason(factory, 'Winner.'))
              if (_need == 1) {
                callback(val)
              } else {
                callback(results)
              }
              return
            }
          } else {
            failures = failures + 1
            if (failures > len - _need) {
              cancel(reason)
              callback(null, reason || make_reason(factory, 'All failed.'))
              return
            }
          }
          start_one()
        }, value)
        // Do not keep a cancel function for a requestor that has already
        // completed synchronously.
        if (!settled) cancel_list[idx] = cancel_fn
      } disruption {
        // Ignore disruptions that come from downstream of a requestor that
        // already reported, or that arrive after the race has finished.
        if (finished || settled) return
        settled = true
        cancel_list[idx] = null
        failures = failures + 1
        if (failures > len - _need) {
          cancel(make_reason(factory, 'Requestor threw.'))
          callback(null, make_reason(factory, 'Requestor threw.'))
          return
        }
        start_one()
      }
      _run()
    }

    // Start the initial batch; each completion starts the next requestor.
    def concurrent = throttle ? min(throttle, len) : len
    var i = 0
    while (i < concurrent) { start_one(); i = i + 1 }

    return cancel
  }
}

// sequence(requestor_array)
// Runs requestors one at a time, passing result to next. The first
// requestor receives the initial value; the callback receives the last
// requestor's result, or the failure reason of the first requestor that
// fails. An empty array yields a requestor that passes its value through.
// Disrupts if requestor_array is not an array of requestors.
function sequence(requestor_array) {
  def factory = 'sequence'
  if (!is_array(requestor_array)) {
    print(make_reason(factory, 'Not an array.', requestor_array).message + '\n')
    disrupt
  }
  check_requestors(requestor_array, factory)

  if (length(requestor_array) == 0) return function(callback, value) { callback(value) }

  return function sequence_requestor(callback, value) {
    check_callback(callback, factory)
    var index = 0
    var current_cancel = null
    var cancelled = false

    // Stops the sequence: no further requestor is started and the one in
    // flight (if any) is asked to cancel. A disruption raised by that
    // cancel function is deliberately ignored (best effort).
    function cancel(reason) {
      cancelled = true
      if (current_cancel) {
        var _c = function() { current_cancel(reason) } disruption {}
        _c()
        current_cancel = null
      }
    }

    function run_next(val) {
      if (cancelled) return
      if (index >= length(requestor_array)) {
        callback(val)
        return
      }
      def requestor = requestor_array[index]
      index = index + 1

      // Set once this particular requestor has reported (or disrupted).
      var settled = false

      var _run = function() {
        def cancel_fn = requestor(function(result, reason) {
          if (cancelled || settled) return
          settled = true
          current_cancel = null
          if (result == null) {
            callback(null, reason)
          } else {
            run_next(result)
          }
        }, val)
        // A requestor that completed synchronously has already started
        // its successor, whose cancel function is now in current_cancel;
        // storing this return value would overwrite it.
        if (!settled) current_cancel = cancel_fn
      } disruption {
        // A disruption after the requestor reported came from downstream
        // (the callback or a later step); reporting 'Requestor threw.'
        // here would call the callback a second time.
        if (settled || cancelled) return
        settled = true
        current_cancel = null
        callback(null, make_reason(factory, 'Requestor threw.'))
      }
      _run()
    }

    run_next(value)
    return cancel
  }
}

// requestorize(unary)
// Converts a unary function into a requestor. The requestor calls
// unary(value) synchronously and reports its result; a null result is
// reported as true so that it still counts as success. If unary
// disrupts, the callback receives a failure reason instead.
// Disrupts if unary is not a function.
function requestorize(unary) {
  def factory = 'requestorize'
  if (!is_function(unary)) {
    print(make_reason(factory, 'Not a function.', unary).message + '\n')
    disrupt
  }

  return function requestorized(callback, value) {
    check_callback(callback, factory)
    var result = null
    var threw = false

    // Only the call to unary is guarded. The callback is invoked outside
    // the guarded function so that a disruption raised by the callback is
    // not mistaken for unary failing (which would invoke the callback a
    // second time).
    var _run = function() {
      result = unary(value)
    } disruption {
      threw = true
    }
    _run()

    if (threw) {
      callback(null, make_reason(factory, 'Function threw.'))
      return
    }
    callback(result == null ? true : result)
  }
}

return {
  fallback: fallback,
  parallel: parallel,
  race: race,
  sequence: sequence,
  requestorize: requestorize,
  is_requestor: is_requestor,
  check_callback: check_callback
}