/*
   Copyright (c) 2000, 2011, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/


/* Copy data from a textfile to table */
#include "mysql_priv.h"
#include <my_dir.h>
#include <m_ctype.h>
23
#include "rpl_mi.h"
24
#include "sql_repl.h"
unknown's avatar
unknown committed
25 26
#include "sp_head.h"
#include "sql_trigger.h"
unknown's avatar
unknown committed
27 28 29

class READ_INFO {
  File	file;
30
  uchar	*buffer,			/* Buffer for read text */
unknown's avatar
unknown committed
31 32 33 34 35 36 37 38
	*end_of_buff;			/* Data in bufferts ends here */
  uint	buff_length,			/* Length of buffert */
	max_length;			/* Max length of row */
  char	*field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
  uint	field_term_length,line_term_length,enclosed_length;
  int	field_term_char,line_term_char,enclosed_char,escape_char;
  int	*stack,*stack_pos;
  bool	found_end_of_line,start_of_line,eof;
39
  bool  need_end_io_cache;
unknown's avatar
unknown committed
40 41 42 43 44
  IO_CACHE cache;
  NET *io_net;

public:
  bool error,line_cuted,found_null,enclosed;
45
  uchar	*row_start,			/* Found row starts here */
unknown's avatar
unknown committed
46
	*row_end;			/* Found row ends here */
unknown's avatar
unknown committed
47
  CHARSET_INFO *read_charset;
unknown's avatar
unknown committed
48

unknown's avatar
unknown committed
49
  READ_INFO(File file,uint tot_length,CHARSET_INFO *cs,
unknown's avatar
unknown committed
50
	    String &field_term,String &line_start,String &line_term,
51
	    String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
unknown's avatar
unknown committed
52 53 54 55 56 57 58
  ~READ_INFO();
  int read_field();
  int read_fixed_length(void);
  int next_line(void);
  char unescape(char chr);
  int terminator(char *ptr,uint length);
  bool find_start_of_fields();
unknown's avatar
unknown committed
59 60 61 62
  /*
    We need to force cache close before destructor is invoked to log
    the last read block
  */
63 64 65 66 67
  void end_io_cache()
  {
    ::end_io_cache(&cache);
    need_end_io_cache = 0;
  }
68 69
  my_off_t file_length() { return cache.end_of_file; }
  my_off_t position()    { return my_b_tell(&cache); }
70

unknown's avatar
unknown committed
71 72 73 74 75
  /*
    Either this method, or we need to make cache public
    Arg must be set from mysql_load() since constructor does not see
    either the table or THD value
  */
76
  void set_io_cache_arg(void* arg) { cache.arg = arg; }
unknown's avatar
unknown committed
77 78
};

79
static int read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
unknown's avatar
unknown committed
80 81
                             List<Item> &fields_vars, List<Item> &set_fields,
                             List<Item> &set_values, READ_INFO &read_info,
82 83 84
			     ulong skip_lines,
			     bool ignore_check_option_errors);
static int read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
unknown's avatar
unknown committed
85 86
                          List<Item> &fields_vars, List<Item> &set_fields,
                          List<Item> &set_values, READ_INFO &read_info,
87 88
			  String &enclosed, ulong skip_lines,
			  bool ignore_check_option_errors);
89
#ifndef EMBEDDED_LIBRARY
90
static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex,
91
                                               const char* db_arg, /* table's database */
92 93 94 95 96
                                               const char* table_name_arg,
                                               enum enum_duplicates duplicates,
                                               bool ignore,
                                               bool transactional_table,
                                               int errocode);
97
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119

/*
  Execute LOAD DATA query

  SYNOPSYS
    mysql_load()
      thd - current thread
      ex  - sql_exchange object representing source file and its parsing rules
      table_list  - list of tables to which we are loading data
      fields_vars - list of fields and variables to which we read
                    data from file
      set_fields  - list of fields mentioned in set clause
      set_values  - expressions to assign to fields in previous list
      handle_duplicates - indicates whenever we should emit error or
                          replace row if we will meet duplicates.
      ignore -          - indicates whenever we should ignore duplicates
      read_file_from_client - is this LOAD DATA LOCAL ?

  RETURN VALUES
    TRUE - error / FALSE - success
*/

120
int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
unknown's avatar
unknown committed
121 122 123
	        List<Item> &fields_vars, List<Item> &set_fields,
                List<Item> &set_values,
                enum enum_duplicates handle_duplicates, bool ignore,
124
                bool read_file_from_client)
unknown's avatar
unknown committed
125 126 127
{
  char name[FN_REFLEN];
  File file;
128
  TABLE *table= NULL;
He Zhenxing's avatar
He Zhenxing committed
129
  int error= 0;
130 131
  String *field_term=ex->field_term,*escaped=ex->escaped;
  String *enclosed=ex->enclosed;
132
  bool is_fifo=0;
133
#ifndef EMBEDDED_LIBRARY
134
  LOAD_FILE_INFO lf_info;
135
  killed_state killed_status;
136
#endif
unknown's avatar
unknown committed
137
  char *db = table_list->db;			// This is never null
unknown's avatar
unknown committed
138 139 140 141 142
  /*
    If path for file is not defined, we will use the current database.
    If this is not set, we will use the directory where the table to be
    loaded is located
  */
unknown's avatar
unknown committed
143
  char *tdb= thd->db ? thd->db : db;		// Result is never null
144
  ulong skip_lines= ex->skip_lines;
Michael Widenius's avatar
Michael Widenius committed
145
  bool transactional_table __attribute__((unused));
unknown's avatar
unknown committed
146 147
  DBUG_ENTER("mysql_load");

148 149 150 151 152 153 154 155
  /*
    Bug #34283
    mysqlbinlog leaves tmpfile after termination if binlog contains
    load data infile, so in mixed mode we go to row-based for
    avoiding the problem.
  */
  thd->set_current_stmt_binlog_row_based_if_mixed();

unknown's avatar
unknown committed
156 157 158 159
#ifdef EMBEDDED_LIBRARY
  read_file_from_client  = 0; //server is always in the same process 
#endif

unknown's avatar
unknown committed
160 161 162 163
  if (escaped->length() > 1 || enclosed->length() > 1)
  {
    my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
	       MYF(0));
unknown's avatar
unknown committed
164
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
165
  }
166 167 168 169 170 171 172 173 174 175 176

  /* Report problems with non-ascii separators */
  if (!escaped->is_ascii() || !enclosed->is_ascii() ||
      !field_term->is_ascii() ||
      !ex->line_term->is_ascii() || !ex->line_start->is_ascii())
  {
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                 WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED,
                 ER(WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED));
  } 

unknown's avatar
unknown committed
177 178
  if (open_and_lock_tables(thd, table_list))
    DBUG_RETURN(TRUE);
179 180 181
  if (mysql_handle_single_derived(thd->lex, table_list, DT_MERGE_FOR_INSERT) ||
      mysql_handle_single_derived(thd->lex, table_list, DT_PREPARE))
    DBUG_RETURN(TRUE);
182 183
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
                                    &thd->lex->select_lex.top_join_list,
184
                                    table_list,
185
                                    thd->lex->select_lex.leaf_tables, FALSE,
186
                                    INSERT_ACL | UPDATE_ACL,
187
                                    INSERT_ACL | UPDATE_ACL, FALSE))
188
     DBUG_RETURN(-1);
unknown's avatar
unknown committed
189
  if (!table_list->table ||               // do not suport join view
190
      !table_list->single_table_updatable() || // and derived tables
unknown's avatar
unknown committed
191
      check_key_in_view(thd, table_list))
unknown's avatar
VIEW  
unknown committed
192
  {
193
    my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "LOAD");
unknown's avatar
unknown committed
194
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
195
  }
196 197 198 199 200
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
  {
    DBUG_RETURN(TRUE);
  }
unknown's avatar
unknown committed
201 202 203 204 205 206
  /*
    Let us emit an error if we are loading data to table which is used
    in subselect in SET clause like we do it for INSERT.

    The main thing to fix to remove this restriction is to ensure that the
    table is marked to be 'used for insert' in which case we should never
207
    mark this table as 'const table' (ie, one that has only one row).
unknown's avatar
unknown committed
208
  */
209
  if (unique_table(thd, table_list, table_list->next_global, 0))
unknown's avatar
unknown committed
210 211 212 213 214
  {
    my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
    DBUG_RETURN(TRUE);
  }

unknown's avatar
VIEW  
unknown committed
215
  table= table_list->table;
216 217
  transactional_table= table->file->has_transactions();

unknown's avatar
unknown committed
218
  if (!fields_vars.elements)
unknown's avatar
unknown committed
219 220 221
  {
    Field **field;
    for (field=table->field; *field ; field++)
unknown's avatar
unknown committed
222
      fields_vars.push_back(new Item_field(*field));
223
    bitmap_set_all(table->write_set);
unknown's avatar
unknown committed
224 225 226 227 228
    table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
    /*
      Let us also prepare SET clause, altough it is probably empty
      in this case.
    */
229 230
    if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
        setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
unknown's avatar
unknown committed
231
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
232 233 234
  }
  else
  {						// Part field list
unknown's avatar
VIEW  
unknown committed
235
    /* TODO: use this conds for 'WITH CHECK OPTIONS' */
236 237
    if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
        setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
238
        check_that_all_fields_are_given_values(thd, table, table_list))
unknown's avatar
unknown committed
239
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
240 241 242 243
    /*
      Check whenever TIMESTAMP field with auto-set feature specified
      explicitly.
    */
244 245 246 247 248 249 250 251 252 253 254 255 256
    if (table->timestamp_field)
    {
      if (bitmap_is_set(table->write_set,
                        table->timestamp_field->field_index))
        table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
      else
      {
        bitmap_set_bit(table->write_set,
                       table->timestamp_field->field_index);
      }
    }
    /* Fix the expressions in SET clause */
    if (setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
unknown's avatar
unknown committed
257
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
258 259
  }

unknown's avatar
unknown committed
260
  prepare_triggers_for_insert_stmt(table);
261

unknown's avatar
unknown committed
262
  uint tot_length=0;
unknown's avatar
unknown committed
263 264 265
  bool use_blobs= 0, use_vars= 0;
  List_iterator_fast<Item> it(fields_vars);
  Item *item;
unknown's avatar
unknown committed
266

unknown's avatar
unknown committed
267
  while ((item= it++))
unknown's avatar
unknown committed
268
  {
269 270 271
    Item *real_item= item->real_item();

    if (real_item->type() == Item::FIELD_ITEM)
unknown's avatar
unknown committed
272
    {
273
      Field *field= ((Item_field*)real_item)->field;
unknown's avatar
unknown committed
274 275 276 277 278 279 280
      if (field->flags & BLOB_FLAG)
      {
        use_blobs= 1;
        tot_length+= 256;			// Will be extended if needed
      }
      else
        tot_length+= field->field_length;
unknown's avatar
unknown committed
281
    }
282
    else if (item->type() == Item::STRING_ITEM)
unknown's avatar
unknown committed
283
      use_vars= 1;
unknown's avatar
unknown committed
284 285 286 287 288
  }
  if (use_blobs && !ex->line_term->length() && !field_term->length())
  {
    my_message(ER_BLOBS_AND_NO_TERMINATED,ER(ER_BLOBS_AND_NO_TERMINATED),
	       MYF(0));
unknown's avatar
unknown committed
289
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
290
  }
unknown's avatar
unknown committed
291 292 293 294 295
  if (use_vars && !field_term->length() && !enclosed->length())
  {
    my_error(ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR, MYF(0));
    DBUG_RETURN(TRUE);
  }
unknown's avatar
unknown committed
296 297 298

  /* We can't give an error in the middle when using LOCAL files */
  if (read_file_from_client && handle_duplicates == DUP_ERROR)
299
    ignore= 1;
unknown's avatar
unknown committed
300

301
#ifndef EMBEDDED_LIBRARY
302
  if (read_file_from_client)
unknown's avatar
unknown committed
303
  {
unknown's avatar
unknown committed
304
    (void)net_request_file(&thd->net,ex->file_name);
unknown's avatar
unknown committed
305 306 307
    file = -1;
  }
  else
308
#endif
unknown's avatar
unknown committed
309 310 311 312
  {
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
    ex->file_name+=dirname_length(ex->file_name);
#endif
313
    if (!dirname_length(ex->file_name))
unknown's avatar
unknown committed
314
    {
unknown's avatar
unknown committed
315
      strxnmov(name, FN_REFLEN-1, mysql_real_data_home, tdb, NullS);
unknown's avatar
unknown committed
316 317
      (void) fn_format(name, ex->file_name, name, "",
		       MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
unknown's avatar
unknown committed
318 319 320
    }
    else
    {
unknown's avatar
unknown committed
321
      (void) fn_format(name, ex->file_name, mysql_real_data_home, "",
322 323
                       MY_RELATIVE_PATH | MY_UNPACK_FILENAME |
                       MY_RETURN_REAL_PATH);
324
    }
325

326 327
    if (thd->slave_thread)
    {
328
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
329 330 331
      if (strncmp(active_mi->rli.slave_patternload_file, name, 
          active_mi->rli.slave_patternload_file_size))
      {
332
        /*
333 334
          LOAD DATA INFILE in the slave SQL Thread can only read from 
          --slave-load-tmpdir". This should never happen. Please, report a bug.
335
        */
336 337 338 339

        sql_print_error("LOAD DATA INFILE in the slave SQL Thread can only read from --slave-load-tmpdir. " \
                        "Please, report a bug.");
        my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--slave-load-tmpdir");
340
        DBUG_RETURN(TRUE);
341
      }
342 343 344 345 346 347 348 349 350 351 352 353 354
#else
      /*
        This is impossible and should never happen.
      */
      DBUG_ASSERT(FALSE); 
#endif
    }
    else if (!is_secure_file_path(name))
    {
      /* Read only allowed from within dir specified by secure_file_priv */
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
      DBUG_RETURN(TRUE);
    }
355

356 357
#if !defined(__WIN__) && ! defined(__NETWARE__)
    MY_STAT stat_info;
358 359
    if (!my_stat(name, &stat_info, MYF(MY_WME)))
      DBUG_RETURN(TRUE);
360 361 362

    // if we are not in slave thread, the file must be:
    if (!thd->slave_thread &&
363 364 365 366
        !((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
          (stat_info.st_mode & S_IFLNK) != S_IFLNK &&  // and not a symlink
          ((stat_info.st_mode & S_IFREG) == S_IFREG || // and a regular file
           (stat_info.st_mode & S_IFIFO) == S_IFIFO))) // or FIFO
367
    {
368 369
      my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
370
    }
371
    if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
372
      is_fifo= 1;
373 374
#endif

unknown's avatar
unknown committed
375
    if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
unknown's avatar
unknown committed
376
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
377 378 379 380
  }

  COPY_INFO info;
  bzero((char*) &info,sizeof(info));
381
  info.ignore= ignore;
unknown's avatar
unknown committed
382
  info.handle_duplicates=handle_duplicates;
383 384 385
  info.escape_char= (escaped->length() && (ex->escaped_given() ||
                    !(thd->variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES)))
                    ? (*escaped)[0] : INT_MAX;
unknown's avatar
unknown committed
386

387 388
  READ_INFO read_info(file,tot_length,
                      ex->cs ? ex->cs : thd->variables.collation_database,
unknown's avatar
unknown committed
389
		      *field_term,*ex->line_start, *ex->line_term, *enclosed,
390
		      info.escape_char, read_file_from_client, is_fifo);
unknown's avatar
unknown committed
391 392 393 394
  if (read_info.error)
  {
    if	(file >= 0)
      my_close(file,MYF(0));			// no files in net reading
unknown's avatar
unknown committed
395
    DBUG_RETURN(TRUE);				// Can't allocate buffers
unknown's avatar
unknown committed
396 397
  }

398
#ifndef EMBEDDED_LIBRARY
399
  if (mysql_bin_log.is_open())
400 401 402 403
  {
    lf_info.thd = thd;
    lf_info.wrote_create_file = 0;
    lf_info.last_pos_in_file = HA_POS_ERROR;
unknown's avatar
unknown committed
404
    lf_info.log_delayed= transactional_table;
unknown's avatar
unknown committed
405
    read_info.set_io_cache_arg((void*) &lf_info);
406
  }
407 408
#endif /*!EMBEDDED_LIBRARY*/

409
  thd->count_cuted_fields= CHECK_FIELD_WARN;		/* calc cuted fields */
unknown's avatar
unknown committed
410
  thd->cuted_fields=0L;
411 412
  /* Skip lines if there is a line terminator */
  if (ex->line_term->length())
unknown's avatar
unknown committed
413
  {
414 415
    /* ex->skip_lines needs to be preserved for logging */
    while (skip_lines > 0)
unknown's avatar
unknown committed
416
    {
417
      skip_lines--;
unknown's avatar
unknown committed
418 419 420 421
      if (read_info.next_line())
	break;
    }
  }
422

423
  thd_proc_info(thd, "reading file");
unknown's avatar
unknown committed
424 425 426
  if (!(error=test(read_info.error)))
  {
    table->next_number_field=table->found_next_number_field;
427
    if (ignore ||
unknown's avatar
unknown committed
428 429
	handle_duplicates == DUP_REPLACE)
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
430 431 432
    if (handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
433
        table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
434
    if (!thd->prelocked_mode)
435
      table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
436
    table->copy_blobs=1;
unknown's avatar
unknown committed
437

unknown's avatar
unknown committed
438
    thd->abort_on_warning= (!ignore &&
unknown's avatar
unknown committed
439 440 441 442
                            (thd->variables.sql_mode &
                             (MODE_STRICT_TRANS_TABLES |
                              MODE_STRICT_ALL_TABLES)));

443
    thd_progress_init(thd, 2);
unknown's avatar
unknown committed
444
    if (!field_term->length() && !enclosed->length())
unknown's avatar
unknown committed
445 446
      error= read_fixed_length(thd, info, table_list, fields_vars,
                               set_fields, set_values, read_info,
447
			       skip_lines, ignore);
unknown's avatar
unknown committed
448
    else
unknown's avatar
unknown committed
449 450 451
      error= read_sep_field(thd, info, table_list, fields_vars,
                            set_fields, set_values, read_info,
			    *enclosed, skip_lines, ignore);
452 453 454
    
    thd_proc_info(thd, "End bulk insert");
    thd_progress_next_stage(thd);
455
    if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
456 457 458 459
    {
      table->file->print_error(my_errno, MYF(0));
      error= 1;
    }
unknown's avatar
unknown committed
460
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
461
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
unknown's avatar
unknown committed
462 463
    table->next_number_field=0;
  }
464 465
  if (file >= 0)
    my_close(file,MYF(0));
unknown's avatar
unknown committed
466 467
  free_blobs(table);				/* if pack_blob was used */
  table->copy_blobs=0;
468
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
469 470 471 472 473 474 475
  /* 
     simulated killing in the middle of per-row loop
     must be effective for binlogging
  */
  DBUG_EXECUTE_IF("simulate_kill_bug27571",
                  {
                    error=1;
476
                    thd->killed= KILL_QUERY;
477
                  };);
478 479

#ifndef EMBEDDED_LIBRARY
480
  killed_status= (error == 0) ? NOT_KILLED : thd->killed;
481 482
#endif

483 484 485 486
  /*
    We must invalidate the table in query cache before binlog writing and
    ha_autocommit_...
  */
unknown's avatar
unknown committed
487
  query_cache_invalidate3(thd, table_list, 0);
unknown's avatar
unknown committed
488
  if (error)
489
  {
490 491 492
    if (read_file_from_client)
      while (!read_info.next_line())
	;
unknown's avatar
unknown committed
493

494
#ifndef EMBEDDED_LIBRARY
495
    if (mysql_bin_log.is_open())
496
    {
unknown's avatar
unknown committed
497
      {
498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522
	/*
	  Make sure last block (the one which caused the error) gets
	  logged.  This is needed because otherwise after write of (to
	  the binlog, not to read_info (which is a cache))
	  Delete_file_log_event the bad block will remain in read_info
	  (because pre_read is not called at the end of the last
	  block; remember pre_read is called whenever a new block is
	  read from disk).  At the end of mysql_load(), the destructor
	  of read_info will call end_io_cache() which will flush
	  read_info, so we will finally have this in the binlog:

	  Append_block # The last successfull block
	  Delete_file
	  Append_block # The failing block
	  which is nonsense.
	  Or could also be (for a small file)
	  Create_file  # The failing block
	  which is nonsense (Delete_file is not written in this case, because:
	  Create_file has not been written, so Delete_file is not written, then
	  when read_info is destroyed end_io_cache() is called which writes
	  Create_file.
	*/
	read_info.end_io_cache();
	/* If the file was not empty, wrote_create_file is true */
	if (lf_info.wrote_create_file)
523
	{
524
          int errcode= query_error_code(thd, killed_status == NOT_KILLED);
525
          
He Zhenxing's avatar
He Zhenxing committed
526 527
          /* since there is already an error, the possible error of
             writing binary log will be ignored */
528
	  if (thd->transaction.stmt.modified_non_trans_table)
He Zhenxing's avatar
He Zhenxing committed
529 530 531 532 533 534
            (void) write_execute_load_query_log_event(thd, ex,
                                                      table_list->db, 
                                                      table_list->table_name,
                                                      handle_duplicates, ignore,
                                                      transactional_table,
                                                      errcode);
535 536 537
	  else
	  {
	    Delete_file_log_event d(thd, db, transactional_table);
He Zhenxing's avatar
He Zhenxing committed
538
	    (void) mysql_bin_log.write(&d);
539
	  }
540
	}
unknown's avatar
unknown committed
541
      }
542
    }
543
#endif /*!EMBEDDED_LIBRARY*/
544 545
    error= -1;				// Error on read
    goto err;
546
  }
547 548
  sprintf(name, ER(ER_LOAD_INFO), (ulong) info.records, (ulong) info.deleted,
	  (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
549

unknown's avatar
unknown committed
550 551
  if (thd->transaction.stmt.modified_non_trans_table)
    thd->transaction.all.modified_non_trans_table= TRUE;
552
#ifndef EMBEDDED_LIBRARY
553
  if (mysql_bin_log.is_open())
554
  {
555
    /*
556 557 558 559 560 561 562
      We need to do the job that is normally done inside
      binlog_query() here, which is to ensure that the pending event
      is written before tables are unlocked and before any other
      events are written.  We also need to update the table map
      version for the binary log to mark that table maps are invalid
      after this point.
     */
563
    if (thd->current_stmt_binlog_row_based)
He Zhenxing's avatar
He Zhenxing committed
564
      error= thd->binlog_flush_pending_rows_event(true);
565 566 567 568 569 570 571 572 573 574
    else
    {
      /*
        As already explained above, we need to call end_io_cache() or the last
        block will be logged only after Execute_load_query_log_event (which is
        wrong), when read_info is destroyed.
      */
      read_info.end_io_cache();
      if (lf_info.wrote_create_file)
      {
575
        int errcode= query_error_code(thd, killed_status == NOT_KILLED);
He Zhenxing's avatar
He Zhenxing committed
576 577 578 579 580
        error= write_execute_load_query_log_event(thd, ex,
                                                  table_list->db, table_list->table_name,
                                                  handle_duplicates, ignore,
                                                  transactional_table,
                                                  errcode);
581
      }
582 583 584 585 586 587 588

      /*
        Flushing the IO CACHE while writing the execute load query log event
        may result in error (for instance, because the max_binlog_size has been 
        reached, and rotation of the binary log failed).
      */
      error= error || mysql_bin_log.get_log_file()->error;
589
    }
He Zhenxing's avatar
He Zhenxing committed
590 591
    if (error)
      goto err;
592
  }
593
#endif /*!EMBEDDED_LIBRARY*/
unknown's avatar
unknown committed
594

595
  /* ok to client sent only after binlog write and engine commit */
596
  my_ok(thd, info.copied + info.deleted, 0L, name);
597
err:
unknown's avatar
unknown committed
598 599
  DBUG_ASSERT(transactional_table || !(info.copied || info.deleted) ||
              thd->transaction.stmt.modified_non_trans_table);
600
  table->file->ha_release_auto_increment();
601
  table->auto_increment_field_not_null= FALSE;
unknown's avatar
unknown committed
602
  thd->abort_on_warning= 0;
603
  DBUG_RETURN(error);
unknown's avatar
unknown committed
604 605
}

606

607 608
#ifndef EMBEDDED_LIBRARY

609
/* Not a very useful function; just to avoid duplication of code */
610
static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex,
611
                                               const char* db_arg,  /* table's database */
612 613 614 615
                                               const char* table_name_arg,
                                               enum enum_duplicates duplicates,
                                               bool ignore,
                                               bool transactional_table,
616
                                               int errcode)
617
{
618 619 620 621 622 623 624 625 626
  char                *load_data_query,
                      *end,
                      *fname_start,
                      *fname_end,
                      *p= NULL;
  size_t               pl= 0;
  List<Item>           fv;
  Item                *item, *val;
  int                  n;
627 628
  const char          *tbl= table_name_arg;
  const char          *tdb= (thd->db != NULL ? thd->db : db_arg);
629 630 631 632 633 634
  char 		      name_buffer[SAFE_NAME_LEN*2];
  char                command_buffer[1024];
  String              string_buf(name_buffer, sizeof(name_buffer),
                                 system_charset_info);
  String              pfields(command_buffer, sizeof(command_buffer),
                              system_charset_info);
635

636 637 638 639 640 641 642
  if (!thd->db || strcmp(db_arg, thd->db)) 
  {
    /*
      If used database differs from table's database, 
      prefix table name with database name so that it 
      becomes a FQ name.
     */
643
    string_buf.length(0);
644 645 646 647 648 649 650 651 652
    string_buf.append(db_arg);
    string_buf.append("`");
    string_buf.append(".");
    string_buf.append("`");
    string_buf.append(table_name_arg);
    tbl= string_buf.c_ptr_safe();
  }

  Load_log_event       lle(thd, ex, tdb, tbl, fv, duplicates,
653 654 655 656 657 658 659 660 661 662 663
                           ignore, transactional_table);

  /*
    force in a LOCAL if there was one in the original.
  */
  if (thd->lex->local_file)
    lle.set_fname_outside_temp_buf(ex->file_name, strlen(ex->file_name));

  /*
    prepare fields-list and SET if needed; print_query won't do that for us.
  */
664
  pfields.length(0);
665 666 667 668 669 670 671 672 673 674 675 676
  if (!thd->lex->field_list.is_empty())
  {
    List_iterator<Item>  li(thd->lex->field_list);

    pfields.append(" (");
    n= 0;

    while ((item= li++))
    {
      if (n++)
        pfields.append(", ");
      if (item->name)
677 678
      {
        pfields.append("`");
679
        pfields.append(item->name);
680 681
        pfields.append("`");
      }
682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700
      else
        item->print(&pfields, QT_ORDINARY);
    }
    pfields.append(")");
  }

  if (!thd->lex->update_list.is_empty())
  {
    List_iterator<Item> lu(thd->lex->update_list);
    List_iterator<Item> lv(thd->lex->value_list);

    pfields.append(" SET ");
    n= 0;

    while ((item= lu++))
    {
      val= lv++;
      if (n++)
        pfields.append(", ");
701
      pfields.append("`");
702
      pfields.append(item->name);
703
      pfields.append("`");
704 705 706 707 708
      pfields.append("=");
      val->print(&pfields, QT_ORDINARY);
    }
  }

709 710
  p=  pfields.c_ptr_safe();
  pl= pfields.length();
711 712 713 714 715 716 717 718 719 720 721

  if (!(load_data_query= (char *)thd->alloc(lle.get_query_buffer_length() + 1 + pl)))
    return TRUE;

  lle.print_query(FALSE, (const char *) ex->cs?ex->cs->csname:NULL,
                  load_data_query, &end,
                  (char **)&fname_start, (char **)&fname_end);

  strcpy(end, p);
  end += pl;

722
  Execute_load_query_log_event
723 724 725
    e(thd, load_data_query, end-load_data_query,
      (uint) ((char*) fname_start - load_data_query - 1),
      (uint) ((char*) fname_end - load_data_query),
726 727
      (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
      (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
728
      transactional_table, FALSE, errcode);
729 730 731
  return mysql_bin_log.write(&e);
}

732
#endif
733

unknown's avatar
unknown committed
734 735 736 737 738
/****************************************************************************
** Read of rows of fixed size + optional garage + optonal newline
****************************************************************************/

static int
739
read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
unknown's avatar
unknown committed
740 741 742
                  List<Item> &fields_vars, List<Item> &set_fields,
                  List<Item> &set_values, READ_INFO &read_info,
                  ulong skip_lines, bool ignore_check_option_errors)
unknown's avatar
unknown committed
743
{
unknown's avatar
unknown committed
744
  List_iterator_fast<Item> it(fields_vars);
unknown's avatar
unknown committed
745
  Item_field *sql_field;
746
  TABLE *table= table_list->table;
747 748
  bool err, progress_reports;
  ulonglong counter, time_to_report_progress;
unknown's avatar
unknown committed
749 750
  DBUG_ENTER("read_fixed_length");

751 752 753 754 755 756
  counter= 0;
  time_to_report_progress= MY_HOW_OFTEN_TO_WRITE/10;
  progress_reports= 1;
  if ((thd->progress.max_counter= read_info.file_length()) == ~(my_off_t) 0)
    progress_reports= 0;

unknown's avatar
unknown committed
757 758 759 760
  while (!read_info.read_fixed_length())
  {
    if (thd->killed)
    {
unknown's avatar
SCRUM  
unknown committed
761
      thd->send_kill_message();
unknown's avatar
unknown committed
762 763
      DBUG_RETURN(1);
    }
764 765 766 767 768 769 770 771 772 773
    if (progress_reports)
    {
      thd->progress.counter= read_info.position();
      if (++counter >= time_to_report_progress)
      {
        time_to_report_progress+= MY_HOW_OFTEN_TO_WRITE/10;
        thd_progress_report(thd, thd->progress.counter,
                            thd->progress.max_counter);
      }
    }
774 775 776 777 778 779 780 781 782 783 784
    if (skip_lines)
    {
      /*
	We could implement this with a simple seek if:
	- We are not using DATA INFILE LOCAL
	- escape character is  ""
	- line starting prefix is ""
      */
      skip_lines--;
      continue;
    }
unknown's avatar
unknown committed
785
    it.rewind();
786
    uchar *pos=read_info.row_start;
787
#ifdef HAVE_valgrind
unknown's avatar
unknown committed
788 789
    read_info.row_end[0]=0;
#endif
unknown's avatar
unknown committed
790 791 792 793 794 795

    restore_record(table, s->default_values);
    /*
      There is no variables in fields_vars list in this format so
      this conversion is safe.
    */
unknown's avatar
unknown committed
796 797
    while ((sql_field= (Item_field*) it++))
    {
798
      Field *field= sql_field->field;                  
799 800
      if (field == table->next_number_field)
        table->auto_increment_field_not_null= TRUE;
unknown's avatar
unknown committed
801 802 803 804 805 806 807
      /*
        No fields specified in fields_vars list can be null in this format.
        Mark field as not null, we should do this for each row because of
        restore_record...
      */
      field->set_notnull();

unknown's avatar
unknown committed
808 809
      if (pos == read_info.row_end)
      {
810 811 812
        thd->cuted_fields++;			/* Not enough fields */
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, 
                            ER_WARN_TOO_FEW_RECORDS, 
813
                            ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
814 815
        if (!field->maybe_null() && field->type() == FIELD_TYPE_TIMESTAMP)
            ((Field_timestamp*) field)->set_time();
unknown's avatar
unknown committed
816 817 818 819
      }
      else
      {
	uint length;
820
	uchar save_chr;
unknown's avatar
unknown committed
821 822 823 824
	if ((length=(uint) (read_info.row_end-pos)) >
	    field->field_length)
	  length=field->field_length;
	save_chr=pos[length]; pos[length]='\0'; // Safeguard aganst malloc
unknown's avatar
unknown committed
825
        field->store((char*) pos,length,read_info.read_charset);
unknown's avatar
unknown committed
826 827 828 829 830 831
	pos[length]=save_chr;
	if ((pos+=length) > read_info.row_end)
	  pos= read_info.row_end;	/* Fills rest with space */
      }
    }
    if (pos != read_info.row_end)
832
    {
unknown's avatar
unknown committed
833
      thd->cuted_fields++;			/* To long row */
834 835
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, 
                          ER_WARN_TOO_MANY_RECORDS, 
836
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
837
    }
838

unknown's avatar
unknown committed
839 840 841 842 843
    if (thd->killed ||
        fill_record_n_invoke_before_triggers(thd, set_fields, set_values,
                                             ignore_check_option_errors,
                                             table->triggers,
                                             TRG_EVENT_INSERT))
unknown's avatar
unknown committed
844 845
      DBUG_RETURN(1);

846 847
    switch (table_list->view_check_option(thd,
                                          ignore_check_option_errors)) {
848 849 850 851 852 853 854
    case VIEW_CHECK_SKIP:
      read_info.next_line();
      goto continue_loop;
    case VIEW_CHECK_ERROR:
      DBUG_RETURN(-1);
    }

855 856 857
    err= write_record(thd, table, &info);
    table->auto_increment_field_not_null= FALSE;
    if (err)
unknown's avatar
unknown committed
858
      DBUG_RETURN(1);
unknown's avatar
unknown committed
859
   
unknown's avatar
unknown committed
860 861 862 863
    /*
      We don't need to reset auto-increment field since we are restoring
      its default value at the beginning of each loop iteration.
    */
unknown's avatar
unknown committed
864
    if (read_info.next_line())			// Skip to next line
unknown's avatar
unknown committed
865 866
      break;
    if (read_info.line_cuted)
867
    {
unknown's avatar
unknown committed
868
      thd->cuted_fields++;			/* To long row */
869 870
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, 
                          ER_WARN_TOO_MANY_RECORDS, 
871
                          ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count); 
872
    }
873
    thd->row_count++;
874
continue_loop:;
unknown's avatar
unknown committed
875 876 877 878 879 880 881
  }
  DBUG_RETURN(test(read_info.error));
}



static int
882
read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
unknown's avatar
unknown committed
883 884
               List<Item> &fields_vars, List<Item> &set_fields,
               List<Item> &set_values, READ_INFO &read_info,
885 886
	       String &enclosed, ulong skip_lines,
	       bool ignore_check_option_errors)
unknown's avatar
unknown committed
887
{
unknown's avatar
unknown committed
888 889
  List_iterator_fast<Item> it(fields_vars);
  Item *item;
890
  TABLE *table= table_list->table;
unknown's avatar
unknown committed
891
  uint enclosed_length;
892 893
  bool err, progress_reports;
  ulonglong counter, time_to_report_progress;
unknown's avatar
unknown committed
894 895 896
  DBUG_ENTER("read_sep_field");

  enclosed_length=enclosed.length();
897

898 899 900 901 902 903
  counter= 0;
  time_to_report_progress= MY_HOW_OFTEN_TO_WRITE/10;
  progress_reports= 1;
  if ((thd->progress.max_counter= read_info.file_length()) == ~(my_off_t) 0)
    progress_reports= 0;

unknown's avatar
unknown committed
904 905 906 907
  for (;;it.rewind())
  {
    if (thd->killed)
    {
unknown's avatar
SCRUM  
unknown committed
908
      thd->send_kill_message();
unknown's avatar
unknown committed
909 910
      DBUG_RETURN(1);
    }
unknown's avatar
unknown committed
911

912 913 914 915 916 917 918 919 920 921
    if (progress_reports)
    {
      thd->progress.counter= read_info.position();
      if (++counter >= time_to_report_progress)
      {
        time_to_report_progress+= MY_HOW_OFTEN_TO_WRITE/10;
        thd_progress_report(thd, thd->progress.counter,
                            thd->progress.max_counter);
      }
    }
unknown's avatar
unknown committed
922 923 924
    restore_record(table, s->default_values);

    while ((item= it++))
unknown's avatar
unknown committed
925 926
    {
      uint length;
927
      uchar *pos;
928
      Item *real_item;
unknown's avatar
unknown committed
929 930 931

      if (read_info.read_field())
	break;
unknown's avatar
unknown committed
932 933 934 935 936

      /* If this line is to be skipped we don't want to fill field or var */
      if (skip_lines)
        continue;

unknown's avatar
unknown committed
937 938 939
      pos=read_info.row_start;
      length=(uint) (read_info.row_end-pos);

940 941
      real_item= item->real_item();

942 943 944
      if ((!read_info.enclosed &&
           (enclosed_length && length == 4 &&
            !memcmp(pos, STRING_WITH_LEN("NULL")))) ||
unknown's avatar
unknown committed
945 946
	  (length == 1 && read_info.found_null))
      {
947
        if (real_item->type() == Item::FIELD_ITEM)
unknown's avatar
unknown committed
948
        {
949
          Field *field= ((Item_field *)real_item)->field;
950 951 952 953 954 955
          if (field->reset())
          {
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
                     thd->row_count);
            DBUG_RETURN(1);
          }
unknown's avatar
unknown committed
956 957 958
          field->set_null();
          if (!field->maybe_null())
          {
959
            if (field->type() == MYSQL_TYPE_TIMESTAMP)
unknown's avatar
unknown committed
960 961
              ((Field_timestamp*) field)->set_time();
            else if (field != table->next_number_field)
962
              field->set_warning(MYSQL_ERROR::WARN_LEVEL_WARN,
unknown's avatar
unknown committed
963 964
                                 ER_WARN_NULL_TO_NOTNULL, 1);
          }
unknown's avatar
unknown committed
965
	}
966 967
        else if (item->type() == Item::STRING_ITEM)
        {
unknown's avatar
unknown committed
968 969
          ((Item_user_var_as_out_param *)item)->set_null_value(
                                                  read_info.read_charset);
970 971 972 973 974 975 976
        }
        else
        {
          my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
          DBUG_RETURN(1);
        }

unknown's avatar
unknown committed
977 978
	continue;
      }
unknown's avatar
unknown committed
979

980
      if (real_item->type() == Item::FIELD_ITEM)
unknown's avatar
unknown committed
981
      {
982
        Field *field= ((Item_field *)real_item)->field;
unknown's avatar
unknown committed
983 984
        field->set_notnull();
        read_info.row_end[0]=0;			// Safe to change end marker
unknown's avatar
unknown committed
985 986
        if (field == table->next_number_field)
          table->auto_increment_field_not_null= TRUE;
unknown's avatar
unknown committed
987 988
        field->store((char*) pos, length, read_info.read_charset);
      }
989 990
      else if (item->type() == Item::STRING_ITEM)
      {
unknown's avatar
unknown committed
991
        ((Item_user_var_as_out_param *)item)->set_value((char*) pos, length,
992 993 994 995 996 997 998
                                                        read_info.read_charset);
      }
      else
      {
        my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
        DBUG_RETURN(1);
      }
unknown's avatar
unknown committed
999
    }
1000 1001 1002 1003

    if (thd->is_error())
      read_info.error= 1;

unknown's avatar
unknown committed
1004 1005
    if (read_info.error)
      break;
1006 1007
    if (skip_lines)
    {
unknown's avatar
unknown committed
1008
      skip_lines--;
1009 1010
      continue;
    }
unknown's avatar
unknown committed
1011 1012 1013 1014
    if (item)
    {
      /* Have not read any field, thus input file is simply ended */
      if (item == fields_vars.head())
unknown's avatar
unknown committed
1015
	break;
unknown's avatar
unknown committed
1016
      for (; item ; item= it++)
unknown's avatar
unknown committed
1017
      {
1018 1019
        Item *real_item= item->real_item();
        if (real_item->type() == Item::FIELD_ITEM)
unknown's avatar
unknown committed
1020
        {
1021
          Field *field= ((Item_field *)real_item)->field;
1022 1023
          if (field->reset())
          {
unknown's avatar
unknown committed
1024
            my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
1025 1026 1027
                     thd->row_count);
            DBUG_RETURN(1);
          }
1028 1029
          if (!field->maybe_null() && field->type() == FIELD_TYPE_TIMESTAMP)
              ((Field_timestamp*) field)->set_time();
unknown's avatar
unknown committed
1030
          /*
1031
            TODO: We probably should not throw warning for each field.
unknown's avatar
unknown committed
1032 1033 1034 1035 1036 1037 1038 1039 1040
            But how about intention to always have the same number
            of warnings in THD::cuted_fields (and get rid of cuted_fields
            in the end ?)
          */
          thd->cuted_fields++;
          push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                              ER_WARN_TOO_FEW_RECORDS,
                              ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
        }
1041 1042
        else if (item->type() == Item::STRING_ITEM)
        {
unknown's avatar
unknown committed
1043 1044
          ((Item_user_var_as_out_param *)item)->set_null_value(
                                                  read_info.read_charset);
1045 1046 1047 1048 1049 1050
        }
        else
        {
          my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
          DBUG_RETURN(1);
        }
unknown's avatar
unknown committed
1051 1052
      }
    }
1053

unknown's avatar
unknown committed
1054 1055 1056 1057 1058
    if (thd->killed ||
        fill_record_n_invoke_before_triggers(thd, set_fields, set_values,
                                             ignore_check_option_errors,
                                             table->triggers,
                                             TRG_EVENT_INSERT))
unknown's avatar
unknown committed
1059 1060
      DBUG_RETURN(1);

1061 1062
    switch (table_list->view_check_option(thd,
                                          ignore_check_option_errors)) {
1063 1064 1065 1066 1067 1068 1069
    case VIEW_CHECK_SKIP:
      read_info.next_line();
      goto continue_loop;
    case VIEW_CHECK_ERROR:
      DBUG_RETURN(-1);
    }

1070 1071 1072
    err= write_record(thd, table, &info);
    table->auto_increment_field_not_null= FALSE;
    if (err)
unknown's avatar
unknown committed
1073
      DBUG_RETURN(1);
unknown's avatar
unknown committed
1074 1075 1076 1077
    /*
      We don't need to reset auto-increment field since we are restoring
      its default value at the beginning of each loop iteration.
    */
unknown's avatar
unknown committed
1078
    if (read_info.next_line())			// Skip to next line
unknown's avatar
unknown committed
1079 1080
      break;
    if (read_info.line_cuted)
1081
    {
unknown's avatar
unknown committed
1082
      thd->cuted_fields++;			/* To long row */
1083
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, 
1084 1085
                          ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS), 
                          thd->row_count);   
unknown's avatar
unknown committed
1086 1087
      if (thd->killed)
        DBUG_RETURN(1);
1088
    }
1089
    thd->row_count++;
1090
continue_loop:;
unknown's avatar
unknown committed
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100
  }
  DBUG_RETURN(test(read_info.error));
}


/* Unescape all escape characters, mark \N as null */

char
READ_INFO::unescape(char chr)
{
unknown's avatar
unknown committed
1101
  /* keep this switch synchornous with the ESCAPE_CHARS macro */
unknown's avatar
unknown committed
1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116
  switch(chr) {
  case 'n': return '\n';
  case 't': return '\t';
  case 'r': return '\r';
  case 'b': return '\b';
  case '0': return 0;				// Ascii null
  case 'Z': return '\032';			// Win32 end of file
  case 'N': found_null=1;

    /* fall through */
  default:  return chr;
  }
}


1117 1118 1119 1120
/*
  Read a line using buffering.
  If the last line is empty (in line mode) it is not output.
*/
unknown's avatar
unknown committed
1121 1122


unknown's avatar
unknown committed
1123 1124
/*
  Set up a reader over 'file_par' (or over the network connection when
  get_it_from_net is set).

  Copies the field/line terminator, line prefix and enclosing settings,
  allocates the push-back stack used by GET/PUSH, allocates the row buffer
  of tot_length + 1 bytes and initializes the IO_CACHE.

  On failure to allocate the row buffer or to initialize the cache, 'error'
  is set and 'buffer' is left as 0; callers must check 'error'.
*/
READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs,
                     String &field_term, String &line_start, String &line_term,
                     String &enclosed_par, int escape, bool get_it_from_net,
                     bool is_fifo)
  :file(file_par), buff_length(tot_length), escape_char(escape),
   found_end_of_line(false), eof(false), need_end_io_cache(false),
   error(false), line_cuted(false), found_null(false), read_charset(cs)
{
  field_term_ptr=(char*) field_term.ptr();
  field_term_length= field_term.length();
  line_term_ptr=(char*) line_term.ptr();
  line_term_length= line_term.length();
  if (line_start.length() == 0)
  {
    line_start_ptr=0;
    line_start_end=0;                           // Never left indeterminate
    start_of_line= 0;
  }
  else
  {
    line_start_ptr=(char*) line_start.ptr();
    line_start_end=line_start_ptr+line_start.length();
    start_of_line= 1;
  }
  /* If field_terminator == line_terminator, don't use line_terminator */
  if (field_term_length == line_term_length &&
      !memcmp(field_term_ptr,line_term_ptr,field_term_length))
  {
    line_term_length=0;
    line_term_ptr=(char*) "";
  }
  /* INT_MAX stands for "no such character": it never equals a read byte */
  enclosed_char= (enclosed_length=enclosed_par.length()) ?
    (uchar) enclosed_par[0] : INT_MAX;
  field_term_char= field_term_length ? (uchar) field_term_ptr[0] : INT_MAX;
  line_term_char= line_term_length ? (uchar) line_term_ptr[0] : INT_MAX;

  /* Set of a stack for unget if long terminators */
  uint length= max(cs->mbmaxlen, max(field_term_length, line_term_length)) + 1;
  set_if_bigger(length,line_start.length());
  stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);

  if (!(buffer=(uchar*) my_malloc(buff_length+1,MYF(0))))
    error=1; /* purecov: inspected */
  else
  {
    end_of_buff=buffer+buff_length;
    if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
                      (get_it_from_net) ? READ_NET :
                      (is_fifo ? READ_FIFO : READ_CACHE),0L,1,
                      MYF(MY_WME)))
    {
      my_free((uchar*) buffer,MYF(0)); /* purecov: inspected */
      buffer= 0;
      error=1;
    }
    else
    {
      /* The cache is live from here on; the destructor must end it */
      need_end_io_cache = 1;

#ifndef EMBEDDED_LIBRARY
      /*
        init_io_cache() will not initialize read_function member
        if the cache is READ_NET. So we work around the problem with a
        manual assignment
      */
      if (get_it_from_net)
        cache.read_function = _my_b_net_read;

      if (mysql_bin_log.is_open())
        cache.pre_read = cache.pre_close =
          (IO_CACHE_CALLBACK) log_loaded_block;
#endif
    }
  }
}


/*
  Release the reader: end the IO_CACHE if the constructor initialized it
  and free the row buffer (which may be 0 after a failed construction).
*/
READ_INFO::~READ_INFO()
{
  if (need_end_io_cache)
    ::end_io_cache(&cache);
  my_free(buffer, MYF(MY_ALLOW_ZERO_PTR));
}


/*
  Character source used by the READ_INFO parsers.
  GET returns the most recently pushed-back value if the push-back stack is
  not empty, otherwise the next character from the IO_CACHE.
  PUSH(A) stores A on the push-back stack so that a later GET returns it.
*/
#define GET (stack_pos != stack ? *--stack_pos : my_b_get(&cache))
#define PUSH(A) *(stack_pos++)=(A)


/*
  Check whether the input continues with the rest of a terminator string.

  The caller has already read and matched the first byte of the terminator;
  this function reads up to length-1 further characters and compares them
  with ptr[1..length-1].

  The comparison is done on unsigned byte values: GET yields 0..255 for
  data bytes, so comparing against a plain (possibly signed) char would
  never match terminator bytes >= 0x80.

  RETURN
    1  the whole terminator matched (its bytes are consumed)
    0  mismatch; every character read here is pushed back so that
       following GETs return them again in their original order
*/
inline int READ_INFO::terminator(char *ptr,uint length)
{
  int chr=0;					// Keep gcc happy
  uint i;
  for (i=1 ; i < length ; i++)
  {
    if ((chr=GET) != (uchar) *++ptr)
    {
      break;
    }
  }
  if (i == length)
    return 1;
  /* Push back the mismatching character first, then the matched prefix */
  PUSH(chr);
  while (i-- > 1)
    PUSH((uchar) *--ptr);
  return 0;
}


int READ_INFO::read_field()
{
  int chr,found_enclosed_char;
1234
  uchar *to,*new_buffer;
unknown's avatar
unknown committed
1235 1236 1237 1238 1239

  found_null=0;
  if (found_end_of_line)
    return 1;					// One have to call next_line

unknown's avatar
unknown committed
1240
  /* Skip until we find 'line_start' */
unknown's avatar
unknown committed
1241 1242

  if (start_of_line)
unknown's avatar
unknown committed
1243
  {						// Skip until line_start
unknown's avatar
unknown committed
1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256
    start_of_line=0;
    if (find_start_of_fields())
      return 1;
  }
  if ((chr=GET) == my_b_EOF)
  {
    found_end_of_line=eof=1;
    return 1;
  }
  to=buffer;
  if (chr == enclosed_char)
  {
    found_enclosed_char=enclosed_char;
1257
    *to++=(uchar) chr;				// If error
unknown's avatar
unknown committed
1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275
  }
  else
  {
    found_enclosed_char= INT_MAX;
    PUSH(chr);
  }

  for (;;)
  {
    while ( to < end_of_buff)
    {
      chr = GET;
      if (chr == my_b_EOF)
	goto found_eof;
      if (chr == escape_char)
      {
	if ((chr=GET) == my_b_EOF)
	{
1276
	  *to++= (uchar) escape_char;
unknown's avatar
unknown committed
1277 1278
	  goto found_eof;
	}
1279 1280 1281 1282 1283 1284 1285 1286 1287
        /*
          When escape_char == enclosed_char, we treat it like we do for
          handling quotes in SQL parsing -- you can double-up the
          escape_char to include it literally, but it doesn't do escapes
          like \n. This allows: LOAD DATA ... ENCLOSED BY '"' ESCAPED BY '"'
          with data like: "fie""ld1", "field2"
         */
        if (escape_char != enclosed_char || chr == escape_char)
        {
1288
          *to++ = (uchar) unescape((char) chr);
1289 1290
          continue;
        }
1291 1292
        PUSH(chr);
        chr= escape_char;
unknown's avatar
unknown committed
1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312
      }
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
      if (chr == line_term_char)
#else
      if (chr == line_term_char && found_enclosed_char == INT_MAX)
#endif
      {
	if (terminator(line_term_ptr,line_term_length))
	{					// Maybe unexpected linefeed
	  enclosed=0;
	  found_end_of_line=1;
	  row_start=buffer;
	  row_end=  to;
	  return 0;
	}
      }
      if (chr == found_enclosed_char)
      {
	if ((chr=GET) == found_enclosed_char)
	{					// Remove dupplicated
1313
	  *to++ = (uchar) chr;
unknown's avatar
unknown committed
1314 1315 1316 1317
	  continue;
	}
	// End of enclosed field if followed by field_term or line_term
	if (chr == my_b_EOF ||
1318 1319 1320 1321
	    (chr == line_term_char && terminator(line_term_ptr,
                                                 line_term_length)))
        {
          /* Maybe unexpected linefeed */
unknown's avatar
unknown committed
1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335
	  enclosed=1;
	  found_end_of_line=1;
	  row_start=buffer+1;
	  row_end=  to;
	  return 0;
	}
	if (chr == field_term_char &&
	    terminator(field_term_ptr,field_term_length))
	{
	  enclosed=1;
	  row_start=buffer+1;
	  row_end=  to;
	  return 0;
	}
1336 1337 1338 1339
	/*
	  The string didn't terminate yet.
	  Store back next character for the loop
	*/
unknown's avatar
unknown committed
1340
	PUSH(chr);
1341 1342
	/* copy the found term character to 'to' */
	chr= found_enclosed_char;
unknown's avatar
unknown committed
1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353
      }
      else if (chr == field_term_char && found_enclosed_char == INT_MAX)
      {
	if (terminator(field_term_ptr,field_term_length))
	{
	  enclosed=0;
	  row_start=buffer;
	  row_end=  to;
	  return 0;
	}
      }
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386
#ifdef USE_MB
      if (my_mbcharlen(read_charset, chr) > 1 &&
          to + my_mbcharlen(read_charset, chr) <= end_of_buff)
      {
        uchar* p= (uchar*) to;
        int ml, i;
        *to++ = chr;

        ml= my_mbcharlen(read_charset, chr);

        for (i= 1; i < ml; i++) 
        {
          chr= GET;
          if (chr == my_b_EOF)
          {
            /*
             Need to back up the bytes already ready from illformed
             multi-byte char 
            */
            to-= i;
            goto found_eof;
          }
          *to++ = chr;
        }
        if (my_ismbchar(read_charset,
                        (const char *)p,
                        (const char *)to))
          continue;
        for (i= 0; i < ml; i++)
          PUSH((uchar) *--to);
        chr= GET;
      }
#endif
1387
      *to++ = (uchar) chr;
unknown's avatar
unknown committed
1388 1389 1390 1391
    }
    /*
    ** We come here if buffer is too small. Enlarge it and continue
    */
1392
    if (!(new_buffer=(uchar*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
unknown's avatar
unknown committed
1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409
					MYF(MY_WME))))
      return (error=1);
    to=new_buffer + (to-buffer);
    buffer=new_buffer;
    buff_length+=IO_SIZE;
    end_of_buff=buffer+buff_length;
  }

found_eof:
  enclosed=0;
  found_end_of_line=eof=1;
  row_start=buffer;
  row_end=to;
  return 0;
}

/*
1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421
  Read a row with fixed length.

  NOTES
    The row may not be fixed size on disk if there are escape
    characters in the file.

  IMPLEMENTATION NOTE
    One can't use fixed length with multi-byte charset **

  RETURN
    0  ok
    1  error
unknown's avatar
unknown committed
1422
*/
unknown's avatar
unknown committed
1423

unknown's avatar
unknown committed
1424 1425 1426
int READ_INFO::read_fixed_length()
{
  int chr;
1427
  uchar *to;
unknown's avatar
unknown committed
1428 1429 1430 1431
  if (found_end_of_line)
    return 1;					// One have to call next_line

  if (start_of_line)
unknown's avatar
unknown committed
1432
  {						// Skip until line_start
unknown's avatar
unknown committed
1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446
    start_of_line=0;
    if (find_start_of_fields())
      return 1;
  }

  to=row_start=buffer;
  while (to < end_of_buff)
  {
    if ((chr=GET) == my_b_EOF)
      goto found_eof;
    if (chr == escape_char)
    {
      if ((chr=GET) == my_b_EOF)
      {
1447
	*to++= (uchar) escape_char;
unknown's avatar
unknown committed
1448 1449
	goto found_eof;
      }
1450
      *to++ =(uchar) unescape((char) chr);
unknown's avatar
unknown committed
1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461
      continue;
    }
    if (chr == line_term_char)
    {
      if (terminator(line_term_ptr,line_term_length))
      {						// Maybe unexpected linefeed
	found_end_of_line=1;
	row_end=  to;
	return 0;
      }
    }
1462
    *to++ = (uchar) chr;
unknown's avatar
unknown committed
1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490
  }
  row_end=to;					// Found full line
  return 0;

found_eof:
  found_end_of_line=eof=1;
  row_start=buffer;
  row_end=to;
  return to == buffer ? 1 : 0;
}


/*
  Advance the input to the start of the next line.

  If the previous read already consumed the line terminator (or hit end of
  file) only the state flags are reset. Otherwise the rest of the current
  line is skipped; line_cuted is set when any character had to be
  discarded on the way.

  With USE_MB, a multi-byte character is skipped as a whole. Its length is
  taken once from the lead byte, and my_mbcharlen() is never called with
  my_b_EOF.

  RETURN
    0  positioned at the next line (or there is no line terminator)
    1  end of file
*/
int READ_INFO::next_line()
{
  line_cuted=0;
  start_of_line= line_start_ptr != 0;
  if (found_end_of_line || eof)
  {
    found_end_of_line=0;
    return eof;
  }
  found_end_of_line=0;
  if (!line_term_length)
    return 0;					// No lines
  for (;;)
  {
    int chr = GET;
#ifdef USE_MB
    if (chr != my_b_EOF)
    {
      const uint mb_len= my_mbcharlen(read_charset, chr);
      if (mb_len > 1)
      {
        /* Consume the continuation bytes; afterwards chr is the last one */
        for (uint i=1; chr != my_b_EOF && i < mb_len; i++)
          chr = GET;
        /* A trailing byte equal to the escape character is not an escape */
        if (chr == escape_char)
          continue;
      }
    }
#endif
    if (chr == my_b_EOF)
    {
      eof=1;
      return 1;
    }
    if (chr == escape_char)
    {
      line_cuted=1;
      if (GET == my_b_EOF)
        return 1;
      continue;
    }
    if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
      return 0;
    line_cuted=1;
  }
}


bool READ_INFO::find_start_of_fields()
{
  int chr;
 try_again:
  do
  {
    if ((chr=GET) == my_b_EOF)
    {
      found_end_of_line=eof=1;
      return 1;
    }
  } while ((char) chr != line_start_ptr[0]);
  for (char *ptr=line_start_ptr+1 ; ptr != line_start_end ; ptr++)
  {
    chr=GET;					// Eof will be checked later
    if ((char) chr != *ptr)
    {						// Can't be line_start
      PUSH(chr);
      while (--ptr != line_start_ptr)
1539
      {						// Restart with next char
unknown's avatar
unknown committed
1540 1541 1542 1543 1544 1545 1546
	PUSH((uchar) *ptr);
      }
      goto try_again;
    }
  }
  return 0;
}