diff --git a/storage/connect/tabcol.h b/storage/connect/tabcol.h index 20e23b802256b83793b69b6d92fc95b0e355924c..c5ebfdbb10d74e2addbd6aaa0e0eb2b9e22b214c 100644 --- a/storage/connect/tabcol.h +++ b/storage/connect/tabcol.h @@ -17,6 +17,7 @@ /***********************************************************************/ class DllExport XTAB: public BLOCK { // Table Name-Owner-Srcdef block. friend class TDBPRX; + friend class TDBTBM; public: // Constructors XTAB(LPCSTR name, LPCSTR srcdef = NULL); diff --git a/storage/connect/tabtbl.cpp b/storage/connect/tabtbl.cpp index 9207ad636cff8ddf73fad62a84419ffc0bfc2688..61f9c8628d9d526b109ed3c67521aed2b86e0587 100644 --- a/storage/connect/tabtbl.cpp +++ b/storage/connect/tabtbl.cpp @@ -1,7 +1,7 @@ /************* TabTbl C++ Program Source Code File (.CPP) **************/ /* PROGRAM NAME: TABTBL */ /* ------------- */ -/* Version 1.5 */ +/* Version 1.6 */ /* */ /* COPYRIGHT: */ /* ---------- */ @@ -77,6 +77,16 @@ #include "ha_connect.h" #include "mycat.h" // For GetHandler +#if defined(WIN32) +#if defined(__BORLANDC__) +#define SYSEXIT void _USERENTRY +#else +#define SYSEXIT void +#endif +#else // !WIN32 +#define SYSEXIT void * +#endif // !WIN32 + extern "C" int trace; /* ---------------------------- Class TBLDEF ---------------------------- */ @@ -87,6 +97,9 @@ extern "C" int trace; TBLDEF::TBLDEF(void) { //To_Tables = NULL; + Accept = false; + Thread = false; + Maxerr = 0; Ntables = 0; Pseudo = 3; } // end of TBLDEF constructor @@ -143,8 +156,9 @@ bool TBLDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff) } // endfor pdb Maxerr = Cat->GetIntCatInfo("Maxerr", 0); - Accept = (Cat->GetBoolCatInfo("Accept", 0) != 0); - } // endif fsec || tablist + Accept = Cat->GetBoolCatInfo("Accept", false); + Thread = Cat->GetBoolCatInfo("Thread", false); + } // endif tablist return FALSE; } // end of DefineAM @@ -156,6 +170,8 @@ PTDB TBLDEF::GetTable(PGLOBAL g, MODE m) { if (Catfunc == FNC_COL) return new(g) TDBTBC(this); + else if (Thread) + return new(g) TDBTBM(this); else return new(g) TDBTBL(this); @@ -173,7 +189,7 @@ TDBTBL::TDBTBL(PTBLDEF tdp) : TDBPRX(tdp) //Tdbp = NULL; Accept = tdp->Accept; Maxerr = tdp->Maxerr; - Nbf = 0; + Nbc = 0; Rows = 0; Crp = 0; // NTables = 0; @@ -227,7 +243,7 @@ bool TDBTBL::InitTableList(PGLOBAL g) // Get the table description block of this table if (!(Tdbp = GetSubTable(g, tabp))) { - if (++Nbf > Maxerr) + if (++Nbc > Maxerr) return TRUE; // Error return else continue; // Skip this table @@ -389,7 +405,7 @@ bool TDBTBL::OpenDB(PGLOBAL g) /*********************************************************************/ if (To_Filter && Tablist) { Tablist = NULL; - Nbf = 0; + Nbc = 0; } // endif To_Filter /*********************************************************************/ @@ -497,4 +513,262 @@ void TBTBLK::ReadColumn(PGLOBAL g) } // end of ReadColumn +/* ------------------------- Class TDBTBM ---------------------------- */ + +/***********************************************************************/ +/* Thread routine that check and open one remote connection. */ +/***********************************************************************/ +pthread_handler_t ThreadOpen(void *p) + { + PTBMT cmp = (PTBMT)p; + + if (!my_thread_init()) { + set_current_thd(cmp->Thd); + + // Try to open the connection + if (!cmp->Tap->GetTo_Tdb()->OpenDB(cmp->G)) { + cmp->Ready = true; + } else + cmp->Rc = RC_FX; + + my_thread_end(); + } else + cmp->Rc = RC_FX; + + return NULL; + } // end of ThreadOpen + +/***********************************************************************/ +/* TDBTBM constructors. */ +/***********************************************************************/ +TDBTBM::TDBTBM(PTBLDEF tdp) : TDBTBL(tdp) + { + Tmp = NULL; // To data table TBMT structures + Cmp = NULL; // Current data table TBMT + Bmp = NULL; // To bad (unconnected) TBMT structures + Done = false; // TRUE after first GetAllResults + Nrc = 0; // Number of remote connections + Nlc = 0; // Number of local connections + } // end of TDBTBL standard constructor + +/***********************************************************************/ +/* Reset read/write position values. */ +/***********************************************************************/ +void TDBTBM::ResetDB(void) + { + for (PCOL colp = Columns; colp; colp = colp->GetNext()) + if (colp->GetAmType() == TYPE_AM_TABID) + colp->COLBLK::Reset(); + + for (PTABLE tabp = Tablist; tabp; tabp = tabp->GetNext()) + ((PTDBASE)tabp->GetTo_Tdb())->ResetDB(); + + Tdbp = (PTDBASE)Tablist->GetTo_Tdb(); + Crp = 0; + } // end of ResetDB + +/***********************************************************************/ +/* Returns RowId if b is false or Rownum if b is true. */ +/***********************************************************************/ +int TDBTBM::RowNumber(PGLOBAL g, bool b) + { + return Tdbp->RowNumber(g) + ((b) ? 0 : Rows); + } // end of RowNumber + +/***********************************************************************/ +/* Initialyze table parallel processing. */ +/***********************************************************************/ +bool TDBTBM::OpenTables(PGLOBAL g) + { + int k; + THD *thd = current_thd; + PTABLE tabp, *ptabp = &Tablist; + PTBMT tp, *ptp = &Tmp; + + // Allocates the TBMT blocks for the tables + for (tabp = Tablist; tabp; tabp = tabp->Next) + if (tabp->GetTo_Tdb()->GetAmType() == TYPE_AM_MYSQL) { + // Remove remote table from the local list + *ptabp = tabp->Next; + + // Make the remote table block + tp = (PTBMT)PlugSubAlloc(g, NULL, sizeof(TBMT)); + memset(tp, 0, sizeof(TBMT)); + tp->G = g; + tp->Tap = tabp; + tp->Thd = thd; + + // Create the thread that will do the table opening. + pthread_attr_init(&tp->attr); +// pthread_attr_setdetachstate(&tp->attr, PTHREAD_CREATE_JOINABLE); + + if ((k = pthread_create(&tp->Tid, &tp->attr, ThreadOpen, tp))) { + sprintf(g->Message, "pthread_create error %d", k); + Nbc++; + continue; + } // endif k + + // Add it to the remote list + *ptp = tp; + ptp = &tp->Next; + Nrc++; // Number of remote connections + } else { + ptabp = &tabp->Next; + Nlc++; // Number of local connections + } // endif Type + + return false; + } // end of OpenTables + +/***********************************************************************/ +/* TBL Access Method opening routine. */ +/* Open first file, other will be opened sequencially when reading. */ +/***********************************************************************/ +bool TDBTBM::OpenDB(PGLOBAL g) + { + if (trace) + htrc("TBM OpenDB: tdbp=%p tdb=R%d use=%d key=%p mode=%d\n", + this, Tdb_No, Use, To_Key_Col, Mode); + + if (Use == USE_OPEN) { + /*******************************************************************/ + /* Table already open, replace it at its beginning. */ + /*******************************************************************/ + ResetDB(); + return Tdbp->OpenDB(g); // Re-open fist table + } // endif use + +#if 0 + /*********************************************************************/ + /* When GetMaxsize was called, To_Filter was not set yet. */ + /*********************************************************************/ + if (To_Filter && Tablist) { + Tablist = NULL; + Nbc = 0; + } // endif To_Filter +#endif // 0 + + /*********************************************************************/ + /* Make the table list. */ + /*********************************************************************/ + if (/*!Tablist &&*/ InitTableList(g)) + return TRUE; + + /*********************************************************************/ + /* Open all remote tables of the list. */ + /*********************************************************************/ + if (OpenTables(g)) + return TRUE; + + /*********************************************************************/ + /* Proceed with local tables. */ + /*********************************************************************/ + if ((CurTable = Tablist)) { + Tdbp = (PTDBASE)CurTable->GetTo_Tdb(); + Tdbp->SetMode(Mode); + + // Check and initialize the subtable columns + for (PCOL cp = Columns; cp; cp = cp->GetNext()) + if (cp->GetAmType() == TYPE_AM_TABID) + cp->COLBLK::Reset(); + else if (((PPRXCOL)cp)->Init(g) && !Accept) + return TRUE; + + if (trace) + htrc("Opening subtable %s\n", Tdbp->GetName()); + + // Now we can safely open the table + if (Tdbp->OpenDB(g)) + return TRUE; + + } // endif *Tablist + + Use = USE_OPEN; + return FALSE; + } // end of OpenDB + +/***********************************************************************/ +/* ReadDB: Data Base read routine for MUL access method. */ +/***********************************************************************/ +int TDBTBM::ReadDB(PGLOBAL g) + { + int rc; + + if (!Done) { + // Get result from local tables + if ((rc = TDBTBL::ReadDB(g)) != RC_EF) + return rc; + else if ((rc = ReadNextRemote(g)) != RC_OK) + return rc; + + Done = true; + } // endif Done + + /*********************************************************************/ + /* Now start the reading process of remote tables. */ + /*********************************************************************/ + retry: + rc = Tdbp->ReadDB(g); + + if (rc == RC_EF) { + // Total number of rows met so far + Rows += Tdbp->RowNumber(g) - 1; + Crp += Tdbp->GetProgMax(g); + Cmp->Complete = true; + + if ((rc = ReadNextRemote(g)) == RC_OK) + goto retry; + + } else if (rc == RC_FX) + strcat(strcat(strcat(g->Message, " ("), Tdbp->GetName()), ")"); + + return rc; + } // end of ReadDB + +/***********************************************************************/ +/* ReadNext: Continue reading from next table. */ +/***********************************************************************/ +int TDBTBM::ReadNextRemote(PGLOBAL g) + { + bool b = false; + + if (Tdbp) + Tdbp->CloseDB(g); + + Cmp = NULL; + + retry: + // Search for a remote table having its result set + for (PTBMT tp = Tmp; tp; tp = tp->Next) + if (tp->Ready) { + if (!tp->Complete) + Cmp = tp; + + } else + b = true; + + if (!Cmp) { + if (b) { // more result to come +// sleep(20); + goto retry; + } else + return RC_EF; + + } // endif Curtable + + Tdbp = (PTDBASE)Cmp->Tap->GetTo_Tdb(); + + // Check and initialize the subtable columns + for (PCOL cp = Columns; cp; cp = cp->GetNext()) + if (cp->GetAmType() == TYPE_AM_TABID) + cp->COLBLK::Reset(); + else if (((PPRXCOL)cp)->Init(g) && !Accept) + return RC_FX; + + if (trace) + htrc("Reading subtable %s\n", Tdbp->GetName()); + + return RC_OK; + } // end of ReadNextRemote + /* ------------------------------------------------------------------- */ diff --git a/storage/connect/tabtbl.h b/storage/connect/tabtbl.h index 69b862ebe4e017e55299615a56f29cde3d035cb1..cdd84a1dd2924a7b55001573181a0e76404b438c 100644 --- a/storage/connect/tabtbl.h +++ b/storage/connect/tabtbl.h @@ -1,5 +1,5 @@ /*************** TabTbl H Declares Source Code File (.H) ***************/ -/* Name: TABTBL.H Version 1.2 */ +/* Name: TABTBL.H Version 1.3 */ /* */ /* (C) Copyright to the author Olivier BERTRAND 2008-2013 */ /* */ @@ -11,7 +11,29 @@ typedef class TBLDEF *PTBLDEF; typedef class TDBTBL *PTDBTBL; - +typedef class TDBTBM *PTDBTBM; +typedef class MYSQLC *PMYC; + +/***********************************************************************/ +/* Defines the structures used for distributed TBM tables. */ +/***********************************************************************/ +typedef struct _TBMtable *PTBMT; + +typedef struct _TBMtable { + PTBMT Next; // Points to next data table struct + PTABLE Tap; // Points to the sub table + PGLOBAL G; // Needed in thread routine + bool Complete; // TRUE when all results are read + bool Ready; // TRUE when results are there + int Rows; // Total number of rows read so far + int ProgCur; // Current pos + int ProgMax; // Max pos + int Rc; // Return code + THD *Thd; + pthread_attr_t attr; // ??? + pthread_t Tid; // CheckOpen thread ID + } TBMT; + /***********************************************************************/ /* TBL table. */ /***********************************************************************/ @@ -32,6 +54,7 @@ class DllExport TBLDEF : public PRXDEF { /* Logical table description */ protected: // Members bool Accept; /* TRUE if bad tables are accepted */ + bool Thread; /* Use thread for remote tables */ int Maxerr; /* Maximum number of bad tables */ int Ntables; /* Number of tables */ }; // end of TBLDEF @@ -41,7 +64,6 @@ class DllExport TBLDEF : public PRXDEF { /* Logical table description */ /***********************************************************************/ class DllExport TDBTBL : public TDBPRX { friend class TBTBLK; - friend class TDBPLG; public: // Constructor TDBTBL(PTBLDEF tdp = NULL); @@ -51,8 +73,8 @@ class DllExport TDBTBL : public TDBPRX { // Methods virtual void ResetDB(void); - virtual int GetRecpos(void) {return Rows;} - virtual int GetBadLines(void) {return (int)Nbf;} + virtual int GetRecpos(void) {return Rows;} + virtual int GetBadLines(void) {return (int)Nbc;} // Database routines virtual PCOL MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n); @@ -72,7 +94,7 @@ class DllExport TDBTBL : public TDBPRX { PTABLE CurTable; // Points to the current table bool Accept; // TRUE if bad tables are accepted int Maxerr; // Maximum number of bad tables - int Nbf; // Number of bad connections + int Nbc; // Number of bad connections int Rows; // Used for RowID int Crp; // Used for CurPos }; // end of class TDBTBL @@ -100,3 +122,44 @@ class TBTBLK : public TIDBLK { protected: // Must not have additional members }; // end of class TBTBLK + +/***********************************************************************/ +/* This is the TBM Access Method class declaration. */ +/***********************************************************************/ +class DllExport TDBTBM : public TDBTBL { + friend class TBTBLK; + public: + // Constructor + TDBTBM(PTBLDEF tdp = NULL); + + // Implementation +//virtual AMT GetAmType(void) {return TYPE_AM_TBL;} + + // Methods + virtual void ResetDB(void); +//virtual int GetRecpos(void) {return Rows;} +//virtual int GetBadLines(void) {return (int)Nbc;} + + // Database routines +//virtual PCOL MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n); + virtual int GetMaxSize(PGLOBAL g) {return 10;} // Temporary + virtual int RowNumber(PGLOBAL g, bool b = FALSE); +//virtual PCOL InsertSpecialColumn(PGLOBAL g, PCOL scp); + virtual bool OpenDB(PGLOBAL g); + virtual int ReadDB(PGLOBAL g); + + protected: + // Internal functions +//bool InitTableList(PGLOBAL g); +//bool TestFil(PGLOBAL g, PFIL filp, PTABLE tabp); + bool OpenTables(PGLOBAL g); + int ReadNextRemote(PGLOBAL g); + + // Members + PTBMT Tmp; // To data table TBMT structures + PTBMT Cmp; // Current data table PLGF (to move to TDBTBL) + PTBMT Bmp; // To bad (unconnected) PLGF structures + bool Done; // TRUE after first GetAllResults + int Nrc; // Number of remote connections + int Nlc; // Number of local connections + }; // end of class TDBTBM diff --git a/storage/connect/value.cpp b/storage/connect/value.cpp index 1c8182b6b8d35377f68f59d5e0edb2642d7188f9..a770ac819c9c1612c29a9a056a73f108efee72e9 100644 --- a/storage/connect/value.cpp +++ b/storage/connect/value.cpp @@ -566,7 +566,7 @@ void TYPVAL<TYPE>::SetValue_char(char *p, int n) if (minus && Tval) Tval = - Tval; - if (trace) + if (trace > 1) htrc(strcat(strcat(strcpy(buf, " setting %s to: "), Fmt), "\n"), GetTypeName(Type), Tval); @@ -585,7 +585,7 @@ void TYPVAL<double>::SetValue_char(char *p, int n) buf[n] = '\0'; Tval = atof(buf); - if (trace) + if (trace > 1) htrc(" setting double: '%s' -> %lf\n", buf, Tval); Null = false; @@ -900,7 +900,7 @@ void TYPVAL<PSZ>::SetValue_char(char *p, int n) *(++p) = '\0'; - if (trace) + if (trace > 1) htrc(" Setting string to: '%s'\n", Strp); Null = false; @@ -1291,7 +1291,7 @@ bool DTVAL::MakeTime(struct tm *ptm) int n, y = ptm->tm_year; time_t t = mktime_mysql(ptm); - if (trace) + if (trace > 1) htrc("MakeTime from (%d,%d,%d,%d,%d,%d)\n", ptm->tm_year, ptm->tm_mon, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec); @@ -1314,7 +1314,7 @@ bool DTVAL::MakeTime(struct tm *ptm) } Tval= (int) t; - if (trace) + if (trace > 1) htrc("MakeTime Ival=%d\n", Tval); return false; @@ -1334,7 +1334,7 @@ bool DTVAL::MakeDate(PGLOBAL g, int *val, int nval) datm.tm_mon=0; datm.tm_year=70; - if (trace) + if (trace > 1) htrc("MakeDate from(%d,%d,%d,%d,%d,%d) nval=%d\n", val[0], val[1], val[2], val[3], val[4], val[5], nval); @@ -1398,7 +1398,7 @@ bool DTVAL::MakeDate(PGLOBAL g, int *val, int nval) } // endfor i - if (trace) + if (trace > 1) htrc("MakeDate datm=(%d,%d,%d,%d,%d,%d)\n", datm.tm_year, datm.tm_mon, datm.tm_mday, datm.tm_hour, datm.tm_min, datm.tm_sec); @@ -1459,7 +1459,7 @@ void DTVAL::SetValue_char(char *p, int n) ndv = ExtractDate(Sdate, Pdtp, DefYear, dval); MakeDate(NULL, dval, ndv); - if (trace) + if (trace > 1) htrc(" setting date: '%s' -> %d\n", Sdate, Tval); Null = false; @@ -1483,7 +1483,7 @@ void DTVAL::SetValue_psz(PSZ p) ndv = ExtractDate(Sdate, Pdtp, DefYear, dval); MakeDate(NULL, dval, ndv); - if (trace) + if (trace > 1) htrc(" setting date: '%s' -> %d\n", Sdate, Tval); Null = false;