Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
86bb6a2b
Commit
86bb6a2b
authored
May 10, 2004
by
magnus@neptunus.(none)
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
WL# 1728 Handler: use scanReadExclusive for scan update and delete
parent
1d8bf1cb
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
227 additions
and
175 deletions
+227
-175
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.cc
+224
-172
sql/ha_ndbcluster.h
sql/ha_ndbcluster.h
+3
-3
No files found.
sql/ha_ndbcluster.cc
View file @
86bb6a2b
...
@@ -418,6 +418,13 @@ void ha_ndbcluster::release_metadata()
...
@@ -418,6 +418,13 @@ void ha_ndbcluster::release_metadata()
DBUG_VOID_RETURN
;
DBUG_VOID_RETURN
;
}
}
inline
int
ha_ndbcluster
::
get_ndb_lock_type
()
{
return
(
int
)((
m_lock
.
type
==
TL_WRITE_ALLOW_WRITE
)
?
NdbCursorOperation
::
LM_Exclusive
:
NdbCursorOperation
::
LM_Read
);
}
static
const
ulong
index_type_flags
[]
=
static
const
ulong
index_type_flags
[]
=
{
{
/* UNDEFINED_INDEX */
/* UNDEFINED_INDEX */
...
@@ -650,22 +657,61 @@ int ha_ndbcluster::unique_index_read(const byte *key,
...
@@ -650,22 +657,61 @@ int ha_ndbcluster::unique_index_read(const byte *key,
}
}
/*
/*
Get the next record of a started scan
Get the next record of a started scan. Try to fetch
it locally from NdbApi cached records if possible,
otherwise ask NDB for more.
NOTE
If this is a update/delete make sure to not contact
NDB before any pending ops have been sent to NDB.
*/
*/
inline
int
ha_ndbcluster
::
next_result
(
byte
*
buf
)
inline
int
ha_ndbcluster
::
next_result
(
byte
*
buf
)
{
{
int
check
;
NdbConnection
*
trans
=
m_active_trans
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
=
m_active_cursor
;
NdbResultSet
*
cursor
=
m_active_cursor
;
DBUG_ENTER
(
"next_result"
);
DBUG_ENTER
(
"next_result"
);
if
(
cursor
->
nextResult
()
==
0
)
if
(
!
cursor
)
{
DBUG_RETURN
(
HA_ERR_END_OF_FILE
);
// One more record found
unpack_record
(
buf
);
/*
table
->
status
=
0
;
If this an update or delete, call nextResult with false
DBUG_RETURN
(
0
);
to process any records already cached in NdbApi
}
*/
bool
contact_ndb
=
m_lock
.
type
!=
TL_WRITE_ALLOW_WRITE
;
do
{
DBUG_PRINT
(
"info"
,
(
"Call nextResult, contact_ndb: %d"
,
contact_ndb
));
check
=
cursor
->
nextResult
(
contact_ndb
);
if
(
check
==
0
)
{
// One more record found
DBUG_PRINT
(
"info"
,
(
"One more record found"
));
unpack_record
(
buf
);
table
->
status
=
0
;
DBUG_RETURN
(
0
);
}
else
if
(
check
==
1
||
check
==
2
)
{
// 1: No more records
// 2: No more cached records
/*
Before fetching more rows and releasing lock(s),
all pending update or delete operations should
be sent to NDB
*/
DBUG_PRINT
(
"info"
,
(
"ops_pending: %d"
,
ops_pending
));
if
(
ops_pending
&&
trans
->
execute
(
NoCommit
)
!=
0
)
DBUG_RETURN
(
ndb_err
(
trans
));
ops_pending
=
0
;
contact_ndb
=
(
check
==
2
);
}
}
while
(
check
==
2
);
table
->
status
=
STATUS_NOT_FOUND
;
table
->
status
=
STATUS_NOT_FOUND
;
if
(
ndb_err
(
trans
))
if
(
ndb_err
(
trans
))
ERR_RETURN
(
trans
->
getNdbError
());
ERR_RETURN
(
trans
->
getNdbError
());
...
@@ -737,28 +783,28 @@ int ha_ndbcluster::set_bounds(NdbOperation *op,
...
@@ -737,28 +783,28 @@ int ha_ndbcluster::set_bounds(NdbOperation *op,
/*
/*
Read record(s) from NDB using ordered index scan
Start ordered index scan in NDB
*/
*/
int
ha_ndbcluster
::
ordered_index_scan
(
const
key_range
*
start_key
,
int
ha_ndbcluster
::
ordered_index_scan
(
const
key_range
*
start_key
,
const
key_range
*
end_key
,
const
key_range
*
end_key
,
bool
sorted
,
byte
*
buf
)
bool
sorted
,
byte
*
buf
)
{
{
uint
no_fields
=
table
->
fields
;
uint
i
;
NdbConnection
*
trans
=
m_active_trans
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
=
m_active_cursor
;
NdbResultSet
*
cursor
;
NdbScanOperation
*
op
;
NdbScanOperation
*
op
;
const
char
*
index_name
;
const
char
*
index_name
;
THD
*
thd
=
current_thd
;
DBUG_ENTER
(
"ordered_index_scan"
);
DBUG_ENTER
(
"ordered_index_scan"
);
DBUG_PRINT
(
"enter"
,
(
"index: %u
"
,
active_index
));
DBUG_PRINT
(
"enter"
,
(
"index: %u
, sorted: %d"
,
active_index
,
sorted
));
DBUG_PRINT
(
"enter"
,
(
"Starting new ordered scan on %s"
,
m_tabname
));
DBUG_PRINT
(
"enter"
,
(
"Starting new ordered scan on %s"
,
m_tabname
));
index_name
=
get_index_name
(
active_index
);
index_name
=
get_index_name
(
active_index
);
if
(
!
(
op
=
trans
->
getNdbScanOperation
(
index_name
,
m_tabname
)))
if
(
!
(
op
=
trans
->
getNdbScanOperation
(
index_name
,
m_tabname
)))
ERR_RETURN
(
trans
->
getNdbError
());
ERR_RETURN
(
trans
->
getNdbError
());
if
(
!
(
cursor
=
op
->
readTuples
(
parallelism
)))
if
(
!
(
cursor
=
op
->
readTuples
(
parallelism
,
(
NdbCursorOperation
::
LockMode
)
get_ndb_lock_type
())))
ERR_RETURN
(
trans
->
getNdbError
());
ERR_RETURN
(
trans
->
getNdbError
());
m_active_cursor
=
cursor
;
m_active_cursor
=
cursor
;
...
@@ -782,55 +828,31 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
...
@@ -782,55 +828,31 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
NdbOperation
::
BoundGT
))
NdbOperation
::
BoundGT
))
DBUG_RETURN
(
1
);
DBUG_RETURN
(
1
);
// Define attributes to read
DBUG_RETURN
(
define_read_attrs
(
buf
,
op
));
for
(
i
=
0
;
i
<
no_fields
;
i
++
)
{
Field
*
field
=
table
->
field
[
i
];
if
((
thd
->
query_id
==
field
->
query_id
)
||
(
field
->
flags
&
PRI_KEY_FLAG
))
{
if
(
get_ndb_value
(
op
,
i
,
field
->
ptr
))
ERR_RETURN
(
op
->
getNdbError
());
}
else
{
m_value
[
i
]
=
NULL
;
}
}
if
(
table
->
primary_key
==
MAX_KEY
)
{
DBUG_PRINT
(
"info"
,
(
"Getting hidden key"
));
// Scanning table with no primary key
int
hidden_no
=
no_fields
;
#ifndef DBUG_OFF
const
NDBTAB
*
tab
=
(
NDBTAB
*
)
m_table
;
if
(
!
tab
->
getColumn
(
hidden_no
))
DBUG_RETURN
(
1
);
#endif
if
(
get_ndb_value
(
op
,
hidden_no
,
NULL
))
ERR_RETURN
(
op
->
getNdbError
());
}
if
(
trans
->
execute
(
NoCommit
)
!=
0
)
DBUG_RETURN
(
ndb_err
(
trans
));
DBUG_PRINT
(
"exit"
,
(
"Scan started successfully"
));
DBUG_RETURN
(
next_result
(
buf
));
}
}
#if 0
/*
/*
Read record(s) from NDB using full table scan with filter
Start a filtered scan in NDB.
NOTE
This function is here as an example of how to start a
filtered scan. It should be possible to replace full_table_scan
with this function and make a best effort attempt
at filtering out the irrelevant data by converting the "items"
into interpreted instructions.
This would speed up table scans where there is a limiting WHERE clause
that doesn't match any index in the table.
*/
*/
int
ha_ndbcluster
::
filtered_scan
(
const
byte
*
key
,
uint
key_len
,
int
ha_ndbcluster
::
filtered_scan
(
const
byte
*
key
,
uint
key_len
,
byte
*
buf
,
byte
*
buf
,
enum
ha_rkey_function
find_flag
)
enum
ha_rkey_function
find_flag
)
{
{
uint no_fields= table->fields;
NdbConnection
*
trans
=
m_active_trans
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet *cursor= m_active_cursor;
NdbResultSet
*
cursor
;
NdbScanOperation
*
op
;
DBUG_ENTER
(
"filtered_scan"
);
DBUG_ENTER
(
"filtered_scan"
);
DBUG_PRINT
(
"enter"
,
(
"key_len: %u, index: %u"
,
DBUG_PRINT
(
"enter"
,
(
"key_len: %u, index: %u"
,
...
@@ -838,12 +860,12 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
...
@@ -838,12 +860,12 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_DUMP
(
"key"
,
(
char
*
)
key
,
key_len
);
DBUG_DUMP
(
"key"
,
(
char
*
)
key
,
key_len
);
DBUG_PRINT
(
"info"
,
(
"Starting a new filtered scan on %s"
,
DBUG_PRINT
(
"info"
,
(
"Starting a new filtered scan on %s"
,
m_tabname
));
m_tabname
));
NdbScanOperation *op= trans->getNdbScanOperation(m_tabname);
if (!
op
)
if
(
!
(
op
=
trans
->
getNdbScanOperation
(
m_tabname
))
)
ERR_RETURN
(
trans
->
getNdbError
());
ERR_RETURN
(
trans
->
getNdbError
());
if
(
!
(
cursor
=
cursor= op->readTuples(parallelism);
op
->
readTuples
(
parallelism
,
if (!cursor
)
(
NdbCursorOperation
::
LockMode
)
get_ndb_lock_type
()))
)
ERR_RETURN
(
trans
->
getNdbError
());
ERR_RETURN
(
trans
->
getNdbError
());
m_active_cursor
=
cursor
;
m_active_cursor
=
cursor
;
...
@@ -892,60 +914,44 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
...
@@ -892,60 +914,44 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
sf
.
end
();
sf
.
end
();
}
}
// Define attributes to read
DBUG_RETURN
(
define_read_attrs
(
buf
,
op
));
for (uint field_no= 0; field_no < no_fields; field_no++)
{
Field *field= table->field[field_no];
// Read attribute
DBUG_PRINT("get", ("%d: %s", field_no, field->field_name));
if (get_ndb_value(op, field_no, field->ptr))
ERR_RETURN(op->getNdbError());
}
if (table->primary_key == MAX_KEY)
{
DBUG_PRINT("info", ("Getting hidden key"));
// Scanning table with no primary key
int hidden_no= no_fields;
#ifndef DBUG_OFF
const NDBTAB *tab= (NDBTAB *) m_table;
if (!tab->getColumn(hidden_no))
DBUG_RETURN(1);
#endif
if (get_ndb_value(op, hidden_no, NULL))
ERR_RETURN(op->getNdbError());
}
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
}
}
#endif
/*
/*
Read records from NDB using full table scan
Start full table scan in NDB
*/
*/
int
ha_ndbcluster
::
full_table_scan
(
byte
*
buf
)
int
ha_ndbcluster
::
full_table_scan
(
byte
*
buf
)
{
{
uint
i
;
uint
i
;
THD
*
thd
=
current_thd
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
;
NdbResultSet
*
cursor
;
NdbScanOperation
*
op
;
NdbScanOperation
*
op
;
NdbConnection
*
trans
=
m_active_trans
;
DBUG_ENTER
(
"full_table_scan"
);
DBUG_ENTER
(
"full_table_scan"
);
DBUG_PRINT
(
"enter"
,
(
"Starting new scan on %s"
,
m_tabname
));
DBUG_PRINT
(
"enter"
,
(
"Starting new scan on %s"
,
m_tabname
));
if
(
!
(
op
=
trans
->
getNdbScanOperation
(
m_tabname
)))
if
(
!
(
op
=
trans
->
getNdbScanOperation
(
m_tabname
)))
ERR_RETURN
(
trans
->
getNdbError
());
ERR_RETURN
(
trans
->
getNdbError
());
if
(
!
(
cursor
=
op
->
readTuples
(
parallelism
)))
if
(
!
(
cursor
=
op
->
readTuples
(
parallelism
,
(
NdbCursorOperation
::
LockMode
)
get_ndb_lock_type
())))
ERR_RETURN
(
trans
->
getNdbError
());
ERR_RETURN
(
trans
->
getNdbError
());
m_active_cursor
=
cursor
;
m_active_cursor
=
cursor
;
DBUG_RETURN
(
define_read_attrs
(
buf
,
op
));
}
inline
int
ha_ndbcluster
::
define_read_attrs
(
byte
*
buf
,
NdbOperation
*
op
)
{
uint
i
;
THD
*
thd
=
current_thd
;
NdbConnection
*
trans
=
m_active_trans
;
DBUG_ENTER
(
"define_read_attrs"
);
// Define attributes to read
// Define attributes to read
for
(
i
=
0
;
i
<
table
->
fields
;
i
++
)
for
(
i
=
0
;
i
<
table
->
fields
;
i
++
)
{
{
...
@@ -1040,7 +1046,8 @@ int ha_ndbcluster::write_row(byte *record)
...
@@ -1040,7 +1046,8 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected!
Find out how this is detected!
*/
*/
rows_inserted
++
;
rows_inserted
++
;
if
((
rows_inserted
%
bulk_insert_rows
)
==
0
)
if
((
rows_inserted
==
rows_to_insert
)
||
((
rows_inserted
%
bulk_insert_rows
)
==
0
))
{
{
// Send rows to NDB
// Send rows to NDB
DBUG_PRINT
(
"info"
,
(
"Sending inserts to NDB, "
\
DBUG_PRINT
(
"info"
,
(
"Sending inserts to NDB, "
\
...
@@ -1095,6 +1102,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
...
@@ -1095,6 +1102,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
{
{
THD
*
thd
=
current_thd
;
THD
*
thd
=
current_thd
;
NdbConnection
*
trans
=
m_active_trans
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
=
m_active_cursor
;
NdbOperation
*
op
;
NdbOperation
*
op
;
uint
i
;
uint
i
;
DBUG_ENTER
(
"update_row"
);
DBUG_ENTER
(
"update_row"
);
...
@@ -1103,49 +1111,66 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
...
@@ -1103,49 +1111,66 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if
(
table
->
timestamp_on_update_now
)
if
(
table
->
timestamp_on_update_now
)
update_timestamp
(
new_data
+
table
->
timestamp_on_update_now
-
1
);
update_timestamp
(
new_data
+
table
->
timestamp_on_update_now
-
1
);
if
(
!
(
op
=
trans
->
getNdbOperation
(
m_tabname
))
||
/* Check for update of primary key and return error */
op
->
updateTuple
()
!=
0
)
if
((
table
->
primary_key
!=
MAX_KEY
)
&&
ERR_RETURN
(
trans
->
getNdbError
());
(
key_cmp
(
table
->
primary_key
,
old_data
,
new_data
)))
DBUG_RETURN
(
HA_ERR_UNSUPPORTED
);
if
(
table
->
primary_key
==
MAX_KEY
)
{
if
(
cursor
)
// This table has no primary key, use "hidden" primary key
DBUG_PRINT
(
"info"
,
(
"Using hidden key"
));
// Require that the PK for this record has previously been
// read into m_value
uint
no_fields
=
table
->
fields
;
NdbRecAttr
*
rec
=
m_value
[
no_fields
];
DBUG_ASSERT
(
rec
);
DBUG_DUMP
(
"key"
,
(
char
*
)
rec
->
aRef
(),
NDB_HIDDEN_PRIMARY_KEY_LENGTH
);
if
(
set_hidden_key
(
op
,
no_fields
,
rec
->
aRef
()))
ERR_RETURN
(
op
->
getNdbError
());
}
else
{
{
/* Check for update of primary key and return error */
/*
if
(
key_cmp
(
table
->
primary_key
,
old_data
,
new_data
))
We are scanning records and want to update the record
DBUG_RETURN
(
HA_ERR_UNSUPPORTED
);
that was just found, call updateTuple on the cursor
to take over the lock to a new update operation
int
res
;
And thus setting the primary key of the record from
if
((
res
=
set_primary_key
(
op
,
old_data
+
table
->
null_bytes
)))
the active record in cursor
DBUG_RETURN
(
res
);
*/
DBUG_PRINT
(
"info"
,
(
"Calling updateTuple on cursor"
));
if
(
!
(
op
=
cursor
->
updateTuple
()))
ERR_RETURN
(
trans
->
getNdbError
());
ops_pending
++
;
}
else
{
if
(
!
(
op
=
trans
->
getNdbOperation
(
m_tabname
))
||
op
->
updateTuple
()
!=
0
)
ERR_RETURN
(
trans
->
getNdbError
());
if
(
table
->
primary_key
==
MAX_KEY
)
{
// This table has no primary key, use "hidden" primary key
DBUG_PRINT
(
"info"
,
(
"Using hidden key"
));
// Require that the PK for this record has previously been
// read into m_value
uint
no_fields
=
table
->
fields
;
NdbRecAttr
*
rec
=
m_value
[
no_fields
];
DBUG_ASSERT
(
rec
);
DBUG_DUMP
(
"key"
,
(
char
*
)
rec
->
aRef
(),
NDB_HIDDEN_PRIMARY_KEY_LENGTH
);
if
(
set_hidden_key
(
op
,
no_fields
,
rec
->
aRef
()))
ERR_RETURN
(
op
->
getNdbError
());
}
else
{
int
res
;
if
((
res
=
set_primary_key
(
op
,
old_data
+
table
->
null_bytes
)))
DBUG_RETURN
(
res
);
}
}
}
// Set non-key attribute(s)
// Set non-key attribute(s)
for
(
i
=
0
;
i
<
table
->
fields
;
i
++
)
for
(
i
=
0
;
i
<
table
->
fields
;
i
++
)
{
{
Field
*
field
=
table
->
field
[
i
];
Field
*
field
=
table
->
field
[
i
];
if
((
thd
->
query_id
==
field
->
query_id
)
&&
if
((
thd
->
query_id
==
field
->
query_id
)
&&
(
!
(
field
->
flags
&
PRI_KEY_FLAG
))
&&
(
!
(
field
->
flags
&
PRI_KEY_FLAG
))
&&
set_ndb_value
(
op
,
field
,
i
))
set_ndb_value
(
op
,
field
,
i
))
ERR_RETURN
(
op
->
getNdbError
());
ERR_RETURN
(
op
->
getNdbError
());
}
}
// Execute update operation
// Execute update operation
if
(
trans
->
execute
(
NoCommit
)
!=
0
)
if
(
!
cursor
&&
trans
->
execute
(
NoCommit
)
!=
0
)
DBUG_RETURN
(
ndb_err
(
trans
));
DBUG_RETURN
(
ndb_err
(
trans
));
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
...
@@ -1159,39 +1184,61 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
...
@@ -1159,39 +1184,61 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
int
ha_ndbcluster
::
delete_row
(
const
byte
*
record
)
int
ha_ndbcluster
::
delete_row
(
const
byte
*
record
)
{
{
NdbConnection
*
trans
=
m_active_trans
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
=
m_active_cursor
;
NdbOperation
*
op
;
NdbOperation
*
op
;
DBUG_ENTER
(
"delete_row"
);
DBUG_ENTER
(
"delete_row"
);
statistic_increment
(
ha_delete_count
,
&
LOCK_status
);
statistic_increment
(
ha_delete_count
,
&
LOCK_status
);
if
(
!
(
op
=
trans
->
getNdbOperation
(
m_tabname
))
||
if
(
cursor
)
op
->
deleteTuple
()
!=
0
)
ERR_RETURN
(
trans
->
getNdbError
());
if
(
table
->
primary_key
==
MAX_KEY
)
{
{
// This table has no primary key, use "hidden" primary key
/*
DBUG_PRINT
(
"info"
,
(
"Using hidden key"
));
We are scanning records and want to update the record
uint
no_fields
=
table
->
fields
;
that was just found, call deleteTuple on the cursor
NdbRecAttr
*
rec
=
m_value
[
no_fields
];
to take over the lock to a new update operation
DBUG_ASSERT
(
rec
!=
NULL
);
And thus setting the primary key of the record from
the active record in cursor
*/
DBUG_PRINT
(
"info"
,
(
"Calling deleteTuple on cursor"
));
if
(
cursor
->
deleteTuple
()
!=
0
)
ERR_RETURN
(
trans
->
getNdbError
());
ops_pending
++
;
if
(
set_hidden_key
(
op
,
no_fields
,
rec
->
aRef
()))
// If deleting from cursor, NoCommit will be handled in next_result
ERR_RETURN
(
op
->
getNdbError
()
);
DBUG_RETURN
(
0
);
}
}
else
else
{
{
int
res
;
if
((
res
=
set_primary_key
(
op
)))
if
(
!
(
op
=
trans
->
getNdbOperation
(
m_tabname
))
||
return
res
;
op
->
deleteTuple
()
!=
0
)
ERR_RETURN
(
trans
->
getNdbError
());
if
(
table
->
primary_key
==
MAX_KEY
)
{
// This table has no primary key, use "hidden" primary key
DBUG_PRINT
(
"info"
,
(
"Using hidden key"
));
uint
no_fields
=
table
->
fields
;
NdbRecAttr
*
rec
=
m_value
[
no_fields
];
DBUG_ASSERT
(
rec
!=
NULL
);
if
(
set_hidden_key
(
op
,
no_fields
,
rec
->
aRef
()))
ERR_RETURN
(
op
->
getNdbError
());
}
else
{
int
res
;
if
((
res
=
set_primary_key
(
op
)))
return
res
;
}
}
}
// Execute delete operation
// Execute delete operation
if
(
trans
->
execute
(
NoCommit
)
!=
0
)
if
(
trans
->
execute
(
NoCommit
)
!=
0
)
DBUG_RETURN
(
ndb_err
(
trans
));
DBUG_RETURN
(
ndb_err
(
trans
));
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
/*
/*
Unpack a record read from NDB
Unpack a record read from NDB
...
@@ -1479,11 +1526,7 @@ int ha_ndbcluster::index_next(byte *buf)
...
@@ -1479,11 +1526,7 @@ int ha_ndbcluster::index_next(byte *buf)
int
error
=
1
;
int
error
=
1
;
statistic_increment
(
ha_read_next_count
,
&
LOCK_status
);
statistic_increment
(
ha_read_next_count
,
&
LOCK_status
);
if
(
!
m_active_cursor
)
DBUG_RETURN
(
next_result
(
buf
));
error
=
HA_ERR_END_OF_FILE
;
else
error
=
next_result
(
buf
);
DBUG_RETURN
(
error
);
}
}
...
@@ -1515,33 +1558,38 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
...
@@ -1515,33 +1558,38 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
const
key_range
*
end_key
,
const
key_range
*
end_key
,
bool
sorted
)
bool
sorted
)
{
{
int
error
=
1
;
KEY
*
key_info
;
int
error
=
1
;
byte
*
buf
=
table
->
record
[
0
];
DBUG_ENTER
(
"ha_ndbcluster::read_range_first"
);
DBUG_ENTER
(
"ha_ndbcluster::read_range_first"
);
DBUG_PRINT
(
"info"
,
(
"sorted: %d"
,
sorted
));
switch
(
get_index_type
(
active_index
)){
switch
(
get_index_type
(
active_index
)){
case
PRIMARY_KEY_INDEX
:
case
PRIMARY_KEY_INDEX
:
error
=
pk_read
(
start_key
->
key
,
start_key
->
length
,
key_info
=
table
->
key_info
+
active_index
;
table
->
record
[
0
]);
if
(
start_key
&&
start_key
->
length
==
key_info
->
key_length
&&
start_key
->
flag
==
HA_READ_KEY_EXACT
)
DBUG_RETURN
(
pk_read
(
start_key
->
key
,
start_key
->
length
,
buf
));
break
;
break
;
case
UNIQUE_INDEX
:
case
UNIQUE_INDEX
:
error
=
unique_index_read
(
start_key
->
key
,
start_key
->
length
,
key_info
=
table
->
key_info
+
active_index
;
table
->
record
[
0
]);
if
(
start_key
&&
break
;
start_key
->
length
==
key_info
->
key_length
&&
start_key
->
flag
==
HA_READ_KEY_EXACT
)
case
ORDERED_INDEX
:
DBUG_RETURN
(
unique_index_read
(
start_key
->
key
,
start_key
->
length
,
buf
));
// Start the ordered index scan and fetch the first row
error
=
ordered_index_scan
(
start_key
,
end_key
,
sorted
,
table
->
record
[
0
]);
break
;
break
;
default:
default:
case
UNDEFINED_INDEX
:
break
;
break
;
}
}
// Start the ordered index scan and fetch the first row
error
=
ordered_index_scan
(
start_key
,
end_key
,
sorted
,
buf
);
DBUG_RETURN
(
error
);
DBUG_RETURN
(
error
);
}
}
int
ha_ndbcluster
::
read_range_next
(
bool
eq_range
)
int
ha_ndbcluster
::
read_range_next
(
bool
eq_range
)
{
{
DBUG_ENTER
(
"ha_ndbcluster::read_range_next"
);
DBUG_ENTER
(
"ha_ndbcluster::read_range_next"
);
...
@@ -1581,12 +1629,10 @@ int ha_ndbcluster::rnd_next(byte *buf)
...
@@ -1581,12 +1629,10 @@ int ha_ndbcluster::rnd_next(byte *buf)
{
{
DBUG_ENTER
(
"rnd_next"
);
DBUG_ENTER
(
"rnd_next"
);
statistic_increment
(
ha_read_rnd_next_count
,
&
LOCK_status
);
statistic_increment
(
ha_read_rnd_next_count
,
&
LOCK_status
);
int
error
=
1
;
if
(
!
m_active_cursor
)
if
(
!
m_active_cursor
)
error
=
full_table_scan
(
buf
);
DBUG_RETURN
(
full_table_scan
(
buf
));
else
DBUG_RETURN
(
next_result
(
buf
));
error
=
next_result
(
buf
);
DBUG_RETURN
(
error
);
}
}
...
@@ -1914,6 +1960,8 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
...
@@ -1914,6 +1960,8 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
m_lock
.
type
=
lock_type
;
m_lock
.
type
=
lock_type
;
}
}
*
to
++=
&
m_lock
;
*
to
++=
&
m_lock
;
DBUG_PRINT
(
"exit"
,
(
"lock_type: %d"
,
lock_type
));
DBUG_RETURN
(
to
);
DBUG_RETURN
(
to
);
}
}
...
@@ -2028,8 +2076,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
...
@@ -2028,8 +2076,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
(
NdbConnection
*
)
thd
->
transaction
.
stmt
.
ndb_tid
;
(
NdbConnection
*
)
thd
->
transaction
.
stmt
.
ndb_tid
;
DBUG_ASSERT
(
m_active_trans
);
DBUG_ASSERT
(
m_active_trans
);
retrieve_all_fields
=
FALSE
;
// Start of transaction
retrieve_all_fields
=
FALSE
;
ops_pending
=
0
;
}
}
else
else
{
{
...
@@ -2081,7 +2130,9 @@ int ha_ndbcluster::start_stmt(THD *thd)
...
@@ -2081,7 +2130,9 @@ int ha_ndbcluster::start_stmt(THD *thd)
}
}
m_active_trans
=
trans
;
m_active_trans
=
trans
;
// Start of statement
retrieve_all_fields
=
FALSE
;
retrieve_all_fields
=
FALSE
;
ops_pending
=
0
;
DBUG_RETURN
(
error
);
DBUG_RETURN
(
error
);
}
}
...
@@ -2562,7 +2613,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
...
@@ -2562,7 +2613,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
retrieve_all_fields
(
FALSE
),
retrieve_all_fields
(
FALSE
),
rows_to_insert
(
0
),
rows_to_insert
(
0
),
rows_inserted
(
0
),
rows_inserted
(
0
),
bulk_insert_rows
(
1024
)
bulk_insert_rows
(
1024
),
ops_pending
(
0
)
{
{
int
i
;
int
i
;
...
...
sql/ha_ndbcluster.h
View file @
86bb6a2b
...
@@ -152,6 +152,7 @@ class ha_ndbcluster: public handler
...
@@ -152,6 +152,7 @@ class ha_ndbcluster: public handler
const
char
*
get_unique_index_name
(
uint
idx_no
)
const
;
const
char
*
get_unique_index_name
(
uint
idx_no
)
const
;
NDB_INDEX_TYPE
get_index_type
(
uint
idx_no
)
const
;
NDB_INDEX_TYPE
get_index_type
(
uint
idx_no
)
const
;
NDB_INDEX_TYPE
get_index_type_from_table
(
uint
index_no
)
const
;
NDB_INDEX_TYPE
get_index_type_from_table
(
uint
index_no
)
const
;
int
get_ndb_lock_type
();
int
pk_read
(
const
byte
*
key
,
uint
key_len
,
int
pk_read
(
const
byte
*
key
,
uint
key_len
,
byte
*
buf
);
byte
*
buf
);
...
@@ -162,12 +163,10 @@ class ha_ndbcluster: public handler
...
@@ -162,12 +163,10 @@ class ha_ndbcluster: public handler
bool
sorted
,
byte
*
buf
);
bool
sorted
,
byte
*
buf
);
int
full_table_scan
(
byte
*
buf
);
int
full_table_scan
(
byte
*
buf
);
int
next_result
(
byte
*
buf
);
int
next_result
(
byte
*
buf
);
#if 0
int
define_read_attrs
(
byte
*
buf
,
NdbOperation
*
op
);
int
filtered_scan
(
const
byte
*
key
,
uint
key_len
,
int
filtered_scan
(
const
byte
*
key
,
uint
key_len
,
byte
*
buf
,
byte
*
buf
,
enum
ha_rkey_function
find_flag
);
enum
ha_rkey_function
find_flag
);
#endif
void
unpack_record
(
byte
*
buf
);
void
unpack_record
(
byte
*
buf
);
void
set_dbname
(
const
char
*
pathname
);
void
set_dbname
(
const
char
*
pathname
);
...
@@ -212,6 +211,7 @@ class ha_ndbcluster: public handler
...
@@ -212,6 +211,7 @@ class ha_ndbcluster: public handler
ha_rows
rows_to_insert
;
ha_rows
rows_to_insert
;
ha_rows
rows_inserted
;
ha_rows
rows_inserted
;
ha_rows
bulk_insert_rows
;
ha_rows
bulk_insert_rows
;
ha_rows
ops_pending
;
};
};
bool
ndbcluster_init
(
void
);
bool
ndbcluster_init
(
void
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment