/* Copyright (C) 2000-2006 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
  delayed_insert::get_local_table() the table of the thread is copied
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other thread's table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used. Only that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread awakes from the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It normally lives until a FLUSH TABLES or SHUTDOWN.

*/

#include "mysql_priv.h"
#include "sp_head.h"
#include "sql_trigger.h"
#include "sql_select.h"
#include "sql_show.h"

/*
  Forward declarations of file-local helpers.
  The delayed-insert helpers are compiled out for the embedded library.
*/
#ifndef EMBEDDED_LIBRARY
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
                         LEX_STRING query, bool ignore, bool log_on);
static void end_delayed_insert(THD *thd);
pthread_handler_t handle_delayed_insert(void *arg);
static void unlink_blobs(register TABLE *table);
#endif
static bool check_view_insertability(THD *thd, TABLE_LIST *view);

/*
  Define to force use of my_malloc() if the allocated memory block is big.

  my_safe_alloca(size, min_length)
    Allocate 'size' bytes: with my_alloca() if size <= min_length,
    otherwise with my_malloc().
  my_safe_afree(ptr, size, min_length)
    Release a block obtained from my_safe_alloca(); must be given the same
    'size' and 'min_length' so that it picks the matching release path.

  Without HAVE_ALLOCA both macros simply forward to my_alloca()/my_afree().

  Arguments are parenthesized so that compound expressions expand safely,
  and my_safe_afree() is wrapped in do { } while (0) so that it behaves as
  a single statement (no dangling-else capture in a caller's if/else).
  Note that 'size' and 'min_length' may be evaluated more than once.
*/

#ifndef HAVE_ALLOCA
#define my_safe_alloca(size, min_length) my_alloca(size)
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
#else
#define my_safe_alloca(size, min_length) \
  (((size) <= (min_length)) ? my_alloca(size) : my_malloc((size),MYF(0)))
#define my_safe_afree(ptr, size, min_length) \
  do { if ((size) > (min_length)) my_free((ptr),MYF(0)); } while (0)
#endif

/*
  Verify that a list of insert/update fields refers to exactly one
  underlying table of a view.

  SYNOPSIS
    check_view_single_update()
    fields            Fields whose used tables are to be checked.
    view              The view that is the target of the insert.
    map     [in/out]  Table map of the insert table.

  DESCRIPTION
    The union of used_tables() over all fields is computed first.

    - If *map is 0 on entry (insert fields are being checked), the view is
      asked via check_single_table() to resolve that union to a single
      underlying table. On success view->table is pointed at that table
      and the union is stored in *map.
    - If *map is non-zero on entry (update fields of ON DUPLICATE KEY
      UPDATE are being checked), the union must be equal to *map, i.e. the
      update fields must come from the same table as the insert fields.

    In any other case ER_VIEW_MULTIUPDATE is reported for the view.

  RETURN
    FALSE   OK
    TRUE    Error (already reported with my_error())
*/

bool check_view_single_update(List<Item> &fields, TABLE_LIST *view,
                              table_map *map)
{
  table_map used_map= 0;
  bool failed;

  /* Accumulate the tables referenced by every field in the list. */
  {
    List_iterator_fast<Item> field_it(fields);
    Item *field;
    while ((field= field_it++))
      used_map|= field->used_tables();
  }

  if (*map)
  {
    /* A map is already known: the fields must match it exactly. */
    failed= (used_map != *map);
  }
  else
  {
    /* No map yet: let the view find the single table behind the fields. */
    TABLE_LIST *underlying= 0;      // must be reset for check_single_table()
    failed= (view->check_single_table(&underlying, used_map, view) ||
             underlying == 0);
    if (!failed)
    {
      view->table= underlying->table;
      *map= used_map;
    }
  }

  if (failed)
  {
    my_error(ER_VIEW_MULTIUPDATE, MYF(0),
             view->view_db.str, view->view_name.str);
    return TRUE;
  }
  return FALSE;
}

142

unknown's avatar
unknown committed
143
/*
144
  Check if insert fields are correct.
145 146 147 148 149 150 151

  SYNOPSIS
    check_insert_fields()
    thd                         The current thread.
    table                       The table for insert.
    fields                      The insert fields.
    values                      The insert values.
unknown's avatar
unknown committed
152
    check_unique                If duplicate values should be rejected.
153 154 155 156 157 158 159 160 161

  NOTE
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
    or leaves it as is, depending on if timestamp should be updated or
    not.

  RETURN
    0           OK
    -1          Error
unknown's avatar
unknown committed
162 163
*/

unknown's avatar
unknown committed
164 165
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
166
                               bool check_unique, table_map *map)
unknown's avatar
unknown committed
167
{
unknown's avatar
VIEW  
unknown committed
168
  TABLE *table= table_list->table;
169

170 171
  if (!table_list->updatable)
  {
172
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
173 174 175
    return -1;
  }

unknown's avatar
unknown committed
176 177
  if (fields.elements == 0 && values.elements != 0)
  {
178 179 180 181 182 183
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
      return -1;
    }
184
    if (values.elements != table->s->fields)
unknown's avatar
unknown committed
185
    {
unknown's avatar
unknown committed
186
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
187 188
      return -1;
    }
unknown's avatar
unknown committed
189
#ifndef NO_EMBEDDED_ACCESS_CHECKS
190
    if (grant_option)
unknown's avatar
VIEW  
unknown committed
191
    {
192 193
      Field_iterator_table field_it;
      field_it.set_table(table);
194
      if (check_grant_all_columns(thd, INSERT_ACL, &table->grant,
unknown's avatar
unknown committed
195
                                  table->s->db.str, table->s->table_name.str,
196
                                  &field_it))
unknown's avatar
VIEW  
unknown committed
197 198
        return -1;
    }
unknown's avatar
unknown committed
199
#endif
200 201
    clear_timestamp_auto_bits(table->timestamp_field_type,
                              TIMESTAMP_AUTO_SET_ON_INSERT);
202 203 204 205
    /*
      No fields are provided so all fields must be provided in the values.
      Thus we set all bits in the write set.
    */
206
    bitmap_set_all(table->write_set);
unknown's avatar
unknown committed
207 208 209
  }
  else
  {						// Part field list
unknown's avatar
unknown committed
210 211
    SELECT_LEX *select_lex= &thd->lex->select_lex;
    Name_resolution_context *context= &select_lex->context;
212
    Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
213
    int res;
unknown's avatar
unknown committed
214

unknown's avatar
unknown committed
215 216
    if (fields.elements != values.elements)
    {
unknown's avatar
unknown committed
217
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
218 219 220
      return -1;
    }

221
    thd->dup_field= 0;
unknown's avatar
unknown committed
222 223 224
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
225
    ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
226 227 228 229 230

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
231
    table_list->next_local= 0;
232
    context->resolve_in_table_list_only(table_list);
233
    res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
unknown's avatar
unknown committed
234 235

    /* Restore the current context. */
236
    ctx_state.restore_state(context, table_list);
237
    thd->lex->select_lex.no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
238

unknown's avatar
unknown committed
239
    if (res)
unknown's avatar
unknown committed
240
      return -1;
unknown's avatar
unknown committed
241

unknown's avatar
unknown committed
242
    if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
243
    {
244
      if (check_view_single_update(fields, table_list, map))
245
        return -1;
246
      table= table_list->table;
247
    }
248

249
    if (check_unique && thd->dup_field)
unknown's avatar
unknown committed
250
    {
251
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
unknown's avatar
unknown committed
252 253
      return -1;
    }
254 255 256 257 258 259 260 261 262 263 264 265
    if (table->timestamp_field)	// Don't automaticly set timestamp if used
    {
      if (bitmap_is_set(table->write_set,
                        table->timestamp_field->field_index))
        clear_timestamp_auto_bits(table->timestamp_field_type,
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
      else
      {
        bitmap_set_bit(table->write_set,
                       table->timestamp_field->field_index);
      }
    }
unknown's avatar
unknown committed
266
  }
unknown's avatar
unknown committed
267
  // For the values we need select_priv
unknown's avatar
unknown committed
268
#ifndef NO_EMBEDDED_ACCESS_CHECKS
269
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
unknown's avatar
unknown committed
270
#endif
271 272 273

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
274
       check_view_insertability(thd, table_list)))
275
  {
276
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
277 278 279
    return -1;
  }

unknown's avatar
unknown committed
280 281 282 283
  return 0;
}


284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
/*
  Check update fields for the timestamp field.

  SYNOPSIS
    check_update_fields()
    thd                         The current thread.
    insert_table_list           The insert table list.
    table                       The table for update.
    update_fields               The update fields.

  NOTE
    If the update fields include the timestamp field,
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.

  RETURN
    0           OK
    -1          Error
*/

unknown's avatar
unknown committed
303
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
304
                               List<Item> &update_fields, table_map *map)
305
{
unknown's avatar
unknown committed
306
  TABLE *table= insert_table_list->table;
307
  my_bool timestamp_mark;
308

309 310
  LINT_INIT(timestamp_mark);

311 312
  if (table->timestamp_field)
  {
313 314 315 316 317 318
    /*
      Unmark the timestamp field so that we can check if this is modified
      by update_fields
    */
    timestamp_mark= bitmap_test_and_clear(table->write_set,
                                          table->timestamp_field->field_index);
319 320
  }

321 322
  /* Check the fields we are going to modify */
  if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
323 324
    return -1;

325 326 327 328
  if (insert_table_list->effective_algorithm == VIEW_ALGORITHM_MERGE &&
      check_view_single_update(update_fields, insert_table_list, map))
    return -1;

329 330 331
  if (table->timestamp_field)
  {
    /* Don't set timestamp column if this is modified. */
332 333
    if (bitmap_is_set(table->write_set,
                      table->timestamp_field->field_index))
334 335
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
336 337 338
    if (timestamp_mark)
      bitmap_set_bit(table->write_set,
                     table->timestamp_field->field_index);
339 340 341 342 343
  }
  return 0;
}


unknown's avatar
unknown committed
344 345 346 347 348
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
349 350
                  enum_duplicates duplic,
		  bool ignore)
unknown's avatar
unknown committed
351
{
352
  int error, res;
353
  bool transactional_table, joins_freed= FALSE;
354
  bool changed;
unknown's avatar
unknown committed
355 356 357 358
  uint value_count;
  ulong counter = 1;
  ulonglong id;
  COPY_INFO info;
359
  TABLE *table= 0;
unknown's avatar
unknown committed
360
  List_iterator_fast<List_item> its(values_list);
unknown's avatar
unknown committed
361
  List_item *values;
unknown's avatar
unknown committed
362
  Name_resolution_context *context;
363
  Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
364 365
#ifndef EMBEDDED_LIBRARY
  char *query= thd->query;
unknown's avatar
unknown committed
366 367 368 369 370 371
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-update or --log-bin).
  */
  bool log_on= ((thd->options & OPTION_BIN_LOG) ||
unknown's avatar
unknown committed
372
                (!(thd->security_ctx->master_access & SUPER_ACL)));
unknown's avatar
unknown committed
373
#endif
unknown's avatar
unknown committed
374
  thr_lock_type lock_type = table_list->lock_type;
unknown's avatar
unknown committed
375
  Item *unused_conds= 0;
unknown's avatar
unknown committed
376 377
  DBUG_ENTER("mysql_insert");

378 379 380 381 382
  /*
    in safe mode or with skip-new change delayed insert to be regular
    if we are told to replace duplicates, the insert cannot be concurrent
    delayed insert changed to regular in slave thread
   */
383 384 385 386
#ifdef EMBEDDED_LIBRARY
  if (lock_type == TL_WRITE_DELAYED)
    lock_type=TL_WRITE;
#else
unknown's avatar
unknown committed
387 388
  if ((lock_type == TL_WRITE_DELAYED &&
       ((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) ||
389
	thd->slave_thread || !thd->variables.max_insert_delayed_threads)) ||
390 391
      (lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) ||
      (duplic == DUP_UPDATE))
unknown's avatar
unknown committed
392
    lock_type=TL_WRITE;
393
#endif
394
  table_list->lock_type= lock_type;
unknown's avatar
unknown committed
395

396
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
397 398 399 400
  if (lock_type == TL_WRITE_DELAYED)
  {
    if (thd->locked_tables)
    {
unknown's avatar
unknown committed
401 402
      DBUG_ASSERT(table_list->db); /* Must be set in the parser */
      if (find_locked_table(thd, table_list->db, table_list->table_name))
unknown's avatar
unknown committed
403
      {
404
	my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
405
                 table_list->table_name);
unknown's avatar
unknown committed
406
	DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
407 408
      }
    }
409
    if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error)
410
    {
411 412 413 414 415
      /*
        Open tables used for sub-selects or in stored functions, will also
        cache these functions.
      */
      res= open_and_lock_tables(thd, table_list->next_global);
unknown's avatar
VIEW  
unknown committed
416 417 418 419 420 421
      /*
	First is not processed by open_and_lock_tables() => we need set
	updateability flags "by hands".
      */
      if (!table_list->derived && !table_list->view)
        table_list->updatable= 1;  // usual table
422
    }
423
    else
424
    {
425 426
      /* Too many delayed insert threads;  Use a normal insert */
      table_list->lock_type= lock_type= TL_WRITE;
427
      res= open_and_lock_tables(thd, table_list);
428
    }
unknown's avatar
unknown committed
429 430
  }
  else
431
#endif /* EMBEDDED_LIBRARY */
432
    res= open_and_lock_tables(thd, table_list);
433
  if (res || thd->is_fatal_error)
unknown's avatar
unknown committed
434
    DBUG_RETURN(TRUE);
435

unknown's avatar
unknown committed
436
  thd->proc_info="init";
437
  thd->used_tables=0;
unknown's avatar
unknown committed
438
  values= its++;
unknown's avatar
unknown committed
439

unknown's avatar
VIEW  
unknown committed
440
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
unknown's avatar
unknown committed
441 442
			   update_fields, update_values, duplic, &unused_conds,
                           FALSE))
unknown's avatar
unknown committed
443
    goto abort;
444

445 446 447
  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;

unknown's avatar
unknown committed
448
  context= &thd->lex->select_lex.context;
449 450 451 452 453 454 455 456 457
  /*
    These three asserts test the hypothesis that the resetting of the name
    resolution context below is not necessary at all since the list of local
    tables for INSERT always consists of one table.
  */
  DBUG_ASSERT(!table_list->next_local);
  DBUG_ASSERT(!context->table_list->next_local);
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);

unknown's avatar
unknown committed
458
  /* Save the state of the current name resolution context. */
459
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
460 461 462 463 464

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
465
  table_list->next_local= 0;
unknown's avatar
unknown committed
466 467
  context->resolve_in_table_list_only(table_list);

unknown's avatar
unknown committed
468
  value_count= values->elements;
469
  while ((values= its++))
unknown's avatar
unknown committed
470 471 472 473
  {
    counter++;
    if (values->elements != value_count)
    {
474
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
unknown's avatar
unknown committed
475 476
      goto abort;
    }
477
    if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
unknown's avatar
unknown committed
478 479 480
      goto abort;
  }
  its.rewind ();
unknown's avatar
unknown committed
481 482
 
  /* Restore the current context. */
483
  ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
484

unknown's avatar
unknown committed
485
  /*
unknown's avatar
unknown committed
486
    Fill in the given fields and dump it to the table file
unknown's avatar
unknown committed
487
  */
unknown's avatar
unknown committed
488
  info.records= info.deleted= info.copied= info.updated= 0;
489
  info.ignore= ignore;
unknown's avatar
unknown committed
490
  info.handle_duplicates=duplic;
491 492
  info.update_fields= &update_fields;
  info.update_values= &update_values;
unknown's avatar
unknown committed
493
  info.view= (table_list->view ? table_list : 0);
494

495 496 497
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
unknown's avatar
unknown committed
498
    to NULL.
499
  */
unknown's avatar
unknown committed
500
  thd->count_cuted_fields= ((values_list.elements == 1 &&
unknown's avatar
unknown committed
501
                             !ignore) ?
502 503
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
unknown's avatar
unknown committed
504 505 506 507 508
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

  error=0;
  thd->proc_info="update";
unknown's avatar
unknown committed
509
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
510
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
511 512 513
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
514 515 516 517
  /*
    let's *try* to start bulk inserts. It won't necessary
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
518 519 520 521
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
522 523 524 525
    So we call start_bulk_insert to perform nesessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
526
  if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode)
527
    table->file->ha_start_bulk_insert(values_list.elements);
528

unknown's avatar
unknown committed
529
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
530
  thd->abort_on_warning= (!ignore &&
unknown's avatar
unknown committed
531 532 533 534
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));

535
  if ((fields.elements || !value_count) &&
536
      check_that_all_fields_are_given_values(thd, table, table_list))
unknown's avatar
unknown committed
537 538 539 540 541
  {
    /* thd->net.report_error is now set, which will abort the next loop */
    error= 1;
  }

542
  table->mark_columns_needed_for_insert();
543

544 545 546 547
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

unknown's avatar
unknown committed
548
  while ((values= its++))
unknown's avatar
unknown committed
549 550 551
  {
    if (fields.elements || !value_count)
    {
552
      restore_record(table,s->default_values);	// Get empty record
unknown's avatar
unknown committed
553 554 555
      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
556
      {
unknown's avatar
unknown committed
557
	if (values_list.elements != 1 && !thd->net.report_error)
unknown's avatar
unknown committed
558 559 560 561
	{
	  info.records++;
	  continue;
	}
unknown's avatar
unknown committed
562 563 564 565 566
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
unknown's avatar
unknown committed
567 568 569 570 571 572
	error=1;
	break;
      }
    }
    else
    {
573
      if (thd->used_tables)			// Column used in values()
574
	restore_record(table,s->default_values);	// Get empty record
575
      else
unknown's avatar
unknown committed
576 577 578 579 580 581 582 583 584 585 586
      {
        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
	table->record[0][0]= table->s->default_values[0];
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
587
      {
unknown's avatar
unknown committed
588
	if (values_list.elements != 1 && ! thd->net.report_error)
unknown's avatar
unknown committed
589 590 591 592 593 594 595 596
	{
	  info.records++;
	  continue;
	}
	error=1;
	break;
      }
    }
597

598 599 600
    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
601
					     ignore))) ==
unknown's avatar
unknown committed
602 603 604
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
605
    {
unknown's avatar
unknown committed
606 607
      error= 1;
      break;
unknown's avatar
unknown committed
608
    }
609
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
610
    if (lock_type == TL_WRITE_DELAYED)
unknown's avatar
unknown committed
611
    {
612 613
      LEX_STRING const st_query = { query, thd->query_length };
      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
unknown's avatar
unknown committed
614 615 616
      query=0;
    }
    else
617
#endif
unknown's avatar
unknown committed
618
      error=write_record(thd, table ,&info);
619 620
    if (error)
      break;
621
    thd->row_count++;
unknown's avatar
unknown committed
622
  }
623

624 625 626
  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;

627 628 629 630
  /*
    Now all rows are inserted.  Time to update logs and sends response to
    user
  */
631
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
632 633
  if (lock_type == TL_WRITE_DELAYED)
  {
634 635 636 637 638
    if (!error)
    {
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
unknown's avatar
unknown committed
639
    query_cache_invalidate3(thd, table_list, 1);
unknown's avatar
unknown committed
640 641
  }
  else
642
#endif
unknown's avatar
unknown committed
643
  {
644 645 646 647 648
    /*
      Do not do this release if this is a delayed insert, it would steal
      auto_inc values from the delayed_insert thread as they share TABLE.
    */
    table->file->ha_release_auto_increment();
649
    if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
650
    {
unknown's avatar
unknown committed
651 652
      table->file->print_error(my_errno,MYF(0));
      error=1;
653
    }
654
    transactional_table= table->file->has_transactions();
unknown's avatar
unknown committed
655

656
    if ((changed= (info.copied || info.deleted || info.updated)))
unknown's avatar
unknown committed
657
    {
658 659 660 661 662 663 664
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
      query_cache_invalidate3(thd, table_list, 1);
      if (error <= 0 || !transactional_table)
665
      {
666
        if (mysql_bin_log.is_open())
667
        {
668 669
          if (error <= 0)
            thd->clear_error();
unknown's avatar
unknown committed
670 671 672 673 674
          if (thd->binlog_query(THD::ROW_QUERY_TYPE,
                                thd->query, thd->query_length,
                                transactional_table, FALSE) &&
              transactional_table)
          {
675
            error=1;
unknown's avatar
unknown committed
676
          }
677
        }
678 679
        if (!transactional_table)
          thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
680
      }
unknown's avatar
unknown committed
681
    }
682
    if (transactional_table)
683
      error=ha_autocommit_or_rollback(thd,error);
unknown's avatar
unknown committed
684

unknown's avatar
unknown committed
685 686 687
    if (thd->lock)
    {
      mysql_unlock_tables(thd, thd->lock);
688 689 690 691 692 693 694 695 696 697
      /*
        Invalidate the table in the query cache if something changed
        after unlocking when changes become fisible.
        TODO: this is workaround. right way will be move invalidating in
        the unlock procedure.
      */
      if (lock_type ==  TL_WRITE_CONCURRENT_INSERT && changed)
      {
        query_cache_invalidate3(thd, table_list, 1);
      }
unknown's avatar
unknown committed
698 699 700 701
      thd->lock=0;
    }
  }
  thd->proc_info="end";
702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717
  /*
    We'll report to the client this id:
    - if the table contains an autoincrement column and we successfully
    inserted an autogenerated value, the autogenerated value.
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
    called, X.
    - if the table contains an autoincrement column, and some rows were
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
    have been really inserted but ignored).
  */
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     ((table->next_number_field && info.copied) ?
     table->next_number_field->val_int() : 0));
unknown's avatar
unknown committed
718
  table->next_number_field=0;
719
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
720
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
721
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
722 723 724
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
725

unknown's avatar
unknown committed
726 727 728 729
  if (error)
    goto abort;
  if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
				    !thd->cuted_fields))
730 731
  {
    thd->row_count_func= info.copied+info.deleted+info.updated;
732
    send_ok(thd, (ulong) thd->row_count_func, id);
733
  }
734 735
  else
  {
unknown's avatar
unknown committed
736
    char buff[160];
737
    if (ignore)
738 739 740
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
	      (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
741
    else
742
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
unknown's avatar
unknown committed
743
	      (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
744
    thd->row_count_func= info.copied+info.deleted+info.updated;
745
    ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
746
  }
747
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
748
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
749 750

abort:
751
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
752 753
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
754
#endif
755
  if (table != NULL)
756
    table->file->ha_release_auto_increment();
757 758
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
unknown's avatar
unknown committed
759
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
760
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
761 762 763
}


unknown's avatar
VIEW  
unknown committed
764 765 766 767 768
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
769
    thd     - thread handler
unknown's avatar
VIEW  
unknown committed
770 771
    view    - reference on VIEW

772 773 774 775 776 777
  IMPLEMENTATION
    A view is insertable if the folloings are true:
    - All columns in the view are columns from a table
    - All not used columns in table have a default values
    - All field in view are unique (not referring to the same column)

unknown's avatar
VIEW  
unknown committed
778 779
  RETURN
    FALSE - OK
780 781 782
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

unknown's avatar
VIEW  
unknown committed
783 784 785
    TRUE  - can't be used for insert
*/

786
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
unknown's avatar
VIEW  
unknown committed
787
{
788
  uint num= view->view->select_lex.item_list.elements;
unknown's avatar
VIEW  
unknown committed
789
  TABLE *table= view->table;
790 791 792
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
unknown's avatar
unknown committed
793
  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
unknown's avatar
unknown committed
794
  uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
795
  MY_BITMAP used_fields;
unknown's avatar
unknown committed
796
  enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
797 798
  DBUG_ENTER("check_key_in_view");

799 800 801
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

unknown's avatar
VIEW  
unknown committed
802 803
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

804
  VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0));
805 806
  bitmap_clear_all(&used_fields);

unknown's avatar
VIEW  
unknown committed
807
  view->contain_auto_increment= 0;
808 809 810 811
  /* 
    we must not set query_id for fields as they're not 
    really used in this context
  */
unknown's avatar
unknown committed
812
  thd->mark_used_columns= MARK_COLUMNS_NONE;
unknown's avatar
VIEW  
unknown committed
813
  /* check simplicity and prepare unique test of view */
814
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
815
  {
816
    if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
817
    {
unknown's avatar
unknown committed
818
      thd->mark_used_columns= save_mark_used_columns;
819 820
      DBUG_RETURN(TRUE);
    }
821
    Item_field *field;
unknown's avatar
VIEW  
unknown committed
822
    /* simple SELECT list entry (field without expression) */
unknown's avatar
merge  
unknown committed
823
    if (!(field= trans->item->filed_for_view_update()))
824
    {
unknown's avatar
unknown committed
825
      thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
826
      DBUG_RETURN(TRUE);
827
    }
828
    if (field->field->unireg_check == Field::NEXT_NUMBER)
unknown's avatar
VIEW  
unknown committed
829 830
      view->contain_auto_increment= 1;
    /* prepare unique test */
unknown's avatar
unknown committed
831 832 833 834 835
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
unknown's avatar
VIEW  
unknown committed
836
  }
unknown's avatar
unknown committed
837
  thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
838
  /* unique test */
839
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
840
  {
841
    /* Thanks to test above, we know that all columns are of type Item_field */
842
    Item_field *field= (Item_field *)trans->item;
843 844 845
    /* check fields belong to table in which we are inserting */
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
unknown's avatar
VIEW  
unknown committed
846 847 848 849 850 851 852
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


unknown's avatar
unknown committed
853
/*
854
  Check if table can be updated
unknown's avatar
unknown committed
855 856

  SYNOPSIS
857 858
     mysql_prepare_insert_check_table()
     thd		Thread handle
unknown's avatar
unknown committed
859
     table_list		Table list
860 861
     fields		List of fields to be updated
     where		Pointer to where clause
unknown's avatar
unknown committed
862
     select_insert      Check is making for SELECT ... INSERT
863 864

   RETURN
unknown's avatar
unknown committed
865 866
     FALSE ok
     TRUE  ERROR
unknown's avatar
unknown committed
867
*/
868

unknown's avatar
unknown committed
869
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
870
                                             List<Item> &fields,
unknown's avatar
unknown committed
871
                                             bool select_insert)
unknown's avatar
unknown committed
872
{
unknown's avatar
VIEW  
unknown committed
873
  bool insert_into_view= (table_list->view != 0);
874
  DBUG_ENTER("mysql_prepare_insert_check_table");
unknown's avatar
VIEW  
unknown committed
875

876 877 878 879 880 881 882
  /*
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
     all others require SELECT_ACL only. the ACL requirement below is for
     new leaves only anyway (view-constituents), so check for SELECT rather
     than INSERT.
  */

883 884
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
                                    &thd->lex->select_lex.top_join_list,
885
                                    table_list,
886
                                    &thd->lex->select_lex.leaf_tables,
887
                                    select_insert, INSERT_ACL, SELECT_ACL))
unknown's avatar
unknown committed
888
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
889 890 891 892

  if (insert_into_view && !fields.elements)
  {
    thd->lex->empty_field_list_on_rset= 1;
893 894 895 896
    if (!table_list->table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
unknown's avatar
unknown committed
897
      DBUG_RETURN(TRUE);
898
    }
899
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
unknown's avatar
VIEW  
unknown committed
900 901
  }

unknown's avatar
unknown committed
902
  DBUG_RETURN(FALSE);
903 904 905 906 907 908 909 910 911 912
}


/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
    thd			Thread handler
    table_list	        Global/local table list
unknown's avatar
unknown committed
913 914
    table		Table to insert into (can be NULL if table should
			be taken from table_list->table)    
unknown's avatar
unknown committed
915 916
    where		Where clause (for insert ... select)
    select_insert	TRUE if INSERT ... SELECT statement
917

unknown's avatar
unknown committed
918 919 920 921 922
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
unknown's avatar
unknown committed
923

924 925 926
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
unknown's avatar
unknown committed
927
  
928
  RETURN VALUE
unknown's avatar
unknown committed
929 930
    FALSE OK
    TRUE  error
931 932
*/

unknown's avatar
unknown committed
933
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
unknown's avatar
unknown committed
934
                          TABLE *table, List<Item> &fields, List_item *values,
unknown's avatar
unknown committed
935
                          List<Item> &update_fields, List<Item> &update_values,
unknown's avatar
unknown committed
936 937
                          enum_duplicates duplic,
                          COND **where, bool select_insert)
938
{
unknown's avatar
unknown committed
939
  SELECT_LEX *select_lex= &thd->lex->select_lex;
unknown's avatar
unknown committed
940
  Name_resolution_context *context= &select_lex->context;
941
  Name_resolution_context_state ctx_state;
942
  bool insert_into_view= (table_list->view != 0);
unknown's avatar
unknown committed
943
  bool res= 0;
944
  table_map map= 0;
945
  DBUG_ENTER("mysql_prepare_insert");
unknown's avatar
unknown committed
946 947 948
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
		       (ulong)table_list, (ulong)table,
		       (int)insert_into_view));
949 950
  /* INSERT should have a SELECT or VALUES clause */
  DBUG_ASSERT (!select_insert || !values);
unknown's avatar
unknown committed
951

952 953 954 955 956 957 958
  /*
    For subqueries in VALUES() we should not see the table in which we are
    inserting (for INSERT ... SELECT this is done by changing table_list,
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
  */
  if (!select_insert)
  {
unknown's avatar
unknown committed
959
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
960 961 962 963 964 965 966 967 968 969 970 971
         un;
         un= un->next_unit())
    {
      for (SELECT_LEX *sl= un->first_select();
           sl;
           sl= sl->next_select())
      {
        sl->context.outer_context= 0;
      }
    }
  }

unknown's avatar
unknown committed
972
  if (duplic == DUP_UPDATE)
unknown's avatar
unknown committed
973 974
  {
    /* it should be allocated before Item::fix_fields() */
unknown's avatar
unknown committed
975
    if (table_list->set_insert_values(thd->mem_root))
unknown's avatar
unknown committed
976
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
977
  }
unknown's avatar
unknown committed
978

979
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
unknown's avatar
unknown committed
980
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
981

unknown's avatar
unknown committed
982 983

  /* Prepare the fields in the statement. */
984
  if (values)
unknown's avatar
unknown committed
985
  {
986 987 988 989 990 991
    /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
    DBUG_ASSERT (!select_lex->group_list.elements);

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

992
    /*
993 994 995 996 997 998 999 1000
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
     */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

    if (!(res= check_insert_fields(thd, context->table_list, fields, *values,
                                 !insert_into_view, &map) ||
1001
          setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0)) 
1002
        && duplic == DUP_UPDATE)
unknown's avatar
unknown committed
1003
    {
1004 1005 1006
      select_lex->no_wrap_view_item= TRUE;
      res= check_update_fields(thd, context->table_list, update_fields, &map);
      select_lex->no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
1007
    }
1008 1009 1010 1011

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);

unknown's avatar
unknown committed
1012
    if (!res)
1013
      res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
unknown's avatar
unknown committed
1014
  }
unknown's avatar
unknown committed
1015

unknown's avatar
unknown committed
1016 1017
  if (res)
    DBUG_RETURN(res);
unknown's avatar
VIEW  
unknown committed
1018

unknown's avatar
unknown committed
1019 1020 1021
  if (!table)
    table= table_list->table;

1022
  if (!select_insert)
unknown's avatar
unknown committed
1023
  {
1024
    Item *fake_conds= 0;
1025
    TABLE_LIST *duplicate;
1026
    if ((duplicate= unique_table(thd, table_list, table_list->next_global)))
1027
    {
1028
      update_non_unique_table_error(table_list, "INSERT", duplicate);
1029 1030
      DBUG_RETURN(TRUE);
    }
1031
    select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
unknown's avatar
unknown committed
1032
    select_lex->first_execution= 0;
unknown's avatar
unknown committed
1033
  }
1034
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1035
    table->prepare_for_position();
unknown's avatar
unknown committed
1036
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
1037 1038 1039
}


unknown's avatar
unknown committed
1040 1041 1042 1043
	/* Check if there is more uniq keys after field */

/*
  Tell whether 'keynr' is the last unique key of the table.

  RETURN
    1  no key with a number greater than keynr has the HA_NOSAME flag
    0  at least one later key is flagged HA_NOSAME
*/
static int last_uniq_key(TABLE *table,uint keynr)
{
  for (uint idx= keynr + 1; idx < table->s->keys; idx++)
  {
    if (table->key_info[idx].flags & HA_NOSAME)
      return 0;
  }
  return 1;
}


/*
unknown's avatar
unknown committed
1052 1053 1054 1055 1056 1057 1058 1059 1060 1061
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.
unknown's avatar
unknown committed
1062

unknown's avatar
unknown committed
1063 1064 1065 1066 1067
  NOTE
    Once this record will be written to table after insert trigger will
    be invoked. If instead of inserting new record we will update old one
    then both on update triggers will work instead. Similarly both on
    delete triggers will be invoked if we will delete conflicting records.
unknown's avatar
unknown committed
1068

unknown's avatar
unknown committed
1069 1070 1071 1072 1073 1074
    Sets thd->no_trans_update if table which is updated didn't have
    transactions.

  RETURN VALUE
    0     - success
    non-0 - error
unknown's avatar
unknown committed
1075 1076 1077
*/


unknown's avatar
unknown committed
1078
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
unknown's avatar
unknown committed
1079
{
unknown's avatar
unknown committed
1080
  int error, trg_error= 0;
unknown's avatar
unknown committed
1081
  char *key=0;
1082
  MY_BITMAP *save_read_set, *save_write_set;
1083 1084
  ulonglong prev_insert_id= table->file->next_insert_id;
  ulonglong insert_id_for_cur_row= 0;
1085
  DBUG_ENTER("write_record");
unknown's avatar
unknown committed
1086

unknown's avatar
unknown committed
1087
  info->records++;
1088 1089
  save_read_set=  table->read_set;
  save_write_set= table->write_set;
1090

1091 1092
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1093
  {
1094
    while ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1095
    {
1096
      uint key_nr;
1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107
      /*
        If we do more than one iteration of this loop, from the second one the
        row will have an explicit value in the autoinc field, which was set at
        the first call of handler::update_auto_increment(). So we must save
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
        0.
      */
      if (table->file->insert_id_for_cur_row > 0)
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
      else
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1108
      bool is_duplicate_key_error;
1109
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1110
	goto err;
1111
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
unknown's avatar
unknown committed
1112 1113
      if (!is_duplicate_key_error)
      {
1114 1115 1116 1117 1118
        /*
          We come here when we had an ignorable error which is not a duplicate
          key error. In this we ignore error if ignore flag is set, otherwise
          report error as usual. We will not do any duplicate key processing.
        */
unknown's avatar
unknown committed
1119
        if (info->ignore)
1120 1121
          goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
        goto err;
unknown's avatar
unknown committed
1122
      }
unknown's avatar
unknown committed
1123 1124
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
1125
	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
unknown's avatar
unknown committed
1126 1127
	goto err;
      }
1128 1129
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
1130 1131 1132 1133 1134
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
1135 1136
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
1137
          key_nr == table->s->next_number_index &&
1138
	  (insert_id_for_cur_row > 0))
1139
	goto err;
1140
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
unknown's avatar
unknown committed
1141
      {
1142
	if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
unknown's avatar
unknown committed
1143 1144 1145 1146
	  goto err;
      }
      else
      {
1147
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
unknown's avatar
unknown committed
1148 1149 1150 1151
	{
	  error=my_errno;
	  goto err;
	}
unknown's avatar
unknown committed
1152

unknown's avatar
unknown committed
1153 1154
	if (!key)
	{
1155
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
unknown's avatar
unknown committed
1156 1157 1158 1159 1160 1161
					   MAX_KEY_LENGTH)))
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
1162
	key_copy((byte*) key,table->record[0],table->key_info+key_nr,0);
unknown's avatar
unknown committed
1163
	if ((error=(table->file->index_read_idx(table->record[1],key_nr,
1164
						(byte*) key,
unknown's avatar
unknown committed
1165 1166
						table->key_info[key_nr].
						key_length,
unknown's avatar
unknown committed
1167 1168 1169
						HA_READ_KEY_EXACT))))
	  goto err;
      }
1170
      if (info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1171
      {
unknown's avatar
unknown committed
1172
        int res= 0;
unknown's avatar
unknown committed
1173 1174 1175 1176
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
1177
        */
unknown's avatar
unknown committed
1178
	DBUG_ASSERT(table->insert_values != NULL);
unknown's avatar
unknown committed
1179 1180
        store_record(table,insert_values);
        restore_record(table,record[1]);
unknown's avatar
unknown committed
1181 1182
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
unknown's avatar
unknown committed
1183 1184 1185 1186 1187
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
                                                 *info->update_values, 0,
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
1188 1189

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
unknown's avatar
unknown committed
1190 1191 1192
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
unknown's avatar
unknown committed
1193
          goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1194
        if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
1195
          goto before_trg_err;
1196

1197 1198
        if ((error=table->file->ha_update_row(table->record[1],
                                              table->record[0])))
1199
	{
1200
          if (info->ignore &&
1201
              !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1202
          {
1203
            table->file->restore_auto_increment(prev_insert_id);
1204
 
unknown's avatar
unknown committed
1205
            goto ok_or_after_trg_err;
1206
          }
1207
          goto err;
1208
        }
unknown's avatar
unknown committed
1209 1210
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ) ||
             compare_record(table))
1211 1212
        {
          info->updated++;
1213 1214 1215 1216 1217 1218 1219 1220
          /*
            If ON DUP KEY UPDATE updates a row instead of inserting one, it's
            like a regular UPDATE statement: it should not affect the value of a
            next SELECT LAST_INSERT_ID() or mysql_insert_id().
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
            handled separately by THD::arg_of_last_insert_id_function.
          */
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1221
          if (table->next_number_field)
1222
            table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int());
1223 1224
          trg_error= (table->triggers &&
                      table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
1225
                                                        TRG_ACTION_AFTER, TRUE));
1226 1227
          info->copied++;
        }
unknown's avatar
unknown committed
1228
        goto ok_or_after_trg_err;
1229 1230 1231
      }
      else /* DUP_REPLACE */
      {
unknown's avatar
unknown committed
1232 1233 1234 1235 1236
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
1237 1238
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
1239 1240 1241 1242 1243 1244
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
unknown's avatar
unknown committed
1245 1246
	*/
	if (last_uniq_key(table,key_nr) &&
1247
	    !table->file->referenced_by_foreign_key() &&
1248
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1249 1250
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
1251
        {
1252 1253
          if ((error=table->file->ha_update_row(table->record[1],
					        table->record[0])))
1254 1255
            goto err;
          info->deleted++;
1256
          thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1257 1258 1259 1260 1261
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
unknown's avatar
unknown committed
1262 1263 1264 1265 1266 1267 1268
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
1269
          if ((error=table->file->ha_delete_row(table->record[1])))
unknown's avatar
unknown committed
1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
            thd->no_trans_update= 1;
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
1282
        }
unknown's avatar
unknown committed
1283 1284
      }
    }
1285
    thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1286 1287 1288 1289 1290 1291 1292
    /*
      Restore column maps if they where replaced during an duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
unknown's avatar
unknown committed
1293
  }
1294
  else if ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1295
  {
1296
    if (!info->ignore ||
1297
        table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1298
      goto err;
1299
    table->file->restore_auto_increment(prev_insert_id);
1300
    goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1301
  }
1302 1303 1304

after_trg_n_copied_inc:
  info->copied++;
1305
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1306 1307 1308
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
unknown's avatar
unknown committed
1309 1310

ok_or_after_trg_err:
unknown's avatar
unknown committed
1311
  if (key)
1312
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
unknown's avatar
unknown committed
1313 1314
  if (!table->file->has_transactions())
    thd->no_trans_update= 1;
unknown's avatar
unknown committed
1315
  DBUG_RETURN(trg_error);
unknown's avatar
unknown committed
1316 1317

err:
1318
  info->last_errno= error;
1319 1320 1321
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;        // Give error
unknown's avatar
unknown committed
1322
  table->file->print_error(error,MYF(0));
1323
  
unknown's avatar
unknown committed
1324
before_trg_err:
1325
  table->file->restore_auto_increment(prev_insert_id);
unknown's avatar
unknown committed
1326 1327
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1328
  table->column_bitmaps_set(save_read_set, save_write_set);
1329
  DBUG_RETURN(1);
unknown's avatar
unknown committed
1330 1331 1332 1333
}


/******************************************************************************
unknown's avatar
unknown committed
1334
  Check that all fields with arn't null_fields are used
unknown's avatar
unknown committed
1335 1336
******************************************************************************/

1337 1338
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
unknown's avatar
unknown committed
1339
{
1340
  int err= 0;
1341 1342
  MY_BITMAP *write_set= entry->write_set;

unknown's avatar
unknown committed
1343 1344
  for (Field **field=entry->field ; *field ; field++)
  {
1345
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
1346
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1347
        ((*field)->real_type() != MYSQL_TYPE_ENUM))
unknown's avatar
unknown committed
1348
    {
1349 1350 1351
      bool view= FALSE;
      if (table_list)
      {
unknown's avatar
unknown committed
1352
        table_list= table_list->top_table();
unknown's avatar
unknown committed
1353
        view= test(table_list->view);
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
1370
      err= 1;
unknown's avatar
unknown committed
1371 1372
    }
  }
1373
  return thd->abort_on_warning ? err : 0;
unknown's avatar
unknown committed
1374 1375 1376
}

/*****************************************************************************
unknown's avatar
unknown committed
1377 1378
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
unknown's avatar
unknown committed
1379 1380
*****************************************************************************/

1381 1382
#ifndef EMBEDDED_LIBRARY

unknown's avatar
unknown committed
1383 1384
class delayed_row :public ilink {
public:
1385
  char *record;
unknown's avatar
unknown committed
1386 1387
  enum_duplicates dup;
  time_t start_time;
1388 1389 1390
  bool query_start_used, ignore, log_query;
  bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  ulonglong first_successful_insert_id_in_prev_stmt;
1391
  ulonglong forced_insert_id;
1392 1393
  ulong auto_increment_increment;
  ulong auto_increment_offset;
1394
  timestamp_auto_set_type timestamp_field_type;
1395
  LEX_STRING query;
unknown's avatar
unknown committed
1396

1397 1398 1399
  delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
              bool ignore_arg, bool log_query_arg)
    : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
1400
      forced_insert_id(0), query(query_arg)
1401
    {}
unknown's avatar
unknown committed
1402 1403
  ~delayed_row()
  {
1404
    x_free(query.str);
unknown's avatar
unknown committed
1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420
    x_free(record);
  }
};


class delayed_insert :public ilink {
  uint locks_in_memory;
public:
  THD thd;
  TABLE *table;
  pthread_mutex_t mutex;
  pthread_cond_t cond,cond_client;
  volatile uint tables_in_use,stacked_inserts;
  volatile bool status,dead;
  COPY_INFO info;
  I_List<delayed_row> rows;
1421
  ulong group_count;
unknown's avatar
unknown committed
1422
  TABLE_LIST table_list;			// Argument
unknown's avatar
unknown committed
1423 1424

  delayed_insert()
1425
    :locks_in_memory(0),
unknown's avatar
unknown committed
1426 1427 1428
     table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
     group_count(0)
  {
1429 1430
    thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
unknown's avatar
unknown committed
1431 1432 1433
    thd.current_tablenr=0;
    thd.version=refresh_version;
    thd.command=COM_DELAYED_INSERT;
1434 1435
    thd.lex->current_select= 0; 		// for my_message_sql
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
1436 1437 1438 1439
    /*
      Statement-based replication of INSERT DELAYED has problems with RAND()
      and user vars, so in mixed mode we go to row-based.
    */
1440
    thd.set_current_stmt_binlog_row_based_if_mixed();
unknown's avatar
unknown committed
1441

1442 1443
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
1444
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1445
    thd.security_ctx->host_or_ip= "";
unknown's avatar
unknown committed
1446
    bzero((char*) &info,sizeof(info));
1447
    pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
unknown's avatar
unknown committed
1448 1449 1450 1451 1452 1453 1454 1455
    pthread_cond_init(&cond,NULL);
    pthread_cond_init(&cond_client,NULL);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    delayed_insert_threads++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
  }
  ~delayed_insert()
  {
1456
    /* The following is not really needed, but just for safety */
unknown's avatar
unknown committed
1457 1458 1459 1460 1461 1462
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
      close_thread_tables(&thd);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
1463 1464 1465
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    pthread_cond_destroy(&cond_client);
unknown's avatar
unknown committed
1466 1467
    thd.unlink();				// Must be unlinked under lock
    x_free(thd.query);
1468
    thd.security_ctx->user= thd.security_ctx->host=0;
unknown's avatar
unknown committed
1469 1470 1471
    thread_count--;
    delayed_insert_threads--;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
1472
    VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
unknown's avatar
unknown committed
1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
    pthread_mutex_lock(&LOCK_delayed_insert);
    if (!--locks_in_memory)
    {
      pthread_mutex_lock(&mutex);
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
	pthread_cond_signal(&cond);
	status=1;
      }
      pthread_mutex_unlock(&mutex);
    }
    pthread_mutex_unlock(&LOCK_delayed_insert);
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
  bool handle_inserts(void);
};


/*
  Global list of delayed insert handlers. handle_delayed_insert() appends a
  handler once its table is open and unlinks it again when the handler has
  been killed; find_handler() and kill_delayed_threads() walk the list.
  All of these accesses are done while holding LOCK_delayed_insert.
*/
I_List<delayed_insert> delayed_threads;


/*
  Find the delayed insert handler that serves a given table.

  thd         Calling thread; only its proc_info is updated.
  table_list  Supplies the database and table name to look for.

  Returns the matching handler, on which lock() has been called, or 0
  if no handler for the table is in the list.
*/
delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
{
  thd->proc_info="waiting for delay_list";
  pthread_mutex_lock(&LOCK_delayed_insert);	// Protect master list
  I_List_iterator<delayed_insert> it(delayed_threads);
  delayed_insert *handler;
  while ((handler= it++))
  {
    /* Skip handlers that serve another database or another table. */
    if (strcmp(handler->thd.db, table_list->db) != 0 ||
        strcmp(table_list->table_name,
               handler->table->s->table_name.str) != 0)
      continue;
    handler->lock();
    break;
  }
  pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
  return handler;
}


/*
  Get a table for a delayed insert.

  Looks up the handler thread for the table, creating one if necessary,
  and asks it for a local copy of its TABLE object.

  thd         Client thread.
  table_list  Table to insert into; table_list->db must be set.

  Returns the local table copy (also stored in table_list->table, with
  thd->di pointing at the handler), or 0 if no delayed table could be
  obtained. Out-of-memory and thread creation failures additionally mark
  the client thread with fatal_error().
*/
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
{
  int create_rc;
  delayed_insert *handler;
  TABLE *table;
  DBUG_ENTER("delayed_get_table");

  /* The database name must have been set in the parser */
  DBUG_ASSERT(table_list->db);

  /* Look for a thread which already handles this table. */
  handler= find_handler(thd, table_list);
  if (!handler)
  {
    /*
      Nothing found. A new handler thread is needed, but there may be
      no more than max_insert_delayed_threads of them.
    */
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
      DBUG_RETURN(0);
    thd->proc_info="Creating delayed handler";
    pthread_mutex_lock(&LOCK_delayed_create);
    /*
      The lookup above ran without LOCK_delayed_create, so another thread
      may have created the handler in the meantime. Look again.
    */
    handler= find_handler(thd, table_list);
    if (!handler)
    {
      handler= new delayed_insert();
      if (!handler)
      {
        my_error(ER_OUTOFMEMORY,MYF(0),sizeof(delayed_insert));
        goto err1;
      }
      pthread_mutex_lock(&LOCK_thread_count);
      thread_count++;
      pthread_mutex_unlock(&LOCK_thread_count);
      handler->thd.set_db(table_list->db, strlen(table_list->db));
      handler->thd.query= my_strdup(table_list->table_name,MYF(MY_WME));
      if (handler->thd.db == NULL || handler->thd.query == NULL)
      {
        delete handler;
        my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
        goto err1;
      }
      handler->table_list= *table_list;			// Needed to open table
      /* The handler keeps its own copy of the table name in thd.query */
      handler->table_list.alias= handler->table_list.table_name=
        handler->thd.query;
      handler->lock();
      pthread_mutex_lock(&handler->mutex);
      create_rc= pthread_create(&handler->thd.real_id, &connection_attrib,
                                handle_delayed_insert, (void*) handler);
      if (create_rc)
      {
        DBUG_PRINT("error",
                   ("Can't create thread to handle delayed insert (error %d)",
                    create_rc));
        pthread_mutex_unlock(&handler->mutex);
        handler->unlock();
        delete handler;
        my_error(ER_CANT_CREATE_THREAD, MYF(0), create_rc);
        goto err1;
      }

      /* Wait until the handler thread has opened the table */
      thd->proc_info="waiting for handler open";
      while (!handler->thd.killed && !handler->table && !thd->killed)
        pthread_cond_wait(&handler->cond_client, &handler->mutex);
      pthread_mutex_unlock(&handler->mutex);
      thd->proc_info="got old table";
      if (handler->thd.killed)
      {
        if (handler->thd.is_fatal_error)
        {
          /* Take over the handler's error message and abort */
          thd->fatal_error();
          strmov(thd->net.last_error, handler->thd.net.last_error);
          thd->net.last_errno= handler->thd.net.last_errno;
        }
        handler->unlock();
        goto err;
      }
      if (thd->killed)
      {
        handler->unlock();
        goto err;
      }
    }
    pthread_mutex_unlock(&LOCK_delayed_create);
  }

  pthread_mutex_lock(&handler->mutex);
  table= handler->get_local_table(thd);
  pthread_mutex_unlock(&handler->mutex);
  if (table)
    thd->di= handler;
  else if (handler->thd.is_fatal_error)
    thd->fatal_error();
  /* Unlock the delayed insert object after its last access. */
  handler->unlock();
  DBUG_RETURN((table_list->table=table));

 err1:
  thd->fatal_error();
 err:
  pthread_mutex_unlock(&LOCK_delayed_create);
  DBUG_RETURN(0); // Continue with normal insert
}


/*
  As we can't let many threads modify the same TABLE structure, we create
  a separate structure for each thread.  This includes a row buffer to save
  the column values and new fields that point to the new row buffer.
  The memory is allocated in the client thread and is freed automatically.
*/

/*
  Create a copy of the handler's TABLE object for use by a client thread.

  The caller must hold 'mutex' (it is the mutex used for the condition
  waits below). The function registers the client in tables_in_use, asks
  the handler thread to lock the table if it is not locked yet, and then
  builds a TABLE copy with its own field objects, one record buffer and
  its own read/write bitmaps, all allocated on the client's memory root.

  client_thd  The client thread that will use the copy.

  Returns the copy, or 0 on failure (client killed, handler dead, or out
  of memory). On every failure path the tables_in_use registration is
  undone and the handler thread is signalled.
*/
TABLE *delayed_insert::get_local_table(THD* client_thd)
{
  my_ptrdiff_t adjust_ptrs;
  Field **field,**org_field, *found_next_number_field;
  TABLE *copy;
  TABLE_SHARE *share= table->s;
  byte *bitmap;
  DBUG_ENTER("delayed_insert::get_local_table");

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
    client_thd->proc_info="waiting for handler lock";
    pthread_cond_signal(&cond);			// Tell handler to lock table
    while (!dead && !thd.lock && ! client_thd->killed)
    {
      pthread_cond_wait(&cond_client,&mutex);
    }
    client_thd->proc_info="got handler lock";
    if (client_thd->killed)
      goto error;
    if (dead)
    {
      /* Hand the handler's error over to the client */
      strmov(client_thd->net.last_error,thd.net.last_error);
      client_thd->net.last_errno=thd.net.last_errno;
      goto error;
    }
  }

  /*
    Allocate memory for the TABLE object, the field pointers array, and
    one record buffer of reclength size. Normally a table has three
    record buffers of rec_buff_length size, which includes alignment
    bytes. Since the table copy is used for creating one record only,
    the other record buffers and alignment are unnecessary.
    The two column bitmaps are placed between the field pointers array
    and the record buffer.
  */
  client_thd->proc_info="allocating local table";
  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
				   (share->fields+1)*sizeof(Field*)+
				   share->reclength +
                                   share->column_bitmap_size*2);
  if (!copy)
    goto error;

  /* Copy the TABLE object. */
  *copy= *table;
  /* We don't need to change the file handler here */
  /* Assign the pointers for the field pointers array and the record. */
  field= copy->field= (Field**) (copy + 1);
  bitmap= (byte*) (field + share->fields + 1);
  copy->record[0]= (bitmap + share->column_bitmap_size * 2);
  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
  {
    /*
      A failed field copy must go through the common error exit so that
      tables_in_use is decremented again and the handler is informed.
      Returning directly would leave the handler believing that a client
      still uses the table.
    */
    if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
      goto error;
    (*field)->orig_table= copy;			// Remove connection
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
  }
  *field=0;

  /* Adjust timestamp */
  if (table->timestamp_field)
  {
    /* Restore offset as this may have been reset in handle_inserts */
    copy->timestamp_field=
      (Field_timestamp*) copy->field[share->timestamp_field_offset];
    copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
    copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
  }

  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

  /* Adjust bitmaps */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
  copy->tmp_set.bitmap= 0;                      // To catch errors
  bzero((char*) bitmap, share->column_bitmap_size*2);
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;

  DBUG_RETURN(copy);

  /* Got fatal error */
 error:
  tables_in_use--;
  status=1;
  pthread_cond_signal(&cond);			// Inform thread about abort
  DBUG_RETURN(0);
}


/* Put a question in queue */

1752 1753 1754
/*
  Queue the row in table->record[0] for the delayed insert handler.

  Waits while the handler's queue holds delayed_queue_size rows or more,
  then stores a copy of the row together with the statement context the
  handler needs when it performs the actual insert.

  thd      Client thread; thd->di is the handler to queue the row for.
  table    Local table copy holding the row in record[0].
  duplic   Duplicate handling mode to store with the row.
  query    Query text to store with the row (query.str may be NULL).
           The row gets its own copy of the string.
  ignore   Stored with the row.
  log_on   Stored with the row as its log_query flag.

  Returns 0 if the row was queued, 1 if the thread was killed or memory
  could not be allocated.
*/
static int
write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
              LEX_STRING query, bool ignore, bool log_on)
{
  delayed_row *row= 0;
  delayed_insert *di=thd->di;
  const Discrete_interval *forced_auto_inc;
  DBUG_ENTER("write_delayed");
  DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));

  thd->proc_info="waiting for handler insert";
  pthread_mutex_lock(&di->mutex);
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
    pthread_cond_wait(&di->cond_client,&di->mutex);
  thd->proc_info="storing row into queue";

  if (thd->killed)
    goto err;

  /*
    Take a copy of the query string, if there is any. The string will
    be free'ed when the row is destroyed. If there is no query string,
    we don't do anything special.
   */

  if (query.str)
  {
    char *str;
    if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
      goto err;
    query.str= str;
  }
  row= new delayed_row(query, duplic, ignore, log_on);
  if (row == NULL)
  {
    /*
      No row object took ownership of the copied string, so release it
      here. query.str is NULL when the statement had no query text,
      hence MY_ALLOW_ZERO_PTR.
    */
    my_free(query.str, MYF(MY_WME | MY_ALLOW_ZERO_PTR));
    goto err;
  }

  if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
    goto err;
  memcpy(row->record, table->record[0], table->s->reclength);
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
  /*
    those are for the binlog: LAST_INSERT_ID() has been evaluated at this
    time, so record does not need it, but statement-based binlogging of the
    INSERT will need when the row is actually inserted.
    As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
  */
  row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  row->first_successful_insert_id_in_prev_stmt=
    thd->first_successful_insert_id_in_prev_stmt;
  row->timestamp_field_type=    table->timestamp_field_type;

  /* Copy session variables. */
  row->auto_increment_increment= thd->variables.auto_increment_increment;
  row->auto_increment_offset=    thd->variables.auto_increment_offset;
  /* Copy the next forced auto increment value, if any. */
  if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
  {
    row->forced_insert_id= forced_auto_inc->minimum();
    DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
                           (ulong) row->forced_insert_id));
  }

  /* Hand the row over to the handler thread and wake it up */
  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
  if (table->s->blob_fields)
    unlink_blobs(table);
  pthread_cond_signal(&di->cond);

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(0);

 err:
  /* row is 0 here unless allocating its record buffer failed */
  delete row;
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(1);
}


/*
  Called by a client thread when it is done with its delayed insert:
  drops the client's tables_in_use registration on thd->di and wakes the
  handler thread if this was the last user or the handler has been killed.
*/
static void end_delayed_insert(THD *thd)
{
  DBUG_ENTER("end_delayed_insert");
  delayed_insert *handler= thd->di;

  pthread_mutex_lock(&handler->mutex);
  DBUG_PRINT("info",("tables in use: %d",handler->tables_in_use));
  if (!--handler->tables_in_use || handler->thd.killed)
  {
    /* Let the handler thread unlock the table */
    handler->status= 1;
    pthread_cond_signal(&handler->cond);
  }
  pthread_mutex_unlock(&handler->mutex);
  DBUG_VOID_RETURN;
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
  VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list

  I_List_iterator<delayed_insert> it(delayed_threads);
  delayed_insert *tmp;
  while ((tmp=it++))
  {
unknown's avatar
SCRUM  
unknown committed
1863
    tmp->thd.killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1864 1865 1866
    if (tmp->thd.mysys_var)
    {
      pthread_mutex_lock(&tmp->thd.mysys_var->mutex);
unknown's avatar
unknown committed
1867
      if (tmp->thd.mysys_var->current_cond)
unknown's avatar
unknown committed
1868
      {
unknown's avatar
unknown committed
1869 1870 1871 1872 1873 1874
	/*
	  We need the following test because the main mutex may be locked
	  in handle_delayed_insert()
	*/
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_lock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
1875
	pthread_cond_broadcast(tmp->thd.mysys_var->current_cond);
unknown's avatar
unknown committed
1876 1877
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889
      }
      pthread_mutex_unlock(&tmp->thd.mysys_var->mutex);
    }
  }
  VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
}


/*
 * Create a new delayed insert thread
*/

1890
pthread_handler_t handle_delayed_insert(void *arg)
unknown's avatar
unknown committed
1891 1892 1893 1894 1895 1896 1897 1898
{
  delayed_insert *di=(delayed_insert*) arg;
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id=thread_id++;
1899
  thd->end_time();
unknown's avatar
unknown committed
1900
  threads.append(thd);
unknown's avatar
SCRUM  
unknown committed
1901
  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
unknown's avatar
unknown committed
1902 1903
  pthread_mutex_unlock(&LOCK_thread_count);

1904 1905 1906 1907 1908 1909 1910 1911
  /*
    Wait until the client runs into pthread_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
unknown's avatar
unknown committed
1912
  pthread_mutex_lock(&di->mutex);
1913
#if !defined( __WIN__) /* Win32 calls this in pthread_create */
unknown's avatar
unknown committed
1914 1915 1916 1917 1918 1919 1920 1921
  if (my_thread_init())
  {
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
    goto end;
  }
#endif

  DBUG_ENTER("handle_delayed_insert");
1922
  thd->thread_stack= (char*) &thd;
unknown's avatar
unknown committed
1923
  if (init_thr_lock() || thd->store_globals())
unknown's avatar
unknown committed
1924
  {
1925
    thd->fatal_error();
unknown's avatar
unknown committed
1926
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
1927
    goto err;
unknown's avatar
unknown committed
1928
  }
1929
#if !defined(__WIN__) && !defined(__NETWARE__)
unknown's avatar
unknown committed
1930 1931 1932 1933 1934 1935 1936
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  /* open table */

unknown's avatar
unknown committed
1937
  if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED)))
unknown's avatar
unknown committed
1938
  {
1939
    thd->fatal_error();				// Abort waiting inserts
1940
    goto err;
unknown's avatar
unknown committed
1941
  }
1942
  if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
unknown's avatar
unknown committed
1943
  {
1944
    thd->fatal_error();
1945
    my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
1946
    goto err;
unknown's avatar
unknown committed
1947
  }
unknown's avatar
unknown committed
1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962
  di->table->copy_blobs=1;

  /* One can now use this */
  pthread_mutex_lock(&LOCK_delayed_insert);
  delayed_threads.append(di);
  pthread_mutex_unlock(&LOCK_delayed_insert);

  /* Tell client that the thread is initialized */
  pthread_cond_signal(&di->cond_client);

  /* Now wait until we get an insert or lock to handle */
  /* We will not abort as long as a client thread uses this thread */

  for (;;)
  {
unknown's avatar
SCRUM  
unknown committed
1963
    if (thd->killed == THD::KILL_CONNECTION)
unknown's avatar
unknown committed
1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982
    {
      uint lock_count;
      /*
	Remove this from delay insert list so that no one can request a
	table from this
      */
      pthread_mutex_unlock(&di->mutex);
      pthread_mutex_lock(&LOCK_delayed_insert);
      di->unlink();
      lock_count=di->lock_count();
      pthread_mutex_unlock(&LOCK_delayed_insert);
      pthread_mutex_lock(&di->mutex);
      if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
	break;					// Time to die
    }

    if (!di->status && !di->stacked_inserts)
    {
      struct timespec abstime;
1983
      set_timespec(abstime, delayed_insert_timeout);
unknown's avatar
unknown committed
1984 1985 1986 1987

      /* Information for pthread_kill */
      di->thd.mysys_var->current_mutex= &di->mutex;
      di->thd.mysys_var->current_cond= &di->cond;
1988
      di->thd.proc_info="Waiting for INSERT";
unknown's avatar
unknown committed
1989

1990
      DBUG_PRINT("info",("Waiting for someone to insert rows"));
unknown's avatar
unknown committed
1991
      while (!thd->killed)
unknown's avatar
unknown committed
1992 1993
      {
	int error;
1994
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
unknown's avatar
unknown committed
1995 1996 1997 1998
	error=pthread_cond_wait(&di->cond,&di->mutex);
#else
	error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
#ifdef EXTRA_DEBUG
1999
	if (error && error != EINTR && error != ETIMEDOUT)
unknown's avatar
unknown committed
2000 2001 2002 2003 2004 2005 2006 2007 2008
	{
	  fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
	  DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
			      error));
	}
#endif
#endif
	if (thd->killed || di->status)
	  break;
unknown's avatar
unknown committed
2009
	if (error == ETIMEDOUT || error == ETIME)
unknown's avatar
unknown committed
2010
	{
unknown's avatar
SCRUM  
unknown committed
2011
	  thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
2012 2013 2014
	  break;
	}
      }
unknown's avatar
unknown committed
2015 2016
      /* We can't lock di->mutex and mysys_var->mutex at the same time */
      pthread_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
2017 2018 2019 2020
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      di->thd.mysys_var->current_mutex= 0;
      di->thd.mysys_var->current_cond= 0;
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
unknown's avatar
unknown committed
2021
      pthread_mutex_lock(&di->mutex);
unknown's avatar
unknown committed
2022
    }
2023
    di->thd.proc_info=0;
unknown's avatar
unknown committed
2024 2025 2026

    if (di->tables_in_use && ! thd->lock)
    {
2027
      bool not_used;
2028 2029 2030 2031 2032 2033 2034 2035 2036 2037
      /*
        Request for new delayed insert.
        Lock the table, but avoid to be blocked by a global read lock.
        If we got here while a global read lock exists, then one or more
        inserts started before the lock was requested. These are allowed
        to complete their work before the server returns control to the
        client which requested the global read lock. The delayed insert
        handler will close the table and finish when the outstanding
        inserts are done.
      */
2038
      if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
2039 2040
                                          MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
                                          &not_used)))
unknown's avatar
unknown committed
2041
      {
2042 2043 2044
	/* Fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
2045 2046 2047 2048 2049 2050 2051
      }
      pthread_cond_broadcast(&di->cond_client);
    }
    if (di->stacked_inserts)
    {
      if (di->handle_inserts())
      {
2052 2053 2054
	/* Some fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
2055 2056 2057 2058 2059
      }
    }
    di->status=0;
    if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
    {
unknown's avatar
unknown committed
2060 2061 2062 2063
      /*
        No one is doing a insert delayed
        Unlock table so that other threads can use it
      */
unknown's avatar
unknown committed
2064 2065 2066
      MYSQL_LOCK *lock=thd->lock;
      thd->lock=0;
      pthread_mutex_unlock(&di->mutex);
2067 2068 2069 2070
      /*
        We need to release next_insert_id before unlocking. This is
        enforced by handler::ha_external_lock().
      */
2071
      di->table->file->ha_release_auto_increment();
unknown's avatar
unknown committed
2072 2073 2074 2075 2076 2077 2078 2079
      mysql_unlock_tables(thd, lock);
      di->group_count=0;
      pthread_mutex_lock(&di->mutex);
    }
    if (di->tables_in_use)
      pthread_cond_broadcast(&di->cond_client); // If waiting clients
  }

2080 2081 2082 2083 2084 2085 2086
err:
  /*
    mysql_lock_tables() can potentially start a transaction and write
    a table map. In the event of an error, that transaction has to be
    rolled back.  We only need to roll back a potential statement
    transaction, since real transactions are rolled back in
    close_thread_tables().
2087 2088 2089 2090

    TODO: This is not true any more, table maps are generated on the
    first call to ha_*_row() instead. Remove code that are used to
    cover for the case outlined above.
2091 2092 2093
   */
  ha_rollback_stmt(thd);

2094
#ifndef __WIN__
unknown's avatar
unknown committed
2095
end:
2096
#endif
unknown's avatar
unknown committed
2097 2098 2099 2100 2101 2102 2103
  /*
    di should be unlinked from the thread handler list and have no active
    clients
  */

  close_thread_tables(thd);			// Free the table
  di->table=0;
2104 2105
  di->dead= 1;                                  // If error
  thd->killed= THD::KILL_CONNECTION;	        // If error
unknown's avatar
unknown committed
2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151
  pthread_cond_broadcast(&di->cond_client);	// Safety
  pthread_mutex_unlock(&di->mutex);

  pthread_mutex_lock(&LOCK_delayed_create);	// Because of delayed_get_table
  pthread_mutex_lock(&LOCK_delayed_insert);	
  delete di;
  pthread_mutex_unlock(&LOCK_delayed_insert);
  pthread_mutex_unlock(&LOCK_delayed_create);  

  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}


/* Remove pointers from temporary fields to allocated values */

static void unlink_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

static void free_delayed_insert_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
    {
      char *str;
      ((Field_blob *) (*ptr))->get_ptr(&str);
      my_free(str,MYF(MY_ALLOW_ZERO_PTR));
      ((Field_blob *) (*ptr))->reset();
    }
  }
}


bool delayed_insert::handle_inserts(void)
{
  int error;
2152
  ulong max_rows;
2153 2154
  bool using_ignore= 0, using_opt_replace= 0,
       using_bin_log= mysql_bin_log.is_open();
2155
  delayed_row *row;
unknown's avatar
unknown committed
2156 2157 2158 2159 2160 2161
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  pthread_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;
2162
  table->use_all_columns();
unknown's avatar
unknown committed
2163 2164 2165 2166 2167

  thd.proc_info="upgrading lock";
  if (thr_upgrade_write_delay_lock(*thd.lock->locks))
  {
    /* This can only happen if thread is killed by shutdown */
unknown's avatar
unknown committed
2168
    sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name.str);
unknown's avatar
unknown committed
2169 2170 2171 2172
    goto err;
  }

  thd.proc_info="insert";
2173
  max_rows= delayed_insert_limit;
2174
  if (thd.killed || table->s->version != refresh_version)
unknown's avatar
unknown committed
2175
  {
unknown's avatar
SCRUM  
unknown committed
2176
    thd.killed= THD::KILL_CONNECTION;
2177
    max_rows= ULONG_MAX;                     // Do as much as possible
unknown's avatar
unknown committed
2178 2179
  }

2180 2181 2182 2183 2184 2185 2186
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2187
  pthread_mutex_lock(&mutex);
2188

unknown's avatar
unknown committed
2189 2190 2191 2192
  while ((row=rows.get()))
  {
    stacked_inserts--;
    pthread_mutex_unlock(&mutex);
2193
    memcpy(table->record[0],row->record,table->s->reclength);
unknown's avatar
unknown committed
2194 2195 2196

    thd.start_time=row->start_time;
    thd.query_start_used=row->query_start_used;
2197 2198 2199 2200 2201
    /*
      To get the exact auto_inc interval to store in the binlog we must not
      use values from the previous interval (of the previous rows).
    */
    bool log_query= (row->log_query && row->query.str != NULL);
2202 2203
    DBUG_PRINT("delayed", ("query: '%s'  length: %u", row->query.str ?
                           row->query.str : "[NULL]", row->query.length));
2204 2205
    if (log_query)
    {
2206 2207 2208 2209 2210 2211 2212 2213
      /*
        This is the first value of an INSERT statement.
        It is the right place to clear a forced insert_id.
        This is usually done after the last value of an INSERT statement,
        but we won't know this in the insert delayed thread. But before
        the first value is sufficiently equivalent to after the last
        value of the previous statement.
      */
2214 2215 2216
      table->file->ha_release_auto_increment();
      thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
    }
2217 2218 2219 2220
    thd.first_successful_insert_id_in_prev_stmt= 
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= 
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2221
    table->timestamp_field_type= row->timestamp_field_type;
unknown's avatar
unknown committed
2222

2223
    /* Copy the session variables. */
2224 2225
    thd.variables.auto_increment_increment= row->auto_increment_increment;
    thd.variables.auto_increment_offset=    row->auto_increment_offset;
2226 2227 2228 2229 2230 2231 2232
    /* Copy a forced insert_id, if any. */
    if (row->forced_insert_id)
    {
      DBUG_PRINT("delayed", ("received auto_inc: %lu",
                             (ulong) row->forced_insert_id));
      thd.force_one_auto_inc_interval(row->forced_insert_id);
    }
2233

2234
    info.ignore= row->ignore;
unknown's avatar
unknown committed
2235
    info.handle_duplicates= row->dup;
2236
    if (info.ignore ||
unknown's avatar
unknown committed
2237
	info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
2238 2239 2240 2241
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
2242 2243 2244 2245 2246 2247 2248
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
unknown's avatar
unknown committed
2249
    thd.clear_error(); // reset error for binlog
unknown's avatar
unknown committed
2250
    if (write_record(&thd, table, &info))
unknown's avatar
unknown committed
2251
    {
2252
      info.error_count++;				// Ignore errors
2253
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
unknown's avatar
unknown committed
2254
      row->log_query = 0;
unknown's avatar
unknown committed
2255
    }
2256

unknown's avatar
unknown committed
2257 2258 2259 2260 2261
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
2262 2263 2264 2265 2266
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }
2267

2268
    if (log_query && mysql_bin_log.is_open())
unknown's avatar
unknown committed
2269 2270 2271 2272 2273 2274 2275 2276 2277
    {
      /*
        If the query has several rows to insert, only the first row will come
        here. In row-based binlogging, this means that the first row will be
        written to binlog as one Table_map event and one Rows event (due to an
        event flush done in binlog_query()), then all other rows of this query
        will be binlogged together as one single Table_map event and one
        single Rows event.
      */
2278 2279 2280
      thd.binlog_query(THD::ROW_QUERY_TYPE,
                       row->query.str, row->query.length,
                       FALSE, FALSE);
unknown's avatar
unknown committed
2281
    }
2282

2283
    if (table->s->blob_fields)
unknown's avatar
unknown committed
2284 2285 2286 2287 2288 2289
      free_delayed_insert_blobs(table);
    thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    pthread_mutex_lock(&mutex);

    delete row;
2290 2291 2292 2293 2294 2295 2296
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
2297
	(!(row->log_query & using_bin_log)))
unknown's avatar
unknown committed
2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
	  pthread_cond_broadcast(&cond_client); // If waiting clients
	thd.proc_info="reschedule";
	pthread_mutex_unlock(&mutex);
	if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
	  sql_print_error("%s",thd.net.last_error);
2311
          DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
unknown's avatar
unknown committed
2312 2313
	  goto err;
	}
unknown's avatar
unknown committed
2314
	query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2315 2316 2317
	if (thr_reschedule_write_lock(*thd.lock->locks))
	{
	  /* This should never happen */
unknown's avatar
unknown committed
2318 2319
	  sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),
                          table->s->table_name.str);
unknown's avatar
unknown committed
2320
	}
2321 2322
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2323 2324 2325 2326 2327 2328 2329 2330 2331
	pthread_mutex_lock(&mutex);
	thd.proc_info="insert";
      }
      if (tables_in_use)
	pthread_cond_broadcast(&cond_client);	// If waiting clients
    }
  }
  thd.proc_info=0;
  pthread_mutex_unlock(&mutex);
2332

2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346
  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
   */
  if (thd.current_stmt_binlog_row_based)
    thd.binlog_flush_pending_rows_event(TRUE);
2347

unknown's avatar
unknown committed
2348 2349 2350 2351
  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
    sql_print_error("%s",thd.net.last_error);
2352
    DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
unknown's avatar
unknown committed
2353 2354
    goto err;
  }
unknown's avatar
unknown committed
2355
  query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2356 2357 2358 2359
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(0);

 err:
2360 2361 2362
#ifndef DBUG_OFF
  max_rows= 0;                                  // For DBUG output
#endif
2363 2364 2365 2366 2367 2368
  /* Remove all not used rows */
  while ((row=rows.get()))
  {
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
2369 2370 2371
#ifndef DBUG_OFF
    max_rows++;
#endif
2372
  }
2373
  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
unknown's avatar
unknown committed
2374 2375 2376 2377
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(1);
}
2378
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
2379 2380

/***************************************************************************
unknown's avatar
unknown committed
2381
  Store records in INSERT ... SELECT *
unknown's avatar
unknown committed
2382 2383
***************************************************************************/

unknown's avatar
VIEW  
unknown committed
2384 2385 2386 2387 2388 2389 2390 2391 2392

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
unknown's avatar
unknown committed
2393 2394
    FALSE OK
    TRUE  Error
unknown's avatar
VIEW  
unknown committed
2395 2396
*/

unknown's avatar
unknown committed
2397
bool mysql_insert_select_prepare(THD *thd)
unknown's avatar
VIEW  
unknown committed
2398 2399
{
  LEX *lex= thd->lex;
unknown's avatar
unknown committed
2400
  SELECT_LEX *select_lex= &lex->select_lex;
unknown's avatar
unknown committed
2401
  TABLE_LIST *first_select_leaf_table;
unknown's avatar
VIEW  
unknown committed
2402
  DBUG_ENTER("mysql_insert_select_prepare");
unknown's avatar
unknown committed
2403

unknown's avatar
unknown committed
2404 2405
  /*
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
unknown's avatar
unknown committed
2406
    clause if table is VIEW
unknown's avatar
unknown committed
2407
  */
unknown's avatar
unknown committed
2408
  
unknown's avatar
unknown committed
2409 2410 2411 2412
  if (mysql_prepare_insert(thd, lex->query_tables,
                           lex->query_tables->table, lex->field_list, 0,
                           lex->update_list, lex->value_list,
                           lex->duplicates,
unknown's avatar
unknown committed
2413
                           &select_lex->where, TRUE))
unknown's avatar
unknown committed
2414
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
2415

2416 2417 2418 2419
  /*
    exclude first table from leaf tables list, because it belong to
    INSERT
  */
unknown's avatar
unknown committed
2420 2421
  DBUG_ASSERT(select_lex->leaf_tables != 0);
  lex->leaf_tables_insert= select_lex->leaf_tables;
2422
  /* skip all leaf tables belonged to view where we are insert */
unknown's avatar
unknown committed
2423
  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
2424 2425 2426 2427 2428 2429
       first_select_leaf_table &&
       first_select_leaf_table->belong_to_view &&
       first_select_leaf_table->belong_to_view ==
       lex->leaf_tables_insert->belong_to_view;
       first_select_leaf_table= first_select_leaf_table->next_leaf)
  {}
unknown's avatar
unknown committed
2430
  select_lex->leaf_tables= first_select_leaf_table;
unknown's avatar
unknown committed
2431
  DBUG_RETURN(FALSE);
unknown's avatar
VIEW  
unknown committed
2432 2433 2434
}


2435
/*
  Construct the result sink used by INSERT ... SELECT.

  The COPY_INFO member 'info' is zero-filled first and then given the
  duplicate handling mode, the IGNORE flag and the ON DUPLICATE KEY UPDATE
  field/value lists. info.view is set to the table list element only when
  one is given and it refers to a view; otherwise it stays zero.
*/

select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
                             List<Item> *fields_par,
                             List<Item> *update_fields,
                             List<Item> *update_values,
                             enum_duplicates duplic,
                             bool ignore_check_option_errors)
  :table_list(table_list_par), table(table_par), fields(fields_par),
   autoinc_value_of_last_inserted_row(0),
   insert_into_view(table_list_par && table_list_par->view != 0)
{
  bzero((char*) &info,sizeof(info));

  info.handle_duplicates= duplic;
  info.ignore=            ignore_check_option_errors;
  info.update_fields=     update_fields;
  info.update_values=     update_values;

  /* info.view was zeroed above; only a view target needs to be recorded */
  if (table_list_par && table_list_par->view)
    info.view= table_list_par;
}


unknown's avatar
unknown committed
2455
int
2456
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
unknown's avatar
unknown committed
2457
{
2458
  LEX *lex= thd->lex;
2459
  int res;
2460
  table_map map= 0;
2461
  SELECT_LEX *lex_current_select_save= lex->current_select;
unknown's avatar
unknown committed
2462 2463
  DBUG_ENTER("select_insert::prepare");

2464
  unit= u;
2465

2466 2467 2468 2469 2470 2471
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;
2472
  res= check_insert_fields(thd, table_list, *fields, values,
2473
                           !insert_into_view, &map) ||
2474
       setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0);
2475

2476 2477
  if (info.handle_duplicates == DUP_UPDATE)
  {
2478
    Name_resolution_context *context= &lex->select_lex.context;
2479 2480 2481 2482
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
2483 2484

    /* Perform name resolution only in the first table - 'table_list'. */
2485
    table_list->next_local= 0;
2486 2487
    context->resolve_in_table_list_only(table_list);

2488
    lex->select_lex.no_wrap_view_item= TRUE;
2489
    res= res || check_update_fields(thd, context->table_list,
2490
                                    *info.update_fields, &map);
2491 2492
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
2493 2494 2495
      When we are not using GROUP BY and there are no ungrouped aggregate functions 
      we can refer to other tables in the ON DUPLICATE KEY part.
      We use next_name_resolution_table descructively, so check it first (views?)
2496
    */
2497 2498 2499 2500 2501 2502 2503 2504
    DBUG_ASSERT (!table_list->next_name_resolution_table);
    if (lex->select_lex.group_list.elements == 0 &&
        !lex->select_lex.with_sum_func)
      /*
        We must make a single context out of the two separate name resolution contexts :
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
        To do that we must concatenate the two lists
      */  
unknown's avatar
unknown committed
2505 2506
      table_list->next_name_resolution_table= 
        ctx_state.get_first_name_resolution_table();
2507

unknown's avatar
unknown committed
2508 2509
    res= res || setup_fields(thd, 0, *info.update_values,
                             MARK_COLUMNS_READ, 0, 0);
2510
    if (!res)
2511
    {
2512 2513 2514 2515 2516 2517 2518 2519
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;
2520

2521 2522 2523 2524 2525
      while ((item= li++))
      {
        item->transform(&Item::update_value_transformer,
                        (byte*)lex->current_select);
      }
2526 2527 2528
    }

    /* Restore the current context. */
2529
    ctx_state.restore_state(context, table_list);
2530
  }
2531

2532 2533
  lex->current_select= lex_current_select_save;
  if (res)
unknown's avatar
unknown committed
2534
    DBUG_RETURN(1);
2535 2536 2537 2538 2539 2540 2541 2542 2543 2544
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
2545
  if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
2546
      unique_table(thd, table_list, table_list->next_global))
2547 2548
  {
    /* Using same table for INSERT and SELECT */
2549 2550
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
2551
  }
2552
  else if (!thd->prelocked_mode)
2553 2554 2555 2556 2557 2558 2559
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
2560 2561
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
2562
    */
2563
    table->file->ha_start_bulk_insert((ha_rows) 0);
2564
  }
2565
  restore_record(table,s->default_values);		// Get empty record
unknown's avatar
unknown committed
2566
  table->next_number_field=table->found_next_number_field;
unknown's avatar
unknown committed
2567
  thd->cuted_fields=0;
unknown's avatar
unknown committed
2568 2569
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2570 2571 2572
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
2573
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
2574
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
2575 2576 2577
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2578 2579 2580 2581
  res= ((fields->elements &&
         check_that_all_fields_are_given_values(thd, table, table_list)) ||
        table_list->prepare_where(thd, 0, TRUE) ||
        table_list->prepare_check_option(thd));
2582 2583

  if (!res)
2584 2585
    table->mark_columns_needed_for_insert();

2586
  DBUG_RETURN(res);
unknown's avatar
unknown committed
2587 2588
}

2589

2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepair phase.
    Do the final preparation now.
		       
  RETURN
    0   OK
*/

int select_insert::prepare2(void)
{
  DBUG_ENTER("select_insert::prepare2");
2609 2610
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
      !thd->prelocked_mode)
2611
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
2612
  DBUG_RETURN(0);
2613 2614 2615
}


2616 2617 2618 2619 2620 2621
void select_insert::cleanup()
{
  /*
    select_insert/select_create are never re-used in prepared statement,
    so this method is not expected to be called: assert unconditionally.
  */
  DBUG_ASSERT(0);
}

unknown's avatar
unknown committed
2622 2623
/*
  Destructor: detach the auto-increment field from the table and reset its
  handler (when a table is set), then put the warning-related settings of
  the thread back to CHECK_FIELD_IGNORE / no abort on warning.
*/

select_insert::~select_insert()
{
  DBUG_ENTER("~select_insert");

  if (table != 0)
  {
    table->next_number_field= 0;
    table->file->ha_reset();
  }

  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  thd->abort_on_warning=   0;

  DBUG_VOID_RETURN;
}


/*
  Store one row produced by the SELECT into the target table.

  SYNOPSIS
    select_insert::send_data()
    values      items holding the values of the row

  RETURN
    0   OK, also when the row was skipped (LIMIT offset or VIEW_CHECK_SKIP)
    1   Error
*/

bool select_insert::send_data(List<Item> &values)
{
  DBUG_ENTER("select_insert::send_data");

  /* Rows still inside the LIMIT offset are counted but not inserted */
  if (unit->offset_limit_cnt)
  {
    unit->offset_limit_cnt--;
    DBUG_RETURN(0);
  }

  thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
  store_values(values);
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  if (thd->net.report_error)
    DBUG_RETURN(1);

  if (table_list)                               // Not CREATE ... SELECT
  {
    const int view_check= table_list->view_check_option(thd, info.ignore);
    if (view_check == VIEW_CHECK_SKIP)
      DBUG_RETURN(0);
    if (view_check == VIEW_CHECK_ERROR)
      DBUG_RETURN(1);
  }

  if (write_record(thd, table, &info))
    DBUG_RETURN(1);

  if (table->triggers || info.handle_duplicates == DUP_UPDATE)
  {
    /*
      Restore the fields of the record since the ON DUPLICATE KEY UPDATE
      clause may have changed them.

      If triggers exist they can modify fields that were not originally
      touched by INSERT ... SELECT, so their original values have to be
      restored for the next row.
    */
    restore_record(table, s->default_values);
  }

  if (table->next_number_field)
  {
    /*
      If no value has been autogenerated so far, remember the value we
      just saw: we may need to send it to the client in the end.
    */
    if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
      autoinc_value_of_last_inserted_row=
        table->next_number_field->val_int();
    /*
      Clear the auto-increment field for the next record. With triggers
      it is cleared twice, but this should be cheap.
    */
    table->next_number_field->reset();
  }
  DBUG_RETURN(0);
}


unknown's avatar
unknown committed
2698 2699 2700
/*
  Fill the record of the target table from 'values' and invoke the
  BEFORE INSERT triggers. Without an explicit field list the values are
  stored into the table's own fields.
*/

void select_insert::store_values(List<Item> &values)
{
  if (!fields->elements)
  {
    /* No field list given: use all fields of the table */
    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
    return;
  }

  fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                       table->triggers, TRG_EVENT_INSERT);
}

unknown's avatar
unknown committed
2708 2709
/*
  Report an error for INSERT ... SELECT and roll back the statement.

  SYNOPSIS
    select_insert::send_error()
    errcode     error code to report
    err         error message text

  DESCRIPTION
    The message is sent with my_message() unless errcode is
    ER_UNKNOWN_ERROR or an error has already been reported. If a table is
    set, the bulk insert is ended (outside prelocked mode), changes that
    will stay in a non-transactional table are written to the binary log,
    and the auto-increment values are released. The statement is rolled
    back in all cases.
*/

void select_insert::send_error(uint errcode,const char *err)
{
  DBUG_ENTER("select_insert::send_error");

  /* Avoid an extra 'unknown error' message if we already reported an error */
  if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
    my_message(errcode, err, MYF(0));

  /*
    If the creation of the table failed (due to a syntax error, for
    example), no table will have been opened and therefore 'table'
    will be NULL. In that case, we still need to execute the rollback
    at the end of the function.
  */
  if (table)
  {
    /*
      If we are not in prelocked mode, we end the bulk insert started
      before.
    */
    if (!thd->prelocked_mode)
      table->file->ha_end_bulk_insert();

    /*
      If at least one row has been inserted/modified and will stay in
      the table (the table doesn't have transactions) we must write to
      the binlog (and the error code will make the slave stop).

      For many errors (example: we got a duplicate key error while
      inserting into a MyISAM table), no row will be added to the table,
      so passing the error to the slave will not help since there will
      be an error code mismatch (the inserts will succeed on the slave
      with no error).

      If table creation failed, the number of rows modified will also be
      zero, so no check for that is made.
    */
    const bool rows_changed= (info.copied || info.deleted || info.updated);
    if (rows_changed && !table->file->has_transactions())
    {
      if (mysql_bin_log.is_open())
        thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
                          table->file->has_transactions(), FALSE);
      if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
          !can_rollback_data())
        thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
      query_cache_invalidate3(thd, table, 1);
    }
    table->file->ha_release_auto_increment();
  }

  ha_rollback_stmt(thd);
  DBUG_VOID_RETURN;
}


bool select_insert::send_eof()
{
2769
  int error;
2770
  bool const trans_table= table->file->has_transactions();
2771
  ulonglong id;
unknown's avatar
unknown committed
2772
  DBUG_ENTER("select_insert::send_eof");
2773 2774
  DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
                       trans_table, table->file->table_type()));
unknown's avatar
unknown committed
2775

2776
  error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
unknown's avatar
unknown committed
2777
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2778
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2779

unknown's avatar
unknown committed
2780
  if (info.copied || info.deleted || info.updated)
2781 2782 2783 2784 2785
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
unknown's avatar
unknown committed
2786
    query_cache_invalidate3(thd, table, 1);
2787 2788 2789 2790 2791 2792 2793
    /*
      Mark that we have done permanent changes if all of the below is true
      - Table doesn't support transactions
      - It's a normal (not temporary) table. (Changes to temporary tables
        are not logged in RBR)
      - We are using statement based replication
    */
2794 2795
    if (!trans_table &&
        (!table->s->tmp_table || !thd->current_stmt_binlog_row_based))
2796 2797
      thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
   }
unknown's avatar
unknown committed
2798

2799 2800 2801 2802 2803 2804
  /*
    Write to binlog before commiting transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
2805 2806
  if (mysql_bin_log.is_open())
  {
unknown's avatar
unknown committed
2807 2808
    if (!error)
      thd->clear_error();
2809 2810
    thd->binlog_query(THD::ROW_QUERY_TYPE,
                      thd->query, thd->query_length,
2811
                      trans_table, FALSE);
2812
  }
2813 2814 2815 2816 2817 2818 2819 2820
  /*
    We will call ha_autocommit_or_rollback() also for
    non-transactional tables under row-based replication: there might
    be events in the binary logs transaction, and we need to write
    them to the binary log.
   */
  if (trans_table || thd->current_stmt_binlog_row_based)
  {
2821
    int error2= ha_autocommit_or_rollback(thd, error);
2822
    if (error2 && !error)
2823
      error= error2;
2824
  }
2825
  table->file->ha_release_auto_increment();
2826

2827
  if (error)
unknown's avatar
unknown committed
2828 2829
  {
    table->file->print_error(error,MYF(0));
unknown's avatar
unknown committed
2830
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2831
  }
unknown's avatar
unknown committed
2832
  char buff[160];
2833
  if (info.ignore)
unknown's avatar
unknown committed
2834 2835
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	    (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
2836
  else
unknown's avatar
unknown committed
2837
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
unknown's avatar
unknown committed
2838
	    (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
2839
  thd->row_count_func= info.copied+info.deleted+info.updated;
2840 2841 2842 2843 2844 2845 2846

  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
  ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
2847
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2848 2849 2850 2851
}


/***************************************************************************
unknown's avatar
unknown committed
2852
  CREATE TABLE (SELECT) ...
unknown's avatar
unknown committed
2853 2854
***************************************************************************/

2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874
/*
  Create table from lists of fields and items (or open existing table
  with same name).

  SYNOPSIS
    create_table_from_items()
      thd          in     Thread object
      create_info  in     Create information (like MAX_ROWS, ENGINE or
                          temporary table flag)
      create_table in     Pointer to TABLE_LIST object providing database
                          and name for table to be created or to be open
      extra_fields in/out Initial list of fields for table to be created
      keys         in     List of keys for table to be created
      items        in     List of items which should be used to produce rest
                          of fields for the table (corresponding fields will
                          be added to the end of 'extra_fields' list)
      lock         out    Pointer to the MYSQL_LOCK object for table created
                          (open) will be returned in this parameter. Since
                          this table is not included in THD::lock caller is
                          responsible for explicitly unlocking this table.
      hooks        in     Table operation hooks. hooks->prelock() is called
                          with the opened table right before the table is
                          locked. Must not be NULL.

  NOTES
    If 'create_info->options' bitmask has HA_LEX_CREATE_IF_NOT_EXISTS
    flag and table with name provided already exists then this function will
    simply open existing table.
    If opening or locking the table fails, the table is removed from disk
    only when 'create_info->table_existed' is not set, i.e. a table that
    already existed before this statement is never deleted here. This is
    the same flag select_create::abort() checks before removing the table
    (presumably it is maintained by mysql_create_table(); verify there).
    Also note that create, open and lock sequence in this function is not
    atomic and thus contains gap for deadlock and can cause other troubles.
    Since this function contains some logic specific to CREATE TABLE ... SELECT
    it should be changed before it can be used in other contexts.

  RETURN VALUES
    non-zero  Pointer to TABLE object for table created or opened
    0         Error
*/

static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
                                      TABLE_LIST *create_table,
                                      List<create_field> *extra_fields,
                                      List<Key> *keys,
                                      List<Item> *items,
                                      MYSQL_LOCK **lock,
                                      TABLEOP_HOOKS *hooks)
{
  TABLE tmp_table;                // Used during 'create_field()'
  TABLE_SHARE share;
  TABLE *table= 0;
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  Field *tmp_field;
  bool not_used;
  DBUG_ENTER("create_table_from_items");

  /*
    Set up a minimal dummy TABLE/TABLE_SHARE pair; it is only handed to
    the field-creation helpers below.
  */
  tmp_table.alias= 0;
  tmp_table.timestamp_field= 0;
  tmp_table.s= &share;
  init_tmp_table_share(&share, "", 0, "", "");

  tmp_table.s->db_create_options=0;
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
  tmp_table.s->db_low_byte_first=
        test(create_info->db_type == myisam_hton ||
             create_info->db_type == heap_hton);
  tmp_table.null_row=tmp_table.maybe_null=0;

  /* Produce one create_field per selected item and append it */
  while ((item=it++))
  {
    create_field *cr_field;
    Field *field, *def_field;
    if (item->type() == Item::FUNC_ITEM)
      field= item->tmp_table_field(&tmp_table);
    else
      field= create_tmp_field(thd, &tmp_table, item, item->type(),
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
                              0);
    if (!field ||
        !(cr_field=new create_field(field,(item->type() == Item::FIELD_ITEM ?
                                           ((Item_field *)item)->field :
                                           (Field*) 0))))
      DBUG_RETURN(0);
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
    extra_fields->push_back(cr_field);
  }
  /*
    create and lock table

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
    NOTE: By locking table which we just have created (or for which we just
    have found that it already exists) separately from other tables used
    by the statement we create potential window for deadlock.
    TODO: create and open should be done atomic !
  */
  {
    tmp_disable_binlog(thd);
    if (!mysql_create_table(thd, create_table->db, create_table->table_name,
                            create_info, *extra_fields, *keys, 0,
                            select_field_count, 0))
    {
      /*
        If we are here in prelocked mode we either create temporary table
        or prelocked mode is caused by the SELECT part of this statement.
      */
      DBUG_ASSERT(!thd->prelocked_mode ||
                  create_info->options & HA_LEX_CREATE_TMP_TABLE ||
                  thd->lex->requires_prelocking());

      /*
        NOTE: We don't want to ignore set of locked tables here if we are
              under explicit LOCK TABLES since it will open gap for deadlock
              too wide (and also is not backward compatible).
      */
      if (! (table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
                               (MYSQL_LOCK_IGNORE_FLUSH |
                                ((thd->prelocked_mode == PRELOCKED) ?
                                 MYSQL_OPEN_IGNORE_LOCKED_TABLES:0)))))
      {
        /*
          Open failed. Remove the table only if this statement created it;
          a table that already existed (IF NOT EXISTS case) must be kept.
        */
        if (!create_info->table_existed)
          quick_rm_table(create_info->db_type, create_table->db,
                         table_case_name(create_info,
                                         create_table->table_name),
                         0);
      }
    }
    reenable_binlog(thd);
    if (!table)                                   // open failed
      DBUG_RETURN(0);
  }

  /*
    FIXME: What happens if trigger manages to be created while we are
           obtaining this lock ? May be it is sensible just to disable
           trigger execution in this case ? Or will MYSQL_LOCK_IGNORE_FLUSH
           save us from that ?
  */
  table->reginfo.lock_type=TL_WRITE;
  hooks->prelock(&table, 1);                    // Call prelock hooks
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)))
  {
    /* Locking failed: drop our TABLE instance from the open cache */
    VOID(pthread_mutex_lock(&LOCK_open));
    hash_delete(&open_cache,(byte*) table);
    VOID(pthread_mutex_unlock(&LOCK_open));
    /* As above: never delete a table that existed before this statement */
    if (!create_info->table_existed)
      quick_rm_table(create_info->db_type, create_table->db,
                     table_case_name(create_info, create_table->table_name),
                     0);
    DBUG_RETURN(0);
  }
  table->file->extra(HA_EXTRA_WRITE_CACHE);
  DBUG_RETURN(table);
}


unknown's avatar
unknown committed
3013
int
3014
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
3015 3016
{
  DBUG_ENTER("select_create::prepare");
3017

3018
  TABLEOP_HOOKS *hook_ptr= NULL;
3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036
  /*
    For row-based replication, the CREATE-SELECT statement is written
    in two pieces: the first one contain the CREATE TABLE statement
    necessary to create the table and the second part contain the rows
    that should go into the table.

    For non-temporary tables, the start of the CREATE-SELECT
    implicitly commits the previous transaction, and all events
    forming the statement will be stored the transaction cache. At end
    of the statement, the entire statement is committed as a
    transaction, and all events are written to the binary log.

    On the master, the table is locked for the duration of the
    statement, but since the CREATE part is replicated as a simple
    statement, there is no way to lock the table for accesses on the
    slave.  Hence, we have to hold on to the CREATE part of the
    statement until the statement has finished.
   */
3037 3038 3039
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
    MY_HOOKS(select_create *x) : ptr(x) { }
3040 3041

  private:
3042 3043
    virtual void do_prelock(TABLE **tables, uint count)
    {
3044 3045
      TABLE const *const table = *tables;
      if (ptr->get_thd()->current_stmt_binlog_row_based  &&
3046
          !table->s->tmp_table &&
3047 3048 3049 3050
          !ptr->get_create_info()->table_existed)
      {
        ptr->binlog_show_create_table(tables, count);
      }
3051 3052 3053 3054 3055 3056
    }

    select_create *ptr;
  };

  MY_HOOKS hooks(this);
3057
  hook_ptr= &hooks;
3058

3059
  unit= u;
3060 3061

  /*
3062 3063 3064
    Start a statement transaction before the create if we are using
    row-based replication for the statement.  If we are creating a
    temporary table, we need to start a statement transaction.
3065 3066 3067 3068 3069 3070 3071
  */
  if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
      thd->current_stmt_binlog_row_based)
  {
    thd->binlog_start_trans_and_stmt();
  }

3072
  if (!(table= create_table_from_items(thd, create_info, create_table,
3073 3074
                                       extra_fields, keys, &values,
                                       &thd->extra_lock, hook_ptr)))
unknown's avatar
unknown committed
3075 3076
    DBUG_RETURN(-1);				// abort() deletes table

3077
  if (table->s->fields < values.elements)
unknown's avatar
unknown committed
3078
  {
3079
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
unknown's avatar
unknown committed
3080 3081 3082
    DBUG_RETURN(-1);
  }

3083
 /* First field to copy */
unknown's avatar
unknown committed
3084 3085 3086 3087
  field= table->field+table->s->fields - values.elements;

  /* Mark all fields that are given values */
  for (Field **f= field ; *f ; f++)
3088
    bitmap_set_bit(table->write_set, (*f)->field_index);
unknown's avatar
unknown committed
3089

3090
  /* Don't set timestamp if used */
3091
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
unknown's avatar
unknown committed
3092 3093
  table->next_number_field=table->found_next_number_field;

3094
  restore_record(table,s->default_values);      // Get empty record
unknown's avatar
unknown committed
3095
  thd->cuted_fields=0;
unknown's avatar
unknown committed
3096
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
3097
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3098 3099 3100
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
3101
  if (!thd->prelocked_mode)
3102
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
3103
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
3104
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
3105 3106 3107
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
3108 3109 3110 3111
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
  DBUG_RETURN(0);
unknown's avatar
unknown committed
3112 3113
}

3114
void
3115
select_create::binlog_show_create_table(TABLE **tables, uint count)
3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132
{
  /*
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
    created table by calling store_create_info() (behaves as SHOW
    CREATE TABLE).  In the event of an error, nothing should be
    written to the binary log, even if the table is non-transactional;
    therefore we pretend that the generated CREATE TABLE statement is
    for a transactional table.  The event will then be put in the
    transaction cache, and any subsequent events (e.g., table-map
    events and binrow events) will also be put there.  We can then use
    ha_autocommit_or_rollback() to either throw away the entire
    kaboodle of events, or write them to the binary log.

    We write the CREATE TABLE statement here and not in prepare()
    since there potentially are sub-selects or accesses to information
    schema that will do a close_thread_tables(), destroying the
    statement transaction cache.
3133
  */
3134
  DBUG_ASSERT(thd->current_stmt_binlog_row_based);
3135
  DBUG_ASSERT(tables && *tables && count > 0);
3136 3137 3138

  char buf[2048];
  String query(buf, sizeof(buf), system_charset_info);
3139
  int result;
3140
  TABLE_LIST table_list;
3141

3142 3143
  memset(&table_list, 0, sizeof(table_list));
  table_list.table = *tables;
3144
  query.length(0);      // Have to zero it since constructor doesn't
3145

3146
  result= store_create_info(thd, &table_list, &query, create_info);
3147
  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
3148

3149 3150 3151 3152 3153 3154
  thd->binlog_query(THD::STMT_QUERY_TYPE,
                    query.ptr(), query.length(),
                    /* is_trans */ TRUE,
                    /* suppress_use */ FALSE);
}

unknown's avatar
unknown committed
3155
/*
  Store one row of selected values into the fields starting at 'field'
  and invoke the table's BEFORE INSERT triggers, all via
  fill_record_n_invoke_before_triggers().
*/
void select_create::store_values(List<Item> &values)
{
  fill_record_n_invoke_before_triggers(thd,
                                       field,
                                       values,
                                       1,
                                       table->triggers,
                                       TRG_EVENT_INSERT);
}

unknown's avatar
unknown committed
3161

3162 3163
/*
  Report an error for CREATE TABLE ... SELECT.

  Delegates to select_insert::send_error() with binary logging
  temporarily disabled.
*/
void select_create::send_error(uint errcode,const char *err)
{
  DBUG_ENTER("select_create::send_error");

  /* Trace the state that matters for the rollback/cleanup decisions */
  DBUG_PRINT("info",
             ("Current statement %s row-based",
              thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
  DBUG_PRINT("info",
             ("Current table (at 0x%lx) %s a temporary (or non-existing) "
              "table",
              (ulong) table,
              (!table || table->s->tmp_table) ? "is" : "is NOT"));
  DBUG_PRINT("info",
             ("Table %s prior to executing this statement",
              get_create_info()->table_existed ? "existed" : "did not exist"));

  /*
    This will execute any rollbacks that are necessary before writing
    the transaction cache.

    Nothing should reach the binary log from here, so it is disabled
    around the call.  This matters because a non-transactional table is
    potentially "rolled back" by removing it, and that removal might
    generate events that must not be written to the binary log.
  */
  tmp_disable_binlog(thd);
  select_insert::send_error(errcode, err);
  reenable_binlog(thd);

  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
3196

unknown's avatar
unknown committed
3197 3198 3199 3200 3201 3202 3203
bool select_create::send_eof()
{
  bool tmp=select_insert::send_eof();
  if (tmp)
    abort();
  else
  {
3204 3205 3206 3207 3208 3209 3210 3211
    /*
      Do an implicit commit at end of statement for non-temporary
      tables.  This can fail, but we should unlock the table
      nevertheless.
    */
    if (!table->s->tmp_table)
      ha_commit(thd);               // Can fail, but we proceed anyway

unknown's avatar
unknown committed
3212
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3213
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
unknown's avatar
unknown committed
3214
    VOID(pthread_mutex_lock(&LOCK_open));
3215
    mysql_unlock_tables(thd, thd->extra_lock);
3216
    if (!table->s->tmp_table)
3217
    {
3218
      if (close_thread_table(thd, &table))
3219
        broadcast_refresh();
3220
    }
3221
    thd->extra_lock=0;
unknown's avatar
unknown committed
3222
    table=0;
unknown's avatar
unknown committed
3223 3224 3225 3226 3227 3228 3229
    VOID(pthread_mutex_unlock(&LOCK_open));
  }
  return tmp;
}

void select_create::abort()
{
3230
  DBUG_ENTER("select_create::abort");
unknown's avatar
unknown committed
3231
  VOID(pthread_mutex_lock(&LOCK_open));
3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248

  /*
    We roll back the statement, including truncating the transaction
    cache of the binary log, if the statement failed.

    We roll back the statement prior to deleting the table and prior
    to releasing the lock on the table, since there might be potential
    for failure if the rollback is executed after the drop or after
    unlocking the table.

    We also roll back the statement regardless of whether the creation
    of the table succeeded or not, since we need to reset the binary
    log state.
  */
  if (thd->current_stmt_binlog_row_based)
    ha_rollback_stmt(thd);

3249
  if (thd->extra_lock)
unknown's avatar
unknown committed
3250
  {
3251 3252
    mysql_unlock_tables(thd, thd->extra_lock);
    thd->extra_lock=0;
unknown's avatar
unknown committed
3253
  }
3254

unknown's avatar
unknown committed
3255 3256
  if (table)
  {
unknown's avatar
unknown committed
3257
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3258
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
unknown's avatar
unknown committed
3259
    handlerton *table_type=table->s->db_type;
3260
    if (!table->s->tmp_table)
3261
    {
3262
      ulong version= table->s->version;
unknown's avatar
unknown committed
3263
      table->s->version= 0;
3264
      hash_delete(&open_cache,(byte*) table);
3265
      if (!create_info->table_existed)
3266 3267
        quick_rm_table(table_type, create_table->db,
                       create_table->table_name, 0);
3268
      /* Tell threads waiting for refresh that something has happened */
unknown's avatar
unknown committed
3269
      if (version != refresh_version)
3270
        broadcast_refresh();
3271 3272
    }
    else if (!create_info->table_existed)
unknown's avatar
unknown committed
3273 3274
      close_temporary_table(thd, table, 1, 1);
    table=0;                                    // Safety
unknown's avatar
unknown committed
3275 3276
  }
  VOID(pthread_mutex_unlock(&LOCK_open));
3277
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
3278 3279 3280 3281
}


/*****************************************************************************
  Instantiate templates
*****************************************************************************/

3285
/*
  Explicit instantiations of the list templates used in this file, emitted
  only when HAVE_EXPLICIT_TEMPLATE_INSTANTIATION is defined.
*/
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
template class List_iterator_fast<List_item>;
/* The delayed-insert lists are not instantiated for the embedded library */
#ifndef EMBEDDED_LIBRARY
template class I_List<delayed_insert>;
template class I_List_iterator<delayed_insert>;
template class I_List<delayed_row>;
#endif /* EMBEDDED_LIBRARY */
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */