Commit 583d7761 authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new

parents bccd6a02 7412739f
......@@ -30,6 +30,7 @@ struct SubCreateReq {
friend bool printSUB_CREATE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
STATIC_CONST( SignalLength = 6 );
STATIC_CONST( SignalLength2 = 7 );
enum SubscriptionType {
SingleTableScan = 1, //
......@@ -50,6 +51,7 @@ struct SubCreateReq {
Uint32 subscriptionKey;
Uint32 subscriptionType;
Uint32 tableId;
Uint32 state;
};
struct SubCreateRef {
......
......@@ -1040,6 +1040,15 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
Subscription::REPORT_SUBSCRIBE : 0;
const Uint32 tableId = req.tableId;
Subscription::State state = (Subscription::State) req.state;
if (signal->getLength() != SubCreateReq::SignalLength2)
{
/*
api or restarted by older version
if restarted by old version, do the best we can
*/
state = Subscription::DEFINED;
}
Subscription key;
key.m_subscriptionId = subId;
......@@ -1067,6 +1076,17 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
addTableId(req.tableId, subPtr, 0);
}
} else {
if (c_startup.m_restart_server_node_id &&
refToNode(subRef) != c_startup.m_restart_server_node_id)
{
/**
* only allow "restart_server" Suma's to come through
* for restart purposes
*/
jam();
sendSubStartRef(signal, 1405);
DBUG_VOID_RETURN;
}
// Check that id/key is unique
if(c_subscriptions.find(subPtr, key)) {
jam();
......@@ -1090,7 +1110,7 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
subPtr.p->m_options = reportSubscribe | reportAll;
subPtr.p->m_tableId = tableId;
subPtr.p->m_table_ptrI = RNIL;
subPtr.p->m_state = Subscription::DEFINED;
subPtr.p->m_state = state;
subPtr.p->n_subscribers = 0;
subPtr.p->m_current_sync_ptrI = RNIL;
......@@ -1446,7 +1466,9 @@ Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbP
jam();
DBUG_ENTER("Suma::completeOneSubscriber");
if (tabPtr.p->m_error)
if (tabPtr.p->m_error &&
(c_startup.m_restart_server_node_id == 0 ||
tabPtr.p->m_state != Table::DROPPED))
{
sendSubStartRef(signal,subbPtr,tabPtr.p->m_error,
SubscriptionData::TableData);
......@@ -1531,8 +1553,44 @@ Suma::completeInitTable(Signal *signal, TablePtr tabPtr)
void
Suma::execGET_TABINFOREF(Signal* signal){
jamEntry();
/* ToDo handle this */
ndbrequire(false);
GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr();
Uint32 tableId = ref->tableId;
Uint32 senderData = ref->senderData;
GetTabInfoRef::ErrorCode errorCode =
(GetTabInfoRef::ErrorCode) ref->errorCode;
int do_resend_request = 0;
TablePtr tabPtr;
c_tablePool.getPtr(tabPtr, senderData);
switch (errorCode)
{
case GetTabInfoRef::TableNotDefined:
// wrong state
break;
case GetTabInfoRef::InvalidTableId:
// no such table
break;
case GetTabInfoRef::Busy:
do_resend_request = 1;
break;
case GetTabInfoRef::TableNameTooLong:
ndbrequire(false);
}
if (do_resend_request)
{
GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
req->senderRef = reference();
req->senderData = senderData;
req->requestType =
GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
req->tableId = tableId;
sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
30, GetTabInfoReq::SignalLength);
return;
}
tabPtr.p->m_state = Table::DROPPED;
tabPtr.p->m_error = errorCode;
completeAllSubscribers(signal, tabPtr);
completeInitTable(signal, tabPtr);
}
void
......@@ -2153,7 +2211,7 @@ Suma::execSUB_START_REQ(Signal* signal){
Subscription key;
key.m_subscriptionId = req->subscriptionId;
key.m_subscriptionKey = req->subscriptionKey;
if (c_startup.m_restart_server_node_id &&
refToNode(senderRef) != c_startup.m_restart_server_node_id)
{
......@@ -2173,13 +2231,24 @@ Suma::execSUB_START_REQ(Signal* signal){
DBUG_VOID_RETURN;
}
if (subPtr.p->m_state != Subscription::DEFINED) {
if (subPtr.p->m_state == Subscription::LOCKED) {
jam();
DBUG_PRINT("info",("Locked"));
sendSubStartRef(signal, 1411);
DBUG_VOID_RETURN;
}
if (subPtr.p->m_state == Subscription::DROPPED &&
c_startup.m_restart_server_node_id == 0) {
jam();
DBUG_PRINT("info",("Dropped"));
sendSubStartRef(signal, 1418);
DBUG_VOID_RETURN;
}
ndbrequire(subPtr.p->m_state == Subscription::DEFINED ||
c_startup.m_restart_server_node_id);
SubscriberPtr subbPtr;
if(!c_subscriberPool.seize(subbPtr)){
jam();
......@@ -2193,7 +2262,8 @@ Suma::execSUB_START_REQ(Signal* signal){
c_subscriber_nodes.set(refToNode(subscriberRef));
// setup subscription record
subPtr.p->m_state = Subscription::LOCKED;
if (subPtr.p->m_state == Subscription::DEFINED)
subPtr.p->m_state = Subscription::LOCKED;
// store these here for later use
subPtr.p->m_senderRef = senderRef;
subPtr.p->m_senderData = senderData;
......@@ -2241,8 +2311,14 @@ Suma::sendSubStartComplete(Signal* signal,
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
ndbrequire( subPtr.p->m_state == Subscription::LOCKED )
subPtr.p->m_state = Subscription::DEFINED;
ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
(subPtr.p->m_state == Subscription::DROPPED &&
c_startup.m_restart_server_node_id));
if (subPtr.p->m_state == Subscription::LOCKED)
{
jam();
subPtr.p->m_state = Subscription::DEFINED;
}
subPtr.p->n_subscribers++;
DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
......@@ -2293,8 +2369,14 @@ Suma::sendSubStartRef(Signal* signal,
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
ndbrequire( subPtr.p->m_state == Subscription::LOCKED );
subPtr.p->m_state = Subscription::DEFINED;
ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
(subPtr.p->m_state == Subscription::DROPPED &&
c_startup.m_restart_server_node_id));
if (subPtr.p->m_state == Subscription::LOCKED)
{
jam();
subPtr.p->m_state = Subscription::DEFINED;
}
SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend();
ref->senderRef = reference();
......@@ -2360,6 +2442,18 @@ Suma::execSUB_STOP_REQ(Signal* signal){
DBUG_VOID_RETURN;
}
if (c_startup.m_restart_server_node_id &&
refToNode(senderRef) != c_startup.m_restart_server_node_id)
{
/**
* only allow "restart_server" Suma's to come through
* for restart purposes
*/
jam();
sendSubStopRef(signal, 1405);
DBUG_VOID_RETURN;
}
if (subPtr.p->m_state == Subscription::LOCKED) {
jam();
DBUG_PRINT("error", ("locked"));
......@@ -3668,7 +3762,17 @@ Suma::execSUB_REMOVE_REQ(Signal* signal)
sendSubRemoveRef(signal, req, 1413);
DBUG_VOID_RETURN;
}
if (subPtr.p->m_state == Subscription::DROPPED)
{
/**
* already dropped
*/
jam();
sendSubRemoveRef(signal, req, 1419);
DBUG_VOID_RETURN;
}
ndbrequire(subPtr.p->m_state == Subscription::DEFINED);
DBUG_PRINT("info",("n_subscribers: %u", subPtr.p->n_subscribers));
if (subPtr.p->n_subscribers == 0)
......@@ -3981,8 +4085,9 @@ Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef)
case SubCreateReq::TableEvent:
jam();
req->tableId = subPtr.p->m_tableId;
req->state = subPtr.p->m_state;
suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
SubCreateReq::SignalLength, JBB);
SubCreateReq::SignalLength2, JBB);
DBUG_VOID_RETURN;
case SubCreateReq::SingleTableScan:
jam();
......
......@@ -475,6 +475,8 @@ ErrorBundle ErrorCodes[] = {
{ 1415, DMEC, SE, "Subscription not unique in subscriber manager" },
{ 1416, DMEC, IS, "Can't accept more subscriptions, out of space in pool" },
{ 1417, DMEC, SE, "Table in suscription not defined, probably dropped" },
{ 1418, DMEC, SE, "Subscription dropped, no new subscribers allowed" },
{ 1419, DMEC, SE, "Subscription already dropped" },
{ 4004, DMEC, AE, "Attribute name not found in the Table" },
......
......@@ -101,6 +101,40 @@ static int dropEvent(Ndb *pNdb, const NdbDictionary::Table &tab)
return NDBT_OK;
}
static
NdbEventOperation *createEventOperation(Ndb *ndb,
const NdbDictionary::Table &tab,
int do_report_error = 1)
{
char buf[1024];
sprintf(buf, "%s_EVENT", tab.getName());
NdbEventOperation *pOp= ndb->createEventOperation(buf);
if (pOp == 0)
{
if (do_report_error)
g_err << "createEventOperation: "
<< ndb->getNdbError().code << " "
<< ndb->getNdbError().message << endl;
return 0;
}
int n_columns= tab.getNoOfColumns();
for (int j = 0; j < n_columns; j++)
{
pOp->getValue(tab.getColumn(j)->getName());
pOp->getPreValue(tab.getColumn(j)->getName());
}
if ( pOp->execute() )
{
if (do_report_error)
g_err << "pOp->execute(): "
<< pOp->getNdbError().code << " "
<< pOp->getNdbError().message << endl;
ndb->dropEventOperation(pOp);
return 0;
}
return pOp;
}
static int runCreateEvent(NDBT_Context* ctx, NDBT_Step* step)
{
if (createEvent(GETNDB(step),* ctx->getTab()) != 0){
......@@ -870,7 +904,7 @@ static int createAllEvents(NDBT_Context* ctx, NDBT_Step* step)
static int dropAllEvents(NDBT_Context* ctx, NDBT_Step* step)
{
DBUG_ENTER("createAllEvents");
DBUG_ENTER("dropAllEvents");
Ndb * ndb= GETNDB(step);
int i;
......@@ -1212,6 +1246,18 @@ static int createEventOperations(Ndb * ndb)
DBUG_RETURN(NDBT_OK);
}
static int createAllEventOperations(NDBT_Context* ctx, NDBT_Step* step)
{
DBUG_ENTER("createAllEventOperations");
Ndb * ndb= GETNDB(step);
int r= createEventOperations(ndb);
if (r != NDBT_OK)
{
DBUG_RETURN(NDBT_FAILED);
}
DBUG_RETURN(NDBT_OK);
}
static int dropEventOperations(Ndb * ndb)
{
DBUG_ENTER("dropEventOperations");
......@@ -1228,6 +1274,18 @@ static int dropEventOperations(Ndb * ndb)
DBUG_RETURN(NDBT_OK);
}
static int dropAllEventOperations(NDBT_Context* ctx, NDBT_Step* step)
{
DBUG_ENTER("dropAllEventOperations");
Ndb * ndb= GETNDB(step);
int r= dropEventOperations(ndb);
if (r != NDBT_OK)
{
DBUG_RETURN(NDBT_FAILED);
}
DBUG_RETURN(NDBT_OK);
}
static int runMulti(NDBT_Context* ctx, NDBT_Step* step)
{
DBUG_ENTER("runMulti");
......@@ -1409,6 +1467,87 @@ static int runMulti_NR(NDBT_Context* ctx, NDBT_Step* step)
DBUG_RETURN(NDBT_OK);
}
static int restartAllNodes()
{
NdbRestarter restarter;
int id = 0;
do {
int nodeId = restarter.getDbNodeId(id++);
ndbout << "Restart node " << nodeId << endl;
if(restarter.restartOneDbNode(nodeId, false, false, true) != 0){
g_err << "Failed to restartNextDbNode" << endl;
break;
}
if(restarter.waitClusterStarted(60) != 0){
g_err << "Cluster failed to start" << endl;
break;
}
id = id % restarter.getNumDbNodes();
} while (id);
return id != 0;
}
static int runCreateDropNR(NDBT_Context* ctx, NDBT_Step* step)
{
DBUG_ENTER("runCreateDropNR");
Ndb * ndb= GETNDB(step);
int result = NDBT_OK;
NdbRestarter restarter;
int loops = ctx->getNumLoops();
if (restarter.getNumDbNodes() < 2)
{
ctx->stopTest();
return NDBT_OK;
}
do
{
result = NDBT_FAILED;
const NdbDictionary::Table* pTab = ctx->getTab();
if (createEvent(ndb, *pTab))
{
g_err << "createEvent failed" << endl;
break;
}
NdbEventOperation *pOp= createEventOperation(ndb, *pTab);
if (pOp == 0)
{
g_err << "Failed to createEventOperation" << endl;
break;
}
if (dropEvent(ndb, *pTab))
{
g_err << "Failed to dropEvent()" << endl;
break;
}
ndbout << "Restarting with dropped events with subscribers" << endl;
if (restartAllNodes())
break;
if (ndb->getDictionary()->dropTable(pTab->getName()) != 0){
g_err << "Failed to drop " << pTab->getName() <<" in db" << endl;
break;
}
ndbout << "Restarting with dropped events and dropped "
<< "table with subscribers" << endl;
if (restartAllNodes())
break;
if (ndb->dropEventOperation(pOp))
{
g_err << "Failed dropEventOperation" << endl;
break;
}
NdbDictionary::Table tmp(*pTab);
tmp.setNodeGroupIds(0, 0);
if (ndb->getDictionary()->createTable(tmp) != 0){
g_err << "createTable failed: "
<< ndb->getDictionary()->getNdbError() << endl;
break;
}
result = NDBT_OK;
} while (--loops);
DBUG_RETURN(result);
}
NDBT_TESTSUITE(test_event);
TESTCASE("BasicEventOperation",
......@@ -1492,6 +1631,11 @@ TESTCASE("Multi_NR",
FINALIZER(dropAllShadows);
FINALIZER(dropAllEvents);
}
TESTCASE("CreateDropNR",
"Verify that we can Create and Drop in any order"
"NOTE! No errors are allowed!" ){
FINALIZER(runCreateDropNR);
}
NDBT_TESTSUITE_END(test_event);
int main(int argc, const char** argv){
......
......@@ -218,6 +218,11 @@ max-time: 2500
cmd: test_event
args: -n Multi
#
max-time: 2500
cmd: test_event
args: -n CreateDropNR -l 2
max-time: 600
cmd: testBasic
args: -n PkRead T1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment