Commit 3bf7878f authored by Linus Torvalds

Merge tag 'ceph-for-4.13-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "The main item here is support for v12.y.z ("Luminous") clusters:
  RESEND_ON_SPLIT, RADOS_BACKOFF, OSDMAP_PG_UPMAP and CRUSH_CHOOSE_ARGS
  feature bits, and various other changes in the RADOS client protocol.

  On top of that we have a new fsc mount option to allow supplying
  fscache uniquifier (similar to NFS) and the usual pile of filesystem
  fixes from Zheng"

* tag 'ceph-for-4.13-rc1' of git://github.com/ceph/ceph-client: (44 commits)
  libceph: advertise support for NEW_OSDOP_ENCODING and SERVER_LUMINOUS
  libceph: osd_state is 32 bits wide in luminous
  crush: remove an obsolete comment
  crush: crush_init_workspace starts with struct crush_work
  libceph, crush: per-pool crush_choose_arg_map for crush_do_rule()
  crush: implement weight and id overrides for straw2
  libceph: apply_upmap()
  libceph: compute actual pgid in ceph_pg_to_up_acting_osds()
  libceph: pg_upmap[_items] infrastructure
  libceph: ceph_decode_skip_* helpers
  libceph: kill __{insert,lookup,remove}_pg_mapping()
  libceph: introduce and switch to decode_pg_mapping()
  libceph: don't pass pgid by value
  libceph: respect RADOS_BACKOFF backoffs
  libceph: make DEFINE_RB_* helpers more general
  libceph: avoid unnecessary pi lookups in calc_target()
  libceph: use target pi for calc_target() calculations
  libceph: always populate t->target_{oid,oloc} in calc_target()
  libceph: make sure need_resend targets reflect latest map
  libceph: delete from need_resend_linger before check_linger_pool_dne()
  ...
parents 07d306c8 33e9c8db
......@@ -530,14 +530,10 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
long writeback_stat;
u64 truncate_size;
u32 truncate_seq;
int err = 0, len = PAGE_SIZE;
int err, len = PAGE_SIZE;
dout("writepage %p idx %lu\n", page, page->index);
if (!page->mapping || !page->mapping->host) {
dout("writepage %p - no mapping\n", page);
return -EFAULT;
}
inode = page->mapping->host;
ci = ceph_inode(inode);
fsc = ceph_inode_to_client(inode);
......@@ -547,7 +543,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
snapc = page_snap_context(page);
if (snapc == NULL) {
dout("writepage %p page %p not dirty?\n", inode, page);
goto out;
return 0;
}
oldest = get_oldest_context(inode, &snap_size,
&truncate_size, &truncate_seq);
......@@ -555,9 +551,10 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
dout("writepage %p page %p snapc %p not writeable - noop\n",
inode, page, snapc);
/* we should only noop if called by kswapd */
WARN_ON((current->flags & PF_MEMALLOC) == 0);
WARN_ON(!(current->flags & PF_MEMALLOC));
ceph_put_snap_context(oldest);
goto out;
redirty_page_for_writepage(wbc, page);
return 0;
}
ceph_put_snap_context(oldest);
......@@ -567,8 +564,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
/* is this a partial page at end of file? */
if (page_off >= snap_size) {
dout("%p page eof %llu\n", page, snap_size);
goto out;
return 0;
}
if (snap_size < page_off + len)
len = snap_size - page_off;
......@@ -595,7 +593,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
dout("writepage interrupted page %p\n", page);
redirty_page_for_writepage(wbc, page);
end_page_writeback(page);
goto out;
return err;
}
dout("writepage setting page/mapping error %d %p\n",
err, page);
......@@ -611,7 +609,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
end_page_writeback(page);
ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
ceph_put_snap_context(snapc); /* page's reference */
out:
return err;
}
......@@ -1318,7 +1315,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
struct page *page, void *fsdata)
{
struct inode *inode = file_inode(file);
int check_cap = 0;
bool check_cap = false;
dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
inode, page, (int)pos, (int)copied, (int)len);
......
......@@ -35,18 +35,34 @@ struct fscache_netfs ceph_cache_netfs = {
.version = 0,
};
static DEFINE_MUTEX(ceph_fscache_lock);
static LIST_HEAD(ceph_fscache_list);
/*
 * One registered fscache session cookie, kept on ceph_fscache_list.
 *
 * ceph_fscache_register_fs() matches entries on (fsid, uniquifier) and
 * ceph_fscache_unregister_fs() matches them on the cookie pointer.  The
 * uniquifier bytes are stored inline after the fixed part and are not
 * NUL-terminated, so they must always be used together with uniq_len.
 */
struct ceph_fscache_entry {
	struct list_head list;		/* link in ceph_fscache_list */
	struct fscache_cookie *fscache;	/* cookie this entry was created for */
	struct ceph_fsid fsid;		/* cluster fsid of the registering client */
	size_t uniq_len;		/* number of valid bytes in uniquifier[] */
	char uniquifier[];		/* flexible array member, uniq_len bytes */
};
/*
 * Build the index key for the per-mount fscache session cookie.
 *
 * The key is the raw cluster fsid followed by the bytes of the optional
 * fscache uniquifier from the mount options (no terminating NUL).
 *
 * Returns the number of key bytes written to @buffer, or 0 if the key
 * does not fit in @maxbuf bytes.
 */
static uint16_t ceph_fscache_session_get_key(const void *cookie_netfs_data,
					     void *buffer, uint16_t maxbuf)
{
	const struct ceph_fs_client* fsc = cookie_netfs_data;
	const char *fscache_uniq = fsc->mount_options->fscache_uniq;
	/*
	 * Keep the lengths in size_t until after the bounds check so an
	 * over-long uniquifier is rejected instead of being silently
	 * truncated to 16 bits.
	 */
	size_t fsid_len = sizeof(fsc->client->fsid);
	size_t uniq_len = fscache_uniq ? strlen(fscache_uniq) : 0;

	if (fsid_len + uniq_len > maxbuf)
		return 0;

	memcpy(buffer, &fsc->client->fsid, fsid_len);
	if (uniq_len)
		memcpy((char *)buffer + fsid_len, fscache_uniq, uniq_len);

	/* bounded by maxbuf above, so the narrowing cannot lose bits */
	return (uint16_t)(fsid_len + uniq_len);
}
static const struct fscache_cookie_def ceph_fscache_fsid_object_def = {
......@@ -67,13 +83,54 @@ void ceph_fscache_unregister(void)
/*
 * Register the per-mount fscache session cookie for @fsc.
 *
 * Only one cookie may exist per (cluster fsid, fscache uniquifier) pair;
 * the pairs currently in use are tracked on ceph_fscache_list, protected
 * by ceph_fscache_lock.
 *
 * Returns 0 on success, and also when fscache_acquire_cookie() yields no
 * cookie (fsc->fscache is then NULL and an error is logged).  Returns
 * -EBUSY if the same (fsid, uniquifier) pair is already registered and
 * -ENOMEM if the tracking entry cannot be allocated.
 */
int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
{
	const struct ceph_fsid *fsid = &fsc->client->fsid;
	const char *fscache_uniq = fsc->mount_options->fscache_uniq;
	size_t uniq_len = fscache_uniq ? strlen(fscache_uniq) : 0;
	struct ceph_fscache_entry *ent;
	int err = 0;

	mutex_lock(&ceph_fscache_lock);

	/* refuse a second registration of the same (fsid, uniquifier) */
	list_for_each_entry(ent, &ceph_fscache_list, list) {
		if (memcmp(&ent->fsid, fsid, sizeof(*fsid)))
			continue;
		if (ent->uniq_len != uniq_len)
			continue;
		if (uniq_len && memcmp(ent->uniquifier, fscache_uniq, uniq_len))
			continue;

		pr_err("fscache cookie already registered for fsid %pU\n", fsid);
		pr_err(" use fsc=%%s mount option to specify a uniquifier\n");
		err = -EBUSY;
		goto out_unlock;
	}

	/* zeroed, so uniq_len stays 0 when no uniquifier was given */
	ent = kzalloc(sizeof(*ent) + uniq_len, GFP_KERNEL);
	if (!ent) {
		err = -ENOMEM;
		goto out_unlock;
	}

	fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index,
					      &ceph_fscache_fsid_object_def,
					      fsc, true);

	if (fsc->fscache) {
		memcpy(&ent->fsid, fsid, sizeof(*fsid));
		if (uniq_len > 0) {
			memcpy(ent->uniquifier, fscache_uniq, uniq_len);
			ent->uniq_len = uniq_len;
		}
		ent->fscache = fsc->fscache;
		list_add_tail(&ent->list, &ceph_fscache_list);
	} else {
		kfree(ent);
		pr_err("unable to register fscache cookie for fsid %pU\n",
		       fsid);
		/* all other fs ignore this error */
	}
out_unlock:
	mutex_unlock(&ceph_fscache_lock);
	return err;
}
static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data,
......@@ -349,7 +406,24 @@ void ceph_invalidate_fscache_page(struct inode* inode, struct page *page)
/*
 * Undo ceph_fscache_register_fs() for @fsc.
 *
 * If fscache_cookie_valid() accepts fsc->fscache, the matching tracking
 * entry is removed from ceph_fscache_list and freed, and the cookie is
 * relinquished exactly once.  fsc->fscache is cleared in every case.
 */
void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
{
	if (fscache_cookie_valid(fsc->fscache)) {
		struct ceph_fscache_entry *ent;
		bool found = false;

		mutex_lock(&ceph_fscache_lock);
		list_for_each_entry(ent, &ceph_fscache_list, list) {
			if (ent->fscache == fsc->fscache) {
				list_del(&ent->list);
				kfree(ent);
				found = true;
				break;
			}
		}
		/* a valid cookie without a tracking entry is unexpected */
		WARN_ON_ONCE(!found);
		mutex_unlock(&ceph_fscache_lock);

		__fscache_relinquish_cookie(fsc->fscache, 0);
	}
	fsc->fscache = NULL;
}
......
......@@ -1653,6 +1653,21 @@ static int try_nonblocking_invalidate(struct inode *inode)
return -1;
}
/*
 * Decide whether the inode's current size should be reported to the MDS.
 * Callers use the result to detect that i_size is approaching max_size.
 */
bool __ceph_should_report_size(struct ceph_inode_info *ci)
{
	loff_t cur_size = ci->vfs_inode.i_size;

	/* mds will adjust max size according to the reported size */
	if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
		return false;

	/* already at or beyond the current max_size */
	if (cur_size >= ci->i_max_size)
		return true;

	/* half of previous max_size increment has been used */
	return ci->i_max_size > ci->i_reported_size &&
	       (cur_size << 1) >= ci->i_max_size + ci->i_reported_size;
}
/*
* Swiss army knife function to examine currently used and wanted
* versus held caps. Release, flush, ack revoked caps to mds as
......@@ -1806,8 +1821,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
}
/* approaching file_max? */
if ((inode->i_size << 1) >= ci->i_max_size &&
(ci->i_reported_size << 1) < ci->i_max_size) {
if (__ceph_should_report_size(ci)) {
dout("i_size approaching max_size\n");
goto ack;
}
......@@ -3027,8 +3041,10 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
le32_to_cpu(grant->truncate_seq),
le64_to_cpu(grant->truncate_size),
size);
/* max size increase? */
if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
}
if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) {
if (max_size != ci->i_max_size) {
dout("max_size %lld -> %llu\n",
ci->i_max_size, max_size);
ci->i_max_size = max_size;
......@@ -3037,6 +3053,10 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
ci->i_requested_max_size = 0;
}
wake = true;
} else if (ci->i_wanted_max_size > ci->i_max_size &&
ci->i_wanted_max_size > ci->i_requested_max_size) {
/* CEPH_CAP_OP_IMPORT */
wake = true;
}
}
......@@ -3554,7 +3574,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
}
/* make sure we re-request max_size, if necessary */
ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0;
*old_issued = issued;
......@@ -3790,6 +3809,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
*/
void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{
struct inode *inode;
struct ceph_inode_info *ci;
int flags = CHECK_CAPS_NODELAY;
......@@ -3805,9 +3825,15 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
time_before(jiffies, ci->i_hold_caps_max))
break;
list_del_init(&ci->i_cap_delay_list);
inode = igrab(&ci->vfs_inode);
spin_unlock(&mdsc->cap_delay_lock);
dout("check_delayed_caps on %p\n", &ci->vfs_inode);
ceph_check_caps(ci, flags, NULL);
if (inode) {
dout("check_delayed_caps on %p\n", inode);
ceph_check_caps(ci, flags, NULL);
iput(inode);
}
}
spin_unlock(&mdsc->cap_delay_lock);
}
......
......@@ -1040,8 +1040,8 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
int num_pages;
int written = 0;
int flags;
int check_caps = 0;
int ret;
bool check_caps = false;
struct timespec mtime = current_time(inode);
size_t count = iov_iter_count(from);
......
......@@ -1016,6 +1016,7 @@ static void update_dentry_lease(struct dentry *dentry,
long unsigned ttl = from_time + (duration * HZ) / 1000;
long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
struct inode *dir;
struct ceph_mds_session *old_lease_session = NULL;
/*
* Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that
......@@ -1051,8 +1052,10 @@ static void update_dentry_lease(struct dentry *dentry,
time_before(ttl, di->time))
goto out_unlock; /* we already have a newer lease. */
if (di->lease_session && di->lease_session != session)
goto out_unlock;
if (di->lease_session && di->lease_session != session) {
old_lease_session = di->lease_session;
di->lease_session = NULL;
}
ceph_dentry_lru_touch(dentry);
......@@ -1065,6 +1068,8 @@ static void update_dentry_lease(struct dentry *dentry,
di->time = ttl;
out_unlock:
spin_unlock(&dentry->d_lock);
if (old_lease_session)
ceph_put_mds_session(old_lease_session);
return;
}
......@@ -1653,20 +1658,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
return err;
}
int ceph_inode_set_size(struct inode *inode, loff_t size)
bool ceph_inode_set_size(struct inode *inode, loff_t size)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int ret = 0;
bool ret;
spin_lock(&ci->i_ceph_lock);
dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
i_size_write(inode, size);
inode->i_blocks = calc_inode_blocks(size);
/* tell the MDS if we are approaching max_size */
if ((size << 1) >= ci->i_max_size &&
(ci->i_reported_size << 1) < ci->i_max_size)
ret = 1;
ret = __ceph_should_report_size(ci);
spin_unlock(&ci->i_ceph_lock);
return ret;
......
......@@ -127,6 +127,29 @@ static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
dout("ceph_lock_wait_for_completion: request %llu was interrupted\n",
req->r_tid);
mutex_lock(&mdsc->mutex);
if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
err = 0;
} else {
/*
* ensure we aren't running concurrently with
* ceph_fill_trace or ceph_readdir_prepopulate, which
* rely on locks (dir mutex) held by our caller.
*/
mutex_lock(&req->r_fill_mutex);
req->r_err = err;
set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
mutex_unlock(&req->r_fill_mutex);
if (!req->r_session) {
// haven't sent the request
err = 0;
}
}
mutex_unlock(&mdsc->mutex);
if (!err)
return 0;
intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK,
USE_AUTH_MDS);
if (IS_ERR(intr_req))
......@@ -146,7 +169,7 @@ static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
if (err && err != -ERESTARTSYS)
return err;
wait_for_completion(&req->r_completion);
wait_for_completion_killable(&req->r_safe_completion);
return 0;
}
......
......@@ -3769,13 +3769,13 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
struct ceph_mds_client *mdsc = fsc->mdsc;
dout("mdsc_destroy %p\n", mdsc);
ceph_mdsc_stop(mdsc);
/* flush out any connection work with references to us */
ceph_msgr_flush();
ceph_mdsc_stop(mdsc);
fsc->mdsc = NULL;
kfree(mdsc);
dout("mdsc_destroy %p done\n", mdsc);
......
......@@ -121,6 +121,7 @@ enum {
/* int args above */
Opt_snapdirname,
Opt_mds_namespace,
Opt_fscache_uniq,
Opt_last_string,
/* string args above */
Opt_dirstat,
......@@ -158,6 +159,7 @@ static match_table_t fsopt_tokens = {
/* int args above */
{Opt_snapdirname, "snapdirname=%s"},
{Opt_mds_namespace, "mds_namespace=%s"},
{Opt_fscache_uniq, "fsc=%s"},
/* string args above */
{Opt_dirstat, "dirstat"},
{Opt_nodirstat, "nodirstat"},
......@@ -223,6 +225,14 @@ static int parse_fsopt_token(char *c, void *private)
if (!fsopt->mds_namespace)
return -ENOMEM;
break;
case Opt_fscache_uniq:
fsopt->fscache_uniq = kstrndup(argstr[0].from,
argstr[0].to-argstr[0].from,
GFP_KERNEL);
if (!fsopt->fscache_uniq)
return -ENOMEM;
fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE;
break;
/* misc */
case Opt_wsize:
fsopt->wsize = intval;
......@@ -317,6 +327,7 @@ static void destroy_mount_options(struct ceph_mount_options *args)
kfree(args->snapdir_name);
kfree(args->mds_namespace);
kfree(args->server_path);
kfree(args->fscache_uniq);
kfree(args);
}
......@@ -350,8 +361,10 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
if (ret)
return ret;
ret = strcmp_null(fsopt1->server_path, fsopt2->server_path);
if (ret)
return ret;
ret = strcmp_null(fsopt1->fscache_uniq, fsopt2->fscache_uniq);
if (ret)
return ret;
......@@ -475,8 +488,12 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",noasyncreaddir");
if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
seq_puts(m, ",nodcache");
if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE)
seq_puts(m, ",fsc");
if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) {
if (fsopt->fscache_uniq)
seq_printf(m, ",fsc=%s", fsopt->fscache_uniq);
else
seq_puts(m, ",fsc");
}
if (fsopt->flags & CEPH_MOUNT_OPT_NOPOOLPERM)
seq_puts(m, ",nopoolperm");
......@@ -597,18 +614,11 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
if (!fsc->wb_pagevec_pool)
goto fail_trunc_wq;
/* setup fscache */
if ((fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) &&
(ceph_fscache_register_fs(fsc) != 0))
goto fail_fscache;
/* caps */
fsc->min_caps = fsopt->max_readdir;
return fsc;
fail_fscache:
ceph_fscache_unregister_fs(fsc);
fail_trunc_wq:
destroy_workqueue(fsc->trunc_wq);
fail_pg_inv_wq:
......@@ -626,8 +636,6 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
{
dout("destroy_fs_client %p\n", fsc);
ceph_fscache_unregister_fs(fsc);
destroy_workqueue(fsc->wb_wq);
destroy_workqueue(fsc->pg_inv_wq);
destroy_workqueue(fsc->trunc_wq);
......@@ -636,8 +644,6 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
destroy_mount_options(fsc->mount_options);
ceph_fs_debugfs_cleanup(fsc);
ceph_destroy_client(fsc->client);
kfree(fsc);
......@@ -822,6 +828,13 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
if (err < 0)
goto out;
/* setup fscache */
if (fsc->mount_options->flags & CEPH_MOUNT_OPT_FSCACHE) {
err = ceph_fscache_register_fs(fsc);
if (err < 0)
goto out;
}
if (!fsc->mount_options->server_path) {
path = "";
dout("mount opening path \\t\n");
......@@ -1040,6 +1053,12 @@ static void ceph_kill_sb(struct super_block *s)
ceph_mdsc_pre_umount(fsc->mdsc);
generic_shutdown_super(s);
fsc->client->extra_mon_dispatch = NULL;
ceph_fs_debugfs_cleanup(fsc);
ceph_fscache_unregister_fs(fsc);
ceph_mdsc_destroy(fsc);
destroy_fs_client(fsc);
......
......@@ -73,6 +73,7 @@ struct ceph_mount_options {
char *snapdir_name; /* default ".snap" */
char *mds_namespace; /* default NULL */
char *server_path; /* default "/" */
char *fscache_uniq; /* default NULL */
};
struct ceph_fs_client {
......@@ -793,7 +794,7 @@ extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,
extern int ceph_inode_holds_cap(struct inode *inode, int mask);
extern int ceph_inode_set_size(struct inode *inode, loff_t size);
extern bool ceph_inode_set_size(struct inode *inode, loff_t size);
extern void __ceph_do_pending_vmtruncate(struct inode *inode);
extern void ceph_queue_vmtruncate(struct inode *inode);
......@@ -918,6 +919,7 @@ extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
struct ceph_snap_context *snapc);
extern void ceph_flush_snaps(struct ceph_inode_info *ci,
struct ceph_mds_session **psession);
extern bool __ceph_should_report_size(struct ceph_inode_info *ci);
extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_session *session);
extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
......
......@@ -756,6 +756,9 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
/* let's see if a virtual xattr was requested */
vxattr = ceph_match_vxattr(inode, name);
if (vxattr) {
err = ceph_do_getattr(inode, 0, true);
if (err)
return err;
err = -ENODATA;
if (!(vxattr->exists_cb && !vxattr->exists_cb(ci)))
err = vxattr->getxattr_cb(ci, value, size);
......
This diff is collapsed.
......@@ -147,6 +147,7 @@ struct ceph_dir_layout {
#define CEPH_MSG_OSD_OP 42
#define CEPH_MSG_OSD_OPREPLY 43
#define CEPH_MSG_WATCH_NOTIFY 44
#define CEPH_MSG_OSD_BACKOFF 61
/* watch-notify operations */
......
......@@ -132,6 +132,66 @@ static inline char *ceph_extract_encoded_string(void **p, void *end,
return ERR_PTR(-ERANGE);
}
/*
 * skip helpers
 *
 * Each helper advances the decode cursor *p past one encoded item without
 * interpreting it.  Bounds checking is delegated to ceph_decode_need() and
 * ceph_decode_32_safe() (defined elsewhere), which are handed the @bad
 * label -- presumably their jump target on short input; confirm there.
 *
 * @p is a pointer to the cursor, @end is the end of the buffer.  Arguments
 * may be evaluated more than once, so they must be free of side effects.
 * The compound helpers declare a block-local "len", so callers should not
 * pass an expression that itself refers to a variable named len.
 */
#define ceph_decode_skip_n(p, end, n, bad)			\
	do {							\
		ceph_decode_need(p, end, n, bad);		\
		*(p) += (n);					\
	} while (0)

/* fixed-width items */
#define ceph_decode_skip_64(p, end, bad)			\
	ceph_decode_skip_n(p, end, sizeof(u64), bad)

#define ceph_decode_skip_32(p, end, bad)			\
	ceph_decode_skip_n(p, end, sizeof(u32), bad)

#define ceph_decode_skip_16(p, end, bad)			\
	ceph_decode_skip_n(p, end, sizeof(u16), bad)

#define ceph_decode_skip_8(p, end, bad)				\
	ceph_decode_skip_n(p, end, sizeof(u8), bad)

/* 32-bit length followed by that many bytes */
#define ceph_decode_skip_string(p, end, bad)			\
	do {							\
		u32 len;					\
								\
		ceph_decode_32_safe(p, end, len, bad);		\
		ceph_decode_skip_n(p, end, len, bad);		\
	} while (0)

/*
 * 32-bit element count followed by that many items; @type selects the
 * ceph_decode_skip_<type>() helper used for each element.
 */
#define ceph_decode_skip_set(p, end, type, bad)			\
	do {							\
		u32 len;					\
								\
		ceph_decode_32_safe(p, end, len, bad);		\
		while (len--)					\
			ceph_decode_skip_##type(p, end, bad);	\
	} while (0)

/* 32-bit pair count followed by that many (key, value) pairs */
#define ceph_decode_skip_map(p, end, ktype, vtype, bad)		\
	do {							\
		u32 len;					\
								\
		ceph_decode_32_safe(p, end, len, bad);		\
		while (len--) {					\
			ceph_decode_skip_##ktype(p, end, bad);	\
			ceph_decode_skip_##vtype(p, end, bad);	\
		}						\
	} while (0)

/* map whose values are themselves maps of (ktype2, vtype2) */
#define ceph_decode_skip_map_of_map(p, end, ktype1, ktype2, vtype2, bad) \
	do {							\
		u32 len;					\
								\
		ceph_decode_32_safe(p, end, len, bad);		\
		while (len--) {					\
			ceph_decode_skip_##ktype1(p, end, bad);	\
			ceph_decode_skip_map(p, end, ktype2, vtype2, bad); \
		}						\
	} while (0)
/*
* struct ceph_timespec <-> struct timespec
*/
......
......@@ -184,10 +184,11 @@ static inline int calc_pages_for(u64 off, u64 len)
(off >> PAGE_SHIFT);
}
/*
* These are not meant to be generic - an integer key is assumed.
*/
#define DEFINE_RB_INSDEL_FUNCS(name, type, keyfld, nodefld) \
#define RB_BYVAL(a) (a)
#define RB_BYPTR(a) (&(a))
#define RB_CMP3WAY(a, b) ((a) < (b) ? -1 : (a) > (b))
#define DEFINE_RB_INSDEL_FUNCS2(name, type, keyfld, cmpexp, keyexp, nodefld) \
static void insert_##name(struct rb_root *root, type *t) \
{ \
struct rb_node **n = &root->rb_node; \
......@@ -197,11 +198,13 @@ static void insert_##name(struct rb_root *root, type *t) \
\
while (*n) { \
type *cur = rb_entry(*n, type, nodefld); \
int cmp; \
\
parent = *n; \
if (t->keyfld < cur->keyfld) \
cmp = cmpexp(keyexp(t->keyfld), keyexp(cur->keyfld)); \
if (cmp < 0) \
n = &(*n)->rb_left; \
else if (t->keyfld > cur->keyfld) \
else if (cmp > 0) \
n = &(*n)->rb_right; \
else \
BUG(); \
......@@ -217,19 +220,24 @@ static void erase_##name(struct rb_root *root, type *t) \
RB_CLEAR_NODE(&t->nodefld); \
}
#define DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld) \
extern type __lookup_##name##_key; \
static type *lookup_##name(struct rb_root *root, \
typeof(__lookup_##name##_key.keyfld) key) \
/*
* @lookup_param_type is a parameter and not constructed from (@type,
* @keyfld) with typeof() because adding const is too unwieldy.
*/
#define DEFINE_RB_LOOKUP_FUNC2(name, type, keyfld, cmpexp, keyexp, \
lookup_param_type, nodefld) \
static type *lookup_##name(struct rb_root *root, lookup_param_type key) \
{ \
struct rb_node *n = root->rb_node; \
\
while (n) { \
type *cur = rb_entry(n, type, nodefld); \
int cmp; \
\
if (key < cur->keyfld) \
cmp = cmpexp(key, keyexp(cur->keyfld)); \
if (cmp < 0) \
n = n->rb_left; \
else if (key > cur->keyfld) \
else if (cmp > 0) \
n = n->rb_right; \
else \
return cur; \
......@@ -238,6 +246,23 @@ static type *lookup_##name(struct rb_root *root, \
return NULL; \
}
/*
 * Emit the full insert_<name>()/erase_<name>()/lookup_<name>() set for one
 * rbtree by expanding both *2 generators with the same arguments.
 */
#define DEFINE_RB_FUNCS2(name, type, keyfld, cmpexp, keyexp, \
lookup_param_type, nodefld) \
DEFINE_RB_INSDEL_FUNCS2(name, type, keyfld, cmpexp, keyexp, nodefld) \
DEFINE_RB_LOOKUP_FUNC2(name, type, keyfld, cmpexp, keyexp, \
lookup_param_type, nodefld)
/*
 * Shorthands for integer keys: the key field is handed over by value
 * (RB_BYVAL) and ordered with RB_CMP3WAY.
 */
#define DEFINE_RB_INSDEL_FUNCS(name, type, keyfld, nodefld) \
DEFINE_RB_INSDEL_FUNCS2(name, type, keyfld, RB_CMP3WAY, RB_BYVAL, nodefld)
/*
 * The extern declaration is referenced through typeof() to spell the type
 * of the key field for the lookup function's key parameter.
 */
#define DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld) \
extern type __lookup_##name##_key; \
DEFINE_RB_LOOKUP_FUNC2(name, type, keyfld, RB_CMP3WAY, RB_BYVAL, \
typeof(__lookup_##name##_key.keyfld), nodefld)
/* Integer-key counterpart of DEFINE_RB_FUNCS2(). */
#define DEFINE_RB_FUNCS(name, type, keyfld, nodefld) \
DEFINE_RB_INSDEL_FUNCS(name, type, keyfld, nodefld) \
DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld)
......
......@@ -44,6 +44,8 @@ struct ceph_connection_operations {
struct ceph_msg_header *hdr,
int *skip);
void (*reencode_message) (struct ceph_msg *msg);
int (*sign_message) (struct ceph_msg *msg);
int (*check_message_signature) (struct ceph_msg *msg);
};
......
#ifndef _FS_CEPH_OSD_CLIENT_H
#define _FS_CEPH_OSD_CLIENT_H
#include <linux/bitrev.h>
#include <linux/completion.h>
#include <linux/kref.h>
#include <linux/mempool.h>
......@@ -36,6 +37,8 @@ struct ceph_osd {
struct ceph_connection o_con;
struct rb_root o_requests;
struct rb_root o_linger_requests;
struct rb_root o_backoff_mappings;
struct rb_root o_backoffs_by_id;
struct list_head o_osd_lru;
struct ceph_auth_handshake o_auth;
unsigned long lru_ttl;
......@@ -136,7 +139,8 @@ struct ceph_osd_request_target {
struct ceph_object_id target_oid;
struct ceph_object_locator target_oloc;
struct ceph_pg pgid;
struct ceph_pg pgid; /* last raw pg we mapped to */
struct ceph_spg spgid; /* last actual spg we mapped to */
u32 pg_num;
u32 pg_num_mask;
struct ceph_osds acting;
......@@ -148,6 +152,9 @@ struct ceph_osd_request_target {
unsigned int flags; /* CEPH_OSD_FLAG_* */
bool paused;
u32 epoch;
u32 last_force_resend;
int osd;
};
......@@ -193,7 +200,6 @@ struct ceph_osd_request {
unsigned long r_stamp; /* jiffies, send or check time */
unsigned long r_start_stamp; /* jiffies */
int r_attempts;
u32 r_last_force_resend;
u32 r_map_dne_bound;
struct ceph_osd_req_op r_ops[];
......@@ -203,6 +209,23 @@ struct ceph_request_redirect {
struct ceph_object_locator oloc;
};
/*
 * osd request identifier
 *
 * caller name + incarnation# + tid uniquely identify a request.
 * The integer fields are little-endian and the struct is packed.
 */
struct ceph_osd_reqid {
struct ceph_entity_name name;
__le64 tid;
__le32 inc;
} __packed;
/*
 * Trace/span identifiers: three little-endian 64-bit ids, packed.
 * NOTE(review): presumably mirrors an on-wire blkin trace encoding --
 * confirm against the encoder/decoder that uses it.
 */
struct ceph_blkin_trace_info {
__le64 trace_id;
__le64 span_id;
__le64 parent_span_id;
} __packed;
typedef void (*rados_watchcb2_t)(void *arg, u64 notify_id, u64 cookie,
u64 notifier_id, void *data, size_t data_len);
typedef void (*rados_watcherrcb_t)(void *arg, u64 cookie, int err);
......@@ -221,7 +244,6 @@ struct ceph_osd_linger_request {
struct list_head pending_lworks;
struct ceph_osd_request_target t;
u32 last_force_resend;
u32 map_dne_bound;
struct timespec mtime;
......@@ -256,6 +278,48 @@ struct ceph_watch_item {
struct ceph_entity_addr addr;
};
/*
 * Per-spg container of backoffs.
 * NOTE(review): presumably linked into ceph_osd::o_backoff_mappings via
 * @node and keyed by @spgid -- confirm in the osd client code.
 */
struct ceph_spg_mapping {
struct rb_node node;
struct ceph_spg spgid;
struct rb_root backoffs; /* presumably ceph_osd_backoff::spg_node entries -- confirm */
};
/*
 * Object identifier with explicit-length key, oid and namespace buffers.
 * NOTE(review): ownership of the key/oid/nspace buffers is not visible
 * here -- confirm who allocates and frees them.
 */
struct ceph_hobject_id {
void *key;
size_t key_len; /* length of @key in bytes */
void *oid;
size_t oid_len; /* length of @oid in bytes */
u64 snapid;
u32 hash;
u8 is_max;
void *nspace;
size_t nspace_len; /* length of @nspace in bytes */
s64 pool;
/* cache */
u32 hash_reverse_bits; /* bitrev32(hash), set by ceph_hoid_build_hash_cache() */
};
static inline void ceph_hoid_build_hash_cache(struct ceph_hobject_id *hoid)
{
hoid->hash_reverse_bits = bitrev32(hoid->hash);
}
/*
 * A backoff covering a range of objects within one spg:
 *
 * PG-wide backoff: [begin, end)
 * per-object backoff: begin == end
 *
 * NOTE(review): the two rb_nodes presumably link the backoff into
 * ceph_spg_mapping::backoffs (spg_node) and ceph_osd::o_backoffs_by_id
 * (id_node, keyed by @id) -- confirm in the osd client code.
 */
struct ceph_osd_backoff {
struct rb_node spg_node;
struct rb_node id_node;
struct ceph_spg spgid;
u64 id;
struct ceph_hobject_id *begin;
struct ceph_hobject_id *end;
};
#define CEPH_LINGER_ID_START 0xffff000000000000ULL
struct ceph_osd_client {
......
......@@ -24,7 +24,15 @@ struct ceph_pg {
uint32_t seed;
};
/* Value of ceph_spg::shard meaning "no shard" (assumed from the name -- confirm). */
#define CEPH_SPG_NOSHARD -1
/* A placement group id qualified by a shard number. */
struct ceph_spg {
struct ceph_pg pgid;
s8 shard; /* signed, so it can hold CEPH_SPG_NOSHARD */
};
/*
 * NOTE(review): the return convention is not visible here; presumably
 * three-way (<0, 0, >0) comparisons of @lhs against @rhs -- confirm at
 * the definitions.
 */
int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs);
int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs);
#define CEPH_POOL_FLAG_HASHPSPOOL (1ULL << 0) /* hash pg seed and pool id
together */
......@@ -135,10 +143,14 @@ struct ceph_pg_mapping {
struct {
int len;
int osds[];
} pg_temp;
} pg_temp, pg_upmap;
struct {
int osd;
} primary_temp;
struct {
int len;
int from_to[][2];
} pg_upmap_items;
};
};
......@@ -150,13 +162,17 @@ struct ceph_osdmap {
u32 flags; /* CEPH_OSDMAP_* */
u32 max_osd; /* size of osd_state, _offload, _addr arrays */
u8 *osd_state; /* CEPH_OSD_* */
u32 *osd_state; /* CEPH_OSD_* */
u32 *osd_weight; /* 0 = failed, 0x10000 = 100% normal */
struct ceph_entity_addr *osd_addr;
struct rb_root pg_temp;
struct rb_root primary_temp;
/* remap (post-CRUSH, pre-up) */
struct rb_root pg_upmap; /* PG := raw set */
struct rb_root pg_upmap_items; /* from -> to within raw set */
u32 *osd_primary_affinity;
struct rb_root pg_pools;
......@@ -187,7 +203,7 @@ static inline bool ceph_osd_is_down(struct ceph_osdmap *map, int osd)
return !ceph_osd_is_up(map, osd);
}
extern char *ceph_osdmap_state_str(char *str, int len, int state);
char *ceph_osdmap_state_str(char *str, int len, u32 state);
extern u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd);
static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
......@@ -198,11 +214,13 @@ static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
return &map->osd_addr[osd];
}
#define CEPH_PGID_ENCODING_LEN (1 + 8 + 4 + 4)
static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid)
{
__u8 version;
if (!ceph_has_room(p, end, 1 + 8 + 4 + 4)) {
if (!ceph_has_room(p, end, CEPH_PGID_ENCODING_LEN)) {
pr_warn("incomplete pg encoding\n");
return -EINVAL;
}
......@@ -240,6 +258,8 @@ static inline void ceph_osds_init(struct ceph_osds *set)
void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src);
bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
u32 new_pg_num);
bool ceph_is_new_interval(const struct ceph_osds *old_acting,
const struct ceph_osds *new_acting,
const struct ceph_osds *old_up,
......@@ -262,15 +282,24 @@ extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u64 off, u64 len,
u64 *bno, u64 *oxoff, u64 *oxlen);
int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
const struct ceph_object_id *oid,
const struct ceph_object_locator *oloc,
struct ceph_pg *raw_pgid);
int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
const struct ceph_object_id *oid,
const struct ceph_object_locator *oloc,
struct ceph_pg *raw_pgid);
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
struct ceph_pg_pool_info *pi,
const struct ceph_pg *raw_pgid,
struct ceph_osds *up,
struct ceph_osds *acting);
bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
struct ceph_pg_pool_info *pi,
const struct ceph_pg *raw_pgid,
struct ceph_spg *spgid);
int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
const struct ceph_pg *raw_pgid);
......
......@@ -439,6 +439,12 @@ enum {
const char *ceph_osd_watch_op_name(int o);
/*
 * Backoff operation codes.
 * NOTE(review): presumably the op field carried in CEPH_MSG_OSD_BACKOFF
 * messages; the values look like wire constants, so do not renumber --
 * confirm against the OSD protocol.
 */
enum {
CEPH_OSD_BACKOFF_OP_BLOCK = 1,
CEPH_OSD_BACKOFF_OP_ACK_BLOCK = 2,
CEPH_OSD_BACKOFF_OP_UNBLOCK = 3,
};
/*
* an individual object operation. each may be accompanied by some data
* payload
......
......@@ -2,6 +2,7 @@
#define CEPH_CRUSH_CRUSH_H
#ifdef __KERNEL__
# include <linux/rbtree.h>
# include <linux/types.h>
#else
# include "crush_compat.h"
......@@ -137,6 +138,68 @@ struct crush_bucket {
};
/** @ingroup API
 *
 * Replacement weights for each item in a bucket. The size of the
 * array must be exactly the size of the straw2 bucket, just as the
 * item_weights array: bucket_straw2_choose() indexes __weights__ with
 * the same item index it uses for the bucket's items.
 *
 */
struct crush_weight_set {
__u32 *weights; /*!< 16.16 fixed point weights
in the same order as items */
__u32 size; /*!< size of the __weights__ array */
};
/** @ingroup API
 *
 * Replacement weights and ids for a given straw2 bucket, for
 * placement purposes.
 *
 * When crush_do_rule() chooses the Nth item from a straw2 bucket, the
 * replacement weights found at __weight_set[N]__ are used instead of
 * the weights from __item_weights__. If __N__ is greater than or equal
 * to __weight_set_size__, the weights found at __weight_set_size-1__
 * are used instead. For instance if __weight_set__ is:
 *
 * [ [ 0x10000, 0x20000 ], // position 0
 * [ 0x20000, 0x40000 ] ] // position 1
 *
 * choosing the 0th item will use position 0 weights [ 0x10000, 0x20000 ]
 * choosing the 1st item will use position 1 weights [ 0x20000, 0x40000 ]
 * choosing the 2nd item will use position 1 weights [ 0x20000, 0x40000 ]
 * etc.
 *
 * A NULL __ids__ or a NULL/empty __weight_set__ means "no override":
 * the bucket's own items or item_weights are used for that part.
 *
 */
struct crush_choose_arg {
__s32 *ids; /*!< values to use instead of items */
__u32 ids_size; /*!< size of the __ids__ array */
struct crush_weight_set *weight_set; /*!< weight replacements for
a given position */
__u32 weight_set_size; /*!< size of the __weight_set__ array */
};
/** @ingroup API
 *
 * Replacement weights and ids for each bucket in the crushmap. The
 * __size__ of the __args__ array must be exactly the same as the
 * __map->max_buckets__.
 *
 * The __crush_choose_arg__ at index N will be used when choosing
 * an item from the bucket __map->buckets[N]__ bucket, provided it
 * is a straw2 bucket.
 *
 * In the kernel build the map additionally carries an rbtree node and
 * a 64-bit index (presumably the key under which it is stored in
 * crush_map::choose_args -- confirm in the osdmap code).
 *
 */
struct crush_choose_arg_map {
#ifdef __KERNEL__
struct rb_node node;
u64 choose_args_index;
#endif
struct crush_choose_arg *args; /*!< replacement for each bucket
in the crushmap */
__u32 size; /*!< size of the __args__ array */
};
struct crush_bucket_uniform {
struct crush_bucket h;
__u32 item_weight; /* 16-bit fixed point; all items equally weighted */
......@@ -236,6 +299,9 @@ struct crush_map {
__u32 allowed_bucket_algs;
__u32 *choose_tries;
#else
/* CrushWrapper::choose_args */
struct rb_root choose_args;
#endif
};
......
......@@ -11,11 +11,10 @@
#include "crush.h"
extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size);
extern int crush_do_rule(const struct crush_map *map,
int ruleno,
int x, int *result, int result_max,
const __u32 *weights, int weight_max,
void *cwin);
int crush_do_rule(const struct crush_map *map,
int ruleno, int x, int *result, int result_max,
const __u32 *weight, int weight_max,
void *cwin, const struct crush_choose_arg *choose_args);
/*
* Returns the exact amount of workspace that will need to be used
......
......@@ -85,6 +85,7 @@ const char *ceph_msg_type_name(int type)
case CEPH_MSG_OSD_OP: return "osd_op";
case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
case CEPH_MSG_OSD_BACKOFF: return "osd_backoff";
default: return "unknown";
}
}
......
#ifdef __KERNEL__
# include <linux/slab.h>
# include <linux/crush/crush.h>
void clear_choose_args(struct crush_map *c);
#else
# include "crush_compat.h"
# include "crush.h"
......@@ -127,6 +128,8 @@ void crush_destroy(struct crush_map *map)
#ifndef __KERNEL__
kfree(map->choose_tries);
#else
clear_choose_args(map);
#endif
kfree(map);
}
......
......@@ -302,19 +302,42 @@ static __u64 crush_ln(unsigned int xin)
*
*/
/*
 * Select the weight vector to use for @bucket at the given @position.
 *
 * When @arg supplies a non-empty weight_set, the entry for @position is
 * returned, with positions at or beyond weight_set_size falling back to
 * the last entry.  Otherwise the bucket's own item_weights are returned.
 */
static __u32 *get_choose_arg_weights(const struct crush_bucket_straw2 *bucket,
				     const struct crush_choose_arg *arg,
				     int position)
{
	int idx = position;

	if (arg && arg->weight_set && arg->weight_set_size != 0) {
		/* clamp to the last available weight set */
		if (idx >= arg->weight_set_size)
			idx = arg->weight_set_size - 1;
		return arg->weight_set[idx].weights;
	}

	return bucket->item_weights;
}
/*
 * Select the id array to use for @bucket: the replacement ids carried
 * by @arg when it has any, otherwise the bucket's own items.
 */
static __s32 *get_choose_arg_ids(const struct crush_bucket_straw2 *bucket,
				 const struct crush_choose_arg *arg)
{
	if (arg && arg->ids)
		return arg->ids;

	return bucket->h.items;
}
static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
int x, int r)
int x, int r,
const struct crush_choose_arg *arg,
int position)
{
unsigned int i, high = 0;
unsigned int u;
unsigned int w;
__s64 ln, draw, high_draw = 0;
__u32 *weights = get_choose_arg_weights(bucket, arg, position);
__s32 *ids = get_choose_arg_ids(bucket, arg);
for (i = 0; i < bucket->h.size; i++) {
w = bucket->item_weights[i];
if (w) {
u = crush_hash32_3(bucket->h.hash, x,
bucket->h.items[i], r);
dprintk("weight 0x%x item %d\n", weights[i], ids[i]);
if (weights[i]) {
u = crush_hash32_3(bucket->h.hash, x, ids[i], r);
u &= 0xffff;
/*
......@@ -335,7 +358,7 @@ static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
* weight means a larger (less negative) value
* for draw.
*/
draw = div64_s64(ln, w);
draw = div64_s64(ln, weights[i]);
} else {
draw = S64_MIN;
}
......@@ -352,7 +375,9 @@ static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
static int crush_bucket_choose(const struct crush_bucket *in,
struct crush_work_bucket *work,
int x, int r)
int x, int r,
const struct crush_choose_arg *arg,
int position)
{
dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
BUG_ON(in->size == 0);
......@@ -374,7 +399,7 @@ static int crush_bucket_choose(const struct crush_bucket *in,
case CRUSH_BUCKET_STRAW2:
return bucket_straw2_choose(
(const struct crush_bucket_straw2 *)in,
x, r);
x, r, arg, position);
default:
dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
return in->items[0];
......@@ -436,7 +461,8 @@ static int crush_choose_firstn(const struct crush_map *map,
unsigned int vary_r,
unsigned int stable,
int *out2,
int parent_r)
int parent_r,
const struct crush_choose_arg *choose_args)
{
int rep;
unsigned int ftotal, flocal;
......@@ -486,7 +512,10 @@ static int crush_choose_firstn(const struct crush_map *map,
else
item = crush_bucket_choose(
in, work->work[-1-in->id],
x, r);
x, r,
(choose_args ?
&choose_args[-1-in->id] : 0),
outpos);
if (item >= map->max_devices) {
dprintk(" bad item %d\n", item);
skip_rep = 1;
......@@ -543,7 +572,8 @@ static int crush_choose_firstn(const struct crush_map *map,
vary_r,
stable,
NULL,
sub_r) <= outpos)
sub_r,
choose_args) <= outpos)
/* didn't get leaf */
reject = 1;
} else {
......@@ -620,7 +650,8 @@ static void crush_choose_indep(const struct crush_map *map,
unsigned int recurse_tries,
int recurse_to_leaf,
int *out2,
int parent_r)
int parent_r,
const struct crush_choose_arg *choose_args)
{
const struct crush_bucket *in = bucket;
int endpos = outpos + left;
......@@ -692,7 +723,10 @@ static void crush_choose_indep(const struct crush_map *map,
item = crush_bucket_choose(
in, work->work[-1-in->id],
x, r);
x, r,
(choose_args ?
&choose_args[-1-in->id] : 0),
outpos);
if (item >= map->max_devices) {
dprintk(" bad item %d\n", item);
out[rep] = CRUSH_ITEM_NONE;
......@@ -746,7 +780,8 @@ static void crush_choose_indep(const struct crush_map *map,
x, 1, numrep, 0,
out2, rep,
recurse_tries, 0,
0, NULL, r);
0, NULL, r,
choose_args);
if (out2[rep] == CRUSH_ITEM_NONE) {
/* placed nothing; no leaf */
break;
......@@ -823,7 +858,7 @@ void crush_init_workspace(const struct crush_map *map, void *v)
* set the pointer first and then reserve the space for it to
* point to by incrementing the point.
*/
v += sizeof(struct crush_work *);
v += sizeof(struct crush_work);
w->work = v;
v += map->max_buckets * sizeof(struct crush_work_bucket *);
for (b = 0; b < map->max_buckets; ++b) {
......@@ -854,11 +889,12 @@ void crush_init_workspace(const struct crush_map *map, void *v)
* @weight: weight vector (for map leaves)
* @weight_max: size of weight vector
* @cwin: pointer to at least crush_work_size() bytes of memory
* @choose_args: weights and ids for each known bucket
*/
int crush_do_rule(const struct crush_map *map,
int ruleno, int x, int *result, int result_max,
const __u32 *weight, int weight_max,
void *cwin)
void *cwin, const struct crush_choose_arg *choose_args)
{
int result_len;
struct crush_work *cw = cwin;
......@@ -968,11 +1004,6 @@ int crush_do_rule(const struct crush_map *map,
for (i = 0; i < wsize; i++) {
int bno;
/*
* see CRUSH_N, CRUSH_N_MINUS macros.
* basically, numrep <= 0 means relative to
* the provided result_max
*/
numrep = curstep->arg1;
if (numrep <= 0) {
numrep += result_max;
......@@ -1013,7 +1044,8 @@ int crush_do_rule(const struct crush_map *map,
vary_r,
stable,
c+osize,
0);
0,
choose_args);
} else {
out_size = ((numrep < (result_max-osize)) ?
numrep : (result_max-osize));
......@@ -1030,7 +1062,8 @@ int crush_do_rule(const struct crush_map *map,
choose_leaf_tries : 1,
recurse_to_leaf,
c+osize,
0);
0,
choose_args);
osize += out_size;
}
}
......
......@@ -77,7 +77,7 @@ static int osdmap_show(struct seq_file *s, void *p)
}
for (i = 0; i < map->max_osd; i++) {
struct ceph_entity_addr *addr = &map->osd_addr[i];
int state = map->osd_state[i];
u32 state = map->osd_state[i];
char sb[64];
seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
......@@ -104,6 +104,29 @@ static int osdmap_show(struct seq_file *s, void *p)
seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool,
pg->pgid.seed, pg->primary_temp.osd);
}
for (n = rb_first(&map->pg_upmap); n; n = rb_next(n)) {
struct ceph_pg_mapping *pg =
rb_entry(n, struct ceph_pg_mapping, node);
seq_printf(s, "pg_upmap %llu.%x [", pg->pgid.pool,
pg->pgid.seed);
for (i = 0; i < pg->pg_upmap.len; i++)
seq_printf(s, "%s%d", (i == 0 ? "" : ","),
pg->pg_upmap.osds[i]);
seq_printf(s, "]\n");
}
for (n = rb_first(&map->pg_upmap_items); n; n = rb_next(n)) {
struct ceph_pg_mapping *pg =
rb_entry(n, struct ceph_pg_mapping, node);
seq_printf(s, "pg_upmap_items %llu.%x [", pg->pgid.pool,
pg->pgid.seed);
for (i = 0; i < pg->pg_upmap_items.len; i++)
seq_printf(s, "%s%d->%d", (i == 0 ? "" : ","),
pg->pg_upmap_items.from_to[i][0],
pg->pg_upmap_items.from_to[i][1]);
seq_printf(s, "]\n");
}
up_read(&osdc->lock);
return 0;
......@@ -147,17 +170,26 @@ static int monc_show(struct seq_file *s, void *p)
return 0;
}
/*
 * Print @spgid as "<pool>.<seed>", followed by "s<shard>" unless the
 * shard is CEPH_SPG_NOSHARD.
 */
static void dump_spgid(struct seq_file *s, const struct ceph_spg *spgid)
{
	seq_printf(s, "%llu.%x", spgid->pgid.pool, spgid->pgid.seed);

	if (spgid->shard == CEPH_SPG_NOSHARD)
		return;

	seq_printf(s, "s%d", spgid->shard);
}
static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
{
int i;
seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed);
seq_printf(s, "osd%d\t%llu.%x\t", t->osd, t->pgid.pool, t->pgid.seed);
dump_spgid(s, &t->spgid);
seq_puts(s, "\t[");
for (i = 0; i < t->up.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]);
seq_printf(s, "]/%d\t[", t->up.primary);
for (i = 0; i < t->acting.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
seq_printf(s, "]/%d\t", t->acting.primary);
seq_printf(s, "]/%d\te%u\t", t->acting.primary, t->epoch);
if (t->target_oloc.pool_ns) {
seq_printf(s, "%*pE/%*pE\t0x%x",
(int)t->target_oloc.pool_ns->len,
......@@ -234,6 +266,73 @@ static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
mutex_unlock(&osd->lock);
}
/*
 * Print @snapid: "head" for CEPH_NOSNAP, "snapdir" for CEPH_SNAPDIR,
 * and the value in hex for anything else.
 */
static void dump_snapid(struct seq_file *s, u64 snapid)
{
	if (snapid == CEPH_NOSNAP) {
		seq_puts(s, "head");
		return;
	}

	if (snapid == CEPH_SNAPDIR) {
		seq_puts(s, "snapdir");
		return;
	}

	seq_printf(s, "%llx", snapid);
}
/*
 * Print the @len bytes at @name, emitting each byte verbatim unless it
 * is '%', ':', '/', below 32 or at/above 127, in which case it is
 * written as '%' followed by two hex digits.
 */
static void dump_name_escaped(struct seq_file *s, unsigned char *name,
			      size_t len)
{
	size_t left;

	for (left = len; left > 0; left--, name++) {
		unsigned char c = *name;

		if (c >= 32 && c < 127 && c != '%' && c != ':' && c != '/')
			seq_putc(s, c);
		else
			seq_printf(s, "%%%02x", c);
	}
}
/*
 * Print @hoid.  An id flagged is_max prints as "MAX"; one with pool
 * S64_MIN and zero hash and snapid prints as "MIN".  Anything else is
 * printed as "<pool>:<hash_reverse_bits>:<nspace>:<key>:<oid>:<snapid>"
 * with the three names escaped.
 */
static void dump_hoid(struct seq_file *s, const struct ceph_hobject_id *hoid)
{
	if (hoid->is_max) {
		seq_puts(s, "MAX");
		return;
	}

	/* is_max is known to be false from here on */
	if (hoid->pool == S64_MIN && hoid->hash == 0 && hoid->snapid == 0) {
		seq_puts(s, "MIN");
		return;
	}

	seq_printf(s, "%lld:%08x:", hoid->pool, hoid->hash_reverse_bits);

	dump_name_escaped(s, hoid->nspace, hoid->nspace_len);
	seq_putc(s, ':');

	dump_name_escaped(s, hoid->key, hoid->key_len);
	seq_putc(s, ':');

	dump_name_escaped(s, hoid->oid, hoid->oid_len);
	seq_putc(s, ':');

	dump_snapid(s, hoid->snapid);
}
/*
 * Print one line per backoff registered on @osd (walking
 * o_backoffs_by_id): osd number, spgid, backoff id, and the begin and
 * end hobject ids, tab-separated.  osd->lock is held for the walk.
 */
static void dump_backoffs(struct seq_file *s, struct ceph_osd *osd)
{
	struct rb_node *node;

	mutex_lock(&osd->lock);

	node = rb_first(&osd->o_backoffs_by_id);
	while (node) {
		struct ceph_osd_backoff *bo =
		    rb_entry(node, struct ceph_osd_backoff, id_node);

		seq_printf(s, "osd%d\t", osd->o_osd);
		dump_spgid(s, &bo->spgid);
		seq_printf(s, "\t%llu\t", bo->id);
		dump_hoid(s, bo->begin);
		seq_putc(s, '\t');
		dump_hoid(s, bo->end);
		seq_putc(s, '\n');

		node = rb_next(node);
	}

	mutex_unlock(&osd->lock);
}
static int osdc_show(struct seq_file *s, void *pp)
{
struct ceph_client *client = s->private;
......@@ -259,6 +358,13 @@ static int osdc_show(struct seq_file *s, void *pp)
}
dump_linger_requests(s, &osdc->homeless_osd);
seq_puts(s, "BACKOFFS\n");
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
dump_backoffs(s, osd);
}
up_read(&osdc->lock);
return 0;
}
......
......@@ -1288,13 +1288,16 @@ static void prepare_write_message(struct ceph_connection *con)
m->hdr.seq = cpu_to_le64(++con->out_seq);
m->needs_out_seq = false;
}
WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
if (con->ops->reencode_message)
con->ops->reencode_message(m);
dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
m->data_length);
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
/* tag + hdr + front + middle */
con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
......@@ -2033,8 +2036,7 @@ static int process_connect(struct ceph_connection *con)
{
u64 sup_feat = from_msgr(con->msgr)->supported_features;
u64 req_feat = from_msgr(con->msgr)->required_features;
u64 server_feat = ceph_sanitize_features(
le64_to_cpu(con->in_reply.features));
u64 server_feat = le64_to_cpu(con->in_reply.features);
int ret;
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
......
......@@ -6,6 +6,7 @@
#include <linux/random.h>
#include <linux/sched.h>
#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
......@@ -297,6 +298,10 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
mutex_lock(&monc->mutex);
if (monc->sub_renew_sent) {
/*
* This is only needed for legacy (infernalis or older)
* MONs -- see delayed_work().
*/
monc->sub_renew_after = monc->sub_renew_sent +
(seconds >> 1) * HZ - 1;
dout("%s sent %lu duration %d renew after %lu\n", __func__,
......@@ -955,7 +960,8 @@ static void delayed_work(struct work_struct *work)
__validate_auth(monc);
}
if (is_auth) {
if (is_auth &&
!(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
unsigned long now = jiffies;
dout("%s renew subs? now %lu renew after %lu\n",
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment