Commit c90fdd44 authored by joreland@mysql.com

Added support for HA_READ_ORDER +

Added table statistics
parent 7e22cf6f
@@ -86,6 +86,9 @@ static int packfrm(const void *data, uint len, const void **pack_data, uint *pac
 static int unpackfrm(const void **data, uint *len,
                      const void* pack_data);
 
+static int ndb_get_table_statistics(Ndb*, const char *,
+                                    Uint64* rows, Uint64* commits);
+
 /*
   Error handling functions
 */
@@ -551,7 +554,11 @@ int ha_ndbcluster::get_metadata(const char *path)
   // All checks OK, lets use the table
   m_table= (void*)tab;
 
+  Uint64 rows;
+  if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
+    records= rows;
+  }
   DBUG_RETURN(build_index_list(table, ILBP_OPEN));
 }
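Note (illustration, not part of the patch): get_metadata() now seeds the handler's records estimate from the new statistics helper, passing 0 for the commit count it does not need. The helper treats either output pointer as optional, so a caller interested only in the commit count could do the reverse; a minimal sketch under that assumption:

    Uint64 commits;
    if (ndb_get_table_statistics(m_ndb, m_tabname, NULL, &commits) == 0)
      DBUG_PRINT("info", ("commit count: %llu", (unsigned long long) commits));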
@@ -709,19 +716,22 @@ static const ulong index_type_flags[]=
      through the index.
   */
   // HA_KEYREAD_ONLY |
-  HA_READ_NEXT |
-  HA_READ_RANGE,
+  HA_READ_NEXT |
+  HA_READ_RANGE |
+  HA_READ_ORDER,
 
   /* UNIQUE_INDEX */
   HA_ONLY_WHOLE_INDEX,
 
   /* UNIQUE_ORDERED_INDEX */
-  HA_READ_NEXT |
-  HA_READ_RANGE,
+  HA_READ_NEXT |
+  HA_READ_RANGE |
+  HA_READ_ORDER,
 
   /* ORDERED_INDEX */
-  HA_READ_NEXT |
-  HA_READ_RANGE,
+  HA_READ_NEXT |
+  HA_READ_RANGE |
+  HA_READ_ORDER
 };
 
 static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
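Note (illustration, not part of the patch): index_type_flags[] maps each NDB index type to the capability bits the server may rely on, and this change adds HA_READ_ORDER to every ordered index type so the server knows those indexes can return rows already sorted. A minimal sketch of how such a table is consulted; the wrapper name is invented, while NDB_INDEX_TYPE, index_type_flags and index_flags_size come from the surrounding file:

    // Sketch only: look up the capability bits advertised for one index type.
    static ulong example_index_flags(NDB_INDEX_TYPE idx_type)
    {
      if ((int) idx_type < 0 || (int) idx_type >= index_flags_size)
        return 0;                          // unknown type: advertise nothing
      return index_type_flags[idx_type];   // ordered types now include HA_READ_ORDER
    }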
@@ -1956,7 +1966,10 @@ int ha_ndbcluster::index_first(byte *buf)
 {
   DBUG_ENTER("index_first");
   statistic_increment(ha_read_first_count,&LOCK_status);
-  DBUG_RETURN(1);
+  // Start the ordered index scan and fetch the first row
+  // Only HA_READ_ORDER indexes get called by index_first
+  DBUG_RETURN(ordered_index_scan(0, 0, true, buf));
 }
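Note (illustration, not part of the patch): advertising HA_READ_ORDER means the server may drive a whole ordered read through this handler, which is why index_first() must start a real sorted scan instead of returning "not supported". A simplified sketch of that calling pattern, using direct handler calls and assuming the index has already been initialised with index_init():

    // Sketch: consume every row in index order through the handler interface.
    int read_all_in_order(handler *h, byte *record)
    {
      int err= h->index_first(record);
      while (err == 0)
      {
        // ... use one row, delivered in index order ...
        err= h->index_next(record);
      }
      return err;                          // typically HA_ERR_END_OF_FILE at the end
    }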
@@ -1964,6 +1977,16 @@ int ha_ndbcluster::index_last(byte *buf)
 {
   DBUG_ENTER("index_last");
   statistic_increment(ha_read_last_count,&LOCK_status);
+  int res;
+  if((res= ordered_index_scan(0, 0, true, buf)) == 0){
+    NdbResultSet *cursor= m_active_cursor;
+    while((res= cursor->nextResult(true)) == 0);
+    if(res == 1){
+      unpack_record(buf);
+      table->status= 0;
+      DBUG_RETURN(0);
+    }
+  }
   DBUG_RETURN(1);
 }
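Note (illustration, not part of the patch): the ordered index scans used here only run forward, so index_last() walks the ascending scan with nextResult(true) until it returns 1 (end of scan); the row buffer buf then still holds the last row that was fetched, and that row is unpacked. The pattern in isolation, with Cursor standing in for NdbResultSet:

    // Sketch only: walk a forward-only cursor to its end; the caller's row
    // buffer is left holding the last successfully fetched row.
    template <class Cursor>
    int position_on_last_row(Cursor *cursor)
    {
      int res;
      while ((res= cursor->nextResult(true)) == 0)
        ;                                  // each fetch overwrites the row buffer
      return res == 1 ? 0 : -1;            // 1 means a clean end of scan
    }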
@@ -2397,7 +2420,11 @@ const char **ha_ndbcluster::bas_ext() const
 double ha_ndbcluster::scan_time()
 {
-  return rows2double(records*1000);
+  DBUG_ENTER("ha_ndbcluster::scan_time()");
+  double res= rows2double(records*1000);
+  DBUG_PRINT("exit", ("table: %s value: %f",
+                      m_tabname, res));
+  DBUG_RETURN(res);
 }
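Note (not part of the patch): the cost formula itself is unchanged, only the tracing is new. Assuming rows2double() is a plain numeric conversion, a table with records = 100 reports a scan cost of 100000, and one with records = 100000 reports 100000000, so full table scans are priced very high relative to index access, which presumably steers the optimizer toward the ordered index paths enabled elsewhere in this commit.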
@@ -3579,8 +3606,6 @@ ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
   NDB_INDEX_TYPE idx_type= get_index_type(inx);
   DBUG_ENTER("records_in_range");
   DBUG_PRINT("enter", ("inx: %u", inx));
-  // Prevent partial read of hash indexes by returning HA_POS_ERROR
   if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
       ((min_key && min_key->length < key_length) ||
@@ -3755,4 +3780,63 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
   DBUG_RETURN(0);
 }
 
+static
+int
+ndb_get_table_statistics(Ndb* ndb, const char * table,
+                         Uint64* row_count, Uint64* commit_count)
+{
+  DBUG_ENTER("ndb_get_table_statistics");
+  DBUG_PRINT("enter", ("table: %s", table));
+
+  do
+  {
+    NdbConnection* pTrans= ndb->startTransaction();
+    if (pTrans == NULL)
+      break;
+
+    NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
+    if (pOp == NULL)
+      break;
+
+    NdbResultSet* rs= pOp->readTuples(NdbScanOperation::LM_Dirty);
+    if (rs == 0)
+      break;
+
+    int check= pOp->interpret_exit_last_row();
+    if (check == -1)
+      break;
+
+    Uint64 rows, commits;
+    pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
+    pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
+
+    check= pTrans->execute(NoCommit);
+    if (check == -1)
+      break;
+
+    Uint64 sum_rows= 0;
+    Uint64 sum_commits= 0;
+    while((check= rs->nextResult(true)) == 0)
+    {
+      sum_rows+= rows;
+      sum_commits+= commits;
+    }
+
+    if (check == -1)
+      break;
+
+    ndb->closeTransaction(pTrans);
+
+    if(row_count)
+      *row_count= sum_rows;
+    if(commit_count)
+      *commit_count= sum_commits;
+    DBUG_PRINT("exit", ("records: %llu commits: %llu",
+                        (unsigned long long)sum_rows,
+                        (unsigned long long)sum_commits));
+    DBUG_RETURN(0);
+  } while(0);
+
+  DBUG_PRINT("exit", ("failed"));
+  DBUG_RETURN(-1);
+}
 
 #endif /* HAVE_NDBCLUSTER_DB */
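Note (not part of the patch): ndb_get_table_statistics() runs a dirty-read scan (LM_Dirty, so no locks are taken), and interpret_exit_last_row() asks the scan to return effectively one row per table fragment; the ROW_COUNT and COMMIT_COUNT pseudo-columns carry that fragment's totals, so the nextResult() loop adds up one sample per fragment into table-wide figures. On any failure the helper returns -1 and callers such as get_metadata() keep their previous estimates. A hypothetical standalone use, assuming an Ndb object that is already connected and has its database set:

    #include <cstdio>

    // Sketch only: print the estimated row and commit counts for one table.
    static void print_table_stats(Ndb *ndb, const char *tab)
    {
      Uint64 rows= 0, commits= 0;
      if (ndb_get_table_statistics(ndb, tab, &rows, &commits) == 0)
        printf("%s: ~%llu rows, %llu commits\n", tab,
               (unsigned long long) rows, (unsigned long long) commits);
      else
        printf("%s: statistics unavailable\n", tab);
    }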