Commit af7e3fce authored by eric@mysql.com's avatar eric@mysql.com

Re-applying the work initially done by Brian, and since worked upon by me...

Re-applying the work initially done by Brian, and since worked upon by me previously in several separate patches to the 5.1 parent but never pushed.
WL#2952 - add simple single-table only transactions to federated.
parent ee3cd794
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
stop slave;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
DROP TABLE IF EXISTS federated.t1;
Warnings:
Note 1051 Unknown table 't1'
CREATE TABLE federated.t1 (
`id` int(20) NOT NULL,
`name` varchar(32) NOT NULL default ''
)
DEFAULT CHARSET=latin1 ENGINE=BerkeleyDB;
DROP TABLE IF EXISTS federated.t1;
Warnings:
Note 1051 Unknown table 't1'
CREATE TABLE federated.t1 (
`id` int(20) NOT NULL,
`name` varchar(32) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1';
set autocommit=0;
INSERT INTO federated.t1 (id, name) VALUES (1, 'foo');
INSERT INTO federated.t1 (id, name) VALUES (2, 'fee');
COMMIT;
INSERT INTO federated.t1 (id, name) VALUES (3, 'fie');
INSERT INTO federated.t1 (id, name) VALUES (4, 'fum');
ROLLBACK;
set autocommit=1;
INSERT INTO federated.t1 (id, name) VALUES (5, 'foe');
INSERT INTO federated.t1 (id, name) VALUES (6, 'fig');
SELECT * FROM federated.t1;
id name
1 foo
2 fee
5 foe
6 fig
DELETE FROM federated.t1;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
source include/federated.inc;
connection slave;
DROP TABLE IF EXISTS federated.t1;
#SHOW ENGINES;
CREATE TABLE federated.t1 (
`id` int(20) NOT NULL,
`name` varchar(32) NOT NULL default ''
)
DEFAULT CHARSET=latin1 ENGINE=BerkeleyDB;
connection master;
DROP TABLE IF EXISTS federated.t1;
# # correct connection, same named tables
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval CREATE TABLE federated.t1 (
`id` int(20) NOT NULL,
`name` varchar(32) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1';
set autocommit=0;
INSERT INTO federated.t1 (id, name) VALUES (1, 'foo');
INSERT INTO federated.t1 (id, name) VALUES (2, 'fee');
COMMIT;
INSERT INTO federated.t1 (id, name) VALUES (3, 'fie');
INSERT INTO federated.t1 (id, name) VALUES (4, 'fum');
ROLLBACK;
set autocommit=1;
INSERT INTO federated.t1 (id, name) VALUES (5, 'foe');
INSERT INTO federated.t1 (id, name) VALUES (6, 'fig');
SELECT * FROM federated.t1;
DELETE FROM federated.t1;
source include/federated_cleanup.inc;
...@@ -363,9 +363,9 @@ static int federated_init= FALSE; // Variable for checking the ...@@ -363,9 +363,9 @@ static int federated_init= FALSE; // Variable for checking the
// init state of hash // init state of hash
/* Static declaration for handerton */ /* Static declaration for handerton */
static handler *federated_create_handler(TABLE *table); static handler *federated_create_handler(TABLE *table);
static int federated_commit(THD *thd, bool all);
static int federated_rollback(THD *thd, bool all);
/* Federated storage engine handlerton */ /* Federated storage engine handlerton */
...@@ -381,8 +381,8 @@ handlerton federated_hton= { ...@@ -381,8 +381,8 @@ handlerton federated_hton= {
NULL, /* savepoint */ NULL, /* savepoint */
NULL, /* rollback to savepoint */ NULL, /* rollback to savepoint */
NULL, /* release savepoint */ NULL, /* release savepoint */
NULL, /* commit */ federated_commit, /* commit */
NULL, /* rollback */ federated_rollback, /* rollback */
NULL, /* prepare */ NULL, /* prepare */
NULL, /* recover */ NULL, /* recover */
NULL, /* commit_by_xid */ NULL, /* commit_by_xid */
...@@ -647,8 +647,8 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table, ...@@ -647,8 +647,8 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table,
share->port= 0; share->port= 0;
share->socket= 0; share->socket= 0;
DBUG_PRINT("info", ("Length %d \n", table->s->connect_string.length)); DBUG_PRINT("info", ("Length: %d", table->s->connect_string.length));
DBUG_PRINT("info", ("String %.*s \n", table->s->connect_string.length, DBUG_PRINT("info", ("String: '%.*s'", table->s->connect_string.length,
table->s->connect_string.str)); table->s->connect_string.str));
share->scheme= my_strdup_with_length((const byte*)table->s-> share->scheme= my_strdup_with_length((const byte*)table->s->
connect_string.str, connect_string.str,
...@@ -740,7 +740,7 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table, ...@@ -740,7 +740,7 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table,
DBUG_PRINT("info", DBUG_PRINT("info",
("scheme %s username %s password %s \ ("scheme %s username %s password %s \
hostname %s port %d database %s tablename %s\n", hostname %s port %d database %s tablename %s",
share->scheme, share->username, share->password, share->scheme, share->username, share->password,
share->hostname, share->port, share->database, share->hostname, share->port, share->database,
share->table_name)); share->table_name));
...@@ -760,7 +760,9 @@ ha_federated::ha_federated(TABLE *table_arg) ...@@ -760,7 +760,9 @@ ha_federated::ha_federated(TABLE *table_arg)
:handler(&federated_hton, table_arg), :handler(&federated_hton, table_arg),
mysql(0), stored_result(0), scan_flag(0), mysql(0), stored_result(0), scan_flag(0),
ref_length(sizeof(MYSQL_ROW_OFFSET)), current_position(0) ref_length(sizeof(MYSQL_ROW_OFFSET)), current_position(0)
{} {
trx_next= 0;
}
/* /*
...@@ -1488,6 +1490,7 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) ...@@ -1488,6 +1490,7 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked)
with transactions with transactions
*/ */
mysql->reconnect= 1; mysql->reconnect= 1;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2633,3 +2636,151 @@ bool ha_federated::get_error_message(int error, String* buf) ...@@ -2633,3 +2636,151 @@ bool ha_federated::get_error_message(int error, String* buf)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
int ha_federated::external_lock(THD *thd, int lock_type)
{
int error= 0;
ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot];
DBUG_ENTER("ha_federated::external_lock");
if (lock_type != F_UNLCK)
{
DBUG_PRINT("info",("federated not lock F_UNLCK"));
if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
{
DBUG_PRINT("info",("federated autocommit"));
/*
This means we are doing an autocommit
*/
error= connection_autocommit(TRUE);
if (error)
{
DBUG_PRINT("info", ("error setting autocommit TRUE: %d", error));
DBUG_RETURN(error);
}
trans_register_ha(thd, FALSE, &federated_hton);
}
else
{
DBUG_PRINT("info",("not autocommit"));
if (!trx)
{
/*
This is where a transaction gets its start
*/
error= connection_autocommit(FALSE);
if (error)
{
DBUG_PRINT("info", ("error setting autocommit FALSE: %d", error));
DBUG_RETURN(error);
}
thd->ha_data[federated_hton.slot]= this;
trans_register_ha(thd, TRUE, &federated_hton);
/*
Send a lock table to the remote end.
We do not support this at the moment
*/
if (thd->options & (OPTION_TABLE_LOCK))
{
DBUG_PRINT("info", ("We do not support lock table yet"));
}
}
else
{
ha_federated *ptr;
for (ptr= trx; ptr; ptr= ptr->trx_next)
if (ptr == this)
break;
else if (!ptr->trx_next)
ptr->trx_next= this;
}
}
}
DBUG_RETURN(0);
}
static int federated_commit(THD *thd, bool all)
{
int return_val= 0;
ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot];
DBUG_ENTER("federated_commit");
if (all)
{
int error= 0;
ha_federated *ptr, *old= NULL;
for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
{
if (old)
old->trx_next= NULL;
error= ptr->connection_commit();
if (error && !return_val);
return_val= error;
}
thd->ha_data[federated_hton.slot]= NULL;
}
DBUG_PRINT("info", ("error val: %d", return_val));
DBUG_RETURN(return_val);
}
static int federated_rollback(THD *thd, bool all)
{
int return_val= 0;
ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot];
DBUG_ENTER("federated_rollback");
if (all)
{
int error= 0;
ha_federated *ptr, *old= NULL;
for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
{
if (old)
old->trx_next= NULL;
error= ptr->connection_rollback();
if (error && !return_val)
return_val= error;
}
thd->ha_data[federated_hton.slot]= NULL;
}
DBUG_PRINT("info", ("error val: %d", return_val));
DBUG_RETURN(return_val);
}
int ha_federated::connection_commit()
{
DBUG_ENTER("ha_federated::connection_commit");
DBUG_RETURN(execute_simple_query("COMMIT", 6));
}
int ha_federated::connection_rollback()
{
DBUG_ENTER("ha_federated::connection_rollback");
DBUG_RETURN(execute_simple_query("ROLLBACK", 8));
}
int ha_federated::connection_autocommit(bool state)
{
const char *text;
DBUG_ENTER("ha_federated::connection_autocommit");
text= (state == true) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0";
DBUG_RETURN(execute_simple_query(text, 16));
}
int ha_federated::execute_simple_query(const char *query, int len)
{
DBUG_ENTER("ha_federated::execute_simple_query");
if (mysql_real_query(mysql, query, len))
{
DBUG_RETURN(stash_remote_error());
}
DBUG_RETURN(0);
}
...@@ -174,11 +174,13 @@ private: ...@@ -174,11 +174,13 @@ private:
public: public:
ha_federated(TABLE *table_arg); ha_federated(TABLE *table_arg);
~ha_federated() ~ha_federated() {}
{
}
/* The name that will be used for display purposes */ /* The name that will be used for display purposes */
const char *table_type() const { return "FEDERATED"; } const char *table_type() const { return "FEDERATED"; }
/*
Next pointer used in transaction
*/
ha_federated *trx_next;
/* /*
The name of the index type that will be used for display The name of the index type that will be used for display
don't implement this method unless you really have indexes don't implement this method unless you really have indexes
...@@ -298,7 +300,14 @@ public: ...@@ -298,7 +300,14 @@ public:
THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
enum thr_lock_type lock_type); //required enum thr_lock_type lock_type); //required
virtual bool get_error_message(int error, String *buf); virtual bool get_error_message(int error, String *buf);
int external_lock(THD *thd, int lock_type);
int connection_commit();
int connection_rollback();
bool has_transactions() { return 1; }
int connection_autocommit(bool state);
int execute_simple_query(const char *query, int len);
}; };
bool federated_db_init(void); bool federated_db_init(void);
int federated_db_end(ha_panic_function type); int federated_db_end(ha_panic_function type);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment