diff --git a/internal/engine.cm b/internal/engine.cm index 318255fb..e0c7211d 100644 --- a/internal/engine.cm +++ b/internal/engine.cm @@ -223,6 +223,15 @@ globalThis.use = shop.use globalThis.json = use('json') var time = use('time') +var pronto = use('pronto') +globalThis.fallback = pronto.fallback +globalThis.parallel = pronto.parallel +globalThis.race = pronto.race +globalThis.sequence = pronto.sequence +globalThis.time_limit = pronto.time_limit +globalThis.requestorize = pronto.requestorize +globalThis.objectify = pronto.objectify + var config = { ar_timer: 60, actor_memory:0, diff --git a/pronto.cm b/pronto.cm new file mode 100644 index 00000000..a6165893 --- /dev/null +++ b/pronto.cm @@ -0,0 +1,427 @@ +// pronto.cm +// Extremism in the pursuit of parallelism is no vice. +// Based on Douglas Crockford's parseq, adapted for Cell. +// Time is in seconds. + +var os = use('os') +var $_ = os.$_ + +function make_reason(factory, excuse, evidence) { + def reason = new Error(`pronto.${factory}${excuse ? ': ' + excuse : ''}`) + reason.evidence = evidence + return reason +} + +function is_requestor(fn) { + return typeof fn == 'function' && (fn.length == 1 || fn.length == 2) +} + +function check_requestors(list, factory) { + if (!Array.isArray(list) || list.some(r => !is_requestor(r))) + throw make_reason(factory, 'Bad requestor array.', list) +} + +function check_callback(cb, factory) { + if (typeof cb != 'function' || cb.length != 2) + throw make_reason(factory, 'Not a callback.', cb) +} + +// fallback(requestor_array) +// Tries each requestor in order until one succeeds. +function fallback(requestor_array) { + def factory = 'fallback' + if (!Array.isArray(requestor_array) || requestor_array.length == 0) + throw make_reason(factory, 'Empty requestor array.') + check_requestors(requestor_array, factory) + + return function fallback_requestor(callback, value) { + check_callback(callback, factory) + let index = 0 + let current_cancel = null + let cancelled = false + + function cancel(reason) { + cancelled = true + if (current_cancel) { + try { current_cancel(reason) } catch (_) {} + current_cancel = null + } + } + + function try_next() { + if (cancelled) return + if (index >= requestor_array.length) { + callback(null, make_reason(factory, 'All requestors failed.')) + return + } + + def requestor = requestor_array[index] + index += 1 + + try { + current_cancel = requestor(function(val, reason) { + if (cancelled) return + current_cancel = null + if (val != null) { + callback(val) + } else { + try_next() + } + }, value) + } catch (ex) { + try_next() + } + } + + try_next() + return cancel + } +} + +// parallel(requestor_array, throttle, need) +// Runs requestors in parallel, collecting all results. +function parallel(requestor_array, throttle, need) { + def factory = 'parallel' + if (!Array.isArray(requestor_array)) + throw make_reason(factory, 'Not an array.', requestor_array) + check_requestors(requestor_array, factory) + + def length = requestor_array.length + if (length == 0) + return function(callback, value) { callback([]) } + + if (need == null) need = length + if (typeof need != 'number' || need < 0 || need > length) + throw make_reason(factory, 'Bad need.', need) + + if (throttle != null && (typeof throttle != 'number' || throttle < 1)) + throw make_reason(factory, 'Bad throttle.', throttle) + + return function parallel_requestor(callback, value) { + check_callback(callback, factory) + def results = new Array(length) + def cancel_list = new Array(length) + let next_index = 0 + let successes = 0 + let failures = 0 + let finished = false + + function cancel(reason) { + if (finished) return + finished = true + cancel_list.forEach(c => { + try { if (typeof c == 'function') c(reason) } catch (_) {} + }) + } + + function start_one() { + if (finished || next_index >= length) return + def idx = next_index + next_index += 1 + def requestor = requestor_array[idx] + + try { + cancel_list[idx] = requestor(function(val, reason) { + if (finished) return + cancel_list[idx] = null + + if (val != null) { + results[idx] = val + successes += 1 + if (successes >= need) { + finished = true + cancel(make_reason(factory, 'Finished.')) + callback(results) + return + } + } else { + failures += 1 + if (failures > length - need) { + cancel(reason) + callback(null, reason || make_reason(factory, 'Too many failures.')) + return + } + } + + start_one() + }, value) + } catch (ex) { + failures += 1 + if (failures > length - need) { + cancel(ex) + callback(null, ex) + return + } + start_one() + } + } + + def concurrent = throttle ? Math.min(throttle, length) : length + for (let i = 0; i < concurrent; i++) start_one() + + return cancel + } +} + +// race(requestor_array, throttle, need) +// Runs requestors in parallel, returns first success(es). +function race(requestor_array, throttle, need) { + def factory = 'race' + if (!Array.isArray(requestor_array) || requestor_array.length == 0) + throw make_reason(factory, 'Empty requestor array.') + check_requestors(requestor_array, factory) + + def length = requestor_array.length + if (need == null) need = 1 + if (typeof need != 'number' || need < 1 || need > length) + throw make_reason(factory, 'Bad need.', need) + + if (throttle != null && (typeof throttle != 'number' || throttle < 1)) + throw make_reason(factory, 'Bad throttle.', throttle) + + return function race_requestor(callback, value) { + check_callback(callback, factory) + def results = new Array(length) + def cancel_list = new Array(length) + let next_index = 0 + let successes = 0 + let failures = 0 + let finished = false + + function cancel(reason) { + if (finished) return + finished = true + cancel_list.forEach(c => { + try { if (typeof c == 'function') c(reason) } catch (_) {} + }) + } + + function start_one() { + if (finished || next_index >= length) return + def idx = next_index + next_index += 1 + def requestor = requestor_array[idx] + + try { + cancel_list[idx] = requestor(function(val, reason) { + if (finished) return + cancel_list[idx] = null + + if (val != null) { + results[idx] = val + successes += 1 + if (successes >= need) { + cancel(make_reason(factory, 'Winner.')) + if (need == 1) { + callback(val) + } else { + callback(results) + } + return + } + } else { + failures += 1 + if (failures > length - need) { + cancel(reason) + callback(null, reason || make_reason(factory, 'All failed.')) + return + } + } + + start_one() + }, value) + } catch (ex) { + failures += 1 + if (failures > length - need) { + cancel(ex) + callback(null, ex) + return + } + start_one() + } + } + + def concurrent = throttle ? Math.min(throttle, length) : length + for (let i = 0; i < concurrent; i++) start_one() + + return cancel + } +} + +// sequence(requestor_array) +// Runs requestors one at a time, passing result to next. +function sequence(requestor_array) { + def factory = 'sequence' + if (!Array.isArray(requestor_array)) + throw make_reason(factory, 'Not an array.', requestor_array) + check_requestors(requestor_array, factory) + + if (requestor_array.length == 0) + return function(callback, value) { callback(value) } + + return function sequence_requestor(callback, value) { + check_callback(callback, factory) + let index = 0 + let current_cancel = null + let cancelled = false + + function cancel(reason) { + cancelled = true + if (current_cancel) { + try { current_cancel(reason) } catch (_) {} + current_cancel = null + } + } + + function run_next(val) { + if (cancelled) return + if (index >= requestor_array.length) { + callback(val) + return + } + + def requestor = requestor_array[index] + index += 1 + + try { + current_cancel = requestor(function(result, reason) { + if (cancelled) return + current_cancel = null + if (result == null) { + callback(null, reason) + } else { + run_next(result) + } + }, val) + } catch (ex) { + callback(null, ex) + } + } + + run_next(value) + return cancel + } +} + +// time_limit(requestor, seconds) +// Wraps a requestor with a time limit. +function time_limit(requestor, seconds) { + def factory = 'time_limit' + if (!is_requestor(requestor)) + throw make_reason(factory, 'Not a requestor.', requestor) + if (typeof seconds != 'number' || seconds <= 0) + throw make_reason(factory, 'Bad time limit.', seconds) + + return function time_limit_requestor(callback, value) { + check_callback(callback, factory) + let finished = false + let requestor_cancel = null + let timer_cancel = null + + function cancel(reason) { + if (finished) return + finished = true + if (timer_cancel) { + timer_cancel() + timer_cancel = null + } + if (requestor_cancel) { + try { requestor_cancel(reason) } catch (_) {} + requestor_cancel = null + } + } + + timer_cancel = $_.delay(function() { + if (finished) return + def reason = make_reason(factory, 'Timeout.', seconds) + if (requestor_cancel) { + try { requestor_cancel(reason) } catch (_) {} + requestor_cancel = null + } + finished = true + callback(null, reason) + }, seconds) + + try { + requestor_cancel = requestor(function(val, reason) { + if (finished) return + finished = true + if (timer_cancel) { + timer_cancel() + timer_cancel = null + } + callback(val, reason) + }, value) + } catch (ex) { + cancel(ex) + callback(null, ex) + } + + return function(reason) { + if (requestor_cancel) { + try { requestor_cancel(reason) } catch (_) {} + requestor_cancel = null + } + } + } +} + +// requestorize(unary) +// Converts a unary function into a requestor. +function requestorize(unary) { + def factory = 'requestorize' + if (typeof unary != 'function') + throw make_reason(factory, 'Not a function.', unary) + + return function requestorized(callback, value) { + check_callback(callback, factory) + try { + def result = unary(value) + callback(result == null ? true : result) + } catch (ex) { + callback(null, ex) + } + } +} + +// objectify(factory_fn) +// Converts a factory that takes arrays to one that takes objects. +function objectify(factory_fn) { + def factory = 'objectify' + if (typeof factory_fn != 'function') + throw make_reason(factory, 'Not a factory.', factory_fn) + + return function objectified_factory(object_of_requestors, ...rest) { + if (typeof object_of_requestors != 'object' || Array.isArray(object_of_requestors)) + throw make_reason(factory, 'Expected an object.', object_of_requestors) + + def keys = Object.keys(object_of_requestors) + def requestor_array = keys.map(k => object_of_requestors[k]) + + def inner_requestor = factory_fn(requestor_array, ...rest) + + return function(callback, value) { + return inner_requestor(function(results, reason) { + if (results == null) { + callback(null, reason) + } else if (Array.isArray(results)) { + def obj = {} + keys.forEach((k, i) => { obj[k] = results[i] }) + callback(obj, reason) + } else { + callback(results, reason) + } + }, value) + } + } +} + +return { + fallback, + parallel, + race, + sequence, + time_limit, + requestorize, + objectify +} diff --git a/tests/pronto.cm b/tests/pronto.cm new file mode 100644 index 00000000..4dc9fac6 --- /dev/null +++ b/tests/pronto.cm @@ -0,0 +1,137 @@ +// Test pronto functions +// Tests for fallback, parallel, race, sequence, time_limit, requestorize, objectify + +var test_count = 0 + +function make_requestor(name, delay_seconds, should_succeed) { + return function(callback, value) { + log.console(`Starting ${name} with value: ${value}`) + if (delay_seconds > 0) { + $_.delay(function() { + if (should_succeed) { + log.console(`${name} succeeded with: ${value + 1}`) + callback(value + 1) + } else { + log.console(`${name} failed`) + callback(null, `${name} error`) + } + }, delay_seconds) + } else { + if (should_succeed) { + log.console(`${name} succeeded immediately with: ${value + 1}`) + callback(value + 1) + } else { + log.console(`${name} failed immediately`) + callback(null, `${name} error`) + } + } + } +} + +return { + test_fallback: function() { + log.console("Testing fallback...") + var req1 = make_requestor("fallback_req1", 0.1, false) // fails + var req2 = make_requestor("fallback_req2", 0.1, true) // succeeds + var req3 = make_requestor("fallback_req3", 0.1, true) // should not run + + fallback([req1, req2, req3])(function(result, reason) { + if (result != null) { + log.console(`Fallback succeeded: ${result}`) + } else { + log.console(`Fallback failed: ${reason}`) + } + }, 10) + }, + + test_parallel: function() { + log.console("Testing parallel...") + var req1 = make_requestor("parallel_req1", 0.2, true) + var req2 = make_requestor("parallel_req2", 0.1, true) + var req3 = make_requestor("parallel_req3", 0.3, true) + + parallel([req1, req2, req3])(function(results, reason) { + if (results != null) { + log.console(`Parallel results: ${results}`) + } else { + log.console(`Parallel failed: ${reason}`) + } + }, 100) + }, + + test_race: function() { + log.console("Testing race...") + var req1 = make_requestor("race_req1", 0.3, true) // slow + var req2 = make_requestor("race_req2", 0.1, true) // fast winner + var req3 = make_requestor("race_req3", 0.2, true) // medium + + race([req1, req2, req3])(function(result, reason) { + if (result != null) { + log.console(`Race winner: ${result}`) + } else { + log.console(`Race failed: ${reason}`) + } + }, 200) + }, + + test_sequence: function() { + log.console("Testing sequence...") + var req1 = make_requestor("seq_req1", 0.1, true) + var req2 = make_requestor("seq_req2", 0.1, true) + var req3 = make_requestor("seq_req3", 0.1, true) + + sequence([req1, req2, req3])(function(result, reason) { + if (result != null) { + log.console(`Sequence result: ${result}`) + } else { + log.console(`Sequence failed: ${reason}`) + } + }, 1000) + }, + + test_time_limit: function() { + log.console("Testing time_limit...") + var slow_req = make_requestor("slow_req", 0.5, true) // takes 0.5s + var timed_req = time_limit(slow_req, 0.2) // 0.2s limit + + timed_req(function(result, reason) { + if (result != null) { + log.console(`Time limit succeeded: ${result}`) + } else { + log.console(`Time limit failed: ${reason}`) + } + }, 100) + }, + + test_requestorize: function() { + log.console("Testing requestorize...") + var add_one = function(x) { return x + 1 } + var req = requestorize(add_one) + + req(function(result, reason) { + if (result != null) { + log.console(`Requestorize result: ${result}`) + } else { + log.console(`Requestorize failed: ${reason}`) + } + }, 42) + }, + + test_objectify: function() { + log.console("Testing objectify...") + var req_a = make_requestor("obj_req_a", 0.1, true) + var req_b = make_requestor("obj_req_b", 0.1, true) + var req_c = make_requestor("obj_req_c", 0.1, true) + + var parallel_obj = objectify(parallel) + var req = parallel_obj({a: req_a, b: req_b, c: req_c}) + + req(function(result, reason) { + if (result != null) { + log.console(`Objectify result: ${json.encode(result)}`) + } else { + log.console(`Objectify failed: ${reason}`) + } + }, 1000) + } +} \ No newline at end of file