Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
787c9786
Commit
787c9786
authored
May 10, 2004
by
magnus@neptunus.(none)
Browse files
Options
Browse Files
Download
Plain Diff
Merged ha_ndbcluster.cc
parents
05d83a89
794b3b52
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
275 additions
and
135 deletions
+275
-135
mysql-test/r/ndb_basic.result
mysql-test/r/ndb_basic.result
+51
-3
mysql-test/t/ndb_basic.test
mysql-test/t/ndb_basic.test
+16
-5
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.cc
+205
-124
sql/ha_ndbcluster.h
sql/ha_ndbcluster.h
+3
-3
No files found.
mysql-test/r/ndb_basic.result
View file @
787c9786
...
...
@@ -25,11 +25,59 @@ pk1 attr1
DELETE FROM t1;
SELECT * FROM t1;
pk1 attr1
INSERT INTO t1 VALUES (9410,9412);
INSERT INTO t1 VALUES (9410,9412), (9411, 9413), (9408, 8765),
(7,8), (8,9), (9,10), (10,11), (11,12), (12,13), (13,14);
UPDATE t1 SET attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9999
8 9999
9 9999
10 9999
11 9999
12 9999
13 9999
9408 9999
9410 9999
9411 9999
UPDATE t1 SET attr1 = 9998 WHERE pk1 < 1000;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9998
8 9998
9 9998
10 9998
11 9998
12 9998
13 9998
9408 9999
9410 9999
9411 9999
UPDATE t1 SET attr1 = 9997 WHERE attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9998
8 9998
9 9998
10 9998
11 9998
12 9998
13 9998
9408 9997
9410 9997
9411 9997
DELETE FROM t1 WHERE pk1 = 9410;
SELECT * FROM t1;
SELECT * FROM t1
ORDER BY pk1
;
pk1 attr1
INSERT INTO t1 VALUES (9410,9412), (9411, 9413), (9408, 8765);
7 9998
8 9998
9 9998
10 9998
11 9998
12 9998
13 9998
9408 9997
9411 9997
DELETE FROM t1;
SELECT * FROM t1;
pk1 attr1
...
...
mysql-test/t/ndb_basic.test
View file @
787c9786
...
...
@@ -23,6 +23,7 @@ SELECT pk1 FROM t1;
SELECT
*
FROM
t1
;
SELECT
t1
.*
FROM
t1
;
# Update on record by primary key
UPDATE
t1
SET
attr1
=
1
WHERE
pk1
=
9410
;
SELECT
*
FROM
t1
;
...
...
@@ -35,13 +36,23 @@ SELECT * FROM t1;
DELETE
FROM
t1
;
SELECT
*
FROM
t1
;
# Delete the record by specifying pk
INSERT
INTO
t1
VALUES
(
9410
,
9412
);
# Insert more records and update them all at once
INSERT
INTO
t1
VALUES
(
9410
,
9412
),
(
9411
,
9413
),
(
9408
,
8765
),
(
7
,
8
),
(
8
,
9
),
(
9
,
10
),
(
10
,
11
),
(
11
,
12
),
(
12
,
13
),
(
13
,
14
);
UPDATE
t1
SET
attr1
=
9999
;
SELECT
*
FROM
t1
ORDER
BY
pk1
;
UPDATE
t1
SET
attr1
=
9998
WHERE
pk1
<
1000
;
SELECT
*
FROM
t1
ORDER
BY
pk1
;
UPDATE
t1
SET
attr1
=
9997
WHERE
attr1
=
9999
;
SELECT
*
FROM
t1
ORDER
BY
pk1
;
# Delete one record by specifying pk
DELETE
FROM
t1
WHERE
pk1
=
9410
;
SELECT
*
FROM
t1
;
SELECT
*
FROM
t1
ORDER
BY
pk1
;
# Insert three records and delete the
INSERT
INTO
t1
VALUES
(
9410
,
9412
),
(
9411
,
9413
),
(
9408
,
8765
);
# Delete all from table
DELETE
FROM
t1
;
SELECT
*
FROM
t1
;
...
...
sql/ha_ndbcluster.cc
View file @
787c9786
...
...
@@ -418,6 +418,13 @@ void ha_ndbcluster::release_metadata()
DBUG_VOID_RETURN
;
}
inline
int
ha_ndbcluster
::
get_ndb_lock_type
()
{
return
(
int
)((
m_lock
.
type
==
TL_WRITE_ALLOW_WRITE
)
?
NdbCursorOperation
::
LM_Exclusive
:
NdbCursorOperation
::
LM_Read
);
}
static
const
ulong
index_type_flags
[]
=
{
/* UNDEFINED_INDEX */
...
...
@@ -652,22 +659,61 @@ int ha_ndbcluster::unique_index_read(const byte *key,
}
/*
Get the next record of a started scan
Get the next record of a started scan. Try to fetch
it locally from NdbApi cached records if possible,
otherwise ask NDB for more.
NOTE
If this is a update/delete make sure to not contact
NDB before any pending ops have been sent to NDB.
*/
inline
int
ha_ndbcluster
::
next_result
(
byte
*
buf
)
{
int
check
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
=
m_active_cursor
;
DBUG_ENTER
(
"next_result"
);
if
(
cursor
->
nextResult
()
==
0
)
if
(
!
cursor
)
DBUG_RETURN
(
HA_ERR_END_OF_FILE
);
/*
If this an update or delete, call nextResult with false
to process any records already cached in NdbApi
*/
bool
contact_ndb
=
m_lock
.
type
!=
TL_WRITE_ALLOW_WRITE
;
do
{
DBUG_PRINT
(
"info"
,
(
"Call nextResult, contact_ndb: %d"
,
contact_ndb
));
check
=
cursor
->
nextResult
(
contact_ndb
);
if
(
check
==
0
)
{
// One more record found
DBUG_PRINT
(
"info"
,
(
"One more record found"
));
unpack_record
(
buf
);
table
->
status
=
0
;
DBUG_RETURN
(
0
);
}
else
if
(
check
==
1
||
check
==
2
)
{
// 1: No more records
// 2: No more cached records
/*
Before fetching more rows and releasing lock(s),
all pending update or delete operations should
be sent to NDB
*/
DBUG_PRINT
(
"info"
,
(
"ops_pending: %d"
,
ops_pending
));
if
(
ops_pending
&&
trans
->
execute
(
NoCommit
)
!=
0
)
DBUG_RETURN
(
ndb_err
(
trans
));
ops_pending
=
0
;
contact_ndb
=
(
check
==
2
);
}
}
while
(
check
==
2
);
table
->
status
=
STATUS_NOT_FOUND
;
if
(
ndb_err
(
trans
))
ERR_RETURN
(
trans
->
getNdbError
());
...
...
@@ -739,28 +785,28 @@ int ha_ndbcluster::set_bounds(NdbOperation *op,
/*
Read record(s) from NDB using ordered index scan
Start ordered index scan in NDB
*/
int
ha_ndbcluster
::
ordered_index_scan
(
const
key_range
*
start_key
,
const
key_range
*
end_key
,
bool
sorted
,
byte
*
buf
)
{
uint
no_fields
=
table
->
fields
;
uint
i
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
=
m_active_cursor
;
NdbResultSet
*
cursor
;
NdbScanOperation
*
op
;
const
char
*
index_name
;
THD
*
thd
=
current_thd
;
DBUG_ENTER
(
"ordered_index_scan"
);
DBUG_PRINT
(
"enter"
,
(
"index: %u
"
,
active_index
));
DBUG_PRINT
(
"enter"
,
(
"index: %u
, sorted: %d"
,
active_index
,
sorted
));
DBUG_PRINT
(
"enter"
,
(
"Starting new ordered scan on %s"
,
m_tabname
));
index_name
=
get_index_name
(
active_index
);
if
(
!
(
op
=
trans
->
getNdbScanOperation
(
index_name
,
m_tabname
)))
ERR_RETURN
(
trans
->
getNdbError
());
if
(
!
(
cursor
=
op
->
readTuples
(
parallelism
)))
if
(
!
(
cursor
=
op
->
readTuples
(
parallelism
,
(
NdbCursorOperation
::
LockMode
)
get_ndb_lock_type
())))
ERR_RETURN
(
trans
->
getNdbError
());
m_active_cursor
=
cursor
;
...
...
@@ -817,22 +863,31 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
if
(
trans
->
execute
(
NoCommit
)
!=
0
)
DBUG_RETURN
(
ndb_err
(
trans
));
DBUG_PRINT
(
"exit"
,
(
"Scan started successfully"
));
DBUG_RETURN
(
next_result
(
buf
));
DBUG_RETURN
(
define_read_attrs
(
buf
,
op
));
}
#if 0
/*
Read record(s) from NDB using full table scan with filter
Start a filtered scan in NDB.
NOTE
This function is here as an example of how to start a
filtered scan. It should be possible to replace full_table_scan
with this function and make a best effort attempt
at filtering out the irrelevant data by converting the "items"
into interpreted instructions.
This would speed up table scans where there is a limiting WHERE clause
that doesn't match any index in the table.
*/
int
ha_ndbcluster
::
filtered_scan
(
const
byte
*
key
,
uint
key_len
,
byte
*
buf
,
enum
ha_rkey_function
find_flag
)
{
uint no_fields= table->fields;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet *cursor= m_active_cursor;
NdbResultSet
*
cursor
;
NdbScanOperation
*
op
;
DBUG_ENTER
(
"filtered_scan"
);
DBUG_PRINT
(
"enter"
,
(
"key_len: %u, index: %u"
,
...
...
@@ -840,12 +895,12 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_DUMP
(
"key"
,
(
char
*
)
key
,
key_len
);
DBUG_PRINT
(
"info"
,
(
"Starting a new filtered scan on %s"
,
m_tabname
));
NdbScanOperation *op= trans->getNdbScanOperation(m_tabname);
if (!op)
ERR_RETURN(trans->getNdbError());
cursor= op->readTuples(parallelism);
if (!cursor)
if
(
!
(
op
=
trans
->
getNdbScanOperation
(
m_tabname
)))
ERR_RETURN
(
trans
->
getNdbError
());
if
(
!
(
cursor
=
op
->
readTuples
(
parallelism
,
(
NdbCursorOperation
::
LockMode
)
get_ndb_lock_type
())))
ERR_RETURN
(
trans
->
getNdbError
());
m_active_cursor
=
cursor
;
...
...
@@ -894,59 +949,43 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
sf
.
end
();
}
// Define attributes to read
for (uint field_no= 0; field_no < no_fields; field_no++)
{
Field *field= table->field[field_no];
// Read attribute
DBUG_PRINT("get", ("%d: %s", field_no, field->field_name));
if (get_ndb_value(op, field_no, field->ptr))
ERR_RETURN(op->getNdbError());
}
if (table->primary_key == MAX_KEY)
{
DBUG_PRINT("info", ("Getting hidden key"));
// Scanning table with no primary key
int hidden_no= no_fields;
#ifndef DBUG_OFF
const NDBTAB *tab= (NDBTAB *) m_table;
if (!tab->getColumn(hidden_no))
DBUG_RETURN(1);
#endif
if (get_ndb_value(op, hidden_no, NULL))
ERR_RETURN(op->getNdbError());
}
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
DBUG_RETURN
(
define_read_attrs
(
buf
,
op
));
}
#endif
/*
Read records from NDB using full table scan
Start full table scan in NDB
*/
int
ha_ndbcluster
::
full_table_scan
(
byte
*
buf
)
{
uint
i
;
THD
*
thd
=
current_thd
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
;
NdbScanOperation
*
op
;
NdbConnection
*
trans
=
m_active_trans
;
DBUG_ENTER
(
"full_table_scan"
);
DBUG_PRINT
(
"enter"
,
(
"Starting new scan on %s"
,
m_tabname
));
if
(
!
(
op
=
trans
->
getNdbScanOperation
(
m_tabname
)))
ERR_RETURN
(
trans
->
getNdbError
());
if
(
!
(
cursor
=
op
->
readTuples
(
parallelism
)))
if
(
!
(
cursor
=
op
->
readTuples
(
parallelism
,
(
NdbCursorOperation
::
LockMode
)
get_ndb_lock_type
())))
ERR_RETURN
(
trans
->
getNdbError
());
m_active_cursor
=
cursor
;
DBUG_RETURN
(
define_read_attrs
(
buf
,
op
));
}
inline
int
ha_ndbcluster
::
define_read_attrs
(
byte
*
buf
,
NdbOperation
*
op
)
{
uint
i
;
THD
*
thd
=
current_thd
;
NdbConnection
*
trans
=
m_active_trans
;
DBUG_ENTER
(
"define_read_attrs"
);
// Define attributes to read
for
(
i
=
0
;
i
<
table
->
fields
;
i
++
)
...
...
@@ -1042,7 +1081,8 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected!
*/
rows_inserted
++
;
if
((
rows_inserted
%
bulk_insert_rows
)
==
0
)
if
((
rows_inserted
==
rows_to_insert
)
||
((
rows_inserted
%
bulk_insert_rows
)
==
0
))
{
// Send rows to NDB
DBUG_PRINT
(
"info"
,
(
"Sending inserts to NDB, "
\
...
...
@@ -1097,6 +1137,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
{
THD
*
thd
=
current_thd
;
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
=
m_active_cursor
;
NdbOperation
*
op
;
uint
i
;
DBUG_ENTER
(
"update_row"
);
...
...
@@ -1105,6 +1146,27 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if
(
table
->
timestamp_on_update_now
)
update_timestamp
(
new_data
+
table
->
timestamp_on_update_now
-
1
);
/* Check for update of primary key and return error */
if
((
table
->
primary_key
!=
MAX_KEY
)
&&
(
key_cmp
(
table
->
primary_key
,
old_data
,
new_data
)))
DBUG_RETURN
(
HA_ERR_UNSUPPORTED
);
if
(
cursor
)
{
/*
We are scanning records and want to update the record
that was just found, call updateTuple on the cursor
to take over the lock to a new update operation
And thus setting the primary key of the record from
the active record in cursor
*/
DBUG_PRINT
(
"info"
,
(
"Calling updateTuple on cursor"
));
if
(
!
(
op
=
cursor
->
updateTuple
()))
ERR_RETURN
(
trans
->
getNdbError
());
ops_pending
++
;
}
else
{
if
(
!
(
op
=
trans
->
getNdbOperation
(
m_tabname
))
||
op
->
updateTuple
()
!=
0
)
ERR_RETURN
(
trans
->
getNdbError
());
...
...
@@ -1126,19 +1188,15 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
}
else
{
/* Check for update of primary key and return error */
if
(
key_cmp
(
table
->
primary_key
,
old_data
,
new_data
))
DBUG_RETURN
(
HA_ERR_UNSUPPORTED
);
int
res
;
if
((
res
=
set_primary_key
(
op
,
old_data
+
table
->
null_bytes
)))
DBUG_RETURN
(
res
);
}
}
// Set non-key attribute(s)
for
(
i
=
0
;
i
<
table
->
fields
;
i
++
)
{
Field
*
field
=
table
->
field
[
i
];
if
((
thd
->
query_id
==
field
->
query_id
)
&&
(
!
(
field
->
flags
&
PRI_KEY_FLAG
))
&&
...
...
@@ -1147,7 +1205,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
}
// Execute update operation
if
(
trans
->
execute
(
NoCommit
)
!=
0
)
if
(
!
cursor
&&
trans
->
execute
(
NoCommit
)
!=
0
)
DBUG_RETURN
(
ndb_err
(
trans
));
DBUG_RETURN
(
0
);
...
...
@@ -1161,11 +1219,32 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
int
ha_ndbcluster
::
delete_row
(
const
byte
*
record
)
{
NdbConnection
*
trans
=
m_active_trans
;
NdbResultSet
*
cursor
=
m_active_cursor
;
NdbOperation
*
op
;
DBUG_ENTER
(
"delete_row"
);
statistic_increment
(
ha_delete_count
,
&
LOCK_status
);
if
(
cursor
)
{
/*
We are scanning records and want to update the record
that was just found, call deleteTuple on the cursor
to take over the lock to a new update operation
And thus setting the primary key of the record from
the active record in cursor
*/
DBUG_PRINT
(
"info"
,
(
"Calling deleteTuple on cursor"
));
if
(
cursor
->
deleteTuple
()
!=
0
)
ERR_RETURN
(
trans
->
getNdbError
());
ops_pending
++
;
// If deleting from cursor, NoCommit will be handled in next_result
DBUG_RETURN
(
0
);
}
else
{
if
(
!
(
op
=
trans
->
getNdbOperation
(
m_tabname
))
||
op
->
deleteTuple
()
!=
0
)
ERR_RETURN
(
trans
->
getNdbError
());
...
...
@@ -1187,6 +1266,7 @@ int ha_ndbcluster::delete_row(const byte *record)
if
((
res
=
set_primary_key
(
op
)))
return
res
;
}
}
// Execute delete operation
if
(
trans
->
execute
(
NoCommit
)
!=
0
)
...
...
@@ -1481,11 +1561,7 @@ int ha_ndbcluster::index_next(byte *buf)
int
error
=
1
;
statistic_increment
(
ha_read_next_count
,
&
LOCK_status
);
if
(
!
m_active_cursor
)
error
=
HA_ERR_END_OF_FILE
;
else
error
=
next_result
(
buf
);
DBUG_RETURN
(
error
);
DBUG_RETURN
(
next_result
(
buf
));
}
...
...
@@ -1519,7 +1595,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
{
KEY
*
key_info
;
int
error
=
1
;
byte
*
buf
=
table
->
record
[
0
];
byte
*
buf
=
table
->
record
[
0
];
DBUG_ENTER
(
"ha_ndbcluster::read_range_first"
);
DBUG_PRINT
(
"info"
,
(
"sorted: %d"
,
sorted
));
...
...
@@ -1548,6 +1624,7 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
DBUG_RETURN
(
error
);
}
int
ha_ndbcluster
::
read_range_next
(
bool
eq_range
)
{
DBUG_ENTER
(
"ha_ndbcluster::read_range_next"
);
...
...
@@ -1587,12 +1664,10 @@ int ha_ndbcluster::rnd_next(byte *buf)
{
DBUG_ENTER
(
"rnd_next"
);
statistic_increment
(
ha_read_rnd_next_count
,
&
LOCK_status
);
int
error
=
1
;
if
(
!
m_active_cursor
)
error
=
full_table_scan
(
buf
);
else
error
=
next_result
(
buf
);
DBUG_RETURN
(
error
);
DBUG_RETURN
(
full_table_scan
(
buf
));
DBUG_RETURN
(
next_result
(
buf
));
}
...
...
@@ -1921,6 +1996,8 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
}
*
to
++=
&
m_lock
;
DBUG_PRINT
(
"exit"
,
(
"lock_type: %d"
,
lock_type
));
DBUG_RETURN
(
to
);
}
...
...
@@ -2034,8 +2111,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
(
NdbConnection
*
)
thd
->
transaction
.
stmt
.
ndb_tid
;
DBUG_ASSERT
(
m_active_trans
);
// Start of transaction
retrieve_all_fields
=
FALSE
;
ops_pending
=
0
;
}
else
{
...
...
@@ -2087,7 +2165,9 @@ int ha_ndbcluster::start_stmt(THD *thd)
}
m_active_trans
=
trans
;
// Start of statement
retrieve_all_fields
=
FALSE
;
ops_pending
=
0
;
DBUG_RETURN
(
error
);
}
...
...
@@ -2568,7 +2648,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
retrieve_all_fields
(
FALSE
),
rows_to_insert
(
0
),
rows_inserted
(
0
),
bulk_insert_rows
(
1024
)
bulk_insert_rows
(
1024
),
ops_pending
(
0
)
{
int
i
;
...
...
sql/ha_ndbcluster.h
View file @
787c9786
...
...
@@ -152,6 +152,7 @@ class ha_ndbcluster: public handler
const
char
*
get_unique_index_name
(
uint
idx_no
)
const
;
NDB_INDEX_TYPE
get_index_type
(
uint
idx_no
)
const
;
NDB_INDEX_TYPE
get_index_type_from_table
(
uint
index_no
)
const
;
int
get_ndb_lock_type
();
int
pk_read
(
const
byte
*
key
,
uint
key_len
,
byte
*
buf
);
...
...
@@ -162,12 +163,10 @@ class ha_ndbcluster: public handler
bool
sorted
,
byte
*
buf
);
int
full_table_scan
(
byte
*
buf
);
int
next_result
(
byte
*
buf
);
#if 0
int
define_read_attrs
(
byte
*
buf
,
NdbOperation
*
op
);
int
filtered_scan
(
const
byte
*
key
,
uint
key_len
,
byte
*
buf
,
enum
ha_rkey_function
find_flag
);
#endif
void
unpack_record
(
byte
*
buf
);
void
set_dbname
(
const
char
*
pathname
);
...
...
@@ -212,6 +211,7 @@ class ha_ndbcluster: public handler
ha_rows
rows_to_insert
;
ha_rows
rows_inserted
;
ha_rows
bulk_insert_rows
;
ha_rows
ops_pending
;
};
bool
ndbcluster_init
(
void
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment