Commit 16893bc0 authored by Olivier Bertrand's avatar Olivier Bertrand

- Add index read previous capacity.

modified:
  storage/connect/ha_connect.cc
  storage/connect/ha_connect.h
  storage/connect/xindex.cpp

- Optimize retrieving numeric values in scan_record. Was previously
  translating numeric values to character representation back and forth.
modified:
  storage/connect/ha_connect.cc
  storage/connect/mysql-test/connect/r/xml.result

- Modify Pivot table creation to avoid reading the entire source table
  when making columns from Discovery. MDEV-6024
modified:
  storage/connect/tabpivot.cpp
parent b43e82dc
......@@ -635,8 +635,8 @@ ulonglong ha_connect::table_flags() const
{
ulonglong flags= HA_CAN_VIRTUAL_COLUMNS | HA_REC_NOT_IN_SEQ |
HA_NO_AUTO_INCREMENT | HA_NO_PREFIX_CHAR_KEYS |
HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE |
HA_PARTIAL_COLUMN_READ |
// HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE |
HA_PARTIAL_COLUMN_READ | HA_FILE_BASED |
// HA_NULL_IN_KEY | not implemented yet
// HA_FAST_KEY_READ | causes error when sorting (???)
HA_NO_TRANSACTIONS | HA_DUPLICATE_KEY_NOT_IN_ORDER |
......@@ -647,9 +647,6 @@ ulonglong ha_connect::table_flags() const
if (pos) {
TABTYPE type= hp->GetRealType(pos);
if (IsFileType(type))
flags|= HA_FILE_BASED;
if (IsExactType(type))
flags|= (HA_HAS_RECORDS | HA_STATS_RECORDS_IS_EXACT);
......@@ -1407,8 +1404,9 @@ int ha_connect::MakeRecord(char *buf)
} // endif colp
value= colp->GetValue();
p= NULL;
// All this could be better optimized
// All this was better optimized
if (!value->IsNull()) {
switch (value->GetType()) {
case TYPE_DATE:
......@@ -1433,39 +1431,37 @@ int ha_connect::MakeRecord(char *buf)
// Get date in the format required by MySQL fields
value->FormatValue(sdvalout, fmt);
p= sdvalout->GetCharValue();
rc= fp->store(p, strlen(p), charset, CHECK_FIELD_WARN);
break;
case TYPE_STRING:
case TYPE_DECIM:
p= value->GetCharString(val);
charset= tdbp->data_charset();
rc= fp->store(p, strlen(p), charset, CHECK_FIELD_WARN);
break;
case TYPE_DOUBLE:
p= NULL;
rc= fp->store(value->GetFloatValue());
break;
case TYPE_STRING:
// Passthru
default:
p= value->GetCharString(val);
rc= fp->store(value->GetBigintValue(), value->IsUnsigned());
break;
} // endswitch Type
if (p) {
if (fp->store(p, strlen(p), charset, CHECK_FIELD_WARN)) {
// Avoid "error" on null fields
if (value->GetIntValue())
rc= HA_ERR_WRONG_IN_RECORD;
DBUG_PRINT("MakeRecord", ("%s", p));
} // endif store
} else
if (fp->store(value->GetFloatValue())) {
// rc= HA_ERR_WRONG_IN_RECORD; a Warning was ignored
char buf[128];
THD *thd= ha_thd();
// Store functions returns 1 on overflow and -1 on fatal error
if (rc > 0) {
char buf[128];
THD *thd= ha_thd();
sprintf(buf, "Out of range value for column '%s' at row %ld",
fp->field_name,
thd->get_stmt_da()->current_row_for_warning());
sprintf(buf, "Out of range value %s for column '%s' at row %ld",
value->GetCharString(val),
fp->field_name,
thd->get_stmt_da()->current_row_for_warning());
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, buf);
DBUG_PRINT("MakeRecord", ("%s", value->GetCharString(val)));
} // endif store
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 0, buf);
DBUG_PRINT("MakeRecord", ("%s", buf));
rc= 0;
} else if (rc < 0)
rc= HA_ERR_WRONG_IN_RECORD;
fp->set_notnull();
} else
......@@ -2455,7 +2451,6 @@ int ha_connect::index_next(uchar *buf)
} // end of index_next
#ifdef NOT_USED
/**
@brief
Used to read backwards through the index.
......@@ -2463,9 +2458,15 @@ int ha_connect::index_next(uchar *buf)
int ha_connect::index_prev(uchar *buf)
{
DBUG_ENTER("ha_connect::index_prev");
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}
#endif // NOT_USED
int rc;
if (indexing > 0) {
rc= ReadIndexed(buf, OP_PREV);
} else
rc= HA_ERR_WRONG_COMMAND;
DBUG_RETURN(rc);
} // end of index_prev
/**
......
......@@ -241,8 +241,8 @@ class ha_connect: public handler
*/
ulong index_flags(uint inx, uint part, bool all_parts) const
{
return HA_READ_NEXT | HA_READ_RANGE | HA_READ_ORDER
| HA_KEYREAD_ONLY | HA_KEY_SCAN_NOT_ROR;
return HA_READ_NEXT | HA_READ_RANGE | HA_READ_ORDER |
HA_READ_PREV | HA_KEYREAD_ONLY | HA_KEY_SCAN_NOT_ROR;
} // end of index_flags
/** @brief
......@@ -405,7 +405,7 @@ const char *GetValStr(OPVAL vop, bool neg);
We implement this in ha_connect.cc. It's not an obligatory method;
skip it and and MySQL will treat it as not implemented.
*/
//int index_prev(uchar *buf);
int index_prev(uchar *buf);
/** @brief
We implement this in ha_connect.cc. It's not an obligatory method;
......
......@@ -365,7 +365,7 @@ int MYSQLC::Open(PGLOBAL g, const char *host, const char *db,
int pt)
{
const char *pipe = NULL;
uint cto = 60, nrt = 120;
uint cto = 6000, nrt = 12000;
m_DB = mysql_init(NULL);
......
......@@ -326,6 +326,9 @@ Warnings:
Level Warning
Code 1366
Message Incorrect string value: '\xC3\x81\xC3\x82\xC3\x83...' for column 'c' at row 1
Level Warning
Code 1105
Message Out of range value ÁÂÃÄÅÆÇ for column 'c' at row 1
DROP TABLE t1;
#
# Testing Cyrillic
......
......@@ -98,7 +98,8 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
char *query, *colname, buf[64];
int rc, ndif, nblin, w = 0;
bool b = false;
PVAL valp;
PVAL valp;
PQRYRES qrp;
PCOLRES *pcrp, crp, fncrp = NULL;
// Save stack and allocation environment and prepare error return
......@@ -113,8 +114,8 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
if (!Tabsrc && Tabname) {
// Locate the query
query = (char*)PlugSubAlloc(g, NULL, strlen(Tabname) + 16);
sprintf(query, "SELECT * FROM %s", Tabname);
query = (char*)PlugSubAlloc(g, NULL, strlen(Tabname) + 26);
sprintf(query, "SELECT * FROM `%s` LIMIT 1", Tabname);
} else if (!Tabsrc) {
strcpy(g->Message, MSG(SRC_TABLE_UNDEF));
return NULL;
......@@ -132,9 +133,8 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
goto err;
// We must have a storage query to get pivot column values
Qryp = Myc.GetResult(g, true);
Myc.Close();
b = false;
if (!(Qryp = Myc.GetResult(g, true)))
goto err;
if (!Fncol) {
for (crp = Qryp->Colresp; crp; crp = crp->Next)
......@@ -143,7 +143,7 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
if (!Fncol) {
strcpy(g->Message, MSG(NO_DEF_FNCCOL));
return NULL;
goto err;
} // endif Fncol
} // endif Fncol
......@@ -156,7 +156,7 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
if (!Picol) {
strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
return NULL;
goto err;
} // endif Picol
} // endif picol
......@@ -166,7 +166,7 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
if (!stricmp(Picol, crp->Name)) {
if (crp->Nulls) {
sprintf(g->Message, "Pivot column %s cannot be nullable", Picol);
return NULL;
goto err;
} // endif Nulls
Rblkp = crp->Kdata;
......@@ -179,31 +179,59 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
if (!Rblkp) {
strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
return NULL;
goto err;
} else if (!fncrp) {
strcpy(g->Message, MSG(NO_DEF_FNCCOL));
return NULL;
goto err;
} // endif
// Before calling sort, initialize all
nblin = Qryp->Nblin;
if (Tabsrc) {
Myc.Close();
b = false;
Index.Size = nblin * sizeof(int);
Index.Sub = TRUE; // Should be small enough
// Before calling sort, initialize all
nblin = Qryp->Nblin;
if (!PlgDBalloc(g, NULL, Index))
return NULL;
Index.Size = nblin * sizeof(int);
Index.Sub = TRUE; // Should be small enough
Offset.Size = (nblin + 1) * sizeof(int);
Offset.Sub = TRUE; // Should be small enough
if (!PlgDBalloc(g, NULL, Index))
return NULL;
if (!PlgDBalloc(g, NULL, Offset))
return NULL;
Offset.Size = (nblin + 1) * sizeof(int);
Offset.Sub = TRUE; // Should be small enough
ndif = Qsort(g, nblin);
if (!PlgDBalloc(g, NULL, Offset))
return NULL;
if (ndif < 0) // error
return NULL;
ndif = Qsort(g, nblin);
if (ndif < 0) // error
return NULL;
} else {
// The query was limited, we must get pivot column values
query = (char*)PlugSubAlloc(g, NULL, 0);
sprintf(query, "SELECT DISTINCT `%s` FROM `%s`", Picol, Tabname);
PlugSubAlloc(g, NULL, strlen(query) + 1);
Myc.FreeResult();
// Send the source command to MySQL
if (Myc.ExecSQL(g, query, &w) == RC_FX)
goto err;
// We must have a storage query to get pivot column values
if (!(qrp = Myc.GetResult(g, true)))
goto err;
Myc.Close();
b = false;
// Get the column list
crp = qrp->Colresp;
Rblkp = crp->Kdata;
ndif = qrp->Nblin;
} // endif Tabsrc
// Allocate the Value used to retieve column names
if (!(valp = AllocateValue(g, Rblkp->GetType(),
......@@ -220,7 +248,11 @@ PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
crp = fncrp;
// Get the value that will be the generated column name
valp->SetValue_pvblk(Rblkp, Pex[Pof[i]]);
if (Tabsrc)
valp->SetValue_pvblk(Rblkp, Pex[Pof[i]]);
else
valp->SetValue_pvblk(Rblkp, i);
colname = valp->GetCharString(buf);
crp->Name = (char*)PlugSubAlloc(g, NULL, strlen(colname) + 1);
strcpy(crp->Name, colname);
......
......@@ -1631,7 +1631,7 @@ int XINDEX::Fetch(PGLOBAL g)
switch (Op) {
case OP_NEXT: // Read next
if (NextVal(false))
return -1; // End of indexed file
return -1; // End of indexed file
break;
case OP_FIRST: // Read first
......@@ -1648,7 +1648,7 @@ int XINDEX::Fetch(PGLOBAL g)
if (NextVal(true)) {
Op = OP_EQ;
return -2; // no more equal values
return -2; // no more equal values
} // endif NextVal
break;
......@@ -1656,9 +1656,9 @@ int XINDEX::Fetch(PGLOBAL g)
// while (!NextVal(true)) ;
// if (Cur_K >= Num_K)
// return -1; // End of indexed file
// return -1; // End of indexed file
if (NextValDif())
return -1; // End of indexed file
return -1; // End of indexed file
break;
case OP_FSTDIF: // Read first diff
......@@ -1667,12 +1667,17 @@ int XINDEX::Fetch(PGLOBAL g)
Op = (Mul || Nval < Nk) ? OP_NXTDIF : OP_NEXT;
break;
case OP_LAST: // Read last key
case OP_LAST: // Read last key
for (Cur_K = Num_K - 1, kp = To_KeyCol; kp; kp = kp->Next)
kp->Val_K = kp->Kblp->GetNval() - 1;
Op = OP_NEXT;
break;
case OP_PREV: // Read previous
if (PrevVal())
return -1; // End of indexed file
break;
default: // Should be OP_EQ
// if (Tbxp->Key_Rank < 0) {
/***************************************************************/
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment