diff --git a/innobase/include/trx0trx.h b/innobase/include/trx0trx.h
index 9db69261468a6e0854ae412886487f4416ed5777..64e9c87fbbd063f46bd6838304a0682a228b7290 100644
--- a/innobase/include/trx0trx.h
+++ b/innobase/include/trx0trx.h
@@ -390,8 +390,9 @@ struct trx_struct{
 	dulint		table_id;	/* table id if the preceding field is
 					TRUE */
 	/*------------------------------*/
-	int		active_trans;	/* whether a transaction in MySQL
-					is active */
+	int		active_trans;	/* 1 - if a transaction in MySQL
+					is active. 2 - if prepare_commit_mutex
+                                        was taken */
 	void*           mysql_thd;      /* MySQL thread handle corresponding
 					to this trx, or NULL */
 	char**		mysql_query_str;/* pointer to the field in mysqld_thd
diff --git a/mysql-test/r/flush.result b/mysql-test/r/flush.result
index bab9b54330723cb77e45056510ec938e36abdb42..306376b13c3d3c1d969192389e72e2332e10b632 100644
--- a/mysql-test/r/flush.result
+++ b/mysql-test/r/flush.result
@@ -8,7 +8,7 @@ n
 3
 flush tables with read lock;
 drop table t2;
-ERROR HY000: Table 't2' was locked with a READ lock and can't be updated
+ERROR HY000: Can't execute the query because you have a conflicting read lock
  drop table t2;
 unlock tables;
 create database mysqltest;
diff --git a/mysql-test/t/flush.test b/mysql-test/t/flush.test
index 9ee6b5d76b898f28cb74fb37ee8252580ce5a041..62af9d4932bbaed01de3ff56b5bad1a040cbf978 100644
--- a/mysql-test/t/flush.test
+++ b/mysql-test/t/flush.test
@@ -37,7 +37,7 @@ connection con1;
 select * from t1;
 connection con2;
 flush tables with read lock;
---error 1099;
+--error 1223
 drop table t2;
 connection con1;
 send drop table t2;
diff --git a/sql/ha_innodb.cc b/sql/ha_innodb.cc
index 870ad1af52b91295ce2efcc579879f9cba50bc3d..0ccac274d584d0506c0139361a88e5fa9db9c54c 100644
--- a/sql/ha_innodb.cc
+++ b/sql/ha_innodb.cc
@@ -45,7 +45,8 @@ have disables the InnoDB inlining in this file. */
 
 #include "ha_innodb.h"
 
-pthread_mutex_t innobase_mutex;
+pthread_mutex_t innobase_share_mutex, // to protect innobase_open_files
+                prepare_commit_mutex; // to force correct commit order in binlog
 bool innodb_inited= 0;
 
 /* Store MySQL definition of 'byte': in Linux it is char while InnoDB
@@ -1268,7 +1269,8 @@ innobase_init(void)
 
 	(void) hash_init(&innobase_open_tables,system_charset_info, 32, 0, 0,
 			 		(hash_get_key) innobase_get_key, 0, 0);
-	pthread_mutex_init(&innobase_mutex, MY_MUTEX_INIT_FAST);
+        pthread_mutex_init(&innobase_share_mutex, MY_MUTEX_INIT_FAST);
+        pthread_mutex_init(&prepare_commit_mutex, MY_MUTEX_INIT_FAST);
 	innodb_inited= 1;
 
 	/* If this is a replication slave and we needed to do a crash recovery,
@@ -1322,7 +1324,8 @@ innobase_end(void)
 	  	hash_free(&innobase_open_tables);
 	  	my_free(internal_innobase_data_file_path,
 						MYF(MY_ALLOW_ZERO_PTR));
-	  	pthread_mutex_destroy(&innobase_mutex);
+                pthread_mutex_destroy(&innobase_share_mutex);
+                pthread_mutex_destroy(&prepare_commit_mutex);
 	}
 
   	DBUG_RETURN(err);
@@ -1480,9 +1483,20 @@ innobase_commit(
  		/* We were instructed to commit the whole transaction, or
 		this is an SQL statement end and autocommit is on */
 
+                /* We need current binlog position for HotBackup to work.
+                Note, the position is current because of prepare_commit_mutex */
+                trx->mysql_log_file_name = mysql_bin_log.get_log_fname();
+                trx->mysql_log_offset =
+                        (ib_longlong)mysql_bin_log.get_log_file()->pos_in_file;
+
 		innobase_commit_low(trx);
 
+                if (trx->active_trans == 2) {
+
+                        pthread_mutex_unlock(&prepare_commit_mutex);
+                }
                 trx->active_trans = 0;
+
 	} else {
 	        /* We just mark the SQL statement ended and do not do a
 		transaction commit */
@@ -5953,7 +5967,7 @@ static mysql_byte* innobase_get_key(INNOBASE_SHARE *share,uint *length,
 static INNOBASE_SHARE *get_share(const char *table_name)
 {
   INNOBASE_SHARE *share;
-  pthread_mutex_lock(&innobase_mutex);
+  pthread_mutex_lock(&innobase_share_mutex);
   uint length=(uint) strlen(table_name);
   if (!(share=(INNOBASE_SHARE*) hash_search(&innobase_open_tables,
 					(mysql_byte*) table_name,
@@ -5967,7 +5981,7 @@ static INNOBASE_SHARE *get_share(const char *table_name)
       strmov(share->table_name,table_name);
       if (my_hash_insert(&innobase_open_tables, (mysql_byte*) share))
       {
-	pthread_mutex_unlock(&innobase_mutex);
+        pthread_mutex_unlock(&innobase_share_mutex);
 	my_free((gptr) share,0);
 	return 0;
       }
@@ -5976,13 +5990,13 @@ static INNOBASE_SHARE *get_share(const char *table_name)
     }
   }
   share->use_count++;
-  pthread_mutex_unlock(&innobase_mutex);
+  pthread_mutex_unlock(&innobase_share_mutex);
   return share;
 }
 
 static void free_share(INNOBASE_SHARE *share)
 {
-  pthread_mutex_lock(&innobase_mutex);
+  pthread_mutex_lock(&innobase_share_mutex);
   if (!--share->use_count)
   {
     hash_delete(&innobase_open_tables, (mysql_byte*) share);
@@ -5990,7 +6004,7 @@ static void free_share(INNOBASE_SHARE *share)
     pthread_mutex_destroy(&share->mutex);
     my_free((gptr) share, MYF(0));
   }
-  pthread_mutex_unlock(&innobase_mutex);
+  pthread_mutex_unlock(&innobase_share_mutex);
 }
 
 /*********************************************************************
@@ -6454,15 +6468,19 @@ innobase_xa_prepare(
 			FALSE - the current SQL statement ended */
 {
 	int error = 0;
-        trx_t* trx;
+        trx_t* trx = check_trx_exists(thd);
+
+        if (thd->lex->sql_command != SQLCOM_XA_PREPARE) {
+
+                pthread_mutex_lock(&prepare_commit_mutex);
+                trx->active_trans = 2;
+        }
 
 	if (!thd->variables.innodb_support_xa) {
 
 		return(0);
 	}
 
-        trx = check_trx_exists(thd);
-
         trx->xid=thd->transaction.xid;
 
 	/* Release a possible FIFO ticket and search latch. Since we will
diff --git a/sql/handler.cc b/sql/handler.cc
index f33f987ef77df968e6866f32f7a3ff0b19cb0e76..c7bd65bf24c70164a74d131909a27dcdf1abc287 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -526,7 +526,6 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg)
 
 /*
   RETURN
-     -1  - cannot prepare
       0  - ok
       1  - error, transaction was rolled back
 */
@@ -539,8 +538,6 @@ int ha_prepare(THD *thd)
 #ifdef USING_TRANSACTIONS
   if (trans->nht)
   {
-    if (trans->no_2pc)
-      DBUG_RETURN(-1);
     for (; *ht; ht++)
     {
       int err;
@@ -762,6 +759,14 @@ static char* xid_to_str(char *buf, XID *xid)
   for (i=0; i < xid->gtrid_length+xid->bqual_length; i++)
   {
     uchar c=(uchar)xid->data[i];
+    bool is_next_dig;
+    if (i < XIDDATASIZE)
+    {
+      char ch=xid->data[i+1];
+      is_next_dig=(c >= '0' && c <='9');
+    }
+    else
+      is_next_dig=FALSE;
     if (i == xid->gtrid_length)
     {
       *s++='\'';
@@ -774,9 +779,11 @@ static char* xid_to_str(char *buf, XID *xid)
     if (c < 32 || c > 126)
     {
       *s++='\\';
-      *s++='x';
-      *s++=_dig_vec_lower[c >> 4];
-      *s++=_dig_vec_lower[c & 15];
+      if (c > 077 || is_next_dig)
+        *s++=_dig_vec_lower[c >> 6];
+      if (c > 007 || is_next_dig)
+        *s++=_dig_vec_lower[(c >> 3) & 7];
+      *s++=_dig_vec_lower[c & 7];
     }
     else
     {
@@ -862,6 +869,10 @@ int ha_recover(HASH *commit_list)
         my_xid x=list[i].get_my_xid();
         if (!x) // not "mine" - that is generated by external TM
         {
+#ifndef DBUG_OFF
+          char buf[XIDDATASIZE*4+6]; // see xid_to_str
+          sql_print_information("ignore xid %s", xid_to_str(buf, list+i));
+#endif
           found_foreign_xids++;
           continue;
         }
@@ -962,9 +973,9 @@ bool mysql_xa_recover(THD *thd)
         if (xid->get_my_xid())
           continue; // skip "our" xids
         protocol->prepare_for_resend();
-        protocol->store_long((longlong)xid->formatID);
-        protocol->store_long((longlong)xid->gtrid_length);
-        protocol->store_long((longlong)xid->bqual_length);
+        protocol->store_longlong((longlong)xid->formatID, FALSE);
+        protocol->store_longlong((longlong)xid->gtrid_length, FALSE);
+        protocol->store_longlong((longlong)xid->bqual_length, FALSE);
         protocol->store(xid->data, xid->gtrid_length+xid->bqual_length,
                         &my_charset_bin);
         if (protocol->write())
diff --git a/sql/lock.cc b/sql/lock.cc
index 35b93c79fee938011c34d619ffbd8703b2725579..b4a51abba708eac1dd2be9e2158ff8d7e456e419 100644
--- a/sql/lock.cc
+++ b/sql/lock.cc
@@ -800,8 +800,8 @@ bool lock_global_read_lock(THD *thd)
 
   if (!thd->global_read_lock)
   {
-    (void) pthread_mutex_lock(&LOCK_open);
-    const char *old_message=thd->enter_cond(&COND_refresh, &LOCK_open,
+    (void) pthread_mutex_lock(&LOCK_global_read_lock);
+    const char *old_message=thd->enter_cond(&COND_refresh, &LOCK_global_read_lock,
 					    "Waiting to get readlock");
     DBUG_PRINT("info",
 	       ("waiting_for: %d  protect_against: %d",
@@ -809,7 +809,7 @@ bool lock_global_read_lock(THD *thd)
 
     waiting_for_read_lock++;
     while (protect_against_global_read_lock && !thd->killed)
-      pthread_cond_wait(&COND_refresh, &LOCK_open);
+      pthread_cond_wait(&COND_refresh, &LOCK_global_read_lock);
     waiting_for_read_lock--;
     if (thd->killed)
     {
@@ -834,11 +834,11 @@ bool lock_global_read_lock(THD *thd)
 void unlock_global_read_lock(THD *thd)
 {
   uint tmp;
-  pthread_mutex_lock(&LOCK_open);
+  pthread_mutex_lock(&LOCK_global_read_lock);
   tmp= --global_read_lock;
   if (thd->global_read_lock == MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT)
     --global_read_lock_blocks_commit;
-  pthread_mutex_unlock(&LOCK_open);
+  pthread_mutex_unlock(&LOCK_global_read_lock);
   /* Send the signal outside the mutex to avoid a context switch */
   if (!tmp)
     pthread_cond_broadcast(&COND_refresh);
@@ -857,7 +857,7 @@ bool wait_if_global_read_lock(THD *thd, bool abort_on_refresh,
   DBUG_ENTER("wait_if_global_read_lock");
 
   LINT_INIT(old_message);
-  (void) pthread_mutex_lock(&LOCK_open);
+  (void) pthread_mutex_lock(&LOCK_global_read_lock);
   if ((need_exit_cond= must_wait))
   {
     if (thd->global_read_lock)		// This thread had the read locks
@@ -865,7 +865,7 @@ bool wait_if_global_read_lock(THD *thd, bool abort_on_refresh,
       if (is_not_commit)
         my_message(ER_CANT_UPDATE_WITH_READLOCK,
                    ER(ER_CANT_UPDATE_WITH_READLOCK), MYF(0));
-      (void) pthread_mutex_unlock(&LOCK_open);
+      (void) pthread_mutex_unlock(&LOCK_global_read_lock);
       /*
         We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
         This allowance is needed to not break existing versions of innobackup
@@ -873,11 +873,11 @@ bool wait_if_global_read_lock(THD *thd, bool abort_on_refresh,
       */
       DBUG_RETURN(is_not_commit);
     }
-    old_message=thd->enter_cond(&COND_refresh, &LOCK_open,
+    old_message=thd->enter_cond(&COND_refresh, &LOCK_global_read_lock,
 				"Waiting for release of readlock");
     while (must_wait && ! thd->killed &&
 	   (!abort_on_refresh || thd->version == refresh_version))
-      (void) pthread_cond_wait(&COND_refresh,&LOCK_open);
+      (void) pthread_cond_wait(&COND_refresh,&LOCK_global_read_lock);
     if (thd->killed)
       result=1;
   }
@@ -890,7 +890,7 @@ bool wait_if_global_read_lock(THD *thd, bool abort_on_refresh,
   if (unlikely(need_exit_cond)) 
     thd->exit_cond(old_message);
   else
-    pthread_mutex_unlock(&LOCK_open);
+    pthread_mutex_unlock(&LOCK_global_read_lock);
   DBUG_RETURN(result);
 }
 
@@ -901,10 +901,10 @@ void start_waiting_global_read_lock(THD *thd)
   DBUG_ENTER("start_waiting_global_read_lock");
   if (unlikely(thd->global_read_lock))
     DBUG_VOID_RETURN;
-  (void) pthread_mutex_lock(&LOCK_open);
+  (void) pthread_mutex_lock(&LOCK_global_read_lock);
   tmp= (!--protect_against_global_read_lock &&
         (waiting_for_read_lock || global_read_lock_blocks_commit));
-  (void) pthread_mutex_unlock(&LOCK_open);
+  (void) pthread_mutex_unlock(&LOCK_global_read_lock);
   if (tmp)
     pthread_cond_broadcast(&COND_refresh);
   DBUG_VOID_RETURN;
@@ -922,16 +922,16 @@ bool make_global_read_lock_block_commit(THD *thd)
   */
   if (thd->global_read_lock != GOT_GLOBAL_READ_LOCK)
     DBUG_RETURN(1);
-  pthread_mutex_lock(&LOCK_open);
+  pthread_mutex_lock(&LOCK_global_read_lock);
   /* increment this BEFORE waiting on cond (otherwise race cond) */
   global_read_lock_blocks_commit++;
   /* For testing we set up some blocking, to see if we can be killed */
   DBUG_EXECUTE_IF("make_global_read_lock_block_commit_loop",
                   protect_against_global_read_lock++;);
-  old_message= thd->enter_cond(&COND_refresh, &LOCK_open,
+  old_message= thd->enter_cond(&COND_refresh, &LOCK_global_read_lock,
                                "Waiting for all running commits to finish");
   while (protect_against_global_read_lock && !thd->killed)
-    pthread_cond_wait(&COND_refresh, &LOCK_open);
+    pthread_cond_wait(&COND_refresh, &LOCK_global_read_lock);
   DBUG_EXECUTE_IF("make_global_read_lock_block_commit_loop",
                   protect_against_global_read_lock--;);
   if (error= thd->killed)
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 876f932759eed3b49247ebda93808c4680aff299..43bb28a5f3604d68119888eb56555617b5b0c50c 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -1101,7 +1101,7 @@ extern pthread_mutex_t LOCK_mysql_create_db,LOCK_Acl,LOCK_open,
        LOCK_thread_count,LOCK_mapped_file,LOCK_user_locks, LOCK_status,
        LOCK_error_log, LOCK_delayed_insert, LOCK_uuid_generator,
        LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
-       LOCK_slave_list, LOCK_active_mi, LOCK_manager,
+       LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_global_read_lock,
        LOCK_global_system_variables, LOCK_user_conn;
 extern rw_lock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
 extern pthread_cond_t COND_refresh, COND_thread_count, COND_manager;
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index c55700b4495f6007ae939e7555c2ca450bce5bb0..6747b79703b1dddf036a821a4420a9522ef26e7a 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -426,7 +426,7 @@ SHOW_COMP_OPTION have_crypt, have_compress;
 pthread_key(MEM_ROOT**,THR_MALLOC);
 pthread_key(THD*, THR_THD);
 pthread_mutex_t LOCK_mysql_create_db, LOCK_Acl, LOCK_open, LOCK_thread_count,
-		LOCK_mapped_file, LOCK_status,
+		LOCK_mapped_file, LOCK_status, LOCK_global_read_lock,
 		LOCK_error_log, LOCK_uuid_generator,
 		LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create,
 		LOCK_crypt, LOCK_bytes_sent, LOCK_bytes_received,
@@ -1102,6 +1102,7 @@ static void clean_up_mutexes()
   (void) rwlock_destroy(&LOCK_sys_init_connect);
   (void) rwlock_destroy(&LOCK_sys_init_slave);
   (void) pthread_mutex_destroy(&LOCK_global_system_variables);
+  (void) pthread_mutex_destroy(&LOCK_global_read_lock);
   (void) pthread_cond_destroy(&COND_thread_count);
   (void) pthread_cond_destroy(&COND_refresh);
   (void) pthread_cond_destroy(&COND_thread_cache);
@@ -2594,6 +2595,7 @@ static int init_thread_environment()
   (void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST);
   (void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST);
   (void) pthread_mutex_init(&LOCK_global_system_variables, MY_MUTEX_INIT_FAST);
+  (void) pthread_mutex_init(&LOCK_global_read_lock, MY_MUTEX_INIT_FAST);
   (void) pthread_mutex_init(&LOCK_uuid_generator, MY_MUTEX_INIT_FAST);
   (void) my_rwlock_init(&LOCK_sys_init_connect, NULL);
   (void) my_rwlock_init(&LOCK_sys_init_slave, NULL);
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index ca8414f9d54bb04ee5e6f96a7ae05c31a0af5097..66b0e69452f01eb2a645e02f042a886cb9c45c4e 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -4380,6 +4380,8 @@ mysql_execute_command(THD *thd)
     {
       if (!(res= !ha_commit_or_rollback_by_xid(&thd->lex->ident, 1)))
         my_error(ER_XAER_NOTA, MYF(0));
+      else
+        send_ok(thd);
       break;
     }
     if (thd->transaction.xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE)
@@ -4388,9 +4390,7 @@ mysql_execute_command(THD *thd)
       if ((r= ha_commit(thd)))
         my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
       else
-      {
         send_ok(thd);
-      }
     }
     else
     if (thd->transaction.xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE)
@@ -4398,9 +4398,7 @@ mysql_execute_command(THD *thd)
       if (ha_commit_one_phase(thd, 1))
         my_error(ER_XAER_RMERR, MYF(0));
       else
-      {
         send_ok(thd);
-      }
     }
     else
     {
@@ -4417,6 +4415,8 @@ mysql_execute_command(THD *thd)
     {
       if (!(res= !ha_commit_or_rollback_by_xid(&thd->lex->ident, 0)))
         my_error(ER_XAER_NOTA, MYF(0));
+      else
+        send_ok(thd);
       break;
     }
     if (thd->transaction.xa_state != XA_IDLE &&
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 2f872b2ad055ac69aad512693ce68d842b4f7d24..c79f49afc6416a8db27d3dec7c82fa6a0701197b 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -65,7 +65,7 @@ static int copy_data_between_tables(TABLE *from,TABLE *to,
 bool mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists,
                     my_bool drop_temporary)
 {
-  bool error= FALSE;
+  bool error= FALSE, need_start_waiters= FALSE;
   DBUG_ENTER("mysql_rm_table");
 
   /* mark for close and remove all cached entries */
@@ -74,23 +74,19 @@ bool mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists,
   thd->mysys_var->current_cond= &COND_refresh;
   VOID(pthread_mutex_lock(&LOCK_open));
 
-  if (!drop_temporary && global_read_lock)
+  if (!drop_temporary)
   {
-    if (thd->global_read_lock)
+    if ((error= wait_if_global_read_lock(thd, 0, 1)))
     {
       my_error(ER_TABLE_NOT_LOCKED_FOR_WRITE, MYF(0), tables->table_name);
-      error= TRUE;
       goto err;
     }
-    while (global_read_lock && ! thd->killed)
-    {
-      (void) pthread_cond_wait(&COND_refresh,&LOCK_open);
-    }
-
+    else
+      need_start_waiters= TRUE;
   }
   error= mysql_rm_table_part2(thd, tables, if_exists, drop_temporary, 0, 0);
 
- err:
+err:
   pthread_mutex_unlock(&LOCK_open);
 
   pthread_mutex_lock(&thd->mysys_var->mutex);
@@ -98,6 +94,9 @@ bool mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists,
   thd->mysys_var->current_cond= 0;
   pthread_mutex_unlock(&thd->mysys_var->mutex);
 
+  if (need_start_waiters)
+    start_waiting_global_read_lock(thd);
+
   if (error)
     DBUG_RETURN(TRUE);
   send_ok(thd);
@@ -114,7 +113,7 @@ bool mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists,
     tables		List of tables to delete
     if_exists		If 1, don't give error if one table doesn't exists
     dont_log_query	Don't write query to log files. This will also not
-			generate warnings if the handler files doesn't exists  
+                        generate warnings if the handler files doesn't exists
 
  NOTES
    Works like documented in mysql_rm_table(), but don't check