Commit d95e797c authored by Olivier Bertrand's avatar Olivier Bertrand

- FIX MDEV-6019 and MDEV-6021

  Exhausted memory cause un-prepared long jump
  Issue proper message when PIVOT column is nullable
modified:
  storage/connect/mysql-test/connect/r/pivot.result
  storage/connect/mysql-test/connect/t/pivot.test
  storage/connect/plgdbsem.h
  storage/connect/tabpivot.cpp

- Prepare adding index_prev (not used yet)
modified:
  storage/connect/plgdbsem.h
  storage/connect/xindex.cpp
  storage/connect/xindex.h
parent 3f361af7
...@@ -229,7 +229,7 @@ DROP TABLE pets; ...@@ -229,7 +229,7 @@ DROP TABLE pets;
# #
CREATE TABLE fruit ( CREATE TABLE fruit (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT, `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(32) DEFAULT NULL, `name` varchar(32) NOT NULL,
`cnt` int(11) DEFAULT NULL, `cnt` int(11) DEFAULT NULL,
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=6 DEFAULT CHARSET=latin1; ) ENGINE=MyISAM AUTO_INCREMENT=6 DEFAULT CHARSET=latin1;
......
...@@ -149,7 +149,7 @@ DROP TABLE pets; ...@@ -149,7 +149,7 @@ DROP TABLE pets;
--echo # --echo #
CREATE TABLE fruit ( CREATE TABLE fruit (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT, `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(32) DEFAULT NULL, `name` varchar(32) NOT NULL,
`cnt` int(11) DEFAULT NULL, `cnt` int(11) DEFAULT NULL,
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=6 DEFAULT CHARSET=latin1; ) ENGINE=MyISAM AUTO_INCREMENT=6 DEFAULT CHARSET=latin1;
......
...@@ -291,6 +291,7 @@ enum OPVAL {OP_EQ = 1, /* Filtering operator = */ ...@@ -291,6 +291,7 @@ enum OPVAL {OP_EQ = 1, /* Filtering operator = */
OP_CURDT = 113, /* Scalar function Op CurDate */ OP_CURDT = 113, /* Scalar function Op CurDate */
OP_NWEEK = 114, /* Scalar function Op Week number*/ OP_NWEEK = 114, /* Scalar function Op Week number*/
OP_ROW = 115, /* Scalar function Op Row */ OP_ROW = 115, /* Scalar function Op Row */
OP_PREV = 116, /* Index operator Find Previous */
OP_SYSTEM = 200, /* Scalar function Op System */ OP_SYSTEM = 200, /* Scalar function Op System */
OP_REMOVE = 201, /* Scalar function Op Remove */ OP_REMOVE = 201, /* Scalar function Op Remove */
OP_RENAME = 202, /* Scalar function Op Rename */ OP_RENAME = 202, /* Scalar function Op Rename */
......
...@@ -96,10 +96,21 @@ PIVAID::PIVAID(const char *tab, const char *src, const char *picol, ...@@ -96,10 +96,21 @@ PIVAID::PIVAID(const char *tab, const char *src, const char *picol,
PQRYRES PIVAID::MakePivotColumns(PGLOBAL g) PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
{ {
char *query, *colname, buf[64]; char *query, *colname, buf[64];
int ndif, nblin, w = 0; int rc, ndif, nblin, w = 0;
bool b = false;
PVAL valp; PVAL valp;
PCOLRES *pcrp, crp, fncrp = NULL; PCOLRES *pcrp, crp, fncrp = NULL;
// Save stack and allocation environment and prepare error return
if (g->jump_level == MAX_JUMP) {
strcpy(g->Message, MSG(TOO_MANY_JUMPS));
return NULL;
} // endif jump_level
if ((rc= setjmp(g->jumper[++g->jump_level])) != 0) {
goto err;
} // endif rc
if (!Tabsrc && Tabname) { if (!Tabsrc && Tabname) {
// Locate the query // Locate the query
query = (char*)PlugSubAlloc(g, NULL, strlen(Tabname) + 16); query = (char*)PlugSubAlloc(g, NULL, strlen(Tabname) + 16);
...@@ -113,16 +124,17 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g) ...@@ -113,16 +124,17 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
// Open a MySQL connection for this table // Open a MySQL connection for this table
if (Myc.Open(g, Host, Database, User, Pwd, Port)) if (Myc.Open(g, Host, Database, User, Pwd, Port))
return NULL; return NULL;
else
b = true;
// Send the source command to MySQL // Send the source command to MySQL
if (Myc.ExecSQL(g, query, &w) == RC_FX) { if (Myc.ExecSQL(g, query, &w) == RC_FX)
Myc.Close(); goto err;
return NULL;
} // endif Exec
// We must have a storage query to get pivot column values // We must have a storage query to get pivot column values
Qryp = Myc.GetResult(g, true); Qryp = Myc.GetResult(g, true);
Myc.Close(); Myc.Close();
b = false;
if (!Fncol) { if (!Fncol) {
for (crp = Qryp->Colresp; crp; crp = crp->Next) for (crp = Qryp->Colresp; crp; crp = crp->Next)
...@@ -152,6 +164,11 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g) ...@@ -152,6 +164,11 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
// Prepare the column list // Prepare the column list
for (pcrp = &Qryp->Colresp; crp = *pcrp; ) for (pcrp = &Qryp->Colresp; crp = *pcrp; )
if (!stricmp(Picol, crp->Name)) { if (!stricmp(Picol, crp->Name)) {
if (crp->Nulls) {
sprintf(g->Message, "Pivot column %s cannot be nullable", Picol);
return NULL;
} // endif Nulls
Rblkp = crp->Kdata; Rblkp = crp->Kdata;
*pcrp = crp->Next; *pcrp = crp->Next;
} else if (!stricmp(Fncol, crp->Name)) { } else if (!stricmp(Fncol, crp->Name)) {
...@@ -218,6 +235,12 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g) ...@@ -218,6 +235,12 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
// We added ndif columns and removed 2 (picol and fncol) // We added ndif columns and removed 2 (picol and fncol)
Qryp->Nbcol += (ndif - 2); Qryp->Nbcol += (ndif - 2);
return Qryp; return Qryp;
err:
if (b)
Myc.Close();
return NULL;
} // end of MakePivotColumns } // end of MakePivotColumns
/***********************************************************************/ /***********************************************************************/
......
...@@ -1574,6 +1574,46 @@ bool XINDEX::NextVal(bool eq) ...@@ -1574,6 +1574,46 @@ bool XINDEX::NextVal(bool eq)
return (Cur_K == Num_K || (eq && neq <= Nval)); return (Cur_K == Num_K || (eq && neq <= Nval));
} // end of NextVal } // end of NextVal
/***********************************************************************/
/* XINDEX: Find Cur_K and Val_K's of previous index entry. */
/* Returns false if Ok, true if there are no more values. */
/***********************************************************************/
bool XINDEX::PrevVal(void)
{
int n, neq = Nk + 1, curk;
PXCOL kcp;
if (Cur_K == 0)
return true;
else
curk = --Cur_K;
for (n = Nk, kcp = To_LastCol; kcp; n--, kcp = kcp->Previous) {
if (kcp->Kof) {
if (curk < kcp->Kof[kcp->Val_K])
neq = n;
} else {
#ifdef _DEBUG
assert(curk == kcp->Val_K -1);
#endif // _DEBUG
neq = n;
} // endif Kof
#ifdef _DEBUG
assert(kcp->Val_K >= 0);
#endif // _DEBUG
// If this is not a break...
if (neq > n)
break; // all previous columns have same value
curk = --kcp->Val_K; // This is a break, get new column value
} // endfor kcp
return false;
} // end of PrevVal
/***********************************************************************/ /***********************************************************************/
/* XINDEX: Fetch a physical or logical record. */ /* XINDEX: Fetch a physical or logical record. */
/***********************************************************************/ /***********************************************************************/
...@@ -1870,6 +1910,25 @@ int XINDXS::GroupSize(void) ...@@ -1870,6 +1910,25 @@ int XINDXS::GroupSize(void)
: 1; : 1;
} // end of GroupSize } // end of GroupSize
/***********************************************************************/
/* XINDXS: Find Cur_K and Val_K of previous index value. */
/* Returns false if Ok, true if there are no more values. */
/***********************************************************************/
bool XINDXS::PrevVal(void)
{
if (--Cur_K < 0)
return true;
if (Mul) {
if (Cur_K < Pof[To_KeyCol->Val_K])
To_KeyCol->Val_K--;
} else
To_KeyCol->Val_K = Cur_K;
return false;
} // end of PrevVal
/***********************************************************************/ /***********************************************************************/
/* XINDXS: Find Cur_K and Val_K of next index value. */ /* XINDXS: Find Cur_K and Val_K of next index value. */
/* If b is true next value must be equal to last one. */ /* If b is true next value must be equal to last one. */
...@@ -1946,10 +2005,15 @@ int XINDXS::Fetch(PGLOBAL g) ...@@ -1946,10 +2005,15 @@ int XINDXS::Fetch(PGLOBAL g)
break; break;
case OP_LAST: // Read first case OP_LAST: // Read first
Cur_K = Num_K - 1; Cur_K = Num_K - 1;
To_KeyCol->Val_K = To_KeyCol->Kblp->GetNval() - 1; To_KeyCol->Val_K = Ndif - 1;
Op = OP_NEXT; Op = OP_PREV;
break; break;
default: // Should OP_EQ case OP_PREV: // Read previous
if (PrevVal())
return -1; // End of indexed file
break;
default: // Should be OP_EQ
/*****************************************************************/ /*****************************************************************/
/* Look for the first key equal to the link column values */ /* Look for the first key equal to the link column values */
/* and return its rank whithin the index table. */ /* and return its rank whithin the index table. */
......
...@@ -198,6 +198,7 @@ class DllExport XXBASE : public CSORT, public BLOCK { ...@@ -198,6 +198,7 @@ class DllExport XXBASE : public CSORT, public BLOCK {
virtual int MaxRange(void) {return 1;} virtual int MaxRange(void) {return 1;}
virtual int Fetch(PGLOBAL g) = 0; virtual int Fetch(PGLOBAL g) = 0;
virtual bool NextVal(bool eq) {return true;} virtual bool NextVal(bool eq) {return true;}
virtual bool PrevVal(void) {return true;}
virtual int FastFind(int nk) = 0; virtual int FastFind(int nk) = 0;
virtual bool Reorder(PGLOBAL g) {return true;} virtual bool Reorder(PGLOBAL g) {return true;}
virtual int Range(PGLOBAL g, int limit = 0, bool incl = true) virtual int Range(PGLOBAL g, int limit = 0, bool incl = true)
...@@ -263,6 +264,7 @@ class DllExport XINDEX : public XXBASE { ...@@ -263,6 +264,7 @@ class DllExport XINDEX : public XXBASE {
virtual int ColMaxSame(PXCOL kp); virtual int ColMaxSame(PXCOL kp);
virtual void Close(void); virtual void Close(void);
virtual bool NextVal(bool eq); virtual bool NextVal(bool eq);
virtual bool PrevVal(void);
virtual bool Make(PGLOBAL g, PIXDEF sxp); virtual bool Make(PGLOBAL g, PIXDEF sxp);
virtual bool SaveIndex(PGLOBAL g, PIXDEF sxp); virtual bool SaveIndex(PGLOBAL g, PIXDEF sxp);
virtual bool Reorder(PGLOBAL g); virtual bool Reorder(PGLOBAL g);
...@@ -302,6 +304,7 @@ class DllExport XINDXS : public XINDEX { ...@@ -302,6 +304,7 @@ class DllExport XINDXS : public XINDEX {
virtual int Fetch(PGLOBAL g); virtual int Fetch(PGLOBAL g);
virtual int FastFind(int nk); virtual int FastFind(int nk);
virtual bool NextVal(bool eq); virtual bool NextVal(bool eq);
virtual bool PrevVal(void);
virtual int Range(PGLOBAL g, int limit = 0, bool incl = true); virtual int Range(PGLOBAL g, int limit = 0, bool incl = true);
virtual int GroupSize(void); virtual int GroupSize(void);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment