/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/*
 * ndbapi_scan.cpp: 
 * Illustrates how to use the scan api in the NDBAPI.
 * The example shows how to do scan, scan for update and scan for delete
 * using NdbScanFilter and NdbScanOperation
 *
 * Classes and methods used in this example:
 *
 *  Ndb
 *       init()
 *       waitUntilReady()
 *       getDictionary()
 *       startTransaction()
 *       closeTransaction()
 *       sendPreparedTransactions()
 *       pollNdb()
 *
 *  NdbConnection
 *       getNdbOperation()
 *       executeAsynchPrepare()
 *       getNdbError()
 *       executeScan()
 *       nextScanResult()
 *
 *  NdbDictionary::Dictionary
 *       getTable()
 *       dropTable()
 *       createTable()
 *
 *  NdbDictionary::Column
 *       setName()
 *       setType()
 *       setLength()
 *       setPrimaryKey()
 *       setNullable()
 *
 *  NdbDictionary::Table
 *       setName()
 *       addColumn()
 *
 *  NdbOperation
 *       insertTuple()
 *       equal()
 *       setValue()
 *       openScanRead()
 *       openScanExclusive()
 *
 *  NdbRecAttr
 *       aRef()
 *       u_32_value()
 *
 *  NdbResultSet
 *       nextResult()
 *       deleteTuple()
 *       updateTuple()
 *
 *  NdbScanOperation
 *       getValue() 
 *       readTuplesExclusive()
 *
 *  NdbScanFilter
 *       begin()
 *	 eq()
 *	 end()
 *
 *       
 */


#include <ndb_global.h>

#include <NdbApi.hpp>
#include <NdbScanFilter.hpp>
// Used for cout
#include <iostream>

/**
 * Helper sleep function.
 *
 * Splits the requested interval into whole seconds plus the leftover
 * nanoseconds and hands it to nanosleep().
 *
 * @param milliseconds  length of the pause, in milliseconds
 * @return              whatever nanosleep() returned
 */
int
milliSleep(int milliseconds){
  const int wholeSeconds   = milliseconds / 1000;
  const int leftoverMillis = milliseconds % 1000;

  struct timespec interval;
  interval.tv_sec  = wholeSeconds;
  // Widen before multiplying so the nanosecond count is computed as a long.
  interval.tv_nsec = static_cast<long>(leftoverMillis) * 1000000L;

  return nanosleep(&interval, NULL);
}


/**
 * Error reporting macro.
 *
 * Prints the source file and line of the call site together with the
 * `code` and `message` members of the given error object, then
 * terminates the process with exit(-1).
 *
 * The body is wrapped in do { } while (0) so that "APIERROR(e);" is one
 * single statement and can be used in an unbraced if/else. The argument
 * is parenthesized so that any expression can be passed.
 * Note that the argument expression is evaluated twice.
 */
#define APIERROR(error) \
  do { \
    std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
              << (error).code << ", msg: " << (error).message << "." << std::endl; \
    exit(-1); \
  } while (0)

/*
 *   callback : invoked for a transaction when it is polled
 *
 *   (This function must have three arguments.)
 *   - result  : the result of the transaction; -1 is handled as a failure
 *   - myTrans : the NdbConnection object the result belongs to
 *   - aObject : a pointer to an arbitrary object (not used here)
 */
static void
callback(int result, NdbConnection* myTrans, void* aObject)
{
  // Any result other than -1 is Ok: nothing to do.
  if (result != -1)
    return;

  std::cout << "In callback: " << std::endl;
  /**
   * Put error checking code here (see ndb_async_example)
   */
  APIERROR(myTrans->getNdbError());
}

/**
 * Function to create table
 */
int create_table(Ndb * myNdb) 
{
  NdbDictionary::Table myTable;
  NdbDictionary::Column myColumn;
  
  NdbDictionary::Dictionary* myDict = myNdb->getDictionary();
  
  /*********************************************************
   * Create a table named GARAGE if it does not exist *
   *********************************************************/
  if (myDict->getTable("GARAGE") != NULL) {
    std::cout << "NDB already has example table: GARAGE. "
	      << "Dropping it..." << std::endl; 
    if(myDict->dropTable("GARAGE") == -1)
    {
      std::cout << "Failed to drop: GARAGE." << std::endl; 
      exit(1);
    }
  } 

  myTable.setName("GARAGE");
  
  myColumn.setName("REG_NO");
  myColumn.setType(NdbDictionary::Column::Unsigned);
  myColumn.setLength(1);
  myColumn.setPrimaryKey(true);
  myColumn.setNullable(false);
  myTable.addColumn(myColumn);

  myColumn.setName("BRAND");
  myColumn.setType(NdbDictionary::Column::Char);
  myColumn.setLength(20);
  myColumn.setPrimaryKey(false);
  myColumn.setNullable(false);
  myTable.addColumn(myColumn);


  myColumn.setName("COLOR");
  myColumn.setType(NdbDictionary::Column::Char);
  myColumn.setLength(20);
  myColumn.setPrimaryKey(false);
  myColumn.setNullable(false);
  myTable.addColumn(myColumn);

  if (myDict->createTable(myTable) == -1) {
      APIERROR(myDict->getNdbError());
      return -1;
  }
  return 1;
}


int populate(Ndb * myNdb)
{
  NdbConnection*  myNdbConnection[15];   // For transactions
  NdbOperation*   myNdbOperation;       // For operations
  /******************************************************
   * Insert (we do 15 insert transactions in parallel) *
   ******************************************************/
  /**
   * Five blue mercedes
   */
  for (int i = 0; i < 5; i++) 
  {
    myNdbConnection[i] = myNdb->startTransaction();
    if (myNdbConnection[i] == NULL) 
      APIERROR(myNdb->getNdbError());
    myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE");
    // Error check. If error, then maybe table GARAGE is not in database
    if (myNdbOperation == NULL) 
      APIERROR(myNdbConnection[i]->getNdbError());
    myNdbOperation->insertTuple();
    myNdbOperation->equal("REG_NO", i);
    myNdbOperation->setValue("BRAND", "Mercedes");
    myNdbOperation->setValue("COLOR", "Blue");
    // Prepare transaction (the transaction is NOT yet sent to NDB)
    myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL);
  }


  /**
   * Five black bmw
   */
  for (int i = 5; i < 10; i++) 
  {
    myNdbConnection[i] = myNdb->startTransaction();
    if (myNdbConnection[i] == NULL)
      APIERROR(myNdb->getNdbError());
    myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE");
    // Error check. If error, then maybe table MYTABLENAME is not in database
    if (myNdbOperation == NULL) 
      APIERROR(myNdbConnection[i]->getNdbError());
    myNdbOperation->insertTuple();
    myNdbOperation->equal("REG_NO", i);
    myNdbOperation->setValue("BRAND", "BMW");
    myNdbOperation->setValue("COLOR", "Black");
    // Prepare transaction (the transaction is NOT yet sent to NDB)
    myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL);
  }

  /**
   * Five pink toyotas
   */
  for (int i = 10; i < 15; i++) {
    myNdbConnection[i] = myNdb->startTransaction();
    if (myNdbConnection[i] == NULL) APIERROR(myNdb->getNdbError());
    myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE");
    // Error check. If error, then maybe table MYTABLENAME is not in database
    if (myNdbOperation == NULL) APIERROR(myNdbConnection[i]->getNdbError());
    myNdbOperation->insertTuple();
    myNdbOperation->equal("REG_NO", i);
    myNdbOperation->setValue("BRAND", "Toyota");
    myNdbOperation->setValue("COLOR", "Pink");
    // Prepare transaction (the transaction is NOT yet sent to NDB)
    myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL);
  }

  // Send all transactions to NDB 
  myNdb->sendPreparedTransactions(0);
  // Poll all transactions
  myNdb->pollNdb(3000, 0);

  //  it is also possible to use sendPollNdb instead of
  //  myNdb->sendPreparedTransactions(0); and myNdb->pollNdb(3000, 15);  above.
  //  myNdb->sendPollNdb(3000,0);
  //  Note! Neither sendPollNdb or pollNdb returs until all 15 callbacks have 
  //  executed.

  //  Close all transactions. It is also possible to close transactions
  //  in the callback.
  for (int i = 0; i < 15; i++) 
    myNdb->closeTransaction(myNdbConnection[i]);
  return 1;
}

int scan_delete(Ndb* myNdb, 
		int parallelism,
		int column,
		int column_len,
		const char * color)
		
{
  
  // Scan all records exclusive and delete 
  // them one by one
  int                  retryAttempt = 0;
  const int            retryMax = 10;
  int deletedRows = 0;
  int check;
  NdbError              err;
  NdbConnection		*myTrans;
  NdbScanOperation	*myScanOp;

  /**
   * Loop as long as :
   *  retryMax not reached
   *  failed operations due to TEMPORARY erros
   *
   * Exit loop;
   *  retyrMax reached
   *  Permanent error (return -1)
   */
  while (true)
  {
    if (retryAttempt >= retryMax)
    {
      std::cout << "ERROR: has retried this operation " << retryAttempt 
		<< " times, failing!" << std::endl;
      return -1;
    }

    myTrans = myNdb->startTransaction();
    if (myTrans == NULL) 
    {
      const NdbError err = myNdb->getNdbError();

      if (err.status == NdbError::TemporaryError)
      {
	milliSleep(50);
	retryAttempt++;
	continue;
      }
      std::cout <<  err.message << std::endl;
      return -1;
    }

   /**
    * Get a scan operation.
    */
    myScanOp = myTrans->getNdbScanOperation("GARAGE");	
    if (myScanOp == NULL) 
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    /**
     * Define a result set for the scan.
     */ 
    NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism);
    if( rs == 0 ) {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    } 

    /**
     * Use NdbScanFilter to define a search critera
     */ 
    NdbScanFilter filter(myScanOp) ;   
    if(filter.begin(NdbScanFilter::AND) < 0  || 
       filter.eq(column, color, column_len, false) <0||
       filter.end() <0)
    {
      std::cout <<  myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }    
    
    /**
     * Start scan    (NoCommit since we are only reading at this stage);
     */     
    if(myTrans->execute(NoCommit) != 0){      
      err = myTrans->getNdbError();    
      if(err.status == NdbError::TemporaryError){
	std::cout << myTrans->getNdbError().message << std::endl;
	myNdb->closeTransaction(myTrans);
	milliSleep(50);
	continue;
      }
      std::cout << err.code << std::endl;
      std::cout << myTrans->getNdbError().code << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }


   /**
    * start of loop: nextResult(true) means that "parallelism" number of
    * rows are fetched from NDB and cached in NDBAPI
    */    
    while((check = rs->nextResult(true)) == 0){
      do {
	if (rs->deleteTuple() != 0){
	  std::cout << myTrans->getNdbError().message << std::endl;
	  myNdb->closeTransaction(myTrans);
	  return -1;
	}
	deletedRows++;
	
	/**
	 * nextResult(false) means that the records 
	 * cached in the NDBAPI are modified before
	 * fetching more rows from NDB.
	 */    
      } while((check = rs->nextResult(false)) == 0);
      
      /**
       * Commit when all cached tuple have been marked for deletion
       */    
      if(check != -1){
	check = myTrans->execute(Commit);   
	myTrans->releaseCompletedOperations();
      }
      /**
       * Check for errors
       */
      err = myTrans->getNdbError();    
      if(check == -1){
	if(err.status == NdbError::TemporaryError){
	  std::cout << myTrans->getNdbError().message << std::endl;
	  myNdb->closeTransaction(myTrans);
	  milliSleep(50);
	  continue;
	}	
      }
      /**
       * End of loop 
       */
    }
    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
    return 0;

    
  }
  if(myTrans!=0) {
    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
  }
  return -1;
}


int scan_update(Ndb* myNdb, 
		int parallelism,
		int column_len,
		int update_column,
		const char * column_name,
		const char * before_color,
		const char * after_color)
		
{
  
  // Scan all records exclusive and update
  // them one by one
  int                  retryAttempt = 0;
  const int            retryMax = 10;
  int updatedRows = 0;
  int check;
  NdbError              err;
  NdbConnection		*myTrans;
  NdbScanOperation	*myScanOp;

  /**
   * Loop as long as :
   *  retryMax not reached
   *  failed operations due to TEMPORARY erros
   *
   * Exit loop;
   *  retyrMax reached
   *  Permanent error (return -1)
   */
  while (true)
  {

    if (retryAttempt >= retryMax)
    {
      std::cout << "ERROR: has retried this operation " << retryAttempt 
		<< " times, failing!" << std::endl;
      return -1;
    }

    myTrans = myNdb->startTransaction();
    if (myTrans == NULL) 
    {
      const NdbError err = myNdb->getNdbError();

      if (err.status == NdbError::TemporaryError)
      {
	milliSleep(50);
	retryAttempt++;
	continue;
      }
      std::cout <<  err.message << std::endl;
      return -1;
    }

   /**
    * Get a scan operation.
    */
    myScanOp = myTrans->getNdbScanOperation("GARAGE");	
    if (myScanOp == NULL) 
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    /**
     * Define a result set for the scan.
     */ 
    NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism);
    if( rs == 0 ) {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    } 

    /**
     * Use NdbScanFilter to define a search critera
     */ 
    NdbScanFilter filter(myScanOp) ;   
    if(filter.begin(NdbScanFilter::AND) < 0  || 
       filter.eq(update_column, before_color, column_len, false) <0||
       filter.end() <0)
    {
      std::cout <<  myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }    
    
    /**
     * Start scan    (NoCommit since we are only reading at this stage);
     */     
    if(myTrans->execute(NoCommit) != 0){      
      err = myTrans->getNdbError();    
      if(err.status == NdbError::TemporaryError){
	std::cout << myTrans->getNdbError().message << std::endl;
	myNdb->closeTransaction(myTrans);
	milliSleep(50);
	continue;
      }
      std::cout << myTrans->getNdbError().code << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

   /**
    * Define an update operation
    */    
    NdbOperation * myUpdateOp;
   /**
    * start of loop: nextResult(true) means that "parallelism" number of
    * rows are fetched from NDB and cached in NDBAPI
    */    
    while((check = rs->nextResult(true)) == 0){
      do {
	/**
	 * Get update operation
	 */    
	myUpdateOp = rs->updateTuple();
	if (myUpdateOp == 0){
	  std::cout << myTrans->getNdbError().message << std::endl;
	  myNdb->closeTransaction(myTrans);
	  return -1;
	}
	updatedRows++;
	/**
	 * do the update
	 */    
	myUpdateOp->setValue(update_column,after_color);
	/**
	 * nextResult(false) means that the records 
	 * cached in the NDBAPI are modified before
	 * fetching more rows from NDB.
	 */    
      } while((check = rs->nextResult(false)) == 0);
      
      /**
       * Commit when all cached tuple have been updated
       */    
      if(check != -1){
	check = myTrans->execute(Commit);   
	myTrans->releaseCompletedOperations();
      }
      /**
       * Check for errors
       */
      err = myTrans->getNdbError();    
      if(check == -1){
	if(err.status == NdbError::TemporaryError){
	  std::cout << myTrans->getNdbError().message << std::endl;
	  myNdb->closeTransaction(myTrans);
	  milliSleep(50);
	  continue;
	}	
      }
      /**
       * End of loop 
       */
    }
    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
    return 0;

    
  }
  if(myTrans!=0) {
    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
  }
  return -1;
}



int scan_print(Ndb * myNdb, int parallelism, 
	       int column_len_brand, 
	       int column_len_color) 
{
// Scan all records exclusive and update
  // them one by one
  int                  retryAttempt = 0;
  const int            retryMax = 10;
  int fetchedRows = 0;
  int check;
  NdbError              err;
  NdbConnection		*myTrans;
  NdbScanOperation	*myScanOp;
  /* Result of reading attribute value, three columns:
     REG_NO, BRAND, and COLOR
   */
  NdbRecAttr *    	myRecAttr[3];   

  /**
   * Loop as long as :
   *  retryMax not reached
   *  failed operations due to TEMPORARY erros
   *
   * Exit loop;
   *  retyrMax reached
   *  Permanent error (return -1)
   */
  while (true)
  {

    if (retryAttempt >= retryMax)
    {
      std::cout << "ERROR: has retried this operation " << retryAttempt 
		<< " times, failing!" << std::endl;
      return -1;
    }

    myTrans = myNdb->startTransaction();
    if (myTrans == NULL) 
    {
      const NdbError err = myNdb->getNdbError();

      if (err.status == NdbError::TemporaryError)
      {
	milliSleep(50);
	retryAttempt++;
	continue;
      }
     std::cout << err.message << std::endl;
      return -1;
    }
    /*
     * Define a scan operation. 
     * NDBAPI.
     */
    myScanOp = myTrans->getNdbScanOperation("GARAGE");	
    if (myScanOp == NULL) 
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    /**
     * Define a result set for the scan.
     */ 
    NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism);
    if( rs == 0 ) {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    } 

    /**
     * Define storage for fetched attributes.
     * E.g., the resulting attributes of executing
     * myOp->getValue("REG_NO") is placed in myRecAttr[0].
     * No data exists in myRecAttr until transaction has commited!
     */
    myRecAttr[0] = myScanOp->getValue("REG_NO");
    myRecAttr[1] = myScanOp->getValue("BRAND");
    myRecAttr[2] = myScanOp->getValue("COLOR");
    if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL) 
    {
	std::cout << myTrans->getNdbError().message << std::endl;
	myNdb->closeTransaction(myTrans);
	return -1;
    }
    /**
     * Start scan   (NoCommit since we are only reading at this stage);
     */     
    if(myTrans->execute(NoCommit) != 0){      
      err = myTrans->getNdbError();    
      if(err.status == NdbError::TemporaryError){
	std::cout << myTrans->getNdbError().message << std::endl;
	myNdb->closeTransaction(myTrans);
	milliSleep(50);
	continue;
      }
      std::cout << err.code << std::endl;
      std::cout << myTrans->getNdbError().code << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }
    
    /**
     * start of loop: nextResult(true) means that "parallelism" number of
     * rows are fetched from NDB and cached in NDBAPI
     */    
    while((check = rs->nextResult(true)) == 0){
      do {
	
	fetchedRows++;
	/**
	 * print  REG_NO unsigned int
	 */
	std::cout << myRecAttr[0]->u_32_value() << "\t";
	char * buf_brand = new char[column_len_brand+1];
	char * buf_color = new char[column_len_color+1];
	/**
	 * print  BRAND character string
	 */
	memcpy(buf_brand, myRecAttr[1]->aRef(), column_len_brand);
	buf_brand[column_len_brand] = 0;
	std::cout << buf_brand << "\t";
	delete [] buf_brand;
	/**
	 * print  COLOR character string
	 */
	memcpy(buf_color, myRecAttr[2]->aRef(), column_len_color);
	buf_brand[column_len_color] = 0;
	std::cout << buf_color << std::endl;
	delete [] buf_color;	
	/**
	 * nextResult(false) means that the records 
	 * cached in the NDBAPI are modified before
	 * fetching more rows from NDB.
	 */    
      } while((check = rs->nextResult(false)) == 0);

    }    
    myNdb->closeTransaction(myTrans);
    return 1;
  }
  return -1;

}


int main()
{
  ndb_init();
  Ndb* myNdb = new Ndb( "TEST_DB" );  // Object representing the database

  
  
  /*******************************************
   * Initialize NDB and wait until its ready *
   *******************************************/
  if (myNdb->init(1024) == -1) {          // Set max 1024  parallel transactions
    APIERROR(myNdb->getNdbError());
    exit(-1);
  }

  if (myNdb->waitUntilReady(30) != 0) {
    std::cout << "NDB was not ready within 30 secs." << std::endl;
    exit(-1);
  }
  create_table(myNdb);
  
  NdbDictionary::Dictionary* myDict = myNdb->getDictionary();
  int column_color = myDict->getTable("GARAGE")->getColumn("COLOR")->getColumnNo();
  int column_len_color = 
    myDict->getTable("GARAGE")->getColumn("COLOR")->getLength();
  int column_len_brand = 
    myDict->getTable("GARAGE")->getColumn("BRAND")->getLength();
  int parallelism = 16;
  

  if(populate(myNdb) > 0)
    std::cout << "populate: Success!" << std::endl;

  if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0)
    std::cout << "scan_print: Success!" << std::endl  << std::endl;
  
  std::cout << "Going to delete all pink cars!" << std::endl;
  if(scan_delete(myNdb, parallelism, column_color,
		 column_len_color, "Pink") > 0)
    std::cout << "scan_delete: Success!" << std::endl  << std::endl;

  if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0)
    std::cout << "scan_print: Success!" << std::endl  << std::endl;
  
  std::cout << "Going to update all blue cars to black cars!" << std::endl;
  if(scan_update(myNdb, parallelism, column_len_color, column_color, 
		 "COLOR", "Blue", "Black") > 0) 
  {
    std::cout << "scan_update: Success!" << std::endl  << std::endl;
  }
  if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0)
    std::cout << "scan_print: Success!" << std::endl  << std::endl;

  delete myNdb; 
}