/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/**
 * ndbapi_async.cpp: 
 * Illustrates how to use callbacks and error handling using the asynchronous
 * part of the NDBAPI.
 *
 * Classes and methods in NDBAPI used in this example:
 *
 *  Ndb_cluster_connection
 *       connect()
 *       wait_until_ready()
 *
 *  Ndb
 *       init()
 *       startTransaction()
 *       closeTransaction()
 *       sendPollNdb()
 *       getNdbError()
 *
 *  NdbConnection
 *       getNdbOperation()
 *       executeAsynchPrepare()
 *       getNdbError()
 *
 *  NdbOperation
 *       insertTuple()
 *       equal()
 *       setValue()
 *       
 */


#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>

#include <iostream> // Used for cout

/**
 * Helper sleep function
 */
int
milliSleep(int milliseconds){
  int result = 0;
  struct timespec sleeptime;
  sleeptime.tv_sec = milliseconds / 1000;
  sleeptime.tv_nsec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000;
  result = nanosleep(&sleeptime, NULL);
  return result;
}

/**
 * error printout macro
 */
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << code \
            << ", msg: " << msg << "." << std::endl
#define MYSQLERROR(mysql) { \
  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
  exit(-1); }
#define APIERROR(error) { \
  PRINT_ERROR(error.code,error.message); \
  exit(-1); }

#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
 * callback struct.
 * transaction :  index of the transaction in transaction[] array below
 * data : the data that the transaction was modifying.
 * retries : counter for how many times the trans. has been retried
 */
typedef struct  {
  Ndb * ndb;
  int    transaction;  
  int    data;
  int    retries;
} async_callback_t;

/**
 * Structure used in "free list" to a NdbTransaction
 */
typedef struct  {
  NdbTransaction*  conn;   
  int used; 
} transaction_t;

/**
 * Free list holding transactions
 */
transaction_t   transaction[1024];  //1024 - max number of outstanding
                                    //transaction in one Ndb object

#endif 
/**
 * prototypes
 */

/**
 * Prepare and send transaction
 */
int  populate(Ndb * myNdb, int data, async_callback_t * cbData);

/**
 * Error handler.
 */
bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb);

/**
 * Exit function
 */
void asynchExitHandler(Ndb * m_ndb) ;

/**
 * Helper function used in callback(...)
 */
void closeTransaction(Ndb * ndb , async_callback_t * cb);

/**
 * Function to create table
 */
int create_table(Ndb * myNdb);

/**
 * stat. variables
 */
int tempErrors = 0;
int permErrors = 0;

void
closeTransaction(Ndb * ndb , async_callback_t * cb)
{
  ndb->closeTransaction(transaction[cb->transaction].conn);
  transaction[cb->transaction].conn = 0;
  transaction[cb->transaction].used = 0;
  cb->retries++;  
}

/**
 * Callback executed when transaction has return from NDB
 */
static void
callback(int result, NdbTransaction* trans, void* aObject)
{
  async_callback_t * cbData = (async_callback_t *)aObject;
  if (result<0)
  {
    /**
     * Error: Temporary or permanent?
     */
    if (asynchErrorHandler(trans,  (Ndb*)cbData->ndb)) 
    {
      closeTransaction((Ndb*)cbData->ndb, cbData);
      while(populate((Ndb*)cbData->ndb, cbData->data, cbData) < 0)
	milliSleep(10);
    }
    else
    {
      std::cout << "Restore: Failed to restore data " 
		<< "due to a unrecoverable error. Exiting..." << std::endl;
      delete cbData;
      asynchExitHandler((Ndb*)cbData->ndb);
    }
  } 
  else 
  {
    /**
     * OK! close transaction
     */
    closeTransaction((Ndb*)cbData->ndb, cbData);
    delete cbData;
  }
}


/**
 * Create table "GARAGE"
 */
int create_table(MYSQL &mysql) 
{
  while (mysql_query(&mysql, 
		  "CREATE TABLE"
		  "  GARAGE"
		  "    (REG_NO INT UNSIGNED NOT NULL,"
		  "     BRAND CHAR(20) NOT NULL,"
		  "     COLOR CHAR(20) NOT NULL,"
		  "     PRIMARY KEY USING HASH (REG_NO))"
		  "  ENGINE=NDB"))
  {
    if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
      MYSQLERROR(mysql);
    std::cout << "MySQL Cluster already has example table: GARAGE. "
	      << "Dropping it..." << std::endl; 
    /**************
     * Drop table *
     **************/
    if (mysql_query(&mysql, "DROP TABLE GARAGE"))
      MYSQLERROR(mysql);
  }
  return 1;
}

void asynchExitHandler(Ndb * m_ndb) 
{
  if (m_ndb != NULL)
    delete m_ndb;
  exit(-1);
}

/* returns true if is recoverable (temporary),
 *  false if it is an  error that is permanent.
 */
bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb) 
{  
  NdbError error = trans->getNdbError();
  switch(error.status)
  {
  case NdbError::Success:
    return false;
    break;
    
  case NdbError::TemporaryError:
    /**
     * The error code indicates a temporary error.
     * The application should typically retry.
     * (Includes classifications: NdbError::InsufficientSpace, 
     *  NdbError::TemporaryResourceError, NdbError::NodeRecoveryError,
     *  NdbError::OverloadError, NdbError::NodeShutdown 
     *  and NdbError::TimeoutExpired.)
     *     
     * We should sleep for a while and retry, except for insufficient space
     */
    if(error.classification == NdbError::InsufficientSpace)
      return false;
    milliSleep(10);  
    tempErrors++;  
    return true;
    break;    
  case NdbError::UnknownResult:
    std::cout << error.message << std::endl;
    return false;
    break;
  default:
  case NdbError::PermanentError:
    switch (error.code)
    {
    case 499:
    case 250:
      milliSleep(10);    
      return true; // SCAN errors that can be retried. Requires restart of scan.
    default:
      break;
    }
    //ERROR
    std::cout << error.message << std::endl;
    return false;
    break;
  }
  return false;
}

static int nPreparedTransactions = 0;
static int MAX_RETRIES = 10;
static int parallelism = 100;


/************************************************************************
 * populate()
 * 1. Prepare 'parallelism' number of insert transactions. 
 * 2. Send transactions to NDB and wait for callbacks to execute
 */
int populate(Ndb * myNdb, int data, async_callback_t * cbData)
{

  NdbOperation*   myNdbOperation;       // For operations
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
  if (myTable == NULL) 
    APIERROR(myDict->getNdbError());

  async_callback_t * cb;
  int retries = 0;
  int current = 0;
  for(int i=0; i<1024; i++)
  {
    if(transaction[i].used == 0)
    {
      current = i;
      if (cbData == 0) 
      {
       /**
        * We already have a callback
	* This is an absolutely new transaction
        */
	cb = new async_callback_t;
	cb->retries = 0;
      }
      else 
      { 
       /**
        * We already have a callback
        */
	cb =cbData;
	retries = cbData->retries;
      }
      /**
       * Set data used by the callback
       */
      cb->ndb = myNdb;  //handle to Ndb object so that we can close transaction
                        // in the callback (alt. make myNdb global).

      cb->data =  data; //this is the data we want to insert
      cb->transaction = current; //This is the number (id)  of this transaction
      transaction[current].used = 1 ; //Mark the transaction as used
      break;
    }
  }
  if(!current)
    return -1;

  while(retries < MAX_RETRIES) 
    {
      transaction[current].conn = myNdb->startTransaction();
      if (transaction[current].conn == NULL) {
	if (asynchErrorHandler(transaction[current].conn, myNdb)) 
	{
          /**
           * no transaction to close since conn == null
           */
	  milliSleep(10);
	  retries++;
	  continue;
	}
	asynchExitHandler(myNdb);	
      }
      myNdbOperation = transaction[current].conn->getNdbOperation(myTable);
      if (myNdbOperation == NULL) 
      {
	if (asynchErrorHandler(transaction[current].conn, myNdb)) 
	{
	  myNdb->closeTransaction(transaction[current].conn);
	  transaction[current].conn = 0;
	  milliSleep(10);
	  retries++;
	  continue;
	}
	asynchExitHandler(myNdb);
      } // if
      if(myNdbOperation->insertTuple() < 0  ||
	 myNdbOperation->equal("REG_NO", data) < 0 ||
	 myNdbOperation->setValue("BRAND", "Mercedes") <0 ||
	 myNdbOperation->setValue("COLOR", "Blue") < 0)
      {
	if (asynchErrorHandler(transaction[current].conn, myNdb)) 
	{
	  myNdb->closeTransaction(transaction[current].conn);
	  transaction[current].conn = 0;
	  retries++;
	  milliSleep(10);
	  continue;
	}
	asynchExitHandler(myNdb);
      }     

      /*Prepare transaction (the transaction is NOT yet sent to NDB)*/
      transaction[current].conn->executeAsynchPrepare(NdbTransaction::Commit, 
						       &callback,
						       cb);
      /**
       * When we have prepared parallelism number of transactions ->
       * send the transaction to ndb. 
       * Next time we will deal with the transactions are in the 
       * callback. There we will see which ones that were successful
       * and which ones to retry.
       */
      if (nPreparedTransactions == parallelism-1) 
      {
	// send-poll all transactions
	// close transaction is done in callback
	myNdb->sendPollNdb(3000, parallelism );
	nPreparedTransactions=0;
      } 
      else
	nPreparedTransactions++;
      return 1;
    }
    std::cout << "Unable to recover from errors. Exiting..." << std::endl;
    asynchExitHandler(myNdb);
    return -1;
}

int main()
{
  ndb_init();
  MYSQL mysql;

  /**************************************************************
   * Connect to mysql server and create table                   *
   **************************************************************/
  {
    if ( !mysql_init(&mysql) ) {
      std::cout << "mysql_init failed\n";
      exit(-1);
    }
    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
			     3306, "/tmp/mysql.sock", 0) )
      MYSQLERROR(mysql);

    mysql_query(&mysql, "CREATE DATABASE TEST_DB");
    if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql);

    create_table(mysql);
  }

  /**************************************************************
   * Connect to ndb cluster                                     *
   **************************************************************/
  Ndb_cluster_connection cluster_connection;
  if (cluster_connection.connect(4, 5, 1))
  {
    std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
    exit(-1);
  }
  // Optionally connect and wait for the storage nodes (ndbd's)
  if (cluster_connection.wait_until_ready(30,0) < 0)
  {
    std::cout << "Cluster was not ready within 30 secs.\n";
    exit(-1);
  }

  Ndb* myNdb = new Ndb( &cluster_connection,
			"TEST_DB" );  // Object representing the database
  if (myNdb->init(1024) == -1) {      // Set max 1024  parallel transactions
    APIERROR(myNdb->getNdbError());
  }

  /**
   * Initialise transaction array
   */
  for(int i = 0 ; i < 1024 ; i++) 
  {
    transaction[i].used = 0;
    transaction[i].conn = 0;
    
  }
  int i=0;
  /**
   * Do 20000 insert transactions.
   */
  while(i < 20000) 
  {
    while(populate(myNdb,i,0)<0)  // <0, no space on free list. Sleep and try again.
      milliSleep(10);
      
    i++;
  }
  std::cout << "Number of temporary errors: " << tempErrors << std::endl;
  delete myNdb; 
}