run-zelenium-test.py.in 9.93 KB
#!{{ bin_path }}
# BEWARE: This file is operated by slapgrid
# BEWARE: It will be overwritten automatically

import argparse, os, sys, traceback
from erp5.util import taskdistribution
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
from urllib import urlopen

def _build_capabilities(args, test_suite_title):
  """Return the desired capabilities for ``args.target``.

  Mobile targets ('iOS', 'Android') and desktop targets (any name containing
  'Windows' or 'OS X') use different capability names.  Returns None when
  the target is missing or matches neither family.
  """
  # curl https://saucelabs.com/rest/v1/info/platforms/all
  # https://wiki.saucelabs.com/display/DOCS/Platform+Configurator#/
  target = args.target
  if not target:
    return None
  if target in ('iOS', 'Android'):
    # parameters for mobile emulators have different names than parameters
    # for desktop OSes
    return {
      'platformName': target,
      'platformVersion': args.target_version,
      'deviceName': args.target_device,
      'browserName': args.target_browser,
      'maxDuration': args.max_duration,
      'name': test_suite_title
    }
  if 'Windows' in target or 'OS X' in target:
    return {
      'browserName': args.target_browser,
      'platform': target,
      'version': args.target_version,
      'maxDuration': args.max_duration,
      'name': test_suite_title
    }
  return None


def _wait_for_activities(wait_url):
  """Poll ``wait_url`` every 10 seconds until it answers 200 with 'Done.'.

  Exits the process with status -1 if the server answers with a 500.
  Any other error is printed and the poll is retried; there is no upper
  bound on the number of retries.
  """
  while 1:
    try:
      response = urlopen(wait_url)
      try:
        if response.code == 500:
          # SystemExit is not an Exception subclass, so it is not swallowed
          # by the handler below.
          sys.exit(-1)
        if response.code == 200 and response.read() == 'Done.':
          return
      finally:
        response.close()
    except Exception:
      traceback.print_exc()
    time.sleep(10)


def _collect_test_lines(tbody, test_line_dict, url, agent,
                        test_execution_duration):
  """Fill ``test_line_dict`` with one result entry per row of ``tbody``.

  ``tbody`` is the parsed body of the suite table; its first row is skipped.
  A row with a single cell is recorded as a failed, not-executed test.
  Otherwise the steps of the nested result table are counted according to
  their ``class`` attribute.
  """
  for suite_row in tbody[1:]:
    # First td is the main title
    test_name = suite_row[0][0].text
    if len(suite_row) == 1:
      test_line_dict[test_name] = {
        'test_count': 1,
        'error_count': 0,
        'failure_count': 1,
        'skip_count': 0,
        'duration': 0,
        'command': url,
        'stdout': agent,
        'stderr': '',
        'html_test_result': 'Not Executed due to saucelabs related error'
      }
      continue

    skip_count = success_count = error_count = 0
    test_table = suite_row[1].xpath('.//table')[0]
    test_tbody = suite_row[1].xpath('.//tbody')[0]

    tr_count = len(test_tbody)
    for step_row in test_tbody:
      status = step_row.attrib.get('class')
      if status is None or 'status_done' in status:
        skip_count += 1
      elif 'status_passed' in status:
        success_count += 1
      elif 'status_failed' in status:
        error_count += 1

    test_line_dict[test_name] = {
      'test_count': tr_count,
      'error_count': error_count,
      # Steps whose class matched none of the known statuses
      'failure_count': tr_count - (skip_count + success_count + error_count),
      'skip_count': skip_count,
      'duration': test_execution_duration,
      'command': url,
      'stdout': agent,
      'stderr': '',
      'html_test_result': etree.tostring(test_table)
    }


def main():
  """Run a Zelenium suite in a remote browser and report its results.

  Steps:
    1. Parse the command line (defaults come from the templated
       configuration dict).
    2. Wait until the instance reports that its activities are finished.
    3. Open the Zelenium TestRunner in a remote browser and wait for the
       suite to finish, for at most ``--max_duration`` seconds.
    4. Read the per-test results from the suite iframe.
    5. Report every result line through the task distributor.

  Exits with status 2 (argparse error) when ``--target`` is missing or not
  supported, and with status -1 when ``--appium_server_auth`` or
  ``--remote_access_url`` is missing.

  Raises:
    EnvironmentError: when reporting the results fails.
  """
  parser = argparse.ArgumentParser(description='Run a test suite.')
  parser_configuration = {{ repr(configuration) }}
  parser.add_argument('--test_suite', help='The test suite name')
  parser.add_argument('--test_suite_title', help='The test suite title')
  parser.add_argument('--test_node_title', help='The test node title')
  parser.add_argument('--project_title', help='The project title')
  parser.add_argument('--revision', help='The revision to test',
                      default='dummy_revision')
  parser.add_argument('--node_quantity', help='ignored', type=int)
  parser.add_argument('--master_url',
                      help='The Url of Master controling many suites')
  parser.add_argument('--remote_access_url',
                      help='The access url',
                      default = parser_configuration.get('remote-access-url')
                     )
  parser.add_argument('--target',
                      help='Target OS to run tests on',
                      default = parser_configuration.get('target')
                     )
  parser.add_argument('--target_version',
                      help='Target OS version to use',
                      default = parser_configuration.get('target-version')
                     )
  parser.add_argument('--target_browser',
                      help='The desired browser of the target OS to be used. Example: Firefox if target is Android.',
                      default = parser_configuration.get('target-browser')
                     )
  parser.add_argument('--target_device',
                      help='The desired device running the target OS. Example: iPad Simulator, if target is iOS.',
                      default = parser_configuration.get('target-device')
                     )
  parser.add_argument('--appium_server_auth',
                      help='Combination of user and token to access SauceLabs service. (i.e. user:token)',
                      default = parser_configuration.get('appium-server-auth')
                     )
  parser.add_argument('--run_only',
                      help='zuite to run',
                      default = parser_configuration.get('run-only')
                     )
  parser.add_argument('--max_duration',
                      type=int,
                      help='max duration for running test en second',
                      default = parser_configuration.get('max-duration', 1800)
                     )
  args = parser.parse_args()
  test_line_dict = {}
  test_suite_title = args.test_suite_title or args.test_suite
  revision = args.revision

  # Fail fast with a readable message instead of crashing later on an
  # unbound ``capabilities`` name.
  capabilities = _build_capabilities(args, test_suite_title)
  if capabilities is None:
    parser.error('unsupported or missing --target: %r' % (args.target,))

  if not args.appium_server_auth or not args.remote_access_url:
    sys.stderr.write(
      '--appium_server_auth and --remote_access_url are both required\n')
    sys.exit(-1)

  #Authentication over HTTPS is not supported in saucelab
  #https://wiki.saucelabs.com/display/DOCS/Instant+Selenium+Python+Tests
  appium_url = "http://%s@ondemand.saucelabs.com/wd/hub" % (args.appium_server_auth)
  # adjust make path to access url
  # Do not store any test result in the ZMI
  if args.run_only:
    zuite_path = 'portal_tests/%s' % args.run_only
  else:
    zuite_path = 'portal_tests'
  # NOTE: the credentials are interpolated verbatim (not URL-quoted).
  url = "%s/erp5/%s/core/TestRunner.html" \
    "?test=../test_suite_html" \
    "&auto=on" \
    "&resultsUrl=../getId" \
    "&__ac_name=%s" \
    "&__ac_password=%s" % (args.remote_access_url, zuite_path, {{ repr(user) }}, {{ repr(password) }})

  # Wait until all activities are finished...
  _wait_for_activities(args.remote_access_url + '/erp5/Zuite_waitForActivities')

  tool = taskdistribution.TaskDistributor(portal_url=args.master_url)
  browser = webdriver.Remote(appium_url, capabilities)

  # Bound before the try block so that the failure handler and the result
  # collection below never hit an unbound name when the run fails early.
  agent = ''
  test_count = 0
  start_time = time.time()
  try:
    try:
      agent = browser.execute_script("return navigator.userAgent")
      print(url)
      print(agent)
      start_time = time.time()
      browser.get(url)
      # Wait for Zelenium to be loaded
      WebDriverWait(browser, 10).until(EC.presence_of_element_located((
        By.XPATH, '//iframe[@id="testSuiteFrame"]'
      )))
      # XXX No idea how to wait for the iframe content to be loaded
      time.sleep(5)
      # Count number of test to be executed
      test_count = browser.execute_script(
        "return document.getElementById('testSuiteFrame').contentDocument.querySelector('tbody').children.length"
      ) - 1 # First child is the file name

      # Wait for test to be executed
      WebDriverWait(browser, args.max_duration).until(EC.presence_of_element_located((
        By.XPATH, '//td[@id="testRuns" and contains(text(), "%i")]' % test_count
      )))
    except Exception:
      # Keep the root cause visible in the logs even if collecting the
      # results below fails as well.
      traceback.print_exc()
      test_line_dict['UnexpectedException'] = {
        'test_count': 1,
        'error_count': 0,
        'failure_count': 1,
        'skip_count': 0,
        'duration': 1,
        'command': url,
        'stdout': agent,
        'stderr': traceback.format_exc()
      }

    execution_duration = round(time.time() - start_time, 2)
    if test_count:
      # The total run time is spread evenly over the tests.
      test_execution_duration = execution_duration / test_count
    else:
      test_execution_duration = 0
    html_parser = etree.HTMLParser(recover=True)

    # The iframe content is fetched through a script rather than by
    # switching frames, see https://github.com/appium/appium/issues/5199
    iframe = etree.fromstring(
      browser.execute_script(
        "return document.getElementById('testSuiteFrame').contentDocument.querySelector('html').innerHTML"
      ).encode('UTF-8'),
      html_parser
    )
    # parse test result
    tbody = iframe.xpath('.//body/table/tbody')[0]
    _collect_test_lines(tbody, test_line_dict, url, agent,
                        test_execution_duration)
  finally:
    # Always release the remote session, even when result parsing fails.
    browser.quit()

  try:
    test_result = tool.createTestResult(revision = revision,
                                        test_name_list = test_line_dict.keys(),
                                        node_title = args.test_node_title,
                                        test_title = test_suite_title,
                                        project_title = args.project_title)
    # NOTE: argparse always defines ``args.master_url`` (None by default),
    # so only the ``test_result is None`` check is effective here.
    if test_result is None or not hasattr(args, 'master_url'):
      return
    # report test results
    while 1:
      test_result_line = test_result.start()
      if not test_result_line:
        print('No test result anymore.')
        break

      print('Submitting: "%s"' % test_result_line.name)
      # report status back to Nexedi ERP5
      test_result_line.stop(**test_line_dict[test_result_line.name])

  except Exception:
    # Catch any exception here, to warn user instead of being silent,
    # by generating fake error result
    traceback.print_exc()
    result = dict(status_code=-1,
                  command=url,
                  stderr=traceback.format_exc(),
                  stdout='')
    # XXX: inform test node master of error
    raise EnvironmentError(result)

# Run the suite only when this file is executed as a script.
if __name__ == '__main__':
  main()