Source code for uatg.test_generator

# See LICENSE.incore for license details

from datetime import datetime
from getpass import getuser
from glob import glob
from multiprocessing import Pool, Manager
from os import mkdir, makedirs, remove
from os.path import join, dirname, abspath, exists, isdir, isfile
from shutil import rmtree, copyfile
from sys import exit

from ruamel.yaml import dump
from yapsy.PluginManager import PluginManager, PluginInfo

from uatg import __file__
from uatg.log import logger
from uatg.utils import create_plugins, generate_test_list, create_linker, \
    create_model_test_h, join_yaml_reports, generate_sv_components, \
    list_of_modules, rvtest_data, dump_makefile, setup_pages, \
    select_paging_modes, macros_parser

# create a manager for shared resources
process_manager = Manager()


[docs]def asm_generation_process(args): """ for every plugin, a process shall be spawned. The new process shall create an Assembly test file. """ # unpacking the args tuple plugin, config_dict, isa, test_format_string, work_tests_dir, \ make_file, module, linker_dir, uarch_dir, work_dir, \ compile_macros_dict, self_checking_dict, module_test_count_dict, page_modes = args # actual generation process check = plugin.plugin_object.execute(config_dict) name = (str(plugin.plugin_object).split(".", 1)) t_name = ((name[1].split(" ", 1))[0]) # data section for paging pages priv_asm_code = ['', '', ''] priv_asm_data = "" if check: test_gen = plugin.plugin_object.generate_asm() seq = '001' for ret_list_of_dicts in test_gen: test_name = t_name + '-' + seq assert isinstance(ret_list_of_dicts, dict) # Checking for the returned sections from each test asm_code = ret_list_of_dicts['asm_code'] try: if ret_list_of_dicts['name_postfix']: inst_name_postfix = '-' + ret_list_of_dicts['name_postfix'] else: inst_name_postfix = '' except KeyError: inst_name_postfix = '' # add inst name to test name as postfix test_name = test_name + inst_name_postfix logger.debug(f'Selected test: {test_name}') try: asm_data = ret_list_of_dicts['asm_data'] except KeyError: asm_data = rvtest_data(bit_width=0, num_vals=1, random=True) try: asm_sig = ret_list_of_dicts['asm_sig'] except KeyError: asm_sig = '\n' # create an entry in the compile_macros dict if 'rv64' in isa.lower(): compile_macros_dict[test_name] = ['XLEN=64'] else: compile_macros_dict[test_name] = ['XLEN=32'] # if self_checking is included in returned dictionary, set the value accordingly # else, default it to False try: self_checking_dict[test_name] = ret_list_of_dicts['self_checking'] except KeyError: self_checking_dict[test_name] = False list_of_env_paths = [join(dirname(__file__), 'env/arch_test_unpriv.h'), join(dirname(__file__), 'env/arch_test_priv.h')] try: available_macros = macros_parser(list_of_env_paths) for i in ret_list_of_dicts['compile_macros']: if i not in available_macros: logger.error(f'{i}: Macro undefined in arch_test.h ') raise Exception('Undefined Macro') exit() compile_macros_dict[test_name] += ret_list_of_dicts[ 'compile_macros'] except KeyError: logger.debug(f'No custom Compile macros specified for ' f'{test_name}') # generate and setup page tables based on info from plugin privileged_dict = {'page_size': 4096, 'll_pages': 64, 'paging_mode': 'sv39', 'mode': 'machine', 'enable': False} try: privileged_dict = ret_list_of_dicts['privileged_test'] except KeyError: privileged_dict['enable'] = False # check to add privileged_test macro in compile macros list if privileged_dict['enable'] == True or \ ('rvtest_mtrap_routine' in compile_macros_dict[test_name]) or \ ('rvtest_strap_routine' in compile_macros_dict[test_name]): logger.debug('This test is a privileged test. Including arch_test_priv header') compile_macros_dict[test_name] += ['privileged_test_enable'] try: pt_fault = privileged_dict['fault'] except KeyError: logger.debug("test does not generate a PT fault") pt_fault = False pass try: pt_mem_fault = privileged_dict['mem_fault'] except KeyError: pt_mem_fault = False pass try: pte_bit_dict = privileged_dict['pte_dict'] except KeyError: pte_bit_dict = None pass try: pt_megapage = privileged_dict['megapage'] except KeyError: pt_megapage = False pass try: pt_gigapage = privileged_dict['gigapage'] except KeyError: pt_gigapage = False pass try: pt_terapage = privileged_dict['terapage'] except KeyError: pt_terapage = False pass try: pt_petapage = privileged_dict['petapage'] except KeyError: pt_petapage = False pass try: pt_user_superpage = privileged_dict['user_superpage'] except KeyError: pt_user_superpage = False pass try: pt_user_supervisor_superpage = privileged_dict['user_supervisor_superpage'] except KeyError: pt_user_supervisor_superpage = False pass try: pt_misaligned_superpage = privileged_dict['misaligned_superpage'] except KeyError: pt_misaligned_superpage = False pass required_paging_modes = select_paging_modes(page_modes) current_paging_mode = privileged_dict['paging_mode'] if privileged_dict['enable']: if (privileged_dict['paging_mode'] == 'sv39') and \ (privileged_dict['mode'] == 'machine'): current_paging_mode = required_paging_modes[0] if current_paging_mode in required_paging_modes: logger.debug(f"{current_paging_mode} is in user listed " \ "paging modes") priv_asm_code, priv_asm_data = setup_pages( pte_dict=pte_bit_dict, page_size=privileged_dict['page_size'], paging_mode=current_paging_mode, valid_ll_pages=privileged_dict['ll_pages'], mode=privileged_dict['mode'], megapage=pt_megapage, gigapage=pt_gigapage, terapage=pt_terapage, petapage=pt_petapage, user_superpage=pt_user_superpage, user_supervisor_superpage=pt_user_supervisor_superpage, fault=pt_fault, mem_fault=pt_mem_fault, misaligned_superpage=pt_misaligned_superpage ) else: logger.warning( f'{current_paging_mode} is not in user listed' \ ' paging modes') logger.warning(f'skipping test generation for {test_name}') continue # Adding License, includes and macros # asm = license_str + includes + test_entry asm = (test_format_string[0] + test_format_string[1] + test_format_string[2]) # Appending Coding Macros & Instructions # asm += rvcode_begin + asm_code + rvcode_end asm += test_format_string[3] + priv_asm_code[0] + \ priv_asm_code[1] + asm_code + \ priv_asm_code[2] + test_format_string[4] # Appending RVTEST_DATA macros and data values # asm += rvtest_data_begin + asm_data + rvtest_data_end asm += test_format_string[5] + asm_data + priv_asm_data + \ test_format_string[6] # Appending RVMODEL macros # asm += rvmodel_data_begin + asm_sig + rvmodel_data_end asm += test_format_string[7] + asm_sig + \ test_format_string[8] mkdir(join(work_tests_dir, test_name)) with open(join(work_tests_dir, test_name, test_name + '.S'), 'w') as f: f.write(asm) seq = '%03d' % (int(seq, 10) + 1) logger.debug(f'Generating test for {test_name}') try: make_file[module].append(test_name) except KeyError: make_file[module] = process_manager.list([test_name]) make_file['tests'].append( (test_name, dump_makefile(isa=isa, link_path=linker_dir, test_path=join(work_tests_dir, test_name, test_name + '.S'), test_name=test_name, compile_macros=compile_macros_dict[test_name], env_path=join(uarch_dir, 'env'), work_dir=work_dir))) module_test_count_dict[f'{t_name}'] = (int(seq)) - 1 else: logger.warning(f'{t_name} is not valid for the current core ' 'configuration') logger.debug(f'Finished Generating Assembly Files for {t_name}') return True
[docs]def sv_generation_process(args): """ for every plugin, a process shall be spawned. The process shall generate System Verilog coverpoints """ # unpack the args plugin = args[0] config_dict = args[1] alias_dict = args[3] cover_list = args[4] _check = plugin.plugin_object.execute(config_dict) _name = (str(plugin.plugin_object).split(".", 1)) _test_name = ((_name[1].split(" ", 1))[0]) if _check: try: _sv = plugin.plugin_object.generate_covergroups(alias_dict) cover_list.append(_sv) logger.debug(f'Generating coverpoints SV file for {_test_name}') except AttributeError: logger.warn(f'Skipping coverpoint generation for {_test_name} as ' f'there is no gen_covergroup method ') pass else: logger.critical(f'Skipped {_test_name} as this test is not ' f'created for the current DUT configuration ') return True
[docs]def generate_tests(work_dir, linker_dir, modules, config_dict, test_list, modules_dir, index_path, paging_modes, jobs): """ The function generates ASM files for all the test classes specified within the module_dir. The user can also select the modules for which he would want the tests to be generated for. The YAPSY plugins for the tests are generated by the function automatically. The tests are created within the work directory passed by the user. A test_list is also created in the yaml format by the function. The test generator also creates a linker file as well as the header files for running the ASM files on the DUT, when required. Finally, the test generator only generates the tests whose targets are implemented in the DUT. """ uarch_dir = dirname(__file__) if work_dir: pass else: work_dir = abspath((join(uarch_dir, '../work/'))) makedirs(work_dir, exist_ok=True) logger.info(f'uatg dir is {uarch_dir}') logger.info(f'work_dir is {work_dir}') isa = 'RV64I' # yaml file containing the ISA parmaeters of the DUT isa_yaml = config_dict['isa_dict'] try: isa = isa_yaml['hart0']['ISA'] except Exception as e: logger.error(e) logger.error('Exiting UATG. ISA cannot be found/understood') exit(0) logger.info('The modules are {0}'.format((', '.join(modules)))) # creating a shared dictionary which can be accessed by all processes # stores the makefile commands make_file = process_manager.dict({ 'all': modules, 'tests': (process_manager.list()) }) # creating a shared dict to store test_list info # test_list_dict = process_manager.dict() test_list_dict = {} # creating a shared compile_macros dict # this dictionary will contain all the compile macros for each test compile_macros_dict = process_manager.dict() # Create a shared self_checking flag dict # this dictionary will store the status of self_check flag for each test self_checking_dict = process_manager.dict() if exists(join(work_dir, 'makefile')): remove(join(work_dir, 'makefile')) logger.info('****** Generating Tests ******') total_test_count_dict = {} for module in modules: # creating a shared list to display the number of tests generated # per module module_test_count_dict = process_manager.dict() module_dir = join(modules_dir, module) work_tests_dir = join(work_dir, module) # initializing make commands for individual modules logger.debug(f'Directory for {module} is {module_dir}') logger.info(f'Starting plugin Creation for {module}') create_plugins(plugins_path=module_dir, index_yaml=index_path, module=module) logger.info(f'Created plugins for {module}') username = getuser() time = ((str(datetime.now())).split("."))[0] license_str = f'# Licensing information can be found at ' \ f'LICENSE.incore\n# Test generated by user - {username}' \ f' at {time}\n\n' includes = f'#include \"model_test.h\" \n#include \"arch_test_unpriv.h\"\n' test_entry = f'RVTEST_ISA(\"{isa}\")\n\n.section .text.init\n.globl' \ f' rvtest_entry_point\nrvtest_entry_point:' rvcode_begin = '\nRVMODEL_BOOT\nRVTEST_CODE_BEGIN\n' rvcode_end = '\nRVTEST_CODE_END\nRVMODEL_HALT\n\n' rvtest_data_begin = '\nRVTEST_DATA_BEGIN\n' rvtest_data_end = '\nRVTEST_DATA_END\n\n' rvmodel_data_begin = '\nRVMODEL_DATA_BEGIN\n' rvmodel_data_end = '\nRVMODEL_DATA_END\n\n' # This function adds module directory to python path def add_module_to_path(plugin_info): import sys from os.path import dirname sys.path.insert(0, dirname(plugin_info.path)) manager = PluginManager() logger.debug('Loaded PluginManager') manager.setPluginPlaces([module_dir]) # plugins are stored in module_dir manager.locatePlugins() # passing add_module_to_path to callback ensures the run of the function # before the plugin is loaded x = manager.loadPlugins(callback=add_module_to_path) error_status = [i.error for i in x if i.error is not None] if len(error_status) > 0: for i in error_status: logger.error(str(i[0]) + ' : ' + str(i[1])) exit('Python Errors at one/multiple files') # check if prior test files are present and remove them. create new dir. if (isdir(work_tests_dir)) and \ exists(work_tests_dir): rmtree(work_tests_dir) mkdir(work_tests_dir) logger.info(f'Generating assembly tests for {module}') # test format strings test_format_string = [ license_str, includes, test_entry, rvcode_begin, rvcode_end, rvtest_data_begin, rvtest_data_end, rvmodel_data_begin, rvmodel_data_end ] # Loop around and find the plugins and writes the contents from the # plugins into an asm file arg_list = [] for plugin in manager.getAllPlugins(): arg_list.append( (plugin, config_dict, isa, test_format_string, work_tests_dir, make_file, module, linker_dir, uarch_dir, work_dir, compile_macros_dict, self_checking_dict, module_test_count_dict, paging_modes)) # multi processing process pool logger.info(f"Spawning {jobs} processes") process_pool = Pool(jobs) # creating a map of processes process_pool.map(asm_generation_process, arg_list) process_pool.close() logger.info('\n****** Count of assembly tests generated (per plugin) ' f'for {module} ******') s_no = 1 for k, v in module_test_count_dict.items(): logger.info(f'{s_no} | {k} : {v}') s_no = s_no + 1 total_test_count_dict[f'{module}'] = sum( [v for v in module_test_count_dict.values()]) logger.info( f'\nTotal number of tests generated for {module} : ' f'{total_test_count_dict[module]}\n\n') logger.info(f'Finished Generating Assembly Tests for {module}') if test_list: logger.info(f'Creating test_list for the {module}') test_list_dict.update( generate_test_list(work_tests_dir, uarch_dir, module_dir, isa, test_list_dict, compile_macros_dict, self_checking_dict)) logger.info('Assembly generation for all modules completed') with open(join(work_dir, 'makefile'), 'w') as f: logger.info('Dumping makefile') f.write('all' + ': ') f.write(' \\\n\t'.join(make_file['all'])) f.write('\n') for i in modules: f.write(i + ': ') try: f.write(' \\\n\t'.join(make_file[i])) except KeyError: logger.critical(f"\"{i}\" is a part of the module list. \n" f"But, No tests were generated by UATG for " f"module \"{i}\"") logger.critical("If this was uninteded, " "Please enable the required test(s) in the " "index.yaml file") f.write('\n') f.write('\n') for i in make_file['tests']: f.write(i[0] + ': \n\t') f.write(i[1] + '\n') if linker_dir and isfile(join(linker_dir, 'link.ld')): logger.info('Using user specified linker: ' + join(linker_dir, 'link.ld')) copyfile(join(linker_dir, 'link.ld'), work_dir + '/link.ld') else: create_linker(target_dir=work_dir) logger.info(f'Creating a linker file at {work_dir}') if linker_dir and isfile(join(linker_dir, 'model_test.h')): logger.info('Using user specified model_test file: ' + join(linker_dir, 'model_test.h')) copyfile(join(linker_dir, 'model_test.h'), work_dir + '/model_test.h') else: create_model_test_h(target_dir=work_dir) logger.info(f'Creating Model_test.h file at {work_dir}') if test_list: logger.info('Test List was generated by UATG. You can find it in ' f'the work dir{work_dir}') with open(join(work_dir, 'test_list.yaml'), 'w') as outfile: dump(test_list_dict, outfile) else: logger.info('Test list will not be generated by uatg') logger.info('\n****** Number of tests generated (per module) ' 'by UATG ******') s_no = 1 for k, v in total_test_count_dict.items(): logger.info(f'{s_no} | {k} - {v}') s_no = s_no + 1 logger.info( f'\nTotal number of tests generated by UATG in this run : ' f'{sum([v for v in total_test_count_dict.values()])}\n\n') logger.info('****** Finished Generating Tests and other dependencies' ' ******')
[docs]def generate_sv(work_dir, config_dict, modules, modules_dir, alias_dict, jobs): """ The generate_sv function dumps the covergroups written by the user into a 'coverpoints.sv' file present within the 'sv_top' directory within the work directory. This function dumps into an SV file only if the test_class contains the generate_covergroups method. This function, like generate_asm also allows to select the modules for which covergroups are to be generated. In addition, the method also takes in an alias_dict which can be used to alias the BSV signal names to something even more comprehensible. """ uarch_dir = dirname(__file__) if work_dir: pass else: work_dir = abspath((join(uarch_dir, '../work/'))) # yaml containing ISA parameters of DUT isa_yaml = config_dict['isa_dict'] logger.info('****** Generating Covergroups ******') sv_dir = join(work_dir, 'sv_top') makedirs(sv_dir, exist_ok=True) # generate the tbtop and interface files generate_sv_components(sv_dir, alias_dict) logger.debug("Generated tbtop, defines and interface files") sv_file = join(sv_dir, 'coverpoints.sv') if isfile(sv_file): logger.debug("Removing Existing coverpoints SV file") remove(sv_file) # create a shared list for storing the coverpoints cover_list = process_manager.list() for module in modules: logger.debug(f'Generating CoverPoints for {module}') module_dir = join(modules_dir, module) # yaml file with core parameters core_yaml = config_dict['core_config'] manager = PluginManager() manager.setPluginPlaces([module_dir]) manager.locatePlugins() x = manager.loadPlugins() error_status = [i.error for i in x if i.error is not None] if len(error_status) > 0: for i in error_status: logger.error(str(i[0]) + ' : ' + str(i[1])) exit('Python Errors at one/multiple files') # Loop around and find the plugins and writes the contents from the # plugins into an asm file arg_list = [] for plugin in manager.getAllPlugins(): arg_list.append( (plugin, core_yaml, isa_yaml, alias_dict, cover_list)) # multi processing process pool logger.debug(f"Spawning {jobs} processes") process_pool = Pool(jobs) # creating a map of processes process_pool.map(sv_generation_process, arg_list) process_pool.close() logger.debug(f'Finished Generating Coverpoints for {module}') with open(sv_file, 'w') as f: logger.info('Dumping the covergroups into SV file') f.write('\n'.join(cover_list)) logger.info('****** Finished Generating Covergroups ******')
[docs]def validate_tests(modules, config_dict, work_dir, modules_dir): """ Parses the log returned from the DUT for finding if the tests were successful. The user should have created regular expressions for the patterns he's expecting to be seen in the log generated by the DUT. In addition to just the checking, it can also be set up to provide a report for every test for which the user tries to validate. """ uarch_dir = dirname(__file__) logger.info('****** Validating Test results, Minimal log checking ******') if modules == ['all']: logger.debug(f'Checking {modules_dir} for modules') modules = list_of_modules(modules_dir) # del modules[-1] # Needed if list_of_modules returns 'all' along with other modules if work_dir: pass else: work_dir = abspath((join(uarch_dir, '../work/'))) _pass_ct = 0 _fail_ct = 0 _tot_ct = 1 for module in modules: module_dir = join(modules_dir, module) # module_tests_dir = join(module_dir, 'tests') work_tests_dir = join(work_dir, module) reports_dir = join(work_dir, 'reports', module) makedirs(reports_dir, exist_ok=True) # YAML with ISA paramters core_yaml = config_dict['core_config'] # isa yaml with ISA paramters isa_yaml = config_dict['isa_dict'] manager = PluginManager() manager.setPluginPlaces([module_dir]) manager.locatePlugins() x = manager.loadPlugins() error_status = [i.error for i in x if i.error is not None] if len(error_status) > 0: for i in error_status: logger.error(str(i[0]) + ' : ' + str(i[1])) exit('Python Errors at one/multiple files') logger.debug(f'Minimal Log Checking for {module}') for plugin in manager.getAllPlugins(): _name = (str(plugin.plugin_object).split(".", 1)) _test_name = ((_name[1].split(" ", 1))[0]) _check = plugin.plugin_object.execute(core_yaml, isa_yaml) _log_file_path = join(work_tests_dir, _test_name, 'log') if _check: try: _result = plugin.plugin_object.check_log( _log_file_path, reports_dir) if _result: logger.info(f'{_tot_ct}. Minimal test: {_test_name} ' f'has passed.') _pass_ct += 1 _tot_ct += 1 else: logger.critical(f"{_tot_ct}. Minimal test: " f"{_test_name} has failed.") _fail_ct += 1 _tot_ct += 1 except FileNotFoundError: logger.error(f'Log for {_test_name} not found. Run the ' f'test on DUT and generate log or check ' f'the path.') else: logger.warn(f'No asm generated for {_test_name}. Skipping') logger.debug(f'Minimal log Checking for {module} complete') logger.info("Minimal Verification Results") logger.info("=" * 28) logger.info(f"Total Tests : {_tot_ct - 1}") if _tot_ct - 1: logger.info(f"Tests Passed : {_pass_ct} - " f"[{_pass_ct // (_tot_ct - 1)} %]") logger.warn(f"Tests Failed : {_fail_ct} - " f"[{100 * _fail_ct // (_tot_ct - 1)} %]") else: logger.warn("No tests were created") logger.info('****** Finished Validating Test results ******') join_yaml_reports(work_dir) logger.info('Joined Yaml reports')
[docs]def clean_dirs(work_dir, modules_dir): """ This function cleans the files generated by UATG. Presently it removes __pycache__, work_dir directory and also removes the '.yapsy plugins' files in the module's directories. """ uarch_dir = dirname(__file__) if work_dir: pass else: work_dir = abspath((join(uarch_dir, '../work/'))) module_dir = join(work_dir, '**') # module_tests_dir = join(module_dir, 'tests') logger.info('****** Cleaning ******') logger.debug(f'work_dir is {module_dir}') yapsy_dir = join(modules_dir, '**/*.yapsy-plugin') pycache_dir = join(modules_dir, '**/__pycache__') logger.debug(f'yapsy_dir is {yapsy_dir}') logger.debug(f'pycache_dir is {pycache_dir}') tf = glob(module_dir) pf = glob(pycache_dir) + glob(join(uarch_dir, '__pycache__')) yf = glob(yapsy_dir, recursive=True) logger.debug(f'removing {tf}, {yf} and {pf}') for element in tf + pf: if isdir(element): rmtree(element) else: remove(element) for element in yf: remove(element) logger.info("Generated Test files/folders removed")