Commit f2919d25 authored by Cédric Le Ninivin's avatar Cédric Le Ninivin Committed by Cédric Le Ninivin

ReplicateStorage: simplifify code for replication

Creation and modification process now use the same code

/reviewed-on nexedi/jio!29
parent e7f70409
...@@ -189,90 +189,6 @@ ...@@ -189,90 +189,6 @@
}); });
} }
function checkLocalCreation(queue, source, destination, id, options,
getMethod) {
var remote_doc;
queue
.push(function () {
return destination.get(id);
})
.push(function (doc) {
remote_doc = doc;
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
// This document was never synced.
// Push it to the remote storage and store sync information
return;
}
throw error;
})
.push(function () {
// This document was never synced.
// Push it to the remote storage and store sync information
return getMethod(id);
})
.push(function (doc) {
var local_hash = generateHash(stringify(doc)),
remote_hash;
if (remote_doc === undefined) {
return propagateModification(source, destination, doc, local_hash,
id, options);
}
remote_hash = generateHash(stringify(remote_doc));
if (local_hash === remote_hash) {
// Same document
return context._signature_sub_storage.put(id, {
"hash": local_hash
})
.push(function () {
skip_document_dict[id] = null;
});
}
if (options.conflict_ignore === true) {
return;
}
if (options.conflict_force === true) {
return propagateModification(source, destination, doc, local_hash,
id);
}
// Already exists on destination
throw new jIO.util.jIOError("Conflict on '" + id + "': " +
stringify(doc) + " !== " +
stringify(remote_doc),
409);
});
}
function checkBulkLocalCreation(queue, source, destination, id_list,
options) {
queue
.push(function () {
return source.bulk(id_list);
})
.push(function (result_list) {
var i,
sub_queue = new RSVP.Queue();
function getResult(j) {
return function (id) {
if (id !== id_list[j].parameter_list[0]) {
throw new Error("Does not access expected ID " + id);
}
return result_list[j];
};
}
for (i = 0; i < result_list.length; i += 1) {
checkLocalCreation(sub_queue, source, destination,
id_list[i].parameter_list[0],
options, getResult(i));
}
return sub_queue;
});
}
function checkLocalDeletion(queue, destination, id, source) { function checkLocalDeletion(queue, destination, id, source) {
var status_hash; var status_hash;
queue queue
...@@ -312,13 +228,26 @@ ...@@ -312,13 +228,26 @@
function checkSignatureDifference(queue, source, destination, id, function checkSignatureDifference(queue, source, destination, id,
conflict_force, conflict_ignore, conflict_force, conflict_ignore,
getMethod) { is_creation, is_modification,
getMethod, options) {
queue queue
.push(function () { .push(function () {
return RSVP.all([ // Optimisation to save a get call to signature storage
getMethod(id), if (is_creation === true) {
context._signature_sub_storage.get(id) return RSVP.all([
]); getMethod(id),
{hash: undefined}
]);
}
if (is_modification === true) {
return RSVP.all([
getMethod(id),
context._signature_sub_storage.get(id)
]);
}
throw new jIO.util.jIOError("Unexpected call of"
+ " checkSignatureDifference",
409);
}) })
.push(function (result_list) { .push(function (result_list) {
var doc = result_list[0], var doc = result_list[0],
...@@ -354,11 +283,21 @@ ...@@ -354,11 +283,21 @@
return propagateModification(source, destination, doc, return propagateModification(source, destination, doc,
local_hash, id); local_hash, id);
}, function (error) { }, function (error) {
var use_post;
if ((error instanceof jIO.util.jIOError) && if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) { (error.status_code === 404)) {
// Document has been deleted remotely if (is_creation) {
// Remote document does not exists, create it following
// provided options
use_post = options.use_post;
} else {
// Remote document has been erased, put it to save
// modification
use_post = false;
}
return propagateModification(source, destination, doc, return propagateModification(source, destination, doc,
local_hash, id); local_hash, id,
{use_post: use_post});
} }
throw error; throw error;
}); });
...@@ -367,6 +306,7 @@ ...@@ -367,6 +306,7 @@
} }
function checkBulkSignatureDifference(queue, source, destination, id_list, function checkBulkSignatureDifference(queue, source, destination, id_list,
document_status_list, options,
conflict_force, conflict_ignore) { conflict_force, conflict_ignore) {
queue queue
.push(function () { .push(function () {
...@@ -389,7 +329,9 @@ ...@@ -389,7 +329,9 @@
checkSignatureDifference(sub_queue, source, destination, checkSignatureDifference(sub_queue, source, destination,
id_list[i].parameter_list[0], id_list[i].parameter_list[0],
conflict_force, conflict_ignore, conflict_force, conflict_ignore,
getResult(i)); document_status_list[i].is_creation,
document_status_list[i].is_modification,
getResult(i), options);
} }
return sub_queue; return sub_queue;
}); });
...@@ -410,9 +352,11 @@ ...@@ -410,9 +352,11 @@
.push(function (result_list) { .push(function (result_list) {
var i, var i,
local_dict = {}, local_dict = {},
new_list = [], document_list = [],
change_list = [], document_status_list = [],
signature_dict = {}, signature_dict = {},
is_modification,
is_creation,
key; key;
for (i = 0; i < result_list[0].data.total_rows; i += 1) { for (i = 0; i < result_list[0].data.total_rows; i += 1) {
if (!skip_document_dict.hasOwnProperty( if (!skip_document_dict.hasOwnProperty(
...@@ -428,54 +372,46 @@ ...@@ -428,54 +372,46 @@
signature_dict[result_list[1].data.rows[i].id] = i; signature_dict[result_list[1].data.rows[i].id] = i;
} }
} }
for (key in local_dict) {
if (options.check_creation === true) { if (local_dict.hasOwnProperty(key)) {
for (key in local_dict) { is_modification = signature_dict.hasOwnProperty(key)
if (local_dict.hasOwnProperty(key)) { && options.check_modification;
if (!signature_dict.hasOwnProperty(key)) { is_creation = !signature_dict.hasOwnProperty(key)
if (options.use_bulk_get === true) { && options.check_creation;
new_list.push({ if (is_modification === true || is_creation === true) {
method: "get", if (options.use_bulk_get === true) {
parameter_list: [key] document_list.push({
}); method: "get",
} else { parameter_list: [key]
checkLocalCreation(queue, source, destination, key, });
options, source.get.bind(source)); document_status_list.push({
} is_creation: is_creation,
is_modification: is_modification
});
} else {
checkSignatureDifference(queue, source, destination, key,
options.conflict_force,
options.conflict_ignore,
is_creation, is_modification,
source.get.bind(source),
options);
} }
} }
} }
if ((options.use_bulk_get === true) && (new_list.length !== 0)) {
checkBulkLocalCreation(queue, source, destination, new_list,
options);
}
} }
for (key in signature_dict) { if (options.check_deletion === true) {
if (signature_dict.hasOwnProperty(key)) { for (key in signature_dict) {
if (local_dict.hasOwnProperty(key)) { if (signature_dict.hasOwnProperty(key)) {
if (options.check_modification === true) { if (!local_dict.hasOwnProperty(key)) {
if (options.use_bulk_get === true) {
change_list.push({
method: "get",
parameter_list: [key]
});
} else {
checkSignatureDifference(queue, source, destination, key,
options.conflict_force,
options.conflict_ignore,
source.get.bind(source));
}
}
} else {
if (options.check_deletion === true) {
checkLocalDeletion(queue, destination, key, source); checkLocalDeletion(queue, destination, key, source);
} }
} }
} }
} }
if ((options.use_bulk_get === true) && (change_list.length !== 0)) { if ((options.use_bulk_get === true) && (document_list.length !== 0)) {
checkBulkSignatureDifference(queue, source, destination, checkBulkSignatureDifference(queue, source, destination,
change_list, document_list, document_status_list,
options,
options.conflict_force, options.conflict_force,
options.conflict_ignore); options.conflict_ignore);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment