Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
bc3e18cd
Commit
bc3e18cd
authored
Jun 30, 2007
by
istruewing@synthia.local
Browse files
Options
Browse Files
Download
Plain Diff
Merge synthia.local:/home/mydev/mysql-5.0-axmrg
into synthia.local:/home/mydev/mysql-5.1-axmrg
parents
e091ce28
c54bce13
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
549 additions
and
148 deletions
+549
-148
include/my_base.h
include/my_base.h
+6
-1
mysql-test/r/federated.result
mysql-test/r/federated.result
+39
-0
mysql-test/r/federated_innodb.result
mysql-test/r/federated_innodb.result
+34
-0
mysql-test/t/federated.test
mysql-test/t/federated.test
+53
-0
mysql-test/t/federated_innodb-slave.opt
mysql-test/t/federated_innodb-slave.opt
+1
-0
mysql-test/t/federated_innodb.test
mysql-test/t/federated_innodb.test
+34
-0
sql/sql_insert.cc
sql/sql_insert.cc
+8
-0
storage/federated/ha_federated.cc
storage/federated/ha_federated.cc
+359
-139
storage/federated/ha_federated.h
storage/federated/ha_federated.h
+15
-8
No files found.
include/my_base.h
View file @
bc3e18cd
...
...
@@ -180,7 +180,12 @@ enum ha_extra_function {
These flags are reset by the handler::extra(HA_EXTRA_RESET) call.
*/
HA_EXTRA_DELETE_CANNOT_BATCH
,
HA_EXTRA_UPDATE_CANNOT_BATCH
HA_EXTRA_UPDATE_CANNOT_BATCH
,
/*
Inform handler that an "INSERT...ON DUPLICATE KEY UPDATE" will be
executed. This condition is unset by HA_EXTRA_NO_IGNORE_DUP_KEY.
*/
HA_EXTRA_INSERT_WITH_UPDATE
};
/* The following is parameter to ha_panic() */
...
...
mysql-test/r/federated.result
View file @
bc3e18cd
...
...
@@ -1843,6 +1843,45 @@ C3A4C3B6C3BCC39F
D18DD184D184D0B5D0BAD182D0B8D0B2D0BDD183D18E
drop table federated.t1;
drop table federated.t1;
create table federated.t1 (a int primary key, b varchar(64))
DEFAULT CHARSET=utf8;
create table federated.t1 (a int primary key, b varchar(64))
ENGINE=FEDERATED
connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1'
DEFAULT CHARSET=utf8;
insert ignore into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
a b
1 Larry
2 Curly
truncate federated.t1;
replace into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
a b
1 Moe
2 Curly
update ignore federated.t1 set a=a+1;
select * from federated.t1;
a b
1 Moe
3 Curly
drop table federated.t1;
drop table federated.t1;
create table federated.t1 (a int primary key, b varchar(64))
DEFAULT CHARSET=utf8;
create table federated.t1 (a int primary key, b varchar(64))
ENGINE=FEDERATED
connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1'
DEFAULT CHARSET=utf8;
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe")
on duplicate key update a=a+100;
ERROR 23000: Can't write; duplicate key in table 't1'
select * from federated.t1;
a b
1 Larry
2 Curly
drop table federated.t1;
drop table federated.t1;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1;
...
...
mysql-test/r/federated_innodb.result
0 → 100644
View file @
bc3e18cd
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
stop slave;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
create table federated.t1 (a int primary key, b varchar(64))
engine=myisam;
create table federated.t1 (a int primary key, b varchar(64))
engine=federated
connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1';
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
ERROR 23000: Can't write; duplicate key in table 't1'
select * from federated.t1;
a b
1 Larry
2 Curly
truncate federated.t1;
alter table federated.t1 engine=innodb;
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
ERROR 23000: Can't write; duplicate key in table 't1'
select * from federated.t1;
a b
drop table federated.t1;
drop table federated.t1;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
mysql-test/t/federated.test
View file @
bc3e18cd
...
...
@@ -1630,4 +1630,57 @@ connection slave;
drop
table
federated
.
t1
;
#
# BUG#21019 Federated Engine does not support REPLACE/INSERT IGNORE/UPDATE IGNORE
#
connection
slave
;
create
table
federated
.
t1
(
a
int
primary
key
,
b
varchar
(
64
))
DEFAULT
CHARSET
=
utf8
;
connection
master
;
--
replace_result
$SLAVE_MYPORT
SLAVE_PORT
eval
create
table
federated
.
t1
(
a
int
primary
key
,
b
varchar
(
64
))
ENGINE
=
FEDERATED
connection
=
'mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1'
DEFAULT
CHARSET
=
utf8
;
insert
ignore
into
federated
.
t1
values
(
1
,
"Larry"
),
(
2
,
"Curly"
),
(
1
,
"Moe"
);
select
*
from
federated
.
t1
;
truncate
federated
.
t1
;
replace
into
federated
.
t1
values
(
1
,
"Larry"
),
(
2
,
"Curly"
),
(
1
,
"Moe"
);
select
*
from
federated
.
t1
;
update
ignore
federated
.
t1
set
a
=
a
+
1
;
select
*
from
federated
.
t1
;
drop
table
federated
.
t1
;
connection
slave
;
drop
table
federated
.
t1
;
#
# BUG#25511 Federated Insert failures.
#
# When the user performs a INSERT...ON DUPLICATE KEY UPDATE, we want
# it to fail if a duplicate key exists instead of ignoring it.
#
connection
slave
;
create
table
federated
.
t1
(
a
int
primary
key
,
b
varchar
(
64
))
DEFAULT
CHARSET
=
utf8
;
connection
master
;
--
replace_result
$SLAVE_MYPORT
SLAVE_PORT
eval
create
table
federated
.
t1
(
a
int
primary
key
,
b
varchar
(
64
))
ENGINE
=
FEDERATED
connection
=
'mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1'
DEFAULT
CHARSET
=
utf8
;
--
error
ER_DUP_KEY
insert
into
federated
.
t1
values
(
1
,
"Larry"
),
(
2
,
"Curly"
),
(
1
,
"Moe"
)
on
duplicate
key
update
a
=
a
+
100
;
select
*
from
federated
.
t1
;
drop
table
federated
.
t1
;
connection
slave
;
drop
table
federated
.
t1
;
source
include
/
federated_cleanup
.
inc
;
mysql-test/t/federated_innodb-slave.opt
0 → 100644
View file @
bc3e18cd
--innodb
mysql-test/t/federated_innodb.test
0 → 100644
View file @
bc3e18cd
source
include
/
federated
.
inc
;
source
include
/
have_innodb
.
inc
;
#
# Bug#25513 Federated transaction failures
#
connection
slave
;
create
table
federated
.
t1
(
a
int
primary
key
,
b
varchar
(
64
))
engine
=
myisam
;
connection
master
;
--
replace_result
$SLAVE_MYPORT
SLAVE_PORT
eval
create
table
federated
.
t1
(
a
int
primary
key
,
b
varchar
(
64
))
engine
=
federated
connection
=
'mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1'
;
--
error
ER_DUP_KEY
insert
into
federated
.
t1
values
(
1
,
"Larry"
),
(
2
,
"Curly"
),
(
1
,
"Moe"
);
select
*
from
federated
.
t1
;
connection
slave
;
truncate
federated
.
t1
;
alter
table
federated
.
t1
engine
=
innodb
;
connection
master
;
--
error
ER_DUP_KEY
insert
into
federated
.
t1
values
(
1
,
"Larry"
),
(
2
,
"Curly"
),
(
1
,
"Moe"
);
select
*
from
federated
.
t1
;
drop
table
federated
.
t1
;
connection
slave
;
drop
table
federated
.
t1
;
source
include
/
federated_cleanup
.
inc
;
sql/sql_insert.cc
View file @
bc3e18cd
...
...
@@ -695,6 +695,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
if
(
duplic
==
DUP_REPLACE
&&
(
!
table
->
triggers
||
!
table
->
triggers
->
has_delete_triggers
()))
table
->
file
->
extra
(
HA_EXTRA_WRITE_CAN_REPLACE
);
if
(
duplic
==
DUP_UPDATE
)
table
->
file
->
extra
(
HA_EXTRA_INSERT_WITH_UPDATE
);
/*
let's *try* to start bulk inserts. It won't necessary
start them as values_list.elements should be greater than
...
...
@@ -2538,6 +2540,8 @@ bool Delayed_insert::handle_inserts(void)
table
->
file
->
extra
(
HA_EXTRA_WRITE_CAN_REPLACE
);
using_opt_replace
=
1
;
}
if
(
info
.
handle_duplicates
==
DUP_UPDATE
)
table
->
file
->
extra
(
HA_EXTRA_INSERT_WITH_UPDATE
);
thd
.
clear_error
();
// reset error for binlog
if
(
write_record
(
&
thd
,
table
,
&
info
))
{
...
...
@@ -2882,6 +2886,8 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
if
(
info
.
handle_duplicates
==
DUP_REPLACE
&&
(
!
table
->
triggers
||
!
table
->
triggers
->
has_delete_triggers
()))
table
->
file
->
extra
(
HA_EXTRA_WRITE_CAN_REPLACE
);
if
(
info
.
handle_duplicates
==
DUP_UPDATE
)
table
->
file
->
extra
(
HA_EXTRA_INSERT_WITH_UPDATE
);
thd
->
no_trans_update
.
stmt
=
FALSE
;
thd
->
abort_on_warning
=
(
!
info
.
ignore
&&
(
thd
->
variables
.
sql_mode
&
...
...
@@ -3469,6 +3475,8 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
if
(
info
.
handle_duplicates
==
DUP_REPLACE
&&
(
!
table
->
triggers
||
!
table
->
triggers
->
has_delete_triggers
()))
table
->
file
->
extra
(
HA_EXTRA_WRITE_CAN_REPLACE
);
if
(
info
.
handle_duplicates
==
DUP_UPDATE
)
table
->
file
->
extra
(
HA_EXTRA_INSERT_WITH_UPDATE
);
if
(
!
thd
->
prelocked_mode
)
table
->
file
->
ha_start_bulk_insert
((
ha_rows
)
0
);
thd
->
no_trans_update
.
stmt
=
FALSE
;
...
...
storage/federated/ha_federated.cc
View file @
bc3e18cd
...
...
@@ -388,6 +388,11 @@
/* Variables for federated share methods */
static
HASH
federated_open_tables
;
// To track open tables
pthread_mutex_t
federated_mutex
;
// To init the hash
static
char
ident_quote_char
=
'`'
;
// Character for quoting
// identifiers
static
char
value_quote_char
=
'\''
;
// Character for quoting
// literals
static
const
int
bulk_padding
=
64
;
// bytes "overhead" in packet
/* Variables used when chopping off trailing characters */
static
const
uint
sizeof_trailing_comma
=
sizeof
(
", "
)
-
1
;
...
...
@@ -415,7 +420,7 @@ static handler *federated_create_handler(handlerton *hton,
/* Function we use in the creation of our hash to get key */
static
uchar
*
federated_get_key
(
FEDERATED_SHARE
*
share
,
size_t
*
length
,
my_bool
not_used
__attribute__
((
unused
)))
my_bool
not_used
__attribute__
((
unused
)))
{
*
length
=
share
->
share_key_length
;
return
(
uchar
*
)
share
->
share_key
;
...
...
@@ -477,6 +482,57 @@ int federated_done(void *p)
}
/**
@brief Append identifiers to the string.
@param[in,out] string The target string.
@param[in] name Identifier name
@param[in] length Length of identifier name in bytes
@param[in] quote_char Quote char to use for quoting identifier.
@return Operation Status
@retval FALSE OK
@retval TRUE There was an error appending to the string.
@note This function is based upon the append_identifier() function
in sql_show.cc except that quoting always occurs.
*/
static
bool
append_ident
(
String
*
string
,
const
char
*
name
,
uint
length
,
const
char
quote_char
)
{
bool
result
;
uint
clen
;
const
char
*
name_end
;
DBUG_ENTER
(
"append_ident"
);
if
(
quote_char
)
{
string
->
reserve
(
length
*
2
+
2
);
if
((
result
=
string
->
append
(
&
quote_char
,
1
,
system_charset_info
)))
goto
err
;
for
(
name_end
=
name
+
length
;
name
<
name_end
;
name
+=
clen
)
{
uchar
c
=
*
(
uchar
*
)
name
;
if
(
!
(
clen
=
my_mbcharlen
(
system_charset_info
,
c
)))
clen
=
1
;
if
(
clen
==
1
&&
c
==
(
uchar
)
quote_char
&&
(
result
=
string
->
append
(
&
quote_char
,
1
,
system_charset_info
)))
goto
err
;
if
((
result
=
string
->
append
(
name
,
clen
,
string
->
charset
())))
goto
err
;
}
result
=
string
->
append
(
&
quote_char
,
1
,
system_charset_info
);
}
else
result
=
string
->
append
(
name
,
length
,
system_charset_info
);
err:
DBUG_RETURN
(
result
);
}
/*
Check (in create) whether the tables exists, and that it can be connected to
...
...
@@ -495,7 +551,6 @@ int federated_done(void *p)
static
int
check_foreign_data_source
(
FEDERATED_SHARE
*
share
,
bool
table_create_flag
)
{
char
escaped_table_name
[
NAME_LEN
*
2
];
char
query_buffer
[
FEDERATED_QUERY_BUFFER_SIZE
];
char
error_buffer
[
FEDERATED_QUERY_BUFFER_SIZE
];
uint
error_code
;
...
...
@@ -536,7 +591,6 @@ static int check_foreign_data_source(FEDERATED_SHARE *share,
}
else
{
int
escaped_table_name_length
=
0
;
/*
Since we do not support transactions at this version, we can let the
client API silently reconnect. For future versions, we will need more
...
...
@@ -551,14 +605,10 @@ static int check_foreign_data_source(FEDERATED_SHARE *share,
the query will be: SELECT * FROM `tablename` WHERE 1=0
*/
query
.
append
(
STRING_WITH_LEN
(
"SELECT * FROM `"
));
escaped_table_name_length
=
escape_string_for_mysql
(
&
my_charset_bin
,
(
char
*
)
escaped_table_name
,
sizeof
(
escaped_table_name
),
share
->
table_name
,
share
->
table_name_length
);
query
.
append
(
escaped_table_name
,
escaped_table_name_length
);
query
.
append
(
STRING_WITH_LEN
(
"` WHERE 1=0"
));
query
.
append
(
STRING_WITH_LEN
(
"SELECT * FROM "
));
append_ident
(
&
query
,
share
->
table_name
,
share
->
table_name_length
,
ident_quote_char
);
query
.
append
(
STRING_WITH_LEN
(
" WHERE 1=0"
);
if
(
mysql_real_query
(
mysql
,
query
.
ptr
(),
query
.
length
()))
{
...
...
@@ -907,6 +957,7 @@ ha_federated::ha_federated(handlerton *hton,
mysql
(
0
),
stored_result
(
0
)
{
trx_next
=
0
;
bzero
(
&
bulk_insert
,
sizeof
(
bulk_insert
));
}
...
...
@@ -969,9 +1020,8 @@ uint ha_federated::convert_row_to_internal_format(uchar *record,
static
bool
emit_key_part_name
(
String
*
to
,
KEY_PART_INFO
*
part
)
{
DBUG_ENTER
(
"emit_key_part_name"
);
if
(
to
->
append
(
STRING_WITH_LEN
(
"`"
))
||
to
->
append
(
part
->
field
->
field_name
)
||
to
->
append
(
STRING_WITH_LEN
(
"`"
)))
if
(
append_ident
(
to
,
part
->
field
->
field_name
,
strlen
(
part
->
field
->
field_name
),
ident_quote_char
))
DBUG_RETURN
(
1
);
// Out of memory
DBUG_RETURN
(
0
);
}
...
...
@@ -1515,20 +1565,20 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
query
.
append
(
STRING_WITH_LEN
(
"SELECT "
));
for
(
field
=
table
->
field
;
*
field
;
field
++
)
{
query
.
append
(
STRING_WITH_LEN
(
"`"
));
query
.
append
((
*
field
)
->
field_name
);
query
.
append
(
STRING_WITH_LEN
(
"
`
, "
));
append_ident
(
&
query
,
(
*
field
)
->
field_name
,
strlen
((
*
field
)
->
field_name
),
ident_quote_char
);
query
.
append
(
STRING_WITH_LEN
(
", "
));
}
/* chops off trailing comma */
query
.
length
(
query
.
length
()
-
sizeof_trailing_comma
);
query
.
append
(
STRING_WITH_LEN
(
" FROM `"
));
query
.
append
(
tmp_share
.
table_name
,
tmp_share
.
table_name_length
);
query
.
append
(
STRING_WITH_LEN
(
"`"
));
DBUG_PRINT
(
"info"
,
(
"calling alloc_root"
)
);
append_ident
(
&
query
,
tmp_share
.
table_name
,
tmp_share
.
table_name_length
,
ident_quote_char
);
if
(
!
(
share
=
(
FEDERATED_SHARE
*
)
memdup_root
(
&
mem_root
,
(
char
*
)
&
tmp_share
,
sizeof
(
*
share
)))
||
!
(
share
->
select_query
=
(
char
*
)
strmake_root
(
&
mem_root
,
query
.
ptr
(),
query
.
length
())))
!
(
share
->
select_query
=
(
char
*
)
strmake_root
(
&
mem_root
,
query
.
ptr
(),
query
.
length
()
+
1
)))
goto
error
;
share
->
use_count
=
0
;
...
...
@@ -1669,6 +1719,8 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked)
table
->
s
->
reclength
);
DBUG_PRINT
(
"info"
,
(
"ref_length: %u"
,
ref_length
));
reset
();
DBUG_RETURN
(
0
);
}
...
...
@@ -1741,85 +1793,107 @@ static inline uint field_in_record_is_null(TABLE *table,
}
/*
write_row() inserts a row. No extra() hint is given currently if a bulk load
is happeneding. buf() is a byte array of data. You can use the field
information to extract the data from the native byte array type.
Example of this would be:
for (Field **field=table->field ; *field ; field++)
{
...
}
Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
/**
@brief Construct the INSERT statement.
@details This method will construct the INSERT statement and appends it to
the supplied query string buffer.
@return
@retval FALSE No error
@retval TRUE Failure
*/
int
ha_federated
::
write_row
(
uchar
*
buf
)
bool
ha_federated
::
append_stmt_insert
(
String
*
query
)
{
/*
I need a bool again, in 5.0, I used table->s->fields to accomplish this.
This worked as a flag that says there are fields with values or not.
In 5.1, this value doesn't work the same, and I end up with the code
truncating open parenthesis:
the statement "INSERT INTO t1 VALUES ()" ends up being first built
in two strings
"INSERT INTO t1 ("
and
" VALUES ("
If there are fields with values, they get appended, with commas, and
the last loop, a trailing comma is there
"INSERT INTO t1 ( col1, col2, colN, "
" VALUES ( 'val1', 'val2', 'valN', "
Then, if there are fields, it should decrement the string by ", " length.
char
insert_buffer
[
FEDERATED_QUERY_BUFFER_SIZE
];
Field
**
field
;
uint
tmp_length
;
"INSERT INTO t1 ( col1, col2, colN"
" VALUES ( 'val1', 'val2', 'valN'"
/* The main insert query string */
String
insert_string
(
insert_buffer
,
sizeof
(
insert_buffer
),
&
my_charset_bin
);
DBUG_ENTER
(
"ha_federated::append_stmt_insert"
);
Then it adds a close paren to both - if there are fields
insert_string
.
length
(
0
);
"INSERT INTO t1 ( col1, col2, colN)"
" VALUES ( 'val1', 'val2', 'valN')"
if
(
replace_duplicates
)
insert_string
.
append
(
STRING_WITH_LEN
(
"REPLACE INTO "
));
else
if
(
ignore_duplicates
&&
!
insert_dup_update
)
insert_string
.
append
(
STRING_WITH_LEN
(
"INSERT IGNORE INTO "
));
else
insert_string
.
append
(
STRING_WITH_LEN
(
"INSERT INTO "
));
append_ident
(
&
insert_string
,
share
->
table_name
,
share
->
table_name_length
,
ident_quote_char
);
insert_string
.
append
(
FEDERATED_OPENPAREN
);
tmp_length
=
insert_string
.
length
()
-
strlen
(
STRING_WITH_LEN
(
", "
));
Then appends both together
"INSERT INTO t1 ( col1, col2, colN) VALUES ( 'val1', 'val2', 'valN')"
/*
loop through the field pointer array, add any fields to both the values
list and the fields list that match the current query id
*/
for
(
field
=
table
->
field
;
*
field
;
field
++
)
{
if
(
bitmap_is_set
(
table
->
write_set
,
(
*
field
)
->
field_index
))
{
/* append the field name */
append_ident
(
&
insert_string
,
(
*
field
)
->
field_name
,
strlen
((
*
field
)
->
field_name
),
ident_quote_char
);
So... the problem, is if you have the original statement:
/* append commas between both fields and fieldnames */
/*
unfortunately, we can't use the logic if *(fields + 1) to
make the following appends conditional as we don't know if the
next field is in the write set
*/
insert_string
.
append
(
STRING_WITH_LEN
(
", "
));
}
}
"INSERT INTO t1 VALUES ()"
/*
remove trailing comma
*/
insert_string
.
length
(
insert_string
.
length
()
-
sizeof_trailing_comma
);
Which is legitimate, but if the code thinks there are fields
/*
if there were no fields, we don't want to add a closing paren
AND, we don't want to chop off the last char '('
insert will be "INSERT INTO t1 VALUES ();"
*/
if
(
insert_string
.
length
()
>
tmp_length
)
{
insert_string
.
append
(
STRING_WITH_LEN
(
") "
);
}
"INSERT INTO t1 ("
" VALUES ( "
insert_string
.
append
(
STRING_WITH_LEN
(
" VALUES "
));
If the field flag is set, but there are no commas, reduces the
string by strlen(", ")
DBUG_RETURN
(
query
->
append
(
insert_string
));
}
"INSERT INTO t1 "
" VALUES "
Then adds the close parenthesis
/*
write_row() inserts a row. No extra() hint is given currently if a bulk load
is happeneding. buf() is a byte array of data. You can use the field
information to extract the data from the native byte array type.
Example of this would be:
for (Field **field=table->field ; *field ; field++)
{
...
}
"INSERT INTO t1 )"
" VALUES )"
Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
*/
So, I have to use a bool as before, set in the loop where fields and commas
are appended to the string
*/
my_bool
commas_added
=
FALSE
;
char
insert_buffer
[
FEDERATED_QUERY_BUFFER_SIZE
];
int
ha_federated
::
write_row
(
uchar
*
buf
)
{
char
values_buffer
[
FEDERATED_QUERY_BUFFER_SIZE
];
char
insert_field_value_buffer
[
STRING_BUFFER_USUAL_SIZE
];
Field
**
field
;
uint
tmp_length
;
int
error
=
0
;
bool
use_bulk_insert
;
bool
auto_increment_update_required
=
(
table
->
next_number_field
!=
NULL
);
/* The main insert query string */
String
insert_string
(
insert_buffer
,
sizeof
(
insert_buffer
),
&
my_charset_bin
);
/* The string containing the values to be added to the insert */
String
values_string
(
values_buffer
,
sizeof
(
values_buffer
),
&
my_charset_bin
);
/* The actual value of the field, to be added to the values_string */
...
...
@@ -1830,7 +1904,6 @@ int ha_federated::write_row(uchar *buf)
DBUG_ENTER
(
"ha_federated::write_row"
);
values_string
.
length
(
0
);
insert_string
.
length
(
0
);
insert_field_value_string
.
length
(
0
);
ha_statistic_increment
(
&
SSV
::
ha_write_count
);
if
(
table
->
timestamp_field_type
&
TIMESTAMP_AUTO_SET_ON_INSERT
)
...
...
@@ -1838,14 +1911,19 @@ int ha_federated::write_row(uchar *buf)
/*
start both our field and field values strings
We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE"
Ignore duplicates is always true when insert_dup_update is true.
When replace_duplicates == TRUE, we can safely enable multi-row insert.
When performing multi-row insert, we only collect the columns values for
the row. The start of the statement is only created when the first
row is copied in to the bulk_insert string.
*/
insert_string
.
append
(
STRING_WITH_LEN
(
"INSERT INTO `"
));
insert_string
.
append
(
share
->
table_name
,
share
->
table_name_length
);
insert_string
.
append
(
'`'
);
insert_string
.
append
(
STRING_WITH_LEN
(
" ("
));
if
(
!
(
use_bulk_insert
=
bulk_insert
.
str
&&
(
!
insert_dup_update
||
replace_duplicates
)))
append_stmt_insert
(
&
values_string
);
values_string
.
append
(
STRING_WITH_LEN
(
" VALUES "
));
values_string
.
append
(
STRING_WITH_LEN
(
" ("
));
tmp_length
=
values_string
.
length
();
/*
loop through the field pointer array, add any fields to both the values
...
...
@@ -1855,7 +1933,6 @@ int ha_federated::write_row(uchar *buf)
{
if
(
bitmap_is_set
(
table
->
write_set
,
(
*
field
)
->
field_index
))
{
commas_added
=
TRUE
;
if
((
*
field
)
->
is_null
())
values_string
.
append
(
STRING_WITH_LEN
(
" NULL "
));
else
...
...
@@ -1863,15 +1940,13 @@ int ha_federated::write_row(uchar *buf)
bool
needs_quote
=
(
*
field
)
->
str_needs_quotes
();
(
*
field
)
->
val_str
(
&
insert_field_value_string
);
if
(
needs_quote
)
values_string
.
append
(
'\''
);
values_string
.
append
(
value_quote_char
);
insert_field_value_string
.
print
(
&
values_string
);
if
(
needs_quote
)
values_string
.
append
(
'\''
);
values_string
.
append
(
value_quote_char
);
insert_field_value_string
.
length
(
0
);
}
/* append the field name */
insert_string
.
append
((
*
field
)
->
field_name
);
/* append commas between both fields and fieldnames */
/*
...
...
@@ -1879,7 +1954,6 @@ int ha_federated::write_row(uchar *buf)
make the following appends conditional as we don't know if the
next field is in the write set
*/
insert_string
.
append
(
STRING_WITH_LEN
(
", "
));
values_string
.
append
(
STRING_WITH_LEN
(
", "
));
}
}
...
...
@@ -1890,26 +1964,53 @@ int ha_federated::write_row(uchar *buf)
AND, we don't want to chop off the last char '('
insert will be "INSERT INTO t1 VALUES ();"
*/
if
(
commas_added
)
if
(
values_string
.
length
()
>
tmp_length
)
{
insert_string
.
length
(
insert_string
.
length
()
-
sizeof_trailing_comma
);
/* chops off leading commas */
/* chops off trailing comma */
values_string
.
length
(
values_string
.
length
()
-
sizeof_trailing_comma
);
insert_string
.
append
(
STRING_WITH_LEN
(
") "
));
}
else
{
/* chops off trailing ) */
insert_string
.
length
(
insert_string
.
length
()
-
sizeof_trailing_closeparen
);
}
/* we always want to append this, even if there aren't any fields */
values_string
.
append
(
STRING_WITH_LEN
(
") "
));
/* add the values */
insert_string
.
append
(
values_string
);
if
(
use_bulk_insert
)
{
/*
Send the current bulk insert out if appending the current row would
cause the statement to overflow the packet size, otherwise set
auto_increment_update_required to FALSE as no query was executed.
*/
if
(
bulk_insert
.
length
+
values_string
.
length
()
+
bulk_padding
>
mysql
->
net
.
max_packet_size
&&
bulk_insert
.
length
)
{
error
=
mysql_real_query
(
mysql
,
bulk_insert
.
str
,
bulk_insert
.
length
);
bulk_insert
.
length
=
0
;
}
else
auto_increment_update_required
=
FALSE
;
if
(
bulk_insert
.
length
==
0
)
{
char
insert_buffer
[
FEDERATED_QUERY_BUFFER_SIZE
];
String
insert_string
(
insert_buffer
,
sizeof
(
insert_buffer
),
&
my_charset_bin
);
insert_string
.
length
(
0
);
append_stmt_insert
(
&
insert_string
);
dynstr_append_mem
(
&
bulk_insert
,
insert_string
.
ptr
(),
insert_string
.
length
());
}
else
dynstr_append_mem
(
&
bulk_insert
,
","
,
1
);
if
(
mysql_real_query
(
mysql
,
insert_string
.
ptr
(),
insert_string
.
length
()))
dynstr_append_mem
(
&
bulk_insert
,
values_string
.
ptr
(),
values_string
.
length
());
}
else
{
error
=
mysql_real_query
(
mysql
,
values_string
.
ptr
(),
values_string
.
length
());
}
if
(
error
)
{
DBUG_RETURN
(
stash_remote_error
());
}
...
...
@@ -1917,12 +2018,79 @@ int ha_federated::write_row(uchar *buf)
If the table we've just written a record to contains an auto_increment
field, then store the last_insert_id() value from the foreign server
*/
if
(
table
->
next_number_fiel
d
)
if
(
auto_increment_update_require
d
)
update_auto_increment
();
DBUG_RETURN
(
0
);
}
/**
@brief Prepares the storage engine for bulk inserts.
@param[in] rows estimated number of rows in bulk insert
or 0 if unknown.
@details Initializes memory structures required for bulk insert.
*/
void
ha_federated
::
start_bulk_insert
(
ha_rows
rows
)
{
uint
page_size
;
DBUG_ENTER
(
"ha_federated::start_bulk_insert"
);
dynstr_free
(
&
bulk_insert
);
/**
We don't bother with bulk-insert semantics when the estimated rows == 1
The rows value will be 0 if the server does not know how many rows
would be inserted. This can occur when performing INSERT...SELECT
*/
if
(
rows
==
1
)
DBUG_VOID_RETURN
;
page_size
=
(
uint
)
my_getpagesize
();
if
(
init_dynamic_string
(
&
bulk_insert
,
NULL
,
page_size
,
page_size
))
DBUG_VOID_RETURN
;
bulk_insert
.
length
=
0
;
DBUG_VOID_RETURN
;
}
/**
@brief End bulk insert.
@details This method will send any remaining rows to the remote server.
Finally, it will deinitialize the bulk insert data structure.
@return Operation status
@retval 0 No error
@retval != 0 Error occured at remote server. Also sets my_errno.
*/
int
ha_federated
::
end_bulk_insert
()
{
int
error
=
0
;
DBUG_ENTER
(
"ha_federated::end_bulk_insert"
);
if
(
bulk_insert
.
str
&&
bulk_insert
.
length
)
{
if
(
mysql_real_query
(
mysql
,
bulk_insert
.
str
,
bulk_insert
.
length
))
error
=
stash_remote_error
();
else
if
(
table
->
next_number_field
)
update_auto_increment
();
}
dynstr_free
(
&
bulk_insert
);
DBUG_RETURN
(
my_errno
=
error
);
}
/*
ha_federated::update_auto_increment
...
...
@@ -1952,9 +2120,9 @@ int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt)
query
.
length
(
0
);
query
.
set_charset
(
system_charset_info
);
query
.
append
(
STRING_WITH_LEN
(
"OPTIMIZE TABLE
`
"
));
query
.
append
(
share
->
table_name
,
share
->
table_name_length
);
query
.
append
(
STRING_WITH_LEN
(
"`"
)
);
query
.
append
(
STRING_WITH_LEN
(
"OPTIMIZE TABLE "
));
append_ident
(
&
query
,
share
->
table_name
,
share
->
table_name_length
,
ident_quote_char
);
if
(
mysql_real_query
(
mysql
,
query
.
ptr
(),
query
.
length
()))
{
...
...
@@ -1974,9 +2142,9 @@ int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt)
query
.
length
(
0
);
query
.
set_charset
(
system_charset_info
);
query
.
append
(
STRING_WITH_LEN
(
"REPAIR TABLE
`
"
));
query
.
append
(
share
->
table_name
,
share
->
table_name_length
);
query
.
append
(
STRING_WITH_LEN
(
"`"
)
);
query
.
append
(
STRING_WITH_LEN
(
"REPAIR TABLE "
));
append_ident
(
&
query
,
share
->
table_name
,
share
->
table_name_length
,
ident_quote_char
);
if
(
check_opt
->
flags
&
T_QUICK
)
query
.
append
(
STRING_WITH_LEN
(
" QUICK"
));
if
(
check_opt
->
flags
&
T_EXTEND
)
...
...
@@ -2053,9 +2221,13 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data)
update_string
.
length
(
0
);
where_string
.
length
(
0
);
update_string
.
append
(
STRING_WITH_LEN
(
"UPDATE `"
));
update_string
.
append
(
share
->
table_name
);
update_string
.
append
(
STRING_WITH_LEN
(
"` SET "
));
if
(
ignore_duplicates
)
update_string
.
append
(
STRING_WITH_LEN
(
"UPDATE IGNORE "
));
else
update_string
.
append
(
STRING_WITH_LEN
(
"UPDATE "
));
append_ident
(
&
update_string
,
share
->
table_name
,
share
->
table_name_length
,
ident_quote_char
);
update_string
.
append
(
STRING_WITH_LEN
(
" SET "
));
/*
In this loop, we want to match column names to values being inserted
...
...
@@ -2071,7 +2243,9 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data)
{
if
(
bitmap_is_set
(
table
->
write_set
,
(
*
field
)
->
field_index
))
{
update_string
.
append
((
*
field
)
->
field_name
);
uint
field_name_length
=
strlen
((
*
field
)
->
field_name
);
append_ident
(
&
update_string
,
(
*
field
)
->
field_name
,
field_name_length
,
ident_quote_char
);
update_string
.
append
(
STRING_WITH_LEN
(
" = "
));
if
((
*
field
)
->
is_null
())
...
...
@@ -2083,10 +2257,10 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data)
bool
needs_quote
=
(
*
field
)
->
str_needs_quotes
();
(
*
field
)
->
val_str
(
&
field_value
);
if
(
needs_quote
)
update_string
.
append
(
'\''
);
update_string
.
append
(
value_quote_char
);
field_value
.
print
(
&
update_string
);
if
(
needs_quote
)
update_string
.
append
(
'\''
);
update_string
.
append
(
value_quote_char
);
field_value
.
length
(
0
);
tmp_restore_column_map
(
table
->
read_set
,
old_map
);
}
...
...
@@ -2095,7 +2269,9 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data)
if
(
bitmap_is_set
(
table
->
read_set
,
(
*
field
)
->
field_index
))
{
where_string
.
append
((
*
field
)
->
field_name
);
uint
field_name_length
=
strlen
((
*
field
)
->
field_name
);
append_ident
(
&
where_string
,
(
*
field
)
->
field_name
,
field_name_length
,
ident_quote_char
);
if
(
field_in_record_is_null
(
table
,
*
field
,
(
char
*
)
old_data
))
where_string
.
append
(
STRING_WITH_LEN
(
" IS NULL "
));
else
...
...
@@ -2105,10 +2281,10 @@ int ha_federated::update_row(const uchar *old_data, uchar *new_data)
(
*
field
)
->
val_str
(
&
field_value
,
(
old_data
+
(
*
field
)
->
offset
(
record
)));
if
(
needs_quote
)
where_string
.
append
(
'\''
);
where_string
.
append
(
value_quote_char
);
field_value
.
print
(
&
where_string
);
if
(
needs_quote
)
where_string
.
append
(
'\''
);
where_string
.
append
(
value_quote_char
);
field_value
.
length
(
0
);
}
where_string
.
append
(
STRING_WITH_LEN
(
" AND "
));
...
...
@@ -2165,9 +2341,10 @@ int ha_federated::delete_row(const uchar *buf)
DBUG_ENTER
(
"ha_federated::delete_row"
);
delete_string
.
length
(
0
);
delete_string
.
append
(
STRING_WITH_LEN
(
"DELETE FROM `"
));
delete_string
.
append
(
share
->
table_name
);
delete_string
.
append
(
STRING_WITH_LEN
(
"` WHERE "
));
delete_string
.
append
(
STRING_WITH_LEN
(
"DELETE FROM "
));
append_ident
(
&
delete_string
,
share
->
table_name
,
share
->
table_name_length
,
ident_quote_char
);
delete_string
.
append
(
STRING_WITH_LEN
(
" WHERE "
));
for
(
Field
**
field
=
table
->
field
;
*
field
;
field
++
)
{
...
...
@@ -2175,8 +2352,9 @@ int ha_federated::delete_row(const uchar *buf)
found
++
;
if
(
bitmap_is_set
(
table
->
read_set
,
cur_field
->
field_index
))
{
append_ident
(
&
delete_string
,
(
*
field
)
->
field_name
,
strlen
((
*
field
)
->
field_name
),
ident_quote_char
);
data_string
.
length
(
0
);
delete_string
.
append
(
cur_field
->
field_name
);
if
(
cur_field
->
is_null
())
{
delete_string
.
append
(
STRING_WITH_LEN
(
" IS NULL "
));
...
...
@@ -2187,13 +2365,12 @@ int ha_federated::delete_row(const uchar *buf)
delete_string
.
append
(
STRING_WITH_LEN
(
" = "
));
cur_field
->
val_str
(
&
data_string
);
if
(
needs_quote
)
delete_string
.
append
(
'\''
);
delete_string
.
append
(
value_quote_char
);
data_string
.
print
(
&
delete_string
);
if
(
needs_quote
)
delete_string
.
append
(
'\''
);
delete_string
.
append
(
value_quote_char
);
}
delete_string
.
append
(
STRING_WITH_LEN
(
" AND "
));
}
}
// Remove trailing AND
...
...
@@ -2680,7 +2857,6 @@ int ha_federated::info(uint flag)
{
char
error_buffer
[
FEDERATED_QUERY_BUFFER_SIZE
];
char
status_buf
[
FEDERATED_QUERY_BUFFER_SIZE
];
char
escaped_table_name
[
FEDERATED_QUERY_BUFFER_SIZE
];
int
error
;
uint
error_code
;
MYSQL_RES
*
result
=
0
;
...
...
@@ -2693,13 +2869,9 @@ int ha_federated::info(uint flag)
if
(
flag
&
(
HA_STATUS_VARIABLE
|
HA_STATUS_CONST
))
{
status_query_string
.
length
(
0
);
status_query_string
.
append
(
STRING_WITH_LEN
(
"SHOW TABLE STATUS LIKE '"
));
escape_string_for_mysql
(
&
my_charset_bin
,
(
char
*
)
escaped_table_name
,
sizeof
(
escaped_table_name
),
share
->
table_name
,
share
->
table_name_length
);
status_query_string
.
append
(
escaped_table_name
);
status_query_string
.
append
(
STRING_WITH_LEN
(
"'"
));
status_query_string
.
append
(
STRING_WITH_LEN
(
"SHOW TABLE STATUS LIKE "
));
append_ident
(
&
status_query_string
,
share
->
table_name
,
share
->
table_name_length
,
value_quote_char
);
if
(
mysql_real_query
(
mysql
,
status_query_string
.
ptr
(),
status_query_string
.
length
()))
...
...
@@ -2770,6 +2942,51 @@ error:
}
/**
@brief Handles extra signals from MySQL server
@param[in] operation Hint for storage engine
@return Operation Status
@retval 0 OK
*/
int
ha_federated
::
extra
(
ha_extra_function
operation
)
{
DBUG_ENTER
(
"ha_federated::extra"
);
switch
(
operation
)
{
case
HA_EXTRA_IGNORE_DUP_KEY
:
ignore_duplicates
=
TRUE
;
break
;
case
HA_EXTRA_NO_IGNORE_DUP_KEY
:
insert_dup_update
=
FALSE
;
ignore_duplicates
=
FALSE
;
break
;
case
HA_EXTRA_WRITE_CAN_REPLACE
:
replace_duplicates
=
TRUE
;
break
;
case
HA_EXTRA_WRITE_CANNOT_REPLACE
:
/*
We use this flag to ensure that we do not create an "INSERT IGNORE"
statement when inserting new rows into the remote table.
*/
replace_duplicates
=
FALSE
;
break
;
case
HA_EXTRA_INSERT_WITH_UPDATE
:
insert_dup_update
=
TRUE
;
break
;
case
HA_EXTRA_RESET
:
insert_dup_update
=
FALSE
;
ignore_duplicates
=
FALSE
;
replace_duplicates
=
FALSE
;
break
;
default:
/* do nothing */
DBUG_PRINT
(
"info"
,(
"unhandled operation: %d"
,
(
uint
)
operation
));
}
DBUG_RETURN
(
0
);
}
/*
Used to delete all rows in a table. Both for cases of truncate and
for cases where the optimizer realizes that all rows will be
...
...
@@ -2791,9 +3008,9 @@ int ha_federated::delete_all_rows()
query
.
length
(
0
);
query
.
set_charset
(
system_charset_info
);
query
.
append
(
STRING_WITH_LEN
(
"TRUNCATE
`
"
));
query
.
append
(
share
->
table_name
);
query
.
append
(
STRING_WITH_LEN
(
"`"
)
);
query
.
append
(
STRING_WITH_LEN
(
"TRUNCATE "
));
append_ident
(
&
query
,
share
->
table_name
,
share
->
table_name_length
,
ident_quote_char
);
/*
TRUNCATE won't return anything in mysql_affected_rows
...
...
@@ -2901,6 +3118,9 @@ int ha_federated::stash_remote_error()
DBUG_ENTER
(
"ha_federated::stash_remote_error()"
);
remote_error_number
=
mysql_errno
(
mysql
);
strmake
(
remote_error_buf
,
mysql_error
(
mysql
),
sizeof
(
remote_error_buf
)
-
1
);
if
(
remote_error_number
==
ER_DUP_ENTRY
||
remote_error_number
==
ER_DUP_KEY
)
DBUG_RETURN
(
HA_ERR_FOUND_DUPP_KEY
);
DBUG_RETURN
(
HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM
);
}
...
...
storage/federated/ha_federated.h
View file @
bc3e18cd
...
...
@@ -88,6 +88,9 @@ class ha_federated: public handler
MYSQL_ROW_OFFSET
current_position
;
// Current position used by ::position()
int
remote_error_number
;
char
remote_error_buf
[
FEDERATED_QUERY_BUFFER_SIZE
];
bool
ignore_duplicates
,
replace_duplicates
;
bool
insert_dup_update
;
DYNAMIC_STRING
bulk_insert
;
private:
/*
...
...
@@ -102,6 +105,14 @@ private:
bool
records_in_range
,
bool
eq_range
);
int
stash_remote_error
();
bool
append_stmt_insert
(
String
*
query
);
int
read_next
(
byte
*
buf
,
MYSQL_RES
*
result
);
int
index_read_idx_with_result_set
(
byte
*
buf
,
uint
index
,
const
byte
*
key
,
uint
key_len
,
ha_rkey_function
find_flag
,
MYSQL_RES
**
result
);
public:
ha_federated
(
handlerton
*
hton
,
TABLE_SHARE
*
table_arg
);
~
ha_federated
()
{}
...
...
@@ -189,6 +200,8 @@ public:
int
open
(
const
char
*
name
,
int
mode
,
uint
test_if_locked
);
// required
int
close
(
void
);
// required
void
start_bulk_insert
(
ha_rows
rows
);
int
end_bulk_insert
();
int
write_row
(
uchar
*
buf
);
int
update_row
(
const
uchar
*
old_data
,
uchar
*
new_data
);
int
delete_row
(
const
uchar
*
buf
);
...
...
@@ -217,6 +230,7 @@ public:
int
rnd_pos
(
uchar
*
buf
,
uchar
*
pos
);
//required
void
position
(
const
uchar
*
record
);
//required
int
info
(
uint
);
//required
int
extra
(
ha_extra_function
operation
);
void
update_auto_increment
(
void
);
int
repair
(
THD
*
thd
,
HA_CHECK_OPT
*
check_opt
);
...
...
@@ -231,18 +245,11 @@ public:
THR_LOCK_DATA
**
store_lock
(
THD
*
thd
,
THR_LOCK_DATA
**
to
,
enum
thr_lock_type
lock_type
);
//required
virtual
bool
get_error_message
(
int
error
,
String
*
buf
);
bool
get_error_message
(
int
error
,
String
*
buf
);
int
external_lock
(
THD
*
thd
,
int
lock_type
);
int
connection_commit
();
int
connection_rollback
();
int
connection_autocommit
(
bool
state
);
int
execute_simple_query
(
const
char
*
query
,
int
len
);
int
read_next
(
uchar
*
buf
,
MYSQL_RES
*
result
);
int
index_read_idx_with_result_set
(
uchar
*
buf
,
uint
index
,
const
uchar
*
key
,
uint
key_len
,
ha_rkey_function
find_flag
,
MYSQL_RES
**
result
);
};
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment