Commit 02f00743 authored by monty@donna.mysql.fi's avatar monty@donna.mysql.fi

Added Unique class to be used for duplicate removal in multi-table delete.

parent 8685094e
......@@ -19,9 +19,6 @@
get_ptr_compare(len) returns a pointer to a optimal byte-compare function
for a array of stringpointer where all strings have size len.
The bytes are compare as unsigned chars.
Because the size is saved in a static variable.
When using threads the program must have called my_init and the thread
my_init_thread()
*/
#include "mysys_priv.h"
......
......@@ -205,7 +205,7 @@ make_tempname (filename)
strcpy (tmpname, filename);
strcat (tmpname, "/");
strcat (tmpname, template);
mktemp (tmpname);
mkstemp (tmpname);
*slash = c;
}
else
......
......@@ -56,7 +56,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_select.h structs.h table.h sql_udf.h hash_filo.h\
lex.h lex_symbol.h sql_acl.h sql_crypt.h md5.h \
log_event.h mini_client.h sql_repl.h slave.h \
stacktrace.h
stacktrace.h sql_sort.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
item_cmpfunc.cc item_strfunc.cc item_timefunc.cc \
......@@ -67,7 +67,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc \
mysqld.cc password.c hash_filo.cc hostname.cc \
convert.cc sql_parse.cc sql_yacc.yy \
sql_base.cc table.cc sql_select.cc sql_insert.cc \
sql_update.cc sql_delete.cc \
sql_update.cc sql_delete.cc uniques.cc \
procedure.cc item_uniq.cc sql_test.cc \
log.cc log_event.cc init.cc derror.cc sql_acl.cc \
unireg.cc \
......
......@@ -22,13 +22,11 @@
#include <stddef.h> /* for macro offsetof */
#endif
#include <m_ctype.h>
#include "sql_sort.h"
#ifndef THREAD
#define SKIPP_DBUG_IN_FILESORT
#endif
/* static variabels */
#define MERGEBUFF 7
#define MERGEBUFF2 15
/* How to write record_ref. */
......@@ -36,28 +34,6 @@
if (my_b_write((file),(byte*) (from),param->ref_length)) \
DBUG_RETURN(1);
typedef struct st_buffpek { /* Struktur om sorteringsbuffrarna */
my_off_t file_pos; /* Where we are in the sort file */
uchar *base,*key; /* key pointers */
ha_rows count; /* Number of rows in table */
ulong mem_count; /* numbers of keys in memory */
ulong max_keys; /* Max keys in buffert */
} BUFFPEK;
typedef struct st_sort_param {
uint sort_length; /* Length of sortarg */
uint keys; /* Max antal nycklar / buffert */
uint ref_length; /* Length of record ref. */
ha_rows max_rows,examined_rows;
TABLE *sort_form; /* For quicker make_sortkey */
SORT_FIELD *local_sortorder;
SORT_FIELD *end;
#ifdef USE_STRCOLL
char* tmp_buffer;
#endif
} SORTPARAM;
/* functions defined in this file */
static char **make_char_array(register uint fields, uint length, myf my_flag);
......@@ -70,20 +46,11 @@ static int write_keys(SORTPARAM *param,uchar * *sort_keys,
IO_CACHE *tempfile);
static void make_sortkey(SORTPARAM *param,uchar *to,
byte *ref_pos);
static bool save_index(SORTPARAM *param,uchar **sort_keys, uint count);
static int merge_many_buff(SORTPARAM *param,uchar * *sort_keys,
BUFFPEK *buffpek,
uint *maxbuffer, IO_CACHE *t_file);
static uint read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
uint sort_length);
static int merge_buffers(SORTPARAM *param,IO_CACHE *from_file,
IO_CACHE *to_file,uchar * *sort_keys,
BUFFPEK *lastbuff,BUFFPEK *Fb,
BUFFPEK *Tb,int flag);
static int merge_index(SORTPARAM *param,uchar * *sort_keys,
static int merge_index(SORTPARAM *param,uchar *sort_buffer,
BUFFPEK *buffpek,
uint maxbuffer,IO_CACHE *tempfile,
IO_CACHE *outfile);
static bool save_index(SORTPARAM *param,uchar **sort_keys, uint count);
static uint sortlength(SORT_FIELD *sortorder,uint length);
/* Makes a indexfil of recordnumbers of a sorted database */
......@@ -225,12 +192,14 @@ ha_rows filesort(TABLE **table, SORT_FIELD *sortorder, uint s_length,
param.keys=((param.keys*(param.sort_length+sizeof(char*))) /
param.sort_length-1);
if (merge_many_buff(&param,sort_keys,buffpek,&maxbuffer,&tempfile))
if (merge_many_buff(&param,(uchar*) sort_keys,buffpek,&maxbuffer,
&tempfile))
goto err;
if (flush_io_cache(&tempfile) ||
reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
goto err;
if (merge_index(&param,sort_keys,buffpek,maxbuffer,&tempfile,outfile))
if (merge_index(&param,(uchar*) sort_keys,buffpek,maxbuffer,&tempfile,
outfile))
goto err;
}
if (records > param.max_rows)
......@@ -629,8 +598,8 @@ static bool save_index(SORTPARAM *param, uchar **sort_keys, uint count)
/* Merge buffers to make < MERGEBUFF2 buffers */
static int merge_many_buff(SORTPARAM *param, uchar **sort_keys,
BUFFPEK *buffpek, uint *maxbuffer, IO_CACHE *t_file)
int merge_many_buff(SORTPARAM *param, uchar *sort_buffer,
BUFFPEK *buffpek, uint *maxbuffer, IO_CACHE *t_file)
{
register int i;
IO_CACHE t_file2,*from_file,*to_file,*temp;
......@@ -652,11 +621,11 @@ static int merge_many_buff(SORTPARAM *param, uchar **sort_keys,
lastbuff=buffpek;
for (i=0 ; i <= (int) *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
{
if (merge_buffers(param,from_file,to_file,sort_keys,lastbuff++,
if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
buffpek+i,buffpek+i+MERGEBUFF-1,0))
break; /* purecov: inspected */
}
if (merge_buffers(param,from_file,to_file,sort_keys,lastbuff++,
if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
buffpek+i,buffpek+ *maxbuffer,0))
break; /* purecov: inspected */
if (flush_io_cache(to_file))
......@@ -675,8 +644,8 @@ static int merge_many_buff(SORTPARAM *param, uchar **sort_keys,
/* Read data to buffer */
/* This returns (uint) -1 if something goes wrong */
static uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
uint sort_length)
uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
uint sort_length)
{
register uint count;
uint length;
......@@ -697,10 +666,10 @@ static uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
/* Merge buffers to one buffer */
static int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
IO_CACHE *to_file, uchar **sort_keys,
BUFFPEK *lastbuff, BUFFPEK *Fb, BUFFPEK *Tb,
int flag)
int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
IO_CACHE *to_file, uchar *sort_buffer,
BUFFPEK *lastbuff, BUFFPEK *Fb, BUFFPEK *Tb,
int flag)
{
int error;
uint sort_length,offset;
......@@ -711,21 +680,21 @@ static int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
BUFFPEK *buffpek,**refpek;
QUEUE queue;
volatile bool *killed= &current_thd->killed;
qsort2_cmp cmp;
DBUG_ENTER("merge_buffers");
statistic_increment(filesort_merge_passes, &LOCK_status);
count=error=0;
offset=param->sort_length-param->ref_length;
offset=(sort_length=param->sort_length)-param->ref_length;
maxcount=(ulong) (param->keys/((uint) (Tb-Fb) +1));
to_start_filepos=my_b_tell(to_file);
strpos=(uchar*) sort_keys;
sort_length=param->sort_length;
strpos=(uchar*) sort_buffer;
max_rows=param->max_rows;
if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
(int (*) (void *, byte *,byte*))
get_ptr_compare(sort_length),(void*) &sort_length))
(cmp=get_ptr_compare(sort_length)),(void*) &sort_length))
DBUG_RETURN(1); /* purecov: inspected */
for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
{
......@@ -739,6 +708,26 @@ static int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
queue_insert(&queue,(byte*) buffpek);
}
if (param->unique_buff)
{
/*
Called by Unique::get()
Copy the first argument to param->unique_buff for unique removal.
Store it also in 'to_file'.
This is safe as we know that there is always more than one element
in each block to merge (This is guaranteed by the Unique:: algorithm
*/
buffpek=(BUFFPEK*) queue_top(&queue);
memcpy(param->unique_buff, buffpek->key, sort_length);
if (my_b_write(to_file,(byte*) buffpek->key, sort_length))
{
error=1; goto err; /* purecov: inspected */
}
}
else
cmp=0; // Not unique
while (queue.elements > 1)
{
if (*killed)
......@@ -748,6 +737,12 @@ static int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
for (;;)
{
buffpek=(BUFFPEK*) queue_top(&queue);
if (cmp) // Remove duplicates
{
if (!cmp(&sort_length, param->unique_buff, (uchar*) buffpek->key))
goto skip_duplicate;
memcpy(param->unique_buff, (uchar*) buffpek->key,sort_length);
}
if (flag == 0)
{
if (my_b_write(to_file,(byte*) buffpek->key, sort_length))
......@@ -764,6 +759,8 @@ static int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
error=0; /* purecov: inspected */
goto end; /* purecov: inspected */
}
skip_duplicate:
buffpek->key+=sort_length;
if (! --buffpek->mem_count)
{
......@@ -802,7 +799,7 @@ static int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
}
}
buffpek=(BUFFPEK*) queue_top(&queue);
buffpek->base=(uchar *) sort_keys;
buffpek->base= sort_buffer;
buffpek->max_keys=param->keys;
do
{
......@@ -845,12 +842,12 @@ static int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
/* Do a merge to output-file (save only positions) */
static int merge_index(SORTPARAM *param, uchar **sort_keys,
static int merge_index(SORTPARAM *param, uchar *sort_buffer,
BUFFPEK *buffpek, uint maxbuffer,
IO_CACHE *tempfile, IO_CACHE *outfile)
{
DBUG_ENTER("merge_index");
if (merge_buffers(param,tempfile,outfile,sort_keys,buffpek,buffpek,
if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek,buffpek,
buffpek+maxbuffer,1))
DBUG_RETURN(1); /* purecov: inspected */
DBUG_RETURN(0);
......
......@@ -567,3 +567,32 @@ class user_var_entry
Item_result type;
};
/* Class for unique (removing of duplicates) */
class Unique :public Sql_alloc
{
DYNAMIC_ARRAY file_ptrs;
ulong max_elements, max_in_memory_size;
IO_CACHE file;
TREE tree;
char *record_pointers;
bool flush();
public:
ulong elements;
Unique(qsort_cmp2 comp_func, uint size, ulong max_in_memory_size_arg);
~Unique();
inline bool Unique::unique_add(gptr ptr)
{
if (tree.elements_in_tree > max_elements && flush())
return 1;
return tree_insert(&tree,ptr,0);
}
bool get(TABLE *table);
friend int unique_write_to_file(gptr key, Unique *unique,
element_count count);
friend int unique_write_to_ptrs(gptr key, Unique *unique,
element_count count);
};
......@@ -43,7 +43,8 @@
thd->open_tables=thd->handler_tables; \
thd->handler_tables=tmp; }
static TABLE **find_table_ptr_by_name(THD *thd, char *db, char *table_name);
static TABLE **find_table_ptr_by_name(THD *thd, const char *db,
const char *table_name);
int mysql_ha_open(THD *thd, TABLE_LIST *tables)
{
......@@ -231,14 +232,15 @@ int mysql_ha_read(THD *thd, TABLE_LIST *tables,
/* Note: this function differs from find_locked_table() because we're looking
here for alias, not real table name
*/
static TABLE **find_table_ptr_by_name(THD *thd, char *db, char *table_name)
static TABLE **find_table_ptr_by_name(THD *thd, const char *db,
const char *table_name)
{
int dblen;
TABLE **ptr;
if (!db || ! *db) db=thd->db;
if (!db || ! *db) db="";
dblen=strlen(db);
if (!db || ! *db)
db= thd->db ? thd->db : "";
dblen=strlen(db)+1;
ptr=&(thd->handler_tables);
for (TABLE *table=*ptr; table ; table=*ptr)
......
......@@ -3928,8 +3928,8 @@ bool create_myisam_from_heap(TABLE *table, TMP_TABLE_PARAM *param, int error,
table->file->rnd_init();
if (table->no_rows)
{
new_table->file->extra(HA_EXTRA_NO_ROWS);
new_table->no_rows=1;
new_table.file->extra(HA_EXTRA_NO_ROWS);
new_table.no_rows=1;
}
/* copy all old rows */
......
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* Defines used by filesort and uniques */
#define MERGEBUFF 7
#define MERGEBUFF2 15
typedef struct st_buffpek { /* Struktur om sorteringsbuffrarna */
my_off_t file_pos; /* Where we are in the sort file */
uchar *base,*key; /* key pointers */
ha_rows count; /* Number of rows in table */
ulong mem_count; /* numbers of keys in memory */
ulong max_keys; /* Max keys in buffert */
} BUFFPEK;
typedef struct st_sort_param {
uint sort_length; /* Length of sort columns */
uint keys; /* Max keys / buffert */
uint ref_length; /* Length of record ref. */
ha_rows max_rows,examined_rows;
TABLE *sort_form; /* For quicker make_sortkey */
SORT_FIELD *local_sortorder;
SORT_FIELD *end;
uchar *unique_buff;
#ifdef USE_STRCOLL
char* tmp_buffer;
#endif
} SORTPARAM;
int merge_many_buff(SORTPARAM *param, uchar *sort_buffer,
BUFFPEK *buffpek,
uint *maxbuffer, IO_CACHE *t_file);
uint read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
uint sort_length);
int merge_buffers(SORTPARAM *param,IO_CACHE *from_file,
IO_CACHE *to_file, uchar *sort_buffer,
BUFFPEK *lastbuff,BUFFPEK *Fb,
BUFFPEK *Tb,int flag);
/* Copyright (C) 2001 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
Function to handle quick removal of duplicates
This code is used when doing multi-table deletes to find the rows in
reference tables that needs to be deleted.
The basic idea is as follows:
Store first all strings in a binary tree, ignoring duplicates.
When the three uses more memory than 'max_heap_table_size',
write the tree (in sorted order) out to disk and start with a new tree.
When all data has been generated, merge the trees (removing any found
duplicates).
The unique entries will be returned in sort order, to ensure that we do the
deletes in disk order.
*/
#include "mysql_priv.h"
#include "sql_sort.h"
Unique::Unique(qsort_cmp2 comp_func, uint size, ulong max_in_memory_size_arg)
:max_in_memory_size(max_in_memory_size_arg),elements(0)
{
my_b_clear(&file);
init_tree(&tree, max_in_memory_size / 16, size, comp_func, 0, 0);
/* If the following fail's the next add will also fail */
init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
max_elements= max_in_memory_size / ALIGN_SIZE(sizeof(TREE_ELEMENT)+size);
}
Unique::~Unique()
{
close_cached_file(&file);
delete_tree(&tree);
delete_dynamic(&file_ptrs);
}
/* Write tree to disk; clear tree */
bool Unique::flush()
{
BUFFPEK file_ptr;
elements+= tree.elements_in_tree;
file_ptr.count=tree.elements_in_tree;
file_ptr.file_pos=my_b_tell(&file);
if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
(void*) this, left_root_right) ||
insert_dynamic(&file_ptrs, (gptr) &file_ptr))
return 1;
delete_tree(&tree);
return 0;
}
int unique_write_to_file(gptr key, Unique *unique, element_count count)
{
return my_b_write(&unique->file, key, unique->tree.size_of_element) ? 1 : 0;
}
int unique_write_to_ptrs(gptr key, Unique *unique, element_count count)
{
memcpy(unique->record_pointers, key, unique->tree.size_of_element);
unique->record_pointers+=unique->tree.size_of_element;
return 0;
}
/*
Modify the TABLE element so that when one calls init_records()
the rows will be read in priority order.
*/
bool Unique::get(TABLE *table)
{
SORTPARAM sort_param;
table->found_records=elements+tree.elements_in_tree;
if (!my_b_inited(&file))
{
/* Whole tree is in memory; Don't use disk if you don't need to */
if ((record_pointers=table->record_pointers= (byte*)
my_malloc(tree.size_of_element * tree.elements_in_tree, MYF(0))))
{
(void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
this, left_root_right);
return 0;
}
}
/* Not enough memory; Save the result to file */
if (flush())
return 1;
IO_CACHE *outfile=table->io_cache, tempfile;
BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
uint maxbuffer= file_ptrs.elements;
uchar *sort_buffer;
my_off_t save_pos;
bool error=1;
my_b_clear(&tempfile);
/* Open cached file if it isn't open */
if (! my_b_inited(outfile) &&
open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
MYF(MY_WME)))
return 1;
reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
sort_param.keys=elements;
sort_param.sort_form=table;
sort_param.sort_length=sort_param.ref_length=tree.size_of_element;
sort_param.keys= max_in_memory_size / sort_param.sort_length;
if (!(sort_buffer=(uchar*) my_malloc((sort_param.keys+1) *
sort_param.sort_length,
MYF(0))))
return 1;
sort_param.unique_buff= sort_buffer+(sort_param.keys*
sort_param.sort_length);
/* Merge the buffers to one file, removing duplicates */
if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&tempfile))
goto err;
if (flush_io_cache(&tempfile) ||
reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
goto err;
if (merge_buffers(&sort_param, &tempfile, outfile, sort_buffer, file_ptr,
file_ptr, file_ptr+maxbuffer,0))
goto err;
error=0;
err:
x_free((gptr) sort_buffer);
close_cached_file(&tempfile);
if (flush_io_cache(outfile))
error=1;
/* Setup io_cache for reading */
save_pos=outfile->pos_in_file;
if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
error=1;
outfile->end_of_file=save_pos;
return error;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment