From 218527fe27adaebeb81eb770459eb335517e90ee Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 29 Mar 2018 23:20:41 +0200 Subject: tipc: replace name table service range array with rb tree The current design of the binding table has an unnecessary memory consuming and complex data structure. It aggregates the service range items into an array, which is expanded by a factor two every time it becomes too small to hold a new item. Furthermore, the arrays never shrink when the number of ranges diminishes. We now replace this array with an RB tree that is holding the range items as tree nodes, each range directly holding a list of bindings. This, along with a few name changes, improves both readability and volume of the code, as well as reducing memory consumption and hopefully improving cache hit rate. Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/name_table.c | 1032 ++++++++++++++++++++++--------------------------- 1 file changed, 470 insertions(+), 562 deletions(-) (limited to 'net/tipc/name_table.c') diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 4359605b1bec..e06c7a8907aa 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -44,52 +44,40 @@ #include "addr.h" #include "node.h" #include "group.h" -#include - -#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ /** - * struct name_info - name sequence publication info - * @node_list: list of publications on own node of this - * @all_publ: list of all publications of this + * struct service_range - container for all bindings of a service range + * @lower: service range lower bound + * @upper: service range upper bound + * @tree_node: member of service range RB tree + * @local_publ: list of identical publications made from this node + * Used by closest_first lookup and multicast lookup algorithm + * @all_publ: all publications identical to this one, whatever node and scope + * Used by round-robin lookup algorithm */ -struct name_info { - struct list_head local_publ; - struct list_head all_publ; -}; - -/** - * struct sub_seq - container for all published instances of a name sequence - * @lower: name sequence lower bound - * @upper: name sequence upper bound - * @info: pointer to name sequence publication info - */ -struct sub_seq { +struct service_range { u32 lower; u32 upper; - struct name_info *info; + struct rb_node tree_node; + struct list_head local_publ; + struct list_head all_publ; }; /** - * struct name_seq - container for all published instances of a name type - * @type: 32 bit 'type' value for name sequence - * @sseq: pointer to dynamically-sized array of sub-sequences of this 'type'; - * sub-sequences are sorted in ascending order - * @alloc: number of sub-sequences currently in array - * @first_free: array index of first unused sub-sequence entry - * @ns_list: links to adjacent name sequences in hash chain - * @subscriptions: list of subscriptions for this 'type' - * @lock: spinlock controlling access to publication lists of all sub-sequences + * struct tipc_service - container for all published instances of a service type + * @type: 32 bit 'type' value for service + * @ranges: rb tree containing all service ranges for this service + * @service_list: links to adjacent name ranges in hash chain + * @subscriptions: list of subscriptions for this service type + * @lock: spinlock controlling access to pertaining service ranges/publications * @rcu: RCU callback head used for deferred freeing */ -struct name_seq { +struct tipc_service { u32 type; - struct sub_seq *sseqs; - u32 alloc; - u32 first_free; - struct hlist_node ns_list; + struct rb_root ranges; + struct hlist_node service_list; struct list_head subscriptions; - spinlock_t lock; + spinlock_t lock; /* Covers service range list */ struct rcu_head rcu; }; @@ -99,17 +87,16 @@ static int hash(int x) } /** - * publ_create - create a publication structure + * tipc_publ_create - create a publication structure */ -static struct publication *publ_create(u32 type, u32 lower, u32 upper, - u32 scope, u32 node, u32 port, - u32 key) +static struct publication *tipc_publ_create(u32 type, u32 lower, u32 upper, + u32 scope, u32 node, u32 port, + u32 key) { struct publication *publ = kzalloc(sizeof(*publ), GFP_ATOMIC); - if (publ == NULL) { - pr_warn("Publication creation failure, no memory\n"); + + if (!publ) return NULL; - } publ->type = type; publ->lower = lower; @@ -119,372 +106,298 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper, publ->port = port; publ->key = key; INIT_LIST_HEAD(&publ->binding_sock); + INIT_LIST_HEAD(&publ->binding_node); + INIT_LIST_HEAD(&publ->local_publ); + INIT_LIST_HEAD(&publ->all_publ); return publ; } /** - * tipc_subseq_alloc - allocate a specified number of sub-sequence structures - */ -static struct sub_seq *tipc_subseq_alloc(u32 cnt) -{ - return kcalloc(cnt, sizeof(struct sub_seq), GFP_ATOMIC); -} - -/** - * tipc_nameseq_create - create a name sequence structure for the specified 'type' + * tipc_service_create - create a service structure for the specified 'type' * - * Allocates a single sub-sequence structure and sets it to all 0's. + * Allocates a single range structure and sets it to all 0's. */ -static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_head) +static struct tipc_service *tipc_service_create(u32 type, struct hlist_head *hd) { - struct name_seq *nseq = kzalloc(sizeof(*nseq), GFP_ATOMIC); - struct sub_seq *sseq = tipc_subseq_alloc(1); + struct tipc_service *service = kzalloc(sizeof(*service), GFP_ATOMIC); - if (!nseq || !sseq) { - pr_warn("Name sequence creation failed, no memory\n"); - kfree(nseq); - kfree(sseq); + if (!service) { + pr_warn("Service creation failed, no memory\n"); return NULL; } - spin_lock_init(&nseq->lock); - nseq->type = type; - nseq->sseqs = sseq; - nseq->alloc = 1; - INIT_HLIST_NODE(&nseq->ns_list); - INIT_LIST_HEAD(&nseq->subscriptions); - hlist_add_head_rcu(&nseq->ns_list, seq_head); - return nseq; + spin_lock_init(&service->lock); + service->type = type; + service->ranges = RB_ROOT; + INIT_HLIST_NODE(&service->service_list); + INIT_LIST_HEAD(&service->subscriptions); + hlist_add_head_rcu(&service->service_list, hd); + return service; } /** - * nameseq_find_subseq - find sub-sequence (if any) matching a name instance + * tipc_service_find_range - find service range matching a service instance * - * Very time-critical, so binary searches through sub-sequence array. + * Very time-critical, so binary search through range rb tree */ -static struct sub_seq *nameseq_find_subseq(struct name_seq *nseq, - u32 instance) +static struct service_range *tipc_service_find_range(struct tipc_service *sc, + u32 instance) { - struct sub_seq *sseqs = nseq->sseqs; - int low = 0; - int high = nseq->first_free - 1; - int mid; - - while (low <= high) { - mid = (low + high) / 2; - if (instance < sseqs[mid].lower) - high = mid - 1; - else if (instance > sseqs[mid].upper) - low = mid + 1; + struct rb_node *n = sc->ranges.rb_node; + struct service_range *sr; + + while (n) { + sr = container_of(n, struct service_range, tree_node); + if (sr->lower > instance) + n = n->rb_left; + else if (sr->upper < instance) + n = n->rb_right; else - return &sseqs[mid]; + return sr; } return NULL; } -/** - * nameseq_locate_subseq - determine position of name instance in sub-sequence - * - * Returns index in sub-sequence array of the entry that contains the specified - * instance value; if no entry contains that value, returns the position - * where a new entry for it would be inserted in the array. - * - * Note: Similar to binary search code for locating a sub-sequence. - */ -static u32 nameseq_locate_subseq(struct name_seq *nseq, u32 instance) +static struct service_range *tipc_service_create_range(struct tipc_service *sc, + u32 lower, u32 upper) { - struct sub_seq *sseqs = nseq->sseqs; - int low = 0; - int high = nseq->first_free - 1; - int mid; - - while (low <= high) { - mid = (low + high) / 2; - if (instance < sseqs[mid].lower) - high = mid - 1; - else if (instance > sseqs[mid].upper) - low = mid + 1; + struct rb_node **n, *parent = NULL; + struct service_range *sr, *tmp; + + n = &sc->ranges.rb_node; + while (*n) { + tmp = container_of(*n, struct service_range, tree_node); + parent = *n; + tmp = container_of(parent, struct service_range, tree_node); + if (lower < tmp->lower) + n = &(*n)->rb_left; + else if (upper > tmp->upper) + n = &(*n)->rb_right; else - return mid; + return NULL; } - return low; + sr = kzalloc(sizeof(*sr), GFP_ATOMIC); + if (!sr) + return NULL; + sr->lower = lower; + sr->upper = upper; + INIT_LIST_HEAD(&sr->local_publ); + INIT_LIST_HEAD(&sr->all_publ); + rb_link_node(&sr->tree_node, parent, n); + rb_insert_color(&sr->tree_node, &sc->ranges); + return sr; } -/** - * tipc_nameseq_insert_publ - */ -static struct publication *tipc_nameseq_insert_publ(struct net *net, - struct name_seq *nseq, +static struct publication *tipc_service_insert_publ(struct net *net, + struct tipc_service *sc, u32 type, u32 lower, u32 upper, u32 scope, - u32 node, u32 port, u32 key) + u32 node, u32 port, + u32 key) { - struct tipc_subscription *s; - struct tipc_subscription *st; - struct publication *publ; - struct sub_seq *sseq; - struct name_info *info; - int created_subseq = 0; - - sseq = nameseq_find_subseq(nseq, lower); - if (sseq) { - - /* Lower end overlaps existing entry => need an exact match */ - if ((sseq->lower != lower) || (sseq->upper != upper)) { - return NULL; - } - - info = sseq->info; - - /* Check if an identical publication already exists */ - list_for_each_entry(publ, &info->all_publ, all_publ) { - if (publ->port == port && publ->key == key && - (!publ->node || publ->node == node)) - return NULL; - } - } else { - u32 inspos; - struct sub_seq *freesseq; - - /* Find where lower end should be inserted */ - inspos = nameseq_locate_subseq(nseq, lower); - - /* Fail if upper end overlaps into an existing entry */ - if ((inspos < nseq->first_free) && - (upper >= nseq->sseqs[inspos].lower)) { - return NULL; - } - - /* Ensure there is space for new sub-sequence */ - if (nseq->first_free == nseq->alloc) { - struct sub_seq *sseqs = tipc_subseq_alloc(nseq->alloc * 2); - - if (!sseqs) { - pr_warn("Cannot publish {%u,%u,%u}, no memory\n", - type, lower, upper); - return NULL; - } - memcpy(sseqs, nseq->sseqs, - nseq->alloc * sizeof(struct sub_seq)); - kfree(nseq->sseqs); - nseq->sseqs = sseqs; - nseq->alloc *= 2; - } - - info = kzalloc(sizeof(*info), GFP_ATOMIC); - if (!info) { - pr_warn("Cannot publish {%u,%u,%u}, no memory\n", - type, lower, upper); - return NULL; - } - - INIT_LIST_HEAD(&info->local_publ); - INIT_LIST_HEAD(&info->all_publ); - - /* Insert new sub-sequence */ - sseq = &nseq->sseqs[inspos]; - freesseq = &nseq->sseqs[nseq->first_free]; - memmove(sseq + 1, sseq, (freesseq - sseq) * sizeof(*sseq)); - memset(sseq, 0, sizeof(*sseq)); - nseq->first_free++; - sseq->lower = lower; - sseq->upper = upper; - sseq->info = info; - created_subseq = 1; + struct tipc_subscription *sub, *tmp; + struct service_range *sr; + struct publication *p; + bool first = false; + + sr = tipc_service_find_range(sc, lower); + if (!sr) { + sr = tipc_service_create_range(sc, lower, upper); + if (!sr) + goto err; + first = true; } - /* Insert a publication */ - publ = publ_create(type, lower, upper, scope, node, port, key); - if (!publ) + /* Lower end overlaps existing entry, but we need an exact match */ + if (sr->lower != lower || sr->upper != upper) return NULL; - list_add(&publ->all_publ, &info->all_publ); + /* Return if the publication already exists */ + list_for_each_entry(p, &sr->all_publ, all_publ) { + if (p->key == key && (!p->node || p->node == node)) + return NULL; + } + /* Create and insert publication */ + p = tipc_publ_create(type, lower, upper, scope, node, port, key); + if (!p) + goto err; if (in_own_node(net, node)) - list_add(&publ->local_publ, &info->local_publ); + list_add(&p->local_publ, &sr->local_publ); + list_add(&p->all_publ, &sr->all_publ); /* Any subscriptions waiting for notification? */ - list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { - tipc_sub_report_overlap(s, publ->lower, publ->upper, - TIPC_PUBLISHED, publ->port, - publ->node, publ->scope, - created_subseq); + list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) { + tipc_sub_report_overlap(sub, p->lower, p->upper, TIPC_PUBLISHED, + p->port, p->node, p->scope, first); } - return publ; + return p; +err: + pr_warn("Failed to bind to %u,%u,%u, no memory\n", type, lower, upper); + return NULL; } /** - * tipc_nameseq_remove_publ + * tipc_service_remove_publ - remove a publication from a service * * NOTE: There may be cases where TIPC is asked to remove a publication * that is not in the name table. For example, if another node issues a - * publication for a name sequence that overlaps an existing name sequence + * publication for a name range that overlaps an existing name range * the publication will not be recorded, which means the publication won't - * be found when the name sequence is later withdrawn by that node. + * be found when the name range is later withdrawn by that node. * A failed withdraw request simply returns a failure indication and lets the * caller issue any error or warning messages associated with such a problem. */ -static struct publication *tipc_nameseq_remove_publ(struct net *net, - struct name_seq *nseq, +static struct publication *tipc_service_remove_publ(struct net *net, + struct tipc_service *sc, u32 inst, u32 node, u32 port, u32 key) { - struct publication *publ; - struct sub_seq *sseq = nameseq_find_subseq(nseq, inst); - struct name_info *info; - struct sub_seq *free; - struct tipc_subscription *s, *st; - int removed_subseq = 0; - - if (!sseq) - return NULL; + struct tipc_subscription *sub, *tmp; + struct service_range *sr; + struct publication *p; + bool found = false; + bool last = false; - info = sseq->info; + sr = tipc_service_find_range(sc, inst); + if (!sr) + return NULL; - /* Locate publication, if it exists */ - list_for_each_entry(publ, &info->all_publ, all_publ) { - if (publ->key == key && publ->port == port && - (!publ->node || publ->node == node)) - goto found; + /* Find publication, if it exists */ + list_for_each_entry(p, &sr->all_publ, all_publ) { + if (p->key != key || (node && node != p->node)) + continue; + found = true; + break; } - return NULL; + if (!found) + return NULL; -found: - list_del(&publ->all_publ); - if (in_own_node(net, node)) - list_del(&publ->local_publ); - - /* Contract subseq list if no more publications for that subseq */ - if (list_empty(&info->all_publ)) { - kfree(info); - free = &nseq->sseqs[nseq->first_free--]; - memmove(sseq, sseq + 1, (free - (sseq + 1)) * sizeof(*sseq)); - removed_subseq = 1; + list_del(&p->all_publ); + list_del(&p->local_publ); + + /* Remove service range item if this was its last publication */ + if (list_empty(&sr->all_publ)) { + last = true; + rb_erase(&sr->tree_node, &sc->ranges); + kfree(sr); } /* Notify any waiting subscriptions */ - list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { - tipc_sub_report_overlap(s, publ->lower, publ->upper, - TIPC_WITHDRAWN, publ->port, - publ->node, publ->scope, - removed_subseq); + list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) { + tipc_sub_report_overlap(sub, p->lower, p->upper, TIPC_WITHDRAWN, + p->port, p->node, p->scope, last); } - - return publ; + return p; } /** - * tipc_nameseq_subscribe - attach a subscription, and optionally - * issue the prescribed number of events if there is any sub- - * sequence overlapping with the requested sequence + * tipc_service_subscribe - attach a subscription, and optionally + * issue the prescribed number of events if there is any service + * range overlapping with the requested range */ -static void tipc_nameseq_subscribe(struct name_seq *nseq, +static void tipc_service_subscribe(struct tipc_service *service, struct tipc_subscription *sub) { - struct sub_seq *sseq = nseq->sseqs; + struct tipc_subscr *sb = &sub->evt.s; + struct service_range *sr; struct tipc_name_seq ns; - struct tipc_subscr *s = &sub->evt.s; - bool no_status; + struct publication *p; + struct rb_node *n; + bool first; - ns.type = tipc_sub_read(s, seq.type); - ns.lower = tipc_sub_read(s, seq.lower); - ns.upper = tipc_sub_read(s, seq.upper); - no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS; + ns.type = tipc_sub_read(sb, seq.type); + ns.lower = tipc_sub_read(sb, seq.lower); + ns.upper = tipc_sub_read(sb, seq.upper); tipc_sub_get(sub); - list_add(&sub->nameseq_list, &nseq->subscriptions); + list_add(&sub->service_list, &service->subscriptions); - if (no_status || !sseq) + if (tipc_sub_read(sb, filter) & TIPC_SUB_NO_STATUS) return; - while (sseq != &nseq->sseqs[nseq->first_free]) { - if (tipc_sub_check_overlap(&ns, sseq->lower, sseq->upper)) { - struct publication *crs; - struct name_info *info = sseq->info; - int must_report = 1; - - list_for_each_entry(crs, &info->all_publ, all_publ) { - tipc_sub_report_overlap(sub, sseq->lower, - sseq->upper, - TIPC_PUBLISHED, - crs->port, - crs->node, - crs->scope, - must_report); - must_report = 0; - } + for (n = rb_first(&service->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + if (sr->lower > ns.upper) + break; + if (!tipc_sub_check_overlap(&ns, sr->lower, sr->upper)) + continue; + first = true; + + list_for_each_entry(p, &sr->all_publ, all_publ) { + tipc_sub_report_overlap(sub, sr->lower, sr->upper, + TIPC_PUBLISHED, p->port, + p->node, p->scope, first); + first = false; } - sseq++; } } -static struct name_seq *nametbl_find_seq(struct net *net, u32 type) +static struct tipc_service *tipc_service_find(struct net *net, u32 type) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct hlist_head *seq_head; - struct name_seq *ns; - - seq_head = &tn->nametbl->seq_hlist[hash(type)]; - hlist_for_each_entry_rcu(ns, seq_head, ns_list) { - if (ns->type == type) - return ns; + struct name_table *nt = tipc_name_table(net); + struct hlist_head *service_head; + struct tipc_service *service; + + service_head = &nt->services[hash(type)]; + hlist_for_each_entry_rcu(service, service_head, service_list) { + if (service->type == type) + return service; } - return NULL; }; struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type, - u32 lower, u32 upper, u32 scope, - u32 node, u32 port, u32 key) + u32 lower, u32 upper, + u32 scope, u32 node, + u32 port, u32 key) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct publication *publ; - struct name_seq *seq = nametbl_find_seq(net, type); - int index = hash(type); + struct name_table *nt = tipc_name_table(net); + struct tipc_service *sc; + struct publication *p; if (scope > TIPC_NODE_SCOPE || lower > upper) { - pr_debug("Failed to publish illegal {%u,%u,%u} with scope %u\n", + pr_debug("Failed to bind illegal {%u,%u,%u} with scope %u\n", type, lower, upper, scope); return NULL; } - - if (!seq) - seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); - if (!seq) + sc = tipc_service_find(net, type); + if (!sc) + sc = tipc_service_create(type, &nt->services[hash(type)]); + if (!sc) return NULL; - spin_lock_bh(&seq->lock); - publ = tipc_nameseq_insert_publ(net, seq, type, lower, upper, - scope, node, port, key); - spin_unlock_bh(&seq->lock); - return publ; + spin_lock_bh(&sc->lock); + p = tipc_service_insert_publ(net, sc, type, lower, upper, + scope, node, port, key); + spin_unlock_bh(&sc->lock); + return p; } struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, - u32 lower, u32 node, u32 port, - u32 key) + u32 lower, u32 node, u32 port, + u32 key) { - struct publication *publ; - struct name_seq *seq = nametbl_find_seq(net, type); + struct tipc_service *sc = tipc_service_find(net, type); + struct publication *p = NULL; - if (!seq) + if (!sc) return NULL; - spin_lock_bh(&seq->lock); - publ = tipc_nameseq_remove_publ(net, seq, lower, node, port, key); - if (!seq->first_free && list_empty(&seq->subscriptions)) { - hlist_del_init_rcu(&seq->ns_list); - kfree(seq->sseqs); - spin_unlock_bh(&seq->lock); - kfree_rcu(seq, rcu); - return publ; + spin_lock_bh(&sc->lock); + p = tipc_service_remove_publ(net, sc, lower, node, port, key); + + /* Delete service item if this no more publications and subscriptions */ + if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) { + hlist_del_init_rcu(&sc->service_list); + kfree_rcu(sc, rcu); } - spin_unlock_bh(&seq->lock); - return publ; + spin_unlock_bh(&sc->lock); + return p; } /** - * tipc_nametbl_translate - perform name translation + * tipc_nametbl_translate - perform service instance to socket translation * * On entry, 'destnode' is the search domain used during translation. * @@ -492,7 +405,7 @@ struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, * - if name translation is deferred to another node/cluster/zone, * leaves 'destnode' unchanged (will be non-zero) and returns 0 * - if name translation is attempted and succeeds, sets 'destnode' - * to publishing node and returns port reference (will be non-zero) + * to publication node and returns port reference (will be non-zero) * - if name translation is attempted and fails, sets 'destnode' to 0 * and returns 0 */ @@ -502,10 +415,9 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, struct tipc_net *tn = tipc_net(net); bool legacy = tn->legacy_addr_format; u32 self = tipc_own_addr(net); - struct sub_seq *sseq; - struct name_info *info; - struct publication *publ; - struct name_seq *seq; + struct service_range *sr; + struct tipc_service *sc; + struct publication *p; u32 port = 0; u32 node = 0; @@ -513,49 +425,49 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, return 0; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (unlikely(!seq)) + sc = tipc_service_find(net, type); + if (unlikely(!sc)) goto not_found; - spin_lock_bh(&seq->lock); - sseq = nameseq_find_subseq(seq, instance); - if (unlikely(!sseq)) + + spin_lock_bh(&sc->lock); + sr = tipc_service_find_range(sc, instance); + if (unlikely(!sr)) goto no_match; - info = sseq->info; /* Closest-First Algorithm */ if (legacy && !*destnode) { - if (!list_empty(&info->local_publ)) { - publ = list_first_entry(&info->local_publ, - struct publication, - local_publ); - list_move_tail(&publ->local_publ, - &info->local_publ); + if (!list_empty(&sr->local_publ)) { + p = list_first_entry(&sr->local_publ, + struct publication, + local_publ); + list_move_tail(&p->local_publ, + &sr->local_publ); } else { - publ = list_first_entry(&info->all_publ, - struct publication, - all_publ); - list_move_tail(&publ->all_publ, - &info->all_publ); + p = list_first_entry(&sr->all_publ, + struct publication, + all_publ); + list_move_tail(&p->all_publ, + &sr->all_publ); } } /* Round-Robin Algorithm */ - else if (*destnode == tipc_own_addr(net)) { - if (list_empty(&info->local_publ)) + else if (*destnode == self) { + if (list_empty(&sr->local_publ)) goto no_match; - publ = list_first_entry(&info->local_publ, struct publication, - local_publ); - list_move_tail(&publ->local_publ, &info->local_publ); + p = list_first_entry(&sr->local_publ, struct publication, + local_publ); + list_move_tail(&p->local_publ, &sr->local_publ); } else { - publ = list_first_entry(&info->all_publ, struct publication, - all_publ); - list_move_tail(&publ->all_publ, &info->all_publ); + p = list_first_entry(&sr->all_publ, struct publication, + all_publ); + list_move_tail(&p->all_publ, &sr->all_publ); } - port = publ->port; - node = publ->node; + port = p->port; + node = p->node; no_match: - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&sc->lock); not_found: rcu_read_unlock(); *destnode = node; @@ -567,34 +479,36 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope, bool all) { u32 self = tipc_own_addr(net); - struct publication *publ; - struct name_info *info; - struct name_seq *seq; - struct sub_seq *sseq; + struct service_range *sr; + struct tipc_service *sc; + struct publication *p; *dstcnt = 0; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (unlikely(!seq)) + sc = tipc_service_find(net, type); + if (unlikely(!sc)) goto exit; - spin_lock_bh(&seq->lock); - sseq = nameseq_find_subseq(seq, instance); - if (likely(sseq)) { - info = sseq->info; - list_for_each_entry(publ, &info->all_publ, all_publ) { - if (publ->scope != scope) - continue; - if (publ->port == exclude && publ->node == self) - continue; - tipc_dest_push(dsts, publ->node, publ->port); - (*dstcnt)++; - if (all) - continue; - list_move_tail(&publ->all_publ, &info->all_publ); - break; - } + + spin_lock_bh(&sc->lock); + + sr = tipc_service_find_range(sc, instance); + if (!sr) + goto no_match; + + list_for_each_entry(p, &sr->all_publ, all_publ) { + if (p->scope != scope) + continue; + if (p->port == exclude && p->node == self) + continue; + tipc_dest_push(dsts, p->node, p->port); + (*dstcnt)++; + if (all) + continue; + list_move_tail(&p->all_publ, &sr->all_publ); + break; } - spin_unlock_bh(&seq->lock); +no_match: + spin_unlock_bh(&sc->lock); exit: rcu_read_unlock(); return !list_empty(dsts); @@ -603,61 +517,64 @@ exit: void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper, u32 scope, bool exact, struct list_head *dports) { - struct sub_seq *sseq_stop; - struct name_info *info; + struct service_range *sr; + struct tipc_service *sc; struct publication *p; - struct name_seq *seq; - struct sub_seq *sseq; + struct rb_node *n; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (!seq) + sc = tipc_service_find(net, type); + if (!sc) goto exit; - spin_lock_bh(&seq->lock); - sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); - sseq_stop = seq->sseqs + seq->first_free; - for (; sseq != sseq_stop; sseq++) { - if (sseq->lower > upper) + spin_lock_bh(&sc->lock); + + for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + if (sr->upper < lower) + continue; + if (sr->lower > upper) break; - info = sseq->info; - list_for_each_entry(p, &info->local_publ, local_publ) { + list_for_each_entry(p, &sr->local_publ, local_publ) { if (p->scope == scope || (!exact && p->scope < scope)) tipc_dest_push(dports, 0, p->port); } } - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&sc->lock); exit: rcu_read_unlock(); } /* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes * - Creates list of nodes that overlap the given multicast address - * - Determines if any node local ports overlap + * - Determines if any node local destinations overlap */ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, u32 upper, struct tipc_nlist *nodes) { - struct sub_seq *sseq, *stop; - struct publication *publ; - struct name_info *info; - struct name_seq *seq; + struct service_range *sr; + struct tipc_service *sc; + struct publication *p; + struct rb_node *n; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (!seq) + sc = tipc_service_find(net, type); + if (!sc) goto exit; - spin_lock_bh(&seq->lock); - sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); - stop = seq->sseqs + seq->first_free; - for (; sseq != stop && sseq->lower <= upper; sseq++) { - info = sseq->info; - list_for_each_entry(publ, &info->all_publ, all_publ) { - tipc_nlist_add(nodes, publ->node); + spin_lock_bh(&sc->lock); + + for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + if (sr->upper < lower) + continue; + if (sr->lower > upper) + break; + list_for_each_entry(p, &sr->all_publ, all_publ) { + tipc_nlist_add(nodes, p->node); } } - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&sc->lock); exit: rcu_read_unlock(); } @@ -667,89 +584,88 @@ exit: void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, u32 type, u32 scope) { - struct sub_seq *sseq, *stop; - struct name_info *info; + struct service_range *sr; + struct tipc_service *sc; struct publication *p; - struct name_seq *seq; + struct rb_node *n; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (!seq) + sc = tipc_service_find(net, type); + if (!sc) goto exit; - spin_lock_bh(&seq->lock); - sseq = seq->sseqs; - stop = seq->sseqs + seq->first_free; - for (; sseq != stop; sseq++) { - info = sseq->info; - list_for_each_entry(p, &info->all_publ, all_publ) { + spin_lock_bh(&sc->lock); + for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + list_for_each_entry(p, &sr->all_publ, all_publ) { if (p->scope != scope) continue; tipc_group_add_member(grp, p->node, p->port, p->lower); } } - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&sc->lock); exit: rcu_read_unlock(); } -/* - * tipc_nametbl_publish - add name publication to network name tables +/* tipc_nametbl_publish - add service binding to name table */ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, - u32 upper, u32 scope, u32 port_ref, + u32 upper, u32 scope, u32 port, u32 key) { - struct publication *publ; - struct sk_buff *buf = NULL; - struct tipc_net *tn = net_generic(net, tipc_net_id); + struct name_table *nt = tipc_name_table(net); + struct tipc_net *tn = tipc_net(net); + struct publication *p = NULL; + struct sk_buff *skb = NULL; spin_lock_bh(&tn->nametbl_lock); - if (tn->nametbl->local_publ_count >= TIPC_MAX_PUBLICATIONS) { - pr_warn("Publication failed, local publication limit reached (%u)\n", - TIPC_MAX_PUBLICATIONS); - spin_unlock_bh(&tn->nametbl_lock); - return NULL; + + if (nt->local_publ_count >= TIPC_MAX_PUBL) { + pr_warn("Bind failed, max limit %u reached\n", TIPC_MAX_PUBL); + goto exit; } - publ = tipc_nametbl_insert_publ(net, type, lower, upper, scope, - tipc_own_addr(net), port_ref, key); - if (likely(publ)) { - tn->nametbl->local_publ_count++; - buf = tipc_named_publish(net, publ); + p = tipc_nametbl_insert_publ(net, type, lower, upper, scope, + tipc_own_addr(net), port, key); + if (p) { + nt->local_publ_count++; + skb = tipc_named_publish(net, p); /* Any pending external events? */ tipc_named_process_backlog(net); } +exit: spin_unlock_bh(&tn->nametbl_lock); - if (buf) - tipc_node_broadcast(net, buf); - return publ; + if (skb) + tipc_node_broadcast(net, skb); + return p; } /** - * tipc_nametbl_withdraw - withdraw name publication from network name tables + * tipc_nametbl_withdraw - withdraw a service binding */ -int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port, - u32 key) +int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, + u32 port, u32 key) { - struct publication *publ; + struct name_table *nt = tipc_name_table(net); + struct tipc_net *tn = tipc_net(net); + u32 self = tipc_own_addr(net); struct sk_buff *skb = NULL; - struct tipc_net *tn = net_generic(net, tipc_net_id); + struct publication *p; spin_lock_bh(&tn->nametbl_lock); - publ = tipc_nametbl_remove_publ(net, type, lower, tipc_own_addr(net), - port, key); - if (likely(publ)) { - tn->nametbl->local_publ_count--; - skb = tipc_named_withdraw(net, publ); + + p = tipc_nametbl_remove_publ(net, type, lower, self, port, key); + if (p) { + nt->local_publ_count--; + skb = tipc_named_withdraw(net, p); /* Any pending external events? */ tipc_named_process_backlog(net); - list_del_init(&publ->binding_sock); - kfree_rcu(publ, rcu); + list_del_init(&p->binding_sock); + kfree_rcu(p, rcu); } else { - pr_err("Unable to remove local publication\n" - "(type=%u, lower=%u, port=%u, key=%u)\n", + pr_err("Failed to remove local publication {%u,%u,%u}/%u\n", type, lower, port, key); } spin_unlock_bh(&tn->nametbl_lock); @@ -766,27 +682,24 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port, */ void tipc_nametbl_subscribe(struct tipc_subscription *sub) { + struct name_table *nt = tipc_name_table(sub->net); struct tipc_net *tn = tipc_net(sub->net); struct tipc_subscr *s = &sub->evt.s; u32 type = tipc_sub_read(s, seq.type); - int index = hash(type); - struct name_seq *seq; - struct tipc_name_seq ns; + struct tipc_service *sc; spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(sub->net, type); - if (!seq) - seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); - if (seq) { - spin_lock_bh(&seq->lock); - tipc_nameseq_subscribe(seq, sub); - spin_unlock_bh(&seq->lock); + sc = tipc_service_find(sub->net, type); + if (!sc) + sc = tipc_service_create(type, &nt->services[hash(type)]); + if (sc) { + spin_lock_bh(&sc->lock); + tipc_service_subscribe(sc, sub); + spin_unlock_bh(&sc->lock); } else { - ns.type = tipc_sub_read(s, seq.type); - ns.lower = tipc_sub_read(s, seq.lower); - ns.upper = tipc_sub_read(s, seq.upper); - pr_warn("Failed to create subscription for {%u,%u,%u}\n", - ns.type, ns.lower, ns.upper); + pr_warn("Failed to subscribe for {%u,%u,%u}\n", type, + tipc_sub_read(s, seq.lower), + tipc_sub_read(s, seq.upper)); } spin_unlock_bh(&tn->nametbl_lock); } @@ -796,124 +709,122 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub) */ void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) { - struct tipc_subscr *s = &sub->evt.s; struct tipc_net *tn = tipc_net(sub->net); - struct name_seq *seq; + struct tipc_subscr *s = &sub->evt.s; u32 type = tipc_sub_read(s, seq.type); + struct tipc_service *sc; spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(sub->net, type); - if (seq != NULL) { - spin_lock_bh(&seq->lock); - list_del_init(&sub->nameseq_list); - tipc_sub_put(sub); - if (!seq->first_free && list_empty(&seq->subscriptions)) { - hlist_del_init_rcu(&seq->ns_list); - kfree(seq->sseqs); - spin_unlock_bh(&seq->lock); - kfree_rcu(seq, rcu); - } else { - spin_unlock_bh(&seq->lock); - } + sc = tipc_service_find(sub->net, type); + if (!sc) + goto exit; + + spin_lock_bh(&sc->lock); + list_del_init(&sub->service_list); + tipc_sub_put(sub); + + /* Delete service item if no more publications and subscriptions */ + if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) { + hlist_del_init_rcu(&sc->service_list); + kfree_rcu(sc, rcu); } + spin_unlock_bh(&sc->lock); +exit: spin_unlock_bh(&tn->nametbl_lock); } int tipc_nametbl_init(struct net *net) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct name_table *tipc_nametbl; + struct tipc_net *tn = tipc_net(net); + struct name_table *nt; int i; - tipc_nametbl = kzalloc(sizeof(*tipc_nametbl), GFP_ATOMIC); - if (!tipc_nametbl) + nt = kzalloc(sizeof(*nt), GFP_ATOMIC); + if (!nt) return -ENOMEM; for (i = 0; i < TIPC_NAMETBL_SIZE; i++) - INIT_HLIST_HEAD(&tipc_nametbl->seq_hlist[i]); + INIT_HLIST_HEAD(&nt->services[i]); - INIT_LIST_HEAD(&tipc_nametbl->node_scope); - INIT_LIST_HEAD(&tipc_nametbl->cluster_scope); - tn->nametbl = tipc_nametbl; + INIT_LIST_HEAD(&nt->node_scope); + INIT_LIST_HEAD(&nt->cluster_scope); + tn->nametbl = nt; spin_lock_init(&tn->nametbl_lock); return 0; } /** - * tipc_purge_publications - remove all publications for a given type - * - * tipc_nametbl_lock must be held when calling this function + * tipc_service_delete - purge all publications for a service and delete it */ -static void tipc_purge_publications(struct net *net, struct name_seq *seq) +static void tipc_service_delete(struct net *net, struct tipc_service *sc) { - struct publication *publ, *safe; - struct sub_seq *sseq; - struct name_info *info; - - spin_lock_bh(&seq->lock); - sseq = seq->sseqs; - info = sseq->info; - list_for_each_entry_safe(publ, safe, &info->all_publ, all_publ) { - tipc_nameseq_remove_publ(net, seq, publ->lower, publ->node, - publ->port, publ->key); - kfree_rcu(publ, rcu); + struct service_range *sr, *tmpr; + struct publication *p, *tmpb; + + spin_lock_bh(&sc->lock); + rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) { + list_for_each_entry_safe(p, tmpb, + &sr->all_publ, all_publ) { + tipc_service_remove_publ(net, sc, p->lower, p->node, + p->port, p->key); + kfree_rcu(p, rcu); + } } - hlist_del_init_rcu(&seq->ns_list); - kfree(seq->sseqs); - spin_unlock_bh(&seq->lock); - - kfree_rcu(seq, rcu); + hlist_del_init_rcu(&sc->service_list); + spin_unlock_bh(&sc->lock); + kfree_rcu(sc, rcu); } void tipc_nametbl_stop(struct net *net) { + struct name_table *nt = tipc_name_table(net); + struct tipc_net *tn = tipc_net(net); + struct hlist_head *service_head; + struct tipc_service *service; u32 i; - struct name_seq *seq; - struct hlist_head *seq_head; - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct name_table *tipc_nametbl = tn->nametbl; /* Verify name table is empty and purge any lingering * publications, then release the name table */ spin_lock_bh(&tn->nametbl_lock); for (i = 0; i < TIPC_NAMETBL_SIZE; i++) { - if (hlist_empty(&tipc_nametbl->seq_hlist[i])) + if (hlist_empty(&nt->services[i])) continue; - seq_head = &tipc_nametbl->seq_hlist[i]; - hlist_for_each_entry_rcu(seq, seq_head, ns_list) { - tipc_purge_publications(net, seq); + service_head = &nt->services[i]; + hlist_for_each_entry_rcu(service, service_head, service_list) { + tipc_service_delete(net, service); } } spin_unlock_bh(&tn->nametbl_lock); synchronize_net(); - kfree(tipc_nametbl); - + kfree(nt); } static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, - struct name_seq *seq, - struct sub_seq *sseq, u32 *last_publ) + struct tipc_service *service, + struct service_range *sr, + u32 *last_key) { - void *hdr; - struct nlattr *attrs; - struct nlattr *publ; struct publication *p; + struct nlattr *attrs; + struct nlattr *b; + void *hdr; - if (*last_publ) { - list_for_each_entry(p, &sseq->info->all_publ, all_publ) - if (p->key == *last_publ) + if (*last_key) { + list_for_each_entry(p, &sr->all_publ, all_publ) + if (p->key == *last_key) break; - if (p->key != *last_publ) + if (p->key != *last_key) return -EPIPE; } else { - p = list_first_entry(&sseq->info->all_publ, struct publication, + p = list_first_entry(&sr->all_publ, + struct publication, all_publ); } - list_for_each_entry_from(p, &sseq->info->all_publ, all_publ) { - *last_publ = p->key; + list_for_each_entry_from(p, &sr->all_publ, all_publ) { + *last_key = p->key; hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family, NLM_F_MULTI, @@ -925,15 +836,15 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, if (!attrs) goto msg_full; - publ = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL); - if (!publ) + b = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL); + if (!b) goto attr_msg_full; - if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, seq->type)) + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, service->type)) goto publ_msg_full; - if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sseq->lower)) + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sr->lower)) goto publ_msg_full; - if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sseq->upper)) + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sr->upper)) goto publ_msg_full; if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope)) goto publ_msg_full; @@ -944,16 +855,16 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key)) goto publ_msg_full; - nla_nest_end(msg->skb, publ); + nla_nest_end(msg->skb, b); nla_nest_end(msg->skb, attrs); genlmsg_end(msg->skb, hdr); } - *last_publ = 0; + *last_key = 0; return 0; publ_msg_full: - nla_nest_cancel(msg->skb, publ); + nla_nest_cancel(msg->skb, b); attr_msg_full: nla_nest_cancel(msg->skb, attrs); msg_full: @@ -962,39 +873,34 @@ msg_full: return -EMSGSIZE; } -static int __tipc_nl_subseq_list(struct tipc_nl_msg *msg, struct name_seq *seq, - u32 *last_lower, u32 *last_publ) +static int __tipc_nl_service_range_list(struct tipc_nl_msg *msg, + struct tipc_service *sc, + u32 *last_lower, u32 *last_key) { - struct sub_seq *sseq; - struct sub_seq *sseq_start; + struct service_range *sr; + struct rb_node *n; int err; - if (*last_lower) { - sseq_start = nameseq_find_subseq(seq, *last_lower); - if (!sseq_start) - return -EPIPE; - } else { - sseq_start = seq->sseqs; - } - - for (sseq = sseq_start; sseq != &seq->sseqs[seq->first_free]; sseq++) { - err = __tipc_nl_add_nametable_publ(msg, seq, sseq, last_publ); + for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + if (sr->lower < *last_lower) + continue; + err = __tipc_nl_add_nametable_publ(msg, sc, sr, last_key); if (err) { - *last_lower = sseq->lower; + *last_lower = sr->lower; return err; } } *last_lower = 0; - return 0; } -static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, - u32 *last_type, u32 *last_lower, u32 *last_publ) +static int tipc_nl_service_list(struct net *net, struct tipc_nl_msg *msg, + u32 *last_type, u32 *last_lower, u32 *last_key) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct hlist_head *seq_head; - struct name_seq *seq = NULL; + struct tipc_net *tn = tipc_net(net); + struct tipc_service *service = NULL; + struct hlist_head *head; int err; int i; @@ -1004,30 +910,31 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, i = 0; for (; i < TIPC_NAMETBL_SIZE; i++) { - seq_head = &tn->nametbl->seq_hlist[i]; + head = &tn->nametbl->services[i]; if (*last_type) { - seq = nametbl_find_seq(net, *last_type); - if (!seq) + service = tipc_service_find(net, *last_type); + if (!service) return -EPIPE; } else { - hlist_for_each_entry_rcu(seq, seq_head, ns_list) + hlist_for_each_entry_rcu(service, head, service_list) break; - if (!seq) + if (!service) continue; } - hlist_for_each_entry_from_rcu(seq, ns_list) { - spin_lock_bh(&seq->lock); - err = __tipc_nl_subseq_list(msg, seq, last_lower, - last_publ); + hlist_for_each_entry_from_rcu(service, service_list) { + spin_lock_bh(&service->lock); + err = __tipc_nl_service_range_list(msg, service, + last_lower, + last_key); if (err) { - *last_type = seq->type; - spin_unlock_bh(&seq->lock); + *last_type = service->type; + spin_unlock_bh(&service->lock); return err; } - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&service->lock); } *last_type = 0; } @@ -1036,13 +943,13 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) { - int err; - int done = cb->args[3]; + struct net *net = sock_net(skb->sk); u32 last_type = cb->args[0]; u32 last_lower = cb->args[1]; - u32 last_publ = cb->args[2]; - struct net *net = sock_net(skb->sk); + u32 last_key = cb->args[2]; + int done = cb->args[3]; struct tipc_nl_msg msg; + int err; if (done) return 0; @@ -1052,7 +959,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) msg.seq = cb->nlh->nlmsg_seq; rcu_read_lock(); - err = tipc_nl_seq_list(net, &msg, &last_type, &last_lower, &last_publ); + err = tipc_nl_service_list(net, &msg, &last_type, + &last_lower, &last_key); if (!err) { done = 1; } else if (err != -EMSGSIZE) { @@ -1068,7 +976,7 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) cb->args[0] = last_type; cb->args[1] = last_lower; - cb->args[2] = last_publ; + cb->args[2] = last_key; cb->args[3] = done; return skb->len; -- cgit v1.2.3