ceph: use rbtree for pg pools; decode new osdmap format

Since we can now create and destroy pg pools, the pool ids will be sparse,
and an array no longer makes sense for looking up by pool id.  Use an
rbtree instead.
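
For context, lookup by pool id becomes a standard id-keyed rbtree walk.  A
minimal sketch of that pattern (essentially what the new __lookup_pg_pool()
helper in the diff below does; shown here only as illustration, assuming the
ceph_pg_pool_info from this patch with its 'node' link and 'id' key):

	#include <linux/rbtree.h>

	/* O(log n) lookup by sparse pool id; NULL if no such pool. */
	static struct ceph_pg_pool_info *pool_lookup(struct rb_root *root, int id)
	{
		struct rb_node *n = root->rb_node;

		while (n) {
			struct ceph_pg_pool_info *pi =
				rb_entry(n, struct ceph_pg_pool_info, node);

			if (id < pi->id)
				n = n->rb_left;		/* key sorts before this pool */
			else if (id > pi->id)
				n = n->rb_right;	/* key sorts after this pool */
			else
				return pi;		/* exact match */
		}
		return NULL;
	}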

The OSDMap encoding also no longer has a max pool count (previously used to
allocate the array).  There is a new pool_max, which is the largest pool id
we've ever used, although we don't actually need it in the client.
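
In the incremental map the new field follows the usual "negative means
unchanged" convention, so the decoder only applies non-negative values.  A
standalone illustration of that convention (hypothetical helper, plain C):

	#include <stdint.h>

	/* The encoder writes -1 for new_pool_max in epochs where it did
	 * not change; only a non-negative value updates the map. */
	static void apply_new_pool_max(uint32_t *pool_max, int32_t new_pool_max)
	{
		if (new_pool_max >= 0)
			*pool_max = new_pool_max;
	}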

Signed-off-by: Sage Weil <sage@newdream.net>
---
commit 4fc51be8fa (parent 9794b146fa), 2010-02-16 15:55:03 -08:00
 4 files changed, 104 insertions(+), 50 deletions(-)

diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -78,6 +78,7 @@ static int osdmap_show(struct seq_file *s, void *p)
 {
	int i;
	struct ceph_client *client = s->private;
+	struct rb_node *n;
 
	if (client->osdc.osdmap == NULL)
		return 0;
@@ -87,11 +88,11 @@ static int osdmap_show(struct seq_file *s, void *p)
		   " NEARFULL" : "",
		   (client->osdc.osdmap->flags & CEPH_OSDMAP_FULL) ?
		   " FULL" : "");
-	for (i = 0; i < client->osdc.osdmap->num_pools; i++) {
-		struct ceph_pg_pool_info *pool =
-			&client->osdc.osdmap->pg_pool[i];
+	for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) {
+		struct ceph_pg_pool_info *pool =
+			rb_entry(n, struct ceph_pg_pool_info, node);
		seq_printf(s, "pg_pool %d pg_num %d / %d, lpg_num %d / %d\n",
-			   i, pool->v.pg_num, pool->pg_num_mask,
+			   pool->id, pool->v.pg_num, pool->pg_num_mask,
			   pool->v.lpg_num, pool->lpg_num_mask);
	}
	for (i = 0; i < client->osdc.osdmap->max_osd; i++) {

diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
--- a/fs/ceph/osdmap.c
+++ b/fs/ceph/osdmap.c
@@ -328,9 +328,15 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
		rb_erase(&pg->node, &map->pg_temp);
		kfree(pg);
	}
+	while (!RB_EMPTY_ROOT(&map->pg_pools)) {
+		struct ceph_pg_pool_info *pi =
+			rb_entry(rb_first(&map->pg_pools),
+				 struct ceph_pg_pool_info, node);
+		rb_erase(&pi->node, &map->pg_pools);
+		kfree(pi);
+	}
	kfree(map->osd_state);
	kfree(map->osd_weight);
-	kfree(map->pg_pool);
	kfree(map->osd_addr);
	kfree(map);
 }
@@ -432,6 +438,48 @@ static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root,
	return NULL;
 }
 
+/*
+ * rbtree of pg pool info
+ */
+static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
+{
+	struct rb_node **p = &root->rb_node;
+	struct rb_node *parent = NULL;
+	struct ceph_pg_pool_info *pi = NULL;
+
+	while (*p) {
+		parent = *p;
+		pi = rb_entry(parent, struct ceph_pg_pool_info, node);
+		if (new->id < pi->id)
+			p = &(*p)->rb_left;
+		else if (new->id > pi->id)
+			p = &(*p)->rb_right;
+		else
+			return -EEXIST;
+	}
+
+	rb_link_node(&new->node, parent, p);
+	rb_insert_color(&new->node, root);
+	return 0;
+}
+
+static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
+{
+	struct ceph_pg_pool_info *pi;
+	struct rb_node *n = root->rb_node;
+
+	while (n) {
+		pi = rb_entry(n, struct ceph_pg_pool_info, node);
+		if (id < pi->id)
+			n = n->rb_left;
+		else if (id > pi->id)
+			n = n->rb_right;
+		else
+			return pi;
+	}
+	return NULL;
+}
+
 /*
  * decode a full map.
  */
@@ -443,6 +491,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
	u8 ev;
	int err = -EINVAL;
	void *start = *p;
+	struct ceph_pg_pool_info *pi;
 
	dout("osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p));
@@ -464,32 +513,27 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
	ceph_decode_copy(p, &map->created, sizeof(map->created));
	ceph_decode_copy(p, &map->modified, sizeof(map->modified));
 
-	map->num_pools = ceph_decode_32(p);
-	map->pg_pool = kcalloc(map->num_pools, sizeof(*map->pg_pool),
-			       GFP_NOFS);
-	if (!map->pg_pool) {
-		err = -ENOMEM;
-		goto bad;
-	}
	ceph_decode_32_safe(p, end, max, bad);
	while (max--) {
-		ceph_decode_need(p, end, 4+1+sizeof(map->pg_pool->v), bad);
-		i = ceph_decode_32(p);
-		if (i >= map->num_pools)
+		ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
+		pi = kmalloc(sizeof(*pi), GFP_NOFS);
+		if (!pi)
			goto bad;
+		pi->id = ceph_decode_32(p);
		ev = ceph_decode_8(p);  /* encoding version */
		if (ev > CEPH_PG_POOL_VERSION) {
			pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
				   ev, CEPH_PG_POOL_VERSION);
			goto bad;
		}
-		ceph_decode_copy(p, &map->pg_pool[i].v,
-				 sizeof(map->pg_pool->v));
-		calc_pg_masks(&map->pg_pool[i]);
-		p += le32_to_cpu(map->pg_pool[i].v.num_snaps) * sizeof(u64);
-		p += le32_to_cpu(map->pg_pool[i].v.num_removed_snap_intervals)
-			* sizeof(u64) * 2;
+		ceph_decode_copy(p, &pi->v, sizeof(pi->v));
+		__insert_pg_pool(&map->pg_pools, pi);
+		calc_pg_masks(pi);
+		p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64);
+		p += le32_to_cpu(pi->v.num_removed_snap_intervals)
+			* sizeof(u64) * 2;
	}
+	ceph_decode_32_safe(p, end, map->pool_max, bad);
 
	ceph_decode_32_safe(p, end, map->flags, bad);
@@ -581,7 +625,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
	u32 epoch = 0;
	struct ceph_timespec modified;
	u32 len, pool;
-	__s32 new_flags, max;
+	__s32 new_pool_max, new_flags, max;
	void *start = *p;
	int err = -EINVAL;
	u16 version;
@@ -600,6 +644,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
	epoch = ceph_decode_32(p);
	BUG_ON(epoch != map->epoch+1);
	ceph_decode_copy(p, &modified, sizeof(modified));
+	new_pool_max = ceph_decode_32(p);
	new_flags = ceph_decode_32(p);
 
	/* full map? */
@@ -623,6 +668,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
	/* new flags? */
	if (new_flags >= 0)
		map->flags = new_flags;
+	if (new_pool_max >= 0)
+		map->pool_max = new_pool_max;
 
	ceph_decode_need(p, end, 5*sizeof(u32), bad);
@@ -647,37 +694,42 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
	ceph_decode_32_safe(p, end, len, bad);
	while (len--) {
		__u8 ev;
+		struct ceph_pg_pool_info *pi;
 
		ceph_decode_32_safe(p, end, pool, bad);
-		if (pool >= map->num_pools) {
-			void *pg_pool = kcalloc(pool + 1,
-						sizeof(*map->pg_pool),
-						GFP_NOFS);
-			if (!pg_pool) {
-				err = -ENOMEM;
-				goto bad;
-			}
-			memcpy(pg_pool, map->pg_pool,
-			       map->num_pools * sizeof(*map->pg_pool));
-			kfree(map->pg_pool);
-			map->pg_pool = pg_pool;
-			map->num_pools = pool+1;
-		}
-		ceph_decode_need(p, end, 1 + sizeof(map->pg_pool->v), bad);
+		ceph_decode_need(p, end, 1 + sizeof(pi->v), bad);
		ev = ceph_decode_8(p);  /* encoding version */
		if (ev > CEPH_PG_POOL_VERSION) {
			pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
				   ev, CEPH_PG_POOL_VERSION);
			goto bad;
		}
-		ceph_decode_copy(p, &map->pg_pool[pool].v,
-				 sizeof(map->pg_pool->v));
-		calc_pg_masks(&map->pg_pool[pool]);
+		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		if (!pi) {
+			pi = kmalloc(sizeof(*pi), GFP_NOFS);
+			if (!pi) {
+				err = -ENOMEM;
+				goto bad;
+			}
+			pi->id = pool;
+			__insert_pg_pool(&map->pg_pools, pi);
+		}
+		ceph_decode_copy(p, &pi->v, sizeof(pi->v));
+		calc_pg_masks(pi);
	}
 
-	/* old_pool (ignore) */
+	/* old_pool */
	ceph_decode_32_safe(p, end, len, bad);
-	*p += len * sizeof(u32);
+	while (len--) {
+		struct ceph_pg_pool_info *pi;
+
+		ceph_decode_32_safe(p, end, pool, bad);
+		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		if (pi) {
+			rb_erase(&pi->node, &map->pg_pools);
+			kfree(pi);
+		}
+	}
 
	/* new_up */
	err = -EINVAL;
@@ -861,10 +913,10 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol,
	unsigned ps;
 
	BUG_ON(!osdmap);
-	if (poolid >= osdmap->num_pools)
-		return -EIO;
-	pool = &osdmap->pg_pool[poolid];
+	pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
+	if (!pool)
+		return -EIO;
	ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid));
	if (preferred >= 0) {
		ps += preferred;
@@ -919,9 +971,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
	    preferred >= osdmap->crush->max_devices)
		preferred = -1;
 
-	if (poolid >= osdmap->num_pools)
+	pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
+	if (!pool)
		return NULL;
-	pool = &osdmap->pg_pool[poolid];
	ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset,
			       pool->v.type, pool->v.size);
	if (ruleno < 0) {

diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h
--- a/fs/ceph/osdmap.h
+++ b/fs/ceph/osdmap.h
@@ -19,6 +19,8 @@
  * the change between two successive epochs, or as a fully encoded map.
  */
 struct ceph_pg_pool_info {
+	struct rb_node node;
+	int id;
	struct ceph_pg_pool v;
	int pg_num_mask, pgp_num_mask, lpg_num_mask, lpgp_num_mask;
 };
@@ -44,9 +46,8 @@ struct ceph_osdmap {
	struct ceph_entity_addr *osd_addr;
 
	struct rb_root pg_temp;
-
-	u32 num_pools;
-	struct ceph_pg_pool_info *pg_pool;
+	struct rb_root pg_pools;
+	u32 pool_max;
 
	/* the CRUSH map specifies the mapping of placement groups to
	 * the list of osds that store+replicate them. */

diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h
--- a/fs/ceph/rados.h
+++ b/fs/ceph/rados.h
@@ -11,8 +11,8 @@
 /*
  * osdmap encoding versions
  */
-#define CEPH_OSDMAP_INC_VERSION 3
-#define CEPH_OSDMAP_VERSION 3
+#define CEPH_OSDMAP_INC_VERSION 4
+#define CEPH_OSDMAP_VERSION 4
 
 /*
  * fs id