Commit dd01c46f authored by unknown's avatar unknown

Bug #26021 - valgrind reports error regarding handle_trailing_share and client thread share usage

- add ndb_share connect_count to decide if share can be reused to setup replication


sql/ha_ndbcluster.cc:
  add additional logging info
  remove inline real_free_share (not needed) and confuses valgrind printout
sql/ha_ndbcluster.h:
  add connect_count to ndb_share to enable checking for version of share
sql/ha_ndbcluster_binlog.cc:
  use ndb_share connect_count to decide if share can be reused to setup replication
  - share will be created early if table is accessed prior to handler setting up binlog stuff, see bug 26021
sql/ha_ndbcluster_binlog.h:
  remove inline real_free_share (not needed) and confuses valgrind printout
parent c101bc1f
...@@ -6837,7 +6837,7 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type) ...@@ -6837,7 +6837,7 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type)
fprintf(stderr, "NDB: table share %s with use_count %d not freed\n", fprintf(stderr, "NDB: table share %s with use_count %d not freed\n",
share->key, share->use_count); share->key, share->use_count);
#endif #endif
real_free_share(&share); ndbcluster_real_free_share(&share);
} }
pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_unlock(&ndbcluster_mutex);
} }
...@@ -7449,14 +7449,20 @@ int handle_trailing_share(NDB_SHARE *share) ...@@ -7449,14 +7449,20 @@ int handle_trailing_share(NDB_SHARE *share)
bzero((char*) &table_list,sizeof(table_list)); bzero((char*) &table_list,sizeof(table_list));
table_list.db= share->db; table_list.db= share->db;
table_list.alias= table_list.table_name= share->table_name; table_list.alias= table_list.table_name= share->table_name;
safe_mutex_assert_owner(&LOCK_open);
close_cached_tables(thd, 0, &table_list, TRUE); close_cached_tables(thd, 0, &table_list, TRUE);
pthread_mutex_lock(&ndbcluster_mutex); pthread_mutex_lock(&ndbcluster_mutex);
if (!--share->use_count) if (!--share->use_count)
{ {
DBUG_PRINT("info", ("NDB_SHARE: close_cashed_tables %s freed share.", if (ndb_extra_logging)
share->key)); sql_print_information("NDB_SHARE: trailing share %s(connect_count: %u) "
real_free_share(&share); "released by close_cached_tables at "
"connect_count: %u",
share->key,
share->connect_count,
g_ndb_cluster_connection->get_connect_count());
ndbcluster_real_free_share(&share);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -7466,10 +7472,14 @@ int handle_trailing_share(NDB_SHARE *share) ...@@ -7466,10 +7472,14 @@ int handle_trailing_share(NDB_SHARE *share)
*/ */
if (share->state != NSS_DROPPED && !--share->use_count) if (share->state != NSS_DROPPED && !--share->use_count)
{ {
DBUG_PRINT("info", ("NDB_SHARE: %s already exists, " if (ndb_extra_logging)
"use_count=%d state != NSS_DROPPED.", sql_print_information("NDB_SHARE: trailing share %s(connect_count: %u) "
share->key, share->use_count)); "released after NSS_DROPPED check "
real_free_share(&share); "at connect_count: %u",
share->key,
share->connect_count,
g_ndb_cluster_connection->get_connect_count());
ndbcluster_real_free_share(&share);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
DBUG_PRINT("error", ("NDB_SHARE: %s already exists use_count=%d.", DBUG_PRINT("error", ("NDB_SHARE: %s already exists use_count=%d.",
...@@ -7736,7 +7746,7 @@ void ndbcluster_free_share(NDB_SHARE **share, bool have_lock) ...@@ -7736,7 +7746,7 @@ void ndbcluster_free_share(NDB_SHARE **share, bool have_lock)
(*share)->util_lock= 0; (*share)->util_lock= 0;
if (!--(*share)->use_count) if (!--(*share)->use_count)
{ {
real_free_share(share); ndbcluster_real_free_share(share);
} }
else else
{ {
......
...@@ -108,6 +108,7 @@ typedef struct st_ndbcluster_share { ...@@ -108,6 +108,7 @@ typedef struct st_ndbcluster_share {
char *table_name; char *table_name;
Ndb::TupleIdRange tuple_id_range; Ndb::TupleIdRange tuple_id_range;
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
uint32 connect_count;
uint32 flags; uint32 flags;
NdbEventOperation *op; NdbEventOperation *op;
NdbEventOperation *op_old; // for rename table NdbEventOperation *op_old; // for rename table
......
...@@ -362,6 +362,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) ...@@ -362,6 +362,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
int do_event_op= ndb_binlog_running; int do_event_op= ndb_binlog_running;
DBUG_ENTER("ndbcluster_binlog_init_share"); DBUG_ENTER("ndbcluster_binlog_init_share");
share->connect_count= g_ndb_cluster_connection->get_connect_count();
share->op= 0; share->op= 0;
share->table= 0; share->table= 0;
...@@ -605,7 +607,7 @@ static int ndbcluster_binlog_end(THD *thd) ...@@ -605,7 +607,7 @@ static int ndbcluster_binlog_end(THD *thd)
("table->s->db.table_name: %s.%s", ("table->s->db.table_name: %s.%s",
share->table->s->db.str, share->table->s->table_name.str)); share->table->s->db.str, share->table->s->table_name.str));
if (share->state != NSS_DROPPED && !--share->use_count) if (share->state != NSS_DROPPED && !--share->use_count)
real_free_share(&share); ndbcluster_real_free_share(&share);
else else
{ {
DBUG_PRINT("share", DBUG_PRINT("share",
...@@ -2443,11 +2445,20 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, ...@@ -2443,11 +2445,20 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
if (share->connect_count !=
g_ndb_cluster_connection->get_connect_count())
{
handle_trailing_share(share); handle_trailing_share(share);
share= NULL;
}
} }
/* Create share which is needed to hold replication information */ /* Create share which is needed to hold replication information */
if (!(share= get_share(key, 0, TRUE, TRUE))) if (share)
{
++share->use_count;
}
else if (!(share= get_share(key, 0, TRUE, TRUE)))
{ {
sql_print_error("NDB Binlog: " sql_print_error("NDB Binlog: "
"allocating table share for %s failed", key); "allocating table share for %s failed", key);
......
...@@ -208,11 +208,6 @@ inline void free_share(NDB_SHARE **share, bool have_lock= FALSE) ...@@ -208,11 +208,6 @@ inline void free_share(NDB_SHARE **share, bool have_lock= FALSE)
ndbcluster_free_share(share, have_lock); ndbcluster_free_share(share, have_lock);
} }
inline void real_free_share(NDB_SHARE **share)
{
ndbcluster_real_free_share(share);
}
inline inline
Thd_ndb * Thd_ndb *
get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton->slot]; } get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton->slot]; }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment