Commit 97d2eb13 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://ceph.newdream.net/git/ceph-client

* 'for-linus' of git://ceph.newdream.net/git/ceph-client:
  libceph: fix double-free of page vector
  ceph: fix 32-bit ino numbers
  libceph: force resend of osd requests if we skip an osdmap
  ceph: use kernel DNS resolver
  ceph: fix ceph_monc_init memory leak
  ceph: let the set_layout ioctl set single traits
  Revert "ceph: don't truncate dirty pages in invalidate work thread"
  ceph: replace leading spaces with tabs
  libceph: warn on msg allocation failures
  libceph: don't complain on msgpool alloc failures
  libceph: always preallocate mon connection
  libceph: create messenger with client
  ceph: document ioctls
  ceph: implement (optional) max read size
  ceph: rename rsize -> rasize
  ceph: make readpages fully async
parents 68d99b2c 33957340
...@@ -260,7 +260,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt, ...@@ -260,7 +260,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
kref_init(&rbdc->kref); kref_init(&rbdc->kref);
INIT_LIST_HEAD(&rbdc->node); INIT_LIST_HEAD(&rbdc->node);
rbdc->client = ceph_create_client(opt, rbdc); rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
if (IS_ERR(rbdc->client)) if (IS_ERR(rbdc->client))
goto out_rbdc; goto out_rbdc;
opt = NULL; /* Now rbdc->client is responsible for opt */ opt = NULL; /* Now rbdc->client is responsible for opt */
......
...@@ -228,102 +228,155 @@ static int ceph_readpage(struct file *filp, struct page *page) ...@@ -228,102 +228,155 @@ static int ceph_readpage(struct file *filp, struct page *page)
} }
/* /*
* Build a vector of contiguous pages from the provided page list. * Finish an async read(ahead) op.
*/ */
static struct page **page_vector_from_list(struct list_head *page_list, static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
unsigned *nr_pages)
{ {
struct page **pages; struct inode *inode = req->r_inode;
struct page *page; struct ceph_osd_reply_head *replyhead;
int next_index, contig_pages = 0; int rc, bytes;
int i;
/* build page vector */ /* parse reply */
pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS); replyhead = msg->front.iov_base;
if (!pages) WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
return ERR_PTR(-ENOMEM); rc = le32_to_cpu(replyhead->result);
bytes = le32_to_cpu(msg->hdr.data_len);
BUG_ON(list_empty(page_list)); dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
next_index = list_entry(page_list->prev, struct page, lru)->index;
list_for_each_entry_reverse(page, page_list, lru) { /* unlock all pages, zeroing any data we didn't read */
if (page->index == next_index) { for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
dout("readpages page %d %p\n", contig_pages, page); struct page *page = req->r_pages[i];
pages[contig_pages] = page;
contig_pages++; if (bytes < (int)PAGE_CACHE_SIZE) {
next_index++; /* zero (remainder of) page */
} else { int s = bytes < 0 ? 0 : bytes;
break; zero_user_segment(page, s, PAGE_CACHE_SIZE);
} }
dout("finish_read %p uptodate %p idx %lu\n", inode, page,
page->index);
flush_dcache_page(page);
SetPageUptodate(page);
unlock_page(page);
page_cache_release(page);
} }
*nr_pages = contig_pages; kfree(req->r_pages);
return pages;
} }
/* /*
* Read multiple pages. Leave pages we don't read + unlock in page_list; * start an async read(ahead) operation. return nr_pages we submitted
* the caller (VM) cleans them up. * a read for on success, or negative error code.
*/ */
static int ceph_readpages(struct file *file, struct address_space *mapping, static int start_read(struct inode *inode, struct list_head *page_list, int max)
struct list_head *page_list, unsigned nr_pages)
{ {
struct inode *inode = file->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_client *osdc = struct ceph_osd_client *osdc =
&ceph_inode_to_client(inode)->client->osdc; &ceph_inode_to_client(inode)->client->osdc;
int rc = 0; struct ceph_inode_info *ci = ceph_inode(inode);
struct page **pages; struct page *page = list_entry(page_list->prev, struct page, lru);
loff_t offset; struct ceph_osd_request *req;
u64 off;
u64 len; u64 len;
int i;
struct page **pages;
pgoff_t next_index;
int nr_pages = 0;
int ret;
dout("readpages %p file %p nr_pages %d\n", off = page->index << PAGE_CACHE_SHIFT;
inode, file, nr_pages);
pages = page_vector_from_list(page_list, &nr_pages);
if (IS_ERR(pages))
return PTR_ERR(pages);
/* guess read extent */ /* count pages */
offset = pages[0]->index << PAGE_CACHE_SHIFT; next_index = page->index;
list_for_each_entry_reverse(page, page_list, lru) {
if (page->index != next_index)
break;
nr_pages++;
next_index++;
if (max && nr_pages == max)
break;
}
len = nr_pages << PAGE_CACHE_SHIFT; len = nr_pages << PAGE_CACHE_SHIFT;
rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
offset, &len, off, len);
ci->i_truncate_seq, ci->i_truncate_size,
pages, nr_pages, 0); req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
if (rc == -ENOENT) off, &len,
rc = 0; CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
if (rc < 0) NULL, 0,
goto out; ci->i_truncate_seq, ci->i_truncate_size,
NULL, false, 1, 0);
for (; !list_empty(page_list) && len > 0; if (!req)
rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) { return -ENOMEM;
struct page *page =
list_entry(page_list->prev, struct page, lru);
/* build page vector */
nr_pages = len >> PAGE_CACHE_SHIFT;
pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
ret = -ENOMEM;
if (!pages)
goto out;
for (i = 0; i < nr_pages; ++i) {
page = list_entry(page_list->prev, struct page, lru);
BUG_ON(PageLocked(page));
list_del(&page->lru); list_del(&page->lru);
if (rc < (int)PAGE_CACHE_SIZE) { dout("start_read %p adding %p idx %lu\n", inode, page,
/* zero (remainder of) page */ page->index);
int s = rc < 0 ? 0 : rc; if (add_to_page_cache_lru(page, &inode->i_data, page->index,
zero_user_segment(page, s, PAGE_CACHE_SIZE);
}
if (add_to_page_cache_lru(page, mapping, page->index,
GFP_NOFS)) { GFP_NOFS)) {
page_cache_release(page); page_cache_release(page);
dout("readpages %p add_to_page_cache failed %p\n", dout("start_read %p add_to_page_cache failed %p\n",
inode, page); inode, page);
continue; nr_pages = i;
goto out_pages;
} }
dout("readpages %p adding %p idx %lu\n", inode, page, pages[i] = page;
page->index);
flush_dcache_page(page);
SetPageUptodate(page);
unlock_page(page);
page_cache_release(page);
} }
rc = 0; req->r_pages = pages;
req->r_num_pages = nr_pages;
req->r_callback = finish_read;
req->r_inode = inode;
dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
ret = ceph_osdc_start_request(osdc, req, false);
if (ret < 0)
goto out_pages;
ceph_osdc_put_request(req);
return nr_pages;
out_pages:
ceph_release_page_vector(pages, nr_pages);
out:
ceph_osdc_put_request(req);
return ret;
}
/*
* Read multiple pages. Leave pages we don't read + unlock in page_list;
* the caller (VM) cleans them up.
*/
static int ceph_readpages(struct file *file, struct address_space *mapping,
struct list_head *page_list, unsigned nr_pages)
{
struct inode *inode = file->f_dentry->d_inode;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
int rc = 0;
int max = 0;
if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
>> PAGE_SHIFT;
dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages,
max);
while (!list_empty(page_list)) {
rc = start_read(inode, page_list, max);
if (rc < 0)
goto out;
BUG_ON(rc == 0);
}
out: out:
kfree(pages); dout("readpages %p file %p ret %d\n", inode, file, rc);
return rc; return rc;
} }
......
...@@ -945,7 +945,7 @@ static int send_cap_msg(struct ceph_mds_session *session, ...@@ -945,7 +945,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
seq, issue_seq, mseq, follows, size, max_size, seq, issue_seq, mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS); msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
if (!msg) if (!msg)
return -ENOMEM; return -ENOMEM;
......
...@@ -9,7 +9,6 @@ ...@@ -9,7 +9,6 @@
#include <linux/namei.h> #include <linux/namei.h>
#include <linux/writeback.h> #include <linux/writeback.h>
#include <linux/vmalloc.h> #include <linux/vmalloc.h>
#include <linux/pagevec.h>
#include "super.h" #include "super.h"
#include "mds_client.h" #include "mds_client.h"
...@@ -1363,49 +1362,6 @@ void ceph_queue_invalidate(struct inode *inode) ...@@ -1363,49 +1362,6 @@ void ceph_queue_invalidate(struct inode *inode)
} }
} }
/*
* invalidate any pages that are not dirty or under writeback. this
* includes pages that are clean and mapped.
*
* Walks @mapping in batches of up to PAGEVEC_SIZE pages. Pages that are
* dirty, under writeback, or that cannot be locked without blocking are
* left alone; every other page is removed from the mapping.
*/
static void ceph_invalidate_nondirty_pages(struct address_space *mapping)
{
struct pagevec pvec;
pgoff_t next = 0;
int i;
pagevec_init(&pvec, 0);
/* each pass looks up the next batch, starting at index 'next' */
while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) {
for (i = 0; i < pagevec_count(&pvec); i++) {
struct page *page = pvec.pages[i];
pgoff_t index;
/* dirty and writeback pages are never touched */
int skip_page =
(PageDirty(page) || PageWriteback(page));
/* only trylock: a page someone else holds locked is skipped */
if (!skip_page)
skip_page = !trylock_page(page);
/*
* We really shouldn't be looking at the ->index of an
* unlocked page. But we're not allowed to lock these
* pages. So we rely upon nobody altering the ->index
* of this (pinned-by-us) page.
*/
index = page->index;
/* resume the next lookup just past this page, skipped or not */
if (index > next)
next = index;
next++;
if (skip_page)
continue;
/* page is locked here (trylock succeeded): drop it, then unlock */
generic_error_remove_page(mapping, page);
unlock_page(page);
}
pagevec_release(&pvec);
/* give the scheduler a chance between batches */
cond_resched();
}
}
/* /*
* Invalidate inode pages in a worker thread. (This can't be done * Invalidate inode pages in a worker thread. (This can't be done
* in the message handler context.) * in the message handler context.)
...@@ -1429,7 +1385,7 @@ static void ceph_invalidate_work(struct work_struct *work) ...@@ -1429,7 +1385,7 @@ static void ceph_invalidate_work(struct work_struct *work)
orig_gen = ci->i_rdcache_gen; orig_gen = ci->i_rdcache_gen;
spin_unlock(&inode->i_lock); spin_unlock(&inode->i_lock);
ceph_invalidate_nondirty_pages(inode->i_mapping); truncate_inode_pages(&inode->i_data, 0);
spin_lock(&inode->i_lock); spin_lock(&inode->i_lock);
if (orig_gen == ci->i_rdcache_gen && if (orig_gen == ci->i_rdcache_gen &&
......
...@@ -42,17 +42,39 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) ...@@ -42,17 +42,39 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_mds_request *req; struct ceph_mds_request *req;
struct ceph_ioctl_layout l; struct ceph_ioctl_layout l;
struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode);
struct ceph_ioctl_layout nl;
int err, i; int err, i;
/* copy and validate */
if (copy_from_user(&l, arg, sizeof(l))) if (copy_from_user(&l, arg, sizeof(l)))
return -EFAULT; return -EFAULT;
if ((l.object_size & ~PAGE_MASK) || /* validate changed params against current layout */
(l.stripe_unit & ~PAGE_MASK) || err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT);
!l.stripe_unit || if (!err) {
(l.object_size && nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
(unsigned)l.object_size % (unsigned)l.stripe_unit)) nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
nl.object_size = ceph_file_layout_object_size(ci->i_layout);
nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
nl.preferred_osd =
(s32)le32_to_cpu(ci->i_layout.fl_pg_preferred);
} else
return err;
if (l.stripe_count)
nl.stripe_count = l.stripe_count;
if (l.stripe_unit)
nl.stripe_unit = l.stripe_unit;
if (l.object_size)
nl.object_size = l.object_size;
if (l.data_pool)
nl.data_pool = l.data_pool;
if (l.preferred_osd)
nl.preferred_osd = l.preferred_osd;
if ((nl.object_size & ~PAGE_MASK) ||
(nl.stripe_unit & ~PAGE_MASK) ||
((unsigned)nl.object_size % (unsigned)nl.stripe_unit))
return -EINVAL; return -EINVAL;
/* make sure it's a valid data pool */ /* make sure it's a valid data pool */
......
...@@ -6,7 +6,31 @@ ...@@ -6,7 +6,31 @@
#define CEPH_IOCTL_MAGIC 0x97 #define CEPH_IOCTL_MAGIC 0x97
/* just use u64 to align sanely on all archs */ /*
* CEPH_IOC_GET_LAYOUT - get file layout or dir layout policy
* CEPH_IOC_SET_LAYOUT - set file layout
* CEPH_IOC_SET_LAYOUT_POLICY - set dir layout policy
*
* The file layout specifies how file data is striped over objects in
* the distributed object store, which object pool they belong to (if
* it differs from the default), and an optional 'preferred osd' to
* store them on.
*
* Files get a new layout based on the policy set on the containing
* directory or one of its ancestors. The GET_LAYOUT ioctl will let
* you examine the layout for a file or the policy on a directory.
*
* SET_LAYOUT will let you set a layout on a newly created file. This
* only works immediately after the file is created and before any
* data is written to it.
*
* SET_LAYOUT_POLICY will let you set a layout policy (default layout)
* on a directory that will apply to any new files created in that
* directory (or any child directory that doesn't specify a layout of
* its own).
*/
/* use u64 to align sanely on all archs */
struct ceph_ioctl_layout { struct ceph_ioctl_layout {
__u64 stripe_unit, stripe_count, object_size; __u64 stripe_unit, stripe_count, object_size;
__u64 data_pool; __u64 data_pool;
...@@ -21,6 +45,8 @@ struct ceph_ioctl_layout { ...@@ -21,6 +45,8 @@ struct ceph_ioctl_layout {
struct ceph_ioctl_layout) struct ceph_ioctl_layout)
/* /*
* CEPH_IOC_GET_DATALOC - get location of file data in the cluster
*
* Extract identity, address of the OSD and object storing a given * Extract identity, address of the OSD and object storing a given
* file offset. * file offset.
*/ */
...@@ -39,7 +65,34 @@ struct ceph_ioctl_dataloc { ...@@ -39,7 +65,34 @@ struct ceph_ioctl_dataloc {
#define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3, \ #define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3, \
struct ceph_ioctl_dataloc) struct ceph_ioctl_dataloc)
/*
* CEPH_IOC_LAZYIO - relax consistency
*
* Normally Ceph switches to synchronous IO when multiple clients have
* the file open (and one or more for write). Reads and writes bypass the
* page cache and go directly to the OSD. Setting this flag on a file
* descriptor will allow buffered IO for this file in cases where the
* application knows it won't interfere with other nodes (or doesn't
* care).
*/
#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4) #define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4)
/*
* CEPH_IOC_SYNCIO - force synchronous IO
*
* This ioctl sets a file flag that forces the synchronous IO that
* bypasses the page cache, even if it is not necessary. This is
* essentially the opposite behavior of IOC_LAZYIO. This forces the
* same read/write path as a file opened by multiple clients when one
* or more of those clients is opened for write.
*
* Note that this type of sync IO takes a different path than a file
* opened with O_SYNC/O_DSYNC (writes hit the page cache and are
* immediately flushed on page boundaries). It is very similar to
* O_DIRECT (writes bypass the page cache) except that O_DIRECT writes
* are not copied (user page must remain stable) and O_DIRECT writes
* have alignment restrictions (on the buffer and file offset).
*/
#define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5) #define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5)
#endif #endif
...@@ -764,7 +764,8 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq) ...@@ -764,7 +764,8 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
struct ceph_msg *msg; struct ceph_msg *msg;
struct ceph_mds_session_head *h; struct ceph_mds_session_head *h;
msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS); msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
false);
if (!msg) { if (!msg) {
pr_err("create_session_msg ENOMEM creating msg\n"); pr_err("create_session_msg ENOMEM creating msg\n");
return NULL; return NULL;
...@@ -1240,7 +1241,7 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc, ...@@ -1240,7 +1241,7 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
while (session->s_num_cap_releases < session->s_nr_caps + extra) { while (session->s_num_cap_releases < session->s_nr_caps + extra) {
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
GFP_NOFS); GFP_NOFS, false);
if (!msg) if (!msg)
goto out_unlocked; goto out_unlocked;
dout("add_cap_releases %p msg %p now %d\n", session, msg, dout("add_cap_releases %p msg %p now %d\n", session, msg,
...@@ -1652,7 +1653,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ...@@ -1652,7 +1653,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
if (req->r_old_dentry_drop) if (req->r_old_dentry_drop)
len += req->r_old_dentry->d_name.len; len += req->r_old_dentry->d_name.len;
msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS); msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
if (!msg) { if (!msg) {
msg = ERR_PTR(-ENOMEM); msg = ERR_PTR(-ENOMEM);
goto out_free2; goto out_free2;
...@@ -2518,7 +2519,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, ...@@ -2518,7 +2519,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
goto fail_nopagelist; goto fail_nopagelist;
ceph_pagelist_init(pagelist); ceph_pagelist_init(pagelist);
reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS); reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
if (!reply) if (!reply)
goto fail_nomsg; goto fail_nomsg;
...@@ -2831,7 +2832,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, ...@@ -2831,7 +2832,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
dnamelen = dentry->d_name.len; dnamelen = dentry->d_name.len;
len += dnamelen; len += dnamelen;
msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS); msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
if (!msg) if (!msg)
return; return;
lease = msg->front.iov_base; lease = msg->front.iov_base;
......
...@@ -114,6 +114,7 @@ static int ceph_sync_fs(struct super_block *sb, int wait) ...@@ -114,6 +114,7 @@ static int ceph_sync_fs(struct super_block *sb, int wait)
enum { enum {
Opt_wsize, Opt_wsize,
Opt_rsize, Opt_rsize,
Opt_rasize,
Opt_caps_wanted_delay_min, Opt_caps_wanted_delay_min,
Opt_caps_wanted_delay_max, Opt_caps_wanted_delay_max,
Opt_cap_release_safety, Opt_cap_release_safety,
...@@ -136,6 +137,7 @@ enum { ...@@ -136,6 +137,7 @@ enum {
static match_table_t fsopt_tokens = { static match_table_t fsopt_tokens = {
{Opt_wsize, "wsize=%d"}, {Opt_wsize, "wsize=%d"},
{Opt_rsize, "rsize=%d"}, {Opt_rsize, "rsize=%d"},
{Opt_rasize, "rasize=%d"},
{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"}, {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
{Opt_cap_release_safety, "cap_release_safety=%d"}, {Opt_cap_release_safety, "cap_release_safety=%d"},
...@@ -196,6 +198,9 @@ static int parse_fsopt_token(char *c, void *private) ...@@ -196,6 +198,9 @@ static int parse_fsopt_token(char *c, void *private)
case Opt_rsize: case Opt_rsize:
fsopt->rsize = intval; fsopt->rsize = intval;
break; break;
case Opt_rasize:
fsopt->rasize = intval;
break;
case Opt_caps_wanted_delay_min: case Opt_caps_wanted_delay_min:
fsopt->caps_wanted_delay_min = intval; fsopt->caps_wanted_delay_min = intval;
break; break;
...@@ -289,28 +294,29 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt, ...@@ -289,28 +294,29 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
dout("parse_mount_options %p, dev_name '%s'\n", fsopt, dev_name); dout("parse_mount_options %p, dev_name '%s'\n", fsopt, dev_name);
fsopt->sb_flags = flags; fsopt->sb_flags = flags;
fsopt->flags = CEPH_MOUNT_OPT_DEFAULT; fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
fsopt->rsize = CEPH_RSIZE_DEFAULT; fsopt->rsize = CEPH_RSIZE_DEFAULT;
fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL); fsopt->rasize = CEPH_RASIZE_DEFAULT;
fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT; fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT; fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT; fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT; fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT; fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
fsopt->congestion_kb = default_congestion_kb(); fsopt->congestion_kb = default_congestion_kb();
/* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */ /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
err = -EINVAL; err = -EINVAL;
if (!dev_name) if (!dev_name)
goto out; goto out;
*path = strstr(dev_name, ":/"); *path = strstr(dev_name, ":/");
if (*path == NULL) { if (*path == NULL) {
pr_err("device name is missing path (no :/ in %s)\n", pr_err("device name is missing path (no :/ in %s)\n",
dev_name); dev_name);
goto out; goto out;
} }
dev_name_end = *path; dev_name_end = *path;
dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name); dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
...@@ -376,6 +382,8 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt) ...@@ -376,6 +382,8 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
seq_printf(m, ",wsize=%d", fsopt->wsize); seq_printf(m, ",wsize=%d", fsopt->wsize);
if (fsopt->rsize != CEPH_RSIZE_DEFAULT) if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
seq_printf(m, ",rsize=%d", fsopt->rsize); seq_printf(m, ",rsize=%d", fsopt->rsize);
if (fsopt->rasize != CEPH_RASIZE_DEFAULT)
seq_printf(m, ",rasize=%d", fsopt->rsize);
if (fsopt->congestion_kb != default_congestion_kb()) if (fsopt->congestion_kb != default_congestion_kb())
seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb); seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT) if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
...@@ -422,20 +430,23 @@ struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, ...@@ -422,20 +430,23 @@ struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
struct ceph_options *opt) struct ceph_options *opt)
{ {
struct ceph_fs_client *fsc; struct ceph_fs_client *fsc;
const unsigned supported_features =
CEPH_FEATURE_FLOCK |
CEPH_FEATURE_DIRLAYOUTHASH;
const unsigned required_features = 0;
int err = -ENOMEM; int err = -ENOMEM;
fsc = kzalloc(sizeof(*fsc), GFP_KERNEL); fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
if (!fsc) if (!fsc)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
fsc->client = ceph_create_client(opt, fsc); fsc->client = ceph_create_client(opt, fsc, supported_features,
required_features);
if (IS_ERR(fsc->client)) { if (IS_ERR(fsc->client)) {
err = PTR_ERR(fsc->client); err = PTR_ERR(fsc->client);
goto fail; goto fail;
} }
fsc->client->extra_mon_dispatch = extra_mon_dispatch; fsc->client->extra_mon_dispatch = extra_mon_dispatch;
fsc->client->supported_features |= CEPH_FEATURE_FLOCK |
CEPH_FEATURE_DIRLAYOUTHASH;
fsc->client->monc.want_mdsmap = 1; fsc->client->monc.want_mdsmap = 1;
fsc->mount_options = fsopt; fsc->mount_options = fsopt;
...@@ -774,10 +785,10 @@ static int ceph_register_bdi(struct super_block *sb, ...@@ -774,10 +785,10 @@ static int ceph_register_bdi(struct super_block *sb,
{ {
int err; int err;
/* set ra_pages based on rsize mount option? */ /* set ra_pages based on rasize mount option? */
if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE) if (fsc->mount_options->rasize >= PAGE_CACHE_SIZE)
fsc->backing_dev_info.ra_pages = fsc->backing_dev_info.ra_pages =
(fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) (fsc->mount_options->rasize + PAGE_CACHE_SIZE - 1)
>> PAGE_SHIFT; >> PAGE_SHIFT;
else else
fsc->backing_dev_info.ra_pages = fsc->backing_dev_info.ra_pages =
......
...@@ -36,7 +36,8 @@ ...@@ -36,7 +36,8 @@
#define ceph_test_mount_opt(fsc, opt) \ #define ceph_test_mount_opt(fsc, opt) \
(!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt)) (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
#define CEPH_RSIZE_DEFAULT (512*1024) /* readahead */ #define CEPH_RSIZE_DEFAULT 0 /* max read size */
#define CEPH_RASIZE_DEFAULT (8192*1024) /* readahead */
#define CEPH_MAX_READDIR_DEFAULT 1024 #define CEPH_MAX_READDIR_DEFAULT 1024
#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024) #define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024)
#define CEPH_SNAPDIRNAME_DEFAULT ".snap" #define CEPH_SNAPDIRNAME_DEFAULT ".snap"
...@@ -45,8 +46,9 @@ struct ceph_mount_options { ...@@ -45,8 +46,9 @@ struct ceph_mount_options {
int flags; int flags;
int sb_flags; int sb_flags;
int wsize; int wsize; /* max write size */
int rsize; /* max readahead */ int rsize; /* max read size */
int rasize; /* max readahead */
int congestion_kb; /* max writeback in flight */ int congestion_kb; /* max writeback in flight */
int caps_wanted_delay_min, caps_wanted_delay_max; int caps_wanted_delay_min, caps_wanted_delay_max;
int cap_release_safety; int cap_release_safety;
...@@ -344,9 +346,10 @@ static inline struct ceph_vino ceph_vino(struct inode *inode) ...@@ -344,9 +346,10 @@ static inline struct ceph_vino ceph_vino(struct inode *inode)
* x86_64+ino32 64 32 * x86_64+ino32 64 32
* x86_64 64 64 * x86_64 64 64
*/ */
static inline u32 ceph_ino_to_ino32(ino_t ino) static inline u32 ceph_ino_to_ino32(__u64 vino)
{ {
ino ^= ino >> (sizeof(ino) * 8 - 32); u32 ino = vino & 0xffffffff;
ino ^= vino >> 32;
if (!ino) if (!ino)
ino = 1; ino = 1;
return ino; return ino;
...@@ -357,11 +360,11 @@ static inline u32 ceph_ino_to_ino32(ino_t ino) ...@@ -357,11 +360,11 @@ static inline u32 ceph_ino_to_ino32(ino_t ino)
*/ */
static inline ino_t ceph_vino_to_ino(struct ceph_vino vino) static inline ino_t ceph_vino_to_ino(struct ceph_vino vino)
{ {
ino_t ino = (ino_t)vino.ino; /* ^ (vino.snap << 20); */
#if BITS_PER_LONG == 32 #if BITS_PER_LONG == 32
ino = ceph_ino_to_ino32(ino); return ceph_ino_to_ino32(vino.ino);
#else
return (ino_t)vino.ino;
#endif #endif
return ino;
} }
/* /*
......
...@@ -215,7 +215,9 @@ extern void ceph_destroy_options(struct ceph_options *opt); ...@@ -215,7 +215,9 @@ extern void ceph_destroy_options(struct ceph_options *opt);
extern int ceph_compare_options(struct ceph_options *new_opt, extern int ceph_compare_options(struct ceph_options *new_opt,
struct ceph_client *client); struct ceph_client *client);
extern struct ceph_client *ceph_create_client(struct ceph_options *opt, extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
void *private); void *private,
unsigned supported_features,
unsigned required_features);
extern u64 ceph_client_id(struct ceph_client *client); extern u64 ceph_client_id(struct ceph_client *client);
extern void ceph_destroy_client(struct ceph_client *client); extern void ceph_destroy_client(struct ceph_client *client);
extern int __ceph_open_session(struct ceph_client *client, extern int __ceph_open_session(struct ceph_client *client,
......
...@@ -237,7 +237,8 @@ extern void ceph_con_keepalive(struct ceph_connection *con); ...@@ -237,7 +237,8 @@ extern void ceph_con_keepalive(struct ceph_connection *con);
extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con); extern void ceph_con_put(struct ceph_connection *con);
extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags); extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
bool can_fail);
extern void ceph_msg_kfree(struct ceph_msg *m); extern void ceph_msg_kfree(struct ceph_msg *m);
......
...@@ -27,3 +27,17 @@ config CEPH_LIB_PRETTYDEBUG ...@@ -27,3 +27,17 @@ config CEPH_LIB_PRETTYDEBUG
If unsure, say N. If unsure, say N.
config CEPH_LIB_USE_DNS_RESOLVER
bool "Use in-kernel support for DNS lookup"
depends on CEPH_LIB
select DNS_RESOLVER
default n
help
If you say Y here, hostnames (e.g. monitor addresses) will
be resolved using the CONFIG_DNS_RESOLVER facility.
For information on how to use CONFIG_DNS_RESOLVER consult
Documentation/networking/dns_resolver.txt
If unsure, say N.
...@@ -432,9 +432,12 @@ EXPORT_SYMBOL(ceph_client_id); ...@@ -432,9 +432,12 @@ EXPORT_SYMBOL(ceph_client_id);
/* /*
* create a fresh client instance * create a fresh client instance
*/ */
struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private) struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
unsigned supported_features,
unsigned required_features)
{ {
struct ceph_client *client; struct ceph_client *client;
struct ceph_entity_addr *myaddr = NULL;
int err = -ENOMEM; int err = -ENOMEM;
client = kzalloc(sizeof(*client), GFP_KERNEL); client = kzalloc(sizeof(*client), GFP_KERNEL);
...@@ -449,15 +452,27 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private) ...@@ -449,15 +452,27 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
client->auth_err = 0; client->auth_err = 0;
client->extra_mon_dispatch = NULL; client->extra_mon_dispatch = NULL;
client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT; client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT |
client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT; supported_features;
client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT |
client->msgr = NULL; required_features;
/* msgr */
if (ceph_test_opt(client, MYIP))
myaddr = &client->options->my_addr;
client->msgr = ceph_messenger_create(myaddr,
client->supported_features,
client->required_features);
if (IS_ERR(client->msgr)) {
err = PTR_ERR(client->msgr);
goto fail;
}
client->msgr->nocrc = ceph_test_opt(client, NOCRC);
/* subsystems */ /* subsystems */
err = ceph_monc_init(&client->monc, client); err = ceph_monc_init(&client->monc, client);
if (err < 0) if (err < 0)
goto fail; goto fail_msgr;
err = ceph_osdc_init(&client->osdc, client); err = ceph_osdc_init(&client->osdc, client);
if (err < 0) if (err < 0)
goto fail_monc; goto fail_monc;
...@@ -466,6 +481,8 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private) ...@@ -466,6 +481,8 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
fail_monc: fail_monc:
ceph_monc_stop(&client->monc); ceph_monc_stop(&client->monc);
fail_msgr:
ceph_messenger_destroy(client->msgr);
fail: fail:
kfree(client); kfree(client);
return ERR_PTR(err); return ERR_PTR(err);
...@@ -490,8 +507,7 @@ void ceph_destroy_client(struct ceph_client *client) ...@@ -490,8 +507,7 @@ void ceph_destroy_client(struct ceph_client *client)
ceph_debugfs_client_cleanup(client); ceph_debugfs_client_cleanup(client);
if (client->msgr) ceph_messenger_destroy(client->msgr);
ceph_messenger_destroy(client->msgr);
ceph_destroy_options(client->options); ceph_destroy_options(client->options);
...@@ -514,24 +530,9 @@ static int have_mon_and_osd_map(struct ceph_client *client) ...@@ -514,24 +530,9 @@ static int have_mon_and_osd_map(struct ceph_client *client)
*/ */
int __ceph_open_session(struct ceph_client *client, unsigned long started) int __ceph_open_session(struct ceph_client *client, unsigned long started)
{ {
struct ceph_entity_addr *myaddr = NULL;
int err; int err;
unsigned long timeout = client->options->mount_timeout * HZ; unsigned long timeout = client->options->mount_timeout * HZ;
/* initialize the messenger */
if (client->msgr == NULL) {
if (ceph_test_opt(client, MYIP))
myaddr = &client->options->my_addr;
client->msgr = ceph_messenger_create(myaddr,
client->supported_features,
client->required_features);
if (IS_ERR(client->msgr)) {
client->msgr = NULL;
return PTR_ERR(client->msgr);
}
client->msgr->nocrc = ceph_test_opt(client, NOCRC);
}
/* open session, and wait for mon and osd maps */ /* open session, and wait for mon and osd maps */
err = ceph_monc_open_session(&client->monc); err = ceph_monc_open_session(&client->monc);
if (err < 0) if (err < 0)
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <linux/string.h> #include <linux/string.h>
#include <linux/bio.h> #include <linux/bio.h>
#include <linux/blkdev.h> #include <linux/blkdev.h>
#include <linux/dns_resolver.h>
#include <net/tcp.h> #include <net/tcp.h>
#include <linux/ceph/libceph.h> #include <linux/ceph/libceph.h>
...@@ -1077,6 +1078,101 @@ static void addr_set_port(struct sockaddr_storage *ss, int p) ...@@ -1077,6 +1078,101 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)
} }
} }
/*
 * Parse a textual network address into @ss.
 *
 * @ss is zeroed first.  @str, @len, @delim and @ipend are handed unchanged
 * to in4_pton() and, if that does not accept the input, to in6_pton().
 * ss->ss_family is set to AF_INET or AF_INET6 according to whichever of
 * the two parsers accepted the text.
 *
 * Unlike other *_pton function semantics, zero indicates success.
 * Returns -EINVAL when neither parser accepts the input.
 */
static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
		     char delim, const char **ipend)
{
	struct sockaddr_in *v4 = (void *)ss;
	struct sockaddr_in6 *v6 = (void *)ss;
	int family;

	memset(ss, 0, sizeof(*ss));

	/* IPv4 is attempted first; IPv6 only if the IPv4 parse is rejected. */
	if (in4_pton(str, len, (u8 *)&v4->sin_addr.s_addr, delim, ipend))
		family = AF_INET;
	else if (in6_pton(str, len, (u8 *)&v6->sin6_addr.s6_addr, delim, ipend))
		family = AF_INET6;
	else
		return -EINVAL;

	ss->ss_family = family;
	return 0;
}
/*
 * Extract a hostname from @name and resolve it with dns_query().
 *
 * The hostname starts at @name and ends just before the first occurrence,
 * within @namelen bytes, of either @delim or the port marker (':'),
 * whichever comes earlier.  If neither is present the hostname spans all
 * @namelen bytes.  An empty hostname yields -EINVAL.
 *
 * When dns_query() reports a positive length, the returned text is parsed
 * into @ss with ceph_pton(); otherwise -ESRCH is returned.  The buffer
 * handed back by dns_query() is released with kfree() in either case.
 * After the lookup attempt *@ipend is set to the end of the hostname,
 * regardless of whether the lookup succeeded.
 *
 * Without CONFIG_CEPH_LIB_USE_DNS_RESOLVER this is a stub that always
 * returns -EINVAL.
 */
#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
static int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct sockaddr_storage *ss, char delim, const char **ipend)
{
	const char *delim_pos = memchr(name, delim, namelen);
	const char *colon_pos = memchr(name, ':', namelen);
	const char *host_end;
	char *resolved = NULL;
	int resolved_len;
	int rc;

	/* Pick the earliest terminator that is actually present. */
	if (colon_pos && (!delim_pos || colon_pos < delim_pos))
		host_end = colon_pos;
	else if (delim_pos)
		host_end = delim_pos;
	else
		host_end = name + namelen;	/* no terminator in range */

	/* Nothing before the terminator: there is no hostname to look up. */
	if (host_end <= name)
		return -EINVAL;

	/* do dns_resolve upcall */
	resolved_len = dns_query(NULL, name, host_end - name, NULL,
				 &resolved, NULL);
	if (resolved_len > 0)
		rc = ceph_pton(resolved, resolved_len, ss, -1, NULL);
	else
		rc = -ESRCH;

	kfree(resolved);

	*ipend = host_end;

	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(host_end - name), name,
		rc, rc ? "failed" : ceph_pr_addr(ss));

	return rc;
}
#else
static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct sockaddr_storage *ss, char delim, const char **ipend)
{
	/* Resolver support is compiled out: hostnames cannot be resolved. */
	return -EINVAL;
}
#endif
/*
 * Parse a server name (IP or hostname) into @ss.
 *
 * The name is first tried as a literal address via ceph_pton().  Only if
 * that fails is ceph_dns_resolve_name() asked to extract a hostname and
 * resolve it using the userspace DNS upcall.  Returns zero on success,
 * otherwise the error from the hostname resolution attempt.
 */
static int ceph_parse_server_name(const char *name, size_t namelen,
		struct sockaddr_storage *ss, char delim, const char **ipend)
{
	/* Literal address: nothing further to do. */
	if (!ceph_pton(name, namelen, ss, delim, ipend))
		return 0;

	return ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
}
/* /*
* Parse an ip[:port] list into an addr array. Use the default * Parse an ip[:port] list into an addr array. Use the default
* monitor port if a port isn't specified. * monitor port if a port isn't specified.
...@@ -1085,15 +1181,13 @@ int ceph_parse_ips(const char *c, const char *end, ...@@ -1085,15 +1181,13 @@ int ceph_parse_ips(const char *c, const char *end,
struct ceph_entity_addr *addr, struct ceph_entity_addr *addr,
int max_count, int *count) int max_count, int *count)
{ {
int i; int i, ret = -EINVAL;
const char *p = c; const char *p = c;
dout("parse_ips on '%.*s'\n", (int)(end-c), c); dout("parse_ips on '%.*s'\n", (int)(end-c), c);
for (i = 0; i < max_count; i++) { for (i = 0; i < max_count; i++) {
const char *ipend; const char *ipend;
struct sockaddr_storage *ss = &addr[i].in_addr; struct sockaddr_storage *ss = &addr[i].in_addr;
struct sockaddr_in *in4 = (void *)ss;
struct sockaddr_in6 *in6 = (void *)ss;
int port; int port;
char delim = ','; char delim = ',';
...@@ -1102,15 +1196,11 @@ int ceph_parse_ips(const char *c, const char *end, ...@@ -1102,15 +1196,11 @@ int ceph_parse_ips(const char *c, const char *end,
p++; p++;
} }
memset(ss, 0, sizeof(*ss)); ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr, if (ret)
delim, &ipend))
ss->ss_family = AF_INET;
else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
delim, &ipend))
ss->ss_family = AF_INET6;
else
goto bad; goto bad;
ret = -EINVAL;
p = ipend; p = ipend;
if (delim == ']') { if (delim == ']') {
...@@ -1155,7 +1245,7 @@ int ceph_parse_ips(const char *c, const char *end, ...@@ -1155,7 +1245,7 @@ int ceph_parse_ips(const char *c, const char *end,
bad: bad:
pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
return -EINVAL; return ret;
} }
EXPORT_SYMBOL(ceph_parse_ips); EXPORT_SYMBOL(ceph_parse_ips);
...@@ -2281,7 +2371,8 @@ EXPORT_SYMBOL(ceph_con_keepalive); ...@@ -2281,7 +2371,8 @@ EXPORT_SYMBOL(ceph_con_keepalive);
* construct a new message with given type, size * construct a new message with given type, size
* the new msg has a ref count of 1. * the new msg has a ref count of 1.
*/ */
struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
bool can_fail)
{ {
struct ceph_msg *m; struct ceph_msg *m;
...@@ -2333,7 +2424,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) ...@@ -2333,7 +2424,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->front.iov_base = kmalloc(front_len, flags); m->front.iov_base = kmalloc(front_len, flags);
} }
if (m->front.iov_base == NULL) { if (m->front.iov_base == NULL) {
pr_err("msg_new can't allocate %d bytes\n", dout("ceph_msg_new can't allocate %d bytes\n",
front_len); front_len);
goto out2; goto out2;
} }
...@@ -2348,7 +2439,14 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) ...@@ -2348,7 +2439,14 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
out2: out2:
ceph_msg_put(m); ceph_msg_put(m);
out: out:
pr_err("msg_new can't create type %d front %d\n", type, front_len); if (!can_fail) {
pr_err("msg_new can't create type %d front %d\n", type,
front_len);
WARN_ON(1);
} else {
dout("msg_new can't create type %d front %d\n", type,
front_len);
}
return NULL; return NULL;
} }
EXPORT_SYMBOL(ceph_msg_new); EXPORT_SYMBOL(ceph_msg_new);
...@@ -2398,7 +2496,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, ...@@ -2398,7 +2496,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
} }
if (!msg) { if (!msg) {
*skip = 0; *skip = 0;
msg = ceph_msg_new(type, front_len, GFP_NOFS); msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
if (!msg) { if (!msg) {
pr_err("unable to allocate msg type %d len %d\n", pr_err("unable to allocate msg type %d len %d\n",
type, front_len); type, front_len);
......
...@@ -116,14 +116,12 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) ...@@ -116,14 +116,12 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
*/ */
static void __close_session(struct ceph_mon_client *monc) static void __close_session(struct ceph_mon_client *monc)
{ {
if (monc->con) { dout("__close_session closing mon%d\n", monc->cur_mon);
dout("__close_session closing mon%d\n", monc->cur_mon); ceph_con_revoke(monc->con, monc->m_auth);
ceph_con_revoke(monc->con, monc->m_auth); ceph_con_close(monc->con);
ceph_con_close(monc->con); monc->cur_mon = -1;
monc->cur_mon = -1; monc->pending_auth = 0;
monc->pending_auth = 0; ceph_auth_reset(monc->auth);
ceph_auth_reset(monc->auth);
}
} }
/* /*
...@@ -302,15 +300,6 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) ...@@ -302,15 +300,6 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
*/ */
int ceph_monc_open_session(struct ceph_mon_client *monc) int ceph_monc_open_session(struct ceph_mon_client *monc)
{ {
if (!monc->con) {
monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
if (!monc->con)
return -ENOMEM;
ceph_con_init(monc->client->msgr, monc->con);
monc->con->private = monc;
monc->con->ops = &mon_con_ops;
}
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
__open_session(monc); __open_session(monc);
__schedule_delayed(monc); __schedule_delayed(monc);
...@@ -528,10 +517,12 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) ...@@ -528,10 +517,12 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
init_completion(&req->completion); init_completion(&req->completion);
err = -ENOMEM; err = -ENOMEM;
req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS); req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
true);
if (!req->request) if (!req->request)
goto out; goto out;
req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS); req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
true);
if (!req->reply) if (!req->reply)
goto out; goto out;
...@@ -626,10 +617,12 @@ int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op, ...@@ -626,10 +617,12 @@ int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
init_completion(&req->completion); init_completion(&req->completion);
err = -ENOMEM; err = -ENOMEM;
req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS); req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
true);
if (!req->request) if (!req->request)
goto out; goto out;
req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS); req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
true);
if (!req->reply) if (!req->reply)
goto out; goto out;
...@@ -755,13 +748,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) ...@@ -755,13 +748,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
if (err) if (err)
goto out; goto out;
monc->con = NULL; /* connection */
monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
if (!monc->con)
goto out_monmap;
ceph_con_init(monc->client->msgr, monc->con);
monc->con->private = monc;
monc->con->ops = &mon_con_ops;
/* authentication */ /* authentication */
monc->auth = ceph_auth_init(cl->options->name, monc->auth = ceph_auth_init(cl->options->name,
cl->options->key); cl->options->key);
if (IS_ERR(monc->auth)) if (IS_ERR(monc->auth)) {
return PTR_ERR(monc->auth); err = PTR_ERR(monc->auth);
goto out_con;
}
monc->auth->want_keys = monc->auth->want_keys =
CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
...@@ -770,19 +771,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) ...@@ -770,19 +771,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
err = -ENOMEM; err = -ENOMEM;
monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
sizeof(struct ceph_mon_subscribe_ack), sizeof(struct ceph_mon_subscribe_ack),
GFP_NOFS); GFP_NOFS, true);
if (!monc->m_subscribe_ack) if (!monc->m_subscribe_ack)
goto out_monmap; goto out_auth;
monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS); monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
true);
if (!monc->m_subscribe) if (!monc->m_subscribe)
goto out_subscribe_ack; goto out_subscribe_ack;
monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS); monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
true);
if (!monc->m_auth_reply) if (!monc->m_auth_reply)
goto out_subscribe; goto out_subscribe;
monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS); monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
monc->pending_auth = 0; monc->pending_auth = 0;
if (!monc->m_auth) if (!monc->m_auth)
goto out_auth_reply; goto out_auth_reply;
...@@ -808,6 +811,10 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) ...@@ -808,6 +811,10 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
ceph_msg_put(monc->m_subscribe); ceph_msg_put(monc->m_subscribe);
out_subscribe_ack: out_subscribe_ack:
ceph_msg_put(monc->m_subscribe_ack); ceph_msg_put(monc->m_subscribe_ack);
out_auth:
ceph_auth_destroy(monc->auth);
out_con:
monc->con->ops->put(monc->con);
out_monmap: out_monmap:
kfree(monc->monmap); kfree(monc->monmap);
out: out:
...@@ -822,11 +829,11 @@ void ceph_monc_stop(struct ceph_mon_client *monc) ...@@ -822,11 +829,11 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
__close_session(monc); __close_session(monc);
if (monc->con) {
monc->con->private = NULL; monc->con->private = NULL;
monc->con->ops->put(monc->con); monc->con->ops->put(monc->con);
monc->con = NULL; monc->con = NULL;
}
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
ceph_auth_destroy(monc->auth); ceph_auth_destroy(monc->auth);
...@@ -973,7 +980,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, ...@@ -973,7 +980,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_MAP: case CEPH_MSG_MON_MAP:
case CEPH_MSG_MDS_MAP: case CEPH_MSG_MDS_MAP:
case CEPH_MSG_OSD_MAP: case CEPH_MSG_OSD_MAP:
m = ceph_msg_new(type, front_len, GFP_NOFS); m = ceph_msg_new(type, front_len, GFP_NOFS, false);
break; break;
} }
...@@ -1000,7 +1007,7 @@ static void mon_fault(struct ceph_connection *con) ...@@ -1000,7 +1007,7 @@ static void mon_fault(struct ceph_connection *con)
if (!con->private) if (!con->private)
goto out; goto out;
if (monc->con && !monc->hunting) if (!monc->hunting)
pr_info("mon%d %s session lost, " pr_info("mon%d %s session lost, "
"hunting for new mon\n", monc->cur_mon, "hunting for new mon\n", monc->cur_mon,
ceph_pr_addr(&monc->con->peer_addr.in_addr)); ceph_pr_addr(&monc->con->peer_addr.in_addr));
......
...@@ -12,7 +12,7 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg) ...@@ -12,7 +12,7 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
struct ceph_msgpool *pool = arg; struct ceph_msgpool *pool = arg;
struct ceph_msg *msg; struct ceph_msg *msg;
msg = ceph_msg_new(0, pool->front_len, gfp_mask); msg = ceph_msg_new(0, pool->front_len, gfp_mask, true);
if (!msg) { if (!msg) {
dout("msgpool_alloc %s failed\n", pool->name); dout("msgpool_alloc %s failed\n", pool->name);
} else { } else {
...@@ -61,7 +61,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, ...@@ -61,7 +61,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
WARN_ON(1); WARN_ON(1);
/* try to alloc a fresh message */ /* try to alloc a fresh message */
return ceph_msg_new(0, front_len, GFP_NOFS); return ceph_msg_new(0, front_len, GFP_NOFS, false);
} }
msg = mempool_alloc(pool->pool, GFP_NOFS); msg = mempool_alloc(pool->pool, GFP_NOFS);
......
...@@ -227,7 +227,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -227,7 +227,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
else else
msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
OSD_OPREPLY_FRONT_LEN, gfp_flags); OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
if (!msg) { if (!msg) {
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
return NULL; return NULL;
...@@ -250,7 +250,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -250,7 +250,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
if (use_mempool) if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0); msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
else else
msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags); msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
if (!msg) { if (!msg) {
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
return NULL; return NULL;
...@@ -943,7 +943,7 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger); ...@@ -943,7 +943,7 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger);
* Caller should hold map_sem for read and request_mutex. * Caller should hold map_sem for read and request_mutex.
*/ */
static int __map_request(struct ceph_osd_client *osdc, static int __map_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req) struct ceph_osd_request *req, int force_resend)
{ {
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
struct ceph_pg pgid; struct ceph_pg pgid;
...@@ -967,7 +967,8 @@ static int __map_request(struct ceph_osd_client *osdc, ...@@ -967,7 +967,8 @@ static int __map_request(struct ceph_osd_client *osdc,
num = err; num = err;
} }
if ((req->r_osd && req->r_osd->o_osd == o && if ((!force_resend &&
req->r_osd && req->r_osd->o_osd == o &&
req->r_sent >= req->r_osd->o_incarnation && req->r_sent >= req->r_osd->o_incarnation &&
req->r_num_pg_osds == num && req->r_num_pg_osds == num &&
memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
...@@ -1289,18 +1290,18 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) ...@@ -1289,18 +1290,18 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
* *
* Caller should hold map_sem for read and request_mutex. * Caller should hold map_sem for read and request_mutex.
*/ */
static void kick_requests(struct ceph_osd_client *osdc) static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{ {
struct ceph_osd_request *req, *nreq; struct ceph_osd_request *req, *nreq;
struct rb_node *p; struct rb_node *p;
int needmap = 0; int needmap = 0;
int err; int err;
dout("kick_requests\n"); dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_osd_request, r_node); req = rb_entry(p, struct ceph_osd_request, r_node);
err = __map_request(osdc, req); err = __map_request(osdc, req, force_resend);
if (err < 0) if (err < 0)
continue; /* error */ continue; /* error */
if (req->r_osd == NULL) { if (req->r_osd == NULL) {
...@@ -1318,7 +1319,7 @@ static void kick_requests(struct ceph_osd_client *osdc) ...@@ -1318,7 +1319,7 @@ static void kick_requests(struct ceph_osd_client *osdc)
r_linger_item) { r_linger_item) {
dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
err = __map_request(osdc, req); err = __map_request(osdc, req, force_resend);
if (err == 0) if (err == 0)
continue; /* no change and no osd was specified */ continue; /* no change and no osd was specified */
if (err < 0) if (err < 0)
...@@ -1395,7 +1396,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -1395,7 +1396,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
ceph_osdmap_destroy(osdc->osdmap); ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = newmap; osdc->osdmap = newmap;
} }
kick_requests(osdc); kick_requests(osdc, 0);
reset_changed_osds(osdc); reset_changed_osds(osdc);
} else { } else {
dout("ignoring incremental map %u len %d\n", dout("ignoring incremental map %u len %d\n",
...@@ -1423,6 +1424,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -1423,6 +1424,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
"older than our %u\n", epoch, maplen, "older than our %u\n", epoch, maplen,
osdc->osdmap->epoch); osdc->osdmap->epoch);
} else { } else {
int skipped_map = 0;
dout("taking full map %u len %d\n", epoch, maplen); dout("taking full map %u len %d\n", epoch, maplen);
newmap = osdmap_decode(&p, p+maplen); newmap = osdmap_decode(&p, p+maplen);
if (IS_ERR(newmap)) { if (IS_ERR(newmap)) {
...@@ -1432,9 +1435,12 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ...@@ -1432,9 +1435,12 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
BUG_ON(!newmap); BUG_ON(!newmap);
oldmap = osdc->osdmap; oldmap = osdc->osdmap;
osdc->osdmap = newmap; osdc->osdmap = newmap;
if (oldmap) if (oldmap) {
if (oldmap->epoch + 1 < newmap->epoch)
skipped_map = 1;
ceph_osdmap_destroy(oldmap); ceph_osdmap_destroy(oldmap);
kick_requests(osdc); }
kick_requests(osdc, skipped_map);
} }
p += maplen; p += maplen;
nr_maps--; nr_maps--;
...@@ -1707,7 +1713,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, ...@@ -1707,7 +1713,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
* the request still han't been touched yet. * the request still han't been touched yet.
*/ */
if (req->r_sent == 0) { if (req->r_sent == 0) {
rc = __map_request(osdc, req); rc = __map_request(osdc, req, 0);
if (rc < 0) { if (rc < 0) {
if (nofail) { if (nofail) {
dout("osdc_start_request failed map, " dout("osdc_start_request failed map, "
...@@ -2032,7 +2038,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -2032,7 +2038,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
if (front > req->r_reply->front.iov_len) { if (front > req->r_reply->front.iov_len) {
pr_warning("get_reply front %d > preallocated %d\n", pr_warning("get_reply front %d > preallocated %d\n",
front, (int)req->r_reply->front.iov_len); front, (int)req->r_reply->front.iov_len);
m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS); m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
if (!m) if (!m)
goto out; goto out;
ceph_msg_put(req->r_reply); ceph_msg_put(req->r_reply);
...@@ -2080,7 +2086,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, ...@@ -2080,7 +2086,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
switch (type) { switch (type) {
case CEPH_MSG_OSD_MAP: case CEPH_MSG_OSD_MAP:
case CEPH_MSG_WATCH_NOTIFY: case CEPH_MSG_WATCH_NOTIFY:
return ceph_msg_new(type, front, GFP_NOFS); return ceph_msg_new(type, front, GFP_NOFS, false);
case CEPH_MSG_OSD_OPREPLY: case CEPH_MSG_OSD_OPREPLY:
return get_reply(con, hdr, skip); return get_reply(con, hdr, skip);
default: default:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment