Commit 20c8dd98 authored by unknown's avatar unknown

Added support for LIKE and NOT LIKE in condition pushdown

parent 39840baa
......@@ -5907,6 +5907,8 @@ void ndb_serialize_cond(const Item *item, void *arg)
break;
}
}
DBUG_PRINT("info", ("Was not expecting field of type %u",
field->result_type()));
context->supported= FALSE;
break;
}
......@@ -6009,14 +6011,16 @@ void ndb_serialize_cond(const Item *item, void *arg)
DBUG_PRINT("info", ("LIKE_FUNC"));
curr_cond->ndb_item= new Ndb_item(func_item->functype());
context->expect(Item::STRING_ITEM);
context->supported= FALSE; // Currently not supported
context->expect(Item::FIELD_ITEM);
context->expect_field_result(STRING_RESULT);
break;
}
case(Item_func::NOTLIKE_FUNC): {
DBUG_PRINT("info", ("NOTLIKE_FUNC"));
curr_cond->ndb_item= new Ndb_item(func_item->functype());
context->expect(Item::STRING_ITEM);
context->supported= FALSE; // Currently not supported
context->expect(Item::FIELD_ITEM);
context->expect_field_result(STRING_RESULT);
break;
}
case(Item_func::ISNULL_FUNC): {
......@@ -6039,6 +6043,13 @@ void ndb_serialize_cond(const Item *item, void *arg)
context->expect_field_result(DECIMAL_RESULT);
break;
}
case(Item_func::NOT_FUNC): {
DBUG_PRINT("info", ("NOT_FUNC"));
curr_cond->ndb_item= new Ndb_item(func_item->functype());
context->expect(Item::FUNC_ITEM);
context->expect(Item::COND_ITEM);
break;
}
case(Item_func::UNKNOWN_FUNC): {
DBUG_PRINT("info", ("UNKNOWN_FUNC %s",
func_item->const_item()?"const":""));
......@@ -6293,7 +6304,8 @@ ha_ndbcluster::serialize_cond(const COND *cond, Ndb_cond_stack *ndb_cond)
int
ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
NdbScanFilter *filter)
NdbScanFilter *filter,
bool negated)
{
DBUG_ENTER("build_scan_filter_predicate");
switch(cond->ndb_item->type) {
......@@ -6301,7 +6313,10 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
if (!cond->next)
break;
Ndb_item *a= cond->next->ndb_item;
switch(cond->ndb_item->qualification.function_type) {
switch((negated) ?
Ndb_item::negate(cond->ndb_item->qualification.function_type)
: cond->ndb_item->qualification.function_type)
{
case(Item_func::EQ_FUNC): {
if (!cond->next->next)
break;
......@@ -6511,14 +6526,13 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
// Save value in right format for the field type
value->save_in_field(field);
DBUG_PRINT("info", ("Generating LIKE filter: like(%d,%s,%d)",
field->get_field_no(), field->get_val(),
field->pack_length()));
/*
if (filter->like(field->get_field_no(),
field->get_field_no(), value->get_val(),
value->pack_length()));
if (filter->cmp(NdbScanFilter::COND_LIKE,
field->get_field_no(),
field->get_val(),
field->pack_length()) == -1)
DBUG_RETURN(1);
*/
cond= cond->next->next->next;
DBUG_RETURN(0);
}
......@@ -6539,13 +6553,13 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
// Save value in right format for the field type
value->save_in_field(field);
DBUG_PRINT("info", ("Generating NOTLIKE filter: notlike(%d,%s,%d)",
field->get_field_no(), field->get_val(),
field->pack_length()));
/*
if (filter->notlike(field->get_field_no(),
field->get_val(), field->pack_length()) == -1)
field->get_field_no(), value->get_val(),
value->pack_length()));
if (filter->cmp(NdbScanFilter::COND_NOT_LIKE,
field->get_field_no(),
field->get_val(),
field->pack_length()) == -1)
DBUG_RETURN(1);
*/
cond= cond->next->next->next;
DBUG_RETURN(0);
}
......@@ -6581,7 +6595,8 @@ ha_ndbcluster::build_scan_filter_predicate(Ndb_cond * &cond,
}
int
ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter)
ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter,
bool negated)
{
DBUG_ENTER("build_scan_filter_group");
if (!cond) DBUG_RETURN(1);
......@@ -6589,8 +6604,9 @@ ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter)
case(NDB_FUNCTION):
switch(cond->ndb_item->qualification.function_type) {
case(Item_func::COND_AND_FUNC): {
DBUG_PRINT("info", ("Generating AND group"));
if (filter->begin(NdbScanFilter::AND) == -1)
DBUG_PRINT("info", ("Generating %s group", (negated)?"NAND":"AND"));
if ((negated) ? filter->begin(NdbScanFilter::NAND)
: filter->begin(NdbScanFilter::AND) == -1)
DBUG_RETURN(1);
cond= cond->next;
do
......@@ -6601,12 +6617,13 @@ ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter)
if (cond) cond= cond->next;
if (filter->end() == -1)
DBUG_RETURN(1);
DBUG_PRINT("info", ("End of AND group"));
DBUG_PRINT("info", ("End of %s group", (negated)?"NAND":"AND"));
break;
}
case(Item_func::COND_OR_FUNC): {
DBUG_PRINT("info", ("Generating OR group"));
if (filter->begin(NdbScanFilter::OR) == -1)
DBUG_PRINT("info", ("Generating % group", (negated)?"NOR":"OR"));
if ((negated) ? filter->begin(NdbScanFilter::OR)
: filter->begin(NdbScanFilter::OR) == -1)
DBUG_RETURN(1);
cond= cond->next;
do
......@@ -6617,11 +6634,16 @@ ha_ndbcluster::build_scan_filter_group(Ndb_cond* &cond, NdbScanFilter *filter)
if (cond) cond= cond->next;
if (filter->end() == -1)
DBUG_RETURN(1);
DBUG_PRINT("info", ("End of OR group"));
DBUG_PRINT("info", ("End of %s group", (negated)?"NOR":"OR"));
break;
}
case(Item_func::NOT_FUNC): {
cond= cond->next;
build_scan_filter_group(cond, filter, true);
break;
}
default:
if (build_scan_filter_predicate(cond, filter))
if (build_scan_filter_predicate(cond, filter, negated))
DBUG_RETURN(1);
}
break;
......
......@@ -87,6 +87,27 @@ typedef union ndb_item_value {
NDB_ITEM_FIELD_VALUE *field_value;
} NDB_ITEM_VALUE;
struct negated_function_mapping
{
Item_func::Functype pos_fun;
Item_func::Functype neg_fun;
};
static const negated_function_mapping neg_map[]=
{
{Item_func::EQ_FUNC, Item_func::NE_FUNC},
{Item_func::NE_FUNC, Item_func::EQ_FUNC},
{Item_func::LT_FUNC, Item_func::GE_FUNC},
{Item_func::LE_FUNC, Item_func::GT_FUNC},
{Item_func::GT_FUNC, Item_func::LE_FUNC},
{Item_func::GE_FUNC, Item_func::LT_FUNC},
{Item_func::LIKE_FUNC, Item_func::NOTLIKE_FUNC},
{Item_func::NOTLIKE_FUNC, Item_func::LIKE_FUNC},
{Item_func::ISNULL_FUNC, Item_func::ISNOTNULL_FUNC},
{Item_func::ISNOTNULL_FUNC, Item_func::ISNULL_FUNC},
{Item_func::UNKNOWN_FUNC, Item_func::NOT_FUNC}
};
/*
This class is used for serialization of the Item tree for
condition pushdown. It is stored in a linked list implemented
......@@ -141,6 +162,10 @@ class Ndb_item {
uint32 pack_length()
{
switch(type) {
case(NDB_VALUE):
if(qualification.value_type == Item::STRING_ITEM)
return value.item->str_value.length();
break;
case(NDB_FIELD):
return value.field_value->field->pack_length();
default:
......@@ -149,9 +174,27 @@ class Ndb_item {
return 0;
};
Field * get_field() { return value.field_value->field; };
int get_field_no() { return value.field_value->column_no; };
char* get_val() { return value.field_value->field->ptr; };
const char* get_val()
{
switch(type) {
case(NDB_VALUE):
if(qualification.value_type == Item::STRING_ITEM)
return value.item->str_value.ptr();
break;
case(NDB_FIELD):
return value.field_value->field->ptr;
default:
break;
}
return NULL;
};
void save_in_field(Ndb_item *field_item)
{
Field *field = field_item->value.field_value->field;
......@@ -159,7 +202,17 @@ class Ndb_item {
if (item && field)
((Item *)item)->save_in_field(field, false);
}
};
static Item_func::Functype negate(Item_func::Functype fun)
{
uint i;
for (i=0;
fun != neg_map[i].pos_fun &&
neg_map[i].pos_fun != Item_func::UNKNOWN_FUNC;
i++);
return neg_map[i].neg_fun;
};
NDB_ITEM_TYPE type;
NDB_ITEM_QUALIFICATION qualification;
......@@ -478,9 +531,11 @@ private:
void cond_clear();
bool serialize_cond(const COND *cond, Ndb_cond_stack *ndb_cond);
int build_scan_filter_predicate(Ndb_cond* &cond,
NdbScanFilter* filter);
NdbScanFilter* filter,
bool negated= false);
int build_scan_filter_group(Ndb_cond* &cond,
NdbScanFilter* filter);
NdbScanFilter* filter,
bool negated= false);
int build_scan_filter(Ndb_cond* &cond, NdbScanFilter* filter);
int generate_scan_filter(Ndb_cond_stack* cond_stack,
NdbScanOperation* op);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment