ha_federated.cc 104 KB
Newer Older
1
/* Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
2 3 4

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
unknown's avatar
unknown committed
5
  the Free Software Foundation; version 2 of the License.
6 7 8 9 10 11 12 13

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
14
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
15 16 17 18 19 20 21 22

/*

  MySQL Federated Storage Engine

  ha_federated.cc - MySQL Federated Storage Engine
  Patrick Galbraith and Brian Aker, 2004

unknown's avatar
unknown committed
23
  This is a handler which uses a foreign database as the data file, as
24 25 26 27
  opposed to a handler like MyISAM, which uses .MYD files locally.

  How this handler works
  ----------------------------------
unknown's avatar
unknown committed
28 29
  Normal database files are local and as such: You create a table called
  'users', a file such as 'users.MYD' is created. A handler reads, inserts,
30 31
  deletes, updates data in this file. The data is stored in particular format,
  so to read, that data has to be parsed into fields, to write, fields have to
unknown's avatar
unknown committed
32
  be stored in this format to write to this data file.
33

unknown's avatar
unknown committed
34 35 36 37 38 39 40 41
  With MySQL Federated storage engine, there will be no local files
  for each table's data (such as .MYD). A foreign database will store
  the data that would normally be in this file. This will necessitate
  the use of MySQL client API to read, delete, update, insert this
  data. The data will have to be retrieved via an SQL call "SELECT *
  FROM users". Then, to read this data, it will have to be retrieved
  via mysql_fetch_row one row at a time, then converted from the
  column in this select into the format that the handler expects.
42

unknown's avatar
unknown committed
43 44
  The create table will simply create the .frm file, and within the
  "CREATE TABLE" SQL, there SHALL be any of the following :
45

46 47 48 49 50 51 52 53 54 55 56 57
  connection=scheme://username:password@hostname:port/database/tablename
  connection=scheme://username@hostname/database/tablename
  connection=scheme://username:password@hostname/database/tablename
  connection=scheme://username@hostname:port/database/tablename

  - OR -

  As of 5.1 (See worklog #3031), federated now allows you to use a non-url
  format, taking advantage of mysql.servers:

  connection="connection_one"
  connection="connection_one/table_foo"
58 59 60

  An example would be:

61
  connection=mysql://username:password@hostname:port/database/tablename
62

63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
  or, if we had:

  create server 'server_one' foreign data wrapper 'mysql' options
  (HOST '127.0.0.1',
  DATABASE 'db1',
  USER 'root',
  PASSWORD '',
  PORT 3306,
  SOCKET '',
  OWNER 'root');

  CREATE TABLE federated.t1 (
    `id` int(20) NOT NULL,
    `name` varchar(64) NOT NULL default ''
    )
  ENGINE="FEDERATED" DEFAULT CHARSET=latin1
  CONNECTION='server_one';

  So, this will have been the equivalent of
82

83
  CONNECTION="mysql://root@127.0.0.1:3306/db1/t1"
84

85
  Then, we can also change the server to point to a new schema:
86

87 88 89 90 91 92 93 94
  ALTER SERVER 'server_one' options(DATABASE 'db2');

  All subsequent calls will now be against db2.t1! Guess what? You don't
  have to perform an alter table!

  This connection="connection string" is necessary for the handler to be
  able to connect to the foreign server, either by URL, or by server
  name. 
95 96 97 98 99


  The basic flow is this:

  SQL calls issued locally ->
unknown's avatar
unknown committed
100 101 102
  mysql handler API (data in handler format) ->
  mysql client API (data converted to SQL calls) ->
  foreign database -> mysql client API ->
103 104 105 106 107
  convert result sets (if any) to handler format ->
  handler API -> results or rows affected to local

  What this handler does and doesn't support
  ------------------------------------------
unknown's avatar
unknown committed
108 109
  * Tables MUST be created on the foreign server prior to any action on those
    tables via the handler, first version. IMPORTANT: IF you MUST use the
110
    federated storage engine type on the REMOTE end, MAKE SURE [ :) ] That
unknown's avatar
unknown committed
111
    the table you connect to IS NOT a table pointing BACK to your ORIGINAL
112
    table! You know and have heard the screeching of audio feedback? You
unknown's avatar
unknown committed
113
    know, putting two mirrors in front of each other, how the reflection
114 115
    continues for eternity? Well, need I say more?!
  * There will not be support for transactions.
unknown's avatar
unknown committed
116
  * There is no way for the handler to know if the foreign database or table
117
    has changed. The reason for this is that this database has to work like a
unknown's avatar
unknown committed
118 119 120
    data file that would never be written to by anything other than the
    database. The integrity of the data in the local table could be breached
    if there was any change to the foreign database.
121
  * Support for SELECT, INSERT, UPDATE , DELETE, indexes.
122
  * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls.
unknown's avatar
unknown committed
123
  * Prepared statements will not be used in the first implementation, it
124
    remains to be seen whether the limited subset of the client API for the
125
    server supports this.
unknown's avatar
unknown committed
126 127
  * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its
    implementation.
128
  * This will not work with the query cache.
129 130 131 132 133 134 135

   Method calls

   A two column table, with one record:

   (SELECT)

unknown's avatar
unknown committed
136
   "SELECT * FROM foo"
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
    ha_federated::info
    ha_federated::scan_time:
    ha_federated::rnd_init: share->select_query SELECT * FROM foo
    ha_federated::extra

    <for every row of data retrieved>
    ha_federated::rnd_next
    ha_federated::convert_row_to_internal_format
    ha_federated::rnd_next
    </for every row of data retrieved>

    ha_federated::rnd_end
    ha_federated::extra
    ha_federated::reset

    (INSERT)

    "INSERT INTO foo (id, ts) VALUES (2, now());"

    ha_federated::write_row

    ha_federated::reset

    (UPDATE)

    "UPDATE foo SET ts = now() WHERE id = 1;"

    ha_federated::index_init
    ha_federated::index_read
    ha_federated::index_read_idx
    ha_federated::rnd_next
    ha_federated::convert_row_to_internal_format
    ha_federated::update_row
unknown's avatar
unknown committed
170

171 172 173 174 175 176 177 178 179 180
    ha_federated::extra
    ha_federated::extra
    ha_federated::extra
    ha_federated::external_lock
    ha_federated::reset


    How do I use this handler?
    --------------------------
    First of all, you need to build this storage engine:
unknown's avatar
unknown committed
181

182 183 184
      ./configure --with-federated-storage-engine
      make

unknown's avatar
unknown committed
185
    Next, to use this handler, it's very simple. You must
186 187 188
    have two databases running, either both on the same host, or
    on different hosts.

unknown's avatar
unknown committed
189
    On the server that will be connecting to the foreign
190 191
    host (client), you create your table as such:

unknown's avatar
unknown committed
192
    CREATE TABLE test_table (
193 194 195 196 197
      id     int(20) NOT NULL auto_increment,
      name   varchar(32) NOT NULL default '',
      other  int(20) NOT NULL default '0',
      PRIMARY KEY  (id),
      KEY name (name),
unknown's avatar
unknown committed
198 199 200
      KEY other_key (other))
       ENGINE="FEDERATED"
       DEFAULT CHARSET=latin1
201
       CONNECTION='mysql://root@127.0.0.1:9306/federated/test_federated';
202

unknown's avatar
unknown committed
203 204 205 206 207 208 209 210
   Notice the "ENGINE" and "CONNECTION" fields? This is where you
   respectively set the engine type, "FEDERATED" and foreign
   host information, this being the database your 'client' database
   will connect to and use as the "data file". Obviously, the foreign
   database is running on port 9306, so you want to start up your other
   database so that it is indeed on port 9306, and your federated
   database on a port other than that. In my setup, I use port 5554
   for federated, and port 5555 for the foreign database.
211

unknown's avatar
unknown committed
212
   Then, on the foreign database:
213

unknown's avatar
unknown committed
214
   CREATE TABLE test_table (
215 216 217 218 219
     id     int(20) NOT NULL auto_increment,
     name   varchar(32) NOT NULL default '',
     other  int(20) NOT NULL default '0',
     PRIMARY KEY  (id),
     KEY name (name),
unknown's avatar
unknown committed
220
     KEY other_key (other))
221 222 223 224
     ENGINE="<NAME>" <-- whatever you want, or not specify
     DEFAULT CHARSET=latin1 ;

    This table is exactly the same (and must be exactly the same),
unknown's avatar
unknown committed
225
    except that it is not using the federated handler and does
226
    not need the URL.
unknown's avatar
unknown committed
227

228 229 230 231 232 233 234

    How to see the handler in action
    --------------------------------

    When developing this handler, I compiled the federated database with
    debugging:

unknown's avatar
unknown committed
235
    ./configure --with-federated-storage-engine
236 237 238 239
    --prefix=/home/mysql/mysql-build/federated/ --with-debug

    Once compiled, I did a 'make install' (not for the purpose of installing
    the binary, but to install all the files the binary expects to see in the
unknown's avatar
unknown committed
240 241 242 243
    directory I specified in the build with --prefix,
    "/home/mysql/mysql-build/federated".

    Then, I started the foreign server:
244

unknown's avatar
unknown committed
245
    /usr/local/mysql/bin/mysqld_safe
246 247 248 249 250 251 252 253 254 255 256 257 258
    --user=mysql --log=/tmp/mysqld.5555.log -P 5555

    Then, I went back to the directory containing the newly compiled mysqld,
    <builddir>/sql/, started up gdb:

    gdb ./mysqld

    Then, within the (gdb) prompt:
    (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug

    Next, I open several windows for each:

    1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed
unknown's avatar
unknown committed
259
    2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log
260 261 262
    3. A window with a client open to the federated server on port 5554
    4. A window with a client open to the foreign server on port 5555

unknown's avatar
unknown committed
263
    I would create a table on the client to the foreign server on port
264 265
    5555, and then to the federated server on port 5554. At this point,
    I would run whatever queries I wanted to on the federated server,
unknown's avatar
unknown committed
266
    just always remembering that whatever changes I wanted to make on
267
    the table, or if I created new tables, that I would have to do that
unknown's avatar
unknown committed
268 269 270
    on the foreign server.

    Another thing to look for is 'show variables' to show you that you have
271 272 273 274 275 276 277 278 279 280 281 282 283 284
    support for federated handler support:

    show variables like '%federat%'

    and:

    show storage engines;

    Both should display the federated storage handler.


    Testing
    -------

unknown's avatar
unknown committed
285
    There is a test for MySQL Federated Storage Handler in ./mysql-test/t,
286 287 288
    federated.test. It starts both a slave and master database using
    the same setup that the replication tests use, with the exception that
    it turns off replication, and sets replication to ignore the test tables.
unknown's avatar
unknown committed
289 290
    After ensuring that you actually do have support for the federated storage
    handler, numerous queries/inserts/updates/deletes are run, many derived
291 292 293 294
    from the MyISAM tests, plus some other tests which were meant to reveal
    any issues that would be most likely to affect this handler. All tests
    should work! ;)

unknown's avatar
unknown committed
295
    To run these tests, go into ./mysql-test (based in the directory you
296 297
    built the server in)

298
    ./mysql-test-run federated
unknown's avatar
unknown committed
299

300 301 302 303
    To run the test, or if you want to run the test and have debug info:

    ./mysql-test-run --debug federated

unknown's avatar
unknown committed
304
    This will run the test in debug mode, and you can view the trace and
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
    log files in the ./mysql-test/var/log directory

    ls -l mysql-test/var/log/
    -rw-r--r--  1 patg  patg        17  4 Dec 12:27 current_test
    -rw-r--r--  1 patg  patg       692  4 Dec 12:52 manager.log
    -rw-rw----  1 patg  patg     21246  4 Dec 12:51 master-bin.000001
    -rw-rw----  1 patg  patg        68  4 Dec 12:28 master-bin.index
    -rw-r--r--  1 patg  patg      1620  4 Dec 12:51 master.err
    -rw-rw----  1 patg  patg     23179  4 Dec 12:51 master.log
    -rw-rw----  1 patg  patg  16696550  4 Dec 12:51 master.trace
    -rw-r--r--  1 patg  patg         0  4 Dec 12:28 mysqltest-time
    -rw-r--r--  1 patg  patg   2024051  4 Dec 12:51 mysqltest.trace
    -rw-rw----  1 patg  patg     94992  4 Dec 12:51 slave-bin.000001
    -rw-rw----  1 patg  patg        67  4 Dec 12:28 slave-bin.index
    -rw-rw----  1 patg  patg       249  4 Dec 12:52 slave-relay-bin.000003
    -rw-rw----  1 patg  patg        73  4 Dec 12:28 slave-relay-bin.index
    -rw-r--r--  1 patg  patg      1349  4 Dec 12:51 slave.err
    -rw-rw----  1 patg  patg     96206  4 Dec 12:52 slave.log
    -rw-rw----  1 patg  patg  15706355  4 Dec 12:51 slave.trace
    -rw-r--r--  1 patg  patg         0  4 Dec 12:51 warnings

    Of course, again, you can tail the trace log:

unknown's avatar
unknown committed
328
    tail -f mysql-test/var/log/master.trace |grep ha_fed
329 330

    As well as the slave query log:
unknown's avatar
unknown committed
331

332 333 334 335 336 337 338 339 340 341 342 343 344
    tail -f mysql-test/var/log/slave.log

    Files that comprise the test suite
    ----------------------------------
    mysql-test/t/federated.test
    mysql-test/r/federated.result
    mysql-test/r/have_federated_db.require
    mysql-test/include/have_federated_db.inc


    Other tidbits
    -------------

unknown's avatar
unknown committed
345
    These were the files that were modified or created for this
346
    Federated handler to work, in 5.0:
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362

    ./configure.in
    ./sql/Makefile.am
    ./config/ac_macros/ha_federated.m4
    ./sql/handler.cc
    ./sql/mysqld.cc
    ./sql/set_var.cc
    ./sql/field.h
    ./sql/sql_string.h
    ./mysql-test/mysql-test-run(.sh)
    ./mysql-test/t/federated.test
    ./mysql-test/r/federated.result
    ./mysql-test/r/have_federated_db.require
    ./mysql-test/include/have_federated_db.inc
    ./sql/ha_federated.cc
    ./sql/ha_federated.h
unknown's avatar
unknown committed
363

364 365 366 367 368 369 370
    In 5.1

    my:~/mysql-build/mysql-5.1-bkbits patg$ ls storage/federated/
    CMakeLists.txt                  Makefile.in                     ha_federated.h                  plug.in
    Makefile                        SCCS                            libfederated.a
    Makefile.am                     ha_federated.cc                 libfederated_a-ha_federated.o

unknown's avatar
unknown committed
371
*/
372

373

374
#define MYSQL_SERVER 1
375 376 377 378
#include "sql_priv.h"
#include "sql_servers.h"         // FOREIGN_SERVER, get_server_by_name
#include "sql_class.h"           // SSV
#include "sql_analyse.h"         // append_escaped
379 380
#include <mysql/plugin.h>

381
#ifdef USE_PRAGMA_IMPLEMENTATION
unknown's avatar
unknown committed
382
#pragma implementation                          // gcc: Class implementation
383 384 385
#endif

#include "ha_federated.h"
386
#include "probes_mysql.h"
387 388

#include "m_string.h"
389
#include "key.h"                                // key_copy
unknown's avatar
unknown committed
390 391 392

#include <mysql/plugin.h>

393
/* Variables for federated share methods */
unknown's avatar
unknown committed
394
static HASH federated_open_tables;              // To track open tables
Marc Alff's avatar
Marc Alff committed
395
mysql_mutex_t federated_mutex;                // To init the hash
unknown's avatar
unknown committed
396 397 398 399
static char ident_quote_char= '`';              // Character for quoting
                                                // identifiers
static char value_quote_char= '\'';             // Character for quoting
                                                // literals
unknown's avatar
unknown committed
400
static const int bulk_padding= 64;              // bytes "overhead" in packet
401

402 403 404 405 406 407
/* Variables used when chopping off trailing characters */
static const uint sizeof_trailing_comma= sizeof(", ") - 1;
static const uint sizeof_trailing_closeparen= sizeof(") ") - 1;
static const uint sizeof_trailing_and= sizeof(" AND ") - 1;
static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1;

408
/* Static declaration for handerton */
409 410
static handler *federated_create_handler(handlerton *hton,
                                         TABLE_SHARE *table,
411
                                         MEM_ROOT *mem_root);
412 413
static int federated_commit(handlerton *hton, THD *thd, bool all);
static int federated_rollback(handlerton *hton, THD *thd, bool all);
414

415
/* Federated storage engine handlerton */
416

417 418
static handler *federated_create_handler(handlerton *hton, 
                                         TABLE_SHARE *table,
419
                                         MEM_ROOT *mem_root)
420
{
421
  return new (mem_root) ha_federated(hton, table);
422 423 424
}


unknown's avatar
unknown committed
425
/* Function we use in the creation of our hash to get key */
unknown's avatar
unknown committed
426

427
static uchar *federated_get_key(FEDERATED_SHARE *share, size_t *length,
428
                                my_bool not_used __attribute__ ((unused)))
429
{
unknown's avatar
unknown committed
430
  *length= share->share_key_length;
431
  return (uchar*) share->share_key;
432 433
}

Marc Alff's avatar
Marc Alff committed
434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
#ifdef HAVE_PSI_INTERFACE
/* Instrumentation keys for the mutexes used by this engine */
static PSI_mutex_key fe_key_mutex_federated;
static PSI_mutex_key fe_key_mutex_FEDERATED_SHARE_mutex;

static PSI_mutex_info all_federated_mutexes[]=
{
  { &fe_key_mutex_federated, "federated", PSI_FLAG_GLOBAL},
  { &fe_key_mutex_FEDERATED_SHARE_mutex, "FEDERATED_SHARE::mutex", 0}
};

/*
  Register the mutex keys above under the "federated" category.
  Does nothing when no PSI server is available.
*/
static void init_federated_psi_keys(void)
{
  if (PSI_server != NULL)
  {
    int mutex_count= array_elements(all_federated_mutexes);
    PSI_server->register_mutex("federated", all_federated_mutexes,
                               mutex_count);
  }
}
#endif /* HAVE_PSI_INTERFACE */

unknown's avatar
unknown committed
456 457 458 459 460
/*
  Initialize the federated handler.

  SYNOPSIS
    federated_db_init()
unknown's avatar
unknown committed
461
    p		Handlerton
unknown's avatar
unknown committed
462 463 464 465 466 467

  RETURN
    FALSE       OK
    TRUE        Error
*/

468
int federated_db_init(void *p)
unknown's avatar
unknown committed
469
{
470
  DBUG_ENTER("federated_db_init");
Marc Alff's avatar
Marc Alff committed
471 472 473 474 475

#ifdef HAVE_PSI_INTERFACE
  init_federated_psi_keys();
#endif /* HAVE_PSI_INTERFACE */

476
  handlerton *federated_hton= (handlerton *)p;
477 478 479 480 481
  federated_hton->state= SHOW_OPTION_YES;
  federated_hton->db_type= DB_TYPE_FEDERATED_DB;
  federated_hton->commit= federated_commit;
  federated_hton->rollback= federated_rollback;
  federated_hton->create= federated_create_handler;
482
  federated_hton->flags= HTON_ALTER_NOT_SUPPORTED | HTON_NO_PARTITION;
unknown's avatar
unknown committed
483

unknown's avatar
unknown committed
484 485 486 487 488 489 490
  /*
    Support for transactions disabled until WL#2952 fixes it.
	We do it like this to avoid "defined but not used" compiler warnings.
  */
  federated_hton->commit= 0;
  federated_hton->rollback= 0;

Marc Alff's avatar
Marc Alff committed
491 492
  if (mysql_mutex_init(fe_key_mutex_federated,
                       &federated_mutex, MY_MUTEX_INIT_FAST))
493
    goto error;
Konstantin Osipov's avatar
Konstantin Osipov committed
494 495
  if (!my_hash_init(&federated_open_tables, &my_charset_bin, 32, 0, 0,
                    (my_hash_get_key) federated_get_key, 0, 0))
496 497 498
  {
    DBUG_RETURN(FALSE);
  }
unknown's avatar
unknown committed
499

Marc Alff's avatar
Marc Alff committed
500
  mysql_mutex_destroy(&federated_mutex);
501 502
error:
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
503 504 505 506 507 508 509 510 511 512 513 514 515
}


/*
  Release the federated handler.

  SYNOPSIS
    federated_db_end()

  RETURN
    FALSE       OK
*/

516
int federated_done(void *p)
unknown's avatar
unknown committed
517
{
Konstantin Osipov's avatar
Konstantin Osipov committed
518
  my_hash_free(&federated_open_tables);
Marc Alff's avatar
Marc Alff committed
519
  mysql_mutex_destroy(&federated_mutex);
520

521
  return 0;
522 523
}

unknown's avatar
unknown committed
524

unknown's avatar
unknown committed
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
/**
  @brief Append identifiers to the string.

  @param[in,out] string	The target string.
  @param[in] name 		Identifier name
  @param[in] length 	Length of identifier name in bytes
  @param[in] quote_char Quote char to use for quoting identifier.

  @return Operation Status
  @retval FALSE OK
  @retval TRUE  There was an error appending to the string.

  @note This function is based upon the append_identifier() function
        in sql_show.cc except that quoting always occurs.
*/

541
static bool append_ident(String *string, const char *name, size_t length,
unknown's avatar
unknown committed
542 543 544 545 546 547 548 549 550
                         const char quote_char)
{
  bool result;
  uint clen;
  const char *name_end;
  DBUG_ENTER("append_ident");

  if (quote_char)
  {
551
    string->reserve((uint) length * 2 + 2);
unknown's avatar
unknown committed
552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568
    if ((result= string->append(&quote_char, 1, system_charset_info)))
      goto err;

    for (name_end= name+length; name < name_end; name+= clen)
    {
      uchar c= *(uchar *) name;
      if (!(clen= my_mbcharlen(system_charset_info, c)))
        clen= 1;
      if (clen == 1 && c == (uchar) quote_char &&
          (result= string->append(&quote_char, 1, system_charset_info)))
        goto err;
      if ((result= string->append(name, clen, string->charset())))
        goto err;
    }
    result= string->append(&quote_char, 1, system_charset_info);
  }
  else
569
    result= string->append(name, (uint) length, system_charset_info);
unknown's avatar
unknown committed
570 571 572 573 574 575

err:
  DBUG_RETURN(result);
}


576 577
/*
  Report a connection string error.

  Copies the table's connect string into a local buffer, bounded by
  the buffer size, raises error_num with that text as the argument,
  and returns error_num. The share argument is not used.
*/
static int parse_url_error(FEDERATED_SHARE *share, TABLE *table, int error_num)
{
  char conn_text[FEDERATED_QUERY_BUFFER_SIZE];
  size_t conn_len;
  DBUG_ENTER("ha_federated parse_url_error");

  /* Keep one byte of the buffer free for the terminator */
  conn_len= table->s->connect_string.length;
  if (conn_len > FEDERATED_QUERY_BUFFER_SIZE-1)
    conn_len= FEDERATED_QUERY_BUFFER_SIZE-1;

  strmake(conn_text, table->s->connect_string.str, conn_len);
  my_error(error_num, MYF(0), conn_text);
  DBUG_RETURN(error_num);
}
588

unknown's avatar
unknown committed
589 590 591 592 593
/*
  retrieve server object which contains server meta-data 
  from the system table given a server's name, set share
  connection parameter members
*/
594
int get_connection(MEM_ROOT *mem_root, FEDERATED_SHARE *share)
unknown's avatar
unknown committed
595 596
{
  int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST;
597
  FOREIGN_SERVER *server, server_buffer;
unknown's avatar
unknown committed
598 599
  DBUG_ENTER("ha_federated::get_connection");

600 601 602 603
  /*
    get_server_by_name() clones the server if exists and allocates
	copies of strings in the supplied mem_root
  */
unknown's avatar
unknown committed
604
  if (!(server=
605
       get_server_by_name(mem_root, share->connection_string, &server_buffer)))
unknown's avatar
unknown committed
606 607 608 609 610 611
  {
    DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!"));
    /* need to come up with error handling */
    error_num=1;
    goto error;
  }
612 613
  DBUG_PRINT("info", ("get_server_by_name returned server at %lx",
                      (long unsigned int) server));
unknown's avatar
unknown committed
614 615 616 617 618 619 620 621

  /*
    Most of these should never be empty strings, error handling will
    need to be implemented. Also, is this the best way to set the share
    members? Is there some allocation needed? In running this code, it works
    except there are errors in the trace file of the share being overrun 
    at the address of the share.
  */
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637
  share->server_name_length= server->server_name_length;
  share->server_name= server->server_name;
  share->username= server->username;
  share->password= server->password;
  share->database= server->db;
#ifndef I_AM_PARANOID
  share->port= server->port > 0 && server->port < 65536 ? 
#else
  share->port= server->port > 1023 && server->port < 65536 ? 
#endif
               (ushort) server->port : MYSQL_PORT;
  share->hostname= server->host;
  if (!(share->socket= server->socket) &&
      !strcmp(share->hostname, my_localhost))
    share->socket= (char *) MYSQL_UNIX_ADDR;
  share->scheme= server->scheme;
unknown's avatar
unknown committed
638 639 640 641 642 643

  DBUG_PRINT("info", ("share->username %s", share->username));
  DBUG_PRINT("info", ("share->password %s", share->password));
  DBUG_PRINT("info", ("share->hostname %s", share->hostname));
  DBUG_PRINT("info", ("share->database %s", share->database));
  DBUG_PRINT("info", ("share->port %d", share->port));
unknown's avatar
unknown committed
644
  DBUG_PRINT("info", ("share->socket %s", share->socket));
unknown's avatar
unknown committed
645 646 647
  DBUG_RETURN(0);

error:
648 649
  my_printf_error(error_num, "server name: '%s' doesn't exist!",
                  MYF(0), share->connection_string);
unknown's avatar
unknown committed
650 651
  DBUG_RETURN(error_num);
}
652

653
/*
654
  Parse connection info from table->s->connect_string
655 656 657

  SYNOPSIS
    parse_url()
658
    mem_root            MEM_ROOT pointer for memory allocation
unknown's avatar
unknown committed
659 660 661
    share               pointer to FEDERATED share
    table               pointer to current TABLE class
    table_create_flag   determines what error to throw
662

663
  DESCRIPTION
unknown's avatar
unknown committed
664
    Populates the share with information about the connection
unknown's avatar
unknown committed
665
    to the foreign database that will serve as the data source.
unknown's avatar
unknown committed
666
    This string must be specified (currently) in the "CONNECTION" field,
667
    listed in the CREATE TABLE statement.
668 669 670

    This string MUST be in the format of any of these:

unknown's avatar
unknown committed
671 672 673 674 675 676 677 678 679 680
    CONNECTION="scheme://username:password@hostname:port/database/table"
    CONNECTION="scheme://username@hostname/database/table"
    CONNECTION="scheme://username@hostname:port/database/table"
    CONNECTION="scheme://username:password@hostname/database/table"

    _OR_

    CONNECTION="connection name"

    
681 682 683

  An Example:

unknown's avatar
unknown committed
684 685 686 687 688 689 690 691 692
  CREATE TABLE t1 (id int(32))
    ENGINE="FEDERATED"
    CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federated/testtable";

  CREATE TABLE t2 (
    id int(4) NOT NULL auto_increment,
    name varchar(32) NOT NULL,
    PRIMARY KEY(id)
    ) ENGINE="FEDERATED" CONNECTION="my_conn";
693 694

  ***IMPORTANT***
unknown's avatar
unknown committed
695 696 697 698
  Currently, the Federated Storage Engine only supports connecting to another
  MySQL Database ("scheme" of "mysql"). Connections using JDBC as well as 
  other connectors are in the planning stage.
  
699

unknown's avatar
unknown committed
700
  'password' and 'port' are both optional.
701 702

  RETURN VALUE
703 704
    0           success
    error_num   particular error code 
705 706

*/
unknown's avatar
unknown committed
707

708
static int parse_url(MEM_ROOT *mem_root, FEDERATED_SHARE *share, TABLE *table,
unknown's avatar
unknown committed
709
                     uint table_create_flag)
710
{
711 712 713
  uint error_num= (table_create_flag ?
                   ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
                   ER_FOREIGN_DATA_STRING_INVALID);
714
  DBUG_ENTER("ha_federated::parse_url");
715

716
  share->port= 0;
unknown's avatar
unknown committed
717
  share->socket= 0;
unknown's avatar
unknown committed
718
  DBUG_PRINT("info", ("share at %lx", (long unsigned int) share));
719 720
  DBUG_PRINT("info", ("Length: %u", (uint) table->s->connect_string.length));
  DBUG_PRINT("info", ("String: '%.*s'", (int) table->s->connect_string.length,
721
                      table->s->connect_string.str));
722 723
  share->connection_string= strmake_root(mem_root, table->s->connect_string.str,
                                       table->s->connect_string.length);
unknown's avatar
unknown committed
724

unknown's avatar
unknown committed
725 726
  DBUG_PRINT("info",("parse_url alloced share->connection_string %lx",
                     (long unsigned int) share->connection_string));
unknown's avatar
unknown committed
727 728

  DBUG_PRINT("info",("share->connection_string %s",share->connection_string));
729 730 731 732 733 734
  /*
    No :// or @ in connection string. Must be a straight connection name of
    either "servername" or "servername/tablename"
  */
  if ( (!strstr(share->connection_string, "://") &&
       (!strchr(share->connection_string, '@'))))
unknown's avatar
unknown committed
735
  {
736

unknown's avatar
unknown committed
737 738
    DBUG_PRINT("info",
               ("share->connection_string %s internal format \
unknown's avatar
unknown committed
739 740 741
                share->connection_string %lx",
                share->connection_string,
                (long unsigned int) share->connection_string));
742

743
    /* ok, so we do a little parsing, but not completely! */
unknown's avatar
unknown committed
744
    share->parsed= FALSE;
745 746 747 748 749 750 751 752 753
    /*
      If there is a single '/' in the connection string, this means the user is
      specifying a table name
    */

    if ((share->table_name= strchr(share->connection_string, '/')))
    {
      share->connection_string[share->table_name - share->connection_string]= '\0';
      share->table_name++;
754
      share->table_name_length= (uint) strlen(share->table_name);
unknown's avatar
unknown committed
755

756 757 758 759 760 761 762 763 764 765 766 767
      DBUG_PRINT("info", 
                 ("internal format, parsed table_name share->connection_string \
                  %s share->table_name %s", 
                  share->connection_string, share->table_name));

      /*
        there better not be any more '/'s !
      */
      if (strchr(share->table_name, '/'))
        goto error;

    }
768
    /*
769 770
      otherwise, straight server name, use tablename of federated table
      as remote table name
771
    */
772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787
    else
    {
      /*
        connection specifies everything but, resort to
        expecting remote and foreign table names to match
      */
      share->table_name= strmake_root(mem_root, table->s->table_name.str,
                                      (share->table_name_length= table->s->table_name.length));
      DBUG_PRINT("info", 
                 ("internal format, default table_name share->connection_string \
                  %s share->table_name %s", 
                  share->connection_string, share->table_name));
    }

    if ((error_num= get_connection(mem_root, share)))
      goto error;
788 789
  }
  else
unknown's avatar
unknown committed
790 791 792 793 794
  {
    share->parsed= TRUE;
    // Add a null for later termination of table name
    share->connection_string[table->s->connect_string.length]= 0;
    share->scheme= share->connection_string;
unknown's avatar
unknown committed
795 796
    DBUG_PRINT("info",("parse_url alloced share->scheme %lx",
                       (long unsigned int) share->scheme));
797

unknown's avatar
unknown committed
798 799 800 801 802 803 804
    /*
      remove addition of null terminator and store length
      for each string  in share
    */
    if (!(share->username= strstr(share->scheme, "://")))
      goto error;
    share->scheme[share->username - share->scheme]= '\0';
805

unknown's avatar
unknown committed
806 807
    if (strcmp(share->scheme, "mysql") != 0)
      goto error;
808

unknown's avatar
unknown committed
809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
    share->username+= 3;

    if (!(share->hostname= strchr(share->username, '@')))
      goto error;

    share->username[share->hostname - share->username]= '\0';
    share->hostname++;

    if ((share->password= strchr(share->username, ':')))
    {
      share->username[share->password - share->username]= '\0';
      share->password++;
      share->username= share->username;
      /* make sure there isn't an extra / or @ */
      if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
        goto error;
      /*
        Found that if the string is:
827 828 829
        user:@hostname:port/db/table
        Then password is a null string, so set to NULL
      */
unknown's avatar
unknown committed
830 831 832
      if ((share->password[0] == '\0'))
        share->password= NULL;
    }
833
    else
unknown's avatar
unknown committed
834
      share->username= share->username;
835

unknown's avatar
unknown committed
836 837 838
    /* make sure there isn't an extra / or @ */
    if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
      goto error;
unknown's avatar
unknown committed
839

unknown's avatar
unknown committed
840 841 842 843
    if (!(share->database= strchr(share->hostname, '/')))
      goto error;
    share->hostname[share->database - share->hostname]= '\0';
    share->database++;
844

unknown's avatar
unknown committed
845 846 847 848 849 850 851 852 853
    if ((share->sport= strchr(share->hostname, ':')))
    {
      share->hostname[share->sport - share->hostname]= '\0';
      share->sport++;
      if (share->sport[0] == '\0')
        share->sport= NULL;
      else
        share->port= atoi(share->sport);
    }
854

unknown's avatar
unknown committed
855 856 857 858
    if (!(share->table_name= strchr(share->database, '/')))
      goto error;
    share->database[share->table_name - share->database]= '\0';
    share->table_name++;
unknown's avatar
unknown committed
859

unknown's avatar
unknown committed
860 861 862 863 864 865
    share->table_name_length= strlen(share->table_name);

    /* make sure there's not an extra / */
    if ((strchr(share->table_name, '/')))
      goto error;

866 867 868 869 870 871 872
    /*
      If hostname is omitted, we set it to NULL. According to
      mysql_real_connect() manual:
      The value of host may be either a hostname or an IP address.
      If host is NULL or the string "localhost", a connection to the
      local host is assumed.
    */
unknown's avatar
unknown committed
873 874 875
    if (share->hostname[0] == '\0')
      share->hostname= NULL;
  }
unknown's avatar
unknown committed
876

877 878
  if (!share->port)
  {
879
    if (!share->hostname || strcmp(share->hostname, my_localhost) == 0)
unknown's avatar
unknown committed
880
      share->socket= (char*) MYSQL_UNIX_ADDR;
881
    else
882
      share->port= MYSQL_PORT;
883
  }
884 885

  DBUG_PRINT("info",
unknown's avatar
unknown committed
886
             ("scheme: %s  username: %s  password: %s \
unknown's avatar
unknown committed
887
               hostname: %s  port: %d  db: %s  tablename: %s",
888 889 890
              share->scheme, share->username, share->password,
              share->hostname, share->port, share->database,
              share->table_name));
unknown's avatar
unknown committed
891 892 893 894

  DBUG_RETURN(0);

error:
895
  DBUG_RETURN(parse_url_error(share, table, error_num));
896 897
}

898 899 900 901
/*****************************************************************************
** FEDERATED tables
*****************************************************************************/

902 903 904
/*
  Construct a FEDERATED handler.

  The base handler receives hton and table_arg unchanged. The 'mysql' and
  'stored_result' members start out as 0, trx_next is cleared and the
  bulk_insert member is zero-filled.
*/
ha_federated::ha_federated(handlerton *hton,
                           TABLE_SHARE *table_arg)
  : handler(hton, table_arg),
    mysql(0),
    stored_result(0)
{
  bzero(&bulk_insert, sizeof(bulk_insert));
  trx_next= 0;
}
910 911


unknown's avatar
unknown committed
912
/*
913 914 915 916
  Convert MySQL result set row to handler internal format

  SYNOPSIS
    convert_row_to_internal_format()
unknown's avatar
unknown committed
917
      record    Byte pointer to record
918
      row       MySQL result set row from fetchrow()
unknown's avatar
unknown committed
919
      result	Result set to use
920 921

  DESCRIPTION
unknown's avatar
unknown committed
922
    This method simply iterates through a row returned via fetchrow with
923 924
    values from a successful SELECT , and then stores each column's value
    in the field object via the field object pointer (pointing to the table's
unknown's avatar
unknown committed
925
    array of field object pointers). This is how the handler needs the data
926 927 928
    to be stored to then return results back to the user

  RETURN VALUE
unknown's avatar
unknown committed
929
    0   After fields have had field values stored from record
unknown's avatar
unknown committed
930
*/
unknown's avatar
unknown committed
931

932
uint ha_federated::convert_row_to_internal_format(uchar *record,
unknown's avatar
unknown committed
933 934
                                                  MYSQL_ROW row,
                                                  MYSQL_RES *result)
935
{
936 937
  ulong *lengths;
  Field **field;
938
  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
939 940
  DBUG_ENTER("ha_federated::convert_row_to_internal_format");

unknown's avatar
unknown committed
941
  lengths= mysql_fetch_lengths(result);
942

unknown's avatar
unknown committed
943
  for (field= table->field; *field; field++, row++, lengths++)
944
  {
945 946 947 948 949 950
    /*
      index variable to move us through the row at the
      same iterative step as the field
    */
    my_ptrdiff_t old_ptr;
    old_ptr= (my_ptrdiff_t) (record - table->record[0]);
unknown's avatar
unknown committed
951 952
    (*field)->move_field_offset(old_ptr);
    if (!*row)
953
    {
954
      (*field)->set_null();
955 956
      (*field)->reset();
    }
957
    else
958
    {
959 960 961 962 963
      if (bitmap_is_set(table->read_set, (*field)->field_index))
      {
        (*field)->set_notnull();
        (*field)->store(*row, *lengths, &my_charset_bin);
      }
964
    }
unknown's avatar
unknown committed
965
    (*field)->move_field_offset(-old_ptr);
966
  }
967
  dbug_tmp_restore_column_map(table->write_set, old_map);
968 969 970 971 972 973
  DBUG_RETURN(0);
}

/*
  Append the name of a key part's column to 'to', quoted with
  ident_quote_char by append_ident().

  RETURN VALUE
    0   Name appended
    1   append_ident() failed (out of memory)
*/
static bool emit_key_part_name(String *to, KEY_PART_INFO *part)
{
  DBUG_ENTER("emit_key_part_name");
  const char *column_name= part->field->field_name;
  bool failed= append_ident(to, column_name, strlen(column_name),
                            ident_quote_char) != 0;
  DBUG_RETURN(failed);
}

static bool emit_key_part_element(String *to, KEY_PART_INFO *part,
                                  bool needs_quotes, bool is_like,
982
                                  const uchar *ptr, uint len)
983 984 985 986
{
  Field *field= part->field;
  DBUG_ENTER("emit_key_part_element");

987
  if (needs_quotes && to->append(STRING_WITH_LEN("'")))
988 989 990 991 992 993 994 995
    DBUG_RETURN(1);

  if (part->type == HA_KEYTYPE_BIT)
  {
    char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff;

    *buf++= '0';
    *buf++= 'x';
unknown's avatar
unknown committed
996 997
    buf= octet2hex(buf, (char*) ptr, len);
    if (to->append((char*) buff, (uint)(buf - buff)))
998
      DBUG_RETURN(1);
999
  }
1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022
  else if (part->key_part_flag & HA_BLOB_PART)
  {
    String blob;
    uint blob_length= uint2korr(ptr);
    blob.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
                   blob_length, &my_charset_bin);
    if (append_escaped(to, &blob))
      DBUG_RETURN(1);
  }
  else if (part->key_part_flag & HA_VAR_LENGTH_PART)
  {
    String varchar;
    uint var_length= uint2korr(ptr);
    varchar.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
                      var_length, &my_charset_bin);
    if (append_escaped(to, &varchar))
      DBUG_RETURN(1);
  }
  else
  {
    char strbuff[MAX_FIELD_WIDTH];
    String str(strbuff, sizeof(strbuff), part->field->charset()), *res;

1023
    res= field->val_str(&str, ptr);
1024 1025 1026 1027 1028 1029 1030 1031 1032 1033

    if (field->result_type() == STRING_RESULT)
    {
      if (append_escaped(to, res))
        DBUG_RETURN(1);
    }
    else if (to->append(res->ptr(), res->length()))
      DBUG_RETURN(1);
  }

1034
  if (is_like && to->append(STRING_WITH_LEN("%")))
1035 1036
    DBUG_RETURN(1);

1037
  if (needs_quotes && to->append(STRING_WITH_LEN("'")))
1038
    DBUG_RETURN(1);
1039 1040 1041 1042

  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
1043 1044 1045 1046 1047 1048 1049 1050 1051 1052
/*
  Create a WHERE clause based off of values in keys
  Note: This code was inspired by key_copy from key.cc

  SYNOPSIS
    create_where_from_key ()
      to                     String object to store WHERE clause
      key_info               KEY struct pointer
      start_key              start of the key range, or NULL
      end_key                end of the key range, or NULL
      from_records_in_range  if TRUE, an HA_READ_KEY_EXACT key part is
                             compared with ">=" instead of "="
      eq_range_arg           if TRUE, an HA_READ_AFTER_KEY key part
                             produces the dummy condition "1=1"

  DESCRIPTION
    Using iteration through all the keys via a KEY_PART_INFO pointer,
    this method 'extracts' the value of each key part from the key buffer
    of start_key and/or end_key, and for each key part found, constructs
    an appropriate WHERE clause condition

  RETURN VALUE
    0   After all keys have been accounted for to create the WHERE clause
    1   No keys found (start_key and end_key both NULL), a range flag
        that cannot be handled, or a failed string append

    Range flags Table per Timour:

   -----------------
   - start_key:
     * ">"  -> HA_READ_AFTER_KEY
     * ">=" -> HA_READ_KEY_OR_NEXT
     * "="  -> HA_READ_KEY_EXACT

   - end_key:
     * "<"  -> HA_READ_BEFORE_KEY
     * "<=" -> HA_READ_AFTER_KEY

   records_in_range:
   -----------------
   - start_key:
     * ">"  -> HA_READ_AFTER_KEY
     * ">=" -> HA_READ_KEY_EXACT
     * "="  -> HA_READ_KEY_EXACT

   - end_key:
     * "<"  -> HA_READ_BEFORE_KEY
     * "<=" -> HA_READ_AFTER_KEY
     * "="  -> HA_READ_AFTER_KEY

0 HA_READ_KEY_EXACT,              Find first record else error
1 HA_READ_KEY_OR_NEXT,            Record or next record
2 HA_READ_KEY_OR_PREV,            Record or previous
3 HA_READ_AFTER_KEY,              Find next rec. after key-record
4 HA_READ_BEFORE_KEY,             Find next rec. before key-record
5 HA_READ_PREFIX,                 Key which has the same prefix
6 HA_READ_PREFIX_LAST,            Last key with the same prefix
7 HA_READ_PREFIX_LAST_OR_PREV,    Last or prev key with the same prefix

Flags that I've found:

id, primary key, varchar

id = 'ccccc'
records_in_range: start_key 0 end_key 3
read_range_first: start_key 0 end_key NULL

id > 'ccccc'
records_in_range: start_key 3 end_key NULL
read_range_first: start_key 3 end_key NULL

id < 'ccccc'
records_in_range: start_key NULL end_key 4
read_range_first: start_key NULL end_key 4

id <= 'ccccc'
records_in_range: start_key NULL end_key 3
read_range_first: start_key NULL end_key 3

id >= 'ccccc'
records_in_range: start_key 0 end_key NULL
read_range_first: start_key 1 end_key NULL

id like 'cc%cc'
records_in_range: start_key 0 end_key 3
read_range_first: start_key 1 end_key 3

id > 'aaaaa' and id < 'ccccc'
records_in_range: start_key 3 end_key 4
read_range_first: start_key 3 end_key 4

id >= 'aaaaa' and id < 'ccccc';
records_in_range: start_key 0 end_key 4
read_range_first: start_key 1 end_key 4

id >= 'aaaaa' and id <= 'ccccc';
records_in_range: start_key 0 end_key 3
read_range_first: start_key 1 end_key 3

id > 'aaaaa' and id <= 'ccccc';
records_in_range: start_key 3 end_key 3
read_range_first: start_key 3 end_key 3

numeric keys:

id = 4
index_read_idx: start_key 0 end_key NULL 

id > 4
records_in_range: start_key 3 end_key NULL
read_range_first: start_key 3 end_key NULL

id >= 4
records_in_range: start_key 0 end_key NULL
read_range_first: start_key 1 end_key NULL

id < 4
records_in_range: start_key NULL end_key 4
read_range_first: start_key NULL end_key 4

id <= 4
records_in_range: start_key NULL end_key 3
read_range_first: start_key NULL end_key 3

id like 4
full table scan, select * from

id > 2 and id < 8
records_in_range: start_key 3 end_key 4
read_range_first: start_key 3 end_key 4

id >= 2 and id < 8
records_in_range: start_key 0 end_key 4
read_range_first: start_key 1 end_key 4

id >= 2 and id <= 8
records_in_range: start_key 0 end_key 3
read_range_first: start_key 1 end_key 3

id > 2 and id <= 8
records_in_range: start_key 3 end_key 3
read_range_first: start_key 3 end_key 3

multi keys (id int, name varchar, other varchar)

id = 1;
records_in_range: start_key 0 end_key 3
read_range_first: start_key 0 end_key NULL

id > 4;
id > 2 and name = '333'; remote: id > 2
id > 2 and name > '333'; remote: id > 2
id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results
id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result
id >= 4 and name = 'eric was here' and other > 'eeee';
records_in_range: start_key 3 end_key NULL
read_range_first: start_key 3 end_key NULL

id >= 4;
id >= 2 and name = '333' and other < 'ddd';
remote: `id`  >= 2 AND `name`  >= '333';
records_in_range: start_key 0 end_key NULL
read_range_first: start_key 1 end_key NULL

id < 4;
id < 3 and name = '222' and other <= 'ccc'; remote: id < 3
records_in_range: start_key NULL end_key 4
read_range_first: start_key NULL end_key 4

id <= 4;
records_in_range: start_key NULL end_key 3
read_range_first: start_key NULL end_key 3

id like 4;
full table scan

id  > 2 and id < 4;
records_in_range: start_key 3 end_key 4
read_range_first: start_key 3 end_key 4

id >= 2 and id < 4;
records_in_range: start_key 0 end_key 4
read_range_first: start_key 1 end_key 4

id >= 2 and id <= 4;
records_in_range: start_key 0 end_key 3
read_range_first: start_key 1 end_key 3

id > 2 and id <= 4;
id = 6 and name = 'eric was here' and other > 'eeee';
remote: (`id`  > 6 AND `name`  > 'eric was here' AND `other`  > 'eeee')
AND (`id`  <= 6) AND ( AND `name`  <= 'eric was here')
no results
records_in_range: start_key 3 end_key 3
read_range_first: start_key 3 end_key 3

Summary:

* If the start key flag is 0 the max key flag shouldn't even be set, 
  and if it is, the query produced would be invalid.
* Multipart keys, even if containing some or all numeric columns,
  are treated the same as non-numeric keys

  If the query is " = " (quotes or not):
  - records in range start key flag HA_READ_KEY_EXACT,
    end key flag HA_READ_AFTER_KEY (incorrect)
  - any other: start key flag HA_READ_KEY_OR_NEXT,
    end key flag HA_READ_AFTER_KEY (correct)

* 'like' queries (of key)
  - Numeric, full table scan
  - Non-numeric
      records_in_range: start_key 0 end_key 3
      other : start_key 1 end_key 3

* If the key flag is HA_READ_AFTER_KEY:
   if start_key, append >
   if end_key, append <=

* If create_where_key was called by records_in_range:

 - if the key is numeric:
    start key flag is 0 when end key is NULL, end key flag is 3 or 4
 - if create_where_key was called by any other function:
    start key flag is 1 when end key is NULL, end key flag is 3 or 4
 - if the key is non-numeric, or multipart
    When the query is an exact match, the start key flag is 0,
    end key flag is 3 for what should be a no-range condition where
    you should have 0 and max key NULL, which it is if called by
    read_range_first

Conclusion:

1. Need logic to determine if a key is min or max when the flag is
HA_READ_AFTER_KEY, and handle appending correct operator accordingly

2. Need a boolean flag to pass to create_where_from_key, used in the
switch statement. Add 1 to the flag if:
  - start key flag is HA_READ_KEY_EXACT and the end key is NULL

*/

bool ha_federated::create_where_from_key(String *to,
                                         KEY *key_info,
                                         const key_range *start_key,
                                         const key_range *end_key,
unknown's avatar
unknown committed
1285
                                         bool from_records_in_range,
1286
                                         bool eq_range_arg)
1287
{
1288
  bool both_not_null=
1289
    (start_key != NULL && end_key != NULL) ? TRUE : FALSE;
1290
  const uchar *ptr;
1291 1292 1293 1294
  uint remainder, length;
  char tmpbuff[FEDERATED_QUERY_BUFFER_SIZE];
  String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info);
  const key_range *ranges[2]= { start_key, end_key };
1295
  my_bitmap_map *old_map;
1296
  DBUG_ENTER("ha_federated::create_where_from_key");
1297

1298 1299 1300
  tmp.length(0); 
  if (start_key == NULL && end_key == NULL)
    DBUG_RETURN(1);
1301

1302 1303
  old_map= dbug_tmp_use_all_columns(table, table->write_set);
  for (uint i= 0; i <= 1; i++)
1304 1305 1306 1307 1308
  {
    bool needs_quotes;
    KEY_PART_INFO *key_part;
    if (ranges[i] == NULL)
      continue;
1309

1310
    if (both_not_null)
1311
    {
1312
      if (i > 0)
1313
        tmp.append(STRING_WITH_LEN(") AND ("));
1314
      else
1315
        tmp.append(STRING_WITH_LEN(" ("));
1316
    }
1317

1318 1319 1320 1321 1322 1323
    for (key_part= key_info->key_part,
         remainder= key_info->key_parts,
         length= ranges[i]->length,
         ptr= ranges[i]->key; ;
         remainder--,
         key_part++)
1324
    {
1325 1326 1327
      Field *field= key_part->field;
      uint store_length= key_part->store_length;
      uint part_length= min(store_length, length);
1328
      needs_quotes= field->str_needs_quotes();
1329
      DBUG_DUMP("key, start of loop", ptr, length);
unknown's avatar
unknown committed
1330

1331 1332 1333 1334
      if (key_part->null_bit)
      {
        if (*ptr++)
        {
1335 1336 1337 1338 1339
          /*
            We got "IS [NOT] NULL" condition against nullable column. We
            distinguish between "IS NOT NULL" and "IS NULL" by flag. For
            "IS NULL", flag is set to HA_READ_KEY_EXACT.
          */
1340
          if (emit_key_part_name(&tmp, key_part) ||
unknown's avatar
unknown committed
1341 1342 1343
              (ranges[i]->flag == HA_READ_KEY_EXACT ?
               tmp.append(STRING_WITH_LEN(" IS NULL ")) :
               tmp.append(STRING_WITH_LEN(" IS NOT NULL "))))
1344
            goto err;
1345 1346 1347 1348 1349
          /*
            We need to adjust pointer and length to be prepared for next
            key part. As well as check if this was last key part.
          */
          goto prepare_for_next_key_part;
1350 1351
        }
      }
1352

1353
      if (tmp.append(STRING_WITH_LEN(" (")))
1354
        goto err;
1355

1356 1357
      switch (ranges[i]->flag) {
      case HA_READ_KEY_EXACT:
1358
        DBUG_PRINT("info", ("federated HA_READ_KEY_EXACT %d", i));
1359 1360 1361 1362 1363 1364
        if (store_length >= length ||
            !needs_quotes ||
            key_part->type == HA_KEYTYPE_BIT ||
            field->result_type() != STRING_RESULT)
        {
          if (emit_key_part_name(&tmp, key_part))
1365
            goto err;
1366

1367
          if (from_records_in_range)
1368
          {
1369
            if (tmp.append(STRING_WITH_LEN(" >= ")))
1370
              goto err;
1371 1372 1373
          }
          else
          {
1374
            if (tmp.append(STRING_WITH_LEN(" = ")))
1375
              goto err;
1376
          }
1377

1378 1379
          if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
                                    part_length))
1380
            goto err;
1381 1382 1383
        }
        else
        {
unknown's avatar
unknown committed
1384
          /* LIKE */
1385
          if (emit_key_part_name(&tmp, key_part) ||
1386
              tmp.append(STRING_WITH_LEN(" LIKE ")) ||
1387 1388
              emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr,
                                    part_length))
1389
            goto err;
1390 1391
        }
        break;
1392
      case HA_READ_AFTER_KEY:
1393
        if (eq_range_arg)
1394 1395 1396 1397 1398
        {
          if (tmp.append("1=1"))                // Dummy
            goto err;
          break;
        }
1399
        DBUG_PRINT("info", ("federated HA_READ_AFTER_KEY %d", i));
1400 1401 1402
        if (store_length >= length) /* end key */
        {
          if (emit_key_part_name(&tmp, key_part))
1403
            goto err;
1404 1405

          if (i > 0) /* end key */
1406
          {
1407
            if (tmp.append(STRING_WITH_LEN(" <= ")))
1408
              goto err;
1409
          }
1410
          else /* start key */
1411
          {
1412
            if (tmp.append(STRING_WITH_LEN(" > ")))
1413
              goto err;
1414
          }
1415 1416

          if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1417 1418
                                    part_length))
          {
1419
            goto err;
1420
          }
1421 1422
          break;
        }
1423
      case HA_READ_KEY_OR_NEXT:
1424
        DBUG_PRINT("info", ("federated HA_READ_KEY_OR_NEXT %d", i));
1425
        if (emit_key_part_name(&tmp, key_part) ||
1426
            tmp.append(STRING_WITH_LEN(" >= ")) ||
1427
            emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1428
              part_length))
1429
          goto err;
1430
        break;
1431
      case HA_READ_BEFORE_KEY:
1432
        DBUG_PRINT("info", ("federated HA_READ_BEFORE_KEY %d", i));
1433 1434 1435
        if (store_length >= length)
        {
          if (emit_key_part_name(&tmp, key_part) ||
1436
              tmp.append(STRING_WITH_LEN(" < ")) ||
1437 1438
              emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
                                    part_length))
1439
            goto err;
1440 1441
          break;
        }
1442
      case HA_READ_KEY_OR_PREV:
1443
        DBUG_PRINT("info", ("federated HA_READ_KEY_OR_PREV %d", i));
1444
        if (emit_key_part_name(&tmp, key_part) ||
1445
            tmp.append(STRING_WITH_LEN(" <= ")) ||
1446 1447
            emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
                                  part_length))
1448
          goto err;
1449 1450 1451
        break;
      default:
        DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag));
1452
        goto err;
1453
      }
1454
      if (tmp.append(STRING_WITH_LEN(") ")))
1455
        goto err;
1456

1457
prepare_for_next_key_part:
1458 1459 1460 1461 1462
      if (store_length >= length)
        break;
      DBUG_PRINT("info", ("remainder %d", remainder));
      DBUG_ASSERT(remainder > 1);
      length-= store_length;
1463 1464 1465 1466 1467 1468
      /*
        For nullable columns, null-byte is already skipped before, that is
        ptr was incremented by 1. Since store_length still counts null-byte,
        we need to subtract 1 from store_length.
      */
      ptr+= store_length - test(key_part->null_bit);
1469
      if (tmp.append(STRING_WITH_LEN(" AND ")))
1470
        goto err;
1471 1472 1473 1474

      DBUG_PRINT("info",
                 ("create_where_from_key WHERE clause: %s",
                  tmp.c_ptr_quick()));
1475
    }
1476
  }
1477 1478
  dbug_tmp_restore_column_map(table->write_set, old_map);

1479
  if (both_not_null)
1480
    if (tmp.append(STRING_WITH_LEN(") ")))
1481
      DBUG_RETURN(1);
1482

1483
  if (to->append(STRING_WITH_LEN(" WHERE ")))
1484 1485 1486 1487 1488 1489
    DBUG_RETURN(1);

  if (to->append(tmp))
    DBUG_RETURN(1);

  DBUG_RETURN(0);
1490 1491 1492 1493

err:
  dbug_tmp_restore_column_map(table->write_set, old_map);
  DBUG_RETURN(1);
1494 1495 1496 1497 1498 1499 1500
}

/*
  Example of simple lock controls. The "share" it creates is structure we will
  pass to each federated handler. Do you have to have one of these? Well, you
  have pieces that are used for locking, and they are needed to function.
*/
unknown's avatar
unknown committed
1501

1502 1503
static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
{
1504 1505
  char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  Field **field;
1506
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
unknown's avatar
unknown committed
1507
  FEDERATED_SHARE *share= NULL, tmp_share;
1508 1509 1510
  MEM_ROOT mem_root;
  DBUG_ENTER("ha_federated.cc::get_share");

1511 1512 1513
  /*
    In order to use this string, we must first zero it's length,
    or it will contain garbage
unknown's avatar
unknown committed
1514
  */
1515 1516
  query.length(0);

1517 1518
  init_alloc_root(&mem_root, 256, 0);

Marc Alff's avatar
Marc Alff committed
1519
  mysql_mutex_lock(&federated_mutex);
1520

unknown's avatar
unknown committed
1521
  tmp_share.share_key= table_name;
1522
  tmp_share.share_key_length= (uint) strlen(table_name);
1523
  if (parse_url(&mem_root, &tmp_share, table, 0))
unknown's avatar
unknown committed
1524 1525 1526
    goto error;

  /* TODO: change tmp_share.scheme to LEX_STRING object */
Konstantin Osipov's avatar
Konstantin Osipov committed
1527 1528 1529 1530
  if (!(share= (FEDERATED_SHARE *) my_hash_search(&federated_open_tables,
                                                  (uchar*) tmp_share.share_key,
                                                  tmp_share.
                                                  share_key_length)))
1531 1532
  {
    query.set_charset(system_charset_info);
1533
    query.append(STRING_WITH_LEN("SELECT "));
1534 1535
    for (field= table->field; *field; field++)
    {
unknown's avatar
unknown committed
1536 1537
      append_ident(&query, (*field)->field_name, 
                   strlen((*field)->field_name), ident_quote_char);
1538
      query.append(STRING_WITH_LEN(", "));
1539
    }
1540 1541 1542
    /* chops off trailing comma */
    query.length(query.length() - sizeof_trailing_comma);

unknown's avatar
unknown committed
1543
    query.append(STRING_WITH_LEN(" FROM "));
unknown's avatar
unknown committed
1544 1545 1546

    append_ident(&query, tmp_share.table_name, 
                 tmp_share.table_name_length, ident_quote_char);
1547

1548
    if (!(share= (FEDERATED_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) ||
1549
        !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length() + 1)))
unknown's avatar
unknown committed
1550 1551
      goto error;

1552
    share->use_count= 0;
1553 1554
    share->mem_root= mem_root;

1555
    DBUG_PRINT("info",
unknown's avatar
unknown committed
1556
               ("share->select_query %s", share->select_query));
1557

1558
    if (my_hash_insert(&federated_open_tables, (uchar*) share))
1559 1560
      goto error;
    thr_lock_init(&share->lock);
Marc Alff's avatar
Marc Alff committed
1561 1562
    mysql_mutex_init(fe_key_mutex_FEDERATED_SHARE_mutex,
                     &share->mutex, MY_MUTEX_INIT_FAST);
1563
  }
1564 1565 1566
  else
    free_root(&mem_root, MYF(0)); /* prevents memory leak */

1567
  share->use_count++;
Marc Alff's avatar
Marc Alff committed
1568
  mysql_mutex_unlock(&federated_mutex);
1569

1570
  DBUG_RETURN(share);
1571 1572

error:
Marc Alff's avatar
Marc Alff committed
1573
  mysql_mutex_unlock(&federated_mutex);
1574 1575
  free_root(&mem_root, MYF(0));
  DBUG_RETURN(NULL);
1576 1577 1578 1579
}


/*
unknown's avatar
unknown committed
1580
  Free lock controls. We call this whenever we close a table.
1581 1582 1583
  If the table had the last reference to the share then we
  free memory associated with it.
*/
unknown's avatar
unknown committed
1584

1585 1586
static int free_share(FEDERATED_SHARE *share)
{
  /*
    Work on a by-value copy of the share's MEM_ROOT: it is this local copy
    that is handed to free_root() once the last reference is gone.
  */
  MEM_ROOT share_root= share->mem_root;
  DBUG_ENTER("free_share");

  mysql_mutex_lock(&federated_mutex);
  share->use_count--;
  if (share->use_count == 0)
  {
    /*
      Nobody uses the share any more: take it out of the open-tables hash,
      destroy its table lock and mutex, then release its memory root.
    */
    my_hash_delete(&federated_open_tables, (uchar*) share);
    thr_lock_delete(&share->lock);
    mysql_mutex_destroy(&share->mutex);
    free_root(&share_root, MYF(0));
  }
  mysql_mutex_unlock(&federated_mutex);

  /* Always reports success. */
  DBUG_RETURN(0);
}


1604
ha_rows ha_federated::records_in_range(uint inx, key_range *start_key,
                                       key_range *end_key)
{
  /*
    No estimate is computed: the same constant is returned whatever index
    or key range is passed in. We really want indexes to be used as often
    as possible, so the value is hard-coded to a very low number to force
    the issue.
  */
  const ha_rows fixed_estimate= FEDERATED_RECORDS_IN_RANGE;
  DBUG_ENTER("ha_federated::records_in_range");
  DBUG_RETURN(fixed_estimate);
}
1617 1618
/*
  If frm_error() is called then we will use this to find out
  what file extensions exist for the storage engine. This is
  also used by the default rename_table and delete_table methods
  in handler.cc.
*/
unknown's avatar
unknown committed
1623

1624
const char **ha_federated::bas_ext() const
unknown's avatar
unknown committed
1625
{
1626 1627 1628 1629 1630
  static const char *ext[]=
  {
    NullS
  };
  return ext;
unknown's avatar
unknown committed
1631
}
1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643


/*
  Used for opening tables. The name will be the name of the file.
  A table is opened when it needs to be opened. For instance
  when a request comes in for a select on the table (tables are not
  open and closed for each request, they are cached).

  Called from handler.cc by handler::ha_open(). The server opens
  all tables by calling ha_open() which then calls the handler
  specific open().
*/
unknown's avatar
unknown committed
1644

1645 1646
int ha_federated::open(const char *name, int mode, uint test_if_locked)
{
unknown's avatar
unknown committed
1647
  DBUG_ENTER("ha_federated::open");
1648 1649 1650

  if (!(share= get_share(name, table)))
    DBUG_RETURN(1);
unknown's avatar
unknown committed
1651
  thr_lock_data_init(&share->lock, &lock, NULL);
1652

unknown's avatar
unknown committed
1653
  DBUG_ASSERT(mysql == NULL);
1654

1655
  ref_length= sizeof(MYSQL_RES *) + sizeof(MYSQL_ROW_OFFSET);
unknown's avatar
unknown committed
1656 1657
  DBUG_PRINT("info", ("ref_length: %u", ref_length));

1658
  my_init_dynamic_array(&results, sizeof(MYSQL_RES *), 4, 4);
unknown's avatar
unknown committed
1659 1660
  reset();

1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674
  DBUG_RETURN(0);
}


/*
  Closes a table. We call the free_share() function to free any resources
  that we have allocated in the "shared" structure.

  Called from sql_base.cc, sql_select.cc, and table.cc.
  In sql_select.cc it is only used to close up temporary tables or during
  the process where a temporary table is converted over to being a
  myisam table.
  For sql_base.cc look at close_data_tables().
*/
unknown's avatar
unknown committed
1675

1676 1677 1678
int ha_federated::close(void)
{
  DBUG_ENTER("ha_federated::close");
unknown's avatar
unknown committed
1679

1680 1681 1682 1683
  free_result();
  
  delete_dynamic(&results);
  
1684
  /* Disconnect from mysql */
unknown's avatar
unknown committed
1685 1686
  mysql_close(mysql);
  mysql= NULL;
1687

1688 1689 1690 1691 1692 1693 1694 1695 1696
  /*
    mysql_close() might return an error if a remote server's gone
    for some reason. If that happens while removing a table from
    the table cache, the error will be propagated to a client even
    if the original query was not issued against the FEDERATED table.
    So, don't propagate errors from mysql_close().
  */
  table->in_use->clear_error();

1697
  DBUG_RETURN(free_share(share));
1698 1699 1700 1701
}

/*

unknown's avatar
unknown committed
1702
  Checks if a field in a record is SQL NULL.
1703 1704 1705 1706 1707 1708 1709 1710 1711

  SYNOPSIS
    field_in_record_is_null()
      table     TABLE pointer, MySQL table object
      field     Field pointer, MySQL field object
      record    char pointer, contains record

    DESCRIPTION
      This method uses the record format information in table to track
unknown's avatar
unknown committed
1712
      the null bit in record.
1713 1714 1715

    RETURN VALUE
      1    if NULL
unknown's avatar
unknown committed
1716
      0    otherwise
1717
*/
unknown's avatar
unknown committed
1718

1719
static inline uint field_in_record_is_null(TABLE *table, Field *field,
                                           char *record)
{
  int null_byte_pos;
  DBUG_ENTER("ha_federated::field_in_record_is_null");

  /* A field without a null pointer is reported as not NULL. */
  if (field->null_ptr == NULL)
    DBUG_RETURN(0);

  /*
    Distance of the field's null byte from the start of table->record[0];
    the same distance is then applied to the caller's record buffer.
  */
  null_byte_pos= (uint) ((char*) field->null_ptr - (char*) table->record[0]);

  DBUG_RETURN((record[null_byte_pos] & field->null_bit) ? 1 : 0);
}

unknown's avatar
unknown committed
1737

unknown's avatar
unknown committed
1738 1739 1740 1741 1742 1743 1744 1745 1746
/**
  @brief Construct the INSERT statement.
  
  @details This method will construct the INSERT statement and appends it to
  the supplied query string buffer.
  
  @return
    @retval FALSE       No error
    @retval TRUE        Failure
1747
*/
unknown's avatar
unknown committed
1748

unknown's avatar
unknown committed
1749
bool ha_federated::append_stmt_insert(String *query)
{
  char stmt_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  Field **col;
  uint head_length;
  bool have_columns= FALSE;

  /* Scratch string in which the statement head is assembled */
  String stmt(stmt_buffer, sizeof(stmt_buffer), &my_charset_bin);
  DBUG_ENTER("ha_federated::append_stmt_insert");

  stmt.length(0);

  /* Pick the verb: REPLACE wins, IGNORE only applies without dup-update. */
  if (replace_duplicates)
    stmt.append(STRING_WITH_LEN("REPLACE INTO "));
  else if (!ignore_duplicates || insert_dup_update)
    stmt.append(STRING_WITH_LEN("INSERT INTO "));
  else
    stmt.append(STRING_WITH_LEN("INSERT IGNORE INTO "));

  append_ident(&stmt, share->table_name, share->table_name_length,
               ident_quote_char);

  /* Remember where the column list starts so it can be rolled back. */
  head_length= stmt.length();
  stmt.append(STRING_WITH_LEN(" ("));

  /*
    Emit the name of every column that is part of the write set. Each name
    is followed by ", " because we cannot know in advance whether a later
    column will be in the write set too; the surplus separator is removed
    after the loop.
  */
  for (col= table->field; *col; col++)
  {
    if (!bitmap_is_set(table->write_set, (*col)->field_index))
      continue;

    append_ident(&stmt, (*col)->field_name,
                 strlen((*col)->field_name), ident_quote_char);
    stmt.append(STRING_WITH_LEN(", "));
    have_columns= TRUE;
  }

  if (!have_columns)
  {
    /* No columns at all: drop the " (" again, there is no list to close. */
    stmt.length(head_length);
  }
  else
  {
    /* Strip the trailing separator and close the column list. */
    stmt.length(stmt.length() - sizeof_trailing_comma);
    stmt.append(STRING_WITH_LEN(") "));
  }

  stmt.append(STRING_WITH_LEN(" VALUES "));

  /* The result of appending to the caller's buffer is the return value. */
  DBUG_RETURN(query->append(stmt));
}
1812 1813


1814 1815 1816 1817 1818 1819 1820 1821 1822
/*
  write_row() inserts a row. No extra() hint is given currently if a bulk load
  is happening. buf is a byte array of data. You can use the field
  information to extract the data from the native byte array type.
  Example of this would be:
  for (Field **field=table->field ; *field ; field++)
  {
    ...
  }

  Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
  sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
*/
1827

1828
int ha_federated::write_row(uchar *buf)
{
  char row_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char value_buffer[STRING_BUFFER_USUAL_SIZE];
  Field **col;
  uint values_start;
  int error= 0;
  bool bulk_mode;
  /* Starts out TRUE only when the table has a next_number_field. */
  bool refresh_auto_increment= (table->next_number_field != NULL);

  /* Receives the "(v1, v2, ...)" tuple, preceded by the statement head
     when the row is sent on its own */
  String row_values(row_buffer, sizeof(row_buffer), &my_charset_bin);
  /* Scratch string holding the text of one column value at a time */
  String value_text(value_buffer, sizeof(value_buffer), &my_charset_bin);
  my_bitmap_map *saved_map= dbug_tmp_use_all_columns(table, table->read_set);
  DBUG_ENTER("ha_federated::write_row");

  row_values.length(0);
  value_text.length(0);
  ha_statistic_increment(&SSV::ha_write_count);
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();

  /*
    Multi-row insert is used only when a bulk buffer exists and the
    statement is not "INSERT ... ON DUPLICATE KEY UPDATE" (REPLACE is
    always safe to batch). In bulk mode only the column values of the row
    are collected here; the statement head is generated when the first row
    is copied into the bulk buffer. Otherwise the head is emitted right
    away, in front of the values.
  */
  bulk_mode= bulk_insert.str && (!insert_dup_update || replace_duplicates);
  if (!bulk_mode)
    append_stmt_insert(&row_values);

  row_values.append(STRING_WITH_LEN(" ("));
  values_start= row_values.length();

  /* Emit one value for every column that is part of the write set. */
  for (col= table->field; *col; col++)
  {
    if (!bitmap_is_set(table->write_set, (*col)->field_index))
      continue;

    if ((*col)->is_null())
    {
      row_values.append(STRING_WITH_LEN(" NULL "));
    }
    else
    {
      bool quoted= (*col)->str_needs_quotes();

      (*col)->val_str(&value_text);
      if (quoted)
        row_values.append(value_quote_char);
      value_text.print(&row_values);
      if (quoted)
        row_values.append(value_quote_char);

      value_text.length(0);
    }

    /*
      A separator follows every value because we cannot tell here whether
      a later column is in the write set; the surplus one is cut off below.
    */
    row_values.append(STRING_WITH_LEN(", "));
  }
  dbug_tmp_restore_column_map(table->read_set, saved_map);

  /*
    Only chop the trailing separator if at least one value was written;
    otherwise we would remove the '(' and the statement must read
    "INSERT INTO t1 VALUES ();".
  */
  if (row_values.length() > values_start)
    row_values.length(row_values.length() - sizeof_trailing_comma);

  /* The closing paren is appended even when there were no values. */
  row_values.append(STRING_WITH_LEN(") "));

  if (!bulk_mode)
  {
    /* Single-row mode: the complete statement is sent immediately. */
    error= real_query(row_values.ptr(), row_values.length());
  }
  else
  {
    /*
      Send the pending batch first if adding this row would push the
      statement past the packet size limit. If nothing is sent, no query
      was executed, so there is no auto-increment value to pick up.
    */
    if (bulk_insert.length + row_values.length() + bulk_padding >
        mysql->net.max_packet_size && bulk_insert.length)
    {
      error= real_query(bulk_insert.str, bulk_insert.length);
      bulk_insert.length= 0;
    }
    else
      refresh_auto_increment= FALSE;

    if (bulk_insert.length != 0)
    {
      /* Batch already has rows: separate this tuple from the last one. */
      dynstr_append_mem(&bulk_insert, ",", 1);
    }
    else
    {
      /* Empty batch: it has to start with the statement head. */
      char head_buffer[FEDERATED_QUERY_BUFFER_SIZE];
      String head(head_buffer, sizeof(head_buffer), &my_charset_bin);

      head.length(0);
      append_stmt_insert(&head);
      dynstr_append_mem(&bulk_insert, head.ptr(), head.length());
    }

    dynstr_append_mem(&bulk_insert, row_values.ptr(), row_values.length());
  }

  if (error)
    DBUG_RETURN(stash_remote_error());

  /*
    If the table we've just written a record to contains an auto_increment
    field, then store the last_insert_id() value from the foreign server.
  */
  if (refresh_auto_increment)
  {
    update_auto_increment();

    /* mysql_insert() uses this for protocol return value */
    table->next_number_field->store(stats.auto_increment_value, 1);
  }

  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997

/**
  @brief Prepares the storage engine for bulk inserts.
  
  @param[in] rows       estimated number of rows in bulk insert 
                        or 0 if unknown.
  
  @details Initializes memory structures required for bulk insert.
*/

void ha_federated::start_bulk_insert(ha_rows rows)
{
  uint alloc_size;
  DBUG_ENTER("ha_federated::start_bulk_insert");

  /* Free whatever the bulk buffer currently holds. */
  dynstr_free(&bulk_insert);

  /*
    Bulk-insert semantics are not worth it when exactly one row is
    expected. A value of 0 means the server does not know the row count
    (this can occur with INSERT...SELECT) and still gets a bulk buffer.
  */
  if (rows == 1)
    DBUG_VOID_RETURN;

  /*
    An open connection is required so that the maximum packet size is
    known; give up on bulk mode if connecting fails.
  */
  if (!mysql && real_connect())
    DBUG_VOID_RETURN;

  /* The page size is used both as initial size and as growth increment. */
  alloc_size= (uint) my_getpagesize();

  /* The buffer is marked empty only if its initialisation succeeded. */
  if (!init_dynamic_string(&bulk_insert, NULL, alloc_size, alloc_size))
    bulk_insert.length= 0;

  DBUG_VOID_RETURN;
}


/**
  @brief End bulk insert.
  
  @details This method will send any remaining rows to the remote server.
  Finally, it will deinitialize the bulk insert data structure.
  
  @return Operation status
  @retval       0       No error
  @retval       != 0    Error occurred at remote server. Also sets my_errno.
*/

int ha_federated::end_bulk_insert()
{
  int error= 0;
  DBUG_ENTER("ha_federated::end_bulk_insert");
  
  if (bulk_insert.str && bulk_insert.length)
  {
unknown's avatar
unknown committed
2033
    if (real_query(bulk_insert.str, bulk_insert.length))
unknown's avatar
unknown committed
2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045
      error= stash_remote_error();
    else
    if (table->next_number_field)
      update_auto_increment();
  }

  dynstr_free(&bulk_insert);
  
  DBUG_RETURN(my_errno= error);
}


unknown's avatar
unknown committed
2046 2047 2048 2049 2050 2051
/*
  ha_federated::update_auto_increment

  This method ensures that last_insert_id() works properly. What it simply
  does is call last_insert_id() on the foreign database immediately after
  insert (if the table has an auto_increment field) and record the resulting
  id in the THD (thd->first_successful_insert_id_in_cur_stmt).
*/
void ha_federated::update_auto_increment(void)
{
  THD *thd= current_thd;
  DBUG_ENTER("ha_federated::update_auto_increment");

unknown's avatar
unknown committed
2059
  ha_federated::info(HA_STATUS_AUTO);
2060
  thd->first_successful_insert_id_in_cur_stmt= 
2061
    stats.auto_increment_value;
unknown's avatar
unknown committed
2062
  DBUG_PRINT("info",("last_insert_id: %ld", (long) stats.auto_increment_value));
unknown's avatar
unknown committed
2063 2064 2065

  DBUG_VOID_RETURN;
}
2066

2067 2068 2069 2070 2071 2072 2073 2074 2075
int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  char stmt_buffer[STRING_BUFFER_USUAL_SIZE];
  String stmt(stmt_buffer, sizeof(stmt_buffer), &my_charset_bin);
  DBUG_ENTER("ha_federated::optimize");

  /* Build "OPTIMIZE TABLE <quoted table name>" in the system charset. */
  stmt.length(0);
  stmt.set_charset(system_charset_info);
  stmt.append(STRING_WITH_LEN("OPTIMIZE TABLE "));
  append_ident(&stmt, share->table_name, share->table_name_length,
               ident_quote_char);

  /* Neither thd nor check_opt influences the statement. */
  if (!real_query(stmt.ptr(), stmt.length()))
    DBUG_RETURN(0);

  /* The query failed: return what stash_remote_error() reports. */
  DBUG_RETURN(stash_remote_error());
}


int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
  char stmt_buffer[STRING_BUFFER_USUAL_SIZE];
  String stmt(stmt_buffer, sizeof(stmt_buffer), &my_charset_bin);
  DBUG_ENTER("ha_federated::repair");

  /* Build "REPAIR TABLE <quoted table name>" in the system charset. */
  stmt.length(0);
  stmt.set_charset(system_charset_info);
  stmt.append(STRING_WITH_LEN("REPAIR TABLE "));
  append_ident(&stmt, share->table_name, share->table_name_length,
               ident_quote_char);

  /* Translate the check options into the matching REPAIR modifiers. */
  const bool want_quick= (check_opt->flags & T_QUICK) != 0;
  const bool want_extended= (check_opt->flags & T_EXTEND) != 0;
  const bool want_use_frm= (check_opt->sql_flags & TT_USEFRM) != 0;

  if (want_quick)
    stmt.append(STRING_WITH_LEN(" QUICK"));
  if (want_extended)
    stmt.append(STRING_WITH_LEN(" EXTENDED"));
  if (want_use_frm)
    stmt.append(STRING_WITH_LEN(" USE_FRM"));

  if (!real_query(stmt.ptr(), stmt.length()))
    DBUG_RETURN(0);

  /* The query failed: return what stash_remote_error() reports. */
  DBUG_RETURN(stash_remote_error());
}

2116

2117 2118 2119 2120 2121 2122
/*
  Yes, update_row() does what you expect, it updates a row. old_data will have
  the previous row record in it, while new_data will have the newest data in
  it.

  Keep in mind that the server can do updates based on ordering if an ORDER BY
  clause was used. Consecutive ordering is not guaranteed.
  Currently new_data will not have an updated auto_increment record, or
  an updated timestamp field. You can do these for federated by doing these:
  if (table->timestamp_on_update_now)
    update_timestamp(new_row+table->timestamp_on_update_now-1);
  if (table->next_number_field && record == table->record[0])
    update_auto_increment();

  Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
*/
unknown's avatar
unknown committed
2133

2134
int ha_federated::update_row(const uchar *old_data, uchar *new_data)
2135
{
2136
  /*
2137 2138 2139 2140 2141 2142 2143 2144 2145 2146
    This used to control how the query was built. If there was a
    primary key, the query would be built such that there was a where
    clause with only that column as the condition. This is flawed,
    because if we have a multi-part primary key, it would only use the
    first part! We don't need to do this anyway, because
    read_range_first will retrieve the correct record, which is what
    is used to build the WHERE clause. We can however use this to
    append a LIMIT to the end if there is NOT a primary key. Why do
    this? Because we only are updating one record, and LIMIT enforces
    this.
2147
  */
2148
  bool has_a_primary_key= test(table->s->primary_key != MAX_KEY);
2149
  
unknown's avatar
unknown committed
2150
  /*
2151 2152
    buffers for following strings
  */
2153
  char field_value_buffer[STRING_BUFFER_USUAL_SIZE];
2154 2155
  char update_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char where_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2156

2157 2158 2159
  /* Work area for field values */
  String field_value(field_value_buffer, sizeof(field_value_buffer),
                     &my_charset_bin);
2160
  /* stores the update query */
2161 2162 2163
  String update_string(update_buffer,
                       sizeof(update_buffer),
                       &my_charset_bin);
unknown's avatar
unknown committed
2164
  /* stores the WHERE clause */
2165 2166 2167
  String where_string(where_buffer,
                      sizeof(where_buffer),
                      &my_charset_bin);
2168
  uchar *record= table->record[0];
2169
  DBUG_ENTER("ha_federated::update_row");
unknown's avatar
unknown committed
2170
  /*
2171 2172
    set string lengths to 0 to avoid misc chars in string
  */
2173
  field_value.length(0);
2174 2175
  update_string.length(0);
  where_string.length(0);
2176

unknown's avatar
unknown committed
2177 2178 2179 2180 2181 2182
  if (ignore_duplicates)
    update_string.append(STRING_WITH_LEN("UPDATE IGNORE "));
  else
    update_string.append(STRING_WITH_LEN("UPDATE "));
  append_ident(&update_string, share->table_name,
               share->table_name_length, ident_quote_char);
2183
  update_string.append(STRING_WITH_LEN(" SET "));
2184

unknown's avatar
unknown committed
2185 2186 2187
  /*
    In this loop, we want to match column names to values being inserted
    (while building INSERT statement).
2188

2189 2190
    Iterate through table->field (new data) and share->old_field (old_data)
    using the same index to create an SQL UPDATE statement. New data is
unknown's avatar
unknown committed
2191 2192 2193
    used to create SET field=value and old data is used to create WHERE
    field=oldvalue
  */
unknown's avatar
unknown committed
2194

2195
  for (Field **field= table->field; *field; field++)
2196
  {
2197
    if (bitmap_is_set(table->write_set, (*field)->field_index))
2198
    {
2199
      size_t field_name_length= strlen((*field)->field_name);
2200 2201
      append_ident(&update_string, (*field)->field_name, field_name_length,
                   ident_quote_char);
2202
      update_string.append(STRING_WITH_LEN(" = "));
unknown's avatar
unknown committed
2203

2204
      if ((*field)->is_null())
2205
        update_string.append(STRING_WITH_LEN(" NULL "));
2206 2207 2208
      else
      {
        /* otherwise = */
2209 2210
        my_bitmap_map *old_map= tmp_use_all_columns(table, table->read_set);
        bool needs_quote= (*field)->str_needs_quotes();
2211
	(*field)->val_str(&field_value);
2212
        if (needs_quote)
2213
          update_string.append(value_quote_char);
2214
        field_value.print(&update_string);
2215
        if (needs_quote)
2216
          update_string.append(value_quote_char);
2217
        field_value.length(0);
2218 2219
        tmp_restore_column_map(table->read_set, old_map);
      }
2220
      update_string.append(STRING_WITH_LEN(", "));
2221
    }
2222

2223
    if (bitmap_is_set(table->read_set, (*field)->field_index))
2224
    {
2225
      size_t field_name_length= strlen((*field)->field_name);
2226 2227
      append_ident(&where_string, (*field)->field_name, field_name_length,
                   ident_quote_char);
2228
      if (field_in_record_is_null(table, *field, (char*) old_data))
2229
        where_string.append(STRING_WITH_LEN(" IS NULL "));
2230 2231
      else
      {
2232
        bool needs_quote= (*field)->str_needs_quotes();
2233
        where_string.append(STRING_WITH_LEN(" = "));
unknown's avatar
unknown committed
2234
        (*field)->val_str(&field_value,
2235
                          (old_data + (*field)->offset(record)));
2236
        if (needs_quote)
2237
          where_string.append(value_quote_char);
2238
        field_value.print(&where_string);
2239
        if (needs_quote)
2240
          where_string.append(value_quote_char);
unknown's avatar
unknown committed
2241
        field_value.length(0);
2242
      }
2243
      where_string.append(STRING_WITH_LEN(" AND "));
2244 2245
    }
  }
2246 2247

  /* Remove last ', '. This works as there must be at least on updated field */
2248
  update_string.length(update_string.length() - sizeof_trailing_comma);
2249

2250 2251
  if (where_string.length())
  {
2252 2253 2254
    /* chop off trailing AND */
    where_string.length(where_string.length() - sizeof_trailing_and);
    update_string.append(STRING_WITH_LEN(" WHERE "));
2255 2256 2257
    update_string.append(where_string);
  }

2258 2259 2260 2261
  /*
    If this table has not a primary key, then we could possibly
    update multiple rows. We want to make sure to only update one!
  */
2262
  if (!has_a_primary_key)
2263
    update_string.append(STRING_WITH_LEN(" LIMIT 1"));
2264

unknown's avatar
unknown committed
2265
  if (real_query(update_string.ptr(), update_string.length()))
2266
  {
2267
    DBUG_RETURN(stash_remote_error());
2268 2269 2270 2271 2272
  }
  DBUG_RETURN(0);
}

/*
  This will delete a row. 'buf' will contain a copy of the row to be deleted.
  The server will call this right after the current row has been read (from
  either a previous rnd_next() or index call).
  If you keep a pointer to the last row or can access a primary key it will
  make doing the deletion quite a bit easier.
  Keep in mind that the server does not guarantee consecutive deletions.
  ORDER BY clauses can be used.

  Called in sql_acl.cc and sql_udf.cc to manage internal table information.
  Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select
  it is used for removing duplicates while in insert it is used for REPLACE
  calls.
*/
unknown's avatar
unknown committed
2286

2287
int ha_federated::delete_row(const uchar *buf)
2288
{
2289 2290
  char delete_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char data_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2291 2292
  String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
  String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
2293
  uint found= 0;
2294 2295
  DBUG_ENTER("ha_federated::delete_row");

2296
  delete_string.length(0);
2297
  delete_string.append(STRING_WITH_LEN("DELETE FROM "));
unknown's avatar
unknown committed
2298 2299
  append_ident(&delete_string, share->table_name,
               share->table_name_length, ident_quote_char);
2300
  delete_string.append(STRING_WITH_LEN(" WHERE "));
2301

unknown's avatar
unknown committed
2302
  for (Field **field= table->field; *field; field++)
2303
  {
2304
    Field *cur_field= *field;
2305 2306
    found++;
    if (bitmap_is_set(table->read_set, cur_field->field_index))
2307
    {
2308 2309
      append_ident(&delete_string, (*field)->field_name,
                   strlen((*field)->field_name), ident_quote_char);
2310 2311 2312
      data_string.length(0);
      if (cur_field->is_null())
      {
2313
        delete_string.append(STRING_WITH_LEN(" IS NULL "));
2314 2315 2316
      }
      else
      {
2317 2318 2319 2320
        bool needs_quote= cur_field->str_needs_quotes();
        delete_string.append(STRING_WITH_LEN(" = "));
        cur_field->val_str(&data_string);
        if (needs_quote)
2321
          delete_string.append(value_quote_char);
2322 2323
        data_string.print(&delete_string);
        if (needs_quote)
2324
          delete_string.append(value_quote_char);
2325
      }
2326
      delete_string.append(STRING_WITH_LEN(" AND "));
unknown's avatar
unknown committed
2327
    }
2328
  }
2329

2330 2331
  // Remove trailing AND
  delete_string.length(delete_string.length() - sizeof_trailing_and);
2332
  if (!found)
2333
    delete_string.length(delete_string.length() - sizeof_trailing_where);
2334

2335
  delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
2336
  DBUG_PRINT("info",
2337
             ("Delete sql: %s", delete_string.c_ptr_quick()));
unknown's avatar
unknown committed
2338
  if (real_query(delete_string.ptr(), delete_string.length()))
2339
  {
2340
    DBUG_RETURN(stash_remote_error());
2341
  }
unknown's avatar
unknown committed
2342 2343
  stats.deleted+= (ha_rows) mysql->affected_rows;
  stats.records-= (ha_rows) mysql->affected_rows;
2344
  DBUG_PRINT("info",
unknown's avatar
unknown committed
2345
             ("rows deleted %ld  rows deleted for all time %ld",
unknown's avatar
unknown committed
2346
              (long) mysql->affected_rows, (long) stats.deleted));
2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357

  DBUG_RETURN(0);
}


/*
  Positions an index cursor to the index specified in the handle. Fetches the
  row if available. If the key value is null, begin at the first key of the
  index. This method, which is called in the case of an SQL statement having
  a WHERE clause on a non-primary key index, simply calls
  index_read_idx_with_result_set() for the active index.
*/
unknown's avatar
unknown committed
2358

2359
int ha_federated::index_read(uchar *buf, const uchar *key,
unknown's avatar
unknown committed
2360
                             uint key_len, ha_rkey_function find_flag)
2361
{
2362
  int rc;
2363
  DBUG_ENTER("ha_federated::index_read");
unknown's avatar
unknown committed
2364

2365
  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2366
  free_result();
2367 2368 2369
  rc= index_read_idx_with_result_set(buf, active_index, key,
                                     key_len, find_flag,
                                     &stored_result);
2370
  MYSQL_INDEX_READ_ROW_DONE(rc);
2371
  DBUG_RETURN(rc);
2372 2373 2374 2375 2376 2377 2378 2379
}


/*
  Positions an index cursor to the index specified in key. Fetches the
  row if any.  This is only used to read whole keys.

  This method is called via index_read in the case of a WHERE clause using
unknown's avatar
unknown committed
2380
  a primary key index OR is called DIRECTLY when the WHERE clause
2381
  uses a PRIMARY KEY index.
unknown's avatar
unknown committed
2382 2383 2384 2385

  NOTES
    This uses an internal result set that is deleted before function
    returns.  We need to be able to be calable from ha_rnd_pos()
2386
*/
unknown's avatar
unknown committed
2387

2388
int ha_federated::index_read_idx(uchar *buf, uint index, const uchar *key,
2389
                                 uint key_len, enum ha_rkey_function find_flag)
unknown's avatar
unknown committed
2390 2391 2392 2393 2394 2395 2396 2397 2398 2399
{
  int retval;
  MYSQL_RES *mysql_result;
  DBUG_ENTER("ha_federated::index_read_idx");

  if ((retval= index_read_idx_with_result_set(buf, index, key,
                                              key_len, find_flag,
                                              &mysql_result)))
    DBUG_RETURN(retval);
  mysql_free_result(mysql_result);
2400 2401
  results.elements--;
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414
}


/*
  Create result set for rows matching query and return first row

  RESULT
    0	ok     In this case *result will contain the result set
	       table->status == 0 
    #   error  In this case *result will contain 0
               table->status == STATUS_NOT_FOUND
*/

2415 2416
int ha_federated::index_read_idx_with_result_set(uchar *buf, uint index,
                                                 const uchar *key,
unknown's avatar
unknown committed
2417 2418 2419
                                                 uint key_len,
                                                 ha_rkey_function find_flag,
                                                 MYSQL_RES **result)
2420
{
2421 2422 2423 2424
  int retval;
  char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  char index_value[STRING_BUFFER_USUAL_SIZE];
  char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
unknown's avatar
unknown committed
2425
  String index_string(index_value,
2426 2427 2428 2429 2430 2431
                      sizeof(index_value),
                      &my_charset_bin);
  String sql_query(sql_query_buffer,
                   sizeof(sql_query_buffer),
                   &my_charset_bin);
  key_range range;
unknown's avatar
unknown committed
2432
  DBUG_ENTER("ha_federated::index_read_idx_with_result_set");
2433

unknown's avatar
unknown committed
2434
  *result= 0;                                   // In case of errors
2435
  index_string.length(0);
2436
  sql_query.length(0);
unknown's avatar
unknown committed
2437
  ha_statistic_increment(&SSV::ha_read_key_count);
2438 2439 2440

  sql_query.append(share->select_query);

2441 2442 2443 2444 2445 2446
  range.key= key;
  range.length= key_len;
  range.flag= find_flag;
  create_where_from_key(&index_string,
                        &table->key_info[index],
                        &range,
2447
                        NULL, 0, 0);
2448 2449
  sql_query.append(index_string);

unknown's avatar
unknown committed
2450
  if (real_query(sql_query.ptr(), sql_query.length()))
2451
  {
2452 2453
    sprintf(error_buffer, "error: %d '%s'",
            mysql_errno(mysql), mysql_error(mysql));
2454
    retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2455
    goto error;
2456
  }
2457
  if (!(*result= store_result(mysql)))
2458
  {
2459 2460
    retval= HA_ERR_END_OF_FILE;
    goto error;
2461
  }
2462 2463 2464 2465 2466 2467
  if ((retval= read_next(buf, *result)))
  {
    mysql_free_result(*result);
    results.elements--;
    *result= 0;
    table->status= STATUS_NOT_FOUND;
unknown's avatar
unknown committed
2468
    DBUG_RETURN(retval);
2469 2470
  }
  DBUG_RETURN(0);
2471

2472 2473 2474 2475
error:
  table->status= STATUS_NOT_FOUND;
  my_error(retval, MYF(0), error_buffer);
  DBUG_RETURN(retval);
2476 2477
}

unknown's avatar
unknown committed
2478

2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496
/*
  Upper bound on the number of rows in the table.

  This method is used exclusively by filesort() to check whether it can
  create sorting buffers of the necessary size. If the handler returns
  more records than it declares here, the server can simply crash in
  filesort().
  We cannot guarantee that this will not happen with the FEDERATED
  engine: records is always 0 when the remote object is a VIEW, and for
  a table the number of records can change unpredictably during
  execution. So the maximum possible value is returned.
*/

ha_rows ha_federated::estimate_rows_upper_bound()
{
  return HA_POS_ERROR;
}


unknown's avatar
unknown committed
2497
/* Initialized at each key walk (called multiple times unlike rnd_init()) */
unknown's avatar
unknown committed
2498

2499
int ha_federated::index_init(uint keynr, bool sorted)
2500 2501
{
  DBUG_ENTER("ha_federated::index_init");
unknown's avatar
unknown committed
2502
  DBUG_PRINT("info", ("table: '%s'  key: %u", table->s->table_name.str, keynr));
2503 2504 2505 2506
  active_index= keynr;
  DBUG_RETURN(0);
}

2507

unknown's avatar
unknown committed
2508 2509
/*
  Read first range
2510
*/
unknown's avatar
unknown committed
2511

2512
int ha_federated::read_range_first(const key_range *start_key,
2513
                                   const key_range *end_key,
2514
                                   bool eq_range_arg, bool sorted)
2515 2516 2517 2518 2519 2520 2521
{
  char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
  int retval;
  String sql_query(sql_query_buffer,
                   sizeof(sql_query_buffer),
                   &my_charset_bin);
  DBUG_ENTER("ha_federated::read_range_first");
2522
  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2523

unknown's avatar
unknown committed
2524
  DBUG_ASSERT(!(start_key == NULL && end_key == NULL));
2525 2526 2527 2528 2529

  sql_query.length(0);
  sql_query.append(share->select_query);
  create_where_from_key(&sql_query,
                        &table->key_info[active_index],
unknown's avatar
unknown committed
2530
                        start_key, end_key, 0, eq_range_arg);
unknown's avatar
unknown committed
2531
  if (real_query(sql_query.ptr(), sql_query.length()))
2532
  {
2533
    retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2534 2535 2536 2537
    goto error;
  }
  sql_query.length(0);

2538
  if (!(stored_result= store_result(mysql)))
2539 2540 2541 2542 2543
  {
    retval= HA_ERR_END_OF_FILE;
    goto error;
  }

unknown's avatar
unknown committed
2544
  retval= read_next(table->record[0], stored_result);
2545
  MYSQL_INDEX_READ_ROW_DONE(retval);
2546 2547 2548
  DBUG_RETURN(retval);

error:
unknown's avatar
unknown committed
2549
  table->status= STATUS_NOT_FOUND;
2550
  MYSQL_INDEX_READ_ROW_DONE(retval);
unknown's avatar
unknown committed
2551
  DBUG_RETURN(retval);
2552 2553
}

unknown's avatar
unknown committed
2554

2555 2556 2557 2558
int ha_federated::read_range_next()
{
  int retval;
  DBUG_ENTER("ha_federated::read_range_next");
2559 2560 2561
  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
  retval= rnd_next_int(table->record[0]);
  MYSQL_INDEX_READ_ROW_DONE(retval);
2562 2563 2564 2565
  DBUG_RETURN(retval);
}


unknown's avatar
unknown committed
2566
/* Used to read forward through the index.  */
2567
int ha_federated::index_next(uchar *buf)
2568
{
2569
  int retval;
2570
  DBUG_ENTER("ha_federated::index_next");
2571
  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
unknown's avatar
unknown committed
2572
  ha_statistic_increment(&SSV::ha_read_next_count);
2573 2574 2575
  retval= read_next(buf, stored_result);
  MYSQL_INDEX_READ_ROW_DONE(retval);
  DBUG_RETURN(retval);
2576
}
unknown's avatar
unknown committed
2577 2578


2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590
/*
  rnd_init() is called when the system wants the storage engine to do a table
  scan.

  This is the method that gets data for the SELECT calls.

  See the federated in the introduction at the top of this file to see when
  rnd_init() is called.

  Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
  sql_table.cc, and sql_update.cc.
*/
unknown's avatar
unknown committed
2591

2592 2593
int ha_federated::rnd_init(bool scan)
{
2594
  DBUG_ENTER("ha_federated::rnd_init");
unknown's avatar
unknown committed
2595
  /*
2596 2597 2598
    The use of the 'scan' flag is incredibly important for this handler
    to work properly, especially with updates containing WHERE clauses
    using indexed columns.
unknown's avatar
unknown committed
2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627

    When the initial query contains a WHERE clause of the query using an
    indexed column, it's index_read_idx that selects the exact record from
    the foreign database.

    When there is NO index in the query, either due to not having a WHERE
    clause, or the WHERE clause is using columns that are not indexed, a
    'full table scan' done by rnd_init, which in this situation simply means
    a 'select * from ...' on the foreign table.

    In other words, this 'scan' flag gives us the means to ensure that if
    there is an index involved in the query, we want index_read_idx to
    retrieve the exact record (scan flag is 0), and do not  want rnd_init
    to do a 'full table scan' and wipe out that result set.

    Prior to using this flag, the problem was most apparent with updates.

    An initial query like 'UPDATE tablename SET anything = whatever WHERE
    indexedcol = someval', index_read_idx would get called, using a query
    constructed with a WHERE clause built from the values of index ('indexcol'
    in this case, having a value of 'someval').  mysql_store_result would
    then get called (this would be the result set we want to use).

    After this rnd_init (from sql_update.cc) would be called, it would then
    unecessarily call "select * from table" on the foreign table, then call
    mysql_store_result, which would wipe out the correct previous result set
    from the previous call of index_read_idx's that had the result set
    containing the correct record, hence update the wrong row!

unknown's avatar
unknown committed
2628
  */
2629

unknown's avatar
unknown committed
2630
  if (scan)
2631
  {
2632 2633 2634
    if (real_query(share->select_query, strlen(share->select_query)) ||
        !(stored_result= store_result(mysql)))
      DBUG_RETURN(stash_remote_error());
unknown's avatar
unknown committed
2635
  }
2636 2637 2638
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
2639

2640 2641 2642
int ha_federated::rnd_end()
{
  DBUG_ENTER("ha_federated::rnd_end");
unknown's avatar
unknown committed
2643
  DBUG_RETURN(index_end());
2644 2645
}

unknown's avatar
unknown committed
2646

2647 2648 2649
int ha_federated::index_end(void)
{
  DBUG_ENTER("ha_federated::index_end");
2650
  free_result();
2651 2652 2653 2654
  active_index= MAX_KEY;
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
2655

2656 2657 2658 2659 2660 2661 2662 2663 2664
/*
  This is called for each row of the table scan. When you run out of records
  you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
  The Field structure for the table is the key to getting data into buf
  in a manner that will allow the server to understand it.

  Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
  sql_table.cc, and sql_update.cc.
*/
unknown's avatar
unknown committed
2665

2666
int ha_federated::rnd_next(uchar *buf)
2667
{
2668
  int rc;
2669
  DBUG_ENTER("ha_federated::rnd_next");
2670 2671 2672 2673 2674 2675 2676 2677 2678 2679
  MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
                       TRUE);
  rc= rnd_next_int(buf);
  MYSQL_READ_ROW_DONE(rc);
  DBUG_RETURN(rc);
}

/*
  Read the next row of stored_result into buf.

  RETURN
    1                      there is no stored result set
    result of read_next()  otherwise
*/

int ha_federated::rnd_next_int(uchar *buf)
{
  DBUG_ENTER("ha_federated::rnd_next_int");

  if (stored_result != 0)
    DBUG_RETURN(read_next(buf, stored_result));

  /*
    Return value of rnd_init is not always checked (see records.cc),
    so we can get here _even_ if there is _no_ pre-fetched result-set!
    TODO: fix it. We can delete this in 5.1 when rnd_init() is checked.
  */
  DBUG_RETURN(1);
}


/*
  ha_federated::read_next

  reads from a result set and converts to mysql internal
  format

  SYNOPSIS
    field_in_record_is_null()
      buf       byte pointer to record 
      result    mysql result set 

    DESCRIPTION
     This method is a wrapper method that reads one record from a result
     set and converts it to the internal table format

    RETURN VALUE
      1    error
      0    no error 
*/

2714
int ha_federated::read_next(uchar *buf, MYSQL_RES *result)
unknown's avatar
unknown committed
2715 2716 2717 2718 2719 2720
{
  int retval;
  MYSQL_ROW row;
  DBUG_ENTER("ha_federated::read_next");

  table->status= STATUS_NOT_FOUND;              // For easier return
2721 2722 2723
  
  /* Save current data cursor position. */
  current_position= result->data_cursor;
unknown's avatar
unknown committed
2724

unknown's avatar
unknown committed
2725
  /* Fetch a row, insert it back in a row format. */
unknown's avatar
unknown committed
2726
  if (!(row= mysql_fetch_row(result)))
2727
    DBUG_RETURN(HA_ERR_END_OF_FILE);
unknown's avatar
unknown committed
2728

unknown's avatar
unknown committed
2729 2730 2731
  if (!(retval= convert_row_to_internal_format(buf, row, result)))
    table->status= 0;

2732
  DBUG_RETURN(retval);
2733 2734 2735
}


2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753
/**
  @brief      Store a reference to current row.
  
  @details    During a query execution we may have different result sets (RS),
              e.g. for different ranges. All the RS's used are stored in 
              memory and placed in @c results dynamic array. At the end of 
              execution all stored RS's are freed at once in the
              @c ha_federated::reset().
              So, in case of federated, a reference to current row is a 
              stored result address and current data cursor position.
              As we keep all RS in memory during a query execution,
              we can get any record using the reference any time until
              @c ha_federated::reset() is called.
              TODO: we don't have to store all RS's rows but only those
              we call @c ha_federated::position() for, so we can free memory
              where we store other rows in the @c ha_federated::index_end().
 
  @param[in]  record  record data (unused)
2754
*/
unknown's avatar
unknown committed
2755

2756
void ha_federated::position(const uchar *record __attribute__ ((unused)))
2757 2758
{
  DBUG_ENTER("ha_federated::position");
2759 2760 2761 2762 2763
  
  DBUG_ASSERT(stored_result);

  position_called= TRUE;
  /* Store result set address. */
2764
  memcpy(ref, &stored_result, sizeof(MYSQL_RES *));
2765
  /* Store data cursor position. */
2766
  memcpy(ref + sizeof(MYSQL_RES *), &current_position,
2767
               sizeof(MYSQL_ROW_OFFSET));
2768 2769 2770 2771 2772 2773
  DBUG_VOID_RETURN;
}


/*
  This is like rnd_next, but you are given a position to use to determine the
unknown's avatar
unknown committed
2774
  row. The position will be of the type that you stored in ref.
2775

unknown's avatar
unknown committed
2776
  This method is required for an ORDER BY
2777 2778 2779

  Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
*/
unknown's avatar
unknown committed
2780

2781
int ha_federated::rnd_pos(uchar *buf, uchar *pos)
2782
{
2783
  MYSQL_RES *result;
2784
  int ret_val;
2785
  DBUG_ENTER("ha_federated::rnd_pos");
2786

2787 2788
  MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
                       FALSE);
unknown's avatar
unknown committed
2789
  ha_statistic_increment(&SSV::ha_read_rnd_count);
2790 2791

  /* Get stored result set. */
2792
  memcpy(&result, pos, sizeof(MYSQL_RES *));
2793 2794
  DBUG_ASSERT(result);
  /* Set data cursor position. */
2795 2796
  memcpy(&result->data_cursor, pos + sizeof(MYSQL_RES *),
         sizeof(MYSQL_ROW_OFFSET));
2797
  /* Read a row. */
2798 2799 2800
  ret_val= read_next(buf, result);
  MYSQL_READ_ROW_DONE(ret_val);
  DBUG_RETURN(ret_val);
2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846
}


/*
  ::info() is used to return information to the optimizer.
  Currently this table handler doesn't implement most of the fields
  really needed. SHOW also makes use of this data
  Another note, you will probably want to have the following in your
  code:
  if (records < 2)
    records = 2;
  The reason is that the server will optimize for cases of only a single
  record. If in a table scan you don't know the number of records
  it will probably be better to set records to two so you can return
  as many records as you need.
  Along with records a few more variables you may wish to set are:
    records
    deleted
    data_file_length
    index_file_length
    delete_length
    check_time
  Take a look at the public variables in handler.h for more information.

  Called in:
    filesort.cc
    ha_heap.cc
    item_sum.cc
    opt_sum.cc
    sql_delete.cc
    sql_delete.cc
    sql_derived.cc
    sql_select.cc
    sql_select.cc
    sql_select.cc
    sql_select.cc
    sql_select.cc
    sql_show.cc
    sql_show.cc
    sql_show.cc
    sql_show.cc
    sql_table.cc
    sql_union.cc
    sql_update.cc

*/
unknown's avatar
unknown committed
2847

2848
int ha_federated::info(uint flag)
2849
{
2850 2851 2852
  char status_buf[FEDERATED_QUERY_BUFFER_SIZE];
  int error;
  uint error_code;
2853
  MYSQL_RES *result= 0;
2854 2855
  MYSQL_ROW row;
  String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin);
2856
  DBUG_ENTER("ha_federated::info");
2857

2858
  error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2859 2860 2861 2862
  /* we want not to show table status if not needed to do so */
  if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST))
  {
    status_query_string.length(0);
2863
    status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE "));
unknown's avatar
unknown committed
2864 2865
    append_ident(&status_query_string, share->table_name,
                 share->table_name_length, value_quote_char);
2866

unknown's avatar
unknown committed
2867
    if (real_query(status_query_string.ptr(), status_query_string.length()))
2868 2869 2870 2871 2872
      goto error;

    status_query_string.length(0);

    result= mysql_store_result(mysql);
2873 2874 2875 2876 2877 2878

    /*
      We're going to use fields num. 4, 12 and 13 of the resultset,
      so make sure we have these fields.
    */
    if (!result || (mysql_num_fields(result) < 14))
2879 2880
      goto error;

2881
    if (!mysql_num_rows(result))
2882 2883 2884 2885 2886
      goto error;

    if (!(row= mysql_fetch_row(result)))
      goto error;

Staale Smedseng's avatar
Staale Smedseng committed
2887
    /*
2888 2889 2890 2891 2892
      deleted is set in ha_federated::info
    */
    /*
      need to figure out what this means as far as federated is concerned,
      since we don't have a "file"
2893

2894 2895 2896 2897 2898
      data_file_length = ?
      index_file_length = ?
      delete_length = ?
    */
    if (row[4] != NULL)
Staale Smedseng's avatar
Staale Smedseng committed
2899
      stats.records=   (ha_rows) my_strtoll10(row[4], (char**) 0,
2900
                                                       &error);
Staale Smedseng's avatar
Staale Smedseng committed
2901 2902
    if (row[5] != NULL)
      stats.mean_rec_length= (ulong) my_strtoll10(row[5], (char**) 0, &error);
unknown's avatar
unknown committed
2903

Staale Smedseng's avatar
Staale Smedseng committed
2904
    stats.data_file_length= stats.records * stats.mean_rec_length;
unknown's avatar
unknown committed
2905

2906
    if (row[12] != NULL)
Staale Smedseng's avatar
Staale Smedseng committed
2907
      stats.update_time=     (ulong) my_strtoll10(row[12], (char**) 0,
2908
                                                      &error);
2909
    if (row[13] != NULL)
Staale Smedseng's avatar
Staale Smedseng committed
2910
      stats.check_time=      (ulong) my_strtoll10(row[13], (char**) 0,
2911
                                                      &error);
unknown's avatar
unknown committed
2912 2913 2914 2915 2916

    /*
      size of IO operations (This is based on a good guess, no high science
      involved)
    */
2917
    if (flag & HA_STATUS_CONST)
2918
      stats.block_size= 4096;
unknown's avatar
unknown committed
2919

2920 2921
  }

unknown's avatar
unknown committed
2922
  if (flag & HA_STATUS_AUTO)
Konstantin Osipov's avatar
Konstantin Osipov committed
2923
    stats.auto_increment_value= mysql->insert_id;
unknown's avatar
unknown committed
2924 2925

  mysql_free_result(result);
2926

2927
  DBUG_RETURN(0);
2928 2929

error:
unknown's avatar
unknown committed
2930 2931 2932
  mysql_free_result(result);
  if (mysql)
  {
2933 2934
    my_printf_error(error_code, ": %d : %s", MYF(0),
                    mysql_errno(mysql), mysql_error(mysql));
unknown's avatar
unknown committed
2935 2936
  }
  else
unknown's avatar
unknown committed
2937
  if (remote_error_number != -1 /* error already reported */)
unknown's avatar
unknown committed
2938 2939 2940 2941
  {
    error_code= remote_error_number;
    my_error(error_code, MYF(0), ER(error_code));
  }
2942
  DBUG_RETURN(error_code);
2943 2944 2945
}


unknown's avatar
unknown committed
2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961
/**
  @brief Handles extra signals from MySQL server

  @details Only the hints that toggle the handler's duplicate-handling
    flags (ignore_duplicates, replace_duplicates, insert_dup_update) are
    acted upon; every other hint is just traced.

  @param[in] operation  Hint for storage engine

  @return Operation Status
  @retval 0     OK
 */
int ha_federated::extra(ha_extra_function operation)
{
  DBUG_ENTER("ha_federated::extra");
  switch (operation) {
  case HA_EXTRA_INSERT_WITH_UPDATE:
    insert_dup_update= TRUE;
    break;
  case HA_EXTRA_IGNORE_DUP_KEY:
    ignore_duplicates= TRUE;
    break;
  case HA_EXTRA_NO_IGNORE_DUP_KEY:
    /* Clears the insert-with-update flag together with the ignore flag. */
    ignore_duplicates= FALSE;
    insert_dup_update= FALSE;
    break;
  case HA_EXTRA_WRITE_CAN_REPLACE:
    replace_duplicates= TRUE;
    break;
  case HA_EXTRA_WRITE_CANNOT_REPLACE:
    /*
      We use this flag to ensure that we do not create an "INSERT IGNORE"
      statement when inserting new rows into the remote table.
    */
    replace_duplicates= FALSE;
    break;
  default:
    /* do nothing */
    DBUG_PRINT("info",("unhandled operation: %d", (uint) operation));
  }
  DBUG_RETURN(0);
}


unknown's avatar
unknown committed
2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000
/**
  @brief Reset state of file to after 'open'.

  @detail This function is called after every statement for all tables
    used by that statement.

  @return Operation status
    @retval     0       OK
*/

int ha_federated::reset(void)
{
  insert_dup_update= FALSE;
  ignore_duplicates= FALSE;
  replace_duplicates= FALSE;
3001 3002 3003 3004 3005 3006 3007 3008 3009 3010

  /* Free stored result sets. */
  for (uint i= 0; i < results.elements; i++)
  {
    MYSQL_RES *result;
    get_dynamic(&results, (uchar *) &result, i);
    mysql_free_result(result);
  }
  reset_dynamic(&results);

unknown's avatar
unknown committed
3011 3012 3013 3014
  return 0;
}


3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025
/*
  Used to delete all rows in a table. Both for cases of truncate and
  for cases where the optimizer realizes that all rows will be
  removed as a result of a SQL statement.

  Called from item_sum.cc by Item_func_group_concat::clear(),
  Item_sum_count_distinct::clear(), and Item_func_group_concat::clear().
  Called from sql_delete.cc by mysql_delete().
  Called from sql_select.cc by JOIN::reinit().
  Called from sql_union.cc by st_select_lex_unit::exec().
*/
unknown's avatar
unknown committed
3026

3027 3028
int ha_federated::delete_all_rows()
{
3029
  char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
3030
  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
3031 3032
  DBUG_ENTER("ha_federated::delete_all_rows");

3033 3034 3035
  query.length(0);

  query.set_charset(system_charset_info);
3036
  query.append(STRING_WITH_LEN("TRUNCATE "));
unknown's avatar
unknown committed
3037 3038
  append_ident(&query, share->table_name, share->table_name_length,
               ident_quote_char);
3039

3040 3041 3042
  /*
    TRUNCATE won't return anything in mysql_affected_rows
  */
unknown's avatar
unknown committed
3043
  if (real_query(query.ptr(), query.length()))
unknown's avatar
unknown committed
3044
  {
3045
    DBUG_RETURN(stash_remote_error());
3046
  }
3047 3048
  stats.deleted+= stats.records;
  stats.records= 0;
3049
  DBUG_RETURN(0);
3050 3051 3052
}


3053 3054 3055 3056 3057 3058 3059 3060 3061 3062
/*
  Used to manually truncate the table. Simply forwards to
  delete_all_rows() and returns its result.
*/

int ha_federated::truncate()
{
  return delete_all_rows();
}


3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091
/*
  The idea with handler::store_lock() is the following:

  The statement decided which locks we should need for the table
  for updates/deletes/inserts we get WRITE locks, for SELECT... we get
  read locks.

  Before adding the lock into the table lock handler (see thr_lock.c)
  mysqld calls store lock with the requested locks.  Store lock can now
  modify a write lock to a read lock (or some other lock), ignore the
  lock (if we don't want to use MySQL table locks at all) or add locks
  for many tables (like we do when we are using a MERGE handler).

  Berkeley DB for federated  changes all WRITE locks to TL_WRITE_ALLOW_WRITE
  (which signals that we are doing WRITES, but we are still allowing other
  reader's and writer's.

  When releasing locks, store_lock() are also called. In this case one
  usually doesn't have to do anything.

  In some exceptional cases MySQL may send a request for a TL_IGNORE;
  This means that we are requesting the same lock as last time and this
  should also be ignored. (This may happen when someone does a flush
  table when we have opened a part of the tables, in which case mysqld
  closes and reopens the tables and tries to get the same locks at last
  time).  In the future we will probably try to remove this.

  Called from lock.cc by get_lock_data().
*/
unknown's avatar
unknown committed
3092

3093
THR_LOCK_DATA **ha_federated::store_lock(THD *thd,
unknown's avatar
unknown committed
3094 3095
                                         THR_LOCK_DATA **to,
                                         enum thr_lock_type lock_type)
3096
{
unknown's avatar
unknown committed
3097
  DBUG_ENTER("ha_federated::store_lock");
unknown's avatar
unknown committed
3098
  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
3099
  {
unknown's avatar
unknown committed
3100 3101 3102 3103 3104
    /*
      Here is where we get into the guts of a row level lock.
      If TL_UNLOCK is set
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers
3105 3106 3107
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
3108
         lock_type <= TL_WRITE) && !thd->in_lock_tables)
3109 3110
      lock_type= TL_WRITE_ALLOW_WRITE;

unknown's avatar
unknown committed
3111 3112 3113 3114 3115 3116
    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
3117 3118
    */

unknown's avatar
unknown committed
3119
    if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
3120 3121 3122 3123 3124 3125 3126
      lock_type= TL_READ;

    lock.type= lock_type;
  }

  *to++= &lock;

unknown's avatar
unknown committed
3127
  DBUG_RETURN(to);
3128 3129 3130 3131
}

/*
  create() does nothing, since we have no local setup of our own.
unknown's avatar
unknown committed
3132
  FUTURE: We should potentially connect to the foreign database and
3133
*/
unknown's avatar
unknown committed
3134

3135
int ha_federated::create(const char *name, TABLE *table_arg,
unknown's avatar
unknown committed
3136
                         HA_CREATE_INFO *create_info)
3137
{
3138
  int retval;
3139
  THD *thd= current_thd;
3140
  FEDERATED_SHARE tmp_share; // Only a temporary share, to test the url
3141
  DBUG_ENTER("ha_federated::create");
3142

3143
  retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1);
3144 3145

  DBUG_RETURN(retval);
3146

3147
}
3148 3149


unknown's avatar
unknown committed
3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161
int ha_federated::real_connect()
{
  char buffer[FEDERATED_QUERY_BUFFER_SIZE];
  String sql_query(buffer, sizeof(buffer), &my_charset_bin);
  DBUG_ENTER("ha_federated::real_connect");

  /* 
    Bug#25679
    Ensure that we do not hold the LOCK_open mutex while attempting
    to establish Federated connection to guard against a trivial
    Denial of Service scenerio.
  */
Marc Alff's avatar
Marc Alff committed
3162
  mysql_mutex_assert_not_owner(&LOCK_open);
unknown's avatar
unknown committed
3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203

  DBUG_ASSERT(mysql == NULL);

  if (!(mysql= mysql_init(NULL)))
  {
    remote_error_number= HA_ERR_OUT_OF_MEM;
    DBUG_RETURN(-1);
  }

  /*
    BUG# 17044 Federated Storage Engine is not UTF8 clean
    Add set names to whatever charset the table is at open
    of table
  */
  /* this sets the csname like 'set names utf8' */
  mysql_options(mysql,MYSQL_SET_CHARSET_NAME,
                this->table->s->table_charset->csname);

  sql_query.length(0);

  if (!mysql_real_connect(mysql,
                          share->hostname,
                          share->username,
                          share->password,
                          share->database,
                          share->port,
                          share->socket, 0))
  {
    stash_remote_error();
    mysql_close(mysql);
    mysql= NULL;
    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), remote_error_buf);
    remote_error_number= -1;
    DBUG_RETURN(-1);
  }

  /*
    We have established a connection, lets try a simple dummy query just 
    to check that the table and expected columns are present.
  */
  sql_query.append(share->select_query);
3204
  sql_query.append(STRING_WITH_LEN(" WHERE 1=0"));
unknown's avatar
unknown committed
3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233
  if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length()))
  {
    sql_query.length(0);
    sql_query.append("error: ");
    sql_query.qs_append(mysql_errno(mysql));
    sql_query.append("  '");
    sql_query.append(mysql_error(mysql));
    sql_query.append("'");
    mysql_close(mysql);
    mysql= NULL;
    my_error(ER_FOREIGN_DATA_SOURCE_DOESNT_EXIST, MYF(0), sql_query.ptr());
    remote_error_number= -1;
    DBUG_RETURN(-1);
  }

  /* Just throw away the result, no rows anyways but need to keep in sync */
  mysql_free_result(mysql_store_result(mysql));

  /*
    Since we do not support transactions at this version, we can let the client
    API silently reconnect. For future versions, we will need more logic to
    deal with transactions
  */

  mysql->reconnect= 1;
  DBUG_RETURN(0);
}


3234
int ha_federated::real_query(const char *query, size_t length)
unknown's avatar
unknown committed
3235 3236 3237 3238 3239 3240 3241 3242 3243 3244
{
  int rc= 0;
  DBUG_ENTER("ha_federated::real_query");

  if (!mysql && (rc= real_connect()))
    goto end;

  if (!query || !length)
    goto end;

3245
  rc= mysql_real_query(mysql, query, (uint) length);
unknown's avatar
unknown committed
3246 3247 3248 3249 3250 3251
  
end:
  DBUG_RETURN(rc);
}


3252 3253 3254
int ha_federated::stash_remote_error()
{
  DBUG_ENTER("ha_federated::stash_remote_error()");
unknown's avatar
unknown committed
3255 3256
  if (!mysql)
    DBUG_RETURN(remote_error_number);
3257
  remote_error_number= mysql_errno(mysql);
unknown's avatar
unknown committed
3258
  strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1);
unknown's avatar
unknown committed
3259 3260 3261
  if (remote_error_number == ER_DUP_ENTRY ||
      remote_error_number == ER_DUP_KEY)
    DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
3262 3263 3264 3265 3266 3267 3268 3269 3270 3271
  DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM);
}


bool ha_federated::get_error_message(int error, String* buf)
{
  DBUG_ENTER("ha_federated::get_error_message");
  DBUG_PRINT("enter", ("error: %d", error));
  if (error == HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM)
  {
3272
    buf->append(STRING_WITH_LEN("Error on remote system: "));
3273
    buf->qs_append(remote_error_number);
3274
    buf->append(STRING_WITH_LEN(": "));
3275
    buf->append(remote_error_buf);
3276 3277 3278 3279 3280 3281 3282 3283

    remote_error_number= 0;
    remote_error_buf[0]= '\0';
  }
  DBUG_PRINT("exit", ("message: %s", buf->ptr()));
  DBUG_RETURN(FALSE);
}

3284 3285 3286 3287 3288 3289 3290

/**
  @brief      Store a result set.

  @details    Call @c mysql_store_result() to save a result set then
              append it to the stored results array.

3291
  @param[in]  mysql_arg  MySLQ connection structure.
3292 3293 3294 3295

  @return     Stored result set (MYSQL_RES object).
*/

3296
MYSQL_RES *ha_federated::store_result(MYSQL *mysql_arg)
3297
{
3298
  MYSQL_RES *result= mysql_store_result(mysql_arg);
3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322
  DBUG_ENTER("ha_federated::store_result");
  if (result)
  {
    (void) insert_dynamic(&results, (uchar*) &result);
  }
  position_called= FALSE;
  DBUG_RETURN(result);
}


/*
  Free the current stored_result, unless there is none or
  position_called is set, and drop one entry from the results array.
*/

void ha_federated::free_result()
{
  DBUG_ENTER("ha_federated::free_result");

  if (!stored_result || position_called)
  {
    DBUG_VOID_RETURN;
  }

  mysql_free_result(stored_result);
  stored_result= 0;

  /*
    Shrinks the array from its end.
    NOTE(review): assumes stored_result is the last entry - confirm.
  */
  if (results.elements > 0)
    results.elements--;

  DBUG_VOID_RETURN;
}

 
3323 3324 3325 3326 3327
/*
  external_lock() for the FEDERATED handler.

  Unless XXX_SUPERCEDED_BY_WL2952 is defined, everything between the
  #ifdef and #endif is compiled out and the function just returns 0.

  The disabled code, for any lock_type other than F_UNLCK, either turns
  autocommit on for the remote connection, or turns it off and adds this
  handler to the chain of handlers stored as the thread's ha_data.
*/

int ha_federated::external_lock(THD *thd, int lock_type)
{
  int error= 0;
  DBUG_ENTER("ha_federated::external_lock");

  /*
    Support for transactions disabled until WL#2952 fixes it.
  */
#ifdef XXX_SUPERCEDED_BY_WL2952
  if (lock_type != F_UNLCK)
  {
    ha_federated *trx= (ha_federated *)thd_get_ha_data(thd, ht);

    DBUG_PRINT("info",("federated not lock F_UNLCK"));
    if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
    {
      DBUG_PRINT("info",("federated autocommit"));
      /* This means we are doing an autocommit */
      if ((error= connection_autocommit(TRUE)))
      {
        DBUG_PRINT("info", ("error setting autocommit TRUE: %d", error));
        DBUG_RETURN(error);
      }
      trans_register_ha(thd, FALSE, ht);
    }
    else
    {
      DBUG_PRINT("info",("not autocommit"));
      if (!trx)
      {
        /* This is where a transaction gets its start */
        if ((error= connection_autocommit(FALSE)))
        {
          DBUG_PRINT("info", ("error setting autocommit FALSE: %d", error));
          DBUG_RETURN(error);
        }
        /* This handler becomes the head of the thread's handler chain. */
        thd_set_ha_data(thd, ht, this);
        trans_register_ha(thd, TRUE, ht);
        /*
          Send a lock table to the remote end.
          We do not support this at the moment
        */
        if (thd->options & (OPTION_TABLE_LOCK))
        {
          DBUG_PRINT("info", ("We do not support lock table yet"));
        }
      }
      else
      {
        /* Walk the chain; append this handler unless it is already there. */
        ha_federated *ptr;
        for (ptr= trx; ptr; ptr= ptr->trx_next)
        {
          if (ptr == this)
            break;
          else if (!ptr->trx_next)
            ptr->trx_next= this;
        }
      }
    }
  }
#endif /* XXX_SUPERCEDED_BY_WL2952 */
  DBUG_RETURN(error);
}


3392
/*
  Commit on every handler in the chain stored as the thread's ha_data.

  Nothing is committed unless 'all' is set.  When it is, the chain linked
  through trx_next is walked, connection_commit() is called on each
  handler, each handler is unlinked, and the thread's ha_data is cleared.

  RETURN
    0        'all' not set, or every commit succeeded
    other    the first non-zero value returned by connection_commit()
*/

static int federated_commit(handlerton *hton, THD *thd, bool all)
{
  int return_val= 0;
  ha_federated *trx= (ha_federated *) thd_get_ha_data(thd, hton);
  DBUG_ENTER("federated_commit");

  if (all)
  {
    ha_federated *node= trx;
    while (node)
    {
      int error= node->connection_commit();
      /* Keep going after a failure, but remember the first error. */
      if (error && !return_val)
        return_val= error;

      /* Detach the handler just processed before moving on. */
      ha_federated *following= node->trx_next;
      node->trx_next= NULL;
      node= following;
    }
    thd_set_ha_data(thd, hton, NULL);
  }

  DBUG_PRINT("info", ("error val: %d", return_val));
  DBUG_RETURN(return_val);
}


3418
/*
  Roll back on every handler in the chain stored as the thread's ha_data.

  Nothing is rolled back unless 'all' is set.  When it is, the chain
  linked through trx_next is walked, connection_rollback() is called on
  each handler, each handler is unlinked, and the thread's ha_data is
  cleared.

  RETURN
    0        'all' not set, or every rollback succeeded
    other    the first non-zero value returned by connection_rollback()
*/

static int federated_rollback(handlerton *hton, THD *thd, bool all)
{
  int return_val= 0;
  ha_federated *trx= (ha_federated *)thd_get_ha_data(thd, hton);
  DBUG_ENTER("federated_rollback");

  if (all)
  {
    ha_federated *node= trx;
    while (node)
    {
      int error= node->connection_rollback();
      /* Keep going after a failure, but remember the first error. */
      if (error && !return_val)
        return_val= error;

      /* Detach the handler just processed before moving on. */
      ha_federated *following= node->trx_next;
      node->trx_next= NULL;
      node= following;
    }
    thd_set_ha_data(thd, hton, NULL);
  }

  DBUG_PRINT("info", ("error val: %d", return_val));
  DBUG_RETURN(return_val);
}

int ha_federated::connection_commit()
{
  DBUG_ENTER("ha_federated::connection_commit");
  DBUG_RETURN(execute_simple_query("COMMIT", 6));
}


int ha_federated::connection_rollback()
{
  DBUG_ENTER("ha_federated::connection_rollback");
  DBUG_RETURN(execute_simple_query("ROLLBACK", 8));
}


int ha_federated::connection_autocommit(bool state)
{
  const char *text;
  DBUG_ENTER("ha_federated::connection_autocommit");
3461
  text= (state == TRUE) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0";
3462
  DBUG_RETURN(execute_simple_query(text, 16));
unknown's avatar
unknown committed
3463
}
3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476


int ha_federated::execute_simple_query(const char *query, int len)
{
  DBUG_ENTER("ha_federated::execute_simple_query");

  if (mysql_real_query(mysql, query, len))
  {
    DBUG_RETURN(stash_remote_error());
  }
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
3477
struct st_mysql_storage_engine federated_storage_engine=
3478
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
unknown's avatar
unknown committed
3479 3480 3481 3482

mysql_declare_plugin(federated)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
unknown's avatar
unknown committed
3483 3484
  &federated_storage_engine,
  "FEDERATED",
unknown's avatar
unknown committed
3485
  "Patrick Galbraith and Brian Aker, MySQL AB",
unknown's avatar
unknown committed
3486
  "Federated MySQL storage engine",
3487
  PLUGIN_LICENSE_GPL,
unknown's avatar
unknown committed
3488
  federated_db_init, /* Plugin Init */
3489
  federated_done, /* Plugin Deinit */
unknown's avatar
unknown committed
3490
  0x0100 /* 1.0 */,
3491 3492
  NULL,                       /* status variables                */
  NULL,                       /* system variables                */
3493 3494
  NULL,                       /* config options                  */
  0,                          /* flags                           */
unknown's avatar
unknown committed
3495 3496
}
mysql_declare_plugin_end;