##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

import cgi
import json
import multiprocessing
import os
import tempfile
import unittest
import urllib.parse
import base64
import hashlib
import logging
import contextlib
from http.server import BaseHTTPRequestHandler

from io import BytesIO

import paramiko
import requests
from PIL import Image
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.remote_connection import RemoteConnection
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
import urllib3

from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
from slapos.testing.utils import findFreeTCPPort, ImageComparisonTestCase, ManagedHTTPServer

# `setUpModule` and the `SeleniumServerTestCase` base class used by the tests
# below are both produced for the `software.cfg` file located one directory
# above this test module.
setUpModule, SeleniumServerTestCase = makeModuleSetUpAndTestCaseClass(
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir, 'software.cfg')))



class WebServer(ManagedHTTPServer):
  """HTTP server serving the small test site visited by the browsers."""
  class RequestHandler(BaseHTTPRequestHandler):
    """Request handler implementing the test site.

    Behaviour:
      - any GET returns a page titled "Test page" containing a multipart form
        (text field `q`, file field `f`) and a paragraph of text.
      - a POST of that form returns a page whose title is the value of `q` and
        whose only div contains the decoded content of the uploaded file `f`,
        or "no file" when the form has no `f` field.
    """
    def do_GET(self):
      # The same static page is served whatever the requested path is.
      page = b'''
        <html>
          <title>Test page</title>
          <body>
            <style> p { font-family: Arial; } </style>
            <form action="/" method="POST" enctype="multipart/form-data">
              <input name="q" type="text"></input>
              <input name="f" type="file" ></input>
              <input type="submit" value="I'm feeling lucky"></input>
            </form>
            <p>the quick brown fox jumps over the lazy dog</p>
          </body>
        </html>'''
      self.send_response(200)
      self.end_headers()
      self.wfile.write(page)

    def do_POST(self):
      # FieldStorage needs a minimal CGI-like environment to parse the
      # submitted form straight from the request stream.
      environ = {
          'REQUEST_METHOD': 'POST',
          'CONTENT_TYPE': self.headers['Content-Type'],
      }
      fields = cgi.FieldStorage(
          fp=self.rfile, headers=self.headers, environ=environ)
      self.send_response(200)
      self.end_headers()
      if 'f' in fields:
        uploaded = fields['f'].file.read().decode()
      else:
        uploaded = 'no file'
      template = '''
        <html>
          <title>%s</title>
          <div>%s</div>
        </html>
      '''
      page = template % (fields['q'].value, uploaded)
      self.wfile.write(page.encode())

    # Route the handler's per-request log lines to a dedicated logger.
    log_message = logging.getLogger(__name__ + '.WebServer').info


class WebServerMixin(object):
  """Mixin class which provides a simple web server reachable at self.server_url

  This mixin is meant to be listed before the test case class in the bases,
  which must provide `getManagedResource` and a `setUp` method.
  """
  def setUp(self):
    """Run the next `setUp` of the MRO, then publish the web server URL.
    """
    # Cooperate with the other base classes: without this call, the setUp of
    # every class placed after this mixin in the MRO would be skipped.
    super().setUp()
    self.server_url = self.getManagedResource('web_server', WebServer).url


class BrowserCompatibilityMixin(WebServerMixin):
  """Mixin class to run validation tests on a specific browser

  Concrete test classes must define:
    - `desired_capabilities`: the capabilities requested when opening a
      session on the selenium server.
    - `user_agent`: a string expected to be part of the browser's
      `navigator.userAgent`.
  """
  desired_capabilities = NotImplemented
  user_agent = NotImplemented

  def setUp(self):
    """Open a browser session, available as `self.driver`, for the test.
    """
    super(BrowserCompatibilityMixin, self).setUp()
    self.driver = webdriver.Remote(
        command_executor=self.computer_partition.getConnectionParameterDict()
        ['backend-url'],
        desired_capabilities=self.desired_capabilities)

  def tearDown(self):
    """Terminate the browser session opened by `setUp`.
    """
    self.driver.quit()
    super(BrowserCompatibilityMixin, self).tearDown()

  def test_user_agent(self):
    """The browser reports the expected user agent.
    """
    self.assertIn(
        self.user_agent,
        self.driver.execute_script('return navigator.userAgent'))

  def test_simple_submit_scenario(self):
    """Submitting the form leads to a page titled with the submitted text.
    """
    self.driver.get(self.server_url)
    input_element = WebDriverWait(self.driver, 3).until(
        EC.visibility_of_element_located((By.NAME, 'q')))
    input_element.send_keys(self.id())
    input_element.submit()
    WebDriverWait(self.driver, 3).until(EC.title_is(self.id()))

  def test_upload_file(self):
    """The content of an uploaded file is displayed by the result page.
    """
    with tempfile.NamedTemporaryFile(delete=False, mode='w') as f:
      # Register the removal first, so that the file does not leak when
      # writing fails.
      self.addCleanup(os.remove, f.name)
      f.write(self.id())

    self.driver.get(self.server_url)
    self.driver.find_element(By.XPATH, '//input[@name="f"]').send_keys(f.name)
    self.driver.find_element(By.XPATH, '//input[@type="submit"]').click()

    self.assertEqual(
        self.id(), self.driver.find_element(By.XPATH, '//div').text)

  def test_screenshot(self):
    """A screenshot of the test page is the same as the reference image.

    The reference image is `data/<test id>.png`, next to this file.
    """
    self.driver.get(self.server_url)
    screenshot = Image.open(BytesIO(self.driver.get_screenshot_as_png()))
    reference_filename = os.path.join(
        os.path.dirname(__file__), "data",
        self.id() + ".png")

    # save the screenshot somewhere in a path that will be in snapshot folder.
    # XXX we could use a better folder name ...
    screenshot.save(
        os.path.join(self.slap.instance_directory, 'etc',
                     self.id() + ".png"))

    # The reference file is closed as soon as the comparison is done.
    with Image.open(reference_filename) as reference:
      self.assertImagesSame(screenshot, reference)

  def test_window_and_screen_size(self):
    """Screen has the expected geometry and the window has a non-null size.
    """
    size = json.loads(
        self.driver.execute_script(
            '''
      return JSON.stringify({
        'screen.width': window.screen.width,
        'screen.height': window.screen.height,
        'screen.pixelDepth': window.screen.pixelDepth,
        'innerWidth': window.innerWidth,
        'innerHeight': window.innerHeight
      })'''))
    # Xvfb is configured like this
    self.assertEqual(1024, size['screen.width'])
    self.assertEqual(768, size['screen.height'])
    self.assertEqual(24, size['screen.pixelDepth'])

    # window size must not be 0 (wrong firefox integration report this)
    self.assertGreater(size['innerWidth'], 0)
    self.assertGreater(size['innerHeight'], 0)

  def test_resize_window(self):
    """The browser window can be resized through the driver.
    """
    self.driver.set_window_size(800, 900)
    size = json.loads(
        self.driver.execute_script(
            '''
      return JSON.stringify({
        'outerWidth': window.outerWidth,
        'outerHeight': window.outerHeight
        })'''))
    self.assertEqual(800, size['outerWidth'])
    self.assertEqual(900, size['outerHeight'])

  def test_multiple_clients(self):
    """Several clients can use a browser session at the same time.

    Each worker process opens its own session, loads the test page and
    reports whether the page title was the expected one.
    """
    # multiprocessing.Queue.get raises this exception on timeout.
    from queue import Empty

    parameter_dict = self.computer_partition.getConnectionParameterDict()
    webdriver_url = parameter_dict['backend-url']

    result_queue = multiprocessing.Queue()

    def _test(q, server_url):
      driver = webdriver.Remote(
          command_executor=webdriver_url,
          desired_capabilities=self.desired_capabilities)
      try:
        driver.get(server_url)
        q.put(driver.title == 'Test page')
      finally:
        driver.quit()

    nb_workers = 10
    workers = []
    for _ in range(nb_workers):
      worker = multiprocessing.Process(
          target=_test, args=(result_queue, self.server_url))
      worker.start()
      workers.append(worker)

    for worker in workers:
      worker.join(timeout=30)

    # terminate workers if they are still alive after 30 seconds
    for worker in workers:
      if worker.is_alive():
        worker.terminate()
    for worker in workers:
      worker.join()

    # Every worker has exited at this point. A worker which crashed or was
    # terminated never reported a result, so do not wait forever for it: stop
    # collecting and let the assertion below show what is missing.
    result_list = []
    for _ in range(nb_workers):
      try:
        result_list.append(result_queue.get(timeout=5))
      except Empty:
        break
    self.assertEqual([True] * nb_workers, result_list)


class TestBrowserSelection(WebServerMixin, SeleniumServerTestCase):
  """Test browser can be selected by `desiredCapabilities`
  """
  @contextlib.contextmanager
  def _openBrowser(self, desired_capabilities):
    """Context manager yielding a driver for a new session on the backend.

    The session is requested with `desired_capabilities` and is always
    terminated when leaving the context, even if an assertion failed, so that
    a failing test does not leave a browser running on the server.
    """
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    driver = webdriver.Remote(
        command_executor=parameter_dict['backend-url'],
        desired_capabilities=desired_capabilities)
    try:
      yield driver
    finally:
      driver.quit()

  def test_chrome(self):
    """Requesting chrome capabilities starts Chrome, not Firefox.
    """
    with self._openBrowser(
        webdriver.ChromeOptions().to_capabilities()) as driver:
      driver.get(self.server_url)
      self.assertEqual('Test page', driver.title)
      user_agent = driver.execute_script('return navigator.userAgent')

    self.assertIn('Chrome', user_agent)
    self.assertNotIn('Firefox', user_agent)

  def test_firefox(self):
    """Requesting firefox capabilities starts Firefox.
    """
    with self._openBrowser(
        webdriver.FirefoxOptions().to_capabilities()) as driver:
      driver.get(self.server_url)
      self.assertEqual('Test page', driver.title)
      user_agent = driver.execute_script('return navigator.userAgent')

    self.assertIn('Firefox', user_agent)

  def test_firefox_desired_version(self):
    """The `version` capability selects which Firefox is started.
    """
    desired_capabilities = webdriver.FirefoxOptions().to_capabilities()
    # One session per version, each one closed before the next is opened.
    for version, user_agent in (
        ('102.15.1esr', 'Gecko/20100101 Firefox/102.0'),
        ('115.3.1esr', 'Gecko/20100101 Firefox/115.0'),
    ):
      desired_capabilities['version'] = version
      with self._openBrowser(desired_capabilities) as driver:
        self.assertIn(
            user_agent, driver.execute_script('return navigator.userAgent'))


class TestFrontend(WebServerMixin, SeleniumServerTestCase):
  """Test hub's https frontend.
  """
  def _assertUnauthorizedWithoutCredentials(self, parsed):
    """Assert that the frontend rejects requests without credentials.

    `parsed` is the result of `urllib.parse.urlparse` on a published URL
    containing credentials. The same URL is requested with the user and
    password removed and the response must be "401 Unauthorized".
    """
    # Rebuilding the netloc from hostname and port drops the credentials.
    # The hostname is wrapped in brackets, which presumably means that the
    # published hostname is always an IPv6 address.
    anonymous_url = parsed._replace(
        netloc=f"[{parsed.hostname}]:{parsed.port}").geturl()
    self.assertEqual(
        requests.get(anonymous_url, verify=False).status_code,
        requests.codes.unauthorized)

  def test_admin(self):
    """The admin URL requires credentials and serves the grid console.
    """
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    admin_url = parameter_dict['admin-url']

    parsed = urllib.parse.urlparse(admin_url)
    self.assertEqual('admin', parsed.username)
    self.assertTrue(parsed.password)
    self._assertUnauthorizedWithoutCredentials(parsed)

    self.assertIn('Grid Console', requests.get(admin_url, verify=False).text)

  def test_browser_use_hub(self):
    """The published URL requires credentials and can drive a browser.
    """
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    webdriver_url = parameter_dict['url']
    parsed = urllib.parse.urlparse(webdriver_url)
    self.assertEqual('selenium', parsed.username)
    self.assertTrue(parsed.password)
    self._assertUnauthorizedWithoutCredentials(parsed)

    # XXX we are using a self signed certificate, but selenium 3.141.0 does
    # not expose API to ignore certificate verification
    executor = RemoteConnection(webdriver_url, keep_alive=True)
    executor._conn = urllib3.PoolManager(cert_reqs='CERT_NONE', ca_certs=None)

    driver = webdriver.Remote(
        command_executor=executor,
        desired_capabilities=webdriver.ChromeOptions().to_capabilities())
    # Always terminate the session, even if an assertion fails, so that no
    # browser is left running on the server.
    try:
      driver.get(self.server_url)
      self.assertEqual('Test page', driver.title)
    finally:
      driver.quit()


class TestSSHServer(SeleniumServerTestCase):
  """Test the ssh server published by the instance.
  """
  @classmethod
  def getInstanceParameterDict(cls):
    """Request the instance with a freshly generated key as authorized key.

    The key is kept as `cls.ssh_key`, to be used by the tests to connect.
    """
    cls.ssh_key = paramiko.ECDSAKey.generate(bits=384)
    return {
        'ssh-authorized-key':
        f'ecdsa-sha2-nistp384 {cls.ssh_key.get_base64()}'
    }

  def test_connect(self):
    """Connect using the authorized key and check fingerprint and banner.
    """
    parameter_dict = self.computer_partition.getConnectionParameterDict()
    ssh_url = parameter_dict['ssh-url']
    parsed = urllib.parse.urlparse(ssh_url)
    self.assertEqual('ssh', parsed.scheme)

    client = paramiko.SSHClient()

    class TestKeyPolicy(paramiko.MissingHostKeyPolicy):
      """Accept server key and keep it in self.key for inspection
      """
      def missing_host_key(self, client, hostname, key):
        self.key = key

    key_policy = TestKeyPolicy()
    client.set_missing_host_key_policy(key_policy)

    with contextlib.closing(client):
      client.connect(
          username=parsed.username,
          hostname=parsed.hostname,
          port=parsed.port,
          pkey=self.ssh_key,
      )

      # Check fingerprint from server matches the published one.
      # The publish format is the raw output of ssh-keygen and is something like this:
      #   521 SHA256:9aZruv3LmFizzueIFdkd78eGtzghDoPSCBXFkkrHqXE user@hostname (ECDSA)
      # we only want to parse SHA256:9aZruv3LmFizzueIFdkd78eGtzghDoPSCBXFkkrHqXE
      _, fingerprint_string, _, key_type = parameter_dict[
          'ssh-fingerprint'].split()
      self.assertEqual(key_type, '(ECDSA)')

      fingerprint_algorithm, fingerprint = fingerprint_string.split(':', 1)
      self.assertEqual(fingerprint_algorithm, 'SHA256')
      # Paramiko does not allow to get the fingerprint as SHA256 easily yet
      # https://github.com/paramiko/paramiko/pull/1103
      server_key_digest = hashlib.new(
          fingerprint_algorithm, key_policy.key.asbytes()).digest()
      self.assertEqual(
          fingerprint.encode(),
          # the published fingerprint is base64 without the `=` padding
          base64.b64encode(server_key_digest).rstrip(b'='))

      # Read what the server sends when opening a shell, until the welcome
      # message has been received or the server closes the channel.
      channel = client.invoke_shell()
      channel.settimeout(30)
      received = b''
      while True:
        r = channel.recv(1024)
        if not r:
          break
        received += r
        if b'Selenium Server.' in received:
          break
      self.assertIn(b"Welcome to SlapOS Selenium Server.", received)


class TestFirefox102(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
  """Run the browser compatibility tests with Firefox 102.15.1esr.
  """
  # Firefox default capabilities, with the exact version to use.
  desired_capabilities = {
      **webdriver.FirefoxOptions().to_capabilities(),
      'version': '102.15.1esr',
  }
  user_agent = 'Gecko/20100101 Firefox/102.0'
  # test_resize_window is inherited as is: an override that only calls the
  # parent implementation is not needed.


class TestFirefox115(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
  """Run the browser compatibility tests with Firefox 115.3.1esr.
  """
  # Firefox default capabilities, with the exact version to use.
  desired_capabilities = {
      **webdriver.FirefoxOptions().to_capabilities(),
      'version': '115.3.1esr',
  }
  user_agent = 'Gecko/20100101 Firefox/115.0'
  # test_resize_window is inherited as is: an override that only calls the
  # parent implementation is not needed.


class TestChrome91(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
  """Run the browser compatibility tests with Chrome 91.0.4472.114.
  """
  # Chrome default capabilities, with the exact version to use.
  desired_capabilities = {
      **webdriver.ChromeOptions().to_capabilities(),
      'version': '91.0.4472.114',
  }
  user_agent = 'Chrome/91.0.4472.0'


class TestChrome120(
    BrowserCompatibilityMixin,
    SeleniumServerTestCase,
    ImageComparisonTestCase,
):
  """Run the browser compatibility tests with Chrome 120.0.6099.109.
  """
  # Chrome default capabilities, with the exact version to use.
  desired_capabilities = {
      **webdriver.ChromeOptions().to_capabilities(),
      'version': '120.0.6099.109',
  }
  user_agent = 'Chrome/120.0.0.0'