/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/**
 * ndbapi_example5.cpp: Using API level events in NDB API
 */

#include <NdbApi.hpp>

// Used for cout
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <unistd.h>

/**
 *
 * Assume that there is a table TAB0 which is being updated by
 * another process (e.g. flexBench -l 0 -stdtables).
 * We want to monitor what happens with columns COL0, COL1, COL11
 *
 * or together with the mysqlcluster client;
 *
 * shell> mysqlcluster -u root
 * mysql> create database TEST_DB;
 * mysql> use TEST_DB;
 * mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int);
 *
 * In another window start ndbapi_example5, wait until properly started
 *
 * mysql> insert into TAB0 values (1,2,3);
 * mysql> insert into TAB0 values (2,2,3);
 * mysql> insert into TAB0 values (3,2,9);
 * mysql>
 *
 * you should see the data popping up in the example window
 *
 */

/**
 * Print the code and message of an NDB error object together with the
 * source location, then terminate the process with exit(-1).
 *
 * Wrapped in do/while(0) so that it behaves as a single statement, and the
 * argument is parenthesised so that any expression may be passed.
 */
#define APIERROR(error) \
  do { \
    std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
              << (error).code << ", msg: " << (error).message << "." \
              << std::endl; \
    exit(-1); \
  } while (0)

/**
 * Create the event eventName on table eventTableName, subscribing to the
 * noEventColumnName columns listed in eventColumnName.
 *
 * Returns 0 on success; exits the process on failure.
 */
int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnName,
                  const int noEventColumnName);

/**
 * Print one attribute image as " <label>[<column>]=<value>".
 *
 * Nothing is printed when isNULL() is negative (no value was delivered for
 * this attribute); "NULL" is printed when isNULL() is positive; otherwise
 * the value is printed as an unsigned 32-bit integer.
 */
static void printAttrValue(const char *label, int column, NdbRecAttr *attr)
{
  const int nullState = attr->isNULL();
  if (nullState < 0)
    return;                                   // we have no value

  printf(" %s[%d]=", label, column);
  if (nullState == 0)                         // we have a non-null value
    printf("%u", attr->u_32_value());
  else                                        // we have a null value
    printf("NULL");
}

/**
 * Connect to the cluster, create an event on TEST_DB.TAB0 and then, five
 * times over, open an event operation, print at least 40 received events
 * and drop the operation again. Finally drop the event and shut down.
 */
