@@ -668,41 +668,17 @@ def initialize_remote_task() -> yb_dist_tests.GlobalTestConfig:
     return global_conf
 
 
-def parallel_list_test_descriptors(rel_test_path: str) -> Tuple[List[str], float]:
+def list_test_descriptors(rel_test_path: str) -> List[str]:
     """
-    This is invoked in parallel to list all individual tests within our C++ test programs. Without
-    this, listing all gtest tests across 330 test programs might take about 5 minutes on TSAN and 2
-    minutes in debug.
+    This function lists all individual tests within a C++ test program.
     """
-    # TODO: We should change this from a parallel spark-job stage to just do it serially on the
-    # jenkins worker host.
-    # Rationale: The time may be longer than when the above comment was written, since we have more
-    # tests, but it is still a good trade-off.
-    # To run the tasks on spark workers, the workspace archive has to be copied over and unpacked,
-    # which takes a couple minutes by itself. That cost will have to be paid anyway when second
-    # stage comes to run the tests, but some of the nodes are re-assigned to other jobs while stage
-    # 0 is being completed, and the cost is wasted for those nodes.
-    # Having nodes shuffled off the job while last few tasks of stage 0 are running means that
-    # resources are assigned somewhat out of order and require more switching overhead.
-    # Changing the jobs from 2 stages to a single stage allows much better knowledge of how many
-    # tasks the job is actually going to run, instead of finding out only after stage 0 is complete.
-    # This would allow much easier cluster scaling calculations.
-    # Finally, the submitting node is generally doing nothing but waiting in a spark queue, during
-    # that waiting time stage 0 could already be done and reduce the resource contintion of the
-    # spark cluster workers.
-
-    start_time_sec = time.time()
-
-    from yugabyte import yb_dist_tests, command_util
-    global_conf = initialize_remote_task()
-
+    global_conf = yb_dist_tests.get_global_conf()
     os.environ['BUILD_ROOT'] = global_conf.build_root
     if not os.path.isdir(os.environ['YB_THIRDPARTY_DIR']):
         find_or_download_thirdparty_script_path = os.path.join(
             global_conf.yb_src_root, 'build-support', 'find_or_download_thirdparty.sh')
         subprocess.check_call(find_or_download_thirdparty_script_path)
 
-    wait_for_path_to_exist(global_conf.build_root)
     list_tests_cmd_line = [
         os.path.join(global_conf.build_root, rel_test_path), '--gtest_list_tests']
 
@@ -748,7 +724,7 @@ def parallel_list_test_descriptors(rel_test_path: str) -> Tuple[List[str], float
         else:
             current_test = trimmed_line
 
-    return test_descriptors, time.time() - start_time_sec
+    return test_descriptors
 
 
 def get_username() -> str:
@@ -1015,26 +991,14 @@ def collect_cpp_tests(
     else:
         app_name_details = ['{} test programs'.format(len(all_test_programs))]
 
-    init_spark_context(app_name_details)
-    set_global_conf_for_spark_jobs()
-
-    # Use fewer "slices" (tasks) than there are test programs, in hope to get some batching.
-    num_slices = (len(test_programs) + 1) / 2
-    assert spark_context is not None
-    test_descriptor_lists_and_times: List[Tuple[List[str], float]] = run_spark_action(
-        lambda: spark_context.parallelize(  # type: ignore
-            test_programs, numSlices=num_slices).map(parallel_list_test_descriptors).collect()
-    )
-    total_elapsed_time_sec = sum([t[1] for t in test_descriptor_lists_and_times])
+    test_descriptor_strs = one_shot_test_programs
+    for test_program in test_programs:
+        test_descriptor_strs.extend(list_test_descriptors(test_program))
 
     elapsed_time_sec = time.time() - start_time_sec
-    test_descriptor_strs = one_shot_test_programs + functools.reduce(
-        operator.add, [t[0] for t in test_descriptor_lists_and_times], [])
     logging.info(
         f"Collected the list of {len(test_descriptor_strs)} gtest tests in "
-        f"{elapsed_time_sec:.2f} sec wallclock time, total time spent on Spark workers: "
-        f"{total_elapsed_time_sec:.2f} sec, average time per test program: "
-        f"{total_elapsed_time_sec / len(test_programs):.2f} sec")
+        f"{elapsed_time_sec:.2f} sec wallclock time")
     for test_descriptor_str in test_descriptor_strs:
         if 'YB_DISABLE_TEST_IN_' in test_descriptor_str:
             raise RuntimeError(
@@ -1128,7 +1092,7 @@ def collect_tests(args: argparse.Namespace) -> List[yb_dist_tests.TestDescriptor
 
 
 def load_test_list(test_list_path: str) -> List[yb_dist_tests.TestDescriptor]:
-    logging.info("Loading the list of tests to run from %s", test_list_path)
+    logging.info("Loading test list from %s", test_list_path)
     test_descriptors = []
     with open(test_list_path, 'r') as input_file:
         for line in input_file:
@@ -1171,6 +1135,41 @@ def run_spark_action(action: Any) -> Any:
     return results
 
 
+def report_skipped_test(test_descriptor: yb_dist_tests.TestDescriptor) -> None:
+    suite_var_name = 'YB_CSI_' + test_descriptor.language
+    skip_time = time.time()
+    os.environ[suite_var_name] = propagated_env_vars[suite_var_name]
+    csi_id = csi_report.create_test(test_descriptor, skip_time, 0)
+    csi_report.close_item(csi_id, skip_time, 'skipped', ['muted'])
+
+
+def skip_disabled_tests(test_descriptors: List[yb_dist_tests.TestDescriptor],
+                        disable_list_path: str) -> List[yb_dist_tests.TestDescriptor]:
+
+    from yugabyte.test_descriptor import SimpleTestDescriptor
+
+    logging.info("Loading simple test list from %s", disable_list_path)
+    disabled_tests = []
+    with open(disable_list_path, 'r') as input_file:
+        for line in input_file:
+            line = line.strip()
+            if line:
+                disabled_tests.append(SimpleTestDescriptor.parse(line))
+    logging.info("Loaded %d disabled tests from %s", len(disabled_tests), disable_list_path)
+
+    enabled = []
+    for td in test_descriptors:
+        simple = SimpleTestDescriptor.parse(td.descriptor_str)
+        for disabled in disabled_tests:
+            if disabled.matches(simple):
+                report_skipped_test(td)
+                break
+        else:
+            enabled.append(td)
+    logging.info("Remaining enabled tests: %d", len(enabled))
+    return enabled
+
+
 def main() -> None:
     parser = argparse.ArgumentParser(
         description='Run tests on Spark.')
@@ -1186,6 +1185,17 @@ def main() -> None:
                         metavar='TEST_LIST_FILE',
                         help='A file with a list of tests to run. Useful when e.g. re-running '
                              'failed tests using a file produced with --failed_test_list.')
+    parser.add_argument('--ignore_list',
+                        metavar='TEST_LIST_FILE',
+                        help='A file with a list of tests to ignore. '
+                             'These tests will not be run or reported. Useful to avoid '
+                             're-running passed tests of an incomplete run.')
+    parser.add_argument('--disable_list',
+                        metavar='TEST_LIST_FILE',
+                        help='A file with a list of tests to skip. '
+                             'Uses test class/name, and may omit cxx_rel_test_binary. '
+                             'These tests will not be run but will report as skipped. '
+                             'Useful to avoid running known flakey/broken tests.')
     parser.add_argument('--build-root', dest='build_root', required=True,
                         help='Build root (e.g. ~/code/yugabyte/build/debug-gcc-dynamic-community)')
     parser.add_argument('--max-tests', type=int, dest='max_tests',
@@ -1287,6 +1297,16 @@ def main() -> None:
         fatal_error("File specified by --test_list does not exist or is not a file: '{}'".format(
             test_list_path))
 
+    ignore_list_path = args.ignore_list
+    if ignore_list_path and not os.path.isfile(ignore_list_path):
+        fatal_error("File specified by --ignore_list does not exist or is not a file: '{}'".format(
+            ignore_list_path))
+
+    disable_list_path = args.disable_list
+    if disable_list_path and not os.path.isfile(disable_list_path):
+        fatal_error("File specified by --disable_list does not exist or is not a file: '{}'".format(
+            disable_list_path))
+
     global_conf = yb_dist_tests.get_global_conf()
     if ('YB_MVN_LOCAL_REPO' not in os.environ and
             args.run_java_tests and
@@ -1323,8 +1343,42 @@ def main() -> None:
     # Start the timer.
     global_start_time = time.time()
 
-    # This needs to be done before Spark context initialization, which will happen as we try to
-    # collect all gtest tests in all C++ test programs.
+    propagate_env_vars()
+    collect_tests_time_sec: Optional[float] = None
+    if test_list_path:
+        test_descriptors = load_test_list(test_list_path)
+    else:
+        collect_tests_start_time_sec = time.time()
+        test_descriptors = collect_tests(args)
+        collect_tests_time_sec = time.time() - collect_tests_start_time_sec
+
+    # Includes ignored tests, assuming they already ran/passed.
+    initial_num = len(test_descriptors)
+    logging.info("Initial number of tests: %d", initial_num)
+    num_repetitions = args.num_repetitions
+    # For now, we are just dividing test reports into two suites by language.
+    # Total number per suite planned also includes ignored & disabled tests.
+    num_planned_by_language: Dict[str, int] = defaultdict(int)
+    for td in test_descriptors:
+        if td.attempt_index == 1:
+            num_planned_by_language[td.language] += 1
+
+    if args.save_report_to_build_dir:
+        planned_report_paths = []
+        planned_report_paths.append(os.path.join(global_conf.build_root, 'planned_tests.json'))
+        planned = []
+        for td in test_descriptors:
+            planned.append(td.descriptor_str)
+        save_json_to_paths('planned tests', planned, planned_report_paths, should_gzip=False)
+
+    if ignore_list_path:
+        ignored_tests = load_test_list(ignore_list_path)
+        test_descriptors = [td for td in test_descriptors if td not in ignored_tests]
+        logging.info("Ignoring %d tests from ignore list %s", initial_num - len(test_descriptors),
+                     ignore_list_path)
+
+    # This needs to be done before Spark context initialization.
+    # And before the no-tests-to-run check, so that archive can be pre-built.
     if args.send_archive_to_workers:
         archive_exists = (
             global_conf.archive_for_workers is not None and
@@ -1346,36 +1400,54 @@ def main() -> None:
         else:
             yb_dist_tests.compute_archive_sha256sum()
 
-    propagate_env_vars()
-    collect_tests_time_sec: Optional[float] = None
-    if test_list_path:
-        test_descriptors = load_test_list(test_list_path)
-    else:
-        collect_tests_start_time_sec = time.time()
-        test_descriptors = collect_tests(args)
-        collect_tests_time_sec = time.time() - collect_tests_start_time_sec
-
     if not test_descriptors and not args.allow_no_tests:
         logging.info("No tests to run")
         return
 
-    num_tests = len(test_descriptors)
+    # If CSI is enabled, we need suites created before reporting any disabled tests.
+    if test_descriptors:
+        csi_suites: Dict[str, str] = {}
+        propagated_env_vars['YB_CSI_REPS'] = str(num_repetitions)
+        logging.info("Propagating env var %s (value: %s) to Spark workers",
+                     'YB_CSI_REPS', num_repetitions)
+        csi_lqid = csi_report.launch_qid()
+        propagated_env_vars['YB_CSI_QID'] = csi_lqid
+        logging.info("Propagating env var %s (value: %s) to Spark workers",
+                     'YB_CSI_QID', csi_lqid)
+        for suite_name, num_tests in num_planned_by_language.items():
+            if args.test_filter_re:
+                method = "RegEx"
+            elif args.test_list:
+                method = "Requested"
+            else:
+                method = "All"
+            (csi_var_name, csi_var_value) = csi_report.create_suite(
+                csi_lqid, suite_name, os.getenv('YB_CSI_SUITE', ''), method, num_tests,
+                num_repetitions, time.time())
+            csi_suites[suite_name] = csi_var_value
+            propagated_env_vars[csi_var_name] = csi_var_value
+            logging.info("Propagating env var %s (value: %s) to Spark workers",
+                         csi_var_name, csi_var_value)
+
+    if disable_list_path:
+        test_descriptors = skip_disabled_tests(test_descriptors, disable_list_path)
 
-    if args.max_tests and num_tests > args.max_tests:
+    # Actual number to be run after ignored & disabled.
+    spark_test_cnt = len(test_descriptors)
+    if args.max_tests and spark_test_cnt > args.max_tests:
         logging.info("Randomly selecting {} tests out of {} possible".format(
-            args.max_tests, num_tests))
+            args.max_tests, spark_test_cnt))
         random.shuffle(test_descriptors)
         test_descriptors = test_descriptors[:args.max_tests]
-        num_tests = len(test_descriptors)
+        spark_test_cnt = len(test_descriptors)
 
     if args.verbose:
         for test_descriptor in test_descriptors:
             logging.info("Will run test: {}".format(test_descriptor))
 
-    num_repetitions = args.num_repetitions
-    total_num_tests = num_tests * num_repetitions
+    total_num_tests = spark_test_cnt * num_repetitions
     logging.info("Running {} tests on Spark, {} times each, for a total of {} tests".format(
-        num_tests, num_repetitions, total_num_tests))
+        spark_test_cnt, num_repetitions, total_num_tests))
 
     if num_repetitions > 1:
         test_descriptors = [
@@ -1384,17 +1456,9 @@ def main() -> None:
             for i in range(1, num_repetitions + 1)
         ]
 
-    if args.save_report_to_build_dir:
-        planned_report_paths = []
-        planned_report_paths.append(os.path.join(global_conf.build_root, 'planned_tests.json'))
-        planned = []
-        for td in test_descriptors:
-            planned.append(td.descriptor_str)
-        save_json_to_paths('planned tests', planned, planned_report_paths, should_gzip=False)
-
     app_name_details = ['{} tests total'.format(total_num_tests)]
     if num_repetitions > 1:
-        app_name_details += ['{} repetitions of {} tests'.format(num_repetitions, num_tests)]
+        app_name_details += ['{} repetitions of {} tests'.format(num_repetitions, spark_test_cnt)]
     init_spark_context(app_name_details)
 
     set_global_conf_for_spark_jobs()
@@ -1439,35 +1503,6 @@ def monitor_fail_count(stop_event: threading.Event) -> None:
         total_num_tests, len(test_descriptors))
     test_phase_start_time = time.time()
 
-    # For now, we are just dividing test reports into two suites by language.
-    num_planned_by_language: Dict[str, int] = defaultdict(int)
-    for td in test_descriptors:
-        if td.attempt_index == 1:
-            num_planned_by_language[td.language] += 1
-
-    csi_suites: Dict[str, str] = {}
-    propagated_env_vars['YB_CSI_REPS'] = str(num_repetitions)
-    logging.info("Propagating env var %s (value: %s) to Spark workers",
-                 'YB_CSI_REPS', num_repetitions)
-    csi_lqid = csi_report.launch_qid()
-    propagated_env_vars['YB_CSI_QID'] = csi_lqid
-    logging.info("Propagating env var %s (value: %s) to Spark workers",
-                 'YB_CSI_QID', csi_lqid)
-    for suite_name, num_tests in num_planned_by_language.items():
-        if args.test_filter_re:
-            method = "RegEx"
-        elif args.test_list:
-            method = "Requested"
-        else:
-            method = "All"
-        (csi_var_name, csi_var_value) = csi_report.create_suite(
-            csi_lqid, suite_name, os.getenv('YB_CSI_SUITE', ''), method, num_tests,
-            num_repetitions, test_phase_start_time)
-        csi_suites[suite_name] = csi_var_value
-        propagated_env_vars[csi_var_name] = csi_var_value
-        logging.info("Propagating env var %s (value: %s) to Spark workers",
-                     csi_var_name, csi_var_value)
-
     # Randomize test order to avoid any kind of skew.
     random.shuffle(test_descriptors)
     test_names_rdd = spark_context.parallelize(  # type: ignore