Commit 2acfc2b9 authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.0-ndb

into poseidon.ndb.mysql.com:/home/tomas/mysql-5.0-ndb


ndb/include/ndbapi/NdbDictionary.hpp:
  Auto merged
ndb/src/ndbapi/NdbDictionary.cpp:
  Auto merged
ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Auto merged
sql/mysqld.cc:
  Auto merged
parents d078687a 9d9d7582
...@@ -34,13 +34,13 @@ col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null, ...@@ -34,13 +34,13 @@ col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null,
col6 int not null, to_be_deleted int) ENGINE=ndbcluster; col6 int not null, to_be_deleted int) ENGINE=ndbcluster;
show table status; show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 ndbcluster 10 Dynamic 0 0 0 0 0 0 1 NULL NULL NULL latin1_swedish_ci NULL t1 ndbcluster 10 Dynamic 0 0 0 0 0 0 1 NULL NULL NULL latin1_swedish_ci NULL number_of_replicas: 2
SET SQL_MODE=NO_AUTO_VALUE_ON_ZERO; SET SQL_MODE=NO_AUTO_VALUE_ON_ZERO;
insert into t1 values insert into t1 values
(0,4,3,5,"PENDING",1,7),(NULL,4,3,5,"PENDING",1,7),(31,4,3,5,"PENDING",1,7), (7,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7), (100,4,3,5,"PENDING",1,7), (99,4,3,5,"PENDING",1,7), (8,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7); (0,4,3,5,"PENDING",1,7),(NULL,4,3,5,"PENDING",1,7),(31,4,3,5,"PENDING",1,7), (7,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7), (100,4,3,5,"PENDING",1,7), (99,4,3,5,"PENDING",1,7), (8,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7);
show table status; show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 ndbcluster 10 Dynamic 9 0 0 0 0 0 101 NULL NULL NULL latin1_swedish_ci NULL t1 ndbcluster 10 Dynamic 9 96 131072 0 0 0 101 NULL NULL NULL latin1_swedish_ci NULL number_of_replicas: 2
select * from t1 order by col1; select * from t1 order by col1;
col1 col2 col3 col4 col5 col6 to_be_deleted col1 col2 col3 col4 col5 col6 to_be_deleted
0 4 3 5 PENDING 1 7 0 4 3 5 PENDING 1 7
...@@ -60,7 +60,7 @@ change column col2 fourth varchar(30) not null after col3, ...@@ -60,7 +60,7 @@ change column col2 fourth varchar(30) not null after col3,
modify column col6 int not null first; modify column col6 int not null first;
show table status; show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 ndbcluster 10 Dynamic 9 0 0 0 0 0 102 NULL NULL NULL latin1_swedish_ci NULL t1 ndbcluster 10 Dynamic 9 152 131072 0 0 0 102 NULL NULL NULL latin1_swedish_ci NULL number_of_replicas: 2
select * from t1 order by col1; select * from t1 order by col1;
col6 col1 col3 fourth col4 col4_5 col5 col7 col8 col6 col1 col3 fourth col4 col4_5 col5 col7 col8
1 0 3 4 5 PENDING 0000-00-00 00:00:00 1 0 3 4 5 PENDING 0000-00-00 00:00:00
...@@ -75,7 +75,7 @@ col6 col1 col3 fourth col4 col4_5 col5 col7 col8 ...@@ -75,7 +75,7 @@ col6 col1 col3 fourth col4 col4_5 col5 col7 col8
insert into t1 values (2, NULL,4,3,5,99,"PENDING","EXTRA",'2004-01-01 00:00:00'); insert into t1 values (2, NULL,4,3,5,99,"PENDING","EXTRA",'2004-01-01 00:00:00');
show table status; show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 ndbcluster 10 Dynamic 10 0 0 0 0 0 103 NULL NULL NULL latin1_swedish_ci NULL t1 ndbcluster 10 Dynamic 10 152 131072 0 0 0 103 NULL NULL NULL latin1_swedish_ci NULL number_of_replicas: 2
select * from t1 order by col1; select * from t1 order by col1;
col6 col1 col3 fourth col4 col4_5 col5 col7 col8 col6 col1 col3 fourth col4 col4_5 col5 col7 col8
1 0 3 4 5 PENDING 0000-00-00 00:00:00 1 0 3 4 5 PENDING 0000-00-00 00:00:00
......
...@@ -145,7 +145,7 @@ flush tables; ...@@ -145,7 +145,7 @@ flush tables;
show table status; show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t6 MyISAM 9 Fixed 1 260 # # # 0 NULL # # NULL # NULL t6 MyISAM 9 Fixed 1 260 # # # 0 NULL # # NULL # NULL
t7 ndbcluster 9 Fixed 1 0 # # # 0 NULL # # NULL # NULL t7 ndbcluster 9 Fixed 1 276 # # # 0 NULL # # NULL # NULL number_of_replicas: 2
show status like 'handler_discover%'; show status like 'handler_discover%';
Variable_name Value Variable_name Value
Handler_discover 2 Handler_discover 2
......
...@@ -70,12 +70,13 @@ ...@@ -70,12 +70,13 @@
* mysql> use TEST_DB; * mysql> use TEST_DB;
* mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb; * mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb;
* *
* In another window start ndbapi_example5, wait until properly started * In another window start ndbapi_event, wait until properly started
* *
* mysql> insert into TAB0 values (1,2,3); insert into TAB0 values (1,2,3);
* mysql> insert into TAB0 values (2,2,3); insert into TAB0 values (2,2,3);
* mysql> insert into TAB0 values (3,2,9); insert into TAB0 values (3,2,9);
* mysql> update TAB0 set COL1=10 where COL0=1;
delete from TAB0 where COL0=1;
* *
* you should see the data popping up in the example window * you should see the data popping up in the example window
* *
......
...@@ -40,6 +40,7 @@ public: ...@@ -40,6 +40,7 @@ public:
STATIC_CONST( RANGE_NO = 0xFFFB ); // Read range no (when batched ranges) STATIC_CONST( RANGE_NO = 0xFFFB ); // Read range no (when batched ranges)
STATIC_CONST( ROW_SIZE = 0xFFFA ); STATIC_CONST( ROW_SIZE = 0xFFFA );
STATIC_CONST( FRAGMENT_MEMORY= 0xFFF9 );
/** Initialize AttributeHeader at location aHeaderPtr */ /** Initialize AttributeHeader at location aHeaderPtr */
static AttributeHeader& init(void* aHeaderPtr, Uint32 anAttributeId, static AttributeHeader& init(void* aHeaderPtr, Uint32 anAttributeId,
......
...@@ -56,6 +56,7 @@ struct TriggerActionTime { ...@@ -56,6 +56,7 @@ struct TriggerActionTime {
}; };
struct TriggerEvent { struct TriggerEvent {
/** TableEvent must match 1 << TriggerEvent */
enum Value { enum Value {
TE_INSERT = 0, TE_INSERT = 0,
TE_DELETE = 1, TE_DELETE = 1,
......
...@@ -438,6 +438,7 @@ public: ...@@ -438,6 +438,7 @@ public:
const char* getDefaultValue() const; const char* getDefaultValue() const;
static const Column * FRAGMENT; static const Column * FRAGMENT;
static const Column * FRAGMENT_MEMORY;
static const Column * ROW_COUNT; static const Column * ROW_COUNT;
static const Column * COMMIT_COUNT; static const Column * COMMIT_COUNT;
static const Column * ROW_SIZE; static const Column * ROW_SIZE;
...@@ -716,6 +717,8 @@ public: ...@@ -716,6 +717,8 @@ public:
int getRowSizeInBytes() const ; int getRowSizeInBytes() const ;
int createTableInDb(Ndb*, bool existingEqualIsOk = true) const ; int createTableInDb(Ndb*, bool existingEqualIsOk = true) const ;
int getReplicaCount() const ;
#endif #endif
private: private:
...@@ -916,6 +919,9 @@ public: ...@@ -916,6 +919,9 @@ public:
/** /**
* Specifies the type of database operations an Event listens to * Specifies the type of database operations an Event listens to
*/ */
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/** TableEvent must match 1 << TriggerEvent */
#endif
enum TableEvent { enum TableEvent {
TE_INSERT=1, ///< Insert event on table TE_INSERT=1, ///< Insert event on table
TE_DELETE=2, ///< Delete event on table TE_DELETE=2, ///< Delete event on table
......
...@@ -86,6 +86,7 @@ public: ...@@ -86,6 +86,7 @@ public:
void set_optimized_node_selection(int val); void set_optimized_node_selection(int val);
unsigned no_db_nodes(); unsigned no_db_nodes();
unsigned node_id();
#endif #endif
private: private:
......
...@@ -1001,6 +1001,13 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){ ...@@ -1001,6 +1001,13 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){
case AttributeHeader::FRAGMENT: case AttributeHeader::FRAGMENT:
* outBuffer = operPtr.p->fragId >> 1; // remove "hash" bit * outBuffer = operPtr.p->fragId >> 1; // remove "hash" bit
return 1; return 1;
case AttributeHeader::FRAGMENT_MEMORY:
{
Uint64 tmp= fragptr.p->noOfPages;
tmp*= 32768;
memcpy(outBuffer,&tmp,8);
}
return 2;
case AttributeHeader::ROW_SIZE: case AttributeHeader::ROW_SIZE:
* outBuffer = tabptr.p->tupheadsize << 2; * outBuffer = tabptr.p->tupheadsize << 2;
return 1; return 1;
......
...@@ -437,6 +437,11 @@ NdbDictionary::Table::getRowSizeInBytes() const { ...@@ -437,6 +437,11 @@ NdbDictionary::Table::getRowSizeInBytes() const {
return sz * 4; return sz * 4;
} }
int
NdbDictionary::Table::getReplicaCount() const {
return m_impl.m_replicaCount;
}
int int
NdbDictionary::Table::createTableInDb(Ndb* pNdb, bool equalOk) const { NdbDictionary::Table::createTableInDb(Ndb* pNdb, bool equalOk) const {
const NdbDictionary::Table * pTab = const NdbDictionary::Table * pTab =
...@@ -1005,6 +1010,7 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col) ...@@ -1005,6 +1010,7 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
} }
const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0; const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0;
const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT_MEMORY = 0;
const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0; const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0;
const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0; const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0;
const NdbDictionary::Column * NdbDictionary::Column::ROW_SIZE = 0; const NdbDictionary::Column * NdbDictionary::Column::ROW_SIZE = 0;
......
...@@ -232,6 +232,11 @@ NdbColumnImpl::create_psuedo(const char * name){ ...@@ -232,6 +232,11 @@ NdbColumnImpl::create_psuedo(const char * name){
col->m_impl.m_attrId = AttributeHeader::FRAGMENT; col->m_impl.m_attrId = AttributeHeader::FRAGMENT;
col->m_impl.m_attrSize = 4; col->m_impl.m_attrSize = 4;
col->m_impl.m_arraySize = 1; col->m_impl.m_arraySize = 1;
} else if(!strcmp(name, "NDB$FRAGMENT_MEMORY")){
col->setType(NdbDictionary::Column::Bigunsigned);
col->m_impl.m_attrId = AttributeHeader::FRAGMENT_MEMORY;
col->m_impl.m_attrSize = 8;
col->m_impl.m_arraySize = 1;
} else if(!strcmp(name, "NDB$ROW_COUNT")){ } else if(!strcmp(name, "NDB$ROW_COUNT")){
col->setType(NdbDictionary::Column::Bigunsigned); col->setType(NdbDictionary::Column::Bigunsigned);
col->m_impl.m_attrId = AttributeHeader::ROW_COUNT; col->m_impl.m_attrId = AttributeHeader::ROW_COUNT;
...@@ -685,10 +690,12 @@ NdbDictionaryImpl::~NdbDictionaryImpl() ...@@ -685,10 +690,12 @@ NdbDictionaryImpl::~NdbDictionaryImpl()
m_globalHash->lock(); m_globalHash->lock();
if(--f_dictionary_count == 0){ if(--f_dictionary_count == 0){
delete NdbDictionary::Column::FRAGMENT; delete NdbDictionary::Column::FRAGMENT;
delete NdbDictionary::Column::FRAGMENT_MEMORY;
delete NdbDictionary::Column::ROW_COUNT; delete NdbDictionary::Column::ROW_COUNT;
delete NdbDictionary::Column::COMMIT_COUNT; delete NdbDictionary::Column::COMMIT_COUNT;
delete NdbDictionary::Column::ROW_SIZE; delete NdbDictionary::Column::ROW_SIZE;
NdbDictionary::Column::FRAGMENT= 0; NdbDictionary::Column::FRAGMENT= 0;
NdbDictionary::Column::FRAGMENT_MEMORY= 0;
NdbDictionary::Column::ROW_COUNT= 0; NdbDictionary::Column::ROW_COUNT= 0;
NdbDictionary::Column::COMMIT_COUNT= 0; NdbDictionary::Column::COMMIT_COUNT= 0;
NdbDictionary::Column::ROW_SIZE= 0; NdbDictionary::Column::ROW_SIZE= 0;
...@@ -754,6 +761,8 @@ NdbDictionaryImpl::setTransporter(class Ndb* ndb, ...@@ -754,6 +761,8 @@ NdbDictionaryImpl::setTransporter(class Ndb* ndb,
if(f_dictionary_count++ == 0){ if(f_dictionary_count++ == 0){
NdbDictionary::Column::FRAGMENT= NdbDictionary::Column::FRAGMENT=
NdbColumnImpl::create_psuedo("NDB$FRAGMENT"); NdbColumnImpl::create_psuedo("NDB$FRAGMENT");
NdbDictionary::Column::FRAGMENT_MEMORY=
NdbColumnImpl::create_psuedo("NDB$FRAGMENT_MEMORY");
NdbDictionary::Column::ROW_COUNT= NdbDictionary::Column::ROW_COUNT=
NdbColumnImpl::create_psuedo("NDB$ROW_COUNT"); NdbColumnImpl::create_psuedo("NDB$ROW_COUNT");
NdbDictionary::Column::COMMIT_COUNT= NdbDictionary::Column::COMMIT_COUNT=
......
...@@ -224,9 +224,8 @@ NdbEventOperationImpl::execute() ...@@ -224,9 +224,8 @@ NdbEventOperationImpl::execute()
int hasSubscriber; int hasSubscriber;
int r= int r= m_bufferHandle->prepareAddSubscribeEvent(this,
m_bufferHandle->prepareAddSubscribeEvent(m_eventImpl->m_eventId, hasSubscriber /*return value*/);
hasSubscriber /* return value */);
m_error.code= 4709; m_error.code= 4709;
if (r < 0) if (r < 0)
...@@ -697,10 +696,11 @@ NdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle) ...@@ -697,10 +696,11 @@ NdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle)
} }
*/ */
int int
NdbGlobalEventBufferHandle::prepareAddSubscribeEvent(Uint32 eventId, NdbGlobalEventBufferHandle::prepareAddSubscribeEvent
int& hasSubscriber) (NdbEventOperationImpl *eventOp, int& hasSubscriber)
{ {
ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventId, hasSubscriber)); ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventOp,
hasSubscriber));
} }
void void
NdbGlobalEventBufferHandle::addSubscribeEvent NdbGlobalEventBufferHandle::addSubscribeEvent
...@@ -893,11 +893,13 @@ NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h) ...@@ -893,11 +893,13 @@ NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h)
int int
NdbGlobalEventBuffer::real_prepareAddSubscribeEvent NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
(NdbGlobalEventBufferHandle *aHandle, Uint32 eventId, int& hasSubscriber) (NdbGlobalEventBufferHandle *aHandle, NdbEventOperationImpl *eventOp,
int& hasSubscriber)
{ {
DBUG_ENTER("NdbGlobalEventBuffer::real_prepareAddSubscribeEvent"); DBUG_ENTER("NdbGlobalEventBuffer::real_prepareAddSubscribeEvent");
int i; int i;
int bufferId= -1; int bufferId= -1;
Uint32 eventId= eventOp->m_eventId;
// add_drop_lock(); // only one thread can do add or drop at a time // add_drop_lock(); // only one thread can do add or drop at a time
...@@ -939,6 +941,7 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent ...@@ -939,6 +941,7 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
bufferId= NO_ID(0, bufferId); bufferId= NO_ID(0, bufferId);
b.gId= eventId; b.gId= eventId;
b.eventType= (Uint32)eventOp->m_eventImpl->mi_type;
if ((b.p_buf_mutex= NdbMutex_Create()) == NULL) { if ((b.p_buf_mutex= NdbMutex_Create()) == NULL) {
ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed"); ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
...@@ -1137,6 +1140,8 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId, ...@@ -1137,6 +1140,8 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
#ifdef EVENT_DEBUG #ifdef EVENT_DEBUG
int n = NO(bufferId); int n = NO(bufferId);
#endif #endif
if ( b.eventType & (1 << (Uint32)sdata->operation) )
{ {
if (b.subs) { if (b.subs) {
#ifdef EVENT_DEBUG #ifdef EVENT_DEBUG
...@@ -1175,6 +1180,13 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId, ...@@ -1175,6 +1180,13 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
#endif #endif
} }
} }
else
{
#ifdef EVENT_DEBUG
ndbout_c("skipped");
#endif
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -79,7 +79,7 @@ public: ...@@ -79,7 +79,7 @@ public:
//static NdbGlobalEventBufferHandle *init(int MAX_NUMBER_ACTIVE_EVENTS); //static NdbGlobalEventBufferHandle *init(int MAX_NUMBER_ACTIVE_EVENTS);
// returns bufferId 0-N if ok otherwise -1 // returns bufferId 0-N if ok otherwise -1
int prepareAddSubscribeEvent(Uint32 eventId, int& hasSubscriber); int prepareAddSubscribeEvent(NdbEventOperationImpl *, int& hasSubscriber);
void unprepareAddSubscribeEvent(int bufferId); void unprepareAddSubscribeEvent(int bufferId);
void addSubscribeEvent(int bufferId, void addSubscribeEvent(int bufferId,
NdbEventOperationImpl *ndbEventOperationImpl); NdbEventOperationImpl *ndbEventOperationImpl);
...@@ -133,7 +133,8 @@ private: ...@@ -133,7 +133,8 @@ private:
int MAX_NUMBER_ACTIVE_EVENTS); int MAX_NUMBER_ACTIVE_EVENTS);
int real_prepareAddSubscribeEvent(NdbGlobalEventBufferHandle *h, int real_prepareAddSubscribeEvent(NdbGlobalEventBufferHandle *h,
Uint32 eventId, int& hasSubscriber); NdbEventOperationImpl *,
int& hasSubscriber);
void real_unprepareAddSubscribeEvent(int bufferId); void real_unprepareAddSubscribeEvent(int bufferId);
void real_addSubscribeEvent(int bufferId, void *ndbEventOperation); void real_addSubscribeEvent(int bufferId, void *ndbEventOperation);
...@@ -177,6 +178,7 @@ private: ...@@ -177,6 +178,7 @@ private:
// local mutex for each event/buffer // local mutex for each event/buffer
NdbMutex *p_buf_mutex; NdbMutex *p_buf_mutex;
Uint32 gId; Uint32 gId;
Uint32 eventType;
struct Data { struct Data {
SubTableData *sdata; SubTableData *sdata;
LinearSectionPtr ptr[3]; LinearSectionPtr ptr[3];
......
...@@ -183,6 +183,12 @@ Ndb_cluster_connection::no_db_nodes() ...@@ -183,6 +183,12 @@ Ndb_cluster_connection::no_db_nodes()
return m_impl.m_all_nodes.size(); return m_impl.m_all_nodes.size();
} }
unsigned
Ndb_cluster_connection::node_id()
{
return m_impl.m_transporter_facade->ownId();
}
int int
Ndb_cluster_connection::wait_until_ready(int timeout, Ndb_cluster_connection::wait_until_ready(int timeout,
......
...@@ -85,7 +85,7 @@ static int unpackfrm(const void **data, uint *len, ...@@ -85,7 +85,7 @@ static int unpackfrm(const void **data, uint *len,
const void* pack_data); const void* pack_data);
static int ndb_get_table_statistics(Ndb*, const char *, static int ndb_get_table_statistics(Ndb*, const char *,
Uint64* rows, Uint64* commits); struct Ndb_statistics *);
/* /*
...@@ -94,6 +94,44 @@ static int ndb_get_table_statistics(Ndb*, const char *, ...@@ -94,6 +94,44 @@ static int ndb_get_table_statistics(Ndb*, const char *,
*/ */
static uint32 dummy_buf; static uint32 dummy_buf;
/*
Stats that can be retrieved from ndb
*/
struct Ndb_statistics {
Uint64 row_count;
Uint64 commit_count;
Uint64 row_size;
Uint64 fragment_memory;
};
/* Status variables shown with 'show status like 'Ndb%' */
static long ndb_cluster_node_id= 0;
static const char * ndb_connected_host= 0;
static long ndb_connected_port= 0;
static long ndb_number_of_replicas= 0;
static long ndb_number_of_storage_nodes= 0;
static int update_status_variables(Ndb_cluster_connection *c)
{
ndb_cluster_node_id= c->node_id();
ndb_connected_port= c->get_connected_port();
ndb_connected_host= c->get_connected_host();
ndb_number_of_replicas= 0;
ndb_number_of_storage_nodes= c->no_db_nodes();
return 0;
}
struct show_var_st ndb_status_variables[]= {
{"cluster_node_id", (char*) &ndb_cluster_node_id, SHOW_LONG},
{"connected_host", (char*) &ndb_connected_host, SHOW_CHAR_PTR},
{"connected_port", (char*) &ndb_connected_port, SHOW_LONG},
// {"number_of_replicas", (char*) &ndb_number_of_replicas, SHOW_LONG},
{"number_of_storage_nodes",(char*) &ndb_number_of_storage_nodes, SHOW_LONG},
{NullS, NullS, SHOW_LONG}
};
/* /*
Error handling functions Error handling functions
*/ */
...@@ -262,9 +300,11 @@ void ha_ndbcluster::records_update() ...@@ -262,9 +300,11 @@ void ha_ndbcluster::records_update()
// if (info->records == ~(ha_rows)0) // if (info->records == ~(ha_rows)0)
{ {
Ndb *ndb= get_ndb(); Ndb *ndb= get_ndb();
Uint64 rows; struct Ndb_statistics stat;
if(ndb_get_table_statistics(ndb, m_tabname, &rows, 0) == 0){ if(ndb_get_table_statistics(ndb, m_tabname, &stat) == 0){
info->records= rows; mean_rec_length= stat.row_size;
data_file_length= stat.fragment_memory;
info->records= stat.row_count;
} }
} }
{ {
...@@ -2724,10 +2764,19 @@ void ha_ndbcluster::info(uint flag) ...@@ -2724,10 +2764,19 @@ void ha_ndbcluster::info(uint flag)
if ((my_errno= check_ndb_connection())) if ((my_errno= check_ndb_connection()))
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
Ndb *ndb= get_ndb(); Ndb *ndb= get_ndb();
Uint64 rows= 100; struct Ndb_statistics stat;
if (current_thd->variables.ndb_use_exact_count) if (current_thd->variables.ndb_use_exact_count &&
ndb_get_table_statistics(ndb, m_tabname, &rows, 0); ndb_get_table_statistics(ndb, m_tabname, &stat) == 0)
records= rows; {
mean_rec_length= stat.row_size;
data_file_length= stat.fragment_memory;
records= stat.row_count;
}
else
{
mean_rec_length= 0;
records= 100;
}
} }
} }
if (flag & HA_STATUS_CONST) if (flag & HA_STATUS_CONST)
...@@ -4034,6 +4083,8 @@ Thd_ndb* ha_ndbcluster::seize_thd_ndb() ...@@ -4034,6 +4083,8 @@ Thd_ndb* ha_ndbcluster::seize_thd_ndb()
thd_ndb= new Thd_ndb(); thd_ndb= new Thd_ndb();
thd_ndb->ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info)); thd_ndb->ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
if (thd_ndb->ndb->init(max_transactions) != 0) if (thd_ndb->ndb->init(max_transactions) != 0)
{ {
ERR_PRINT(thd_ndb->ndb->getNdbError()); ERR_PRINT(thd_ndb->ndb->getNdbError());
...@@ -4359,6 +4410,13 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, ...@@ -4359,6 +4410,13 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
a NDB Cluster table handler a NDB Cluster table handler
*/ */
/* Call back after cluster connect */
static int connect_callback()
{
update_status_variables(g_ndb_cluster_connection);
return 0;
}
bool ndbcluster_init() bool ndbcluster_init()
{ {
int res; int res;
...@@ -4388,6 +4446,7 @@ bool ndbcluster_init() ...@@ -4388,6 +4446,7 @@ bool ndbcluster_init()
if ((res= g_ndb_cluster_connection->connect(0,0,0)) == 0) if ((res= g_ndb_cluster_connection->connect(0,0,0)) == 0)
{ {
connect_callback();
DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d", DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d",
g_ndb_cluster_connection->get_connected_host(), g_ndb_cluster_connection->get_connected_host(),
g_ndb_cluster_connection->get_connected_port())); g_ndb_cluster_connection->get_connected_port()));
...@@ -4395,7 +4454,7 @@ bool ndbcluster_init() ...@@ -4395,7 +4454,7 @@ bool ndbcluster_init()
} }
else if(res == 1) else if(res == 1)
{ {
if (g_ndb_cluster_connection->start_connect_thread()) if (g_ndb_cluster_connection->start_connect_thread(connect_callback))
{ {
DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()")); DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
goto ndbcluster_init_error; goto ndbcluster_init_error;
...@@ -4814,7 +4873,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len, ...@@ -4814,7 +4873,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
static static
int int
ndb_get_table_statistics(Ndb* ndb, const char * table, ndb_get_table_statistics(Ndb* ndb, const char * table,
Uint64* row_count, Uint64* commit_count) struct Ndb_statistics * ndbstat)
{ {
DBUG_ENTER("ndb_get_table_statistics"); DBUG_ENTER("ndb_get_table_statistics");
DBUG_PRINT("enter", ("table: %s", table)); DBUG_PRINT("enter", ("table: %s", table));
...@@ -4835,9 +4894,11 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -4835,9 +4894,11 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
if (check == -1) if (check == -1)
break; break;
Uint64 rows, commits; Uint64 rows, commits, size, mem;
pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows); pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits); pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
pOp->getValue(NdbDictionary::Column::ROW_SIZE, (char*)&size);
pOp->getValue(NdbDictionary::Column::FRAGMENT_MEMORY, (char*)&mem);
check= pTrans->execute(NdbTransaction::NoCommit, check= pTrans->execute(NdbTransaction::NoCommit,
NdbTransaction::AbortOnError, NdbTransaction::AbortOnError,
...@@ -4847,10 +4908,15 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -4847,10 +4908,15 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
Uint64 sum_rows= 0; Uint64 sum_rows= 0;
Uint64 sum_commits= 0; Uint64 sum_commits= 0;
Uint64 sum_row_size= 0;
Uint64 sum_mem= 0;
while((check= pOp->nextResult(TRUE, TRUE)) == 0) while((check= pOp->nextResult(TRUE, TRUE)) == 0)
{ {
sum_rows+= rows; sum_rows+= rows;
sum_commits+= commits; sum_commits+= commits;
if (sum_row_size < size)
sum_row_size= size;
sum_mem+= mem;
} }
if (check == -1) if (check == -1)
...@@ -4859,11 +4925,14 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -4859,11 +4925,14 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
pOp->close(TRUE); pOp->close(TRUE);
ndb->closeTransaction(pTrans); ndb->closeTransaction(pTrans);
if(row_count)
* row_count= sum_rows; ndbstat->row_count= sum_rows;
if(commit_count) ndbstat->commit_count= sum_commits;
* commit_count= sum_commits; ndbstat->row_size= sum_row_size;
DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits)); ndbstat->fragment_memory= sum_mem;
DBUG_PRINT("exit", ("records: %u commits: %u row_size: %d mem: %d",
sum_rows, sum_commits, sum_row_size, sum_mem));
DBUG_RETURN(0); DBUG_RETURN(0);
} while(0); } while(0);
...@@ -5248,4 +5317,43 @@ ha_ndbcluster::setup_recattr(const NdbRecAttr* curr) ...@@ -5248,4 +5317,43 @@ ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
char*
ha_ndbcluster::update_table_comment(
/* out: table comment + additional */
const char* comment)/* in: table comment defined by user */
{
uint length= strlen(comment);
if(length > 64000 - 3)
{
return((char*)comment); /* string too long */
}
Ndb* ndb;
if (!(ndb= get_ndb()))
{
return((char*)comment);
}
ndb->setDatabaseName(m_dbname);
NDBDICT* dict= ndb->getDictionary();
const NDBTAB* tab;
if (!(tab= dict->getTable(m_tabname)))
{
return((char*)comment);
}
char *str;
const char *fmt="%s%snumber_of_replicas: %d";
const unsigned fmt_len_plus_extra= length + strlen(fmt);
if ((str= my_malloc(fmt_len_plus_extra, MYF(0))) == NULL)
{
return (char*)comment;
}
snprintf(str,fmt_len_plus_extra,fmt,comment,
length > 0 ? " ":"",
tab->getReplicaCount());
return str;
}
#endif /* HAVE_NDBCLUSTER_DB */ #endif /* HAVE_NDBCLUSTER_DB */
...@@ -215,6 +215,8 @@ class ha_ndbcluster: public handler ...@@ -215,6 +215,8 @@ class ha_ndbcluster: public handler
int write_ndb_file(); int write_ndb_file();
char *update_table_comment(const char * comment);
private: private:
int check_ndb_connection(); int check_ndb_connection();
...@@ -277,6 +279,8 @@ class ha_ndbcluster: public handler ...@@ -277,6 +279,8 @@ class ha_ndbcluster: public handler
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*);
}; };
extern struct show_var_st ndb_status_variables[];
bool ndbcluster_init(void); bool ndbcluster_init(void);
bool ndbcluster_end(void); bool ndbcluster_end(void);
......
...@@ -5615,6 +5615,9 @@ struct show_var_st status_vars[]= { ...@@ -5615,6 +5615,9 @@ struct show_var_st status_vars[]= {
SHOW_KEY_CACHE_LONG}, SHOW_KEY_CACHE_LONG},
{"Last_query_cost", (char*) &last_query_cost, SHOW_DOUBLE}, {"Last_query_cost", (char*) &last_query_cost, SHOW_DOUBLE},
{"Max_used_connections", (char*) &max_used_connections, SHOW_LONG}, {"Max_used_connections", (char*) &max_used_connections, SHOW_LONG},
#ifdef HAVE_NDBCLUSTER_DB
{"Ndb_", (char*) &ndb_status_variables, SHOW_VARS},
#endif /*HAVE_NDBCLUSTER_DB*/
{"Not_flushed_delayed_rows", (char*) &delayed_rows_in_use, SHOW_LONG_CONST}, {"Not_flushed_delayed_rows", (char*) &delayed_rows_in_use, SHOW_LONG_CONST},
{"Open_files", (char*) &my_file_opened, SHOW_LONG_CONST}, {"Open_files", (char*) &my_file_opened, SHOW_LONG_CONST},
{"Open_streams", (char*) &my_stream_opened, SHOW_LONG_CONST}, {"Open_streams", (char*) &my_stream_opened, SHOW_LONG_CONST},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment