Commit bbe294b1 authored by Vincent Pelletier's avatar Vincent Pelletier

Add connection pooling at db level.

Remove volatile-attribute-related code, it has nothing to do on such class.
Rename the flag used to know wether _abort or _finish has to be called.
Initialise that flag during _begin.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@13666 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 7b0c0e60
...@@ -132,7 +132,7 @@ class Connection(DABase.Connection): ...@@ -132,7 +132,7 @@ class Connection(DABase.Connection):
self._v_database_connection = connection self._v_database_connection = connection
else: else:
if connection is not None: if connection is not None:
connection.close() connection.closeConnection()
DB = self.factory() DB = self.factory()
database_connection_pool[pool_key] = DB(s) database_connection_pool[pool_key] = DB(s)
self._v_database_connection = database_connection_pool[pool_key] self._v_database_connection = database_connection_pool[pool_key]
......
...@@ -106,6 +106,7 @@ from zLOG import LOG, ERROR, INFO ...@@ -106,6 +106,7 @@ from zLOG import LOG, ERROR, INFO
import string, sys import string, sys
from string import strip, split, find, upper, rfind from string import strip, split, find, upper, rfind
from time import time from time import time
from thread import get_ident
hosed_connection = ( hosed_connection = (
CR.SERVER_GONE_ERROR, CR.SERVER_GONE_ERROR,
...@@ -155,7 +156,7 @@ def int_or_long(s): ...@@ -155,7 +156,7 @@ def int_or_long(s):
try: return int(s) try: return int(s)
except: return long(s) except: return long(s)
FINISH_OR_ABORT_CALLED_ID = '_v_finish_or_abort_called' FINISH_OR_ABORT_CALLED_ID = '_finish_or_abort_called'
class DB(TM): class DB(TM):
...@@ -185,9 +186,10 @@ class DB(TM): ...@@ -185,9 +186,10 @@ class DB(TM):
def __init__(self,connection): def __init__(self,connection):
self.connection=connection self.connection=connection
self.kwargs = kwargs = self._parse_connection_string(connection) self.kwargs = self._parse_connection_string(connection)
self.db=apply(self.Database_Connection, (), kwargs) self.db = {}
transactional = self.db.server_capabilities & CLIENT.TRANSACTIONS db = self.getConnection()
transactional = db.server_capabilities & CLIENT.TRANSACTIONS
if self._try_transactions == '-': if self._try_transactions == '-':
transactional = 0 transactional = 0
elif not transactional and self._try_transactions == '+': elif not transactional and self._try_transactions == '+':
...@@ -196,10 +198,24 @@ class DB(TM): ...@@ -196,10 +198,24 @@ class DB(TM):
if self._mysql_lock: if self._mysql_lock:
self._use_TM = 1 self._use_TM = 1
def close(self): def forceReconnection(self):
if self.db is not None: db = apply(self.Database_Connection, (), self.kwargs)
self.db.close() self.db[get_ident()] = db
self.db = None return db
def getConnection(self):
ident = get_ident()
db = self.db.get(ident)
if db is None:
db = self.forceReconnection()
return db
def closeConnection(self):
ident = get_ident()
db = self.db.get(ident)
if db is not None:
db.close()
del self.db[ident]
def _parse_connection_string(self, connection): def _parse_connection_string(self, connection):
kwargs = {'conv': self.conv} kwargs = {'conv': self.conv}
...@@ -242,11 +258,11 @@ class DB(TM): ...@@ -242,11 +258,11 @@ class DB(TM):
_care=('TABLE', 'VIEW')): _care=('TABLE', 'VIEW')):
r=[] r=[]
a=r.append a=r.append
if 1: db = self.getConnection()
self.db.query("SHOW TABLES") db.query("SHOW TABLES")
result = self.db.store_result() result = db.store_result()
row = result.fetch_row(1) row = result.fetch_row(1)
while row: while row:
a({'TABLE_NAME': row[0][0], 'TABLE_TYPE': 'TABLE'}) a({'TABLE_NAME': row[0][0], 'TABLE_TYPE': 'TABLE'})
row = result.fetch_row(1) row = result.fetch_row(1)
return r return r
...@@ -254,10 +270,9 @@ class DB(TM): ...@@ -254,10 +270,9 @@ class DB(TM):
def columns(self, table_name): def columns(self, table_name):
from string import join from string import join
try: try:
# Field, Type, Null, Key, Default, Extra db = self.getConnection()
if 1: db.query('SHOW COLUMNS FROM %s' % table_name)
self.db.query('SHOW COLUMNS FROM %s' % table_name) c = db.store_result()
c=self.db.store_result()
except: except:
return () return ()
r=[] r=[]
...@@ -301,7 +316,7 @@ class DB(TM): ...@@ -301,7 +316,7 @@ class DB(TM):
self._use_TM and self._register() self._use_TM and self._register()
desc=None desc=None
result=() result=()
db=self.db db = self.getConnection()
try: try:
if 1: if 1:
for qs in filter(None, map(strip,split(query_string, '\0'))): for qs in filter(None, map(strip,split(query_string, '\0'))):
...@@ -321,11 +336,10 @@ class DB(TM): ...@@ -321,11 +336,10 @@ class DB(TM):
result=c.fetch_row(max_rows) result=c.fetch_row(max_rows)
else: else:
desc=None desc=None
except OperationalError, m: except OperationalError, m:
if m[0] not in hosed_connection: raise if m[0] not in hosed_connection: raise
# Hm. maybe the db is hosed. Let's restart it. # Hm. maybe the db is hosed. Let's restart it.
db=self.db=apply(self.Database_Connection, (), self.kwargs) self.forceReconnection()
return self.query(query_string, max_rows) return self.query(query_string, max_rows)
if desc is None: return (),() if desc is None: return (),()
...@@ -342,50 +356,54 @@ class DB(TM): ...@@ -342,50 +356,54 @@ class DB(TM):
func(item) func(item)
return items, result return items, result
def string_literal(self, s): return self.db.string_literal(s) def string_literal(self, s):
return self.getConnection().string_literal(s)
def _begin(self, *ignored): def _begin(self, *ignored):
try: try:
db = self.getConnection()
if self._transactions: if self._transactions:
self.db.query("BEGIN") db.query("BEGIN")
self.db.store_result() db.store_result()
if self._mysql_lock: if self._mysql_lock:
self.db.query("SELECT GET_LOCK('%s',0)" % self._mysql_lock) db.query("SELECT GET_LOCK('%s',0)" % self._mysql_lock)
self.db.store_result() db.store_result()
except: except:
LOG('ZMySQLDA', ERROR, "exception during _begin", LOG('ZMySQLDA', ERROR, "exception during _begin",
error=sys.exc_info()) error=sys.exc_info())
raise raise
setattr(self, FINISH_OR_ABORT_CALLED_ID, False)
def _finish(self, *ignored): def _finish(self, *ignored):
if getattr(self, FINISH_OR_ABORT_CALLED_ID, False): if getattr(self, FINISH_OR_ABORT_CALLED_ID, False):
return return
try: try:
try: try:
db = self.getConnection()
if self._mysql_lock: if self._mysql_lock:
self.db.query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock) db.query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
self.db.store_result() db.store_result()
if self._transactions: if self._transactions:
self.db.query("COMMIT") db.query("COMMIT")
self.db.store_result() db.store_result()
except: except:
LOG('ZMySQLDA', ERROR, "exception during _finish", LOG('ZMySQLDA', ERROR, "exception during _finish",
error=sys.exc_info()) error=sys.exc_info())
raise raise
finally: finally:
self._v_database_connection = None
setattr(self, FINISH_OR_ABORT_CALLED_ID, True) setattr(self, FINISH_OR_ABORT_CALLED_ID, True)
def _abort(self, *ignored): def _abort(self, *ignored):
if getattr(self, FINISH_OR_ABORT_CALLED_ID, False): if getattr(self, FINISH_OR_ABORT_CALLED_ID, False):
return return
try: try:
db = self.getConnection()
if self._mysql_lock: if self._mysql_lock:
self.db.query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock) db.query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
self.db.store_result() db.store_result()
if self._transactions: if self._transactions:
self.db.query("ROLLBACK") db.query("ROLLBACK")
self.db.store_result() db.store_result()
else: else:
LOG('ZMySQLDA', ERROR, "aborting when non-transactional") LOG('ZMySQLDA', ERROR, "aborting when non-transactional")
finally: finally:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment