Commit 118dc575 authored by unknown's avatar unknown

Merge

parent 810d558f
......@@ -87,6 +87,12 @@ static int unpackfrm(const void **data, uint *len,
static int ndb_get_table_statistics(Ndb*, const char *,
struct Ndb_statistics *);
// Util thread variables
static pthread_t ndb_util_thread;
pthread_mutex_t LOCK_ndb_util_thread;
pthread_cond_t COND_ndb_util_thread;
extern "C" pthread_handler_decl(ndb_util_thread_func, arg);
ulong ndb_cache_check_time;
/*
Dummy buffer to read zero pack_length fields
......@@ -2354,22 +2360,21 @@ void ha_ndbcluster::print_results()
if (!_db_on_)
DBUG_VOID_RETURN;
char buf_type[MAX_FIELD_WIDTH], buf_val[MAX_FIELD_WIDTH];
String type(buf_type, sizeof(buf_type), &my_charset_bin);
String type(buf_type, sizeof(buf_type), &my_charset_bin);
String val(buf_val, sizeof(buf_val), &my_charset_bin);
for (uint f=0; f<table->s->fields;f++)
{
// Use DBUG_PRINT since DBUG_FILE cannot be filtered out
/* Use DBUG_PRINT since DBUG_FILE cannot be filtered out */
char buf[2000];
Field *field;
void* ptr;
const NDBCOL *col= NULL;
NdbValue value;
NdbBlob *ndb_blob;
buf[0]= 0;
field= table->field[f];
field= table->field[f];
if (!(value= m_value[f]).ptr)
{
my_snprintf(buf, sizeof(buf), "not read");
......@@ -2377,8 +2382,6 @@ void ha_ndbcluster::print_results()
}
ptr= field->ptr;
DBUG_DUMP("field->ptr", (char*)ptr, field->pack_length());
col= tab->getColumn(f);
if (! (field->flags & BLOB_FLAG))
{
......@@ -2404,9 +2407,9 @@ void ha_ndbcluster::print_results()
goto print_value;
}
}
print_value:
DBUG_PRINT("value", ("%u,%s: %s", f, col->getName(), buf));
DBUG_PRINT("value", ("%u,%s: %s", f, field->field_name, buf));
}
#endif
DBUG_VOID_RETURN;
......@@ -3051,8 +3054,12 @@ int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
}
const char **ha_ndbcluster::bas_ext() const
{ static const char *ext[]= { ha_ndb_ext, NullS }; return ext; }
static const char *ha_ndb_bas_ext[]= { ha_ndb_ext, NullS };
const char**
ha_ndbcluster::bas_ext() const
{
return ha_ndb_bas_ext;
}
/*
......@@ -3220,7 +3227,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
m_transaction_on= FALSE;
else
m_transaction_on= thd->variables.ndb_use_transactions;
// m_use_local_query_cache= thd->variables.ndb_use_local_query_cache;
m_active_trans= thd->transaction.all.ndb_tid ?
(NdbTransaction*)thd->transaction.all.ndb_tid:
......@@ -3647,6 +3653,52 @@ static int create_ndb_column(NDBCOL &col,
Create a table in NDB Cluster
*/
static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length)
{
if (form->s->max_rows == 0) /* default setting, don't set fragmentation */
return;
/**
* get the number of fragments right
*/
uint no_fragments;
{
#if MYSQL_VERSION_ID >= 50000
uint acc_row_size= 25 + /*safety margin*/ 2;
#else
uint acc_row_size= pk_length*4;
/* add acc overhead */
if (pk_length <= 8) /* main page will set the limit */
acc_row_size+= 25 + /*safety margin*/ 2;
else /* overflow page will set the limit */
acc_row_size+= 4 + /*safety margin*/ 4;
#endif
ulonglong acc_fragment_size= 512*1024*1024;
ulonglong max_rows= form->s->max_rows;
#if MYSQL_VERSION_ID >= 50100
no_fragments= (max_rows*acc_row_size)/acc_fragment_size+1;
#else
no_fragments= ((max_rows*acc_row_size)/acc_fragment_size+1
+1/*correct rounding*/)/2;
#endif
}
{
uint no_nodes= g_ndb_cluster_connection->no_db_nodes();
NDBTAB::FragmentType ftype;
if (no_fragments > 2*no_nodes)
{
ftype= NDBTAB::FragAllLarge;
if (no_fragments > 4*no_nodes)
push_warning(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
"Ndb might have problems storing the max amount of rows specified");
}
else if (no_fragments > no_nodes)
ftype= NDBTAB::FragAllMedium;
else
ftype= NDBTAB::FragAllSmall;
tab.setFragmentType(ftype);
}
}
int ha_ndbcluster::create(const char *name,
TABLE *form,
HA_CREATE_INFO *info)
......@@ -3748,7 +3800,9 @@ int ha_ndbcluster::create(const char *name,
break;
}
}
ndb_set_fragmentation(tab, form, pk_length);
if ((my_errno= check_ndb_connection()))
DBUG_RETURN(my_errno);
......@@ -4014,7 +4068,6 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_force_send(TRUE),
m_autoincrement_prefetch(32),
m_transaction_on(TRUE),
m_use_local_query_cache(FALSE),
m_cond_stack(NULL),
m_multi_cursor(NULL)
{
......@@ -4070,6 +4123,7 @@ ha_ndbcluster::~ha_ndbcluster()
}
/*
Open a table for further use
- fetch metadata for this table from NDB
......@@ -4170,16 +4224,14 @@ void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
Ndb* check_ndb_in_thd(THD* thd)
{
DBUG_ENTER("check_ndb_in_thd");
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
if (!thd_ndb)
{
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
DBUG_RETURN(NULL);
return NULL;
thd->transaction.thd_ndb= thd_ndb;
}
DBUG_RETURN(thd_ndb->ndb);
return thd_ndb->ndb;
}
......@@ -4532,13 +4584,21 @@ bool ndbcluster_init()
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_ndb_util_thread, NULL);
// Create utility thread
pthread_t tmp;
if (pthread_create(&tmp, &connection_attrib, ndb_util_thread_func, 0))
{
DBUG_PRINT("error", ("Could not create ndb utility thread"));
goto ndbcluster_init_error;
}
ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
if (ndb_discover_tables() != 0)
goto ndbcluster_init_error;
#endif
DBUG_RETURN(FALSE);
ndbcluster_init_error:
ndbcluster_end();
DBUG_RETURN(TRUE);
......@@ -4548,12 +4608,19 @@ bool ndbcluster_init()
/*
End use of the NDB Cluster table handler
- free all global variables allocated by
ndcluster_init()
ndbcluster_init()
*/
bool ndbcluster_end()
{
DBUG_ENTER("ndbcluster_end");
// Kill ndb utility thread
(void) pthread_mutex_lock(&LOCK_ndb_util_thread);
DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
(void) pthread_cond_signal(&COND_ndb_util_thread);
(void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
if(g_ndb)
delete g_ndb;
g_ndb= NULL;
......@@ -4564,6 +4631,8 @@ bool ndbcluster_end()
DBUG_RETURN(0);
hash_free(&ndbcluster_open_tables);
pthread_mutex_destroy(&ndbcluster_mutex);
pthread_mutex_destroy(&LOCK_ndb_util_thread);
pthread_cond_destroy(&COND_ndb_util_thread);
ndbcluster_inited= 0;
DBUG_RETURN(0);
}
......@@ -4756,16 +4825,174 @@ const char* ha_ndbcluster::index_type(uint key_number)
return "HASH";
}
}
uint8 ha_ndbcluster::table_cache_type()
{
if (m_use_local_query_cache)
return HA_CACHE_TBL_TRANSACT;
else
return HA_CACHE_TBL_NOCACHE;
DBUG_ENTER("ha_ndbcluster::table_cache_type=HA_CACHE_TBL_ASKTRANSACT");
DBUG_RETURN(HA_CACHE_TBL_ASKTRANSACT);
}
uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
Uint64 *commit_count)
{
DBUG_ENTER("ndb_get_commitcount");
if (ndb_cache_check_time > 0)
{
/* Use cached commit_count from share */
char name[FN_REFLEN];
NDB_SHARE *share;
(void)strxnmov(name, FN_REFLEN,
"./",dbname,"/",tabname,NullS);
DBUG_PRINT("info", ("name: %s", name));
pthread_mutex_lock(&ndbcluster_mutex);
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) name,
strlen(name))))
{
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(1);
}
*commit_count= share->commit_count;
DBUG_PRINT("info", ("commit_count: %d", *commit_count));
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
/* Get commit_count from NDB */
Ndb *ndb;
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(1);
ndb->setDatabaseName(dbname);
struct Ndb_statistics stat;
if (ndb_get_table_statistics(ndb, tabname, &stat))
DBUG_RETURN(1);
*commit_count= stat.commit_count;
DBUG_RETURN(0);
}
/*
Handling the shared NDB_SHARE structure that is needed to
Check if a cached query can be used.
This is done by comparing the supplied engine_data to commit_count of
the table.
The commit_count is either retrieved from the share for the table, where
it has been cached by the util thread. If the util thread is not started,
NDB has to be contacetd to retrieve the commit_count, this will introduce
a small delay while waiting for NDB to answer.
SYNOPSIS
ndbcluster_cache_retrieval_allowed
thd thread handle
full_name concatenation of database name,
the null character '\0', and the table
name
full_name_len length of the full name,
i.e. len(dbname) + len(tablename) + 1
engine_data parameter retrieved when query was first inserted into
the cache. If the value of engine_data is changed,
all queries for this table should be invalidated.
RETURN VALUE
TRUE Yes, use the query from cache
FALSE No, don't use the cached query, and if engine_data
has changed, all queries for this table should be invalidated
*/
static my_bool
ndbcluster_cache_retrieval_allowed(THD *thd,
char *full_name, uint full_name_len,
ulonglong *engine_data)
{
DBUG_ENTER("ndbcluster_cache_retrieval_allowed");
Uint64 commit_count;
bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
char *dbname= full_name;
char *tabname= dbname+strlen(dbname)+1;
DBUG_PRINT("enter",("dbname=%s, tabname=%s, autocommit=%d",
dbname, tabname, is_autocommit));
if (!is_autocommit)
DBUG_RETURN(FALSE);
if (ndb_get_commitcount(thd, dbname, tabname, &commit_count))
{
*engine_data+= 1; /* invalidate */
DBUG_RETURN(FALSE);
}
DBUG_PRINT("info", ("*engine_data=%llu, commit_count=%llu",
*engine_data, commit_count));
if (*engine_data != commit_count)
{
*engine_data= commit_count; /* invalidate */
DBUG_PRINT("exit",("Do not use cache, commit_count has changed"));
DBUG_RETURN(FALSE);
}
DBUG_PRINT("exit",("OK to use cache, *engine_data=%llu",*engine_data));
DBUG_RETURN(TRUE);
}
/**
Register a table for use in the query cache. Fetch the commit_count
for the table and return it in engine_data, this will later be used
to check if the table has changed, before the cached query is reused.
SYNOPSIS
ha_ndbcluster::can_query_cache_table
thd thread handle
full_name concatenation of database name,
the null character '\0', and the table
name
full_name_len length of the full name,
i.e. len(dbname) + len(tablename) + 1
qc_engine_callback function to be called before using cache on this table
engine_data out, commit_count for this table
RETURN VALUE
TRUE Yes, it's ok to cahce this query
FALSE No, don't cach the query
*/
my_bool
ha_ndbcluster::register_query_cache_table(THD *thd,
char *full_name, uint full_name_len,
qc_engine_callback *engine_callback,
ulonglong *engine_data)
{
DBUG_ENTER("ha_ndbcluster::register_query_cache_table");
bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
DBUG_PRINT("enter",("dbname=%s, tabname=%s, is_autocommit=%d",
m_dbname,m_tabname,is_autocommit));
if (!is_autocommit)
DBUG_RETURN(FALSE);
Uint64 commit_count;
if (ndb_get_commitcount(thd, m_dbname, m_tabname, &commit_count))
{
*engine_data= 0;
DBUG_PRINT("error", ("Could not get commitcount"))
DBUG_RETURN(FALSE);
}
*engine_data= commit_count;
*engine_callback= ndbcluster_cache_retrieval_allowed;
DBUG_PRINT("exit",("*engine_data=%llu", *engine_data));
DBUG_RETURN(TRUE);
}
/*
Handling the shared NDB_SHARE structure that is needed to
provide table locking.
It's also used for sharing data with other NDB handlers
in the same MySQL Server. There is currently not much
......@@ -4802,8 +5029,14 @@ static NDB_SHARE* get_share(const char *table_name)
}
thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
share->commit_count= 0;
}
}
DBUG_PRINT("share",
("table_name: %s, length: %d, use_count: %d, commit_count: %d",
share->table_name, share->table_name_length, share->use_count,
share->commit_count));
share->use_count++;
pthread_mutex_unlock(&ndbcluster_mutex);
return share;
......@@ -5372,6 +5605,165 @@ ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
DBUG_RETURN(0);
}
char*
ha_ndbcluster::update_table_comment(
/* out: table comment + additional */
const char* comment)/* in: table comment defined by user */
{
uint length= strlen(comment);
if(length > 64000 - 3)
{
return((char*)comment); /* string too long */
}
Ndb* ndb;
if (!(ndb= get_ndb()))
{
return((char*)comment);
}
ndb->setDatabaseName(m_dbname);
NDBDICT* dict= ndb->getDictionary();
const NDBTAB* tab;
if (!(tab= dict->getTable(m_tabname)))
{
return((char*)comment);
}
char *str;
const char *fmt="%s%snumber_of_replicas: %d";
const unsigned fmt_len_plus_extra= length + strlen(fmt);
if ((str= my_malloc(fmt_len_plus_extra, MYF(0))) == NULL)
{
return (char*)comment;
}
snprintf(str,fmt_len_plus_extra,fmt,comment,
length > 0 ? " ":"",
tab->getReplicaCount());
return str;
}
// Utility thread main loop
extern "C" pthread_handler_decl(ndb_util_thread_func,
arg __attribute__((unused)))
{
THD *thd; /* needs to be first for thread_stack */
int error= 0;
struct timespec abstime;
my_thread_init();
DBUG_ENTER("ndb_util_thread");
DBUG_PRINT("enter", ("ndb_cache_check_time: %d", ndb_cache_check_time));
thd= new THD; /* note that contructor of THD uses DBUG_ */
THD_CHECK_SENTRY(thd);
pthread_detach_this_thread();
ndb_util_thread= pthread_self();
thd->thread_stack= (char*)&thd; /* remember where our stack is */
if (thd->store_globals())
{
thd->cleanup();
delete thd;
DBUG_RETURN(NULL);
}
List<NDB_SHARE> util_open_tables;
set_timespec(abstime, ndb_cache_check_time);
for (;;)
{
pthread_mutex_lock(&LOCK_ndb_util_thread);
error= pthread_cond_timedwait(&COND_ndb_util_thread,
&LOCK_ndb_util_thread,
&abstime);
pthread_mutex_unlock(&LOCK_ndb_util_thread);
DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d",
ndb_cache_check_time));
if (abort_loop)
break; /* Shutting down server */
if (ndb_cache_check_time == 0)
{
set_timespec(abstime, 10);
continue;
}
/* Set new time to wake up */
set_timespec(abstime, ndb_cache_check_time);
/* Lock mutex and fill list with pointers to all open tables */
NDB_SHARE *share;
pthread_mutex_lock(&ndbcluster_mutex);
for (uint i= 0; i < ndbcluster_open_tables.records; i++)
{
share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
share->use_count++; /* Make sure the table can't be closed */
DBUG_PRINT("ndb_util_thread",
("Found open table[%d]: %s, use_count: %d",
i, share->table_name, share->use_count));
/* Store pointer to table */
util_open_tables.push_back(share);
}
pthread_mutex_unlock(&ndbcluster_mutex);
/* Iterate through the open files list */
List_iterator_fast<NDB_SHARE> it(util_open_tables);
while (share= it++)
{
/* Split tab- and dbname */
char buf[FN_REFLEN];
char *tabname, *db;
uint length= dirname_length(share->table_name);
tabname= share->table_name+length;
memcpy(buf, share->table_name, length-1);
buf[length-1]= 0;
db= buf+dirname_length(buf);
DBUG_PRINT("ndb_util_thread",
("Fetching commit count for: %s, db: %s, tab: %s",
share->table_name, db, tabname));
/* Contact NDB to get commit count for table */
g_ndb->setDatabaseName(db);
struct Ndb_statistics stat;;
if(ndb_get_table_statistics(g_ndb, tabname, &stat) == 0)
{
DBUG_PRINT("ndb_util_thread",
("Table: %s, rows: %llu, commit_count: %llu",
share->table_name, stat.row_count, stat.commit_count));
share->commit_count= stat.commit_count;
}
else
{
DBUG_PRINT("ndb_util_thread",
("Error: Could not get commit count for table %s",
share->table_name));
share->commit_count++; /* Invalidate */
}
/* Decrease the use count and possibly free share */
free_share(share);
}
/* Clear the list of open tables */
util_open_tables.empty();
}
thd->cleanup();
delete thd;
DBUG_PRINT("exit", ("ndb_util_thread"));
my_thread_end();
pthread_exit(0);
DBUG_RETURN(NULL);
}
/*
Condition pushdown
*/
......
......@@ -268,6 +268,9 @@ typedef struct st_table TABLE;
struct st_foreign_key_info;
typedef struct st_foreign_key_info FOREIGN_KEY_INFO;
/* Forward declaration for Condition Pushdown to Handler (CPDH) */
typedef struct Item COND;
typedef struct st_ha_check_opt
{
ulong sort_buffer_size;
......@@ -601,7 +604,6 @@ public:
*engine_callback= 0;
return 1;
}
/*
RETURN
true Primary key (if there is one) is clustered key covering all fields
......@@ -613,6 +615,12 @@ public:
{
return memcmp(ref1, ref2, ref_length);
}
/*
Condition pushdown to storage engines
*/
virtual const COND *cond_push(const COND *cond) { return cond; };
virtual void cond_pop() { return; };
};
/* Some extern variables used with handlers */
......
......@@ -408,6 +408,7 @@ struct system_variables
ulong table_type;
ulong tmp_table_size;
ulong tx_isolation;
ulong completion_type;
/* Determines which non-standard SQL behaviour should be enabled */
ulong sql_mode;
/* check of key presence in updatable view */
......@@ -431,6 +432,11 @@ struct system_variables
my_bool new_mode;
my_bool query_cache_wlock_invalidate;
my_bool engine_condition_pushdown;
#ifdef HAVE_REPLICATION
ulong sync_replication;
ulong sync_replication_slave_id;
ulong sync_replication_timeout;
#endif /* HAVE_REPLICATION */
#ifdef HAVE_INNOBASE_DB
my_bool innodb_table_locks;
#endif /* HAVE_INNOBASE_DB */
......@@ -1022,10 +1028,13 @@ public:
bool charset_is_system_charset, charset_is_collation_connection;
bool slow_command;
bool no_trans_update, abort_on_warning;
bool got_warning; /* Set on call to push_warning() */
longlong row_count_func; /* For the ROW_COUNT() function */
sp_rcontext *spcont; // SP runtime context
sp_cache *sp_proc_cache;
sp_cache *sp_func_cache;
bool shortcut_make_view; /* Don't do full mysql_make_view()
during pre-opening of tables. */
/*
If we do a purge of binary logs, log index info of the threads
......@@ -1510,9 +1519,11 @@ public:
select_max_min_finder_subselect(Item_subselect *item, bool mx)
:select_subselect(item), cache(0), fmax(mx)
{}
void cleanup();
bool send_data(List<Item> &items);
bool cmp_real();
bool cmp_int();
bool cmp_decimal();
bool cmp_str();
};
......@@ -1586,9 +1597,10 @@ class user_var_entry
ulong length, update_query_id, used_query_id;
Item_result type;
double val(my_bool *null_value);
double val_real(my_bool *null_value);
longlong val_int(my_bool *null_value);
String *val_str(my_bool *null_value, String *str, uint decimals);
my_decimal *val_decimal(my_bool *null_value, my_decimal *result);
DTCollation collation;
};
......@@ -1617,9 +1629,11 @@ public:
~Unique();
inline bool unique_add(void *ptr)
{
DBUG_ENTER("unique_add");
DBUG_PRINT("info", ("tree %u - %u", tree.elements_in_tree, max_elements));
if (tree.elements_in_tree > max_elements && flush())
return 1;
return !tree_insert(&tree, ptr, 0, tree.custom_arg);
DBUG_RETURN(1);
DBUG_RETURN(!tree_insert(&tree, ptr, 0, tree.custom_arg));
}
bool get(TABLE *table);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment