Commit 96e35b40 authored by Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: use separate class for ceph sockets' sk_lock
  ceph: reserve one more caps space when doing readdir
  ceph: queue_cap_snap should always queue dirty context
  ceph: fix dentry reference leak in dcache readdir
  ceph: decode v5 of osdmap (pool names) [protocol change]
  ceph: fix ack counter reset on connection reset
  ceph: fix leaked inode ref due to snap metadata writeback race
  ceph: fix snap context reference leaks
  ceph: allow writeback of snapped pages older than 'oldest' snapc
  ceph: fix dentry rehashing on virtual .snap dir
parents f5c07a2d a6a5349d
...@@ -337,16 +337,15 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, ...@@ -337,16 +337,15 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
/* /*
* Get ref for the oldest snapc for an inode with dirty data... that is, the * Get ref for the oldest snapc for an inode with dirty data... that is, the
* only snap context we are allowed to write back. * only snap context we are allowed to write back.
*
* Caller holds i_lock.
*/ */
static struct ceph_snap_context *__get_oldest_context(struct inode *inode, static struct ceph_snap_context *get_oldest_context(struct inode *inode,
u64 *snap_size) u64 *snap_size)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_snap_context *snapc = NULL; struct ceph_snap_context *snapc = NULL;
struct ceph_cap_snap *capsnap = NULL; struct ceph_cap_snap *capsnap = NULL;
spin_lock(&inode->i_lock);
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap, dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
capsnap->context, capsnap->dirty_pages); capsnap->context, capsnap->dirty_pages);
...@@ -357,21 +356,11 @@ static struct ceph_snap_context *__get_oldest_context(struct inode *inode, ...@@ -357,21 +356,11 @@ static struct ceph_snap_context *__get_oldest_context(struct inode *inode,
break; break;
} }
} }
if (!snapc && ci->i_snap_realm) { if (!snapc && ci->i_head_snapc) {
snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context); snapc = ceph_get_snap_context(ci->i_head_snapc);
dout(" head snapc %p has %d dirty pages\n", dout(" head snapc %p has %d dirty pages\n",
snapc, ci->i_wrbuffer_ref_head); snapc, ci->i_wrbuffer_ref_head);
} }
return snapc;
}
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
u64 *snap_size)
{
struct ceph_snap_context *snapc = NULL;
spin_lock(&inode->i_lock);
snapc = __get_oldest_context(inode, snap_size);
spin_unlock(&inode->i_lock); spin_unlock(&inode->i_lock);
return snapc; return snapc;
} }
...@@ -392,7 +381,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -392,7 +381,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
int len = PAGE_CACHE_SIZE; int len = PAGE_CACHE_SIZE;
loff_t i_size; loff_t i_size;
int err = 0; int err = 0;
struct ceph_snap_context *snapc; struct ceph_snap_context *snapc, *oldest;
u64 snap_size = 0; u64 snap_size = 0;
long writeback_stat; long writeback_stat;
...@@ -413,13 +402,16 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -413,13 +402,16 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
dout("writepage %p page %p not dirty?\n", inode, page); dout("writepage %p page %p not dirty?\n", inode, page);
goto out; goto out;
} }
if (snapc != get_oldest_context(inode, &snap_size)) { oldest = get_oldest_context(inode, &snap_size);
if (snapc->seq > oldest->seq) {
dout("writepage %p page %p snapc %p not writeable - noop\n", dout("writepage %p page %p snapc %p not writeable - noop\n",
inode, page, (void *)page->private); inode, page, (void *)page->private);
/* we should only noop if called by kswapd */ /* we should only noop if called by kswapd */
WARN_ON((current->flags & PF_MEMALLOC) == 0); WARN_ON((current->flags & PF_MEMALLOC) == 0);
ceph_put_snap_context(oldest);
goto out; goto out;
} }
ceph_put_snap_context(oldest);
/* is this a partial page at end of file? */ /* is this a partial page at end of file? */
if (snap_size) if (snap_size)
...@@ -458,7 +450,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -458,7 +450,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
ClearPagePrivate(page); ClearPagePrivate(page);
end_page_writeback(page); end_page_writeback(page);
ceph_put_wrbuffer_cap_refs(ci, 1, snapc); ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
ceph_put_snap_context(snapc); ceph_put_snap_context(snapc); /* page's reference */
out: out:
return err; return err;
} }
...@@ -558,9 +550,9 @@ static void writepages_finish(struct ceph_osd_request *req, ...@@ -558,9 +550,9 @@ static void writepages_finish(struct ceph_osd_request *req,
dout("inode %p skipping page %p\n", inode, page); dout("inode %p skipping page %p\n", inode, page);
wbc->pages_skipped++; wbc->pages_skipped++;
} }
ceph_put_snap_context((void *)page->private);
page->private = 0; page->private = 0;
ClearPagePrivate(page); ClearPagePrivate(page);
ceph_put_snap_context(snapc);
dout("unlocking %d %p\n", i, page); dout("unlocking %d %p\n", i, page);
end_page_writeback(page); end_page_writeback(page);
...@@ -618,7 +610,7 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -618,7 +610,7 @@ static int ceph_writepages_start(struct address_space *mapping,
int range_whole = 0; int range_whole = 0;
int should_loop = 1; int should_loop = 1;
pgoff_t max_pages = 0, max_pages_ever = 0; pgoff_t max_pages = 0, max_pages_ever = 0;
struct ceph_snap_context *snapc = NULL, *last_snapc = NULL; struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
struct pagevec pvec; struct pagevec pvec;
int done = 0; int done = 0;
int rc = 0; int rc = 0;
...@@ -770,9 +762,10 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -770,9 +762,10 @@ static int ceph_writepages_start(struct address_space *mapping,
} }
/* only if matching snap context */ /* only if matching snap context */
if (snapc != (void *)page->private) { pgsnapc = (void *)page->private;
dout("page snapc %p != oldest %p\n", if (pgsnapc->seq > snapc->seq) {
(void *)page->private, snapc); dout("page snapc %p %lld > oldest %p %lld\n",
pgsnapc, pgsnapc->seq, snapc, snapc->seq);
unlock_page(page); unlock_page(page);
if (!locked_pages) if (!locked_pages)
continue; /* keep looking for snap */ continue; /* keep looking for snap */
...@@ -914,7 +907,10 @@ static int context_is_writeable_or_written(struct inode *inode, ...@@ -914,7 +907,10 @@ static int context_is_writeable_or_written(struct inode *inode,
struct ceph_snap_context *snapc) struct ceph_snap_context *snapc)
{ {
struct ceph_snap_context *oldest = get_oldest_context(inode, NULL); struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
return !oldest || snapc->seq <= oldest->seq; int ret = !oldest || snapc->seq <= oldest->seq;
ceph_put_snap_context(oldest);
return ret;
} }
/* /*
...@@ -936,8 +932,8 @@ static int ceph_update_writeable_page(struct file *file, ...@@ -936,8 +932,8 @@ static int ceph_update_writeable_page(struct file *file,
int pos_in_page = pos & ~PAGE_CACHE_MASK; int pos_in_page = pos & ~PAGE_CACHE_MASK;
int end_in_page = pos_in_page + len; int end_in_page = pos_in_page + len;
loff_t i_size; loff_t i_size;
struct ceph_snap_context *snapc;
int r; int r;
struct ceph_snap_context *snapc, *oldest;
retry_locked: retry_locked:
/* writepages currently holds page lock, but if we change that later, */ /* writepages currently holds page lock, but if we change that later, */
...@@ -947,23 +943,24 @@ static int ceph_update_writeable_page(struct file *file, ...@@ -947,23 +943,24 @@ static int ceph_update_writeable_page(struct file *file,
BUG_ON(!ci->i_snap_realm); BUG_ON(!ci->i_snap_realm);
down_read(&mdsc->snap_rwsem); down_read(&mdsc->snap_rwsem);
BUG_ON(!ci->i_snap_realm->cached_context); BUG_ON(!ci->i_snap_realm->cached_context);
if (page->private && snapc = (void *)page->private;
(void *)page->private != ci->i_snap_realm->cached_context) { if (snapc && snapc != ci->i_head_snapc) {
/* /*
* this page is already dirty in another (older) snap * this page is already dirty in another (older) snap
* context! is it writeable now? * context! is it writeable now?
*/ */
snapc = get_oldest_context(inode, NULL); oldest = get_oldest_context(inode, NULL);
up_read(&mdsc->snap_rwsem); up_read(&mdsc->snap_rwsem);
if (snapc != (void *)page->private) { if (snapc->seq > oldest->seq) {
ceph_put_snap_context(oldest);
dout(" page %p snapc %p not current or oldest\n", dout(" page %p snapc %p not current or oldest\n",
page, (void *)page->private); page, snapc);
/* /*
* queue for writeback, and wait for snapc to * queue for writeback, and wait for snapc to
* be writeable or written * be writeable or written
*/ */
snapc = ceph_get_snap_context((void *)page->private); snapc = ceph_get_snap_context(snapc);
unlock_page(page); unlock_page(page);
ceph_queue_writeback(inode); ceph_queue_writeback(inode);
r = wait_event_interruptible(ci->i_cap_wq, r = wait_event_interruptible(ci->i_cap_wq,
...@@ -973,6 +970,7 @@ static int ceph_update_writeable_page(struct file *file, ...@@ -973,6 +970,7 @@ static int ceph_update_writeable_page(struct file *file,
return r; return r;
return -EAGAIN; return -EAGAIN;
} }
ceph_put_snap_context(oldest);
/* yay, writeable, do it now (without dropping page lock) */ /* yay, writeable, do it now (without dropping page lock) */
dout(" page %p snapc %p not current, but oldest\n", dout(" page %p snapc %p not current, but oldest\n",
......
...@@ -1205,6 +1205,12 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci, ...@@ -1205,6 +1205,12 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
if (capsnap->dirty_pages || capsnap->writing) if (capsnap->dirty_pages || capsnap->writing)
continue; continue;
/*
* if cap writeback already occurred, we should have dropped
* the capsnap in ceph_put_wrbuffer_cap_refs.
*/
BUG_ON(capsnap->dirty == 0);
/* pick mds, take s_mutex */ /* pick mds, take s_mutex */
mds = __ceph_get_cap_mds(ci, &mseq); mds = __ceph_get_cap_mds(ci, &mseq);
if (session && session->s_mds != mds) { if (session && session->s_mds != mds) {
...@@ -2118,8 +2124,8 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) ...@@ -2118,8 +2124,8 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
} }
spin_unlock(&inode->i_lock); spin_unlock(&inode->i_lock);
dout("put_cap_refs %p had %s %s\n", inode, ceph_cap_string(had), dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
last ? "last" : ""); last ? " last" : "", put ? " put" : "");
if (last && !flushsnaps) if (last && !flushsnaps)
ceph_check_caps(ci, 0, NULL); ceph_check_caps(ci, 0, NULL);
...@@ -2143,7 +2149,8 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, ...@@ -2143,7 +2149,8 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
{ {
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
int last = 0; int last = 0;
int last_snap = 0; int complete_capsnap = 0;
int drop_capsnap = 0;
int found = 0; int found = 0;
struct ceph_cap_snap *capsnap = NULL; struct ceph_cap_snap *capsnap = NULL;
...@@ -2166,19 +2173,32 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, ...@@ -2166,19 +2173,32 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
if (capsnap->context == snapc) { if (capsnap->context == snapc) {
found = 1; found = 1;
capsnap->dirty_pages -= nr;
last_snap = !capsnap->dirty_pages;
break; break;
} }
} }
BUG_ON(!found); BUG_ON(!found);
capsnap->dirty_pages -= nr;
if (capsnap->dirty_pages == 0) {
complete_capsnap = 1;
if (capsnap->dirty == 0)
/* cap writeback completed before we created
* the cap_snap; no FLUSHSNAP is needed */
drop_capsnap = 1;
}
dout("put_wrbuffer_cap_refs on %p cap_snap %p " dout("put_wrbuffer_cap_refs on %p cap_snap %p "
" snap %lld %d/%d -> %d/%d %s%s\n", " snap %lld %d/%d -> %d/%d %s%s%s\n",
inode, capsnap, capsnap->context->seq, inode, capsnap, capsnap->context->seq,
ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
ci->i_wrbuffer_ref, capsnap->dirty_pages, ci->i_wrbuffer_ref, capsnap->dirty_pages,
last ? " (wrbuffer last)" : "", last ? " (wrbuffer last)" : "",
last_snap ? " (capsnap last)" : ""); complete_capsnap ? " (complete capsnap)" : "",
drop_capsnap ? " (drop capsnap)" : "");
if (drop_capsnap) {
ceph_put_snap_context(capsnap->context);
list_del(&capsnap->ci_item);
list_del(&capsnap->flushing_item);
ceph_put_cap_snap(capsnap);
}
} }
spin_unlock(&inode->i_lock); spin_unlock(&inode->i_lock);
...@@ -2186,10 +2206,12 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, ...@@ -2186,10 +2206,12 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
if (last) { if (last) {
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
iput(inode); iput(inode);
} else if (last_snap) { } else if (complete_capsnap) {
ceph_flush_snaps(ci); ceph_flush_snaps(ci);
wake_up(&ci->i_cap_wq); wake_up(&ci->i_cap_wq);
} }
if (drop_capsnap)
iput(inode);
} }
/* /*
...@@ -2465,8 +2487,8 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, ...@@ -2465,8 +2487,8 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
break; break;
} }
WARN_ON(capsnap->dirty_pages || capsnap->writing); WARN_ON(capsnap->dirty_pages || capsnap->writing);
dout(" removing cap_snap %p follows %lld\n", dout(" removing %p cap_snap %p follows %lld\n",
capsnap, follows); inode, capsnap, follows);
ceph_put_snap_context(capsnap->context); ceph_put_snap_context(capsnap->context);
list_del(&capsnap->ci_item); list_del(&capsnap->ci_item);
list_del(&capsnap->flushing_item); list_del(&capsnap->flushing_item);
......
...@@ -171,11 +171,11 @@ static int __dcache_readdir(struct file *filp, ...@@ -171,11 +171,11 @@ static int __dcache_readdir(struct file *filp,
spin_lock(&inode->i_lock); spin_lock(&inode->i_lock);
spin_lock(&dcache_lock); spin_lock(&dcache_lock);
last = dentry;
if (err < 0) if (err < 0)
goto out_unlock; goto out_unlock;
last = dentry;
p = p->prev; p = p->prev;
filp->f_pos++; filp->f_pos++;
...@@ -312,7 +312,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) ...@@ -312,7 +312,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
req->r_readdir_offset = fi->next_offset; req->r_readdir_offset = fi->next_offset;
req->r_args.readdir.frag = cpu_to_le32(frag); req->r_args.readdir.frag = cpu_to_le32(frag);
req->r_args.readdir.max_entries = cpu_to_le32(max_entries); req->r_args.readdir.max_entries = cpu_to_le32(max_entries);
req->r_num_caps = max_entries; req->r_num_caps = max_entries + 1;
err = ceph_mdsc_do_request(mdsc, NULL, req); err = ceph_mdsc_do_request(mdsc, NULL, req);
if (err < 0) { if (err < 0) {
ceph_mdsc_put_request(req); ceph_mdsc_put_request(req);
...@@ -489,6 +489,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, ...@@ -489,6 +489,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
struct inode *inode = ceph_get_snapdir(parent); struct inode *inode = ceph_get_snapdir(parent);
dout("ENOENT on snapdir %p '%.*s', linking to snapdir %p\n", dout("ENOENT on snapdir %p '%.*s', linking to snapdir %p\n",
dentry, dentry->d_name.len, dentry->d_name.name, inode); dentry, dentry->d_name.len, dentry->d_name.name, inode);
BUG_ON(!d_unhashed(dentry));
d_add(dentry, inode); d_add(dentry, inode);
err = 0; err = 0;
} }
......
...@@ -886,6 +886,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, ...@@ -886,6 +886,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
struct inode *in = NULL; struct inode *in = NULL;
struct ceph_mds_reply_inode *ininfo; struct ceph_mds_reply_inode *ininfo;
struct ceph_vino vino; struct ceph_vino vino;
struct ceph_client *client = ceph_sb_to_client(sb);
int i = 0; int i = 0;
int err = 0; int err = 0;
...@@ -949,7 +950,14 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, ...@@ -949,7 +950,14 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
return err; return err;
} }
if (rinfo->head->is_dentry && !req->r_aborted) { /*
* ignore null lease/binding on snapdir ENOENT, or else we
* will have trouble splicing in the virtual snapdir later
*/
if (rinfo->head->is_dentry && !req->r_aborted &&
(rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
client->mount_args->snapdir_name,
req->r_dentry->d_name.len))) {
/* /*
* lookup link rename : null -> possibly existing inode * lookup link rename : null -> possibly existing inode
* mknod symlink mkdir : null -> new inode * mknod symlink mkdir : null -> new inode
......
...@@ -30,6 +30,10 @@ static char tag_msg = CEPH_MSGR_TAG_MSG; ...@@ -30,6 +30,10 @@ static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif
static void queue_con(struct ceph_connection *con); static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *); static void con_work(struct work_struct *);
...@@ -228,6 +232,10 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con) ...@@ -228,6 +232,10 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
con->sock = sock; con->sock = sock;
sock->sk->sk_allocation = GFP_NOFS; sock->sk->sk_allocation = GFP_NOFS;
#ifdef CONFIG_LOCKDEP
lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif
set_sock_callbacks(sock, con); set_sock_callbacks(sock, con);
dout("connect %s\n", pr_addr(&con->peer_addr.in_addr)); dout("connect %s\n", pr_addr(&con->peer_addr.in_addr));
...@@ -333,6 +341,7 @@ static void reset_connection(struct ceph_connection *con) ...@@ -333,6 +341,7 @@ static void reset_connection(struct ceph_connection *con)
con->out_msg = NULL; con->out_msg = NULL;
} }
con->in_seq = 0; con->in_seq = 0;
con->in_seq_acked = 0;
} }
/* /*
......
...@@ -314,71 +314,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end) ...@@ -314,71 +314,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
return ERR_PTR(err); return ERR_PTR(err);
} }
/*
* osd map
*/
/*
 * Release an osdmap and everything hanging off it: the crush map (when
 * one is attached), every node in the pg_temp and pg_pools rbtrees, the
 * three per-osd arrays, and finally the map structure itself.
 *
 * @map is dereferenced unconditionally, so it must not be NULL, and it
 * is freed before returning, so it must not be used afterwards.
 */
void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
dout("osdmap_destroy %p\n", map);
if (map->crush)
crush_destroy(map->crush);
/* drain pg_temp: unlink and free the first node until the tree is empty */
while (!RB_EMPTY_ROOT(&map->pg_temp)) {
struct ceph_pg_mapping *pg =
rb_entry(rb_first(&map->pg_temp),
struct ceph_pg_mapping, node);
rb_erase(&pg->node, &map->pg_temp);
kfree(pg);
}
/* drain pg_pools the same way */
while (!RB_EMPTY_ROOT(&map->pg_pools)) {
struct ceph_pg_pool_info *pi =
rb_entry(rb_first(&map->pg_pools),
struct ceph_pg_pool_info, node);
rb_erase(&pi->node, &map->pg_pools);
kfree(pi);
}
/* per-osd arrays, then the map itself */
kfree(map->osd_state);
kfree(map->osd_weight);
kfree(map->osd_addr);
kfree(map);
}
/*
 * adjust max osd value.  reallocate arrays.
 *
 * Allocates fresh, zeroed state/addr/weight arrays of @max entries
 * (GFP_NOFS), copies the existing contents across when the map already
 * has arrays, frees the old arrays and installs the new ones.
 *
 * Returns 0 on success.  If any of the three allocations fails, the
 * ones that succeeded are freed, @map is left untouched and -ENOMEM is
 * returned.
 *
 * NOTE(review): the copy below uses the old map->max_osd as its length
 * and is not bounded by @max; this looks like it assumes
 * max >= map->max_osd -- confirm against callers.
 */
static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
{
u8 *state;
struct ceph_entity_addr *addr;
u32 *weight;
state = kcalloc(max, sizeof(*state), GFP_NOFS);
addr = kcalloc(max, sizeof(*addr), GFP_NOFS);
weight = kcalloc(max, sizeof(*weight), GFP_NOFS);
/* all-or-nothing: release whatever was obtained if any one failed */
if (state == NULL || addr == NULL || weight == NULL) {
kfree(state);
kfree(addr);
kfree(weight);
return -ENOMEM;
}
/* copy old? */
if (map->osd_state) {
/* carry over map->max_osd existing entries, then drop the old arrays */
memcpy(state, map->osd_state, map->max_osd*sizeof(*state));
memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr));
memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight));
kfree(map->osd_state);
kfree(map->osd_addr);
kfree(map->osd_weight);
}
/* publish the new arrays and the new bound */
map->osd_state = state;
map->osd_weight = weight;
map->osd_addr = addr;
map->max_osd = max;
return 0;
}
/* /*
* rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
* to a set of osds) * to a set of osds)
...@@ -482,6 +417,13 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id) ...@@ -482,6 +417,13 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
return NULL; return NULL;
} }
/*
 * Unlink pool @pi from the rbtree rooted at @root, then release both the
 * pool structure and the name string it owns.  @pi must not be touched
 * after this returns.
 */
static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
	char *pool_name = pi->name;

	rb_erase(&pi->node, root);
	kfree(pi);
	kfree(pool_name);
}
void __decode_pool(void **p, struct ceph_pg_pool_info *pi) void __decode_pool(void **p, struct ceph_pg_pool_info *pi)
{ {
ceph_decode_copy(p, &pi->v, sizeof(pi->v)); ceph_decode_copy(p, &pi->v, sizeof(pi->v));
...@@ -490,6 +432,98 @@ void __decode_pool(void **p, struct ceph_pg_pool_info *pi) ...@@ -490,6 +432,98 @@ void __decode_pool(void **p, struct ceph_pg_pool_info *pi)
*p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2; *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2;
} }
/*
 * Decode the pool-name section of an osdmap.
 *
 * As consumed here the section is a 32-bit entry count followed, per
 * entry, by a 32-bit pool id, a 32-bit name length and that many raw
 * name bytes.  A NUL-terminated copy of the name is stored in the
 * matching map->pg_pools entry (replacing any previous name); entries
 * for pool ids that are not in the tree are skipped over.
 *
 * @p:   decode cursor, advanced past everything consumed
 * @end: end of the buffer being decoded
 * @map: osdmap whose pools receive the names
 *
 * Returns 0 on success, or -EINVAL when the buffer is too short for
 * what the encoded counts/lengths claim.  A failed name allocation is
 * not reported: that pool is left with a NULL name and decoding goes on.
 */
static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
{
	struct ceph_pg_pool_info *pi;
	u32 num, len, pool;

	ceph_decode_32_safe(p, end, num, bad);
	dout(" %d pool names\n", num);
	while (num--) {
		ceph_decode_32_safe(p, end, pool, bad);
		ceph_decode_32_safe(p, end, len, bad);
		dout(" pool %d len %d\n", pool, len);
		/*
		 * len comes straight off the wire: make sure the name
		 * bytes really lie inside the buffer before they are
		 * copied or stepped over.
		 */
		ceph_decode_need(p, end, len, bad);
		pi = __lookup_pg_pool(&map->pg_pools, pool);
		if (pi) {
			kfree(pi->name);
			pi->name = kmalloc(len + 1, GFP_NOFS);
			if (pi->name) {
				memcpy(pi->name, *p, len);
				pi->name[len] = '\0';
				dout(" name is %s\n", pi->name);
			}
		}
		*p += len;
	}
	return 0;

bad:
	return -EINVAL;
}
/*
* osd map
*/
void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
dout("osdmap_destroy %p\n", map);
if (map->crush)
crush_destroy(map->crush);
while (!RB_EMPTY_ROOT(&map->pg_temp)) {
struct ceph_pg_mapping *pg =
rb_entry(rb_first(&map->pg_temp),
struct ceph_pg_mapping, node);
rb_erase(&pg->node, &map->pg_temp);
kfree(pg);
}
while (!RB_EMPTY_ROOT(&map->pg_pools)) {
struct ceph_pg_pool_info *pi =
rb_entry(rb_first(&map->pg_pools),
struct ceph_pg_pool_info, node);
__remove_pg_pool(&map->pg_pools, pi);
}
kfree(map->osd_state);
kfree(map->osd_weight);
kfree(map->osd_addr);
kfree(map);
}
/*
 * adjust max osd value.  reallocate arrays.
 *
 * Allocates fresh, zeroed state/addr/weight arrays of @max entries
 * (GFP_NOFS), carries over as many existing entries as fit, frees the
 * old arrays and installs the new ones.  When growing, the extra
 * entries stay zeroed; when shrinking, entries at index >= @max are
 * dropped.
 *
 * Returns 0 on success.  If any of the three allocations fails, the
 * ones that succeeded are freed, @map is left untouched and -ENOMEM is
 * returned.
 */
static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
{
	u8 *state;
	struct ceph_entity_addr *addr;
	u32 *weight;

	state = kcalloc(max, sizeof(*state), GFP_NOFS);
	addr = kcalloc(max, sizeof(*addr), GFP_NOFS);
	weight = kcalloc(max, sizeof(*weight), GFP_NOFS);
	/* all-or-nothing: release whatever was obtained if any one failed */
	if (state == NULL || addr == NULL || weight == NULL) {
		kfree(state);
		kfree(addr);
		kfree(weight);
		return -ENOMEM;
	}

	/* copy old? */
	if (map->osd_state) {
		/*
		 * The new arrays hold only @max entries, so never copy
		 * more than that: copying the full old max_osd when the
		 * map is being shrunk would run off the end of them.
		 */
		int keep = map->max_osd;

		if (keep > max)
			keep = max;
		memcpy(state, map->osd_state, keep*sizeof(*state));
		memcpy(addr, map->osd_addr, keep*sizeof(*addr));
		memcpy(weight, map->osd_weight, keep*sizeof(*weight));
		kfree(map->osd_state);
		kfree(map->osd_addr);
		kfree(map->osd_weight);
	}

	/* publish the new arrays and the new bound */
	map->osd_state = state;
	map->osd_weight = weight;
	map->osd_addr = addr;
	map->max_osd = max;
	return 0;
}
/* /*
* decode a full map. * decode a full map.
*/ */
...@@ -526,7 +560,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ...@@ -526,7 +560,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
ceph_decode_32_safe(p, end, max, bad); ceph_decode_32_safe(p, end, max, bad);
while (max--) { while (max--) {
ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
pi = kmalloc(sizeof(*pi), GFP_NOFS); pi = kzalloc(sizeof(*pi), GFP_NOFS);
if (!pi) if (!pi)
goto bad; goto bad;
pi->id = ceph_decode_32(p); pi->id = ceph_decode_32(p);
...@@ -539,6 +573,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ...@@ -539,6 +573,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
__decode_pool(p, pi); __decode_pool(p, pi);
__insert_pg_pool(&map->pg_pools, pi); __insert_pg_pool(&map->pg_pools, pi);
} }
if (version >= 5 && __decode_pool_names(p, end, map) < 0)
goto bad;
ceph_decode_32_safe(p, end, map->pool_max, bad); ceph_decode_32_safe(p, end, map->pool_max, bad);
ceph_decode_32_safe(p, end, map->flags, bad); ceph_decode_32_safe(p, end, map->flags, bad);
...@@ -712,7 +750,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ...@@ -712,7 +750,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
} }
pi = __lookup_pg_pool(&map->pg_pools, pool); pi = __lookup_pg_pool(&map->pg_pools, pool);
if (!pi) { if (!pi) {
pi = kmalloc(sizeof(*pi), GFP_NOFS); pi = kzalloc(sizeof(*pi), GFP_NOFS);
if (!pi) { if (!pi) {
err = -ENOMEM; err = -ENOMEM;
goto bad; goto bad;
...@@ -722,6 +760,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ...@@ -722,6 +760,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
} }
__decode_pool(p, pi); __decode_pool(p, pi);
} }
if (version >= 5 && __decode_pool_names(p, end, map) < 0)
goto bad;
/* old_pool */ /* old_pool */
ceph_decode_32_safe(p, end, len, bad); ceph_decode_32_safe(p, end, len, bad);
...@@ -730,10 +770,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ...@@ -730,10 +770,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
ceph_decode_32_safe(p, end, pool, bad); ceph_decode_32_safe(p, end, pool, bad);
pi = __lookup_pg_pool(&map->pg_pools, pool); pi = __lookup_pg_pool(&map->pg_pools, pool);
if (pi) { if (pi)
rb_erase(&pi->node, &map->pg_pools); __remove_pg_pool(&map->pg_pools, pi);
kfree(pi);
}
} }
/* new_up */ /* new_up */
......
...@@ -23,6 +23,7 @@ struct ceph_pg_pool_info { ...@@ -23,6 +23,7 @@ struct ceph_pg_pool_info {
int id; int id;
struct ceph_pg_pool v; struct ceph_pg_pool v;
int pg_num_mask, pgp_num_mask, lpg_num_mask, lpgp_num_mask; int pg_num_mask, pgp_num_mask, lpg_num_mask, lpgp_num_mask;
char *name;
}; };
struct ceph_pg_mapping { struct ceph_pg_mapping {
......
...@@ -11,8 +11,10 @@ ...@@ -11,8 +11,10 @@
/* /*
* osdmap encoding versions * osdmap encoding versions
*/ */
#define CEPH_OSDMAP_INC_VERSION 4 #define CEPH_OSDMAP_INC_VERSION 5
#define CEPH_OSDMAP_VERSION 4 #define CEPH_OSDMAP_INC_VERSION_EXT 5
#define CEPH_OSDMAP_VERSION 5
#define CEPH_OSDMAP_VERSION_EXT 5
/* /*
* fs id * fs id
......
...@@ -431,8 +431,7 @@ static int dup_array(u64 **dst, __le64 *src, int num) ...@@ -431,8 +431,7 @@ static int dup_array(u64 **dst, __le64 *src, int num)
* Caller must hold snap_rwsem for read (i.e., the realm topology won't * Caller must hold snap_rwsem for read (i.e., the realm topology won't
* change). * change).
*/ */
void ceph_queue_cap_snap(struct ceph_inode_info *ci, void ceph_queue_cap_snap(struct ceph_inode_info *ci)
struct ceph_snap_context *snapc)
{ {
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap; struct ceph_cap_snap *capsnap;
...@@ -451,10 +450,11 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci, ...@@ -451,10 +450,11 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
as no new writes are allowed to start when pending, so any as no new writes are allowed to start when pending, so any
writes in progress now were started before the previous writes in progress now were started before the previous
cap_snap. lucky us. */ cap_snap. lucky us. */
dout("queue_cap_snap %p snapc %p seq %llu used %d" dout("queue_cap_snap %p already pending\n", inode);
" already pending\n", inode, snapc, snapc->seq, used);
kfree(capsnap); kfree(capsnap);
} else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) { } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) {
struct ceph_snap_context *snapc = ci->i_head_snapc;
igrab(inode); igrab(inode);
atomic_set(&capsnap->nref, 1); atomic_set(&capsnap->nref, 1);
...@@ -463,7 +463,6 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci, ...@@ -463,7 +463,6 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
INIT_LIST_HEAD(&capsnap->flushing_item); INIT_LIST_HEAD(&capsnap->flushing_item);
capsnap->follows = snapc->seq - 1; capsnap->follows = snapc->seq - 1;
capsnap->context = ceph_get_snap_context(snapc);
capsnap->issued = __ceph_caps_issued(ci, NULL); capsnap->issued = __ceph_caps_issued(ci, NULL);
capsnap->dirty = __ceph_caps_dirty(ci); capsnap->dirty = __ceph_caps_dirty(ci);
...@@ -480,7 +479,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci, ...@@ -480,7 +479,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
snapshot. */ snapshot. */
capsnap->dirty_pages = ci->i_wrbuffer_ref_head; capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
ci->i_wrbuffer_ref_head = 0; ci->i_wrbuffer_ref_head = 0;
ceph_put_snap_context(ci->i_head_snapc); capsnap->context = snapc;
ci->i_head_snapc = NULL; ci->i_head_snapc = NULL;
list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
...@@ -522,15 +521,17 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, ...@@ -522,15 +521,17 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->ctime = inode->i_ctime; capsnap->ctime = inode->i_ctime;
capsnap->time_warp_seq = ci->i_time_warp_seq; capsnap->time_warp_seq = ci->i_time_warp_seq;
if (capsnap->dirty_pages) { if (capsnap->dirty_pages) {
dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu " dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
"still has %d dirty pages\n", inode, capsnap, "still has %d dirty pages\n", inode, capsnap,
capsnap->context, capsnap->context->seq, capsnap->context, capsnap->context->seq,
capsnap->size, capsnap->dirty_pages); ceph_cap_string(capsnap->dirty), capsnap->size,
capsnap->dirty_pages);
return 0; return 0;
} }
dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu clean\n", dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
inode, capsnap, capsnap->context, inode, capsnap, capsnap->context,
capsnap->context->seq, capsnap->size); capsnap->context->seq, ceph_cap_string(capsnap->dirty),
capsnap->size);
spin_lock(&mdsc->snap_flush_lock); spin_lock(&mdsc->snap_flush_lock);
list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list); list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
...@@ -602,7 +603,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, ...@@ -602,7 +603,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
if (lastinode) if (lastinode)
iput(lastinode); iput(lastinode);
lastinode = inode; lastinode = inode;
ceph_queue_cap_snap(ci, realm->cached_context); ceph_queue_cap_snap(ci);
spin_lock(&realm->inodes_with_caps_lock); spin_lock(&realm->inodes_with_caps_lock);
} }
spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&realm->inodes_with_caps_lock);
...@@ -824,8 +825,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -824,8 +825,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&realm->inodes_with_caps_lock);
spin_unlock(&inode->i_lock); spin_unlock(&inode->i_lock);
ceph_queue_cap_snap(ci, ceph_queue_cap_snap(ci);
ci->i_snap_realm->cached_context);
iput(inode); iput(inode);
continue; continue;
......
...@@ -715,8 +715,7 @@ extern int ceph_update_snap_trace(struct ceph_mds_client *m, ...@@ -715,8 +715,7 @@ extern int ceph_update_snap_trace(struct ceph_mds_client *m,
extern void ceph_handle_snap(struct ceph_mds_client *mdsc, extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session, struct ceph_mds_session *session,
struct ceph_msg *msg); struct ceph_msg *msg);
extern void ceph_queue_cap_snap(struct ceph_inode_info *ci, extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
struct ceph_snap_context *snapc);
extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap); struct ceph_cap_snap *capsnap);
extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment