Commit cfad8ecc authored by tomas@poseidon.(none)'s avatar tomas@poseidon.(none)

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into poseidon.(none):/home/tomas/mysql-4.1-ndb
parents 1cc60149 77cd417c
......@@ -5,6 +5,7 @@
# The previous step has simply removed the frm file
# from disk, but left the table in NDB
#
--sleep 3;
select * from t9 order by a;
# handler_discover should be zero
......
......@@ -246,6 +246,7 @@ Ndb::waitUntilReady(int timeout)
int secondsCounter = 0;
int milliCounter = 0;
int noChecksSinceFirstAliveFound = 0;
int id;
if (theInitState != Initialised) {
// Ndb::init is not called
......@@ -254,39 +255,48 @@ Ndb::waitUntilReady(int timeout)
}
do {
unsigned int foundAliveNode = 0;
TransporterFacade *tp = TransporterFacade::instance();
tp->lock_mutex();
for (unsigned int i = 0; i < theNoOfDBnodes; i++) {
const NodeId nodeId = theDBnodes[i];
//************************************************
// If any node is answering, ndb is answering
//************************************************
if (tp->get_node_alive(nodeId) != 0) {
foundAliveNode++;
if ((id = theNode) != 0) {
unsigned int foundAliveNode = 0;
TransporterFacade *tp = TransporterFacade::instance();
tp->lock_mutex();
for (unsigned int i = 0; i < theNoOfDBnodes; i++) {
const NodeId nodeId = theDBnodes[i];
//************************************************
// If any node is answering, ndb is answering
//************************************************
if (tp->get_node_alive(nodeId) != 0) {
foundAliveNode++;
}//if
}//for
tp->unlock_mutex();
if (foundAliveNode == theNoOfDBnodes) {
DBUG_RETURN(0);
}//if
}//for
tp->unlock_mutex();
if (foundAliveNode == theNoOfDBnodes) {
DBUG_RETURN(0);
}//if
if (foundAliveNode > 0) {
noChecksSinceFirstAliveFound++;
}//if
if (noChecksSinceFirstAliveFound > 30) {
DBUG_RETURN(0);
}//if
if (foundAliveNode > 0) {
noChecksSinceFirstAliveFound++;
}//if
if (noChecksSinceFirstAliveFound > 30) {
DBUG_RETURN(0);
}//if
}//if theNode != 0
if (secondsCounter >= timeout)
break;
NdbSleep_MilliSleep(100);
milliCounter += 100;
if (milliCounter >= 1000) {
secondsCounter++;
milliCounter = 0;
}//if
} while ( secondsCounter < timeout );
} while (1);
if (id == 0) {
theError.code = 4269;
DBUG_RETURN(-1);
}
if (noChecksSinceFirstAliveFound > 0) {
DBUG_RETURN(0);
}//if
theError.code = 4009;
DBUG_RETURN(-1);
}
......@@ -789,8 +799,10 @@ Ndb::readAutoIncrementValue(const char* aTableName)
{
DEBUG_TRACE("readtAutoIncrementValue");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
if (table == 0) {
theError= theDictionary->getNdbError();
return ~0;
}
Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
return tupleId;
}
......@@ -821,8 +833,10 @@ Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
{
DEBUG_TRACE("setAutoIncrementValue " << val);
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
if (table == 0) {
theError= theDictionary->getNdbError();
return false;
}
return setTupleIdInNdb(table->m_tableId, val, increase);
}
......@@ -841,8 +855,10 @@ Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
{
DEBUG_TRACE("setTupleIdInNdb");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
if (table == 0) {
theError= theDictionary->getNdbError();
return false;
}
return setTupleIdInNdb(table->m_tableId, val, increase);
}
......
......@@ -1500,8 +1500,11 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
if (!alter && haveAutoIncrement) {
// if (!ndb.setAutoIncrementValue(impl.m_internalName.c_str(), autoIncrementValue)) {
if (!ndb.setAutoIncrementValue(impl.m_externalName.c_str(), autoIncrementValue)) {
m_error.code = 4336;
ndb.theError = m_error;
if (ndb.theError.code == 0) {
m_error.code = 4336;
ndb.theError = m_error;
} else
m_error= ndb.theError;
ret = -1; // errorcode set in initialize_autoincrement
}
}
......
......@@ -185,10 +185,10 @@ Ndb::executeMessage(void* NdbObject,
void Ndb::connected(Uint32 ref)
{
theMyRef= ref;
theNode= refToNode(ref);
Uint32 tmpTheNode= refToNode(ref);
Uint64 tBlockNo= refToBlock(ref);
if (theNdbBlockNumber >= 0){
assert(theMyRef == numberToRef(theNdbBlockNumber, theNode));
assert(theMyRef == numberToRef(theNdbBlockNumber, tmpTheNode));
}
TransporterFacade * theFacade = TransporterFacade::instance();
......@@ -201,18 +201,19 @@ void Ndb::connected(Uint32 ref)
}
}
theFirstTransId = ((Uint64)tBlockNo << 52)+
((Uint64)theNode << 40);
((Uint64)tmpTheNode << 40);
theFirstTransId += theFacade->m_max_trans_id;
// assert(0);
DBUG_PRINT("info",("connected with ref=%x, id=%d, no_db_nodes=%d, first_trans_id=%lx",
theMyRef,
theNode,
tmpTheNode,
theNoOfDBnodes,
theFirstTransId));
startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes);
theCommitAckSignal = new NdbApiSignal(theMyRef);
theDictionary->m_receiver.m_reference= theMyRef;
theNode= tmpTheNode; // flag that Ndb object is initialized
}
void
......
......@@ -125,7 +125,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theNode= 0;
theFirstTransId= 0;
theMyRef= 0;
theNoOfDBnodes= 0;
fullyQualifiedNames = true;
......
......@@ -54,14 +54,15 @@ void Ndb_cluster_connection::connect_thread()
{
DBUG_ENTER("Ndb_cluster_connection::connect_thread");
int r;
while (g_run_connect_thread) {
do {
if ((r = connect(1)) == 0)
break;
if (r == -1) {
printf("Ndb_cluster_connection::connect_thread error\n");
abort();
DBUG_ASSERT(false);
g_run_connect_thread= 0;
}
}
} while (g_run_connect_thread);
if (m_connect_callback)
(*m_connect_callback)();
DBUG_VOID_RETURN;
......@@ -69,13 +70,19 @@ void Ndb_cluster_connection::connect_thread()
int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
{
int r;
DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
m_connect_callback= connect_callback;
m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread,
(void**)this,
32768,
"ndb_cluster_connection",
NDB_THREAD_PRIO_LOW);
if ((r = connect(1)) == 1)
m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread,
(void**)this,
32768,
"ndb_cluster_connection",
NDB_THREAD_PRIO_LOW);
else if (r < 0)
DBUG_RETURN(-1)
else if (m_connect_callback)
(*m_connect_callback)();
DBUG_RETURN(0);
}
......
......@@ -424,7 +424,8 @@ ErrorBundle ErrorCodes[] = {
{ 4266, AE, "Invalid blob seek position" },
{ 4267, IE, "Corrupted blob value" },
{ 4268, IE, "Error in blob head update forced rollback of transaction" },
{ 4268, IE, "Unknown blob error" }
{ 4268, IE, "Unknown blob error" },
{ 4269, IE, "No connection to ndb management server" }
};
static
......
......@@ -69,6 +69,7 @@ typedef NdbDictionary::Dictionary NDBDICT;
bool ndbcluster_inited= false;
static Ndb* g_ndb= NULL;
static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
// Handler synchronization
pthread_mutex_t ndbcluster_mutex;
......@@ -3183,12 +3184,12 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_active_trans(NULL),
m_active_cursor(NULL),
m_ndb(NULL),
m_share(0),
m_table(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NULL_IN_KEY |
HA_NOT_EXACT_COUNT |
HA_NO_PREFIX_CHAR_KEYS),
m_share(0),
m_use_write(false),
retrieve_all_fields(FALSE),
rows_to_insert(1),
......@@ -3196,8 +3197,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
bulk_insert_rows(1024),
bulk_insert_not_flushed(false),
ops_pending(0),
blobs_pending(0),
skip_auto_increment(true),
blobs_pending(0),
blobs_buffer(0),
blobs_buffer_size(0),
dupkey((uint) -1)
......@@ -3311,7 +3312,7 @@ Ndb* ha_ndbcluster::seize_ndb()
// Seize from pool
ndb= Ndb::seize();
#else
ndb= new Ndb("");
ndb= new Ndb(g_ndb_cluster_connection, "");
#endif
if (ndb->init(max_transactions) != 0)
{
......@@ -3395,8 +3396,12 @@ int ndbcluster_discover(const char *dbname, const char *name,
DBUG_ENTER("ndbcluster_discover");
DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
Ndb ndb(dbname);
if ((ndb.init() != 0) && (ndb.waitUntilReady() != 0))
Ndb ndb(g_ndb_cluster_connection, dbname);
if (ndb.init())
ERR_RETURN(ndb.getNdbError());
if (ndb.waitUntilReady(0))
ERR_RETURN(ndb.getNdbError());
if (!(tab= ndb.getDictionary()->getTable(name)))
......@@ -3471,21 +3476,24 @@ bool ndbcluster_init()
DBUG_ENTER("ndbcluster_init");
// Set connectstring if specified
if (ndbcluster_connectstring != 0)
{
DBUG_PRINT("connectstring", ("%s", ndbcluster_connectstring));
Ndb::setConnectString(ndbcluster_connectstring);
if ((g_ndb_cluster_connection=
new Ndb_cluster_connection(ndbcluster_connectstring)) == 0)
{
DBUG_PRINT("error",("Ndb_cluster_connection(%s)",ndbcluster_connectstring));
DBUG_RETURN(TRUE);
}
// Create a Ndb object to open the connection to NDB
g_ndb= new Ndb("sys");
if (g_ndb->init() != 0)
if (g_ndb_cluster_connection->start_connect_thread())
{
ERR_PRINT (g_ndb->getNdbError());
DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
DBUG_RETURN(TRUE);
}
if (g_ndb->waitUntilReady() != 0)
// Create a Ndb object to open the connection to NDB
g_ndb= new Ndb(g_ndb_cluster_connection, "sys");
if (g_ndb->init() != 0)
{
ERR_PRINT (g_ndb->getNdbError());
DBUG_RETURN(TRUE);
DBUG_RETURN(TRUE);
}
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
......@@ -3511,6 +3519,9 @@ bool ndbcluster_end()
delete g_ndb;
g_ndb= NULL;
if (g_ndb_cluster_connection)
delete g_ndb_cluster_connection;
g_ndb_cluster_connection= NULL;
if (!ndbcluster_inited)
DBUG_RETURN(0);
hash_free(&ndbcluster_open_tables);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment