Commit 3a03c67d authored by Linus Torvalds

Merge tag 'ceph-for-5.14-rc6' of git://github.com/ceph/ceph-client

Pull ceph fixes from Ilya Dryomov:
 "A patch to avoid a soft lockup in ceph_check_delayed_caps() from Luis
  and a reference handling fix from Jeff that should address some memory
  corruption reports in the snaprealm area.

  Both marked for stable"

* tag 'ceph-for-5.14-rc6' of git://github.com/ceph/ceph-client:
  ceph: take snap_empty_lock atomically with snaprealm refcount change
  ceph: reduce contention in ceph_check_delayed_caps()
parents 82cce5f4 8434ffe7
...@@ -4150,11 +4150,19 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -4150,11 +4150,19 @@ void ceph_handle_caps(struct ceph_mds_session *session,
/* /*
* Delayed work handler to process end of delayed cap release LRU list. * Delayed work handler to process end of delayed cap release LRU list.
*
* If new caps are added to the list while processing it, these won't get
* processed in this run. In this case, the ci->i_hold_caps_max will be
* returned so that the work can be scheduled accordingly.
*/ */
void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{ {
struct inode *inode; struct inode *inode;
struct ceph_inode_info *ci; struct ceph_inode_info *ci;
struct ceph_mount_options *opt = mdsc->fsc->mount_options;
unsigned long delay_max = opt->caps_wanted_delay_max * HZ;
unsigned long loop_start = jiffies;
unsigned long delay = 0;
dout("check_delayed_caps\n"); dout("check_delayed_caps\n");
spin_lock(&mdsc->cap_delay_lock); spin_lock(&mdsc->cap_delay_lock);
...@@ -4162,6 +4170,11 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) ...@@ -4162,6 +4170,11 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
ci = list_first_entry(&mdsc->cap_delay_list, ci = list_first_entry(&mdsc->cap_delay_list,
struct ceph_inode_info, struct ceph_inode_info,
i_cap_delay_list); i_cap_delay_list);
if (time_before(loop_start, ci->i_hold_caps_max - delay_max)) {
dout("%s caps added recently. Exiting loop", __func__);
delay = ci->i_hold_caps_max;
break;
}
if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 && if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
time_before(jiffies, ci->i_hold_caps_max)) time_before(jiffies, ci->i_hold_caps_max))
break; break;
...@@ -4177,6 +4190,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) ...@@ -4177,6 +4190,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
} }
} }
spin_unlock(&mdsc->cap_delay_lock); spin_unlock(&mdsc->cap_delay_lock);
return delay;
} }
/* /*
......
...@@ -4490,22 +4490,29 @@ void inc_session_sequence(struct ceph_mds_session *s) ...@@ -4490,22 +4490,29 @@ void inc_session_sequence(struct ceph_mds_session *s)
} }
/* /*
* delayed work -- periodically trim expired leases, renew caps with mds * delayed work -- periodically trim expired leases, renew caps with mds. If
* the @delay parameter is set to 0 or if it's more than 5 secs, the default
* workqueue delay value of 5 secs will be used.
*/ */
static void schedule_delayed(struct ceph_mds_client *mdsc) static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
{ {
int delay = 5; unsigned long max_delay = HZ * 5;
unsigned hz = round_jiffies_relative(HZ * delay);
schedule_delayed_work(&mdsc->delayed_work, hz); /* 5 secs default delay */
if (!delay || (delay > max_delay))
delay = max_delay;
schedule_delayed_work(&mdsc->delayed_work,
round_jiffies_relative(delay));
} }
static void delayed_work(struct work_struct *work) static void delayed_work(struct work_struct *work)
{ {
int i;
struct ceph_mds_client *mdsc = struct ceph_mds_client *mdsc =
container_of(work, struct ceph_mds_client, delayed_work.work); container_of(work, struct ceph_mds_client, delayed_work.work);
unsigned long delay;
int renew_interval; int renew_interval;
int renew_caps; int renew_caps;
int i;
dout("mdsc delayed_work\n"); dout("mdsc delayed_work\n");
...@@ -4545,7 +4552,7 @@ static void delayed_work(struct work_struct *work) ...@@ -4545,7 +4552,7 @@ static void delayed_work(struct work_struct *work)
} }
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
ceph_check_delayed_caps(mdsc); delay = ceph_check_delayed_caps(mdsc);
ceph_queue_cap_reclaim_work(mdsc); ceph_queue_cap_reclaim_work(mdsc);
...@@ -4553,7 +4560,7 @@ static void delayed_work(struct work_struct *work) ...@@ -4553,7 +4560,7 @@ static void delayed_work(struct work_struct *work)
maybe_recover_session(mdsc); maybe_recover_session(mdsc);
schedule_delayed(mdsc); schedule_delayed(mdsc, delay);
} }
int ceph_mdsc_init(struct ceph_fs_client *fsc) int ceph_mdsc_init(struct ceph_fs_client *fsc)
...@@ -5030,7 +5037,7 @@ void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) ...@@ -5030,7 +5037,7 @@ void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
mdsc->mdsmap->m_epoch); mdsc->mdsmap->m_epoch);
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
schedule_delayed(mdsc); schedule_delayed(mdsc, 0);
return; return;
bad_unlock: bad_unlock:
......
...@@ -67,19 +67,19 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc, ...@@ -67,19 +67,19 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
{ {
lockdep_assert_held(&mdsc->snap_rwsem); lockdep_assert_held(&mdsc->snap_rwsem);
dout("get_realm %p %d -> %d\n", realm,
atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
/* /*
* since we _only_ increment realm refs or empty the empty * The 0->1 and 1->0 transitions must take the snap_empty_lock
* list with snap_rwsem held, adjusting the empty list here is * atomically with the refcount change. Go ahead and bump the
* safe. we do need to protect against concurrent empty list * nref here, unless it's 0, in which case we take the spinlock
* additions, however. * and then do the increment and remove it from the list.
*/ */
if (atomic_inc_return(&realm->nref) == 1) { if (atomic_inc_not_zero(&realm->nref))
spin_lock(&mdsc->snap_empty_lock); return;
spin_lock(&mdsc->snap_empty_lock);
if (atomic_inc_return(&realm->nref) == 1)
list_del_init(&realm->empty_item); list_del_init(&realm->empty_item);
spin_unlock(&mdsc->snap_empty_lock); spin_unlock(&mdsc->snap_empty_lock);
}
} }
static void __insert_snap_realm(struct rb_root *root, static void __insert_snap_realm(struct rb_root *root,
...@@ -208,28 +208,28 @@ static void __put_snap_realm(struct ceph_mds_client *mdsc, ...@@ -208,28 +208,28 @@ static void __put_snap_realm(struct ceph_mds_client *mdsc,
{ {
lockdep_assert_held_write(&mdsc->snap_rwsem); lockdep_assert_held_write(&mdsc->snap_rwsem);
dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm, /*
atomic_read(&realm->nref), atomic_read(&realm->nref)-1); * We do not require the snap_empty_lock here, as any caller that
* increments the value must hold the snap_rwsem.
*/
if (atomic_dec_and_test(&realm->nref)) if (atomic_dec_and_test(&realm->nref))
__destroy_snap_realm(mdsc, realm); __destroy_snap_realm(mdsc, realm);
} }
/* /*
* caller needn't hold any locks * See comments in ceph_get_snap_realm. Caller needn't hold any locks.
*/ */
void ceph_put_snap_realm(struct ceph_mds_client *mdsc, void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm) struct ceph_snap_realm *realm)
{ {
dout("put_snap_realm %llx %p %d -> %d\n", realm->ino, realm, if (!atomic_dec_and_lock(&realm->nref, &mdsc->snap_empty_lock))
atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
if (!atomic_dec_and_test(&realm->nref))
return; return;
if (down_write_trylock(&mdsc->snap_rwsem)) { if (down_write_trylock(&mdsc->snap_rwsem)) {
spin_unlock(&mdsc->snap_empty_lock);
__destroy_snap_realm(mdsc, realm); __destroy_snap_realm(mdsc, realm);
up_write(&mdsc->snap_rwsem); up_write(&mdsc->snap_rwsem);
} else { } else {
spin_lock(&mdsc->snap_empty_lock);
list_add(&realm->empty_item, &mdsc->snap_empty); list_add(&realm->empty_item, &mdsc->snap_empty);
spin_unlock(&mdsc->snap_empty_lock); spin_unlock(&mdsc->snap_empty_lock);
} }
......
...@@ -1167,7 +1167,7 @@ extern void ceph_flush_snaps(struct ceph_inode_info *ci, ...@@ -1167,7 +1167,7 @@ extern void ceph_flush_snaps(struct ceph_inode_info *ci,
extern bool __ceph_should_report_size(struct ceph_inode_info *ci); extern bool __ceph_should_report_size(struct ceph_inode_info *ci);
extern void ceph_check_caps(struct ceph_inode_info *ci, int flags, extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_session *session); struct ceph_mds_session *session);
extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc); extern unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
extern void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc); extern void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc);
extern int ceph_drop_caps_for_unlink(struct inode *inode); extern int ceph_drop_caps_for_unlink(struct inode *inode);
extern int ceph_encode_inode_release(void **p, struct inode *inode, extern int ceph_encode_inode_release(void **p, struct inode *inode,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment