/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "Configuration.hpp"
#include <ErrorHandlingMacros.hpp>
#include "GlobalData.hpp"

#include <ConfigRetriever.hpp>
#include <IPCConfig.hpp>
#include <ndb_version.h>
#include <NdbMem.h>
#include <NdbOut.hpp>
#include <WatchDog.hpp>

#include <getarg.h>

#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters_debug.h>
#include <kernel_config_parameters.h>

#include <kernel_types.h>
#include <ndb_limits.h>
#include "pc.hpp"
#include <LogLevel.hpp>
#include <NdbSleep.h>

extern "C" {
  void ndbSetOwnVersion();
}

#include <EventLogger.hpp>
extern EventLogger g_eventLogger;

bool
Configuration::init(int argc, const char** argv){

  /**
   * Default values for arguments
   */
  int _no_start = 0;
  int _initial = 0;
  const char* _connect_str = NULL;
  int _deamon = 0;
  int _help = 0;
  int _print_version = 0;
  
  /**
   * Arguments to NDB process
   */ 

  struct getargs args[] = {
    { "version", 'v', arg_flag, &_print_version, "Print ndbd version", "" },
    { "nostart", 'n', arg_flag, &_no_start,
      "Don't start ndbd immediately. Ndbd will await command from ndb_mgmd", "" },
    { "daemon", 'd', arg_flag, &_deamon, "Start ndbd as daemon", "" },
    { "initial", 'i', arg_flag, &_initial,
      "Perform initial start of ndbd, e.g. clean file system. Consult documentation before using this", "" },

    { "connect-string", 'c', arg_string, &_connect_str,
      "Set connect string for connecting to ndb_mgmd. <constr>=\"host=<hostname:port>[;nodeid=<id>]\". Overides specifying entries in NDB_CONNECTSTRING and config file",
      "<constr>" },
    { "usage", '?', arg_flag, &_help, "Print help", "" }
  };
  int num_args = sizeof(args) / sizeof(args[0]);
  int optind = 0;
  char desc[] = 
    "The MySQL Cluster kernel";
  
  if(getarg(args, num_args, argc, argv, &optind) || _help) {
    arg_printusage(args, num_args, argv[0], desc);
    return false;
  }

#if 0  
  ndbout << "no_start=" <<_no_start<< endl;
  ndbout << "initial=" <<_initial<< endl;
  ndbout << "deamon=" <<_deamon<< endl;
  ndbout << "connect_str="<<_connect_str<<endl;
  arg_printusage(args, num_args, argv[0], desc);
  return false;
#endif

  ndbSetOwnVersion();

  if (_print_version) {
    ndbPrintVersion();
    return false;
  }

  // Check the start flag
  if (_no_start)
    globalData.theRestartFlag = initial_state;
  else 
    globalData.theRestartFlag = perform_start;

  // Check the initial flag
  if (_initial)
    _initialStart = true;
  
  // Check connectstring
  if (_connect_str)
    _connectString = strdup(_connect_str);
  
  // Check deamon flag
  if (_deamon)
    _daemonMode = true;

  // Save programname
  if(argc > 0 && argv[0] != 0)
    _programName = strdup(argv[0]);
  else
    _programName = strdup("");
  
  return true;
}

/**
 * Construct an empty configuration object.
 *
 * All pointer members that other methods in this file test or hand out
 * are set to null, and both start-up flags are cleared.
 */
Configuration::Configuration()
{
  _programName = 0;
  _connectString = 0;
  _fsPath = 0;
  _initialStart = false;
  _daemonMode = false;
  m_config_retriever= 0;

  // fetch_configuration() checks m_clusterConfig for non-null and free()s it,
  // so it must not start out with an indeterminate value.
  m_clusterConfig = 0;

  // Returned by the iterator getters; null until the configuration is set up.
  m_clusterConfigIter = 0;
  m_ownConfig = 0;
  m_ownConfigIterator = 0;

  // Allocated in calcSizeAlt()
  m_logLevel = 0;
}

/**
 * Release the heap storage held by this object: the program name, the
 * connect string, the file system path and the config retriever.
 */
Configuration::~Configuration(){
  if(_programName != NULL)
    free(_programName);

  // init() stores a strdup() copy of the connect string; release it as well
  if(_connectString != NULL)
    free((void *)_connectString);

  if(_fsPath != NULL)
    free(_fsPath);

  if (m_config_retriever) {
    delete m_config_retriever;
  }
}

/**
 * Destroy the config retriever (if any) and reset the member to null.
 */
void
Configuration::closeConfiguration(){
  // delete on a null pointer is a no-op, so no guard is required
  delete m_config_retriever;
  m_config_retriever = 0;
}

/**
 * Fetch the cluster configuration from the management server.
 *
 * Creates a fresh ConfigRetriever, connects, allocates a node id (stored in
 * globalData.ownId), retrieves the configuration into m_clusterConfig and
 * reads StopOnError for the own node. Every failure is reported through
 * ERROR_SET(fatal, ERR_INVALID_CONFIG, ...).
 */
void
Configuration::fetch_configuration(){
  /**
   * Fetch configuration from management server
   */
  if (m_config_retriever) {
    delete m_config_retriever;
  }

  m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_DB);
  m_config_retriever->setConnectString(_connectString ? _connectString : "");
  if(m_config_retriever->init() == -1 ||
     m_config_retriever->do_connect() == -1){
    
    const char * s = m_config_retriever->getErrorString();
    if(s == 0)
      s = "No error given!";
    
    /* Set stop on error to true otherwise NDB will
       go into an restart loop...
    */
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Could not connect to ndb_mgmd", s);
  }
  
  ConfigRetriever &cr= *m_config_retriever;
  
  /**
   * Allocate a node id; on failure retry up to three more times,
   * sleeping three seconds before each retry
   */
  globalData.ownId = cr.allocNodeId();
  if(globalData.ownId == 0){
    for(Uint32 i = 0; i<3; i++){
      NdbSleep_SecSleep(3);
      globalData.ownId = cr.allocNodeId();
      if(globalData.ownId != 0)
	break;
    }
  }
  
  if(globalData.ownId == 0){
    // Same null guard as the other error paths in this function
    const char * s = cr.getErrorString();
    if(s == 0)
      s = "No error given!";

    ERROR_SET(fatal, ERR_INVALID_CONFIG, 
	      "Unable to alloc node id", s);
  }
  
  ndb_mgm_configuration * p = cr.getConfig();
  if(p == 0){
    const char * s = cr.getErrorString();
    if(s == 0)
      s = "No error given!";
    
    /* Set stop on error to true otherwise NDB will
       go into an restart loop...
    */
    
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Could not fetch configuration"
	      "/invalid configuration", s);
  }

  // Replace any configuration kept from an earlier fetch.
  // NOTE(review): relies on m_clusterConfig being null or a heap pointer
  // here - confirm it is initialised before the first call.
  if(m_clusterConfig)
    free(m_clusterConfig);
  
  m_clusterConfig = p;
  
  // Position on the own node's section; a non-zero result is treated as failure
  ndb_mgm_configuration_iterator iter(* p, CFG_SECTION_NODE);
  if (iter.find(CFG_NODE_ID, globalData.ownId)){
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "DB missing");
  }
  
  if(iter.get(CFG_DB_STOP_ON_ERROR, &_stopOnError)){
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", 
	      "StopOnError missing");
  }
}

void
Configuration::setupConfiguration(){
  ndb_mgm_configuration * p = m_clusterConfig;

  /**
   * Configure transporters
   */
  {  
    int res = IPCConfig::configureTransporters(globalData.ownId,
					       * p, 
					       globalTransporterRegistry);
    if(res <= 0){
      ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", 
		"No transporters configured");
    }
  }

  /**
   * Setup cluster configuration data
   */
  ndb_mgm_configuration_iterator iter(* p, CFG_SECTION_NODE);
  if (iter.find(CFG_NODE_ID, globalData.ownId)){
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "DB missing");
  }

  unsigned type;
  if(!(iter.get(CFG_TYPE_OF_SECTION, &type) == 0 && type == NODE_TYPE_DB)){
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched",
	      "I'm wrong type of node");
  }
  
  if(iter.get(CFG_DB_NO_SAVE_MSGS, &_maxErrorLogs)){
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", 
	      "MaxNoOfSavedMessages missing");
  }
  
  if(iter.get(CFG_DB_MEMLOCK, &_lockPagesInMainMemory)){
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", 
	      "LockPagesInMainMemory missing");
  }

  if(iter.get(CFG_DB_WATCHDOG_INTERVAL, &_timeBetweenWatchDogCheck)){
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", 
	      "TimeBetweenWatchDogCheck missing");
  }

  /**
   * Get filesystem path
   */  
  { 
    const char* pFileSystemPath = NULL;
    if(iter.get(CFG_DB_FILESYSTEM_PATH, &pFileSystemPath)){
      ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", 
		"FileSystemPath missing");
    } 
    
    if(pFileSystemPath == 0 || strlen(pFileSystemPath) == 0){
      ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", 
		"Configuration does not contain valid filesystem path");
    }
    
    if(pFileSystemPath[strlen(pFileSystemPath) - 1] == '/')
      _fsPath = strdup(pFileSystemPath);
    else {
      _fsPath = (char *)malloc(strlen(pFileSystemPath) + 2);
      strcpy(_fsPath, pFileSystemPath);
      strcat(_fsPath, "/");
    }
  }
  
  if(iter.get(CFG_DB_STOP_ON_ERROR_INSERT, &m_restartOnErrorInsert)){
    ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", 
	      "RestartOnErrorInsert missing");
  }

  /**
   * Create the watch dog thread
   */
  { 
    Uint32 t = _timeBetweenWatchDogCheck;
    t = globalEmulatorData.theWatchDog ->setCheckInterval(t);
    _timeBetweenWatchDogCheck = t;
  }
  
  ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config);

  m_clusterConfigIter = ndb_mgm_create_configuration_iterator
    (p, CFG_SECTION_NODE);

  calcSizeAlt(cf);
}

/**
 * @return the LockPagesInMainMemory setting (_lockPagesInMainMemory,
 *         read from CFG_DB_MEMLOCK in setupConfiguration()) as a bool
 */
bool 
Configuration::lockPagesInMainMemory() const
{
  const bool lockPages = _lockPagesInMainMemory;
  return lockPages;
}

/**
 * @return the currently stored watch dog check interval
 */
int 
Configuration::timeBetweenWatchDogCheck() const
{
  const int interval = _timeBetweenWatchDogCheck;
  return interval;
}

void 
Configuration::timeBetweenWatchDogCheck(int value) {
  _timeBetweenWatchDogCheck = value;
}

/**
 * @return the stored MaxNoOfSavedMessages value (_maxErrorLogs)
 */
int 
Configuration::maxNoOfErrorLogs() const
{
  const int maxLogs = _maxErrorLogs;
  return maxLogs;
}

void 
Configuration::maxNoOfErrorLogs(int val){
  _maxErrorLogs = val;
}

/**
 * @return the StopOnError setting (_stopOnError, read from
 *         CFG_DB_STOP_ON_ERROR in fetch_configuration()) as a bool
 */
bool
Configuration::stopOnError() const
{
  const bool stop = _stopOnError;
  return stop;
}

void 
Configuration::stopOnError(bool val){
  _stopOnError = val;
}

/**
 * @return the stored RestartOnErrorInsert value (m_restartOnErrorInsert)
 */
int
Configuration::getRestartOnErrorInsert() const
{
  const int restartMode = m_restartOnErrorInsert;
  return restartMode;
}

void
Configuration::setRestartOnErrorInsert(int i){
  m_restartOnErrorInsert = i;
}

/**
 * Duplicate the stored connect string.
 *
 * @return a strdup() copy of _connectString, or 0 when no connect string
 *         is stored
 */
char *
Configuration::getConnectStringCopy() const
{
  return (_connectString == 0) ? 0 : strdup(_connectString);
}

/**
 * @return the iterator stored in m_ownConfigIterator (created at the end
 *         of calcSizeAlt())
 */
const ndb_mgm_configuration_iterator * 
Configuration::getOwnConfigIterator() const
{
  const ndb_mgm_configuration_iterator * ownIter = m_ownConfigIterator;
  return ownIter;
}
  
/**
 * @return the iterator stored in m_clusterConfigIter (created in
 *         setupConfiguration())
 */
ndb_mgm_configuration_iterator * 
Configuration::getClusterConfigIterator() const
{
  ndb_mgm_configuration_iterator * clusterIter = m_clusterConfigIter;
  return clusterIter;
}

/**
 * Derive the per-block size values from the own node's configuration.
 *
 * Reads the mandatory sizing parameters and the data/index memory sizes
 * from ownConfig, applies any configured log levels to a new LogLevel
 * object, validates every node section reachable through
 * m_clusterConfigIter, and then writes the computed CFG_ACC_*, CFG_DICT_*,
 * CFG_DIH_*, CFG_LQH_*, CFG_TC_*, CFG_TUP_* and CFG_TUX_* values through a
 * ConfigValuesFactory built on ownConfig. The result is published in
 * m_ownConfig together with a new iterator in m_ownConfigIterator.
 *
 * Any missing or invalid input is reported through
 * ERROR_SET(fatal, ERR_INVALID_CONFIG, ...).
 *
 * @param ownConfig  configuration values of this node's section
 */
void
Configuration::calcSizeAlt(ConfigValues * ownConfig){
  const char * msg = "Invalid configuration fetched";
  char buf[255];  // scratch buffer for formatted error details

  // Values read from ownConfig (through tmp[] below) and node counts
  // tallied while walking the cluster node sections
  unsigned int noOfTables = 0;
  unsigned int noOfIndexes = 0;
  unsigned int noOfReplicas = 0;
  unsigned int noOfDBNodes = 0;
  unsigned int noOfAPINodes = 0;
  unsigned int noOfMGMNodes = 0;
  unsigned int noOfNodes = 0;
  unsigned int noOfAttributes = 0;
  unsigned int noOfOperations = 0;
  unsigned int noOfTransactions = 0;
  unsigned int noOfIndexPages = 0;
  unsigned int noOfDataPages = 0;
  unsigned int noOfScanRecords = 0;
  // NOTE(review): a LogLevel is allocated on every call and any earlier
  // object in m_logLevel is not released here - confirm this runs once
  m_logLevel = new LogLevel();
  
  /**
   * {"NoOfConcurrentCheckpointsDuringRestart", &cd.ispValues[1][5] },
   * {"NoOfConcurrentCheckpointsAfterRestart", &cd.ispValues[2][4] },
   * {"NoOfConcurrentProcessesHandleTakeover", &cd.ispValues[1][7] },
   * {"TimeToWaitAlive", &cd.ispValues[0][0] },
   */
  // Table mapping each mandatory parameter id to the local that receives it
  struct AttribStorage { int paramId; Uint32 * storage; };
  AttribStorage tmp[] = {
    { CFG_DB_NO_SCANS, &noOfScanRecords },
    { CFG_DB_NO_TABLES, &noOfTables },
    { CFG_DB_NO_INDEXES, &noOfIndexes },
    { CFG_DB_NO_REPLICAS, &noOfReplicas },
    { CFG_DB_NO_ATTRIBUTES, &noOfAttributes },
    { CFG_DB_NO_OPS, &noOfOperations },
    { CFG_DB_NO_TRANSACTIONS, &noOfTransactions }
#if 0
    { "NoOfDiskPagesToDiskDuringRestartTUP", &cd.ispValues[3][8] },
    { "NoOfDiskPagesToDiskAfterRestartTUP", &cd.ispValues[3][9] },
    { "NoOfDiskPagesToDiskDuringRestartACC", &cd.ispValues[3][10] },
    { "NoOfDiskPagesToDiskAfterRestartACC", &cd.ispValues[3][11] },
#endif
  };

  // Iterator over the own node's values
  ndb_mgm_configuration_iterator db(*(ndb_mgm_configuration*)ownConfig, 0);
  
  // Every entry of tmp[] is mandatory: a non-zero result is fatal
  const int sz = sizeof(tmp)/sizeof(AttribStorage);
  for(int i = 0; i<sz; i++){
    if(ndb_mgm_get_int_parameter(&db, tmp[i].paramId, tmp[i].storage)){
      snprintf(buf, sizeof(buf), "ConfigParam: %d not found", tmp[i].paramId);
      ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf);
    }
  }

  // Memory sizes: the return values are ignored, a value still at zero
  // afterwards is reported as "not found"
  Uint64 indexMem = 0, dataMem = 0;
  ndb_mgm_get_int64_parameter(&db, CFG_DB_DATA_MEM, &dataMem);
  ndb_mgm_get_int64_parameter(&db, CFG_DB_INDEX_MEM, &indexMem);
  if(dataMem == 0){
    snprintf(buf, sizeof(buf), "ConfigParam: %d not found", CFG_DB_DATA_MEM);
    ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf);
  }

  if(indexMem == 0){
    snprintf(buf, sizeof(buf), "ConfigParam: %d not found", CFG_DB_INDEX_MEM);
    ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf);
  }

  // Convert bytes to pages of 8192 bytes (presumably the page size - TODO
  // confirm); the 64-bit quotient is narrowed to unsigned int here
  noOfDataPages = (dataMem / 8192);
  noOfIndexPages = (indexMem / 8192);

  // Log levels are optional: only categories present in the configuration
  // are applied. Note that this local tmp shadows the tmp[] table above.
  for(unsigned j = 0; j<LogLevel::LOGLEVEL_CATEGORIES; j++){
    Uint32 tmp;
    if(!ndb_mgm_get_int_parameter(&db, LogLevel::MIN_LOGLEVEL_ID+j, &tmp)){
      m_logLevel->setLogLevel((LogLevel::EventCategory)j, tmp);
    }
  }
  
  // tmp
  ndb_mgm_configuration_iterator * p = m_clusterConfigIter;

  // Walk every node section: validate id and type, reject duplicate ids
  // and count the nodes per type
  Uint32 nodeNo = noOfNodes = 0;
  NodeBitmask nodes;  // node ids seen so far
  for(ndb_mgm_first(p); ndb_mgm_valid(p); ndb_mgm_next(p), nodeNo++){
    
    Uint32 nodeId;
    Uint32 nodeType;
    
    if(ndb_mgm_get_int_parameter(p, CFG_NODE_ID, &nodeId)){
      ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, "Node data (Id) missing");
    }
    
    if(ndb_mgm_get_int_parameter(p, CFG_TYPE_OF_SECTION, &nodeType)){
      ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, "Node data (Type) missing");
    }
    
    // Accepted ids are 1..MAX_NODES inclusive
    if(nodeId > MAX_NODES || nodeId == 0){
      snprintf(buf, sizeof(buf),
	       "Invalid node id: %d", nodeId);
      ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf);
    }
    
    if(nodes.get(nodeId)){
      snprintf(buf, sizeof(buf), "Two node can not have the same node id: %d",
	       nodeId);
      ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf);
    }
    nodes.set(nodeId);
        
    switch(nodeType){
    case NODE_TYPE_DB:
      noOfDBNodes++; // No of NDB processes
      
      // DB nodes have the tighter upper bound MAX_NDB_NODES
      if(nodeId > MAX_NDB_NODES){
	snprintf(buf, sizeof(buf), "Maximum node id for a ndb node is: %d", 
		 MAX_NDB_NODES);
	ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf);
      }
      break;
    case NODE_TYPE_API:
      noOfAPINodes++; // No of API processes
      break;
    case NODE_TYPE_REP:
      // accepted, not counted
      break;
    case NODE_TYPE_MGM:
      noOfMGMNodes++; // No of MGM processes
      break;
    case NODE_TYPE_EXT_REP:
      // accepted, not counted
      break;
    default:
      snprintf(buf, sizeof(buf), "Unknown node type: %d", nodeType);
      ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf);
    }
  }
  noOfNodes = nodeNo;
  
  /**
   * Do size calculations
   */
  ConfigValuesFactory cfg(ownConfig);

  // Adjust the configured counts before they are used in the formulas below
  noOfTables++;         		 // One more table, for the system table
  noOfTables += noOfIndexes; // Indexes are tables too
  noOfAttributes += 2;  // Two more attributes (see the system table note above)
  noOfTables *= 2;      // Doubled: per the original note, Dict needs 2 ids for each table

  // Only the sizing uses this capped value: at most 15 DB nodes are counted
  if (noOfDBNodes > 15) {
    noOfDBNodes = 15;
  }//if
  Uint32 noOfLocalScanRecords = (noOfDBNodes * noOfScanRecords) + 1;
  Uint32 noOfTCScanRecords = noOfScanRecords;

  {
    /**
     * Acc Size Alt values
     */
    // Can keep 65536 pages (= 0.5 GByte)
    cfg.put(CFG_ACC_DIR_RANGE, 
	    4 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas); 
    
    cfg.put(CFG_ACC_DIR_ARRAY,
	    (noOfIndexPages >> 8) + 
	    4 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas);
    
    cfg.put(CFG_ACC_FRAGMENT,
	    2 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas);
    
    /*-----------------------------------------------------------------------*/
    // The extra operation records added are used by the scan and node 
    // recovery process. 
    // Node recovery process will have its operations dedicated to ensure
    // that they never have a problem with allocation of the operation record.
    // The remainder are allowed for use by the scan processes.
    /*-----------------------------------------------------------------------*/
    cfg.put(CFG_ACC_OP_RECS,
	    ((11 * noOfOperations) / 10 + 50) + 
	    (noOfLocalScanRecords * MAX_PARALLEL_SCANS_PER_FRAG) +
	    NODE_RECOVERY_SCAN_OP_RECORDS);
    
    cfg.put(CFG_ACC_OVERFLOW_RECS,
	    noOfIndexPages + 
	    2 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas);
    
    cfg.put(CFG_ACC_PAGE8, 
	    noOfIndexPages + 32);
    
    cfg.put(CFG_ACC_ROOT_FRAG, 
	    NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas);
    
    cfg.put(CFG_ACC_TABLE, noOfTables);
    
    cfg.put(CFG_ACC_SCAN, noOfLocalScanRecords);
  }
  
  {
    /**
     * Dict Size Alt values
     */
    cfg.put(CFG_DICT_ATTRIBUTE, 
	    noOfAttributes);

    cfg.put(CFG_DICT_TABLE, 
	    noOfTables);
  }
  
  {
    /**
     * Dih Size Alt values
     */
    cfg.put(CFG_DIH_API_CONNECT, 
	    2 * noOfTransactions);
    
    cfg.put(CFG_DIH_CONNECT, 
	    noOfOperations + noOfTransactions + 46);
    
    cfg.put(CFG_DIH_FRAG_CONNECT, 
	    NO_OF_FRAG_PER_NODE *  noOfTables *  noOfDBNodes);
    
    // Multiplier for CFG_DIH_MORE_NODES: 1 when noOfReplicas - 2 is
    // negative, otherwise noOfReplicas - 1
    int temp;
    temp = noOfReplicas - 2;
    if (temp < 0)
      temp = 1;
    else
      temp++;
    cfg.put(CFG_DIH_MORE_NODES, 
	    temp * NO_OF_FRAG_PER_NODE *
	    noOfTables *  noOfDBNodes);
    
    cfg.put(CFG_DIH_REPLICAS, 
	    NO_OF_FRAG_PER_NODE * noOfTables *
	    noOfDBNodes * noOfReplicas);

    cfg.put(CFG_DIH_TABLE, 
	    noOfTables);
  }
  
  {
    /**
     * Lqh Size Alt values
     */
    cfg.put(CFG_LQH_FRAG, 
	    NO_OF_FRAG_PER_NODE * noOfTables * noOfReplicas);
    
    cfg.put(CFG_LQH_TABLE, 
	    noOfTables);

    cfg.put(CFG_LQH_TC_CONNECT, 
	    (11 * noOfOperations) / 10 + 50);
    
    cfg.put(CFG_LQH_SCAN, 
	    noOfLocalScanRecords);
  }
  
  {
    /**
     * Tc Size Alt values
     */
    cfg.put(CFG_TC_API_CONNECT, 
	    3 * noOfTransactions);
    
    cfg.put(CFG_TC_TC_CONNECT, 
	    (2 * noOfOperations) + 16 + noOfTransactions);
    
    cfg.put(CFG_TC_TABLE, 
	    noOfTables);
    
    cfg.put(CFG_TC_LOCAL_SCAN, 
	    noOfLocalScanRecords);
    
    cfg.put(CFG_TC_SCAN, 
	    noOfTCScanRecords);
  }
  
  {
    /**
     * Tup Size Alt values
     */
    cfg.put(CFG_TUP_FRAG, 
	    2 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas);
    
    cfg.put(CFG_TUP_OP_RECS, 
	    (11 * noOfOperations) / 10 + 50);
    
    cfg.put(CFG_TUP_PAGE, 
	    noOfDataPages);
    
    cfg.put(CFG_TUP_PAGE_RANGE, 
	    4 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas);
    
    cfg.put(CFG_TUP_TABLE, 
	    noOfTables);
    
    cfg.put(CFG_TUP_TABLE_DESC, 
	    4 * NO_OF_FRAG_PER_NODE * noOfAttributes* noOfReplicas +
	    12 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas );
    
    cfg.put(CFG_TUP_STORED_PROC,
	    noOfLocalScanRecords);
  }

  {
    /**
     * Tux Size Alt values
     */
    cfg.put(CFG_TUX_INDEX, 
	    noOfTables);
    
    cfg.put(CFG_TUX_FRAGMENT, 
	    2 * NO_OF_FRAG_PER_NODE * noOfTables * noOfReplicas);
    
    cfg.put(CFG_TUX_ATTRIBUTE, 
	    noOfIndexes * 4);

    cfg.put(CFG_TUX_SCAN_OP, noOfLocalScanRecords); 
  }

  // Publish the extended value set and an iterator over it
  m_ownConfig = (ndb_mgm_configuration*)cfg.getConfigValues();
  m_ownConfigIterator = ndb_mgm_create_configuration_iterator
    (m_ownConfig, 0);
}

void
Configuration::setInitialStart(bool val){
  _initialStart = val;
}