// pronto.cm
// Extremism in the pursuit of parallelism is no vice.
// Based on Douglas Crockford's parseq, adapted for Cell.
// Time is in seconds.

// make_reason(factory, excuse, evidence)
// Builds the error value this module throws or passes to callbacks.
// The message is `pronto.<factory>`, followed by `: <excuse>` when an
// excuse is supplied. `evidence` is attached to the error unchanged.
function make_reason(factory, excuse, evidence) {
  def reason = Error(`pronto.${factory}${excuse ? ': ' + excuse : ''}`)
  reason.evidence = evidence
  return reason
}

// is_requestor(fn)
// A requestor is a function declaring one or two parameters
// (callback, or callback and initial value).
function is_requestor(fn) {
  return is_function(fn) && (fn.length == 1 || fn.length == 2)
}

// check_requestors(list, factory)
// Throws a 'Bad requestor array.' reason (with the list as evidence) unless
// `list` is an array whose every element passes is_requestor.
function check_requestors(list, factory) {
  if (!is_array(list) || list.some(r => !is_requestor(r))) {
    throw make_reason(factory, 'Bad requestor array.', list)
  }
}

// check_callback(cb, factory)
// Throws a 'Not a callback.' reason unless `cb` is a function declaring
// exactly two parameters (value, reason).
function check_callback(cb, factory) {
  if (!is_function(cb) || cb.length != 2) {
    throw make_reason(factory, 'Not a callback.', cb)
  }
}

// fallback(requestor_array)
// Tries each requestor in order until one succeeds.
// Every requestor receives the same initial value. The first non-null value
// is passed to the callback; if every requestor fails, the callback receives
// (null, reason) with an 'All requestors failed.' reason.
// Throws at factory time if the array is empty or holds a non-requestor.
// The returned requestor returns a cancel function that stops the attempt
// and forwards the reason to the requestor currently in flight.
function fallback(requestor_array) {
  def factory = 'fallback'
  if (!is_array(requestor_array) || requestor_array.length == 0) {
    throw make_reason(factory, 'Empty requestor array.')
  }
  check_requestors(requestor_array, factory)

  return function fallback_requestor(callback, value) {
    check_callback(callback, factory)
    var index = 0
    var current_cancel = null
    var cancelled = false

    function cancel(reason) {
      cancelled = true
      if (current_cancel) {
        // Cancelling is best effort: a misbehaving cancel function must not
        // break the caller.
        try {
          current_cancel(reason)
        } catch (_) {}
        current_cancel = null
      }
    }

    function try_next() {
      if (cancelled) return
      if (index >= requestor_array.length) {
        callback(null, make_reason(factory, 'All requestors failed.'))
        return
      }
      def requestor = requestor_array[index]
      index += 1
      // Set once this requestor has reported (or thrown), so that a second
      // report is ignored and late bookkeeping can be skipped.
      var settled = false

      // Declares two parameters so it passes check_callback in nested
      // factories; the failure reason itself is not used here.
      function settle(val, reason) {
        if (cancelled || settled) return
        settled = true
        current_cancel = null
        if (val != null) {
          callback(val)
        } else {
          try_next()
        }
      }

      try {
        def requestor_cancel = requestor(settle, value)
        // A requestor that reported synchronously has already handed control
        // to a later requestor; storing its cancel function now would
        // overwrite the live one.
        if (!settled) current_cancel = requestor_cancel
      } catch (ex) {
        // After settling, the exception came from downstream (the caller's
        // callback or a later requestor's handling), not from this
        // requestor failing, so it must not trigger another attempt.
        if (settled) throw ex
        settle(null, ex)
      }
    }

    try_next()
    return cancel
  }
}

// parallel(requestor_array, throttle, need)
// Runs requestors in parallel, collecting all results.
// throttle: optional limit on how many requestors are in flight at once
//   (a number >= 1); when absent, all are started immediately.
// need: how many successes are required (0..length); defaults to all.
// The callback receives the results array (indexed like requestor_array) as
// soon as `need` requestors have succeeded, or (null, reason) once more than
// length - need have failed. Requestors still in flight at that point are
// cancelled. An empty requestor_array yields a requestor that delivers [].
// Throws at factory time on a bad array, need or throttle.
function parallel(requestor_array, throttle, need) {
  def factory = 'parallel'
  if (!is_array(requestor_array)) {
    throw make_reason(factory, 'Not an array.', requestor_array)
  }
  check_requestors(requestor_array, factory)

  def length = requestor_array.length
  if (length == 0) return function(callback, value) { callback([]) }

  if (need == null) need = length
  if (!is_number(need) || need < 0 || need > length) {
    throw make_reason(factory, 'Bad need.', need)
  }
  if (throttle != null && (!is_number(throttle) || throttle < 1)) {
    throw make_reason(factory, 'Bad throttle.', throttle)
  }

  return function parallel_requestor(callback, value) {
    check_callback(callback, factory)
    def results = array(length)
    def cancel_list = array(length)
    var next_index = 0
    var successes = 0
    var failures = 0
    var finished = false

    // Marks the whole operation finished and cancels whatever is in flight.
    // Also returned to the caller as the cancel function.
    function cancel(reason) {
      if (finished) return
      finished = true
      cancel_list.forEach(c => {
        // Best effort: one failing cancel function must not stop the rest.
        try {
          if (is_function(c)) c(reason)
        } catch (_) {}
      })
    }

    function start_one() {
      if (finished || next_index >= length) return
      def idx = next_index
      next_index += 1
      def requestor = requestor_array[idx]
      // Set once this requestor has reported (or thrown) so that a repeated
      // report is not counted twice.
      var settled = false

      function settle(val, reason) {
        if (finished || settled) return
        settled = true
        cancel_list[idx] = null
        if (val != null) {
          results[idx] = val
          successes += 1
          if (successes >= need) {
            // cancel() is what sets `finished`; setting it beforehand would
            // make cancel() return early and leave requestors running.
            cancel(make_reason(factory, 'Finished.'))
            callback(results)
            return
          }
        } else {
          failures += 1
          if (failures > length - need) {
            cancel(reason)
            callback(null, reason || make_reason(factory, 'Too many failures.'))
            return
          }
          if (successes + failures == length) {
            // Only reachable when need is 0 and every requestor failed:
            // nothing was required, so deliver the (empty) results rather
            // than never calling back.
            finished = true
            callback(results)
            return
          }
        }
        start_one()
      }

      try {
        def requestor_cancel = requestor(settle, value)
        // A requestor that already reported synchronously has nothing left
        // to cancel; do not store its cancel function.
        if (!settled) cancel_list[idx] = requestor_cancel
      } catch (ex) {
        // After settling, the exception came from downstream (the caller's
        // callback or a later start), not from this requestor failing.
        if (settled) throw ex
        settle(null, ex)
      }
    }

    def concurrent = throttle ? min(throttle, length) : length
    for (var i = 0; i < concurrent; i++) start_one()
    return cancel
  }
}

// race(requestor_array, throttle, need)
// Runs requestors in parallel, returns first success(es).
// throttle: optional limit on how many requestors are in flight at once
//   (a number >= 1); when absent, all are started immediately.
// need: how many successes are required (1..length); defaults to 1.
// Once `need` requestors have succeeded the rest are cancelled and the
// callback receives the winning value (when need is 1) or the results array
// indexed like requestor_array (when need is greater than 1). Once more than
// length - need requestors have failed, the callback receives (null, reason).
// Throws at factory time on an empty or bad array, bad need or bad throttle.
function race(requestor_array, throttle, need) {
  def factory = 'race'
  if (!is_array(requestor_array) || requestor_array.length == 0) {
    throw make_reason(factory, 'Empty requestor array.')
  }
  check_requestors(requestor_array, factory)

  def length = requestor_array.length
  if (need == null) need = 1
  if (!is_number(need) || need < 1 || need > length) {
    throw make_reason(factory, 'Bad need.', need)
  }
  if (throttle != null && (!is_number(throttle) || throttle < 1)) {
    throw make_reason(factory, 'Bad throttle.', throttle)
  }

  return function race_requestor(callback, value) {
    check_callback(callback, factory)
    def results = array(length)
    def cancel_list = array(length)
    var next_index = 0
    var successes = 0
    var failures = 0
    var finished = false

    // Marks the race finished and cancels whatever is still in flight.
    // Also returned to the caller as the cancel function.
    function cancel(reason) {
      if (finished) return
      finished = true
      cancel_list.forEach(c => {
        // Best effort: one failing cancel function must not stop the rest.
        try {
          if (is_function(c)) c(reason)
        } catch (_) {}
      })
    }

    function start_one() {
      if (finished || next_index >= length) return
      def idx = next_index
      next_index += 1
      def requestor = requestor_array[idx]
      // Set once this requestor has reported (or thrown) so that a repeated
      // report is not counted twice.
      var settled = false

      function settle(val, reason) {
        if (finished || settled) return
        settled = true
        cancel_list[idx] = null
        if (val != null) {
          results[idx] = val
          successes += 1
          if (successes >= need) {
            cancel(make_reason(factory, 'Winner.'))
            if (need == 1) {
              callback(val)
            } else {
              callback(results)
            }
            return
          }
        } else {
          failures += 1
          if (failures > length - need) {
            cancel(reason)
            callback(null, reason || make_reason(factory, 'All failed.'))
            return
          }
        }
        start_one()
      }

      try {
        def requestor_cancel = requestor(settle, value)
        // A requestor that already reported synchronously has nothing left
        // to cancel; do not store its cancel function.
        if (!settled) cancel_list[idx] = requestor_cancel
      } catch (ex) {
        // After settling, the exception came from downstream (the caller's
        // callback or a later start), not from this requestor failing.
        if (settled) throw ex
        settle(null, ex)
      }
    }

    def concurrent = throttle ? min(throttle, length) : length
    for (var i = 0; i < concurrent; i++) start_one()
    return cancel
  }
}

