ha_federated.cc 58.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/* Copyright (C) 2004 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*

  MySQL Federated Storage Engine

  ha_federated.cc - MySQL Federated Storage Engine
  Patrick Galbraith and Brian Aker, 2004

24
  This is a handler which uses a foreign database as the data file, as
25 26 27 28
  opposed to a handler like MyISAM, which uses .MYD files locally.

  How this handler works
  ----------------------------------
29 30
  Normal database files are local and as such: You create a table called
  'users', a file such as 'users.MYD' is created. A handler reads, inserts,
31 32
  deletes, updates data in this file. The data is stored in particular format,
  so to read, that data has to be parsed into fields, to write, fields have to
33
  be stored in this format to write to this data file.
34 35

  With MySQL Federated storage engine, there will be no local files for each
36 37
  table's data (such as .MYD). A foreign database will store the data that would
  normally be in this file. This will necessitate the use of MySQL client API
38 39
  to read, delete, update, insert this data. The data will have to be retrieved
  via an SQL call "SELECT * FROM users". Then, to read this data, it will have
40
  to be retrieved via mysql_fetch_row one row at a time, then converted from
41 42
  the  column in this select into the format that the handler expects.

43 44
  The create table will simply create the .frm file, and within the
  "CREATE TABLE" SQL, there SHALL be any of the following :
45

46 47 48 49 50 51 52 53 54 55 56 57 58 59
  comment=scheme://username:password@hostname:port/database/tablename
  comment=scheme://username@hostname/database/tablename
  comment=scheme://username:password@hostname/database/tablename
  comment=scheme://username@hostname:port/database/tablename

  An example would be:

  comment=mysql://username:password@hostname:port/database/tablename

  ***IMPORTANT***

  Only 'mysql://' is supported at this release.


60 61
  This comment connection string is necessary for the handler to be
  able to connect to the foreign server.
62 63 64 65 66


  The basic flow is this:

  SQL calls issues locally ->
67 68 69
  mysql handler API (data in handler format) ->
  mysql client API (data converted to SQL calls) ->
  foreign database -> mysql client API ->
70 71 72 73 74
  convert result sets (if any) to handler format ->
  handler API -> results or rows affected to local

  What this handler does and doesn't support
  ------------------------------------------
75 76
  * Tables MUST be created on the foreign server prior to any action on those
    tables via the handler, first version. IMPORTANT: IF you MUST use the
77
    federated storage engine type on the REMOTE end, MAKE SURE [ :) ] That
78
    the table you connect to IS NOT a table pointing BACK to your ORIGINAL
79
    table! You know and have heard the screeching of audio feedback? You
80
    know putting two mirrors in front of each other how the reflection
81 82
    continues for eternity? Well, need I say more?!
  * There will not be support for transactions.
83
  * There is no way for the handler to know if the foreign database or table
84
    has changed. The reason for this is that this database has to work like a
85 86 87
    data file that would never be written to by anything other than the
    database. The integrity of the data in the local table could be breached
    if there was any change to the foreign database.
88
  * Support for SELECT, INSERT, UPDATE , DELETE, indexes.
89
  * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls.
90
  * Prepared statements will not be used in the first implementation, it
91
    remains to be seen whether the limited subset of the client API for the
92
    server supports this.
93 94
  * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its
    implementation.
95
  * This will not work with the query cache.
96 97 98 99 100 101 102

   Method calls

   A two column table, with one record:

   (SELECT)

103
   "SELECT * FROM foo"
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
    ha_federated::info
    ha_federated::scan_time:
    ha_federated::rnd_init: share->select_query SELECT * FROM foo
    ha_federated::extra

    <for every row of data retrieved>
    ha_federated::rnd_next
    ha_federated::convert_row_to_internal_format
    ha_federated::rnd_next
    </for every row of data retrieved>

    ha_federated::rnd_end
    ha_federated::extra
    ha_federated::reset

    (INSERT)

    "INSERT INTO foo (id, ts) VALUES (2, now());"

    ha_federated::write_row

    <for every field/column>
126 127
    Field::quote_data
    Field::quote_data
128 129 130 131 132 133 134 135 136 137 138
    </for every field/column>

    ha_federated::reset

    (UPDATE)

    "UPDATE foo SET ts = now() WHERE id = 1;"

    ha_federated::index_init
    ha_federated::index_read
    ha_federated::index_read_idx
139
    Field::quote_data
140 141 142
    ha_federated::rnd_next
    ha_federated::convert_row_to_internal_format
    ha_federated::update_row
143

144
    <quote 3 cols, new and old data>
145 146 147 148 149 150
    Field::quote_data
    Field::quote_data
    Field::quote_data
    Field::quote_data
    Field::quote_data
    Field::quote_data
151
    </quote 3 cols, new and old data>
152

153 154 155 156 157 158 159 160 161 162
    ha_federated::extra
    ha_federated::extra
    ha_federated::extra
    ha_federated::external_lock
    ha_federated::reset


    How do I use this handler?
    --------------------------
    First of all, you need to build this storage engine:
163

164 165 166
      ./configure --with-federated-storage-engine
      make

167
    Next, to use this handler, it's very simple. You must
168 169 170
    have two databases running, either both on the same host, or
    on different hosts.

171
    On the server that will be connecting to the foreign
172 173
    host (client), you create your table as such:

174
    CREATE TABLE test_table (
175 176 177 178 179
      id     int(20) NOT NULL auto_increment,
      name   varchar(32) NOT NULL default '',
      other  int(20) NOT NULL default '0',
      PRIMARY KEY  (id),
      KEY name (name),
180 181 182
      KEY other_key (other))
       ENGINE="FEDERATED"
       DEFAULT CHARSET=latin1
183 184
       COMMENT='mysql://root@127.0.0.1:9306/federated/test_federated';

185 186 187 188 189 190 191 192
   Notice the "COMMENT" and "ENGINE" field? This is where you
   respectively set the engine type, "FEDERATED" and foreign
   host information, this being the database your 'client' database
   will connect to and use as the "data file". Obviously, the foreign
   database is running on port 9306, so you want to start up your other
   database so that it is indeed on port 9306, and your federated
   database on a port other than that. In my setup, I use port 5554
   for federated, and port 5555 for the foreign database.
193

194
   Then, on the foreign database:
195

196
   CREATE TABLE test_table (
197 198 199 200 201
     id     int(20) NOT NULL auto_increment,
     name   varchar(32) NOT NULL default '',
     other  int(20) NOT NULL default '0',
     PRIMARY KEY  (id),
     KEY name (name),
202
     KEY other_key (other))
203 204 205 206
     ENGINE="<NAME>" <-- whatever you want, or not specify
     DEFAULT CHARSET=latin1 ;

    This table is exactly the same (and must be exactly the same),
207
    except that it is not using the federated handler and does
208
    not need the URL.
209

210 211 212 213 214 215 216

    How to see the handler in action
    --------------------------------

    When developing this handler, I compiled the federated database with
    debugging:

217
    ./configure --with-federated-storage-engine
218 219 220 221
    --prefix=/home/mysql/mysql-build/federated/ --with-debug

    Once compiled, I did a 'make install' (not for the purpose of installing
    the binary, but to install all the files the binary expects to see in the
222 223 224 225
    directory I specified in the build with --prefix,
    "/home/mysql/mysql-build/federated".

    Then, I started the foreign server:
226

227
    /usr/local/mysql/bin/mysqld_safe
228 229 230 231 232 233 234 235 236 237 238 239 240
    --user=mysql --log=/tmp/mysqld.5555.log -P 5555

    Then, I went back to the directory containing the newly compiled mysqld,
    <builddir>/sql/, started up gdb:

    gdb ./mysqld

    Then, within the (gdb) prompt:
    (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug

    Next, I open several windows for each:

    1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed
241
    2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log
242 243 244
    3. A window with a client open to the federated server on port 5554
    4. A window with a client open to the foreign server on port 5555

245
    I would create a table on the client to the foreign server on port
246 247
    5555, and then to the federated server on port 5554. At this point,
    I would run whatever queries I wanted to on the federated server,
248
    just always remembering that whatever changes I wanted to make on
249
    the table, or if I created new tables, that I would have to do that
250 251 252
    on the foreign server.

    Another thing to look for is 'show variables' to show you that you have
253 254 255 256 257 258 259 260 261 262 263 264 265 266
    support for federated handler support:

    show variables like '%federat%'

    and:

    show storage engines;

    Both should display the federated storage handler.


    Testing
    -------

267
    There is a test for MySQL Federated Storage Handler in ./mysql-test/t,
268 269 270
    federated.test. It starts both a slave and master database using
    the same setup that the replication tests use, with the exception that
    it turns off replication, and sets replication to ignore the test tables.
271 272
    After ensuring that you actually do have support for the federated storage
    handler, numerous queries/inserts/updates/deletes are run, many derived
273 274 275 276
    from the MyISAM tests, plus some other tests which were meant to reveal
    any issues that would be most likely to affect this handler. All tests
    should work! ;)

277
    To run these tests, go into ./mysql-test (based in the directory you
278 279 280
    built the server in)

    ./mysql-test-run federated
281

282 283 284 285
    To run the test, or if you want to run the test and have debug info:

    ./mysql-test-run --debug federated

286
    This will run the test in debug mode, and you can view the trace and
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
    log files in the ./mysql-test/var/log directory

    ls -l mysql-test/var/log/
    -rw-r--r--  1 patg  patg        17  4 Dec 12:27 current_test
    -rw-r--r--  1 patg  patg       692  4 Dec 12:52 manager.log
    -rw-rw----  1 patg  patg     21246  4 Dec 12:51 master-bin.000001
    -rw-rw----  1 patg  patg        68  4 Dec 12:28 master-bin.index
    -rw-r--r--  1 patg  patg      1620  4 Dec 12:51 master.err
    -rw-rw----  1 patg  patg     23179  4 Dec 12:51 master.log
    -rw-rw----  1 patg  patg  16696550  4 Dec 12:51 master.trace
    -rw-r--r--  1 patg  patg         0  4 Dec 12:28 mysqltest-time
    -rw-r--r--  1 patg  patg   2024051  4 Dec 12:51 mysqltest.trace
    -rw-rw----  1 patg  patg     94992  4 Dec 12:51 slave-bin.000001
    -rw-rw----  1 patg  patg        67  4 Dec 12:28 slave-bin.index
    -rw-rw----  1 patg  patg       249  4 Dec 12:52 slave-relay-bin.000003
    -rw-rw----  1 patg  patg        73  4 Dec 12:28 slave-relay-bin.index
    -rw-r--r--  1 patg  patg      1349  4 Dec 12:51 slave.err
    -rw-rw----  1 patg  patg     96206  4 Dec 12:52 slave.log
    -rw-rw----  1 patg  patg  15706355  4 Dec 12:51 slave.trace
    -rw-r--r--  1 patg  patg         0  4 Dec 12:51 warnings

    Of course, again, you can tail the trace log:

310
    tail -f mysql-test/var/log/master.trace |grep ha_fed
311 312

    As well as the slave query log:
313

314 315 316 317 318 319 320 321 322 323 324 325 326
    tail -f mysql-test/var/log/slave.log

    Files that comprise the test suite
    ---------------------------------
    mysql-test/t/federated.test
    mysql-test/r/federated.result
    mysql-test/r/have_federated_db.require
    mysql-test/include/have_federated_db.inc


    Other tidbits
    -------------

327
    These were the files that were modified or created for this
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
    Federated handler to work:

    ./configure.in
    ./sql/Makefile.am
    ./config/ac_macros/ha_federated.m4
    ./sql/handler.cc
    ./sql/mysqld.cc
    ./sql/set_var.cc
    ./sql/field.h
    ./sql/sql_string.h
    ./mysql-test/mysql-test-run(.sh)
    ./mysql-test/t/federated.test
    ./mysql-test/r/federated.result
    ./mysql-test/r/have_federated_db.require
    ./mysql-test/include/have_federated_db.inc
    ./sql/ha_federated.cc
    ./sql/ha_federated.h
345

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
346
*/
347 348

#ifdef __GNUC__
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
349
#pragma implementation                          // gcc: Class implementation
350 351 352 353 354 355 356 357
#endif

#include <mysql_priv.h>

#ifdef HAVE_FEDERATED_DB
#include "ha_federated.h"
#define MAX_REMOTE_SIZE IO_SIZE
/* Variables for federated share methods */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
358 359 360 361
static HASH federated_open_tables;              // Hash used to track open
                                                // tables
pthread_mutex_t federated_mutex;                // This is the mutex we use to
                                                // init the hash
362
static int federated_init= FALSE;               // Variable for checking the
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
363
                                                // init state of hash
364

365 366
/* Function we use in the creation of our hash to get key.  */

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
367 368
static byte *federated_get_key(FEDERATED_SHARE *share, uint *length,
                               my_bool not_used __attribute__ ((unused)))
369 370 371 372 373
{
  *length= share->table_name_length;
  return (byte*) share->table_name;
}

374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
/*
  Initialize the federated handler.

  SYNOPSIS
    federated_db_init()
    void

  RETURN
    FALSE       OK
    TRUE        Error
*/

bool federated_db_init()
{
  federated_init= 1;
  VOID(pthread_mutex_init(&federated_mutex, MY_MUTEX_INIT_FAST));
  return (hash_init(&federated_open_tables, system_charset_info, 32, 0, 0,
                    (hash_get_key) federated_get_key, 0, 0));
}


/*
  Release the federated handler.

  SYNOPSIS
    federated_db_end()
    void

  DESCRIPTION
    Frees the hash of open tables and destroys the mutex, but only when the
    init flag says they were set up. The flag is cleared afterwards, so a
    second call does nothing.

  RETURN
    FALSE       OK (always)
*/

bool federated_db_end()
{
  /* Nothing to release if the handler was never initialized */
  if (!federated_init)
    return FALSE;

  hash_free(&federated_open_tables);
  VOID(pthread_mutex_destroy(&federated_mutex));
  federated_init= 0;
  return FALSE;
}

417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471

/*
  Check (in create) whether the foreign table exists, and that it can be
  connected to

  SYNOPSIS
    check_foreign_data_source()
      share               pointer to FEDERATED share

  DESCRIPTION
    This method first checks that the connection information that parse_url
    has populated into the share is sufficient to connect to the foreign
    server, and if so, whether the foreign table exists. Existence is tested
    with SHOW TABLES LIKE rather than INFORMATION_SCHEMA because this needs
    to work with servers older than 5.0.

    On every failure after the connection handle has been allocated, an
    error is raised through my_error() with a message describing what went
    wrong (the client library's own message for query failures).

  RETURN VALUE
    0                      foreign table exists
    HA_ERR_OUT_OF_MEM      mysql_init() could not allocate a handle
    ER_CONNECT_TO_MASTER   could not connect to the foreign server
    ER_QUERY_ON_MASTER     the query failed or the table does not exist
*/

static int check_foreign_data_source(FEDERATED_SHARE *share)
{
  char escaped_table_base_name[IO_SIZE];
  MYSQL *mysql;
  MYSQL_RES *result= 0;
  uint error_code;
  char query_buffer[IO_SIZE];
  char error_buffer[IO_SIZE];
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
  DBUG_ENTER("ha_federated::check_foreign_data_source");
  query.length(0);
  /* Never hand uninitialized stack contents to my_error() */
  error_buffer[0]= '\0';

  /* error out if we can't alloc memory for mysql_init(NULL) (per Georg) */
  if (! (mysql= mysql_init(NULL)))
  {
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  }

  /* check if we can connect */
  if (!mysql_real_connect(mysql,
                          share->hostname,
                          share->username,
                          share->password,
                          share->database,
                          share->port,
                          share->socket, 0))
  {
    /* parse_url() stores an empty hostname as NULL; don't pass that to %s */
    my_sprintf(error_buffer,
               (error_buffer,
                "unable to connect to database '%s' on host '%s' as user '%s' !",
                share->database,
                share->hostname ? share->hostname : "",
                share->username));
    error_code= ER_CONNECT_TO_MASTER;
    goto error;
  }

  /* We are connected: now make sure the table exists */
  query.append("SHOW TABLES LIKE '");
  escape_string_for_mysql(&my_charset_bin, (char *)escaped_table_base_name,
                          sizeof(escaped_table_base_name),
                          share->table_base_name,
                          share->table_base_name_length);
  query.append(escaped_table_base_name);
  query.append("'");

  error_code= ER_QUERY_ON_MASTER;
  if (mysql_real_query(mysql, query.ptr(), query.length()))
  {
    /* Fetch the message before the handle is closed in the error path */
    my_sprintf(error_buffer, (error_buffer, "%s", mysql_error(mysql)));
    goto error;
  }

  result= mysql_store_result(mysql);
  if (! result)
  {
    my_sprintf(error_buffer, (error_buffer, "%s", mysql_error(mysql)));
    goto error;
  }

  /* if ! mysql_num_rows, the table doesn't exist, send error */
  if (! mysql_num_rows(result))
  {
    my_sprintf(error_buffer,
               (error_buffer, "foreign table '%s' does not exist!",
                share->table_base_name));
    goto error;
  }

  mysql_free_result(result);
  mysql_close(mysql);
  DBUG_RETURN(0);

error:
  if (result)
    mysql_free_result(result);
  mysql_close(mysql);
  my_error(error_code, MYF(0), error_buffer);
  DBUG_RETURN(error_code);
}


511
/*
512
  Parse connection info from table->s->comment
513 514 515

  SYNOPSIS
    parse_url()
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
516 517 518
      share               pointer to FEDERATED share
      table               pointer to current TABLE class
      table_create_flag   determines what error to throw
519

520 521
  DESCRIPTION
    populates the share with information about the connection
522
    to the foreign database that will serve as the data source.
523
    This string must be specified (currently) in the "comment" field,
524
    listed in the CREATE TABLE statement.
525 526 527

    This string MUST be in the format of any of these:

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
528 529 530 531
    scheme://username:password@hostname:port/database/table
    scheme://username@hostname/database/table
    scheme://username@hostname:port/database/table
    scheme://username:password@hostname/database/table
532 533 534 535 536 537 538

  An Example:

  mysql://joe:joespass@192.168.1.111:9308/federated/testtable

  ***IMPORTANT***
  Currently, only "mysql://" is supported.
539 540 541 542 543

    'password' and 'port' are both optional.

  RETURN VALUE
    0   success
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
544
    1  failure, wrong string format
545 546

*/
547

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
548 549
static int parse_url(FEDERATED_SHARE *share, TABLE *table,
                     uint table_create_flag)
550
{
551 552
  uint error_num= (table_create_flag ? ER_CANT_CREATE_TABLE :
                   ER_CONNECT_TO_MASTER);
553
  DBUG_ENTER("ha_federated::parse_url");
554

555 556
  share->port= 0;
  share->socket= 0;
557
  share->scheme= my_strdup(table->s->comment, MYF(0));
558

559
  if ((share->username= strstr(share->scheme, "://")))
560
  {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
561
    share->scheme[share->username - share->scheme]= '\0';
562

563
    if (strcmp(share->scheme, "mysql") != 0)
564 565
      goto error;

566
    share->username+= 3;
567

568
    if ((share->hostname= strchr(share->username, '@')))
569
    {
570 571
      share->username[share->hostname - share->username]= '\0';
      share->hostname++;
572

573
      if ((share->password= strchr(share->username, ':')))
574
      {
575 576 577
        share->username[share->password - share->username]= '\0';
        share->password++;
        share->username= share->username;
578
        /* make sure there isn't an extra / or @ */
579
        if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
580 581 582 583 584
          goto error;
        /*
          Found that if the string is:
          user:@hostname:port/database/table
          Then password is a null string, so set to NULL
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
585
        */
586
        if ((share->password[0] == '\0'))
587
          share->password= NULL;
588 589
      }
      else
590 591
        share->username= share->username;

592
      /* make sure there isn't an extra / or @ */
593
      if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
594
        goto error;
595

596
      if ((share->database= strchr(share->hostname, '/')))
597
      {
598 599 600
        share->hostname[share->database - share->hostname]= '\0';
        share->database++;

601
        if ((share->sport= strchr(share->hostname, ':')))
602 603 604 605 606 607 608 609 610
        {
          share->hostname[share->sport - share->hostname]= '\0';
          share->sport++;
          if (share->sport[0] == '\0')
            share->sport= NULL;
          else
            share->port= atoi(share->sport);
        }

611
        if ((share->table_base_name= strchr(share->database, '/')))
612 613 614 615 616
        {
          share->database[share->table_base_name - share->database]= '\0';
          share->table_base_name++;
        }
        else
617
          goto error;
618 619

        share->table_base_name_length= strlen(share->table_base_name);
620 621
      }
      else
622 623
        goto error;
      /* make sure there's not an extra / */
624
      if ((strchr(share->table_base_name, '/')))
625 626
        goto error;

627 628 629
      if (share->hostname[0] == '\0')
        share->hostname= NULL;

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
630 631
      if (!share->port)
      {
632 633
        if (strcmp(share->hostname, my_localhost) == 0)
          share->socket= my_strdup(MYSQL_UNIX_ADDR, MYF(0));
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
634
        else
635
          share->port= MYSQL_PORT;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
636 637
      }

638
      DBUG_PRINT("info",
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
639
                 ("scheme %s username %s password %s \
640 641
                  hostname %s port %d database %s tablename %s\n",
                  share->scheme, share->username, share->password,
642 643
                  share->hostname, share->port, share->database,
                  share->table_base_name));
644 645
    }
    else
646
      goto error;
647 648
  }
  else
649 650 651 652 653
    goto error;

  DBUG_RETURN(0);

error:
654
    my_error(error_num, MYF(0),
monty@mysql.com's avatar
monty@mysql.com committed
655
             "connection string is not in the correct format",0);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
656
    DBUG_RETURN(1);
657

658 659
}

660

661
/*
662 663 664 665
  Convert MySQL result set row to handler internal format

  SYNOPSIS
    convert_row_to_internal_format()
666
      record    Byte pointer to record
667 668 669
      row       MySQL result set row from fetchrow()

  DESCRIPTION
670
    This method simply iterates through a row returned via fetchrow with
671 672
    values from a successful SELECT , and then stores each column's value
    in the field object via the field object pointer (pointing to the table's
673
    array of field object pointers). This is how the handler needs the data
674 675 676
    to be stored to then return results back to the user

  RETURN VALUE
677
    0   After fields have had field values stored from record
678
 */
679

680 681
uint ha_federated::convert_row_to_internal_format(byte *record, MYSQL_ROW row)
{
682 683 684
  ulong *lengths;
  uint num_fields;
  uint x= 0;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
685

686 687
  DBUG_ENTER("ha_federated::convert_row_to_internal_format");

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
688
  num_fields= mysql_num_fields(result);
689
  lengths= mysql_fetch_lengths(result);
690

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
691
  memset(record, 0, table->s->null_bytes);
692

693
  for (Field **field= table->field; *field; field++, x++)
694
  {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
695
    if (!row[x])
696 697
      (*field)->set_null();
    else
698
      (*field)->store(row[x], lengths[x], &my_charset_bin);
699 700 701 702 703
  }

  DBUG_RETURN(0);
}

704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725
/*
  Create a WHERE clause based off of values in keys
  Note: This code was inspired by key_copy from key.cc

  SYNOPSIS
    create_where_from_key ()
      to          String object to store WHERE clause
      key_info    KEY struct pointer
      key         byte pointer containing key
      key_length  length of key

  DESCRIPTION
    Using iteration through all the keys via a KEY_PART_INFO pointer,
    This method 'extracts' the value of each key in the byte pointer
    *key, and for each key found, constructs an appropriate WHERE clause

  RETURN VALUE
    0   After all keys have been accounted for to create the WHERE clause
    1   No keys found

  */

monty@mysql.com's avatar
monty@mysql.com committed
726 727
bool ha_federated::create_where_from_key(String *to, KEY *key_info,
                                         const byte *key, uint key_length)
728
{
729 730
  uint second_loop= 0;
  KEY_PART_INFO *key_part;
731
  bool needs_quotes;
monty@mysql.com's avatar
monty@mysql.com committed
732
  String tmp;
733

734
  DBUG_ENTER("ha_federated::create_where_from_key");
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
735
  for (key_part= key_info->key_part; (int) key_length > 0; key_part++)
736
  {
737
    Field *field= key_part->field;
738
    needs_quotes= field->needs_quotes();
739 740
    uint length= key_part->length;

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
741
    if (second_loop++ && to->append(" AND ", 5))
742
      DBUG_RETURN(1);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
743 744
    if (to->append('`') || to->append(field->field_name) || to->append("` ", 2))
      DBUG_RETURN(1);                           // Out of memory
745 746 747 748 749

    if (key_part->null_bit)
    {
      if (*key++)
      {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
750
        if (to->append("IS NULL", 7))
751
          DBUG_RETURN(1);
752

753
        DBUG_PRINT("info",
754
                   ("NULL type %s", to->c_ptr_quick()));
755
        key_length-= key_part->store_length;
756
        key+= key_part->store_length - 1;
757 758 759 760
        continue;
      }
      key_length--;
    }
761 762 763
    if (to->append("= "))
      DBUG_RETURN(1);
    if (needs_quotes && to->append("'"))
764 765 766
      DBUG_RETURN(1);
    if (key_part->type == HA_KEYTYPE_BIT)
    {
767
      /* This is can be treated as a hex string */
768
      Field_bit *field= (Field_bit *) (key_part->field);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
769
      char buff[64 + 2], *ptr;
770
      byte *end= (byte*)(key)+length;
771

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
772 773 774
      buff[0]= '0';
      buff[1]= 'x';
      for (ptr= buff + 2; key < end; key++)
775
      {
776
        uint tmp= (uint)(uchar) *key;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
777 778
        *ptr++= _dig_vec_upper[tmp >> 4];
        *ptr++= _dig_vec_upper[tmp & 15];
779
      }
780
      if (to->append(buff, (uint)(ptr - buff)))
781
        DBUG_RETURN(1);
782

783 784 785 786 787 788 789 790
      key_length-= length;
      continue;
    }
    if (key_part->key_part_flag & HA_BLOB_PART)
    {
      uint blob_length= uint2korr(key);
      key+= HA_KEY_BLOB_LENGTH;
      key_length-= HA_KEY_BLOB_LENGTH;
monty@mysql.com's avatar
monty@mysql.com committed
791 792 793

      tmp.set_quick((char*) key, blob_length, &my_charset_bin);
      if (append_escaped(to, &tmp))
794
        DBUG_RETURN(1);
795

796 797 798 799
      length= key_part->length;
    }
    else if (key_part->key_part_flag & HA_VAR_LENGTH_PART)
    {
800
      length= uint2korr(key);
801
      key+= HA_KEY_BLOB_LENGTH;
monty@mysql.com's avatar
monty@mysql.com committed
802 803
      tmp.set_quick((char*) key, length, &my_charset_bin);
      if (append_escaped(to, &tmp))
804 805 806 807 808 809
        DBUG_RETURN(1);
    }
    else
    {
      char buff[MAX_FIELD_WIDTH];
      String str(buff, sizeof(buff), field->charset()), *res;
810

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
811
      res= field->val_str(&str, (char*) (key));
812 813
      if (field->result_type() == STRING_RESULT)
      {
monty@mysql.com's avatar
monty@mysql.com committed
814
        if (append_escaped(to, res))
815
          DBUG_RETURN(1);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
816
        res= field->val_str(&str, (char*) (key));
817 818 819 820
      }
      else if (to->append(res->ptr(), res->length()))
        DBUG_RETURN(1);
    }
821 822
    if (needs_quotes && to->append("'"))
      DBUG_RETURN(1);
823
    DBUG_PRINT("info",
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
824
               ("final value for 'to' %s", to->c_ptr_quick()));
825 826
    key+= length;
    key_length-= length;
827 828
    DBUG_RETURN(0);
  }
829
  DBUG_RETURN(1);
830 831 832 833 834 835 836
}

/*
  Example of simple lock controls. The "share" it creates is structure we will
  pass to each federated handler. Do you have to have one of these? Well, you
  have pieces that are used for locking, and they are needed to function.
*/
837

838 839 840 841 842 843 844 845 846 847
static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
{
  FEDERATED_SHARE *share;
  char query_buffer[IO_SIZE];
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
  query.length(0);

  uint table_name_length, table_base_name_length;
  char *tmp_table_name, *tmp_table_base_name, *table_base_name, *select_query;

848
  /* share->table_name has the file location - we want the table's name!  */
849
  table_base_name= (char*) table->s->table_name;
850
  DBUG_PRINT("info", ("table_name %s", table_base_name));
851
  /*
852 853 854 855
    So why does this exist? There is no way currently to init a storage engine.
    Innodb and BDB both have modifications to the server to allow them to
    do this. Since you will not want to do this, this is probably the next
    best method.
856 857 858 859 860
  */
  pthread_mutex_lock(&federated_mutex);
  table_name_length= (uint) strlen(table_name);
  table_base_name_length= (uint) strlen(table_base_name);

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
861
  if (!(share= (FEDERATED_SHARE *) hash_search(&federated_open_tables,
862
                                               (byte*) table_name,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
863
                                               table_name_length)))
864 865
  {
    query.set_charset(system_charset_info);
866
    query.append("SELECT * FROM `");
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
867

868 869 870
    if (!(share= (FEDERATED_SHARE *)
          my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                          &share, sizeof(*share),
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
871
                          &tmp_table_name, table_name_length + 1,
872 873
                          &select_query, query.length() +
                          strlen(table->s->comment) + 1,  NullS)))
874 875 876 877 878
    {
      pthread_mutex_unlock(&federated_mutex);
      return NULL;
    }

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
879 880 881
    if (parse_url(share, table, 0))
      goto error;

882 883
    query.append(share->table_base_name);
    query.append("`");
884 885 886 887
    share->use_count= 0;
    share->table_name_length= table_name_length;
    share->table_name= tmp_table_name;
    share->select_query= select_query;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
888 889
    strmov(share->table_name, table_name);
    strmov(share->select_query, query.ptr());
890
    DBUG_PRINT("info",
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
891
               ("share->select_query %s", share->select_query));
892 893 894
    if (my_hash_insert(&federated_open_tables, (byte*) share))
      goto error;
    thr_lock_init(&share->lock);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
895
    pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
896 897 898 899 900 901 902 903
  }
  share->use_count++;
  pthread_mutex_unlock(&federated_mutex);

  return share;

error:
  pthread_mutex_unlock(&federated_mutex);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
904 905
  if (share->scheme)
    my_free((gptr) share->scheme, MYF(0));
906 907 908 909 910 911 912
  my_free((gptr) share, MYF(0));

  return NULL;
}


/*
913
  Free lock controls. We call this whenever we close a table.
914 915 916
  If the table had the last reference to the share then we
  free memory associated with it.
*/
917

918 919 920
static int free_share(FEDERATED_SHARE *share)
{
  pthread_mutex_lock(&federated_mutex);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
921

922 923
  if (!--share->use_count)
  {
924 925 926
    if (share->scheme)
      my_free((gptr) share->scheme, MYF(0));

927 928
    hash_delete(&federated_open_tables, (byte*) share);
    thr_lock_delete(&share->lock);
929
    VOID(pthread_mutex_destroy(&share->mutex));
930 931 932 933 934 935 936 937 938 939
    my_free((gptr) share, MYF(0));
  }
  pthread_mutex_unlock(&federated_mutex);

  return 0;
}


/*
  If frm_error() is called then we will use this to find out
  what file extensions exist for the storage engine. This is
  also used by the default rename_table and delete_table method
  in handler.cc.
*/
944 945 946
static const char *ha_federated_exts[] = {
  NullS   /* only the terminator: no file extensions are listed */
};
947

948
const char **ha_federated::bas_ext() const
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
949
{
950
  return ha_federated_exts;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
951
}
952 953 954 955 956 957 958 959 960 961 962 963


/*
  Used for opening tables. The name will be the name of the file.
  A table is opened when it needs to be opened. For instance
  when a request comes in for a select on the table (tables are not
  open and closed for each request, they are cached).

  Called from handler.cc by handler::ha_open(). The server opens
  all tables by calling ha_open() which then calls the handler
  specific open().
*/
964

965 966 967
int ha_federated::open(const char *name, int mode, uint test_if_locked)
{
  int rc;
968
  DBUG_ENTER("ha_federated::open");
969 970 971

  if (!(share= get_share(name, table)))
    DBUG_RETURN(1);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
972
  thr_lock_data_init(&share->lock, &lock, NULL);
973

974
  /* Connect to foreign database mysql_real_connect() */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
975
  mysql= mysql_init(0);
976 977 978 979 980
  DBUG_PRINT("info", ("hostname %s", share->hostname));
  DBUG_PRINT("info", ("username %s", share->username));
  DBUG_PRINT("info", ("password %s", share->password));
  DBUG_PRINT("info", ("database %s", share->database));
  DBUG_PRINT("info", ("port %d", share->port));
981
  if (!mysql_real_connect(mysql,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
982 983 984
                          share->hostname,
                          share->username,
                          share->password,
985 986 987
                          share->database,
                          share->port,
                          share->socket, 0))
988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005
  {
    my_error(ER_CONNECT_TO_MASTER, MYF(0), mysql_error(mysql));
    DBUG_RETURN(ER_CONNECT_TO_MASTER);
  }
  DBUG_RETURN(0);
}


/*
  Closes a table. We call the free_share() function to free any resources
  that we have allocated in the "shared" structure.

  Called from sql_base.cc, sql_select.cc, and table.cc.
  In sql_select.cc it is only used to close up temporary tables or during
  the process where a temporary table is converted over to being a
  myisam table.
  For sql_base.cc look at close_data_tables().
*/
1006

1007 1008 1009
int ha_federated::close(void)
{
  DBUG_ENTER("ha_federated::close");
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1010

1011
  /* free the result set */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1012 1013
  if (result)
  {
1014
    DBUG_PRINT("info",
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1015 1016 1017 1018
               ("mysql_free_result result at address %lx", result));
    mysql_free_result(result);
    result= 0;
  }
1019 1020 1021 1022 1023 1024 1025 1026
  /* Disconnect from mysql */
  mysql_close(mysql);
  DBUG_RETURN(free_share(share));

}

/*

1027
  Checks if a field in a record is SQL NULL.
1028 1029 1030 1031 1032 1033 1034 1035 1036

  SYNOPSIS
    field_in_record_is_null()
      table     TABLE pointer, MySQL table object
      field     Field pointer, MySQL field object
      record    char pointer, contains record

    DESCRIPTION
      This method uses the record format information in table to track
1037
      the null bit in record.
1038 1039 1040

    RETURN VALUE
      1    if NULL
1041
      0    otherwise
1042
*/
1043

1044 1045 1046
inline uint field_in_record_is_null(TABLE *table,
                                    Field *field,
                                    char *record)
1047 1048 1049 1050 1051 1052 1053
{
  int null_offset;
  DBUG_ENTER("ha_federated::field_in_record_is_null");

  if (!field->null_ptr)
    DBUG_RETURN(0);

1054
  null_offset= (uint) ((char*)field->null_ptr - (char*)table->record[0]);
1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074

  if (record[null_offset] & field->null_bit)
    DBUG_RETURN(1);

  DBUG_RETURN(0);
}

/*
  write_row() inserts a row. No extra() hint is given currently if a bulk load
  is happeneding. buf() is a byte array of data. You can use the field
  information to extract the data from the native byte array type.
  Example of this would be:
  for (Field **field=table->field ; *field ; field++)
  {
    ...
  }

  Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
  sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
*/
1075

1076
int ha_federated::write_row(byte *buf)
1077
{
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1078
  uint x= 0, num_fields= 0;
1079
  Field **field;
1080
  ulong current_query_id= 1;
1081
  ulong tmp_query_id= 1;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1082
  uint all_fields_have_same_query_id= 1;
1083 1084 1085 1086

  char insert_buffer[IO_SIZE];
  char values_buffer[IO_SIZE], insert_field_value_buffer[IO_SIZE];

1087
  /* The main insert query string */
1088 1089
  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
  insert_string.length(0);
1090
  /* The string containing the values to be added to the insert */
1091 1092
  String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
  values_string.length(0);
1093
  /* The actual value of the field, to be added to the values_string */
1094
  String insert_field_value_string(insert_field_value_buffer,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1095 1096
                                   sizeof(insert_field_value_buffer),
                                   &my_charset_bin);
1097 1098 1099
  insert_field_value_string.length(0);

  DBUG_ENTER("ha_federated::write_row");
1100
  DBUG_PRINT("info", ("table charset name %s csname %s",
1101
                                         table->s->table_charset->name, table->s->table_charset->csname));
1102

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1103
  statistic_increment(table->in_use->status_var.ha_write_count, &LOCK_status);
1104 1105 1106 1107
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();

  /*
1108 1109 1110
    get the current query id - the fields that we add to the insert
    statement to send to the foreign will not be appended unless they match
    this query id
1111 1112
  */
  current_query_id= table->in_use->query_id;
1113
  DBUG_PRINT("info", ("current query id %d", current_query_id));
1114

1115
  /* start off our string */
1116
  insert_string.append("INSERT INTO `");
1117
  insert_string.append(share->table_base_name);
1118
  insert_string.append("`");
1119
  /* start both our field and field values strings */
1120 1121 1122
  insert_string.append(" (");
  values_string.append(" VALUES (");

1123 1124 1125 1126
  /*
    Even if one field is different, all_fields_same_query_id can't remain
    0 if it remains 0, then that means no fields were specified in the query
    such as in the case of INSERT INTO table VALUES (val1, val2, valN)
1127
  */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1128
  for (field= table->field; *field; field++, x++)
1129 1130 1131 1132 1133 1134 1135
  {
    if (x > 0 && tmp_query_id != (*field)->query_id)
      all_fields_have_same_query_id= 0;

    tmp_query_id= (*field)->query_id;
  }
  /*
1136 1137
    loop through the field pointer array, add any fields to both the values
    list and the fields list that match the current query id
1138
  */
1139
  x=0;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1140
  for (field= table->field; *field; field++, x++)
1141
  {
1142
    /* if there is a query id and if it's equal to the current query id */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1143 1144
    if (((*field)->query_id && (*field)->query_id == current_query_id)
        || all_fields_have_same_query_id)
1145 1146
    {
      num_fields++;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1147

1148 1149
      if ((*field)->is_null())
      {
1150 1151 1152
        DBUG_PRINT("info",
                   ("column %d current query id %d field is_null query id %d",
                    x, current_query_id, (*field)->query_id));
1153 1154 1155 1156
        insert_field_value_string.append("NULL");
      }
      else
      {
1157 1158 1159
        DBUG_PRINT("info",
                   ("column %d current query id %d field is not null query ID %d",
                    x, current_query_id, (*field)->query_id));
1160
        (*field)->val_str(&insert_field_value_string);
1161
        /* quote these fields if they require it */
1162
        (*field)->quote_data(&insert_field_value_string); }
1163
      /* append the field name */
1164 1165
      insert_string.append((*field)->field_name);

1166
      /* append the value */
1167 1168 1169
      values_string.append(insert_field_value_string);
      insert_field_value_string.length(0);

1170
      /* append commas between both fields and fieldnames */
1171 1172 1173 1174 1175 1176 1177
      insert_string.append(',');
      values_string.append(',');

    }
  }

  /*
1178 1179 1180 1181
    chop of the trailing comma, or if there were no fields, a '('
    So, "INSERT INTO foo (" becomes "INSERT INTO foo "
    or, with fields, "INSERT INTO foo (field1, field2," becomes
    "INSERT INTO foo (field1, field2"
1182 1183 1184 1185
  */
  insert_string.chop();

  /*
1186 1187 1188
    if there were no fields, we don't want to add a closing paren
    AND, we don't want to chop off the last char '('
    insert will be "INSERT INTO t1 VALUES ();"
1189
  */
1190
  DBUG_PRINT("info", ("x %d  num fields %d", x, num_fields));
1191 1192
  if (num_fields > 0)
  {
1193
    /* chops off leading commas */
1194 1195 1196
    values_string.chop();
    insert_string.append(')');
  }
1197
  /* we always want to append this, even if there aren't any fields */
1198
  values_string.append(')');
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1199

1200
  /* add the values */
1201 1202
  insert_string.append(values_string);

1203
  DBUG_PRINT("info", ("insert query %s", insert_string.c_ptr_quick()));
1204

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1205
  if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
1206
  {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1207 1208
    my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql));
    DBUG_RETURN(ER_QUERY_ON_MASTER);
1209 1210 1211 1212 1213
  }

  DBUG_RETURN(0);
}

1214

1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230
/*
  Yes, update_row() does what you expect, it updates a row. old_data will have
  the previous row record in it, while new_data will have the newest data in
  it.

  Keep in mind that the server can do updates based on ordering if an ORDER BY
  clause was used. Consecutive ordering is not guarenteed.
  Currently new_data will not have an updated auto_increament record, or
  and updated timestamp field. You can do these for federated by doing these:
  if (table->timestamp_on_update_now)
    update_timestamp(new_row+table->timestamp_on_update_now-1);
  if (table->next_number_field && record == table->record[0])
    update_auto_increment();

  Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
*/
1231

1232
int ha_federated::update_row(const byte *old_data, byte *new_data)
1233
{
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1234
  uint x= 0;
1235
  uint has_a_primary_key= 0;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1236
  uint primary_key_field_num;
1237 1238 1239
  char old_field_value_buffer[IO_SIZE], new_field_value_buffer[IO_SIZE];
  char update_buffer[IO_SIZE], where_buffer[IO_SIZE];

1240
  /* stores the value to be replaced of the field were are updating */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1241 1242
  String old_field_value(old_field_value_buffer, sizeof(old_field_value_buffer),
                         &my_charset_bin);
1243
  /* stores the new value of the field */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1244 1245
  String new_field_value(new_field_value_buffer, sizeof(new_field_value_buffer),
                         &my_charset_bin);
1246
  /* stores the update query */
1247
  String update_string(update_buffer, sizeof(update_buffer), &my_charset_bin);
1248
  /* stores the WHERE clause */
1249 1250 1251
  String where_string(where_buffer, sizeof(where_buffer), &my_charset_bin);

  DBUG_ENTER("ha_federated::update_row");
1252 1253 1254 1255
  old_field_value.length(0);
  new_field_value.length(0);
  update_string.length(0);
  where_string.length(0);
1256

1257
  has_a_primary_key= (table->s->primary_key == 0 ? 1 : 0);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1258 1259
  primary_key_field_num= has_a_primary_key ?
    table->key_info[table->s->primary_key].key_part->fieldnr - 1 : -1;
1260
  if (has_a_primary_key)
1261
    DBUG_PRINT("info", ("has a primary key"));
1262

1263
  update_string.append("UPDATE `");
1264
  update_string.append(share->table_base_name);
1265
  update_string.append("`");
1266 1267 1268
  update_string.append(" SET ");

/*
1269 1270
  In this loop, we want to match column names to values being inserted
  (while building INSERT statement).
1271

1272 1273 1274 1275
  Iterate through table->field (new data) and share->old_filed (old_data)
  using the same index to created an SQL UPDATE statement, new data is
  used to create SET field=value and old data is used to create WHERE
  field=oldvalue
1276
 */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1277

1278
  for (Field **field= table->field; *field; field++, x++)
1279 1280
  {
    /*
1281 1282 1283 1284 1285
      In all of these tests for 'has_a_primary_key', what I'm trying to
      accomplish is to only use the primary key in the WHERE clause if the
      table has a primary key, as opposed to a table without a primary key
      in which case we have to use all the fields to create a WHERE clause
      using the old/current values, as well as adding a LIMIT statement
1286
    */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1287
    if (has_a_primary_key)
1288
    {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1289
      if (x == primary_key_field_num)
1290 1291 1292 1293 1294 1295 1296 1297 1298
        where_string.append((*field)->field_name);
    }
    else
      where_string.append((*field)->field_name);

    update_string.append((*field)->field_name);
    update_string.append('=');

    if ((*field)->is_null())
1299 1300
    {
      DBUG_PRINT("info", ("column %d is NULL", x ));
1301
      new_field_value.append("NULL");
1302
    }
1303 1304
    else
    {
1305
      /* otherwise = */
1306
      (*field)->val_str(&new_field_value);
1307
      (*field)->quote_data(&new_field_value);
1308

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1309
      if (has_a_primary_key)
1310 1311 1312 1313
      {
        if (x == primary_key_field_num)
          where_string.append("=");
      }
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1314 1315
      else if (!field_in_record_is_null(table, *field, (char*) old_data))
        where_string.append("=");
1316 1317
    }

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1318
    if (has_a_primary_key)
1319 1320 1321 1322
    {
      if (x == primary_key_field_num)
      {
        (*field)->val_str(&old_field_value,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1323
                          (char*) (old_data + (*field)->offset()));
1324
        (*field)->quote_data(&old_field_value);
1325 1326 1327 1328 1329
        where_string.append(old_field_value);
      }
    }
    else
    {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1330 1331 1332 1333
      if (field_in_record_is_null(table, *field, (char*) old_data))
        where_string.append(" IS NULL ");
      else
      {
1334
        uint o_len;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1335 1336
        (*field)->val_str(&old_field_value,
                          (char*) (old_data + (*field)->offset()));
1337 1338
        o_len= (*field)->pack_length();
        DBUG_PRINT("info", ("o_len %lu", o_len));
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1339 1340 1341
        (*field)->quote_data(&old_field_value);
        where_string.append(old_field_value);
      }
1342
    }
1343 1344 1345
    DBUG_PRINT("info",
               ("column %d new value %s old value %s",
                x, new_field_value.c_ptr_quick(), old_field_value.c_ptr_quick() ));
1346 1347 1348
    update_string.append(new_field_value);
    new_field_value.length(0);

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1349
    if (x + 1 < table->s->fields)
1350 1351
    {
      update_string.append(", ");
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1352
      if (!has_a_primary_key)
1353 1354 1355 1356 1357
        where_string.append(" AND ");
    }
    old_field_value.length(0);
  }
  update_string.append(" WHERE ");
1358
  update_string.append(where_string);
1359 1360 1361
  if (! has_a_primary_key)
    update_string.append(" LIMIT 1");

1362
  DBUG_PRINT("info", ("Final update query: %s",
1363
                                          update_string.c_ptr_quick()));
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1364
  if (mysql_real_query(mysql, update_string.ptr(), update_string.length()))
1365
  {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1366 1367
    my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql));
    DBUG_RETURN(ER_QUERY_ON_MASTER);
1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383
  }


  DBUG_RETURN(0);
}

/*
  This will delete a row. 'buf' will contain a copy of the row to be deleted.
  The server will call this right after the current row has been called (from
  either a previous rnd_nexT() or index call).
  If you keep a pointer to the last row or can access a primary key it will
  make doing the deletion quite a bit easier.
  Keep in mind that the server does no guarentee consecutive deletions.
  ORDER BY clauses can be used.

  Called in sql_acl.cc and sql_udf.cc to manage internal table information.
1384
  Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select
1385 1386 1387
  it is used for removing duplicates while in insert it is used for REPLACE
  calls.
*/
1388

1389
int ha_federated::delete_row(const byte *buf)
1390
{
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1391
  uint x= 0;
1392 1393 1394 1395 1396 1397 1398 1399 1400 1401
  char delete_buffer[IO_SIZE];
  char data_buffer[IO_SIZE];

  String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
  delete_string.length(0);
  String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
  data_string.length(0);

  DBUG_ENTER("ha_federated::delete_row");

1402
  delete_string.append("DELETE FROM `");
1403
  delete_string.append(share->table_base_name);
1404
  delete_string.append("`");
1405 1406
  delete_string.append(" WHERE ");

1407
  for (Field **field= table->field; *field; field++, x++)
1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419
  {
    delete_string.append((*field)->field_name);

    if ((*field)->is_null())
    {
      delete_string.append(" IS ");
      data_string.append("NULL");
    }
    else
    {
      delete_string.append("=");
      (*field)->val_str(&data_string);
1420
      (*field)->quote_data(&data_string);
1421
    }
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1422

1423
    delete_string.append(data_string);
1424
    data_string.length(0);
1425

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1426
    if (x + 1 < table->s->fields)
1427 1428 1429 1430
      delete_string.append(" AND ");
  }

  delete_string.append(" LIMIT 1");
1431
  DBUG_PRINT("info",
1432
             ("Delete sql: %s", delete_string.c_ptr_quick()));
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1433
  if (mysql_real_query(mysql, delete_string.ptr(), delete_string.length()))
1434
  {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1435 1436
    my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql));
    DBUG_RETURN(ER_QUERY_ON_MASTER);
1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448
  }

  DBUG_RETURN(0);
}


/*
  Positions an index cursor to the index specified in the handle. Fetches the
  row if available. If the key value is null, begin at the first key of the
  index. This method, which is called in the case of an SQL statement having
  a WHERE clause on a non-primary key index, simply calls index_read_idx.
*/
1449

1450
int ha_federated::index_read(byte *buf, const byte *key,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1451 1452 1453
                             uint key_len __attribute__ ((unused)),
                             enum ha_rkey_function find_flag
                             __attribute__ ((unused)))
1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464
{
  DBUG_ENTER("ha_federated::index_read");
  DBUG_RETURN(index_read_idx(buf, active_index, key, key_len, find_flag));
}


/*
  Positions an index cursor to the index specified in key. Fetches the
  row if any.  This is only used to read whole keys.

  This method is called via index_read in the case of a WHERE clause using
1465
  a regular non-primary key index, OR is called DIRECTLY when the WHERE clause
1466 1467
  uses a PRIMARY KEY index.
*/
1468

1469
int ha_federated::index_read_idx(byte *buf, uint index, const byte *key,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1470 1471 1472
                                 uint key_len __attribute__ ((unused)),
                                 enum ha_rkey_function find_flag
                                 __attribute__ ((unused)))
1473 1474
{
  char index_value[IO_SIZE];
1475 1476
  char key_value[IO_SIZE];
  char test_value[IO_SIZE];
1477 1478
  String index_string(index_value, sizeof(index_value), &my_charset_bin);
  index_string.length(0);
1479
  uint keylen;
1480 1481 1482 1483 1484 1485

  char sql_query_buffer[IO_SIZE];
  String sql_query(sql_query_buffer, sizeof(sql_query_buffer), &my_charset_bin);
  sql_query.length(0);

  DBUG_ENTER("ha_federated::index_read_idx");
1486

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1487 1488
  statistic_increment(table->in_use->status_var.ha_read_key_count,
                      &LOCK_status);
1489 1490 1491 1492

  sql_query.append(share->select_query);
  sql_query.append(" WHERE ");

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1493
  keylen= strlen((char*) (key));
1494
  create_where_from_key(&index_string, &table->key_info[index], key, keylen);
1495 1496
  sql_query.append(index_string);

1497
  DBUG_PRINT("info",
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1498
             ("current key %d key value %s index_string value %s length %d",
1499
              index, (char*) key, index_string.c_ptr_quick(),
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1500
              index_string.length()));
1501

1502
  DBUG_PRINT("info",
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1503 1504
             ("current position %d sql_query %s", current_position,
              sql_query.c_ptr_quick()));
1505

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1506
  if (result)
1507
  {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1508 1509 1510 1511
    mysql_free_result(result);
    result= 0;
  }
  if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length()))
1512
  {
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1513 1514
    my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql));
    DBUG_RETURN(ER_QUERY_ON_MASTER);
1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526
  }
  result= mysql_store_result(mysql);

  if (!result)
  {
    table->status= STATUS_NOT_FOUND;
    DBUG_RETURN(HA_ERR_END_OF_FILE);
  }

  if (mysql_errno(mysql))
  {
    table->status= STATUS_NOT_FOUND;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1527
    DBUG_RETURN(mysql_errno(mysql));
1528 1529 1530 1531 1532
  }

  DBUG_RETURN(rnd_next(buf));
}

1533
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
1534 1535 1536 1537
int ha_federated::index_init(uint keynr)
{
  int error;
  DBUG_ENTER("ha_federated::index_init");
1538
  DBUG_PRINT("info",
1539
             ("table: '%s'  key: %d", table->s->table_name, keynr));
1540 1541 1542 1543
  active_index= keynr;
  DBUG_RETURN(0);
}

1544
/* Used to read forward through the index.  */
1545
int ha_federated::index_next(byte *buf)
1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563
{
  DBUG_ENTER("ha_federated::index_next");
  DBUG_RETURN(rnd_next(buf));
}


/*
  rnd_init() is called when the system wants the storage engine to do a table
  scan.

  This is the method that gets data for the SELECT calls.

  See the federated in the introduction at the top of this file to see when
  rnd_init() is called.

  Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
  sql_table.cc, and sql_update.cc.
*/
1564

1565 1566 1567 1568 1569
int ha_federated::rnd_init(bool scan)
{
  DBUG_ENTER("ha_federated::rnd_init");
  int num_fields, rows;

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1570
  /*
1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602
    This 'scan' flag is incredibly important for this handler to work
    properly, especially with updates containing WHERE clauses using
    indexed columns.

    When the initial query contains a WHERE clause of the query using an
    indexed column, it's index_read_idx that selects the exact record from
    the foreign database.

    When there is NO index in the query, either due to not having a WHERE
    clause, or the WHERE clause is using columns that are not indexed, a
    'full table scan' done by rnd_init, which in this situation simply means
    a 'select * from ...' on the foreign table.

    In other words, this 'scan' flag gives us the means to ensure that if
    there is an index involved in the query, we want index_read_idx to
    retrieve the exact record (scan flag is 0), and do not  want rnd_init
    to do a 'full table scan' and wipe out that result set.

    Prior to using this flag, the problem was most apparent with updates.

    An initial query like 'UPDATE tablename SET anything = whatever WHERE
    indexedcol = someval', index_read_idx would get called, using a query
    constructed with a WHERE clause built from the values of index ('indexcol'
    in this case, having a value of 'someval').  mysql_store_result would
    then get called (this would be the result set we want to use).

    After this rnd_init (from sql_update.cc) would be called, it would then
    unecessarily call "select * from table" on the foreign table, then call
    mysql_store_result, which would wipe out the correct previous result set
    from the previous call of index_read_idx's that had the result set
    containing the correct record, hence update the wrong row!

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1603 1604 1605
  */
  scan_flag= scan;
  if (scan)
1606
  {
1607
    DBUG_PRINT("info", ("share->select_query %s", share->select_query));
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1608 1609
    if (result)
    {
1610
      DBUG_PRINT("info",
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1611 1612 1613 1614
                 ("mysql_free_result address %lx", result));
      mysql_free_result(result);
      result= 0;
    }
1615

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626
    if (mysql_real_query
        (mysql, share->select_query, strlen(share->select_query)))
    {
      my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql));
      DBUG_RETURN(ER_QUERY_ON_MASTER);
    }
    result= mysql_store_result(mysql);

    if (mysql_errno(mysql))
      DBUG_RETURN(mysql_errno(mysql));
  }
1627 1628 1629 1630 1631 1632
  DBUG_RETURN(0);
}

int ha_federated::rnd_end()
{
  DBUG_ENTER("ha_federated::rnd_end");
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1633 1634
  if (result)
  {
1635
    DBUG_PRINT("info", ("mysql_free_result address %lx", result));
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1636 1637 1638 1639
    mysql_free_result(result);
    result= 0;
  }

1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659
  mysql_free_result(result);
  DBUG_RETURN(index_end());
}

/*
  Ends index usage by resetting active_index to MAX_KEY.

  RETURN
    0 always.
*/
int ha_federated::index_end(void)
{
  DBUG_ENTER("ha_federated::index_end");
  active_index= MAX_KEY;
  DBUG_RETURN(0);
}

/*
  This is called for each row of the table scan. When you run out of records
  you should return HA_ERR_END_OF_FILE. Fill buf with the row information.
  The Field structure for the table is the key to getting data into buf
  in a manner that will allow the server to understand it.

  Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
  sql_table.cc, and sql_update.cc.
*/

int ha_federated::rnd_next(byte *buf)
{
  MYSQL_ROW row;
  DBUG_ENTER("ha_federated::rnd_next");

  if (result == 0)
  {
    /*
      Return value of rnd_init is not always checked (see records.cc),
      so we can get here _even_ if there is _no_ pre-fetched result-set!
      TODO: fix it.
    */
    DBUG_RETURN(1);
  }

  /*
    Remember where the cursor stood before the fetch; position() stores
    this value into ref so the row can be located again later.
  */
  current_position= result->data_cursor;
  /* current_position is a row-offset pointer, so print it with %p. */
  DBUG_PRINT("info", ("current position %p", (void *) current_position));

  /* Fetch a row, insert it back in a row format. */
  if (!(row= mysql_fetch_row(result)))
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  DBUG_RETURN(convert_row_to_internal_format(buf, row));
}


/*
  'position()' is called after each call to rnd_next() if the data needs to be
  ordered. You can do something like the following to store the position:
  my_store_ptr(ref, ref_length, current_position);

  The server uses ref to store data. ref_length in the above case is the size
  needed to store current_position. ref is just a byte array that the server
  will maintain. If you are using offsets to mark rows, then current_position
  should be the offset. If it is a primary key like in BDB, then it needs to
  be a primary key.

  Called from filesort.cc, sql_select.cc, sql_delete.cc and sql_update.cc.
*/

void ha_federated::position(const byte *record)
{
  DBUG_ENTER("ha_federated::position");
1703
  /* my_store_ptr Add seek storage */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1704
  *(MYSQL_ROW_OFFSET *) ref= current_position;  // ref is always aligned
1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718
  DBUG_VOID_RETURN;
}


/*
  This is like rnd_next, but you are given a position to use to determine the
  row. The position will be of the type that you stored in ref. You can use
  ha_get_ptr(pos,ref_length) to retrieve whatever key or position you saved
  when position() was called.

  This method is required for an ORDER BY.

  Called from filesort.cc, records.cc, sql_insert.cc, sql_select.cc and
  sql_update.cc.
*/
int ha_federated::rnd_pos(byte *buf, byte *pos)
{
  DBUG_ENTER("ha_federated::rnd_pos");
  /*
    Repositioning is only done when scan_flag is set. When it is not set
    nothing is read and 0 is returned.
    NOTE(review): presumably the unset case is the one where index_read_idx
    already holds the result set an update is built from -- confirm.
  */
  if (!scan_flag)
    DBUG_RETURN(0);

  statistic_increment(table->in_use->status_var.ha_read_rnd_count,
                      &LOCK_status);

  /*
    Same guard as in rnd_next(): without a result set there is nothing to
    seek in, and dereferencing result below would crash.
  */
  if (result == 0)
    DBUG_RETURN(1);

  /* pos is not aligned, so copy the stored offset out byte-wise. */
  memcpy_fixed(&current_position, pos, sizeof(MYSQL_ROW_OFFSET));

  /* Move the result-set cursor to the saved row and read it. */
  result->current_row= 0;
  result->data_cursor= current_position;
  DBUG_RETURN(rnd_next(buf));
}


/*
  ::info() is used to return information to the optimizer.
  Currently this table handler doesn't implement most of the fields
  really needed. SHOW also makes use of this data
  Another note, you will probably want to have the following in your
  code:
  if (records < 2)
    records = 2;
  The reason is that the server will optimize for cases of only a single
  record. If in a table scan you don't know the number of records
  it will probably be better to set records to two so you can return
  as many records as you need.
  Along with records a few more variables you may wish to set are:
    records
    deleted
    data_file_length
    index_file_length
    delete_length
    check_time
  Take a look at the public variables in handler.h for more information.

  Called in:
    filesort.cc
    ha_heap.cc
    item_sum.cc
    opt_sum.cc
    sql_delete.cc
    sql_delete.cc
    sql_derived.cc
    sql_select.cc
    sql_select.cc
    sql_select.cc
    sql_select.cc
    sql_select.cc
    sql_show.cc
    sql_show.cc
    sql_show.cc
    sql_show.cc
    sql_table.cc
    sql_union.cc
    sql_update.cc

*/
/* FIX: a later version should provide better information to the optimizer */

void ha_federated::info(uint flag)
{
  DBUG_ENTER("ha_federated::info");
  /*
    The flag argument is ignored and a fixed placeholder row count is
    reported; no other statistics are filled in.
  */
  records= 10000; // fix later
  DBUG_VOID_RETURN;
}


/*
  Used to delete all rows in a table. Both for cases of truncate and
  for cases where the optimizer realizes that all rows will be
  removed as a result of a SQL statement.

  Called from item_sum.cc by Item_func_group_concat::clear() and
  Item_sum_count_distinct::clear().
  Called from sql_delete.cc by mysql_delete().
  Called from sql_select.cc by JOIN::reinit().
  Called from sql_union.cc by st_select_lex_unit::exec().
*/

int ha_federated::delete_all_rows()
{
  DBUG_ENTER("ha_federated::delete_all_rows");

  char query_buffer[IO_SIZE];
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
  query.length(0);

  query.set_charset(system_charset_info);
1815
  query.append("TRUNCATE `");
1816
  query.append(share->table_base_name);
1817
  query.append("`");
1818

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1819 1820 1821 1822
  if (mysql_real_query(mysql, query.ptr(), query.length()))
  {
    my_error(ER_QUERY_ON_MASTER, MYF(0), mysql_error(mysql));
    DBUG_RETURN(ER_QUERY_ON_MASTER);
1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857
  }

  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}


/*
  The idea with handler::store_lock() is the following:

  The statement decided which locks we should need for the table
  for updates/deletes/inserts we get WRITE locks, for SELECT... we get
  read locks.

  Before adding the lock into the table lock handler (see thr_lock.c)
  mysqld calls store lock with the requested locks.  Store lock can now
  modify a write lock to a read lock (or some other lock), ignore the
  lock (if we don't want to use MySQL table locks at all) or add locks
  for many tables (like we do when we are using a MERGE handler).

  As in the Berkeley DB handler, federated changes WRITE locks to
  TL_WRITE_ALLOW_WRITE (which signals that we are doing writes, but are
  still allowing other readers and writers).

  When releasing locks, store_lock() are also called. In this case one
  usually doesn't have to do anything.

  In some exceptional cases MySQL may send a request for a TL_IGNORE;
  This means that we are requesting the same lock as last time and this
  should also be ignored. (This may happen when someone does a flush
  table when we have opened a part of the tables, in which case mysqld
  closes and reopens the tables and tries to get the same locks at last
  time).  In the future we will probably try to remove this.

  Called from lock.cc by get_lock_data().
*/

THR_LOCK_DATA **ha_federated::store_lock(THD *thd,
                                         THR_LOCK_DATA **to,
                                         enum thr_lock_type lock_type)
{
  /*
    Only record a new lock type when a real lock is requested (not
    TL_IGNORE) and this handler's lock is currently TL_UNLOCK.
  */
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
  {
    /*
      Here is where we get into the guts of a row level lock.
      If we are not doing a LOCK TABLE or DISCARD/IMPORT TABLESPACE,
      turn write locks into TL_WRITE_ALLOW_WRITE to allow multiple
      writers.
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE) && !thd->in_lock_tables && !thd->tablespace_op)
      lock_type= TL_WRITE_ALLOW_WRITE;

    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
    */

    if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
      lock_type= TL_READ;

    lock.type= lock_type;
  }

  /* Append this handler's lock to the caller's array and advance it. */
  *to++= &lock;

  return to;
}

/*
  create() sets up nothing locally, since we have no local files of our own.
  It only parses the connection URL of the new table and runs
  check_foreign_data_source() on it; an error is reported if either fails.
*/

int ha_federated::create(const char *name, TABLE *table_arg,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1901
                         HA_CREATE_INFO *create_info)
1902
{
1903
  int connection_error=0;
1904 1905
  FEDERATED_SHARE tmp;
  DBUG_ENTER("ha_federated::create");
1906

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1907
  if (parse_url(&tmp, table_arg, 1))
1908
  {
1909 1910
    my_error(ER_CANT_CREATE_TABLE, MYF(0), name, 1);
    goto error;
1911
  }
1912 1913 1914 1915 1916 1917
  if ((connection_error= check_foreign_data_source(&tmp)))
  {
    my_error(connection_error, MYF(0), name, 1);
    goto error;
  }
  
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1918
  my_free((gptr) tmp.scheme, MYF(0));
1919
  DBUG_RETURN(0);
1920 1921
  
error:
1922
  DBUG_PRINT("info", ("errors, returning %d", ER_CANT_CREATE_TABLE));
1923 1924 1925
  my_free((gptr) tmp.scheme, MYF(0));
  DBUG_RETURN(ER_CANT_CREATE_TABLE);

1926 1927
}
#endif /* HAVE_FEDERATED_DB */