UDN-企业互联网技术人气社区

板块导航

浏览  : 1477
回复  : 0

[运维] Linux 内核 tasklet 机制和工作队列

[复制链接]
cat77的头像 楼主
  Tasklet机制分析

  上面我们介绍了软中断机制,linux内核为什么还要引入tasklet机制呢?主要原因是软中断的pending标志位也就32位,一般情况是不随意增加软中断处理的。而且内核也没有提供通用的增加软中断的接口。其次内,软中断处理函数要求可重入,需要考虑到竞争条件比较多,要求比较高的编程技巧。所以内核提供了tasklet这样的一种通用的机制。

  其实每次写总结的文章,总是想把细节的东西说明白,所以越写越多。这样做的好处是能真正理解其中的机制。但是,内容太多的一个坏处就是难道记忆,所以,在讲清楚讲详细的同时,我还要把精髓总结出来。Tasklet的特点,也是tasklet的精髓就是:tasklet不能休眠,同一个tasklet不能在两个CPU上同时运行,但是不同tasklet可能在不同CPU上同时运行,则需要注意共享数据的保护。

  主要的数据结构
  1. static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);

  2. static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);
复制代码
  1. struct tasklet_struct
  2. {
  3.     struct tasklet_struct *next;
  4.     unsigned long state;
  5.     atomic_t count;
  6.     void (*func)(unsigned long);
  7.     unsigned long data;
  8. };
复制代码

  如何使用tasklet

  使用tasklet比较简单,只需要初始化一个tasklet_struct结构体,然后调用tasklet_schedule,就能利用tasklet机制执行初始化的func函数。
  1. static inline void tasklet_schedule(struct tasklet_struct *t)
  2. {
  3.     if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
  4.         __tasklet_schedule(t);
  5. }
复制代码

  tasklet_schedule处理过程也比较简单,就是把tasklet_struct结构体挂到tasklet_vec链表或者挂接到tasklet_hi_vec链表上,并调度软中断TASKLET_SOFTIRQ或者HI_SOFTIRQ
  1. void __tasklet_schedule(struct tasklet_struct *t)
  2. {
  3.     unsigned long flags;local_irq_save(flags);
  4.     t->next = NULL;
  5.     *__get_cpu_var(tasklet_vec).tail = t;
  6.     __get_cpu_var(tasklet_vec).tail = &(t->next);
  7.     raise_softirq_irqoff(TASKLET_SOFTIRQ);
  8.     local_irq_restore(flags);
  9. }

  10. EXPORT_SYMBOL(__tasklet_schedule);

  11. void __tasklet_hi_schedule(struct tasklet_struct *t)
  12. {
  13.     unsigned long flags;
  14.    
  15.     local_irq_save(flags);
  16.     t->next = NULL;
  17.     *__get_cpu_var(tasklet_hi_vec).tail = t;
  18.     __get_cpu_var(tasklet_hi_vec).tail = &(t->next);
  19.     raise_softirq_irqoff(HI_SOFTIRQ);
  20.     local_irq_restore(flags);
  21. }

  22. EXPORT_SYMBOL(__tasklet_hi_schedule);
复制代码

  Tasklet执行过程

  Tasklet_action在软中断TASKLET_SOFTIRQ被调度到后会被执行,它从tasklet_vec链表中把tasklet_struct结构体都取下来,然后逐个执行。如果t->count的值等于0,说明这个tasklet在调度之后,被disable掉了,所以会将tasklet结构体重新放回到tasklet_vec链表,并重新调度TASKLET_SOFTIRQ软中断,在之后enable这个tasklet之后重新再执行它。
  1. static void tasklet_action(struct softirq_action *a)
  2. {
  3.     struct tasklet_struct *list;local_irq_disable();
  4.     list = __get_cpu_var(tasklet_vec).head;
  5.     __get_cpu_var(tasklet_vec).head = NULL;
  6.     __get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;
  7.     local_irq_enable();
  8.    
  9.     while (list)
  10.     {
  11.         struct tasklet_struct *t = list;
  12.         
  13.         list = list->next;
  14.         
  15.         if (tasklet_trylock(t))
  16.         {
  17.             if (!atomic_read(&t->count))
  18.             {
  19.                 if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
  20.                     BUG();
  21.                 t->func(t->data);
  22.                 tasklet_unlock(t);
  23.                 continue;
  24.             }
  25.             tasklet_unlock(t);
  26.         }
  27.         
  28.         local_irq_disable();
  29.         t->next = NULL;
  30.         *__get_cpu_var(tasklet_vec).tail = t;
  31.         __get_cpu_var(tasklet_vec).tail = &(t->next);
  32.         __raise_softirq_irqoff(TASKLET_SOFTIRQ);
  33.         local_irq_enable();
  34.     }
  35. }
复制代码

   Linux工作队列

  前面已经介绍了tasklet机制,有了tasklet机制为什么还要增加工作队列机制呢?我的理解是由于tasklet机制的限制,变形tasklet中的回调函数有很多的限制,比如不能有休眠的操作等等。而是用工作队列机制,需要处理的函数在进程上下文中调用,休眠操作都是允许的。但是工作队列的实时性不如tasklet,采用工作队列的例程可能不能在短时间内被调用执行。

  数据结构说明

  首先需要说明的是workqueue_struct和cpu_workqueue_struct这两个数据结构,创建一个工作队列首先需要创建workqueue_struct,然后可以在每个CPU上创建一个cpu_workqueue_struct管理结构体。
  1. struct cpu_workqueue_struct
  2. {
  3.     spinlock_t lock;
  4.    
  5.     struct list_head worklist;
  6.     wait_queue_head_t more_work;
  7.     struct work_struct *current_work;
  8.    
  9.     struct workqueue_struct *wq;
  10.     struct task_struct *thread;
  11.    
  12.     int run_depth;        /* Detect run_workqueue() recursion depth */
  13. } ____cacheline_aligned;

  14. /*
  15. * The externally visible workqueue abstraction is an array of
  16. * per-CPU workqueues:
  17. */
  18. struct workqueue_struct
  19. {
  20.     struct cpu_workqueue_struct *cpu_wq;
  21.     struct list_head list;
  22.     const char *name;
  23.     int singlethread;
  24.     int freezeable;        /* Freeze threads during suspend */
  25.     int rt;
  26. #ifdef CONFIG_LOCKDEP
  27.     struct lockdep_map lockdep_map;
  28. #endif
  29. };
复制代码

  Work_struct表示将要提交的处理的工作。
  1. struct work_struct
  2. {
  3.     atomic_long_t data;
  4. #define WORK_STRUCT_PENDING 0        /* T if work item pending execution */
  5. #define WORK_STRUCT_FLAG_MASK (3UL)
  6. #define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
  7.     struct list_head entry;
  8.     work_func_t func;
  9. #ifdef CONFIG_LOCKDEP
  10.     struct lockdep_map lockdep_map;
  11. #endif
  12. };
复制代码

  上面三个数据结构的关系如下图所示
1.png

  介绍主要数据结构的目的并不是想要把工作队列具体的细节说明白,主要的目的是给大家一个总的架构的轮廓。具体的分析在下面展开。从上面的该模块主要数据结构的关系来看,主要需要分析如下几个问题:

  1. Workqueque是怎样创建的,包括event/0内核进程的创建

  2. Work_queue是如何提交到工作队列的

  3. Event/0内核进程如何处理提交到队列上的工作

  Workqueque的创建

  首先申请了workqueue_struct结构体内存,cpu_workqueue_struct结构体的内存。然后在init_cpu_workqueue函数中对cpu_workqueue_struct结构体进行初始化。同时调用create_workqueue_thread函数创建处理工作队列的内核进程。

  create_workqueue_thread中创建了如下的内核进程
  1. p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
复制代码

  最后调用start_workqueue_thread启动新创建的进程。
  1. struct workqueue_struct *__create_workqueue_key(const char *name,
  2.                                                 int singlethread,
  3.                                                 int freezeable,
  4.                                                 int rt,
  5.                                                 struct lock_class_key *key,
  6.                                                 const char *lock_name)
  7. {
  8.     struct workqueue_struct *wq;
  9.     struct cpu_workqueue_struct *cwq;
  10.     int err = 0, cpu;wq = kzalloc(sizeof(*wq), GFP_KERNEL);
  11.     if (!wq)
  12.         return NULL;
  13.    
  14.     wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
  15.     if (!wq->cpu_wq)
  16.     {
  17.         kfree(wq);
  18.         return NULL;
  19.     }
  20.    
  21.     wq->name = name;
  22.     lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
  23.     wq->singlethread = singlethread;
  24.     wq->freezeable = freezeable;
  25.     wq->rt = rt;
  26.     INIT_LIST_HEAD(&wq->list);
  27.    
  28.     if (singlethread)
  29.     {
  30.         cwq = init_cpu_workqueue(wq, singlethread_cpu);
  31.         err = create_workqueue_thread(cwq, singlethread_cpu);
  32.         start_workqueue_thread(cwq, -1);
  33.     }
  34.     else
  35.     {
  36.         cpu_maps_update_begin();
  37.         /*
  38.          * We must place this wq on list even if the code below fails.
  39.          * cpu_down(cpu) can remove cpu from cpu_populated_map before
  40.          * destroy_workqueue() takes the lock, in that case we leak
  41.          * cwq[cpu]->thread.
  42.          */
  43.         spin_lock(&workqueue_lock);
  44.         list_add(&wq->list, &workqueues);
  45.         spin_unlock(&workqueue_lock);
  46.         /*
  47.          * We must initialize cwqs for each possible cpu even if we
  48.          * are going to call destroy_workqueue() finally. Otherwise
  49.          * cpu_up() can hit the uninitialized cwq once we drop the
  50.          * lock.
  51.          */
  52.         for_each_possible_cpu(cpu)
  53.         {
  54.             cwq = init_cpu_workqueue(wq, cpu);
  55.             if (err || !cpu_online(cpu))
  56.                 continue;
  57.             err = create_workqueue_thread(cwq, cpu);
  58.             start_workqueue_thread(cwq, cpu);
  59.         }
  60.         cpu_maps_update_done();
  61.     }
  62.    
  63.     if (err)
  64.     {
  65.         destroy_workqueue(wq);
  66.         wq = NULL;
  67.     }
  68.     return wq;
  69. }
  70. EXPORT_SYMBOL_GPL(__create_workqueue_key);
复制代码

  向工作队列中添加工作

  Shedule_work 函数向工作队列中添加任务。这个接口比较简单,无非是一些队列操作,不再叙述。
  1. /**
  2. * schedule_work - put work task in global workqueue
  3. * @work: job to be done
  4. *
  5. * This puts a job in the kernel-global workqueue.
  6. */
  7. int schedule_work(struct work_struct *work)
  8. {
  9.     return queue_work(keventd_wq, work);
  10. }
  11. EXPORT_SYMBOL(schedule_work);
复制代码

  工作队列内核进程的处理过程

  在创建工作队列的时候,我们创建了一个或者多个进程来处理挂到队列上的工作。这个内核进程的主要函数体为worker_thread,这个函数比较有意思的地方就是,自己降低的优先级,说明worker_thread调度的优先级比较低。在系统负载大大时候,采用工作队列执行的操作可能存在较大的延迟。

  就函数的执行流程来说是真心的简单,只是从队列中取出work,从队列中删除掉,清除掉pending标记,并执行work设置的回调函数。
  1. static int worker_thread(void *__cwq)
  2. {
  3.     struct cpu_workqueue_struct *cwq = __cwq;
  4.     DEFINE_WAIT(wait);if (cwq->wq->freezeable)
  5.         set_freezable();
  6.    
  7.     set_user_nice(current, -5);
  8.    
  9.     for (;;)
  10.     {
  11.         prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
  12.         if (!freezing(current) &&
  13.             !kthread_should_stop() &&
  14.             list_empty(&cwq->worklist))
  15.             schedule();
  16.         finish_wait(&cwq->more_work, &wait);
  17.         
  18.         try_to_freeze();
  19.         
  20.         if (kthread_should_stop())
  21.             break;
  22.         
  23.         run_workqueue(cwq);
  24.     }
  25.    
  26.     return 0;
  27. }

  28. static void run_workqueue(struct cpu_workqueue_struct *cwq)
  29. {
  30.     spin_lock_irq(&cwq->lock);
  31.     cwq->run_depth++;
  32.     if (cwq->run_depth > 3)
  33.     {
  34.         /* morton gets to eat his hat */
  35.         printk("%s: recursion depth exceeded: %dn",
  36.                __func__, cwq->run_depth);
  37.         dump_stack();
  38.     }
  39.     while (!list_empty(&cwq->worklist))
  40.     {
  41.         struct work_struct *work = list_entry(cwq->worklist.next,
  42.                                               struct work_struct, entry);
  43.         work_func_t f = work->func;
  44. #ifdef CONFIG_LOCKDEP
  45.         /*
  46.          * It is permissible to free the struct work_struct
  47.          * from inside the function that is called from it,
  48.          * this we need to take into account for lockdep too.
  49.          * To avoid bogus "held lock freed" warnings as well
  50.          * as problems when looking into work->lockdep_map,
  51.          * make a copy and use that here.
  52.          */
  53.         struct lockdep_map lockdep_map = work->lockdep_map;
  54. #endifcwq->current_work = work;
  55.         list_del_init(cwq->worklist.next);
  56.         spin_unlock_irq(&cwq->lock);
  57.         
  58.         BUG_ON(get_wq_data(work) != cwq);
  59.         work_clear_pending(work);
  60.         lock_map_acquire(&cwq->wq->lockdep_map);
  61.         lock_map_acquire(&lockdep_map);
  62.         f(work);
  63.         lock_map_release(&lockdep_map);
  64.         lock_map_release(&cwq->wq->lockdep_map);
  65.         
  66.         if (unlikely(in_atomic() || lockdep_depth(current) > 0))
  67.         {
  68.             printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
  69.                    "%s/0x%08x/%dn",
  70.                    current->comm, preempt_count(),
  71.                    task_pid_nr(current));
  72.             printk(KERN_ERR "    last function: ");
  73.             print_symbol("%sn", (unsigned long)f);
  74.             debug_show_held_locks(current);
  75.             dump_stack();
  76.         }
  77.         
  78.         spin_lock_irq(&cwq->lock);
  79.         cwq->current_work = NULL;
  80.     }
  81.     cwq->run_depth--;
  82.     spin_unlock_irq(&cwq->lock);
  83. }
复制代码

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部