From ab6382cafd3ff13dce7916b1770b1bd61fe38f01 Mon Sep 17 00:00:00 2001 From: George Joseph Date: Sat, 11 Apr 2015 15:39:29 -0600 Subject: [PATCH 1/3] res_pjsip: Refactor endpt_send_request to include transaction timeout This is the first follow-on to https://reviewboard.asterisk.org/r/4572/ and the discussion at http://lists.digium.com/pipermail/asterisk-dev/2015-March/073921.html Since we currently have no control over pjproject transaction timeout, this patch pulls the pjsip_endpt_send_request function out of pjproject and into res_pjsip/endpt_send_transaction in order to implement that capability. Now when the transaction is initiated, we also schedule our own pj_timer with our own desired timeout. If the transaction completes before either timeout, pjproject cancels its timer, and calls our tsx callback where we cancel our timer and run the app callback. If the pjproject timer times out first, pjproject calls our tsx callback where we cancel our timer and run the app callback. If our timer times out first, we terminate the transaction which causes pjproject to cancel its timer and call our tsx callback where we run the app callback. Regardless of the scenario, pjproject is calling the tsx callback inside the group_lock and there are checks in the callback to make sure it doesn't run twice. As part of this patch ast_sip_send_out_of_dialog_request was created to replace its similarly named private function. It takes a new timeout argument in milliseconds (<= 0 to disable the timeout). ASTERISK-24863 #close Reported-by: George Joseph Tested-by: George Joseph Change-Id: I0778dc730d9689c5147a444a04aee3c1026bf747 --- include/asterisk/res_pjsip.h | 24 ++++++ res/res_pjsip.c | 159 +++++++++++++++++++++++++++++++++-- 2 files changed, 177 insertions(+), 6 deletions(-) diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 2358a72813..0610c95e72 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1259,6 +1259,30 @@ int ast_sip_send_request(pjsip_tx_data *tdata, struct pjsip_dialog *dlg, struct ast_sip_endpoint *endpoint, void *token, void (*callback)(void *token, pjsip_event *e)); +/*! + * \brief General purpose method for sending an Out-Of-Dialog SIP request + * + * This is a companion function for \ref ast_sip_create_request. The request + * created there can be passed to this function, though any request may be + * passed in. + * + * This will automatically set up handling outbound authentication challenges if + * they arrive. + * + * \param tdata The request to send + * \param endpoint Optional. If specified, the out-of-dialog request is sent to the endpoint. + * \param timeout. If non-zero, after the timeout the transaction will be terminated + * and the callback will be called with the PJSIP_EVENT_TIMER type. + * \param token Data to be passed to the callback upon receipt of out-of-dialog response. + * \param callback Callback to be called upon receipt of out-of-dialog response. + * + * \retval 0 Success + * \retval -1 Failure (out-of-dialog callback will not be called.) + */ +int ast_sip_send_out_of_dialog_request(pjsip_tx_data *tdata, + struct ast_sip_endpoint *endpoint, int timeout, void *token, + void (*callback)(void *token, pjsip_event *e)); + /*! * \brief General purpose method for creating a SIP response * diff --git a/res/res_pjsip.c b/res/res_pjsip.c index fcd8516b65..108c5b32dd 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -21,6 +21,8 @@ #include /* Needed for SUBSCRIBE, NOTIFY, and PUBLISH method definitions */ #include +#include +#include #include #include "asterisk/res_pjsip.h" @@ -2815,6 +2817,128 @@ static pj_bool_t does_method_match(const pj_str_t *message_method, const char *s /*! Maximum number of challenges before assuming that we are in a loop */ #define MAX_RX_CHALLENGES 10 +#define TIMER_INACTIVE 0 +#define TIMEOUT_TIMER2 5 + +struct tsx_data { + void *token; + void (*cb)(void*, pjsip_event*); + pjsip_transaction *tsx; + pj_timer_entry *timeout_timer; +}; + +static void send_tsx_on_tsx_state(pjsip_transaction *tsx, pjsip_event *event); + +pjsip_module send_tsx_module = { + .name = { "send_tsx_module", 23 }, + .id = -1, + .priority = PJSIP_MOD_PRIORITY_APPLICATION, + .on_tsx_state = &send_tsx_on_tsx_state, +}; + +/*! \brief This is the pjsip_tsx_send_msg callback */ +static void send_tsx_on_tsx_state(pjsip_transaction *tsx, pjsip_event *event) +{ + struct tsx_data *tsx_data; + + if (event->type != PJSIP_EVENT_TSX_STATE) { + return; + } + + tsx_data = (struct tsx_data*) tsx->mod_data[send_tsx_module.id]; + if (tsx_data == NULL) { + return; + } + + if (tsx->status_code < 200) { + return; + } + + if (event->body.tsx_state.type == PJSIP_EVENT_TIMER) { + ast_debug(1, "PJSIP tsx timer expired\n"); + } + + if (tsx_data->timeout_timer && tsx_data->timeout_timer->id != TIMER_INACTIVE) { + pj_mutex_lock(tsx->mutex_b); + pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(tsx->endpt), + tsx_data->timeout_timer, TIMER_INACTIVE); + pj_mutex_unlock(tsx->mutex_b); + } + + /* Call the callback, if any, and prevent the callback from being called again + * by clearing the transaction's module_data. + */ + tsx->mod_data[send_tsx_module.id] = NULL; + + if (tsx_data->cb) { + (*tsx_data->cb)(tsx_data->token, event); + } +} + +static void tsx_timer_callback(pj_timer_heap_t *theap, pj_timer_entry *entry) +{ + struct tsx_data *tsx_data = entry->user_data; + + entry->id = TIMER_INACTIVE; + ast_debug(1, "Internal tsx timer expired\n"); + pjsip_tsx_terminate(tsx_data->tsx, PJSIP_SC_TSX_TIMEOUT); +} + +static pj_status_t endpt_send_transaction(pjsip_endpoint *endpt, + pjsip_tx_data *tdata, int timeout, void *token, + pjsip_endpt_send_callback cb) +{ + pjsip_transaction *tsx; + struct tsx_data *tsx_data; + pj_status_t status; + pjsip_event event; + + ast_assert(endpt && tdata); + + status = pjsip_tsx_create_uac(&send_tsx_module, tdata, &tsx); + if (status != PJ_SUCCESS) { + pjsip_tx_data_dec_ref(tdata); + ast_log(LOG_ERROR, "Unable to create pjsip uac\n"); + return status; + } + + tsx_data = PJ_POOL_ALLOC_T(tsx->pool, struct tsx_data); + tsx_data->token = token; + tsx_data->cb = cb; + tsx_data->tsx = tsx; + if (timeout > 0) { + tsx_data->timeout_timer = PJ_POOL_ALLOC_T(tsx->pool, pj_timer_entry); + } else { + tsx_data->timeout_timer = NULL; + } + tsx->mod_data[send_tsx_module.id] = tsx_data; + + PJSIP_EVENT_INIT_TX_MSG(event, tdata); + pjsip_tx_data_set_transport(tdata, &tsx->tp_sel); + + if (timeout > 0) { + pj_time_val timeout_timer_val = { timeout / 1000, timeout % 1000 }; + + pj_timer_entry_init(tsx_data->timeout_timer, TIMEOUT_TIMER2, + tsx_data, &tsx_timer_callback); + pj_mutex_lock(tsx->mutex_b); + pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(tsx->endpt), + tsx_data->timeout_timer, TIMER_INACTIVE); + pj_timer_heap_schedule(pjsip_endpt_get_timer_heap(tsx->endpt), + tsx_data->timeout_timer, &timeout_timer_val); + tsx_data->timeout_timer->id = TIMEOUT_TIMER2; + pj_mutex_unlock(tsx->mutex_b); + } + + status = (*tsx->state_handler)(tsx, &event); + pjsip_tx_data_dec_ref(tdata); + if (status != PJ_SUCCESS) { + ast_log(LOG_ERROR, "Unable to send message\n"); + return status; + } + + return status; +} /*! \brief Structure to hold information about an outbound request */ struct send_request_data { @@ -2874,7 +2998,7 @@ static void endpt_send_request_wrapper(void *token, pjsip_event *e) } static pj_status_t endpt_send_request(struct ast_sip_endpoint *endpoint, - pjsip_tx_data *tdata, pj_int32_t timeout, void *token, pjsip_endpt_send_callback cb) + pjsip_tx_data *tdata, int timeout, void *token, pjsip_endpt_send_callback cb) { struct send_request_wrapper *req_wrapper; pj_status_t ret_val; @@ -2890,7 +3014,7 @@ static pj_status_t endpt_send_request(struct ast_sip_endpoint *endpoint, req_wrapper->callback = cb; ao2_ref(req_wrapper, +1); - ret_val = pjsip_endpt_send_request(ast_sip_get_pjsip_endpoint(), tdata, timeout, + ret_val = endpt_send_transaction(ast_sip_get_pjsip_endpoint(), tdata, timeout, req_wrapper, endpt_send_request_wrapper); if (ret_val != PJ_SUCCESS) { char errmsg[PJ_ERR_MSG_SIZE]; @@ -2930,6 +3054,10 @@ static void send_request_cb(void *token, pjsip_event *e) int res; switch(e->body.tsx_state.type) { + case PJSIP_EVENT_USER: + /* Map USER (transaction cancelled by timeout) to TIMER */ + e->body.tsx_state.type = PJSIP_EVENT_TIMER; + break; case PJSIP_EVENT_TRANSPORT_ERROR: case PJSIP_EVENT_TIMER: break; @@ -2980,8 +3108,9 @@ static void send_request_cb(void *token, pjsip_event *e) ao2_ref(req_data, -1); } -static int send_out_of_dialog_request(pjsip_tx_data *tdata, struct ast_sip_endpoint *endpoint, - void *token, void (*callback)(void *token, pjsip_event *e)) +int ast_sip_send_out_of_dialog_request(pjsip_tx_data *tdata, + struct ast_sip_endpoint *endpoint, int timeout, void *token, + void (*callback)(void *token, pjsip_event *e)) { struct ast_sip_supplement *supplement; struct send_request_data *req_data; @@ -3007,7 +3136,7 @@ static int send_out_of_dialog_request(pjsip_tx_data *tdata, struct ast_sip_endpo ast_sip_mod_data_set(tdata->pool, tdata->mod_data, supplement_module.id, MOD_DATA_CONTACT, NULL); ao2_cleanup(contact); - if (endpt_send_request(endpoint, tdata, -1, req_data, send_request_cb) + if (endpt_send_request(endpoint, tdata, timeout, req_data, send_request_cb) != PJ_SUCCESS) { ao2_cleanup(req_data); return -1; @@ -3025,7 +3154,7 @@ int ast_sip_send_request(pjsip_tx_data *tdata, struct pjsip_dialog *dlg, if (dlg) { return send_in_dialog_request(tdata, dlg); } else { - return send_out_of_dialog_request(tdata, endpoint, token, callback); + return ast_sip_send_out_of_dialog_request(tdata, endpoint, -1, token, callback); } } @@ -3543,8 +3672,25 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } + if (internal_sip_register_service(&send_tsx_module)) { + ast_log(LOG_ERROR, "Failed to initialize send request module. Aborting load\n"); + internal_sip_unregister_service(&supplement_module); + ast_sip_destroy_distributor(); + ast_res_pjsip_destroy_configuration(); + ast_sip_destroy_global_headers(); + stop_monitor_thread(); + ast_sip_destroy_system(); + pj_pool_release(memory_pool); + memory_pool = NULL; + pjsip_endpt_destroy(ast_pjsip_endpoint); + ast_pjsip_endpoint = NULL; + pj_caching_pool_destroy(&caching_pool); + return AST_MODULE_LOAD_DECLINE; + } + if (internal_sip_initialize_outbound_authentication()) { ast_log(LOG_ERROR, "Failed to initialize outbound authentication. Aborting load\n"); + internal_sip_unregister_service(&send_tsx_module); internal_sip_unregister_service(&supplement_module); ast_sip_destroy_distributor(); ast_res_pjsip_destroy_configuration(); @@ -3588,6 +3734,7 @@ static int unload_pjsip(void *data) ast_res_pjsip_destroy_configuration(); ast_sip_destroy_system(); ast_sip_destroy_global_headers(); + internal_sip_unregister_service(&send_tsx_module); internal_sip_unregister_service(&supplement_module); if (monitor_thread) { stop_monitor_thread(); From 51886c68dc13edf127e64218528b077a5f6de967 Mon Sep 17 00:00:00 2001 From: George Joseph Date: Sat, 11 Apr 2015 15:56:52 -0600 Subject: [PATCH 2/3] pjsip_options: Add qualify_timeout processing and eventing This is the second follow-on to https://reviewboard.asterisk.org/r/4572/ and the discussion at http://lists.digium.com/pipermail/asterisk-dev/2015-March/073921.html The basic issues are that changes in contact status don't cause events to be emitted for the associated endpoint. Only dynamic contact add/delete actions update the endpoint. Also, the qualify timeout is fixed by pjsip at 32 seconds which is a long time. This patch makes use of the new transaction timeout feature in r4585 and provides the following capabilities... 1. A new aor/contact variable 'qualify_timeout' has been added that allows the user to specify the maximum time in milliseconds to wait for a response to an OPTIONS message. The default is 3000ms. When the timer expires, the contact is marked unavailable. 2. Contact status changes are now propagated up to the endpoint as follows... When any contact is 'Available', the endpoint is marked as 'Reachable'. When all contacts are 'Unavailable', the endpoint is marked as 'Unreachable'. The existing endpoint events are generated appropriately. ASTERISK-24863 #close Change-Id: Id0ce0528e58014da1324856ea537e7765466044a Tested-by: Dmitriy Serov Tested-by: George Joseph --- CHANGES | 8 ++ configs/samples/pjsip.conf.sample | 1 + .../461d7d691209_add_pjsip_qualify_timeout.py | 25 ++++ include/asterisk/endpoints.h | 10 ++ include/asterisk/res_pjsip.h | 15 +++ main/endpoints.c | 8 ++ res/res_pjsip.c | 16 +++ res/res_pjsip/location.c | 39 +++++- res/res_pjsip/pjsip_configuration.c | 127 ++++++++++++++++-- res/res_pjsip/pjsip_options.c | 68 ++++++++-- 10 files changed, 292 insertions(+), 25 deletions(-) create mode 100644 contrib/ast-db-manage/config/versions/461d7d691209_add_pjsip_qualify_timeout.py diff --git a/CHANGES b/CHANGES index 4237c82e29..7e54a20956 100644 --- a/CHANGES +++ b/CHANGES @@ -139,6 +139,14 @@ res_pjsip * A new CLI command has been added: "pjsip show settings", which shows both the global and system configuration settings. + * A new aor option has been added: "qualify_timeout", which sets the timeout + in seconds for a qualify. The default is 3 seconds. This overrides the + hard coded 32 seconds in pjproject. + + * Endpoint status will now change to "Unreachable" when all contacts are + unavailable. When any contact becomes available, the endpoint will status + will change back to "Reachable". + res_ari_channels ------------------ * Two new events, 'ChannelHold' and 'ChannelUnhold', have been added to the diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample index d3bb518f15..57b712a873 100644 --- a/configs/samples/pjsip.conf.sample +++ b/configs/samples/pjsip.conf.sample @@ -812,6 +812,7 @@ ; (default: "no") ;type= ; Must be of type aor (default: "") ;qualify_frequency=0 ; Interval at which to qualify an AoR (default: "0") +;qualify_timeout=3.0 ; Qualify timeout in fractional seconds (default: "3.0") ;authenticate_qualify=no ; Authenticates a qualify request if needed ; (default: "no") ;outbound_proxy= ; Outbound proxy used when sending OPTIONS request diff --git a/contrib/ast-db-manage/config/versions/461d7d691209_add_pjsip_qualify_timeout.py b/contrib/ast-db-manage/config/versions/461d7d691209_add_pjsip_qualify_timeout.py new file mode 100644 index 0000000000..9600c04611 --- /dev/null +++ b/contrib/ast-db-manage/config/versions/461d7d691209_add_pjsip_qualify_timeout.py @@ -0,0 +1,25 @@ +"""add pjsip qualify_timeout + +Revision ID: 461d7d691209 +Revises: 31cd4f4891ec +Create Date: 2015-04-15 13:54:08.047851 + +""" + +# revision identifiers, used by Alembic. +revision = '461d7d691209' +down_revision = '31cd4f4891ec' + +from alembic import op +import sqlalchemy as sa + +def upgrade(): + op.add_column('ps_aors', sa.Column('qualify_timeout', sa.Integer)) + op.add_column('ps_contacts', sa.Column('qualify_timeout', sa.Integer)) + pass + + +def downgrade(): + op.drop_column('ps_aors', 'qualify_timeout') + op.drop_column('ps_contacts', 'qualify_timeout') + pass diff --git a/include/asterisk/endpoints.h b/include/asterisk/endpoints.h index 663dd94d9c..c9cb6b9de7 100644 --- a/include/asterisk/endpoints.h +++ b/include/asterisk/endpoints.h @@ -159,6 +159,16 @@ const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint); */ const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint); +/*! + * \brief Gets the state of the given endpoint. + * + * \param endpoint The endpoint. + * \return state. + * \return \c AST_ENDPOINT_UNKNOWN if endpoint is \c NULL. + * \since 13.4 + */ +enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint); + /*! * \brief Updates the state of the given endpoint. * diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 0610c95e72..184cb57451 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -166,6 +166,8 @@ struct ast_sip_contact { unsigned int qualify_frequency; /*! If true authenticate the qualify if needed */ int authenticate_qualify; + /*! Qualify timeout. 0 is diabled. */ + double qualify_timeout; }; #define CONTACT_STATUS "contact_status" @@ -192,6 +194,8 @@ struct ast_sip_contact_status { struct timeval rtt_start; /*! The round trip time in microseconds */ int64_t rtt; + /*! Last status for a contact (default - unavailable) */ + enum ast_sip_contact_status_type last_status; }; /*! @@ -224,6 +228,8 @@ struct ast_sip_aor { struct ao2_container *permanent_contacts; /*! Determines whether SIP Path headers are supported */ unsigned int support_path; + /*! Qualify timeout. 0 is diabled. */ + double qualify_timeout; }; /*! @@ -904,6 +910,15 @@ struct ao2_container *ast_sip_location_retrieve_aor_contacts(const struct ast_si */ struct ast_sip_contact *ast_sip_location_retrieve_contact_from_aor_list(const char *aor_list); +/*! + * \brief Retrieve all contacts from a list of AORs + * + * \param aor_list A comma-separated list of AOR names + * \retval NULL if no contacts available + * \retval non-NULL container (which must be freed) if contacts available + */ +struct ao2_container *ast_sip_location_retrieve_contacts_from_aor_list(const char *aor_list); + /*! * \brief Retrieve the first bound contact AND the AOR chosen from a list of AORs * diff --git a/main/endpoints.c b/main/endpoints.c index c70170b41f..df9d289c7f 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -415,6 +415,14 @@ const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint) return endpoint->id; } +enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint) +{ + if (!endpoint) { + return AST_ENDPOINT_UNKNOWN; + } + return endpoint->state; +} + void ast_endpoint_set_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state) { diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 108c5b32dd..4f77b51021 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -1011,6 +1011,14 @@ If 0 never qualify. Time in seconds. + + Timeout for qualify + + If the contact doesn't repond to the OPTIONS request before the timeout, + the contact is marked unavailable. + If 0 no timeout. Time in fractional seconds. + + Outbound proxy used when sending OPTIONS request @@ -1125,6 +1133,14 @@ If 0 never qualify. Time in seconds. + + Timeout for qualify + + If the contact doesn't repond to the OPTIONS request before the timeout, + the contact is marked unavailable. + If 0 no timeout. Time in fractional seconds. + + Authenticates a qualify request if needed diff --git a/res/res_pjsip/location.c b/res/res_pjsip/location.c index 73ffdca0e1..f784cb40fb 100644 --- a/res/res_pjsip/location.c +++ b/res/res_pjsip/location.c @@ -188,6 +188,40 @@ struct ast_sip_contact *ast_sip_location_retrieve_contact_from_aor_list(const ch return contact; } +static int permanent_uri_sort_fn(const void *obj_left, const void *obj_right, int flags); +static int cli_contact_populate_container(void *obj, void *arg, int flags); + +static int gather_contacts_for_aor(void *obj, void *arg, int flags) +{ + struct ao2_container *aor_contacts; + struct ast_sip_aor *aor = obj; + struct ao2_container *container = arg; + + aor_contacts = ast_sip_location_retrieve_aor_contacts(aor); + if (!aor_contacts) { + return 0; + } + ao2_callback(aor_contacts, OBJ_MULTIPLE | OBJ_NODATA, cli_contact_populate_container, + container); + ao2_ref(aor_contacts, -1); + return CMP_MATCH; +} + +struct ao2_container *ast_sip_location_retrieve_contacts_from_aor_list(const char *aor_list) +{ + struct ao2_container *contacts; + + contacts = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, + AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, permanent_uri_sort_fn, NULL); + if (!contacts) { + return NULL; + } + + ast_sip_for_each_aor(aor_list, gather_contacts_for_aor, contacts); + + return contacts; +} + struct ast_sip_contact *ast_sip_location_retrieve_contact(const char *contact_name) { return ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "contact", contact_name); @@ -208,6 +242,7 @@ int ast_sip_location_add_contact(struct ast_sip_aor *aor, const char *uri, ast_string_field_set(contact, uri, uri); contact->expiration_time = expiration_time; contact->qualify_frequency = aor->qualify_frequency; + contact->qualify_timeout = aor->qualify_timeout; contact->authenticate_qualify = aor->authenticate_qualify; if (path_info && aor->support_path) { ast_string_field_set(contact, path, path_info); @@ -853,7 +888,8 @@ int ast_sip_initialize_sorcery_location(void) ast_sorcery_object_field_register(sorcery, "contact", "path", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, path)); ast_sorcery_object_field_register_custom(sorcery, "contact", "expiration_time", "", expiration_str2struct, expiration_struct2str, NULL, 0, 0); ast_sorcery_object_field_register(sorcery, "contact", "qualify_frequency", 0, OPT_UINT_T, - PARSE_IN_RANGE, FLDSET(struct ast_sip_contact, qualify_frequency), 0, 86400); + PARSE_IN_RANGE, FLDSET(struct ast_sip_contact, qualify_frequency), 0, 86400); + ast_sorcery_object_field_register(sorcery, "contact", "qualify_timeout", "3.0", OPT_DOUBLE_T, 0, FLDSET(struct ast_sip_contact, qualify_timeout)); ast_sorcery_object_field_register(sorcery, "contact", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, outbound_proxy)); ast_sorcery_object_field_register(sorcery, "contact", "user_agent", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, user_agent)); @@ -862,6 +898,7 @@ int ast_sip_initialize_sorcery_location(void) ast_sorcery_object_field_register(sorcery, "aor", "maximum_expiration", "7200", OPT_UINT_T, 0, FLDSET(struct ast_sip_aor, maximum_expiration)); ast_sorcery_object_field_register(sorcery, "aor", "default_expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_aor, default_expiration)); ast_sorcery_object_field_register(sorcery, "aor", "qualify_frequency", 0, OPT_UINT_T, PARSE_IN_RANGE, FLDSET(struct ast_sip_aor, qualify_frequency), 0, 86400); + ast_sorcery_object_field_register(sorcery, "aor", "qualify_timeout", "3.0", OPT_DOUBLE_T, 0, FLDSET(struct ast_sip_aor, qualify_timeout)); ast_sorcery_object_field_register(sorcery, "aor", "authenticate_qualify", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_aor, authenticate_qualify)); ast_sorcery_object_field_register(sorcery, "aor", "max_contacts", "0", OPT_UINT_T, 0, FLDSET(struct ast_sip_aor, max_contacts)); ast_sorcery_object_field_register(sorcery, "aor", "remove_existing", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_aor, remove_existing)); diff --git a/res/res_pjsip/pjsip_configuration.c b/res/res_pjsip/pjsip_configuration.c index 0eecb5e0a2..ab0d084494 100644 --- a/res/res_pjsip/pjsip_configuration.c +++ b/res/res_pjsip/pjsip_configuration.c @@ -19,6 +19,7 @@ #include "asterisk/utils.h" #include "asterisk/sorcery.h" #include "asterisk/callerid.h" +#include "asterisk/test.h" /*! \brief Number of buckets for persistent endpoint information */ #define PERSISTENT_BUCKETS 53 @@ -59,31 +60,66 @@ static int persistent_endpoint_cmp(void *obj, void *arg, int flags) static int persistent_endpoint_update_state(void *obj, void *arg, int flags) { struct sip_persistent_endpoint *persistent = obj; + struct ast_endpoint *endpoint = persistent->endpoint; char *aor = arg; - RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup); - RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); + struct ao2_container *contacts; + struct ast_json *blob; + struct ao2_iterator i; + struct ast_sip_contact *contact; + enum ast_endpoint_state state = AST_ENDPOINT_OFFLINE; if (!ast_strlen_zero(aor) && !strstr(persistent->aors, aor)) { return 0; } - if ((contact = ast_sip_location_retrieve_contact_from_aor_list(persistent->aors))) { - ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_ONLINE); - blob = ast_json_pack("{s: s}", "peer_status", "Reachable"); - } else { - ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_OFFLINE); - blob = ast_json_pack("{s: s}", "peer_status", "Unreachable"); + /* Find all the contacts for this endpoint. If ANY are available, + * mark the endpoint as ONLINE. + */ + contacts = ast_sip_location_retrieve_contacts_from_aor_list(persistent->aors); + if (contacts) { + i = ao2_iterator_init(contacts, 0); + while ((contact = ao2_iterator_next(&i)) + && state == AST_ENDPOINT_OFFLINE) { + struct ast_sip_contact_status *contact_status; + const char *contact_id = ast_sorcery_object_get_id(contact); + + contact_status = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), + CONTACT_STATUS, contact_id); + + if (contact_status && contact_status->status == AVAILABLE) { + state = AST_ENDPOINT_ONLINE; + } + ao2_cleanup(contact_status); + ao2_ref(contact, -1); + } + ao2_iterator_destroy(&i); + ao2_ref(contacts, -1); } - ast_endpoint_blob_publish(persistent->endpoint, ast_endpoint_state_type(), blob); + /* If there was no state change, don't publish anything. */ + if (ast_endpoint_get_state(endpoint) == state) { + return 0; + } - ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(persistent->endpoint)); + if (state == AST_ENDPOINT_ONLINE) { + ast_endpoint_set_state(endpoint, AST_ENDPOINT_ONLINE); + blob = ast_json_pack("{s: s}", "peer_status", "Reachable"); + ast_verb(1, "Endpoint %s is now Reachable\n", ast_endpoint_get_resource(endpoint)); + } else { + ast_endpoint_set_state(endpoint, AST_ENDPOINT_OFFLINE); + blob = ast_json_pack("{s: s}", "peer_status", "Unreachable"); + ast_verb(1, "Endpoint %s is now Unreachable\n", ast_endpoint_get_resource(endpoint)); + } + + ast_endpoint_blob_publish(endpoint, ast_endpoint_state_type(), blob); + ast_json_unref(blob); + ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(endpoint)); return 0; } /*! \brief Function called when stuff relating to a contact happens (created/deleted) */ -static void persistent_endpoint_contact_observer(const void *object) +static void persistent_endpoint_contact_created_observer(const void *object) { char *id = ast_strdupa(ast_sorcery_object_get_id(object)), *aor = NULL; @@ -92,12 +128,74 @@ static void persistent_endpoint_contact_observer(const void *object) ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, aor); } +/*! \brief Function called when stuff relating to a contact happens (created/deleted) */ +static void persistent_endpoint_contact_deleted_observer(const void *object) +{ + char *id = ast_strdupa(ast_sorcery_object_get_id(object)); + char *aor = NULL; + char *contact = NULL; + + aor = id; + /* Dynamic contacts are delimited with ";@" and static ones with "@@" */ + if ((contact = strstr(id, ";@")) || (contact = strstr(id, "@@"))) { + *contact = '\0'; + contact += 2; + } else { + contact = id; + } + + ast_verb(1, "Contact %s/%s is now Unavailable\n", aor, contact); + + ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, aor); +} + /*! \brief Observer for contacts so state can be updated on respective endpoints */ static const struct ast_sorcery_observer state_contact_observer = { - .created = persistent_endpoint_contact_observer, - .deleted = persistent_endpoint_contact_observer, + .created = persistent_endpoint_contact_created_observer, + .deleted = persistent_endpoint_contact_deleted_observer, }; +/*! \brief Function called when stuff relating to a contact status happens (updated) */ +static void persistent_endpoint_contact_status_observer(const void *object) +{ + const struct ast_sip_contact_status *contact_status = object; + char *id = ast_strdupa(ast_sorcery_object_get_id(object)); + char *aor = NULL; + char *contact = NULL; + + /* If rtt_start is set (this is the outgoing OPTIONS) or + * there's no status change, ignore. + */ + if (contact_status->rtt_start.tv_sec > 0 + || contact_status->status == contact_status->last_status) { + return; + } + + aor = id; + /* Dynamic contacts are delimited with ";@" and static ones with "@@" */ + if ((contact = strstr(id, ";@")) || (contact = strstr(id, "@@"))) { + *contact = '\0'; + contact += 2; + } else { + contact = id; + } + + ast_test_suite_event_notify("AOR_CONTACT_UPDATE", + "Contact: %s\r\n" + "Status: %s", + ast_sorcery_object_get_id(contact_status), + (contact_status->status == AVAILABLE ? "Available" : "Unavailable")); + + ast_verb(1, "Contact %s/%s is now %s\n", aor, contact, + contact_status->status == AVAILABLE ? "Available" : "Unavailable"); + + ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, aor); +} + +/*! \brief Observer for contacts so state can be updated on respective endpoints */ +static const struct ast_sorcery_observer state_contact_status_observer = { + .updated = persistent_endpoint_contact_status_observer, +}; static int dtmf_handler(const struct aco_option *opt, struct ast_variable *var, void *obj) { @@ -1796,6 +1894,7 @@ int ast_res_pjsip_initialize_configuration(const struct ast_module_info *ast_mod } ast_sorcery_observer_add(sip_sorcery, "contact", &state_contact_observer); + ast_sorcery_observer_add(sip_sorcery, CONTACT_STATUS, &state_contact_status_observer); if (ast_sip_initialize_sorcery_domain_alias()) { ast_log(LOG_ERROR, "Failed to register SIP domain aliases support with sorcery\n"); @@ -1852,6 +1951,8 @@ int ast_res_pjsip_initialize_configuration(const struct ast_module_info *ast_mod void ast_res_pjsip_destroy_configuration(void) { + ast_sorcery_observer_remove(sip_sorcery, CONTACT_STATUS, &state_contact_status_observer); + ast_sorcery_observer_remove(sip_sorcery, "contact", &state_contact_observer); ast_sip_destroy_sorcery_global(); ast_sip_destroy_sorcery_location(); ast_sip_destroy_sorcery_auth(); diff --git a/res/res_pjsip/pjsip_options.c b/res/res_pjsip/pjsip_options.c index 9794827b56..dc5a70175b 100644 --- a/res/res_pjsip/pjsip_options.c +++ b/res/res_pjsip/pjsip_options.c @@ -28,6 +28,7 @@ #include "asterisk/astobj2.h" #include "asterisk/cli.h" #include "asterisk/time.h" +#include "asterisk/test.h" #include "include/res_pjsip_private.h" #define DEFAULT_LANGUAGE "en" @@ -110,18 +111,20 @@ static void update_contact_status(const struct ast_sip_contact *contact, status = find_or_create_contact_status(contact); if (!status) { + ast_log(LOG_ERROR, "Unable to find ast_sip_contact_status for contact %s\n", + contact->uri); return; } update = ast_sorcery_alloc(ast_sip_get_sorcery(), CONTACT_STATUS, ast_sorcery_object_get_id(status)); if (!update) { - ast_log(LOG_ERROR, "Unable to create update ast_sip_contact_status for contact %s\n", + ast_log(LOG_ERROR, "Unable to allocate ast_sip_contact_status for contact %s\n", contact->uri); - ao2_ref(status, -1); return; } + update->last_status = status->status; update->status = value; /* if the contact is available calculate the rtt as @@ -131,13 +134,21 @@ static void update_contact_status(const struct ast_sip_contact *contact, update->rtt_start = ast_tv(0, 0); + ast_test_suite_event_notify("AOR_CONTACT_QUALIFY_RESULT", + "Contact: %s\r\n" + "Status: %s\r\n" + "RTT: %ld", + ast_sorcery_object_get_id(update), + (update->status == AVAILABLE ? "Available" : "Unavailable"), + update->rtt); + if (ast_sorcery_update(ast_sip_get_sorcery(), update)) { ast_log(LOG_ERROR, "Unable to update ast_sip_contact_status for contact %s\n", contact->uri); } - ao2_ref(update, -1); ao2_ref(status, -1); + ao2_ref(update, -1); } /*! @@ -152,18 +163,22 @@ static void init_start_time(const struct ast_sip_contact *contact) status = find_or_create_contact_status(contact); if (!status) { + ast_log(LOG_ERROR, "Unable to find ast_sip_contact_status for contact %s\n", + contact->uri); return; } update = ast_sorcery_alloc(ast_sip_get_sorcery(), CONTACT_STATUS, ast_sorcery_object_get_id(status)); if (!update) { - ast_log(LOG_ERROR, "Unable to create update ast_sip_contact_status for contact %s\n", + ast_log(LOG_ERROR, "Unable to copy ast_sip_contact_status for contact %s\n", contact->uri); - ao2_ref(status, -1); return; } + update->status = status->status; + update->last_status = status->last_status; + update->rtt = status->rtt; update->rtt_start = ast_tvnow(); if (ast_sorcery_update(ast_sip_get_sorcery(), update)) { @@ -171,8 +186,8 @@ static void init_start_time(const struct ast_sip_contact *contact) contact->uri); } - ao2_ref(update, -1); ao2_ref(status, -1); + ao2_ref(update, -1); } /*! @@ -320,7 +335,7 @@ static int qualify_contact(struct ast_sip_endpoint *endpoint, struct ast_sip_con init_start_time(contact); ao2_ref(contact, +1); - if (ast_sip_send_request(tdata, NULL, endpoint_local, contact, qualify_contact_cb) + if (ast_sip_send_out_of_dialog_request(tdata, endpoint_local, (int)(contact->qualify_timeout * 1000), contact, qualify_contact_cb) != PJ_SUCCESS) { ast_log(LOG_ERROR, "Unable to send request to qualify contact %s\n", contact->uri); @@ -923,6 +938,32 @@ static int sched_qualifies_cmp_fn(void *obj, void *arg, int flags) return CMP_MATCH; } +static int rtt_start_handler(const struct aco_option *opt, + struct ast_variable *var, void *obj) +{ + struct ast_sip_contact_status *status = obj; + long int sec, usec; + + if (sscanf(var->value, "%ld.%06ld", &sec, &usec) != 2) { + return -1; + } + + status->rtt_start = ast_tv(sec, usec); + + return 0; +} + +static int rtt_start_to_str(const void *obj, const intptr_t *args, char **buf) +{ + const struct ast_sip_contact_status *status = obj; + + if (ast_asprintf(buf, "%ld.%06ld", status->rtt_start.tv_sec, status->rtt_start.tv_usec) == -1) { + return -1; + } + + return 0; +} + int ast_sip_initialize_sorcery_qualify(void) { struct ast_sorcery *sorcery = ast_sip_get_sorcery(); @@ -936,10 +977,14 @@ int ast_sip_initialize_sorcery_qualify(void) return -1; } - ast_sorcery_object_field_register_nodoc(sorcery, CONTACT_STATUS, "rtt", "0", OPT_UINT_T, - 1, FLDSET(struct ast_sip_contact_status, status)); - ast_sorcery_object_field_register_nodoc(sorcery, CONTACT_STATUS, "rtt", "0", OPT_UINT_T, - 1, FLDSET(struct ast_sip_contact_status, rtt)); + ast_sorcery_object_field_register_nodoc(sorcery, CONTACT_STATUS, "last_status", + "0", OPT_UINT_T, 1, FLDSET(struct ast_sip_contact_status, last_status)); + ast_sorcery_object_field_register_nodoc(sorcery, CONTACT_STATUS, "status", + "0", OPT_UINT_T, 1, FLDSET(struct ast_sip_contact_status, status)); + ast_sorcery_object_field_register_custom_nodoc(sorcery, CONTACT_STATUS, "rtt_start", + "0.0", rtt_start_handler, rtt_start_to_str, NULL, 0, 0); + ast_sorcery_object_field_register_nodoc(sorcery, CONTACT_STATUS, "rtt", + "0", OPT_UINT_T, 1, FLDSET(struct ast_sip_contact_status, rtt)); return 0; } @@ -951,6 +996,7 @@ static int qualify_and_schedule_cb(void *obj, void *arg, int flags) int initial_interval; contact->qualify_frequency = aor->qualify_frequency; + contact->qualify_timeout = aor->qualify_timeout; contact->authenticate_qualify = aor->authenticate_qualify; /* Delay initial qualification by a random fraction of the specified interval */ From c6ed681638f5e481001839712fe2d6d5e1fa526c Mon Sep 17 00:00:00 2001 From: George Joseph Date: Sat, 11 Apr 2015 16:04:32 -0600 Subject: [PATCH 3/3] res_pjsip: Add global option to limit the maximum time for initial qualifies Currently when Asterisk starts initial qualifies of contacts are spread out randomly between 0 and qualify_timeout to prevent network and system overload. If a contact's qualify_frequency is 5 minutes however, that contact may be unavailable to accept calls for the entire 5 minutes after startup. So while staggering the initial qualifies is a good idea, basing the time on qualify_timeout could leave contacts unavailable for too long. This patch adds a new global parameter "max_initial_qualify_time" that sets the maximum time for the initial qualifies. This way you could make sure that all your contacts are initialy, randomly qualified within say 30 seconds but still have the contact's ongoing qualifies at a 5 minute interval. If max_initial_qualify_time is > 0, the formula is initial_interval = min(max_initial_interval, qualify_timeout * random(). If not set, qualify_timeout is used. The default is "0" (disabled). ASTERISK-24863 #close Change-Id: Ib80498aa1ea9923277bef51d6a9015c9c79740f4 Tested-by: George Joseph --- CHANGES | 4 ++++ configs/samples/pjsip.conf.sample | 5 ++++- ...5e89_add_pjsip_max_initial_qualify_time.py | 20 ++++++++++++++++++ include/asterisk/res_pjsip.h | 8 +++++++ res/res_pjsip.c | 4 ++++ res/res_pjsip/config_global.c | 21 +++++++++++++++++++ res/res_pjsip/pjsip_options.c | 10 +++++++-- 7 files changed, 69 insertions(+), 3 deletions(-) create mode 100644 contrib/ast-db-manage/config/versions/a541e0b5e89_add_pjsip_max_initial_qualify_time.py diff --git a/CHANGES b/CHANGES index 7e54a20956..ef78d97bf2 100644 --- a/CHANGES +++ b/CHANGES @@ -147,6 +147,10 @@ res_pjsip unavailable. When any contact becomes available, the endpoint will status will change back to "Reachable". + * A new global option has been added: "max_initial_qualify_time", which + sets the maximum amount of time from startup that qualifies should be + attempted on all contacts. + res_ari_channels ------------------ * Two new events, 'ChannelHold' and 'ChannelUnhold', have been added to the diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample index 57b712a873..0f95d19e00 100644 --- a/configs/samples/pjsip.conf.sample +++ b/configs/samples/pjsip.conf.sample @@ -869,7 +869,10 @@ ; The order by which endpoint identifiers are given priority. ; Identifier names are derived from res_pjsip_endpoint_identifier_* ; modules. (default: ip,username,anonymous) - +;max_initial_qualify_time=4 ; The maximum amount of time (in seconds) from + startup that qualifies should be attempted on all + contacts. If greater than the qualify_frequency + for an aor, qualify_frequency will be used instead. ; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl ;==========================ACL SECTION OPTIONS========================= diff --git a/contrib/ast-db-manage/config/versions/a541e0b5e89_add_pjsip_max_initial_qualify_time.py b/contrib/ast-db-manage/config/versions/a541e0b5e89_add_pjsip_max_initial_qualify_time.py new file mode 100644 index 0000000000..0ffd7848da --- /dev/null +++ b/contrib/ast-db-manage/config/versions/a541e0b5e89_add_pjsip_max_initial_qualify_time.py @@ -0,0 +1,20 @@ +"""add pjsip max_initial_qualify_time + +Revision ID: a541e0b5e89 +Revises: 461d7d691209 +Create Date: 2015-04-15 14:37:36.424471 + +""" + +# revision identifiers, used by Alembic. +revision = 'a541e0b5e89' +down_revision = '461d7d691209' + +from alembic import op +import sqlalchemy as sa + +def upgrade(): + op.add_column('ps_globals', sa.Column('max_initial_qualify_time', sa.Integer)) + +def downgrade(): + op.drop_column('ps_globals', 'max_initial_qualify_time') diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 184cb57451..99b65ab088 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1995,4 +1995,12 @@ char *ast_sip_get_endpoint_identifier_order(void); */ unsigned int ast_sip_get_keep_alive_interval(void); +/*! + * \brief Retrieve the system max initial qualify time. + * + * \retval the maximum initial qualify time. + */ +unsigned int ast_sip_get_max_initial_qualify_time(void); + + #endif /* _RES_PJSIP_H */ diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 4f77b51021..e54f2c7cce 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -1229,6 +1229,10 @@ The interval (in seconds) to send keepalives to active connection-oriented transports. + + The maximum amount of time from startup that qualifies should be attempted on all contacts. + If greater than the qualify_frequency for an aor, qualify_frequency will be used instead. + Must be of type 'global'. diff --git a/res/res_pjsip/config_global.c b/res/res_pjsip/config_global.c index 2aa15838f4..42ba23487c 100644 --- a/res/res_pjsip/config_global.c +++ b/res/res_pjsip/config_global.c @@ -33,6 +33,7 @@ #define DEFAULT_OUTBOUND_ENDPOINT "default_outbound_endpoint" #define DEFAULT_DEBUG "no" #define DEFAULT_ENDPOINT_IDENTIFIER_ORDER "ip,username,anonymous" +#define DEFAULT_MAX_INITIAL_QUALIFY_TIME 0 static char default_useragent[256]; @@ -50,6 +51,8 @@ struct global_config { unsigned int max_forwards; /* The interval at which to send keep alive messages to active connection-oriented transports */ unsigned int keep_alive_interval; + /* The maximum time for all contacts to be qualified at startup */ + unsigned int max_initial_qualify_time; }; static void global_destructor(void *obj) @@ -161,6 +164,21 @@ unsigned int ast_sip_get_keep_alive_interval(void) return interval; } +unsigned int ast_sip_get_max_initial_qualify_time(void) +{ + unsigned int time; + struct global_config *cfg; + + cfg = get_global_cfg(); + if (!cfg) { + return DEFAULT_MAX_INITIAL_QUALIFY_TIME; + } + + time = cfg->max_initial_qualify_time; + ao2_ref(cfg, -1); + return time; +} + /*! * \internal * \brief Observer to set default global object if none exist. @@ -271,6 +289,9 @@ int ast_sip_initialize_sorcery_global(void) ast_sorcery_object_field_register(sorcery, "global", "keep_alive_interval", __stringify(DEFAULT_KEEPALIVE_INTERVAL), OPT_UINT_T, 0, FLDSET(struct global_config, keep_alive_interval)); + ast_sorcery_object_field_register(sorcery, "global", "max_initial_qualify_time", + __stringify(DEFAULT_MAX_INITIAL_QUALIFY_TIME), + OPT_UINT_T, 0, FLDSET(struct global_config, max_initial_qualify_time)); if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) { return -1; diff --git a/res/res_pjsip/pjsip_options.c b/res/res_pjsip/pjsip_options.c index dc5a70175b..9c0a1379d0 100644 --- a/res/res_pjsip/pjsip_options.c +++ b/res/res_pjsip/pjsip_options.c @@ -994,14 +994,20 @@ static int qualify_and_schedule_cb(void *obj, void *arg, int flags) struct ast_sip_contact *contact = obj; struct ast_sip_aor *aor = arg; int initial_interval; + int max_time = ast_sip_get_max_initial_qualify_time(); contact->qualify_frequency = aor->qualify_frequency; contact->qualify_timeout = aor->qualify_timeout; contact->authenticate_qualify = aor->authenticate_qualify; /* Delay initial qualification by a random fraction of the specified interval */ - initial_interval = contact->qualify_frequency * 1000; - initial_interval = (int)(initial_interval * ast_random_double()); + if (max_time && max_time < contact->qualify_frequency) { + initial_interval = max_time; + } else { + initial_interval = contact->qualify_frequency; + } + + initial_interval = (int)((initial_interval * 1000) * ast_random_double()); if (contact->qualify_frequency) { schedule_qualify(contact, initial_interval);