ceph: optimize cap flush waiting

Add a 'wake' flag to the ceph_cap_flush struct, which indicates whether
someone is waiting for it to finish. When getting a flush ack message,
we check the 'wake' flag in the corresponding ceph_cap_flush struct to
decide if we should wake up waiters. One corner case is that the
acked cap flush has its 'wake' flag set, but it is not the first one
on the flushing list. We do not wake up waiters in this case; instead
we set the 'wake' flag of the preceding ceph_cap_flush struct.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
This commit is contained in:
Yan, Zheng 2016-07-07 15:22:38 +08:00 коммит произвёл Ilya Dryomov
Родитель ed9b430c9b
Коммит c8799fc467
3 изменённых файлов: 73 добавлений и 27 удалений

Просмотреть файл

@ -1473,6 +1473,37 @@ static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
return 0; return 0;
} }
/*
* Remove cap_flush from the mdsc's or inode's flushing cap list.
* Return true if caller needs to wake up flush waiters.
*/
static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
struct ceph_inode_info *ci,
struct ceph_cap_flush *cf)
{
struct ceph_cap_flush *prev;
bool wake = cf->wake;
if (mdsc) {
/* are there older pending cap flushes? */
if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
prev = list_prev_entry(cf, g_list);
prev->wake = true;
wake = false;
}
list_del(&cf->g_list);
} else if (ci) {
if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
prev = list_prev_entry(cf, i_list);
prev->wake = true;
wake = false;
}
list_del(&cf->i_list);
} else {
BUG_ON(1);
}
return wake;
}
/* /*
* Add dirty inode to the flushing list. Assigned a seq number so we * Add dirty inode to the flushing list. Assigned a seq number so we
* can wait for caps to flush without starving. * can wait for caps to flush without starving.
@ -1480,7 +1511,7 @@ static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
* Called under i_ceph_lock. * Called under i_ceph_lock.
*/ */
static int __mark_caps_flushing(struct inode *inode, static int __mark_caps_flushing(struct inode *inode,
struct ceph_mds_session *session, struct ceph_mds_session *session, bool wake,
u64 *flush_tid, u64 *oldest_flush_tid) u64 *flush_tid, u64 *oldest_flush_tid)
{ {
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
@ -1503,6 +1534,7 @@ static int __mark_caps_flushing(struct inode *inode,
swap(cf, ci->i_prealloc_cap_flush); swap(cf, ci->i_prealloc_cap_flush);
cf->caps = flushing; cf->caps = flushing;
cf->wake = wake;
spin_lock(&mdsc->cap_dirty_lock); spin_lock(&mdsc->cap_dirty_lock);
list_del_init(&ci->i_dirty_item); list_del_init(&ci->i_dirty_item);
@ -1808,7 +1840,7 @@ ack:
} }
if (cap == ci->i_auth_cap && ci->i_dirty_caps) { if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
flushing = __mark_caps_flushing(inode, session, flushing = __mark_caps_flushing(inode, session, false,
&flush_tid, &flush_tid,
&oldest_flush_tid); &oldest_flush_tid);
} else { } else {
@ -1885,8 +1917,8 @@ retry:
if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
goto out; goto out;
flushing = __mark_caps_flushing(inode, session, &flush_tid, flushing = __mark_caps_flushing(inode, session, true,
&oldest_flush_tid); &flush_tid, &oldest_flush_tid);
/* __send_cap drops i_ceph_lock */ /* __send_cap drops i_ceph_lock */
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
@ -1902,7 +1934,8 @@ retry:
if (!list_empty(&ci->i_cap_flush_list)) { if (!list_empty(&ci->i_cap_flush_list)) {
struct ceph_cap_flush *cf = struct ceph_cap_flush *cf =
list_last_entry(&ci->i_cap_flush_list, list_last_entry(&ci->i_cap_flush_list,
struct ceph_cap_flush, i_list); struct ceph_cap_flush, i_list);
cf->wake = true;
flush_tid = cf->tid; flush_tid = cf->tid;
} }
flushing = ci->i_flushing_caps; flushing = ci->i_flushing_caps;
@ -3022,7 +3055,9 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
unsigned seq = le32_to_cpu(m->seq); unsigned seq = le32_to_cpu(m->seq);
int dirty = le32_to_cpu(m->dirty); int dirty = le32_to_cpu(m->dirty);
int cleaned = 0; int cleaned = 0;
int drop = 0; bool drop = false;
bool wake_ci = 0;
bool wake_mdsc = 0;
list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
if (cf->tid == flush_tid) if (cf->tid == flush_tid)
@ -3030,7 +3065,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
if (cf->caps == 0) /* capsnap */ if (cf->caps == 0) /* capsnap */
continue; continue;
if (cf->tid <= flush_tid) { if (cf->tid <= flush_tid) {
list_del(&cf->i_list); if (__finish_cap_flush(NULL, ci, cf))
wake_ci = true;
list_add_tail(&cf->i_list, &to_remove); list_add_tail(&cf->i_list, &to_remove);
} else { } else {
cleaned &= ~cf->caps; cleaned &= ~cf->caps;
@ -3052,14 +3088,9 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
spin_lock(&mdsc->cap_dirty_lock); spin_lock(&mdsc->cap_dirty_lock);
if (!list_empty(&to_remove)) { list_for_each_entry(cf, &to_remove, i_list) {
u64 oldest_flush_tid; if (__finish_cap_flush(mdsc, NULL, cf))
list_for_each_entry(cf, &to_remove, i_list) wake_mdsc = true;
list_del(&cf->g_list);
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
if (oldest_flush_tid == 0 || oldest_flush_tid > flush_tid)
wake_up_all(&mdsc->cap_flushing_wq);
} }
if (ci->i_flushing_caps == 0) { if (ci->i_flushing_caps == 0) {
@ -3079,7 +3110,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
if (ci->i_dirty_caps == 0) { if (ci->i_dirty_caps == 0) {
dout(" inode %p now clean\n", inode); dout(" inode %p now clean\n", inode);
BUG_ON(!list_empty(&ci->i_dirty_item)); BUG_ON(!list_empty(&ci->i_dirty_item));
drop = 1; drop = true;
if (ci->i_wr_ref == 0 && if (ci->i_wr_ref == 0 &&
ci->i_wrbuffer_ref_head == 0) { ci->i_wrbuffer_ref_head == 0) {
BUG_ON(!ci->i_head_snapc); BUG_ON(!ci->i_head_snapc);
@ -3091,7 +3122,6 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
} }
} }
spin_unlock(&mdsc->cap_dirty_lock); spin_unlock(&mdsc->cap_dirty_lock);
wake_up_all(&ci->i_cap_wq);
out: out:
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
@ -3102,6 +3132,11 @@ out:
list_del(&cf->i_list); list_del(&cf->i_list);
ceph_free_cap_flush(cf); ceph_free_cap_flush(cf);
} }
if (wake_ci)
wake_up_all(&ci->i_cap_wq);
if (wake_mdsc)
wake_up_all(&mdsc->cap_flushing_wq);
if (drop) if (drop)
iput(inode); iput(inode);
} }
@ -3120,7 +3155,9 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
u64 follows = le64_to_cpu(m->snap_follows); u64 follows = le64_to_cpu(m->snap_follows);
struct ceph_cap_snap *capsnap; struct ceph_cap_snap *capsnap;
int flushed = 0; bool flushed = false;
bool wake_ci = false;
bool wake_mdsc = false;
dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n", dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
inode, ci, session->s_mds, follows); inode, ci, session->s_mds, follows);
@ -3134,7 +3171,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
flush_tid, capsnap->cap_flush.tid); flush_tid, capsnap->cap_flush.tid);
break; break;
} }
flushed = 1; flushed = true;
break; break;
} else { } else {
dout(" skipping cap_snap %p follows %lld\n", dout(" skipping cap_snap %p follows %lld\n",
@ -3142,31 +3179,31 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
} }
} }
if (flushed) { if (flushed) {
u64 oldest_flush_tid;
WARN_ON(capsnap->dirty_pages || capsnap->writing); WARN_ON(capsnap->dirty_pages || capsnap->writing);
dout(" removing %p cap_snap %p follows %lld\n", dout(" removing %p cap_snap %p follows %lld\n",
inode, capsnap, follows); inode, capsnap, follows);
list_del(&capsnap->ci_item); list_del(&capsnap->ci_item);
list_del(&capsnap->cap_flush.i_list); if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush))
wake_ci = true;
spin_lock(&mdsc->cap_dirty_lock); spin_lock(&mdsc->cap_dirty_lock);
if (list_empty(&ci->i_cap_flush_list)) if (list_empty(&ci->i_cap_flush_list))
list_del_init(&ci->i_flushing_item); list_del_init(&ci->i_flushing_item);
list_del(&capsnap->cap_flush.g_list); if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush))
wake_mdsc = true;
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
if (oldest_flush_tid == 0 || oldest_flush_tid > flush_tid)
wake_up_all(&mdsc->cap_flushing_wq);
spin_unlock(&mdsc->cap_dirty_lock); spin_unlock(&mdsc->cap_dirty_lock);
wake_up_all(&ci->i_cap_wq);
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (flushed) { if (flushed) {
ceph_put_snap_context(capsnap->context); ceph_put_snap_context(capsnap->context);
ceph_put_cap_snap(capsnap); ceph_put_cap_snap(capsnap);
if (wake_ci)
wake_up_all(&ci->i_cap_wq);
if (wake_mdsc)
wake_up_all(&mdsc->cap_flushing_wq);
iput(inode); iput(inode);
} }
} }

Просмотреть файл

@ -1212,6 +1212,8 @@ static void remove_session_caps(struct ceph_mds_session *session)
dout("remove_session_caps on %p\n", session); dout("remove_session_caps on %p\n", session);
iterate_session_caps(session, remove_session_caps_cb, fsc); iterate_session_caps(session, remove_session_caps_cb, fsc);
wake_up_all(&fsc->mdsc->cap_flushing_wq);
spin_lock(&session->s_cap_lock); spin_lock(&session->s_cap_lock);
if (session->s_nr_caps > 0) { if (session->s_nr_caps > 0) {
struct inode *inode; struct inode *inode;
@ -3536,6 +3538,12 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
ceph_flush_dirty_caps(mdsc); ceph_flush_dirty_caps(mdsc);
spin_lock(&mdsc->cap_dirty_lock); spin_lock(&mdsc->cap_dirty_lock);
want_flush = mdsc->last_cap_flush_tid; want_flush = mdsc->last_cap_flush_tid;
if (!list_empty(&mdsc->cap_flush_list)) {
struct ceph_cap_flush *cf =
list_last_entry(&mdsc->cap_flush_list,
struct ceph_cap_flush, g_list);
cf->wake = true;
}
spin_unlock(&mdsc->cap_dirty_lock); spin_unlock(&mdsc->cap_dirty_lock);
dout("sync want tid %lld flush_seq %lld\n", dout("sync want tid %lld flush_seq %lld\n",

Просмотреть файл

@ -150,6 +150,7 @@ struct ceph_cap {
struct ceph_cap_flush { struct ceph_cap_flush {
u64 tid; u64 tid;
int caps; /* 0 means capsnap */ int caps; /* 0 means capsnap */
bool wake; /* wake up flush waiters when finish ? */
struct list_head g_list; // global struct list_head g_list; // global
struct list_head i_list; // per inode struct list_head i_list; // per inode
}; };