2025-08-06 13:19:20 -03:00
/*
* Asterisk - - An open source telephony toolkit .
*
* Copyright ( C ) 2025 , Sangoma Technologies Inc
*
* Joshua Colp < jcolp @ sangoma . com >
*
* See http : //www.asterisk.org for more information about
* the Asterisk project . Please do not directly contact
* any of the maintainers of this project for assistance ;
* the project provides a web site , mailing lists and IRC
* channels for your use .
*
* This program is free software , distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree .
*/
/*!
* \ file
* \ brief taskpool unit tests
*
* \ author Joshua Colp < jcolp @ sangoma . com >
*
*/
/*** MODULEINFO
< depend > TEST_FRAMEWORK < / depend >
< support_level > core < / support_level >
* * */
# include "asterisk.h"
# include "asterisk/astobj2.h"
# include "asterisk/lock.h"
# include "asterisk/logger.h"
# include "asterisk/module.h"
# include "asterisk/taskprocessor.h"
# include "asterisk/test.h"
# include "asterisk/taskpool.h"
# include "asterisk/cli.h"
struct test_data {
ast_mutex_t lock ;
ast_cond_t cond ;
int executed ;
struct ast_taskprocessor * taskprocessor ;
2025-12-04 17:08:58 -04:00
void * test_specific_data ;
2025-08-06 13:19:20 -03:00
} ;
static struct test_data * test_alloc ( void )
{
struct test_data * td = ast_calloc ( 1 , sizeof ( * td ) ) ;
if ( ! td ) {
return NULL ;
}
ast_mutex_init ( & td - > lock ) ;
ast_cond_init ( & td - > cond , NULL ) ;
return td ;
}
static void test_destroy ( struct test_data * td )
{
ast_mutex_destroy ( & td - > lock ) ;
ast_cond_destroy ( & td - > cond ) ;
ast_free ( td ) ;
}
static int simple_task ( void * data )
{
struct test_data * td = data ;
SCOPED_MUTEX ( lock , & td - > lock ) ;
td - > taskprocessor = ast_taskpool_serializer_get_current ( ) ;
td - > executed = 1 ;
ast_cond_signal ( & td - > cond ) ;
return 0 ;
}
AST_TEST_DEFINE ( taskpool_push )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 1 ,
. initial_size = 1 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
struct timeval start ;
struct timespec end ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " push " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool pushing test " ;
info - > description =
" Pushes a single task into a taskpool asynchronously and ensures it is executed. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
if ( ast_taskpool_push ( pool , simple_task , td ) ) {
goto end ;
}
/* It should not take more than 5 seconds for a single simple task to execute */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 5 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ! td - > executed & & ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
end :
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
AST_TEST_DEFINE ( taskpool_push_synchronous )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 1 ,
. initial_size = 1 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " push_synchronous " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool synchronous pushing test " ;
info - > description =
" Pushes a single task into a taskpool synchronously and ensures it is executed. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
if ( ast_taskpool_push_wait ( pool , simple_task , td ) ) {
goto end ;
}
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
end :
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
AST_TEST_DEFINE ( taskpool_push_serializer )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 1 ,
. initial_size = 1 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
struct ast_taskprocessor * serializer = NULL ;
struct timeval start ;
struct timespec end ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " push_serializer " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool serializer pushing test " ;
info - > description =
" Pushes a single task into a taskpool serializer and ensures it is executed. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
serializer = ast_taskpool_serializer ( " serializer " , pool ) ;
if ( ! serializer ) {
goto end ;
}
if ( ast_taskprocessor_push ( serializer , simple_task , td ) ) {
goto end ;
}
/* It should not take more than 5 seconds for a single simple task to execute */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 5 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ! td - > executed & & ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
if ( td - > taskprocessor ! = serializer ) {
ast_test_status_update ( test , " Expected taskprocessor to be same as serializer but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
end :
ast_taskprocessor_unreference ( serializer ) ;
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
AST_TEST_DEFINE ( taskpool_push_serializer_synchronous )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 1 ,
. initial_size = 1 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
struct ast_taskprocessor * serializer = NULL ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " push_serializer_synchronous " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool serializer synchronous pushing test " ;
info - > description =
" Pushes a single task into a taskpool serializer synchronously and ensures it is executed. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
serializer = ast_taskpool_serializer ( " serializer " , pool ) ;
if ( ! serializer ) {
goto end ;
}
if ( ast_taskpool_serializer_push_wait ( serializer , simple_task , td ) ) {
goto end ;
}
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
if ( td - > taskprocessor ! = serializer ) {
ast_test_status_update ( test , " Expected taskprocessor to be same as serializer but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
end :
ast_taskprocessor_unreference ( serializer ) ;
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
static int requeue_task ( void * data )
{
return ast_taskpool_serializer_push_wait ( ast_taskpool_serializer_get_current ( ) , simple_task , data ) ;
}
AST_TEST_DEFINE ( taskpool_push_serializer_synchronous_requeue )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 1 ,
. initial_size = 1 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
struct ast_taskprocessor * serializer = NULL ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " push_serializer_synchronous_requeue " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool serializer synchronous requeueing test " ;
info - > description =
" Pushes a single task into a taskpool serializer synchronously and ensures it is requeued and executed. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
serializer = ast_taskpool_serializer ( " serializer " , pool ) ;
if ( ! serializer ) {
goto end ;
}
if ( ast_taskpool_serializer_push_wait ( serializer , requeue_task , td ) ) {
goto end ;
}
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
if ( td - > taskprocessor ! = serializer ) {
ast_test_status_update ( test , " Expected taskprocessor to be same as serializer but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
end :
ast_taskprocessor_unreference ( serializer ) ;
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
AST_TEST_DEFINE ( taskpool_push_grow )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 1 ,
. minimum_size = 0 ,
. initial_size = 0 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
struct timeval start ;
struct timespec end ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " push_grow " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool pushing test with auto-grow enabled " ;
info - > description =
" Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool grows. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
if ( ast_taskpool_taskprocessors_count ( pool ) ! = 0 ) {
ast_test_status_update ( test , " Expected taskpool to have 0 taskprocessors but it has %zu \n " , ast_taskpool_taskprocessors_count ( pool ) ) ;
res = AST_TEST_FAIL ;
goto end ;
}
if ( ast_taskpool_push ( pool , simple_task , td ) ) {
goto end ;
}
if ( ast_taskpool_taskprocessors_count ( pool ) ! = 1 ) {
ast_test_status_update ( test , " Expected taskpool to have 1 taskprocessor but it has %zu \n " , ast_taskpool_taskprocessors_count ( pool ) ) ;
res = AST_TEST_FAIL ;
goto end ;
}
/* It should not take more than 5 seconds for a single simple task to execute */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 5 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ! td - > executed & & ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
end :
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
AST_TEST_DEFINE ( taskpool_push_shrink )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 1 ,
. auto_increment = 1 ,
. minimum_size = 0 ,
. initial_size = 0 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
struct timeval start ;
struct timespec end ;
int iterations = 0 ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " push_shrink " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool pushing test with auto-shrink enabled " ;
info - > description =
" Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool shrinks. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
if ( ast_taskpool_taskprocessors_count ( pool ) ! = 0 ) {
ast_test_status_update ( test , " Expected taskpool to have 0 taskprocessors but it has %zu \n " , ast_taskpool_taskprocessors_count ( pool ) ) ;
res = AST_TEST_FAIL ;
goto end ;
}
if ( ast_taskpool_push ( pool , simple_task , td ) ) {
res = AST_TEST_FAIL ;
goto end ;
}
if ( ast_taskpool_taskprocessors_count ( pool ) ! = 1 ) {
ast_test_status_update ( test , " Expected taskpool to have 1 taskprocessor but it has %zu \n " , ast_taskpool_taskprocessors_count ( pool ) ) ;
res = AST_TEST_FAIL ;
goto end ;
}
/* We give 10 seconds for the pool to shrink back to normal, but if it happens earlier we
* stop our check early .
*/
ast_mutex_lock ( & td - > lock ) ;
do {
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 1 ;
end . tv_nsec = start . tv_usec * 1000 ;
if ( ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) = = ETIMEDOUT ) {
iterations + + ;
}
} while ( ast_taskpool_taskprocessors_count ( pool ) ! = 0 & & iterations ! = 10 ) ;
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
if ( ast_taskpool_taskprocessors_count ( pool ) ! = 0 ) {
ast_test_status_update ( test , " Expected taskpool to have 0 taskprocessors but it has %zu \n " , ast_taskpool_taskprocessors_count ( pool ) ) ;
res = AST_TEST_FAIL ;
goto end ;
}
end :
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
2025-12-04 17:08:58 -04:00
AST_TEST_DEFINE ( taskpool_serializer_suspension )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskprocessor * serializer = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 1 ,
. initial_size = 1 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
struct timeval start ;
struct timespec end ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " serializer_suspension " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool serializer suspension test " ;
info - > description =
" Pushes a single task into a taskpool serializer asynchronously while suspended, and ensures it only executes after unsuspension. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
serializer = ast_taskpool_serializer ( " serializer " , pool ) ;
if ( ! serializer ) {
goto end ;
}
ast_taskpool_serializer_suspend ( serializer ) ;
if ( ast_taskprocessor_push ( serializer , simple_task , td ) ) {
goto end ;
}
/* Give 5 seconds to ensure the task isn't executed */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 5 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ! td - > executed & & ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
if ( td - > executed ) {
ast_test_status_update ( test , " Expected simple task to not be executed but it was \n " ) ;
res = AST_TEST_FAIL ;
}
ast_taskpool_serializer_unsuspend ( serializer ) ;
/* Give 5 seconds to ensure the task is executed */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 5 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ! td - > executed & & ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
end :
ast_taskprocessor_unreference ( serializer ) ;
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
AST_TEST_DEFINE ( taskpool_serializer_multiple_suspension )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskprocessor * serializer = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 1 ,
. initial_size = 1 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
struct timeval start ;
struct timespec end ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " serializer_multiple_suspension " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool serializer multiple suspension test " ;
info - > description =
" Pushes a single task into a taskpool serializer asynchronously while suspended multiple times, and ensures it only executes after unsuspension. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
serializer = ast_taskpool_serializer ( " serializer " , pool ) ;
if ( ! serializer ) {
goto end ;
}
ast_taskpool_serializer_suspend ( serializer ) ;
ast_taskpool_serializer_suspend ( serializer ) ;
if ( ast_taskprocessor_push ( serializer , simple_task , td ) ) {
goto end ;
}
/* Give 5 seconds to ensure the task isn't executed */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 5 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ! td - > executed & & ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
if ( td - > executed ) {
ast_test_status_update ( test , " Expected simple task to not be executed but it was \n " ) ;
res = AST_TEST_FAIL ;
}
ast_taskpool_serializer_unsuspend ( serializer ) ;
ast_taskpool_serializer_unsuspend ( serializer ) ;
/* Give 5 seconds to ensure the task is executed */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 5 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ! td - > executed & & ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
end :
ast_taskprocessor_unreference ( serializer ) ;
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
static int cascade_task ( void * data )
{
struct test_data * td = data ;
if ( ast_taskpool_serializer_push_wait ( td - > test_specific_data , simple_task , td ) ) {
return - 1 ;
}
return 0 ;
}
AST_TEST_DEFINE ( taskpool_serializer_push_wait_while_suspended_from_other_serializer )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskprocessor * suspended_serializer = NULL ;
struct ast_taskprocessor * push_serializer = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 1 ,
. initial_size = 1 ,
. max_size = 1 ,
} ;
enum ast_test_result_state res = AST_TEST_PASS ;
struct timeval start ;
struct timespec end ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " serializer_push_wait_while_suspended_from_other_serializer " ;
info - > category = " /main/taskpool/ " ;
info - > summary = " Taskpool serializer push wait while suspended from other serializer test " ;
info - > description =
" Pushes a single task into a taskpool serializer synchronously from another serializer while suspended, and ensures it only executes after unsuspension. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return AST_TEST_FAIL ;
}
pool = ast_taskpool_create ( info - > name , & options ) ;
if ( ! pool ) {
goto end ;
}
suspended_serializer = ast_taskpool_serializer ( " suspended_serializer " , pool ) ;
if ( ! suspended_serializer ) {
goto end ;
}
td - > test_specific_data = suspended_serializer ;
push_serializer = ast_taskpool_serializer ( " push_serializer " , pool ) ;
if ( ! push_serializer ) {
goto end ;
}
ast_taskpool_serializer_suspend ( suspended_serializer ) ;
/* We push a task to the unsuspended serializer which will push a task to the suspended serializer
* in a synchronous fashion . That task can not execute until the suspended serializer becomes
* unsuspended .
*/
if ( ast_taskprocessor_push ( push_serializer , cascade_task , td ) ) {
goto end ;
}
/* Give 5 seconds to ensure the task isn't executed */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 5 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ! td - > executed & & ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
if ( td - > executed ) {
ast_test_status_update ( test , " Expected simple task to not be executed but it was \n " ) ;
res = AST_TEST_FAIL ;
}
ast_taskpool_serializer_unsuspend ( suspended_serializer ) ;
/* Give 5 seconds to ensure the task is executed */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 5 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ! td - > executed & & ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
if ( ! td - > executed ) {
ast_test_status_update ( test , " Expected simple task to be executed but it was not \n " ) ;
res = AST_TEST_FAIL ;
}
end :
ast_taskprocessor_unreference ( suspended_serializer ) ;
ast_taskprocessor_unreference ( push_serializer ) ;
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return res ;
}
2025-08-06 13:19:20 -03:00
struct efficiency_task_data {
struct ast_taskpool * pool ;
int num_tasks_executed ;
int shutdown ;
} ;
static int efficiency_task ( void * data )
{
struct efficiency_task_data * etd = data ;
if ( etd - > shutdown ) {
ao2_ref ( etd - > pool , - 1 ) ;
return 0 ;
}
ast_atomic_fetchadd_int ( & etd - > num_tasks_executed , + 1 ) ;
if ( ast_taskpool_push ( etd - > pool , efficiency_task , etd ) ) {
ao2_ref ( etd - > pool , - 1 ) ;
return - 1 ;
}
return 0 ;
}
static char * handle_cli_taskpool_push_efficiency ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 5 ,
. initial_size = 5 ,
. max_size = 5 ,
} ;
struct efficiency_task_data etd = {
. pool = NULL ,
. num_tasks_executed = 0 ,
. shutdown = 0 ,
} ;
struct timeval start ;
struct timespec end ;
int i ;
switch ( cmd ) {
case CLI_INIT :
e - > command = " taskpool push efficiency " ;
e - > usage =
" Usage: taskpool push efficiency \n "
" Pushes 200 tasks to a taskpool and measures \n "
" the number of tasks executed within 30 seconds. \n " ;
return NULL ;
case CLI_GENERATE :
return NULL ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return CLI_SUCCESS ;
}
pool = ast_taskpool_create ( " taskpool_push_efficiency " , & options ) ;
if ( ! pool ) {
goto end ;
}
etd . pool = pool ;
/* Push in 200 tasks, cause why not */
for ( i = 0 ; i < 200 ; i + + ) {
/* Ensure that the task has a reference to the pool */
ao2_bump ( pool ) ;
if ( ast_taskpool_push ( pool , efficiency_task , & etd ) ) {
goto end ;
}
}
/* Wait for 30 seconds */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 30 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
/* Give the total tasks executed, and tell each task to not requeue */
2025-12-04 17:08:58 -04:00
ast_cli ( a - > fd , " Total tasks executed in 30 seconds: %d (%d per second) \n " , etd . num_tasks_executed , etd . num_tasks_executed / 30 ) ;
2025-08-06 13:19:20 -03:00
end :
etd . shutdown = 1 ;
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return CLI_SUCCESS ;
}
struct serializer_efficiency_task_data {
struct ast_taskprocessor * serializer [ 2 ] ;
int * num_tasks_executed ;
int * shutdown ;
} ;
static int serializer_efficiency_task ( void * data )
{
struct serializer_efficiency_task_data * etd = data ;
struct ast_taskprocessor * taskprocessor = etd - > serializer [ 0 ] ;
if ( * etd - > shutdown ) {
return 0 ;
}
ast_atomic_fetchadd_int ( etd - > num_tasks_executed , + 1 ) ;
/* We ping pong a task between a pair of taskprocessors to ensure that
* a single taskprocessor does not receive a thread from the threadpool
* exclusively .
*/
if ( taskprocessor = = ast_taskpool_serializer_get_current ( ) ) {
taskprocessor = etd - > serializer [ 1 ] ;
}
if ( ast_taskprocessor_push ( taskprocessor ,
serializer_efficiency_task , etd ) ) {
return - 1 ;
}
return 0 ;
}
static char * handle_cli_taskpool_push_serializer_efficiency ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct ast_taskpool * pool = NULL ;
struct test_data * td = NULL ;
struct ast_taskpool_options options = {
. version = AST_TASKPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. minimum_size = 5 ,
. initial_size = 5 ,
. max_size = 5 ,
} ;
struct serializer_efficiency_task_data etd [ 200 ] ;
struct timeval start ;
struct timespec end ;
int i ;
int num_tasks_executed = 0 ;
int shutdown = 0 ;
switch ( cmd ) {
case CLI_INIT :
e - > command = " taskpool push serializer efficiency " ;
e - > usage =
" Usage: taskpool push serializer efficiency \n "
" Pushes 200 tasks to a taskpool in serializers and measures \n "
" the number of tasks executed within 30 seconds. \n " ;
return NULL ;
case CLI_GENERATE :
return NULL ;
}
td = test_alloc ( ) ;
if ( ! td ) {
return CLI_SUCCESS ;
}
memset ( & etd , 0 , sizeof ( etd ) ) ;
pool = ast_taskpool_create ( " taskpool_push_serializer_efficiency " , & options ) ;
if ( ! pool ) {
goto end ;
}
/* We create 400 (200 pairs) of serializers */
for ( i = 0 ; i < 200 ; i + + ) {
char serializer_name [ AST_TASKPROCESSOR_MAX_NAME + 1 ] ;
ast_taskprocessor_build_name ( serializer_name , sizeof ( serializer_name ) , " serializer%d " , i ) ;
etd [ i ] . serializer [ 0 ] = ast_taskpool_serializer ( serializer_name , pool ) ;
if ( ! etd [ i ] . serializer [ 0 ] ) {
goto end ;
}
ast_taskprocessor_build_name ( serializer_name , sizeof ( serializer_name ) , " serializer%d " , i ) ;
etd [ i ] . serializer [ 1 ] = ast_taskpool_serializer ( serializer_name , pool ) ;
if ( ! etd [ i ] . serializer [ 1 ] ) {
goto end ;
}
etd [ i ] . num_tasks_executed = & num_tasks_executed ;
etd [ i ] . shutdown = & shutdown ;
}
/* And once created we push in 200 tasks */
for ( i = 0 ; i < 200 ; i + + ) {
if ( ast_taskprocessor_push ( etd [ i ] . serializer [ 0 ] , serializer_efficiency_task , & etd [ i ] ) ) {
goto end ;
}
}
/* Wait for 30 seconds */
start = ast_tvnow ( ) ;
end . tv_sec = start . tv_sec + 30 ;
end . tv_nsec = start . tv_usec * 1000 ;
ast_mutex_lock ( & td - > lock ) ;
while ( ast_cond_timedwait ( & td - > cond , & td - > lock , & end ) ! = ETIMEDOUT ) {
}
ast_mutex_unlock ( & td - > lock ) ;
/* Give the total tasks executed, and tell each task to not requeue */
2025-12-04 17:08:58 -04:00
ast_cli ( a - > fd , " Total tasks executed in 30 seconds: %d (%d per second) \n " , num_tasks_executed , num_tasks_executed / 30 ) ;
2025-08-06 13:19:20 -03:00
shutdown = 1 ;
end :
/* We need to unreference each serializer */
for ( i = 0 ; i < 200 ; i + + ) {
ast_taskprocessor_unreference ( etd [ i ] . serializer [ 0 ] ) ;
ast_taskprocessor_unreference ( etd [ i ] . serializer [ 1 ] ) ;
}
ast_taskpool_shutdown ( pool ) ;
test_destroy ( td ) ;
return CLI_SUCCESS ;
}
static struct ast_cli_entry cli [ ] = {
AST_CLI_DEFINE ( handle_cli_taskpool_push_efficiency , " Push tasks to a taskpool and measure efficiency " ) ,
AST_CLI_DEFINE ( handle_cli_taskpool_push_serializer_efficiency , " Push tasks to a taskpool in serializers and measure efficiency " ) ,
} ;
static int unload_module ( void )
{
ast_cli_unregister_multiple ( cli , ARRAY_LEN ( cli ) ) ;
AST_TEST_UNREGISTER ( taskpool_push ) ;
AST_TEST_UNREGISTER ( taskpool_push_synchronous ) ;
AST_TEST_UNREGISTER ( taskpool_push_serializer ) ;
AST_TEST_UNREGISTER ( taskpool_push_serializer_synchronous ) ;
AST_TEST_UNREGISTER ( taskpool_push_serializer_synchronous_requeue ) ;
AST_TEST_UNREGISTER ( taskpool_push_grow ) ;
AST_TEST_UNREGISTER ( taskpool_push_shrink ) ;
2025-12-04 17:08:58 -04:00
AST_TEST_UNREGISTER ( taskpool_serializer_suspension ) ;
AST_TEST_UNREGISTER ( taskpool_serializer_multiple_suspension ) ;
AST_TEST_UNREGISTER ( taskpool_serializer_push_wait_while_suspended_from_other_serializer ) ;
2025-08-06 13:19:20 -03:00
return 0 ;
}
static int load_module ( void )
{
ast_cli_register_multiple ( cli , ARRAY_LEN ( cli ) ) ;
AST_TEST_REGISTER ( taskpool_push ) ;
AST_TEST_REGISTER ( taskpool_push_synchronous ) ;
AST_TEST_REGISTER ( taskpool_push_serializer ) ;
AST_TEST_REGISTER ( taskpool_push_serializer_synchronous ) ;
AST_TEST_REGISTER ( taskpool_push_serializer_synchronous_requeue ) ;
AST_TEST_REGISTER ( taskpool_push_grow ) ;
AST_TEST_REGISTER ( taskpool_push_shrink ) ;
2025-12-04 17:08:58 -04:00
AST_TEST_REGISTER ( taskpool_serializer_suspension ) ;
AST_TEST_REGISTER ( taskpool_serializer_multiple_suspension ) ;
AST_TEST_REGISTER ( taskpool_serializer_push_wait_while_suspended_from_other_serializer ) ;
2025-08-06 13:19:20 -03:00
return AST_MODULE_LOAD_SUCCESS ;
}
AST_MODULE_INFO_STANDARD ( ASTERISK_GPL_KEY , " taskpool test module " ) ;