Commit f9c16f26 authored by tomas@poseidon.ndb.mysql.com's avatar tomas@poseidon.ndb.mysql.com

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new
parents c9d4d6aa bd9f9533
...@@ -804,7 +804,7 @@ void ndbcluster_setup_binlog_table_shares(THD *thd) ...@@ -804,7 +804,7 @@ void ndbcluster_setup_binlog_table_shares(THD *thd)
#define SCHEMA_SLOCK_SIZE 32u #define SCHEMA_SLOCK_SIZE 32u
#define SCHEMA_QUERY_SIZE 4096u #define SCHEMA_QUERY_SIZE 4096u
struct Cluster_replication_schema struct Cluster_schema
{ {
unsigned char db_length; unsigned char db_length;
char db[64]; char db[64];
...@@ -825,7 +825,7 @@ struct Cluster_replication_schema ...@@ -825,7 +825,7 @@ struct Cluster_replication_schema
Transfer schema table data into corresponding struct Transfer schema table data into corresponding struct
*/ */
static void ndbcluster_get_schema(TABLE *table, static void ndbcluster_get_schema(TABLE *table,
Cluster_replication_schema *s) Cluster_schema *s)
{ {
Field **field; Field **field;
/* db varchar 1 length byte */ /* db varchar 1 length byte */
...@@ -1153,7 +1153,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -1153,7 +1153,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
/* remove any unsubscribed from share->slock */ /* remove any unsubscribed from share->slock */
bitmap_intersect(&share->slock_bitmap, &schema_subscribers); bitmap_intersect(&share->slock_bitmap, &schema_subscribers);
DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, DBUG_DUMP("share->slock_bitmap.bitmap",
(char*)share->slock_bitmap.bitmap,
no_bytes_in_map(&share->slock_bitmap)); no_bytes_in_map(&share->slock_bitmap));
if (bitmap_is_clear_all(&share->slock_bitmap)) if (bitmap_is_clear_all(&share->slock_bitmap))
...@@ -1484,9 +1485,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, ...@@ -1484,9 +1485,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
static int static int
ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
NdbEventOperation *pOp, NdbEventOperation *pOp,
List<Cluster_replication_schema> List<Cluster_schema>
*post_epoch_log_list, *post_epoch_log_list,
List<Cluster_replication_schema> List<Cluster_schema>
*post_epoch_unlock_list, *post_epoch_unlock_list,
MEM_ROOT *mem_root) MEM_ROOT *mem_root)
{ {
...@@ -1497,48 +1498,51 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1497,48 +1498,51 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
NDBEVENT::TableEvent ev_type= pOp->getEventType(); NDBEVENT::TableEvent ev_type= pOp->getEventType();
DBUG_PRINT("enter", ("%s.%s ev_type: %d", DBUG_PRINT("enter", ("%s.%s ev_type: %d",
share->db, share->table_name, ev_type)); share->db, share->table_name, ev_type));
switch (ev_type) if (ev_type == NDBEVENT::TE_UPDATE ||
{ ev_type == NDBEVENT::TE_INSERT)
case NDBEVENT::TE_UPDATE:
/* fall through */
case NDBEVENT::TE_INSERT:
{ {
Cluster_replication_schema *schema= (Cluster_replication_schema *) Cluster_schema *schema= (Cluster_schema *)
sql_alloc(sizeof(Cluster_replication_schema)); sql_alloc(sizeof(Cluster_schema));
MY_BITMAP slock; MY_BITMAP slock;
bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false);
uint node_id= g_ndb_cluster_connection->node_id(); uint node_id= g_ndb_cluster_connection->node_id();
ndbcluster_get_schema(share->table, schema); ndbcluster_get_schema(share->table, schema);
if (schema->node_id != node_id) if (schema->node_id != node_id)
{ {
int log_query= 0; int log_query= 0, post_epoch_unlock= 0;
DBUG_PRINT("info", ("log query_length: %d query: '%s'", DBUG_PRINT("info", ("log query_length: %d query: '%s'",
schema->query_length, schema->query)); schema->query_length, schema->query));
char key[FN_REFLEN]; char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, ""); build_table_filename(key, sizeof(key), schema->db, schema->name, "");
NDB_SHARE *share= get_share(key, 0, false, false); NDB_SHARE *share= get_share(key, 0, false, false);
switch ((enum SCHEMA_OP_TYPE)schema->type) switch ((enum SCHEMA_OP_TYPE)schema->type)
{ {
case SOT_DROP_TABLE: case SOT_DROP_TABLE:
/* binlog dropping table after any table operations */ /* binlog dropping table after any table operations */
if (share && share->op) if (share && share->op)
{
post_epoch_log_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
}
/* table is either ignored or logging is postponed to later */
log_query= 0; log_query= 0;
break; break;
case SOT_RENAME_TABLE: case SOT_RENAME_TABLE:
if (share && share->op) if (share && share->op)
{ {
log_query= 0;
post_epoch_log_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
break; /* discovery will be handled by binlog */ break; /* discovery will be handled by binlog */
} }
goto sot_create_table; goto sot_create_table;
case SOT_ALTER_TABLE: case SOT_ALTER_TABLE:
if (share && share->op) if (share && share->op)
{ {
log_query= 0;
post_epoch_log_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
break; /* discovery will be handled by binlog */ break; /* discovery will be handled by binlog */
} }
goto sot_create_table; goto sot_create_table;
...@@ -1571,8 +1575,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1571,8 +1575,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
TRUE); /* don't binlog the query */ TRUE); /* don't binlog the query */
/* binlog dropping database after any table operations */ /* binlog dropping database after any table operations */
if (ndb_binlog_running) if (ndb_binlog_running)
{
post_epoch_log_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root);
log_query= 0; /* acknowledge this query _after_ epoch completion */
post_epoch_unlock= 1;
}
break; break;
case SOT_CREATE_DB: case SOT_CREATE_DB:
/* fall through */ /* fall through */
...@@ -1581,7 +1588,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1581,7 +1588,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
schema->query + schema->query_length, schema->query + schema->query_length,
TRUE, /* print error */ TRUE, /* print error */
FALSE); /* binlog the query */ FALSE); /* binlog the query */
log_query= 0;
break; break;
case SOT_CLEAR_SLOCK: case SOT_CLEAR_SLOCK:
{ {
...@@ -1609,25 +1615,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1609,25 +1615,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
free_share(&share); free_share(&share);
share= 0; share= 0;
} }
/* signal that schema operation has been handled */
if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK)
{
DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length);
if (bitmap_is_set(&slock, node_id))
{
/*
If it is an SOT_ALTER_TABLE we need to acknowledge the
schema operation _after_ all the events have been
processed so that all schema events coming through
the event operation has been processed
*/
if ((enum SCHEMA_OP_TYPE)schema->type == SOT_ALTER_TABLE)
post_epoch_unlock_list->push_back(schema, mem_root);
else
ndbcluster_update_slock(thd, schema->db, schema->name);
}
}
if (log_query && ndb_binlog_running) if (log_query && ndb_binlog_running)
{ {
char *thd_db_save= thd->db; char *thd_db_save= thd->db;
...@@ -1637,9 +1624,23 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1637,9 +1624,23 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
schema->name[0] == 0 || thd->db[0] == 0); schema->name[0] == 0 || thd->db[0] == 0);
thd->db= thd_db_save; thd->db= thd_db_save;
} }
/* signal that schema operation has been handled */
DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length);
if (bitmap_is_set(&slock, node_id))
{
if (post_epoch_unlock)
post_epoch_unlock_list->push_back(schema, mem_root);
else
ndbcluster_update_slock(thd, schema->db, schema->name);
} }
} }
break; DBUG_RETURN(0);
}
/*
the normal case of UPDATE/INSERT has already been handled
*/
switch (ev_type)
{
case NDBEVENT::TE_DELETE: case NDBEVENT::TE_DELETE:
// skip // skip
break; break;
...@@ -1726,13 +1727,13 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1726,13 +1727,13 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
*/ */
static void static void
ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
List<Cluster_replication_schema> List<Cluster_schema>
*post_epoch_log_list, *post_epoch_log_list,
List<Cluster_replication_schema> List<Cluster_schema>
*post_epoch_unlock_list) *post_epoch_unlock_list)
{ {
DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch"); DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
Cluster_replication_schema *schema; Cluster_schema *schema;
while ((schema= post_epoch_log_list->pop())) while ((schema= post_epoch_log_list->pop()))
{ {
DBUG_PRINT("info", ("log query_length: %d query: '%s'", DBUG_PRINT("info", ("log query_length: %d query: '%s'",
...@@ -2120,7 +2121,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, ...@@ -2120,7 +2121,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
} }
if (share->flags & NSF_NO_BINLOG) if (share->flags & NSF_NO_BINLOG)
{ {
DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d", share->flags, share->flags & NSF_NO_BINLOG)); DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
share->flags, share->flags & NSF_NO_BINLOG));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2137,7 +2139,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, ...@@ -2137,7 +2139,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
share->key); share->key);
if (push_warning) if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_ILLEGAL_HA_CREATE_OPTION, ER(ER_ILLEGAL_HA_CREATE_OPTION), ER_ILLEGAL_HA_CREATE_OPTION,
ER(ER_ILLEGAL_HA_CREATE_OPTION),
ndbcluster_hton.name, ndbcluster_hton.name,
"Binlog of table with BLOB attribute and no PK"); "Binlog of table with BLOB attribute and no PK");
...@@ -2268,7 +2271,8 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -2268,7 +2271,8 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
if (share->flags & NSF_NO_BINLOG) if (share->flags & NSF_NO_BINLOG)
{ {
DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x", share->flags)); DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
share->flags));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2690,7 +2694,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2690,7 +2694,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
{ {
case NDBEVENT::TE_INSERT: case NDBEVENT::TE_INSERT:
row.n_inserts++; row.n_inserts++;
DBUG_PRINT("info", ("INSERT INTO %s.%s", table_s->db.str, table_s->table_name.str)); DBUG_PRINT("info", ("INSERT INTO %s.%s",
table_s->db.str, table_s->table_name.str));
{ {
if (share->flags & NSF_BLOB_FLAG) if (share->flags & NSF_BLOB_FLAG)
{ {
...@@ -2701,14 +2706,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2701,14 +2706,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
DBUG_ASSERT(ret == 0); DBUG_ASSERT(ret == 0);
} }
ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
int ret= trans.write_row(::server_id, injector::transaction::table(table, true), int ret= trans.write_row(::server_id,
injector::transaction::table(table, true),
&b, n_fields, table->record[0]); &b, n_fields, table->record[0]);
DBUG_ASSERT(ret == 0); DBUG_ASSERT(ret == 0);
} }
break; break;
case NDBEVENT::TE_DELETE: case NDBEVENT::TE_DELETE:
row.n_deletes++; row.n_deletes++;
DBUG_PRINT("info",("DELETE FROM %s.%s", table_s->db.str, table_s->table_name.str)); DBUG_PRINT("info",("DELETE FROM %s.%s",
table_s->db.str, table_s->table_name.str));
{ {
/* /*
table->record[0] contains only the primary key in this case table->record[0] contains only the primary key in this case
...@@ -2737,14 +2744,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2737,14 +2744,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
} }
ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]); ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
DBUG_EXECUTE("info", print_records(table, table->record[n]);); DBUG_EXECUTE("info", print_records(table, table->record[n]););
int ret= trans.delete_row(::server_id, injector::transaction::table(table, true), int ret= trans.delete_row(::server_id,
injector::transaction::table(table, true),
&b, n_fields, table->record[n]); &b, n_fields, table->record[n]);
DBUG_ASSERT(ret == 0); DBUG_ASSERT(ret == 0);
} }
break; break;
case NDBEVENT::TE_UPDATE: case NDBEVENT::TE_UPDATE:
row.n_updates++; row.n_updates++;
DBUG_PRINT("info", ("UPDATE %s.%s", table_s->db.str, table_s->table_name.str)); DBUG_PRINT("info", ("UPDATE %s.%s",
table_s->db.str, table_s->table_name.str));
{ {
if (share->flags & NSF_BLOB_FLAG) if (share->flags & NSF_BLOB_FLAG)
{ {
...@@ -3025,15 +3034,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3025,15 +3034,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
MEM_ROOT *old_root= *root_ptr; MEM_ROOT *old_root= *root_ptr;
MEM_ROOT mem_root; MEM_ROOT mem_root;
init_sql_alloc(&mem_root, 4096, 0); init_sql_alloc(&mem_root, 4096, 0);
List<Cluster_replication_schema> post_epoch_log_list; List<Cluster_schema> post_epoch_log_list;
List<Cluster_replication_schema> post_epoch_unlock_list; List<Cluster_schema> post_epoch_unlock_list;
*root_ptr= &mem_root; *root_ptr= &mem_root;
if (unlikely(schema_res > 0)) if (unlikely(schema_res > 0))
{ {
schema_ndb-> schema_ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
schema_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); schema_ndb->
setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
NdbEventOperation *pOp= schema_ndb->nextEvent(); NdbEventOperation *pOp= schema_ndb->nextEvent();
while (pOp != NULL) while (pOp != NULL)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment