Commit 43546e7a authored by monty@mysql.com's avatar monty@mysql.com

Moved reading of ranges from opt_range.cc to handler.cc

This gives the handler more optimization possiblities and is needed for NDB cluster
Fixed not-initialized memory error detected by valgrind
parent e94d9352
......@@ -670,7 +670,7 @@ report_stats () {
$ECHO "The log files in $MY_LOG_DIR may give you some hint"
$ECHO "of what when wrong."
$ECHO "If you want to report this error, please read first the documentation at"
$ECHO "http://www.mysql.com/doc/M/y/MySQL_test_suite.html"
$ECHO "http://www.mysql.com/doc/en/MySQL_test_suite.html"
fi
if test -z "$USE_RUNNING_SERVER"
......
......@@ -750,3 +750,10 @@ analyze table t1;
Table Op Msg_type Msg_text
test.t1 analyze status OK
drop table t1;
CREATE TABLE t1 (
fid INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
g GEOMETRY NOT NULL,
SPATIAL KEY(g)
) ENGINE=MyISAM;
INSERT INTO t1 (g) VALUES (GeomFromText('LineString(1 2, 2 3)')),(GeomFromText('LineString(1 2, 2 4)'));
drop table t1;
......@@ -76,6 +76,8 @@ delete from mysql.db where user='mysqltest_1';
delete from mysql.tables_priv where user='mysqltest_1';
delete from mysql.columns_priv where user='mysqltest_1';
flush privileges;
show grants for mysqltest_1@localhost;
ERROR 42000: There is no such grant defined for user 'mysqltest_1' on host 'localhost'
create table t1 (a int);
GRANT select,update,insert on t1 to mysqltest_1@localhost;
GRANT select (a), update (a),insert(a), references(a) on t1 to mysqltest_1@localhost;
......
......@@ -103,3 +103,16 @@ check table t1;
analyze table t1;
drop table t1;
#
# The following crashed gis
#
CREATE TABLE t1 (
fid INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
g GEOMETRY NOT NULL,
SPATIAL KEY(g)
) ENGINE=MyISAM;
INSERT INTO t1 (g) VALUES (GeomFromText('LineString(1 2, 2 3)')),(GeomFromText('LineString(1 2, 2 4)'));
#select * from t1 where g<GeomFromText('LineString(1 2, 2 3)');
drop table t1;
......@@ -53,6 +53,8 @@ delete from mysql.db where user='mysqltest_1';
delete from mysql.tables_priv where user='mysqltest_1';
delete from mysql.columns_priv where user='mysqltest_1';
flush privileges;
--error 1141
show grants for mysqltest_1@localhost;
#
# Test what happens when you have same table and colum level grants
......
......@@ -1287,3 +1287,141 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache,
mi_change_key_cache(old_key_cache, new_key_cache);
return 0;
}
/*
Read first row between two ranges.
Store ranges for future calls to read_range_next
SYNOPSIS
read_range_first()
start_key Start key. Is 0 if no min range
end_key End key. Is 0 if no max range
sorted Set to 1 if result should be sorted per key
NOTES
Record is read into table->record[0]
RETURN
0 Found row
HA_ERR_END_OF_FILE No rows in range
# Error code
*/
int handler::read_range_first(const key_range *start_key,
const key_range *end_key,
bool sorted)
{
int result;
DBUG_ENTER("handler::read_range_first");
end_range= 0;
if (end_key)
{
end_range= &save_end_range;
save_end_range= *end_key;
key_compare_result_on_equal= ((end_key->flag == HA_READ_BEFORE_KEY) ? 1 :
(end_key->flag == HA_READ_AFTER_KEY) ? -1 : 0);
}
range_key_part= table->key_info[active_index].key_part;
if (!start_key) // Read first record
result= index_first(table->record[0]);
else
result= index_read(table->record[0],
start_key->key,
start_key->length,
start_key->flag);
if (result)
DBUG_RETURN((result == HA_ERR_KEY_NOT_FOUND ||
result == HA_ERR_END_OF_FILE) ? HA_ERR_END_OF_FILE :
result);
DBUG_RETURN (compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
}
/*
Read next row between two ranges.
SYNOPSIS
read_range_next()
eq_range Set to 1 if start_key == end_key
NOTES
Record is read into table->record[0]
RETURN
0 Found row
HA_ERR_END_OF_FILE No rows in range
# Error code
*/
int handler::read_range_next(bool eq_range)
{
int result;
DBUG_ENTER("handler::read_range_next");
if (eq_range)
result= index_next_same(table->record[0],
end_range->key,
end_range->length);
else
result= index_next(table->record[0]);
if (result)
DBUG_RETURN(result);
DBUG_RETURN(compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
}
/*
Compare if found key is over max-value
SYNOPSIS
compare_key
range key to compare to row
NOTES
For this to work, the row must be stored in table->record[0]
RETURN
0 Key is equal to range or 'range' == 0 (no range)
-1 Key is less than range
1 Key is larger than range
*/
int handler::compare_key(key_range *range)
{
KEY_PART_INFO *key_part= range_key_part;
uint store_length;
if (!range)
return 0; // No max range
for (const char *key=range->key, *end=key+range->length;
key < end;
key+= store_length, key_part++)
{
int cmp;
store_length= key_part->store_length;
if (key_part->null_bit)
{
if (*key)
{
if (!key_part->field->is_null())
return 1;
continue;
}
else if (key_part->field->is_null())
return 0;
key++; // Skip null byte
store_length--;
}
if ((cmp=key_part->field->key_cmp((byte*) key, key_part->length)) < 0)
return -1;
if (cmp > 0)
return 1;
}
return key_compare_result_on_equal;
}
......@@ -204,6 +204,14 @@ typedef struct st_ha_check_opt
} HA_CHECK_OPT;
typedef struct st_key_range
{
const byte *key;
uint length;
enum ha_rkey_function flag;
} key_range;
class handler :public Sql_alloc
{
protected:
......@@ -225,6 +233,12 @@ public:
time_t create_time; /* When table was created */
time_t check_time;
time_t update_time;
/* The following are for read_range() */
key_range save_end_range, *end_range;
KEY_PART_INFO *range_key_part;
int key_compare_result_on_equal;
uint errkey; /* Last dup key */
uint sortkey, key_used_on_scan;
uint active_index;
......@@ -236,6 +250,7 @@ public:
bool auto_increment_column_changed;
bool implicit_emptied; /* Can be !=0 only if HEAP */
handler(TABLE *table_arg) :table(table_arg),
ref(0), data_file_length(0), max_data_file_length(0), index_file_length(0),
delete_length(0), auto_increment_value(0),
......@@ -285,6 +300,11 @@ public:
{
return (my_errno=HA_ERR_WRONG_COMMAND);
}
virtual int handler::read_range_first(const key_range *start_key,
const key_range *end_key,
bool sorted);
virtual int handler::read_range_next(bool eq_range);
int handler::compare_key(key_range *range);
virtual int ft_init()
{ return -1; }
virtual FT_INFO *ft_init_ext(uint flags,uint inx,const byte *key, uint keylen)
......
......@@ -646,6 +646,7 @@ int SQL_SELECT::test_quick_select(THD *thd, key_map keys_to_use,
MEM_ROOT *old_root,alloc;
SEL_TREE *tree;
KEY_PART *key_parts;
KEY *key_info;
PARAM param;
/* set up parameter that is passed to all functions */
......@@ -671,17 +672,17 @@ int SQL_SELECT::test_quick_select(THD *thd, key_map keys_to_use,
old_root=my_pthread_getspecific_ptr(MEM_ROOT*,THR_MALLOC);
my_pthread_setspecific_ptr(THR_MALLOC,&alloc);
for (idx=0 ; idx < head->keys ; idx++)
key_info= head->key_info;
for (idx=0 ; idx < head->keys ; idx++, key_info++)
{
KEY_PART_INFO *key_part_info;
if (!keys_to_use.is_set(idx))
continue;
KEY *key_info= &head->key_info[idx];
KEY_PART_INFO *key_part_info= key_info->key_part;
if (key_info->flags & HA_FULLTEXT)
continue; // ToDo: ft-keys in non-ft ranges, if possible SerG
param.key[param.keys]=key_parts;
key_part_info= key_info->key_part;
for (uint part=0 ; part < key_info->key_parts ;
part++, key_parts++, key_part_info++)
{
......@@ -2343,8 +2344,14 @@ get_quick_select(PARAM *param,uint idx,SEL_ARG *key_tree)
{
QUICK_SELECT *quick;
DBUG_ENTER("get_quick_select");
if ((quick=new QUICK_SELECT(param->thd, param->table,
param->real_keynr[idx])))
if (param->table->key_info[param->real_keynr[idx]].flags & HA_SPATIAL)
quick=new QUICK_SELECT_GEOM(param->thd, param->table, param->real_keynr[idx],
0);
else
quick=new QUICK_SELECT(param->thd, param->table, param->real_keynr[idx]);
if (quick)
{
if (quick->error ||
get_quick_keys(param,quick,param->key[idx],key_tree,param->min_key,0,
......@@ -2510,8 +2517,9 @@ static bool null_part_in_key(KEY_PART *key_part, const char *key, uint length)
return 0;
}
/****************************************************************************
** Create a QUICK RANGE based on a key
Create a QUICK RANGE based on a key
****************************************************************************/
QUICK_SELECT *get_quick_select_for_ref(THD *thd, TABLE *table, TABLE_REF *ref)
......@@ -2592,115 +2600,74 @@ int QUICK_SELECT::get_next()
for (;;)
{
int result;
key_range start_key, end_key;
if (range)
{ // Already read through key
result=((range->flag & (EQ_RANGE | GEOM_FLAG)) ?
file->index_next_same(record, (byte*) range->min_key,
range->min_length) :
file->index_next(record));
if (!result)
{
if ((range->flag & GEOM_FLAG) || !cmp_next(*it.ref()))
DBUG_RETURN(0);
}
else if (result != HA_ERR_END_OF_FILE)
// Already read through key
result= file->read_range_next(test(range->flag & EQ_RANGE));
if (result != HA_ERR_END_OF_FILE)
DBUG_RETURN(result);
}
if (!(range=it++))
if (!(range= it++))
DBUG_RETURN(HA_ERR_END_OF_FILE); // All ranges used
if (range->flag & GEOM_FLAG)
{
if ((result = file->index_read(record,
(byte*) (range->min_key),
range->min_length,
(ha_rkey_function)(range->flag ^
GEOM_FLAG))))
{
if (result != HA_ERR_KEY_NOT_FOUND)
DBUG_RETURN(result);
range=0; // Not found, to next range
continue;
}
DBUG_RETURN(0);
}
if (range->flag & NO_MIN_RANGE) // Read first record
{
int local_error;
if ((local_error=file->index_first(record)))
DBUG_RETURN(local_error); // Empty table
if (cmp_next(range) == 0)
DBUG_RETURN(0);
range=0; // No matching records; go to next range
continue;
}
if ((result = file->index_read(record,
(byte*) (range->min_key +
test(range->flag & GEOM_FLAG)),
range->min_length,
(range->flag & NEAR_MIN) ?
HA_READ_AFTER_KEY:
start_key.key= range->min_key;
start_key.length= range->min_length;
start_key.flag= ((range->flag & NEAR_MIN) ? HA_READ_AFTER_KEY :
(range->flag & EQ_RANGE) ?
HA_READ_KEY_EXACT :
HA_READ_KEY_OR_NEXT)))
HA_READ_KEY_EXACT : HA_READ_KEY_OR_NEXT);
end_key.key= range->max_key;
end_key.length= range->max_length;
/*
We use READ_AFTER_KEY here because if we are reading on a key
prefix we want to find all keys with this prefix
*/
end_key.flag= (range->flag & NEAR_MAX ? HA_READ_BEFORE_KEY :
HA_READ_AFTER_KEY);
{
if (result != HA_ERR_KEY_NOT_FOUND)
DBUG_RETURN(result);
range=0; // Not found, to next range
continue;
}
if (cmp_next(range) == 0)
{
result= file->read_range_first(range->min_length ? &start_key : 0,
range->max_length ? &end_key : 0,
sorted);
if (range->flag == (UNIQUE_RANGE | EQ_RANGE))
range=0; // Stop searching
DBUG_RETURN(0); // Found key is in range
}
range=0; // To next range
if (result != HA_ERR_END_OF_FILE)
DBUG_RETURN(result);
range=0; // No matching rows; go to next range
}
}
/*
Compare if found key is over max-value
Returns 0 if key <= range->max_key
*/
/* Get next for geometrical indexes */
int QUICK_SELECT::cmp_next(QUICK_RANGE *range_arg)
int QUICK_SELECT_GEOM::get_next()
{
if (range_arg->flag & NO_MAX_RANGE)
return 0; /* key can't be to large */
DBUG_ENTER(" QUICK_SELECT_GEOM::get_next");
KEY_PART *key_part=key_parts;
uint store_length;
for (char *key=range_arg->max_key, *end=key+range_arg->max_length;
key < end;
key+= store_length, key_part++)
{
int cmp;
store_length= key_part->store_length;
if (key_part->null_bit)
for (;;)
{
if (*key)
int result;
if (range)
{
if (!key_part->field->is_null())
return 1;
continue;
}
else if (key_part->field->is_null())
return 0;
key++; // Skip null byte
store_length--;
// Already read through key
result= file->index_next_same(record, (byte*) range->min_key,
range->min_length);
if (result != HA_ERR_END_OF_FILE)
DBUG_RETURN(result);
}
if ((cmp=key_part->field->key_cmp((byte*) key, key_part->length)) < 0)
return 0;
if (cmp > 0)
return 1;
if (!(range= it++))
DBUG_RETURN(HA_ERR_END_OF_FILE); // All ranges used
result= file->index_read(record,
(byte*) range->min_key,
range->min_length,
(ha_rkey_function)(range->flag ^ GEOM_FLAG));
if (result != HA_ERR_KEY_NOT_FOUND)
DBUG_RETURN(result);
range=0; // Not found, to next range
}
return (range_arg->flag & NEAR_MAX) ? 1 : 0; // Exact match
}
......@@ -2966,7 +2933,7 @@ print_key(KEY_PART *key_part,const char *key,uint used_length)
for (; key < key_end; key+=store_length, key_part++)
{
Field *field= key_part->field;
uint store_length= key_part->store_length;
store_length= key_part->store_length;
if (field->real_maybe_null())
{
......@@ -2975,7 +2942,7 @@ print_key(KEY_PART *key_part,const char *key,uint used_length)
fwrite("NULL",sizeof(char),4,DBUG_FILE);
continue;
}
key++;
key++; // Skip null byte
store_length--;
}
field->set_key_image((char*) key, key_part->length, field->charset());
......
......@@ -89,11 +89,20 @@ public:
int init() { return error=file->index_init(index); }
virtual int get_next();
virtual bool reverse_sorted() { return 0; }
int cmp_next(QUICK_RANGE *range);
bool unique_key_range();
};
class QUICK_SELECT_GEOM: public QUICK_SELECT
{
public:
QUICK_SELECT_GEOM(THD *thd, TABLE *table, uint index_arg, bool no_alloc)
:QUICK_SELECT(thd, table, index_arg, no_alloc)
{};
virtual int get_next();
};
class QUICK_SELECT_DESC: public QUICK_SELECT
{
public:
......
......@@ -3744,7 +3744,8 @@ make_join_readinfo(JOIN *join, uint options)
table->key_read=1;
table->file->extra(HA_EXTRA_KEYREAD);
}
else if (!table->used_keys.is_clear_all() && ! (tab->select && tab->select->quick))
else if (!table->used_keys.is_clear_all() &&
!(tab->select && tab->select->quick))
{ // Only read index tree
tab->index=find_shortest_key(table, & table->used_keys);
tab->table->file->index_init(tab->index);
......@@ -6905,6 +6906,7 @@ static int test_if_order_by_key(ORDER *order, TABLE *table, uint idx,
key_part_end=key_part+table->key_info[idx].key_parts;
key_part_map const_key_parts=table->const_key_parts[idx];
int reverse=0;
DBUG_ENTER("test_if_order_by_key");
for (; order ; order=order->next, const_key_parts>>=1)
{
......@@ -6915,25 +6917,24 @@ static int test_if_order_by_key(ORDER *order, TABLE *table, uint idx,
Skip key parts that are constants in the WHERE clause.
These are already skipped in the ORDER BY by const_expression_in_where()
*/
while (const_key_parts & 1)
{
key_part++; const_key_parts>>=1;
}
for (; const_key_parts & 1 ; const_key_parts>>= 1)
key_part++;
if (key_part == key_part_end || key_part->field != field)
return 0;
DBUG_RETURN(0);
/* set flag to 1 if we can use read-next on key, else to -1 */
flag=(order->asc == !(key_part->key_part_flag & HA_REVERSE_SORT))
? 1 : -1;
flag= ((order->asc == !(key_part->key_part_flag & HA_REVERSE_SORT)) ? 1 : -1);
if (reverse && flag != reverse)
return 0;
DBUG_RETURN(0);
reverse=flag; // Remember if reverse
key_part++;
}
*used_key_parts= (uint) (key_part - table->key_info[idx].key_part);
return reverse;
DBUG_RETURN(reverse);
}
static uint find_shortest_key(TABLE *table, const key_map *usable_keys)
{
uint min_length= (uint) ~0;
......@@ -6956,18 +6957,20 @@ static uint find_shortest_key(TABLE *table, const key_map *usable_keys)
}
/*
Test if a second key is the subkey of the first one.
SYNOPSIS
is_subkey()
key_part - first key parts
ref_key_part - second key parts
ref_key_part_end - last+1 part of the second key
DESCRIPTION
Test if a second key is the subkey of the first one.
key_part First key parts
ref_key_part Second key parts
ref_key_part_end Last+1 part of the second key
NOTE
Second key MUST be shorter than the first one.
RETURN
1 - is the subkey
0 - otherwise
1 is a subkey
0 no sub key
*/
inline bool
......@@ -6981,20 +6984,21 @@ is_subkey(KEY_PART_INFO *key_part, KEY_PART_INFO *ref_key_part,
}
/*
Test if we can use one of the 'usable_keys' instead of 'ref' key for sorting
SYNOPSIS
test_if_subkey()
ref - number of key, used for WHERE clause
usable_keys - keys for testing
DESCRIPTION
Test if we can use one of the 'usable_keys' instead of 'ref' key.
ref Number of key, used for WHERE clause
usable_keys Keys for testing
RETURN
MAX_KEY - if we can't use other key
the number of found key - otherwise
MAX_KEY If we can't use other key
the number of found key Otherwise
*/
static uint
test_if_subkey(ORDER *order, TABLE *table, uint ref, uint ref_key_parts,
const key_map& usable_keys)
const key_map *usable_keys)
{
uint nr;
uint min_length= (uint) ~0;
......@@ -7005,7 +7009,7 @@ test_if_subkey(ORDER *order, TABLE *table, uint ref, uint ref_key_parts,
for (nr= 0 ; nr < table->keys ; nr++)
{
if (usable_keys.is_set(nr) &&
if (usable_keys->is_set(nr) &&
table->key_info[nr].key_length < min_length &&
table->key_info[nr].key_parts >= ref_key_parts &&
is_subkey(table->key_info[nr].key_part, ref_key_part,
......@@ -7049,12 +7053,12 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit,
if ((*tmp_order->item)->type() != Item::FIELD_ITEM)
{
usable_keys.clear_all();
break;
DBUG_RETURN(0);
}
usable_keys.intersect(
((Item_field*) (*tmp_order->item))->field->part_of_sortkey);
usable_keys.intersect(((Item_field*) (*tmp_order->item))->
field->part_of_sortkey);
if (usable_keys.is_clear_all())
break; // No usable keys
DBUG_RETURN(0); // No usable keys
}
ref_key= -1;
......@@ -7090,9 +7094,9 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit,
keys
*/
if (table->used_keys.is_set(ref_key))
usable_keys.merge(table->used_keys);
usable_keys.intersect(table->used_keys);
if ((new_ref_key= test_if_subkey(order, table, ref_key, ref_key_parts,
usable_keys)) < MAX_KEY)
&usable_keys)) < MAX_KEY)
{
/* Found key that can be used to retrieve data in sorted order */
if (tab->ref.key >= 0)
......@@ -7292,9 +7296,9 @@ create_sort_index(THD *thd, JOIN *join, ORDER *order,
For impossible ranges (like when doing a lookup on NULL on a NOT NULL
field, quick will contain an empty record set.
*/
if (!(select->quick= tab->type == JT_FT ?
if (!(select->quick= (tab->type == JT_FT ?
new FT_SELECT(thd, table, tab->ref.key) :
get_quick_select_for_ref(thd, table, &tab->ref)))
get_quick_select_for_ref(thd, table, &tab->ref))))
goto err;
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment