Commit c2ec05d1 authored by unknown's avatar unknown

wl#2126 - read_multi_range ndb part


ndb/include/ndbapi/NdbConnection.hpp:
  Utility methods
ndb/include/ndbapi/NdbOperation.hpp:
  Utility methods
ndb/src/ndbapi/NdbConnection.cpp:
  Don't set theReturnStatus all_the_time, but let it be decided on 
  a operation bases
ndb/src/ndbapi/NdbOperationExec.cpp:
  AbortOption
sql/ha_ndbcluster.cc:
  1) removed execute from define_read_attr
  2) let a bunch of methods use define_read_attr
  3) impl. read_multi_range
sql/ha_ndbcluster.h:
  read_multi_range
parent effa1ed5
...@@ -430,6 +430,10 @@ public: ...@@ -430,6 +430,10 @@ public:
*/ */
const NdbOperation * getNextCompletedOperation(const NdbOperation * op)const; const NdbOperation * getNextCompletedOperation(const NdbOperation * op)const;
const NdbOperation* getFirstDefinedOperation()const{return theFirstOpInList;}
const NdbOperation* getLastDefinedOperation()const{return theLastOpInList;}
/** @} *********************************************************************/ /** @} *********************************************************************/
/** /**
......
...@@ -718,6 +718,7 @@ public: ...@@ -718,6 +718,7 @@ public:
}; };
LockMode getLockMode() const { return theLockMode; } LockMode getLockMode() const { return theLockMode; }
void setAbortOption(Int8 ao) { m_abortOption = ao; }
/** /**
* Set/get distribution/partition key * Set/get distribution/partition key
...@@ -746,10 +747,13 @@ protected: ...@@ -746,10 +747,13 @@ protected:
void initInterpreter(); void initInterpreter();
void next(NdbOperation*); // Set next pointer void next(NdbOperation*); // Set next pointer
NdbOperation* next(); // Get next pointer NdbOperation* next(); // Get next pointer
public:
const NdbOperation* next() const;
protected:
enum OperationStatus{ enum OperationStatus
{
Init, Init,
OperationDefined, OperationDefined,
TupleKeyDefined, TupleKeyDefined,
...@@ -995,6 +999,12 @@ NdbOperation::next() ...@@ -995,6 +999,12 @@ NdbOperation::next()
return theNext; return theNext;
} }
inline
const NdbOperation*
NdbOperation::next() const
{
return theNext;
}
/****************************************************************************** /******************************************************************************
OperationStatus Status(); OperationStatus Status();
......
...@@ -232,21 +232,6 @@ Remark: Handle time-out on a transaction object. ...@@ -232,21 +232,6 @@ Remark: Handle time-out on a transaction object.
void void
NdbConnection::handleExecuteCompletion() NdbConnection::handleExecuteCompletion()
{ {
if (theCompletionStatus == CompletedFailure) {
NdbOperation* tOpTemp = theFirstExecOpInList;
while (tOpTemp != NULL) {
/*****************************************************************************
* Ensure that all executing operations report failed for each
* read attribute when failure occurs.
* We do not want any operations to report both failure and
* success on different read attributes.
****************************************************************************/
tOpTemp->handleFailedAI_ElemLen();
tOpTemp = tOpTemp->next();
}//while
theReturnStatus = ReturnFailure;
}//if
/*************************************************************************** /***************************************************************************
* Move the NdbOperation objects from the list of executing * Move the NdbOperation objects from the list of executing
* operations to list of completed * operations to list of completed
...@@ -1512,6 +1497,7 @@ transactions. ...@@ -1512,6 +1497,7 @@ transactions.
/**********************************************************************/ /**********************************************************************/
theCompletionStatus = CompletedFailure; theCompletionStatus = CompletedFailure;
theCommitStatus = Aborted; theCommitStatus = Aborted;
theReturnStatus = ReturnFailure;
return 0; return 0;
} else { } else {
#ifdef NDB_NO_DROPPED_SIGNAL #ifdef NDB_NO_DROPPED_SIGNAL
......
...@@ -543,10 +543,11 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal) ...@@ -543,10 +543,11 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal)
theStatus = Finished; theStatus = Finished;
// blobs want this // blobs want this
if (m_abortOption != IgnoreError) if (m_abortOption != IgnoreError)
{
theNdbCon->theReturnStatus = NdbConnection::ReturnFailure; theNdbCon->theReturnStatus = NdbConnection::ReturnFailure;
}
theError.code = aSignal->readData(4); theError.code = aSignal->readData(4);
theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4), m_abortOption); theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4), ao);
if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read
return theNdbCon->OpCompleteFailure(ao, m_abortOption != IgnoreError); return theNdbCon->OpCompleteFailure(ao, m_abortOption != IgnoreError);
......
...@@ -55,8 +55,9 @@ static const char *ha_ndb_ext=".ndb"; ...@@ -55,8 +55,9 @@ static const char *ha_ndb_ext=".ndb";
#define ERR_RETURN(err) \ #define ERR_RETURN(err) \
{ \ { \
ERR_PRINT(err); \ const NdbError& tmp= err; \
DBUG_RETURN(ndb_to_mysql_error(&err)); \ ERR_PRINT(tmp); \
DBUG_RETURN(ndb_to_mysql_error(&tmp)); \
} }
// Typedefs for long names // Typedefs for long names
...@@ -980,20 +981,21 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op) ...@@ -980,20 +981,21 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op)
int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
{ {
int res;
DBUG_ENTER("pk_read");
DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len);
uint no_fields= table->fields, i; uint no_fields= table->fields, i;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbOperation *op; NdbOperation *op;
THD *thd= current_thd; THD *thd= current_thd;
DBUG_ENTER("pk_read");
DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len);
NdbOperation::LockMode lm= NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->readTuple(lm) != 0) op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY) if (table->primary_key == MAX_KEY)
{ {
// This table has no primary key, use "hidden" primary key // This table has no primary key, use "hidden" primary key
...@@ -1001,34 +1003,19 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) ...@@ -1001,34 +1003,19 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_DUMP("key", (char*)key, 8); DBUG_DUMP("key", (char*)key, 8);
if (set_hidden_key(op, no_fields, key)) if (set_hidden_key(op, no_fields, key))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
// Read key at the same time, for future reference // Read key at the same time, for future reference
if (get_ndb_value(op, NULL, no_fields, NULL)) if (get_ndb_value(op, NULL, no_fields, NULL))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
} }
else else
{ {
int res;
if ((res= set_primary_key(op, key))) if ((res= set_primary_key(op, key)))
return res; return res;
} }
// Read all wanted non-key field(s) unless HA_EXTRA_RETRIEVE_ALL_COLS if((res= define_read_attrs(buf, op)))
for (i= 0; i < no_fields; i++) DBUG_RETURN(res);
{
Field *field= table->field[i];
if ((thd->query_id == field->query_id) ||
m_retrieve_all_fields)
{
if (get_ndb_value(op, field, i, buf))
ERR_RETURN(trans->getNdbError());
}
else
{
// Attribute was not to be read
m_value[i].ptr= NULL;
}
}
if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans) != 0)
{ {
...@@ -1042,7 +1029,6 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) ...@@ -1042,7 +1029,6 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* /*
Read one complementing record from NDB using primary key from old_data Read one complementing record from NDB using primary key from old_data
*/ */
...@@ -1101,6 +1087,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ...@@ -1101,6 +1087,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
int ha_ndbcluster::unique_index_read(const byte *key, int ha_ndbcluster::unique_index_read(const byte *key,
uint key_len, byte *buf) uint key_len, byte *buf)
{ {
int res;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbIndexOperation *op; NdbIndexOperation *op;
THD *thd= current_thd; THD *thd= current_thd;
...@@ -1133,22 +1120,8 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -1133,22 +1120,8 @@ int ha_ndbcluster::unique_index_read(const byte *key,
key_ptr+= key_part->length; key_ptr+= key_part->length;
} }
// Get non-index attribute(s) if((res= define_read_attrs(buf, op)))
for (i= 0; i < table->fields; i++) DBUG_RETURN(res);
{
Field *field= table->field[i];
if ((thd->query_id == field->query_id) ||
(field->flags & PRI_KEY_FLAG))
{
if (get_ndb_value(op, field, i, buf))
ERR_RETURN(op->getNdbError());
}
else
{
// Attribute was not to be read
m_value[i].ptr= NULL;
}
}
if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans) != 0)
{ {
...@@ -1446,11 +1419,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) ...@@ -1446,11 +1419,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
if (get_ndb_value(op, NULL, hidden_no, NULL)) if (get_ndb_value(op, NULL, hidden_no, NULL))
ERR_RETURN(op->getNdbError()); ERR_RETURN(op->getNdbError());
} }
DBUG_RETURN(0);
if (execute_no_commit(this,trans) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
} }
/* /*
...@@ -1461,6 +1430,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1461,6 +1430,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key, const key_range *end_key,
bool sorted, byte* buf) bool sorted, byte* buf)
{ {
int res;
bool restart; bool restart;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbResultSet *cursor; NdbResultSet *cursor;
...@@ -1497,23 +1467,21 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1497,23 +1467,21 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
{ {
const key_range *keys[2]= { start_key, end_key }; const key_range *keys[2]= { start_key, end_key };
int ret= set_bounds(op, keys); res= set_bounds(op, keys);
if (ret) if (res)
DBUG_RETURN(ret); DBUG_RETURN(res);
} }
if (!restart) if (!restart && (res= define_read_attrs(buf, op)))
{ {
DBUG_RETURN(define_read_attrs(buf, op)); DBUG_RETURN(res);
} }
else
{ if (execute_no_commit(this,trans) != 0)
if (execute_no_commit(this,trans) != 0) DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(next_result(buf));
DBUG_RETURN(next_result(buf)); }
}
}
/* /*
Start a filtered scan in NDB. Start a filtered scan in NDB.
...@@ -1533,6 +1501,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, ...@@ -1533,6 +1501,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
byte *buf, byte *buf,
enum ha_rkey_function find_flag) enum ha_rkey_function find_flag)
{ {
int res;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbResultSet *cursor; NdbResultSet *cursor;
NdbScanOperation *op; NdbScanOperation *op;
...@@ -1596,9 +1565,14 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, ...@@ -1596,9 +1565,14 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
sf.end(); sf.end();
} }
DBUG_RETURN(define_read_attrs(buf, op)); if((res= define_read_attrs(buf, op)))
} DBUG_RETURN(res);
if (execute_no_commit(this,trans) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
}
/* /*
Start full table scan in NDB Start full table scan in NDB
...@@ -1607,6 +1581,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, ...@@ -1607,6 +1581,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
int ha_ndbcluster::full_table_scan(byte *buf) int ha_ndbcluster::full_table_scan(byte *buf)
{ {
uint i; uint i;
int res;
NdbResultSet *cursor; NdbResultSet *cursor;
NdbScanOperation *op; NdbScanOperation *op;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
...@@ -1620,7 +1595,14 @@ int ha_ndbcluster::full_table_scan(byte *buf) ...@@ -1620,7 +1595,14 @@ int ha_ndbcluster::full_table_scan(byte *buf)
!(cursor= op->readTuples(lm, 0, parallelism))) !(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
if((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
if (execute_no_commit(this,trans) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
} }
/* /*
...@@ -4554,4 +4536,171 @@ int ha_ndbcluster::write_ndb_file() ...@@ -4554,4 +4536,171 @@ int ha_ndbcluster::write_ndb_file()
DBUG_RETURN(error); DBUG_RETURN(error);
} }
int
ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
key_multi_range *ranges,
uint range_count,
bool sorted,
handler_buffer *buffer)
{
DBUG_ENTER("ha_ndbcluster::read_multi_range_first");
int res;
uint i;
KEY* key_info= table->key_info + active_index;
NDB_INDEX_TYPE index_type = get_index_type(active_index);
ulong reclength = table->reclength;
NdbOperation* op;
switch(index_type){
case UNIQUE_INDEX:
case PRIMARY_KEY_INDEX:
break;
case PRIMARY_KEY_ORDERED_INDEX:
case UNIQUE_ORDERED_INDEX:
/**
* Currently not supported on ordered indexes
*/
for(i= 0; i<range_count; i++)
{
if (ranges[i].start_key.length == key_info->key_length &&
ranges[i].start_key.flag == HA_READ_KEY_EXACT)
continue;
/**
* Mark that we using hander:: implementation
*/
m_disable_multi_read= true;
return handler::read_multi_range_first(found_range_p,
ranges,
range_count,
sorted,
buffer);
}
break;
default:
case ORDERED_INDEX:
m_disable_multi_read= true;
return handler::read_multi_range_first(found_range_p,
ranges,
range_count,
sorted,
buffer);
}
m_disable_multi_read= false;
multi_ranges= ranges;
multi_range_count= range_count;
multi_range_sorted= sorted;
multi_range_buffer= buffer;
multi_range_found_p= found_range_p;
byte* curr = buffer->buffer;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index;
const NdbOperation* lastOp = m_active_trans->getLastDefinedOperation();
switch(index_type){
case PRIMARY_KEY_INDEX:
case PRIMARY_KEY_ORDERED_INDEX:
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
{
if ((op= m_active_trans->getNdbOperation(tab)) &&
!op->readTuple(lm) &&
!set_primary_key(op, ranges[i].start_key.key) &&
!define_read_attrs(curr, op) &&
(op->setAbortOption(IgnoreError), true))
curr += reclength;
else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
}
break;
case UNIQUE_INDEX:
case UNIQUE_ORDERED_INDEX:
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
{
if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) &&
!op->readTuple(lm) &&
!set_primary_key(op, ranges[i].start_key.key) &&
!define_read_attrs(curr, op) &&
(op->setAbortOption(IgnoreError), true))
curr += reclength;
else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
}
}
if(i != range_count)
{
buffer->end_of_used_area= buffer->buffer_end;
}
else
{
buffer->end_of_used_area= curr;
}
/**
* Set first operation in multi range
*/
m_current_multi_operation=
lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
if(!(res= execute_no_commit_ie(this, m_active_trans)))
{
multi_range_curr= 0;
m_multi_range_result_ptr= buffer->buffer;
return read_multi_range_next();
}
ERR_RETURN(m_active_trans->getNdbError());
}
int
ha_ndbcluster::read_multi_range_next()
{
DBUG_ENTER("ha_ndbcluster::read_multi_range_next");
if(m_disable_multi_read)
DBUG_RETURN(handler::read_multi_range_next());
ulong reclength = table->reclength;
const NdbOperation* op = m_current_multi_operation;
while(multi_range_curr < multi_range_count && op && op->getNdbError().code)
{
multi_range_curr++;
op = m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
}
if(multi_range_curr < multi_range_count && op)
{
* multi_range_found_p= multi_ranges + multi_range_curr;
memcpy(table->record[0], m_multi_range_result_ptr, reclength);
unpack_record(table->record[0]);
table->status= 0;
/**
* Move to next
*/
multi_range_curr++;
m_current_multi_operation = m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
DBUG_RETURN(0);
}
if(multi_range_curr == multi_range_count)
{
DBUG_RETURN(HA_ERR_END_OF_FILE);
}
/**
* Read remaining ranges
*/
uint left = multi_range_count - multi_range_curr;
DBUG_RETURN(read_multi_range_first(multi_range_found_p,
multi_ranges + multi_range_curr,
left,
multi_range_sorted,
multi_range_buffer));
}
#endif /* HAVE_NDBCLUSTER_DB */ #endif /* HAVE_NDBCLUSTER_DB */
...@@ -111,6 +111,14 @@ class ha_ndbcluster: public handler ...@@ -111,6 +111,14 @@ class ha_ndbcluster: public handler
byte* buf); byte* buf);
int read_range_next(); int read_range_next();
/**
* Multi range stuff
*/
int read_multi_range_first(key_multi_range **found_range_p,
key_multi_range *ranges, uint range_count,
bool sorted, handler_buffer *buffer);
int read_multi_range_next(void);
bool get_error_message(int error, String *buf); bool get_error_message(int error, String *buf);
void info(uint); void info(uint);
int extra(enum ha_extra_function operation); int extra(enum ha_extra_function operation);
...@@ -257,6 +265,10 @@ class ha_ndbcluster: public handler ...@@ -257,6 +265,10 @@ class ha_ndbcluster: public handler
uint32 m_blobs_buffer_size; uint32 m_blobs_buffer_size;
uint m_dupkey; uint m_dupkey;
bool m_disable_multi_read;
byte* m_multi_range_result_ptr;
const NdbOperation* m_current_multi_operation;
void set_rec_per_key(); void set_rec_per_key();
void records_update(); void records_update();
void no_uncommitted_rows_execute_failure(); void no_uncommitted_rows_execute_failure();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment