From b0506d5b0b5ab8eb1b605915401127d348d76192 Mon Sep 17 00:00:00 2001 From: Tristan Cavelier <tristan.cavelier@tiolive.com> Date: Fri, 11 Oct 2013 17:29:31 +0200 Subject: [PATCH] jio.js updated --- jio.js | 417 ++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 252 insertions(+), 165 deletions(-) diff --git a/jio.js b/jio.js index 7fef5e1..f54b52d 100644 --- a/jio.js +++ b/jio.js @@ -402,6 +402,23 @@ defaults.job_rule_conditions = {}; methodType(b.method) === 'writer'; } + /** + * Compare two jobs and test if they use metadata only + * + * @param {Object} a The first job to compare + * @param {Object} b The second job to compare + * @return {Boolean} True if equal, else false + */ + function useMetadataOnly(a, b) { + if (['post', 'put', 'get', 'remove', 'allDocs'].indexOf(a.method) === -1) { + return false; + } + if (['post', 'put', 'get', 'remove', 'allDocs'].indexOf(b.method) === -1) { + return false; + } + return true; + } + /** * Compare two jobs and test if they are readers * @@ -481,6 +498,7 @@ defaults.job_rule_conditions = {}; "sameStorageDescription": sameStorageDescription, "areWriters": areWriters, "areReaders": areReaders, + "useMetadataOnly": useMetadataOnly, "sameMethod": sameMethod, "sameDocumentId": sameDocumentId, "sameParameters": sameParameters, @@ -1189,10 +1207,10 @@ function JIO(storage_spec, options) { enableJobMaker(this, shared, options); enableJobReference(this, shared, options); enableJobRetry(this, shared, options); + enableJobTimeout(this, shared, options); enableJobChecker(this, shared, options); enableJobQueue(this, shared, options); enableJobRecovery(this, shared, options); - enableJobTimeout(this, shared, options); enableJobExecuter(this, shared, options); shared.emit('load'); @@ -2306,7 +2324,9 @@ function enableJobChecker(jio, shared, options) { // creates // - shared.job_rules Array - // uses 'job' event + // uses 'job:new' event + // emits 'job:modified', 'job:start', 'job:resolved', + // 'job:end', 'job:reject' events var i; @@ -2315,33 +2335,39 @@ function enableJobChecker(jio, shared, options) { shared.job_rule_actions = { wait: function (original_job, new_job) { original_job.promise.always(function () { - shared.emit('job', new_job); + new_job.state = 'ready'; + new_job.modified = new Date(); + shared.emit('job:modified', new_job); + shared.emit('job:start', new_job); }); new_job.state = 'waiting'; new_job.modified = new Date(); + shared.emit('job:modified', new_job); }, update: function (original_job, new_job) { if (!new_job.solver) { // promise associated to the job new_job.state = 'done'; - shared.emit('jobDone', new_job); + shared.emit('job:resolved', new_job, []); // XXX why resolve? + shared.emit('job:end', new_job); } else { if (!original_job.solver) { original_job.solver = new_job.solver; } else { original_job.promise.then( new_job.command.resolve, - new_job.command.reject + new_job.command.reject, + new_job.command.notify ); } } new_job.state = 'running'; new_job.modified = new Date(); + shared.emit('job:modified', new_job); }, deny: function (original_job, new_job) { - new_job.state = 'fail'; - new_job.modified = new Date(); - restCommandRejecter(new_job, [ + new_job.state = "running"; + shared.emit('job:reject', new_job, [ 'precondition_failed', 'command denied', 'Command rejected by the job checker.' @@ -2463,7 +2489,7 @@ function enableJobChecker(jio, shared, options) { } } else { // browsing jobs - for (j = 0; j < shared.jobs.length; j += 1) { + for (j = shared.jobs.length - 1; j >= 0; j -= 1) { if (shared.jobs[j] !== job) { if ( jobsRespectConditions( @@ -2498,10 +2524,11 @@ function enableJobChecker(jio, shared, options) { ], "action": "update" }, { - "code_name": "writers update", + "code_name": "metadata writers update", "conditions": [ "sameStorageDescription", "areWriters", + "useMetadataOnly", "sameMethod", "haveDocumentIds", "sameParameters" @@ -2528,7 +2555,7 @@ function enableJobChecker(jio, shared, options) { } } - shared.on('job', checkJob); + shared.on('job:new', checkJob); } @@ -2544,19 +2571,28 @@ function enableJobChecker(jio, shared, options) { function enableJobExecuter(jio, shared) { // , options) { - // uses 'job', 'jobDone', 'jobFail' and 'jobNotify' events - // emits 'jobRun' and 'jobEnd' events + // uses 'job:new' events + // uses actions 'job:resolve', 'job:reject' and 'job:notify' - // listeners + // emits 'job:modified', 'job:started', 'job:resolved', + // 'job:rejected', 'job:notified' and 'job:end' events + // emits action 'job:start' - shared.on('job', function (param) { + function startJobIfReady(job) { + if (job.state === 'ready') { + shared.emit('job:start', job); + } + } + + function executeJobIfReady(param) { var storage; if (param.state === 'ready') { param.tried += 1; param.started = new Date(); param.state = 'running'; param.modified = new Date(); - shared.emit('jobRun', param); + shared.emit('job:modified', param); + shared.emit('job:started', param); try { storage = createStorage(deepClone(param.storage_spec)); } catch (e) { @@ -2584,35 +2620,49 @@ function enableJobExecuter(jio, shared) { // , options) { ); }); } - }); + } - shared.on('jobDone', function (param, args) { - if (param.state === 'running') { - param.state = 'done'; - param.modified = new Date(); - shared.emit('jobEnd', param); - if (param.solver) { - restCommandResolver(param, args); + function endAndResolveIfRunning(job, args) { + if (job.state === 'running') { + job.state = 'done'; + job.modified = new Date(); + shared.emit('job:modified', job); + if (job.solver) { + restCommandResolver(job, args); } + shared.emit('job:resolved', job, args); + shared.emit('job:end', job); } - }); + } - shared.on('jobFail', function (param, args) { - if (param.state === 'running') { - param.state = 'fail'; - param.modified = new Date(); - shared.emit('jobEnd', param); - if (param.solver) { - restCommandRejecter(param, args); + function endAndRejectIfRunning(job, args) { + if (job.state === 'running') { + job.state = 'fail'; + job.modified = new Date(); + shared.emit('job:modified', job); + if (job.solver) { + restCommandRejecter(job, args); } + shared.emit('job:rejected', job, args); + shared.emit('job:end', job); } - }); + } - shared.on('jobNotify', function (param, args) { - if (param.state === 'running' && param.solver) { - param.solver.notify(args[0]); + function notifyJobIfRunning(job, args) { + if (job.state === 'running' && job.solver) { + job.solver.notify(args[0]); + shared.emit('job:notified', job, args); } - }); + } + + // listeners + + shared.on('job:new', startJobIfReady); + shared.on('job:start', executeJobIfReady); + + shared.on('job:resolve', endAndResolveIfRunning); + shared.on('job:reject', endAndRejectIfRunning); + shared.on('job:notify', notifyJobIfRunning); } /*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */ @@ -2637,10 +2687,16 @@ function enableJobMaker(jio, shared, options) { // - param.options object // - param.command object - // uses method events - // add emits 'job' events + // list of job events: + // - Job existence -> new, end + // - Job execution -> started, stopped + // - Job resolution -> resolved, rejected, notified, cancelled + // - Job modification -> modified - // the job can emit 'jobDone', 'jobFail' and 'jobNotify' + // emits actions 'job:resolve', 'job:reject' and 'job:notify' + + // uses `rest method` events + // emits 'job:new' event shared.job_keys = arrayExtend(shared.job_keys || [], [ "created", @@ -2653,55 +2709,56 @@ function enableJobMaker(jio, shared, options) { "options" ]); - function addCommandToJob(param) { - param.command = {}; - param.command.resolve = function () { - shared.emit('jobDone', param, arguments); + function addCommandToJob(job) { + job.command = {}; + job.command.resolve = function () { + shared.emit('job:resolve', job, arguments); }; - param.command.success = param.command.resolve; - param.command.reject = function () { - shared.emit('jobFail', param, arguments); + job.command.success = job.command.resolve; + job.command.reject = function () { + shared.emit('job:reject', job, arguments); }; - param.command.error = param.command.reject; - param.command.notify = function () { - shared.emit('jobNotify', param, arguments); + job.command.error = job.command.reject; + job.command.notify = function () { + shared.emit('job:notify', job, arguments); }; - param.command.storage = function () { + job.command.storage = function () { return shared.createRestApi.apply(null, arguments); }; } + function createJobFromRest(param) { + if (param.solver) { + // rest parameters are good + shared.emit('job:new', param); + } + } + + function initJob(job) { + job.state = 'ready'; + if (typeof job.tried !== 'number' || !isFinite(job.tried)) { + job.tried = 0; + } + if (!job.created) { + job.created = new Date(); + } + addCommandToJob(job); + job.modified = new Date(); + } + // listeners shared.rest_method_names.forEach(function (method) { - shared.on(method, function (param) { - if (param.solver) { - // params are good - shared.emit('job', param); - } - }); + shared.on(method, createJobFromRest); }); - shared.on('job', function (param) { - // new or recovered job - param.state = 'ready'; - if (typeof param.tried !== 'number' || !isFinite(param.tried)) { - param.tried = 0; - } - if (!param.created) { - param.created = new Date(); - } - if (!param.command) { - addCommandToJob(param); - } - param.modified = new Date(); - }); + shared.on('job:new', initJob); } /*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */ /*global arrayExtend, localStorage, Workspace, uniqueJSONStringify, JobQueue, - constants, indexOf */ + constants, indexOf, setTimeout, clearTimeout */ function enableJobQueue(jio, shared, options) { @@ -2717,8 +2774,56 @@ function enableJobQueue(jio, shared, options) { // - shared.workspace Workspace // - shared.job_queue JobQueue - // uses 'job', 'jobRun', 'jobStop', 'jobEnd' events - // emits 'jobEnd' events + // uses 'job:new', 'job:started', 'job:stopped', 'job:modified', + // 'job:notified', 'job:end' events + + // emits 'job:end' event + + function postJobIfReady(param) { + if (!param.stored && param.state === 'ready') { + clearTimeout(param.queue_ident); + delete param.queue_ident; + shared.job_queue.load(); + shared.job_queue.post(param); + shared.job_queue.save(); + param.stored = true; + } + } + + function deferredPutJob(param) { + if (param.queue_ident === undefined) { + param.queue_ident = setTimeout(function () { + delete param.queue_ident; + if (param.stored) { + shared.job_queue.load(); + shared.job_queue.put(param); + shared.job_queue.save(); + } + }); + } + } + + function removeJob(param) { + clearTimeout(param.queue_ident); + delete param.queue_ident; + if (param.stored) { + shared.job_queue.load(); + shared.job_queue.remove(param.id); + shared.job_queue.save(); + delete param.stored; + delete param.id; + } + } + + function initJob(param) { + if (!param.command.end) { + param.command.end = function () { + shared.emit('job:end', param); + }; + } + } + + shared.on('job:new', initJob); if (options.job_management !== false) { @@ -2740,52 +2845,18 @@ function enableJobQueue(jio, shared, options) { shared.job_keys ); - shared.on('job', function (param) { - if (indexOf(param.state, ['fail', 'done']) === -1) { - if (!param.stored) { - shared.job_queue.load(); - shared.job_queue.post(param); - shared.job_queue.save(); - param.stored = true; - } - } - }); + // Listeners - ['jobRun', 'jobStop'].forEach(function (event) { - shared.on(event, function (param) { - if (param.stored) { - shared.job_queue.load(); - if (param.state === 'done' || param.state === 'fail') { - if (shared.job_queue.remove(param.id)) { - shared.job_queue.save(); - delete param.storad; - } - } else { - shared.job_queue.put(param); - shared.job_queue.save(); - } - } - }); - }); + shared.on('job:new', postJobIfReady); - shared.on('jobEnd', function (param) { - if (param.stored) { - shared.job_queue.load(); - if (shared.job_queue.remove(param.id)) { - shared.job_queue.save(); - } - } - }); + shared.on('job:started', deferredPutJob); + shared.on('job:stopped', deferredPutJob); + shared.on('job:modified', deferredPutJob); + shared.on('job:notified', deferredPutJob); - } + shared.on('job:end', removeJob); - shared.on('job', function (param) { - if (!param.command.end) { - param.command.end = function () { - shared.emit('jobEnd', param); - }; - } - }); + } } @@ -2800,20 +2871,23 @@ function enableJobRecovery(jio, shared, options) { // uses // - shared.job_queue JobQueue + // emits 'job:new' event + function numberOrDefault(number, default_value) { return (typeof number === 'number' && isFinite(number) ? number : default_value); } function recoverJob(param) { + shared.job_queue.load(); shared.job_queue.remove(param.id); delete param.id; - if (methodType(param.method) === 'writer' || - param.state === 'ready' || - param.state === 'running' || - param.state === 'waiting') { + if (methodType(param.method) === 'writer' && + (param.state === 'ready' || + param.state === 'running' || + param.state === 'waiting')) { shared.job_queue.save(); - shared.emit('job', param); + shared.emit('job:new', param); } } @@ -2870,17 +2944,17 @@ function enableJobReference(jio, shared, options) { // creates // - shared.jobs Object Array - // uses 'job', 'jobEnd' events + // uses 'job:new' and 'job:end' events shared.jobs = []; var job_references = new ReferenceArray(shared.jobs); - shared.on('job', function (param) { + shared.on('job:new', function (param) { job_references.put(param); }); - shared.on('jobEnd', function (param) { + shared.on('job:end', function (param) { job_references.remove(param); }); } @@ -2914,9 +2988,9 @@ function enableJobRetry(jio, shared, options) { // - param.options object // - param.command object - // uses 'job' and 'jobRetry' events - // emits 'job', 'jobFail' and 'jobStateChange' events - // job can emit 'jobRetry' + // uses 'job:new' and 'job:retry' events + // emits action 'job:start' event + // emits 'job:retry', 'job:reject', 'job:modified' and 'job:stopped' events shared.job_keys = arrayExtend(shared.job_keys || [], ["max_retry"]); @@ -2960,9 +3034,7 @@ function enableJobRetry(jio, shared, options) { 2 ); - // listeners - - shared.on('job', function (param) { + function initJob(param) { if (typeof param.max_retry !== 'number' || param.max_retry < 0) { param.max_retry = positiveNumberOrDefault( param.options.max_retry, @@ -2971,34 +3043,42 @@ function enableJobRetry(jio, shared, options) { } param.command.reject = function (status) { if (constants.http_action[status || 0] === "retry") { - shared.emit('jobRetry', param, arguments); + shared.emit('job:retry', param, arguments); } else { - shared.emit('jobFail', param, arguments); + shared.emit('job:reject', param, arguments); } }; param.command.retry = function () { - shared.emit('jobRetry', param, arguments); + shared.emit('job:retry', param, arguments); }; - }); + } - shared.on('jobRetry', function (param, args) { + function retryIfRunning(param, args) { if (param.state === 'running') { if (param.max_retry === undefined || param.max_retry === null || param.max_retry >= param.tried) { param.state = 'waiting'; param.modified = new Date(); - shared.emit('jobStop', param); + shared.emit('job:modified', param); + shared.emit('job:stopped', param); setTimeout(function () { param.state = 'ready'; param.modified = new Date(); - shared.emit('job', param); + shared.emit('job:modified', param); + shared.emit('job:start', param); }, min(10000, param.tried * 2000)); } else { - shared.emit('jobFail', param, args); + shared.emit('job:reject', param, args); } } - }); + } + + // listeners + + shared.on('job:new', initJob); + + shared.on('job:retry', retryIfRunning); } /*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */ @@ -3016,7 +3096,9 @@ function enableJobTimeout(jio, shared, options) { // - param.timeout_ident Timeout // - param.state string 'running' - // uses 'job', 'jobDone', 'jobFail', 'jobRetry' and 'jobNotify' events + // uses 'job:new', 'job:stopped', 'job:started', + // 'job:notified' and 'job:end' events + // emits 'job:modified' event shared.job_keys = arrayExtend(shared.job_keys || [], ["timeout"]); @@ -3041,36 +3123,41 @@ function enableJobTimeout(jio, shared, options) { }; } - // listeners - - shared.on('job', function (param) { - if (typeof param.timeout !== 'number' || param.timeout < 0) { - param.timeout = positiveNumberOrDefault( - param.options.timeout, + function initJob(job) { + if (typeof job.timeout !== 'number' || job.timeout < 0) { + job.timeout = positiveNumberOrDefault( + job.options.timeout, default_timeout ); } - param.modified = new Date(); - }); + job.modified = new Date(); + shared.emit('job:modified', job); + } - ["jobDone", "jobFail", "jobRetry"].forEach(function (event) { - shared.on(event, function (param) { - clearTimeout(param.timeout_ident); - delete param.timeout_ident; - }); - }); + function clearJobTimeout(job) { + clearTimeout(job.timeout_ident); + delete job.timeout_ident; + } - ["jobRun", "jobNotify", "jobEnd"].forEach(function (event) { - shared.on(event, function (param) { - clearTimeout(param.timeout_ident); - if (param.state === 'running' && param.timeout > 0) { - param.timeout_ident = setTimeout(timeoutReject(param), param.timeout); - param.modified = new Date(); - } else { - delete param.timeout_ident; - } - }); - }); + function restartJobTimeoutIfRunning(job) { + clearTimeout(job.timeout_ident); + if (job.state === 'running' && job.timeout > 0) { + job.timeout_ident = setTimeout(timeoutReject(job), job.timeout); + job.modified = new Date(); + } else { + delete job.timeout_ident; + } + } + + // listeners + + shared.on('job:new', initJob); + + shared.on("job:stopped", clearJobTimeout); + shared.on("job:end", clearJobTimeout); + + shared.on("job:started", restartJobTimeoutIfRunning); + shared.on("job:notified", restartJobTimeoutIfRunning); } /*jslint indent: 2, maxlen: 80, sloppy: true */ -- 2.30.9