/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */




/*****************************************************************************
Name:          Ndb.cpp
******************************************************************************/

#include <ndb_global.h>
#include <pthread.h>

#include "NdbApiSignal.hpp"
#include "NdbImpl.hpp"
#include <NdbOperation.hpp>
#include <NdbConnection.hpp>
#include <NdbEventOperation.hpp>
#include <NdbRecAttr.hpp>
#include <md5_hash.hpp>
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <ndb_limits.h>
#include "API.hpp"
#include <NdbEnv.h>
#include <BaseString.hpp>

/****************************************************************************
void connect();

Connect to any node which has no connection at the moment.
****************************************************************************/
NdbConnection* Ndb::doConnect(Uint32 tConNode) 
{
  Uint32        tNode;
  Uint32        i = 0;;
  Uint32        tAnyAlive = 0;
  int TretCode;

  if (tConNode != 0) {
    TretCode = NDB_connect(tConNode);
    if ((TretCode == 1) || (TretCode == 2)) {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
      return getConnectedNdbConnection(tConNode);
    } else if (TretCode != 0) {
      tAnyAlive = 1;
    }//if
  }//if
//****************************************************************************
// We will connect to any node. Make sure that we have connections to all
// nodes.
//****************************************************************************
  Uint32 tNoOfDbNodes = theNoOfDBnodes;
  i = theCurrentConnectIndex;
  UintR Tcount = 0;
  do {
    if (i >= tNoOfDbNodes) {
      i = 0;
    }//if
    Tcount++;
    tNode = theDBnodes[i];
    TretCode = NDB_connect(tNode);
    if ((TretCode == 1) || (TretCode == 2)) {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
      if (theCurrentConnectIndex == i) {
        theCurrentConnectCounter++;
        if (theCurrentConnectCounter == 8) {
	  theCurrentConnectCounter = 1;
	  theCurrentConnectIndex++;
	}//if
      } else {
	// Set to 2 because we have already connected to a node 
	// when we get here.
        theCurrentConnectCounter = 2;
        theCurrentConnectIndex = i;
      }//if
      return getConnectedNdbConnection(tNode);
    } else if (TretCode != 0) {
      tAnyAlive = 1;
    }//if
    i++;
  } while (Tcount < tNoOfDbNodes);
//****************************************************************************
// We were unable to find a free connection. If no node alive we will report
// error code for cluster failure otherwise connection failure.
//****************************************************************************
  if (tAnyAlive == 1) {
#ifdef VM_TRACE
    ndbout << "TretCode = " << TretCode << endl;
#endif
    theError.code = 4006;
  } else {
    theError.code = 4009;
  }//if
  return NULL;
}

int 
Ndb::NDB_connect(Uint32 tNode) 
{
//****************************************************************************
// We will perform seize of a transaction record in DBTC in the specified node.
//***************************************************************************
  
  int	         tReturnCode;
  TransporterFacade *tp = TransporterFacade::instance();

  bool nodeAvail = tp->get_node_alive(tNode);
  if(nodeAvail == false){
    return 0;
  }
  
  NdbConnection * tConArray = theConnectionArray[tNode];
  if (tConArray != NULL) {
    return 2;
  }
  
  NdbConnection * tNdbCon = getNdbCon();	// Get free connection object.
  if (tNdbCon == NULL) {
    return 4;
  }//if
  NdbApiSignal*	tSignal = getSignal();		// Get signal object
  if (tSignal == NULL) {
    releaseNdbCon(tNdbCon);
    return 4;
  }//if
  if (tSignal->setSignal(GSN_TCSEIZEREQ) == -1) {
    releaseNdbCon(tNdbCon);
    releaseSignal(tSignal);
    return 4;
  }//if
  tSignal->setData(tNdbCon->ptr2int(), 1);
//************************************************
// Set connection pointer as NdbConnection object
//************************************************
  tSignal->setData(theMyRef, 2);	// Set my block reference
  tNdbCon->Status(NdbConnection::Connecting); // Set status to connecting
  Uint32 nodeSequence;
  { // send and receive signal
    Guard guard(tp->theMutexPtr);
    nodeSequence = tp->getNodeSequence(tNode);
    bool node_is_alive = tp->get_node_alive(tNode);
    if (node_is_alive) { 
      tReturnCode = tp->sendSignal(tSignal, tNode);  
      releaseSignal(tSignal); 
      if (tReturnCode != -1) {
        theWaiter.m_node = tNode;  
        theWaiter.m_state = WAIT_TC_SEIZE;  
        tReturnCode = receiveResponse(); 
      }//if
    } else {
      releaseSignal(tSignal);
      tReturnCode = -1;
    }//if
  }
  if ((tReturnCode == 0) && (tNdbCon->Status() == NdbConnection::Connected)) {
    //************************************************
    // Send and receive was successful
    //************************************************
    NdbConnection* tPrevFirst = theConnectionArray[tNode];
    tNdbCon->setConnectedNodeId(tNode, nodeSequence);
    
    tNdbCon->setMyBlockReference(theMyRef);
    theConnectionArray[tNode] = tNdbCon;
    tNdbCon->theNext = tPrevFirst;
    return 1;
  } else {
    releaseNdbCon(tNdbCon);
//****************************************************************************
// Unsuccessful connect is indicated by 3.
//****************************************************************************
    return 3;
  }//if
}//Ndb::NDB_connect()

NdbConnection *
Ndb::getConnectedNdbConnection(Uint32 nodeId){
  NdbConnection* next = theConnectionArray[nodeId];
  theConnectionArray[nodeId] = next->theNext;
  next->theNext = NULL;

  return next;
}//Ndb::getConnectedNdbConnection()

/*****************************************************************************
disconnect();

Remark:        Disconnect all connections to the database. 
*****************************************************************************/
void 
Ndb::doDisconnect()
{
  NdbConnection* tNdbCon;
  CHECK_STATUS_MACRO_VOID;

  Uint32 tNoOfDbNodes = theNoOfDBnodes;
  UintR i;
  for (i = 0; i < tNoOfDbNodes; i++) {
    Uint32 tNode = theDBnodes[i];
    tNdbCon = theConnectionArray[tNode];
    while (tNdbCon != NULL) {
      NdbConnection* tmpNdbCon = tNdbCon;
      tNdbCon = tNdbCon->theNext;
      releaseConnectToNdb(tmpNdbCon);
    }//while
  }//for
  tNdbCon = theTransactionList;
  while (tNdbCon != NULL) {
    NdbConnection* tmpNdbCon = tNdbCon;
    tNdbCon = tNdbCon->theNext;
    releaseConnectToNdb(tmpNdbCon);
  }//while
}//Ndb::disconnect()

/*****************************************************************************
int waitUntilReady(int timeout);

Return Value:   Returns 0 if the Ndb is ready within timeout seconds.
                Returns -1 otherwise.
Remark:         Waits until a node has status != 0
*****************************************************************************/ 
int
Ndb::waitUntilReady(int timeout)
{
  int secondsCounter = 0;
  int milliCounter = 0;
  int noChecksSinceFirstAliveFound = 0;

  if (theInitState != Initialised) {
    // Ndb::init is not called
    theError.code = 4256;
    return -1;
  }

  do {
    unsigned int foundAliveNode = 0;
    TransporterFacade *tp = TransporterFacade::instance();
    tp->lock_mutex();
    for (unsigned int i = 0; i < theNoOfDBnodes; i++) {
      const NodeId nodeId = theDBnodes[i];
      //************************************************
      // If any node is answering, ndb is answering
      //************************************************
      if (tp->get_node_alive(nodeId) != 0) {
	foundAliveNode++;
      }//if
    }//for
    
    tp->unlock_mutex();
    if (foundAliveNode == theNoOfDBnodes) {
      return 0;
    }//if
    if (foundAliveNode > 0) {
      noChecksSinceFirstAliveFound++;
    }//if
    if (noChecksSinceFirstAliveFound > 30) {
      return 0;
    }//if
    NdbSleep_MilliSleep(100);
    milliCounter += 100;
    if (milliCounter >= 1000) {
      secondsCounter++;
      milliCounter = 0;
    }//if
  } while ( secondsCounter < timeout );
  if (noChecksSinceFirstAliveFound > 0) {
    return 0;
  }//if
  return -1;
}

/*****************************************************************************
NdbConnection* startTransaction();

Return Value:   Returns a pointer to a connection object.
                Return NULL otherwise.
Remark:         Start transaction. Synchronous.
*****************************************************************************/ 
NdbConnection* 
Ndb::startTransaction(Uint32 aPriority, const char * keyData, Uint32 keyLen)
{
  DBUG_ENTER("Ndb::startTransaction");

  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();
  /**
   * If the user supplied key data
   * We will make a qualified quess to which node is the primary for the
   * the fragment and contact that node
   */
    Uint32 nodeId;
    if(keyData != 0) {
      Uint32 fragmentId = computeFragmentId(keyData, keyLen);
      nodeId     = guessPrimaryNode(fragmentId);
    } else {
      nodeId = 0;
    }//if
    DBUG_RETURN(startTransactionLocal(aPriority, nodeId));
  } else {
    DBUG_RETURN(NULL);
  }//if
}//Ndb::startTransaction()

/*****************************************************************************
NdbConnection* hupp(NdbConnection* pBuddyTrans);

Return Value:   Returns a pointer to a connection object.
                Connected to the same node as pBuddyTrans
                and also using the same transction id
Remark:         Start transaction. Synchronous.
*****************************************************************************/ 
NdbConnection* 
Ndb::hupp(NdbConnection* pBuddyTrans)
{
  DBUG_ENTER("Ndb::hupp");

  Uint32 aPriority = 0;
  if (pBuddyTrans == NULL){
    DBUG_RETURN(startTransaction());
  }

  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();

    Uint32 nodeId = pBuddyTrans->getConnectedNodeId();
    NdbConnection* pCon = startTransactionLocal(aPriority, nodeId);
    if(pCon == NULL)
      DBUG_RETURN(NULL);

    if (pCon->getConnectedNodeId() != nodeId){
      // We could not get a connection to the desired node
      // release the connection and return NULL
      closeTransaction(pCon);
      DBUG_RETURN(NULL);
    }
    pCon->setTransactionId(pBuddyTrans->getTransactionId());
    pCon->setBuddyConPtr((Uint32)pBuddyTrans->getTC_ConnectPtr());
    DBUG_RETURN(pCon);
  } else {
    DBUG_RETURN(NULL);
  }//if
}//Ndb::hupp()

NdbConnection* 
Ndb::startTransactionDGroup(Uint32 aPriority, const char * keyData, int type)
{

  char DGroup[4];
  if ((keyData == NULL) ||
      (type > 1)) {
    theError.code = 4118;
    return NULL;
  }//if
  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();
  /**
   * If the user supplied key data
   * We will make a qualified quess to which node is the primary for the
   * the fragment and contact that node
   */
    Uint32 fragmentId;
    if (type == 0) {
      DGroup[0] = keyData[0];
      DGroup[1] = keyData[1];
      DGroup[2] = 0x30;
      DGroup[3] = 0x30;
      fragmentId = computeFragmentId(&DGroup[0], 4);
    } else {
      Uint32 hashValue = ((keyData[0] - 0x30) * 10) + (keyData[1] - 0x30);
      fragmentId = getFragmentId(hashValue);    
    }//if
    Uint32 nodeId     = guessPrimaryNode(fragmentId);
    return startTransactionLocal(aPriority, nodeId);
  } else {
    return NULL;
  }//if
}//Ndb::startTransaction()

NdbConnection* 
Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
{
#ifdef VM_TRACE
  char buf[255];
  const char* val = NdbEnv_GetEnv("NDB_TRANSACTION_NODE_ID", buf, 255);
  if(val != 0){
    nodeId = atoi(val);
  }
#endif

  NdbConnection* tConnection;
  Uint64 tFirstTransId = theFirstTransId;
  tConnection = doConnect(nodeId);
  if (tConnection == NULL) {
    return NULL;
  }//if
  NdbConnection* tConNext = theTransactionList;
  tConnection->init();
  theTransactionList = tConnection;        // into a transaction list.
  tConnection->next(tConNext);   // Add the active connection object
  tConnection->setTransactionId(tFirstTransId);
  tConnection->thePriority = aPriority;
  if ((tFirstTransId & 0xFFFFFFFF) == 0xFFFFFFFF) {
    //---------------------------------------------------
// Transaction id rolling round. We will start from
// consecutive identity 0 again.
//---------------------------------------------------
    theFirstTransId = ((tFirstTransId >> 32) << 32);      
  } else {
    theFirstTransId = tFirstTransId + 1;
  }//if
#ifdef VM_TRACE
  if (tConnection->theListState != NdbConnection::NotInList) {
    printState("startTransactionLocal %x", tConnection);
    abort();
  }
#endif
  return tConnection;
}//Ndb::startTransactionLocal()

/*****************************************************************************
void closeTransaction(NdbConnection* aConnection);

Parameters:     aConnection: the connection used in the transaction.
Remark:         Close transaction by releasing the connection and all operations.
*****************************************************************************/
void
Ndb::closeTransaction(NdbConnection* aConnection)
{
  DBUG_ENTER("Ndb::closeTransaction");

  NdbConnection* tCon;
  NdbConnection* tPreviousCon;

  if (aConnection == NULL) {
//-----------------------------------------------------
// closeTransaction called on NULL pointer, destructive
// application behaviour.
//-----------------------------------------------------
#ifdef VM_TRACE
    printf("NULL into closeTransaction\n");
#endif
    DBUG_VOID_RETURN;
  }//if
  CHECK_STATUS_MACRO_VOID;
  
  tCon = theTransactionList;
  
  if (aConnection == tCon) {		// Remove the active connection object
    theTransactionList = tCon->next();	// from the transaction list.
  } else { 
    while (aConnection != tCon) {
      if (tCon == NULL) {
//-----------------------------------------------------
// closeTransaction called on non-existing transaction
//-----------------------------------------------------

	if(aConnection->theError.code == 4008){
	  /**
	   * When a SCAN timed-out, returning the NdbConnection leads
	   * to reuse. And TC crashes when the API tries to reuse it to
	   * something else...
	   */
#ifdef VM_TRACE
	  printf("Scan timeout:ed NdbConnection-> "
		 "not returning it-> memory leak\n");
#endif
	  DBUG_VOID_RETURN;
	}

#ifdef VM_TRACE
	printf("Non-existing transaction into closeTransaction\n");
	abort();
#endif
	DBUG_VOID_RETURN;
      }//if
      tPreviousCon = tCon;
      tCon = tCon->next();
    }//while
    tPreviousCon->next(tCon->next());
  }//if
  
  aConnection->release();
  
  if(aConnection->theError.code == 4008){
    /**
     * Something timed-out, returning the NdbConnection leads
     * to reuse. And TC crashes when the API tries to reuse it to
     * something else...
     */
#ifdef VM_TRACE
    printf("Con timeout:ed NdbConnection-> not returning it-> memory leak\n");
#endif
    DBUG_VOID_RETURN;
  }
  
  if (aConnection->theReleaseOnClose == false) {
    /**
     * Put it back in idle list for that node
     */
    Uint32 nodeId = aConnection->getConnectedNodeId();
    aConnection->theNext = theConnectionArray[nodeId];
    theConnectionArray[nodeId] = aConnection;
    DBUG_VOID_RETURN;
  } else {
    aConnection->theReleaseOnClose = false;
    releaseNdbCon(aConnection);
  }//if
  DBUG_VOID_RETURN;
}//Ndb::closeTransaction()

/*****************************************************************************
int* NdbTamper(int aAction, int aNode);

Parameters: aAction     Specifies what action to be taken
            1: Lock global checkpointing    Can only be sent to master DIH, Parameter aNode ignored.
            2: UnLock global checkpointing    Can only be sent to master DIH, Parameter aNode ignored.
	    3: Crash node

           aNode        Specifies which node the action will be taken
     	  -1: Master DIH 
       	0-16: Nodnumber

Return Value: -1 Error  .
                
Remark:         Sends a signal to DIH.
*****************************************************************************/ 
int 
Ndb::NdbTamper(TamperType aAction, int aNode)
{
  NdbConnection*	tNdbConn;
  NdbApiSignal		tSignal(theMyRef);
  int			tNode;
  int                   tAction;
  int			ret_code;

#ifdef CUSTOMER_RELEASE
  return -1;
#else
  CHECK_STATUS_MACRO;
  checkFailedNode();

  theRestartGCI = 0;
  switch (aAction) {
// Translate enum to integer. This is done because the SCI layer
// expects integers. 
     case LockGlbChp:
        tAction = 1;
        break;
     case UnlockGlbChp:
        tAction = 2;
	break;
     case CrashNode:
        tAction = 3;
        break;
     case ReadRestartGCI:
	tAction = 4;
	break;
     default:
        theError.code = 4102;
        return -1;
  }

  tNdbConn = getNdbCon();	// Get free connection object
  if (tNdbConn == NULL) {
    theError.code = 4000;
    return -1;
  }
  tSignal.setSignal(GSN_DIHNDBTAMPER);
  tSignal.setData (tAction, 1);
  tSignal.setData(tNdbConn->ptr2int(),2);
  tSignal.setData(theMyRef,3);		// Set return block reference
  tNdbConn->Status(NdbConnection::Connecting); // Set status to connecting
  TransporterFacade *tp = TransporterFacade::instance();
  if (tAction == 3) {
    tp->lock_mutex();
    tp->sendSignal(&tSignal, aNode);
    tp->unlock_mutex();
    releaseNdbCon(tNdbConn);
  } else if ( (tAction == 2) || (tAction == 1) ) {
    tp->lock_mutex();
    tNode = tp->get_an_alive_node();
    if (tNode == 0) {
      theError.code = 4002;
      releaseNdbCon(tNdbConn);
      return -1;
    }//if
    ret_code = tp->sendSignal(&tSignal,aNode);
    tp->unlock_mutex();
    releaseNdbCon(tNdbConn);
    return ret_code;
  } else {
    do {
      tp->lock_mutex();
      // Start protected area
      tNode = tp->get_an_alive_node();
      tp->unlock_mutex();
      // End protected area
      if (tNode == 0) {
        theError.code = 4009;
        releaseNdbCon(tNdbConn);
        return -1;
      }//if
      ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0);
      if (ret_code == 0) {  
        if (tNdbConn->Status() != NdbConnection::Connected) {
          theRestartGCI = 0;
        }//if
        releaseNdbCon(tNdbConn);
        return theRestartGCI;
      } else if ((ret_code == -5) || (ret_code == -2)) {
        TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping");
      } else {
        return -1;
      }//if
    } while (1);
  }
  return 0;
#endif
}
#if 0
/****************************************************************************
NdbSchemaCon* startSchemaTransaction();

Return Value:   Returns a pointer to a schema connection object.
                Return NULL otherwise.
Remark:         Start schema transaction. Synchronous.
****************************************************************************/ 
NdbSchemaCon* 
Ndb::startSchemaTransaction()
{
  NdbSchemaCon* tSchemaCon;
  if (theSchemaConToNdbList != NULL) {
    theError.code = 4321;
    return NULL;
  }//if
  tSchemaCon = new NdbSchemaCon(this);
  if (tSchemaCon == NULL) {
    theError.code = 4000;
    return NULL;
  }//if 
  theSchemaConToNdbList = tSchemaCon;
  return tSchemaCon;  
}
/*****************************************************************************
void closeSchemaTransaction(NdbSchemaCon* aSchemaCon);

Parameters:     aSchemaCon: the schemacon used in the transaction.
Remark:         Close transaction by releasing the schemacon and all schemaop.
*****************************************************************************/
void
Ndb::closeSchemaTransaction(NdbSchemaCon* aSchemaCon)
{
  if (theSchemaConToNdbList != aSchemaCon) {
    abort();
    return;
  }//if
  aSchemaCon->release();
  delete aSchemaCon;
  theSchemaConToNdbList = NULL;
  return;
}//Ndb::closeSchemaTransaction()
#endif

/*****************************************************************************
void RestartGCI(int aRestartGCI);

Remark:		Set theRestartGCI on the NDB object
*****************************************************************************/
void
Ndb::RestartGCI(int aRestartGCI)
{
  theRestartGCI = aRestartGCI;
}

/****************************************************************************
int getBlockNumber(void);

Remark:		
****************************************************************************/
int
Ndb::getBlockNumber()
{
  return theNdbBlockNumber;
}

NdbDictionary::Dictionary *
Ndb::getDictionary() const {
  return theDictionary;
}

/****************************************************************************
int getNodeId();

Remark:		
****************************************************************************/
int
Ndb::getNodeId()
{
  return theNode;
}

/****************************************************************************
Uint64 getTupleIdFromNdb( Uint32 aTableId, Uint32 cacheSize );

Parameters:     aTableId : The TableId.
                cacheSize: Prefetch this many values
Remark:		Returns a new TupleId to the application.
                The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
                It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
****************************************************************************/
#define DEBUG_TRACE(msg) \
//  ndbout << __FILE__ << " line: " << __LINE__ << " msg: " << msg << endl

Uint64
Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
{
  DEBUG_TRACE("getAutoIncrementValue");
  const NdbTableImpl* table = theDictionary->getTable(aTableName);
  if (table == 0)
    return ~0;
  Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
  return tupleId;
}

Uint64
Ndb::getAutoIncrementValue(NdbDictionary::Table * aTable, Uint32 cacheSize)
{
  DEBUG_TRACE("getAutoIncrementValue");
  if (aTable == 0)
    return ~0;
  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
  return tupleId;
}

Uint64 
Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
{
  const NdbTableImpl* table = theDictionary->getTable(aTableName);
  if (table == 0)
    return ~0;
  return getTupleIdFromNdb(table->m_tableId, cacheSize);
}

Uint64
Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize)
{
  if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] )
  {
    theFirstTupleId[aTableId]++;
    return theFirstTupleId[aTableId];
  }
  else // theFirstTupleId == theLastTupleId
  {
    return opTupleIdOnNdb(aTableId, cacheSize, 0);
  }
}

Uint64
Ndb::readAutoIncrementValue(const char* aTableName)
{
  DEBUG_TRACE("readtAutoIncrementValue");
  const NdbTableImpl* table = theDictionary->getTable(aTableName);
  if (table == 0)
    return ~0;
  Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
  return tupleId;
}

Uint64
Ndb::readAutoIncrementValue(NdbDictionary::Table * aTable)
{
  DEBUG_TRACE("readtAutoIncrementValue");
  if (aTable == 0)
    return ~0;
  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
  return tupleId;
}

Uint64
Ndb::readTupleIdFromNdb(Uint32 aTableId)
{
  if ( theFirstTupleId[aTableId] == theLastTupleId[aTableId] )
    // Cache is empty, check next in database
    return opTupleIdOnNdb(aTableId, 0, 3);

  return theFirstTupleId[aTableId] + 1;
}

bool
Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
{
  DEBUG_TRACE("setAutoIncrementValue " << val);
  const NdbTableImpl* table = theDictionary->getTable(aTableName);
  if (table == 0)
    return false;
  return setTupleIdInNdb(table->m_tableId, val, increase);
}

bool
Ndb::setAutoIncrementValue(NdbDictionary::Table * aTable, Uint64 val, bool increase)
{
  DEBUG_TRACE("setAutoIncrementValue " << val);
  if (aTable == 0)
    return ~0;
  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  return setTupleIdInNdb(table->m_tableId, val, increase);
}

bool 
Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
{
  DEBUG_TRACE("setTupleIdInNdb");
  const NdbTableImpl* table = theDictionary->getTable(aTableName);
  if (table == 0)
    return false;
  return setTupleIdInNdb(table->m_tableId, val, increase);
}

bool
Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase )
{
  DEBUG_TRACE("setTupleIdInNdb");
  if (increase)
  {
    if (theFirstTupleId[aTableId] != theLastTupleId[aTableId])
    {
      // We have a cache sequence
      if (val <= theFirstTupleId[aTableId]+1)
	return false;
      if (val <= theLastTupleId[aTableId])
      {
	theFirstTupleId[aTableId] = val - 1;
	return true;
      }
      // else continue;
    }    
    return (opTupleIdOnNdb(aTableId, val, 2) == val);
  }
  else
    return (opTupleIdOnNdb(aTableId, val, 1) == val);
}

Uint64
Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
{
  DEBUG_TRACE("opTupleIdOnNdb");

  NdbConnection*     tConnection;
  NdbOperation*      tOperation;
  Uint64             tValue;
  NdbRecAttr*        tRecAttrResult;
  int                result;
  Uint64 ret;

  CHECK_STATUS_MACRO_ZERO;

  BaseString currentDb(getDatabaseName());
  BaseString currentSchema(getDatabaseSchemaName());

  setDatabaseName("sys");
  setDatabaseSchemaName("def");
  tConnection = this->startTransaction();
  if (tConnection == NULL)
    goto error_return;

  if (usingFullyQualifiedNames())
    tOperation = tConnection->getNdbOperation("SYSTAB_0");
  else
    tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
  if (tOperation == NULL)
    goto error_handler;

  switch (op)
    {
    case 0:
      tOperation->interpretedUpdateTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->incValue("NEXTID", opValue);
      tRecAttrResult = tOperation->getValue("NEXTID");

      if (tConnection->execute( Commit ) == -1 )
        goto error_handler;

      tValue = tRecAttrResult->u_64_value();

      theFirstTupleId[aTableId] = tValue - opValue;
      theLastTupleId[aTableId]  = tValue - 1;
      ret = theFirstTupleId[aTableId];
      break;
    case 1:
      tOperation->updateTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->setValue("NEXTID", opValue);

      if (tConnection->execute( Commit ) == -1 )
        goto error_handler;

      theFirstTupleId[aTableId] = ~(Uint64)0;
      theLastTupleId[aTableId]  = ~(Uint64)0;
      ret = opValue;
      break;
    case 2:
      tOperation->interpretedUpdateTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->load_const_u64(1, opValue);
      tOperation->read_attr("NEXTID", 2);
      tOperation->branch_le(2, 1, 0);
      tOperation->write_attr("NEXTID", 1);
      tOperation->interpret_exit_ok();
      tOperation->def_label(0);
      tOperation->interpret_exit_nok(9999);
      
      if ( (result = tConnection->execute( Commit )) == -1 )
        goto error_handler;
      
      if (result == 9999)
        ret = ~(Uint64)0;
      else
      {
        theFirstTupleId[aTableId] = theLastTupleId[aTableId] = opValue - 1;
	ret = opValue;
      }
      break;
    case 3:
      tOperation->readTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tRecAttrResult = tOperation->getValue("NEXTID");
      if (tConnection->execute( Commit ) == -1 )
        goto error_handler;
      ret = tRecAttrResult->u_64_value();
      break;
    default:
      goto error_handler;
    }

  this->closeTransaction(tConnection);

  // Restore current name space
  setDatabaseName(currentDb.c_str());
  setDatabaseSchemaName(currentSchema.c_str());

  return ret;

  error_handler:
    theError.code = tConnection->theError.code;
    this->closeTransaction(tConnection);
  error_return:
    // Restore current name space
    setDatabaseName(currentDb.c_str());
    setDatabaseSchemaName(currentSchema.c_str());

  return ~0;
}

static const Uint32 MAX_KEY_LEN_64_WORDS = 4;
static const Uint32 MAX_KEY_LEN_32_WORDS = 8;
static const Uint32 MAX_KEY_LEN_BYTES    = 32;

Uint32
Ndb::computeFragmentId(const char * keyData, Uint32 keyLen)
{
  Uint64 tempData[MAX_KEY_LEN_64_WORDS];
  
  const Uint32 usedKeyLen = (keyLen + 3) >> 2; // In words
  const char * usedKeyData = 0;
  
  /**
   * If   key data buffer is not aligned (on 64 bit boundary)
   *   or key len is not a multiple of 4
   * Use temp data
   */
  if(((((UintPtr)keyData) & 7) == 0) && ((keyLen & 3) == 0)) {
    usedKeyData = keyData;
  } else {
    memcpy(&tempData[0], keyData, keyLen);
    const int slack = keyLen & 3;
    if(slack > 0) {
      memset(&((char *)&tempData[0])[keyLen], 0, (4 - slack));
    }//if
    usedKeyData = (char *)&tempData[0];
  }//if
  
  Uint32 hashValue = md5_hash((Uint64 *)usedKeyData, usedKeyLen);

  hashValue >>= startTransactionNodeSelectionData.kValue;
  return getFragmentId(hashValue);
}//Ndb::computeFragmentId()

Uint32
Ndb::getFragmentId(Uint32 hashValue)
{
  Uint32 fragmentId = hashValue &
    startTransactionNodeSelectionData.hashValueMask;
  if(fragmentId < startTransactionNodeSelectionData.hashpointerValue) {
    fragmentId = hashValue &
                 ((startTransactionNodeSelectionData.hashValueMask << 1) + 1);
  }//if
  return fragmentId;
}

Uint32
Ndb::guessPrimaryNode(Uint32 fragmentId){
  //ASSERT(((fragmentId > 0) && fragmentId <
  // startTransactionNodeSelectionData.noOfFragments), "Invalid fragementId");

  return startTransactionNodeSelectionData.fragment2PrimaryNodeMap[fragmentId];
}

void
Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes,
                                             Uint32 nodeIds[]) {
  kValue           = 6;
  noOfFragments    = 2 * noOfNodes;

  /**
   * Compute hashValueMask and hashpointerValue
   */
  {
    Uint32 topBit = (1 << 31);
    for(int i = 31; i>=0; i--){
      if((noOfFragments & topBit) != 0)
	break;
      topBit >>= 1;
    }
    hashValueMask    = topBit - 1;
    hashpointerValue = noOfFragments - (hashValueMask + 1);
  }
  
  /**
   * This initialization depends on
   * the fact that:
   *  primary node for fragment i = i % noOfNodes
   *
   * This algorithm should be implemented in Dbdih
   */
  {
    fragment2PrimaryNodeMap = new Uint32[noOfFragments];
    Uint32 i;  
    for(i = 0; i<noOfNodes; i++){
      fragment2PrimaryNodeMap[i] = nodeIds[i];
    }
    
    // Sort them (bubble sort)
    for(i = 0; i<noOfNodes-1; i++)
      for(Uint32 j = i+1; j<noOfNodes; j++)
	if(fragment2PrimaryNodeMap[i] > fragment2PrimaryNodeMap[j]){
	  Uint32 tmp = fragment2PrimaryNodeMap[i];
	  fragment2PrimaryNodeMap[i] = fragment2PrimaryNodeMap[j];
	  fragment2PrimaryNodeMap[j] = tmp;
	}
    
    for(i = 0; i<noOfNodes; i++){
      fragment2PrimaryNodeMap[i+noOfNodes] = fragment2PrimaryNodeMap[i];
    }
  }
}

void
Ndb::StartTransactionNodeSelectionData::release(){
  delete [] fragment2PrimaryNodeMap;
  fragment2PrimaryNodeMap = 0;
}

Uint32
convertEndian(Uint32 Data)
{
#ifdef WORDS_BIGENDIAN
  Uint32 t1, t2, t3, t4;
  t4 = (Data >> 24) & 255;
  t3 = (Data >> 16) & 255;
  t4 = t4 + (t3 << 8);
  t2 = (Data >> 8) & 255;
  t4 = t4 + (t2 << 16);
  t1 = Data & 255;
  t4 = t4 + (t1 << 24);
  return t4;
#else
  return Data;
#endif
}
const char * Ndb::getCatalogName() const
{
  return theDataBase;
}
 
void Ndb::setCatalogName(const char * a_catalog_name)
{
  if (a_catalog_name) {
    snprintf(theDataBase, sizeof(theDataBase), "%s",
             a_catalog_name ? a_catalog_name : "");

    int len = snprintf(prefixName, sizeof(prefixName), "%s%c%s%c",
                       theDataBase, table_name_separator,
                       theDataBaseSchema, table_name_separator);
    prefixEnd = prefixName + (len < sizeof(prefixName) ? len : 
                              sizeof(prefixName) - 1);
  }
}
 
const char * Ndb::getSchemaName() const
{
  return theDataBaseSchema;
}
 
void Ndb::setSchemaName(const char * a_schema_name)
{
  if (a_schema_name) {
    snprintf(theDataBaseSchema, sizeof(theDataBase), "%s",
             a_schema_name ? a_schema_name : "");

    int len = snprintf(prefixName, sizeof(prefixName), "%s%c%s%c",
                       theDataBase, table_name_separator,
                       theDataBaseSchema, table_name_separator);
    prefixEnd = prefixName + (len < sizeof(prefixName) ? len : 
                              sizeof(prefixName) - 1);
  }
}
 
/*
Deprecated functions
*/
const char * Ndb::getDatabaseName() const
{
  return getCatalogName();
}
 
void Ndb::setDatabaseName(const char * a_catalog_name)
{
  setCatalogName(a_catalog_name);
}
 
const char * Ndb::getDatabaseSchemaName() const
{
  return getSchemaName();
}
 
void Ndb::setDatabaseSchemaName(const char * a_schema_name)
{
  setSchemaName(a_schema_name);
}
 
void Ndb::useFullyQualifiedNames(bool turnNamingOn)
{
  fullyQualifiedNames = turnNamingOn;
}
 
bool Ndb::usingFullyQualifiedNames()
{
  return fullyQualifiedNames;
}
 
const char *
Ndb::externalizeTableName(const char * internalTableName, bool fullyQualifiedNames)
{
  if (fullyQualifiedNames) {
    register const char *ptr = internalTableName;
   
    // Skip database name
    while (*ptr && *ptr++ != table_name_separator);
    // Skip schema name
    while (*ptr && *ptr++ != table_name_separator);
    return ptr;
  }
  else
    return internalTableName;
}

const char *
Ndb::externalizeTableName(const char * internalTableName)
{
  return externalizeTableName(internalTableName, usingFullyQualifiedNames());
}

const char *
Ndb::externalizeIndexName(const char * internalIndexName, bool fullyQualifiedNames)
{
  if (fullyQualifiedNames) {
    register const char *ptr = internalIndexName;
   
    // Scan name from the end
    while (*ptr++); ptr--; // strend
    while (ptr >= internalIndexName && *ptr != table_name_separator)
      ptr--;
     
    return ptr + 1;
  }
  else
    return internalIndexName;
}

const char *
Ndb::externalizeIndexName(const char * internalIndexName)
{
  return externalizeIndexName(internalIndexName, usingFullyQualifiedNames());
}

const char *
Ndb::internalizeTableName(const char * externalTableName)
{
  if (fullyQualifiedNames) {
    strncpy(prefixEnd, externalTableName, NDB_MAX_TAB_NAME_SIZE);
    return prefixName;
  }
  else
    return externalTableName;
}
 
const char *
Ndb::internalizeIndexName(const NdbTableImpl * table,
                          const char * externalIndexName)
{
  if (fullyQualifiedNames) {
    char tableId[10];
    sprintf(tableId, "%d", table->m_tableId);
    Uint32 tabIdLen = strlen(tableId);
    strncpy(prefixEnd, tableId, tabIdLen);
    prefixEnd[tabIdLen] = table_name_separator;
    strncpy(prefixEnd + tabIdLen + 1, 
	    externalIndexName, NDB_MAX_TAB_NAME_SIZE);
    return prefixName;
  }
  else
    return externalIndexName;
}

const BaseString
Ndb::getDatabaseFromInternalName(const char * internalName)
{
  char * databaseName = new char[strlen(internalName) + 1];
  strcpy(databaseName, internalName);
  register char *ptr = databaseName;
   
  /* Scan name for the first table_name_separator */
  while (*ptr && *ptr != table_name_separator)
    ptr++;
  *ptr = '\0';
  BaseString ret = BaseString(databaseName);
  delete [] databaseName;
  return ret;
}
 
const BaseString
Ndb::getSchemaFromInternalName(const char * internalName)
{
  char * schemaName = new char[strlen(internalName)];
  register const char *ptr1 = internalName;
   
  /* Scan name for the second table_name_separator */
  while (*ptr1 && *ptr1 != table_name_separator)
    ptr1++;
  strcpy(schemaName, ptr1 + 1);
  register char *ptr = schemaName;
  while (*ptr && *ptr != table_name_separator)
    ptr++;
  *ptr = '\0';
  BaseString ret = BaseString(schemaName);
  delete [] schemaName;
  return ret;
}

NdbEventOperation* Ndb::createEventOperation(const char* eventName,
					     const int bufferLength)
{
  NdbEventOperation* tOp;

  tOp = new NdbEventOperation(this, eventName, bufferLength);

  if (tOp->getState() != NdbEventOperation::CREATED) {
    delete tOp;
    tOp = NULL;
  }

  //now we have to look up this event in dict

  return tOp;
}

int Ndb::dropEventOperation(NdbEventOperation* op) {
  delete op;
  return 0;
}

NdbGlobalEventBufferHandle* Ndb::getGlobalEventBufferHandle()
{
  return theGlobalEventBufferHandle;
}

//void Ndb::monitorEvent(NdbEventOperation *op, NdbEventCallback cb, void* rs)
//{
//}

int
Ndb::pollEvents(int aMillisecondNumber)
{
  return NdbEventOperation::wait(theGlobalEventBufferHandle,
				 aMillisecondNumber);
}

#ifdef VM_TRACE
#include <NdbMutex.h>
static NdbMutex print_state_mutex = NDB_MUTEX_INITIALIZER;
static bool
checkdups(NdbConnection** list, unsigned no)
{
  for (unsigned i = 0; i < no; i++)
    for (unsigned j = i + 1; j < no; j++)
      if (list[i] == list[j])
        return true;
  return false;
}
void
Ndb::printState(const char* fmt, ...)
{
  char buf[200];
  va_list ap;
  va_start(ap, fmt);
  vsprintf(buf, fmt, ap);
  va_end(ap);
  NdbMutex_Lock(&print_state_mutex);
  bool dups = false;
  ndbout << buf << " ndb=" << hex << this << dec;
#ifndef NDB_WIN32
  ndbout << " thread=" << (int)pthread_self();
#endif
  ndbout << endl;
  for (unsigned n = 0; n < MAX_NDB_NODES; n++) {
    NdbConnection* con = theConnectionArray[n];
    if (con != 0) {
      ndbout << "conn " << n << ":" << endl;
      while (con != 0) {
        con->printState();
        con = con->theNext;
      }
    }
  }
  ndbout << "prepared: " << theNoOfPreparedTransactions<< endl;
  if (checkdups(thePreparedTransactionsArray, theNoOfPreparedTransactions)) {
    ndbout << "!! DUPS !!" << endl;
    dups = true;
  }
  for (unsigned i = 0; i < theNoOfPreparedTransactions; i++)
    thePreparedTransactionsArray[i]->printState();
  ndbout << "sent: " << theNoOfSentTransactions<< endl;
  if (checkdups(theSentTransactionsArray, theNoOfSentTransactions)) {
    ndbout << "!! DUPS !!" << endl;
    dups = true;
  }
  for (unsigned i = 0; i < theNoOfSentTransactions; i++)
    theSentTransactionsArray[i]->printState();
  ndbout << "completed: " << theNoOfCompletedTransactions<< endl;
  if (checkdups(theCompletedTransactionsArray, theNoOfCompletedTransactions)) {
    ndbout << "!! DUPS !!" << endl;
    dups = true;
  }
  for (unsigned i = 0; i < theNoOfCompletedTransactions; i++)
    theCompletedTransactionsArray[i]->printState();
  NdbMutex_Unlock(&print_state_mutex);
}
#endif