Commit 8be3794b authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-21924 Clean up InnoDB GIS record comparison

The extension of the record comparison functions for SPATIAL INDEX in
mysql/mysql-server@b66ad511b61fffe75c58d0a607cdb837c6e6c821
was suboptimal for multiple reasons:

Some functions used unnecessary temporary variables of the int type,
instead of the more appropriate size_t, causing type mismatch.

Many functions unnecessarily required rec_get_offsets() to be
computed, or a parameter for length, although the size of the
minimum bounding rectangle (MBR) is hard-coded as
SPDIMS * 2 * sizeof(double), or 32 bytes.

In InnoDB SPATIAL INDEX records, there always is a 32-byte key
followed by either a 4-byte child page number or the PRIMARY KEY value.

The length parameters were not properly validated.
The function cmp_geometry_field() was making an incorrect attempt
at checking that the lengths are at least sizeof(double) (8 bytes),
even though the function is accessing up to 32 bytes in both MBR.

Functions that are called from only one compilation unit are defined
in another compilation unit, making the code harder to follow and
potentially slower to execute.

cmp_dtuple_rec_with_gis(): FIXME: Correct the debug assertion
and possibly the function TABLE_SHARE::init_from_binary_frm_image()
or related code, which causes an unexpected length of
DATA_MBR_LEN + 2 bytes to be passed to this function.
parent 0d767778
......@@ -4357,7 +4357,7 @@ btr_check_node_ptr(
if (dict_index_is_spatial(index)) {
ut_a(!cmp_dtuple_rec_with_gis(
tuple, btr_cur_get_rec(&cursor),
offsets, PAGE_CUR_WITHIN));
PAGE_CUR_WITHIN));
} else {
ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
}
......
......@@ -580,193 +580,71 @@ split_rtree_node(
return(first_rec_group);
}
/*************************************************************//**
Compares two keys a and b depending on nextflag
nextflag can contain these flags:
/** Compare two minimum bounding rectangles.
@param mode comparison operator
MBR_INTERSECT(a,b) a overlaps b
MBR_CONTAIN(a,b) a contains b
MBR_DISJOINT(a,b) a disjoint b
MBR_WITHIN(a,b) a within b
MBR_EQUAL(a,b) All coordinates of MBRs are equal
Return 0 on success, otherwise 1. */
int
rtree_key_cmp(
/*==========*/
page_cur_mode_t mode, /*!< in: compare method. */
const uchar* b, /*!< in: first key. */
int,
const uchar* a, /*!< in: second key. */
int a_len) /*!< in: second key len. */
MBR_DATA(a,b) Data reference is the same
@param b first MBR
@param a second MBR
@retval 0 if the predicate holds
@retval 1 if the precidate does not hold */
int rtree_key_cmp(page_cur_mode_t mode, const void *b, const void *a)
{
double amin, amax, bmin, bmax;
int key_len;
int keyseg_len;
keyseg_len = 2 * sizeof(double);
for (key_len = a_len; key_len > 0; key_len -= keyseg_len) {
amin = mach_double_read(a);
bmin = mach_double_read(b);
amax = mach_double_read(a + sizeof(double));
bmax = mach_double_read(b + sizeof(double));
switch (mode) {
case PAGE_CUR_INTERSECT:
if (INTERSECT_CMP(amin, amax, bmin, bmax)) {
return(1);
}
break;
case PAGE_CUR_CONTAIN:
if (CONTAIN_CMP(amin, amax, bmin, bmax)) {
return(1);
}
break;
case PAGE_CUR_WITHIN:
if (WITHIN_CMP(amin, amax, bmin, bmax)) {
return(1);
}
break;
case PAGE_CUR_MBR_EQUAL:
if (EQUAL_CMP(amin, amax, bmin, bmax)) {
return(1);
}
break;
case PAGE_CUR_DISJOINT:
int result;
result = DISJOINT_CMP(amin, amax, bmin, bmax);
if (result == 0) {
return(0);
}
if (key_len - keyseg_len <= 0) {
return(1);
}
break;
default:
/* if unknown comparison operator */
ut_ad(0);
}
a += keyseg_len;
b += keyseg_len;
}
return(0);
}
/*************************************************************//**
Calculates MBR_AREA(a+b) - MBR_AREA(a)
Note: when 'a' and 'b' objects are far from each other,
the area increase can be really big, so this function
can return 'inf' as a result.
Return the area increaed. */
double
rtree_area_increase(
const uchar* a, /*!< in: original mbr. */
const uchar* b, /*!< in: new mbr. */
int mbr_len, /*!< in: mbr length of a and b. */
double* ab_area) /*!< out: increased area. */
{
double a_area = 1.0;
double loc_ab_area = 1.0;
double amin, amax, bmin, bmax;
int key_len;
int keyseg_len;
double data_round = 1.0;
keyseg_len = 2 * sizeof(double);
for (key_len = mbr_len; key_len > 0; key_len -= keyseg_len) {
double area;
amin = mach_double_read(a);
bmin = mach_double_read(b);
amax = mach_double_read(a + sizeof(double));
bmax = mach_double_read(b + sizeof(double));
area = amax - amin;
if (area == 0) {
a_area *= LINE_MBR_WEIGHTS;
} else {
a_area *= area;
}
area = (double)std::max(amax, bmax) -
(double)std::min(amin, bmin);
if (area == 0) {
loc_ab_area *= LINE_MBR_WEIGHTS;
} else {
loc_ab_area *= area;
}
/* Value of amax or bmin can be so large that small difference
are ignored. For example: 3.2884281489988079e+284 - 100 =
3.2884281489988079e+284. This results some area difference
are not detected */
if (loc_ab_area == a_area) {
if (bmin < amin || bmax > amax) {
data_round *= ((double)std::max(amax, bmax)
- amax
+ (amin - (double)std::min(
amin, bmin)));
} else {
data_round *= area;
}
}
a += keyseg_len;
b += keyseg_len;
}
*ab_area = loc_ab_area;
if (loc_ab_area == a_area && data_round != 1.0) {
return(data_round);
}
return(loc_ab_area - a_area);
}
/** Calculates overlapping area
@param[in] a mbr a
@param[in] b mbr b
@param[in] mbr_len mbr length
@return overlapping area */
double
rtree_area_overlapping(
const uchar* a,
const uchar* b,
int mbr_len)
{
double area = 1.0;
double amin;
double amax;
double bmin;
double bmax;
int key_len;
int keyseg_len;
keyseg_len = 2 * sizeof(double);
for (key_len = mbr_len; key_len > 0; key_len -= keyseg_len) {
amin = mach_double_read(a);
bmin = mach_double_read(b);
amax = mach_double_read(a + sizeof(double));
bmax = mach_double_read(b + sizeof(double));
amin = std::max(amin, bmin);
amax = std::min(amax, bmax);
if (amin > amax) {
return(0);
} else {
area *= (amax - amin);
}
a += keyseg_len;
b += keyseg_len;
}
return(area);
const byte *b_= static_cast<const byte*>(b);
const byte *a_= static_cast<const byte*>(a);
static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double), "compatibility");
for (auto i = SPDIMS; i--; )
{
double amin= mach_double_read(a_);
double bmin= mach_double_read(b_);
a_+= sizeof(double);
b_+= sizeof(double);
double amax= mach_double_read(a_);
double bmax= mach_double_read(b_);
a_+= sizeof(double);
b_+= sizeof(double);
switch (mode) {
case PAGE_CUR_INTERSECT:
if (INTERSECT_CMP(amin, amax, bmin, bmax))
return 1;
continue;
case PAGE_CUR_CONTAIN:
if (CONTAIN_CMP(amin, amax, bmin, bmax))
return 1;
continue;
case PAGE_CUR_WITHIN:
if (WITHIN_CMP(amin, amax, bmin, bmax))
return 1;
continue;
case PAGE_CUR_MBR_EQUAL:
if (EQUAL_CMP(amin, amax, bmin, bmax))
return 1;
continue;
case PAGE_CUR_DISJOINT:
if (!DISJOINT_CMP(amin, amax, bmin, bmax))
return 0;
if (!i)
return 1;
continue;
case PAGE_CUR_UNSUPP:
case PAGE_CUR_G:
case PAGE_CUR_GE:
case PAGE_CUR_L:
case PAGE_CUR_LE:
case PAGE_CUR_RTREE_LOCATE:
case PAGE_CUR_RTREE_GET_FATHER:
case PAGE_CUR_RTREE_INSERT:
break;
}
ut_ad("unknown comparison operator" == 0);
}
return 0;
}
......@@ -1673,6 +1673,113 @@ rtr_check_same_block(
return(false);
}
/*************************************************************//**
Calculates MBR_AREA(a+b) - MBR_AREA(a)
Note: when 'a' and 'b' objects are far from each other,
the area increase can be really big, so this function
can return 'inf' as a result.
Return the area increaed. */
static double
rtree_area_increase(
const uchar* a, /*!< in: original mbr. */
const uchar* b, /*!< in: new mbr. */
double* ab_area) /*!< out: increased area. */
{
double a_area = 1.0;
double loc_ab_area = 1.0;
double amin, amax, bmin, bmax;
double data_round = 1.0;
static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double),
"compatibility");
for (auto i = SPDIMS; i--; ) {
double area;
amin = mach_double_read(a);
bmin = mach_double_read(b);
amax = mach_double_read(a + sizeof(double));
bmax = mach_double_read(b + sizeof(double));
a += 2 * sizeof(double);
b += 2 * sizeof(double);
area = amax - amin;
if (area == 0) {
a_area *= LINE_MBR_WEIGHTS;
} else {
a_area *= area;
}
area = (double)std::max(amax, bmax) -
(double)std::min(amin, bmin);
if (area == 0) {
loc_ab_area *= LINE_MBR_WEIGHTS;
} else {
loc_ab_area *= area;
}
/* Value of amax or bmin can be so large that small difference
are ignored. For example: 3.2884281489988079e+284 - 100 =
3.2884281489988079e+284. This results some area difference
are not detected */
if (loc_ab_area == a_area) {
if (bmin < amin || bmax > amax) {
data_round *= ((double)std::max(amax, bmax)
- amax
+ (amin - (double)std::min(
amin, bmin)));
} else {
data_round *= area;
}
}
}
*ab_area = loc_ab_area;
if (loc_ab_area == a_area && data_round != 1.0) {
return(data_round);
}
return(loc_ab_area - a_area);
}
/** Calculates overlapping area
@param[in] a mbr a
@param[in] b mbr b
@return overlapping area */
static double rtree_area_overlapping(const byte *a, const byte *b)
{
double area = 1.0;
double amin;
double amax;
double bmin;
double bmax;
static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double),
"compatibility");
for (auto i = SPDIMS; i--; ) {
amin = mach_double_read(a);
bmin = mach_double_read(b);
amax = mach_double_read(a + sizeof(double));
bmax = mach_double_read(b + sizeof(double));
a += 2 * sizeof(double);
b += 2 * sizeof(double);
amin = std::max(amin, bmin);
amax = std::min(amax, bmax);
if (amin > amax) {
return(0);
} else {
area *= (amax - amin);
}
}
return(area);
}
/****************************************************************//**
Calculate the area increased for a new record
@return area increased */
......@@ -1685,28 +1792,20 @@ rtr_rec_cal_increase(
dtuple in some of the common fields, or which
has an equal number or more fields than
dtuple */
const offset_t* offsets,/*!< in: array returned by rec_get_offsets() */
double* area) /*!< out: increased area */
{
const dfield_t* dtuple_field;
ulint dtuple_f_len;
ulint rec_f_len;
const byte* rec_b_ptr;
double ret = 0;
ut_ad(!page_rec_is_supremum(rec));
ut_ad(!page_rec_is_infimum(rec));
dtuple_field = dtuple_get_nth_field(dtuple, 0);
dtuple_f_len = dfield_get_len(dtuple_field);
rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
ret = rtree_area_increase(
rec_b_ptr,
static_cast<const byte*>(dfield_get_data(dtuple_field)),
static_cast<int>(dtuple_f_len), area);
ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN);
return(ret);
return rtree_area_increase(rec,
static_cast<const byte*>(
dfield_get_data(dtuple_field)),
area);
}
/** Estimates the number of rows in a given area.
......@@ -1800,10 +1899,9 @@ rtr_estimate_n_rows_in_range(
case PAGE_CUR_WITHIN:
case PAGE_CUR_MBR_EQUAL:
if (rtree_key_cmp(
if (!rtree_key_cmp(
PAGE_CUR_WITHIN, range_mbr_ptr,
DATA_MBR_LEN, rec, DATA_MBR_LEN)
== 0) {
rec)) {
area += 1;
}
......@@ -1817,14 +1915,14 @@ rtr_estimate_n_rows_in_range(
case PAGE_CUR_CONTAIN:
case PAGE_CUR_INTERSECT:
area += rtree_area_overlapping(
range_mbr_ptr, rec, DATA_MBR_LEN)
range_mbr_ptr, rec)
/ rec_area;
break;
case PAGE_CUR_DISJOINT:
area += 1;
area -= rtree_area_overlapping(
range_mbr_ptr, rec, DATA_MBR_LEN)
range_mbr_ptr, rec)
/ rec_area;
break;
......@@ -1832,7 +1930,7 @@ rtr_estimate_n_rows_in_range(
case PAGE_CUR_MBR_EQUAL:
if (!rtree_key_cmp(
PAGE_CUR_WITHIN, range_mbr_ptr,
DATA_MBR_LEN, rec, DATA_MBR_LEN)) {
rec)) {
area += range_area / rec_area;
}
......
......@@ -1620,6 +1620,60 @@ rtr_get_mbr_from_tuple(
mbr);
}
/** Compare minimum bounding rectangles.
@return 1, 0, -1, if mode == PAGE_CUR_MBR_EQUAL. And return
1, 0 for rest compare modes, depends on a and b qualifies the
relationship (CONTAINS, WITHIN etc.) */
static int cmp_gis_field(page_cur_mode_t mode, const void *a, const void *b)
{
return mode == PAGE_CUR_MBR_EQUAL
? cmp_geometry_field(a, b)
: rtree_key_cmp(mode, a, b);
}
/** Compare a GIS data tuple to a physical record in rtree non-leaf node.
We need to check the page number field, since we don't store pk field in
rtree non-leaf node.
@param[in] dtuple data tuple
@param[in] rec R-tree record
@return whether dtuple is less than rec */
static bool
cmp_dtuple_rec_with_gis_internal(const dtuple_t* dtuple, const rec_t* rec)
{
const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0);
ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN);
if (cmp_gis_field(PAGE_CUR_WITHIN, dfield_get_data(dtuple_field), rec))
return true;
dtuple_field= dtuple_get_nth_field(dtuple, 1);
ut_ad(dfield_get_len(dtuple_field) == 4); /* child page number */
ut_ad(dtuple_field->type.mtype == DATA_SYS_CHILD);
ut_ad(!(dtuple_field->type.prtype & ~DATA_NOT_NULL));
return memcmp(dtuple_field->data, rec + DATA_MBR_LEN, 4) != 0;
}
#ifndef UNIV_DEBUG
static
#endif
/** Compare a GIS data tuple to a physical record.
@param[in] dtuple data tuple
@param[in] rec R-tree record
@param[in] mode compare mode
@retval negative if dtuple is less than rec */
int cmp_dtuple_rec_with_gis(const dtuple_t *dtuple, const rec_t *rec,
page_cur_mode_t mode)
{
const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0);
/* FIXME: TABLE_SHARE::init_from_binary_frm_image() is adding
field->key_part_length_bytes() to the key length */
ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN ||
dfield_get_len(dtuple_field) == DATA_MBR_LEN + 2);
return cmp_gis_field(mode, dfield_get_data(dtuple_field), rec);
}
/****************************************************************//**
Searches the right position in rtree for a page cursor. */
bool
......@@ -1701,9 +1755,6 @@ rtr_cur_search_with_match(
}
while (!page_rec_is_supremum(rec)) {
offsets = rec_get_offsets(rec, index, offsets, is_leaf,
dtuple_get_n_fields_cmp(tuple),
&heap);
if (!is_leaf) {
switch (mode) {
case PAGE_CUR_CONTAIN:
......@@ -1713,21 +1764,21 @@ rtr_cur_search_with_match(
both CONTAIN and INTERSECT for either of
the search mode */
cmp = cmp_dtuple_rec_with_gis(
tuple, rec, offsets, PAGE_CUR_CONTAIN);
tuple, rec, PAGE_CUR_CONTAIN);
if (cmp != 0) {
cmp = cmp_dtuple_rec_with_gis(
tuple, rec, offsets,
tuple, rec,
PAGE_CUR_INTERSECT);
}
break;
case PAGE_CUR_DISJOINT:
cmp = cmp_dtuple_rec_with_gis(
tuple, rec, offsets, mode);
tuple, rec, mode);
if (cmp != 0) {
cmp = cmp_dtuple_rec_with_gis(
tuple, rec, offsets,
tuple, rec,
PAGE_CUR_INTERSECT);
}
break;
......@@ -1736,11 +1787,11 @@ rtr_cur_search_with_match(
double area;
cmp = cmp_dtuple_rec_with_gis(
tuple, rec, offsets, PAGE_CUR_WITHIN);
tuple, rec, PAGE_CUR_WITHIN);
if (cmp != 0) {
increase = rtr_rec_cal_increase(
tuple, rec, offsets, &area);
tuple, rec, &area);
/* Once it goes beyond DBL_MAX,
it would not make sense to record
such value, just make it
......@@ -1763,19 +1814,19 @@ rtr_cur_search_with_match(
break;
case PAGE_CUR_RTREE_GET_FATHER:
cmp = cmp_dtuple_rec_with_gis_internal(
tuple, rec, offsets);
tuple, rec);
break;
default:
/* WITHIN etc. */
cmp = cmp_dtuple_rec_with_gis(
tuple, rec, offsets, mode);
tuple, rec, mode);
}
} else {
/* At leaf level, INSERT should translate to LE */
ut_ad(mode != PAGE_CUR_RTREE_INSERT);
cmp = cmp_dtuple_rec_with_gis(
tuple, rec, offsets, mode);
tuple, rec, mode);
}
if (cmp == 0) {
......@@ -1948,17 +1999,10 @@ rtr_cur_search_with_match(
} else {
if (mode == PAGE_CUR_RTREE_INSERT) {
ulint child_no;
ut_ad(!last_match_rec && rec);
offsets = rec_get_offsets(
rec, index, offsets, false,
ULINT_UNDEFINED, &heap);
child_no = btr_node_ptr_get_child_page_no(rec, offsets);
ut_ad(!last_match_rec);
rtr_non_leaf_insert_stack_push(
index, rtr_info->parent_path, level, child_no,
index, rtr_info->parent_path, level,
mach_read_from_4(rec + DATA_MBR_LEN),
block, rec, 0);
} else if (rtr_info && found && !is_leaf) {
......
/*****************************************************************************
Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2019, MariaDB Corporation.
Copyright (c) 2019, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -106,45 +106,17 @@ split_rtree_node(
int n_dim, /*!< in: dimensions. */
uchar* first_rec); /*!< in: the first rec. */
/*************************************************************//**
Compares two keys a and b depending on nextflag
nextflag can contain these flags:
/** Compare two minimum bounding rectangles.
@param mode comparison operator
MBR_INTERSECT(a,b) a overlaps b
MBR_CONTAIN(a,b) a contains b
MBR_DISJOINT(a,b) a disjoint b
MBR_WITHIN(a,b) a within b
MBR_EQUAL(a,b) All coordinates of MBRs are equal
MBR_DATA(a,b) Data reference is the same
Returns 0 on success. */
int
rtree_key_cmp(
/*==========*/
page_cur_mode_t mode, /*!< in: compare method. */
const uchar* b, /*!< in: first key. */
int b_len, /*!< in: first key len. */
const uchar* a, /*!< in: second key. */
int a_len); /*!< in: second key len. */
/*************************************************************//**
Calculates MBR_AREA(a+b) - MBR_AREA(a)
Note: when 'a' and 'b' objects are far from each other,
the area increase can be really big, so this function
can return 'inf' as a result. */
double
rtree_area_increase(
const uchar* a, /*!< in: first mbr. */
const uchar* b, /*!< in: second mbr. */
int a_len, /*!< in: mbr length. */
double* ab_area); /*!< out: increased area. */
/** Calculates overlapping area
@param[in] a mbr a
@param[in] b mbr b
@param[in] mbr_len mbr length
@return overlapping area */
double
rtree_area_overlapping(
const uchar* a,
const uchar* b,
int mbr_len);
@param b first MBR
@param a second MBR
@retval 0 if the predicate holds
@retval 1 if the precidate does not hold */
int rtree_key_cmp(page_cur_mode_t mode, const void *b, const void *a);
#endif
......@@ -151,7 +151,6 @@ rtr_rec_cal_increase(
dtuple in some of the common fields, or which
has an equal number or more fields than
dtuple */
const offset_t* offsets,/*!< in: array returned by rec_get_offsets() */
double* area); /*!< out: increased area */
/****************************************************************//**
......
......@@ -77,35 +77,62 @@ cmp_dfield_dfield(
const dfield_t* dfield1,/*!< in: data field; must have type field set */
const dfield_t* dfield2);/*!< in: data field */
#ifdef UNIV_DEBUG
/** Compare a GIS data tuple to a physical record.
@param[in] dtuple data tuple
@param[in] rec R-tree record
@param[in] offsets rec_get_offsets(rec)
@param[in] mode compare mode
@retval negative if dtuple is less than rec */
int
cmp_dtuple_rec_with_gis(
/*====================*/
const dtuple_t* dtuple,
const rec_t* rec,
const offset_t* offsets,
page_cur_mode_t mode)
MY_ATTRIBUTE((nonnull));
int cmp_dtuple_rec_with_gis(const dtuple_t *dtuple, const rec_t *rec,
page_cur_mode_t mode)
MY_ATTRIBUTE((nonnull));
#endif
/** Compare a GIS data tuple to a physical record in rtree non-leaf node.
We need to check the page number field, since we don't store pk field in
rtree non-leaf node.
@param[in] dtuple data tuple
@param[in] rec R-tree record
@param[in] offsets rec_get_offsets(rec)
@param[in] mode compare mode
@retval negative if dtuple is less than rec */
int
cmp_dtuple_rec_with_gis_internal(
const dtuple_t* dtuple,
const rec_t* rec,
const offset_t* offsets);
/** Compare two minimum bounding rectangles.
@return 1, 0, -1, if a is greater, equal, less than b, respectively */
inline int cmp_geometry_field(const void *a, const void *b)
{
const byte *mbr1= static_cast<const byte*>(a);
const byte *mbr2= static_cast<const byte*>(b);
static_assert(SPDIMS == 2, "compatibility");
static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double), "compatibility");
/* Try to compare mbr left lower corner (xmin, ymin) */
double x1= mach_double_read(mbr1);
double x2= mach_double_read(mbr2);
if (x1 > x2)
return 1;
if (x2 > x1)
return -1;
double y1= mach_double_read(mbr1 + sizeof(double) * SPDIMS);
double y2= mach_double_read(mbr2 + sizeof(double) * SPDIMS);
if (y1 > y2)
return 1;
if (y2 > y1)
return -1;
/* left lower corner (xmin, ymin) overlaps, now right upper corner */
x1= mach_double_read(mbr1 + sizeof(double));
x2= mach_double_read(mbr2 + sizeof(double));
if (x1 > x2)
return 1;
if (x2 > x1)
return -1;
y1= mach_double_read(mbr1 + sizeof(double) * 2 + sizeof(double));
y2= mach_double_read(mbr2 + sizeof(double) * 2 + sizeof(double));
if (y1 > y2)
return 1;
if (y2 > y1)
return -1;
return 0;
}
/** Compare a data tuple to a physical record.
@param[in] dtuple data tuple
......
......@@ -62,9 +62,9 @@ int
innobase_mysql_cmp(
ulint prtype,
const byte* a,
unsigned int a_length,
ulint a_length,
const byte* b,
unsigned int b_length)
ulint b_length)
{
#ifdef UNIV_DEBUG
switch (prtype & DATA_MYSQL_TYPE_MASK) {
......@@ -154,11 +154,7 @@ TODO: Remove this function. Everything should use MYSQL_TYPE_NEWDECIMAL.
respectively */
static ATTRIBUTE_COLD
int
cmp_decimal(
const byte* a,
unsigned int a_length,
const byte* b,
unsigned int b_length)
cmp_decimal(const byte* a, ulint a_length, const byte* b, ulint b_length)
{
int swap_flag;
......@@ -216,166 +212,6 @@ cmp_decimal(
return(swap_flag);
}
/*************************************************************//**
Innobase uses this function to compare two geometry data fields
@return 1, 0, -1, if a is greater, equal, less than b, respectively */
static
int
cmp_geometry_field(
/*===============*/
ulint prtype, /*!< in: precise type */
const byte* a, /*!< in: data field */
unsigned int a_length, /*!< in: data field length,
not UNIV_SQL_NULL */
const byte* b, /*!< in: data field */
unsigned int b_length) /*!< in: data field length,
not UNIV_SQL_NULL */
{
double x1, x2;
double y1, y2;
ut_ad(prtype & DATA_GIS_MBR);
if (a_length < sizeof(double) || b_length < sizeof(double)) {
return(0);
}
/* Try to compare mbr left lower corner (xmin, ymin) */
x1 = mach_double_read(a);
x2 = mach_double_read(b);
y1 = mach_double_read(a + sizeof(double) * SPDIMS);
y2 = mach_double_read(b + sizeof(double) * SPDIMS);
if (x1 > x2) {
return(1);
} else if (x2 > x1) {
return(-1);
}
if (y1 > y2) {
return(1);
} else if (y2 > y1) {
return(-1);
}
/* left lower corner (xmin, ymin) overlaps, now right upper corner */
x1 = mach_double_read(a + sizeof(double));
x2 = mach_double_read(b + sizeof(double));
y1 = mach_double_read(a + sizeof(double) * SPDIMS + sizeof(double));
y2 = mach_double_read(b + sizeof(double) * SPDIMS + sizeof(double));
if (x1 > x2) {
return(1);
} else if (x2 > x1) {
return(-1);
}
if (y1 > y2) {
return(1);
} else if (y2 > y1) {
return(-1);
}
return(0);
}
/*************************************************************//**
Innobase uses this function to compare two gis data fields
@return 1, 0, -1, if mode == PAGE_CUR_MBR_EQUAL. And return
1, 0 for rest compare modes, depends on a and b qualifies the
relationship (CONTAINT, WITHIN etc.) */
static
int
cmp_gis_field(
/*============*/
page_cur_mode_t mode, /*!< in: compare mode */
const byte* a, /*!< in: data field */
unsigned int a_length, /*!< in: data field length,
not UNIV_SQL_NULL */
const byte* b, /*!< in: data field */
unsigned int b_length) /*!< in: data field length,
not UNIV_SQL_NULL */
{
if (mode == PAGE_CUR_MBR_EQUAL) {
return cmp_geometry_field(DATA_GIS_MBR,
a, a_length, b, b_length);
} else {
return rtree_key_cmp(mode, a, int(a_length), b, int(b_length));
}
}
/** Compare two data fields.
@param[in] mtype main type
@param[in] prtype precise type
@param[in] a data field
@param[in] a_length length of a, in bytes (not UNIV_SQL_NULL)
@param[in] b data field
@param[in] b_length length of b, in bytes (not UNIV_SQL_NULL)
@return positive, 0, negative, if a is greater, equal, less than b,
respectively */
static
int
cmp_whole_field(
ulint mtype,
ulint prtype,
const byte* a,
unsigned int a_length,
const byte* b,
unsigned int b_length)
{
float f_1;
float f_2;
double d_1;
double d_2;
switch (mtype) {
case DATA_DECIMAL:
return(cmp_decimal(a, a_length, b, b_length));
case DATA_DOUBLE:
d_1 = mach_double_read(a);
d_2 = mach_double_read(b);
if (d_1 > d_2) {
return(1);
} else if (d_2 > d_1) {
return(-1);
}
return(0);
case DATA_FLOAT:
f_1 = mach_float_read(a);
f_2 = mach_float_read(b);
if (f_1 > f_2) {
return(1);
} else if (f_2 > f_1) {
return(-1);
}
return(0);
case DATA_VARCHAR:
case DATA_CHAR:
return(my_charset_latin1.strnncollsp(a, a_length, b, b_length));
case DATA_BLOB:
if (prtype & DATA_BINARY_TYPE) {
ib::error() << "Comparing a binary BLOB"
" using a character set collation!";
ut_ad(0);
}
/* fall through */
case DATA_VARMYSQL:
case DATA_MYSQL:
return(innobase_mysql_cmp(prtype,
a, a_length, b, b_length));
case DATA_GEOMETRY:
return cmp_geometry_field(prtype, a, a_length, b, b_length);
default:
ib::fatal() << "Unknown data type number " << mtype;
}
return(0);
}
/** Compare two data fields.
@param[in] mtype main type
@param[in] prtype precise type
......@@ -413,6 +249,8 @@ cmp_data(
ulint pad;
switch (mtype) {
default:
ib::fatal() << "Unknown data type number " << mtype;
case DATA_FIXBINARY:
case DATA_BINARY:
if (dtype_get_charset_coll(prtype)
......@@ -428,23 +266,56 @@ cmp_data(
break;
case DATA_GEOMETRY:
ut_ad(prtype & DATA_BINARY_TYPE);
pad = ULINT_UNDEFINED;
if (prtype & DATA_GIS_MBR) {
return(cmp_whole_field(mtype, prtype,
data1, (unsigned) len1,
data2, (unsigned) len2));
ut_ad(len1 == DATA_MBR_LEN);
ut_ad(len2 == DATA_MBR_LEN);
return cmp_geometry_field(data1, data2);
}
pad = ULINT_UNDEFINED;
break;
case DATA_BLOB:
if (prtype & DATA_BINARY_TYPE) {
pad = ULINT_UNDEFINED;
break;
}
if (prtype & DATA_BINARY_TYPE) {
ib::error() << "Comparing a binary BLOB"
" using a character set collation!";
ut_ad(0);
}
/* fall through */
default:
return(cmp_whole_field(mtype, prtype,
data1, (unsigned) len1,
data2, (unsigned) len2));
case DATA_VARMYSQL:
case DATA_MYSQL:
return innobase_mysql_cmp(prtype, data1, len1, data2, len2);
case DATA_VARCHAR:
case DATA_CHAR:
return my_charset_latin1.strnncollsp(data1, len1, data2, len2);
case DATA_DECIMAL:
return cmp_decimal(data1, len1, data2, len2);
case DATA_DOUBLE:
{
double d_1 = mach_double_read(data1);
double d_2 = mach_double_read(data2);
if (d_1 > d_2) {
return 1;
} else if (d_2 > d_1) {
return -1;
}
}
return 0;
case DATA_FLOAT:
float f_1 = mach_float_read(data1);
float f_2 = mach_float_read(data2);
if (f_1 > f_2) {
return 1;
} else if (f_2 > f_1) {
return -1;
}
return 0;
}
ulint len = std::min(len1, len2);
......@@ -484,87 +355,6 @@ cmp_data(
return(cmp);
}
/** Compare a GIS data tuple to a physical record.
@param[in] dtuple data tuple
@param[in] rec R-tree record
@param[in] offsets rec_get_offsets(rec)
@param[in] mode compare mode
@retval negative if dtuple is less than rec */
int
cmp_dtuple_rec_with_gis(
/*====================*/
const dtuple_t* dtuple, /*!< in: data tuple */
const rec_t* rec, /*!< in: physical record which differs from
dtuple in some of the common fields, or which
has an equal number or more fields than
dtuple */
const offset_t* offsets,/*!< in: array returned by rec_get_offsets() */
page_cur_mode_t mode) /*!< in: compare mode */
{
const dfield_t* dtuple_field; /* current field in logical record */
ulint dtuple_f_len; /* the length of the current field
in the logical record */
ulint rec_f_len; /* length of current field in rec */
const byte* rec_b_ptr; /* pointer to the current byte in
rec field */
int ret = 0; /* return value */
dtuple_field = dtuple_get_nth_field(dtuple, 0);
dtuple_f_len = dfield_get_len(dtuple_field);
rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
ret = cmp_gis_field(
mode, static_cast<const byte*>(dfield_get_data(dtuple_field)),
(unsigned) dtuple_f_len, rec_b_ptr, (unsigned) rec_f_len);
return(ret);
}
/** Compare a GIS data tuple to a physical record in rtree non-leaf node.
We need to check the page number field, since we don't store pk field in
rtree non-leaf node.
@param[in] dtuple data tuple
@param[in] rec R-tree record
@param[in] offsets rec_get_offsets(rec)
@retval negative if dtuple is less than rec */
int
cmp_dtuple_rec_with_gis_internal(
const dtuple_t* dtuple,
const rec_t* rec,
const offset_t* offsets)
{
const dfield_t* dtuple_field; /* current field in logical record */
ulint dtuple_f_len; /* the length of the current field
in the logical record */
ulint rec_f_len; /* length of current field in rec */
const byte* rec_b_ptr; /* pointer to the current byte in
rec field */
int ret = 0; /* return value */
dtuple_field = dtuple_get_nth_field(dtuple, 0);
dtuple_f_len = dfield_get_len(dtuple_field);
rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
ret = cmp_gis_field(
PAGE_CUR_WITHIN,
static_cast<const byte*>(dfield_get_data(dtuple_field)),
(unsigned) dtuple_f_len, rec_b_ptr, (unsigned) rec_f_len);
if (ret != 0) {
return(ret);
}
dtuple_field = dtuple_get_nth_field(dtuple, 1);
dtuple_f_len = dfield_get_len(dtuple_field);
rec_b_ptr = rec_get_nth_field(rec, offsets, 1, &rec_f_len);
return(cmp_data(dtuple_field->type.mtype,
dtuple_field->type.prtype,
static_cast<const byte*>(dtuple_field->data),
dtuple_f_len,
rec_b_ptr,
rec_f_len));
}
/** Compare two data fields.
@param[in] mtype main type
@param[in] prtype precise type
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment