Commit d6676b0f authored by unknown's avatar unknown

Merged ha_ndbcluster.cc


sql/ha_ndbcluster.h:
  Auto merged
parents 075eb338 e6bec02e
...@@ -25,11 +25,59 @@ pk1 attr1 ...@@ -25,11 +25,59 @@ pk1 attr1
DELETE FROM t1; DELETE FROM t1;
SELECT * FROM t1; SELECT * FROM t1;
pk1 attr1 pk1 attr1
INSERT INTO t1 VALUES (9410,9412); INSERT INTO t1 VALUES (9410,9412), (9411, 9413), (9408, 8765),
(7,8), (8,9), (9,10), (10,11), (11,12), (12,13), (13,14);
UPDATE t1 SET attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9999
8 9999
9 9999
10 9999
11 9999
12 9999
13 9999
9408 9999
9410 9999
9411 9999
UPDATE t1 SET attr1 = 9998 WHERE pk1 < 1000;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9998
8 9998
9 9998
10 9998
11 9998
12 9998
13 9998
9408 9999
9410 9999
9411 9999
UPDATE t1 SET attr1 = 9997 WHERE attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9998
8 9998
9 9998
10 9998
11 9998
12 9998
13 9998
9408 9997
9410 9997
9411 9997
DELETE FROM t1 WHERE pk1 = 9410; DELETE FROM t1 WHERE pk1 = 9410;
SELECT * FROM t1; SELECT * FROM t1 ORDER BY pk1;
pk1 attr1 pk1 attr1
INSERT INTO t1 VALUES (9410,9412), (9411, 9413), (9408, 8765); 7 9998
8 9998
9 9998
10 9998
11 9998
12 9998
13 9998
9408 9997
9411 9997
DELETE FROM t1; DELETE FROM t1;
SELECT * FROM t1; SELECT * FROM t1;
pk1 attr1 pk1 attr1
......
...@@ -23,6 +23,7 @@ SELECT pk1 FROM t1; ...@@ -23,6 +23,7 @@ SELECT pk1 FROM t1;
SELECT * FROM t1; SELECT * FROM t1;
SELECT t1.* FROM t1; SELECT t1.* FROM t1;
# Update on record by primary key
UPDATE t1 SET attr1=1 WHERE pk1=9410; UPDATE t1 SET attr1=1 WHERE pk1=9410;
SELECT * FROM t1; SELECT * FROM t1;
...@@ -35,13 +36,23 @@ SELECT * FROM t1; ...@@ -35,13 +36,23 @@ SELECT * FROM t1;
DELETE FROM t1; DELETE FROM t1;
SELECT * FROM t1; SELECT * FROM t1;
# Delete the record by specifying pk # Insert more records and update them all at once
INSERT INTO t1 VALUES (9410,9412); INSERT INTO t1 VALUES (9410,9412), (9411, 9413), (9408, 8765),
(7,8), (8,9), (9,10), (10,11), (11,12), (12,13), (13,14);
UPDATE t1 SET attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;
UPDATE t1 SET attr1 = 9998 WHERE pk1 < 1000;
SELECT * FROM t1 ORDER BY pk1;
UPDATE t1 SET attr1 = 9997 WHERE attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;
# Delete one record by specifying pk
DELETE FROM t1 WHERE pk1 = 9410; DELETE FROM t1 WHERE pk1 = 9410;
SELECT * FROM t1; SELECT * FROM t1 ORDER BY pk1;
# Insert three records and delete the # Delete all from table
INSERT INTO t1 VALUES (9410,9412), (9411, 9413), (9408, 8765);
DELETE FROM t1; DELETE FROM t1;
SELECT * FROM t1; SELECT * FROM t1;
......
...@@ -418,6 +418,13 @@ void ha_ndbcluster::release_metadata() ...@@ -418,6 +418,13 @@ void ha_ndbcluster::release_metadata()
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
inline int ha_ndbcluster::get_ndb_lock_type()
{
return (int)((m_lock.type == TL_WRITE_ALLOW_WRITE) ?
NdbCursorOperation::LM_Exclusive : NdbCursorOperation::LM_Read);
}
static const ulong index_type_flags[]= static const ulong index_type_flags[]=
{ {
/* UNDEFINED_INDEX */ /* UNDEFINED_INDEX */
...@@ -652,22 +659,61 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -652,22 +659,61 @@ int ha_ndbcluster::unique_index_read(const byte *key,
} }
/* /*
Get the next record of a started scan Get the next record of a started scan. Try to fetch
it locally from NdbApi cached records if possible,
otherwise ask NDB for more.
NOTE
If this is a update/delete make sure to not contact
NDB before any pending ops have been sent to NDB.
*/ */
inline int ha_ndbcluster::next_result(byte *buf) inline int ha_ndbcluster::next_result(byte *buf)
{ {
int check;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbResultSet *cursor= m_active_cursor; NdbResultSet *cursor= m_active_cursor;
DBUG_ENTER("next_result"); DBUG_ENTER("next_result");
if (cursor->nextResult() == 0) if (!cursor)
{ DBUG_RETURN(HA_ERR_END_OF_FILE);
// One more record found
unpack_record(buf); /*
table->status= 0; If this an update or delete, call nextResult with false
DBUG_RETURN(0); to process any records already cached in NdbApi
} */
bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
check= cursor->nextResult(contact_ndb);
if (check == 0)
{
// One more record found
DBUG_PRINT("info", ("One more record found"));
unpack_record(buf);
table->status= 0;
DBUG_RETURN(0);
}
else if (check == 1 || check == 2)
{
// 1: No more records
// 2: No more cached records
/*
Before fetching more rows and releasing lock(s),
all pending update or delete operations should
be sent to NDB
*/
DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
if (ops_pending && trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
ops_pending= 0;
contact_ndb= (check == 2);
}
} while (check == 2);
table->status= STATUS_NOT_FOUND; table->status= STATUS_NOT_FOUND;
if (ndb_err(trans)) if (ndb_err(trans))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
...@@ -739,28 +785,28 @@ int ha_ndbcluster::set_bounds(NdbOperation *op, ...@@ -739,28 +785,28 @@ int ha_ndbcluster::set_bounds(NdbOperation *op,
/* /*
Read record(s) from NDB using ordered index scan Start ordered index scan in NDB
*/ */
int ha_ndbcluster::ordered_index_scan(const key_range *start_key, int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key, const key_range *end_key,
bool sorted, byte* buf) bool sorted, byte* buf)
{ {
uint no_fields= table->fields;
uint i;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbResultSet *cursor= m_active_cursor; NdbResultSet *cursor;
NdbScanOperation *op; NdbScanOperation *op;
const char *index_name; const char *index_name;
THD* thd = current_thd;
DBUG_ENTER("ordered_index_scan"); DBUG_ENTER("ordered_index_scan");
DBUG_PRINT("enter", ("index: %u", active_index)); DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
index_name= get_index_name(active_index); index_name= get_index_name(active_index);
if (!(op= trans->getNdbScanOperation(index_name, m_tabname))) if (!(op= trans->getNdbScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(parallelism))) if (!(cursor=
op->readTuples(parallelism,
(NdbCursorOperation::LockMode)get_ndb_lock_type())))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
...@@ -817,22 +863,31 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -817,22 +863,31 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
if (trans->execute(NoCommit) != 0) if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully")); DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf)); DBUG_RETURN(define_read_attrs(buf, op));
} }
#if 0
/* /*
Read record(s) from NDB using full table scan with filter Start a filtered scan in NDB.
NOTE
This function is here as an example of how to start a
filtered scan. It should be possible to replace full_table_scan
with this function and make a best effort attempt
at filtering out the irrelevant data by converting the "items"
into interpreted instructions.
This would speed up table scans where there is a limiting WHERE clause
that doesn't match any index in the table.
*/ */
int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
byte *buf, byte *buf,
enum ha_rkey_function find_flag) enum ha_rkey_function find_flag)
{ {
uint no_fields= table->fields;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbResultSet *cursor= m_active_cursor; NdbResultSet *cursor;
NdbScanOperation *op;
DBUG_ENTER("filtered_scan"); DBUG_ENTER("filtered_scan");
DBUG_PRINT("enter", ("key_len: %u, index: %u", DBUG_PRINT("enter", ("key_len: %u, index: %u",
...@@ -840,12 +895,12 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, ...@@ -840,12 +895,12 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_DUMP("key", (char*)key, key_len); DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("info", ("Starting a new filtered scan on %s", DBUG_PRINT("info", ("Starting a new filtered scan on %s",
m_tabname)); m_tabname));
NdbScanOperation *op= trans->getNdbScanOperation(m_tabname);
if (!op) if (!(op= trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (!(cursor=
cursor= op->readTuples(parallelism); op->readTuples(parallelism,
if (!cursor) (NdbCursorOperation::LockMode)get_ndb_lock_type())))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
...@@ -894,60 +949,44 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, ...@@ -894,60 +949,44 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
sf.end(); sf.end();
} }
// Define attributes to read DBUG_RETURN(define_read_attrs(buf, op));
for (uint field_no= 0; field_no < no_fields; field_no++)
{
Field *field= table->field[field_no];
// Read attribute
DBUG_PRINT("get", ("%d: %s", field_no, field->field_name));
if (get_ndb_value(op, field_no, field->ptr))
ERR_RETURN(op->getNdbError());
}
if (table->primary_key == MAX_KEY)
{
DBUG_PRINT("info", ("Getting hidden key"));
// Scanning table with no primary key
int hidden_no= no_fields;
#ifndef DBUG_OFF
const NDBTAB *tab= (NDBTAB *) m_table;
if (!tab->getColumn(hidden_no))
DBUG_RETURN(1);
#endif
if (get_ndb_value(op, hidden_no, NULL))
ERR_RETURN(op->getNdbError());
}
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
} }
#endif
/* /*
Read records from NDB using full table scan Start full table scan in NDB
*/ */
int ha_ndbcluster::full_table_scan(byte *buf) int ha_ndbcluster::full_table_scan(byte *buf)
{ {
uint i; uint i;
THD *thd= current_thd;
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor; NdbResultSet *cursor;
NdbScanOperation *op; NdbScanOperation *op;
NdbConnection *trans= m_active_trans;
DBUG_ENTER("full_table_scan"); DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname)); DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
if (!(op=trans->getNdbScanOperation(m_tabname))) if (!(op=trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(parallelism))) if (!(cursor=
op->readTuples(parallelism,
(NdbCursorOperation::LockMode)get_ndb_lock_type())))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
}
inline
int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
{
uint i;
THD *thd= current_thd;
NdbConnection *trans= m_active_trans;
DBUG_ENTER("define_read_attrs");
// Define attributes to read // Define attributes to read
for (i= 0; i < table->fields; i++) for (i= 0; i < table->fields; i++)
{ {
...@@ -1042,7 +1081,8 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1042,7 +1081,8 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected! Find out how this is detected!
*/ */
rows_inserted++; rows_inserted++;
if ((rows_inserted % bulk_insert_rows) == 0) if ((rows_inserted == rows_to_insert) ||
((rows_inserted % bulk_insert_rows) == 0))
{ {
// Send rows to NDB // Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\ DBUG_PRINT("info", ("Sending inserts to NDB, "\
...@@ -1097,6 +1137,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -1097,6 +1137,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
{ {
THD *thd= current_thd; THD *thd= current_thd;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbResultSet* cursor= m_active_cursor;
NdbOperation *op; NdbOperation *op;
uint i; uint i;
DBUG_ENTER("update_row"); DBUG_ENTER("update_row");
...@@ -1105,49 +1146,66 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -1105,49 +1146,66 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (table->timestamp_on_update_now) if (table->timestamp_on_update_now)
update_timestamp(new_data+table->timestamp_on_update_now-1); update_timestamp(new_data+table->timestamp_on_update_now-1);
if (!(op= trans->getNdbOperation(m_tabname)) || /* Check for update of primary key and return error */
op->updateTuple() != 0) if ((table->primary_key != MAX_KEY) &&
ERR_RETURN(trans->getNdbError()); (key_cmp(table->primary_key, old_data, new_data)))
DBUG_RETURN(HA_ERR_UNSUPPORTED);
if (table->primary_key == MAX_KEY)
{ if (cursor)
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
// Require that the PK for this record has previously been
// read into m_value
uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields];
DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
if (set_hidden_key(op, no_fields, rec->aRef()))
ERR_RETURN(op->getNdbError());
}
else
{ {
/* Check for update of primary key and return error */ /*
if (key_cmp(table->primary_key, old_data, new_data)) We are scanning records and want to update the record
DBUG_RETURN(HA_ERR_UNSUPPORTED); that was just found, call updateTuple on the cursor
to take over the lock to a new update operation
int res; And thus setting the primary key of the record from
if ((res= set_primary_key(op, old_data + table->null_bytes))) the active record in cursor
DBUG_RETURN(res); */
DBUG_PRINT("info", ("Calling updateTuple on cursor"));
if (!(op= cursor->updateTuple()))
ERR_RETURN(trans->getNdbError());
ops_pending++;
}
else
{
if (!(op= trans->getNdbOperation(m_tabname)) ||
op->updateTuple() != 0)
ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY)
{
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
// Require that the PK for this record has previously been
// read into m_value
uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields];
DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
if (set_hidden_key(op, no_fields, rec->aRef()))
ERR_RETURN(op->getNdbError());
}
else
{
int res;
if ((res= set_primary_key(op, old_data + table->null_bytes)))
DBUG_RETURN(res);
}
} }
// Set non-key attribute(s) // Set non-key attribute(s)
for (i= 0; i < table->fields; i++) for (i= 0; i < table->fields; i++)
{ {
Field *field= table->field[i]; Field *field= table->field[i];
if ((thd->query_id == field->query_id) && if ((thd->query_id == field->query_id) &&
(!(field->flags & PRI_KEY_FLAG)) && (!(field->flags & PRI_KEY_FLAG)) &&
set_ndb_value(op, field, i)) set_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError()); ERR_RETURN(op->getNdbError());
} }
// Execute update operation // Execute update operation
if (trans->execute(NoCommit) != 0) if (!cursor && trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -1161,39 +1219,61 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -1161,39 +1219,61 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
int ha_ndbcluster::delete_row(const byte *record) int ha_ndbcluster::delete_row(const byte *record)
{ {
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbResultSet* cursor= m_active_cursor;
NdbOperation *op; NdbOperation *op;
DBUG_ENTER("delete_row"); DBUG_ENTER("delete_row");
statistic_increment(ha_delete_count,&LOCK_status); statistic_increment(ha_delete_count,&LOCK_status);
if (!(op=trans->getNdbOperation(m_tabname)) || if (cursor)
op->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY)
{ {
// This table has no primary key, use "hidden" primary key /*
DBUG_PRINT("info", ("Using hidden key")); We are scanning records and want to update the record
uint no_fields= table->fields; that was just found, call deleteTuple on the cursor
NdbRecAttr* rec= m_value[no_fields]; to take over the lock to a new update operation
DBUG_ASSERT(rec != NULL); And thus setting the primary key of the record from
the active record in cursor
*/
DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
if (cursor->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError());
ops_pending++;
if (set_hidden_key(op, no_fields, rec->aRef())) // If deleting from cursor, NoCommit will be handled in next_result
ERR_RETURN(op->getNdbError()); DBUG_RETURN(0);
} }
else else
{ {
int res;
if ((res= set_primary_key(op))) if (!(op=trans->getNdbOperation(m_tabname)) ||
return res; op->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY)
{
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields];
DBUG_ASSERT(rec != NULL);
if (set_hidden_key(op, no_fields, rec->aRef()))
ERR_RETURN(op->getNdbError());
}
else
{
int res;
if ((res= set_primary_key(op)))
return res;
}
} }
// Execute delete operation // Execute delete operation
if (trans->execute(NoCommit) != 0) if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* /*
Unpack a record read from NDB Unpack a record read from NDB
...@@ -1481,11 +1561,7 @@ int ha_ndbcluster::index_next(byte *buf) ...@@ -1481,11 +1561,7 @@ int ha_ndbcluster::index_next(byte *buf)
int error = 1; int error = 1;
statistic_increment(ha_read_next_count,&LOCK_status); statistic_increment(ha_read_next_count,&LOCK_status);
if (!m_active_cursor) DBUG_RETURN(next_result(buf));
error= HA_ERR_END_OF_FILE;
else
error = next_result(buf);
DBUG_RETURN(error);
} }
...@@ -1519,7 +1595,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key, ...@@ -1519,7 +1595,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
{ {
KEY* key_info; KEY* key_info;
int error= 1; int error= 1;
byte* buf = table->record[0]; byte* buf= table->record[0];
DBUG_ENTER("ha_ndbcluster::read_range_first"); DBUG_ENTER("ha_ndbcluster::read_range_first");
DBUG_PRINT("info", ("sorted: %d", sorted)); DBUG_PRINT("info", ("sorted: %d", sorted));
...@@ -1548,6 +1624,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key, ...@@ -1548,6 +1624,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
DBUG_RETURN(error); DBUG_RETURN(error);
} }
int ha_ndbcluster::read_range_next(bool eq_range) int ha_ndbcluster::read_range_next(bool eq_range)
{ {
DBUG_ENTER("ha_ndbcluster::read_range_next"); DBUG_ENTER("ha_ndbcluster::read_range_next");
...@@ -1587,12 +1664,10 @@ int ha_ndbcluster::rnd_next(byte *buf) ...@@ -1587,12 +1664,10 @@ int ha_ndbcluster::rnd_next(byte *buf)
{ {
DBUG_ENTER("rnd_next"); DBUG_ENTER("rnd_next");
statistic_increment(ha_read_rnd_next_count, &LOCK_status); statistic_increment(ha_read_rnd_next_count, &LOCK_status);
int error = 1;
if (!m_active_cursor) if (!m_active_cursor)
error = full_table_scan(buf); DBUG_RETURN(full_table_scan(buf));
else DBUG_RETURN(next_result(buf));
error = next_result(buf);
DBUG_RETURN(error);
} }
...@@ -1920,6 +1995,8 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, ...@@ -1920,6 +1995,8 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
m_lock.type=lock_type; m_lock.type=lock_type;
} }
*to++= &m_lock; *to++= &m_lock;
DBUG_PRINT("exit", ("lock_type: %d", lock_type));
DBUG_RETURN(to); DBUG_RETURN(to);
} }
...@@ -2034,8 +2111,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -2034,8 +2111,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
(NdbConnection*)thd->transaction.stmt.ndb_tid; (NdbConnection*)thd->transaction.stmt.ndb_tid;
DBUG_ASSERT(m_active_trans); DBUG_ASSERT(m_active_trans);
// Start of transaction
retrieve_all_fields= FALSE; retrieve_all_fields= FALSE;
ops_pending= 0;
} }
else else
{ {
...@@ -2087,7 +2165,9 @@ int ha_ndbcluster::start_stmt(THD *thd) ...@@ -2087,7 +2165,9 @@ int ha_ndbcluster::start_stmt(THD *thd)
} }
m_active_trans= trans; m_active_trans= trans;
// Start of statement
retrieve_all_fields= FALSE; retrieve_all_fields= FALSE;
ops_pending= 0;
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -2568,7 +2648,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -2568,7 +2648,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
retrieve_all_fields(FALSE), retrieve_all_fields(FALSE),
rows_to_insert(0), rows_to_insert(0),
rows_inserted(0), rows_inserted(0),
bulk_insert_rows(1024) bulk_insert_rows(1024),
ops_pending(0)
{ {
int i; int i;
......
...@@ -152,6 +152,7 @@ class ha_ndbcluster: public handler ...@@ -152,6 +152,7 @@ class ha_ndbcluster: public handler
const char* get_unique_index_name(uint idx_no) const; const char* get_unique_index_name(uint idx_no) const;
NDB_INDEX_TYPE get_index_type(uint idx_no) const; NDB_INDEX_TYPE get_index_type(uint idx_no) const;
NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const; NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
int get_ndb_lock_type();
int pk_read(const byte *key, uint key_len, int pk_read(const byte *key, uint key_len,
byte *buf); byte *buf);
...@@ -162,12 +163,10 @@ class ha_ndbcluster: public handler ...@@ -162,12 +163,10 @@ class ha_ndbcluster: public handler
bool sorted, byte* buf); bool sorted, byte* buf);
int full_table_scan(byte * buf); int full_table_scan(byte * buf);
int next_result(byte *buf); int next_result(byte *buf);
#if 0 int define_read_attrs(byte* buf, NdbOperation* op);
int filtered_scan(const byte *key, uint key_len, int filtered_scan(const byte *key, uint key_len,
byte *buf, byte *buf,
enum ha_rkey_function find_flag); enum ha_rkey_function find_flag);
#endif
void unpack_record(byte *buf); void unpack_record(byte *buf);
void set_dbname(const char *pathname); void set_dbname(const char *pathname);
...@@ -212,6 +211,7 @@ class ha_ndbcluster: public handler ...@@ -212,6 +211,7 @@ class ha_ndbcluster: public handler
ha_rows rows_to_insert; ha_rows rows_to_insert;
ha_rows rows_inserted; ha_rows rows_inserted;
ha_rows bulk_insert_rows; ha_rows bulk_insert_rows;
ha_rows ops_pending;
}; };
bool ndbcluster_init(void); bool ndbcluster_init(void);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment