diff --git a/src/include/switch_apr.h b/src/include/switch_apr.h index 3775da76de..5a5cbd3fe8 100644 --- a/src/include/switch_apr.h +++ b/src/include/switch_apr.h @@ -854,6 +854,9 @@ SWITCH_DECLARE(const char *) switch_dir_next_file(switch_dir_t *thedir, char *bu //APR_DECLARE(apr_status_t) apr_threadattr_stacksize_set(apr_threadattr_t *attr, switch_size_t stacksize) SWITCH_DECLARE(switch_status_t) switch_threadattr_stacksize_set(switch_threadattr_t * attr, switch_size_t stacksize); +SWITCH_DECLARE(switch_status_t) switch_threadattr_priority_increase(switch_threadattr_t *attr); + + /** * Create and initialize a new threadattr variable * @param new_attr The newly created threadattr. diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.c b/src/mod/endpoints/mod_sofia/mod_sofia.c index a9a7860b23..9d2354806c 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.c +++ b/src/mod/endpoints/mod_sofia/mod_sofia.c @@ -262,7 +262,7 @@ switch_status_t sofia_on_hangup(switch_core_session_t *session) switch_core_hash_delete(tech_pvt->profile->chat_hash, tech_pvt->hash_key); } - if (session) { + if (session && (tech_pvt->profile->pflags & PFLAG_PRESENCE)) { char *sql = switch_mprintf("delete from sip_dialogs where call_id='%q'", tech_pvt->call_id); switch_assert(sql); sofia_glue_execute_sql(tech_pvt->profile, SWITCH_FALSE, sql, tech_pvt->profile->ireg_mutex); @@ -712,6 +712,13 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi goto end; } + if (msg->message_id == SWITCH_MESSAGE_INDICATE_ANSWER || msg->message_id == SWITCH_MESSAGE_INDICATE_PROGRESS) { + const char *var; + if ((var = switch_channel_get_variable(channel, SOFIA_SECURE_MEDIA_VARIABLE)) && switch_true(var)) { + switch_set_flag_locked(tech_pvt, TFLAG_SECURE); + } + } + switch (msg->message_id) { case SWITCH_MESSAGE_INDICATE_VIDEO_REFRESH_REQ: { diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.h b/src/mod/endpoints/mod_sofia/mod_sofia.h index 72b1efe8e3..fd8ffa4da7 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.h +++ b/src/mod/endpoints/mod_sofia/mod_sofia.h @@ -88,6 +88,11 @@ typedef struct private_object private_object_t; #include #include +typedef struct { + switch_bool_t master; + char *sql; +} sofia_sql_job_t; + typedef enum { DTMF_2833, @@ -130,7 +135,8 @@ typedef enum { PFLAG_TLS = (1 << 13), PFLAG_CHECKUSER = (1 << 14), PFLAG_SECURE = (1 << 15), - PFLAG_BLIND_AUTH = (1 << 16) + PFLAG_BLIND_AUTH = (1 << 16), + PFLAG_WORKER_RUNNING = (1 << 17), } PFLAGS; typedef enum { @@ -298,6 +304,7 @@ struct sofia_profile { char *odbc_user; char *odbc_pass; switch_odbc_handle_t *master_odbc; + switch_queue_t *sql_queue; }; struct private_object { @@ -503,6 +510,7 @@ void sofia_presence_handle_sip_i_subscribe(int status, nua_t *nua, sofia_profile_t *profile, nua_handle_t *nh, sofia_private_t *sofia_private, sip_t const *sip, tagi_t tags[]); void sofia_glue_execute_sql(sofia_profile_t *profile, switch_bool_t master, char *sql, switch_mutex_t *mutex); +void sofia_glue_actually_execute_sql(sofia_profile_t *profile, switch_bool_t master, char *sql, switch_mutex_t *mutex); void sofia_reg_check_expire(sofia_profile_t *profile, time_t now); void sofia_reg_check_gateway(sofia_profile_t *profile, time_t now); void sofia_reg_unregister(sofia_profile_t *profile); diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 86e09db622..4f701cd959 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -419,13 +419,71 @@ void event_handler(switch_event_t *event) } } +void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj) +{ + sofia_profile_t *profile = (sofia_profile_t *) obj; + uint32_t ireg_loops = 0; + uint32_t gateway_loops = 0; + void *pop; + sofia_sql_job_t *job; + int loops = 0; + + ireg_loops = IREG_SECONDS; + gateway_loops = GATEWAY_SECONDS; + + sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING); + + switch_queue_create(&profile->sql_queue, 500000, profile->pool); + + while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || switch_queue_size(profile->sql_queue)) { + while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { + job = (sofia_sql_job_t *) pop; + sofia_glue_actually_execute_sql(profile, job->master, job->sql, NULL); + free(job->sql); + free(job); + job = NULL; + } + + if (++loops >= 100) { + if (++ireg_loops >= IREG_SECONDS) { + sofia_reg_check_expire(profile, switch_timestamp(NULL)); + ireg_loops = 0; + } + + if (++gateway_loops >= GATEWAY_SECONDS) { + sofia_reg_check_gateway(profile, switch_timestamp(NULL)); + gateway_loops = 0; + } + loops = 0; + } + + switch_yield(10000); + } + + sofia_clear_pflag_locked(profile, PFLAG_WORKER_RUNNING); + + return NULL; +} + + +void launch_sofia_worker_thread(sofia_profile_t *profile) +{ + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + + switch_threadattr_create(&thd_attr, profile->pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_threadattr_priority_increase(thd_attr); + switch_thread_create(&thread, thd_attr, sofia_profile_worker_thread_run, profile, profile->pool); + switch_yield(1000000); +} + void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void *obj) { sofia_profile_t *profile = (sofia_profile_t *) obj; switch_memory_pool_t *pool; sip_alias_node_t *node; - uint32_t ireg_loops = 0; - uint32_t gateway_loops = 0; switch_event_t *s_event; int tportlog = 0; @@ -516,9 +574,6 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool); switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool); - ireg_loops = IREG_SECONDS; - gateway_loops = GATEWAY_SECONDS; - if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) { switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s", (sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : ""); @@ -547,18 +602,9 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void switch_yield(1000000); sofia_set_pflag_locked(profile, PFLAG_RUNNING); + launch_sofia_worker_thread(profile); - while (mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) { - if (++ireg_loops >= IREG_SECONDS) { - sofia_reg_check_expire(profile, switch_timestamp(NULL)); - ireg_loops = 0; - } - - if (++gateway_loops >= GATEWAY_SECONDS) { - sofia_reg_check_gateway(profile, switch_timestamp(NULL)); - gateway_loops = 0; - } - + while (mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING) && sofia_test_pflag(profile, PFLAG_WORKER_RUNNING)) { su_root_step(profile->s_root, 1000); } @@ -568,6 +614,13 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void nua_shutdown(profile->nua); su_root_run(profile->s_root); + sofia_clear_pflag_locked(profile, PFLAG_RUNNING); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "waiting for worker thread\n"); + + while(sofia_test_pflag(profile, PFLAG_WORKER_RUNNING)) { + switch_yield(100000); + } + while(profile->inuse) { switch_yield(100000); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "waiting for %d session(s)\n", profile->inuse); @@ -630,9 +683,12 @@ void launch_sofia_profile_thread(sofia_profile_t *profile) switch_threadattr_create(&thd_attr, profile->pool); switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_threadattr_priority_increase(thd_attr); switch_thread_create(&thread, thd_attr, sofia_profile_thread_run, profile, profile->pool); } + + static void logger(void *logarg, char const *fmt, va_list ap) { char *data = NULL; @@ -1415,26 +1471,28 @@ static void sofia_handle_sip_r_invite(switch_core_session_t *session, int status contact_host = switch_str_nil(contact->url_host); } - sql = switch_mprintf( - "insert into sip_dialogs values('%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')", - call_id, - switch_core_session_get_uuid(session), - to_user, - to_host, - from_user, - from_host, - contact_user, - contact_host, - astate, - "outbound", - user_agent - ); + if (profile->pflags & PFLAG_PRESENCE) { + sql = switch_mprintf( + "insert into sip_dialogs values('%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')", + call_id, + switch_core_session_get_uuid(session), + to_user, + to_host, + from_user, + from_host, + contact_user, + contact_host, + astate, + "outbound", + user_agent + ); - switch_assert(sql); - - sofia_glue_execute_sql(profile, SWITCH_FALSE, sql, profile->ireg_mutex); - free(sql); - } else if (status == 200) { + switch_assert(sql); + + sofia_glue_execute_sql(profile, SWITCH_FALSE, sql, profile->ireg_mutex); + free(sql); + } + } else if (status == 200 && (profile->pflags & PFLAG_PRESENCE)) { char *sql = NULL; sql = switch_mprintf("update sip_dialogs set state='%s' where uuid='%s';\n", astate, switch_core_session_get_uuid(session)); switch_assert(sql); @@ -2364,6 +2422,7 @@ void sofia_handle_sip_i_invite(nua_t *nua, sofia_profile_t *profile, nua_handle_ int is_auth = 0, calling_myself = 0; su_addrinfo_t *my_addrinfo = msg_addrinfo(nua_current_request(nua)); + if (sess_count >= sess_max || !(profile->pflags & PFLAG_RUNNING)) { nua_respond(nh, 480, "Maximum Calls In Progress", SIPTAG_RETRY_AFTER_STR("300"), TAG_END()); return; @@ -2795,25 +2854,29 @@ void sofia_handle_sip_i_invite(nua_t *nua, sofia_profile_t *profile, nua_handle_ contact_host = switch_str_nil(contact->url_host); } - sql = switch_mprintf( - "insert into sip_dialogs values('%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')", - call_id, - tech_pvt->sofia_private->uuid, - to_user, - to_host, - dialog_from_user, - dialog_from_host, - contact_user, - contact_host, - "confirmed", - "inbound", - user_agent - ); - switch_assert(sql); + if (profile->pflags & PFLAG_PRESENCE) { - sofia_glue_execute_sql(profile, SWITCH_FALSE, sql, profile->ireg_mutex); - free(sql); + sql = switch_mprintf( + "insert into sip_dialogs values('%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')", + call_id, + tech_pvt->sofia_private->uuid, + to_user, + to_host, + dialog_from_user, + dialog_from_host, + contact_user, + contact_host, + "confirmed", + "inbound", + user_agent + ); + + switch_assert(sql); + sofia_glue_execute_sql(profile, SWITCH_FALSE, sql, profile->ireg_mutex); + free(sql); + } + return; } diff --git a/src/mod/endpoints/mod_sofia/sofia_glue.c b/src/mod/endpoints/mod_sofia/sofia_glue.c index 69ee786a14..b7678e44c0 100644 --- a/src/mod/endpoints/mod_sofia/sofia_glue.c +++ b/src/mod/endpoints/mod_sofia/sofia_glue.c @@ -2444,6 +2444,31 @@ void sofia_glue_sql_close(sofia_profile_t *profile) void sofia_glue_execute_sql(sofia_profile_t *profile, switch_bool_t master, char *sql, switch_mutex_t *mutex) +{ + switch_status_t status = SWITCH_STATUS_FALSE; + sofia_sql_job_t *job = NULL; + + if (profile->sql_queue) { + switch_zmalloc(job, sizeof(*job)); + + job->sql = strdup(sql); + switch_assert(job->sql); + job->master = master; + + status = switch_queue_trypush(profile->sql_queue, job); + } + + if (status != SWITCH_STATUS_SUCCESS) { + if (job) { + free(job->sql); + free(job); + } + sofia_glue_actually_execute_sql(profile, master, sql, mutex); + } + +} + +void sofia_glue_actually_execute_sql(sofia_profile_t *profile, switch_bool_t master, char *sql, switch_mutex_t *mutex) { switch_core_db_t *db; diff --git a/src/switch_apr.c b/src/switch_apr.c index ccde8ef40c..57863cd974 100644 --- a/src/switch_apr.c +++ b/src/switch_apr.c @@ -486,6 +486,33 @@ SWITCH_DECLARE(switch_status_t) switch_threadattr_stacksize_set(switch_threadatt return apr_threadattr_stacksize_set(attr, stacksize); } +#ifndef WIN32 +struct apr_threadattr_t { + apr_pool_t *pool; + pthread_attr_t attr; +}; +#endif + +SWITCH_DECLARE(switch_status_t) switch_threadattr_priority_increase(switch_threadattr_t *attr) +{ + int stat = 0; +#ifndef WIN32 + struct sched_param param; + struct apr_threadattr_t *myattr = attr; + + pthread_attr_getschedparam(&myattr->attr, ¶m); + param.sched_priority = 50; + stat = pthread_attr_setschedparam(&myattr->attr, ¶m); + + if (stat == 0) { + return SWITCH_STATUS_SUCCESS; + } + +#endif + return stat; +} + + SWITCH_DECLARE(switch_status_t) switch_thread_create(switch_thread_t ** new_thread, switch_threadattr_t * attr, switch_thread_start_t func, void *data, switch_memory_pool_t *cont) {