slapgrid.py 85.6 KB
Newer Older
1
# -*- coding: utf-8 -*-
2
# vim: set et sts=2:
Łukasz Nowak's avatar
Łukasz Nowak committed
3 4
##############################################################################
#
5 6
# Copyright (c) 2010, 2011, 2012 Vifib SARL and Contributors.
# All Rights Reserved.
Łukasz Nowak's avatar
Łukasz Nowak committed
7 8 9 10 11
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
12
# guarantees and support are strongly advised to contract a Free Software
Łukasz Nowak's avatar
Łukasz Nowak committed
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################
30

Łukasz Nowak's avatar
Łukasz Nowak committed
31 32
import os
import pkg_resources
33
import random
Cédric de Saint Martin's avatar
Cédric de Saint Martin committed
34
import socket
Bryton Lacquement's avatar
Bryton Lacquement committed
35
from io import BytesIO
Cédric de Saint Martin's avatar
Cédric de Saint Martin committed
36 37 38 39
import sys
import tempfile
import time
import traceback
Łukasz Nowak's avatar
Łukasz Nowak committed
40
import warnings
41
import logging
42
import json
43
import shutil
Bryton Lacquement's avatar
Bryton Lacquement committed
44
import six
45
import errno
46
import pwd
47

48 49 50 51 52
# Select the subprocess implementation: the standard module on Python 3,
# the subprocess32 package (imported under the same name) otherwise.
if six.PY3:
  import subprocess
else:
  import subprocess32 as subprocess

# Emit a warning at import time when the interpreter is older than 2.6.
if sys.version_info < (2, 6):
  warnings.warn('Used python version (%s) is old and has problems with'
      ' IPv6 connections' % sys.version.split('\n')[0])

57 58
from requests.exceptions import RequestException

Marco Mariani's avatar
Marco Mariani committed
59 60
from lxml import etree

61
from slapos import manager as slapmanager
Cédric de Saint Martin's avatar
Cédric de Saint Martin committed
62 63
from slapos.slap.slap import NotFoundError
from slapos.slap.slap import ServerError
64
from slapos.slap.slap import COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME
65 66 67 68 69
from slapos.util import (mkdir_p,
                        chownDirectory,
                        string_to_boolean,
                        listifdir,
                        unicode2str)
Marco Mariani's avatar
Marco Mariani committed
70
from slapos.grid.exception import BuildoutFailedError
71
from slapos.grid.SlapObject import Software, Partition
72 73 74
from slapos.grid.svcbackend import (launchSupervisord,
                                    createSupervisordConfiguration,
                                    _getSupervisordConfigurationDirectory,
75 76
                                    _getSupervisordSocketPath,
                                    getSupervisorRPC)
77 78 79
from slapos.grid.utils import (md5digest,
                              dropPrivileges,
                              SlapPopen,
80 81
                              updateFile,
                              getCleanEnvironment)
82 83
from slapos.grid.promise import PromiseLauncher, PromiseError
from slapos.grid.promise.generic import PROMISE_LOG_FOLDER_NAME
84
from slapos.human import human2bytes
Marco Mariani's avatar
Marco Mariani committed
85
import slapos.slap
86
from netaddr import valid_ipv4, valid_ipv6
Łukasz Nowak's avatar
Łukasz Nowak committed
87 88


Cédric de Saint Martin's avatar
Cédric de Saint Martin committed
89
# Computer partition state names.
# XXX: should be moved to SLAP library
COMPUTER_PARTITION_DESTROYED_STATE = 'destroyed'
COMPUTER_PARTITION_STARTED_STATE = 'started'
COMPUTER_PARTITION_STOPPED_STATE = 'stopped'

# Global variables about return state of slapgrid
SLAPGRID_SUCCESS = 0
SLAPGRID_FAIL = 1
SLAPGRID_PROMISE_FAIL = 2
SLAPGRID_OFFLINE_SUCCESS = 3
# Default value of the ``promise_timeout`` argument of Slapgrid
# (presumably a number of seconds -- TODO confirm against the promise code).
PROMISE_TIMEOUT = 20

# File name constants for per-partition bookkeeping files.
# NOTE(review): where each file is written/read is outside this section.
COMPUTER_PARTITION_TIMESTAMP_FILENAME = '.timestamp'
COMPUTER_PARTITION_LATEST_BANG_TIMESTAMP_FILENAME = '.slapos_latest_bang_timestamp'
# Contains one '%s' placeholder to be filled in by the user of the constant.
COMPUTER_PARTITION_INSTALL_ERROR_FILENAME = '.slapgrid-%s-error.log'
COMPUTER_PARTITION_WAIT_LIST_FILENAME = '.slapos-report-wait-service-list'

# Executable placed first in the watchdog command line built by
# Slapgrid._getWatchdogLine().
# XXX hardcoded watchdog_path
WATCHDOG_PATH = '/opt/slapos/bin/slapos-watchdog'

109 110 111 112 113

class _formatXMLError(Exception):
  """Module-private exception type that adds nothing to Exception.

  NOTE(review): the places raising/catching it are outside this section.
  """
  pass


114 115 116 117 118 119 120 121 122 123 124 125
class FPopen(subprocess.Popen):
  """Popen variant whose child gets an already-closed stdin.

  stdin is always a pipe and stderr is always merged into stdout, whatever
  the caller passes.  stdout (pipe), close_fds (True) and shell (True) are
  only defaults and can be overridden through keyword arguments.
  """

  def __init__(self, *args, **kwargs):
    # Defaults the caller is allowed to override.
    for name, default in (('stdout', subprocess.PIPE),
                          ('close_fds', True),
                          ('shell', True)):
      kwargs.setdefault(name, default)
    # Settings that are forced unconditionally.
    kwargs.update(stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
    subprocess.Popen.__init__(self, *args, **kwargs)
    # Close the write end right away so the child sees EOF on its stdin.
    child_stdin = self.stdin
    child_stdin.flush()
    child_stdin.close()
    self.stdin = None

126 127 128
def check_missing_parameters(options):
  """Validate that all mandatory options are present and non-empty.

  ``key_file`` and ``cert_file`` go together: as soon as one of them is
  given, the other one and ``certificate_repository_path`` become mandatory.

  Raises RuntimeError naming the options that are absent, or, when all are
  present, the ones whose value is empty (empty string, None, ...).
  """
  # XXX: instance_root is better named "partition_root"
  required = {'computer_id', 'instance_root', 'master_url', 'software_root'}

  if 'key_file' in options:
    required.update(('certificate_repository_path', 'cert_file'))
  if 'cert_file' in options:
    required.update(('certificate_repository_path', 'key_file'))

  absent = sorted(name for name in required if name not in options)
  if absent:
    raise RuntimeError('Missing mandatory parameters: %s' % ', '.join(absent))

  # parameter can NOT be empty string or None
  blank = sorted(name for name in required if not options.get(name))
  if blank:
    raise RuntimeError(
        'Mandatory parameters present but empty: %s' % ', '.join(blank))

155

156 157
def check_missing_files(options):
  """Check that every configured file and directory exists on disk.

  Options that are unset or empty are ignored.  Files are checked first,
  in a fixed order, then the certificate repository directory.

  Raises RuntimeError for the first path that does not exist.
  """
  file_option_names = (
      'key_file',
      'cert_file',
      'master_ca_file',
      'shacache-ca-file',
      'shacache-cert-file',
      'shacache-key-file',
      'shadir-ca-file',
      'shadir-cert-file',
      'shadir-key-file',
  )
  file_paths = [options.get(name) for name in file_option_names]
  # The signature key is accepted under both its dashed and underscored name.
  file_paths.append(options.get('signature-private-key-file',
                                options.get('signature_private_key_file')))

  for path in file_paths:
    if not path:
      continue
    if not os.path.exists(path):
      raise RuntimeError('File %r does not exist.' % path)

  repository = options.get('certificate_repository_path')
  if repository and not os.path.isdir(repository):
    raise RuntimeError('Directory %r does not exist' % repository)


184 185
def _split_option_lines(value):
  """Split a newline-separated option value into stripped, non-empty items."""
  stripped_lines = (line.strip() for line in value.split('\n'))
  # Filter on the stripped text: a whitespace-only line must not end up as
  # an empty-string entry in the resulting list.
  return [line for line in stripped_lines if line]


def merged_options(args, configp):
  """Merge configuration file sections and command line arguments.

  args is an argparse-like namespace; configp is a ConfigParser-like object
  that must contain a [slapos] section.

  Precedence, lowest to highest: [slapos], [networkcache], then every
  command line argument whose value is not None.

  On top of the raw values, the returned dict holds:
  - 'develop' forced to True when 'all' is set,
  - the binary cache blacklist / force-url options as lists of URLs,
  - 'firewall': content of the [firewall] section with defaults filled in
    (empty dict when the section is absent),
  - 'manager': {name: section content} for every section whose name starts
    with 'manager' (name is what follows the 'manager' prefix and its
    one-character separator).
  """
  options = dict(configp.items('slapos'))

  if configp.has_section('networkcache'):
    options.update(dict(configp.items('networkcache')))
  for key, value in vars(args).items():
    if value is not None:
      options[key] = value

  if options.get('all'):
    options['develop'] = True

  # Parse cache / binary cache options
  # Backward compatibility about "binary-cache-url-blacklist" deprecated option
  if (options.get("binary-cache-url-blacklist") and not
        options.get("download-from-binary-cache-url-blacklist")):
    options["download-from-binary-cache-url-blacklist"] = \
        options["binary-cache-url-blacklist"]
  for list_option in ("download-from-binary-cache-url-blacklist",
                      "upload-to-binary-cache-url-blacklist",
                      "download-from-binary-cache-force-url-list"):
    options[list_option] = _split_option_lines(options.get(list_option, ""))

  options['firewall'] = {}
  if configp.has_section('firewall'):
    firewall = dict(configp.items('firewall'))
    firewall["authorized_sources"] = _split_option_lines(
        firewall.get("authorized_sources", ""))
    firewall.setdefault("firewall_cmd", "firewall-cmd")
    firewall.setdefault("firewall_executable", "")
    firewall.setdefault("dbus_executable", "")
    firewall.setdefault("reload_config_cmd", "slapos node restart firewall")
    options['firewall'] = firewall

  key = 'manager'
  options[key] = {}
  for section in configp.sections():
    if section.startswith(key):
      # Skip the prefix and the separator character following it.
      manager = section[len(key)+1:]
      options[key][manager] = dict(configp.items(section))
  return options


237
def random_delay(options, logger):
  """
  Sleep for a random time to avoid SlapOS Master being DDOSed by an army of
  SlapOS Nodes configured with cron.

  Nothing happens when options['now'] is true, or when 'maximal_delay' is
  missing or zero; otherwise the pause lasts between 1 and maximal_delay
  seconds and is announced on the logger first.
  """
  # XXX-Cedric: deprecate '--now'
  if options['now']:
    return

  upper_bound = int(options.get('maximal_delay', '0'))
  if not upper_bound:
    return

  duration = random.randint(1, upper_bound)
  logger.info('Sleeping for %s seconds. To disable this feature, '
              'check --now parameter in slapgrid help.', duration)
  time.sleep(duration)


254
def create_slapgrid_object(options, logger):
  """Instantiate Slapgrid from the merged options dictionary.

  'software_root', 'instance_root', 'master_url' and 'computer_id' must be
  present (KeyError otherwise); every other option is optional.  The whole
  options dictionary is additionally handed over as ``config``.
  """
  signature_certificate_list = None
  if 'signature-certificate-list' in options:
    # The option holds concatenated certificates: cut on the BEGIN marker
    # and put the marker back in front of every non-empty piece.
    cert_marker = '-----BEGIN CERTIFICATE-----'
    signature_certificate_list = []
    for piece in options['signature-certificate-list'].split(cert_marker):
      piece = piece.strip()
      if piece:
        signature_certificate_list.append(cert_marker + '\n' + piece)

  software_min_free_space = human2bytes(
      options.get('software_min_free_space', '1000M'))
  instance_min_free_space = human2bytes(
      options.get('instance_min_free_space', '1000M'))

  # Nicely check and convert partition_timeout, in order to support slapos.cfg
  # provided, which is string and command line, which is int, and in the same
  # time resort to proper None as default - no timeout
  raw_timeout = options.get('partition_timeout',
                            options.get('partition-timeout'))
  partition_timeout = None if raw_timeout is None else int(raw_timeout)

  kwargs = {
      'logger': logger,
      'config': options,
      'signature_certificate_list': signature_certificate_list,
      'software_min_free_space': software_min_free_space,
      'instance_min_free_space': instance_min_free_space,
      'partition_timeout': partition_timeout,
  }

  # Mandatory options.
  for name in ('software_root', 'instance_root', 'master_url', 'computer_id'):
    kwargs[name] = options[name]

  # Optional options named exactly like the keyword argument.
  for name in ('buildout',
               'buildout_debug',
               'key_file',
               'cert_file',
               'master_ca_file',
               'certificate_repository_path',
               'instance_storage_home',
               'ipv4_global_network'):
    kwargs[name] = options.get(name)

  # Optional options spelled with dashes in the configuration.
  for name in ('download-binary-cache-url',
               'upload-binary-cache-url',
               'upload-cache-url',
               'download-binary-dir-url',
               'upload-binary-dir-url',
               'upload-dir-url',
               'shacache-ca-file',
               'shacache-cert-file',
               'shacache-key-file',
               'shadir-ca-file',
               'shadir-cert-file',
               'shadir-key-file'):
    kwargs[name.replace('-', '_')] = options.get(name)

  # List valued options, defaulting to an empty list.
  for name in ('download-from-binary-cache-url-blacklist',
               'upload-to-binary-cache-url-blacklist',
               'download-from-binary-cache-force-url-list'):
    kwargs[name.replace('-', '_')] = options.get(name, [])

  kwargs.update(
      shared_part_list=options.get('shared_part_list', ''),
      maximum_periodicity=options.get('maximum_periodicity', 86400),
      promise_timeout=options.get('promise_timeout', PROMISE_TIMEOUT),
      develop=options.get('develop', False),
      force_stop=options.get('force_stop', False),
      firewall_conf=options.get('firewall'),
      signature_private_key_file=options.get(
          'signature-private-key-file',
          options.get('signature_private_key_file')),
      # Try to fetch from deprecated argument
      software_release_filter_list=options.get(
          'only-sr', options.get('only_sr')),
      # Try to fetch from deprecated argument
      computer_partition_filter_list=options.get(
          'only-cp', options.get('only_cp')),
      forbid_supervisord_automatic_launch=string_to_boolean(
          options.get('forbid_supervisord_automatic_launch', 'false')),
  )
  return Slapgrid(**kwargs)
Łukasz Nowak's avatar
Łukasz Nowak committed
323 324


325
def check_required_only_partitions(existing, required):
  """
  Verify the existence of partitions specified by the --only parameter

  Returns ``required`` as a set; raises ValueError listing, in sorted
  order, the required partitions that are not part of ``existing``.
  """
  required = set(required)
  unknown = sorted(required.difference(existing))
  if unknown:
    suffix = '' if len(unknown) == 1 else 's'
    raise ValueError('Unknown partition%s: %s' % (suffix, ', '.join(unknown)))
  return required
335 336


Łukasz Nowak's avatar
Łukasz Nowak committed
337 338 339 340
class Slapgrid(object):
  """ Main class for SlapGrid. Fetches and processes informations from master
  server and pushes usage information to master server.
  """
Antoine Catton's avatar
Antoine Catton committed
341

Łukasz Nowak's avatar
Łukasz Nowak committed
342 343 344 345 346
  def __init__(self,
               software_root,
               instance_root,
               master_url,
               computer_id,
               buildout,
               logger,
               maximum_periodicity=86400,
               key_file=None,
               cert_file=None,
               signature_private_key_file=None,
               signature_certificate_list=None,
               download_binary_cache_url=None,
               upload_binary_cache_url=None,
               download_from_binary_cache_url_blacklist=None,
               upload_to_binary_cache_url_blacklist=None,
               download_from_binary_cache_force_url_list=None,
               upload_cache_url=None,
               download_binary_dir_url=None,
               upload_binary_dir_url=None,
               upload_dir_url=None,
               master_ca_file=None,
               certificate_repository_path=None,
               promise_timeout=PROMISE_TIMEOUT,
               shacache_ca_file=None,
               shacache_cert_file=None,
               shacache_key_file=None,
               shadir_ca_file=None,
               shadir_cert_file=None,
               shadir_key_file=None,
               forbid_supervisord_automatic_launch=False,
               develop=False,
               software_release_filter_list=None,
               computer_partition_filter_list=None,
               software_min_free_space=None,
               instance_min_free_space=None,
               instance_storage_home=None,
               ipv4_global_network=None,
               firewall_conf=None,
               config=None,
               buildout_debug=False,
               shared_part_list='',
               force_stop=False,
               partition_timeout=None,
               slapgrid_jio_uri=None,
               ):
    """Makes easy initialisation of class parameters

    Besides storing its arguments as attributes, the constructor:
    - turns software_root, instance_root and a non-empty
      instance_storage_home into absolute paths,
    - initialises the connection to the master at master_url and registers
      computer_id on it,
    - sets api_backward_compatibility to True unless the slap object
      exposes a jio_api_connector,
    - splits software_release_filter_list and
      computer_partition_filter_list (comma separated strings, or None)
      into lists,
    - builds the manager list from config.

    firewall_conf defaults to a new empty dict for each instance; passing
    None is equivalent to omitting the argument.
    """
    # Parses arguments
    self.software_root = os.path.abspath(software_root)
    self.instance_root = os.path.abspath(instance_root)
    self.master_url = master_url
    self.computer_id = computer_id
    self.supervisord_socket = _getSupervisordSocketPath(instance_root, logger)
    self.key_file = key_file
    self.cert_file = cert_file
    self.master_ca_file = master_ca_file
    self.certificate_repository_path = certificate_repository_path
    self.signature_private_key_file = signature_private_key_file
    self.signature_certificate_list = signature_certificate_list
    self.download_binary_cache_url = download_binary_cache_url
    self.upload_binary_cache_url = upload_binary_cache_url
    self.download_from_binary_cache_url_blacklist = \
        download_from_binary_cache_url_blacklist
    self.upload_to_binary_cache_url_blacklist = \
        upload_to_binary_cache_url_blacklist
    self.download_from_binary_cache_force_url_list = \
        download_from_binary_cache_force_url_list
    self.upload_cache_url = upload_cache_url
    self.download_binary_dir_url = download_binary_dir_url
    self.upload_binary_dir_url = upload_binary_dir_url
    self.upload_dir_url = upload_dir_url
    self.shacache_ca_file = shacache_ca_file
    self.shacache_cert_file = shacache_cert_file
    self.shacache_key_file = shacache_key_file
    self.shadir_ca_file = shadir_ca_file
    self.shadir_cert_file = shadir_cert_file
    self.shadir_key_file = shadir_key_file
    self.forbid_supervisord_automatic_launch = forbid_supervisord_automatic_launch
    self.logger = logger
    self.slapgrid_jio_uri = slapgrid_jio_uri
    # Creates objects from slap module
    self.slap = slapos.slap.slap()
    self.slap.initializeConnection(self.master_url, key_file=self.key_file,
        cert_file=self.cert_file, master_ca_file=self.master_ca_file,
        slapgrid_jio_uri=self.slapgrid_jio_uri)
    # The legacy code paths are used whenever no jio connector is available.
    self.api_backward_compatibility = not self.slap.jio_api_connector
    self.computer = self.slap.registerComputer(self.computer_id)
    # Defines all needed paths
    self.buildout = buildout
    self.buildout_debug = buildout_debug
    self.promise_timeout = promise_timeout
    self.partition_timeout = partition_timeout
    self.develop = develop
    if software_release_filter_list is not None:
      self.software_release_filter_list = \
          software_release_filter_list.split(",")
    else:
      self.software_release_filter_list = []
    self.computer_partition_filter_list = []
    if computer_partition_filter_list is not None:
      self.computer_partition_filter_list = \
          computer_partition_filter_list.split(",")
    # Filled in lazily by getComputerPartitionList().
    self.computer_partition_list = None
    self.maximum_periodicity = maximum_periodicity
    self.software_min_free_space = software_min_free_space
    self.instance_min_free_space = instance_min_free_space
    if instance_storage_home:
      self.instance_storage_home = os.path.abspath(instance_storage_home)
    else:
      self.instance_storage_home = ""
    self.ipv4_global_network = ipv4_global_network or ""
    # A dict literal as default value would be one object shared by all
    # instances; create a fresh one per instance instead.
    self.firewall_conf = {} if firewall_conf is None else firewall_conf
    self.config = config
    self._manager_list = slapmanager.from_config(config)
    self.shared_part_list = shared_part_list
    self.force_stop = force_stop
Cédric Le Ninivin's avatar
Cédric Le Ninivin committed
464

465
  def _getWatchdogLine(self):
    """Return the watchdog command line as a single string.

    The certificate repository option is only included when
    certificate_repository_path is set.
    """
    pieces = [WATCHDOG_PATH, "--master-url '%s' " % self.master_url]
    if self.certificate_repository_path:
      pieces.append("--certificate-repository-path '%s'" %
                    self.certificate_repository_path)
    pieces += [
        "--computer-id '%s'" % self.computer_id,
        "--instance-root '%s'" % self.instance_root,
    ]
    return ' '.join(pieces)
Łukasz Nowak's avatar
Łukasz Nowak committed
474

475 476 477 478 479 480 481 482
  def _generateFirewallSupervisorConf(self):
    """If firewall section is defined in slapos configuration, generate
      supervisor configuration entry for firewall process.
    """
    supervisord_conf_folder_path = os.path.join(self.instance_root,
                                               'etc', 'supervisord.conf.d')
    supervisord_firewall_conf = os.path.join(supervisord_conf_folder_path,
                                              'firewall.conf')
483 484
    if not self.firewall_conf or not self.firewall_conf.get('firewall_executable') \
      or self.firewall_conf.get('testing', False):
485 486 487 488 489 490 491 492
      if os.path.exists(supervisord_firewall_conf):
        os.unlink(supervisord_firewall_conf)
      return
    supervisord_firewall_program_conf = """\
[program:firewall]
directory=/opt/slapos
command=%(firewall_executable)s
process_name=firewall
493
priority=5
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
autostart=true
autorestart=true
startsecs=0
startretries=0
exitcodes=0
stopsignal=TERM
stopwaitsecs=60
user=0
group=0
serverurl=AUTO
redirect_stderr=true
stdout_logfile=%(log_file)s
stdout_logfile_maxbytes=100KB
stdout_logfile_backups=1
stderr_logfile=%(log_file)s
stderr_logfile_maxbytes=100KB
stderr_logfile_backups=1
""" %  {'firewall_executable': self.firewall_conf['firewall_executable'],
        'log_file': self.firewall_conf.get('log_file', '/var/log/firewall.log')}

    if not os.path.exists(supervisord_conf_folder_path):
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560
      os.makedirs(supervisord_conf_folder_path)
    updateFile(supervisord_firewall_conf, supervisord_firewall_program_conf)


  def _generateDbusSupervisorConf(self):
    """If dbus command is defined in slapos configuration, generate
      supervisor configuration entry for dbus daemon.
    """
    supervisord_conf_folder_path = os.path.join(self.instance_root,
                                               'etc', 'supervisord.conf.d')
    supervisord_dbus_conf = os.path.join(supervisord_conf_folder_path,
                                              'dbus.conf')
    if not self.firewall_conf or not self.firewall_conf.get('dbus_executable') \
      or self.firewall_conf.get('testing', False):
      if os.path.exists(supervisord_dbus_conf):
        os.unlink(supervisord_dbus_conf)
      return
    supervisord_dbus_program_conf = """\
[program:dbus]
directory=/opt/slapos
command=%(dbus_executable)s
process_name=dbus
priority=1
autostart=true
autorestart=true
startsecs=0
startretries=0
exitcodes=0
stopsignal=TERM
stopwaitsecs=60
user=0
group=0
serverurl=AUTO
redirect_stderr=true
stdout_logfile=%(dbus_log_file)s
stdout_logfile_maxbytes=100KB
stdout_logfile_backups=1
stderr_logfile=%(dbus_log_file)s
stderr_logfile_maxbytes=100KB
stderr_logfile_backups=1
""" %  {'dbus_executable': self.firewall_conf['dbus_executable'],
        'dbus_log_file': self.firewall_conf.get('dbus_log_file', '/var/log/dbus.log')}

    if not os.path.exists(supervisord_conf_folder_path):
      os.makedirs(supervisord_conf_folder_path)
    updateFile(supervisord_dbus_conf, supervisord_dbus_program_conf)
561

Łukasz Nowak's avatar
Łukasz Nowak committed
562 563 564 565 566 567
  def checkEnvironmentAndCreateStructure(self):
    """Checks for software_root and instance_root existence, then creates
       needed files and directories.
    """
    # Checks for software_root and instance_root existence
    if not os.path.isdir(self.software_root):
568
      raise OSError('%s does not exist.' % self.software_root)
569

570 571 572 573
    createSupervisordConfiguration(
        self.instance_root,
        logger=self.logger,
        watchdog_command=self._getWatchdogLine())
574
    self._generateFirewallSupervisorConf()
575
    self._generateDbusSupervisorConf()
576 577

  def _launchSupervisord(self):
578 579
    if not self.forbid_supervisord_automatic_launch:
      launchSupervisord(instance_root=self.instance_root, logger=self.logger)
Łukasz Nowak's avatar
Łukasz Nowak committed
580 581

  def getComputerPartitionList(self):
582 583
    if not self.api_backward_compatibility:
      if self.computer_partition_list is None:
584 585 586 587
        self.computer_partition_list = self.slap.jio_api_connector.allDocs({
          "portal_type": "Software Instance",
          "compute_node_id": self.computer_id,
        }).get("result_list", [])
588 589 590 591 592 593 594 595
    else:
      try:
        slap_partition_list = self.computer.getComputerPartitionList()
      except socket.error as exc:
        self.logger.fatal(exc)
        raise
      self.computer_partition_list = []
      for partition in slap_partition_list:
596
        try:
597 598 599 600 601 602 603 604 605 606 607 608 609 610
          software_release_uri = partition.getSoftwareRelease().getURI()
        except (NotFoundError, TypeError, NameError):
          software_release_uri = None

        parameter_dict = partition.getInstanceParameterDict()
        self.computer_partition_list.append({
          "reference": partition._instance_guid,
          "portal_type": "Software Instance",
          "compute_partition_id": partition.getId(),
          "state": partition.getState(),
          "software_type": parameter_dict.get('slap_software_type', None),
          "parameters": parameter_dict,
          "processing_timestamp": parameter_dict.get("timestamp"),
          "slap_partition": partition,
611
          "ip_list": parameter_dict.get("ip_list", []),
612 613 614 615 616
          "full_ip_list": parameter_dict.get("full_ip_list", []),
          "access_status_message": partition.getAccessStatus(),
          "software_release_uri": software_release_uri,
          "sla_parameters": getattr(partition, '_filter_dict', {}),
        })
617 618 619 620 621 622 623 624 625 626 627 628
    return self.computer_partition_list

  def sendPartitionError(self, partition, error_message, logger=None):
    if not self.api_backward_compatibility:
      self.slap.jio_api_connector.put({
        "portal_type": "Software Instance",
        "reported_state": "error",
        "status_message": str(error_message),
        "reference": partition.get("reference")
      })
    else:
      partition["slap_partition"].error(error_message, logger=logger)
Łukasz Nowak's avatar
Łukasz Nowak committed
629

630 631 632 633
  def getRequiredComputerPartitionList(self):
    """Return the computer partitions that should be processed.

    The partitions kept by FilterComputerPartitionList() are returned,
    further restricted to the ids listed in
    self.computer_partition_filter_list when that list is not empty.

    Raises ValueError (from check_required_only_partitions) when the filter
    names a partition that does not exist on this computer.
    """
    def partition_id(cp):
      # Partition dictionaries carry their id under "compute_partition_id"
      # (see getComputerPartitionList); "computer_partition_id" is still
      # accepted as a fallback spelling.
      return cp.get("compute_partition_id",
                    cp.get("computer_partition_id", ""))

    cp_list = self.getComputerPartitionList()
    cp_id_list = [partition_id(cp) for cp in cp_list]
    required_cp_id_set = check_required_only_partitions(
      cp_id_list, self.computer_partition_filter_list)
    busy_cp_list = self.FilterComputerPartitionList(cp_list)
    if required_cp_id_set:
      return [cp for cp in busy_cp_list
              if partition_id(cp) in required_cp_id_set]
    return busy_cp_list

Łukasz Nowak's avatar
Łukasz Nowak committed
642 643 644 645
  def processSoftwareReleaseList(self):
    """Will process each Software Release.

    Every Software Installation known for this computer is installed when its
    state is 'available' and destroyed when its state is 'destroyed'. Progress
    and errors are reported to the master, either through the jIO API
    connector or, when `self.api_backward_compatibility` is set, through the
    legacy software release object.

    Returns SLAPGRID_SUCCESS when no release failed, SLAPGRID_FAIL otherwise.
    SystemExit and KeyboardInterrupt are reported to the master and re-raised.
    """
    self.checkEnvironmentAndCreateStructure()
    self.logger.info('Processing software releases...')
    # Boolean to know if every instance has correctly been deployed
    clean_run = True
    if not self.api_backward_compatibility:
      software_installation_list = self.slap.jio_api_connector.allDocs({
        "portal_type": "Software Installation",
        "compute_node_id": self.computer_id
      }).get("result_list", [])
    else:
      # Present legacy objects with the same dictionary layout as the jIO
      # results, keeping the object itself for state reporting.
      software_installation_list = [
        {
          "software_release_uri": legacy_release.getURI(),
          "state": legacy_release.getState(),
          "compatibility_software_release": legacy_release,
        }
        for legacy_release in self.computer.getSoftwareReleaseList()
      ]

    for software_release in software_installation_list:
      state = software_release["state"]
      try:
        software_release_uri = software_release["software_release_uri"]
        url_hash = md5digest(software_release_uri)
        software_path = os.path.join(self.software_root, url_hash)
        software = Software(url=software_release_uri,
            software_root=self.software_root,
            buildout=self.buildout,
            buildout_debug=self.buildout_debug,
            logger=self.logger,
            signature_private_key_file=self.signature_private_key_file,
            signature_certificate_list=self.signature_certificate_list,
            download_binary_cache_url=self.download_binary_cache_url,
            upload_binary_cache_url=self.upload_binary_cache_url,
            download_from_binary_cache_url_blacklist=
                self.download_from_binary_cache_url_blacklist,
            upload_to_binary_cache_url_blacklist=
                self.upload_to_binary_cache_url_blacklist,
            download_from_binary_cache_force_url_list=
                self.download_from_binary_cache_force_url_list,
            upload_cache_url=self.upload_cache_url,
            download_binary_dir_url=self.download_binary_dir_url,
            upload_binary_dir_url=self.upload_binary_dir_url,
            upload_dir_url=self.upload_dir_url,
            shacache_ca_file=self.shacache_ca_file,
            shacache_cert_file=self.shacache_cert_file,
            shacache_key_file=self.shacache_key_file,
            shadir_ca_file=self.shadir_ca_file,
            shadir_cert_file=self.shadir_cert_file,
            shadir_key_file=self.shadir_key_file,
            software_min_free_space=self.software_min_free_space,
            shared_part_list=self.shared_part_list)

        # call manager for every software release
        for manager in self._manager_list:
          manager.software(software)

        if state == 'available':
          completed_tag = os.path.join(software_path, '.completed')
          # Install when developing, when the release was never completed and
          # no filter is set, or when the filter names this release (either
          # by hash or by URI).
          if (self.develop or (not os.path.exists(completed_tag) and
                 not self.software_release_filter_list) or
                 url_hash in self.software_release_filter_list or
                 url_hash in (md5digest(uri) for uri in self.software_release_filter_list)):
            try:
              self._reportSoftwareInstallationState(software_release, 'building')
            except NotFoundError:
              pass
            software.install()
            with open(completed_tag, 'w') as fout:
              fout.write(time.asctime())
        elif state == 'destroyed':
          if os.path.exists(software_path):
            self.logger.info('Destroying %r...' % software_release_uri)
            software.destroy()
            self.logger.info('Destroyed %r.' % software_release_uri)

        # call manager for every software release
        for manager in self._manager_list:
          manager.softwareTearDown(software)
      # Send log before exiting
      except (SystemExit, KeyboardInterrupt):
        self._reportSoftwareInstallationError(
          software_release, traceback.format_exc())
        raise

      # Buildout failed: send log but don't print it to output (already done)
      except BuildoutFailedError as exc:
        clean_run = False
        try:
          self._reportSoftwareInstallationError(software_release, exc)
        except (SystemExit, KeyboardInterrupt):
          raise
        except Exception:
          self.logger.exception('Problem while reporting error, continuing:')

      # For everything else: log it, send it, continue.
      except Exception:
        clean_run = False
        self.logger.exception('')
        self._reportSoftwareInstallationError(
          software_release, traceback.format_exc())
      else:
        if state == 'available':
          try:
            self._reportSoftwareInstallationState(software_release, 'available')
          except (NotFoundError, ServerError):
            pass
        elif state == 'destroyed':
          try:
            self._reportSoftwareInstallationState(software_release, 'destroyed')
          except (NotFoundError, ServerError):
            self.logger.exception('')
    self.logger.info('Finished software releases.')

    # Return success value
    if not clean_run:
      return SLAPGRID_FAIL
    return SLAPGRID_SUCCESS

  def _reportSoftwareInstallationState(self, software_release, reported_state):
    """Report `reported_state` ('building', 'available' or 'destroyed') for
    the software installation described by the `software_release` dict."""
    if not self.api_backward_compatibility:
      self.slap.jio_api_connector.put({
        "portal_type": "Software Installation",
        "compute_node_id": self.computer_id,
        # Read from the dict rather than from a caller local, so that the
        # report never depends on a variable bound inside a failed try block.
        "software_release_uri": software_release.get("software_release_uri"),
        "reported_state": reported_state,
      })
    else:
      # The legacy object exposes one method per state, named after it:
      # building(), available() and destroyed().
      getattr(software_release["compatibility_software_release"],
              reported_state)()

  def _reportSoftwareInstallationError(self, software_release, error):
    """Report `error` (an exception or a formatted traceback) for the
    software installation described by the `software_release` dict."""
    if not self.api_backward_compatibility:
      self.slap.jio_api_connector.put({
        "portal_type": "Software Installation",
        "compute_node_id": self.computer_id,
        "software_release_uri": software_release.get("software_release_uri"),
        "error_status": str(error),
      })
    else:
      software_release["compatibility_software_release"].error(
        error, logger=self.logger
      )
Łukasz Nowak's avatar
Łukasz Nowak committed
815

816
  def _checkPromiseList(self, partition, force=True, check_anomaly=False):
    """Run the promises of `partition`.

    When the partition provides its own python (`partition.instance_python`)
    and at least one plugin promise exists in its 'etc/plugin' folder, the
    promises are run by the 'promise/runpromises.py' script in a subprocess
    using that python, with privileges dropped to the owner of the instance
    folder. In that case nothing is returned.

    Otherwise the promises are run in-process and the result of
    `PromiseLauncher.run()` is returned.

    `force` and `check_anomaly` are forwarded in the promise configuration
    ('force' and 'check-anomaly' entries).

    Raises PromiseError when the subprocess exits with status 2 and Exception
    for any other non-zero exit status.
    """
    partition_id = partition.partition_id
    self.logger.info("Checking %s promises...", partition_id)

    instance_path = os.path.join(self.instance_root, partition_id)
    promise_log_path = os.path.join(instance_path, PROMISE_LOG_FOLDER_NAME)
    promise_dir = os.path.join(instance_path, 'etc', 'plugin')
    legacy_promise_dir = os.path.join(instance_path, 'etc', 'promise')

    # Promises run with the identity of the owner of the instance folder.
    stat_info = os.stat(instance_path)
    uid = stat_info.st_uid
    gid = stat_info.st_gid

    promise_config = {
      'promise-folder': promise_dir,
      'legacy-promise-folder': legacy_promise_dir,
      'promise-timeout': self.promise_timeout,
      'uid': uid,
      'gid': gid,
      'partition-folder': instance_path,
      'log-folder': promise_log_path,
      'force': force,
      'check-anomaly': check_anomaly,
      'master-url': partition.server_url,
      'partition-cert': partition.cert_file,
      'partition-key': partition.key_file,
      'partition-id': partition_id,
      'computer-id': self.computer_id,
    }

    # Number of plugin promise files, '__init__*' files excluded.
    plugins = sum(
      1 for p in listifdir(promise_dir)
      if p.endswith('.py') and not p.startswith('__init__')
    )
    instance_python = partition.instance_python
    if instance_python is not None and plugins:
      self.logger.info(
        "Switching to %s's python at %s", partition_id, instance_python)
      runpromise_script = os.path.join(
        os.path.dirname(__file__), 'promise', 'runpromises.py')
      # Translate the configuration into command line options: booleans
      # become bare flags (only when true), everything else '--option value'.
      # uid and gid are not passed: they are applied by preexec_fn below.
      command = [instance_python, runpromise_script]
      for option, value in promise_config.items():
        if option in ('uid', 'gid'):
          continue
        if isinstance(value, bool):
          if value:
            command.append('--' + option)
        else:
          command.append('--' + option)
          command.append(str(value))
      # Compute timeout as the sum of all promise timeouts + empirical margin.
      promises = plugins + len(listifdir(legacy_promise_dir))
      timeout = promises * self.promise_timeout + 10
      # The runpromise script uses stderr exclusively to propagate exception
      # messages. It otherwise redirects stderr to stdout so that all outputs
      # from the promises go to stdout.
      def preexec_fn():
        # fd 2 points at fd 1 only while privileges are dropped, then the
        # original stderr is restored.
        # NOTE(review): presumably so that dropPrivileges' own output ends
        # up on stdout -- confirm.
        err = os.dup(2)
        os.dup2(1, 2)
        dropPrivileges(uid, gid, logger=self.logger)
        os.dup2(err, 2)
      process = SlapPopen(
        command,
        preexec_fn=preexec_fn,
        cwd=instance_path,
        env=getCleanEnvironment(self.logger,
          home_path=pwd.getpwuid(uid).pw_dir),
        universal_newlines=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        logger=self.logger,
        timeout=timeout,
      )
      if process.returncode == 2:
        raise PromiseError(unicode2str(process.error))
      elif process.returncode:
        raise Exception(unicode2str(process.error))
      elif process.error:
        # Logger.warn is a deprecated alias; use Logger.warning.
        self.logger.warning(
          'Unexpected promise runner output:\n%s', process.error)

    else:
      return PromiseLauncher(config=promise_config, logger=self.logger).run()
Antoine Catton's avatar
Antoine Catton committed
898

899
  def _endInstallationTransaction(self, computer_partition):
    """Send to the master the instances requested by this partition.

    The partition's transaction file (one reference per line) is read from
    its instance folder, if it exists, and its non-empty lines are sent as
    the list of requested instances. A NotFoundError from the master is
    only logged as a warning.
    """
    partition_id = computer_partition.get("compute_partition_id")
    transaction_file_path = os.path.join(
      self.instance_root,
      partition_id,
      COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME % partition_id)

    if not os.path.exists(transaction_file_path):
      # Nothing was requested during this run.
      return

    with open(transaction_file_path, 'r') as transaction_file:
      requested_reference_list = [
        line for line in transaction_file.read().split('\n') if line]

    try:
      if self.api_backward_compatibility:
        legacy_partition = computer_partition["slap_partition"]
        legacy_partition.setComputerPartitionRelatedInstanceList(
          requested_reference_list)
      else:
        self.slap.jio_api_connector.put({
          "portal_type": "Software Instance",
          "reference": computer_partition.get("reference"),
          "requested_instance_list": requested_reference_list,
        })
    except NotFoundError as e:
      # Master doesn't implement this feature ?
      self.logger.warning(("NotFoundError: %s. \nCannot send requested instance "
                           "list to master. Please check if this feature is "
                           "implemented on SlapOS Master.") % str(e))
924

925 926 927 928
  def _addFirewallRule(self, rule_command):
    """
    Add the rule described by `rule_command` unless the firewall has it.

    The command is first run with '--add-rule' replaced by '--query-rule';
    the rule is added only when that query answers 'no'.

    Return True when the rule was added, False otherwise.
    Raise Exception when the query fails with an answer other than 'no', or
    when adding the rule fails or does not answer 'success'.
    """
    query_cmd = rule_command.replace('--add-rule', '--query-rule')
    query_process = FPopen(query_cmd, universal_newlines=True)
    query_output, query_error = query_process.communicate()

    if query_output.strip() != 'no':
      # Rule already present, unless the query itself went wrong.
      if query_process.returncode != 0:
        raise Exception("Failed to run firewalld rule %s\n%s.\n%s" % (
                        query_cmd, query_output, query_error))
      return False

    # rule doesn't exist add to firewall
    self.logger.debug(rule_command)
    add_process = FPopen(rule_command, universal_newlines=True)
    add_output, add_error = add_process.communicate()
    if add_process.returncode != 0:
      raise Exception("Failed to add firewalld rule %s\n%s.\n%s" % (
                      rule_command, add_output, add_error))
    if add_output.strip() != 'success':
      raise Exception(add_output)
    return True

  def _removeFirewallRule(self, rule_command):
    """
    Remove the rule described by `rule_command` if the firewall has it.

    `rule_command` is the '--add-rule' form of the rule. It is first run with
    '--query-rule' instead; the rule is removed (with '--remove-rule') only
    when that query answers 'yes'.

    Return True when the rule was removed, False otherwise.
    Raise Exception when the query fails with an answer other than 'yes' or
    'no', or when removing the rule fails or does not answer 'success'.
    """
    query_cmd = rule_command.replace('--add-rule', '--query-rule')
    process = FPopen(query_cmd, universal_newlines=True)
    result, stderr = process.communicate()
    if result.strip() == 'yes':
      # The rule really exist, remove it
      remove_command = rule_command.replace('--add-rule', '--remove-rule')
      self.logger.debug(remove_command)
      process = FPopen(remove_command, universal_newlines=True)
      rule_result, stderr = process.communicate()
      if process.returncode == 0:
        if rule_result.strip() != 'success':
          raise Exception(rule_result)
      else:
        # Report the command that actually failed: the removal, not the
        # original '--add-rule' command.
        raise Exception("Failed to remove firewalld rule %s\n%s.\n%s" % (
                        remove_command, rule_result, stderr))
    elif result.strip() != 'no' and process.returncode != 0:
      raise Exception("Failed to run firewalld rule %s\n%s.\n%s" % (
                      query_cmd, result, stderr))

    return result.strip() == 'yes'

972
  def _checkAddFirewallRules(self, partition_id, command_list, add=True):
    """
    Synchronise the firewall rules of one partition with `command_list`.

    Rules saved in the partition's firewall rules file are removed, except
    those still present in `command_list` when `add` is true. When `add` is
    true every rule of `command_list` is then added. If at least one rule was
    really added or removed, the firewall configuration is reloaded.

    The rules file is rewritten (with `command_list` when `add` is true, with
    an empty list otherwise) when a reload happened or when the file does not
    exist yet.

    Raise Exception when the reload command exits with a non-zero status.
    """
    instance_path = os.path.join(self.instance_root, partition_id)
    firewall_rules_path = os.path.join(instance_path,
                                Partition.partition_firewall_rules_name)
    reload_rules = False
    fw_base_cmd = self.firewall_conf['firewall_cmd']
    json_list = []

    if os.path.exists(firewall_rules_path):
      with open(firewall_rules_path, 'r') as frules:
        rules_list = json.loads(frules.read())

      for command in rules_list:
        # A saved rule that is requested again stays in place.
        if add and command in command_list:
          continue
        state = self._removeFirewallRule('%s %s' % (fw_base_cmd, command))
        reload_rules = reload_rules or state

    if add:
      json_list = command_list
      for command in command_list:
        state = self._addFirewallRule('%s %s' % (fw_base_cmd, command))
        reload_rules = reload_rules or state

    if reload_rules:
      # Apply changes: reload configuration
      # XXX - need to check firewalld reload instead of restart
      self.logger.info("Reloading firewall configuration...")
      reload_cmd = self.firewall_conf['reload_config_cmd']
      reload_process = FPopen(reload_cmd, universal_newlines=True)
      stdout, stderr = reload_process.communicate()
      if reload_process.returncode != 0:
        # The previous format string ended with a bare '%' and had its
        # arguments swapped, so formatting itself raised ValueError.
        raise Exception("Failed to load firewalld rules with command %s.\n%s" % (
                        reload_cmd, stderr))

    if reload_rules or not os.path.exists(firewall_rules_path):
      with open(firewall_rules_path, 'w') as frules:
        frules.write(json.dumps(json_list))
1020

1021
  def _getFirewallAcceptRules(self, ip, hosting_ip_list, source_ip_list, ip_type='ipv4'):
    """
    Build the firewall-cmd direct rules restricting access to `ip`.

    Every address of `hosting_ip_list` and `source_ip_list` gets INPUT and
    FORWARD ACCEPT rules (priority 0) towards `ip`; they are followed by the
    REJECT rules for `ip` at priority 1000 and, for the ESTABLISHED,RELATED
    states, at priority 900.

    Return the list of rule arguments, each starting with
    '--permanent --direct --add-rule'.
    Raise NotImplementedError when `ip_type` is not 'ipv4', 'ipv6' or 'eb'.
    """
    if ip_type not in ['ipv4', 'ipv6', 'eb']:
      raise NotImplementedError("firewall-cmd has not rules with tables %s." % ip_type)

    rule_prefix = '--permanent --direct --add-rule %s filter' % ip_type
    rule_list = []

    # Configure INPUT and FORWARD rules for each allowed address
    for allowed_ip in hosting_ip_list + source_ip_list:
      rule_list.extend((
        '%s INPUT 0 -s %s -d %s -j ACCEPT' % (rule_prefix, allowed_ip, ip),
        '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (rule_prefix, allowed_ip, ip),
      ))

    # Reject all other requests
    rule_list.extend((
      '%s INPUT 1000 -d %s -j REJECT' % (rule_prefix, ip),
      '%s FORWARD 1000 -d %s -j REJECT' % (rule_prefix, ip),
      '%s INPUT 900 -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (
        rule_prefix, ip),
      '%s FORWARD 900 -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (
        rule_prefix, ip),
    ))
    return rule_list
1050

1051 1052 1053 1054 1055 1056 1057
  def _getFirewallRejectRules(self, ip, hosting_ip_list, source_ip_list, ip_type='ipv4'):
    """
    Build the firewall-cmd direct rules denying some sources access to `ip`.

    Every address of `source_ip_list` gets INPUT and FORWARD REJECT rules
    towards `ip` (priority 800 for the ESTABLISHED,RELATED states, priority
    900 otherwise); every address of `hosting_ip_list` then gets INPUT and
    FORWARD ACCEPT rules at priority 0. No catch-all rule is generated.

    Return the list of rule arguments, each starting with
    '--permanent --direct --add-rule'.
    Raise NotImplementedError when `ip_type` is not 'ipv4', 'ipv6' or 'eb'.
    """
    if ip_type not in ['ipv4', 'ipv6', 'eb']:
      raise NotImplementedError("firewall-cmd has not rules with tables %s." % ip_type)

    rule_prefix = '--permanent --direct --add-rule %s filter' % ip_type
    rule_list = []

    # Reject all requests coming from the listed sources
    for rejected_ip in source_ip_list:
      rule_list.extend((
        '%s INPUT 800 -s %s -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (
          rule_prefix, rejected_ip, ip),
        '%s FORWARD 800 -s %s -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (
          rule_prefix, rejected_ip, ip),
        '%s INPUT 900 -s %s -d %s -j REJECT' % (rule_prefix, rejected_ip, ip),
        '%s FORWARD 900 -s %s -d %s -j REJECT' % (rule_prefix, rejected_ip, ip),
      ))

    # Accept on this instance tree
    for accepted_ip in hosting_ip_list:
      rule_list.extend((
        '%s INPUT 0 -s %s -d %s -j ACCEPT' % (rule_prefix, accepted_ip, ip),
        '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (rule_prefix, accepted_ip, ip),
      ))

    return rule_list

  def _getValidIpv4FromList(self, ipv4_list, warn=False):
    """
    Return the list containing only valid ipv4 or network address.

    Empty entries are skipped silently. For the other entries only the part
    before the first '/' is validated, and the entry is returned unchanged.
    When `warn` is true, each rejected entry is logged as a warning.
    """
    valid_list = []
    for ip in ipv4_list:
      if not ip:
        continue
      the_ip = ip.split('/')[0]
      if valid_ipv4(the_ip):
        valid_list.append(ip)
      elif warn:
        # Logger.warn is a deprecated alias; use Logger.warning.
        self.logger.warning("IP/Network address %s is not valid. ignored.." % ip)
    return valid_list
1099

1100
  def _setupComputerPartitionFirewall(self, computer_partition, ip_list, drop_entries=False):
    """
    Configure (or, when `drop_entries` is true, remove) the firewall rules
    protecting the addresses of this partition.

    Only the addresses of `ip_list` whose interface name starts with 'route_'
    are protected. Access is granted to the addresses of the other instances
    of the same Instance Tree, and further tuned by the partition's
    'sla_parameters':
      - 'fw_restricted_access' set to 'off' switches to reject rules, built
        from 'fw_rejected_sources', with 'fw_authorized_sources' and the
        configured 'authorized_sources' accepted;
      - otherwise 'fw_authorized_sources' and the configured
        'authorized_sources' are the extra accepted sources.
    """
    protected_ipv4_list = []
    protected_ipv6_list = []
    hosting_ipv4_list = []
    hosting_ipv6_list = []
    getFirewallRules = self._getFirewallAcceptRules

    add_rules = not drop_entries
    if add_rules:
      self.logger.info("Configuring firewall...")
    else:
      self.logger.info("Removing firewall configuration...")

    # Addresses of this partition to protect.
    for net_ip in ip_list:
      iface, ip = net_ip[0], net_ip[1]
      if iface.startswith('route_'):
        if valid_ipv4(ip):
          protected_ipv4_list.append(ip)
        elif valid_ipv6(ip):
          protected_ipv6_list.append(ip)

    if self.api_backward_compatibility:
      hosting_ip_list = computer_partition["slap_partition"].getFullHostingIpAddressList()
    else:
      hosting_ip_list = []
      # Get all the instances of the instance tree
      related_instance_list = self.slap.jio_api_connector.allDocs({
        "portal_type": "Software Instance",
        "root_instance_title": computer_partition["root_instance_title"],
      }).get("result_list", [])
      for instance_result in related_instance_list:
        if instance_result["reference"] == computer_partition["reference"]:
          # This partition itself.
          continue
        instance = self.slap.jio_api_connector.get({
          "portal_type": "Software Instance",
          "reference": instance_result["reference"],
        })
        hosting_ip_list = hosting_ip_list + instance["ip_list"]

    # Addresses of the instance tree, this partition's own ones excluded.
    for _iface, ip in hosting_ip_list:
      if valid_ipv4(ip):
        if ip not in protected_ipv4_list:
          hosting_ipv4_list.append(ip)
      elif valid_ipv6(ip):
        if ip not in protected_ipv6_list:
          hosting_ipv6_list.append(ip)

    extra_list = []
    accept_ip_list = []
    sla_parameters = computer_partition.get('sla_parameters', None)
    if sla_parameters is not None:
      if sla_parameters.get('fw_restricted_access', 'on') == 'off':
        extra_list = sla_parameters.get('fw_rejected_sources', '').split(' ')
        getFirewallRules = self._getFirewallRejectRules
        accept_ip_list.extend(self.firewall_conf.get('authorized_sources', []))
        accept_ip_list.extend(sla_parameters.get('fw_authorized_sources', '').split(' '))
      else:
        extra_list = sla_parameters.get('fw_authorized_sources', '').split(' ')
        extra_list.extend(self.firewall_conf.get('authorized_sources', []))

    source_ipv4_list = self._getValidIpv4FromList(extra_list, True)
    hosting_ipv4_list.extend(self._getValidIpv4FromList(accept_ip_list, True))

    # XXX - ipv6 addresses are collected but ignored for the moment
    for ip in protected_ipv4_list:
      cmd_list = getFirewallRules(ip, hosting_ipv4_list,
                                  source_ipv4_list, ip_type='ipv4')
      self._checkAddFirewallRules(computer_partition.get("compute_partition_id"),
                                  cmd_list, add=add_rules)
Antoine Catton's avatar
Antoine Catton committed
1176

1177
  def _checkPromiseAnomaly(self, local_partition, computer_partition):
    """Check the promises for anomalies and report status changes.

    A promise failure is logged, and sent to the master unless the partition's
    access status message already starts with '#error'. A success is reported
    as 'started' when the access status message is None or starts with
    '#error'. The partition certificate is updated before each report.
    """
    partition_access_status = computer_partition.get("access_status_message", "")
    status_error = bool(
      partition_access_status and partition_access_status.startswith("#error"))

    try:
      self._checkPromiseList(local_partition,
                             check_anomaly=True,
                             force=False)
    except PromiseError as e:
      self.logger.error(e)
      # A None status implies status_error is False, so testing the flag
      # alone covers both cases.
      if not status_error:
        local_partition._updateCertificate()
        self.sendPartitionError(computer_partition, e, logger=self.logger)
      return

    if not (partition_access_status is None or status_error):
      return

    local_partition._updateCertificate()
    if self.api_backward_compatibility:
      computer_partition["slap_partition"].started()
    else:
      self.slap.jio_api_connector.put({
        "portal_type": "Software Instance",
        "reference": computer_partition.get("reference"),
        "reported_state": "started"
      })
1202

1203 1204 1205 1206
  def processPromise(self, computer_partition):
    """
    Run the promises of a Computer Partition when it is in started state.

    Raise ValueError when the partition has no id. The SLAPGRID_INSTANCE_ROOT
    environment variable is set to the instance root as a side effect.
    """
    computer_partition_id = computer_partition.get("compute_partition_id")

    # Sanity check before processing: the id should not be None, an empty
    # string or any other falsy value.
    if not computer_partition_id:
      raise ValueError('Computer Partition id is empty.')

    instance_path = os.path.join(self.instance_root, computer_partition_id)
    os.environ['SLAPGRID_INSTANCE_ROOT'] = self.instance_root

    software_url = computer_partition.get("software_release_uri")
    try:
      software_path = os.path.join(self.software_root, md5digest(software_url))
    except TypeError:
      # Problem with instance: SR URI not set.
      # Try to process it anyway, it may need to be deleted.
      software_path = None

    computer_partition_state = computer_partition.get("state")

    supervisord_configuration_dir = _getSupervisordConfigurationDirectory(
      self.instance_root)
    retention_delay = computer_partition.get(
      'sla_parameters', {}).get('retention_delay', '0')
    partition_option_dict = dict(
      software_path=software_path,
      instance_path=instance_path,
      shared_part_list=self.shared_part_list,
      supervisord_partition_configuration_dir=supervisord_configuration_dir,
      supervisord_socket=self.supervisord_socket,
      computer_partition=computer_partition,
      computer_id=self.computer_id,
      partition_id=computer_partition_id,
      server_url=self.master_url,
      software_release_url=software_url,
      certificate_repository_path=self.certificate_repository_path,
      buildout=self.buildout,
      buildout_debug=self.buildout_debug,
      logger=self.logger,
      retention_delay=retention_delay,
      instance_min_free_space=self.instance_min_free_space,
      instance_storage_home=self.instance_storage_home,
      ipv4_global_network=self.ipv4_global_network,
      api_backward_compatibility=self.api_backward_compatibility,
    )
    local_partition = Partition(**partition_option_dict)

    self.logger.info('Processing Promises for Computer Partition %s.', computer_partition_id)
    self.logger.info('  Software URL: %s', software_url)
    self.logger.info('  Software path: %s', software_path)
    self.logger.info('  Instance path: %s', instance_path)

    # Only the plain promise check runs here; the anomaly check
    # (_checkPromiseAnomaly) is not invoked from this method.
    if computer_partition_state == COMPUTER_PARTITION_STARTED_STATE:
      self._checkPromiseList(local_partition)

1259 1260 1261 1262
  def processComputerPartition(self, computer_partition):
    """
    Process a Computer Partition, depending on its state
    """
1263
    computer_partition_id = computer_partition.get("compute_partition_id")
1264 1265 1266 1267 1268 1269

    # Sanity checks before processing
    # Those values should not be None or empty string or any falsy value
    if not computer_partition_id:
      raise ValueError('Computer Partition id is empty.')

1270
    self.logger.debug('Check if %s requires processing...' % computer_partition_id)
1271

1272
    instance_path = os.path.join(self.instance_root, computer_partition_id)
1273 1274 1275 1276 1277 1278 1279 1280
    os.environ['SLAPGRID_INSTANCE_ROOT'] = self.instance_root

    # Check if transaction file of this partition exists, if the file was created,
    # remove it so it will be generate with this new transaction
    transaction_file_name = COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME % computer_partition_id
    transaction_file_path = os.path.join(instance_path, transaction_file_name)
    if os.path.exists(transaction_file_path):
      os.unlink(transaction_file_path)
1281 1282

    # Try to get partition timestamp (last modification date)
1283 1284 1285 1286
    timestamp_path = os.path.join(
        instance_path,
        COMPUTER_PARTITION_TIMESTAMP_FILENAME
    )
1287
    timestamp = computer_partition.get("processing_timestamp")
1288

1289 1290 1291 1292 1293
    error_output_file = os.path.join(
        instance_path,
        COMPUTER_PARTITION_INSTALL_ERROR_FILENAME % computer_partition_id
    )

1294
    software_url = computer_partition.get("software_release_uri")
1295
    try:
Marco Mariani's avatar
Marco Mariani committed
1296
      software_path = os.path.join(self.software_root, md5digest(software_url))
1297 1298 1299 1300 1301
    except TypeError:
      # Problem with instance: SR URI not set.
      # Try to process it anyway, it may need to be deleted.
      software_path = None

1302
    computer_partition_state = computer_partition.get("state")
1303
    periodicity = self.maximum_periodicity
1304
    if software_path:
1305 1306 1307
      periodicity_path = os.path.join(software_path, 'periodicity')
      if os.path.exists(periodicity_path):
        try:
1308 1309
          with open(periodicity_path) as f:
            periodicity = int(f.read())
1310 1311 1312
        except ValueError:
          os.remove(periodicity_path)
          self.logger.exception('')
1313

1314 1315 1316
    local_partition = Partition(
      software_path=software_path,
      instance_path=instance_path,
1317
      shared_part_list=self.shared_part_list,
1318 1319
      supervisord_partition_configuration_dir=(
        _getSupervisordConfigurationDirectory(self.instance_root)),
1320 1321 1322 1323 1324 1325 1326 1327
      supervisord_socket=self.supervisord_socket,
      computer_partition=computer_partition,
      computer_id=self.computer_id,
      partition_id=computer_partition_id,
      server_url=self.master_url,
      software_release_url=software_url,
      certificate_repository_path=self.certificate_repository_path,
      buildout=self.buildout,
1328
      buildout_debug=self.buildout_debug,
1329
      logger=self.logger,
1330
      retention_delay=computer_partition.get('sla_parameters', {}).get('retention_delay', '0'),
1331 1332 1333
      instance_min_free_space=self.instance_min_free_space,
      instance_storage_home=self.instance_storage_home,
      ipv4_global_network=self.ipv4_global_network,
1334
      partition_timeout=self.partition_timeout
1335
      api_backward_compatibility=self.api_backward_compatibility,
1336 1337 1338 1339 1340 1341
    )

    # let managers modify current partition
    for manager in self._manager_list:
      manager.instance(local_partition)

