Commit 8cccb168 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Move queue methods from ZSQLCatalog to SQLCatalog, and do some horrible optimizations.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@3129 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 4850034d
...@@ -52,6 +52,11 @@ except ImportError: ...@@ -52,6 +52,11 @@ except ImportError:
UID_BUFFER_SIZE = 900 UID_BUFFER_SIZE = 900
MAX_UID_BUFFER_SIZE = 20000 MAX_UID_BUFFER_SIZE = 20000
MAX_QUEUE_SIZE = 100
# Put the queue of catalogged objects in RAM for distributed computation.
catalogged_path_dict = {}
catalogged_path_dict_lock = threading.Lock()
valid_method_meta_type_list = ('Z SQL Method', 'Script (Python)') valid_method_meta_type_list = ('Z SQL Method', 'Script (Python)')
...@@ -1026,6 +1031,49 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -1026,6 +1031,49 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
# LOG("SQLCatalog Warning: could not catalog object with method %s" % method_name, # LOG("SQLCatalog Warning: could not catalog object with method %s" % method_name,
# 100,str(path)) # 100,str(path))
security.declarePrivate('queueCataloggedObject')
def queueCataloggedObject(self, object, **kw):
"""
Add an object into the queue for catalogging the object later in a batch form.
"""
catalogged_path_dict_lock.acquire()
try:
catalogged_path_dict[object.getPath()] = None
size = len(catalogged_path_dict)
finally:
catalogged_path_dict_lock.release()
# It is better to flush the queued objects if they are too many...
if size > MAX_QUEUE_SIZE:
self.flushQueuedObjectList()
security.declarePublic('flushQueuedObjectList')
def flushQueuedObjectList(self, **kw):
"""
Flush queued objects.
"""
catalogged_path_dict_lock.acquire()
try:
path_list = catalogged_path_dict.keys()
catalogged_path_dict.clear()
finally:
catalogged_path_dict_lock.release()
# Stupid optimizations.
object_list = []
append = object_list.append
wrapObject = self.aq_parent.wrapObject
resolve_path = self.resolve_path
id = self.getId()
for path in path_list:
object = resolve_path(path)
if object is not None:
object = wrapObject(object, sql_catalog_id=id)
append(object)
#LOG('flushQueuedObjectList, object_list',0,[x.getPhysicalPath() for x in object_list])
if len(object_list) > 0:
self.catalogObjectList(object_list)
# XXX It is better to merge this with catalogObject. Too much code duplication. # XXX It is better to merge this with catalogObject. Too much code duplication.
def catalogObjectList(self, object_list): def catalogObjectList(self, object_list):
""" """
...@@ -1045,12 +1093,12 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -1045,12 +1093,12 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
else: else:
zope_root = self.getPhysicalRoot() zope_root = self.getPhysicalRoot()
root_indexable = int(getattr(zope_root,'isIndexable',1)) root_indexable = int(getattr(zope_root, 'isIndexable', 1))
if not root_indexable: if not root_indexable:
return return
for object in object_list: for object in object_list:
if getattr(aq_base(object), 'uid',None) is None: if getattr(aq_base(object), 'uid', None) is None:
try: try:
object._setUid(self.newUid()) object._setUid(self.newUid())
except: except:
...@@ -1091,6 +1139,7 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -1091,6 +1139,7 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
arguments = method.arguments_src arguments = method.arguments_src
for arg in split(arguments): for arg in split(arguments):
value_list = [] value_list = []
append = value_list.append
for object in catalogged_object_list: for object in catalogged_object_list:
#LOG('catalog_object_list: object.uid',0,getattr(object,'uid',None)) #LOG('catalog_object_list: object.uid',0,getattr(object,'uid',None))
#LOG('catalog_object_list: object.path',0,object.getPhysicalPath()) #LOG('catalog_object_list: object.path',0,object.getPhysicalPath())
...@@ -1098,10 +1147,10 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -1098,10 +1147,10 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
value = getattr(object, arg) value = getattr(object, arg)
if callable(value): if callable(value):
value = value() value = value()
value_list.append(value) append(value)
except: except:
#LOG("SQLCatalog Warning: Callable value could not be called",0,str((path, arg, method_name))) #LOG("SQLCatalog Warning: Callable value could not be called",0,str((path, arg, method_name)))
value_list.append(None) append(None)
kw[arg] = value_list kw[arg] = value_list
method = aq_base(method).__of__(self) # Use method in the context of portal_catalog method = aq_base(method).__of__(self) # Use method in the context of portal_catalog
......
...@@ -32,10 +32,6 @@ import string, os, sys, types, threading ...@@ -32,10 +32,6 @@ import string, os, sys, types, threading
from zLOG import LOG from zLOG import LOG
# Put the queue of catalogged objects in RAM for distributed computation.
catalogged_path_list = []
catalogged_path_list_lock = threading.Lock()
manage_addZSQLCatalogForm=DTMLFile('dtml/addZSQLCatalog',globals()) manage_addZSQLCatalogForm=DTMLFile('dtml/addZSQLCatalog',globals())
def manage_addZSQLCatalog(self, id, title, def manage_addZSQLCatalog(self, id, title,
...@@ -639,40 +635,18 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -639,40 +635,18 @@ class ZCatalog(Folder, Persistent, Implicit):
""" """
Add an object into the queue for catalogging the object later in a batch form. Add an object into the queue for catalogging the object later in a batch form.
""" """
catalogged_path_list_lock.acquire() catalog = self.getSQLCatalog(sql_catalog_id)
try: if catalog is not None:
catalogged_path_list.append(object.getPath()) catalog.queueCataloggedObject(object, **kw)
size = len(catalogged_path_list)
finally:
catalogged_path_list_lock.release()
# It is better to flush the queued objects if they are too many...
if size > 100:
self.flushQueuedObjectList(sql_catalog_id=sql_catalog_id)
security.declarePublic('flushQueuedObjectList') security.declarePublic('flushQueuedObjectList')
def flushQueuedObjectList(self, sql_catalog_id=None, **kw): def flushQueuedObjectList(self, sql_catalog_id=None, **kw):
""" """
Flush queued objects. Flush queued objects.
""" """
catalogged_path_list_lock.acquire()
try:
global catalogged_path_list
path_list = catalogged_path_list
catalogged_path_list = []
finally:
catalogged_path_list_lock.release()
object_list = []
for path in path_list:
object = self.resolve_path(path)
if object is not None:
object = self.wrapObject(object, sql_catalog_id=sql_catalog_id)
object_list.append(object)
#LOG('flushQueuedObjectList, object_list',0,[x.getPhysicalPath() for x in object_list])
if len(object_list) > 0:
catalog = self.getSQLCatalog(sql_catalog_id) catalog = self.getSQLCatalog(sql_catalog_id)
catalog.catalogObjectList(object_list) if catalog is not None:
catalog.flushQueuedObjectList(object, **kw)
def uncatalog_object(self, uid, sql_catalog_id=None): def uncatalog_object(self, uid, sql_catalog_id=None):
""" wrapper around catalog """ """ wrapper around catalog """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment