Commit bdbb091b authored by unknown's avatar unknown

Merged Thd_ndb


ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Auto merged
sql/ha_ndbcluster.h:
  Auto merged
sql/handler.cc:
  Auto merged
sql/sql_base.cc:
  Auto merged
sql/sql_table.cc:
  Auto merged
parents 7f1fcf66 877ba448
......@@ -241,6 +241,7 @@ enum ha_base_keytype {
#define HA_CREATE_TMP_TABLE 4
#define HA_CREATE_CHECKSUM 8
#define HA_CREATE_DELAY_KEY_WRITE 64
#define HA_CREATE_FROM_ENGINE 128
/* Bits in flag to _status */
......@@ -287,7 +288,7 @@ enum ha_base_keytype {
#define HA_ERR_ROW_IS_REFERENCED 152 /* Cannot delete a parent row */
#define HA_ERR_NO_SAVEPOINT 153 /* No savepoint with that name */
#define HA_ERR_NON_UNIQUE_BLOCK_SIZE 154 /* Non unique key block size */
#define HA_ERR_OLD_METADATA 155 /* The frm file on disk is old */
#define HA_ERR_NO_SUCH_TABLE 155 /* The table does not exist in engine */
#define HA_ERR_TABLE_EXIST 156 /* The table existed in storage engine */
#define HA_ERR_NO_CONNECTION 157 /* Could not connect to storage engine */
......
......@@ -496,6 +496,7 @@ if [ x$SOURCE_DIST = x1 ] ; then
CHARSETSDIR="$BASEDIR/sql/share/charsets"
INSTALL_DB="./install_test_db"
MYSQL_FIX_SYSTEM_TABLES="$BASEDIR/scripts/mysql_fix_privilege_tables"
NDB_TOOLS_DIR="$BASEDIR/ndb/tools"
else
if test -x "$BASEDIR/libexec/mysqld"
then
......@@ -515,6 +516,7 @@ else
MYSQL="$CLIENT_BINDIR/mysql"
INSTALL_DB="./install_test_db --bin"
MYSQL_FIX_SYSTEM_TABLES="$CLIENT_BINDIR/mysql_fix_privilege_tables"
NDB_TOOLS_DIR="$CLIENT_BINDIR"
if test -d "$BASEDIR/share/mysql/english"
then
LANGUAGE="$BASEDIR/share/mysql/english/"
......@@ -561,7 +563,8 @@ MYSQL_DUMP="$MYSQL_DUMP --no-defaults -uroot --socket=$MASTER_MYSOCK --password=
MYSQL_BINLOG="$MYSQL_BINLOG --no-defaults --local-load=$MYSQL_TMP_DIR $EXTRA_MYSQLBINLOG_OPT"
MYSQL_FIX_SYSTEM_TABLES="$MYSQL_FIX_SYSTEM_TABLES --no-defaults --host=localhost --port=$MASTER_MYPORT --socket=$MASTER_MYSOCK --user=root --password=$DBPASSWD --basedir=$BASEDIR --bindir=$CLIENT_BINDIR --verbose"
MYSQL="$MYSQL --host=localhost --port=$MASTER_MYPORT --socket=$MASTER_MYSOCK --user=root --password=$DBPASSWD"
export MYSQL MYSQL_DUMP MYSQL_BINLOG MYSQL_FIX_SYSTEM_TABLES CLIENT_BINDIR
export MYSQL MYSQL_DUMP MYSQL_BINLOG MYSQL_FIX_SYSTEM_TABLES CLIENT_BINDIR
export NDB_TOOLS_DIR
MYSQL_TEST_ARGS="--no-defaults --socket=$MASTER_MYSOCK --database=$DB \
--user=$DBUSER --password=$DBPASSWD --silent -v --skip-safemalloc \
......
drop table if exists t1,t2,t3,t4,t5,t6,t9;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t9;
flush status;
create table t1(
id int not null primary key,
......@@ -94,8 +94,6 @@ ERROR 42S01: Table 't3' already exists
show status like 'handler_discover%';
Variable_name Value
Handler_discover 1
SHOW TABLES FROM test;
Tables_in_test
create table IF NOT EXISTS t3(
id int not null primary key,
id2 int not null,
......@@ -119,6 +117,40 @@ Variable_name Value
Handler_discover 2
drop table t3;
flush status;
create table t7(
id int not null primary key,
name char(255)
) engine=ndb;
create table t6(
id int not null primary key,
name char(255)
) engine=MyISAM;
insert into t7 values (1, "Explorer");
insert into t6 values (2, "MyISAM table");
select * from t7;
id name
1 Explorer
show status like 'handler_discover%';
Variable_name Value
Handler_discover 0
flush tables;
show tables from test;
Tables_in_test
t6
t7
show status like 'handler_discover%';
Variable_name Value
Handler_discover 1
flush tables;
show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t6 MyISAM 9 Fixed 1 260 # # # 0 NULL # # NULL # NULL
t7 ndbcluster 9 Fixed 100 0 # # # 0 NULL # # NULL # NULL
show status like 'handler_discover%';
Variable_name Value
Handler_discover 2
drop table t6, t7;
flush status;
show status like 'handler_discover%';
Variable_name Value
Handler_discover 0
......
-- source include/have_ndb.inc
--disable_warnings
drop table if exists t1,t2,t3,t4,t5,t6,t9;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t9;
--enable_warnings
################################################
......@@ -122,7 +122,6 @@ create table t3(
# IF NOT EXISTS wasn't specified
show status like 'handler_discover%';
SHOW TABLES FROM test;
# now it should be discovered
create table IF NOT EXISTS t3(
......@@ -145,38 +144,76 @@ show status like 'handler_discover%';
drop table t3;
#######################################################
# Test that a table that already exists as frm file
# but not in NDB can be deleted from disk.
##################################################
# Test that a table that already exists in NDB
# is discovered when SHOW TABLES
# is used
#
# Manual test
#flush status;
#
#create table t4(
# id int not null primary key,
# name char(27)
#) engine=ndb;
#insert into t4 values (1, "Automatic");
#select * from t4;
flush status;
create table t7(
id int not null primary key,
name char(255)
) engine=ndb;
create table t6(
id int not null primary key,
name char(255)
) engine=MyISAM;
insert into t7 values (1, "Explorer");
insert into t6 values (2, "MyISAM table");
select * from t7;
show status like 'handler_discover%';
# Remove the frm file from disk
flush tables;
system rm var/master-data/test/t7.frm ;
show tables from test;
show status like 'handler_discover%';
# Remove the frm file from disk again
flush tables;
system rm var/master-data/test/t7.frm ;
--replace_column 7 # 8 # 9 # 12 # 13 # 15 #
show table status;
show status like 'handler_discover%';
drop table t6, t7;
#######################################################
# Test that a table that has been dropped from NDB
# but still exists on disk, get a consistent error message
# saying "No such table existed"
#
flush status;
create table t4(
id int not null primary key,
name char(27)
) engine=ndb;
insert into t4 values (1, "Automatic");
select * from t4;
# Remove the table from NDB
#system drop_tab -c "$NDB_CONNECTSTRING2" -d test t4 > /dev/null ;
#system drop_tab -c "host=localhost:2200;nodeid=5" -d test t4 > /dev/null ;
#
#--error 1296
#select * from t4;
#
#flush table t4;
#--error 1016
#select * from t4;
#
#show status like 'handler_discover%';
#drop table t4;
#flush tables;
#show tables;
#--error 1146
#select * from t4;
system exec $NDB_TOOLS_DIR/ndb_drop_table -d test t4;
system exec ../ndb/tools/ndb_show_tables > show_tables.log;
# Test that correct error is returned
--error 1146
select * from t4;
--error 1146
select * from t4;
show status like 'handler_discover%';
drop table t4;
show tables;
#########################################################
......@@ -241,7 +278,6 @@ show status like 'handler_discover%';
drop table t6;
######################################################
# Simple test to show use of discover on startup
# Note! This should always be the last step in this
# file, the table t9 will be used and dropped
# by ndb_autodiscover2
......@@ -259,8 +295,7 @@ system rm var/master-data/test/t9.frm ;
# Now leave test case, when ndb_autodiscover2 will run, this
# MySQL Server will have been restarted because it has a
# ndb_autodiscover2-master.opt file. And thus the table should
# have been discovered by the "discover on startup" function.
# ndb_autodiscover2-master.opt file.
#TODO
#SLECT * FROM t1, t2, t4;
......
-- source include/have_ndb.inc
#
# Simple test to show use of discover on startup
# Simple test to show use of discover when the server has been restarted
# The previous step has simply removed the frm file
# from disk, but left the table in NDB
#
--sleep 3;
select * from t9 order by a;
# handler_discover should be zero
# handler_discover should be 1
show status like 'handler_discover%';
drop table t9;
......@@ -1087,8 +1087,6 @@ public:
int waitUntilReady(int timeout = 60);
void connected(Uint32 block_reference);
/** @} *********************************************************************/
/**
......@@ -1447,6 +1445,9 @@ private:
void setup(Ndb_cluster_connection *ndb_cluster_connection,
const char* aCatalogName, const char* aSchemaName);
void connected(Uint32 block_reference);
NdbConnection* startTransactionLocal(Uint32 aPrio, Uint32 aFragmentId);
// Connect the connection object to the Database.
......
......@@ -868,6 +868,8 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
const int noerrcodes,
const int temporaryMask)
{
DBUG_ENTER("NdbDictInterface::dictSignal");
DBUG_PRINT("enter", ("useMasterNodeId: %d", useMasterNodeId));
for(Uint32 i = 0; i<RETRIES; i++){
//if (useMasterNodeId == 0)
m_buffer.clear();
......@@ -887,7 +889,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
if(aNodeId == 0){
m_error.code = 4009;
m_transporter->unlock_mutex();
return -1;
DBUG_RETURN(-1);
}
{
int r;
......@@ -923,7 +925,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
if(m_waiter.m_state == NO_WAIT && m_error.code == 0){
// Normal return
return 0;
DBUG_RETURN(0);
}
/**
......@@ -946,9 +948,9 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
continue;
}
return -1;
DBUG_RETURN(-1);
}
return -1;
DBUG_RETURN(-1);
}
/*****************************************************************
......
......@@ -942,6 +942,8 @@ TransporterFacade::isConnected(NodeId aNodeId){
NodeId
TransporterFacade::get_an_alive_node()
{
DBUG_ENTER("TransporterFacade::get_an_alive_node");
DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));
#ifdef VM_TRACE
const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0);
if (p != 0 && *p != 0)
......@@ -950,17 +952,19 @@ TransporterFacade::get_an_alive_node()
NodeId i;
for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
if (get_node_alive(i)){
DBUG_PRINT("info", ("Node %d is alive", i));
theStartNodeId = ((i + 1) % MAX_NDB_NODES);
return i;
DBUG_RETURN(i);
}
}
for (i = 1; i < theStartNodeId; i++) {
if (get_node_alive(i)){
DBUG_PRINT("info", ("Node %d is alive", i));
theStartNodeId = ((i + 1) % MAX_NDB_NODES);
return i;
DBUG_RETURN(i);
}
}
return (NodeId)0;
DBUG_RETURN((NodeId)0);
}
TransporterFacade::ThreadData::ThreadData(Uint32 size){
......
......@@ -61,6 +61,9 @@ void Ndb_cluster_connection::connect_thread()
printf("Ndb_cluster_connection::connect_thread error\n");
DBUG_ASSERT(false);
g_run_connect_thread= 0;
} else {
// Wait before making a new connect attempt
NdbSleep_SecSleep(1);
}
} while (g_run_connect_thread);
if (m_connect_callback)
......
......@@ -125,47 +125,4 @@ int writefrm(const char *name, const void *frmdata, uint len)
/*
Try to discover table from handler and
if found, write the frm file to disk.
RETURN VALUES:
0 : Table existed in handler and created
on disk if so requested
1 : Table does not exist
>1 : error
*/
int create_table_from_handler(const char *db,
const char *name,
bool create_if_found)
{
int error= 0;
const void* frmblob = NULL;
char path[FN_REFLEN];
uint frmlen = 0;
DBUG_ENTER("create_table_from_handler");
DBUG_PRINT("enter", ("create_if_found: %d", create_if_found));
if (ha_discover(db, name, &frmblob, &frmlen))
DBUG_RETURN(1); // Table does not exist
// Table exists in handler
if (create_if_found)
{
(void)strxnmov(path,FN_REFLEN,mysql_data_home,"/",db,"/",name,NullS);
// Save the frm file
error = writefrm(path, frmblob, frmlen);
}
if (frmblob)
my_free((char*) frmblob,MYF(0));
DBUG_RETURN(error);
}
int table_exists_in_handler(const char *db,
const char *name)
{
return (create_table_from_handler(db, name, false) == 0);
}
......@@ -32,7 +32,6 @@
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/NdbScanFilter.hpp>
#define USE_DISCOVER_ON_STARTUP
//#define USE_NDB_POOL
// Default value for parallelism
......@@ -48,11 +47,13 @@ static const ha_rows autoincrement_prefetch= 32;
// connectstring to cluster if given by mysqld
const char *ndbcluster_connectstring= 0;
static const char *ha_ndb_ext=".ndb";
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
#define ERR_PRINT(err) \
DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message))
DBUG_PRINT("error", ("%d message: %s", err.code, err.message))
#define ERR_RETURN(err) \
{ \
......@@ -107,7 +108,9 @@ static const err_code_mapping err_map[]=
{ 893, HA_ERR_FOUND_DUPP_UNIQUE },
{ 721, HA_ERR_TABLE_EXIST },
{ 4244, HA_ERR_TABLE_EXIST },
{ 241, HA_ERR_OLD_METADATA },
{ 709, HA_ERR_NO_SUCH_TABLE },
{ 284, HA_ERR_NO_SUCH_TABLE },
{ 266, HA_ERR_LOCK_WAIT_TIMEOUT },
{ 274, HA_ERR_LOCK_WAIT_TIMEOUT },
......@@ -268,6 +271,7 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans)
NDBDICT *dict= m_ndb->getDictionary();
DBUG_PRINT("info", ("invalidateTable %s", m_tabname));
dict->invalidateTable(m_tabname);
table->version=0L; /* Free when thread is ready */
break;
}
default:
......@@ -307,7 +311,8 @@ bool ha_ndbcluster::get_error_message(int error,
/*
Check if type is supported by NDB.
TODO Use this once, not in every operation
TODO Use this once in open(), not in every operation
*/
static inline bool ndb_supported_type(enum_field_types type)
......@@ -631,7 +636,7 @@ int ha_ndbcluster::get_metadata(const char *path)
DBUG_PRINT("error",
("Wrong number of columns, ndb: %d mysql: %d",
ndb_columns, mysql_columns));
DBUG_RETURN(HA_ERR_OLD_METADATA);
DBUG_RETURN(3);
}
/*
......@@ -655,7 +660,7 @@ int ha_ndbcluster::get_metadata(const char *path)
memcmp(pack_data, tab->getFrmData(), pack_length)));
DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
error= HA_ERR_OLD_METADATA;
error= 2;
}
my_free((char*)data, MYF(0));
my_free((char*)pack_data, MYF(0));
......@@ -2613,7 +2618,7 @@ int ha_ndbcluster::reset()
const char **ha_ndbcluster::bas_ext() const
{ static const char *ext[1]= { NullS }; return ext; }
{ static const char *ext[]= { ".ndb", NullS }; return ext; }
/*
......@@ -2799,18 +2804,26 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd->transaction.stmt.ndb_tid= 0;
}
}
if (m_active_trans)
DBUG_PRINT("warning", ("m_active_trans != NULL"));
/*
This is the place to make sure this handler instance
no longer are connected to the active transaction.
And since the handler is no longer part of the transaction
it can't have open cursors, ops or blobs pending.
*/
m_active_trans= NULL;
if (m_active_cursor)
DBUG_PRINT("warning", ("m_active_cursor != NULL"));
m_active_cursor= NULL;
if (blobs_pending)
DBUG_PRINT("warning", ("blobs_pending != 0"));
blobs_pending= 0;
if (ops_pending)
DBUG_PRINT("warning", ("ops_pending != 0L"));
m_active_trans= NULL;
m_active_cursor= NULL;
ops_pending= 0;
blobs_pending= 0;
}
DBUG_RETURN(error);
}
......@@ -3121,12 +3134,24 @@ int ha_ndbcluster::create(const char *name,
const void *data, *pack_data;
const char **key_names= form->keynames.type_names;
char name2[FN_HEADLEN];
bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE);
DBUG_ENTER("create");
DBUG_PRINT("enter", ("name: %s", name));
fn_format(name2, name, "", "",2); // Remove the .frm extension
set_dbname(name2);
set_tabname(name2);
set_tabname(name2);
if (create_from_engine)
{
/*
Table alreay exists in NDB and frm file has been created by
caller.
Do Ndb specific stuff, such as create a .ndb file
*/
my_errno= write_ndb_file();
DBUG_RETURN(my_errno);
}
DBUG_PRINT("table", ("name: %s", m_tabname));
tab.setName(m_tabname);
......@@ -3167,16 +3192,12 @@ int ha_ndbcluster::create(const char *name,
tab.addColumn(col);
}
my_errno= 0;
if (check_ndb_connection())
{
my_errno= HA_ERR_NO_CONNECTION;
if ((my_errno= check_ndb_connection()))
DBUG_RETURN(my_errno);
}
// Create the table in NDB
NDBDICT *dict= m_ndb->getDictionary();
if (dict->createTable(tab))
if (dict->createTable(tab) != 0)
{
const NdbError err= dict->getNdbError();
ERR_PRINT(err);
......@@ -3189,6 +3210,9 @@ int ha_ndbcluster::create(const char *name,
// Create secondary indexes
my_errno= build_index_list(form, ILBP_CREATE);
if (!my_errno)
my_errno= write_ndb_file();
DBUG_RETURN(my_errno);
}
......@@ -3265,14 +3289,16 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
set_tabname(from);
set_tabname(to, new_tabname);
if (check_ndb_connection()) {
my_errno= HA_ERR_NO_CONNECTION;
DBUG_RETURN(my_errno);
}
if (check_ndb_connection())
DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
int result= alter_table_name(m_tabname, new_tabname);
if (result == 0)
{
set_tabname(to);
handler::rename_table(from, to);
}
DBUG_RETURN(result);
}
......@@ -3316,6 +3342,8 @@ int ha_ndbcluster::delete_table(const char *name)
if (check_ndb_connection())
DBUG_RETURN(HA_ERR_NO_CONNECTION);
handler::delete_table(name);
DBUG_RETURN(drop_table());
}
......@@ -3483,7 +3511,7 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
free_share(m_share); m_share= 0;
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
DBUG_RETURN(get_metadata(name));
}
......@@ -3552,24 +3580,34 @@ void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
seize a Ndb object, assign it to current THD and use it.
Having a Ndb object also means that a connection to
NDB cluster has been opened. The connection is
checked.
NDB cluster has been opened.
*/
int ha_ndbcluster::check_ndb_connection()
Ndb* check_ndb_in_thd(THD* thd)
{
DBUG_ENTER("check_ndb_in_thd");
THD *thd= current_thd;
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
DBUG_ENTER("check_ndb_connection");
if (!thd_ndb)
{
thd_ndb= seize_thd_ndb();
if (!thd_ndb)
DBUG_RETURN(2);
DBUG_RETURN(NULL);
thd->transaction.thd_ndb= thd_ndb;
}
DBUG_RETURN(ndb);
}
int ha_ndbcluster::check_ndb_connection()
{
THD* thd= current_thd;
Ndb* ndb;
DBUG_ENTER("check_ndb_connection");
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
m_ndb= thd_ndb->ndb;
m_ndb->setDatabaseName(m_dbname);
DBUG_RETURN(0);
......@@ -3592,28 +3630,29 @@ void ndbcluster_close_connection(THD *thd)
Try to discover one table from NDB
*/
int ndbcluster_discover(const char *dbname, const char *name,
int ndbcluster_discover(THD* thd, const char *db, const char *name,
const void** frmblob, uint* frmlen)
{
uint len;
const void* data;
const NDBTAB* tab;
Ndb* ndb;
DBUG_ENTER("ndbcluster_discover");
DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
Ndb ndb(g_ndb_cluster_connection, dbname);
ndb.getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
if (ndb.init())
ERR_RETURN(ndb.getNdbError());
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
ndb->setDatabaseName(db);
if (ndb.waitUntilReady(0))
ERR_RETURN(ndb.getNdbError());
if (!(tab= ndb.getDictionary()->getTable(name)))
{
DBUG_PRINT("info", ("Table %s not found", name));
DBUG_RETURN(1);
NDBDICT* dict= ndb->getDictionary();
dict->invalidateTable(name);
if (!(tab= dict->getTable(name)))
{
const NdbError err= dict->getNdbError();
if (err.code == 709)
DBUG_RETURN(1);
ERR_RETURN(err);
}
DBUG_PRINT("info", ("Found table %s", tab->getName()));
......@@ -3636,40 +3675,79 @@ int ndbcluster_discover(const char *dbname, const char *name,
}
#ifdef USE_DISCOVER_ON_STARTUP
int ndbcluster_can_discover(THD *thd, const char *name)
{
DBUG_ENTER("ndbcluster_can_discover");
DBUG_RETURN(!my_strcasecmp(system_charset_info, fn_ext(name), ha_ndb_ext))
}
/*
Check if a table exists in NDB
*/
int ndbcluster_table_exists(THD* thd, const char *db, const char *name)
{
uint len;
const void* data;
const NDBTAB* tab;
Ndb* ndb;
DBUG_ENTER("ndbcluster_table_exists");
DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
ndb->setDatabaseName(db);
NDBDICT* dict= ndb->getDictionary();
dict->invalidateTable(name);
if (!(tab= dict->getTable(name)))
{
const NdbError err= dict->getNdbError();
if (err.code == 709)
DBUG_RETURN(0);
ERR_RETURN(err);
}
DBUG_PRINT("info", ("Found table %s", tab->getName()));
DBUG_RETURN(1);
}
/*
Dicover tables from NDB Cluster
- fetch a list of tables from NDB
- store the frm file for each table on disk
- if the table has an attached frm file
- if the database of the table exists
List tables in NDB Cluster
*/
int ndb_discover_tables()
int ndbcluster_list_tables(THD* thd, HASH *tables, const char* db)
{
uint i;
NdbDictionary::Dictionary::List list;
NdbDictionary::Dictionary* dict;
char path[FN_REFLEN];
DBUG_ENTER("ndb_discover_tables");
Ndb* ndb;
DBUG_ENTER("ndbcluster_list_tables");
DBUG_PRINT("enter", ("db: %s", db));
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
/* List tables in NDB Cluster kernel */
dict= g_ndb->getDictionary();
NDBDICT *dict= ndb->getDictionary();
if (dict->listObjects(list,
NdbDictionary::Object::UserTable) != 0)
ERR_RETURN(dict->getNdbError());
for (i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& t= list.elements[i];
DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
if (create_table_from_handler(t.database, t.name, true))
DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
if (strcmp(t.database, db) == 0)
{
DBUG_PRINT("info", ("my_hash_insert %s", t.name));
(void)my_hash_insert(tables, (byte*)thd->strdup(t.name));;
}
}
DBUG_RETURN(0);
}
#endif
/*
......@@ -3721,10 +3799,6 @@ bool ndbcluster_init()
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
if (ndb_discover_tables() != 0)
DBUG_RETURN(TRUE);
#endif
DBUG_RETURN(false);
}
......@@ -3738,7 +3812,6 @@ bool ndbcluster_init()
bool ndbcluster_end()
{
DBUG_ENTER("ndbcluster_end");
if(g_ndb)
delete g_ndb;
g_ndb= NULL;
......@@ -4109,4 +4182,30 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
DBUG_RETURN(-1);
}
/*
Create a .ndb file to serve as a placeholder indicating
that the table with this name is a ndb table
*/
int ha_ndbcluster::write_ndb_file()
{
File file;
bool error=1;
char path[FN_REFLEN];
DBUG_ENTER("write_ndb_file");
DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));
(void)strxnmov(path, FN_REFLEN,
mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);
if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
{
// It's an empty file
error=0;
my_close(file,MYF(0));
}
DBUG_RETURN(error);
}
#endif /* HAVE_NDBCLUSTER_DB */
......@@ -226,6 +226,8 @@ class ha_ndbcluster: public handler
int ndb_err(NdbConnection*);
bool uses_blob_value(bool all_fields);
int write_ndb_file();
private:
int check_ndb_connection();
......@@ -275,8 +277,11 @@ int ndbcluster_rollback(THD *thd, void* ndb_transaction);
void ndbcluster_close_connection(THD *thd);
int ndbcluster_discover(const char* dbname, const char* name,
int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
const void** frmblob, uint* frmlen);
int ndbcluster_can_discover(THD *thd, const char *name);
int ndbcluster_list_tables(THD* thd, HASH* tables, const char* db);
int ndbcluster_table_exists(THD* thd, const char *db, const char *name);
int ndbcluster_drop_database(const char* path);
void ndbcluster_print_error(int error, const NdbOperation *error_op);
......@@ -1119,6 +1119,16 @@ void handler::print_error(int error, myf errflag)
case HA_ERR_NO_REFERENCED_ROW:
textno=ER_NO_REFERENCED_ROW;
break;
case HA_ERR_NO_SUCH_TABLE:
{
char *db;
char buff[FN_REFLEN];
uint length=dirname_part(buff,table->path);
buff[length-1]=0;
db=buff+dirname_length(buff);
my_error(ER_NO_SUCH_TABLE,MYF(0),db,table->table_name);
break;
}
default:
{
/* The error was "unknown" to this function.
......@@ -1265,6 +1275,71 @@ int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
DBUG_RETURN(error != 0);
}
/*
Try to discover table from engine and
if found, write the frm file to disk.
RETURN VALUES:
0 : Table existed in engine and created
on disk if so requested
1 : Table does not exist
>1 : error
*/
int ha_create_table_from_engine(THD* thd,
const char *db,
const char *name,
bool create_if_found)
{
int error= 0;
const void* frmblob = NULL;
uint frmlen = 0;
char path[FN_REFLEN];
HA_CREATE_INFO create_info;
TABLE table;
DBUG_ENTER("ha_create_table_from_engine");
DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
DBUG_PRINT("enter", ("create_if_found: %d", create_if_found));
bzero((char*) &create_info,sizeof(create_info));
if ((error= ha_discover(thd, db, name, &frmblob, &frmlen)))
DBUG_RETURN(error);
// Table exists in handler
if (create_if_found)
{
(void)strxnmov(path,FN_REFLEN,mysql_data_home,"/",db,"/",name,NullS);
// Save the frm file
if ((error = writefrm(path, frmblob, frmlen)))
goto err_end;
if (openfrm(path,"",0,(uint) READ_ALL, 0, &table))
DBUG_RETURN(1);
update_create_info_from_table(&create_info, &table);
create_info.table_options|= HA_CREATE_FROM_ENGINE;
if (lower_case_table_names == 2 &&
!(table.file->table_flags() & HA_FILE_BASED))
{
/* Ensure that handler gets name in lower case */
strmov(path, name);
my_casedn_str(files_charset_info, path);
name= path;
}
error=table.file->create(path,&table,&create_info);
VOID(closefrm(&table));
}
err_end:
if (frmblob)
my_free((char*) frmblob,MYF(0));
DBUG_RETURN(error);
}
static int NEAR_F delete_file(const char *name,const char *ext,int extflag)
{
char buff[FN_REFLEN];
......@@ -1372,15 +1447,15 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache,
Try to discover one table from handler(s)
*/
int ha_discover(const char* dbname, const char* name,
const void** frmblob, uint* frmlen)
int ha_discover(THD* thd, const char* db, const char* name,
const void** frmblob, uint* frmlen)
{
int error= 1; // Table does not exist in any handler
DBUG_ENTER("ha_discover");
DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
#ifdef HAVE_NDBCLUSTER_DB
if (have_ndbcluster == SHOW_OPTION_YES)
error= ndbcluster_discover(dbname, name, frmblob, frmlen);
error= ndbcluster_discover(thd, db, name, frmblob, frmlen);
#endif
if (!error)
statistic_increment(ha_discover_count,&LOCK_status);
......@@ -1388,6 +1463,65 @@ int ha_discover(const char* dbname, const char* name,
}
/*
Ask handler if it would support discover of a file
with this name
RETURN
0 Does not recognise file
1 File can be discovered
*/
int ha_can_discover(THD* thd, const char* name)
{
int error= 0; // Can't discover this file name
DBUG_ENTER("ha_can_discover");
DBUG_PRINT("enter", ("name: %s", name));
#ifdef HAVE_NDBCLUSTER_DB
if (have_ndbcluster == SHOW_OPTION_YES)
error= ndbcluster_can_discover(thd, name);
#endif
DBUG_RETURN(error);
}
/*
Get a list of tables that exists in handler(s)
*/
int ha_list_tables(THD* thd, HASH *tables, const char* db)
{
int error= 0;
DBUG_ENTER("ha_list_tables");
DBUG_PRINT("enter", ("db: %s", db));
#ifdef HAVE_NDBCLUSTER_DB
if (have_ndbcluster == SHOW_OPTION_YES)
error= ndbcluster_list_tables(thd, tables, db);
#endif
DBUG_RETURN(error);
}
/*
Ask handler if the table exists in engine
RETURN
0 Table does not exist
1 Table exists
# Error code
*/
int ha_table_exists(THD* thd, const char* db, const char* name)
{
int error= 2;
DBUG_ENTER("ha_table_exists");
DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
#ifdef HAVE_NDBCLUSTER_DB
if (have_ndbcluster == SHOW_OPTION_YES)
error= ndbcluster_table_exists(thd, db, name);
#endif
DBUG_RETURN(error);
}
/*
Read first row between two ranges.
Store ranges for future calls to read_range_next
......
......@@ -540,6 +540,8 @@ void ha_close_connection(THD* thd);
enum db_type ha_checktype(enum db_type database_type);
int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
bool update_create_info);
int ha_create_table_from_engine(THD* thd, const char *db, const char *name,
bool create_if_found);
int ha_delete_table(enum db_type db_type, const char *path);
void ha_drop_database(char* path);
int ha_init_key_cache(const char *name, KEY_CACHE *key_cache);
......@@ -561,5 +563,9 @@ bool ha_flush_logs(void);
int ha_enable_transaction(THD *thd, bool on);
int ha_change_key_cache(KEY_CACHE *old_key_cache,
KEY_CACHE *new_key_cache);
int ha_discover(const char* dbname, const char* name,
int ha_discover(THD* thd, const char* dbname, const char* name,
const void** frmblob, uint* frmlen);
int ha_list_tables(THD* thd, HASH *tables, const char* db);
int ha_table_exists(THD* thd, const char* db, const char* name);
int ha_can_discover(THD* thd, const char* name);
......@@ -1003,8 +1003,6 @@ int openfrm(const char *name,const char *alias,uint filestat,uint prgflag,
uint ha_open_flags, TABLE *outparam);
int readfrm(const char *name, const void** data, uint* length);
int writefrm(const char* name, const void* data, uint len);
int create_table_from_handler(const char *db, const char *name,
bool create_if_found);
int closefrm(TABLE *table);
db_type get_table_type(const char *name);
int read_string(File file, gptr *to, uint length);
......
......@@ -1343,7 +1343,7 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db,
*/
if (discover_retry_count++ != 0)
goto err;
if (create_table_from_handler(db, name, true) != 0)
if (ha_create_table_from_engine(thd, db, name, true) != 0)
goto err;
thd->clear_error(); // Clear error message
......
......@@ -358,30 +358,59 @@ int mysqld_show_column_types(THD *thd)
}
/*
Ask all engines if they can provide a list of available tables.
Returns a list of available tables.
*/
int
mysql_discover_tables(THD *thd, HASH *ha_tables, const char *db, bool dir)
{
DBUG_ENTER("mysql_discover_files");
if (dir)
DBUG_RETURN(0); // Discover of directories(databases) not supported yet
// Get list of files in storage engine
if (ha_list_tables(thd, ha_tables, db))
DBUG_RETURN(-1);
DBUG_PRINT("info",("discovered: %d files", ha_tables->records));
DBUG_RETURN(0);
}
/*
List all files or directories in a given location
Returns
files - list of files where wild card has been applied
all_files - list of all files
dsc_files - list of files which are discoverable
*/
int
mysql_find_files(THD *thd,List<char> *files, const char *db,const char *path,
const char *wild, bool dir)
mysql_list_files(THD *thd, const char *db, const char *path, const char *wild,
bool dir, List<char> *files, HASH *all_files, HASH* dsc_files)
{
uint i;
char *ext;
char *ext, **dsc_ext;
MY_DIR *dirp;
FILEINFO *file;
#ifndef NO_EMBEDDED_ACCESS_CHECKS
uint col_access=thd->col_access;
#endif
TABLE_LIST table_list;
DBUG_ENTER("mysql_find_files");
DBUG_ENTER("mysql_list_files");
if (wild && !wild[0])
wild=0;
bzero((char*) &table_list,sizeof(table_list));
if (!(dirp = my_dir(path,MYF(MY_WME | (dir ? MY_WANT_STAT : 0)))))
DBUG_RETURN(-1);
for (i=0 ; i < (uint) dirp->number_off_files ; i++)
for (i= 0; i < (uint)dirp->number_off_files; i++)
{
file=dirp->dir_entry+i;
file= dirp->dir_entry+i;
if (dir)
{ /* Return databases */
#ifdef USE_SYMDIR
......@@ -391,7 +420,7 @@ mysql_find_files(THD *thd,List<char> *files, const char *db,const char *path,
/* Only show the sym file if it points to a directory */
char buff[FN_REFLEN], *end;
MY_STAT status;
*ext=0; /* Remove extension */
*ext= 0; /* Remove extension */
unpack_dirname(buff, file->name);
end= strend(buff);
if (end != buff && end[-1] == FN_LIBCHAR)
......@@ -410,11 +439,36 @@ mysql_find_files(THD *thd,List<char> *files, const char *db,const char *path,
}
else
{
// Return only .frm files which aren't temp files.
if (my_strcasecmp(system_charset_info, ext=fn_ext(file->name),reg_ext) ||
is_prefix(file->name,tmp_file_prefix))
continue;
*ext=0;
// Don't process temp files
if (is_prefix(file->name, tmp_file_prefix))
continue;
ext= fn_ext(file->name);
// Check for files that indicates the table can be discovered
if (ha_can_discover(thd, file->name))
{
DBUG_PRINT("info", ("Discoverable file found: %s", file->name));
*ext= 0;
if (my_hash_insert(dsc_files, (byte*)thd->strdup(file->name)))
{
my_dirend(dirp);
DBUG_RETURN(-1);
}
continue;
}
// Return only .frm files
if (my_strcasecmp(system_charset_info, ext,reg_ext))
continue;
*ext=0;
// Insert into list of all .frm files
if (my_hash_insert(all_files, (byte*)thd->strdup(file->name)))
{
my_dirend(dirp);
DBUG_RETURN(-1);
}
if (wild)
{
if (lower_case_table_names)
......@@ -443,11 +497,114 @@ mysql_find_files(THD *thd,List<char> *files, const char *db,const char *path,
DBUG_RETURN(-1);
}
}
DBUG_PRINT("info",("found: %d files", files->elements));
my_dirend(dirp);
DBUG_RETURN(0);
}
extern "C" byte* ha_tables_get_key(const char *entry, uint *length,
my_bool not_used __attribute__((unused)))
{
*length= strlen(entry);
return (byte*) entry;
}
int
mysql_find_files(THD *thd,List<char> *files, const char *db,
const char *path, const char *wild, bool dir)
{
int error= -1;
uint i;
bool discovery_performed= false;
DBUG_ENTER("mysql_find_files");
DBUG_PRINT("enter", ("db: %s, path: %s, wild: %s, dir: %d",
db, path, wild, dir));
if (wild && !wild[0])
wild=0;
HASH ha_tables, all_files, dsc_files;
if (hash_init(&ha_tables,system_charset_info,32,0,0,
(hash_get_key) ha_tables_get_key,0,0) ||
hash_init(&all_files,system_charset_info,32,0,0,
(hash_get_key) ha_tables_get_key,0,0) ||
hash_init(&dsc_files,system_charset_info,32,0,0,
(hash_get_key) ha_tables_get_key,0,0))
goto err_end;
if (mysql_discover_tables(thd, &ha_tables, db, dir))
goto err_end;
if (mysql_list_files(thd, db, path, wild, dir,
files, &all_files, &dsc_files))
goto err_end;
/*
Discovery part 1
Loop through handler files and see if any of them should be discovered
*/
for (i= 0; i < ha_tables.records; i++)
{
const char *name = hash_element(&ha_tables, i);
if (hash_search(&all_files, name, strlen(name)))
continue;
// Table was in handler, but not in list of all tables
DBUG_PRINT("info", ("Table to discover[%d]: %s", i, name));
pthread_mutex_lock(&LOCK_open);
ha_create_table_from_engine(thd, db, name, true);
pthread_mutex_unlock(&LOCK_open);
discovery_performed= true;
}
/*
Discovery part2
Loop through dsc files and see if any of them need to be deleted
*/
for (i= 0; i < dsc_files.records; i++)
{
const char *name = hash_element(&dsc_files, i);
if (hash_search(&ha_tables, name, strlen(name)))
continue;
// Table was only on disk and not in handler
DBUG_PRINT("info", ("Table[%d]: %s only exists on disk", i, name));
// Verify that handler agrees table is gone.
if (ha_table_exists(thd, db, name) == 0)
{
// Delete the table and all related files
TABLE_LIST table_list;
bzero((char*) &table_list,sizeof(table_list));
table_list.db= (char*) db;
table_list.real_name=(char*)name;
(void)mysql_rm_table_part2_with_lock(thd, &table_list,
/* if_exists */ true,
/* drop_temporary */ false,
/* dont_log_query*/ true);
discovery_performed= true;
}
}
if (discovery_performed)
{
// Call mysql_list_files one more time to get an updated list
DBUG_PRINT("info", ("Calling mysql_list_files one more time"));
files->empty();
if (mysql_list_files(thd, db, path, wild, dir,
files, &all_files, &dsc_files))
goto err_end;
}
DBUG_PRINT("info",("found: %d files", files->elements));
error = 0;
err_end:
hash_free(&ha_tables);
hash_free(&all_files);
hash_free(&dsc_files);
DBUG_RETURN(error);
}
/***************************************************************************
Extended version of mysqld_show_tables
......
......@@ -222,7 +222,8 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists,
strxmov(path, mysql_data_home, "/", db, "/", alias, reg_ext, NullS);
(void) unpack_filename(path,path);
}
if (drop_temporary || access(path,F_OK))
if (drop_temporary ||
(access(path,F_OK) && ha_create_table_from_engine(thd,db,alias,true)))
{
if (if_exists)
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
......@@ -1243,8 +1244,8 @@ int mysql_create_table(THD *thd,const char *db, const char *table_name,
{
bool create_if_not_exists =
create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
if (!create_table_from_handler(db, table_name,
create_if_not_exists))
if (!ha_create_table_from_engine(thd, db, table_name,
create_if_not_exists))
{
DBUG_PRINT("info", ("Table already existed in handler"));
......
......@@ -705,6 +705,14 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
outparam->crashed=((err == HA_ERR_CRASHED_ON_USAGE) &&
outparam->file->auto_repair() &&
!(ha_open_flags & HA_OPEN_FOR_REPAIR));
if (err==HA_ERR_NO_SUCH_TABLE)
{
/* The table did not exists in storage engine, use same error message
as if the .frm file didn't exist */
error= 1;
my_errno= ENOENT;
}
goto err_not_open; /* purecov: inspected */
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment