Commit 4e46f2db authored by unknown's avatar unknown

I had forgotten to delete an already disabled line of C++ code.

---
BUG#20265 (Replication of CREATE-SELECT does not work correctly):
Fixing bug by making binary log handle statement transactions.
The binary log transaction cache can now be truncated to remove 
events inserted during this statement or transaction. Also, the
binary log participate in XA transaction handling, although not
as a full 2pc resource.


mysql-test/r/binlog_row_mix_innodb_myisam.result:
  Result change
sql/log.cc:
  Making change to binlog_end_trans() to support that it can be called
  for statement transactions as well.
sql/sql_class.h:
  Adding function THD::binlog_start_trans_and_stmt() to start a real transaction
  (if necessary) and also a statement transaction.
sql/sql_insert.cc:
  I had forgotten to delete this line (it was already disabled using //;
  this line was not needed because we do the empty() every time
  we write to the binlog (in MYSQL_LOG::write());
  t/binlog_stm_binlog.test already tests that the empty() indeed happens
  for INSERT DELAYED.
  ---
  Changes to use the statement transactions that the binary log now can handle.
parent 9cb377f9
...@@ -359,15 +359,6 @@ show binlog events from 102; ...@@ -359,15 +359,6 @@ show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Query 1 # use `test`; BEGIN
master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` (
`a` int(11) NOT NULL DEFAULT '0',
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB
master-bin.000001 # Table_map 1 # table_id: # (test.t2)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Xid 1 # COMMIT /* xid= */
master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2 master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
...@@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2 ...@@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2
master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Query 1 # use `test`; BEGIN
master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` (
`a` int(11) NOT NULL DEFAULT '0',
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB
master-bin.000001 # Table_map 1 # table_id: # (test.t2)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Xid 1 # COMMIT /* xid= */
master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2 master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2
master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Xid 1 # COMMIT /* xid= */
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
......
This diff is collapsed.
...@@ -929,6 +929,7 @@ public: ...@@ -929,6 +929,7 @@ public:
/* /*
Public interface to write RBR events to the binlog Public interface to write RBR events to the binlog
*/ */
void binlog_start_trans_and_stmt();
int binlog_write_table_map(TABLE *table, bool is_transactional); int binlog_write_table_map(TABLE *table, bool is_transactional);
int binlog_write_row(TABLE* table, bool is_transactional, int binlog_write_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, my_size_t colcnt, MY_BITMAP const* cols, my_size_t colcnt,
......
...@@ -1234,7 +1234,7 @@ err: ...@@ -1234,7 +1234,7 @@ err:
if (thd->lex->current_select) if (thd->lex->current_select)
thd->lex->current_select->no_error= 0; // Give error thd->lex->current_select->no_error= 0; // Give error
table->file->print_error(error,MYF(0)); table->file->print_error(error,MYF(0));
before_trg_err: before_trg_err:
table->file->restore_auto_increment(prev_insert_id); table->file->restore_auto_increment(prev_insert_id);
if (key) if (key)
...@@ -1982,6 +1982,10 @@ err: ...@@ -1982,6 +1982,10 @@ err:
rolled back. We only need to roll back a potential statement rolled back. We only need to roll back a potential statement
transaction, since real transactions are rolled back in transaction, since real transactions are rolled back in
close_thread_tables(). close_thread_tables().
TODO: This is not true any more, table maps are generated on the
first call to ha_*_row() instead. Remove code that are used to
cover for the case outlined above.
*/ */
ha_rollback_stmt(thd); ha_rollback_stmt(thd);
...@@ -2086,8 +2090,6 @@ bool delayed_insert::handle_inserts(void) ...@@ -2086,8 +2090,6 @@ bool delayed_insert::handle_inserts(void)
thd.start_time=row->start_time; thd.start_time=row->start_time;
thd.query_start_used=row->query_start_used; thd.query_start_used=row->query_start_used;
/* for the binlog, forget auto_increment ids generated by previous rows */
// thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
thd.first_successful_insert_id_in_prev_stmt= thd.first_successful_insert_id_in_prev_stmt=
row->first_successful_insert_id_in_prev_stmt; row->first_successful_insert_id_in_prev_stmt;
thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt=
...@@ -2317,6 +2319,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2317,6 +2319,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
DBUG_ENTER("select_insert::prepare"); DBUG_ENTER("select_insert::prepare");
unit= u; unit= u;
/* /*
Since table in which we are going to insert is added to the first Since table in which we are going to insert is added to the first
select, LEX::current_select should point to the first select while select, LEX::current_select should point to the first select while
...@@ -2547,56 +2550,54 @@ void select_insert::send_error(uint errcode,const char *err) ...@@ -2547,56 +2550,54 @@ void select_insert::send_error(uint errcode,const char *err)
if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error) if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
my_message(errcode, err, MYF(0)); my_message(errcode, err, MYF(0));
if (!table) /*
If the creation of the table failed (due to a syntax error, for
example), no table will have been opened and therefore 'table'
will be NULL. In that case, we still need to execute the rollback
and the end of the function to truncate the binary log, but we can
skip all the intermediate steps.
*/
if (table)
{ {
/* /*
This can only happen when using CREATE ... SELECT and the table was not If we are not in prelocked mode, we end the bulk insert started
created becasue of an syntax error before.
*/ */
DBUG_VOID_RETURN; if (!thd->prelocked_mode)
} table->file->ha_end_bulk_insert();
if (!thd->prelocked_mode)
table->file->ha_end_bulk_insert(); /*
/* If at least one row has been inserted/modified and will stay in
If at least one row has been inserted/modified and will stay in the table the table (the table doesn't have transactions) we must write to
(the table doesn't have transactions) we must write to the binlog (and the binlog (and the error code will make the slave stop).
the error code will make the slave stop).
For many errors (example: we got a duplicate key error while
For many errors (example: we got a duplicate key error while inserting into a MyISAM table), no row will be added to the table,
inserting into a MyISAM table), no row will be added to the table, so passing the error to the slave will not help since there will
so passing the error to the slave will not help since there will be an error code mismatch (the inserts will succeed on the slave
be an error code mismatch (the inserts will succeed on the slave with no error).
with no error).
If table creation failed, the number of rows modified will also be
If we are using row-based replication we have two cases where this zero, so no check for that is made.
code is executed: replication of CREATE-SELECT and replication of */
INSERT-SELECT. if (info.copied || info.deleted || info.updated)
When replicating a CREATE-SELECT statement, we shall not write the
events to the binary log and should thus not set
OPTION_STATUS_NO_TRANS_UPDATE.
When replicating INSERT-SELECT, we shall not write the events to
the binary log for transactional table, but shall write all events
if there is one or more writes to non-transactional tables. In
this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a
write to a non-transactional table, otherwise it is cleared.
*/
if (info.copied || info.deleted || info.updated)
{
if (!table->file->has_transactions())
{ {
if (mysql_bin_log.is_open()) DBUG_ASSERT(table != NULL);
if (!table->file->has_transactions())
{ {
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, if (mysql_bin_log.is_open())
table->file->has_transactions(), FALSE); {
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
table->file->has_transactions(), FALSE);
}
if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
!can_rollback_data())
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
query_cache_invalidate3(thd, table, 1);
} }
if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
!can_rollback_data())
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
query_cache_invalidate3(thd, table, 1);
} }
} }
ha_rollback_stmt(thd); ha_rollback_stmt(thd);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -2605,8 +2606,11 @@ void select_insert::send_error(uint errcode,const char *err) ...@@ -2605,8 +2606,11 @@ void select_insert::send_error(uint errcode,const char *err)
bool select_insert::send_eof() bool select_insert::send_eof()
{ {
int error,error2; int error,error2;
bool const trans_table= table->file->has_transactions();
ulonglong id; ulonglong id;
DBUG_ENTER("select_insert::send_eof"); DBUG_ENTER("select_insert::send_eof");
DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
trans_table, table->file->table_type()));
error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0; error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
...@@ -2626,9 +2630,8 @@ bool select_insert::send_eof() ...@@ -2626,9 +2630,8 @@ bool select_insert::send_eof()
are not logged in RBR) are not logged in RBR)
- We are using statement based replication - We are using statement based replication
*/ */
if (!table->file->has_transactions() && if (!trans_table &&
(!table->s->tmp_table || (!table->s->tmp_table || !thd->current_stmt_binlog_row_based))
!thd->current_stmt_binlog_row_based))
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
} }
...@@ -2644,10 +2647,21 @@ bool select_insert::send_eof() ...@@ -2644,10 +2647,21 @@ bool select_insert::send_eof()
thd->clear_error(); thd->clear_error();
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query, thd->query_length, thd->query, thd->query_length,
table->file->has_transactions(), FALSE); trans_table, FALSE);
} }
if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error) /*
error=error2; We will call ha_autocommit_or_rollback() also for
non-transactional tables under row-based replication: there might
be events in the binary logs transaction, and we need to write
them to the binary log.
*/
if (trans_table || thd->current_stmt_binlog_row_based)
{
int const error2= ha_autocommit_or_rollback(thd, error);
if (error2 && !error)
error=error2;
}
if (error) if (error)
{ {
table->file->print_error(error,MYF(0)); table->file->print_error(error,MYF(0));
...@@ -2843,14 +2857,19 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2843,14 +2857,19 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
class MY_HOOKS : public TABLEOP_HOOKS { class MY_HOOKS : public TABLEOP_HOOKS {
public: public:
MY_HOOKS(select_create *x) : ptr(x) { } MY_HOOKS(select_create *x) : ptr(x) { }
private:
virtual void do_prelock(TABLE **tables, uint count) virtual void do_prelock(TABLE **tables, uint count)
{ {
if (ptr->get_thd()->current_stmt_binlog_row_based && TABLE const *const table = *tables;
!(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE)) if (ptr->get_thd()->current_stmt_binlog_row_based &&
ptr->binlog_show_create_table(tables, count); table->s->tmp_table == NO_TMP_TABLE &&
!ptr->get_create_info()->table_existed)
{
ptr->binlog_show_create_table(tables, count);
}
} }
private:
select_create *ptr; select_create *ptr;
}; };
...@@ -2859,6 +2878,20 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2859,6 +2878,20 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
#endif #endif
unit= u; unit= u;
#ifdef HAVE_ROW_BASED_REPLICATION
/*
Start a statement transaction before the create if we are creating
a non-temporary table and are using row-based replication for the
statement.
*/
if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
thd->current_stmt_binlog_row_based)
{
thd->binlog_start_trans_and_stmt();
}
#endif
if (!(table= create_table_from_items(thd, create_info, create_table, if (!(table= create_table_from_items(thd, create_info, create_table,
extra_fields, keys, &values, extra_fields, keys, &values,
&thd->extra_lock, hook_ptr))) &thd->extra_lock, hook_ptr)))
...@@ -3006,7 +3039,16 @@ void select_create::abort() ...@@ -3006,7 +3039,16 @@ void select_create::abort()
table->s->version= 0; table->s->version= 0;
hash_delete(&open_cache,(byte*) table); hash_delete(&open_cache,(byte*) table);
if (!create_info->table_existed) if (!create_info->table_existed)
{
quick_rm_table(table_type, create_table->db, create_table->table_name); quick_rm_table(table_type, create_table->db, create_table->table_name);
/*
We roll back the statement, including truncating the
transaction cache of the binary log, if the statement
failed.
*/
if (thd->current_stmt_binlog_row_based)
ha_rollback_stmt(thd);
}
/* Tell threads waiting for refresh that something has happened */ /* Tell threads waiting for refresh that something has happened */
if (version != refresh_version) if (version != refresh_version)
broadcast_refresh(); broadcast_refresh();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment