/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#include <ndb_global.h>
#include <my_pthread.h>

#include <SocketServer.hpp>

#include <NdbTCP.h>
#include <NdbOut.hpp>
#include <NdbThread.h>
#include <NdbSleep.h>

// Trace helper: streams x to ndbout followed by endl.
// Note that the expansion already ends with a ';'.
#define DEBUG(x) ndbout << x << endl;

/**
 * Create a server with no listening services and no worker thread.
 *
 * @param maxSessions  stored in m_maxSessions; doRun() stops accepting
 *                     while this many sessions are registered, and setup()
 *                     uses it (capped at 32) as the listen() backlog.
 *
 * The arguments 10 and 5 are forwarded to the session and service
 * containers (presumably their initial capacities -- confirm against the
 * container declaration).
 */
SocketServer::SocketServer(unsigned maxSessions) :
  m_sessions(10),
  m_services(5)
{
  m_maxSessions = maxSessions;
  m_stopThread = false;   // no stop has been requested yet
  m_thread = 0;           // accept thread is created by startServer()
}

/**
 * Destroy the server.
 *
 * Deletes every Session object still registered in m_sessions, then, for
 * every registered service, closes its listening socket and deletes the
 * Service object.
 *
 * This does not stop the accept thread or wait for session threads.
 * NOTE(review): callers presumably invoke stopServer()/stopSessions()
 * before destruction -- confirm.
 */
SocketServer::~SocketServer() {
  unsigned i;
  for(i = 0; i<m_sessions.size(); i++){
    delete m_sessions[i].m_session;
  }
  for(i = 0; i<m_services.size(); i++){
    // The listening socket was opened by setup() and is closed nowhere
    // else in this file; release it here so the descriptor is not leaked.
    if(m_services[i].m_socket != NDB_INVALID_SOCKET){
      NDB_CLOSE_SOCKET(m_services[i].m_socket);
    }
    delete m_services[i].m_service;
  }
}

bool
SocketServer::tryBind(unsigned short port, const char * intface) {
  struct sockaddr_in servaddr;
  memset(&servaddr, 0, sizeof(servaddr));
  servaddr.sin_family = AF_INET;
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  servaddr.sin_port = htons(port);
  
  if(intface != 0){
    if(Ndb_getInAddr(&servaddr.sin_addr, intface))
      return false;
  }
  
  const NDB_SOCKET_TYPE sock  = socket(AF_INET, SOCK_STREAM, 0);
  if (sock == NDB_INVALID_SOCKET) {
    return false;
  }
  
  DBUG_PRINT("info",("NDB_SOCKET: %d", sock));

  const int on = 1;
  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
		 (const char*)&on, sizeof(on)) == -1) {
    NDB_CLOSE_SOCKET(sock);
    return false;
  }
  
  if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
    NDB_CLOSE_SOCKET(sock);
    return false;
  }

  NDB_CLOSE_SOCKET(sock);
  return true;
}

/**
 * Open a listening TCP socket and register a service on it.
 *
 * Creates a socket, sets SO_REUSEADDR, binds it to (intface, port) and
 * starts listening with a backlog of min(m_maxSessions, 32). On success
 * the (socket, service) pair is appended to m_services; the service is
 * later deleted by the destructor.
 *
 * @param service  service that will create sessions for accepted sockets
 * @param port     port number in host byte order
 * @param intface  interface to bind to, resolved with Ndb_getInAddr();
 *                 0 means INADDR_ANY
 * @return true on success. On failure false is returned, any socket that
 *         was opened is closed, and the service is NOT registered (so it
 *         is not deleted by this object).
 */
bool
SocketServer::setup(SocketServer::Service * service, 
		    unsigned short port, 
		    const char * intface){
  DBUG_ENTER("SocketServer::setup");
  // intface may legitimately be 0; never hand a null pointer to "%s".
  DBUG_PRINT("enter",("interface=%s, port=%d",
		      intface ? intface : "(null)", port));
  struct sockaddr_in servaddr;
  memset(&servaddr, 0, sizeof(servaddr));
  servaddr.sin_family = AF_INET;
  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  servaddr.sin_port = htons(port);
  
  if(intface != 0){
    // Replace the wildcard address with the requested interface.
    if(Ndb_getInAddr(&servaddr.sin_addr, intface))
      DBUG_RETURN(false);
  }
  
  const NDB_SOCKET_TYPE sock  = socket(AF_INET, SOCK_STREAM, 0);
  if (sock == NDB_INVALID_SOCKET) {
    DBUG_PRINT("error",("socket() - %d - %s",
			errno, strerror(errno)));
    DBUG_RETURN(false);
  }
  
  DBUG_PRINT("info",("NDB_SOCKET: %d", sock));
 
  const int on = 1;
  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
		 (const char*)&on, sizeof(on)) == -1) {
    // Name the call that actually failed (was reported as getsockopt()).
    DBUG_PRINT("error",("setsockopt() - %d - %s",
			errno, strerror(errno)));
    NDB_CLOSE_SOCKET(sock);
    DBUG_RETURN(false);
  }
  
  if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
    DBUG_PRINT("error",("bind() - %d - %s",
			errno, strerror(errno)));
    NDB_CLOSE_SOCKET(sock);
    DBUG_RETURN(false);
  }
  
  // Backlog is capped at 32 pending connections.
  if (listen(sock, m_maxSessions > 32 ? 32 : m_maxSessions) == -1){
    DBUG_PRINT("error",("listen() - %d - %s",
			errno, strerror(errno)));
    NDB_CLOSE_SOCKET(sock);
    DBUG_RETURN(false);
  }
  
  ServiceInstance i;
  i.m_socket = sock;
  i.m_service = service;
  m_services.push_back(i);
  DBUG_RETURN(true);
}

void
SocketServer::doAccept(){
  fd_set readSet, exceptionSet;
  FD_ZERO(&readSet);
  FD_ZERO(&exceptionSet);
  
  m_services.lock();
  int maxSock = 0;
  for (unsigned i = 0; i < m_services.size(); i++){
    const NDB_SOCKET_TYPE s = m_services[i].m_socket;
    FD_SET(s, &readSet);
    FD_SET(s, &exceptionSet);
    maxSock = (maxSock > s ? maxSock : s);
  }
  struct timeval timeout;
  timeout.tv_sec  = 1;
  timeout.tv_usec = 0;
  
  if(select(maxSock + 1, &readSet, 0, &exceptionSet, &timeout) > 0){
    for (unsigned i = 0; i < m_services.size(); i++){
      ServiceInstance & si = m_services[i];
      
      if(FD_ISSET(si.m_socket, &readSet)){
	NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0);
	if(childSock == NDB_INVALID_SOCKET){
	  continue;
	}
	
	SessionInstance s;
	s.m_service = si.m_service;
	s.m_session = si.m_service->newSession(childSock);
	if(s.m_session != 0){
	  m_sessions.push_back(s);
	  startSession(m_sessions.back());
	}
	
	continue;
      }      
      
      if(FD_ISSET(si.m_socket, &exceptionSet)){
	DEBUG("socket in the exceptionSet");
	continue;
      }
    }
  }
  m_services.unlock();
}

extern "C"
void* 
socketServerThread_C(void* _ss){
  SocketServer * ss = (SocketServer *)_ss;
  ss->doRun();
  return 0;
}

/**
 * Start the accept thread unless one already exists or a stop has been
 * requested. The check and the thread creation run under m_threadLock.
 */
void
SocketServer::startServer(){
  m_threadLock.lock();
  const bool alreadyRunning = (m_thread != 0);
  if(!alreadyRunning && !m_stopThread){
    // The thread runs socketServerThread_C with this object as argument.
    m_thread = NdbThread_Create(socketServerThread_C,
                                (void**)this,
                                32768,
                                "NdbSockServ",
                                NDB_THREAD_PRIO_LOW);
  }
  m_threadLock.unlock();
}

/**
 * Ask the accept thread to stop, wait for it to finish and destroy it.
 * Does nothing if no thread is running. Runs under m_threadLock.
 *
 * m_stopThread is left set afterwards.
 */
void
SocketServer::stopServer(){
  m_threadLock.lock();
  if(m_thread == 0){
    m_threadLock.unlock();
    return;
  }

  // doRun() polls this flag and leaves its loop once it is set.
  m_stopThread = true;

  void * exitValue;
  NdbThread_WaitFor(m_thread, &exitValue);
  NdbThread_Destroy(&m_thread);
  m_thread = 0;

  m_threadLock.unlock();
}

/**
 * Main loop of the accept thread; runs until m_stopThread is set.
 *
 * Each iteration first reaps stopped sessions. It then either accepts new
 * connections, or sleeps 200 ms when the session limit is reached.
 */
void
SocketServer::doRun(){
  while(!m_stopThread){
    checkSessions();

    const bool full = (m_sessions.size() >= m_maxSessions);
    if(full){
      NdbSleep_MilliSleep(200);
    } else {
      doAccept();
    }
  }
}

/**
 * Create the thread that runs a session.
 *
 * @param si  session entry; its m_session is handed to sessionThread_C as
 *            the thread argument and its m_thread receives the result of
 *            NdbThread_Create().
 */
void
SocketServer::startSession(SessionInstance & si){
  void ** const threadArg = (void**)si.m_session;
  si.m_thread = NdbThread_Create(sessionThread_C,
                                 threadArg,
                                 32768,
                                 "NdbSock_Session",
                                 NDB_THREAD_PRIO_LOW);
}

/**
 * Hand ownership of a socket to the calling process on OSE builds.
 *
 * When NDB_OSE or NDB_SOFTOSE is defined, sets SO_OSEOWNER on the socket
 * to the current process. On all other builds this does nothing.
 *
 * @param sock  socket to take ownership of
 * @return false if the OSE setsockopt() call failed, true otherwise
 */
static
bool 
transfer(NDB_SOCKET_TYPE sock){
#if defined NDB_OSE || defined NDB_SOFTOSE
  const PROCESS owner = current_process();
  if(setsockopt(sock, SOL_SOCKET, SO_OSEOWNER, &owner, sizeof(PROCESS)) != 0){
    ndbout << "Failed to transfer ownership of socket" << endl;
    return false;
  }
#endif
  return true;
}

/**
 * Invoke a callback on every registered session, last to first, then
 * reap any sessions that have stopped.
 *
 * @param func  callback, called as func(session, data)
 * @param data  opaque pointer passed through to every callback call
 */
void
SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
{
  int idx = m_sessions.size();
  while(idx-- > 0){
    func(m_sessions[idx].m_session, data);
  }
  checkSessions();
}

void
SocketServer::checkSessions(){
  for(int i = m_sessions.size() - 1; i >= 0; i--){
    if(m_sessions[i].m_session->m_stopped){
      if(m_sessions[i].m_thread != 0){
	void* ret;
	NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
	NdbThread_Destroy(&m_sessions[i].m_thread);
      } 
      m_sessions[i].m_session->stopSession();
      delete m_sessions[i].m_session;
      m_sessions.erase(i);
    }
  }
}

void
SocketServer::stopSessions(bool wait){
  int i;
  for(i = m_sessions.size() - 1; i>=0; i--)
  {
    m_sessions[i].m_session->stopSession();
    m_sessions[i].m_session->m_stop = true; // to make sure
  }
  for(i = m_services.size() - 1; i>=0; i--)
    m_services[i].m_service->stopSessions();
  
  if(wait){
    while(m_sessions.size() > 0){
      checkSessions();
      NdbSleep_MilliSleep(100);
    }
  }
}

/***** Session code ******/

extern "C"
void* 
sessionThread_C(void* _sc){
  SocketServer::Session * si = (SocketServer::Session *)_sc;

  if(!transfer(si->m_socket)){
    si->m_stopped = true;
    return 0;
  }
  
  if(!si->m_stop){
    si->m_stopped = false;
    si->runSession();
  } else {
    NDB_CLOSE_SOCKET(si->m_socket);
  }
  
  si->m_stopped = true;
  return 0;
}

// Explicit instantiations of MutexVector for the two element types
// used by SocketServer.
template class MutexVector<SocketServer::ServiceInstance>;
template class MutexVector<SocketServer::SessionInstance>;