diff --git a/scripts/socket/FreeSWITCH/Client.pm b/scripts/socket/FreeSWITCH/Client.pm index e70ff78668..8ef812f8a9 100644 --- a/scripts/socket/FreeSWITCH/Client.pm +++ b/scripts/socket/FreeSWITCH/Client.pm @@ -89,7 +89,7 @@ sub output($$) { my ($self,$data) = @_; my $s = $self->{_sock}; - print $s $data; + print $s $data ; } sub cmd($$$) { @@ -97,12 +97,26 @@ sub cmd($$$) { my $cmd = shift; my $to = shift; - $self->output($cmd); + $self->output($cmd->{command}); + foreach(keys %{$cmd}) { + next if ($_ eq "command"); + $self->output($cmd->{$_}); + } + $self->output("\n\n"); + my $h = $self->readhash($to); $h; } +sub disconnect($) { + my $self = shift; + $self->{_sock}->shutdown(2); + $self->{_sock}->close(); + undef $self->{_sock}; + delete $self->{_sock}; +} + sub connect($) { my $self = shift; @@ -120,8 +134,7 @@ sub connect($) { if ($h->{"content-type"} eq "auth/request") { my $pass = $self->{"_password"}; - $self->output("auth $pass"); - $h = $self->readhash(undef); + $h = $self->cmd({command => "auth $pass"}); } if ($h->{'reply-text'} =~ "OK") { diff --git a/scripts/socket/fs.pl b/scripts/socket/fs.pl index e8d356c09d..0c6b3b6bf2 100644 --- a/scripts/socket/fs.pl +++ b/scripts/socket/fs.pl @@ -11,21 +11,29 @@ my $OUT = $term->OUT .. \*STDOUT; my $log = shift; +$SIG{CHLD} = sub {$fs->disconnect(); die "done"}; + if ($log) { $pid = fork; if (!$pid) { my $fs2 = init FreeSWITCH::Client {-password => $password} or die "Error $@"; - $fs2->cmd("log $log"); + + + $fs2->cmd({ command => "log $log" }); while (1) { my $reply = $fs2->readhash(undef); - + if ($reply->{socketerror}) { + die "socket error"; + } if ($reply->{body}) { print $reply->{body} . "\n"; } elsif ($reply->{'reply-text'}) { print $reply->{'reply-text'} . "\n"; } } + exit; } + } @@ -34,8 +42,11 @@ while ( defined ($_ = $term->readline($prompt)) ) { my $reply; if ($_) { - my $reply = $fs->cmd("api $_"); - + my $reply = $fs->cmd({command => "api $_"}); + if ($reply->{socketerror}) { + $fs2->disconnect(); + die "socket error"; + } if ($reply->{body}) { print $reply->{body}; } elsif ($reply->{'reply-text'}) { diff --git a/src/include/switch_types.h b/src/include/switch_types.h index fb6a92533f..727bf4e9d8 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -543,6 +543,7 @@ typedef enum { SWITCH_EVENT_SESSION_CRASH - Session Crashed SWITCH_EVENT_MODULE_LOAD - Module was loaded SWITCH_EVENT_DTMF - DTMF was sent + SWITCH_EVENT_MESSAGE - A Basic Message SWITCH_EVENT_ALL - All events at once @@ -570,6 +571,7 @@ typedef enum { SWITCH_EVENT_SESSION_CRASH, SWITCH_EVENT_MODULE_LOAD, SWITCH_EVENT_DTMF, + SWITCH_EVENT_MESSAGE, SWITCH_EVENT_ALL } switch_event_types_t; diff --git a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c index fd665c0593..9ef2f11f94 100644 --- a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c +++ b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c @@ -114,15 +114,28 @@ static void event_handler(switch_event_t *event) switch_mutex_lock(listen_list.mutex); for (l = listen_list.listeners; l; l = l->next) { - if (switch_test_flag(l, LFLAG_EVENTS) && (l->event_list[(uint8_t)event->event_id] || l->event_list[(uint8_t)SWITCH_EVENT_ALL])) { + uint8_t send = 0; + + if (!switch_test_flag(l, LFLAG_EVENTS)) { + continue; + } + + if (l->event_list[(uint8_t)SWITCH_EVENT_ALL]) { + send = 1; + } else if ((l->event_list[(uint8_t)event->event_id])) { if (event->event_id != SWITCH_EVENT_CUSTOM || (event->subclass && switch_core_hash_find(l->event_hash, event->subclass->name))) { - if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { - switch_queue_push(l->event_queue, clone); - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n"); - } + send = 1; } } + + if (send) { + if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { + switch_queue_push(l->event_queue, clone); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n"); + } + } + } switch_mutex_unlock(listen_list.mutex); } @@ -149,6 +162,13 @@ static void close_socket(switch_socket_t **sock) { SWITCH_MOD_DECLARE(switch_status_t) switch_module_shutdown(void) { + listener_t *l; + + switch_mutex_lock(listen_list.mutex); + for (l = listen_list.listeners; l; l = l->next) { + close_socket(&l->sock); + } + switch_mutex_unlock(listen_list.mutex); close_socket(&listen_list.sock); @@ -201,8 +221,165 @@ static void strip_cr(char *s) } } -static void parse_command(listener_t *listener, char *cmd, char *reply, uint32_t reply_len) +static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t timeout) { + switch_size_t mlen; + char mbuf[1024] = ""; + char buf[1024] = ""; + switch_size_t len; + switch_status_t status = SWITCH_STATUS_SUCCESS; + int count = 0, bytes = 0; + uint32_t elapsed = 0; + time_t start = 0; + void *pop; + char *ptr; + uint8_t crcount = 0; + + *event = NULL; + start = time(NULL); + ptr = mbuf; + + while(listener->sock) { + uint8_t do_sleep = 1; + mlen = 1; + status = switch_socket_recv(listener->sock, ptr, &mlen); + + if (status != SWITCH_STATUS_BREAK && status != SWITCH_STATUS_SUCCESS) { + return status; + } + + if (status != SWITCH_STATUS_BREAK && mlen) { + bytes++; + + if (*mbuf == '\r' || *mbuf == '\n') { /* bah */ + ptr = mbuf; + continue; + } + + if (*ptr == '\n') { + crcount++; + } else if (*ptr != '\r') { + crcount = 0; + } + ptr++; + if (crcount == 2) { + char *next; + char *cur = mbuf; + + while(cur) { + if ((next = strchr(cur, '\r')) || (next = strchr(cur, '\n'))) { + while (*next == '\r' || *next == '\n') { + next++; + } + } + count++; + if (count == 1) { + switch_event_create(event, SWITCH_EVENT_MESSAGE); + switch_event_add_header(*event, SWITCH_STACK_BOTTOM, "Command", mbuf); + } else { + char *var, *val; + var = mbuf; + if ((val = strchr(var, ':'))) { + *val++ = '\0'; + while(*val == ' ') { + val++; + } + } + if (var && val) { + switch_event_add_header(*event, SWITCH_STACK_BOTTOM, var, val); + } + } + + cur = next; + } + break; + } + } + + if (timeout) { + elapsed = (uint32_t)(time(NULL) - start); + if (elapsed >= timeout) { + switch_clear_flag_locked(listener, LFLAG_RUNNING); + return SWITCH_STATUS_FALSE; + } + } + + if (!*mbuf) { + if (switch_test_flag(listener, LFLAG_LOG)) { + if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) { + char *data = (char *) pop; + + + if (data) { + snprintf(buf, sizeof(buf), "Content-Type: log/data\nContent-Length: %"APR_SSIZE_T_FMT"\n\n", strlen(data)); + len = strlen(buf) + 1; + switch_socket_send(listener->sock, buf, &len); + len = strlen(data) + 1; + switch_socket_send(listener->sock, data, &len); + + free(data); + } + do_sleep = 0; + } + } + + if (switch_test_flag(listener, LFLAG_EVENTS)) { + if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + char hbuf[512]; + switch_event_t *event = (switch_event_t *) pop; + char *etype, *packet, *xmlstr = NULL; + + do_sleep = 0; + if (listener->format == EVENT_FORMAT_PLAIN) { + etype = "plain"; + switch_event_serialize(event, buf, sizeof(buf), NULL); + packet = buf; + } else { + switch_xml_t xml; + etype = "xml"; + + if ((xml = switch_event_xmlize(event, NULL))) { + xmlstr = switch_xml_toxml(xml); + packet = xmlstr; + switch_xml_free(xml); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "XML ERROR!\n"); + continue; + } + } + + len = strlen(packet) + 1; + + snprintf(hbuf, sizeof(hbuf), "Content-Length: %"APR_SSIZE_T_FMT"\n" + "Content-Type: text/event-%s\n" + "\n", len, etype); + + len = strlen(hbuf) + 1; + switch_socket_send(listener->sock, hbuf, &len); + + len = strlen(packet) + 1; + switch_socket_send(listener->sock, packet, &len); + + if (xmlstr) { + free(xmlstr); + } + } + } + } + if (do_sleep) { + switch_yield(1000); + } + } + + return status; + +} + +static switch_status_t parse_command(listener_t *listener, switch_event_t *event, char *reply, uint32_t reply_len) +{ + switch_status_t status = SWITCH_STATUS_SUCCESS; + char *cmd = switch_event_get_header(event, "command"); + *reply = '\0'; if (!strncasecmp(cmd, "exit", 4)) { @@ -262,7 +439,7 @@ static void parse_command(listener_t *listener, char *cmd, char *reply, uint32_t switch_socket_send(listener->sock, buf, &len); len = strlen(listener->retbuf) + 1; switch_socket_send(listener->sock, listener->retbuf, &len); - return; + return SWITCH_STATUS_SUCCESS; } } else if (!strncasecmp(cmd, "log", 3)) { @@ -356,10 +533,15 @@ static void parse_command(listener_t *listener, char *cmd, char *reply, uint32_t } done: + if (event) { + switch_event_destroy(&event); + } + if (switch_strlen_zero(reply)) { snprintf(reply, reply_len, "-ERR command not found"); } + return status; } static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) @@ -368,9 +550,7 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) char buf[1024]; switch_size_t len; switch_status_t status; - void *pop; - uint32_t elapsed; - time_t start = 0; + switch_event_t *event; char reply[512] = ""; assert(listener != NULL); @@ -386,127 +566,60 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) len = strlen(buf) + 1; switch_socket_send(listener->sock, buf, &len); - start = time(NULL); - while(!switch_test_flag(listener, LFLAG_AUTHED)) { - len = sizeof(buf); - memset(buf, 0, len); - status = switch_socket_recv(listener->sock, buf, &len); - if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_BREAK) { - break; - } - - if (len) { - parse_command(listener, buf, reply, sizeof(reply)); - if (!switch_strlen_zero(reply)) { - snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply); - len = strlen(buf) + 1; - switch_socket_send(listener->sock, buf, &len); - } + while (!switch_test_flag(listener, LFLAG_AUTHED)) { + + status = read_packet(listener, &event, 25); + if (status != SWITCH_STATUS_SUCCESS) { goto done; } - - if (status == SWITCH_STATUS_BREAK) { - elapsed = (uint32_t)(time(NULL) - start); - if (elapsed >= 15) { - switch_clear_flag_locked(listener, LFLAG_RUNNING); - break; - } - - //switch_yield(1000); + if (!event) { continue; } - + if (parse_command(listener, event, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) { + switch_clear_flag_locked(listener, LFLAG_RUNNING); + goto done; + } + if (!switch_strlen_zero(reply)) { + snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply); + len = strlen(buf) + 1; + switch_socket_send(listener->sock, buf, &len); + } + break; } - done: while(switch_test_flag(listener, LFLAG_RUNNING) && listen_list.ready) { - uint8_t do_sleep = 1; + switch_event_t *event; + len = sizeof(buf); memset(buf, 0, len); - status = switch_socket_recv(listener->sock, buf, &len); + status = read_packet(listener, &event, 0); - if (!len && status != SWITCH_STATUS_BREAK) { + if (status != SWITCH_STATUS_SUCCESS) { break; } - - if (len) { - parse_command(listener, buf, reply, sizeof(reply)); - if (!switch_strlen_zero(reply)) { - snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply); - len = strlen(buf) + 1; - switch_socket_send(listener->sock, buf, &len); - } + + if (!event) { continue; } - if (switch_test_flag(listener, LFLAG_LOG)) { - if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) { - char *data = (char *) pop; - if (data) { - snprintf(buf, sizeof(buf), "Content-Type: log/data\nContent-Length: %"APR_SSIZE_T_FMT"\n\n", strlen(data)); - len = strlen(buf) + 1; - switch_socket_send(listener->sock, buf, &len); - len = strlen(data) + 1; - switch_socket_send(listener->sock, data, &len); - - free(data); - } - do_sleep = 0; - } + if (parse_command(listener, event, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) { + switch_clear_flag_locked(listener, LFLAG_RUNNING); + break; } - if (switch_test_flag(listener, LFLAG_EVENTS)) { - if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { - char hbuf[512]; - switch_event_t *event = (switch_event_t *) pop; - char *etype, *packet, *xmlstr = NULL; - do_sleep = 0; - if (listener->format == EVENT_FORMAT_PLAIN) { - etype = "plain"; - switch_event_serialize(event, buf, sizeof(buf), NULL); - packet = buf; - } else { - switch_xml_t xml; - etype = "xml"; - - if ((xml = switch_event_xmlize(event, NULL))) { - xmlstr = switch_xml_toxml(xml); - packet = xmlstr; - switch_xml_free(xml); - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "XML ERROR!\n"); - continue; - } - } + if (!switch_strlen_zero(reply)) { + snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply); + len = strlen(buf) + 1; + switch_socket_send(listener->sock, buf, &len); + } - len = strlen(packet) + 1; - - snprintf(hbuf, sizeof(hbuf), "Content-Length: %"APR_SSIZE_T_FMT"\n" - "Content-Type: text/event-%s\n" - "\n", len, etype); - - len = strlen(hbuf) + 1; - switch_socket_send(listener->sock, hbuf, &len); - - len = strlen(packet) + 1; - switch_socket_send(listener->sock, packet, &len); - - switch_event_destroy(&event); - - if (xmlstr) { - free(xmlstr); - } - } - } - - if (do_sleep) { - switch_yield(1000); - } } + done: + remove_listener(listener); close_socket(&listener->sock); diff --git a/src/switch_event.c b/src/switch_event.c index f55165eff7..01ad05c681 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -112,6 +112,7 @@ static char *EVENT_NAMES[] = { "SESSION_CRASH", "MODULE_LOAD", "DTMF", + "MESSAGE", "ALL" };