Commit 291cf6ef authored by Jondy Zhao's avatar Jondy Zhao

Use sqlite3 db to save local data

parent 76aca390
...@@ -26,95 +26,16 @@ ...@@ -26,95 +26,16 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# #
############################################################################## ##############################################################################
#
# Paramters:
#
# computer_id, which computer sends this report
#
# server_name, if it's not empty, only the net drives in this server
# will be involved in the report.
#
# report_internal, in seconds
#
# data_file, save report state and unsending records last time, pickle format.
#
# main process
#
# read data from data file (pickle)
#
# check start state, there are 3 cases
#
# case 1: first report
#
# Send init report to server
#
# case 2: normal report
#
# case 3: unexpected error
#
# Request server to send back last report information
#
# If there are unsend reports in json file, send them first
#
# Start sending report thread, the thread will enter loop as the following:
#
# Are there unsend records?
#
# Yes, send those.
#
# Resend failed, insert the records in the current buffer to unsending queue.
#
# No.
#
# Send records in the current buffer
#
# Failed, insert all the records in the currnet buffer to unsending queue.
#
# If main process quit?
#
# Enter main loop:
#
# If report interval is out,
#
# Yes, insert a record to the current bffer
#
# No, sleep for (report_interval / 2 + 1) seconds
#
# If user wants to quit
#
# Yes, break main loop
#
# After quit from main loop,
#
# Stop sending report thread
#
# Save unsending queue, report id and state to pickle file.
#
#
# Interface of computer:
#
# reportNetDriveUsage will post a list of record:
#
# (computer_id, sequence_no, timestamp, duration,
# domain, user, usage, remark)
#
# getReportSequenceNo?computer_id=XXXX
#
import argparse import argparse
from datetime import datetime import datetime
import logger import logger
import netuse import netuse
import os.path import os.path
import pickle import pickle
import Queue
import slapos.slap.slap import slapos.slap.slap
import sys import sys
from time import sleep from time import sleep
REPORT_STATE_INIT = 0
REPORT_STATE_RUNNING = 1
REPORT_STATE_STOP = 2
def parseArgumentTuple(): def parseArgumentTuple():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--master-url", parser.add_argument("--master-url",
...@@ -151,18 +72,16 @@ def parseArgumentTuple(): ...@@ -151,18 +72,16 @@ def parseArgumentTuple():
class NetDriveUsageReporter(object): class NetDriveUsageReporter(object):
queue_size = 512
def __init__(self, option_dict): def __init__(self, option_dict):
for option, value in option_dict.items(): for option, value in option_dict.items():
setattr(self, option, value) setattr(self, option, value)
self.slap = slapos.slap.slap() self.slap = slapos.slap.slap()
self.slap_computer = None self.slap_computer = None
self.report_sequence_no = 0
self._queue = None
self._state_file = self.data_file + ".state"
self._domain_name = None self._domain_name = None
self._domain_account = None self._domain_account = None
self._config_id = None
self._report_date = None
self._db = initializeDatabase(self.data_file)
def initializeConnection(self): def initializeConnection(self):
connection_dict = {} connection_dict = {}
...@@ -172,18 +91,25 @@ class NetDriveUsageReporter(object): ...@@ -172,18 +91,25 @@ class NetDriveUsageReporter(object):
**connection_dict) **connection_dict)
self.slap_computer = self.slap.registerComputer(self.computer_id) self.slap_computer = self.slap.registerComputer(self.computer_id)
def _getUserInfo(self): def initializeConfigData(self):
user_info = netuser.userInfo() user_info = netuser.userInfo()
self._domain_name = user_info[1] self._domain_account = "%s\\%s" % user_info[0:2]
self._domain_account = user_info[0]
q = self._db.execute
s = "SELECT _rowid, report_date FROM config WHERE domain_account=? and computer_id=?"
r =
for r in q(s, (self._domain_account, self.computer_id)):
self._config_id, self._report_date = r
else:
q("INSERT OR REPLACE INTO config(domain_account, computer_id, report_date)"
" VALUES (?,?,?)",
(self._domain_account, self.computer_id, datetime.now()))
for r in q(s, (self._domain_account, self.computer_id)):
self._config_id, self._report_date = r
def run(self): def run(self):
self._getUserInfo() self.initializeConfigData()
self.initializeConnection() self.initializeConnection()
self._loadReportState()
self._sendReportInQueue()
self.report_state = REPORT_STATE_RUNNING
pickle.dump(self.report_state, self._state_file)
current_timestamp = datetime.now() current_timestamp = datetime.now()
try: try:
while True: while True:
...@@ -194,41 +120,10 @@ class NetDriveUsageReporter(object): ...@@ -194,41 +120,10 @@ class NetDriveUsageReporter(object):
continue continue
r = self.getUsageReport(d.seconds, current_timestamp) r = self.getUsageReport(d.seconds, current_timestamp)
current_timestamp = last_timestamp current_timestamp = last_timestamp
if not self.sendUsageReport(r): self.insertRecord(r[0], r[1], r[2], r[3])
break self.sendReport()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
self._saveReportState()
self.report_state = REPORT_STATE_STOP
pickle.dump(self.report_state, self._state_file)
def _loadReportState(self):
if not os.path.exists(self.data_file):
s = {
"sequence-no" : -1,
"computer-id" : self.computer_id,
"queue" : [],
}
pickle.dump(s, self.data_file)
else:
s = pickle.load(self.data_file)
if not s.get("computer-id", "") == self.computer_id:
pass # data file is not expected
if s.get("state", None) == REPORT_STATE_RUNNING:
pass # get sequence no from master sever
self.report_sequence_no = s["sequence-no"]
self._queue = s["queue"]
def _saveReportState(self):
s = {
"computer-id" : self.computer_id,
"sequence-no" : self.report_sequence_no,
"queue" : self._queue,
}
pickle.dump(s, self.data_file)
def getUsageReport(self, duration, timestamp=None): def getUsageReport(self, duration, timestamp=None):
if timestamp is None: if timestamp is None:
...@@ -245,26 +140,73 @@ class NetDriveUsageReporter(object): ...@@ -245,26 +140,73 @@ class NetDriveUsageReporter(object):
r.append("\n".join(remark)) r.append("\n".join(remark))
return r return r
def sendUsageReport(self, r): def sendReport(self):
self._queue[0:0] = [r] # If report_date is not today, then
return self._sendReportInQueue() # Generate xml data from local table by report_date
# Send xml data to master node
def _postData(self, r): # Change report_date to today
# (Optional) Move all the reported data to histroy table
today = datetime.now()
if self._report_date < today:
xml_data = self.generateDailyReport()
self._postData(xml_data)
self._db.execute("UPDATE config SET report_date=? where _rowid=?",
(today, self._config_id))
def _postData(self, xml_data):
"""Send a marshalled dictionary of the net drive usage record """Send a marshalled dictionary of the net drive usage record
serialized via_getDict. serialized via_getDict.
""" """
self.slap_computer.reportNetDriveUsage(r) self.slap_computer.reportNetDriveUsage(xml_data)
def _sendReportInQueue(self): def initializeDatabase(self, db_path):
try: self._db = sqlite3.connect(db_path, isolation_level=None)
while True: q = self._db.execute
r = self._queue[-1] q("""CREATE TABLE IF NOT EXISTS config (
if not self._postData(r): domain_account TEXT PRIMARY KEY,
return False computer_id TEXT NOT NULL,
self._queue.pop() report_date TEXT NOT NULL,
except IndexError: remark TEXT)""")
q("""CREATE TABLE IF NOT EXISTS net_drive_usage (
id INTEGER PRIMARY KEY,
config_id INTEGER REFERENCES config ( _rowid ),
drive_letter TEXT NOT NULL,
remote_folder TEXT NOT NULL,
start_timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
duration FLOAT NOT NULL,
usage_bytes INTEGER,
remark TEXT)""")
q("""CREATE TABLE IF NOT EXISTS net_drive_usage_history (
id INTEGER PRIMARY KEY,
config_id INTEGER REFERENCES config ( _rowid ),
drive_letter TEXT NOT NULL,
remote_folder TEXT NOT NULL,
start_timestamp TEXT NOT NULL,
duration FLOAT NOT NULL,
bytes INTEGER
remark TEXT)""")
def insertRecord(self, drive_letter, remote_foler, duration, usage_bytes):
self._db.execute("INSERT INTO net_drive_usage "
"(config_id, drive_letter, remote_folder, duration, usage_bytes )"
" VALUES (?, ?, ?, ?, ?)",
(self._config_id, drive_letter, remote_folder, duration, usage_bytes))
def generateDailyReport(self, report_date=None, remove=False):
if report_date is None:
report_date = self._report_date
q = self._db.execute
xml_data = ""
for r in q("SELECT * FROM net_drive_usage WHERE start_timestamp=?", report_date):
pass pass
return True if remove:
q("INSERT INTO net_drive_usage_history "
"SELECT * FROM net_drive_usage WHERE start_timestamp=?",
report_date)
q("DELETE FROM net_drive_usage WHERE start_timestamp=?",
report_date)
return xml_data
def main(): def main():
reporter = NetDriveUsageReporter(parseArgumentTuple()) reporter = NetDriveUsageReporter(parseArgumentTuple())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment