Commit dfbdbfd0 authored by ram@gw.mysql.r18.ru's avatar ram@gw.mysql.r18.ru

Merge rkalimullin@bk-internal.mysql.com:/home/bk/mysql-4.1

into gw.mysql.r18.ru:/usr/home/ram/work/4.1.b7281
parents da562b7e 1e102d1a
...@@ -1599,11 +1599,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [ ...@@ -1599,11 +1599,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
;; ;;
esac esac
AC_ARG_WITH([ndb-shm],
[
--with-ndb-shm Include the NDB Cluster shared memory transporter],
[ndb_shm="$withval"],
[ndb_shm=no])
AC_ARG_WITH([ndb-test], AC_ARG_WITH([ndb-test],
[ [
--with-ndb-test Include the NDB Cluster ndbapi test programs], --with-ndb-test Include the NDB Cluster ndbapi test programs],
...@@ -1633,19 +1628,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [ ...@@ -1633,19 +1628,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
AC_MSG_CHECKING([for NDB Cluster options]) AC_MSG_CHECKING([for NDB Cluster options])
AC_MSG_RESULT([]) AC_MSG_RESULT([])
have_ndb_shm=no
case "$ndb_shm" in
yes )
AC_MSG_RESULT([-- including shared memory transporter])
AC_DEFINE([NDB_SHM_TRANSPORTER], [1],
[Including Ndb Cluster DB shared memory transporter])
have_ndb_shm="yes"
;;
* )
AC_MSG_RESULT([-- not including shared memory transporter])
;;
esac
have_ndb_test=no have_ndb_test=no
case "$ndb_test" in case "$ndb_test" in
yes ) yes )
......
...@@ -1923,7 +1923,9 @@ AC_CHECK_FUNCS(alarm bcmp bfill bmove bzero chsize cuserid fchmod fcntl \ ...@@ -1923,7 +1923,9 @@ AC_CHECK_FUNCS(alarm bcmp bfill bmove bzero chsize cuserid fchmod fcntl \
pthread_attr_setstacksize pthread_condattr_create pthread_getsequence_np \ pthread_attr_setstacksize pthread_condattr_create pthread_getsequence_np \
pthread_key_delete pthread_rwlock_rdlock pthread_setprio \ pthread_key_delete pthread_rwlock_rdlock pthread_setprio \
pthread_setprio_np pthread_setschedparam pthread_sigmask readlink \ pthread_setprio_np pthread_setschedparam pthread_sigmask readlink \
realpath rename rint rwlock_init setupterm sighold sigset sigthreadmask \ realpath rename rint rwlock_init setupterm \
shmget shmat shmdt shmctl \
sighold sigset sigthreadmask \
snprintf socket stpcpy strcasecmp strerror strnlen strpbrk strstr strtol \ snprintf socket stpcpy strcasecmp strerror strnlen strpbrk strstr strtol \
strtoll strtoul strtoull tell tempnam thr_setconcurrency vidattr) strtoll strtoul strtoull tell tempnam thr_setconcurrency vidattr)
...@@ -3078,10 +3080,19 @@ fi ...@@ -3078,10 +3080,19 @@ fi
AC_SUBST([ndb_port_base]) AC_SUBST([ndb_port_base])
ndb_transporter_opt_objs="" ndb_transporter_opt_objs=""
if test X"$have_ndb_shm" = Xyes if test "$ac_cv_func_shmget" = "yes" &&
then test "$ac_cv_func_shmat" = "yes" &&
ndb_transporter_opt_objs="$ndb_transporter_opt_objs SHM_Transporter.lo SHM_Transporter.unix.lo" test "$ac_cv_func_shmdt" = "yes" &&
test "$ac_cv_func_shmctl" = "yes"
then
AC_DEFINE([NDB_SHM_TRANSPORTER], [1],
[Including Ndb Cluster DB shared memory transporter])
AC_MSG_RESULT([Including ndb shared memory transporter])
ndb_transporter_opt_objs="$ndb_transporter_opt_objs SHM_Transporter.lo SHM_Transporter.unix.lo"
else
AC_MSG_RESULT([Not including ndb shared memory transporter])
fi fi
if test X"$have_ndb_sci" = Xyes if test X"$have_ndb_sci" = Xyes
then then
ndb_transporter_opt_objs="$ndb_transporter_opt_objs SCI_Transporter.lo" ndb_transporter_opt_objs="$ndb_transporter_opt_objs SCI_Transporter.lo"
......
...@@ -88,6 +88,10 @@ int mi_rnext_same(MI_INFO *info, byte *buf) ...@@ -88,6 +88,10 @@ int mi_rnext_same(MI_INFO *info, byte *buf)
if (my_errno == HA_ERR_KEY_NOT_FOUND) if (my_errno == HA_ERR_KEY_NOT_FOUND)
my_errno=HA_ERR_END_OF_FILE; my_errno=HA_ERR_END_OF_FILE;
} }
else if (!buf)
{
DBUG_RETURN(info->lastpos==HA_OFFSET_ERROR ? my_errno : 0);
}
else if (!(*info->read_record)(info,info->lastpos,buf)) else if (!(*info->read_record)(info,info->lastpos,buf))
{ {
info->update|= HA_STATE_AKTIV; /* Record is read */ info->update|= HA_STATE_AKTIV; /* Record is read */
......
...@@ -16,25 +16,36 @@ ...@@ -16,25 +16,36 @@
#include "myrg_def.h" #include "myrg_def.h"
int myrg_rnext_same(MYRG_INFO *info, byte *buf) int myrg_rnext_same(MYRG_INFO *info, byte *buf)
{ {
uint err; int err;
MI_INFO *mi; MI_INFO *mi;
if (!info->current_table) if (!info->current_table)
return (HA_ERR_KEY_NOT_FOUND); return (HA_ERR_KEY_NOT_FOUND);
err=mi_rnext_same(info->current_table->table,buf); /* at first, do rnext for the table found before */
if (err == HA_ERR_END_OF_FILE) if ((err=mi_rnext_same(info->current_table->table,NULL)))
{ {
queue_remove(&(info->by_key),0); if (err == HA_ERR_END_OF_FILE)
if (!info->by_key.elements) {
return HA_ERR_END_OF_FILE; queue_remove(&(info->by_key),0);
if (!info->by_key.elements)
mi=(info->current_table=(MYRG_TABLE *)queue_top(&(info->by_key)))->table; return HA_ERR_END_OF_FILE;
mi->once_flags|= RRND_PRESERVE_LASTINX; }
return mi_rrnd(mi,buf,mi->lastpos); else
return err;
} }
return err; else
{
/* Found here, adding to queue */
queue_top(&(info->by_key))=(byte *)(info->current_table);
queue_replaced(&(info->by_key));
}
/* now, mymerge's read_next is as simple as one queue_top */
mi=(info->current_table=(MYRG_TABLE *)queue_top(&(info->by_key)))->table;
return _myrg_mi_read_record(mi,buf);
} }
...@@ -32,3 +32,39 @@ select * from t1 where concat(A,C,B,D) = 'AAAA2003-03-011051'; ...@@ -32,3 +32,39 @@ select * from t1 where concat(A,C,B,D) = 'AAAA2003-03-011051';
a b c d a b c d
AAAA 105 2003-03-01 1 AAAA 105 2003-03-01 1
drop table t1; drop table t1;
select 'a' union select concat('a', -4);
a
a
a-4
select 'a' union select concat('a', -4.5);
a
a
a-4.5
select 'a' union select concat('a', -(4 + 1));
a
a
a-5
select 'a' union select concat('a', 4 - 5);
a
a
a-1
select 'a' union select concat('a', -'3');
a
a
a-3
select 'a' union select concat('a', -concat('3',4));
a
a
a-34
select 'a' union select concat('a', -0);
a
a
a0
select 'a' union select concat('a', -0.0);
a
a
a-0.0
select 'a' union select concat('a', -0.0000);
a
a
a-0.0000
...@@ -651,3 +651,28 @@ ERROR HY000: You can't specify target table 't1' for update in FROM clause ...@@ -651,3 +651,28 @@ ERROR HY000: You can't specify target table 't1' for update in FROM clause
create table t3 engine=merge union=(t1, t2) select * from t2; create table t3 engine=merge union=(t1, t2) select * from t2;
ERROR HY000: You can't specify target table 't2' for update in FROM clause ERROR HY000: You can't specify target table 't2' for update in FROM clause
drop table t1, t2; drop table t1, t2;
create table t1 (a int,b int,c int, index (a,b,c));
create table t2 (a int,b int,c int, index (a,b,c));
create table t3 (a int,b int,c int, index (a,b,c))
engine=merge union=(t1 ,t2);
insert into t1 (a,b,c) values (1,1,0),(1,2,0);
insert into t2 (a,b,c) values (1,1,1),(1,2,1);
explain select a,b,c from t3 force index (a) where a=1 order by a,b,c;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t3 ref a a 5 const 2 Using where; Using index
select a,b,c from t3 force index (a) where a=1 order by a,b,c;
a b c
1 1 0
1 1 1
1 2 0
1 2 1
explain select a,b,c from t3 force index (a) where a=1 order by a desc, b desc, c desc;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t3 ref a a 5 const 2 Using where; Using index
select a,b,c from t3 force index (a) where a=1 order by a desc, b desc, c desc;
a b c
1 2 1
1 2 0
1 1 1
1 1 0
drop table t1, t2, t3;
...@@ -3,7 +3,7 @@ select 1, 1.0, -1, "hello", NULL; ...@@ -3,7 +3,7 @@ select 1, 1.0, -1, "hello", NULL;
Catalog Database Table Table_alias Column Column_alias Name Type Length Max length Is_null Flags Decimals Charsetnr Catalog Database Table Table_alias Column Column_alias Name Type Length Max length Is_null Flags Decimals Charsetnr
def 1 8 1 1 N 32769 0 8 def 1 8 1 1 N 32769 0 8
def 1.0 5 3 3 N 32769 1 8 def 1.0 5 3 3 N 32769 1 8
def -1 8 1 2 N 32769 0 8 def -1 8 2 2 N 32769 0 8
def hello 254 5 5 N 1 31 8 def hello 254 5 5 N 1 31 8
def NULL 6 0 0 Y 32896 0 63 def NULL 6 0 0 Y 32896 0 63
1 1.0 -1 hello NULL 1 1.0 -1 hello NULL
......
...@@ -34,3 +34,19 @@ create table t1 (a char(4), b double, c date, d tinyint(4)); ...@@ -34,3 +34,19 @@ create table t1 (a char(4), b double, c date, d tinyint(4));
insert into t1 values ('AAAA', 105, '2003-03-01', 1); insert into t1 values ('AAAA', 105, '2003-03-01', 1);
select * from t1 where concat(A,C,B,D) = 'AAAA2003-03-011051'; select * from t1 where concat(A,C,B,D) = 'AAAA2003-03-011051';
drop table t1; drop table t1;
# BUG#6825
select 'a' union select concat('a', -4);
select 'a' union select concat('a', -4.5);
select 'a' union select concat('a', -(4 + 1));
select 'a' union select concat('a', 4 - 5);
select 'a' union select concat('a', -'3');
select 'a' union select concat('a', -concat('3',4));
select 'a' union select concat('a', -0);
select 'a' union select concat('a', -0.0);
select 'a' union select concat('a', -0.0000);
...@@ -285,3 +285,21 @@ create table t3 engine=merge union=(t1, t2) select * from t1; ...@@ -285,3 +285,21 @@ create table t3 engine=merge union=(t1, t2) select * from t1;
--error 1093 --error 1093
create table t3 engine=merge union=(t1, t2) select * from t2; create table t3 engine=merge union=(t1, t2) select * from t2;
drop table t1, t2; drop table t1, t2;
# BUG#6699 : no sorting on 'ref' retrieval
create table t1 (a int,b int,c int, index (a,b,c));
create table t2 (a int,b int,c int, index (a,b,c));
create table t3 (a int,b int,c int, index (a,b,c))
engine=merge union=(t1 ,t2);
insert into t1 (a,b,c) values (1,1,0),(1,2,0);
insert into t2 (a,b,c) values (1,1,1),(1,2,1);
explain select a,b,c from t3 force index (a) where a=1 order by a,b,c;
select a,b,c from t3 force index (a) where a=1 order by a,b,c;
# this actually wasn't affected:
explain select a,b,c from t3 force index (a) where a=1 order by a desc, b desc, c desc;
select a,b,c from t3 force index (a) where a=1 order by a desc, b desc, c desc;
drop table t1, t2, t3;
...@@ -110,6 +110,7 @@ ...@@ -110,6 +110,7 @@
#define CFG_CONNECTION_SERVER_PORT 406 #define CFG_CONNECTION_SERVER_PORT 406
#define CFG_CONNECTION_HOSTNAME_1 407 #define CFG_CONNECTION_HOSTNAME_1 407
#define CFG_CONNECTION_HOSTNAME_2 408 #define CFG_CONNECTION_HOSTNAME_2 408
#define CFG_CONNECTION_GROUP 409
#define CFG_TCP_SERVER 452 #define CFG_TCP_SERVER 452
#define CFG_TCP_SEND_BUFFER_SIZE 454 #define CFG_TCP_SEND_BUFFER_SIZE 454
......
...@@ -901,23 +901,6 @@ typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*); ...@@ -901,23 +901,6 @@ typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
NDB_MAX_SCHEMA_NAME_SIZE + \ NDB_MAX_SCHEMA_NAME_SIZE + \
NDB_MAX_TAB_NAME_SIZE*2 NDB_MAX_TAB_NAME_SIZE*2
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
class NdbWaiter {
public:
NdbWaiter();
~NdbWaiter();
void wait(int waitTime);
void nodeFail(Uint32 node);
void signal(Uint32 state);
Uint32 m_node;
Uint32 m_state;
void * m_mutex;
struct NdbCondition * m_condition;
};
#endif
/** /**
* @class Ndb * @class Ndb
* @brief Represents the NDB kernel and is the main class of the NDB API. * @brief Represents the NDB kernel and is the main class of the NDB API.
...@@ -1199,39 +1182,6 @@ public: ...@@ -1199,39 +1182,6 @@ public:
const char * keyData = 0, const char * keyData = 0,
Uint32 keyLen = 0); Uint32 keyLen = 0);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
* This method is a modification of Ndb::startTransaction,
* in which we use only the first two chars of keyData to
* select transaction coordinator.
* This is referred to as a distribution group.
* There are two ways to use the method:
* - In the first, the two characters are used directly as
* the distribution key, and
* - in the second the distribution is calculated as:
* (10 * (char[0] - 0x30) + (char[1] - 0x30)).
* Thus, in the second way, the two ASCII digits '78'
* will provide the distribution key = 78.
*
* @note Transaction priorities are not yet supported.
*
* @param aPrio Priority of the transaction.<br>
* Priority 0 is the highest priority and is used for short transactions
* with requirements on low delay.<br>
* Priority 1 is a medium priority for short transactions.<br>
* Priority 2 is a medium priority for long transactions.<br>
* Priority 3 is a low priority for long transactions.
* @param keyData is a string of which the two first characters
* is used to compute which fragement the data is stored in.
* @param type is the type of distribution group.<br>
* 0 means direct usage of the two characters, and<br>
* 1 means the ASCII digit variant.
* @return NdbConnection, or NULL if it failed.
*/
NdbConnection* startTransactionDGroup(Uint32 aPrio,
const char * keyData, int type);
#endif
/** /**
* When a transactions is completed, the transaction has to be closed. * When a transactions is completed, the transaction has to be closed.
* *
...@@ -1586,8 +1536,6 @@ private: ...@@ -1586,8 +1536,6 @@ private:
/****************************************************************************** /******************************************************************************
* These are the private variables in this class. * These are the private variables in this class.
*****************************************************************************/ *****************************************************************************/
Ndb_cluster_connection *m_ndb_cluster_connection;
NdbConnection** thePreparedTransactionsArray; NdbConnection** thePreparedTransactionsArray;
NdbConnection** theSentTransactionsArray; NdbConnection** theSentTransactionsArray;
NdbConnection** theCompletedTransactionsArray; NdbConnection** theCompletedTransactionsArray;
...@@ -1601,8 +1549,6 @@ private: ...@@ -1601,8 +1549,6 @@ private:
Uint32 theNextConnectNode; Uint32 theNextConnectNode;
NdbWaiter theWaiter;
bool fullyQualifiedNames; bool fullyQualifiedNames;
// Ndb database name. // Ndb database name.
...@@ -1658,35 +1604,6 @@ private: ...@@ -1658,35 +1604,6 @@ private:
InitConfigError InitConfigError
} theInitState; } theInitState;
/**
* Computes fragement id for primary key
*
* Note that keydata has to be "shaped" as it is being sent in KEYINFO
*/
Uint32 computeFragmentId(const char * keyData, Uint32 keyLen);
Uint32 getFragmentId(Uint32 hashValue);
/**
* Make a guess to which node is the primary for the fragment
*/
Uint32 guessPrimaryNode(Uint32 fragmentId);
/**
* Structure containing values for guessing primary node
*/
struct StartTransactionNodeSelectionData {
StartTransactionNodeSelectionData():
fragment2PrimaryNodeMap(0) {};
Uint32 kValue;
Uint32 hashValueMask;
Uint32 hashpointerValue;
Uint32 noOfFragments;
Uint32 * fragment2PrimaryNodeMap;
void init(Uint32 noOfNodes, Uint8 nodeIds[]);
void release();
} startTransactionNodeSelectionData;
NdbApiSignal* theCommitAckSignal; NdbApiSignal* theCommitAckSignal;
......
...@@ -18,13 +18,7 @@ ...@@ -18,13 +18,7 @@
#ifndef CLUSTER_CONNECTION_HPP #ifndef CLUSTER_CONNECTION_HPP
#define CLUSTER_CONNECTION_HPP #define CLUSTER_CONNECTION_HPP
class TransporterFacade; struct Ndb_cluster_connection_node_iter;
class ConfigRetriever;
struct NdbThread;
extern "C" {
void* run_ndb_cluster_connection_connect_thread(void*);
}
class Ndb_cluster_connection { class Ndb_cluster_connection {
public: public:
...@@ -32,16 +26,27 @@ public: ...@@ -32,16 +26,27 @@ public:
~Ndb_cluster_connection(); ~Ndb_cluster_connection();
int connect(int no_retries, int retry_delay_in_seconds, int verbose); int connect(int no_retries, int retry_delay_in_seconds, int verbose);
int start_connect_thread(int (*connect_callback)(void)= 0); int start_connect_thread(int (*connect_callback)(void)= 0);
// add check coupled to init state of cluster connection
// timeout_after_first_alive negative - ok only if all alive
// timeout_after_first_alive positive - ok if some alive
int wait_until_ready(int timeout_for_first_alive,
int timeout_after_first_alive);
const char *get_connectstring(char *buf, int buf_sz) const; const char *get_connectstring(char *buf, int buf_sz) const;
int get_connected_port() const; int get_connected_port() const;
const char *get_connected_host() const; const char *get_connected_host() const;
void set_optimized_node_selection(int val);
Uint32 no_db_nodes();
private: private:
friend void* run_ndb_cluster_connection_connect_thread(void*); friend class Ndb;
void connect_thread(); friend class NdbImpl;
TransporterFacade *m_facade; friend class Ndb_cluster_connection_impl;
ConfigRetriever *m_config_retriever; class Ndb_cluster_connection_impl & m_impl;
NdbThread *m_connect_thread; Ndb_cluster_connection(Ndb_cluster_connection_impl&);
int (*m_connect_callback)(void);
}; };
#endif #endif
...@@ -17,47 +17,62 @@ ...@@ -17,47 +17,62 @@
#ifndef _NDB_OPTS_H #ifndef _NDB_OPTS_H
#define _NDB_OPTS_H #define _NDB_OPTS_H
#include <ndb_global.h>
#include <my_sys.h> #include <my_sys.h>
#include <my_getopt.h> #include <my_getopt.h>
#include <mysql_version.h> #include <mysql_version.h>
#include <ndb_version.h> #include <ndb_version.h>
#ifndef DBUG_OFF #define NDB_STD_OPTS_VARS \
#define NDB_STD_OPTS(prog_name) \ const char *opt_connect_str= 0;\
{ "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", \ my_bool opt_ndb_shm;\
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 }, \ my_bool opt_ndb_optimized_node_selection
{ "usage", '?', "Display this help and exit.", \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ #define NDB_STD_OPTS_OPTIONS \
{ "help", '?', "Display this help and exit.", \ OPT_NDB_SHM= 256,\
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ OPT_NDB_OPTIMIZED_NODE_SELECTION
{ "version", 'V', "Output version information and exit.", 0, 0, 0, \
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ #define OPT_NDB_CONNECTSTRING 'c'
{ "ndb-connectstring", 'c', \
"Set connect string for connecting to ndb_mgmd. " \ #ifdef NDB_SHM_TRANSPORTER
"Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \ #define OPT_NDB_SHM_DEFAULT 1
"Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
{ "connect-string", 'c', "same as --ndb-connectstring",\
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
#else #else
#define NDB_STD_OPTS(prog_name) \ #define OPT_NDB_SHM_DEFAULT 0
#endif
#define NDB_STD_OPTS_COMMON \
{ "usage", '?', "Display this help and exit.", \ { "usage", '?', "Display this help and exit.", \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "help", '?', "Display this help and exit.", \ { "help", '?', "Display this help and exit.", \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "version", 'V', "Output version information and exit.", 0, 0, 0, \ { "version", 'V', "Output version information and exit.", 0, 0, 0, \
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "ndb-connectstring", 'c', \ { "ndb-connectstring", OPT_NDB_CONNECTSTRING, \
"Set connect string for connecting to ndb_mgmd. " \ "Set connect string for connecting to ndb_mgmd. " \
"Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \ "Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \
"Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \ "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
{ "connect-string", 'c', "same as --ndb-connectstring",\ { "ndb-shm", OPT_NDB_SHM,\
"Allow optimizing using shared memory connections when available",\
(gptr*) &opt_ndb_shm, (gptr*) &opt_ndb_shm, 0,\
GET_BOOL, NO_ARG, OPT_NDB_SHM_DEFAULT, 0, 0, 0, 0, 0 },\
{"ndb-optimized-node-selection", OPT_NDB_OPTIMIZED_NODE_SELECTION,\
"Select nodes for transactions in a more optimal way",\
(gptr*) &opt_ndb_optimized_node_selection,\
(gptr*) &opt_ndb_optimized_node_selection, 0,\
GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},\
{ "connect-string", OPT_NDB_CONNECTSTRING, "same as --ndb-connectstring",\
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,\ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,\
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 } GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
#ifndef DBUG_OFF
#define NDB_STD_OPTS(prog_name) \
{ "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", \
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 }, \
NDB_STD_OPTS_COMMON
#else
#define NDB_STD_OPTS(prog_name) NDB_STD_OPTS_COMMON
#endif #endif
#endif /*_NDB_OPTS_H */ #endif /*_NDB_OPTS_H */
...@@ -46,7 +46,13 @@ extern "C" { ...@@ -46,7 +46,13 @@ extern "C" {
#include <EventLogger.hpp> #include <EventLogger.hpp>
extern EventLogger g_eventLogger; extern EventLogger g_eventLogger;
static const char* opt_connect_str= 0; enum ndbd_options {
NDB_STD_OPTS_OPTIONS,
OPT_INITIAL,
OPT_NODAEMON
};
NDB_STD_OPTS_VARS;
static int _daemon, _no_daemon, _initial, _no_start; static int _daemon, _no_daemon, _initial, _no_start;
/** /**
* Arguments to NDB process * Arguments to NDB process
...@@ -54,7 +60,7 @@ static int _daemon, _no_daemon, _initial, _no_start; ...@@ -54,7 +60,7 @@ static int _daemon, _no_daemon, _initial, _no_start;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
NDB_STD_OPTS("ndbd"), NDB_STD_OPTS("ndbd"),
{ "initial", 256, { "initial", OPT_INITIAL,
"Perform initial start of ndbd, including cleaning the file system. " "Perform initial start of ndbd, including cleaning the file system. "
"Consult documentation before using this", "Consult documentation before using this",
(gptr*) &_initial, (gptr*) &_initial, 0, (gptr*) &_initial, (gptr*) &_initial, 0,
...@@ -66,7 +72,7 @@ static struct my_option my_long_options[] = ...@@ -66,7 +72,7 @@ static struct my_option my_long_options[] =
{ "daemon", 'd', "Start ndbd as daemon (default)", { "daemon", 'd', "Start ndbd as daemon (default)",
(gptr*) &_daemon, (gptr*) &_daemon, 0, (gptr*) &_daemon, (gptr*) &_daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
{ "nodaemon", 257, { "nodaemon", OPT_NODAEMON,
"Do not start ndbd as daemon, provided for testing purposes", "Do not start ndbd as daemon, provided for testing purposes",
(gptr*) &_no_daemon, (gptr*) &_no_daemon, 0, (gptr*) &_no_daemon, (gptr*) &_no_daemon, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
......
...@@ -56,9 +56,13 @@ handler(int sig){ ...@@ -56,9 +56,13 @@ handler(int sig){
} }
} }
enum ndb_mgm_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char default_prompt[]= "ndb_mgm> "; static const char default_prompt[]= "ndb_mgm> ";
static unsigned _try_reconnect; static unsigned _try_reconnect;
static char *opt_connect_str= 0;
static const char *prompt= default_prompt; static const char *prompt= default_prompt;
static char *opt_execute_str= 0; static char *opt_execute_str= 0;
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#include "InitConfigFileParser.hpp" #include "InitConfigFileParser.hpp"
#include <m_string.h> #include <m_string.h>
extern my_bool opt_ndb_shm;
#define MAX_LINE_LENGTH 255 #define MAX_LINE_LENGTH 255
#define KEY_INTERNAL 0 #define KEY_INTERNAL 0
#define MAX_INT_RNIL 0xfffffeff #define MAX_INT_RNIL 0xfffffeff
...@@ -79,6 +81,7 @@ static bool transformSystem(InitConfigFileParser::Context & ctx, const char *); ...@@ -79,6 +81,7 @@ static bool transformSystem(InitConfigFileParser::Context & ctx, const char *);
static bool transformExternalSystem(InitConfigFileParser::Context & ctx, const char *); static bool transformExternalSystem(InitConfigFileParser::Context & ctx, const char *);
static bool transformNode(InitConfigFileParser::Context & ctx, const char *); static bool transformNode(InitConfigFileParser::Context & ctx, const char *);
static bool transformExtNode(InitConfigFileParser::Context & ctx, const char *); static bool transformExtNode(InitConfigFileParser::Context & ctx, const char *);
static bool checkConnectionSupport(InitConfigFileParser::Context & ctx, const char *);
static bool transformConnection(InitConfigFileParser::Context & ctx, const char *); static bool transformConnection(InitConfigFileParser::Context & ctx, const char *);
static bool applyDefaultValues(InitConfigFileParser::Context & ctx, const char *); static bool applyDefaultValues(InitConfigFileParser::Context & ctx, const char *);
static bool checkMandatory(InitConfigFileParser::Context & ctx, const char *); static bool checkMandatory(InitConfigFileParser::Context & ctx, const char *);
...@@ -108,6 +111,11 @@ ConfigInfo::m_SectionRules[] = { ...@@ -108,6 +111,11 @@ ConfigInfo::m_SectionRules[] = {
{ "REP", transformNode, 0 }, { "REP", transformNode, 0 },
{ "EXTERNAL REP", transformExtNode, 0 }, { "EXTERNAL REP", transformExtNode, 0 },
{ "TCP", checkConnectionSupport, 0 },
{ "SHM", checkConnectionSupport, 0 },
{ "SCI", checkConnectionSupport, 0 },
{ "OSE", checkConnectionSupport, 0 },
{ "TCP", transformConnection, 0 }, { "TCP", transformConnection, 0 },
{ "SHM", transformConnection, 0 }, { "SHM", transformConnection, 0 },
{ "SCI", transformConnection, 0 }, { "SCI", transformConnection, 0 },
...@@ -130,6 +138,8 @@ ConfigInfo::m_SectionRules[] = { ...@@ -130,6 +138,8 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", fixHostname, "HostName1" }, { "TCP", fixHostname, "HostName1" },
{ "TCP", fixHostname, "HostName2" }, { "TCP", fixHostname, "HostName2" },
{ "SHM", fixHostname, "HostName1" },
{ "SHM", fixHostname, "HostName2" },
{ "SCI", fixHostname, "HostName1" }, { "SCI", fixHostname, "HostName1" },
{ "SCI", fixHostname, "HostName2" }, { "SCI", fixHostname, "HostName2" },
{ "SHM", fixHostname, "HostName1" }, { "SHM", fixHostname, "HostName1" },
...@@ -197,6 +207,9 @@ static bool sanity_checks(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -197,6 +207,9 @@ static bool sanity_checks(Vector<ConfigInfo::ConfigRuleSection>&sections,
static bool add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, static bool add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
const char * rule_data); const char * rule_data);
static bool set_connection_priorities(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data);
static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections, static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
const char * rule_data); const char * rule_data);
...@@ -208,6 +221,7 @@ const ConfigInfo::ConfigRule ...@@ -208,6 +221,7 @@ const ConfigInfo::ConfigRule
ConfigInfo::m_ConfigRules[] = { ConfigInfo::m_ConfigRules[] = {
{ sanity_checks, 0 }, { sanity_checks, 0 },
{ add_node_connections, 0 }, { add_node_connections, 0 },
{ set_connection_priorities, 0 },
{ add_server_ports, 0 }, { add_server_ports, 0 },
{ check_node_vs_replicas, 0 }, { check_node_vs_replicas, 0 },
{ 0, 0 } { 0, 0 }
...@@ -1582,6 +1596,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1582,6 +1596,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
MANDATORY, MANDATORY,
0, 0 }, 0, 0 },
{
CFG_CONNECTION_GROUP,
"Group",
"TCP",
"",
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
"55",
"0", "200" },
{ {
CFG_CONNECTION_SEND_SIGNAL_ID, CFG_CONNECTION_SEND_SIGNAL_ID,
"SendSignalId", "SendSignalId",
...@@ -1747,6 +1772,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1747,6 +1772,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
MANDATORY, MANDATORY,
0, 0 }, 0, 0 },
{
CFG_CONNECTION_GROUP,
"Group",
"SHM",
"",
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
"35",
"0", "200" },
{ {
CFG_CONNECTION_SEND_SIGNAL_ID, CFG_CONNECTION_SEND_SIGNAL_ID,
"SendSignalId", "SendSignalId",
...@@ -1780,7 +1816,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1780,7 +1816,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_USED, ConfigInfo::CI_USED,
false, false,
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
MANDATORY, "0",
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
...@@ -1857,6 +1893,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1857,6 +1893,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
{
CFG_CONNECTION_GROUP,
"Group",
"SCI",
"",
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
"15",
"0", "200" },
{ {
CFG_CONNECTION_HOSTNAME_1, CFG_CONNECTION_HOSTNAME_1,
"HostName1", "HostName1",
...@@ -2681,11 +2728,50 @@ transformExtNode(InitConfigFileParser::Context & ctx, const char * data){ ...@@ -2681,11 +2728,50 @@ transformExtNode(InitConfigFileParser::Context & ctx, const char * data){
} }
/** /**
* Connection rule: Update "NoOfConnections" * Connection rule: Check support of connection
*/ */
bool bool
transformConnection(InitConfigFileParser::Context & ctx, const char * data){ checkConnectionSupport(InitConfigFileParser::Context & ctx, const char * data)
{
int error= 0;
if (strcasecmp("TCP",ctx.fname) == 0)
{
// always enabled
}
else if (strcasecmp("SHM",ctx.fname) == 0)
{
#ifndef NDB_SHM_TRANSPORTER
error= 1;
#endif
}
else if (strcasecmp("SCI",ctx.fname) == 0)
{
#ifndef NDB_SCI_TRANSPORTER
error= 1;
#endif
}
else if (strcasecmp("OSE",ctx.fname) == 0)
{
#ifndef NDB_OSE_TRANSPORTER
error= 1;
#endif
}
if (error)
{
ctx.reportError("Binary not compiled with this connection support, "
"[%s] starting at line: %d",
ctx.fname, ctx.m_sectionLineno);
return false;
}
return true;
}
/**
* Connection rule: Update "NoOfConnections"
*/
bool
transformConnection(InitConfigFileParser::Context & ctx, const char * data)
{
Uint32 connections = 0; Uint32 connections = 0;
ctx.m_userProperties.get("NoOfConnections", &connections); ctx.m_userProperties.get("NoOfConnections", &connections);
BaseString::snprintf(ctx.pname, sizeof(ctx.pname), "Connection_%d", connections); BaseString::snprintf(ctx.pname, sizeof(ctx.pname), "Connection_%d", connections);
...@@ -3398,11 +3484,51 @@ sanity_checks(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -3398,11 +3484,51 @@ sanity_checks(Vector<ConfigInfo::ConfigRuleSection>&sections,
return true; return true;
} }
static void
add_a_connection(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
Uint32 nodeId1, Uint32 nodeId2, bool use_shm)
{
ConfigInfo::ConfigRuleSection s;
const char *hostname1= 0, *hostname2= 0;
const Properties *tmp;
require(ctx.m_config->get("Node", nodeId1, &tmp));
tmp->get("HostName", &hostname1);
require(ctx.m_config->get("Node", nodeId2, &tmp));
tmp->get("HostName", &hostname2);
char buf[16];
s.m_sectionData= new Properties(true);
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId1);
s.m_sectionData->put("NodeId1", buf);
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId2);
s.m_sectionData->put("NodeId2", buf);
if (use_shm &&
hostname1 && hostname1[0] &&
hostname2 && hostname2[0] &&
strcmp(hostname1,hostname2) == 0)
{
s.m_sectionType= BaseString("SHM");
DBUG_PRINT("info",("adding SHM connection %d %d",nodeId1,nodeId2));
}
else
{
s.m_sectionType= BaseString("TCP");
DBUG_PRINT("info",("adding TCP connection %d %d",nodeId1,nodeId2));
}
sections.push_back(s);
}
static bool static bool
add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
const char * rule_data) const char * rule_data)
{ {
DBUG_ENTER("add_node_connections");
Uint32 i; Uint32 i;
Properties * props= ctx.m_config; Properties * props= ctx.m_config;
Properties p_connections(true); Properties p_connections(true);
...@@ -3427,9 +3553,10 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -3427,9 +3553,10 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
ctx.m_userProperties.get("NoOfNodes", &nNodes); ctx.m_userProperties.get("NoOfNodes", &nNodes);
Properties p_db_nodes(true); Properties p_db_nodes(true);
Properties p_api_mgm_nodes(true); Properties p_api_nodes(true);
Properties p_mgm_nodes(true);
Uint32 i_db= 0, i_api_mgm= 0, n; Uint32 i_db= 0, i_api= 0, i_mgm= 0, n;
for (i= 0, n= 0; n < nNodes; i++){ for (i= 0, n= 0; n < nNodes; i++){
const Properties * tmp; const Properties * tmp;
if(!props->get("Node", i, &tmp)) continue; if(!props->get("Node", i, &tmp)) continue;
...@@ -3440,9 +3567,10 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -3440,9 +3567,10 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
if (strcmp(type,DB_TOKEN) == 0) if (strcmp(type,DB_TOKEN) == 0)
p_db_nodes.put("", i_db++, i); p_db_nodes.put("", i_db++, i);
else if (strcmp(type,API_TOKEN) == 0 || else if (strcmp(type,API_TOKEN) == 0)
strcmp(type,MGM_TOKEN) == 0) p_api_nodes.put("", i_api++, i);
p_api_mgm_nodes.put("", i_api_mgm++, i); else if (strcmp(type,MGM_TOKEN) == 0)
p_mgm_nodes.put("", i_mgm++, i);
} }
Uint32 nodeId1, nodeId2, dummy; Uint32 nodeId1, nodeId2, dummy;
...@@ -3451,39 +3579,39 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -3451,39 +3579,39 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
for (Uint32 j= i+1;; j++){ for (Uint32 j= i+1;; j++){
if(!p_db_nodes.get("", j, &nodeId2)) break; if(!p_db_nodes.get("", j, &nodeId2)) break;
if(!p_connections2.get("", nodeId1+nodeId2<<16, &dummy)) { if(!p_connections2.get("", nodeId1+nodeId2<<16, &dummy)) {
ConfigInfo::ConfigRuleSection s; add_a_connection(sections,ctx,nodeId1,nodeId2,opt_ndb_shm);
s.m_sectionType= BaseString("TCP");
s.m_sectionData= new Properties(true);
char buf[16];
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId1);
s.m_sectionData->put("NodeId1", buf);
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId2);
s.m_sectionData->put("NodeId2", buf);
sections.push_back(s);
} }
} }
} }
for (i= 0; p_api_mgm_nodes.get("", i, &nodeId1); i++){ for (i= 0; p_api_nodes.get("", i, &nodeId1); i++){
if(!p_connections.get("", nodeId1, &dummy)) { if(!p_connections.get("", nodeId1, &dummy)) {
for (Uint32 j= 0;; j++){ for (Uint32 j= 0;; j++){
if(!p_db_nodes.get("", j, &nodeId2)) break; if(!p_db_nodes.get("", j, &nodeId2)) break;
ConfigInfo::ConfigRuleSection s; add_a_connection(sections,ctx,nodeId1,nodeId2,opt_ndb_shm);
s.m_sectionType= BaseString("TCP");
s.m_sectionData= new Properties(true);
char buf[16];
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId1);
s.m_sectionData->put("NodeId1", buf);
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId2);
s.m_sectionData->put("NodeId2", buf);
sections.push_back(s);
} }
} }
} }
return true; for (i= 0; p_mgm_nodes.get("", i, &nodeId1); i++){
if(!p_connections.get("", nodeId1, &dummy)) {
for (Uint32 j= 0;; j++){
if(!p_db_nodes.get("", j, &nodeId2)) break;
add_a_connection(sections,ctx,nodeId1,nodeId2,0);
}
}
}
DBUG_RETURN(true);
} }
static bool set_connection_priorities(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data)
{
DBUG_ENTER("set_connection_priorities");
DBUG_RETURN(true);
}
static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections, static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
......
...@@ -2225,9 +2225,24 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -2225,9 +2225,24 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (*nodeId != 0 || if (*nodeId != 0 ||
type != NDB_MGM_NODE_TYPE_MGM || type != NDB_MGM_NODE_TYPE_MGM ||
no_mgm == 1) { // any match is ok no_mgm == 1) { // any match is ok
if (config_hostname == 0 &&
*nodeId == 0 &&
type != NDB_MGM_NODE_TYPE_MGM)
{
if (!id_found) // only set if not set earlier
id_found= tmp;
continue; /* continue looking for a nodeid with specified
* hostname
*/
}
assert(id_found == 0);
id_found= tmp; id_found= tmp;
break; break;
} }
assert(no_mgm > 1);
assert(*nodeId != 0);
assert(type != NDB_MGM_NODE_TYPE_MGM);
if (id_found) { // mgmt server may only have one match if (id_found) { // mgmt server may only have one match
error_string.appfmt("Ambiguous node id's %d and %d.\n" error_string.appfmt("Ambiguous node id's %d and %d.\n"
"Suggest specifying node id in connectstring,\n" "Suggest specifying node id in connectstring,\n"
......
...@@ -89,50 +89,50 @@ bool g_StopServer; ...@@ -89,50 +89,50 @@ bool g_StopServer;
extern EventLogger g_EventLogger; extern EventLogger g_EventLogger;
extern int global_mgmt_server_check; extern int global_mgmt_server_check;
static char *opt_connect_str= 0;
enum ndb_mgmd_options {
NDB_STD_OPTS_OPTIONS,
OPT_INTERACTIVE,
OPT_NO_NODEID_CHECKS,
OPT_NO_DAEMON
};
NDB_STD_OPTS_VARS;
#if NDB_VERSION_MAJOR <= 4
#undef OPT_NDB_CONNECTSTRING
#define OPT_NDB_CONNECTSTRING 1023
#else
#endif
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
#ifndef DBUG_OFF NDB_STD_OPTS("ndb_mgmd"),
{ "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 },
#endif
{ "usage", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "help", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "version", 'V', "Output version information and exit.", 0, 0, 0,
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "ndb-connectstring", 1023,
"Set connect string for connecting to ndb_mgmd. "
"Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". "
"Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg",
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "connect-string", 1023,
"same as --ndb-connectstring.",
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "config-file", 'f', "Specify cluster configuration file", { "config-file", 'f', "Specify cluster configuration file",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0, (gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "daemon", 'd', "Run ndb_mgmd in daemon mode (default)", { "daemon", 'd', "Run ndb_mgmd in daemon mode (default)",
(gptr*) &glob.daemon, (gptr*) &glob.daemon, 0, (gptr*) &glob.daemon, (gptr*) &glob.daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
{ "interactive", 256, "Run interactive. Not supported but provided for testing purposes", { "interactive", OPT_INTERACTIVE,
"Run interactive. Not supported but provided for testing purposes",
(gptr*) &glob.interactive, (gptr*) &glob.interactive, 0, (gptr*) &glob.interactive, (gptr*) &glob.interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-nodeid-checks", 257, "Do not provide any node id checks", { "no-nodeid-checks", OPT_NO_NODEID_CHECKS,
"Do not provide any node id checks",
(gptr*) &g_no_nodeid_checks, (gptr*) &g_no_nodeid_checks, 0, (gptr*) &g_no_nodeid_checks, (gptr*) &g_no_nodeid_checks, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "nodaemon", 258, "Don't run as daemon, but don't read from stdin", { "nodaemon", OPT_NO_DAEMON,
"Don't run as daemon, but don't read from stdin",
(gptr*) &glob.non_interactive, (gptr*) &glob.non_interactive, 0, (gptr*) &glob.non_interactive, (gptr*) &glob.non_interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
#if NDB_VERSION_MAJOR <= 4
{ "config-file", 'c', { "config-file", 'c',
"-c provided for backwards compatability, will be removed in 5.0." "-c provided for backwards compatability, will be removed in 5.0."
" Use -f instead", " Use -f instead",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0, (gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
#endif
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
}; };
static void short_usage_sub(void) static void short_usage_sub(void)
...@@ -164,6 +164,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -164,6 +164,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
case 'c': case 'c':
printf("Warning: -c will be removed in 5.0, use -f instead\n"); printf("Warning: -c will be removed in 5.0, use -f instead\n");
break; break;
case OPT_NDB_SHM:
#ifndef NDB_SHM_TRANSPORTER
printf("Warning: binary not compiled with shared memory support,\n"
"use configure option --with-ndb-shm to enable support.\n"
"Tcp connections will now be used instead\n");
opt_ndb_shm= 0;
#endif
break;
case '?': case '?':
usage(); usage();
exit(0); exit(0);
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <UtilBuffer.hpp> #include <UtilBuffer.hpp>
#include <NdbDictionary.hpp> #include <NdbDictionary.hpp>
#include <Ndb.hpp> #include <Ndb.hpp>
#include <NdbCondition.h>
#include "NdbLinHash.hpp" #include "NdbLinHash.hpp"
class Ndb_local_table_info { class Ndb_local_table_info {
......
...@@ -46,7 +46,6 @@ Connect to any node which has no connection at the moment. ...@@ -46,7 +46,6 @@ Connect to any node which has no connection at the moment.
NdbConnection* Ndb::doConnect(Uint32 tConNode) NdbConnection* Ndb::doConnect(Uint32 tConNode)
{ {
Uint32 tNode; Uint32 tNode;
Uint32 i = 0;;
Uint32 tAnyAlive = 0; Uint32 tAnyAlive = 0;
int TretCode; int TretCode;
...@@ -65,26 +64,51 @@ NdbConnection* Ndb::doConnect(Uint32 tConNode) ...@@ -65,26 +64,51 @@ NdbConnection* Ndb::doConnect(Uint32 tConNode)
// We will connect to any node. Make sure that we have connections to all // We will connect to any node. Make sure that we have connections to all
// nodes. // nodes.
//**************************************************************************** //****************************************************************************
Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes; if (theImpl->m_optimized_node_selection)
Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex; {
UintR Tcount = 0; Ndb_cluster_connection_node_iter &node_iter=
do { theImpl->m_node_iter;
theCurrentConnectIndex++; theImpl->m_ndb_cluster_connection.init_get_next_node(node_iter);
if (theCurrentConnectIndex >= tNoOfDbNodes) { while ((tNode= theImpl->m_ndb_cluster_connection.get_next_node(node_iter)))
theCurrentConnectIndex = 0; {
}//if TretCode= NDB_connect(tNode);
Tcount++; if ((TretCode == 1) ||
tNode = theImpl->theDBnodes[theCurrentConnectIndex]; (TretCode == 2))
TretCode = NDB_connect(tNode); {
if ((TretCode == 1) || (TretCode == 2)) {
//**************************************************************************** //****************************************************************************
// We have connections now to the desired node. Return // We have connections now to the desired node. Return
//**************************************************************************** //****************************************************************************
return getConnectedNdbConnection(tNode); return getConnectedNdbConnection(tNode);
} else if (TretCode != 0) { } else if (TretCode != 0) {
tAnyAlive = 1; tAnyAlive= 1;
}//if }//if
} while (Tcount < tNoOfDbNodes); }
}
else // just do a regular round robin
{
Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes;
Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex;
UintR Tcount = 0;
do {
theCurrentConnectIndex++;
if (theCurrentConnectIndex >= tNoOfDbNodes)
theCurrentConnectIndex = 0;
Tcount++;
tNode= theImpl->theDBnodes[theCurrentConnectIndex];
TretCode= NDB_connect(tNode);
if ((TretCode == 1) ||
(TretCode == 2))
{
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
return getConnectedNdbConnection(tNode);
} else if (TretCode != 0) {
tAnyAlive= 1;
}//if
} while (Tcount < tNoOfDbNodes);
}
//**************************************************************************** //****************************************************************************
// We were unable to find a free connection. If no node alive we will report // We were unable to find a free connection. If no node alive we will report
// error code for cluster failure otherwise connection failure. // error code for cluster failure otherwise connection failure.
...@@ -149,8 +173,8 @@ Ndb::NDB_connect(Uint32 tNode) ...@@ -149,8 +173,8 @@ Ndb::NDB_connect(Uint32 tNode)
tReturnCode = tp->sendSignal(tSignal, tNode); tReturnCode = tp->sendSignal(tSignal, tNode);
releaseSignal(tSignal); releaseSignal(tSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_node = tNode; theImpl->theWaiter.m_node = tNode;
theWaiter.m_state = WAIT_TC_SEIZE; theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
tReturnCode = receiveResponse(); tReturnCode = receiveResponse();
}//if }//if
} else { } else {
...@@ -243,50 +267,28 @@ Ndb::waitUntilReady(int timeout) ...@@ -243,50 +267,28 @@ Ndb::waitUntilReady(int timeout)
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
do { while (theNode == 0) {
if ((id = theNode) != 0) {
unsigned int foundAliveNode = 0;
TransporterFacade *tp = TransporterFacade::instance();
tp->lock_mutex();
for (unsigned int i = 0; i < theImpl->theNoOfDBnodes; i++) {
const NodeId nodeId = theImpl->theDBnodes[i];
//************************************************
// If any node is answering, ndb is answering
//************************************************
if (tp->get_node_alive(nodeId) != 0) {
foundAliveNode++;
}//if
}//for
tp->unlock_mutex();
if (foundAliveNode == theImpl->theNoOfDBnodes) {
DBUG_RETURN(0);
}//if
if (foundAliveNode > 0) {
noChecksSinceFirstAliveFound++;
}//if
if (noChecksSinceFirstAliveFound > 30) {
DBUG_RETURN(0);
}//if
}//if theNode != 0
if (secondsCounter >= timeout) if (secondsCounter >= timeout)
break; {
theError.code = 4269;
DBUG_RETURN(-1);
}
NdbSleep_MilliSleep(100); NdbSleep_MilliSleep(100);
milliCounter += 100; milliCounter += 100;
if (milliCounter >= 1000) { if (milliCounter >= 1000) {
secondsCounter++; secondsCounter++;
milliCounter = 0; milliCounter = 0;
}//if }//if
} while (1); }
if (id == 0) {
theError.code = 4269; if (theImpl->m_ndb_cluster_connection.wait_until_ready
(timeout-secondsCounter,30))
{
theError.code = 4009;
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (noChecksSinceFirstAliveFound > 0) {
DBUG_RETURN(0); DBUG_RETURN(0);
}//if
theError.code = 4009;
DBUG_RETURN(-1);
} }
/***************************************************************************** /*****************************************************************************
...@@ -311,8 +313,8 @@ Ndb::startTransaction(Uint32 aPriority, const char * keyData, Uint32 keyLen) ...@@ -311,8 +313,8 @@ Ndb::startTransaction(Uint32 aPriority, const char * keyData, Uint32 keyLen)
*/ */
Uint32 nodeId; Uint32 nodeId;
if(keyData != 0) { if(keyData != 0) {
Uint32 fragmentId = computeFragmentId(keyData, keyLen); nodeId = 0; // guess not supported
nodeId = guessPrimaryNode(fragmentId); // nodeId = m_ndb_cluster_connection->guess_primary_node(keyData, keyLen);
} else { } else {
nodeId = 0; nodeId = 0;
}//if }//if
...@@ -373,44 +375,6 @@ Ndb::hupp(NdbConnection* pBuddyTrans) ...@@ -373,44 +375,6 @@ Ndb::hupp(NdbConnection* pBuddyTrans)
}//if }//if
}//Ndb::hupp() }//Ndb::hupp()
NdbConnection*
Ndb::startTransactionDGroup(Uint32 aPriority, const char * keyData, int type)
{
char DGroup[4];
if ((keyData == NULL) ||
(type > 1)) {
theError.code = 4118;
return NULL;
}//if
if (theInitState == Initialised) {
theError.code = 0;
checkFailedNode();
/**
* If the user supplied key data
* We will make a qualified quess to which node is the primary for the
* the fragment and contact that node
*/
Uint32 fragmentId;
if (type == 0) {
DGroup[0] = keyData[0];
DGroup[1] = keyData[1];
DGroup[2] = 0x30;
DGroup[3] = 0x30;
fragmentId = computeFragmentId(&DGroup[0], 4);
} else {
Uint32 hashValue = ((keyData[0] - 0x30) * 10) + (keyData[1] - 0x30);
fragmentId = getFragmentId(hashValue);
}//if
Uint32 nodeId = guessPrimaryNode(fragmentId);
NdbConnection* trans= startTransactionLocal(aPriority, nodeId);
DBUG_PRINT("exit", ("start DGroup trans: 0x%x transid: 0x%llx",
trans, trans ? trans->getTransactionId() : 0));
return trans;
} else {
return NULL;
}//if
}//Ndb::startTransaction()
NdbConnection* NdbConnection*
Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId) Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
...@@ -1010,118 +974,6 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) ...@@ -1010,118 +974,6 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
return ~0; return ~0;
} }
static const Uint32 MAX_KEY_LEN_64_WORDS = 4;
static const Uint32 MAX_KEY_LEN_32_WORDS = 8;
static const Uint32 MAX_KEY_LEN_BYTES = 32;
Uint32
Ndb::computeFragmentId(const char * keyData, Uint32 keyLen)
{
Uint64 tempData[MAX_KEY_LEN_64_WORDS];
const Uint32 usedKeyLen = (keyLen + 3) >> 2; // In words
const char * usedKeyData = 0;
/**
* If key data buffer is not aligned (on 64 bit boundary)
* or key len is not a multiple of 4
* Use temp data
*/
if(((((UintPtr)keyData) & 7) == 0) && ((keyLen & 3) == 0)) {
usedKeyData = keyData;
} else {
memcpy(&tempData[0], keyData, keyLen);
const int slack = keyLen & 3;
if(slack > 0) {
memset(&((char *)&tempData[0])[keyLen], 0, (4 - slack));
}//if
usedKeyData = (char *)&tempData[0];
}//if
Uint32 hashValue = md5_hash((Uint64 *)usedKeyData, usedKeyLen);
hashValue >>= startTransactionNodeSelectionData.kValue;
return getFragmentId(hashValue);
}//Ndb::computeFragmentId()
Uint32
Ndb::getFragmentId(Uint32 hashValue)
{
Uint32 fragmentId = hashValue &
startTransactionNodeSelectionData.hashValueMask;
if(fragmentId < startTransactionNodeSelectionData.hashpointerValue) {
fragmentId = hashValue &
((startTransactionNodeSelectionData.hashValueMask << 1) + 1);
}//if
return fragmentId;
}
Uint32
Ndb::guessPrimaryNode(Uint32 fragmentId){
//ASSERT(((fragmentId > 0) && fragmentId <
// startTransactionNodeSelectionData.noOfFragments), "Invalid fragementId");
return startTransactionNodeSelectionData.fragment2PrimaryNodeMap[fragmentId];
}
void
Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes,
Uint8 nodeIds[]) {
kValue = 6;
noOfFragments = 2 * noOfNodes;
/**
* Compute hashValueMask and hashpointerValue
*/
{
Uint32 topBit = (1 << 31);
for(int i = 31; i>=0; i--){
if((noOfFragments & topBit) != 0)
break;
topBit >>= 1;
}
hashValueMask = topBit - 1;
hashpointerValue = noOfFragments - (hashValueMask + 1);
}
/**
* This initialization depends on
* the fact that:
* primary node for fragment i = i % noOfNodes
*
* This algorithm should be implemented in Dbdih
*/
{
if (fragment2PrimaryNodeMap != 0)
abort();
fragment2PrimaryNodeMap = new Uint32[noOfFragments];
Uint32 i;
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i] = nodeIds[i];
}
// Sort them (bubble sort)
for(i = 0; i<noOfNodes-1; i++)
for(Uint32 j = i+1; j<noOfNodes; j++)
if(fragment2PrimaryNodeMap[i] > fragment2PrimaryNodeMap[j]){
Uint32 tmp = fragment2PrimaryNodeMap[i];
fragment2PrimaryNodeMap[i] = fragment2PrimaryNodeMap[j];
fragment2PrimaryNodeMap[j] = tmp;
}
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i+noOfNodes] = fragment2PrimaryNodeMap[i];
}
}
}
void
Ndb::StartTransactionNodeSelectionData::release(){
delete [] fragment2PrimaryNodeMap;
fragment2PrimaryNodeMap = 0;
}
Uint32 Uint32
convertEndian(Uint32 Data) convertEndian(Uint32 Data)
{ {
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include <Bitmask.hpp> #include <Bitmask.hpp>
#include <AttributeList.hpp> #include <AttributeList.hpp>
#include <Ndb.hpp> #include <Ndb.hpp>
#include "NdbImpl.hpp" #include "NdbWaiter.hpp"
#include "DictCache.hpp" #include "DictCache.hpp"
class NdbDictObjectImpl { class NdbDictObjectImpl {
......
...@@ -17,7 +17,9 @@ ...@@ -17,7 +17,9 @@
#ifndef NDB_IMPL_HPP #ifndef NDB_IMPL_HPP
#define NDB_IMPL_HPP #define NDB_IMPL_HPP
#include <ndb_global.h>
#include <Ndb.hpp> #include <Ndb.hpp>
#include <NdbOut.hpp>
#include <NdbError.hpp> #include <NdbError.hpp>
#include <NdbCondition.h> #include <NdbCondition.h>
#include <NdbReceiver.hpp> #include <NdbReceiver.hpp>
...@@ -26,6 +28,8 @@ ...@@ -26,6 +28,8 @@
#include <NdbTick.h> #include <NdbTick.h>
#include "ndb_cluster_connection_impl.hpp"
#include "NdbDictionaryImpl.hpp"
#include "ObjectMap.hpp" #include "ObjectMap.hpp"
/** /**
...@@ -33,11 +37,16 @@ ...@@ -33,11 +37,16 @@
*/ */
class NdbImpl { class NdbImpl {
public: public:
NdbImpl(); NdbImpl(Ndb_cluster_connection *, Ndb&);
~NdbImpl(); ~NdbImpl();
Ndb_cluster_connection_impl &m_ndb_cluster_connection;
NdbDictionaryImpl m_dictionary;
// Ensure good distribution of connects // Ensure good distribution of connects
Uint32 theCurrentConnectIndex; Uint32 theCurrentConnectIndex;
Ndb_cluster_connection_node_iter m_node_iter;
NdbObjectIdMap theNdbObjectIdMap; NdbObjectIdMap theNdbObjectIdMap;
...@@ -46,6 +55,10 @@ public: ...@@ -46,6 +55,10 @@ public:
// 1 indicates to release all connections to node // 1 indicates to release all connections to node
Uint32 the_release_ind[MAX_NDB_NODES]; Uint32 the_release_ind[MAX_NDB_NODES];
NdbWaiter theWaiter;
int m_optimized_node_selection;
}; };
#ifdef VM_TRACE #ifdef VM_TRACE
...@@ -113,26 +126,6 @@ Ndb::checkInitState() ...@@ -113,26 +126,6 @@ Ndb::checkInitState()
Uint32 convertEndian(Uint32 Data); Uint32 convertEndian(Uint32 Data);
enum WaitSignalType {
NO_WAIT = 0,
WAIT_NODE_FAILURE = 1, // Node failure during wait
WST_WAIT_TIMEOUT = 2, // Timeout during wait
WAIT_TC_SEIZE = 3,
WAIT_TC_RELEASE = 4,
WAIT_NDB_TAMPER = 5,
WAIT_SCAN = 6,
// DICT stuff
WAIT_GET_TAB_INFO_REQ = 11,
WAIT_CREATE_TAB_REQ = 12,
WAIT_DROP_TAB_REQ = 13,
WAIT_ALTER_TAB_REQ = 14,
WAIT_CREATE_INDX_REQ = 15,
WAIT_DROP_INDX_REQ = 16,
WAIT_LIST_TABLES_CONF = 17
};
enum LockMode { enum LockMode {
Read, Read,
Update, Update,
...@@ -140,44 +133,4 @@ enum LockMode { ...@@ -140,44 +133,4 @@ enum LockMode {
Delete Delete
}; };
#include <NdbOut.hpp>
inline
void
NdbWaiter::wait(int waitTime)
{
const bool forever = (waitTime == -1);
const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
while (1) {
if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE)
break;
if (forever) {
NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex);
} else {
if (waitTime <= 0) {
m_state = WST_WAIT_TIMEOUT;
break;
}
NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
waitTime = maxTime - NdbTick_CurrentMillisecond();
}
}
}
inline
void
NdbWaiter::nodeFail(Uint32 aNodeId){
if (m_state != NO_WAIT && m_node == aNodeId){
m_state = WAIT_NODE_FAILURE;
NdbCondition_Signal(m_condition);
}
}
inline
void
NdbWaiter::signal(Uint32 state){
m_state = state;
NdbCondition_Signal(m_condition);
}
#endif #endif
...@@ -528,8 +528,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend) ...@@ -528,8 +528,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
/** /**
* No completed... * No completed...
*/ */
theNdb->theWaiter.m_node = nodeId; theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue; continue;
...@@ -1358,8 +1358,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, ...@@ -1358,8 +1358,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
Uint32 tmp = m_sent_receivers_count; Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver; s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){ while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theWaiter.m_node = nodeId; theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue; continue;
...@@ -1506,8 +1506,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ ...@@ -1506,8 +1506,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/ */
while(theError.code == 0 && m_sent_receivers_count) while(theError.code == 0 && m_sent_receivers_count)
{ {
theNdb->theWaiter.m_node = nodeId; theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){ switch(return_code){
case 0: case 0:
...@@ -1576,8 +1576,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ ...@@ -1576,8 +1576,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/ */
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count) while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{ {
theNdb->theWaiter.m_node = nodeId; theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){ switch(return_code){
case 0: case 0:
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef NDB_WAITER_HPP
#define NDB_WAITER_HPP
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <NdbError.hpp>
#include <NdbCondition.h>
#include <NdbReceiver.hpp>
#include <NdbOperation.hpp>
#include <kernel/ndb_limits.h>
#include <NdbTick.h>
enum WaitSignalType {
NO_WAIT = 0,
WAIT_NODE_FAILURE = 1, // Node failure during wait
WST_WAIT_TIMEOUT = 2, // Timeout during wait
WAIT_TC_SEIZE = 3,
WAIT_TC_RELEASE = 4,
WAIT_NDB_TAMPER = 5,
WAIT_SCAN = 6,
// DICT stuff
WAIT_GET_TAB_INFO_REQ = 11,
WAIT_CREATE_TAB_REQ = 12,
WAIT_DROP_TAB_REQ = 13,
WAIT_ALTER_TAB_REQ = 14,
WAIT_CREATE_INDX_REQ = 15,
WAIT_DROP_INDX_REQ = 16,
WAIT_LIST_TABLES_CONF = 17
};
class NdbWaiter {
public:
NdbWaiter();
~NdbWaiter();
void wait(int waitTime);
void nodeFail(Uint32 node);
void signal(Uint32 state);
Uint32 m_node;
Uint32 m_state;
void * m_mutex;
struct NdbCondition * m_condition;
};
inline
void
NdbWaiter::wait(int waitTime)
{
const bool forever = (waitTime == -1);
const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
while (1) {
if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE)
break;
if (forever) {
NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex);
} else {
if (waitTime <= 0) {
m_state = WST_WAIT_TIMEOUT;
break;
}
NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
waitTime = maxTime - NdbTick_CurrentMillisecond();
}
}
}
inline
void
NdbWaiter::nodeFail(Uint32 aNodeId){
if (m_state != NO_WAIT && m_node == aNodeId){
m_state = WAIT_NODE_FAILURE;
NdbCondition_Signal(m_condition);
}
}
inline
void
NdbWaiter::signal(Uint32 state){
m_state = state;
NdbCondition_Signal(m_condition);
}
#endif
...@@ -209,8 +209,6 @@ void Ndb::connected(Uint32 ref) ...@@ -209,8 +209,6 @@ void Ndb::connected(Uint32 ref)
tmpTheNode, tmpTheNode,
theImpl->theNoOfDBnodes, theImpl->theNoOfDBnodes,
theFirstTransId)); theFirstTransId));
startTransactionNodeSelectionData.init(theImpl->theNoOfDBnodes,
theImpl->theDBnodes);
theCommitAckSignal = new NdbApiSignal(theMyRef); theCommitAckSignal = new NdbApiSignal(theMyRef);
theDictionary->m_receiver.m_reference= theMyRef; theDictionary->m_receiver.m_reference= theMyRef;
...@@ -251,7 +249,7 @@ Ndb::report_node_failure(Uint32 node_id) ...@@ -251,7 +249,7 @@ Ndb::report_node_failure(Uint32 node_id)
theImpl->the_release_ind[node_id] = 1; theImpl->the_release_ind[node_id] = 1;
// must come after // must come after
theImpl->the_release_ind[0] = 1; theImpl->the_release_ind[0] = 1;
theWaiter.nodeFail(node_id); theImpl->theWaiter.nodeFail(node_id);
return; return;
}//Ndb::report_node_failure() }//Ndb::report_node_failure()
...@@ -330,7 +328,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -330,7 +328,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
NdbConnection* tCon; NdbConnection* tCon;
int tReturnCode = -1; int tReturnCode = -1;
const Uint32* tDataPtr = aSignal->getDataPtr(); const Uint32* tDataPtr = aSignal->getDataPtr();
const Uint32 tWaitState = theWaiter.m_state; const Uint32 tWaitState = theImpl->theWaiter.m_state;
const Uint32 tSignalNumber = aSignal->readSignalNumber(); const Uint32 tSignalNumber = aSignal->readSignalNumber();
const Uint32 tFirstData = *tDataPtr; const Uint32 tFirstData = *tDataPtr;
const Uint32 tLen = aSignal->getLength(); const Uint32 tLen = aSignal->getLength();
...@@ -401,7 +399,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -401,7 +399,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
break; break;
case NdbReceiver::NDB_SCANRECEIVER: case NdbReceiver::NDB_SCANRECEIVER:
tCon->theScanningOp->receiver_delivered(tRec); tCon->theScanningOp->receiver_delivered(tRec);
theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ? theImpl->theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ?
(Uint32) NO_WAIT : tWaitState); (Uint32) NO_WAIT : tWaitState);
break; break;
default: default:
...@@ -598,7 +596,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -598,7 +596,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if }//if
tReturnCode = tCon->receiveTCSEIZECONF(aSignal); tReturnCode = tCon->receiveTCSEIZECONF(aSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
} else { } else {
goto InvalidSignal; goto InvalidSignal;
}//if }//if
...@@ -618,7 +616,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -618,7 +616,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if }//if
tReturnCode = tCon->receiveTCSEIZEREF(aSignal); tReturnCode = tCon->receiveTCSEIZEREF(aSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
} else { } else {
return; return;
}//if }//if
...@@ -638,7 +636,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -638,7 +636,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if }//if
tReturnCode = tCon->receiveTCRELEASECONF(aSignal); tReturnCode = tCon->receiveTCRELEASECONF(aSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
}//if }//if
break; break;
} }
...@@ -656,7 +654,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -656,7 +654,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if }//if
tReturnCode = tCon->receiveTCRELEASEREF(aSignal); tReturnCode = tCon->receiveTCRELEASEREF(aSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
}//if }//if
break; break;
} }
...@@ -708,7 +706,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -708,7 +706,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
return; return;
tReturnCode = tCon->receiveDIHNDBTAMPER(aSignal); tReturnCode = tCon->receiveDIHNDBTAMPER(aSignal);
if (tReturnCode != -1) if (tReturnCode != -1)
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
break; break;
} }
case GSN_SCAN_TABCONF: case GSN_SCAN_TABCONF:
...@@ -730,7 +728,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -730,7 +728,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tLen - ScanTabConf::SignalLength); tLen - ScanTabConf::SignalLength);
} }
if (tReturnCode != -1 && tWaitState == WAIT_SCAN) if (tReturnCode != -1 && tWaitState == WAIT_SCAN)
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
break; break;
} else { } else {
goto InvalidSignal; goto InvalidSignal;
...@@ -749,7 +747,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -749,7 +747,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
if (tCon->checkMagicNumber() == 0){ if (tCon->checkMagicNumber() == 0){
tReturnCode = tCon->receiveSCAN_TABREF(aSignal); tReturnCode = tCon->receiveSCAN_TABREF(aSignal);
if (tReturnCode != -1 && tWaitState == WAIT_SCAN){ if (tReturnCode != -1 && tWaitState == WAIT_SCAN){
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
} }
break; break;
} }
...@@ -774,7 +772,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -774,7 +772,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
switch(com){ switch(com){
case 1: case 1:
tCon->theScanningOp->receiver_delivered(tRec); tCon->theScanningOp->receiver_delivered(tRec);
theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ? theImpl->theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ?
(Uint32) NO_WAIT : tWaitState); (Uint32) NO_WAIT : tWaitState);
break; break;
case 0: case 0:
...@@ -838,16 +836,16 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -838,16 +836,16 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
goto InvalidSignal; goto InvalidSignal;
}//switch }//switch
if (theWaiter.m_state == NO_WAIT) { if (theImpl->theWaiter.m_state == NO_WAIT) {
// Wake up the thread waiting for response // Wake up the thread waiting for response
NdbCondition_Signal(theWaiter.m_condition); NdbCondition_Signal(theImpl->theWaiter.m_condition);
}//if }//if
return; return;
InvalidSignal: InvalidSignal:
#ifdef VM_TRACE #ifdef VM_TRACE
ndbout_c("Ndbif: Error Ndb::handleReceivedSignal " ndbout_c("Ndbif: Error Ndb::handleReceivedSignal "
"(GSN=%d, theWaiter.m_state=%d)" "(GSN=%d, theImpl->theWaiter.m_state=%d)"
" sender = (Block: %d Node: %d)", " sender = (Block: %d Node: %d)",
tSignalNumber, tSignalNumber,
tWaitState, tWaitState,
...@@ -895,7 +893,7 @@ Ndb::completedTransaction(NdbConnection* aCon) ...@@ -895,7 +893,7 @@ Ndb::completedTransaction(NdbConnection* aCon)
if ((theMinNoOfEventsToWakeUp != 0) && if ((theMinNoOfEventsToWakeUp != 0) &&
(theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) { (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
theMinNoOfEventsToWakeUp = 0; theMinNoOfEventsToWakeUp = 0;
NdbCondition_Signal(theWaiter.m_condition); NdbCondition_Signal(theImpl->theWaiter.m_condition);
return; return;
}//if }//if
} else { } else {
...@@ -1155,9 +1153,9 @@ void ...@@ -1155,9 +1153,9 @@ void
Ndb::waitCompletedTransactions(int aMilliSecondsToWait, Ndb::waitCompletedTransactions(int aMilliSecondsToWait,
int noOfEventsToWaitFor) int noOfEventsToWaitFor)
{ {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
/** /**
* theWaiter.m_state = NO_WAIT; * theImpl->theWaiter.m_state = NO_WAIT;
* To ensure no messup with synchronous node fail handling * To ensure no messup with synchronous node fail handling
* (see ReportFailure) * (see ReportFailure)
*/ */
...@@ -1166,8 +1164,8 @@ Ndb::waitCompletedTransactions(int aMilliSecondsToWait, ...@@ -1166,8 +1164,8 @@ Ndb::waitCompletedTransactions(int aMilliSecondsToWait,
theMinNoOfEventsToWakeUp = noOfEventsToWaitFor; theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
do { do {
if (waitTime < 1000) waitTime = 1000; if (waitTime < 1000) waitTime = 1000;
NdbCondition_WaitTimeout(theWaiter.m_condition, NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition,
(NdbMutex*)theWaiter.m_mutex, (NdbMutex*)theImpl->theWaiter.m_mutex,
waitTime); waitTime);
if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) { if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) {
break; break;
...@@ -1273,23 +1271,23 @@ Ndb::receiveResponse(int waitTime){ ...@@ -1273,23 +1271,23 @@ Ndb::receiveResponse(int waitTime){
int tResultCode; int tResultCode;
TransporterFacade::instance()->checkForceSend(theNdbBlockNumber); TransporterFacade::instance()->checkForceSend(theNdbBlockNumber);
theWaiter.wait(waitTime); theImpl->theWaiter.wait(waitTime);
if(theWaiter.m_state == NO_WAIT) { if(theImpl->theWaiter.m_state == NO_WAIT) {
tResultCode = 0; tResultCode = 0;
} else { } else {
#ifdef VM_TRACE #ifdef VM_TRACE
ndbout << "ERR: receiveResponse - theWaiter.m_state = "; ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
ndbout << theWaiter.m_state << endl; ndbout << theImpl->theWaiter.m_state << endl;
#endif #endif
if (theWaiter.m_state == WAIT_NODE_FAILURE){ if (theImpl->theWaiter.m_state == WAIT_NODE_FAILURE){
tResultCode = -2; tResultCode = -2;
} else { } else {
tResultCode = -1; tResultCode = -1;
} }
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
} }
return tResultCode; return tResultCode;
}//Ndb::receiveResponse() }//Ndb::receiveResponse()
...@@ -1321,8 +1319,8 @@ Ndb::sendRecSignal(Uint16 node_id, ...@@ -1321,8 +1319,8 @@ Ndb::sendRecSignal(Uint16 node_id,
if (tp->check_send_size(node_id, send_size)) { if (tp->check_send_size(node_id, send_size)) {
return_code = tp->sendSignal(aSignal, node_id); return_code = tp->sendSignal(aSignal, node_id);
if (return_code != -1) { if (return_code != -1) {
theWaiter.m_node = node_id; theImpl->theWaiter.m_node = node_id;
theWaiter.m_state = aWaitState; theImpl->theWaiter.m_state = aWaitState;
return_code = receiveResponse(); return_code = receiveResponse();
} else { } else {
return_code = -3; return_code = -3;
......
...@@ -50,7 +50,9 @@ Ndb(const char* aDataBase); ...@@ -50,7 +50,9 @@ Ndb(const char* aDataBase);
Parameters: aDataBase : Name of the database. Parameters: aDataBase : Name of the database.
Remark: Connect to the database. Remark: Connect to the database.
***************************************************************************/ ***************************************************************************/
Ndb::Ndb( const char* aDataBase , const char* aSchema) { Ndb::Ndb( const char* aDataBase , const char* aSchema)
: theImpl(NULL)
{
DBUG_ENTER("Ndb::Ndb()"); DBUG_ENTER("Ndb::Ndb()");
DBUG_PRINT("enter",("(old)Ndb::Ndb this=0x%x", this)); DBUG_PRINT("enter",("(old)Ndb::Ndb this=0x%x", this));
if (theNoOfNdbObjects < 0) if (theNoOfNdbObjects < 0)
...@@ -66,6 +68,7 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) { ...@@ -66,6 +68,7 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) {
Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection, Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection,
const char* aDataBase , const char* aSchema) const char* aDataBase , const char* aSchema)
: theImpl(NULL)
{ {
DBUG_ENTER("Ndb::Ndb()"); DBUG_ENTER("Ndb::Ndb()");
DBUG_PRINT("enter",("Ndb::Ndb this=0x%x", this)); DBUG_PRINT("enter",("Ndb::Ndb this=0x%x", this));
...@@ -82,7 +85,10 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -82,7 +85,10 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
{ {
DBUG_ENTER("Ndb::setup"); DBUG_ENTER("Ndb::setup");
m_ndb_cluster_connection= ndb_cluster_connection; assert(theImpl == NULL);
theImpl= new NdbImpl(ndb_cluster_connection,*this);
theDictionary= &(theImpl->m_dictionary);
thePreparedTransactionsArray= NULL; thePreparedTransactionsArray= NULL;
theSentTransactionsArray= NULL; theSentTransactionsArray= NULL;
theCompletedTransactionsArray= NULL; theCompletedTransactionsArray= NULL;
...@@ -93,8 +99,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -93,8 +99,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theMaxNoOfTransactions= 0; theMaxNoOfTransactions= 0;
theMinNoOfEventsToWakeUp= 0; theMinNoOfEventsToWakeUp= 0;
prefixEnd= NULL; prefixEnd= NULL;
theImpl= NULL;
theDictionary= NULL;
theConIdleList= NULL; theConIdleList= NULL;
theOpIdleList= NULL; theOpIdleList= NULL;
theScanOpIdleList= NULL; theScanOpIdleList= NULL;
...@@ -153,14 +157,12 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -153,14 +157,12 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len : prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len :
sizeof(prefixName) - 1); sizeof(prefixName) - 1);
theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr; theImpl->theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr;
// Signal that the constructor has finished OK // Signal that the constructor has finished OK
if (theInitState == NotConstructed) if (theInitState == NotConstructed)
theInitState = NotInitialised; theInitState = NotInitialised;
theImpl = new NdbImpl();
{ {
NdbGlobalEventBufferHandle *h= NdbGlobalEventBufferHandle *h=
NdbGlobalEventBuffer_init(NDB_MAX_ACTIVE_EVENTS); NdbGlobalEventBuffer_init(NDB_MAX_ACTIVE_EVENTS);
...@@ -171,11 +173,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -171,11 +173,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theGlobalEventBufferHandle = h; theGlobalEventBufferHandle = h;
} }
theDictionary = new NdbDictionaryImpl(*this);
if (theDictionary == NULL) {
ndbout_c("Ndb cailed to allocate dictionary");
exit(-1);
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -201,8 +198,6 @@ Ndb::~Ndb() ...@@ -201,8 +198,6 @@ Ndb::~Ndb()
DBUG_PRINT("enter",("Ndb::~Ndb this=0x%x",this)); DBUG_PRINT("enter",("Ndb::~Ndb this=0x%x",this));
doDisconnect(); doDisconnect();
delete theDictionary;
NdbGlobalEventBuffer_drop(theGlobalEventBufferHandle); NdbGlobalEventBuffer_drop(theGlobalEventBufferHandle);
if (TransporterFacade::instance() != NULL && theNdbBlockNumber > 0){ if (TransporterFacade::instance() != NULL && theNdbBlockNumber > 0){
...@@ -245,7 +240,6 @@ Ndb::~Ndb() ...@@ -245,7 +240,6 @@ Ndb::~Ndb()
freeSignal(); freeSignal();
releaseTransactionArrays(); releaseTransactionArrays();
startTransactionNodeSelectionData.release();
delete []theConnectionArray; delete []theConnectionArray;
if(theCommitAckSignal != NULL){ if(theCommitAckSignal != NULL){
...@@ -292,14 +286,20 @@ NdbWaiter::~NdbWaiter(){ ...@@ -292,14 +286,20 @@ NdbWaiter::~NdbWaiter(){
NdbCondition_Destroy(m_condition); NdbCondition_Destroy(m_condition);
} }
NdbImpl::NdbImpl() : theNdbObjectIdMap(1024,1024), NdbImpl::NdbImpl(Ndb_cluster_connection *ndb_cluster_connection,
theCurrentConnectIndex(0), Ndb& ndb)
theNoOfDBnodes(0) : m_ndb_cluster_connection(ndb_cluster_connection->m_impl),
m_dictionary(ndb),
theCurrentConnectIndex(0),
theNdbObjectIdMap(1024,1024),
theNoOfDBnodes(0)
{ {
int i; int i;
for (i = 0; i < MAX_NDB_NODES; i++) { for (i = 0; i < MAX_NDB_NODES; i++) {
the_release_ind[i] = 0; the_release_ind[i] = 0;
} }
m_optimized_node_selection=
m_ndb_cluster_connection.m_optimized_node_selection;
} }
NdbImpl::~NdbImpl() NdbImpl::~NdbImpl()
......
...@@ -127,7 +127,7 @@ private: ...@@ -127,7 +127,7 @@ private:
friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
friend class GrepSS; friend class GrepSS;
friend class Ndb; friend class Ndb;
friend class Ndb_cluster_connection; friend class Ndb_cluster_connection_impl;
int sendSignalUnCond(NdbApiSignal *, NodeId nodeId); int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
......
...@@ -18,7 +18,9 @@ ...@@ -18,7 +18,9 @@
#include <my_pthread.h> #include <my_pthread.h>
#include <my_sys.h> #include <my_sys.h>
#include <ndb_cluster_connection.hpp> #include "ndb_cluster_connection_impl.hpp"
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>
#include <TransporterFacade.hpp> #include <TransporterFacade.hpp>
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <NdbSleep.h> #include <NdbSleep.h>
...@@ -26,6 +28,8 @@ ...@@ -26,6 +28,8 @@
#include <ndb_limits.h> #include <ndb_limits.h>
#include <ConfigRetriever.hpp> #include <ConfigRetriever.hpp>
#include <ndb_version.h> #include <ndb_version.h>
#include <Vector.hpp>
#include <md5_hash.hpp>
static int g_run_connect_thread= 0; static int g_run_connect_thread= 0;
...@@ -35,56 +39,46 @@ NdbMutex *ndb_global_event_buffer_mutex= NULL; ...@@ -35,56 +39,46 @@ NdbMutex *ndb_global_event_buffer_mutex= NULL;
NdbMutex *ndb_print_state_mutex= NULL; NdbMutex *ndb_print_state_mutex= NULL;
#endif #endif
/*
* Ndb_cluster_connection
*/
Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
: m_impl(* new Ndb_cluster_connection_impl(connect_string))
{ {
DBUG_ENTER("Ndb_cluster_connection"); }
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
m_config_retriever= 0; Ndb_cluster_connection::Ndb_cluster_connection
m_connect_thread= 0; (Ndb_cluster_connection_impl& impl) : m_impl(impl)
m_connect_callback= 0; {
}
if (ndb_global_event_buffer_mutex == NULL) Ndb_cluster_connection::~Ndb_cluster_connection()
{ {
ndb_global_event_buffer_mutex= NdbMutex_Create(); Ndb_cluster_connection_impl *tmp = &m_impl;
} if (this != tmp)
#ifdef VM_TRACE delete tmp;
if (ndb_print_state_mutex == NULL)
{
ndb_print_state_mutex= NdbMutex_Create();
}
#endif
m_config_retriever=
new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
if (m_config_retriever->hasError())
{
printf("Could not connect initialize handle to management server: %s",
m_config_retriever->getErrorString());
delete m_config_retriever;
m_config_retriever= 0;
}
DBUG_VOID_RETURN;
} }
int Ndb_cluster_connection::get_connected_port() const int Ndb_cluster_connection::get_connected_port() const
{ {
if (m_config_retriever) if (m_impl.m_config_retriever)
return m_config_retriever->get_mgmd_port(); return m_impl.m_config_retriever->get_mgmd_port();
return -1; return -1;
} }
const char *Ndb_cluster_connection::get_connected_host() const const char *Ndb_cluster_connection::get_connected_host() const
{ {
if (m_config_retriever) if (m_impl.m_config_retriever)
return m_config_retriever->get_mgmd_host(); return m_impl.m_config_retriever->get_mgmd_host();
return 0; return 0;
} }
const char *Ndb_cluster_connection::get_connectstring(char *buf, int buf_sz) const const char *Ndb_cluster_connection::get_connectstring(char *buf,
int buf_sz) const
{ {
if (m_config_retriever) if (m_impl.m_config_retriever)
return m_config_retriever->get_connectstring(buf,buf_sz); return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
return 0; return 0;
} }
...@@ -92,82 +86,415 @@ extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me) ...@@ -92,82 +86,415 @@ extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
{ {
my_thread_init(); my_thread_init();
g_run_connect_thread= 1; g_run_connect_thread= 1;
((Ndb_cluster_connection*) me)->connect_thread(); ((Ndb_cluster_connection_impl*) me)->connect_thread();
my_thread_end(); my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return me; return me;
} }
void Ndb_cluster_connection::connect_thread()
{
DBUG_ENTER("Ndb_cluster_connection::connect_thread");
int r;
do {
NdbSleep_SecSleep(1);
if ((r = connect(0,0,0)) == 0)
break;
if (r == -1) {
printf("Ndb_cluster_connection::connect_thread error\n");
DBUG_ASSERT(false);
g_run_connect_thread= 0;
} else {
// Wait before making a new connect attempt
NdbSleep_SecSleep(1);
}
} while (g_run_connect_thread);
if (m_connect_callback)
(*m_connect_callback)();
DBUG_VOID_RETURN;
}
int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void)) int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
{ {
int r; int r;
DBUG_ENTER("Ndb_cluster_connection::start_connect_thread"); DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
m_connect_callback= connect_callback; m_impl.m_connect_callback= connect_callback;
if ((r = connect(0,0,0)) == 1) if ((r = connect(0,0,0)) == 1)
{ {
DBUG_PRINT("info",("starting thread")); DBUG_PRINT("info",("starting thread"));
m_connect_thread= m_impl.m_connect_thread=
NdbThread_Create(run_ndb_cluster_connection_connect_thread, NdbThread_Create(run_ndb_cluster_connection_connect_thread,
(void**)this, 32768, "ndb_cluster_connection", (void**)&m_impl, 32768, "ndb_cluster_connection",
NDB_THREAD_PRIO_LOW); NDB_THREAD_PRIO_LOW);
} }
else if (r < 0) else if (r < 0)
{ {
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
else if (m_connect_callback) else if (m_impl.m_connect_callback)
{ {
(*m_connect_callback)(); (*m_impl.m_connect_callback)();
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds, int verbose) void Ndb_cluster_connection::set_optimized_node_selection(int val)
{
m_impl.m_optimized_node_selection= val;
}
void
Ndb_cluster_connection_impl::init_get_next_node
(Ndb_cluster_connection_node_iter &iter)
{
if (iter.scan_state != (Uint8)~0)
iter.cur_pos= iter.scan_state;
if (iter.cur_pos >= no_db_nodes())
iter.cur_pos= 0;
iter.init_pos= iter.cur_pos;
iter.scan_state= 0;
// fprintf(stderr,"[init %d]",iter.init_pos);
return;
}
Uint32
Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
{
Uint32 cur_pos= iter.cur_pos;
if (cur_pos >= no_db_nodes())
return 0;
Ndb_cluster_connection_impl::Node *nodes= m_impl.m_all_nodes.getBase();
Ndb_cluster_connection_impl::Node &node= nodes[cur_pos];
if (iter.scan_state != (Uint8)~0)
{
assert(iter.scan_state < no_db_nodes());
if (nodes[iter.scan_state].group == node.group)
iter.scan_state= ~0;
else
return nodes[iter.scan_state++].id;
}
// fprintf(stderr,"[%d]",node.id);
cur_pos++;
Uint32 init_pos= iter.init_pos;
if (cur_pos == node.next_group)
{
cur_pos= nodes[init_pos].this_group;
}
// fprintf(stderr,"[cur_pos %d]",cur_pos);
if (cur_pos != init_pos)
iter.cur_pos= cur_pos;
else
{
iter.cur_pos= node.next_group;
iter.init_pos= node.next_group;
}
return node.id;
}
Uint32
Ndb_cluster_connection::no_db_nodes()
{
return m_impl.m_all_nodes.size();
}
int
Ndb_cluster_connection::wait_until_ready(int timeout,
int timeout_after_first_alive)
{
DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
TransporterFacade *tp = TransporterFacade::instance();
if (tp == 0)
{
DBUG_RETURN(-1);
}
if (tp->ownId() == 0)
{
DBUG_RETURN(-1);
}
int secondsCounter = 0;
int milliCounter = 0;
int noChecksSinceFirstAliveFound = 0;
do {
unsigned int foundAliveNode = 0;
tp->lock_mutex();
for(unsigned i= 0; i < no_db_nodes(); i++)
{
//************************************************
// If any node is answering, ndb is answering
//************************************************
if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) {
foundAliveNode++;
}
}
tp->unlock_mutex();
if (foundAliveNode == no_db_nodes())
{
DBUG_RETURN(0);
}
else if (foundAliveNode > 0)
{
noChecksSinceFirstAliveFound++;
if (timeout_after_first_alive >= 0)
{
if (noChecksSinceFirstAliveFound > timeout_after_first_alive)
DBUG_RETURN(0);
}
else // timeout_after_first_alive < 0
{
if (noChecksSinceFirstAliveFound > -timeout_after_first_alive)
DBUG_RETURN(-1);
}
}
else if (secondsCounter >= timeout)
{ // no alive nodes and timed out
DBUG_RETURN(-1);
}
NdbSleep_MilliSleep(100);
milliCounter += 100;
if (milliCounter >= 1000) {
secondsCounter++;
milliCounter = 0;
}//if
} while (1);
}
/*
* Ndb_cluster_connection_impl
*/
Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
connect_string)
: Ndb_cluster_connection(*this),
m_optimized_node_selection(1)
{
DBUG_ENTER("Ndb_cluster_connection");
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
m_transporter_facade=
TransporterFacade::theFacadeInstance= new TransporterFacade();
m_connect_thread= 0;
m_connect_callback= 0;
if (ndb_global_event_buffer_mutex == NULL)
{
ndb_global_event_buffer_mutex= NdbMutex_Create();
}
#ifdef VM_TRACE
if (ndb_print_state_mutex == NULL)
{
ndb_print_state_mutex= NdbMutex_Create();
}
#endif
m_config_retriever=
new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
if (m_config_retriever->hasError())
{
printf("Could not connect initialize handle to management server: %s",
m_config_retriever->getErrorString());
delete m_config_retriever;
m_config_retriever= 0;
}
DBUG_VOID_RETURN;
}
Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
{
DBUG_ENTER("~Ndb_cluster_connection");
DBUG_PRINT("enter",("~Ndb_cluster_connection this=0x%x", this));
TransporterFacade::stop_instance();
if (m_connect_thread)
{
void *status;
g_run_connect_thread= 0;
NdbThread_WaitFor(m_connect_thread, &status);
NdbThread_Destroy(&m_connect_thread);
m_connect_thread= 0;
}
if (m_transporter_facade != 0)
{
delete m_transporter_facade;
if (m_transporter_facade != TransporterFacade::theFacadeInstance)
abort();
TransporterFacade::theFacadeInstance= 0;
}
if (m_config_retriever)
delete m_config_retriever;
// fragmentToNodeMap.release();
DBUG_VOID_RETURN;
}
void
Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
const ndb_mgm_configuration
&config)
{
DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector");
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
for(iter.first(); iter.valid(); iter.next())
{
Uint32 nodeid1, nodeid2, remoteNodeId, group= 5;
const char * remoteHostName= 0, * localHostName= 0;
if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue;
if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue;
if(nodeid1 != nodeid && nodeid2 != nodeid) continue;
remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
iter.get(CFG_CONNECTION_GROUP, &group);
{
const char * host1= 0, * host2= 0;
iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
localHostName = (nodeid == nodeid1 ? host1 : host2);
remoteHostName = (nodeid == nodeid1 ? host2 : host1);
}
Uint32 type = ~0;
if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
switch(type){
case CONNECTION_TYPE_SHM:{
break;
}
case CONNECTION_TYPE_SCI:{
break;
}
case CONNECTION_TYPE_TCP:{
// connecting through localhost
// check if config_hostname is local
if (SocketServer::tryBind(0,remoteHostName))
group--; // upgrade group value
break;
}
case CONNECTION_TYPE_OSE:{
break;
}
}
m_impl.m_all_nodes.push_back(Node(group,remoteNodeId));
DBUG_PRINT("info",("saved %d %d", group,remoteNodeId));
for (int i= m_impl.m_all_nodes.size()-2;
i >= 0 && m_impl.m_all_nodes[i].group > m_impl.m_all_nodes[i+1].group;
i--)
{
Node tmp= m_impl.m_all_nodes[i];
m_impl.m_all_nodes[i]= m_impl.m_all_nodes[i+1];
m_impl.m_all_nodes[i+1]= tmp;
}
}
int i;
Uint32 cur_group, i_group= 0;
cur_group= ~0;
for (i= (int)m_impl.m_all_nodes.size()-1; i >= 0; i--)
{
if (m_impl.m_all_nodes[i].group != cur_group)
{
cur_group= m_impl.m_all_nodes[i].group;
i_group= i+1;
}
m_impl.m_all_nodes[i].next_group= i_group;
}
cur_group= ~0;
for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
{
if (m_impl.m_all_nodes[i].group != cur_group)
{
cur_group= m_impl.m_all_nodes[i].group;
i_group= i;
}
m_impl.m_all_nodes[i].this_group= i_group;
}
#if 0
for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
{
fprintf(stderr, "[%d] %d %d %d %d\n",
i,
m_impl.m_all_nodes[i].id,
m_impl.m_all_nodes[i].group,
m_impl.m_all_nodes[i].this_group,
m_impl.m_all_nodes[i].next_group);
}
do_test();
#endif
DBUG_VOID_RETURN;
}
void
Ndb_cluster_connection_impl::do_test()
{
Ndb_cluster_connection_node_iter iter;
int n= no_db_nodes()+5;
Uint32 *nodes= new Uint32[n+1];
for (int g= 0; g < n; g++)
{
for (int h= 0; h < n; h++)
{
Uint32 id;
Ndb_cluster_connection_node_iter iter2;
{
for (int j= 0; j < g; j++)
{
nodes[j]= get_next_node(iter2);
}
}
for (int i= 0; i < n; i++)
{
init_get_next_node(iter);
fprintf(stderr, "%d dead:(", g);
id= 0;
while (id == 0)
{
if ((id= get_next_node(iter)) == 0)
break;
for (int j= 0; j < g; j++)
{
if (nodes[j] == id)
{
fprintf(stderr, " %d", id);
id= 0;
break;
}
}
}
fprintf(stderr, ")");
if (id == 0)
{
break;
}
fprintf(stderr, " %d\n", id);
}
fprintf(stderr, "\n");
}
}
delete [] nodes;
}
int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds,
int verbose)
{ {
DBUG_ENTER("Ndb_cluster_connection::connect"); DBUG_ENTER("Ndb_cluster_connection::connect");
const char* error = 0; const char* error = 0;
do { do {
if (m_config_retriever == 0) if (m_impl.m_config_retriever == 0)
DBUG_RETURN(-1); DBUG_RETURN(-1);
if (m_config_retriever->do_connect(no_retries,retry_delay_in_seconds,verbose)) if (m_impl.m_config_retriever->do_connect(no_retries,
retry_delay_in_seconds,
verbose))
DBUG_RETURN(1); // mgmt server not up yet DBUG_RETURN(1); // mgmt server not up yet
Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,3/*delay*/); Uint32 nodeId = m_impl.m_config_retriever->allocNodeId(4/*retries*/,
3/*delay*/);
if(nodeId == 0) if(nodeId == 0)
break; break;
ndb_mgm_configuration * props = m_config_retriever->getConfig(); ndb_mgm_configuration * props = m_impl.m_config_retriever->getConfig();
if(props == 0) if(props == 0)
break; break;
m_facade->start_instance(nodeId, props); m_impl.m_transporter_facade->start_instance(nodeId, props);
m_impl.init_nodes_vector(nodeId, *props);
ndb_mgm_destroy_configuration(props); ndb_mgm_destroy_configuration(props);
m_facade->connected(); m_impl.m_transporter_facade->connected();
DBUG_RETURN(0); DBUG_RETURN(0);
} while(0); } while(0);
ndbout << "Configuration error: "; ndbout << "Configuration error: ";
const char* erString = m_config_retriever->getErrorString(); const char* erString = m_impl.m_config_retriever->getErrorString();
if (erString == 0) { if (erString == 0) {
erString = "No error specified!"; erString = "No error specified!";
} }
...@@ -175,29 +502,132 @@ int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds, ...@@ -175,29 +502,132 @@ int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds,
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
Ndb_cluster_connection::~Ndb_cluster_connection() void Ndb_cluster_connection_impl::connect_thread()
{ {
DBUG_ENTER("~Ndb_cluster_connection"); DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread");
DBUG_PRINT("enter",("~Ndb_cluster_connection this=0x%x", this)); int r;
TransporterFacade::stop_instance(); do {
if (m_connect_thread) NdbSleep_SecSleep(1);
if ((r = connect(0,0,0)) == 0)
break;
if (r == -1) {
printf("Ndb_cluster_connection::connect_thread error\n");
DBUG_ASSERT(false);
g_run_connect_thread= 0;
} else {
// Wait before making a new connect attempt
NdbSleep_SecSleep(1);
}
} while (g_run_connect_thread);
if (m_connect_callback)
(*m_connect_callback)();
DBUG_VOID_RETURN;
}
/*
* Hint handling to select node
* ToDo: fix this
*/
void
Ndb_cluster_connection_impl::FragmentToNodeMap::init(Uint32 noOfNodes,
Uint8 nodeIds[])
{
kValue = 6;
noOfFragments = 2 * noOfNodes;
/**
* Compute hashValueMask and hashpointerValue
*/
{ {
void *status; Uint32 topBit = (1 << 31);
g_run_connect_thread= 0; for(int i = 31; i>=0; i--){
NdbThread_WaitFor(m_connect_thread, &status); if((noOfFragments & topBit) != 0)
NdbThread_Destroy(&m_connect_thread); break;
m_connect_thread= 0; topBit >>= 1;
}
hashValueMask = topBit - 1;
hashpointerValue = noOfFragments - (hashValueMask + 1);
} }
if (m_facade != 0)
/**
* This initialization depends on
* the fact that:
* primary node for fragment i = i % noOfNodes
*
* This algorithm should be implemented in Dbdih
*/
{ {
delete m_facade; if (fragment2PrimaryNodeMap != 0)
if (m_facade != TransporterFacade::theFacadeInstance)
abort(); abort();
TransporterFacade::theFacadeInstance= 0;
fragment2PrimaryNodeMap = new Uint32[noOfFragments];
Uint32 i;
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i] = nodeIds[i];
}
// Sort them (bubble sort)
for(i = 0; i<noOfNodes-1; i++)
for(Uint32 j = i+1; j<noOfNodes; j++)
if(fragment2PrimaryNodeMap[i] > fragment2PrimaryNodeMap[j]){
Uint32 tmp = fragment2PrimaryNodeMap[i];
fragment2PrimaryNodeMap[i] = fragment2PrimaryNodeMap[j];
fragment2PrimaryNodeMap[j] = tmp;
}
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i+noOfNodes] = fragment2PrimaryNodeMap[i];
}
} }
if (m_config_retriever)
delete m_config_retriever;
DBUG_VOID_RETURN;
} }
void
Ndb_cluster_connection_impl::FragmentToNodeMap::release(){
delete [] fragment2PrimaryNodeMap;
fragment2PrimaryNodeMap = 0;
}
static const Uint32 MAX_KEY_LEN_64_WORDS = 4;
Uint32
Ndb_cluster_connection_impl::guess_primary_node(const char *keyData,
Uint32 keyLen)
{
Uint64 tempData[MAX_KEY_LEN_64_WORDS];
const Uint32 usedKeyLen = (keyLen + 3) >> 2; // In words
const char * usedKeyData = 0;
/**
* If key data buffer is not aligned (on 64 bit boundary)
* or key len is not a multiple of 4
* Use temp data
*/
if(((((UintPtr)keyData) & 7) == 0) && ((keyLen & 3) == 0)) {
usedKeyData = keyData;
} else {
memcpy(&tempData[0], keyData, keyLen);
const int slack = keyLen & 3;
if(slack > 0) {
memset(&((char *)&tempData[0])[keyLen], 0, (4 - slack));
}//if
usedKeyData = (char *)&tempData[0];
}//if
Uint32 hashValue = md5_hash((Uint64 *)usedKeyData, usedKeyLen);
hashValue >>= fragmentToNodeMap.kValue;
Uint32 fragmentId = hashValue &
fragmentToNodeMap.hashValueMask;
if(fragmentId < fragmentToNodeMap.hashpointerValue) {
fragmentId = hashValue &
((fragmentToNodeMap.hashValueMask << 1) + 1);
}//if
return fragmentId;
}
template class Vector<Ndb_cluster_connection_impl::Node>;
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CLUSTER_CONNECTION_IMPL_HPP
#define CLUSTER_CONNECTION_IMPL_HPP
#include <ndb_cluster_connection.hpp>
#include <Vector.hpp>
class TransporterFacade;
class ConfigRetriever;
class NdbThread;
class ndb_mgm_configuration;
struct Ndb_cluster_connection_node_iter {
Ndb_cluster_connection_node_iter() : scan_state(~0),
init_pos(0),
cur_pos(0) {};
Uint8 scan_state;
Uint8 init_pos;
Uint8 cur_pos;
};
extern "C" {
void* run_ndb_cluster_connection_connect_thread(void*);
}
class Ndb_cluster_connection_impl : public Ndb_cluster_connection
{
Ndb_cluster_connection_impl(const char *connectstring);
~Ndb_cluster_connection_impl();
void do_test();
void init_get_next_node(Ndb_cluster_connection_node_iter &iter);
Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter);
private:
friend class Ndb;
friend class NdbImpl;
friend void* run_ndb_cluster_connection_connect_thread(void*);
friend class Ndb_cluster_connection;
/**
* Structure containing values for guessing primary node
*/
struct FragmentToNodeMap {
FragmentToNodeMap():
fragment2PrimaryNodeMap(0) {};
Uint32 kValue;
Uint32 hashValueMask;
Uint32 hashpointerValue;
Uint32 noOfFragments;
Uint32 *fragment2PrimaryNodeMap;
void init(Uint32 noOfNodes, Uint8 nodeIds[]);
void release();
} fragmentToNodeMap;
struct Node
{
Node(Uint32 _g= 0, Uint32 _id= 0) : this_group(0),
next_group(0),
group(_g),
id(_id) {};
Uint32 this_group;
Uint32 next_group;
Uint32 group;
Uint32 id;
};
Vector<Node> m_all_nodes;
void init_nodes_vector(Uint32 nodeid, const ndb_mgm_configuration &config);
Uint32 guess_primary_node(const char * keyData, Uint32 keyLen);
void connect_thread();
TransporterFacade *m_transporter_facade;
ConfigRetriever *m_config_retriever;
NdbThread *m_connect_thread;
int (*m_connect_callback)(void);
int m_optimized_node_selection;
};
#endif
...@@ -142,14 +142,22 @@ int runTestMaxTransaction(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -142,14 +142,22 @@ int runTestMaxTransaction(NDBT_Context* ctx, NDBT_Step* step){
4); 4);
break; break;
case 2: case 2:
ndbout_c("startTransactionDGroup not supported");
abort();
/*
pCon = pNdb->startTransactionDGroup(1, pCon = pNdb->startTransactionDGroup(1,
"TEST", "TEST",
0); 0);
*/
break; break;
case 3: case 3:
ndbout_c("startTransactionDGroup not supported");
abort();
/*
pCon = pNdb->startTransactionDGroup(2, pCon = pNdb->startTransactionDGroup(2,
"TEST", "TEST",
1); 1);
*/
break; break;
default: default:
......
...@@ -24,7 +24,11 @@ ...@@ -24,7 +24,11 @@
static int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, int parallelism=240); static int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, int parallelism=240);
static const char* opt_connect_str= 0; enum ndb_delete_all {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
......
...@@ -19,7 +19,11 @@ ...@@ -19,7 +19,11 @@
#include <NDBT.hpp> #include <NDBT.hpp>
#include <NdbApi.hpp> #include <NdbApi.hpp>
static const char* opt_connect_str= 0; enum ndb_desc_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static int _unqualified = 0; static int _unqualified = 0;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
......
...@@ -21,7 +21,11 @@ ...@@ -21,7 +21,11 @@
#include <NdbApi.hpp> #include <NdbApi.hpp>
#include <NDBT.hpp> #include <NDBT.hpp>
static const char* opt_connect_str= 0; enum ndb_drop_index_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
......
...@@ -21,7 +21,11 @@ ...@@ -21,7 +21,11 @@
#include <NdbApi.hpp> #include <NdbApi.hpp>
#include <NDBT.hpp> #include <NDBT.hpp>
static const char* opt_connect_str= 0; enum ndb_drop_table_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
......
...@@ -161,13 +161,17 @@ list(const char * tabname, ...@@ -161,13 +161,17 @@ list(const char * tabname,
} }
} }
static const char* opt_connect_str= 0; enum ndb_show_tables_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static int _loops; static int _loops;
static int _type; static int _type;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
NDB_STD_OPTS("ndb_desc"), NDB_STD_OPTS("ndb_show_tables"),
{ "database", 'd', "Name of database table is in", { "database", 'd', "Name of database table is in",
(gptr*) &_dbname, (gptr*) &_dbname, 0, (gptr*) &_dbname, (gptr*) &_dbname, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
......
...@@ -36,7 +36,10 @@ static Vector<class BackupConsumer *> g_consumers; ...@@ -36,7 +36,10 @@ static Vector<class BackupConsumer *> g_consumers;
static const char* ga_backupPath = "." DIR_SEPARATOR; static const char* ga_backupPath = "." DIR_SEPARATOR;
static const char* opt_connect_str= NULL; enum ndb_restore_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
/** /**
* print and restore flags * print and restore flags
......
...@@ -36,7 +36,11 @@ int scanReadRecords(Ndb*, ...@@ -36,7 +36,11 @@ int scanReadRecords(Ndb*,
char delim, char delim,
bool orderby); bool orderby);
static const char* opt_connect_str= 0; enum ndb_select_all_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static const char* _delimiter = "\t"; static const char* _delimiter = "\t";
static int _unqualified, _header, _parallelism, _useHexFormat, _lock, static int _unqualified, _header, _parallelism, _useHexFormat, _lock,
......
...@@ -32,7 +32,11 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab, ...@@ -32,7 +32,11 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
int* count_rows, int* count_rows,
UtilTransactions::ScanLock lock); UtilTransactions::ScanLock lock);
static const char* opt_connect_str= 0; enum ndb_select_count_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static int _parallelism = 240; static int _parallelism = 240;
static int _lock = 0; static int _lock = 0;
......
...@@ -30,7 +30,11 @@ int ...@@ -30,7 +30,11 @@ int
waitClusterStatus(const char* _addr, ndb_mgm_node_status _status, waitClusterStatus(const char* _addr, ndb_mgm_node_status _status,
unsigned int _timeout); unsigned int _timeout);
static const char* opt_connect_str= 0; enum ndb_waiter_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static int _no_contact = 0; static int _no_contact = 0;
static int _timeout = 120; static int _timeout = 120;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
......
...@@ -32,6 +32,10 @@ ...@@ -32,6 +32,10 @@
#include <ndbapi/NdbApi.hpp> #include <ndbapi/NdbApi.hpp>
#include <ndbapi/NdbScanFilter.hpp> #include <ndbapi/NdbScanFilter.hpp>
// options from from mysqld.cc
extern my_bool opt_ndb_optimized_node_selection;
extern const char *opt_ndbcluster_connectstring;
// Default value for parallelism // Default value for parallelism
static const int parallelism= 240; static const int parallelism= 240;
...@@ -39,9 +43,6 @@ static const int parallelism= 240; ...@@ -39,9 +43,6 @@ static const int parallelism= 240;
// createable against NDB from this handler // createable against NDB from this handler
static const int max_transactions= 256; static const int max_transactions= 256;
// connectstring to cluster if given by mysqld
const char *ndbcluster_connectstring= 0;
static const char *ha_ndb_ext=".ndb"; static const char *ha_ndb_ext=".ndb";
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8 #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
...@@ -4233,15 +4234,19 @@ bool ndbcluster_init() ...@@ -4233,15 +4234,19 @@ bool ndbcluster_init()
int res; int res;
DBUG_ENTER("ndbcluster_init"); DBUG_ENTER("ndbcluster_init");
// Set connectstring if specified // Set connectstring if specified
if (ndbcluster_connectstring != 0) if (opt_ndbcluster_connectstring != 0)
DBUG_PRINT("connectstring", ("%s", ndbcluster_connectstring)); DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));
if ((g_ndb_cluster_connection= if ((g_ndb_cluster_connection=
new Ndb_cluster_connection(ndbcluster_connectstring)) == 0) new Ndb_cluster_connection(opt_ndbcluster_connectstring)) == 0)
{ {
DBUG_PRINT("error",("Ndb_cluster_connection(%s)",ndbcluster_connectstring)); DBUG_PRINT("error",("Ndb_cluster_connection(%s)",
opt_ndbcluster_connectstring));
goto ndbcluster_init_error; goto ndbcluster_init_error;
} }
g_ndb_cluster_connection->set_optimized_node_selection
(opt_ndb_optimized_node_selection);
// Create a Ndb object to open the connection to NDB // Create a Ndb object to open the connection to NDB
g_ndb= new Ndb(g_ndb_cluster_connection, "sys"); g_ndb= new Ndb(g_ndb_cluster_connection, "sys");
g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info)); g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
...@@ -4256,7 +4261,7 @@ bool ndbcluster_init() ...@@ -4256,7 +4261,7 @@ bool ndbcluster_init()
DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d", DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d",
g_ndb_cluster_connection->get_connected_host(), g_ndb_cluster_connection->get_connected_host(),
g_ndb_cluster_connection->get_connected_port())); g_ndb_cluster_connection->get_connected_port()));
g_ndb->waitUntilReady(10); g_ndb_cluster_connection->wait_until_ready(10,0);
} }
else if(res == 1) else if(res == 1)
{ {
......
...@@ -775,9 +775,25 @@ longlong Item_func_neg::val_int() ...@@ -775,9 +775,25 @@ longlong Item_func_neg::val_int()
void Item_func_neg::fix_length_and_dec() void Item_func_neg::fix_length_and_dec()
{ {
enum Item_result arg_result= args[0]->result_type();
enum Item::Type arg_type= args[0]->type();
decimals=args[0]->decimals; decimals=args[0]->decimals;
max_length=args[0]->max_length; max_length=args[0]->max_length;
hybrid_type= REAL_RESULT; hybrid_type= REAL_RESULT;
/*
We need to account for added '-' in the following cases:
A) argument is a real or integer positive constant - in this case
argument's max_length is set to actual number of bytes occupied, and not
maximum number of bytes real or integer may require. Note that all
constants are non negative so we don't need to account for removed '-'.
B) argument returns a string.
*/
if (arg_result == STRING_RESULT ||
(arg_type == REAL_ITEM && ((Item_real*)args[0])->value >= 0) ||
(arg_type == INT_ITEM && ((Item_int*)args[0])->value > 0))
max_length++;
if (args[0]->result_type() == INT_RESULT) if (args[0]->result_type() == INT_RESULT)
{ {
/* /*
......
...@@ -53,6 +53,11 @@ ...@@ -53,6 +53,11 @@
#endif #endif
#ifdef HAVE_NDBCLUSTER_DB #ifdef HAVE_NDBCLUSTER_DB
#define OPT_NDBCLUSTER_DEFAULT 0 #define OPT_NDBCLUSTER_DEFAULT 0
#ifdef NDB_SHM_TRANSPORTER
#define OPT_NDB_SHM_DEFAULT 1
#else
#define OPT_NDB_SHM_DEFAULT 0
#endif
#else #else
#define OPT_NDBCLUSTER_DEFAULT 0 #define OPT_NDBCLUSTER_DEFAULT 0
#endif #endif
...@@ -285,6 +290,10 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0; ...@@ -285,6 +290,10 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0; my_bool opt_log_slave_updates= 0;
my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster; my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster;
#ifdef HAVE_NDBCLUSTER_DB
const char *opt_ndbcluster_connectstring= 0;
my_bool opt_ndb_shm, opt_ndb_optimized_node_selection;
#endif
my_bool opt_readonly, use_temp_pool, relay_log_purge; my_bool opt_readonly, use_temp_pool, relay_log_purge;
my_bool opt_sync_bdb_logs, opt_sync_frm; my_bool opt_sync_bdb_logs, opt_sync_frm;
my_bool opt_secure_auth= 0; my_bool opt_secure_auth= 0;
...@@ -3998,6 +4007,7 @@ enum options_mysqld ...@@ -3998,6 +4007,7 @@ enum options_mysqld
OPT_INNODB, OPT_ISAM, OPT_INNODB, OPT_ISAM,
OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT, OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT,
OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ, OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
OPT_NDB_SHM, OPT_NDB_OPTIMIZED_NODE_SELECTION,
OPT_SKIP_SAFEMALLOC, OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
...@@ -4439,24 +4449,46 @@ Disable with --skip-ndbcluster (will save memory).", ...@@ -4439,24 +4449,46 @@ Disable with --skip-ndbcluster (will save memory).",
#ifdef HAVE_NDBCLUSTER_DB #ifdef HAVE_NDBCLUSTER_DB
{"ndb-connectstring", OPT_NDB_CONNECTSTRING, {"ndb-connectstring", OPT_NDB_CONNECTSTRING,
"Connect string for ndbcluster.", "Connect string for ndbcluster.",
(gptr*) &ndbcluster_connectstring, (gptr*) &ndbcluster_connectstring, (gptr*) &opt_ndbcluster_connectstring,
(gptr*) &opt_ndbcluster_connectstring,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"ndb_autoincrement_prefetch_sz", OPT_NDB_AUTOINCREMENT_PREFETCH_SZ, {"ndb-autoincrement-prefetch-sz", OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
"Specify number of autoincrement values that are prefetched", "Specify number of autoincrement values that are prefetched.",
(gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz, (gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz,
(gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz, (gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz,
0, GET_INT, REQUIRED_ARG, 32, 1, 256, 0, 0, 0}, 0, GET_INT, REQUIRED_ARG, 32, 1, 256, 0, 0, 0},
{"ndb-force-send", OPT_NDB_FORCE_SEND,
"Force send of buffers to ndb immediately without waiting for "
"other threads.",
(gptr*) &global_system_variables.ndb_force_send,
(gptr*) &global_system_variables.ndb_force_send,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb_force_send", OPT_NDB_FORCE_SEND, {"ndb_force_send", OPT_NDB_FORCE_SEND,
"Force send of buffers to ndb immediately without waiting for other threads", "same as --ndb-force-send.",
(gptr*) &global_system_variables.ndb_force_send, (gptr*) &global_system_variables.ndb_force_send,
(gptr*) &global_system_variables.ndb_force_send, (gptr*) &global_system_variables.ndb_force_send,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0}, 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb-use-exact-count", OPT_NDB_USE_EXACT_COUNT,
"Use exact records count during query planning and for fast "
"select count(*), disable for faster queries.",
(gptr*) &global_system_variables.ndb_use_exact_count,
(gptr*) &global_system_variables.ndb_use_exact_count,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb_use_exact_count", OPT_NDB_USE_EXACT_COUNT, {"ndb_use_exact_count", OPT_NDB_USE_EXACT_COUNT,
"Use exact records count during query planning and for " "same as --ndb-use-exact-count.",
"fast select count(*)",
(gptr*) &global_system_variables.ndb_use_exact_count, (gptr*) &global_system_variables.ndb_use_exact_count,
(gptr*) &global_system_variables.ndb_use_exact_count, (gptr*) &global_system_variables.ndb_use_exact_count,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0}, 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb-shm", OPT_NDB_SHM,
"Use shared memory connections when available.",
(gptr*) &opt_ndb_shm,
(gptr*) &opt_ndb_shm,
0, GET_BOOL, OPT_ARG, OPT_NDB_SHM_DEFAULT, 0, 0, 0, 0, 0},
{"ndb-optimized-node-selection", OPT_NDB_OPTIMIZED_NODE_SELECTION,
"Select nodes for transactions in a more optimal way.",
(gptr*) &opt_ndb_optimized_node_selection,
(gptr*) &opt_ndb_optimized_node_selection,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
#endif #endif
{"new", 'n', "Use very new possible 'unsafe' functions.", {"new", 'n', "Use very new possible 'unsafe' functions.",
(gptr*) &global_system_variables.new_mode, (gptr*) &global_system_variables.new_mode,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment