From 994cdbd183db25e993b61bd1101666cd7d6b31d9 Mon Sep 17 00:00:00 2001 From: Giovanni Maruzzelli Date: Tue, 26 Oct 2010 04:42:46 -0500 Subject: [PATCH] skypopen: lot of improvements in locking and hangup handling --- src/mod/endpoints/mod_skypopen/mod_skypopen.c | 123 +++++++++++++++++- src/mod/endpoints/mod_skypopen/skypopen.h | 2 + .../mod_skypopen/skypopen_protocol.c | 14 ++ 3 files changed, 132 insertions(+), 7 deletions(-) diff --git a/src/mod/endpoints/mod_skypopen/mod_skypopen.c b/src/mod/endpoints/mod_skypopen/mod_skypopen.c index 5837cbe1e4..62461de115 100644 --- a/src/mod/endpoints/mod_skypopen/mod_skypopen.c +++ b/src/mod/endpoints/mod_skypopen/mod_skypopen.c @@ -454,6 +454,7 @@ static switch_status_t channel_on_init(switch_core_session_t *session) static switch_status_t channel_on_destroy(switch_core_session_t *session) { private_t *tech_pvt = NULL; + switch_status_t status; tech_pvt = switch_core_session_get_private(session); @@ -483,12 +484,37 @@ static switch_status_t channel_on_destroy(switch_core_session_t *session) if(tech_pvt->write_buffer){ switch_buffer_destroy(&tech_pvt->write_buffer); } + DEBUGA_SKYPE("debugging_hangup 13\n", SKYPOPEN_P_LOG); + switch_mutex_lock(tech_pvt->mutex_thread_audio_cli); + DEBUGA_SKYPE("debugging_hangup cli lock\n", SKYPOPEN_P_LOG); + if (tech_pvt->tcp_cli_thread) { + DEBUGA_SKYPE("debugging_hangup 14\n", SKYPOPEN_P_LOG); + switch_thread_join(&status, tech_pvt->tcp_cli_thread); + tech_pvt->tcp_cli_thread = NULL; + DEBUGA_SKYPE("debugging_hangup 15\n", SKYPOPEN_P_LOG); + } + switch_mutex_unlock(tech_pvt->mutex_thread_audio_cli); + DEBUGA_SKYPE("debugging_hangup cli unlock\n", SKYPOPEN_P_LOG); + switch_mutex_lock(tech_pvt->mutex_thread_audio_srv); + DEBUGA_SKYPE("debugging_hangup srv lock\n", SKYPOPEN_P_LOG); + if (tech_pvt->tcp_srv_thread) { + DEBUGA_SKYPE("debugging_hangup 16\n", SKYPOPEN_P_LOG); + switch_thread_join(&status, tech_pvt->tcp_srv_thread); + tech_pvt->tcp_srv_thread = NULL; + DEBUGA_SKYPE("debugging_hangup 17\n", SKYPOPEN_P_LOG); + } + switch_mutex_unlock(tech_pvt->mutex_thread_audio_srv); + DEBUGA_SKYPE("debugging_hangup srv unlock\n", SKYPOPEN_P_LOG); + DEBUGA_SKYPE("debugging_hangup 18\n", SKYPOPEN_P_LOG); *tech_pvt->session_uuid_str = '\0'; tech_pvt->interface_state = SKYPOPEN_STATE_IDLE; + tech_pvt->skype_callflow = CALLFLOW_CALL_IDLE; +#if 0 if (tech_pvt->skype_callflow == CALLFLOW_STATUS_FINISHED) { tech_pvt->skype_callflow = CALLFLOW_CALL_IDLE; } +#endif // switch_core_session_set_private(session, NULL); } else { DEBUGA_SKYPE("!!!!!!NO tech_pvt!!!! CHANNEL DESTROY %s\n", SKYPOPEN_P_LOG, switch_core_session_get_uuid(session)); @@ -502,15 +528,17 @@ static switch_status_t channel_on_hangup(switch_core_session_t *session) switch_channel_t *channel = NULL; private_t *tech_pvt = NULL; char msg_to_skype[256]; - switch_status_t status; + //switch_status_t status; channel = switch_core_session_get_channel(session); switch_assert(channel != NULL); tech_pvt = switch_core_session_get_private(session); - switch_assert(tech_pvt != NULL); + DEBUGA_SKYPE("debugging_hangup 1\n", SKYPOPEN_P_LOG); + + if(tech_pvt){ if (!switch_channel_test_flag(channel, CF_ANSWERED)) { if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) { tech_pvt->ob_failed_calls++; @@ -522,20 +550,44 @@ static switch_status_t channel_on_hangup(switch_core_session_t *session) switch_clear_flag(tech_pvt, TFLAG_IO); switch_clear_flag(tech_pvt, TFLAG_VOICE); + DEBUGA_SKYPE("debugging_hangup 2\n", SKYPOPEN_P_LOG); tech_pvt->interface_state = SKYPOPEN_STATE_HANGUP_REQUESTED; if (strlen(tech_pvt->skype_call_id)) { DEBUGA_SKYPE("hanging up skype call: %s\n", SKYPOPEN_P_LOG, tech_pvt->skype_call_id); + sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", tech_pvt->skype_call_id); + skypopen_signaling_write(tech_pvt, msg_to_skype); sprintf(msg_to_skype, "ALTER CALL %s HANGUP", tech_pvt->skype_call_id); skypopen_signaling_write(tech_pvt, msg_to_skype); } +#if 0 + switch_sleep(1500000); //XXX 1.5 seconds, let's the audio tcp threads die XXX + //FIXME must not allow using the tech_pvt while this sleeps, so must implement a check on interface_state + + DEBUGA_SKYPE("debugging_hangup 3\n", SKYPOPEN_P_LOG); + switch_mutex_lock(tech_pvt->mutex_thread_audio_cli); + DEBUGA_SKYPE("debugging_hangup cli lock\n", SKYPOPEN_P_LOG); if (tech_pvt->tcp_cli_thread) { + DEBUGA_SKYPE("debugging_hangup 4\n", SKYPOPEN_P_LOG); switch_thread_join(&status, tech_pvt->tcp_cli_thread); + tech_pvt->tcp_cli_thread = NULL; + DEBUGA_SKYPE("debugging_hangup 5\n", SKYPOPEN_P_LOG); } + switch_mutex_unlock(tech_pvt->mutex_thread_audio_cli); + DEBUGA_SKYPE("debugging_hangup cli unlock\n", SKYPOPEN_P_LOG); + switch_mutex_lock(tech_pvt->mutex_thread_audio_srv); + DEBUGA_SKYPE("debugging_hangup srv lock\n", SKYPOPEN_P_LOG); if (tech_pvt->tcp_srv_thread) { + DEBUGA_SKYPE("debugging_hangup 6\n", SKYPOPEN_P_LOG); switch_thread_join(&status, tech_pvt->tcp_srv_thread); + tech_pvt->tcp_srv_thread = NULL; + DEBUGA_SKYPE("debugging_hangup 7\n", SKYPOPEN_P_LOG); } + switch_mutex_unlock(tech_pvt->mutex_thread_audio_srv); + DEBUGA_SKYPE("debugging_hangup srv unlock\n", SKYPOPEN_P_LOG); + DEBUGA_SKYPE("debugging_hangup 8\n", SKYPOPEN_P_LOG); +#endif//0 DEBUGA_SKYPE("%s CHANNEL HANGUP\n", SKYPOPEN_P_LOG, tech_pvt->name); switch_mutex_lock(globals.mutex); globals.calls--; @@ -543,11 +595,18 @@ static switch_status_t channel_on_hangup(switch_core_session_t *session) globals.calls = 0; } + DEBUGA_SKYPE("debugging_hangup 9\n", SKYPOPEN_P_LOG); tech_pvt->interface_state = SKYPOPEN_STATE_IDLE; if (tech_pvt->skype_callflow == CALLFLOW_STATUS_FINISHED) { tech_pvt->skype_callflow = CALLFLOW_CALL_IDLE; } + DEBUGA_SKYPE("debugging_hangup 10\n", SKYPOPEN_P_LOG); switch_mutex_unlock(globals.mutex); + }else{ + WARNINGA("FYI %s CHANNEL has no tech_pvt in his private\n", SKYPOPEN_P_LOG, switch_channel_get_name(channel)); + DEBUGA_SKYPE("debugging_hangup 11\n", SKYPOPEN_P_LOG); + } + DEBUGA_SKYPE("debugging_hangup 12\n", SKYPOPEN_P_LOG); return SWITCH_STATUS_SUCCESS; } @@ -595,9 +654,9 @@ static switch_status_t channel_kill_channel(switch_core_session_t *session, int switch_assert(channel != NULL); tech_pvt = switch_core_session_get_private(session); - switch_assert(tech_pvt != NULL); //DEBUGA_SKYPE("%s CHANNEL KILL_CHANNEL\n", SKYPOPEN_P_LOG, tech_pvt->name); + if(tech_pvt){ switch (sig) { case SWITCH_SIG_KILL: switch_mutex_lock(tech_pvt->flag_mutex); @@ -606,10 +665,10 @@ static switch_status_t channel_kill_channel(switch_core_session_t *session, int DEBUGA_SKYPE("FYI %s CHANNEL in CALLFLOW_STATUS_REMOTEHOLD got SWITCH_SIG_KILL\n", SKYPOPEN_P_LOG, switch_channel_get_name(channel)); } if (switch_channel_get_state(channel) == CS_NEW) { - ERRORA("FYI %s CHANNEL in CS_NEW state got SWITCH_SIG_KILL\n", SKYPOPEN_P_LOG, switch_channel_get_name(channel)); + WARNINGA("FYI %s CHANNEL in CS_NEW state got SWITCH_SIG_KILL\n", SKYPOPEN_P_LOG, switch_channel_get_name(channel)); } if (switch_channel_get_state(channel) != CS_NEW && switch_channel_get_state(channel) < CS_EXECUTE) { - ERRORA("FYI %s CHANNEL in %d state got SWITCH_SIG_KILL\n", SKYPOPEN_P_LOG, switch_channel_get_name(channel), switch_channel_get_state(channel)); + WARNINGA("FYI %s CHANNEL in %d state got SWITCH_SIG_KILL\n", SKYPOPEN_P_LOG, switch_channel_get_name(channel), switch_channel_get_state(channel)); } switch_clear_flag(tech_pvt, TFLAG_IO); switch_clear_flag(tech_pvt, TFLAG_VOICE); @@ -620,6 +679,12 @@ static switch_status_t channel_kill_channel(switch_core_session_t *session, int switch_mutex_unlock(tech_pvt->flag_mutex); sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", tech_pvt->ring_id); skypopen_signaling_write(tech_pvt, msg_to_skype); + sprintf(msg_to_skype, "ALTER CALL %s HANGUP", tech_pvt->ring_id); + skypopen_signaling_write(tech_pvt, msg_to_skype); + sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", tech_pvt->skype_call_id); + skypopen_signaling_write(tech_pvt, msg_to_skype); + sprintf(msg_to_skype, "ALTER CALL %s HANGUP", tech_pvt->skype_call_id); + skypopen_signaling_write(tech_pvt, msg_to_skype); break; case SWITCH_SIG_BREAK: DEBUGA_SKYPE("%s CHANNEL got SWITCH_SIG_BREAK\n", SKYPOPEN_P_LOG, switch_channel_get_name(channel)); @@ -630,6 +695,9 @@ static switch_status_t channel_kill_channel(switch_core_session_t *session, int default: break; } + }else{ + WARNINGA("FYI %s CHANNEL has no tech_pvt in his private\n", SKYPOPEN_P_LOG, switch_channel_get_name(channel)); + } return SWITCH_STATUS_SUCCESS; } @@ -720,8 +788,13 @@ static switch_status_t channel_read_frame(switch_core_session_t *session, switch tech_pvt->read_frame.flags = SFF_NONE; - if (!switch_channel_ready(channel) || !switch_test_flag(tech_pvt, TFLAG_IO)) { + if (!switch_test_flag(tech_pvt, TFLAG_IO)) { + switch_sleep(20000); + return SWITCH_STATUS_FALSE; + } + if (!switch_channel_ready(channel)) { ERRORA("channel not ready \n", SKYPOPEN_P_LOG); + switch_sleep(20000); return SWITCH_STATUS_FALSE; } @@ -917,6 +990,7 @@ static switch_status_t channel_answer_channel(switch_core_session_t *session) { private_t *tech_pvt; switch_channel_t *channel = NULL; + int conta = 0; channel = switch_core_session_get_channel(session); switch_assert(channel != NULL); @@ -932,6 +1006,10 @@ static switch_status_t channel_answer_channel(switch_core_session_t *session) return SWITCH_STATUS_FALSE; } switch_sleep(5000); + conta++; + if(conta == 100){ //0.5 seconds + return SWITCH_STATUS_FALSE; + } } switch_mutex_lock(globals.mutex); globals.calls++; @@ -973,6 +1051,12 @@ static switch_status_t channel_receive_message(switch_core_session_t *session, s if(switch_set_flag(tech_pvt, TFLAG_PROGRESS)){ sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", tech_pvt->ring_id); skypopen_signaling_write(tech_pvt, msg_to_skype); + sprintf(msg_to_skype, "ALTER CALL %s HANGUP", tech_pvt->ring_id); + skypopen_signaling_write(tech_pvt, msg_to_skype); + sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", tech_pvt->skype_call_id); + skypopen_signaling_write(tech_pvt, msg_to_skype); + sprintf(msg_to_skype, "ALTER CALL %s HANGUP", tech_pvt->skype_call_id); + skypopen_signaling_write(tech_pvt, msg_to_skype); switch_clear_flag(tech_pvt, TFLAG_PROGRESS); } } @@ -1585,6 +1669,8 @@ static switch_status_t load_config(int reload_type) skypopen_audio_init(&globals.SKYPOPEN_INTERFACES[interface_id]); switch_mutex_init(&globals.SKYPOPEN_INTERFACES[interface_id].mutex_audio_srv, SWITCH_MUTEX_NESTED, skypopen_module_pool); switch_mutex_init(&globals.SKYPOPEN_INTERFACES[interface_id].mutex_audio_cli, SWITCH_MUTEX_NESTED, skypopen_module_pool); + switch_mutex_init(&globals.SKYPOPEN_INTERFACES[interface_id].mutex_thread_audio_srv, SWITCH_MUTEX_NESTED, skypopen_module_pool); + switch_mutex_init(&globals.SKYPOPEN_INTERFACES[interface_id].mutex_thread_audio_cli, SWITCH_MUTEX_NESTED, skypopen_module_pool); NOTICA ("WAITING roughly 10 seconds to find a running Skype client and connect to its SKYPE API for interface_id=%d\n", @@ -2020,22 +2106,34 @@ int start_audio_threads(private_t * tech_pvt) switch_threadattr_create(&thd_attr, skypopen_module_pool); switch_threadattr_detach_set(thd_attr, 0); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_mutex_lock(tech_pvt->mutex_thread_audio_srv); + DEBUGA_SKYPE("debugging_hangup srv lock\n", SKYPOPEN_P_LOG); if (switch_thread_create(&tech_pvt->tcp_srv_thread, thd_attr, skypopen_do_tcp_srv_thread, tech_pvt, skypopen_module_pool) == SWITCH_STATUS_SUCCESS) { DEBUGA_SKYPE("started tcp_srv_thread thread.\n", SKYPOPEN_P_LOG); } else { ERRORA("failed to start tcp_srv_thread thread.\n", SKYPOPEN_P_LOG); + switch_mutex_unlock(tech_pvt->mutex_thread_audio_srv); + DEBUGA_SKYPE("debugging_hangup srv unlock\n", SKYPOPEN_P_LOG); return -1; } + switch_mutex_unlock(tech_pvt->mutex_thread_audio_srv); + DEBUGA_SKYPE("debugging_hangup srv unlock\n", SKYPOPEN_P_LOG); switch_threadattr_create(&thd_attr, skypopen_module_pool); switch_threadattr_detach_set(thd_attr, 0); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_mutex_lock(tech_pvt->mutex_thread_audio_cli); + DEBUGA_SKYPE("debugging_hangup cli lock\n", SKYPOPEN_P_LOG); if (switch_thread_create(&tech_pvt->tcp_cli_thread, thd_attr, skypopen_do_tcp_cli_thread, tech_pvt, skypopen_module_pool) == SWITCH_STATUS_SUCCESS) { DEBUGA_SKYPE("started tcp_cli_thread thread.\n", SKYPOPEN_P_LOG); } else { ERRORA("failed to start tcp_cli_thread thread.\n", SKYPOPEN_P_LOG); + switch_mutex_unlock(tech_pvt->mutex_thread_audio_cli); + DEBUGA_SKYPE("debugging_hangup cli unlock\n", SKYPOPEN_P_LOG); return -1; } + switch_mutex_unlock(tech_pvt->mutex_thread_audio_cli); + DEBUGA_SKYPE("debugging_hangup cli unlock\n", SKYPOPEN_P_LOG); switch_sleep(100000); if (tech_pvt->tcp_cli_thread == NULL || tech_pvt->tcp_srv_thread == NULL) { @@ -2487,6 +2585,8 @@ int skypopen_partner_handle_ring(private_t * tech_pvt) DEBUGA_SKYPE("We're in a call now (%s), let's refuse this one (%s)\n", SKYPOPEN_P_LOG, tech_pvt->skype_call_id, id); sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", id); skypopen_signaling_write(tech_pvt, msg_to_skype); + sprintf(msg_to_skype, "ALTER CALL %s HANGUP", id); + skypopen_signaling_write(tech_pvt, msg_to_skype); } switch_mutex_unlock(globals.mutex); @@ -2528,17 +2628,22 @@ int skypopen_answer(private_t * tech_pvt) if (!zstr(tech_pvt->session_uuid_str)) { session = switch_core_session_locate(tech_pvt->session_uuid_str); } else { - ERRORA("No session???\n", SKYPOPEN_P_LOG); + ERRORA("No session_uuid_str???\n", SKYPOPEN_P_LOG); + break; } if (session) { channel = switch_core_session_get_channel(session); } else { ERRORA("No session???\n", SKYPOPEN_P_LOG); + switch_core_session_rwunlock(session); + break; } if (channel) { switch_channel_set_state(channel, CS_RESET); } else { ERRORA("No channel???\n", SKYPOPEN_P_LOG); + switch_core_session_rwunlock(session); + break; } switch_core_session_rwunlock(session); @@ -2579,6 +2684,8 @@ int skypopen_answer(private_t * tech_pvt) DEBUGA_SKYPE("We're in a call now (%s), let's refuse this one (%s)\n", SKYPOPEN_P_LOG, tech_pvt->skype_call_id, id); sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", id); skypopen_signaling_write(tech_pvt, msg_to_skype); + sprintf(msg_to_skype, "ALTER CALL %s HANGUP", id); + skypopen_signaling_write(tech_pvt, msg_to_skype); } switch_mutex_unlock(globals.mutex); @@ -2752,6 +2859,8 @@ int skypopen_transfer(private_t * tech_pvt) SKYPOPEN_P_LOG, id, tech_pvt->skype_call_id); sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", id); skypopen_signaling_write(tech_pvt, msg_to_skype); + sprintf(msg_to_skype, "ALTER CALL %s HANGUP", id); + skypopen_signaling_write(tech_pvt, msg_to_skype); } switch_sleep(10000); DEBUGA_SKYPE diff --git a/src/mod/endpoints/mod_skypopen/skypopen.h b/src/mod/endpoints/mod_skypopen/skypopen.h index d678ba23bf..a67bfb2876 100644 --- a/src/mod/endpoints/mod_skypopen/skypopen.h +++ b/src/mod/endpoints/mod_skypopen/skypopen.h @@ -307,6 +307,8 @@ struct private_object { short audiobuf_srv[SAMPLES_PER_FRAME]; switch_mutex_t *mutex_audio_srv; int flag_audio_srv; + switch_mutex_t *mutex_thread_audio_cli; + switch_mutex_t *mutex_thread_audio_srv; FILE *phonebook_writing_fp; int skypopen_dir_entry_extension_prefix; diff --git a/src/mod/endpoints/mod_skypopen/skypopen_protocol.c b/src/mod/endpoints/mod_skypopen/skypopen_protocol.c index b0343652a1..f48d8559e6 100644 --- a/src/mod/endpoints/mod_skypopen/skypopen_protocol.c +++ b/src/mod/endpoints/mod_skypopen/skypopen_protocol.c @@ -554,6 +554,8 @@ int skypopen_signaling_read(private_t * tech_pvt) char msg_to_skype[1024]; skypopen_strncpy(tech_pvt->skype_call_id, id, sizeof(tech_pvt->skype_call_id) - 1); ERRORA("We are in a double call situation, trying to get out hanging up call id: %s.\n", SKYPOPEN_P_LOG, id); + sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", id); + skypopen_signaling_write(tech_pvt, msg_to_skype); sprintf(msg_to_skype, "ALTER CALL %s HANGUP", id); skypopen_signaling_write(tech_pvt, msg_to_skype); skypopen_sleep(10000); @@ -590,6 +592,8 @@ int skypopen_signaling_read(private_t * tech_pvt) if( !remote_party_is_ringing(tech_pvt)){ WARNINGA("We are getting the RINGING from a call we canceled, trying to get out hanging up call id: %s.\n", SKYPOPEN_P_LOG, id); + sprintf(msg_to_skype, "ALTER CALL %s END HANGUP", id); + skypopen_signaling_write(tech_pvt, msg_to_skype); sprintf(msg_to_skype, "ALTER CALL %s HANGUP", id); skypopen_signaling_write(tech_pvt, msg_to_skype); tech_pvt->skype_call_id[0] = '\0'; @@ -925,7 +929,12 @@ void *skypopen_do_tcp_srv_thread_func(void *obj) DEBUGA_SKYPE("incoming audio (read) server (I am it) EXITING\n", SKYPOPEN_P_LOG); skypopen_close_socket(s); s = -1; + DEBUGA_SKYPE("debugging_hangup PRE srv lock\n", SKYPOPEN_P_LOG); + switch_mutex_lock(tech_pvt->mutex_thread_audio_srv); + DEBUGA_SKYPE("debugging_hangup srv lock\n", SKYPOPEN_P_LOG); tech_pvt->tcp_srv_thread = NULL; + switch_mutex_unlock(tech_pvt->mutex_thread_audio_srv); + DEBUGA_SKYPE("debugging_hangup srv unlock\n", SKYPOPEN_P_LOG); return NULL; } @@ -1070,7 +1079,12 @@ void *skypopen_do_tcp_cli_thread_func(void *obj) DEBUGA_SKYPE("outbound audio server (I am it) EXITING\n", SKYPOPEN_P_LOG); skypopen_close_socket(s); s = -1; + DEBUGA_SKYPE("debugging_hangup PRE cli lock\n", SKYPOPEN_P_LOG); + switch_mutex_lock(tech_pvt->mutex_thread_audio_cli); + DEBUGA_SKYPE("debugging_hangup cli lock\n", SKYPOPEN_P_LOG); tech_pvt->tcp_cli_thread = NULL; + switch_mutex_unlock(tech_pvt->mutex_thread_audio_cli); + DEBUGA_SKYPE("debugging_hangup cli unlock\n", SKYPOPEN_P_LOG); return NULL; }