ocfs2/dlm: Migrate lockres with no locks if it has a reference

o2dlm was not migrating resources with zero locks because it assumed that such
a resource would get purged by dlm_thread. However, some usage patterns involve
creating and dropping locks at a high rate, leading to the migrate thread seeing
zero locks but the purge thread seeing an active reference. When this happens,
the dlm_thread cannot purge the resource and the migrate thread sees no reason
to migrate that resource. The stalemate is broken when the migrate thread
catches the resource with a lock.

The fix is to make the migrate thread also consider the reference map.

This usage pattern can be triggered by userspace on userdlm locks and flocks.

Signed-off-by: Sunil Mushran <sunil.mushran@oracle.com>
Signed-off-by: Joel Becker <joel.becker@oracle.com>
This commit is contained in:
Sunil Mushran 2010-11-19 15:06:50 -08:00 committed by Joel Becker
Parent 771f8bc71c
Commit 388c4bcb4e
1 changed file: 27 additions and 13 deletions

View file

@ -2346,7 +2346,8 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
*/ */
static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm, static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, struct dlm_lock_resource *res,
int *numlocks) int *numlocks,
int *hasrefs)
{ {
int ret; int ret;
int i; int i;
@ -2356,6 +2357,9 @@ static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
assert_spin_locked(&res->spinlock); assert_spin_locked(&res->spinlock);
*numlocks = 0;
*hasrefs = 0;
ret = -EINVAL; ret = -EINVAL;
if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "cannot migrate lockres with unknown owner!\n"); mlog(0, "cannot migrate lockres with unknown owner!\n");
@ -2386,7 +2390,13 @@ static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
} }
*numlocks = count; *numlocks = count;
mlog(0, "migrateable lockres having %d locks\n", *numlocks);
count = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
if (count < O2NM_MAX_NODES)
*hasrefs = 1;
mlog(0, "%s: res %.*s, Migrateable, locks %d, refs %d\n", dlm->name,
res->lockname.len, res->lockname.name, *numlocks, *hasrefs);
leave: leave:
return ret; return ret;
@ -2408,7 +2418,7 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
const char *name; const char *name;
unsigned int namelen; unsigned int namelen;
int mle_added = 0; int mle_added = 0;
int numlocks; int numlocks, hasrefs;
int wake = 0; int wake = 0;
if (!dlm_grab(dlm)) if (!dlm_grab(dlm))
@ -2417,13 +2427,13 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
name = res->lockname.name; name = res->lockname.name;
namelen = res->lockname.len; namelen = res->lockname.len;
mlog(0, "migrating %.*s to %u\n", namelen, name, target); mlog(0, "%s: Migrating %.*s to %u\n", dlm->name, namelen, name, target);
/* /*
* ensure this lockres is a proper candidate for migration * ensure this lockres is a proper candidate for migration
*/ */
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
ret = dlm_is_lockres_migrateable(dlm, res, &numlocks); ret = dlm_is_lockres_migrateable(dlm, res, &numlocks, &hasrefs);
if (ret < 0) { if (ret < 0) {
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
goto leave; goto leave;
@ -2431,10 +2441,8 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
/* no work to do */ /* no work to do */
if (numlocks == 0) { if (numlocks == 0 && !hasrefs)
mlog(0, "no locks were found on this lockres! done!\n");
goto leave; goto leave;
}
/* /*
* preallocate up front * preallocate up front
@ -2459,14 +2467,14 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
* find a node to migrate the lockres to * find a node to migrate the lockres to
*/ */
mlog(0, "picking a migration node\n");
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
/* pick a new node */ /* pick a new node */
if (!test_bit(target, dlm->domain_map) || if (!test_bit(target, dlm->domain_map) ||
target >= O2NM_MAX_NODES) { target >= O2NM_MAX_NODES) {
target = dlm_pick_migration_target(dlm, res); target = dlm_pick_migration_target(dlm, res);
} }
mlog(0, "node %u chosen for migration\n", target); mlog(0, "%s: res %.*s, Node %u chosen for migration\n", dlm->name,
namelen, name, target);
if (target >= O2NM_MAX_NODES || if (target >= O2NM_MAX_NODES ||
!test_bit(target, dlm->domain_map)) { !test_bit(target, dlm->domain_map)) {
@ -2667,7 +2675,7 @@ int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{ {
int ret; int ret;
int lock_dropped = 0; int lock_dropped = 0;
int numlocks; int numlocks, hasrefs;
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
if (res->owner != dlm->node_num) { if (res->owner != dlm->node_num) {
@ -2681,8 +2689,8 @@ int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
} }
/* No need to migrate a lockres having no locks */ /* No need to migrate a lockres having no locks */
ret = dlm_is_lockres_migrateable(dlm, res, &numlocks); ret = dlm_is_lockres_migrateable(dlm, res, &numlocks, &hasrefs);
if (ret >= 0 && numlocks == 0) { if (ret >= 0 && numlocks == 0 && !hasrefs) {
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
goto leave; goto leave;
} }
@ -2915,6 +2923,12 @@ static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
} }
queue++; queue++;
} }
nodenum = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
if (nodenum < O2NM_MAX_NODES) {
spin_unlock(&res->spinlock);
return nodenum;
}
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
mlog(0, "have not found a suitable target yet! checking domain map\n"); mlog(0, "have not found a suitable target yet! checking domain map\n");