Commit 43362bc9 authored by Olivier Bertrand's avatar Olivier Bertrand

- Fix bug MDEV-5734

modified:
  storage/connect/mysql-test/connect/r/pivot.result
  storage/connect/mysql-test/connect/t/pivot.test
  storage/connect/tabmysql.cpp
  storage/connect/tabpivot.cpp

- Implement a first experimental support of MRR
  (compiled only if MRRBKA_SUPPORT is defined)
modified:
  storage/connect/colblk.h
  storage/connect/connect.cc
  storage/connect/connect.h
  storage/connect/global.h
  storage/connect/ha_connect.cc
  storage/connect/ha_connect.h
  storage/connect/plugutil.c
  storage/connect/user_connect.cc
parent 1699947e
......@@ -49,6 +49,9 @@ class DllExport COLBLK : public XOBJECT {
void AddColUse(ushort u) {ColUse |= u;}
void AddStatus(ushort u) {Status |= u;}
void SetNext(PCOL cp) {Next = cp;}
#if defined(MRRBKA_SUPPORT)
PXCOL GetKcol(void) {return To_Kcol;}
#endif // MRRBKA_SUPPORT
void SetKcol(PXCOL kcp) {To_Kcol = kcp;}
PCOLDEF GetCdp(void) {return Cdp;}
PSZ GetDomain(void) {return (Cdp) ? Cdp->Decode : NULL;}
......
......@@ -57,7 +57,7 @@ extern int xtrace;
/* Routines called internally by semantic routines. */
/***********************************************************************/
void CntEndDB(PGLOBAL);
RCODE EvalColumns(PGLOBAL g, PTDB tdbp);
RCODE EvalColumns(PGLOBAL g, PTDB tdbp, bool mrr= false);
/***********************************************************************/
/* MySQL routines called externally by semantic routines. */
......@@ -237,7 +237,8 @@ bool CntOpenTable(PGLOBAL g, PTDB tdbp, MODE mode, char *c1, char *c2,
bool del, PHC h)
{
char *p;
int i, n;
int i, n, rc;
bool rcop= true;
PCOL colp;
//PCOLUMN cp;
PDBUSER dup= PlgGetUser(g);
......@@ -251,7 +252,15 @@ bool CntOpenTable(PGLOBAL g, PTDB tdbp, MODE mode, char *c1, char *c2,
return true;
} // endif tdbp
//tdbp->SetMode(mode); done in ha_connect::GetTDB
// Save stack and allocation environment and prepare error return
if (g->jump_level == MAX_JUMP) {
strcpy(g->Message, MSG(TOO_MANY_JUMPS));
return true;
} // endif jump_level
if ((rc= setjmp(g->jumper[++g->jump_level])) != 0) {
goto err;
} // endif rc
if (!c1) {
if (mode == MODE_INSERT)
......@@ -273,7 +282,7 @@ bool CntOpenTable(PGLOBAL g, PTDB tdbp, MODE mode, char *c1, char *c2,
if (!colp) {
sprintf(g->Message, "Column %s not found in %s", p, tdbp->GetName());
return true;
goto err;
} // endif colp
n= strlen(p) + 1;
......@@ -281,12 +290,12 @@ bool CntOpenTable(PGLOBAL g, PTDB tdbp, MODE mode, char *c1, char *c2,
for (i= 0, colp= tdbp->GetColumns(); colp; i++, colp= colp->GetNext()) {
if (colp->InitValue(g))
return true;
goto err;
if (mode == MODE_INSERT)
// Allow type conversion
if (colp->SetBuffer(g, colp->GetValue(), true, false))
return true;
goto err;
colp->AddColUse(U_P); // For PLG tables
} // endfor colp
......@@ -301,7 +310,7 @@ bool CntOpenTable(PGLOBAL g, PTDB tdbp, MODE mode, char *c1, char *c2,
if (!(utp= (PTDBASE)tdbp->Duplicate(g))) {
sprintf(g->Message, MSG(INV_UPDT_TABLE), tdbp->GetName());
return true;
goto err;
} // endif tp
if (!c2)
......@@ -315,10 +324,10 @@ bool CntOpenTable(PGLOBAL g, PTDB tdbp, MODE mode, char *c1, char *c2,
for (i= 0, colp= utp->GetColumns(); colp; i++, colp= colp->GetNext()) {
if (colp->InitValue(g))
return true;
goto err;
if (colp->SetBuffer(g, colp->GetValue(), true, false))
return true;
goto err;
} // endfor colp
......@@ -350,13 +359,17 @@ bool CntOpenTable(PGLOBAL g, PTDB tdbp, MODE mode, char *c1, char *c2,
if (mode != MODE_ANY && mode != MODE_ALTER) {
if (tdbp->OpenDB(g)) {
printf("%s\n", g->Message);
return true;
goto err;
} else
tdbp->SetNext(NULL);
} // endif mode
return false;
rcop= false;
err:
g->jump_level--;
return rcop;
} // end of CntOpenTable
/***********************************************************************/
......@@ -374,7 +387,7 @@ bool CntRewindTable(PGLOBAL g, PTDB tdbp)
/***********************************************************************/
/* Evaluate all columns after a record is read. */
/***********************************************************************/
RCODE EvalColumns(PGLOBAL g, PTDB tdbp)
RCODE EvalColumns(PGLOBAL g, PTDB tdbp, bool mrr)
{
RCODE rc= RC_OK;
PCOL colp;
......@@ -402,7 +415,11 @@ RCODE EvalColumns(PGLOBAL g, PTDB tdbp)
colp->Reset();
// Virtual columns are computed by MariaDB
#if defined(MRRBKA_SUPPORT)
if (!colp->GetColUse(U_VIRTUAL) && (!mrr || colp->GetKcol()))
#else // !MRRBKA_SUPPORT
if (!colp->GetColUse(U_VIRTUAL))
#endif // !MRRBKA_SUPPORT
if (colp->Eval(g))
rc= RC_FX;
......@@ -680,7 +697,7 @@ int CntIndexInit(PGLOBAL g, PTDB ptdb, int id)
/* IndexRead: fetch a record having the index value. */
/***********************************************************************/
RCODE CntIndexRead(PGLOBAL g, PTDB ptdb, OPVAL op,
const void *key, int len)
const void *key, int len, bool mrr)
{
char *kp= (char*)key;
int n;
......@@ -757,7 +774,7 @@ RCODE CntIndexRead(PGLOBAL g, PTDB ptdb, OPVAL op,
xbp->SetNth(0);
if ((rc= (RCODE)tdbp->ReadDB(g)) == RC_OK)
rc= EvalColumns(g, tdbp);
rc= EvalColumns(g, tdbp, mrr);
return rc;
} // end of CntIndexRead
......
......@@ -37,7 +37,8 @@ bool CntRewindTable(PGLOBAL g, PTDB tdbp);
int CntCloseTable(PGLOBAL g, PTDB tdbp);
int CntIndexInit(PGLOBAL g, PTDB tdbp, int id);
RCODE CntReadNext(PGLOBAL g, PTDB tdbp);
RCODE CntIndexRead(PGLOBAL g, PTDB, OPVAL op, const void *k, int n);
RCODE CntIndexRead(PGLOBAL g, PTDB, OPVAL op, const void *k, int n,
bool mrr = false);
RCODE CntWriteRow(PGLOBAL g, PTDB tdbp);
RCODE CntUpdateRow(PGLOBAL g, PTDB tdbp);
RCODE CntDeleteRow(PGLOBAL g, PTDB tdbp, bool all);
......@@ -63,7 +64,7 @@ class TDBDOX: public TDBDOS {
friend int MakeIndex(PGLOBAL, PTDB, PIXDEF);
friend int CntCloseTable(PGLOBAL, PTDB);
friend int CntIndexInit(PGLOBAL, PTDB, int);
friend RCODE CntIndexRead(PGLOBAL, PTDB, OPVAL, const void*, int);
friend RCODE CntIndexRead(PGLOBAL, PTDB, OPVAL, const void*, int, bool);
friend RCODE CntDeleteRow(PGLOBAL, PTDB, bool);
friend int CntIndexRange(PGLOBAL, PTDB, const uchar**, uint*,
bool*, key_part_map*);
......
......@@ -222,6 +222,9 @@ typedef struct _global { /* Global structure */
int Createas; /* To pass info to created table */
void *Xchk; /* indexes in create/alter */
short Alchecked; /* Checked for ALTER */
#if defined(MRRBKA_SUPPORT)
short Mrr; /* True when doing mrr */
#endif // MRRBKA_SUPPORT
short Trace;
int jump_level;
jmp_buf jumper[MAX_JUMP + 2];
......
......@@ -481,6 +481,7 @@ ha_connect::ha_connect(handlerton *hton, TABLE_SHARE *table_arg)
creat_query_id= (table && table->in_use) ? table->in_use->query_id : 0;
stop= false;
alter= false;
mrr= false;
indexing= -1;
locked= 0;
data_file_name= NULL;
......@@ -1357,10 +1358,20 @@ int ha_connect::MakeRecord(char *buf)
if (bitmap_is_set(map, fp->field_index) || alter) {
// This is a used field, fill the buffer with value
for (colp= tdbp->GetColumns(); colp; colp= colp->GetNext())
#if defined(MRRBKA_SUPPORT)
if ((!mrr || colp->GetKcol()) &&
!stricmp(colp->GetName(), (char*)fp->field_name))
break;
#else // !MRRBKA_SUPPORT
if (!stricmp(colp->GetName(), (char*)fp->field_name))
break;
#endif // !MRRBKA_SUPPORT
if (!colp) {
#if defined(MRRBKA_SUPPORT)
if (mrr)
continue;
#endif // MRRBKA_SUPPORT
printf("Column %s not found\n", fp->field_name);
dbug_tmp_restore_column_map(table->write_set, org_bitmap);
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
......@@ -2020,9 +2031,17 @@ int ha_connect::open(const char *name, int mode, uint test_if_locked)
PGLOBAL g= (xp) ? xp->g : NULL;
// Try to set the database environment
if (g)
if (g) {
rc= (CntCheckDB(g, this, name)) ? (-2) : 0;
else
#if defined(MRRBKA_SUPPORT)
if (g->Mrr) {
// This should only happen for the mrr secondary handler
mrr= true;
g->Mrr= false;
} else
mrr= false;
#endif // MRRBKA_SUPPORT
} else
rc= HA_ERR_INTERNAL_ERROR;
DBUG_RETURN(rc);
......@@ -2317,7 +2336,7 @@ int ha_connect::ReadIndexed(uchar *buf, OPVAL op, const uchar *key, uint key_len
//statistic_increment(ha_read_key_count, &LOCK_status);
switch (CntIndexRead(xp->g, tdbp, op, key, (int)key_len)) {
switch (CntIndexRead(xp->g, tdbp, op, key, (int)key_len, mrr)) {
case RC_OK:
xp->fnd++;
rc= MakeRecord((char*)buf);
......@@ -5401,7 +5420,7 @@ bool ha_connect::check_if_incompatible_data(HA_CREATE_INFO *info,
#if defined(MRRBKA_SUPPORT)
#error This is not implemented yet
//#error This is not implemented yet
/****************************************************************************
* CONNECT MRR implementation: use DS-MRR
This is just copied from myisam
......@@ -5433,10 +5452,12 @@ ha_rows ha_connect::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
// MMR is implemented for "local" file based tables only
if (!IsFileType(GetRealType(GetTableOptionStruct(table))))
*flags |= HA_MRR_USE_DEFAULT_IMPL;
*flags|= HA_MRR_USE_DEFAULT_IMPL;
return ds_mrr.dsmrr_info_const(keyno, seq, seq_init_param, n_ranges, bufsz,
flags, cost);
ha_rows rows= ds_mrr.dsmrr_info_const(keyno, seq, seq_init_param, n_ranges,
bufsz, flags, cost);
xp->g->Mrr= !(*flags & HA_MRR_USE_DEFAULT_IMPL);
return rows;
} // end of multi_range_read_info_const
ha_rows ha_connect::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
......@@ -5444,7 +5465,15 @@ ha_rows ha_connect::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
uint *flags, Cost_estimate *cost)
{
ds_mrr.init(this, table);
return ds_mrr.dsmrr_info(keyno, n_ranges, keys, key_parts, bufsz, flags, cost);
// MMR is implemented for "local" file based tables only
if (!IsFileType(GetRealType(GetTableOptionStruct(table))))
*flags|= HA_MRR_USE_DEFAULT_IMPL;
ha_rows rows= ds_mrr.dsmrr_info(keyno, n_ranges, keys, key_parts, bufsz,
flags, cost);
xp->g->Mrr= !(*flags & HA_MRR_USE_DEFAULT_IMPL);
return rows;
} // end of multi_range_read_info
......
......@@ -243,8 +243,12 @@ public:
*/
ulong index_flags(uint inx, uint part, bool all_parts) const
{
return HA_READ_NEXT | HA_READ_RANGE | HA_READ_ORDER;
}
return HA_READ_NEXT | HA_READ_RANGE | HA_READ_ORDER
#if defined(MRRBKA_SUPPORT)
| HA_KEYREAD_ONLY
#endif // MRRBKA_SUPPORT
;
} // end of index_flags
/** @brief
unireg.cc will call max_supported_record_length(), max_supported_keys(),
......@@ -484,6 +488,7 @@ protected:
bool valid_info; // True if xinfo is valid
bool stop; // Used when creating index
bool alter; // True when converting to other engine
bool mrr; // True when getting index positions
int indexing; // Type of indexing for CONNECT
int locked; // Table lock
THR_LOCK_DATA lock_data;
......
......@@ -224,3 +224,30 @@ Kevin 0 2 6
Donald 1 0 3
DROP TABLE pivet;
DROP TABLE pets;
#
# MDEV-5734
#
CREATE TABLE fruit (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(32) DEFAULT NULL,
`cnt` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=6 DEFAULT CHARSET=latin1;
INSERT INTO fruit VALUES (1,'apple',1),(2,'banana',1),(3,'apple',2),(4,'cherry',4),(5,'durazno',2);
SELECT * FROM fruit;
id name cnt
1 apple 1
2 banana 1
3 apple 2
4 cherry 4
5 durazno 2
CREATE TABLE fruit_pivot ENGINE=CONNECT TABLE_TYPE=pivot TABNAME=fruit;
SELECT * FROM fruit_pivot;
id apple banana cherry durazno
1 1 0 0 0
2 0 1 0 0
3 2 0 0 0
4 0 0 4 0
5 0 0 0 2
DROP TABLE fruit_pivot;
DROP TABLE fruit;
......@@ -143,4 +143,21 @@ SELECT * FROM pivet;
DROP TABLE pivet;
DROP TABLE pets;
--echo #
--echo # MDEV-5734
--echo #
CREATE TABLE fruit (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(32) DEFAULT NULL,
`cnt` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=6 DEFAULT CHARSET=latin1;
INSERT INTO fruit VALUES (1,'apple',1),(2,'banana',1),(3,'apple',2),(4,'cherry',4),(5,'durazno',2);
SELECT * FROM fruit;
CREATE TABLE fruit_pivot ENGINE=CONNECT TABLE_TYPE=pivot TABNAME=fruit;
SELECT * FROM fruit_pivot;
DROP TABLE fruit_pivot;
DROP TABLE fruit;
--remove_file $MYSQLD_DATADIR/test/expenses.txt
......@@ -153,6 +153,9 @@ PGLOBAL PlugInit(LPCSTR Language, uint worksize)
g->Trace = 0;
g->Createas = 0;
g->Alchecked = 0;
#if defined(MRRBKA_SUPPORT)
g->Mrr = 0;
#endif // MRRBKA_SUPPORT
g->Activityp = g->ActivityStart = NULL;
g->Xchk = NULL;
strcpy(g->Message, "");
......
......@@ -1144,9 +1144,6 @@ MYSQLCOL::MYSQLCOL(MYSQL_FIELD *fld, PTDB tdbp, int i, PSZ am)
ColUse = U_P;
Nullable = !IS_NOT_NULL(fld->flags);
if (Buf_Type == TYPE_DECIM)
Precision = ((Field_new_decimal*)fld)->precision;
// Set additional MySQL access method information for column.
Bind = NULL;
To_Val = NULL;
......
......@@ -405,7 +405,7 @@ bool TDBPIVOT::GetSourceTable(PGLOBAL g)
strcat(colist, Picol);
// Now we know how much was suballocated
PlugSubAlloc(g, NULL, strlen(colist));
PlugSubAlloc(g, NULL, strlen(colist) + 1);
// Locate the source string (size is not known yet)
Tabsrc = (char*)PlugSubAlloc(g, NULL, 0);
......@@ -423,7 +423,7 @@ bool TDBPIVOT::GetSourceTable(PGLOBAL g)
strcat(strcat(Tabsrc, " ORDER BY "), colist);
// Now we know how much was suballocated
PlugSubAlloc(g, NULL, strlen(Tabsrc));
PlugSubAlloc(g, NULL, strlen(Tabsrc) + 1);
} // endif !GBdone
} else if (!Tabsrc) {
......
......@@ -147,6 +147,9 @@ bool user_connect::CheckCleanup(void)
g->Xchk = NULL;
g->Createas = 0;
g->Alchecked = 0;
#if defined(MRRBKA_SUPPORT)
g->Mrr = 0;
#endif // MRRBKA_SUPPORT
last_query_id= thdp->query_id;
if (xtrace)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment