/* Copyright (C) 2005 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <ndb_opts.h>
#include <NdbApi.hpp>
#include <NdbTest.hpp>
#include <my_sys.h>
#include <ndb_version.h>

#if NDB_VERSION_D < MAKE_VERSION(5, 1, 0)
#define version50
#else
#undef version50
#endif

#if !defined(min) || !defined(max)
#define min(x, y) ((x) < (y) ? (x) : (y))
#define max(x, y) ((x) > (y) ? (x) : (y))
#endif

/*
 * Test composite operations on same PK via events.  The merge of event
 * data can happen in 2 places:
 *
 * 1) In TUP at commit, the detached triggers report a single composite
 * operation and its post/pre data
 *
 * 2) In event API version >= 5.1 separate commits within same GCI are
 * by default merged.  This is required to read blob data via NdbBlob.
 *
 * This test program ignores Blob columns in version 5.0.
 *
 * There are 5 ways (ignoring NUL operand) to compose 2 ops:
 *                      5.0 bugs        5.1 bugs
 * INS o DEL = NUL
 * INS o UPD = INS                      type=INS
 * DEL o INS = UPD      type=INS        type=INS
 * UPD o DEL = DEL      no event
 * UPD o UPD = UPD
 */

struct Opts {
  my_bool abort_on_error;
  int loglevel;
  uint loop;
  uint maxops;
  uint maxpk;
  const char* opstr;
  uint seed;
  my_bool separate_events;
  my_bool use_table;
};

static Opts g_opts;
static const uint g_maxops = 10000;
static const uint g_maxpk = 100;

static Ndb_cluster_connection* g_ncc = 0;
static Ndb* g_ndb = 0;
static NdbDictionary::Dictionary* g_dic = 0;
static NdbTransaction* g_con = 0;
static NdbOperation* g_op = 0;

static const char* g_tabname = "tem1";
static const char* g_evtname = "tem1ev1";
static const uint g_charlen = 5;
static const char* g_charval = "abcde";
static const char* g_csname = "latin1_swedish_ci";

static const NdbDictionary::Table* g_tab = 0;
static const NdbDictionary::Event* g_evt = 0;

static NdbEventOperation* g_evt_op = 0;

static uint
urandom(uint n)
{
  uint r = (uint)random();
  if (n != 0)
    r = r % n;
  return r;
}

static int& g_loglevel = g_opts.loglevel; // default log level

#define chkdb(x) \
  do { if (x) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; errdb(); if (g_opts.abort_on_error) abort(); return -1; } while (0)

#define chkrc(x) \
  do { if (x) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; if (g_opts.abort_on_error) abort(); return -1; } while (0)

#define reqrc(x) \
  do { if (x) break; ndbout << "line " << __LINE__ << " ASSERT " << #x << endl; abort(); } while (0)

#define ll0(x) \
  do { if (g_loglevel < 0) break; ndbout << x << endl; } while (0)

#define ll1(x) \
  do { if (g_loglevel < 1) break; ndbout << x << endl; } while (0)

#define ll2(x) \
  do { if (g_loglevel < 2) break; ndbout << x << endl; } while (0)

static void
errdb()
{
  uint any = 0;
  if (g_ndb != 0) {
    const NdbError& e = g_ndb->getNdbError();
    if (e.code != 0)
      ll0(++any << " ndb: error " << e);
  }
  if (g_dic != 0) {
    const NdbError& e = g_dic->getNdbError();
    if (e.code != 0)
      ll0(++any << " dic: error " << e);
  }
  if (g_con != 0) {
    const NdbError& e = g_con->getNdbError();
    if (e.code != 0)
      ll0(++any << " con: error " << e);
  }
  if (g_op != 0) {
    const NdbError& e = g_op->getNdbError();
    if (e.code != 0)
      ll0(++any << " op: error " << e);
  }
  if (g_evt_op != 0) {
    const NdbError& e = g_evt_op->getNdbError();
    if (e.code != 0)
      ll0(++any << " evt_op: error " << e);
  }
  if (! any)
    ll0("unknown db error");
}

struct Col {
  uint no;
  const char* name;
  NdbDictionary::Column::Type type;
  bool pk;
  bool nullable;
  uint length;
  uint size;
};

static Col g_col[] = {
  { 0, "pk1", NdbDictionary::Column::Unsigned, true, false, 1, 4 },
  { 1, "pk2", NdbDictionary::Column::Char, true, false,  g_charlen, g_charlen },
  { 2, "seq", NdbDictionary::Column::Unsigned,  false, false, 1, 4 },
  { 3, "cc1", NdbDictionary::Column::Char, false, true, g_charlen, g_charlen }
};

static const uint g_ncol = sizeof(g_col)/sizeof(g_col[0]);

static const Col&
getcol(uint i)
{
  if (i < g_ncol)
    return g_col[i];
  assert(false);
  return g_col[g_ncol];
}

static const Col&
getcol(const char* name)
{
  uint i;
  for (i = 0; i < g_ncol; i++)
    if (strcmp(g_col[i].name, name) == 0)
      break;
  return getcol(i);
}

static int
createtable()
{
  g_tab = 0;
  NdbDictionary::Table tab(g_tabname);
  tab.setLogging(false);
  CHARSET_INFO* cs;
  chkrc((cs = get_charset_by_name(g_csname, MYF(0))) != 0);
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = g_col[i];
    NdbDictionary::Column col(c.name);
    col.setType(c.type);
    col.setPrimaryKey(c.pk);
    if (! c.pk)
      col.setNullable(true);
    col.setLength(c.length);
    switch (c.type) {
    case NdbDictionary::Column::Unsigned:
      break;
    case NdbDictionary::Column::Char:
      col.setLength(c.length);
      col.setCharset(cs);
      break;
    default:
      assert(false);
      break;
    }
    tab.addColumn(col);
  }
  g_dic = g_ndb->getDictionary();
  if (! g_opts.use_table) {
    if (g_dic->getTable(g_tabname) != 0)
      chkdb(g_dic->dropTable(g_tabname) == 0);
    chkdb(g_dic->createTable(tab) == 0);
  }
  chkdb((g_tab = g_dic->getTable(g_tabname)) != 0);
  g_dic = 0;
  if (! g_opts.use_table) {
    // extra row for GCI probe
    chkdb((g_con = g_ndb->startTransaction()) != 0);
    chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
    chkdb(g_op->insertTuple() == 0);
    Uint32 pk1;
    char pk2[g_charlen + 1];
    pk1 = g_maxpk;
    sprintf(pk2, "%-*u", g_charlen, pk1);
    chkdb(g_op->equal("pk1", (char*)&pk1) == 0);
    chkdb(g_op->equal("pk2", (char*)&pk2[0]) == 0);
    chkdb(g_con->execute(Commit) == 0);
    g_ndb->closeTransaction(g_con);
    g_op = 0;
    g_con = 0;
  }
  return 0;
}

static int
droptable()
{
  if (! g_opts.use_table) {
    g_dic = g_ndb->getDictionary();
    chkdb(g_dic->dropTable(g_tab->getName()) == 0);
    g_tab = 0;
    g_dic = 0;
  }
  return 0;
}

struct Data {
  Uint32 pk1;
  char pk2[g_charlen + 1];
  Uint32 seq;
  char cc1[g_charlen + 1];
  void* ptr[g_ncol];
  int ind[g_ncol]; // -1 = no data, 1 = NULL, 0 = not NULL
  void init() {
    uint i;
    pk1 = 0;
    memset(pk2, 0, sizeof(pk2));
    seq = 0;
    memset(cc1, 0, sizeof(cc1));
    ptr[0] = &pk1;
    ptr[1] = pk2;
    ptr[2] = &seq;
    ptr[3] = cc1;
    for (i = 0; i < g_ncol; i++)
      ind[i] = -1;
  }
};

static int
cmpdata(const Data& d1, const Data& d2)
{
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = getcol(i);
    if (d1.ind[i] != d2.ind[i])
      return 1;
    if (d1.ind[i] == 0 && memcmp(d1.ptr[i], d2.ptr[i], c.size) != 0)
      return 1;
  }
  return 0;
}

static int
cmpdata(const Data (&d1)[2], const Data (&d2)[2])
{
  if (cmpdata(d1[0], d2[0]) != 0)
    return 1;
  if (cmpdata(d1[1], d2[1]) != 0)
    return 1;
  return 0;
}

static NdbOut&
operator<<(NdbOut& out, const Data& d)
{
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = getcol(i);
    out << (i == 0 ? "" : " ") << c.name << "=";
    if (d.ind[i] == -1)
      continue;
    if (d.ind[i] == 1) {
      out << "NULL";
      continue;
    }
    switch (c.type) {
    case NdbDictionary::Column::Unsigned:
      out << *(Uint32*)d.ptr[i];
      break;
    case NdbDictionary::Column::Char:
      {
        char buf[g_charlen + 1];
        memcpy(buf, d.ptr[i], g_charlen);
        uint n = g_charlen;
        while (1) {
          buf[n] = 0;
          if (n == 0 || buf[n - 1] != 0x20)
            break;
          n--;
        }
        out << "'" << buf << "'";
      }
      break;
    default:
      out << "?";
      break;
    }
  }
  return out;
}

static const uint g_optypes = 3; // real ops 0-2

/*
 * Represents single or composite operation or received event.  The
 * post/pre data is either computed here for operations or received from
 * the event.
 */
struct Op { // single or composite
  enum Kind { OP = 1, EV = 2 };
  enum Type { NUL = -1, INS, DEL, UPD };
  Kind kind;
  Type type;
  Op* next_op; // within one commit
  Op* next_com; // next commit chain or next event
  uint num_op;
  uint num_com;
  Data data[2]; // 0-post 1-pre
  bool match; // matched to event
  void init() {
    assert(kind == OP || kind == EV);
    type = NUL;
    next_op = next_com = 0;
    num_op = num_com = 0;
    data[0].init();
    data[1].init();
    match = false;
  }
};

static NdbOut&
operator<<(NdbOut& out, Op::Type t)
{
  switch (t) {
  case Op::NUL:
    out << "NUL";
    break;
  case Op::INS:
    out << "INS";
    break;
  case Op::DEL:
    out << "DEL";
    break;
  case Op::UPD:
    out << "UPD";
    break;
  default:
    out << (int)t;
    break;
  }
  return out;
}

static NdbOut&
operator<<(NdbOut& out, const Op& op)
{
  out << op.type;
  out << " " << op.data[0];
  out << " [" << op.data[1] << "]";
  return out;
}

static int
seteventtype(Op* ev, NdbDictionary::Event::TableEvent te)
{
  Op::Type t = Op::NUL;
  switch (te) {
  case NdbDictionary::Event::TE_INSERT:
    t = Op::INS;
    break;
  case NdbDictionary::Event::TE_DELETE:
    t = Op::DEL;
    break;
  case NdbDictionary::Event::TE_UPDATE:
    t = Op::UPD;
    break;
  default:
    ll0("EVT: " << *ev << ": bad event type" << (int)te);
    return -1;
  }
  ev->type = t;
  return 0;
}

static uint g_usedops = 0;
static uint g_usedevs = 0;
static Op g_oplist[g_maxops];
static Op g_evlist[g_maxops];
static uint g_maxcom = 8; // max ops per commit

static Op* g_pk_op[g_maxpk];
static Op* g_pk_ev[g_maxpk];
static uint g_seq = 0;

static NdbRecAttr* g_ra[2][g_ncol]; // 0-post 1-pre
static Op* g_rec_ev;
static uint g_ev_cnt[g_maxpk];

static uint
getfreeops()
{
  assert(g_opts.maxops >= g_usedops);
  return g_opts.maxops - g_usedops;
}

static uint
getfreeevs()
{
  assert(g_opts.maxops >= g_usedevs);
  return g_opts.maxops - g_usedevs;
}

static Op*
getop()
{
  if (g_usedops < g_opts.maxops) {
    Op* op = &g_oplist[g_usedops++];
    op->kind = Op::OP;
    op->init();
    return op;
  }
  assert(false);
  return 0;
}

static Op*
getev()
{
  if (g_usedevs < g_opts.maxops) {
    Op* ev = &g_evlist[g_usedevs++];
    ev->kind = Op::EV;
    ev->init();
    return ev;
  }
  assert(false);
  return 0;
}

static void
resetmem()
{
  int i, j;
  for (j = 0; j < 2; j++)
    for (i = 0; i < g_ncol; i++)
      g_ra[j][i] = 0;
  g_rec_ev = 0;
  for (i = 0; i < g_opts.maxpk; i++)
    g_pk_op[i] = 0;
  for (i = 0; i < g_opts.maxpk; i++)
    g_ev_cnt[i] = 0;
  g_seq = 0;
  g_usedops = 0;
  g_usedevs = 0;
}

struct Comp {
  Op::Type t1, t2, t3;
};

static Comp
g_comp[] = {
  { Op::INS, Op::DEL, Op::NUL },
  { Op::INS, Op::UPD, Op::INS },
  { Op::DEL, Op::INS, Op::UPD },
  { Op::UPD, Op::DEL, Op::DEL },
  { Op::UPD, Op::UPD, Op::UPD }
};

static const uint g_ncomp = sizeof(g_comp)/sizeof(g_comp[0]);

static int
checkop(const Op* op, Uint32& pk1)
{
  const Data (&d)[2] = op->data;
  Op::Type t = op->type;
  chkrc(t == Op::NUL || t == Op::INS || t == Op::DEL || t == Op::UPD);
  { const Col& c = getcol("pk1");
    chkrc(d[0].ind[c.no] == 0);
    pk1 = d[0].pk1;
    chkrc(pk1 < g_opts.maxpk);
  }
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = getcol(i);
    if (t != Op::NUL) {
      if (c.pk) {
        chkrc(d[0].ind[i] == 0); // even DEL has PK in post data
        if (t == Op::INS) {
          chkrc(d[1].ind[i] == -1);
        } else if (t == Op::DEL) {
          chkrc(d[1].ind[i] == -1);
        } else {
          chkrc(d[1].ind[i] == 0);
        }
      } else {
        if (t == Op::INS) {
          chkrc(d[0].ind[i] >= 0);
          chkrc(d[1].ind[i] == -1);
        } else if (t == Op::DEL) {
          chkrc(d[0].ind[i] == -1);
          chkrc(d[1].ind[i] >= 0);
        } else if (op->kind == Op::OP) {
          chkrc(d[0].ind[i] >= 0);
          chkrc(d[1].ind[i] >= 0);
        }
      }
    }
  }
  return 0;
}

static Comp*
comptype(Op::Type t1, Op::Type t2) // only non-NUL
{
  uint i;
  for (i = 0; i < g_ncomp; i++)
    if (g_comp[i].t1 == t1 && g_comp[i].t2 == t2)
      return &g_comp[i];
  return 0;
}

static void
copycol(const Col& c, const Data& d1, Data& d3)
{
  if ((d3.ind[c.no] = d1.ind[c.no]) != -1)
    memmove(d3.ptr[c.no], d1.ptr[c.no], c.size);
}

static void
copydata(const Data& d1, Data& d3, bool pk, bool nonpk)
{
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = g_col[i];
    if (c.pk && pk || ! c.pk && nonpk)
      copycol(c, d1, d3);
  }
}

// not needed for ops
static void
compdata(const Data& d1, const Data& d2, Data& d3, bool pk, bool nonpk)
{
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = g_col[i];
    if (c.pk && pk || ! c.pk && nonpk) {
      const Data* d = 0;
      if (d1.ind[i] == -1 && d2.ind[i] == -1)
        d3.ind[i] = -1;
      else if (d1.ind[i] == -1 && d2.ind[i] != -1)
        d = &d2;
      else if (d1.ind[i] != -1 && d2.ind[i] == -1)
        d = &d1;
      else
        d = &d2;
      if (d != 0)
        copycol(c, *d, d3);
    }
  }
}

static void
copyop(const Op* op1, Op* op3)
{
  op3->type = op1->type;
  copydata(op1->data[0], op3->data[0], true, true);
  copydata(op1->data[1], op3->data[1], true, true);
  Uint32 pk1_tmp;
  reqrc(checkop(op3, pk1_tmp) == 0);
}

static int
compop(const Op* op1, const Op* op2, Op* op3) // op1 o op2 = op3
{
  Comp* comp;
  if (op2->type == Op::NUL) {
    copyop(op1, op3);
    return 0;
  }
  if (op1->type == Op::NUL) {
    copyop(op2, op3);
    return 0;
  }
  chkrc((comp = comptype(op1->type, op2->type)) != 0);
  op3->type = comp->t3;
  // sucks
  copydata(op2->data[0], op3->data[0], true, false);
  if (op3->type != Op::DEL)
    copydata(op2->data[0], op3->data[0], false, true);
  if (op3->type != Op::INS)
    copydata(op1->data[1], op3->data[1], false, true);
  if (op3->type == Op::UPD)
    copydata(op2->data[0], op3->data[1], true, false);
  Uint32 pk1_tmp;
  reqrc(checkop(op3, pk1_tmp) == 0);
  // not eliminating identical post-pre fields
  return 0;
}

static int
createevent()
{
  ll1("createevent");
  g_evt = 0;
  g_dic = g_ndb->getDictionary();
  NdbDictionary::Event evt(g_evtname);
  evt.setTable(*g_tab);
  evt.addTableEvent(NdbDictionary::Event::TE_ALL);
  // pk always
  evt.addEventColumn("pk1");
  evt.addEventColumn("pk2");
  // simple cols
  evt.addEventColumn("seq");
  evt.addEventColumn("cc1");
  if (g_dic->getEvent(evt.getName()) != 0)
    chkdb(g_dic->dropEvent(evt.getName()) == 0);
  chkdb(g_dic->createEvent(evt) == 0);
  chkdb((g_evt = g_dic->getEvent(evt.getName())) != 0);
  g_dic = 0;
  return 0;
}

static int
dropevent()
{
  ll1("dropevent");
  g_dic = g_ndb->getDictionary();
  chkdb(g_dic->dropEvent(g_evt->getName()) == 0);
  g_evt = 0;
  g_dic = 0;
  return 0;
}

static int
createeventop()
{
  ll1("createeventop");
#ifdef version50
  uint bsz = 10 * g_opts.maxops;
  chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName(), bsz)) != 0);
#else
  chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName())) != 0);
#endif
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = g_col[i];
    Data (&d)[2] = g_rec_ev->data;
    switch (c.type) {
    case NdbDictionary::Column::Unsigned:
    case NdbDictionary::Column::Char:
      chkdb((g_ra[0][i] = g_evt_op->getValue(c.name, (char*)d[0].ptr[i])) != 0);
      chkdb((g_ra[1][i] = g_evt_op->getPreValue(c.name, (char*)d[1].ptr[i])) != 0);
      break;
    default:
      assert(false);
      break;
    }
  }
  return 0;
}

static int
dropeventop()
{
  ll1("dropeventop");
  chkdb(g_ndb->dropEventOperation(g_evt_op) == 0);
  g_evt_op = 0;
  return 0;
}

static int
waitgci() // wait for event to be installed and for at least 1 GCI to pass
{
  const uint ngci = 3;
  ll1("waitgci " << ngci);
  Uint32 gci[2];
  uint i = 0;
  while (1) {
    chkdb((g_con = g_ndb->startTransaction()) != 0);
    { // forced to exec a dummy op
      Uint32 pk1;
      char pk2[g_charlen + 1];
      pk1 = g_maxpk;
      sprintf(pk2, "%-*u", g_charlen, pk1);
      chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
      chkdb(g_op->readTuple() == 0);
      chkdb(g_op->equal("pk1", (char*)&pk1) == 0);
      chkdb(g_op->equal("pk2", (char*)&pk2[0]) == 0);
      chkdb(g_con->execute(Commit) == 0);
      g_op = 0;
    }
    gci[i] = g_con->getGCI();
    g_ndb->closeTransaction(g_con);
    g_con = 0;
    if (i == 1 && gci[0] + ngci <= gci[1]) {
      ll1("waitgci: " << gci[0] << " " << gci[1]);
      break;
    }
    i = 1;
    sleep(1);
  }
  return 0;
}

static int
makeop(Op* op, Uint32 pk1, Op::Type t, const Op* prev_op)
{
  op->type = t;
  if (t != Op::INS)
    copydata(prev_op->data[0], op->data[1], true, true);
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = getcol(i);
    Data (&d)[2] = op->data;
    if (c.pk && t == Op::DEL) {
      d[1].ind[i] = -1;
    }
    if (i == getcol("pk1").no) {
      d[0].pk1 = pk1;
      d[0].ind[i] = 0;
      continue;
    }
    if (i == getcol("pk2").no) {
      sprintf(d[0].pk2, "%-*u", g_charlen, d[0].pk1);
      d[0].ind[i] = 0;
      continue;
    }
    if (t == Op::DEL) {
      d[0].ind[i] = -1;
      continue;
    }
    if (i == getcol("seq").no) {
      d[0].seq = g_seq++;
      d[0].ind[i] = 0;
      continue;
    }
    uint u;
    u = urandom(100);
    if (c.nullable && u < 10) {
      d[0].ind[i] = 1;
      continue;
    }
    switch (c.type) {
    case NdbDictionary::Column::Unsigned:
      {
        u = urandom(0);
        Uint32* p = (Uint32*)d[0].ptr[i];
        *p = u;
      }
      break;
    case NdbDictionary::Column::Char:
      {
        u = urandom(g_charlen);
        char* p = (char*)d[0].ptr[i];
        uint j;
        for (j = 0; j < g_charlen; j++) {
          uint v = urandom(strlen(g_charval));
          p[j] = j < u ? g_charval[v] : 0x20;
        }
      }
      break;
    default:
      assert(false);
      break;
    }
    d[0].ind[i] = 0;
  }
  Uint32 pk1_tmp = ~(Uint32)0;
  chkrc(checkop(op, pk1_tmp) == 0);
  reqrc(pk1 == pk1_tmp);
  return 0;
}

static void
makeop(Op* tot_op, Op* com_op, Uint32 pk1, Op::Type t)
{
  Op tmp_op;
  tmp_op.kind = Op::OP;
  Op* op = getop();
  reqrc(makeop(op, pk1, t, tot_op) == 0);
  // add to end
  Op* last_op = com_op;
  while (last_op->next_op != 0)
    last_op = last_op->next_op;
  last_op->next_op = op;
  // merge into chain head
  tmp_op.init();
  reqrc(compop(com_op, op, &tmp_op) == 0);
  copyop(&tmp_op, com_op);
  // merge into total op
  tmp_op.init();
  reqrc(compop(tot_op, op, &tmp_op) == 0);
  copyop(&tmp_op, tot_op);
  // counts
  com_op->num_op += 1;
  tot_op->num_op += 1;
}

static void
makeops()
{
  ll1("makeops");
  uint resv = g_opts.opstr == 0 ? 2 * g_opts.maxpk : 0; // for final deletes
  uint next = g_opts.opstr == 0 ? g_maxcom : strlen(g_opts.opstr);
  Op tmp_op;
  tmp_op.kind = Op::OP;
  Uint32 pk1 = 0;
  while (getfreeops() >= resv + 2 + next && pk1 < g_opts.maxpk) {
    if (g_opts.opstr == 0)
      pk1 = urandom(g_opts.maxpk);
    ll2("makeops: pk1=" << pk1 << " free=" << getfreeops());
    // total op on the pk so far
    // optype either NUL=initial/deleted or INS=created
    Op* tot_op = g_pk_op[pk1];
    if (tot_op == 0)
      tot_op = g_pk_op[pk1] = getop(); //1
    assert(tot_op->type == Op::NUL || tot_op->type == Op::INS);
    // add new commit chain to end
    Op* last_com = tot_op;
    while (last_com->next_com != 0)
      last_com = last_com->next_com;
    Op* com_op = getop(); //2
    last_com->next_com = com_op;
    // length of random chain
    uint len = ~0;
    if (g_opts.opstr == 0)
      len = 1 + urandom(g_maxcom - 1);
    ll2("makeops: com chain");
    uint n = 0;
    while (1) {
      // random or from g_opts.opstr
      Op::Type t;
      if (g_opts.opstr == 0) {
        if (n == len)
          break;
        do {
          t = (Op::Type)urandom(g_optypes);
        } while (tot_op->type == Op::NUL && (t == Op::DEL || t == Op::UPD) ||
                 tot_op->type == Op::INS && t == Op::INS);
      } else {
        uint m = strlen(g_opts.opstr);
        uint k = tot_op->num_com + tot_op->num_op;
        assert(k < m);
        char c = g_opts.opstr[k];
        if (c == 'c') {
          if (k + 1 == m)
            pk1 += 1;
          break;
        }
        const char* p = "idu";
        const char* q = strchr(p, c);
        assert(q != 0);
        t = (Op::Type)(q - p);
      }
      makeop(tot_op, com_op, pk1, t);
      assert(tot_op->type == Op::NUL || tot_op->type == Op::INS);
      n++;
    }
    tot_op->num_com += 1;
  }
  assert(getfreeops() >= resv);
  // terminate with DEL if necessary
  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
    Op* tot_op = g_pk_op[pk1];
    if (tot_op == 0)
      continue;
    if (tot_op->type == Op::NUL)
      continue;
    assert(g_opts.opstr == 0);
    Op* last_com = tot_op;
    while (last_com->next_com != 0)
      last_com = last_com->next_com;
    Op* com_op = getop(); //1
    last_com->next_com = com_op;
    makeop(tot_op, com_op, pk1, Op::DEL);
    assert(tot_op->type == Op::NUL);
    tot_op->num_com += 1;
  }
}

static int
addndbop(Op* op)
{
  chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
  switch (op->type) {
  case Op::INS:
    chkdb(g_op->insertTuple() == 0);
    break;
  case Op::DEL:
    chkdb(g_op->deleteTuple() == 0);
    break;
  case Op::UPD:
    chkdb(g_op->updateTuple() == 0);
    break;
  default:
    assert(false);
    break;
  }
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = getcol(i);
    const Data& d = op->data[0];
    if (! c.pk)
      continue;
    chkdb(g_op->equal(c.name, (char*)d.ptr[i]) == 0);
  }
  if (op->type != Op::DEL) {
    for (i = 0; i < g_ncol; i++) {
      const Col& c = getcol(i);
      const Data& d = op->data[0];
      if (c.pk)
        continue;
      if (d.ind[i] == -1)
        continue;
      const char* ptr = d.ind[i] == 0 ? (char*)d.ptr[i] : 0;
      chkdb(g_op->setValue(c.name, ptr) == 0);
    }
  }
  g_op = 0;
  return 0;
}

static int
runops()
{
  ll1("runops");
  Uint32 pk1;
  const Op* com_op[g_maxpk];
  uint left = 0;
  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
    com_op[pk1] = 0;
    // total op on the pk
    Op* tot_op = g_pk_op[pk1];
    if (tot_op == 0)
      continue;
    // first commit chain
    assert(tot_op->next_com != 0);
    com_op[pk1] = tot_op->next_com;
    left++;
  }
  while (left != 0) {
    pk1 = urandom(g_opts.maxpk);
    if (com_op[pk1] == 0)
      continue;
    // do the ops in one transaction
    ll2("runops: pk1=" << pk1);
    chkdb((g_con = g_ndb->startTransaction()) != 0);
    // first op in chain
    Op* op = com_op[pk1]->next_op;
    assert(op != 0);
    while (op != 0) {
      ll2("add op:" << *op);
      chkrc(addndbop(op) == 0);
      op = op->next_op;
    }
    chkdb(g_con->execute(Commit) == 0);
    g_ndb->closeTransaction(g_con);
    g_con = 0;
    // next chain
    com_op[pk1] = com_op[pk1]->next_com;
    if (com_op[pk1] == 0) {
      assert(left != 0);
      left--;
    }
  }
  assert(left == 0);
  return 0;
}

static int
matchevent(Op* ev)
{
  Op::Type t = ev->type;
  Data (&d2)[2] = ev->data;
  // get PK
  Uint32 pk1 = d2[0].pk1;
  chkrc(pk1 < g_opts.maxpk);
  // on error repeat and print details
  uint loop = 0;
  while (loop <= 1) {
    uint g_loglevel = loop == 0 ? g_opts.loglevel : 2;
    ll1("matchevent: pk1=" << pk1 << " type=" << t);
    ll2("EVT: " << *ev);
    Op* tot_op = g_pk_op[pk1];
    Op* com_op = tot_op ? tot_op->next_com : 0;
    uint cnt = 0;
    bool ok = false;
    while (com_op != 0) {
      ll2("COM: " << *com_op);
      Op* op = com_op->next_op;
      assert(op != 0);
      while (op != 0) {
        ll2("---: " << *op);
        op = op->next_op;
      }
      if (com_op->type != Op::NUL) {
        const Data (&d1)[2] = com_op->data;
        if (cmpdata(d1, d2) == 0) {
          bool tmpok = true;
          if (com_op->type != t) {
            ll2("***: wrong type " << com_op->type << " != " << t);
            tmpok = false;
          }
          if (com_op->match) {
            ll2("***: duplicate match");
            tmpok = false;
          }
          if (cnt != g_ev_cnt[pk1]) {
            ll2("***: wrong pos " << cnt << " != " << g_ev_cnt[pk1]);
            tmpok = false;
          }
          if (tmpok) {
            ok = com_op->match = true;
            ll2("===: match");
          }
        }
        cnt++;
      }
      com_op = com_op->next_com;
    }
    if (ok) {
      ll1("matchevent: match");
      return 0;
    }
    ll1("matchevent: ERROR: no match");
    if (g_loglevel >= 2)
      return -1;
    loop++;
  }
  return 0;
}

static int
matchevents()
{
  uint nomatch = 0;
  Uint32 pk1;
  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
    Op* tot_ev = g_pk_ev[pk1];
    if (tot_ev == 0)
      continue;
    Op* com_ev = tot_ev->next_com;
    while (com_ev != 0) {
      if (matchevent(com_ev) < 0)
        nomatch++;
      g_ev_cnt[pk1]++;
      com_ev = com_ev->next_com;
    }
  }
  chkrc(nomatch == 0);
  return 0;
}

static int
matchops()
{
  Uint32 pk1;
  for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
    Op* tot_op = g_pk_op[pk1];
    if (tot_op == 0)
      continue;
    Op* com_op = tot_op->next_com;
    while (com_op != 0) {
      if (com_op->type != Op::NUL && ! com_op->match) {
        ll0("COM: " << *com_op);
        Op* op = com_op->next_op;
        assert(op != 0);
        while (op != 0) {
          ll0("---: " << *op);
          op = op->next_op;
        }
        ll0("no matching event");
        return -1;
      }
      com_op = com_op->next_com;
    }
  }
  return 0;
}

static int
runevents()
{
  ll1("runevents");
  NdbEventOperation* evt_op;
  uint mspoll = 1000;
  uint npoll = 7; // strangely long delay
  while (npoll != 0) {
    npoll--;
    int ret;
    ll1("poll");
    ret = g_ndb->pollEvents(mspoll);
    if (ret <= 0)
      continue;
    while (1) {
      g_rec_ev->init();
      Data (&d)[2] = g_rec_ev->data;
#ifdef version50
      int overrun = g_opts.maxops;
      chkdb((ret = g_evt_op->next(&overrun)) >= 0);
      chkrc(overrun == 0);
      if (ret == 0)
        break;
#else
      NdbEventOperation* tmp_op = g_ndb->nextEvent();
      if (tmp_op == 0)
        break;
      reqrc(g_evt_op == tmp_op);
#endif
      chkrc(seteventtype(g_rec_ev, g_evt_op->getEventType()) == 0);
      // get indicators
      { int i, j;
        for (j = 0; j < 2; j++)
          for (i = 0; i < g_ncol; i++)
            d[j].ind[i] = g_ra[j][i]->isNULL();
      }
      ll2("runevents: EVT: " << *g_rec_ev);
      // check basic sanity
      Uint32 pk1 = ~(Uint32)0;
      chkrc(checkop(g_rec_ev, pk1) == 0);
      // add to events
      chkrc(getfreeevs() >= 2);
      Op* tot_ev = g_pk_ev[pk1];
      if (tot_ev == 0)
        tot_ev = g_pk_ev[pk1] = getev(); //1
      Op* last_com = tot_ev;
      while (last_com->next_com != 0)
        last_com = last_com->next_com;
      // copy and add
      Op* ev = getev(); //3
      copyop(g_rec_ev, ev);
      last_com->next_com = ev;
    }
  }
  chkrc(matchevents() == 0);
  chkrc(matchops() == 0);
  return 0;
}

static void
setseed(int n)
{
  uint seed;
  if (n == -1) {
    if (g_opts.seed == 0)
      return;
    if (g_opts.seed != -1)
      seed = (uint)g_opts.seed;
    else
      seed = 1 + (ushort)getpid();
  } else {
    if (g_opts.seed != 0)
      return;
    seed = n;
  }
  ll0("seed=" << seed);
  srandom(seed);
}

static int
runtest()
{
  setseed(-1);
  chkrc(createtable() == 0);
  chkrc(createevent() == 0);
  uint n;
  for (n = 0; g_opts.loop == 0 || n < g_opts.loop; n++) {
    ll0("loop " << n);
    setseed(n);
    resetmem();
    g_rec_ev = getev();
    chkrc(createeventop() == 0);
    chkdb(g_evt_op->execute() == 0);
    chkrc(waitgci() == 0);
    makeops();
    chkrc(runops() == 0);
    chkrc(runevents() == 0);
    chkrc(dropeventop() == 0);
  }
  chkrc(dropevent() == 0);
  chkrc(droptable() == 0);
  return 0;
}

NDB_STD_OPTS_VARS;

static struct my_option
my_long_options[] =
{
  NDB_STD_OPTS("test_event_merge"),
  { "abort-on-error", 1008, "Do abort() on any error",
    (gptr*)&g_opts.abort_on_error, (gptr*)&g_opts.abort_on_error, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "loglevel", 1001, "Logging level in this program (default 0)",
    (gptr*)&g_opts.loglevel, (gptr*)&g_opts.loglevel, 0,
    GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
  { "loop", 1002, "Number of test loops (default 1, 0=forever)",
    (gptr*)&g_opts.loop, (gptr*)&g_opts.loop, 0,
    GET_INT, REQUIRED_ARG, 1, 0, 0, 0, 0, 0 },
  { "maxops", 1003, "Number of PK operations (default 2000)",
    (gptr*)&g_opts.maxops, (gptr*)&g_opts.maxops, 0,
    GET_UINT, REQUIRED_ARG, 2000, 0, g_maxops, 0, 0, 0 },
  { "maxpk", 1004, "Number of different PK values (default 10)",
    (gptr*)&g_opts.maxpk, (gptr*)&g_opts.maxpk, 0,
    GET_UINT, REQUIRED_ARG, 10, 1, g_maxpk, 0, 0, 0 },
  { "opstr", 1005, "Ops to run e.g. idiucdc (c = commit, default random)",
    (gptr*)&g_opts.opstr, (gptr*)&g_opts.opstr, 0,
    GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
  { "seed", 1006, "Random seed (0=loop number, default -1=random)",
    (gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0,
    GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 },
  { "separate-events", 1007, "Do not combine events per GCI >5.0",
    (gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "use-table", 1008, "Use existing table 'tem1'",
    (gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { 0, 0, 0,
    0, 0, 0,
    GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }
};

static void
usage()
{
  my_print_help(my_long_options);
}

static int
checkopts()
{
  if (g_opts.opstr != 0) {
    const char* s = g_opts.opstr;
    uint n = strlen(s);
    if (n < 3 || s[0] != 'i' || s[n-2] != 'd' || s[n-1] != 'c')
      return -1;
    while (*s != 0)
      if (strchr("iduc", *s++) == 0)
        return -1;
  }
  return 0;
}

int
main(int argc, char** argv)
{
  ndb_init();
  const char* progname =
    strchr(argv[0], '/') ? strrchr(argv[0], '/') + 1 : argv[0];
  uint i;
  ndbout << progname;
  for (i = 1; i < argc; i++)
    ndbout << " " << argv[i];
  ndbout << endl;
  int ret;
  ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option);
  if (ret != 0 || argc != 0 || checkopts() != 0)
    return NDBT_ProgramExit(NDBT_WRONGARGS);
  g_ncc = new Ndb_cluster_connection();
  if (g_ncc->connect(30) == 0) {
    g_ndb = new Ndb(g_ncc, "TEST_DB");
    if (g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0) {
      if (runtest() == 0)
        return NDBT_ProgramExit(NDBT_OK);
    }
  }
  if (g_evt_op != 0) {
    (void)dropeventop();
    g_evt_op = 0;
  }
  delete g_ndb;
  delete g_ncc;
  return NDBT_ProgramExit(NDBT_FAILED);
}