sql_insert.cc 97.5 KB
Newer Older
unknown's avatar
unknown committed
1
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
unknown's avatar
unknown committed
2

unknown's avatar
unknown committed
3 4 5 6
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
unknown's avatar
unknown committed
7

unknown's avatar
unknown committed
8 9 10 11
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
unknown's avatar
unknown committed
12

unknown's avatar
unknown committed
13 14 15 16 17 18 19
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

unknown's avatar
unknown committed
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
  delayed_insert::get_local_table() the table of the thread is copied
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other thread's table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used. Only that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread awakes from the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It normally lives until a FLUSH TABLES or SHUTDOWN.

*/

unknown's avatar
unknown committed
58
#include "mysql_priv.h"
59 60
#include "sp_head.h"
#include "sql_trigger.h"
61
#include "sql_select.h"
62
#include "sql_show.h"
unknown's avatar
unknown committed
63 64

static int check_null_fields(THD *thd,TABLE *entry);
unknown's avatar
unknown committed
65
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
66
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
67 68
static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
                         LEX_STRING query, bool ignore, bool log_on);
unknown's avatar
unknown committed
69
static void end_delayed_insert(THD *thd);
70
pthread_handler_t handle_delayed_insert(void *arg);
unknown's avatar
unknown committed
71
static void unlink_blobs(register TABLE *table);
unknown's avatar
unknown committed
72
#endif
73
static bool check_view_insertability(THD *thd, TABLE_LIST *view);
unknown's avatar
unknown committed
74 75 76 77 78 79 80 81 82 83 84

/*
  Define to force use of my_malloc() if the allocated memory block is big.

  my_safe_alloca(size, min_length)
    Without HAVE_ALLOCA this is plain my_alloca(size). With HAVE_ALLOCA,
    blocks of at most min_length bytes come from my_alloca() and larger
    blocks from my_malloc().

  my_safe_afree(ptr, size, min_length)
    Must be given the same size and min_length as the matching
    my_safe_alloca() call; with HAVE_ALLOCA it frees only the blocks that
    were taken from my_malloc().

  NOTE
    'size' is evaluated more than once, so it must not have side effects.
    my_safe_afree() expands to a single statement, so it is safe to use
    as the body of an if/else without braces.
*/

#ifndef HAVE_ALLOCA
#define my_safe_alloca(size, min_length) my_alloca(size)
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
#else
#define my_safe_alloca(size, min_length) \
  (((size) <= (min_length)) ? my_alloca(size) : my_malloc((size), MYF(0)))
#define my_safe_afree(ptr, size, min_length) \
  do { if ((size) > (min_length)) my_free((ptr), MYF(0)); } while (0)
#endif

85

unknown's avatar
unknown committed
86
/*
87
  Check if insert fields are correct.
88 89 90 91 92 93 94

  SYNOPSIS
    check_insert_fields()
    thd                         The current thread.
    table                       The table for insert.
    fields                      The insert fields.
    values                      The insert values.
unknown's avatar
unknown committed
95
    check_unique                If duplicate values should be rejected.
96 97 98 99 100 101 102 103 104

  NOTE
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
    or leaves it as is, depending on if timestamp should be updated or
    not.

  RETURN
    0           OK
    -1          Error
unknown's avatar
unknown committed
105 106
*/

unknown's avatar
unknown committed
107 108 109
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
                               bool check_unique)
unknown's avatar
unknown committed
110
{
unknown's avatar
VIEW  
unknown committed
111
  TABLE *table= table_list->table;
112

113 114 115 116 117 118
  if (!table_list->updatable)
  {
    my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT");
    return -1;
  }

unknown's avatar
unknown committed
119 120
  if (fields.elements == 0 && values.elements != 0)
  {
121 122 123 124 125 126
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
      return -1;
    }
127
    if (values.elements != table->s->fields)
unknown's avatar
unknown committed
128
    {
unknown's avatar
unknown committed
129
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
130 131
      return -1;
    }
unknown's avatar
SCRUM:  
unknown committed
132
#ifndef NO_EMBEDDED_ACCESS_CHECKS
133
    if (grant_option)
unknown's avatar
VIEW  
unknown committed
134 135 136
    {
      Field_iterator_table fields;
      fields.set_table(table);
137
      if (check_grant_all_columns(thd, INSERT_ACL, &table->grant,
unknown's avatar
unknown committed
138
                                  table->s->db.str, table->s->table_name.str,
unknown's avatar
VIEW  
unknown committed
139 140 141
                                  &fields))
        return -1;
    }
unknown's avatar
SCRUM:  
unknown committed
142
#endif
143 144
    clear_timestamp_auto_bits(table->timestamp_field_type,
                              TIMESTAMP_AUTO_SET_ON_INSERT);
145 146 147 148
    /*
      No fields are provided so all fields must be provided in the values.
      Thus we set all bits in the write set.
    */
149
    bitmap_set_all(table->write_set);
unknown's avatar
unknown committed
150 151 152
  }
  else
  {						// Part field list
unknown's avatar
unknown committed
153 154
    SELECT_LEX *select_lex= &thd->lex->select_lex;
    Name_resolution_context *context= &select_lex->context;
155
    Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
156
    int res;
unknown's avatar
unknown committed
157

unknown's avatar
unknown committed
158 159
    if (fields.elements != values.elements)
    {
unknown's avatar
unknown committed
160
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
161 162 163
      return -1;
    }

164
    thd->dup_field= 0;
unknown's avatar
unknown committed
165 166 167
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
168
    ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
169 170 171 172 173

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
174
    table_list->next_local= 0;
175
    context->resolve_in_table_list_only(table_list);
176
    res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
unknown's avatar
unknown committed
177 178

    /* Restore the current context. */
179
    ctx_state.restore_state(context, table_list);
180
    thd->lex->select_lex.no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
181

unknown's avatar
unknown committed
182
    if (res)
unknown's avatar
unknown committed
183
      return -1;
unknown's avatar
unknown committed
184

unknown's avatar
unknown committed
185
    if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
186 187 188 189
    {
      /* it is join view => we need to find table for update */
      List_iterator_fast<Item> it(fields);
      Item *item;
unknown's avatar
unknown committed
190
      TABLE_LIST *tbl= 0;            // reset for call to check_single_table()
191
      table_map map= 0;
unknown's avatar
unknown committed
192 193

      while ((item= it++))
194
        map|= item->used_tables();
unknown's avatar
unknown committed
195
      if (table_list->check_single_table(&tbl, map, table_list) || tbl == 0)
196 197 198 199 200 201 202
      {
        my_error(ER_VIEW_MULTIUPDATE, MYF(0),
                 table_list->view_db.str, table_list->view_name.str);
        return -1;
      }
      table_list->table= table= tbl->table;
    }
203

204
    if (check_unique && thd->dup_field)
unknown's avatar
unknown committed
205
    {
206
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
unknown's avatar
unknown committed
207 208
      return -1;
    }
209 210 211 212 213 214 215 216 217 218 219 220
    if (table->timestamp_field)	// Don't automaticly set timestamp if used
    {
      if (bitmap_is_set(table->write_set,
                        table->timestamp_field->field_index))
        clear_timestamp_auto_bits(table->timestamp_field_type,
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
      else
      {
        bitmap_set_bit(table->write_set,
                       table->timestamp_field->field_index);
      }
    }
unknown's avatar
unknown committed
221
  }
unknown's avatar
unknown committed
222
  // For the values we need select_priv
unknown's avatar
SCRUM:  
unknown committed
223
#ifndef NO_EMBEDDED_ACCESS_CHECKS
224
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
unknown's avatar
SCRUM:  
unknown committed
225
#endif
226 227 228

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
229
       check_view_insertability(thd, table_list)))
230 231 232 233 234
  {
    my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT");
    return -1;
  }

unknown's avatar
unknown committed
235 236 237 238
  return 0;
}


239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
/*
  Check update fields for the timestamp field.

  SYNOPSIS
    check_update_fields()
    thd                         The current thread.
    insert_table_list           The insert table list.
    table                       The table for update.
    update_fields               The update fields.

  NOTE
    If the update fields include the timestamp field,
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.

  RETURN
    0           OK
    -1          Error
*/

unknown's avatar
unknown committed
258
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
259 260
                               List<Item> &update_fields)
{
unknown's avatar
unknown committed
261
  TABLE *table= insert_table_list->table;
262
  my_bool timestamp_mark;
263 264 265

  if (table->timestamp_field)
  {
266 267 268 269 270 271
    /*
      Unmark the timestamp field so that we can check if this is modified
      by update_fields
    */
    timestamp_mark= bitmap_test_and_clear(table->write_set,
                                          table->timestamp_field->field_index);
272 273
  }

274 275
  /* Check the fields we are going to modify */
  if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
276 277 278 279 280
    return -1;

  if (table->timestamp_field)
  {
    /* Don't set timestamp column if this is modified. */
281 282
    if (bitmap_is_set(table->write_set,
                      table->timestamp_field->field_index))
283 284
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
285 286 287
    if (timestamp_mark)
      bitmap_set_bit(table->write_set,
                     table->timestamp_field->field_index);
288 289 290 291 292
  }
  return 0;
}


unknown's avatar
unknown committed
293 294 295 296 297
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
298 299
                  enum_duplicates duplic,
		  bool ignore)
unknown's avatar
unknown committed
300
{
301
  int error, res;
302 303 304 305 306
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-update or --log-bin).
  */
307 308
  bool log_on= ((thd->options & OPTION_BIN_LOG) ||
                (!(thd->security_ctx->master_access & SUPER_ACL)));
309
  bool transactional_table, joins_freed= FALSE;
310
  bool changed;
unknown's avatar
unknown committed
311 312 313 314
  uint value_count;
  ulong counter = 1;
  ulonglong id;
  COPY_INFO info;
315
  TABLE *table= 0;
unknown's avatar
unknown committed
316
  List_iterator_fast<List_item> its(values_list);
unknown's avatar
unknown committed
317
  List_item *values;
unknown's avatar
unknown committed
318
  Name_resolution_context *context;
319
  Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
320 321 322
#ifndef EMBEDDED_LIBRARY
  char *query= thd->query;
#endif
unknown's avatar
unknown committed
323
  thr_lock_type lock_type = table_list->lock_type;
unknown's avatar
unknown committed
324
  Item *unused_conds= 0;
unknown's avatar
unknown committed
325 326
  DBUG_ENTER("mysql_insert");

327 328 329 330 331
  /*
    in safe mode or with skip-new change delayed insert to be regular
    if we are told to replace duplicates, the insert cannot be concurrent
    delayed insert changed to regular in slave thread
   */
332 333 334 335
#ifdef EMBEDDED_LIBRARY
  if (lock_type == TL_WRITE_DELAYED)
    lock_type=TL_WRITE;
#else
unknown's avatar
unknown committed
336 337
  if ((lock_type == TL_WRITE_DELAYED &&
       ((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) ||
338
	thd->slave_thread || !thd->variables.max_insert_delayed_threads)) ||
339 340
      (lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) ||
      (duplic == DUP_UPDATE))
unknown's avatar
unknown committed
341
    lock_type=TL_WRITE;
342
#endif
unknown's avatar
unknown committed
343
  table_list->lock_type= lock_type;
unknown's avatar
unknown committed
344

345
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
346 347 348 349
  if (lock_type == TL_WRITE_DELAYED)
  {
    if (thd->locked_tables)
    {
unknown's avatar
unknown committed
350 351
      DBUG_ASSERT(table_list->db); /* Must be set in the parser */
      if (find_locked_table(thd, table_list->db, table_list->table_name))
unknown's avatar
unknown committed
352
      {
353
	my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
354
                 table_list->table_name);
unknown's avatar
unknown committed
355
	DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
356 357
      }
    }
358
    if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error)
359
    {
360 361 362 363 364
      /*
        Open tables used for sub-selects or in stored functions, will also
        cache these functions.
      */
      res= open_and_lock_tables(thd, table_list->next_global);
unknown's avatar
VIEW  
unknown committed
365 366 367 368 369 370
      /*
	First is not processed by open_and_lock_tables() => we need set
	updateability flags "by hands".
      */
      if (!table_list->derived && !table_list->view)
        table_list->updatable= 1;  // usual table
371
    }
unknown's avatar
unknown committed
372
    else
373
    {
374 375
      /* Too many delayed insert threads;  Use a normal insert */
      table_list->lock_type= lock_type= TL_WRITE;
unknown's avatar
unknown committed
376
      res= open_and_lock_tables(thd, table_list);
377
    }
unknown's avatar
unknown committed
378 379
  }
  else
380
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
381
    res= open_and_lock_tables(thd, table_list);
382
  if (res || thd->is_fatal_error)
unknown's avatar
unknown committed
383
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
384

unknown's avatar
unknown committed
385
  thd->proc_info="init";
386
  thd->used_tables=0;
unknown's avatar
unknown committed
387
  values= its++;
unknown's avatar
unknown committed
388

unknown's avatar
VIEW  
unknown committed
389
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
unknown's avatar
unknown committed
390 391
			   update_fields, update_values, duplic, &unused_conds,
                           FALSE))
unknown's avatar
unknown committed
392
    goto abort;
393

394 395 396
  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;

unknown's avatar
unknown committed
397
  context= &thd->lex->select_lex.context;
398 399 400 401 402 403 404 405 406
  /*
    These three asserts test the hypothesis that the resetting of the name
    resolution context below is not necessary at all since the list of local
    tables for INSERT always consists of one table.
  */
  DBUG_ASSERT(!table_list->next_local);
  DBUG_ASSERT(!context->table_list->next_local);
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);

unknown's avatar
unknown committed
407
  /* Save the state of the current name resolution context. */
408
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
409 410 411 412 413

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
414
  table_list->next_local= 0;
unknown's avatar
unknown committed
415 416
  context->resolve_in_table_list_only(table_list);

unknown's avatar
unknown committed
417
  value_count= values->elements;
unknown's avatar
unknown committed
418
  while ((values= its++))
unknown's avatar
unknown committed
419 420 421 422
  {
    counter++;
    if (values->elements != value_count)
    {
423
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
unknown's avatar
unknown committed
424 425
      goto abort;
    }
426
    if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
unknown's avatar
unknown committed
427 428 429
      goto abort;
  }
  its.rewind ();
unknown's avatar
unknown committed
430 431
 
  /* Restore the current context. */
432
  ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
433

unknown's avatar
unknown committed
434
  /*
unknown's avatar
unknown committed
435
    Fill in the given fields and dump it to the table file
unknown's avatar
unknown committed
436
  */
unknown's avatar
unknown committed
437
  info.records= info.deleted= info.copied= info.updated= 0;
438
  info.ignore= ignore;
unknown's avatar
unknown committed
439
  info.handle_duplicates=duplic;
440 441
  info.update_fields= &update_fields;
  info.update_values= &update_values;
unknown's avatar
unknown committed
442
  info.view= (table_list->view ? table_list : 0);
443

444 445 446
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
unknown's avatar
unknown committed
447
    to NULL.
448
  */
unknown's avatar
unknown committed
449
  thd->count_cuted_fields= ((values_list.elements == 1 &&
unknown's avatar
unknown committed
450
                             !ignore) ?
451 452
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
unknown's avatar
unknown committed
453 454 455 456 457
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

  error=0;
  thd->proc_info="update";
unknown's avatar
unknown committed
458
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
459
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
460 461 462
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
463 464 465 466
  /*
    let's *try* to start bulk inserts. It won't necessary
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
467 468 469 470
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
471 472 473 474
    So we call start_bulk_insert to perform nesessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
475
  if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode)
476
    table->file->ha_start_bulk_insert(values_list.elements);
477

unknown's avatar
unknown committed
478
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
479
  thd->abort_on_warning= (!ignore &&
unknown's avatar
unknown committed
480 481 482 483
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));

484
  if ((fields.elements || !value_count) &&
485
      check_that_all_fields_are_given_values(thd, table, table_list))
unknown's avatar
unknown committed
486 487 488 489 490
  {
    /* thd->net.report_error is now set, which will abort the next loop */
    error= 1;
  }

491
  table->mark_columns_needed_for_insert();
492

493 494 495 496
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

unknown's avatar
unknown committed
497
  while ((values= its++))
unknown's avatar
unknown committed
498 499 500
  {
    if (fields.elements || !value_count)
    {
501
      restore_record(table,s->default_values);	// Get empty record
unknown's avatar
unknown committed
502 503 504
      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
505
      {
unknown's avatar
unknown committed
506
	if (values_list.elements != 1 && !thd->net.report_error)
unknown's avatar
unknown committed
507 508 509 510
	{
	  info.records++;
	  continue;
	}
unknown's avatar
unknown committed
511 512 513 514 515
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
unknown's avatar
unknown committed
516 517 518 519 520 521
	error=1;
	break;
      }
    }
    else
    {
522
      if (thd->used_tables)			// Column used in values()
523
	restore_record(table,s->default_values);	// Get empty record
524
      else
unknown's avatar
unknown committed
525 526 527 528 529 530 531 532 533 534 535
      {
        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
	table->record[0][0]= table->s->default_values[0];
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
536
      {
unknown's avatar
unknown committed
537
	if (values_list.elements != 1 && ! thd->net.report_error)
unknown's avatar
unknown committed
538 539 540 541 542 543 544 545
	{
	  info.records++;
	  continue;
	}
	error=1;
	break;
      }
    }
546

547 548 549
    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
550
					     ignore))) ==
unknown's avatar
unknown committed
551 552 553
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
554
    {
unknown's avatar
unknown committed
555 556
      error= 1;
      break;
unknown's avatar
unknown committed
557
    }
558
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
559
    if (lock_type == TL_WRITE_DELAYED)
unknown's avatar
unknown committed
560
    {
561 562
      LEX_STRING const st_query = { query, thd->query_length };
      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
unknown's avatar
unknown committed
563 564 565
      query=0;
    }
    else
566
#endif
unknown's avatar
unknown committed
567
      error=write_record(thd, table ,&info);
568 569
    if (error)
      break;
570
    thd->row_count++;
unknown's avatar
unknown committed
571
  }
572

573 574
  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;
575
  table->file->ha_release_auto_increment();
576

577 578 579 580
  /*
    Now all rows are inserted.  Time to update logs and sends response to
    user
  */
581
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
582 583
  if (lock_type == TL_WRITE_DELAYED)
  {
584 585 586 587 588
    if (!error)
    {
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
unknown's avatar
unknown committed
589
    query_cache_invalidate3(thd, table_list, 1);
unknown's avatar
unknown committed
590 591
  }
  else
592
#endif
unknown's avatar
unknown committed
593
  {
594
    if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
595
    {
unknown's avatar
unknown committed
596 597
      table->file->print_error(my_errno,MYF(0));
      error=1;
598
    }
599
    transactional_table= table->file->has_transactions();
unknown's avatar
unknown committed
600

601
    if ((changed= (info.copied || info.deleted || info.updated)))
unknown's avatar
unknown committed
602
    {
603 604 605 606 607 608 609
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
      query_cache_invalidate3(thd, table_list, 1);
      if (error <= 0 || !transactional_table)
610
      {
611
        if (mysql_bin_log.is_open())
612
        {
613 614
          if (error <= 0)
            thd->clear_error();
unknown's avatar
unknown committed
615 616 617 618 619
          if (thd->binlog_query(THD::ROW_QUERY_TYPE,
                                thd->query, thd->query_length,
                                transactional_table, FALSE) &&
              transactional_table)
          {
620
            error=1;
unknown's avatar
unknown committed
621
          }
622
        }
623 624
        if (!transactional_table)
          thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
625
      }
unknown's avatar
unknown committed
626
    }
627
    if (transactional_table)
628
      error=ha_autocommit_or_rollback(thd,error);
unknown's avatar
unknown committed
629

unknown's avatar
unknown committed
630 631 632
    if (thd->lock)
    {
      mysql_unlock_tables(thd, thd->lock);
633 634 635 636 637 638 639 640 641 642
      /*
        Invalidate the table in the query cache if something changed
        after unlocking when changes become fisible.
        TODO: this is workaround. right way will be move invalidating in
        the unlock procedure.
      */
      if (lock_type ==  TL_WRITE_CONCURRENT_INSERT && changed)
      {
        query_cache_invalidate3(thd, table_list, 1);
      }
unknown's avatar
unknown committed
643 644 645 646
      thd->lock=0;
    }
  }
  thd->proc_info="end";
647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662
  /*
    We'll report to the client this id:
    - if the table contains an autoincrement column and we successfully
    inserted an autogenerated value, the autogenerated value.
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
    called, X.
    - if the table contains an autoincrement column, and some rows were
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
    have been really inserted but ignored).
  */
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     ((table->next_number_field && info.copied) ?
     table->next_number_field->val_int() : 0));
unknown's avatar
unknown committed
663
  table->next_number_field=0;
664
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
665
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
666
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
667 668 669
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
670

unknown's avatar
unknown committed
671 672 673 674
  if (error)
    goto abort;
  if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
				    !thd->cuted_fields))
675 676
  {
    thd->row_count_func= info.copied+info.deleted+info.updated;
677
    send_ok(thd, (ulong) thd->row_count_func, id);
678
  }
679 680
  else
  {
unknown's avatar
unknown committed
681
    char buff[160];
682
    if (ignore)
683 684 685
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
	      (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
686
    else
687
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
unknown's avatar
unknown committed
688
	      (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
689
    thd->row_count_func= info.copied+info.deleted+info.updated;
690
    ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
691
  }
692
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
693
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
694 695

abort:
696
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
697 698
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
699
#endif
700
  if (table != NULL)
701
    table->file->ha_release_auto_increment();
702 703
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
unknown's avatar
unknown committed
704
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
705
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
706 707 708
}


unknown's avatar
VIEW  
unknown committed
709 710 711 712 713
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
714
    thd     - thread handler
unknown's avatar
VIEW  
unknown committed
715 716
    view    - reference on VIEW

717 718 719 720 721 722
  IMPLEMENTATION
    A view is insertable if all of the following are true:
    - All columns in the view are columns from a table
    - All unused columns in the table have default values
    - All fields in the view are unique (not referring to the same column)

unknown's avatar
VIEW  
unknown committed
723 724
  RETURN
    FALSE - OK
725 726 727
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

unknown's avatar
VIEW  
unknown committed
728 729 730
    TRUE  - can't be used for insert
*/

731
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
unknown's avatar
VIEW  
unknown committed
732
{
733
  uint num= view->view->select_lex.item_list.elements;
unknown's avatar
VIEW  
unknown committed
734
  TABLE *table= view->table;
735 736 737
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
unknown's avatar
VIEW  
unknown committed
738
  Field **field_ptr= table->field;
unknown's avatar
unknown committed
739
  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
unknown's avatar
unknown committed
740
  uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
741
  MY_BITMAP used_fields;
unknown's avatar
unknown committed
742
  enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
743 744
  DBUG_ENTER("check_key_in_view");

745 746 747
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

unknown's avatar
VIEW  
unknown committed
748 749
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

750
  VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0));
751 752
  bitmap_clear_all(&used_fields);

unknown's avatar
VIEW  
unknown committed
753
  view->contain_auto_increment= 0;
754 755 756 757
  /* 
    we must not set query_id for fields as they're not 
    really used in this context
  */
unknown's avatar
unknown committed
758
  thd->mark_used_columns= MARK_COLUMNS_NONE;
unknown's avatar
VIEW  
unknown committed
759
  /* check simplicity and prepare unique test of view */
760
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
761
  {
762
    if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
763
    {
unknown's avatar
unknown committed
764
      thd->mark_used_columns= save_mark_used_columns;
765 766
      DBUG_RETURN(TRUE);
    }
767
    Item_field *field;
unknown's avatar
VIEW  
unknown committed
768
    /* simple SELECT list entry (field without expression) */
unknown's avatar
merge  
unknown committed
769
    if (!(field= trans->item->filed_for_view_update()))
770
    {
unknown's avatar
unknown committed
771
      thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
772
      DBUG_RETURN(TRUE);
773
    }
774
    if (field->field->unireg_check == Field::NEXT_NUMBER)
unknown's avatar
VIEW  
unknown committed
775 776
      view->contain_auto_increment= 1;
    /* prepare unique test */
unknown's avatar
unknown committed
777 778 779 780 781
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
unknown's avatar
VIEW  
unknown committed
782
  }
unknown's avatar
unknown committed
783
  thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
784
  /* unique test */
785
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
786
  {
787
    /* Thanks to test above, we know that all columns are of type Item_field */
788
    Item_field *field= (Item_field *)trans->item;
789 790 791
    /* check fields belong to table in which we are inserting */
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
unknown's avatar
VIEW  
unknown committed
792 793 794 795 796 797 798
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


unknown's avatar
unknown committed
799
/*
800
  Check if table can be updated
unknown's avatar
unknown committed
801 802

  SYNOPSIS
803 804
     mysql_prepare_insert_check_table()
     thd		Thread handle
unknown's avatar
unknown committed
805
     table_list		Table list
806 807
     fields		List of fields to be updated
     where		Pointer to where clause
unknown's avatar
unknown committed
808
     select_insert      Check is making for SELECT ... INSERT
809 810

   RETURN
unknown's avatar
unknown committed
811 812
     FALSE ok
     TRUE  ERROR
unknown's avatar
unknown committed
813
*/
814

unknown's avatar
unknown committed
815
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
816
                                             List<Item> &fields,
unknown's avatar
unknown committed
817
                                             bool select_insert)
unknown's avatar
unknown committed
818
{
unknown's avatar
VIEW  
unknown committed
819
  bool insert_into_view= (table_list->view != 0);
820
  DBUG_ENTER("mysql_prepare_insert_check_table");
unknown's avatar
VIEW  
unknown committed
821

822 823 824 825 826 827 828
  /*
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
     all others require SELECT_ACL only. the ACL requirement below is for
     new leaves only anyway (view-constituents), so check for SELECT rather
     than INSERT.
  */

829 830
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
                                    &thd->lex->select_lex.top_join_list,
831
                                    table_list,
832
                                    &thd->lex->select_lex.leaf_tables,
833
                                    select_insert, INSERT_ACL, SELECT_ACL))
unknown's avatar
unknown committed
834
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
835 836 837 838

  if (insert_into_view && !fields.elements)
  {
    thd->lex->empty_field_list_on_rset= 1;
839 840 841 842
    if (!table_list->table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
unknown's avatar
unknown committed
843
      DBUG_RETURN(TRUE);
844
    }
845
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
unknown's avatar
VIEW  
unknown committed
846 847
  }

unknown's avatar
unknown committed
848
  DBUG_RETURN(FALSE);
849 850 851 852 853 854 855 856 857 858
}


/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
    thd			Thread handler
    table_list	        Global/local table list
unknown's avatar
unknown committed
859 860
    table		Table to insert into (can be NULL if table should
			be taken from table_list->table)    
unknown's avatar
unknown committed
861 862
    where		Where clause (for insert ... select)
    select_insert	TRUE if INSERT ... SELECT statement
863

unknown's avatar
unknown committed
864 865 866 867 868
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
unknown's avatar
unknown committed
869

870 871 872
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
unknown's avatar
unknown committed
873
  
874
  RETURN VALUE
unknown's avatar
unknown committed
875 876
    FALSE OK
    TRUE  error
877 878
*/

unknown's avatar
unknown committed
879
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
unknown's avatar
unknown committed
880
                          TABLE *table, List<Item> &fields, List_item *values,
unknown's avatar
unknown committed
881
                          List<Item> &update_fields, List<Item> &update_values,
unknown's avatar
unknown committed
882 883
                          enum_duplicates duplic,
                          COND **where, bool select_insert)
884
{
unknown's avatar
unknown committed
885
  SELECT_LEX *select_lex= &thd->lex->select_lex;
unknown's avatar
unknown committed
886
  Name_resolution_context *context= &select_lex->context;
887
  Name_resolution_context_state ctx_state;
888
  bool insert_into_view= (table_list->view != 0);
unknown's avatar
unknown committed
889
  bool res= 0;
890
  DBUG_ENTER("mysql_prepare_insert");
unknown's avatar
unknown committed
891 892 893
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
		       (ulong)table_list, (ulong)table,
		       (int)insert_into_view));
unknown's avatar
unknown committed
894

895 896 897 898 899 900 901
  /*
    For subqueries in VALUES() we should not see the table in which we are
    inserting (for INSERT ... SELECT this is done by changing table_list,
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
  */
  if (!select_insert)
  {
unknown's avatar
unknown committed
902
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
903 904 905 906 907 908 909 910 911 912 913 914
         un;
         un= un->next_unit())
    {
      for (SELECT_LEX *sl= un->first_select();
           sl;
           sl= sl->next_select())
      {
        sl->context.outer_context= 0;
      }
    }
  }

unknown's avatar
unknown committed
915
  if (duplic == DUP_UPDATE)
unknown's avatar
unknown committed
916 917
  {
    /* it should be allocated before Item::fix_fields() */
unknown's avatar
unknown committed
918
    if (table_list->set_insert_values(thd->mem_root))
unknown's avatar
unknown committed
919
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
920
  }
unknown's avatar
unknown committed
921

922
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
unknown's avatar
unknown committed
923
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
924

unknown's avatar
unknown committed
925
  /* Save the state of the current name resolution context. */
926
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
927

unknown's avatar
unknown committed
928 929 930 931
  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
unknown's avatar
unknown committed
932
  table_list->next_local= 0;
unknown's avatar
unknown committed
933 934 935
  context->resolve_in_table_list_only(table_list);

  /* Prepare the fields in the statement. */
936
  if (values &&
937 938
      !(res= check_insert_fields(thd, context->table_list, fields, *values,
                                 !insert_into_view) ||
939
        setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0)) &&
940
      duplic == DUP_UPDATE)
unknown's avatar
unknown committed
941
  {
unknown's avatar
unknown committed
942
    select_lex->no_wrap_view_item= TRUE;
unknown's avatar
unknown committed
943
    res= check_update_fields(thd, context->table_list, update_fields);
unknown's avatar
unknown committed
944
    select_lex->no_wrap_view_item= FALSE;
945 946 947 948
    /*
      When we are not using GROUP BY we can refer to other tables in the
      ON DUPLICATE KEY part.
    */       
unknown's avatar
unknown committed
949 950
    if (select_lex->group_list.elements == 0)
    {
951
      context->table_list->next_local=       ctx_state.save_next_local;
952
      /* first_name_resolution_table was set by resolve_in_table_list_only() */
unknown's avatar
unknown committed
953
      context->first_name_resolution_table->
954
        next_name_resolution_table=          ctx_state.save_next_local;
unknown's avatar
unknown committed
955 956
    }
    if (!res)
957
      res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
unknown's avatar
unknown committed
958
  }
unknown's avatar
unknown committed
959 960

  /* Restore the current context. */
961
  ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
962

unknown's avatar
unknown committed
963 964
  if (res)
    DBUG_RETURN(res);
unknown's avatar
VIEW  
unknown committed
965

unknown's avatar
unknown committed
966 967 968
  if (!table)
    table= table_list->table;

969
  if (!select_insert)
unknown's avatar
unknown committed
970
  {
971
    Item *fake_conds= 0;
972
    TABLE_LIST *duplicate;
973
    if ((duplicate= unique_table(thd, table_list, table_list->next_global)))
974
    {
975
      update_non_unique_table_error(table_list, "INSERT", duplicate);
976 977
      DBUG_RETURN(TRUE);
    }
unknown's avatar
unknown committed
978 979
    select_lex->fix_prepare_information(thd, &fake_conds);
    select_lex->first_execution= 0;
unknown's avatar
unknown committed
980
  }
981
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
982
    table->prepare_for_position();
unknown's avatar
unknown committed
983
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
984 985 986
}


unknown's avatar
unknown committed
987 988 989 990
	/* Return 1 if no unique key follows key number keynr, else 0 */

static int last_uniq_key(TABLE *table,uint keynr)
{
  /* Scan the keys that come after keynr for one flagged HA_NOSAME. */
  for (uint idx= keynr + 1; idx < table->s->keys; idx++)
  {
    if (table->key_info[idx].flags & HA_NOSAME)
      return 0;
  }
  return 1;
}


/*
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.

  NOTE
    After the record has been written to the table, the after insert
    trigger is invoked. If an old record is updated instead of a new one
    being inserted (DUP_UPDATE), the update triggers are processed
    instead. Similarly, the delete triggers are processed when a
    conflicting record is deleted (DUP_REPLACE).

    Sets thd->no_trans_update if table which is updated didn't have
    transactions.

  IMPLEMENTATION
    info->records is incremented on entry.

    For DUP_REPLACE and DUP_UPDATE, ha_write_row() is called in a loop.
    On each failure that is a duplicate key error the conflicting row is
    read into table->record[1] (by position through dup_ref when the
    handler has HA_DUPLICATE_POS, otherwise by an exact index read on
    the duplicate key), and then:
    - DUP_UPDATE: the new row is saved in insert_values, record[0] is
      reloaded from record[1], the update assignments are applied via
      fill_record_n_invoke_before_triggers(), the view CHECK OPTION is
      evaluated, and ha_update_row() is called. info->updated and
      info->copied are incremented and the function exits.
    - DUP_REPLACE: either the old row is overwritten with
      ha_update_row() (counted in info->deleted, followed by the insert
      after-trigger), or it is deleted with ha_delete_row() and the loop
      retries the write.

    For all other duplicate handling modes a single ha_write_row() is
    attempted.

    Exit labels:
    - after_trg_n_copied_inc: count the row in info->copied and process
      the after insert trigger;
    - ok_or_after_trg_err: free the key buffer and return trg_error;
    - err: record the error in info->last_errno and print it, then fall
      through to before_trg_err;
    - before_trg_err: restore the auto increment value saved on entry,
      free the key buffer, restore the column bitmaps and return 1.

  RETURN VALUE
    0     - success
    non-0 - error
*/


unknown's avatar
unknown committed
1025
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
unknown's avatar
unknown committed
1026
{
unknown's avatar
unknown committed
1027
  int error, trg_error= 0;
unknown's avatar
unknown committed
1028
  char *key=0;
1029
  MY_BITMAP *save_read_set, *save_write_set;
1030 1031
  ulonglong prev_insert_id= table->file->next_insert_id;
  ulonglong insert_id_for_cur_row= 0;
1032
  DBUG_ENTER("write_record");
unknown's avatar
unknown committed
1033

unknown's avatar
unknown committed
1034
  /* Every row handed to this function is counted, whatever happens next. */
  info->records++;
1035 1036
  save_read_set=  table->read_set;
  save_write_set= table->write_set;
1037

1038 1039
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1040
  {
1041
    while ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1042
    {
1043
      uint key_nr;
1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054
      /*
        If we do more than one iteration of this loop, from the second one the
        row will have an explicit value in the autoinc field, which was set at
        the first call of handler::update_auto_increment(). So we must save
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
        0.
      */
      if (table->file->insert_id_for_cur_row > 0)
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
      else
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
unknown's avatar
unknown committed
1055
      bool is_duplicate_key_error;
1056
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1057
	goto err;
unknown's avatar
unknown committed
1058
      /*
        NOTE(review): the variable name suggests that is_fatal_error(error, 0)
        returns true for a duplicate key error; confirm against
        handler::is_fatal_error().
      */
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
unknown's avatar
unknown committed
1059 1060
      if (!is_duplicate_key_error)
      {
1061 1062 1063 1064 1065
        /*
          We come here when we had an ignorable error which is not a duplicate
          key error. In this we ignore error if ignore flag is set, otherwise
          report error as usual. We will not do any duplicate key processing.
        */
unknown's avatar
unknown committed
1066
        if (info->ignore)
1067 1068
          goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
        goto err;
unknown's avatar
unknown committed
1069
      }
unknown's avatar
unknown committed
1070 1071
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
1072
	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
unknown's avatar
unknown committed
1073 1074
	goto err;
      }
1075 1076
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
1077 1078 1079 1080 1081
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
1082 1083
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
1084
          key_nr == table->s->next_number_index &&
1085
	  (insert_id_for_cur_row > 0))
1086
	goto err;
1087
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
unknown's avatar
unknown committed
1088
      {
1089
	if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
unknown's avatar
unknown committed
1090 1091 1092 1093
	  goto err;
      }
      else
      {
1094
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
unknown's avatar
unknown committed
1095 1096 1097 1098
	{
	  error=my_errno;
	  goto err;
	}
unknown's avatar
unknown committed
1099

unknown's avatar
unknown committed
1100 1101
	if (!key)
	{
1102
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
unknown's avatar
unknown committed
1103 1104 1105 1106 1107 1108
					   MAX_KEY_LENGTH)))
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
1109
	key_copy((byte*) key,table->record[0],table->key_info+key_nr,0);
unknown's avatar
unknown committed
1110
	if ((error=(table->file->index_read_idx(table->record[1],key_nr,
1111
						(byte*) key,
unknown's avatar
unknown committed
1112 1113
						table->key_info[key_nr].
						key_length,
unknown's avatar
unknown committed
1114 1115 1116
						HA_READ_KEY_EXACT))))
	  goto err;
      }
1117
      if (info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1118
      {
unknown's avatar
unknown committed
1119
        int res= 0;
unknown's avatar
unknown committed
1120 1121 1122 1123
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
1124
        */
unknown's avatar
unknown committed
1125
	DBUG_ASSERT(table->insert_values != NULL);
unknown's avatar
unknown committed
1126 1127
        store_record(table,insert_values);
        restore_record(table,record[1]);
unknown's avatar
unknown committed
1128 1129
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
unknown's avatar
unknown committed
1130 1131 1132 1133 1134
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
                                                 *info->update_values, 0,
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
1135 1136

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
unknown's avatar
unknown committed
1137 1138 1139
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
unknown's avatar
unknown committed
1140
          goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1141
        if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
1142
          goto before_trg_err;
1143

1144 1145
        if ((error=table->file->ha_update_row(table->record[1],
                                              table->record[0])))
1146
	{
1147
          if (info->ignore &&
1148
              !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1149
          {
1150
            table->file->restore_auto_increment(prev_insert_id);
unknown's avatar
unknown committed
1151
            goto ok_or_after_trg_err;
1152
          }
1153
          goto err;
1154
	}
unknown's avatar
unknown committed
1155
        info->updated++;
1156
        /*
1157 1158 1159 1160 1161
          If ON DUP KEY UPDATE updates a row instead of inserting one, it's
          like a regular UPDATE statement: it should not affect the value of a
          next SELECT LAST_INSERT_ID() or mysql_insert_id().
          Except if LAST_INSERT_ID(#) was in the INSERT query, which is
          handled separately by THD::arg_of_last_insert_id_function.
1162
        */
1163
        insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1164
        if (table->next_number_field)
1165
          table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int());
unknown's avatar
unknown committed
1166 1167 1168 1169 1170
        trg_error= (table->triggers &&
                    table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
                                                      TRG_ACTION_AFTER, TRUE));
        info->copied++;
        goto ok_or_after_trg_err;
1171 1172 1173
      }
      else /* DUP_REPLACE */
      {
unknown's avatar
unknown committed
1174 1175 1176 1177 1178
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
1179 1180
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
1181 1182 1183 1184 1185 1186
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
unknown's avatar
unknown committed
1187 1188
	*/
	if (last_uniq_key(table,key_nr) &&
1189
	    !table->file->referenced_by_foreign_key() &&
1190
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1191 1192
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
1193
        {
1194 1195
          if ((error=table->file->ha_update_row(table->record[1],
					        table->record[0])))
1196 1197
            goto err;
          info->deleted++;
1198
          thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1199 1200 1201 1202 1203
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
unknown's avatar
unknown committed
1204 1205 1206 1207 1208 1209 1210
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
1211
          if ((error=table->file->ha_delete_row(table->record[1])))
unknown's avatar
unknown committed
1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
            thd->no_trans_update= 1;
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
1224
        }
unknown's avatar
unknown committed
1225 1226
      }
    }
1227
    thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1228 1229 1230 1231 1232 1233 1234
    /*
      Restore column maps if they where replaced during an duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
unknown's avatar
unknown committed
1235
  }
1236
  else if ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1237
  {
1238
    if (!info->ignore ||
1239
        table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1240
      goto err;
1241
    table->file->restore_auto_increment(prev_insert_id);
1242
    goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1243
  }
1244 1245 1246

after_trg_n_copied_inc:
  /* A row was written: count it and process the after insert trigger. */
  info->copied++;
1247
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1248 1249 1250
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
unknown's avatar
unknown committed
1251 1252

ok_or_after_trg_err:
  /* Common exit for success and for a failed after trigger. */
unknown's avatar
unknown committed
1253
  if (key)
1254
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
unknown's avatar
unknown committed
1255 1256
  if (!table->file->has_transactions())
    thd->no_trans_update= 1;
unknown's avatar
unknown committed
1257
  DBUG_RETURN(trg_error);
unknown's avatar
unknown committed
1258 1259

err:
  /* Handler error: remember it, print it, then run the common cleanup. */
1260
  info->last_errno= error;
1261 1262 1263
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;        // Give error
unknown's avatar
unknown committed
1264
  table->file->print_error(error,MYF(0));
unknown's avatar
unknown committed
1265 1266

before_trg_err:
  /* Failure exit that prints nothing here; also reached from err above. */
1267
  table->file->restore_auto_increment(prev_insert_id);
unknown's avatar
unknown committed
1268 1269
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1270
  table->column_bitmaps_set(save_read_set, save_write_set);
1271
  DBUG_RETURN(1);
unknown's avatar
unknown committed
1272 1273 1274 1275
}


/******************************************************************************
unknown's avatar
unknown committed
1276
  Check that all fields with arn't null_fields are used
unknown's avatar
unknown committed
1277 1278
******************************************************************************/

1279 1280
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
unknown's avatar
unknown committed
1281
{
1282
  int err= 0;
1283 1284
  MY_BITMAP *write_set= entry->write_set;

unknown's avatar
unknown committed
1285 1286
  for (Field **field=entry->field ; *field ; field++)
  {
1287
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
1288 1289
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
        ((*field)->real_type() != FIELD_TYPE_ENUM))
unknown's avatar
unknown committed
1290
    {
1291 1292 1293
      bool view= FALSE;
      if (table_list)
      {
unknown's avatar
unknown committed
1294
        table_list= table_list->top_table();
unknown's avatar
unknown committed
1295
        view= test(table_list->view);
1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
1312
      err= 1;
unknown's avatar
unknown committed
1313 1314
    }
  }
1315
  return thd->abort_on_warning ? err : 0;
unknown's avatar
unknown committed
1316 1317 1318
}

/*****************************************************************************
unknown's avatar
unknown committed
1319 1320
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
unknown's avatar
unknown committed
1321 1322
*****************************************************************************/

1323 1324
#ifndef EMBEDDED_LIBRARY

unknown's avatar
unknown committed
1325 1326
class delayed_row :public ilink {
public:
1327
  char *record;
unknown's avatar
unknown committed
1328 1329
  enum_duplicates dup;
  time_t start_time;
1330 1331 1332
  bool query_start_used, ignore, log_query;
  bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  ulonglong first_successful_insert_id_in_prev_stmt;
1333
  timestamp_auto_set_type timestamp_field_type;
1334
  LEX_STRING query;
unknown's avatar
unknown committed
1335

1336 1337 1338 1339 1340
  delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
              bool ignore_arg, bool log_query_arg)
    : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
      query(query_arg)
    {}
unknown's avatar
unknown committed
1341 1342
  ~delayed_row()
  {
1343
    x_free(query.str);
unknown's avatar
unknown committed
1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359
    x_free(record);
  }
};


class delayed_insert :public ilink {
  uint locks_in_memory;
public:
  THD thd;
  TABLE *table;
  pthread_mutex_t mutex;
  pthread_cond_t cond,cond_client;
  volatile uint tables_in_use,stacked_inserts;
  volatile bool status,dead;
  COPY_INFO info;
  I_List<delayed_row> rows;
1360
  ulong group_count;
unknown's avatar
unknown committed
1361
  TABLE_LIST table_list;			// Argument
unknown's avatar
unknown committed
1362 1363

  delayed_insert()
1364
    :locks_in_memory(0),
unknown's avatar
unknown committed
1365 1366 1367
     table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
     group_count(0)
  {
1368 1369
    thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
unknown's avatar
unknown committed
1370 1371 1372
    thd.current_tablenr=0;
    thd.version=refresh_version;
    thd.command=COM_DELAYED_INSERT;
1373 1374
    thd.lex->current_select= 0; 		// for my_message_sql
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
1375 1376 1377 1378
    /*
      Statement-based replication of INSERT DELAYED has problems with RAND()
      and user vars, so in mixed mode we go to row-based.
    */
1379
    thd.set_current_stmt_binlog_row_based_if_mixed();
unknown's avatar
unknown committed
1380

1381 1382
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
1383
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1384
    thd.security_ctx->host_or_ip= "";
unknown's avatar
unknown committed
1385
    bzero((char*) &info,sizeof(info));
1386
    pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
unknown's avatar
unknown committed
1387 1388 1389 1390 1391 1392 1393 1394
    pthread_cond_init(&cond,NULL);
    pthread_cond_init(&cond_client,NULL);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    delayed_insert_threads++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
  }
  ~delayed_insert()
  {
1395
    /* The following is not really needed, but just for safety */
unknown's avatar
unknown committed
1396 1397 1398 1399 1400 1401
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
      close_thread_tables(&thd);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
1402 1403 1404
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    pthread_cond_destroy(&cond_client);
unknown's avatar
unknown committed
1405 1406
    thd.unlink();				// Must be unlinked under lock
    x_free(thd.query);
1407
    thd.security_ctx->user= thd.security_ctx->host=0;
unknown's avatar
unknown committed
1408 1409 1410
    thread_count--;
    delayed_insert_threads--;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
1411
    VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
unknown's avatar
unknown committed
1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
    pthread_mutex_lock(&LOCK_delayed_insert);
    if (!--locks_in_memory)
    {
      pthread_mutex_lock(&mutex);
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
	pthread_cond_signal(&cond);
	status=1;
      }
      pthread_mutex_unlock(&mutex);
    }
    pthread_mutex_unlock(&LOCK_delayed_insert);
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
  bool handle_inserts(void);
};


/*
  List of delayed insert handlers. find_handler() iterates over it while
  holding LOCK_delayed_insert.
*/
I_List<delayed_insert> delayed_threads;


/*
  Find the delayed insert handler that serves a given table.

  Walks delayed_threads under LOCK_delayed_insert and looks for a handler
  whose thd.db equals table_list->db and whose table name equals
  table_list->table_name. On a match the handler is pinned with lock()
  before the mutex is released.

  Sets thd->proc_info to "waiting for delay_list".

  Returns the matching handler, or 0 if there is none.
*/
delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
{
  delayed_insert *handler;

  thd->proc_info="waiting for delay_list";
  pthread_mutex_lock(&LOCK_delayed_insert);	// Protect master list
  I_List_iterator<delayed_insert> it(delayed_threads);
  for (handler= it++; handler; handler= it++)
  {
    if (strcmp(handler->thd.db, table_list->db) != 0)
      continue;
    if (strcmp(table_list->table_name,
               handler->table->s->table_name.str) != 0)
      continue;
    /* Match: take a reference while the list is still protected. */
    handler->lock();
    break;
  }
  pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
  return handler;
}


/*
  Get a local table copy from the delayed insert handler of a table,
  starting a new handler thread if none exists yet.

  thd         Client thread.
  table_list  Table to insert into; table_list->table is set to the result.

  Returns the local copy of the handler's table, or 0. The 0 return is
  used both when no handler could be obtained and when the copy could not
  be made; thd->fatal_error() has been called on the fatal paths.
*/
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
{
  int error;
  delayed_insert *tmp;
  TABLE *table;
  DBUG_ENTER("delayed_get_table");

  /* Must be set in the parser */
  DBUG_ASSERT(table_list->db);

  /* Find the thread which handles this table. */
  tmp= find_handler(thd, table_list);
  if (!tmp)
  {
    /*
      No match. Create a new thread to handle the table, but
      no more than max_insert_delayed_threads.
    */
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
      DBUG_RETURN(0);
    thd->proc_info="Creating delayed handler";
    pthread_mutex_lock(&LOCK_delayed_create);
    /*
      The first search above was done without LOCK_delayed_create.
      Another thread might have created the handler in between. Search again.
    */
    tmp= find_handler(thd, table_list);
    if (!tmp)
    {
      tmp= new delayed_insert();
      if (!tmp)
      {
        my_error(ER_OUTOFMEMORY,MYF(0),sizeof(delayed_insert));
        goto err1;
      }
      /* Account for the handler's THD before the thread exists */
      pthread_mutex_lock(&LOCK_thread_count);
      thread_count++;
      pthread_mutex_unlock(&LOCK_thread_count);
      tmp->thd.set_db(table_list->db, strlen(table_list->db));
      tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME));
      if (tmp->thd.db == NULL || tmp->thd.query == NULL)
      {
        delete tmp;
        my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
        goto err1;
      }
      tmp->table_list= *table_list;             // Needed to open table
      /* The handler keeps its own copy of the table name in thd.query */
      tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query;
      tmp->lock();
      pthread_mutex_lock(&tmp->mutex);
      error= pthread_create(&tmp->thd.real_id, &connection_attrib,
                            handle_delayed_insert, (void*) tmp);
      if (error)
      {
        DBUG_PRINT("error",
                   ("Can't create thread to handle delayed insert (error %d)",
                    error));
        pthread_mutex_unlock(&tmp->mutex);
        tmp->unlock();
        delete tmp;
        my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
        goto err1;
      }

      /* Wait until table is open */
      thd->proc_info="waiting for handler open";
      while (!tmp->thd.killed && !tmp->table && !thd->killed)
        pthread_cond_wait(&tmp->cond_client, &tmp->mutex);
      pthread_mutex_unlock(&tmp->mutex);
      thd->proc_info="got old table";
      if (tmp->thd.killed)
      {
        if (tmp->thd.is_fatal_error)
        {
          /* Copy error message and abort */
          thd->fatal_error();
          strmov(thd->net.last_error, tmp->thd.net.last_error);
          thd->net.last_errno= tmp->thd.net.last_errno;
        }
        tmp->unlock();
        goto err;
      }
      if (thd->killed)
      {
        tmp->unlock();
        goto err;
      }
    }
    pthread_mutex_unlock(&LOCK_delayed_create);
  }

  /* get_local_table() is called with the handler's mutex held */
  pthread_mutex_lock(&tmp->mutex);
  table= tmp->get_local_table(thd);
  pthread_mutex_unlock(&tmp->mutex);
  if (table)
    thd->di= tmp;
  else if (tmp->thd.is_fatal_error)
    thd->fatal_error();
  /* Unlock the delayed insert object after its last access. */
  tmp->unlock();
  table_list->table= table;
  DBUG_RETURN(table);

 err1:
  thd->fatal_error();
 err:
  pthread_mutex_unlock(&LOCK_delayed_create);
  DBUG_RETURN(0); // Continue with normal insert
}


/*
  As we can't let many threads modify the same TABLE structure, we create
  an own structure for each thread.  This includes a row buffer to save the
  column values and new fields that point to the new row buffer.
  The memory is allocated in the client thread and is freed automatically.

  client_thd  Client thread that will own the copy.

  Called by delayed_get_table() with this handler's 'mutex' held.

  Returns the table copy, or 0 on failure. On every failure path the
  tables_in_use request made here is taken back and the handler thread is
  signalled, so that the handler is not left waiting for a client that
  never arrives.
*/

TABLE *delayed_insert::get_local_table(THD* client_thd)
{
  my_ptrdiff_t adjust_ptrs;
  Field **field,**org_field, *found_next_number_field;
  TABLE *copy;
  TABLE_SHARE *share= table->s;
  byte *bitmap;
  DBUG_ENTER("delayed_insert::get_local_table");

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
    client_thd->proc_info="waiting for handler lock";
    pthread_cond_signal(&cond);			// Tell handler to lock table
    while (!dead && !thd.lock && ! client_thd->killed)
    {
      pthread_cond_wait(&cond_client,&mutex);
    }
    client_thd->proc_info="got handler lock";
    if (client_thd->killed)
      goto error;
    if (dead)
    {
      /* Hand the handler's error over to the client */
      strmov(client_thd->net.last_error,thd.net.last_error);
      client_thd->net.last_errno=thd.net.last_errno;
      goto error;
    }
  }

  /*
    Allocate memory for the TABLE object, the field pointers array, and
    one record buffer of reclength size. Normally a table has three
    record buffers of rec_buff_length size, which includes alignment
    bytes. Since the table copy is used for creating one record only,
    the other record buffers and alignment are unnecessary.
    Room for two column bitmaps is allocated in the same chunk.
  */
  client_thd->proc_info="allocating local table";
  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
				   (share->fields+1)*sizeof(Field**)+
				   share->reclength +
                                   share->column_bitmap_size*2);
  if (!copy)
    goto error;

  /* Copy the TABLE object. */
  *copy= *table;
  /* We don't need to change the file handler here */
  /*
    Assign the pointers for the field pointers array and the record.
    Layout of the chunk: TABLE, field pointers, two bitmaps, record.
  */
  field= copy->field= (Field**) (copy + 1);
  bitmap= (byte*) (field + share->fields + 1);
  copy->record[0]= (bitmap + share->column_bitmap_size * 2);
  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
  {
    /*
      A failed field copy must go through the common error exit: returning
      directly would leave tables_in_use incremented for a client that
      never uses the table.
    */
    if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
      goto error;
    (*field)->orig_table= copy;			// Remove connection
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
  }
  *field=0;

  /* Adjust timestamp */
  if (table->timestamp_field)
  {
    /* Restore offset as this may have been reset in handle_inserts */
    copy->timestamp_field=
      (Field_timestamp*) copy->field[share->timestamp_field_offset];
    copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
    copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
  }

  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

  /* Adjust bitmaps */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
  copy->tmp_set.bitmap= 0;                      // To catch errors
  bzero((char*) bitmap, share->column_bitmap_size*2);
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;

  DBUG_RETURN(copy);

  /* Got fatal error */
 error:
  tables_in_use--;
  status=1;
  pthread_cond_signal(&cond);			// Inform thread about abort
  DBUG_RETURN(0);
}


/* Put a question in queue */

1691 1692 1693
static int
write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
              LEX_STRING query, bool ignore, bool log_on)
unknown's avatar
unknown committed
1694
{
1695
  delayed_row *row;
unknown's avatar
unknown committed
1696 1697
  delayed_insert *di=thd->di;
  DBUG_ENTER("write_delayed");
1698
  DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));
unknown's avatar
unknown committed
1699 1700 1701 1702 1703 1704 1705

  thd->proc_info="waiting for handler insert";
  pthread_mutex_lock(&di->mutex);
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
    pthread_cond_wait(&di->cond_client,&di->mutex);
  thd->proc_info="storing row into queue";

1706
  if (thd->killed)
unknown's avatar
unknown committed
1707 1708
    goto err;

1709 1710 1711 1712 1713 1714 1715
  /*
    Take a copy of the query string, if there is any. The string will
    be free'ed when the row is destroyed. If there is no query string,
    we don't do anything special.
   */

  if (query.str)
1716 1717 1718
  {
    char *str;
    if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
1719
      goto err;
1720 1721
    query.str= str;
  }
1722 1723 1724 1725
  row= new delayed_row(query, duplic, ignore, log_on);
  if (row == NULL)
  {
    my_free(query.str, MYF(MY_WME));
unknown's avatar
unknown committed
1726
    goto err;
1727
  }
unknown's avatar
unknown committed
1728

1729
  if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
unknown's avatar
unknown committed
1730
    goto err;
1731
  memcpy(row->record, table->record[0], table->s->reclength);
unknown's avatar
unknown committed
1732 1733
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
1734 1735 1736 1737 1738 1739 1740 1741 1742 1743
  /*
    those are for the binlog: LAST_INSERT_ID() has been evaluated at this
    time, so record does not need it, but statement-based binlogging of the
    INSERT will need when the row is actually inserted.
    As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
  */
  row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  row->first_successful_insert_id_in_prev_stmt=
    thd->first_successful_insert_id_in_prev_stmt;
1744
  row->timestamp_field_type=    table->timestamp_field_type;
unknown's avatar
unknown committed
1745 1746 1747 1748

  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
1749
  if (table->s->blob_fields)
unknown's avatar
unknown committed
1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765
    unlink_blobs(table);
  pthread_cond_signal(&di->cond);

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(0);

 err:
  delete row;
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(1);
}


/*
  Tell the delayed insert handler of thd (thd->di) that this client has
  finished using the table. The handler is woken up when this was the last
  user of the table or when the handler has been killed.
*/
static void end_delayed_insert(THD *thd)
{
  DBUG_ENTER("end_delayed_insert");
  delayed_insert *handler= thd->di;

  pthread_mutex_lock(&handler->mutex);
  DBUG_PRINT("info",("tables in use: %d",handler->tables_in_use));
  handler->tables_in_use--;
  if (!handler->tables_in_use || handler->thd.killed)
  {
    /* Unlock table */
    handler->status= 1;
    pthread_cond_signal(&handler->cond);
  }
  pthread_mutex_unlock(&handler->mutex);
  DBUG_VOID_RETURN;
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
  VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list

  I_List_iterator<delayed_insert> it(delayed_threads);
  delayed_insert *tmp;
  while ((tmp=it++))
  {
unknown's avatar
unknown committed
1790
    /* Ensure that the thread doesn't kill itself while we are looking at it */
unknown's avatar
unknown committed
1791
    pthread_mutex_lock(&tmp->mutex);
unknown's avatar
SCRUM  
unknown committed
1792
    tmp->thd.killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1793 1794 1795
    if (tmp->thd.mysys_var)
    {
      pthread_mutex_lock(&tmp->thd.mysys_var->mutex);
unknown's avatar
unknown committed
1796
      if (tmp->thd.mysys_var->current_cond)
unknown's avatar
unknown committed
1797
      {
unknown's avatar
unknown committed
1798 1799 1800 1801 1802 1803
	/*
	  We need the following test because the main mutex may be locked
	  in handle_delayed_insert()
	*/
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_lock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
1804
	pthread_cond_broadcast(tmp->thd.mysys_var->current_cond);
unknown's avatar
unknown committed
1805 1806
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819
      }
      pthread_mutex_unlock(&tmp->thd.mysys_var->mutex);
    }
    pthread_mutex_unlock(&tmp->mutex);
  }
  VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
}


/*
 * Create a new delayed insert thread
*/

1820
pthread_handler_t handle_delayed_insert(void *arg)
unknown's avatar
unknown committed
1821 1822 1823 1824 1825 1826 1827 1828
{
  delayed_insert *di=(delayed_insert*) arg;
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id=thread_id++;
1829
  thd->end_time();
unknown's avatar
unknown committed
1830
  threads.append(thd);
unknown's avatar
SCRUM  
unknown committed
1831
  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
unknown's avatar
unknown committed
1832 1833
  pthread_mutex_unlock(&LOCK_thread_count);

1834 1835 1836 1837 1838 1839 1840 1841
  /*
    Wait until the client runs into pthread_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
unknown's avatar
unknown committed
1842
  pthread_mutex_lock(&di->mutex);
1843
#if !defined( __WIN__) /* Win32 calls this in pthread_create */
unknown's avatar
unknown committed
1844 1845 1846 1847 1848 1849 1850 1851
  if (my_thread_init())
  {
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
    goto end;
  }
#endif

  DBUG_ENTER("handle_delayed_insert");
1852
  thd->thread_stack= (char*) &thd;
unknown's avatar
unknown committed
1853
  if (init_thr_lock() || thd->store_globals())
unknown's avatar
unknown committed
1854
  {
1855
    thd->fatal_error();
unknown's avatar
unknown committed
1856
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
1857
    goto err;
unknown's avatar
unknown committed
1858
  }
1859
#if !defined(__WIN__) && !defined(__NETWARE__)
unknown's avatar
unknown committed
1860 1861 1862 1863 1864 1865 1866
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  /* open table */

unknown's avatar
unknown committed
1867
  if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED)))
unknown's avatar
unknown committed
1868
  {
1869
    thd->fatal_error();				// Abort waiting inserts
1870
    goto err;
unknown's avatar
unknown committed
1871
  }
1872
  if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
unknown's avatar
unknown committed
1873
  {
1874
    thd->fatal_error();
1875
    my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
1876
    goto err;
unknown's avatar
unknown committed
1877
  }
unknown's avatar
unknown committed
1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892
  di->table->copy_blobs=1;

  /* One can now use this */
  pthread_mutex_lock(&LOCK_delayed_insert);
  delayed_threads.append(di);
  pthread_mutex_unlock(&LOCK_delayed_insert);

  /* Tell client that the thread is initialized */
  pthread_cond_signal(&di->cond_client);

  /* Now wait until we get an insert or lock to handle */
  /* We will not abort as long as a client thread uses this thread */

  for (;;)
  {
unknown's avatar
SCRUM  
unknown committed
1893
    if (thd->killed == THD::KILL_CONNECTION)
unknown's avatar
unknown committed
1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912
    {
      uint lock_count;
      /*
	Remove this from delay insert list so that no one can request a
	table from this
      */
      pthread_mutex_unlock(&di->mutex);
      pthread_mutex_lock(&LOCK_delayed_insert);
      di->unlink();
      lock_count=di->lock_count();
      pthread_mutex_unlock(&LOCK_delayed_insert);
      pthread_mutex_lock(&di->mutex);
      if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
	break;					// Time to die
    }

    if (!di->status && !di->stacked_inserts)
    {
      struct timespec abstime;
1913
      set_timespec(abstime, delayed_insert_timeout);
unknown's avatar
unknown committed
1914 1915 1916 1917

      /* Information for pthread_kill */
      di->thd.mysys_var->current_mutex= &di->mutex;
      di->thd.mysys_var->current_cond= &di->cond;
1918
      di->thd.proc_info="Waiting for INSERT";
unknown's avatar
unknown committed
1919

1920
      DBUG_PRINT("info",("Waiting for someone to insert rows"));
unknown's avatar
unknown committed
1921
      while (!thd->killed)
unknown's avatar
unknown committed
1922 1923
      {
	int error;
1924
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
unknown's avatar
unknown committed
1925 1926 1927 1928
	error=pthread_cond_wait(&di->cond,&di->mutex);
#else
	error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
#ifdef EXTRA_DEBUG
1929
	if (error && error != EINTR && error != ETIMEDOUT)
unknown's avatar
unknown committed
1930 1931 1932 1933 1934 1935 1936 1937 1938
	{
	  fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
	  DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
			      error));
	}
#endif
#endif
	if (thd->killed || di->status)
	  break;
unknown's avatar
unknown committed
1939
	if (error == ETIMEDOUT || error == ETIME)
unknown's avatar
unknown committed
1940
	{
unknown's avatar
SCRUM  
unknown committed
1941
	  thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1942 1943 1944
	  break;
	}
      }
unknown's avatar
unknown committed
1945 1946
      /* We can't lock di->mutex and mysys_var->mutex at the same time */
      pthread_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
1947 1948 1949 1950
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      di->thd.mysys_var->current_mutex= 0;
      di->thd.mysys_var->current_cond= 0;
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
unknown's avatar
unknown committed
1951
      pthread_mutex_lock(&di->mutex);
unknown's avatar
unknown committed
1952
    }
1953
    di->thd.proc_info=0;
unknown's avatar
unknown committed
1954 1955 1956

    if (di->tables_in_use && ! thd->lock)
    {
1957
      bool not_used;
1958 1959 1960 1961 1962 1963 1964 1965 1966 1967
      /*
        Request for new delayed insert.
        Lock the table, but avoid to be blocked by a global read lock.
        If we got here while a global read lock exists, then one or more
        inserts started before the lock was requested. These are allowed
        to complete their work before the server returns control to the
        client which requested the global read lock. The delayed insert
        handler will close the table and finish when the outstanding
        inserts are done.
      */
1968
      if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
1969 1970
                                          MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
                                          &not_used)))
unknown's avatar
unknown committed
1971
      {
1972 1973 1974
	/* Fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1975 1976 1977 1978 1979 1980 1981
      }
      pthread_cond_broadcast(&di->cond_client);
    }
    if (di->stacked_inserts)
    {
      if (di->handle_inserts())
      {
1982 1983 1984
	/* Some fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1985 1986 1987 1988 1989
      }
    }
    di->status=0;
    if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
    {
unknown's avatar
unknown committed
1990 1991 1992 1993
      /*
        No one is doing a insert delayed
        Unlock table so that other threads can use it
      */
unknown's avatar
unknown committed
1994 1995 1996
      MYSQL_LOCK *lock=thd->lock;
      thd->lock=0;
      pthread_mutex_unlock(&di->mutex);
1997
      di->table->file->ha_release_auto_increment();
unknown's avatar
unknown committed
1998 1999 2000 2001 2002 2003 2004 2005
      mysql_unlock_tables(thd, lock);
      di->group_count=0;
      pthread_mutex_lock(&di->mutex);
    }
    if (di->tables_in_use)
      pthread_cond_broadcast(&di->cond_client); // If waiting clients
  }

2006 2007 2008 2009 2010 2011 2012 2013 2014 2015
err:
  /*
    mysql_lock_tables() can potentially start a transaction and write
    a table map. In the event of an error, that transaction has to be
    rolled back.  We only need to roll back a potential statement
    transaction, since real transactions are rolled back in
    close_thread_tables().
   */
  ha_rollback_stmt(thd);

unknown's avatar
unknown committed
2016 2017 2018 2019 2020 2021 2022 2023
end:
  /*
    di should be unlinked from the thread handler list and have no active
    clients
  */

  close_thread_tables(thd);			// Free the table
  di->table=0;
2024 2025
  di->dead= 1;                                  // If error
  thd->killed= THD::KILL_CONNECTION;	        // If error
unknown's avatar
unknown committed
2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071
  pthread_cond_broadcast(&di->cond_client);	// Safety
  pthread_mutex_unlock(&di->mutex);

  pthread_mutex_lock(&LOCK_delayed_create);	// Because of delayed_get_table
  pthread_mutex_lock(&LOCK_delayed_insert);	
  delete di;
  pthread_mutex_unlock(&LOCK_delayed_insert);
  pthread_mutex_unlock(&LOCK_delayed_create);  

  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}


/* Remove pointers from temporary fields to allocated values */

static void unlink_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

static void free_delayed_insert_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
    {
      char *str;
      ((Field_blob *) (*ptr))->get_ptr(&str);
      my_free(str,MYF(MY_ALLOW_ZERO_PTR));
      ((Field_blob *) (*ptr))->reset();
    }
  }
}


bool delayed_insert::handle_inserts(void)
{
  int error;
2072
  ulong max_rows;
2073 2074
  bool using_ignore= 0, using_opt_replace= 0,
       using_bin_log= mysql_bin_log.is_open();
2075
  delayed_row *row;
unknown's avatar
unknown committed
2076 2077 2078 2079 2080 2081
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  pthread_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;
2082
  table->use_all_columns();
unknown's avatar
unknown committed
2083 2084 2085 2086 2087

  thd.proc_info="upgrading lock";
  if (thr_upgrade_write_delay_lock(*thd.lock->locks))
  {
    /* This can only happen if thread is killed by shutdown */
unknown's avatar
unknown committed
2088
    sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name.str);
unknown's avatar
unknown committed
2089 2090 2091 2092
    goto err;
  }

  thd.proc_info="insert";
2093
  max_rows= delayed_insert_limit;
2094
  if (thd.killed || table->s->version != refresh_version)
unknown's avatar
unknown committed
2095
  {
unknown's avatar
SCRUM  
unknown committed
2096
    thd.killed= THD::KILL_CONNECTION;
2097
    max_rows= ULONG_MAX;                     // Do as much as possible
unknown's avatar
unknown committed
2098 2099
  }

unknown's avatar
unknown committed
2100 2101 2102 2103 2104 2105 2106
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2107
  pthread_mutex_lock(&mutex);
2108

unknown's avatar
unknown committed
2109 2110 2111 2112
  while ((row=rows.get()))
  {
    stacked_inserts--;
    pthread_mutex_unlock(&mutex);
2113
    memcpy(table->record[0],row->record,table->s->reclength);
unknown's avatar
unknown committed
2114 2115 2116

    thd.start_time=row->start_time;
    thd.query_start_used=row->query_start_used;
2117
    /* for the binlog, forget auto_increment ids generated by previous rows */
2118
//    thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
2119 2120 2121 2122
    thd.first_successful_insert_id_in_prev_stmt= 
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= 
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2123
    table->timestamp_field_type= row->timestamp_field_type;
unknown's avatar
unknown committed
2124

2125
    info.ignore= row->ignore;
unknown's avatar
unknown committed
2126
    info.handle_duplicates= row->dup;
2127
    if (info.ignore ||
unknown's avatar
unknown committed
2128
	info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
2129 2130 2131 2132
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
2133 2134 2135 2136 2137 2138 2139
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
unknown's avatar
unknown committed
2140
    thd.clear_error(); // reset error for binlog
unknown's avatar
unknown committed
2141
    if (write_record(&thd, table, &info))
unknown's avatar
unknown committed
2142
    {
2143
      info.error_count++;				// Ignore errors
2144
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
unknown's avatar
unknown committed
2145
      row->log_query = 0;
unknown's avatar
unknown committed
2146
    }
2147

unknown's avatar
unknown committed
2148 2149 2150 2151 2152
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
2153 2154 2155 2156 2157
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }
2158 2159

    if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open())
unknown's avatar
unknown committed
2160 2161 2162 2163 2164 2165 2166 2167 2168
    {
      /*
        If the query has several rows to insert, only the first row will come
        here. In row-based binlogging, this means that the first row will be
        written to binlog as one Table_map event and one Rows event (due to an
        event flush done in binlog_query()), then all other rows of this query
        will be binlogged together as one single Table_map event and one
        single Rows event.
      */
2169 2170 2171
      thd.binlog_query(THD::ROW_QUERY_TYPE,
                       row->query.str, row->query.length,
                       FALSE, FALSE);
unknown's avatar
unknown committed
2172
    }
2173

2174
    if (table->s->blob_fields)
unknown's avatar
unknown committed
2175 2176 2177 2178 2179 2180
      free_delayed_insert_blobs(table);
    thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    pthread_mutex_lock(&mutex);

    delete row;
2181 2182 2183 2184 2185 2186 2187
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
2188
	(!(row->log_query & using_bin_log)))
unknown's avatar
unknown committed
2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
	  pthread_cond_broadcast(&cond_client); // If waiting clients
	thd.proc_info="reschedule";
	pthread_mutex_unlock(&mutex);
	if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
	  sql_print_error("%s",thd.net.last_error);
	  goto err;
	}
unknown's avatar
unknown committed
2204
	query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2205 2206 2207
	if (thr_reschedule_write_lock(*thd.lock->locks))
	{
	  /* This should never happen */
unknown's avatar
unknown committed
2208 2209
	  sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),
                          table->s->table_name.str);
unknown's avatar
unknown committed
2210
	}
unknown's avatar
unknown committed
2211 2212
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2213 2214 2215 2216 2217 2218 2219 2220 2221
	pthread_mutex_lock(&mutex);
	thd.proc_info="insert";
      }
      if (tables_in_use)
	pthread_cond_broadcast(&cond_client);	// If waiting clients
    }
  }
  thd.proc_info=0;
  pthread_mutex_unlock(&mutex);
2222

2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238
#ifdef HAVE_ROW_BASED_REPLICATION
  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
   */
  if (thd.current_stmt_binlog_row_based)
    thd.binlog_flush_pending_rows_event(TRUE);
#endif /* HAVE_ROW_BASED_REPLICATION */
2239

unknown's avatar
unknown committed
2240 2241 2242 2243 2244 2245
  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
    sql_print_error("%s",thd.net.last_error);
    goto err;
  }
unknown's avatar
unknown committed
2246
  query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2247 2248 2249 2250
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(0);

 err:
2251 2252 2253 2254 2255 2256 2257
  /* Remove all not used rows */
  while ((row=rows.get()))
  {
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
  }
unknown's avatar
unknown committed
2258 2259 2260 2261
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(1);
}
2262
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
2263 2264

/***************************************************************************
  Store records in INSERT ... SELECT *
***************************************************************************/

unknown's avatar
VIEW  
unknown committed
2268 2269 2270 2271 2272 2273 2274 2275 2276

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
unknown's avatar
unknown committed
2277 2278
    FALSE OK
    TRUE  Error
unknown's avatar
VIEW  
unknown committed
2279 2280
*/

unknown's avatar
unknown committed
2281
bool mysql_insert_select_prepare(THD *thd)
unknown's avatar
VIEW  
unknown committed
2282 2283
{
  LEX *lex= thd->lex;
unknown's avatar
unknown committed
2284
  SELECT_LEX *select_lex= &lex->select_lex;
unknown's avatar
unknown committed
2285
  TABLE_LIST *first_select_leaf_table;
unknown's avatar
VIEW  
unknown committed
2286
  DBUG_ENTER("mysql_insert_select_prepare");
unknown's avatar
unknown committed
2287

unknown's avatar
unknown committed
2288 2289
  /*
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
unknown's avatar
unknown committed
2290
    clause if table is VIEW
unknown's avatar
unknown committed
2291
  */
unknown's avatar
unknown committed
2292
  
unknown's avatar
unknown committed
2293 2294 2295 2296
  if (mysql_prepare_insert(thd, lex->query_tables,
                           lex->query_tables->table, lex->field_list, 0,
                           lex->update_list, lex->value_list,
                           lex->duplicates,
unknown's avatar
unknown committed
2297
                           &select_lex->where, TRUE))
unknown's avatar
unknown committed
2298
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
2299

2300 2301 2302 2303
  /*
    exclude first table from leaf tables list, because it belong to
    INSERT
  */
unknown's avatar
unknown committed
2304 2305
  DBUG_ASSERT(select_lex->leaf_tables != 0);
  lex->leaf_tables_insert= select_lex->leaf_tables;
2306
  /* skip all leaf tables belonged to view where we are insert */
unknown's avatar
unknown committed
2307
  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
2308 2309 2310 2311 2312 2313
       first_select_leaf_table &&
       first_select_leaf_table->belong_to_view &&
       first_select_leaf_table->belong_to_view ==
       lex->leaf_tables_insert->belong_to_view;
       first_select_leaf_table= first_select_leaf_table->next_leaf)
  {}
unknown's avatar
unknown committed
2314
  select_lex->leaf_tables= first_select_leaf_table;
unknown's avatar
unknown committed
2315
  DBUG_RETURN(FALSE);
unknown's avatar
VIEW  
unknown committed
2316 2317 2318
}


2319
/*
  Construct the result sink for INSERT ... SELECT.

  Stores the target table list/table and the insert field list, and fills
  the COPY_INFO structure 'info' with the duplicate handling mode, the
  ignore flag and the ON DUPLICATE KEY UPDATE field/value lists.
  'table_list_par' may be 0; in that case info.view is left zeroed.
*/
select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
                             List<Item> *fields_par,
                             List<Item> *update_fields,
                             List<Item> *update_values,
                             enum_duplicates duplic,
                             bool ignore_check_option_errors)
  :table_list(table_list_par),
   table(table_par),
   fields(fields_par),
   autoinc_value_of_last_inserted_row(0),
   insert_into_view(table_list_par && table_list_par->view != 0)
{
  /* Start from an all-zero info block, then set the members we know */
  bzero((char*) &info, sizeof(info));
  info.update_values=     update_values;
  info.update_fields=     update_fields;
  info.ignore=            ignore_check_option_errors;
  info.handle_duplicates= duplic;

  /* info.view refers to the table list entry only when it is a view */
  if (table_list_par && table_list_par->view)
    info.view= table_list_par;
}


unknown's avatar
unknown committed
2339
int
2340
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
unknown's avatar
unknown committed
2341
{
2342
  LEX *lex= thd->lex;
2343
  int res;
2344
  SELECT_LEX *lex_current_select_save= lex->current_select;
unknown's avatar
unknown committed
2345 2346
  DBUG_ENTER("select_insert::prepare");

2347
  unit= u;
2348 2349 2350 2351 2352 2353
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;
2354
  res= check_insert_fields(thd, table_list, *fields, values,
2355
                           !insert_into_view) ||
2356
       setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0);
2357

2358 2359
  if (info.handle_duplicates == DUP_UPDATE)
  {
2360 2361
    /* Save the state of the current name resolution context. */
    Name_resolution_context *context= &lex->select_lex.context;
2362 2363 2364 2365
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
2366 2367

    /* Perform name resolution only in the first table - 'table_list'. */
2368
    table_list->next_local= 0;
2369 2370
    context->resolve_in_table_list_only(table_list);

2371
    lex->select_lex.no_wrap_view_item= TRUE;
2372 2373
    res= res || check_update_fields(thd, context->table_list,
                                    *info.update_fields);
2374 2375 2376 2377 2378
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
      When we are not using GROUP BY we can refer to other tables in the
      ON DUPLICATE KEY part
    */       
2379 2380
    if (lex->select_lex.group_list.elements == 0)
    {
2381
      context->table_list->next_local=       ctx_state.save_next_local;
2382
      /* first_name_resolution_table was set by resolve_in_table_list_only() */
2383
      context->first_name_resolution_table->
2384
        next_name_resolution_table=          ctx_state.save_next_local;
2385
    }
2386 2387
    res= res || setup_fields(thd, 0, *info.update_values, MARK_COLUMNS_READ,
                             0, 0);
2388 2389

    /* Restore the current context. */
2390
    ctx_state.restore_state(context, table_list);
2391
  }
2392

2393 2394
  lex->current_select= lex_current_select_save;
  if (res)
unknown's avatar
unknown committed
2395
    DBUG_RETURN(1);
2396 2397 2398 2399 2400 2401 2402 2403 2404 2405
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
2406
  if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
2407
      unique_table(thd, table_list, table_list->next_global))
2408 2409
  {
    /* Using same table for INSERT and SELECT */
2410 2411
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
2412
  }
2413
  else if (!thd->prelocked_mode)
2414 2415 2416 2417 2418 2419 2420
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
2421 2422
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
2423
    */
2424
    table->file->ha_start_bulk_insert((ha_rows) 0);
2425
  }
2426
  restore_record(table,s->default_values);		// Get empty record
unknown's avatar
unknown committed
2427
  table->next_number_field=table->found_next_number_field;
unknown's avatar
unknown committed
2428
  thd->cuted_fields=0;
unknown's avatar
unknown committed
2429 2430
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2431 2432 2433
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
2434
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
2435
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
2436 2437 2438
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2439 2440 2441 2442
  res= ((fields->elements &&
         check_that_all_fields_are_given_values(thd, table, table_list)) ||
        table_list->prepare_where(thd, 0, TRUE) ||
        table_list->prepare_check_option(thd));
2443 2444

  if (!res)
2445 2446
    table->mark_columns_needed_for_insert();

2447
  DBUG_RETURN(res);
unknown's avatar
unknown committed
2448 2449
}

2450

2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepair phase.
    Do the final preparation now.
		       
  RETURN
    0   OK
*/

int select_insert::prepare2(void)
{
  DBUG_ENTER("select_insert::prepare2");
2470 2471
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
      !thd->prelocked_mode)
2472
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
2473
  DBUG_RETURN(0);
2474 2475 2476
}


2477 2478 2479 2480 2481 2482
/*
  Must never be called: the body is an unconditional debug assertion.
*/
void select_insert::cleanup()
{
  /* select_insert/select_create are never re-used in prepared statement */
  DBUG_ASSERT(0);
}

unknown's avatar
unknown committed
2483 2484
/*
  Undo the per-statement state set up for the insert: detach the
  auto-increment field and reset the handler (only if a table is set),
  then switch off cut-field counting and abort-on-warning for the thread.
*/
select_insert::~select_insert()
{
  DBUG_ENTER("~select_insert");

  if (table != 0)
  {
    table->next_number_field= 0;
    table->file->ha_reset();
  }

  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  thd->abort_on_warning= 0;

  DBUG_VOID_RETURN;
}


/*
  Store one row produced by the SELECT part into the target table.

  Rows are skipped while unit->offset_limit_cnt is non-zero (LIMIT offset)
  and when the view CHECK OPTION says so.

  RETURN
    0   OK (row written or skipped)
    1   Error
*/
bool select_insert::send_data(List<Item> &values)
{
  DBUG_ENTER("select_insert::send_data");

  /* Using limit offset,count: consume the offset first */
  if (unit->offset_limit_cnt)
  {
    unit->offset_limit_cnt--;
    DBUG_RETURN(0);
  }

  /* Count cut fields only while the values are being stored */
  thd->count_cuted_fields= CHECK_FIELD_WARN;
  store_values(values);
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  if (thd->net.report_error)
    DBUG_RETURN(1);

  if (table_list)                               // Not CREATE ... SELECT
  {
    const int view_check= table_list->view_check_option(thd, info.ignore);
    if (view_check == VIEW_CHECK_SKIP)
      DBUG_RETURN(0);
    if (view_check == VIEW_CHECK_ERROR)
      DBUG_RETURN(1);
  }

  const bool error= write_record(thd, table, &info);

  if (!error)
  {
    /*
      Restore fields of the record since it is possible that they were
      changed by ON DUPLICATE KEY UPDATE clause.

      If triggers exist then they can modify some fields which were not
      originally touched by INSERT ... SELECT, so we have to restore
      their original values for the next row.
    */
    if (table->triggers || info.handle_duplicates == DUP_UPDATE)
      restore_record(table, s->default_values);

    if (table->next_number_field)
    {
      /*
        If no value has been autogenerated so far, we need to remember
        the value we just saw, we may need to send it to client in the
        end.
      */
      if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
      {
        autoinc_value_of_last_inserted_row=
          table->next_number_field->val_int();
      }
      /*
        Clear auto-increment field for the next record, if triggers are
        used we will clear it twice, but this should be cheap.
      */
      table->next_number_field->reset();
    }
  }

  table->file->ha_release_auto_increment();
  DBUG_RETURN(error);
}


unknown's avatar
unknown committed
2560 2561 2562
/*
  Fill the record of the target table from 'values' and invoke the
  BEFORE INSERT triggers. The explicit insert field list is used when it
  is not empty, otherwise all fields of the table are filled.
*/
void select_insert::store_values(List<Item> &values)
{
  if (!fields->elements)
  {
    /* No field list given: fill the table's fields */
    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
    return;
  }

  fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                       table->triggers, TRG_EVENT_INSERT);
}

unknown's avatar
unknown committed
2570 2571
/*
  Report an error for INSERT ... SELECT / CREATE ... SELECT and roll the
  statement back.

  SYNOPSIS
    select_insert::send_error()
    errcode     error code to report
    err         error message text
*/
void select_insert::send_error(uint errcode,const char *err)
{
  DBUG_ENTER("select_insert::send_error");

  /* Avoid an extra 'unknown error' message if we already reported an error */
  if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
    my_message(errcode, err, MYF(0));

  /*
    No table: this can only happen when using CREATE ... SELECT and the
    table was not created because of a syntax error.
  */
  if (!table)
    DBUG_VOID_RETURN;

  if (!thd->prelocked_mode)
    table->file->ha_end_bulk_insert();

  /*
    If at least one row has been inserted/modified and will stay in the
    table (the table doesn't have transactions) we must write to the
    binlog (and the error code will make the slave stop).

    For many errors (example: we got a duplicate key error while
    inserting into a MyISAM table), no row will be added to the table,
    so passing the error to the slave will not help since there will
    be an error code mismatch (the inserts will succeed on the slave
    with no error).

    If we are using row-based replication we have two cases where this
    code is executed: replication of CREATE-SELECT and replication of
    INSERT-SELECT.

    When replicating a CREATE-SELECT statement, we shall not write the
    events to the binary log and should thus not set
    OPTION_STATUS_NO_TRANS_UPDATE.

    When replicating INSERT-SELECT, we shall not write the events to
    the binary log for transactional table, but shall write all events
    if there is one or more writes to non-transactional tables. In
    this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a
    write to a non-transactional table, otherwise it is cleared.
  */
  if ((info.copied || info.deleted || info.updated) &&
      !table->file->has_transactions())
  {
    if (mysql_bin_log.is_open())
      thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
                        table->file->has_transactions(), FALSE);

    if (!thd->current_stmt_binlog_row_based &&
        !table->s->tmp_table &&
        !can_rollback_data())
    {
      thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
    }
    query_cache_invalidate3(thd, table, 1);
  }

  ha_rollback_stmt(thd);
  DBUG_VOID_RETURN;
}


bool select_insert::send_eof()
{
2635
  int error,error2;
2636
  ulonglong id;
unknown's avatar
unknown committed
2637 2638
  DBUG_ENTER("select_insert::send_eof");

2639
  error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
unknown's avatar
unknown committed
2640
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2641
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2642

unknown's avatar
unknown committed
2643
  if (info.copied || info.deleted || info.updated)
2644 2645 2646 2647 2648
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
unknown's avatar
unknown committed
2649
    query_cache_invalidate3(thd, table, 1);
2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661
    /*
      Mark that we have done permanent changes if all of the below is true
      - Table doesn't support transactions
      - It's a normal (not temporary) table. (Changes to temporary tables
        are not logged in RBR)
      - We are using statement based replication
    */
    if (!table->file->has_transactions() &&
        (!table->s->tmp_table ||
         !thd->current_stmt_binlog_row_based))
      thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
   }
unknown's avatar
unknown committed
2662

2663 2664 2665 2666 2667 2668
  /*
    Write to binlog before commiting transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
2669 2670
  if (mysql_bin_log.is_open())
  {
unknown's avatar
unknown committed
2671 2672
    if (!error)
      thd->clear_error();
2673 2674 2675
    thd->binlog_query(THD::ROW_QUERY_TYPE,
                      thd->query, thd->query_length,
                      table->file->has_transactions(), FALSE);
2676
  }
2677 2678 2679
  if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error)
    error=error2;
  if (error)
unknown's avatar
unknown committed
2680 2681
  {
    table->file->print_error(error,MYF(0));
unknown's avatar
unknown committed
2682
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2683
  }
unknown's avatar
unknown committed
2684
  char buff[160];
2685
  if (info.ignore)
unknown's avatar
unknown committed
2686 2687
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	    (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
2688
  else
unknown's avatar
unknown committed
2689
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
unknown's avatar
unknown committed
2690
	    (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
2691
  thd->row_count_func= info.copied+info.deleted+info.updated;
2692 2693 2694 2695 2696 2697 2698

  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
  ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
2699
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2700 2701 2702 2703
}


/***************************************************************************
  CREATE TABLE (SELECT) ...
***************************************************************************/

2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726
/*
  Create table from lists of fields and items (or open existing table
  with same name).

  SYNOPSIS
    create_table_from_items()
      thd          in     Thread object
      create_info  in     Create information (like MAX_ROWS, ENGINE or
                          temporary table flag)
      create_table in     Pointer to TABLE_LIST object providing database
                          and name for table to be created or to be open
      extra_fields in/out Initial list of fields for table to be created
      keys         in     List of keys for table to be created
      items        in     List of items which should be used to produce rest
                          of fields for the table (corresponding fields will
                          be added to the end of 'extra_fields' list)
      lock         out    Pointer to the MYSQL_LOCK object for table created
                          (open) will be returned in this parameter. Since
                          this table is not included in THD::lock caller is
                          responsible for explicitly unlocking this table.
      hooks        in     Hooks whose prelock() is called with the new table
                          right before it is locked. May be NULL, in which
                          case no hook is called (select_create::prepare()
                          passes NULL when built without
                          HAVE_ROW_BASED_REPLICATION).

  NOTES
    If 'create_info->options' bitmask has HA_LEX_CREATE_IF_NOT_EXISTS
    flag and table with name provided already exists then this function will
    simply open existing table.
    Also note that create, open and lock sequence in this function is not
    atomic and thus contains gap for deadlock and can cause other troubles.
    Since this function contains some logic specific to CREATE TABLE ... SELECT
    it should be changed before it can be used in other contexts.

  RETURN VALUES
    non-zero  Pointer to TABLE object for table created or opened
    0         Error
*/

static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
                                      TABLE_LIST *create_table,
                                      List<create_field> *extra_fields,
                                      List<Key> *keys,
                                      List<Item> *items,
                                      MYSQL_LOCK **lock,
                                      TABLEOP_HOOKS *hooks)
{
  TABLE tmp_table;		// Used during 'create_field()'
  TABLE_SHARE share;
  TABLE *table= 0;
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  Field *tmp_field;
  bool not_used;
  DBUG_ENTER("create_table_from_items");

  /* Set up a minimal dummy table/share used only to create the fields */
  tmp_table.alias= 0;
  tmp_table.timestamp_field= 0;
  tmp_table.s= &share;
  init_tmp_table_share(&share, "", 0, "", "");

  tmp_table.s->db_create_options=0;
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
  tmp_table.s->db_low_byte_first= 
        test(create_info->db_type == &myisam_hton ||
             create_info->db_type == &heap_hton);
  tmp_table.null_row=tmp_table.maybe_null=0;

  /* Append one create_field to 'extra_fields' for every selected item */
  while ((item=it++))
  {
    create_field *cr_field;
    Field *field, *def_field;
    if (item->type() == Item::FUNC_ITEM)
      field= item->tmp_table_field(&tmp_table);
    else
      field= create_tmp_field(thd, &tmp_table, item, item->type(),
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
                              0);
    if (!field ||
	!(cr_field=new create_field(field,(item->type() == Item::FIELD_ITEM ?
					   ((Item_field *)item)->field :
					   (Field*) 0))))
      DBUG_RETURN(0);
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
    extra_fields->push_back(cr_field);
  }
  /*
    create and lock table

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
    NOTE: By locking table which we just have created (or for which we just
    have found that it already exists) separately from other tables used
    by the statement we create potential window for deadlock.
    TODO: create and open should be done atomic !
  */
  {
    tmp_disable_binlog(thd);
    if (!mysql_create_table(thd, create_table->db, create_table->table_name,
                            create_info, *extra_fields, *keys, 0,
                            select_field_count, 0))
    {
      /*
        If we are here in prelocked mode we either create temporary table
        or prelocked mode is caused by the SELECT part of this statement.
      */
      DBUG_ASSERT(!thd->prelocked_mode ||
                  create_info->options & HA_LEX_CREATE_TMP_TABLE ||
                  thd->lex->requires_prelocking());

      /*
        NOTE: We don't want to ignore set of locked tables here if we are
              under explicit LOCK TABLES since it will open gap for deadlock
              too wide (and also is not backward compatible).
      */

      /* If the freshly created table can't be opened, remove it again */
      if (! (table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
                               (MYSQL_LOCK_IGNORE_FLUSH |
                                ((thd->prelocked_mode == PRELOCKED) ?
                                 MYSQL_OPEN_IGNORE_LOCKED_TABLES:0)))))
        quick_rm_table(create_info->db_type, create_table->db,
                       table_case_name(create_info, create_table->table_name),
                       0);
    }
    reenable_binlog(thd);
    if (!table)                                   // open failed
      DBUG_RETURN(0);
  }

  /*
    FIXME: What happens if trigger manages to be created while we are
           obtaining this lock ? May be it is sensible just to disable
           trigger execution in this case ? Or will MYSQL_LOCK_IGNORE_FLUSH
           save us from that ?
  */
  table->reginfo.lock_type=TL_WRITE;
  /*
    Call prelock hooks. The caller is allowed to pass no hooks at all,
    so the pointer must be checked before it is used.
  */
  if (hooks)
    hooks->prelock(&table, 1);
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)))
  {
    /* Locking failed: drop the table from the open cache and remove it */
    VOID(pthread_mutex_lock(&LOCK_open));
    hash_delete(&open_cache,(byte*) table);
    VOID(pthread_mutex_unlock(&LOCK_open));
    quick_rm_table(create_info->db_type, create_table->db,
		   table_case_name(create_info, create_table->table_name), 0);
    DBUG_RETURN(0);
  }
  table->file->extra(HA_EXTRA_WRITE_CACHE);
  DBUG_RETURN(table);
}


unknown's avatar
unknown committed
2865
int
2866
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2867 2868
{
  DBUG_ENTER("select_create::prepare");
2869

2870 2871
  TABLEOP_HOOKS *hook_ptr= NULL;
#ifdef HAVE_ROW_BASED_REPLICATION
2872 2873 2874 2875 2876
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
    MY_HOOKS(select_create *x) : ptr(x) { }
    virtual void do_prelock(TABLE **tables, uint count)
    {
unknown's avatar
unknown committed
2877 2878 2879
    if (ptr->get_thd()->current_stmt_binlog_row_based  &&
        !(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE))
      ptr->binlog_show_create_table(tables, count);
2880 2881 2882 2883 2884 2885 2886
    }

  private:
    select_create *ptr;
  };

  MY_HOOKS hooks(this);
2887 2888
  hook_ptr= &hooks;
#endif
2889

2890
  unit= u;
2891
  if (!(table= create_table_from_items(thd, create_info, create_table,
2892 2893
                                       extra_fields, keys, &values,
                                       &thd->extra_lock, hook_ptr)))
unknown's avatar
unknown committed
2894 2895
    DBUG_RETURN(-1);				// abort() deletes table

2896
  if (table->s->fields < values.elements)
unknown's avatar
unknown committed
2897
  {
2898
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
unknown's avatar
unknown committed
2899 2900 2901
    DBUG_RETURN(-1);
  }

2902
 /* First field to copy */
unknown's avatar
unknown committed
2903 2904 2905 2906
  field= table->field+table->s->fields - values.elements;

  /* Mark all fields that are given values */
  for (Field **f= field ; *f ; f++)
2907
    bitmap_set_bit(table->write_set, (*f)->field_index);
unknown's avatar
unknown committed
2908

2909
  /* Don't set timestamp if used */
2910
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
unknown's avatar
unknown committed
2911 2912
  table->next_number_field=table->found_next_number_field;

2913
  restore_record(table,s->default_values);      // Get empty record
unknown's avatar
unknown committed
2914
  thd->cuted_fields=0;
unknown's avatar
unknown committed
2915
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
2916
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2917 2918 2919
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2920
  if (!thd->prelocked_mode)
2921
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
2922
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
2923
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
2924 2925 2926
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2927 2928 2929 2930
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2931 2932 2933
}


2934
#ifdef HAVE_ROW_BASED_REPLICATION
2935
void
2936
select_create::binlog_show_create_table(TABLE **tables, uint count)
2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953
{
  /*
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
    created table by calling store_create_info() (behaves as SHOW
    CREATE TABLE).  In the event of an error, nothing should be
    written to the binary log, even if the table is non-transactional;
    therefore we pretend that the generated CREATE TABLE statement is
    for a transactional table.  The event will then be put in the
    transaction cache, and any subsequent events (e.g., table-map
    events and binrow events) will also be put there.  We can then use
    ha_autocommit_or_rollback() to either throw away the entire
    kaboodle of events, or write them to the binary log.

    We write the CREATE TABLE statement here and not in prepare()
    since there potentially are sub-selects or accesses to information
    schema that will do a close_thread_tables(), destroying the
    statement transaction cache.
2954
  */
2955
  DBUG_ASSERT(thd->current_stmt_binlog_row_based);
2956
  DBUG_ASSERT(tables && *tables && count > 0);
2957 2958 2959

  char buf[2048];
  String query(buf, sizeof(buf), system_charset_info);
2960
  int result;
2961
  TABLE_LIST table_list;
2962

2963 2964
  memset(&table_list, 0, sizeof(table_list));
  table_list.table = *tables;
2965
  query.length(0);      // Have to zero it since constructor doesn't
2966

2967
  result= store_create_info(thd, &table_list, &query, create_info);
2968
  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
2969

2970 2971 2972 2973 2974
  thd->binlog_query(THD::STMT_QUERY_TYPE,
                    query.ptr(), query.length(),
                    /* is_trans */ TRUE,
                    /* suppress_use */ FALSE);
}
2975
#endif // HAVE_ROW_BASED_REPLICATION
2976

unknown's avatar
unknown committed
2977
/*
  Store one row of selected values into the fields of the created table.

  The values are copied starting at the member 'field' and the INSERT
  triggers of the table (table->triggers) are handed to the fill helper.
  The helper's return value is not examined here.
*/

void select_create::store_values(List<Item> &values)
{
  /* NOTE(review): the literal 1 looks like an ignore-errors flag - confirm */
  fill_record_n_invoke_before_triggers(thd,
                                       field,
                                       values,
                                       1,
                                       table->triggers,
                                       TRG_EVENT_INSERT);
}

unknown's avatar
unknown committed
2983

2984 2985 2986 2987 2988 2989 2990 2991 2992
/*
  Report an error for CREATE ... SELECT.

  Forwards to select_insert::send_error() with binary logging switched
  off for the duration of the call.
*/

void select_create::send_error(uint errcode,const char *err)
{
  /*
    Partial inserts are "rolled back" in ::abort by removing the table,
    even when the table is non-transactional, so nothing may be written
    to the binary log from here.

    NOTE(review): tmp_disable_binlog()/reenable_binlog() are kept as a
    pair in the same scope and in this order - confirm against their
    definitions before moving either one.
  */
  tmp_disable_binlog(thd);
  select_insert::send_error(errcode, err);
  reenable_binlog(thd);
}

unknown's avatar
unknown committed
2995

unknown's avatar
unknown committed
2996 2997 2998 2999 3000 3001 3002
bool select_create::send_eof()
{
  bool tmp=select_insert::send_eof();
  if (tmp)
    abort();
  else
  {
unknown's avatar
unknown committed
3003
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3004
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
unknown's avatar
unknown committed
3005
    VOID(pthread_mutex_lock(&LOCK_open));
3006
    mysql_unlock_tables(thd, thd->extra_lock);
3007
    if (!table->s->tmp_table)
3008
    {
3009
      if (close_thread_table(thd, &table))
3010
        broadcast_refresh();
3011
    }
3012
    thd->extra_lock=0;
unknown's avatar
unknown committed
3013
    table=0;
unknown's avatar
unknown committed
3014 3015 3016 3017 3018 3019 3020 3021
    VOID(pthread_mutex_unlock(&LOCK_open));
  }
  return tmp;
}

void select_create::abort()
{
  VOID(pthread_mutex_lock(&LOCK_open));
3022
  if (thd->extra_lock)
unknown's avatar
unknown committed
3023
  {
3024 3025
    mysql_unlock_tables(thd, thd->extra_lock);
    thd->extra_lock=0;
unknown's avatar
unknown committed
3026 3027 3028
  }
  if (table)
  {
unknown's avatar
unknown committed
3029
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3030
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
unknown's avatar
unknown committed
3031
    handlerton *table_type=table->s->db_type;
3032
    if (!table->s->tmp_table)
3033
    {
3034
      ulong version= table->s->version;
unknown's avatar
unknown committed
3035
      table->s->version= 0;
3036
      hash_delete(&open_cache,(byte*) table);
3037
      if (!create_info->table_existed)
3038 3039
        quick_rm_table(table_type, create_table->db,
                       create_table->table_name, 0);
3040
      /* Tell threads waiting for refresh that something has happened */
unknown's avatar
unknown committed
3041
      if (version != refresh_version)
3042
        broadcast_refresh();
3043 3044
    }
    else if (!create_info->table_existed)
unknown's avatar
unknown committed
3045 3046
      close_temporary_table(thd, table, 1, 1);
    table=0;                                    // Safety
unknown's avatar
unknown committed
3047 3048 3049 3050 3051 3052
  }
  VOID(pthread_mutex_unlock(&LOCK_open));
}


/*****************************************************************************
unknown's avatar
unknown committed
3053
  Instantiate templates
unknown's avatar
unknown committed
3054 3055
*****************************************************************************/

3056
/* Emitted only when HAVE_EXPLICIT_TEMPLATE_INSTANTIATION is defined. */
#if defined(HAVE_EXPLICIT_TEMPLATE_INSTANTIATION)
template class List_iterator_fast<List_item>;
/* The delayed-insert lists are left out when EMBEDDED_LIBRARY is defined. */
#if !defined(EMBEDDED_LIBRARY)
template class I_List<delayed_insert>;
template class I_List_iterator<delayed_insert>;
template class I_List<delayed_row>;
#endif /* EMBEDDED_LIBRARY */
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */