diff options
| -rw-r--r-- | Documentation/admin-guide/kernel-parameters.txt | 3 | ||||
| -rw-r--r-- | Documentation/core-api/workqueue.rst | 14 | ||||
| -rw-r--r-- | Documentation/driver-api/driver-model/devres.rst | 4 | ||||
| -rw-r--r-- | drivers/firmware/efi/efi.c | 2 | ||||
| -rw-r--r-- | include/linux/workqueue.h | 47 | ||||
| -rw-r--r-- | kernel/workqueue.c | 285 | ||||
| -rw-r--r-- | lib/Kconfig.debug | 10 | ||||
| -rw-r--r-- | lib/Makefile | 1 | ||||
| -rw-r--r-- | lib/test_workqueue.c | 294 | ||||
| -rw-r--r-- | tools/workqueue/wq_dump.py | 20 |
10 files changed, 629 insertions, 51 deletions
diff --git a/Documentation/admin-guide/kernel-parameters.txt b/Documentation/admin-guide/kernel-parameters.txt index f2ce1f4975c1..b9a2c649e411 100644 --- a/Documentation/admin-guide/kernel-parameters.txt +++ b/Documentation/admin-guide/kernel-parameters.txt @@ -8543,7 +8543,8 @@ Kernel parameters workqueue.default_affinity_scope= Select the default affinity scope to use for unbound workqueues. Can be one of "cpu", "smt", "cache", - "numa" and "system". Default is "cache". For more + "cache_shard", "numa" and "system". Default is + "cache_shard". For more information, see the Affinity Scopes section in Documentation/core-api/workqueue.rst. diff --git a/Documentation/core-api/workqueue.rst b/Documentation/core-api/workqueue.rst index 165ca73e8351..411e1b28b8de 100644 --- a/Documentation/core-api/workqueue.rst +++ b/Documentation/core-api/workqueue.rst @@ -378,9 +378,9 @@ Affinity Scopes An unbound workqueue groups CPUs according to its affinity scope to improve cache locality. For example, if a workqueue is using the default affinity -scope of "cache", it will group CPUs according to last level cache -boundaries. A work item queued on the workqueue will be assigned to a worker -on one of the CPUs which share the last level cache with the issuing CPU. +scope of "cache_shard", it will group CPUs into sub-LLC shards. A work item +queued on the workqueue will be assigned to a worker on one of the CPUs +within the same shard as the issuing CPU. Once started, the worker may or may not be allowed to move outside the scope depending on the ``affinity_strict`` setting of the scope. @@ -402,7 +402,13 @@ Workqueue currently supports the following affinity scopes. ``cache`` CPUs are grouped according to cache boundaries. Which specific cache boundary is used is determined by the arch code. L3 is used in a lot of - cases. This is the default affinity scope. + cases. + +``cache_shard`` + CPUs are grouped into sub-LLC shards of at most ``wq_cache_shard_size`` + cores (default 8, tunable via the ``workqueue.cache_shard_size`` boot + parameter). Shards are always split on core (SMT group) boundaries. + This is the default affinity scope. ``numa`` CPUs are grouped according to NUMA boundaries. diff --git a/Documentation/driver-api/driver-model/devres.rst b/Documentation/driver-api/driver-model/devres.rst index 7d2b897d66fa..017fb155a5bc 100644 --- a/Documentation/driver-api/driver-model/devres.rst +++ b/Documentation/driver-api/driver-model/devres.rst @@ -464,3 +464,7 @@ SPI WATCHDOG devm_watchdog_register_device() + +WORKQUEUE + devm_alloc_workqueue() + devm_alloc_ordered_workqueue() diff --git a/drivers/firmware/efi/efi.c b/drivers/firmware/efi/efi.c index b2fb92a4bbd1..3dab284a7754 100644 --- a/drivers/firmware/efi/efi.c +++ b/drivers/firmware/efi/efi.c @@ -423,7 +423,7 @@ static int __init efisubsys_init(void) * ordered workqueue (which creates only one execution context) * should suffice for all our needs. */ - efi_rts_wq = alloc_ordered_workqueue("efi_rts_wq", 0); + efi_rts_wq = alloc_ordered_workqueue("efi_runtime", WQ_SYSFS); if (!efi_rts_wq) { pr_err("Creating efi_rts_wq failed, EFI runtime services disabled.\n"); clear_bit(EFI_RUNTIME_SERVICES, &efi.flags); diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h index a4749f56398f..ab6cb70ca1a5 100644 --- a/include/linux/workqueue.h +++ b/include/linux/workqueue.h @@ -131,8 +131,9 @@ struct rcu_work { enum wq_affn_scope { WQ_AFFN_DFL, /* use system default */ WQ_AFFN_CPU, /* one pod per CPU */ - WQ_AFFN_SMT, /* one pod poer SMT */ + WQ_AFFN_SMT, /* one pod per SMT */ WQ_AFFN_CACHE, /* one pod per LLC */ + WQ_AFFN_CACHE_SHARD, /* synthetic sub-LLC shards */ WQ_AFFN_NUMA, /* one pod per NUMA node */ WQ_AFFN_SYSTEM, /* one pod across the whole system */ @@ -440,6 +441,9 @@ enum wq_consts { * system_long_wq is similar to system_percpu_wq but may host long running * works. Queue flushing might take relatively long. * + * system_dfl_long_wq is similar to system_dfl_wq but it may host long running + * works. + * * system_dfl_wq is unbound workqueue. Workers are not bound to * any specific CPU, not concurrency managed, and all queued works are * executed immediately as long as max_active limit is not reached and @@ -468,6 +472,7 @@ extern struct workqueue_struct *system_power_efficient_wq; extern struct workqueue_struct *system_freezable_power_efficient_wq; extern struct workqueue_struct *system_bh_wq; extern struct workqueue_struct *system_bh_highpri_wq; +extern struct workqueue_struct *system_dfl_long_wq; void workqueue_softirq_action(bool highpri); void workqueue_softirq_dead(unsigned int cpu); @@ -512,6 +517,26 @@ __printf(1, 4) struct workqueue_struct * alloc_workqueue_noprof(const char *fmt, unsigned int flags, int max_active, ...); #define alloc_workqueue(...) alloc_hooks(alloc_workqueue_noprof(__VA_ARGS__)) +/** + * devm_alloc_workqueue - Resource-managed allocate a workqueue + * @dev: Device to allocate workqueue for + * @fmt: printf format for the name of the workqueue + * @flags: WQ_* flags + * @max_active: max in-flight work items, 0 for default + * @...: args for @fmt + * + * Resource managed workqueue, see alloc_workqueue() for details. + * + * The workqueue will be automatically destroyed on driver detach. Typically + * this should be used in drivers already relying on devm interafaces. + * + * RETURNS: + * Pointer to the allocated workqueue on success, %NULL on failure. + */ +__printf(2, 5) struct workqueue_struct * +devm_alloc_workqueue(struct device *dev, const char *fmt, unsigned int flags, + int max_active, ...); + #ifdef CONFIG_LOCKDEP /** * alloc_workqueue_lockdep_map - allocate a workqueue with user-defined lockdep_map @@ -568,6 +593,8 @@ alloc_workqueue_lockdep_map(const char *fmt, unsigned int flags, int max_active, */ #define alloc_ordered_workqueue(fmt, flags, args...) \ alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args) +#define devm_alloc_ordered_workqueue(dev, fmt, flags, args...) \ + devm_alloc_workqueue(dev, fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args) #define create_workqueue(name) \ alloc_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM | WQ_PERCPU, 1, (name)) @@ -712,14 +739,14 @@ static inline bool schedule_work_on(int cpu, struct work_struct *work) } /** - * schedule_work - put work task in global workqueue + * schedule_work - put work task in per-CPU workqueue * @work: job to be done * - * Returns %false if @work was already on the kernel-global workqueue and + * Returns %false if @work was already on the system per-CPU workqueue and * %true otherwise. * - * This puts a job in the kernel-global workqueue if it was not already - * queued and leaves it in the same position on the kernel-global + * This puts a job in the system per-CPU workqueue if it was not already + * queued and leaves it in the same position on the system per-CPU * workqueue otherwise. * * Shares the same memory-ordering properties of queue_work(), cf. the @@ -783,6 +810,8 @@ extern void __warn_flushing_systemwide_wq(void) _wq == system_highpri_wq) || \ (__builtin_constant_p(_wq == system_long_wq) && \ _wq == system_long_wq) || \ + (__builtin_constant_p(_wq == system_dfl_long_wq) && \ + _wq == system_dfl_long_wq) || \ (__builtin_constant_p(_wq == system_dfl_wq) && \ _wq == system_dfl_wq) || \ (__builtin_constant_p(_wq == system_freezable_wq) && \ @@ -796,12 +825,12 @@ extern void __warn_flushing_systemwide_wq(void) }) /** - * schedule_delayed_work_on - queue work in global workqueue on CPU after delay + * schedule_delayed_work_on - queue work in per-CPU workqueue on CPU after delay * @cpu: cpu to use * @dwork: job to be done * @delay: number of jiffies to wait * - * After waiting for a given time this puts a job in the kernel-global + * After waiting for a given time this puts a job in the system per-CPU * workqueue on the specified CPU. */ static inline bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork, @@ -811,11 +840,11 @@ static inline bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork, } /** - * schedule_delayed_work - put work task in global workqueue after delay + * schedule_delayed_work - put work task in per-CPU workqueue after delay * @dwork: job to be done * @delay: number of jiffies to wait or 0 for immediate execution * - * After waiting for a given time this puts a job in the kernel-global + * After waiting for a given time this puts a job in the system per-CPU * workqueue. */ static inline bool schedule_delayed_work(struct delayed_work *dwork, diff --git a/kernel/workqueue.c b/kernel/workqueue.c index c6ea96d5b716..5f747f241a5f 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -41,6 +41,7 @@ #include <linux/mempolicy.h> #include <linux/freezer.h> #include <linux/debug_locks.h> +#include <linux/device/devres.h> #include <linux/lockdep.h> #include <linux/idr.h> #include <linux/jhash.h> @@ -130,6 +131,14 @@ enum wq_internal_consts { WORKER_ID_LEN = 10 + WQ_NAME_LEN, /* "kworker/R-" + WQ_NAME_LEN */ }; +/* Layout of shards within one LLC pod */ +struct llc_shard_layout { + int nr_large_shards; /* number of large shards (cores_per_shard + 1) */ + int cores_per_shard; /* base number of cores per default shard */ + int nr_shards; /* total number of shards */ + /* nr_default shards = (nr_shards - nr_large_shards) */ +}; + /* * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because @@ -404,11 +413,12 @@ struct work_offq_data { u32 flags; }; -static const char *wq_affn_names[WQ_AFFN_NR_TYPES] = { +static const char * const wq_affn_names[WQ_AFFN_NR_TYPES] = { [WQ_AFFN_DFL] = "default", [WQ_AFFN_CPU] = "cpu", [WQ_AFFN_SMT] = "smt", [WQ_AFFN_CACHE] = "cache", + [WQ_AFFN_CACHE_SHARD] = "cache_shard", [WQ_AFFN_NUMA] = "numa", [WQ_AFFN_SYSTEM] = "system", }; @@ -431,13 +441,16 @@ module_param_named(cpu_intensive_warning_thresh, wq_cpu_intensive_warning_thresh static bool wq_power_efficient = IS_ENABLED(CONFIG_WQ_POWER_EFFICIENT_DEFAULT); module_param_named(power_efficient, wq_power_efficient, bool, 0444); +static unsigned int wq_cache_shard_size = 8; +module_param_named(cache_shard_size, wq_cache_shard_size, uint, 0444); + static bool wq_online; /* can kworkers be created yet? */ static bool wq_topo_initialized __read_mostly = false; static struct kmem_cache *pwq_cache; static struct wq_pod_type wq_pod_types[WQ_AFFN_NR_TYPES]; -static enum wq_affn_scope wq_affn_dfl = WQ_AFFN_CACHE; +static enum wq_affn_scope wq_affn_dfl = WQ_AFFN_CACHE_SHARD; /* buf for wq_update_unbound_pod_attrs(), protected by CPU hotplug exclusion */ static struct workqueue_attrs *unbound_wq_update_pwq_attrs_buf; @@ -530,6 +543,8 @@ struct workqueue_struct *system_bh_wq; EXPORT_SYMBOL_GPL(system_bh_wq); struct workqueue_struct *system_bh_highpri_wq; EXPORT_SYMBOL_GPL(system_bh_highpri_wq); +struct workqueue_struct *system_dfl_long_wq __ro_after_init; +EXPORT_SYMBOL_GPL(system_dfl_long_wq); static int worker_thread(void *__worker); static void workqueue_sysfs_unregister(struct workqueue_struct *wq); @@ -2519,7 +2534,6 @@ static void __queue_delayed_work(int cpu, struct workqueue_struct *wq, struct timer_list *timer = &dwork->timer; struct work_struct *work = &dwork->work; - WARN_ON_ONCE(!wq); WARN_ON_ONCE(timer->function != delayed_work_timer_fn); WARN_ON_ONCE(timer_pending(timer)); WARN_ON_ONCE(!list_empty(&work->entry)); @@ -5635,8 +5649,16 @@ enomem: for_each_possible_cpu(cpu) { struct pool_workqueue *pwq = *per_cpu_ptr(wq->cpu_pwq, cpu); - if (pwq) + if (pwq) { + /* + * Unlink pwq from wq->pwqs since link_pwq() + * may have already added it. wq->mutex is not + * needed as the wq has not been published yet. + */ + if (!list_empty(&pwq->pwqs_node)) + list_del_rcu(&pwq->pwqs_node); kmem_cache_free(pwq_cache, pwq); + } } free_percpu(wq->cpu_pwq); wq->cpu_pwq = NULL; @@ -5904,6 +5926,33 @@ struct workqueue_struct *alloc_workqueue_noprof(const char *fmt, } EXPORT_SYMBOL_GPL(alloc_workqueue_noprof); +static void devm_workqueue_release(void *res) +{ + destroy_workqueue(res); +} + +__printf(2, 5) struct workqueue_struct * +devm_alloc_workqueue(struct device *dev, const char *fmt, unsigned int flags, + int max_active, ...) +{ + struct workqueue_struct *wq; + va_list args; + int ret; + + va_start(args, max_active); + wq = alloc_workqueue(fmt, flags, max_active, args); + va_end(args); + if (!wq) + return NULL; + + ret = devm_add_action_or_reset(dev, devm_workqueue_release, wq); + if (ret) + return NULL; + + return wq; +} +EXPORT_SYMBOL_GPL(devm_alloc_workqueue); + #ifdef CONFIG_LOCKDEP __printf(1, 5) struct workqueue_struct * @@ -7059,7 +7108,7 @@ int workqueue_unbound_housekeeping_update(const struct cpumask *hk) /* * If the operation fails, it will fall back to * wq_requested_unbound_cpumask which is initially set to - * (HK_TYPE_WQ ∩ HK_TYPE_DOMAIN) house keeping mask and rewritten + * HK_TYPE_DOMAIN house keeping mask and rewritten * by any subsequent write to workqueue/cpumask sysfs file. */ if (!cpumask_and(cpumask, wq_requested_unbound_cpumask, hk)) @@ -7078,13 +7127,7 @@ int workqueue_unbound_housekeeping_update(const struct cpumask *hk) static int parse_affn_scope(const char *val) { - int i; - - for (i = 0; i < ARRAY_SIZE(wq_affn_names); i++) { - if (!strncasecmp(val, wq_affn_names[i], strlen(wq_affn_names[i]))) - return i; - } - return -EINVAL; + return sysfs_match_string(wq_affn_names, val); } static int wq_affn_dfl_set(const char *val, const struct kernel_param *kp) @@ -7191,7 +7234,26 @@ static struct attribute *wq_sysfs_attrs[] = { &dev_attr_max_active.attr, NULL, }; -ATTRIBUTE_GROUPS(wq_sysfs); + +static umode_t wq_sysfs_is_visible(struct kobject *kobj, struct attribute *a, int n) +{ + struct device *dev = kobj_to_dev(kobj); + struct workqueue_struct *wq = dev_to_wq(dev); + + /* + * Adjusting max_active breaks ordering guarantee. Changing it has no + * effect on BH worker. Limit max_active to RO in such case. + */ + if (wq->flags & (WQ_BH | __WQ_ORDERED)) + return 0444; + return a->mode; +} + +static const struct attribute_group wq_sysfs_group = { + .is_visible = wq_sysfs_is_visible, + .attrs = wq_sysfs_attrs, +}; +__ATTRIBUTE_GROUPS(wq_sysfs); static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr, char *buf) @@ -7494,13 +7556,6 @@ int workqueue_sysfs_register(struct workqueue_struct *wq) struct wq_device *wq_dev; int ret; - /* - * Adjusting max_active breaks ordering guarantee. Disallow exposing - * ordered workqueues. - */ - if (WARN_ON(wq->flags & __WQ_ORDERED)) - return -EINVAL; - wq->wq_dev = wq_dev = kzalloc_obj(*wq_dev); if (!wq_dev) return -ENOMEM; @@ -7877,8 +7932,8 @@ void __init workqueue_init_early(void) { struct wq_pod_type *pt = &wq_pod_types[WQ_AFFN_SYSTEM]; int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL }; - void (*irq_work_fns[2])(struct irq_work *) = { bh_pool_kick_normal, - bh_pool_kick_highpri }; + void (*irq_work_fns[NR_STD_WORKER_POOLS])(struct irq_work *) = + { bh_pool_kick_normal, bh_pool_kick_highpri }; int i, cpu; BUILD_BUG_ON(__alignof__(struct pool_workqueue) < __alignof__(long long)); @@ -7890,7 +7945,6 @@ void __init workqueue_init_early(void) cpumask_copy(wq_online_cpumask, cpu_online_mask); cpumask_copy(wq_unbound_cpumask, cpu_possible_mask); - restrict_unbound_cpumask("HK_TYPE_WQ", housekeeping_cpumask(HK_TYPE_WQ)); restrict_unbound_cpumask("HK_TYPE_DOMAIN", housekeeping_cpumask(HK_TYPE_DOMAIN)); if (!cpumask_empty(&wq_cmdline_cpumask)) restrict_unbound_cpumask("workqueue.unbound_cpus", &wq_cmdline_cpumask); @@ -7974,11 +8028,12 @@ void __init workqueue_init_early(void) system_bh_wq = alloc_workqueue("events_bh", WQ_BH | WQ_PERCPU, 0); system_bh_highpri_wq = alloc_workqueue("events_bh_highpri", WQ_BH | WQ_HIGHPRI | WQ_PERCPU, 0); + system_dfl_long_wq = alloc_workqueue("events_dfl_long", WQ_UNBOUND, WQ_MAX_ACTIVE); BUG_ON(!system_wq || !system_percpu_wq|| !system_highpri_wq || !system_long_wq || !system_unbound_wq || !system_freezable_wq || !system_dfl_wq || !system_power_efficient_wq || !system_freezable_power_efficient_wq || - !system_bh_wq || !system_bh_highpri_wq); + !system_bh_wq || !system_bh_highpri_wq || !system_dfl_long_wq); } static void __init wq_cpu_intensive_thresh_init(void) @@ -8144,6 +8199,186 @@ static bool __init cpus_share_numa(int cpu0, int cpu1) return cpu_to_node(cpu0) == cpu_to_node(cpu1); } +/* Maps each CPU to its shard index within the LLC pod it belongs to */ +static int cpu_shard_id[NR_CPUS] __initdata; + +/** + * llc_count_cores - count distinct cores (SMT groups) within an LLC pod + * @pod_cpus: the cpumask of CPUs in the LLC pod + * @smt_pods: the SMT pod type, used to identify sibling groups + * + * A core is represented by the lowest-numbered CPU in its SMT group. Returns + * the number of distinct cores found in @pod_cpus. + */ +static int __init llc_count_cores(const struct cpumask *pod_cpus, + struct wq_pod_type *smt_pods) +{ + const struct cpumask *sibling_cpus; + int nr_cores = 0, c; + + /* + * Count distinct cores by only counting the first CPU in each + * SMT sibling group. + */ + for_each_cpu(c, pod_cpus) { + sibling_cpus = smt_pods->pod_cpus[smt_pods->cpu_pod[c]]; + if (cpumask_first(sibling_cpus) == c) + nr_cores++; + } + + return nr_cores; +} + +/* + * llc_shard_size - number of cores in a given shard + * + * Cores are spread as evenly as possible. The first @nr_large_shards shards are + * "large shards" with (cores_per_shard + 1) cores; the rest are "default + * shards" with cores_per_shard cores. + */ +static int __init llc_shard_size(int shard_id, int cores_per_shard, int nr_large_shards) +{ + /* The first @nr_large_shards shards are large shards */ + if (shard_id < nr_large_shards) + return cores_per_shard + 1; + + /* The remaining shards are default shards */ + return cores_per_shard; +} + +/* + * llc_calc_shard_layout - compute the shard layout for an LLC pod + * @nr_cores: number of distinct cores in the LLC pod + * + * Chooses the number of shards that keeps average shard size closest to + * wq_cache_shard_size. Returns a struct describing the total number of shards, + * the base size of each, and how many are large shards. + */ +static struct llc_shard_layout __init llc_calc_shard_layout(int nr_cores) +{ + struct llc_shard_layout layout; + + /* Ensure at least one shard; pick the count closest to the target size */ + layout.nr_shards = max(1, DIV_ROUND_CLOSEST(nr_cores, wq_cache_shard_size)); + layout.cores_per_shard = nr_cores / layout.nr_shards; + layout.nr_large_shards = nr_cores % layout.nr_shards; + + return layout; +} + +/* + * llc_shard_is_full - check whether a shard has reached its core capacity + * @cores_in_shard: number of cores already assigned to this shard + * @shard_id: index of the shard being checked + * @layout: the shard layout computed by llc_calc_shard_layout() + * + * Returns true if @cores_in_shard equals the expected size for @shard_id. + */ +static bool __init llc_shard_is_full(int cores_in_shard, int shard_id, + const struct llc_shard_layout *layout) +{ + return cores_in_shard == llc_shard_size(shard_id, layout->cores_per_shard, + layout->nr_large_shards); +} + +/** + * llc_populate_cpu_shard_id - populate cpu_shard_id[] for each CPU in an LLC pod + * @pod_cpus: the cpumask of CPUs in the LLC pod + * @smt_pods: the SMT pod type, used to identify sibling groups + * @nr_cores: number of distinct cores in @pod_cpus (from llc_count_cores()) + * + * Walks @pod_cpus in order. At each SMT group leader, advances to the next + * shard once the current shard is full. Results are written to cpu_shard_id[]. + */ +static void __init llc_populate_cpu_shard_id(const struct cpumask *pod_cpus, + struct wq_pod_type *smt_pods, + int nr_cores) +{ + struct llc_shard_layout layout = llc_calc_shard_layout(nr_cores); + const struct cpumask *sibling_cpus; + /* Count the number of cores in the current shard_id */ + int cores_in_shard = 0; + unsigned int leader; + /* This is a cursor for the shards. Go from zero to nr_shards - 1*/ + int shard_id = 0; + int c; + + /* Iterate at every CPU for a given LLC pod, and assign it a shard */ + for_each_cpu(c, pod_cpus) { + sibling_cpus = smt_pods->pod_cpus[smt_pods->cpu_pod[c]]; + if (cpumask_first(sibling_cpus) == c) { + /* This is the CPU leader for the siblings */ + if (llc_shard_is_full(cores_in_shard, shard_id, &layout)) { + shard_id++; + cores_in_shard = 0; + } + cores_in_shard++; + cpu_shard_id[c] = shard_id; + } else { + /* + * The siblings' shard MUST be the same as the leader. + * never split threads in the same core. + */ + leader = cpumask_first(sibling_cpus); + + /* + * This check silences a Warray-bounds warning on UP + * configs where NR_CPUS=1 makes cpu_shard_id[] + * a single-element array, and the compiler can't + * prove the index is always 0. + */ + if (WARN_ON_ONCE(leader >= nr_cpu_ids)) + continue; + cpu_shard_id[c] = cpu_shard_id[leader]; + } + } + + WARN_ON_ONCE(shard_id != (layout.nr_shards - 1)); +} + +/** + * precompute_cache_shard_ids - assign each CPU its shard index within its LLC + * + * Iterates over all LLC pods. For each pod, counts distinct cores then assigns + * shard indices to all CPUs in the pod. Must be called after WQ_AFFN_CACHE and + * WQ_AFFN_SMT have been initialized. + */ +static void __init precompute_cache_shard_ids(void) +{ + struct wq_pod_type *llc_pods = &wq_pod_types[WQ_AFFN_CACHE]; + struct wq_pod_type *smt_pods = &wq_pod_types[WQ_AFFN_SMT]; + const struct cpumask *cpus_sharing_llc; + int nr_cores; + int pod; + + if (!wq_cache_shard_size) { + pr_warn("workqueue: cache_shard_size must be > 0, setting to 1\n"); + wq_cache_shard_size = 1; + } + + for (pod = 0; pod < llc_pods->nr_pods; pod++) { + cpus_sharing_llc = llc_pods->pod_cpus[pod]; + + /* Number of cores in this given LLC */ + nr_cores = llc_count_cores(cpus_sharing_llc, smt_pods); + llc_populate_cpu_shard_id(cpus_sharing_llc, smt_pods, nr_cores); + } +} + +/* + * cpus_share_cache_shard - test whether two CPUs belong to the same cache shard + * + * Two CPUs share a cache shard if they are in the same LLC and have the same + * shard index. Used as the pod affinity callback for WQ_AFFN_CACHE_SHARD. + */ +static bool __init cpus_share_cache_shard(int cpu0, int cpu1) +{ + if (!cpus_share_cache(cpu0, cpu1)) + return false; + + return cpu_shard_id[cpu0] == cpu_shard_id[cpu1]; +} + /** * workqueue_init_topology - initialize CPU pods for unbound workqueues * @@ -8159,6 +8394,8 @@ void __init workqueue_init_topology(void) init_pod_type(&wq_pod_types[WQ_AFFN_CPU], cpus_dont_share); init_pod_type(&wq_pod_types[WQ_AFFN_SMT], cpus_share_smt); init_pod_type(&wq_pod_types[WQ_AFFN_CACHE], cpus_share_cache); + precompute_cache_shard_ids(); + init_pod_type(&wq_pod_types[WQ_AFFN_CACHE_SHARD], cpus_share_cache_shard); init_pod_type(&wq_pod_types[WQ_AFFN_NUMA], cpus_share_numa); wq_topo_initialized = true; diff --git a/lib/Kconfig.debug b/lib/Kconfig.debug index aac60b6cfa4b..77c3774c1c49 100644 --- a/lib/Kconfig.debug +++ b/lib/Kconfig.debug @@ -2636,6 +2636,16 @@ config TEST_VMALLOC If unsure, say N. +config TEST_WORKQUEUE + tristate "Test module for stress/performance analysis of workqueue" + default n + help + This builds the "test_workqueue" module for benchmarking + workqueue throughput under contention. Useful for evaluating + affinity scope changes (e.g., cache_shard vs cache). + + If unsure, say N. + config TEST_BPF tristate "Test BPF filter functionality" depends on m && NET diff --git a/lib/Makefile b/lib/Makefile index 1b9ee167517f..ea660cca04f4 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -79,6 +79,7 @@ UBSAN_SANITIZE_test_ubsan.o := y obj-$(CONFIG_TEST_KSTRTOX) += test-kstrtox.o obj-$(CONFIG_TEST_LKM) += test_module.o obj-$(CONFIG_TEST_VMALLOC) += test_vmalloc.o +obj-$(CONFIG_TEST_WORKQUEUE) += test_workqueue.o obj-$(CONFIG_TEST_RHASHTABLE) += test_rhashtable.o obj-$(CONFIG_TEST_STATIC_KEYS) += test_static_keys.o obj-$(CONFIG_TEST_STATIC_KEYS) += test_static_key_base.o diff --git a/lib/test_workqueue.c b/lib/test_workqueue.c new file mode 100644 index 000000000000..99e160bd5ad1 --- /dev/null +++ b/lib/test_workqueue.c @@ -0,0 +1,294 @@ +// SPDX-License-Identifier: GPL-2.0 + +/* + * Test module for stress and performance analysis of workqueue. + * + * Benchmarks queue_work() throughput on an unbound workqueue to measure + * pool->lock contention under different affinity scope configurations + * (e.g., cache vs cache_shard). + * + * The affinity scope is changed between runs via the workqueue's sysfs + * affinity_scope attribute (WQ_SYSFS). + * + * Copyright (c) 2026 Meta Platforms, Inc. and affiliates + * Copyright (c) 2026 Breno Leitao <leitao@debian.org> + * + */ +#include <linux/init.h> +#include <linux/kernel.h> +#include <linux/module.h> +#include <linux/workqueue.h> +#include <linux/kthread.h> +#include <linux/moduleparam.h> +#include <linux/completion.h> +#include <linux/atomic.h> +#include <linux/slab.h> +#include <linux/ktime.h> +#include <linux/cpumask.h> +#include <linux/sched.h> +#include <linux/sort.h> +#include <linux/fs.h> + +#define WQ_NAME "bench_wq" +#define SCOPE_PATH "/sys/bus/workqueue/devices/" WQ_NAME "/affinity_scope" + +static int nr_threads; +module_param(nr_threads, int, 0444); +MODULE_PARM_DESC(nr_threads, + "Number of threads to spawn (default: 0 = num_online_cpus())"); + +static int wq_items = 50000; +module_param(wq_items, int, 0444); +MODULE_PARM_DESC(wq_items, + "Number of work items each thread queues (default: 50000)"); + +static struct workqueue_struct *bench_wq; +static atomic_t threads_done; +static DECLARE_COMPLETION(start_comp); +static DECLARE_COMPLETION(all_done_comp); + +struct thread_ctx { + struct completion work_done; + struct work_struct work; + u64 *latencies; + int cpu; + int items; +}; + +static void bench_work_fn(struct work_struct *work) +{ + struct thread_ctx *ctx = container_of(work, struct thread_ctx, work); + + complete(&ctx->work_done); +} + +static int bench_kthread_fn(void *data) +{ + struct thread_ctx *ctx = data; + ktime_t t_start, t_end; + int i; + + /* Wait for all threads to be ready */ + wait_for_completion(&start_comp); + + if (kthread_should_stop()) + return 0; + + for (i = 0; i < ctx->items; i++) { + reinit_completion(&ctx->work_done); + INIT_WORK(&ctx->work, bench_work_fn); + + t_start = ktime_get(); + queue_work(bench_wq, &ctx->work); + t_end = ktime_get(); + + ctx->latencies[i] = ktime_to_ns(ktime_sub(t_end, t_start)); + wait_for_completion(&ctx->work_done); + } + + if (atomic_dec_and_test(&threads_done)) + complete(&all_done_comp); + + /* + * Wait for kthread_stop() so the module text isn't freed + * while we're still executing. + */ + while (!kthread_should_stop()) + schedule(); + + return 0; +} + +static int cmp_u64(const void *a, const void *b) +{ + u64 va = *(const u64 *)a; + u64 vb = *(const u64 *)b; + + if (va < vb) + return -1; + if (va > vb) + return 1; + return 0; +} + +static int __init set_affn_scope(const char *scope) +{ + struct file *f; + loff_t pos = 0; + ssize_t ret; + + f = filp_open(SCOPE_PATH, O_WRONLY, 0); + if (IS_ERR(f)) { + pr_err("test_workqueue: open %s failed: %ld\n", + SCOPE_PATH, PTR_ERR(f)); + return PTR_ERR(f); + } + + ret = kernel_write(f, scope, strlen(scope), &pos); + filp_close(f, NULL); + + if (ret < 0) { + pr_err("test_workqueue: write '%s' failed: %zd\n", scope, ret); + return ret; + } + + return 0; +} + +static int __init run_bench(int n_threads, const char *scope, const char *label) +{ + struct task_struct **tasks; + unsigned long total_items; + struct thread_ctx *ctxs; + u64 *all_latencies; + ktime_t start, end; + int cpu, i, j, ret; + s64 elapsed_us; + + ret = set_affn_scope(scope); + if (ret) + return ret; + + ctxs = kcalloc(n_threads, sizeof(*ctxs), GFP_KERNEL); + if (!ctxs) + return -ENOMEM; + + tasks = kcalloc(n_threads, sizeof(*tasks), GFP_KERNEL); + if (!tasks) { + kfree(ctxs); + return -ENOMEM; + } + + total_items = (unsigned long)n_threads * wq_items; + all_latencies = kvmalloc_array(total_items, sizeof(u64), GFP_KERNEL); + if (!all_latencies) { + kfree(tasks); + kfree(ctxs); + return -ENOMEM; + } + + /* Allocate per-thread latency arrays */ + for (i = 0; i < n_threads; i++) { + ctxs[i].latencies = kvmalloc_array(wq_items, sizeof(u64), + GFP_KERNEL); + if (!ctxs[i].latencies) { + while (--i >= 0) + kvfree(ctxs[i].latencies); + kvfree(all_latencies); + kfree(tasks); + kfree(ctxs); + return -ENOMEM; + } + } + + atomic_set(&threads_done, n_threads); + reinit_completion(&all_done_comp); + reinit_completion(&start_comp); + + /* Create kthreads, each bound to a different online CPU */ + i = 0; + for_each_online_cpu(cpu) { + if (i >= n_threads) + break; + + ctxs[i].cpu = cpu; + ctxs[i].items = wq_items; + init_completion(&ctxs[i].work_done); + + tasks[i] = kthread_create(bench_kthread_fn, &ctxs[i], + "wq_bench/%d", cpu); + if (IS_ERR(tasks[i])) { + ret = PTR_ERR(tasks[i]); + pr_err("test_workqueue: failed to create kthread %d: %d\n", + i, ret); + /* Unblock threads waiting on start_comp before stopping them */ + complete_all(&start_comp); + while (--i >= 0) + kthread_stop(tasks[i]); + goto out_free; + } + + kthread_bind(tasks[i], cpu); + wake_up_process(tasks[i]); + i++; + } + + /* Start timing and release all threads */ + start = ktime_get(); + complete_all(&start_comp); + + /* Wait for all threads to finish the benchmark */ + wait_for_completion(&all_done_comp); + + /* Drain any remaining work */ + flush_workqueue(bench_wq); + + /* Ensure all kthreads have fully exited before module memory is freed */ + for (i = 0; i < n_threads; i++) + kthread_stop(tasks[i]); + + end = ktime_get(); + elapsed_us = ktime_us_delta(end, start); + + /* Merge all per-thread latencies and sort for percentile calculation */ + j = 0; + for (i = 0; i < n_threads; i++) { + memcpy(&all_latencies[j], ctxs[i].latencies, + wq_items * sizeof(u64)); + j += wq_items; + } + + sort(all_latencies, total_items, sizeof(u64), cmp_u64, NULL); + + pr_info("test_workqueue: %-16s %llu items/sec\tp50=%llu\tp90=%llu\tp95=%llu ns\n", + label, + elapsed_us ? div_u64(total_items * 1000000ULL, elapsed_us) : 0, + all_latencies[total_items * 50 / 100], + all_latencies[total_items * 90 / 100], + all_latencies[total_items * 95 / 100]); + + ret = 0; +out_free: + for (i = 0; i < n_threads; i++) + kvfree(ctxs[i].latencies); + kvfree(all_latencies); + kfree(tasks); + kfree(ctxs); + + return ret; +} + +static const char * const bench_scopes[] = { + "cpu", "smt", "cache_shard", "cache", "numa", "system", +}; + +static int __init test_workqueue_init(void) +{ + int n_threads = min(nr_threads ?: num_online_cpus(), num_online_cpus()); + int i; + + if (wq_items <= 0) { + pr_err("test_workqueue: wq_items must be > 0\n"); + return -EINVAL; + } + + bench_wq = alloc_workqueue(WQ_NAME, WQ_UNBOUND | WQ_SYSFS, 0); + if (!bench_wq) + return -ENOMEM; + + pr_info("test_workqueue: running %d threads, %d items/thread\n", + n_threads, wq_items); + + for (i = 0; i < ARRAY_SIZE(bench_scopes); i++) + run_bench(n_threads, bench_scopes[i], bench_scopes[i]); + + destroy_workqueue(bench_wq); + + /* Return -EAGAIN so the module doesn't stay loaded after the benchmark */ + return -EAGAIN; +} + +module_init(test_workqueue_init); +MODULE_AUTHOR("Breno Leitao <leitao@debian.org>"); +MODULE_DESCRIPTION("Stress/performance benchmark for workqueue subsystem"); +MODULE_LICENSE("GPL"); diff --git a/tools/workqueue/wq_dump.py b/tools/workqueue/wq_dump.py index d29b918306b4..ce4161f52f2f 100644 --- a/tools/workqueue/wq_dump.py +++ b/tools/workqueue/wq_dump.py @@ -107,6 +107,7 @@ WQ_MEM_RECLAIM = prog['WQ_MEM_RECLAIM'] WQ_AFFN_CPU = prog['WQ_AFFN_CPU'] WQ_AFFN_SMT = prog['WQ_AFFN_SMT'] WQ_AFFN_CACHE = prog['WQ_AFFN_CACHE'] +WQ_AFFN_CACHE_SHARD = prog['WQ_AFFN_CACHE_SHARD'] WQ_AFFN_NUMA = prog['WQ_AFFN_NUMA'] WQ_AFFN_SYSTEM = prog['WQ_AFFN_SYSTEM'] @@ -138,7 +139,7 @@ def print_pod_type(pt): print(f' [{cpu}]={pt.cpu_pod[cpu].value_()}', end='') print('') -for affn in [WQ_AFFN_CPU, WQ_AFFN_SMT, WQ_AFFN_CACHE, WQ_AFFN_NUMA, WQ_AFFN_SYSTEM]: +for affn in [WQ_AFFN_CPU, WQ_AFFN_SMT, WQ_AFFN_CACHE, WQ_AFFN_CACHE_SHARD, WQ_AFFN_NUMA, WQ_AFFN_SYSTEM]: print('') print(f'{wq_affn_names[affn].string_().decode().upper()}{" (default)" if affn == wq_affn_dfl else ""}') print_pod_type(wq_pod_types[affn]) @@ -227,15 +228,10 @@ if 'node_to_cpumask_map' in prog: print(f'NODE[{node:02}]={cpumask_str(node_to_cpumask_map[node])}') print('') - print(f'[{"workqueue":^{WQ_NAME_LEN-2}}\\ min max', end='') - first = True + print(f'[{"workqueue":^{WQ_NAME_LEN-1}} {"min":>4} {"max":>4}', end='') for node in for_each_node(): - if first: - print(f' NODE {node}', end='') - first = False - else: - print(f' {node:7}', end='') - print(f' {"dfl":>7} ]') + print(f' {"NODE " + str(node):>9}', end='') + print(f' {"dfl":>9} ]') print('') for wq in list_for_each_entry('struct workqueue_struct', workqueues.address_of_(), 'list'): @@ -243,11 +239,11 @@ if 'node_to_cpumask_map' in prog: continue print(f'{wq.name.string_().decode():{WQ_NAME_LEN}} ', end='') - print(f'{wq.min_active.value_():3} {wq.max_active.value_():3}', end='') + print(f'{wq.min_active.value_():4} {wq.max_active.value_():4}', end='') for node in for_each_node(): nna = wq.node_nr_active[node] - print(f' {nna.nr.counter.value_():3}/{nna.max.value_():3}', end='') + print(f' {f"{nna.nr.counter.value_()}/{nna.max.value_()}":>9}', end='') nna = wq.node_nr_active[nr_node_ids] - print(f' {nna.nr.counter.value_():3}/{nna.max.value_():3}') + print(f' {f"{nna.nr.counter.value_()}/{nna.max.value_()}":>9}') else: printf(f'node_to_cpumask_map not present, is NUMA enabled?') |
