From 92781fb29273ab14d995f6889b1c7ee7e65987cd Mon Sep 17 00:00:00 2001 From: Tristan Cavelier <tristan.cavelier@tiolive.com> Date: Tue, 29 Apr 2014 10:58:12 +0200 Subject: [PATCH] replicate synchronizes all storages --- src/jio.storage/replicatestorage.js | 61 ++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/jio.storage/replicatestorage.js b/src/jio.storage/replicatestorage.js index e58f501..eeb4b11 100644 --- a/src/jio.storage/replicatestorage.js +++ b/src/jio.storage/replicatestorage.js @@ -202,7 +202,7 @@ function next() { if (array.length) { try { - value = callback.call(null, array.shift(), array); + value = callback(array.shift(), array); } catch (e) { return fail(e); } @@ -225,6 +225,35 @@ }); } + function doWhile(callback) { + var cancelled, p1 = resolve(), p2; + return new Promise(function (done, fail, notify) { + function next(value) { + if (value) { + try { + value = callback(); + } catch (e) { + return fail(e); + } + if (cancelled) { return; } + if (value && typeof value.then === "function") { + p1 = value; + p2 = value.then(next, fail, notify); + } else { + p2 = p2.then(next.bind(null, value), fail, notify); + } + return; + } + done(); + } + p2 = p1.then(next.bind(null, true)); + }, function () { + cancelled = true; + if (typeof p1.cancel === "function") { p1.cancel(); } + if (typeof p2.cancel === "function") { p2.cancel(); } + }); + } + // ////////////////////////////////////////////////////////////////////// // /** @@ -309,6 +338,13 @@ if (typeof spec.cache_storage === "object" && spec.cache_storage !== null) { this._cache_storage = spec.cache_storage; } + if (typeof spec.batch_length === "number" && isFinite(spec.batch_length) && + spec.batch_length > 0) { + this._batch_length = spec.batch_length; + } else { + this._batch_length = 10; + } + this._cache.batch_index = this._cache.batch_index || 0; } ReplicateStorage.prototype.syncRowFIFO = function (command) { @@ -412,6 +448,29 @@ }); }); p.then(deleteCache, deleteCache); + if (this._cache.batch_index === 0) { + // no global synchronisation is on going + p.then(function () { + return doWhile(function () { + var i = it._cache.batch_index, l = it._batch_length, test = true; + it._cache.batch_index += 1; + return it._allDocs(command, {}, { + "limit": [i * l, l] + }).then(function (answer) { + if (answer.data.total_rows < l) { + test = false; + } + return it._cache.syncRowFIFO; + }).then(function () { + return test; + }); + }); + }).then(null, function () { + return; + }).then(function () { + it._cache.batch_index = 0; + }); + } this._cache.syncRowFIFO = p; return p; }; -- 2.30.9