Commit 8abf6986 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Fix and improve the hot reindexing.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@4862 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 0a086802
...@@ -272,36 +272,46 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -272,36 +272,46 @@ class ZCatalog(Folder, Persistent, Implicit):
Play back must be a distinct method to activate... Play back must be a distinct method to activate...
""" """
catalog = self.getSQLCatalog(sql_catalog_id) catalog = self.getSQLCatalog(sql_catalog_id)
# First, play back unindexing.
while 1: while 1:
result = catalog.readRecordedObjectList() result = catalog.readRecordedObjectList(catalog=0)
if len(result) == 0: if len(result) == 0:
break break
get_transaction().commit() get_transaction().commit()
path_dict = {}
for o in result: for o in result:
if o.path not in path_dict: try:
path_dict[o.path] = None obj = self.resolve_path(o.path)
object = self.resolve_path(o.path) except ConflictError:
if o.catalog: raise
if object is not None: except:
LOG('playBackRecordedObjectList, will reindexObject:',0,object.getPhysicalPath()) obj = None
object.reindexObject(sql_catalog_id=sql_catalog_id, activity='SQLQueue') if obj is None:
#self.activate(passive_commit=1, priority=5).reindexObject(object, sql_catalog_id=sql_catalog_id) self.uncatalog_object(o.path, sql_catalog_id=sql_catalog_id)
# XXX We can't call reindexObject has an activate message, because we can not
# do pickle with acquisition wrappers catalog.deleteRecordedObjectList(uid_list=[o.uid for o in result])
#self.reindexObject(object, sql_catalog_id=sql_catalog_id) # Ask YO, we must find something activated get_transaction().commit()
# Make sure that this object has no extra reference.
object = None # Next, play back indexing.
else: while 1:
if object is None: result = catalog.readRecordedObjectList(catalog=1)
self.uncatalog_object(o.path, sql_catalog_id=sql_catalog_id) if len(result) == 0:
else: break
# Make sure that this object has no extra reference. get_transaction().commit()
object = None
get_transaction().commit() for o in result:
try:
catalog.deleteRecordedObjectList(path_dict.keys()) obj = self.resolve_path(o.path)
except ConflictError:
raise
except:
obj = None
if obj is not None:
obj.reindexObject(sql_catalog_id=sql_catalog_id)
get_transaction().commit()
catalog.deleteRecordedObjectList(uid_list=[o.uid for o in result])
get_transaction().commit() get_transaction().commit()
def exchangeDatabases(self, source_sql_catalog_id, destination_sql_catalog_id, def exchangeDatabases(self, source_sql_catalog_id, destination_sql_catalog_id,
...@@ -383,13 +393,12 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -383,13 +393,12 @@ class ZCatalog(Folder, Persistent, Implicit):
# First of all, make sure that all root objects have uids. # First of all, make sure that all root objects have uids.
# XXX This is a workaround for tools (such as portal_simulation). # XXX This is a workaround for tools (such as portal_simulation).
for path in self.portal_skins.erp5_core.Catalog_getIndexablePathList(): portal = self.getPortalObject()
object = self.resolve_path(path) for id in portal.objectIds():
if object is None: continue obj = portal[id]
if getattr(object, 'uid', None) is None: if hasattr(aq_base(obj), 'getUid'):
object.getUid() obj.getUid()
object = None get_transaction().commit() # Should not have references to objects too long.
get_transaction().commit() # Should not have references to objects too long.
# Recreate the new database. # Recreate the new database.
LOG('hotReindexObjectList', 0, 'Clearing the new database') LOG('hotReindexObjectList', 0, 'Clearing the new database')
...@@ -401,39 +410,16 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -401,39 +410,16 @@ class ZCatalog(Folder, Persistent, Implicit):
self.setHotReindexingState('recording', self.setHotReindexingState('recording',
source_sql_catalog_id=source_sql_catalog_id, source_sql_catalog_id=source_sql_catalog_id,
destination_sql_catalog_id=destination_sql_catalog_id) destination_sql_catalog_id=destination_sql_catalog_id)
get_transaction().commit() # Should not have references to objects too long.
# Iterate all objects recursively. # Start reindexing.
# XXX Commit transactions very often and use resolve_path to get objects instead of objectValues self.ERPSite_reindexAll(sql_catalog_id=destination_sql_catalog_id)
# XXX This is not to be disturbed by normal user operations in the foreground. get_transaction().commit() # Should not have references to objects too long.
# XXX Otherwise, many read conflicts happen and hot reindexing restarts again and again.
#for path in self.Catalog_getIndexablePathList():
for path in self.portal_skins.erp5_core.Catalog_getIndexablePathList():
object = self.resolve_path(path)
if object is None: continue
id_list = object.objectIds()
#LOG('will hotReindex this object:',0,object.getPhysicalPath())
#LOG('will hotReindex this object uid:',0,getattr(object,'uid',None))
object.activate(passive_commit=1, priority=5, activity='SQLQueue').queueCataloggedObject(sql_catalog_id=destination_sql_catalog_id, activity='SQLQueue')
object = None
get_transaction().commit() # Should not have references to objects too long.
LOG('hotReindexObjectList', 0, 'Catalogging %s' % path)
for id in id_list:
subpath = '/'.join([path, id])
o = self.resolve_path(subpath)
if o is not None:
o.activate(passive_commit=1, priority=5, activity='SQLQueue').recursiveQueueCataloggedObject(sql_catalog_id=destination_sql_catalog_id, activity='SQLQueue')
o = None
get_transaction().commit() # Should not have references to objects too long.
# Make sure that everything is catalogged.
LOG('hotReindexObjectList', 0, 'Flushing the queue')
self.activate(broadcast=1, passive_commit=1, after_method_id=('recursiveQueueCataloggedObject', 'queueCataloggedObject', 'immediateQueueCataloggedObject'), priority=5, activity='SQLQueue').flushQueuedObjectList(sql_catalog_id=destination_sql_catalog_id)
# Now synchronize this new database with the current one. # Now synchronize this new database with the current one.
# XXX It is necessary to use ActiveObject to wait for queued objects to be flushed. # XXX It is necessary to use ActiveObject to wait for queued objects to be flushed.
LOG('hotReindexObjectList', 0, 'Starting double indexing') LOG('hotReindexObjectList', 0, 'Starting double indexing')
self.activate(passive_commit=1, after_method_id='flushQueuedObjectList', priority=5, activity='SQLQueue').setHotReindexingState('double indexing', self.activate(passive_commit=1, after_method_id=('reindexObject', 'recursiveReindexObject', priority=5).setHotReindexingState('double indexing',
source_sql_catalog_id=source_sql_catalog_id, source_sql_catalog_id=source_sql_catalog_id,
destination_sql_catalog_id=destination_sql_catalog_id) destination_sql_catalog_id=destination_sql_catalog_id)
...@@ -441,15 +427,15 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -441,15 +427,15 @@ class ZCatalog(Folder, Persistent, Implicit):
if source_sql_catalog_id != destination_sql_catalog_id: if source_sql_catalog_id != destination_sql_catalog_id:
# If equals, then this does not make sense to reindex again. # If equals, then this does not make sense to reindex again.
LOG('hotReindexObjectList', 0, 'Playing back records') LOG('hotReindexObjectList', 0, 'Playing back records')
self.activate(passive_commit=1, after_method_id='setHotReindexingState', priority=5, activity='SQLQueue').playBackRecordedObjectList(sql_catalog_id=destination_sql_catalog_id) self.activate(passive_commit=1, after_method_id='setHotReindexingState', priority=5).playBackRecordedObjectList(sql_catalog_id=destination_sql_catalog_id)
# Exchange the databases. # Exchange the databases.
LOG('hotReindexObjectList', 0, 'Exchanging databases') LOG('hotReindexObjectList', 0, 'Exchanging databases')
self.activate(after_method_id=('reindexObject', 'playBackRecordedObjectList', 'setHotReindexingState'), priority=5, activity='SQLQueue').exchangeDatabases(source_sql_catalog_id, destination_sql_catalog_id, skin_selection_dict, sql_connection_id_dict) # XXX Never called by activity tool, why ??? XXX self.activate(after_method_id=('reindexObject', 'playBackRecordedObjectList'), priority=5).exchangeDatabases(source_sql_catalog_id, destination_sql_catalog_id, skin_selection_dict, sql_connection_id_dict) # XXX Never called by activity tool, why ??? XXX
finally: finally:
# Finish. # Finish.
LOG('hotReindexObjectList', 0, 'Finishing hot reindexing') LOG('hotReindexObjectList', 0, 'Finishing hot reindexing')
self.activate(passive_commit=1, after_method_id='exchangeDatabases', priority=5, activity='SQLQueue').finishHotReindexing() self.activate(passive_commit=1, after_method_id='exchangeDatabases', priority=5).finishHotReindexing()
if RESPONSE is not None: if RESPONSE is not None:
URL1 = REQUEST.get('URL1') URL1 = REQUEST.get('URL1')
...@@ -610,10 +596,6 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -610,10 +596,6 @@ class ZCatalog(Folder, Persistent, Implicit):
def catalogObjectList(self, object_list, sql_catalog_id=None,**kw): def catalogObjectList(self, object_list, sql_catalog_id=None,**kw):
"""Catalog a list of objects. """Catalog a list of objects.
""" """
hot_reindexing = 0
if self.hot_reindexing_state is not None and self.source_sql_catalog_id == catalog.id:
hot_reindexing = 1
wrapped_object_list = [] wrapped_object_list = []
failed_object_list = [] failed_object_list = []
url_list = [] url_list = []
...@@ -643,13 +625,13 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -643,13 +625,13 @@ class ZCatalog(Folder, Persistent, Implicit):
if catalog is not None: if catalog is not None:
catalog.catalogObjectList(wrapped_object_list,**kw) catalog.catalogObjectList(wrapped_object_list,**kw)
if hot_reindexing: if self.hot_reindexing_state is not None and self.source_sql_catalog_id == catalog.id:
destination_catalog = self.getSQLCatalog(self.destination_sql_catalog_id) destination_catalog = self.getSQLCatalog(self.destination_sql_catalog_id)
if self.hot_reindexing_state == 'recording': if destination_catalog.id != catalog.id:
destination_catalog.recordCatalogObjectList(url_list) if self.hot_reindexing_state == 'recording':
else: destination_catalog.recordObjectList(url_list, 1)
destination_catalog.deleteRecordedObjectList(url_list) # Prevent this object from being replayed. else:
destination_catalog.catalogObjectList(wrapped_object_list,**kw) destination_catalog.catalogObjectList(wrapped_object_list,**kw)
object_list[:] = failed_object_list[:] object_list[:] = failed_object_list[:]
...@@ -660,13 +642,13 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -660,13 +642,13 @@ class ZCatalog(Folder, Persistent, Implicit):
if catalog is not None: if catalog is not None:
catalog.uncatalogObject(uid) catalog.uncatalogObject(uid)
if self.hot_reindexing_state is not None and self.source_sql_catalog_id == catalog.id: if self.hot_reindexing_state is not None and self.source_sql_catalog_id == catalog.id:
destination_catalog = self.getSQLCatalog(self.destination_sql_catalog_id) destination_catalog = self.getSQLCatalog(self.destination_sql_catalog_id)
if self.hot_reindexing_state == 'recording': if destination_catalog.id != catalog.id:
destination_catalog.recordUncatalogObject(uid) if self.hot_reindexing_state == 'recording':
else: destination_catalog.recordObjectList([uid], 0)
destination_catalog.deleteRecordedObjectList([uid]) # Prevent this object from being replayed. else:
destination_catalog.uncatalogObject(uid) destination_catalog.uncatalogObject(uid)
def catalogTranslationList(self, object_list, sql_catalog_id=None): def catalogTranslationList(self, object_list, sql_catalog_id=None):
"""Catalog translations. """Catalog translations.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment