Commit 0016855f authored by Jérome Perrin's avatar Jérome Perrin

ERP5Catalog: add support for multiples databases in upgradeSchema

This is needed to support datawarehouse on another database.
parent 0c2c42f0
......@@ -42,6 +42,7 @@ from Acquisition import aq_base, aq_inner, aq_parent, ImplicitAcquisitionWrapper
from Products.CMFActivity.ActiveObject import ActiveObject
from Products.CMFActivity.ActivityTool import GroupedMessage
from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
from Products.ZMySQLDA.DA import DeferredConnection
from AccessControl.PermissionRole import rolesForPermissionOn
......@@ -1360,20 +1361,45 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
security.declareProtected(Permissions.ManagePortal, 'upgradeSchema')
def upgradeSchema(self, sql_catalog_id=None, src__=0):
"""Upgrade all catalog tables, with ALTER or CREATE queries"""
portal = self.getPortalObject()
catalog = self.getSQLCatalog(sql_catalog_id)
connection_id = catalog.z_create_catalog.connection_id
src = []
db = self.getPortalObject()[connection_id]()
# group methods by connection
method_list_by_connection_id = defaultdict(list)
for method_id in catalog.sql_clear_catalog:
method = catalog[method_id]
method_list_by_connection_id[method.connection_id].append(method)
# Because we cannot select on deferred connections, _upgradeSchema
# cannot be used on SQL methods using a deferred connection.
# We try to find a "non deferred" connection using the same connection
# string and we'll use it instead.
connection_by_connection_id = {}
for connection_id in method_list_by_connection_id:
connection = portal[connection_id]
connection_string = connection.connection_string
connection_by_connection_id[connection_id] = connection
if isinstance(connection, DeferredConnection):
for other_connection in portal.objectValues(
spec=('Z MySQL Database Connection',)):
if connection_string == other_connection.connection_string:
connection_by_connection_id[connection_id] = other_connection
break
queries_by_connection_id = defaultdict(list)
for connection_id, method_list in method_list_by_connection_id.items():
connection = connection_by_connection_id[connection_id]
db = connection()
with db.lock():
for clear_method in catalog.sql_clear_catalog:
r = catalog[clear_method]._upgradeSchema(
connection_id, create_if_not_exists=1, src__=1)
if r:
src.append(r)
for method in method_list:
query = method._upgradeSchema(connection.getId(), create_if_not_exists=1, src__=1)
if query:
queries_by_connection_id[connection_id].append(query)
if not src__:
for r in src:
db.query(r)
return src
for query in queries_by_connection_id[connection_id]:
db.query(query)
return sum(queries_by_connection_id.values(), [])
security.declarePublic('getDocumentValueList')
def getDocumentValueList(self, sql_catalog_id=None,
......
......@@ -34,6 +34,7 @@ import httplib
from AccessControl import getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager
from DateTime import DateTime
from _mysql_exceptions import ProgrammingError
from OFS.ObjectManager import ObjectManager
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList
......@@ -3842,8 +3843,9 @@ class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase):
db1, db2 = getExtraSqlConnectionStringList()[:2]
addConnection = self.portal.manage_addProduct[
"ZMySQLDA"].manage_addZMySQLConnection
addConnection("erp5_test_connection_1", '', db1)
addConnection("erp5_test_connection_2", '', db2)
addConnection("erp5_test_connection_1", "", db1)
addConnection("erp5_test_connection_2", "", db2)
addConnection("erp5_test_connection_deferred_2", "", db2, deferred=True)
self.catalog_tool = self.portal.portal_catalog
self.catalog = self.catalog_tool.newContent(portal_type="Catalog")
......@@ -3862,8 +3864,10 @@ class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase):
self.query_connection_1("DROP TABLE IF EXISTS `%s`" % table)
for table in self._db2_table_list:
self.query_connection_2("DROP TABLE IF EXISTS `%s`" % table)
self.portal.manage_delObjects(
["erp5_test_connection_1", "erp5_test_connection_2"])
self.portal.manage_delObjects([
"erp5_test_connection_1",
"erp5_test_connection_2",
"erp5_test_connection_deferred_2"])
self.commit()
def query_connection_1(self, q):
......@@ -3911,3 +3915,50 @@ class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase):
self.upgradeSchema()
self.commit()
self.query_connection_1("SELECT b from altered_table")
def test_upgradeSchema_multi_connections(self):
# Check that we can upgrade tables on more than one connection,
# like when using an external datawarehouse. This is a reproduction
# for https://nexedi.erp5.net/bug_module/20170426-A3962E
# In this test we use both "normal" and deferred connections,
# which is what happens in default erp5 catalog.
self._db1_table_list.append("table1")
self.query_connection_1("CREATE TABLE table1 (a int)")
self._db2_table_list.extend(("table2", "table_deferred2"))
self.query_connection_2("CREATE TABLE table2 (a int)")
self.query_connection_2("CREATE TABLE table_deferred2 (a int)")
self.commit()
method1 = self.catalog.newContent(
portal_type="SQL Method",
connection_id="erp5_test_connection_1",
src="CREATE TABLE table1 (a int, b int)")
method2 = self.catalog.newContent(
portal_type="SQL Method",
connection_id="erp5_test_connection_2",
src="CREATE TABLE table2 (a int, b int)")
method_deferred2 = self.catalog.newContent(
portal_type="SQL Method",
connection_id="erp5_test_connection_deferred_2",
src="CREATE TABLE table_deferred2 (a int, b int)")
self.catalog.setSqlClearCatalogList(
[method1.getId(),
method2.getId(),
method_deferred2.getId()])
self.commit()
self.upgradeSchema()
self.commit()
self.query_connection_1("SELECT b from table1")
self.query_connection_2("SELECT b from table2")
self.query_connection_2("SELECT b from table_deferred2")
with self.assertRaisesRegexp(ProgrammingError,
r"Table '.*\.table2' doesn't exist"):
self.query_connection_1("SELECT b from table2")
with self.assertRaisesRegexp(ProgrammingError,
r"Table '.*\.table_deferred2' doesn't exist"):
self.query_connection_1("SELECT b from table_deferred2")
with self.assertRaisesRegexp(ProgrammingError,
r"Table '.*\.table1' doesn't exist"):
self.query_connection_2("SELECT b from table1")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment