#! /usr/bin/env python2 # # sysgen - System Generator # # # Copyright (c) 2015, Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # # Arguments: # - name of MDEF file # - name of directory for output files (optional) # Generates: # - kernel_main.c file # - kernel_main.h file (local copy) # - micro_private_types.h file (local copy) # - sysgen.h file import os import sys import subprocess import argparse # global variables describing system MIN_HEAP = 64 heap_pos_in_pool_list = -1 num_kargs = 0 num_timers = 0 num_prios = 0 task_list = [] event_list = [] mutex_list = [] sema_list = [] fifo_list = [] pipe_list = [] mbx_list = [] map_list = [] pool_list = [] group_dictionary = {} group_key_list = [] # global variables used during generation of output files do_not_edit_warning = \ "\n\n\n/* THIS FILE IS AUTOGENERATED -- DO NOT MODIFY! */\n\n\n" copyright = \ "/*\n" + \ " * Copyright (c) 2015-2017 Wind River Systems, Inc.\n" + \ " *\n" + \ " * SPDX-License-Identifier: Apache-2.0\n" + \ " */\n" output_dir = "" input_mdef_file = "" def get_cmdline_args(): """ Handle optional output directory argument """ global input_mdef_file global output_dir output_dir_help='output directory for kernel_main.*, sysgen.h, etc' input_mdef_file_help='input MDEF file' parser = argparse.ArgumentParser() parser.add_argument('-i', '--input-mdef-file', action='store', required=True, help=input_mdef_file_help) parser.add_argument('-o', '--output-dir', action='store', help=output_dir_help) args = parser.parse_args() input_mdef_file = args.input_mdef_file if (args.output_dir != None): output_dir = args.output_dir def write_file(filename, contents): """ Create file using specified name and contents """ f = open(filename, 'w') # overwrites file if it already exists f.write(contents) f.close() # # ERROR HANDLING # def sysgen_error(msg): print("\n*** sysgen error: " + msg + "\n") sys.exit(1) def error_arg_count(line): sysgen_error("invalid number of arguments on following line\n" + line) # # CREATE INTERNAL REPRESENTATION OF SYSTEM # def mdef_parse(): """ Parse MDEF file """ global num_kargs global num_timers global num_prios global MIN_HEAP global heap_pos_in_pool_list # read file contents in a single shot with open(input_mdef_file, 'r') as infile: data = infile.read() # create list of the lines, breaking at line boundaries my_list = data.splitlines() # process each line for line in my_list: words = line.split() if (len(words) == 0): continue # ignore blank line if (words[0][0] == "%"): continue # ignore comment line if (words[0] == "CONFIG"): if (len(words) != 4): error_arg_count(line) num_kargs = int(words[1]) num_timers = int(words[2]) num_prios = int(words[3]) continue if (words[0] == "TASK"): if len(words) < 6 and len(words) > 10: error_arg_count(line) p1 = 0 p2 = 0 p3 = 0 if len(words) >= 7: p1 = words[6] if len(words) >= 8: p2 = words[7] if len(words) == 9: p3 = words[8] abort = 0 if len(words) == 10: abort = words[9] task_list.append((words[1], int(words[2]), words[3], int(words[4]), words[5], p1, p2, p3, abort)) continue if (words[0] == "TASKGROUP"): if (len(words) != 2): error_arg_count(line) if words[1] in group_dictionary: continue # ignore re-definition of a task group group_bitmask = 1 << len(group_dictionary) group_dictionary[words[1]] = group_bitmask group_key_list.append(words[1]) continue if (words[0] == "EVENT"): if (len(words) != 3): error_arg_count(line) event_list.append((words[1], words[2])) continue if (words[0] == "SEMA"): if len(words) < 2 and len(words) > 4: error_arg_count(line) if len(words) == 2: sema_list.append((words[1], 0, 0xffffffff)) elif len(words) == 3: sema_list.append((words[1], int(words[2]), 0xffffffff)) else: sema_list.append((words[1], int(words[2]), int(words[3]))) continue if (words[0] == "MUTEX"): if (len(words) != 2): error_arg_count(line) mutex_list.append((words[1],)) continue if (words[0] == "FIFO"): if (len(words) != 4): error_arg_count(line) fifo_list.append((words[1], int(words[2]), int(words[3]))) continue if (words[0] == "PIPE"): if (len(words) != 3): error_arg_count(line) pipe_list.append((words[1], int(words[2]))) continue if (words[0] == "MAILBOX"): if (len(words) != 2): error_arg_count(line) mbx_list.append((words[1],)) continue if (words[0] == "MAP"): if (len(words) != 4): error_arg_count(line) map_list.append((words[1], int(words[2]), int(words[3]))) continue if (words[0] == "POOL"): if (len(words) != 5): error_arg_count(line) pool_list.append((words[1], int(words[2]), int(words[3]), int(words[4]))) continue if (words[0] == "HEAP_SIZE"): if (len(words) != 2): error_arg_count(line) heap_size = int(words[1]) heap_pos_in_pool_list = len(pool_list) pool_list.append(("_HEAP_MEM_POOL", MIN_HEAP, heap_size, 1)) continue sysgen_error("unrecognized keyword %s on following line\n%s" % (words[0], line)) # # GENERATE kernel_main.c FILE # kernel_main_c_data = "" kernel_main_c_filename_str = \ "/* kernel_main.c - kernel objects */\n\n" def kernel_main_c_out(string): """ Append a string to kernel_main.c """ global kernel_main_c_data kernel_main_c_data += string def kernel_main_c_header(): """ Generate initial portion of kernel_main.c """ kernel_main_c_out( kernel_main_c_filename_str + copyright + do_not_edit_warning + "\n" + "#include \n" + "#include \n" + "#include \n" + "#include \n" + "#include \n") def get_group_bitmask(group_str): # create bitmask of group(s) task belongs to group_bitmask = 0 group_set = group_str[1:len(group_str) - 1] # drop [] surrounding groups if (group_set != ""): group_list = group_set.split(',') for group in group_list: group_bitmask |= group_dictionary[group] return group_bitmask def is_float(x): try: float(x) return True except ValueError: return False def is_int(x): try: int(x) return True except ValueError: return False def is_number(x): return is_float(x) or is_int(x) def kernel_main_c_tasks(): global num_prios kernel_main_c_out("\n") # declare task entry points kernel_main_c_out("\n") for task in task_list: entry = task[2] if entry == "main": # We will re-use existing main_thread continue kernel_main_c_out("EXTERN_C void %s(void *, void *, void *);\n" % entry) # thread_init objects kernel_main_c_out("\n") for task in task_list: name = task[0] prio = task[1] entry = task[2] stack_size = task[3] if entry == "main": # We will re-use existing main thread continue groups = get_group_bitmask(task[4]) params = (task[5], task[6], task[7]) for param in params: if not is_number(param): kernel_main_c_out("extern void *%s;\n" % (param)); abort = task[8] if abort != 0 and abort != 'NULL': kernel_main_c_out("EXTERN_C void %s(void);\n" % abort) kernel_main_c_out( "_MDEF_THREAD_DEFINE(%s, %u, %s, %s, %s, %s, %s, %d, 0x%x);\n" % (name, int(stack_size), entry, params[0], params[1], params[2], abort, int(prio), int(groups))) def kernel_main_c_events(): """ Generate event variables """ event_type = 'struct k_alert *' # event descriptors # project-specific events for event in event_list: # if there is a handler function, it needs to be declared # before setting up the object via DEFINE_EVENT() # # in other words, no declaration if handler is NULL or 0 handler = event[1].strip().lower() if handler != "null" and handler != "0": kernel_main_c_out("extern int %s(%s event);\n" % (event[1], event_type)) kernel_main_c_out("K_ALERT_DEFINE(_k_event_obj_%s, %s, 1);\n" % (event[0], event[1])) def kernel_main_c_mutexes(): """ Generate mutex variables """ total_mutexes = len(mutex_list) if (total_mutexes == 0): return # mutex descriptors kernel_main_c_out("\n") for mutex in mutex_list: name = mutex[0] kernel_main_c_out("K_MUTEX_DEFINE(_k_mutex_obj_%s);\n" % (name)) def kernel_main_c_semas(): """ Generate semaphore variables """ total_semas = len(sema_list) if (total_semas == 0): return # semaphore descriptors kernel_main_c_out("\n") for semaphore in sema_list: name = semaphore[0] initial_count = semaphore[1] limit = semaphore[2] kernel_main_c_out("K_SEM_DEFINE(_k_sem_obj_%s, %s, %s);\n" % (name, initial_count, limit)) def kernel_main_c_fifos(): """ Generate FIFO variables """ total_fifos = len(fifo_list) if (total_fifos == 0): return kernel_main_c_out("\n") # message queue objects for fifo in fifo_list: name = fifo[0] depth = fifo[1] width = fifo[2] kernel_main_c_out("K_MSGQ_DEFINE(_k_fifo_obj_%s, %s, %s, 4);\n" % (name, width, depth)) def kernel_main_c_pipes(): """ Generate pipe variables """ total_pipes = len(pipe_list) if (total_pipes == 0): return # pipe buffers kernel_main_c_out("\n") # pipe objects for pipe in pipe_list: name = pipe[0] size = pipe[1] kernel_main_c_out("K_PIPE_DEFINE(_k_pipe_obj_%s, %d, 4);\n" % (name, size)) def kernel_main_c_mailboxes(): """ Generate mailbox variables """ total_mbxs = len(mbx_list) if (total_mbxs == 0): return kernel_main_c_out("\n") # mailbox objects for mbx in mbx_list: name = mbx[0] kernel_main_c_out("K_MBOX_DEFINE(_k_mbox_obj_%s);\n" % (name)) def kernel_main_c_maps(): """ Generate memory map variables """ total_maps = len(map_list) if (total_maps == 0): return kernel_main_c_out("\n") # memory map objects for map in map_list: name = map[0] blocks = map[1] block_size = map[2] kernel_main_c_out("K_MEM_SLAB_DEFINE(_k_mem_map_obj_%s, %s, %s, 4);\n" % (name, block_size, blocks)) def kernel_main_c_pools(): """ Generate memory pool variables """ global heap_pos_in_pool_list total_pools = len(pool_list) # pool global variables kernel_main_c_out("\nint _k_mem_pool_count = %d;\n" % (total_pools)) pool_descriptors = "" for pool in pool_list: kernel_main_c_out("\n") min_block_size = pool[1] max_block_size = pool[2] num_maximal_blocks = pool[3] pool_descriptors += "K_MEM_POOL_DEFINE(_k_mem_pool_obj_%s, %d, %d, %d, 4);\n" % \ (pool[0], min_block_size, max_block_size, num_maximal_blocks) if (heap_pos_in_pool_list != -1): kernel_main_c_out( "\nkmemory_pool_t _heap_mem_pool_ptr = " + "&_k_mem_pool_obj__HEAP_MEM_POOL;\n") kernel_main_c_out(pool_descriptors) def kernel_main_c_generate(): """ Generate kernel_main.c file """ global kernel_main_c_data kernel_main_c_header() kernel_main_c_mutexes() kernel_main_c_semas() kernel_main_c_events() kernel_main_c_maps() kernel_main_c_fifos() kernel_main_c_mailboxes() kernel_main_c_tasks() kernel_main_c_pipes() kernel_main_c_pools() write_file(output_dir + 'kernel_main.c', kernel_main_c_data) # # GENERATE sysgen.h FILE # sysgen_h_data = "" sysgen_h_filename_str = \ "/* sysgen.h - system generated kernel definitions */\n\n" sysgen_h_include_guard = "_SYSGEN__H_" sysgen_h_header_include_guard_str = \ "#ifndef " + sysgen_h_include_guard + "\n" \ "#define " + sysgen_h_include_guard + "\n\n" def generate_sysgen_h_header(): global sysgen_h_data kernel_api_file = "#include \n" sysgen_h_data += \ sysgen_h_filename_str + \ copyright + \ do_not_edit_warning + \ kernel_api_file + \ sysgen_h_header_include_guard_str + \ "\n" def generate_taskgroup_line(taskgroup, group_id): global sysgen_h_data sysgen_h_data += \ "#define " + taskgroup + " 0x%8.8x\n" % group_id def generate_sysgen_h_taskgroups(): global sysgen_h_data for group in group_key_list: generate_taskgroup_line(group, group_dictionary[group]) sysgen_h_data += "\n" def generate_obj_id_line(name, obj_id): return "#define " + name + " 0x0001%4.4x\n" % obj_id def generate_obj_id_lines(obj_types): data = "" for obj_type in obj_types: for obj in obj_type[0]: data += generate_obj_id_line(str(obj[0]), obj_type[1]) obj_type[1] += 1 if obj_type[1] > 0: data += "\n" return data def generate_sysgen_h_obj_ids(): global sysgen_h_data mutex_struct = 'k_mutex' mutex_type = 'struct k_mutex *' sem_struct = 'k_sem' sem_type = 'struct k_sem *' pipe_struct = 'k_pipe' pipe_type = 'struct k_pipe *' map_struct = 'k_mem_slab' map_type = 'struct k_mem_slab *' fifo_struct = 'k_msgq' fifo_type = 'struct k_msgq *' mbox_struct = 'k_mbox' mbox_type = 'struct k_mbox *' event_type = 'struct k_alert *' mem_pool_type = 'struct k_mem_pool' # add missing object types # mutex object ids sysgen_h_data += "\n" for mutex in mutex_list: name = mutex[0] sysgen_h_data += \ "extern struct %s _k_mutex_obj_%s;\n" % (mutex_struct, name) sysgen_h_data += \ "#define %s ((%s)&_k_mutex_obj_%s)\n\n" % (name, mutex_type, name) # semaphore object ids sysgen_h_data += "\n" for semaphore in sema_list: name = semaphore[0] sysgen_h_data += \ "extern struct %s _k_sem_obj_%s;\n" % (sem_struct, name) sysgen_h_data += \ "#define %s ((%s)&_k_sem_obj_%s)\n\n" % (name, sem_type, name) # fifo (aka message queue) object ids sysgen_h_data += "\n" for fifo in fifo_list: name = fifo[0] sysgen_h_data += \ "extern struct %s _k_fifo_obj_%s;\n" % (fifo_struct, name) sysgen_h_data += \ "#define %s ((%s)&_k_fifo_obj_%s)\n\n" % (name, fifo_type, name) # mailbox object ids sysgen_h_data += "\n" for mbx in mbx_list: name = mbx[0] sysgen_h_data += \ "extern struct %s _k_mbox_obj_%s;\n" % (mbox_struct, name) sysgen_h_data += \ "#define %s ((%s)&_k_mbox_obj_%s)\n\n" % (name, mbox_type, name) # pipe object id sysgen_h_data += "\n" for pipe in pipe_list: name = pipe[0]; sysgen_h_data += \ "extern struct %s _k_pipe_obj_%s;\n" % (pipe_struct, name) sysgen_h_data += \ "#define %s ((%s)&_k_pipe_obj_%s)\n\n" % (name, pipe_type, name) # memory map object id sysgen_h_data += "\n" for map in map_list: name = map[0]; sysgen_h_data += \ "extern struct %s _k_mem_map_obj_%s;\n" % (map_struct, name) sysgen_h_data += \ "#define %s ((%s)&_k_mem_map_obj_%s)\n" % (name, map_type, name) # task object id sysgen_h_data += "\n" for task in task_list: name = task[0]; prio = task[1] entry = task[2] stack_size = task[3] if entry == "main": # Special case: if the MDEF defines a main() thread, # re-purpose the already existing main_thread for it. sysgen_h_data += \ "#define MDEF_MAIN_STACK_SIZE %d\n" % stack_size sysgen_h_data += \ "#define MDEF_MAIN_THREAD_PRIORITY %d\n" % prio else: sysgen_h_data += \ "extern char _k_thread_obj_%s[];\n" % (name) + \ "#define %s ((k_tid_t)_k_thread_obj_%s)\n" % (name, name) # event object ids sysgen_h_data += "\n" for event in event_list: # no need to expose the irq task events if not (event[0].startswith("_TaskIrqEvt")): name = event[0]; sysgen_h_data += \ "extern struct k_alert _k_event_obj_%s;\n" % (name) sysgen_h_data += \ "#define %s (&_k_event_obj_%s)\n\n" % (name, name) # memory pool object ids for mem_pool in pool_list: name = mem_pool[0]; sysgen_h_data += \ "extern %s _k_mem_pool_obj_%s;\n" % (mem_pool_type, name) sysgen_h_data += \ "#define %s ((%s *)&_k_mem_pool_obj_%s)\n" % (name, mem_pool_type, name) # all other object ids sysgen_h_footer_include_guard_str = \ "\n#endif /* " + sysgen_h_include_guard + " */\n" def generate_sysgen_h_footer(): global sysgen_h_data sysgen_h_data += \ sysgen_h_footer_include_guard_str def sysgen_h_generate(): """ Generate sysgen.h file """ generate_sysgen_h_header() generate_sysgen_h_taskgroups() generate_sysgen_h_obj_ids() generate_sysgen_h_footer() write_file(output_dir + 'sysgen.h', sysgen_h_data) # # SYSTEM GENERATOR MAINLINE # get_cmdline_args() mdef_parse() kernel_main_c_generate() sysgen_h_generate()