rbd: reference count parent requests

Keep a reference count for uses of the parent information for an rbd
device.

An initial reference is set in rbd_img_request_create() if the
target image has a parent (with non-zero overlap).  Each image
request for an image with a non-zero parent overlap gets another
reference when it's created, and that reference is dropped when the
request is destroyed.

The initial reference is dropped when the image gets torn down.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Alex Elder 2013-05-08 22:50:04 -05:00
Родитель e93f315235
Коммит a2acd00e79
1 изменённых файлов: 102 добавлений и 2 удалений

Просмотреть файл

@ -55,6 +55,39 @@
#define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
/*
* Increment the given counter and return its updated value.
* If the counter is already 0 it will not be incremented.
* If the counter is already at its maximum value returns
* -EINVAL without updating it.
*/
static int atomic_inc_return_safe(atomic_t *v)
{
unsigned int counter;
counter = (unsigned int)__atomic_add_unless(v, 1, 0);
if (counter <= (unsigned int)INT_MAX)
return (int)counter;
atomic_dec(v);
return -EINVAL;
}
/* Decrement the counter. Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
int counter;
counter = atomic_dec_return(v);
if (counter >= 0)
return counter;
atomic_inc(v);
return -EINVAL;
}
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"
@ -312,6 +345,7 @@ struct rbd_device {
struct rbd_spec *parent_spec;
u64 parent_overlap;
atomic_t parent_ref;
struct rbd_device *parent;
/* protects updating the header */
@ -361,6 +395,7 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf,
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);
static struct bus_attribute rbd_bus_attrs[] = {
__ATTR(add, S_IWUSR, NULL, rbd_add),
@ -1505,6 +1540,12 @@ static void img_request_layered_set(struct rbd_img_request *img_request)
smp_mb();
}
static void img_request_layered_clear(struct rbd_img_request *img_request)
{
clear_bit(IMG_REQ_LAYERED, &img_request->flags);
smp_mb();
}
static bool img_request_layered_test(struct rbd_img_request *img_request)
{
smp_mb();
@ -1859,6 +1900,58 @@ static void rbd_dev_unparent(struct rbd_device *rbd_dev)
rbd_dev->parent_overlap = 0;
}
/*
* Parent image reference counting is used to determine when an
* image's parent fields can be safely torn down--after there are no
* more in-flight requests to the parent image. When the last
* reference is dropped, cleaning them up is safe.
*/
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
int counter;
if (!rbd_dev->parent_spec)
return;
counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
if (counter > 0)
return;
/* Last reference; clean up parent data structures */
if (!counter)
rbd_dev_unparent(rbd_dev);
else
rbd_warn(rbd_dev, "parent reference underflow\n");
}
/*
* If an image has a non-zero parent overlap, get a reference to its
* parent.
*
* Returns true if the rbd device has a parent with a non-zero
* overlap and a reference for it was successfully taken, or
* false otherwise.
*/
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
int counter;
if (!rbd_dev->parent_spec)
return false;
counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
if (counter > 0 && rbd_dev->parent_overlap)
return true;
/* Image was flattened, but parent is not yet torn down */
if (counter < 0)
rbd_warn(rbd_dev, "parent reference overflow\n");
return false;
}
/*
* Caller is responsible for filling in the list of object requests
* that comprises the image request, and the Linux request pointer
@ -1892,7 +1985,7 @@ static struct rbd_img_request *rbd_img_request_create(
} else {
img_request->snap_id = rbd_dev->spec->snap_id;
}
if (rbd_dev->parent_overlap)
if (rbd_dev_parent_get(rbd_dev))
img_request_layered_set(img_request);
spin_lock_init(&img_request->completion_lock);
img_request->next_completion = 0;
@ -1923,6 +2016,11 @@ static void rbd_img_request_destroy(struct kref *kref)
rbd_img_obj_request_del(img_request, obj_request);
rbd_assert(img_request->obj_request_count == 0);
if (img_request_layered_test(img_request)) {
img_request_layered_clear(img_request);
rbd_dev_parent_put(img_request->rbd_dev);
}
if (img_request_write_test(img_request))
ceph_put_snap_context(img_request->snapc);
@ -3502,6 +3600,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
spin_lock_init(&rbd_dev->lock);
rbd_dev->flags = 0;
atomic_set(&rbd_dev->parent_ref, 0);
INIT_LIST_HEAD(&rbd_dev->node);
init_rwsem(&rbd_dev->header_rwsem);
@ -4534,7 +4633,7 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
struct rbd_image_header *header;
rbd_dev_unparent(rbd_dev);
rbd_dev_parent_put(rbd_dev);
/* Free dynamic fields from the header, then zero it out */
@ -4606,6 +4705,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
if (ret < 0)
goto out_err;
rbd_dev->parent = parent;
atomic_set(&rbd_dev->parent_ref, 1);
return 0;
out_err: