Commit 6d059673 authored by Konstantin Osipov's avatar Konstantin Osipov

Implement WL#5502 Remove dead 5.0 class Sensitive_cursor.

Remove dead and unused code.
Update to reflect the code review requests.
parent ec2c3bf2
......@@ -89,23 +89,11 @@ typedef struct st_thr_lock_info
{
pthread_t thread;
my_thread_id thread_id;
ulong n_cursors;
} THR_LOCK_INFO;
/*
Lock owner identifier. Globally identifies the lock owner within the
thread and among all the threads. The address of an instance of this
structure is used as id.
*/
typedef struct st_thr_lock_owner
{
THR_LOCK_INFO *info;
} THR_LOCK_OWNER;
typedef struct st_thr_lock_data {
THR_LOCK_OWNER *owner;
THR_LOCK_INFO *owner;
struct st_thr_lock_data *next,**prev;
struct st_thr_lock *lock;
mysql_cond_t *cond;
......@@ -141,19 +129,18 @@ extern LIST *thr_lock_thread_list;
extern mysql_mutex_t THR_LOCK_lock;
my_bool init_thr_lock(void); /* Must be called once/thread */
#define thr_lock_owner_init(owner, info_arg) (owner)->info= (info_arg)
void thr_lock_info_init(THR_LOCK_INFO *info);
void thr_lock_init(THR_LOCK *lock);
void thr_lock_delete(THR_LOCK *lock);
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data,
void *status_param);
enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data,
THR_LOCK_OWNER *owner,
THR_LOCK_INFO *owner,
enum thr_lock_type lock_type,
ulong lock_wait_timeout);
void thr_unlock(THR_LOCK_DATA *data);
enum enum_thr_lock_result thr_multi_lock(THR_LOCK_DATA **data,
uint count, THR_LOCK_OWNER *owner,
uint count, THR_LOCK_INFO *owner,
ulong lock_wait_timeout);
void thr_multi_unlock(THR_LOCK_DATA **data,uint count);
void
......
......@@ -107,7 +107,7 @@ my_bool init_thr_lock()
}
static inline my_bool
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
thr_lock_owner_equal(THR_LOCK_INFO *rhs, THR_LOCK_INFO *lhs)
{
return rhs == lhs;
}
......@@ -122,7 +122,7 @@ static int check_lock(struct st_lock_list *list, const char* lock_type,
{
THR_LOCK_DATA *data,**prev;
uint count=0;
THR_LOCK_OWNER *UNINIT_VAR(first_owner);
THR_LOCK_INFO *UNINIT_VAR(first_owner);
prev= &list->data;
if (list->data)
......@@ -341,7 +341,6 @@ void thr_lock_info_init(THR_LOCK_INFO *info)
struct st_my_thread_var *tmp= my_thread_var;
info->thread= tmp->pthread_self;
info->thread_id= tmp->id;
info->n_cursors= 0;
}
/* Initialize a lock instance */
......@@ -357,7 +356,7 @@ void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
static inline my_bool
has_old_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner)
has_old_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner)
{
for ( ; data ; data=data->next)
{
......@@ -506,13 +505,12 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
enum enum_thr_lock_result
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner,
enum thr_lock_type lock_type, ulong lock_wait_timeout)
{
THR_LOCK *lock=data->lock;
enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
struct st_lock_list *wait_queue;
THR_LOCK_DATA *lock_owner;
DBUG_ENTER("thr_lock");
data->next=0;
......@@ -521,7 +519,7 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
data->owner= owner; /* Must be reset ! */
mysql_mutex_lock(&lock->mutex);
DBUG_PRINT("lock",("data: 0x%lx thread: 0x%lx lock: 0x%lx type: %d",
(long) data, data->owner->info->thread_id,
(long) data, data->owner->thread_id,
(long) lock, (int) lock_type));
check_locks(lock,(uint) lock_type <= (uint) TL_READ_NO_INSERT ?
"enter read_lock" : "enter write_lock",0);
......@@ -558,7 +556,7 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
*/
DBUG_PRINT("lock",("write locked 1 by thread: 0x%lx",
lock->write.data->owner->info->thread_id));
lock->write.data->owner->thread_id));
if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
(lock->write.data->type <= TL_WRITE_DELAYED &&
(((int) lock_type <= (int) TL_READ_HIGH_PRIORITY) ||
......@@ -707,7 +705,7 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
goto end;
}
DBUG_PRINT("lock",("write locked 2 by thread: 0x%lx",
lock->write.data->owner->info->thread_id));
lock->write.data->owner->thread_id));
}
else
{
......@@ -743,23 +741,10 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
}
}
DBUG_PRINT("lock",("write locked 3 by thread: 0x%lx type: %d",
lock->read.data->owner->info->thread_id, data->type));
lock->read.data->owner->thread_id, data->type));
}
wait_queue= &lock->write_wait;
}
/*
Try to detect a trivial deadlock when using cursors: attempt to
lock a table that is already locked by an open cursor within the
same connection. lock_owner can be zero if we succumbed to a high
priority writer in the write_wait queue.
*/
lock_owner= lock->read.data ? lock->read.data : lock->write.data;
if (lock_owner && lock_owner->owner->info == owner->info)
{
DBUG_PRINT("lock",("deadlock"));
result= THR_LOCK_DEADLOCK;
goto end;
}
/* Can't get lock yet; Wait for it */
DBUG_RETURN(wait_for_lock(wait_queue, data, 0, lock_wait_timeout));
end:
......@@ -807,7 +792,7 @@ static inline void free_all_read_locks(THR_LOCK *lock,
}
/* purecov: begin inspected */
DBUG_PRINT("lock",("giving read lock to thread: 0x%lx",
data->owner->info->thread_id));
data->owner->thread_id));
/* purecov: end */
data->cond=0; /* Mark thread free */
mysql_cond_signal(cond);
......@@ -826,7 +811,7 @@ void thr_unlock(THR_LOCK_DATA *data)
enum thr_lock_type lock_type=data->type;
DBUG_ENTER("thr_unlock");
DBUG_PRINT("lock",("data: 0x%lx thread: 0x%lx lock: 0x%lx",
(long) data, data->owner->info->thread_id, (long) lock));
(long) data, data->owner->thread_id, (long) lock));
mysql_mutex_lock(&lock->mutex);
check_locks(lock,"start of release lock",0);
......@@ -915,7 +900,7 @@ static void wake_up_waiters(THR_LOCK *lock)
data->type=TL_WRITE; /* Upgrade lock */
/* purecov: begin inspected */
DBUG_PRINT("lock",("giving write lock of type %d to thread: 0x%lx",
data->type, data->owner->info->thread_id));
data->type, data->owner->thread_id));
/* purecov: end */
{
mysql_cond_t *cond= data->cond;
......@@ -1020,7 +1005,7 @@ static void sort_locks(THR_LOCK_DATA **data,uint count)
enum enum_thr_lock_result
thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_OWNER *owner,
thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_INFO *owner,
ulong lock_wait_timeout)
{
THR_LOCK_DATA **pos,**end;
......@@ -1144,7 +1129,7 @@ void thr_multi_unlock(THR_LOCK_DATA **data,uint count)
else
{
DBUG_PRINT("lock",("Free lock: data: 0x%lx thread: 0x%lx lock: 0x%lx",
(long) *pos, (*pos)->owner->info->thread_id,
(long) *pos, (*pos)->owner->thread_id,
(long) (*pos)->lock));
}
}
......@@ -1200,7 +1185,7 @@ my_bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
mysql_mutex_lock(&lock->mutex);
for (data= lock->read_wait.data; data ; data= data->next)
{
if (data->owner->info->thread_id == thread_id) /* purecov: tested */
if (data->owner->thread_id == thread_id) /* purecov: tested */
{
DBUG_PRINT("info",("Aborting read-wait lock"));
data->type= TL_UNLOCK; /* Mark killed */
......@@ -1217,7 +1202,7 @@ my_bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
}
for (data= lock->write_wait.data; data ; data= data->next)
{
if (data->owner->info->thread_id == thread_id) /* purecov: tested */
if (data->owner->thread_id == thread_id) /* purecov: tested */
{
DBUG_PRINT("info",("Aborting write-wait lock"));
data->type= TL_UNLOCK;
......@@ -1387,7 +1372,7 @@ static void thr_print_lock(const char* name,struct st_lock_list *list)
prev= &list->data;
for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next)
{
printf("0x%lx (%lu:%d); ", (ulong) data, data->owner->info->thread_id,
printf("0x%lx (%lu:%d); ", (ulong) data, data->owner->thread_id,
(int) data->type);
if (data->prev != prev)
printf("\nWarning: prev didn't point at previous lock\n");
......@@ -1525,7 +1510,6 @@ static void *test_thread(void *arg)
{
int i,j,param=*((int*) arg);
THR_LOCK_DATA data[MAX_LOCK_COUNT];
THR_LOCK_OWNER owner;
THR_LOCK_INFO lock_info;
THR_LOCK_DATA *multi_locks[MAX_LOCK_COUNT];
my_thread_init();
......@@ -1534,7 +1518,6 @@ static void *test_thread(void *arg)
thr_lock_info_init(&lock_info);
thr_lock_owner_init(&owner, &lock_info);
for (i=0; i < lock_counts[param] ; i++)
thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL);
for (j=1 ; j < 10 ; j++) /* try locking 10 times */
......@@ -1544,7 +1527,7 @@ static void *test_thread(void *arg)
multi_locks[i]= &data[i];
data[i].type= tests[param][i].lock_type;
}
thr_multi_lock(multi_locks, lock_counts[param], &owner, TEST_TIMEOUT);
thr_multi_lock(multi_locks, lock_counts[param], &lock_info, TEST_TIMEOUT);
mysql_mutex_lock(&LOCK_thread_count);
{
int tmp=rand() & 7; /* Do something from 0-2 sec */
......
......@@ -313,7 +313,7 @@ MYSQL_LOCK *mysql_lock_tables(THD *thd, TABLE **tables, uint count, uint flags)
rc= thr_lock_errno_to_mysql[(int) thr_multi_lock(sql_lock->locks +
sql_lock->lock_count,
sql_lock->lock_count,
thd->lock_id, timeout)];
&thd->lock_info, timeout)];
if (rc)
{
if (sql_lock->table_count)
......
......@@ -525,8 +525,7 @@ sp_cursor::open(THD *thd)
MYF(0));
return -1;
}
if (mysql_open_cursor(thd, (uint) ALWAYS_MATERIALIZED_CURSOR, &result,
&server_side_cursor))
if (mysql_open_cursor(thd, &result, &server_side_cursor))
return -1;
return 0;
}
......
......@@ -490,7 +490,6 @@ THD::THD()
:Statement(&main_lex, &main_mem_root, CONVENTIONAL_EXECUTION,
/* statement id */ 0),
rli_fake(0),
lock_id(&main_lock_id),
user_time(0), in_sub_stmt(0),
binlog_unsafe_warning_flags(0),
stmt_accessed_table_flag(0),
......@@ -624,7 +623,6 @@ THD::THD()
randominit(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
substitute_null_with_insert_id = FALSE;
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
thr_lock_owner_init(&main_lock_id, &lock_info);
m_internal_handler= NULL;
current_user_used= FALSE;
......@@ -1113,7 +1111,6 @@ THD::~THD()
}
#endif
stmt_map.reset(); /* close all prepared statements */
DBUG_ASSERT(lock_info.n_cursors == 0);
if (!cleanup_done)
cleanup();
......@@ -2589,7 +2586,6 @@ Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg,
id(id_arg),
mark_used_columns(MARK_COLUMNS_READ),
lex(lex_arg),
cursor(0),
db(NULL),
db_length(0)
{
......@@ -2611,7 +2607,6 @@ void Statement::set_statement(Statement *stmt)
mark_used_columns= stmt->mark_used_columns;
lex= stmt->lex;
query_string= stmt->query_string;
cursor= stmt->cursor;
}
......
......@@ -38,7 +38,7 @@
#include "protocol.h" /* Protocol_text, Protocol_binary */
#include "violite.h" /* vio_is_connected */
#include "thr_lock.h" /* thr_lock_type, THR_LOCK_DATA,
THR_LOCK_INFO, THR_LOCK_OWNER */
THR_LOCK_INFO */
class Reprepare_observer;
......@@ -723,7 +723,6 @@ class Statement: public ilink, public Query_arena
ENGINE INNODB STATUS.
*/
LEX_STRING query_string;
Server_side_cursor *cursor;
inline char *query() { return query_string.str; }
inline uint32 query_length() { return query_string.length; }
......@@ -1416,9 +1415,6 @@ class THD :public Statement,
struct system_status_var status_var; // Per thread statistic vars
struct system_status_var *initial_status_var; /* used by show status */
THR_LOCK_INFO lock_info; // Locking info of this thread
THR_LOCK_OWNER main_lock_id; // To use for conventional queries
THR_LOCK_OWNER *lock_id; // If not main_lock_id, points to
// the lock_id of a cursor.
/**
Protects THD data accessed from other threads:
- thd->query and thd->query_length (used by SHOW ENGINE
......
......@@ -19,60 +19,13 @@
#include "sql_priv.h"
#include "unireg.h"
#include "sql_cursor.h"
#include "sql_select.h"
#include "probes_mysql.h"
#include "sql_parse.h" // mysql_execute_command
#include "sql_base.h"
/****************************************************************************
Declarations.
****************************************************************************/
/**
Sensitive_cursor -- a sensitive non-materialized server side
cursor. An instance of this class cursor has its own runtime
state -- list of used items and memory root for runtime memory,
open and locked tables, change list for the changes of the
parsed tree. This state is freed when the cursor is closed.
*/
class Sensitive_cursor: public Server_side_cursor
{
MEM_ROOT main_mem_root;
Query_arena *stmt_arena;
JOIN *join;
TABLE *open_tables;
MYSQL_LOCK *lock;
TABLE *derived_tables;
/* List of items created during execution */
query_id_t query_id;
struct Engine_info
{
handlerton *ht;
void *read_view;
};
Engine_info ht_info[MAX_HA];
Item_change_list change_list;
my_bool close_at_commit;
THR_LOCK_OWNER lock_id;
private:
/* bzero cursor state in THD */
void reset_thd(THD *thd);
public:
Sensitive_cursor(THD *thd, select_result *result_arg);
THR_LOCK_OWNER *get_lock_id() { return &lock_id; }
/* Save THD state into cursor */
void post_open(THD *thd);
virtual bool is_open() const { return join != 0; }
virtual int open(JOIN *join);
virtual void fetch(ulong num_rows);
virtual void close();
virtual ~Sensitive_cursor();
};
/**
Materialized_cursor -- an insensitive materialized server-side
cursor. The result set of this cursor is saved in a temporary
......@@ -125,10 +78,9 @@ class Select_materialize: public select_union
/**************************************************************************/
/**
Attempt to open a materialized or non-materialized cursor.
Attempt to open a materialized cursor.
@param thd thread handle
@param[in] flags create a materialized cursor or not
@param[in] result result class of the caller used as a destination
for the rows fetched from the cursor
@param[out] pcursor a pointer to store a pointer to cursor in
......@@ -141,37 +93,21 @@ class Select_materialize: public select_union
non-zero an error, 'pcursor' has been left intact.
*/
int mysql_open_cursor(THD *thd, uint flags, select_result *result,
int mysql_open_cursor(THD *thd, select_result *result,
Server_side_cursor **pcursor)
{
Sensitive_cursor *sensitive_cursor;
select_result *save_result;
Select_materialize *result_materialize;
LEX *lex= thd->lex;
int rc;
/*
The lifetime of the sensitive cursor is the same or less as the
lifetime of the runtime memory of the statement it's opened for.
*/
if (! (result_materialize= new (thd->mem_root) Select_materialize(result)))
return 1;
if (! (sensitive_cursor= new (thd->mem_root) Sensitive_cursor(thd, result)))
{
delete result_materialize;
result_materialize= NULL;
return 1;
}
save_result= lex->result;
lex->result= result_materialize;
if (! (flags & (uint) ALWAYS_MATERIALIZED_CURSOR))
{
thd->lock_id= sensitive_cursor->get_lock_id();
thd->cursor= sensitive_cursor;
}
MYSQL_QUERY_EXEC_START(thd->query(),
thd->thread_id,
(char *) (thd->db ? thd->db : ""),
......@@ -182,20 +118,14 @@ int mysql_open_cursor(THD *thd, uint flags, select_result *result,
MYSQL_QUERY_EXEC_DONE(rc);
lex->result= save_result;
thd->lock_id= &thd->main_lock_id;
thd->cursor= 0;
/*
Possible options here:
- a sensitive cursor is open. In this case rc is 0 and
result_materialize->materialized_cursor is NULL, or
- a materialized cursor is open. In this case rc is 0 and
result_materialize->materialized is not NULL
- an error occurred during materialization.
result_materialize->materialized_cursor is not NULL, but rc != 0
- successful completion of mysql_execute_command without
a cursor: rc is 0, result_materialize->materialized_cursor is NULL,
sensitive_cursor is not open.
a cursor: rc is 0, result_materialize->materialized_cursor is NULL.
This is possible if some command writes directly to the
network, bypassing select_result mechanism. An example of
such command is SHOW VARIABLES or SHOW STATUS.
......@@ -204,23 +134,10 @@ int mysql_open_cursor(THD *thd, uint flags, select_result *result,
{
if (result_materialize->materialized_cursor)
delete result_materialize->materialized_cursor;
goto err_open;
}
if (sensitive_cursor->is_open())
{
DBUG_ASSERT(!result_materialize->materialized_cursor);
/*
It's safer if we grab THD state after mysql_execute_command
is finished and not in Sensitive_cursor::open(), because
currently the call to Sensitive_cursor::open is buried deep
in JOIN::exec of the top level join.
*/
sensitive_cursor->post_open(thd);
*pcursor= sensitive_cursor;
goto end;
}
else if (result_materialize->materialized_cursor)
if (result_materialize->materialized_cursor)
{
Materialized_cursor *materialized_cursor=
result_materialize->materialized_cursor;
......@@ -228,18 +145,13 @@ int mysql_open_cursor(THD *thd, uint flags, select_result *result,
if ((rc= materialized_cursor->open(0)))
{
delete materialized_cursor;
materialized_cursor= NULL;
goto err_open;
goto end;
}
*pcursor= materialized_cursor;
thd->stmt_arena->cleanup_stmt();
goto end;
}
err_open:
DBUG_ASSERT(! (sensitive_cursor && sensitive_cursor->is_open()));
delete sensitive_cursor;
end:
delete result_materialize;
return rc;
......@@ -271,280 +183,6 @@ void Server_side_cursor::operator delete(void *ptr, size_t size)
DBUG_VOID_RETURN;
}
/****************************************************************************
Sensitive_cursor
****************************************************************************/
Sensitive_cursor::Sensitive_cursor(THD *thd, select_result *result_arg)
:Server_side_cursor(&main_mem_root, result_arg),
stmt_arena(0),
join(0),
close_at_commit(FALSE)
{
/* We will overwrite it at open anyway. */
init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
thr_lock_owner_init(&lock_id, &thd->lock_info);
bzero((void*) ht_info, sizeof(ht_info));
}
/**
Save THD state into cursor.
@todo
- What problems can we have with it if cursor is open?
- TODO: must be fixed because of the prelocked mode.
*/
void
Sensitive_cursor::post_open(THD *thd)
{
Engine_info *info;
/*
We need to save and reset thd->mem_root, otherwise it'll be
freed later in mysql_parse.
We can't just change thd->mem_root here as we want to keep the
things that are already allocated in thd->mem_root for
Sensitive_cursor::fetch()
*/
*mem_root= *thd->mem_root;
stmt_arena= thd->stmt_arena;
state= stmt_arena->state;
/* Allocate a new memory root for thd */
init_sql_alloc(thd->mem_root,
thd->variables.query_alloc_block_size,
thd->variables.query_prealloc_size);
/*
Save tables and zero THD pointers to prevent table close in
close_thread_tables.
*/
derived_tables= thd->derived_tables;
open_tables= thd->open_tables;
lock= thd->lock;
query_id= thd->query_id;
free_list= thd->free_list;
thd->change_list.move_elements_to(&change_list);
reset_thd(thd);
/* Now we have an active cursor and can cause a deadlock */
thd->lock_info.n_cursors++;
close_at_commit= FALSE; /* reset in case we're reusing the cursor */
info= &ht_info[0];
for (Ha_trx_info *ha_trx_info= thd->transaction.stmt.ha_list;
ha_trx_info; ha_trx_info= ha_trx_info->next())
{
handlerton *ht= ha_trx_info->ht();
close_at_commit|= test(ht->flags & HTON_CLOSE_CURSORS_AT_COMMIT);
if (ht->create_cursor_read_view)
{
info->ht= ht;
info->read_view= (ht->create_cursor_read_view)(ht, thd);
++info;
}
}
/*
What problems can we have with it if cursor is open?
TODO: must be fixed because of the prelocked mode.
*/
}
/**
bzero cursor state in THD.
*/
void
Sensitive_cursor::reset_thd(THD *thd)
{
thd->derived_tables= 0;
thd->set_open_tables(NULL);
thd->lock= 0;
thd->free_list= 0;
thd->change_list.empty();
}
int
Sensitive_cursor::open(JOIN *join_arg)
{
join= join_arg;
THD *thd= join->thd;
/* First non-constant table */
JOIN_TAB *join_tab= join->join_tab + join->const_tables;
DBUG_ENTER("Sensitive_cursor::open");
join->change_result(result);
/*
Send fields description to the client; server_status is sent
in 'EOF' packet, which follows send_result_set_metadata().
We don't simply use SEND_EOF flag of send_result_set_metadata because we also
want to flush the network buffer, which is done only in a standalone
send_eof().
*/
result->send_result_set_metadata(*join->fields, Protocol::SEND_NUM_ROWS);
thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
result->send_eof();
thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
/* Prepare JOIN for reading rows. */
join->tmp_table= 0;
join->join_tab[join->tables-1].next_select= setup_end_select_func(join);
join->send_records= 0;
join->fetch_limit= join->unit->offset_limit_cnt;
/* Disable JOIN CACHE as it is not working with cursors yet */
for (JOIN_TAB *tab= join_tab;
tab != join->join_tab + join->tables - 1;
tab++)
{
if (tab->next_select == sub_select_cache)
tab->next_select= sub_select;
}
DBUG_ASSERT(join_tab->table->reginfo.not_exists_optimize == 0);
DBUG_ASSERT(join_tab->not_used_in_distinct == 0);
/*
null_row is set only if row not found and it's outer join: should never
happen for the first table in join_tab list
*/
DBUG_ASSERT(join_tab->table->null_row == 0);
DBUG_RETURN(0);
}
/**
Fetch next num_rows rows from the cursor and send them to the client.
Precondition:
- Sensitive_cursor is open
@param num_rows fetch up to this number of rows (maybe less)
*/
void
Sensitive_cursor::fetch(ulong num_rows)
{
THD *thd= join->thd;
JOIN_TAB *join_tab= join->join_tab + join->const_tables;
enum_nested_loop_state error= NESTED_LOOP_OK;
Query_arena backup_arena;
Engine_info *info;
DBUG_ENTER("Sensitive_cursor::fetch");
DBUG_PRINT("enter",("rows: %lu", num_rows));
DBUG_ASSERT(thd->derived_tables == 0 && thd->open_tables == 0 &&
thd->lock == 0);
thd->derived_tables= derived_tables;
thd->set_open_tables(open_tables);
thd->lock= lock;
thd->set_query_id(query_id);
change_list.move_elements_to(&thd->change_list);
/* save references to memory allocated during fetch */
thd->set_n_backup_active_arena(this, &backup_arena);
for (info= ht_info; info->read_view ; info++)
(info->ht->set_cursor_read_view)(info->ht, thd, info->read_view);
join->fetch_limit+= num_rows;
error= sub_select(join, join_tab, 0);
if (error == NESTED_LOOP_OK || error == NESTED_LOOP_NO_MORE_ROWS)
error= sub_select(join,join_tab,1);
if (error == NESTED_LOOP_QUERY_LIMIT)
error= NESTED_LOOP_OK; /* select_limit used */
if (error == NESTED_LOOP_CURSOR_LIMIT)
join->resume_nested_loop= TRUE;
ha_release_temporary_latches(thd);
/* Grab free_list here to correctly free it in close */
thd->restore_active_arena(this, &backup_arena);
thd->change_list.move_elements_to(&change_list);
reset_thd(thd);
for (info= ht_info; info->read_view; info++)
(info->ht->set_cursor_read_view)(info->ht, thd, 0);
if (error == NESTED_LOOP_CURSOR_LIMIT)
{
/* Fetch limit worked, possibly more rows are there */
thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
result->send_eof();
thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
}
else
{
close();
if (error == NESTED_LOOP_OK)
{
thd->server_status|= SERVER_STATUS_LAST_ROW_SENT;
result->send_eof();
thd->server_status&= ~SERVER_STATUS_LAST_ROW_SENT;
}
else if (error != NESTED_LOOP_KILLED)
my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
}
DBUG_VOID_RETURN;
}
/**
@todo
Another hack: we need to set THD state as if in a fetch to be
able to call stmt close.
*/
void
Sensitive_cursor::close()
{
THD *thd= join->thd;
DBUG_ENTER("Sensitive_cursor::close");
for (Engine_info *info= ht_info; info->read_view; info++)
{
(info->ht->close_cursor_read_view)(info->ht, thd, info->read_view);
info->read_view= 0;
info->ht= 0;
}
change_list.move_elements_to(&thd->change_list);
{
/*
XXX: Another hack: we need to set THD state as if in a fetch to be
able to call stmt close.
*/
DBUG_ASSERT(lock || open_tables || derived_tables);
TABLE *tmp_derived_tables= thd->derived_tables;
MYSQL_LOCK *tmp_lock= thd->lock;
thd->set_open_tables(open_tables);
thd->derived_tables= derived_tables;
thd->lock= lock;
close_thread_tables(thd);
/* Is expected to at least close tables and empty thd->change_list */
stmt_arena->cleanup_stmt();
thd->set_open_tables(tmp_derived_tables);
thd->derived_tables= tmp_derived_tables;
thd->lock= tmp_lock;
}
thd->lock_info.n_cursors--; /* Decrease the number of active cursors */
join= 0;
stmt_arena= 0;
free_items();
DBUG_VOID_RETURN;
}
Sensitive_cursor::~Sensitive_cursor()
{
if (is_open())
close();
}
/***************************************************************************
Materialized_cursor
......@@ -570,7 +208,8 @@ Materialized_cursor::Materialized_cursor(select_result *result_arg,
@param send_result_set_metadata List of fields that would be sent.
*/
int Materialized_cursor::fill_item_list(THD *thd, List<Item> &send_result_set_metadata)
int Materialized_cursor::fill_item_list(THD *thd,
List<Item> &send_result_set_metadata)
{
Query_arena backup_arena;
int rc;
......@@ -608,6 +247,7 @@ int Materialized_cursor::fill_item_list(THD *thd, List<Item> &send_result_set_me
return rc || thd->is_error();
}
int Materialized_cursor::open(JOIN *join __attribute__((unused)))
{
THD *thd= fake_unit.thd;
......
......@@ -32,11 +32,11 @@ class JOIN;
*/
/**
Server_side_cursor -- an interface for materialized and
sensitive (non-materialized) implementation of cursors. All
cursors are self-contained (created in their own memory root).
For that reason they must be deleted only using a pointer to
Server_side_cursor, not to its base class.
Server_side_cursor -- an interface for materialized
implementation of cursors. All cursors are self-contained
(created in their own memory root). For that reason they must
be deleted only using a pointer to Server_side_cursor, not to
its base class.
*/
class Server_side_cursor: protected Query_arena, public Sql_alloc
......@@ -60,11 +60,7 @@ class Server_side_cursor: protected Query_arena, public Sql_alloc
};
int mysql_open_cursor(THD *thd, uint flags,
select_result *result,
int mysql_open_cursor(THD *thd, select_result *result,
Server_side_cursor **res);
/** Possible values for flags */
enum { ANY_CURSOR= 1, ALWAYS_MATERIALIZED_CURSOR= 2 };
#endif /* _sql_cusor_h_ */
......@@ -152,6 +152,7 @@ class Prepared_statement: public Statement
THD *thd;
Select_fetch_protocol_binary result;
Item_param **param_array;
Server_side_cursor *cursor;
uint param_count;
uint last_errno;
uint flags;
......@@ -2672,7 +2673,6 @@ void mysqld_stmt_fetch(THD *thd, char *packet, uint packet_length)
if (!cursor->is_open())
{
stmt->close_cursor();
thd->cursor= 0;
reset_stmt_params(stmt);
}
......@@ -3010,6 +3010,7 @@ Prepared_statement::Prepared_statement(THD *thd_arg)
thd(thd_arg),
result(thd_arg),
param_array(0),
cursor(0),
param_count(0),
last_errno(0),
flags((uint) IS_IN_USE)
......@@ -3751,8 +3752,7 @@ bool Prepared_statement::execute(String *expanded_query, bool open_cursor)
/* Go! */
if (open_cursor)
error= mysql_open_cursor(thd, (uint) ALWAYS_MATERIALIZED_CURSOR,
&result, &cursor);
error= mysql_open_cursor(thd, &result, &cursor);
else
{
/*
......
......@@ -33,7 +33,6 @@
#include "sql_select.h"
#include "sql_cache.h" // query_cache_*
#include "sql_table.h" // primary_key_name
#include "sql_cursor.h"
#include "probes_mysql.h"
#include "key.h" // key_copy, key_cmp, key_cmp_if_same
#include "lock.h" // mysql_unlock_some_tables,
......@@ -2340,35 +2339,13 @@ JOIN::exec()
curr_join->fields= curr_fields_list;
curr_join->procedure= procedure;
if (is_top_level_join() && thd->cursor && tables != const_tables)
{
/*
We are here if this is JOIN::exec for the last select of the main unit
and the client requested to open a cursor.
We check that not all tables are constant because this case is not
handled by do_select() separately, and this case is not implemented
for cursors yet.
*/
DBUG_ASSERT(error == 0);
/*
curr_join is used only for reusable joins - that is,
to perform SELECT for each outer row (like in subselects).
This join is main, so we know for sure that curr_join == join.
*/
DBUG_ASSERT(curr_join == this);
/* Open cursor for the last join sweep */
error= thd->cursor->open(this);
}
else
{
thd_proc_info(thd, "Sending data");
DBUG_PRINT("info", ("%s", thd->proc_info));
result->send_result_set_metadata((procedure ? curr_join->procedure_fields_list :
*curr_fields_list),
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
error= do_select(curr_join, curr_fields_list, NULL, procedure);
thd->limit_found_rows= curr_join->send_records;
}
thd_proc_info(thd, "Sending data");
DBUG_PRINT("info", ("%s", thd->proc_info));
result->send_result_set_metadata((procedure ? curr_join->procedure_fields_list :
*curr_fields_list),
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
error= do_select(curr_join, curr_fields_list, NULL, procedure);
thd->limit_found_rows= curr_join->send_records;
/* Accumulate the counts from all join iterations of all join parts. */
thd->examined_row_count+= curr_join->examined_rows;
......@@ -2563,16 +2540,6 @@ mysql_select(THD *thd, Item ***rref_pointer_array,
join->exec();
if (thd->cursor && thd->cursor->is_open())
{
/*
A cursor was opened for the last sweep in exec().
We are here only if this is mysql_select for top-level SELECT_LEX_UNIT
and there were no error.
*/
free_join= 0;
}
if (thd->lex->describe & DESCRIBE_EXTENDED)
{
select_lex->where= join->conds_history;
......@@ -11642,37 +11609,23 @@ sub_select(JOIN *join,JOIN_TAB *join_tab,bool end_of_records)
enum_nested_loop_state rc;
READ_RECORD *info= &join_tab->read_record;
if (join->resume_nested_loop)
{
/* If not the last table, plunge down the nested loop */
if (join_tab < join->join_tab + join->tables - 1)
rc= (*join_tab->next_select)(join, join_tab + 1, 0);
else
{
join->resume_nested_loop= FALSE;
rc= NESTED_LOOP_OK;
}
}
else
{
join->return_tab= join_tab;
if (join_tab->last_inner)
{
/* join_tab is the first inner table for an outer join operation. */
join->return_tab= join_tab;
/* Set initial state of guard variables for this table.*/
join_tab->found=0;
join_tab->not_null_compl= 1;
if (join_tab->last_inner)
{
/* join_tab is the first inner table for an outer join operation. */
/* Set first_unmatched for the last inner table of this group */
join_tab->last_inner->first_unmatched= join_tab;
}
join->thd->warning_info->reset_current_row_for_warning();
/* Set initial state of guard variables for this table.*/
join_tab->found=0;
join_tab->not_null_compl= 1;
error= (*join_tab->read_first_record)(join_tab);
rc= evaluate_join_record(join, join_tab, error);
/* Set first_unmatched for the last inner table of this group */
join_tab->last_inner->first_unmatched= join_tab;
}
join->thd->warning_info->reset_current_row_for_warning();
error= (*join_tab->read_first_record)(join_tab);
rc= evaluate_join_record(join, join_tab, error);
while (rc == NESTED_LOOP_OK)
{
......
......@@ -309,11 +309,6 @@ class JOIN :public Sql_alloc
bool sort_and_group;
bool first_record,full_join,group, no_field_update;
bool do_send_rows;
/**
TRUE when we want to resume nested loop iterations when
fetching data from a cursor
*/
bool resume_nested_loop;
table_map const_table_map,found_const_table_map;
/*
Bitmap of all inner tables from outer joins
......@@ -479,7 +474,6 @@ class JOIN :public Sql_alloc
sort_and_group= 0;
first_record= 0;
do_send_rows= 1;
resume_nested_loop= FALSE;
send_records= 0;
found_records= 0;
fetch_limit= HA_POS_ERROR;
......
......@@ -36,8 +36,7 @@ bool mysql_union(THD *thd, LEX *lex, select_result *result,
if (!(res= unit->prepare(thd, result, SELECT_NO_UNLOCK |
setup_tables_done_option)))
res= unit->exec();
if (res || !thd->cursor || !thd->cursor->is_open())
res|= unit->cleanup();
res|= unit->cleanup();
DBUG_RETURN(res);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment