/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
   Copyright (C) 2009-2010 Monty Program Ab

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "ma_fulltext.h"
#include "ma_rt_index.h"
19 20
#include "trnman.h"
#include "ma_key_recover.h"
21

22
static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
23
                    MARIA_PAGE *page);
24
static int del(MARIA_HA *info, MARIA_KEY *key,
25 26 27 28 29
               MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
	       uchar *keypos, my_off_t next_block, uchar *ret_key_buff);
static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
		     MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
		     uchar *keypos);
30 31
static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
                       uchar *keypos, uchar *lastkey, uchar *page_end,
32
		       my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
33

34
/* @breif Remove a row from a MARIA table */
35

unknown's avatar
unknown committed
36
int maria_delete(MARIA_HA *info,const uchar *record)
37 38
{
  uint i;
unknown's avatar
unknown committed
39
  uchar *old_key;
40 41
  int save_errno;
  char lastpos[8];
42
  MARIA_SHARE *share= info->s;
43
  MARIA_KEYDEF *keyinfo;
44 45
  DBUG_ENTER("maria_delete");

46
  /* Test if record is in datafile */
47
  DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
unknown's avatar
unknown committed
48
                  maria_print_error(share, HA_ERR_CRASHED);
49 50
                  DBUG_RETURN(my_errno= HA_ERR_CRASHED););
  DBUG_EXECUTE_IF("my_error_test_undefined_error",
unknown's avatar
unknown committed
51
                  maria_print_error(share, INT_MAX);
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
                  DBUG_RETURN(my_errno= INT_MAX););
  if (!(info->update & HA_STATE_AKTIV))
  {
    DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND);	/* No database read */
  }
  if (share->options & HA_OPTION_READ_ONLY_DATA)
  {
    DBUG_RETURN(my_errno=EACCES);
  }
  if (_ma_readinfo(info,F_WRLCK,1))
    DBUG_RETURN(my_errno);
  if ((*share->compare_record)(info,record))
    goto err;				/* Error on read-check */

  if (_ma_mark_file_changed(info))
    goto err;

69 70
  /* Ensure we don't change the autoincrement value */
  info->last_auto_increment= ~(ulonglong) 0;
unknown's avatar
unknown committed
71
  /* Remove all keys from the index file */
72

73 74 75
  old_key= info->lastkey_buff2;

  for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
76
  {
unknown's avatar
unknown committed
77
    if (maria_is_key_active(share->state.key_map, i))
78
    {
79 80
      keyinfo->version++;
      if (keyinfo->flag & HA_FULLTEXT)
81
      {
82
        if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
83 84 85 86
          goto err;
      }
      else
      {
87 88 89 90 91 92
        MARIA_KEY key;
        if (keyinfo->ck_delete(info,
                               (*keyinfo->make_key)(info, &key, i, old_key,
                                                    record,
                                                    info->cur_row.lastpos,
                                                    info->cur_row.trid)))
93 94 95 96 97 98 99
          goto err;
      }
      /* The above changed info->lastkey2. Inform maria_rnext_same(). */
      info->update&= ~HA_STATE_RNEXT_SAME;
    }
  }

unknown's avatar
unknown committed
100
  if (share->calc_checksum)
unknown's avatar
unknown committed
101
  {
unknown's avatar
unknown committed
102 103 104 105 106
    /*
      We can't use the row based checksum as this doesn't have enough
      precision.
    */
    info->cur_row.checksum= (*share->calc_checksum)(info, record);
unknown's avatar
unknown committed
107
  }
108

unknown's avatar
unknown committed
109 110 111
  if ((*share->delete_record)(info, record))
    goto err;				/* Remove record from database */

112 113
  info->state->checksum-= info->cur_row.checksum;
  info->state->records--;
114
  info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
unknown's avatar
unknown committed
115 116
  share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
                          STATE_NOT_ZEROFILLED);
117
  info->state->changed=1;
118

unknown's avatar
unknown committed
119
  mi_sizestore(lastpos, info->cur_row.lastpos);
120 121 122 123
  VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
  allow_break();			/* Allow SIGHUP & SIGINT */
  if (info->invalidator != 0)
  {
124 125 126
    DBUG_PRINT("info", ("invalidator... '%s' (delete)",
                        share->open_file_name.str));
    (*info->invalidator)(share->open_file_name.str);
127 128 129 130 131
    info->invalidator=0;
  }
  DBUG_RETURN(0);

err:
unknown's avatar
unknown committed
132 133 134 135 136
  save_errno= my_errno;
  DBUG_ASSERT(save_errno);
  if (!save_errno)
    save_errno= HA_ERR_INTERNAL_ERROR;          /* Should never happen */

unknown's avatar
unknown committed
137
  mi_sizestore(lastpos, info->cur_row.lastpos);
138 139
  if (save_errno != HA_ERR_RECORD_CHANGED)
  {
unknown's avatar
unknown committed
140
    maria_print_error(share, HA_ERR_CRASHED);
141 142 143 144 145 146 147
    maria_mark_crashed(info);		/* mark table crashed */
  }
  VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
  info->update|=HA_STATE_WRITTEN;	/* Buffer changed */
  allow_break();			/* Allow SIGHUP & SIGINT */
  if (save_errno == HA_ERR_KEY_NOT_FOUND)
  {
unknown's avatar
unknown committed
148
    maria_print_error(share, HA_ERR_CRASHED);
149 150
    my_errno=HA_ERR_CRASHED;
  }
unknown's avatar
unknown committed
151
  DBUG_RETURN(my_errno= save_errno);
152 153 154
} /* maria_delete */


155 156 157 158 159 160 161
/*
  Remove a key from the btree index

  TODO:
   Change ma_ck_real_delete() to use another buffer for changed keys instead
   of key->data. This would allows us to remove the copying of the key here.
*/
162

163
my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
164
{
165
  MARIA_SHARE *share= info->s;
166 167
  int res;
  LSN lsn= LSN_IMPOSSIBLE;
168 169 170
  my_off_t new_root= share->state.key_root[key->keyinfo->key_nr];
  uchar key_buff[MARIA_MAX_KEY_BUFF], *save_key_data;
  MARIA_KEY org_key;
171 172
  DBUG_ENTER("_ma_ck_delete");

173 174
  LINT_INIT_STRUCT(org_key);

175
  save_key_data= key->data;
176
  if (share->now_transactional)
177 178
  {
    /* Save original value as the key may change */
179 180 181
    memcpy(key_buff, key->data, key->data_length + key->ref_length);
    org_key= *key;
    key->data= key_buff;
182 183
  }

184 185 186 187 188
  if ((res= _ma_ck_real_delete(info, key, &new_root)))
  {
    /* We have to mark the table crashed before unpin_all_pages() */
    maria_mark_crashed(info);
  }
189

190
  key->data= save_key_data;
191
  if (!res && share->now_transactional)
192
    res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn);
193 194
  else
  {
195
    share->state.key_root[key->keyinfo->key_nr]= new_root;
196 197 198
    _ma_fast_unlock_key_del(info);
  }
  _ma_unpin_all_pages_and_finalize_row(info, lsn);
199
  DBUG_RETURN(res != 0);
200 201 202
} /* _ma_ck_delete */


203 204
/*
  Delete a key from the tree that starts at *root

  The root page is fetched and searched with d_search().  Depending on the
  outcome the root is enlarged, written back, or (when it has become too
  small to hold a key) disposed; in the last case *root is changed to the
  only remaining child or to HA_OFFSET_ERROR.

  Returns 0 on success and 1 on failure.
*/

my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
                           my_off_t *root)
{
  MARIA_KEYDEF *keyinfo= key->keyinfo;
  MARIA_PAGE root_page;
  my_off_t start_root= *root;
  uchar *page_buff;
  my_bool failed= 0;
  int search_res;
  DBUG_ENTER("_ma_ck_real_delete");

  if (start_root == HA_OFFSET_ERROR)
  {
    my_errno= HA_ERR_CRASHED;
    DBUG_RETURN(1);
  }

  page_buff= (uchar*) my_alloca((uint) keyinfo->block_length +
                                MARIA_MAX_KEY_BUFF*2);
  if (!page_buff)
  {
    DBUG_PRINT("error",("Couldn't allocate memory"));
    my_errno= ENOMEM;
    DBUG_RETURN(1);
  }

  DBUG_PRINT("info",("root_page: %lu",
                     (ulong) (start_root / keyinfo->block_length)));

  if (_ma_fetch_keypage(&root_page, info, keyinfo, start_root,
                        PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, page_buff, 0))
  {
    failed= 1;
    goto end;
  }

  if (keyinfo->flag & HA_FULLTEXT)
    search_res= d_search(info, key,
                         SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT,
                         &root_page);
  else
    search_res= d_search(info, key, SEARCH_SAME, &root_page);

  if (search_res < 0)
    failed= 1;
  else if (search_res == 2)
  {
    DBUG_PRINT("test",("Enlarging of root when deleting"));
    if (_ma_enlarge_root(info, key, root))
      failed= 1;
  }
  else if (search_res != 0)
  {
    /* search_res == 1: d_search() left the root page for us to handle */
    MARIA_SHARE *share= info->s;

    page_mark_changed(info, &root_page);

    if (root_page.size > root_page.node + share->keypage_header + 1)
    {
      if (_ma_write_keypage(&root_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
                            DFLT_INIT_HITS))
        failed= 1;
    }
    else
    {
      /* Root is too small to hold a key: drop it */
      if (root_page.node)
        *root= _ma_kpos(root_page.node,
                        page_buff + share->keypage_header + root_page.node);
      else
        *root= HA_OFFSET_ERROR;
      if (_ma_dispose(info, start_root, 0))
        failed= 1;
    }
  }

end:
  my_afree(page_buff);
  DBUG_PRINT("exit",("Return: %d",failed));
  DBUG_RETURN(failed);
} /* _ma_ck_real_delete */


/**
   @brief Remove key below key root
277

278
   @param key  Key to delete.  Will contain new key if block was enlarged
279

280 281 282 283 284
   @return
   @retval 0   ok (anc_page is not changed)
   @retval 1   If data on page is too small; In this case anc_buff is not saved
   @retval 2   If data on page is too big
   @retval -1  On errors
285
*/
286

287
static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
288
                    MARIA_PAGE *anc_page)
289 290
{
  int flag,ret_value,save_flag;
291
  uint nod_flag, page_flag;
292
  my_bool last_key;
unknown's avatar
unknown committed
293
  uchar *leaf_buff,*keypos;
294
  uchar lastkey[MARIA_MAX_KEY_BUFF];
295
  MARIA_KEY_PARAM s_temp;
296
  MARIA_SHARE *share= info->s;
297
  MARIA_KEYDEF *keyinfo= key->keyinfo;
298
  MARIA_PAGE leaf_page;
299
  DBUG_ENTER("d_search");
300
  DBUG_DUMP("page", anc_page->buff, anc_page->size);
301

302
  flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, lastkey,
303
                              &last_key);
304 305 306 307 308
  if (flag == MARIA_FOUND_WRONG_KEY)
  {
    DBUG_PRINT("error",("Found wrong key"));
    DBUG_RETURN(-1);
  }
309 310
  page_flag= anc_page->flag;
  nod_flag=  anc_page->node;
311

312
  if (!flag && (keyinfo->flag & HA_FULLTEXT))
313 314 315 316 317 318 319 320 321 322 323 324 325 326
  {
    uint off;
    int  subkeys;

    get_key_full_length_rdonly(off, lastkey);
    subkeys=ft_sintXkorr(lastkey+off);
    DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
    comp_flag=SEARCH_SAME;
    if (subkeys >= 0)
    {
      /* normal word, one-level tree structure */
      if (info->ft1_to_ft2)
      {
        /* we're in ft1->ft2 conversion mode. Saving key data */
327
        insert_dynamic(info->ft1_to_ft2, (lastkey+off));
328 329 330 331
      }
      else
      {
        /* we need exact match only if not in ft1->ft2 conversion mode */
332
        flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos,
333
                                    lastkey, &last_key);
334 335 336 337 338 339 340 341
      }
      /* fall through to normal delete */
    }
    else
    {
      /* popular word. two-level tree. going down */
      uint tmp_key_length;
      my_off_t root;
unknown's avatar
unknown committed
342
      uchar *kpos=keypos;
343
      MARIA_KEY tmp_key;
344

345 346 347 348 349
      tmp_key.data=    lastkey;
      tmp_key.keyinfo= keyinfo;

      if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag,
                                               &kpos)))
350 351 352 353
      {
        my_errno= HA_ERR_CRASHED;
        DBUG_RETURN(-1);
      }
354
      root= _ma_row_pos_from_key(&tmp_key);
355 356 357
      if (subkeys == -1)
      {
        /* the last entry in sub-tree */
358
        if (_ma_dispose(info, root, 1))
359 360 361 362 363
          DBUG_RETURN(-1);
        /* fall through to normal delete */
      }
      else
      {
364
        MARIA_KEY word_key;
365
        keyinfo=&share->ft2_keyinfo;
366 367
        /* we'll modify key entry 'in vivo' */
        kpos-=keyinfo->keylength+nod_flag;
368 369 370 371 372 373 374 375 376
        get_key_full_length_rdonly(off, key->data);

        word_key.data=        key->data + off;
        word_key.keyinfo=     &share->ft2_keyinfo;
        word_key.data_length= HA_FT_WLEN;
        word_key.ref_length= 0;
        word_key.flag= 0;
        ret_value= _ma_ck_real_delete(info, &word_key, &root);
        _ma_dpointer(share, kpos+HA_FT_WLEN, root);
377 378 379
        subkeys++;
        ft_intXstore(kpos, subkeys);
        if (!ret_value)
380
        {
381 382
          page_mark_changed(info, anc_page);
          ret_value= _ma_write_keypage(anc_page,
383
                                       PAGECACHE_LOCK_LEFT_WRITELOCKED,
384
                                       DFLT_INIT_HITS);
385
        }
386 387 388 389 390 391 392 393
        DBUG_PRINT("exit",("Return: %d",ret_value));
        DBUG_RETURN(ret_value);
      }
    }
  }
  leaf_buff=0;
  if (nod_flag)
  {
394
    /* Read left child page */
395
    leaf_page.pos= _ma_kpos(nod_flag,keypos);
unknown's avatar
unknown committed
396
    if (!(leaf_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
397
                                       MARIA_MAX_KEY_BUFF*2)))
398
    {
399
      DBUG_PRINT("error", ("Couldn't allocate memory"));
400 401 402
      my_errno=ENOMEM;
      DBUG_RETURN(-1);
    }
403 404 405
    if (_ma_fetch_keypage(&leaf_page, info,keyinfo, leaf_page.pos,
                          PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
                          0))
406 407 408 409 410 411 412 413 414 415 416 417
      goto err;
  }

  if (flag != 0)
  {
    if (!nod_flag)
    {
      DBUG_PRINT("error",("Didn't find key"));
      my_errno=HA_ERR_CRASHED;		/* This should newer happend */
      goto err;
    }
    save_flag=0;
418
    ret_value= d_search(info, key, comp_flag, &leaf_page);
419 420 421 422
  }
  else
  {						/* Found key */
    uint tmp;
423 424 425
    uint anc_buff_length= anc_page->size;
    uint anc_page_flag=   anc_page->flag;
    my_off_t next_block;
426 427

    if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey,
428
                          anc_page->buff + anc_buff_length,
429
                          &next_block, &s_temp)))
430 431
      goto err;

432
    page_mark_changed(info, anc_page);
433
    anc_buff_length-= tmp;
434 435
    anc_page->size= anc_buff_length;
    page_store_size(share, anc_page);
436 437 438 439 440 441

    /*
      Log initial changes on pages
      If there is an underflow, there will be more changes logged to the
      page
    */
442
    if (share->now_transactional &&
443
        _ma_log_delete(anc_page, s_temp.key_pos,
444 445
                       s_temp.changed_length, s_temp.move_length,
                       0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1))
446 447
      DBUG_RETURN(-1);

448 449
    if (!nod_flag)
    {						/* On leaf page */
450 451 452
      if (anc_buff_length <= (info->quick_mode ?
                              MARIA_MIN_KEYBLOCK_LENGTH :
                              (uint) keyinfo->underflow_block_length))
453
      {
454 455
        /* Page will be written by caller if we return 1 */
        DBUG_RETURN(1);
456
      }
457 458
      if (_ma_write_keypage(anc_page,
                            PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
459 460
	DBUG_RETURN(-1);
      DBUG_RETURN(0);
461
    }
462
    save_flag=1;                         /* Mark that anc_buff is changed */
463
    ret_value= del(info, key, anc_page, &leaf_page,
464
                   keypos, next_block, lastkey);
465 466 467
  }
  if (ret_value >0)
  {
468
    save_flag= 2;
469
    if (ret_value == 1)
470
      ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
471
    else
472 473 474
    {
      /* This can only happen with variable length keys */
      MARIA_KEY last_key;
475
      DBUG_PRINT("test",("Enlarging of key when deleting"));
476 477 478

      last_key.data=    lastkey;
      last_key.keyinfo= keyinfo;
479
      if (!_ma_get_last_key(&last_key, anc_page, keypos))
480
	goto err;
481 482 483
      ret_value= _ma_insert(info, key, anc_page, keypos,
                            last_key.data,
                            (MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
484 485 486 487

      if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
                            DFLT_INIT_HITS))
        ret_value= -1;
488 489
    }
  }
490
  if (ret_value == 0 && anc_page->size > share->max_index_block_size)
491
  {
492 493 494 495
    /*
      parent buffer got too big ; We have to split the page.
      The | 2 is there to force write of anc page below
    */
496
    save_flag= 3;
497
    ret_value= _ma_split_page(info, key, anc_page,
498
                              share->max_index_block_size,
499
                              (uchar*) 0, 0, 0, lastkey, 0) | 2;
500
    DBUG_ASSERT(anc_page->org_size == anc_page->size);
501 502
  }
  if (save_flag && ret_value != 1)
503
  {
504 505 506 507
    page_mark_changed(info, anc_page);
    if (_ma_write_keypage(anc_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
                          DFLT_INIT_HITS))
      ret_value= -1;
508
  }
509 510
  else
  {
511
    DBUG_DUMP("page", anc_page->buff, anc_page->size);
512
  }
unknown's avatar
unknown committed
513
  my_afree(leaf_buff);
514 515 516 517
  DBUG_PRINT("exit",("Return: %d",ret_value));
  DBUG_RETURN(ret_value);

err:
unknown's avatar
unknown committed
518
  my_afree(leaf_buff);
519 520 521 522 523
  DBUG_PRINT("exit",("Error: %d",my_errno));
  DBUG_RETURN (-1);
} /* d_search */


524 525 526 527 528 529 530 531 532 533 534 535
/**
   @brief Remove a key that has a page-reference

   @param info		 Maria handler
   @param key		 Buffer for key to be inserted at upper level
   @param anc_page	 Page address for page where deleted key was
   @param anc_buff       Page buffer (nod) where deleted key was
   @param leaf_page      Page address for nod before the deleted key
   @param leaf_buff      Buffer for leaf_page
   @param leaf_buff_link Pinned page link for leaf_buff
   @param keypos         Pos to where deleted key was on anc_buff
   @param next_block	 Page adress for nod after deleted key
536
   @param ret_key_buff	 Key before keypos in anc_buff
537 538

   @notes
539 540
      leaf_page must be written to disk if retval > 0
      anc_page  is not updated on disk. Caller should do this
541 542 543

   @return
   @retval < 0   Error
544 545
   @retval 0     OK.    leaf_buff is written to disk

546
   @retval 1     key contains key to upper level (from balance page)
547
                 leaf_buff has underflow
548 549
   @retval 2     key contains key to upper level (from split space)
*/
550

551
static int del(MARIA_HA *info, MARIA_KEY *key,
552
               MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
553
	       uchar *keypos, my_off_t next_block, uchar *ret_key_buff)
554 555
{
  int ret_value,length;
556 557
  uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length;
  uchar keybuff[MARIA_MAX_KEY_BUFF],*endpos,*next_buff,*key_start, *prev_key;
558
  uchar *anc_buff;
559
  MARIA_KEY_PARAM s_temp;
560 561 562 563
  MARIA_KEY tmp_key;
  MARIA_SHARE *share= info->s;
  MARIA_KEYDEF *keyinfo= key->keyinfo;
  MARIA_KEY ret_key;
564
  MARIA_PAGE next_page;
565
  DBUG_ENTER("del");
566 567
  DBUG_PRINT("enter",("leaf_page: %lu  keypos: 0x%lx",
                      (ulong) (leaf_page->pos / share->block_size),
568
		      (ulong) keypos));
569
  DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
570

571 572 573
  page_flag=   leaf_page->flag;
  leaf_length= leaf_page->size;
  nod_flag=    leaf_page->node;
574

575
  endpos= leaf_page->buff + leaf_length;
576 577 578
  tmp_key.keyinfo= keyinfo;
  tmp_key.data=    keybuff;

579
  if (!(key_start= _ma_get_last_key(&tmp_key, leaf_page, endpos)))
580 581
    DBUG_RETURN(-1);

582
  if (nod_flag)
583
  {
584
    next_page.pos= _ma_kpos(nod_flag,endpos);
unknown's avatar
unknown committed
585
    if (!(next_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
586
					MARIA_MAX_KEY_BUFF*2)))
587
      DBUG_RETURN(-1);
588 589
    if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
                          PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, next_buff, 0))
590 591 592
      ret_value= -1;
    else
    {
593 594 595
      DBUG_DUMP("next_page", next_page.buff, next_page.size);
      if ((ret_value= del(info, key, anc_page, &next_page,
                          keypos, next_block, ret_key_buff)) >0)
596
      {
597
        /* Get new length after key was deleted */
598
	endpos= leaf_page->buff+ leaf_page->size;
599 600
	if (ret_value == 1)
	{
601
          /* underflow writes "next_page" to disk */
602 603 604
	  ret_value= underflow(info, keyinfo, leaf_page, &next_page,
                               endpos);
	  if (ret_value == 0 && leaf_page->size >
605
              share->max_index_block_size)
606
	  {
607
	    ret_value= (_ma_split_page(info, key, leaf_page,
608
                                       share->max_index_block_size,
609
                                       (uchar*) 0, 0, 0,
610
                                       ret_key_buff, 0) | 2);
611 612 613 614
	  }
	}
	else
	{
615 616 617
          if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
                                DFLT_INIT_HITS))
            goto err;
618
	  DBUG_PRINT("test",("Inserting of key when deleting"));
619
	  if (!_ma_get_last_key(&tmp_key, leaf_page, endpos))
620
	    goto err;
621 622 623
	  ret_value= _ma_insert(info, key, leaf_page, endpos,
                                tmp_key.data, (MARIA_PAGE *) 0, (uchar*) 0,
                                0);
624 625
	}
      }
626
      page_mark_changed(info, leaf_page);
627 628 629 630 631 632
      /*
        If ret_value <> 0, then leaf_page underflowed and caller will have
        to handle underflow and write leaf_page to disk.
        We can't write it here, as if leaf_page is empty we get an assert
        in _ma_write_keypage.
      */
633
      if (ret_value == 0 && _ma_write_keypage(leaf_page,
634
                                              PAGECACHE_LOCK_LEFT_WRITELOCKED,
635
                                              DFLT_INIT_HITS))
636 637
	goto err;
    }
unknown's avatar
unknown committed
638
    my_afree(next_buff);
639 640 641
    DBUG_RETURN(ret_value);
  }

642 643 644 645 646 647
  /*
    Remove last key from leaf page
    Note that leaf_page page may only have had one key (can normally only
    happen in quick mode), in which ase it will now temporary have 0 keys
    on it. This will be corrected by the caller as we will return 0.
  */
648 649 650
  new_leaf_length= (uint) (key_start - leaf_page->buff);
  leaf_page->size= new_leaf_length;
  page_store_size(share, leaf_page);
651

652
  if (share->now_transactional &&
653
      _ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
654 655
    goto err;

656
  page_mark_changed(info, leaf_page);           /* Safety */
657 658 659 660 661 662 663 664 665
  if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
                          (uint) keyinfo->underflow_block_length))
  {
    /* Underflow, leaf_page will be written by caller */
    ret_value= 1;
  }
  else
  {
    ret_value= 0;
666 667
    if (_ma_write_keypage(leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
                          DFLT_INIT_HITS))
668 669
      goto err;
  }
670

671
  /* Place last key in ancestor page on deleted key position */
672 673 674
  a_length= anc_page->size;
  anc_buff= anc_page->buff;
  endpos=   anc_buff + a_length;
675 676 677 678 679 680 681

  ret_key.keyinfo= keyinfo;
  ret_key.data=    ret_key_buff;

  prev_key= 0;
  if (keypos != anc_buff+share->keypage_header + share->base.key_reflength)
  {
682
    if (!_ma_get_last_key(&ret_key, anc_page, keypos))
683 684 685 686 687 688 689
      goto err;
    prev_key= ret_key.data;
  }
  length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength,
                               keypos == endpos ? (uchar*) 0 : keypos,
                               prev_key, prev_key,
                               &s_temp);
690
  if (length > 0)
unknown's avatar
unknown committed
691
    bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
692 693 694
  else
    bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
  (*keyinfo->store_key)(keyinfo,keypos,&s_temp);
695
  key_start= keypos;
Michael Widenius's avatar
Michael Widenius committed
696 697
  if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                      SEARCH_PAGE_KEY_HAS_TRANSID))
698 699 700
  {
    _ma_mark_page_with_transid(share, anc_page);
  }
701

702 703 704
  /* Save pointer to next leaf on parent page */
  if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
                           &keypos))
705 706
    goto err;
  _ma_kpointer(info,keypos - share->base.key_reflength,next_block);
707 708
  anc_page->size= a_length + length;
  page_store_size(share, anc_page);
709

710
  if (share->now_transactional &&
711
      _ma_log_add(anc_page, a_length,
712 713
                  key_start, s_temp.changed_length, s_temp.move_length, 1,
                  KEY_OP_DEBUG_LOG_ADD_2))
714 715
    goto err;

716
  DBUG_RETURN(new_leaf_length <=
717 718
              (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
               (uint) keyinfo->underflow_block_length));
719 720 721 722 723
err:
  DBUG_RETURN(-1);
} /* del */


724 725 726 727 728
/**
   @brief Balances adjacent pages if underflow occours

   @fn    underflow()
   @param anc_buff        Anchestor page data
729
   @param leaf_page       Leaf page (page that underflowed)
730 731 732 733 734
   @param leaf_page_link  Pointer to pin information about leaf page
   @param keypos          Position after current key in anc_buff

   @note
     This function writes redo entries for all changes
735
     leaf_page is saved to disk
736 737 738 739 740 741 742
     Caller must save anc_buff

   @return
   @retval  0  ok
   @retval  1  ok, but anc_buff did underflow
   @retval -1  error
 */
743

744 745
static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
		     MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
746
		     uchar *keypos)
747 748
{
  int t_length;
749 750
  uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
  uint next_buff_length, new_buff_length, key_reflength;
751
  uint unchanged_leaf_length, new_leaf_length, new_anc_length;
752 753
  uint anc_page_flag, page_flag;
  uchar anc_key_buff[MARIA_MAX_KEY_BUFF], leaf_key_buff[MARIA_MAX_KEY_BUFF];
754 755
  uchar *endpos, *next_keypos, *anc_pos, *half_pos, *prev_key;
  uchar *anc_buff, *leaf_buff;
756 757
  uchar *after_key, *anc_end_pos;
  MARIA_KEY_PARAM key_deleted, key_inserted;
758
  MARIA_SHARE *share= info->s;
759
  my_bool first_key;
760
  MARIA_KEY tmp_key, anc_key, leaf_key;
761
  MARIA_PAGE next_page;
762
  DBUG_ENTER("underflow");
763 764
  DBUG_PRINT("enter",("leaf_page: %lu  keypos: 0x%lx",
                      (ulong) (leaf_page->pos / share->block_size),
765
		      (ulong) keypos));
766 767
  DBUG_DUMP("anc_buff", anc_page->buff,  anc_page->size);
  DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
768

769 770 771
  anc_page_flag= anc_page->flag;
  anc_buff= anc_page->buff;
  leaf_buff= leaf_page->buff;
772
  info->keyread_buff_used=1;
773
  next_keypos=keypos;
774
  nod_flag= leaf_page->node;
775
  p_length= nod_flag+share->keypage_header;
776 777 778
  anc_length= anc_page->size;
  leaf_length= leaf_page->size;
  key_reflength= share->base.key_reflength;
779
  if (share->keyinfo+info->lastinx == keyinfo)
780
    info->page_changed=1;
781
  first_key= keypos == anc_buff + share->keypage_header + key_reflength;
782

783
  tmp_key.data=  info->buff;
784 785 786 787
  anc_key.data=  anc_key_buff;
  leaf_key.data= leaf_key_buff;
  tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo;

788
  if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
789 790
      first_key)
  {
791
    size_t tmp_length;
792
    uint next_page_flag;
793
    /* Use page right of anc-page */
794 795
    DBUG_PRINT("test",("use right page"));

796 797 798 799
    /*
      Calculate position after the current key. Note that keydata itself is
      not used
    */
800 801
    if (keyinfo->flag & HA_BINARY_PACK_KEY)
    {
802
      if (!(next_keypos= _ma_get_key(&tmp_key, anc_page, keypos)))
803 804 805 806
	goto err;
    }
    else
    {
807 808
      /* Avoid length error check if packed key */
      tmp_key.data[0]= tmp_key.data[1]= 0;
809
      /* Got to end of found key */
810 811 812
      if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength,
                               &next_keypos))
        goto err;
813
    }
814 815 816
    next_page.pos= _ma_kpos(key_reflength, next_keypos);
    if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
                          PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
817
      goto err;
818 819 820
    next_buff_length= next_page.size;
    next_page_flag=   next_page.flag;
    DBUG_DUMP("next", next_page.buff, next_page.size);
821 822

    /* find keys to make a big key-page */
823
    bmove(next_keypos-key_reflength, next_page.buff + share->keypage_header,
824
          key_reflength);
825

826 827
    if (!_ma_get_last_key(&anc_key, anc_page, next_keypos) ||
	!_ma_get_last_key(&leaf_key, leaf_page, leaf_buff+leaf_length))
828 829
      goto err;

830
    /* merge pages and put parting key from anc_page between */
831
    prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
832
    t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, next_page.buff+p_length,
833 834
                                   prev_key, prev_key, &key_inserted);
    tmp_length= next_buff_length - p_length;
835 836 837 838 839 840
    endpos= next_page.buff + tmp_length + leaf_length + t_length;
    /* next_page.buff will always be larger than before !*/
    bmove_upp(endpos, next_page.buff + next_buff_length, tmp_length);
    memcpy(next_page.buff, leaf_buff,(size_t) leaf_length);
    (*keyinfo->store_key)(keyinfo, next_page.buff+leaf_length, &key_inserted);
    buff_length= (uint) (endpos - next_page.buff);
841

842
    /* Set page flag from combination of both key pages and parting key */
843
    page_flag= next_page_flag | leaf_page->flag;
844 845 846 847
    if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                        SEARCH_PAGE_KEY_HAS_TRANSID))
      page_flag|= KEYPAGE_FLAG_HAS_TRANSID;

848 849 850 851 852
    next_page.size= buff_length;
    next_page.flag= page_flag;
    page_store_info(share, &next_page);

    /* remove key from anc_page */
853 854 855
    if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
                              anc_key_buff, anc_buff+anc_length,
                              (my_off_t *) 0, &key_deleted)))
856 857
      goto err;

858
    new_anc_length= anc_length - s_length;
859 860
    anc_page->size= new_anc_length;
    page_store_size(share, anc_page);
861

862
    if (buff_length <= share->max_index_block_size)
863 864
    {
      /* All keys fitted into one page */
865 866
      page_mark_changed(info, &next_page);
      if (_ma_dispose(info, next_page.pos, 0))
867
       goto err;
868

869 870 871
      memcpy(leaf_buff, next_page.buff, (size_t) buff_length);
      leaf_page->size= next_page.size;
      leaf_page->flag= next_page.flag;
872

873
      if (share->now_transactional)
874
      {
875 876 877 878
        /*
          Log changes to parent page. Note that this page may have been
          temporarily bigger than block_size.
         */
879
        if (_ma_log_delete(anc_page, key_deleted.key_pos,
880
                           key_deleted.changed_length,
881 882 883
                           key_deleted.move_length,
                           anc_length - anc_page->org_size,
                           KEY_OP_DEBUG_LOG_DEL_CHANGE_2))
884 885
          goto err;
        /*
886
          Log changes to leaf page. Data for leaf page is in leaf_buff
887 888
          which contains original leaf_buff, parting key and next_buff
        */
889
        if (_ma_log_suffix(leaf_page, leaf_length, buff_length))
890 891
          goto err;
      }
892 893
    }
    else
894 895 896 897 898
    {
      /*
        Balancing didn't free a page, so we have to split 'buff' into two
        pages:
        - Find key in middle of buffer
899 900 901
        - Store everything before key in 'leaf_page'
        - Pack key into anc_page at position of deleted key
          Note that anc_page may overflow! (is handled by caller)
902 903 904 905
        - Store remaining keys in next_page (buff)
      */
      MARIA_KEY_PARAM anc_key_inserted;

906
      anc_end_pos= anc_buff + new_anc_length;
907

908 909
      DBUG_PRINT("test",("anc_buff: 0x%lx  anc_end_pos: 0x%lx",
                         (long) anc_buff, (long) anc_end_pos));
910

911
      if (!first_key && !_ma_get_last_key(&anc_key, anc_page, keypos))
912
	goto err;
913
      if (!(half_pos= _ma_find_half_pos(&leaf_key, &next_page, &after_key)))
914
	goto err;
915 916 917 918 919 920
      new_leaf_length= (uint) (half_pos - next_page.buff);
      memcpy(leaf_buff, next_page.buff, (size_t) new_leaf_length);

      leaf_page->size= new_leaf_length;
      leaf_page->flag= page_flag;
      page_store_info(share, leaf_page);
921 922 923

      /* Correct new keypointer to leaf_page */
      half_pos=after_key;
924
      _ma_kpointer(info,
925 926
                   leaf_key.data + leaf_key.data_length + leaf_key.ref_length,
                   next_page.pos);
927

928
      /* Save key in anc_page */
929 930 931 932 933
      prev_key= (first_key  ? (uchar*) 0 : anc_key.data);
      t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength,
                                     (keypos == anc_end_pos ? (uchar*) 0 :
                                      keypos),
                                     prev_key, prev_key, &anc_key_inserted);
934
      if (t_length >= 0)
935 936
	bmove_upp(anc_end_pos+t_length, anc_end_pos,
                  (uint) (anc_end_pos - keypos));
937
      else
938 939
	bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
      (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
940
      new_anc_length+= t_length;
941 942 943
      anc_page->size= new_anc_length;
      page_store_size(share, anc_page);

944 945
      if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                           SEARCH_PAGE_KEY_HAS_TRANSID))
946
        _ma_mark_page_with_transid(share, anc_page);
947

948
      /* Store key first in new page */
949
      if (nod_flag)
950
	bmove(next_page.buff + share->keypage_header, half_pos-nod_flag,
951
              (size_t) nod_flag);
952
      if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
953
	goto err;
954
      t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
unknown's avatar
unknown committed
955
					  (uchar*) 0, (uchar*) 0,
956
					  &key_inserted);
957
      /* t_length will always be > 0 for a new page !*/
958 959 960
      tmp_length= (size_t) ((next_page.buff + buff_length) - half_pos);
      bmove(next_page.buff + p_length + t_length, half_pos, tmp_length);
      (*keyinfo->store_key)(keyinfo, next_page.buff + p_length, &key_inserted);
961
      new_buff_length= tmp_length + t_length + p_length;
962 963
      next_page.size= new_buff_length;
      page_store_size(share, &next_page);
964
      /* keypage flag is already up to date */
965

966
      if (share->now_transactional)
967 968 969 970 971 972 973 974 975
      {
        /*
          Log changes to parent page
          This has one key deleted from it and one key inserted to it at
          keypos

          ma_log_add ensures that we don't log changes that is outside of
          key block size, as the REDO code can't handle that
        */
976
        if (_ma_log_add(anc_page, anc_length, keypos,
977
                        anc_key_inserted.move_length +
978
                        max(anc_key_inserted.changed_length -
979 980 981
                            anc_key_inserted.move_length,
                            key_deleted.changed_length),
                        anc_key_inserted.move_length -
982 983
                        key_deleted.move_length, 1,
                        KEY_OP_DEBUG_LOG_ADD_3))
984
          goto err;
985

986 987 988 989 990
        /*
          Log changes to leaf page.
          This contains original data with new data added at end
        */
        DBUG_ASSERT(leaf_length <= new_leaf_length);
991
        if (_ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011
          goto err;
        /*
          Log changes to next page

          This contains original data with some prefix data deleted and
          some compressed data at start possible extended

          Data in buff was originally:
          org_leaf_buff     [leaf_length]
          separator_key     [buff_key_inserted.move_length]
          next_key_changes  [buff_key_inserted.changed_length -move_length]
          next_page_data    [next_buff_length - p_length -
                            (buff_key_inserted.changed_length -move_length)]

          After changes it's now:
          unpacked_key      [key_inserted.changed_length]
          next_suffix       [next_buff_length - key_inserted.changed_length]

        */
        DBUG_ASSERT(new_buff_length <= next_buff_length);
1012
        if (_ma_log_prefix(&next_page, key_inserted.changed_length,
1013 1014
                           (int) (new_buff_length - next_buff_length),
                           KEY_OP_DEBUG_LOG_PREFIX_1))
1015 1016
          goto err;
      }
1017 1018 1019
      page_mark_changed(info, &next_page);
      if (_ma_write_keypage(&next_page,
                            PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1020 1021
	goto err;
    }
1022

1023 1024 1025
    page_mark_changed(info, leaf_page);
    if (_ma_write_keypage(leaf_page,
                          PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1026
      goto err;
1027 1028 1029
    DBUG_RETURN(new_anc_length <=
                ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
                  (uint) keyinfo->underflow_block_length)));
1030 1031 1032 1033
  }

  DBUG_PRINT("test",("use left page"));

1034
  keypos= _ma_get_last_key(&anc_key, anc_page, keypos);
1035 1036
  if (!keypos)
    goto err;
1037 1038 1039
  next_page.pos= _ma_kpos(key_reflength,keypos);
  if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
                        PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
1040
    goto err;
1041 1042 1043
  buff_length= next_page.size;
  endpos= next_page.buff + buff_length;
  DBUG_DUMP("prev", next_page.buff, next_page.size);
1044 1045

  /* find keys to make a big key-page */
1046
  bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header,
1047
        key_reflength);
1048
  next_keypos=keypos;
1049 1050
  if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength,
                           &next_keypos))
1051
    goto err;
1052
  if (!_ma_get_last_key(&leaf_key, &next_page, endpos))
1053 1054
    goto err;

1055
  /* merge pages and put parting key from anc_page between */
1056 1057
  prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
  t_length=(*keyinfo->pack_key)(&anc_key, nod_flag,
1058
				(leaf_length == p_length ?
unknown's avatar
unknown committed
1059
                                 (uchar*) 0 : leaf_buff+p_length),
1060
				prev_key, prev_key,
1061
				&key_inserted);
1062
  if (t_length >= 0)
unknown's avatar
unknown committed
1063 1064
    bmove(endpos+t_length, leaf_buff+p_length,
          (size_t) (leaf_length-p_length));
1065
  else						/* We gained space */
unknown's avatar
unknown committed
1066
    bmove(endpos,leaf_buff+((int) p_length-t_length),
1067
	  (size_t) (leaf_length-p_length+t_length));
1068
  (*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
1069

1070 1071
  /* Remember for logging how many bytes of leaf_buff that are not changed */
  DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
1072 1073 1074
  unchanged_leaf_length= (leaf_length - p_length -
                          (key_inserted.changed_length -
                           key_inserted.move_length));
1075 1076

  new_buff_length= buff_length + leaf_length - p_length + t_length;
1077

1078 1079 1080 1081 1082 1083 1084
#ifdef EXTRA_DEBUG
  /* Ensure that unchanged_leaf_length is correct */
  DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length,
                   leaf_buff + leaf_length - unchanged_leaf_length,
                   unchanged_leaf_length) == 0);
#endif

1085
  page_flag= next_page.flag | leaf_page->flag;
1086 1087 1088 1089
  if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                       SEARCH_PAGE_KEY_HAS_TRANSID))
    page_flag|= KEYPAGE_FLAG_HAS_TRANSID;

1090 1091 1092 1093 1094
  next_page.size= new_buff_length;
  next_page.flag= page_flag;
  page_store_info(share, &next_page);

  /* remove key from anc_page */
1095 1096 1097
  if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
                             anc_key_buff,
                             anc_buff+anc_length, (my_off_t *) 0,
1098
                             &key_deleted)))
1099 1100
    goto err;

1101
  new_anc_length= anc_length - s_length;
1102 1103
  anc_page->size= new_anc_length;
  page_store_size(share, anc_page);
1104

1105
  if (new_buff_length <= share->max_index_block_size)
1106 1107
  {
    /* All keys fitted into one page */
1108 1109
    page_mark_changed(info, leaf_page);
    if (_ma_dispose(info, leaf_page->pos, 0))
1110
      goto err;
1111

1112
    if (share->now_transactional)
1113
    {
1114 1115 1116 1117
      /*
        Log changes to parent page. Note that this page may have been
        temporarily bigger than block_size.
      */
1118
      if (_ma_log_delete(anc_page, key_deleted.key_pos,
1119 1120 1121
                         key_deleted.changed_length, key_deleted.move_length,
                         anc_length - anc_page->org_size,
                         KEY_OP_DEBUG_LOG_DEL_CHANGE_3))
1122 1123 1124 1125 1126
        goto err;
      /*
        Log changes to next page. Data for leaf page is in buff
        that contains original leaf_buff, parting key and next_buff
      */
1127
      if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1128 1129
        goto err;
    }
1130 1131
  }
  else
1132 1133
  {
    /*
1134
      Balancing didn't free a page, so we have to split 'next_page' into two
1135 1136
      pages
      - Find key in middle of buffer (buff)
1137 1138
      - Pack key at half_buff into anc_page at position of deleted key
        Note that anc_page may overflow! (is handled by caller)
1139 1140 1141 1142
      - Move everything after middlekey to 'leaf_buff'
      - Shorten buff at 'endpos'
    */
    MARIA_KEY_PARAM anc_key_inserted;
1143
    size_t tmp_length;
1144

1145
    if (keypos == anc_buff + share->keypage_header + key_reflength)
1146
      anc_pos= 0;				/* First key */
1147 1148
    else
    {
1149
      if (!_ma_get_last_key(&anc_key, anc_page, keypos))
1150 1151 1152
        goto err;
      anc_pos= anc_key.data;
    }
1153
    if (!(endpos= _ma_find_half_pos(&leaf_key, &next_page, &half_pos)))
1154
      goto err;
1155 1156

    /* Correct new keypointer to leaf_page */
1157
    _ma_kpointer(info,leaf_key.data + leaf_key.data_length +
1158
                 leaf_key.ref_length, leaf_page->pos);
1159

1160
    /* Save key in anc_page */
1161
    DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
1162
    DBUG_DUMP_KEY("key_to_anc", &leaf_key);
1163
    anc_end_pos= anc_buff + new_anc_length;
1164
    t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength,
1165
				  keypos == anc_end_pos ? (uchar*) 0
1166 1167
				  : keypos,
				  anc_pos, anc_pos,
1168
				  &anc_key_inserted);
1169 1170 1171
    if (t_length >= 0)
      bmove_upp(anc_end_pos+t_length, anc_end_pos,
                (uint) (anc_end_pos-keypos));
1172
    else
1173 1174
      bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
    (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
1175
    new_anc_length+= t_length;
1176 1177 1178
    anc_page->size= new_anc_length;
    page_store_size(share, anc_page);

1179 1180
    if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                         SEARCH_PAGE_KEY_HAS_TRANSID))
1181
      _ma_mark_page_with_transid(share, anc_page);
1182 1183 1184

    /* Store first key on new page */
    if (nod_flag)
1185
      bmove(leaf_buff + share->keypage_header, half_pos-nod_flag,
1186
            (size_t) nod_flag);
1187
    if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
1188
      goto err;
1189 1190 1191
    DBUG_DUMP_KEY("key_to_leaf", &leaf_key);
    t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
				  (uchar*) 0, (uchar*) 0, &key_inserted);
1192
    /* t_length will always be > 0 for a new page !*/
1193
    tmp_length= (size_t) ((next_page.buff + new_buff_length) - half_pos);
1194 1195
    DBUG_PRINT("info",("t_length: %d  length: %d",t_length, (int) tmp_length));
    bmove(leaf_buff+p_length+t_length, half_pos, tmp_length);
1196
    (*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
1197
    new_leaf_length= tmp_length + t_length + p_length;
1198 1199 1200 1201 1202 1203 1204 1205

    leaf_page->size= new_leaf_length;
    leaf_page->flag= page_flag;
    page_store_info(share, leaf_page);

    new_buff_length= (uint) (endpos - next_page.buff);
    next_page.size= new_buff_length;
    page_store_size(share, &next_page);
1206

1207
    if (share->now_transactional)
1208 1209 1210 1211 1212 1213 1214 1215 1216
    {
      /*
        Log changes to parent page
        This has one key deleted from it and one key inserted to it at
        keypos

        ma_log_add() ensures that we don't log changes that is outside of
        key block size, as the REDO code can't handle that
      */
1217
      if (_ma_log_add(anc_page, anc_length, keypos,
1218
                      anc_key_inserted.move_length +
1219
                      max(anc_key_inserted.changed_length -
1220 1221 1222
                          anc_key_inserted.move_length,
                          key_deleted.changed_length),
                      anc_key_inserted.move_length -
1223
                      key_deleted.move_length, 1,KEY_OP_DEBUG_LOG_ADD_4))
1224 1225 1226 1227 1228 1229 1230
        goto err;

      /*
        Log changes to leaf page.
        This contains original data with new data added first
      */
      DBUG_ASSERT(leaf_length <= new_leaf_length);
1231
      DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length);
1232
      if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
1233 1234
                         (int) (new_leaf_length - leaf_length),
                         KEY_OP_DEBUG_LOG_PREFIX_2))
1235 1236 1237 1238 1239 1240 1241
        goto err;
      /*
        Log changes to next page
        This contains original data with some suffix data deleted

      */
      DBUG_ASSERT(new_buff_length <= buff_length);
1242
      if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1243 1244 1245
        goto err;
    }

1246 1247 1248
    page_mark_changed(info, leaf_page);
    if (_ma_write_keypage(leaf_page,
                          PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1249 1250
      goto err;
  }
1251 1252 1253
  page_mark_changed(info, &next_page);
  if (_ma_write_keypage(&next_page,
                        PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1254
    goto err;
1255 1256 1257 1258

  DBUG_RETURN(new_anc_length <=
              ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
                (uint) keyinfo->underflow_block_length)));
1259 1260 1261 1262 1263 1264

err:
  DBUG_RETURN(-1);
} /* underflow */


1265
/**
1266 1267 1268 1269
  @brief Remove a key from page

  @fn remove_key()
    keyinfo	          Key handle
1270
    nod_flag              Length of node ptr
1271
    keypos	          Where on page key starts
1272
    lastkey	          Buffer for storing keys to be removed
1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288
    page_end	          Pointer to end of page
    next_block	          If <> 0 and node-page, this is set to address of
    		          next page
    s_temp	          Information about what changes was done one the page:
    s_temp.key_pos        Start of key
    s_temp.move_length    Number of bytes removed at keypos
    s_temp.changed_length Number of bytes changed at keypos

  @todo
    The current code doesn't handle the case that the next key may be
    packed better against the previous key if there is a case difference

  @return
  @retval 0  error
  @retval #  How many chars was removed
*/
1289

1290
static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
1291 1292 1293
		       uchar *keypos, uchar *lastkey,
		       uchar *page_end, my_off_t *next_block,
                       MARIA_KEY_PARAM *s_temp)
1294 1295
{
  int s_length;
unknown's avatar
unknown committed
1296
  uchar *start;
1297
  DBUG_ENTER("remove_key");
1298 1299
  DBUG_PRINT("enter", ("keypos: 0x%lx  page_end: 0x%lx",
                       (long) keypos, (long) page_end));
1300

1301 1302
  start= s_temp->key_pos= keypos;
  s_temp->changed_length= 0;
1303 1304
  if (!(keyinfo->flag &
	(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1305 1306
	 HA_BINARY_PACK_KEY)) &&
      !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1307
  {
1308
    /* Static length key */
1309 1310 1311 1312 1313
    s_length=(int) (keyinfo->keylength+nod_flag);
    if (next_block && nod_flag)
      *next_block= _ma_kpos(nod_flag,keypos+s_length);
  }
  else
1314 1315 1316 1317
  {
    /* Let keypos point at next key */
    MARIA_KEY key;

1318
    /* Calculate length of key */
1319 1320 1321
    key.keyinfo= keyinfo;
    key.data=    lastkey;
    if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos))
1322 1323 1324 1325 1326 1327 1328 1329 1330
      DBUG_RETURN(0);				/* Error */

    if (next_block && nod_flag)
      *next_block= _ma_kpos(nod_flag,keypos);
    s_length=(int) (keypos-start);
    if (keypos != page_end)
    {
      if (keyinfo->flag & HA_BINARY_PACK_KEY)
      {
1331
	uchar *old_key= start;
1332
	uint next_length,prev_length,prev_pack_length;
1333 1334

        /* keypos points here on start of next key */
1335 1336 1337 1338
	get_key_length(next_length,keypos);
	get_key_pack_length(prev_length,prev_pack_length,old_key);
	if (next_length > prev_length)
	{
1339
          uint diff= (next_length-prev_length);
1340
	  /* We have to copy data from the current key to the next key */
1341 1342 1343
	  keypos-= diff + prev_pack_length;
	  store_key_length(keypos, prev_length);
          bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
1344
	  s_length=(int) (keypos-start);
1345
          s_temp->changed_length= diff + prev_pack_length;
1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385
	}
      }
      else
      {
	/* Check if a variable length first key part */
	if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
	{
	  /* Next key is packed against the current one */
	  uint next_length,prev_length,prev_pack_length,lastkey_length,
	    rest_length;
	  if (keyinfo->seg[0].length >= 127)
	  {
	    if (!(prev_length=mi_uint2korr(start) & 32767))
	      goto end;
	    next_length=mi_uint2korr(keypos) & 32767;
	    keypos+=2;
	    prev_pack_length=2;
	  }
	  else
	  {
	    if (!(prev_length= *start & 127))
	      goto end;				/* Same key as previous*/
	    next_length= *keypos & 127;
	    keypos++;
	    prev_pack_length=1;
	  }
	  if (!(*start & 128))
	    prev_length=0;			/* prev key not packed */
	  if (keyinfo->seg[0].flag & HA_NULL_PART)
	    lastkey++;				/* Skip null marker */
	  get_key_length(lastkey_length,lastkey);
	  if (!next_length)			/* Same key after */
	  {
	    next_length=lastkey_length;
	    rest_length=0;
	  }
	  else
	    get_key_length(rest_length,keypos);

	  if (next_length >= prev_length)
1386 1387 1388 1389 1390
	  {
            /* Next key is based on deleted key */
            uint pack_length;
            uint diff= (next_length-prev_length);

1391
            /* keypos points to data of next key (after key length) */
1392 1393
	    bmove(keypos - diff, lastkey + prev_length, diff);
	    rest_length+= diff;
1394
	    pack_length= prev_length ? get_pack_length(rest_length): 0;
1395
	    keypos-= diff + pack_length + prev_pack_length;
1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417
	    s_length=(int) (keypos-start);
	    if (prev_length)			/* Pack against prev key */
	    {
	      *keypos++= start[0];
	      if (prev_pack_length == 2)
		*keypos++= start[1];
	      store_key_length(keypos,rest_length);
	    }
	    else
	    {
	      /* Next key is not packed anymore */
	      if (keyinfo->seg[0].flag & HA_NULL_PART)
	      {
		rest_length++;			/* Mark not null */
	      }
	      if (prev_pack_length == 2)
	      {
		mi_int2store(keypos,rest_length);
	      }
	      else
		*keypos= rest_length;
	    }
1418
            s_temp->changed_length= diff + pack_length + prev_pack_length;
1419 1420 1421 1422 1423 1424
	  }
	}
      }
    }
  }
  end:
unknown's avatar
unknown committed
1425
  bmove(start, start+s_length, (uint) (page_end-start-s_length));
1426
  s_temp->move_length= s_length;
1427 1428
  DBUG_RETURN((uint) s_length);
} /* remove_key */
1429 1430 1431 1432 1433 1434


/****************************************************************************
  Logging of redos
****************************************************************************/

1435
/**
1436 1437 1438
   @brief
   log entry where some parts are deleted and some things are changed
   and some data could be added last.
1439 1440 1441 1442 1443 1444 1445 1446

   @fn _ma_log_delete()
   @param info		  Maria handler
   @param page	          Pageaddress for changed page
   @param buff		  Page buffer
   @param key_pos         Start of change area
   @param changed_length  How many bytes were changed at key_pos
   @param move_length     How many bytes were deleted at key_pos
1447 1448
   @param append_length	  Length of data added last
		          This is taken from end of ma_page->buff
1449

1450 1451 1452
   This is mainly used when a key is deleted. The append happens
   when we delete a key from a page with data > block_size kept in
   memory and we have to add back the data that was stored > block_size
1453 1454
*/

1455
my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
                       uint changed_length, uint move_length,
                       uint append_length __attribute__((unused)),
                       enum en_key_debug debug_marker __attribute__((unused)))
{
  LSN lsn;
  /*
    Holds the file id / page number header followed by every opcode that
    is written through log_pos below.
    NOTE(review): the per-term breakdown of this size is not documented
    here; confirm it still covers the worst case before adding opcodes.
  */
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 5+ 2 + 3 + 3 + 6 + 3 + 7];
  uchar *log_pos;
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7];
  uint translog_parts, current_size, extra_length;
  /* Position of the change, relative to the start of the page buffer */
  uint offset= (uint) (key_pos - ma_page->buff);
  MARIA_HA *info= ma_page->info;
  MARIA_SHARE *share= info->s;
  my_off_t page= ma_page->pos / share->block_size;
  DBUG_ENTER("_ma_log_delete");
  /* move_length is unsigned, so it is printed with %u */
  DBUG_PRINT("enter", ("page: %lu  changed_length: %u  move_length: %u",
                       (ulong) page, changed_length, move_length));
  DBUG_ASSERT(share->now_transactional && move_length);
  DBUG_ASSERT(offset + changed_length <= ma_page->size);
  DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size);
  DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header);

  /* Store the number of the page that is being changed */
  page_store(log_data + FILEID_STORE_SIZE, page);
  log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
  /*
    current_size tracks the page length that results from the operations
    logged so far; it starts at the length known from the previous log entry
  */
  current_size= ma_page->org_size;

#ifdef EXTRA_DEBUG_KEY_CHANGES
  *log_pos++= KEY_OP_DEBUG;
  *log_pos++= debug_marker;

  *log_pos++= KEY_OP_DEBUG_2;
  int2store(log_pos,   ma_page->org_size);
  int2store(log_pos+2, ma_page->size);
  log_pos+=4;
#endif

  /* Store keypage_flag */
  *log_pos++= KEY_OP_SET_PAGEFLAG;
  *log_pos++= ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET];

  /* Position on the page where the following operations apply */
  log_pos[0]= KEY_OP_OFFSET;
  int2store(log_pos+1, offset);
  log_pos+= 3;
  translog_parts= TRANSLOG_INTERNAL_PARTS + 1;
  extra_length= 0;

  if (changed_length)
  {
    if (offset + changed_length >= share->max_index_block_size)
    {
      /* The change reaches the end of the block: clip it to the block */
      changed_length= share->max_index_block_size - offset;
      move_length= 0;                           /* Nothing to move */
      current_size= share->max_index_block_size;
    }

    log_pos[0]= KEY_OP_CHANGE;
    int2store(log_pos+1, changed_length);
    log_pos+= 3;
    /* The changed bytes are logged directly from the page buffer */
    log_array[translog_parts].str=    ma_page->buff + offset;
    log_array[translog_parts].length= changed_length;
    translog_parts++;

    /* We only have to move things after offset+changed_length */
    offset+= changed_length;
  }

  /*
    The first part ends here. Opcodes written to log_data after this point
    are added as separate parts and are counted in extra_length.
  */
  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);

  if (move_length)
  {
    uint log_length;
    if (offset + move_length < share->max_index_block_size)
    {
      /*
        Move down things that is on page.
        page_offset in apply_redo_index() will be at original offset
        + changed_length.
      */
      log_pos[0]= KEY_OP_SHIFT;
      int2store(log_pos+1, - (int) move_length);
      log_length= 3;
      current_size-= move_length;
    }
    else
    {
      /* Delete to end of page */
      uint tmp= current_size - offset;
      current_size= offset;
      log_pos[0]= KEY_OP_DEL_SUFFIX;
      int2store(log_pos+1, tmp);
      log_length= 3;
    }
    log_array[translog_parts].str=    log_pos;
    log_array[translog_parts].length= log_length;
    translog_parts++;
    log_pos+= log_length;
    extra_length+= log_length;
  }

  if (current_size != ma_page->size &&
      current_size != share->max_index_block_size)
  {
    /* Append data that didn't fit on the page before */
    uint length= (min(ma_page->size, share->max_index_block_size) -
                  current_size);
    uchar *data= ma_page->buff + current_size;

    DBUG_ASSERT(length <= append_length);

    log_pos[0]= KEY_OP_ADD_SUFFIX;
    int2store(log_pos+1, length);
    /* One part for the opcode, one for the appended bytes themselves */
    log_array[translog_parts].str=        log_pos;
    log_array[translog_parts].length=     3;
    log_array[translog_parts + 1].str=    data;
    log_array[translog_parts + 1].length= length;
    log_pos+= 3;
    translog_parts+= 2;
    current_size+= length;
    extra_length+= 3 + length;
  }

  /*
    Receives pointers to extra_length and translog_parts, so it may add
    further parts to the record.
  */
  _ma_log_key_changes(ma_page,
                      log_array + translog_parts,
                      log_pos, &extra_length, &translog_parts);
  /* Remember new page length for future log entries for same page */
  ma_page->org_size= current_size;

  /* Total length = first part + changed bytes + all parts added later */
  if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
                            info->trn, info,
                            (translog_size_t)
                            log_array[TRANSLOG_INTERNAL_PARTS].length +
                            changed_length + extra_length, translog_parts,
                            log_array, log_data, NULL))
    DBUG_RETURN(1);

  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1594 1595 1596 1597 1598 1599


/****************************************************************************
  Logging of undos
****************************************************************************/

1600 1601
/**
  @brief Write an undo log record for a key delete

  The record starts with the transaction's current undo_lsn and the key
  number. When new_root differs from the key root stored in the share's
  state, the new root page number is added and the record type becomes
  LOGREC_UNDO_KEY_DELETE_WITH_ROOT. The key data and its row reference
  follow as a second part.

  @param info      Maria handler
  @param key       Key that was deleted
  @param new_root  New root position for the key tree
  @param res_lsn   Passed on to translog_write_record()

  @return 0 if translog_write_record() returned 0, otherwise -1
*/

my_bool _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key,
                                  my_off_t new_root, LSN *res_lsn)
{
  MARIA_SHARE *share= info->s;
  uint key_number= key->keyinfo->key_nr;
  enum translog_record_type record_type= LOGREC_UNDO_KEY_DELETE;
  struct st_msg_to_write_hook_for_undo_key hook_msg;
  LEX_CUSTRING parts[TRANSLOG_INTERNAL_PARTS + 2];
  uchar header[LSN_STORE_SIZE + FILEID_STORE_SIZE +
               KEY_NR_STORE_SIZE + PAGE_STORE_SIZE];
  /* First unused byte of header; the optional root page goes here */
  uchar *header_end= (header + LSN_STORE_SIZE + FILEID_STORE_SIZE +
                      KEY_NR_STORE_SIZE);
  translog_size_t total_length;

  lsn_store(header, info->trn->undo_lsn);
  key_nr_store(header + LSN_STORE_SIZE + FILEID_STORE_SIZE, key_number);

  /**
    @todo BUG if we had concurrent insert/deletes, reading state's key_root
    like this would be unsafe.
  */
  if (share->state.key_root[key_number] != new_root)
  {
    /* The root changes; log the new root page with the record */
    my_off_t root_page;
    if (new_root == HA_OFFSET_ERROR)
      root_page= IMPOSSIBLE_PAGE_NO;
    else
      root_page= new_root / share->block_size;
    page_store(header_end, root_page);
    header_end+= PAGE_STORE_SIZE;
    record_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
  }

  /* Part 0: fixed header; part 1: key data followed by the row reference */
  parts[TRANSLOG_INTERNAL_PARTS + 0].str=    header;
  parts[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (header_end - header);
  parts[TRANSLOG_INTERNAL_PARTS + 1].str=    key->data;
  parts[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length +
                                              key->ref_length);
  total_length= (translog_size_t)
                (parts[TRANSLOG_INTERNAL_PARTS + 0].length +
                 parts[TRANSLOG_INTERNAL_PARTS + 1].length);

  hook_msg.root= &share->state.key_root[key_number];
  hook_msg.value= new_root;
  /*
    set autoincrement to 1 if this is an auto_increment key
    This is only used if we are now in a rollback of a duplicate key
  */
  hook_msg.auto_increment= (share->base.auto_key == key_number + 1);

  if (translog_write_record(res_lsn, record_type,
                            info->trn, info,
                            total_length,
                            TRANSLOG_INTERNAL_PARTS + 2, parts,
                            header + LSN_STORE_SIZE, &hook_msg))
    return -1;
  return 0;
}