Commit 830ff681 authored by Tristan Cavelier's avatar Tristan Cavelier

job features event management changed to be more maintainable

Job queues are now saved in workspace at different moment
parent 3452b61e
......@@ -21,10 +21,10 @@ function JIO(storage_spec, options) {
enableJobMaker(this, shared, options);
enableJobReference(this, shared, options);
enableJobRetry(this, shared, options);
enableJobTimeout(this, shared, options);
enableJobChecker(this, shared, options);
enableJobQueue(this, shared, options);
enableJobRecovery(this, shared, options);
enableJobTimeout(this, shared, options);
enableJobExecuter(this, shared, options);
shared.emit('load');
......
......@@ -13,7 +13,9 @@ function enableJobChecker(jio, shared, options) {
// creates
// - shared.job_rules Array
// uses 'job' event
// uses 'job:new' event
// emits 'job:modified', 'job:start', 'job:resolved',
// 'job:end', 'job:reject' events
var i;
......@@ -22,33 +24,39 @@ function enableJobChecker(jio, shared, options) {
shared.job_rule_actions = {
wait: function (original_job, new_job) {
original_job.promise.always(function () {
shared.emit('job', new_job);
new_job.state = 'ready';
new_job.modified = new Date();
shared.emit('job:modified', new_job);
shared.emit('job:start', new_job);
});
new_job.state = 'waiting';
new_job.modified = new Date();
shared.emit('job:modified', new_job);
},
update: function (original_job, new_job) {
if (!new_job.solver) {
// promise associated to the job
new_job.state = 'done';
shared.emit('jobDone', new_job);
shared.emit('job:resolved', new_job, []); // XXX why resolve?
shared.emit('job:end', new_job);
} else {
if (!original_job.solver) {
original_job.solver = new_job.solver;
} else {
original_job.promise.then(
new_job.command.resolve,
new_job.command.reject
new_job.command.reject,
new_job.command.notify
);
}
}
new_job.state = 'running';
new_job.modified = new Date();
shared.emit('job:modified', new_job);
},
deny: function (original_job, new_job) {
new_job.state = 'fail';
new_job.modified = new Date();
restCommandRejecter(new_job, [
new_job.state = "running";
shared.emit('job:reject', new_job, [
'precondition_failed',
'command denied',
'Command rejected by the job checker.'
......@@ -170,7 +178,7 @@ function enableJobChecker(jio, shared, options) {
}
} else {
// browsing jobs
for (j = 0; j < shared.jobs.length; j += 1) {
for (j = shared.jobs.length - 1; j >= 0; j -= 1) {
if (shared.jobs[j] !== job) {
if (
jobsRespectConditions(
......@@ -235,7 +243,7 @@ function enableJobChecker(jio, shared, options) {
}
}
shared.on('job', checkJob);
shared.on('job:new', checkJob);
}
......
......@@ -4,19 +4,28 @@
function enableJobExecuter(jio, shared) { // , options) {
// uses 'job', 'jobDone', 'jobFail' and 'jobNotify' events
// emits 'jobRun' and 'jobEnd' events
// uses 'job:new' events
// uses actions 'job:resolve', 'job:reject' and 'job:notify'
// listeners
// emits 'job:modified', 'job:started', 'job:resolved',
// 'job:rejected', 'job:notified' and 'job:end' events
// emits action 'job:start'
shared.on('job', function (param) {
function startJobIfReady(job) {
if (job.state === 'ready') {
shared.emit('job:start', job);
}
}
function executeJobIfReady(param) {
var storage;
if (param.state === 'ready') {
param.tried += 1;
param.started = new Date();
param.state = 'running';
param.modified = new Date();
shared.emit('jobRun', param);
shared.emit('job:modified', param);
shared.emit('job:started', param);
try {
storage = createStorage(deepClone(param.storage_spec));
} catch (e) {
......@@ -44,33 +53,47 @@ function enableJobExecuter(jio, shared) { // , options) {
);
});
}
});
}
shared.on('jobDone', function (param, args) {
if (param.state === 'running') {
param.state = 'done';
param.modified = new Date();
shared.emit('jobEnd', param);
if (param.solver) {
restCommandResolver(param, args);
function endAndResolveIfRunning(job, args) {
if (job.state === 'running') {
job.state = 'done';
job.modified = new Date();
shared.emit('job:modified', job);
if (job.solver) {
restCommandResolver(job, args);
}
shared.emit('job:resolved', job, args);
shared.emit('job:end', job);
}
}
});
shared.on('jobFail', function (param, args) {
if (param.state === 'running') {
param.state = 'fail';
param.modified = new Date();
shared.emit('jobEnd', param);
if (param.solver) {
restCommandRejecter(param, args);
function endAndRejectIfRunning(job, args) {
if (job.state === 'running') {
job.state = 'fail';
job.modified = new Date();
shared.emit('job:modified', job);
if (job.solver) {
restCommandRejecter(job, args);
}
shared.emit('job:rejected', job, args);
shared.emit('job:end', job);
}
}
});
shared.on('jobNotify', function (param, args) {
if (param.state === 'running' && param.solver) {
param.solver.notify(args[0]);
function notifyJobIfRunning(job, args) {
if (job.state === 'running' && job.solver) {
job.solver.notify(args[0]);
shared.emit('job:notified', job, args);
}
});
}
// listeners
shared.on('job:new', startJobIfReady);
shared.on('job:start', executeJobIfReady);
shared.on('job:resolve', endAndResolveIfRunning);
shared.on('job:reject', endAndRejectIfRunning);
shared.on('job:notify', notifyJobIfRunning);
}
......@@ -20,10 +20,16 @@ function enableJobMaker(jio, shared, options) {
// - param.options object
// - param.command object
// uses method events
// add emits 'job' events
// list of job events:
// - Job existence -> new, end
// - Job execution -> started, stopped
// - Job resolution -> resolved, rejected, notified, cancelled
// - Job modification -> modified
// the job can emit 'jobDone', 'jobFail' and 'jobNotify'
// emits actions 'job:resolve', 'job:reject' and 'job:notify'
// uses `rest method` events
// emits 'job:new' event
shared.job_keys = arrayExtend(shared.job_keys || [], [
"created",
......@@ -36,48 +42,49 @@ function enableJobMaker(jio, shared, options) {
"options"
]);
function addCommandToJob(param) {
param.command = {};
param.command.resolve = function () {
shared.emit('jobDone', param, arguments);
function addCommandToJob(job) {
job.command = {};
job.command.resolve = function () {
shared.emit('job:resolve', job, arguments);
};
param.command.success = param.command.resolve;
param.command.reject = function () {
shared.emit('jobFail', param, arguments);
job.command.success = job.command.resolve;
job.command.reject = function () {
shared.emit('job:reject', job, arguments);
};
param.command.error = param.command.reject;
param.command.notify = function () {
shared.emit('jobNotify', param, arguments);
job.command.error = job.command.reject;
job.command.notify = function () {
shared.emit('job:notify', job, arguments);
};
param.command.storage = function () {
job.command.storage = function () {
return shared.createRestApi.apply(null, arguments);
};
}
// listeners
shared.rest_method_names.forEach(function (method) {
shared.on(method, function (param) {
function createJobFromRest(param) {
if (param.solver) {
// params are good
shared.emit('job', param);
// rest parameters are good
shared.emit('job:new', param);
}
}
});
});
shared.on('job', function (param) {
// new or recovered job
param.state = 'ready';
if (typeof param.tried !== 'number' || !isFinite(param.tried)) {
param.tried = 0;
function initJob(job) {
job.state = 'ready';
if (typeof job.tried !== 'number' || !isFinite(job.tried)) {
job.tried = 0;
}
if (!param.created) {
param.created = new Date();
if (!job.created) {
job.created = new Date();
}
if (!param.command) {
addCommandToJob(param);
addCommandToJob(job);
job.modified = new Date();
}
param.modified = new Date();
// listeners
shared.rest_method_names.forEach(function (method) {
shared.on(method, createJobFromRest);
});
shared.on('job:new', initJob);
}
/*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */
/*global arrayExtend, localStorage, Workspace, uniqueJSONStringify, JobQueue,
constants, indexOf */
constants, indexOf, setTimeout, clearTimeout */
function enableJobQueue(jio, shared, options) {
......@@ -16,74 +16,88 @@ function enableJobQueue(jio, shared, options) {
// - shared.workspace Workspace
// - shared.job_queue JobQueue
// uses 'job', 'jobRun', 'jobStop', 'jobEnd' events
// emits 'jobEnd' events
// uses 'job:new', 'job:started', 'job:stopped', 'job:modified',
// 'job:notified', 'job:end' events
if (options.job_management !== false) {
shared.job_keys = arrayExtend(shared.job_keys || [], ["id"]);
if (typeof options.workspace !== 'object') {
shared.workspace = localStorage;
} else {
shared.workspace = new Workspace(options.workspace);
}
if (!shared.storage_spec_str) {
shared.storage_spec_str = uniqueJSONStringify(shared.storage_spec);
}
shared.job_queue = new JobQueue(
shared.workspace,
'jio/jobs/' + shared.storage_spec_str,
shared.job_keys
);
// emits 'job:end' event
shared.on('job', function (param) {
if (indexOf(param.state, ['fail', 'done']) === -1) {
if (!param.stored) {
function postJobIfReady(param) {
if (!param.stored && param.state === 'ready') {
clearTimeout(param.queue_ident);
delete param.queue_ident;
shared.job_queue.load();
shared.job_queue.post(param);
shared.job_queue.save();
param.stored = true;
}
}
});
['jobRun', 'jobStop'].forEach(function (event) {
shared.on(event, function (param) {
function deferredPutJob(param) {
if (param.queue_ident === undefined) {
param.queue_ident = setTimeout(function () {
delete param.queue_ident;
if (param.stored) {
shared.job_queue.load();
if (param.state === 'done' || param.state === 'fail') {
if (shared.job_queue.remove(param.id)) {
shared.job_queue.save();
delete param.storad;
}
} else {
shared.job_queue.put(param);
shared.job_queue.save();
}
}
});
});
}
}
shared.on('jobEnd', function (param) {
function removeJob(param) {
clearTimeout(param.queue_ident);
delete param.queue_ident;
if (param.stored) {
shared.job_queue.load();
if (shared.job_queue.remove(param.id)) {
shared.job_queue.remove(param.id);
shared.job_queue.save();
delete param.stored;
delete param.id;
}
}
});
}
shared.on('job', function (param) {
function initJob(param) {
if (!param.command.end) {
param.command.end = function () {
shared.emit('jobEnd', param);
shared.emit('job:end', param);
};
}
});
}
shared.on('job:new', initJob);
if (options.job_management !== false) {
shared.job_keys = arrayExtend(shared.job_keys || [], ["id"]);
if (typeof options.workspace !== 'object') {
shared.workspace = localStorage;
} else {
shared.workspace = new Workspace(options.workspace);
}
if (!shared.storage_spec_str) {
shared.storage_spec_str = uniqueJSONStringify(shared.storage_spec);
}
shared.job_queue = new JobQueue(
shared.workspace,
'jio/jobs/' + shared.storage_spec_str,
shared.job_keys
);
// Listeners
shared.on('job:new', postJobIfReady);
shared.on('job:started', deferredPutJob);
shared.on('job:stopped', deferredPutJob);
shared.on('job:modified', deferredPutJob);
shared.on('job:notified', deferredPutJob);
shared.on('job:end', removeJob);
}
}
......@@ -9,20 +9,23 @@ function enableJobRecovery(jio, shared, options) {
// uses
// - shared.job_queue JobQueue
// emits 'job:new' event
function numberOrDefault(number, default_value) {
return (typeof number === 'number' &&
isFinite(number) ? number : default_value);
}
function recoverJob(param) {
shared.job_queue.load();
shared.job_queue.remove(param.id);
delete param.id;
if (methodType(param.method) === 'writer' ||
param.state === 'ready' ||
if (methodType(param.method) === 'writer' &&
(param.state === 'ready' ||
param.state === 'running' ||
param.state === 'waiting') {
param.state === 'waiting')) {
shared.job_queue.save();
shared.emit('job', param);
shared.emit('job:new', param);
}
}
......
......@@ -6,17 +6,17 @@ function enableJobReference(jio, shared, options) {
// creates
// - shared.jobs Object Array
// uses 'job', 'jobEnd' events
// uses 'job:new' and 'job:end' events
shared.jobs = [];
var job_references = new ReferenceArray(shared.jobs);
shared.on('job', function (param) {
shared.on('job:new', function (param) {
job_references.put(param);
});
shared.on('jobEnd', function (param) {
shared.on('job:end', function (param) {
job_references.remove(param);
});
}
......@@ -27,9 +27,9 @@ function enableJobRetry(jio, shared, options) {
// - param.options object
// - param.command object
// uses 'job' and 'jobRetry' events
// emits 'job', 'jobFail' and 'jobStateChange' events
// job can emit 'jobRetry'
// uses 'job:new' and 'job:retry' events
// emits action 'job:start' event
// emits 'job:retry', 'job:reject', 'job:modified' and 'job:stopped' events
shared.job_keys = arrayExtend(shared.job_keys || [], ["max_retry"]);
......@@ -73,9 +73,7 @@ function enableJobRetry(jio, shared, options) {
2
);
// listeners
shared.on('job', function (param) {
function initJob(param) {
if (typeof param.max_retry !== 'number' || param.max_retry < 0) {
param.max_retry = positiveNumberOrDefault(
param.options.max_retry,
......@@ -84,32 +82,40 @@ function enableJobRetry(jio, shared, options) {
}
param.command.reject = function (status) {
if (constants.http_action[status || 0] === "retry") {
shared.emit('jobRetry', param, arguments);
shared.emit('job:retry', param, arguments);
} else {
shared.emit('jobFail', param, arguments);
shared.emit('job:reject', param, arguments);
}
};
param.command.retry = function () {
shared.emit('jobRetry', param, arguments);
shared.emit('job:retry', param, arguments);
};
});
}
shared.on('jobRetry', function (param, args) {
function retryIfRunning(param, args) {
if (param.state === 'running') {
if (param.max_retry === undefined ||
param.max_retry === null ||
param.max_retry >= param.tried) {
param.state = 'waiting';
param.modified = new Date();
shared.emit('jobStop', param);
shared.emit('job:modified', param);
shared.emit('job:stopped', param);
setTimeout(function () {
param.state = 'ready';
param.modified = new Date();
shared.emit('job', param);
shared.emit('job:modified', param);
shared.emit('job:start', param);
}, min(10000, param.tried * 2000));
} else {
shared.emit('jobFail', param, args);
shared.emit('job:reject', param, args);
}
}
}
});
// listeners
shared.on('job:new', initJob);
shared.on('job:retry', retryIfRunning);
}
......@@ -13,7 +13,9 @@ function enableJobTimeout(jio, shared, options) {
// - param.timeout_ident Timeout
// - param.state string 'running'
// uses 'job', 'jobDone', 'jobFail', 'jobRetry' and 'jobNotify' events
// uses 'job:new', 'job:stopped', 'job:started',
// 'job:notified' and 'job:end' events
// emits 'job:modified' event
shared.job_keys = arrayExtend(shared.job_keys || [], ["timeout"]);
......@@ -38,34 +40,39 @@ function enableJobTimeout(jio, shared, options) {
};
}
// listeners
shared.on('job', function (param) {
if (typeof param.timeout !== 'number' || param.timeout < 0) {
param.timeout = positiveNumberOrDefault(
param.options.timeout,
function initJob(job) {
if (typeof job.timeout !== 'number' || job.timeout < 0) {
job.timeout = positiveNumberOrDefault(
job.options.timeout,
default_timeout
);
}
param.modified = new Date();
});
job.modified = new Date();
shared.emit('job:modified', job);
}
["jobDone", "jobFail", "jobRetry"].forEach(function (event) {
shared.on(event, function (param) {
clearTimeout(param.timeout_ident);
delete param.timeout_ident;
});
});
function clearJobTimeout(job) {
clearTimeout(job.timeout_ident);
delete job.timeout_ident;
}
["jobRun", "jobNotify", "jobEnd"].forEach(function (event) {
shared.on(event, function (param) {
clearTimeout(param.timeout_ident);
if (param.state === 'running' && param.timeout > 0) {
param.timeout_ident = setTimeout(timeoutReject(param), param.timeout);
param.modified = new Date();
function restartJobTimeoutIfRunning(job) {
clearTimeout(job.timeout_ident);
if (job.state === 'running' && job.timeout > 0) {
job.timeout_ident = setTimeout(timeoutReject(job), job.timeout);
job.modified = new Date();
} else {
delete param.timeout_ident;
delete job.timeout_ident;
}
}
});
});
// listeners
shared.on('job:new', initJob);
shared.on("job:stopped", clearJobTimeout);
shared.on("job:end", clearJobTimeout);
shared.on("job:started", restartJobTimeoutIfRunning);
shared.on("job:notified", restartJobTimeoutIfRunning);
}
......@@ -668,8 +668,8 @@
"storage_spec": {"type": "fake", "id": "1 Job Manage"},
"method": "get",
//"created": new Date(),
"tried": 1,
"state": "running",
"tried": 0, // deferred writing 1
"state": "ready", // deferred writing "running"
//"modified": new Date(),
"max_retry": 2,
"timeout": 1200,
......@@ -683,7 +683,7 @@
setTimeout(function () {
commands["1 Job Manage/get"].success({"data": {"b": "c"}});
}, 50); // wait 50 ms
}, 100); // wait 100 ms
setTimeout(function () {
deepEqual(workspace, {}, 'Job ended, empty workspace');
......@@ -703,7 +703,7 @@
o.job1.kwargs._id = 'b';
// o.job1.created = new Date();
// o.job1.modified = new Date();
}, 100); // wait 50 ms
}, 200); // wait 100 ms
setTimeout(function () {
commands["1 Job Manage/get"].storage({
"type": "fake",
......@@ -720,19 +720,23 @@
}, "Second job respond");
});
o.job1.tried = 1;
o.job1.state = 'running';
o.job2 = {
"kwargs": {"_id": "c"},
"options": {},
"storage_spec": {"type": "fake", "id": "2 Job Manage"},
"method": "get",
//"created": new Date(),
"tried": 1,
"state": "running",
"tried": 0, // deferred writing 1
"state": "ready", // deferred writing "running"
//"modified": new Date(),
"max_retry": 2,
"timeout": 10000,
"id": 2
};
tmp = workspace["jio/jobs/{\"id\":\"1 Job Manage\",\"type\":\"fake\"}"];
tmp = JSON.parse(tmp);
delete tmp[0].created;
......@@ -743,20 +747,24 @@
o.job1,
o.job2
], 'Job calls another job, workspace have two jobs');
}, 150); // wait 50 ms
}, 300); // wait 100 ms
setTimeout(function () {
commands['1 Job Manage/get'].end();
tmp = workspace["jio/jobs/{\"id\":\"1 Job Manage\",\"type\":\"fake\"}"];
tmp = JSON.parse(tmp);
delete tmp[0].created;
delete tmp[0].modified;
o.job2.tried = 1;
o.job2.state = 'running';
deepEqual(tmp, [o.job2], 'First Job ended, second still there');
commands['1 Job Manage/get'].success({"data": {"c": "d"}});
commands['2 Job Manage/get'].success({"data": {"d": "e"}});
deepEqual(workspace, {}, 'No more job in the queue');
}, 200); // wait 50 ms
}, 400); // wait 100 ms
});
test('job state running, job recovery', 2, function () {
......@@ -831,8 +839,11 @@
setTimeout(function () {
// copy workspace when job is waiting
commands['Job Recovw/post'].retry();
workspace = jIO.util.deepClone(workspace);
}, 50);
setTimeout(function () {
workspace = jIO.util.deepClone(workspace);
}, 100);
setTimeout(function () {
commands['Job Recovw/post'].success({"id": "a"});
}, 2100);
......@@ -850,7 +861,7 @@
if (commands['Job Recovw/post']) {
ok(false, "Command called, job recovered to earlier");
}
}, 19999);
}, 19889); // need to wait around 19900 ms
setTimeout(function () {
if (!commands['Job Recovw/post']) {
......@@ -865,7 +876,7 @@
start();
deepEqual(workspace, {}, 'No more job in the queue');
}, 20100);
}, 51);
}, 150);
//////////////////////////////
// XXX Waiting for jobs job recovery
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment