Commit 7c2a69f6 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-5.9-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "Xiubo has completed his work on filesystem client metrics, they are
  sent to all available MDSes once per second now.

  Other than that, we have a lot of fixes and cleanups all around the
  filesystem, including a tweak to cut down on MDS request resends in
  multi-MDS setups from Yanhu and fixups for SELinux symlink labeling
  and MClientSession message decoding from Jeff"

* tag 'ceph-for-5.9-rc1' of git://github.com/ceph/ceph-client: (22 commits)
  ceph: handle zero-length feature mask in session messages
  ceph: use frag's MDS in either mode
  ceph: move sb->wb_pagevec_pool to be a global mempool
  ceph: set sec_context xattr on symlink creation
  ceph: remove redundant initialization of variable mds
  ceph: fix use-after-free for fsc->mdsc
  ceph: remove unused variables in ceph_mdsmap_decode()
  ceph: delete repeated words in fs/ceph/
  ceph: send client provided metric flags in client metadata
  ceph: periodically send perf metrics to MDSes
  ceph: check the sesion state and return false in case it is closed
  libceph: replace HTTP links with HTTPS ones
  ceph: remove unnecessary cast in kfree()
  libceph: just have osd_req_op_init() return a pointer
  ceph: do not access the kiocb after aio requests
  ceph: clean up and optimize ceph_check_delayed_caps()
  ceph: fix potential mdsc use-after-free crash
  ceph: switch to WARN_ON_ONCE in encode_supported_features()
  ceph: add global total_caps to count the mdsc's total caps number
  ceph: add check_session_state() helper and make it global
  ...
parents 7a02c8d4 02e37571
......@@ -13,7 +13,7 @@ config CEPH_FS
scalable file system designed to provide high performance,
reliable access to petabytes of storage.
More information at http://ceph.newdream.net/.
More information at https://ceph.io/.
If unsure, say N.
......
......@@ -862,8 +862,7 @@ static void writepages_finish(struct ceph_osd_request *req)
osd_data = osd_req_op_extent_osd_data(req, 0);
if (osd_data->pages_from_pool)
mempool_free(osd_data->pages,
ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
else
kfree(osd_data->pages);
ceph_osdc_put_request(req);
......@@ -955,10 +954,10 @@ static int ceph_writepages_start(struct address_space *mapping,
int num_ops = 0, op_idx;
unsigned i, pvec_pages, max_pages, locked_pages = 0;
struct page **pages = NULL, **data_pages;
mempool_t *pool = NULL; /* Becomes non-null if mempool used */
struct page *page;
pgoff_t strip_unit_end = 0;
u64 offset = 0, len = 0;
bool from_pool = false;
max_pages = wsize >> PAGE_SHIFT;
......@@ -1057,15 +1056,15 @@ static int ceph_writepages_start(struct address_space *mapping,
sizeof(*pages),
GFP_NOFS);
if (!pages) {
pool = fsc->wb_pagevec_pool;
pages = mempool_alloc(pool, GFP_NOFS);
from_pool = true;
pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
BUG_ON(!pages);
}
len = 0;
} else if (page->index !=
(offset + len) >> PAGE_SHIFT) {
if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS :
CEPH_OSD_MAX_OPS)) {
redirty_page_for_writepage(wbc, page);
unlock_page(page);
......@@ -1161,7 +1160,7 @@ static int ceph_writepages_start(struct address_space *mapping,
offset, len);
osd_req_op_extent_osd_data_pages(req, op_idx,
data_pages, len, 0,
!!pool, false);
from_pool, false);
osd_req_op_extent_update(req, op_idx, len);
len = 0;
......@@ -1188,12 +1187,12 @@ static int ceph_writepages_start(struct address_space *mapping,
dout("writepages got pages at %llu~%llu\n", offset, len);
osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
0, !!pool, false);
0, from_pool, false);
osd_req_op_extent_update(req, op_idx, len);
BUG_ON(op_idx + 1 != req->r_num_ops);
pool = NULL;
from_pool = false;
if (i < locked_pages) {
BUG_ON(num_ops <= req->r_num_ops);
num_ops -= req->r_num_ops;
......@@ -1204,8 +1203,8 @@ static int ceph_writepages_start(struct address_space *mapping,
pages = kmalloc_array(locked_pages, sizeof(*pages),
GFP_NOFS);
if (!pages) {
pool = fsc->wb_pagevec_pool;
pages = mempool_alloc(pool, GFP_NOFS);
from_pool = true;
pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
BUG_ON(!pages);
}
memcpy(pages, data_pages + i,
......
......@@ -668,6 +668,7 @@ void ceph_add_cap(struct inode *inode,
spin_lock(&session->s_cap_lock);
list_add_tail(&cap->session_caps, &session->s_caps);
session->s_nr_caps++;
atomic64_inc(&mdsc->metric.total_caps);
spin_unlock(&session->s_cap_lock);
} else {
spin_lock(&session->s_cap_lock);
......@@ -1161,6 +1162,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
} else {
list_del_init(&cap->session_caps);
session->s_nr_caps--;
atomic64_dec(&mdsc->metric.total_caps);
cap->session = NULL;
removed = 1;
}
......@@ -4187,10 +4189,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
struct ceph_inode_info *ci;
dout("check_delayed_caps\n");
while (1) {
spin_lock(&mdsc->cap_delay_lock);
if (list_empty(&mdsc->cap_delay_list))
break;
while (!list_empty(&mdsc->cap_delay_list)) {
ci = list_first_entry(&mdsc->cap_delay_list,
struct ceph_inode_info,
i_cap_delay_list);
......@@ -4200,13 +4200,13 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
list_del_init(&ci->i_cap_delay_list);
inode = igrab(&ci->vfs_inode);
spin_unlock(&mdsc->cap_delay_lock);
if (inode) {
spin_unlock(&mdsc->cap_delay_lock);
dout("check_delayed_caps on %p\n", inode);
ceph_check_caps(ci, 0, NULL);
/* avoid calling iput_final() in tick thread */
ceph_async_iput(inode);
spin_lock(&mdsc->cap_delay_lock);
}
}
spin_unlock(&mdsc->cap_delay_lock);
......
......@@ -145,7 +145,7 @@ static int metric_show(struct seq_file *s, void *p)
struct ceph_fs_client *fsc = s->private;
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_client_metric *m = &mdsc->metric;
int i, nr_caps = 0;
int nr_caps = 0;
s64 total, sum, avg, min, max, sq;
seq_printf(s, "item total avg_lat(us) min_lat(us) max_lat(us) stdev(us)\n");
......@@ -190,17 +190,7 @@ static int metric_show(struct seq_file *s, void *p)
percpu_counter_sum(&m->d_lease_mis),
percpu_counter_sum(&m->d_lease_hit));
mutex_lock(&mdsc->mutex);
for (i = 0; i < mdsc->max_sessions; i++) {
struct ceph_mds_session *s;
s = __ceph_lookup_mds_session(mdsc, i);
if (!s)
continue;
nr_caps += s->s_nr_caps;
ceph_put_mds_session(s);
}
mutex_unlock(&mdsc->mutex);
nr_caps = atomic64_read(&m->total_caps);
seq_printf(s, "%-14s%-16d%-16lld%lld\n", "caps", nr_caps,
percpu_counter_sum(&m->i_caps_mis),
percpu_counter_sum(&m->i_caps_hit));
......@@ -272,7 +262,7 @@ static int mds_sessions_show(struct seq_file *s, void *ptr)
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_auth_client *ac = fsc->client->monc.auth;
struct ceph_options *opt = fsc->client->options;
int mds = -1;
int mds;
mutex_lock(&mdsc->mutex);
......
......@@ -930,6 +930,10 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
req->r_num_caps = 2;
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
if (as_ctx.pagelist) {
req->r_pagelist = as_ctx.pagelist;
as_ctx.pagelist = NULL;
}
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
......
......@@ -1538,6 +1538,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
struct page *pinned_page = NULL;
bool direct_lock = iocb->ki_flags & IOCB_DIRECT;
ssize_t ret;
int want, got = 0;
int retry_op = 0, read = 0;
......@@ -1546,7 +1547,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
if (iocb->ki_flags & IOCB_DIRECT)
if (direct_lock)
ceph_start_io_direct(inode);
else
ceph_start_io_read(inode);
......@@ -1603,7 +1604,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
}
ceph_put_cap_refs(ci, got);
if (iocb->ki_flags & IOCB_DIRECT)
if (direct_lock)
ceph_end_io_direct(inode);
else
ceph_end_io_read(inode);
......
......@@ -1103,8 +1103,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
frag.frag, mds);
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
CEPH_MDS_STATE_ACTIVE) {
if (mode == USE_ANY_MDS &&
!ceph_mdsmap_is_laggy(mdsc->mdsmap,
if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
mds))
goto out;
}
......@@ -1168,7 +1167,7 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static void encode_supported_features(void **p, void *end)
static int encode_supported_features(void **p, void *end)
{
static const size_t count = ARRAY_SIZE(feature_bits);
......@@ -1176,16 +1175,64 @@ static void encode_supported_features(void **p, void *end)
size_t i;
size_t size = FEATURE_BYTES(count);
BUG_ON(*p + 4 + size > end);
if (WARN_ON_ONCE(*p + 4 + size > end))
return -ERANGE;
ceph_encode_32(p, size);
memset(*p, 0, size);
for (i = 0; i < count; i++)
((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
*p += size;
} else {
BUG_ON(*p + 4 > end);
if (WARN_ON_ONCE(*p + 4 > end))
return -ERANGE;
ceph_encode_32(p, 0);
}
return 0;
}
static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
static int encode_metric_spec(void **p, void *end)
{
static const size_t count = ARRAY_SIZE(metric_bits);
/* header */
if (WARN_ON_ONCE(*p + 2 > end))
return -ERANGE;
ceph_encode_8(p, 1); /* version */
ceph_encode_8(p, 1); /* compat */
if (count > 0) {
size_t i;
size_t size = METRIC_BYTES(count);
if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
return -ERANGE;
/* metric spec info length */
ceph_encode_32(p, 4 + size);
/* metric spec */
ceph_encode_32(p, size);
memset(*p, 0, size);
for (i = 0; i < count; i++)
((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
*p += size;
} else {
if (WARN_ON_ONCE(*p + 4 + 4 > end))
return -ERANGE;
/* metric spec info length */
ceph_encode_32(p, 4);
/* metric spec */
ceph_encode_32(p, 0);
}
return 0;
}
/*
......@@ -1203,6 +1250,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
size_t size, count;
void *p, *end;
int ret;
const char* metadata[][2] = {
{"hostname", mdsc->nodename},
......@@ -1227,12 +1275,19 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
size = FEATURE_BYTES(count);
extra_bytes += 4 + size;
/* metric spec */
size = 0;
count = ARRAY_SIZE(metric_bits);
if (count > 0)
size = METRIC_BYTES(count);
extra_bytes += 2 + 4 + 4 + size;
/* Allocate the message */
msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
GFP_NOFS, false);
if (!msg) {
pr_err("create_session_msg ENOMEM creating msg\n");
return NULL;
return ERR_PTR(-ENOMEM);
}
p = msg->front.iov_base;
end = p + msg->front.iov_len;
......@@ -1245,9 +1300,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
* Serialize client metadata into waiting buffer space, using
* the format that userspace expects for map<string, string>
*
* ClientSession messages with metadata are v3
* ClientSession messages with metadata are v4
*/
msg->hdr.version = cpu_to_le16(3);
msg->hdr.version = cpu_to_le16(4);
msg->hdr.compat_version = cpu_to_le16(1);
/* The write pointer, following the session_head structure */
......@@ -1269,7 +1324,20 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
p += val_len;
}
encode_supported_features(&p, end);
ret = encode_supported_features(&p, end);
if (ret) {
pr_err("encode_supported_features failed!\n");
ceph_msg_put(msg);
return ERR_PTR(ret);
}
ret = encode_metric_spec(&p, end);
if (ret) {
pr_err("encode_metric_spec failed!\n");
ceph_msg_put(msg);
return ERR_PTR(ret);
}
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
......@@ -1297,8 +1365,8 @@ static int __open_session(struct ceph_mds_client *mdsc,
/* send connect message */
msg = create_session_open_msg(mdsc, session->s_seq);
if (!msg)
return -ENOMEM;
if (IS_ERR(msg))
return PTR_ERR(msg);
ceph_con_send(&session->s_con, msg);
return 0;
}
......@@ -1312,6 +1380,7 @@ static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
struct ceph_mds_session *session;
int ret;
session = __ceph_lookup_mds_session(mdsc, target);
if (!session) {
......@@ -1320,8 +1389,11 @@ __open_export_target_session(struct ceph_mds_client *mdsc, int target)
return session;
}
if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING)
__open_session(mdsc, session);
session->s_state == CEPH_MDS_SESSION_CLOSING) {
ret = __open_session(mdsc, session);
if (ret)
return ERR_PTR(ret);
}
return session;
}
......@@ -1485,6 +1557,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
cap->session = NULL;
list_del_init(&cap->session_caps);
session->s_nr_caps--;
atomic64_dec(&session->s_mdsc->metric.total_caps);
if (cap->queue_release)
__ceph_queue_cap_release(session, cap);
else
......@@ -1785,8 +1858,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
/*
* send a session close request
*/
static int request_close_session(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
static int request_close_session(struct ceph_mds_session *session)
{
struct ceph_msg *msg;
......@@ -1809,7 +1881,7 @@ static int __close_session(struct ceph_mds_client *mdsc,
if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
return 0;
session->s_state = CEPH_MDS_SESSION_CLOSING;
return request_close_session(mdsc, session);
return request_close_session(session);
}
static bool drop_negative_children(struct dentry *dentry)
......@@ -2520,7 +2592,12 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
ceph_encode_copy(&p, &ts, sizeof(ts));
}
BUG_ON(p > end);
if (WARN_ON_ONCE(p > end)) {
ceph_msg_put(msg);
msg = ERR_PTR(-ERANGE);
goto out_free2;
}
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
......@@ -2756,7 +2833,9 @@ static void __do_request(struct ceph_mds_client *mdsc,
}
if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING) {
__open_session(mdsc, session);
err = __open_session(mdsc, session);
if (err)
goto out_session;
/* retry the same mds later */
if (random)
req->r_resend_mds = mds;
......@@ -3279,9 +3358,11 @@ static void handle_session(struct ceph_mds_session *session,
goto bad;
/* version >= 3, feature bits */
ceph_decode_32_safe(&p, end, len, bad);
if (len) {
ceph_decode_64_safe(&p, end, features, bad);
p += len - sizeof(features);
}
}
mutex_lock(&mdsc->mutex);
if (op == CEPH_SESSION_CLOSE) {
......@@ -3310,6 +3391,8 @@ static void handle_session(struct ceph_mds_session *session,
session->s_state = CEPH_MDS_SESSION_OPEN;
session->s_features = features;
renewed_caps(mdsc, session, 0);
if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
metric_schedule_delayed(&mdsc->metric);
wake = 1;
if (mdsc->stopping)
__close_session(mdsc, session);
......@@ -4263,6 +4346,30 @@ static void maybe_recover_session(struct ceph_mds_client *mdsc)
ceph_force_reconnect(fsc->sb);
}
bool check_session_state(struct ceph_mds_session *s)
{
if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
dout("resending session close request for mds%d\n",
s->s_mds);
request_close_session(s);
return false;
}
if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
if (s->s_state == CEPH_MDS_SESSION_OPEN) {
s->s_state = CEPH_MDS_SESSION_HUNG;
pr_info("mds%d hung\n", s->s_mds);
}
}
if (s->s_state == CEPH_MDS_SESSION_NEW ||
s->s_state == CEPH_MDS_SESSION_RESTARTING ||
s->s_state == CEPH_MDS_SESSION_CLOSED ||
s->s_state == CEPH_MDS_SESSION_REJECTED)
/* this mds is failed or recovering, just wait */
return false;
return true;
}
/*
* delayed work -- periodically trim expired leases, renew caps with mds
*/
......@@ -4283,6 +4390,9 @@ static void delayed_work(struct work_struct *work)
dout("mdsc delayed_work\n");
if (mdsc->stopping)
return;
mutex_lock(&mdsc->mutex);
renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
renew_caps = time_after_eq(jiffies, HZ*renew_interval +
......@@ -4294,23 +4404,8 @@ static void delayed_work(struct work_struct *work)
struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
if (!s)
continue;
if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
dout("resending session close request for mds%d\n",
s->s_mds);
request_close_session(mdsc, s);
ceph_put_mds_session(s);
continue;
}
if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
if (s->s_state == CEPH_MDS_SESSION_OPEN) {
s->s_state = CEPH_MDS_SESSION_HUNG;
pr_info("mds%d hung\n", s->s_mds);
}
}
if (s->s_state == CEPH_MDS_SESSION_NEW ||
s->s_state == CEPH_MDS_SESSION_RESTARTING ||
s->s_state == CEPH_MDS_SESSION_REJECTED) {
/* this mds is failed or recovering, just wait */
if (!check_session_state(s)) {
ceph_put_mds_session(s);
continue;
}
......@@ -4359,7 +4454,6 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
goto err_mdsc;
}
fsc->mdsc = mdsc;
init_completion(&mdsc->safe_umount_waiters);
init_waitqueue_head(&mdsc->session_close_wq);
INIT_LIST_HEAD(&mdsc->waiting_for_map);
......@@ -4414,6 +4508,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
strscpy(mdsc->nodename, utsname()->nodename,
sizeof(mdsc->nodename));
fsc->mdsc = mdsc;
return 0;
err_mdsmap:
......@@ -4657,7 +4753,16 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
dout("stop\n");
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
/*
* Make sure the delayed work stopped before releasing
* the resources.
*
* Because the cancel_delayed_work_sync() will only
* guarantee that the work finishes executing. But the
* delayed work will re-arm itself again after that.
*/
flush_delayed_work(&mdsc->delayed_work);
if (mdsc->mdsmap)
ceph_mdsmap_destroy(mdsc->mdsmap);
kfree(mdsc->sessions);
......@@ -4680,6 +4785,7 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
ceph_metric_destroy(&mdsc->metric);
flush_delayed_work(&mdsc->metric.delayed_work);
fsc->mdsc = NULL;
kfree(mdsc);
dout("mdsc_destroy %p done\n", mdsc);
......
......@@ -18,6 +18,7 @@
#include <linux/ceph/auth.h>
#include "metric.h"
#include "super.h"
/* The first 8 bits are reserved for old ceph releases */
enum ceph_feature_type {
......@@ -27,8 +28,9 @@ enum ceph_feature_type {
CEPHFS_FEATURE_LAZY_CAP_WANTED,
CEPHFS_FEATURE_MULTI_RECONNECT,
CEPHFS_FEATURE_DELEG_INO,
CEPHFS_FEATURE_METRIC_COLLECT,
CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_DELEG_INO,
CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_METRIC_COLLECT,
};
/*
......@@ -42,6 +44,7 @@ enum ceph_feature_type {
CEPHFS_FEATURE_LAZY_CAP_WANTED, \
CEPHFS_FEATURE_MULTI_RECONNECT, \
CEPHFS_FEATURE_DELEG_INO, \
CEPHFS_FEATURE_METRIC_COLLECT, \
\
CEPHFS_FEATURE_MAX, \
}
......@@ -476,6 +479,8 @@ struct ceph_mds_client {
extern const char *ceph_mds_op_name(int op);
extern bool check_session_state(struct ceph_mds_session *s);
extern struct ceph_mds_session *
__ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
......
......@@ -120,7 +120,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
const void *start = *p;
int i, j, n;
int err;
u8 mdsmap_v, mdsmap_cv;
u8 mdsmap_v;
u16 mdsmap_ev;
m = kzalloc(sizeof(*m), GFP_NOFS);
......@@ -129,7 +129,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
ceph_decode_need(p, end, 1 + 1, bad);
mdsmap_v = ceph_decode_8(p);
mdsmap_cv = ceph_decode_8(p);
*p += sizeof(u8); /* mdsmap_cv */
if (mdsmap_v >= 4) {
u32 mdsmap_len;
ceph_decode_32_safe(p, end, mdsmap_len, bad);
......@@ -174,7 +174,6 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
u64 global_id;
u32 namelen;
s32 mds, inc, state;
u64 state_seq;
u8 info_v;
void *info_end = NULL;
struct ceph_entity_addr addr;
......@@ -189,9 +188,8 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
info_v= ceph_decode_8(p);
if (info_v >= 4) {
u32 info_len;
u8 info_cv;
ceph_decode_need(p, end, 1 + sizeof(u32), bad);
info_cv = ceph_decode_8(p);
*p += sizeof(u8); /* info_cv */
info_len = ceph_decode_32(p);
info_end = *p + info_len;
if (info_end > end)
......@@ -210,7 +208,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
mds = ceph_decode_32(p);
inc = ceph_decode_32(p);
state = ceph_decode_32(p);
state_seq = ceph_decode_64(p);
*p += sizeof(u64); /* state_seq */
err = ceph_decode_entity_addr(p, end, &addr);
if (err)
goto corrupt;
......
/* SPDX-License-Identifier: GPL-2.0 */
#include <linux/ceph/ceph_debug.h>
#include <linux/types.h>
#include <linux/percpu_counter.h>
#include <linux/math64.h>
#include "metric.h"
#include "mds_client.h"
static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
struct ceph_mds_session *s)
{
struct ceph_metric_head *head;
struct ceph_metric_cap *cap;
struct ceph_metric_read_latency *read;
struct ceph_metric_write_latency *write;
struct ceph_metric_metadata_latency *meta;
struct ceph_client_metric *m = &mdsc->metric;
u64 nr_caps = atomic64_read(&m->total_caps);
struct ceph_msg *msg;
struct timespec64 ts;
s64 sum;
s32 items = 0;
s32 len;
len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write)
+ sizeof(*meta);
msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true);
if (!msg) {
pr_err("send metrics to mds%d, failed to allocate message\n",
s->s_mds);
return false;
}
head = msg->front.iov_base;
/* encode the cap metric */
cap = (struct ceph_metric_cap *)(head + 1);
cap->type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO);
cap->ver = 1;
cap->compat = 1;
cap->data_len = cpu_to_le32(sizeof(*cap) - 10);
cap->hit = cpu_to_le64(percpu_counter_sum(&mdsc->metric.i_caps_hit));
cap->mis = cpu_to_le64(percpu_counter_sum(&mdsc->metric.i_caps_mis));
cap->total = cpu_to_le64(nr_caps);
items++;
/* encode the read latency metric */
read = (struct ceph_metric_read_latency *)(cap + 1);
read->type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
read->ver = 1;
read->compat = 1;
read->data_len = cpu_to_le32(sizeof(*read) - 10);
sum = m->read_latency_sum;
jiffies_to_timespec64(sum, &ts);
read->sec = cpu_to_le32(ts.tv_sec);
read->nsec = cpu_to_le32(ts.tv_nsec);
items++;
/* encode the write latency metric */
write = (struct ceph_metric_write_latency *)(read + 1);
write->type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
write->ver = 1;
write->compat = 1;
write->data_len = cpu_to_le32(sizeof(*write) - 10);
sum = m->write_latency_sum;
jiffies_to_timespec64(sum, &ts);
write->sec = cpu_to_le32(ts.tv_sec);
write->nsec = cpu_to_le32(ts.tv_nsec);
items++;
/* encode the metadata latency metric */
meta = (struct ceph_metric_metadata_latency *)(write + 1);
meta->type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
meta->ver = 1;
meta->compat = 1;
meta->data_len = cpu_to_le32(sizeof(*meta) - 10);
sum = m->metadata_latency_sum;
jiffies_to_timespec64(sum, &ts);
meta->sec = cpu_to_le32(ts.tv_sec);
meta->nsec = cpu_to_le32(ts.tv_nsec);
items++;
put_unaligned_le32(items, &head->num);
msg->front.iov_len = len;
msg->hdr.version = cpu_to_le16(1);
msg->hdr.compat_version = cpu_to_le16(1);
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
dout("client%llu send metrics to mds%d\n",
ceph_client_gid(mdsc->fsc->client), s->s_mds);
ceph_con_send(&s->s_con, msg);
return true;
}
static void metric_get_session(struct ceph_mds_client *mdsc)
{
struct ceph_mds_session *s;
int i;
mutex_lock(&mdsc->mutex);
for (i = 0; i < mdsc->max_sessions; i++) {
s = __ceph_lookup_mds_session(mdsc, i);
if (!s)
continue;
/*
* Skip it if MDS doesn't support the metric collection,
* or the MDS will close the session's socket connection
* directly when it get this message.
*/
if (check_session_state(s) &&
test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &s->s_features)) {
mdsc->metric.session = s;
break;
}
ceph_put_mds_session(s);
}
mutex_unlock(&mdsc->mutex);
}
static void metric_delayed_work(struct work_struct *work)
{
struct ceph_client_metric *m =
container_of(work, struct ceph_client_metric, delayed_work.work);
struct ceph_mds_client *mdsc =
container_of(m, struct ceph_mds_client, metric);
if (mdsc->stopping)
return;
if (!m->session || !check_session_state(m->session)) {
if (m->session) {
ceph_put_mds_session(m->session);
m->session = NULL;
}
metric_get_session(mdsc);
}
if (m->session) {
ceph_mdsc_send_metrics(mdsc, m->session);
metric_schedule_delayed(m);
}
}
int ceph_metric_init(struct ceph_client_metric *m)
{
......@@ -22,6 +162,7 @@ int ceph_metric_init(struct ceph_client_metric *m)
if (ret)
goto err_d_lease_mis;
atomic64_set(&m->total_caps, 0);
ret = percpu_counter_init(&m->i_caps_hit, 0, GFP_KERNEL);
if (ret)
goto err_i_caps_hit;
......@@ -51,6 +192,9 @@ int ceph_metric_init(struct ceph_client_metric *m)
m->total_metadatas = 0;
m->metadata_latency_sum = 0;
m->session = NULL;
INIT_DELAYED_WORK(&m->delayed_work, metric_delayed_work);
return 0;
err_i_caps_mis:
......@@ -72,6 +216,11 @@ void ceph_metric_destroy(struct ceph_client_metric *m)
percpu_counter_destroy(&m->i_caps_hit);
percpu_counter_destroy(&m->d_lease_mis);
percpu_counter_destroy(&m->d_lease_hit);
cancel_delayed_work_sync(&m->delayed_work);
if (m->session)
ceph_put_mds_session(m->session);
}
static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
......
......@@ -6,12 +6,91 @@
#include <linux/percpu_counter.h>
#include <linux/ktime.h>
extern bool disable_send_metrics;
enum ceph_metric_type {
CLIENT_METRIC_TYPE_CAP_INFO,
CLIENT_METRIC_TYPE_READ_LATENCY,
CLIENT_METRIC_TYPE_WRITE_LATENCY,
CLIENT_METRIC_TYPE_METADATA_LATENCY,
CLIENT_METRIC_TYPE_DENTRY_LEASE,
CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_DENTRY_LEASE,
};
/*
* This will always have the highest metric bit value
* as the last element of the array.
*/
#define CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED { \
CLIENT_METRIC_TYPE_CAP_INFO, \
CLIENT_METRIC_TYPE_READ_LATENCY, \
CLIENT_METRIC_TYPE_WRITE_LATENCY, \
CLIENT_METRIC_TYPE_METADATA_LATENCY, \
\
CLIENT_METRIC_TYPE_MAX, \
}
/* metric caps header */
struct ceph_metric_cap {
__le32 type; /* ceph metric type */
__u8 ver;
__u8 compat;
__le32 data_len; /* length of sizeof(hit + mis + total) */
__le64 hit;
__le64 mis;
__le64 total;
} __packed;
/* metric read latency header */
struct ceph_metric_read_latency {
__le32 type; /* ceph metric type */
__u8 ver;
__u8 compat;
__le32 data_len; /* length of sizeof(sec + nsec) */
__le32 sec;
__le32 nsec;
} __packed;
/* metric write latency header */
struct ceph_metric_write_latency {
__le32 type; /* ceph metric type */
__u8 ver;
__u8 compat;
__le32 data_len; /* length of sizeof(sec + nsec) */
__le32 sec;
__le32 nsec;
} __packed;
/* metric metadata latency header */
struct ceph_metric_metadata_latency {
__le32 type; /* ceph metric type */
__u8 ver;
__u8 compat;
__le32 data_len; /* length of sizeof(sec + nsec) */
__le32 sec;
__le32 nsec;
} __packed;
struct ceph_metric_head {
__le32 num; /* the number of metrics that will be sent */
} __packed;
/* This is the global metrics */
struct ceph_client_metric {
atomic64_t total_dentries;
struct percpu_counter d_lease_hit;
struct percpu_counter d_lease_mis;
atomic64_t total_caps;
struct percpu_counter i_caps_hit;
struct percpu_counter i_caps_mis;
......@@ -35,8 +114,20 @@ struct ceph_client_metric {
ktime_t metadata_latency_sq_sum;
ktime_t metadata_latency_min;
ktime_t metadata_latency_max;
struct ceph_mds_session *session;
struct delayed_work delayed_work; /* delayed work */
};
static inline void metric_schedule_delayed(struct ceph_client_metric *m)
{
if (disable_send_metrics)
return;
/* per second */
schedule_delayed_work(&m->delayed_work, round_jiffies_relative(HZ));
}
extern int ceph_metric_init(struct ceph_client_metric *m);
extern void ceph_metric_destroy(struct ceph_client_metric *m);
......
......@@ -27,6 +27,9 @@
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
static DEFINE_SPINLOCK(ceph_fsc_lock);
static LIST_HEAD(ceph_fsc_list);
/*
* Ceph superblock operations
*
......@@ -634,8 +637,6 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
struct ceph_options *opt)
{
struct ceph_fs_client *fsc;
int page_count;
size_t size;
int err;
fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
......@@ -683,18 +684,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
if (!fsc->cap_wq)
goto fail_inode_wq;
/* set up mempools */
err = -ENOMEM;
page_count = fsc->mount_options->wsize >> PAGE_SHIFT;
size = sizeof (struct page *) * (page_count ? page_count : 1);
fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
if (!fsc->wb_pagevec_pool)
goto fail_cap_wq;
spin_lock(&ceph_fsc_lock);
list_add_tail(&fsc->metric_wakeup, &ceph_fsc_list);
spin_unlock(&ceph_fsc_lock);
return fsc;
fail_cap_wq:
destroy_workqueue(fsc->cap_wq);
fail_inode_wq:
destroy_workqueue(fsc->inode_wq);
fail_client:
......@@ -717,12 +712,14 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
{
dout("destroy_fs_client %p\n", fsc);
spin_lock(&ceph_fsc_lock);
list_del(&fsc->metric_wakeup);
spin_unlock(&ceph_fsc_lock);
ceph_mdsc_destroy(fsc);
destroy_workqueue(fsc->inode_wq);
destroy_workqueue(fsc->cap_wq);
mempool_destroy(fsc->wb_pagevec_pool);
destroy_mount_options(fsc->mount_options);
ceph_destroy_client(fsc->client);
......@@ -741,6 +738,7 @@ struct kmem_cache *ceph_dentry_cachep;
struct kmem_cache *ceph_file_cachep;
struct kmem_cache *ceph_dir_file_cachep;
struct kmem_cache *ceph_mds_request_cachep;
mempool_t *ceph_wb_pagevec_pool;
static void ceph_inode_init_once(void *foo)
{
......@@ -785,6 +783,10 @@ static int __init init_caches(void)
if (!ceph_mds_request_cachep)
goto bad_mds_req;
ceph_wb_pagevec_pool = mempool_create_kmalloc_pool(10, CEPH_MAX_WRITE_SIZE >> PAGE_SHIFT);
if (!ceph_wb_pagevec_pool)
goto bad_pagevec_pool;
error = ceph_fscache_register();
if (error)
goto bad_fscache;
......@@ -793,6 +795,8 @@ static int __init init_caches(void)
bad_fscache:
kmem_cache_destroy(ceph_mds_request_cachep);
bad_pagevec_pool:
mempool_destroy(ceph_wb_pagevec_pool);
bad_mds_req:
kmem_cache_destroy(ceph_dir_file_cachep);
bad_dir_file:
......@@ -823,12 +827,13 @@ static void destroy_caches(void)
kmem_cache_destroy(ceph_file_cachep);
kmem_cache_destroy(ceph_dir_file_cachep);
kmem_cache_destroy(ceph_mds_request_cachep);
mempool_destroy(ceph_wb_pagevec_pool);
ceph_fscache_unregister();
}
/*
* ceph_umount_begin - initiate forced umount. Tear down down the
* ceph_umount_begin - initiate forced umount. Tear down the
* mount, skipping steps that may hang while waiting for server(s).
*/
static void ceph_umount_begin(struct super_block *sb)
......@@ -1282,6 +1287,37 @@ static void __exit exit_ceph(void)
destroy_caches();
}
static int param_set_metrics(const char *val, const struct kernel_param *kp)
{
struct ceph_fs_client *fsc;
int ret;
ret = param_set_bool(val, kp);
if (ret) {
pr_err("Failed to parse sending metrics switch value '%s'\n",
val);
return ret;
} else if (!disable_send_metrics) {
// wake up all the mds clients
spin_lock(&ceph_fsc_lock);
list_for_each_entry(fsc, &ceph_fsc_list, metric_wakeup) {
metric_schedule_delayed(&fsc->mdsc->metric);
}
spin_unlock(&ceph_fsc_lock);
}
return 0;
}
static const struct kernel_param_ops param_ops_metrics = {
.set = param_set_metrics,
.get = param_get_bool,
};
bool disable_send_metrics = false;
module_param_cb(disable_send_metrics, &param_ops_metrics, &disable_send_metrics, 0644);
MODULE_PARM_DESC(disable_send_metrics, "Enable sending perf metrics to ceph cluster (default: on)");
module_init(init_ceph);
module_exit(exit_ceph);
......
......@@ -101,6 +101,8 @@ struct ceph_mount_options {
struct ceph_fs_client {
struct super_block *sb;
struct list_head metric_wakeup;
struct ceph_mount_options *mount_options;
struct ceph_client *client;
......@@ -116,8 +118,6 @@ struct ceph_fs_client {
struct ceph_mds_client *mdsc;
/* writeback */
mempool_t *wb_pagevec_pool;
atomic_long_t writeback_count;
struct workqueue_struct *inode_wq;
......@@ -353,7 +353,7 @@ struct ceph_inode_info {
unsigned i_dirty_caps, i_flushing_caps; /* mask of dirtied fields */
/*
* Link to the the auth cap's session's s_cap_dirty list. s_cap_dirty
* Link to the auth cap's session's s_cap_dirty list. s_cap_dirty
* is protected by the mdsc->cap_dirty_lock, but each individual item
* is also protected by the inode's i_ceph_lock. Walking s_cap_dirty
* requires the mdsc->cap_dirty_lock. List presence for an item can
......
......@@ -497,10 +497,10 @@ static int __set_xattr(struct ceph_inode_info *ci,
kfree(*newxattr);
*newxattr = NULL;
if (xattr->should_free_val)
kfree((void *)xattr->val);
kfree(xattr->val);
if (update_xattr) {
kfree((void *)name);
kfree(name);
name = xattr->name;
}
ci->i_xattrs.names_size -= xattr->name_len;
......@@ -566,9 +566,9 @@ static void __free_xattr(struct ceph_inode_xattr *xattr)
BUG_ON(!xattr);
if (xattr->should_free_name)
kfree((void *)xattr->name);
kfree(xattr->name);
if (xattr->should_free_val)
kfree((void *)xattr->val);
kfree(xattr->val);
kfree(xattr);
}
......@@ -582,9 +582,9 @@ static int __remove_xattr(struct ceph_inode_info *ci,
rb_erase(&xattr->node, &ci->i_xattrs.index);
if (xattr->should_free_name)
kfree((void *)xattr->name);
kfree(xattr->name);
if (xattr->should_free_val)
kfree((void *)xattr->val);
kfree(xattr->val);
ci->i_xattrs.names_size -= xattr->name_len;
ci->i_xattrs.vals_size -= xattr->val_len;
......
......@@ -58,7 +58,7 @@
* because 10.2.z (jewel) did not care if its peers advertised this
* feature bit.
*
* - In the second phase we stop advertising the the bit and call it
* - In the second phase we stop advertising the bit and call it
* RETIRED. This can normally be done in the *next* major release
* following the one in which we marked the feature DEPRECATED. In
* the above example, for 12.0.z (luminous) we can say:
......
......@@ -130,6 +130,7 @@ struct ceph_dir_layout {
#define CEPH_MSG_CLIENT_REQUEST 24
#define CEPH_MSG_CLIENT_REQUEST_FORWARD 25
#define CEPH_MSG_CLIENT_REPLY 26
#define CEPH_MSG_CLIENT_METRICS 29
#define CEPH_MSG_CLIENT_CAPS 0x310
#define CEPH_MSG_CLIENT_LEASE 0x311
#define CEPH_MSG_CLIENT_SNAP 0x312
......
......@@ -282,6 +282,7 @@ extern struct kmem_cache *ceph_dentry_cachep;
extern struct kmem_cache *ceph_file_cachep;
extern struct kmem_cache *ceph_dir_file_cachep;
extern struct kmem_cache *ceph_mds_request_cachep;
extern mempool_t *ceph_wb_pagevec_pool;
/* ceph_common.c */
extern bool libceph_compatible(void *data);
......
......@@ -404,7 +404,7 @@ void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc);
&__oreq->r_ops[__whch].typ.fld; \
})
extern void osd_req_op_init(struct ceph_osd_request *osd_req,
struct ceph_osd_req_op *osd_req_op_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode, u32 flags);
extern void osd_req_op_raw_data_in_pages(struct ceph_osd_request *,
......
......@@ -17,7 +17,7 @@
* The algorithm was originally described in detail in this paper
* (although the algorithm has evolved somewhat since then):
*
* http://www.ssrc.ucsc.edu/Papers/weil-sc06.pdf
* https://www.ssrc.ucsc.edu/Papers/weil-sc06.pdf
*
* LGPL2
*/
......
......@@ -13,7 +13,7 @@ config CEPH_LIB
common functionality to both the Ceph filesystem and
to the rados block device (rbd).
More information at http://ceph.newdream.net/.
More information at https://ceph.io/.
If unsure, say N.
......
......@@ -4,7 +4,7 @@
/*
* Robert Jenkin's hash function.
* http://burtleburtle.net/bob/hash/evahash.html
* https://burtleburtle.net/bob/hash/evahash.html
* This is in the public domain.
*/
#define mix(a, b, c) \
......
......@@ -7,7 +7,7 @@
/*
* Robert Jenkins' function for mixing 32-bit values
* http://burtleburtle.net/bob/hash/evahash.html
* https://burtleburtle.net/bob/hash/evahash.html
* a, b = random bits, c = input and output
*/
#define crush_hashmix(a, b, c) do { \
......
......@@ -298,7 +298,7 @@ static __u64 crush_ln(unsigned int xin)
*
* for reference, see:
*
* http://en.wikipedia.org/wiki/Exponential_distribution#Distribution_of_the_minimum_of_exponential_random_variables
* https://en.wikipedia.org/wiki/Exponential_distribution#Distribution_of_the_minimum_of_exponential_random_variables
*
*/
......
......@@ -223,6 +223,9 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
if (op->op == CEPH_OSD_OP_WATCH)
seq_printf(s, "-%s",
ceph_osd_watch_op_name(op->watch.op));
else if (op->op == CEPH_OSD_OP_CALL)
seq_printf(s, "-%s/%s", op->cls.class_name,
op->cls.method_name);
}
seq_putc(s, '\n');
......
......@@ -525,7 +525,7 @@ EXPORT_SYMBOL(ceph_osdc_put_request);
static void request_init(struct ceph_osd_request *req)
{
/* req only, each op is zeroed in _osd_req_op_init() */
/* req only, each op is zeroed in osd_req_op_init() */
memset(req, 0, sizeof(*req));
kref_init(&req->r_kref);
......@@ -746,8 +746,8 @@ EXPORT_SYMBOL(ceph_osdc_alloc_messages);
* other information associated with them. It also serves as a
* common init routine for all the other init functions, below.
*/
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
struct ceph_osd_req_op *
osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, u32 flags)
{
struct ceph_osd_req_op *op;
......@@ -762,12 +762,6 @@ _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
return op;
}
void osd_req_op_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode, u32 flags)
{
(void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);
void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
......@@ -775,7 +769,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
u64 offset, u64 length,
u64 truncate_size, u32 truncate_seq)
{
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
opcode, 0);
size_t payload_len = 0;
......@@ -822,7 +816,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
BUG_ON(which + 1 >= osd_req->r_num_ops);
prev_op = &osd_req->r_ops[which];
op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
/* dup previous one */
op->indata_len = prev_op->indata_len;
op->outdata_len = prev_op->outdata_len;
......@@ -845,7 +839,7 @@ int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
size_t size;
int ret;
op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
pagelist = ceph_pagelist_alloc(GFP_NOFS);
if (!pagelist)
......@@ -883,7 +877,7 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *name, const void *value,
size_t size, u8 cmp_op, u8 cmp_mode)
{
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
opcode, 0);
struct ceph_pagelist *pagelist;
size_t payload_len;
......@@ -928,7 +922,7 @@ static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
{
struct ceph_osd_req_op *op;
op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
op->watch.cookie = cookie;
op->watch.op = watch_opcode;
op->watch.gen = 0;
......@@ -943,10 +937,9 @@ void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
u64 expected_write_size,
u32 flags)
{
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
CEPH_OSD_OP_SETALLOCHINT,
0);
struct ceph_osd_req_op *op;
op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
op->alloc_hint.expected_object_size = expected_object_size;
op->alloc_hint.expected_write_size = expected_write_size;
op->alloc_hint.flags = flags;
......@@ -3076,9 +3069,7 @@ static void send_linger(struct ceph_osd_linger_request *lreq)
cancel_linger_request(req);
request_reinit(req);
ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
req->r_flags = lreq->t.flags;
target_copy(&req->r_t, &lreq->t);
req->r_mtime = lreq->mtime;
mutex_lock(&lreq->lock);
......@@ -4801,7 +4792,7 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
struct ceph_pagelist *pl;
int ret;
op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
pl = ceph_pagelist_alloc(GFP_NOIO);
if (!pl)
......@@ -4870,7 +4861,7 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
struct ceph_pagelist *pl;
int ret;
op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
op->notify.cookie = cookie;
pl = ceph_pagelist_alloc(GFP_NOIO);
......@@ -5334,7 +5325,7 @@ static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
if (IS_ERR(pages))
return PTR_ERR(pages);
op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
dst_fadvise_flags);
op->copy_from.snapid = src_snapid;
op->copy_from.src_version = src_version;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment