Commit 1204c464 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph updates from Sage Weil:
 "This time around we have a collection of CephFS fixes from Zheng
  around MDS failure handling and snapshots, support for a new CRUSH
  straw2 algorithm (to sync up with userspace) and several RBD cleanups
  and fixes from Ilya, an error path leak fix from Taesoo, and then an
  assorted collection of cleanups from others"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (28 commits)
  rbd: rbd_wq comment is obsolete
  libceph: announce support for straw2 buckets
  crush: straw2 bucket type with an efficient 64-bit crush_ln()
  crush: ensuring at most num-rep osds are selected
  crush: drop unnecessary include from mapper.c
  ceph: fix uninline data function
  ceph: rename snapshot support
  ceph: fix null pointer dereference in send_mds_reconnect()
  ceph: hold on to exclusive caps on complete directories
  libceph: simplify our debugfs attr macro
  ceph: show non-default options only
  libceph: expose client options through debugfs
  libceph, ceph: split ceph_show_options()
  rbd: mark block queue as non-rotational
  libceph: don't overwrite specific con error msgs
  ceph: cleanup unsafe requests when reconnecting is denied
  ceph: don't zero i_wrbuffer_ref when reconnecting is denied
  ceph: don't mark dirty caps when there is no auth cap
  ceph: keep i_snap_realm while there are writers
  libceph: osdmap.h: Add missing format newlines
  ...
parents 4f211235 f77303bd
......@@ -3762,8 +3762,8 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
goto out_tag_set;
}
/* We use the default size, but let's be explicit about it. */
blk_queue_physical_block_size(q, SECTOR_SIZE);
queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
/* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
/* set io sizes to object size */
segment_size = rbd_obj_bytes(&rbd_dev->header);
......@@ -5301,8 +5301,13 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
if (mapping) {
ret = rbd_dev_header_watch_sync(rbd_dev);
if (ret)
if (ret) {
if (ret == -ENOENT)
pr_info("image %s/%s does not exist\n",
rbd_dev->spec->pool_name,
rbd_dev->spec->image_name);
goto out_header_name;
}
}
ret = rbd_dev_header_info(rbd_dev);
......@@ -5319,8 +5324,14 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
ret = rbd_spec_fill_snap_id(rbd_dev);
else
ret = rbd_spec_fill_names(rbd_dev);
if (ret)
if (ret) {
if (ret == -ENOENT)
pr_info("snap %s/%s@%s does not exist\n",
rbd_dev->spec->pool_name,
rbd_dev->spec->image_name,
rbd_dev->spec->snap_name);
goto err_out_probe;
}
if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
ret = rbd_dev_v2_parent_info(rbd_dev);
......@@ -5390,8 +5401,11 @@ static ssize_t do_rbd_add(struct bus_type *bus,
/* pick the pool */
rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
if (rc < 0)
if (rc < 0) {
if (rc == -ENOENT)
pr_info("pool %s does not exist\n", spec->pool_name);
goto err_out_client;
}
spec->pool_id = (u64)rc;
/* The ceph file layout needs to fit pool id in 32 bits */
......@@ -5673,7 +5687,7 @@ static int __init rbd_init(void)
/*
* The number of active work items is limited by the number of
* rbd devices, so leave @max_active at default.
* rbd devices * queue depth, so leave @max_active at default.
*/
rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
if (!rbd_wq) {
......
......@@ -1146,6 +1146,10 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
inode, page, (int)pos, (int)len);
r = ceph_update_writeable_page(file, pos, len, page);
if (r < 0)
page_cache_release(page);
else
*pagep = page;
} while (r == -EAGAIN);
return r;
......@@ -1534,19 +1538,27 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
"inline_version", &inline_version,
sizeof(inline_version),
CEPH_OSD_CMPXATTR_OP_GT,
CEPH_OSD_CMPXATTR_MODE_U64);
if (err)
goto out_put;
err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
"inline_version", &inline_version,
sizeof(inline_version), 0, 0);
if (err)
goto out_put;
{
__le64 xattr_buf = cpu_to_le64(inline_version);
err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
"inline_version", &xattr_buf,
sizeof(xattr_buf),
CEPH_OSD_CMPXATTR_OP_GT,
CEPH_OSD_CMPXATTR_MODE_U64);
if (err)
goto out_put;
}
{
char xattr_buf[32];
int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
"%llu", inline_version);
err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
"inline_version",
xattr_buf, xattr_len, 0, 0);
if (err)
goto out_put;
}
ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
......
......@@ -896,6 +896,18 @@ int ceph_is_any_caps(struct inode *inode)
return ret;
}
static void drop_inode_snap_realm(struct ceph_inode_info *ci)
{
struct ceph_snap_realm *realm = ci->i_snap_realm;
spin_lock(&realm->inodes_with_caps_lock);
list_del_init(&ci->i_snap_realm_item);
ci->i_snap_realm_counter++;
ci->i_snap_realm = NULL;
spin_unlock(&realm->inodes_with_caps_lock);
ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc,
realm);
}
/*
* Remove a cap. Take steps to deal with a racing iterate_session_caps.
*
......@@ -946,15 +958,13 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
if (removed)
ceph_put_cap(mdsc, cap);
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
struct ceph_snap_realm *realm = ci->i_snap_realm;
spin_lock(&realm->inodes_with_caps_lock);
list_del_init(&ci->i_snap_realm_item);
ci->i_snap_realm_counter++;
ci->i_snap_realm = NULL;
spin_unlock(&realm->inodes_with_caps_lock);
ceph_put_snap_realm(mdsc, realm);
}
/* when reconnect denied, we remove session caps forcibly,
* i_wr_ref can be non-zero. If there are ongoing write,
* keep i_snap_realm.
*/
if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm)
drop_inode_snap_realm(ci);
if (!__ceph_is_any_real_caps(ci))
__cap_delay_cancel(mdsc, ci);
}
......@@ -1394,6 +1404,13 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
int was = ci->i_dirty_caps;
int dirty = 0;
if (!ci->i_auth_cap) {
pr_warn("__mark_dirty_caps %p %llx mask %s, "
"but no auth cap (session was closed?)\n",
inode, ceph_ino(inode), ceph_cap_string(mask));
return 0;
}
dout("__mark_dirty_caps %p %s dirty %s -> %s\n", &ci->vfs_inode,
ceph_cap_string(mask), ceph_cap_string(was),
ceph_cap_string(was | mask));
......@@ -1404,7 +1421,6 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
ci->i_snap_realm->cached_context);
dout(" inode %p now dirty snapc %p auth cap %p\n",
&ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
WARN_ON(!ci->i_auth_cap);
BUG_ON(!list_empty(&ci->i_dirty_item));
spin_lock(&mdsc->cap_dirty_lock);
list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
......@@ -1545,7 +1561,19 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
if (!mdsc->stopping && inode->i_nlink > 0) {
if (want) {
retain |= CEPH_CAP_ANY; /* be greedy */
} else if (S_ISDIR(inode->i_mode) &&
(issued & CEPH_CAP_FILE_SHARED) &&
__ceph_dir_is_complete(ci)) {
/*
* If a directory is complete, we want to keep
* the exclusive cap. So that MDS does not end up
* revoking the shared cap on every create/unlink
* operation.
*/
want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
retain |= want;
} else {
retain |= CEPH_CAP_ANY_SHARED;
/*
* keep RD only if we didn't have the file open RW,
......@@ -2309,6 +2337,9 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
wake = 1;
}
}
/* see comment in __ceph_remove_cap() */
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
drop_inode_snap_realm(ci);
}
spin_unlock(&ci->i_ceph_lock);
......
......@@ -281,6 +281,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* can we use the dcache? */
spin_lock(&ci->i_ceph_lock);
if ((ctx->pos == 2 || fi->dentry) &&
ceph_test_mount_opt(fsc, DCACHE) &&
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR &&
__ceph_dir_is_complete_ordered(ci) &&
......@@ -336,16 +337,23 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
ceph_mdsc_put_request(req);
return err;
}
req->r_inode = inode;
ihold(inode);
req->r_dentry = dget(file->f_path.dentry);
/* hints to request -> mds selection code */
req->r_direct_mode = USE_AUTH_MDS;
req->r_direct_hash = ceph_frag_value(frag);
req->r_direct_is_hash = true;
req->r_path2 = kstrdup(fi->last_name, GFP_NOFS);
if (fi->last_name) {
req->r_path2 = kstrdup(fi->last_name, GFP_NOFS);
if (!req->r_path2) {
ceph_mdsc_put_request(req);
return -ENOMEM;
}
}
req->r_readdir_offset = fi->next_offset;
req->r_args.readdir.frag = cpu_to_le32(frag);
req->r_inode = inode;
ihold(inode);
req->r_dentry = dget(file->f_path.dentry);
err = ceph_mdsc_do_request(mdsc, NULL, req);
if (err < 0) {
ceph_mdsc_put_request(req);
......@@ -629,6 +637,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
fsc->mount_options->snapdir_name,
dentry->d_name.len) &&
!is_root_ceph_dentry(dir, dentry) &&
ceph_test_mount_opt(fsc, DCACHE) &&
__ceph_dir_is_complete(ci) &&
(__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
spin_unlock(&ci->i_ceph_lock);
......@@ -755,10 +764,15 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
err = PTR_ERR(req);
goto out;
}
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_path2 = kstrdup(dest, GFP_NOFS);
if (!req->r_path2) {
err = -ENOMEM;
ceph_mdsc_put_request(req);
goto out;
}
req->r_locked_dir = dir;
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
err = ceph_mdsc_do_request(mdsc, dir, req);
......@@ -933,16 +947,20 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
struct ceph_fs_client *fsc = ceph_sb_to_client(old_dir->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
int op = CEPH_MDS_OP_RENAME;
int err;
if (ceph_snap(old_dir) != ceph_snap(new_dir))
return -EXDEV;
if (ceph_snap(old_dir) != CEPH_NOSNAP ||
ceph_snap(new_dir) != CEPH_NOSNAP)
return -EROFS;
if (ceph_snap(old_dir) != CEPH_NOSNAP) {
if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR)
op = CEPH_MDS_OP_RENAMESNAP;
else
return -EROFS;
}
dout("rename dir %p dentry %p to dir %p dentry %p\n",
old_dir, old_dentry, new_dir, new_dentry);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME, USE_AUTH_MDS);
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
ihold(old_dir);
......@@ -1240,11 +1258,12 @@ static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end,
dout("dir_fsync %p wait on tid %llu (until %llu)\n",
inode, req->r_tid, last_tid);
if (req->r_timeout) {
ret = wait_for_completion_timeout(
&req->r_safe_completion, req->r_timeout);
if (ret > 0)
unsigned long time_left = wait_for_completion_timeout(
&req->r_safe_completion,
req->r_timeout);
if (time_left > 0)
ret = 0;
else if (ret == 0)
else
ret = -EIO; /* timed out */
} else {
wait_for_completion(&req->r_safe_completion);
......@@ -1372,6 +1391,7 @@ const struct inode_operations ceph_snapdir_iops = {
.getattr = ceph_getattr,
.mkdir = ceph_mkdir,
.rmdir = ceph_unlink,
.rename = ceph_rename,
};
const struct dentry_operations ceph_dentry_ops = {
......
......@@ -1021,6 +1021,33 @@ static void cleanup_cap_releases(struct ceph_mds_session *session)
spin_unlock(&session->s_cap_lock);
}
static void cleanup_session_requests(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_mds_request *req;
struct rb_node *p;
dout("cleanup_session_requests mds%d\n", session->s_mds);
mutex_lock(&mdsc->mutex);
while (!list_empty(&session->s_unsafe)) {
req = list_first_entry(&session->s_unsafe,
struct ceph_mds_request, r_unsafe_item);
list_del_init(&req->r_unsafe_item);
pr_info(" dropping unsafe request %llu\n", req->r_tid);
__unregister_request(mdsc, req);
}
/* zero r_attempts, so kick_requests() will re-send requests */
p = rb_first(&mdsc->request_tree);
while (p) {
req = rb_entry(p, struct ceph_mds_request, r_node);
p = rb_next(p);
if (req->r_session &&
req->r_session->s_mds == session->s_mds)
req->r_attempts = 0;
}
mutex_unlock(&mdsc->mutex);
}
/*
* Helper to safely iterate over all caps associated with a session, with
* special care taken to handle a racing __ceph_remove_cap().
......@@ -1098,7 +1125,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
cap, ci, &ci->vfs_inode);
spin_lock(&ci->i_ceph_lock);
__ceph_remove_cap(cap, false);
if (!__ceph_is_any_real_caps(ci)) {
if (!ci->i_auth_cap) {
struct ceph_mds_client *mdsc =
ceph_sb_to_client(inode->i_sb)->mdsc;
......@@ -1120,13 +1147,6 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
mdsc->num_cap_flushing--;
drop = 1;
}
if (drop && ci->i_wrbuffer_ref) {
pr_info(" dropping dirty data for %p %lld\n",
inode, ceph_ino(inode));
ci->i_wrbuffer_ref = 0;
ci->i_wrbuffer_ref_head = 0;
drop++;
}
spin_unlock(&mdsc->cap_dirty_lock);
}
spin_unlock(&ci->i_ceph_lock);
......@@ -1853,7 +1873,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
*/
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req,
int mds)
int mds, bool drop_cap_releases)
{
struct ceph_msg *msg;
struct ceph_mds_request_head *head;
......@@ -1937,6 +1957,12 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
releases += ceph_encode_inode_release(&p,
req->r_old_dentry->d_inode,
mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
if (drop_cap_releases) {
releases = 0;
p = msg->front.iov_base + req->r_request_release_offset;
}
head->num_releases = cpu_to_le16(releases);
/* time stamp */
......@@ -1989,7 +2015,7 @@ static void complete_request(struct ceph_mds_client *mdsc,
*/
static int __prepare_send_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req,
int mds)
int mds, bool drop_cap_releases)
{
struct ceph_mds_request_head *rhead;
struct ceph_msg *msg;
......@@ -2048,7 +2074,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
ceph_msg_put(req->r_request);
req->r_request = NULL;
}
msg = create_request_message(mdsc, req, mds);
msg = create_request_message(mdsc, req, mds, drop_cap_releases);
if (IS_ERR(msg)) {
req->r_err = PTR_ERR(msg);
complete_request(mdsc, req);
......@@ -2132,7 +2158,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
if (req->r_request_started == 0) /* note request start time */
req->r_request_started = jiffies;
err = __prepare_send_request(mdsc, req, mds);
err = __prepare_send_request(mdsc, req, mds, false);
if (!err) {
ceph_msg_get(req->r_request);
ceph_con_send(&session->s_con, req->r_request);
......@@ -2590,6 +2616,7 @@ static void handle_session(struct ceph_mds_session *session,
case CEPH_SESSION_CLOSE:
if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
pr_info("mds%d reconnect denied\n", session->s_mds);
cleanup_session_requests(mdsc, session);
remove_session_caps(session);
wake = 2; /* for good measure */
wake_up_all(&mdsc->session_close_wq);
......@@ -2658,7 +2685,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
mutex_lock(&mdsc->mutex);
list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
err = __prepare_send_request(mdsc, req, session->s_mds);
err = __prepare_send_request(mdsc, req, session->s_mds, true);
if (!err) {
ceph_msg_get(req->r_request);
ceph_con_send(&session->s_con, req->r_request);
......@@ -2679,7 +2706,8 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
continue; /* only old requests */
if (req->r_session &&
req->r_session->s_mds == session->s_mds) {
err = __prepare_send_request(mdsc, req, session->s_mds);
err = __prepare_send_request(mdsc, req,
session->s_mds, true);
if (!err) {
ceph_msg_get(req->r_request);
ceph_con_send(&session->s_con, req->r_request);
......@@ -2864,7 +2892,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
spin_unlock(&session->s_cap_lock);
/* trim unused caps to reduce MDS's cache rejoin time */
shrink_dcache_parent(mdsc->fsc->sb->s_root);
if (mdsc->fsc->sb->s_root)
shrink_dcache_parent(mdsc->fsc->sb->s_root);
ceph_con_close(&session->s_con);
ceph_con_open(&session->s_con,
......@@ -3133,7 +3162,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
di->lease_renew_from &&
di->lease_renew_after == 0) {
unsigned long duration =
le32_to_cpu(h->duration_ms) * HZ / 1000;
msecs_to_jiffies(le32_to_cpu(h->duration_ms));
di->lease_seq = seq;
dentry->d_time = di->lease_renew_from + duration;
......
......@@ -75,6 +75,7 @@ const char *ceph_mds_op_name(int op)
case CEPH_MDS_OP_LSSNAP: return "lssnap";
case CEPH_MDS_OP_MKSNAP: return "mksnap";
case CEPH_MDS_OP_RMSNAP: return "rmsnap";
case CEPH_MDS_OP_RENAMESNAP: return "renamesnap";
case CEPH_MDS_OP_SETFILELOCK: return "setfilelock";
case CEPH_MDS_OP_GETFILELOCK: return "getfilelock";
}
......
......@@ -345,6 +345,11 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
fsopt->rsize = CEPH_RSIZE_DEFAULT;
fsopt->rasize = CEPH_RASIZE_DEFAULT;
fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
if (!fsopt->snapdir_name) {
err = -ENOMEM;
goto out;
}
fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
......@@ -406,31 +411,20 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
{
struct ceph_fs_client *fsc = ceph_sb_to_client(root->d_sb);
struct ceph_mount_options *fsopt = fsc->mount_options;
struct ceph_options *opt = fsc->client->options;
if (opt->flags & CEPH_OPT_FSID)
seq_printf(m, ",fsid=%pU", &opt->fsid);
if (opt->flags & CEPH_OPT_NOSHARE)
seq_puts(m, ",noshare");
if (opt->flags & CEPH_OPT_NOCRC)
seq_puts(m, ",nocrc");
if (opt->flags & CEPH_OPT_NOMSGAUTH)
seq_puts(m, ",nocephx_require_signatures");
if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
seq_puts(m, ",notcp_nodelay");
if (opt->name)
seq_printf(m, ",name=%s", opt->name);
if (opt->key)
seq_puts(m, ",secret=<hidden>");
if (opt->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT)
seq_printf(m, ",mount_timeout=%d", opt->mount_timeout);
if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl);
if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
seq_printf(m, ",osdkeepalivetimeout=%d",
opt->osd_keepalive_timeout);
size_t pos;
int ret;
/* a comma between MNT/MS and client options */
seq_putc(m, ',');
pos = m->count;
ret = ceph_print_client_options(m, fsc->client);
if (ret)
return ret;
/* retract our comma if no client options */
if (m->count == pos)
m->count--;
if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT)
seq_puts(m, ",dirstat");
......@@ -438,14 +432,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",norbytes");
if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
seq_puts(m, ",noasyncreaddir");
if (fsopt->flags & CEPH_MOUNT_OPT_DCACHE)
seq_puts(m, ",dcache");
else
if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
seq_puts(m, ",nodcache");
if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE)
seq_puts(m, ",fsc");
else
seq_puts(m, ",nofsc");
#ifdef CONFIG_CEPH_FS_POSIX_ACL
if (fsopt->sb_flags & MS_POSIXACL)
......@@ -477,6 +467,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_printf(m, ",readdir_max_bytes=%d", fsopt->max_readdir_bytes);
if (strcmp(fsopt->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT))
seq_printf(m, ",snapdirname=%s", fsopt->snapdir_name);
return 0;
}
......@@ -730,6 +721,11 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
if (IS_ERR(req))
return ERR_CAST(req);
req->r_path1 = kstrdup(path, GFP_NOFS);
if (!req->r_path1) {
root = ERR_PTR(-ENOMEM);
goto out;
}
req->r_ino1.ino = CEPH_INO_ROOT;
req->r_ino1.snap = CEPH_NOSNAP;
req->r_started = started;
......
......@@ -36,7 +36,8 @@
#define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */
#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */
#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES)
#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES | \
CEPH_MOUNT_OPT_DCACHE)
#define ceph_set_mount_opt(fsc, opt) \
(fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt;
......@@ -881,7 +882,6 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
/* file.c */
extern const struct file_operations ceph_file_fops;
extern const struct address_space_operations ceph_aops;
extern int ceph_open(struct inode *inode, struct file *file);
extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
......
......@@ -877,16 +877,23 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
err = PTR_ERR(req);
goto out;
}
req->r_inode = inode;
ihold(inode);
req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
req->r_num_caps = 1;
req->r_args.setxattr.flags = cpu_to_le32(flags);
req->r_path2 = kstrdup(name, GFP_NOFS);
if (!req->r_path2) {
ceph_mdsc_put_request(req);
err = -ENOMEM;
goto out;
}
req->r_pagelist = pagelist;
pagelist = NULL;
req->r_inode = inode;
ihold(inode);
req->r_num_caps = 1;
req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
err = ceph_mdsc_do_request(mdsc, NULL, req);
ceph_mdsc_put_request(req);
......@@ -1019,12 +1026,14 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_path2 = kstrdup(name, GFP_NOFS);
if (!req->r_path2)
return -ENOMEM;
req->r_inode = inode;
ihold(inode);
req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
req->r_num_caps = 1;
req->r_path2 = kstrdup(name, GFP_NOFS);
req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
err = ceph_mdsc_do_request(mdsc, NULL, req);
ceph_mdsc_put_request(req);
return err;
......
......@@ -50,6 +50,19 @@
#define CEPH_FEATURE_MDS_INLINE_DATA (1ULL<<40)
#define CEPH_FEATURE_CRUSH_TUNABLES3 (1ULL<<41)
#define CEPH_FEATURE_OSD_PRIMARY_AFFINITY (1ULL<<41) /* overlap w/ tunables3 */
#define CEPH_FEATURE_MSGR_KEEPALIVE2 (1ULL<<42)
#define CEPH_FEATURE_OSD_POOLRESEND (1ULL<<43)
#define CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2 (1ULL<<44)
#define CEPH_FEATURE_OSD_SET_ALLOC_HINT (1ULL<<45)
#define CEPH_FEATURE_OSD_FADVISE_FLAGS (1ULL<<46)
#define CEPH_FEATURE_OSD_REPOP (1ULL<<46) /* overlap with fadvise */
#define CEPH_FEATURE_OSD_OBJECT_DIGEST (1ULL<<46) /* overlap with fadvise */
#define CEPH_FEATURE_OSD_TRANSACTION_MAY_LAYOUT (1ULL<<46) /* overlap w/ fadvise */
#define CEPH_FEATURE_MDS_QUOTA (1ULL<<47)
#define CEPH_FEATURE_CRUSH_V4 (1ULL<<48) /* straw2 buckets */
#define CEPH_FEATURE_OSD_MIN_SIZE_RECOVERY (1ULL<<49)
// duplicated since it was introduced at the same time as MIN_SIZE_RECOVERY
#define CEPH_FEATURE_OSD_PROXY_FEATURES (1ULL<<49) /* overlap w/ above */
/*
* The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature
......@@ -93,7 +106,8 @@ static inline u64 ceph_sanitize_features(u64 features)
CEPH_FEATURE_EXPORT_PEER | \
CEPH_FEATURE_OSDMAP_ENC | \
CEPH_FEATURE_CRUSH_TUNABLES3 | \
CEPH_FEATURE_OSD_PRIMARY_AFFINITY)
CEPH_FEATURE_OSD_PRIMARY_AFFINITY | \
CEPH_FEATURE_CRUSH_V4)
#define CEPH_FEATURES_REQUIRED_DEFAULT \
(CEPH_FEATURE_NOSRCADDR | \
......
......@@ -323,6 +323,7 @@ enum {
CEPH_MDS_OP_MKSNAP = 0x01400,
CEPH_MDS_OP_RMSNAP = 0x01401,
CEPH_MDS_OP_LSSNAP = 0x00402,
CEPH_MDS_OP_RENAMESNAP = 0x01403,
};
extern const char *ceph_mds_op_name(int op);
......
......@@ -7,13 +7,7 @@
#define CEPH_DEFINE_SHOW_FUNC(name) \
static int name##_open(struct inode *inode, struct file *file) \
{ \
struct seq_file *sf; \
int ret; \
\
ret = single_open(file, name, NULL); \
sf = file->private_data; \
sf->private = inode->i_private; \
return ret; \
return single_open(file, name, inode->i_private); \
} \
\
static const struct file_operations name##_fops = { \
......
......@@ -135,6 +135,7 @@ struct ceph_client {
struct dentry *debugfs_dir;
struct dentry *debugfs_monmap;
struct dentry *debugfs_osdmap;
struct dentry *debugfs_options;
#endif
};
......@@ -191,6 +192,7 @@ extern struct ceph_options *ceph_parse_options(char *options,
const char *dev_name, const char *dev_name_end,
int (*parse_extra_token)(char *c, void *private),
void *private);
int ceph_print_client_options(struct seq_file *m, struct ceph_client *client);
extern void ceph_destroy_options(struct ceph_options *opt);
extern int ceph_compare_options(struct ceph_options *new_opt,
struct ceph_client *client);
......
......@@ -175,13 +175,12 @@ static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid)
__u8 version;
if (!ceph_has_room(p, end, 1 + 8 + 4 + 4)) {
pr_warning("incomplete pg encoding");
pr_warn("incomplete pg encoding\n");
return -EINVAL;
}
version = ceph_decode_8(p);
if (version > 1) {
pr_warning("do not understand pg encoding %d > 1",
pr_warn("do not understand pg encoding %d > 1\n",
(int)version);
return -EINVAL;
}
......
......@@ -96,13 +96,15 @@ struct crush_rule {
* uniform O(1) poor poor
* list O(n) optimal poor
* tree O(log n) good good
* straw O(n) optimal optimal
* straw O(n) better better
* straw2 O(n) optimal optimal
*/
enum {
CRUSH_BUCKET_UNIFORM = 1,
CRUSH_BUCKET_LIST = 2,
CRUSH_BUCKET_TREE = 3,
CRUSH_BUCKET_STRAW = 4
CRUSH_BUCKET_STRAW = 4,
CRUSH_BUCKET_STRAW2 = 5,
};
extern const char *crush_bucket_alg_name(int alg);
......@@ -149,6 +151,11 @@ struct crush_bucket_straw {
__u32 *straws; /* 16-bit fixed point */
};
struct crush_bucket_straw2 {
struct crush_bucket h;
__u32 *item_weights; /* 16-bit fixed point */
};
/*
......@@ -189,6 +196,7 @@ extern void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b);
extern void crush_destroy_bucket_list(struct crush_bucket_list *b);
extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b);
extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b);
extern void crush_destroy_bucket_straw2(struct crush_bucket_straw2 *b);
extern void crush_destroy_bucket(struct crush_bucket *b);
extern void crush_destroy_rule(struct crush_rule *r);
extern void crush_destroy(struct crush_map *map);
......
......@@ -490,6 +490,43 @@ ceph_parse_options(char *options, const char *dev_name,
}
EXPORT_SYMBOL(ceph_parse_options);
int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
{
struct ceph_options *opt = client->options;
size_t pos = m->count;
if (opt->name)
seq_printf(m, "name=%s,", opt->name);
if (opt->key)
seq_puts(m, "secret=<hidden>,");
if (opt->flags & CEPH_OPT_FSID)
seq_printf(m, "fsid=%pU,", &opt->fsid);
if (opt->flags & CEPH_OPT_NOSHARE)
seq_puts(m, "noshare,");
if (opt->flags & CEPH_OPT_NOCRC)
seq_puts(m, "nocrc,");
if (opt->flags & CEPH_OPT_NOMSGAUTH)
seq_puts(m, "nocephx_require_signatures,");
if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
seq_puts(m, "notcp_nodelay,");
if (opt->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT)
seq_printf(m, "mount_timeout=%d,", opt->mount_timeout);
if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
seq_printf(m, "osd_idle_ttl=%d,", opt->osd_idle_ttl);
if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
seq_printf(m, "osdkeepalivetimeout=%d,",
opt->osd_keepalive_timeout);
/* drop redundant comma */
if (m->count != pos)
m->count--;
return 0;
}
EXPORT_SYMBOL(ceph_print_client_options);
u64 ceph_client_id(struct ceph_client *client)
{
return client->monc.auth->global_id;
......
......@@ -17,6 +17,7 @@ const char *crush_bucket_alg_name(int alg)
case CRUSH_BUCKET_LIST: return "list";
case CRUSH_BUCKET_TREE: return "tree";
case CRUSH_BUCKET_STRAW: return "straw";
case CRUSH_BUCKET_STRAW2: return "straw2";
default: return "unknown";
}
}
......@@ -40,6 +41,8 @@ int crush_get_bucket_item_weight(const struct crush_bucket *b, int p)
return ((struct crush_bucket_tree *)b)->node_weights[crush_calc_tree_node(p)];
case CRUSH_BUCKET_STRAW:
return ((struct crush_bucket_straw *)b)->item_weights[p];
case CRUSH_BUCKET_STRAW2:
return ((struct crush_bucket_straw2 *)b)->item_weights[p];
}
return 0;
}
......@@ -77,6 +80,14 @@ void crush_destroy_bucket_straw(struct crush_bucket_straw *b)
kfree(b);
}
void crush_destroy_bucket_straw2(struct crush_bucket_straw2 *b)
{
kfree(b->item_weights);
kfree(b->h.perm);
kfree(b->h.items);
kfree(b);
}
void crush_destroy_bucket(struct crush_bucket *b)
{
switch (b->alg) {
......@@ -92,6 +103,9 @@ void crush_destroy_bucket(struct crush_bucket *b)
case CRUSH_BUCKET_STRAW:
crush_destroy_bucket_straw((struct crush_bucket_straw *)b);
break;
case CRUSH_BUCKET_STRAW2:
crush_destroy_bucket_straw2((struct crush_bucket_straw2 *)b);
break;
}
}
......
This diff is collapsed.
......@@ -20,7 +20,7 @@
#include <linux/crush/crush.h>
#include <linux/crush/hash.h>
#include <linux/crush/mapper.h>
#include "crush_ln_table.h"
/*
* Implement the core CRUSH mapping algorithm.
......@@ -238,6 +238,102 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket,
return bucket->h.items[high];
}
// compute 2^44*log2(input+1)
uint64_t crush_ln(unsigned xin)
{
unsigned x=xin, x1;
int iexpon, index1, index2;
uint64_t RH, LH, LL, xl64, result;
x++;
// normalize input
iexpon = 15;
while(!(x&0x18000)) { x<<=1; iexpon--; }
index1 = (x>>8)<<1;
// RH ~ 2^56/index1
RH = __RH_LH_tbl[index1 - 256];
// LH ~ 2^48 * log2(index1/256)
LH = __RH_LH_tbl[index1 + 1 - 256];
// RH*x ~ 2^48 * (2^15 + xf), xf<2^8
xl64 = (int64_t)x * RH;
xl64 >>= 48;
x1 = xl64;
result = iexpon;
result <<= (12 + 32);
index2 = x1 & 0xff;
// LL ~ 2^48*log2(1.0+index2/2^15)
LL = __LL_tbl[index2];
LH = LH + LL;
LH >>= (48-12 - 32);
result += LH;
return result;
}
/*
* straw2
*
* for reference, see:
*
* http://en.wikipedia.org/wiki/Exponential_distribution#Distribution_of_the_minimum_of_exponential_random_variables
*
*/
static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket,
int x, int r)
{
unsigned i, high = 0;
unsigned u;
unsigned w;
__s64 ln, draw, high_draw = 0;
for (i = 0; i < bucket->h.size; i++) {
w = bucket->item_weights[i];
if (w) {
u = crush_hash32_3(bucket->h.hash, x,
bucket->h.items[i], r);
u &= 0xffff;
/*
* for some reason slightly less than 0x10000 produces
* a slightly more accurate distribution... probably a
* rounding effect.
*
* the natural log lookup table maps [0,0xffff]
* (corresponding to real numbers [1/0x10000, 1] to
* [0, 0xffffffffffff] (corresponding to real numbers
* [-11.090355,0]).
*/
ln = crush_ln(u) - 0x1000000000000ll;
/*
* divide by 16.16 fixed-point weight. note
* that the ln value is negative, so a larger
* weight means a larger (less negative) value
* for draw.
*/
draw = div64_s64(ln, w);
} else {
draw = S64_MIN;
}
if (i == 0 || draw > high_draw) {
high = i;
high_draw = draw;
}
}
return bucket->h.items[high];
}
static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
{
dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
......@@ -255,12 +351,16 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
case CRUSH_BUCKET_STRAW:
return bucket_straw_choose((struct crush_bucket_straw *)in,
x, r);
case CRUSH_BUCKET_STRAW2:
return bucket_straw2_choose((struct crush_bucket_straw2 *)in,
x, r);
default:
dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
return in->items[0];
}
}
/*
* true if device is marked "out" (failed, fully offloaded)
* of the cluster
......@@ -290,6 +390,7 @@ static int is_out(const struct crush_map *map,
* @type: the type of item to choose
* @out: pointer to output vector
* @outpos: our position in that vector
* @out_size: size of the out vector
* @tries: number of attempts to make
* @recurse_tries: number of attempts to have recursive chooseleaf make
* @local_retries: localized retries
......@@ -304,6 +405,7 @@ static int crush_choose_firstn(const struct crush_map *map,
const __u32 *weight, int weight_max,
int x, int numrep, int type,
int *out, int outpos,
int out_size,
unsigned int tries,
unsigned int recurse_tries,
unsigned int local_retries,
......@@ -322,6 +424,7 @@ static int crush_choose_firstn(const struct crush_map *map,
int item = 0;
int itemtype;
int collide, reject;
int count = out_size;
dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d recurse_tries %d local_retries %d local_fallback_retries %d parent_r %d\n",
recurse_to_leaf ? "_LEAF" : "",
......@@ -329,7 +432,7 @@ static int crush_choose_firstn(const struct crush_map *map,
tries, recurse_tries, local_retries, local_fallback_retries,
parent_r);
for (rep = outpos; rep < numrep; rep++) {
for (rep = outpos; rep < numrep && count > 0 ; rep++) {
/* keep trying until we get a non-out, non-colliding item */
ftotal = 0;
skip_rep = 0;
......@@ -403,7 +506,7 @@ static int crush_choose_firstn(const struct crush_map *map,
map->buckets[-1-item],
weight, weight_max,
x, outpos+1, 0,
out2, outpos,
out2, outpos, count,
recurse_tries, 0,
local_retries,
local_fallback_retries,
......@@ -463,6 +566,7 @@ static int crush_choose_firstn(const struct crush_map *map,
dprintk("CHOOSE got %d\n", item);
out[outpos] = item;
outpos++;
count--;
}
dprintk("CHOOSE returns %d\n", outpos);
......@@ -654,6 +758,7 @@ int crush_do_rule(const struct crush_map *map,
__u32 step;
int i, j;
int numrep;
int out_size;
/*
* the original choose_total_tries value was off by one (it
* counted "retries" and not "tries"). add one.
......@@ -761,6 +866,7 @@ int crush_do_rule(const struct crush_map *map,
x, numrep,
curstep->arg2,
o+osize, j,
result_max-osize,
choose_tries,
recurse_tries,
choose_local_retries,
......@@ -770,11 +876,13 @@ int crush_do_rule(const struct crush_map *map,
c+osize,
0);
} else {
out_size = ((numrep < (result_max-osize)) ?
numrep : (result_max-osize));
crush_choose_indep(
map,
map->buckets[-1-w[i]],
weight, weight_max,
x, numrep, numrep,
x, out_size, numrep,
curstep->arg2,
o+osize, j,
choose_tries,
......@@ -783,7 +891,7 @@ int crush_do_rule(const struct crush_map *map,
recurse_to_leaf,
c+osize,
0);
osize += numrep;
osize += out_size;
}
}
......
......@@ -22,6 +22,7 @@
* .../monmap - current monmap
* .../osdc - active osd requests
* .../monc - mon client state
* .../client_options - libceph-only (i.e. not rbd or cephfs) options
* .../dentry_lru - dump contents of dentry lru
* .../caps - expose cap (reservation) stats
* .../bdi - symlink to ../../bdi/something
......@@ -177,10 +178,24 @@ static int osdc_show(struct seq_file *s, void *pp)
return 0;
}
static int client_options_show(struct seq_file *s, void *p)
{
struct ceph_client *client = s->private;
int ret;
ret = ceph_print_client_options(s, client);
if (ret)
return ret;
seq_putc(s, '\n');
return 0;
}
CEPH_DEFINE_SHOW_FUNC(monmap_show)
CEPH_DEFINE_SHOW_FUNC(osdmap_show)
CEPH_DEFINE_SHOW_FUNC(monc_show)
CEPH_DEFINE_SHOW_FUNC(osdc_show)
CEPH_DEFINE_SHOW_FUNC(client_options_show)
int ceph_debugfs_init(void)
{
......@@ -242,6 +257,14 @@ int ceph_debugfs_client_init(struct ceph_client *client)
if (!client->debugfs_osdmap)
goto out;
client->debugfs_options = debugfs_create_file("client_options",
0600,
client->debugfs_dir,
client,
&client_options_show_fops);
if (!client->debugfs_options)
goto out;
return 0;
out:
......@@ -252,6 +275,7 @@ int ceph_debugfs_client_init(struct ceph_client *client)
void ceph_debugfs_client_cleanup(struct ceph_client *client)
{
dout("ceph_debugfs_client_cleanup %p\n", client);
debugfs_remove(client->debugfs_options);
debugfs_remove(client->debugfs_osdmap);
debugfs_remove(client->debugfs_monmap);
debugfs_remove(client->osdc.debugfs_file);
......
......@@ -505,8 +505,6 @@ static int ceph_tcp_connect(struct ceph_connection *con)
pr_err("connect %s error %d\n",
ceph_pr_addr(&con->peer_addr.in_addr), ret);
sock_release(sock);
con->error_msg = "connect error";
return ret;
}
......@@ -2145,12 +2143,10 @@ static int process_connect(struct ceph_connection *con)
* to WAIT. This shouldn't happen if we are the
* client.
*/
pr_err("process_connect got WAIT as client\n");
con->error_msg = "protocol error, got WAIT as client";
return -1;
default:
pr_err("connect protocol error, will retry\n");
con->error_msg = "protocol error, garbage tag during connect";
return -1;
}
......@@ -2282,8 +2278,7 @@ static int read_partial_message(struct ceph_connection *con)
crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
if (cpu_to_le32(crc) != con->in_hdr.crc) {
pr_err("read_partial_message bad hdr "
" crc %u != expected %u\n",
pr_err("read_partial_message bad hdr crc %u != expected %u\n",
crc, con->in_hdr.crc);
return -EBADMSG;
}
......@@ -2313,7 +2308,7 @@ static int read_partial_message(struct ceph_connection *con)
pr_err("read_partial_message bad seq %lld expected %lld\n",
seq, con->in_seq + 1);
con->error_msg = "bad message sequence # for incoming message";
return -EBADMSG;
return -EBADE;
}
/* allocate message? */
......@@ -2660,6 +2655,8 @@ static int try_read(struct ceph_connection *con)
switch (ret) {
case -EBADMSG:
con->error_msg = "bad crc";
/* fall through */
case -EBADE:
ret = -EIO;
break;
case -EIO:
......@@ -2838,7 +2835,8 @@ static void con_work(struct work_struct *work)
if (ret < 0) {
if (ret == -EAGAIN)
continue;
con->error_msg = "socket error on read";
if (!con->error_msg)
con->error_msg = "socket error on read";
fault = true;
break;
}
......@@ -2847,7 +2845,8 @@ static void con_work(struct work_struct *work)
if (ret < 0) {
if (ret == -EAGAIN)
continue;
con->error_msg = "socket error on write";
if (!con->error_msg)
con->error_msg = "socket error on write";
fault = true;
}
......@@ -2869,11 +2868,13 @@ static void con_work(struct work_struct *work)
*/
static void con_fault(struct ceph_connection *con)
{
pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
dout("fault %p state %lu to peer %s\n",
con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
con->error_msg = NULL;
WARN_ON(con->state != CON_STATE_CONNECTING &&
con->state != CON_STATE_NEGOTIATING &&
con->state != CON_STATE_OPEN);
......@@ -3295,8 +3296,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
*/
if (*skip)
return 0;
con->error_msg = "error allocating memory for incoming message";
con->error_msg = "error allocating memory for incoming message";
return -ENOMEM;
}
memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
......
......@@ -122,6 +122,22 @@ static int crush_decode_straw_bucket(void **p, void *end,
return -EINVAL;
}
static int crush_decode_straw2_bucket(void **p, void *end,
struct crush_bucket_straw2 *b)
{
int j;
dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
if (b->item_weights == NULL)
return -ENOMEM;
ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
for (j = 0; j < b->h.size; j++)
b->item_weights[j] = ceph_decode_32(p);
return 0;
bad:
return -EINVAL;
}
static int skip_name_map(void **p, void *end)
{
int len;
......@@ -204,6 +220,9 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
case CRUSH_BUCKET_STRAW:
size = sizeof(struct crush_bucket_straw);
break;
case CRUSH_BUCKET_STRAW2:
size = sizeof(struct crush_bucket_straw2);
break;
default:
err = -EINVAL;
goto bad;
......@@ -261,6 +280,12 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
if (err < 0)
goto bad;
break;
case CRUSH_BUCKET_STRAW2:
err = crush_decode_straw2_bucket(p, end,
(struct crush_bucket_straw2 *)b);
if (err < 0)
goto bad;
break;
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment