ha_federated.cc 85.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/* Copyright (C) 2004 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*

  MySQL Federated Storage Engine

  ha_federated.cc - MySQL Federated Storage Engine
  Patrick Galbraith and Brian Aker, 2004

24
  This is a handler which uses a foreign database as the data file, as
25 26 27 28
  opposed to a handler like MyISAM, which uses .MYD files locally.

  How this handler works
  ----------------------------------
29 30
  Normal database files are local and as such: You create a table called
  'users', a file such as 'users.MYD' is created. A handler reads, inserts,
31 32
  deletes, updates data in this file. The data is stored in particular format,
  so to read, that data has to be parsed into fields, to write, fields have to
33
  be stored in this format to write to this data file.
34

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
35 36 37 38 39 40 41 42
  With MySQL Federated storage engine, there will be no local files
  for each table's data (such as .MYD). A foreign database will store
  the data that would normally be in this file. This will necessitate
  the use of MySQL client API to read, delete, update, insert this
  data. The data will have to be retrieve via an SQL call "SELECT *
  FROM users". Then, to read this data, it will have to be retrieved
  via mysql_fetch_row one row at a time, then converted from the
  column in this select into the format that the handler expects.
43

44 45
  The create table will simply create the .frm file, and within the
  "CREATE TABLE" SQL, there SHALL be any of the following :
46

47 48 49 50 51 52 53 54 55 56 57
  comment=scheme://username:password@hostname:port/database/tablename
  comment=scheme://username@hostname/database/tablename
  comment=scheme://username:password@hostname/database/tablename
  comment=scheme://username:password@hostname/database/tablename

  An example would be:

  comment=mysql://username:password@hostname:port/database/tablename

  ***IMPORTANT***

58
  This is a first release, conceptual release
59 60 61
  Only 'mysql://' is supported at this release.


62 63
  This comment connection string is necessary for the handler to be
  able to connect to the foreign server.
64 65 66 67 68


  The basic flow is this:

  SQL calls issues locally ->
69 70 71
  mysql handler API (data in handler format) ->
  mysql client API (data converted to SQL calls) ->
  foreign database -> mysql client API ->
72 73 74 75 76
  convert result sets (if any) to handler format ->
  handler API -> results or rows affected to local

  What this handler does and doesn't support
  ------------------------------------------
77 78
  * Tables MUST be created on the foreign server prior to any action on those
    tables via the handler, first version. IMPORTANT: IF you MUST use the
79
    federated storage engine type on the REMOTE end, MAKE SURE [ :) ] That
80
    the table you connect to IS NOT a table pointing BACK to your ORIGNAL
81
    table! You know  and have heard the screaching of audio feedback? You
82
    know putting two mirror in front of each other how the reflection
83 84
    continues for eternity? Well, need I say more?!
  * There will not be support for transactions.
85
  * There is no way for the handler to know if the foreign database or table
86
    has changed. The reason for this is that this database has to work like a
87 88 89
    data file that would never be written to by anything other than the
    database. The integrity of the data in the local table could be breached
    if there was any change to the foreign database.
90
  * Support for SELECT, INSERT, UPDATE , DELETE, indexes.
91
  * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls.
92
  * Prepared statements will not be used in the first implementation, it
93
    remains to to be seen whether the limited subset of the client API for the
94
    server supports this.
95 96
  * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its
    implementation.
97
  * This will not work with the query cache.
98 99 100 101 102 103 104

   Method calls

   A two column table, with one record:

   (SELECT)

105
   "SELECT * FROM foo"
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
    ha_federated::info
    ha_federated::scan_time:
    ha_federated::rnd_init: share->select_query SELECT * FROM foo
    ha_federated::extra

    <for every row of data retrieved>
    ha_federated::rnd_next
    ha_federated::convert_row_to_internal_format
    ha_federated::rnd_next
    </for every row of data retrieved>

    ha_federated::rnd_end
    ha_federated::extra
    ha_federated::reset

    (INSERT)

    "INSERT INTO foo (id, ts) VALUES (2, now());"

    ha_federated::write_row

    <for every field/column>
128 129
    Field::quote_data
    Field::quote_data
130 131 132 133 134 135 136 137 138 139 140
    </for every field/column>

    ha_federated::reset

    (UPDATE)

    "UPDATE foo SET ts = now() WHERE id = 1;"

    ha_federated::index_init
    ha_federated::index_read
    ha_federated::index_read_idx
141
    Field::quote_data
142 143 144
    ha_federated::rnd_next
    ha_federated::convert_row_to_internal_format
    ha_federated::update_row
145

146
    <quote 3 cols, new and old data>
147 148 149 150 151 152
    Field::quote_data
    Field::quote_data
    Field::quote_data
    Field::quote_data
    Field::quote_data
    Field::quote_data
153
    </quote 3 cols, new and old data>
154

155 156 157 158 159 160 161 162 163 164
    ha_federated::extra
    ha_federated::extra
    ha_federated::extra
    ha_federated::external_lock
    ha_federated::reset


    How do I use this handler?
    --------------------------
    First of all, you need to build this storage engine:
165

166 167 168
      ./configure --with-federated-storage-engine
      make

169
    Next, to use this handler, it's very simple. You must
170 171 172
    have two databases running, either both on the same host, or
    on different hosts.

173
    One the server that will be connecting to the foreign
174 175
    host (client), you create your table as such:

176
    CREATE TABLE test_table (
177 178 179 180 181
      id     int(20) NOT NULL auto_increment,
      name   varchar(32) NOT NULL default '',
      other  int(20) NOT NULL default '0',
      PRIMARY KEY  (id),
      KEY name (name),
182 183 184
      KEY other_key (other))
       ENGINE="FEDERATED"
       DEFAULT CHARSET=latin1
185 186
       COMMENT='root@127.0.0.1:9306/federated/test_federated';

187 188 189 190 191 192 193 194
   Notice the "COMMENT" and "ENGINE" field? This is where you
   respectively set the engine type, "FEDERATED" and foreign
   host information, this being the database your 'client' database
   will connect to and use as the "data file". Obviously, the foreign
   database is running on port 9306, so you want to start up your other
   database so that it is indeed on port 9306, and your federated
   database on a port other than that. In my setup, I use port 5554
   for federated, and port 5555 for the foreign database.
195

196
   Then, on the foreign database:
197

198
   CREATE TABLE test_table (
199 200 201 202 203
     id     int(20) NOT NULL auto_increment,
     name   varchar(32) NOT NULL default '',
     other  int(20) NOT NULL default '0',
     PRIMARY KEY  (id),
     KEY name (name),
204
     KEY other_key (other))
205 206 207 208
     ENGINE="<NAME>" <-- whatever you want, or not specify
     DEFAULT CHARSET=latin1 ;

    This table is exactly the same (and must be exactly the same),
209
    except that it is not using the federated handler and does
210
    not need the URL.
211

212 213 214 215 216 217 218

    How to see the handler in action
    --------------------------------

    When developing this handler, I compiled the federated database with
    debugging:

219
    ./configure --with-federated-storage-engine
220 221 222 223
    --prefix=/home/mysql/mysql-build/federated/ --with-debug

    Once compiled, I did a 'make install' (not for the purpose of installing
    the binary, but to install all the files the binary expects to see in the
224 225 226 227
    diretory I specified in the build with --prefix,
    "/home/mysql/mysql-build/federated".

    Then, I started the foreign server:
228

229
    /usr/local/mysql/bin/mysqld_safe
230 231 232 233 234 235 236 237 238 239 240 241 242
    --user=mysql --log=/tmp/mysqld.5555.log -P 5555

    Then, I went back to the directory containing the newly compiled mysqld,
    <builddir>/sql/, started up gdb:

    gdb ./mysqld

    Then, withn the (gdb) prompt:
    (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug

    Next, I open several windows for each:

    1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed
243
    2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log
244 245 246
    3. A window with a client open to the federated server on port 5554
    4. A window with a client open to the federated server on port 5555

247
    I would create a table on the client to the foreign server on port
248 249
    5555, and then to the federated server on port 5554. At this point,
    I would run whatever queries I wanted to on the federated server,
250
    just always remembering that whatever changes I wanted to make on
251
    the table, or if I created new tables, that I would have to do that
252 253 254
    on the foreign server.

    Another thing to look for is 'show variables' to show you that you have
255 256 257 258 259 260 261 262 263 264 265 266 267 268
    support for federated handler support:

    show variables like '%federat%'

    and:

    show storage engines;

    Both should display the federated storage handler.


    Testing
    -------

269
    There is a test for MySQL Federated Storage Handler in ./mysql-test/t,
270 271 272
    federatedd.test It starts both a slave and master database using
    the same setup that the replication tests use, with the exception that
    it turns off replication, and sets replication to ignore the test tables.
273 274
    After ensuring that you actually do have support for the federated storage
    handler, numerous queries/inserts/updates/deletes are run, many derived
275 276 277 278
    from the MyISAM tests, plus som other tests which were meant to reveal
    any issues that would be most likely to affect this handler. All tests
    should work! ;)

279
    To run these tests, go into ./mysql-test (based in the directory you
280 281 282
    built the server in)

    ./mysql-test-run federatedd
283

284 285 286 287
    To run the test, or if you want to run the test and have debug info:

    ./mysql-test-run --debug federated

288
    This will run the test in debug mode, and you can view the trace and
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
    log files in the ./mysql-test/var/log directory

    ls -l mysql-test/var/log/
    -rw-r--r--  1 patg  patg        17  4 Dec 12:27 current_test
    -rw-r--r--  1 patg  patg       692  4 Dec 12:52 manager.log
    -rw-rw----  1 patg  patg     21246  4 Dec 12:51 master-bin.000001
    -rw-rw----  1 patg  patg        68  4 Dec 12:28 master-bin.index
    -rw-r--r--  1 patg  patg      1620  4 Dec 12:51 master.err
    -rw-rw----  1 patg  patg     23179  4 Dec 12:51 master.log
    -rw-rw----  1 patg  patg  16696550  4 Dec 12:51 master.trace
    -rw-r--r--  1 patg  patg         0  4 Dec 12:28 mysqltest-time
    -rw-r--r--  1 patg  patg   2024051  4 Dec 12:51 mysqltest.trace
    -rw-rw----  1 patg  patg     94992  4 Dec 12:51 slave-bin.000001
    -rw-rw----  1 patg  patg        67  4 Dec 12:28 slave-bin.index
    -rw-rw----  1 patg  patg       249  4 Dec 12:52 slave-relay-bin.000003
    -rw-rw----  1 patg  patg        73  4 Dec 12:28 slave-relay-bin.index
    -rw-r--r--  1 patg  patg      1349  4 Dec 12:51 slave.err
    -rw-rw----  1 patg  patg     96206  4 Dec 12:52 slave.log
    -rw-rw----  1 patg  patg  15706355  4 Dec 12:51 slave.trace
    -rw-r--r--  1 patg  patg         0  4 Dec 12:51 warnings

    Of course, again, you can tail the trace log:

312
    tail -f mysql-test/var/log/master.trace |grep ha_fed
313 314

    As well as the slave query log:
315

316 317 318 319 320 321 322 323 324 325 326 327 328
    tail -f mysql-test/var/log/slave.log

    Files that comprise the test suit
    ---------------------------------
    mysql-test/t/federated.test
    mysql-test/r/federated.result
    mysql-test/r/have_federated_db.require
    mysql-test/include/have_federated_db.inc


    Other tidbits
    -------------

329
    These were the files that were modified or created for this
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
    Federated handler to work:

    ./configure.in
    ./sql/Makefile.am
    ./config/ac_macros/ha_federated.m4
    ./sql/handler.cc
    ./sql/mysqld.cc
    ./sql/set_var.cc
    ./sql/field.h
    ./sql/sql_string.h
    ./mysql-test/mysql-test-run(.sh)
    ./mysql-test/t/federated.test
    ./mysql-test/r/federated.result
    ./mysql-test/r/have_federated_db.require
    ./mysql-test/include/have_federated_db.inc
    ./sql/ha_federated.cc
    ./sql/ha_federated.h
347

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
348
*/
349

350 351
#include "mysql_priv.h"
#ifdef USE_PRAGMA_IMPLEMENTATION
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
352
#pragma implementation                          // gcc: Class implementation
353 354
#endif

acurtis@xiphis.org's avatar
acurtis@xiphis.org committed
355
#ifdef WITH_FEDERATED_STORAGE_ENGINE
356
#include "ha_federated.h"
357 358

#include "m_string.h"
acurtis@xiphis.org's avatar
acurtis@xiphis.org committed
359 360 361

#include <mysql/plugin.h>

362
/* Variables for federated share methods */
363 364 365
static HASH federated_open_tables;              // To track open tables
pthread_mutex_t federated_mutex;                // To init the hash
static int federated_init= FALSE;               // Checking the state of hash
366

367
/* Static declaration for handerton */
368 369
static handler *federated_create_handler(TABLE_SHARE *table,
                                         MEM_ROOT *mem_root);
370 371
static int federated_commit(THD *thd, bool all);
static int federated_rollback(THD *thd, bool all);
372

373 374
/* Federated storage engine handlerton */

375
handlerton federated_hton;
376

377 378
static handler *federated_create_handler(TABLE_SHARE *table,
                                         MEM_ROOT *mem_root)
379
{
380
  return new (mem_root) ha_federated(table);
381 382 383
}


384
/* Function we use in the creation of our hash to get key */
385

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
386 387
static byte *federated_get_key(FEDERATED_SHARE *share, uint *length,
                               my_bool not_used __attribute__ ((unused)))
388
{
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
389 390
  *length= share->connect_string_length;
  return (byte*) share->scheme;
391 392
}

393 394 395 396 397 398 399 400 401 402 403 404
/*
  Initialize the federated handler.

  SYNOPSIS
    federated_db_init()
    void

  RETURN
    FALSE       OK
    TRUE        Error
*/

405
int federated_db_init()
406
{
407
  DBUG_ENTER("federated_db_init");
408 409 410 411 412 413 414 415 416

  federated_hton.state= SHOW_OPTION_YES;
  federated_hton.db_type= DB_TYPE_FEDERATED_DB;
  federated_hton.commit= federated_commit;
  federated_hton.rollback= federated_rollback;
  federated_hton.create= federated_create_handler;
  federated_hton.panic= federated_db_end;
  federated_hton.flags= HTON_ALTER_NOT_SUPPORTED;

417 418
  if (pthread_mutex_init(&federated_mutex, MY_MUTEX_INIT_FAST))
    goto error;
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
419
  if (!hash_init(&federated_open_tables, &my_charset_bin, 32, 0, 0,
420 421 422 423 424
                    (hash_get_key) federated_get_key, 0, 0))
  {
    federated_init= TRUE;
    DBUG_RETURN(FALSE);
  }
425 426

  VOID(pthread_mutex_destroy(&federated_mutex));
427 428 429
error:
  have_federated_db= SHOW_OPTION_DISABLED;	// If we couldn't use handler
  DBUG_RETURN(TRUE);
430 431 432 433 434 435 436 437 438 439 440 441 442
}


/*
  Release the federated handler.

  SYNOPSIS
    federated_db_end()

  RETURN
    FALSE       OK
*/

443
int federated_db_end(ha_panic_function type)
444 445 446 447 448 449 450
{
  if (federated_init)
  {
    hash_free(&federated_open_tables);
    VOID(pthread_mutex_destroy(&federated_mutex));
  }
  federated_init= 0;
451
  return 0;
452 453
}

454

455 456 457 458 459 460
/*
 Check (in create) whether the tables exists, and that it can be connected to

  SYNOPSIS
    check_foreign_data_source()
      share               pointer to FEDERATED share
461 462
      table_create_flag   tells us that ::create is the caller, 
                          therefore, return CANT_CREATE_FEDERATED_TABLE
463 464 465 466 467 468 469

  DESCRIPTION
    This method first checks that the connection information that parse url
    has populated into the share will be sufficient to connect to the foreign
    table, and if so, does the foreign table exist.
*/

470
static int check_foreign_data_source(FEDERATED_SHARE *share,
471
                                     bool table_create_flag)
472
{
473
  char escaped_table_name[NAME_LEN*2];
474 475
  char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
476 477
  uint error_code;
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
478
  MYSQL *mysql;
479
  DBUG_ENTER("ha_federated::check_foreign_data_source");
480

481
  /* Zero the length, otherwise the string will have misc chars */
482 483 484
  query.length(0);

  /* error out if we can't alloc memory for mysql_init(NULL) (per Georg) */
485
  if (!(mysql= mysql_init(NULL)))
486 487 488 489 490 491 492 493 494 495
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  /* check if we can connect */
  if (!mysql_real_connect(mysql,
                          share->hostname,
                          share->username,
                          share->password,
                          share->database,
                          share->port,
                          share->socket, 0))
  {
496 497 498 499 500 501 502
    /*
      we want the correct error message, but it to return
      ER_CANT_CREATE_FEDERATED_TABLE if called by ::create
    */
    error_code= (table_create_flag ?
                 ER_CANT_CREATE_FEDERATED_TABLE :
                 ER_CONNECT_TO_FOREIGN_DATA_SOURCE);
503

504
    my_sprintf(error_buffer,
505 506
               (error_buffer,
                "database: '%s'  username: '%s'  hostname: '%s'",
507 508 509
                share->database, share->username, share->hostname));

    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), error_buffer);
510 511 512 513
    goto error;
  }
  else
  {
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
514
    int escaped_table_name_length= 0;
515
    /*
516 517 518
      Since we do not support transactions at this version, we can let the 
      client API silently reconnect. For future versions, we will need more 
      logic to deal with transactions
519 520
    */
    mysql->reconnect= 1;
521
    /*
522 523 524
      Note: I am not using INORMATION_SCHEMA because this needs to work with 
      versions prior to 5.0
      
525
      if we can connect, then make sure the table exists 
526 527

      the query will be: SELECT * FROM `tablename` WHERE 1=0
528
    */
529 530 531 532
    query.append(FEDERATED_SELECT);
    query.append(FEDERATED_STAR);
    query.append(FEDERATED_FROM);
    query.append(FEDERATED_BTICK);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
533 534
    escaped_table_name_length=
      escape_string_for_mysql(&my_charset_bin, (char*)escaped_table_name,
535 536 537
                            sizeof(escaped_table_name),
                            share->table_name,
                            share->table_name_length);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
538
    query.append(escaped_table_name, escaped_table_name_length);
539 540
    query.append(FEDERATED_BTICK);
    query.append(FEDERATED_WHERE);
541
    query.append(FEDERATED_FALSE);
542 543 544

    if (mysql_real_query(mysql, query.ptr(), query.length()))
    {
545
      error_code= table_create_flag ?
546
        ER_CANT_CREATE_FEDERATED_TABLE : ER_FOREIGN_DATA_SOURCE_DOESNT_EXIST;
547 548
      my_sprintf(error_buffer, (error_buffer, "error: %d  '%s'",
                                mysql_errno(mysql), mysql_error(mysql)));
549 550

      my_error(error_code, MYF(0), error_buffer);
551 552 553
      goto error;
    }
  }
554
  error_code=0;
555 556 557 558 559 560 561

error:
    mysql_close(mysql);
    DBUG_RETURN(error_code);
}


562 563
static int parse_url_error(FEDERATED_SHARE *share, TABLE *table, int error_num)
{
564 565
  char buf[FEDERATED_QUERY_BUFFER_SIZE];
  int buf_len;
566
  DBUG_ENTER("ha_federated parse_url_error");
567

568 569 570 571 572 573 574 575
  if (share->scheme)
  {
    DBUG_PRINT("info",
               ("error: parse_url. Returning error code %d \
                freeing share->scheme %lx", error_num, share->scheme));
    my_free((gptr) share->scheme, MYF(0));
    share->scheme= 0;
  }
576 577 578
  buf_len= min(table->s->connect_string.length,
               FEDERATED_QUERY_BUFFER_SIZE-1);
  strmake(buf, table->s->connect_string.str, buf_len);
579 580 581 582
  my_error(error_num, MYF(0), buf);
  DBUG_RETURN(error_num);
}

583
/*
584
  Parse connection info from table->s->connect_string
585 586 587

  SYNOPSIS
    parse_url()
588 589 590
    share               pointer to FEDERATED share
    table               pointer to current TABLE class
    table_create_flag   determines what error to throw
591

592
  DESCRIPTION
593
    Populates the share with information about the connection
594
    to the foreign database that will serve as the data source.
595
    This string must be specified (currently) in the "comment" field,
596
    listed in the CREATE TABLE statement.
597 598 599

    This string MUST be in the format of any of these:

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
600 601 602 603
    scheme://username:password@hostname:port/database/table
    scheme://username@hostname/database/table
    scheme://username@hostname:port/database/table
    scheme://username:password@hostname/database/table
604 605 606 607 608 609 610

  An Example:

  mysql://joe:joespass@192.168.1.111:9308/federated/testtable

  ***IMPORTANT***
  Currently, only "mysql://" is supported.
611

612
  'password' and 'port' are both optional.
613 614

  RETURN VALUE
615 616
    0           success
    error_num   particular error code 
617 618

*/
619

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
620 621
static int parse_url(FEDERATED_SHARE *share, TABLE *table,
                     uint table_create_flag)
622
{
623 624 625
  uint error_num= (table_create_flag ?
                   ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
                   ER_FOREIGN_DATA_STRING_INVALID);
626
  DBUG_ENTER("ha_federated::parse_url");
627

628
  share->port= 0;
629
  share->socket= 0;
630 631
  DBUG_PRINT("info", ("Length: %d", table->s->connect_string.length));
  DBUG_PRINT("info", ("String: '%.*s'", table->s->connect_string.length, 
632
                      table->s->connect_string.str));
633
  share->scheme= my_strndup(table->s->connect_string.str,
634 635
                            table->s->connect_string.length,
                            MYF(0));
636

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
637
  share->connect_string_length= table->s->connect_string.length;
638
  DBUG_PRINT("info",("parse_url alloced share->scheme %lx", share->scheme));
639

640 641 642 643
  /*
    remove addition of null terminator and store length
    for each string  in share
  */
644 645 646
  if (!(share->username= strstr(share->scheme, "://")))
    goto error;
  share->scheme[share->username - share->scheme]= '\0';
647

648 649
  if (strcmp(share->scheme, "mysql") != 0)
    goto error;
650

651
  share->username+= 3;
652

653 654 655 656 657
  if (!(share->hostname= strchr(share->username, '@')))
    goto error;
    
  share->username[share->hostname - share->username]= '\0';
  share->hostname++;
658

659 660 661 662 663 664 665
  if ((share->password= strchr(share->username, ':')))
  {
    share->username[share->password - share->username]= '\0';
    share->password++;
    share->username= share->username;
    /* make sure there isn't an extra / or @ */
    if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
666
      goto error;
667 668 669 670 671 672 673
    /*
      Found that if the string is:
      user:@hostname:port/database/table
      Then password is a null string, so set to NULL
    */
    if ((share->password[0] == '\0'))
      share->password= NULL;
674 675
  }
  else
676
    share->username= share->username;
677

678 679
  /* make sure there isn't an extra / or @ */
  if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
680
    goto error;
681

682 683 684 685
  if (!(share->database= strchr(share->hostname, '/')))
    goto error;
  share->hostname[share->database - share->hostname]= '\0';
  share->database++;
686

687
  if ((share->sport= strchr(share->hostname, ':')))
688
  {
689 690 691 692 693 694
    share->hostname[share->sport - share->hostname]= '\0';
    share->sport++;
    if (share->sport[0] == '\0')
      share->sport= NULL;
    else
      share->port= atoi(share->sport);
695
  }
696

697 698 699 700
  if (!(share->table_name= strchr(share->database, '/')))
    goto error;
  share->database[share->table_name - share->database]= '\0';
  share->table_name++;
701

702
  share->table_name_length= strlen(share->table_name);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
703
 
704 705 706
  /* make sure there's not an extra / */
  if ((strchr(share->table_name, '/')))
    goto error;
707

708 709
  if (share->hostname[0] == '\0')
    share->hostname= NULL;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
710

711 712 713 714
  if (!share->port)
  {
    if (strcmp(share->hostname, my_localhost) == 0)
      share->socket= my_strdup(MYSQL_UNIX_ADDR, MYF(0));
715
    else
716
      share->port= MYSQL_PORT;
717
  }
718 719

  DBUG_PRINT("info",
720 721
             ("scheme: %s  username: %s  password: %s \
               hostname: %s  port: %d  database: %s  tablename: %s",
722 723 724
              share->scheme, share->username, share->password,
              share->hostname, share->port, share->database,
              share->table_name));
725 726 727 728

  DBUG_RETURN(0);

error:
729
  DBUG_RETURN(parse_url_error(share, table, error_num));
730 731
}

732

733 734 735 736
/*****************************************************************************
** FEDERATED tables
*****************************************************************************/

737
ha_federated::ha_federated(TABLE_SHARE *table_arg)
738
  :handler(&federated_hton, table_arg),
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
739
  mysql(0), stored_result(0)
740 741 742
{
  trx_next= 0;
}
743 744


745
/*
746 747 748 749
  Convert MySQL result set row to handler internal format

  SYNOPSIS
    convert_row_to_internal_format()
750
      record    Byte pointer to record
751
      row       MySQL result set row from fetchrow()
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
752
      result	Result set to use
753 754

  DESCRIPTION
755
    This method simply iterates through a row returned via fetchrow with
756 757
    values from a successful SELECT , and then stores each column's value
    in the field object via the field object pointer (pointing to the table's
758
    array of field object pointers). This is how the handler needs the data
759 760 761
    to be stored to then return results back to the user

  RETURN VALUE
762
    0   After fields have had field values stored from record
763
*/
764

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
765 766 767
uint ha_federated::convert_row_to_internal_format(byte *record,
                                                  MYSQL_ROW row,
                                                  MYSQL_RES *result)
768
{
769 770
  ulong *lengths;
  Field **field;
771
  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
772 773
  DBUG_ENTER("ha_federated::convert_row_to_internal_format");

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
774
  lengths= mysql_fetch_lengths(result);
775

776
  for (field= table->field; *field; field++, row++, lengths++)
777
  {
778 779 780 781 782 783
    /*
      index variable to move us through the row at the
      same iterative step as the field
    */
    my_ptrdiff_t old_ptr;
    old_ptr= (my_ptrdiff_t) (record - table->record[0]);
784 785
    (*field)->move_field_offset(old_ptr);
    if (!*row)
786 787
      (*field)->set_null();
    else
788
    {
789 790 791 792 793
      if (bitmap_is_set(table->read_set, (*field)->field_index))
      {
        (*field)->set_notnull();
        (*field)->store(*row, *lengths, &my_charset_bin);
      }
794
    }
795
    (*field)->move_field_offset(-old_ptr);
796
  }
797
  dbug_tmp_restore_column_map(table->write_set, old_map);
798 799 800 801 802 803
  DBUG_RETURN(0);
}

static bool emit_key_part_name(String *to, KEY_PART_INFO *part)
{
  DBUG_ENTER("emit_key_part_name");
804
  if (to->append(FEDERATED_BTICK) ||
805
      to->append(part->field->field_name) ||
806
      to->append(FEDERATED_BTICK))
807 808 809 810 811 812 813 814 815 816 817
    DBUG_RETURN(1);                           // Out of memory
  DBUG_RETURN(0);
}

static bool emit_key_part_element(String *to, KEY_PART_INFO *part,
                                  bool needs_quotes, bool is_like,
                                  const byte *ptr, uint len)
{
  Field *field= part->field;
  DBUG_ENTER("emit_key_part_element");

818
  if (needs_quotes && to->append(FEDERATED_SQUOTE))
819 820 821 822 823 824 825 826
    DBUG_RETURN(1);

  if (part->type == HA_KEYTYPE_BIT)
  {
    char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff;

    *buf++= '0';
    *buf++= 'x';
monty@mysql.com's avatar
monty@mysql.com committed
827 828
    buf= octet2hex(buf, (char*) ptr, len);
    if (to->append((char*) buff, (uint)(buf - buff)))
829
      DBUG_RETURN(1);
830
  }
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864
  else if (part->key_part_flag & HA_BLOB_PART)
  {
    String blob;
    uint blob_length= uint2korr(ptr);
    blob.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
                   blob_length, &my_charset_bin);
    if (append_escaped(to, &blob))
      DBUG_RETURN(1);
  }
  else if (part->key_part_flag & HA_VAR_LENGTH_PART)
  {
    String varchar;
    uint var_length= uint2korr(ptr);
    varchar.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
                      var_length, &my_charset_bin);
    if (append_escaped(to, &varchar))
      DBUG_RETURN(1);
  }
  else
  {
    char strbuff[MAX_FIELD_WIDTH];
    String str(strbuff, sizeof(strbuff), part->field->charset()), *res;

    res= field->val_str(&str, (char *)ptr);

    if (field->result_type() == STRING_RESULT)
    {
      if (append_escaped(to, res))
        DBUG_RETURN(1);
    }
    else if (to->append(res->ptr(), res->length()))
      DBUG_RETURN(1);
  }

865
  if (is_like && to->append(FEDERATED_PERCENT))
866 867
    DBUG_RETURN(1);

868
  if (needs_quotes && to->append(FEDERATED_SQUOTE))
869
    DBUG_RETURN(1);
870 871 872 873

  DBUG_RETURN(0);
}

874 875 876 877 878 879 880 881 882 883
/*
  Create a WHERE clause based off of values in keys
  Note: This code was inspired by key_copy from key.cc

  SYNOPSIS
    create_where_from_key ()
      to          String object to store WHERE clause
      key_info    KEY struct pointer
      key         byte pointer containing key
      key_length  length of key
884 885
      range_type  0 - no range, 1 - min range, 2 - max range
                  (see enum range_operation)
886 887 888 889 890 891 892 893 894 895

  DESCRIPTION
    Using iteration through all the keys via a KEY_PART_INFO pointer,
    This method 'extracts' the value of each key in the byte pointer
    *key, and for each key found, constructs an appropriate WHERE clause

  RETURN VALUE
    0   After all keys have been accounted for to create the WHERE clause
    1   No keys found

896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918
    Range flags Table per Timour:

   -----------------
   - start_key:
     * ">"  -> HA_READ_AFTER_KEY
     * ">=" -> HA_READ_KEY_OR_NEXT
     * "="  -> HA_READ_KEY_EXACT

   - end_key:
     * "<"  -> HA_READ_BEFORE_KEY
     * "<=" -> HA_READ_AFTER_KEY

   records_in_range:
   -----------------
   - start_key:
     * ">"  -> HA_READ_AFTER_KEY
     * ">=" -> HA_READ_KEY_EXACT
     * "="  -> HA_READ_KEY_EXACT

   - end_key:
     * "<"  -> HA_READ_BEFORE_KEY
     * "<=" -> HA_READ_AFTER_KEY
     * "="  -> HA_READ_AFTER_KEY
919

920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
0 HA_READ_KEY_EXACT,              Find first record else error
1 HA_READ_KEY_OR_NEXT,            Record or next record
2 HA_READ_KEY_OR_PREV,            Record or previous
3 HA_READ_AFTER_KEY,              Find next rec. after key-record
4 HA_READ_BEFORE_KEY,             Find next rec. before key-record
5 HA_READ_PREFIX,                 Key which as same prefix
6 HA_READ_PREFIX_LAST,            Last key with the same prefix
7 HA_READ_PREFIX_LAST_OR_PREV,    Last or prev key with the same prefix

Flags that I've found:

id, primary key, varchar

id = 'ccccc'
records_in_range: start_key 0 end_key 3
read_range_first: start_key 0 end_key NULL

id > 'ccccc'
records_in_range: start_key 3 end_key NULL
read_range_first: start_key 3 end_key NULL

id < 'ccccc'
records_in_range: start_key NULL end_key 4
read_range_first: start_key NULL end_key 4

id <= 'ccccc'
records_in_range: start_key NULL end_key 3
read_range_first: start_key NULL end_key 3

id >= 'ccccc'
records_in_range: start_key 0 end_key NULL
read_range_first: start_key 1 end_key NULL

id like 'cc%cc'
records_in_range: start_key 0 end_key 3
read_range_first: start_key 1 end_key 3

id > 'aaaaa' and id < 'ccccc'
records_in_range: start_key 3 end_key 4
read_range_first: start_key 3 end_key 4

id >= 'aaaaa' and id < 'ccccc';
records_in_range: start_key 0 end_key 4
read_range_first: start_key 1 end_key 4

id >= 'aaaaa' and id <= 'ccccc';
records_in_range: start_key 0 end_key 3
read_range_first: start_key 1 end_key 3

id > 'aaaaa' and id <= 'ccccc';
records_in_range: start_key 3 end_key 3
read_range_first: start_key 3 end_key 3

numeric keys:

id = 4
index_read_idx: start_key 0 end_key NULL 

id > 4
records_in_range: start_key 3 end_key NULL
read_range_first: start_key 3 end_key NULL

id >= 4
records_in_range: start_key 0 end_key NULL
read_range_first: start_key 1 end_key NULL

id < 4
records_in_range: start_key NULL end_key 4
read_range_first: start_key NULL end_key 4

id <= 4
records_in_range: start_key NULL end_key 3
read_range_first: start_key NULL end_key 3

id like 4
full table scan, select * from

id > 2 and id < 8
records_in_range: start_key 3 end_key 4
read_range_first: start_key 3 end_key 4

id >= 2 and id < 8
records_in_range: start_key 0 end_key 4
read_range_first: start_key 1 end_key 4

id >= 2 and id <= 8
records_in_range: start_key 0 end_key 3
read_range_first: start_key 1 end_key 3

id > 2 and id <= 8
records_in_range: start_key 3 end_key 3
read_range_first: start_key 3 end_key 3

multi keys (id int, name varchar, other varchar)

id = 1;
records_in_range: start_key 0 end_key 3
read_range_first: start_key 0 end_key NULL

id > 4;
id > 2 and name = '333'; remote: id > 2
id > 2 and name > '333'; remote: id > 2
id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results
id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result
id >= 4 and name = 'eric was here' and other > 'eeee';
records_in_range: start_key 3 end_key NULL
read_range_first: start_key 3 end_key NULL

id >= 4;
id >= 2 and name = '333' and other < 'ddd';
remote: `id`  >= 2 AND `name`  >= '333';
records_in_range: start_key 0 end_key NULL
read_range_first: start_key 1 end_key NULL

id < 4;
id < 3 and name = '222' and other <= 'ccc'; remote: id < 3
records_in_range: start_key NULL end_key 4
read_range_first: start_key NULL end_key 4

id <= 4;
records_in_range: start_key NULL end_key 3
read_range_first: start_key NULL end_key 3

id like 4;
full table scan

id  > 2 and id < 4;
records_in_range: start_key 3 end_key 4
read_range_first: start_key 3 end_key 4

id >= 2 and id < 4;
records_in_range: start_key 0 end_key 4
read_range_first: start_key 1 end_key 4

id >= 2 and id <= 4;
records_in_range: start_key 0 end_key 3
read_range_first: start_key 1 end_key 3

id > 2 and id <= 4;
id = 6 and name = 'eric was here' and other > 'eeee';
remote: (`id`  > 6 AND `name`  > 'eric was here' AND `other`  > 'eeee')
AND (`id`  <= 6) AND ( AND `name`  <= 'eric was here')
no results
records_in_range: start_key 3 end_key 3
read_range_first: start_key 3 end_key 3

Summary:

* If the start key flag is 0 the max key flag shouldn't even be set, 
  and if it is, the query produced would be invalid.
* Multipart keys, even if containing some or all numeric columns,
  are treated the same as non-numeric keys

  If the query is " = " (quotes or not):
  - records in range start key flag HA_READ_KEY_EXACT,
    end key flag HA_READ_AFTER_KEY (incorrect)
  - any other: start key flag HA_READ_KEY_OR_NEXT,
    end key flag HA_READ_AFTER_KEY (correct)

* 'like' queries (of key)
  - Numeric, full table scan
  - Non-numeric
      records_in_range: start_key 0 end_key 3
      other : start_key 1 end_key 3

* If the key flag is HA_READ_AFTER_KEY:
   if start_key, append >
   if end_key, append <=

* If create_where_key was called by records_in_range:

 - if the key is numeric:
    start key flag is 0 when end key is NULL, end key flag is 3 or 4
 - if create_where_key was called by any other function:
    start key flag is 1 when end key is NULL, end key flag is 3 or 4
 - if the key is non-numeric, or multipart
    When the query is an exact match, the start key flag is 0,
    end key flag is 3 for what should be a no-range condition where
    you should have 0 and max key NULL, which it is if called by
    read_range_first

Conclusion:

1. Need logic to determin if a key is min or max when the flag is
HA_READ_AFTER_KEY, and handle appending correct operator accordingly

2. Need a boolean flag to pass to create_where_from_key, used in the
switch statement. Add 1 to the flag if:
  - start key flag is HA_READ_KEY_EXACT and the end key is NULL

*/

bool ha_federated::create_where_from_key(String *to,
                                         KEY *key_info,
                                         const key_range *start_key,
                                         const key_range *end_key,
1116 1117
                                         bool records_in_range,
                                         bool eq_range)
1118
{
1119
  bool both_not_null=
1120
    (start_key != NULL && end_key != NULL) ? TRUE : FALSE;
1121 1122 1123 1124 1125
  const byte *ptr;
  uint remainder, length;
  char tmpbuff[FEDERATED_QUERY_BUFFER_SIZE];
  String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info);
  const key_range *ranges[2]= { start_key, end_key };
1126
  my_bitmap_map *old_map;
1127
  DBUG_ENTER("ha_federated::create_where_from_key");
1128

1129 1130 1131
  tmp.length(0); 
  if (start_key == NULL && end_key == NULL)
    DBUG_RETURN(1);
1132

1133 1134
  old_map= dbug_tmp_use_all_columns(table, table->write_set);
  for (uint i= 0; i <= 1; i++)
1135 1136 1137 1138 1139
  {
    bool needs_quotes;
    KEY_PART_INFO *key_part;
    if (ranges[i] == NULL)
      continue;
1140

1141
    if (both_not_null)
1142
    {
1143
      if (i > 0)
1144
        tmp.append(FEDERATED_CONJUNCTION);
1145
      else
1146
        tmp.append(FEDERATED_OPENPAREN);
1147
    }
1148

1149 1150 1151 1152 1153 1154
    for (key_part= key_info->key_part,
         remainder= key_info->key_parts,
         length= ranges[i]->length,
         ptr= ranges[i]->key; ;
         remainder--,
         key_part++)
1155
    {
1156 1157 1158 1159 1160
      Field *field= key_part->field;
      uint store_length= key_part->store_length;
      uint part_length= min(store_length, length);
      needs_quotes= field->needs_quotes();
      DBUG_DUMP("key, start of loop", (char *) ptr, length);
monty@mysql.com's avatar
monty@mysql.com committed
1161

1162 1163 1164 1165 1166
      if (key_part->null_bit)
      {
        if (*ptr++)
        {
          if (emit_key_part_name(&tmp, key_part) ||
1167
              tmp.append(FEDERATED_ISNULL))
1168
            goto err;
1169 1170 1171
          continue;
        }
      }
1172

1173
      if (tmp.append(FEDERATED_OPENPAREN))
1174
        goto err;
1175

1176 1177
      switch (ranges[i]->flag) {
      case HA_READ_KEY_EXACT:
1178
        DBUG_PRINT("info", ("federated HA_READ_KEY_EXACT %d", i));
1179 1180 1181 1182 1183 1184
        if (store_length >= length ||
            !needs_quotes ||
            key_part->type == HA_KEYTYPE_BIT ||
            field->result_type() != STRING_RESULT)
        {
          if (emit_key_part_name(&tmp, key_part))
1185
            goto err;
1186 1187 1188

          if (records_in_range)
          {
1189
            if (tmp.append(FEDERATED_GE))
1190
              goto err;
1191 1192 1193
          }
          else
          {
1194
            if (tmp.append(FEDERATED_EQ))
1195
              goto err;
1196
          }
1197

1198 1199
          if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
                                    part_length))
1200
            goto err;
1201 1202 1203
        }
        else
        {
1204
          /* LIKE */
1205
          if (emit_key_part_name(&tmp, key_part) ||
1206 1207 1208
              tmp.append(FEDERATED_LIKE) ||
              emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr,
                                    part_length))
1209
            goto err;
1210 1211
        }
        break;
1212 1213 1214 1215 1216 1217 1218
      case HA_READ_AFTER_KEY:
        if (eq_range)
        {
          if (tmp.append("1=1"))                // Dummy
            goto err;
          break;
        }
1219
        DBUG_PRINT("info", ("federated HA_READ_AFTER_KEY %d", i));
1220 1221 1222
        if (store_length >= length) /* end key */
        {
          if (emit_key_part_name(&tmp, key_part))
1223
            goto err;
1224 1225

          if (i > 0) /* end key */
1226 1227
          {
            if (tmp.append(FEDERATED_LE))
1228
              goto err;
1229
          }
1230
          else /* start key */
1231 1232
          {
            if (tmp.append(FEDERATED_GT))
1233
              goto err;
1234
          }
1235 1236

          if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1237 1238
                                    part_length))
          {
1239
            goto err;
1240
          }
1241 1242
          break;
        }
1243
      case HA_READ_KEY_OR_NEXT:
1244
        DBUG_PRINT("info", ("federated HA_READ_KEY_OR_NEXT %d", i));
1245 1246
        if (emit_key_part_name(&tmp, key_part) ||
            tmp.append(FEDERATED_GE) ||
1247
            emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1248
              part_length))
1249
          goto err;
1250
        break;
1251
      case HA_READ_BEFORE_KEY:
1252
        DBUG_PRINT("info", ("federated HA_READ_BEFORE_KEY %d", i));
1253 1254 1255
        if (store_length >= length)
        {
          if (emit_key_part_name(&tmp, key_part) ||
1256 1257 1258
              tmp.append(FEDERATED_LT) ||
              emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
                                    part_length))
1259
            goto err;
1260 1261
          break;
        }
1262
      case HA_READ_KEY_OR_PREV:
1263
        DBUG_PRINT("info", ("federated HA_READ_KEY_OR_PREV %d", i));
1264 1265
        if (emit_key_part_name(&tmp, key_part) ||
            tmp.append(FEDERATED_LE) ||
1266 1267
            emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
                                  part_length))
1268
          goto err;
1269 1270 1271
        break;
      default:
        DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag));
1272
        goto err;
1273
      }
1274
      if (tmp.append(FEDERATED_CLOSEPAREN))
1275
        goto err;
1276 1277 1278 1279 1280 1281 1282 1283

next_loop:
      if (store_length >= length)
        break;
      DBUG_PRINT("info", ("remainder %d", remainder));
      DBUG_ASSERT(remainder > 1);
      length-= store_length;
      ptr+= store_length;
1284
      if (tmp.append(FEDERATED_AND))
1285
        goto err;
1286 1287 1288 1289

      DBUG_PRINT("info",
                 ("create_where_from_key WHERE clause: %s",
                  tmp.c_ptr_quick()));
1290
    }
1291
  }
1292 1293
  dbug_tmp_restore_column_map(table->write_set, old_map);

1294
  if (both_not_null)
1295 1296
    if (tmp.append(FEDERATED_CLOSEPAREN))
      DBUG_RETURN(1);
1297

1298
  if (to->append(FEDERATED_WHERE))
1299 1300 1301 1302 1303 1304
    DBUG_RETURN(1);

  if (to->append(tmp))
    DBUG_RETURN(1);

  DBUG_RETURN(0);
1305 1306 1307 1308

err:
  dbug_tmp_restore_column_map(table->write_set, old_map);
  DBUG_RETURN(1);
1309 1310 1311 1312 1313 1314 1315
}

/*
  Example of simple lock controls. The "share" it creates is structure we will
  pass to each federated handler. Do you have to have one of these? Well, you
  have pieces that are used for locking, and they are needed to function.
*/
1316

1317 1318
static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
{
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1319
  char *select_query;
1320 1321
  char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  Field **field;
1322
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1323
  FEDERATED_SHARE *share= NULL, tmp_share;
1324 1325 1326
  /*
    In order to use this string, we must first zero it's length,
    or it will contain garbage
1327
  */
1328 1329 1330 1331
  query.length(0);

  pthread_mutex_lock(&federated_mutex);

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1332 1333 1334 1335
  if (parse_url(&tmp_share, table, 0))
    goto error;

  /* TODO: change tmp_share.scheme to LEX_STRING object */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1336
  if (!(share= (FEDERATED_SHARE *) hash_search(&federated_open_tables,
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1337 1338 1339
                                               (byte*) tmp_share.scheme,
                                               tmp_share.
                                               connect_string_length)))
1340 1341
  {
    query.set_charset(system_charset_info);
1342
    query.append(FEDERATED_SELECT);
1343 1344
    for (field= table->field; *field; field++)
    {
1345
      query.append(FEDERATED_BTICK);
1346
      query.append((*field)->field_name);
1347 1348
      query.append(FEDERATED_BTICK);
      query.append(FEDERATED_COMMA);
1349
    }
1350
    query.length(query.length()- FEDERATED_COMMA_LEN);
1351 1352
    query.append(FEDERATED_FROM);
    query.append(FEDERATED_BTICK);
1353

1354
    if (!(share= (FEDERATED_SHARE *)
1355
          my_multi_malloc(MYF(MY_WME),
1356
                          &share, sizeof(*share),
1357
                          &select_query,
1358
                          query.length()+table->s->connect_string.length+1,
1359
                          NullS)))
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1360 1361
      goto error;

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1362 1363 1364 1365
    memcpy(share, &tmp_share, sizeof(tmp_share));

    share->table_name_length= strlen(share->table_name);
    /* TODO: share->table_name to LEX_STRING object */
1366
    query.append(share->table_name, share->table_name_length);
1367
    query.append(FEDERATED_BTICK);
1368
    share->select_query= select_query;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1369
    strmov(share->select_query, query.ptr());
1370
    share->use_count= 0;
1371
    DBUG_PRINT("info",
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1372
               ("share->select_query %s", share->select_query));
1373

1374 1375 1376
    if (my_hash_insert(&federated_open_tables, (byte*) share))
      goto error;
    thr_lock_init(&share->lock);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1377
    pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
1378 1379 1380 1381 1382 1383 1384 1385
  }
  share->use_count++;
  pthread_mutex_unlock(&federated_mutex);

  return share;

error:
  pthread_mutex_unlock(&federated_mutex);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1386 1387
  my_free((gptr) tmp_share.scheme, MYF(MY_ALLOW_ZERO_PTR));
  my_free((gptr) share, MYF(MY_ALLOW_ZERO_PTR));
1388 1389 1390 1391 1392
  return NULL;
}


/*
1393
  Free lock controls. We call this whenever we close a table.
1394 1395 1396
  If the table had the last reference to the share then we
  free memory associated with it.
*/
1397

1398 1399
static int free_share(FEDERATED_SHARE *share)
{
1400
  DBUG_ENTER("free_share");
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1401

1402
  pthread_mutex_lock(&federated_mutex);
1403 1404 1405
  if (!--share->use_count)
  {
    hash_delete(&federated_open_tables, (byte*) share);
1406
    my_free((gptr) share->scheme, MYF(MY_ALLOW_ZERO_PTR));
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1407
    my_free((gptr) share->socket, MYF(MY_ALLOW_ZERO_PTR));
1408
    thr_lock_delete(&share->lock);
1409
    VOID(pthread_mutex_destroy(&share->mutex));
1410 1411 1412 1413
    my_free((gptr) share, MYF(0));
  }
  pthread_mutex_unlock(&federated_mutex);

1414
  DBUG_RETURN(0);
1415 1416 1417
}


1418 1419 1420 1421 1422
ha_rows ha_federated::records_in_range(uint inx, key_range *start_key,
                                   key_range *end_key)
{
  /*

1423 1424 1425
  We really want indexes to be used as often as possible, therefore
  we just need to hard-code the return value to a very low number to
  force the issue
1426 1427 1428 1429 1430

*/
  DBUG_ENTER("ha_federated::records_in_range");
  DBUG_RETURN(FEDERATED_RECORDS_IN_RANGE);
}
1431 1432
/*
  If frm_error() is called then we will use this to to find out
1433 1434
  what file extentions exist for the storage engine. This is
  also used by the default rename_table and delete_table method
1435 1436
  in handler.cc.
*/
1437

1438
const char **ha_federated::bas_ext() const
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1439
{
1440 1441 1442 1443 1444
  static const char *ext[]=
  {
    NullS
  };
  return ext;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1445
}
1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457


/*
  Used for opening tables. The name will be the name of the file.
  A table is opened when it needs to be opened. For instance
  when a request comes in for a select on the table (tables are not
  open and closed for each request, they are cached).

  Called from handler.cc by handler::ha_open(). The server opens
  all tables by calling ha_open() which then calls the handler
  specific open().
*/
1458

1459 1460
int ha_federated::open(const char *name, int mode, uint test_if_locked)
{
1461
  DBUG_ENTER("ha_federated::open");
1462 1463 1464

  if (!(share= get_share(name, table)))
    DBUG_RETURN(1);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1465
  thr_lock_data_init(&share->lock, &lock, NULL);
1466

1467
  /* Connect to foreign database mysql_real_connect() */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1468
  mysql= mysql_init(0);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1469 1470 1471 1472 1473 1474 1475
  if (!mysql || !mysql_real_connect(mysql,
                                   share->hostname,
                                   share->username,
                                   share->password,
                                   share->database,
                                   share->port,
                                   share->socket, 0))
1476
  {
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1477
    free_share(share);
1478
    DBUG_RETURN(stash_remote_error());
1479
  }
1480 1481
  /*
    Since we do not support transactions at this version, we can let the client
1482 1483
    API silently reconnect. For future versions, we will need more logic to
    deal with transactions
1484 1485
  */
  mysql->reconnect= 1;
1486

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1487 1488 1489 1490 1491
  ref_length= (table->s->primary_key != MAX_KEY ?
               table->key_info[table->s->primary_key].key_length :
               table->s->reclength);
  DBUG_PRINT("info", ("ref_length: %u", ref_length));

1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505
  DBUG_RETURN(0);
}


/*
  Closes a table. We call the free_share() function to free any resources
  that we have allocated in the "shared" structure.

  Called from sql_base.cc, sql_select.cc, and table.cc.
  In sql_select.cc it is only used to close up temporary tables or during
  the process where a temporary table is converted over to being a
  myisam table.
  For sql_base.cc look at close_data_tables().
*/
1506

1507 1508
int ha_federated::close(void)
{
1509
  int retval;
1510
  DBUG_ENTER("ha_federated::close");
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1511

1512
  /* free the result set */
1513
  if (stored_result)
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1514
  {
1515 1516
    mysql_free_result(stored_result);
    stored_result= 0;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1517
  }
1518
  /* Disconnect from mysql */
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1519 1520
  if (mysql)                                    // QQ is this really needed
    mysql_close(mysql);
1521 1522
  retval= free_share(share);
  DBUG_RETURN(retval);
1523 1524 1525 1526 1527

}

/*

1528
  Checks if a field in a record is SQL NULL.
1529 1530 1531 1532 1533 1534 1535 1536 1537

  SYNOPSIS
    field_in_record_is_null()
      table     TABLE pointer, MySQL table object
      field     Field pointer, MySQL field object
      record    char pointer, contains record

    DESCRIPTION
      This method uses the record format information in table to track
1538
      the null bit in record.
1539 1540 1541

    RETURN VALUE
      1    if NULL
1542
      0    otherwise
1543
*/
1544

1545 1546 1547
inline uint field_in_record_is_null(TABLE *table,
                                    Field *field,
                                    char *record)
1548 1549 1550 1551 1552 1553 1554
{
  int null_offset;
  DBUG_ENTER("ha_federated::field_in_record_is_null");

  if (!field->null_ptr)
    DBUG_RETURN(0);

1555
  null_offset= (uint) ((char*)field->null_ptr - (char*)table->record[0]);
1556 1557 1558 1559 1560 1561 1562

  if (record[null_offset] & field->null_bit)
    DBUG_RETURN(1);

  DBUG_RETURN(0);
}

1563

1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576
/*
  write_row() inserts a row. No extra() hint is given currently if a bulk load
  is happeneding. buf() is a byte array of data. You can use the field
  information to extract the data from the native byte array type.
  Example of this would be:
  for (Field **field=table->field ; *field ; field++)
  {
    ...
  }

  Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
  sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
*/
1577

1578
int ha_federated::write_row(byte *buf)
1579
{
1580
  bool has_fields= FALSE;
1581 1582 1583
  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
1584
  Field **field;
1585

1586
  /* The main insert query string */
1587
  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
1588
  /* The string containing the values to be added to the insert */
1589
  String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
1590
  /* The actual value of the field, to be added to the values_string */
1591
  String insert_field_value_string(insert_field_value_buffer,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1592 1593
                                   sizeof(insert_field_value_buffer),
                                   &my_charset_bin);
1594
  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1595
  DBUG_ENTER("ha_federated::write_row");
1596 1597 1598 1599
  DBUG_PRINT("info",
             ("table charset name %s csname %s",
              table->s->table_charset->name,
              table->s->table_charset->csname));
1600

1601 1602 1603
  values_string.length(0);
  insert_string.length(0);
  insert_field_value_string.length(0);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1604
  statistic_increment(table->in_use->status_var.ha_write_count, &LOCK_status);
1605 1606 1607
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();

1608 1609 1610 1611 1612
  /*
    start both our field and field values strings
  */
  insert_string.append(FEDERATED_INSERT);
  insert_string.append(FEDERATED_BTICK);
1613
  insert_string.append(share->table_name, share->table_name_length);
1614 1615 1616 1617 1618
  insert_string.append(FEDERATED_BTICK);
  insert_string.append(FEDERATED_OPENPAREN);

  values_string.append(FEDERATED_VALUES);
  values_string.append(FEDERATED_OPENPAREN);
1619 1620

  /*
1621
    loop through the field pointer array, add any fields to both the values
1622
    list and the fields list that is part of the write set
1623

1624
    You might ask "Why an index variable (has_fields) ?" My answer is that
1625
    we need to count how many fields we actually need
1626
  */
1627
  for (field= table->field; *field; field++)
1628
  {
1629
    /* if there is a query id and if it's equal to the current query id */
1630
    if (bitmap_is_set(table->write_set, (*field)->field_index))
1631
    {
1632 1633 1634 1635
      /*
        There are some fields. This will be used later to determine
        whether to chop off commas and parens.
      */
1636
      has_fields= TRUE;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1637

1638
      if ((*field)->is_null())
1639
        insert_field_value_string.append(FEDERATED_NULL);
1640 1641 1642
      else
      {
        (*field)->val_str(&insert_field_value_string);
1643
        /* quote these fields if they require it */
1644
        (*field)->quote_data(&insert_field_value_string);
1645
      }
1646
      /* append the field name */
1647 1648
      insert_string.append((*field)->field_name);

1649
      /* append the value */
1650 1651 1652
      values_string.append(insert_field_value_string);
      insert_field_value_string.length(0);

1653
      /* append commas between both fields and fieldnames */
1654
      /*
1655 1656 1657
        unfortunately, we can't use the logic if *(fields + 1) to
        make the following appends conditional as we don't know if the
        next field is in the write set
1658
      */
1659 1660
      insert_string.append(FEDERATED_COMMA);
      values_string.append(FEDERATED_COMMA);
1661 1662
    }
  }
1663
  dbug_tmp_restore_column_map(table->read_set, old_map);
1664 1665

  /*
1666 1667 1668
    if there were no fields, we don't want to add a closing paren
    AND, we don't want to chop off the last char '('
    insert will be "INSERT INTO t1 VALUES ();"
1669
  */
1670
  if (has_fields)
1671
  {
1672
    /* chops off leading commas */
1673 1674
    insert_string.length(insert_string.length() - FEDERATED_COMMA_LEN);
    values_string.length(values_string.length() - FEDERATED_COMMA_LEN);
1675
    insert_string.append(FEDERATED_CLOSEPAREN);
1676
  }
1677 1678 1679
  else
    insert_string.length(insert_string.length() - FEDERATED_CLOSEPAREN_LEN);

1680
  /* we always want to append this, even if there aren't any fields */
1681
  values_string.append(FEDERATED_CLOSEPAREN);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1682

1683
  /* add the values */
1684 1685
  insert_string.append(values_string);

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1686
  if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
1687
  {
1688
    DBUG_RETURN(stash_remote_error());
1689
  }
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1690
  /*
1691 1692
    If the table we've just written a record to contains an auto_increment
    field, then store the last_insert_id() value from the foreign server
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1693 1694 1695
  */
  if (table->next_number_field)
    update_auto_increment();
1696 1697 1698 1699

  DBUG_RETURN(0);
}

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1700 1701 1702 1703 1704 1705
/*
  ha_federated::update_auto_increment

  This method ensures that last_insert_id() works properly. What it simply does
  is calls last_insert_id() on the foreign database immediately after insert
  (if the table has an auto_increment field) and sets the insert id via
1706
  thd->insert_id(ID)).
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1707 1708 1709 1710 1711 1712
*/
void ha_federated::update_auto_increment(void)
{
  THD *thd= current_thd;
  DBUG_ENTER("ha_federated::update_auto_increment");

1713 1714
  thd->first_successful_insert_id_in_cur_stmt= 
    mysql->last_used_con->insert_id;
1715
  DBUG_PRINT("info",("last_insert_id %d", stats.auto_increment_value));
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1716 1717 1718

  DBUG_VOID_RETURN;
}
1719

1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735
int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  char query_buffer[STRING_BUFFER_USUAL_SIZE];
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
  DBUG_ENTER("ha_federated::optimize");
  
  query.length(0);

  query.set_charset(system_charset_info);
  query.append(FEDERATED_OPTIMIZE);
  query.append(FEDERATED_BTICK);
  query.append(share->table_name, share->table_name_length);
  query.append(FEDERATED_BTICK);

  if (mysql_real_query(mysql, query.ptr(), query.length()))
  {
1736
    DBUG_RETURN(stash_remote_error());
1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761
  }

  DBUG_RETURN(0);
}


int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
  char query_buffer[STRING_BUFFER_USUAL_SIZE];
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
  DBUG_ENTER("ha_federated::repair");

  query.length(0);

  query.set_charset(system_charset_info);
  query.append(FEDERATED_REPAIR);
  query.append(FEDERATED_BTICK);
  query.append(share->table_name, share->table_name_length);
  query.append(FEDERATED_BTICK);
  if (check_opt->flags & T_QUICK)
    query.append(FEDERATED_QUICK);
  if (check_opt->flags & T_EXTEND)
    query.append(FEDERATED_EXTENDED);
  if (check_opt->sql_flags & TT_USEFRM)
    query.append(FEDERATED_USE_FRM);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1762

1763 1764
  if (mysql_real_query(mysql, query.ptr(), query.length()))
  {
1765
    DBUG_RETURN(stash_remote_error());
1766 1767 1768 1769 1770
  }

  DBUG_RETURN(0);
}

1771

1772 1773 1774 1775 1776 1777
/*
  Yes, update_row() does what you expect, it updates a row. old_data will have
  the previous row record in it, while new_data will have the newest data in
  it.

  Keep in mind that the server can do updates based on ordering if an ORDER BY
ingo/mydev@chilla.local's avatar
ingo/mydev@chilla.local committed
1778
  clause was used. Consecutive ordering is not guaranteed.
1779 1780 1781 1782 1783 1784 1785 1786 1787
  Currently new_data will not have an updated auto_increament record, or
  and updated timestamp field. You can do these for federated by doing these:
  if (table->timestamp_on_update_now)
    update_timestamp(new_row+table->timestamp_on_update_now-1);
  if (table->next_number_field && record == table->record[0])
    update_auto_increment();

  Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
*/
1788

1789
int ha_federated::update_row(const byte *old_data, byte *new_data)
1790
{
1791
  /*
1792 1793 1794 1795 1796 1797 1798 1799 1800 1801
    This used to control how the query was built. If there was a
    primary key, the query would be built such that there was a where
    clause with only that column as the condition. This is flawed,
    because if we have a multi-part primary key, it would only use the
    first part! We don't need to do this anyway, because
    read_range_first will retrieve the correct record, which is what
    is used to build the WHERE clause. We can however use this to
    append a LIMIT to the end if there is NOT a primary key. Why do
    this? Because we only are updating one record, and LIMIT enforces
    this.
1802
  */
1803
  bool has_a_primary_key= test(table->s->primary_key != MAX_KEY);
ingo/mydev@chilla.local's avatar
ingo/mydev@chilla.local committed
1804
  /*
1805 1806
    buffers for following strings
  */
1807
  char field_value_buffer[STRING_BUFFER_USUAL_SIZE];
1808 1809
  char update_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char where_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1810

1811 1812 1813
  /* Work area for field values */
  String field_value(field_value_buffer, sizeof(field_value_buffer),
                     &my_charset_bin);
1814
  /* stores the update query */
1815 1816 1817
  String update_string(update_buffer,
                       sizeof(update_buffer),
                       &my_charset_bin);
1818
  /* stores the WHERE clause */
1819 1820 1821
  String where_string(where_buffer,
                      sizeof(where_buffer),
                      &my_charset_bin);
1822
  DBUG_ENTER("ha_federated::update_row");
ingo/mydev@chilla.local's avatar
ingo/mydev@chilla.local committed
1823
  /*
1824 1825
    set string lengths to 0 to avoid misc chars in string
  */
1826
  field_value.length(0);
1827 1828
  update_string.length(0);
  where_string.length(0);
1829

1830 1831
  update_string.append(FEDERATED_UPDATE);
  update_string.append(FEDERATED_BTICK);
1832
  update_string.append(share->table_name);
1833 1834
  update_string.append(FEDERATED_BTICK);
  update_string.append(FEDERATED_SET);
1835

1836 1837 1838
  /*
    In this loop, we want to match column names to values being inserted
    (while building INSERT statement).
1839

1840 1841
    Iterate through table->field (new data) and share->old_field (old_data)
    using the same index to create an SQL UPDATE statement. New data is
1842 1843 1844
    used to create SET field=value and old data is used to create WHERE
    field=oldvalue
  */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1845

1846
  for (Field **field= table->field; *field; field++)
1847
  {
1848
    if (bitmap_is_set(table->write_set, (*field)->field_index))
1849
    {
ingo/mydev@chilla.local's avatar
ingo/mydev@chilla.local committed
1850 1851 1852
      update_string.append((*field)->field_name);
      update_string.append(FEDERATED_EQ);

1853
      if ((*field)->is_null())
1854
        update_string.append(FEDERATED_NULL);
1855 1856 1857 1858
      else
      {
        my_bitmap_map *old_map= tmp_use_all_columns(table, table->read_set);
        /* otherwise = */
1859 1860 1861 1862
        (*field)->val_str(&field_value);
        (*field)->quote_data(&field_value);
        update_string.append(field_value);
        field_value.length(0);
1863 1864 1865
        tmp_restore_column_map(table->read_set, old_map);
      }
      update_string.append(FEDERATED_COMMA);
1866
    }
1867

1868
    if (bitmap_is_set(table->read_set, (*field)->field_index))
1869
    {
1870 1871 1872 1873 1874 1875
      where_string.append((*field)->field_name);
      if (field_in_record_is_null(table, *field, (char*) old_data))
        where_string.append(FEDERATED_ISNULL);
      else
      {
        where_string.append(FEDERATED_EQ);
ingo/mydev@chilla.local's avatar
ingo/mydev@chilla.local committed
1876
        (*field)->val_str(&field_value,
1877
                          (char*) (old_data + (*field)->offset()));
ingo/mydev@chilla.local's avatar
ingo/mydev@chilla.local committed
1878 1879 1880
        (*field)->quote_data(&field_value);
        where_string.append(field_value);
        field_value.length(0);
1881
      }
1882
      where_string.append(FEDERATED_AND);
1883 1884
    }
  }
1885 1886 1887 1888 1889 1890 1891 1892 1893 1894

  /* Remove last ', '. This works as there must be at least on updated field */
  update_string.length(update_string.length() - FEDERATED_COMMA_LEN);
  if (where_string.length())
  {
    where_string.length(where_string.length() - FEDERATED_AND_LEN);
    update_string.append(FEDERATED_WHERE);
    update_string.append(where_string);
  }

1895 1896 1897 1898
  /*
    If this table has not a primary key, then we could possibly
    update multiple rows. We want to make sure to only update one!
  */
1899 1900
  if (!has_a_primary_key)
    update_string.append(FEDERATED_LIMIT1);
1901

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1902
  if (mysql_real_query(mysql, update_string.ptr(), update_string.length()))
1903
  {
1904
    DBUG_RETURN(stash_remote_error());
1905 1906 1907 1908 1909
  }
  DBUG_RETURN(0);
}

/*
1910
  This will delete a row. 'buf' will contain a copy of the row to be =deleted.
1911
  The server will call this right after the current row has been called (from
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1912
  either a previous rnd_next() or index call).
1913 1914 1915 1916 1917 1918
  If you keep a pointer to the last row or can access a primary key it will
  make doing the deletion quite a bit easier.
  Keep in mind that the server does no guarentee consecutive deletions.
  ORDER BY clauses can be used.

  Called in sql_acl.cc and sql_udf.cc to manage internal table information.
1919
  Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select
1920 1921 1922
  it is used for removing duplicates while in insert it is used for REPLACE
  calls.
*/
1923

1924
int ha_federated::delete_row(const byte *buf)
1925
{
1926 1927
  char delete_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char data_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1928 1929
  String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
  String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
1930
  uint found= 0;
1931 1932
  DBUG_ENTER("ha_federated::delete_row");

1933
  delete_string.length(0);
1934 1935 1936
  delete_string.append(FEDERATED_DELETE);
  delete_string.append(FEDERATED_FROM);
  delete_string.append(FEDERATED_BTICK);
1937
  delete_string.append(share->table_name);
1938 1939
  delete_string.append(FEDERATED_BTICK);
  delete_string.append(FEDERATED_WHERE);
1940

monty@mysql.com's avatar
monty@mysql.com committed
1941
  for (Field **field= table->field; *field; field++)
1942
  {
1943
    Field *cur_field= *field;
1944 1945
    found++;
    if (bitmap_is_set(table->read_set, cur_field->field_index))
1946
    {
1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961
      data_string.length(0);
      delete_string.append(cur_field->field_name);
      if (cur_field->is_null())
      {
        delete_string.append(FEDERATED_IS);
        delete_string.append(FEDERATED_NULL);
      }
      else
      {
        delete_string.append(FEDERATED_EQ);
        cur_field->val_str(&data_string);
        cur_field->quote_data(&data_string);
        delete_string.append(data_string);
      }
      delete_string.append(FEDERATED_AND);
1962 1963
    }
  }
1964 1965 1966 1967 1968

 // Remove trailing AND
  delete_string.length(delete_string.length() - FEDERATED_AND_LEN);
  if (!found)
    delete_string.length(delete_string.length() - FEDERATED_WHERE_LEN);
1969

1970
  delete_string.append(FEDERATED_LIMIT1);
1971
  DBUG_PRINT("info",
1972
             ("Delete sql: %s", delete_string.c_ptr_quick()));
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
1973
  if (mysql_real_query(mysql, delete_string.ptr(), delete_string.length()))
1974
  {
1975
    DBUG_RETURN(stash_remote_error());
1976
  }
1977
  stats.deleted+= mysql->affected_rows;
1978
  stats.records-= mysql->affected_rows;
1979 1980
  DBUG_PRINT("info",
             ("rows deleted %d rows deleted for all time %d",
1981
             int(mysql->affected_rows), stats.deleted));
1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992

  DBUG_RETURN(0);
}


/*
  Positions an index cursor to the index specified in the handle. Fetches the
  row if available. If the key value is null, begin at the first key of the
  index. This method, which is called in the case of an SQL statement having
  a WHERE clause on a non-primary key index, simply calls index_read_idx.
*/
1993

1994
int ha_federated::index_read(byte *buf, const byte *key,
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1995
                             uint key_len, ha_rkey_function find_flag)
1996 1997
{
  DBUG_ENTER("ha_federated::index_read");
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
1998 1999 2000 2001 2002 2003

  if (stored_result)
    mysql_free_result(stored_result);
  DBUG_RETURN(index_read_idx_with_result_set(buf, active_index, key,
                                             key_len, find_flag,
                                             &stored_result));
2004 2005 2006 2007 2008 2009 2010 2011
}


/*
  Positions an index cursor to the index specified in key. Fetches the
  row if any.  This is only used to read whole keys.

  This method is called via index_read in the case of a WHERE clause using
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2012
  a primary key index OR is called DIRECTLY when the WHERE clause
2013
  uses a PRIMARY KEY index.
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2014 2015 2016 2017

  NOTES
    This uses an internal result set that is deleted before function
    returns.  We need to be able to be calable from ha_rnd_pos()
2018
*/
2019

2020
int ha_federated::index_read_idx(byte *buf, uint index, const byte *key,
2021
                                 uint key_len, enum ha_rkey_function find_flag)
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050
{
  int retval;
  MYSQL_RES *mysql_result;
  DBUG_ENTER("ha_federated::index_read_idx");

  if ((retval= index_read_idx_with_result_set(buf, index, key,
                                              key_len, find_flag,
                                              &mysql_result)))
    DBUG_RETURN(retval);
  mysql_free_result(mysql_result);
  DBUG_RETURN(retval);
}


/*
  Create result set for rows matching query and return first row

  RESULT
    0	ok     In this case *result will contain the result set
	       table->status == 0 
    #   error  In this case *result will contain 0
               table->status == STATUS_NOT_FOUND
*/

int ha_federated::index_read_idx_with_result_set(byte *buf, uint index,
                                                 const byte *key,
                                                 uint key_len,
                                                 ha_rkey_function find_flag,
                                                 MYSQL_RES **result)
2051
{
2052 2053 2054 2055
  int retval;
  char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char index_value[STRING_BUFFER_USUAL_SIZE];
  char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2056
  String index_string(index_value,
2057 2058 2059 2060 2061 2062
                      sizeof(index_value),
                      &my_charset_bin);
  String sql_query(sql_query_buffer,
                   sizeof(sql_query_buffer),
                   &my_charset_bin);
  key_range range;
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2063
  DBUG_ENTER("ha_federated::index_read_idx_with_result_set");
2064

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2065
  *result= 0;                                   // In case of errors
2066
  index_string.length(0);
2067
  sql_query.length(0);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2068 2069
  statistic_increment(table->in_use->status_var.ha_read_key_count,
                      &LOCK_status);
2070 2071 2072

  sql_query.append(share->select_query);

2073 2074 2075 2076 2077 2078
  range.key= key;
  range.length= key_len;
  range.flag= find_flag;
  create_where_from_key(&index_string,
                        &table->key_info[index],
                        &range,
2079
                        NULL, 0, 0);
2080 2081
  sql_query.append(index_string);

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2082
  if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length()))
2083
  {
2084
    my_sprintf(error_buffer, (error_buffer, "error: %d '%s'",
2085 2086
                              mysql_errno(mysql), mysql_error(mysql)));
    retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2087
    goto error;
2088
  }
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2089
  if (!(*result= mysql_store_result(mysql)))
2090
  {
2091 2092
    retval= HA_ERR_END_OF_FILE;
    goto error;
2093
  }
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2094 2095
  if (!(retval= read_next(buf, *result)))
    DBUG_RETURN(retval);
2096

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2097 2098 2099
  mysql_free_result(*result);
  *result= 0;
  table->status= STATUS_NOT_FOUND;
2100
  DBUG_RETURN(retval);
2101

2102 2103 2104 2105
error:
  table->status= STATUS_NOT_FOUND;
  my_error(retval, MYF(0), error_buffer);
  DBUG_RETURN(retval);
2106 2107
}

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2108

2109
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2110

2111
int ha_federated::index_init(uint keynr, bool sorted)
2112 2113
{
  DBUG_ENTER("ha_federated::index_init");
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2114
  DBUG_PRINT("info", ("table: '%s'  key: %u", table->s->table_name, keynr));
2115 2116 2117 2118
  active_index= keynr;
  DBUG_RETURN(0);
}

2119

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2120 2121
/*
  Read first range
2122
*/
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2123

2124
int ha_federated::read_range_first(const key_range *start_key,
2125 2126
                                   const key_range *end_key,
                                   bool eq_range, bool sorted)
2127 2128 2129 2130 2131 2132 2133
{
  char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  int retval;
  String sql_query(sql_query_buffer,
                   sizeof(sql_query_buffer),
                   &my_charset_bin);
  DBUG_ENTER("ha_federated::read_range_first");
2134

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2135
  DBUG_ASSERT(!(start_key == NULL && end_key == NULL));
2136 2137 2138 2139 2140

  sql_query.length(0);
  sql_query.append(share->select_query);
  create_where_from_key(&sql_query,
                        &table->key_info[active_index],
2141
                        start_key, end_key, 0, eq_range);
2142

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2143 2144 2145 2146 2147
  if (stored_result)
  {
    mysql_free_result(stored_result);
    stored_result= 0;
  }
2148 2149
  if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length()))
  {
2150
    retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2151 2152 2153 2154
    goto error;
  }
  sql_query.length(0);

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2155
  if (!(stored_result= mysql_store_result(mysql)))
2156 2157 2158 2159 2160
  {
    retval= HA_ERR_END_OF_FILE;
    goto error;
  }

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2161
  retval= read_next(table->record[0], stored_result);
2162 2163 2164
  DBUG_RETURN(retval);

error:
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2165 2166
  table->status= STATUS_NOT_FOUND;
  DBUG_RETURN(retval);
2167 2168
}

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2169

2170 2171 2172 2173 2174 2175 2176 2177 2178
int ha_federated::read_range_next()
{
  int retval;
  DBUG_ENTER("ha_federated::read_range_next");
  retval= rnd_next(table->record[0]);
  DBUG_RETURN(retval);
}


2179
/* Used to read forward through the index.  */
2180
int ha_federated::index_next(byte *buf)
2181 2182
{
  DBUG_ENTER("ha_federated::index_next");
2183 2184
  statistic_increment(table->in_use->status_var.ha_read_next_count,
		      &LOCK_status);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2185
  DBUG_RETURN(read_next(buf, stored_result));
2186
}
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2187 2188


2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200
/*
  rnd_init() is called when the system wants the storage engine to do a table
  scan.

  This is the method that gets data for the SELECT calls.

  See the federated in the introduction at the top of this file to see when
  rnd_init() is called.

  Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
  sql_table.cc, and sql_update.cc.
*/
2201

2202 2203
int ha_federated::rnd_init(bool scan)
{
2204
  DBUG_ENTER("ha_federated::rnd_init");
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2205
  /*
2206 2207 2208
    The use of the 'scan' flag is incredibly important for this handler
    to work properly, especially with updates containing WHERE clauses
    using indexed columns.
2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237

    When the initial query contains a WHERE clause of the query using an
    indexed column, it's index_read_idx that selects the exact record from
    the foreign database.

    When there is NO index in the query, either due to not having a WHERE
    clause, or the WHERE clause is using columns that are not indexed, a
    'full table scan' done by rnd_init, which in this situation simply means
    a 'select * from ...' on the foreign table.

    In other words, this 'scan' flag gives us the means to ensure that if
    there is an index involved in the query, we want index_read_idx to
    retrieve the exact record (scan flag is 0), and do not  want rnd_init
    to do a 'full table scan' and wipe out that result set.

    Prior to using this flag, the problem was most apparent with updates.

    An initial query like 'UPDATE tablename SET anything = whatever WHERE
    indexedcol = someval', index_read_idx would get called, using a query
    constructed with a WHERE clause built from the values of index ('indexcol'
    in this case, having a value of 'someval').  mysql_store_result would
    then get called (this would be the result set we want to use).

    After this rnd_init (from sql_update.cc) would be called, it would then
    unecessarily call "select * from table" on the foreign table, then call
    mysql_store_result, which would wipe out the correct previous result set
    from the previous call of index_read_idx's that had the result set
    containing the correct record, hence update the wrong row!

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2238
  */
2239

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2240
  if (scan)
2241
  {
2242
    if (stored_result)
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2243
    {
2244 2245
      mysql_free_result(stored_result);
      stored_result= 0;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2246
    }
2247

2248 2249 2250 2251
    if (mysql_real_query(mysql,
                         share->select_query,
                         strlen(share->select_query)))
      goto error;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2252

2253 2254
    stored_result= mysql_store_result(mysql);
    if (!stored_result)
2255
      goto error;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2256
  }
2257
  DBUG_RETURN(0);
2258 2259

error:
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2260
  DBUG_RETURN(stash_remote_error());
2261 2262
}

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2263

2264 2265 2266
int ha_federated::rnd_end()
{
  DBUG_ENTER("ha_federated::rnd_end");
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2267
  DBUG_RETURN(index_end());
2268 2269
}

2270

2271 2272 2273
int ha_federated::index_end(void)
{
  DBUG_ENTER("ha_federated::index_end");
2274
  if (stored_result)
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2275
  {
2276 2277
    mysql_free_result(stored_result);
    stored_result= 0;
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2278
  }
2279 2280 2281 2282
  active_index= MAX_KEY;
  DBUG_RETURN(0);
}

2283

2284 2285 2286 2287 2288 2289 2290 2291 2292
/*
  This is called for each row of the table scan. When you run out of records
  you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
  The Field structure for the table is the key to getting data into buf
  in a manner that will allow the server to understand it.

  Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
  sql_table.cc, and sql_update.cc.
*/
2293

2294 2295 2296 2297
int ha_federated::rnd_next(byte *buf)
{
  DBUG_ENTER("ha_federated::rnd_next");

2298
  if (stored_result == 0)
2299 2300 2301 2302
  {
    /*
      Return value of rnd_init is not always checked (see records.cc),
      so we can get here _even_ if there is _no_ pre-fetched result-set!
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2303 2304
      TODO: fix it. We can delete this in 5.1 when rnd_init() is checked.
    */
2305 2306
    DBUG_RETURN(1);
  }
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339
  DBUG_RETURN(read_next(buf, stored_result));
}


/*
  ha_federated::read_next

  reads from a result set and converts to mysql internal
  format

  SYNOPSIS
    field_in_record_is_null()
      buf       byte pointer to record 
      result    mysql result set 

    DESCRIPTION
     This method is a wrapper method that reads one record from a result
     set and converts it to the internal table format

    RETURN VALUE
      1    error
      0    no error 
*/

int ha_federated::read_next(byte *buf, MYSQL_RES *result)
{
  int retval;
  my_ulonglong num_rows;
  MYSQL_ROW row;
  DBUG_ENTER("ha_federated::read_next");

  table->status= STATUS_NOT_FOUND;              // For easier return

2340
  /* Fetch a row, insert it back in a row format. */
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2341
  if (!(row= mysql_fetch_row(result)))
2342
    DBUG_RETURN(HA_ERR_END_OF_FILE);
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2343

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2344 2345 2346
  if (!(retval= convert_row_to_internal_format(buf, row, result)))
    table->status= 0;

2347
  DBUG_RETURN(retval);
2348 2349 2350 2351
}


/*
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2352 2353
  store reference to current row so that we can later find it for
  a re-read, update or delete.
2354

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2355 2356
  In case of federated, a reference is either a primary key or
  the whole record.
2357 2358 2359

  Called from filesort.cc, sql_select.cc, sql_delete.cc and sql_update.cc.
*/
2360

2361 2362 2363
void ha_federated::position(const byte *record)
{
  DBUG_ENTER("ha_federated::position");
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2364 2365 2366 2367 2368
  if (table->s->primary_key != MAX_KEY)
    key_copy(ref, (byte *)record, table->key_info + table->s->primary_key,
             ref_length);
  else
    memcpy(ref, record, ref_length);
2369 2370 2371 2372 2373 2374
  DBUG_VOID_RETURN;
}


/*
  This is like rnd_next, but you are given a position to use to determine the
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2375
  row. The position will be of the type that you stored in ref.
2376

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2377
  This method is required for an ORDER BY
2378 2379 2380

  Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
*/
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2381

2382
int ha_federated::rnd_pos(byte *buf, byte *pos)
2383
{
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2384
  int result;
2385
  DBUG_ENTER("ha_federated::rnd_pos");
2386 2387
  statistic_increment(table->in_use->status_var.ha_read_rnd_count,
                      &LOCK_status);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401
  if (table->s->primary_key != MAX_KEY)
  {
    /* We have a primary key, so use index_read_idx to find row */
    result= index_read_idx(buf, table->s->primary_key, pos,
                           ref_length, HA_READ_KEY_EXACT);
  }
  else
  {
    /* otherwise, get the old record ref as obtained in ::position */
    memcpy(buf, pos, ref_length);
    result= 0;
  }
  table->status= result ? STATUS_NOT_FOUND : 0;
  DBUG_RETURN(result);
2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447
}


/*
  ::info() is used to return information to the optimizer.
  Currently this table handler doesn't implement most of the fields
  really needed. SHOW also makes use of this data
  Another note, you will probably want to have the following in your
  code:
  if (records < 2)
    records = 2;
  The reason is that the server will optimize for cases of only a single
  record. If in a table scan you don't know the number of records
  it will probably be better to set records to two so you can return
  as many records as you need.
  Along with records a few more variables you may wish to set are:
    records
    deleted
    data_file_length
    index_file_length
    delete_length
    check_time
  Take a look at the public variables in handler.h for more information.

  Called in:
    filesort.cc
    ha_heap.cc
    item_sum.cc
    opt_sum.cc
    sql_delete.cc
    sql_delete.cc
    sql_derived.cc
    sql_select.cc
    sql_select.cc
    sql_select.cc
    sql_select.cc
    sql_select.cc
    sql_show.cc
    sql_show.cc
    sql_show.cc
    sql_show.cc
    sql_table.cc
    sql_union.cc
    sql_update.cc

*/
2448

2449 2450
void ha_federated::info(uint flag)
{
2451 2452 2453 2454 2455
  char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char status_buf[FEDERATED_QUERY_BUFFER_SIZE];
  char escaped_table_name[FEDERATED_QUERY_BUFFER_SIZE];
  int error;
  uint error_code;
2456
  MYSQL_RES *result= 0;
2457 2458
  MYSQL_ROW row;
  String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin);
2459
  DBUG_ENTER("ha_federated::info");
2460

2461
  error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2462 2463 2464 2465
  /* we want not to show table status if not needed to do so */
  if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST))
  {
    status_query_string.length(0);
2466 2467
    status_query_string.append(FEDERATED_INFO);
    status_query_string.append(FEDERATED_SQUOTE);
2468 2469 2470 2471 2472 2473

    escape_string_for_mysql(&my_charset_bin, (char *)escaped_table_name,
                            sizeof(escaped_table_name),
                            share->table_name,
                            share->table_name_length);
    status_query_string.append(escaped_table_name);
2474
    status_query_string.append(FEDERATED_SQUOTE);
2475 2476 2477 2478 2479 2480 2481 2482

    if (mysql_real_query(mysql, status_query_string.ptr(),
                         status_query_string.length()))
      goto error;

    status_query_string.length(0);

    result= mysql_store_result(mysql);
2483
    if (!result)
2484 2485
      goto error;

2486
    if (!mysql_num_rows(result))
2487 2488 2489 2490 2491 2492 2493
      goto error;

    if (!(row= mysql_fetch_row(result)))
      goto error;

    if (flag & HA_STATUS_VARIABLE | HA_STATUS_CONST)
    {
2494
      /*
2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505
        deleted is set in ha_federated::info
      */
      /*
        need to figure out what this means as far as federated is concerned,
        since we don't have a "file"

        data_file_length = ?
        index_file_length = ?
        delete_length = ?
      */
      if (row[4] != NULL)
2506
        stats.records=   (ha_rows) my_strtoll10(row[4], (char**) 0,
2507
                                                       &error);
2508
      if (row[5] != NULL)
2509
        stats.mean_rec_length= (ha_rows) my_strtoll10(row[5], (char**) 0, &error);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2510

2511
      stats.data_file_length= stats.records * stats.mean_rec_length;
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2512

2513
      if (row[12] != NULL)
2514 2515
        stats.update_time=     (ha_rows) my_strtoll10(row[12], (char**) 0,
                                                      &error);
2516
      if (row[13] != NULL)
2517 2518
        stats.check_time=      (ha_rows) my_strtoll10(row[13], (char**) 0,
                                                      &error);
2519
    }
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2520 2521 2522 2523
    /*
      size of IO operations (This is based on a good guess, no high science
      involved)
    */
2524
    if (flag & HA_STATUS_CONST)
2525
      stats.block_size= 4096;
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2526

2527 2528 2529 2530
  }

  if (result)
    mysql_free_result(result);
2531

2532 2533 2534 2535 2536
  DBUG_VOID_RETURN;

error:
  if (result)
    mysql_free_result(result);
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2537

2538 2539
  my_sprintf(error_buffer, (error_buffer, ": %d : %s",
                            mysql_errno(mysql), mysql_error(mysql)));
2540
  my_error(error_code, MYF(0), error_buffer);
2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555
  DBUG_VOID_RETURN;
}


/*
  Used to delete all rows in a table. Both for cases of truncate and
  for cases where the optimizer realizes that all rows will be
  removed as a result of a SQL statement.

  Called from item_sum.cc by Item_func_group_concat::clear(),
  Item_sum_count_distinct::clear(), and Item_func_group_concat::clear().
  Called from sql_delete.cc by mysql_delete().
  Called from sql_select.cc by JOIN::reinit().
  Called from sql_union.cc by st_select_lex_unit::exec().
*/
2556

2557 2558
int ha_federated::delete_all_rows()
{
2559
  char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2560
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2561 2562
  DBUG_ENTER("ha_federated::delete_all_rows");

2563 2564 2565
  query.length(0);

  query.set_charset(system_charset_info);
2566 2567
  query.append(FEDERATED_TRUNCATE);
  query.append(FEDERATED_BTICK);
2568
  query.append(share->table_name);
2569
  query.append(FEDERATED_BTICK);
2570

2571 2572 2573
  /*
    TRUNCATE won't return anything in mysql_affected_rows
  */
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2574 2575
  if (mysql_real_query(mysql, query.ptr(), query.length()))
  {
2576
    DBUG_RETURN(stash_remote_error());
2577
  }
2578 2579
  stats.deleted+= stats.records;
  stats.records= 0;
2580
  DBUG_RETURN(0);
2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612
}


/*
  The idea with handler::store_lock() is the following:

  The statement decided which locks we should need for the table
  for updates/deletes/inserts we get WRITE locks, for SELECT... we get
  read locks.

  Before adding the lock into the table lock handler (see thr_lock.c)
  mysqld calls store lock with the requested locks.  Store lock can now
  modify a write lock to a read lock (or some other lock), ignore the
  lock (if we don't want to use MySQL table locks at all) or add locks
  for many tables (like we do when we are using a MERGE handler).

  Berkeley DB for federated  changes all WRITE locks to TL_WRITE_ALLOW_WRITE
  (which signals that we are doing WRITES, but we are still allowing other
  reader's and writer's.

  When releasing locks, store_lock() are also called. In this case one
  usually doesn't have to do anything.

  In some exceptional cases MySQL may send a request for a TL_IGNORE;
  This means that we are requesting the same lock as last time and this
  should also be ignored. (This may happen when someone does a flush
  table when we have opened a part of the tables, in which case mysqld
  closes and reopens the tables and tries to get the same locks at last
  time).  In the future we will probably try to remove this.

  Called from lock.cc by get_lock_data().
*/
2613

2614
THR_LOCK_DATA **ha_federated::store_lock(THD *thd,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2615 2616
                                         THR_LOCK_DATA **to,
                                         enum thr_lock_type lock_type)
2617
{
patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2618
  DBUG_ENTER("ha_federated::store_lock");
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2619
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
2620
  {
2621 2622 2623 2624 2625
    /*
      Here is where we get into the guts of a row level lock.
      If TL_UNLOCK is set
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers
2626 2627 2628
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
2629
         lock_type <= TL_WRITE) && !thd->in_lock_tables)
2630 2631
      lock_type= TL_WRITE_ALLOW_WRITE;

2632 2633 2634 2635 2636 2637
    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
2638 2639
    */

patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2640
    if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
2641 2642 2643 2644 2645 2646 2647
      lock_type= TL_READ;

    lock.type= lock_type;
  }

  *to++= &lock;

patg@govinda.patg.net's avatar
patg@govinda.patg.net committed
2648
  DBUG_RETURN(to);
2649 2650 2651 2652
}

/*
  create() does nothing, since we have no local setup of our own.
2653
  FUTURE: We should potentially connect to the foreign database and
2654
*/
2655

2656
int ha_federated::create(const char *name, TABLE *table_arg,
patg@krsna.patg.net's avatar
patg@krsna.patg.net committed
2657
                         HA_CREATE_INFO *create_info)
2658
{
2659 2660
  int retval;
  FEDERATED_SHARE tmp_share; // Only a temporary share, to test the url
2661
  DBUG_ENTER("ha_federated::create");
2662

2663 2664
  if (!(retval= parse_url(&tmp_share, table_arg, 1)))
    retval= check_foreign_data_source(&tmp_share, 1);
2665

2666
  my_free((gptr) tmp_share.scheme, MYF(MY_ALLOW_ZERO_PTR));
2667
  DBUG_RETURN(retval);
2668

2669
}
2670 2671 2672 2673 2674 2675


int ha_federated::stash_remote_error()
{
  DBUG_ENTER("ha_federated::stash_remote_error()");
  remote_error_number= mysql_errno(mysql);
2676
  strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1);
2677 2678 2679 2680 2681 2682 2683 2684 2685 2686
  DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM);
}


bool ha_federated::get_error_message(int error, String* buf)
{
  DBUG_ENTER("ha_federated::get_error_message");
  DBUG_PRINT("enter", ("error: %d", error));
  if (error == HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM)
  {
2687
    buf->append(STRING_WITH_LEN("Error on remote system: "));
2688
    buf->qs_append(remote_error_number);
2689
    buf->append(STRING_WITH_LEN(": "));
2690
    buf->append(remote_error_buf);
2691 2692 2693 2694 2695 2696 2697 2698

    remote_error_number= 0;
    remote_error_buf[0]= '\0';
  }
  DBUG_PRINT("exit", ("message: %s", buf->ptr()));
  DBUG_RETURN(FALSE);
}

2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832
int ha_federated::external_lock(THD *thd, int lock_type)
{
  int error= 0;
  ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot];
  DBUG_ENTER("ha_federated::external_lock");

  if (lock_type != F_UNLCK)
  {
    DBUG_PRINT("info",("federated not lock F_UNLCK"));
    if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) 
    {
      DBUG_PRINT("info",("federated autocommit"));
      /* 
        This means we are doing an autocommit
      */
      error= connection_autocommit(TRUE);
      if (error)
      {
        DBUG_PRINT("info", ("error setting autocommit TRUE: %d", error));
        DBUG_RETURN(error);
      }
      trans_register_ha(thd, FALSE, &federated_hton);
    }
    else 
    { 
      DBUG_PRINT("info",("not autocommit"));
      if (!trx)
      {
        /* 
          This is where a transaction gets its start
        */
        error= connection_autocommit(FALSE);
        if (error)
        { 
          DBUG_PRINT("info", ("error setting autocommit FALSE: %d", error));
          DBUG_RETURN(error);
        }
        thd->ha_data[federated_hton.slot]= this;
        trans_register_ha(thd, TRUE, &federated_hton);
        /*
          Send a lock table to the remote end.
          We do not support this at the moment
        */
        if (thd->options & (OPTION_TABLE_LOCK))
        {
          DBUG_PRINT("info", ("We do not support lock table yet"));
        }
      }
      else
      {
        ha_federated *ptr;
        for (ptr= trx; ptr; ptr= ptr->trx_next)
          if (ptr == this)
            break;
          else if (!ptr->trx_next)
            ptr->trx_next= this;
      }
    }
  }
  DBUG_RETURN(0);
}


static int federated_commit(THD *thd, bool all)
{
  int return_val= 0;
  ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot];
  DBUG_ENTER("federated_commit");

  if (all)
  {
    int error= 0;
    ha_federated *ptr, *old= NULL;
    for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
    {
      if (old)
        old->trx_next= NULL;
      error= ptr->connection_commit();
      if (error && !return_val);
        return_val= error;
    }
    thd->ha_data[federated_hton.slot]= NULL;
  }

  DBUG_PRINT("info", ("error val: %d", return_val));
  DBUG_RETURN(return_val);
}


static int federated_rollback(THD *thd, bool all)
{
  int return_val= 0;
  ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot];
  DBUG_ENTER("federated_rollback");

  if (all)
  {
    int error= 0;
    ha_federated *ptr, *old= NULL;
    for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
    {
      if (old)
        old->trx_next= NULL;
      error= ptr->connection_rollback();
      if (error && !return_val)
        return_val= error;
    }
    thd->ha_data[federated_hton.slot]= NULL;
  }

  DBUG_PRINT("info", ("error val: %d", return_val));
  DBUG_RETURN(return_val);
}

int ha_federated::connection_commit()
{
  DBUG_ENTER("ha_federated::connection_commit");
  DBUG_RETURN(execute_simple_query("COMMIT", 6));
}


int ha_federated::connection_rollback()
{
  DBUG_ENTER("ha_federated::connection_rollback");
  DBUG_RETURN(execute_simple_query("ROLLBACK", 8));
}


int ha_federated::connection_autocommit(bool state)
{
  const char *text;
  DBUG_ENTER("ha_federated::connection_autocommit");
  text= (state == true) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0";
  DBUG_RETURN(execute_simple_query(text, 16));
2833
}
2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846


int ha_federated::execute_simple_query(const char *query, int len)
{
  DBUG_ENTER("ha_federated::execute_simple_query");

  if (mysql_real_query(mysql, query, len))
  {
    DBUG_RETURN(stash_remote_error());
  }
  DBUG_RETURN(0);
}

2847 2848
struct st_mysql_storage_engine federated_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION, &federated_hton };
acurtis@xiphis.org's avatar
acurtis@xiphis.org committed
2849 2850 2851 2852

mysql_declare_plugin(federated)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
2853 2854
  &federated_storage_engine,
  "FEDERATED",
acurtis@xiphis.org's avatar
acurtis@xiphis.org committed
2855
  "Patrick Galbraith and Brian Aker, MySQL AB",
2856 2857
  "Federated MySQL storage engine",
  federated_db_init, /* Plugin Init */
acurtis@xiphis.org's avatar
acurtis@xiphis.org committed
2858 2859
  NULL, /* Plugin Deinit */
  0x0100 /* 1.0 */,
2860
  0
acurtis@xiphis.org's avatar
acurtis@xiphis.org committed
2861 2862 2863 2864
}
mysql_declare_plugin_end;

#endif