From 93fdf2d9d72bdbde16911cb638a59ab3c8fc0983 Mon Sep 17 00:00:00 2001 From: lazedo Date: Mon, 20 Jan 2020 16:12:05 +0000 Subject: [PATCH] [mod_kazoo] Fix potential memory leaks * fixes several potential memory leaks * formatting * fixes parameter name * logs unsupported parameters * fixes default event profile * fixes unconfigured bindings --- .../event_handlers/mod_kazoo/kazoo_commands.c | 1 + src/mod/event_handlers/mod_kazoo/kazoo_ei.h | 4 +- .../mod_kazoo/kazoo_ei_config.c | 23 ++- .../event_handlers/mod_kazoo/kazoo_ei_utils.c | 60 +++++++- .../mod_kazoo/kazoo_fetch_agent.c | 8 +- src/mod/event_handlers/mod_kazoo/kazoo_node.c | 133 ++++++++++-------- src/mod/event_handlers/mod_kazoo/mod_kazoo.c | 7 +- 7 files changed, 159 insertions(+), 77 deletions(-) diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_commands.c b/src/mod/event_handlers/mod_kazoo/kazoo_commands.c index 2e9a8f2459..3286c34811 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_commands.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_commands.c @@ -419,6 +419,7 @@ SWITCH_STANDARD_API(kz_http_put) if (fstat(fd, &file_info) == -1) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "fstat() error: %s\n", strerror(errno)); stream->write_function(stream, "-ERR fstat error\n"); + close(fd); goto done; } close(fd); diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei.h b/src/mod/event_handlers/mod_kazoo/kazoo_ei.h index 1fe826b846..6add449b2b 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei.h +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei.h @@ -158,7 +158,7 @@ struct kz_globals_s { switch_hash_t *event_filter; int epmdfd; - int num_worker_threads; + int node_worker_threads; switch_bool_t nat_map; switch_bool_t ei_shortname; int ei_compat_rel; @@ -233,6 +233,8 @@ int ei_decode_string_or_binary_limited(char *buf, int *index, int maxsize, char int ei_decode_string_or_binary(char *buf, int *index, char **dst); switch_status_t create_acceptor(); switch_hash_t *create_default_filter(); +void kz_erl_init(); +void kz_erl_shutdown(); void fetch_config(); diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c b/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c index 83828a25a2..e6bb4fa5ef 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c @@ -205,9 +205,9 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { } else if (!strcmp(var, "io-fault-tolerance")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val); kazoo_globals.io_fault_tolerance = atoi(val); - } else if (!strcmp(var, "num-worker-threads")) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set num-worker-threads: %s\n", val); - kazoo_globals.num_worker_threads = atoi(val); + } else if (!strcmp(var, "node-worker-threads")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set node-worker-threads: %s\n", val); + kazoo_globals.node_worker_threads = atoi(val); } else if (!strcmp(var, "json-term-encoding")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set json-term-encoding: %s\n", val); if(!strcmp(val, "map")) { @@ -219,6 +219,8 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { } else if (!strcmp(var, "expand-headers-on-fetch")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set expand-headers-on-fetch: %s\n", val); kazoo_globals.expand_headers_on_fetch = switch_true(val); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "unknown config option %s : %s\n", var, val); } } } @@ -319,9 +321,9 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { kazoo_globals.profile_vars_prefixes[i] = switch_core_strdup(kazoo_globals.pool, sep_array[i]); } - if (!kazoo_globals.num_worker_threads) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Number of worker threads not found in configuration, using default\n"); - kazoo_globals.num_worker_threads = 10; + if (!kazoo_globals.node_worker_threads) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Number of node worker threads not found in configuration, using default\n"); + kazoo_globals.node_worker_threads = 10; } if (zstr(kazoo_globals.ip)) { @@ -422,12 +424,9 @@ switch_status_t kazoo_config_handlers(switch_xml_t cfg) if(events == NULL) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get default handler for events\n"); - if(kazoo_globals.event_handlers != event_handlers) destroy_config(&event_handlers); - if(kazoo_globals.fetch_handlers != fetch_handlers) destroy_config(&fetch_handlers); - if(kazoo_globals.definitions != definitions) destroy_config(&definitions); - switch_xml_free(def); - switch_safe_free(xml); - return SWITCH_STATUS_GENERR; + destroy_config(&event_handlers); + event_handlers = kazoo_config_event_handlers(definitions, def); + events = (kazoo_event_profile_ptr) switch_core_hash_find(event_handlers->hash, "default"); } if(kazoo_globals.events != events) { diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c b/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c index 7f751db4f9..96e2c7de66 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c @@ -514,10 +514,6 @@ switch_status_t create_acceptor() { char ipbuf[48]; const char *ip_addr; -#if (ERLANG_MAJOR == 10 && ERLANG_MINOR >= 3) || ERLANG_MAJOR >= 11 - ei_init(); -#endif - /* if the config has specified an erlang release compatibility then pass that along to the erlang interface */ if (kazoo_globals.ei_compat_rel) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Compatability with OTP R%d requested\n", kazoo_globals.ei_compat_rel); @@ -882,6 +878,7 @@ static void fetch_config_filters(switch_memory_pool_t *pool) kazoo_globals.config_fetched = 1; switch_xml_free(xml); } + switch_event_destroy(¶ms); } @@ -901,6 +898,7 @@ static void fetch_config_handlers(switch_memory_pool_t *pool) kazoo_globals.config_fetched = 1; switch_xml_free(xml); } + switch_event_destroy(¶ms); } @@ -935,6 +933,60 @@ void fetch_config() { } +#ifdef WITH_KAZOO_ERL_SHUTDOWN +#if (ERLANG_MAJOR == 10 && ERLANG_MINOR >= 3) || ERLANG_MAJOR >= 11 + typedef struct ei_mutex_s { + #ifdef __WIN32__ + HANDLE lock; + #elif VXWORKS + SEM_ID lock; + #else /* unix */ + #if defined(HAVE_MIT_PTHREAD_H) || defined(HAVE_PTHREAD_H) + pthread_mutex_t *lock; + #else /* ! (HAVE_MIT_PTHREAD_H || HAVE_PTHREAD_H) */ + void *dummy; /* Actually never used */ + #endif /* ! (HAVE_MIT_PTHREAD_H || HAVE_PTHREAD_H) */ + #endif /* unix */ + } ei_mutex_t; + + typedef struct ei_socket_info_s { + int socket; + ei_socket_callbacks *cbs; + void *ctx; + int dist_version; + ei_cnode cnode; /* A copy, not a pointer. We don't know when freed */ + char cookie[EI_MAX_COOKIE_SIZE+1]; + } ei_socket_info; + + extern ei_socket_info *ei_sockets; + extern ei_mutex_t* ei_sockets_lock; + extern int ei_n_sockets; + extern int ei_sz_sockets; + + int ei_mutex_free(ei_mutex_t *l, int nblock); + +#endif +#endif + +void kz_erl_init() +{ +#if (ERLANG_MAJOR == 10 && ERLANG_MINOR >= 3) || ERLANG_MAJOR >= 11 + ei_init(); +#endif +} + +void kz_erl_shutdown() +{ +#ifdef WITH_KAZOO_ERL_SHUTDOWN +#if (ERLANG_MAJOR == 10 && ERLANG_MINOR >= 3) || ERLANG_MAJOR >= 11 + ei_mutex_free(ei_sockets_lock, 1); + ei_sockets_lock = NULL; + free(ei_sockets); + ei_sockets = NULL; + ei_n_sockets = ei_sz_sockets = 0; +#endif +#endif +} SWITCH_MODULE_RUNTIME_FUNCTION(mod_kazoo_runtime) { switch_os_socket_t os_socket; diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_fetch_agent.c b/src/mod/event_handlers/mod_kazoo/kazoo_fetch_agent.c index 555fe942bf..f48660d6eb 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_fetch_agent.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_fetch_agent.c @@ -309,8 +309,9 @@ static switch_xml_t fetch_handler(const char *section, const char *tag_name, con ,reply.uuid_str ,(unsigned int) (switch_micro_time_now() - now) / 1000 ,reply.xml_str); - - xml = switch_xml_parse_str_dynamic(reply.xml_str, SWITCH_FALSE); + if ((xml = switch_xml_parse_str_dynamic(reply.xml_str, SWITCH_FALSE)) == NULL) { + switch_safe_free(reply.xml_str); + } } else { /* facepalm */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Request for %s XML (%s) timed-out after %dms\n" @@ -327,6 +328,8 @@ void bind_fetch_profile(ei_xml_agent_t *agent, kazoo_config_ptr fetch_handlers) switch_hash_index_t *hi; kazoo_fetch_profile_ptr val = NULL, ptr = NULL; + if (!fetch_handlers) return; + for (hi = switch_core_hash_first(fetch_handlers->hash); hi; hi = switch_core_hash_next(&hi)) { switch_core_hash_this(hi, NULL, NULL, (void**) &val); if (val && val->section == agent->section) { @@ -335,6 +338,7 @@ void bind_fetch_profile(ei_xml_agent_t *agent, kazoo_config_ptr fetch_handlers) } } agent->profile = ptr; + switch_safe_free(hi); } void rebind_fetch_profiles(kazoo_config_ptr fetch_handlers) diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_node.c b/src/mod/event_handlers/mod_kazoo/kazoo_node.c index a92fe49aa2..273fcbaa7f 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_node.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_node.c @@ -106,6 +106,7 @@ static switch_status_t find_request(char *atom, int *request) { static void destroy_node_handler(ei_node_t *ei_node) { int pending = 0; void *pop; + switch_memory_pool_t *pool = ei_node->pool; switch_clear_flag(ei_node, LFLAG_RUNNING); @@ -145,7 +146,7 @@ static void destroy_node_handler(ei_node_t *ei_node) { switch_mutex_destroy(ei_node->event_streams_mutex); - switch_core_destroy_memory_pool(&ei_node->pool); + switch_core_destroy_memory_pool(&pool); } static switch_status_t add_to_ei_nodes(ei_node_t *this_ei_node) { @@ -319,15 +320,15 @@ static void *SWITCH_THREAD_FUNC bgapi3_exec(switch_thread_t *thread, void *obj) ei_node_t *ei_node = acs->ei_node; ei_send_msg_t *send_msg; - switch_malloc(send_msg, sizeof(*send_msg)); - memcpy(&send_msg->pid, &acs->pid, sizeof(erlang_pid)); - if(!switch_test_flag(ei_node, LFLAG_RUNNING) || !switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Ignoring command while shuting down\n"); switch_atomic_dec(&ei_node->pending_bgapi); return NULL; } + switch_malloc(send_msg, sizeof(*send_msg)); + memcpy(&send_msg->pid, &acs->pid, sizeof(erlang_pid)); + ei_x_new_with_version(&send_msg->buf); ei_x_encode_tuple_header(&send_msg->buf, 3); @@ -464,6 +465,7 @@ static void log_sendmsg_request(char *uuid, switch_event_t *event) } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "log|%s|building xferext extension: %s %s\n", uuid, app_name, app_arg); + switch_safe_free(app_name); } } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "log|%s|transfered call to xferext extension\n", uuid); @@ -692,20 +694,22 @@ static switch_status_t handle_request_command(ei_node_t *ei_node, erlang_pid *pi return erlang_response_badarg(rbuf); } + if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) { + return erlang_response_baduuid(rbuf); + } + switch_uuid_get(&cmd_uuid); switch_uuid_format(cmd_uuid_str, &cmd_uuid); switch_event_create(&event, SWITCH_EVENT_COMMAND); if (build_event(event, buf) != SWITCH_STATUS_SUCCESS) { + switch_core_session_rwunlock(session); return erlang_response_badarg(rbuf); } log_sendmsg_request(uuid_str, event); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event-uuid", cmd_uuid_str); - if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) { - return erlang_response_baduuid(rbuf); - } switch_core_session_queue_private_event(session, &event, SWITCH_FALSE); switch_core_session_rwunlock(session); @@ -767,16 +771,18 @@ static switch_status_t handle_request_sendmsg(ei_node_t *ei_node, erlang_pid *pi return erlang_response_badarg(rbuf); } + if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) { + return erlang_response_baduuid(rbuf); + } + switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE); if (build_event(event, buf) != SWITCH_STATUS_SUCCESS) { + switch_core_session_rwunlock(session); return erlang_response_badarg(rbuf); } log_sendmsg_request(uuid_str, event); - if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) { - return erlang_response_baduuid(rbuf); - } switch_core_session_queue_private_event(session, &event, SWITCH_FALSE); switch_core_session_rwunlock(session); @@ -903,25 +909,25 @@ static switch_status_t handle_request_bgapi4(ei_node_t *ei_node, erlang_pid *pid } static switch_status_t handle_request_api4(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { - char cmd[MAXATOMLEN + 1]; - char *arg; + char cmd[MAXATOMLEN + 1]; + char *arg; switch_stream_handle_t stream = { 0 }; SWITCH_STANDARD_STREAM(stream); switch_event_create(&stream.param_event, SWITCH_EVENT_API); - if (ei_decode_atom_safe(buf->buff, &buf->index, cmd)) { - return erlang_response_badarg(rbuf); - } + if (ei_decode_atom_safe(buf->buff, &buf->index, cmd)) { + return erlang_response_badarg(rbuf); + } - if (ei_decode_string_or_binary(buf->buff, &buf->index, &arg)) { - return erlang_response_badarg(rbuf); - } + if (ei_decode_string_or_binary(buf->buff, &buf->index, &arg)) { + return erlang_response_badarg(rbuf); + } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "exec: %s(%s)\n", cmd, arg); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "exec: %s(%s)\n", cmd, arg); - if (rbuf) { - char *reply; + if (rbuf) { + char *reply; switch_status_t status; status = api_exec_stream(cmd, arg, &stream, &reply); @@ -940,17 +946,17 @@ static switch_status_t handle_request_api4(ei_node_t *ei_node, erlang_pid *pid, ei_encode_switch_event_headers(rbuf, stream.param_event); } - switch_safe_free(reply); - } + switch_safe_free(reply); + } if (stream.param_event) { switch_event_fire(&stream.param_event); } - switch_safe_free(arg); + switch_safe_free(arg); switch_safe_free(stream.data); - return SWITCH_STATUS_SUCCESS; + return SWITCH_STATUS_SUCCESS; } static switch_status_t handle_request_json_api(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) @@ -976,11 +982,14 @@ static switch_status_t handle_request_json_api(ei_node_t *ei_node, erlang_pid *p ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "parse_error"); _ei_x_encode_string(rbuf, parse_end); + switch_safe_free(arg); return status; } if ((uuid = cJSON_GetObjectCstr(jcmd, "uuid"))) { if (!(session = switch_core_session_locate(uuid))) { + cJSON_Delete(jcmd); + switch_safe_free(arg); return erlang_response_baduuid(rbuf); } } @@ -1060,7 +1069,6 @@ static switch_status_t handle_request_event(ei_node_t *ei_node, erlang_pid *pid, } for (i = 1; i <= length; i++) { - if (ei_decode_atom_safe(buf->buff, &buf->index, event_name)) { switch_mutex_unlock(ei_node->event_streams_mutex); return erlang_response_badarg(rbuf); @@ -1237,13 +1245,10 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg * /* {'$gen_call', {_, _}, {_, _}} = Buf */ } else if (arity == 3 && !strncmp(atom, "$gen_call", 9)) { switch_status_t status; - ei_send_msg_t *send_msg; + ei_send_msg_t *send_msg = NULL; erlang_ref ref; switch_malloc(send_msg, sizeof(*send_msg)); - - ei_x_new(&send_msg->buf); - ei_x_new_with_version(&send_msg->buf); /* ...{_, _}, {_, _}} = Buf */ @@ -1252,6 +1257,8 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg * /* is_tuple(Type) */ if (type != ERL_SMALL_TUPLE_EXT) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call message of an unexpected type (ensure you are using Kazoo v2.14+).\n"); + ei_x_free(&send_msg->buf); + switch_safe_free(send_msg); return SWITCH_STATUS_GENERR; } @@ -1261,12 +1268,16 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg * /* ...pid(), _}, {_, _}} = Buf */ if (ei_decode_pid(buf->buff, &buf->index, &send_msg->pid)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call without a reply pid (ensure you are using Kazoo v2.14+).\n"); + ei_x_free(&send_msg->buf); + switch_safe_free(send_msg); return SWITCH_STATUS_GENERR; } /* ...ref()}, {_, _}} = Buf */ if (ei_decode_ref(buf->buff, &buf->index, &ref)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call without a reply tag (ensure you are using Kazoo v2.14+).\n"); + ei_x_free(&send_msg->buf); + switch_safe_free(send_msg); return SWITCH_STATUS_GENERR; } @@ -1277,6 +1288,7 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg * status = handle_kazoo_request(ei_node, &msg->from, buf, &send_msg->buf); if (switch_queue_trypush(ei_node->send_msgs, send_msg) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error queuing reply\n"); ei_x_free(&send_msg->buf); switch_safe_free(send_msg); } @@ -1292,15 +1304,14 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg * static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg *msg, ei_x_buff *buf) { int version, size, type, arity; char atom[MAXATOMLEN + 1]; - ei_send_msg_t *send_msg; + ei_send_msg_t *send_msg = NULL; erlang_ref ref; - switch_malloc(send_msg, sizeof(*send_msg)); - - ei_x_new(&send_msg->buf); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message, attempting to reply\n"); + switch_malloc(send_msg, sizeof(*send_msg)); + ei_x_new_with_version(&send_msg->buf); + buf->index = 0; ei_decode_version(buf->buff, &buf->index, &version); ei_get_type(buf->buff, &buf->index, &type, &size); @@ -1308,7 +1319,7 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg /* is_tuple(Buff) */ if (type != ERL_SMALL_TUPLE_EXT) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message of an unexpected type\n"); - return SWITCH_STATUS_GENERR; + goto error; } ei_decode_tuple_header(buf->buff, &buf->index, &arity); @@ -1316,13 +1327,13 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg /* {_, _, _} = Buf */ if (arity != 3) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel tuple has an unexpected arity\n"); - return SWITCH_STATUS_GENERR; + goto error; } /* {'$gen_call', _, _} = Buf */ if (ei_decode_atom_safe(buf->buff, &buf->index, atom) || strncmp(atom, "$gen_call", 9)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message tuple does not begin with the atom '$gen_call'\n"); - return SWITCH_STATUS_GENERR; + goto error; } ei_get_type(buf->buff, &buf->index, &type, &size); @@ -1330,7 +1341,7 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg /* {_, Sender, _}=Buff, is_tuple(Sender) */ if (type != ERL_SMALL_TUPLE_EXT) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Second element of the net_kernel tuple is an unexpected type\n"); - return SWITCH_STATUS_GENERR; + goto error; } ei_decode_tuple_header(buf->buff, &buf->index, &arity); @@ -1338,13 +1349,13 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg /* {_, _}=Sender */ if (arity != 2) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Second element of the net_kernel message has an unexpected arity\n"); - return SWITCH_STATUS_GENERR; + goto error; } /* {Pid, Ref}=Sender */ if (ei_decode_pid(buf->buff, &buf->index, &send_msg->pid) || ei_decode_ref(buf->buff, &buf->index, &ref)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to decode erlang pid or ref of the net_kernel tuple second element\n"); - return SWITCH_STATUS_GENERR; + goto error; } ei_get_type(buf->buff, &buf->index, &type, &size); @@ -1352,7 +1363,7 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg /* {_, _, Request}=Buff, is_tuple(Request) */ if (type != ERL_SMALL_TUPLE_EXT) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Third element of the net_kernel message is an unexpected type\n"); - return SWITCH_STATUS_GENERR; + goto error; } ei_decode_tuple_header(buf->buff, &buf->index, &arity); @@ -1360,27 +1371,31 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg /* {_, _}=Request */ if (arity != 2) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Third element of the net_kernel message has an unexpected arity\n"); - return SWITCH_STATUS_GENERR; + goto error; } /* {is_auth, _}=Request */ if (ei_decode_atom_safe(buf->buff, &buf->index, atom) || strncmp(atom, "is_auth", MAXATOMLEN)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "The net_kernel message third element does not begin with the atom 'is_auth'\n"); - return SWITCH_STATUS_GENERR; + goto error; } /* To ! {Tag, Reply} */ - ei_x_new_with_version(&send_msg->buf); ei_x_encode_tuple_header(&send_msg->buf, 2); ei_x_encode_ref(&send_msg->buf, &ref); ei_x_encode_atom(&send_msg->buf, "yes"); if (switch_queue_trypush(ei_node->send_msgs, send_msg) != SWITCH_STATUS_SUCCESS) { - ei_x_free(&send_msg->buf); - switch_safe_free(send_msg); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "unable to queue net kernel message\n"); + goto error; } return SWITCH_STATUS_SUCCESS; + +error: + ei_x_free(&send_msg->buf); + switch_safe_free(send_msg); + return SWITCH_STATUS_GENERR; } static switch_status_t handle_erl_send(ei_node_t *ei_node, erlang_msg *msg, ei_x_buff *buf) { @@ -1435,7 +1450,7 @@ static void *SWITCH_THREAD_FUNC receive_handler(switch_thread_t *thread, void *o switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting erlang receive handler %p: %s (%s:%d)\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port); while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) { - void *pop; + void *pop = NULL; if (switch_queue_pop_timeout(ei_node->received_msgs, &pop, 100000) == SWITCH_STATUS_SUCCESS) { ei_received_msg_t *received_msg = (ei_received_msg_t *) pop; @@ -1469,13 +1484,13 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) { int status; int send_msg_count = 0; - void *pop; + void *pop = NULL; if (!received_msg) { switch_malloc(received_msg, sizeof(*received_msg)); /* create a new buf for the erlang message and a rbuf for the reply */ if(kazoo_globals.receive_msg_preallocate > 0) { - received_msg->buf.buff = malloc(kazoo_globals.receive_msg_preallocate); + switch_malloc(received_msg->buf.buff, kazoo_globals.receive_msg_preallocate); received_msg->buf.buffsz = kazoo_globals.receive_msg_preallocate; received_msg->buf.index = 0; if(received_msg->buf.buff == NULL) { @@ -1485,6 +1500,8 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) } else { ei_x_new(&received_msg->buf); } + } else { + received_msg->buf.index = 0; } while (++send_msg_count <= kazoo_globals.send_msg_batch @@ -1510,15 +1527,16 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) case ERL_MSG: fault_count = 0; - if (switch_queue_trypush(ei_node->received_msgs, received_msg) != SWITCH_STATUS_SUCCESS) { - ei_x_free(&received_msg->buf); - switch_safe_free(received_msg); - } - if (kazoo_globals.receive_msg_preallocate > 0 && received_msg->buf.buffsz > kazoo_globals.receive_msg_preallocate) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "increased received message buffer size to %d\n", received_msg->buf.buffsz); } + if (switch_queue_trypush(ei_node->received_msgs, received_msg) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "failed to push erlang received message from %s <%d.%d.%d> into queue\n", received_msg->msg.from.node, received_msg->msg.from.creation, received_msg->msg.from.num, received_msg->msg.from.serial); + ei_x_free(&received_msg->buf); + switch_safe_free(received_msg); + } + received_msg = NULL; break; case ERL_ERROR: @@ -1573,6 +1591,9 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) destroy_node_handler(ei_node); switch_atomic_dec(&kazoo_globals.threads); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutdown Complete for erlang node handler %p: %s (%s:%d)\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port); + return NULL; } @@ -1630,7 +1651,7 @@ switch_status_t new_kazoo_node(int nodefd, ErlConnect *conn) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "New erlang connection from node %s (%s:%d) -> (%s:%d)\n", ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port, ei_node->local_ip, ei_node->local_port); - for(i = 0; i < kazoo_globals.num_worker_threads; i++) { + for(i = 0; i < kazoo_globals.node_worker_threads; i++) { switch_threadattr_create(&thd_attr, ei_node->pool); switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); diff --git a/src/mod/event_handlers/mod_kazoo/mod_kazoo.c b/src/mod/event_handlers/mod_kazoo/mod_kazoo.c index 1be309c28d..93ccb5aaf8 100644 --- a/src/mod/event_handlers/mod_kazoo/mod_kazoo.c +++ b/src/mod/event_handlers/mod_kazoo/mod_kazoo.c @@ -38,7 +38,9 @@ kz_globals_t kazoo_globals = {0}; SWITCH_MODULE_DEFINITION(mod_kazoo, mod_kazoo_load, mod_kazoo_shutdown, mod_kazoo_runtime); -SWITCH_MODULE_LOAD_FUNCTION(mod_kazoo_load) { +SWITCH_MODULE_LOAD_FUNCTION(mod_kazoo_load) +{ + kz_erl_init(); memset(&kazoo_globals, 0, sizeof(kazoo_globals)); kazoo_globals.pool = pool; @@ -84,7 +86,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_kazoo_load) { SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_kazoo_shutdown) { int sanity = 0; - remove_cli_api(); kz_tweaks_stop(); @@ -128,6 +129,8 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_kazoo_shutdown) { switch_safe_free(kazoo_globals.ei_cookie); switch_safe_free(kazoo_globals.ei_nodename); + kz_erl_shutdown(); + return SWITCH_STATUS_SUCCESS; }