Final step of Take 7 on review fixes of this

fairly complex bug
parent 6df933a8
drop table if exists t1;
create table t1 (s1 char(2) character set utf8)
partition by list (case when s1 > 'cz' then 1 else 2 end)
(partition p1 values in (1),
partition p2 values in (2));
drop table t1;
create table t1 (a int)
partition by key(a)
partitions 0.2+e1;
......
......@@ -859,5 +859,171 @@ void partition_info::print_no_partition_found(TABLE *table)
my_error(ER_NO_PARTITION_FOR_GIVEN_VALUE, MYF(0), buf_ptr);
dbug_tmp_restore_column_map(table->read_set, old_map);
}
/*
Set up buffers and arrays for fields requiring preparation
SYNOPSIS
set_up_charset_field_preps()
part_info Partition info object
RETURN VALUES
TRUE Memory Allocation error
FALSE Success
DESCRIPTION
Set up arrays and buffers for fields that require special care for
calculation of partition id. This is used for string fields with
variable length or string fields with fixed length that isn't using
the binary collation.
*/
#endif /* WITH_PARTITION_STORAGE_ENGINE */
bool partition_info::set_up_charset_field_preps()
{
Field *field, **ptr;
char *field_buf;
char **char_ptrs;
unsigned i;
bool found;
size_t size;
uint tot_fields= 0;
uint tot_part_fields= 0;
uint tot_subpart_fields= 0;
DBUG_ENTER("set_up_charset_field_preps");
if (!(part_type == HASH_PARTITION &&
list_of_part_fields) &&
check_part_func_fields(part_field_array, FALSE))
{
ptr= part_field_array;
/* Set up arrays and buffers for those fields */
i= 0;
while ((field= *(ptr++)))
{
if (field_is_partition_charset(field))
{
tot_part_fields++;
tot_fields++;
}
}
size= tot_part_fields * sizeof(char*);
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
part_field_buffers= char_ptrs;
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
restore_part_field_ptrs= char_ptrs;
size= (tot_part_fields + 1) * sizeof(Field*);
if (!(char_ptrs= (char**)sql_alloc(size)))
goto error;
part_charset_field_array= (Field**)char_ptrs;
ptr= part_field_array;
i= 0;
while ((field= *(ptr++)))
{
if (field_is_partition_charset(field))
{
CHARSET_INFO *cs= ((Field_str*)field)->charset();
size= field->pack_length();
if (!(field_buf= sql_calloc(size)))
goto error;
part_charset_field_array[i]= field;
part_field_buffers[i++]= field_buf;
}
}
part_charset_field_array[i]= NULL;
}
if (is_sub_partitioned() && list_of_subpart_fields &&
check_part_func_fields(subpart_field_array, FALSE))
{
/* Set up arrays and buffers for those fields */
ptr= subpart_field_array;
while ((field= *(ptr++)))
{
if (field_is_partition_charset(field))
tot_subpart_fields++;
}
size= tot_subpart_fields * sizeof(char*);
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
subpart_field_buffers= char_ptrs;
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
restore_subpart_field_ptrs= char_ptrs;
size= (tot_subpart_fields + 1) * sizeof(Field*);
if (!(char_ptrs= (char**)sql_alloc(size)))
goto error;
subpart_charset_field_array= (Field**)char_ptrs;
i= 0;
while ((field= *(ptr++)))
{
unsigned j= 0;
Field *part_field;
CHARSET_INFO *cs;
if (!field_is_partition_charset(field))
continue;
cs= ((Field_str*)field)->charset();
size= field->pack_length();
found= FALSE;
for (j= 0; j < tot_part_fields; j++)
{
if (field == part_charset_field_array[i])
found= TRUE;
}
if (!found)
{
tot_fields++;
if (!(field_buf= sql_calloc(size)))
goto error;
}
subpart_field_buffers[i++]= field_buf;
}
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
restore_subpart_field_ptrs= char_ptrs;
}
if (tot_fields)
{
Field *part_field, *subpart_field;
uint j,k,l;
size= tot_fields*sizeof(char**);
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
full_part_field_buffers= char_ptrs;
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
restore_full_part_field_ptrs= char_ptrs;
size= (tot_fields + 1) * sizeof(char**);
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
full_part_charset_field_array= (Field**)char_ptrs;
for (i= 0; i < tot_part_fields; i++)
{
full_part_charset_field_array[i]= part_charset_field_array[i];
full_part_field_buffers[i]= part_field_buffers[i];
}
k= tot_part_fields;
l= 0;
for (i= 0; i < tot_subpart_fields; i++)
{
field= subpart_charset_field_array[i];
found= FALSE;
for (j= 0; j < tot_part_fields; j++)
{
if (field == part_charset_field_array[i])
found= TRUE;
}
if (!found)
{
full_part_charset_field_array[l]= subpart_charset_field_array[k];
full_part_field_buffers[l]= subpart_field_buffers[k];
k++; l++;
}
}
full_part_charset_field_array[tot_fields]= NULL;
}
DBUG_RETURN(FALSE);
error:
mem_alloc_error(size);
DBUG_RETURN(TRUE);
}
#endif
/* WITH_PARTITION_STORAGE_ENGINE */
......@@ -73,14 +73,15 @@ public:
/* NULL-terminated array of fields used in partitioned expression */
Field **part_field_array;
/* NULL-terminated array of fields used in subpartitioned expression */
Field **subpart_field_array;
Field **part_charset_field_array;
Field **subpart_charset_field_array;
/*
Array of all fields used in partition and subpartition expression,
without duplicates, NULL-terminated.
*/
Field **full_part_field_array;
Field **full_part_charset_field_array;
/*
When we have a field that requires transformation before calling the
......@@ -89,8 +90,10 @@ public:
*/
char **part_field_buffers;
char **subpart_field_buffers;
char **full_part_field_buffers;
char **restore_part_field_ptrs;
char **restore_subpart_field_ptrs;
char **restore_full_part_field_ptrs;
Item *part_expr;
Item *subpart_expr;
......@@ -208,17 +211,20 @@ public:
bool is_auto_partitioned;
bool from_openfrm;
bool has_null_value;
bool includes_charset_field_part;
bool includes_charset_field_subpart;
partition_info()
: get_partition_id(NULL), get_part_partition_id(NULL),
get_subpartition_id(NULL),
part_field_array(NULL), subpart_field_array(NULL),
part_charset_field_array(NULL),
subpart_charset_field_array(NULL),
full_part_field_array(NULL),
full_part_charset_field_array(NULL),
part_field_buffers(NULL), subpart_field_buffers(NULL),
full_part_field_buffers(NULL),
restore_part_field_ptrs(NULL), restore_subpart_field_ptrs(NULL),
restore_full_part_field_ptrs(NULL),
part_expr(NULL), subpart_expr(NULL), item_free_list(NULL),
first_log_entry(NULL), exec_log_entry(NULL), frm_log_entry(NULL),
list_array(NULL),
......@@ -241,8 +247,7 @@ public:
list_of_part_fields(FALSE), list_of_subpart_fields(FALSE),
linear_hash_ind(FALSE), fixed(FALSE),
is_auto_partitioned(FALSE), from_openfrm(FALSE),
has_null_value(FALSE), includes_charset_field_part(FALSE),
includes_charset_field_subpart(FALSE)
has_null_value(FALSE)
{
all_fields_in_PF.clear_all();
all_fields_in_PPF.clear_all();
......@@ -278,6 +283,7 @@ public:
handler *file, HA_CREATE_INFO *info,
bool check_partition_function);
void print_no_partition_found(TABLE *table);
bool set_up_charset_field_preps();
private:
static int list_part_cmp(const void* a, const void* b);
static int list_part_cmp_unsigned(const void* a, const void* b);
......
......@@ -1346,20 +1346,19 @@ static void set_up_partition_func_pointers(partition_info *part_info)
}
}
}
if (part_info->includes_charset_field_part ||
part_info->includes_charset_field_subpart)
if (part_info->full_part_charset_field_array)
{
DBUG_ASSERT(part_info->get_partition_id);
part_info->get_partition_id_charset= part_info->get_partition_id;
if (part_info->includes_charset_field_part &&
part_info->includes_charset_field_subpart)
if (part_info->part_charset_field_array &&
part_info->subpart_charset_field_array)
part_info->get_partition_id= get_part_id_charset_func_all;
else if (part_info->includes_charset_field_part)
else if (part_info->part_charset_field_array)
part_info->get_partition_id= get_part_id_charset_func_part;
else
part_info->get_partition_id= get_part_id_charset_func_subpart;
}
if (part_info->includes_charset_field_part &&
if (part_info->part_charset_field_array &&
part_info->is_sub_partitioned())
{
DBUG_ASSERT(part_info->get_part_partition_id);
......@@ -1367,7 +1366,7 @@ static void set_up_partition_func_pointers(partition_info *part_info)
part_info->get_part_partition_id;
part_info->get_part_partition_id= get_part_part_id_charset_func;
}
if (part_info->includes_charset_field_subpart)
if (part_info->subpart_charset_field_array)
{
DBUG_ASSERT(part_info->get_subpartition_id);
part_info->get_subpartition_id_charset=
......@@ -1435,6 +1434,32 @@ static uint32 get_part_id_from_linear_hash(longlong hash_value, uint mask,
}
/*
Check if a particular field is in need of character set
handling for partition functions.
SYNOPSIS
field_is_partition_charset()
field The field to check
RETURN VALUES
FALSE Not in need of character set handling
TRUE In need of character set handling
*/
bool field_is_partition_charset(Field *field)
{
if (!field->type() == MYSQL_TYPE_STRING &&
!field->type() == MYSQL_TYPE_VARCHAR)
return FALSE;
{
CHARSET_INFO *cs= ((Field_str*)field)->charset();
if (!field->type() == MYSQL_TYPE_STRING ||
!(cs->state & MY_CS_BINSORT))
return TRUE;
return FALSE;
}
}
/*
Check that partition function do not contain any forbidden
character sets and collations.
......@@ -1453,7 +1478,7 @@ static uint32 get_part_id_from_linear_hash(longlong hash_value, uint mask,
calling the functions to calculate partition id.
*/
static bool check_part_func_fields(Field **ptr, bool ok_with_charsets)
bool check_part_func_fields(Field **ptr, bool ok_with_charsets)
{
Field *field;
DBUG_ENTER("check_part_func_field");
......@@ -1465,13 +1490,9 @@ static bool check_part_func_fields(Field **ptr, bool ok_with_charsets)
Binary collation with CHAR is automatically supported. Other
types need some kind of standardisation function handling
*/
if (field->type() == MYSQL_TYPE_STRING ||
field->type() == MYSQL_TYPE_VARCHAR)
if (field_is_partition_charset(field))
{
CHARSET_INFO *cs= ((Field_str*)field)->charset();
if (field->type() == MYSQL_TYPE_STRING &&
cs->state & MY_CS_BINSORT)
continue;
if (!ok_with_charsets ||
cs->mbmaxlen > 1 ||
cs->strxfrm_multiply > 1)
......@@ -1483,106 +1504,6 @@ static bool check_part_func_fields(Field **ptr, bool ok_with_charsets)
DBUG_RETURN(FALSE);
}
/*
Set up buffers and arrays for fields requiring preparation
SYNOPSIS
set_up_charset_field_preps()
part_info Partition info object
RETURN VALUES
TRUE Memory Allocation error
FALSE Success
DESCRIPTION
Set up arrays and buffers for fields that require special care for
calculation of partition id. This is used for string fields with
variable length or string fields with fixed length that isn't using
the binary collation.
*/
static bool set_up_charset_field_preps(partition_info *part_info)
{
Field *field, **ptr;
char *field_buf;
char **char_ptrs;
unsigned i;
size_t size;
DBUG_ENTER("set_up_charset_field_preps");
if (check_part_func_fields(part_info->part_field_array, FALSE))
{
ptr= part_info->part_field_array;
part_info->includes_charset_field_part= TRUE;
/*
Set up arrays and buffers for those fields
*/
i= 0;
while ((field= *(ptr++)))
i++;
size= i * sizeof(char*);
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
part_info->part_field_buffers= char_ptrs;
if (!(char_ptrs= (char**)sql_calloc(size)))
goto error;
part_info->restore_part_field_ptrs= char_ptrs;
ptr= part_info->part_field_array;
i= 0;
while ((field= *(ptr++)))
{
CHARSET_INFO *cs= ((Field_str*)field)->charset();
size= field->pack_length();
if (!(field_buf= sql_calloc(size)))
goto error;
part_info->part_field_buffers[i++]= field_buf;
}
}
if (part_info->is_sub_partitioned() &&
check_part_func_fields(part_info->subpart_field_array, FALSE))
{
/*
Set up arrays and buffers for those fields
*/
part_info->includes_charset_field_subpart= TRUE;
ptr= part_info->subpart_field_array;
i= 0;
while ((field= *(ptr++)))
{
unsigned j= 0;
Field *part_field;
Field **part_ptr= part_info->part_field_array;
bool field_already_have_buffer= FALSE;
CHARSET_INFO *cs= ((Field_str*)field)->charset();
size= field->pack_length();
while ((part_field= *(part_ptr++)))
{
field_buf= part_info->part_field_buffers[j++];
if (field == part_field)
{
field_already_have_buffer= TRUE;
break;
}
}
if (!field_already_have_buffer)
{
if (!(field_buf= sql_calloc(size)))
goto error;
}
part_info->subpart_field_buffers[i++]= field_buf;
}
size= i * sizeof(char*);
if (!(char_ptrs= (char**)sql_calloc(i * sizeof(char*))))
goto error;
part_info->restore_subpart_field_ptrs= char_ptrs;
}
DBUG_RETURN(FALSE);
error:
mem_alloc_error(size);
DBUG_RETURN(TRUE);
}
/*
fix partition functions
......@@ -1747,7 +1668,7 @@ bool fix_partition_func(THD *thd, TABLE *table,
goto end;
if (unlikely(set_up_partition_bitmap(thd, part_info)))
goto end;
if (unlikely(set_up_charset_field_preps(part_info)))
if (unlikely(part_info->set_up_charset_field_preps()))
{
my_error(ER_PARTITION_FUNCTION_IS_NOT_ALLOWED, MYF(0));
goto end;
......@@ -2491,10 +2412,7 @@ static void copy_to_part_field_buffers(Field **ptr,
{
*restore_ptr= field->ptr;
restore_ptr++;
if ((field->type() == MYSQL_TYPE_VARCHAR ||
(field->type() == MYSQL_TYPE_STRING &&
(!(((Field_str*)field)->charset()->state & MY_CS_BINSORT))) &&
((!field->maybe_null()) || (!field->is_null()))))
if (!field->maybe_null() || !field->is_null())
{
CHARSET_INFO *cs= ((Field_str*)field)->charset();
uint len= field->pack_length();
......@@ -2628,74 +2546,75 @@ static int get_part_id_charset_func_subpart(partition_info *part_info,
longlong *func_value)
{
int res;
copy_to_part_field_buffers(part_info->subpart_field_array,
copy_to_part_field_buffers(part_info->subpart_charset_field_array,
part_info->subpart_field_buffers,
part_info->restore_subpart_field_ptrs);
res= part_info->get_partition_id_charset(part_info, part_id, func_value);
restore_part_field_pointers(part_info->subpart_field_array,
restore_part_field_pointers(part_info->subpart_charset_field_array,
part_info->restore_subpart_field_ptrs);
return res;
}
static int get_part_id_charset_func_part(partition_info *part_info,
uint32 *part_id,
longlong *func_value)
{
int res;
copy_to_part_field_buffers(part_info->part_field_array,
copy_to_part_field_buffers(part_info->part_charset_field_array,
part_info->part_field_buffers,
part_info->restore_part_field_ptrs);
res= part_info->get_partition_id_charset(part_info, part_id, func_value);
restore_part_field_pointers(part_info->part_field_array,
restore_part_field_pointers(part_info->part_charset_field_array,
part_info->restore_part_field_ptrs);
return res;
}
static int get_part_id_charset_func_all(partition_info *part_info,
uint32 *part_id,
longlong *func_value)
{
int res;
copy_to_part_field_buffers(part_info->part_field_array,
part_info->part_field_buffers,
part_info->restore_part_field_ptrs);
copy_to_part_field_buffers(part_info->subpart_field_array,
part_info->subpart_field_buffers,
part_info->restore_subpart_field_ptrs);
copy_to_part_field_buffers(part_info->full_part_field_array,
part_info->full_part_field_buffers,
part_info->restore_full_part_field_ptrs);
res= part_info->get_partition_id_charset(part_info, part_id, func_value);
restore_part_field_pointers(part_info->part_field_array,
part_info->restore_part_field_ptrs);
restore_part_field_pointers(part_info->subpart_field_array,
part_info->restore_subpart_field_ptrs);
restore_part_field_pointers(part_info->full_part_field_array,
part_info->restore_full_part_field_ptrs);
return res;
}
static int get_part_part_id_charset_func(partition_info *part_info,
uint32 *part_id,
longlong *func_value)
{
int res;
copy_to_part_field_buffers(part_info->part_field_array,
copy_to_part_field_buffers(part_info->part_charset_field_array,
part_info->part_field_buffers,
part_info->restore_part_field_ptrs);
res= part_info->get_part_partition_id_charset(part_info,
part_id, func_value);
restore_part_field_pointers(part_info->part_field_array,
restore_part_field_pointers(part_info->part_charset_field_array,
part_info->restore_part_field_ptrs);
return res;
}
static uint32 get_subpart_id_charset_func(partition_info *part_info)
{
int res;
copy_to_part_field_buffers(part_info->subpart_field_array,
copy_to_part_field_buffers(part_info->subpart_charset_field_array,
part_info->subpart_field_buffers,
part_info->restore_subpart_field_ptrs);
res= part_info->get_subpartition_id_charset(part_info);
restore_part_field_pointers(part_info->subpart_field_array,
restore_part_field_pointers(part_info->subpart_charset_field_array,
part_info->restore_subpart_field_ptrs);
return res;
}
int get_partition_id_list(partition_info *part_info,
uint32 *part_id,
longlong *func_value)
......@@ -6782,7 +6701,7 @@ int get_part_iter_for_interval_via_mapping(partition_info *part_info,
if (part_info->part_type == RANGE_PARTITION)
{
if (part_info->includes_charset_field_part)
if (part_info->part_charset_field_array)
get_endpoint= get_partition_id_range_for_endpoint_charset;
else
get_endpoint= get_partition_id_range_for_endpoint;
......@@ -6792,7 +6711,7 @@ int get_part_iter_for_interval_via_mapping(partition_info *part_info,
else if (part_info->part_type == LIST_PARTITION)
{
if (part_info->includes_charset_field_part)
if (part_info->part_charset_field_array)
get_endpoint= get_list_array_idx_for_endpoint_charset;
else
get_endpoint= get_list_array_idx_for_endpoint;
......
......@@ -93,6 +93,9 @@ uint32 get_partition_id_range_for_endpoint(partition_info *part_info,
bool fix_fields_part_func(THD *thd, Item* func_expr, TABLE *table,
bool is_sub_part, bool is_field_to_be_setup);
bool check_part_func_fields(Field **ptr, bool ok_with_charsets);
bool field_is_partition_charset(Field *field);
/*
A "Get next" function for partition iterator.
......
......@@ -3618,9 +3618,8 @@ part_bit_expr:
mem_alloc_error(sizeof(part_elem_value));
YYABORT;
}
part_expr->walk(&Item::check_partition_func_processor, 0,
(byte*)(&part_expression_ok));
if (!part_expression_ok)
if (part_expr->walk(&Item::check_partition_func_processor, 0,
NULL))
{
my_error(ER_PARTITION_FUNCTION_IS_NOT_ALLOWED, MYF(0));
YYABORT;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment