# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################
Marco Mariani's avatar
Marco Mariani committed
28 29

import argparse
30
import errno
31
from six.moves import dbm_gnu as gdbm
Marco Mariani's avatar
Marco Mariani committed
32
import json
33
from lockfile import LockFile
Marco Mariani's avatar
Marco Mariani committed
34 35 36
import logging
import logging.handlers
import os
Antoine Catton's avatar
Antoine Catton committed
37
import signal
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
38
import socket
Antoine Catton's avatar
Antoine Catton committed
39
import subprocess
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
40
import sys
41 42
from six.moves import socketserver
import io
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
43 44
import threading

45 46 47 48 49 50 51 52
# Name -> numeric level table of the logging module. It is a private
# attribute, exposed as _nameToLevel or (failing that) as _levelNames.
if hasattr(logging, '_nameToLevel'):
  logging_levels = logging._nameToLevel
  logging_choices = logging_levels.keys()
else:
  logging_levels = logging._levelNames
  # Keep only the string keys: they are the names usable on the command line.
  logging_choices = [name for name in logging_levels
                     if isinstance(name, str)]

53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
# Copied from erp5.util:erp5/util/testnode/ProcessManager.py
def subprocess_capture(p, log, log_prefix, get_output=True):
  """Wait for process `p` while forwarding its output, line by line, to `log`.

  p          -- a started process object exposing stdout, stderr and wait().
  log        -- callable invoked with each line, trailing newline removed.
  log_prefix -- when not empty, every logged line starts with "<prefix> : ".
  get_output -- when true, the raw lines are also collected and returned.

  Returns a (stdout, stderr) pair. Each item is the concatenated captured
  text, or the falsy stream value itself (e.g. None) for a stream that was
  not piped.
  """
  def pump(stream, emit, collected):
    # readline() yields an empty value only once the stream is exhausted.
    line = stream.readline()
    while line:
      if get_output:
        collected.append(line)
      message = line
      if log_prefix:
        message = "%s : " % log_prefix + message
      emit(message.rstrip('\n'))
      line = stream.readline()

  def start_reader(stream):
    # One daemon thread per piped stream, so that neither pipe can fill up
    # and block the child while the other one is being read.
    if not stream:
      return None, None
    lines = []
    worker = threading.Thread(target=pump, args=(stream, log, lines))
    worker.daemon = True
    worker.start()
    return worker, lines

  out_worker, out_lines = start_reader(p.stdout)
  err_worker, err_lines = start_reader(p.stderr)
  p.wait()
  for worker in (out_worker, err_worker):
    if worker is not None:
      worker.join()
  return (p.stdout and ''.join(out_lines),
          p.stderr and ''.join(err_lines))
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
85