1342 1343 1344 1345 1346 1347
    # Since --force-stop option alters the processing of started partitions:
    # - the partition should be processed regardless of the previous timestamp
    # - the timestamp should not be updated
    if self.force_stop and computer_partition_state == COMPUTER_PARTITION_STARTED_STATE:
      timestamp = None

1348 1349 1350
    # Check if timestamp from server is more recent than local one.
    # If not: it's not worth processing this partition (nothing has
    # changed).
Marco Mariani's avatar
Marco Mariani committed
1351
    if (computer_partition_id not in self.computer_partition_filter_list and
1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365
          not self.develop and timestamp and periodicity):
      try:
        last_runtime = os.path.getmtime(timestamp_path)
      except OSError as e:
        if e.errno != errno.ENOENT:
          raise
      else:
        with open(timestamp_path) as f:
          try:
            old_timestamp = float(f.read())
          except ValueError:
            self.logger.exception('')
            old_timestamp = 0
        if float(timestamp) <= old_timestamp:
1366 1367
            # Check periodicity, i.e if periodicity is one day, partition
            # should be processed at least every day.
1368
            if time.time() <= last_runtime + periodicity or periodicity < 0:
1369
              # check promises anomaly
1370 1371
              if (computer_partition_state == COMPUTER_PARTITION_STARTED_STATE
                  and not self.force_stop):
1372
                self.logger.debug('Partition already up-to-date.')
1373
                self._checkPromiseAnomaly(local_partition, computer_partition)
1374 1375
              else:
                self.logger.debug('Partition already up-to-date. skipping.')
1376 1377 1378 1379 1380

              # Run manager tear down
              for manager in self._manager_list:
                manager.instanceTearDown(local_partition)

1381
              return
1382
        os.remove(timestamp_path)
1383

1384 1385 1386
    # Include Partition Logging
    log_folder_path = "%s/.slapgrid/log" % instance_path
    mkdir_p(log_folder_path)
1387

1388
    stat_info = os.stat(instance_path)
1389
    chownDirectory("%s/.slapgrid" % instance_path,
1390 1391 1392 1393 1394
                   uid=stat_info.st_uid,
                   gid=stat_info.st_gid)

    formatter = logging.Formatter(
       '[%(asctime)s] %(levelname)-8s %(name)s %(message)s')
1395 1396 1397 1398 1399

    # this partition_file_handler will be cleaned up after this try: block
    partition_file_handler = logging.FileHandler(
                filename="%s/instance.log" % (log_folder_path)
            )
1400 1401 1402 1403 1404 1405 1406
    partition_file_handler.setFormatter(formatter)
    self.logger.addHandler(partition_file_handler)
    try:
      self.logger.info('Processing Computer Partition %s.' % computer_partition_id)
      self.logger.info('  Software URL: %s' % software_url)
      self.logger.info('  Software path: %s' % software_path)
      self.logger.info('  Instance path: %s' % instance_path)
1407

1408 1409 1410
      # Update certificate as late as possible
      local_partition._updateCertificate()

1411
      # XXX this line breaks 37 tests
1412
      # self.logger.info('  Instance type: %s' % computer_partition.get("software_type"))
1413
      self.logger.info('  Instance status: %s' % computer_partition_state)
1414

1415 1416 1417
      if os.path.exists(error_output_file):
        os.unlink(error_output_file)

1418 1419
      partition_ip_list = full_hosting_ip_list = []
      if self.firewall_conf:
1420
        partition_ip_list = computer_partition['ip_list'] + computer_partition.get(
1421
                                                            'full_ip_list', [])
1422

1423
      if computer_partition_state == COMPUTER_PARTITION_STARTED_STATE:
1424
        local_partition.install()
1425 1426 1427 1428
        if not self.force_stop:
          local_partition.start()
        else:
          local_partition.stop()
1429 1430
        if self.firewall_conf:
          self._setupComputerPartitionFirewall(computer_partition,
1431
                                              partition_ip_list)
1432 1433
        if not self.force_stop:
          self._checkPromiseList(local_partition)
1434 1435 1436 1437 1438 1439 1440 1441
          if not self.api_backward_compatibility:
            self.slap.jio_api_connector.put({
              "portal_type": "Software Instance",
              "reference": computer_partition.get("reference"),
              "reported_state": "started"
            })
          else:
            computer_partition["slap_partition"].started()
1442
        self._endInstallationTransaction(computer_partition)
1443 1444 1445 1446 1447
      elif computer_partition_state == COMPUTER_PARTITION_STOPPED_STATE:
        try:
          # We want to process the partition, even if stopped, because it should
          # propagate the state to children if any.
          local_partition.install()
1448 1449
          if self.firewall_conf:
            self._setupComputerPartitionFirewall(computer_partition,
1450
                                                partition_ip_list)
1451 1452 1453
        finally:
          # Instance has to be stopped even if buildout/reporting is wrong.
          local_partition.stop()
1454
        try:
1455 1456 1457 1458 1459 1460 1461 1462
          if not self.api_backward_compatibility:
            self.slap.jio_api_connector.put({
              "portal_type": "Software Instance",
              "reference": computer_partition.get("reference"),
              "reported_state": "stopped"
            })
          else:
            computer_partition["slap_partition"].stopped()
1463
        except (SystemExit, KeyboardInterrupt):
1464
          self.sendPartitionError(computer_partition, traceback.format_exc(), logger=self.logger)
1465 1466 1467
          raise
        except Exception:
          pass
1468
        self._endInstallationTransaction(computer_partition)
1469 1470
      elif computer_partition_state == COMPUTER_PARTITION_DESTROYED_STATE:
        local_partition.stop()
1471 1472 1473 1474
        if self.firewall_conf:
          self._setupComputerPartitionFirewall(computer_partition,
                                              partition_ip_list,
                                              drop_entries=True)
1475
        try:
1476 1477 1478 1479 1480 1481 1482 1483
          if not self.api_backward_compatibility:
            self.slap.jio_api_connector.put({
              "portal_type": "Software Instance",
              "reference": computer_partition.get("reference"),
              "reported_state": "stopped"
            })
          else:
            computer_partition["slap_partition"].stopped()
1484
        except (SystemExit, KeyboardInterrupt):
1485
          self.sendPartitionError(computer_partition, traceback.format_exc(), logger=self.logger)
1486 1487 1488 1489 1490 1491
          raise
        except Exception:
          pass
      else:
        error_string = "Computer Partition %r has unsupported state: %s" % \
          (computer_partition_id, computer_partition_state)
1492
        self.sendPartitionError(computer_partition, error_string, logger=self.logger)
1493
        raise NotImplementedError(error_string)
Bryton Lacquement's avatar
Bryton Lacquement committed
1494
    except Exception as e:
1495 1496 1497 1498 1499 1500 1501 1502 1503 1504
      if not isinstance(e, PromiseError):
        with open(error_output_file, 'w') as error_file:
          # Write error message in a log file accessible to computer partition user
          error_file.write(str(e))
        if computer_partition_state == COMPUTER_PARTITION_STARTED_STATE:
          try:
            self._checkPromiseList(local_partition)
          except PromiseError:
            # updating promises state, no need to raise here
            pass
1505
      raise e
1506
    finally:
1507
      self.logger.removeHandler(partition_file_handler)
1508
      partition_file_handler.close()
1509 1510 1511 1512
      # Run manager tear down, even if something happened, like promise error,
      # as manager might be used for this
      for manager in self._manager_list:
        manager.instanceTearDown(local_partition)
1513

1514 1515
    # If partition has been successfully processed, write timestamp
    if timestamp:
1516
      with open(timestamp_path, 'w') as f:
1517
        f.write(str(timestamp))
1518

1519
  def FilterComputerPartitionList(self, computer_partition_list):
    """
    Try to filter valid partitions to be processed from free partitions.

    A partition is treated as free when its "state" equals
    COMPUTER_PARTITION_DESTROYED_STATE and it has no "software_release_uri".
    Free partitions are left out of the result: their .slapgrid and
    .timestamp entries are removed, and a warning is logged when the
    directory still holds anything besides .slapos-resource.

    An error raised while inspecting a partition (for instance a missing
    partition directory) is reported with sendPartitionError() and the
    partition is left out of the result. SystemExit and KeyboardInterrupt
    are reported too, then re-raised.

    Returns the list of kept partitions, in their original order.
    """
    # Directory listings for which a free partition still counts as empty
    # (.slapos-resource is dumped by slapformat).
    pristine_listing_tuple = ([], ['.slapos-resource'])
    kept_partition_list = []

    for computer_partition in computer_partition_list:
      try:
        partition_id = computer_partition.get("compute_partition_id")
        partition_path = os.path.join(self.instance_root, partition_id)
        if not os.path.exists(partition_path):
          raise NotFoundError('Partition directory %s does not exist.' %
              partition_path)

        # A destroyed partition without any Software information is a free
        # partition rather than an installed one.
        # XXX-Cedric: Temporary AND ugly solution to check if an instance
        # is in the partition. Dangerous because not 100% sure it is empty
        partition_state = computer_partition.get("state")
        software_url = computer_partition.get("software_release_uri")
        is_free_partition = (
          partition_state == COMPUTER_PARTITION_DESTROYED_STATE
          and not software_url)

        if not is_free_partition:
          # Everything seems fine
          kept_partition_list.append(computer_partition)
          continue

        # Drop files which may come from concurrent processing, i.e.:
        # "slapos node report" and "slapos node instance" commands can
        # create a .timestamp file.
        existing_name_list = os.listdir(partition_path)
        for leftover_name in (".slapgrid", ".timestamp"):
          if leftover_name not in existing_name_list:
            continue
          leftover_path = "/".join([partition_path, leftover_name])
          if os.path.isfile(leftover_path):
            os.unlink(leftover_path)
          else:
            shutil.rmtree(leftover_path)

        # List the directory again: the cleanup above may have emptied it.
        if os.listdir(partition_path) not in pristine_listing_tuple:
          self.logger.warning("Free partition %s contains file(s) in %s." % (
              partition_id, partition_path))

      # XXX-Cedric: factor all this error handling

      # Send log before exiting
      except (SystemExit, KeyboardInterrupt):
        self.sendPartitionError(computer_partition, traceback.format_exc(), logger=self.logger)
        raise

      except Exception as exc:
        # if Buildout failed: send log but don't print it to output (already done)
        if not isinstance(exc, BuildoutFailedError):
          # For everything else: log it, send it, continue.
          self.logger.exception('')
        try:
          self.sendPartitionError(computer_partition, exc, logger=self.logger)
        except (SystemExit, KeyboardInterrupt):
          raise
        except Exception:
          self.logger.exception('Problem while reporting error, continuing:')

    return kept_partition_list

  def processComputerPartitionList(self):
    """
    Process the computer partitions, online if possible.

    The online processing is tried first; if it raises RequestException,
    the offline processing is run instead. Returns the status code of
    whichever of the two completed.
    """
    try:
      status = self.processComputerPartitionListOnline()
    except RequestException:
      status = self.processComputerPartitionListOffline()
    return status

  def processComputerPartitionListOnline(self):
    """
    Will start supervisord and process each Computer Partition.

    Each partition returned by getRequiredComputerPartitionList() is passed
    to processComputerPartition(). An error raised while processing one
    partition is reported with sendPartitionError() and remembered for the
    final summary, then the next partition is processed. RequestException is
    re-raised untouched (connection loss is handled by the caller), and so
    are SystemExit and KeyboardInterrupt after being reported.

    Returns SLAPGRID_FAIL if processing failed for at least one partition,
    SLAPGRID_PROMISE_FAIL if only promise errors happened, and
    SLAPGRID_SUCCESS otherwise.
    """
    self.logger.info('Processing computer partitions...')
    # Prepares environment
    self.checkEnvironmentAndCreateStructure()
    self._launchSupervisord()

    # Boolean to know if every instance has correctly been deployed
    clean_run = True
    # Boolean to know if every promises correctly passed
    clean_run_promise = True

    computer_partition_list = self.getRequiredComputerPartitionList()

    # (partition, exception) pairs collected for the final summary
    process_error_partition_list = []
    promise_error_partition_list = []

    for computer_partition in computer_partition_list:
      # Nothing should raise outside of the current loop iteration, so that
      # even if something is terribly wrong while processing an instance, it
      # won't prevent processing other ones.
      if not self.api_backward_compatibility:
        # Replace the list entry by the document fetched by reference
        computer_partition = self.slap.jio_api_connector.get({
          "portal_type": "Software Instance",
          "reference": computer_partition["reference"]
        })
      try:
        # Process the partition itself
        self.processComputerPartition(computer_partition)

      # Handle connection loss at the next level
      except RequestException:
        raise

      # Send log before exiting
      except (SystemExit, KeyboardInterrupt):
        self.sendPartitionError(computer_partition, traceback.format_exc(), logger=self.logger)
        raise

      except PromiseError as exc:
        clean_run_promise = False
        try:
          self.logger.error(exc)
          self.sendPartitionError(computer_partition, exc, logger=self.logger)
          promise_error_partition_list.append((computer_partition, exc))
        except (SystemExit, KeyboardInterrupt):
          raise
        except Exception:
          self.logger.exception('Problem while reporting error, continuing:')

      except Exception as exc:
        clean_run = False
        # if Buildout failed: send log but don't print it to output (already done)
        if not isinstance(exc, BuildoutFailedError):
          # For everything else: log it, send it, continue.
          self.logger.exception('')
        try:
          self.sendPartitionError(computer_partition, exc, logger=self.logger)
          process_error_partition_list.append((computer_partition, exc))
        except (SystemExit, KeyboardInterrupt):
          raise
        except Exception:
          self.logger.exception('Problem while reporting error, continuing:')

    def getPartitionType(part):
      """returns the partition type, if known at that point.
      """
      # Read the type from the argument itself, not from a variable of the
      # enclosing scope, and actually return it once it is known.
      software_type = part.get("software_type", None)
      if software_type is None:
        return '(not ready)'
      return software_type

    self.logger.info('Finished computer partitions.')
    self.logger.info('=' * 80)
    if process_error_partition_list:
      self.logger.info('Error while processing the following partitions:')
      for partition, exc in process_error_partition_list:
        self.logger.info('  %s[%s]: %s', partition.get("compute_partition_id"), getPartitionType(partition), exc)
    if promise_error_partition_list:
      self.logger.info('Error with promises for the following partitions:')
      for partition, exc in promise_error_partition_list:
        self.logger.info('  %s[%s]: %s', partition.get("compute_partition_id"), getPartitionType(partition), exc)

    # Return success value
    if not clean_run:
      return SLAPGRID_FAIL
    if not clean_run_promise:
      return SLAPGRID_PROMISE_FAIL
    return SLAPGRID_SUCCESS
Łukasz Nowak's avatar
Łukasz Nowak committed
1679

1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693
  def processComputerPartitionListOffline(self):
    """
    Start the processes of all partitions through supervisord.

    Used by processComputerPartitionList() when the online processing
    raised RequestException.

    Returns SLAPGRID_OFFLINE_SUCCESS, or SLAPGRID_FAIL if anything went
    wrong (the error is logged, not raised).
    """
    self.logger.info('Processing computer partitions offline...')
    try:
      socket_path = _getSupervisordSocketPath(self.instance_root, self.logger)
      with getSupervisorRPC(socket_path) as supervisor_rpc:
        # NOTE(review): False is presumably supervisor's "wait" flag, i.e.
        # do not block until the processes are started - confirm.
        supervisor_rpc.startAllProcesses(False)
    except Exception:
      self.logger.exception('Error in offline mode while starting partitions:')
      return SLAPGRID_FAIL
    else:
      return SLAPGRID_OFFLINE_SUCCESS

1694 1695 1696 1697 1698 1699 1700 1701
  def processPromiseList(self):
    """
    Will check and process promises for each Computer Partition.

    Each partition returned by getRequiredComputerPartitionList() is passed
    to processPromise(). Any exception raised for a partition is logged and
    remembered for the final summary, then the next partition is processed.

    Returns SLAPGRID_PROMISE_FAIL if at least one partition raised, and
    SLAPGRID_SUCCESS otherwise.
    """
    self.logger.info('Processing promises...')
    # Return success value
    clean_run_promise = True

    computer_partition_list = self.getRequiredComputerPartitionList()

    # (partition, exception) pairs collected for the final summary
    promise_error_partition_list = []
    for computer_partition in computer_partition_list:
      if not self.api_backward_compatibility:
        # Replace the list entry by the document fetched by reference
        computer_partition = self.slap.jio_api_connector.get({
          "portal_type": "Software Instance",
          "reference": computer_partition["reference"]
        })
      try:
        # Process the partition itself
        self.processPromise(computer_partition)
      except PromiseError as exc:
        clean_run_promise = False
        self.logger.error(exc)
        promise_error_partition_list.append((computer_partition, exc))
      except Exception as exc:
        clean_run_promise = False
        self.logger.exception('Problem while reporting error, continuing:')
        promise_error_partition_list.append((computer_partition, exc))

    def getPartitionType(part):
      """returns the partition type, if known at that point.
      """
      # Read the type from the argument itself, not from a variable of the
      # enclosing scope, and actually return it once it is known.
      software_type = part.get("software_type", None)
      if software_type is None:
        return '(not ready)'
      return software_type

    if promise_error_partition_list:
      self.logger.info('Finished computer partitions.')
      for partition, exc in promise_error_partition_list:
        self.logger.info('  %s[%s]: %s', partition.get("compute_partition_id"), getPartitionType(partition), exc)

    # Return success value
    if not clean_run_promise:
      return SLAPGRID_PROMISE_FAIL
    return SLAPGRID_SUCCESS



1742 1743 1744 1745 1746 1747
  def _checkWaitProcessList(self, partition, state_list):
    """
    Check the processes named in the wait list file of a partition.

    The wait list is the file COMPUTER_PARTITION_WAIT_LIST_FILENAME inside
    partition.instance_path; it is read line by line, each non-blank line
    being used as a process name.

    partition -- object providing instance_path and
                 checkProcessesFromStateList()
    state_list -- forwarded as is to partition.checkProcessesFromStateList()

    Returns False if the wait list is not an existing regular file,
    otherwise the result of partition.checkProcessesFromStateList().
    """
    wait_file = os.path.join(partition.instance_path,
                             COMPUTER_PARTITION_WAIT_LIST_FILENAME)

    # isfile() is already False for a path which does not exist
    if not os.path.isfile(wait_file):
      return False

    with open(wait_file) as wait_f:
      stripped_name_list = [name.strip() for name in wait_f]
    # Lines read from a file keep their newline, so they are always truthy:
    # blank lines can only be dropped once stripped.
    processes_list = [name for name in stripped_name_list if name]
    # return True if one of process in the list is running
    return partition.checkProcessesFromStateList(processes_list,
                                                 state_list)

1754 1755 1756
  def validateXML(self, to_be_validated, xsd_model):
    """Check an XML document against an XSD model.

    to_be_validated -- the XML document, given to etree.fromstring()
    xsd_model -- content of the XSD model, wrapped in a BytesIO to be parsed

    Returns True if the document validates against the model, False if it
    does not or if it cannot be parsed (the parse error is then logged).
    """
    # Build the schema out of the XSD model content
    schema = etree.XMLSchema(etree.parse(BytesIO(xsd_model)))

    try:
      parsed_document = etree.fromstring(to_be_validated)
    except (etree.XMLSyntaxError, etree.DocumentInvalid) as exc:
      self.logger.info('Failed to parse this XML report :  %s\n%s' %
                          (to_be_validated, _formatXMLError(exc)))
      self.logger.error(_formatXMLError(exc))
      return False
    else:
      return bool(schema.validate(parsed_document))

1774 1775 1776
  def asXML(self, computer_partition_usage_list):
    """Generates a XML report from computer partition usage list

    computer_partition_usage_list -- objects providing a `usage` attribute
      (an XML document, parsed with etree.fromstring()) and a getId() method

    Every <movement> element found directly under the root of each usage
    document is copied into the report, child by child; the content of a
    <reference> child is replaced by the getId() of the usage object.

    Returns the whole report as one string.
    Raises UnicodeError if a usage document cannot be read, and Exception
    for any other unexpected failure while parsing it.
    """
    xml = ['<?xml version="1.0"?>',
           '<journal>',
           '<transaction type="Sale Packing List">',
           '<title>Resource consumptions</title>',
           '<start_date></start_date>',
           '<stop_date>%s</stop_date>' % time.strftime("%Y-%m-%d at %H:%M:%S"),
           '<reference>%s</reference>' % self.computer_id,
           '<currency></currency>',
           '<payment_mode></payment_mode>',
           '<category></category>',
           '<arrow type="Administration">',
           '<source></source>',
           '<destination></destination>',
           '</arrow>']

    for computer_partition_usage in computer_partition_usage_list:
      try:
        root = etree.fromstring(computer_partition_usage.usage)
      except UnicodeError as exc:
        self.logger.info("Failed to read %s." % computer_partition_usage.usage)
        # Log the caught exception itself, not the UnicodeError class
        self.logger.error(exc)
        raise UnicodeError("Failed to read %s: %s" % (computer_partition_usage.usage, exc))
      except (etree.XMLSyntaxError, etree.DocumentInvalid) as exc:
        self.logger.info("Failed to parse %s." % (computer_partition_usage.usage))
        self.logger.error(exc)
        # NOTE(review): this raises whatever _formatXMLError() returns, while
        # validateXML() uses that same result as a log message - confirm it
        # is an exception instance and not a plain string.
        raise _formatXMLError(exc)
      except Exception as exc:
        raise Exception("Failed to generate XML report: %s" % exc)

      for movement in root.findall('movement'):
        xml.append('<movement>')
        # Iterating an element yields its direct children
        for child in movement:
          if child.tag == "reference":
            xml.append('<%s>%s</%s>' % (child.tag, computer_partition_usage.getId(), child.tag))
          else:
            xml.append('<%s>%s</%s>' % (child.tag, child.text, child.tag))
        xml.append('</movement>')

    xml.append('</transaction></journal>')

    return ''.join(xml)
1818

Łukasz Nowak's avatar
Łukasz Nowak committed
1819 1820 1821
  def agregateAndSendUsage(self):
    """Will agregate usage from each Computer Partition.
    """
1822 1823 1824 1825
    # Prepares environment
    self.checkEnvironmentAndCreateStructure()
    self._launchSupervisord()

Łukasz Nowak's avatar
Łukasz Nowak committed
1826
    computer_partition_usage_list = []
1827
    self.logger.info('Aggregating and sending usage reports...')
Łukasz Nowak's avatar
Łukasz Nowak committed
1828

1829 1830 1831 1832 1833 1834 1835 1836 1837
    #We retrieve XSD models
    try:
      computer_consumption_model = \
        pkg_resources.resource_string(
          'slapos.slap',
          'doc/computer_consumption.xsd')
    except IOError:
      computer_consumption_model = \
        pkg_resources.resource_string(
1838
          __name__,
1839 1840 1841 1842 1843 1844 1845 1846 1847 1848
          '../../../../slapos/slap/doc/computer_consumption.xsd')

    try:
      partition_consumption_model = \
        pkg_resources.resource_string(
          'slapos.slap',
          'doc/partition_consumption.xsd')
    except IOError:
      partition_consumption_model = \
        pkg_resources.resource_string(
1849
          __name__,
1850 1851
          '../../../../slapos/slap/doc/partition_consumption.xsd')

Łukasz Nowak's avatar
Łukasz Nowak committed
1852
    clean_run = True
1853
    # Loop over the different computer partitions
1854
    computer_partition_list = self.FilterComputerPartitionList(
1855
       self.getComputerPartitionList())
1856

1857
    for computer_partition in computer_partition_list:
1858
      try:
1859
        computer_partition_id = computer_partition.get("compute_partition_id")
1860

1861
        instance_path = os.path.join(self.instance_root, computer_partition_id)
Marco Mariani's avatar
Marco Mariani committed
1862

1863 1864 1865 1866 1867
        # We now generate a pseudorandom name for the report xml file
        # that will be passed to the invocation list
        slapreport_path = tempfile.mktemp(
          prefix='slapreport.',
          dir=os.path.join(instance_path, 'var', 'xml_report'))
Marco Mariani's avatar
Marco Mariani committed
1868

1869
        # We want to execute all the script in the report folder
1870
        failed_script_list = []
1871 1872
        report_dir = os.path.join(instance_path, 'etc', 'report')
        for script in listifdir(report_dir):
1873
          invocation_list = []
1874 1875 1876
          invocation_list.append(os.path.join(report_dir, script))
          invocation_list.append(slapreport_path)

1877
          # Dropping privileges
1878 1879 1880 1881
          stat_info = os.stat(instance_path)
          uid = stat_info.st_uid
          gid = stat_info.st_gid
          process_handler = SlapPopen(invocation_list,
1882
                                      preexec_fn=lambda: dropPrivileges(uid, gid, logger=self.logger),
Marco Mariani's avatar
Marco Mariani committed
1883
                                      cwd=os.path.join(instance_path, 'etc', 'report'),
1884 1885
                                      env=getCleanEnvironment(self.logger,
                                        home_path=pwd.getpwuid(uid).pw_dir),
Marco Mariani's avatar
Marco Mariani committed
1886
                                      stdout=subprocess.PIPE,
1887 1888
                                      stderr=subprocess.STDOUT,
                                      logger=self.logger)
1889 1890 1891 1892
          if process_handler.returncode is None:
            process_handler.kill()
          if process_handler.returncode != 0:
            clean_run = False
1893
            failed_script_list.append("Script %r failed." % script)
1894
            self.logger.warning('Failed to run %r' % invocation_list)
1895
          if len(failed_script_list):
1896
            self.sendPartitionError(computer_partition, '\n'.join(failed_script_list), logger=self.logger)
1897 1898
      # Whatever happens, don't stop processing other instances
      except Exception:
1899
        self.logger.exception('Cannot run usage script(s) for %r:' %
1900
                                  computer_partition.get("compute_partition_id"))
Łukasz Nowak's avatar
Łukasz Nowak committed
1901

1902
    # Now we loop through the different computer partitions to report
Łukasz Nowak's avatar
Łukasz Nowak committed
1903
    report_usage_issue_cp_list = []
1904
    for computer_partition in computer_partition_list:
1905 1906
      try:
        filename_delete_list = []
1907
        computer_partition_id = computer_partition.get("compute_partition_id")
1908
        instance_path = os.path.join(self.instance_root, computer_partition_id)
1909
        dir_report_list = [os.path.join(instance_path, 'var', 'xml_report'),
1910
            os.path.join(self.instance_root, 'var', 'xml_report',
1911
                         computer_partition_id)]
1912

1913 1914 1915 1916 1917
        for dir_reports in dir_report_list:
          # The directory xml_report contain a number of files equal
          # to the number of software instance running inside the same partition
          if os.path.isdir(dir_reports):
            filename_list = os.listdir(dir_reports)
1918
          else:
1919 1920 1921 1922 1923 1924 1925
            filename_list = []
          # self.logger.debug('name List %s' % filename_list)

          for filename in filename_list:

            file_path = os.path.join(dir_reports, filename)
            if os.path.exists(file_path):
1926
              with open(file_path, 'rb') as f:
1927
                usage = f.read()
1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943

              # We check the validity of xml content of each reports
              if not self.validateXML(usage, partition_consumption_model):
                self.logger.info('WARNING: The XML file %s generated by slapreport is '
                                 'not valid - This report is left as is at %s where you can '
                                 'inspect what went wrong ' % (filename, dir_reports))
                # Warn the SlapOS Master that a partition generates corrupted xml
                # report
              else:
                computer_partition_usage = self.slap.registerComputerPartition(
                    self.computer_id, computer_partition_id)
                computer_partition_usage.setUsage(usage)
                computer_partition_usage_list.append(computer_partition_usage)
                filename_delete_list.append(filename)
            else:
              self.logger.debug('Usage report %r not found, ignored' % file_path)
Łukasz Nowak's avatar
Łukasz Nowak committed
1944

1945 1946 1947
          # After sending the aggregated file we remove all the valid xml reports
          for filename in filename_delete_list:
            os.remove(os.path.join(dir_reports, filename))
1948 1949 1950

      # Whatever happens, don't stop processing other instances
      except Exception:
1951
        self.logger.exception('Cannot run usage script(s) for %r:' %
1952
                                computer_partition.get("compute_partition_id"))
1953 1954

    for computer_partition_usage in computer_partition_usage_list:
Marco Mariani's avatar
Marco Mariani committed
1955 1956
      self.logger.info('computer_partition_usage_list: %s - %s' %
                       (computer_partition_usage.usage, computer_partition_usage.getId()))
1957

1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972
    filename_delete_list = []
    computer_report_dir = os.path.join(self.instance_root,
                                 'var', 'xml_report', self.computer_id)

    # The directory xml_report contain a number of files equal
    # to the number of software instance running inside the same partition
    if os.path.isdir(computer_report_dir):
      filename_list = os.listdir(computer_report_dir)
    else:
      filename_list = []

    for filename in filename_list:

      file_path = os.path.join(computer_report_dir, filename)
      if os.path.exists(file_path):
1973
        with open(file_path, 'rb') as f:
1974
          usage = f.read()
1975 1976 1977

      if self.validateXML(usage, computer_consumption_model):
        self.logger.info('XML file generated by asXML is valid')
1978
        self.computer.reportUsage(usage)
1979 1980
        filename_delete_list.append(filename)
      else:
1981
        self.logger.info('XML file is invalid %s' % file_path)
1982 1983 1984 1985 1986

    # After sending the aggregated file we remove all the valid xml reports
    for filename in filename_delete_list:
      os.remove(os.path.join(computer_report_dir, filename))

1987
    # If there is, at least, one report
1988
    if computer_partition_usage_list != []:
Łukasz Nowak's avatar
Łukasz Nowak committed
1989
      try:
1990
        # We generate the final XML report with asXML method
1991 1992
        computer_consumption = self.asXML(computer_partition_usage_list)

1993
        self.logger.info('Final xml report: %s' % computer_consumption)
1994

1995
        # We test the XML report before sending it
1996
        if self.validateXML(computer_consumption, computer_consumption_model):
1997
          self.logger.info('XML file generated by asXML is valid')
1998
          self.computer.reportUsage(computer_consumption)
1999
        else:
2000
          self.logger.info('XML file generated by asXML is not valid !')
2001
          raise ValueError('XML file generated by asXML is not valid !')
Łukasz Nowak's avatar
Łukasz Nowak committed
2002
      except Exception:
2003
        issue = "Cannot report usage for %r: %s" % (
2004
            computer_partition.get("compute_partition_id"),
Marco Mariani's avatar
Marco Mariani committed
2005
            traceback.format_exc())
2006
        self.logger.info(issue)
2007
        self.sendPartitionError(computer_partition, issue, logger=self.logger)
Łukasz Nowak's avatar
Łukasz Nowak committed
2008 2009
        report_usage_issue_cp_list.append(computer_partition_id)

2010
    for computer_partition in computer_partition_list:
2011
      if computer_partition.get("state") == COMPUTER_PARTITION_DESTROYED_STATE:
2012
        destroyed = False
Łukasz Nowak's avatar
Łukasz Nowak committed
2013
        try:
2014 2015
          computer_partition_id = computer_partition.get("compute_partition_id")
          software_url = computer_partition.get("software_release_uri")
2016
          try:
2017
            software_path = os.path.join(self.software_root, md5digest(software_url))
2018 2019 2020
          except TypeError:
            # Problem with instance: SR URI not set.
            # Try to process it anyway, it may need to be deleted.
2021
            software_path = None
2022

2023 2024 2025 2026 2027 2028
          if not self.api_backward_compatibility:
            computer_partition = self.slap.jio_api_connector.get({
              "portal_type": "Software Instance",
              "reference": computer_partition["reference"]
            })

2029 2030 2031
          local_partition = Partition(
            software_path=software_path,
            instance_path=os.path.join(self.instance_root,
2032
                computer_partition.get("compute_partition_id")),
2033
            shared_part_list=self.shared_part_list,
2034 2035 2036
            supervisord_partition_configuration_path=os.path.join(
              _getSupervisordConfigurationDirectory(self.instance_root), '%s.conf' %
              computer_partition_id),
2037 2038 2039 2040 2041 2042 2043
            supervisord_socket=self.supervisord_socket,
            computer_partition=computer_partition,
            computer_id=self.computer_id,
            partition_id=computer_partition_id,
            server_url=self.master_url,
            software_release_url=software_url,
            certificate_repository_path=self.certificate_repository_path,
2044
            buildout=self.buildout,
2045
            buildout_debug=self.buildout_debug,
2046
            logger=self.logger,
2047
            instance_storage_home=self.instance_storage_home,
2048
            ipv4_global_network=self.ipv4_global_network,
2049
            api_backward_compatibility=self.api_backward_compatibility,
2050
          )
Łukasz Nowak's avatar
Łukasz Nowak committed
2051
          local_partition.stop()
2052
          local_partition._updateCertificate()
Łukasz Nowak's avatar
Łukasz Nowak committed
2053
          try:
2054 2055 2056 2057 2058 2059 2060 2061
            if not self.api_backward_compatibility:
              self.slap.jio_api_connector.put({
                "portal_type": "Software Instance",
                "reference": computer_partition.get("reference"),
                "reported_state": "stopped"
              })
            else:
              computer_partition["slap_partition"].stopped()
Łukasz Nowak's avatar
Łukasz Nowak committed
2062
          except (SystemExit, KeyboardInterrupt):
2063
            self.sendPartitionError(computer_partition, traceback.format_exc(), logger=self.logger)
Łukasz Nowak's avatar
Łukasz Nowak committed
2064 2065 2066
            raise
          except Exception:
            pass
2067 2068 2069 2070
          # let managers update current partition
          for manager in self._manager_list:
            manager.report(local_partition)

2071
          if computer_partition.get("compute_partition_id") in report_usage_issue_cp_list:
2072
            self.logger.info('Ignoring destruction of %r, as no report usage was sent' %
2073
                                computer_partition.get("compute_partition_id"))
2074
            continue
2075 2076 2077 2078 2079
          if self._checkWaitProcessList(local_partition,
              state_list=['RUNNING', 'STARTING']):
            self.logger.info('There are running processes into the partition,' \
              ' wait until they finish...')
            continue
2080
          destroyed = local_partition.destroy()
Łukasz Nowak's avatar
Łukasz Nowak committed
2081
        except (SystemExit, KeyboardInterrupt):
2082
          self.sendPartitionError(computer_partition, traceback.format_exc(), logger=self.logger)
Łukasz Nowak's avatar
Łukasz Nowak committed
2083 2084 2085
          raise
        except Exception:
          clean_run = False
2086
          self.logger.exception('')
Marco Mariani's avatar
Marco Mariani committed
2087
          exc = traceback.format_exc()
2088
          self.sendPartitionError(computer_partition, exc, logger=self.logger)
Łukasz Nowak's avatar
Łukasz Nowak committed
2089
        try:
2090
          if destroyed:
2091 2092 2093 2094 2095 2096 2097 2098
            if not self.api_backward_compatibility:
              self.slap.jio_api_connector.put({
                "portal_type": "Software Instance",
                "reference": computer_partition.get("reference"),
                "reported_state": "destroyed"
              })
            else:
              computer_partition["slap_partition"].destroyed()
Marco Mariani's avatar
Marco Mariani committed
2099
        except NotFoundError:
2100 2101
          self.logger.debug('Ignored slap error while trying to inform about '
                            'destroying not fully configured Computer Partition %r' %
2102
                                computer_partition.get("compute_partition_id"))
2103
        except ServerError as server_error:
2104 2105
          self.logger.debug('Ignored server error while trying to inform about '
                            'destroying Computer Partition %r. Error is:\n%r' %
2106
                                (computer_partition.get("compute_partition_id"), server_error.args[0]))
Łukasz Nowak's avatar
Łukasz Nowak committed
2107

2108
    self.logger.info('Finished usage reports.')
2109 2110 2111 2112 2113

    # Return success value
    if not clean_run:
      return SLAPGRID_FAIL
    return SLAPGRID_SUCCESS