diff --git a/libs/libks/Makefile.am b/libs/libks/Makefile.am index b04069b419..897236bd1c 100644 --- a/libs/libks/Makefile.am +++ b/libs/libks/Makefile.am @@ -7,7 +7,7 @@ AM_CFLAGS += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/ AM_CPPFLAGS = $(AM_CFLAGS) lib_LTLIBRARIES = libks.la -libks_la_SOURCES = src/ks.c src/ks_string.c src/ks_json.c src/ks_thread.c src/ks_mutex.c src/ks_config.c +libks_la_SOURCES = src/ks.c src/ks_string.c src/ks_json.c src/ks_thread.c src/ks_thread_pool.c src/ks_mutex.c src/ks_config.c libks_la_SOURCES += src/ks_log.c src/ks_socket.c src/ks_buffer.c src/ks_pool.c src/simclist.c libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/ks_dso.c src/ks_dht.c libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c @@ -22,6 +22,7 @@ libks_la_LDFLAGS = $(AM_LDFLAGS) -version-info 0:1:0 -lncurses -lpthread -lm library_includedir = $(prefix)/include library_include_HEADERS = src/include/ks_config.h src/include/ks.h src/include/ks_threadmutex.h src/include/ks_json.h src/include/ks_buffer.h +library_include_HEADERS += src/include/ks_thread_pool.h library_include_HEADERS += src/include/ks_pool.h src/include/simclist.h src/include/ks_time.h src/include/ks_q.h src/include/ks_socket.h library_include_HEADERS += src/include/ks_dso.h src/include/ks_dht.h src/include/ks_platform.h src/include/ks_types.h # src/include/ks_rng.h library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h diff --git a/libs/libks/src/include/ks.h b/libs/libks/src/include/ks.h index 90c06879eb..7839e3f61f 100644 --- a/libs/libks/src/include/ks.h +++ b/libs/libks/src/include/ks.h @@ -115,6 +115,7 @@ KS_DECLARE(void) ks_random_string(char *buf, uint16_t len, char *set); #include "ks_printf.h" #include "ks_json.h" #include "ks_threadmutex.h" +#include "ks_thread_pool.h" #include "ks_hash.h" #include "ks_config.h" #include "ks_q.h" diff --git a/libs/libks/src/include/ks_thread_pool.h b/libs/libks/src/include/ks_thread_pool.h new file mode 100644 index 0000000000..35ea8496cf --- /dev/null +++ b/libs/libks/src/include/ks_thread_pool.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2007-2014, Anthony Minessale II + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#ifndef _KS_THREAD_POOL_H_ +#define _KS_THREAD_POOL_H_ + +KS_BEGIN_EXTERN_C +KS_DECLARE(ks_status_t) ks_thread_pool_create(ks_thread_pool_t **tp, uint32_t min, uint32_t max, size_t stack_size, + ks_thread_priority_t priority, uint32_t idle_sec); +KS_DECLARE(ks_status_t) ks_thread_pool_destroy(ks_thread_pool_t **tp); +KS_DECLARE(ks_status_t) ks_thread_pool_add_job(ks_thread_pool_t *tp, ks_thread_function_t func, void *data); +KS_DECLARE(ks_size_t) ks_thread_pool_backlog(ks_thread_pool_t *tp); + +KS_END_EXTERN_C + +#endif /* defined(_KS_THREAD_POOL_H_) */ + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libks/src/include/ks_types.h b/libs/libks/src/include/ks_types.h index f10a0229a1..8c088bdd40 100644 --- a/libs/libks/src/include/ks_types.h +++ b/libs/libks/src/include/ks_types.h @@ -210,6 +210,8 @@ struct ks_q_s; typedef struct ks_q_s ks_q_t; typedef void (*ks_flush_fn_t)(ks_q_t *q, void *ptr, void *flush_data); +typedef struct ks_thread_pool_s ks_thread_pool_t; + KS_END_EXTERN_C #endif /* defined(_KS_TYPES_H_) */ diff --git a/libs/libks/src/ks_mutex.c b/libs/libks/src/ks_mutex.c index 6c14f24958..aab2db845c 100644 --- a/libs/libks/src/ks_mutex.c +++ b/libs/libks/src/ks_mutex.c @@ -393,10 +393,14 @@ KS_DECLARE(ks_status_t) ks_cond_timedwait(ks_cond_t *cond, ks_time_t ms) #else struct timespec ts; ks_time_t n = ks_time_now() + (ms * 1000); + int r = 0; + ts.tv_sec = ks_time_sec(n); ts.tv_nsec = ks_time_nsec(n); - if (pthread_cond_timedwait(&cond->cond, &cond->mutex->mutex, &ts)) { - switch(errno) { + r = pthread_cond_timedwait(&cond->cond, &cond->mutex->mutex, &ts); + + if (r) { + switch(r) { case ETIMEDOUT: return KS_STATUS_TIMEOUT; default: diff --git a/libs/libks/src/ks_thread_pool.c b/libs/libks/src/ks_thread_pool.c new file mode 100644 index 0000000000..aa0ff29ef3 --- /dev/null +++ b/libs/libks/src/ks_thread_pool.c @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2007-2014, Anthony Minessale II + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#define TP_MAX_QLEN 1024 + +typedef enum { + TP_STATE_DOWN = 0, + TP_STATE_RUNNING = 1 +} ks_thread_pool_state_t; + +struct ks_thread_pool_s { + uint32_t min; + uint32_t max; + uint32_t idle_sec; + size_t stack_size; + ks_thread_priority_t priority; + ks_q_t *q; + uint32_t thread_count; + uint32_t busy_thread_count; + uint32_t running_thread_count; + uint32_t dying_thread_count; + ks_thread_pool_state_t state; + ks_mutex_t *mutex; + ks_pool_t *pool; +}; + +typedef struct ks_thread_job_s { + ks_thread_function_t func; + void *data; +} ks_thread_job_t; + + +static void *worker_thread(ks_thread_t *thread, void *data); + +static int check_queue(ks_thread_pool_t *tp, ks_bool_t adding) +{ + ks_thread_t *thread; + int need = 0; + + ks_mutex_lock(tp->mutex); + + if (tp->state != TP_STATE_RUNNING) { + ks_mutex_unlock(tp->mutex); + return 1; + } + + + if (tp->thread_count < tp->min) { + need = tp->min - tp->thread_count; + } + + + if (adding) { + if (!need && tp->busy_thread_count >= tp->running_thread_count - tp->dying_thread_count && + (tp->thread_count - tp->dying_thread_count + 1 <= tp->max)) { + need = 1; + } + } + + tp->thread_count += need; + + ks_mutex_unlock(tp->mutex); + + while(need > 0) { + if (ks_thread_create_ex(&thread, worker_thread, tp, KS_THREAD_FLAG_DETATCHED, tp->stack_size, tp->priority, tp->pool) != KS_STATUS_SUCCESS) { + ks_mutex_lock(tp->mutex); + tp->thread_count--; + ks_mutex_unlock(tp->mutex); + } + + need--; + } + + ks_log(KS_LOG_DEBUG, "WORKER check: adding %d need %d running %d dying %d total %d max %d\n", + adding, need, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max); + + return need; +} + +static uint32_t TID = 0; + +static void *worker_thread(ks_thread_t *thread, void *data) +{ + ks_thread_pool_t *tp = (ks_thread_pool_t *) data; + uint32_t idle_sec = 0; + uint32_t my_id = 0; + int die = 0; + + ks_mutex_lock(tp->mutex); + tp->running_thread_count++; + my_id = ++TID; + ks_mutex_unlock(tp->mutex); + + while(tp->state == TP_STATE_RUNNING) { + ks_thread_job_t *job; + void *pop = NULL; + ks_status_t status; + + status = ks_q_pop_timeout(tp->q, &pop, 1000); + + ks_log(KS_LOG_DEBUG, "WORKER %d idle_sec %d running %d dying %d total %d max %d\n", + my_id, idle_sec, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max); + + check_queue(tp, KS_FALSE); + + if (status == KS_STATUS_TIMEOUT) { + idle_sec++; + + if (idle_sec >= tp->idle_sec) { + + ks_mutex_lock(tp->mutex); + if (tp->running_thread_count - tp->dying_thread_count - tp->busy_thread_count > tp->min) { + tp->dying_thread_count++; + die = 1; + } + ks_mutex_unlock(tp->mutex); + + if (die) { + ks_log(KS_LOG_DEBUG, "WORKER %d IDLE TIMEOUT\n", my_id); + break; + } + } + + continue; + } + + if ((status != KS_STATUS_SUCCESS && status != KS_STATUS_BREAK) || !pop) { + ks_log(KS_LOG_DEBUG, "WORKER %d POP FAIL %d %p\n", my_id, status, (void *)pop); + break; + } + + job = (ks_thread_job_t *) pop; + + ks_mutex_lock(tp->mutex); + tp->busy_thread_count++; + ks_mutex_unlock(tp->mutex); + + idle_sec = 0; + job->func(thread, job->data); + + ks_pool_free(tp->pool, job); + + ks_mutex_lock(tp->mutex); + tp->busy_thread_count--; + ks_mutex_unlock(tp->mutex); + } + + ks_mutex_lock(tp->mutex); + tp->running_thread_count--; + tp->thread_count--; + if (die) { + tp->dying_thread_count--; + } + ks_mutex_unlock(tp->mutex); + + return NULL; +} + +KS_DECLARE(ks_status_t) ks_thread_pool_create(ks_thread_pool_t **tp, uint32_t min, uint32_t max, size_t stack_size, + ks_thread_priority_t priority, uint32_t idle_sec) +{ + ks_pool_t *pool; + + ks_pool_open(&pool); + + *tp = (ks_thread_pool_t *) ks_pool_alloc(pool, sizeof(ks_thread_t)); + + (*tp)->min = min; + (*tp)->max = max; + (*tp)->pool = pool; + (*tp)->stack_size = stack_size; + (*tp)->priority = priority; + (*tp)->state = TP_STATE_RUNNING; + (*tp)->idle_sec = idle_sec; + + ks_mutex_create(&(*tp)->mutex, KS_MUTEX_FLAG_DEFAULT, (*tp)->pool); + ks_q_create(&(*tp)->q, (*tp)->pool, TP_MAX_QLEN); + + check_queue(*tp, KS_FALSE); + + return KS_STATUS_SUCCESS; + +} + + +KS_DECLARE(ks_status_t) ks_thread_pool_destroy(ks_thread_pool_t **tp) +{ + ks_pool_t *pool; + + ks_assert(tp); + + (*tp)->state = TP_STATE_DOWN; + + while((*tp)->thread_count) { + ks_sleep(100000); + } + + pool = (*tp)->pool; + ks_pool_close(&pool); + + return KS_STATUS_SUCCESS; +} + + +KS_DECLARE(ks_status_t) ks_thread_pool_add_job(ks_thread_pool_t *tp, ks_thread_function_t func, void *data) +{ + ks_thread_job_t *job = (ks_thread_job_t *) ks_pool_alloc(tp->pool, sizeof(*job)); + + job->func = func; + job->data = data; + ks_q_push(tp->q, job); + + check_queue(tp, KS_TRUE); + + return KS_STATUS_SUCCESS; +} + + + +KS_DECLARE(ks_size_t) ks_thread_pool_backlog(ks_thread_pool_t *tp) +{ + return ks_q_size(tp->q); +} diff --git a/libs/libks/test/Makefile.am b/libs/libks/test/Makefile.am index c7d3d8fd55..5c64a37fc6 100644 --- a/libs/libks/test/Makefile.am +++ b/libs/libks/test/Makefile.am @@ -9,6 +9,11 @@ testpools_SOURCES = testpools.c tap.c testpools_CFLAGS = $(AM_CFLAGS) testpools_LDADD = $(TEST_LDADD) +check_PROGRAMS += test_thread_pools +test_thread_pools_SOURCES = test_thread_pools.c tap.c +test_thread_pools_CFLAGS = $(AM_CFLAGS) +test_thread_pools_LDADD = $(TEST_LDADD) + check_PROGRAMS += testthreadmutex testthreadmutex_SOURCES = testthreadmutex.c tap.c testthreadmutex_CFLAGS = $(AM_CFLAGS) diff --git a/libs/libks/test/testq.c b/libs/libks/test/testq.c index 90f129755a..9f5936e027 100644 --- a/libs/libks/test/testq.c +++ b/libs/libks/test/testq.c @@ -29,12 +29,19 @@ int qtest1(int loops) ks_q_t *q; ks_pool_t *pool; int i; + int r = 1; + void *pop; ks_pool_open(&pool); ks_q_create(&q, pool, loops); ks_thread_create(&thread, test1_thread, q, pool); + if (ks_q_pop_timeout(q, &pop, 500) != KS_STATUS_TIMEOUT) { + r = 0; + goto end; + } + for (i = 0; i < 10000; i++) { int *val = (int *)ks_pool_alloc(pool, sizeof(int)); *val = i; @@ -57,11 +64,13 @@ int qtest1(int loops) ks_q_push(q, val); } + end: + ks_q_destroy(&q); ks_pool_close(&pool); - return 1; + return r; }