Commit 8c01aba7 authored by unknown's avatar unknown

added config parameter Group on connection

    moved NdbWaiter, m_ndb_cluster_connection, to impl class
    moved node selection things to cluster connection
    moved all private things to impl class
    added opts for shared memory and node selection
    changed opts handling somewhat; introduced enum for options and common handling of option variables
    added checks for transporter support
    automatic addition of shared mem transporters
    moved wait_until_ready code to cluster connection
    added control of usage of new node selection method


ndb/include/mgmapi/mgmapi_config_parameters.h:
  added config parameter Group on connection
ndb/include/ndbapi/Ndb.hpp:
  moved NdbWaiter, m_ndb_cluster_connection, to impl class
  moved node selection things to cluster connection
ndb/include/ndbapi/ndb_cluster_connection.hpp:
  moved all private things to impl class
ndb/include/util/ndb_opts.h:
  added opts for shared memory and node selection
ndb/src/kernel/vm/Configuration.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/src/mgmclient/main.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/src/mgmsrv/ConfigInfo.cpp:
  added checks for transporter support
  automatic addition of shared mem transporters
ndb/src/mgmsrv/MgmtSrvr.cpp:
  in alloc node id first choose connection with specified hostname
ndb/src/mgmsrv/main.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/src/ndbapi/DictCache.hpp:
  added include file
ndb/src/ndbapi/Ndb.cpp:
  enabled using new node selection method
  moved wait_until_ready code to cluster connection
  moved node selection (hint) to cluster connection
  removed start transaction dgroup
ndb/src/ndbapi/NdbDictionaryImpl.hpp:
  removed and added inclusde files
ndb/src/ndbapi/NdbImpl.hpp:
  moved things from Ndb into Impl class
  moved waiter things to new file NdbWaiter.hpp
ndb/src/ndbapi/NdbScanOperation.cpp:
  ndbwaiter is no in impl class
ndb/src/ndbapi/Ndbif.cpp:
  ndbwaiter is no in impl class
ndb/src/ndbapi/Ndbinit.cpp:
  moved some Ndb things into impl class
ndb/src/ndbapi/TransporterFacade.hpp:
  changed friend declaration
ndb/src/ndbapi/ndb_cluster_connection.cpp:
  moved node selection things to cluster connection
  moved things from cluster connection to cluster connection impl class
ndb/test/ndbapi/testNdbApi.cpp:
  removed start transaction dgroup
ndb/tools/delete_all.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/tools/desc.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/tools/drop_index.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/tools/drop_tab.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/tools/listTables.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/tools/restore/restore_main.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/tools/select_all.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/tools/select_count.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
ndb/tools/waiter.cpp:
  changed opts handling somewhat; introduced enum for options and common handling of option variables
sql/ha_ndbcluster.cc:
  added control of usage of new node selection method
sql/mysqld.cc:
  added control of usage of new node selection method
parent 6cbcd342
...@@ -110,6 +110,7 @@ ...@@ -110,6 +110,7 @@
#define CFG_CONNECTION_SERVER_PORT 406 #define CFG_CONNECTION_SERVER_PORT 406
#define CFG_CONNECTION_HOSTNAME_1 407 #define CFG_CONNECTION_HOSTNAME_1 407
#define CFG_CONNECTION_HOSTNAME_2 408 #define CFG_CONNECTION_HOSTNAME_2 408
#define CFG_CONNECTION_GROUP 409
#define CFG_TCP_SERVER 452 #define CFG_TCP_SERVER 452
#define CFG_TCP_SEND_BUFFER_SIZE 454 #define CFG_TCP_SEND_BUFFER_SIZE 454
......
...@@ -901,23 +901,6 @@ typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*); ...@@ -901,23 +901,6 @@ typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
NDB_MAX_SCHEMA_NAME_SIZE + \ NDB_MAX_SCHEMA_NAME_SIZE + \
NDB_MAX_TAB_NAME_SIZE*2 NDB_MAX_TAB_NAME_SIZE*2
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
class NdbWaiter {
public:
NdbWaiter();
~NdbWaiter();
void wait(int waitTime);
void nodeFail(Uint32 node);
void signal(Uint32 state);
Uint32 m_node;
Uint32 m_state;
void * m_mutex;
struct NdbCondition * m_condition;
};
#endif
/** /**
* @class Ndb * @class Ndb
* @brief Represents the NDB kernel and is the main class of the NDB API. * @brief Represents the NDB kernel and is the main class of the NDB API.
...@@ -1199,39 +1182,6 @@ public: ...@@ -1199,39 +1182,6 @@ public:
const char * keyData = 0, const char * keyData = 0,
Uint32 keyLen = 0); Uint32 keyLen = 0);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
* This method is a modification of Ndb::startTransaction,
* in which we use only the first two chars of keyData to
* select transaction coordinator.
* This is referred to as a distribution group.
* There are two ways to use the method:
* - In the first, the two characters are used directly as
* the distribution key, and
* - in the second the distribution is calculated as:
* (10 * (char[0] - 0x30) + (char[1] - 0x30)).
* Thus, in the second way, the two ASCII digits '78'
* will provide the distribution key = 78.
*
* @note Transaction priorities are not yet supported.
*
* @param aPrio Priority of the transaction.<br>
* Priority 0 is the highest priority and is used for short transactions
* with requirements on low delay.<br>
* Priority 1 is a medium priority for short transactions.<br>
* Priority 2 is a medium priority for long transactions.<br>
* Priority 3 is a low priority for long transactions.
* @param keyData is a string of which the two first characters
* is used to compute which fragement the data is stored in.
* @param type is the type of distribution group.<br>
* 0 means direct usage of the two characters, and<br>
* 1 means the ASCII digit variant.
* @return NdbConnection, or NULL if it failed.
*/
NdbConnection* startTransactionDGroup(Uint32 aPrio,
const char * keyData, int type);
#endif
/** /**
* When a transactions is completed, the transaction has to be closed. * When a transactions is completed, the transaction has to be closed.
* *
...@@ -1586,8 +1536,6 @@ private: ...@@ -1586,8 +1536,6 @@ private:
/****************************************************************************** /******************************************************************************
* These are the private variables in this class. * These are the private variables in this class.
*****************************************************************************/ *****************************************************************************/
Ndb_cluster_connection *m_ndb_cluster_connection;
NdbConnection** thePreparedTransactionsArray; NdbConnection** thePreparedTransactionsArray;
NdbConnection** theSentTransactionsArray; NdbConnection** theSentTransactionsArray;
NdbConnection** theCompletedTransactionsArray; NdbConnection** theCompletedTransactionsArray;
...@@ -1601,8 +1549,6 @@ private: ...@@ -1601,8 +1549,6 @@ private:
Uint32 theNextConnectNode; Uint32 theNextConnectNode;
NdbWaiter theWaiter;
bool fullyQualifiedNames; bool fullyQualifiedNames;
// Ndb database name. // Ndb database name.
...@@ -1658,35 +1604,6 @@ private: ...@@ -1658,35 +1604,6 @@ private:
InitConfigError InitConfigError
} theInitState; } theInitState;
/**
* Computes fragement id for primary key
*
* Note that keydata has to be "shaped" as it is being sent in KEYINFO
*/
Uint32 computeFragmentId(const char * keyData, Uint32 keyLen);
Uint32 getFragmentId(Uint32 hashValue);
/**
* Make a guess to which node is the primary for the fragment
*/
Uint32 guessPrimaryNode(Uint32 fragmentId);
/**
* Structure containing values for guessing primary node
*/
struct StartTransactionNodeSelectionData {
StartTransactionNodeSelectionData():
fragment2PrimaryNodeMap(0) {};
Uint32 kValue;
Uint32 hashValueMask;
Uint32 hashpointerValue;
Uint32 noOfFragments;
Uint32 * fragment2PrimaryNodeMap;
void init(Uint32 noOfNodes, Uint8 nodeIds[]);
void release();
} startTransactionNodeSelectionData;
NdbApiSignal* theCommitAckSignal; NdbApiSignal* theCommitAckSignal;
......
...@@ -18,13 +18,7 @@ ...@@ -18,13 +18,7 @@
#ifndef CLUSTER_CONNECTION_HPP #ifndef CLUSTER_CONNECTION_HPP
#define CLUSTER_CONNECTION_HPP #define CLUSTER_CONNECTION_HPP
class TransporterFacade; struct Ndb_cluster_connection_node_iter;
class ConfigRetriever;
struct NdbThread;
extern "C" {
void* run_ndb_cluster_connection_connect_thread(void*);
}
class Ndb_cluster_connection { class Ndb_cluster_connection {
public: public:
...@@ -32,16 +26,27 @@ public: ...@@ -32,16 +26,27 @@ public:
~Ndb_cluster_connection(); ~Ndb_cluster_connection();
int connect(int no_retries, int retry_delay_in_seconds, int verbose); int connect(int no_retries, int retry_delay_in_seconds, int verbose);
int start_connect_thread(int (*connect_callback)(void)= 0); int start_connect_thread(int (*connect_callback)(void)= 0);
// add check coupled to init state of cluster connection
// timeout_after_first_alive negative - ok only if all alive
// timeout_after_first_alive positive - ok if some alive
int wait_until_ready(int timeout_for_first_alive,
int timeout_after_first_alive);
const char *get_connectstring(char *buf, int buf_sz) const; const char *get_connectstring(char *buf, int buf_sz) const;
int get_connected_port() const; int get_connected_port() const;
const char *get_connected_host() const; const char *get_connected_host() const;
void set_optimized_node_selection(int val);
Uint32 no_db_nodes();
private: private:
friend void* run_ndb_cluster_connection_connect_thread(void*); friend class Ndb;
void connect_thread(); friend class NdbImpl;
TransporterFacade *m_facade; friend class Ndb_cluster_connection_impl;
ConfigRetriever *m_config_retriever; class Ndb_cluster_connection_impl & m_impl;
NdbThread *m_connect_thread; Ndb_cluster_connection(Ndb_cluster_connection_impl&);
int (*m_connect_callback)(void);
}; };
#endif #endif
...@@ -17,47 +17,62 @@ ...@@ -17,47 +17,62 @@
#ifndef _NDB_OPTS_H #ifndef _NDB_OPTS_H
#define _NDB_OPTS_H #define _NDB_OPTS_H
#include <ndb_global.h>
#include <my_sys.h> #include <my_sys.h>
#include <my_getopt.h> #include <my_getopt.h>
#include <mysql_version.h> #include <mysql_version.h>
#include <ndb_version.h> #include <ndb_version.h>
#ifndef DBUG_OFF #define NDB_STD_OPTS_VARS \
#define NDB_STD_OPTS(prog_name) \ const char *opt_connect_str= 0;\
{ "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", \ my_bool opt_ndb_shm;\
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 }, \ my_bool opt_ndb_optimized_node_selection
{ "usage", '?', "Display this help and exit.", \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ #define NDB_STD_OPTS_OPTIONS \
{ "help", '?', "Display this help and exit.", \ OPT_NDB_SHM= 256,\
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ OPT_NDB_OPTIMIZED_NODE_SELECTION
{ "version", 'V', "Output version information and exit.", 0, 0, 0, \
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ #define OPT_NDB_CONNECTSTRING 'c'
{ "ndb-connectstring", 'c', \
"Set connect string for connecting to ndb_mgmd. " \ #ifdef NDB_SHM_TRANSPORTER
"Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \ #define OPT_NDB_SHM_DEFAULT 1
"Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
{ "connect-string", 'c', "same as --ndb-connectstring",\
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
#else #else
#define NDB_STD_OPTS(prog_name) \ #define OPT_NDB_SHM_DEFAULT 0
#endif
#define NDB_STD_OPTS_COMMON \
{ "usage", '?', "Display this help and exit.", \ { "usage", '?', "Display this help and exit.", \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "help", '?', "Display this help and exit.", \ { "help", '?', "Display this help and exit.", \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "version", 'V', "Output version information and exit.", 0, 0, 0, \ { "version", 'V', "Output version information and exit.", 0, 0, 0, \
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "ndb-connectstring", 'c', \ { "ndb-connectstring", OPT_NDB_CONNECTSTRING, \
"Set connect string for connecting to ndb_mgmd. " \ "Set connect string for connecting to ndb_mgmd. " \
"Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \ "Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \
"Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \ "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
{ "connect-string", 'c', "same as --ndb-connectstring",\ { "ndb-shm", OPT_NDB_SHM,\
"Allow optimizing using shared memory connections when available",\
(gptr*) &opt_ndb_shm, (gptr*) &opt_ndb_shm, 0,\
GET_BOOL, NO_ARG, OPT_NDB_SHM_DEFAULT, 0, 0, 0, 0, 0 },\
{"ndb-optimized-node-selection", OPT_NDB_OPTIMIZED_NODE_SELECTION,\
"Select nodes for transactions in a more optimal way",\
(gptr*) &opt_ndb_optimized_node_selection,\
(gptr*) &opt_ndb_optimized_node_selection, 0,\
GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},\
{ "connect-string", OPT_NDB_CONNECTSTRING, "same as --ndb-connectstring",\
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,\ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,\
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 } GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
#ifndef DBUG_OFF
#define NDB_STD_OPTS(prog_name) \
{ "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", \
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 }, \
NDB_STD_OPTS_COMMON
#else
#define NDB_STD_OPTS(prog_name) NDB_STD_OPTS_COMMON
#endif #endif
#endif /*_NDB_OPTS_H */ #endif /*_NDB_OPTS_H */
...@@ -46,7 +46,13 @@ extern "C" { ...@@ -46,7 +46,13 @@ extern "C" {
#include <EventLogger.hpp> #include <EventLogger.hpp>
extern EventLogger g_eventLogger; extern EventLogger g_eventLogger;
static const char* opt_connect_str= 0; enum ndbd_options {
NDB_STD_OPTS_OPTIONS,
OPT_INITIAL,
OPT_NODAEMON
};
NDB_STD_OPTS_VARS;
static int _daemon, _no_daemon, _initial, _no_start; static int _daemon, _no_daemon, _initial, _no_start;
/** /**
* Arguments to NDB process * Arguments to NDB process
...@@ -54,7 +60,7 @@ static int _daemon, _no_daemon, _initial, _no_start; ...@@ -54,7 +60,7 @@ static int _daemon, _no_daemon, _initial, _no_start;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
NDB_STD_OPTS("ndbd"), NDB_STD_OPTS("ndbd"),
{ "initial", 256, { "initial", OPT_INITIAL,
"Perform initial start of ndbd, including cleaning the file system. " "Perform initial start of ndbd, including cleaning the file system. "
"Consult documentation before using this", "Consult documentation before using this",
(gptr*) &_initial, (gptr*) &_initial, 0, (gptr*) &_initial, (gptr*) &_initial, 0,
...@@ -66,7 +72,7 @@ static struct my_option my_long_options[] = ...@@ -66,7 +72,7 @@ static struct my_option my_long_options[] =
{ "daemon", 'd', "Start ndbd as daemon (default)", { "daemon", 'd', "Start ndbd as daemon (default)",
(gptr*) &_daemon, (gptr*) &_daemon, 0, (gptr*) &_daemon, (gptr*) &_daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
{ "nodaemon", 257, { "nodaemon", OPT_NODAEMON,
"Do not start ndbd as daemon, provided for testing purposes", "Do not start ndbd as daemon, provided for testing purposes",
(gptr*) &_no_daemon, (gptr*) &_no_daemon, 0, (gptr*) &_no_daemon, (gptr*) &_no_daemon, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
......
...@@ -56,9 +56,13 @@ handler(int sig){ ...@@ -56,9 +56,13 @@ handler(int sig){
} }
} }
enum ndb_mgm_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char default_prompt[]= "ndb_mgm> "; static const char default_prompt[]= "ndb_mgm> ";
static unsigned _try_reconnect; static unsigned _try_reconnect;
static char *opt_connect_str= 0;
static const char *prompt= default_prompt; static const char *prompt= default_prompt;
static char *opt_execute_str= 0; static char *opt_execute_str= 0;
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#include "InitConfigFileParser.hpp" #include "InitConfigFileParser.hpp"
#include <m_string.h> #include <m_string.h>
extern my_bool opt_ndb_shm;
#define MAX_LINE_LENGTH 255 #define MAX_LINE_LENGTH 255
#define KEY_INTERNAL 0 #define KEY_INTERNAL 0
#define MAX_INT_RNIL 0xfffffeff #define MAX_INT_RNIL 0xfffffeff
...@@ -79,6 +81,7 @@ static bool transformSystem(InitConfigFileParser::Context & ctx, const char *); ...@@ -79,6 +81,7 @@ static bool transformSystem(InitConfigFileParser::Context & ctx, const char *);
static bool transformExternalSystem(InitConfigFileParser::Context & ctx, const char *); static bool transformExternalSystem(InitConfigFileParser::Context & ctx, const char *);
static bool transformNode(InitConfigFileParser::Context & ctx, const char *); static bool transformNode(InitConfigFileParser::Context & ctx, const char *);
static bool transformExtNode(InitConfigFileParser::Context & ctx, const char *); static bool transformExtNode(InitConfigFileParser::Context & ctx, const char *);
static bool checkConnectionSupport(InitConfigFileParser::Context & ctx, const char *);
static bool transformConnection(InitConfigFileParser::Context & ctx, const char *); static bool transformConnection(InitConfigFileParser::Context & ctx, const char *);
static bool applyDefaultValues(InitConfigFileParser::Context & ctx, const char *); static bool applyDefaultValues(InitConfigFileParser::Context & ctx, const char *);
static bool checkMandatory(InitConfigFileParser::Context & ctx, const char *); static bool checkMandatory(InitConfigFileParser::Context & ctx, const char *);
...@@ -108,6 +111,11 @@ ConfigInfo::m_SectionRules[] = { ...@@ -108,6 +111,11 @@ ConfigInfo::m_SectionRules[] = {
{ "REP", transformNode, 0 }, { "REP", transformNode, 0 },
{ "EXTERNAL REP", transformExtNode, 0 }, { "EXTERNAL REP", transformExtNode, 0 },
{ "TCP", checkConnectionSupport, 0 },
{ "SHM", checkConnectionSupport, 0 },
{ "SCI", checkConnectionSupport, 0 },
{ "OSE", checkConnectionSupport, 0 },
{ "TCP", transformConnection, 0 }, { "TCP", transformConnection, 0 },
{ "SHM", transformConnection, 0 }, { "SHM", transformConnection, 0 },
{ "SCI", transformConnection, 0 }, { "SCI", transformConnection, 0 },
...@@ -130,6 +138,8 @@ ConfigInfo::m_SectionRules[] = { ...@@ -130,6 +138,8 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", fixHostname, "HostName1" }, { "TCP", fixHostname, "HostName1" },
{ "TCP", fixHostname, "HostName2" }, { "TCP", fixHostname, "HostName2" },
{ "SHM", fixHostname, "HostName1" },
{ "SHM", fixHostname, "HostName2" },
{ "SCI", fixHostname, "HostName1" }, { "SCI", fixHostname, "HostName1" },
{ "SCI", fixHostname, "HostName2" }, { "SCI", fixHostname, "HostName2" },
{ "SHM", fixHostname, "HostName1" }, { "SHM", fixHostname, "HostName1" },
...@@ -197,6 +207,9 @@ static bool sanity_checks(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -197,6 +207,9 @@ static bool sanity_checks(Vector<ConfigInfo::ConfigRuleSection>&sections,
static bool add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, static bool add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
const char * rule_data); const char * rule_data);
static bool set_connection_priorities(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data);
static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections, static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
const char * rule_data); const char * rule_data);
...@@ -208,6 +221,7 @@ const ConfigInfo::ConfigRule ...@@ -208,6 +221,7 @@ const ConfigInfo::ConfigRule
ConfigInfo::m_ConfigRules[] = { ConfigInfo::m_ConfigRules[] = {
{ sanity_checks, 0 }, { sanity_checks, 0 },
{ add_node_connections, 0 }, { add_node_connections, 0 },
{ set_connection_priorities, 0 },
{ add_server_ports, 0 }, { add_server_ports, 0 },
{ check_node_vs_replicas, 0 }, { check_node_vs_replicas, 0 },
{ 0, 0 } { 0, 0 }
...@@ -1582,6 +1596,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1582,6 +1596,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
MANDATORY, MANDATORY,
0, 0 }, 0, 0 },
{
CFG_CONNECTION_GROUP,
"Group",
"TCP",
"",
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
"55",
"0", "200" },
{ {
CFG_CONNECTION_SEND_SIGNAL_ID, CFG_CONNECTION_SEND_SIGNAL_ID,
"SendSignalId", "SendSignalId",
...@@ -1747,6 +1772,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1747,6 +1772,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
MANDATORY, MANDATORY,
0, 0 }, 0, 0 },
{
CFG_CONNECTION_GROUP,
"Group",
"SHM",
"",
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
"35",
"0", "200" },
{ {
CFG_CONNECTION_SEND_SIGNAL_ID, CFG_CONNECTION_SEND_SIGNAL_ID,
"SendSignalId", "SendSignalId",
...@@ -1780,7 +1816,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1780,7 +1816,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_USED, ConfigInfo::CI_USED,
false, false,
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
MANDATORY, "0",
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
...@@ -1857,6 +1893,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1857,6 +1893,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
{
CFG_CONNECTION_GROUP,
"Group",
"SCI",
"",
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
"15",
"0", "200" },
{ {
CFG_CONNECTION_HOSTNAME_1, CFG_CONNECTION_HOSTNAME_1,
"HostName1", "HostName1",
...@@ -2681,11 +2728,50 @@ transformExtNode(InitConfigFileParser::Context & ctx, const char * data){ ...@@ -2681,11 +2728,50 @@ transformExtNode(InitConfigFileParser::Context & ctx, const char * data){
} }
/** /**
* Connection rule: Update "NoOfConnections" * Connection rule: Check support of connection
*/ */
bool bool
transformConnection(InitConfigFileParser::Context & ctx, const char * data){ checkConnectionSupport(InitConfigFileParser::Context & ctx, const char * data)
{
int error= 0;
if (strcasecmp("TCP",ctx.fname) == 0)
{
// always enabled
}
else if (strcasecmp("SHM",ctx.fname) == 0)
{
#ifndef NDB_SHM_TRANSPORTER
error= 1;
#endif
}
else if (strcasecmp("SCI",ctx.fname) == 0)
{
#ifndef NDB_SCI_TRANSPORTER
error= 1;
#endif
}
else if (strcasecmp("OSE",ctx.fname) == 0)
{
#ifndef NDB_OSE_TRANSPORTER
error= 1;
#endif
}
if (error)
{
ctx.reportError("Binary not compiled with this connection support, "
"[%s] starting at line: %d",
ctx.fname, ctx.m_sectionLineno);
return false;
}
return true;
}
/**
* Connection rule: Update "NoOfConnections"
*/
bool
transformConnection(InitConfigFileParser::Context & ctx, const char * data)
{
Uint32 connections = 0; Uint32 connections = 0;
ctx.m_userProperties.get("NoOfConnections", &connections); ctx.m_userProperties.get("NoOfConnections", &connections);
BaseString::snprintf(ctx.pname, sizeof(ctx.pname), "Connection_%d", connections); BaseString::snprintf(ctx.pname, sizeof(ctx.pname), "Connection_%d", connections);
...@@ -3398,11 +3484,51 @@ sanity_checks(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -3398,11 +3484,51 @@ sanity_checks(Vector<ConfigInfo::ConfigRuleSection>&sections,
return true; return true;
} }
static void
add_a_connection(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
Uint32 nodeId1, Uint32 nodeId2, bool use_shm)
{
ConfigInfo::ConfigRuleSection s;
const char *hostname1= 0, *hostname2= 0;
const Properties *tmp;
require(ctx.m_config->get("Node", nodeId1, &tmp));
tmp->get("HostName", &hostname1);
require(ctx.m_config->get("Node", nodeId2, &tmp));
tmp->get("HostName", &hostname2);
char buf[16];
s.m_sectionData= new Properties(true);
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId1);
s.m_sectionData->put("NodeId1", buf);
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId2);
s.m_sectionData->put("NodeId2", buf);
if (use_shm &&
hostname1 && hostname1[0] &&
hostname2 && hostname2[0] &&
strcmp(hostname1,hostname2) == 0)
{
s.m_sectionType= BaseString("SHM");
DBUG_PRINT("info",("adding SHM connection %d %d",nodeId1,nodeId2));
}
else
{
s.m_sectionType= BaseString("TCP");
DBUG_PRINT("info",("adding TCP connection %d %d",nodeId1,nodeId2));
}
sections.push_back(s);
}
static bool static bool
add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
const char * rule_data) const char * rule_data)
{ {
DBUG_ENTER("add_node_connections");
Uint32 i; Uint32 i;
Properties * props= ctx.m_config; Properties * props= ctx.m_config;
Properties p_connections(true); Properties p_connections(true);
...@@ -3427,9 +3553,10 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -3427,9 +3553,10 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
ctx.m_userProperties.get("NoOfNodes", &nNodes); ctx.m_userProperties.get("NoOfNodes", &nNodes);
Properties p_db_nodes(true); Properties p_db_nodes(true);
Properties p_api_mgm_nodes(true); Properties p_api_nodes(true);
Properties p_mgm_nodes(true);
Uint32 i_db= 0, i_api_mgm= 0, n; Uint32 i_db= 0, i_api= 0, i_mgm= 0, n;
for (i= 0, n= 0; n < nNodes; i++){ for (i= 0, n= 0; n < nNodes; i++){
const Properties * tmp; const Properties * tmp;
if(!props->get("Node", i, &tmp)) continue; if(!props->get("Node", i, &tmp)) continue;
...@@ -3440,9 +3567,10 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -3440,9 +3567,10 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
if (strcmp(type,DB_TOKEN) == 0) if (strcmp(type,DB_TOKEN) == 0)
p_db_nodes.put("", i_db++, i); p_db_nodes.put("", i_db++, i);
else if (strcmp(type,API_TOKEN) == 0 || else if (strcmp(type,API_TOKEN) == 0)
strcmp(type,MGM_TOKEN) == 0) p_api_nodes.put("", i_api++, i);
p_api_mgm_nodes.put("", i_api_mgm++, i); else if (strcmp(type,MGM_TOKEN) == 0)
p_mgm_nodes.put("", i_mgm++, i);
} }
Uint32 nodeId1, nodeId2, dummy; Uint32 nodeId1, nodeId2, dummy;
...@@ -3451,39 +3579,39 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -3451,39 +3579,39 @@ add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
for (Uint32 j= i+1;; j++){ for (Uint32 j= i+1;; j++){
if(!p_db_nodes.get("", j, &nodeId2)) break; if(!p_db_nodes.get("", j, &nodeId2)) break;
if(!p_connections2.get("", nodeId1+nodeId2<<16, &dummy)) { if(!p_connections2.get("", nodeId1+nodeId2<<16, &dummy)) {
ConfigInfo::ConfigRuleSection s; add_a_connection(sections,ctx,nodeId1,nodeId2,opt_ndb_shm);
s.m_sectionType= BaseString("TCP");
s.m_sectionData= new Properties(true);
char buf[16];
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId1);
s.m_sectionData->put("NodeId1", buf);
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId2);
s.m_sectionData->put("NodeId2", buf);
sections.push_back(s);
} }
} }
} }
for (i= 0; p_api_mgm_nodes.get("", i, &nodeId1); i++){ for (i= 0; p_api_nodes.get("", i, &nodeId1); i++){
if(!p_connections.get("", nodeId1, &dummy)) { if(!p_connections.get("", nodeId1, &dummy)) {
for (Uint32 j= 0;; j++){ for (Uint32 j= 0;; j++){
if(!p_db_nodes.get("", j, &nodeId2)) break; if(!p_db_nodes.get("", j, &nodeId2)) break;
ConfigInfo::ConfigRuleSection s; add_a_connection(sections,ctx,nodeId1,nodeId2,opt_ndb_shm);
s.m_sectionType= BaseString("TCP");
s.m_sectionData= new Properties(true);
char buf[16];
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId1);
s.m_sectionData->put("NodeId1", buf);
BaseString::snprintf(buf, sizeof(buf), "%u", nodeId2);
s.m_sectionData->put("NodeId2", buf);
sections.push_back(s);
} }
} }
} }
return true; for (i= 0; p_mgm_nodes.get("", i, &nodeId1); i++){
if(!p_connections.get("", nodeId1, &dummy)) {
for (Uint32 j= 0;; j++){
if(!p_db_nodes.get("", j, &nodeId2)) break;
add_a_connection(sections,ctx,nodeId1,nodeId2,0);
}
}
}
DBUG_RETURN(true);
} }
static bool set_connection_priorities(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data)
{
DBUG_ENTER("set_connection_priorities");
DBUG_RETURN(true);
}
static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections, static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
......
...@@ -2225,9 +2225,24 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -2225,9 +2225,24 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (*nodeId != 0 || if (*nodeId != 0 ||
type != NDB_MGM_NODE_TYPE_MGM || type != NDB_MGM_NODE_TYPE_MGM ||
no_mgm == 1) { // any match is ok no_mgm == 1) { // any match is ok
if (config_hostname == 0 &&
*nodeId == 0 &&
type != NDB_MGM_NODE_TYPE_MGM)
{
if (!id_found) // only set if not set earlier
id_found= tmp;
continue; /* continue looking for a nodeid with specified
* hostname
*/
}
assert(id_found == 0);
id_found= tmp; id_found= tmp;
break; break;
} }
assert(no_mgm > 1);
assert(*nodeId != 0);
assert(type != NDB_MGM_NODE_TYPE_MGM);
if (id_found) { // mgmt server may only have one match if (id_found) { // mgmt server may only have one match
error_string.appfmt("Ambiguous node id's %d and %d.\n" error_string.appfmt("Ambiguous node id's %d and %d.\n"
"Suggest specifying node id in connectstring,\n" "Suggest specifying node id in connectstring,\n"
......
...@@ -89,50 +89,50 @@ bool g_StopServer; ...@@ -89,50 +89,50 @@ bool g_StopServer;
extern EventLogger g_EventLogger; extern EventLogger g_EventLogger;
extern int global_mgmt_server_check; extern int global_mgmt_server_check;
static char *opt_connect_str= 0;
enum ndb_mgmd_options {
NDB_STD_OPTS_OPTIONS,
OPT_INTERACTIVE,
OPT_NO_NODEID_CHECKS,
OPT_NO_DAEMON
};
NDB_STD_OPTS_VARS;
#if NDB_VERSION_MAJOR <= 4
#undef OPT_NDB_CONNECTSTRING
#define OPT_NDB_CONNECTSTRING 1023
#else
#endif
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
#ifndef DBUG_OFF NDB_STD_OPTS("ndb_mgmd"),
{ "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 },
#endif
{ "usage", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "help", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "version", 'V', "Output version information and exit.", 0, 0, 0,
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "ndb-connectstring", 1023,
"Set connect string for connecting to ndb_mgmd. "
"Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". "
"Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg",
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "connect-string", 1023,
"same as --ndb-connectstring.",
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "config-file", 'f', "Specify cluster configuration file", { "config-file", 'f', "Specify cluster configuration file",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0, (gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "daemon", 'd', "Run ndb_mgmd in daemon mode (default)", { "daemon", 'd', "Run ndb_mgmd in daemon mode (default)",
(gptr*) &glob.daemon, (gptr*) &glob.daemon, 0, (gptr*) &glob.daemon, (gptr*) &glob.daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
{ "interactive", 256, "Run interactive. Not supported but provided for testing purposes", { "interactive", OPT_INTERACTIVE,
"Run interactive. Not supported but provided for testing purposes",
(gptr*) &glob.interactive, (gptr*) &glob.interactive, 0, (gptr*) &glob.interactive, (gptr*) &glob.interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-nodeid-checks", 257, "Do not provide any node id checks", { "no-nodeid-checks", OPT_NO_NODEID_CHECKS,
"Do not provide any node id checks",
(gptr*) &g_no_nodeid_checks, (gptr*) &g_no_nodeid_checks, 0, (gptr*) &g_no_nodeid_checks, (gptr*) &g_no_nodeid_checks, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "nodaemon", 258, "Don't run as daemon, but don't read from stdin", { "nodaemon", OPT_NO_DAEMON,
"Don't run as daemon, but don't read from stdin",
(gptr*) &glob.non_interactive, (gptr*) &glob.non_interactive, 0, (gptr*) &glob.non_interactive, (gptr*) &glob.non_interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
#if NDB_VERSION_MAJOR <= 4
{ "config-file", 'c', { "config-file", 'c',
"-c provided for backwards compatability, will be removed in 5.0." "-c provided for backwards compatability, will be removed in 5.0."
" Use -f instead", " Use -f instead",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0, (gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
#endif
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
}; };
static void short_usage_sub(void) static void short_usage_sub(void)
...@@ -164,6 +164,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -164,6 +164,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
case 'c': case 'c':
printf("Warning: -c will be removed in 5.0, use -f instead\n"); printf("Warning: -c will be removed in 5.0, use -f instead\n");
break; break;
case OPT_NDB_SHM:
#ifndef NDB_SHM_TRANSPORTER
printf("Warning: binary not compiled with shared memory support,\n"
"use configure option --with-ndb-shm to enable support.\n"
"Tcp connections will now be used instead\n");
opt_ndb_shm= 0;
#endif
break;
case '?': case '?':
usage(); usage();
exit(0); exit(0);
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <UtilBuffer.hpp> #include <UtilBuffer.hpp>
#include <NdbDictionary.hpp> #include <NdbDictionary.hpp>
#include <Ndb.hpp> #include <Ndb.hpp>
#include <NdbCondition.h>
#include "NdbLinHash.hpp" #include "NdbLinHash.hpp"
class Ndb_local_table_info { class Ndb_local_table_info {
......
...@@ -46,7 +46,6 @@ Connect to any node which has no connection at the moment. ...@@ -46,7 +46,6 @@ Connect to any node which has no connection at the moment.
NdbConnection* Ndb::doConnect(Uint32 tConNode) NdbConnection* Ndb::doConnect(Uint32 tConNode)
{ {
Uint32 tNode; Uint32 tNode;
Uint32 i = 0;;
Uint32 tAnyAlive = 0; Uint32 tAnyAlive = 0;
int TretCode; int TretCode;
...@@ -65,26 +64,51 @@ NdbConnection* Ndb::doConnect(Uint32 tConNode) ...@@ -65,26 +64,51 @@ NdbConnection* Ndb::doConnect(Uint32 tConNode)
// We will connect to any node. Make sure that we have connections to all // We will connect to any node. Make sure that we have connections to all
// nodes. // nodes.
//**************************************************************************** //****************************************************************************
Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes; if (theImpl->m_optimized_node_selection)
Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex; {
UintR Tcount = 0; Ndb_cluster_connection_node_iter &node_iter=
do { theImpl->m_node_iter;
theCurrentConnectIndex++; theImpl->m_ndb_cluster_connection.init_get_next_node(node_iter);
if (theCurrentConnectIndex >= tNoOfDbNodes) { while ((tNode= theImpl->m_ndb_cluster_connection.get_next_node(node_iter)))
theCurrentConnectIndex = 0; {
}//if TretCode= NDB_connect(tNode);
Tcount++; if ((TretCode == 1) ||
tNode = theImpl->theDBnodes[theCurrentConnectIndex]; (TretCode == 2))
TretCode = NDB_connect(tNode); {
if ((TretCode == 1) || (TretCode == 2)) {
//**************************************************************************** //****************************************************************************
// We have connections now to the desired node. Return // We have connections now to the desired node. Return
//**************************************************************************** //****************************************************************************
return getConnectedNdbConnection(tNode); return getConnectedNdbConnection(tNode);
} else if (TretCode != 0) { } else if (TretCode != 0) {
tAnyAlive = 1; tAnyAlive= 1;
}//if }//if
} while (Tcount < tNoOfDbNodes); }
}
else // just do a regular round robin
{
Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes;
Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex;
UintR Tcount = 0;
do {
theCurrentConnectIndex++;
if (theCurrentConnectIndex >= tNoOfDbNodes)
theCurrentConnectIndex = 0;
Tcount++;
tNode= theImpl->theDBnodes[theCurrentConnectIndex];
TretCode= NDB_connect(tNode);
if ((TretCode == 1) ||
(TretCode == 2))
{
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
return getConnectedNdbConnection(tNode);
} else if (TretCode != 0) {
tAnyAlive= 1;
}//if
} while (Tcount < tNoOfDbNodes);
}
//**************************************************************************** //****************************************************************************
// We were unable to find a free connection. If no node alive we will report // We were unable to find a free connection. If no node alive we will report
// error code for cluster failure otherwise connection failure. // error code for cluster failure otherwise connection failure.
...@@ -149,8 +173,8 @@ Ndb::NDB_connect(Uint32 tNode) ...@@ -149,8 +173,8 @@ Ndb::NDB_connect(Uint32 tNode)
tReturnCode = tp->sendSignal(tSignal, tNode); tReturnCode = tp->sendSignal(tSignal, tNode);
releaseSignal(tSignal); releaseSignal(tSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_node = tNode; theImpl->theWaiter.m_node = tNode;
theWaiter.m_state = WAIT_TC_SEIZE; theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
tReturnCode = receiveResponse(); tReturnCode = receiveResponse();
}//if }//if
} else { } else {
...@@ -243,50 +267,28 @@ Ndb::waitUntilReady(int timeout) ...@@ -243,50 +267,28 @@ Ndb::waitUntilReady(int timeout)
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
do { while (theNode == 0) {
if ((id = theNode) != 0) {
unsigned int foundAliveNode = 0;
TransporterFacade *tp = TransporterFacade::instance();
tp->lock_mutex();
for (unsigned int i = 0; i < theImpl->theNoOfDBnodes; i++) {
const NodeId nodeId = theImpl->theDBnodes[i];
//************************************************
// If any node is answering, ndb is answering
//************************************************
if (tp->get_node_alive(nodeId) != 0) {
foundAliveNode++;
}//if
}//for
tp->unlock_mutex();
if (foundAliveNode == theImpl->theNoOfDBnodes) {
DBUG_RETURN(0);
}//if
if (foundAliveNode > 0) {
noChecksSinceFirstAliveFound++;
}//if
if (noChecksSinceFirstAliveFound > 30) {
DBUG_RETURN(0);
}//if
}//if theNode != 0
if (secondsCounter >= timeout) if (secondsCounter >= timeout)
break; {
theError.code = 4269;
DBUG_RETURN(-1);
}
NdbSleep_MilliSleep(100); NdbSleep_MilliSleep(100);
milliCounter += 100; milliCounter += 100;
if (milliCounter >= 1000) { if (milliCounter >= 1000) {
secondsCounter++; secondsCounter++;
milliCounter = 0; milliCounter = 0;
}//if }//if
} while (1); }
if (id == 0) {
theError.code = 4269; if (theImpl->m_ndb_cluster_connection.wait_until_ready
(timeout-secondsCounter,30))
{
theError.code = 4009;
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (noChecksSinceFirstAliveFound > 0) {
DBUG_RETURN(0); DBUG_RETURN(0);
}//if
theError.code = 4009;
DBUG_RETURN(-1);
} }
/***************************************************************************** /*****************************************************************************
...@@ -311,8 +313,8 @@ Ndb::startTransaction(Uint32 aPriority, const char * keyData, Uint32 keyLen) ...@@ -311,8 +313,8 @@ Ndb::startTransaction(Uint32 aPriority, const char * keyData, Uint32 keyLen)
*/ */
Uint32 nodeId; Uint32 nodeId;
if(keyData != 0) { if(keyData != 0) {
Uint32 fragmentId = computeFragmentId(keyData, keyLen); nodeId = 0; // guess not supported
nodeId = guessPrimaryNode(fragmentId); // nodeId = m_ndb_cluster_connection->guess_primary_node(keyData, keyLen);
} else { } else {
nodeId = 0; nodeId = 0;
}//if }//if
...@@ -373,44 +375,6 @@ Ndb::hupp(NdbConnection* pBuddyTrans) ...@@ -373,44 +375,6 @@ Ndb::hupp(NdbConnection* pBuddyTrans)
}//if }//if
}//Ndb::hupp() }//Ndb::hupp()
NdbConnection*
Ndb::startTransactionDGroup(Uint32 aPriority, const char * keyData, int type)
{
char DGroup[4];
if ((keyData == NULL) ||
(type > 1)) {
theError.code = 4118;
return NULL;
}//if
if (theInitState == Initialised) {
theError.code = 0;
checkFailedNode();
/**
* If the user supplied key data
* We will make a qualified quess to which node is the primary for the
* the fragment and contact that node
*/
Uint32 fragmentId;
if (type == 0) {
DGroup[0] = keyData[0];
DGroup[1] = keyData[1];
DGroup[2] = 0x30;
DGroup[3] = 0x30;
fragmentId = computeFragmentId(&DGroup[0], 4);
} else {
Uint32 hashValue = ((keyData[0] - 0x30) * 10) + (keyData[1] - 0x30);
fragmentId = getFragmentId(hashValue);
}//if
Uint32 nodeId = guessPrimaryNode(fragmentId);
NdbConnection* trans= startTransactionLocal(aPriority, nodeId);
DBUG_PRINT("exit", ("start DGroup trans: 0x%x transid: 0x%llx",
trans, trans ? trans->getTransactionId() : 0));
return trans;
} else {
return NULL;
}//if
}//Ndb::startTransaction()
NdbConnection* NdbConnection*
Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId) Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
...@@ -1010,118 +974,6 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) ...@@ -1010,118 +974,6 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
return ~0; return ~0;
} }
static const Uint32 MAX_KEY_LEN_64_WORDS = 4;
static const Uint32 MAX_KEY_LEN_32_WORDS = 8;
static const Uint32 MAX_KEY_LEN_BYTES = 32;
Uint32
Ndb::computeFragmentId(const char * keyData, Uint32 keyLen)
{
Uint64 tempData[MAX_KEY_LEN_64_WORDS];
const Uint32 usedKeyLen = (keyLen + 3) >> 2; // In words
const char * usedKeyData = 0;
/**
* If key data buffer is not aligned (on 64 bit boundary)
* or key len is not a multiple of 4
* Use temp data
*/
if(((((UintPtr)keyData) & 7) == 0) && ((keyLen & 3) == 0)) {
usedKeyData = keyData;
} else {
memcpy(&tempData[0], keyData, keyLen);
const int slack = keyLen & 3;
if(slack > 0) {
memset(&((char *)&tempData[0])[keyLen], 0, (4 - slack));
}//if
usedKeyData = (char *)&tempData[0];
}//if
Uint32 hashValue = md5_hash((Uint64 *)usedKeyData, usedKeyLen);
hashValue >>= startTransactionNodeSelectionData.kValue;
return getFragmentId(hashValue);
}//Ndb::computeFragmentId()
Uint32
Ndb::getFragmentId(Uint32 hashValue)
{
Uint32 fragmentId = hashValue &
startTransactionNodeSelectionData.hashValueMask;
if(fragmentId < startTransactionNodeSelectionData.hashpointerValue) {
fragmentId = hashValue &
((startTransactionNodeSelectionData.hashValueMask << 1) + 1);
}//if
return fragmentId;
}
Uint32
Ndb::guessPrimaryNode(Uint32 fragmentId){
//ASSERT(((fragmentId > 0) && fragmentId <
// startTransactionNodeSelectionData.noOfFragments), "Invalid fragementId");
return startTransactionNodeSelectionData.fragment2PrimaryNodeMap[fragmentId];
}
void
Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes,
Uint8 nodeIds[]) {
kValue = 6;
noOfFragments = 2 * noOfNodes;
/**
* Compute hashValueMask and hashpointerValue
*/
{
Uint32 topBit = (1 << 31);
for(int i = 31; i>=0; i--){
if((noOfFragments & topBit) != 0)
break;
topBit >>= 1;
}
hashValueMask = topBit - 1;
hashpointerValue = noOfFragments - (hashValueMask + 1);
}
/**
* This initialization depends on
* the fact that:
* primary node for fragment i = i % noOfNodes
*
* This algorithm should be implemented in Dbdih
*/
{
if (fragment2PrimaryNodeMap != 0)
abort();
fragment2PrimaryNodeMap = new Uint32[noOfFragments];
Uint32 i;
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i] = nodeIds[i];
}
// Sort them (bubble sort)
for(i = 0; i<noOfNodes-1; i++)
for(Uint32 j = i+1; j<noOfNodes; j++)
if(fragment2PrimaryNodeMap[i] > fragment2PrimaryNodeMap[j]){
Uint32 tmp = fragment2PrimaryNodeMap[i];
fragment2PrimaryNodeMap[i] = fragment2PrimaryNodeMap[j];
fragment2PrimaryNodeMap[j] = tmp;
}
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i+noOfNodes] = fragment2PrimaryNodeMap[i];
}
}
}
void
Ndb::StartTransactionNodeSelectionData::release(){
delete [] fragment2PrimaryNodeMap;
fragment2PrimaryNodeMap = 0;
}
Uint32 Uint32
convertEndian(Uint32 Data) convertEndian(Uint32 Data)
{ {
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include <Bitmask.hpp> #include <Bitmask.hpp>
#include <AttributeList.hpp> #include <AttributeList.hpp>
#include <Ndb.hpp> #include <Ndb.hpp>
#include "NdbImpl.hpp" #include "NdbWaiter.hpp"
#include "DictCache.hpp" #include "DictCache.hpp"
class NdbDictObjectImpl { class NdbDictObjectImpl {
......
...@@ -17,7 +17,9 @@ ...@@ -17,7 +17,9 @@
#ifndef NDB_IMPL_HPP #ifndef NDB_IMPL_HPP
#define NDB_IMPL_HPP #define NDB_IMPL_HPP
#include <ndb_global.h>
#include <Ndb.hpp> #include <Ndb.hpp>
#include <NdbOut.hpp>
#include <NdbError.hpp> #include <NdbError.hpp>
#include <NdbCondition.h> #include <NdbCondition.h>
#include <NdbReceiver.hpp> #include <NdbReceiver.hpp>
...@@ -26,6 +28,8 @@ ...@@ -26,6 +28,8 @@
#include <NdbTick.h> #include <NdbTick.h>
#include "ndb_cluster_connection_impl.hpp"
#include "NdbDictionaryImpl.hpp"
#include "ObjectMap.hpp" #include "ObjectMap.hpp"
/** /**
...@@ -33,11 +37,16 @@ ...@@ -33,11 +37,16 @@
*/ */
class NdbImpl { class NdbImpl {
public: public:
NdbImpl(); NdbImpl(Ndb_cluster_connection *, Ndb&);
~NdbImpl(); ~NdbImpl();
Ndb_cluster_connection_impl &m_ndb_cluster_connection;
NdbDictionaryImpl m_dictionary;
// Ensure good distribution of connects // Ensure good distribution of connects
Uint32 theCurrentConnectIndex; Uint32 theCurrentConnectIndex;
Ndb_cluster_connection_node_iter m_node_iter;
NdbObjectIdMap theNdbObjectIdMap; NdbObjectIdMap theNdbObjectIdMap;
...@@ -46,6 +55,10 @@ public: ...@@ -46,6 +55,10 @@ public:
// 1 indicates to release all connections to node // 1 indicates to release all connections to node
Uint32 the_release_ind[MAX_NDB_NODES]; Uint32 the_release_ind[MAX_NDB_NODES];
NdbWaiter theWaiter;
int m_optimized_node_selection;
}; };
#ifdef VM_TRACE #ifdef VM_TRACE
...@@ -113,26 +126,6 @@ Ndb::checkInitState() ...@@ -113,26 +126,6 @@ Ndb::checkInitState()
Uint32 convertEndian(Uint32 Data); Uint32 convertEndian(Uint32 Data);
enum WaitSignalType {
NO_WAIT = 0,
WAIT_NODE_FAILURE = 1, // Node failure during wait
WST_WAIT_TIMEOUT = 2, // Timeout during wait
WAIT_TC_SEIZE = 3,
WAIT_TC_RELEASE = 4,
WAIT_NDB_TAMPER = 5,
WAIT_SCAN = 6,
// DICT stuff
WAIT_GET_TAB_INFO_REQ = 11,
WAIT_CREATE_TAB_REQ = 12,
WAIT_DROP_TAB_REQ = 13,
WAIT_ALTER_TAB_REQ = 14,
WAIT_CREATE_INDX_REQ = 15,
WAIT_DROP_INDX_REQ = 16,
WAIT_LIST_TABLES_CONF = 17
};
enum LockMode { enum LockMode {
Read, Read,
Update, Update,
...@@ -140,44 +133,4 @@ enum LockMode { ...@@ -140,44 +133,4 @@ enum LockMode {
Delete Delete
}; };
#include <NdbOut.hpp>
inline
void
NdbWaiter::wait(int waitTime)
{
const bool forever = (waitTime == -1);
const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
while (1) {
if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE)
break;
if (forever) {
NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex);
} else {
if (waitTime <= 0) {
m_state = WST_WAIT_TIMEOUT;
break;
}
NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
waitTime = maxTime - NdbTick_CurrentMillisecond();
}
}
}
inline
void
NdbWaiter::nodeFail(Uint32 aNodeId){
if (m_state != NO_WAIT && m_node == aNodeId){
m_state = WAIT_NODE_FAILURE;
NdbCondition_Signal(m_condition);
}
}
inline
void
NdbWaiter::signal(Uint32 state){
m_state = state;
NdbCondition_Signal(m_condition);
}
#endif #endif
...@@ -528,8 +528,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend) ...@@ -528,8 +528,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
/** /**
* No completed... * No completed...
*/ */
theNdb->theWaiter.m_node = nodeId; theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue; continue;
...@@ -1358,8 +1358,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, ...@@ -1358,8 +1358,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
Uint32 tmp = m_sent_receivers_count; Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver; s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){ while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theWaiter.m_node = nodeId; theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue; continue;
...@@ -1506,8 +1506,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ ...@@ -1506,8 +1506,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/ */
while(theError.code == 0 && m_sent_receivers_count) while(theError.code == 0 && m_sent_receivers_count)
{ {
theNdb->theWaiter.m_node = nodeId; theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){ switch(return_code){
case 0: case 0:
...@@ -1576,8 +1576,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ ...@@ -1576,8 +1576,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/ */
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count) while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{ {
theNdb->theWaiter.m_node = nodeId; theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){ switch(return_code){
case 0: case 0:
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef NDB_WAITER_HPP
#define NDB_WAITER_HPP
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <NdbError.hpp>
#include <NdbCondition.h>
#include <NdbReceiver.hpp>
#include <NdbOperation.hpp>
#include <kernel/ndb_limits.h>
#include <NdbTick.h>
enum WaitSignalType {
NO_WAIT = 0,
WAIT_NODE_FAILURE = 1, // Node failure during wait
WST_WAIT_TIMEOUT = 2, // Timeout during wait
WAIT_TC_SEIZE = 3,
WAIT_TC_RELEASE = 4,
WAIT_NDB_TAMPER = 5,
WAIT_SCAN = 6,
// DICT stuff
WAIT_GET_TAB_INFO_REQ = 11,
WAIT_CREATE_TAB_REQ = 12,
WAIT_DROP_TAB_REQ = 13,
WAIT_ALTER_TAB_REQ = 14,
WAIT_CREATE_INDX_REQ = 15,
WAIT_DROP_INDX_REQ = 16,
WAIT_LIST_TABLES_CONF = 17
};
class NdbWaiter {
public:
NdbWaiter();
~NdbWaiter();
void wait(int waitTime);
void nodeFail(Uint32 node);
void signal(Uint32 state);
Uint32 m_node;
Uint32 m_state;
void * m_mutex;
struct NdbCondition * m_condition;
};
inline
void
NdbWaiter::wait(int waitTime)
{
const bool forever = (waitTime == -1);
const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
while (1) {
if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE)
break;
if (forever) {
NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex);
} else {
if (waitTime <= 0) {
m_state = WST_WAIT_TIMEOUT;
break;
}
NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
waitTime = maxTime - NdbTick_CurrentMillisecond();
}
}
}
inline
void
NdbWaiter::nodeFail(Uint32 aNodeId){
if (m_state != NO_WAIT && m_node == aNodeId){
m_state = WAIT_NODE_FAILURE;
NdbCondition_Signal(m_condition);
}
}
inline
void
NdbWaiter::signal(Uint32 state){
m_state = state;
NdbCondition_Signal(m_condition);
}
#endif
...@@ -209,8 +209,6 @@ void Ndb::connected(Uint32 ref) ...@@ -209,8 +209,6 @@ void Ndb::connected(Uint32 ref)
tmpTheNode, tmpTheNode,
theImpl->theNoOfDBnodes, theImpl->theNoOfDBnodes,
theFirstTransId)); theFirstTransId));
startTransactionNodeSelectionData.init(theImpl->theNoOfDBnodes,
theImpl->theDBnodes);
theCommitAckSignal = new NdbApiSignal(theMyRef); theCommitAckSignal = new NdbApiSignal(theMyRef);
theDictionary->m_receiver.m_reference= theMyRef; theDictionary->m_receiver.m_reference= theMyRef;
...@@ -251,7 +249,7 @@ Ndb::report_node_failure(Uint32 node_id) ...@@ -251,7 +249,7 @@ Ndb::report_node_failure(Uint32 node_id)
theImpl->the_release_ind[node_id] = 1; theImpl->the_release_ind[node_id] = 1;
// must come after // must come after
theImpl->the_release_ind[0] = 1; theImpl->the_release_ind[0] = 1;
theWaiter.nodeFail(node_id); theImpl->theWaiter.nodeFail(node_id);
return; return;
}//Ndb::report_node_failure() }//Ndb::report_node_failure()
...@@ -330,7 +328,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -330,7 +328,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
NdbConnection* tCon; NdbConnection* tCon;
int tReturnCode = -1; int tReturnCode = -1;
const Uint32* tDataPtr = aSignal->getDataPtr(); const Uint32* tDataPtr = aSignal->getDataPtr();
const Uint32 tWaitState = theWaiter.m_state; const Uint32 tWaitState = theImpl->theWaiter.m_state;
const Uint32 tSignalNumber = aSignal->readSignalNumber(); const Uint32 tSignalNumber = aSignal->readSignalNumber();
const Uint32 tFirstData = *tDataPtr; const Uint32 tFirstData = *tDataPtr;
const Uint32 tLen = aSignal->getLength(); const Uint32 tLen = aSignal->getLength();
...@@ -401,7 +399,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -401,7 +399,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
break; break;
case NdbReceiver::NDB_SCANRECEIVER: case NdbReceiver::NDB_SCANRECEIVER:
tCon->theScanningOp->receiver_delivered(tRec); tCon->theScanningOp->receiver_delivered(tRec);
theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ? theImpl->theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ?
(Uint32) NO_WAIT : tWaitState); (Uint32) NO_WAIT : tWaitState);
break; break;
default: default:
...@@ -598,7 +596,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -598,7 +596,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if }//if
tReturnCode = tCon->receiveTCSEIZECONF(aSignal); tReturnCode = tCon->receiveTCSEIZECONF(aSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
} else { } else {
goto InvalidSignal; goto InvalidSignal;
}//if }//if
...@@ -618,7 +616,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -618,7 +616,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if }//if
tReturnCode = tCon->receiveTCSEIZEREF(aSignal); tReturnCode = tCon->receiveTCSEIZEREF(aSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
} else { } else {
return; return;
}//if }//if
...@@ -638,7 +636,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -638,7 +636,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if }//if
tReturnCode = tCon->receiveTCRELEASECONF(aSignal); tReturnCode = tCon->receiveTCRELEASECONF(aSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
}//if }//if
break; break;
} }
...@@ -656,7 +654,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -656,7 +654,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if }//if
tReturnCode = tCon->receiveTCRELEASEREF(aSignal); tReturnCode = tCon->receiveTCRELEASEREF(aSignal);
if (tReturnCode != -1) { if (tReturnCode != -1) {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
}//if }//if
break; break;
} }
...@@ -708,7 +706,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -708,7 +706,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
return; return;
tReturnCode = tCon->receiveDIHNDBTAMPER(aSignal); tReturnCode = tCon->receiveDIHNDBTAMPER(aSignal);
if (tReturnCode != -1) if (tReturnCode != -1)
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
break; break;
} }
case GSN_SCAN_TABCONF: case GSN_SCAN_TABCONF:
...@@ -730,7 +728,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -730,7 +728,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tLen - ScanTabConf::SignalLength); tLen - ScanTabConf::SignalLength);
} }
if (tReturnCode != -1 && tWaitState == WAIT_SCAN) if (tReturnCode != -1 && tWaitState == WAIT_SCAN)
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
break; break;
} else { } else {
goto InvalidSignal; goto InvalidSignal;
...@@ -749,7 +747,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -749,7 +747,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
if (tCon->checkMagicNumber() == 0){ if (tCon->checkMagicNumber() == 0){
tReturnCode = tCon->receiveSCAN_TABREF(aSignal); tReturnCode = tCon->receiveSCAN_TABREF(aSignal);
if (tReturnCode != -1 && tWaitState == WAIT_SCAN){ if (tReturnCode != -1 && tWaitState == WAIT_SCAN){
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
} }
break; break;
} }
...@@ -774,7 +772,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -774,7 +772,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
switch(com){ switch(com){
case 1: case 1:
tCon->theScanningOp->receiver_delivered(tRec); tCon->theScanningOp->receiver_delivered(tRec);
theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ? theImpl->theWaiter.m_state = (((WaitSignalType) tWaitState) == WAIT_SCAN ?
(Uint32) NO_WAIT : tWaitState); (Uint32) NO_WAIT : tWaitState);
break; break;
case 0: case 0:
...@@ -838,16 +836,16 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ...@@ -838,16 +836,16 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
goto InvalidSignal; goto InvalidSignal;
}//switch }//switch
if (theWaiter.m_state == NO_WAIT) { if (theImpl->theWaiter.m_state == NO_WAIT) {
// Wake up the thread waiting for response // Wake up the thread waiting for response
NdbCondition_Signal(theWaiter.m_condition); NdbCondition_Signal(theImpl->theWaiter.m_condition);
}//if }//if
return; return;
InvalidSignal: InvalidSignal:
#ifdef VM_TRACE #ifdef VM_TRACE
ndbout_c("Ndbif: Error Ndb::handleReceivedSignal " ndbout_c("Ndbif: Error Ndb::handleReceivedSignal "
"(GSN=%d, theWaiter.m_state=%d)" "(GSN=%d, theImpl->theWaiter.m_state=%d)"
" sender = (Block: %d Node: %d)", " sender = (Block: %d Node: %d)",
tSignalNumber, tSignalNumber,
tWaitState, tWaitState,
...@@ -895,7 +893,7 @@ Ndb::completedTransaction(NdbConnection* aCon) ...@@ -895,7 +893,7 @@ Ndb::completedTransaction(NdbConnection* aCon)
if ((theMinNoOfEventsToWakeUp != 0) && if ((theMinNoOfEventsToWakeUp != 0) &&
(theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) { (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
theMinNoOfEventsToWakeUp = 0; theMinNoOfEventsToWakeUp = 0;
NdbCondition_Signal(theWaiter.m_condition); NdbCondition_Signal(theImpl->theWaiter.m_condition);
return; return;
}//if }//if
} else { } else {
...@@ -1155,9 +1153,9 @@ void ...@@ -1155,9 +1153,9 @@ void
Ndb::waitCompletedTransactions(int aMilliSecondsToWait, Ndb::waitCompletedTransactions(int aMilliSecondsToWait,
int noOfEventsToWaitFor) int noOfEventsToWaitFor)
{ {
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
/** /**
* theWaiter.m_state = NO_WAIT; * theImpl->theWaiter.m_state = NO_WAIT;
* To ensure no messup with synchronous node fail handling * To ensure no messup with synchronous node fail handling
* (see ReportFailure) * (see ReportFailure)
*/ */
...@@ -1166,8 +1164,8 @@ Ndb::waitCompletedTransactions(int aMilliSecondsToWait, ...@@ -1166,8 +1164,8 @@ Ndb::waitCompletedTransactions(int aMilliSecondsToWait,
theMinNoOfEventsToWakeUp = noOfEventsToWaitFor; theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
do { do {
if (waitTime < 1000) waitTime = 1000; if (waitTime < 1000) waitTime = 1000;
NdbCondition_WaitTimeout(theWaiter.m_condition, NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition,
(NdbMutex*)theWaiter.m_mutex, (NdbMutex*)theImpl->theWaiter.m_mutex,
waitTime); waitTime);
if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) { if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) {
break; break;
...@@ -1273,23 +1271,23 @@ Ndb::receiveResponse(int waitTime){ ...@@ -1273,23 +1271,23 @@ Ndb::receiveResponse(int waitTime){
int tResultCode; int tResultCode;
TransporterFacade::instance()->checkForceSend(theNdbBlockNumber); TransporterFacade::instance()->checkForceSend(theNdbBlockNumber);
theWaiter.wait(waitTime); theImpl->theWaiter.wait(waitTime);
if(theWaiter.m_state == NO_WAIT) { if(theImpl->theWaiter.m_state == NO_WAIT) {
tResultCode = 0; tResultCode = 0;
} else { } else {
#ifdef VM_TRACE #ifdef VM_TRACE
ndbout << "ERR: receiveResponse - theWaiter.m_state = "; ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
ndbout << theWaiter.m_state << endl; ndbout << theImpl->theWaiter.m_state << endl;
#endif #endif
if (theWaiter.m_state == WAIT_NODE_FAILURE){ if (theImpl->theWaiter.m_state == WAIT_NODE_FAILURE){
tResultCode = -2; tResultCode = -2;
} else { } else {
tResultCode = -1; tResultCode = -1;
} }
theWaiter.m_state = NO_WAIT; theImpl->theWaiter.m_state = NO_WAIT;
} }
return tResultCode; return tResultCode;
}//Ndb::receiveResponse() }//Ndb::receiveResponse()
...@@ -1321,8 +1319,8 @@ Ndb::sendRecSignal(Uint16 node_id, ...@@ -1321,8 +1319,8 @@ Ndb::sendRecSignal(Uint16 node_id,
if (tp->check_send_size(node_id, send_size)) { if (tp->check_send_size(node_id, send_size)) {
return_code = tp->sendSignal(aSignal, node_id); return_code = tp->sendSignal(aSignal, node_id);
if (return_code != -1) { if (return_code != -1) {
theWaiter.m_node = node_id; theImpl->theWaiter.m_node = node_id;
theWaiter.m_state = aWaitState; theImpl->theWaiter.m_state = aWaitState;
return_code = receiveResponse(); return_code = receiveResponse();
} else { } else {
return_code = -3; return_code = -3;
......
...@@ -50,7 +50,9 @@ Ndb(const char* aDataBase); ...@@ -50,7 +50,9 @@ Ndb(const char* aDataBase);
Parameters: aDataBase : Name of the database. Parameters: aDataBase : Name of the database.
Remark: Connect to the database. Remark: Connect to the database.
***************************************************************************/ ***************************************************************************/
Ndb::Ndb( const char* aDataBase , const char* aSchema) { Ndb::Ndb( const char* aDataBase , const char* aSchema)
: theImpl(NULL)
{
DBUG_ENTER("Ndb::Ndb()"); DBUG_ENTER("Ndb::Ndb()");
DBUG_PRINT("enter",("(old)Ndb::Ndb this=0x%x", this)); DBUG_PRINT("enter",("(old)Ndb::Ndb this=0x%x", this));
if (theNoOfNdbObjects < 0) if (theNoOfNdbObjects < 0)
...@@ -66,6 +68,7 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) { ...@@ -66,6 +68,7 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) {
Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection, Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection,
const char* aDataBase , const char* aSchema) const char* aDataBase , const char* aSchema)
: theImpl(NULL)
{ {
DBUG_ENTER("Ndb::Ndb()"); DBUG_ENTER("Ndb::Ndb()");
DBUG_PRINT("enter",("Ndb::Ndb this=0x%x", this)); DBUG_PRINT("enter",("Ndb::Ndb this=0x%x", this));
...@@ -82,7 +85,10 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -82,7 +85,10 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
{ {
DBUG_ENTER("Ndb::setup"); DBUG_ENTER("Ndb::setup");
m_ndb_cluster_connection= ndb_cluster_connection; assert(theImpl == NULL);
theImpl= new NdbImpl(ndb_cluster_connection,*this);
theDictionary= &(theImpl->m_dictionary);
thePreparedTransactionsArray= NULL; thePreparedTransactionsArray= NULL;
theSentTransactionsArray= NULL; theSentTransactionsArray= NULL;
theCompletedTransactionsArray= NULL; theCompletedTransactionsArray= NULL;
...@@ -93,8 +99,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -93,8 +99,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theMaxNoOfTransactions= 0; theMaxNoOfTransactions= 0;
theMinNoOfEventsToWakeUp= 0; theMinNoOfEventsToWakeUp= 0;
prefixEnd= NULL; prefixEnd= NULL;
theImpl= NULL;
theDictionary= NULL;
theConIdleList= NULL; theConIdleList= NULL;
theOpIdleList= NULL; theOpIdleList= NULL;
theScanOpIdleList= NULL; theScanOpIdleList= NULL;
...@@ -153,14 +157,12 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -153,14 +157,12 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len : prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len :
sizeof(prefixName) - 1); sizeof(prefixName) - 1);
theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr; theImpl->theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr;
// Signal that the constructor has finished OK // Signal that the constructor has finished OK
if (theInitState == NotConstructed) if (theInitState == NotConstructed)
theInitState = NotInitialised; theInitState = NotInitialised;
theImpl = new NdbImpl();
{ {
NdbGlobalEventBufferHandle *h= NdbGlobalEventBufferHandle *h=
NdbGlobalEventBuffer_init(NDB_MAX_ACTIVE_EVENTS); NdbGlobalEventBuffer_init(NDB_MAX_ACTIVE_EVENTS);
...@@ -171,11 +173,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -171,11 +173,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theGlobalEventBufferHandle = h; theGlobalEventBufferHandle = h;
} }
theDictionary = new NdbDictionaryImpl(*this);
if (theDictionary == NULL) {
ndbout_c("Ndb cailed to allocate dictionary");
exit(-1);
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -201,8 +198,6 @@ Ndb::~Ndb() ...@@ -201,8 +198,6 @@ Ndb::~Ndb()
DBUG_PRINT("enter",("Ndb::~Ndb this=0x%x",this)); DBUG_PRINT("enter",("Ndb::~Ndb this=0x%x",this));
doDisconnect(); doDisconnect();
delete theDictionary;
NdbGlobalEventBuffer_drop(theGlobalEventBufferHandle); NdbGlobalEventBuffer_drop(theGlobalEventBufferHandle);
if (TransporterFacade::instance() != NULL && theNdbBlockNumber > 0){ if (TransporterFacade::instance() != NULL && theNdbBlockNumber > 0){
...@@ -245,7 +240,6 @@ Ndb::~Ndb() ...@@ -245,7 +240,6 @@ Ndb::~Ndb()
freeSignal(); freeSignal();
releaseTransactionArrays(); releaseTransactionArrays();
startTransactionNodeSelectionData.release();
delete []theConnectionArray; delete []theConnectionArray;
if(theCommitAckSignal != NULL){ if(theCommitAckSignal != NULL){
...@@ -292,14 +286,20 @@ NdbWaiter::~NdbWaiter(){ ...@@ -292,14 +286,20 @@ NdbWaiter::~NdbWaiter(){
NdbCondition_Destroy(m_condition); NdbCondition_Destroy(m_condition);
} }
NdbImpl::NdbImpl() : theNdbObjectIdMap(1024,1024), NdbImpl::NdbImpl(Ndb_cluster_connection *ndb_cluster_connection,
theCurrentConnectIndex(0), Ndb& ndb)
theNoOfDBnodes(0) : m_ndb_cluster_connection(ndb_cluster_connection->m_impl),
m_dictionary(ndb),
theCurrentConnectIndex(0),
theNdbObjectIdMap(1024,1024),
theNoOfDBnodes(0)
{ {
int i; int i;
for (i = 0; i < MAX_NDB_NODES; i++) { for (i = 0; i < MAX_NDB_NODES; i++) {
the_release_ind[i] = 0; the_release_ind[i] = 0;
} }
m_optimized_node_selection=
m_ndb_cluster_connection.m_optimized_node_selection;
} }
NdbImpl::~NdbImpl() NdbImpl::~NdbImpl()
......
...@@ -127,7 +127,7 @@ private: ...@@ -127,7 +127,7 @@ private:
friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
friend class GrepSS; friend class GrepSS;
friend class Ndb; friend class Ndb;
friend class Ndb_cluster_connection; friend class Ndb_cluster_connection_impl;
int sendSignalUnCond(NdbApiSignal *, NodeId nodeId); int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
......
...@@ -18,7 +18,9 @@ ...@@ -18,7 +18,9 @@
#include <my_pthread.h> #include <my_pthread.h>
#include <my_sys.h> #include <my_sys.h>
#include <ndb_cluster_connection.hpp> #include "ndb_cluster_connection_impl.hpp"
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>
#include <TransporterFacade.hpp> #include <TransporterFacade.hpp>
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <NdbSleep.h> #include <NdbSleep.h>
...@@ -26,6 +28,8 @@ ...@@ -26,6 +28,8 @@
#include <ndb_limits.h> #include <ndb_limits.h>
#include <ConfigRetriever.hpp> #include <ConfigRetriever.hpp>
#include <ndb_version.h> #include <ndb_version.h>
#include <Vector.hpp>
#include <md5_hash.hpp>
static int g_run_connect_thread= 0; static int g_run_connect_thread= 0;
...@@ -35,56 +39,46 @@ NdbMutex *ndb_global_event_buffer_mutex= NULL; ...@@ -35,56 +39,46 @@ NdbMutex *ndb_global_event_buffer_mutex= NULL;
NdbMutex *ndb_print_state_mutex= NULL; NdbMutex *ndb_print_state_mutex= NULL;
#endif #endif
/*
* Ndb_cluster_connection
*/
Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
: m_impl(* new Ndb_cluster_connection_impl(connect_string))
{ {
DBUG_ENTER("Ndb_cluster_connection"); }
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
m_config_retriever= 0; Ndb_cluster_connection::Ndb_cluster_connection
m_connect_thread= 0; (Ndb_cluster_connection_impl& impl) : m_impl(impl)
m_connect_callback= 0; {
}
if (ndb_global_event_buffer_mutex == NULL) Ndb_cluster_connection::~Ndb_cluster_connection()
{ {
ndb_global_event_buffer_mutex= NdbMutex_Create(); Ndb_cluster_connection_impl *tmp = &m_impl;
} if (this != tmp)
#ifdef VM_TRACE delete tmp;
if (ndb_print_state_mutex == NULL)
{
ndb_print_state_mutex= NdbMutex_Create();
}
#endif
m_config_retriever=
new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
if (m_config_retriever->hasError())
{
printf("Could not connect initialize handle to management server: %s",
m_config_retriever->getErrorString());
delete m_config_retriever;
m_config_retriever= 0;
}
DBUG_VOID_RETURN;
} }
int Ndb_cluster_connection::get_connected_port() const int Ndb_cluster_connection::get_connected_port() const
{ {
if (m_config_retriever) if (m_impl.m_config_retriever)
return m_config_retriever->get_mgmd_port(); return m_impl.m_config_retriever->get_mgmd_port();
return -1; return -1;
} }
const char *Ndb_cluster_connection::get_connected_host() const const char *Ndb_cluster_connection::get_connected_host() const
{ {
if (m_config_retriever) if (m_impl.m_config_retriever)
return m_config_retriever->get_mgmd_host(); return m_impl.m_config_retriever->get_mgmd_host();
return 0; return 0;
} }
const char *Ndb_cluster_connection::get_connectstring(char *buf, int buf_sz) const const char *Ndb_cluster_connection::get_connectstring(char *buf,
int buf_sz) const
{ {
if (m_config_retriever) if (m_impl.m_config_retriever)
return m_config_retriever->get_connectstring(buf,buf_sz); return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
return 0; return 0;
} }
...@@ -92,82 +86,415 @@ extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me) ...@@ -92,82 +86,415 @@ extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
{ {
my_thread_init(); my_thread_init();
g_run_connect_thread= 1; g_run_connect_thread= 1;
((Ndb_cluster_connection*) me)->connect_thread(); ((Ndb_cluster_connection_impl*) me)->connect_thread();
my_thread_end(); my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return me; return me;
} }
void Ndb_cluster_connection::connect_thread()
{
DBUG_ENTER("Ndb_cluster_connection::connect_thread");
int r;
do {
NdbSleep_SecSleep(1);
if ((r = connect(0,0,0)) == 0)
break;
if (r == -1) {
printf("Ndb_cluster_connection::connect_thread error\n");
DBUG_ASSERT(false);
g_run_connect_thread= 0;
} else {
// Wait before making a new connect attempt
NdbSleep_SecSleep(1);
}
} while (g_run_connect_thread);
if (m_connect_callback)
(*m_connect_callback)();
DBUG_VOID_RETURN;
}
int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void)) int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
{ {
int r; int r;
DBUG_ENTER("Ndb_cluster_connection::start_connect_thread"); DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
m_connect_callback= connect_callback; m_impl.m_connect_callback= connect_callback;
if ((r = connect(0,0,0)) == 1) if ((r = connect(0,0,0)) == 1)
{ {
DBUG_PRINT("info",("starting thread")); DBUG_PRINT("info",("starting thread"));
m_connect_thread= m_impl.m_connect_thread=
NdbThread_Create(run_ndb_cluster_connection_connect_thread, NdbThread_Create(run_ndb_cluster_connection_connect_thread,
(void**)this, 32768, "ndb_cluster_connection", (void**)&m_impl, 32768, "ndb_cluster_connection",
NDB_THREAD_PRIO_LOW); NDB_THREAD_PRIO_LOW);
} }
else if (r < 0) else if (r < 0)
{ {
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
else if (m_connect_callback) else if (m_impl.m_connect_callback)
{ {
(*m_connect_callback)(); (*m_impl.m_connect_callback)();
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds, int verbose) void Ndb_cluster_connection::set_optimized_node_selection(int val)
{
m_impl.m_optimized_node_selection= val;
}
void
Ndb_cluster_connection_impl::init_get_next_node
(Ndb_cluster_connection_node_iter &iter)
{
if (iter.scan_state != (Uint8)~0)
iter.cur_pos= iter.scan_state;
if (iter.cur_pos >= no_db_nodes())
iter.cur_pos= 0;
iter.init_pos= iter.cur_pos;
iter.scan_state= 0;
// fprintf(stderr,"[init %d]",iter.init_pos);
return;
}
Uint32
Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
{
Uint32 cur_pos= iter.cur_pos;
if (cur_pos >= no_db_nodes())
return 0;
Ndb_cluster_connection_impl::Node *nodes= m_impl.m_all_nodes.getBase();
Ndb_cluster_connection_impl::Node &node= nodes[cur_pos];
if (iter.scan_state != (Uint8)~0)
{
assert(iter.scan_state < no_db_nodes());
if (nodes[iter.scan_state].group == node.group)
iter.scan_state= ~0;
else
return nodes[iter.scan_state++].id;
}
// fprintf(stderr,"[%d]",node.id);
cur_pos++;
Uint32 init_pos= iter.init_pos;
if (cur_pos == node.next_group)
{
cur_pos= nodes[init_pos].this_group;
}
// fprintf(stderr,"[cur_pos %d]",cur_pos);
if (cur_pos != init_pos)
iter.cur_pos= cur_pos;
else
{
iter.cur_pos= node.next_group;
iter.init_pos= node.next_group;
}
return node.id;
}
Uint32
Ndb_cluster_connection::no_db_nodes()
{
return m_impl.m_all_nodes.size();
}
int
Ndb_cluster_connection::wait_until_ready(int timeout,
int timeout_after_first_alive)
{
DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
TransporterFacade *tp = TransporterFacade::instance();
if (tp == 0)
{
DBUG_RETURN(-1);
}
if (tp->ownId() == 0)
{
DBUG_RETURN(-1);
}
int secondsCounter = 0;
int milliCounter = 0;
int noChecksSinceFirstAliveFound = 0;
do {
unsigned int foundAliveNode = 0;
tp->lock_mutex();
for(unsigned i= 0; i < no_db_nodes(); i++)
{
//************************************************
// If any node is answering, ndb is answering
//************************************************
if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) {
foundAliveNode++;
}
}
tp->unlock_mutex();
if (foundAliveNode == no_db_nodes())
{
DBUG_RETURN(0);
}
else if (foundAliveNode > 0)
{
noChecksSinceFirstAliveFound++;
if (timeout_after_first_alive >= 0)
{
if (noChecksSinceFirstAliveFound > timeout_after_first_alive)
DBUG_RETURN(0);
}
else // timeout_after_first_alive < 0
{
if (noChecksSinceFirstAliveFound > -timeout_after_first_alive)
DBUG_RETURN(-1);
}
}
else if (secondsCounter >= timeout)
{ // no alive nodes and timed out
DBUG_RETURN(-1);
}
NdbSleep_MilliSleep(100);
milliCounter += 100;
if (milliCounter >= 1000) {
secondsCounter++;
milliCounter = 0;
}//if
} while (1);
}
/*
* Ndb_cluster_connection_impl
*/
Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
connect_string)
: Ndb_cluster_connection(*this),
m_optimized_node_selection(1)
{
DBUG_ENTER("Ndb_cluster_connection");
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
m_transporter_facade=
TransporterFacade::theFacadeInstance= new TransporterFacade();
m_connect_thread= 0;
m_connect_callback= 0;
if (ndb_global_event_buffer_mutex == NULL)
{
ndb_global_event_buffer_mutex= NdbMutex_Create();
}
#ifdef VM_TRACE
if (ndb_print_state_mutex == NULL)
{
ndb_print_state_mutex= NdbMutex_Create();
}
#endif
m_config_retriever=
new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
if (m_config_retriever->hasError())
{
printf("Could not connect initialize handle to management server: %s",
m_config_retriever->getErrorString());
delete m_config_retriever;
m_config_retriever= 0;
}
DBUG_VOID_RETURN;
}
Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
{
DBUG_ENTER("~Ndb_cluster_connection");
DBUG_PRINT("enter",("~Ndb_cluster_connection this=0x%x", this));
TransporterFacade::stop_instance();
if (m_connect_thread)
{
void *status;
g_run_connect_thread= 0;
NdbThread_WaitFor(m_connect_thread, &status);
NdbThread_Destroy(&m_connect_thread);
m_connect_thread= 0;
}
if (m_transporter_facade != 0)
{
delete m_transporter_facade;
if (m_transporter_facade != TransporterFacade::theFacadeInstance)
abort();
TransporterFacade::theFacadeInstance= 0;
}
if (m_config_retriever)
delete m_config_retriever;
// fragmentToNodeMap.release();
DBUG_VOID_RETURN;
}
void
Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
const ndb_mgm_configuration
&config)
{
DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector");
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
for(iter.first(); iter.valid(); iter.next())
{
Uint32 nodeid1, nodeid2, remoteNodeId, group= 5;
const char * remoteHostName= 0, * localHostName= 0;
if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue;
if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue;
if(nodeid1 != nodeid && nodeid2 != nodeid) continue;
remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
iter.get(CFG_CONNECTION_GROUP, &group);
{
const char * host1= 0, * host2= 0;
iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
localHostName = (nodeid == nodeid1 ? host1 : host2);
remoteHostName = (nodeid == nodeid1 ? host2 : host1);
}
Uint32 type = ~0;
if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
switch(type){
case CONNECTION_TYPE_SHM:{
break;
}
case CONNECTION_TYPE_SCI:{
break;
}
case CONNECTION_TYPE_TCP:{
// connecting through localhost
// check if config_hostname is local
if (SocketServer::tryBind(0,remoteHostName))
group--; // upgrade group value
break;
}
case CONNECTION_TYPE_OSE:{
break;
}
}
m_impl.m_all_nodes.push_back(Node(group,remoteNodeId));
DBUG_PRINT("info",("saved %d %d", group,remoteNodeId));
for (int i= m_impl.m_all_nodes.size()-2;
i >= 0 && m_impl.m_all_nodes[i].group > m_impl.m_all_nodes[i+1].group;
i--)
{
Node tmp= m_impl.m_all_nodes[i];
m_impl.m_all_nodes[i]= m_impl.m_all_nodes[i+1];
m_impl.m_all_nodes[i+1]= tmp;
}
}
int i;
Uint32 cur_group, i_group= 0;
cur_group= ~0;
for (i= (int)m_impl.m_all_nodes.size()-1; i >= 0; i--)
{
if (m_impl.m_all_nodes[i].group != cur_group)
{
cur_group= m_impl.m_all_nodes[i].group;
i_group= i+1;
}
m_impl.m_all_nodes[i].next_group= i_group;
}
cur_group= ~0;
for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
{
if (m_impl.m_all_nodes[i].group != cur_group)
{
cur_group= m_impl.m_all_nodes[i].group;
i_group= i;
}
m_impl.m_all_nodes[i].this_group= i_group;
}
#if 0
for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
{
fprintf(stderr, "[%d] %d %d %d %d\n",
i,
m_impl.m_all_nodes[i].id,
m_impl.m_all_nodes[i].group,
m_impl.m_all_nodes[i].this_group,
m_impl.m_all_nodes[i].next_group);
}
do_test();
#endif
DBUG_VOID_RETURN;
}
void
Ndb_cluster_connection_impl::do_test()
{
Ndb_cluster_connection_node_iter iter;
int n= no_db_nodes()+5;
Uint32 *nodes= new Uint32[n+1];
for (int g= 0; g < n; g++)
{
for (int h= 0; h < n; h++)
{
Uint32 id;
Ndb_cluster_connection_node_iter iter2;
{
for (int j= 0; j < g; j++)
{
nodes[j]= get_next_node(iter2);
}
}
for (int i= 0; i < n; i++)
{
init_get_next_node(iter);
fprintf(stderr, "%d dead:(", g);
id= 0;
while (id == 0)
{
if ((id= get_next_node(iter)) == 0)
break;
for (int j= 0; j < g; j++)
{
if (nodes[j] == id)
{
fprintf(stderr, " %d", id);
id= 0;
break;
}
}
}
fprintf(stderr, ")");
if (id == 0)
{
break;
}
fprintf(stderr, " %d\n", id);
}
fprintf(stderr, "\n");
}
}
delete [] nodes;
}
int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds,
int verbose)
{ {
DBUG_ENTER("Ndb_cluster_connection::connect"); DBUG_ENTER("Ndb_cluster_connection::connect");
const char* error = 0; const char* error = 0;
do { do {
if (m_config_retriever == 0) if (m_impl.m_config_retriever == 0)
DBUG_RETURN(-1); DBUG_RETURN(-1);
if (m_config_retriever->do_connect(no_retries,retry_delay_in_seconds,verbose)) if (m_impl.m_config_retriever->do_connect(no_retries,
retry_delay_in_seconds,
verbose))
DBUG_RETURN(1); // mgmt server not up yet DBUG_RETURN(1); // mgmt server not up yet
Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,3/*delay*/); Uint32 nodeId = m_impl.m_config_retriever->allocNodeId(4/*retries*/,
3/*delay*/);
if(nodeId == 0) if(nodeId == 0)
break; break;
ndb_mgm_configuration * props = m_config_retriever->getConfig(); ndb_mgm_configuration * props = m_impl.m_config_retriever->getConfig();
if(props == 0) if(props == 0)
break; break;
m_facade->start_instance(nodeId, props); m_impl.m_transporter_facade->start_instance(nodeId, props);
m_impl.init_nodes_vector(nodeId, *props);
ndb_mgm_destroy_configuration(props); ndb_mgm_destroy_configuration(props);
m_facade->connected(); m_impl.m_transporter_facade->connected();
DBUG_RETURN(0); DBUG_RETURN(0);
} while(0); } while(0);
ndbout << "Configuration error: "; ndbout << "Configuration error: ";
const char* erString = m_config_retriever->getErrorString(); const char* erString = m_impl.m_config_retriever->getErrorString();
if (erString == 0) { if (erString == 0) {
erString = "No error specified!"; erString = "No error specified!";
} }
...@@ -175,29 +502,132 @@ int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds, ...@@ -175,29 +502,132 @@ int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds,
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
Ndb_cluster_connection::~Ndb_cluster_connection() void Ndb_cluster_connection_impl::connect_thread()
{ {
DBUG_ENTER("~Ndb_cluster_connection"); DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread");
DBUG_PRINT("enter",("~Ndb_cluster_connection this=0x%x", this)); int r;
TransporterFacade::stop_instance(); do {
if (m_connect_thread) NdbSleep_SecSleep(1);
if ((r = connect(0,0,0)) == 0)
break;
if (r == -1) {
printf("Ndb_cluster_connection::connect_thread error\n");
DBUG_ASSERT(false);
g_run_connect_thread= 0;
} else {
// Wait before making a new connect attempt
NdbSleep_SecSleep(1);
}
} while (g_run_connect_thread);
if (m_connect_callback)
(*m_connect_callback)();
DBUG_VOID_RETURN;
}
/*
* Hint handling to select node
* ToDo: fix this
*/
void
Ndb_cluster_connection_impl::FragmentToNodeMap::init(Uint32 noOfNodes,
Uint8 nodeIds[])
{
kValue = 6;
noOfFragments = 2 * noOfNodes;
/**
* Compute hashValueMask and hashpointerValue
*/
{ {
void *status; Uint32 topBit = (1 << 31);
g_run_connect_thread= 0; for(int i = 31; i>=0; i--){
NdbThread_WaitFor(m_connect_thread, &status); if((noOfFragments & topBit) != 0)
NdbThread_Destroy(&m_connect_thread); break;
m_connect_thread= 0; topBit >>= 1;
}
hashValueMask = topBit - 1;
hashpointerValue = noOfFragments - (hashValueMask + 1);
} }
if (m_facade != 0)
/**
* This initialization depends on
* the fact that:
* primary node for fragment i = i % noOfNodes
*
* This algorithm should be implemented in Dbdih
*/
{ {
delete m_facade; if (fragment2PrimaryNodeMap != 0)
if (m_facade != TransporterFacade::theFacadeInstance)
abort(); abort();
TransporterFacade::theFacadeInstance= 0;
fragment2PrimaryNodeMap = new Uint32[noOfFragments];
Uint32 i;
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i] = nodeIds[i];
}
// Sort them (bubble sort)
for(i = 0; i<noOfNodes-1; i++)
for(Uint32 j = i+1; j<noOfNodes; j++)
if(fragment2PrimaryNodeMap[i] > fragment2PrimaryNodeMap[j]){
Uint32 tmp = fragment2PrimaryNodeMap[i];
fragment2PrimaryNodeMap[i] = fragment2PrimaryNodeMap[j];
fragment2PrimaryNodeMap[j] = tmp;
}
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i+noOfNodes] = fragment2PrimaryNodeMap[i];
}
} }
if (m_config_retriever)
delete m_config_retriever;
DBUG_VOID_RETURN;
} }
void
Ndb_cluster_connection_impl::FragmentToNodeMap::release(){
delete [] fragment2PrimaryNodeMap;
fragment2PrimaryNodeMap = 0;
}
static const Uint32 MAX_KEY_LEN_64_WORDS = 4;
Uint32
Ndb_cluster_connection_impl::guess_primary_node(const char *keyData,
Uint32 keyLen)
{
Uint64 tempData[MAX_KEY_LEN_64_WORDS];
const Uint32 usedKeyLen = (keyLen + 3) >> 2; // In words
const char * usedKeyData = 0;
/**
* If key data buffer is not aligned (on 64 bit boundary)
* or key len is not a multiple of 4
* Use temp data
*/
if(((((UintPtr)keyData) & 7) == 0) && ((keyLen & 3) == 0)) {
usedKeyData = keyData;
} else {
memcpy(&tempData[0], keyData, keyLen);
const int slack = keyLen & 3;
if(slack > 0) {
memset(&((char *)&tempData[0])[keyLen], 0, (4 - slack));
}//if
usedKeyData = (char *)&tempData[0];
}//if
Uint32 hashValue = md5_hash((Uint64 *)usedKeyData, usedKeyLen);
hashValue >>= fragmentToNodeMap.kValue;
Uint32 fragmentId = hashValue &
fragmentToNodeMap.hashValueMask;
if(fragmentId < fragmentToNodeMap.hashpointerValue) {
fragmentId = hashValue &
((fragmentToNodeMap.hashValueMask << 1) + 1);
}//if
return fragmentId;
}
template class Vector<Ndb_cluster_connection_impl::Node>;
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CLUSTER_CONNECTION_IMPL_HPP
#define CLUSTER_CONNECTION_IMPL_HPP
#include <ndb_cluster_connection.hpp>
#include <Vector.hpp>
class TransporterFacade;
class ConfigRetriever;
class NdbThread;
class ndb_mgm_configuration;
struct Ndb_cluster_connection_node_iter {
Ndb_cluster_connection_node_iter() : scan_state(~0),
init_pos(0),
cur_pos(0) {};
Uint8 scan_state;
Uint8 init_pos;
Uint8 cur_pos;
};
extern "C" {
void* run_ndb_cluster_connection_connect_thread(void*);
}
class Ndb_cluster_connection_impl : public Ndb_cluster_connection
{
Ndb_cluster_connection_impl(const char *connectstring);
~Ndb_cluster_connection_impl();
void do_test();
void init_get_next_node(Ndb_cluster_connection_node_iter &iter);
Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter);
private:
friend class Ndb;
friend class NdbImpl;
friend void* run_ndb_cluster_connection_connect_thread(void*);
friend class Ndb_cluster_connection;
/**
* Structure containing values for guessing primary node
*/
struct FragmentToNodeMap {
FragmentToNodeMap():
fragment2PrimaryNodeMap(0) {};
Uint32 kValue;
Uint32 hashValueMask;
Uint32 hashpointerValue;
Uint32 noOfFragments;
Uint32 *fragment2PrimaryNodeMap;
void init(Uint32 noOfNodes, Uint8 nodeIds[]);
void release();
} fragmentToNodeMap;
struct Node
{
Node(Uint32 _g= 0, Uint32 _id= 0) : this_group(0),
next_group(0),
group(_g),
id(_id) {};
Uint32 this_group;
Uint32 next_group;
Uint32 group;
Uint32 id;
};
Vector<Node> m_all_nodes;
void init_nodes_vector(Uint32 nodeid, const ndb_mgm_configuration &config);
Uint32 guess_primary_node(const char * keyData, Uint32 keyLen);
void connect_thread();
TransporterFacade *m_transporter_facade;
ConfigRetriever *m_config_retriever;
NdbThread *m_connect_thread;
int (*m_connect_callback)(void);
int m_optimized_node_selection;
};
#endif
...@@ -142,14 +142,22 @@ int runTestMaxTransaction(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -142,14 +142,22 @@ int runTestMaxTransaction(NDBT_Context* ctx, NDBT_Step* step){
4); 4);
break; break;
case 2: case 2:
ndbout_c("startTransactionDGroup not supported");
abort();
/*
pCon = pNdb->startTransactionDGroup(1, pCon = pNdb->startTransactionDGroup(1,
"TEST", "TEST",
0); 0);
*/
break; break;
case 3: case 3:
ndbout_c("startTransactionDGroup not supported");
abort();
/*
pCon = pNdb->startTransactionDGroup(2, pCon = pNdb->startTransactionDGroup(2,
"TEST", "TEST",
1); 1);
*/
break; break;
default: default:
......
...@@ -24,7 +24,11 @@ ...@@ -24,7 +24,11 @@
static int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, int parallelism=240); static int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, int parallelism=240);
static const char* opt_connect_str= 0; enum ndb_delete_all {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
......
...@@ -19,7 +19,11 @@ ...@@ -19,7 +19,11 @@
#include <NDBT.hpp> #include <NDBT.hpp>
#include <NdbApi.hpp> #include <NdbApi.hpp>
static const char* opt_connect_str= 0; enum ndb_desc_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static int _unqualified = 0; static int _unqualified = 0;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
......
...@@ -21,7 +21,11 @@ ...@@ -21,7 +21,11 @@
#include <NdbApi.hpp> #include <NdbApi.hpp>
#include <NDBT.hpp> #include <NDBT.hpp>
static const char* opt_connect_str= 0; enum ndb_drop_index_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
......
...@@ -21,7 +21,11 @@ ...@@ -21,7 +21,11 @@
#include <NdbApi.hpp> #include <NdbApi.hpp>
#include <NDBT.hpp> #include <NDBT.hpp>
static const char* opt_connect_str= 0; enum ndb_drop_table_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
......
...@@ -161,13 +161,17 @@ list(const char * tabname, ...@@ -161,13 +161,17 @@ list(const char * tabname,
} }
} }
static const char* opt_connect_str= 0; enum ndb_show_tables_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static int _loops; static int _loops;
static int _type; static int _type;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
NDB_STD_OPTS("ndb_desc"), NDB_STD_OPTS("ndb_show_tables"),
{ "database", 'd', "Name of database table is in", { "database", 'd', "Name of database table is in",
(gptr*) &_dbname, (gptr*) &_dbname, 0, (gptr*) &_dbname, (gptr*) &_dbname, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
......
...@@ -36,7 +36,10 @@ static Vector<class BackupConsumer *> g_consumers; ...@@ -36,7 +36,10 @@ static Vector<class BackupConsumer *> g_consumers;
static const char* ga_backupPath = "." DIR_SEPARATOR; static const char* ga_backupPath = "." DIR_SEPARATOR;
static const char* opt_connect_str= NULL; enum ndb_restore_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
/** /**
* print and restore flags * print and restore flags
......
...@@ -36,7 +36,11 @@ int scanReadRecords(Ndb*, ...@@ -36,7 +36,11 @@ int scanReadRecords(Ndb*,
char delim, char delim,
bool orderby); bool orderby);
static const char* opt_connect_str= 0; enum ndb_select_all_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static const char* _delimiter = "\t"; static const char* _delimiter = "\t";
static int _unqualified, _header, _parallelism, _useHexFormat, _lock, static int _unqualified, _header, _parallelism, _useHexFormat, _lock,
......
...@@ -32,7 +32,11 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab, ...@@ -32,7 +32,11 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
int* count_rows, int* count_rows,
UtilTransactions::ScanLock lock); UtilTransactions::ScanLock lock);
static const char* opt_connect_str= 0; enum ndb_select_count_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static int _parallelism = 240; static int _parallelism = 240;
static int _lock = 0; static int _lock = 0;
......
...@@ -30,7 +30,11 @@ int ...@@ -30,7 +30,11 @@ int
waitClusterStatus(const char* _addr, ndb_mgm_node_status _status, waitClusterStatus(const char* _addr, ndb_mgm_node_status _status,
unsigned int _timeout); unsigned int _timeout);
static const char* opt_connect_str= 0; enum ndb_waiter_options {
NDB_STD_OPTS_OPTIONS
};
NDB_STD_OPTS_VARS;
static int _no_contact = 0; static int _no_contact = 0;
static int _timeout = 120; static int _timeout = 120;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
......
...@@ -32,6 +32,10 @@ ...@@ -32,6 +32,10 @@
#include <ndbapi/NdbApi.hpp> #include <ndbapi/NdbApi.hpp>
#include <ndbapi/NdbScanFilter.hpp> #include <ndbapi/NdbScanFilter.hpp>
// options from from mysqld.cc
extern my_bool opt_ndb_optimized_node_selection;
extern const char *opt_ndbcluster_connectstring;
// Default value for parallelism // Default value for parallelism
static const int parallelism= 240; static const int parallelism= 240;
...@@ -39,9 +43,6 @@ static const int parallelism= 240; ...@@ -39,9 +43,6 @@ static const int parallelism= 240;
// createable against NDB from this handler // createable against NDB from this handler
static const int max_transactions= 256; static const int max_transactions= 256;
// connectstring to cluster if given by mysqld
const char *ndbcluster_connectstring= 0;
static const char *ha_ndb_ext=".ndb"; static const char *ha_ndb_ext=".ndb";
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8 #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
...@@ -4233,15 +4234,19 @@ bool ndbcluster_init() ...@@ -4233,15 +4234,19 @@ bool ndbcluster_init()
int res; int res;
DBUG_ENTER("ndbcluster_init"); DBUG_ENTER("ndbcluster_init");
// Set connectstring if specified // Set connectstring if specified
if (ndbcluster_connectstring != 0) if (opt_ndbcluster_connectstring != 0)
DBUG_PRINT("connectstring", ("%s", ndbcluster_connectstring)); DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));
if ((g_ndb_cluster_connection= if ((g_ndb_cluster_connection=
new Ndb_cluster_connection(ndbcluster_connectstring)) == 0) new Ndb_cluster_connection(opt_ndbcluster_connectstring)) == 0)
{ {
DBUG_PRINT("error",("Ndb_cluster_connection(%s)",ndbcluster_connectstring)); DBUG_PRINT("error",("Ndb_cluster_connection(%s)",
opt_ndbcluster_connectstring));
goto ndbcluster_init_error; goto ndbcluster_init_error;
} }
g_ndb_cluster_connection->set_optimized_node_selection
(opt_ndb_optimized_node_selection);
// Create a Ndb object to open the connection to NDB // Create a Ndb object to open the connection to NDB
g_ndb= new Ndb(g_ndb_cluster_connection, "sys"); g_ndb= new Ndb(g_ndb_cluster_connection, "sys");
g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info)); g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
...@@ -4256,7 +4261,7 @@ bool ndbcluster_init() ...@@ -4256,7 +4261,7 @@ bool ndbcluster_init()
DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d", DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d",
g_ndb_cluster_connection->get_connected_host(), g_ndb_cluster_connection->get_connected_host(),
g_ndb_cluster_connection->get_connected_port())); g_ndb_cluster_connection->get_connected_port()));
g_ndb->waitUntilReady(10); g_ndb_cluster_connection->wait_until_ready(10,0);
} }
else if(res == 1) else if(res == 1)
{ {
......
...@@ -53,6 +53,11 @@ ...@@ -53,6 +53,11 @@
#endif #endif
#ifdef HAVE_NDBCLUSTER_DB #ifdef HAVE_NDBCLUSTER_DB
#define OPT_NDBCLUSTER_DEFAULT 0 #define OPT_NDBCLUSTER_DEFAULT 0
#ifdef NDB_SHM_TRANSPORTER
#define OPT_NDB_SHM_DEFAULT 1
#else
#define OPT_NDB_SHM_DEFAULT 0
#endif
#else #else
#define OPT_NDBCLUSTER_DEFAULT 0 #define OPT_NDBCLUSTER_DEFAULT 0
#endif #endif
...@@ -285,6 +290,10 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0; ...@@ -285,6 +290,10 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0; my_bool opt_log_slave_updates= 0;
my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster; my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster;
#ifdef HAVE_NDBCLUSTER_DB
const char *opt_ndbcluster_connectstring= 0;
my_bool opt_ndb_shm, opt_ndb_optimized_node_selection;
#endif
my_bool opt_readonly, use_temp_pool, relay_log_purge; my_bool opt_readonly, use_temp_pool, relay_log_purge;
my_bool opt_sync_bdb_logs, opt_sync_frm; my_bool opt_sync_bdb_logs, opt_sync_frm;
my_bool opt_secure_auth= 0; my_bool opt_secure_auth= 0;
...@@ -3998,6 +4007,7 @@ enum options_mysqld ...@@ -3998,6 +4007,7 @@ enum options_mysqld
OPT_INNODB, OPT_ISAM, OPT_INNODB, OPT_ISAM,
OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT, OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT,
OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ, OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
OPT_NDB_SHM, OPT_NDB_OPTIMIZED_NODE_SELECTION,
OPT_SKIP_SAFEMALLOC, OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
...@@ -4439,24 +4449,46 @@ Disable with --skip-ndbcluster (will save memory).", ...@@ -4439,24 +4449,46 @@ Disable with --skip-ndbcluster (will save memory).",
#ifdef HAVE_NDBCLUSTER_DB #ifdef HAVE_NDBCLUSTER_DB
{"ndb-connectstring", OPT_NDB_CONNECTSTRING, {"ndb-connectstring", OPT_NDB_CONNECTSTRING,
"Connect string for ndbcluster.", "Connect string for ndbcluster.",
(gptr*) &ndbcluster_connectstring, (gptr*) &ndbcluster_connectstring, (gptr*) &opt_ndbcluster_connectstring,
(gptr*) &opt_ndbcluster_connectstring,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"ndb_autoincrement_prefetch_sz", OPT_NDB_AUTOINCREMENT_PREFETCH_SZ, {"ndb-autoincrement-prefetch-sz", OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
"Specify number of autoincrement values that are prefetched", "Specify number of autoincrement values that are prefetched.",
(gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz, (gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz,
(gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz, (gptr*) &global_system_variables.ndb_autoincrement_prefetch_sz,
0, GET_INT, REQUIRED_ARG, 32, 1, 256, 0, 0, 0}, 0, GET_INT, REQUIRED_ARG, 32, 1, 256, 0, 0, 0},
{"ndb-force-send", OPT_NDB_FORCE_SEND,
"Force send of buffers to ndb immediately without waiting for "
"other threads.",
(gptr*) &global_system_variables.ndb_force_send,
(gptr*) &global_system_variables.ndb_force_send,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb_force_send", OPT_NDB_FORCE_SEND, {"ndb_force_send", OPT_NDB_FORCE_SEND,
"Force send of buffers to ndb immediately without waiting for other threads", "same as --ndb-force-send.",
(gptr*) &global_system_variables.ndb_force_send, (gptr*) &global_system_variables.ndb_force_send,
(gptr*) &global_system_variables.ndb_force_send, (gptr*) &global_system_variables.ndb_force_send,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0}, 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb-use-exact-count", OPT_NDB_USE_EXACT_COUNT,
"Use exact records count during query planning and for fast "
"select count(*), disable for faster queries.",
(gptr*) &global_system_variables.ndb_use_exact_count,
(gptr*) &global_system_variables.ndb_use_exact_count,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb_use_exact_count", OPT_NDB_USE_EXACT_COUNT, {"ndb_use_exact_count", OPT_NDB_USE_EXACT_COUNT,
"Use exact records count during query planning and for " "same as --ndb-use-exact-count.",
"fast select count(*)",
(gptr*) &global_system_variables.ndb_use_exact_count, (gptr*) &global_system_variables.ndb_use_exact_count,
(gptr*) &global_system_variables.ndb_use_exact_count, (gptr*) &global_system_variables.ndb_use_exact_count,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0}, 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb-shm", OPT_NDB_SHM,
"Use shared memory connections when available.",
(gptr*) &opt_ndb_shm,
(gptr*) &opt_ndb_shm,
0, GET_BOOL, OPT_ARG, OPT_NDB_SHM_DEFAULT, 0, 0, 0, 0, 0},
{"ndb-optimized-node-selection", OPT_NDB_OPTIMIZED_NODE_SELECTION,
"Select nodes for transactions in a more optimal way.",
(gptr*) &opt_ndb_optimized_node_selection,
(gptr*) &opt_ndb_optimized_node_selection,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
#endif #endif
{"new", 'n', "Use very new possible 'unsafe' functions.", {"new", 'n', "Use very new possible 'unsafe' functions.",
(gptr*) &global_system_variables.new_mode, (gptr*) &global_system_variables.new_mode,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment