首页 > C/C++ > 用线程池实现多线程调用并使用回调函数实现函数调用
2016
08-25

用线程池实现多线程调用并使用回调函数实现函数调用

* 创建线程或者进程的开销是很大的
* 为了防止频繁的创建、销毁线程,提高程序的运行效率
* 往往会建立一个线程池用于多线程程序的调度
* 下面的程序就是完整的线程池实现
*
* 主要通过互斥量和条件变量实现同步

threadpool.h

#ifndef _THREADPOOL_H_  
#define _THREADPOOL_H_  
#include <stdio.h>  
#include <stdlib.h>  
#include <unistd.h>  
#include <pthread.h>  
  
/* 线程体数据结构*/  
typedef struct runner  
{  
    void (*callback)(void *arg);//回调函数指针  
    void *arg;                //回调函数参数  
    struct runner *next;  
}thread_runner;  
  
/* 线程池数据结构*/  
typedef struct pool  
{  
    pthread_mutex_t mutex;      //互斥量  
    pthread_cond_t cond;        //条件变量  
    thread_runner* runner_head; //线程池中所有等待任务的头指针  
    thread_runner* runner_tail; //线程池中所有等待任务的尾指针  
    int shutdown;              //线程池是否销毁,0没有注销,1注销  
    pthread_t* threads;        //所有线程  
    int max_thread_size;       //线程池中允许的活动线程数目  
}thread_pool;  
  
void run(void *arg);  
void threadpool_init(thread_pool *pool, int max_thread_size);  
void threadpool_add_runner(thread_pool *pool, void (*callback)(void *arg), void *arg);  
void threadpool_destroy(thread_pool **ppool);  
  
#endif  //_THREADPOOL_H_  

 

threadpool.c

//======================= threadpool.c ===========================  
#include "threadpool.h"  
  
/**********************************************************  
* 初始化线程  
* 参数:  
* pool:指向线程池结构有效地址的动态指针  
* max_thread_size:最大的线程数  
**********************************************************/  
void threadpool_init(thread_pool *pool, int max_thread_size)  
{  
    int iLoop = 0;  

    /*线程池初始化操作*/  
    pthread_mutex_init(&(pool->mutex), NULL);                              //初始化互斥量  
    pthread_cond_init(&(pool->cond), NULL);                                //初始化条件变量  
    pool->shutdown = 0;                                                  //线程池默认没有注销  
    pool->threads = (pthread_t *)malloc(max_thread_size * sizeof(pthread_t)); //创建所有分离线程  
    pool->runner_head = NULL;  
    pool->runner_tail = NULL;  
    pool->max_thread_size = max_thread_size;  
  
    /*创建线程操作*/  
    for(iLoop; iLoop < max_thread_size; iLoop++)  
    {  
        pthread_attr_t attr;                //定义线程对象  
        pthread_attr_init(&attr);             //初始化线程的属性  
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);    //设置脱离状态的属性(决定这个线程在终止时是否可以被结合)  
        pthread_create(&(pool->threads[iLoop]), &attr, (void *)run, (void *)pool); /*threads[i] 动态创建线程;  
                                                                                   *第一个参数为指向线程标识符的指针。  
                                                                                   *第二个参数用来设置线程属性。  
                                                                                   *第三个参数是线程运行函数的起始地址。  
                                                                                   *最后一个参数是运行函数的参数。*/  
    }   
    printf("threadpool_init-> create %d detached thread\n");  
}  
  
/**********************************************************  
* 线程体,创建线程后调用的函数  
* 参数:  
* arg:接收创建线程后传递的参数  
**********************************************************/  
void run(void *arg)  
{  
    thread_pool *pool = (thread_pool *)arg;  
  
    while(1)  
    {  
        pthread_mutex_lock(&(pool->mutex));               //加锁  
        printf("run-> locked!\n");  
  
        /*如果等待队列为0并且线程池未销毁,则处于阻塞状态 */  
        while(pool->runner_head == NULL && !pool->shutdown)  
        {  
            pthread_cond_wait(&(pool->cond), &(pool->mutex));  
        }  
  
        /*如果线程已经销毁*/  
        if(pool->shutdown)  
        {  
            pthread_mutex_unlock(&(pool->mutex));          //解锁  
            printf("run-> unlock and thread exit!\n");  
            pthread_exit(NULL);  
        }  
        thread_runner *runner = pool->runner_head;         //取链表的头元素  
        pool->runner_head = runner->next;  
        pthread_mutex_unlock(&(pool->mutex));              //解锁  
        printf("run-> unlocked!\n");  
        (runner->callback)(runner->arg);                  //调用回调函数,执行任务  
        free(runner);                                   //释放线程操作  
        runner = NULL;  
        printf("run-> runned and free runner!\n");  
    }  
    pthread_exit(NULL);  
}  
  
/**********************************************************  
* 向线程池加入任务  
* 参数:  
* pool:指向线程池结构有效地址的动态指针  
* callback:线程回调函数  
* arg:回调函数参数  
**********************************************************/  
void threadpool_add_runner(thread_pool *pool, void(*callback)(void *arg), void *arg)  
{    
    thread_runner *newrunner = (thread_runner *)malloc(sizeof(thread_runner));//构建一个新任务  
    newrunner->callback = callback;  
    newrunner->arg = arg;  
    newrunner->next = NULL;  
  
    pthread_mutex_lock(&(pool->mutex));                    //加锁  
    printf("threadpool_add_runner-> locked\n");  
  
    /*将新任务加入到等待队列中,如果等待队列为空,直接运行当前的线程 */  
    if(pool->runner_head != NULL)  
    {  
        pool->runner_tail->next = newrunner;  
        pool->runner_tail = newrunner;  
    }  
    else  
    {  
        pool->runner_head = newrunner;   
        pool->runner_tail = newrunner;  
    }  
    pthread_mutex_unlock(&(pool->mutex));                  //解锁  
    printf("threadpool_add_runner-> unlocked\n");  
    pthread_cond_signal(&(pool->cond));                    //唤醒一个等待线程  
    printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n");  
}  
  
/**********************************************************  
* 销毁线程池  
* 参数:  
* ppool:指向线程池结构有效地址的动态指针地址(二级指针)  
**********************************************************/  
void threadpool_destroy(thread_pool **ppool)  
{  
    thread_pool *pool = *ppool;  
  
    /*判断线程池是否注销,防止二次销毁*/  
    if(!pool->shutdown)  
    {  
        pool->shutdown = 1;   
        pthread_cond_broadcast(&(pool->cond));    //唤醒所有的等待线程,线程池要销毁了  
        sleep(1);                              //等待所有的线程终止  
        printf("threadpool_destroy-> wakeup all waitting threads\n");    
        free(pool->threads);                    //回收空间  
  
        /*销毁等待队列*/  
        thread_runner *head = NULL;  
        while(pool->runner_head != NULL)  
        {  
            head = pool->runner_head;  
            pool->runner_head = pool->runner_head->next;  
            free(head);  
        }  
        printf("thread_destroy-> all runners freed\n");  
  
        pthread_mutex_destroy(&(pool->mutex));    //销毁条件变量  
        pthread_cond_destroy(&(pool->cond));      //销毁互斥量  
        printf("thread_destroy-> mutex and cond destroyed\n");  
        free(pool);  
        pool = NULL;  
        (*ppool) = NULL;  
        printf("threadpool_destroy-> pool freed\n");  
    }  
}  
//======================= end threadpool.c ===========================  

 

main.c

#include "threadpool.h"  
  
  
void threadrun(void *arg)  
{  
    int i = *(int *)arg;  
    printf("threadrun result == %d\n", i);  
}  
  
int main(int argc, char *argv[])  
{  
    thread_pool *pool = (thread_pool *)malloc(sizeof(thread_pool));  
    threadpool_init(pool, 9);  
    int i;  
    int tmp[10];  
    for(i = 0; i < 10; i++)  
    {  
        tmp[i] = i;  
        threadpool_add_runner(pool, threadrun, &tmp[i]);  
    }  
    sleep(1);  
    threadpool_destroy(&pool);  
    printf("main-> %p\n", pool);  
    printf("main-> test over\n");  
    return 0;  
}  

 

Makefile

#Makefile  
main: mian.o threadpool.o  
    gcc -o main test.o threadpool.o -lpthread  
mian.o: threadpool.h  
    gcc -c main.c -lpthread  
hreadpool.o: threadpool.h  
    gcc -c threadpool.c -lpthread  
  
.PHONY:clean  
clean:  
    rm -f *.o main  

 

 转自:http://blog.csdn.net/ouchengguo/article/details/25113661

最后编辑:
作者:liujg
真实-不弄虚,不做假,做自己,不违心; 踏实-不浮躁,不盲从,不急功,不近利; 实学-不投机,不取巧,勤于学,精于业。