// sequence(requestor_array)
// Runs requestors one at a time, passing result to next.
// The first requestor receives the initial value; each later one receives
// the previous requestor's result. The callback gets the last result, or
// (null, reason) as soon as any requestor fails or throws. An empty array
// yields a requestor that passes the initial value straight through.
// Throws at factory time if the argument is not an array of requestors.
// The returned requestor returns a cancel function that stops the chain and
// forwards the reason to the requestor currently in flight.
function sequence(requestor_array) {
  def factory = 'sequence'
  if (!is_array(requestor_array)) {
    throw make_reason(factory, 'Not an array.', requestor_array)
  }
  check_requestors(requestor_array, factory)
  if (requestor_array.length == 0) return function(callback, value) { callback(value) }

  return function sequence_requestor(callback, value) {
    check_callback(callback, factory)
    var index = 0
    var current_cancel = null
    var cancelled = false

    function cancel(reason) {
      cancelled = true
      if (current_cancel) {
        // Cancelling is best effort: a misbehaving cancel function must not
        // break the caller.
        try {
          current_cancel(reason)
        } catch (_) {}
        current_cancel = null
      }
    }

    function run_next(val) {
      if (cancelled) return
      if (index >= requestor_array.length) {
        callback(val)
        return
      }
      def requestor = requestor_array[index]
      index += 1
      // Set once this requestor has reported (or thrown) so that the
      // callback cannot be reached twice through the same step.
      var settled = false

      function settle(result, reason) {
        if (cancelled || settled) return
        settled = true
        current_cancel = null
        if (result == null) {
          callback(null, reason)
        } else {
          run_next(result)
        }
      }

      try {
        def requestor_cancel = requestor(settle, val)
        // A requestor that reported synchronously has already handed control
        // to the next step; storing its cancel function now would overwrite
        // the live one.
        if (!settled) current_cancel = requestor_cancel
      } catch (ex) {
        // After settling, the exception came from downstream (the caller's
        // callback or a later step), so reporting it as this step's failure
        // would invoke the callback a second time.
        if (settled) throw ex
        settle(null, ex)
      }
    }

    run_next(value)
    return cancel
  }
}

// requestorize(unary)
// Converts a unary function into a requestor.
// The requestor calls unary(value) synchronously. A null result is reported
// as true so that it still counts as success; an exception thrown by unary
// is reported as (null, exception). No cancel function is returned.
// Throws at factory time if `unary` is not a function.
function requestorize(unary) {
  def factory = 'requestorize'
  if (!is_function(unary)) throw make_reason(factory, 'Not a function.', unary)

  return function requestorized(callback, value) {
    check_callback(callback, factory)
    var result = null
    try {
      result = unary(value)
    } catch (ex) {
      callback(null, ex)
      return
    }
    // Called outside the try so that an exception thrown by the callback is
    // not mistaken for a failure of unary and reported back to the same
    // callback a second time.
    callback(result == null ? true : result)
  }
}

// objectify(factory_fn)
// Converts a factory that takes arrays to one that takes objects.
// The returned factory accepts an object of requestors plus any further
// arguments, which are forwarded to factory_fn after the requestor array.
// When the wrapped requestor succeeds with an array, the callback receives
// an object with the same keys, each mapped to the result at the matching
// position; any other non-null result is passed through unchanged, and a
// null result is reported as (null, reason).
// Throws if factory_fn is not a function, or if the produced factory is
// given something that is not an object.
function objectify(factory_fn) {
  def factory = 'objectify'
  if (!is_function(factory_fn)) {
    throw make_reason(factory, 'Not a factory.', factory_fn)
  }

  return function objectified_factory(object_of_requestors, ...rest) {
    if (!is_object(object_of_requestors)) {
      throw make_reason(factory, 'Expected an object.', object_of_requestors)
    }

    // Key order fixes the position of each requestor in the array, and so
    // the position of its result.
    def keys = array(object_of_requestors)
    def requestor_array = keys.map(key => object_of_requestors[key])
    def inner_requestor = factory_fn(requestor_array, ...rest)

    return function(callback, value) {
      // Declares two parameters so it passes the wrapped requestor's
      // callback check.
      function relay(results, reason) {
        if (results == null) {
          callback(null, reason)
          return
        }
        if (!is_array(results)) {
          callback(results, reason)
          return
        }
        def keyed = {}
        for (var i = 0; i < keys.length; i++) {
          keyed[keys[i]] = results[i]
        }
        callback(keyed, reason)
      }

      return inner_requestor(relay, value)
    }
  }
}

return {
  fallback,
  parallel,
  race,
  sequence,
  requestorize,
  objectify,
  is_requestor,
  check_callback
}