86
class EqueueServer(socketserver.ThreadingUnixStreamServer):
  """Threaded unix-socket server executing the commands it receives.

  Each connection carries one JSON object holding a ``command`` (executable
  and arguments joined by NUL characters) and a ``timestamp``. The command
  string is echoed back to the client, then executed unless the database
  already records a run of the same executable with an equal or newer
  timestamp. A lock ensures that only one command runs at a time.
  """

  daemon_threads = True

  def __init__(self, *args, **kw):
    """Bind the socket, then set up logging, database and locks.

    Accepts the arguments of ThreadingUnixStreamServer plus the mandatory
    keyword ``equeue_options``, the namespace of parsed options.
    """
    self.options = kw.pop('equeue_options')
    socketserver.ThreadingUnixStreamServer.__init__(self,
                                                    RequestHandlerClass=None,
                                                    *args, **kw)
    # Equeue Specific elements
    self.setLogger(self._getOption('logfile'), self._getOption('loglevel'))
    self.setDB(self._getOption('database'))
    takeover_path = self._getOption('takeover_triggered_file_path')
    if takeover_path:
      self.takeover_triggered_file_path = takeover_path
    # Lock to only have one command running at the time
    self.thread_lock = threading.Lock()
    # Lockfile is used by other commands to know if an import is ongoing.
    self.lockfile = LockFile(self.options.lockfile)
    # Presumably clears a lock left behind by a previous, interrupted run.
    self.lockfile.break_lock()

  def _getOption(self, name, default=None):
    """Return option `name` as a scalar value.

    argparse stores options declared with nargs=1 as one-element lists but
    leaves their defaults untouched, so a value may come in either shape.
    """
    value = getattr(self.options, name, default)
    if isinstance(value, (list, tuple)):
      return value[0] if value else default
    return value

  def setLogger(self, logfile, loglevel):
    """Create the "EQueue" logger writing to `logfile` at level `loglevel`.

    An unknown level name falls back to INFO.
    """
    self.logger = logging.getLogger("EQueue")
    # Natively support logrotate
    handler = logging.handlers.WatchedFileHandler(logfile, mode='a')
    level = logging_levels.get(loglevel, logging.INFO)
    self.logger.setLevel(level)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    self.logger.addHandler(handler)

  def setDB(self, database):
    """Open (creating it if needed) the gdbm file recording the last runs."""
    # 'c': read/write, create if missing; 's': synchronized writes.
    self.db = gdbm.open(database, 'cs', 0o700)

  def _hasTakeoverBeenTriggered(self):
    """Tell whether the takeover marker file is configured and exists."""
    path = getattr(self, 'takeover_triggered_file_path', None)
    return bool(path) and os.path.exists(path)

  def _runCommandIfNeeded(self, command, timestamp):
    """Run `command` unless it was already run for this `timestamp`.

    command   -- executable and arguments joined by NUL characters.
    timestamp -- compared with the value stored for the executable; the
                 command is skipped when it is not strictly newer.

    The stored timestamp is only updated when the command exits with 0.
    """
    with self.thread_lock, self.lockfile:
      if self._hasTakeoverBeenTriggered():
        self.logger.info('Takeover has been triggered, preventing to run import script.')
        return
      cmd_list = command.split('\0')
      cmd_readable = ' '.join(cmd_list)
      cmd_executable = cmd_list[0]

      if cmd_executable in self.db and timestamp <= int(self.db[cmd_executable]):
        self.logger.info("%s already run.", cmd_readable)
        return

      self.logger.info("Running %s, %s with output:", cmd_readable, timestamp)
      try:
        sys.stdout.flush()
        p = subprocess.Popen(cmd_list, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, universal_newlines=True)
      except OSError as e:
        # Popen reports a missing or non-executable program with OSError;
        # log it instead of letting the handling thread die silently.
        self.logger.warning("%s could not be started: %s", cmd_readable, e)
        return
      subprocess_capture(p, self.logger.info, '', True)
      if p.returncode == 0:
        self.logger.info("%s finished successfully.", cmd_readable)
        self.db[cmd_executable] = str(timestamp)
      else:
        self.logger.warning("%s exited with status %s.",
                            cmd_readable, p.returncode)

  def process_request_thread(self, request, client_address):
    """Read one JSON request, answer it, then run the requested command.

    The answer is the command string, or '127' when the request cannot be
    decoded; in that case nothing is executed.
    """
    # Handle request
    self.logger.debug("Connection with file descriptor %d", request.fileno())
    request.settimeout(self._getOption('timeout'))
    # recv() returns bytes: gather the chunks until the peer stops sending
    # (empty read) or stays silent for longer than the timeout.
    chunks = []
    try:
      while True:
        segment = request.recv(1024)
        if not segment:
          break
        chunks.append(segment)
    except socket.timeout:
      pass
    raw_request = b''.join(chunks)

    command = '127'
    timestamp = None
    valid_request = False
    try:
      request_parameters = json.loads(raw_request.decode('utf-8'))
      timestamp = request_parameters['timestamp']
      command = str(request_parameters['command'])
    except (ValueError, KeyError, IndexError, TypeError):
      # ValueError also covers invalid JSON and undecodable bytes.
      self.logger.warning("Error during the unserialization of json "
                          "message of %r file descriptor. The message "
                          "was %r", request.fileno(), raw_request)
    else:
      valid_request = True
      self.logger.info("New command %r at %s", command, timestamp)

    try:
      request.sendall(command.encode('utf-8'))
    except socket.error:
      # Best effort only: the client may already be gone.
      self.logger.warning("Couldn't respond to %r", request.fileno())
    self.close_request(request)
    if valid_request:
      self._runCommandIfNeeded(command, timestamp)
187 188 189 190 191
def remove_existing_file(path):
  """Remove the file at `path`, whether or not it currently exists.

  A missing file is not an error; any other OSError is propagated.
  """
  try:
    os.remove(path)
  except OSError as e:
    if e.errno != errno.ENOENT:
      raise
Antoine Catton's avatar
Antoine Catton committed
195

196 197 198 199 200
def main():
  """Parse the command line and serve the execution queue until stopped.

  The unix socket file is removed before binding and again on the way out.
  On exit, every process of the current process group is killed, this
  process included.
  """
  parser = argparse.ArgumentParser(
    description="Run a single threaded execution queue.")
  parser.add_argument('--database', nargs=1, required=True,
                      help="Path to the database where the last "
                      "calls are stored")
  # With nargs=1 argparse stores a one-element list but does not wrap the
  # default, so the default is given as a list to keep a single shape.
  parser.add_argument('--loglevel', nargs=1,
                      default=['INFO'],
                      choices=logging_choices,
                      required=False)
  parser.add_argument('-l', '--logfile', nargs=1, required=True,
                      help="Path to the log file.")
  # No nargs here: the value is handed to socket.settimeout(), which needs
  # a number and not a one-element list.
  parser.add_argument('-t', '--timeout', required=False,
                      dest='timeout', type=int, default=3,
                      help="Timeout, in seconds, applied to each connection.")
  parser.add_argument('--lockfile',
                      help="Path to the lock file created when a command is run")
  parser.add_argument('--takeover-triggered-file-path', nargs=1, required=False,
                      help="Path to the file created by takeover script to state that it has been triggered.")
  parser.add_argument('socket', help="Path to the unix socket")

  args = parser.parse_args()

  socketpath = args.socket

  # Turn the signals into SystemExit so that the finally clause below runs.
  signal.signal(signal.SIGHUP, lambda signum, frame: sys.exit(-1))
  signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit())

  remove_existing_file(socketpath)
  try:
    server = EqueueServer(socketpath, equeue_options=args)
    server.logger.info("Starting server on %r", socketpath)
    server.serve_forever()
  finally:
    remove_existing_file(socketpath)
    # pid 0 addresses the whole process group of the caller: running
    # commands are killed along with this process.
    os.kill(0, signal.SIGKILL)
231 232 233

# Start the server only when this file is executed as a script.
if __name__ == "__main__":
  main()