Commit f0e1af12 authored by Hardik Juneja

CMFActivity: Remove remaining transaction.commit calls and move the BTree to ActiveProcess

parent 2a3e619a
...@@ -32,6 +32,7 @@ from Products.CMFCore import permissions as CMFCorePermissions
from Products.ERP5Type.Base import Base
from Products.ERP5Type import PropertySheet
from Products.ERP5Type.ConflictFree import ConflictFreeLog
from BTrees.LOBTree import LOBTree
from BTrees.Length import Length
from random import randrange
from .ActiveResult import ActiveResult
...@@ -85,6 +86,15 @@ class ActiveProcess(Base):
  def __init__(self, *args, **kw):
    Base.__init__(self, *args, **kw)
    self.result_list = ConflictFreeLog()
    self.use_btree = False

  security.declareProtected(CMFCorePermissions.ManagePortal, 'useBTree')
  def useBTree(self):
    # Use a BTree instead of the linked list; this is used by the joblib
    # backend to store results in a mapping keyed by signature.
    self.use_btree = True
    self.result_list = LOBTree()

  security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult')
  def postResult(self, result):
...@@ -92,8 +102,19 @@ class ActiveProcess(Base):
      result_list = self.result_list
    except AttributeError:
      # BBB: self was created before implementation of __init__
      if self.use_btree:
        self.result_list = result_list = LOBTree()
      else:
        self.result_list = result_list = ConflictFreeLog()
    else:
      if self.use_btree:
        if not hasattr(result, 'sig'):
          result_id = randrange(0, 10000 * (id(result) + 1))
        else:
          result_id = result.sig
        result_list.insert(result_id, result)
        return
      if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree
        # use a random id in order to store result in a way with
        # fewer conflict errors
...@@ -103,7 +124,12 @@ class ActiveProcess(Base):
        result_list[random_id] = result
        self.result_len.change(1)
        return
    if self.use_btree:
      signature = int(result.sig, 16)
      result_list.insert(signature, result)
    else:
      result_list.append(result)
  security.declareProtected(CMFCorePermissions.ManagePortal, 'postActiveResult')
  def postActiveResult(self, *args, **kw):
...@@ -124,6 +150,18 @@ class ActiveProcess(Base):
      return result_list.values()
    return list(result_list)

  security.declareProtected(CMFCorePermissions.ManagePortal, 'getResult')
  def getResult(self, key, **kw):
    """
    Returns the result stored under the requested key, else None.
    """
    try:
      result_list = self.result_list
      result = result_list[key]
    except:
      return None
    return result

  security.declareProtected(CMFCorePermissions.ManagePortal, 'activateResult')
  def activateResult(self, result):
    if result not in (None, 0, '', (), []):
......
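The BTree mode introduced above stores each result under an integer key derived from a hex signature. Below is a minimal standalone sketch of that storage scheme, not part of the commit, assuming only the BTrees package; Result is a hypothetical stand-in for the stored result objects, and the modulo fold mirrors the sigint computation used by the joblib backend later in this commit.

# Minimal sketch of the signature-keyed result store (assumes the BTrees
# package is installed; "Result" is a hypothetical stand-in).
import hashlib
from BTrees.LOBTree import LOBTree

class Result(object):
  def __init__(self, sig, value):
    self.sig = sig        # hex digest string
    self.value = value

store = LOBTree()         # 64-bit integer keys -> objects

def post(result):
  # mirrors postResult() in BTree mode: key by the integer form of the
  # signature, folded so it fits LOBTree's 64-bit integer keys
  store.insert(int(result.sig, 16) % (10 ** 16), result)

def get(key):
  # mirrors getResult(): the stored result, or None when the key is unknown
  return store.get(key)

sig = hashlib.md5(b'some batch of work').hexdigest()
post(Result(sig, 42))
print(get(int(sig, 16) % (10 ** 16)).value)   # -> 42
print(get(123456))                            # -> None

Folding with % (10 ** 16) keeps the key inside the signed 64-bit range that LOBTree accepts, which is why the joblib backend applies the same fold before posting a result.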
...@@ -139,7 +139,6 @@ class SQLBase(Queue):
    serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
                              for m in message_list]
    processing_node_list = []
    for m in message_list:
      m.order_validation_text = x = self.getOrderValidationText(m)
      processing_node_list.append(0 if x == 'none' else -1)
......
...@@ -26,24 +26,11 @@
#
##############################################################################
# XXX: Note from Rafael
# only reimplement the minimum, and only customize the SQL that updates this table.
# Always check if things are there (i.e.: if the connection or the script are present).
import copy
import hashlib
import sys
import transaction
from functools import total_ordering
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from zExceptions import ExceptionFormatter
from ZODB.POSException import ConflictError
from SQLBase import SQLBase, sort_message_key
from Products.CMFActivity.ActivityTool import Message
from Products.CMFActivity.ActivityTool import (
  Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
from Products.CMFActivity.ActivityRuntimeEnvironment import (
  DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH

# Stop validating more messages when this limit is reached
...@@ -55,46 +42,6 @@ _DequeueMessageException = Exception()
from SQLDict import SQLDict

# this is adapted from
# http://stackoverflow.com/questions/5884066/hashing-a-python-dictionary/8714242#8714242
def make_hash(o):
  """
  Makes a hash from a dictionary, list, tuple or set to any level, that contains
  only other hashable types (including any lists, tuples, sets, and
  dictionaries).
  """
  if isinstance(o, (set, tuple, list)):
    return hash(tuple([make_hash(e) for e in o]))
  elif not isinstance(o, dict):
    try:
      return hash(o)
    except TypeError:
      return hash(int(hashlib.md5(o).hexdigest(), 16))
  new_o = copy.deepcopy(o)
  for k, v in new_o.items():
    new_o[k] = make_hash(v)
  return hash(tuple(frozenset(sorted(new_o.items()))))

@total_ordering
class MyBatchedSignature(object):
  """Create a hashable signature"""
  def __init__(self, batch):
    #LOG('CMFActivity', INFO, batch.items)
    items = batch.items[0]
    self.func = items[0].__name__
    self.args = items[1]
    self.kwargs = items[2]
  def __eq__(self, other):
    return (self.func, self.args) == (other.func, other.args)
  def __lt__(self, other):
    return (self.func, self.args) < (other.func, other.args)

class SQLJoblib(SQLDict):
  """
  XXX SQLJoblib
...@@ -124,7 +71,7 @@ class SQLJoblib(SQLDict):
  def register(self, activity_buffer, activity_tool, message):
    """
    Send messages to mysql directly
    """
    assert not message.is_registered, message
    message.is_registered = True
...@@ -137,7 +84,6 @@ class SQLJoblib(SQLDict):
      if m.is_registered:
        uid = portal.portal_ids.generateNewIdList(self.uid_group,
          id_count=1, id_generator='uid')[0]
        #import pdb; pdb.set_trace()
        m.order_validation_text = x = self.getOrderValidationText(m)
        processing_node = (0 if x == 'none' else -1)
        portal.SQLJoblib_writeMessage(
...@@ -151,82 +97,122 @@ class SQLJoblib(SQLDict):
          group_method_id=m.getGroupId(),
          date=m.activity_kw.get('at_date'),
          tag=m.activity_kw.get('tag', ''),
          signature=m.activity_kw.get('signature', ''),
          processing_node=processing_node,
          serialization_tag=m.activity_kw.get('serialization_tag', ''))

  # Queue semantic
  def dequeueMessage(self, activity_tool, processing_node):
    message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
      self.getProcessableMessageList(activity_tool, processing_node)
    if message_list:
      # Remove group_id parameter from group_method_id
      if group_method_id is not None:
        group_method_id = group_method_id.split('\0')[0]
      if group_method_id not in (None, ""):
        method = activity_tool.invokeGroup
        args = (group_method_id, message_list, self.__class__.__name__,
                hasattr(self, 'generateMessageUID'))
        activity_runtime_environment = ActivityRuntimeEnvironment(None)
      else:
        method = activity_tool.invoke
        message = message_list[0]
        args = (message, )
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
      # Commit right before executing messages.
      # As MySQL transaction does not start exactly at the same time as ZODB
      # transactions but a bit later, messages available might be called
      # on objects which are not available - or available in an old
      # version - to ZODB connector.
      # So all connectors must be committed now that we have selected
      # everything needed from MySQL to get a fresh view of ZODB objects.
      transaction.commit()
      transaction.begin()
      tv = getTransactionalVariable()
      tv['activity_runtime_environment'] = activity_runtime_environment
      # Try to invoke
      try:
        method(*args)
        # Abort if at least 1 message failed. On next tic, only those that
        # succeeded will be selected because their at_date won't have been
        # increased.
        for m in message_list:
          if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
            raise _DequeueMessageException
        transaction.commit()
        for m in message_list:
          if m.getExecutionState() == MESSAGE_EXECUTED:
            transaction.begin()
            # Create a signature and then store result into the dict
            signature = MyBatchedSignature(m.args[0].batch)
            # get active process
            active_process = activity_tool.unrestrictedTraverse(m.active_process)
            active_process.process_result_map.update({signature: m.result})
            transaction.commit()
      except:
        exc_info = sys.exc_info()
        if exc_info[1] is not _DequeueMessageException:
          self._log(WARNING,
            'Exception raised when invoking messages (uid, path, method_id) %r'
            % [(m.uid, m.object_path, m.method_id) for m in message_list])
          for m in message_list:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        self._abort()
        exc_info = message_list[0].exc_info
        if exc_info:
          try:
            # Register it again.
            tv['activity_runtime_environment'] = activity_runtime_environment
            cancel = message.on_error_callback(*exc_info)
            del exc_info, message.exc_info
            transaction.commit()
            if cancel:
              message.setExecutionState(MESSAGE_EXECUTED)
          except:
            self._log(WARNING, 'Exception raised when processing error callbacks')
            message.setExecutionState(MESSAGE_NOT_EXECUTED)
            self._abort()
      self.finalizeMessageExecution(activity_tool, message_list,
        uid_to_duplicate_uid_list_dict)
    transaction.commit()
    return not message_list
\ No newline at end of file

  def getProcessableMessageLoader(self, activity_tool, processing_node):
    path_and_method_id_dict = {}
    def load(line):
      # getProcessableMessageList already fetches messages with the same
      # group_method_id, so what remains to be filtered on are path, method_id
      # and signature
      path = line.path
      method_id = line.method_id
      key = path, method_id
      uid = line.uid
      signature = line.signature
      original_uid = path_and_method_id_dict.get(key)
      if original_uid is None:
        m = Message.load(line.message, uid=uid, line=line, signature=signature)
        try:
          result = activity_tool.SQLJoblib_selectDuplicatedLineList(
            path=path,
            method_id=method_id,
            group_method_id=line.group_method_id,
            signature=signature)
          reserve_uid_list = uid_list = [x.uid for x in result]
          if reserve_uid_list:
            activity_tool.SQLJoblib_reserveDuplicatedLineList(
              processing_node=processing_node, uid=reserve_uid_list)
        except:
          self._log(WARNING, 'getDuplicateMessageUidList got an exception')
          raise
        if uid_list:
          self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
        path_and_method_id_dict[key] = uid
        return m, uid, uid_list
      # We know that original_uid != uid because caller skips lines we returned
      # earlier.
      return None, original_uid, [uid]
    return load

  def generateMessageUID(self, m):
    return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
            m.activity_kw.get('tag'), m.activity_kw.get('group_id'))

  def distribute(self, activity_tool, node_count):
    offset = 0
    assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
    if assignMessage is not None:
      now_date = self.getNow(activity_tool)
      validated_count = 0
      while 1:
        result = self._getMessageList(activity_tool, processing_node=-1,
                                      to_date=now_date,
                                      offset=offset, count=READ_MESSAGE_LIMIT)
        if not result:
          return
        transaction.commit()

        validation_text_dict = {'none': 1}
        message_dict = {}
        for line in result:
          message = Message.load(line.message, uid=line.uid, line=line)
          if not hasattr(message, 'order_validation_text'): # BBB
            message.order_validation_text = self.getOrderValidationText(message)
          self.getExecutableMessageList(activity_tool, message, message_dict,
                                        validation_text_dict, now_date=now_date)

        if message_dict:
          message_unique_dict = {}
          serialization_tag_dict = {}
          distributable_uid_set = set()
          deletable_uid_list = []

          # remove duplicates
          # SQLDict considers object_path, method_id, tag to unify activities,
          # but ignores method arguments. They are outside of semantics.
          for message in message_dict.itervalues():
            message_unique_dict.setdefault(self.generateMessageUID(message),
                                           []).append(message)

          for message_list in message_unique_dict.itervalues():
            if len(message_list) > 1:
              # Sort list of duplicates to keep the message with highest score
              message_list.sort(key=sort_message_key)
              deletable_uid_list += [m.uid for m in message_list[1:]]
            message = message_list[0]
            serialization_tag = message.activity_kw.get('serialization_tag')
            if serialization_tag is None:
              distributable_uid_set.add(message.uid)
            else:
              serialization_tag_dict.setdefault(serialization_tag,
                                                []).append(message)
          # Don't let through if there is the same serialization tag in the
          # message dict. If there is the same serialization tag, only one can
          # be validated and others must wait.
          # But messages with group_method_id are exceptions. serialization_tag
          # does not stop validating together. Because those messages should
          # be processed together at once.
          for message_list in serialization_tag_dict.itervalues():
            # Sort list of messages to validate the message with highest score
            message_list.sort(key=sort_message_key)
            distributable_uid_set.add(message_list[0].uid)
            group_method_id = message_list[0].line.group_method_id
            if group_method_id == '\0':
              continue
            for message in message_list[1:]:
              if group_method_id == message.line.group_method_id:
                distributable_uid_set.add(message.uid)
          if deletable_uid_list:
            activity_tool.SQLBase_delMessage(table=self.sql_table,
                                             uid=deletable_uid_list)

          distributable_count = len(distributable_uid_set)
          if distributable_count:
            assignMessage(table=self.sql_table,
              processing_node=0, uid=tuple(distributable_uid_set))
            validated_count += distributable_count
            if validated_count >= MAX_VALIDATED_LIMIT:
              return
        offset += READ_MESSAGE_LIMIT
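generateMessageUID() above adds the signature to a message's identity, and distribute() then groups messages by that key, keeps the best-scored one, and marks the rest for deletion. A small self-contained sketch of that grouping, not part of the commit, with illustrative field names rather than the real Message attributes:

# Sketch of the duplicate-message grouping done by distribute(); Msg and its
# fields are illustrative only.
from collections import namedtuple

Msg = namedtuple('Msg', 'uid object_path method_id signature tag group_id priority')

def message_uid(m):
  # same shape as SQLJoblib.generateMessageUID: messages agreeing on all of
  # these fields are considered duplicates of one another
  return (tuple(m.object_path), m.method_id, m.signature, m.tag, m.group_id)

def split_duplicates(messages):
  groups = {}
  for m in messages:
    groups.setdefault(message_uid(m), []).append(m)
  keep, drop = [], []
  for dup_list in groups.values():
    # keep one message per key (here the lowest priority value stands in for
    # the sort_message_key scoring), drop the uids of the rest
    dup_list.sort(key=lambda m: m.priority)
    keep.append(dup_list[0])
    drop.extend(m.uid for m in dup_list[1:])
  return keep, drop

msgs = [
  Msg(1, ('person_module', '1'), 'doWork', 'abc', '', None, 1),
  Msg(2, ('person_module', '1'), 'doWork', 'abc', '', None, 2),  # duplicate of uid 1
  Msg(3, ('person_module', '1'), 'doWork', 'def', '', None, 1),  # different signature
]
keep, drop = split_duplicates(msgs)
print([m.uid for m in keep], drop)  # -> [1, 3] [2]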
...@@ -26,16 +26,17 @@
##############################################################################
ENABLE_JOBLIB = True
import copy
import hashlib
import sys
import time
import transaction
from BTrees.OOBTree import OOBTree
from zLOG import LOG, INFO, WARNING
from ZODB.POSException import ConflictError
try:
  from sklearn.externals.joblib import register_parallel_backend
  from sklearn.externals.joblib.hashing import hash as joblib_hash
  from sklearn.externals.joblib.parallel import ParallelBackendBase, parallel_backend
  from sklearn.externals.joblib.parallel import FallbackToBackend, SequentialBackend
  from sklearn.externals.joblib._parallel_backends import SafeFunction
...@@ -45,13 +46,11 @@ except ImportError:
  LOG("CMFActivityBackend", WARNING, "CLASS NOT LOADED!!!")
  ENABLE_JOBLIB = False
from Activity.SQLJoblib import MyBatchedSignature

if ENABLE_JOBLIB:
  class MySafeFunction(SafeFunction):
    """Wrapper around a SafeFunction that catches any exception

    The exception can be handled in CMFActivityResult.get
    """
    def __init__(self, *args, **kwargs):
      super(MySafeFunction, self).__init__(*args, **kwargs)
...@@ -67,23 +66,12 @@ if ENABLE_JOBLIB:
      self.active_process = active_process
      self.active_process_sig = active_process_sig
      self.callback = callback

    def get(self, timeout=None):
      '''
      while not self.active_process.getResultList():
        time.sleep(1)
        if timeout is not None:
          timeout -= 1
          if timeout < 0:
            raise RuntimeError('Timeout reached')
        transaction.commit()
      '''
      if self.active_process.process_result_map[self.active_process_sig] is None:
      if self.active_process.getResult(self.active_process_sig) is None:
        raise ConflictError
      result = self.active_process.process_result_map[self.active_process_sig]
      result = self.active_process.getResult(self.active_process_sig).result
      # TODO raise before or after the callback?
      if isinstance(result, Exception):
        raise result
      if self.callback is not None:
...@@ -94,10 +82,7 @@ if ENABLE_JOBLIB:
    def __init__(self, *args, **kwargs):
      self.count = 1
      self.active_process = kwargs['active_process']
      if not hasattr(self.active_process, 'process_result_map'):
        self.active_process.process_result_map = OOBTree()
        transaction.commit()

    def effective_n_jobs(self, n_jobs):
      """Dummy implementation to prevent n_jobs <=0
...@@ -113,14 +98,16 @@ if ENABLE_JOBLIB:
      active_process_id = self.active_process.getId()
      joblib_result = None
      sig = MyBatchedSignature(batch)
      if not self.active_process.process_result_map.has_key(sig):
        self.active_process.process_result_map.insert(sig, None)
      # create a signature and convert it to an integer
      sig = joblib_hash(batch.items[0])
      sigint = int(sig, 16) % (10 ** 16)
      if not self.active_process.getResult(sigint):
        joblib_result = portal_activities.activate(activity='SQLJoblib',
          tag="joblib_%s" % active_process_id,
          active_process=self.active_process).Base_callSafeFunction(MySafeFunction(batch))
          signature=sig,
          active_process=self.active_process).Base_callSafeFunction(sigint, MySafeFunction(batch))
      if joblib_result is None:
        joblib_result = CMFActivityResult(self.active_process, sig, callback)
        joblib_result = CMFActivityResult(self.active_process, sigint, callback)
      return joblib_result
    def configure(self, n_jobs=1, parallel=None, **backend_args):
...@@ -138,11 +125,6 @@ if ENABLE_JOBLIB:
    def abort_everything(self, ensure_ready=True):
      # All jobs will be aborted here while they are still processing our backend
      # remove job with no results
      #self.active_process.process_result_map = dict((k, v)
      # for k, v in self.active_process.process_result_map.iteritems() if v)
      transaction.commit()
      if ensure_ready:
        self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel,
                       **self.parallel._backend_args)
......
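apply_async() above derives its activity key from joblib's object hasher rather than from the removed MyBatchedSignature class. A standalone sketch of that sig/sigint computation, not part of the commit, assuming a plain joblib install (the diff itself imports the same hasher via sklearn.externals.joblib):

# Sketch of the sig/sigint computation, assuming joblib is installed.
from joblib.hashing import hash as joblib_hash

def make_signature(batch_item):
  sig = joblib_hash(batch_item)        # stable hex digest of the batch item
  sigint = int(sig, 16) % (10 ** 16)   # fold into a LOBTree-friendly integer key
  return sig, sigint

def square(x):
  return x * x

# a joblib batch item is a (callable, args, kwargs) triple
sig, sigint = make_signature((square, (3,), {}))
print(sig, sigint)

The % (10 ** 16) fold keeps the key small enough for the LOBTree lookups done by ActiveProcess.getResult.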
...@@ -177,7 +177,6 @@ class Message(BaseMessage):
    self.method_id = method_id
    self.args = args
    self.kw = kw
    self.result = None
    if getattr(portal_activities, 'activity_creation_trace', False):
      # Save current traceback, to make it possible to tell where a message
      # was generated.
...@@ -316,12 +315,12 @@ class Message(BaseMessage):
          result = method(*self.args, **self.kw)
        finally:
          setSecurityManager(old_security_manager)

        if method is not None:
          if self.active_process and result is not None:
            self.activateResult(
              activity_tool.unrestrictedTraverse(self.active_process),
              result, obj)
          self.result = result
          self.setExecutionState(MESSAGE_EXECUTED)
      except:
        self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
...@@ -504,7 +503,6 @@ class Method(object):
      request=self._request,
      portal_activities=portal_activities,
    )
    if portal_activities.activity_tracking:
      activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self._activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
    portal_activities.getActivityBuffer().deferredQueueMessage(
...@@ -1065,6 +1063,7 @@ class ActivityTool (Folder, UniqueObject):
    processing_node starts from 1 (there is no node 0)
    """
    global active_threads

    # return if the number of threads is too high
    # else, increase the number of active_threads and continue
    tic_lock.acquire()
......
...@@ -20,6 +20,7 @@ CREATE TABLE <dtml-var table> (
  `priority` TINYINT NOT NULL DEFAULT 0,
  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
  `tag` VARCHAR(255) NOT NULL,
  `signature` VARCHAR(255) NOT NULL,
  `serialization_tag` VARCHAR(255) NOT NULL,
  `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
  `message` LONGBLOB NOT NULL,
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
processing_node
uid
</params>
UPDATE
message_job
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
signature
</params>
SELECT uid FROM
message_job
WHERE
processing_node = 0
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
AND signature = <dtml-sqlvar signature type="string">
FOR UPDATE
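Together, the two SQL methods above implement a select-then-reserve pattern: find already-validated lines that share path, method_id, group_method_id and signature, then claim them for one processing node. A rough standalone rendering of that flow on an in-memory sqlite3 table, not part of the commit; sqlite has no SELECT ... FOR UPDATE, and the real message_job table has more columns:

# Sketch of the select-then-reserve flow on sqlite3 (standard library only);
# table layout and node numbers are illustrative.
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""CREATE TABLE message_job (
  uid INTEGER PRIMARY KEY, path TEXT, method_id TEXT,
  group_method_id TEXT, signature TEXT, processing_node INTEGER)""")
conn.executemany("INSERT INTO message_job VALUES (?, ?, ?, ?, ?, ?)", [
  (1, '/p', 'doWork', 'g', 'abc', 0),
  (2, '/p', 'doWork', 'g', 'abc', 0),  # duplicate of uid 1
  (3, '/p', 'doWork', 'g', 'def', 0),
])

# like SQLJoblib_selectDuplicatedLineList: validated lines (processing_node = 0)
# sharing path, method_id, group_method_id and signature
dup_uids = [u for (u,) in conn.execute(
  "SELECT uid FROM message_job WHERE processing_node = 0 AND path = ? "
  "AND method_id = ? AND group_method_id = ? AND signature = ?",
  ('/p', 'doWork', 'g', 'abc'))]

# like SQLJoblib_reserveDuplicatedLineList: claim those lines for one node
conn.executemany("UPDATE message_job SET processing_node = ? WHERE uid = ?",
                 [(7, uid) for uid in dup_uids])
conn.commit()
print(dup_uids)  # -> [1, 2]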
...@@ -18,10 +18,11 @@ processing_node
date
group_method_id
tag
signature
serialization_tag
</params>
INSERT INTO <dtml-var table>
  (uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
  (uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, signature, serialization_tag, message)
VALUES
(
  <dtml-sqlvar expr="uid" type="int">,
...@@ -34,6 +35,7 @@ VALUES
  <dtml-sqlvar expr="priority" type="int">,
  <dtml-sqlvar expr="group_method_id" type="string">,
  <dtml-sqlvar expr="tag" type="string">,
  <dtml-sqlvar expr="signature" type="string">,
  <dtml-sqlvar expr="serialization_tag" type="string">,
  <dtml-sqlvar expr="message" type="string">
)