Commit 332b47d7 authored by Sergey Petrunya

Backport of:

revno: 3363.3.16
revision-id: jorgen.loland@oracle.com-20110506132631-5wickj6dvrh1dpj6
parent: alexander.nozdrin@oracle.com-20110506132138-46459va9vcbd4nz0
committer: Jorgen Loland <jorgen.loland@oracle.com>
branch nick: mysql-trunk-11765831
timestamp: Fri 2011-05-06 15:26:31 +0200
message:
  BUG#11765831: 'RANGE ACCESS' MAY INCORRECTLY FILTER
                AWAY QUALIFYING ROWS

  Preparation patch (does not include fix for the bug):

   * Extensively document key_or()
   * Remove tab indentations from key_or()
   * Minor code changes like using existing utility functions
     in key_or()
parent 21bfae66
...@@ -8698,7 +8698,7 @@ key_or(RANGE_OPT_PARAM *param, SEL_ARG *key1,SEL_ARG *key2) ...@@ -8698,7 +8698,7 @@ key_or(RANGE_OPT_PARAM *param, SEL_ARG *key1,SEL_ARG *key2)
{ {
key1->free_tree(); key1->free_tree();
key2->free_tree(); key2->free_tree();
return 0; // Can't optimize this return 0; // Can't optimize this
} }
// If one of the key is MAYBE_KEY then the found region may be bigger // If one of the key is MAYBE_KEY then the found region may be bigger
...@@ -8722,248 +8722,495 @@ key_or(RANGE_OPT_PARAM *param, SEL_ARG *key1,SEL_ARG *key2) ...@@ -8722,248 +8722,495 @@ key_or(RANGE_OPT_PARAM *param, SEL_ARG *key1,SEL_ARG *key2)
swap_variables(SEL_ARG *,key1,key2); swap_variables(SEL_ARG *,key1,key2);
} }
if (key1->use_count > 0 && !(key1=key1->clone_tree(param))) if (key1->use_count > 0 && !(key1=key1->clone_tree(param)))
return 0; // OOM return 0; // OOM
} }
// Add tree at key2 to tree at key1 // Add tree at key2 to tree at key1
bool key2_shared=key2->use_count != 0; bool key2_shared=key2->use_count != 0;
key1->maybe_flag|=key2->maybe_flag; key1->maybe_flag|=key2->maybe_flag;
/*
Notation for illustrations used in the rest of this function:
Range: [--------]
^ ^
start stop
Two overlapping ranges:
[-----] [----] [--]
[---] or [---] or [-------]
Ambiguity: ***
The range starts or stops somewhere in the "***" range.
Example: a starts before b and may end before/at the same place as/after b
a: [----***]
b: [---]
Adjacent ranges:
Ranges that meet but do not overlap. Example: a = "x < 3", b = "x >= 3"
a: ----]
b: [----
*/
uint max_part_no= max(key1->max_part_no, key2->max_part_no); uint max_part_no= max(key1->max_part_no, key2->max_part_no);
for (key2=key2->first(); key2; ) for (key2=key2->first(); key2; )
{ {
SEL_ARG *tmp=key1->find_range(key2); // Find key1.min <= key2.min /*
int cmp; key1 consists of one or more ranges. tmp is the range currently
being handled.
Initialize tmp to the last range in key1 that starts at the same
place as, or before, the start of the range in key2
key2: [------]
key1: [---] [-----] [----]
^
tmp
*/
SEL_ARG *tmp=key1->find_range(key2);
/*
Used to describe how two key values are positioned compared to
each other. Consider key_value_a.<cmp_func>(key_value_b):
-2: key_value_a is smaller than key_value_b, and they are adjacent
-1: key_value_a is smaller than key_value_b (not adjacent)
0: the key values are equal
1: key_value_a is bigger than key_value_b (not adjacent)
2: key_value_a is bigger than key_value_b, and they are adjacent
Example: "cmp= tmp->cmp_max_to_min(key2)"
key2: [-------- (10 <= x ...)
tmp: -----] (... x < 10) => cmp==-2
tmp: ----] (... x <= 9) => cmp==-1
tmp: ------] (... x = 10) => cmp== 0
tmp: --------] (... x <= 12) => cmp== 1
(cmp == 2 does not make sense for cmp_max_to_min())
*/
int cmp= 0;
if (!tmp) if (!tmp)
{ {
tmp=key1->first(); // tmp.min > key2.min /*
The range in key2 starts before the first range in key1. Use
the first range in key1 as tmp.
key2: [--------]
key1: [****--] [----] [-------]
^
tmp
*/
tmp=key1->first();
cmp= -1; cmp= -1;
} }
else if ((cmp=tmp->cmp_max_to_min(key2)) < 0) else if ((cmp= tmp->cmp_max_to_min(key2)) < 0)
{ // Found tmp.max < key2.min {
/*
This is the case:
key2: [-------]
tmp: [----**]
*/
SEL_ARG *next=tmp->next; SEL_ARG *next=tmp->next;
/* key1 on the left of key2 non-overlapping */
if (cmp == -2 && eq_tree(tmp->next_key_part,key2->next_key_part)) if (cmp == -2 && eq_tree(tmp->next_key_part,key2->next_key_part))
{ {
// Join near ranges like tmp.max < 0 and key2.min >= 0 /*
SEL_ARG *key2_next=key2->next; Adjacent (cmp==-2) and equal next_key_parts => ranges can be merged
if (key2_shared)
{ This is the case:
if (!(key2=new SEL_ARG(*key2))) key2: [-------]
return 0; // out of memory tmp: [----]
key2->increment_use_count(key1->use_count+1);
key2->next=key2_next; // New copy of key2 Result:
} key2: [-------------] => inserted into key1 below
key2->copy_min(tmp); tmp: => deleted
if (!(key1=key1->tree_delete(tmp))) */
{ // Only one key in tree SEL_ARG *key2_next=key2->next;
key1=key2; if (key2_shared)
key1->make_root(); {
key2=key2_next; if (!(key2=new SEL_ARG(*key2)))
break; return 0; // out of memory
} key2->increment_use_count(key1->use_count+1);
key2->next=key2_next; // New copy of key2
}
key2->copy_min(tmp);
if (!(key1=key1->tree_delete(tmp)))
{ // Only one key in tree
key1=key2;
key1->make_root();
key2=key2_next;
break;
}
} }
if (!(tmp=next)) // tmp.min > key2.min if (!(tmp=next)) // Move to next range in key1. Now tmp.min > key2.min
break; // Copy rest of key2 break; // No more ranges in key1. Copy rest of key2
} }
if (cmp < 0) if (cmp < 0)
{ // tmp.min > key2.min {
/*
This is the case:
key2: [--***]
tmp: [----]
*/
int tmp_cmp; int tmp_cmp;
if ((tmp_cmp=tmp->cmp_min_to_max(key2)) > 0) // if tmp.min > key2.max if ((tmp_cmp=tmp->cmp_min_to_max(key2)) > 0)
{ {
/* tmp is on the right of key2 non-overlapping */ /*
if (tmp_cmp == 2 && eq_tree(tmp->next_key_part,key2->next_key_part)) This is the case:
{ // ranges are connected key2: [------**]
tmp->copy_min_to_min(key2); tmp: [----]
key1->merge_flags(key2); */
if (tmp->min_flag & NO_MIN_RANGE && if (tmp_cmp == 2 && eq_tree(tmp->next_key_part,key2->next_key_part))
tmp->max_flag & NO_MAX_RANGE) {
{ /*
if (key1->maybe_flag) Adjacent ranges with equal next_key_part. Merge like this:
return new SEL_ARG(SEL_ARG::MAYBE_KEY);
return 0; This is the case:
} key2: [------]
key2->increment_use_count(-1); // Free not used tree tmp: [-----]
key2=key2->next;
continue; Result:
} key2: [------]
else tmp: [-------------]
{
SEL_ARG *next=key2->next; // Keys are not overlapping Then move on to next key2 range.
if (key2_shared) */
{ tmp->copy_min_to_min(key2);
SEL_ARG *cpy= new SEL_ARG(*key2); // Must make copy key1->merge_flags(key2);
if (!cpy) if (tmp->min_flag & NO_MIN_RANGE &&
return 0; // OOM tmp->max_flag & NO_MAX_RANGE)
key1=key1->insert(cpy); {
key2->increment_use_count(key1->use_count+1); if (key1->maybe_flag)
} return new SEL_ARG(SEL_ARG::MAYBE_KEY);
else return 0;
key1=key1->insert(key2); // Will destroy key2_root }
key2=next; key2->increment_use_count(-1); // Free not used tree
continue; key2=key2->next;
} continue;
}
else
{
/*
key2 not adjacent to tmp or has different next_key_part.
Insert into key1 and move to next range in key2
This is the case:
key2: [------**]
tmp: [----]
Result:
key1: [------**][----]
^ ^
insert tmp
*/
SEL_ARG *next=key2->next;
if (key2_shared)
{
SEL_ARG *cpy= new SEL_ARG(*key2); // Must make copy
if (!cpy)
return 0; // OOM
key1=key1->insert(cpy);
key2->increment_use_count(key1->use_count+1);
}
else
key1=key1->insert(key2); // Will destroy key2_root
key2=next;
continue;
}
} }
} }
/* /*
tmp.min >= key2.min && tmp.min <= key.max (overlapping ranges) The ranges in tmp and key2 are overlapping:
key2.min <= tmp.min <= key2.max
*/ key2: [----------]
tmp: [*****-----*****]
Corollary: tmp.min <= key2.max
*/
if (eq_tree(tmp->next_key_part,key2->next_key_part)) if (eq_tree(tmp->next_key_part,key2->next_key_part))
{ {
// Merge overlapping ranges with equal next_key_part
if (tmp->is_same(key2)) if (tmp->is_same(key2))
{ {
/* /*
Found exact match of key2 inside key1. Found exact match of key2 inside key1.
Use the relevant range in key1. Use the relevant range in key1.
*/ */
tmp->merge_flags(key2); // Copy maybe flags tmp->merge_flags(key2); // Copy maybe flags
key2->increment_use_count(-1); // Free not used tree key2->increment_use_count(-1); // Free not used tree
} }
else else
{ {
SEL_ARG *last=tmp; SEL_ARG *last= tmp;
SEL_ARG *first=tmp; SEL_ARG *first= tmp;
/*
Find the last range in tmp that overlaps key2 and has the same /*
condition on the rest of the keyparts. Find the last range in key1 that overlaps key2 and
where all ranges first...last have the same next_key_part as
key2.
key2: [****----------------------*******]
key1: [--] [----] [---] [-----] [xxxx]
^ ^ ^
first last different next_key_part
Since key2 covers them, the ranges between first and last
are merged into one range by deleting first...last-1 from
the key1 tree. In the figure, this applies to first and the
two consecutive ranges. The range of last is then extended:
* last.min: Set to min(key2.min, first.min)
* last.max: If there is a last->next that overlaps key2 (i.e.,
last->next has a different next_key_part):
Set adjacent to last->next.min
Otherwise: Set to max(key2.max, last.max)
Result:
key2: [****----------------------*******]
[--] [----] [---] => deleted from key1
key1: [**------------------------***][xxxx]
^ ^
tmp=last different next_key_part
*/ */
while (last->next && last->next->cmp_min_to_max(key2) <= 0 && while (last->next && last->next->cmp_min_to_max(key2) <= 0 &&
eq_tree(last->next->next_key_part,key2->next_key_part)) eq_tree(last->next->next_key_part,key2->next_key_part))
{ {
/* /*
We've found the last overlapping key1 range in last. last->next is covered by key2 and has same next_key_part.
This means that the ranges between (and including) the last can be deleted
first overlapping range (tmp) and the last overlapping range
(last) are fully nested into the current range of key2
and can safely be discarded. We just need the minimum endpoint
of the first overlapping range (tmp) so we can compare it with
the minimum endpoint of the enclosing key2 range.
*/ */
SEL_ARG *save=last; SEL_ARG *save=last;
last=last->next; last=last->next;
key1=key1->tree_delete(save); key1=key1->tree_delete(save);
} }
// Redirect tmp to last which will cover the entire range
tmp= last;
/* /*
The tmp range (the first overlapping range) could have been discarded We need the minimum endpoint of first so we can compare it
by the previous loop. We should re-direct tmp to the new united range with the minimum endpoint of the enclosing key2 range.
that's taking its place.
*/ */
tmp= last;
last->copy_min(first); last->copy_min(first);
bool full_range= last->copy_min(key2); bool full_range= last->copy_min(key2);
if (!full_range) if (!full_range)
{ {
if (last->next && key2->cmp_max_to_min(last->next) >= 0) if (last->next && key2->cmp_max_to_min(last->next) >= 0)
{ {
last->max_value= last->next->min_value; /*
if (last->next->min_flag & NEAR_MIN) This is the case:
last->max_flag&= ~NEAR_MAX; key2: [-------------]
else key1: [***------] [xxxx]
last->max_flag|= NEAR_MAX; ^ ^
last different next_key_part
Extend range of last up to last->next:
key2: [-------------]
key1: [***--------][xxxx]
*/
last->copy_min_to_max(last->next);
} }
else else
/*
This is the case:
key2: [--------*****]
key1: [***---------] [xxxx]
^ ^
last different next_key_part
Extend range of last up to max(last.max, key2.max):
key2: [--------*****]
key1: [***----------**] [xxxx]
*/
full_range= last->copy_max(key2); full_range= last->copy_max(key2);
} }
if (full_range) if (full_range)
{ // Full range { // Full range
key1->free_tree(); key1->free_tree();
for (; key2 ; key2=key2->next) for (; key2 ; key2=key2->next)
key2->increment_use_count(-1); // Free not used tree key2->increment_use_count(-1); // Free not used tree
if (key1->maybe_flag) if (key1->maybe_flag)
return new SEL_ARG(SEL_ARG::MAYBE_KEY); return new SEL_ARG(SEL_ARG::MAYBE_KEY);
return 0; return 0;
} }
} }
} }
if (cmp >= 0 && tmp->cmp_min_to_min(key2) < 0) if (cmp >= 0 && tmp->cmp_min_to_min(key2) < 0)
{ // tmp.min <= x < key2.min {
/*
This is the case ("cmp>=0" means that tmp.max >= key2.min):
key2: [----]
tmp: [------------*****]
The ranges are overlapping but have not been merged because
next_key_part of tmp and key2 are different
Result:
key2: [----]
key1: [--------][--*****]
^ ^
insert tmp
*/
SEL_ARG *new_arg=tmp->clone_first(key2); SEL_ARG *new_arg=tmp->clone_first(key2);
if (!new_arg) if (!new_arg)
return 0; // OOM return 0; // OOM
if ((new_arg->next_key_part= key1->next_key_part)) if ((new_arg->next_key_part= key1->next_key_part))
new_arg->increment_use_count(key1->use_count+1); new_arg->increment_use_count(key1->use_count+1);
tmp->copy_min_to_min(key2); tmp->copy_min_to_min(key2);
key1=key1->insert(new_arg); key1=key1->insert(new_arg);
} } // tmp.min >= key2.min due to this if()
// tmp.min >= key2.min && tmp.min <= key2.max /*
SEL_ARG key(*key2); // Get copy we can modify Now key2.min <= tmp.min <= key2.max:
key2: [---------]
tmp: [****---*****]
*/
SEL_ARG key2_cpy(*key2); // Get copy we can modify
for (;;) for (;;)
{ {
if (tmp->cmp_min_to_min(&key) > 0) if (tmp->cmp_min_to_min(&key2_cpy) > 0)
{ // key.min <= x < tmp.min {
SEL_ARG *new_arg=key.clone_first(tmp); /*
if (!new_arg) This is the case:
return 0; // OOM key2_cpy: [------------]
if ((new_arg->next_key_part=key.next_key_part)) key1: [-*****]
new_arg->increment_use_count(key1->use_count+1); ^
key1=key1->insert(new_arg); tmp
}
if ((cmp=tmp->cmp_max_to_max(&key)) <= 0) Result:
{ // tmp.min. <= x <= tmp.max key2_cpy: [---]
tmp->maybe_flag|= key.maybe_flag; key1: [-------][-*****]
key.increment_use_count(key1->use_count+1); ^ ^
tmp->next_key_part= key_or(param, tmp->next_key_part, key.next_key_part); insert tmp
if (!cmp) // Key2 is ready */
break; SEL_ARG *new_arg=key2_cpy.clone_first(tmp);
key.copy_max_to_min(tmp); if (!new_arg)
if (!(tmp=tmp->next)) return 0; // OOM
{ if ((new_arg->next_key_part=key2_cpy.next_key_part))
SEL_ARG *tmp2= new SEL_ARG(key); new_arg->increment_use_count(key1->use_count+1);
if (!tmp2) key1=key1->insert(new_arg);
return 0; // OOM key2_cpy.copy_min_to_min(tmp);
key1=key1->insert(tmp2); }
key2=key2->next; // Now key2_cpy.min == tmp.min
goto end;
} if ((cmp= tmp->cmp_max_to_max(&key2_cpy)) <= 0)
if (tmp->cmp_min_to_max(&key) > 0) {
{ /*
SEL_ARG *tmp2= new SEL_ARG(key); tmp.max <= key2_cpy.max:
if (!tmp2) key2_cpy: a) [-------] or b) [----]
return 0; // OOM tmp: [----] [----]
key1=key1->insert(tmp2);
break; Steps:
} 1) Update next_key_part of tmp: OR it with key2_cpy->next_key_part.
2) If case a: Insert range [tmp.max, key2_cpy.max] into key1 using
next_key_part of key2_cpy
Result:
key1: a) [----][-] or b) [----]
*/
tmp->maybe_flag|= key2_cpy.maybe_flag;
key2_cpy.increment_use_count(key1->use_count+1);
tmp->next_key_part= key_or(param, tmp->next_key_part,
key2_cpy.next_key_part);
if (!cmp)
break; // case b: done with this key2 range
// Make key2_cpy the range [tmp.max, key2_cpy.max]
key2_cpy.copy_max_to_min(tmp);
if (!(tmp=tmp->next))
{
/*
No more ranges in key1. Insert key2_cpy and go to "end"
label to insert remaining ranges in key2 if any.
*/
SEL_ARG *tmp2= new SEL_ARG(key2_cpy);
if (!tmp2)
return 0; // OOM
key1=key1->insert(tmp2);
key2=key2->next;
goto end;
}
if (tmp->cmp_min_to_max(&key2_cpy) > 0)
{
/*
The next range in key1 does not overlap with key2_cpy.
Insert this range into key1 and move on to the next range
in key2.
*/
SEL_ARG *tmp2= new SEL_ARG(key2_cpy);
if (!tmp2)
return 0; // OOM
key1=key1->insert(tmp2);
break;
}
/*
key2_cpy overlaps with the next range in key1 and the case
is now "key2.min <= tmp.min <= key2.max". Go back to for(;;)
to handle this situation.
*/
continue;
} }
else else
{ {
SEL_ARG *new_arg=tmp->clone_last(&key); // tmp.min <= x <= key.max /*
if (!new_arg) This is the case:
return 0; // OOM key2_cpy: [-------]
tmp->copy_max_to_min(&key); tmp: [------------]
tmp->increment_use_count(key1->use_count+1);
/* Increment key count as it may be used for next loop */ Result:
key.increment_use_count(1); key1: [-------][---]
new_arg->next_key_part= key_or(param, tmp->next_key_part, key.next_key_part); ^ ^
key1=key1->insert(new_arg); new_arg tmp
break; Steps:
1) Make new_arg with range [tmp.min, key2_cpy.max].
new_arg->next_key_part is OR between next_key_part
of tmp and key2_cpy
2) Make tmp the range [key2_cpy.max, tmp.max]
3) Insert new_arg into key1
*/
SEL_ARG *new_arg=tmp->clone_last(&key2_cpy);
if (!new_arg)
return 0; // OOM
tmp->copy_max_to_min(&key2_cpy);
tmp->increment_use_count(key1->use_count+1);
/* Increment key count as it may be used for next loop */
key2_cpy.increment_use_count(1);
new_arg->next_key_part= key_or(param, tmp->next_key_part,
key2_cpy.next_key_part);
key1=key1->insert(new_arg);
break;
} }
} }
key2=key2->next; // Move on to next range in key2
key2=key2->next;
} }
end: end:
/*
Add key2 ranges that are non-overlapping with and higher than the
highest range in key1.
*/
while (key2) while (key2)
{ {
SEL_ARG *next=key2->next; SEL_ARG *next=key2->next;
if (key2_shared) if (key2_shared)
{ {
SEL_ARG *tmp=new SEL_ARG(*key2); // Must make copy SEL_ARG *tmp=new SEL_ARG(*key2); // Must make copy
if (!tmp) if (!tmp)
return 0; return 0;
key2->increment_use_count(key1->use_count+1); key2->increment_use_count(key1->use_count+1);
key1=key1->insert(tmp); key1=key1->insert(tmp);
} }
else else
key1=key1->insert(key2); // Will destroy key2_root key1=key1->insert(key2); // Will destroy key2_root
key2=next; key2=next;
} }
key1->use_count++; key1->use_count++;
key1->max_part_no= max_part_no; key1->max_part_no= max_part_no;
return key1; return key1;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment