Commit 34d0ac17 authored by stewart@willster.(none)'s avatar stewart@willster.(none)

Merge ssmith@bk-internal.mysql.com:/home/bk/mysql-5.1-new-ndb

into  willster.(none):/home/stewart/Documents/MySQL/5.1/ndb
parents 10ff2f95 c306b457
......@@ -35,7 +35,7 @@ create table t3(a int, KEY a_data (a));
create table mysqltest.t4(a int);
create table t5 (id int auto_increment primary key);
insert into t5 values (10);
create view v1 (c) as select table_name from information_schema.TABLES;
create view v1 (c) as select table_name from information_schema.TABLES where table_schema!='cluster';
select * from v1;
c
CHARACTER_SETS
......@@ -61,7 +61,6 @@ TABLE_PRIVILEGES
TRIGGERS
USER_PRIVILEGES
VIEWS
binlog_index
columns_priv
db
event
......@@ -744,11 +743,12 @@ create view v1 as select * from t1, t2;
set @got_val= (select count(*) from information_schema.columns);
drop view v1;
drop table t1, t2;
use test;
CREATE TABLE t_crashme ( f1 BIGINT);
CREATE VIEW a1 (t_CRASHME) AS SELECT f1 FROM t_crashme GROUP BY f1;
CREATE VIEW a2 AS SELECT t_CRASHME FROM a1;
count(*)
113
68
drop view a2, a1;
drop table t_crashme;
select table_schema,table_name, column_name from
......@@ -845,9 +845,8 @@ VIEWS TABLE_NAME select
delete from mysql.user where user='mysqltest_4';
delete from mysql.db where user='mysqltest_4';
flush privileges;
SELECT table_schema, count(*) FROM information_schema.TABLES GROUP BY TABLE_SCHEMA;
SELECT table_schema, count(*) FROM information_schema.TABLES where TABLE_SCHEMA!='cluster' GROUP BY TABLE_SCHEMA;
table_schema count(*)
cluster 1
information_schema 23
mysql 21
create table t1 (i int, j int);
......
......@@ -37,7 +37,7 @@ create table t3(a int, KEY a_data (a));
create table mysqltest.t4(a int);
create table t5 (id int auto_increment primary key);
insert into t5 values (10);
create view v1 (c) as select table_name from information_schema.TABLES;
create view v1 (c) as select table_name from information_schema.TABLES where table_schema!='cluster';
select * from v1;
select c,table_name from v1
......@@ -448,7 +448,7 @@ drop table t1, t2;
#
# Bug #7476: crash on SELECT * FROM INFORMATION_SCHEMA.TABLES
#
use test;
CREATE TABLE t_crashme ( f1 BIGINT);
CREATE VIEW a1 (t_CRASHME) AS SELECT f1 FROM t_crashme GROUP BY f1;
CREATE VIEW a2 AS SELECT t_CRASHME FROM a1;
......@@ -462,7 +462,7 @@ while ($tab_count)
--disable_result_log
SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES;
--enable_result_log
SELECT count(*) FROM INFORMATION_SCHEMA.TABLES;
SELECT count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='test';
let $tab_count= 65;
while ($tab_count)
{
......@@ -528,7 +528,7 @@ flush privileges;
# Bug #9404 information_schema: Weird error messages
# with SELECT SUM() ... GROUP BY queries
#
SELECT table_schema, count(*) FROM information_schema.TABLES GROUP BY TABLE_SCHEMA;
SELECT table_schema, count(*) FROM information_schema.TABLES where TABLE_SCHEMA!='cluster' GROUP BY TABLE_SCHEMA;
#
......
......@@ -229,14 +229,23 @@ Backup::execCONTINUEB(Signal* signal)
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, ptr_I);
if (tabPtr_I == RNIL)
{
closeFiles(signal, ptr);
return;
}
jam();
TablePtr tabPtr;
ptr.p->tables.getPtr(tabPtr, tabPtr_I);
jam();
if(tabPtr.p->fragments.getSize())
{
FragmentPtr fragPtr;
tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I);
BackupFilePtr filePtr;
ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2;
Uint32 * dst;
if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz))
......@@ -259,6 +268,8 @@ Backup::execCONTINUEB(Signal* signal)
filePtr.p->operation.dataBuffer.updateWritePtr(sz);
fragPtr_I++;
}
if (fragPtr_I == tabPtr.p->fragments.getSize())
{
signal->theData[0] = tabPtr.p->tableId;
......@@ -2040,6 +2051,12 @@ Backup::sendDropTrig(Signal* signal, BackupRecordPtr ptr)
TablePtr tabPtr;
ptr.p->tables.first(tabPtr);
if(tabPtr.i == RNIL)
{
closeFiles(signal, ptr);
return;
}
signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO;
signal->theData[1] = ptr.i;
signal->theData[2] = tabPtr.i;
......
......@@ -161,8 +161,15 @@ private:
int try_reconnect;
int m_error;
struct NdbThread* m_event_thread;
NdbMutex *m_print_mutex;
};
struct event_thread_param {
NdbMgmHandle *m;
NdbMutex **p;
};
NdbMutex* print_mutex;
/*
* Facade object for CommandInterpreter
......@@ -340,6 +347,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
m_connected= false;
m_event_thread= 0;
try_reconnect = 0;
m_print_mutex= NdbMutex_Create();
}
/*
......@@ -348,6 +356,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
CommandInterpreter::~CommandInterpreter()
{
disconnect();
NdbMutex_Destroy(m_print_mutex);
}
static bool
......@@ -384,11 +393,13 @@ CommandInterpreter::printError()
static int do_event_thread;
static void*
event_thread_run(void* m)
event_thread_run(void* p)
{
DBUG_ENTER("event_thread_run");
NdbMgmHandle handle= *(NdbMgmHandle*)m;
struct event_thread_param param= *(struct event_thread_param*)p;
NdbMgmHandle handle= *(param.m);
NdbMutex* printmutex= *(param.p);
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP,
1, NDB_MGM_EVENT_CATEGORY_STARTUP,
......@@ -406,8 +417,12 @@ event_thread_run(void* m)
{
const char ping_token[]= "<PING>";
if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
if(tmp && strlen(tmp))
{
Guard g(printmutex);
ndbout << tmp;
}
}
} while(do_event_thread);
NDB_CLOSE_SOCKET(fd);
}
......@@ -459,8 +474,11 @@ CommandInterpreter::connect()
assert(m_event_thread == 0);
assert(do_event_thread == 0);
do_event_thread= 0;
struct event_thread_param p;
p.m= &m_mgmsrv2;
p.p= &m_print_mutex;
m_event_thread = NdbThread_Create(event_thread_run,
(void**)&m_mgmsrv2,
(void**)&p,
32768,
"CommandInterpreted_event_thread",
NDB_THREAD_PRIO_LOW);
......@@ -547,6 +565,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect,
int result= execute_impl(_line);
if (error)
*error= m_error;
return result;
}
......@@ -626,6 +645,7 @@ CommandInterpreter::execute_impl(const char *_line)
DBUG_RETURN(true);
if (strcasecmp(firstToken, "SHOW") == 0) {
Guard g(m_print_mutex);
executeShow(allAfterFirstToken);
DBUG_RETURN(true);
}
......@@ -853,6 +873,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun,
ndbout_c("Trying to start all nodes of system.");
ndbout_c("Use ALL STATUS to see the system start-up phases.");
} else {
Guard g(m_print_mutex);
struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv);
if(cl == 0){
ndbout_c("Unable get status from management server");
......
......@@ -77,7 +77,6 @@
}\
}
extern int global_flag_send_heartbeat_now;
extern int g_no_nodeid_checks;
extern my_bool opt_core;
......@@ -1450,6 +1449,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort)
#include <ClusterMgr.hpp>
void
MgmtSrvr::updateStatus()
{
theFacade->theClusterMgr->forceHB();
}
int
MgmtSrvr::status(int nodeId,
ndb_mgm_node_status * _status,
......@@ -2260,7 +2265,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (found_matching_type && !found_free_node) {
// we have a temporary error which might be due to that
// we have got the latest connect status from db-nodes. Force update.
global_flag_send_heartbeat_now= 1;
updateStatus();
}
BaseString type_string, type_c_string;
......@@ -2603,7 +2608,7 @@ MgmtSrvr::Allocated_resources::~Allocated_resources()
if (!m_reserved_nodes.isclear()) {
m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes);
// node has been reserved, force update signal to ndb nodes
global_flag_send_heartbeat_now= 1;
m_mgmsrv.updateStatus();
char tmp_str[128];
m_mgmsrv.m_reserved_nodes.getText(tmp_str);
......
......@@ -485,6 +485,8 @@ public:
void get_connected_nodes(NodeBitmask &connected_nodes) const;
SocketServer *get_socket_server() { return m_socket_server; }
void updateStatus();
//**************************************************************************
private:
//**************************************************************************
......
......@@ -972,6 +972,7 @@ printNodeStatus(OutputStream *output,
MgmtSrvr &mgmsrv,
enum ndb_mgm_node_type type) {
NodeId nodeId = 0;
mgmsrv.updateStatus();
while(mgmsrv.getNextNodeId(&nodeId, type)) {
enum ndb_mgm_node_status status;
Uint32 startPhase = 0,
......
......@@ -37,8 +37,8 @@
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>
int global_flag_send_heartbeat_now= 0;
int global_flag_skip_invalidate_cache = 0;
//#define DEBUG_REG
// Just a C wrapper for threadMain
extern "C"
......@@ -68,6 +68,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
DBUG_ENTER("ClusterMgr::ClusterMgr");
ndbSetOwnVersion();
clusterMgrThreadMutex = NdbMutex_Create();
waitForHBCond= NdbCondition_Create();
waitingForHB= false;
noOfAliveNodes= 0;
noOfConnectedNodes= 0;
theClusterMgrThread= 0;
......@@ -80,6 +82,7 @@ ClusterMgr::~ClusterMgr()
{
DBUG_ENTER("ClusterMgr::~ClusterMgr");
doStop();
NdbCondition_Destroy(waitForHBCond);
NdbMutex_Destroy(clusterMgrThreadMutex);
DBUG_VOID_RETURN;
}
......@@ -153,6 +156,70 @@ ClusterMgr::doStop( ){
DBUG_VOID_RETURN;
}
void
ClusterMgr::forceHB()
{
theFacade.lock_mutex();
if(waitingForHB)
{
NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
theFacade.unlock_mutex();
return;
}
waitingForHB= true;
NodeBitmask ndb_nodes;
ndb_nodes.clear();
waitForHBFromNodes.clear();
for(Uint32 i = 0; i < MAX_NODES; i++)
{
if(!theNodes[i].defined)
continue;
if(theNodes[i].m_info.m_type == NodeInfo::DB)
{
ndb_nodes.set(i);
const ClusterMgr::Node &node= getNodeInfo(i);
waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
}
}
waitForHBFromNodes.bitAND(ndb_nodes);
#ifdef DEBUG_REG
char buf[128];
ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
#endif
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
signal.theVerId_signalNumber = GSN_API_REGREQ;
signal.theReceiversBlockNumber = QMGR;
signal.theTrace = 0;
signal.theLength = ApiRegReq::SignalLength;
ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
req->version = NDB_VERSION;
int nodeId= 0;
for(int i=0;
NodeBitmask::NotFound!=(nodeId= waitForHBFromNodes.find(i));
i= nodeId+1)
{
#ifdef DEBUG_REG
ndbout << "FORCE HB to " << nodeId << endl;
#endif
theFacade.sendSignalUnCond(&signal, nodeId);
}
NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
waitingForHB= false;
#ifdef DEBUG_REG
ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
#endif
theFacade.unlock_mutex();
}
void
ClusterMgr::threadMain( ){
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
......@@ -174,9 +241,6 @@ ClusterMgr::threadMain( ){
/**
* Start of Secure area for use of Transporter
*/
int send_heartbeat_now= global_flag_send_heartbeat_now;
global_flag_send_heartbeat_now= 0;
if (m_cluster_state == CS_waiting_for_clean_cache)
{
theFacade.m_globalDictCache.lock();
......@@ -209,8 +273,7 @@ ClusterMgr::threadMain( ){
}
theNode.hbCounter += timeSlept;
if (theNode.hbCounter >= theNode.hbFrequency ||
send_heartbeat_now) {
if (theNode.hbCounter >= theNode.hbFrequency) {
/**
* It is now time to send a new Heartbeat
*/
......@@ -219,7 +282,7 @@ ClusterMgr::threadMain( ){
theNode.hbCounter = 0;
}
#if 0
#ifdef DEBUG_REG
ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
#endif
theFacade.sendSignalUnCond(&signal, nodeId);
......@@ -272,7 +335,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
const NodeId nodeId = refToNode(apiRegReq->ref);
#if 0
#ifdef DEBUG_REG
ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
#endif
......@@ -313,7 +376,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
#if 0
#ifdef DEBUG_REG
ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
#endif
......@@ -342,6 +405,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
}//if
node.m_info.m_heartbeat_cnt = 0;
node.hbCounter = 0;
if(waitingForHB)
{
waitForHBFromNodes.clear(nodeId);
if(waitForHBFromNodes.isclear())
{
waitingForHB= false;
NdbCondition_Broadcast(waitForHBCond);
}
}
node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
}
......@@ -371,6 +445,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){
default:
break;
}
waitForHBFromNodes.clear(nodeId);
if(waitForHBFromNodes.isclear())
NdbCondition_Signal(waitForHBCond);
}
void
......
......@@ -50,6 +50,8 @@ public:
void doStop();
void startThread();
void forceHB();
private:
void threadMain();
......@@ -91,6 +93,11 @@ private:
Uint32 noOfConnectedNodes;
Node theNodes[MAX_NODES];
NdbThread* theClusterMgrThread;
NodeBitmask waitForHBFromNodes; // used in forcing HBs
NdbCondition* waitForHBCond;
bool waitingForHB;
enum Cluster_state m_cluster_state;
/**
* Used for controlling start/stop of the thread
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment