Taskprocessor optimization; switch Stasis to use taskprocessors

This patch optimizes taskprocessor to use a semaphore for signaling,
which the OS can do a better job at managing contention and waiting
that we can with a mutex and condition.

The taskprocessor execution was also slightly optimized to reduce the
number of locks taken.

The only observable difference in the taskprocessor implementation is
that when the final reference to the taskprocessor goes away, it will
execute all tasks to completion instead of discarding the unexecuted
tasks.

For systems where unnamed semaphores are not supported, a really
simple semaphore implementation is provided. (Which gives identical
performance as the original taskprocessor implementation).

The way we ended up implementing Stasis caused the threadpool to be a
burden instead of a boost to performance. This was switched to just
use taskprocessors directly for subscriptions.

Review: https://reviewboard.asterisk.org/r/2881/


git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@400178 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
David M. Lee
2013-09-30 18:26:27 +00:00
parent 52d73f05f4
commit e88afe2022
11 changed files with 703 additions and 563 deletions

View File

@@ -34,7 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
@@ -134,9 +133,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
/*! Threadpool for dispatching notifications to subscribers */
static struct ast_threadpool *pool;
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
@@ -286,7 +282,15 @@ struct stasis_subscription *internal_stasis_subscribe(
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
if (needs_mailbox) {
sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
/* With a small number of subscribers, a thread-per-sub is
* acceptable. If our usage changes so that we have larger
* numbers of subscribers, we'll probably want to consider
* a threadpool. We had that originally, but with so few
* subscribers it was actually a performance loss instead of
* a gain.
*/
sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
TPS_REF_DEFAULT);
if (!sub->mailbox) {
return NULL;
}
@@ -731,13 +735,6 @@ void stasis_log_bad_type_access(const char *name)
ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
}
/*! \brief Shutdown function */
static void stasis_exit(void)
{
ast_threadpool_shutdown(pool);
pool = NULL;
}
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
@@ -748,36 +745,14 @@ int stasis_init(void)
{
int cache_init;
struct ast_threadpool_options opts;
/* Be sure the types are cleaned up after the message bus */
ast_register_cleanup(stasis_cleanup);
ast_register_atexit(stasis_exit);
if (stasis_config_init() != 0) {
ast_log(LOG_ERROR, "Stasis configuration failed\n");
return -1;
}
if (stasis_wait_init() != 0) {
ast_log(LOG_ERROR, "Stasis initialization failed\n");
return -1;
}
if (pool) {
ast_log(LOG_ERROR, "Stasis double-initialized\n");
return -1;
}
stasis_config_get_threadpool_options(&opts);
ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
opts.initial_size, opts.max_size, opts.idle_timeout);
pool = ast_threadpool_create("stasis-core", NULL, &opts);
if (!pool) {
ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
return -1;
}
cache_init = stasis_cache_init();
if (cache_init != 0) {
return -1;