From 97c25275a629f88700d28513a98f234d821ac9d7 Mon Sep 17 00:00:00 2001 From: Sergey Safarov Date: Tue, 23 Jun 2015 10:41:40 +0300 Subject: [PATCH] FS-7628: mod_erlang_event - added ipv6 support --- src/include/switch_apr.h | 1 + .../mod_erlang_event/ei_helpers.c | 59 +---- .../mod_erlang_event/handle_msg.c | 8 +- .../mod_erlang_event/mod_erlang_event.c | 215 +++++++++--------- .../mod_erlang_event/mod_erlang_event.h | 17 +- src/switch_apr.c | 5 + 6 files changed, 132 insertions(+), 173 deletions(-) diff --git a/src/include/switch_apr.h b/src/include/switch_apr.h index 55b6b85f5c..0bddcd4978 100644 --- a/src/include/switch_apr.h +++ b/src/include/switch_apr.h @@ -1103,6 +1103,7 @@ SWITCH_DECLARE(switch_status_t) switch_socket_connect(switch_socket_t *sock, swi SWITCH_DECLARE(uint16_t) switch_sockaddr_get_port(switch_sockaddr_t *sa); SWITCH_DECLARE(const char *) switch_get_addr(char *buf, switch_size_t len, switch_sockaddr_t *in); +SWITCH_DECLARE(switch_status_t) switch_getnameinfo(char **hostname, switch_sockaddr_t *sa, int32_t flags); SWITCH_DECLARE(int32_t) switch_sockaddr_get_family(switch_sockaddr_t *sa); SWITCH_DECLARE(switch_status_t) switch_sockaddr_ip_get(char **addr, switch_sockaddr_t *sa); SWITCH_DECLARE(int) switch_sockaddr_equal(const switch_sockaddr_t *sa1, const switch_sockaddr_t *sa2); diff --git a/src/mod/event_handlers/mod_erlang_event/ei_helpers.c b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c index ecadb76549..106cda6c93 100644 --- a/src/mod/event_handlers/mod_erlang_event/ei_helpers.c +++ b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c @@ -64,6 +64,8 @@ void ei_link(listener_t *listener, erlang_pid * from, erlang_pid * to) char msgbuf[2048]; char *s; int index = 0; + switch_socket_t *sock = NULL; + switch_os_sock_put(&sock, &listener->sockdes, listener->pool); index = 5; /* max sizes: */ ei_encode_version(msgbuf, &index); /* 1 */ @@ -79,13 +81,9 @@ void ei_link(listener_t *listener, erlang_pid * from, erlang_pid * to) /* sum: 542 */ switch_mutex_lock(listener->sock_mutex); -#ifdef WIN32 - send(listener->sockfd, msgbuf, index, 0); -#else - if (write(listener->sockfd, msgbuf, index) == -1) { + if (switch_socket_send(sock, msgbuf, (switch_size_t *) &index)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to link to process on %s\n", listener->peer_nodename); } -#endif switch_mutex_unlock(listener->sock_mutex); } @@ -329,30 +327,17 @@ int ei_decode_string_or_binary(char *buf, int *index, int maxlen, char *dst) switch_status_t initialise_ei(struct ei_cnode_s *ec) { - int rv; - struct sockaddr_in server_addr; - struct hostent *nodehost; - char thishostname[EI_MAXHOSTNAMELEN + 1] = ""; + char *thishostname = NULL; char thisnodename[MAXNODELEN + 1]; char thisalivename[MAXNODELEN + 1]; char *atsign; - /* zero out the struct before we use it */ - memset(&server_addr, 0, sizeof(server_addr)); - - /* convert the configured IP to network byte order, handing errors */ - rv = switch_inet_pton(AF_INET, prefs.ip, &server_addr.sin_addr.s_addr); - if (rv == 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not parse invalid ip address: %s\n", prefs.ip); - return SWITCH_STATUS_FALSE; - } else if (rv == -1) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error when parsing ip address %s : %s\n", prefs.ip, strerror(errno)); - return SWITCH_STATUS_FALSE; + if (zstr(listen_list.hostname) || !strncasecmp(prefs.ip, "0.0.0.0", 7) || !strncasecmp(prefs.ip, "::", 2)) { + listen_list.hostname=(char *) switch_core_get_hostname(); + } + if (strlen(listen_list.hostname) > EI_MAXHOSTNAMELEN) { + *(listen_list.hostname+EI_MAXHOSTNAMELEN) = '\0'; } - - /* set the address family and port */ - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(prefs.port); /* copy the prefs.nodename into something we can modify */ strncpy(thisalivename, prefs.nodename, MAXNODELEN); @@ -363,39 +348,19 @@ switch_status_t initialise_ei(struct ei_cnode_s *ec) /* truncate the alivename at the @ */ *atsign = '\0'; } else { -#ifdef WIN32 - if ((nodehost = gethostbyaddr((const char *) &server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET))) -#else - if ((nodehost = gethostbyaddr((const char *) &server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET))) -#endif - memcpy(thishostname, nodehost->h_name, EI_MAXHOSTNAMELEN); - - if (zstr_buf(thishostname) || !strncasecmp(prefs.ip, "0.0.0.0", 7)) { - gethostname(thishostname, EI_MAXHOSTNAMELEN); - } - if (prefs.shortname) { char *off; - if ((off = strchr(thishostname, '.'))) { + if ((off = strchr(listen_list.hostname, '.'))) { *off = '\0'; } - } else { - if (!(_res.options & RES_INIT)) { - // init the resolver - res_init(); - } - if (_res.dnsrch[0] && !zstr_buf(_res.dnsrch[0])) { - strncat(thishostname, ".", 1); - strncat(thishostname, _res.dnsrch[0], EI_MAXHOSTNAMELEN - strlen(thishostname)); - } } - snprintf(thisnodename, MAXNODELEN + 1, "%s@%s", prefs.nodename, thishostname); + snprintf(thisnodename, MAXNODELEN + 1, "%s@%s", prefs.nodename, listen_list.hostname); } /* init the ei stuff */ - if (ei_connect_xinit(ec, thishostname, thisalivename, thisnodename, (Erl_IpAddr) (&server_addr.sin_addr.s_addr), prefs.cookie, 0) < 0) { + if (ei_connect_xinit(ec, listen_list.hostname, thisalivename, thisnodename, (Erl_IpAddr) listen_list.addr, prefs.cookie, 0) < 0) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n"); return SWITCH_STATUS_FALSE; } diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 8461579a60..f161f94e3c 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -111,7 +111,7 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) _ei_x_encode_string(&ebuf, reply); switch_mutex_lock(acs->listener->sock_mutex); - ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index); + ei_send(acs->listener->sockdes, &acs->pid, ebuf.buff, ebuf.index); switch_mutex_unlock(acs->listener->sock_mutex); #ifdef EI_DEBUG ei_x_print_msg(&ebuf, &acs->pid, 1); @@ -139,7 +139,7 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) switch_mutex_lock(acs->listener->sock_mutex); - ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index); + ei_send(acs->listener->sockdes, &acs->pid, rbuf.buff, rbuf.index); switch_mutex_unlock(acs->listener->sock_mutex); #ifdef EI_DEBUG ei_x_print_msg(&rbuf, &acs->pid, 1); @@ -1332,7 +1332,7 @@ static switch_status_t handle_net_kernel_msg(listener_t *listener, erlang_msg * ei_x_encode_atom(rbuf, "yes"); switch_mutex_lock(listener->sock_mutex); - ei_send(listener->sockfd, &pid, rbuf->buff, rbuf->index); + ei_send(listener->sockdes, &pid, rbuf->buff, rbuf->index); switch_mutex_unlock(listener->sock_mutex); #ifdef EI_DEBUG ei_x_print_msg(rbuf, &pid, 1); @@ -1398,7 +1398,7 @@ int handle_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buf return 0; } else if (rbuf->index > 1) { switch_mutex_lock(listener->sock_mutex); - ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index); + ei_send(listener->sockdes, &msg->from, rbuf->buff, rbuf->index); switch_mutex_unlock(listener->sock_mutex); #ifdef EI_DEBUG ei_x_print_msg(rbuf, &msg->from, 1); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 83d0ae7036..ce04dbcede 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -308,25 +308,17 @@ static void event_handler(switch_event_t *event) } -#ifdef WIN32 -static void close_socket(SOCKET * sock) -#else -static void close_socket(int *sock) -#endif +static void close_socket(switch_socket_t ** sock) { + switch_mutex_lock(listen_list.sock_mutex); if (*sock) { -#ifdef WIN32 - shutdown(*sock, SD_BOTH); - closesocket(*sock); -#else - shutdown(*sock, SHUT_RDWR); - close(*sock); -#endif - sock = NULL; + switch_socket_shutdown(*sock, SWITCH_SHUTDOWN_READWRITE); + switch_socket_close(*sock); + *sock = NULL; } + switch_mutex_unlock(listen_list.sock_mutex); } - static void add_listener(listener_t *listener) { /* add me to the listeners so I get events */ @@ -564,8 +556,8 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c on our condition before the action starts. */ switch_mutex_lock(ptr->listener->sock_mutex); - if (ptr->listener->sockfd) { - ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf); + if (ptr->listener->sockdes) { + ei_sendto(ptr->listener->ec, ptr->listener->sockdes, &ptr->process, &buf); } switch_mutex_unlock(ptr->listener->sock_mutex); } @@ -674,7 +666,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t * session_element->uuid_str); switch_mutex_lock(listener->sock_mutex); - result = ei_sendto(listener->ec, listener->sockfd, &session_element->process, &lbuf); + result = ei_sendto(listener->ec, listener->sockdes, &session_element->process, &lbuf); switch_mutex_unlock(listener->sock_mutex); if (result) { @@ -746,7 +738,7 @@ static switch_status_t check_attached_sessions(listener_t *listener, int *msgs_s ei_encode_switch_event(&ebuf, pevent); switch_mutex_lock(listener->sock_mutex); - ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); + ei_sendto(listener->ec, listener->sockdes, &sp->process, &ebuf); switch_mutex_unlock(listener->sock_mutex); (*msgs_sent)++; ei_x_free(&ebuf); @@ -762,7 +754,7 @@ static switch_status_t check_attached_sessions(listener_t *listener, int *msgs_s _ei_x_encode_string(&ebuf, sp->uuid_str); switch_mutex_lock(listener->sock_mutex); - ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); + ei_sendto(listener->ec, listener->sockdes, &sp->process, &ebuf); (*msgs_sent)++; switch_mutex_unlock(listener->sock_mutex); ei_x_free(&ebuf); @@ -787,7 +779,7 @@ static switch_status_t check_attached_sessions(listener_t *listener, int *msgs_s ei_encode_switch_event(&ebuf, pevent); switch_mutex_lock(listener->sock_mutex); - ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); + ei_sendto(listener->ec, listener->sockdes, &sp->process, &ebuf); switch_mutex_unlock(listener->sock_mutex); (*msgs_sent)++; ei_x_free(&ebuf); @@ -864,7 +856,7 @@ static int check_log_queue(listener_t *listener) ei_x_encode_empty_list(&lbuf); switch_mutex_lock(listener->sock_mutex); - ei_sendto(listener->ec, listener->sockfd, &listener->log_process, &lbuf); + ei_sendto(listener->ec, listener->sockdes, &listener->log_process, &lbuf); switch_mutex_unlock(listener->sock_mutex); msgs_sent ++; @@ -895,7 +887,7 @@ static int check_event_queue(listener_t *listener) ei_encode_switch_event(&ebuf, pevent); switch_mutex_lock(listener->sock_mutex); - ei_sendto(listener->ec, listener->sockfd, &listener->event_process, &ebuf); + ei_sendto(listener->ec, listener->sockdes, &listener->event_process, &ebuf); switch_mutex_unlock(listener->sock_mutex); msgs_sent++; @@ -985,7 +977,7 @@ static void listener_main_loop(listener_t *listener) /* do we need the mutex when reading? */ /*switch_mutex_lock(listener->sock_mutex); */ - status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 1); + status = ei_xreceive_msg_tmo(listener->sockdes, &msg, &buf, 1); /*switch_mutex_unlock(listener->sock_mutex); */ switch (status) { @@ -1089,7 +1081,7 @@ static switch_bool_t check_inbound_acl(listener_t *listener) ei_x_buff buf; ei_x_new(&buf); - status = ei_xreceive_msg(listener->sockfd, &msg, &buf); + status = ei_xreceive_msg(listener->sockdes, &msg, &buf); /* get data off the socket, just so we can get the pid on the other end */ if (status == ERL_MSG) { /* if we got a message, return an ACL error. */ @@ -1101,7 +1093,7 @@ static switch_bool_t check_inbound_acl(listener_t *listener) ei_x_encode_atom(&rbuf, "acldeny"); switch_mutex_lock(listener->sock_mutex); - ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index); + ei_send(listener->sockdes, &msg.from, rbuf.buff, rbuf.index); switch_mutex_unlock(listener->sock_mutex); #ifdef EI_DEBUG ei_x_print_msg(&rbuf, &msg.from, 1); @@ -1274,6 +1266,8 @@ static int config(void) prefs.max_event_bulk = atoi(val); } else if (!strcasecmp(var, "max-log-bulk") && !zstr(val)) { prefs.max_log_bulk = atoi(val); + } else if (!strcasecmp(var, "stop-on-bind-error")) { + prefs.stop_on_bind_error = switch_true(val) ? 1 : 0; } } } @@ -1333,7 +1327,7 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd) switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool); switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool); - listener->sockfd = clientfd; + listener->sockdes = clientfd; listener->pool = pool; listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode)); memcpy(listener->ec, ec, sizeof(ei_cnode)); @@ -1385,14 +1379,15 @@ void destroy_listener(listener_t * listener) const void *key; void *value; switch_hash_index_t *iter; + switch_socket_t *sock=NULL; + switch_os_sock_put(&sock, &listener->sockdes, listener->pool); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n"); switch_thread_rwlock_wrlock(listener->rwlock); switch_mutex_lock(listener->sock_mutex); - if (listener->sockfd) { - close_socket(&listener->sockfd); - } + close_socket(&sock); + listener->sockdes = -1; switch_mutex_unlock(listener->sock_mutex); switch_core_hash_destroy(&listener->event_hash); @@ -1514,7 +1509,8 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul char hash[100]; spawn_reply_t *p; erlang_ref ref; - + switch_os_socket_t sockdes; + switch_os_sock_get(&sockdes, listen_list.sock); ei_init_ref(listener->ec, &ref); ei_hash_ref(&ref, hash); @@ -1550,7 +1546,7 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul ei_x_encode_ref(&rbuf, &ref); ei_x_encode_pid(&rbuf, ei_self(listener->ec)); /* should lock with mutex? */ - ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index); + ei_reg_send(listener->ec, sockdes, module, rbuf.buff, rbuf.index); #ifdef EI_DEBUG ei_x_print_reg_msg(&rbuf, module, 1); #endif @@ -1559,7 +1555,7 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function); /* should lock with mutex? */ switch_mutex_lock(listener->sock_mutex); - ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function); + ei_pid_from_rpc(listener->ec, sockdes, &ref, module, function); switch_mutex_unlock(listener->sock_mutex); /* char *argv[1]; @@ -1750,7 +1746,7 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function) } if (listener) { - ei_reg_send(listener->ec, listener->sockfd, reg_name, buf.buff, buf.index); + ei_reg_send(listener->ec, listener->sockdes, reg_name, buf.buff, buf.index); switch_thread_rwlock_unlock(listener->rwlock); } @@ -1929,7 +1925,6 @@ SWITCH_STANDARD_API(erlang_cmd) return status; } - SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) { switch_application_interface_t *app_interface; @@ -1943,9 +1938,11 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) switch_thread_rwlock_create(&globals.bindings_rwlock, pool); switch_mutex_init(&globals.fetch_reply_mutex, SWITCH_MUTEX_DEFAULT, pool); switch_mutex_init(&globals.listener_count_mutex, SWITCH_MUTEX_UNNESTED, pool); + switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); switch_core_hash_init(&globals.fetch_reply_hash); /* intialize the unique reference stuff */ + switch_mutex_init(&listen_list.sock_mutex, SWITCH_MUTEX_NESTED, pool); switch_mutex_init(&globals.ref_mutex, SWITCH_MUTEX_NESTED, pool); globals.reference0 = 0; globals.reference1 = 0; @@ -1953,7 +1950,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind to all events!\n"); - close_socket(&listen_list.sockfd); + close_socket(&listen_list.sock); return SWITCH_STATUS_GENERR; } @@ -1963,7 +1960,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) if (switch_xml_bind_search_function_ret(erlang_fetch, SWITCH_XML_SECTION_MAX, NULL, &bindings.search_binding) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't set up xml search bindings!\n"); - close_socket(&listen_list.sockfd); + close_socket(&listen_list.sock); return SWITCH_STATUS_GENERR; } @@ -1983,92 +1980,80 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) return SWITCH_STATUS_SUCCESS; } - SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) { switch_memory_pool_t *pool = NULL, *listener_pool = NULL; - int rv; + switch_status_t rv; + switch_sockaddr_t *sa; + switch_os_socket_t sockdes; listener_t *listener; uint32_t x = 0; struct ei_cnode_s ec; ErlConnect conn; - struct sockaddr_in server_addr; - int on = 1; int clientfd; - int epmdfd; -#ifdef WIN32 - /* borrowed from MSDN, stupid winsock */ - WORD wVersionRequested; - WSADATA wsaData; - - wVersionRequested = MAKEWORD(2, 2); - - if (WSAStartup(wVersionRequested, &wsaData) != 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Winsock initialization failed, oh well\n"); - return SWITCH_STATUS_TERM; - } - - if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Your winsock version doesn't support the 2.2 specification, bailing\n"); - return SWITCH_STATUS_TERM; - } -#endif - - memset(&listen_list, 0, sizeof(listen_list)); - config(); + switch_os_socket_t epmdfd; + switch_socket_t *epmd_sock = NULL; if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); return SWITCH_STATUS_TERM; } - /* zero out the struct before we use it */ - memset(&server_addr, 0, sizeof(server_addr)); - - /* convert the configured IP to network byte order, handing errors */ - rv = switch_inet_pton(AF_INET, prefs.ip, &server_addr.sin_addr.s_addr); - if (rv == 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not parse invalid ip address: %s\n", prefs.ip); - goto init_failed; - } else if (rv == -1) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error when parsing ip address %s : %s\n", prefs.ip, strerror(errno)); - goto init_failed; - } - - /* set the address family and port */ - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(prefs.port); + config(); /* do the socket setup ei is too lazy to do for us */ - for (;;) { + while (!prefs.done) { + switch_sockaddr_t *sa_local; - if ((listen_list.sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate socket on %s:%u\n", prefs.ip, prefs.port); + rv = switch_sockaddr_info_get(&sa, !strcmp(prefs.ip, "*") ? NULL : prefs.ip, SWITCH_UNSPEC, prefs.port, 0, pool); + if (rv) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not parse invalid ip address: %s\n", prefs.ip); + goto fail; + } + + rv = switch_socket_create(&listen_list.sock, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, pool); + if (rv) { + goto sock_fail; + } + switch_os_sock_get(&sockdes, listen_list.sock); + rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_REUSEADDR, 1); + if (rv) { goto sock_fail; } #ifdef WIN32 - if (setsockopt(listen_list.sockfd, SOL_SOCKET, SO_REUSEADDR, (const char *) &on, sizeof(on))) { -#else - if (setsockopt(listen_list.sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { + /* Enable dual-stack listening on Windows (if the listening address is IPv6), it's default on Linux */ + if (switch_sockaddr_get_family(sa) == AF_INET6) { + rv = switch_socket_opt_set(listen_list_apr.sock, IPV6_V6ONLY, 0); + if (rv) { + goto sock_fail; + } + } #endif - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to enable SO_REUSEADDR for socket on %s:%u : %s\n", prefs.ip, prefs.port, - strerror(errno)); + rv = switch_socket_bind(listen_list.sock, sa); + if (rv) { + goto sock_fail; + } + rv = switch_socket_listen(listen_list.sock, 5); + if (rv) { goto sock_fail; } - if (bind(listen_list.sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to bind to %s:%u\n", prefs.ip, prefs.port); - goto sock_fail; + switch_socket_addr_get(&sa_local, SWITCH_FALSE, listen_list.sock); + switch_get_addr(listen_list.addr, sizeof(listen_list.addr), sa_local); + switch_getnameinfo(&listen_list.hostname, sa_local, 0); + if (switch_sockaddr_get_family(sa) == AF_INET6) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket up listening on [%s]:%u\n", listen_list.addr, prefs.port); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket up listening on %s:%u\n", listen_list.addr, prefs.port); } - if (listen(listen_list.sockfd, 5) < 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to listen on %s:%u\n", prefs.ip, prefs.port); - goto sock_fail; - } - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket %d up listening on %s:%u\n", listen_list.sockfd, prefs.ip, prefs.port); break; sock_fail: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! Could not listen on %s:%u\n", prefs.ip, prefs.port); + if (prefs.stop_on_bind_error) { + prefs.done = 1; + goto fail; + } switch_yield(100000); } @@ -2079,7 +2064,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) if (SWITCH_STATUS_SUCCESS != initialise_ei(&ec)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n"); - goto init_failed; + goto end; } /* return value is -1 for error, a descriptor pointing to epmd otherwise */ @@ -2088,12 +2073,12 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) if (system("epmd -daemon")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to start epmd manually! Is epmd in $PATH? If not, start it yourself or run an erl shell with -sname or -name\n"); - goto init_failed; + goto end; } switch_yield(100000); if ((epmdfd = ei_publish(&ec, prefs.port)) == -1) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to publish port to epmd AGAIN\n"); - goto init_failed; + goto end; } } @@ -2101,7 +2086,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) listen_list.ready = 1; - for (;;) { + + while (!prefs.done) { /* zero out errno because ei_accept doesn't differentiate between a * failed authentication or a socket failure, or a client version * mismatch or a godzilla attack */ @@ -2110,7 +2096,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) #else errno = 0; #endif - if ((clientfd = ei_accept_tmo(&ec, (int) listen_list.sockfd, &conn, 500)) == ERL_ERROR) { + if ((clientfd = ei_accept_tmo(&ec, (int) sockdes, &conn, 500)) == ERL_ERROR) { if (prefs.done) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting Down\n"); break; @@ -2140,47 +2126,50 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching listener, connection from node %s, ip %s\n", conn.nodename, listener->remote_ip); launch_listener_thread(listener); - } else + } else { /* if we fail to create a listener (memory error), then the module will exit */ break; + } } - + end: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Exiting module mod_erlang_event\n"); /* cleanup epmd registration */ - ei_unpublish(&ec); - close_socket(&epmdfd); + switch_os_sock_put(&epmd_sock, &epmdfd, pool); + close_socket(&epmd_sock); + epmdfd = -1; + + close_socket(&listen_list.sock); - init_failed: - close_socket(&listen_list.sockfd); if (pool) { switch_core_destroy_memory_pool(&pool); } + if (listener_pool) { switch_core_destroy_memory_pool(&listener_pool); } + for (x = 0; x < prefs.acl_count; x++) { switch_safe_free(prefs.acl[x]); } - prefs.done = 2; + fail: return SWITCH_STATUS_TERM; } - SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) { listener_t *l; int sanity = 0; + switch_socket_t *sock = NULL; - if (prefs.done == 0) /* main thread might already have exited */ - prefs.done = 1; + prefs.done = 1; switch_log_unbind_logger(socket_logger); - /*close_socket(&listen_list.sockfd); */ + close_socket(&listen_list.sock); - while (prefs.threads || prefs.done == 1) { + while (prefs.threads) { switch_yield(10000); if (++sanity == 1000) { break; @@ -2193,7 +2182,9 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) switch_thread_rwlock_wrlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { - close_socket(&l->sockfd); + switch_os_sock_put(&sock, &l->sockdes, l->pool); + close_socket(&sock); + l->sockdes = -1; } #ifdef WIN32 diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 967318b1d6..2761c1848d 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -112,11 +112,7 @@ typedef enum { 5 call sessions will be "attached" to the same listener. */ struct listener { -#ifdef WIN32 - SOCKET sockfd; -#else - int sockfd; -#endif + switch_os_socket_t sockdes; struct ei_cnode_s *ec; struct erlang_process log_process; struct erlang_process event_process; @@ -170,6 +166,7 @@ struct api_command_struct { }; struct globals_struct { + switch_mutex_t *listener_mutex; switch_thread_rwlock_t *listener_rwlock; switch_thread_rwlock_t *bindings_rwlock; switch_event_node_t *node; @@ -187,11 +184,10 @@ struct globals_struct { typedef struct globals_struct globals_t; struct listen_list_struct { -#ifdef WIN32 - SOCKET sockfd; -#else - int sockfd; -#endif + char *hostname; + char addr[64]; + switch_socket_t *sock; + switch_mutex_t *sock_mutex; listener_t *listeners; uint8_t ready; }; @@ -220,6 +216,7 @@ struct prefs_struct { int compat_rel; int max_event_bulk; int max_log_bulk; + int stop_on_bind_error; }; typedef struct prefs_struct prefs_t; diff --git a/src/switch_apr.c b/src/switch_apr.c index 9abc3b7035..35c00ee22e 100644 --- a/src/switch_apr.c +++ b/src/switch_apr.c @@ -913,6 +913,11 @@ SWITCH_DECLARE(int32_t) switch_sockaddr_get_family(switch_sockaddr_t *sa) return sa->family; } +SWITCH_DECLARE(switch_status_t) switch_getnameinfo(char **hostname, switch_sockaddr_t *sa, int32_t flags) +{ + return apr_getnameinfo(hostname, sa, flags); +} + SWITCH_DECLARE(switch_status_t) switch_socket_atmark(switch_socket_t *sock, int *atmark) { return apr_socket_atmark(sock, atmark);