Commit 8476d305 authored by Olivier Bertrand's avatar Olivier Bertrand

-- Add mutex for user_connect handling

  modified:   storage/connect/ha_connect.cc
  modified:   storage/connect/user_connect.cc
  modified:   storage/connect/plgdbutl.cpp
  modified:   storage/connect/user_connect.cc

-- Trace work storage allocation and freeing in DEVELOPMENT mode
  modified:   storage/connect/ha_connect.cc
  modified:   storage/connect/jsonudf.cpp
  modified:   storage/connect/plugutil.cpp
parent df091237
...@@ -197,11 +197,14 @@ extern "C" { ...@@ -197,11 +197,14 @@ extern "C" {
char *ClassPath; char *ClassPath;
#endif // JDBC_SUPPORT #endif // JDBC_SUPPORT
#if defined(__WIN__) //#if defined(__WIN__)
CRITICAL_SECTION parsec; // Used calling the Flex parser //CRITICAL_SECTION parsec; // Used calling the Flex parser
#else // !__WIN__ //#else // !__WIN__
pthread_mutex_t parmut = PTHREAD_MUTEX_INITIALIZER; //pthread_mutex_t parmut = PTHREAD_MUTEX_INITIALIZER;
#endif // !__WIN__ //#endif // !__WIN__
pthread_mutex_t parmut;
pthread_mutex_t usrmut;
pthread_mutex_t tblmut;
/***********************************************************************/ /***********************************************************************/
/* Utility functions. */ /* Utility functions. */
...@@ -679,10 +682,13 @@ static int connect_init_func(void *p) ...@@ -679,10 +682,13 @@ static int connect_init_func(void *p)
#if defined(__WIN__) #if defined(__WIN__)
sql_print_information("CONNECT: %s", compver); sql_print_information("CONNECT: %s", compver);
InitializeCriticalSection((LPCRITICAL_SECTION)&parsec); //InitializeCriticalSection((LPCRITICAL_SECTION)&parsec);
#else // !__WIN__ #else // !__WIN__
sql_print_information("CONNECT: %s", version); sql_print_information("CONNECT: %s", version);
#endif // !__WIN__ #endif // !__WIN__
pthread_mutex_init(&parmut, NULL);
pthread_mutex_init(&usrmut, NULL);
pthread_mutex_init(&tblmut, NULL);
#if defined(LIBXML2_SUPPORT) #if defined(LIBXML2_SUPPORT)
XmlInitParserLib(); XmlInitParserLib();
...@@ -738,12 +744,15 @@ static int connect_done_func(void *) ...@@ -738,12 +744,15 @@ static int connect_done_func(void *)
JAVAConn::ResetJVM(); JAVAConn::ResetJVM();
#endif // JDBC_SUPPORT #endif // JDBC_SUPPORT
pthread_mutex_destroy(&parmut);
pthread_mutex_destroy(&tblmut);
#if defined(__WIN__) #if defined(__WIN__)
DeleteCriticalSection((LPCRITICAL_SECTION)&parsec); //DeleteCriticalSection((LPCRITICAL_SECTION)&parsec);
#else // !__WIN__ #else // !__WIN__
PROFILE_End(); PROFILE_End();
#endif // !__WIN__ #endif // !__WIN__
pthread_mutex_lock(&usrmut);
for (pc= user_connect::to_users; pc; pc= pn) { for (pc= user_connect::to_users; pc; pc= pn) {
if (pc->g) if (pc->g)
PlugCleanup(pc->g, true); PlugCleanup(pc->g, true);
...@@ -752,6 +761,9 @@ static int connect_done_func(void *) ...@@ -752,6 +761,9 @@ static int connect_done_func(void *)
delete pc; delete pc;
} // endfor pc } // endfor pc
pthread_mutex_unlock(&usrmut);
pthread_mutex_destroy(&usrmut);
connect_hton= NULL; connect_hton= NULL;
DBUG_RETURN(error); DBUG_RETURN(error);
} // end of connect_done_func } // end of connect_done_func
...@@ -864,6 +876,7 @@ ha_connect::~ha_connect(void) ...@@ -864,6 +876,7 @@ ha_connect::~ha_connect(void)
static void PopUser(PCONNECT xp) static void PopUser(PCONNECT xp)
{ {
if (xp) { if (xp) {
pthread_mutex_lock(&usrmut);
xp->count--; xp->count--;
if (!xp->count) { if (!xp->count) {
...@@ -888,6 +901,7 @@ static void PopUser(PCONNECT xp) ...@@ -888,6 +901,7 @@ static void PopUser(PCONNECT xp)
delete xp; delete xp;
} // endif count } // endif count
pthread_mutex_unlock(&usrmut);
} // endif xp } // endif xp
} // end of PopUser } // end of PopUser
...@@ -904,20 +918,29 @@ static PCONNECT GetUser(THD *thd, PCONNECT xp) ...@@ -904,20 +918,29 @@ static PCONNECT GetUser(THD *thd, PCONNECT xp)
if (xp && thd == xp->thdp) if (xp && thd == xp->thdp)
return xp; return xp;
pthread_mutex_lock(&usrmut);
for (xp= user_connect::to_users; xp; xp= xp->next) for (xp= user_connect::to_users; xp; xp= xp->next)
if (thd == xp->thdp) if (thd == xp->thdp)
break; break;
if (xp)
xp->count++;
pthread_mutex_unlock(&usrmut);
if (!xp) { if (!xp) {
xp= new user_connect(thd); xp = new user_connect(thd);
if (xp->user_init()) { if (xp->user_init()) {
delete xp; delete xp;
xp= NULL; xp = NULL;
} // endif user_init } // endif user_init
} else } // endif xp
xp->count++;
//} else
// xp->count++;
return xp; return xp;
} // end of GetUser } // end of GetUser
...@@ -4373,7 +4396,11 @@ bool ha_connect::IsSameIndex(PIXDEF xp1, PIXDEF xp2) ...@@ -4373,7 +4396,11 @@ bool ha_connect::IsSameIndex(PIXDEF xp1, PIXDEF xp2)
MODE ha_connect::CheckMode(PGLOBAL g, THD *thd, MODE ha_connect::CheckMode(PGLOBAL g, THD *thd,
MODE newmode, bool *chk, bool *cras) MODE newmode, bool *chk, bool *cras)
{ {
#if defined(DEVELOPMENT)
if (true) {
#else
if (trace) { if (trace) {
#endif
LEX_STRING *query_string= thd_query_string(thd); LEX_STRING *query_string= thd_query_string(thd);
htrc("%p check_mode: cmdtype=%d\n", this, thd_sql_command(thd)); htrc("%p check_mode: cmdtype=%d\n", this, thd_sql_command(thd));
htrc("Cmd=%.*s\n", (int) query_string->length, query_string->str); htrc("Cmd=%.*s\n", (int) query_string->length, query_string->str);
...@@ -5349,6 +5376,10 @@ static int connect_assisted_discovery(handlerton *, THD* thd, ...@@ -5349,6 +5376,10 @@ static int connect_assisted_discovery(handlerton *, THD* thd,
PCOLRES crp; PCOLRES crp;
PCONNECT xp= NULL; PCONNECT xp= NULL;
PGLOBAL g= GetPlug(thd, xp); PGLOBAL g= GetPlug(thd, xp);
if (!g)
return HA_ERR_INTERNAL_ERROR;
PDBUSER dup= PlgGetUser(g); PDBUSER dup= PlgGetUser(g);
PCATLG cat= (dup) ? dup->Catalog : NULL; PCATLG cat= (dup) ? dup->Catalog : NULL;
PTOS topt= table_s->option_struct; PTOS topt= table_s->option_struct;
...@@ -5356,10 +5387,6 @@ static int connect_assisted_discovery(handlerton *, THD* thd, ...@@ -5356,10 +5387,6 @@ static int connect_assisted_discovery(handlerton *, THD* thd,
String sql(buf, sizeof(buf), system_charset_info); String sql(buf, sizeof(buf), system_charset_info);
sql.copy(STRING_WITH_LEN("CREATE TABLE whatever ("), system_charset_info); sql.copy(STRING_WITH_LEN("CREATE TABLE whatever ("), system_charset_info);
if (!g)
return HA_ERR_INTERNAL_ERROR;
user= host= pwd= tbl= src= col= ocl= pic= fcl= skc= rnk= zfn= dsn= NULL; user= host= pwd= tbl= src= col= ocl= pic= fcl= skc= rnk= zfn= dsn= NULL;
// Get the useful create options // Get the useful create options
......
...@@ -1478,8 +1478,10 @@ static my_bool CheckMemory(PGLOBAL g, UDF_INIT *initid, UDF_ARGS *args, uint n, ...@@ -1478,8 +1478,10 @@ static my_bool CheckMemory(PGLOBAL g, UDF_INIT *initid, UDF_ARGS *args, uint n,
ml += g->More; ml += g->More;
if (ml > g->Sarea_Size) { if (ml > g->Sarea_Size) {
#if !defined(DEVELOPMENT)
if (trace) if (trace)
htrc("Freeing Sarea size=%d\n", g->Sarea_Size); #endif
htrc("Freeing Sarea at %p size=%d\n", g->Sarea, g->Sarea_Size);
free(g->Sarea); free(g->Sarea);
......
...@@ -82,11 +82,11 @@ extern "C" { ...@@ -82,11 +82,11 @@ extern "C" {
extern char version[]; extern char version[];
} // extern "C" } // extern "C"
#if defined(__WIN__) //#if defined(__WIN__)
extern CRITICAL_SECTION parsec; // Used calling the Flex parser //extern CRITICAL_SECTION parsec; // Used calling the Flex parser
#else // !__WIN__ //#else // !__WIN__
extern pthread_mutex_t parmut; extern pthread_mutex_t parmut;
#endif // !__WIN__ //#endif // !__WIN__
// The debug trace used by the main thread // The debug trace used by the main thread
FILE *pfile = NULL; FILE *pfile = NULL;
...@@ -697,21 +697,17 @@ PDTP MakeDateFormat(PGLOBAL g, PCSZ dfmt, bool in, bool out, int flag) ...@@ -697,21 +697,17 @@ PDTP MakeDateFormat(PGLOBAL g, PCSZ dfmt, bool in, bool out, int flag)
/* Call the FLEX generated parser. In multi-threading mode the next */ /* Call the FLEX generated parser. In multi-threading mode the next */
/* instruction is included in an Enter/LeaveCriticalSection bracket. */ /* instruction is included in an Enter/LeaveCriticalSection bracket. */
/*********************************************************************/ /*********************************************************************/
//#if defined(THREAD) //#if defined(__WIN__)
#if defined(__WIN__) // EnterCriticalSection((LPCRITICAL_SECTION)&parsec);
EnterCriticalSection((LPCRITICAL_SECTION)&parsec); //#else // !__WIN__
#else // !__WIN__
pthread_mutex_lock(&parmut); pthread_mutex_lock(&parmut);
#endif // !__WIN__ //#endif // !__WIN__
//#endif // THREAD
rc = fmdflex(pdp); rc = fmdflex(pdp);
//#if defined(THREAD) //#if defined(__WIN__)
#if defined(__WIN__) // LeaveCriticalSection((LPCRITICAL_SECTION)&parsec);
LeaveCriticalSection((LPCRITICAL_SECTION)&parsec); //#else // !__WIN__
#else // !__WIN__
pthread_mutex_unlock(&parmut); pthread_mutex_unlock(&parmut);
#endif // !__WIN__ //#endif // !__WIN__
//#endif // THREAD
if (trace) if (trace)
htrc("Done: in=%s out=%s rc=%d\n", SVP(pdp->InFmt), SVP(pdp->OutFmt), rc); htrc("Done: in=%s out=%s rc=%d\n", SVP(pdp->InFmt), SVP(pdp->OutFmt), rc);
......
...@@ -184,8 +184,10 @@ int PlugExit(PGLOBAL g) ...@@ -184,8 +184,10 @@ int PlugExit(PGLOBAL g)
free(dup); free(dup);
if (g->Sarea) { if (g->Sarea) {
if (trace) #if !defined(DEVELOPMENT)
htrc("Freeing Sarea size=%d\n", g->Sarea_Size); if (trace) {
#endif
htrc("Freeing Sarea at %p size=%d\n", g->Sarea, g->Sarea_Size);
free(g->Sarea); free(g->Sarea);
} // endif Sarea } // endif Sarea
...@@ -467,7 +469,11 @@ void *PlugAllocMem(PGLOBAL g, uint size) ...@@ -467,7 +469,11 @@ void *PlugAllocMem(PGLOBAL g, uint size)
if (!(areap = malloc(size))) if (!(areap = malloc(size)))
sprintf(g->Message, MSG(MALLOC_ERROR), "malloc"); sprintf(g->Message, MSG(MALLOC_ERROR), "malloc");
#if defined(DEVELOPMENT)
if (true) {
#else
if (trace) { if (trace) {
#endif
if (areap) if (areap)
htrc("Memory of %u allocated at %p\n", size, areap); htrc("Memory of %u allocated at %p\n", size, areap);
else else
......
...@@ -47,6 +47,8 @@ ...@@ -47,6 +47,8 @@
#include "user_connect.h" #include "user_connect.h"
#include "mycat.h" #include "mycat.h"
extern pthread_mutex_t usrmut;
/****************************************************************************/ /****************************************************************************/
/* Initialize the user_connect static member. */ /* Initialize the user_connect static member. */
/****************************************************************************/ /****************************************************************************/
...@@ -125,14 +127,18 @@ bool user_connect::user_init() ...@@ -125,14 +127,18 @@ bool user_connect::user_init()
strcpy(ap->Ap_Name, "CONNECT"); strcpy(ap->Ap_Name, "CONNECT");
g->Activityp= ap; g->Activityp= ap;
g->Activityp->Aptr= dup; g->Activityp->Aptr= dup;
pthread_mutex_lock(&usrmut);
next= to_users; next= to_users;
to_users= this; to_users= this;
if (next) if (next)
next->previous= this; next->previous= this;
count = 1;
pthread_mutex_unlock(&usrmut);
last_query_id= thdp->query_id; last_query_id= thdp->query_id;
count= 1;
return false; return false;
} // end of user_init } // end of user_init
...@@ -156,8 +162,11 @@ bool user_connect::CheckCleanup(bool force) ...@@ -156,8 +162,11 @@ bool user_connect::CheckCleanup(bool force)
if (g->Sarea_Size != worksize) { if (g->Sarea_Size != worksize) {
if (g->Sarea) { if (g->Sarea) {
if (trace) #if !defined(DEVELOPMENT)
htrc("CheckCleanup: Free Sarea %d\n", g->Sarea_Size); if (trace) {
#endif
htrc("CheckCleanup: Free Sarea at %p size=%d\n",
g->Sarea, g->Sarea_Size);
free(g->Sarea); free(g->Sarea);
} // endif Size } // endif Size
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment