From 70e647076418d114111aa76b5d3639a5b4271e94 Mon Sep 17 00:00:00 2001 From: Owl Date: Fri, 26 Sep 2025 14:32:34 -0400 Subject: bcsv and other stuff --- bcsv_funcs.py | 1169 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1169 insertions(+) create mode 100644 bcsv_funcs.py (limited to 'bcsv_funcs.py') diff --git a/bcsv_funcs.py b/bcsv_funcs.py new file mode 100644 index 0000000..26686e0 --- /dev/null +++ b/bcsv_funcs.py @@ -0,0 +1,1169 @@ +import struct, math, re, io, copy +from . import file_ops + +# python file to read the important information out of a BCSV file +# will try its best to decode the information either on big/little endian +# https://humming-owl.neocities.org/smg-stuff/pages/tutorials/bcsv + +# what this file will do is the following: +# check_bcsv_file() takes the first look into the BCSV and it checks if the file is correct. +# On the way it assigns all the variables to bcsv_raw_info (smg_bcsv_raw struct). +# If the file is correct, then read_bcsv_file() will assign the actually useful variables +# to a smg_bcsv_table structure and return that structure +# +# in case of any error check_bcsv_file() returns a string that can +# be read by a human to identify what it is wrong with the BCSV file +# if all is good it will return exactly that (as a string) + +TYPE_INT_TO_STRING = ["LONG", "STRING", "FLOAT", + "LONG_2", "SHORT", "CHAR", + "STRING_OFFSET"] +TYPE_STRING_TO_INT = {"LONG": 0, "STRING": 1, "FLOAT": 2, + "LONG_2": 3, "SHORT": 4, "CHAR": 5, + "STRING_OFFSET": 6} +TYPE_INT = [0, 1, 2, 3, 4, 5, 6] +TYPE_INT_TO_SIZE = [4, 32, 4, 4, 2, 1, 4] +TYPE_INT_TO_STD_BITMASK = [0xFFFFFFFF, 0x0, 0xFFFFFFFF, + 0xFFFFFFFF, 0xFFFF, 0xFF, + 0xFFFFFFFF] +# ^ STRING type is told to be "deprecated" and +# I don't know a test BCSV file in SMG that has the type +TYPE_INT_TO_STRUCT_CH = ["I", "32s", "f", "I", "H", "B", "I"] +# ^ for python struct.unpack/pack funcs +TYPE_INT_TO_PYTHON_TYPE = [int, str, float, int, int, int, str] +# ^ last one 
# is actually an integer, but this
# list is to map the types of a smg_bcsv_table

# path to the hash file (next to this module)
hashes_path = file_ops.get_base_path(__file__, True) + "/bcsv_hashes.txt"

# get the byte size of a data type
def get_data_type_size(type_int):
    """Return the byte size of a BCSV data type, or None if the type is unknown."""
    if type_int in TYPE_INT:
        return TYPE_INT_TO_SIZE[type_int]
    return None

# get the python struct char to read the data type
def get_struct_read_char(type_int):
    """Return the struct format character used to read the type, or None if unknown."""
    if type_int in TYPE_INT:
        return TYPE_INT_TO_STRUCT_CH[type_int]
    return None

# calculate the BCSV hash of a byte string
def calc_bytes_hash(bytes_array):
    """Calculate the BCSV name hash of a byte string.

    The hash is the Java-style ``h = 31*h + signed_byte`` over every byte,
    excluding a trailing NUL if present, masked to 32 bits.
    Returns None when the input is not ``bytes`` or is not CP932-decodable.
    """
    if type(bytes_array) != bytes:
        return None
    try:
        # BCSV strings are CP932 encoded
        # (assumption carried over from the original author — not fully verified)
        bytes_array.decode("cp932")
    except UnicodeDecodeError:
        return None

    # hash every byte except a trailing NUL terminator
    # (BUGFIX: the original indexed bytes_array[-1] and crashed on empty input)
    end = len(bytes_array)
    if end != 0 and bytes_array[-1] == 0:
        end -= 1
    result = 0
    for i in range(end):
        # bytes are treated as *signed* chars, matching the game's hash
        result = struct.unpack(">b", bytes_array[i : i + 1])[0] + (31 * result)
    return 0xFFFFFFFF & result

# add the new hash to the bcsv_hashes.txt file
def add_new_known_hash(bytes_array):
    """Insert a new name into bcsv_hashes.txt, keeping entries sorted.

    The file layout is: leading '#' comment lines, then one "name 0xHASH"
    entry per line. Returns True on success, or a human-readable error string.
    """
    # check if it is a CP932 encoded string
    str_hash = calc_bytes_hash(bytes_array)
    if str_hash is None:
        return "Not a CP932 decodable string."

    # get all the previous hashes
    try:
        f = open(hashes_path, "r+", encoding = "cp932")
    except OSError:
        return "\"bcsv_hashes.txt\" is not a CP932 encoded file."
    try:
        # BUGFIX: decode errors surface on read, not on open — catch them here too
        try:
            lines = f.readlines()
        except UnicodeDecodeError:
            return "\"bcsv_hashes.txt\" is not a CP932 encoded file."

        string_list = []
        first_non_comment_line_pos = 0
        for line in lines:
            if line.startswith("#"):
                # comments are assumed to all sit at the top of the file;
                # accumulate their encoded size to find where entries start
                # NOTE(review): text-mode newline translation could skew this
                # byte offset on \r\n files — TODO confirm the file is \n only
                first_non_comment_line_pos += len(line.encode("cp932"))
            elif line.strip():
                # "name 0xHASH" -> keep the name part
                # (BUGFIX: the original regex crashed on blank lines; the stray
                # f.readline() inside the readlines() loop was removed as well)
                string_list.append(line.rsplit(" ", 1)[0])

        # append the new string and sort, only if not already present
        new_string = bytes_array.decode("cp932")
        if new_string not in string_list:
            string_list.append(new_string)
            string_list.sort()

        # regenerate all the hashes
        hash_list = ["0x%08X" % (calc_bytes_hash(s.encode("cp932"))) for s in string_list]

        # from the first non-comment line, delete everything
        # and rewrite the entries as: name<space>hash
        f.truncate(first_non_comment_line_pos)
        f.seek(first_non_comment_line_pos)
        for i in range(len(string_list)):
            f.write(string_list[i])
            f.write(" ")
            f.write(hash_list[i])
            f.write("\n")
    finally:
        # BUGFIX: the original leaked the handle on any exception path
        f.close()
    return True

# return the string related to a hash name
# I hope there are no name collisions!
def get_hash_string(hash_value):
    """Resolve a column-name hash to its known name via bcsv_hashes.txt.

    Falls back to the hash formatted as "0x%08X" when no name matches.
    """
    rtn = "0x%08X" % (hash_value)
    # BUGFIX: the original split every line *before* skipping comments, so a
    # blank line raised IndexError; the handle also leaked on any exception.
    with open(hashes_path, "r", encoding = "cp932") as f:
        for line in f:
            if line.startswith("#") or not line.strip():
                continue
            name = line.split()[0]
            if calc_bytes_hash(name.encode("cp932")) == hash_value:
                rtn = name
                break
    return rtn

# return the type as a string
def get_type_string(type_int):
    """Map a numeric BCSV type to its string name, or None if unknown."""
    if type_int in TYPE_INT:
        return TYPE_INT_TO_STRING[type_int]
    return None

# return the type as a python type
def get_type_python_type(type_int):
    """Map a numeric BCSV type to the Python type used for its cells, or None."""
    if type_int in TYPE_INT:
        return TYPE_INT_TO_PYTHON_TYPE[type_int]
    return None

# all the raw variables on a BCSV file
class smg_bcsv_raw:
    """Raw, byte-level view of a BCSV file: header, per-column info and pools."""

    def __init__(self):
        self.endian = None          # "BIG" or "LITTLE" once detected
        self.header = self.header() # instantiates the nested header class
        self.columns_info = []      # list of col_cells_data_info
        self.data_pool = bytes()    # raw cell data
        self.string_pool = bytes()  # raw CP932 strings referenced by offset

    def __str__(self):
        rtn = "### SMG_BCSV_RAW - START\n"
        rtn += "Endian: %s\n" % (self.endian)
        rtn += self.header.__str__()
        rtn += "Column data info: hash, bitmask, offset, right-shift, type\n"
        for i in range(len(self.columns_info)):
            rtn += " Col[%s]: " % (i.__str__())
            rtn += self.columns_info[i].__str__()
        rtn += "Data pool (size = %s):\n " % (len(self.data_pool).__str__())
        for i in range(len(self.data_pool)):
            if (i % 16 == 0 and i != 0):
                rtn += "\n "
            rtn += " %02X" % self.data_pool[i]
        rtn += "\nString pool (size = %s):\n " % (len(self.string_pool).__str__())
        for i in range(len(self.string_pool)):
            if (i % 16 == 0 and i != 0):
                rtn += "\n "
            rtn += " %02X" % self.string_pool[i]
        rtn += "\n### SMG_BCSV_RAW - END\n"
        return rtn

    # header
    class header:
        """The 16-byte BCSV header fields."""
        def __init__(self):
            self.row_count = None
            self.col_count = None
            self.data_pool_offset = None
            self.row_data_length = None
        def __str__(self):
            rtn = "Row count: %s\n" % (self.row_count.__str__())
            rtn += "Column count: %s\n" % (self.col_count.__str__())
            # NOTE(review): label says "(hex)" but the value prints in decimal
            rtn += "Data pool offset (hex): %s\n" % (self.data_pool_offset.__str__())
            rtn += "Row data length (bytes): %s\n" % (self.row_data_length.__str__())
            return rtn

    # cell data info
    class col_cells_data_info:
        """One 12-byte column descriptor: hash, bitmask, offset, rshift, type."""
        def __init__(self):
            self.name_hash = None
            self.data_bitmask = None
            self.data_offset = None
            self.data_rshift = None
            self.data_type = None
        def __str__(self):
            rtn = "0x%08X, 0x%08X, %s, %s, %s " % (self.name_hash,
                                                   self.data_bitmask,
                                                   self.data_offset.__str__(),
                                                   self.data_rshift.__str__(),
                                                   self.data_type.__str__())
            # visible type string
            rtn += "(%s)\n" % (get_type_string(self.data_type))
            return rtn

# structure with the data from the BCSV that actually matters
class smg_bcsv_table:
    """Decoded BCSV: resolved column info plus a row-major list of cell values."""

    def __init__(self):
        self.row_count = None
        self.col_count = None
        self.cols_info = []  # list of cols_info entries, one per column
        self.rows_data = []  # list of rows, each a list of decoded cell values

    def __str__(self):
        rtn = "### SMG_BCSV_TABLE - START\n"
        rtn += "Row count: %s\n" % (self.row_count)
        rtn += "Column count: %s\n" % (self.col_count)
        rtn += "Columns info: hash or name, bitmask, right-shift, type\n"
        for i in range(len(self.cols_info)):
            rtn += " Col[%d]: " % (i)
            rtn += "%s" % (self.cols_info[i])
        rtn += "Row data:\n"
        for i in range(len(self.rows_data)):
            rtn += " Row[%s]:" % (i.__str__())
            for data in self.rows_data[i]:
                rtn += " %s," % (data.__str__())
            rtn = rtn[: -1] + "\n"
        rtn += "### SMG_BCSV_TABLE - END\n"
        return rtn

    # column info struct
    class cols_info:
        """Column metadata as exposed to users of the decoded table."""
        def __init__(self):
            self.name_or_hash = None  # known name, else the hash as a hex string
            self.bitmask = None
            self.rshift = None
            self.type = None          # type as a string (see TYPE_INT_TO_STRING)
        def __str__(self):
            rtn = "%s, 0x%08X, %d, %s\n" % (self.name_or_hash,
                                            self.bitmask,
                                            self.rshift,
                                            self.type)
            return rtn



# module-level scratch state shared by the read/check functions
bcsv_raw_info = None
bcsv_raw_error_str = "bcsv-raw-error: "
bcsv_table_error_str = "bcsv-table-error: "
f = None

# main function, will read and will check
while reading +# as BCSVs don't have magic, will have to check if it is well formatted +# in big endian and if it is not it will try to check if it is good in +# little endian, if both checks fail the file is bad (or I have a reading skill issue) +def read_bcsv_file(filepath_or_stream, endian): + # check params + if (((type(filepath_or_stream) != io.BytesIO) and (type(filepath_or_stream) != str)) + or (endian not in ["BIG", "LITTLE", "AUTO"])): + result = bcsv_raw_error_str + "function parameters" + print(result) + return result + + # make global variables editable + global f + global bcsv_raw_info + + # "pre read" the file + result_str = "" + if (endian == "BIG"): + result_str = check_bcsv_file(filepath_or_stream, ">") + print("big endian: %s" % (result_str)) + elif (endian == "LITTLE"): + result_str = check_bcsv_file(filepath_or_stream, "<") + print("little endian: %s" % (result_str)) + elif (endian == "AUTO"): + result_str = check_bcsv_file(filepath_or_stream, ">") + print("big endian: %s" % (result_str)) + if (big_result_str != bcsv_raw_error_str + "all good"): + result_str = check_bcsv_file(filepath_or_stream, "<") + print("little endian: %s" % (result_str)) + + # failure trying to identify the BCSV table + if ("all good" not in result_str): + return None + + # get the BCSV useful data out of that prison + # ~ print(bcsv_raw_info) + bcsv_table_info = smg_bcsv_table() + + # row and col count + bcsv_table_info.row_count = bcsv_raw_info.header.row_count + bcsv_table_info.col_count = bcsv_raw_info.header.col_count + # get the hash names/hex string (if known) + # and the column properties + for i in range(bcsv_table_info.col_count): + string = get_hash_string(bcsv_raw_info.columns_info[i].name_hash) + bcsv_table_info.cols_info.append(smg_bcsv_table.cols_info()) + bcsv_table_info.cols_info[-1].name_or_hash = string + bcsv_table_info.cols_info[-1].bitmask = bcsv_raw_info.columns_info[i].data_bitmask + bcsv_table_info.cols_info[-1].rshift = 
bcsv_raw_info.columns_info[i].data_rshift + bcsv_table_info.cols_info[-1].type = get_type_string(bcsv_raw_info.columns_info[i].data_type) + + # assign the row slots + for i in range(bcsv_table_info.row_count): + bcsv_table_info.rows_data.append([]) + + # get all the cell items + # iterate over the columns then the rows + # each column at a time + endian_ch = ">" if (bcsv_raw_info.endian == "BIG") else "<" + for i in range(bcsv_table_info.col_count): + # get the type, offset, endian + base_offset = bcsv_raw_info.columns_info[i].data_offset + data_type = bcsv_table_info.cols_info[i].type + + for j in range(bcsv_table_info.row_count): + value_offset = base_offset + (j * bcsv_raw_info.header.row_data_length) + # grab the specific datatype + value = None + # crazy this but I will do the bitmask and right shift even with a float + # treat integer variables as signed, it is actually a bit more readable + if (data_type == "LONG" or data_type == "LONG_2" or data_type == "STRING_OFFSET" or data_type == "FLOAT"): + value = struct.unpack(endian_ch + "I", bcsv_raw_info.data_pool[value_offset : value_offset + 4])[0] + value = (value & bcsv_table_info.cols_info[i].bitmask) >> bcsv_table_info.cols_info[i].rshift + if (data_type == "LONG" or data_type == "LONG_2"): + value = struct.unpack(">i", struct.pack(">I", value))[0] + elif (data_type == "STRING"): + value = bcsv_raw_info.data_pool[value_offset : value_offset + 32].decode("cp932").replace("\0", "") + elif (data_type == "SHORT"): + value = struct.unpack(endian_ch + "H", bcsv_raw_info.data_pool[value_offset : value_offset + 2])[0] + value = (value & bcsv_table_info.cols_info[i].bitmask) >> bcsv_table_info.cols_info[i].rshift + value = struct.unpack(">h", struct.pack(">H", value))[0] + elif (data_type == "CHAR"): + value = struct.unpack(endian_ch + "B", bcsv_raw_info.data_pool[value_offset : value_offset + 1])[0] + value = (value & bcsv_table_info.cols_info[i].bitmask) >> bcsv_table_info.cols_info[i].rshift + value = 
struct.unpack(">b", struct.pack(">B", value))[0] + + # check if the data type was a string offset or a float + if (data_type == "FLOAT"): + value = struct.unpack(">f", struct.pack(">I", value))[0] + elif (data_type == "STRING_OFFSET"): + string_offset = value + string_length = 0 + while (bcsv_raw_info.string_pool[string_offset + string_length] != 0): + string_length += 1 + value = bcsv_raw_info.string_pool[string_offset : string_offset + string_length].decode("cp932") + + # assign the value + bcsv_table_info.rows_data[j].append(value) + + f.close() + # ~ print(bcsv_table_info) + return bcsv_table_info + +# function to check a BCSV file before getting its full information out +def check_bcsv_file(filepath_or_stream, endian_ch): + # check its size first + file_size = file_ops.get_file_size(filepath_or_stream) + if (file_size <= 16): + return bcsv_raw_error_str + "file size - header" + + # make global variables editable + global f + global bcsv_raw_info + + # open the file if it is a filepath + if (type(filepath_or_stream) == str): + f = open(filepath_or_stream, "rb") + else: + f = filepath_or_stream + f.seek(0) + + # holder for variables + bcsv_raw_info = smg_bcsv_raw(); + + # header + + # row count, col count, row data offset, row data length + bcsv_raw_info.header.row_count = struct.unpack(endian_ch + "I", f.read(4))[0] + bcsv_raw_info.header.col_count = struct.unpack(endian_ch + "I", f.read(4))[0] + bcsv_raw_info.header.data_pool_offset = struct.unpack(endian_ch + "I", f.read(4))[0] + bcsv_raw_info.header.row_data_length = struct.unpack(endian_ch + "I", f.read(4))[0] + # row_count can be 0 + # StageData/AsteroidBlockZone.arc/stage/jmp/childobj/common/childobjinfo + if (bcsv_raw_info.header.col_count == 0): + return bcsv_raw_error_str + "col count" + # data pool offset will be read and used by the game, idk if not 4 byte aligments will work + # I know that the game crashes when reading a float not 4 byte aligned but it is + # better to keep the 4 byte alignment + 
if (bcsv_raw_info.header.row_count != 0): + if ((bcsv_raw_info.header.data_pool_offset >= file_size) + or (bcsv_raw_info.header.data_pool_offset % 4 != 0)): + return bcsv_raw_error_str + "row data offset" + else: # BCSVs with 0 rows + if (bcsv_raw_info.header.data_pool_offset % 4 != 0): + return bcsv_raw_error_str + "row data offset" + + # "several data cells can reference data on the size byte field" + # so I can't expect row_data_length to be something related to col_count + if (bcsv_raw_info.header.row_data_length == 0): + return bcsv_raw_error_str + "row data length" + + # check file size again + + # considering the column data info + if (bcsv_raw_info.header.row_count != 0): + if (file_size <= (16 + bcsv_raw_info.header.col_count * 12)): + return bcsv_raw_error_str + "file size - row/col count" + else: + if (file_size < (16 + bcsv_raw_info.header.col_count * 12)): + return bcsv_raw_error_str + "file size - row/col count" + # considering the data pool offset + data row length + if (bcsv_raw_info.header.row_count != 0): + if (file_size + < (bcsv_raw_info.header.data_pool_offset + + (bcsv_raw_info.header.row_count * bcsv_raw_info.header.row_data_length))): + return bcsv_raw_error_str + "file size - data pool offset/row count/row data length" + + # column data info + + # read each column data info + max_data_pool_size = 0 + for i in range(bcsv_raw_info.header.col_count): + bcsv_raw_info.columns_info.append(bcsv_raw_info.col_cells_data_info()) + # check offset and data type + bcsv_raw_info.columns_info[-1].name_hash = struct.unpack(endian_ch + "I", f.read(4))[0] + bcsv_raw_info.columns_info[-1].data_bitmask = struct.unpack(endian_ch + "I", f.read(4))[0] + bcsv_raw_info.columns_info[-1].data_offset = struct.unpack(endian_ch + "H", f.read(2))[0] + bcsv_raw_info.columns_info[-1].data_rshift = struct.unpack(endian_ch + "B", f.read(1))[0] + bcsv_raw_info.columns_info[-1].data_type = struct.unpack(endian_ch + "B", f.read(1))[0] + + # check data type + if 
(bcsv_raw_info.columns_info[-1].data_type > 6): + return bcsv_raw_error_str + "data type" + # check offset (BCSVs without rows are valid) + if ((bcsv_raw_info.header.row_count != 0) + and + (file_size < (bcsv_raw_info.header.data_pool_offset + + bcsv_raw_info.columns_info[-1].data_offset + + ((bcsv_raw_info.header.row_count - 1) * bcsv_raw_info.header.row_data_length) + + get_data_type_size(bcsv_raw_info.columns_info[-1].data_type)))): + return bcsv_raw_error_str + "data cell offset" + # if it is a float type, check if the float offset is a multiple of 4 + # I think this is the only alignment restriction + if ((bcsv_raw_info.columns_info[-1].data_type == 2) + and (bcsv_raw_info.columns_info[-1].data_offset % 4 != 0)): + return bcsv_raw_error_str + "float value offset" + # get the updated max data pool size + tmp = (bcsv_raw_info.columns_info[-1].data_offset + + ((bcsv_raw_info.header.row_count - 1) * bcsv_raw_info.header.row_data_length) + + get_data_type_size(bcsv_raw_info.columns_info[-1].data_type)) + if (tmp > max_data_pool_size): + max_data_pool_size = tmp + + # interesting, max_data_pool_size does not necessarily match with row_count * row_data_length + # but the last one is the actual data pool length + # StageData/AsteroidBlockZone.arc/stage/jmp/childobj/layerb/childobjinfo + max_data_pool_size = bcsv_raw_info.header.row_count * bcsv_raw_info.header.row_data_length + # there are too much 4 byte alignments, like, too much (for floats) + + # check if the data pool overflows + if (file_size < bcsv_raw_info.header.data_pool_offset + max_data_pool_size): + return bcsv_raw_error_str + "data pool size" + + # check the string offset values to check for overflow + max_string_pool_size = 0 + for cols_info in bcsv_raw_info.columns_info: + for i in range(bcsv_raw_info.header.row_count): # iterate through each row + if (cols_info.data_type == 6): # string offset + # get the offset value from the data pool + f.seek(bcsv_raw_info.header.data_pool_offset + + 
cols_info.data_offset + + (i * bcsv_raw_info.header.row_data_length)) + string_offset = struct.unpack(endian_ch + "I", f.read(4))[0] + # get the string size + f.seek(bcsv_raw_info.header.data_pool_offset + max_data_pool_size + string_offset) + string_size = 1 # count nul character beforehand + tmp_byte = f.read(1) # pre-read the first character + while (tmp_byte != b"\x00"): # strings 0x00 terminated is a must + if (tmp_byte == b""): # end of file reached + return bcsv_raw_error_str + "string offset" + string_size += 1 + tmp_byte = f.read(1) + # update the max string pool size + tmp = string_offset + string_size + if (tmp > max_string_pool_size): + max_string_pool_size = tmp + + # whether there is a data pool there can + # be a string pool floating and unreferenced + # or just unreferenced strings attached to the string pool + f.seek(bcsv_raw_info.header.data_pool_offset + + max_data_pool_size + + max_string_pool_size) + tmp = f.read(1) + while (tmp != b""): + max_string_pool_size += 1 + tmp = f.read(1) + + # get the data pool + f.seek(bcsv_raw_info.header.data_pool_offset) + bcsv_raw_info.data_pool = f.read(max_data_pool_size) + # get the string pool + bcsv_raw_info.string_pool = f.read(max_string_pool_size) + + # check the data pool and the string pool for the string types + # ensure they are CP932 decodable + for i in range(bcsv_raw_info.header.col_count): + for j in range(bcsv_raw_info.header.row_count): + # STRING or STRING_OFFSET types + string_offset = (bcsv_raw_info.columns_info[i].data_offset + + (j * bcsv_raw_info.header.row_data_length)) + string = b"" + if (bcsv_raw_info.columns_info[i].data_type == 1): + string = bcsv_raw_info.data_pool[string_offset : string_offset + 32] + if (string[-1] != 0): # null terminator (it is for safety) + return bcsv_raw_error_str + "string type not null terminated" + elif (bcsv_raw_info.columns_info[i].data_type == 6): + string_offset = struct.unpack(endian_ch + "I", bcsv_raw_info.data_pool[string_offset + : string_offset 
+ 4])[0] + k = 0 + while (bcsv_raw_info.string_pool[string_offset + k] != 0): + string += bcsv_raw_info.string_pool[string_offset + k + : string_offset + k + 1] + k += 1 + # try decoding the string + try: + string.decode("cp932") + except: + return bcsv_raw_error_str + "string encoding" + + # if shit is good so far then the endian choice was probably good! + bcsv_raw_info.endian = "BIG" if (endian_ch == ">") else "LITTLE" + return bcsv_raw_error_str + "all good" + +# check if a smg_bcsv_table structure is good +def check_smg_bcsv_table(table): + + # check if the information in the smg_bcsv_table struct is valid + + # the only stuff I can check is: + # row/column count + # data types must be the known data types + # all the cells in a BCSV table must exist (not sure if this is the case always) + # type checking each of the columns against the type specified + # strings must be CP932 encoded (unsure but will do this anyways) + + # enforce structure types + if (type(table) != smg_bcsv_table): + return bcsv_table_error_str + "smg_bcsv_table struct" + + # row/col count + if (type(table.row_count) != int + or type(table.col_count) != int + # or table.row_count <= 0 # row_count can be 0 + or table.col_count <= 0): + return bcsv_table_error_str + "row/col count" + + # check cols_info + if (table.col_count != len(table.cols_info)): + return bcsv_table_error_str + "cols_info size" + for cols_info in table.cols_info: + # check cols_info struct + if (type(cols_info) != smg_bcsv_table.cols_info): + return bcsv_table_error_str + "cols_info struct" + # name or hash + if (type(cols_info.name_or_hash) != str): + return bcsv_table_error_str + "column name or hash" + try: + cols_info.name_or_hash.encode("cp932") + if (cols_info.name_or_hash.startswith("0x") or cols_info.name_or_hash.startswith("0X")): + number = int(cols_info.name_or_hash, 16) + if (number > 0xFFFFFFFF): + return bcsv_table_error_str + "column name or hash" + except: + return bcsv_table_error_str + "column name or 
hash" + # bitmask + if (type(cols_info.bitmask) != int or cols_info.bitmask < 0): + return bcsv_table_error_str + "column bitmask" + # right shift + if (type(cols_info.rshift) != int or cols_info.rshift < 0): + return bcsv_table_error_str + "column right shift" + # type + if (type(cols_info.type) != str or (cols_info.type in TYPE_INT_TO_STRING) == False): + return bcsv_table_error_str + "column data type" + + # check rows_data and enforce the types + if (type(table.rows_data) != list or len(table.rows_data) != table.row_count): + return bcsv_table_error_str + "rows_data list (row)" + for row in table.rows_data: + if (type(row) != list or len(row) != table.col_count): + return bcsv_table_error_str + "rows_data list (column)" + # actually check the data now + for i in range(table.col_count): + type_to_compare = TYPE_INT_TO_PYTHON_TYPE[TYPE_STRING_TO_INT[table.cols_info[i].type]] + for j in range(table.row_count): + # check the type + if (type(table.rows_data[j][i]) != type_to_compare): + return bcsv_table_error_str + "incorrect cell datatype" + # check string encoding + string_size = 0 + if (type_to_compare == str): + try: + string_size = len(table.rows_data[j][i].encode("cp932")) + except: + return bcsv_table_error_str + "string with incorrect encoding" + # if it is the STRING type, check if its encoded representation can fit in 32 bytes + # include the null terminator, although you could have out of bounds strings if you want (I think) + if (TYPE_STRING_TO_INT[table.cols_info[i].type] == 1 and string_size >= 32): + return bcsv_table_error_str + "STRING type overflow" + + # all is good (hopefully) + return bcsv_table_error_str + "all good" + +# create smg_bcsv_raw from smg_bcsv_table +# will only attempt to "compress data" into byte fields on "non-standard" bitmask/rshift values +# this "compression" will be only done on consecutive data cells +def create_smg_bcsv_raw(table, endian_ch, use_std_pad_size): + + # calls check_smg_bcsv_table() + result = 
check_smg_bcsv_table(table) + print(result) + if (result != bcsv_table_error_str + "all good"): + return None + + # build a new raw structure and return it + raw = smg_bcsv_raw() + raw.endian = "BIG" + if (endian_ch == "<"): + raw.endian = "LITTLE" + + # assign the easy variables + raw.header.row_count = table.row_count + raw.header.col_count = table.col_count + raw.header.data_pool_offset = 16 + table.col_count * 12 + # ^ lame calculation, this offset can be + # different and the game will read and use it >:] + + # calculate row_data_length while filling the column data + raw.header.row_data_length = 0 + i = 0 + # iterate over the column data + # do not enforce "field order" for now (because I think it is unnecessary) + # the only enforcement I will add for now is that floats need to be 4 byte aligned + # hopefully the other integer types don't need alignment (to verify experimentally) + accumulated_bitmasks = 0 + while (i < table.col_count): + # generate new column info + raw.columns_info.append(smg_bcsv_raw.col_cells_data_info()) + # name hash + if (table.cols_info[i].name_or_hash.startswith("0x")): + raw.columns_info[-1].name_hash = int(table.cols_info[i].name_or_hash, 16) + else: + raw.columns_info[-1].name_hash = calc_bytes_hash(table.cols_info[i].name_or_hash.encode("cp932")) + # bitmask, data offset, rshift and data type + raw.columns_info[-1].data_bitmask = table.cols_info[i].bitmask + raw.columns_info[-1].data_offset = 0 # to update in the following if-else + raw.columns_info[-1].data_rshift = table.cols_info[i].rshift + raw.columns_info[-1].data_type = TYPE_STRING_TO_INT[table.cols_info[i].type] + + # can be compressed? 
+ # if it uses a weird bitmask, surely, the shift variable is read and used + # all types will be considered except for the "STRING" type and the "FLOAT" type + if ((raw.columns_info[-1].data_bitmask != TYPE_INT_TO_STD_BITMASK[raw.columns_info[-1].data_type]) + and ((accumulated_bitmasks & raw.columns_info[-1].data_bitmask) == 0) + and (i != 0) + and (raw.columns_info[-1].data_type != 1) + and (raw.columns_info[-1].data_type != 2)): + # update the accumulated_bitmasks + accumulated_bitmasks |= raw.columns_info[-1].data_bitmask + # grab the previous column data_offset + raw.columns_info[-1].data_offset = raw.columns_info[-2].data_offset + # do not update raw.header.row_data_length + # pack the data normally + else: + # reset the accumulated bitmask to this exact column bitmask + if (raw.columns_info[-1].data_type == 2): # adjust offset for float + while (raw.header.row_data_length % 4 != 0): + raw.header.row_data_length += 1 + accumulated_bitmasks = raw.columns_info[-1].data_bitmask + raw.columns_info[-1].data_offset = raw.header.row_data_length + raw.header.row_data_length += TYPE_INT_TO_SIZE[raw.columns_info[-1].data_type] + + # increase i for the next loop + i += 1 + + # populate the data pool (use the last column offset to get the length of the data pool) + # bytearray with the save + raw.data_pool = bytearray(raw.columns_info[-1].data_offset + + ((raw.header.row_count - 1) * raw.header.row_data_length) + + TYPE_INT_TO_SIZE[raw.columns_info[-1].data_type]) + + # with the offsets defined, store the data + string_pool_strings_pos = {} + string_pool_offset_pos = 0 + for i in range(table.row_count): + for j in range(table.col_count): + tmp = None + # only for integers + type_ch = None + # LONG or LONG_2 + if (raw.columns_info[j].data_type == 0 or raw.columns_info[j].data_type == 3): type_ch = "I" + # SHORT + elif (raw.columns_info[j].data_type == 4): type_ch = "H" + # CHAR + elif (raw.columns_info[j].data_type == 5): type_ch = "B" + + # LONG, LONG_2, SHORT or CHAR + 
if (type_ch == "I" or type_ch == "H" or type_ch == "B"): + # ~ print((table.rows_data[i][j] << raw.columns_info[j].data_rshift) & raw.columns_info[j].data_bitmask) + tmp = struct.pack(endian_ch + type_ch, + (table.rows_data[i][j] << raw.columns_info[j].data_rshift) & raw.columns_info[j].data_bitmask) + # STRING + elif (raw.columns_info[j].data_type == 1): + tmp = table.rows_data[i][j].encode("cp932") + # FLOAT + elif (raw.columns_info[j].data_type == 2): + tmp = struct.pack(endian_ch + "f", table.rows_data[i][j]) + # STRING_OFFSET + elif (raw.columns_info[j].data_type == 6): + # search if the string is already in the string pool + if (table.rows_data[i][j] in string_pool_strings_pos): + tmp = struct.pack(endian_ch + "I", string_pool_strings_pos[table.rows_data[i][j]]) + else: + encoded_string = table.rows_data[i][j].encode("cp932") + b"\x00" + tmp = struct.pack(endian_ch + "I", string_pool_offset_pos) + raw.string_pool += encoded_string + string_pool_strings_pos.update({table.rows_data[i][j] : string_pool_offset_pos}) + string_pool_offset_pos += len(encoded_string) + + # write the data + for k in range(len(tmp)): + raw.data_pool[raw.columns_info[j].data_offset + (i * raw.header.row_data_length) + k] |= tmp[k] + + # lol bytes() + raw.data_pool = bytes(raw.data_pool) + # append the last padding + pad_size = 4 + if (use_std_pad_size): + pad_size = 32 + tmp_file_size = 16 + (raw.header.col_count * 12) + len(raw.data_pool) + len(raw.string_pool) + while ((tmp_file_size % pad_size) != 0): + raw.string_pool += b"@" + tmp_file_size += 1 + + # done! 
+ print(raw) + return raw + +# write smg_bcsv_raw send a bytes object if filepath == None +def write_smg_bcsv_raw(raw, filepath): + # create the bytes object + data = bytes() + # get endian_ch + endian_ch = ">" + if (raw.endian == "LITTLE"): + endian_ch = "<" + # header + data += struct.pack(endian_ch + "I", raw.header.row_count) + data += struct.pack(endian_ch + "I", raw.header.col_count) + data += struct.pack(endian_ch + "I", raw.header.data_pool_offset) + data += struct.pack(endian_ch + "I", raw.header.row_data_length) + # column info + for i in range(raw.header.col_count): + data += struct.pack(endian_ch + "I", raw.columns_info[i].name_hash) + data += struct.pack(endian_ch + "I", raw.columns_info[i].data_bitmask) + data += struct.pack(endian_ch + "H", raw.columns_info[i].data_offset) + data += struct.pack(endian_ch + "B", raw.columns_info[i].data_rshift) + data += struct.pack(endian_ch + "B", raw.columns_info[i].data_type) + # data pool + data += raw.data_pool + # string pool + data += raw.string_pool + # done! 
+ if (filepath != None): + f = open(file_ops.get_path_str(filepath), "wb") + f.write(data) + f.close() + else: + return data + +# valid table operations +# single operations: +# insert/move/remove a row/col at a certain index +# change a cell value rows_data/cols_info +# change a cols_info[index].type value (can change all values of the respective column) +# +# what a command needs so that it can be executed +# operation / type of element operated / list of values needed for the operation +# +# "INSERT" / "ROW" / [10, [row to insert values]] +# insert a row at index 10 +# +# "INSERT" / "COLUMN" / [7, [col_info to insert values], [column to insert values]] +# insert a column at index 7 +# +# "MOVE" / "COLUMN" / [9, 3] +# move a column from index 9 to index 3 +# +# "REMOVE" / "ROW" / [0, [row to remove values]] +# remove the row at index 0 +# +# "REMOVE" / "COL" / [7, [col_info to remove values], [column to remove values]] +# remove the column at index 7 +# +# "EDIT" / "CELL" / ["cols_info", 3, "bitmask", "FFFF", "ABAB"] +# edit the cell cols_info[3].bitmask value from "FFFF" to "ABAB" +# +# "EDIT" / "CELL" / ["rows_data", 3, 4, "LMAO", "OAML"] +# edit the cell rows_data[3][4] value from "LMAO" to "OAML" +# +# "EDIT" / "CELL" / ["cols_info", 0, "type", "LONG", "STRING", [old column values], [new column values]] +# edit the cell cols_info[0].type value from "LONG" to "STRING" + +COMMAND_LIST = ["INSERT", "MOVE", "REMOVE", "EDIT"] +ELEMENT_TO_OP = ["ROW", "COLUMN", "CELL"] + +# determines if a type is correct for a specific value +def cell_data_is_type(type_string, value): + # check params + if (type_string not in TYPE_INT_TO_STRING): + print("value check: type is not valid") + return False + + # check value + if (type_string in ["LONG", "LONG_2"]): # LONG, LONG_2 + if (type(value) != int or value < -0x7FFFFFFF or value > 0x7FFFFFFF): + print("value check: value is not a LONG/LONG_2 type") + return False + elif (type_string == "SHORT"): # SHORT + if (type(value) != int 
or value < -0x7FFF or value > 0x7FFF): + print("value check: value is not a SHORT type") + return False + elif (type_string == "CHAR"): # CHAR + if (type(value) != int or value < -0x7F or value > 0x7F): + print("value check: value is not a CHAR type") + return False + elif (type_string == "FLOAT"): # FLOAT + if (type(value) != float): + print("value check: value is not a FLOAT type") + return False + elif (type_string == "STRING"): # STRING + if (type(value) != str): + print("value check: value is not a STRING type") + return False + try: + enc = value.encode("cp932") + if (len(enc) >= 32): + print("value check: STRING type encoded representation larger than 32 bytes") + return False + except: + print("value check: STRING type cannot be encoded into CP932") + return False + elif (type_string == "STRING_OFFSET"): # STRING_OFFSET + if (type(value) != str): + print("value check: value is not a STRING_OFFSET type") + return False + try: + enc = value.encode("cp932") + except: + print("value check: STRING_OFFSET type cannot be encoded into CP932") + return False + + # all good + return True + +# determines if a col_info list of values is valid +def check_col_info_values(col_info_values): + # check params + if ((type(col_info_values) != list) + or (len(col_info_values) != 4)): + print("col info check: invalid col info value list") + return False + + # hash or name + if (type(col_info_values[0]) != str): + print("col info check: name or hash is not a string") + return False + try: + col_info_values[0].encode("cp932") + if (col_info_values[0].upper().startswith("0X")): + number = int(col_info_values[0], 16) + if (number > 0xFFFFFFFF): + print("col info check: hash value larger than expected") + return False + except: + print("col info check: name is not CP932 encodable/hash cannot be interpreted as a hex string") + return False + # bitmask + if ((type(col_info_values[1]) != int) or (col_info_values[2] < 0) or (col_info_values[2] > 0xFFFFFFFF)): + print("col info check: 
invalid bitmask value") + return False + # rshift + if ((type(col_info_values[2]) != int) or (col_info_values[2] < 0) or (col_info_values[2] > 0xFF)): + print("col info check: invalid rshift value") + return False + # type + if (col_info_values[3] not in TYPE_INT_TO_STRING): + print("col info check: invalid type string value") + return False + + # all good + return True + +# check a smg bcsv table command +def check_table_cmd(table, operation, element, rest_of_values): + # check the table + if ("all good" not in check_smg_bcsv_table(table)): return False + # check the command, operation and element + if ((operation not in COMMAND_LIST) or (element not in ELEMENT_TO_OP)): return False + # rest_of_values_needed + if (type(rest_of_values) != list): return False + + # insert a row at a specific index with some row values + if (operation == "INSERT" and element == "ROW"): + # rest of values checking + if (len(rest_of_values) != 2): return False + insert_index = rest_of_values[0] + insert_row_values = rest_of_values[1] + if (type(insert_index) != int or insert_index < 0 or insert_index > table.row_count): return False + if (type(insert_row_values) != list or len(insert_row_values) != table.col_count): return False + # check if the elements on the row match the type of the column + for i in range(table.col_count): + if (cell_data_is_type(table.cols_info[i].type, insert_row_values[i]) == False): return False + + # insert a column at a specific index with some col_info values and some column values + elif (operation == "INSERT" and element == "COLUMN"): + # rest of values checking + if (len(rest_of_values) != 3): return False + insert_index = rest_of_values[0] + insert_col_info = rest_of_values[1] + insert_col_values = rest_of_values[2] + if (type(insert_index) != int or insert_index < 0 or insert_index > table.col_count): return False + if (check_col_info_values(insert_col_info) == False): return False + if (type(insert_col_values) != list or len(insert_col_values) != 
table.row_count): return False + # check if the elements on the column match the type of the column + for i in range(table.row_count): + if (cell_data_is_type(insert_col_info[3], insert_col_values[i]) == False): return False + + # remove a row from a specific index, specify the row values to be removed + elif (operation == "REMOVE" and element == "ROW"): + # rest of values checking + if (len(rest_of_values) != 2): return False + remove_index = rest_of_values[0] + remove_row_values = rest_of_values[1] + if (type(remove_index) != int or remove_index < 0 or remove_index >= table.row_count): return False + if (type(remove_row_values) != list or len(remove_row_values) != table.col_count): return False + # check if the elements on the row elements match the type of the column + for i in range(table.col_count): + if (cell_data_is_type(table.cols_info[i].type, remove_row_values[i]) == False): return False + # check if the row to remove values are equal to the values of the actual row going to be removed + if (remove_row_values != table.rows_data[remove_index]): return False + + # remove a column from a specific index, specify the col_info and the column values to be removed + elif (operation == "REMOVE" and element == "COLUMN"): + # rest of values checking + if (len(rest_of_values) != 3): return False + remove_index = rest_of_values[0] + remove_col_info = rest_of_values[1] + remove_col_values = rest_of_values[2] + if (type(remove_index) != int or remove_index < 0 or remove_index >= table.col_count): return False + if (check_col_info_values(remove_col_info) == False): return False + # check if the col_info values to remove are the same as the col_info to remove + if ((remove_col_info[0] != table.cols_info[remove_index].name_or_hash) + or (remove_col_info[1] != table.cols_info[remove_index].bitmask) + or (remove_col_info[2] != table.cols_info[remove_index].rshift) + or (remove_col_info[3] != table.cols_info[remove_index].type)): return False + if (type(remove_col_values) != 
list or len(remove_col_values) != table.row_count): return False + # check if the elements on the column match the type of the column + for i in range(table.row_count): + if (cell_data_is_type(remove_col_info[3], remove_col_values[i]) == False): return False + + # move a row from an index to another index + elif(operation == "MOVE" and element == "ROW"): + # rest of values checking + if (len(rest_of_values) != 2): return False + old_index = rest_of_values[0] + new_index = rest_of_values[1] + if (type(old_index) != int or old_index < 0 or old_index >= table.row_count): return False + if (type(new_index) != int or new_index < 0 or new_index >= table.row_count): return False + + # move a column from an index to another index + elif(operation == "MOVE" and element == "COLUMN"): + # rest of values checking + if (len(rest_of_values) != 2): return False + old_index = rest_of_values[0] + new_index = rest_of_values[1] + if (type(old_index) != int or old_index < 0 or old_index >= table.col_count): return False + if (type(new_index) != int or new_index < 0 or new_index >= table.col_count): return False + + # edit + elif (operation == "EDIT" and element == "CELL"): + # rest_of_values check + if (len(rest_of_values) < 5): return False + data_path = rest_of_values[0] + if (data_path not in ["cols_info", "rows_data"]): return False + # rows_data + if (data_path == "rows_data"): + if (len(rest_of_values) != 5): return False + row_index = rest_of_values[1] + col_index = rest_of_values[2] + if (type(row_index) != int or row_index < 0 or row_index >= table.row_count): return False + if (type(col_index) != int or col_index < 0 or col_index >= table.col_count): return False + old_value = rest_of_values[3] + new_value = rest_of_values[4] + if (old_value != table.row_data[row_index][col_index]): return False + if (cell_data_is_type(table.cols_info[col_index].type, new_value) == False): return False + # cols_info + elif (data_path == "cols_info"): + col_index = rest_of_values[1] + if 
(type(col_index) != int or col_index < 0 or col_index >= table.col_count): return False + inner_data_path = rest_of_values[2] + if (inner_data_path not in ["name_or_hash", "bitmask", "rshift", "type"]): return False + # type + if (inner_data_path == "type"): + if (len(rest_of_values) != 7): return False + old_value = rest_of_values[3] + new_value = rest_of_values[4] + if (old_value != table.cols_info[col_index].type): return False + if (new_value not in TYPE_INT_TO_STRING): return False + # reuse this same function cleverly (recursive lets goooo) + tmp = table.cols_info[col_index] + tmp_col_info = [tmp.name_or_hash, tmp.bitmask, tmp.rshift, tmp.type] + old_column_values = rest_of_values[5] + tmp_rest_of_values = [col_index, tmp_col_info, old_column_values] + if (check_table_cmd("REMOVE", "COLUMN", tmp_rest_of_values) == False): return False + tmp_col_info[3] = new_value + new_column_values = rest_of_values[6] + tmp_rest_of_values[3] = new_column_values + if (check_table_cmd("INSERT", "COLUMN", tmp_rest_of_values) == False): return False + # name_or_hash, bitmask, rshift + else: + if (len(rest_of_values) != 5): return False + # reuse check_col_info_values() + old_value = rest_of_values[3] + new_value = rest_of_values[4] + if (old_value != eval("table.cols_info[col_index].%s" % (inner_data_path))): return False + if (inner_data_path == "name_or_hash"): + if (check_col_info_values([new_value, 0, 0, "LONG"]) == False): return False + elif (inner_data_path == "bitmask"): + if (check_col_info_values(["a", new_value, 0, "LONG"]) == False): return False + elif (inner_data_path == "rshift"): + if (check_col_info_values(["a", 0, new_value, "LONG"]) == False): return False + + # all good + return True + +# execute a table command +def exec_table_cmd(table, operation, element, rest_of_values): + # check the command + if (check_table_cmd(table, operation, element, rest_of_values) == False): + return None + + # execute the operation on the table + if (operation == "INSERT"): + 
insert_index = rest_of_values[0] + # insert a row + if (element == "ROW"): + table.row_count += 1 + insert_list = rest_of_values[1] + # rows_data + table.rows_data = table.rows_data[ : insert_index] + [insert_list] + table.rows_data[insert_index : ] + # insert a column + elif (element == "COLUMN"): + table.col_count += 1 + insert_col_info = smg_bcsv_table.cols_info() + insert_col_info.name_or_hash = rest_of_values[1][0] + insert_col_info.bitmask = rest_of_values[1][1] + insert_col_info.rshift = rest_of_values[1][2] + insert_col_info.type = rest_of_values[1][3] + insert_col_values = rest_of_values[2] + # col_info + table.cols_info = (table.cols_info[ : insert_index] + + [insert_col_info] + + table.cols_info[insert_index : ]) + # rows_data + for i in range(table.row_count): + table.rows_data[i] = (table.rows_data[i][ : insert_index] + + [insert_col_values[i]] + + table.rows_data[i][insert_index : ]) + elif (operation == "MOVE"): + # indexes + old_index = rest_of_values[0] + new_index = rest_of_values[1] + # move a row + if (element == "ROW"): + # rows_data + to_move = table.rows_data[old_index] + table.rows_data = table.rows_data[ : old_index] + table.rows_data[old_index + 1 : ] + table.rows_data = table.rows_data[ : new_index] + [to_move] + table.rows_data[new_index : ] + # move a column + elif (element == "COLUMN"): + # cols_info + col_info_to_move = table.cols_info[old_index] + table.cols_info = table.cols_info[ : old_index] + table.cols_info[old_index + 1 : ] + table.cols_info = table.cols_info[ : new_index] + [col_info_to_move] + table.cols_info[new_index : ] + # row_data + for i in range(table.row_count): + value_to_move = table.rows_data[i][old_index] + table.rows_data[i] = table.rows_data[i][ : old_index] + table.rows_data[i][old_index + 1 : ] + table.rows_data[i] = table.rows_data[i][ : new_index] + [value_to_move] + table.rows_data[i][new_index : ] + elif (operation == "REMOVE"): + remove_index = rest_of_values[0] + # remove a row + if (element == "ROW"): + 
table.row_count -= 1 + # rows_data + table.rows_data = table.rows_data[ : remove_index] + table.rows_data[remove_index + 1 : ] + # remove a column + elif (element == "COLUMN"): + table.col_count -= 1 + # cols_info + table.cols_info = table.cols_info[ : remove_index] + table.cols_info[remove_index + 1: ] + # row_data + for i in range(table.row_count): + table.rows_data[i] = table.rows_data[i][ : remove_index] + table.rows_data[i][remove_index + 1 : ] + elif (operation == "EDIT"): + # edit a cell + if (element == "CELL"): + data_path = rest_of_values[0] + # rows_data cell + if (data_path == "rows_data"): + row_index = rest_of_values[1] + col_index = rest_of_values[2] + table.rows_data[row_index][col_index] = rest_of_values[4] + # cols_info cell + elif (data_path == "cols_info"): + col_index = rest_of_values[1] + inner_data_path = rest_of_values[2] + # type + if (inner_data_path == "type"): + table.cols_info[col_index].type = rest_of_values[4] + for i in range(table.row_count): + table.rows_data[i][col_index] = rest_of_values[6][i] + # name_or_hash, bitmask, rshift + else: + if (inner_data_path == "name_or_hash"): table.cols_info[col_index].name_or_hash = rest_of_values[4] + elif (inner_data_path == "bitmask"): table.cols_info[col_index].bitmask = rest_of_values[4] + elif (inner_data_path == "rshift"): table.cols_info[col_index].rshift = rest_of_values[4] + + # all good, return the command list + return [operation, element, rest_of_values] + +# assign a table reference values to another table reference +def assign_table_values(src, dest): + # src must be valid + if ("all good" not in check_smg_bcsv_table(src) + or type(dest) != smg_bcsv_table + or type(dest.cols_info) != list + or type(dest.rows_data) != list): + return False + + # assign the values + dest.row_count = src.row_count + dest.col_count = src.col_count + dest.cols_info.clear() + dest.rows_data.clear() + for i in range(src.col_count): + dest.cols_info.append(copy.deepcopy(src.cols_info[i])) + for i in 
range(src.row_count): + dest.rows_data.append(src.rows_data[i].copy()) + + # done! + return True -- cgit v1.2.3-70-g09d2