Commit a569515a authored by Nikita Malyavin's avatar Nikita Malyavin

online alter: rework savepoints

Use standard handlerton functions for savepoint add/rollback.

To identify the savepoint, the pointer passed is used.

Every table that has online alter in progress maintains a list of
savepoints independently.

Also this removes setting a value to a global variable savepoint_alloc_size
without any protection, which was a race condition bug.
parent 8311eae6
...@@ -2348,9 +2348,6 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) ...@@ -2348,9 +2348,6 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
int error= 1; int error= 1;
DBUG_ENTER("binlog_savepoint_set"); DBUG_ENTER("binlog_savepoint_set");
if (!mysql_bin_log.is_open() && !thd->online_alter_cache_list.empty())
DBUG_RETURN(0);
char buf[1024]; char buf[1024];
String log_query(buf, sizeof(buf), &my_charset_bin); String log_query(buf, sizeof(buf), &my_charset_bin);
...@@ -2383,9 +2380,6 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) ...@@ -2383,9 +2380,6 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
{ {
DBUG_ENTER("binlog_savepoint_rollback"); DBUG_ENTER("binlog_savepoint_rollback");
if (!mysql_bin_log.is_open() && !thd->online_alter_cache_list.empty())
DBUG_RETURN(0);
/* /*
Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
non-transactional table. Otherwise, truncate the binlog cache starting non-transactional table. Otherwise, truncate the binlog cache starting
......
...@@ -1438,8 +1438,4 @@ int binlog_commit_by_xid(handlerton *hton, XID *xid); ...@@ -1438,8 +1438,4 @@ int binlog_commit_by_xid(handlerton *hton, XID *xid);
int binlog_rollback_by_xid(handlerton *hton, XID *xid); int binlog_rollback_by_xid(handlerton *hton, XID *xid);
bool write_bin_log_start_alter(THD *thd, bool& partial_alter, bool write_bin_log_start_alter(THD *thd, bool& partial_alter,
uint64 start_alter_id, bool log_if_exists); uint64 start_alter_id, bool log_if_exists);
int online_alter_savepoint_set(THD *thd, LEX_CSTRING name);
int online_alter_savepoint_rollback(THD *thd, LEX_CSTRING name);
#endif /* LOG_H */ #endif /* LOG_H */
...@@ -24,7 +24,19 @@ ...@@ -24,7 +24,19 @@
static handlerton *online_alter_hton; static handlerton *online_alter_hton;
typedef ilist<online_alter_cache_data> Online_alter_cache_list; typedef void *sv_id_t;
struct Online_alter_cache_list: ilist<online_alter_cache_data>
{
sv_id_t savepoint_id= 0;
};
struct table_savepoint: ilist_node<>
{
sv_id_t id;
my_off_t log_prev_pos;
table_savepoint(sv_id_t id, my_off_t pos): id(id), log_prev_pos(pos){}
};
class online_alter_cache_data: public Sql_alloc, public ilist_node<>, class online_alter_cache_data: public Sql_alloc, public ilist_node<>,
public binlog_cache_data public binlog_cache_data
...@@ -37,7 +49,32 @@ class online_alter_cache_data: public Sql_alloc, public ilist_node<>, ...@@ -37,7 +49,32 @@ class online_alter_cache_data: public Sql_alloc, public ilist_node<>,
handlerton *hton; handlerton *hton;
Cache_flip_event_log *sink_log; Cache_flip_event_log *sink_log;
SAVEPOINT *sv_list; ilist<table_savepoint> sv_list;
/**
Finds savepoint with specified id and returns its associated data.
Cleans up all the savepoints up to the found one.
*/
my_off_t pop_sv_until(sv_id_t id)
{
my_off_t pos= 0;
auto it= sv_list.begin();
auto sentinel= it->prev;
while (pos == 0 && it != sv_list.end())
{
table_savepoint &sv= *it;
++it;
if (sv.id == id)
pos= sv.log_prev_pos;
delete &sv;
}
sentinel->next= &*it; // drop the range from the list
it->prev= sentinel;
return pos;
}
void cleanup_sv()
{
pop_sv_until((sv_id_t)1); // Erase the whole list
}
}; };
...@@ -155,6 +192,7 @@ cleanup_cache_list(ilist<online_alter_cache_data> &list, bool ending_trans) ...@@ -155,6 +192,7 @@ cleanup_cache_list(ilist<online_alter_cache_data> &list, bool ending_trans)
auto &cache= *it++; auto &cache= *it++;
cache.sink_log->release(); cache.sink_log->release();
cache.reset(); cache.reset();
cache.cleanup_sv();
delete &cache; delete &cache;
} }
list.clear(); list.clear();
...@@ -230,52 +268,41 @@ int online_alter_end_trans(handlerton *hton, THD *thd, bool all, bool commit) ...@@ -230,52 +268,41 @@ int online_alter_end_trans(handlerton *hton, THD *thd, bool all, bool commit)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
SAVEPOINT** find_savepoint_in_list(THD *thd, LEX_CSTRING name,
SAVEPOINT ** const list);
SAVEPOINT* savepoint_add(THD *thd, LEX_CSTRING name, SAVEPOINT **list, int online_alter_savepoint_set(handlerton *hton, THD *thd, sv_id_t sv_id)
int (*release_old)(THD*, SAVEPOINT*));
int online_alter_savepoint_set(THD *thd, LEX_CSTRING name)
{ {
DBUG_ENTER("binlog_online_alter_savepoint"); DBUG_ENTER("binlog_online_alter_savepoint");
auto &cache_list= get_cache_list(online_alter_hton, thd); auto &cache_list= get_cache_list(hton, thd);
if (cache_list.empty()) if (cache_list.empty())
DBUG_RETURN(0); DBUG_RETURN(0);
if (savepoint_alloc_size < sizeof (SAVEPOINT) + sizeof(my_off_t))
savepoint_alloc_size= sizeof (SAVEPOINT) + sizeof(my_off_t);
for (auto &cache: cache_list) for (auto &cache: cache_list)
{ {
if (cache.hton->savepoint_set == NULL) if (cache.hton->savepoint_set == NULL)
continue; continue;
SAVEPOINT *sv= savepoint_add(thd, name, &cache.sv_list, NULL); auto *sv= new table_savepoint(sv_id, cache.get_byte_position());
if(unlikely(sv == NULL)) if(unlikely(sv == NULL))
DBUG_RETURN(1); DBUG_RETURN(1);
my_off_t *pos= (my_off_t*)(sv+1); cache.sv_list.push_front(*sv);
*pos= cache.get_byte_position();
sv->prev= cache.sv_list;
cache.sv_list= sv;
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int online_alter_savepoint_rollback(THD *thd, LEX_CSTRING name)
int online_alter_savepoint_rollback(handlerton *hton, THD *thd, sv_id_t sv_id)
{ {
DBUG_ENTER("online_alter_savepoint_rollback"); DBUG_ENTER("online_alter_savepoint_rollback");
auto &cache_list= get_cache_list(online_alter_hton, thd); auto &cache_list= get_cache_list(hton, thd);
for (auto &cache: cache_list) for (auto &cache: cache_list)
{ {
if (cache.hton->savepoint_set == NULL) if (cache.hton->savepoint_set == NULL)
continue; continue;
SAVEPOINT **sv= find_savepoint_in_list(thd, name, &cache.sv_list); // There's no savepoint if it was set up before online table was modified.
// sv is null if savepoint was set up before online table was modified // In that case, restore to 0.
my_off_t pos= *sv ? *(my_off_t*)(*sv+1) : 0; my_off_t pos= cache.pop_sv_until(sv_id);
cache.restore_savepoint(pos); cache.restore_savepoint(pos);
} }
...@@ -299,13 +326,11 @@ static int online_alter_log_init(void *p) ...@@ -299,13 +326,11 @@ static int online_alter_log_init(void *p)
{ {
online_alter_hton= (handlerton *)p; online_alter_hton= (handlerton *)p;
online_alter_hton->db_type= DB_TYPE_ONLINE_ALTER; online_alter_hton->db_type= DB_TYPE_ONLINE_ALTER;
online_alter_hton->savepoint_offset= sizeof(my_off_t); online_alter_hton->savepoint_offset= 0;
online_alter_hton->close_connection= online_alter_close_connection; online_alter_hton->close_connection= online_alter_close_connection;
online_alter_hton->savepoint_set= // Done by online_alter_savepoint_set online_alter_hton->savepoint_set= online_alter_savepoint_set;
[](handlerton *, THD *, void *){ return 0; }; online_alter_hton->savepoint_rollback= online_alter_savepoint_rollback;
online_alter_hton->savepoint_rollback= // Done by online_alter_savepoint_rollback
[](handlerton *, THD *, void *){ return 0; };
online_alter_hton->savepoint_rollback_can_release_mdl= online_alter_hton->savepoint_rollback_can_release_mdl=
[](handlerton *hton, THD *thd){ return true; }; [](handlerton *hton, THD *thd){ return true; };
......
...@@ -641,10 +641,6 @@ bool trans_savepoint(THD *thd, LEX_CSTRING name) ...@@ -641,10 +641,6 @@ bool trans_savepoint(THD *thd, LEX_CSTRING name)
if (unlikely(ha_savepoint(thd, newsv))) if (unlikely(ha_savepoint(thd, newsv)))
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
int error= online_alter_savepoint_set(thd, name);
if (unlikely(error))
DBUG_RETURN(error);
newsv->prev= thd->transaction->savepoints; newsv->prev= thd->transaction->savepoints;
thd->transaction->savepoints= newsv; thd->transaction->savepoints= newsv;
...@@ -704,8 +700,6 @@ bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name) ...@@ -704,8 +700,6 @@ bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name)
ER_WARNING_NOT_COMPLETE_ROLLBACK, ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK)); ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK));
res= res || online_alter_savepoint_rollback(thd, name);
thd->transaction->savepoints= sv; thd->transaction->savepoints= sv;
if (res) if (res)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment