/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "ma_fulltext.h"
#include "ma_rt_index.h"
#include "trnman.h"
#include "ma_key_recover.h"

/* Forward declarations of the file-local B-tree delete helpers */

static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
                    my_off_t page, uchar *anc_buff,
                    MARIA_PINNED_PAGE *anc_page_link);
static int del(MARIA_HA *info, MARIA_KEY *key,
               my_off_t anc_page, uchar *anc_buff, my_off_t leaf_page,
               uchar *leaf_buff, MARIA_PINNED_PAGE *leaf_page_link,
               uchar *keypos, my_off_t next_block, uchar *ret_key_buff);
static int underflow(MARIA_HA *info,MARIA_KEYDEF *keyinfo,
                     my_off_t anc_page, uchar *anc_buff,
                     my_off_t leaf_page, uchar *leaf_buff,
                     MARIA_PINNED_PAGE *leaf_page_link, uchar *keypos);
static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
                       uchar *keypos, uchar *lastkey, uchar *page_end,
                       my_off_t *next_block, MARIA_KEY_PARAM *s_temp);

36
/* @breif Remove a row from a MARIA table */
37

unknown's avatar
unknown committed
38
int maria_delete(MARIA_HA *info,const uchar *record)
39 40
{
  uint i;
unknown's avatar
unknown committed
41
  uchar *old_key;
42 43
  int save_errno;
  char lastpos[8];
44
  MARIA_SHARE *share= info->s;
45
  MARIA_KEYDEF *keyinfo;
46 47
  DBUG_ENTER("maria_delete");

48
  /* Test if record is in datafile */
49
  DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
unknown's avatar
unknown committed
50
                  maria_print_error(share, HA_ERR_CRASHED);
51 52
                  DBUG_RETURN(my_errno= HA_ERR_CRASHED););
  DBUG_EXECUTE_IF("my_error_test_undefined_error",
unknown's avatar
unknown committed
53
                  maria_print_error(share, INT_MAX);
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
                  DBUG_RETURN(my_errno= INT_MAX););
  if (!(info->update & HA_STATE_AKTIV))
  {
    DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND);	/* No database read */
  }
  if (share->options & HA_OPTION_READ_ONLY_DATA)
  {
    DBUG_RETURN(my_errno=EACCES);
  }
  if (_ma_readinfo(info,F_WRLCK,1))
    DBUG_RETURN(my_errno);
  if ((*share->compare_record)(info,record))
    goto err;				/* Error on read-check */

  if (_ma_mark_file_changed(info))
    goto err;

71 72
  /* Ensure we don't change the autoincrement value */
  info->last_auto_increment= ~(ulonglong) 0;
unknown's avatar
unknown committed
73
  /* Remove all keys from the index file */
74

75 76 77
  old_key= info->lastkey_buff2;

  for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
78
  {
unknown's avatar
unknown committed
79
    if (maria_is_key_active(share->state.key_map, i))
80
    {
81 82
      keyinfo->version++;
      if (keyinfo->flag & HA_FULLTEXT)
83
      {
84
        if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
85 86 87 88
          goto err;
      }
      else
      {
89 90 91 92 93 94
        MARIA_KEY key;
        if (keyinfo->ck_delete(info,
                               (*keyinfo->make_key)(info, &key, i, old_key,
                                                    record,
                                                    info->cur_row.lastpos,
                                                    info->cur_row.trid)))
95 96 97 98 99 100 101
          goto err;
      }
      /* The above changed info->lastkey2. Inform maria_rnext_same(). */
      info->update&= ~HA_STATE_RNEXT_SAME;
    }
  }

unknown's avatar
unknown committed
102
  if (share->calc_checksum)
unknown's avatar
unknown committed
103
  {
unknown's avatar
unknown committed
104 105 106 107 108
    /*
      We can't use the row based checksum as this doesn't have enough
      precision.
    */
    info->cur_row.checksum= (*share->calc_checksum)(info, record);
unknown's avatar
unknown committed
109
  }
110

unknown's avatar
unknown committed
111 112 113
  if ((*share->delete_record)(info, record))
    goto err;				/* Remove record from database */

114 115
  info->state->checksum-= info->cur_row.checksum;
  info->state->records--;
116
  info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
unknown's avatar
unknown committed
117 118
  share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
                          STATE_NOT_ZEROFILLED);
119

unknown's avatar
unknown committed
120
  mi_sizestore(lastpos, info->cur_row.lastpos);
121 122 123 124
  VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
  allow_break();			/* Allow SIGHUP & SIGINT */
  if (info->invalidator != 0)
  {
unknown's avatar
unknown committed
125 126
    DBUG_PRINT("info", ("invalidator... '%s' (delete)", share->open_file_name));
    (*info->invalidator)(share->open_file_name);
127 128 129 130 131
    info->invalidator=0;
  }
  DBUG_RETURN(0);

err:
unknown's avatar
unknown committed
132 133 134 135 136
  save_errno= my_errno;
  DBUG_ASSERT(save_errno);
  if (!save_errno)
    save_errno= HA_ERR_INTERNAL_ERROR;          /* Should never happen */

unknown's avatar
unknown committed
137
  mi_sizestore(lastpos, info->cur_row.lastpos);
138 139
  if (save_errno != HA_ERR_RECORD_CHANGED)
  {
unknown's avatar
unknown committed
140
    maria_print_error(share, HA_ERR_CRASHED);
141 142 143 144 145 146 147
    maria_mark_crashed(info);		/* mark table crashed */
  }
  VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
  info->update|=HA_STATE_WRITTEN;	/* Buffer changed */
  allow_break();			/* Allow SIGHUP & SIGINT */
  if (save_errno == HA_ERR_KEY_NOT_FOUND)
  {
unknown's avatar
unknown committed
148
    maria_print_error(share, HA_ERR_CRASHED);
149 150
    my_errno=HA_ERR_CRASHED;
  }
unknown's avatar
unknown committed
151
  DBUG_RETURN(my_errno= save_errno);
152 153 154
} /* maria_delete */


155 156 157 158 159 160 161
/*
  Remove a key from the btree index

  TODO:
   Change ma_ck_real_delete() to use another buffer for changed keys instead
   of key->data. This would allows us to remove the copying of the key here.
*/
162

163
int _ma_ck_delete(register MARIA_HA *info, MARIA_KEY *key)
164
{
165
  MARIA_SHARE *share= info->s;
166 167
  int res;
  LSN lsn= LSN_IMPOSSIBLE;
168 169 170
  my_off_t new_root= share->state.key_root[key->keyinfo->key_nr];
  uchar key_buff[MARIA_MAX_KEY_BUFF], *save_key_data;
  MARIA_KEY org_key;
171 172
  DBUG_ENTER("_ma_ck_delete");

173
  save_key_data= key->data;
174
  if (share->now_transactional)
175 176
  {
    /* Save original value as the key may change */
177 178 179
    memcpy(key_buff, key->data, key->data_length + key->ref_length);
    org_key= *key;
    key->data= key_buff;
180 181
  }

182
  res= _ma_ck_real_delete(info, key, &new_root);
183

184
  key->data= save_key_data;
185
  if (!res && share->now_transactional)
186
    res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn);
187 188
  else
  {
189
    share->state.key_root[key->keyinfo->key_nr]= new_root;
190 191 192 193
    _ma_fast_unlock_key_del(info);
  }
  _ma_unpin_all_pages_and_finalize_row(info, lsn);
  DBUG_RETURN(res);
194 195 196
} /* _ma_ck_delete */


197 198
int _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
                       my_off_t *root)
199 200 201 202
{
  int error;
  uint nod_flag;
  my_off_t old_root;
unknown's avatar
unknown committed
203
  uchar *root_buff;
204
  MARIA_PINNED_PAGE *page_link;
205
  MARIA_KEYDEF *keyinfo= key->keyinfo;
206 207 208 209 210 211 212
  DBUG_ENTER("_ma_ck_real_delete");

  if ((old_root=*root) == HA_OFFSET_ERROR)
  {
    maria_print_error(info->s, HA_ERR_CRASHED);
    DBUG_RETURN(my_errno=HA_ERR_CRASHED);
  }
unknown's avatar
unknown committed
213
  if (!(root_buff= (uchar*)  my_alloca((uint) keyinfo->block_length+
214
                                       MARIA_MAX_KEY_BUFF*2)))
215 216 217 218
  {
    DBUG_PRINT("error",("Couldn't allocate memory"));
    DBUG_RETURN(my_errno=ENOMEM);
  }
219
  DBUG_PRINT("info",("root_page: %ld", (long) old_root));
220 221 222
  if (!_ma_fetch_keypage(info, keyinfo, old_root,
                         PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0,
                         &page_link))
223 224 225 226
  {
    error= -1;
    goto err;
  }
227 228 229 230
  if ((error= d_search(info, key, (keyinfo->flag & HA_FULLTEXT ?
                                   SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT:
                                   SEARCH_SAME),
                       old_root, root_buff, page_link)) > 0)
231 232 233 234
  {
    if (error == 2)
    {
      DBUG_PRINT("test",("Enlarging of root when deleting"));
235
      error= _ma_enlarge_root(info, key, root);
236 237 238
    }
    else /* error == 1 */
    {
239
      uint used_length;
240 241
      MARIA_SHARE *share= info->s;
      _ma_get_used_and_nod(share, root_buff, used_length, nod_flag);
242
      page_link->changed= 1;
243
      if (used_length <= nod_flag + share->keypage_header + 1)
244 245 246
      {
	error=0;
	if (nod_flag)
247
	  *root= _ma_kpos(nod_flag, root_buff +share->keypage_header +
248
                          nod_flag);
249 250
	else
	  *root=HA_OFFSET_ERROR;
251
	if (_ma_dispose(info, old_root, 0))
252 253 254
	  error= -1;
      }
      else
255 256 257
	error= _ma_write_keypage(info,keyinfo, old_root,
                                 PAGECACHE_LOCK_LEFT_WRITELOCKED,
                                 DFLT_INIT_HITS, root_buff);
258 259 260
    }
  }
err:
unknown's avatar
unknown committed
261
  my_afree((uchar*) root_buff);
262 263 264 265 266
  DBUG_PRINT("exit",("Return: %d",error));
  DBUG_RETURN(error);
} /* _ma_ck_real_delete */


267 268
/**
   @brief Remove key below key root
269

270
   @param key  Key to delete.  Will contain new key if block was enlarged
271

272 273 274 275 276
   @return
   @retval 0   ok (anc_page is not changed)
   @retval 1   If data on page is too small; In this case anc_buff is not saved
   @retval 2   If data on page is too big
   @retval -1  On errors
277
*/
278

279
static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
280 281
                    my_off_t anc_page, uchar *anc_buff,
                    MARIA_PINNED_PAGE *anc_page_link)
282 283
{
  int flag,ret_value,save_flag;
284
  uint nod_flag, page_flag;
285
  my_bool last_key;
unknown's avatar
unknown committed
286
  uchar *leaf_buff,*keypos;
287
  my_off_t leaf_page,next_block;
288
  uchar lastkey[MARIA_MAX_KEY_BUFF];
289 290
  MARIA_PINNED_PAGE *leaf_page_link;
  MARIA_KEY_PARAM s_temp;
291
  MARIA_SHARE *share= info->s;
292
  MARIA_KEYDEF *keyinfo= key->keyinfo;
293
  DBUG_ENTER("d_search");
294
  DBUG_DUMP("page",anc_buff,_ma_get_page_used(share, anc_buff));
295

296 297
  flag=(*keyinfo->bin_search)(key, anc_buff, comp_flag, &keypos, lastkey,
                              &last_key);
298 299 300 301 302
  if (flag == MARIA_FOUND_WRONG_KEY)
  {
    DBUG_PRINT("error",("Found wrong key"));
    DBUG_RETURN(-1);
  }
303
  page_flag= _ma_get_keypage_flag(share, anc_buff);
304
  nod_flag= _ma_test_if_nod(share, anc_buff);
305

306
  if (!flag && (keyinfo->flag & HA_FULLTEXT))
307 308 309 310 311 312 313 314 315 316 317 318 319 320
  {
    uint off;
    int  subkeys;

    get_key_full_length_rdonly(off, lastkey);
    subkeys=ft_sintXkorr(lastkey+off);
    DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
    comp_flag=SEARCH_SAME;
    if (subkeys >= 0)
    {
      /* normal word, one-level tree structure */
      if (info->ft1_to_ft2)
      {
        /* we're in ft1->ft2 conversion mode. Saving key data */
321
        insert_dynamic(info->ft1_to_ft2, (lastkey+off));
322 323 324 325
      }
      else
      {
        /* we need exact match only if not in ft1->ft2 conversion mode */
326 327
        flag=(*keyinfo->bin_search)(key, anc_buff, comp_flag, &keypos,
                                    lastkey, &last_key);
328 329 330 331 332 333 334 335
      }
      /* fall through to normal delete */
    }
    else
    {
      /* popular word. two-level tree. going down */
      uint tmp_key_length;
      my_off_t root;
unknown's avatar
unknown committed
336
      uchar *kpos=keypos;
337
      MARIA_KEY tmp_key;
338

339 340 341 342 343
      tmp_key.data=    lastkey;
      tmp_key.keyinfo= keyinfo;

      if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag,
                                               &kpos)))
344
      {
345
        maria_print_error(share, HA_ERR_CRASHED);
346 347 348
        my_errno= HA_ERR_CRASHED;
        DBUG_RETURN(-1);
      }
349
      root= _ma_row_pos_from_key(&tmp_key);
350 351 352
      if (subkeys == -1)
      {
        /* the last entry in sub-tree */
353
        if (_ma_dispose(info, root, 1))
354 355 356 357 358
          DBUG_RETURN(-1);
        /* fall through to normal delete */
      }
      else
      {
359
        MARIA_KEY word_key;
360
        keyinfo=&share->ft2_keyinfo;
361 362
        /* we'll modify key entry 'in vivo' */
        kpos-=keyinfo->keylength+nod_flag;
363 364 365 366 367 368 369 370 371
        get_key_full_length_rdonly(off, key->data);

        word_key.data=        key->data + off;
        word_key.keyinfo=     &share->ft2_keyinfo;
        word_key.data_length= HA_FT_WLEN;
        word_key.ref_length= 0;
        word_key.flag= 0;
        ret_value= _ma_ck_real_delete(info, &word_key, &root);
        _ma_dpointer(share, kpos+HA_FT_WLEN, root);
372 373 374
        subkeys++;
        ft_intXstore(kpos, subkeys);
        if (!ret_value)
375 376 377 378 379 380
        {
          anc_page_link->changed= 1;
          ret_value= _ma_write_keypage(info, keyinfo, anc_page,
                                       PAGECACHE_LOCK_LEFT_WRITELOCKED,
                                       DFLT_INIT_HITS, anc_buff);
        }
381 382 383 384 385 386 387 388 389
        DBUG_PRINT("exit",("Return: %d",ret_value));
        DBUG_RETURN(ret_value);
      }
    }
  }
  leaf_buff=0;
  LINT_INIT(leaf_page);
  if (nod_flag)
  {
390
    /* Read left child page */
391
    leaf_page= _ma_kpos(nod_flag,keypos);
unknown's avatar
unknown committed
392
    if (!(leaf_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
393
                                       MARIA_MAX_KEY_BUFF*2)))
394
    {
395
      DBUG_PRINT("error", ("Couldn't allocate memory"));
396 397 398
      my_errno=ENOMEM;
      DBUG_RETURN(-1);
    }
399 400 401
    if (!_ma_fetch_keypage(info,keyinfo,leaf_page,
                           PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
                           0, &leaf_page_link))
402 403 404 405 406 407 408 409
      goto err;
  }

  if (flag != 0)
  {
    if (!nod_flag)
    {
      DBUG_PRINT("error",("Didn't find key"));
410
      maria_print_error(share, HA_ERR_CRASHED);
411 412 413 414
      my_errno=HA_ERR_CRASHED;		/* This should newer happend */
      goto err;
    }
    save_flag=0;
415 416
    ret_value= d_search(info, key, comp_flag, leaf_page, leaf_buff,
                        leaf_page_link);
417 418 419 420
  }
  else
  {						/* Found key */
    uint tmp;
421
    uint anc_buff_length= _ma_get_page_used(share, anc_buff);
422 423 424
    uint anc_page_flag= _ma_get_keypage_flag(share, anc_buff);

    if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey,
425
                          anc_buff + anc_buff_length,
426
                          &next_block, &s_temp)))
427 428
      goto err;

429
    anc_page_link->changed= 1;
430
    anc_buff_length-= tmp;
431
    _ma_store_page_used(share, anc_buff, anc_buff_length);
432 433 434 435 436 437

    /*
      Log initial changes on pages
      If there is an underflow, there will be more changes logged to the
      page
    */
438
    if (share->now_transactional &&
439
        _ma_log_delete(info, anc_page, anc_buff, s_temp.key_pos,
440
                       s_temp.changed_length, s_temp.move_length))
441 442
      DBUG_RETURN(-1);

443 444
    if (!nod_flag)
    {						/* On leaf page */
445 446 447
      if (anc_buff_length <= (info->quick_mode ?
                              MARIA_MIN_KEYBLOCK_LENGTH :
                              (uint) keyinfo->underflow_block_length))
448
      {
449 450
        /* Page will be written by caller if we return 1 */
        DBUG_RETURN(1);
451
      }
452 453 454 455 456
      if (_ma_write_keypage(info, keyinfo, anc_page,
                            PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
                            anc_buff))
	DBUG_RETURN(-1);
      DBUG_RETURN(0);
457
    }
458
    save_flag=1;                         /* Mark that anc_buff is changed */
459
    ret_value= del(info, key, anc_page, anc_buff,
460 461
                   leaf_page, leaf_buff, leaf_page_link,
                   keypos, next_block, lastkey);
462 463 464 465 466
  }
  if (ret_value >0)
  {
    save_flag=1;
    if (ret_value == 1)
467 468
      ret_value= underflow(info, keyinfo, anc_page, anc_buff,
                           leaf_page, leaf_buff, leaf_page_link, keypos);
469
    else
470 471 472
    {
      /* This can only happen with variable length keys */
      MARIA_KEY last_key;
473
      DBUG_PRINT("test",("Enlarging of key when deleting"));
474 475 476 477

      last_key.data=    lastkey;
      last_key.keyinfo= keyinfo;
      if (!_ma_get_last_key(&last_key, anc_buff, keypos))
478
	goto err;
479 480
      ret_value= _ma_insert(info, key, anc_buff, keypos, anc_page,
                            last_key.data, (my_off_t) 0, (uchar*) 0,
481
                            (MARIA_PINNED_PAGE*) 0, (uchar*) 0, (my_bool) 0);
482 483
    }
  }
484
  if (ret_value == 0 && _ma_get_page_used(share, anc_buff) >
485
      (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
486
  {
487
    /* parent buffer got too big ; We have to split the page */
488
    save_flag=1;
489
    ret_value= _ma_split_page(info, key, anc_page, anc_buff,
490 491 492
                              (uint) (keyinfo->block_length -
                                      KEYPAGE_CHECKSUM_SIZE),
                              (uchar*) 0, 0, 0, lastkey, 0) | 2;
493 494
  }
  if (save_flag && ret_value != 1)
495 496 497 498 499 500
  {
    anc_page_link->changed= 1;
    ret_value|= _ma_write_keypage(info, keyinfo, anc_page,
                                  PAGECACHE_LOCK_LEFT_WRITELOCKED,
                                  DFLT_INIT_HITS, anc_buff);
  }
501 502
  else
  {
503
    DBUG_DUMP("page", anc_buff, _ma_get_page_used(share, anc_buff));
504
  }
unknown's avatar
unknown committed
505
  my_afree(leaf_buff);
506 507 508 509
  DBUG_PRINT("exit",("Return: %d",ret_value));
  DBUG_RETURN(ret_value);

err:
unknown's avatar
unknown committed
510
  my_afree(leaf_buff);
511 512 513 514 515
  DBUG_PRINT("exit",("Error: %d",my_errno));
  DBUG_RETURN (-1);
} /* d_search */


516 517 518 519 520 521 522 523 524 525 526 527
/**
   @brief Remove a key that has a page-reference

   @param info		 Maria handler
   @param key		 Buffer for key to be inserted at upper level
   @param anc_page	 Page address for page where deleted key was
   @param anc_buff       Page buffer (nod) where deleted key was
   @param leaf_page      Page address for nod before the deleted key
   @param leaf_buff      Buffer for leaf_page
   @param leaf_buff_link Pinned page link for leaf_buff
   @param keypos         Pos to where deleted key was on anc_buff
   @param next_block	 Page adress for nod after deleted key
528
   @param ret_key_buff	 Key before keypos in anc_buff
529 530

   @notes
531 532
      leaf_buff must be written to disk if retval > 0
      anc_buff  is not updated on disk. Caller should do this
533 534 535

   @return
   @retval < 0   Error
536 537
   @retval 0     OK.    leaf_buff is written to disk

538
   @retval 1     key contains key to upper level (from balance page)
539
                 leaf_buff has underflow
540 541
   @retval 2     key contains key to upper level (from split space)
*/
542

543 544
static int del(MARIA_HA *info, MARIA_KEY *key,
               my_off_t anc_page, uchar *anc_buff,
545 546
               my_off_t leaf_page, uchar *leaf_buff,
               MARIA_PINNED_PAGE *leaf_page_link,
547
	       uchar *keypos, my_off_t next_block, uchar *ret_key_buff)
548 549
{
  int ret_value,length;
550
  uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length;
551
  my_off_t next_page;
552
  uchar keybuff[MARIA_MAX_KEY_BUFF],*endpos,*next_buff,*key_start, *prev_key;
553
  MARIA_KEY_PARAM s_temp;
554
  MARIA_PINNED_PAGE *next_page_link;
555 556 557 558
  MARIA_KEY tmp_key;
  MARIA_SHARE *share= info->s;
  MARIA_KEYDEF *keyinfo= key->keyinfo;
  MARIA_KEY ret_key;
559
  DBUG_ENTER("del");
560
  DBUG_PRINT("enter",("leaf_page: %ld  keypos: 0x%lx", (long) leaf_page,
561 562
		      (ulong) keypos));

563 564 565
  page_flag= _ma_get_keypage_flag(share, leaf_buff);
  _ma_get_used_and_nod_with_flag(share, page_flag, leaf_buff, leaf_length,
                                 nod_flag);
566
  DBUG_DUMP("leaf_buff", leaf_buff, leaf_length);
567

568
  endpos= leaf_buff + leaf_length;
569 570 571 572
  tmp_key.keyinfo= keyinfo;
  tmp_key.data=    keybuff;

  if (!(key_start= _ma_get_last_key(&tmp_key, leaf_buff, endpos)))
573 574
    DBUG_RETURN(-1);

575
  if (nod_flag)
576 577
  {
    next_page= _ma_kpos(nod_flag,endpos);
unknown's avatar
unknown committed
578
    if (!(next_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
579
					MARIA_MAX_KEY_BUFF*2)))
580
      DBUG_RETURN(-1);
581 582
    if (!_ma_fetch_keypage(info, keyinfo, next_page, PAGECACHE_LOCK_WRITE,
                           DFLT_INIT_HITS, next_buff, 0, &next_page_link))
583 584 585
      ret_value= -1;
    else
    {
586
      DBUG_DUMP("next_page", next_buff, _ma_get_page_used(share, next_buff));
587
      if ((ret_value= del(info, key, anc_page, anc_buff, next_page,
588
                          next_buff, next_page_link, keypos, next_block,
589
                          ret_key_buff)) >0)
590
      {
591
        /* Get new length after key was deleted */
592
	endpos=leaf_buff+_ma_get_page_used(share, leaf_buff);
593 594
	if (ret_value == 1)
	{
595
	  ret_value= underflow(info, keyinfo, leaf_page, leaf_buff, next_page,
596
                               next_buff, next_page_link, endpos);
597
	  if (ret_value == 0 &&
598
              _ma_get_page_used(share, leaf_buff) >
599
              (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
600
	  {
601
	    ret_value= (_ma_split_page(info, key,
602 603 604 605
                                       leaf_page, leaf_buff,
                                       (uint) (keyinfo->block_length -
                                               KEYPAGE_CHECKSUM_SIZE),
                                       (uchar*) 0, 0, 0,
606
                                       ret_key_buff, 0) | 2);
607 608 609 610 611
	  }
	}
	else
	{
	  DBUG_PRINT("test",("Inserting of key when deleting"));
612
	  if (!_ma_get_last_key(&tmp_key, leaf_buff, endpos))
613
	    goto err;
614 615 616 617
	  ret_value= _ma_insert(info, key, leaf_buff, endpos,
                                leaf_page, tmp_key.data, (my_off_t) 0,
                                (uchar*) 0, (MARIA_PINNED_PAGE *) 0,
                                (uchar*) 0, 0);
618 619
	}
      }
620
      leaf_page_link->changed= 1;
621 622 623 624 625 626 627 628 629
      /*
        If ret_value <> 0, then leaf_page underflowed and caller will have
        to handle underflow and write leaf_page to disk.
        We can't write it here, as if leaf_page is empty we get an assert
        in _ma_write_keypage.
      */
      if (ret_value == 0 && _ma_write_keypage(info, keyinfo, leaf_page,
                                              PAGECACHE_LOCK_LEFT_WRITELOCKED,
                                              DFLT_INIT_HITS, leaf_buff))
630 631
	goto err;
    }
unknown's avatar
unknown committed
632
    my_afree(next_buff);
633 634 635
    DBUG_RETURN(ret_value);
  }

636 637 638 639 640 641 642 643
  /*
    Remove last key from leaf page
    Note that leaf_page page may only have had one key (can normally only
    happen in quick mode), in which ase it will now temporary have 0 keys
    on it. This will be corrected by the caller as we will return 0.
  */
  new_leaf_length= (uint) (key_start - leaf_buff);
  _ma_store_page_used(share, leaf_buff, new_leaf_length);
644

645
  if (share->now_transactional &&
646
      _ma_log_suffix(info, leaf_page, leaf_buff, leaf_length,
647
                     new_leaf_length))
648 649
    goto err;

650
  leaf_page_link->changed= 1;                 /* Safety */
651 652 653 654 655 656 657 658 659 660 661 662 663 664
  if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
                          (uint) keyinfo->underflow_block_length))
  {
    /* Underflow, leaf_page will be written by caller */
    ret_value= 1;
  }
  else
  {
    ret_value= 0;
    if (_ma_write_keypage(info, keyinfo, leaf_page,
                          PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
                          leaf_buff))
      goto err;
  }
665

666 667
  /* Place last key in ancestor page on deleted key position */
  a_length= _ma_get_page_used(share, anc_buff);
668
  endpos=anc_buff+a_length;
669 670 671 672 673 674 675 676 677 678 679 680 681 682 683

  ret_key.keyinfo= keyinfo;
  ret_key.data=    ret_key_buff;

  prev_key= 0;
  if (keypos != anc_buff+share->keypage_header + share->base.key_reflength)
  {
    if (!_ma_get_last_key(&ret_key, anc_buff, keypos))
      goto err;
    prev_key= ret_key.data;
  }
  length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength,
                               keypos == endpos ? (uchar*) 0 : keypos,
                               prev_key, prev_key,
                               &s_temp);
684
  if (length > 0)
unknown's avatar
unknown committed
685
    bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
686 687 688
  else
    bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
  (*keyinfo->store_key)(keyinfo,keypos,&s_temp);
689 690
  key_start= keypos;

691 692 693
  /* Save pointer to next leaf on parent page */
  if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
                           &keypos))
694 695
    goto err;
  _ma_kpointer(info,keypos - share->base.key_reflength,next_block);
696
  _ma_store_page_used(share, anc_buff, a_length + length);
697

698
  if (share->now_transactional &&
699
      _ma_log_add(info, anc_page, anc_buff, a_length,
700 701 702
                  key_start, s_temp.changed_length, s_temp.move_length, 1))
    goto err;

703
  DBUG_RETURN(new_leaf_length <=
704 705
              (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
               (uint) keyinfo->underflow_block_length));
706 707 708 709 710
err:
  DBUG_RETURN(-1);
} /* del */


/**
   @brief Balances adjacent pages if underflow occurs

   @fn    underflow()
   @param anc_buff        Ancestor page data
   @param leaf_page       Page number of leaf page
   @param leaf_buff       Leaf page (page that underflowed)
   @param leaf_page_link  Pointer to pin information about leaf page
   @param keypos          Position after current key in anc_buff

   @note
     This function writes redo entries for all changes
     leaf_page is saved to disk
     Caller must save anc_buff

   @return
   @retval  0  ok
   @retval  1  ok, but anc_buff did underflow
   @retval -1  error
 */
731

732
static int underflow(register MARIA_HA *info, MARIA_KEYDEF *keyinfo,
733 734
		     my_off_t anc_page, uchar *anc_buff,
		     my_off_t leaf_page, uchar *leaf_buff,
735
                     MARIA_PINNED_PAGE *leaf_page_link,
736
		     uchar *keypos)
737 738
{
  int t_length;
739 740
  uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
  uint next_buff_length, new_buff_length, key_reflength;
741
  uint unchanged_leaf_length, new_leaf_length, new_anc_length;
742
  uint anc_page_flag, page_flag;
743
  my_off_t next_page;
744
  uchar anc_key_buff[MARIA_MAX_KEY_BUFF], leaf_key_buff[MARIA_MAX_KEY_BUFF];
745 746 747
  uchar *buff,*endpos,*next_keypos,*anc_pos,*half_pos,*prev_key;
  uchar *after_key, *anc_end_pos;
  MARIA_KEY_PARAM key_deleted, key_inserted;
748
  MARIA_SHARE *share= info->s;
749
  MARIA_PINNED_PAGE *next_page_link;
750
  my_bool first_key;
751
  MARIA_KEY tmp_key, anc_key, leaf_key;
752 753 754
  DBUG_ENTER("underflow");
  DBUG_PRINT("enter",("leaf_page: %ld  keypos: 0x%lx",(long) leaf_page,
		      (ulong) keypos));
755 756
  DBUG_DUMP("anc_buff", anc_buff,   _ma_get_page_used(share, anc_buff));
  DBUG_DUMP("leaf_buff", leaf_buff, _ma_get_page_used(share, leaf_buff));
757

758
  anc_page_flag= _ma_get_keypage_flag(share, anc_buff);
759
  buff=info->buff;
760
  info->keyread_buff_used=1;
761
  next_keypos=keypos;
762 763 764 765
  nod_flag= _ma_test_if_nod(share, leaf_buff);
  p_length= nod_flag+share->keypage_header;
  anc_length= _ma_get_page_used(share, anc_buff);
  leaf_length= _ma_get_page_used(share, leaf_buff);
766
  key_reflength=share->base.key_reflength;
767
  if (share->keyinfo+info->lastinx == keyinfo)
768
    info->page_changed=1;
769
  first_key= keypos == anc_buff + share->keypage_header + key_reflength;
770

771 772 773 774 775
  tmp_key.data=  buff;
  anc_key.data=  anc_key_buff;
  leaf_key.data= leaf_key_buff;
  tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo;

776
  if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
777 778
      first_key)
  {
779
    size_t tmp_length;
780
    /* Use page right of anc-page */
781 782
    DBUG_PRINT("test",("use right page"));

783 784 785 786
    /*
      Calculate position after the current key. Note that keydata itself is
      not used
    */
787 788
    if (keyinfo->flag & HA_BINARY_PACK_KEY)
    {
789
      if (!(next_keypos= _ma_get_key(&tmp_key, anc_buff, keypos)))
790 791 792 793 794 795
	goto err;
    }
    else
    {
      /* Got to end of found key */
      buff[0]=buff[1]=0;	/* Avoid length error check if packed key */
796 797 798
      if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength,
                               &next_keypos))
        goto err;
799 800
    }
    next_page= _ma_kpos(key_reflength,next_keypos);
801 802
    if (!_ma_fetch_keypage(info,keyinfo, next_page, PAGECACHE_LOCK_WRITE,
                           DFLT_INIT_HITS, buff, 0, &next_page_link))
803
      goto err;
804
    next_buff_length= _ma_get_page_used(share, buff);
805
    DBUG_DUMP("next", buff, next_buff_length);
806 807

    /* find keys to make a big key-page */
808
    bmove(next_keypos-key_reflength, buff + share->keypage_header,
809
          key_reflength);
810 811 812

    if (!_ma_get_last_key(&anc_key, anc_buff, next_keypos) ||
	!_ma_get_last_key(&leaf_key, leaf_buff, leaf_buff+leaf_length))
813 814 815
      goto err;

    /* merge pages and put parting key from anc_buff between */
816 817 818 819 820
    prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
    t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, buff+p_length,
                                   prev_key, prev_key, &key_inserted);
    tmp_length= next_buff_length - p_length;
    endpos= buff+tmp_length+leaf_length+t_length;
821
    /* buff will always be larger than before !*/
822
    bmove_upp(endpos, buff + next_buff_length, tmp_length);
unknown's avatar
unknown committed
823
    memcpy(buff, leaf_buff,(size_t) leaf_length);
824 825
    (*keyinfo->store_key)(keyinfo, buff+leaf_length, &key_inserted);
    buff_length= (uint) (endpos-buff);
826
    _ma_store_page_used(share, buff, buff_length);
827

828 829 830 831 832 833 834 835
    /* Set page flag from combination of both key pages and parting key */
    page_flag= (_ma_get_keypage_flag(share, buff) |
                _ma_get_keypage_flag(share, leaf_buff));
    if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                        SEARCH_PAGE_KEY_HAS_TRANSID))
      page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
    _ma_store_keypage_flag(share, buff, page_flag);

836
    /* remove key from anc_buff */
837 838 839
    if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
                              anc_key_buff, anc_buff+anc_length,
                              (my_off_t *) 0, &key_deleted)))
840 841
      goto err;

842
    new_anc_length= anc_length - s_length;
843
    _ma_store_page_used(share, anc_buff, new_anc_length);
844

845
    if (buff_length <= (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
846 847
    {
      /* All keys fitted into one page */
848 849
      next_page_link->changed= 1;
      if (_ma_dispose(info, next_page, 0))
850
       goto err;
851 852 853

      memcpy(leaf_buff, buff, (size_t) buff_length);

854
      if (share->now_transactional)
855 856 857 858 859 860 861 862 863 864 865 866 867 868
      {
        /* Log changes to parent page */
        if (_ma_log_delete(info, anc_page, anc_buff, key_deleted.key_pos,
                           key_deleted.changed_length,
                           key_deleted.move_length))
          goto err;
        /*
          Log changes to leaf page. Data for leaf page is in buff
          which contains original leaf_buff, parting key and next_buff
        */
        if (_ma_log_suffix(info, leaf_page, leaf_buff,
                           leaf_length, buff_length))
          goto err;
      }
869 870
    }
    else
871 872 873 874 875 876 877 878 879 880 881 882
    {
      /*
        Balancing didn't free a page, so we have to split 'buff' into two
        pages:
        - Find key in middle of buffer
        - Store everything before key in 'leaf_buff'
        - Pack key into anc_buff at position of deleted key
          Note that anc_buff may overflow! (is handled by caller)
        - Store remaining keys in next_page (buff)
      */
      MARIA_KEY_PARAM anc_key_inserted;

883
      anc_end_pos= anc_buff + new_anc_length;
884

885 886
      DBUG_PRINT("test",("anc_buff: 0x%lx  anc_end_pos: 0x%lx",
                         (long) anc_buff, (long) anc_end_pos));
887 888

      if (!first_key && !_ma_get_last_key(&anc_key, anc_buff, keypos))
889
	goto err;
890 891
      if (!(half_pos= _ma_find_half_pos(info, &leaf_key, nod_flag, buff,
                                        &after_key)))
892
	goto err;
893 894
      new_leaf_length= (uint) (half_pos-buff);
      memcpy(leaf_buff, buff, (size_t) new_leaf_length);
895
      _ma_store_page_used(share, leaf_buff, new_leaf_length);
896
      _ma_store_keypage_flag(share, leaf_buff, page_flag);
897 898 899

      /* Correct new keypointer to leaf_page */
      half_pos=after_key;
900 901 902
      _ma_kpointer(info,
                   leaf_key.data+leaf_key.data_length + leaf_key.ref_length,
                   next_page);
903

904
      /* Save key in anc_buff */
905 906 907 908 909
      prev_key= (first_key  ? (uchar*) 0 : anc_key.data);
      t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength,
                                     (keypos == anc_end_pos ? (uchar*) 0 :
                                      keypos),
                                     prev_key, prev_key, &anc_key_inserted);
910
      if (t_length >= 0)
911 912
	bmove_upp(anc_end_pos+t_length, anc_end_pos,
                  (uint) (anc_end_pos - keypos));
913
      else
914 915
	bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
      (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
916
      new_anc_length+= t_length;
917
      _ma_store_page_used(share, anc_buff, new_anc_length);
918 919 920
      if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                           SEARCH_PAGE_KEY_HAS_TRANSID))
        _ma_mark_page_with_transid(share, anc_buff);
921

922
      /* Store key first in new page */
923
      if (nod_flag)
924
	bmove(buff+share->keypage_header, half_pos-nod_flag,
925
              (size_t) nod_flag);
926
      if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
927
	goto err;
928
      t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
unknown's avatar
unknown committed
929
					  (uchar*) 0, (uchar*) 0,
930
					  &key_inserted);
931
      /* t_length will always be > 0 for a new page !*/
932 933
      tmp_length= (size_t) ((buff + buff_length) - half_pos);
      bmove(buff+p_length+t_length, half_pos, tmp_length);
934
      (*keyinfo->store_key)(keyinfo,buff+p_length, &key_inserted);
935
      new_buff_length= tmp_length + t_length + p_length;
936
      _ma_store_page_used(share, buff, new_buff_length);
937
      /* keypage flag is already up to date */
938

939
      if (share->now_transactional)
940 941 942 943 944 945 946 947 948 949 950 951
      {
        /*
          Log changes to parent page
          This has one key deleted from it and one key inserted to it at
          keypos

          ma_log_add ensures that we don't log changes that is outside of
          key block size, as the REDO code can't handle that
        */
        if (_ma_log_add(info, anc_page, anc_buff, anc_length,
                        keypos,
                        anc_key_inserted.move_length +
952
                        max(anc_key_inserted.changed_length -
953 954 955 956 957
                            anc_key_inserted.move_length,
                            key_deleted.changed_length),
                        anc_key_inserted.move_length -
                        key_deleted.move_length, 1))
          goto err;
958

959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990
        /*
          Log changes to leaf page.
          This contains original data with new data added at end
        */
        DBUG_ASSERT(leaf_length <= new_leaf_length);
        if (_ma_log_suffix(info, leaf_page, leaf_buff, leaf_length,
                           new_leaf_length))
          goto err;
        /*
          Log changes to next page

          This contains original data with some prefix data deleted and
          some compressed data at start possible extended

          Data in buff was originally:
          org_leaf_buff     [leaf_length]
          separator_key     [buff_key_inserted.move_length]
          next_key_changes  [buff_key_inserted.changed_length -move_length]
          next_page_data    [next_buff_length - p_length -
                            (buff_key_inserted.changed_length -move_length)]

          After changes it's now:
          unpacked_key      [key_inserted.changed_length]
          next_suffix       [next_buff_length - key_inserted.changed_length]

        */
        DBUG_ASSERT(new_buff_length <= next_buff_length);
        if (_ma_log_prefix(info, next_page, buff,
                           key_inserted.changed_length,
                           (int) (new_buff_length - next_buff_length)))
          goto err;
      }
991 992 993 994
      next_page_link->changed= 1;
      if (_ma_write_keypage(info, keyinfo, next_page,
                            PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
                            buff))
995 996
	goto err;
    }
997

998 999 1000 1001
    leaf_page_link->changed= 1;
    if (_ma_write_keypage(info, keyinfo, leaf_page,
                          PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
                          leaf_buff))
1002
      goto err;
1003 1004 1005
    DBUG_RETURN(new_anc_length <=
                ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
                  (uint) keyinfo->underflow_block_length)));
1006 1007 1008 1009
  }

  DBUG_PRINT("test",("use left page"));

1010
  keypos= _ma_get_last_key(&anc_key, anc_buff, keypos);
1011 1012 1013
  if (!keypos)
    goto err;
  next_page= _ma_kpos(key_reflength,keypos);
1014 1015 1016
  if (!_ma_fetch_keypage(info, keyinfo, next_page, PAGECACHE_LOCK_WRITE,
                         DFLT_INIT_HITS, buff, 0, &next_page_link))
    goto err;
1017
  buff_length= _ma_get_page_used(share, buff);
1018
  endpos= buff + buff_length;
unknown's avatar
unknown committed
1019
  DBUG_DUMP("prev",buff,buff_length);
1020 1021

  /* find keys to make a big key-page */
1022
  bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header,
1023
        key_reflength);
1024
  next_keypos=keypos;
1025 1026
  if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength,
                           &next_keypos))
1027
    goto err;
1028
  if (!_ma_get_last_key(&leaf_key, buff, endpos))
1029 1030 1031
    goto err;

  /* merge pages and put parting key from anc_buff between */
1032 1033
  prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
  t_length=(*keyinfo->pack_key)(&anc_key, nod_flag,
1034
				(leaf_length == p_length ?
unknown's avatar
unknown committed
1035
                                 (uchar*) 0 : leaf_buff+p_length),
1036
				prev_key, prev_key,
1037
				&key_inserted);
1038
  if (t_length >= 0)
unknown's avatar
unknown committed
1039 1040
    bmove(endpos+t_length, leaf_buff+p_length,
          (size_t) (leaf_length-p_length));
1041
  else						/* We gained space */
unknown's avatar
unknown committed
1042
    bmove(endpos,leaf_buff+((int) p_length-t_length),
1043
	  (size_t) (leaf_length-p_length+t_length));
1044
  (*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
1045

1046 1047 1048 1049 1050 1051
  /* Remember for logging how many bytes of leaf_buff that are not changed */
  DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
  unchanged_leaf_length= leaf_length - (key_inserted.changed_length -
                                        key_inserted.move_length);

  new_buff_length= buff_length + leaf_length - p_length + t_length;
1052
  _ma_store_page_used(share, buff, new_buff_length);
1053

1054 1055 1056 1057 1058 1059 1060
  page_flag= (_ma_get_keypage_flag(share, buff) |
              _ma_get_keypage_flag(share, leaf_buff));
  if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                       SEARCH_PAGE_KEY_HAS_TRANSID))
    page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
  _ma_store_keypage_flag(share, buff, page_flag);

1061
  /* remove key from anc_buff */
1062 1063 1064
  if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
                             anc_key_buff,
                             anc_buff+anc_length, (my_off_t *) 0,
1065
                             &key_deleted)))
1066 1067
    goto err;

1068
  new_anc_length= anc_length - s_length;
1069
  _ma_store_page_used(share, anc_buff, new_anc_length);
1070

1071 1072 1073 1074
  if (new_buff_length <= (uint) (keyinfo->block_length -
                                 KEYPAGE_CHECKSUM_SIZE))
  {
    /* All keys fitted into one page */
1075 1076
    leaf_page_link->changed= 1;
    if (_ma_dispose(info, leaf_page, 0))
1077
      goto err;
1078

1079
    if (share->now_transactional)
1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093
    {
      /* Log changes to parent page */
      if (_ma_log_delete(info, anc_page, anc_buff, key_deleted.key_pos,
                         key_deleted.changed_length, key_deleted.move_length))

        goto err;
      /*
        Log changes to next page. Data for leaf page is in buff
        that contains original leaf_buff, parting key and next_buff
      */
      if (_ma_log_suffix(info, next_page, buff,
                         buff_length, new_buff_length))
        goto err;
    }
1094 1095
  }
  else
1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106
  {
    /*
      Balancing didn't free a page, so we have to split 'buff' into two
      pages
      - Find key in middle of buffer (buff)
      - Pack key at half_buff into anc_buff at position of deleted key
        Note that anc_buff may overflow! (is handled by caller)
      - Move everything after middlekey to 'leaf_buff'
      - Shorten buff at 'endpos'
    */
    MARIA_KEY_PARAM anc_key_inserted;
1107
    size_t tmp_length;
1108 1109 1110

    if (first_key)
      anc_pos= 0;				/* First key */
1111 1112 1113 1114 1115 1116 1117 1118
    else
    {
      if (!_ma_get_last_key(&anc_key, anc_buff, keypos))
        goto err;
      anc_pos= anc_key.data;
    }
    if (!(endpos= _ma_find_half_pos(info, &leaf_key, nod_flag, buff,
                                    &half_pos)))
1119
      goto err;
1120 1121

    /* Correct new keypointer to leaf_page */
1122 1123
    _ma_kpointer(info,leaf_key.data + leaf_key.data_length +
                 leaf_key.ref_length, leaf_page);
1124

1125
    /* Save key in anc_buff */
1126
    DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
1127
    DBUG_DUMP_KEY("key_to_anc", &leaf_key);
1128
    anc_end_pos= anc_buff + new_anc_length;
1129
    t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength,
1130
				  keypos == anc_end_pos ? (uchar*) 0
1131 1132
				  : keypos,
				  anc_pos, anc_pos,
1133
				  &anc_key_inserted);
1134 1135 1136
    if (t_length >= 0)
      bmove_upp(anc_end_pos+t_length, anc_end_pos,
                (uint) (anc_end_pos-keypos));
1137
    else
1138 1139
      bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
    (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
1140
    new_anc_length+= t_length;
1141
    _ma_store_page_used(share, anc_buff, new_anc_length);
1142 1143 1144
    if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                         SEARCH_PAGE_KEY_HAS_TRANSID))
      _ma_mark_page_with_transid(share, anc_buff);
1145 1146 1147

    /* Store first key on new page */
    if (nod_flag)
1148
      bmove(leaf_buff + share->keypage_header, half_pos-nod_flag,
1149
            (size_t) nod_flag);
1150
    if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
1151
      goto err;
1152 1153 1154
    DBUG_DUMP_KEY("key_to_leaf", &leaf_key);
    t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
				  (uchar*) 0, (uchar*) 0, &key_inserted);
1155
    /* t_length will always be > 0 for a new page !*/
1156 1157 1158
    tmp_length= (size_t) ((buff + new_buff_length) - half_pos);
    DBUG_PRINT("info",("t_length: %d  length: %d",t_length, (int) tmp_length));
    bmove(leaf_buff+p_length+t_length, half_pos, tmp_length);
1159
    (*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
1160
    new_leaf_length= tmp_length + t_length + p_length;
1161
    _ma_store_page_used(share, leaf_buff, new_leaf_length);
1162
    _ma_store_keypage_flag(share, leaf_buff, page_flag);
1163
    new_buff_length= (uint) (endpos - buff);
1164
    _ma_store_page_used(share, buff, new_buff_length);
1165

1166
    if (share->now_transactional)
1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178
    {
      /*
        Log changes to parent page
        This has one key deleted from it and one key inserted to it at
        keypos

        ma_log_add() ensures that we don't log changes that is outside of
        key block size, as the REDO code can't handle that
      */
      if (_ma_log_add(info, anc_page, anc_buff, anc_length,
                      keypos,
                      anc_key_inserted.move_length +
1179
                      max(anc_key_inserted.changed_length -
1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205
                          anc_key_inserted.move_length,
                          key_deleted.changed_length),
                      anc_key_inserted.move_length -
                      key_deleted.move_length, 1))
        goto err;

      /*
        Log changes to leaf page.
        This contains original data with new data added first
      */
      DBUG_ASSERT(leaf_length <= new_leaf_length);
      if (_ma_log_prefix(info, leaf_page, leaf_buff,
                         new_leaf_length - unchanged_leaf_length,
                         (int) (new_leaf_length - leaf_length)))
        goto err;
      /*
        Log changes to next page
        This contains original data with some suffix data deleted

      */
      DBUG_ASSERT(new_buff_length <= buff_length);
      if (_ma_log_suffix(info, next_page, buff,
                         buff_length, new_buff_length))
        goto err;
    }

1206 1207 1208 1209
    leaf_page_link->changed= 1;
    if (_ma_write_keypage(info, keyinfo, leaf_page,
                          PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
                          leaf_buff))
1210 1211
      goto err;
  }
1212 1213 1214
  next_page_link->changed= 1;
  if (_ma_write_keypage(info, keyinfo, next_page,
                        PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS, buff))
1215
    goto err;
1216 1217 1218 1219

  DBUG_RETURN(new_anc_length <=
              ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
                (uint) keyinfo->underflow_block_length)));
1220 1221 1222 1223 1224 1225

err:
  DBUG_RETURN(-1);
} /* underflow */


1226
/**
1227 1228 1229 1230
  @brief Remove a key from page

  @fn remove_key()
    keyinfo	          Key handle
1231
    nod_flag              Length of node ptr
1232
    keypos	          Where on page key starts
1233
    lastkey	          Buffer for storing keys to be removed
1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249
    page_end	          Pointer to end of page
    next_block	          If <> 0 and node-page, this is set to address of
    		          next page
    s_temp	          Information about what changes was done one the page:
    s_temp.key_pos        Start of key
    s_temp.move_length    Number of bytes removed at keypos
    s_temp.changed_length Number of bytes changed at keypos

  @todo
    The current code doesn't handle the case that the next key may be
    packed better against the previous key if there is a case difference

  @return
  @retval 0  error
  @retval #  How many chars was removed
*/
1250

1251
static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
1252 1253 1254
		       uchar *keypos, uchar *lastkey,
		       uchar *page_end, my_off_t *next_block,
                       MARIA_KEY_PARAM *s_temp)
1255 1256
{
  int s_length;
unknown's avatar
unknown committed
1257
  uchar *start;
1258
  DBUG_ENTER("remove_key");
1259 1260
  DBUG_PRINT("enter", ("keypos: 0x%lx  page_end: 0x%lx",
                       (long) keypos, (long) page_end));
1261

1262 1263
  start= s_temp->key_pos= keypos;
  s_temp->changed_length= 0;
1264 1265
  if (!(keyinfo->flag &
	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1266 1267
	 HA_BINARY_PACK_KEY)) &&
      !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1268
  {
1269
    /* Static length key */
1270 1271 1272 1273 1274
    s_length=(int) (keyinfo->keylength+nod_flag);
    if (next_block && nod_flag)
      *next_block= _ma_kpos(nod_flag,keypos+s_length);
  }
  else
1275 1276 1277 1278
  {
    /* Let keypos point at next key */
    MARIA_KEY key;

1279
    /* Calculate length of key */
1280 1281 1282
    key.keyinfo= keyinfo;
    key.data=    lastkey;
    if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos))
1283 1284 1285 1286 1287 1288 1289 1290 1291
      DBUG_RETURN(0);				/* Error */

    if (next_block && nod_flag)
      *next_block= _ma_kpos(nod_flag,keypos);
    s_length=(int) (keypos-start);
    if (keypos != page_end)
    {
      if (keyinfo->flag & HA_BINARY_PACK_KEY)
      {
1292
	uchar *old_key= start;
1293
	uint next_length,prev_length,prev_pack_length;
1294 1295

        /* keypos points here on start of next key */
1296 1297 1298 1299
	get_key_length(next_length,keypos);
	get_key_pack_length(prev_length,prev_pack_length,old_key);
	if (next_length > prev_length)
	{
1300
          uint diff= (next_length-prev_length);
1301
	  /* We have to copy data from the current key to the next key */
1302 1303 1304
	  keypos-= diff + prev_pack_length;
	  store_key_length(keypos, prev_length);
          bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
1305
	  s_length=(int) (keypos-start);
1306
          s_temp->changed_length= diff + prev_pack_length;
1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346
	}
      }
      else
      {
	/* Check if a variable length first key part */
	if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
	{
	  /* Next key is packed against the current one */
	  uint next_length,prev_length,prev_pack_length,lastkey_length,
	    rest_length;
	  if (keyinfo->seg[0].length >= 127)
	  {
	    if (!(prev_length=mi_uint2korr(start) & 32767))
	      goto end;
	    next_length=mi_uint2korr(keypos) & 32767;
	    keypos+=2;
	    prev_pack_length=2;
	  }
	  else
	  {
	    if (!(prev_length= *start & 127))
	      goto end;				/* Same key as previous*/
	    next_length= *keypos & 127;
	    keypos++;
	    prev_pack_length=1;
	  }
	  if (!(*start & 128))
	    prev_length=0;			/* prev key not packed */
	  if (keyinfo->seg[0].flag & HA_NULL_PART)
	    lastkey++;				/* Skip null marker */
	  get_key_length(lastkey_length,lastkey);
	  if (!next_length)			/* Same key after */
	  {
	    next_length=lastkey_length;
	    rest_length=0;
	  }
	  else
	    get_key_length(rest_length,keypos);

	  if (next_length >= prev_length)
1347 1348 1349 1350 1351
	  {
            /* Next key is based on deleted key */
            uint pack_length;
            uint diff= (next_length-prev_length);

1352
            /* keypos points to data of next key (after key length) */
1353 1354
	    bmove(keypos - diff, lastkey + prev_length, diff);
	    rest_length+= diff;
1355
	    pack_length= prev_length ? get_pack_length(rest_length): 0;
1356
	    keypos-= diff + pack_length + prev_pack_length;
1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378
	    s_length=(int) (keypos-start);
	    if (prev_length)			/* Pack against prev key */
	    {
	      *keypos++= start[0];
	      if (prev_pack_length == 2)
		*keypos++= start[1];
	      store_key_length(keypos,rest_length);
	    }
	    else
	    {
	      /* Next key is not packed anymore */
	      if (keyinfo->seg[0].flag & HA_NULL_PART)
	      {
		rest_length++;			/* Mark not null */
	      }
	      if (prev_pack_length == 2)
	      {
		mi_int2store(keypos,rest_length);
	      }
	      else
		*keypos= rest_length;
	    }
1379
            s_temp->changed_length= diff + pack_length + prev_pack_length;
1380 1381 1382 1383 1384 1385
	  }
	}
      }
    }
  }
  end:
unknown's avatar
unknown committed
1386
  bmove(start, start+s_length, (uint) (page_end-start-s_length));
1387
  s_temp->move_length= s_length;
1388 1389
  DBUG_RETURN((uint) s_length);
} /* remove_key */
1390 1391 1392 1393 1394 1395


/****************************************************************************
  Logging of redos
****************************************************************************/

1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406
/**
   @brief log entry where some parts are deleted and some things are changed

   @fn _ma_log_delete()
   @param info		  Maria handler
   @param page	          Pageaddress for changed page
   @param buff		  Page buffer
   @param key_pos         Start of change area
   @param changed_length  How many bytes where changed at key_pos
   @param move_length     How many bytes where deleted at key_pos

1407 1408
*/

unknown's avatar
unknown committed
1409 1410 1411
my_bool _ma_log_delete(MARIA_HA *info, my_off_t page, const uchar *buff,
                       const uchar *key_pos, uint changed_length,
                       uint move_length)
1412 1413
{
  LSN lsn;
1414
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 9 + 7], *log_pos;
1415
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
1416
  MARIA_SHARE *share= info->s;
1417 1418 1419
  uint translog_parts;
  uint offset= (uint) (key_pos - buff);
  DBUG_ENTER("_ma_log_delete");
1420 1421
  DBUG_PRINT("enter", ("page: %lu  changed_length: %u  move_length: %d",
                       (ulong) page, changed_length, move_length));
1422 1423
  DBUG_ASSERT(share->now_transactional && move_length);
  DBUG_ASSERT(offset + changed_length <= _ma_get_page_used(share, buff));
1424 1425

  /* Store address of new root page */
1426
  page/= share->block_size;
1427 1428 1429 1430 1431 1432 1433 1434
  page_store(log_data + FILEID_STORE_SIZE, page);
  log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
  log_pos[0]= KEY_OP_OFFSET;
  int2store(log_pos+1, offset);
  log_pos[3]= KEY_OP_SHIFT;
  int2store(log_pos+4, -(int) move_length);
  log_pos+= 6;
  translog_parts= 1;
1435
  if (changed_length)
1436 1437
  {
    log_pos[0]= KEY_OP_CHANGE;
1438
    int2store(log_pos+1, changed_length);
1439 1440
    log_pos+= 3;
    translog_parts= 2;
1441
    log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    buff + offset;
1442
    log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
1443 1444
  }

1445 1446
#ifdef EXTRA_DEBUG_KEY_CHANGES
  {
1447
    int page_length= _ma_get_page_used(share, buff);
1448 1449 1450 1451 1452 1453
    ha_checksum crc;
    crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
    log_pos[0]= KEY_OP_CHECK;
    int2store(log_pos+1, page_length);
    int4store(log_pos+3, crc);

1454
    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
1455 1456 1457 1458 1459 1460
    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
    changed_length+= 7;
    translog_parts++;
  }
#endif

1461
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
1462 1463 1464 1465
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);

  if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
                            info->trn, info,
1466
                            (translog_size_t)
1467
                            log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1468
                            changed_length,
1469 1470 1471 1472 1473
                            TRANSLOG_INTERNAL_PARTS + translog_parts,
                            log_array, log_data, NULL))
    DBUG_RETURN(1);
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1474 1475 1476 1477 1478 1479


/****************************************************************************
  Logging of undos
****************************************************************************/

1480
int _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key,
unknown's avatar
unknown committed
1481 1482 1483 1484 1485 1486 1487 1488
                              my_off_t new_root, LSN *res_lsn)
{
  MARIA_SHARE *share= info->s;
  uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
                 KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
  struct st_msg_to_write_hook_for_undo_key msg;
  enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
1489
  uint keynr= key->keyinfo->key_nr;
unknown's avatar
unknown committed
1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510

  lsn_store(log_data, info->trn->undo_lsn);
  key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
  log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;

  /**
    @todo BUG if we had concurrent insert/deletes, reading state's key_root
    like this would be unsafe.
  */
  if (new_root != share->state.key_root[keynr])
  {
    my_off_t page;
    page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
           new_root / share->block_size);
    page_store(log_pos, page);
    log_pos+= PAGE_STORE_SIZE;
    log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
  }

  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1511 1512 1513
  log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key->data;
  log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length +
                                                  key->ref_length);
unknown's avatar
unknown committed
1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525

  msg.root= &share->state.key_root[keynr];
  msg.value= new_root;
  /*
    set autoincrement to 1 if this is an auto_increment key
    This is only used if we are now in a rollback of a duplicate key
  */
  msg.auto_increment= share->base.auto_key == keynr + 1;

  return translog_write_record(res_lsn, log_type,
                               info->trn, info,
                               (translog_size_t)
1526 1527
                               (log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
                                log_array[TRANSLOG_INTERNAL_PARTS + 1].length),
unknown's avatar
unknown committed
1528 1529 1530
                               TRANSLOG_INTERNAL_PARTS + 2, log_array,
                               log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
}