+ }
+
+ if (err) {
+ destroy_workqueue(wq);
+ wq = NULL;
+ }
+ return wq;
+}
+EXPORT_SYMBOL_GPL(__create_workqueue_key);
+
+static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
+{
+ /*
+ * Our caller is either destroy_workqueue() or CPU_DEAD,
+ * workqueue_mutex protects cwq->thread
+ */
+ if (cwq->thread == NULL)
+ return;
+
+ lock_acquire(&cwq->wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
+ lock_release(&cwq->wq->lockdep_map, 1, _THIS_IP_);
+
+ flush_cpu_workqueue(cwq);
+ /*
+ * If the caller is CPU_DEAD and cwq->worklist was not empty,
+ * a concurrent flush_workqueue() can insert a barrier after us.
+ * However, in that case run_workqueue() won't return and check
+ * kthread_should_stop() until it flushes all work_struct's.
+ * When ->worklist becomes empty it is safe to exit because no
+ * more work_structs can be queued on this cwq: flush_workqueue
+ * checks list_empty(), and a "normal" queue_work() can't use
+ * a dead CPU.
+ */
+ kthread_stop(cwq->thread);
+ cwq->thread = NULL;
+}
+
+/**
+ * destroy_workqueue - safely terminate a workqueue
+ * @wq: target workqueue
+ *
+ * Safely destroy a workqueue. All work currently pending will be done first.
+ */
+void destroy_workqueue(struct workqueue_struct *wq)
+{
+ const cpumask_t *cpu_map = wq_cpu_map(wq);
+ struct cpu_workqueue_struct *cwq;
+ int cpu;
+
+ mutex_lock(&workqueue_mutex);
+ list_del(&wq->list);
+ mutex_unlock(&workqueue_mutex);
+
+ for_each_cpu_mask(cpu, *cpu_map) {
+ cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+ cleanup_workqueue_thread(cwq, cpu);
+ }
+
+ free_percpu(wq->cpu_wq);
+ kfree(wq);
+}
+EXPORT_SYMBOL_GPL(destroy_workqueue);