int main()
{
  ndb_init();

  // Object representing the cluster
  Ndb_cluster_connection *cluster_connection = new Ndb_cluster_connection();

  const int connectResult =
    cluster_connection->connect(5 /* retries               */,
                                3 /* delay between retries */,
                                1 /* verbose               */);
  if (connectResult > 0)
  {
    std::cout
      << "Cluster connect failed, possibly resolved with more retries.\n";
    exit(-1);
  }
  else if (connectResult < 0)
  {
    std::cout << "Cluster connect failed.\n";
    exit(-1);
  }

  if (cluster_connection->wait_until_ready(30, 30))
  {
    std::cout << "Cluster was not ready within 30 secs." << std::endl;
    exit(-1);
  }

  // Object representing the database
  Ndb* myNdb = new Ndb(cluster_connection, "TEST_DB");

  if (myNdb->init() == -1)
    APIERROR(myNdb->getNdbError());           // does not return

  const char *eventName = "CHNG_IN_TAB0";
  const char *eventTableName = "TAB0";
  const int noEventColumnName = 3;
  const char *eventColumnName[noEventColumnName] =
    {"COL0",
     "COL1",
     "COL11"};

  NdbDictionary::Dictionary *myDict = myNdb->getDictionary();

  // Create events (myCreateEvent exits on failure, including a missing
  // dictionary, so myDict is usable afterwards)
  myCreateEvent(myNdb,
                eventName,
                eventTableName,
                eventColumnName,
                noEventColumnName);

  const int noRounds = 5;            // number of event operations to run
  const int noEventsPerRound = 40;   // minimum events handled per operation

  for (int round = 0; round < noRounds; round++)
  {
    // Start "transaction" for handling events
    printf("create EventOperation\n");
    NdbEventOperation* op = myNdb->createEventOperation(eventName, 100);
    if (op == NULL)
    {
      printf("Event operation creation failed\n");
      exit(-1);
    }

    printf("get values\n");
    NdbRecAttr* recAttr[noEventColumnName];
    NdbRecAttr* recAttrPre[noEventColumnName];
    // primary keys should always be a part of the result
    for (int col = 0; col < noEventColumnName; col++)
    {
      recAttr[col]    = op->getValue(eventColumnName[col]);
      recAttrPre[col] = op->getPreValue(eventColumnName[col]);
      if (recAttr[col] == NULL || recAttrPre[col] == NULL)
      {
        // These handles are dereferenced for every event below
        printf("Failed to get value handle for column %s\n",
               eventColumnName[col]);
        exit(-1);
      }
    }

    // set up the callbacks
    printf("execute\n");
    if (op->execute())
    {
      // execute() is what makes changes "start flowing"
      printf("operationd execution failed\n");
      exit(-1);
    }

    int eventCount = 0;
    while (eventCount < noEventsPerRound)
    {
      // printf("now waiting for event...\n");
      const int pollResult = myNdb->pollEvents(1000); // wait for event or 1000 ms
      if (pollResult <= 0)
        continue;                                     // timed out

      // printf("got data! %d\n", pollResult);
      int overrun = 0;
      while (op->next(&overrun) > 0)
      {
        eventCount++;
        if (!op->isConsistent())
          printf("A node failiure has occured and events might be missing\n");

        switch (op->getEventType())
        {
        case NdbDictionary::Event::TE_INSERT:
          printf("%d INSERT: ", eventCount);
          break;
        case NdbDictionary::Event::TE_DELETE:
          printf("%d DELETE: ", eventCount);
          break;
        case NdbDictionary::Event::TE_UPDATE:
          printf("%d UPDATE: ", eventCount);
          break;
        default:
          abort(); // should not happen
        }

        printf("overrun %d pk %u: ", overrun, recAttr[0]->u_32_value());

        // Column 0 is the primary key printed above; show the after ("post")
        // and before ("pre") image of each remaining column.
        for (int col = 1; col < noEventColumnName; col++)
        {
          printAttrValue("post", col, recAttr[col]);
          printAttrValue("pre", col, recAttrPre[col]);
        }
        printf("\n");
      }
    }

    // don't want to listen to events anymore
    myNdb->dropEventOperation(op);
  }

  // remove event from database
  if (myDict->dropEvent(eventName))
    printf("Failed to drop event %s\n", eventName);

  delete myNdb;
  delete cluster_connection;
  ndb_end(0);
  return 0;
}

/**
 * Create an event in the database.
 *
 * myNdb              Ndb object whose dictionary is used
 * eventName          name of the event to create
 * eventTableName     table the event is attached to
 * eventColumnName    names of the columns to subscribe to
 * noEventColumnName  number of entries in eventColumnName
 *
 * If the first createEvent() fails, the event is dropped (it may be left
 * over from an earlier run) and creation is retried once.
 *
 * Returns 0 on success. The process exits with -1 if the dictionary is
 * unavailable, or if the drop or the retried creation fails.
 */
int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnName,
                  const int noEventColumnName)
{
  NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
  if (!myDict)
  {
    printf("Event Creation failedDictionary not found");
    exit(-1);
  }

  NdbDictionary::Event myEvent(eventName);
  myEvent.setTable(eventTableName);
  myEvent.addTableEvent(NdbDictionary::Event::TE_ALL);
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE);
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);

  for (int col = 0; col < noEventColumnName; col++)
    myEvent.addEventColumn(eventColumnName[col]);

  // Add event to database
  int res = myDict->createEvent(myEvent);
  if (res == 0)
  {
    myEvent.print();
    return res;
  }

  printf("Event creation failed\n");
  printf("trying drop Event, maybe event exists\n");
  res = myDict->dropEvent(eventName);
  if (res)
    exit(-1);

  // try again
  res = myDict->createEvent(myEvent);
  if (res)
    exit(-1);

  return res;
}