Commit 8b9558aa authored by Yan, Zheng; committed by Ilya Dryomov

libceph: use keepalive2 to verify the mon session is alive

Signed-off-by: Yan, Zheng <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
parent d194cd1d
...@@ -46,6 +46,7 @@ struct ceph_options { ...@@ -46,6 +46,7 @@ struct ceph_options {
unsigned long mount_timeout; /* jiffies */ unsigned long mount_timeout; /* jiffies */
unsigned long osd_idle_ttl; /* jiffies */ unsigned long osd_idle_ttl; /* jiffies */
unsigned long osd_keepalive_timeout; /* jiffies */ unsigned long osd_keepalive_timeout; /* jiffies */
unsigned long monc_ping_timeout; /* jiffies */
/* /*
* any type that can't be simply compared or doesn't need need * any type that can't be simply compared or doesn't need need
...@@ -66,6 +67,7 @@ struct ceph_options { ...@@ -66,6 +67,7 @@ struct ceph_options {
#define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000) #define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000)
#define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000) #define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000)
#define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000) #define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000)
#define CEPH_MONC_PING_TIMEOUT_DEFAULT msecs_to_jiffies(30 * 1000)
#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
#define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024) #define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024)
......
...@@ -248,6 +248,8 @@ struct ceph_connection { ...@@ -248,6 +248,8 @@ struct ceph_connection {
int in_base_pos; /* bytes read */ int in_base_pos; /* bytes read */
__le64 in_temp_ack; /* for reading an ack */ __le64 in_temp_ack; /* for reading an ack */
struct timespec last_keepalive_ack;
struct delayed_work work; /* send|recv work */ struct delayed_work work; /* send|recv work */
unsigned long delay; /* current delay interval */ unsigned long delay; /* current delay interval */
}; };
...@@ -285,6 +287,8 @@ extern void ceph_msg_revoke(struct ceph_msg *msg); ...@@ -285,6 +287,8 @@ extern void ceph_msg_revoke(struct ceph_msg *msg);
extern void ceph_msg_revoke_incoming(struct ceph_msg *msg); extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con); extern void ceph_con_keepalive(struct ceph_connection *con);
extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
unsigned long interval);
extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment); size_t length, size_t alignment);
......
...@@ -84,10 +84,12 @@ struct ceph_entity_inst { ...@@ -84,10 +84,12 @@ struct ceph_entity_inst {
#define CEPH_MSGR_TAG_MSG 7 /* message */ #define CEPH_MSGR_TAG_MSG 7 /* message */
#define CEPH_MSGR_TAG_ACK 8 /* message ack */ #define CEPH_MSGR_TAG_ACK 8 /* message ack */
#define CEPH_MSGR_TAG_KEEPALIVE 9 /* just a keepalive byte! */ #define CEPH_MSGR_TAG_KEEPALIVE 9 /* just a keepalive byte! */
#define CEPH_MSGR_TAG_BADPROTOVER 10 /* bad protocol version */ #define CEPH_MSGR_TAG_BADPROTOVER 10 /* bad protocol version */
#define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */ #define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
#define CEPH_MSGR_TAG_FEATURES 12 /* insufficient features */ #define CEPH_MSGR_TAG_FEATURES 12 /* insufficient features */
#define CEPH_MSGR_TAG_SEQ 13 /* 64-bit int follows with seen seq number */ #define CEPH_MSGR_TAG_SEQ 13 /* 64-bit int follows with seen seq number */
#define CEPH_MSGR_TAG_KEEPALIVE2 14 /* keepalive2 byte + ceph_timespec */
#define CEPH_MSGR_TAG_KEEPALIVE2_ACK 15 /* keepalive2 reply */
/* /*
......
...@@ -357,6 +357,7 @@ ceph_parse_options(char *options, const char *dev_name, ...@@ -357,6 +357,7 @@ ceph_parse_options(char *options, const char *dev_name,
opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
/* get mon ip(s) */ /* get mon ip(s) */
/* ip1[:port1][,ip2[:port2]...] */ /* ip1[:port1][,ip2[:port2]...] */
......
...@@ -163,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache; ...@@ -163,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache;
static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
#ifdef CONFIG_LOCKDEP #ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class; static struct lock_class_key socket_class;
...@@ -1351,7 +1352,15 @@ static void prepare_write_keepalive(struct ceph_connection *con) ...@@ -1351,7 +1352,15 @@ static void prepare_write_keepalive(struct ceph_connection *con)
{ {
dout("prepare_write_keepalive %p\n", con); dout("prepare_write_keepalive %p\n", con);
con_out_kvec_reset(con); con_out_kvec_reset(con);
con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
struct timespec ts = CURRENT_TIME;
struct ceph_timespec ceph_ts;
ceph_encode_timespec(&ceph_ts, &ts);
con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
con_out_kvec_add(con, sizeof(ceph_ts), &ceph_ts);
} else {
con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
}
con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CON_FLAG_WRITE_PENDING);
} }
...@@ -1625,6 +1634,12 @@ static void prepare_read_tag(struct ceph_connection *con) ...@@ -1625,6 +1634,12 @@ static void prepare_read_tag(struct ceph_connection *con)
con->in_tag = CEPH_MSGR_TAG_READY; con->in_tag = CEPH_MSGR_TAG_READY;
} }
/*
 * Prepare to read the payload that follows a KEEPALIVE2_ACK tag.
 *
 * Resets con->in_base_pos (the "bytes read" counter) to zero so the
 * subsequent read starts from the beginning of the payload.
 */
static void prepare_read_keepalive_ack(struct ceph_connection *con)
{
	dout("prepare_read_keepalive_ack %p\n", con);

	con->in_base_pos = 0;
}
/* /*
* Prepare to read a message. * Prepare to read a message.
*/ */
...@@ -2457,6 +2472,17 @@ static void process_message(struct ceph_connection *con) ...@@ -2457,6 +2472,17 @@ static void process_message(struct ceph_connection *con)
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
} }
static int read_keepalive_ack(struct ceph_connection *con)
{
struct ceph_timespec ceph_ts;
size_t size = sizeof(ceph_ts);
int ret = read_partial(con, size, size, &ceph_ts);
if (ret <= 0)
return ret;
ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts);
prepare_read_tag(con);
return 1;
}
/* /*
* Write something to the socket. Called in a worker thread when the * Write something to the socket. Called in a worker thread when the
...@@ -2526,6 +2552,10 @@ static int try_write(struct ceph_connection *con) ...@@ -2526,6 +2552,10 @@ static int try_write(struct ceph_connection *con)
do_next: do_next:
if (con->state == CON_STATE_OPEN) { if (con->state == CON_STATE_OPEN) {
if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
prepare_write_keepalive(con);
goto more;
}
/* is anything else pending? */ /* is anything else pending? */
if (!list_empty(&con->out_queue)) { if (!list_empty(&con->out_queue)) {
prepare_write_message(con); prepare_write_message(con);
...@@ -2535,10 +2565,6 @@ static int try_write(struct ceph_connection *con) ...@@ -2535,10 +2565,6 @@ static int try_write(struct ceph_connection *con)
prepare_write_ack(con); prepare_write_ack(con);
goto more; goto more;
} }
if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
prepare_write_keepalive(con);
goto more;
}
} }
/* Nothing to do! */ /* Nothing to do! */
...@@ -2641,6 +2667,9 @@ static int try_read(struct ceph_connection *con) ...@@ -2641,6 +2667,9 @@ static int try_read(struct ceph_connection *con)
case CEPH_MSGR_TAG_ACK: case CEPH_MSGR_TAG_ACK:
prepare_read_ack(con); prepare_read_ack(con);
break; break;
case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
prepare_read_keepalive_ack(con);
break;
case CEPH_MSGR_TAG_CLOSE: case CEPH_MSGR_TAG_CLOSE:
con_close_socket(con); con_close_socket(con);
con->state = CON_STATE_CLOSED; con->state = CON_STATE_CLOSED;
...@@ -2684,6 +2713,12 @@ static int try_read(struct ceph_connection *con) ...@@ -2684,6 +2713,12 @@ static int try_read(struct ceph_connection *con)
process_ack(con); process_ack(con);
goto more; goto more;
} }
if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
ret = read_keepalive_ack(con);
if (ret <= 0)
goto out;
goto more;
}
out: out:
dout("try_read done on %p ret %d\n", con, ret); dout("try_read done on %p ret %d\n", con, ret);
...@@ -3101,6 +3136,20 @@ void ceph_con_keepalive(struct ceph_connection *con) ...@@ -3101,6 +3136,20 @@ void ceph_con_keepalive(struct ceph_connection *con)
} }
EXPORT_SYMBOL(ceph_con_keepalive); EXPORT_SYMBOL(ceph_con_keepalive);
bool ceph_con_keepalive_expired(struct ceph_connection *con,
unsigned long interval)
{
if (interval > 0 &&
(con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
struct timespec now = CURRENT_TIME;
struct timespec ts;
jiffies_to_timespec(interval, &ts);
ts = timespec_add(con->last_keepalive_ack, ts);
return timespec_compare(&now, &ts) >= 0;
}
return false;
}
static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
{ {
struct ceph_msg_data *data; struct ceph_msg_data *data;
......
...@@ -149,6 +149,10 @@ static int __open_session(struct ceph_mon_client *monc) ...@@ -149,6 +149,10 @@ static int __open_session(struct ceph_mon_client *monc)
CEPH_ENTITY_TYPE_MON, monc->cur_mon, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
&monc->monmap->mon_inst[monc->cur_mon].addr); &monc->monmap->mon_inst[monc->cur_mon].addr);
/* send an initial keepalive to ensure our timestamp is
* valid by the time we are in an OPENED state */
ceph_con_keepalive(&monc->con);
/* initiatiate authentication handshake */ /* initiatiate authentication handshake */
ret = ceph_auth_build_hello(monc->auth, ret = ceph_auth_build_hello(monc->auth,
monc->m_auth->front.iov_base, monc->m_auth->front.iov_base,
...@@ -170,14 +174,19 @@ static bool __sub_expired(struct ceph_mon_client *monc) ...@@ -170,14 +174,19 @@ static bool __sub_expired(struct ceph_mon_client *monc)
*/ */
static void __schedule_delayed(struct ceph_mon_client *monc) static void __schedule_delayed(struct ceph_mon_client *monc)
{ {
unsigned int delay; struct ceph_options *opt = monc->client->options;
unsigned long delay;
if (monc->cur_mon < 0 || __sub_expired(monc)) if (monc->cur_mon < 0 || __sub_expired(monc)) {
delay = 10 * HZ; delay = 10 * HZ;
else } else {
delay = 20 * HZ; delay = 20 * HZ;
dout("__schedule_delayed after %u\n", delay); if (opt->monc_ping_timeout > 0)
schedule_delayed_work(&monc->delayed_work, delay); delay = min(delay, opt->monc_ping_timeout / 3);
}
dout("__schedule_delayed after %lu\n", delay);
schedule_delayed_work(&monc->delayed_work,
round_jiffies_relative(delay));
} }
/* /*
...@@ -743,11 +752,23 @@ static void delayed_work(struct work_struct *work) ...@@ -743,11 +752,23 @@ static void delayed_work(struct work_struct *work)
__close_session(monc); __close_session(monc);
__open_session(monc); /* continue hunting */ __open_session(monc); /* continue hunting */
} else { } else {
ceph_con_keepalive(&monc->con); struct ceph_options *opt = monc->client->options;
int is_auth = ceph_auth_is_authenticated(monc->auth);
if (ceph_con_keepalive_expired(&monc->con,
opt->monc_ping_timeout)) {
dout("monc keepalive timeout\n");
is_auth = 0;
__close_session(monc);
monc->hunting = true;
__open_session(monc);
}
__validate_auth(monc); if (!monc->hunting) {
ceph_con_keepalive(&monc->con);
__validate_auth(monc);
}
if (ceph_auth_is_authenticated(monc->auth)) if (is_auth)
__send_subscribe(monc); __send_subscribe(monc);
} }
__schedule_delayed(monc); __schedule_delayed(monc);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment