Commit 2b0a80b0 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-5.1-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "The highlights are:

   - rbd will now ignore discards that aren't aligned and big enough to
     actually free up some space (myself). This is controlled by the new
     alloc_size map option and can be disabled if needed.

   - support for rbd deep-flatten feature (myself). Deep-flatten allows
     "rbd flatten" to fully disconnect the clone image and its snapshots
     from the parent and make the parent snapshot removable.

   - a new round of cap handling improvements (Zheng Yan). The kernel
     client should now be much more prompt about releasing its caps and
     it is possible to put a limit on the number of caps held.

   - support for getting ceph.dir.pin extended attribute (Zheng Yan)"

* tag 'ceph-for-5.1-rc1' of git://github.com/ceph/ceph-client: (26 commits)
  Documentation: modern versions of ceph are not backed by btrfs
  rbd: advertise support for RBD_FEATURE_DEEP_FLATTEN
  rbd: whole-object write and zeroout should copyup when snapshots exist
  rbd: copyup with an empty snapshot context (aka deep-copyup)
  rbd: introduce rbd_obj_issue_copyup_ops()
  rbd: stop copying num_osd_ops in rbd_obj_issue_copyup()
  rbd: factor out __rbd_osd_req_create()
  rbd: clear ->xferred on error from rbd_obj_issue_copyup()
  rbd: remove experimental designation from kernel layering
  ceph: add mount option to limit caps count
  ceph: periodically trim stale dentries
  ceph: delete stale dentry when last reference is dropped
  ceph: remove dentry_lru file from debugfs
  ceph: touch existing cap when handling reply
  ceph: pass inclusive lend parameter to filemap_write_and_wait_range()
  rbd: round off and ignore discards that are too small
  rbd: handle DISCARD and WRITE_ZEROES separately
  rbd: get rid of obj_req->obj_request_count
  libceph: use struct_size() for kmalloc() in crush_decode()
  ceph: send cap releases more aggressively
  ...
parents 92825b02 d11ae8e0
......@@ -22,9 +22,7 @@ In contrast to cluster filesystems like GFS, OCFS2, and GPFS that rely
on symmetric access by all clients to shared block devices, Ceph
separates data and metadata management into independent server
clusters, similar to Lustre. Unlike Lustre, however, metadata and
storage nodes run entirely as user space daemons. Storage nodes
utilize btrfs to store data objects, leveraging its advanced features
(checksumming, metadata replication, etc.). File data is striped
storage nodes run entirely as user space daemons. File data is striped
across storage nodes in large chunks to distribute workload and
facilitate high throughputs. When storage nodes fail, data is
re-replicated in a distributed fashion by the storage nodes themselves
......@@ -118,6 +116,10 @@ Mount Options
of a non-responsive Ceph file system. The default is 30
seconds.
caps_max=X
Specify the maximum number of caps to hold. Unused caps are released
when number of caps exceeds the limit. The default is 0 (no limit)
rbytes
When stat() is called on a directory, set st_size to 'rbytes',
the summation of file sizes over all files nested beneath that
......@@ -160,11 +162,11 @@ More Information
================
For more information on Ceph, see the home page at
http://ceph.newdream.net/
https://ceph.com/
The Linux kernel client source tree is available at
git://ceph.newdream.net/git/ceph-client.git
https://github.com/ceph/ceph-client.git
git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
and the source for the full system is at
git://ceph.newdream.net/git/ceph.git
https://github.com/ceph/ceph.git
......@@ -115,12 +115,14 @@ static int atomic_dec_return_safe(atomic_t *v)
#define RBD_FEATURE_LAYERING (1ULL<<0)
#define RBD_FEATURE_STRIPINGV2 (1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2)
#define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5)
#define RBD_FEATURE_DATA_POOL (1ULL<<7)
#define RBD_FEATURE_OPERATIONS (1ULL<<8)
#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
RBD_FEATURE_STRIPINGV2 | \
RBD_FEATURE_EXCLUSIVE_LOCK | \
RBD_FEATURE_DEEP_FLATTEN | \
RBD_FEATURE_DATA_POOL | \
RBD_FEATURE_OPERATIONS)
......@@ -214,28 +216,40 @@ enum obj_operation_type {
OBJ_OP_READ = 1,
OBJ_OP_WRITE,
OBJ_OP_DISCARD,
OBJ_OP_ZEROOUT,
};
/*
* Writes go through the following state machine to deal with
* layering:
*
* need copyup
* RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
* | ^ |
* v \------------------------------/
* done
* . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
* . | .
* . v .
* . RBD_OBJ_WRITE_READ_FROM_PARENT. . . .
* . | . .
* . v v (deep-copyup .
* (image . RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC . not needed) .
* flattened) v | . .
* . v . .
* . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . . (copyup .
* | not needed) v
* v .
* done . . . . . . . . . . . . . . . . . .
* ^
* |
* RBD_OBJ_WRITE_FLAT
*
* Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
* there is a parent or not.
* assert_exists guard is needed or not (in some cases it's not needed
* even if there is a parent).
*/
enum rbd_obj_write_state {
RBD_OBJ_WRITE_FLAT = 1,
RBD_OBJ_WRITE_GUARD,
RBD_OBJ_WRITE_COPYUP,
RBD_OBJ_WRITE_READ_FROM_PARENT,
RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC,
RBD_OBJ_WRITE_COPYUP_OPS,
};
struct rbd_obj_request {
......@@ -291,7 +305,6 @@ struct rbd_img_request {
int result; /* first nonzero obj_request result */
struct list_head object_extents; /* obj_req.ex structs */
u32 obj_request_count;
u32 pending_count;
struct kref kref;
......@@ -421,6 +434,10 @@ static DEFINE_IDA(rbd_dev_id_ida);
static struct workqueue_struct *rbd_wq;
static struct ceph_snap_context rbd_empty_snapc = {
.nref = REFCOUNT_INIT(1),
};
/*
* single-major requires >= 0.75 version of userspace rbd utility.
*/
......@@ -732,6 +749,7 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
*/
enum {
Opt_queue_depth,
Opt_alloc_size,
Opt_lock_timeout,
Opt_last_int,
/* int args above */
......@@ -748,6 +766,7 @@ enum {
static match_table_t rbd_opts_tokens = {
{Opt_queue_depth, "queue_depth=%d"},
{Opt_alloc_size, "alloc_size=%d"},
{Opt_lock_timeout, "lock_timeout=%d"},
/* int args above */
{Opt_pool_ns, "_pool_ns=%s"},
......@@ -764,6 +783,7 @@ static match_table_t rbd_opts_tokens = {
struct rbd_options {
int queue_depth;
int alloc_size;
unsigned long lock_timeout;
bool read_only;
bool lock_on_read;
......@@ -772,6 +792,7 @@ struct rbd_options {
};
#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
#define RBD_ALLOC_SIZE_DEFAULT (64 * 1024)
#define RBD_LOCK_TIMEOUT_DEFAULT 0 /* no timeout */
#define RBD_READ_ONLY_DEFAULT false
#define RBD_LOCK_ON_READ_DEFAULT false
......@@ -811,6 +832,17 @@ static int parse_rbd_opts_token(char *c, void *private)
}
pctx->opts->queue_depth = intval;
break;
case Opt_alloc_size:
if (intval < 1) {
pr_err("alloc_size out of range\n");
return -EINVAL;
}
if (!is_power_of_2(intval)) {
pr_err("alloc_size must be a power of 2\n");
return -EINVAL;
}
pctx->opts->alloc_size = intval;
break;
case Opt_lock_timeout:
/* 0 is "wait forever" (i.e. infinite timeout) */
if (intval < 0 || intval > INT_MAX / 1000) {
......@@ -857,6 +889,8 @@ static char* obj_op_name(enum obj_operation_type op_type)
return "write";
case OBJ_OP_DISCARD:
return "discard";
case OBJ_OP_ZEROOUT:
return "zeroout";
default:
return "???";
}
......@@ -1344,7 +1378,6 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
/* Image request now owns object's original reference */
obj_request->img_request = img_request;
img_request->obj_request_count++;
img_request->pending_count++;
dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}
......@@ -1354,8 +1387,6 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
{
dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
list_del(&obj_request->ex.oe_item);
rbd_assert(img_request->obj_request_count > 0);
img_request->obj_request_count--;
rbd_assert(obj_request->img_request == img_request);
rbd_obj_request_put(obj_request);
}
......@@ -1409,6 +1440,19 @@ static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
rbd_dev->layout.object_size;
}
/*
* Must be called after rbd_obj_calc_img_extents().
*/
static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
{
if (!obj_req->num_img_extents ||
(rbd_obj_is_entire(obj_req) &&
!obj_req->img_request->snapc->num_snaps))
return false;
return true;
}
static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
return ceph_file_extents_bytes(obj_req->img_extents,
......@@ -1422,6 +1466,7 @@ static bool rbd_img_is_write(struct rbd_img_request *img_req)
return false;
case OBJ_OP_WRITE:
case OBJ_OP_DISCARD:
case OBJ_OP_ZEROOUT:
return true;
default:
BUG();
......@@ -1470,18 +1515,16 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
}
static struct ceph_osd_request *
rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
__rbd_osd_req_create(struct rbd_obj_request *obj_req,
struct ceph_snap_context *snapc, unsigned int num_ops)
{
struct rbd_img_request *img_req = obj_req->img_request;
struct rbd_device *rbd_dev = img_req->rbd_dev;
struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_osd_request *req;
const char *name_format = rbd_dev->image_format == 1 ?
RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
req = ceph_osdc_alloc_request(osdc,
(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
num_ops, false, GFP_NOIO);
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
if (!req)
return NULL;
......@@ -1506,6 +1549,13 @@ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
return NULL;
}
static struct ceph_osd_request *
rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
{
return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
num_ops);
}
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
ceph_osdc_put_request(osd_req);
......@@ -1671,7 +1721,6 @@ static void rbd_img_request_destroy(struct kref *kref)
for_each_obj_request_safe(img_request, obj_request, next_obj_request)
rbd_img_obj_request_del(img_request, obj_request);
rbd_assert(img_request->obj_request_count == 0);
if (img_request_layered_test(img_request)) {
img_request_layered_clear(img_request);
......@@ -1754,7 +1803,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
{
obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
if (!obj_req->osd_req)
return -ENOMEM;
......@@ -1790,6 +1839,11 @@ static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
return 0;
}
static int count_write_ops(struct rbd_obj_request *obj_req)
{
return 2; /* setallochint + write/writefull */
}
static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
unsigned int which)
{
......@@ -1816,6 +1870,7 @@ static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
{
unsigned int num_osd_ops, which = 0;
bool need_guard;
int ret;
/* reverse map the entire object onto the parent */
......@@ -1823,35 +1878,102 @@ static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
if (ret)
return ret;
if (obj_req->num_img_extents) {
obj_req->write_state = RBD_OBJ_WRITE_GUARD;
num_osd_ops = 3; /* stat + setallochint + write/writefull */
} else {
obj_req->write_state = RBD_OBJ_WRITE_FLAT;
num_osd_ops = 2; /* setallochint + write/writefull */
}
need_guard = rbd_obj_copyup_enabled(obj_req);
num_osd_ops = need_guard + count_write_ops(obj_req);
obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
if (!obj_req->osd_req)
return -ENOMEM;
if (obj_req->num_img_extents) {
if (need_guard) {
ret = __rbd_obj_setup_stat(obj_req, which++);
if (ret)
return ret;
obj_req->write_state = RBD_OBJ_WRITE_GUARD;
} else {
obj_req->write_state = RBD_OBJ_WRITE_FLAT;
}
__rbd_obj_setup_write(obj_req, which);
return 0;
}
static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
{
return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
CEPH_OSD_OP_ZERO;
}
static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
{
struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
u64 off = obj_req->ex.oe_off;
u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
int ret;
/*
* Align the range to alloc_size boundary and punt on discards
* that are too small to free up any space.
*
* alloc_size == object_size && is_tail() is a special case for
* filestore with filestore_punch_hole = false, needed to allow
* truncate (in addition to delete).
*/
if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
!rbd_obj_is_tail(obj_req)) {
off = round_up(off, rbd_dev->opts->alloc_size);
next_off = round_down(next_off, rbd_dev->opts->alloc_size);
if (off >= next_off)
return 1;
}
/* reverse map the entire object onto the parent */
ret = rbd_obj_calc_img_extents(obj_req, true);
if (ret)
return ret;
obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
if (!obj_req->osd_req)
return -ENOMEM;
if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
} else {
dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
off, next_off - off);
osd_req_op_extent_init(obj_req->osd_req, 0,
truncate_or_zero_opcode(obj_req),
off, next_off - off, 0, 0);
}
obj_req->write_state = RBD_OBJ_WRITE_FLAT;
rbd_osd_req_format_write(obj_req);
return 0;
}
static int count_zeroout_ops(struct rbd_obj_request *obj_req)
{
int num_osd_ops;
if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
!rbd_obj_copyup_enabled(obj_req))
num_osd_ops = 2; /* create + truncate */
else
num_osd_ops = 1; /* delete/truncate/zero */
return num_osd_ops;
}
static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
unsigned int which)
{
u16 opcode;
if (rbd_obj_is_entire(obj_req)) {
if (obj_req->num_img_extents) {
if (!rbd_obj_copyup_enabled(obj_req))
osd_req_op_init(obj_req->osd_req, which++,
CEPH_OSD_OP_CREATE, 0);
opcode = CEPH_OSD_OP_TRUNCATE;
......@@ -1860,10 +1982,8 @@ static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
CEPH_OSD_OP_DELETE, 0);
opcode = 0;
}
} else if (rbd_obj_is_tail(obj_req)) {
opcode = CEPH_OSD_OP_TRUNCATE;
} else {
opcode = CEPH_OSD_OP_ZERO;
opcode = truncate_or_zero_opcode(obj_req);
}
if (opcode)
......@@ -1875,9 +1995,10 @@ static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
rbd_osd_req_format_write(obj_req);
}
static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
{
unsigned int num_osd_ops, which = 0;
bool need_guard;
int ret;
/* reverse map the entire object onto the parent */
......@@ -1885,33 +2006,24 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
if (ret)
return ret;
if (rbd_obj_is_entire(obj_req)) {
obj_req->write_state = RBD_OBJ_WRITE_FLAT;
if (obj_req->num_img_extents)
num_osd_ops = 2; /* create + truncate */
else
num_osd_ops = 1; /* delete */
} else {
if (obj_req->num_img_extents) {
obj_req->write_state = RBD_OBJ_WRITE_GUARD;
num_osd_ops = 2; /* stat + truncate/zero */
} else {
obj_req->write_state = RBD_OBJ_WRITE_FLAT;
num_osd_ops = 1; /* truncate/zero */
}
}
need_guard = rbd_obj_copyup_enabled(obj_req);
num_osd_ops = need_guard + count_zeroout_ops(obj_req);
obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
if (!obj_req->osd_req)
return -ENOMEM;
if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
if (need_guard) {
ret = __rbd_obj_setup_stat(obj_req, which++);
if (ret)
return ret;
obj_req->write_state = RBD_OBJ_WRITE_GUARD;
} else {
obj_req->write_state = RBD_OBJ_WRITE_FLAT;
}
__rbd_obj_setup_discard(obj_req, which);
__rbd_obj_setup_zeroout(obj_req, which);
return 0;
}
......@@ -1922,10 +2034,10 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
*/
static int __rbd_img_fill_request(struct rbd_img_request *img_req)
{
struct rbd_obj_request *obj_req;
struct rbd_obj_request *obj_req, *next_obj_req;
int ret;
for_each_obj_request(img_req, obj_req) {
for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
switch (img_req->op_type) {
case OBJ_OP_READ:
ret = rbd_obj_setup_read(obj_req);
......@@ -1936,11 +2048,20 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
case OBJ_OP_DISCARD:
ret = rbd_obj_setup_discard(obj_req);
break;
case OBJ_OP_ZEROOUT:
ret = rbd_obj_setup_zeroout(obj_req);
break;
default:
rbd_assert(0);
}
if (ret)
if (ret < 0)
return ret;
if (ret > 0) {
img_req->xferred += obj_req->ex.oe_len;
img_req->pending_count--;
rbd_img_obj_request_del(img_req, obj_req);
continue;
}
ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
if (ret)
......@@ -2356,21 +2477,19 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
return true;
}
static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
#define MODS_ONLY U32_MAX
static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
u32 bytes)
{
unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
int ret;
dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
rbd_assert(bytes > 0 && bytes != MODS_ONLY);
rbd_osd_req_destroy(obj_req->osd_req);
/*
* Create a copyup request with the same number of OSD ops as
* the original request. The original request was stat + op(s),
* the new copyup request will be copyup + the same op(s).
*/
obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1);
if (!obj_req->osd_req)
return -ENOMEM;
......@@ -2378,27 +2497,65 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
if (ret)
return ret;
/*
* Only send non-zero copyup data to save some I/O and network
* bandwidth -- zero copyup data is equivalent to the object not
* existing.
*/
if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
bytes = 0;
}
osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
obj_req->copyup_bvecs,
obj_req->copyup_bvec_count,
bytes);
rbd_osd_req_format_write(obj_req);
switch (obj_req->img_request->op_type) {
ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
if (ret)
return ret;
rbd_obj_request_submit(obj_req);
return 0;
}
static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
{
struct rbd_img_request *img_req = obj_req->img_request;
unsigned int num_osd_ops = (bytes != MODS_ONLY);
unsigned int which = 0;
int ret;
dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT ||
obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL);
rbd_osd_req_destroy(obj_req->osd_req);
switch (img_req->op_type) {
case OBJ_OP_WRITE:
__rbd_obj_setup_write(obj_req, 1);
num_osd_ops += count_write_ops(obj_req);
break;
case OBJ_OP_DISCARD:
rbd_assert(!rbd_obj_is_entire(obj_req));
__rbd_obj_setup_discard(obj_req, 1);
case OBJ_OP_ZEROOUT:
num_osd_ops += count_zeroout_ops(obj_req);
break;
default:
rbd_assert(0);
}
obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
if (!obj_req->osd_req)
return -ENOMEM;
if (bytes != MODS_ONLY) {
ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd",
"copyup");
if (ret)
return ret;
osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++,
obj_req->copyup_bvecs,
obj_req->copyup_bvec_count,
bytes);
}
switch (img_req->op_type) {
case OBJ_OP_WRITE:
__rbd_obj_setup_write(obj_req, which);
break;
case OBJ_OP_ZEROOUT:
__rbd_obj_setup_zeroout(obj_req, which);
break;
default:
rbd_assert(0);
......@@ -2412,6 +2569,33 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
return 0;
}
static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
{
/*
* Only send non-zero copyup data to save some I/O and network
* bandwidth -- zero copyup data is equivalent to the object not
* existing.
*/
if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
bytes = 0;
}
if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
/*
* Send a copyup request with an empty snapshot context to
* deep-copyup the object through all existing snapshots.
* A second request with the current snapshot context will be
* sent for the actual modification.
*/
obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC;
return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes);
}
obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
return rbd_obj_issue_copyup_ops(obj_req, bytes);
}
static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
{
u32 i;
......@@ -2451,22 +2635,19 @@ static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
if (!obj_req->num_img_extents) {
/*
* The overlap has become 0 (most likely because the
* image has been flattened). Use rbd_obj_issue_copyup()
* to re-submit the original write request -- the copyup
* operation itself will be a no-op, since someone must
* have populated the child object while we weren't
* looking. Move to WRITE_FLAT state as we'll be done
* with the operation once the null copyup completes.
* image has been flattened). Re-submit the original write
* request -- pass MODS_ONLY since the copyup isn't needed
* anymore.
*/
obj_req->write_state = RBD_OBJ_WRITE_FLAT;
return rbd_obj_issue_copyup(obj_req, 0);
obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
}
ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
if (ret)
return ret;
obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT;
return rbd_obj_read_from_parent(obj_req);
}
......@@ -2474,7 +2655,6 @@ static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
{
int ret;
again:
switch (obj_req->write_state) {
case RBD_OBJ_WRITE_GUARD:
rbd_assert(!obj_req->xferred);
......@@ -2493,6 +2673,7 @@ static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
}
/* fall through */
case RBD_OBJ_WRITE_FLAT:
case RBD_OBJ_WRITE_COPYUP_OPS:
if (!obj_req->result)
/*
* There is no such thing as a successful short
......@@ -2500,13 +2681,24 @@ static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
*/
obj_req->xferred = obj_req->ex.oe_len;
return true;
case RBD_OBJ_WRITE_COPYUP:
obj_req->write_state = RBD_OBJ_WRITE_GUARD;
case RBD_OBJ_WRITE_READ_FROM_PARENT:
if (obj_req->result)
goto again;
return true;
rbd_assert(obj_req->xferred);
ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
if (ret) {
obj_req->result = ret;
obj_req->xferred = 0;
return true;
}
return false;
case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC:
if (obj_req->result)
return true;
obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
if (ret) {
obj_req->result = ret;
return true;
......@@ -2528,6 +2720,7 @@ static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
case OBJ_OP_WRITE:
return rbd_obj_handle_write(obj_req);
case OBJ_OP_DISCARD:
case OBJ_OP_ZEROOUT:
if (rbd_obj_handle_write(obj_req)) {
/*
* Hide -ENOENT from delete/truncate/zero -- discarding
......@@ -3640,9 +3833,11 @@ static void rbd_queue_workfn(struct work_struct *work)
switch (req_op(rq)) {
case REQ_OP_DISCARD:
case REQ_OP_WRITE_ZEROES:
op_type = OBJ_OP_DISCARD;
break;
case REQ_OP_WRITE_ZEROES:
op_type = OBJ_OP_ZEROOUT;
break;
case REQ_OP_WRITE:
op_type = OBJ_OP_WRITE;
break;
......@@ -3722,12 +3917,12 @@ static void rbd_queue_workfn(struct work_struct *work)
img_request->rq = rq;
snapc = NULL; /* img_request consumes a ref */
if (op_type == OBJ_OP_DISCARD)
if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
result = rbd_img_fill_nodata(img_request, offset, length);
else
result = rbd_img_fill_from_bio(img_request, offset, length,
rq->bio);
if (result)
if (result || !img_request->pending_count)
goto err_img_request;
rbd_img_request_submit(img_request);
......@@ -5388,6 +5583,7 @@ static int rbd_add_parse_args(const char *buf,
pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
......@@ -5795,14 +5991,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret)
goto err_out_probe;
/*
* Need to warn users if this image is the one being
* mapped and has a parent.
*/
if (!depth && rbd_dev->parent_spec)
rbd_warn(rbd_dev,
"WARNING: kernel layering is EXPERIMENTAL!");
}
ret = rbd_dev_probe_parent(rbd_dev, depth);
......@@ -5885,6 +6073,12 @@ static ssize_t do_rbd_add(struct bus_type *bus,
if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
rbd_dev->opts->read_only = true;
if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
rbd_warn(rbd_dev, "alloc_size adjusted to %u",
rbd_dev->layout.object_size);
rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
}
rc = rbd_dev_device_setup(rbd_dev);
if (rc)
goto err_out_image_probe;
......
......@@ -148,11 +148,17 @@ void ceph_caps_finalize(struct ceph_mds_client *mdsc)
spin_unlock(&mdsc->caps_list_lock);
}
void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
struct ceph_mount_options *fsopt)
{
spin_lock(&mdsc->caps_list_lock);
mdsc->caps_min_count += delta;
BUG_ON(mdsc->caps_min_count < 0);
mdsc->caps_min_count = fsopt->max_readdir;
if (mdsc->caps_min_count < 1024)
mdsc->caps_min_count = 1024;
mdsc->caps_use_max = fsopt->caps_max;
if (mdsc->caps_use_max > 0 &&
mdsc->caps_use_max < mdsc->caps_min_count)
mdsc->caps_use_max = mdsc->caps_min_count;
spin_unlock(&mdsc->caps_list_lock);
}
......@@ -272,6 +278,7 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
if (!err) {
BUG_ON(have + alloc != need);
ctx->count = need;
ctx->used = 0;
}
spin_lock(&mdsc->caps_list_lock);
......@@ -297,11 +304,22 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx)
{
bool reclaim = false;
if (!ctx->count)
return;
dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
spin_lock(&mdsc->caps_list_lock);
__ceph_unreserve_caps(mdsc, ctx->count);
ctx->count = 0;
if (mdsc->caps_use_max > 0 &&
mdsc->caps_use_count > mdsc->caps_use_max)
reclaim = true;
spin_unlock(&mdsc->caps_list_lock);
if (reclaim)
ceph_reclaim_caps_nr(mdsc, ctx->used);
}
struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
......@@ -346,6 +364,7 @@ struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
BUG_ON(list_empty(&mdsc->caps_list));
ctx->count--;
ctx->used++;
mdsc->caps_reserve_count--;
mdsc->caps_use_count++;
......@@ -500,12 +519,12 @@ static void __insert_cap_node(struct ceph_inode_info *ci,
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
struct ceph_inode_info *ci)
{
struct ceph_mount_options *ma = mdsc->fsc->mount_options;
struct ceph_mount_options *opt = mdsc->fsc->mount_options;
ci->i_hold_caps_min = round_jiffies(jiffies +
ma->caps_wanted_delay_min * HZ);
opt->caps_wanted_delay_min * HZ);
ci->i_hold_caps_max = round_jiffies(jiffies +
ma->caps_wanted_delay_max * HZ);
opt->caps_wanted_delay_max * HZ);
dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
}
......@@ -657,6 +676,10 @@ void ceph_add_cap(struct inode *inode,
session->s_nr_caps++;
spin_unlock(&session->s_cap_lock);
} else {
spin_lock(&session->s_cap_lock);
list_move_tail(&cap->session_caps, &session->s_caps);
spin_unlock(&session->s_cap_lock);
if (cap->cap_gen < session->s_cap_gen)
cap->issued = cap->implemented = CEPH_CAP_PIN;
......@@ -1081,9 +1104,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
(!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
cap->queue_release = 1;
if (removed) {
list_add_tail(&cap->session_caps,
&session->s_cap_releases);
session->s_num_cap_releases++;
__ceph_queue_cap_release(session, cap);
removed = 0;
}
} else {
......@@ -1245,7 +1266,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
* Queue cap releases when an inode is dropped from our cache. Since
* inode is about to be destroyed, there is no need for i_ceph_lock.
*/
void ceph_queue_caps_release(struct inode *inode)
void __ceph_remove_caps(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct rb_node *p;
......@@ -2393,6 +2414,12 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
if ((cap->issued & ci->i_flushing_caps) !=
ci->i_flushing_caps) {
ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
/* encode_caps_cb() also will reset these sequence
* numbers. make sure sequence numbers in cap flush
* message match later reconnect message */
cap->seq = 0;
cap->issue_seq = 0;
cap->mseq = 0;
__kick_flushing_caps(mdsc, session, ci,
oldest_flush_tid);
} else {
......@@ -3880,12 +3907,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
cap->seq = seq;
cap->issue_seq = seq;
spin_lock(&session->s_cap_lock);
list_add_tail(&cap->session_caps,
&session->s_cap_releases);
session->s_num_cap_releases++;
__ceph_queue_cap_release(session, cap);
spin_unlock(&session->s_cap_lock);
}
goto flush_cap_releases;
goto done;
}
/* these will work even if we don't have a cap yet */
......@@ -3955,7 +3980,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
ceph_cap_op_name(op));
}
goto done;
done:
mutex_unlock(&session->s_mutex);
done_unlocked:
iput(inode);
ceph_put_string(extra_info.pool_ns);
return;
flush_cap_releases:
/*
......@@ -3963,14 +3993,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
* along for the mds (who clearly thinks we still have this
* cap).
*/
ceph_send_cap_releases(mdsc, session);
done:
mutex_unlock(&session->s_mutex);
done_unlocked:
iput(inode);
ceph_put_string(extra_info.pool_ns);
return;
ceph_flush_cap_releases(mdsc, session);
goto done;
bad:
pr_err("ceph_handle_caps: corrupt message\n");
......
......@@ -139,23 +139,6 @@ static int caps_show(struct seq_file *s, void *p)
return 0;
}
static int dentry_lru_show(struct seq_file *s, void *ptr)
{
struct ceph_fs_client *fsc = s->private;
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_dentry_info *di;
spin_lock(&mdsc->dentry_lru_lock);
list_for_each_entry(di, &mdsc->dentry_lru, lru) {
struct dentry *dentry = di->dentry;
seq_printf(s, "%p %p\t%pd\n",
di, dentry, dentry);
}
spin_unlock(&mdsc->dentry_lru_lock);
return 0;
}
static int mds_sessions_show(struct seq_file *s, void *ptr)
{
struct ceph_fs_client *fsc = s->private;
......@@ -195,7 +178,6 @@ static int mds_sessions_show(struct seq_file *s, void *ptr)
CEPH_DEFINE_SHOW_FUNC(mdsmap_show)
CEPH_DEFINE_SHOW_FUNC(mdsc_show)
CEPH_DEFINE_SHOW_FUNC(caps_show)
CEPH_DEFINE_SHOW_FUNC(dentry_lru_show)
CEPH_DEFINE_SHOW_FUNC(mds_sessions_show)
......@@ -231,7 +213,6 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
debugfs_remove(fsc->debugfs_mds_sessions);
debugfs_remove(fsc->debugfs_caps);
debugfs_remove(fsc->debugfs_mdsc);
debugfs_remove(fsc->debugfs_dentry_lru);
}
int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
......@@ -291,14 +272,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
if (!fsc->debugfs_caps)
goto out;
fsc->debugfs_dentry_lru = debugfs_create_file("dentry_lru",
0400,
fsc->client->debugfs_dir,
fsc,
&dentry_lru_show_fops);
if (!fsc->debugfs_dentry_lru)
goto out;
return 0;
out:
......
......@@ -29,6 +29,9 @@
const struct dentry_operations ceph_dentry_ops;
static bool __dentry_lease_is_valid(struct ceph_dentry_info *di);
static int __dir_lease_try_check(const struct dentry *dentry);
/*
* Initialize ceph dentry state.
*/
......@@ -44,7 +47,7 @@ static int ceph_d_init(struct dentry *dentry)
di->lease_session = NULL;
di->time = jiffies;
dentry->d_fsdata = di;
ceph_dentry_lru_add(dentry);
INIT_LIST_HEAD(&di->lease_list);
return 0;
}
......@@ -241,6 +244,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
goto out;
}
if (fpos_cmp(ctx->pos, di->offset) <= 0) {
__ceph_dentry_dir_lease_touch(di);
emit_dentry = true;
}
spin_unlock(&dentry->d_lock);
......@@ -1124,14 +1128,278 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
return err;
}
/*
* Move dentry to tail of mdsc->dentry_leases list when lease is updated.
* Leases at front of the list will expire first. (Assume all leases have
* similar duration)
*
* Called under dentry->d_lock.
*/
void __ceph_dentry_lease_touch(struct ceph_dentry_info *di)
{
struct dentry *dn = di->dentry;
struct ceph_mds_client *mdsc;
dout("dentry_lease_touch %p %p '%pd'\n", di, dn, dn);
di->flags |= CEPH_DENTRY_LEASE_LIST;
if (di->flags & CEPH_DENTRY_SHRINK_LIST) {
di->flags |= CEPH_DENTRY_REFERENCED;
return;
}
mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
spin_lock(&mdsc->dentry_list_lock);
list_move_tail(&di->lease_list, &mdsc->dentry_leases);
spin_unlock(&mdsc->dentry_list_lock);
}
static void __dentry_dir_lease_touch(struct ceph_mds_client* mdsc,
struct ceph_dentry_info *di)
{
di->flags &= ~(CEPH_DENTRY_LEASE_LIST | CEPH_DENTRY_REFERENCED);
di->lease_gen = 0;
di->time = jiffies;
list_move_tail(&di->lease_list, &mdsc->dentry_dir_leases);
}
/*
* When dir lease is used, add dentry to tail of mdsc->dentry_dir_leases
* list if it's not in the list, otherwise set 'referenced' flag.
*
* Called under dentry->d_lock.
*/
void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di)
{
struct dentry *dn = di->dentry;
struct ceph_mds_client *mdsc;
dout("dentry_dir_lease_touch %p %p '%pd' (offset %lld)\n",
di, dn, dn, di->offset);
if (!list_empty(&di->lease_list)) {
if (di->flags & CEPH_DENTRY_LEASE_LIST) {
/* don't remove dentry from dentry lease list
* if its lease is valid */
if (__dentry_lease_is_valid(di))
return;
} else {
di->flags |= CEPH_DENTRY_REFERENCED;
return;
}
}
if (di->flags & CEPH_DENTRY_SHRINK_LIST) {
di->flags |= CEPH_DENTRY_REFERENCED;
di->flags &= ~CEPH_DENTRY_LEASE_LIST;
return;
}
mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
spin_lock(&mdsc->dentry_list_lock);
__dentry_dir_lease_touch(mdsc, di),
spin_unlock(&mdsc->dentry_list_lock);
}
static void __dentry_lease_unlist(struct ceph_dentry_info *di)
{
struct ceph_mds_client *mdsc;
if (di->flags & CEPH_DENTRY_SHRINK_LIST)
return;
if (list_empty(&di->lease_list))
return;
mdsc = ceph_sb_to_client(di->dentry->d_sb)->mdsc;
spin_lock(&mdsc->dentry_list_lock);
list_del_init(&di->lease_list);
spin_unlock(&mdsc->dentry_list_lock);
}
enum {
KEEP = 0,
DELETE = 1,
TOUCH = 2,
STOP = 4,
};
struct ceph_lease_walk_control {
bool dir_lease;
bool expire_dir_lease;
unsigned long nr_to_scan;
unsigned long dir_lease_ttl;
};
static unsigned long
__dentry_leases_walk(struct ceph_mds_client *mdsc,
struct ceph_lease_walk_control *lwc,
int (*check)(struct dentry*, void*))
{
struct ceph_dentry_info *di, *tmp;
struct dentry *dentry, *last = NULL;
struct list_head* list;
LIST_HEAD(dispose);
unsigned long freed = 0;
int ret = 0;
list = lwc->dir_lease ? &mdsc->dentry_dir_leases : &mdsc->dentry_leases;
spin_lock(&mdsc->dentry_list_lock);
list_for_each_entry_safe(di, tmp, list, lease_list) {
if (!lwc->nr_to_scan)
break;
--lwc->nr_to_scan;
dentry = di->dentry;
if (last == dentry)
break;
if (!spin_trylock(&dentry->d_lock))
continue;
if (dentry->d_lockref.count < 0) {
list_del_init(&di->lease_list);
goto next;
}
ret = check(dentry, lwc);
if (ret & TOUCH) {
/* move it into tail of dir lease list */
__dentry_dir_lease_touch(mdsc, di);
if (!last)
last = dentry;
}
if (ret & DELETE) {
/* stale lease */
di->flags &= ~CEPH_DENTRY_REFERENCED;
if (dentry->d_lockref.count > 0) {
/* update_dentry_lease() will re-add
* it to lease list, or
* ceph_d_delete() will return 1 when
* last reference is dropped */
list_del_init(&di->lease_list);
} else {
di->flags |= CEPH_DENTRY_SHRINK_LIST;
list_move_tail(&di->lease_list, &dispose);
dget_dlock(dentry);
}
}
next:
spin_unlock(&dentry->d_lock);
if (ret & STOP)
break;
}
spin_unlock(&mdsc->dentry_list_lock);
while (!list_empty(&dispose)) {
di = list_first_entry(&dispose, struct ceph_dentry_info,
lease_list);
dentry = di->dentry;
spin_lock(&dentry->d_lock);
list_del_init(&di->lease_list);
di->flags &= ~CEPH_DENTRY_SHRINK_LIST;
if (di->flags & CEPH_DENTRY_REFERENCED) {
spin_lock(&mdsc->dentry_list_lock);
if (di->flags & CEPH_DENTRY_LEASE_LIST) {
list_add_tail(&di->lease_list,
&mdsc->dentry_leases);
} else {
__dentry_dir_lease_touch(mdsc, di);
}
spin_unlock(&mdsc->dentry_list_lock);
} else {
freed++;
}
spin_unlock(&dentry->d_lock);
/* ceph_d_delete() does the trick */
dput(dentry);
}
return freed;
}
static int __dentry_lease_check(struct dentry *dentry, void *arg)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
int ret;
if (__dentry_lease_is_valid(di))
return STOP;
ret = __dir_lease_try_check(dentry);
if (ret == -EBUSY)
return KEEP;
if (ret > 0)
return TOUCH;
return DELETE;
}
static int __dir_lease_check(struct dentry *dentry, void *arg)
{
struct ceph_lease_walk_control *lwc = arg;
struct ceph_dentry_info *di = ceph_dentry(dentry);
int ret = __dir_lease_try_check(dentry);
if (ret == -EBUSY)
return KEEP;
if (ret > 0) {
if (time_before(jiffies, di->time + lwc->dir_lease_ttl))
return STOP;
/* Move dentry to tail of dir lease list if we don't want
* to delete it. So dentries in the list are checked in a
* round robin manner */
if (!lwc->expire_dir_lease)
return TOUCH;
if (dentry->d_lockref.count > 0 ||
(di->flags & CEPH_DENTRY_REFERENCED))
return TOUCH;
/* invalidate dir lease */
di->lease_shared_gen = 0;
}
return DELETE;
}
int ceph_trim_dentries(struct ceph_mds_client *mdsc)
{
struct ceph_lease_walk_control lwc;
unsigned long count;
unsigned long freed;
spin_lock(&mdsc->caps_list_lock);
if (mdsc->caps_use_max > 0 &&
mdsc->caps_use_count > mdsc->caps_use_max)
count = mdsc->caps_use_count - mdsc->caps_use_max;
else
count = 0;
spin_unlock(&mdsc->caps_list_lock);
lwc.dir_lease = false;
lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE * 2;
freed = __dentry_leases_walk(mdsc, &lwc, __dentry_lease_check);
if (!lwc.nr_to_scan) /* more invalid leases */
return -EAGAIN;
if (lwc.nr_to_scan < CEPH_CAPS_PER_RELEASE)
lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE;
lwc.dir_lease = true;
lwc.expire_dir_lease = freed < count;
lwc.dir_lease_ttl = mdsc->fsc->mount_options->caps_wanted_delay_max * HZ;
freed +=__dentry_leases_walk(mdsc, &lwc, __dir_lease_check);
if (!lwc.nr_to_scan) /* more to check */
return -EAGAIN;
return freed > 0 ? 1 : 0;
}
/*
* Ensure a dentry lease will no longer revalidate.
*/
void ceph_invalidate_dentry_lease(struct dentry *dentry)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
spin_lock(&dentry->d_lock);
ceph_dentry(dentry)->time = jiffies;
ceph_dentry(dentry)->lease_shared_gen = 0;
di->time = jiffies;
di->lease_shared_gen = 0;
__dentry_lease_unlist(di);
spin_unlock(&dentry->d_lock);
}
......@@ -1139,30 +1407,45 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry)
* Check if dentry lease is valid. If not, delete the lease. Try to
* renew if the least is more than half up.
*/
static bool __dentry_lease_is_valid(struct ceph_dentry_info *di)
{
struct ceph_mds_session *session;
if (!di->lease_gen)
return false;
session = di->lease_session;
if (session) {
u32 gen;
unsigned long ttl;
spin_lock(&session->s_gen_ttl_lock);
gen = session->s_cap_gen;
ttl = session->s_cap_ttl;
spin_unlock(&session->s_gen_ttl_lock);
if (di->lease_gen == gen &&
time_before(jiffies, ttl) &&
time_before(jiffies, di->time))
return true;
}
di->lease_gen = 0;
return false;
}
static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
struct inode *dir)
{
struct ceph_dentry_info *di;
struct ceph_mds_session *s;
int valid = 0;
u32 gen;
unsigned long ttl;
struct ceph_mds_session *session = NULL;
u32 seq = 0;
int valid = 0;
spin_lock(&dentry->d_lock);
di = ceph_dentry(dentry);
if (di && di->lease_session) {
s = di->lease_session;
spin_lock(&s->s_gen_ttl_lock);
gen = s->s_cap_gen;
ttl = s->s_cap_ttl;
spin_unlock(&s->s_gen_ttl_lock);
if (di->lease_gen == gen &&
time_before(jiffies, di->time) &&
time_before(jiffies, ttl)) {
if (di && __dentry_lease_is_valid(di)) {
valid = 1;
if (di->lease_renew_after &&
time_after(jiffies, di->lease_renew_after)) {
/*
......@@ -1173,14 +1456,13 @@ static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
if (flags & LOOKUP_RCU) {
valid = -ECHILD;
} else {
session = ceph_get_mds_session(s);
session = ceph_get_mds_session(di->lease_session);
seq = di->lease_seq;
di->lease_renew_after = 0;
di->lease_renew_from = jiffies;
}
}
}
}
spin_unlock(&dentry->d_lock);
if (session) {
......@@ -1192,6 +1474,38 @@ static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
return valid;
}
/*
* Called under dentry->d_lock.
*/
static int __dir_lease_try_check(const struct dentry *dentry)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
struct inode *dir;
struct ceph_inode_info *ci;
int valid = 0;
if (!di->lease_shared_gen)
return 0;
if (IS_ROOT(dentry))
return 0;
dir = d_inode(dentry->d_parent);
ci = ceph_inode(dir);
if (spin_trylock(&ci->i_ceph_lock)) {
if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 0))
valid = 1;
spin_unlock(&ci->i_ceph_lock);
} else {
valid = -EBUSY;
}
if (!valid)
di->lease_shared_gen = 0;
return valid;
}
/*
* Check if directory-wide content lease/cap is valid.
*/
......@@ -1205,6 +1519,8 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen)
valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
spin_unlock(&ci->i_ceph_lock);
if (valid)
__ceph_dentry_dir_lease_touch(di);
dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
dir, (unsigned)atomic_read(&ci->i_shared_gen),
dentry, (unsigned)di->lease_shared_gen, valid);
......@@ -1297,17 +1613,39 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
}
dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
if (valid) {
ceph_dentry_lru_touch(dentry);
} else {
if (!valid)
ceph_dir_clear_complete(dir);
}
if (!(flags & LOOKUP_RCU))
dput(parent);
return valid;
}
/*
* Delete unused dentry that doesn't have valid lease
*
* Called under dentry->d_lock.
*/
static int ceph_d_delete(const struct dentry *dentry)
{
struct ceph_dentry_info *di;
/* won't release caps */
if (d_really_is_negative(dentry))
return 0;
if (ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
return 0;
/* vaild lease? */
di = ceph_dentry(dentry);
if (di) {
if (__dentry_lease_is_valid(di))
return 0;
if (__dir_lease_try_check(dentry))
return 0;
}
return 1;
}
/*
* Release our ceph_dentry_info.
*/
......@@ -1316,9 +1654,9 @@ static void ceph_d_release(struct dentry *dentry)
struct ceph_dentry_info *di = ceph_dentry(dentry);
dout("d_release %p\n", dentry);
ceph_dentry_lru_del(dentry);
spin_lock(&dentry->d_lock);
__dentry_lease_unlist(di);
dentry->d_fsdata = NULL;
spin_unlock(&dentry->d_lock);
......@@ -1419,49 +1757,7 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
return size - left;
}
/*
* We maintain a private dentry LRU.
*
* FIXME: this needs to be changed to a per-mds lru to be useful.
*/
void ceph_dentry_lru_add(struct dentry *dn)
{
struct ceph_dentry_info *di = ceph_dentry(dn);
struct ceph_mds_client *mdsc;
dout("dentry_lru_add %p %p '%pd'\n", di, dn, dn);
mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
spin_lock(&mdsc->dentry_lru_lock);
list_add_tail(&di->lru, &mdsc->dentry_lru);
mdsc->num_dentry++;
spin_unlock(&mdsc->dentry_lru_lock);
}
void ceph_dentry_lru_touch(struct dentry *dn)
{
struct ceph_dentry_info *di = ceph_dentry(dn);
struct ceph_mds_client *mdsc;
dout("dentry_lru_touch %p %p '%pd' (offset %lld)\n", di, dn, dn,
di->offset);
mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
spin_lock(&mdsc->dentry_lru_lock);
list_move_tail(&di->lru, &mdsc->dentry_lru);
spin_unlock(&mdsc->dentry_lru_lock);
}
void ceph_dentry_lru_del(struct dentry *dn)
{
struct ceph_dentry_info *di = ceph_dentry(dn);
struct ceph_mds_client *mdsc;
dout("dentry_lru_del %p %p '%pd'\n", di, dn, dn);
mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
spin_lock(&mdsc->dentry_lru_lock);
list_del_init(&di->lru);
mdsc->num_dentry--;
spin_unlock(&mdsc->dentry_lru_lock);
}
/*
* Return name hash for a given dentry. This is dependent on
......@@ -1531,6 +1827,7 @@ const struct inode_operations ceph_snapdir_iops = {
const struct dentry_operations ceph_dentry_ops = {
.d_revalidate = ceph_d_revalidate,
.d_delete = ceph_d_delete,
.d_release = ceph_d_release,
.d_prune = ceph_d_prune,
.d_init = ceph_d_init,
......
......@@ -590,7 +590,8 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
* but it will at least behave sensibly when they are
* in sequence.
*/
ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len);
ret = filemap_write_and_wait_range(inode->i_mapping,
off, off + len - 1);
if (ret < 0)
return ret;
......@@ -929,14 +930,15 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
(write ? "write" : "read"), file, pos, (unsigned)count,
snapc, snapc->seq);
ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
ret = filemap_write_and_wait_range(inode->i_mapping,
pos, pos + count - 1);
if (ret < 0)
return ret;
if (write) {
int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
pos >> PAGE_SHIFT,
(pos + count) >> PAGE_SHIFT);
(pos + count - 1) >> PAGE_SHIFT);
if (ret2 < 0)
dout("invalidate_inode_pages2_range returned %d\n", ret2);
......@@ -1132,13 +1134,14 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
dout("sync_write on file %p %lld~%u snapc %p seq %lld\n",
file, pos, (unsigned)count, snapc, snapc->seq);
ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
ret = filemap_write_and_wait_range(inode->i_mapping,
pos, pos + count - 1);
if (ret < 0)
return ret;
ret = invalidate_inode_pages2_range(inode->i_mapping,
pos >> PAGE_SHIFT,
(pos + count) >> PAGE_SHIFT);
(pos + count - 1) >> PAGE_SHIFT);
if (ret < 0)
dout("invalidate_inode_pages2_range returned %d\n", ret);
......
......@@ -497,7 +497,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_wrbuffer_ref = 0;
ci->i_wrbuffer_ref_head = 0;
atomic_set(&ci->i_filelock_ref, 0);
atomic_set(&ci->i_shared_gen, 0);
atomic_set(&ci->i_shared_gen, 1);
ci->i_rdcache_gen = 0;
ci->i_rdcache_revoking = 0;
......@@ -537,7 +537,7 @@ void ceph_destroy_inode(struct inode *inode)
ceph_fscache_unregister_inode_cookie(ci);
ceph_queue_caps_release(inode);
__ceph_remove_caps(inode);
if (__ceph_has_any_quota(ci))
ceph_adjust_quota_realms_count(inode, false);
......@@ -548,10 +548,11 @@ void ceph_destroy_inode(struct inode *inode)
*/
if (ci->i_snap_realm) {
struct ceph_mds_client *mdsc =
ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
ceph_inode_to_client(inode)->mdsc;
if (ceph_snap(inode) == CEPH_NOSNAP) {
struct ceph_snap_realm *realm = ci->i_snap_realm;
dout(" dropping residual ref to snap realm %p\n", realm);
dout(" dropping residual ref to snap realm %p\n",
realm);
spin_lock(&realm->inodes_with_caps_lock);
list_del_init(&ci->i_snap_realm_item);
ci->i_snap_realm = NULL;
......@@ -559,6 +560,10 @@ void ceph_destroy_inode(struct inode *inode)
realm->inode = NULL;
spin_unlock(&realm->inodes_with_caps_lock);
ceph_put_snap_realm(mdsc, realm);
} else {
ceph_put_snapid_map(mdsc, ci->i_snapid_map);
ci->i_snap_realm = NULL;
}
}
kfree(ci->i_symlink);
......@@ -776,6 +781,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
iinfo->pool_ns_len);
if (ceph_snap(inode) != CEPH_NOSNAP && !ci->i_snapid_map)
ci->i_snapid_map = ceph_get_snapid_map(mdsc, ceph_snap(inode));
spin_lock(&ci->i_ceph_lock);
/*
......@@ -869,6 +877,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
ci->i_rbytes = le64_to_cpu(info->rbytes);
ci->i_rfiles = le64_to_cpu(info->rfiles);
ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
ci->i_dir_pin = iinfo->dir_pin;
ceph_decode_timespec64(&ci->i_rctime, &info->rctime);
}
}
......@@ -899,6 +908,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
case S_IFBLK:
case S_IFCHR:
case S_IFSOCK:
inode->i_blkbits = PAGE_SHIFT;
init_special_inode(inode, inode->i_mode, inode->i_rdev);
inode->i_op = &ceph_file_iops;
break;
......@@ -1066,9 +1076,10 @@ static void update_dentry_lease(struct dentry *dentry,
goto out_unlock;
di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
if (duration == 0)
if (duration == 0) {
__ceph_dentry_dir_lease_touch(di);
goto out_unlock;
}
if (di->lease_gen == session->s_cap_gen &&
time_before(ttl, di->time))
......@@ -1079,8 +1090,6 @@ static void update_dentry_lease(struct dentry *dentry,
di->lease_session = NULL;
}
ceph_dentry_lru_touch(dentry);
if (!di->lease_session)
di->lease_session = ceph_get_mds_session(session);
di->lease_gen = session->s_cap_gen;
......@@ -1088,6 +1097,8 @@ static void update_dentry_lease(struct dentry *dentry,
di->lease_renew_after = half_ttl;
di->lease_renew_from = 0;
di->time = ttl;
__ceph_dentry_lease_touch(di);
out_unlock:
spin_unlock(&dentry->d_lock);
if (old_lease_session)
......@@ -2259,10 +2270,11 @@ int ceph_getattr(const struct path *path, struct kstat *stat,
if (!err) {
generic_fillattr(inode, stat);
stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
if (ceph_snap(inode) != CEPH_NOSNAP)
stat->dev = ceph_snap(inode);
if (ceph_snap(inode) == CEPH_NOSNAP)
stat->dev = inode->i_sb->s_dev;
else
stat->dev = 0;
stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0;
if (S_ISDIR(inode->i_mode)) {
if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
RBYTES))
......
......@@ -20,6 +20,8 @@
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
/*
* A cluster of MDS (metadata server) daemons is responsible for
* managing the file system namespace (the directory hierarchy and
......@@ -46,13 +48,17 @@
*/
struct ceph_reconnect_state {
int nr_caps;
struct ceph_mds_session *session;
int nr_caps, nr_realms;
struct ceph_pagelist *pagelist;
unsigned msg_version;
bool allow_multi;
};
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);
static const struct ceph_connection_operations mds_con_ops;
......@@ -61,6 +67,29 @@ static const struct ceph_connection_operations mds_con_ops;
* mds reply parsing
*/
static int parse_reply_info_quota(void **p, void *end,
struct ceph_mds_reply_info_in *info)
{
u8 struct_v, struct_compat;
u32 struct_len;
ceph_decode_8_safe(p, end, struct_v, bad);
ceph_decode_8_safe(p, end, struct_compat, bad);
/* struct_v is expected to be >= 1. we only
* understand encoding with struct_compat == 1. */
if (!struct_v || struct_compat != 1)
goto bad;
ceph_decode_32_safe(p, end, struct_len, bad);
ceph_decode_need(p, end, struct_len, bad);
end = *p + struct_len;
ceph_decode_64_safe(p, end, info->max_bytes, bad);
ceph_decode_64_safe(p, end, info->max_files, bad);
*p = end;
return 0;
bad:
return -EIO;
}
/*
* parse individual inode info
*/
......@@ -68,8 +97,24 @@ static int parse_reply_info_in(void **p, void *end,
struct ceph_mds_reply_info_in *info,
u64 features)
{
int err = -EIO;
int err = 0;
u8 struct_v = 0;
if (features == (u64)-1) {
u32 struct_len;
u8 struct_compat;
ceph_decode_8_safe(p, end, struct_v, bad);
ceph_decode_8_safe(p, end, struct_compat, bad);
/* struct_v is expected to be >= 1. we only understand
* encoding with struct_compat == 1. */
if (!struct_v || struct_compat != 1)
goto bad;
ceph_decode_32_safe(p, end, struct_len, bad);
ceph_decode_need(p, end, struct_len, bad);
end = *p + struct_len;
}
ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
info->in = *p;
*p += sizeof(struct ceph_mds_reply_inode) +
sizeof(*info->in->fragtree.splits) *
......@@ -87,6 +132,42 @@ static int parse_reply_info_in(void **p, void *end,
info->xattr_data = *p;
*p += info->xattr_len;
if (features == (u64)-1) {
/* inline data */
ceph_decode_64_safe(p, end, info->inline_version, bad);
ceph_decode_32_safe(p, end, info->inline_len, bad);
ceph_decode_need(p, end, info->inline_len, bad);
info->inline_data = *p;
*p += info->inline_len;
/* quota */
err = parse_reply_info_quota(p, end, info);
if (err < 0)
goto out_bad;
/* pool namespace */
ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
if (info->pool_ns_len > 0) {
ceph_decode_need(p, end, info->pool_ns_len, bad);
info->pool_ns_data = *p;
*p += info->pool_ns_len;
}
/* btime, change_attr */
{
struct ceph_timespec btime;
u64 change_attr;
ceph_decode_need(p, end, sizeof(btime), bad);
ceph_decode_copy(p, &btime, sizeof(btime));
ceph_decode_64_safe(p, end, change_attr, bad);
}
/* dir pin */
if (struct_v >= 2) {
ceph_decode_32_safe(p, end, info->dir_pin, bad);
} else {
info->dir_pin = -ENODATA;
}
*p = end;
} else {
if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
ceph_decode_64_safe(p, end, info->inline_version, bad);
ceph_decode_32_safe(p, end, info->inline_len, bad);
......@@ -97,20 +178,9 @@ static int parse_reply_info_in(void **p, void *end,
info->inline_version = CEPH_INLINE_NONE;
if (features & CEPH_FEATURE_MDS_QUOTA) {
u8 struct_v, struct_compat;
u32 struct_len;
/*
* both struct_v and struct_compat are expected to be >= 1
*/
ceph_decode_8_safe(p, end, struct_v, bad);
ceph_decode_8_safe(p, end, struct_compat, bad);
if (!struct_v || !struct_compat)
goto bad;
ceph_decode_32_safe(p, end, struct_len, bad);
ceph_decode_need(p, end, struct_len, bad);
ceph_decode_64_safe(p, end, info->max_bytes, bad);
ceph_decode_64_safe(p, end, info->max_files, bad);
err = parse_reply_info_quota(p, end, info);
if (err < 0)
goto out_bad;
} else {
info->max_bytes = 0;
info->max_files = 0;
......@@ -127,11 +197,73 @@ static int parse_reply_info_in(void **p, void *end,
}
}
info->dir_pin = -ENODATA;
}
return 0;
bad:
err = -EIO;
out_bad:
return err;
}
static int parse_reply_info_dir(void **p, void *end,
struct ceph_mds_reply_dirfrag **dirfrag,
u64 features)
{
if (features == (u64)-1) {
u8 struct_v, struct_compat;
u32 struct_len;
ceph_decode_8_safe(p, end, struct_v, bad);
ceph_decode_8_safe(p, end, struct_compat, bad);
/* struct_v is expected to be >= 1. we only understand
* encoding whose struct_compat == 1. */
if (!struct_v || struct_compat != 1)
goto bad;
ceph_decode_32_safe(p, end, struct_len, bad);
ceph_decode_need(p, end, struct_len, bad);
end = *p + struct_len;
}
ceph_decode_need(p, end, sizeof(**dirfrag), bad);
*dirfrag = *p;
*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
if (unlikely(*p > end))
goto bad;
if (features == (u64)-1)
*p = end;
return 0;
bad:
return -EIO;
}
static int parse_reply_info_lease(void **p, void *end,
struct ceph_mds_reply_lease **lease,
u64 features)
{
if (features == (u64)-1) {
u8 struct_v, struct_compat;
u32 struct_len;
ceph_decode_8_safe(p, end, struct_v, bad);
ceph_decode_8_safe(p, end, struct_compat, bad);
/* struct_v is expected to be >= 1. we only understand
* encoding whose struct_compat == 1. */
if (!struct_v || struct_compat != 1)
goto bad;
ceph_decode_32_safe(p, end, struct_len, bad);
ceph_decode_need(p, end, struct_len, bad);
end = *p + struct_len;
}
ceph_decode_need(p, end, sizeof(**lease), bad);
*lease = *p;
*p += sizeof(**lease);
if (features == (u64)-1)
*p = end;
return 0;
bad:
return -EIO;
}
/*
* parse a normal reply, which may contain a (dir+)dentry and/or a
* target inode.
......@@ -147,20 +279,18 @@ static int parse_reply_info_trace(void **p, void *end,
if (err < 0)
goto out_bad;
if (unlikely(*p + sizeof(*info->dirfrag) > end))
goto bad;
info->dirfrag = *p;
*p += sizeof(*info->dirfrag) +
sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
if (unlikely(*p > end))
goto bad;
err = parse_reply_info_dir(p, end, &info->dirfrag, features);
if (err < 0)
goto out_bad;
ceph_decode_32_safe(p, end, info->dname_len, bad);
ceph_decode_need(p, end, info->dname_len, bad);
info->dname = *p;
*p += info->dname_len;
info->dlease = *p;
*p += sizeof(*info->dlease);
err = parse_reply_info_lease(p, end, &info->dlease, features);
if (err < 0)
goto out_bad;
}
if (info->head->is_target) {
......@@ -183,20 +313,16 @@ static int parse_reply_info_trace(void **p, void *end,
/*
* parse readdir results
*/
static int parse_reply_info_dir(void **p, void *end,
static int parse_reply_info_readdir(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
u64 features)
{
u32 num, i = 0;
int err;
info->dir_dir = *p;
if (*p + sizeof(*info->dir_dir) > end)
goto bad;
*p += sizeof(*info->dir_dir) +
sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
if (*p > end)
goto bad;
err = parse_reply_info_dir(p, end, &info->dir_dir, features);
if (err < 0)
goto out_bad;
ceph_decode_need(p, end, sizeof(num) + 2, bad);
num = ceph_decode_32(p);
......@@ -222,15 +348,16 @@ static int parse_reply_info_dir(void **p, void *end,
while (num) {
struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
/* dentry */
ceph_decode_need(p, end, sizeof(u32)*2, bad);
rde->name_len = ceph_decode_32(p);
ceph_decode_32_safe(p, end, rde->name_len, bad);
ceph_decode_need(p, end, rde->name_len, bad);
rde->name = *p;
*p += rde->name_len;
dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
rde->lease = *p;
*p += sizeof(struct ceph_mds_reply_lease);
/* dentry lease */
err = parse_reply_info_lease(p, end, &rde->lease, features);
if (err)
goto out_bad;
/* inode */
err = parse_reply_info_in(p, end, &rde->inode, features);
if (err < 0)
......@@ -281,7 +408,8 @@ static int parse_reply_info_create(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
u64 features)
{
if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
if (features == (u64)-1 ||
(features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
if (*p == end) {
info->has_create_ino = false;
} else {
......@@ -310,7 +438,7 @@ static int parse_reply_info_extra(void **p, void *end,
if (op == CEPH_MDS_OP_GETFILELOCK)
return parse_reply_info_filelock(p, end, info, features);
else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
return parse_reply_info_dir(p, end, info, features);
return parse_reply_info_readdir(p, end, info, features);
else if (op == CEPH_MDS_OP_CREATE)
return parse_reply_info_create(p, end, info, features);
else
......@@ -494,7 +622,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
spin_lock_init(&s->s_gen_ttl_lock);
s->s_cap_gen = 0;
s->s_cap_gen = 1;
s->s_cap_ttl = jiffies - 1;
spin_lock_init(&s->s_cap_lock);
......@@ -510,6 +638,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
INIT_LIST_HEAD(&s->s_cap_flushing);
mdsc->sessions[mds] = s;
......@@ -535,6 +665,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
dout("__unregister_session mds%d %p\n", s->s_mds, s);
BUG_ON(mdsc->sessions[s->s_mds] != s);
mdsc->sessions[s->s_mds] = NULL;
s->s_state = 0;
ceph_con_close(&s->s_con);
ceph_put_mds_session(s);
atomic_dec(&mdsc->num_sessions);
......@@ -1197,14 +1328,11 @@ static int iterate_session_caps(struct ceph_mds_session *session,
cap->session = NULL;
list_del_init(&cap->session_caps);
session->s_nr_caps--;
if (cap->queue_release) {
list_add_tail(&cap->session_caps,
&session->s_cap_releases);
session->s_num_cap_releases++;
} else {
if (cap->queue_release)
__ceph_queue_cap_release(session, cap);
else
old_cap = cap; /* put_cap it w/o locks held */
}
}
if (ret < 0)
goto out;
}
......@@ -1638,7 +1766,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
session->s_trim_caps = 0;
}
ceph_send_cap_releases(mdsc, session);
ceph_flush_cap_releases(mdsc, session);
return 0;
}
......@@ -1681,7 +1809,7 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
/*
* called under s_mutex
*/
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_msg *msg = NULL;
......@@ -1774,6 +1902,81 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
spin_unlock(&session->s_cap_lock);
}
static void ceph_cap_release_work(struct work_struct *work)
{
struct ceph_mds_session *session =
container_of(work, struct ceph_mds_session, s_cap_release_work);
mutex_lock(&session->s_mutex);
if (session->s_state == CEPH_MDS_SESSION_OPEN ||
session->s_state == CEPH_MDS_SESSION_HUNG)
ceph_send_cap_releases(session->s_mdsc, session);
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
}
void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
if (mdsc->stopping)
return;
get_session(session);
if (queue_work(mdsc->fsc->cap_wq,
&session->s_cap_release_work)) {
dout("cap release work queued\n");
} else {
ceph_put_mds_session(session);
dout("failed to queue cap release work\n");
}
}
/*
* caller holds session->s_cap_lock
*/
void __ceph_queue_cap_release(struct ceph_mds_session *session,
struct ceph_cap *cap)
{
list_add_tail(&cap->session_caps, &session->s_cap_releases);
session->s_num_cap_releases++;
if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
ceph_flush_cap_releases(session->s_mdsc, session);
}
static void ceph_cap_reclaim_work(struct work_struct *work)
{
struct ceph_mds_client *mdsc =
container_of(work, struct ceph_mds_client, cap_reclaim_work);
int ret = ceph_trim_dentries(mdsc);
if (ret == -EAGAIN)
ceph_queue_cap_reclaim_work(mdsc);
}
void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
{
if (mdsc->stopping)
return;
if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
dout("caps reclaim work queued\n");
} else {
dout("failed to queue caps release work\n");
}
}
void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
int val;
if (!nr)
return;
val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
if (!(val % CEPH_CAPS_PER_RELEASE)) {
atomic_set(&mdsc->cap_reclaim_pending, 0);
ceph_queue_cap_reclaim_work(mdsc);
}
}
/*
* requests
*/
......@@ -2653,6 +2856,9 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
dout("handle_reply tid %lld result %d\n", tid, result);
rinfo = &req->r_reply_info;
if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
err = parse_reply_info(msg, rinfo, (u64)-1);
else
err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
mutex_unlock(&mdsc->mutex);
......@@ -2684,7 +2890,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
req->r_op == CEPH_MDS_OP_LSSNAP))
ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
current->journal_info = NULL;
mutex_unlock(&req->r_fill_mutex);
......@@ -2693,13 +2898,19 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (realm)
ceph_put_snap_realm(mdsc, realm);
if (err == 0 && req->r_target_inode &&
if (err == 0) {
if (req->r_target_inode &&
test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
struct ceph_inode_info *ci =
ceph_inode(req->r_target_inode);
spin_lock(&ci->i_unsafe_lock);
list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
list_add_tail(&req->r_unsafe_target_item,
&ci->i_unsafe_iops);
spin_unlock(&ci->i_unsafe_lock);
}
ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
out_err:
mutex_lock(&mdsc->mutex);
if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
......@@ -2777,6 +2988,25 @@ static void handle_forward(struct ceph_mds_client *mdsc,
pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
static int __decode_and_drop_session_metadata(void **p, void *end)
{
/* map<string,string> */
u32 n;
ceph_decode_32_safe(p, end, n, bad);
while (n-- > 0) {
u32 len;
ceph_decode_32_safe(p, end, len, bad);
ceph_decode_need(p, end, len, bad);
*p += len;
ceph_decode_32_safe(p, end, len, bad);
ceph_decode_need(p, end, len, bad);
*p += len;
}
return 0;
bad:
return -1;
}
/*
* handle a mds session control message
*/
......@@ -2784,18 +3014,36 @@ static void handle_session(struct ceph_mds_session *session,
struct ceph_msg *msg)
{
struct ceph_mds_client *mdsc = session->s_mdsc;
int mds = session->s_mds;
int msg_version = le16_to_cpu(msg->hdr.version);
void *p = msg->front.iov_base;
void *end = p + msg->front.iov_len;
struct ceph_mds_session_head *h;
u32 op;
u64 seq;
int mds = session->s_mds;
struct ceph_mds_session_head *h = msg->front.iov_base;
unsigned long features = 0;
int wake = 0;
/* decode */
if (msg->front.iov_len < sizeof(*h))
goto bad;
ceph_decode_need(&p, end, sizeof(*h), bad);
h = p;
p += sizeof(*h);
op = le32_to_cpu(h->op);
seq = le64_to_cpu(h->seq);
if (msg_version >= 3) {
u32 len;
/* version >= 2, metadata */
if (__decode_and_drop_session_metadata(&p, end) < 0)
goto bad;
/* version >= 3, feature bits */
ceph_decode_32_safe(&p, end, len, bad);
ceph_decode_need(&p, end, len, bad);
memcpy(&features, p, min_t(size_t, len, sizeof(features)));
p += len;
}
mutex_lock(&mdsc->mutex);
if (op == CEPH_SESSION_CLOSE) {
get_session(session);
......@@ -2821,6 +3069,7 @@ static void handle_session(struct ceph_mds_session *session,
if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
pr_info("mds%d reconnect success\n", session->s_mds);
session->s_state = CEPH_MDS_SESSION_OPEN;
session->s_features = features;
renewed_caps(mdsc, session, 0);
wake = 1;
if (mdsc->stopping)
......@@ -2947,6 +3196,82 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
mutex_unlock(&mdsc->mutex);
}
static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
{
struct ceph_msg *reply;
struct ceph_pagelist *_pagelist;
struct page *page;
__le32 *addr;
int err = -ENOMEM;
if (!recon_state->allow_multi)
return -ENOSPC;
/* can't handle message that contains both caps and realm */
BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
/* pre-allocate new pagelist */
_pagelist = ceph_pagelist_alloc(GFP_NOFS);
if (!_pagelist)
return -ENOMEM;
reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
if (!reply)
goto fail_msg;
/* placeholder for nr_caps */
err = ceph_pagelist_encode_32(_pagelist, 0);
if (err < 0)
goto fail;
if (recon_state->nr_caps) {
/* currently encoding caps */
err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
if (err)
goto fail;
} else {
/* placeholder for nr_realms (currently encoding relams) */
err = ceph_pagelist_encode_32(_pagelist, 0);
if (err < 0)
goto fail;
}
err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
if (err)
goto fail;
page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
addr = kmap_atomic(page);
if (recon_state->nr_caps) {
/* currently encoding caps */
*addr = cpu_to_le32(recon_state->nr_caps);
} else {
/* currently encoding relams */
*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
}
kunmap_atomic(addr);
reply->hdr.version = cpu_to_le16(5);
reply->hdr.compat_version = cpu_to_le16(4);
reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
ceph_con_send(&recon_state->session->s_con, reply);
ceph_pagelist_release(recon_state->pagelist);
recon_state->pagelist = _pagelist;
recon_state->nr_caps = 0;
recon_state->nr_realms = 0;
recon_state->msg_version = 5;
return 0;
fail:
ceph_msg_put(reply);
fail_msg:
ceph_pagelist_release(_pagelist);
return err;
}
/*
* Encode information about a cap for a reconnect with the MDS.
*/
......@@ -2966,9 +3291,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
inode, ceph_vinop(inode), cap, cap->cap_id,
ceph_cap_string(cap->issued));
err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
if (err)
return err;
spin_lock(&ci->i_ceph_lock);
cap->seq = 0; /* reset cap seq */
......@@ -3008,7 +3330,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
if (recon_state->msg_version >= 2) {
int num_fcntl_locks, num_flock_locks;
struct ceph_filelock *flocks = NULL;
size_t struct_len, total_len = 0;
size_t struct_len, total_len = sizeof(u64);
u8 struct_v = 0;
encode_again:
......@@ -3043,7 +3365,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
if (recon_state->msg_version >= 3) {
/* version, compat_version and struct_len */
total_len = 2 * sizeof(u8) + sizeof(u32);
total_len += 2 * sizeof(u8) + sizeof(u32);
struct_v = 2;
}
/*
......@@ -3060,12 +3382,19 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct_len += sizeof(u64); /* snap_follows */
total_len += struct_len;
err = ceph_pagelist_reserve(pagelist, total_len);
if (err) {
kfree(flocks);
goto out_err;
if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
err = send_reconnect_partial(recon_state);
if (err)
goto out_freeflocks;
pagelist = recon_state->pagelist;
}
err = ceph_pagelist_reserve(pagelist, total_len);
if (err)
goto out_freeflocks;
ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
if (recon_state->msg_version >= 3) {
ceph_pagelist_encode_8(pagelist, struct_v);
ceph_pagelist_encode_8(pagelist, 1);
......@@ -3077,7 +3406,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
num_fcntl_locks, num_flock_locks);
if (struct_v >= 2)
ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
kfree(flocks);
} else {
u64 pathbase = 0;
......@@ -3098,20 +3427,81 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
}
err = ceph_pagelist_reserve(pagelist,
pathlen + sizeof(u32) + sizeof(rec.v1));
sizeof(u64) + sizeof(u32) +
pathlen + sizeof(rec.v1));
if (err) {
kfree(path);
goto out_err;
goto out_freepath;
}
ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
ceph_pagelist_encode_string(pagelist, path, pathlen);
ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
out_freepath:
kfree(path);
}
recon_state->nr_caps++;
out_err:
if (err >= 0)
recon_state->nr_caps++;
return err;
}
static int encode_snap_realms(struct ceph_mds_client *mdsc,
struct ceph_reconnect_state *recon_state)
{
struct rb_node *p;
struct ceph_pagelist *pagelist = recon_state->pagelist;
int err = 0;
if (recon_state->msg_version >= 4) {
err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
if (err < 0)
goto fail;
}
/*
* snaprealms. we provide mds with the ino, seq (version), and
* parent for all of our realms. If the mds has any newer info,
* it will tell us.
*/
for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
struct ceph_snap_realm *realm =
rb_entry(p, struct ceph_snap_realm, node);
struct ceph_mds_snaprealm_reconnect sr_rec;
if (recon_state->msg_version >= 4) {
size_t need = sizeof(u8) * 2 + sizeof(u32) +
sizeof(sr_rec);
if (pagelist->length + need > RECONNECT_MAX_SIZE) {
err = send_reconnect_partial(recon_state);
if (err)
goto fail;
pagelist = recon_state->pagelist;
}
err = ceph_pagelist_reserve(pagelist, need);
if (err)
goto fail;
ceph_pagelist_encode_8(pagelist, 1);
ceph_pagelist_encode_8(pagelist, 1);
ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
}
dout(" adding snap realm %llx seq %lld parent %llx\n",
realm->ino, realm->seq, realm->parent_ino);
sr_rec.ino = cpu_to_le64(realm->ino);
sr_rec.seq = cpu_to_le64(realm->seq);
sr_rec.parent = cpu_to_le64(realm->parent_ino);
err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
if (err)
goto fail;
recon_state->nr_realms++;
}
fail:
return err;
}
......@@ -3132,18 +3522,17 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_msg *reply;
struct rb_node *p;
int mds = session->s_mds;
int err = -ENOMEM;
int s_nr_caps;
struct ceph_pagelist *pagelist;
struct ceph_reconnect_state recon_state;
struct ceph_reconnect_state recon_state = {
.session = session,
};
LIST_HEAD(dispose);
pr_info("mds%d reconnect start\n", mds);
pagelist = ceph_pagelist_alloc(GFP_NOFS);
if (!pagelist)
recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
if (!recon_state.pagelist)
goto fail_nopagelist;
reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
......@@ -3187,63 +3576,90 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
/* replay unsafe requests */
replay_unsafe_requests(mdsc, session);
ceph_early_kick_flushing_caps(mdsc, session);
down_read(&mdsc->snap_rwsem);
/* traverse this session's caps */
s_nr_caps = session->s_nr_caps;
err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
/* placeholder for nr_caps */
err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
if (err)
goto fail;
recon_state.nr_caps = 0;
recon_state.pagelist = pagelist;
if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
recon_state.msg_version = 3;
else
recon_state.allow_multi = true;
} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
recon_state.msg_version = 3;
} else {
recon_state.msg_version = 2;
}
/* trsaverse this session's caps */
err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
spin_lock(&session->s_cap_lock);
session->s_cap_reconnect = 0;
spin_unlock(&session->s_cap_lock);
/*
* snaprealms. we provide mds with the ino, seq (version), and
* parent for all of our realms. If the mds has any newer info,
* it will tell us.
*/
for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
struct ceph_snap_realm *realm =
rb_entry(p, struct ceph_snap_realm, node);
struct ceph_mds_snaprealm_reconnect sr_rec;
if (err < 0)
goto fail;
dout(" adding snap realm %llx seq %lld parent %llx\n",
realm->ino, realm->seq, realm->parent_ino);
sr_rec.ino = cpu_to_le64(realm->ino);
sr_rec.seq = cpu_to_le64(realm->seq);
sr_rec.parent = cpu_to_le64(realm->parent_ino);
err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
/* check if all realms can be encoded into current message */
if (mdsc->num_snap_realms) {
size_t total_len =
recon_state.pagelist->length +
mdsc->num_snap_realms *
sizeof(struct ceph_mds_snaprealm_reconnect);
if (recon_state.msg_version >= 4) {
/* number of realms */
total_len += sizeof(u32);
/* version, compat_version and struct_len */
total_len += mdsc->num_snap_realms *
(2 * sizeof(u8) + sizeof(u32));
}
if (total_len > RECONNECT_MAX_SIZE) {
if (!recon_state.allow_multi) {
err = -ENOSPC;
goto fail;
}
if (recon_state.nr_caps) {
err = send_reconnect_partial(&recon_state);
if (err)
goto fail;
}
recon_state.msg_version = 5;
}
}
reply->hdr.version = cpu_to_le16(recon_state.msg_version);
err = encode_snap_realms(mdsc, &recon_state);
if (err < 0)
goto fail;
if (recon_state.msg_version >= 5) {
err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
if (err < 0)
goto fail;
}
/* raced with cap release? */
if (s_nr_caps != recon_state.nr_caps) {
struct page *page = list_first_entry(&pagelist->head,
if (recon_state.nr_caps || recon_state.nr_realms) {
struct page *page =
list_first_entry(&recon_state.pagelist->head,
struct page, lru);
__le32 *addr = kmap_atomic(page);
if (recon_state.nr_caps) {
WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
*addr = cpu_to_le32(recon_state.nr_caps);
} else if (recon_state.msg_version >= 4) {
*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
}
kunmap_atomic(addr);
}
reply->hdr.data_len = cpu_to_le32(pagelist->length);
ceph_msg_data_add_pagelist(reply, pagelist);
reply->hdr.version = cpu_to_le16(recon_state.msg_version);
if (recon_state.msg_version >= 4)
reply->hdr.compat_version = cpu_to_le16(4);
ceph_early_kick_flushing_caps(mdsc, session);
reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
ceph_con_send(&session->s_con, reply);
......@@ -3254,7 +3670,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
mutex_unlock(&mdsc->mutex);
up_read(&mdsc->snap_rwsem);
ceph_pagelist_release(pagelist);
ceph_pagelist_release(recon_state.pagelist);
return;
fail:
......@@ -3262,7 +3678,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
up_read(&mdsc->snap_rwsem);
mutex_unlock(&session->s_mutex);
fail_nomsg:
ceph_pagelist_release(pagelist);
ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist:
pr_err("error %d preparing reconnect for mds%d\n", err, mds);
return;
......@@ -3580,7 +3996,6 @@ static void delayed_work(struct work_struct *work)
int renew_caps;
dout("mdsc delayed_work\n");
ceph_check_delayed_caps(mdsc);
mutex_lock(&mdsc->mutex);
renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
......@@ -3628,6 +4043,12 @@ static void delayed_work(struct work_struct *work)
}
mutex_unlock(&mdsc->mutex);
ceph_check_delayed_caps(mdsc);
ceph_queue_cap_reclaim_work(mdsc);
ceph_trim_snapid_map(mdsc);
schedule_delayed(mdsc);
}
......@@ -3660,6 +4081,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
init_rwsem(&mdsc->snap_rwsem);
mdsc->snap_realms = RB_ROOT;
INIT_LIST_HEAD(&mdsc->snap_empty);
mdsc->num_snap_realms = 0;
spin_lock_init(&mdsc->snap_empty_lock);
mdsc->last_tid = 0;
mdsc->oldest_tid = 0;
......@@ -3677,11 +4099,19 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
mdsc->num_cap_flushing = 0;
spin_lock_init(&mdsc->cap_dirty_lock);
init_waitqueue_head(&mdsc->cap_flushing_wq);
spin_lock_init(&mdsc->dentry_lru_lock);
INIT_LIST_HEAD(&mdsc->dentry_lru);
INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
atomic_set(&mdsc->cap_reclaim_pending, 0);
spin_lock_init(&mdsc->dentry_list_lock);
INIT_LIST_HEAD(&mdsc->dentry_leases);
INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
ceph_caps_init(mdsc);
ceph_adjust_min_caps(mdsc, fsc->min_caps);
ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
spin_lock_init(&mdsc->snapid_map_lock);
mdsc->snapid_map_tree = RB_ROOT;
INIT_LIST_HEAD(&mdsc->snapid_map_lru);
init_rwsem(&mdsc->pool_perm_rwsem);
mdsc->pool_perm_tree = RB_ROOT;
......@@ -3876,8 +4306,10 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
WARN_ON(!list_empty(&mdsc->cap_delay_list));
mutex_unlock(&mdsc->mutex);
ceph_cleanup_snapid_map(mdsc);
ceph_cleanup_empty_realms(mdsc);
cancel_work_sync(&mdsc->cap_reclaim_work);
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
dout("stopped\n");
......
......@@ -21,11 +21,14 @@
#define CEPHFS_FEATURE_REPLY_ENCODING 9
#define CEPHFS_FEATURE_RECLAIM_CLIENT 10
#define CEPHFS_FEATURE_LAZY_CAP_WANTED 11
#define CEPHFS_FEATURE_MULTI_RECONNECT 12
#define CEPHFS_FEATURES_CLIENT_SUPPORTED { \
0, 1, 2, 3, 4, 5, 6, 7, \
CEPHFS_FEATURE_MIMIC, \
CEPHFS_FEATURE_REPLY_ENCODING, \
CEPHFS_FEATURE_LAZY_CAP_WANTED, \
CEPHFS_FEATURE_MULTI_RECONNECT, \
}
#define CEPHFS_FEATURES_CLIENT_REQUIRED {}
......@@ -65,6 +68,7 @@ struct ceph_mds_reply_info_in {
char *pool_ns_data;
u64 max_bytes;
u64 max_files;
s32 dir_pin;
};
struct ceph_mds_reply_dir_entry {
......@@ -152,6 +156,7 @@ struct ceph_mds_session {
int s_mds;
int s_state;
unsigned long s_ttl; /* time until mds kills us */
unsigned long s_features;
u64 s_seq; /* incoming msg seq # */
struct mutex s_mutex; /* serialize session messages */
......@@ -167,12 +172,13 @@ struct ceph_mds_session {
/* protected by s_cap_lock */
spinlock_t s_cap_lock;
struct list_head s_caps; /* all caps issued by this session */
struct ceph_cap *s_cap_iterator;
int s_nr_caps, s_trim_caps;
int s_num_cap_releases;
int s_cap_reconnect;
int s_readonly;
struct list_head s_cap_releases; /* waiting cap_release messages */
struct ceph_cap *s_cap_iterator;
struct work_struct s_cap_release_work;
/* protected by mutex */
struct list_head s_cap_flushing; /* inodes w/ flushing caps */
......@@ -310,6 +316,15 @@ struct ceph_pool_perm {
char pool_ns[];
};
struct ceph_snapid_map {
struct rb_node node;
struct list_head lru;
atomic_t ref;
u64 snap;
dev_t dev;
unsigned long last_used;
};
/*
* mds client state
*/
......@@ -341,6 +356,7 @@ struct ceph_mds_client {
struct rw_semaphore snap_rwsem;
struct rb_root snap_realms;
struct list_head snap_empty;
int num_snap_realms;
spinlock_t snap_empty_lock; /* protect snap_empty */
u64 last_tid; /* most recent mds request */
......@@ -362,6 +378,9 @@ struct ceph_mds_client {
spinlock_t cap_dirty_lock; /* protects above items */
wait_queue_head_t cap_flushing_wq;
struct work_struct cap_reclaim_work;
atomic_t cap_reclaim_pending;
/*
* Cap reservations
*
......@@ -378,13 +397,18 @@ struct ceph_mds_client {
unreserved) */
int caps_total_count; /* total caps allocated */
int caps_use_count; /* in use */
int caps_use_max; /* max used caps */
int caps_reserve_count; /* unused, reserved */
int caps_avail_count; /* unused, unreserved */
int caps_min_count; /* keep at least this many
(unreserved) */
spinlock_t dentry_lru_lock;
struct list_head dentry_lru;
int num_dentry;
spinlock_t dentry_list_lock;
struct list_head dentry_leases; /* fifo list */
struct list_head dentry_dir_leases; /* lru list */
spinlock_t snapid_map_lock;
struct rb_root snapid_map_tree;
struct list_head snapid_map_lru;
struct rw_semaphore pool_perm_rwsem;
struct rb_root pool_perm_tree;
......@@ -438,9 +462,12 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
kref_put(&req->r_kref, ceph_mdsc_release_request);
}
extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
struct ceph_cap *cap);
extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc);
extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr);
extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
......
......@@ -3,12 +3,13 @@
#include <linux/sort.h>
#include <linux/slab.h>
#include "super.h"
#include "mds_client.h"
#include <linux/ceph/decode.h>
/* unused map expires after 5 minutes */
#define CEPH_SNAPID_MAP_TIMEOUT (5 * 60 * HZ)
/*
* Snapshots in ceph are driven in large part by cooperation from the
* client. In contrast to local file systems or file servers that
......@@ -124,6 +125,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
INIT_LIST_HEAD(&realm->inodes_with_caps);
spin_lock_init(&realm->inodes_with_caps_lock);
__insert_snap_realm(&mdsc->snap_realms, realm);
mdsc->num_snap_realms++;
dout("create_snap_realm %llx %p\n", realm->ino, realm);
return realm;
}
......@@ -175,6 +178,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
rb_erase(&realm->node, &mdsc->snap_realms);
mdsc->num_snap_realms--;
if (realm->parent) {
list_del_init(&realm->child_item);
......@@ -986,3 +990,154 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
up_write(&mdsc->snap_rwsem);
return;
}
struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
u64 snap)
{
struct ceph_snapid_map *sm, *exist;
struct rb_node **p, *parent;
int ret;
exist = NULL;
spin_lock(&mdsc->snapid_map_lock);
p = &mdsc->snapid_map_tree.rb_node;
while (*p) {
exist = rb_entry(*p, struct ceph_snapid_map, node);
if (snap > exist->snap) {
p = &(*p)->rb_left;
} else if (snap < exist->snap) {
p = &(*p)->rb_right;
} else {
if (atomic_inc_return(&exist->ref) == 1)
list_del_init(&exist->lru);
break;
}
exist = NULL;
}
spin_unlock(&mdsc->snapid_map_lock);
if (exist) {
dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
return exist;
}
sm = kmalloc(sizeof(*sm), GFP_NOFS);
if (!sm)
return NULL;
ret = get_anon_bdev(&sm->dev);
if (ret < 0) {
kfree(sm);
return NULL;
}
INIT_LIST_HEAD(&sm->lru);
atomic_set(&sm->ref, 1);
sm->snap = snap;
exist = NULL;
parent = NULL;
p = &mdsc->snapid_map_tree.rb_node;
spin_lock(&mdsc->snapid_map_lock);
while (*p) {
parent = *p;
exist = rb_entry(*p, struct ceph_snapid_map, node);
if (snap > exist->snap)
p = &(*p)->rb_left;
else if (snap < exist->snap)
p = &(*p)->rb_right;
else
break;
exist = NULL;
}
if (exist) {
if (atomic_inc_return(&exist->ref) == 1)
list_del_init(&exist->lru);
} else {
rb_link_node(&sm->node, parent, p);
rb_insert_color(&sm->node, &mdsc->snapid_map_tree);
}
spin_unlock(&mdsc->snapid_map_lock);
if (exist) {
free_anon_bdev(sm->dev);
kfree(sm);
dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
return exist;
}
dout("create snapid map %llx -> %x\n", sm->snap, sm->dev);
return sm;
}
void ceph_put_snapid_map(struct ceph_mds_client* mdsc,
struct ceph_snapid_map *sm)
{
if (!sm)
return;
if (atomic_dec_and_lock(&sm->ref, &mdsc->snapid_map_lock)) {
if (!RB_EMPTY_NODE(&sm->node)) {
sm->last_used = jiffies;
list_add_tail(&sm->lru, &mdsc->snapid_map_lru);
spin_unlock(&mdsc->snapid_map_lock);
} else {
/* already cleaned up by
* ceph_cleanup_snapid_map() */
spin_unlock(&mdsc->snapid_map_lock);
kfree(sm);
}
}
}
void ceph_trim_snapid_map(struct ceph_mds_client *mdsc)
{
struct ceph_snapid_map *sm;
unsigned long now;
LIST_HEAD(to_free);
spin_lock(&mdsc->snapid_map_lock);
now = jiffies;
while (!list_empty(&mdsc->snapid_map_lru)) {
sm = list_first_entry(&mdsc->snapid_map_lru,
struct ceph_snapid_map, lru);
if (time_after(sm->last_used + CEPH_SNAPID_MAP_TIMEOUT, now))
break;
rb_erase(&sm->node, &mdsc->snapid_map_tree);
list_move(&sm->lru, &to_free);
}
spin_unlock(&mdsc->snapid_map_lock);
while (!list_empty(&to_free)) {
sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
list_del(&sm->lru);
dout("trim snapid map %llx -> %x\n", sm->snap, sm->dev);
free_anon_bdev(sm->dev);
kfree(sm);
}
}
void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc)
{
struct ceph_snapid_map *sm;
struct rb_node *p;
LIST_HEAD(to_free);
spin_lock(&mdsc->snapid_map_lock);
while ((p = rb_first(&mdsc->snapid_map_tree))) {
sm = rb_entry(p, struct ceph_snapid_map, node);
rb_erase(p, &mdsc->snapid_map_tree);
RB_CLEAR_NODE(p);
list_move(&sm->lru, &to_free);
}
spin_unlock(&mdsc->snapid_map_lock);
while (!list_empty(&to_free)) {
sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
list_del(&sm->lru);
free_anon_bdev(sm->dev);
if (WARN_ON_ONCE(atomic_read(&sm->ref))) {
pr_err("snapid map %llx -> %x still in use\n",
sm->snap, sm->dev);
}
}
}
......@@ -133,6 +133,7 @@ enum {
Opt_rasize,
Opt_caps_wanted_delay_min,
Opt_caps_wanted_delay_max,
Opt_caps_max,
Opt_readdir_max_entries,
Opt_readdir_max_bytes,
Opt_congestion_kb,
......@@ -175,6 +176,7 @@ static match_table_t fsopt_tokens = {
{Opt_rasize, "rasize=%d"},
{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
{Opt_caps_max, "caps_max=%d"},
{Opt_readdir_max_entries, "readdir_max_entries=%d"},
{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
{Opt_congestion_kb, "write_congestion_kb=%d"},
......@@ -286,6 +288,11 @@ static int parse_fsopt_token(char *c, void *private)
return -EINVAL;
fsopt->caps_wanted_delay_max = intval;
break;
case Opt_caps_max:
if (intval < 0)
return -EINVAL;
fsopt->caps_max = intval;
break;
case Opt_readdir_max_entries:
if (intval < 1)
return -EINVAL;
......@@ -576,6 +583,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_printf(m, ",rasize=%d", fsopt->rasize);
if (fsopt->congestion_kb != default_congestion_kb())
seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
if (fsopt->caps_max)
seq_printf(m, ",caps_max=%d", fsopt->caps_max);
if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
seq_printf(m, ",caps_wanted_delay_min=%d",
fsopt->caps_wanted_delay_min);
......@@ -671,6 +680,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
if (!fsc->trunc_wq)
goto fail_pg_inv_wq;
fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1);
if (!fsc->cap_wq)
goto fail_trunc_wq;
/* set up mempools */
err = -ENOMEM;
......@@ -678,13 +690,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
size = sizeof (struct page *) * (page_count ? page_count : 1);
fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
if (!fsc->wb_pagevec_pool)
goto fail_trunc_wq;
/* caps */
fsc->min_caps = fsopt->max_readdir;
goto fail_cap_wq;
return fsc;
fail_cap_wq:
destroy_workqueue(fsc->cap_wq);
fail_trunc_wq:
destroy_workqueue(fsc->trunc_wq);
fail_pg_inv_wq:
......@@ -706,6 +717,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc)
flush_workqueue(fsc->wb_wq);
flush_workqueue(fsc->pg_inv_wq);
flush_workqueue(fsc->trunc_wq);
flush_workqueue(fsc->cap_wq);
}
static void destroy_fs_client(struct ceph_fs_client *fsc)
......@@ -715,6 +727,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
destroy_workqueue(fsc->wb_wq);
destroy_workqueue(fsc->pg_inv_wq);
destroy_workqueue(fsc->trunc_wq);
destroy_workqueue(fsc->cap_wq);
mempool_destroy(fsc->wb_pagevec_pool);
......
......@@ -79,6 +79,7 @@ struct ceph_mount_options {
int rasize; /* max readahead */
int congestion_kb; /* max writeback in flight */
int caps_wanted_delay_min, caps_wanted_delay_max;
int caps_max;
int max_readdir; /* max readdir result (entires) */
int max_readdir_bytes; /* max readdir result (bytes) */
......@@ -100,17 +101,18 @@ struct ceph_fs_client {
struct ceph_client *client;
unsigned long mount_state;
int min_caps; /* min caps i added */
loff_t max_file_size;
struct ceph_mds_client *mdsc;
/* writeback */
mempool_t *wb_pagevec_pool;
atomic_long_t writeback_count;
struct workqueue_struct *wb_wq;
struct workqueue_struct *pg_inv_wq;
struct workqueue_struct *trunc_wq;
atomic_long_t writeback_count;
struct workqueue_struct *cap_wq;
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_dentry_lru, *debugfs_caps;
......@@ -260,17 +262,22 @@ struct ceph_inode_xattr {
* Ceph dentry state
*/
struct ceph_dentry_info {
struct dentry *dentry;
struct ceph_mds_session *lease_session;
struct list_head lease_list;
unsigned flags;
int lease_shared_gen;
u32 lease_gen;
u32 lease_seq;
unsigned long lease_renew_after, lease_renew_from;
struct list_head lru;
struct dentry *dentry;
unsigned long time;
u64 offset;
};
#define CEPH_DENTRY_REFERENCED 1
#define CEPH_DENTRY_LEASE_LIST 2
#define CEPH_DENTRY_SHRINK_LIST 4
struct ceph_inode_xattrs_info {
/*
* (still encoded) xattr blob. we avoid the overhead of parsing
......@@ -318,6 +325,8 @@ struct ceph_inode_info {
/* quotas */
u64 i_max_bytes, i_max_files;
s32 i_dir_pin;
struct rb_root i_fragtree;
int i_fragtree_nsplits;
struct mutex i_fragtree_mutex;
......@@ -370,7 +379,10 @@ struct ceph_inode_info {
struct list_head i_unsafe_iops; /* uncommitted mds inode ops */
spinlock_t i_unsafe_lock;
union {
struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */
struct ceph_snapid_map *i_snapid_map; /* snapid -> dev_t */
};
int i_snap_realm_counter; /* snap realm (if caps) */
struct list_head i_snap_realm_item;
struct list_head i_snap_flush_item;
......@@ -587,7 +599,7 @@ extern u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
struct ceph_inode_frag *pfrag,
int *found);
static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry)
static inline struct ceph_dentry_info *ceph_dentry(const struct dentry *dentry)
{
return (struct ceph_dentry_info *)dentry->d_fsdata;
}
......@@ -656,7 +668,8 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
extern void ceph_caps_init(struct ceph_mds_client *mdsc);
extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
extern void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
struct ceph_mount_options *fsopt);
extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx, int need);
extern void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
......@@ -837,6 +850,14 @@ extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap);
extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
extern struct ceph_snapid_map *ceph_get_snapid_map(struct ceph_mds_client *mdsc,
u64 snap);
extern void ceph_put_snapid_map(struct ceph_mds_client* mdsc,
struct ceph_snapid_map *sm);
extern void ceph_trim_snapid_map(struct ceph_mds_client *mdsc);
extern void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc);
/*
* a cap_snap is "pending" if it is still awaiting an in-progress
* sync write (that may/may not still update size, mtime, etc.).
......@@ -975,11 +996,11 @@ extern void ceph_add_cap(struct inode *inode,
unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap **new_cap);
extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
extern void __ceph_remove_caps(struct inode* inode);
extern void ceph_put_cap(struct ceph_mds_client *mdsc,
struct ceph_cap *cap);
extern int ceph_is_any_caps(struct inode *inode);
extern void ceph_queue_caps_release(struct inode *inode);
extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
int datasync);
......@@ -1049,10 +1070,10 @@ extern int ceph_handle_snapdir(struct ceph_mds_request *req,
extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
struct dentry *dentry, int err);
extern void ceph_dentry_lru_add(struct dentry *dn);
extern void ceph_dentry_lru_touch(struct dentry *dn);
extern void ceph_dentry_lru_del(struct dentry *dn);
extern void __ceph_dentry_lease_touch(struct ceph_dentry_info *di);
extern void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di);
extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
extern int ceph_trim_dentries(struct ceph_mds_client *mdsc);
extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
......
......@@ -228,8 +228,19 @@ static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val,
ci->i_rctime.tv_nsec);
}
/* quotas */
/* dir pin */
static bool ceph_vxattrcb_dir_pin_exists(struct ceph_inode_info *ci)
{
return ci->i_dir_pin != -ENODATA;
}
static size_t ceph_vxattrcb_dir_pin(struct ceph_inode_info *ci, char *val,
size_t size)
{
return snprintf(val, size, "%d", (int)ci->i_dir_pin);
}
/* quotas */
static bool ceph_vxattrcb_quota_exists(struct ceph_inode_info *ci)
{
bool ret = false;
......@@ -314,6 +325,13 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
XATTR_RSTAT_FIELD(dir, rsubdirs),
XATTR_RSTAT_FIELD(dir, rbytes),
XATTR_RSTAT_FIELD(dir, rctime),
{
.name = "ceph.dir.pin",
.name_size = sizeof("ceph.dir_pin"),
.getxattr_cb = ceph_vxattrcb_dir_pin,
.exists_cb = ceph_vxattrcb_dir_pin_exists,
.flags = VXATTR_FLAG_HIDDEN,
},
{
.name = "ceph.quota",
.name_size = sizeof("ceph.quota"),
......
......@@ -24,6 +24,7 @@ struct ceph_vino {
/* context for the caps reservation mechanism */
struct ceph_cap_reservation {
int count;
int used;
};
......
......@@ -495,9 +495,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
/ sizeof(struct crush_rule_step))
goto bad;
#endif
r = c->rules[i] = kmalloc(sizeof(*r) +
yes*sizeof(struct crush_rule_step),
GFP_NOFS);
r = kmalloc(struct_size(r, steps, yes), GFP_NOFS);
c->rules[i] = r;
if (r == NULL)
goto badmem;
dout(" rule %d is at %p\n", i, r);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment