sql_insert.cc 132 KB
Newer Older
Marc Alff's avatar
Marc Alff committed
1
/* Copyright 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
unknown's avatar
unknown committed
2

unknown's avatar
unknown committed
3 4
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
unknown's avatar
unknown committed
5
   the Free Software Foundation; version 2 of the License.
unknown's avatar
unknown committed
6

unknown's avatar
unknown committed
7 8 9 10
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
unknown's avatar
unknown committed
11

unknown's avatar
unknown committed
12 13 14 15 16 17 18
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

19 20 21 22 23 24 25 26 27 28 29 30 31
/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
32
  Delayed_insert::get_local_table() the table of the thread is copied
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other thread's table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used. Only that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread awakes from the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It normally lives until a FLUSH TABLES or SHUTDOWN.

*/

unknown's avatar
unknown committed
57
#include "mysql_priv.h"
58 59
#include "sp_head.h"
#include "sql_trigger.h"
60
#include "sql_select.h"
61
#include "sql_show.h"
62
#include "slave.h"
63
#include "rpl_mi.h"
Konstantin Osipov's avatar
Konstantin Osipov committed
64
#include "transaction.h"
65
#include "sql_audit.h"
unknown's avatar
unknown committed
66

unknown's avatar
unknown committed
67
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
68
static bool delayed_get_table(THD *thd, TABLE_LIST *table_list);
unknown's avatar
unknown committed
69
static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
70
                         LEX_STRING query, bool ignore, bool log_on);
unknown's avatar
unknown committed
71
static void end_delayed_insert(THD *thd);
72
pthread_handler_t handle_delayed_insert(void *arg);
unknown's avatar
unknown committed
73
static void unlink_blobs(register TABLE *table);
unknown's avatar
unknown committed
74
#endif
75
static bool check_view_insertability(THD *thd, TABLE_LIST *view);
unknown's avatar
unknown committed
76 77 78 79 80 81 82 83 84 85 86

/*
  Scratch-memory helpers.

  my_safe_alloca(size, min_length)
    Without HAVE_ALLOCA this is simply my_alloca(size).
    With HAVE_ALLOCA it uses my_alloca() when size <= min_length and
    my_malloc() for bigger blocks.

  my_safe_afree(ptr, size, min_length)
    Releases a block obtained from my_safe_alloca(). It must be given the
    same size/min_length pair that was used for the allocation, because
    that pair decides whether the block has to be handed to my_free().

  Every macro argument is parenthesised so that an expression argument
  (e.g. "a ? b : c") cannot change the meaning of the comparison, and
  my_safe_afree() is wrapped in do { } while (0) so that it behaves as a
  single statement; a bare "if" would capture an "else" that follows the
  macro call.
*/

#ifndef HAVE_ALLOCA
#define my_safe_alloca(size, min_length) my_alloca(size)
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
#else
#define my_safe_alloca(size, min_length) \
  (((size) <= (min_length)) ? my_alloca(size) : my_malloc((size), MYF(0)))
#define my_safe_afree(ptr, size, min_length) \
  do { if ((size) > (min_length)) my_free((ptr), MYF(0)); } while (0)
#endif

87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
/*
  Check that insert/update fields are from the same single table of a view.

  SYNOPSIS
    check_view_single_update()
    fields            The insert/update fields to be checked.
    view              The view for insert.
    map     [in/out]  The insert table map.

  DESCRIPTION
    This function is called in 2 cases:
    1. to check insert fields. In this case *map will be set to 0.
       Insert fields are checked to be all from the same single underlying
       table of the given view. Otherwise the error is thrown. Found table
       map is returned in the map parameter.
    2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
       In this case *map contains table_map found on the previous call of
       the function to check insert fields. Update fields are checked to be
       from the same table as the insert fields.

  RETURN
    0   OK
    1   Error
*/

112 113
bool check_view_single_update(List<Item> &fields, List<Item> *values,
                              TABLE_LIST *view, table_map *map)
114 115 116 117 118 119 120 121 122 123
{
  /* it is join view => we need to find the table for update */
  List_iterator_fast<Item> it(fields);
  Item *item;
  TABLE_LIST *tbl= 0;            // reset for call to check_single_table()
  table_map tables= 0;

  while ((item= it++))
    tables|= item->used_tables();

124 125 126 127 128 129 130 131 132 133 134
  if (values)
  {
    it.init(*values);
    while ((item= it++))
      tables|= item->used_tables();
  }

  /* Convert to real table bits */
  tables&= ~PSEUDO_TABLE_BITS;


135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
  /* Check found map against provided map */
  if (*map)
  {
    if (tables != *map)
      goto error;
    return FALSE;
  }

  if (view->check_single_table(&tbl, tables, view) || tbl == 0)
    goto error;

  view->table= tbl->table;
  *map= tables;

  return FALSE;

error:
  my_error(ER_VIEW_MULTIUPDATE, MYF(0),
           view->view_db.str, view->view_name.str);
  return TRUE;
}

157

unknown's avatar
unknown committed
158
/*
159
  Check if insert fields are correct.
160 161 162 163 164 165 166

  SYNOPSIS
    check_insert_fields()
    thd                         The current thread.
    table                       The table for insert.
    fields                      The insert fields.
    values                      The insert values.
unknown's avatar
unknown committed
167
    check_unique                If duplicate values should be rejected.
168 169 170 171 172 173 174 175 176

  NOTE
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
    or leaves it as is, depending on if timestamp should be updated or
    not.

  RETURN
    0           OK
    -1          Error
unknown's avatar
unknown committed
177 178
*/

unknown's avatar
unknown committed
179 180
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
181 182 183
                               bool check_unique,
                               bool fields_and_values_from_different_maps,
                               table_map *map)
unknown's avatar
unknown committed
184
{
unknown's avatar
VIEW  
unknown committed
185
  TABLE *table= table_list->table;
186

187 188
  if (!table_list->updatable)
  {
189
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
190 191 192
    return -1;
  }

unknown's avatar
unknown committed
193 194
  if (fields.elements == 0 && values.elements != 0)
  {
195 196 197 198 199 200
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
      return -1;
    }
201
    if (values.elements != table->s->fields)
unknown's avatar
unknown committed
202
    {
unknown's avatar
unknown committed
203
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
204 205
      return -1;
    }
unknown's avatar
unknown committed
206
#ifndef NO_EMBEDDED_ACCESS_CHECKS
207 208 209
    Field_iterator_table_ref field_it;
    field_it.set(table_list);
    if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
210
      return -1;
unknown's avatar
unknown committed
211
#endif
212 213
    clear_timestamp_auto_bits(table->timestamp_field_type,
                              TIMESTAMP_AUTO_SET_ON_INSERT);
214 215 216 217
    /*
      No fields are provided so all fields must be provided in the values.
      Thus we set all bits in the write set.
    */
218
    bitmap_set_all(table->write_set);
unknown's avatar
unknown committed
219 220 221
  }
  else
  {						// Part field list
unknown's avatar
unknown committed
222 223
    SELECT_LEX *select_lex= &thd->lex->select_lex;
    Name_resolution_context *context= &select_lex->context;
224
    Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
225
    int res;
unknown's avatar
unknown committed
226

unknown's avatar
unknown committed
227 228
    if (fields.elements != values.elements)
    {
unknown's avatar
unknown committed
229
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
230 231 232
      return -1;
    }

233
    thd->dup_field= 0;
unknown's avatar
unknown committed
234 235 236
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
237
    ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
238 239 240 241 242

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
243
    table_list->next_local= 0;
244
    context->resolve_in_table_list_only(table_list);
245
    res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
unknown's avatar
unknown committed
246 247

    /* Restore the current context. */
248
    ctx_state.restore_state(context, table_list);
249
    thd->lex->select_lex.no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
250

unknown's avatar
unknown committed
251
    if (res)
unknown's avatar
unknown committed
252
      return -1;
unknown's avatar
unknown committed
253

unknown's avatar
unknown committed
254
    if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
255
    {
256 257 258 259
      if (check_view_single_update(fields,
                                   fields_and_values_from_different_maps ?
                                   (List<Item>*) 0 : &values,
                                   table_list, map))
260
        return -1;
261
      table= table_list->table;
262
    }
263

264
    if (check_unique && thd->dup_field)
unknown's avatar
unknown committed
265
    {
266
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
unknown's avatar
unknown committed
267 268
      return -1;
    }
269 270 271 272 273 274 275 276 277 278 279 280
    if (table->timestamp_field)	// Don't automaticly set timestamp if used
    {
      if (bitmap_is_set(table->write_set,
                        table->timestamp_field->field_index))
        clear_timestamp_auto_bits(table->timestamp_field_type,
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
      else
      {
        bitmap_set_bit(table->write_set,
                       table->timestamp_field->field_index);
      }
    }
unknown's avatar
unknown committed
281
  }
unknown's avatar
unknown committed
282
  // For the values we need select_priv
unknown's avatar
unknown committed
283
#ifndef NO_EMBEDDED_ACCESS_CHECKS
284
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
unknown's avatar
unknown committed
285
#endif
286 287 288

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
289
       check_view_insertability(thd, table_list)))
290
  {
291
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
292 293 294
    return -1;
  }

unknown's avatar
unknown committed
295 296 297 298
  return 0;
}


299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
/*
  Check update fields for the timestamp field.

  SYNOPSIS
    check_update_fields()
    thd                         The current thread.
    insert_table_list           The insert table list.
    table                       The table for update.
    update_fields               The update fields.

  NOTE
    If the update fields include the timestamp field,
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.

  RETURN
    0           OK
    -1          Error
*/

unknown's avatar
unknown committed
318
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
319 320
                               List<Item> &update_fields,
                               List<Item> &update_values, table_map *map)
321
{
unknown's avatar
unknown committed
322
  TABLE *table= insert_table_list->table;
Staale Smedseng's avatar
Staale Smedseng committed
323
  my_bool timestamp_mark= 0;
324

325 326
  if (table->timestamp_field)
  {
327 328 329 330 331 332
    /*
      Unmark the timestamp field so that we can check if this is modified
      by update_fields
    */
    timestamp_mark= bitmap_test_and_clear(table->write_set,
                                          table->timestamp_field->field_index);
333 334
  }

335 336
  /* Check the fields we are going to modify */
  if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
337 338
    return -1;

339
  if (insert_table_list->effective_algorithm == VIEW_ALGORITHM_MERGE &&
340 341
      check_view_single_update(update_fields, &update_values,
                               insert_table_list, map))
342 343
    return -1;

344 345 346
  if (table->timestamp_field)
  {
    /* Don't set timestamp column if this is modified. */
347 348
    if (bitmap_is_set(table->write_set,
                      table->timestamp_field->field_index))
349 350
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
351 352 353
    if (timestamp_mark)
      bitmap_set_bit(table->write_set,
                     table->timestamp_field->field_index);
354 355 356 357
  }
  return 0;
}

unknown's avatar
unknown committed
358
/*
359 360 361 362 363 364 365 366 367 368 369 370
  Prepare triggers  for INSERT-like statement.

  SYNOPSIS
    prepare_triggers_for_insert_stmt()
      table   Table to which insert will happen

  NOTE
    Prepare triggers for INSERT-like statement by marking fields
    used by triggers and inform handlers that batching of UPDATE/DELETE 
    cannot be done if there are BEFORE UPDATE/DELETE triggers.
*/

unknown's avatar
unknown committed
371
void prepare_triggers_for_insert_stmt(TABLE *table)
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
{
  if (table->triggers)
  {
    if (table->triggers->has_triggers(TRG_EVENT_DELETE,
                                      TRG_ACTION_AFTER))
    {
      /*
        The table has AFTER DELETE triggers that might access to 
        subject table and therefore might need delete to be done 
        immediately. So we turn-off the batching.
      */ 
      (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
    }
    if (table->triggers->has_triggers(TRG_EVENT_UPDATE,
                                      TRG_ACTION_AFTER))
    {
      /*
        The table has AFTER UPDATE triggers that might access to subject 
        table and therefore might need update to be done immediately. 
        So we turn-off the batching.
      */ 
      (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
    }
  }
unknown's avatar
unknown committed
396
  table->mark_columns_needed_for_insert();
397 398
}

399

400 401 402 403 404 405 406 407
/**
  Upgrade table-level lock of INSERT statement to TL_WRITE if
  a more concurrent lock is infeasible for some reason. This is
  necessary for engines without internal locking support (MyISAM).
  An engine with internal locking implementation might later
  downgrade the lock in handler::store_lock() method.
*/

unknown's avatar
unknown committed
408
static
409 410 411 412 413
void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
                       enum_duplicates duplic,
                       bool is_multi_insert)
{
  if (duplic == DUP_UPDATE ||
414
      (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
415
  {
416
    *lock_type= TL_WRITE_DEFAULT;
417 418 419 420 421 422 423
    return;
  }

  if (*lock_type == TL_WRITE_DELAYED)
  {
    /*
      We do not use delayed threads if:
unknown's avatar
unknown committed
424 425 426 427 428
      - we're running in the safe mode or skip-new mode -- the
        feature is disabled in these modes
      - we're executing this statement on a replication slave --
        we need to ensure serial execution of queries on the
        slave
429 430
      - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
        insert cannot be concurrent
unknown's avatar
unknown committed
431 432 433 434 435 436 437 438 439 440 441 442
      - this statement is directly or indirectly invoked from
        a stored function or trigger (under pre-locking) - to
        avoid deadlocks, since INSERT DELAYED involves a lock
        upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
        attempt while keeping other table level locks.
      - this statement itself may require pre-locking.
        We should upgrade the lock even though in most cases
        delayed functionality may work. Unfortunately, we can't
        easily identify whether the subject table is not used in
        the statement indirectly via a stored function or trigger:
        if it is used, that will lead to a deadlock between the
        client connection and the delayed thread.
443 444
    */
    if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
unknown's avatar
unknown committed
445
        thd->variables.max_insert_delayed_threads == 0 ||
Konstantin Osipov's avatar
Konstantin Osipov committed
446
        thd->locked_tables_mode > LTM_LOCK_TABLES ||
unknown's avatar
unknown committed
447
        thd->lex->uses_stored_routines())
448 449 450 451
    {
      *lock_type= TL_WRITE;
      return;
    }
452 453 454 455 456 457 458 459
    if (thd->slave_thread)
    {
      /* Try concurrent insert */
      *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
                  TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
      return;
    }

460
    bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG ||
461
                  ! (thd->security_ctx->master_access & SUPER_ACL));
462 463
    if (global_system_variables.binlog_format == BINLOG_FORMAT_STMT &&
        log_on && mysql_bin_log.is_open() && is_multi_insert)
464 465 466 467 468 469 470 471 472 473 474 475 476 477
    {
      /*
        Statement-based binary logging does not work in this case, because:
        a) two concurrent statements may have their rows intermixed in the
        queue, leading to autoincrement replication problems on slave (because
        the values generated used for one statement don't depend only on the
        value generated for the first row of this statement, so are not
        replicable)
        b) if first row of the statement has an error the full statement is
        not binlogged, while next rows of the statement may be inserted.
        c) if first row succeeds, statement is binlogged immediately with a
        zero error code (i.e. "no error"), if then second row fails, query
        will fail on slave too and slave will stop (wrongly believing that the
        master got no error).
478 479 480 481 482 483 484 485 486
        So we fallback to non-delayed INSERT.
        Note that to be fully correct, we should test the "binlog format which
        the delayed thread is going to use for this row". But in the common case
        where the global binlog format is not changed and the session binlog
        format may be changed, that is equal to the global binlog format.
        We test it without mutex for speed reasons (condition rarely true), and
        in the common case (global not changed) it is as good as without mutex;
        if global value is changed, anyway there is uncertainty as the delayed
        thread may be old and use the before-the-change value.
487 488 489 490 491 492 493
      */
      *lock_type= TL_WRITE;
    }
  }
}


unknown's avatar
unknown committed
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522
/**
  Find or create a delayed insert thread for the first table in
  the table list, then open and lock the remaining tables.
  If a table can not be used with insert delayed, upgrade the lock
  and open and lock all tables using the standard mechanism.

  @param thd         thread context
  @param table_list  list of "descriptors" for tables referenced
                     directly in statement SQL text.
                     The first element in the list corresponds to
                     the destination table for inserts, remaining
                     tables, if any, are usually tables referenced
                     by sub-queries in the right part of the
                     INSERT.

  @return Status of the operation. In case of success 'table'
  member of every table_list element points to an instance of
  class TABLE.
    @retval FALSE  success
    @retval TRUE   failure

  @sa open_and_lock_tables for more information about MySQL table
  level locking
*/

static
bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
{
  DBUG_ENTER("open_and_lock_for_insert_delayed");

#ifndef EMBEDDED_LIBRARY
  /*
    A connection holding the global read lock must not wait for a
    handler thread: that thread could never lock the table, because it
    would wait for the global read lock to go away, while the connection
    would be stuck waiting for the handler thread to open and lock the
    table.
    Outside of locked tables mode INSERT seeks protection against the
    global read lock (and fails), so this point is only reached in
    locked tables mode.
  */
  if (thd->locked_tables_mode && thd->global_read_lock.is_acquired())
  {
    my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
    DBUG_RETURN(TRUE);
  }

  if (delayed_get_table(thd, table_list))
    DBUG_RETURN(TRUE);

  if (table_list->table)
  {
    /*
      Open the tables used by sub-selects or stored functions; this
      also caches these functions.
    */
    if (open_and_lock_tables(thd, table_list->next_global))
    {
      end_delayed_insert(thd);
      DBUG_RETURN(TRUE);
    }

    /*
      open_and_lock_tables() did not see the first table, so its
      updatability flag has to be set "by hand".
    */
    if (!(table_list->derived || table_list->view))
      table_list->updatable= 1;                 // usual table

    DBUG_RETURN(FALSE);
  }
#endif

  /*
    Fall back to a normal insert. This point is reached when
    * this is the embedded library, which has no auxiliary threads, OR
    * a lock upgrade was requested inside delayed_get_table because
      - there are too many delayed insert threads OR
      - the table has triggers.
  */
  table_list->lock_type= TL_WRITE;
  DBUG_RETURN(open_and_lock_tables(thd, table_list));
}


576 577
/**
  INSERT statement implementation
578 579 580 581

  @note Like implementations of other DDL/DML in MySQL, this function
  relies on the caller to close the thread tables. This is done in the
  end of dispatch_command().
582 583
*/

unknown's avatar
unknown committed
584 585 586 587 588
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
589 590
                  enum_duplicates duplic,
		  bool ignore)
unknown's avatar
unknown committed
591
{
592
  int error, res;
593
  bool transactional_table, joins_freed= FALSE;
594
  bool changed;
595
  bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
unknown's avatar
unknown committed
596 597 598 599
  uint value_count;
  ulong counter = 1;
  ulonglong id;
  COPY_INFO info;
600
  TABLE *table= 0;
unknown's avatar
unknown committed
601
  List_iterator_fast<List_item> its(values_list);
unknown's avatar
unknown committed
602
  List_item *values;
unknown's avatar
unknown committed
603
  Name_resolution_context *context;
604
  Name_resolution_context_state ctx_state;
605
#ifndef EMBEDDED_LIBRARY
606
  char *query= thd->query();
unknown's avatar
unknown committed
607 608 609 610 611
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-update or --log-bin).
  */
612
  bool log_on= ((thd->variables.option_bits & OPTION_BIN_LOG) ||
unknown's avatar
unknown committed
613
                (!(thd->security_ctx->master_access & SUPER_ACL)));
unknown's avatar
unknown committed
614
#endif
615
  thr_lock_type lock_type;
unknown's avatar
unknown committed
616
  Item *unused_conds= 0;
unknown's avatar
unknown committed
617 618
  DBUG_ENTER("mysql_insert");

619
  /*
620 621 622 623 624 625 626 627 628 629 630 631
    Upgrade lock type if the requested lock is incompatible with
    the current connection mode or table operation.
  */
  upgrade_lock_type(thd, &table_list->lock_type, duplic,
                    values_list.elements > 1);

  /*
    We can't write-delayed into a table locked with LOCK TABLES:
    this will lead to a deadlock, since the delayed thread will
    never be able to get a lock on the table. QQQ: why not
    upgrade the lock here instead?
  */
Konstantin Osipov's avatar
Konstantin Osipov committed
632 633
  if (table_list->lock_type == TL_WRITE_DELAYED &&
      thd->locked_tables_mode &&
634 635
      find_locked_table(thd->open_tables, table_list->db,
                        table_list->table_name))
636
  {
637 638 639
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
             table_list->table_name);
    DBUG_RETURN(TRUE);
640
  }
unknown's avatar
unknown committed
641

unknown's avatar
unknown committed
642
  if (table_list->lock_type == TL_WRITE_DELAYED)
unknown's avatar
unknown committed
643
  {
unknown's avatar
unknown committed
644 645
    if (open_and_lock_for_insert_delayed(thd, table_list))
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
646 647
  }
  else
unknown's avatar
unknown committed
648 649 650 651
  {
    if (open_and_lock_tables(thd, table_list))
      DBUG_RETURN(TRUE);
  }
652
  lock_type= table_list->lock_type;
653

654
  thd_proc_info(thd, "init");
655
  thd->used_tables=0;
unknown's avatar
unknown committed
656
  values= its++;
unknown's avatar
unknown committed
657
  value_count= values->elements;
unknown's avatar
unknown committed
658

unknown's avatar
VIEW  
unknown committed
659
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
unknown's avatar
unknown committed
660
			   update_fields, update_values, duplic, &unused_conds,
unknown's avatar
unknown committed
661
                           FALSE,
662 663
                           (fields.elements || !value_count ||
                            table_list->view != 0),
unknown's avatar
unknown committed
664 665 666
                           !ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES))))
unknown's avatar
unknown committed
667
    goto abort;
668

669 670 671
  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;

unknown's avatar
unknown committed
672
  context= &thd->lex->select_lex.context;
673 674 675 676 677 678 679 680 681
  /*
    These three asserts test the hypothesis that the resetting of the name
    resolution context below is not necessary at all since the list of local
    tables for INSERT always consists of one table.
  */
  DBUG_ASSERT(!table_list->next_local);
  DBUG_ASSERT(!context->table_list->next_local);
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);

unknown's avatar
unknown committed
682
  /* Save the state of the current name resolution context. */
683
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
684 685 686 687 688

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
689
  table_list->next_local= 0;
unknown's avatar
unknown committed
690 691
  context->resolve_in_table_list_only(table_list);

692
  while ((values= its++))
unknown's avatar
unknown committed
693 694 695 696
  {
    counter++;
    if (values->elements != value_count)
    {
697
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
unknown's avatar
unknown committed
698 699
      goto abort;
    }
700
    if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
unknown's avatar
unknown committed
701 702 703
      goto abort;
  }
  its.rewind ();
unknown's avatar
unknown committed
704 705
 
  /* Restore the current context. */
706
  ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
707

unknown's avatar
unknown committed
708
  /*
unknown's avatar
unknown committed
709
    Fill in the given fields and dump it to the table file
unknown's avatar
unknown committed
710
  */
711
  bzero((char*) &info,sizeof(info));
712
  info.ignore= ignore;
unknown's avatar
unknown committed
713
  info.handle_duplicates=duplic;
714 715
  info.update_fields= &update_fields;
  info.update_values= &update_values;
unknown's avatar
unknown committed
716
  info.view= (table_list->view ? table_list : 0);
717

718 719 720
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
unknown's avatar
unknown committed
721
    to NULL.
722
  */
unknown's avatar
unknown committed
723
  thd->count_cuted_fields= ((values_list.elements == 1 &&
unknown's avatar
unknown committed
724
                             !ignore) ?
725 726
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
unknown's avatar
unknown committed
727 728 729
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

730 731 732 733
#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
734
      rpl_master_has_bug(&active_mi->rli, 24432, TRUE, NULL, NULL))
735 736 737
    goto abort;
#endif

unknown's avatar
unknown committed
738
  error=0;
739
  thd_proc_info(thd, "update");
740 741 742
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
743 744
  if (duplic == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
745 746 747 748
  /*
    let's *try* to start bulk inserts. It won't necessarily
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
749 750 751 752
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
753 754 755 756
    So we call start_bulk_insert to perform necessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
757
#ifndef EMBEDDED_LIBRARY
758
  if (lock_type != TL_WRITE_DELAYED)
759 760 761 762
#endif /* EMBEDDED_LIBRARY */
  {
    if (duplic != DUP_ERROR || ignore)
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
Konstantin Osipov's avatar
Konstantin Osipov committed
763 764 765 766 767 768 769 770
    /**
      This is a simple check for the case when the table has a trigger
      that reads from it, or when the statement invokes a stored function
      that reads from the table being inserted to.
      Engines can't handle a bulk insert in parallel with a read from the
      same table in the same connection.
    */
    if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
771
      table->file->ha_start_bulk_insert(values_list.elements);
772
  }
773

unknown's avatar
unknown committed
774 775 776
  thd->abort_on_warning= (!ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES)));
unknown's avatar
unknown committed
777

unknown's avatar
unknown committed
778 779
  prepare_triggers_for_insert_stmt(table);

780

781 782 783 784
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

unknown's avatar
unknown committed
785
  while ((values= its++))
unknown's avatar
unknown committed
786 787 788
  {
    if (fields.elements || !value_count)
    {
789
      restore_record(table,s->default_values);	// Get empty record
unknown's avatar
unknown committed
790 791 792
      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
793
      {
794
	if (values_list.elements != 1 && ! thd->is_error())
unknown's avatar
unknown committed
795 796 797 798
	{
	  info.records++;
	  continue;
	}
unknown's avatar
unknown committed
799 800 801 802 803
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
unknown's avatar
unknown committed
804 805 806 807 808 809
	error=1;
	break;
      }
    }
    else
    {
810
      if (thd->used_tables)			// Column used in values()
811
	restore_record(table,s->default_values);	// Get empty record
812
      else
unknown's avatar
unknown committed
813
      {
814 815
        TABLE_SHARE *share= table->s;

unknown's avatar
unknown committed
816 817 818 819 820
        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
821 822 823 824 825 826 827 828
        table->record[0][0]= share->default_values[0];

        /* Fix undefined null_bits. */
        if (share->null_bytes > 1 && share->last_null_bit_pos)
        {
          table->record[0][share->null_bytes - 1]= 
            share->default_values[share->null_bytes - 1];
        }
unknown's avatar
unknown committed
829 830 831 832
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
833
      {
834
	if (values_list.elements != 1 && ! thd->is_error())
unknown's avatar
unknown committed
835 836 837 838 839 840 841 842
	{
	  info.records++;
	  continue;
	}
	error=1;
	break;
      }
    }
843

844 845 846
    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
847
					     ignore))) ==
unknown's avatar
unknown committed
848 849 850
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
851
    {
unknown's avatar
unknown committed
852 853
      error= 1;
      break;
unknown's avatar
unknown committed
854
    }
855
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
856
    if (lock_type == TL_WRITE_DELAYED)
unknown's avatar
unknown committed
857
    {
858
      LEX_STRING const st_query = { query, thd->query_length() };
859
      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
unknown's avatar
unknown committed
860 861 862
      query=0;
    }
    else
863
#endif
unknown's avatar
unknown committed
864
      error=write_record(thd, table ,&info);
865 866
    if (error)
      break;
Marc Alff's avatar
Marc Alff committed
867
    thd->warning_info->inc_current_row_for_warning();
unknown's avatar
unknown committed
868
  }
869

870 871 872
  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;

873 874 875 876
  /*
    Now all rows are inserted.  Time to update logs and sends response to
    user
  */
877
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
878 879
  if (lock_type == TL_WRITE_DELAYED)
  {
880 881 882 883 884
    if (!error)
    {
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
unknown's avatar
unknown committed
885 886
  }
  else
887
#endif
unknown's avatar
unknown committed
888
  {
889 890 891 892 893
    /*
      Do not do this release if this is a delayed insert, it would steal
      auto_inc values from the delayed_insert thread as they share TABLE.
    */
    table->file->ha_release_auto_increment();
Konstantin Osipov's avatar
Konstantin Osipov committed
894 895
    if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
        table->file->ha_end_bulk_insert() && !error)
896
    {
unknown's avatar
unknown committed
897 898
      table->file->print_error(my_errno,MYF(0));
      error=1;
899
    }
900 901 902
    if (duplic != DUP_ERROR || ignore)
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);

903
    transactional_table= table->file->has_transactions();
unknown's avatar
unknown committed
904

905
    if ((changed= (info.copied || info.deleted || info.updated)))
unknown's avatar
unknown committed
906
    {
907 908 909 910 911
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
912
      query_cache_invalidate3(thd, table_list, 1);
913
    }
914 915 916 917

    if (thd->transaction.stmt.modified_non_trans_table)
      thd->transaction.all.modified_non_trans_table= TRUE;

Staale Smedseng's avatar
Staale Smedseng committed
918 919 920
    if ((changed && error <= 0) ||
        thd->transaction.stmt.modified_non_trans_table ||
        was_insert_delayed)
921 922
    {
      if (mysql_bin_log.is_open())
923
      {
924
        int errcode= 0;
925
	if (error <= 0)
926
        {
927 928 929 930 931
	  /*
	    [Guilhem wrote] Temporary errors may have filled
	    thd->net.last_error/errno.  For example if there has
	    been a disk full error when writing the row, and it was
	    MyISAM, then thd->net.last_error/errno will be set to
Marc Alff's avatar
Marc Alff committed
932
            "disk full"... and the mysql_file_pwrite() will wait until free
933 934 935 936 937 938
	    space appears, and so when it finishes then the
	    write_row() was entirely successful
	  */
	  /* todo: consider removing */
	  thd->clear_error();
	}
939 940 941
        else
          errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
        
942 943 944 945 946 947 948 949 950 951 952 953 954 955
	/* bug#22725:

	A query which per-row-loop can not be interrupted with
	KILLED, like INSERT, and that does not invoke stored
	routines can be binlogged with neglecting the KILLED error.
        
	If there was no error (error == zero) until after the end of
	inserting loop the KILLED flag that appeared later can be
	disregarded since previously possible invocation of stored
	routines did not result in any error due to the KILLED.  In
	such case the flag is ignored for constructing binlog event.
	*/
	DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
	if (thd->binlog_query(THD::ROW_QUERY_TYPE,
956
			      thd->query(), thd->query_length(),
957 958
			      transactional_table, FALSE, FALSE,
                              errcode))
959
        {
960
	  error= 1;
961
	}
962
      }
unknown's avatar
unknown committed
963
    }
unknown's avatar
unknown committed
964 965
    DBUG_ASSERT(transactional_table || !changed || 
                thd->transaction.stmt.modified_non_trans_table);
unknown's avatar
unknown committed
966
  }
967
  thd_proc_info(thd, "end");
968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983
  /*
    We'll report to the client this id:
    - if the table contains an autoincrement column and we successfully
    inserted an autogenerated value, the autogenerated value.
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
    called, X.
    - if the table contains an autoincrement column, and some rows were
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
    have been really inserted but ignored).
  */
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     ((table->next_number_field && info.copied) ?
     table->next_number_field->val_int() : 0));
unknown's avatar
unknown committed
984
  table->next_number_field=0;
985
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
986
  table->auto_increment_field_not_null= FALSE;
987 988 989
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
990

unknown's avatar
unknown committed
991 992
  if (error)
    goto abort;
993
  if (values_list.elements == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
unknown's avatar
unknown committed
994
				    !thd->cuted_fields))
995
  {
996 997 998
    thd->row_count_func= info.copied + info.deleted +
                         ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                          info.touched : info.updated);
999
    my_ok(thd, (ulong) thd->row_count_func, id);
1000
  }
1001 1002
  else
  {
unknown's avatar
unknown committed
1003
    char buff[160];
1004 1005
    ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                     info.touched : info.updated);
1006
    if (ignore)
1007 1008
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
Marc Alff's avatar
Marc Alff committed
1009 1010
	      (ulong) (info.records - info.copied),
              (ulong) thd->warning_info->statement_warn_count());
unknown's avatar
unknown committed
1011
    else
1012
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
Marc Alff's avatar
Marc Alff committed
1013 1014
	      (ulong) (info.deleted + updated),
              (ulong) thd->warning_info->statement_warn_count());
1015
    thd->row_count_func= info.copied + info.deleted + updated;
1016
    ::my_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
1017
  }
1018
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
1019
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
1020 1021

abort:
1022
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
1023 1024
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
1025
#endif
1026
  if (table != NULL)
1027
    table->file->ha_release_auto_increment();
1028 1029
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
unknown's avatar
unknown committed
1030
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
1031
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
1032 1033 1034
}


unknown's avatar
VIEW  
unknown committed
1035 1036 1037 1038 1039
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
1040
    thd     - thread handler
unknown's avatar
VIEW  
unknown committed
1041 1042
    view    - reference on VIEW

1043 1044 1045 1046 1047 1048
  IMPLEMENTATION
    A view is insertable if the folloings are true:
    - All columns in the view are columns from a table
    - All not used columns in table have a default values
    - All field in view are unique (not referring to the same column)

unknown's avatar
VIEW  
unknown committed
1049 1050
  RETURN
    FALSE - OK
1051 1052 1053
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

unknown's avatar
VIEW  
unknown committed
1054 1055 1056
    TRUE  - can't be used for insert
*/

1057
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
unknown's avatar
VIEW  
unknown committed
1058
{
1059
  uint num= view->view->select_lex.item_list.elements;
unknown's avatar
VIEW  
unknown committed
1060
  TABLE *table= view->table;
1061 1062 1063
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
unknown's avatar
unknown committed
1064
  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
unknown's avatar
unknown committed
1065
  uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
1066
  MY_BITMAP used_fields;
unknown's avatar
unknown committed
1067
  enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
1068 1069
  DBUG_ENTER("check_key_in_view");

1070 1071 1072
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

unknown's avatar
VIEW  
unknown committed
1073 1074
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

Konstantin Osipov's avatar
Konstantin Osipov committed
1075
  (void) bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
1076 1077
  bitmap_clear_all(&used_fields);

unknown's avatar
VIEW  
unknown committed
1078
  view->contain_auto_increment= 0;
1079 1080 1081 1082
  /* 
    we must not set query_id for fields as they're not 
    really used in this context
  */
unknown's avatar
unknown committed
1083
  thd->mark_used_columns= MARK_COLUMNS_NONE;
unknown's avatar
VIEW  
unknown committed
1084
  /* check simplicity and prepare unique test of view */
1085
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
1086
  {
1087
    if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
1088
    {
unknown's avatar
unknown committed
1089
      thd->mark_used_columns= save_mark_used_columns;
1090 1091
      DBUG_RETURN(TRUE);
    }
1092
    Item_field *field;
unknown's avatar
VIEW  
unknown committed
1093
    /* simple SELECT list entry (field without expression) */
unknown's avatar
merge  
unknown committed
1094
    if (!(field= trans->item->filed_for_view_update()))
1095
    {
unknown's avatar
unknown committed
1096
      thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
1097
      DBUG_RETURN(TRUE);
1098
    }
1099
    if (field->field->unireg_check == Field::NEXT_NUMBER)
unknown's avatar
VIEW  
unknown committed
1100 1101
      view->contain_auto_increment= 1;
    /* prepare unique test */
unknown's avatar
unknown committed
1102 1103 1104 1105 1106
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
unknown's avatar
VIEW  
unknown committed
1107
  }
unknown's avatar
unknown committed
1108
  thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
1109
  /* unique test */
1110
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
1111
  {
1112
    /* Thanks to test above, we know that all columns are of type Item_field */
1113
    Item_field *field= (Item_field *)trans->item;
1114 1115 1116
    /* check fields belong to table in which we are inserting */
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
unknown's avatar
VIEW  
unknown committed
1117 1118 1119 1120 1121 1122 1123
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


unknown's avatar
unknown committed
1124
/*
1125
  Check if table can be updated
unknown's avatar
unknown committed
1126 1127

  SYNOPSIS
1128 1129
     mysql_prepare_insert_check_table()
     thd		Thread handle
unknown's avatar
unknown committed
1130
     table_list		Table list
1131 1132
     fields		List of fields to be updated
     where		Pointer to where clause
unknown's avatar
unknown committed
1133
     select_insert      Check is making for SELECT ... INSERT
1134 1135

   RETURN
unknown's avatar
unknown committed
1136 1137
     FALSE ok
     TRUE  ERROR
unknown's avatar
unknown committed
1138
*/
1139

unknown's avatar
unknown committed
1140
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1141
                                             List<Item> &fields,
unknown's avatar
unknown committed
1142
                                             bool select_insert)
unknown's avatar
unknown committed
1143
{
unknown's avatar
VIEW  
unknown committed
1144
  bool insert_into_view= (table_list->view != 0);
1145
  DBUG_ENTER("mysql_prepare_insert_check_table");
unknown's avatar
VIEW  
unknown committed
1146

1147 1148 1149 1150 1151 1152 1153
  /*
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
     all others require SELECT_ACL only. the ACL requirement below is for
     new leaves only anyway (view-constituents), so check for SELECT rather
     than INSERT.
  */

1154 1155
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
                                    &thd->lex->select_lex.top_join_list,
1156
                                    table_list,
1157
                                    &thd->lex->select_lex.leaf_tables,
1158
                                    select_insert, INSERT_ACL, SELECT_ACL))
unknown's avatar
unknown committed
1159
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
1160 1161 1162 1163

  if (insert_into_view && !fields.elements)
  {
    thd->lex->empty_field_list_on_rset= 1;
1164 1165 1166 1167
    if (!table_list->table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
unknown's avatar
unknown committed
1168
      DBUG_RETURN(TRUE);
1169
    }
1170
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
unknown's avatar
VIEW  
unknown committed
1171 1172
  }

unknown's avatar
unknown committed
1173
  DBUG_RETURN(FALSE);
1174 1175 1176
}


1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189
/*
  Get extra info for tables we insert into

  @param table     table (TABLE object) we insert into,
                   might be NULL in case of view
  @param tables    table (TABLE_LIST object) or view we insert into;
                   when 'table' is NULL this must be a view, and the
                   function recurses into each of its view_tables
*/

static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
{
  if (!table)
  {
    /* No TABLE object: descend into the tables underlying the view. */
    DBUG_ASSERT(tables->view);
    List_iterator<TABLE_LIST> view_it(*tables->view_tables);
    for (TABLE_LIST *underlying= view_it++;
         underlying;
         underlying= view_it++)
      prepare_for_positional_update(underlying->table, underlying);
    return;
  }

  /* Tables locked for a DELAYED write are left untouched here. */
  if (table->reginfo.lock_type != TL_WRITE_DELAYED)
    table->prepare_for_position();
}


1204 1205 1206 1207 1208 1209 1210
/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
    thd			Thread handler
    table_list	        Global/local table list
unknown's avatar
unknown committed
1211 1212
    table		Table to insert into (can be NULL if table should
			be taken from table_list->table)    
unknown's avatar
unknown committed
1213 1214
    where		Where clause (for insert ... select)
    select_insert	TRUE if INSERT ... SELECT statement
unknown's avatar
unknown committed
1215 1216 1217 1218
    check_fields        TRUE if need to check that all INSERT fields are 
                        given values.
    abort_on_warning    whether to report if some INSERT field is not 
                        assigned as an error (TRUE) or as a warning (FALSE).
1219

unknown's avatar
unknown committed
1220 1221 1222 1223 1224
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
unknown's avatar
unknown committed
1225

1226 1227 1228
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
unknown's avatar
unknown committed
1229
  
1230
  RETURN VALUE
unknown's avatar
unknown committed
1231 1232
    FALSE OK
    TRUE  error
1233 1234
*/

unknown's avatar
unknown committed
1235
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
unknown's avatar
unknown committed
1236
                          TABLE *table, List<Item> &fields, List_item *values,
unknown's avatar
unknown committed
1237
                          List<Item> &update_fields, List<Item> &update_values,
unknown's avatar
unknown committed
1238
                          enum_duplicates duplic,
unknown's avatar
unknown committed
1239 1240
                          COND **where, bool select_insert,
                          bool check_fields, bool abort_on_warning)
1241
{
unknown's avatar
unknown committed
1242
  SELECT_LEX *select_lex= &thd->lex->select_lex;
unknown's avatar
unknown committed
1243
  Name_resolution_context *context= &select_lex->context;
1244
  Name_resolution_context_state ctx_state;
1245
  bool insert_into_view= (table_list->view != 0);
unknown's avatar
unknown committed
1246
  bool res= 0;
1247
  table_map map= 0;
1248
  DBUG_ENTER("mysql_prepare_insert");
unknown's avatar
unknown committed
1249 1250 1251
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
		       (ulong)table_list, (ulong)table,
		       (int)insert_into_view));
1252 1253
  /* INSERT should have a SELECT or VALUES clause */
  DBUG_ASSERT (!select_insert || !values);
unknown's avatar
unknown committed
1254

1255 1256 1257 1258 1259 1260 1261
  /*
    For subqueries in VALUES() we should not see the table in which we are
    inserting (for INSERT ... SELECT this is done by changing table_list,
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
  */
  if (!select_insert)
  {
unknown's avatar
unknown committed
1262
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274
         un;
         un= un->next_unit())
    {
      for (SELECT_LEX *sl= un->first_select();
           sl;
           sl= sl->next_select())
      {
        sl->context.outer_context= 0;
      }
    }
  }

unknown's avatar
unknown committed
1275
  if (duplic == DUP_UPDATE)
unknown's avatar
unknown committed
1276 1277
  {
    /* it should be allocated before Item::fix_fields() */
unknown's avatar
unknown committed
1278
    if (table_list->set_insert_values(thd->mem_root))
unknown's avatar
unknown committed
1279
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
1280
  }
unknown's avatar
unknown committed
1281

1282
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
unknown's avatar
unknown committed
1283
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
1284

unknown's avatar
unknown committed
1285 1286

  /* Prepare the fields in the statement. */
1287
  if (values)
unknown's avatar
unknown committed
1288
  {
1289 1290 1291 1292 1293 1294
    /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
    DBUG_ASSERT (!select_lex->group_list.elements);

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

1295
    /*
1296 1297 1298 1299 1300 1301
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
     */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

1302 1303 1304
    res= (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0) ||
          check_insert_fields(thd, context->table_list, fields, *values,
                              !insert_into_view, 0, &map));
unknown's avatar
unknown committed
1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316

    if (!res && check_fields)
    {
      bool saved_abort_on_warning= thd->abort_on_warning;
      thd->abort_on_warning= abort_on_warning;
      res= check_that_all_fields_are_given_values(thd, 
                                                  table ? table : 
                                                  context->table_list->table,
                                                  context->table_list);
      thd->abort_on_warning= saved_abort_on_warning;
    }

1317 1318 1319
   if (!res)
     res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);

unknown's avatar
unknown committed
1320
    if (!res && duplic == DUP_UPDATE)
unknown's avatar
unknown committed
1321
    {
1322
      select_lex->no_wrap_view_item= TRUE;
1323 1324
      res= check_update_fields(thd, context->table_list, update_fields,
                               update_values, &map);
1325
      select_lex->no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
1326
    }
1327 1328 1329

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
1330
  }
unknown's avatar
unknown committed
1331

unknown's avatar
unknown committed
1332 1333
  if (res)
    DBUG_RETURN(res);
unknown's avatar
VIEW  
unknown committed
1334

unknown's avatar
unknown committed
1335 1336 1337
  if (!table)
    table= table_list->table;

1338
  if (!select_insert)
unknown's avatar
unknown committed
1339
  {
1340
    Item *fake_conds= 0;
1341
    TABLE_LIST *duplicate;
1342
    if ((duplicate= unique_table(thd, table_list, table_list->next_global, 1)))
1343
    {
1344
      update_non_unique_table_error(table_list, "INSERT", duplicate);
1345 1346
      DBUG_RETURN(TRUE);
    }
1347
    select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
unknown's avatar
unknown committed
1348
    select_lex->first_execution= 0;
unknown's avatar
unknown committed
1349
  }
1350
  /*
1351
    Only call prepare_for_posistion() if we are not performing a DELAYED
1352 1353
    operation. It will instead be executed by delayed insert thread.
  */
1354 1355
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
    prepare_for_positional_update(table, table_list);
unknown's avatar
unknown committed
1356
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
1357 1358 1359
}


unknown's avatar
unknown committed
1360 1361 1362 1363
	/* Check that there are no more unique keys after the given key */

static int last_uniq_key(TABLE *table,uint keynr)
{
  /*
    A storage engine that sets HA_DUPLICATE_KEY_NOT_IN_ORDER does not
    report unique key conflicts in ascending key order, so the key number
    cannot tell us which conflict is the last one.

    The "last conflict" information is what allows a replace of the new
    row on top of the conflicting row, instead of a delete (of the old
    row) followed by an insert (of the new row).

    For such engines always answer 0, which makes the caller fall back to
    delete + insert.
  */
  const bool conflicts_unordered=
    (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER) != 0;
  if (conflicts_unordered)
    return 0;

  /* Any unique (HA_NOSAME) key after 'keynr' means it is not the last. */
  for (uint next_key= keynr + 1; next_key < table->s->keys; next_key++)
  {
    if (table->key_info[next_key].flags & HA_NOSAME)
      return 0;
  }
  return 1;
}


/*
unknown's avatar
unknown committed
1389 1390 1391 1392 1393 1394 1395 1396 1397 1398
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.
unknown's avatar
unknown committed
1399

unknown's avatar
unknown committed
1400 1401 1402 1403 1404
  NOTE
    Once this record will be written to table after insert trigger will
    be invoked. If instead of inserting new record we will update old one
    then both on update triggers will work instead. Similarly both on
    delete triggers will be invoked if we will delete conflicting records.
unknown's avatar
unknown committed
1405

unknown's avatar
unknown committed
1406
    Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have
unknown's avatar
unknown committed
1407 1408 1409 1410 1411
    transactions.

  RETURN VALUE
    0     - success
    non-0 - error
unknown's avatar
unknown committed
1412 1413 1414
*/


unknown's avatar
unknown committed
1415
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
unknown's avatar
unknown committed
1416
{
unknown's avatar
unknown committed
1417
  int error, trg_error= 0;
unknown's avatar
unknown committed
1418
  char *key=0;
1419
  MY_BITMAP *save_read_set, *save_write_set;
1420 1421
  ulonglong prev_insert_id= table->file->next_insert_id;
  ulonglong insert_id_for_cur_row= 0;
1422
  DBUG_ENTER("write_record");
unknown's avatar
unknown committed
1423

unknown's avatar
unknown committed
1424
  info->records++;
1425 1426
  save_read_set=  table->read_set;
  save_write_set= table->write_set;
1427

1428 1429
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1430
  {
1431
    while ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1432
    {
1433
      uint key_nr;
1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444
      /*
        If we do more than one iteration of this loop, from the second one the
        row will have an explicit value in the autoinc field, which was set at
        the first call of handler::update_auto_increment(). So we must save
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
        0.
      */
      if (table->file->insert_id_for_cur_row > 0)
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
      else
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1445
      bool is_duplicate_key_error;
1446
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1447
	goto err;
1448
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
unknown's avatar
unknown committed
1449 1450
      if (!is_duplicate_key_error)
      {
1451 1452 1453 1454 1455
        /*
          We come here when we had an ignorable error which is not a duplicate
          key error. In this we ignore error if ignore flag is set, otherwise
          report error as usual. We will not do any duplicate key processing.
        */
unknown's avatar
unknown committed
1456
        if (info->ignore)
1457 1458
          goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
        goto err;
unknown's avatar
unknown committed
1459
      }
unknown's avatar
unknown committed
1460 1461
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
1462
	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
unknown's avatar
unknown committed
1463 1464
	goto err;
      }
1465 1466
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
1467 1468 1469 1470 1471
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
1472 1473
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
1474
          key_nr == table->s->next_number_index &&
1475
	  (insert_id_for_cur_row > 0))
1476
	goto err;
1477
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
unknown's avatar
unknown committed
1478
      {
1479
	if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
unknown's avatar
unknown committed
1480 1481 1482 1483
	  goto err;
      }
      else
      {
1484
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
unknown's avatar
unknown committed
1485 1486 1487 1488
	{
	  error=my_errno;
	  goto err;
	}
unknown's avatar
unknown committed
1489

unknown's avatar
unknown committed
1490 1491
	if (!key)
	{
1492
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
unknown's avatar
unknown committed
1493 1494 1495 1496 1497 1498
					   MAX_KEY_LENGTH)))
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
1499
	key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1500 1501 1502
	if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
                                                    (uchar*) key, HA_WHOLE_KEY,
                                                    HA_READ_KEY_EXACT))))
unknown's avatar
unknown committed
1503 1504
	  goto err;
      }
1505
      if (info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1506
      {
unknown's avatar
unknown committed
1507
        int res= 0;
unknown's avatar
unknown committed
1508 1509 1510 1511
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
1512
        */
unknown's avatar
unknown committed
1513
	DBUG_ASSERT(table->insert_values != NULL);
unknown's avatar
unknown committed
1514 1515
        store_record(table,insert_values);
        restore_record(table,record[1]);
unknown's avatar
unknown committed
1516 1517
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
unknown's avatar
unknown committed
1518
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
unknown's avatar
unknown committed
1519 1520
                                                 *info->update_values,
                                                 info->ignore,
unknown's avatar
unknown committed
1521 1522 1523
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
1524 1525

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
unknown's avatar
unknown committed
1526 1527 1528
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
unknown's avatar
unknown committed
1529
          goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1530
        if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
1531
          goto before_trg_err;
1532

1533
        table->file->restore_auto_increment(prev_insert_id);
1534 1535 1536 1537
        if (table->next_number_field)
          table->file->adjust_next_insert_id_after_explicit_value(
            table->next_number_field->val_int());
        info->touched++;
1538 1539
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
             !bitmap_is_subset(table->write_set, table->read_set)) ||
unknown's avatar
unknown committed
1540
            compare_record(table))
1541
        {
1542
          if ((error=table->file->ha_update_row(table->record[1],
1543 1544
                                                table->record[0])) &&
              error != HA_ERR_RECORD_IS_THE_SAME)
1545
          {
1546 1547
            if (info->ignore &&
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1548 1549 1550 1551
            {
              goto ok_or_after_trg_err;
            }
            goto err;
1552
          }
unknown's avatar
unknown committed
1553

1554 1555 1556 1557
          if (error != HA_ERR_RECORD_IS_THE_SAME)
            info->updated++;
          else
            error= 0;
unknown's avatar
unknown committed
1558 1559 1560 1561 1562 1563 1564 1565
          /*
            If ON DUP KEY UPDATE updates a row instead of inserting one, it's
            like a regular UPDATE statement: it should not affect the value of a
            next SELECT LAST_INSERT_ID() or mysql_insert_id().
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
            handled separately by THD::arg_of_last_insert_id_function.
          */
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
unknown's avatar
unknown committed
1566 1567
          trg_error= (table->triggers &&
                      table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
unknown's avatar
unknown committed
1568
                                                        TRG_ACTION_AFTER, TRUE));
1569 1570
          info->copied++;
        }
1571

1572 1573 1574 1575 1576
        if (table->next_number_field)
          table->file->adjust_next_insert_id_after_explicit_value(
            table->next_number_field->val_int());
        info->touched++;

unknown's avatar
unknown committed
1577
        goto ok_or_after_trg_err;
1578 1579 1580
      }
      else /* DUP_REPLACE */
      {
unknown's avatar
unknown committed
1581 1582 1583 1584 1585
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
1586 1587
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
1588 1589 1590 1591 1592 1593
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
unknown's avatar
unknown committed
1594 1595
	*/
	if (last_uniq_key(table,key_nr) &&
1596
	    !table->file->referenced_by_foreign_key() &&
1597
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1598 1599
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
1600
        {
1601
          if ((error=table->file->ha_update_row(table->record[1],
1602 1603
					        table->record[0])) &&
              error != HA_ERR_RECORD_IS_THE_SAME)
1604
            goto err;
1605 1606 1607 1608
          if (error != HA_ERR_RECORD_IS_THE_SAME)
            info->deleted++;
          else
            error= 0;
1609
          thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1610 1611 1612 1613 1614
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
unknown's avatar
unknown committed
1615 1616 1617 1618 1619 1620 1621
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
1622
          if ((error=table->file->ha_delete_row(table->record[1])))
unknown's avatar
unknown committed
1623 1624 1625
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
unknown's avatar
unknown committed
1626
            thd->transaction.stmt.modified_non_trans_table= TRUE;
unknown's avatar
unknown committed
1627 1628 1629 1630 1631 1632 1633 1634
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
1635
        }
unknown's avatar
unknown committed
1636 1637
      }
    }
1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648
    
    /*
        If more than one iteration of the above while loop is done, from the second 
        one the row being inserted will have an explicit value in the autoinc field, 
        which was set at the first call of handler::update_auto_increment(). This 
        value is saved to avoid thd->insert_id_for_cur_row becoming 0. Use this saved
        autoinc value.
     */
    if (table->file->insert_id_for_cur_row == 0)
      table->file->insert_id_for_cur_row= insert_id_for_cur_row;
      
1649
    thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1650 1651 1652 1653 1654 1655 1656
    /*
      Restore column maps if they where replaced during an duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
unknown's avatar
unknown committed
1657
  }
1658
  else if ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1659
  {
1660
    if (!info->ignore ||
1661
        table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1662
      goto err;
1663
    table->file->restore_auto_increment(prev_insert_id);
1664
    goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1665
  }
1666 1667 1668

after_trg_n_copied_inc:
  info->copied++;
1669
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1670 1671 1672
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
unknown's avatar
unknown committed
1673 1674

ok_or_after_trg_err:
unknown's avatar
unknown committed
1675
  if (key)
1676
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
unknown's avatar
unknown committed
1677
  if (!table->file->has_transactions())
unknown's avatar
unknown committed
1678
    thd->transaction.stmt.modified_non_trans_table= TRUE;
unknown's avatar
unknown committed
1679
  DBUG_RETURN(trg_error);
unknown's avatar
unknown committed
1680 1681

err:
1682
  info->last_errno= error;
1683 1684 1685
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;        // Give error
unknown's avatar
unknown committed
1686
  table->file->print_error(error,MYF(0));
1687
  
unknown's avatar
unknown committed
1688
before_trg_err:
1689
  table->file->restore_auto_increment(prev_insert_id);
unknown's avatar
unknown committed
1690 1691
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1692
  table->column_bitmaps_set(save_read_set, save_write_set);
1693
  DBUG_RETURN(1);
unknown's avatar
unknown committed
1694 1695 1696 1697
}


/******************************************************************************
unknown's avatar
unknown committed
1698
  Check that all fields with arn't null_fields are used
unknown's avatar
unknown committed
1699 1700
******************************************************************************/

1701 1702
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
unknown's avatar
unknown committed
1703
{
1704
  int err= 0;
1705 1706
  MY_BITMAP *write_set= entry->write_set;

unknown's avatar
unknown committed
1707 1708
  for (Field **field=entry->field ; *field ; field++)
  {
1709
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
1710
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1711
        ((*field)->real_type() != MYSQL_TYPE_ENUM))
unknown's avatar
unknown committed
1712
    {
1713 1714 1715
      bool view= FALSE;
      if (table_list)
      {
unknown's avatar
unknown committed
1716
        table_list= table_list->top_table();
unknown's avatar
unknown committed
1717
        view= test(table_list->view);
1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
1734
      err= 1;
unknown's avatar
unknown committed
1735 1736
    }
  }
1737
  return thd->abort_on_warning ? err : 0;
unknown's avatar
unknown committed
1738 1739 1740
}

/*****************************************************************************
unknown's avatar
unknown committed
1741 1742
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
unknown's avatar
unknown committed
1743 1744
*****************************************************************************/

1745 1746
#ifndef EMBEDDED_LIBRARY

unknown's avatar
unknown committed
1747 1748
class delayed_row :public ilink {
public:
1749
  char *record;
unknown's avatar
unknown committed
1750 1751
  enum_duplicates dup;
  time_t start_time;
1752 1753
  ulong sql_mode;
  bool auto_increment_field_not_null;
1754 1755 1756
  bool query_start_used, ignore, log_query;
  bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  ulonglong first_successful_insert_id_in_prev_stmt;
1757
  ulonglong forced_insert_id;
1758 1759
  ulong auto_increment_increment;
  ulong auto_increment_offset;
1760
  timestamp_auto_set_type timestamp_field_type;
1761
  LEX_STRING query;
1762
  Time_zone *time_zone;
unknown's avatar
unknown committed
1763

1764 1765 1766
  delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
              bool ignore_arg, bool log_query_arg)
    : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
1767
      forced_insert_id(0), query(query_arg), time_zone(0)
1768
    {}
unknown's avatar
unknown committed
1769 1770
  ~delayed_row()
  {
1771
    x_free(query.str);
unknown's avatar
unknown committed
1772 1773 1774 1775
    x_free(record);
  }
};

1776
/**
1777
  Delayed_insert - context of a thread responsible for delayed insert
1778 1779 1780 1781
  into one table. When processing delayed inserts, we create an own
  thread for every distinct table. Later on all delayed inserts directed
  into that table are handled by a dedicated thread.
*/
unknown's avatar
unknown committed
1782

1783
class Delayed_insert :public ilink {
unknown's avatar
unknown committed
1784
  uint locks_in_memory;
1785
  thr_lock_type delayed_lock;
unknown's avatar
unknown committed
1786 1787 1788
public:
  THD thd;
  TABLE *table;
Marc Alff's avatar
Marc Alff committed
1789 1790
  mysql_mutex_t mutex;
  mysql_cond_t cond, cond_client;
unknown's avatar
unknown committed
1791
  volatile uint tables_in_use,stacked_inserts;
1792
  volatile bool status;
unknown's avatar
unknown committed
1793 1794
  COPY_INFO info;
  I_List<delayed_row> rows;
1795
  ulong group_count;
unknown's avatar
unknown committed
1796
  TABLE_LIST table_list;			// Argument
unknown's avatar
unknown committed
1797

1798
  Delayed_insert()
1799
    :locks_in_memory(0),
1800
     table(0),tables_in_use(0),stacked_inserts(0), status(0), group_count(0)
unknown's avatar
unknown committed
1801
  {
1802
    DBUG_ENTER("Delayed_insert constructor");
1803 1804
    thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
unknown's avatar
unknown committed
1805 1806 1807
    thd.current_tablenr=0;
    thd.version=refresh_version;
    thd.command=COM_DELAYED_INSERT;
1808 1809
    thd.lex->current_select= 0; 		// for my_message_sql
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
1810
    /*
1811 1812 1813 1814 1815 1816 1817
      Statement-based replication of INSERT DELAYED has problems with
      RAND() and user variables, so in mixed mode we go to row-based.
      For normal commands, the unsafe flag is set at parse time.
      However, since the flag is a member of the THD object, of which
      the delayed_insert thread has its own copy, we must set the
      statement to unsafe here and explicitly set row logging mode.

1818
      @todo set_current_stmt_binlog_format_row_if_mixed should not be
1819 1820 1821 1822
      called by anything else than thd->decide_logging_format().  When
      we call set_current_blah here, none of the checks in
      decide_logging_format is made.  We should probably call
      thd->decide_logging_format() directly instead.  /Sven
1823
    */
1824
    thd.lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED);
1825
    thd.set_current_stmt_binlog_format_row_if_mixed();
1826 1827 1828 1829 1830 1831
    /*
      Prevent changes to global.lock_wait_timeout from affecting
      delayed insert threads as any timeouts in delayed inserts
      are not communicated to the client.
    */
    thd.variables.lock_wait_timeout= LONG_TIMEOUT;
unknown's avatar
unknown committed
1832

1833 1834
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
1835
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1836
    thd.security_ctx->host_or_ip= "";
unknown's avatar
unknown committed
1837
    bzero((char*) &info,sizeof(info));
Marc Alff's avatar
Marc Alff committed
1838 1839 1840
    mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
    mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
Marc Alff's avatar
Marc Alff committed
1841
    mysql_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
1842
    delayed_insert_threads++;
1843 1844
    delayed_lock= global_system_variables.low_priority_updates ?
                                          TL_WRITE_LOW_PRIORITY : TL_WRITE;
Marc Alff's avatar
Marc Alff committed
1845
    mysql_mutex_unlock(&LOCK_thread_count);
1846
    DBUG_VOID_RETURN;
unknown's avatar
unknown committed
1847
  }
1848
  ~Delayed_insert()
unknown's avatar
unknown committed
1849
  {
1850
    /* The following is not really needed, but just for safety */
unknown's avatar
unknown committed
1851 1852 1853 1854 1855
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
      close_thread_tables(&thd);
Marc Alff's avatar
Marc Alff committed
1856
    mysql_mutex_lock(&LOCK_thread_count);
Marc Alff's avatar
Marc Alff committed
1857 1858 1859
    mysql_mutex_destroy(&mutex);
    mysql_cond_destroy(&cond);
    mysql_cond_destroy(&cond_client);
unknown's avatar
unknown committed
1860
    thd.unlink();				// Must be unlinked under lock
1861
    x_free(thd.query());
1862
    thd.security_ctx->user= thd.security_ctx->host=0;
unknown's avatar
unknown committed
1863 1864
    thread_count--;
    delayed_insert_threads--;
Marc Alff's avatar
Marc Alff committed
1865 1866
    mysql_mutex_unlock(&LOCK_thread_count);
    mysql_cond_broadcast(&COND_thread_count); /* Tell main we are ready */
unknown's avatar
unknown committed
1867 1868 1869 1870 1871 1872 1873 1874 1875
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
Marc Alff's avatar
Marc Alff committed
1876
    mysql_mutex_lock(&LOCK_delayed_insert);
unknown's avatar
unknown committed
1877 1878
    if (!--locks_in_memory)
    {
Marc Alff's avatar
Marc Alff committed
1879
      mysql_mutex_lock(&mutex);
unknown's avatar
unknown committed
1880 1881
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
Marc Alff's avatar
Marc Alff committed
1882
        mysql_cond_signal(&cond);
unknown's avatar
unknown committed
1883 1884
	status=1;
      }
Marc Alff's avatar
Marc Alff committed
1885
      mysql_mutex_unlock(&mutex);
unknown's avatar
unknown committed
1886
    }
Marc Alff's avatar
Marc Alff committed
1887
    mysql_mutex_unlock(&LOCK_delayed_insert);
unknown's avatar
unknown committed
1888 1889 1890 1891
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
Konstantin Osipov's avatar
Konstantin Osipov committed
1892
  bool open_and_lock_table();
unknown's avatar
unknown committed
1893 1894 1895 1896
  bool handle_inserts(void);
};


1897
/* List of delayed insert handlers; find_handler() walks it under LOCK_delayed_insert. */
I_List<Delayed_insert> delayed_threads;
unknown's avatar
unknown committed
1898 1899


1900 1901 1902 1903
/**
  Return an instance of delayed insert thread that can handle
  inserts into a given table, if it exists. Otherwise return NULL.
*/
unknown's avatar
unknown committed
1904

1905
static
1906
Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
unknown's avatar
unknown committed
1907
{
1908
  thd_proc_info(thd, "waiting for delay_list");
Marc Alff's avatar
Marc Alff committed
1909
  mysql_mutex_lock(&LOCK_delayed_insert);       // Protect master list
1910
  I_List_iterator<Delayed_insert> it(delayed_threads);
1911 1912
  Delayed_insert *di;
  while ((di= it++))
unknown's avatar
unknown committed
1913
  {
1914 1915
    if (!strcmp(table_list->db, di->table_list.db) &&
	!strcmp(table_list->table_name, di->table_list.table_name))
unknown's avatar
unknown committed
1916
    {
1917
      di->lock();
unknown's avatar
unknown committed
1918 1919 1920
      break;
    }
  }
Marc Alff's avatar
Marc Alff committed
1921
  mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
1922
  return di;
unknown's avatar
unknown committed
1923 1924 1925
}


1926 1927 1928 1929
/**
  Attempt to find or create a delayed insert thread to handle inserts
  into this table.

unknown's avatar
unknown committed
1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947
  @return In case of success, table_list->table points to a local copy
          of the delayed table or is set to NULL, which indicates a
          request for lock upgrade. In case of failure, value of
          table_list->table is undefined.
  @retval TRUE  - this thread ran out of resources OR
                - a newly created delayed insert thread ran out of
                  resources OR
                - the created thread failed to open and lock the table
                  (e.g. because it does not exist) OR
                - the table opened in the created thread turned out to
                  be a view
  @retval FALSE - table successfully opened OR
                - too many delayed insert threads OR
                - the table has triggers and we have to fall back to
                  a normal INSERT
                Two latter cases indicate a request for lock upgrade.

  XXX: why do we regard INSERT DELAYED into a view as an error and
1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965
  do not simply perform a lock upgrade?

  TODO: The approach with using two mutexes to work with the
  delayed thread list -- LOCK_delayed_insert and
  LOCK_delayed_create -- is redundant, and we only need one of
  them to protect the list.  The reason we have two locks is that
  we do not want to block look-ups in the list while we're waiting
  for the newly created thread to open the delayed table. However,
  this wait itself is redundant -- we always call get_local_table
  later on, and there wait again until the created thread acquires
  a table lock.

  The concept of locks_in_memory is redundant as well, since we
  already have another counter with similar semantics, tables_in_use:
  both count the number of producers for a given consumer (delayed
  insert thread), only at different stages of the producer-consumer
  relationship.

1966 1967
  The 'status' variable in Delayed_insert is redundant
  too, since there is already di->stacked_inserts.
1968 1969
*/

unknown's avatar
unknown committed
1970 1971
static
bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
unknown's avatar
unknown committed
1972 1973
{
  int error;
1974
  Delayed_insert *di;
unknown's avatar
unknown committed
1975 1976
  DBUG_ENTER("delayed_get_table");

unknown's avatar
unknown committed
1977 1978
  /* Must be set in the parser */
  DBUG_ASSERT(table_list->db);
unknown's avatar
unknown committed
1979

1980
  /* Find the thread which handles this table. */
1981
  if (!(di= find_handler(thd, table_list)))
unknown's avatar
unknown committed
1982
  {
1983 1984 1985 1986
    /*
      No match. Create a new thread to handle the table, but
      no more than max_insert_delayed_threads.
    */
1987
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
1988
      DBUG_RETURN(0);
1989
    thd_proc_info(thd, "Creating delayed handler");
Marc Alff's avatar
Marc Alff committed
1990
    mysql_mutex_lock(&LOCK_delayed_create);
1991 1992 1993 1994
    /*
      The first search above was done without LOCK_delayed_create.
      Another thread might have created the handler in between. Search again.
    */
1995
    if (! (di= find_handler(thd, table_list)))
unknown's avatar
unknown committed
1996
    {
1997
      if (!(di= new Delayed_insert()))
unknown's avatar
unknown committed
1998
        goto end_create;
Marc Alff's avatar
Marc Alff committed
1999
      mysql_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
2000
      thread_count++;
Marc Alff's avatar
Marc Alff committed
2001
      mysql_mutex_unlock(&LOCK_thread_count);
2002
      di->thd.set_db(table_list->db, (uint) strlen(table_list->db));
2003 2004
      di->thd.set_query(my_strdup(table_list->table_name,
                                  MYF(MY_WME | ME_FATALERROR)), 0);
2005
      if (di->thd.db == NULL || di->thd.query() == NULL)
unknown's avatar
unknown committed
2006
      {
unknown's avatar
unknown committed
2007
        /* The error is reported */
2008
	delete di;
unknown's avatar
unknown committed
2009
        goto end_create;
unknown's avatar
unknown committed
2010
      }
2011
      di->table_list= *table_list;			// Needed to open table
2012
      /* Replace volatile strings with local copies */
2013
      di->table_list.alias= di->table_list.table_name= di->thd.query();
2014 2015
      di->table_list.db= di->thd.db;
      di->lock();
Marc Alff's avatar
Marc Alff committed
2016 2017 2018 2019
      mysql_mutex_lock(&di->mutex);
      if ((error= mysql_thread_create(key_thread_delayed_insert,
                                      &di->thd.real_id, &connection_attrib,
                                      handle_delayed_insert, (void*) di)))
unknown's avatar
unknown committed
2020 2021 2022 2023
      {
	DBUG_PRINT("error",
		   ("Can't create thread to handle delayed insert (error %d)",
		    error));
Marc Alff's avatar
Marc Alff committed
2024
        mysql_mutex_unlock(&di->mutex);
2025 2026
	di->unlock();
	delete di;
2027
	my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATALERROR), error);
unknown's avatar
unknown committed
2028
        goto end_create;
unknown's avatar
unknown committed
2029 2030 2031
      }

      /* Wait until table is open */
2032
      thd_proc_info(thd, "waiting for handler open");
2033
      while (!di->thd.killed && !di->table && !thd->killed)
unknown's avatar
unknown committed
2034
      {
Marc Alff's avatar
Marc Alff committed
2035
        mysql_cond_wait(&di->cond_client, &di->mutex);
unknown's avatar
unknown committed
2036
      }
Marc Alff's avatar
Marc Alff committed
2037
      mysql_mutex_unlock(&di->mutex);
2038
      thd_proc_info(thd, "got old table");
2039 2040 2041 2042 2043
      if (thd->killed)
      {
        di->unlock();
        goto end_create;
      }
2044
      if (di->thd.killed)
unknown's avatar
unknown committed
2045
      {
2046
        if (di->thd.is_error())
2047
        {
unknown's avatar
unknown committed
2048 2049 2050
          /*
            Copy the error message. Note that we don't treat fatal
            errors in the delayed thread as fatal errors in the
2051 2052 2053
            main thread. If delayed thread was killed, we don't
            want to send "Server shutdown in progress" in the
            INSERT THREAD.
unknown's avatar
unknown committed
2054
          */
2055 2056 2057 2058 2059 2060 2061
          if (di->thd.stmt_da->sql_errno() == ER_SERVER_SHUTDOWN)
            my_message(ER_QUERY_INTERRUPTED, ER(ER_QUERY_INTERRUPTED), MYF(0));
          else
            my_message(di->thd.stmt_da->sql_errno(), di->thd.stmt_da->message(),
                       MYF(0));
        }
        di->unlock();
unknown's avatar
unknown committed
2062
        goto end_create;
unknown's avatar
unknown committed
2063
      }
Marc Alff's avatar
Marc Alff committed
2064
      mysql_mutex_lock(&LOCK_delayed_insert);
2065
      delayed_threads.append(di);
Marc Alff's avatar
Marc Alff committed
2066
      mysql_mutex_unlock(&LOCK_delayed_insert);
unknown's avatar
unknown committed
2067
    }
Marc Alff's avatar
Marc Alff committed
2068
    mysql_mutex_unlock(&LOCK_delayed_create);
unknown's avatar
unknown committed
2069 2070
  }

Marc Alff's avatar
Marc Alff committed
2071
  mysql_mutex_lock(&di->mutex);
2072
  table_list->table= di->get_local_table(thd);
Marc Alff's avatar
Marc Alff committed
2073
  mysql_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
2074 2075
  if (table_list->table)
  {
2076
    DBUG_ASSERT(! thd->is_error());
2077
    thd->di= di;
unknown's avatar
unknown committed
2078
  }
2079
  /* Unlock the delayed insert object after its last access. */
2080
  di->unlock();
2081
  DBUG_RETURN((table_list->table == NULL));
2082

unknown's avatar
unknown committed
2083
end_create:
Marc Alff's avatar
Marc Alff committed
2084
  mysql_mutex_unlock(&LOCK_delayed_create);
2085
  DBUG_RETURN(thd->is_error());
unknown's avatar
unknown committed
2086 2087 2088
}


2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099
/**
  As we can't let many client threads modify the same TABLE
  structure of the dedicated delayed insert thread, we create an
  own structure for each client thread. This includes a row
  buffer to save the column values and new fields that point to
  the new row buffer. The memory is allocated in the client
  thread and is freed automatically.

  @pre This function is called from the client thread.  Delayed
       insert thread mutex must be acquired before invoking this
       function.
unknown's avatar
unknown committed
2100 2101 2102

  @return Not-NULL table object on success. NULL in case of an error,
                    which is set in client_thd.
unknown's avatar
unknown committed
2103 2104
*/

2105
TABLE *Delayed_insert::get_local_table(THD* client_thd)
unknown's avatar
unknown committed
2106 2107
{
  my_ptrdiff_t adjust_ptrs;
2108
  Field **field,**org_field, *found_next_number_field;
unknown's avatar
unknown committed
2109
  TABLE *copy;
2110
  TABLE_SHARE *share;
2111
  uchar *bitmap;
2112
  DBUG_ENTER("Delayed_insert::get_local_table");
unknown's avatar
unknown committed
2113 2114 2115 2116 2117 2118

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
2119
    thd_proc_info(client_thd, "waiting for handler lock");
2120
    mysql_cond_signal(&cond);			// Tell handler to lock table
2121
    while (!thd.killed && !thd.lock && ! client_thd->killed)
unknown's avatar
unknown committed
2122
    {
Marc Alff's avatar
Marc Alff committed
2123
      mysql_cond_wait(&cond_client, &mutex);
unknown's avatar
unknown committed
2124
    }
2125
    thd_proc_info(client_thd, "got handler lock");
unknown's avatar
unknown committed
2126 2127
    if (client_thd->killed)
      goto error;
2128
    if (thd.killed)
unknown's avatar
unknown committed
2129
    {
2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143
      /*
        Copy the error message. Note that we don't treat fatal
        errors in the delayed thread as fatal errors in the
        main thread. If delayed thread was killed, we don't
        want to send "Server shutdown in progress" in the
        INSERT THREAD.

        The thread could be killed with an error message if
        di->handle_inserts() or di->open_and_lock_table() fails.
        The thread could be killed without an error message if
        killed using mysql_notify_thread_having_shared_lock() or
        kill_delayed_threads_for_table().
      */
      if (!thd.is_error() || thd.stmt_da->sql_errno() == ER_SERVER_SHUTDOWN)
2144 2145 2146
        my_message(ER_QUERY_INTERRUPTED, ER(ER_QUERY_INTERRUPTED), MYF(0));
      else
        my_message(thd.stmt_da->sql_errno(), thd.stmt_da->message(), MYF(0));
2147
      goto error;
unknown's avatar
unknown committed
2148 2149
    }
  }
2150
  share= table->s;
unknown's avatar
unknown committed
2151

2152 2153 2154 2155 2156 2157 2158
  /*
    Allocate memory for the TABLE object, the field pointers array, and
    one record buffer of reclength size. Normally a table has three
    record buffers of rec_buff_length size, which includes alignment
    bytes. Since the table copy is used for creating one record only,
    the other record buffers and alignment are unnecessary.
  */
2159
  thd_proc_info(client_thd, "allocating local table");
unknown's avatar
unknown committed
2160
  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
unknown's avatar
unknown committed
2161
				   (share->fields+1)*sizeof(Field**)+
2162 2163
				   share->reclength +
                                   share->column_bitmap_size*2);
unknown's avatar
unknown committed
2164 2165 2166
  if (!copy)
    goto error;

2167
  /* Copy the TABLE object. */
unknown's avatar
unknown committed
2168
  *copy= *table;
unknown's avatar
unknown committed
2169
  /* We don't need to change the file handler here */
2170 2171
  /* Assign the pointers for the field pointers array and the record. */
  field= copy->field= (Field**) (copy + 1);
2172
  bitmap= (uchar*) (field + share->fields + 1);
2173 2174
  copy->record[0]= (bitmap + share->column_bitmap_size * 2);
  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
2175 2176 2177 2178 2179 2180 2181 2182 2183 2184
  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
unknown's avatar
unknown committed
2185
  {
2186
    if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
unknown's avatar
unknown committed
2187
      goto error;
2188
    (*field)->orig_table= copy;			// Remove connection
unknown's avatar
unknown committed
2189
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
2190 2191
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
unknown's avatar
unknown committed
2192 2193 2194 2195 2196 2197 2198 2199
  }
  *field=0;

  /* Adjust timestamp */
  if (table->timestamp_field)
  {
    /* Restore offset as this may have been reset in handle_inserts */
    copy->timestamp_field=
unknown's avatar
unknown committed
2200
      (Field_timestamp*) copy->field[share->timestamp_field_offset];
2201
    copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
2202
    copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
unknown's avatar
unknown committed
2203 2204
  }

2205 2206
  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;
2207 2208 2209 2210

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

2211 2212 2213 2214 2215 2216 2217 2218 2219
  /* Adjust bitmaps */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
  copy->tmp_set.bitmap= 0;                      // To catch errors
  bzero((char*) bitmap, share->column_bitmap_size*2);
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;

2220
  DBUG_RETURN(copy);
unknown's avatar
unknown committed
2221 2222 2223 2224 2225

  /* Got fatal error */
 error:
  tables_in_use--;
  status=1;
Marc Alff's avatar
Marc Alff committed
2226
  mysql_cond_signal(&cond);                     // Inform thread about abort
2227
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2228 2229 2230 2231 2232
}


/* Put a question in queue */

unknown's avatar
unknown committed
2233
static
unknown's avatar
unknown committed
2234 2235
int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
                  LEX_STRING query, bool ignore, bool log_on)
unknown's avatar
unknown committed
2236
{
2237
  delayed_row *row= 0;
2238
  Delayed_insert *di=thd->di;
2239
  const Discrete_interval *forced_auto_inc;
unknown's avatar
unknown committed
2240
  DBUG_ENTER("write_delayed");
2241 2242
  DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
                       (ulong) query.length));
unknown's avatar
unknown committed
2243

2244
  thd_proc_info(thd, "waiting for handler insert");
Marc Alff's avatar
Marc Alff committed
2245
  mysql_mutex_lock(&di->mutex);
unknown's avatar
unknown committed
2246
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
Marc Alff's avatar
Marc Alff committed
2247
    mysql_cond_wait(&di->cond_client, &di->mutex);
2248
  thd_proc_info(thd, "storing row into queue");
unknown's avatar
unknown committed
2249

2250
  if (thd->killed)
unknown's avatar
unknown committed
2251 2252
    goto err;

2253 2254 2255 2256 2257 2258 2259
  /*
    Take a copy of the query string, if there is any. The string will
    be free'ed when the row is destroyed. If there is no query string,
    we don't do anything special.
   */

  if (query.str)
2260 2261 2262
  {
    char *str;
    if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
2263
      goto err;
2264 2265
    query.str= str;
  }
2266 2267 2268 2269
  row= new delayed_row(query, duplic, ignore, log_on);
  if (row == NULL)
  {
    my_free(query.str, MYF(MY_WME));
unknown's avatar
unknown committed
2270
    goto err;
2271
  }
unknown's avatar
unknown committed
2272

2273
  if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
unknown's avatar
unknown committed
2274
    goto err;
2275
  memcpy(row->record, table->record[0], table->s->reclength);
unknown's avatar
unknown committed
2276 2277
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
2278 2279 2280 2281 2282 2283 2284 2285 2286 2287
  /*
    those are for the binlog: LAST_INSERT_ID() has been evaluated at this
    time, so record does not need it, but statement-based binlogging of the
    INSERT will need when the row is actually inserted.
    As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
  */
  row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  row->first_successful_insert_id_in_prev_stmt=
    thd->first_successful_insert_id_in_prev_stmt;
2288
  row->timestamp_field_type=    table->timestamp_field_type;
unknown's avatar
unknown committed
2289

2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302
  /* Add session variable timezone
     Time_zone object will not be freed even the thread is ended.
     So we can get time_zone object from thread which handling delayed statement.
     See the comment of my_tz_find() for detail.
  */
  if (thd->time_zone_used)
  {
    row->time_zone = thd->variables.time_zone;
  }
  else
  {
    row->time_zone = NULL;
  }
2303
  /* Copy session variables. */
2304 2305
  row->auto_increment_increment= thd->variables.auto_increment_increment;
  row->auto_increment_offset=    thd->variables.auto_increment_offset;
2306 2307 2308
  row->sql_mode=                 thd->variables.sql_mode;
  row->auto_increment_field_not_null= table->auto_increment_field_not_null;

2309 2310 2311 2312 2313 2314 2315
  /* Copy the next forced auto increment value, if any. */
  if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
  {
    row->forced_insert_id= forced_auto_inc->minimum();
    DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
                           (ulong) row->forced_insert_id));
  }
2316

unknown's avatar
unknown committed
2317 2318 2319
  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
2320
  if (table->s->blob_fields)
unknown's avatar
unknown committed
2321
    unlink_blobs(table);
Marc Alff's avatar
Marc Alff committed
2322
  mysql_cond_signal(&di->cond);
unknown's avatar
unknown committed
2323 2324

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
Marc Alff's avatar
Marc Alff committed
2325
  mysql_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
2326 2327 2328 2329
  DBUG_RETURN(0);

 err:
  delete row;
Marc Alff's avatar
Marc Alff committed
2330
  mysql_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
2331 2332 2333
  DBUG_RETURN(1);
}

unknown's avatar
unknown committed
2334 2335 2336 2337
/**
  Signal the delayed insert thread that this user connection
  is finished using it for this statement.
*/
unknown's avatar
unknown committed
2338 2339 2340

static void end_delayed_insert(THD *thd)
{
2341
  DBUG_ENTER("end_delayed_insert");
2342
  Delayed_insert *di=thd->di;
Marc Alff's avatar
Marc Alff committed
2343
  mysql_mutex_lock(&di->mutex);
2344
  DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
unknown's avatar
unknown committed
2345 2346 2347
  if (!--di->tables_in_use || di->thd.killed)
  {						// Unlock table
    di->status=1;
Marc Alff's avatar
Marc Alff committed
2348
    mysql_cond_signal(&di->cond);
unknown's avatar
unknown committed
2349
  }
Marc Alff's avatar
Marc Alff committed
2350
  mysql_mutex_unlock(&di->mutex);
2351
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
2352 2353 2354 2355 2356 2357 2358
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
Marc Alff's avatar
Marc Alff committed
2359
  mysql_mutex_lock(&LOCK_delayed_insert); // For unlink from list
unknown's avatar
unknown committed
2360

2361
  I_List_iterator<Delayed_insert> it(delayed_threads);
2362 2363
  Delayed_insert *di;
  while ((di= it++))
unknown's avatar
unknown committed
2364
  {
2365 2366
    di->thd.killed= THD::KILL_CONNECTION;
    if (di->thd.mysys_var)
unknown's avatar
unknown committed
2367
    {
Marc Alff's avatar
Marc Alff committed
2368
      mysql_mutex_lock(&di->thd.mysys_var->mutex);
2369
      if (di->thd.mysys_var->current_cond)
unknown's avatar
unknown committed
2370
      {
unknown's avatar
unknown committed
2371 2372 2373 2374
	/*
	  We need the following test because the main mutex may be locked
	  in handle_delayed_insert()
	*/
2375
	if (&di->mutex != di->thd.mysys_var->current_mutex)
Marc Alff's avatar
Marc Alff committed
2376 2377
          mysql_mutex_lock(di->thd.mysys_var->current_mutex);
        mysql_cond_broadcast(di->thd.mysys_var->current_cond);
2378
	if (&di->mutex != di->thd.mysys_var->current_mutex)
Marc Alff's avatar
Marc Alff committed
2379
          mysql_mutex_unlock(di->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
2380
      }
Marc Alff's avatar
Marc Alff committed
2381
      mysql_mutex_unlock(&di->thd.mysys_var->mutex);
unknown's avatar
unknown committed
2382 2383
    }
  }
Marc Alff's avatar
Marc Alff committed
2384
  mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
unknown's avatar
unknown committed
2385 2386 2387
}


Konstantin Osipov's avatar
Konstantin Osipov committed
2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398
/**
   Open and lock table for use by delayed thread and check that
   this table is suitable for delayed inserts.

   @retval FALSE - Success.
   @retval TRUE  - Failure.
*/

bool Delayed_insert::open_and_lock_table()
{
  if (!(table= open_n_lock_single_table(&thd, &table_list,
Konstantin Osipov's avatar
Konstantin Osipov committed
2399
                                        TL_WRITE_DELAYED, 0)))
Konstantin Osipov's avatar
Konstantin Osipov committed
2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423
  {
    thd.fatal_error();				// Abort waiting inserts
    return TRUE;
  }
  if (!(table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
  {
    my_error(ER_DELAYED_NOT_SUPPORTED, MYF(ME_FATALERROR),
             table_list.table_name);
    return TRUE;
  }
  if (table->triggers)
  {
    /*
      Table has triggers. This is not an error, but we do
      not support triggers with delayed insert. Terminate the delayed
      thread without an error and thus request lock upgrade.
    */
    return TRUE;
  }
  table->copy_blobs= 1;
  return FALSE;
}


2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434
/**
  Body of a delayed insert handler thread.

  Started through mysql_thread_create() from delayed_get_table() with
  the Delayed_insert object as argument. The thread registers its THD,
  opens and locks the table, and then loops: it waits on di->cond for
  work, takes the table lock when clients are using the handler,
  processes queued rows through di->handle_inserts(), and releases the
  table lock when nobody needs it. The loop ends once the thread is
  killed and has no lock references, users or queued rows left; the
  thread then closes its tables and deletes the Delayed_insert object.

  di->mutex is held throughout except where it is explicitly released.

  @param arg  Pointer to the Delayed_insert object of this handler.

  @return Always 0 (not reached in practice; the thread exits through
          pthread_exit()).
*/

pthread_handler_t handle_delayed_insert(void *arg)
{
  Delayed_insert *di=(Delayed_insert*) arg;
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
  mysql_mutex_lock(&LOCK_thread_count);
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
  thd->set_current_time();
  threads.append(thd);
  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
  mysql_mutex_unlock(&LOCK_thread_count);

  mysql_thread_set_psi_id(thd->thread_id);

  /*
    Wait until the client runs into mysql_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
  mysql_mutex_lock(&di->mutex);
  if (my_thread_init())
  {
    /* Can't use my_error since store_globals has not yet been called */
    thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
                                   ER(ER_OUT_OF_RESOURCES), NULL);
  }
  else
  {
    DBUG_ENTER("handle_delayed_insert");
    thd->thread_stack= (char*) &thd;
    if (init_thr_lock() || thd->store_globals())
    {
      /* Can't use my_error since store_globals has perhaps failed */
      thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
                                     ER(ER_OUT_OF_RESOURCES), NULL);
      thd->fatal_error();
      goto err;
    }

    /*
      Open table requires an initialized lex in case the table is
      partitioned. The .frm file contains a partial SQL string which is
      parsed using a lex, that depends on initialized thd->lex.
    */
    lex_start(thd);
    thd->lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
    /*
      Statement-based replication of INSERT DELAYED has problems with RAND()
      and user vars, so in mixed mode we go to row-based.
    */
    thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED);
    thd->set_current_stmt_binlog_format_row_if_mixed();

    init_mdl_requests(&di->table_list);

    if (di->open_and_lock_table())
      goto err;

    /* Tell client that the thread is initialized */
    mysql_cond_signal(&di->cond_client);

    /* Now wait until we get an insert or lock to handle */
    /* We will not abort as long as a client thread uses this thread */

    for (;;)
    {
      if (thd->killed)
      {
        uint lock_count;
        /*
          Remove this from delay insert list so that no one can request a
          table from this
        */
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&LOCK_delayed_insert);
        di->unlink();
        lock_count=di->lock_count();
        mysql_mutex_unlock(&LOCK_delayed_insert);
        mysql_mutex_lock(&di->mutex);
        if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
          break;                                // Time to die
      }

      /* Shouldn't wait if killed or an insert is waiting. */
      if (!thd->killed && !di->status && !di->stacked_inserts)
      {
        struct timespec abstime;
        set_timespec(abstime, delayed_insert_timeout);

        /* Information for pthread_kill */
        di->thd.mysys_var->current_mutex= &di->mutex;
        di->thd.mysys_var->current_cond= &di->cond;
        thd_proc_info(&(di->thd), "Waiting for INSERT");

        DBUG_PRINT("info",("Waiting for someone to insert rows"));
        while (!thd->killed && !di->status)
        {
          int error;
          mysql_audit_release(thd);
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
          error= mysql_cond_wait(&di->cond, &di->mutex);
#else
          error= mysql_cond_timedwait(&di->cond, &di->mutex, &abstime);
#ifdef EXTRA_DEBUG
          if (error && error != EINTR && error != ETIMEDOUT)
          {
            fprintf(stderr, "Got error %d from mysql_cond_timedwait\n", error);
            DBUG_PRINT("error", ("Got error %d from mysql_cond_timedwait",
                                 error));
          }
#endif
#endif
          /* An idle handler that times out terminates itself. */
          if (error == ETIMEDOUT || error == ETIME)
            thd->killed= THD::KILL_CONNECTION;
        }
        /* We can't lock di->mutex and mysys_var->mutex at the same time */
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&di->thd.mysys_var->mutex);
        di->thd.mysys_var->current_mutex= 0;
        di->thd.mysys_var->current_cond= 0;
        mysql_mutex_unlock(&di->thd.mysys_var->mutex);
        mysql_mutex_lock(&di->mutex);
      }
      thd_proc_info(&(di->thd), 0);

      if (di->tables_in_use && ! thd->lock && !thd->killed)
      {
        bool need_reopen;
        /*
          Request for new delayed insert.
          Lock the table, but avoid to be blocked by a global read lock.
          If we got here while a global read lock exists, then one or more
          inserts started before the lock was requested. These are allowed
          to complete their work before the server returns control to the
          client which requested the global read lock. The delayed insert
          handler will close the table and finish when the outstanding
          inserts are done.
        */
        if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
                                            MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
                                            &need_reopen)))
        {
          if (need_reopen && !thd->killed)
          {
            /*
              We were waiting to obtain TL_WRITE_DELAYED (probably due to
              someone having or requesting TL_WRITE_ALLOW_READ) and got
              aborted. Try to reopen table and if it fails die.
            */
            TABLE_LIST *tl_ptr = &di->table_list;
            close_tables_for_reopen(thd, &tl_ptr, NULL);
            di->table= 0;
            if (di->open_and_lock_table())
            {
              thd->killed= THD::KILL_CONNECTION;
            }
          }
          else
          {
            /* Fatal error */
            thd->killed= THD::KILL_CONNECTION;
          }
        }
        /* Wake clients waiting in get_local_table() for the lock. */
        mysql_cond_broadcast(&di->cond_client);
      }
      if (di->stacked_inserts)
      {
        if (di->handle_inserts())
        {
          /* Some fatal error */
          thd->killed= THD::KILL_CONNECTION;
        }
      }
      di->status=0;
      if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
      {
        /*
          No one is doing a insert delayed
          Unlock table so that other threads can use it
        */
        MYSQL_LOCK *lock=thd->lock;
        thd->lock=0;
        mysql_mutex_unlock(&di->mutex);
        /*
          We need to release next_insert_id before unlocking. This is
          enforced by handler::ha_external_lock().
        */
        di->table->file->ha_release_auto_increment();
        mysql_unlock_tables(thd, lock);
        trans_commit_stmt(thd);
        di->group_count=0;
        mysql_audit_release(thd);
        mysql_mutex_lock(&di->mutex);
      }
      if (di->tables_in_use)
        mysql_cond_broadcast(&di->cond_client); // If waiting clients
    }

  err:
    /*
      mysql_lock_tables() can potentially start a transaction and write
      a table map. In the event of an error, that transaction has to be
      rolled back.  We only need to roll back a potential statement
      transaction, since real transactions are rolled back in
      close_thread_tables().

      TODO: This is not true any more, table maps are generated on the
      first call to ha_*_row() instead. Remove code that are used to
      cover for the case outlined above.
     */
    trans_rollback_stmt(thd);

    DBUG_LEAVE;
  }

  /*
    di should be unlinked from the thread handler list and have no active
    clients
  */

  close_thread_tables(thd);                     // Free the table
  di->table=0;
  thd->killed= THD::KILL_CONNECTION;            // If error
  mysql_cond_broadcast(&di->cond_client);       // Safety
  mysql_mutex_unlock(&di->mutex);

  mysql_mutex_lock(&LOCK_delayed_create);       // Because of delayed_get_table
  mysql_mutex_lock(&LOCK_delayed_insert);
  delete di;
  mysql_mutex_unlock(&LOCK_delayed_insert);
  mysql_mutex_unlock(&LOCK_delayed_create);

  my_thread_end();
  pthread_exit(0);

  return 0;
}


/* Remove pointers from temporary fields to allocated values */

static void unlink_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

static void free_delayed_insert_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
    {
2691
      uchar *str;
unknown's avatar
unknown committed
2692 2693 2694 2695 2696 2697 2698 2699
      ((Field_blob *) (*ptr))->get_ptr(&str);
      my_free(str,MYF(MY_ALLOW_ZERO_PTR));
      ((Field_blob *) (*ptr))->reset();
    }
  }
}


2700
bool Delayed_insert::handle_inserts(void)
unknown's avatar
unknown committed
2701 2702
{
  int error;
2703
  ulong max_rows;
2704
  bool has_trans = TRUE;
2705 2706
  bool using_ignore= 0, using_opt_replace= 0,
       using_bin_log= mysql_bin_log.is_open();
2707
  delayed_row *row;
unknown's avatar
unknown committed
2708 2709 2710
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
Marc Alff's avatar
Marc Alff committed
2711
  mysql_mutex_unlock(&mutex);
unknown's avatar
unknown committed
2712 2713

  table->next_number_field=table->found_next_number_field;
2714
  table->use_all_columns();
unknown's avatar
unknown committed
2715

2716
  thd_proc_info(&thd, "upgrading lock");
2717 2718
  if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock,
                                   thd.variables.lock_wait_timeout))
unknown's avatar
unknown committed
2719
  {
2720 2721 2722 2723 2724 2725 2726
    /*
      This can happen if thread is killed either by a shutdown
      or if another thread is removing the current table definition
      from the table cache.
    */
    my_error(ER_DELAYED_CANT_CHANGE_LOCK,MYF(ME_FATALERROR),
             table->s->table_name.str);
unknown's avatar
unknown committed
2727 2728 2729
    goto err;
  }

2730
  thd_proc_info(&thd, "insert");
2731
  max_rows= delayed_insert_limit;
2732
  if (thd.killed || table->s->needs_reopen())
unknown's avatar
unknown committed
2733
  {
unknown's avatar
SCRUM  
unknown committed
2734
    thd.killed= THD::KILL_CONNECTION;
2735
    max_rows= ULONG_MAX;                     // Do as much as possible
unknown's avatar
unknown committed
2736 2737
  }

2738 2739 2740 2741 2742 2743 2744
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
Marc Alff's avatar
Marc Alff committed
2745
  mysql_mutex_lock(&mutex);
2746

unknown's avatar
unknown committed
2747 2748 2749
  while ((row=rows.get()))
  {
    stacked_inserts--;
Marc Alff's avatar
Marc Alff committed
2750
    mysql_mutex_unlock(&mutex);
2751
    memcpy(table->record[0],row->record,table->s->reclength);
unknown's avatar
unknown committed
2752 2753 2754

    thd.start_time=row->start_time;
    thd.query_start_used=row->query_start_used;
2755 2756 2757 2758 2759
    /*
      To get the exact auto_inc interval to store in the binlog we must not
      use values from the previous interval (of the previous rows).
    */
    bool log_query= (row->log_query && row->query.str != NULL);
2760 2761 2762
    DBUG_PRINT("delayed", ("query: '%s'  length: %lu", row->query.str ?
                           row->query.str : "[NULL]",
                           (ulong) row->query.length));
2763 2764
    if (log_query)
    {
2765 2766 2767 2768 2769 2770 2771 2772
      /*
        This is the first value of an INSERT statement.
        It is the right place to clear a forced insert_id.
        This is usually done after the last value of an INSERT statement,
        but we won't know this in the insert delayed thread. But before
        the first value is sufficiently equivalent to after the last
        value of the previous statement.
      */
2773 2774 2775
      table->file->ha_release_auto_increment();
      thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
    }
2776 2777 2778 2779
    thd.first_successful_insert_id_in_prev_stmt= 
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= 
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2780
    table->timestamp_field_type= row->timestamp_field_type;
2781
    table->auto_increment_field_not_null= row->auto_increment_field_not_null;
unknown's avatar
unknown committed
2782

2783
    /* Copy the session variables. */
2784 2785
    thd.variables.auto_increment_increment= row->auto_increment_increment;
    thd.variables.auto_increment_offset=    row->auto_increment_offset;
2786 2787
    thd.variables.sql_mode=                 row->sql_mode;

2788 2789 2790 2791 2792 2793 2794
    /* Copy a forced insert_id, if any. */
    if (row->forced_insert_id)
    {
      DBUG_PRINT("delayed", ("received auto_inc: %lu",
                             (ulong) row->forced_insert_id));
      thd.force_one_auto_inc_interval(row->forced_insert_id);
    }
2795

2796
    info.ignore= row->ignore;
unknown's avatar
unknown committed
2797
    info.handle_duplicates= row->dup;
2798
    if (info.ignore ||
unknown's avatar
unknown committed
2799
	info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
2800 2801 2802 2803
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
2804 2805 2806 2807 2808 2809 2810
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
unknown's avatar
unknown committed
2811 2812
    if (info.handle_duplicates == DUP_UPDATE)
      table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
unknown's avatar
unknown committed
2813
    thd.clear_error(); // reset error for binlog
unknown's avatar
unknown committed
2814
    if (write_record(&thd, table, &info))
unknown's avatar
unknown committed
2815
    {
2816
      info.error_count++;				// Ignore errors
2817
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
unknown's avatar
unknown committed
2818
      row->log_query = 0;
unknown's avatar
unknown committed
2819
    }
2820

unknown's avatar
unknown committed
2821 2822 2823 2824 2825
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
2826 2827 2828 2829 2830
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }
2831

2832
    if (log_query && mysql_bin_log.is_open())
unknown's avatar
unknown committed
2833
    {
2834 2835 2836 2837 2838 2839 2840 2841
      bool backup_time_zone_used = thd.time_zone_used;
      Time_zone *backup_time_zone = thd.variables.time_zone;
      if (row->time_zone != NULL)
      {
        thd.time_zone_used = true;
        thd.variables.time_zone = row->time_zone;
      }

2842 2843 2844 2845 2846 2847
      /* if the delayed insert was killed, the killed status is
         ignored while binlogging */
      int errcode= 0;
      if (thd.killed == THD::NOT_KILLED)
        errcode= query_error_code(&thd, TRUE);
      
unknown's avatar
unknown committed
2848 2849 2850 2851 2852 2853 2854 2855
      /*
        If the query has several rows to insert, only the first row will come
        here. In row-based binlogging, this means that the first row will be
        written to binlog as one Table_map event and one Rows event (due to an
        event flush done in binlog_query()), then all other rows of this query
        will be binlogged together as one single Table_map event and one
        single Rows event.
      */
2856 2857
      if (thd.binlog_query(THD::ROW_QUERY_TYPE,
                           row->query.str, row->query.length,
2858
                           FALSE, FALSE, FALSE, errcode))
2859
        goto err;
2860 2861 2862

      thd.time_zone_used = backup_time_zone_used;
      thd.variables.time_zone = backup_time_zone;
unknown's avatar
unknown committed
2863
    }
2864

2865
    if (table->s->blob_fields)
unknown's avatar
unknown committed
2866
      free_delayed_insert_blobs(table);
2867
    thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
unknown's avatar
unknown committed
2868
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
Marc Alff's avatar
Marc Alff committed
2869
    mysql_mutex_lock(&mutex);
unknown's avatar
unknown committed
2870

2871 2872 2873 2874 2875 2876
    /*
      Reset the table->auto_increment_field_not_null as it is valid for
      only one row.
    */
    table->auto_increment_field_not_null= FALSE;

unknown's avatar
unknown committed
2877
    delete row;
2878 2879 2880 2881 2882 2883 2884
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
2885
	(!(row->log_query & using_bin_log)))
unknown's avatar
unknown committed
2886 2887 2888 2889 2890
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
Marc Alff's avatar
Marc Alff committed
2891
          mysql_cond_broadcast(&cond_client);   // If waiting clients
2892
	thd_proc_info(&thd, "reschedule");
Marc Alff's avatar
Marc Alff committed
2893
        mysql_mutex_unlock(&mutex);
unknown's avatar
unknown committed
2894 2895 2896 2897
	if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
Marc Alff's avatar
Marc Alff committed
2898
	  sql_print_error("%s", thd.stmt_da->message());
2899
          DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
unknown's avatar
unknown committed
2900 2901
	  goto err;
	}
unknown's avatar
unknown committed
2902
	query_cache_invalidate3(&thd, table, 1);
2903 2904
	if (thr_reschedule_write_lock(*thd.lock->locks,
                                thd.variables.lock_wait_timeout))
unknown's avatar
unknown committed
2905
	{
2906 2907 2908 2909
    /* This is not known to happen. */
    my_error(ER_DELAYED_CANT_CHANGE_LOCK,MYF(ME_FATALERROR),
             table->s->table_name.str);
    goto err;
unknown's avatar
unknown committed
2910
	}
2911 2912
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
Marc Alff's avatar
Marc Alff committed
2913
        mysql_mutex_lock(&mutex);
2914
	thd_proc_info(&thd, "insert");
unknown's avatar
unknown committed
2915 2916
      }
      if (tables_in_use)
Marc Alff's avatar
Marc Alff committed
2917
        mysql_cond_broadcast(&cond_client);     // If waiting clients
unknown's avatar
unknown committed
2918 2919
    }
  }
2920
  thd_proc_info(&thd, 0);
Marc Alff's avatar
Marc Alff committed
2921
  mysql_mutex_unlock(&mutex);
2922

2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933
  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
2934 2935 2936
  */
  has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE ||
              table->file->has_transactions();
2937 2938
  if (thd.is_current_stmt_binlog_format_row() &&
      thd.binlog_flush_pending_rows_event(TRUE, has_trans))
2939
    goto err;
2940

unknown's avatar
unknown committed
2941 2942 2943
  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
Marc Alff's avatar
Marc Alff committed
2944
    sql_print_error("%s", thd.stmt_da->message());
2945
    DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
unknown's avatar
unknown committed
2946 2947
    goto err;
  }
unknown's avatar
unknown committed
2948
  query_cache_invalidate3(&thd, table, 1);
Marc Alff's avatar
Marc Alff committed
2949
  mysql_mutex_lock(&mutex);
unknown's avatar
unknown committed
2950 2951 2952
  DBUG_RETURN(0);

 err:
2953 2954 2955
#ifndef DBUG_OFF
  max_rows= 0;                                  // For DBUG output
#endif
2956 2957 2958
  /* Remove all not used rows */
  while ((row=rows.get()))
  {
2959 2960 2961 2962 2963
    if (table->s->blob_fields)
    {
      memcpy(table->record[0],row->record,table->s->reclength);
      free_delayed_insert_blobs(table);
    }
2964 2965 2966
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
2967 2968 2969
#ifndef DBUG_OFF
    max_rows++;
#endif
2970
  }
2971
  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
unknown's avatar
unknown committed
2972
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
Marc Alff's avatar
Marc Alff committed
2973
  mysql_mutex_lock(&mutex);
unknown's avatar
unknown committed
2974 2975
  DBUG_RETURN(1);
}
2976
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
2977 2978

/***************************************************************************
unknown's avatar
unknown committed
2979
  Store records in INSERT ... SELECT *
unknown's avatar
unknown committed
2980 2981
***************************************************************************/

unknown's avatar
VIEW  
unknown committed
2982 2983 2984 2985 2986 2987 2988 2989 2990

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
unknown's avatar
unknown committed
2991 2992
    FALSE OK
    TRUE  Error
unknown's avatar
VIEW  
unknown committed
2993 2994
*/

unknown's avatar
unknown committed
2995
bool mysql_insert_select_prepare(THD *thd)
{
  LEX *lex= thd->lex;
  SELECT_LEX *select_lex= &lex->select_lex;
  DBUG_ENTER("mysql_insert_select_prepare");

  /*
    The SELECT_LEX does not belong to the INSERT statement, so a WHERE
    clause cannot be added to it when the target table is a VIEW.
  */
  const bool prepare_failed=
    mysql_prepare_insert(thd, lex->query_tables,
                         lex->query_tables->table, lex->field_list, 0,
                         lex->update_list, lex->value_list,
                         lex->duplicates,
                         &select_lex->where, TRUE, FALSE, FALSE);
  if (prepare_failed)
    DBUG_RETURN(TRUE);

  /*
    The first leaf table belongs to the INSERT part: remember it and take
    it out of the leaf table list of the SELECT.
  */
  DBUG_ASSERT(select_lex->leaf_tables != 0);
  lex->leaf_tables_insert= select_lex->leaf_tables;

  /* Skip every following leaf that belongs to the view being inserted into */
  TABLE_LIST *first_select_leaf_table= select_lex->leaf_tables->next_leaf;
  while (first_select_leaf_table &&
         first_select_leaf_table->belong_to_view &&
         first_select_leaf_table->belong_to_view ==
         lex->leaf_tables_insert->belong_to_view)
  {
    first_select_leaf_table= first_select_leaf_table->next_leaf;
  }

  select_lex->leaf_tables= first_select_leaf_table;
  DBUG_RETURN(FALSE);
}


3033
/*
  Set up the result sink for INSERT ... SELECT.

  Stores the target table list/table and the list of insert fields, and
  fills the zeroed COPY_INFO ('info') with the duplicate handling mode,
  the ignore flag and the ON DUPLICATE KEY UPDATE lists. info.view is set
  to the table list element only when that element is a view.
*/
select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
                             List<Item> *fields_par,
                             List<Item> *update_fields,
                             List<Item> *update_values,
                             enum_duplicates duplic,
                             bool ignore_check_option_errors)
  :table_list(table_list_par), table(table_par), fields(fields_par),
   autoinc_value_of_last_inserted_row(0),
   insert_into_view(table_list_par && table_list_par->view != 0)
{
  /* Start from an all-zero COPY_INFO; info.view therefore defaults to 0 */
  bzero((char*) &info,sizeof(info));

  info.update_fields= update_fields;
  info.update_values= update_values;
  info.handle_duplicates= duplic;
  info.ignore= ignore_check_option_errors;

  if (table_list_par && table_list_par->view)
    info.view= table_list_par;
}


unknown's avatar
unknown committed
3053
int
3054
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
unknown's avatar
unknown committed
3055
{
3056
  LEX *lex= thd->lex;
3057
  int res;
3058
  table_map map= 0;
3059
  SELECT_LEX *lex_current_select_save= lex->current_select;
unknown's avatar
unknown committed
3060 3061
  DBUG_ENTER("select_insert::prepare");

3062
  unit= u;
3063

3064 3065 3066 3067 3068 3069
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;
3070 3071 3072
  res= (setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0) ||
        check_insert_fields(thd, table_list, *fields, values,
                            !insert_into_view, 1, &map));
3073

unknown's avatar
unknown committed
3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085
  if (!res && fields->elements)
  {
    bool saved_abort_on_warning= thd->abort_on_warning;
    thd->abort_on_warning= !info.ignore && (thd->variables.sql_mode &
                                            (MODE_STRICT_TRANS_TABLES |
                                             MODE_STRICT_ALL_TABLES));
    res= check_that_all_fields_are_given_values(thd, table_list->table, 
                                                table_list);
    thd->abort_on_warning= saved_abort_on_warning;
  }

  if (info.handle_duplicates == DUP_UPDATE && !res)
3086
  {
3087
    Name_resolution_context *context= &lex->select_lex.context;
3088 3089 3090 3091
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
3092 3093

    /* Perform name resolution only in the first table - 'table_list'. */
3094
    table_list->next_local= 0;
3095 3096
    context->resolve_in_table_list_only(table_list);

3097
    lex->select_lex.no_wrap_view_item= TRUE;
3098
    res= res || check_update_fields(thd, context->table_list,
3099 3100
                                    *info.update_fields, *info.update_values,
                                    &map);
3101 3102
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
3103 3104 3105
      When we are not using GROUP BY and there are no ungrouped aggregate functions 
      we can refer to other tables in the ON DUPLICATE KEY part.
      We use next_name_resolution_table descructively, so check it first (views?)
3106
    */
3107 3108 3109 3110 3111 3112 3113 3114
    DBUG_ASSERT (!table_list->next_name_resolution_table);
    if (lex->select_lex.group_list.elements == 0 &&
        !lex->select_lex.with_sum_func)
      /*
        We must make a single context out of the two separate name resolution contexts :
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
        To do that we must concatenate the two lists
      */  
unknown's avatar
unknown committed
3115 3116
      table_list->next_name_resolution_table= 
        ctx_state.get_first_name_resolution_table();
3117

unknown's avatar
unknown committed
3118 3119
    res= res || setup_fields(thd, 0, *info.update_values,
                             MARK_COLUMNS_READ, 0, 0);
3120
    if (!res)
3121
    {
3122 3123 3124 3125 3126 3127 3128 3129
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;
3130

3131 3132 3133
      while ((item= li++))
      {
        item->transform(&Item::update_value_transformer,
3134
                        (uchar*)lex->current_select);
3135
      }
3136 3137 3138
    }

    /* Restore the current context. */
3139
    ctx_state.restore_state(context, table_list);
3140
  }
3141

3142 3143
  lex->current_select= lex_current_select_save;
  if (res)
unknown's avatar
unknown committed
3144
    DBUG_RETURN(1);
3145 3146 3147 3148 3149 3150 3151 3152 3153 3154
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
3155
  if (unique_table(thd, table_list, table_list->next_global, 0))
3156 3157
  {
    /* Using same table for INSERT and SELECT */
3158 3159
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
3160
  }
3161
  else if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
Konstantin Osipov's avatar
Konstantin Osipov committed
3162
           thd->locked_tables_mode <= LTM_LOCK_TABLES)
3163 3164 3165 3166 3167 3168 3169
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
3170 3171
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
3172
    */
3173
    table->file->ha_start_bulk_insert((ha_rows) 0);
3174
  }
3175
  restore_record(table,s->default_values);		// Get empty record
unknown's avatar
unknown committed
3176
  table->next_number_field=table->found_next_number_field;
3177 3178 3179 3180 3181

#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
3182
      rpl_master_has_bug(&active_mi->rli, 24432, TRUE, NULL, NULL))
3183 3184 3185
    DBUG_RETURN(1);
#endif

unknown's avatar
unknown committed
3186
  thd->cuted_fields=0;
unknown's avatar
unknown committed
3187 3188
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3189 3190 3191
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
3192 3193
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
unknown's avatar
unknown committed
3194
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
3195 3196 3197
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
unknown's avatar
unknown committed
3198
  res= (table_list->prepare_where(thd, 0, TRUE) ||
3199
        table_list->prepare_check_option(thd));
3200 3201

  if (!res)
unknown's avatar
unknown committed
3202
     prepare_triggers_for_insert_stmt(table);
3203

3204
  DBUG_RETURN(res);
unknown's avatar
unknown committed
3205 3206
}

3207

3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepare phase.
    Do the final preparation now.
		       
  RETURN
    0   OK
*/

int select_insert::prepare2(void)
{
  DBUG_ENTER("select_insert::prepare2");
3227
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
Konstantin Osipov's avatar
Konstantin Osipov committed
3228
      thd->locked_tables_mode <= LTM_LOCK_TABLES)
3229
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
3230
  DBUG_RETURN(0);
3231 3232 3233
}


3234 3235 3236 3237 3238 3239
/*
  Must never be called for this class: the body is an unconditional
  debug assertion.
*/
void select_insert::cleanup()
{
  /*
    select_insert/select_create objects are never re-used in a prepared
    statement, so reaching this point is a bug.
  */
  DBUG_ASSERT(0);
}

unknown's avatar
unknown committed
3240 3241
/*
  Undo the per-statement state set up for the insert: clear the
  auto-increment bookkeeping on the table (if there is one), reset the
  handler, and switch truncation counting and abort-on-warning off in
  the THD.
*/
select_insert::~select_insert()
{
  DBUG_ENTER("~select_insert");

  if (table != NULL)
  {
    table->auto_increment_field_not_null= FALSE;
    table->next_number_field=0;
    table->file->ha_reset();
  }

  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  thd->abort_on_warning= 0;

  DBUG_VOID_RETURN;
}


/*
  Receive one row from the SELECT part and write it to the target table.

  RETURN
    0   Row written, or skipped (OFFSET not yet consumed / view check)
    1   Error
*/
bool select_insert::send_data(List<Item> &values)
{
  DBUG_ENTER("select_insert::send_data");

  /* LIMIT offset,count in use: swallow rows until the offset is consumed */
  if (unit->offset_limit_cnt)
  {
    unit->offset_limit_cnt--;
    DBUG_RETURN(0);
  }

  /* Count truncated fields only while the values are being stored */
  thd->count_cuted_fields= CHECK_FIELD_WARN;
  store_values(values);
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;

  if (thd->is_error())
  {
    table->auto_increment_field_not_null= FALSE;
    DBUG_RETURN(1);
  }

  if (table_list)                               // Not CREATE ... SELECT
  {
    const int view_check= table_list->view_check_option(thd, info.ignore);
    if (view_check == VIEW_CHECK_SKIP)
      DBUG_RETURN(0);
    if (view_check == VIEW_CHECK_ERROR)
      DBUG_RETURN(1);
  }

  // Release latches in case bulk insert takes a long time
  ha_release_temporary_latches(thd);

  const bool error= write_record(thd, table, &info);
  table->auto_increment_field_not_null= FALSE;

  if (error)
    DBUG_RETURN(error);

  if (table->triggers || info.handle_duplicates == DUP_UPDATE)
  {
    /*
      The ON DUPLICATE KEY UPDATE clause may have changed fields of the
      record, and triggers may have modified fields that INSERT ... SELECT
      did not touch itself. Bring the record back to the defaults so the
      next row starts from a clean state.
    */
    restore_record(table, s->default_values);
  }

  if (table->next_number_field)
  {
    /*
      As long as no value has been autogenerated, remember the value just
      seen: it may have to be sent to the client at the end.
    */
    if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
      autoinc_value_of_last_inserted_row=
        table->next_number_field->val_int();
    /*
      Clear the auto-increment field for the next record. With triggers it
      is cleared twice, but this should be cheap.
    */
    table->next_number_field->reset();
  }

  DBUG_RETURN(error);
}


unknown's avatar
unknown committed
3324 3325 3326
/*
  Copy the selected values into the record of the target table and invoke
  the BEFORE INSERT triggers. Without an explicit field list the values
  are stored into the table's fields directly.
*/
void select_insert::store_values(List<Item> &values)
{
  if (!fields->elements)
  {
    /* No field list given: fill the table's own field array */
    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
    return;
  }

  fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                       table->triggers, TRG_EVENT_INSERT);
}

unknown's avatar
unknown committed
3334 3335
/*
  Report an error for the statement by passing the given error code and
  message text to my_message().
*/
void select_insert::send_error(uint errcode,const char *err)
{
  DBUG_ENTER("select_insert::send_error");
  my_message(errcode, err, MYF(0));
  DBUG_VOID_RETURN;
}


bool select_insert::send_eof()
{
3346
  int error;
3347
  bool const trans_table= table->file->has_transactions();
3348
  ulonglong id;
unknown's avatar
unknown committed
3349
  bool changed;
3350
  THD::killed_state killed_status= thd->killed;
unknown's avatar
unknown committed
3351
  DBUG_ENTER("select_insert::send_eof");
3352 3353
  DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
                       trans_table, table->file->table_type()));
unknown's avatar
unknown committed
3354

Konstantin Osipov's avatar
Konstantin Osipov committed
3355 3356
  error= (thd->locked_tables_mode <= LTM_LOCK_TABLES ?
          table->file->ha_end_bulk_insert() : 0);
unknown's avatar
unknown committed
3357
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3358
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3359

3360 3361
  changed= (info.copied || info.deleted || info.updated);
  if (changed)
3362 3363 3364 3365 3366
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
unknown's avatar
unknown committed
3367
    query_cache_invalidate3(thd, table, 1);
unknown's avatar
unknown committed
3368
  }
3369 3370 3371 3372

  if (thd->transaction.stmt.modified_non_trans_table)
    thd->transaction.all.modified_non_trans_table= TRUE;

unknown's avatar
unknown committed
3373
  DBUG_ASSERT(trans_table || !changed || 
unknown's avatar
unknown committed
3374
              thd->transaction.stmt.modified_non_trans_table);
unknown's avatar
unknown committed
3375

3376 3377 3378 3379 3380 3381
  /*
    Write to binlog before commiting transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
3382 3383
  if (mysql_bin_log.is_open() &&
      (!error || thd->transaction.stmt.modified_non_trans_table))
3384
  {
3385
    int errcode= 0;
unknown's avatar
unknown committed
3386 3387
    if (!error)
      thd->clear_error();
3388 3389
    else
      errcode= query_error_code(thd, killed_status == THD::NOT_KILLED);
3390
    if (thd->binlog_query(THD::ROW_QUERY_TYPE,
3391
                      thd->query(), thd->query_length(),
3392
                      trans_table, FALSE, FALSE, errcode))
3393 3394 3395 3396
    {
      table->file->ha_release_auto_increment();
      DBUG_RETURN(1);
    }
3397
  }
3398
  table->file->ha_release_auto_increment();
3399

3400
  if (error)
unknown's avatar
unknown committed
3401 3402
  {
    table->file->print_error(error,MYF(0));
unknown's avatar
unknown committed
3403
    DBUG_RETURN(1);
unknown's avatar
unknown committed
3404
  }
unknown's avatar
unknown committed
3405
  char buff[160];
3406
  if (info.ignore)
unknown's avatar
unknown committed
3407
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
Marc Alff's avatar
Marc Alff committed
3408 3409
	    (ulong) (info.records - info.copied),
            (ulong) thd->warning_info->statement_warn_count());
unknown's avatar
unknown committed
3410
  else
unknown's avatar
unknown committed
3411
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
Marc Alff's avatar
Marc Alff committed
3412 3413
	    (ulong) (info.deleted+info.updated),
            (ulong) thd->warning_info->statement_warn_count());
3414 3415 3416
  thd->row_count_func= info.copied + info.deleted +
                       ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                        info.touched : info.updated);
3417 3418 3419 3420 3421 3422

  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
3423
  ::my_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
3424
  DBUG_RETURN(0);
unknown's avatar
unknown committed
3425 3426
}

3427 3428 3429 3430 3431 3432 3433 3434 3435 3436 3437
void select_insert::abort() {

  DBUG_ENTER("select_insert::abort");
  /*
    If the creation of the table failed (due to a syntax error, for
    example), no table will have been opened and therefore 'table'
    will be NULL. In that case, we still need to execute the rollback
    and the end of the function.
   */
  if (table)
  {
3438
    bool changed, transactional_table;
3439 3440 3441 3442
    /*
      If we are not in prelocked mode, we end the bulk insert started
      before.
    */
Konstantin Osipov's avatar
Konstantin Osipov committed
3443
    if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459
      table->file->ha_end_bulk_insert();

    /*
      If at least one row has been inserted/modified and will stay in
      the table (the table doesn't have transactions) we must write to
      the binlog (and the error code will make the slave stop).

      For many errors (example: we got a duplicate key error while
      inserting into a MyISAM table), no row will be added to the table,
      so passing the error to the slave will not help since there will
      be an error code mismatch (the inserts will succeed on the slave
      with no error).

      If table creation failed, the number of rows modified will also be
      zero, so no check for that is made.
    */
3460 3461 3462
    changed= (info.copied || info.deleted || info.updated);
    transactional_table= table->file->has_transactions();
    if (thd->transaction.stmt.modified_non_trans_table)
3463
    {
3464 3465 3466
        if (!can_rollback_data())
          thd->transaction.all.modified_non_trans_table= TRUE;

3467
        if (mysql_bin_log.is_open())
3468 3469
        {
          int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
3470 3471 3472
          /* error of writing binary log is ignored */
          (void) thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
                                   thd->query_length(),
3473
                                   transactional_table, FALSE, FALSE, errcode);
3474
        }
3475 3476
	if (changed)
	  query_cache_invalidate3(thd, table, 1);
3477
    }
3478 3479
    DBUG_ASSERT(transactional_table || !changed ||
		thd->transaction.stmt.modified_non_trans_table);
3480 3481 3482 3483 3484 3485
    table->file->ha_release_auto_increment();
  }

  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
3486 3487

/***************************************************************************
unknown's avatar
unknown committed
3488
  CREATE TABLE (SELECT) ...
unknown's avatar
unknown committed
3489 3490
***************************************************************************/

3491
/**
  Create the destination table of CREATE TABLE ... SELECT from a list of
  columns plus the items of the select list, then open and lock it.

  One Create_field is derived from every element of @a items and appended
  to alter_info->create_list.  The table is then created with
  mysql_create_table_no_lock(), opened with open_table() and locked with
  mysql_lock_tables().

  @param thd           [in]     Thread object
  @param create_info   [in]     Create information (like MAX_ROWS, ENGINE or
                                temporary table flag)
  @param create_table  [in]     TABLE_LIST object providing database and
                                name of the table to be created
  @param alter_info    [in/out] Initial list of columns and indexes for the
                                table; fields produced from @a items are
                                added to the end of alter_info->create_list
  @param items         [in]     Items used to produce the remaining fields
                                of the table
  @param lock          [out]    Receives the MYSQL_LOCK taken on the created
                                table.  Since this table is not included in
                                THD::lock the caller is responsible for
                                explicitly unlocking it.
  @param hooks         [in]     Hooks invoked before (prelock) and after
                                (postlock) obtaining the table lock

  @note
    The caller is expected to hold an exclusive metadata lock on the table
    name, unless the table being created is temporary.

  @note
    This function contains logic specific to CREATE TABLE ... SELECT and
    must be changed before it can be used in other contexts.

  @retval non-zero  Pointer to TABLE object for the table created
  @retval 0         Error
*/

static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
                                      TABLE_LIST *create_table,
                                      Alter_info *alter_info,
                                      List<Item> *items,
                                      MYSQL_LOCK **lock,
                                      TABLEOP_HOOKS *hooks)
{
  TABLE scratch_table;                  // Used during 'Create_field()'
  TABLE_SHARE scratch_share;
  TABLE *table= 0;
  const uint select_field_count= items->elements;
  List_iterator_fast<Item> item_it(*items);
  Field *tmp_field;
  bool not_used;
  DBUG_ENTER("create_table_from_items");

  /* Minimal fake table, only good enough for building field prototypes. */
  scratch_table.alias= 0;
  scratch_table.timestamp_field= 0;
  scratch_table.s= &scratch_share;
  init_tmp_table_share(thd, &scratch_share, "", 0, "", "");

  scratch_share.db_create_options= 0;
  scratch_share.blob_ptr_size= portable_sizeof_char_ptr;
  scratch_share.db_low_byte_first=
    test(create_info->db_type == myisam_hton ||
         create_info->db_type == heap_hton);
  scratch_table.maybe_null= 0;
  scratch_table.null_row= 0;

  /* Add selected items to field list */
  for (Item *item= item_it++; item; item= item_it++)
  {
    Field *proto_field, *def_field;

    if (item->type() != Item::FUNC_ITEM)
    {
      proto_field= create_tmp_field(thd, &scratch_table, item, item->type(),
                                    (Item ***) 0, &tmp_field, &def_field,
                                    0, 0, 0, 0, 0);
    }
    else if (item->result_type() != STRING_RESULT)
    {
      proto_field= item->tmp_table_field(&scratch_table);
    }
    else
    {
      proto_field= item->tmp_table_field_from_field_type(&scratch_table, 0);
    }
    if (!proto_field)
      DBUG_RETURN(0);

    /* A plain column reference passes its original Field along. */
    Field *source_field= (item->type() == Item::FIELD_ITEM ?
                          ((Item_field *) item)->field :
                          (Field*) 0);
    Create_field *new_column= new Create_field(proto_field, source_field);
    if (!new_column)
      DBUG_RETURN(0);

    if (item->maybe_null)
      new_column->flags&= ~NOT_NULL_FLAG;
    alter_info->create_list.push_back(new_column);
  }

  DBUG_EXECUTE_IF("sleep_create_select_before_create", my_sleep(6000000););

  /*
    Create and lock table.

    We are either creating a temporary table or creating a base table on
    whose name we have an exclusive lock, so the code below should not
    cause deadlocks or races.

    We don't log the statement here, it will be logged later.

    NOTE(review): an earlier comment at this point said binary logging is
    kept disabled during open_table() so that the automatic DELETE FROM
    written for a HEAP table on first open is not logged.  No such
    disabling is visible in this function - confirm.
  */
  if (mysql_create_table_no_lock(thd, create_table->db,
                                 create_table->table_name,
                                 create_info, alter_info, 0,
                                 select_field_count))
    DBUG_RETURN(0);                             // creation failed

  if (create_info->table_existed)
  {
    /*
      This means that someone created table underneath server
      or it was created via different mysqld front-end to the
      cluster. We don't have much options but throw an error.
    */
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
    DBUG_RETURN(0);
  }

  DBUG_EXECUTE_IF("sleep_create_select_before_open", my_sleep(6000000););

  {
    const bool is_temporary=
      (create_info->options & HA_LEX_CREATE_TMP_TABLE) != 0;
    Open_table_context ot_ctx_unused(thd);

    /*
      For a base table we open the destination table on which we already
      have an exclusive metadata lock; for a temporary table only
      temporary tables are searched.
    */
    if (!open_table(thd, create_table, thd->mem_root, &ot_ctx_unused,
                    is_temporary ? MYSQL_OPEN_TEMPORARY_ONLY :
                                   MYSQL_OPEN_REOPEN))
    {
      table= create_table->table;
    }
    else if (is_temporary)
    {
      /*
        This shouldn't happen as creation of a temporary table should make
        it ready for open. But let us drop the temporary table here just
        in case.
      */
      drop_temporary_table(thd, create_table);
    }
    else
    {
      /* Open failed: remove the table that was just created. */
      mysql_mutex_lock(&LOCK_open);
      quick_rm_table(create_info->db_type, create_table->db,
                     table_case_name(create_info, create_table->table_name),
                     0);
      mysql_mutex_unlock(&LOCK_open);
    }
  }
  if (!table)                                   // open failed
    DBUG_RETURN(0);

  DBUG_EXECUTE_IF("sleep_create_select_before_lock", my_sleep(6000000););

  table->reginfo.lock_type= TL_WRITE;
  hooks->prelock(&table, 1);                    // Call prelock hooks
  /*
    mysql_lock_tables() below should never fail with request to reopen table
    since it won't wait for the table lock (we have exclusive metadata lock on
    the table) and thus can't get aborted.
  */
  *lock= mysql_lock_tables(thd, &table, 1,
                           MYSQL_LOCK_IGNORE_FLUSH, &not_used);
  if (*lock && !hooks->postlock(&table, 1))
    DBUG_RETURN(table);

  /* Locking or the postlock hook failed: undo the lock and the table. */
  if (*lock)
  {
    mysql_unlock_tables(thd, *lock);
    *lock= 0;
  }
  drop_open_table(thd, table, create_table->db, create_table->table_name);
  DBUG_RETURN(0);
}


unknown's avatar
unknown committed
3685
int
3686
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
3687
{
3688
  MYSQL_LOCK *extra_lock= NULL;
3689
  DBUG_ENTER("select_create::prepare");
3690

3691
  TABLEOP_HOOKS *hook_ptr= NULL;
3692 3693 3694 3695 3696 3697 3698 3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709
  /*
    For row-based replication, the CREATE-SELECT statement is written
    in two pieces: the first one contain the CREATE TABLE statement
    necessary to create the table and the second part contain the rows
    that should go into the table.

    For non-temporary tables, the start of the CREATE-SELECT
    implicitly commits the previous transaction, and all events
    forming the statement will be stored the transaction cache. At end
    of the statement, the entire statement is committed as a
    transaction, and all events are written to the binary log.

    On the master, the table is locked for the duration of the
    statement, but since the CREATE part is replicated as a simple
    statement, there is no way to lock the table for accesses on the
    slave.  Hence, we have to hold on to the CREATE part of the
    statement until the statement has finished.
   */
3710 3711
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
Konstantin Osipov's avatar
Konstantin Osipov committed
3712 3713 3714 3715 3716
    MY_HOOKS(select_create *x, TABLE_LIST *create_table_arg,
             TABLE_LIST *select_tables_arg)
      : ptr(x),
        create_table(create_table_arg),
        select_tables(select_tables_arg)
3717 3718
      {
      }
3719 3720

  private:
3721
    virtual int do_postlock(TABLE **tables, uint count)
3722
    {
Konstantin Osipov's avatar
Konstantin Osipov committed
3723
      int error;
3724
      THD *thd= const_cast<THD*>(ptr->get_thd());
Konstantin Osipov's avatar
Konstantin Osipov committed
3725 3726 3727 3728
      TABLE_LIST *save_next_global= create_table->next_global;

      create_table->next_global= select_tables;

3729
      error= thd->decide_logging_format(create_table);
Konstantin Osipov's avatar
Konstantin Osipov committed
3730 3731 3732 3733

      create_table->next_global= save_next_global;

      if (error)
3734 3735
        return error;

3736
      TABLE const *const table = *tables;
3737
      if (thd->is_current_stmt_binlog_format_row()  &&
3738
          !table->s->tmp_table &&
3739 3740
          !ptr->get_create_info()->table_existed)
      {
3741 3742
        if (int error= ptr->binlog_show_create_table(tables, count))
          return error;
3743
      }
3744
      return 0;
3745 3746 3747
    }

    select_create *ptr;
Konstantin Osipov's avatar
Konstantin Osipov committed
3748 3749
    TABLE_LIST *create_table;
    TABLE_LIST *select_tables;
3750 3751
  };

3752
  MY_HOOKS hooks(this, create_table, select_tables);
3753
  hook_ptr= &hooks;
3754

3755
  unit= u;
3756 3757

  /*
3758 3759 3760
    Start a statement transaction before the create if we are using
    row-based replication for the statement.  If we are creating a
    temporary table, we need to start a statement transaction.
3761 3762
  */
  if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
3763
      thd->is_current_stmt_binlog_format_row() &&
3764
      mysql_bin_log.is_open())
3765 3766 3767 3768
  {
    thd->binlog_start_trans_and_stmt();
  }

3769 3770
  DBUG_EXECUTE_IF("sleep_create_select_before_check_if_exists", my_sleep(6000000););

3771
  if (create_table->table)
3772 3773 3774 3775 3776 3777 3778 3779 3780
  {
    /* Table already exists and was open at open_and_lock_tables() stage. */
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
    {
      /* Mark that table existed */
      create_info->table_existed= 1;
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
                          create_table->table_name);
3781
      if (thd->is_current_stmt_binlog_format_row())
3782 3783 3784 3785 3786 3787 3788 3789 3790 3791 3792 3793 3794 3795 3796
        binlog_show_create_table(&(create_table->table), 1);
      table= create_table->table;
    }
    else
    {
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
      DBUG_RETURN(-1);
    }
  }
  else
    if (!(table= create_table_from_items(thd, create_info, create_table,
                                         alter_info, &values,
                                         &extra_lock, hook_ptr)))
      /* abort() deletes table */
      DBUG_RETURN(-1);
unknown's avatar
unknown committed
3797

3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809
  if (extra_lock)
  {
    DBUG_ASSERT(m_plock == NULL);

    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
      m_plock= &m_lock;
    else
      m_plock= &thd->extra_lock;

    *m_plock= extra_lock;
  }

3810
  if (table->s->fields < values.elements)
unknown's avatar
unknown committed
3811
  {
3812
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
unknown's avatar
unknown committed
3813 3814 3815
    DBUG_RETURN(-1);
  }

3816
 /* First field to copy */
unknown's avatar
unknown committed
3817 3818 3819 3820
  field= table->field+table->s->fields - values.elements;

  /* Mark all fields that are given values */
  for (Field **f= field ; *f ; f++)
3821
    bitmap_set_bit(table->write_set, (*f)->field_index);
unknown's avatar
unknown committed
3822

3823
  /* Don't set timestamp if used */
3824
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
unknown's avatar
unknown committed
3825 3826
  table->next_number_field=table->found_next_number_field;

3827
  restore_record(table,s->default_values);      // Get empty record
unknown's avatar
unknown committed
3828
  thd->cuted_fields=0;
unknown's avatar
unknown committed
3829
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
3830
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3831 3832 3833
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
3834 3835
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
Konstantin Osipov's avatar
Konstantin Osipov committed
3836
  if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
3837
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
3838
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
3839 3840 3841
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
3842 3843 3844
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
unknown's avatar
unknown committed
3845
  table->file->extra(HA_EXTRA_WRITE_CACHE);
3846
  DBUG_RETURN(0);
unknown's avatar
unknown committed
3847 3848
}

3849
int
3850
select_create::binlog_show_create_table(TABLE **tables, uint count)
3851 3852 3853 3854 3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865 3866 3867
{
  /*
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
    created table by calling store_create_info() (behaves as SHOW
    CREATE TABLE).  In the event of an error, nothing should be
    written to the binary log, even if the table is non-transactional;
    therefore we pretend that the generated CREATE TABLE statement is
    for a transactional table.  The event will then be put in the
    transaction cache, and any subsequent events (e.g., table-map
    events and binrow events) will also be put there.  We can then use
    ha_autocommit_or_rollback() to either throw away the entire
    kaboodle of events, or write them to the binary log.

    We write the CREATE TABLE statement here and not in prepare()
    since there potentially are sub-selects or accesses to information
    schema that will do a close_thread_tables(), destroying the
    statement transaction cache.
3868
  */
3869
  DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
3870
  DBUG_ASSERT(tables && *tables && count > 0);
3871 3872 3873

  char buf[2048];
  String query(buf, sizeof(buf), system_charset_info);
3874
  int result;
3875
  TABLE_LIST tmp_table_list;
3876

3877 3878
  memset(&tmp_table_list, 0, sizeof(tmp_table_list));
  tmp_table_list.table = *tables;
3879
  query.length(0);      // Have to zero it since constructor doesn't
3880

3881 3882
  result= store_create_info(thd, &tmp_table_list, &query, create_info,
                            /* show_database */ TRUE);
3883
  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
3884

3885
  if (mysql_bin_log.is_open())
3886 3887
  {
    int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
3888 3889 3890
    result= thd->binlog_query(THD::STMT_QUERY_TYPE,
                              query.ptr(), query.length(),
                              /* is_trans */ TRUE,
3891
                              /* direct */ FALSE,
3892 3893
                              /* suppress_use */ FALSE,
                              errcode);
3894
  }
3895
  return result;
3896 3897
}

unknown's avatar
unknown committed
3898
/**
  Store one row of values into the created table's record, starting at
  the first field to copy ('field'), through
  fill_record_n_invoke_before_triggers() with the TRG_EVENT_INSERT event.

  @param values  items providing the values of the row
*/
void select_create::store_values(List<Item> &values)
{
  fill_record_n_invoke_before_triggers(thd,
                                       field,
                                       values,
                                       1,
                                       table->triggers,
                                       TRG_EVENT_INSERT);
}

unknown's avatar
unknown committed
3904

3905 3906
/**
  Report an error for CREATE TABLE ... SELECT.

  Emits some debug trace about the statement and the table, then
  delegates to select_insert::send_error() with the binary log
  temporarily disabled.

  @param errcode  error code, passed on to select_insert::send_error()
  @param err      error message text, passed on likewise
*/
void select_create::send_error(uint errcode,const char *err)
{
  DBUG_ENTER("select_create::send_error");

  DBUG_PRINT("info",
             ("Current statement %s row-based",
              thd->is_current_stmt_binlog_format_row() ? "is" : "is NOT"));
  /* The address follows a "0x" prefix, so it is printed in hexadecimal. */
  DBUG_PRINT("info",
             ("Current table (at 0x%lx) %s a temporary (or non-existant) table",
              (ulong) table,
              table && !table->s->tmp_table ? "is NOT" : "is"));
  DBUG_PRINT("info",
             ("Table %s prior to executing this statement",
              get_create_info()->table_existed ? "existed" : "did not exist"));

  /*
    This will execute any rollbacks that are necessary before writing
    the transaction cache.

    We disable the binary log since nothing should be written to the
    binary log.  This disabling is important, since we potentially do
    a "roll back" of non-transactional tables by removing the table,
    and the actual rollback might generate events that should not be
    written to the binary log.

  */
  tmp_disable_binlog(thd);
  select_insert::send_error(errcode, err);
  reenable_binlog(thd);

  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
3938

unknown's avatar
unknown committed
3939 3940 3941 3942 3943 3944 3945
bool select_create::send_eof()
{
  bool tmp=select_insert::send_eof();
  if (tmp)
    abort();
  else
  {
3946 3947 3948 3949 3950 3951
    /*
      Do an implicit commit at end of statement for non-temporary
      tables.  This can fail, but we should unlock the table
      nevertheless.
    */
    if (!table->s->tmp_table)
3952
    {
Konstantin Osipov's avatar
Konstantin Osipov committed
3953 3954
      trans_commit_stmt(thd);
      trans_commit_implicit(thd);
3955
    }
3956

unknown's avatar
unknown committed
3957
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3958
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3959
    if (m_plock)
3960
    {
3961
      mysql_unlock_tables(thd, *m_plock);
3962 3963
      *m_plock= NULL;
      m_plock= NULL;
3964
    }
unknown's avatar
unknown committed
3965 3966 3967 3968
  }
  return tmp;
}

unknown's avatar
unknown committed
3969

unknown's avatar
unknown committed
3970 3971
void select_create::abort()
{
3972 3973 3974
  DBUG_ENTER("select_create::abort");

  /*
3975
    In select_insert::abort() we roll back the statement, including
3976 3977 3978
    truncating the transaction cache of the binary log. To do this, we
    pretend that the statement is transactional, even though it might
    be the case that it was not.
3979 3980 3981 3982 3983 3984 3985 3986 3987 3988

    We roll back the statement prior to deleting the table and prior
    to releasing the lock on the table, since there might be potential
    for failure if the rollback is executed after the drop or after
    unlocking the table.

    We also roll back the statement regardless of whether the creation
    of the table succeeded or not, since we need to reset the binary
    log state.
  */
3989
  tmp_disable_binlog(thd);
3990
  select_insert::abort();
3991
  thd->transaction.stmt.modified_non_trans_table= FALSE;
3992
  reenable_binlog(thd);
3993
  /* possible error of writing binary log is ignored deliberately */
3994
  (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
3995

3996
  if (m_plock)
unknown's avatar
unknown committed
3997
  {
3998
    mysql_unlock_tables(thd, *m_plock);
3999 4000
    *m_plock= NULL;
    m_plock= NULL;
unknown's avatar
unknown committed
4001
  }
4002

unknown's avatar
unknown committed
4003 4004
  if (table)
  {
unknown's avatar
unknown committed
4005
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4006
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4007
    table->auto_increment_field_not_null= FALSE;
unknown's avatar
unknown committed
4008 4009
    if (!create_info->table_existed)
      drop_open_table(thd, table, create_table->db, create_table->table_name);
unknown's avatar
unknown committed
4010
    table=0;                                    // Safety
unknown's avatar
unknown committed
4011
  }
4012
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
4013 4014 4015 4016
}


/*****************************************************************************
unknown's avatar
unknown committed
4017
  Instantiate templates
unknown's avatar
unknown committed
4018 4019
*****************************************************************************/

4020
/*
  Explicit instantiations of the list templates, compiled only when
  HAVE_EXPLICIT_TEMPLATE_INSTANTIATION is defined.  The delayed-insert
  ones are left out when EMBEDDED_LIBRARY is defined.
*/
#if defined(HAVE_EXPLICIT_TEMPLATE_INSTANTIATION)
template class List_iterator_fast<List_item>;
#if !defined(EMBEDDED_LIBRARY)
template class I_List<Delayed_insert>;
template class I_List_iterator<Delayed_insert>;
template class I_List<delayed_row>;
#endif /* !EMBEDDED_LIBRARY */
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */