Commit d42de18b authored by Alain Takoudjou's avatar Alain Takoudjou Committed by Thomas Gambier

slapos.grid.promise: prevent json file corruption while updating data

history and statistics files are sometimes corrupted because of invalid json

We do 2 things to improve this:
 1. we use json.load to read the previous file (this will detect if the file is corrupted)
 2. we use json.dump to write the entire file at once in a temporary file and then we move this file to be atomic.

Probably this will consume more resources.

See merge request !728
parent 6e26288e
Pipeline #39006 failed with stage
in 0 seconds
...@@ -41,6 +41,7 @@ import traceback ...@@ -41,6 +41,7 @@ import traceback
import psutil import psutil
import inspect import inspect
import hashlib import hashlib
import errno
from datetime import datetime from datetime import datetime
from multiprocessing import Process, Queue as MQueue from multiprocessing import Process, Queue as MQueue
from six.moves import queue, reload_module from six.moves import queue, reload_module
...@@ -57,6 +58,7 @@ from slapos.grid.promise.wrapper import WrapPromise ...@@ -57,6 +58,7 @@ from slapos.grid.promise.wrapper import WrapPromise
from slapos.version import version from slapos.version import version
PROMISE_CACHE_FOLDER_NAME = '.slapgrid/promise/cache' PROMISE_CACHE_FOLDER_NAME = '.slapgrid/promise/cache'
TMP_FOLDER_NAME = '.slapgrid/promise/tmp'
class PromiseError(Exception): class PromiseError(Exception):
pass pass
...@@ -354,6 +356,11 @@ class PromiseLauncher(object): ...@@ -354,6 +356,11 @@ class PromiseLauncher(object):
mkdir_p(self.promise_history_output_dir) mkdir_p(self.promise_history_output_dir)
self._updateFolderOwner() self._updateFolderOwner()
self.tmp_dir = os.path.join(self.partition_folder, TMP_FOLDER_NAME)
if not os.path.exists(self.tmp_dir):
mkdir_p(self.tmp_dir)
self._updateFolderOwner(self.tmp_dir)
def _generatePromiseResult(self, promise_process, promise_name, promise_path, def _generatePromiseResult(self, promise_process, promise_name, promise_path,
message, execution_time=0): message, execution_time=0):
if self.check_anomaly: if self.check_anomaly:
...@@ -450,6 +457,7 @@ class PromiseLauncher(object): ...@@ -450,6 +457,7 @@ class PromiseLauncher(object):
self.promise_history_output_dir, self.promise_history_output_dir,
'%s.history.json' % result.title '%s.history.json' % result.title
) )
tmp_history_file = os.path.join(self.tmp_dir, '%s.history.json' % result.title)
# Remove useless informations # Remove useless informations
result_dict = state_dict.pop('result') result_dict = state_dict.pop('result')
...@@ -458,12 +466,20 @@ class PromiseLauncher(object): ...@@ -458,12 +466,20 @@ class PromiseLauncher(object):
state_dict.pop('path', '') state_dict.pop('path', '')
state_dict.pop('type', '') state_dict.pop('type', '')
state_dict["status"] = "ERROR" if result.item.hasFailed() else "OK" state_dict["status"] = "ERROR" if result.item.hasFailed() else "OK"
history_dict = {
"date": time.time(),
"data": [state_dict]
}
if not os.path.exists(history_file) or not os.stat(history_file).st_size: try:
with open(history_file, 'w') as f: with open(history_file) as f:
f.write("""{ history_dict = json.load(f)
"date": %s, except OSError as e:
"data": %s}""" % (time.time(), json.dumps([state_dict]))) if e.errno != errno.ENOENT:
raise
except ValueError:
# Broken json, use default history_dict
pass
else: else:
previous_state_list = previous_state_dict.get(result.name, None) previous_state_list = previous_state_dict.get(result.name, None)
if previous_state_list is not None: if previous_state_list is not None:
...@@ -473,36 +489,42 @@ class PromiseLauncher(object): ...@@ -473,36 +489,42 @@ class PromiseLauncher(object):
current_sum == checksum: current_sum == checksum:
# Only save the changes and not the same info # Only save the changes and not the same info
return return
state_dict.pop('title', '') state_dict.pop('title', '')
state_dict.pop('name', '') state_dict.pop('name', '')
with open (history_file, mode="r+") as f: history_dict['data'].append(state_dict)
f.seek(0,2)
f.seek(f.tell() -2) with open(tmp_history_file, mode="w") as f:
f.write('%s}' % ',{}]'.format(json.dumps(state_dict))) json.dump(history_dict, f)
os.rename(tmp_history_file, history_file)
def _saveStatisticsData(self, stat_file_path, date, success, error): def _saveStatisticsData(self, stat_file_path, date, success, error):
# csv-like document for success/error statictics # csv-like document for success/error statictics
if not os.path.exists(stat_file_path) or os.stat(stat_file_path).st_size == 0: stat_dict = {
with open(stat_file_path, 'w') as fstat: "date": time.time(),
fstat.write("""{ "data": ["Date, Success, Error, Warning"]
"date": %s, }
"data": ["Date, Success, Error, Warning"]}""" % time.time()) try:
with open(stat_file_path) as f:
stat_dict = json.load(f)
except OSError as e:
if e.errno != errno.ENOENT:
raise
except ValueError:
# Broken json, we use default
pass
tmp_stat_file = os.path.join(self.tmp_dir,
os.path.basename(stat_file_path))
current_state = '%s, %s, %s, %s' % ( current_state = '%s, %s, %s, %s' % (
date, date,
success, success,
error, error,
'') '')
stat_dict['data'].append(current_state)
# append to file with open (tmp_stat_file, mode="w") as f:
# XXX this is bad, it is getting everywhere. json.dump(stat_dict, f)
if current_state: os.rename(tmp_stat_file, stat_file_path)
with open (stat_file_path, mode="r+") as fstat:
fstat.seek(0,2)
position = fstat.tell() -2
fstat.seek(position)
fstat.write('%s}' % ',"{}"]'.format(current_state))
def _loadPromiseResult(self, promise_title): def _loadPromiseResult(self, promise_title):
promise_output_file = os.path.join( promise_output_file = os.path.join(
......
...@@ -566,6 +566,51 @@ class RunPromise(GenericPromise): ...@@ -566,6 +566,51 @@ class RunPromise(GenericPromise):
}""") }""")
def test_runpromise_with_currupted_history(self):
promise_name = 'my_promise'
self.configureLauncher()
self.generatePromiseScript('my_promise.py', success=True, periodicity=0.03)
self.launcher.run()
self.assertSuccessResult(promise_name)
# will fail if history json is broken
self.assertSuccessHistoryResult(promise_name)
history_file = os.path.join(self.partition_dir,
PROMISE_HISTORY_RESULT_FOLDER_NAME,
'%s.history.json' % promise_name)
stats_file = os.path.join(self.partition_dir,
PROMISE_STATE_FOLDER_NAME,
'promise_stats.json')
broken_history = """{
"data": [{
"failed": false,
"message": "success",
"name": "%(name)s.py",
"status": "OK",
"title": "%(name)s"
}, {]
}""" % {'name': promise_name}
with open(history_file, 'w') as f:
f.write(broken_history)
with open(stats_file, 'w') as f:
f.write("""{
"data": ["Date, Success,
""")
time.sleep(4)
with self.assertRaises(ValueError):
self.assertSuccessHistoryResult(promise_name)
with self.assertRaises(ValueError):
self.assertSuccessStatsResult(1)
self.launcher.run()
self.assertSuccessResult(promise_name)
# Promise history json is not broken anymore
self.assertSuccessHistoryResult(promise_name)
# Statistics data file is not broken anymore
self.assertSuccessStatsResult(1)
def test_runpromise_no_logdir(self): def test_runpromise_no_logdir(self):
promise_name = 'my_promise.py' promise_name = 'my_promise.py'
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment