partition_info.cc 38.6 KB
Newer Older
1
/* Copyright (C) 2006 MySQL AB
2 3 4

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
unknown's avatar
unknown committed
5
   the Free Software Foundation; version 2 of the License.
6 7 8 9 10 11 12 13 14 15 16 17

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/* Some general useful functions */

18 19 20 21
#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation
#endif

22 23 24
#include "mysql_priv.h"

#ifdef WITH_PARTITION_STORAGE_ENGINE
unknown's avatar
unknown committed
25 26
#include "ha_partition.h"

27

unknown's avatar
unknown committed
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
partition_info *partition_info::get_clone()
{
  if (!this)
    return 0;
  List_iterator<partition_element> part_it(partitions);
  partition_element *part;
  partition_info *clone= new partition_info();
  if (!clone)
  {
    mem_alloc_error(sizeof(partition_info));
    return NULL;
  }
  memcpy(clone, this, sizeof(partition_info));
  clone->partitions.empty();

  while ((part= (part_it++)))
  {
    List_iterator<partition_element> subpart_it(part->subpartitions);
    partition_element *subpart;
    partition_element *part_clone= new partition_element();
    if (!part_clone)
    {
      mem_alloc_error(sizeof(partition_element));
      return NULL;
    }
    memcpy(part_clone, part, sizeof(partition_element));
    part_clone->subpartitions.empty();
    while ((subpart= (subpart_it++)))
    {
      partition_element *subpart_clone= new partition_element();
      if (!subpart_clone)
      {
        mem_alloc_error(sizeof(partition_element));
        return NULL;
      }
      memcpy(subpart_clone, subpart, sizeof(partition_element));
      part_clone->subpartitions.push_back(subpart_clone);
    }
    clone->partitions.push_back(part_clone);
  }
  return clone;
}
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90

/*
  Create a memory area where default partition names are stored and fill it
  up with the names.

  SYNOPSIS
    create_default_partition_names()
    part_no                         Partition number for subparts
    no_parts                        Number of partitions
    start_no                        Starting partition number
    subpart                         Is it subpartitions

  RETURN VALUE
    A pointer to the memory area of the default partition names

  DESCRIPTION
    A support routine for the partition code where default values are
    generated.
    The external routine needing this code is check_partition_info
*/

91
#define MAX_PART_NAME_SIZE 8
92

93
char *partition_info::create_default_partition_names(uint part_no,
94
                                                     uint no_parts_arg,
95
                                                     uint start_no)
96
{
97
  char *ptr= (char*) sql_calloc(no_parts_arg*MAX_PART_NAME_SIZE);
98 99 100 101 102 103 104 105
  char *move_ptr= ptr;
  uint i= 0;
  DBUG_ENTER("create_default_partition_names");

  if (likely(ptr != 0))
  {
    do
    {
106
      my_sprintf(move_ptr, (move_ptr,"p%u", (start_no + i)));
107
      move_ptr+=MAX_PART_NAME_SIZE;
108
    } while (++i < no_parts_arg);
109 110 111
  }
  else
  {
112
    mem_alloc_error(no_parts_arg*MAX_PART_NAME_SIZE);
113 114 115 116 117
  }
  DBUG_RETURN(ptr);
}


118 119 120 121 122 123 124 125 126 127 128
/*
  Create a unique name for the subpartition as part_name'sp''subpart_no'
  SYNOPSIS
    create_subpartition_name()
    subpart_no                  Number of subpartition
    part_name                   Name of partition
  RETURN VALUES
    >0                          A reference to the created name string
    0                           Memory allocation error
*/

129 130
char *partition_info::create_subpartition_name(uint subpart_no,
                                               const char *part_name)
131 132
{
  uint size_alloc= strlen(part_name) + MAX_PART_NAME_SIZE;
133
  char *ptr= (char*) sql_calloc(size_alloc);
134 135 136 137 138 139 140 141 142 143 144 145 146 147
  DBUG_ENTER("create_subpartition_name");

  if (likely(ptr != NULL))
  {
    my_sprintf(ptr, (ptr, "%ssp%u", part_name, subpart_no));
  }
  else
  {
    mem_alloc_error(size_alloc);
  }
  DBUG_RETURN(ptr);
}


148 149 150 151 152 153 154 155
/*
  Set up all the default partitions not set-up by the user in the SQL
  statement. Also perform a number of checks that the user hasn't tried
  to use default values where no defaults exists.

  SYNOPSIS
    set_up_default_partitions()
    file                A reference to a handler of the table
156
    info                Create info
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
    start_no            Starting partition number

  RETURN VALUE
    TRUE                Error, attempted default values not possible
    FALSE               Ok, default partitions set-up

  DESCRIPTION
    The routine uses the underlying handler of the partitioning to define
    the default number of partitions. For some handlers this requires
    knowledge of the maximum number of rows to be stored in the table.
    This routine only accepts HASH and KEY partitioning and thus there is
    no subpartitioning if this routine is successful.
    The external routine needing this code is check_partition_info
*/

172 173
bool partition_info::set_up_default_partitions(handler *file,
                                               HA_CREATE_INFO *info,
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
                                               uint start_no)
{
  uint i;
  char *default_name;
  bool result= TRUE;
  DBUG_ENTER("partition_info::set_up_default_partitions");

  if (part_type != HASH_PARTITION)
  {
    const char *error_string;
    if (part_type == RANGE_PARTITION)
      error_string= partition_keywords[PKW_RANGE].str;
    else
      error_string= partition_keywords[PKW_LIST].str;
    my_error(ER_PARTITIONS_MUST_BE_DEFINED_ERROR, MYF(0), error_string);
    goto end;
  }
unknown's avatar
unknown committed
191 192 193 194 195 196 197 198

  if ((no_parts == 0) &&
      ((no_parts= file->get_default_no_partitions(info)) == 0))
  {
    my_error(ER_PARTITION_NOT_DEFINED_ERROR, MYF(0), "partitions");
    goto end;
  }

199 200 201 202 203 204
  if (unlikely(no_parts > MAX_PARTITIONS))
  {
    my_error(ER_TOO_MANY_PARTITIONS_ERROR, MYF(0));
    goto end;
  }
  if (unlikely((!(default_name= create_default_partition_names(0, no_parts,
205
                                                               start_no)))))
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
    goto end;
  i= 0;
  do
  {
    partition_element *part_elem= new partition_element();
    if (likely(part_elem != 0 &&
               (!partitions.push_back(part_elem))))
    {
      part_elem->engine_type= default_engine_type;
      part_elem->partition_name= default_name;
      default_name+=MAX_PART_NAME_SIZE;
    }
    else
    {
      mem_alloc_error(sizeof(partition_element));
      goto end;
    }
  } while (++i < no_parts);
  result= FALSE;
end:
  DBUG_RETURN(result);
}

229

230 231 232 233 234 235 236 237
/*
  Set up all the default subpartitions not set-up by the user in the SQL
  statement. Also perform a number of checks that the default partitioning
  becomes an allowed partitioning scheme.

  SYNOPSIS
    set_up_default_subpartitions()
    file                A reference to a handler of the table
238
    info                Create info
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253

  RETURN VALUE
    TRUE                Error, attempted default values not possible
    FALSE               Ok, default partitions set-up

  DESCRIPTION
    The routine uses the underlying handler of the partitioning to define
    the default number of partitions. For some handlers this requires
    knowledge of the maximum number of rows to be stored in the table.
    This routine is only called for RANGE or LIST partitioning and those
    need to be specified so only subpartitions are specified.
    The external routine needing this code is check_partition_info
*/

bool partition_info::set_up_default_subpartitions(handler *file, 
254
                                                  HA_CREATE_INFO *info)
255
{
256
  uint i, j;
257 258 259 260 261 262
  bool result= TRUE;
  partition_element *part_elem;
  List_iterator<partition_element> part_it(partitions);
  DBUG_ENTER("partition_info::set_up_default_subpartitions");

  if (no_subparts == 0)
263
    no_subparts= file->get_default_no_partitions(info);
264 265 266 267 268 269 270 271 272 273 274 275
  if (unlikely((no_parts * no_subparts) > MAX_PARTITIONS))
  {
    my_error(ER_TOO_MANY_PARTITIONS_ERROR, MYF(0));
    goto end;
  }
  i= 0;
  do
  {
    part_elem= part_it++;
    j= 0;
    do
    {
276
      partition_element *subpart_elem= new partition_element(part_elem);
277 278 279
      if (likely(subpart_elem != 0 &&
          (!part_elem->subpartitions.push_back(subpart_elem))))
      {
280 281 282
        char *ptr= create_subpartition_name(j, part_elem->partition_name);
        if (!ptr)
          goto end;
283
        subpart_elem->engine_type= default_engine_type;
284
        subpart_elem->partition_name= ptr;
285 286 287 288 289 290 291 292 293 294 295 296 297
      }
      else
      {
        mem_alloc_error(sizeof(partition_element));
        goto end;
      }
    } while (++j < no_subparts);
  } while (++i < no_parts);
  result= FALSE;
end:
  DBUG_RETURN(result);
}

298

299 300 301 302 303 304
/*
  Support routine for check_partition_info

  SYNOPSIS
    set_up_defaults_for_partitioning()
    file                A reference to a handler of the table
305
    info                Create info
306 307 308 309 310 311 312 313 314 315 316 317
    start_no            Starting partition number

  RETURN VALUE
    TRUE                Error, attempted default values not possible
    FALSE               Ok, default partitions set-up

  DESCRIPTION
    Set up defaults for partition or subpartition (cannot set-up for both,
    this will return an error.
*/

bool partition_info::set_up_defaults_for_partitioning(handler *file,
318
                                                      HA_CREATE_INFO *info, 
319 320 321 322 323 324 325 326
                                                      uint start_no)
{
  DBUG_ENTER("partition_info::set_up_defaults_for_partitioning");

  if (!default_partitions_setup)
  {
    default_partitions_setup= TRUE;
    if (use_default_partitions)
327
      DBUG_RETURN(set_up_default_partitions(file, info, start_no));
328 329
    if (is_sub_partitioned() && 
        use_default_subpartitions)
330
      DBUG_RETURN(set_up_default_subpartitions(file, info));
331 332 333 334
  }
  DBUG_RETURN(FALSE);
}

335

336 337 338 339 340 341 342 343 344 345 346
/*
  A support function to check if a partition element's name is unique
  
  SYNOPSIS
    has_unique_name()
    partition_element  element to check

  RETURN VALUES
    TRUE               Has unique name
    FALSE              Doesn't
*/
347

348 349 350 351 352 353 354 355
bool partition_info::has_unique_name(partition_element *element)
{
  DBUG_ENTER("partition_info::has_unique_name");
  
  const char *name_to_check= element->partition_name;
  List_iterator<partition_element> parts_it(partitions);
  
  partition_element *el;
356
  while ((el= (parts_it++)))
357 358 359 360 361
  {
    if (!(my_strcasecmp(system_charset_info, el->partition_name, 
                        name_to_check)) && el != element)
        DBUG_RETURN(FALSE);

362
    if (!el->subpartitions.is_empty()) 
363
    {
364 365
      partition_element *sub_el;    
      List_iterator<partition_element> subparts_it(el->subpartitions);
366
      while ((sub_el= (subparts_it++)))
367 368 369 370 371
      {
        if (!(my_strcasecmp(system_charset_info, sub_el->partition_name, 
                            name_to_check)) && sub_el != element)
            DBUG_RETURN(FALSE);
      }
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
    }
  } 
  DBUG_RETURN(TRUE);
}


/*
  A support function to check partition names for duplication in a
  partitioned table

  SYNOPSIS
    has_unique_names()

  RETURN VALUES
    TRUE               Has unique part and subpart names
    FALSE              Doesn't

  DESCRIPTION
    Checks that the list of names in the partitions doesn't contain any
    duplicated names.
*/
393

394 395 396 397 398 399 400
char *partition_info::has_unique_names()
{
  DBUG_ENTER("partition_info::has_unique_names");
  
  List_iterator<partition_element> parts_it(partitions);

  partition_element *el;  
401
  while ((el= (parts_it++)))
402 403 404 405
  {
    if (! has_unique_name(el))
      DBUG_RETURN(el->partition_name);
      
406
    if (!el->subpartitions.is_empty())
407
    {
408 409
      List_iterator<partition_element> subparts_it(el->subpartitions);
      partition_element *subel;
410
      while ((subel= (subparts_it++)))
411 412 413 414
      {
        if (! has_unique_name(subel))
          DBUG_RETURN(subel->partition_name);
      }
415 416 417 418 419
    }
  } 
  DBUG_RETURN(NULL);
}

unknown's avatar
unknown committed
420 421

/*
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
  Check that the partition/subpartition is setup to use the correct
  storage engine
  SYNOPSIS
    check_engine_condition()
    p_elem                   Partition element
    table_engine_set         Have user specified engine on table level
    inout::engine_type       Current engine used
    inout::first             Is it first partition
  RETURN VALUE
    TRUE                     Failed check
    FALSE                    Ok
  DESCRIPTION
    Specified engine for table and partitions p0 and pn
    Must be correct both on CREATE and ALTER commands
    table p0 pn res (0 - OK, 1 - FAIL)
        -  -  - 0
        -  -  x 1
        -  x  - 1
        -  x  x 0
        x  -  - 0
        x  -  x 0
        x  x  - 0
        x  x  x 0
    i.e:
    - All subpartitions must use the same engine
      AND it must be the same as the partition.
    - All partitions must use the same engine
      AND it must be the same as the table.
    - if one does NOT specify an engine on the table level
      then one must either NOT specify any engine on any
      partition/subpartition OR for ALL partitions/subpartitions
    Note:
    When ALTER a table, the engines are already set for all levels
    (table, all partitions and subpartitions). So if one want to
    change the storage engine, one must specify it on the table level

*/

static bool check_engine_condition(partition_element *p_elem,
                                   bool table_engine_set,
                                   handlerton **engine_type,
                                   bool *first)
{
  DBUG_ENTER("check_engine_condition");

unknown's avatar
unknown committed
467 468 469
  DBUG_PRINT("enter", ("p_eng %s t_eng %s t_eng_set %u first %u state %u",
                       ha_resolve_storage_engine_name(p_elem->engine_type),
                       ha_resolve_storage_engine_name(*engine_type),
470 471 472 473
                       table_engine_set, *first, p_elem->part_state));
  if (*first && !table_engine_set)
  {
    *engine_type= p_elem->engine_type;
unknown's avatar
unknown committed
474 475
    DBUG_PRINT("info", ("setting table_engine = %s",
                         ha_resolve_storage_engine_name(*engine_type)));
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491
  }
  *first= FALSE;
  if ((table_engine_set &&
      (p_elem->engine_type != (*engine_type) &&
       p_elem->engine_type)) ||
      (!table_engine_set &&
       p_elem->engine_type != (*engine_type)))
  {
    DBUG_RETURN(TRUE);
  }
  else
  {
    DBUG_RETURN(FALSE);
  }
}

unknown's avatar
unknown committed
492

493 494 495 496
/*
  Check engine mix that it is correct
  Current limitation is that all partitions and subpartitions
  must use the same storage engine.
unknown's avatar
unknown committed
497 498
  SYNOPSIS
    check_engine_mix()
499 500
    inout::engine_type       Current engine used
    table_engine_set         Have user specified engine on table level
unknown's avatar
unknown committed
501
  RETURN VALUE
502 503
    TRUE                     Error, mixed engines
    FALSE                    Ok, no mixed engines
unknown's avatar
unknown committed
504 505 506
  DESCRIPTION
    Current check verifies only that all handlers are the same.
    Later this check will be more sophisticated.
507 508 509 510 511 512 513 514 515
    (specified partition handler ) specified table handler
    (NDB, NDB) NDB           OK
    (MYISAM, MYISAM) -       OK
    (MYISAM, -)      -       NOT OK
    (MYISAM, -)    MYISAM    OK
    (- , MYISAM)   -         NOT OK
    (- , -)        MYISAM    OK
    (-,-)          -         OK
    (NDB, MYISAM) *          NOT OK
unknown's avatar
unknown committed
516 517
*/

518 519
bool partition_info::check_engine_mix(handlerton *engine_type,
                                      bool table_engine_set)
unknown's avatar
unknown committed
520
{
521 522 523
  handlerton *old_engine_type= engine_type;
  bool first= TRUE;
  uint no_parts= partitions.elements;
unknown's avatar
unknown committed
524
  DBUG_ENTER("partition_info::check_engine_mix");
unknown's avatar
unknown committed
525 526
  DBUG_PRINT("info", ("in: engine_type = %s, table_engine_set = %u",
                       ha_resolve_storage_engine_name(engine_type),
527 528
                       table_engine_set));
  if (no_parts)
unknown's avatar
unknown committed
529
  {
530 531 532
    List_iterator<partition_element> part_it(partitions);
    uint i= 0;
    do
unknown's avatar
unknown committed
533
    {
534
      partition_element *part_elem= part_it++;
unknown's avatar
unknown committed
535 536
      DBUG_PRINT("info", ("part = %d engine = %s table_engine_set %u",
                 i, ha_resolve_storage_engine_name(part_elem->engine_type),
537 538 539 540 541 542 543 544 545 546
                 table_engine_set));
      if (is_sub_partitioned() &&
          part_elem->subpartitions.elements)
      {
        uint no_subparts= part_elem->subpartitions.elements;
        uint j= 0;
        List_iterator<partition_element> sub_it(part_elem->subpartitions);
        do
        {
          partition_element *sub_elem= sub_it++;
unknown's avatar
unknown committed
547 548
          DBUG_PRINT("info", ("sub = %d engine = %s table_engie_set %u",
                     j, ha_resolve_storage_engine_name(sub_elem->engine_type),
549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
                     table_engine_set));
          if (check_engine_condition(sub_elem, table_engine_set,
                                     &engine_type, &first))
            goto error;
        } while (++j < no_subparts);
        /* ensure that the partition also has correct engine */
        if (check_engine_condition(part_elem, table_engine_set,
                                   &engine_type, &first))
          goto error;
      }
      else if (check_engine_condition(part_elem, table_engine_set,
                                      &engine_type, &first))
        goto error;
    } while (++i < no_parts);
  }
unknown's avatar
unknown committed
564 565
  DBUG_PRINT("info", ("engine_type = %s",
                       ha_resolve_storage_engine_name(engine_type)));
566 567 568
  if (!engine_type)
    engine_type= old_engine_type;
  if (engine_type->flags & HTON_NO_PARTITION)
569
  {
570
    my_error(ER_PARTITION_MERGE_ERROR, MYF(0));
571 572
    DBUG_RETURN(TRUE);
  }
unknown's avatar
unknown committed
573 574
  DBUG_PRINT("info", ("out: engine_type = %s",
                       ha_resolve_storage_engine_name(engine_type)));
575
  DBUG_ASSERT(engine_type != partition_hton);
576
  DBUG_RETURN(FALSE);
577 578 579 580 581 582
error:
  /*
    Mixed engines not yet supported but when supported it will need
    the partition handler
  */
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608
}


/*
  This routine allocates an array for all range constants to achieve a fast
  check what partition a certain value belongs to. At the same time it does
  also check that the range constants are defined in increasing order and
  that the expressions are constant integer expressions.

  SYNOPSIS
    check_range_constants()

  RETURN VALUE
    TRUE                An error occurred during creation of range constants
    FALSE               Successful creation of range constant mapping

  DESCRIPTION
    This routine is called from check_partition_info to get a quick error
    before we came too far into the CREATE TABLE process. It is also called
    from fix_partition_func every time we open the .frm file. It is only
    called for RANGE PARTITIONed tables.
*/

bool partition_info::check_range_constants()
{
  partition_element* part_def;
unknown's avatar
unknown committed
609 610 611
  longlong current_largest;
  longlong part_range_value;
  bool first= TRUE;
unknown's avatar
unknown committed
612 613 614
  uint i;
  List_iterator<partition_element> it(partitions);
  bool result= TRUE;
615
  bool signed_flag= !part_expr->unsigned_flag;
unknown's avatar
unknown committed
616 617 618
  DBUG_ENTER("partition_info::check_range_constants");
  DBUG_PRINT("enter", ("INT_RESULT with %d parts", no_parts));

619 620
  LINT_INIT(current_largest);

unknown's avatar
unknown committed
621 622 623 624 625 626 627 628 629 630 631 632 633
  part_result_type= INT_RESULT;
  range_int_array= (longlong*)sql_alloc(no_parts * sizeof(longlong));
  if (unlikely(range_int_array == NULL))
  {
    mem_alloc_error(no_parts * sizeof(longlong));
    goto end;
  }
  i= 0;
  do
  {
    part_def= it++;
    if ((i != (no_parts - 1)) || !defined_max_value)
    {
unknown's avatar
unknown committed
634 635 636 637
      part_range_value= part_def->range_value;
      if (!signed_flag)
        part_range_value-= 0x8000000000000000ULL;
    }
unknown's avatar
unknown committed
638
    else
unknown's avatar
unknown committed
639 640
      part_range_value= LONGLONG_MAX;
    if (first)
unknown's avatar
unknown committed
641
    {
unknown's avatar
unknown committed
642 643 644
      current_largest= part_range_value;
      range_int_array[0]= part_range_value;
      first= FALSE;
unknown's avatar
unknown committed
645 646 647
    }
    else
    {
unknown's avatar
unknown committed
648
      if (likely(current_largest < part_range_value))
649
      {
unknown's avatar
unknown committed
650 651
        current_largest= part_range_value;
        range_int_array[i]= part_range_value;
652
      }
653 654 655 656 657 658 659
      else if (defined_max_value &&
               current_largest == part_range_value &&
               part_range_value == LONGLONG_MAX &&
               i == (no_parts - 1))
      {
        range_int_array[i]= part_range_value;
      }
660 661 662 663 664
      else
      {
        my_error(ER_RANGE_NOT_INCREASING_ERROR, MYF(0));
        goto end;
      }
unknown's avatar
unknown committed
665 666 667 668 669 670 671 672 673
    }
  } while (++i < no_parts);
  result= FALSE;
end:
  DBUG_RETURN(result);
}


/*
674 675
  Support routines for check_list_constants used by qsort to sort the
  constant list expressions. One routine for unsigned and one for signed.
unknown's avatar
unknown committed
676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724

  SYNOPSIS
    list_part_cmp()
      a                First list constant to compare with
      b                Second list constant to compare with

  RETURN VALUE
    +1                 a > b
    0                  a  == b
    -1                 a < b
*/

int partition_info::list_part_cmp(const void* a, const void* b)
{
  longlong a1= ((LIST_PART_ENTRY*)a)->list_value;
  longlong b1= ((LIST_PART_ENTRY*)b)->list_value;
  if (a1 < b1)
    return -1;
  else if (a1 > b1)
    return +1;
  else
    return 0;
}


/*
  This routine allocates an array for all list constants to achieve a fast
  check what partition a certain value belongs to. At the same time it does
  also check that there are no duplicates among the list constants and that
  that the list expressions are constant integer expressions.

  SYNOPSIS
    check_list_constants()

  RETURN VALUE
    TRUE                  An error occurred during creation of list constants
    FALSE                 Successful creation of list constant mapping

  DESCRIPTION
    This routine is called from check_partition_info to get a quick error
    before we came too far into the CREATE TABLE process. It is also called
    from fix_partition_func every time we open the .frm file. It is only
    called for LIST PARTITIONed tables.
*/

bool partition_info::check_list_constants()
{
  uint i;
  uint list_index= 0;
725
  part_elem_value *list_value;
unknown's avatar
unknown committed
726
  bool result= TRUE;
unknown's avatar
unknown committed
727
  longlong curr_value, prev_value, type_add, calc_value;
unknown's avatar
unknown committed
728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764
  partition_element* part_def;
  bool found_null= FALSE;
  List_iterator<partition_element> list_func_it(partitions);
  DBUG_ENTER("partition_info::check_list_constants");

  part_result_type= INT_RESULT;
  no_list_values= 0;
  /*
    We begin by calculating the number of list values that have been
    defined in the first step.

    We use this number to allocate a properly sized array of structs
    to keep the partition id and the value to use in that partition.
    In the second traversal we assign them values in the struct array.

    Finally we sort the array of structs in order of values to enable
    a quick binary search for the proper value to discover the
    partition id.
    After sorting the array we check that there are no duplicates in the
    list.
  */

  i= 0;
  do
  {
    part_def= list_func_it++;
    if (part_def->has_null_value)
    {
      if (found_null)
      {
        my_error(ER_MULTIPLE_DEF_CONST_IN_LIST_PART_ERROR, MYF(0));
        goto end;
      }
      has_null_value= TRUE;
      has_null_part_id= i;
      found_null= TRUE;
    }
765
    List_iterator<part_elem_value> list_val_it1(part_def->list_val_list);
unknown's avatar
unknown committed
766 767 768 769
    while (list_val_it1++)
      no_list_values++;
  } while (++i < no_parts);
  list_func_it.rewind();
770 771
  list_array= (LIST_PART_ENTRY*)sql_alloc((no_list_values+1) *
                                          sizeof(LIST_PART_ENTRY));
unknown's avatar
unknown committed
772 773 774 775 776 777 778
  if (unlikely(list_array == NULL))
  {
    mem_alloc_error(no_list_values * sizeof(LIST_PART_ENTRY));
    goto end;
  }

  i= 0;
unknown's avatar
unknown committed
779 780 781 782 783 784 785 786
  /*
    Fix to be able to reuse signed sort functions also for unsigned
    partition functions.
  */
  type_add= (longlong)(part_expr->unsigned_flag ?
                                       0x8000000000000000ULL :
                                       0ULL);

unknown's avatar
unknown committed
787 788 789
  do
  {
    part_def= list_func_it++;
790
    List_iterator<part_elem_value> list_val_it2(part_def->list_val_list);
unknown's avatar
unknown committed
791 792
    while ((list_value= list_val_it2++))
    {
unknown's avatar
unknown committed
793 794
      calc_value= list_value->value - type_add;
      list_array[list_index].list_value= calc_value;
unknown's avatar
unknown committed
795 796 797 798
      list_array[list_index++].partition_id= i;
    }
  } while (++i < no_parts);

799
  if (fixed && no_list_values)
unknown's avatar
unknown committed
800
  {
801
    bool first= TRUE;
802 803
    my_qsort((void*)list_array, no_list_values, sizeof(LIST_PART_ENTRY), 
             &list_part_cmp);
804
 
805 806
    i= 0;
    LINT_INIT(prev_value);
807
    do
unknown's avatar
unknown committed
808
    {
809 810
      DBUG_ASSERT(i < no_list_values);
      curr_value= list_array[i].list_value;
811
      if (likely(first || prev_value != curr_value))
812 813
      {
        prev_value= curr_value;
814
        first= FALSE;
815 816 817 818 819 820 821
      }
      else
      {
        my_error(ER_MULTIPLE_DEF_CONST_IN_LIST_PART_ERROR, MYF(0));
        goto end;
      }
    } while (++i < no_list_values);
unknown's avatar
unknown committed
822
  }
unknown's avatar
unknown committed
823 824 825 826 827 828 829 830 831 832 833 834
  result= FALSE;
end:
  DBUG_RETURN(result);
}


/*
  This code is used early in the CREATE TABLE and ALTER TABLE process.

  SYNOPSIS
    check_partition_info()
    file                A reference to a handler of the table
835
    info                Create info
unknown's avatar
unknown committed
836
    engine_type         Return value for used engine in partitions
837
    check_partition_function Should we check the partition function
unknown's avatar
unknown committed
838 839 840 841 842 843 844 845 846 847 848 849 850

  RETURN VALUE
    TRUE                 Error, something went wrong
    FALSE                Ok, full partition data structures are now generated

  DESCRIPTION
    We will check that the partition info requested is possible to set-up in
    this version. This routine is an extension of the parser one could say.
    If defaults were used we will generate default data structures for all
    partitions.

*/

851
bool partition_info::check_partition_info(THD *thd, handlerton **eng_type,
852
                                          handler *file, HA_CREATE_INFO *info,
853
                                          bool check_partition_function)
unknown's avatar
unknown committed
854
{
855
  handlerton *table_engine= default_engine_type;
unknown's avatar
unknown committed
856
  uint i, tot_partitions;
857
  bool result= TRUE, table_engine_set;
unknown's avatar
unknown committed
858 859
  char *same_name;
  DBUG_ENTER("partition_info::check_partition_info");
860
  DBUG_ASSERT(default_engine_type != partition_hton);
unknown's avatar
unknown committed
861

unknown's avatar
unknown committed
862 863
  DBUG_PRINT("info", ("default table_engine = %s",
                      ha_resolve_storage_engine_name(table_engine)));
864
  if (check_partition_function)
865
  {
unknown's avatar
unknown committed
866
    int err= 0;
867

868 869
    if (part_type != HASH_PARTITION || !list_of_part_fields)
    {
870
      DBUG_ASSERT(part_expr);
unknown's avatar
unknown committed
871 872 873 874 875
      err= part_expr->walk(&Item::check_partition_func_processor, 0,
                           NULL);
      if (!err && is_sub_partitioned() && !list_of_subpart_fields)
        err= subpart_expr->walk(&Item::check_partition_func_processor, 0,
                                NULL);
876
    }
unknown's avatar
unknown committed
877
    if (err)
878 879 880 881
    {
      my_error(ER_PARTITION_FUNCTION_IS_NOT_ALLOWED, MYF(0));
      goto end;
    }
882
  }
unknown's avatar
unknown committed
883 884 885 886 887 888 889 890 891 892 893 894 895 896
  if (unlikely(!is_sub_partitioned() && 
               !(use_default_subpartitions && use_default_no_subpartitions)))
  {
    my_error(ER_SUBPARTITION_ERROR, MYF(0));
    goto end;
  }
  if (unlikely(is_sub_partitioned() &&
              (!(part_type == RANGE_PARTITION || 
                 part_type == LIST_PARTITION))))
  {
    /* Only RANGE and LIST partitioning can be subpartitioned */
    my_error(ER_SUBPARTITION_ERROR, MYF(0));
    goto end;
  }
897
  if (unlikely(set_up_defaults_for_partitioning(file, info, (uint)0)))
unknown's avatar
unknown committed
898
    goto end;
899 900 901 902 903
  if (!(tot_partitions= get_tot_partitions()))
  {
    my_error(ER_PARTITION_NOT_DEFINED_ERROR, MYF(0), "partitions");
    goto end;
  }
unknown's avatar
unknown committed
904 905 906 907 908
  if (unlikely(tot_partitions > MAX_PARTITIONS))
  {
    my_error(ER_TOO_MANY_PARTITIONS_ERROR, MYF(0));
    goto end;
  }
909 910 911 912 913 914 915 916 917 918
  /*
    if NOT specified ENGINE = <engine>:
      If Create, always use create_info->db_type
      else, use previous tables db_type 
      either ALL or NONE partition should be set to
      default_engine_type when not table_engine_set
      Note: after a table is created its storage engines for
      the table and all partitions/subpartitions are set.
      So when ALTER it is already set on table level
  */
unknown's avatar
unknown committed
919
  if (info && info->used_fields & HA_CREATE_USED_ENGINE)
920 921
  {
    table_engine_set= TRUE;
unknown's avatar
unknown committed
922 923 924 925 926 927 928
    table_engine= info->db_type;
    /* if partition_hton, use thd->lex->create_info */
    if (table_engine == partition_hton)
      table_engine= thd->lex->create_info.db_type;
    DBUG_ASSERT(table_engine != partition_hton);
    DBUG_PRINT("info", ("Using table_engine = %s",
                        ha_resolve_storage_engine_name(table_engine)));
929 930 931 932 933 934 935
  }
  else
  {
    table_engine_set= FALSE;
    if (thd->lex->sql_command != SQLCOM_CREATE_TABLE)
    {
      table_engine_set= TRUE;
unknown's avatar
unknown committed
936 937
      DBUG_PRINT("info", ("No create, table_engine = %s",
                          ha_resolve_storage_engine_name(table_engine)));
938 939 940 941
      DBUG_ASSERT(table_engine && table_engine != partition_hton);
    }
  }

unknown's avatar
unknown committed
942 943 944 945 946 947 948 949
  if ((same_name= has_unique_names()))
  {
    my_error(ER_SAME_NAME_PARTITION, MYF(0), same_name);
    goto end;
  }
  i= 0;
  {
    List_iterator<partition_element> part_it(partitions);
950 951
    uint no_parts_not_set= 0;
    uint prev_no_subparts_not_set= no_subparts + 1;
unknown's avatar
unknown committed
952 953 954
    do
    {
      partition_element *part_elem= part_it++;
955 956 957 958 959
#ifdef HAVE_READLINK
      if (!my_use_symdir || (thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE))
#endif
      {
        if (part_elem->data_file_name)
960 961 962
          push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                              WARN_OPTION_IGNORED, ER(WARN_OPTION_IGNORED),
                              "DATA DIRECTORY");
963
        if (part_elem->index_file_name)
964
          push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
Marc Alff's avatar
Marc Alff committed
965
                              WARN_OPTION_IGNORED, ER(WARN_OPTION_IGNORED),
966
                              "INDEX DIRECTORY");
967 968
        part_elem->data_file_name= part_elem->index_file_name= NULL;
      }
unknown's avatar
unknown committed
969 970
      if (!is_sub_partitioned())
      {
unknown's avatar
unknown committed
971 972 973 974 975
        if (part_elem->engine_type == NULL)
        {
          no_parts_not_set++;
          part_elem->engine_type= default_engine_type;
        }
976 977 978 979 980 981
        if (check_table_name(part_elem->partition_name,
                             strlen(part_elem->partition_name)))
        {
          my_error(ER_WRONG_PARTITION_NAME, MYF(0));
          goto end;
        }
unknown's avatar
unknown committed
982 983
        DBUG_PRINT("info", ("part = %d engine = %s",
                   i, ha_resolve_storage_engine_name(part_elem->engine_type)));
unknown's avatar
unknown committed
984 985 986 987
      }
      else
      {
        uint j= 0;
988
        uint no_subparts_not_set= 0;
unknown's avatar
unknown committed
989
        List_iterator<partition_element> sub_it(part_elem->subpartitions);
unknown's avatar
unknown committed
990
        partition_element *sub_elem;
unknown's avatar
unknown committed
991 992
        do
        {
unknown's avatar
unknown committed
993
          sub_elem= sub_it++;
994 995
          if (check_table_name(sub_elem->partition_name,
                               strlen(sub_elem->partition_name)))
996 997 998 999
          {
            my_error(ER_WRONG_PARTITION_NAME, MYF(0));
            goto end;
          }
1000
          if (sub_elem->engine_type == NULL)
1001
          {
unknown's avatar
unknown committed
1002 1003 1004 1005 1006 1007 1008
            if (part_elem->engine_type != NULL)
              sub_elem->engine_type= part_elem->engine_type;
            else
            {
              sub_elem->engine_type= default_engine_type;
              no_subparts_not_set++;
            }
1009
          }
unknown's avatar
unknown committed
1010 1011
          DBUG_PRINT("info", ("part = %d sub = %d engine = %s", i, j,
                     ha_resolve_storage_engine_name(sub_elem->engine_type)));
unknown's avatar
unknown committed
1012
        } while (++j < no_subparts);
unknown's avatar
unknown committed
1013 1014 1015

        if (prev_no_subparts_not_set == (no_subparts + 1) &&
            (no_subparts_not_set == 0 || no_subparts_not_set == no_subparts))
1016
          prev_no_subparts_not_set= no_subparts_not_set;
unknown's avatar
unknown committed
1017

1018
        if (!table_engine_set &&
unknown's avatar
unknown committed
1019
            prev_no_subparts_not_set != no_subparts_not_set)
1020 1021 1022 1023 1024 1025
        {
          DBUG_PRINT("info", ("no_subparts_not_set = %u no_subparts = %u",
                     no_subparts_not_set, no_subparts));
          my_error(ER_MIX_HANDLER_ERROR, MYF(0));
          goto end;
        }
unknown's avatar
unknown committed
1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036

        if (part_elem->engine_type == NULL)
        {
          if (no_subparts_not_set == 0)
            part_elem->engine_type= sub_elem->engine_type;
          else
          {
            no_parts_not_set++;
            part_elem->engine_type= default_engine_type;
          }
        }
unknown's avatar
unknown committed
1037 1038
      }
    } while (++i < no_parts);
1039 1040 1041 1042 1043 1044 1045 1046 1047
    if (!table_engine_set &&
        no_parts_not_set != 0 &&
        no_parts_not_set != no_parts)
    {
      DBUG_PRINT("info", ("no_parts_not_set = %u no_parts = %u",
                 no_parts_not_set, no_subparts));
      my_error(ER_MIX_HANDLER_ERROR, MYF(0));
      goto end;
    }
unknown's avatar
unknown committed
1048
  }
1049 1050 1051
  if (unlikely(check_engine_mix(table_engine, table_engine_set)))
  {
    my_error(ER_MIX_HANDLER_ERROR, MYF(0));
unknown's avatar
unknown committed
1052
    goto end;
1053
  }
unknown's avatar
unknown committed
1054

unknown's avatar
unknown committed
1055 1056
  DBUG_ASSERT(table_engine != partition_hton &&
              default_engine_type == table_engine);
unknown's avatar
unknown committed
1057
  if (eng_type)
1058 1059
    *eng_type= table_engine;

unknown's avatar
unknown committed
1060 1061 1062 1063 1064 1065 1066

  /*
    We need to check all constant expressions that they are of the correct
    type and that they are increasing for ranges and not overlapping for
    list constants.
  */

1067 1068 1069 1070 1071 1072
  if (fixed)
  {
    if (unlikely((part_type == RANGE_PARTITION && check_range_constants()) ||
                  (part_type == LIST_PARTITION && check_list_constants())))
      goto end;
  }
unknown's avatar
unknown committed
1073 1074 1075 1076 1077 1078
  result= FALSE;
end:
  DBUG_RETURN(result);
}


unknown's avatar
unknown committed
1079 1080
/*
  Print error for no partition found
1081

unknown's avatar
unknown committed
1082 1083 1084
  SYNOPSIS
    print_no_partition_found()
    table                        Table object
1085

unknown's avatar
unknown committed
1086 1087 1088 1089 1090 1091 1092
  RETURN VALUES
*/

void partition_info::print_no_partition_found(TABLE *table)
{
  char buf[100];
  char *buf_ptr= (char*)&buf;
1093
  TABLE_LIST table_list;
unknown's avatar
unknown committed
1094

1095 1096 1097 1098 1099 1100 1101 1102
  bzero(&table_list, sizeof(table_list));
  table_list.db= table->s->db.str;
  table_list.table_name= table->s->table_name.str;

  if (check_single_table_access(current_thd,
                                SELECT_ACL, &table_list, TRUE))
    my_message(ER_NO_PARTITION_FOR_GIVEN_VALUE,
               ER(ER_NO_PARTITION_FOR_GIVEN_VALUE_SILENT), MYF(0));
unknown's avatar
unknown committed
1103
  else
1104 1105 1106 1107 1108 1109 1110 1111 1112 1113
  {
    my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
    if (part_expr->null_value)
      buf_ptr= (char*)"NULL";
    else
      longlong2str(err_value, buf,
                   part_expr->unsigned_flag ? 10 : -10);
    my_error(ER_NO_PARTITION_FOR_GIVEN_VALUE, MYF(0), buf_ptr);
    dbug_tmp_restore_column_map(table->read_set, old_map);
  }
unknown's avatar
unknown committed
1114
}
1115 1116 1117 1118
/*
  Set up buffers and arrays for fields requiring preparation
  SYNOPSIS
    set_up_charset_field_preps()
1119

1120 1121 1122
  RETURN VALUES
    TRUE                             Memory Allocation error
    FALSE                            Success
unknown's avatar
unknown committed
1123

1124 1125 1126 1127 1128 1129
  DESCRIPTION
    Set up arrays and buffers for fields that require special care for
    calculation of partition id. This is used for string fields with
    variable length or string fields with fixed length that isn't using
    the binary collation.
*/
unknown's avatar
unknown committed
1130

1131 1132 1133
bool partition_info::set_up_charset_field_preps()
{
  Field *field, **ptr;
1134
  uchar **char_ptrs;
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156
  unsigned i;
  size_t size;
  uint tot_fields= 0;
  uint tot_part_fields= 0;
  uint tot_subpart_fields= 0;
  DBUG_ENTER("set_up_charset_field_preps");

  if (!(part_type == HASH_PARTITION &&
        list_of_part_fields) &&
        check_part_func_fields(part_field_array, FALSE))
  {
    ptr= part_field_array;
    /* Set up arrays and buffers for those fields */
    while ((field= *(ptr++)))
    {
      if (field_is_partition_charset(field))
      {
        tot_part_fields++;
        tot_fields++;
      }
    }
    size= tot_part_fields * sizeof(char*);
1157
    if (!(char_ptrs= (uchar**)sql_calloc(size)))
1158 1159
      goto error;
    part_field_buffers= char_ptrs;
1160
    if (!(char_ptrs= (uchar**)sql_calloc(size)))
1161 1162 1163
      goto error;
    restore_part_field_ptrs= char_ptrs;
    size= (tot_part_fields + 1) * sizeof(Field*);
1164
    if (!(char_ptrs= (uchar**)sql_alloc(size)))
1165 1166 1167 1168 1169 1170 1171 1172
      goto error;
    part_charset_field_array= (Field**)char_ptrs;
    ptr= part_field_array;
    i= 0;
    while ((field= *(ptr++)))
    {
      if (field_is_partition_charset(field))
      {
1173
        uchar *field_buf;
1174
        size= field->pack_length();
1175
        if (!(field_buf= (uchar*) sql_calloc(size)))
1176 1177 1178 1179 1180 1181 1182
          goto error;
        part_charset_field_array[i]= field;
        part_field_buffers[i++]= field_buf;
      }
    }
    part_charset_field_array[i]= NULL;
  }
unknown's avatar
unknown committed
1183
  if (is_sub_partitioned() && !list_of_subpart_fields &&
1184 1185 1186 1187 1188 1189 1190
      check_part_func_fields(subpart_field_array, FALSE))
  {
    /* Set up arrays and buffers for those fields */
    ptr= subpart_field_array;
    while ((field= *(ptr++)))
    {
      if (field_is_partition_charset(field))
unknown's avatar
unknown committed
1191
      {
1192
        tot_subpart_fields++;
unknown's avatar
unknown committed
1193 1194
        tot_fields++;
      }
1195 1196
    }
    size= tot_subpart_fields * sizeof(char*);
1197
    if (!(char_ptrs= (uchar**) sql_calloc(size)))
1198 1199
      goto error;
    subpart_field_buffers= char_ptrs;
1200
    if (!(char_ptrs= (uchar**) sql_calloc(size)))
1201 1202 1203
      goto error;
    restore_subpart_field_ptrs= char_ptrs;
    size= (tot_subpart_fields + 1) * sizeof(Field*);
1204
    if (!(char_ptrs= (uchar**) sql_alloc(size)))
1205 1206
      goto error;
    subpart_charset_field_array= (Field**)char_ptrs;
unknown's avatar
unknown committed
1207
    ptr= subpart_field_array;
1208 1209 1210 1211
    i= 0;
    while ((field= *(ptr++)))
    {
      CHARSET_INFO *cs;
1212
      uchar *field_buf;
1213
      LINT_INIT(field_buf);
1214 1215 1216 1217 1218

      if (!field_is_partition_charset(field))
        continue;
      cs= ((Field_str*)field)->charset();
      size= field->pack_length();
1219
      if (!(field_buf= (uchar*) sql_calloc(size)))
unknown's avatar
unknown committed
1220 1221
        goto error;
      subpart_charset_field_array[i]= field;
1222 1223
      subpart_field_buffers[i++]= field_buf;
    }
unknown's avatar
unknown committed
1224
    subpart_charset_field_array[i]= NULL;
1225 1226 1227
  }
  if (tot_fields)
  {
unknown's avatar
unknown committed
1228
    uint k;
1229
    size= tot_fields*sizeof(char**);
1230
    if (!(char_ptrs= (uchar**)sql_calloc(size)))
1231 1232
      goto error;
    full_part_field_buffers= char_ptrs;
1233
    if (!(char_ptrs= (uchar**)sql_calloc(size)))
1234 1235 1236
      goto error;
    restore_full_part_field_ptrs= char_ptrs;
    size= (tot_fields + 1) * sizeof(char**);
1237
    if (!(char_ptrs= (uchar**)sql_calloc(size)))
1238 1239 1240 1241 1242 1243 1244 1245 1246 1247
      goto error;
    full_part_charset_field_array= (Field**)char_ptrs;
    for (i= 0; i < tot_part_fields; i++)
    {
      full_part_charset_field_array[i]= part_charset_field_array[i];
      full_part_field_buffers[i]= part_field_buffers[i];
    }
    k= tot_part_fields;
    for (i= 0; i < tot_subpart_fields; i++)
    {
unknown's avatar
unknown committed
1248 1249
      uint j;
      bool found= FALSE;
1250
      field= subpart_charset_field_array[i];
unknown's avatar
unknown committed
1251

1252 1253 1254 1255 1256 1257 1258
      for (j= 0; j < tot_part_fields; j++)
      {
        if (field == part_charset_field_array[i])
          found= TRUE;
      }
      if (!found)
      {
unknown's avatar
unknown committed
1259 1260 1261
        full_part_charset_field_array[k]= subpart_charset_field_array[i];
        full_part_field_buffers[k]= subpart_field_buffers[i];
        k++;
1262 1263
      }
    }
unknown's avatar
unknown committed
1264
    full_part_charset_field_array[k]= NULL;
1265 1266 1267 1268 1269 1270
  }
  DBUG_RETURN(FALSE);
error:
  mem_alloc_error(size);
  DBUG_RETURN(TRUE);
}
1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317


/*
  Check if path does not contain mysql data home directory
  for partition elements with data directory and index directory

  SYNOPSIS
    check_partition_dirs()
    part_info               partition_info struct 

  RETURN VALUES
    0	ok
    1	error  
*/

bool check_partition_dirs(partition_info *part_info)
{
  if (!part_info)
    return 0;

  partition_element *part_elem;
  List_iterator<partition_element> part_it(part_info->partitions);
  while ((part_elem= part_it++))
  {
    if (part_elem->subpartitions.elements)
    {
      List_iterator<partition_element> sub_it(part_elem->subpartitions);
      partition_element *subpart_elem;
      while ((subpart_elem= sub_it++))
      {
        if (test_if_data_home_dir(subpart_elem->data_file_name))
          goto dd_err;
        if (test_if_data_home_dir(subpart_elem->index_file_name))
          goto id_err;
      }
    }
    else
    {
      if (test_if_data_home_dir(part_elem->data_file_name))
        goto dd_err;
      if (test_if_data_home_dir(part_elem->index_file_name))
        goto id_err;
    }
  }
  return 0;

dd_err:
1318
  my_error(ER_WRONG_ARGUMENTS,MYF(0),"DATA DIRECTORY");
1319 1320 1321
  return 1;

id_err:
1322
  my_error(ER_WRONG_ARGUMENTS,MYF(0),"INDEX DIRECTORY");
1323 1324 1325 1326
  return 1;
}


1327
#endif /* WITH_PARTITION_STORAGE_ENGINE */