some perfomance tweaks
This commit is contained in:
parent
2368f556ac
commit
8664dc6d5a
|
@ -1 +1 @@
|
|||
Thu Apr 26 10:23:33 CDT 2012
|
||||
Thu May 3 16:30:20 CDT 2012
|
||||
|
|
|
@ -4165,7 +4165,7 @@ nta_leg_t *nta_leg_tcreate(nta_agent_t *agent,
|
|||
if (i == NONE) /* Magic value, used for compatibility */
|
||||
no_dialog = 1;
|
||||
|
||||
if (!(leg = su_home_clone(agent->sa_home, sizeof(*leg))))
|
||||
if (!(leg = su_home_clone(NULL, sizeof(*leg))))
|
||||
return NULL;
|
||||
home = leg->leg_home;
|
||||
|
||||
|
@ -4394,7 +4394,8 @@ void nta_leg_destroy(nta_leg_t *leg)
|
|||
static
|
||||
void leg_free(nta_agent_t *sa, nta_leg_t *leg)
|
||||
{
|
||||
su_free(sa->sa_home, leg);
|
||||
//su_free(sa->sa_home, leg);
|
||||
su_home_unref((su_home_t *)leg);
|
||||
}
|
||||
|
||||
/** Return application context for the leg */
|
||||
|
@ -5327,7 +5328,7 @@ nta_incoming_t *incoming_create(nta_agent_t *agent,
|
|||
}
|
||||
irq->irq_branch = sip->sip_via->v_branch;
|
||||
irq->irq_reliable_tp = tport_is_reliable(tport);
|
||||
irq->irq_extra_100 = 1; /* Sending extra 100 trying true by default */
|
||||
irq->irq_extra_100 = 0; /* Sending extra 100 trying false by default */
|
||||
|
||||
if (sip->sip_timestamp)
|
||||
irq->irq_timestamp = sip_timestamp_copy(home, sip->sip_timestamp);
|
||||
|
@ -6068,7 +6069,7 @@ incoming_recv(nta_incoming_t *irq, msg_t *msg, sip_t *sip, tport_t *tport)
|
|||
if (irq->irq_status >= 100) {
|
||||
SU_DEBUG_5(("nta: re-received %s request, retransmitting %u reply\n",
|
||||
sip->sip_request->rq_method_name, irq->irq_status));
|
||||
incoming_retransmit_reply(irq, tport);
|
||||
incoming_retransmit_reply(irq, tport);
|
||||
}
|
||||
else if (irq->irq_agent->sa_extra_100 &&
|
||||
irq->irq_extra_100) {
|
||||
|
@ -6910,6 +6911,7 @@ _nta_incoming_timer(nta_agent_t *sa)
|
|||
incoming_reset_timer(irq);
|
||||
|
||||
if(irq->irq_extra_100) {
|
||||
printf("COCK FACE\n");
|
||||
SU_DEBUG_5(("nta: timer N1 fired, sending %u %s\n", SIP_100_TRYING));
|
||||
nta_incoming_treply(irq, SIP_100_TRYING, TAG_END());
|
||||
}
|
||||
|
|
|
@ -109,8 +109,8 @@ nua_handle_t *nh_create_handle(nua_t *nua,
|
|||
|
||||
assert(nua->nua_home);
|
||||
|
||||
if ((nh = su_home_clone(nua->nua_home, sizeof(*nh)))) {
|
||||
//if ((nh = su_home_new(sizeof(*nh)))) {
|
||||
//if ((nh = su_home_clone(nua->nua_home, sizeof(*nh)))) {
|
||||
if ((nh = su_home_new(sizeof(*nh)))) {
|
||||
nh->nh_valid = nua_valid_handle_cookie;
|
||||
nh->nh_nua = nua;
|
||||
nh->nh_magic = hmagic;
|
||||
|
|
|
@ -261,11 +261,12 @@ int nua_stack_process_request(nua_handle_t *nh,
|
|||
}
|
||||
|
||||
if (sr->sr_status <= 100) {
|
||||
SR_STATUS1(sr, SIP_100_TRYING);
|
||||
SR_STATUS1(sr, SIP_100_TRYING);
|
||||
if (method == sip_method_invite || sip->sip_timestamp) {
|
||||
nta_incoming_treply(irq, SIP_100_TRYING,
|
||||
SIPTAG_USER_AGENT_STR(user_agent),
|
||||
TAG_END());
|
||||
nta_incoming_treply(irq, SIP_100_TRYING,
|
||||
SIPTAG_USER_AGENT_STR(user_agent),
|
||||
TAG_END());
|
||||
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -91,6 +91,10 @@ static switch_status_t sofia_on_init(switch_core_session_t *session)
|
|||
sofia_set_flag(tech_pvt, TFLAG_RECOVERED);
|
||||
}
|
||||
|
||||
if (switch_channel_direction(tech_pvt->channel) == SWITCH_CALL_DIRECTION_INBOUND) {
|
||||
nua_respond(tech_pvt->nh, 101, "Dialing", TAG_END());
|
||||
}
|
||||
|
||||
if (sofia_test_flag(tech_pvt, TFLAG_OUTBOUND) || sofia_test_flag(tech_pvt, TFLAG_RECOVERING)) {
|
||||
const char *var;
|
||||
|
||||
|
@ -5343,7 +5347,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sofia_load)
|
|||
switch_chat_interface_t *chat_interface;
|
||||
switch_api_interface_t *api_interface;
|
||||
switch_management_interface_t *management_interface;
|
||||
uint32_t cpus = switch_core_cpu_count();
|
||||
struct in_addr in;
|
||||
|
||||
memset(&mod_sofia_globals, 0, sizeof(mod_sofia_globals));
|
||||
|
@ -5381,9 +5384,16 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sofia_load)
|
|||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for profiles to start\n");
|
||||
switch_yield(1500000);
|
||||
|
||||
/* start one message thread per cpu */
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Starting %u message threads.\n", cpus);
|
||||
sofia_msg_thread_start(cpus);
|
||||
mod_sofia_globals.cpu_count = switch_core_cpu_count();
|
||||
mod_sofia_globals.max_msg_queues = mod_sofia_globals.cpu_count + 1;
|
||||
|
||||
if (mod_sofia_globals.max_msg_queues > SOFIA_MAX_MSG_QUEUE) {
|
||||
mod_sofia_globals.max_msg_queues = SOFIA_MAX_MSG_QUEUE;
|
||||
}
|
||||
|
||||
/* start one message thread */
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Starting initial message thread.\n");
|
||||
sofia_msg_thread_start(0);
|
||||
|
||||
if (switch_event_bind_removable(modname, SWITCH_EVENT_CUSTOM, MULTICAST_EVENT, event_handler, NULL,
|
||||
&mod_sofia_globals.custom_node) != SWITCH_STATUS_SUCCESS) {
|
||||
|
|
|
@ -336,8 +336,8 @@ typedef enum {
|
|||
TFLAG_MAX
|
||||
} TFLAGS;
|
||||
|
||||
#define SOFIA_MAX_MSG_QUEUE 101
|
||||
#define SOFIA_MSG_QUEUE_SIZE 5000
|
||||
#define SOFIA_MAX_MSG_QUEUE 64
|
||||
#define SOFIA_MSG_QUEUE_SIZE 100
|
||||
|
||||
struct mod_sofia_globals {
|
||||
switch_memory_pool_t *pool;
|
||||
|
@ -347,6 +347,8 @@ struct mod_sofia_globals {
|
|||
uint32_t callid;
|
||||
int32_t running;
|
||||
int32_t threads;
|
||||
int cpu_count;
|
||||
int max_msg_queues;
|
||||
switch_mutex_t *mutex;
|
||||
char guess_ip[80];
|
||||
char hostname[512];
|
||||
|
|
|
@ -1236,8 +1236,25 @@ void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj
|
|||
{
|
||||
void *pop;
|
||||
switch_queue_t *q = (switch_queue_t *) obj;
|
||||
int my_id;
|
||||
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread Started\n");
|
||||
for (my_id = 0; my_id < mod_sofia_globals.msg_queue_len; my_id++) {
|
||||
if (mod_sofia_globals.msg_queue[my_id] == q) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread %d Started\n", my_id);
|
||||
|
||||
#ifdef HAVE_CPU_SET_MACROS
|
||||
{
|
||||
cpu_set_t set;
|
||||
CPU_ZERO(&set);
|
||||
CPU_SET(my_id, &set);
|
||||
sched_setaffinity(0, sizeof(set), &set);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
while(switch_queue_pop(q, &pop) == SWITCH_STATUS_SUCCESS && pop) {
|
||||
|
@ -1251,12 +1268,11 @@ void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int IDX = 0;
|
||||
|
||||
void sofia_msg_thread_start(int idx)
|
||||
{
|
||||
|
||||
if (idx >= SOFIA_MAX_MSG_QUEUE || (idx < mod_sofia_globals.msg_queue_len && mod_sofia_globals.msg_queue_thread[idx])) {
|
||||
if (idx >= mod_sofia_globals.max_msg_queues ||
|
||||
idx >= SOFIA_MAX_MSG_QUEUE || (idx < mod_sofia_globals.msg_queue_len && mod_sofia_globals.msg_queue_thread[idx])) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1274,7 +1290,7 @@ void sofia_msg_thread_start(int idx)
|
|||
|
||||
switch_threadattr_create(&thd_attr, mod_sofia_globals.pool);
|
||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||
//switch_threadattr_priority_increase(thd_attr);
|
||||
switch_threadattr_priority_increase(thd_attr);
|
||||
switch_thread_create(&mod_sofia_globals.msg_queue_thread[i],
|
||||
thd_attr,
|
||||
sofia_msg_thread_run,
|
||||
|
@ -1290,31 +1306,32 @@ void sofia_msg_thread_start(int idx)
|
|||
|
||||
static void sofia_queue_message(sofia_dispatch_event_t *de)
|
||||
{
|
||||
int idx = 0;
|
||||
int idx = 0, queued = 0;
|
||||
|
||||
if (mod_sofia_globals.running == 0) {
|
||||
if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue[0]) {
|
||||
sofia_process_dispatch_event(&de);
|
||||
return;
|
||||
}
|
||||
|
||||
again:
|
||||
|
||||
switch_mutex_lock(mod_sofia_globals.mutex);
|
||||
idx = IDX;
|
||||
IDX++;
|
||||
if (IDX >= mod_sofia_globals.msg_queue_len) IDX = 0;
|
||||
switch_mutex_unlock(mod_sofia_globals.mutex);
|
||||
|
||||
sofia_msg_thread_start(idx);
|
||||
|
||||
if (switch_queue_trypush(mod_sofia_globals.msg_queue[idx], de) != SWITCH_STATUS_SUCCESS) {
|
||||
if (mod_sofia_globals.msg_queue_len < SOFIA_MAX_MSG_QUEUE) {
|
||||
sofia_msg_thread_start(idx + 1);
|
||||
goto again;
|
||||
} else {
|
||||
switch_queue_push(mod_sofia_globals.msg_queue[idx], de);
|
||||
for (idx = 0; idx < mod_sofia_globals.msg_queue_len; idx++) {
|
||||
if (switch_queue_trypush(mod_sofia_globals.msg_queue[idx], de) == SWITCH_STATUS_SUCCESS) {
|
||||
queued++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!queued) {
|
||||
|
||||
if (mod_sofia_globals.msg_queue_len < mod_sofia_globals.max_msg_queues) {
|
||||
sofia_msg_thread_start(mod_sofia_globals.msg_queue_len + 1);
|
||||
goto again;
|
||||
}
|
||||
|
||||
switch_queue_push(mod_sofia_globals.msg_queue[0], de);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -1959,6 +1976,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
|
|||
NUTAG_AUTOACK(0),
|
||||
NUTAG_AUTOALERT(0),
|
||||
NUTAG_ENABLEMESSENGER(1),
|
||||
NTATAG_EXTRA_100(0),
|
||||
TAG_IF((profile->mflags & MFLAG_REGISTER), NUTAG_ALLOW("REGISTER")),
|
||||
TAG_IF((profile->mflags & MFLAG_REFER), NUTAG_ALLOW("REFER")),
|
||||
TAG_IF(!sofia_test_pflag(profile, PFLAG_DISABLE_100REL), NUTAG_ALLOW("PRACK")),
|
||||
|
@ -3652,18 +3670,6 @@ switch_status_t config_sofia(int reload, char *profile_name)
|
|||
mod_sofia_globals.debug_sla = atoi(val);
|
||||
} else if (!strcasecmp(var, "auto-restart")) {
|
||||
mod_sofia_globals.auto_restart = switch_true(val);
|
||||
} else if (!strcasecmp(var, "message-threads")) {
|
||||
int num = atoi(val);
|
||||
|
||||
if (num < 1 || num > SOFIA_MAX_MSG_QUEUE - 1) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "message-threads must be between 1 and %d", SOFIA_MAX_MSG_QUEUE -1);
|
||||
}
|
||||
|
||||
if (num < 1) num = 1;
|
||||
if (num > SOFIA_MAX_MSG_QUEUE - 1) num = SOFIA_MAX_MSG_QUEUE -1;
|
||||
|
||||
sofia_msg_thread_start(num);
|
||||
|
||||
} else if (!strcasecmp(var, "reg-deny-binding-fetch-and-no-lookup")) { /* backwards compatibility */
|
||||
mod_sofia_globals.reg_deny_binding_fetch_and_no_lookup = switch_true(val); /* remove when noone complains about the extra lookup */
|
||||
if (switch_true(val)) {
|
||||
|
@ -7521,7 +7527,7 @@ void sofia_handle_sip_i_invite(nua_t *nua, sofia_profile_t *profile, nua_handle_
|
|||
nua_respond(nh, 400, "Missing Contact Header", TAG_END());
|
||||
goto fail;
|
||||
}
|
||||
|
||||
|
||||
sofia_glue_get_addr(de->data->e_msg, network_ip, sizeof(network_ip), &network_port);
|
||||
|
||||
if (sofia_test_pflag(profile, PFLAG_AGGRESSIVE_NAT_DETECTION)) {
|
||||
|
|
|
@ -1684,7 +1684,7 @@ static void switch_load_core_config(const char *file)
|
|||
} else if (end_of(val) == 'm') {
|
||||
tmp *= (1024 * 1024);
|
||||
}
|
||||
|
||||
|
||||
if (tmp >= 32000 && tmp < 10500000) {
|
||||
runtime.sql_buffer_len = tmp;
|
||||
} else {
|
||||
|
|
|
@ -63,7 +63,7 @@ struct switch_event_subclass {
|
|||
int bind;
|
||||
};
|
||||
|
||||
#define MAX_DISPATCH_VAL 20
|
||||
#define MAX_DISPATCH_VAL 64
|
||||
static unsigned int MAX_DISPATCH = MAX_DISPATCH_VAL;
|
||||
static unsigned int SOFT_MAX_DISPATCH = 0;
|
||||
static char guess_ip_v4[80] = "";
|
||||
|
@ -254,6 +254,16 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
|
|||
EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
|
||||
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
|
||||
|
||||
#ifdef HAVE_CPU_SET_MACROS
|
||||
{
|
||||
cpu_set_t set;
|
||||
CPU_ZERO(&set);
|
||||
CPU_SET(my_id, &set);
|
||||
sched_setaffinity(0, sizeof(set), &set);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
for (;;) {
|
||||
void *pop = NULL;
|
||||
switch_event_t *event = NULL;
|
||||
|
@ -291,7 +301,6 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
|
|||
switch_queue_t *queue = (switch_queue_t *) obj;
|
||||
uint32_t index = 0;
|
||||
int my_id = 0;
|
||||
int auto_pause = 0;
|
||||
|
||||
switch_mutex_lock(EVENT_QUEUE_MUTEX);
|
||||
THREAD_COUNT++;
|
||||
|
@ -306,15 +315,6 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
|
|||
for (;;) {
|
||||
void *pop = NULL;
|
||||
switch_event_t *event = NULL;
|
||||
int loops = 0;
|
||||
|
||||
if (auto_pause) {
|
||||
if (!--auto_pause) {
|
||||
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
|
||||
} else {
|
||||
switch_cond_next();
|
||||
}
|
||||
}
|
||||
|
||||
if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
|
||||
break;
|
||||
|
@ -332,19 +332,6 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
|
|||
|
||||
while (event) {
|
||||
|
||||
|
||||
if (++loops > 2) {
|
||||
if (auto_pause) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event system *still* overloading.\n");
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
|
||||
"Event system overloading. Taking a 10 second break\n");
|
||||
auto_pause = 10;
|
||||
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
|
||||
}
|
||||
switch_yield(1000000);
|
||||
}
|
||||
|
||||
for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
|
||||
if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) {
|
||||
event = NULL;
|
||||
|
@ -358,8 +345,8 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
|
|||
launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
|
||||
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Out of event dispatch threads! Slowing things down.\n");
|
||||
switch_yield(1000000);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Out of event dispatch threads! Resorting to a blocking push.... Look for laggy event consumers or event_socket connections!\n");
|
||||
switch_queue_push(EVENT_DISPATCH_QUEUE[0], event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -484,6 +471,15 @@ SWITCH_DECLARE(switch_status_t) switch_event_reserve_subclass_detailed(const cha
|
|||
|
||||
SWITCH_DECLARE(void) switch_core_memory_reclaim_events(void)
|
||||
{
|
||||
int index;
|
||||
|
||||
for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
|
||||
if (EVENT_DISPATCH_QUEUE[index]) {
|
||||
printf("%d size: %u\n", index, switch_queue_size(EVENT_DISPATCH_QUEUE[index]));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#ifdef SWITCH_EVENT_RECYCLE
|
||||
|
||||
void *pop;
|
||||
|
@ -520,9 +516,11 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
|
|||
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
|
||||
|
||||
for (x = 0; x < 3; x++) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x);
|
||||
switch_queue_trypush(EVENT_QUEUE[x], NULL);
|
||||
switch_queue_interrupt_all(EVENT_QUEUE[x]);
|
||||
if (EVENT_QUEUE[x]) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x);
|
||||
switch_queue_trypush(EVENT_QUEUE[x], NULL);
|
||||
switch_queue_interrupt_all(EVENT_QUEUE[x]);
|
||||
}
|
||||
}
|
||||
|
||||
for (x = 0; x < SOFT_MAX_DISPATCH; x++) {
|
||||
|
@ -558,12 +556,15 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
|
|||
switch_event_t *event = NULL;
|
||||
switch_status_t st;
|
||||
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping queue thread %d\n", x);
|
||||
switch_thread_join(&st, EVENT_QUEUE_THREADS[x]);
|
||||
if (EVENT_QUEUE_THREADS[x]) {
|
||||
|
||||
while (switch_queue_trypop(EVENT_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
|
||||
event = (switch_event_t *) pop;
|
||||
switch_event_destroy(&event);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping queue thread %d\n", x);
|
||||
switch_thread_join(&st, EVENT_QUEUE_THREADS[x]);
|
||||
|
||||
while (switch_queue_trypop(EVENT_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
|
||||
event = (switch_event_t *) pop;
|
||||
switch_event_destroy(&event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -608,7 +609,11 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t
|
|||
switch_threadattr_priority_increase(thd_attr);
|
||||
switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool);
|
||||
while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index);
|
||||
if (index == 1) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index);
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create additional event dispatch thread %d\n", index);
|
||||
}
|
||||
launched++;
|
||||
break;
|
||||
}
|
||||
|
@ -618,7 +623,7 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t
|
|||
|
||||
SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
|
||||
{
|
||||
switch_threadattr_t *thd_attr;;
|
||||
//switch_threadattr_t *thd_attr;
|
||||
|
||||
/*
|
||||
This statement doesn't do anything commenting it out for now.
|
||||
|
@ -626,6 +631,9 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
|
|||
switch_assert(switch_arraylen(EVENT_NAMES) == SWITCH_EVENT_ALL + 1);
|
||||
*/
|
||||
|
||||
/* don't need any more dispatch threads than we have CPU's*/
|
||||
MAX_DISPATCH = switch_core_cpu_count();
|
||||
|
||||
|
||||
switch_assert(pool != NULL);
|
||||
THRUNTIME_POOL = RUNTIME_POOL = pool;
|
||||
|
@ -640,26 +648,26 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
|
|||
SYSTEM_RUNNING = -1;
|
||||
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
|
||||
|
||||
switch_threadattr_create(&thd_attr, pool);
|
||||
//switch_threadattr_create(&thd_attr, pool);
|
||||
switch_find_local_ip(guess_ip_v4, sizeof(guess_ip_v4), NULL, AF_INET);
|
||||
switch_find_local_ip(guess_ip_v6, sizeof(guess_ip_v6), NULL, AF_INET6);
|
||||
|
||||
|
||||
switch_queue_create(&EVENT_QUEUE[0], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
|
||||
switch_queue_create(&EVENT_QUEUE[1], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
|
||||
switch_queue_create(&EVENT_QUEUE[2], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
|
||||
//switch_queue_create(&EVENT_QUEUE[0], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
|
||||
//switch_queue_create(&EVENT_QUEUE[1], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
|
||||
//switch_queue_create(&EVENT_QUEUE[2], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
|
||||
#ifdef SWITCH_EVENT_RECYCLE
|
||||
switch_queue_create(&EVENT_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
|
||||
switch_queue_create(&EVENT_HEADER_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
|
||||
#endif
|
||||
|
||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||
switch_threadattr_priority_increase(thd_attr);
|
||||
//switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||
//switch_threadattr_priority_increase(thd_attr);
|
||||
|
||||
launch_dispatch_threads(1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
|
||||
switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL);
|
||||
switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL);
|
||||
switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL);
|
||||
//switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL);
|
||||
//switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL);
|
||||
//switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL);
|
||||
|
||||
while (!THREAD_COUNT) {
|
||||
switch_cond_next();
|
||||
|
@ -1775,8 +1783,6 @@ SWITCH_DECLARE(void) switch_event_prep_for_delivery_detailed(const char *file, c
|
|||
SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, const char *func, int line, switch_event_t **event, void *user_data)
|
||||
{
|
||||
|
||||
int index;
|
||||
|
||||
switch_assert(BLOCK != NULL);
|
||||
switch_assert(RUNTIME_POOL != NULL);
|
||||
switch_assert(EVENT_QUEUE_MUTEX != NULL);
|
||||
|
@ -1792,13 +1798,22 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con
|
|||
(*event)->event_user_data = user_data;
|
||||
}
|
||||
|
||||
if (!EVENT_QUEUE_THREADS[(*event)->priority] && (*event)->priority < 3) {
|
||||
switch_threadattr_t *thd_attr;
|
||||
|
||||
switch_queue_create(&EVENT_QUEUE[(*event)->priority], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
|
||||
switch_threadattr_create(&thd_attr, THRUNTIME_POOL);
|
||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||
switch_threadattr_priority_increase(thd_attr);
|
||||
switch_thread_create(&EVENT_QUEUE_THREADS[(*event)->priority], thd_attr, switch_event_thread, EVENT_QUEUE[(*event)->priority], RUNTIME_POOL);
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
for (index = (*event)->priority; index < 3; index++) {
|
||||
if (switch_queue_trypush(EVENT_QUEUE[index], *event) == SWITCH_STATUS_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
if (switch_queue_trypush(EVENT_QUEUE[(*event)->priority], *event) == SWITCH_STATUS_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event queue is full!\n");
|
||||
switch_yield(100000);
|
||||
}
|
||||
|
|
|
@ -799,8 +799,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(softtimer_runtime)
|
|||
if (runtime.timer_affinity > -1) {
|
||||
cpu_set_t set;
|
||||
CPU_ZERO(&set);
|
||||
CPU_SET(0, &set);
|
||||
sched_setaffinity(runtime.timer_affinity, sizeof(set), &set);
|
||||
CPU_SET(runtime.timer_affinity, &set);
|
||||
sched_setaffinity(0, sizeof(set), &set);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
|
Loading…
Reference in New Issue