Commit 81c5a148 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: split large reconnect into multiple messages

Signed-off-by: default avatar"Yan, Zheng" <zyan@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 84bf3950
...@@ -2393,6 +2393,12 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, ...@@ -2393,6 +2393,12 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
if ((cap->issued & ci->i_flushing_caps) != if ((cap->issued & ci->i_flushing_caps) !=
ci->i_flushing_caps) { ci->i_flushing_caps) {
ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
/* encode_caps_cb() also will reset these sequence
* numbers. make sure sequence numbers in cap flush
* message match later reconnect message */
cap->seq = 0;
cap->issue_seq = 0;
cap->mseq = 0;
__kick_flushing_caps(mdsc, session, ci, __kick_flushing_caps(mdsc, session, ci,
oldest_flush_tid); oldest_flush_tid);
} else { } else {
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#include <linux/ceph/auth.h> #include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h> #include <linux/ceph/debugfs.h>
#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
/* /*
* A cluster of MDS (metadata server) daemons is responsible for * A cluster of MDS (metadata server) daemons is responsible for
* managing the file system namespace (the directory hierarchy and * managing the file system namespace (the directory hierarchy and
...@@ -46,9 +48,11 @@ ...@@ -46,9 +48,11 @@
*/ */
struct ceph_reconnect_state { struct ceph_reconnect_state {
int nr_caps; struct ceph_mds_session *session;
int nr_caps, nr_realms;
struct ceph_pagelist *pagelist; struct ceph_pagelist *pagelist;
unsigned msg_version; unsigned msg_version;
bool allow_multi;
}; };
static void __wake_requests(struct ceph_mds_client *mdsc, static void __wake_requests(struct ceph_mds_client *mdsc,
...@@ -2985,6 +2989,82 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, ...@@ -2985,6 +2989,82 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
} }
static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
{
struct ceph_msg *reply;
struct ceph_pagelist *_pagelist;
struct page *page;
__le32 *addr;
int err = -ENOMEM;
if (!recon_state->allow_multi)
return -ENOSPC;
/* can't handle message that contains both caps and realm */
BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
/* pre-allocate new pagelist */
_pagelist = ceph_pagelist_alloc(GFP_NOFS);
if (!_pagelist)
return -ENOMEM;
reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
if (!reply)
goto fail_msg;
/* placeholder for nr_caps */
err = ceph_pagelist_encode_32(_pagelist, 0);
if (err < 0)
goto fail;
if (recon_state->nr_caps) {
/* currently encoding caps */
err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
if (err)
goto fail;
} else {
/* placeholder for nr_realms (currently encoding relams) */
err = ceph_pagelist_encode_32(_pagelist, 0);
if (err < 0)
goto fail;
}
err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
if (err)
goto fail;
page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
addr = kmap_atomic(page);
if (recon_state->nr_caps) {
/* currently encoding caps */
*addr = cpu_to_le32(recon_state->nr_caps);
} else {
/* currently encoding relams */
*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
}
kunmap_atomic(addr);
reply->hdr.version = cpu_to_le16(5);
reply->hdr.compat_version = cpu_to_le16(4);
reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
ceph_con_send(&recon_state->session->s_con, reply);
ceph_pagelist_release(recon_state->pagelist);
recon_state->pagelist = _pagelist;
recon_state->nr_caps = 0;
recon_state->nr_realms = 0;
recon_state->msg_version = 5;
return 0;
fail:
ceph_msg_put(reply);
fail_msg:
ceph_pagelist_release(_pagelist);
return err;
}
/* /*
* Encode information about a cap for a reconnect with the MDS. * Encode information about a cap for a reconnect with the MDS.
*/ */
...@@ -3004,9 +3084,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -3004,9 +3084,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout(" adding %p ino %llx.%llx cap %p %lld %s\n", dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
inode, ceph_vinop(inode), cap, cap->cap_id, inode, ceph_vinop(inode), cap, cap->cap_id,
ceph_cap_string(cap->issued)); ceph_cap_string(cap->issued));
err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
if (err)
return err;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
cap->seq = 0; /* reset cap seq */ cap->seq = 0; /* reset cap seq */
...@@ -3046,7 +3123,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -3046,7 +3123,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
if (recon_state->msg_version >= 2) { if (recon_state->msg_version >= 2) {
int num_fcntl_locks, num_flock_locks; int num_fcntl_locks, num_flock_locks;
struct ceph_filelock *flocks = NULL; struct ceph_filelock *flocks = NULL;
size_t struct_len, total_len = 0; size_t struct_len, total_len = sizeof(u64);
u8 struct_v = 0; u8 struct_v = 0;
encode_again: encode_again:
...@@ -3081,7 +3158,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -3081,7 +3158,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
if (recon_state->msg_version >= 3) { if (recon_state->msg_version >= 3) {
/* version, compat_version and struct_len */ /* version, compat_version and struct_len */
total_len = 2 * sizeof(u8) + sizeof(u32); total_len += 2 * sizeof(u8) + sizeof(u32);
struct_v = 2; struct_v = 2;
} }
/* /*
...@@ -3098,12 +3175,19 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -3098,12 +3175,19 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct_len += sizeof(u64); /* snap_follows */ struct_len += sizeof(u64); /* snap_follows */
total_len += struct_len; total_len += struct_len;
err = ceph_pagelist_reserve(pagelist, total_len);
if (err) { if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
kfree(flocks); err = send_reconnect_partial(recon_state);
goto out_err; if (err)
goto out_freeflocks;
pagelist = recon_state->pagelist;
} }
err = ceph_pagelist_reserve(pagelist, total_len);
if (err)
goto out_freeflocks;
ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
if (recon_state->msg_version >= 3) { if (recon_state->msg_version >= 3) {
ceph_pagelist_encode_8(pagelist, struct_v); ceph_pagelist_encode_8(pagelist, struct_v);
ceph_pagelist_encode_8(pagelist, 1); ceph_pagelist_encode_8(pagelist, 1);
...@@ -3115,7 +3199,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -3115,7 +3199,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
num_fcntl_locks, num_flock_locks); num_fcntl_locks, num_flock_locks);
if (struct_v >= 2) if (struct_v >= 2)
ceph_pagelist_encode_64(pagelist, snap_follows); ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
kfree(flocks); kfree(flocks);
} else { } else {
u64 pathbase = 0; u64 pathbase = 0;
...@@ -3136,20 +3220,81 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -3136,20 +3220,81 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
} }
err = ceph_pagelist_reserve(pagelist, err = ceph_pagelist_reserve(pagelist,
pathlen + sizeof(u32) + sizeof(rec.v1)); sizeof(u64) + sizeof(u32) +
pathlen + sizeof(rec.v1));
if (err) { if (err) {
kfree(path); goto out_freepath;
goto out_err;
} }
ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
ceph_pagelist_encode_string(pagelist, path, pathlen); ceph_pagelist_encode_string(pagelist, path, pathlen);
ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
out_freepath:
kfree(path); kfree(path);
} }
recon_state->nr_caps++;
out_err: out_err:
if (err >= 0)
recon_state->nr_caps++;
return err;
}
static int encode_snap_realms(struct ceph_mds_client *mdsc,
struct ceph_reconnect_state *recon_state)
{
struct rb_node *p;
struct ceph_pagelist *pagelist = recon_state->pagelist;
int err = 0;
if (recon_state->msg_version >= 4) {
err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
if (err < 0)
goto fail;
}
/*
* snaprealms. we provide mds with the ino, seq (version), and
* parent for all of our realms. If the mds has any newer info,
* it will tell us.
*/
for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
struct ceph_snap_realm *realm =
rb_entry(p, struct ceph_snap_realm, node);
struct ceph_mds_snaprealm_reconnect sr_rec;
if (recon_state->msg_version >= 4) {
size_t need = sizeof(u8) * 2 + sizeof(u32) +
sizeof(sr_rec);
if (pagelist->length + need > RECONNECT_MAX_SIZE) {
err = send_reconnect_partial(recon_state);
if (err)
goto fail;
pagelist = recon_state->pagelist;
}
err = ceph_pagelist_reserve(pagelist, need);
if (err)
goto fail;
ceph_pagelist_encode_8(pagelist, 1);
ceph_pagelist_encode_8(pagelist, 1);
ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
}
dout(" adding snap realm %llx seq %lld parent %llx\n",
realm->ino, realm->seq, realm->parent_ino);
sr_rec.ino = cpu_to_le64(realm->ino);
sr_rec.seq = cpu_to_le64(realm->seq);
sr_rec.parent = cpu_to_le64(realm->parent_ino);
err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
if (err)
goto fail;
recon_state->nr_realms++;
}
fail:
return err; return err;
} }
...@@ -3170,18 +3315,17 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, ...@@ -3170,18 +3315,17 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session) struct ceph_mds_session *session)
{ {
struct ceph_msg *reply; struct ceph_msg *reply;
struct rb_node *p;
int mds = session->s_mds; int mds = session->s_mds;
int err = -ENOMEM; int err = -ENOMEM;
int s_nr_caps; struct ceph_reconnect_state recon_state = {
struct ceph_pagelist *pagelist; .session = session,
struct ceph_reconnect_state recon_state; };
LIST_HEAD(dispose); LIST_HEAD(dispose);
pr_info("mds%d reconnect start\n", mds); pr_info("mds%d reconnect start\n", mds);
pagelist = ceph_pagelist_alloc(GFP_NOFS); recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
if (!pagelist) if (!recon_state.pagelist)
goto fail_nopagelist; goto fail_nopagelist;
reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
...@@ -3225,63 +3369,90 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, ...@@ -3225,63 +3369,90 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
/* replay unsafe requests */ /* replay unsafe requests */
replay_unsafe_requests(mdsc, session); replay_unsafe_requests(mdsc, session);
ceph_early_kick_flushing_caps(mdsc, session);
down_read(&mdsc->snap_rwsem); down_read(&mdsc->snap_rwsem);
/* traverse this session's caps */ /* placeholder for nr_caps */
s_nr_caps = session->s_nr_caps; err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
if (err) if (err)
goto fail; goto fail;
recon_state.nr_caps = 0; if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
recon_state.pagelist = pagelist;
if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
recon_state.msg_version = 3; recon_state.msg_version = 3;
else recon_state.allow_multi = true;
} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
recon_state.msg_version = 3;
} else {
recon_state.msg_version = 2; recon_state.msg_version = 2;
}
/* trsaverse this session's caps */
err = iterate_session_caps(session, encode_caps_cb, &recon_state); err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
spin_lock(&session->s_cap_lock); spin_lock(&session->s_cap_lock);
session->s_cap_reconnect = 0; session->s_cap_reconnect = 0;
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
/* if (err < 0)
* snaprealms. we provide mds with the ino, seq (version), and goto fail;
* parent for all of our realms. If the mds has any newer info,
* it will tell us.
*/
for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
struct ceph_snap_realm *realm =
rb_entry(p, struct ceph_snap_realm, node);
struct ceph_mds_snaprealm_reconnect sr_rec;
dout(" adding snap realm %llx seq %lld parent %llx\n", /* check if all realms can be encoded into current message */
realm->ino, realm->seq, realm->parent_ino); if (mdsc->num_snap_realms) {
sr_rec.ino = cpu_to_le64(realm->ino); size_t total_len =
sr_rec.seq = cpu_to_le64(realm->seq); recon_state.pagelist->length +
sr_rec.parent = cpu_to_le64(realm->parent_ino); mdsc->num_snap_realms *
err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); sizeof(struct ceph_mds_snaprealm_reconnect);
if (recon_state.msg_version >= 4) {
/* number of realms */
total_len += sizeof(u32);
/* version, compat_version and struct_len */
total_len += mdsc->num_snap_realms *
(2 * sizeof(u8) + sizeof(u32));
}
if (total_len > RECONNECT_MAX_SIZE) {
if (!recon_state.allow_multi) {
err = -ENOSPC;
goto fail;
}
if (recon_state.nr_caps) {
err = send_reconnect_partial(&recon_state);
if (err) if (err)
goto fail; goto fail;
} }
recon_state.msg_version = 5;
}
}
reply->hdr.version = cpu_to_le16(recon_state.msg_version); err = encode_snap_realms(mdsc, &recon_state);
if (err < 0)
goto fail;
/* raced with cap release? */ if (recon_state.msg_version >= 5) {
if (s_nr_caps != recon_state.nr_caps) { err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
struct page *page = list_first_entry(&pagelist->head, if (err < 0)
goto fail;
}
if (recon_state.nr_caps || recon_state.nr_realms) {
struct page *page =
list_first_entry(&recon_state.pagelist->head,
struct page, lru); struct page, lru);
__le32 *addr = kmap_atomic(page); __le32 *addr = kmap_atomic(page);
if (recon_state.nr_caps) {
WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
*addr = cpu_to_le32(recon_state.nr_caps); *addr = cpu_to_le32(recon_state.nr_caps);
} else if (recon_state.msg_version >= 4) {
*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
}
kunmap_atomic(addr); kunmap_atomic(addr);
} }
reply->hdr.data_len = cpu_to_le32(pagelist->length); reply->hdr.version = cpu_to_le16(recon_state.msg_version);
ceph_msg_data_add_pagelist(reply, pagelist); if (recon_state.msg_version >= 4)
reply->hdr.compat_version = cpu_to_le16(4);
ceph_early_kick_flushing_caps(mdsc, session); reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
ceph_con_send(&session->s_con, reply); ceph_con_send(&session->s_con, reply);
...@@ -3292,7 +3463,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, ...@@ -3292,7 +3463,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
up_read(&mdsc->snap_rwsem); up_read(&mdsc->snap_rwsem);
ceph_pagelist_release(pagelist); ceph_pagelist_release(recon_state.pagelist);
return; return;
fail: fail:
...@@ -3300,7 +3471,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, ...@@ -3300,7 +3471,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
up_read(&mdsc->snap_rwsem); up_read(&mdsc->snap_rwsem);
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
fail_nomsg: fail_nomsg:
ceph_pagelist_release(pagelist); ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist: fail_nopagelist:
pr_err("error %d preparing reconnect for mds%d\n", err, mds); pr_err("error %d preparing reconnect for mds%d\n", err, mds);
return; return;
...@@ -3698,6 +3869,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) ...@@ -3698,6 +3869,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
init_rwsem(&mdsc->snap_rwsem); init_rwsem(&mdsc->snap_rwsem);
mdsc->snap_realms = RB_ROOT; mdsc->snap_realms = RB_ROOT;
INIT_LIST_HEAD(&mdsc->snap_empty); INIT_LIST_HEAD(&mdsc->snap_empty);
mdsc->num_snap_realms = 0;
spin_lock_init(&mdsc->snap_empty_lock); spin_lock_init(&mdsc->snap_empty_lock);
mdsc->last_tid = 0; mdsc->last_tid = 0;
mdsc->oldest_tid = 0; mdsc->oldest_tid = 0;
......
...@@ -21,11 +21,13 @@ ...@@ -21,11 +21,13 @@
#define CEPHFS_FEATURE_REPLY_ENCODING 9 #define CEPHFS_FEATURE_REPLY_ENCODING 9
#define CEPHFS_FEATURE_RECLAIM_CLIENT 10 #define CEPHFS_FEATURE_RECLAIM_CLIENT 10
#define CEPHFS_FEATURE_LAZY_CAP_WANTED 11 #define CEPHFS_FEATURE_LAZY_CAP_WANTED 11
#define CEPHFS_FEATURE_MULTI_RECONNECT 12
#define CEPHFS_FEATURES_CLIENT_SUPPORTED { \ #define CEPHFS_FEATURES_CLIENT_SUPPORTED { \
0, 1, 2, 3, 4, 5, 6, 7, \ 0, 1, 2, 3, 4, 5, 6, 7, \
CEPHFS_FEATURE_MIMIC, \ CEPHFS_FEATURE_MIMIC, \
CEPHFS_FEATURE_LAZY_CAP_WANTED, \ CEPHFS_FEATURE_LAZY_CAP_WANTED, \
CEPHFS_FEATURE_MULTI_RECONNECT, \
} }
#define CEPHFS_FEATURES_CLIENT_REQUIRED {} #define CEPHFS_FEATURES_CLIENT_REQUIRED {}
...@@ -342,6 +344,7 @@ struct ceph_mds_client { ...@@ -342,6 +344,7 @@ struct ceph_mds_client {
struct rw_semaphore snap_rwsem; struct rw_semaphore snap_rwsem;
struct rb_root snap_realms; struct rb_root snap_realms;
struct list_head snap_empty; struct list_head snap_empty;
int num_snap_realms;
spinlock_t snap_empty_lock; /* protect snap_empty */ spinlock_t snap_empty_lock; /* protect snap_empty */
u64 last_tid; /* most recent mds request */ u64 last_tid; /* most recent mds request */
......
...@@ -124,6 +124,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm( ...@@ -124,6 +124,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
INIT_LIST_HEAD(&realm->inodes_with_caps); INIT_LIST_HEAD(&realm->inodes_with_caps);
spin_lock_init(&realm->inodes_with_caps_lock); spin_lock_init(&realm->inodes_with_caps_lock);
__insert_snap_realm(&mdsc->snap_realms, realm); __insert_snap_realm(&mdsc->snap_realms, realm);
mdsc->num_snap_realms++;
dout("create_snap_realm %llx %p\n", realm->ino, realm); dout("create_snap_realm %llx %p\n", realm->ino, realm);
return realm; return realm;
} }
...@@ -175,6 +177,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc, ...@@ -175,6 +177,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
dout("__destroy_snap_realm %p %llx\n", realm, realm->ino); dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
rb_erase(&realm->node, &mdsc->snap_realms); rb_erase(&realm->node, &mdsc->snap_realms);
mdsc->num_snap_realms--;
if (realm->parent) { if (realm->parent) {
list_del_init(&realm->child_item); list_del_init(&realm->child_item);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment