diff --git a/src/mod/applications/mod_conference/mod_conference.c b/src/mod/applications/mod_conference/mod_conference.c index 2432c5fe25..232df2d9b2 100644 --- a/src/mod/applications/mod_conference/mod_conference.c +++ b/src/mod/applications/mod_conference/mod_conference.c @@ -115,7 +115,8 @@ typedef enum { MFLAG_CAN_HEAR = (1 << 2), MFLAG_KICKED = (1 << 3), MFLAG_ITHREAD = (1 << 4), - MFLAG_NOCHANNEL = (1 << 5) + MFLAG_NOCHANNEL = (1 << 5), + MFLAG_INTREE = (1 << 6) } member_flag_t; typedef enum { @@ -189,7 +190,7 @@ typedef struct conference_obj { uint32_t interval; switch_mutex_t *mutex; conference_member_t *members; - switch_mutex_t *member_mutex; + switch_thread_rwlock_t *member_rwlock; conference_file_node_t *fnode; switch_memory_pool_t *pool; switch_thread_rwlock_t *rwlock; @@ -372,6 +373,7 @@ static conference_member_t *conference_member_get(conference_obj_t *conference, assert(conference != NULL); assert(id != 0); + switch_thread_rwlock_rdlock(conference->member_rwlock); for(member = conference->members; member; member = member->next) { if (switch_test_flag(member, MFLAG_NOCHANNEL)) { @@ -382,7 +384,12 @@ static conference_member_t *conference_member_get(conference_obj_t *conference, break; } } - + + if (member && !switch_test_flag(member, MFLAG_INTREE)) { + member = NULL; + } + + switch_thread_rwlock_unlock(conference->member_rwlock); return member; } @@ -394,14 +401,14 @@ static switch_status_t conference_record_stop(conference_obj_t *conference, char int count = 0; assert (conference != NULL); - + switch_thread_rwlock_rdlock(conference->member_rwlock); for(member = conference->members; member; member = member->next) { if (switch_test_flag(member, MFLAG_NOCHANNEL) && (!path || !strcmp(path, member->rec_path))) { switch_clear_flag_locked(member, MFLAG_RUNNING); count++; } } - + switch_thread_rwlock_unlock(conference->member_rwlock); return count; } @@ -459,7 +466,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe assert(member != NULL); switch_mutex_lock(conference->mutex); - switch_mutex_lock(conference->member_mutex); + switch_thread_rwlock_wrlock(conference->member_rwlock); switch_mutex_lock(member->audio_in_mutex); switch_mutex_lock(member->audio_out_mutex); switch_mutex_lock(member->flag_mutex); @@ -467,7 +474,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe member->next = conference->members; member->energy_level = conference->energy_level; conference->members = member; - + switch_set_flag(member, MFLAG_INTREE); if (!switch_test_flag(member, MFLAG_NOCHANNEL)) { conference->count++; @@ -506,7 +513,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe switch_mutex_unlock(member->flag_mutex); switch_mutex_unlock(member->audio_out_mutex); switch_mutex_unlock(member->audio_in_mutex); - switch_mutex_unlock(conference->member_mutex); + switch_thread_rwlock_unlock(conference->member_rwlock); switch_mutex_unlock(conference->mutex); status = SWITCH_STATUS_SUCCESS; @@ -524,15 +531,13 @@ static switch_status_t conference_del_member(conference_obj_t *conference, confe assert(conference != NULL); assert(member != NULL); - - - switch_mutex_lock(conference->mutex); - switch_mutex_lock(conference->member_mutex); + switch_thread_rwlock_wrlock(conference->member_rwlock); switch_mutex_lock(member->audio_in_mutex); switch_mutex_lock(member->audio_out_mutex); switch_mutex_lock(member->flag_mutex); - + switch_clear_flag(member, MFLAG_INTREE); + for (imember = conference->members; imember; imember = imember->next) { if (imember == member ) { if (last) { @@ -606,7 +611,7 @@ static switch_status_t conference_del_member(conference_obj_t *conference, confe switch_mutex_unlock(member->flag_mutex); switch_mutex_unlock(member->audio_out_mutex); switch_mutex_unlock(member->audio_in_mutex); - switch_mutex_unlock(conference->member_mutex); + switch_thread_rwlock_unlock(conference->member_rwlock); switch_mutex_unlock(conference->mutex); status = SWITCH_STATUS_SUCCESS; @@ -648,6 +653,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v ready = 0; /* Read one frame of audio from each member channel and save it for redistribution */ + switch_thread_rwlock_rdlock(conference->member_rwlock); for (imember = conference->members; imember; imember = imember->next) { if (imember->buflen) { memset(imember->frame, 255, imember->buflen); @@ -686,7 +692,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v } switch_mutex_unlock(imember->audio_in_mutex); } - + switch_thread_rwlock_unlock(conference->member_rwlock); /* If a file or speech event is being played */ if (conference->fnode) { /* Lead in time */ @@ -719,6 +725,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v if (ready) { /* Build a muxed frame for every member that contains the mixed audio of everyone else */ + switch_thread_rwlock_rdlock(conference->member_rwlock); for (omember = conference->members; omember; omember = omember->next) { omember->len = bytes; if (conference->fnode) { @@ -785,6 +792,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v switch_mutex_unlock(imember->audio_out_mutex); } } + switch_thread_rwlock_unlock(conference->member_rwlock); if (conference->fnode && conference->fnode->done) { conference_file_node_t *fnode; @@ -849,6 +857,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v conference->fnode = NULL; } + switch_thread_rwlock_rdlock(conference->member_rwlock); for(imember = conference->members; imember; imember = imember->next) { switch_channel_t *channel; @@ -867,6 +876,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v switch_clear_flag_locked(imember, MFLAG_RUNNING); } + switch_thread_rwlock_unlock(conference->member_rwlock); switch_mutex_unlock(conference->mutex); @@ -1428,7 +1438,8 @@ static void conference_loop_output(conference_member_t *member) launch_conference_loop_input(member, switch_core_session_get_pool(member->session)); /* build a digit stream object */ - if (member->conference->dtmf_parser != NULL && switch_ivr_digit_stream_new(member->conference->dtmf_parser, &member->digit_stream) != SWITCH_STATUS_SUCCESS) { + if (member->conference->dtmf_parser != NULL && + switch_ivr_digit_stream_new(member->conference->dtmf_parser, &member->digit_stream) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Warning Will Robinson, there is no digit parser stream object\n"); } @@ -1813,9 +1824,9 @@ static switch_status_t conference_play_file(conference_obj_t *conference, char * assert(conference != NULL); switch_mutex_lock(conference->mutex); - switch_mutex_lock(conference->member_mutex); + switch_thread_rwlock_rdlock(conference->member_rwlock); count = conference->count; - switch_mutex_unlock(conference->member_mutex); + switch_thread_rwlock_unlock(conference->member_rwlock); switch_mutex_unlock(conference->mutex); if (!count) { @@ -2040,15 +2051,13 @@ static switch_status_t conference_say(conference_obj_t *conference, const char * return SWITCH_STATUS_GENERR; } - - switch_mutex_lock(conference->mutex); - switch_mutex_lock(conference->member_mutex); + switch_thread_rwlock_rdlock(conference->member_rwlock); count = conference->count; if (!(conference->tts_engine && conference->tts_voice)) { count = 0; } - switch_mutex_unlock(conference->member_mutex); + switch_thread_rwlock_unlock(conference->member_rwlock); switch_mutex_unlock(conference->mutex); if (!count) { @@ -2113,13 +2122,11 @@ static void conference_member_itterator(conference_obj_t *conference, switch_str assert(stream != NULL); assert(pfncallback != NULL); - switch_mutex_lock(conference->member_mutex); - + switch_thread_rwlock_rdlock(conference->member_rwlock); for (member = conference->members; member; member = member->next) { pfncallback(member, stream, data); } - - switch_mutex_unlock(conference->member_mutex); + switch_thread_rwlock_unlock(conference->member_rwlock); } @@ -2130,7 +2137,7 @@ static void conference_list_pretty(conference_obj_t *conference, switch_stream_h assert(conference != NULL); assert(stream != NULL); - switch_mutex_lock(conference->member_mutex); + switch_thread_rwlock_rdlock(conference->member_rwlock); // stream->write_function(stream, "
Current Callers:\n"); for (member = conference->members; member; member = member->next) { @@ -2152,7 +2159,7 @@ static void conference_list_pretty(conference_obj_t *conference, switch_stream_h } - switch_mutex_unlock(conference->member_mutex); + switch_thread_rwlock_unlock(conference->member_rwlock); } @@ -2165,7 +2172,7 @@ static void conference_list(conference_obj_t *conference, switch_stream_handle_t assert(delim != NULL); - switch_mutex_lock(conference->member_mutex); + switch_thread_rwlock_rdlock(conference->member_rwlock); for (member = conference->members; member; member = member->next) { switch_channel_t *channel; @@ -2209,7 +2216,7 @@ static void conference_list(conference_obj_t *conference, switch_stream_handle_t member->energy_level); } - switch_mutex_unlock(conference->member_mutex); + switch_thread_rwlock_unlock(conference->member_rwlock); } @@ -2785,23 +2792,17 @@ static switch_status_t conf_api_sub_relate(conference_obj_t *conference, switch_ uint32_t id = atoi(argv[2]); uint32_t oid = atoi(argv[3]); - switch_mutex_lock(conference->mutex); - switch_mutex_lock(conference->member_mutex); if ((member = conference_member_get(conference, id))) { member_del_relationship(member, oid); stream->write_function(stream, "relationship %u->%u cleared.", id, oid); } else { stream->write_function(stream, "relationship %u->%u not found", id, oid); } - switch_mutex_unlock(conference->member_mutex); - switch_mutex_unlock(conference->mutex); } else if (nospeak || nohear) { conference_member_t *member = NULL, *other_member = NULL; uint32_t id = atoi(argv[2]); uint32_t oid = atoi(argv[3]); - switch_mutex_lock(conference->mutex); - switch_mutex_lock(conference->member_mutex); if ((member = conference_member_get(conference, id)) && (other_member = conference_member_get(conference, oid))) { conference_relationship_t *rel = NULL; if ((rel = member_get_relationship(member, other_member))) { @@ -2825,8 +2826,6 @@ static switch_status_t conf_api_sub_relate(conference_obj_t *conference, switch_ } else { stream->write_function(stream, "relationship %u->%u not found", id, oid); } - switch_mutex_unlock(conference->member_mutex); - switch_mutex_unlock(conference->mutex); } } @@ -3119,7 +3118,7 @@ switch_status_t conf_api_dispatch(conference_obj_t *conference, switch_stream_ha conference_member_t *member = NULL; conference_member_t *last_member = NULL; - switch_mutex_lock(conference->member_mutex); + switch_thread_rwlock_rdlock(conference->member_rwlock); /* find last (oldest) member */ member = conference->members; @@ -3136,7 +3135,7 @@ switch_status_t conf_api_dispatch(conference_obj_t *conference, switch_stream_ha pfn(last_member, stream, argv[argn+2]); } - switch_mutex_unlock(conference->member_mutex); + switch_thread_rwlock_unlock(conference->member_rwlock); } else { conf_api_member_cmd_t pfn = (conf_api_member_cmd_t)conf_api_sub_commands[i].pfnapicmd; conference_member_t *member = conference_member_get(conference, id); @@ -4461,9 +4460,9 @@ static conference_obj_t *conference_new(char *name, conf_xml_cfg_t cfg, switch_m /* Activate the conference mutex for exclusivity */ switch_mutex_init(&conference->mutex, SWITCH_MUTEX_NESTED, conference->pool); - switch_mutex_init(&conference->member_mutex, SWITCH_MUTEX_NESTED, conference->pool); switch_mutex_init(&conference->flag_mutex, SWITCH_MUTEX_NESTED, conference->pool); switch_thread_rwlock_create(&conference->rwlock, conference->pool); + switch_thread_rwlock_create(&conference->member_rwlock, conference->pool); return conference; }