fix parseq and parseq test

This commit is contained in:
2025-05-29 02:19:24 -05:00
parent 9279e21b84
commit 4e118dd8e9
4 changed files with 233 additions and 597 deletions

View File

@@ -397,3 +397,4 @@ $_.receiver(e => {
})
var parseq = use('parseq', $_.delay)
for (var i in parseq) log.console(i)

View File

@@ -1,584 +1,224 @@
// parseq.js
// Douglas Crockford
// 2020-11-09
// parseq.js (Misty edition)
// Douglas Crockford → adapted for Misty by ChatGPT, 20250529
// Better living thru eventuality!
// You can access the parseq object in your module by importing it.
// import parseq from "./parseq.js";
/*
 The original parseq.js relied on the browser's setTimeout and ran in
 milliseconds. In Misty we may be given an optional @.delay capability
 (arguments[0]) and time limits are expressed in **seconds**. This rewrite
 removes the setTimeout dependency, uses the @.delay capability when it is
 present, and provides the factories described in the Misty specification:
/*jslint node */
    fallback, par_all, par_any, race, sequence
/*property
concat, create, evidence, fallback, forEach, freeze, isArray, isSafeInteger,
keys, length, min, parallel, parallel_object, pop, push, race, sequence,
some
 Each factory returns a **requestor** function as described by the spec.
*/
function make_reason(factory_name, excuse, evidence) {
const delay = arg[0] // may be undefined
// Make a reason object. These are used for exceptions and cancellations.
// They are made from Error objects.
// ———————————————————————————————————————— helpers
const reason = new Error("parseq." + factory_name + (
excuse === undefined
? ""
: ": " + excuse
));
reason.evidence = evidence;
return reason;
function make_reason (factory, excuse, evidence) {
const reason = new Error(`parseq.${factory}${excuse ? ': ' + excuse : ''}`)
reason.evidence = evidence
return reason
}
function get_array_length(array, factory_name) {
if (Array.isArray(array)) {
return array.length;
}
if (array === undefined) {
return 0;
}
throw make_reason(factory_name, "Not an array.", array);
function is_requestor (fn) {
return typeof fn === 'function' && (fn.length === 1 || fn.length === 2)
}
function check_callback(callback, factory_name) {
if (typeof callback !== "function" || callback.length !== 2) {
throw make_reason(factory_name, "Not a callback function.", callback);
}
function check_requestors (list, factory) {
if (!Array.isArray(list) || list.some(r => !is_requestor(r)))
throw make_reason(factory, 'Bad requestor list.', list)
}
function check_requestors(requestor_array, factory_name) {
// A requestor array contains only requestors. A requestor is a function that
// takes wun or two arguments: 'callback' and optionally 'initial_value'.
if (requestor_array.some(function (requestor) {
return (
typeof requestor !== "function"
|| requestor.length < 1
|| requestor.length > 2
);
})) {
throw make_reason(
factory_name,
"Bad requestors array.",
requestor_array
);
}
function check_callback (cb, factory) {
if (typeof cb !== 'function' || cb.length !== 2)
throw make_reason(factory, 'Not a callback.', cb)
}
function run(
factory_name,
requestor_array,
initial_value,
action,
timeout,
time_limit,
throttle = 0
) {
// The 'run' function does the work that is common to all of the Parseq
// factories. It takes the name of the factory, an array of requestors, an
// initial value, an action callback, a timeout callback, a time limit in
// milliseconds, and a throttle.
// If all goes well, we call all of the requestor functions in the array. Each
// of them might return a cancel function that is kept in the 'cancel_array'.
let cancel_array = new Array(requestor_array.length);
let next_number = 0;
let timer_id;
// We need 'cancel' and 'start_requestor' functions.
function cancel(reason = make_reason(factory_name, "Cancel.")) {
// Stop all unfinished business. This can be called when a requestor fails.
// It can also be called when a requestor succeeds, such as 'race' stopping
// its losers, or 'parallel' stopping the unfinished optionals.
// If a timer is running, stop it.
if (timer_id !== undefined) {
clearTimeout(timer_id);
timer_id = undefined;
function schedule (fn, seconds) {
if (seconds === undefined || seconds <= 0) return fn()
if (typeof delay === 'function') return delay(fn, seconds)
throw make_reason('schedule', '@.delay capability required for timeouts.')
}
// If anything is still going, cancel it.
// ———————————————————————————————————————— core runner
if (cancel_array !== undefined) {
cancel_array.forEach(function (cancel) {
try {
if (typeof cancel === "function") {
return cancel(reason);
}
} catch (ignore) {}
});
cancel_array = undefined;
}
function run (factory, requestors, initial, action, time_limit, throttle = 0) {
let cancel_list = new Array(requestors.length)
let next = 0
let timer_cancel
function cancel (reason = make_reason(factory, 'Cancel.')) {
if (timer_cancel) timer_cancel(), timer_cancel = undefined
if (!cancel_list) return
cancel_list.forEach(c => { try { if (typeof c === 'function') c(reason) } catch (_) {} })
cancel_list = undefined
}
function start_requestor (value) {
if (!cancel_list || next >= requestors.length) return
let idx = next++
const req = requestors[idx]
// The 'start_requestor' function is not recursive, exactly. It does not
// directly call itself, but it does return a function that might call
// 'start_requestor'.
// Start the execution of a requestor, if there are any still waiting.
if (
cancel_array !== undefined
&& next_number < requestor_array.length
) {
// Each requestor has a number.
let number = next_number;
next_number += 1;
// Call the next requestor, passing in a callback function,
// saving the cancel function that the requestor might return.
const requestor = requestor_array[number];
try {
cancel_array[number] = requestor(
function start_requestor_callback(value, reason) {
// This callback function is called by the 'requestor' when it is done.
// If we are no longer running, then this call is ignored.
// For example, it might be a result that is sent back after the time
// limit has expired. This callback function can only be called wunce.
if (
cancel_array !== undefined
&& number !== undefined
) {
// We no longer need the cancel associated with this requestor.
cancel_array[number] = undefined;
// Call the 'action' function to let the requestor know what happened.
action(value, reason, number);
// Clear 'number' so this callback can not be used again.
number = undefined;
// If there are any requestors that are still waiting to start, then
// start the next wun. If the next requestor is in a sequence, then it
// gets the most recent 'value'. The others get the 'initial_value'.
setTimeout(start_requestor, 0, (
factory_name === "sequence"
? value
: initial_value
));
}
},
value
);
// Requestors are required to report their failure thru the callback.
// They are not allowed to throw exceptions. If we happen to catch wun,
// it is treated as a failure.
} catch (exception) {
action(undefined, exception, number);
number = undefined;
start_requestor(value);
cancel_list[idx] = req(function req_cb (val, reason) {
if (!cancel_list || idx === undefined) return
cancel_list[idx] = undefined
action(val, reason, idx)
idx = undefined
if (factory === 'sequence') start_requestor(val)
else if (throttle) start_requestor(initial)
}, value)
} catch (ex) {
action(undefined, ex, idx)
idx = undefined
if (factory === 'sequence') start_requestor(value)
else if (throttle) start_requestor(initial)
}
}
}
// With the 'cancel' and the 'start_requestor' functions in hand,
// we can now get to work.
// If a timeout was requested, start the timer.
if (time_limit !== undefined) {
if (typeof time_limit === "number" && time_limit >= 0) {
if (time_limit > 0) {
timer_id = setTimeout(timeout, time_limit);
if (typeof time_limit !== 'number' || time_limit < 0)
throw make_reason(factory, 'Bad time limit.', time_limit)
if (time_limit > 0) timer_cancel = schedule(() => cancel(make_reason(factory, 'Timeout.', time_limit)), time_limit)
}
} else {
throw make_reason(factory_name, "Bad time limit.", time_limit);
const concurrent = throttle ? Math.min(throttle, requestors.length) : requestors.length
for (let i = 0; i < concurrent; i++) start_requestor(initial)
return cancel
}
// ———————————————————————————————————————— factories
function _normalize (collection, factory) {
if (Array.isArray(collection)) return { names: null, list: collection }
if (collection && typeof collection === 'object') {
const names = Object.keys(collection)
const list = names.map(k => collection[k]).filter(is_requestor)
return { names, list }
}
throw make_reason(factory, 'Expected array or record.', collection)
}
function _denormalize (names, list) {
if (!names) return list
const obj = Object.create(null)
names.forEach((k, i) => { obj[k] = list[i] })
return obj
}
function par_all (collection, time_limit, throttle) {
const factory = 'par_all'
const { names, list } = _normalize(collection, factory)
if (list.length === 0) return (cb, v) => cb(names ? {} : [])
check_requestors(list, factory)
return function par_all_req (cb, initial) {
check_callback(cb, factory)
let pending = list.length
const results = new Array(list.length)
const cancel = run(factory, list, initial, (val, reason, idx) => {
if (val === undefined) {
cancel(reason)
return cb(undefined, reason)
}
results[idx] = val
if (--pending === 0) cb(_denormalize(names, results))
}, time_limit, throttle)
return cancel
}
}
// If we are doing 'race' or 'parallel', we want to start all of the requestors
// at wunce. However, if there is a 'throttle' in place then we start as many
// as the 'throttle' allows, and then as each requestor finishes, another is
// started.
function par_any (collection, time_limit, throttle) {
const factory = 'par_any'
const { names, list } = _normalize(collection, factory)
if (list.length === 0) return (cb, v) => cb(names ? {} : [])
check_requestors(list, factory)
// The 'sequence' and 'fallback' factories set 'throttle' to 1 because they
// process wun at a time and always start another requestor when the
// previous requestor finishes.
return function par_any_req (cb, initial) {
check_callback(cb, factory)
let pending = list.length
const successes = new Array(list.length)
if (!Number.isSafeInteger(throttle) || throttle < 0) {
throw make_reason(factory_name, "Bad throttle.", throttle);
}
let repeat = Math.min(throttle || Infinity, requestor_array.length);
while (repeat > 0) {
setTimeout(start_requestor, 0, initial_value);
repeat -= 1;
const cancel = run(factory, list, initial, (val, reason, idx) => {
pending--
if (val !== undefined) successes[idx] = val
if (successes.some(v => v !== undefined)) {
if (!pending) cancel(make_reason(factory, 'Finished.'))
return cb(_denormalize(names, successes.filter(v => v !== undefined)))
}
if (!pending) cb(undefined, make_reason(factory, 'No successes.'))
}, time_limit, throttle)
// We return 'cancel' which allows the requestor to cancel this work.
return cancel;
}
// The factories ///////////////////////////////////////////////////////////////
function parallel(
required_array,
optional_array,
time_limit,
time_option,
throttle,
factory_name = "parallel"
) {
// The parallel factory is the most complex of these factories. It can take
// a second array of requestors that get a more forgiving failure policy.
// It returns a requestor that produces an array of values.
let requestor_array;
// There are four cases because 'required_array' and 'optional_array'
// can both be empty.
let number_of_required = get_array_length(required_array, factory_name);
if (number_of_required === 0) {
if (get_array_length(optional_array, factory_name) === 0) {
// If both are empty, then 'requestor_array' is empty.
requestor_array = [];
} else {
// If there is only 'optional_array', then it is the 'requestor_array'.
requestor_array = optional_array;
time_option = true;
}
} else {
// If there is only 'required_array', then it is the 'requestor_array'.
if (get_array_length(optional_array, factory_name) === 0) {
requestor_array = required_array;
time_option = undefined;
// If both arrays are provided, we concatenate them together.
} else {
requestor_array = required_array.concat(optional_array);
if (time_option !== undefined && typeof time_option !== "boolean") {
throw make_reason(
factory_name,
"Bad time_option.",
time_option
);
}
}
}
// We check the array and return the requestor.
check_requestors(requestor_array, factory_name);
return function parallel_requestor(callback, initial_value) {
check_callback(callback, factory_name);
let number_of_pending = requestor_array.length;
let number_of_pending_required = number_of_required;
let results = [];
if (number_of_pending === 0) {
callback(
factory_name === "sequence"
? initial_value
: results
);
return;
}
// 'run' gets it started.
let cancel = run(
factory_name,
requestor_array,
initial_value,
function parallel_action(value, reason, number) {
// The action function gets the result of each requestor in the array.
// 'parallel' wants to return an array of all of the values it sees.
results[number] = value;
number_of_pending -= 1;
// If the requestor was wun of the requireds, make sure it was successful.
// If it failed, then the parallel operation fails. If an optionals requestor
// fails, we can still continue.
if (number < number_of_required) {
number_of_pending_required -= 1;
if (value === undefined) {
cancel(reason);
callback(undefined, reason);
callback = undefined;
return;
return cancel
}
}
// If all have been processed, or if the requireds have all succeeded
// and we do not have a 'time_option', then we are done.
function race (list, time_limit, throttle) {
const factory = throttle === 1 ? 'fallback' : 'race'
if (!Array.isArray(list) || list.length === 0)
throw make_reason(factory, 'No requestors.')
check_requestors(list, factory)
if (
number_of_pending < 1
|| (
time_option === undefined
&& number_of_pending_required < 1
)
) {
cancel(make_reason(factory_name, "Optional."));
callback(
factory_name === "sequence"
? results.pop()
: results
);
callback = undefined;
return function race_req (cb, initial) {
check_callback(cb, factory)
let done = false
const cancel = run(factory, list, initial, (val, reason, idx) => {
if (done) return
if (val !== undefined) {
done = true
cancel(make_reason(factory, 'Loser.', idx))
cb(val)
} else if (--list.length === 0) {
done = true
cancel(reason)
cb(undefined, reason)
}
},
function parallel_timeout() {
// When the timer fires, work stops unless we were under the 'false'
// time option. The 'false' time option puts no time limits on the
// requireds, allowing the optionals to run until the requireds finish
// or the time expires, whichever happens last.
const reason = make_reason(
factory_name,
"Timeout.",
time_limit
);
if (time_option === false) {
time_option = undefined;
if (number_of_pending_required < 1) {
cancel(reason);
callback(results);
}, time_limit, throttle)
return cancel
}
} else {
// Time has expired. If all of the requireds were successful,
// then the parallel operation is successful.
cancel(reason);
if (number_of_pending_required < 1) {
callback(results);
} else {
callback(undefined, reason);
}
callback = undefined;
}
},
time_limit,
throttle
);
return cancel;
};
}
function parallel_object(
required_object,
optional_object,
time_limit,
time_option,
throttle
) {
// 'parallel_object' is similar to 'parallel' except that it takes and
// produces objects of requestors instead of arrays of requestors. This
// factory converts the objects to arrays, and the requestor it returns
// turns them back again. It lets 'parallel' do most of the work.
const names = [];
let required_array = [];
let optional_array = [];
// Extract the names and requestors from 'required_object'.
// We only collect functions with an arity of 1 or 2.
if (required_object) {
if (typeof required_object !== "object") {
throw make_reason(
"parallel_object",
"Type mismatch.",
required_object
);
}
Object.keys(required_object).forEach(function (name) {
let requestor = required_object[name];
if (
typeof requestor === "function"
&& (requestor.length === 1 || requestor.length === 2)
) {
names.push(name);
required_array.push(requestor);
}
});
function fallback (list, time_limit) {
return race(list, time_limit, 1)
}
// Extract the names and requestors from 'optional_object'.
// Look for duplicate keys.
function sequence (list, time_limit) {
const factory = 'sequence'
if (!Array.isArray(list)) throw make_reason(factory, 'Not an array.', list)
check_requestors(list, factory)
if (list.length === 0) return (cb, v) => cb(v)
if (optional_object) {
if (typeof optional_object !== "object") {
throw make_reason(
"parallel_object",
"Type mismatch.",
optional_object
);
return function sequence_req (cb, initial) {
check_callback(cb, factory)
let idx = 0
function next (value) {
if (idx >= list.length) return cb(value)
try {
list[idx++](function seq_cb (val, reason) {
if (val === undefined) return cb(undefined, reason)
next(val)
}, value)
} catch (ex) {
cb(undefined, ex)
}
Object.keys(optional_object).forEach(function (name) {
let requestor = optional_object[name];
if (
typeof requestor === "function"
&& (requestor.length === 1 || requestor.length === 2)
) {
if (required_object && required_object[name] !== undefined) {
throw make_reason(
"parallel_object",
"Duplicate name.",
name
);
}
names.push(name);
optional_array.push(requestor);
}
});
}
// Call 'parallel' to get a requestor.
const parallel_requestor = parallel(
required_array,
optional_array,
time_limit,
time_option,
throttle,
"parallel_object"
);
// Return the parallel object requestor.
return function parallel_object_requestor(callback, initial_value) {
// When our requestor is called, we return the result of our parallel requestor.
return parallel_requestor(
// We pass our callback to the parallel requestor,
// converting its value into an object.
function parallel_object_callback(value, reason) {
if (value === undefined) {
return callback(undefined, reason);
next(initial)
}
const object = Object.create(null);
names.forEach(function (name, index) {
object[name] = value[index];
});
return callback(object);
},
initial_value
);
};
}
function race(requestor_array, time_limit, throttle) {
// The 'race' factory returns a requestor that starts all of the
// requestors in 'requestor_array' at wunce. The first success wins.
const factory_name = (
throttle === 1
? "fallback"
: "race"
);
if (get_array_length(requestor_array, factory_name) === 0) {
throw make_reason(factory_name, "No requestors.");
}
check_requestors(requestor_array, factory_name);
return function race_requestor(callback, initial_value) {
check_callback(callback, factory_name);
let number_of_pending = requestor_array.length;
let cancel = run(
factory_name,
requestor_array,
initial_value,
function race_action(value, reason, number) {
number_of_pending -= 1;
if (value !== undefined) {
// We have a winner. Cancel the losers and pass the value to the 'callback'.
cancel(make_reason(factory_name, "Loser.", number));
callback(value);
callback = undefined;
} else if (number_of_pending < 1) {
// There was no winner. Signal a failure.
cancel(reason);
callback(undefined, reason);
callback = undefined;
}
},
function race_timeout() {
let reason = make_reason(
factory_name,
"Timeout.",
time_limit
);
cancel(reason);
callback(undefined, reason);
callback = undefined;
},
time_limit,
throttle
);
return cancel;
};
}
function fallback(requestor_array, time_limit) {
// The 'fallback' factory returns a requestor that tries each requestor
// in 'requestor_array', wun at a time, until it finds a successful wun.
return race(requestor_array, time_limit, 1);
}
function sequence(requestor_array, time_limit) {
// A sequence runs each requestor in order, passing results to the next,
// as long as they are all successful. A sequence is a throttled parallel.
return parallel(
requestor_array,
undefined,
time_limit,
undefined,
1,
"sequence"
);
}
return {
fallback,
parallel,
parallel_object,
par_all,
par_any,
race,
sequence
}

View File

@@ -1,41 +1,28 @@
var parseq = use('parseq')
var parseq = use('parseq', $_.delay)
function load_comment_from_api(id)
{
log.console(`Loading comment #${id}`)
return {
id: id,
title: `Comment #${id}`
function load_comment_from_api_requestor(id) {
return function(cb) {
return $_.delay(() => cb({ id, title: `Comment #${id}` }), 0.5)
// returning the $_.delay return lets them be cancelled up the chain
}
}
/*
only ever expecting a tree in the form
id: number
children:
id: number
...
*/
$_.receiver(tree => {
var requestors = []
for (child of tree.children) {
requestors.push(cb => {
var child_actor = $_.start(undefined, "tests/comments.js")
send(child_actor, child, cb)
var child_reqs = tree.children.map(child => cb => {
$_.start(e => send(e.actor, child, cb), "tests/comments.js")
})
var job = parseq.par_all({
comment: load_comment_from_api_requestor(tree.id),
children: cb => parseq.par_all(child_reqs, /*time*/undefined, /*thr*/10)(cb)
})
job((result, reason) => {
if (!result) {
log.error(reason)
send(tree, reason)
}
parseq.parallel(requestors, undefined, 10)((results, reason) => {
if (results) {
// All children processed
var comment = load_comment_from_api(tree.id)
var full_comment = {
id: tree.id,
children: children_results
}
send(tree, full_comment)
} else {
send(tree, reason);
}
send(tree, { ...result.comment, children: result.children })
})
})

View File

@@ -1,27 +1,35 @@
var tree = {
id: 1,
children: {
id: 2,
},{
id: 3,
children: {
id: 4,
id: 5
id: 100,
children: [
{
id: 101,
children: [
{ id: 102, children: [] },
{ id: 103, children: [] }
]
},
}, {
id: 13,
children: {
id: 14,
id: 15
}
{
id: 104,
children: [
{ id: 105, children: [] }
]
}
]
}
var actor = $_.start(undefined, "tests/comments.js")
var os = use('os')
var st = os.now()
var actor
$_.start(e => {
if (actor) return
actor = e.actor
send(actor, tree, (result, reason) => {
if (reason)
log.console(reason)
else
log.console(json.encode(result))
log.console(`took ${os.now()-st} secs`)
});
}, "tests/comments.js")