Commit 7099d8e6 authored by unknown's avatar unknown

Merge mysql.com:/home/jonas/src/mysql-4.1-fix

into mysql.com:/home/jonas/src/mysql-5.0-ndb


ndb/tools/restore/consumer_restore.cpp:
  Auto merged
parents ff51fe45 b181f610
......@@ -52,19 +52,10 @@ BackupRestore::init()
return false;
}
m_tuples = new TupleS[m_parallelism];
if (m_tuples == 0)
{
err << "Failed to allocate tuples" << endl;
return false;
}
m_free_callback= m_callback;
for (Uint32 i= 0; i < m_parallelism; i++) {
m_callback[i].restore= this;
m_callback[i].connection= 0;
m_callback[i].tup= &m_tuples[i];
if (i > 0)
m_callback[i-1].next= &(m_callback[i]);
}
......@@ -86,12 +77,6 @@ void BackupRestore::release()
delete [] m_callback;
m_callback= 0;
}
if (m_tuples)
{
delete [] m_tuples;
m_tuples= 0;
}
}
BackupRestore::~BackupRestore()
......@@ -118,14 +103,21 @@ BackupRestore::get_table(const NdbDictionary::Table* tab){
m_cache.m_old_table = tab;
int cnt, id1, id2;
char buf[256];
if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
BaseString::snprintf(buf, sizeof(buf), "NDB$BLOB_%d_%d", m_new_tables[id1]->getTableId(), id2);
m_cache.m_new_table = m_ndb->getDictionary()->getTable(buf);
char db[256], schema[256];
if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d",
db, schema, &id1, &id2)) == 4){
m_ndb->setDatabaseName(db);
m_ndb->setSchemaName(schema);
BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d",
m_new_tables[id1]->getTableId(), id2);
m_cache.m_new_table = m_ndb->getDictionary()->getTable(db);
} else {
m_cache.m_new_table = m_new_tables[tab->getTableId()];
}
assert(m_cache.m_new_table);
return m_cache.m_new_table;
}
......@@ -266,6 +258,14 @@ void BackupRestore::tuple(const TupleS & tup)
if (!m_restore)
return;
while (m_free_callback == 0)
{
assert(m_transactions == m_parallelism);
// send-poll all transactions
// close transaction is done in callback
m_ndb->sendPollNdb(3000, 1);
}
restore_callback_t * cb = m_free_callback;
if (cb == 0)
......@@ -273,15 +273,9 @@ void BackupRestore::tuple(const TupleS & tup)
m_free_callback = cb->next;
cb->retries = 0;
*(cb->tup) = tup; // must do copy!
cb->tup = tup; // must do copy!
tuple_a(cb);
if (m_free_callback == 0)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb->sendPollNdb(3000, 1);
}
}
void BackupRestore::tuple_a(restore_callback_t *cb)
......@@ -303,7 +297,7 @@ void BackupRestore::tuple_a(restore_callback_t *cb)
exitHandler();
} // if
const TupleS &tup = *(cb->tup);
const TupleS &tup = cb->tup;
const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable);
NdbOperation * op = cb->connection->getNdbOperation(table);
......@@ -372,7 +366,9 @@ void BackupRestore::tuple_a(restore_callback_t *cb)
m_transactions++;
return;
}
err << "Unable to recover from errors. Exiting..." << endl;
err << "Retried transaction " << cb->retries << " times.\nLast error"
<< m_ndb->getNdbError(cb->error_code) << endl
<< "...Unable to recover from errors. Exiting..." << endl;
exitHandler();
}
......@@ -416,7 +412,12 @@ bool BackupRestore::errorHandler(restore_callback_t *cb)
NdbError error= cb->connection->getNdbError();
m_ndb->closeTransaction(cb->connection);
cb->connection= 0;
Uint32 sleepTime = 100 + cb->retries * 300;
cb->retries++;
cb->error_code = error.code;
switch(error.status)
{
case NdbError::Success:
......@@ -425,7 +426,7 @@ bool BackupRestore::errorHandler(restore_callback_t *cb)
break;
case NdbError::TemporaryError:
NdbSleep_MilliSleep(10);
NdbSleep_MilliSleep(sleepTime);
return true;
// RETRY
break;
......@@ -438,15 +439,6 @@ bool BackupRestore::errorHandler(restore_callback_t *cb)
default:
case NdbError::PermanentError:
switch (error.code)
{
case 499:
case 250:
NdbSleep_MilliSleep(10);
return true; //temp errors?
default:
break;
}
//ERROR
err << error << endl;
return false;
......@@ -468,13 +460,10 @@ BackupRestore::tuple_free()
if (!m_restore)
return;
if (m_transactions > 0) {
// Send all transactions to NDB
m_ndb->sendPreparedTransactions(0);
// Poll all transactions
while (m_transactions > 0)
m_ndb->pollNdb(3000, m_transactions);
while (m_transactions)
{
m_ndb->sendPollNdb(3000);
}
}
......
......@@ -21,9 +21,10 @@
struct restore_callback_t {
class BackupRestore *restore;
class TupleS *tup;
class TupleS tup;
class NdbConnection *connection;
int retries;
int error_code;
restore_callback_t *next;
};
......@@ -39,7 +40,6 @@ public:
m_restore_meta = false;
m_parallelism = parallelism;
m_callback = 0;
m_tuples = 0;
m_free_callback = 0;
m_transactions = 0;
m_cache.m_old_table = 0;
......@@ -68,9 +68,8 @@ public:
Uint32 m_dataCount;
Uint32 m_parallelism;
Uint32 m_transactions;
volatile Uint32 m_transactions;
TupleS *m_tuples;
restore_callback_t *m_callback;
restore_callback_t *m_free_callback;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment