From ec5db6409d3bd61c46c31778bccf63946255625b Mon Sep 17 00:00:00 2001
From: Vladislav Vaintroub <vvaintroub@gmail.com>
Date: Thu, 23 Nov 2023 16:58:28 +0100
Subject: [PATCH] MDEV-32216 Connection pool with asynchronous query execution.

Parallelism is achieved by using mysql_send_query on multiple connections
without waiting for results, and using IO multiplexing (poll/IOCP) to
wait for completions.

Refresh libmariadb to pick up CONC-676 (fixes for IOCP use with named pipe)
---
 client/connection_pool.cc | 264 ++++++++++++++++++++++++++++++++++++++
 client/connection_pool.h  | 125 ++++++++++++++++++
 libmariadb                |   2 +-
 3 files changed, 390 insertions(+), 1 deletion(-)
 create mode 100644 client/connection_pool.cc
 create mode 100644 client/connection_pool.h

diff --git a/client/connection_pool.cc b/client/connection_pool.cc
new file mode 100644
index 00000000000..7f1db370109
--- /dev/null
+++ b/client/connection_pool.cc
@@ -0,0 +1,264 @@
+/*
+   Copyright (c) 2023, MariaDB.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335
+   USA
+*/
+
+/*
+  Connection pool, with parallel query execution
+  Does not use threads, but IO multiplexing via mysql_send_query and
+  poll/iocp to wait for completions
+*/
+#include <my_global.h>
+#ifdef _WIN32
+#include <winsock2.h>
+#else
+#include <poll.h>
+#endif
+#include "connection_pool.h"
+#include <my_compiler.h>
+
+namespace async_pool
+{
+static ATTRIBUTE_NORETURN void die(const char *format, ...)
+{
+  va_list args;
+  va_start(args, format);
+  vfprintf(stderr, format, args);
+  va_end(args);
+  abort();
+}
+
+pooled_connection *connection_pool::get_connection()
+{
+  while (free_connections.empty())
+    wait_for_completions();
+  auto c= free_connections.front();
+  free_connections.pop();
+  return c;
+}
+
+#ifdef _WIN32
+void connection_pool::add_to_pollset(pooled_connection *c)
+{
+  DWORD err= ERROR_SUCCESS;
+  static char ch;
+  WSABUF buf;
+  buf.len= 0;
+  buf.buf= &ch;
+  if (!c->is_pipe)
+  {
+    /* Do async io (sockets). */
+    DWORD flags= 0;
+    if (WSARecv((SOCKET) c->handle, &buf, 1, 0, &flags, c, NULL))
+      err= WSAGetLastError();
+  }
+  else
+  {
+    /* Do async read (named pipe) */
+    if (!ReadFile(c->handle, buf.buf, buf.len, 0, c))
+      err= GetLastError();
+  }
+
+  if (err && err != ERROR_IO_PENDING)
+    die("%s failed: %d\n", c->is_pipe ? "ReadFile" : "WSARecv", err);
+}
+
+/*
+  Wait for completions of queries.Uses IOCP on windows to wait for completions.
+  (ReadFile/WSARecv with 0 bytes serves as readiness notification)
+*/
+void connection_pool::wait_for_completions()
+{
+  ULONG n;
+  OVERLAPPED_ENTRY events[32];
+  if (!GetQueuedCompletionStatusEx(iocp, events, array_elements(events), &n, INFINITE,
+                                   FALSE))
+  {
+    die("GetQueuedCompletionStatusEx failed: %d\n", GetLastError());
+  }
+
+  for (ULONG i= 0; i < n; i++)
+  {
+    auto c= (pooled_connection *) events[i].lpOverlapped;
+    if (!c)
+      die("GetQueuedCompletionStatusEx unexpected return");
+    DBUG_ASSERT(c->mysql);
+    DBUG_ASSERT(!events[i].lpCompletionKey);
+    DBUG_ASSERT(!events[i].dwNumberOfBytesTransferred);
+    complete_query(c);
+  }
+}
+#else /* !_WIN32 */
+void connection_pool::add_to_pollset(pooled_connection *c)
+{
+  size_t idx= c - &all_connections[0];
+  pollfd *pfd= &pollset[idx];
+  pfd->fd= c->fd;
+  pfd->events= POLLIN;
+  pfd->revents= 0;
+}
+
+/* something Linux-ish, can be returned for POLLIN event*/
+#ifndef POLLRDHUP
+#define POLLRDHUP 0
+#endif
+
+void connection_pool::wait_for_completions()
+{
+  int n;
+  while ((n= poll(pollset.data(), pollset.size(), -1)) <= 0)
+  {
+    if (errno == EINTR)
+      continue;
+    die("poll failed: %d\n", errno);
+  }
+
+  for (uint i= 0; n > 0 && i < pollset.size(); i++)
+  {
+    pollfd* pfd= &pollset[i];
+    if (pfd->revents &
+        (POLLIN | POLLPRI | POLLHUP | POLLRDHUP| POLLERR | POLLNVAL))
+    {
+      pfd->events= 0;
+      pfd->revents= 0;
+      pfd->fd= -1;
+      complete_query(&all_connections[i]);
+      n--;
+    }
+  }
+  if (n)
+    die("poll() failed to find free connection: %d\n");
+}
+#endif
+
+void connection_pool::complete_query(pooled_connection *c)
+{
+  int err= mysql_read_query_result(c->mysql);
+  if (c->on_completion)
+    c->on_completion(c->mysql, c->query.c_str(), !err, c->context);
+  if (c->release_connection)
+  {
+    c->in_use= false;
+    free_connections.push(c);
+  }
+}
+
+connection_pool::~connection_pool()
+{
+  close();
+}
+
+void connection_pool::init(MYSQL *con[], size_t n)
+{
+#ifdef _WIN32
+  iocp= CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+  if (!iocp)
+    die("CreateIoCompletionPort failed: %d\n", GetLastError());
+#else
+  pollset.resize(n);
+  for (auto &pfd : pollset)
+    pfd.fd= -1;
+#endif
+
+
+  for (size_t i= 0; i < n; i++)
+    all_connections.emplace_back(con[i]);
+
+  for (auto &con : all_connections)
+  {
+    free_connections.push(&con);
+#ifdef _WIN32
+    if (!CreateIoCompletionPort(con.handle, iocp, 0, 0))
+      die("CreateIoCompletionPort failed: %d\n", GetLastError());
+#endif
+  }
+}
+
+int connection_pool::execute_async(const char *query,
+                                   query_completion_handler on_completion,
+                                   void *context, bool release_connecton)
+{
+  auto c= get_connection();
+  c->context= context;
+  c->on_completion= on_completion;
+  c->release_connection= release_connecton;
+  c->query= query;
+
+  int ret= mysql_send_query(c->mysql, query, (unsigned long) c->query.size());
+  if (ret)
+  {
+    free_connections.push(c);
+    return ret;
+  }
+
+  c->in_use= true;
+  add_to_pollset(c);
+  return 0;
+}
+
+/*
+  Wait until all queries are completed and all 
+  connections are idle.
+*/
+void connection_pool::wait_all()
+{
+  while (free_connections.size() != all_connections.size())
+    wait_for_completions();
+}
+
+void connection_pool::for_each_connection(void(*f)(MYSQL *mysql))
+{
+  for (auto &c : all_connections)
+   f(c.mysql);
+}
+
+int connection_pool::close()
+{
+  for (auto &c : all_connections)
+    mysql_close(c.mysql);
+
+  all_connections.clear();
+  while (!free_connections.empty())
+    free_connections.pop();
+#ifdef _WIN32
+  if (iocp)
+  {
+    CloseHandle(iocp);
+    iocp= nullptr;
+  }
+#endif
+  return 0;
+}
+
+pooled_connection::pooled_connection(MYSQL *c)
+{
+  mysql= c;
+#ifdef _WIN32
+  OVERLAPPED *ov= static_cast<OVERLAPPED *>(this);
+  memset(ov, 0, sizeof(OVERLAPPED));
+  mysql_protocol_type protocol;
+  if (c->host && !strcmp(c->host, "."))
+    protocol= MYSQL_PROTOCOL_PIPE;
+  else
+    protocol= (mysql_protocol_type) c->options.protocol;
+  is_pipe= protocol == MYSQL_PROTOCOL_PIPE;
+  handle= (HANDLE) mysql_get_socket(c);
+#else
+  fd= mysql_get_socket(c);
+#endif
+}
+
+} // namespace async_pool
diff --git a/client/connection_pool.h b/client/connection_pool.h
new file mode 100644
index 00000000000..48639d5b4fe
--- /dev/null
+++ b/client/connection_pool.h
@@ -0,0 +1,125 @@
+/*
+   Copyright (c) 2023, MariaDB.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335
+   USA
+*/
+
+#pragma once
+
+#include <mysql.h>
+#include <vector>
+#include <queue>
+#include <string>
+#ifdef _WIN32
+#include <windows.h>
+#else
+#include <poll.h>
+#endif
+
+/*
+  Implementation of asynchronous mariadb connection pool.
+
+  This pool consists of set of MYSQL* connections, created by C API
+  function. The intention is that all connections have the same state
+  same server, by the same user etc.
+
+  The "asynchronous" means the queries are executed on the server
+  without waiting for the server reply. The queries are submitted
+  with mysql_send_query(), and completions are picked by poll/IOCP.
+*/
+
+namespace async_pool
+{
+typedef void (*query_completion_handler)(MYSQL *mysql, const char *query, bool success, void *context);
+
+struct pooled_connection
+#ifdef _WIN32
+    : OVERLAPPED
+#endif
+{
+  MYSQL *mysql;
+  query_completion_handler on_completion=NULL;
+  void *context=NULL;
+  std::string query;
+  bool in_use=false;
+  bool release_connection=false;
+#ifdef _WIN32
+  bool is_pipe;
+  HANDLE handle;
+#else
+  int fd;
+#endif
+  pooled_connection(MYSQL *mysql);
+};
+
+
+struct connection_pool
+{
+private:
+  std::vector<pooled_connection> all_connections;
+  std::queue<pooled_connection *> free_connections;
+  pooled_connection *get_connection();
+  void wait_for_completions();
+  void complete_query(pooled_connection *c);
+  void add_to_pollset(pooled_connection *c);
+
+#ifdef _WIN32
+  HANDLE iocp=nullptr;
+#else
+  std::vector<pollfd> pollset;
+#endif
+public:
+  ~connection_pool();
+
+  /**
+    Add connections to the connection pool
+
+    @param con  - connections
+    @param n_connections - number of connections
+  */
+  void init(MYSQL *con[], size_t n_connections);
+
+  /**
+  Send query to the connection pool
+  Executes query on a connection in the pool, using mysql_send_query
+
+  @param query         - query string
+  @param on_completion - callback function to be called on completion
+  @param context       - user context that will be passed to the callback function
+  @param release_connecton - if true, the connection should be released to the
+         pool after the query is executed. If you execute another
+         mysql_send_query() on the same connection, set this to false.
+
+  Note: the function will block if there are no free connections in the pool.
+
+  @return return code of mysql_send_query
+  */
+  int execute_async(const char *query, query_completion_handler on_completion, void *context, bool release_connecton=true);
+
+  /** Waits for all outstanding queries to complete.*/
+  void wait_all();
+
+  /** Execute callback for each connection in the pool. */
+  void for_each_connection(void (*f)(MYSQL *mysql));
+
+  /**
+    Closes all connections in pool and frees all resources.
+    Does not wait for pending queries to complete
+    (use wait_all() for that)
+  */
+  int close();
+};
+
+} // namespace async_pool
diff --git a/libmariadb b/libmariadb
index 458a4396b44..75ab6fb1746 160000
--- a/libmariadb
+++ b/libmariadb
@@ -1 +1 @@
-Subproject commit 458a4396b443dcefedcf464067560078aa09d8b4
+Subproject commit 75ab6fb1746824648ce09805acbe535f9501df37
-- 
2.30.9