add rwlock to members

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@3945 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2007-01-11 22:34:04 +00:00
parent 0c97a56956
commit 9d91113b47
1 changed files with 40 additions and 41 deletions

View File

@ -115,7 +115,8 @@ typedef enum {
MFLAG_CAN_HEAR = (1 << 2), MFLAG_CAN_HEAR = (1 << 2),
MFLAG_KICKED = (1 << 3), MFLAG_KICKED = (1 << 3),
MFLAG_ITHREAD = (1 << 4), MFLAG_ITHREAD = (1 << 4),
MFLAG_NOCHANNEL = (1 << 5) MFLAG_NOCHANNEL = (1 << 5),
MFLAG_INTREE = (1 << 6)
} member_flag_t; } member_flag_t;
typedef enum { typedef enum {
@ -189,7 +190,7 @@ typedef struct conference_obj {
uint32_t interval; uint32_t interval;
switch_mutex_t *mutex; switch_mutex_t *mutex;
conference_member_t *members; conference_member_t *members;
switch_mutex_t *member_mutex; switch_thread_rwlock_t *member_rwlock;
conference_file_node_t *fnode; conference_file_node_t *fnode;
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
switch_thread_rwlock_t *rwlock; switch_thread_rwlock_t *rwlock;
@ -372,6 +373,7 @@ static conference_member_t *conference_member_get(conference_obj_t *conference,
assert(conference != NULL); assert(conference != NULL);
assert(id != 0); assert(id != 0);
switch_thread_rwlock_rdlock(conference->member_rwlock);
for(member = conference->members; member; member = member->next) { for(member = conference->members; member; member = member->next) {
if (switch_test_flag(member, MFLAG_NOCHANNEL)) { if (switch_test_flag(member, MFLAG_NOCHANNEL)) {
@ -382,7 +384,12 @@ static conference_member_t *conference_member_get(conference_obj_t *conference,
break; break;
} }
} }
if (member && !switch_test_flag(member, MFLAG_INTREE)) {
member = NULL;
}
switch_thread_rwlock_unlock(conference->member_rwlock);
return member; return member;
} }
@ -394,14 +401,14 @@ static switch_status_t conference_record_stop(conference_obj_t *conference, char
int count = 0; int count = 0;
assert (conference != NULL); assert (conference != NULL);
switch_thread_rwlock_rdlock(conference->member_rwlock);
for(member = conference->members; member; member = member->next) { for(member = conference->members; member; member = member->next) {
if (switch_test_flag(member, MFLAG_NOCHANNEL) && (!path || !strcmp(path, member->rec_path))) { if (switch_test_flag(member, MFLAG_NOCHANNEL) && (!path || !strcmp(path, member->rec_path))) {
switch_clear_flag_locked(member, MFLAG_RUNNING); switch_clear_flag_locked(member, MFLAG_RUNNING);
count++; count++;
} }
} }
switch_thread_rwlock_unlock(conference->member_rwlock);
return count; return count;
} }
@ -459,7 +466,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe
assert(member != NULL); assert(member != NULL);
switch_mutex_lock(conference->mutex); switch_mutex_lock(conference->mutex);
switch_mutex_lock(conference->member_mutex); switch_thread_rwlock_wrlock(conference->member_rwlock);
switch_mutex_lock(member->audio_in_mutex); switch_mutex_lock(member->audio_in_mutex);
switch_mutex_lock(member->audio_out_mutex); switch_mutex_lock(member->audio_out_mutex);
switch_mutex_lock(member->flag_mutex); switch_mutex_lock(member->flag_mutex);
@ -467,7 +474,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe
member->next = conference->members; member->next = conference->members;
member->energy_level = conference->energy_level; member->energy_level = conference->energy_level;
conference->members = member; conference->members = member;
switch_set_flag(member, MFLAG_INTREE);
if (!switch_test_flag(member, MFLAG_NOCHANNEL)) { if (!switch_test_flag(member, MFLAG_NOCHANNEL)) {
conference->count++; conference->count++;
@ -506,7 +513,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe
switch_mutex_unlock(member->flag_mutex); switch_mutex_unlock(member->flag_mutex);
switch_mutex_unlock(member->audio_out_mutex); switch_mutex_unlock(member->audio_out_mutex);
switch_mutex_unlock(member->audio_in_mutex); switch_mutex_unlock(member->audio_in_mutex);
switch_mutex_unlock(conference->member_mutex); switch_thread_rwlock_unlock(conference->member_rwlock);
switch_mutex_unlock(conference->mutex); switch_mutex_unlock(conference->mutex);
status = SWITCH_STATUS_SUCCESS; status = SWITCH_STATUS_SUCCESS;
@ -524,15 +531,13 @@ static switch_status_t conference_del_member(conference_obj_t *conference, confe
assert(conference != NULL); assert(conference != NULL);
assert(member != NULL); assert(member != NULL);
switch_mutex_lock(conference->mutex); switch_mutex_lock(conference->mutex);
switch_mutex_lock(conference->member_mutex); switch_thread_rwlock_wrlock(conference->member_rwlock);
switch_mutex_lock(member->audio_in_mutex); switch_mutex_lock(member->audio_in_mutex);
switch_mutex_lock(member->audio_out_mutex); switch_mutex_lock(member->audio_out_mutex);
switch_mutex_lock(member->flag_mutex); switch_mutex_lock(member->flag_mutex);
switch_clear_flag(member, MFLAG_INTREE);
for (imember = conference->members; imember; imember = imember->next) { for (imember = conference->members; imember; imember = imember->next) {
if (imember == member ) { if (imember == member ) {
if (last) { if (last) {
@ -606,7 +611,7 @@ static switch_status_t conference_del_member(conference_obj_t *conference, confe
switch_mutex_unlock(member->flag_mutex); switch_mutex_unlock(member->flag_mutex);
switch_mutex_unlock(member->audio_out_mutex); switch_mutex_unlock(member->audio_out_mutex);
switch_mutex_unlock(member->audio_in_mutex); switch_mutex_unlock(member->audio_in_mutex);
switch_mutex_unlock(conference->member_mutex); switch_thread_rwlock_unlock(conference->member_rwlock);
switch_mutex_unlock(conference->mutex); switch_mutex_unlock(conference->mutex);
status = SWITCH_STATUS_SUCCESS; status = SWITCH_STATUS_SUCCESS;
@ -648,6 +653,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v
ready = 0; ready = 0;
/* Read one frame of audio from each member channel and save it for redistribution */ /* Read one frame of audio from each member channel and save it for redistribution */
switch_thread_rwlock_rdlock(conference->member_rwlock);
for (imember = conference->members; imember; imember = imember->next) { for (imember = conference->members; imember; imember = imember->next) {
if (imember->buflen) { if (imember->buflen) {
memset(imember->frame, 255, imember->buflen); memset(imember->frame, 255, imember->buflen);
@ -686,7 +692,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v
} }
switch_mutex_unlock(imember->audio_in_mutex); switch_mutex_unlock(imember->audio_in_mutex);
} }
switch_thread_rwlock_unlock(conference->member_rwlock);
/* If a file or speech event is being played */ /* If a file or speech event is being played */
if (conference->fnode) { if (conference->fnode) {
/* Lead in time */ /* Lead in time */
@ -719,6 +725,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v
if (ready) { if (ready) {
/* Build a muxed frame for every member that contains the mixed audio of everyone else */ /* Build a muxed frame for every member that contains the mixed audio of everyone else */
switch_thread_rwlock_rdlock(conference->member_rwlock);
for (omember = conference->members; omember; omember = omember->next) { for (omember = conference->members; omember; omember = omember->next) {
omember->len = bytes; omember->len = bytes;
if (conference->fnode) { if (conference->fnode) {
@ -785,6 +792,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v
switch_mutex_unlock(imember->audio_out_mutex); switch_mutex_unlock(imember->audio_out_mutex);
} }
} }
switch_thread_rwlock_unlock(conference->member_rwlock);
if (conference->fnode && conference->fnode->done) { if (conference->fnode && conference->fnode->done) {
conference_file_node_t *fnode; conference_file_node_t *fnode;
@ -849,6 +857,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v
conference->fnode = NULL; conference->fnode = NULL;
} }
switch_thread_rwlock_rdlock(conference->member_rwlock);
for(imember = conference->members; imember; imember = imember->next) { for(imember = conference->members; imember; imember = imember->next) {
switch_channel_t *channel; switch_channel_t *channel;
@ -867,6 +876,7 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v
switch_clear_flag_locked(imember, MFLAG_RUNNING); switch_clear_flag_locked(imember, MFLAG_RUNNING);
} }
switch_thread_rwlock_unlock(conference->member_rwlock);
switch_mutex_unlock(conference->mutex); switch_mutex_unlock(conference->mutex);
@ -1428,7 +1438,8 @@ static void conference_loop_output(conference_member_t *member)
launch_conference_loop_input(member, switch_core_session_get_pool(member->session)); launch_conference_loop_input(member, switch_core_session_get_pool(member->session));
/* build a digit stream object */ /* build a digit stream object */
if (member->conference->dtmf_parser != NULL && switch_ivr_digit_stream_new(member->conference->dtmf_parser, &member->digit_stream) != SWITCH_STATUS_SUCCESS) { if (member->conference->dtmf_parser != NULL &&
switch_ivr_digit_stream_new(member->conference->dtmf_parser, &member->digit_stream) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Warning Will Robinson, there is no digit parser stream object\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Warning Will Robinson, there is no digit parser stream object\n");
} }
@ -1813,9 +1824,9 @@ static switch_status_t conference_play_file(conference_obj_t *conference, char *
assert(conference != NULL); assert(conference != NULL);
switch_mutex_lock(conference->mutex); switch_mutex_lock(conference->mutex);
switch_mutex_lock(conference->member_mutex); switch_thread_rwlock_rdlock(conference->member_rwlock);
count = conference->count; count = conference->count;
switch_mutex_unlock(conference->member_mutex); switch_thread_rwlock_unlock(conference->member_rwlock);
switch_mutex_unlock(conference->mutex); switch_mutex_unlock(conference->mutex);
if (!count) { if (!count) {
@ -2040,15 +2051,13 @@ static switch_status_t conference_say(conference_obj_t *conference, const char *
return SWITCH_STATUS_GENERR; return SWITCH_STATUS_GENERR;
} }
switch_mutex_lock(conference->mutex); switch_mutex_lock(conference->mutex);
switch_mutex_lock(conference->member_mutex); switch_thread_rwlock_rdlock(conference->member_rwlock);
count = conference->count; count = conference->count;
if (!(conference->tts_engine && conference->tts_voice)) { if (!(conference->tts_engine && conference->tts_voice)) {
count = 0; count = 0;
} }
switch_mutex_unlock(conference->member_mutex); switch_thread_rwlock_unlock(conference->member_rwlock);
switch_mutex_unlock(conference->mutex); switch_mutex_unlock(conference->mutex);
if (!count) { if (!count) {
@ -2113,13 +2122,11 @@ static void conference_member_itterator(conference_obj_t *conference, switch_str
assert(stream != NULL); assert(stream != NULL);
assert(pfncallback != NULL); assert(pfncallback != NULL);
switch_mutex_lock(conference->member_mutex); switch_thread_rwlock_rdlock(conference->member_rwlock);
for (member = conference->members; member; member = member->next) { for (member = conference->members; member; member = member->next) {
pfncallback(member, stream, data); pfncallback(member, stream, data);
} }
switch_thread_rwlock_unlock(conference->member_rwlock);
switch_mutex_unlock(conference->member_mutex);
} }
@ -2130,7 +2137,7 @@ static void conference_list_pretty(conference_obj_t *conference, switch_stream_h
assert(conference != NULL); assert(conference != NULL);
assert(stream != NULL); assert(stream != NULL);
switch_mutex_lock(conference->member_mutex); switch_thread_rwlock_rdlock(conference->member_rwlock);
// stream->write_function(stream, "<pre>Current Callers:\n"); // stream->write_function(stream, "<pre>Current Callers:\n");
for (member = conference->members; member; member = member->next) { for (member = conference->members; member; member = member->next) {
@ -2152,7 +2159,7 @@ static void conference_list_pretty(conference_obj_t *conference, switch_stream_h
} }
switch_mutex_unlock(conference->member_mutex); switch_thread_rwlock_unlock(conference->member_rwlock);
} }
@ -2165,7 +2172,7 @@ static void conference_list(conference_obj_t *conference, switch_stream_handle_t
assert(delim != NULL); assert(delim != NULL);
switch_mutex_lock(conference->member_mutex); switch_thread_rwlock_rdlock(conference->member_rwlock);
for (member = conference->members; member; member = member->next) { for (member = conference->members; member; member = member->next) {
switch_channel_t *channel; switch_channel_t *channel;
@ -2209,7 +2216,7 @@ static void conference_list(conference_obj_t *conference, switch_stream_handle_t
member->energy_level); member->energy_level);
} }
switch_mutex_unlock(conference->member_mutex); switch_thread_rwlock_unlock(conference->member_rwlock);
} }
@ -2785,23 +2792,17 @@ static switch_status_t conf_api_sub_relate(conference_obj_t *conference, switch_
uint32_t id = atoi(argv[2]); uint32_t id = atoi(argv[2]);
uint32_t oid = atoi(argv[3]); uint32_t oid = atoi(argv[3]);
switch_mutex_lock(conference->mutex);
switch_mutex_lock(conference->member_mutex);
if ((member = conference_member_get(conference, id))) { if ((member = conference_member_get(conference, id))) {
member_del_relationship(member, oid); member_del_relationship(member, oid);
stream->write_function(stream, "relationship %u->%u cleared.", id, oid); stream->write_function(stream, "relationship %u->%u cleared.", id, oid);
} else { } else {
stream->write_function(stream, "relationship %u->%u not found", id, oid); stream->write_function(stream, "relationship %u->%u not found", id, oid);
} }
switch_mutex_unlock(conference->member_mutex);
switch_mutex_unlock(conference->mutex);
} else if (nospeak || nohear) { } else if (nospeak || nohear) {
conference_member_t *member = NULL, *other_member = NULL; conference_member_t *member = NULL, *other_member = NULL;
uint32_t id = atoi(argv[2]); uint32_t id = atoi(argv[2]);
uint32_t oid = atoi(argv[3]); uint32_t oid = atoi(argv[3]);
switch_mutex_lock(conference->mutex);
switch_mutex_lock(conference->member_mutex);
if ((member = conference_member_get(conference, id)) && (other_member = conference_member_get(conference, oid))) { if ((member = conference_member_get(conference, id)) && (other_member = conference_member_get(conference, oid))) {
conference_relationship_t *rel = NULL; conference_relationship_t *rel = NULL;
if ((rel = member_get_relationship(member, other_member))) { if ((rel = member_get_relationship(member, other_member))) {
@ -2825,8 +2826,6 @@ static switch_status_t conf_api_sub_relate(conference_obj_t *conference, switch_
} else { } else {
stream->write_function(stream, "relationship %u->%u not found", id, oid); stream->write_function(stream, "relationship %u->%u not found", id, oid);
} }
switch_mutex_unlock(conference->member_mutex);
switch_mutex_unlock(conference->mutex);
} }
} }
@ -3119,7 +3118,7 @@ switch_status_t conf_api_dispatch(conference_obj_t *conference, switch_stream_ha
conference_member_t *member = NULL; conference_member_t *member = NULL;
conference_member_t *last_member = NULL; conference_member_t *last_member = NULL;
switch_mutex_lock(conference->member_mutex); switch_thread_rwlock_rdlock(conference->member_rwlock);
/* find last (oldest) member */ /* find last (oldest) member */
member = conference->members; member = conference->members;
@ -3136,7 +3135,7 @@ switch_status_t conf_api_dispatch(conference_obj_t *conference, switch_stream_ha
pfn(last_member, stream, argv[argn+2]); pfn(last_member, stream, argv[argn+2]);
} }
switch_mutex_unlock(conference->member_mutex); switch_thread_rwlock_unlock(conference->member_rwlock);
} else { } else {
conf_api_member_cmd_t pfn = (conf_api_member_cmd_t)conf_api_sub_commands[i].pfnapicmd; conf_api_member_cmd_t pfn = (conf_api_member_cmd_t)conf_api_sub_commands[i].pfnapicmd;
conference_member_t *member = conference_member_get(conference, id); conference_member_t *member = conference_member_get(conference, id);
@ -4461,9 +4460,9 @@ static conference_obj_t *conference_new(char *name, conf_xml_cfg_t cfg, switch_m
/* Activate the conference mutex for exclusivity */ /* Activate the conference mutex for exclusivity */
switch_mutex_init(&conference->mutex, SWITCH_MUTEX_NESTED, conference->pool); switch_mutex_init(&conference->mutex, SWITCH_MUTEX_NESTED, conference->pool);
switch_mutex_init(&conference->member_mutex, SWITCH_MUTEX_NESTED, conference->pool);
switch_mutex_init(&conference->flag_mutex, SWITCH_MUTEX_NESTED, conference->pool); switch_mutex_init(&conference->flag_mutex, SWITCH_MUTEX_NESTED, conference->pool);
switch_thread_rwlock_create(&conference->rwlock, conference->pool); switch_thread_rwlock_create(&conference->rwlock, conference->pool);
switch_thread_rwlock_create(&conference->member_rwlock, conference->pool);
return conference; return conference;
} }