Commit 0e20f021 authored by Olivier Bertrand's avatar Olivier Bertrand

- Implement dynamic indexing

modified:
  storage/connect/connect.cc
  storage/connect/filter.cpp
  storage/connect/filter.h
  storage/connect/ha_connect.cc
  storage/connect/ha_connect.h
  storage/connect/tabdos.cpp
  storage/connect/tabdos.h
  storage/connect/table.cpp
  storage/connect/xindex.cpp
  storage/connect/xindex.h
  storage/connect/xtable.h
parent 9d296474
......@@ -651,6 +651,14 @@ int CntIndexInit(PGLOBAL g, PTDB ptdb, int id)
return 0;
} // endif xdp
if (xdp->IsDynamic()) {
// This is a dynamically created index (KINDEX)
// It cannot be created now, before cond_push is executed
tdbp->SetXdp(xdp);
return (xdp->IsUnique()) ? 1 : 2;
} // endif dynamic
// Static indexes must be initialized now for records_in_range
// Allocate the key columns definition block
tdbp->Knum= xdp->GetNparts();
tdbp->To_Key_Col= (PCOL*)PlugSubAlloc(g, NULL, tdbp->Knum * sizeof(PCOL));
......@@ -738,9 +746,22 @@ RCODE CntIndexRead(PGLOBAL g, PTDB ptdb, OPVAL op,
// Set reference values and index operator
if (!tdbp->To_Link || !tdbp->To_Kindex) {
if (!tdbp->To_Xdp) {
sprintf(g->Message, "Index not initialized for table %s", tdbp->Name);
return RC_FX;
} else
} // endif !To_Xdp
// Now it's time to make the dynamic index
tdbp->SetFilter(tdbp->To_Def->GetHandler()->CheckFilter(g));
if (tdbp->MakeDynamicIndex(g)) {
sprintf(g->Message, "Fail to make dynamic index %s",
tdbp->To_Xdp->GetName());
return RC_FX;
} // endif MakeDynamicIndex
} // endif !To_Kindex
xbp= (XXBASE*)tdbp->To_Kindex;
if (key) {
......@@ -829,10 +850,14 @@ int CntIndexRange(PGLOBAL g, PTDB ptdb, const uchar* *key, uint *len,
} else
tdbp= (PTDBDOX)ptdb;
if (!tdbp->To_Link || !tdbp->To_Kindex) {
if (!tdbp->To_Kindex || !tdbp->To_Link) {
if (!tdbp->To_Xdp) {
sprintf(g->Message, "Index not initialized for table %s", tdbp->Name);
DBUG_PRINT("Range", ("%s", g->Message));
return -1;
} else // Dynamic index
return tdbp->To_Xdp->GetMaxSame(); // TODO a better estimate
} else
xbp= (XXBASE*)tdbp->To_Kindex;
......
......@@ -1711,7 +1711,7 @@ PFIL PrepareFilter(PGLOBAL g, PFIL fp, bool having)
/* New algorithm: evaluate from the root a de-linearized filter so */
/* AND/OR clauses can be optimized throughout the whole tree. */
/***********************************************************************/
DllExport bool ApplyFilter(PGLOBAL g, PFIL filp, PTDB tdbp)
DllExport bool ApplyFilter(PGLOBAL g, PFIL filp)
{
if (!filp)
return TRUE;
......
......@@ -26,7 +26,7 @@ PFIL MakeFilter(PGLOBAL g, PCOL *colp, POPER pop, PPARM pfirst, bool neg);
/***********************************************************************/
class DllExport FILTER : public XOBJECT { /* Filter description block */
//friend PFIL PrepareFilter(PGLOBAL, PFIL, bool);
friend DllExport bool ApplyFilter(PGLOBAL, PFIL, PTDB = NULL);
friend DllExport bool ApplyFilter(PGLOBAL, PFIL);
public:
// Constructors
FILTER(PGLOBAL g, POPER pop, PPARM *tp = NULL);
......
......@@ -170,7 +170,7 @@
#define SZWMIN 4194304 // Minimum work area size 4M
extern "C" {
char version[]= "Version 1.02.0002 March 16, 2014";
char version[]= "Version 1.03.0002 April 23, 2014";
#if defined(XMSG)
char msglang[]; // Default message language
......@@ -324,7 +324,7 @@ ha_create_table_option connect_field_option_list[]=
*/
ha_create_table_option connect_index_option_list[]=
{
HA_IOPTION_BOOL("DYN", kindx, 0),
HA_IOPTION_BOOL("DYNAMIC", kindx, 0),
HA_IOPTION_BOOL("MAPPED", mapped, 0),
};
......@@ -658,7 +658,7 @@ TABTYPE ha_connect::GetRealType(PTOS pos)
const char *ha_connect::index_type(uint inx)
{
switch (GetIndexType(GetRealType())) {
case 1: return "XPLUG";
case 1: return "XINDEX";
case 2: return "REMOTE";
} // endswitch
......@@ -1142,6 +1142,14 @@ void *ha_connect::GetColumnOption(PGLOBAL g, void *field, PCOLINFO pcf)
return fldp;
} // end of GetColumnOption
/****************************************************************************/
/* Return an index option structure. */
/****************************************************************************/
PXOS ha_connect::GetIndexOptionStruct(KEY *kp)
{
return kp->option_struct;
} // end of GetIndexOptionStruct
/****************************************************************************/
/* Returns the index description structure used to make the index. */
/****************************************************************************/
......@@ -1151,6 +1159,7 @@ PIXDEF ha_connect::GetIndexInfo(TABLE_SHARE *s)
bool unique;
PIXDEF xdp, pxd=NULL, toidx= NULL;
PKPDEF kpp, pkp;
PXOS xosp;
KEY kp;
PGLOBAL& g= xp->g;
......@@ -1163,6 +1172,7 @@ PIXDEF ha_connect::GetIndexInfo(TABLE_SHARE *s)
// Find the index to describe
kp= s->key_info[n];
xosp= kp.option_struct;
// Now get index information
pn= (char*)s->keynames.type_names[n];
......@@ -1205,6 +1215,20 @@ PIXDEF ha_connect::GetIndexInfo(TABLE_SHARE *s)
xdp->SetNParts(kp.user_defined_key_parts);
if (xosp) {
xdp->Dynamic= xosp->kindx;
xdp->Mapped= xosp->mapped;
} else if (kp.comment.str != NULL) {
char *pv, *oplist= kp.comment.str;
if ((pv= GetListOption(g, "Dynamic", oplist)))
xdp->Dynamic= (!*pv || *pv == 'y' || *pv == 'Y' || atoi(pv) != 0);
if ((pv= GetListOption(g, "Mapped", oplist)))
xdp->Mapped= (!*pv || *pv == 'y' || *pv == 'Y' || atoi(pv) != 0);
} // endif comment
if (pxd)
pxd->SetNext(xdp);
else
......@@ -1844,6 +1868,15 @@ const char *ha_connect::GetValStr(OPVAL vop, bool neg)
} // end of GetValStr
/***********************************************************************/
/* Check the WHERE condition and return a CONNECT filter. */
/***********************************************************************/
PFIL ha_connect::CheckFilter(PGLOBAL g)
{
return CondFilter(g, (Item *)pushed_cond);
} // end of CheckFilter
/***********************************************************************/
/* Check the WHERE condition and return a CONNECT filter. */
/***********************************************************************/
......@@ -2680,7 +2713,7 @@ int ha_connect::index_init(uint idx, bool sorted)
htrc("index_init CONNECT: %s\n", g->Message);
active_index= MAX_KEY;
rc= HA_ERR_INTERNAL_ERROR;
} else {
} else if (((PTDBDOX)tdbp)->To_Kindex) {
if (((PTDBDOX)tdbp)->To_Kindex->GetNum_K()) {
if (((PTDBASE)tdbp)->GetFtype() != RECFM_NAF)
((PTDBDOX)tdbp)->GetTxfp()->ResetBuffer(g);
......@@ -2689,7 +2722,6 @@ int ha_connect::index_init(uint idx, bool sorted)
} else // Void table
indexing= 0;
rc= 0;
} // endif indexing
if (xtrace)
......
......@@ -72,6 +72,7 @@ typedef class XCHK *PCHK;
typedef class user_connect *PCONNECT;
typedef struct ha_table_option_struct TOS, *PTOS;
typedef struct ha_field_option_struct FOS, *PFOS;
typedef struct ha_index_option_struct XOS, *PXOS;
extern handlerton *connect_hton;
......@@ -195,6 +196,7 @@ public:
bool NoFieldOptionChange(TABLE *tab);
PFOS GetFieldOptionStruct(Field *fp);
void *GetColumnOption(PGLOBAL g, void *field, PCOLINFO pcf);
PXOS GetIndexOptionStruct(KEY *kp);
PIXDEF GetIndexInfo(TABLE_SHARE *s= NULL);
const char *GetDBName(const char *name);
const char *GetTableName(void);
......@@ -345,6 +347,7 @@ virtual const COND *cond_push(const COND *cond);
PCFIL CheckCond(PGLOBAL g, PCFIL filp, AMT tty, Item *cond);
const char *GetValStr(OPVAL vop, bool neg);
PFIL CondFilter(PGLOBAL g, Item *cond);
PFIL CheckFilter(PGLOBAL g);
/**
Number of rows in table. It will only be called if
......
......@@ -1477,12 +1477,17 @@ PBF TDBDOS::CheckBlockFilari(PGLOBAL g, PXOB *arg, int op, bool *cnv)
} // end of CheckBlockFilari
/***********************************************************************/
/* ResetBlkFil: reset the block filter and restore filtering. */
/* ResetBlkFil: reset the block filter and restore filtering, or make */
/* the block filter if To_Filter was not set when opening the table. */
/***********************************************************************/
void TDBDOS::ResetBlockFilter(PGLOBAL g)
{
if (!To_BlkFil)
if (!To_BlkFil) {
if (To_Filter)
To_BlkFil = InitBlockFilter(g, To_Filter);
return;
} // endif To_BlkFil
To_BlkFil->Reset(g);
......@@ -1590,7 +1595,7 @@ int TDBDOS::MakeIndex(PGLOBAL g, PIXDEF pxdf, bool add)
// Allocate all columns that will be used by indexes.
// This must be done before opening the table so specific
// column initialization can be done ( in particular by TDBVCT)
// column initialization can be done (in particular by TDBVCT)
for (n = 0, xdp = pxdf; xdp; xdp = xdp->GetNext())
for (kdp = xdp->GetToKeyParts(); kdp; kdp = kdp->GetNext()) {
if (!(colp = ColDB(g, kdp->GetName(), 0))) {
......@@ -1687,6 +1692,79 @@ err:
return RC_FX;
} // end of MakeIndex
/***********************************************************************/
/* Make a dynamic index. */
/***********************************************************************/
bool TDBDOS::MakeDynamicIndex(PGLOBAL g)
{
int k, rc;
bool brc;
PCOL colp;
PCOLDEF cdp;
PVAL valp;
PIXDEF xdp;
PKXBASE kxp;
PKPDEF kdp;
if (!(xdp = To_Xdp)) {
strcpy(g->Message, "NULL dynamic index");
return true;
} // endif To_Xdp
// Allocate the key columns definition block
Knum = xdp->GetNparts();
To_Key_Col = (PCOL*)PlugSubAlloc(g, NULL, Knum * sizeof(PCOL));
// Get the key column description list
for (k = 0, kdp = xdp->GetToKeyParts(); kdp; kdp = kdp->GetNext())
if (!(colp = ColDB(g, kdp->GetName(), 0)) || colp->InitValue(g)) {
sprintf(g->Message, "Wrong column %s", kdp->GetName());
return true;
} else
To_Key_Col[k++] = colp;
#if defined(_DEBUG)
if (k != Knum) {
sprintf(g->Message, "Key part number mismatch for %s",
xdp->GetName());
return 0;
} // endif k
#endif // _DEBUG
// Allocate the pseudo constants that will contain the key values
To_Link = (PXOB*)PlugSubAlloc(g, NULL, Knum * sizeof(PXOB));
for (k = 0, kdp = xdp->GetToKeyParts(); kdp; k++, kdp = kdp->GetNext()) {
cdp = Key(k)->GetCdp();
valp = AllocateValue(g, cdp->GetType(), cdp->GetLength());
To_Link[k]= new(g) CONSTANT(valp);
} // endfor k
// Make the index on xdp
if (!xdp->IsAuto()) {
if (Knum == 1) // Single index
kxp= new(g) XINDXS(this, xdp, NULL, To_Key_Col, To_Link);
else // Multi-Column index
kxp= new(g) XINDEX(this, xdp, NULL, To_Key_Col, To_Link);
} else // Column contains same values as ROWID
kxp= new(g) XXROW(this);
// Prepare error return
if (g->jump_level == MAX_JUMP) {
strcpy(g->Message, MSG(TOO_MANY_JUMPS));
return true;
} // endif
if ((rc = setjmp(g->jumper[++g->jump_level])) != 0) {
brc = true;
} else if (!(brc = kxp->Make(g, xdp)))
To_Kindex= kxp;
g->jump_level--;
return brc;
} // end of MakeDynamicIndex
/***********************************************************************/
/* DOS GetProgMax: get the max value for progress information. */
/***********************************************************************/
......
......@@ -169,6 +169,7 @@ class DllExport TDBDOS : public TDBASE {
// Optimization routines
virtual int MakeIndex(PGLOBAL g, PIXDEF pxdf, bool add);
bool MakeDynamicIndex(PGLOBAL g);
void ResetBlockFilter(PGLOBAL g);
bool GetDistinctColumnValues(PGLOBAL g, int nrec);
......
......@@ -139,6 +139,7 @@ TDBASE::TDBASE(PTABDEF tdp) : TDB(tdp)
To_Link = NULL;
To_Key_Col = NULL;
To_Kindex = NULL;
To_Xdp = NULL;
To_SetCols = NULL;
MaxSize = -1;
Knum = 0;
......@@ -149,8 +150,13 @@ TDBASE::TDBASE(PTABDEF tdp) : TDB(tdp)
TDBASE::TDBASE(PTDBASE tdbp) : TDB(tdbp)
{
To_Def = tdbp->To_Def;
To_Link = tdbp->To_Link;
To_Key_Col = tdbp->To_Key_Col;
To_Kindex = tdbp->To_Kindex;
To_Xdp = tdbp->To_Xdp;
To_SetCols = tdbp->To_SetCols; // ???
MaxSize = tdbp->MaxSize;
Knum = tdbp->Knum;
Read_Only = tdbp->Read_Only;
m_data_charset= tdbp->m_data_charset;
} // end of TDBASE copy constructor
......
......@@ -112,6 +112,8 @@ INDEXDEF::INDEXDEF(char *name, bool uniq, int n)
Unique = uniq;
Invalid = false;
AutoInc = false;
Dynamic = false;
Mapped = false;
Nparts = 0;
ID = n;
//Offset = 0;
......@@ -242,6 +244,7 @@ void XINDEX::Reset(void)
void XINDEX::Close(void)
{
// Close file or view of file
if (X)
X->Close();
// De-allocate data
......@@ -286,7 +289,7 @@ bool XINDEX::Make(PGLOBAL g, PIXDEF sxp)
int k, rc = RC_OK;
int *bof, i, j, n, ndf, nkey;
PKPDEF kdfp = Xdp->GetToKeyParts();
bool brc = true;
bool brc = false;
PCOL colp;
PXCOL kp, prev = NULL, kcp = NULL;
PDBUSER dup = (PDBUSER)g->Activityp->Aptr;
......@@ -378,11 +381,14 @@ bool XINDEX::Make(PGLOBAL g, PIXDEF sxp)
// Check return code and do whatever must be done according to it
switch (rc) {
case RC_OK:
if (ApplyFilter(g, Tdbp->GetFilter()))
break;
case RC_EF:
goto end_of_file;
// passthru
case RC_NF:
continue;
case RC_EF:
goto end_of_file;
default:
sprintf(g->Message, MSG(RC_READING), rc, Tdbp->Name);
goto err;
......@@ -579,13 +585,14 @@ bool XINDEX::Make(PGLOBAL g, PIXDEF sxp)
Cur_K = Num_K;
/*********************************************************************/
/* Save the index so it has not to be recalculated. */
/* Save the xindex so it has not to be recalculated. */
/*********************************************************************/
if (!SaveIndex(g, sxp))
brc = false;
if (X && SaveIndex(g, sxp))
brc = true;
err:
// We don't need the index anymore
if (X || brc)
Close();
if (brc)
......
......@@ -87,6 +87,7 @@ class DllExport INDEXDEF : public BLOCK { /* Index description block */
void SetNext(PIXDEF pxdf) {Next = pxdf;}
PSZ GetName(void) {return (PSZ)Name;}
bool IsUnique(void) {return Unique;}
bool IsDynamic(void) {return Dynamic;}
bool IsAuto(void) {return AutoInc;}
void SetAuto(bool b) {AutoInc = b;}
void SetInvalid(bool b) {Invalid = b;}
......@@ -115,6 +116,8 @@ class DllExport INDEXDEF : public BLOCK { /* Index description block */
bool Unique; /* true if defined as unique */
bool Invalid; /* true if marked as Invalid */
bool AutoInc; /* true if unique key in auto increment */
bool Dynamic; /* KINDEX style */
bool Mapped; /* Use file mapping */
int Nparts; /* Number of key parts */
int ID; /* Index ID number */
int MaxSame; /* Max number of same values */
......@@ -192,6 +195,7 @@ class DllExport XXBASE : public CSORT, public BLOCK {
virtual void Print(PGLOBAL g, FILE *f, uint n);
virtual void Print(PGLOBAL g, char *ps, uint z);
virtual bool Init(PGLOBAL g) = 0;
virtual bool Make(PGLOBAL g, PIXDEF sxp) = 0;
#if defined(XMAP)
virtual bool MapInit(PGLOBAL g) = 0;
#endif // XMAP
......@@ -420,6 +424,7 @@ class DllExport XXROW : public XXBASE {
virtual int MaxRange(void) {return 1;}
virtual int Range(PGLOBAL g, int limit = 0, bool incl = true);
virtual int Qcompare(int *, int *) {assert(false); return 0;}
virtual bool Make(PGLOBAL g, PIXDEF sxp) {return false;}
virtual void Close(void) {}
protected:
......
......@@ -145,6 +145,7 @@ class DllExport TDBASE : public TDB {
inline PKXBASE GetKindex(void) {return To_Kindex;}
inline PCOL GetSetCols(void) {return To_SetCols;}
inline void SetSetCols(PCOL colp) {To_SetCols = colp;}
inline void SetXdp(PIXDEF xdp) {To_Xdp = xdp;}
// Properties
void SetKindex(PKXBASE kxp);
......@@ -191,6 +192,7 @@ class DllExport TDBASE : public TDB {
PXOB *To_Link; // Points to column of previous relations
PCOL *To_Key_Col; // Points to key columns in current file
PKXBASE To_Kindex; // Points to table key index
PIXDEF To_Xdp; // To the index definition block
PCOL To_SetCols; // Points to updated columns
int MaxSize; // Max size in number of lines
int Knum; // Size of key arrays
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment