ha_tina.cc 43.2 KB
Newer Older
1 2 3 4
/* Copyright (C) 2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
5
  the Free Software Foundation; version 2 of the License.
6 7 8 9 10 11 12 13 14 15 16 17 18

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*
  Make sure to look at ha_tina.h for more details.

19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
  First off, this is a play thing for me, there are a number of things
  wrong with it:
    *) It was designed for csv and therefore its performance is highly
       questionable.
    *) Indexes have not been implemented. This is because the files can
       be traded in and out of the table directory without having to worry
       about rebuilding anything.
    *) NULLs and "" are treated equally (like a spreadsheet).
    *) There was in the beginning no point to anyone seeing this other
       than me, so there is a good chance that I haven't quite documented
       it well.
    *) Less design, more "make it work"

  Now there are a few cool things with it:
    *) Errors can result in corrupted data files.
    *) Data files can be read by spreadsheets directly.
35 36 37 38 39 40 41 42 43

TODO:
 *) Move to a block system for larger files
 *) Error recovery, it's all there, just need to finish it
 *) Document how the chains work.

 -Brian
*/

unknown's avatar
unknown committed
44
#ifdef USE_PRAGMA_IMPLEMENTATION
45 46 47
#pragma implementation        // gcc: Class implementation
#endif

48 49
#include "mysql_priv.h"
#include <mysql/plugin.h>
50 51
#include "ha_tina.h"

unknown's avatar
unknown committed
52

53 54 55 56 57 58 59 60 61
/*
  Size in bytes of the meta file record:
  uchar + uchar + ulonglong + ulonglong + ulonglong + ulonglong + uchar

  The expansion is parenthesized so that the macro behaves as a single
  value inside larger expressions (e.g. 2 * META_BUFFER_SIZE).
*/
#define META_BUFFER_SIZE (sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \
  + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(uchar))
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption

/* The file extension */
#define CSV_EXT ".CSV"               // The data file
62
#define CSN_EXT ".CSN"               // Files used during repair and update
63 64 65
#define CSM_EXT ".CSM"               // Meta file


66 67 68 69
static TINA_SHARE *get_share(const char *table_name, TABLE *table);
static int free_share(TINA_SHARE *share);
static int read_meta_file(File meta_file, ha_rows *rows);
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
70

71 72 73 74
extern "C" void tina_get_status(void* param, int concurrent_insert);
extern "C" void tina_update_status(void* param);
extern "C" my_bool tina_check_status(void* param);

75 76 77
/* Stuff for shares */
pthread_mutex_t tina_mutex;
static HASH tina_open_tables;
78 79 80
static handler *tina_create_handler(handlerton *hton,
                                    TABLE_SHARE *table, 
                                    MEM_ROOT *mem_root);
81

82 83 84 85 86

/*****************************************************************************
 ** TINA tables
 *****************************************************************************/

87
/*
  Comparison callback used for sorting chains with qsort().

  Intervals are assumed not to intersect, so comparing any two points is
  enough; the start of each interval is used.

  RETURN
    1   a starts after b
    -1  a starts before b
    0   both start at the same offset
*/
int sort_set (tina_set *a, tina_set *b)
{
  if (a->begin > b->begin)
    return 1;
  if (a->begin < b->begin)
    return -1;
  return 0;
}

99
/*
  Key extraction callback for the hash of open tables: the key of a
  share is its table name, and *length receives the name length.
*/
static uchar* tina_get_key(TINA_SHARE *share, size_t *length,
                          my_bool not_used __attribute__((unused)))
{
  uchar *key= (uchar*) share->table_name;

  *length= share->table_name_length;
  return key;
}

106
static int tina_init_func(void *p)
unknown's avatar
unknown committed
107
{
108 109
  handlerton *tina_hton;

110 111 112 113 114 115 116 117 118
  tina_hton= (handlerton *)p;
  VOID(pthread_mutex_init(&tina_mutex,MY_MUTEX_INIT_FAST));
  (void) hash_init(&tina_open_tables,system_charset_info,32,0,0,
                   (hash_get_key) tina_get_key,0,0);
  tina_hton->state= SHOW_OPTION_YES;
  tina_hton->db_type= DB_TYPE_CSV_DB;
  tina_hton->create= tina_create_handler;
  tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES | 
                     HTON_NO_PARTITION);
unknown's avatar
unknown committed
119 120 121
  return 0;
}

122
static int tina_done_func(void *p)
unknown's avatar
unknown committed
123
{
124 125 126
  hash_free(&tina_open_tables);
  pthread_mutex_destroy(&tina_mutex);

unknown's avatar
unknown committed
127 128
  return 0;
}
129

unknown's avatar
unknown committed
130

131 132 133 134 135 136
/*
  Simple lock controls.

  Find the share of a table in the hash of open tables, creating and
  registering a new one if it is not there yet, and take a reference
  on it.

  SYNOPSIS
    get_share()
    table_name  Name used both as the hash key and as the base for the
                data (.CSV) and meta (.CSM) file names.
    table       Not used by this function.

  DESCRIPTION
    A new share is marked as crashed when the meta file cannot be opened
    or when read_meta_file() reports an error. The meta file is only
    read if it was opened successfully.

  RETURN
    Pointer to the share with its use count incremented, or NULL if the
    share could not be allocated, the data file could not be stat'ed, or
    the share could not be inserted into the hash.
*/
static TINA_SHARE *get_share(const char *table_name, TABLE *table)
{
  TINA_SHARE *share;
  char meta_file_name[FN_REFLEN];
  MY_STAT file_stat;                /* Stat information for the data file */
  char *tmp_name;
  uint length;

  pthread_mutex_lock(&tina_mutex);
  length=(uint) strlen(table_name);

  /*
    If share is not present in the hash, create a new share and
    initialize its members.
  */
  if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
                                        (uchar*) table_name,
                                       length)))
  {
    /* The share and the copy of its name live in one allocation */
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                         &share, sizeof(*share),
                         &tmp_name, length+1,
                         NullS))
    {
      pthread_mutex_unlock(&tina_mutex);
      return NULL;
    }

    share->use_count= 0;
    share->is_log_table= FALSE;
    share->table_name_length= length;
    share->table_name= tmp_name;
    share->crashed= FALSE;
    share->rows_recorded= 0;
    share->update_file_opened= FALSE;
    share->tina_write_opened= FALSE;
    share->data_file_version= 0;
    strmov(share->table_name, table_name);
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
    fn_format(meta_file_name, table_name, "", CSM_EXT,
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);

    if (my_stat(share->data_file_name, &file_stat, MYF(MY_WME)) == NULL)
      goto error;
    share->saved_data_file_length= file_stat.st_size;

    if (my_hash_insert(&tina_open_tables, (uchar*) share))
      goto error;
    thr_lock_init(&share->lock);
    pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);

    /*
      Open or create the meta file. In the latter case, we'll get
      an error during read_meta_file and mark the table as crashed.
      Usually this will result in auto-repair, and we will get a good
      meta-file in the end.

      If the meta file will not open we assume it is crashed and mark
      it as such, without trying to read from the invalid descriptor.
    */
    if (((share->meta_file= my_open(meta_file_name,
                                    O_RDWR|O_CREAT, MYF(0))) == -1) ||
        read_meta_file(share->meta_file, &share->rows_recorded))
      share->crashed= TRUE;
  }
  share->use_count++;
  pthread_mutex_unlock(&tina_mutex);

  return share;

error:
  /* The share never made it into the hash, so it only has to be freed */
  pthread_mutex_unlock(&tina_mutex);
  my_free((uchar*) share, MYF(0));

  return NULL;
}


216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
/*
  Read CSV meta-file

  SYNOPSIS
    read_meta_file()
    meta_file   The meta-file filedes
    ha_rows     Pointer to the var we use to store rows count.
                These are read from the meta-file.

  DESCRIPTION

    Read the meta-file info. For now we are only interested in
    rows counf, crashed bit and magic number.

  RETURN
    0 - OK
232
    non-zero - error occurred
233 234
*/

235
static int read_meta_file(File meta_file, ha_rows *rows)
236 237 238 239 240 241 242
{
  uchar meta_buffer[META_BUFFER_SIZE];
  uchar *ptr= meta_buffer;

  DBUG_ENTER("ha_tina::read_meta_file");

  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
243
  if (my_read(meta_file, (uchar*)meta_buffer, META_BUFFER_SIZE, 0)
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
      != META_BUFFER_SIZE)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /*
    Parse out the meta data, we ignore version at the moment
  */

  ptr+= sizeof(uchar)*2; // Move past header
  *rows= (ha_rows)uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past rows
  /*
    Move past check_point, auto_increment and forced_flushes fields.
    They are present in the format, but we do not use them yet.
  */
  ptr+= 3*sizeof(ulonglong);

  /* check crashed bit and magic number */
  if ((meta_buffer[0] != (uchar)TINA_CHECK_HEADER) ||
      ((bool)(*ptr)== TRUE))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  my_sync(meta_file, MYF(MY_WME));

  DBUG_RETURN(0);
}


/*
  Write CSV meta-file

  SYNOPSIS
    write_meta_file()
    meta_file   The meta-file filedes
    ha_rows     The number of rows we have in the datafile.
    dirty       A flag, which marks whether we have a corrupt table

  DESCRIPTION

    Write meta-info the the file. Only rows count, crashed bit and
    magic number matter now.

  RETURN
    0 - OK
287
    non-zero - error occurred
288 289
*/

290
static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
{
  uchar meta_buffer[META_BUFFER_SIZE];
  uchar *ptr= meta_buffer;

  DBUG_ENTER("ha_tina::write_meta_file");

  *ptr= (uchar)TINA_CHECK_HEADER;
  ptr+= sizeof(uchar);
  *ptr= (uchar)TINA_VERSION;
  ptr+= sizeof(uchar);
  int8store(ptr, (ulonglong)rows);
  ptr+= sizeof(ulonglong);
  memset(ptr, 0, 3*sizeof(ulonglong));
  /*
     Skip over checkpoint, autoincrement and forced_flushes fields.
     We'll need them later.
  */
  ptr+= 3*sizeof(ulonglong);
  *ptr= (uchar)dirty;

  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
312
  if (my_write(meta_file, (uchar *)meta_buffer, META_BUFFER_SIZE, 0)
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
      != META_BUFFER_SIZE)
    DBUG_RETURN(-1);

  my_sync(meta_file, MYF(MY_WME));

  DBUG_RETURN(0);
}

/*
  Run repair() with freshly initialized check options and return its
  result converted to bool (true means repair() returned non-zero).
*/
bool ha_tina::check_and_repair(THD *thd)
{
  HA_CHECK_OPT repair_opt;
  DBUG_ENTER("ha_tina::check_and_repair");

  repair_opt.init();

  DBUG_RETURN(repair(thd, &repair_opt));
}


332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
/*
  Open the shared descriptor used for appending rows to the data file.

  RETURN
    0 - OK, share->tina_write_opened is set
    1 - the data file could not be opened; the share is marked crashed
*/
int ha_tina::init_tina_writer()
{
  DBUG_ENTER("ha_tina::init_tina_writer");

  /*
    Mark the file as crashed. We will set the flag back when we close
    the file. In the case of the crash it will remain marked crashed,
    which enforce recovery.
  */
  (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);

  share->tina_write_filedes= my_open(share->data_file_name,
                                     O_RDWR|O_APPEND, MYF(0));
  if (share->tina_write_filedes != -1)
  {
    share->tina_write_opened= TRUE;
    DBUG_RETURN(0);
  }

  DBUG_PRINT("info", ("Could not open tina file writes"));
  share->crashed= TRUE;
  DBUG_RETURN(1);
}


356 357 358 359 360 361
/*
  Report the crashed flag of the share this handler is attached to.
*/
bool ha_tina::is_crashed() const
{
  DBUG_ENTER("ha_tina::is_crashed");
  const bool crashed= share->crashed;
  DBUG_RETURN(crashed);
}

362
/*
  Free lock controls.

  Drop one reference to the share. When the last reference goes away
  the meta file is rewritten and closed, the writer descriptor is closed
  if it was opened, and the share is removed from the hash and freed.

  RETURN
    0 - OK
    1 - closing the meta file or the writer descriptor failed
*/
static int free_share(TINA_SHARE *share)
{
  int result_code= 0;
  DBUG_ENTER("ha_tina::free_share");

  pthread_mutex_lock(&tina_mutex);
  if (--share->use_count == 0)
  {
    /* Write the meta file. Mark it as crashed if needed. */
    (void)write_meta_file(share->meta_file, share->rows_recorded,
                          share->crashed ? TRUE :FALSE);
    if (my_close(share->meta_file, MYF(0)))
      result_code= 1;

    if (share->tina_write_opened)
    {
      if (my_close(share->tina_write_filedes, MYF(0)))
        result_code= 1;
      share->tina_write_opened= FALSE;
    }

    /* Unregister first, then tear down the share itself */
    hash_delete(&tina_open_tables, (uchar*) share);
    thr_lock_delete(&share->lock);
    pthread_mutex_destroy(&share->mutex);
    my_free((uchar*) share, MYF(0));
  }
  pthread_mutex_unlock(&tina_mutex);

  DBUG_RETURN(result_code);
}

393

394
/*
395 396 397 398 399 400 401
  This function finds the end of a line and returns the length
  of the line ending.

  We support three kinds of line endings:
  '\r'     --  Old Mac OS line ending
  '\n'     --  Traditional Unix and Mac OS X line ending
  '\r''\n' --  DOS\Windows line ending
402
*/
403

404 405
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin,
                     off_t end, int *eoln_len)
406
{
unknown's avatar
unknown committed
407
  *eoln_len= 0;
unknown's avatar
unknown committed
408

409
  for (off_t x= begin; x < end; x++)
410 411
  {
    /* Unix (includes Mac OS X) */
412
    if (data_buff->get_value(x) == '\n')
413
      *eoln_len= 1;
unknown's avatar
unknown committed
414
    else
unknown's avatar
unknown committed
415
      if (data_buff->get_value(x) == '\r') // Mac or Dos
416
      {
unknown's avatar
unknown committed
417 418 419 420 421
        /* old Mac line ending */
        if (x + 1 == end || (data_buff->get_value(x + 1) != '\n'))
          *eoln_len= 1;
        else // DOS style ending
          *eoln_len= 2;
422
      }
unknown's avatar
unknown committed
423 424 425

    if (*eoln_len)  // end of line was found
      return x;
426
  }
427 428 429 430

  return 0;
}

431

432 433 434
/*
  Handler factory installed in the handlerton: allocate a new ha_tina
  on the given memory root.
*/
static handler *tina_create_handler(handlerton *hton,
                                    TABLE_SHARE *table,
                                    MEM_ROOT *mem_root)
{
  handler *new_handler= new (mem_root) ha_tina(hton, table);
  return new_handler;
}


440 441
/*
  Construct a handler with zeroed positions and a chain of the default
  length, then point the row buffer and the chain at the storage that is
  embedded in the object and allocate the file reader.
*/
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg),
  /*
    These definitions are found in handler.h
    They are not probably completely right.
  */
  current_position(0),
  next_position(0),
  local_saved_data_file_length(0),
  file_buff(0),
  chain_alloced(0),
  chain_size(DEFAULT_CHAIN_LENGTH),
  local_data_file_version(0),
  records_is_known(0)
{
  /* Set our original buffers from pre-allocated memory */
  chain= chain_buffer;
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
  file_buff= new Transparent_file();
}

456

457 458 459
/*
  Encode a buffer into the quoted format.
*/
unknown's avatar
unknown committed
460

461
int ha_tina::encode_quote(uchar *buf)
462 463
{
  char attribute_buffer[1024];
464 465
  String attribute(attribute_buffer, sizeof(attribute_buffer),
                   &my_charset_bin);
466

unknown's avatar
unknown committed
467
  my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
468
  buffer.length(0);
469

470 471 472 473
  for (Field **field=table->field ; *field ; field++)
  {
    const char *ptr;
    const char *end_ptr;
unknown's avatar
unknown committed
474
    const bool was_null= (*field)->is_null();
475

476
    /*
unknown's avatar
unknown committed
477 478 479 480
      CSV does not support nulls. ::create() prevents creation of a table
      with nullable columns so if we encounter them here, there is a bug.
      This may only occur if the frm was created by an older version of
      mysqld which permitted table creation with nullable columns.
481
    */
unknown's avatar
unknown committed
482 483 484 485 486 487 488
    DBUG_ASSERT(!(*field)->maybe_null());
    
    /*
      assistance for backwards compatibility in production builds.
      note: this will not work for ENUM columns.
    */
    if (was_null)
unknown's avatar
unknown committed
489
    {
unknown's avatar
unknown committed
490 491
      (*field)->set_default();
      (*field)->set_notnull();
unknown's avatar
unknown committed
492
    }
493 494

    (*field)->val_str(&attribute,&attribute);
unknown's avatar
unknown committed
495 496 497
    
    if (was_null)
      (*field)->set_null();
498 499

    if ((*field)->str_needs_quotes())
500 501 502
    {
      ptr= attribute.ptr();
      end_ptr= attribute.length() + ptr;
503

504
      buffer.append('"');
505

506
      while (ptr < end_ptr) 
507
      {
508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533
        if (*ptr == '"')
        {
          buffer.append('\\');
          buffer.append('"');
          *ptr++;
        }
        else if (*ptr == '\r')
        {
          buffer.append('\\');
          buffer.append('r');
          *ptr++;
        }
        else if (*ptr == '\\')
        {
          buffer.append('\\');
          buffer.append('\\');
          *ptr++;
        }
        else if (*ptr == '\n')
        {
          buffer.append('\\');
          buffer.append('n');
          *ptr++;
        }
        else
          buffer.append(*ptr++);
534
      }
535 536 537 538 539
      buffer.append('"');
    }
    else
    {
      buffer.append(attribute);
540
    }
541

542 543 544 545 546
    buffer.append(',');
  }
  // Remove the comma, add a line feed
  buffer.length(buffer.length() - 1);
  buffer.append('\n');
547

548 549
  //buffer.replace(buffer.length(), 0, "\n", 1);

unknown's avatar
unknown committed
550
  dbug_tmp_restore_column_map(table->read_set, org_bitmap);
551 552 553 554
  return (buffer.length());
}

/*
555
  chain_append() adds delete positions to the chain that we use to keep
556
  track of space. Then the chain will be used to cleanup "holes", occurred
557
  due to deletes and updates.
558 559 560 561 562
*/
int ha_tina::chain_append()
{
  if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
    (chain_ptr -1)->end= next_position;
563
  else
564 565 566 567 568 569 570 571 572
  {
    /* We set up for the next position */
    if ((off_t)(chain_ptr - chain) == (chain_size -1))
    {
      off_t location= chain_ptr - chain;
      chain_size += DEFAULT_CHAIN_LENGTH;
      if (chain_alloced)
      {
        /* Must cast since my_malloc unlike malloc doesn't have a void ptr */
573
        if ((chain= (tina_set *) my_realloc((uchar*)chain,
574
                                            chain_size, MYF(MY_WME))) == NULL)
575 576 577 578
          return -1;
      }
      else
      {
579 580
        tina_set *ptr= (tina_set *) my_malloc(chain_size * sizeof(tina_set),
                                              MYF(MY_WME));
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
        memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
        chain= ptr;
        chain_alloced++;
      }
      chain_ptr= chain + location;
    }
    chain_ptr->begin= current_position;
    chain_ptr->end= next_position;
    chain_ptr++;
  }

  return 0;
}


596
/*
597 598
  Scans for a row.
*/
599
int ha_tina::find_current_row(uchar *buf)
600
{
601
  off_t end_offset, curr_offset= current_position;
unknown's avatar
unknown committed
602
  int eoln_len;
603
  my_bitmap_map *org_bitmap;
unknown's avatar
unknown committed
604
  int error;
605
  bool read_all;
606 607
  DBUG_ENTER("ha_tina::find_current_row");

608 609 610 611
  /*
    We do not read further then local_saved_data_file_length in order
    not to conflict with undergoing concurrent insert.
  */
612 613 614
  if ((end_offset=
        find_eoln_buff(file_buff, current_position,
                       local_saved_data_file_length, &eoln_len)) == 0)
615 616
    DBUG_RETURN(HA_ERR_END_OF_FILE);

617 618
  /* We must read all columns in case a table is opened for update */
  read_all= !bitmap_is_clear_all(table->write_set);
619 620
  /* Avoid asserts in ::store() for columns that are not going to be updated */
  org_bitmap= dbug_tmp_use_all_columns(table, table->write_set);
unknown's avatar
unknown committed
621 622 623
  error= HA_ERR_CRASHED_ON_USAGE;

  memset(buf, 0, table->s->null_bytes);
624

625 626
  for (Field **field=table->field ; *field ; field++)
  {
627 628
    char curr_char;
    
629
    buffer.length(0);
630 631 632 633
    if (curr_offset >= end_offset)
      goto err;
    curr_char= file_buff->get_value(curr_offset);
    if (curr_char == '"')
634
    {
635 636
      curr_offset++; // Incrementpast the first quote

637
      for(; curr_offset < end_offset; curr_offset++)
638
      {
639
        curr_char= file_buff->get_value(curr_offset);
640
        // Need to convert line feeds!
641 642 643
        if (curr_char == '"' &&
            (curr_offset == end_offset - 1 ||
             file_buff->get_value(curr_offset + 1) == ','))
644
        {
645 646 647
          curr_offset+= 2; // Move past the , and the "
          break;
        }
648
        if (curr_char == '\\' && curr_offset != (end_offset - 1))
649 650
        {
          curr_offset++;
651 652
          curr_char= file_buff->get_value(curr_offset);
          if (curr_char == 'r')
653
            buffer.append('\r');
654
          else if (curr_char == 'n' )
655
            buffer.append('\n');
656 657
          else if (curr_char == '\\' || curr_char == '"')
            buffer.append(curr_char);
658 659 660
          else  /* This could only happed with an externally created file */
          {
            buffer.append('\\');
661
            buffer.append(curr_char);
662 663 664 665 666 667 668 669 670 671
          }
        }
        else // ordinary symbol
        {
          /*
            We are at final symbol and no last quote was found =>
            we are working with a damaged file.
          */
          if (curr_offset == end_offset - 1)
            goto err;
672
          buffer.append(curr_char);
673
        }
674
      }
675
    }
676
    else 
677
    {
678
      for(; curr_offset < end_offset; curr_offset++)
679
      {
680 681
        curr_char= file_buff->get_value(curr_offset);
        if (curr_char == ',')
682
        {
683
          curr_offset++;       // Skip the ,
684 685
          break;
        }
686
        buffer.append(curr_char);
687
      }
688
    }
689

690
    if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
691
    {
692 693
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
                          CHECK_FIELD_WARN))
694 695
        goto err;
    }
696
  }
697
  next_position= end_offset + eoln_len;
unknown's avatar
unknown committed
698 699 700
  error= 0;

err:
unknown's avatar
unknown committed
701
  dbug_tmp_restore_column_map(table->write_set, org_bitmap);
702

unknown's avatar
unknown committed
703
  DBUG_RETURN(error);
704 705 706 707 708 709
}

/*
  If frm_error() is called in table.cc this is called to find out what file
  extensions exist for this handler.
*/
unknown's avatar
unknown committed
710
static const char *ha_tina_exts[] = {
711 712
  CSV_EXT,
  CSM_EXT,
unknown's avatar
unknown committed
713 714 715
  NullS
};

716
const char **ha_tina::bas_ext() const
unknown's avatar
unknown committed
717 718 719
{
  return ha_tina_exts;
}
720

721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796
/*
  Three functions below are needed to enable concurrent insert functionality
  for CSV engine. For more details see mysys/thr_lock.c
*/

void tina_get_status(void* param, int concurrent_insert)
{
  ha_tina *tina= (ha_tina*) param;
  tina->get_status();
}

void tina_update_status(void* param)
{
  ha_tina *tina= (ha_tina*) param;
  tina->update_status();
}

/*
  This should exist and return 0 for concurrent insert to work.
  param is not used.
*/
my_bool tina_check_status(void* param)
{
  const my_bool no_conflict= 0;
  return no_conflict;
}

/*
  Save the state of the table

  SYNOPSIS
    get_status()

  DESCRIPTION
    This function is used to retrieve the file length during the lock
    phase of concurrent insert: the shared saved data file length is
    copied into the handler. For log tables the copy is made under
    share->mutex.
*/

void ha_tina::get_status()
{
  if (!share->is_log_table)
  {
    local_saved_data_file_length= share->saved_data_file_length;
    return;
  }

  /*
    We have to use mutex to follow pthreads memory visibility
    rules for share->saved_data_file_length
  */
  pthread_mutex_lock(&share->mutex);
  local_saved_data_file_length= share->saved_data_file_length;
  pthread_mutex_unlock(&share->mutex);
}


/*
  Correct the state of the table. Called by unlock routines
  before the write lock is released.

  SYNOPSIS
    update_status()

  DESCRIPTION
    When we employ concurrent insert lock, we save current length of the file
    during the lock phase. We do not read further saved value, as we don't
    want to interfere with undergoing concurrent insert. Writers update file
    length info during unlock with update_status().

  NOTE
    For log tables concurrent insert works different. The reason is that
    log tables are always opened and locked. And as they do not unlock
    tables, the file length after writes should be updated in a different
    way. For this purpose we need is_log_table flag. When this flag is set
    we call update_status() explicitly after each row write.
*/

void ha_tina::update_status()
{
  /* correct local_saved_data_file_length for writers */
797
  share->saved_data_file_length= local_saved_data_file_length;
798 799 800
}


801
/*
802 803 804 805
  Open a database file. Keep in mind that tables are caches, so
  this will not be called for every request. Any sort of positions
  that need to be reset should be kept in the ::extra() call.
*/
806
int ha_tina::open(const char *name, int mode, uint open_options)
807 808 809 810
{
  DBUG_ENTER("ha_tina::open");

  if (!(share= get_share(name, table)))
811
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
812

813
  if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
814 815
  {
    free_share(share);
816
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
817
  }
818

819
  local_data_file_version= share->data_file_version;
820 821 822
  if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
    DBUG_RETURN(0);

823 824 825 826 827 828
  /*
    Init locking. Pass handler object to the locking routines,
    so that they could save/update local_saved_data_file_length value
    during locking. This is needed to enable concurrent inserts.
  */
  thr_lock_data_init(&share->lock, &lock, (void*) this);
829 830
  ref_length=sizeof(off_t);

831 832 833 834
  share->lock.get_status= tina_get_status;
  share->lock.update_status= tina_update_status;
  share->lock.check_status= tina_check_status;

835 836 837 838 839 840
  DBUG_RETURN(0);
}


/*
  Close a database file. We remove ourselves from the shared strucutre.
841
  If it is empty we destroy it.
842 843 844
*/
int ha_tina::close(void)
{
845
  int rc= 0;
846
  DBUG_ENTER("ha_tina::close");
847 848
  rc= my_close(data_file, MYF(0));
  DBUG_RETURN(free_share(share) || rc);
849 850
}

851
/*
852 853 854 855
  This is an INSERT. At the moment this handler just seeks to the end
  of the file and appends the data. In an error case it really should
  just truncate to the original position (this is not done yet).
*/
856
int ha_tina::write_row(uchar * buf)
857 858 859 860
{
  int size;
  DBUG_ENTER("ha_tina::write_row");

861 862 863
  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

864
  ha_statistic_increment(&SSV::ha_write_count);
865

866 867
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
868 869 870

  size= encode_quote(buf);

871 872 873
  if (!share->tina_write_opened)
    if (init_tina_writer())
      DBUG_RETURN(-1);
874

875
   /* use pwrite, as concurrent reader could have changed the position */
876
  if (my_write(share->tina_write_filedes, (uchar*)buffer.ptr(), size,
877
               MYF(MY_WME | MY_NABP)))
878
    DBUG_RETURN(-1);
879 880

  /* update local copy of the max position to see our own changes */
881
  local_saved_data_file_length+= size;
882

883 884 885
  /* update shared info */
  pthread_mutex_lock(&share->mutex);
  share->rows_recorded++;
886 887 888
  /* update status for the log tables */
  if (share->is_log_table)
    update_status();
889
  pthread_mutex_unlock(&share->mutex);
890

891
  stats.records++;
892 893 894 895
  DBUG_RETURN(0);
}


896 897 898 899 900 901 902 903 904 905 906 907 908
int ha_tina::open_update_temp_file_if_needed()
{
  char updated_fname[FN_REFLEN];

  if (!share->update_file_opened)
  {
    if ((update_temp_file=
           my_create(fn_format(updated_fname, share->table_name,
                               "", CSN_EXT,
                               MY_REPLACE_EXT | MY_UNPACK_FILENAME),
                     0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
      return 1;
    share->update_file_opened= TRUE;
909
    temp_file_length= 0;
910 911 912 913
  }
  return 0;
}

914
/*
915
  This is called for an update.
916
  Make sure you put in code to increment the auto increment, also
917 918 919 920 921
  update any timestamp data. Currently auto increment is not being
  fixed since autoincrements have yet to be added to this table handler.
  This will be called in a table scan right before the previous ::rnd_next()
  call.
*/
922
int ha_tina::update_row(const uchar * old_data, uchar * new_data)
923 924 925 926
{
  int size;
  DBUG_ENTER("ha_tina::update_row");

927
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
928

929 930
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
    table->timestamp_field->set_time();
931 932 933

  size= encode_quote(new_data);

934 935 936 937 938 939 940
  /*
    During update we mark each updating record as deleted 
    (see the chain_append()) then write new one to the temporary data file. 
    At the end of the sequence in the rnd_end() we append all non-marked
    records from the data file to the temporary data file then rename it.
    The temp_file_length is used to calculate new data file length.
  */
941 942 943
  if (chain_append())
    DBUG_RETURN(-1);

944 945 946
  if (open_update_temp_file_if_needed())
    DBUG_RETURN(-1);

947
  if (my_write(update_temp_file, (uchar*)buffer.ptr(), size,
948
               MYF(MY_WME | MY_NABP)))
949
    DBUG_RETURN(-1);
950
  temp_file_length+= size;
951 952 953 954

  /* UPDATE should never happen on the log tables */
  DBUG_ASSERT(!share->is_log_table);

955 956 957 958
  DBUG_RETURN(0);
}


959 960 961 962 963 964 965 966
/*
  Deletes a row. First the database will find the row, and then call this
  method. In the case of a table scan, the previous call to this will be
  the ::rnd_next() that found this row.
  The exception to this is an ORDER BY. This will cause the table handler
  to walk the table noting the positions of all rows that match a query.
  The table will then be deleted/positioned based on the ORDER (so RANDOM,
  DESC, ASC).
967
*/
968
int ha_tina::delete_row(const uchar * buf)
969 970
{
  DBUG_ENTER("ha_tina::delete_row");
971
  ha_statistic_increment(&SSV::ha_delete_count);
972 973 974 975

  if (chain_append())
    DBUG_RETURN(-1);

976
  stats.records--;
977 978 979 980 981
  /* Update shared info */
  DBUG_ASSERT(share->rows_recorded);
  pthread_mutex_lock(&share->mutex);
  share->rows_recorded--;
  pthread_mutex_unlock(&share->mutex);
982

983 984 985
  /* DELETE should never happen on the log table */
  DBUG_ASSERT(!share->is_log_table);

986 987 988 989
  DBUG_RETURN(0);
}


990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016
/**
  @brief Initialize the data file.

  @details Compare the local version of the data file with the shared one.
  If they differ, there are some changes behind and we have to reopen
  the data file to make the changes visible.
  The local version is advanced only after the reopen has succeeded, so
  that a failed reopen is reported again by the next call instead of
  letting it silently continue with an unusable descriptor.
  Call @c file_buff->init_buff() at the end to read the beginning of the
  data file into buffer.

  @retval  0  OK.
  @retval  1  There was an error (closing or reopening the data file).
*/

int ha_tina::init_data_file()
{
  if (local_data_file_version != share->data_file_version)
  {
    if (my_close(data_file, MYF(0)) ||
        (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
      return 1;
    /* The descriptor now refers to the current data file. */
    local_data_file_version= share->data_file_version;
  }
  file_buff->init_buff(data_file);
  return 0;
}


1017 1018
/*
  All table scans call this first.
1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041
  The order of a table scan is:

  ha_tina::store_lock
  ha_tina::external_lock
  ha_tina::info
  ha_tina::rnd_init
  ha_tina::extra
  ENUM HA_EXTRA_CACHE   Cash record in HA_rrnd()
  ha_tina::rnd_next
  ha_tina::rnd_next
  ha_tina::rnd_next
  ha_tina::rnd_next
  ha_tina::rnd_next
  ha_tina::rnd_next
  ha_tina::rnd_next
  ha_tina::rnd_next
  ha_tina::rnd_next
  ha_tina::extra
  ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
  ha_tina::external_lock
  ha_tina::extra
  ENUM HA_EXTRA_RESET   Reset database to after open

1042 1043
  Each call to ::rnd_next() represents a row returned in the can. When no more
  rows can be returned, rnd_next() returns a value of HA_ERR_END_OF_FILE.
1044 1045 1046 1047 1048 1049 1050 1051
  The ::info() call is just for the optimizer.

*/

int ha_tina::rnd_init(bool scan)
{
  DBUG_ENTER("ha_tina::rnd_init");

1052
  /* set buffer to the beginning of the file */
1053 1054
  if (share->crashed || init_data_file())
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1055

1056
  current_position= next_position= 0;
1057
  stats.records= 0;
1058
  records_is_known= 0;
1059 1060 1061 1062 1063 1064
  chain_ptr= chain;

  DBUG_RETURN(0);
}

/*
1065 1066 1067 1068
  ::rnd_next() does all the heavy lifting for a table scan. You will need to
  populate *buf with the correct field data. You can walk the field to
  determine at what position you should store the data (take a look at how
  ::find_current_row() works). The structure is something like:
1069
  0Foo  Dog  Friend
1070 1071 1072 1073 1074 1075 1076
  The first offset is for the first attribute. All space before that is
  reserved for null count.
  Basically this works as a mask for which rows are nulled (compared to just
  empty).
  This table handler doesn't do nulls and does not know the difference between
  NULL and "". This is ok since this table handler is for spreadsheets and
  they don't know about them either :)
1077
*/
1078
int ha_tina::rnd_next(uchar *buf)
1079
{
1080
  int rc;
1081 1082
  DBUG_ENTER("ha_tina::rnd_next");

1083 1084 1085
  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

1086
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1087 1088

  current_position= next_position;
1089 1090 1091

  /* don't scan an empty file */
  if (!local_saved_data_file_length)
1092
    DBUG_RETURN(HA_ERR_END_OF_FILE);
1093

1094 1095
  if ((rc= find_current_row(buf)))
    DBUG_RETURN(rc);
1096

1097
  stats.records++;
1098 1099 1100 1101 1102
  DBUG_RETURN(0);
}

/*
  In the case of an order by rows will need to be sorted.
1103
  ::position() is called after each call to ::rnd_next(),
1104
  the data it stores is to a byte array. You can store this
1105 1106 1107
  data via my_store_ptr(). ref_length is a variable defined to the
  class that is the sizeof() of position being stored. In our case
  its just a position. Look at the bdb code if you want to see a case
1108 1109
  where something other then a number is stored.
*/
1110
void ha_tina::position(const uchar *record)
1111 1112
{
  DBUG_ENTER("ha_tina::position");
unknown's avatar
unknown committed
1113
  my_store_ptr(ref, ref_length, current_position);
1114 1115 1116 1117
  DBUG_VOID_RETURN;
}


1118 1119
/*
  Used to fetch a row from a posiion stored with ::position().
unknown's avatar
unknown committed
1120
  my_get_ptr() retrieves the data for you.
1121 1122
*/

1123
int ha_tina::rnd_pos(uchar * buf, uchar *pos)
1124 1125
{
  DBUG_ENTER("ha_tina::rnd_pos");
1126
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1127
  current_position= (off_t)my_get_ptr(pos,ref_length);
1128 1129 1130 1131 1132 1133 1134 1135
  DBUG_RETURN(find_current_row(buf));
}

/*
  ::info() is used to return information to the optimizer.
  Currently this table handler doesn't implement most of the fields
  really needed. SHOW also makes use of this data
*/
1136
int ha_tina::info(uint flag)
1137 1138 1139
{
  DBUG_ENTER("ha_tina::info");
  /* This is a lie, but you don't want the optimizer to see zero or 1 */
1140 1141
  if (!records_is_known && stats.records < 2) 
    stats.records= 2;
1142
  DBUG_RETURN(0);
1143 1144 1145 1146 1147 1148 1149 1150 1151 1152
}

/*
  Grab bag of flags that are sent to the able handler every so often.
  HA_EXTRA_RESET and HA_EXTRA_RESET_STATE are the most frequently called.
  You are not required to implement any of these.
*/
int ha_tina::extra(enum ha_extra_function operation)
{
  DBUG_ENTER("ha_tina::extra");
1153 1154 1155 1156 1157 1158
 if (operation == HA_EXTRA_MARK_AS_LOG_TABLE)
 {
   pthread_mutex_lock(&share->mutex);
   share->is_log_table= TRUE;
   pthread_mutex_unlock(&share->mutex);
 }
1159 1160 1161
  DBUG_RETURN(0);
}

1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177
/*
  Set end_pos to the last valid byte of continuous area, closest
  to the given "hole", stored in the buffer. "Valid" here means,
  not listed in the chain of deleted records ("holes").

  RETURN
    TRUE   the area ends exactly where closest_hole begins
    FALSE  there are no more holes, or the buffer ends before the hole
*/
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
{
  /* no more chains: everything up to the end of the buffer is valid */
  if (closest_hole == chain_ptr)
  {
    *end_pos= file_buff->end();
    return FALSE;
  }

  /* Stop at the hole or at the end of the buffer, whichever is first */
  *end_pos= min(file_buff->end(),
                closest_hole->begin);
  return *end_pos == closest_hole->begin;
}


1178
/*
1179 1180 1181 1182
  Called after each table scan. In particular after deletes,
  and updates. In the last case we employ chain of deleted
  slots to clean up all of the dead space we have collected while
  performing deletes/updates.
1183 1184 1185
*/
int ha_tina::rnd_end()
{
1186 1187
  char updated_fname[FN_REFLEN];
  off_t file_buffer_start= 0;
1188 1189
  DBUG_ENTER("ha_tina::rnd_end");

1190 1191
  records_is_known= 1;

1192 1193
  if ((chain_ptr - chain)  > 0)
  {
1194
    tina_set *ptr= chain;
1195

1196
    /*
1197 1198
      Re-read the beginning of a file (as the buffer should point to the
      end of file after the scan).
1199
    */
1200
    file_buff->init_buff(data_file);
1201 1202

    /*
1203 1204
      The sort is needed when there were updates/deletes with random orders.
      It sorts so that we move the firts blocks to the beginning.
1205
    */
1206 1207
    qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
          (qsort_cmp)sort_set);
1208 1209 1210 1211 1212 1213 1214 1215 1216

    off_t write_begin= 0, write_end;

    /* create the file to write updated table if it wasn't yet created */
    if (open_update_temp_file_if_needed())
      DBUG_RETURN(-1);

    /* write the file with updated info */
    while ((file_buffer_start != -1))     // while not end of file
1217
    {
1218
      bool in_hole= get_write_pos(&write_end, ptr);
1219
      off_t write_length= write_end - write_begin;
1220 1221

      /* if there is something to write, write it */
1222 1223 1224 1225 1226 1227 1228 1229 1230
      if (write_length)
      {
        if (my_write(update_temp_file, 
                     (uchar*) (file_buff->ptr() +
                               (write_begin - file_buff->start())),
                     write_length, MYF_RW))
          goto error;
        temp_file_length+= write_length;
      }
1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244
      if (in_hole)
      {
        /* skip hole */
        while (file_buff->end() <= ptr->end && file_buffer_start != -1)
          file_buffer_start= file_buff->read_next();
        write_begin= ptr->end;
        ptr++;
      }
      else
        write_begin= write_end;

      if (write_end == file_buff->end())
        file_buffer_start= file_buff->read_next(); /* shift the buffer */

1245 1246
    }

unknown's avatar
unknown committed
1247 1248
    if (my_sync(update_temp_file, MYF(MY_WME)) ||
        my_close(update_temp_file, MYF(0)))
1249 1250
      DBUG_RETURN(-1);

1251
    share->update_file_opened= FALSE;
1252

1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271
    if (share->tina_write_opened)
    {
      if (my_close(share->tina_write_filedes, MYF(0)))
        DBUG_RETURN(-1);
      /*
        Mark that the writer fd is closed, so that init_tina_writer()
        will reopen it later.
      */
      share->tina_write_opened= FALSE;
    }

    /*
      Close opened fildes's. Then move updated file in place
      of the old datafile.
    */
    if (my_close(data_file, MYF(0)) ||
        my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
                            MY_REPLACE_EXT | MY_UNPACK_FILENAME),
                  share->data_file_name, MYF(0)))
1272 1273
      DBUG_RETURN(-1);

unknown's avatar
unknown committed
1274 1275
    /* Open the file again */
    if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
1276
      DBUG_RETURN(-1);
1277 1278 1279 1280 1281 1282 1283 1284 1285 1286
    /*
      As we reopened the data file, increase share->data_file_version 
      in order to force other threads waiting on a table lock and  
      have already opened the table to reopen the data file.
      That makes the latest changes become visible to them.
      Update local_data_file_version as no need to reopen it in the 
      current thread.
    */
    share->data_file_version++;
    local_data_file_version= share->data_file_version;
1287 1288 1289 1290 1291 1292
    /*
      The datafile is consistent at this point and the write filedes is
      closed, so nothing worrying will happen to it in case of a crash.
      Here we record this fact to the meta-file.
    */
    (void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
1293 1294 1295 1296
    /* 
      Update local_saved_data_file_length with the real length of the 
      data file.
    */
1297
    local_saved_data_file_length= temp_file_length;
1298 1299 1300
  }

  DBUG_RETURN(0);
1301 1302 1303 1304
error:
  my_close(update_temp_file, MYF(0));
  share->update_file_opened= FALSE;
  DBUG_RETURN(-1);
1305 1306
}

unknown's avatar
unknown committed
1307

1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325
/*
  Repair CSV table in the case, it is crashed.

  SYNOPSIS
    repair()
    thd         The thread, performing repair
    check_opt   The options for repair. We do not use it currently.

  DESCRIPTION
    If the file is empty, change # of rows in the file and complete recovery.
    Otherwise, scan the table looking for bad rows. If none were found,
    we mark file as a good one and return. If a bad row was encountered,
    we truncate the datafile up to the last good row.

   TODO: Make repair more clever - it should try to recover subsequent
         rows (after the first bad one) as well.
*/

1326 1327 1328
int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
  char repaired_fname[FN_REFLEN];
1329
  uchar *buf;
1330 1331 1332
  File repair_file;
  int rc;
  ha_rows rows_repaired= 0;
1333
  off_t write_begin= 0, write_end;
1334 1335 1336
  DBUG_ENTER("ha_tina::repair");

  /* empty file */
1337
  if (!share->saved_data_file_length)
1338 1339 1340 1341 1342
  {
    share->rows_recorded= 0;
    goto end;
  }

1343 1344
  /* Don't assert in field::val() functions */
  table->use_all_columns();
1345
  if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
1346 1347
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);

1348
  /* position buffer to the start of the file */
1349 1350
  if (init_data_file())
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
1351

1352 1353 1354 1355 1356
  /*
    Local_saved_data_file_length is initialized during the lock phase.
    Sometimes this is not getting executed before ::repair (e.g. for
    the log tables). We set it manually here.
  */
1357
  local_saved_data_file_length= share->saved_data_file_length;
1358 1359 1360
  /* set current position to the beginning of the file */
  current_position= next_position= 0;

1361 1362 1363
  /* Read the file row-by-row. If everything is ok, repair is not needed. */
  while (!(rc= find_current_row(buf)))
  {
1364
    thd_inc_row_count(thd);
1365 1366 1367 1368 1369 1370
    rows_repaired++;
    current_position= next_position;
  }

  my_free((char*)buf, MYF(0));

1371 1372 1373
  if (rc == HA_ERR_END_OF_FILE)
  {
    /*
1374 1375 1376
      All rows were read ok until end of file, the file does not need repair.
      If rows_recorded != rows_repaired, we should update rows_recorded value
      to the current amount of rows.
1377 1378
    */
    share->rows_recorded= rows_repaired;
1379
    goto end;
1380
  }
1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391

  /*
    Otherwise we've encountered a bad row => repair is needed.
    Let us create a temporary file.
  */
  if ((repair_file= my_create(fn_format(repaired_fname, share->table_name,
                                        "", CSN_EXT,
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),
                           0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);

1392 1393 1394
  file_buff->init_buff(data_file);


1395
  /* we just truncated the file up to the first bad row. update rows count. */
1396 1397
  share->rows_recorded= rows_repaired;

1398 1399 1400 1401 1402
  /* write repaired file */
  while (1)
  {
    write_end= min(file_buff->end(), current_position);
    if ((write_end - write_begin) &&
1403
        (my_write(repair_file, (uchar*)file_buff->ptr(),
1404 1405 1406 1407 1408 1409 1410 1411 1412
                  write_end - write_begin, MYF_RW)))
      DBUG_RETURN(-1);

    write_begin= write_end;
    if (write_end== current_position)
      break;
    else
      file_buff->read_next(); /* shift the buffer */
  }
1413 1414

  /*
1415 1416 1417 1418
    Close the files and rename repaired file to the datafile.
    We have to close the files, as on Windows one cannot rename
    a file, which descriptor is still open. EACCES will be returned
    when trying to delete the "to"-file in my_rename().
1419
  */
1420 1421
  if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
      my_rename(repaired_fname, share->data_file_name, MYF(0)))
1422 1423 1424
    DBUG_RETURN(-1);

  /* Open the file again, it should now be repaired */
1425 1426
  if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
                          MYF(0))) == -1)
1427 1428
     DBUG_RETURN(-1);

1429 1430
  /* Set new file size. The file size will be updated by ::update_status() */
  local_saved_data_file_length= (size_t) current_position;
1431 1432 1433 1434 1435 1436 1437

end:
  share->crashed= FALSE;
  DBUG_RETURN(HA_ADMIN_OK);
}

/*
unknown's avatar
unknown committed
1438
  DELETE without WHERE calls this
1439
*/
unknown's avatar
unknown committed
1440

1441 1442
int ha_tina::delete_all_rows()
{
unknown's avatar
unknown committed
1443
  int rc;
1444 1445
  DBUG_ENTER("ha_tina::delete_all_rows");

1446
  if (!records_is_known)
unknown's avatar
unknown committed
1447
    DBUG_RETURN(my_errno=HA_ERR_WRONG_COMMAND);
1448

1449 1450 1451
  if (!share->tina_write_opened)
    if (init_tina_writer())
      DBUG_RETURN(-1);
1452 1453

  /* Truncate the file to zero size */
1454
  rc= my_chsize(share->tina_write_filedes, 0, 0, MYF(MY_WME));
1455

1456
  stats.records=0;
1457 1458 1459 1460 1461
  /* Update shared info */
  pthread_mutex_lock(&share->mutex);
  share->rows_recorded= 0;
  pthread_mutex_unlock(&share->mutex);
  local_saved_data_file_length= 0;
1462 1463 1464
  DBUG_RETURN(rc);
}

1465
/*
  Called by the database to lock the table. Keep in mind that this
  is an internal lock.

  The requested lock type is taken over only when it is not TL_IGNORE and
  the handler's lock is currently TL_UNLOCK. The handler's lock is then
  stored into the 'to' array and the pointer to the next free slot is
  returned. 'thd' is not used.
*/
THR_LOCK_DATA **ha_tina::store_lock(THD *thd,
                                    THR_LOCK_DATA **to,
                                    enum thr_lock_type lock_type)
{
  const bool take_requested_type= (lock_type != TL_IGNORE) &&
                                  (lock.type == TL_UNLOCK);

  if (take_requested_type)
    lock.type= lock_type;

  to[0]= &lock;
  return to + 1;
}

/* 
  Create a table. You do not want to leave the table open after a call to
  this (the database will call ::open() if it needs to).
*/

1484 1485
int ha_tina::create(const char *name, TABLE *table_arg,
                    HA_CREATE_INFO *create_info)
1486 1487 1488 1489 1490
{
  char name_buff[FN_REFLEN];
  File create_file;
  DBUG_ENTER("ha_tina::create");

unknown's avatar
unknown committed
1491 1492 1493 1494 1495 1496 1497 1498 1499 1500
  /*
    check columns
  */
  for (Field **field= table_arg->s->field; *field; field++)
  {
    if ((*field)->real_maybe_null())
      DBUG_RETURN(-1);
  }
  

1501 1502 1503 1504 1505 1506 1507 1508 1509
  if ((create_file= my_create(fn_format(name_buff, name, "", CSM_EXT,
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME), 0,
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
    DBUG_RETURN(-1);

  write_meta_file(create_file, 0, FALSE);
  my_close(create_file, MYF(0));

  if ((create_file= my_create(fn_format(name_buff, name, "", CSV_EXT,
1510
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
1511
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
1512 1513
    DBUG_RETURN(-1);

1514
  my_close(create_file, MYF(0));
1515 1516 1517

  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1518

1519 1520 1521
/*
  Check the table for corruption.

  The data file is read row by row from the beginning. The table is
  considered corrupt if reading stops with anything but
  HA_ERR_END_OF_FILE, or if the number of rows read differs from
  share->rows_recorded; in that case the share is marked as crashed.
  'check_opt' is not used.

  RETURN
    HA_ADMIN_OK        the table is fine
    HA_ADMIN_CORRUPT   a bad row or a row count mismatch was found
    HA_ERR_OUT_OF_MEM  the row buffer could not be allocated
    HA_ERR_CRASHED     the data file could not be initialized
*/
int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  uchar *buf;
  const char *old_proc_info;
  ha_rows count= share->rows_recorded;
  DBUG_ENTER("ha_tina::check");

  old_proc_info= thd_proc_info(thd, "Checking table");
  if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
  {
    /* Restore the previous proc info on every exit path */
    thd_proc_info(thd, old_proc_info);
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  }

  /* position buffer to the start of the file */
  if (init_data_file())
  {
    /* Do not leak the row buffer on the error path */
    my_free((char*)buf, MYF(0));
    thd_proc_info(thd, old_proc_info);
    DBUG_RETURN(HA_ERR_CRASHED);
  }

  /*
    Local_saved_data_file_length is initialized during the lock phase.
    Check does not use store_lock in certain cases. So, we set it
    manually here.
  */
  local_saved_data_file_length= share->saved_data_file_length;
  /* set current position to the beginning of the file */
  current_position= next_position= 0;
  /*
    Read the file row-by-row. 'count' goes down by one per good row, so it
    is zero at the end exactly when the recorded row count matches.
  */
  while (!(rc= find_current_row(buf)))
  {
    thd_inc_row_count(thd);
    count--;
    current_position= next_position;
  }

  my_free((char*)buf, MYF(0));
  thd_proc_info(thd, old_proc_info);

  if ((rc != HA_ERR_END_OF_FILE) || count)
  {
    share->crashed= TRUE;
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  }
  else
    DBUG_RETURN(HA_ADMIN_OK);
}


1564 1565 1566 1567 1568 1569
/*
  Always reports the data as compatible (COMPATIBLE_DATA_YES); neither
  argument is examined.
*/
bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *info,
                                         uint table_changes)
{
  return COMPATIBLE_DATA_YES;
}

unknown's avatar
unknown committed
1570
struct st_mysql_storage_engine csv_storage_engine=
1571
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
unknown's avatar
unknown committed
1572 1573

/* Plugin declaration of the CSV storage engine */
mysql_declare_plugin(csv)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &csv_storage_engine,
  "CSV",
  "Brian Aker, MySQL AB",
  "CSV storage engine",
  PLUGIN_LICENSE_GPL,
  tina_init_func,             /* Plugin Init                     */
  tina_done_func,             /* Plugin Deinit                   */
  0x0100,                     /* 1.0                             */
  NULL,                       /* status variables                */
  NULL,                       /* system variables                */
  NULL                        /* config options                  */
}
mysql_declare_plugin_end;
unknown's avatar
unknown committed
1589