import struct, math, re, io, copy
from . import file_ops

# python file to read the important information out of a BCSV file
# will try its best to decode the information either on big/little endian
# format reference: https://humming-owl.neocities.org/smg-stuff/pages/tutorials/bcsv
#
# what this file will do is the following:
# check_bcsv_file() takes the first look into the BCSV and it checks if the file is correct.
# On the way it assigns all the variables to bcsv_raw_info (smg_bcsv_raw struct).
# If the file is correct, then read_bcsv_file() will assign the actually useful variables
# to a smg_bcsv_table structure and return that structure
#
# in case of any error check_bcsv_file() returns a string that can
# be read by a human to identify what it is wrong with the BCSV file
# if all is good it will return exactly that (as a string)

# parallel tables indexed by the BCSV data-type integer (0-6)
TYPE_INT_TO_STRING = ["LONG", "STRING", "FLOAT", "LONG_2", "SHORT", "CHAR", "STRING_OFFSET"]
TYPE_STRING_TO_INT = {"LONG": 0, "STRING": 1, "FLOAT": 2, "LONG_2": 3, "SHORT": 4, "CHAR": 5, "STRING_OFFSET": 6}
TYPE_INT = [0, 1, 2, 3, 4, 5, 6]
# byte size each type occupies inside a row of the data pool
TYPE_INT_TO_SIZE = [4, 32, 4, 4, 2, 1, 4]
# "standard" (whole-value) bitmask per type; anything else means the
# column shares a byte field with neighbors (see create_smg_bcsv_raw)
TYPE_INT_TO_STD_BITMASK = [0xFFFFFFFF, 0x0, 0xFFFFFFFF, 0xFFFFFFFF, 0xFFFF, 0xFF, 0xFFFFFFFF]
# ^ STRING type is told to be "deprecated" and
# I don't know a test BCSV file in SMG that has the type
TYPE_INT_TO_STRUCT_CH = ["I", "32s", "f", "I", "H", "B", "I"]
# ^ for python struct.unpack/pack funcs
TYPE_INT_TO_PYTHON_TYPE = [int, str, float, int, int, int, str]
# ^ last one is actually an integer (an offset into the string pool) but this
# list is to map the types of a smg_bcsv_table

# path to the hash file (known column-name hashes live here)
hashes_path = file_ops.get_base_path(__file__, True) + "/bcsv_hashes.txt"

# get the byte size of a data type (None if type_int is not a valid type)
def get_data_type_size(type_int):
    if (type_int in TYPE_INT):
        return TYPE_INT_TO_SIZE[type_int]
    return None

# get the python struct char to read the data type (None if invalid)
def get_struct_read_char(type_int):
    if (type_int in TYPE_INT):
        return TYPE_INT_TO_STRUCT_CH[type_int]
    return None

# calculate the BCSV hash of a byte string
def calc_bytes_hash(bytes_array):
    """Calculate the BCSV hash of a byte string.

    The hash is the classic 31-based rolling hash (same scheme as Java's
    String.hashCode) computed over signed bytes, masked to 32 bits.
    A trailing null terminator is ignored. Returns None when the input
    is not a bytes object or is not CP932-decodable.
    """
    # check
    if (type(bytes_array) != bytes):
        return None
    try:
        # BCSV strings are CP932 encoded
        # I am not certain about this but I will assume it for now
        bytes_array.decode("cp932")
    except:
        return None
    # FIX: empty input used to raise IndexError on bytes_array[-1];
    # the hash of an empty string is 0
    if (len(bytes_array) == 0):
        return 0
    # return the hash
    result = 0
    string_byte_size = len(bytes_array) - 1
    if (bytes_array[-1] == 0):
        # skip the null terminator
        string_byte_size -= 1
    for i in range(string_byte_size + 1):
        # signed byte, as the game does
        result = struct.unpack(">b", bytes_array[i : i + 1])[0] + (31 * result)
    return 0xFFFFFFFF & result

# add the new hash to the bcsv_hashes.txt file
def add_new_known_hash(bytes_array):
    """Insert a new known column-name string into bcsv_hashes.txt.

    Keeps the file sorted ("string 0xHASH" per line, comment lines first).
    Returns True on success or a human-readable error string on failure.
    """
    # check if it is a CP932 encoded string
    str_hash = calc_bytes_hash(bytes_array)
    if (str_hash == None):
        return "Not a CP932 decodable string."
    # get all the previous hashes
    try:
        f = open(hashes_path, "r+", encoding = "cp932")
    except:
        return "\"bcsv_hashes.txt\" is not a CP932 encoded file."
    string_list = []
    first_non_comment_line_pos = 0
    for line in f.readlines():
        # avoid comments
        if (line.startswith("#") == False):
            # append the string (everything up to the last space)
            string_list.append(re.search("^.* ", line).group()[:-1])
        else:
            # comment block is assumed to sit at the top of the file;
            # remember where it ends (byte offset, CP932 encoded)
            first_non_comment_line_pos += len(line.encode("cp932"))
    # append the new string and sort string_list
    # only if the string is not already present in the list
    if ((bytes_array.decode("cp932") in string_list) == False):
        string_list.append(bytes_array.decode("cp932"))
        string_list.sort()
    # generate all the hashes again
    hash_list = []
    for string in string_list:
        hash_list.append("0x%08X" % (calc_bytes_hash(string.encode("cp932"))))
    # from the first non comment line, delete everything
    # and start filling the hash list again -> string space hash
    f.truncate(first_non_comment_line_pos)
    f.seek(first_non_comment_line_pos)
    for i in range(len(string_list)):
        f.write(string_list[i])
        f.write(" ")
        f.write(hash_list[i])
        f.write("\n")
    # done!
    f.close()
    return True

# return the string related to a hash name
# I hope there are no name collisions!
def get_hash_string(hash_value):
    """Return the known column-name string for a hash value.

    Looks the hash up in bcsv_hashes.txt; falls back to the hash
    formatted as "0xXXXXXXXX" when no known name matches.
    """
    rtn = "0x%08X" % (hash_value)
    f = open(hashes_path, "r", encoding = "cp932")
    for line in f.readlines():
        # FIX: skip comment and blank lines BEFORE splitting;
        # line.split()[0] used to raise IndexError on a blank line
        if (line.startswith("#")):
            continue
        parts = line.split()
        if (len(parts) == 0):
            continue
        name = parts[0]
        if (calc_bytes_hash(name.encode("cp932")) == hash_value):
            rtn = name
            break
    f.close()
    return rtn

# return the type as a string (None if type_int is not a valid type)
def get_type_string(type_int):
    if (type_int in TYPE_INT):
        return TYPE_INT_TO_STRING[type_int]
    return None

# return the type as a python type (None if type_int is not a valid type)
def get_type_python_type(type_int):
    if (type_int in TYPE_INT):
        return TYPE_INT_TO_PYTHON_TYPE[type_int]
    return None

# all the raw variables on a BCSV file
class smg_bcsv_raw:
    def __init__(self):
        self.endian = None          # "BIG" or "LITTLE" once identified
        self.header = self.header() # instance of the nested header class
        self.columns_info = []      # list of col_cells_data_info
        self.data_pool = bytes()    # raw row data
        self.string_pool = bytes()  # raw STRING_OFFSET target strings

    def __str__(self):
        rtn = "### SMG_BCSV_RAW - START\n"
        rtn += "Endian: %s\n" % (self.endian)
        rtn += self.header.__str__()
        rtn += "Column data info: hash, bitmask, offset, right-shift, type\n"
        for i in range(len(self.columns_info)):
            rtn += " Col[%s]: " % (i.__str__())
            rtn += self.columns_info[i].__str__()
        rtn += "Data pool (size = %s):\n " % (len(self.data_pool).__str__())
        for i in range(len(self.data_pool)):
            if (i % 16 == 0 and i != 0):
                rtn += "\n "
            rtn += " %02X" % self.data_pool[i]
        rtn += "\nString pool (size = %s):\n " % (len(self.string_pool).__str__())
        for i in range(len(self.string_pool)):
            if (i % 16 == 0 and i != 0):
                rtn += "\n "
            rtn += " %02X" % self.string_pool[i]
        rtn += "\n### SMG_BCSV_RAW - END\n"
        return rtn

    # header
    class header:
        def __init__(self):
            self.row_count = None
            self.col_count = None
            self.data_pool_offset = None
            self.row_data_length = None

        def __str__(self):
            rtn = "Row count: %s\n" % (self.row_count.__str__())
            rtn += "Column count: %s\n" % (self.col_count.__str__())
            rtn += "Data pool offset (hex): %s\n" % (self.data_pool_offset.__str__())
            rtn += "Row data length (bytes): %s\n" % (self.row_data_length.__str__())
            return rtn

    # cell data info
    class col_cells_data_info:
        def __init__(self):
            self.name_hash = None    # BCSV hash of the column name
            self.data_bitmask = None # mask applied to the raw cell value
            self.data_offset = None  # byte offset of the cell inside a row
            self.data_rshift = None  # right shift applied after masking
            self.data_type = None    # type integer (0-6)

        def __str__(self):
            rtn = "0x%08X, 0x%08X, %s, %s, %s " % (self.name_hash, self.data_bitmask,
                                                   self.data_offset.__str__(),
                                                   self.data_rshift.__str__(),
                                                   self.data_type.__str__())
            # visible type string
            rtn += "(%s)\n" % (get_type_string(self.data_type))
            return rtn

# structure with the data from the BCSV that actually matters
class smg_bcsv_table:
    def __init__(self):
        self.row_count = None
        self.col_count = None
        self.cols_info = []  # see cols_info
        self.rows_data = []  # all the other data (list of rows, row = list of cells)

    def __str__(self):
        rtn = "### SMG_BCSV_TABLE - START\n"
        rtn += "Row count: %s\n" % (self.row_count)
        rtn += "Column count: %s\n" % (self.col_count)
        rtn += "Columns info: hash or name, bitmask, right-shift, type\n"
        for i in range(len(self.cols_info)):
            rtn += " Col[%d]: " % (i)
            rtn += "%s" % (self.cols_info[i])
        rtn += "Row data:\n"
        for i in range(len(self.rows_data)):
            rtn += " Row[%s]:" % (i.__str__())
            for data in self.rows_data[i]:
                rtn += " %s," % (data.__str__())
            rtn = rtn[: -1] + "\n"
        rtn += "### SMG_BCSV_TABLE - END\n"
        return rtn

    # column info struct
    class cols_info:
        # keep the same column info
        def __init__(self):
            self.name_or_hash = None  # if hash is known as its name if not, the hash as a hex string
            self.bitmask = None
            self.rshift = None
            self.type = None  # as a string bruh

        def __str__(self):
            rtn = "%s, 0x%08X, %d, %s\n" % (self.name_or_hash, self.bitmask,
                                            self.rshift, self.type)
            return rtn

# create a global variable to hold temporal information
bcsv_raw_info = None
bcsv_raw_error_str = "bcsv-raw-error: "
bcsv_table_error_str = "bcsv-table-error: "
f = None

# main function, will read and will check while reading
# as BCSVs don't have magic, will have to check if it is well formatted
# in big endian and if it is not it will try to check if it is good in
# little endian, if both checks fail the file is bad (or I have a reading skill issue)
# main entry point: check the file (auto-detecting endian if asked) and
# convert the raw data into a friendly smg_bcsv_table
def read_bcsv_file(filepath_or_stream, endian):
    """Read a BCSV file (path or BytesIO) into a smg_bcsv_table.

    endian is "BIG", "LITTLE" or "AUTO" (try big, then little).
    Returns the table, None when the file fails validation, or an
    error string for bad parameters.
    """
    # check params
    if (((type(filepath_or_stream) != io.BytesIO)
         and (type(filepath_or_stream) != str))
            or (endian not in ["BIG", "LITTLE", "AUTO"])):
        result = bcsv_raw_error_str + "function parameters"
        print(result)
        return result
    # make global variables editable
    global f
    global bcsv_raw_info
    # "pre read" the file
    result_str = ""
    if (endian == "BIG"):
        result_str = check_bcsv_file(filepath_or_stream, ">")
        print("big endian: %s" % (result_str))
    elif (endian == "LITTLE"):
        result_str = check_bcsv_file(filepath_or_stream, "<")
        print("little endian: %s" % (result_str))
    elif (endian == "AUTO"):
        result_str = check_bcsv_file(filepath_or_stream, ">")
        print("big endian: %s" % (result_str))
        # FIX: this used to reference an undefined name (big_result_str),
        # raising NameError on every AUTO read
        if (result_str != bcsv_raw_error_str + "all good"):
            result_str = check_bcsv_file(filepath_or_stream, "<")
            print("little endian: %s" % (result_str))
    # failure trying to identify the BCSV table
    if ("all good" not in result_str):
        return None
    # get the BCSV useful data out of that prison
    bcsv_table_info = smg_bcsv_table()
    # row and col count
    bcsv_table_info.row_count = bcsv_raw_info.header.row_count
    bcsv_table_info.col_count = bcsv_raw_info.header.col_count
    # get the hash names/hex string (if known) and the column properties
    for i in range(bcsv_table_info.col_count):
        string = get_hash_string(bcsv_raw_info.columns_info[i].name_hash)
        bcsv_table_info.cols_info.append(smg_bcsv_table.cols_info())
        bcsv_table_info.cols_info[-1].name_or_hash = string
        bcsv_table_info.cols_info[-1].bitmask = bcsv_raw_info.columns_info[i].data_bitmask
        bcsv_table_info.cols_info[-1].rshift = bcsv_raw_info.columns_info[i].data_rshift
        bcsv_table_info.cols_info[-1].type = get_type_string(bcsv_raw_info.columns_info[i].data_type)
    # assign the row slots
    for i in range(bcsv_table_info.row_count):
        bcsv_table_info.rows_data.append([])
    # get all the cell items
    # iterate over the columns then the rows, each column at a time
    endian_ch = ">" if (bcsv_raw_info.endian == "BIG") else "<"
    for i in range(bcsv_table_info.col_count):
        # get the type, offset, endian
        base_offset = bcsv_raw_info.columns_info[i].data_offset
        data_type = bcsv_table_info.cols_info[i].type
        for j in range(bcsv_table_info.row_count):
            value_offset = base_offset + (j * bcsv_raw_info.header.row_data_length)
            # grab the specific datatype
            value = None
            # bitmask/bitshift wont be done on STRING/STRING_OFFSET/FLOAT types (it would be weird)
            # treat integer variables as signed. It is actually a bit more readable
            if (data_type in ["LONG", "LONG_2"]):
                value = struct.unpack(endian_ch + "I", bcsv_raw_info.data_pool[value_offset : value_offset + 4])[0]
                value = (value & bcsv_table_info.cols_info[i].bitmask) >> bcsv_table_info.cols_info[i].rshift
                value = struct.unpack(">i", struct.pack(">I", value))[0]
            elif (data_type == "SHORT"):
                value = struct.unpack(endian_ch + "H", bcsv_raw_info.data_pool[value_offset : value_offset + 2])[0]
                value = (value & bcsv_table_info.cols_info[i].bitmask) >> bcsv_table_info.cols_info[i].rshift
                value = struct.unpack(">h", struct.pack(">H", value))[0]
            elif (data_type == "CHAR"):
                value = struct.unpack(endian_ch + "B", bcsv_raw_info.data_pool[value_offset : value_offset + 1])[0]
                value = (value & bcsv_table_info.cols_info[i].bitmask) >> bcsv_table_info.cols_info[i].rshift
                value = struct.unpack(">b", struct.pack(">B", value))[0]
            elif (data_type == "FLOAT"):
                value = struct.unpack(endian_ch + "f", bcsv_raw_info.data_pool[value_offset : value_offset + 4])[0]
            elif (data_type == "STRING_OFFSET"):
                value = struct.unpack(endian_ch + "I", bcsv_raw_info.data_pool[value_offset : value_offset + 4])[0]
            elif (data_type == "STRING"):
                value = bcsv_raw_info.data_pool[value_offset : value_offset + 32].decode("cp932").replace("\0", "")
            # check if the data type was a string offset; if so, resolve the
            # offset into the actual null-terminated string from the string pool
            if (data_type == "STRING_OFFSET"):
                string_offset = value
                string_length = 0
                while (bcsv_raw_info.string_pool[string_offset + string_length] != 0):
                    string_length += 1
                value = bcsv_raw_info.string_pool[string_offset : string_offset + string_length].decode("cp932")
            # assign the value
            bcsv_table_info.rows_data[j].append(value)
    f.close()
    return bcsv_table_info

# function to check a BCSV file before getting its full information out
def check_bcsv_file(filepath_or_stream, endian_ch):
    """Validate a BCSV file assuming the given struct endian char (">"/"<").

    On success fills the module-global bcsv_raw_info and returns
    bcsv_raw_error_str + "all good"; otherwise returns an error string
    describing the first inconsistency found.
    """
    # check its size first (header alone is 16 bytes)
    file_size = file_ops.get_file_size(filepath_or_stream)
    if (file_size <= 16):
        return bcsv_raw_error_str + "file size - header"
    # make global variables editable
    global f
    global bcsv_raw_info
    # open the file if it is a filepath
    if (type(filepath_or_stream) == str):
        f = open(filepath_or_stream, "rb")
    else:
        f = filepath_or_stream
    f.seek(0)
    # holder for variables
    bcsv_raw_info = smg_bcsv_raw()
    # header: row count, col count, row data offset, row data length
    bcsv_raw_info.header.row_count = struct.unpack(endian_ch + "I", f.read(4))[0]
    bcsv_raw_info.header.col_count = struct.unpack(endian_ch + "I", f.read(4))[0]
    bcsv_raw_info.header.data_pool_offset = struct.unpack(endian_ch + "I", f.read(4))[0]
    bcsv_raw_info.header.row_data_length = struct.unpack(endian_ch + "I", f.read(4))[0]
    # row_count can be 0
    # StageData/AsteroidBlockZone.arc/stage/jmp/childobj/common/childobjinfo
    if (bcsv_raw_info.header.col_count == 0):
        return bcsv_raw_error_str + "col count"
    # data pool offset will be read and used by the game, idk if not 4 byte aligments will work
    # I know that the game crashes when reading a float not 4 byte aligned but it is
    # better to keep the 4 byte alignment
    if (bcsv_raw_info.header.row_count != 0):
        if ((bcsv_raw_info.header.data_pool_offset >= file_size)
                or (bcsv_raw_info.header.data_pool_offset % 4 != 0)):
            return bcsv_raw_error_str + "row data offset"
    else:
        # BCSVs with 0 rows
        if (bcsv_raw_info.header.data_pool_offset % 4 != 0):
            return bcsv_raw_error_str + "row data offset"
    # "several data cells can reference data on the size byte field"
    # so I can't expect row_data_length to be something related to col_count
    if (bcsv_raw_info.header.row_data_length == 0):
        return bcsv_raw_error_str + "row data length"
    # check file size again, considering the column data info
    if (bcsv_raw_info.header.row_count != 0):
        if (file_size <= (16 + bcsv_raw_info.header.col_count * 12)):
            return bcsv_raw_error_str + "file size - row/col count"
    else:
        if (file_size < (16 + bcsv_raw_info.header.col_count * 12)):
            return bcsv_raw_error_str + "file size - row/col count"
    # considering the data pool offset + data row length
    if (bcsv_raw_info.header.row_count != 0):
        if (file_size < (bcsv_raw_info.header.data_pool_offset
                         + (bcsv_raw_info.header.row_count * bcsv_raw_info.header.row_data_length))):
            return bcsv_raw_error_str + "file size - data pool offset/row count/row data length"
    # column data info: read each column data info (12 bytes each)
    max_data_pool_size = 0
    for i in range(bcsv_raw_info.header.col_count):
        bcsv_raw_info.columns_info.append(bcsv_raw_info.col_cells_data_info())
        # check offset and data type
        bcsv_raw_info.columns_info[-1].name_hash = struct.unpack(endian_ch + "I", f.read(4))[0]
        bcsv_raw_info.columns_info[-1].data_bitmask = struct.unpack(endian_ch + "I", f.read(4))[0]
        bcsv_raw_info.columns_info[-1].data_offset = struct.unpack(endian_ch + "H", f.read(2))[0]
        bcsv_raw_info.columns_info[-1].data_rshift = struct.unpack(endian_ch + "B", f.read(1))[0]
        bcsv_raw_info.columns_info[-1].data_type = struct.unpack(endian_ch + "B", f.read(1))[0]
        # check data type
        if (bcsv_raw_info.columns_info[-1].data_type > 6):
            return bcsv_raw_error_str + "data type"
        # check offset (BCSVs without rows are valid)
        if ((bcsv_raw_info.header.row_count != 0)
                and (file_size < (bcsv_raw_info.header.data_pool_offset
                                  + bcsv_raw_info.columns_info[-1].data_offset
                                  + ((bcsv_raw_info.header.row_count - 1) * bcsv_raw_info.header.row_data_length)
                                  + get_data_type_size(bcsv_raw_info.columns_info[-1].data_type)))):
            return bcsv_raw_error_str + "data cell offset"
        # if it is a float type, check if the float offset is a multiple of 4
        # I think this is the only alignment restriction
        if ((bcsv_raw_info.columns_info[-1].data_type == 2)
                and (bcsv_raw_info.columns_info[-1].data_offset % 4 != 0)):
            return bcsv_raw_error_str + "float value offset"
        # get the updated max data pool size
        tmp = (bcsv_raw_info.columns_info[-1].data_offset
               + ((bcsv_raw_info.header.row_count - 1) * bcsv_raw_info.header.row_data_length)
               + get_data_type_size(bcsv_raw_info.columns_info[-1].data_type))
        if (tmp > max_data_pool_size):
            max_data_pool_size = tmp
    # interesting, max_data_pool_size does not necessarily match with row_count * row_data_length
    # but the last one is the actual data pool length
    # StageData/AsteroidBlockZone.arc/stage/jmp/childobj/layerb/childobjinfo
    max_data_pool_size = bcsv_raw_info.header.row_count * bcsv_raw_info.header.row_data_length
    # check if the data pool overflows
    if (file_size < bcsv_raw_info.header.data_pool_offset + max_data_pool_size):
        return bcsv_raw_error_str + "data pool size"
    # check the string offset values to check for overflow
    max_string_pool_size = 0
    for cols_info in bcsv_raw_info.columns_info:
        for i in range(bcsv_raw_info.header.row_count):  # iterate through each row
            if (cols_info.data_type == 6):  # string offset
                # get the offset value from the data pool
                f.seek(bcsv_raw_info.header.data_pool_offset + cols_info.data_offset
                       + (i * bcsv_raw_info.header.row_data_length))
                string_offset = struct.unpack(endian_ch + "I", f.read(4))[0]
                # get the string size
                f.seek(bcsv_raw_info.header.data_pool_offset + max_data_pool_size + string_offset)
                string_size = 1  # count nul character beforehand
                tmp_byte = f.read(1)  # pre-read the first character
                while (tmp_byte != b"\x00"):  # strings 0x00 terminated is a must
                    if (tmp_byte == b""):  # end of file reached
                        return bcsv_raw_error_str + "string offset"
                    string_size += 1
                    tmp_byte = f.read(1)
                # update the max string pool size
                tmp = string_offset + string_size
                if (tmp > max_string_pool_size):
                    max_string_pool_size = tmp
    # whether there is a data pool there can be a string pool floating and
    # unreferenced, or just unreferenced strings attached to the string pool
    f.seek(bcsv_raw_info.header.data_pool_offset + max_data_pool_size + max_string_pool_size)
    tmp = f.read(1)
    while (tmp != b""):
        max_string_pool_size += 1
        tmp = f.read(1)
    # get the data pool
    f.seek(bcsv_raw_info.header.data_pool_offset)
    bcsv_raw_info.data_pool = f.read(max_data_pool_size)
    # get the string pool
    bcsv_raw_info.string_pool = f.read(max_string_pool_size)
    # check the data pool and the string pool for the string types
    # ensure they are CP932 decodable
    for i in range(bcsv_raw_info.header.col_count):
        for j in range(bcsv_raw_info.header.row_count):
            # STRING or STRING_OFFSET types
            string_offset = (bcsv_raw_info.columns_info[i].data_offset
                             + (j * bcsv_raw_info.header.row_data_length))
            string = b""
            if (bcsv_raw_info.columns_info[i].data_type == 1):
                string = bcsv_raw_info.data_pool[string_offset : string_offset + 32]
                if (string[-1] != 0):  # null terminator (it is for safety)
                    return bcsv_raw_error_str + "string type not null terminated"
            elif (bcsv_raw_info.columns_info[i].data_type == 6):
                string_offset = struct.unpack(endian_ch + "I",
                                              bcsv_raw_info.data_pool[string_offset : string_offset + 4])[0]
                k = 0
                while (bcsv_raw_info.string_pool[string_offset + k] != 0):
                    string += bcsv_raw_info.string_pool[string_offset + k : string_offset + k + 1]
                    k += 1
            # try decoding the string
            try:
                string.decode("cp932")
            except:
                return bcsv_raw_error_str + "string encoding"
    # if shit is good so far then the endian choice was probably good!
    bcsv_raw_info.endian = "BIG" if (endian_ch == ">") else "LITTLE"
    return bcsv_raw_error_str + "all good"

# check if a smg_bcsv_table structure is good
def check_smg_bcsv_table(table):
    """Validate a smg_bcsv_table struct (counts, column info, cell types).

    Returns bcsv_table_error_str + "all good" on success, or an error
    string naming the first invalid field found.
    """
    # enforce structure types
    if (type(table) != smg_bcsv_table):
        return bcsv_table_error_str + "smg_bcsv_table struct"
    # row/col count
    if (type(table.row_count) != int or type(table.col_count) != int
            # or table.row_count <= 0 # row_count can be 0
            or table.col_count <= 0):
        return bcsv_table_error_str + "row/col count"
    # check cols_info
    if (table.col_count != len(table.cols_info)):
        return bcsv_table_error_str + "cols_info size"
    for cols_info in table.cols_info:
        # check cols_info struct
        if (type(cols_info) != smg_bcsv_table.cols_info):
            return bcsv_table_error_str + "cols_info struct"
        # name or hash
        if (type(cols_info.name_or_hash) != str):
            return bcsv_table_error_str + "column name or hash"
        try:
            cols_info.name_or_hash.encode("cp932")
            if (cols_info.name_or_hash.startswith("0x") or cols_info.name_or_hash.startswith("0X")):
                number = int(cols_info.name_or_hash, 16)
                if (number > 0xFFFFFFFF):
                    return bcsv_table_error_str + "column name or hash"
        except:
            return bcsv_table_error_str + "column name or hash"
        # bitmask
        if (type(cols_info.bitmask) != int or cols_info.bitmask < 0):
            return bcsv_table_error_str + "column bitmask"
        # right shift
        if (type(cols_info.rshift) != int or cols_info.rshift < 0):
            return bcsv_table_error_str + "column right shift"
        # type
        if (type(cols_info.type) != str
                or (cols_info.type in TYPE_INT_TO_STRING) == False):
            return bcsv_table_error_str + "column data type"
    # check rows_data and enforce the types
    if (type(table.rows_data) != list or len(table.rows_data) != table.row_count):
        return bcsv_table_error_str + "rows_data list (row)"
    for row in table.rows_data:
        if (type(row) != list or len(row) != table.col_count):
            return bcsv_table_error_str + "rows_data list (column)"
    # actually check the data now
    for i in range(table.col_count):
        type_to_compare = TYPE_INT_TO_PYTHON_TYPE[TYPE_STRING_TO_INT[table.cols_info[i].type]]
        for j in range(table.row_count):
            # check the type
            if (type(table.rows_data[j][i]) != type_to_compare):
                return bcsv_table_error_str + "incorrect cell datatype"
            # check string encoding
            string_size = 0
            if (type_to_compare == str):
                try:
                    string_size = len(table.rows_data[j][i].encode("cp932"))
                except:
                    return bcsv_table_error_str + "string with incorrect encoding"
            # if it is the STRING type, check if its encoded representation can fit in 32 bytes
            # include the null terminator, although you could have out of bounds strings if you want (I think)
            if (TYPE_STRING_TO_INT[table.cols_info[i].type] == 1 and string_size >= 32):
                return bcsv_table_error_str + "STRING type overflow"
    # all is good (hopefully)
    return bcsv_table_error_str + "all good"

# create smg_bcsv_raw from smg_bcsv_table
# will only attempt to "compress data" into byte fields on "non-standard" bitmask/rshift values
# this "compression" will be only done on consecutive data cells
def create_smg_bcsv_raw(table, endian_ch, use_std_pad_size):
    """Build a smg_bcsv_raw from a validated smg_bcsv_table.

    endian_ch is ">" or "<"; use_std_pad_size pads the file to 32 bytes
    (else 4). Returns None when the table fails validation.
    """
    # calls check_smg_bcsv_table()
    result = check_smg_bcsv_table(table)
    print(result)
    if (result != bcsv_table_error_str + "all good"):
        return None
    # build a new raw structure and return it
    raw = smg_bcsv_raw()
    raw.endian = "BIG"
    if (endian_ch == "<"):
        raw.endian = "LITTLE"
    # assign the easy variables
    raw.header.row_count = table.row_count
    raw.header.col_count = table.col_count
    raw.header.data_pool_offset = 16 + table.col_count * 12
    # ^ lame calculation, this offset can be
    # different and the game will read and use it >:]
    # calculate row_data_length while filling the column data
    raw.header.row_data_length = 0
    i = 0
    # iterate over the column data
    # do not enforce "field order" for now (because I think it is unnecessary)
    # the only enforcement I will add for now is that floats need to be 4 byte aligned
    # hopefully the other integer types don't need alignment (to verify experimentally)
    accumulated_bitmasks = 0
    while (i < table.col_count):
        # generate new column info
        raw.columns_info.append(smg_bcsv_raw.col_cells_data_info())
        # name hash
        if (table.cols_info[i].name_or_hash.startswith("0x")):
            raw.columns_info[-1].name_hash = int(table.cols_info[i].name_or_hash, 16)
        else:
            raw.columns_info[-1].name_hash = calc_bytes_hash(table.cols_info[i].name_or_hash.encode("cp932"))
        # bitmask, data offset, rshift and data type
        raw.columns_info[-1].data_bitmask = table.cols_info[i].bitmask
        raw.columns_info[-1].data_offset = 0  # to update in the following if-else
        raw.columns_info[-1].data_rshift = table.cols_info[i].rshift
        raw.columns_info[-1].data_type = TYPE_STRING_TO_INT[table.cols_info[i].type]
        # can be compressed?
        # if it uses a weird bitmask, surely, the shift variable is read and used
        # all types will be considered except for the "STRING" type and the "FLOAT" type
        if ((raw.columns_info[-1].data_bitmask != TYPE_INT_TO_STD_BITMASK[raw.columns_info[-1].data_type])
                and ((accumulated_bitmasks & raw.columns_info[-1].data_bitmask) == 0)
                and (i != 0)
                and (raw.columns_info[-1].data_type != 1)
                and (raw.columns_info[-1].data_type != 2)):
            # update the accumulated_bitmasks
            accumulated_bitmasks |= raw.columns_info[-1].data_bitmask
            # grab the previous column data_offset
            raw.columns_info[-1].data_offset = raw.columns_info[-2].data_offset
            # do not update raw.header.row_data_length
        # pack the data normally
        else:
            # reset the accumulated bitmask to this exact column bitmask
            if (raw.columns_info[-1].data_type == 2):
                # adjust offset for float (4 byte alignment)
                while (raw.header.row_data_length % 4 != 0):
                    raw.header.row_data_length += 1
            accumulated_bitmasks = raw.columns_info[-1].data_bitmask
            raw.columns_info[-1].data_offset = raw.header.row_data_length
            raw.header.row_data_length += TYPE_INT_TO_SIZE[raw.columns_info[-1].data_type]
        # increase i for the next loop
        i += 1
    # populate the data pool (use the last column offset to get the length of the data pool)
    # FIX: row_count == 0 (explicitly allowed by check_smg_bcsv_table) used to
    # produce a negative bytearray size and raise ValueError
    if (raw.header.row_count == 0):
        raw.data_pool = bytearray(0)
    else:
        raw.data_pool = bytearray(raw.columns_info[-1].data_offset
                                  + ((raw.header.row_count - 1) * raw.header.row_data_length)
                                  + TYPE_INT_TO_SIZE[raw.columns_info[-1].data_type])
    # with the offsets defined, store the data
    string_pool_strings_pos = {}
    string_pool_offset_pos = 0
    for i in range(table.row_count):
        for j in range(table.col_count):
            tmp = None
            type_ch = None  # only for integers
            # LONG or LONG_2
            if (raw.columns_info[j].data_type in [0, 3]):
                type_ch = "I"
            # SHORT
            elif (raw.columns_info[j].data_type == 4):
                type_ch = "H"
            # CHAR
            elif (raw.columns_info[j].data_type == 5):
                type_ch = "B"
            # LONG, LONG_2, SHORT or CHAR
            if (type_ch in ["I", "H", "B"]):
                tmp = struct.pack(endian_ch + type_ch,
                                  (table.rows_data[i][j] << raw.columns_info[j].data_rshift)
                                  & raw.columns_info[j].data_bitmask)
            # STRING
            elif (raw.columns_info[j].data_type == 1):
                tmp = table.rows_data[i][j].encode("cp932")
            # FLOAT
            elif (raw.columns_info[j].data_type == 2):
                tmp = struct.pack(endian_ch + "f", table.rows_data[i][j])
            # STRING_OFFSET
            elif (raw.columns_info[j].data_type == 6):
                # search if the string is already in the string pool
                if (table.rows_data[i][j] in string_pool_strings_pos):
                    tmp = struct.pack(endian_ch + "I", string_pool_strings_pos[table.rows_data[i][j]])
                else:
                    encoded_string = table.rows_data[i][j].encode("cp932") + b"\x00"
                    tmp = struct.pack(endian_ch + "I", string_pool_offset_pos)
                    raw.string_pool += encoded_string
                    string_pool_strings_pos.update({table.rows_data[i][j] : string_pool_offset_pos})
                    string_pool_offset_pos += len(encoded_string)
            # write the data (OR so "compressed" columns can share bytes)
            for k in range(len(tmp)):
                raw.data_pool[raw.columns_info[j].data_offset
                              + (i * raw.header.row_data_length) + k] |= tmp[k]
    # lol bytes()
    raw.data_pool = bytes(raw.data_pool)
    # append the last padding ("@" is the usual SMG padding byte)
    pad_size = 4
    if (use_std_pad_size):
        pad_size = 32
    tmp_file_size = 16 + (raw.header.col_count * 12) + len(raw.data_pool) + len(raw.string_pool)
    while ((tmp_file_size % pad_size) != 0):
        raw.string_pool += b"@"
        tmp_file_size += 1
    # done!
    print(raw)
    return raw

# write smg_bcsv_raw; send a bytes object if filepath == None
def write_smg_bcsv_raw(raw, filepath):
    """Serialize a smg_bcsv_raw; write to filepath or return bytes if None."""
    # create the bytes object
    data = bytes()
    # get endian_ch
    endian_ch = ">"
    if (raw.endian == "LITTLE"):
        endian_ch = "<"
    # header
    data += struct.pack(endian_ch + "I", raw.header.row_count)
    data += struct.pack(endian_ch + "I", raw.header.col_count)
    data += struct.pack(endian_ch + "I", raw.header.data_pool_offset)
    data += struct.pack(endian_ch + "I", raw.header.row_data_length)
    # column info
    for i in range(raw.header.col_count):
        data += struct.pack(endian_ch + "I", raw.columns_info[i].name_hash)
        data += struct.pack(endian_ch + "I", raw.columns_info[i].data_bitmask)
        data += struct.pack(endian_ch + "H", raw.columns_info[i].data_offset)
        data += struct.pack(endian_ch + "B", raw.columns_info[i].data_rshift)
        data += struct.pack(endian_ch + "B", raw.columns_info[i].data_type)
    # data pool
    data += raw.data_pool
    # string pool
    data += raw.string_pool
    # done!
    if (filepath != None):
        f = open(file_ops.get_path_str(filepath), "wb")
        f.write(data)
        f.close()
    else:
        return data

# valid table operations
# single operations:
# insert/move/remove a row/col at a certain index
# change a cell value rows_data/cols_info
# change a cols_info[index].type value (can change all values of the respective column)
#
# what a command needs so that it can be executed
# operation / type of element operated / list of values needed for the operation
#
# "INSERT" / "ROW" / [10, [row to insert values]]
# insert a row at index 10
#
# "INSERT" / "COLUMN" / [7, [col_info to insert values], [column to insert values]]
# insert a column at index 7
#
# "MOVE" / "COLUMN" / [9, 3]
# move a column from index 9 to index 3
#
# "REMOVE" / "ROW" / [0, [row to remove values]]
# remove the row at index 0
#
# "REMOVE" / "COL" / [7, [col_info to remove values], [column to remove values]]
# remove the column at index 7
#
# "EDIT" / "CELL" / ["cols_info", 3, "bitmask", "FFFF", "ABAB"]
# edit the cell
# cols_info[3].bitmask value from "FFFF" to "ABAB"
#
# "EDIT" / "CELL" / ["rows_data", 3, 4, "LMAO", "OAML"]
# edit the cell rows_data[3][4] value from "LMAO" to "OAML"
#
# "EDIT" / "CELL" / ["cols_info", 0, "type", "LONG", "STRING", [old column values], [new column values]]
# edit the cell cols_info[0].type value from "LONG" to "STRING"

COMMAND_LIST = ["INSERT", "MOVE", "REMOVE", "EDIT"]
ELEMENT_TO_OP = ["ROW", "COLUMN", "CELL"]

# determines if a type is correct for a specific value
def cell_data_is_type(type_string, value):
    """Return True when value is a valid cell value for type_string.

    Integer types are treated as signed and range-checked; string types
    must be CP932-encodable (STRING additionally must fit in 32 bytes).
    Prints a reason and returns False otherwise.
    """
    # check params
    if (type_string not in TYPE_INT_TO_STRING):
        print("value check: type is not valid")
        return False
    # check value
    if (type_string in ["LONG", "LONG_2"]):  # LONG, LONG_2
        if (type(value) != int or value < -0x7FFFFFFF or value > 0x7FFFFFFF):
            print("value check: value is not a LONG/LONG_2 type")
            return False
    elif (type_string == "SHORT"):  # SHORT
        if (type(value) != int or value < -0x7FFF or value > 0x7FFF):
            print("value check: value is not a SHORT type")
            return False
    elif (type_string == "CHAR"):  # CHAR
        if (type(value) != int or value < -0x7F or value > 0x7F):
            print("value check: value is not a CHAR type")
            return False
    elif (type_string == "FLOAT"):  # FLOAT
        if (type(value) != float):
            print("value check: value is not a FLOAT type")
            return False
    elif (type_string == "STRING"):  # STRING
        if (type(value) != str):
            print("value check: value is not a STRING type")
            return False
        try:
            enc = value.encode("cp932")
            if (len(enc) >= 32):
                print("value check: STRING type encoded representation larger than 32 bytes")
                return False
        except:
            print("value check: STRING type cannot be encoded into CP932")
            return False
    elif (type_string == "STRING_OFFSET"):  # STRING_OFFSET
        if (type(value) != str):
            print("value check: value is not a STRING_OFFSET type")
            return False
        try:
            enc = value.encode("cp932")
        except:
            print("value check: STRING_OFFSET type cannot be encoded into CP932")
            return False
    # all good
    return True

# determines if a col_info list of values is valid
def check_col_info_values(col_info_values):
    """Validate a 4-item column-info list: [name_or_hash, bitmask, rshift, type].

    Prints a reason and returns False on the first invalid field,
    True when all four fields are valid.
    """
    # check params
    if ((type(col_info_values) != list) or (len(col_info_values) != 4)):
        print("col info check: invalid col info value list")
        return False
    # hash or name
    if (type(col_info_values[0]) != str):
        print("col info check: name or hash is not a string")
        return False
    try:
        col_info_values[0].encode("cp932")
        if (col_info_values[0].upper().startswith("0X")):
            number = int(col_info_values[0], 16)
            if (number > 0xFFFFFFFF):
                print("col info check: hash value larger than expected")
                return False
    except:
        print("col info check: name is not CP932 encodable/hash cannot be interpreted as a hex string")
        return False
    # bitmask
    # FIX: the range checks used to read index 2 (the rshift slot), letting
    # any out-of-range bitmask pass validation
    if ((type(col_info_values[1]) != int) or (col_info_values[1] < 0) or (col_info_values[1] > 0xFFFFFFFF)):
        print("col info check: invalid bitmask value")
        return False
    # rshift
    if ((type(col_info_values[2]) != int) or (col_info_values[2] < 0) or (col_info_values[2] > 0xFF)):
        print("col info check: invalid rshift value")
        return False
    # type
    if (col_info_values[3] not in TYPE_INT_TO_STRING):
        print("col info check: invalid type string value")
        return False
    # all good
    return True

# check a smg bcsv table command def check_table_cmd(table, operation, element, rest_of_values): # check the table if ("all good" not in check_smg_bcsv_table(table)): return False # check the command, operation and element if ((operation not in COMMAND_LIST) or (element not in ELEMENT_TO_OP)): return False # rest_of_values_needed if (type(rest_of_values) != list): return False # insert a row at a specific index with some row values if (operation == "INSERT" and element == "ROW"): # rest of values checking if (len(rest_of_values) != 2): return False insert_index = rest_of_values[0] insert_row_values = rest_of_values[1] if (type(insert_index) != int or insert_index < 0 or insert_index > table.row_count): return False if (type(insert_row_values) != list or len(insert_row_values) != table.col_count): return False # check if the elements on the row match
def check_table_cmd(table, operation, element, rest_of_values):
    """
    Validate a table command before it is executed by exec_table_cmd().

    table          -- smg_bcsv_table instance the command targets
    operation      -- command verb (must be in COMMAND_LIST, e.g. "INSERT")
    element        -- command target (must be in ELEMENT_TO_OP, e.g. "ROW")
    rest_of_values -- operation-specific argument list (indexes, values, ...)

    Returns True when the command is well formed and applicable to the
    current table state, False otherwise.
    """
    # the table itself must be valid
    if ("all good" not in check_smg_bcsv_table(table)):
        return False
    # the verb and target must be known
    if ((operation not in COMMAND_LIST) or (element not in ELEMENT_TO_OP)):
        return False
    # the extra arguments always travel as a list
    if (type(rest_of_values) != list):
        return False
    # insert a row at a specific index with some row values
    if (operation == "INSERT" and element == "ROW"):
        if (len(rest_of_values) != 2):
            return False
        insert_index = rest_of_values[0]
        insert_row_values = rest_of_values[1]
        # index may equal row_count (append position)
        if (type(insert_index) != int or insert_index < 0 or insert_index > table.row_count):
            return False
        if (type(insert_row_values) != list or len(insert_row_values) != table.col_count):
            return False
        # each cell must match its column's declared type
        for i in range(table.col_count):
            if (cell_data_is_type(table.cols_info[i].type, insert_row_values[i]) == False):
                return False
    # insert a column at a specific index with some col_info values and column values
    elif (operation == "INSERT" and element == "COLUMN"):
        if (len(rest_of_values) != 3):
            return False
        insert_index = rest_of_values[0]
        insert_col_info = rest_of_values[1]
        insert_col_values = rest_of_values[2]
        if (type(insert_index) != int or insert_index < 0 or insert_index > table.col_count):
            return False
        if (check_col_info_values(insert_col_info) == False):
            return False
        if (type(insert_col_values) != list or len(insert_col_values) != table.row_count):
            return False
        # each provided cell must match the new column's type
        for i in range(table.row_count):
            if (cell_data_is_type(insert_col_info[3], insert_col_values[i]) == False):
                return False
    # remove a row from a specific index, specify the row values to be removed
    elif (operation == "REMOVE" and element == "ROW"):
        if (len(rest_of_values) != 2):
            return False
        remove_index = rest_of_values[0]
        remove_row_values = rest_of_values[1]
        if (type(remove_index) != int or remove_index < 0 or remove_index >= table.row_count):
            return False
        if (type(remove_row_values) != list or len(remove_row_values) != table.col_count):
            return False
        for i in range(table.col_count):
            if (cell_data_is_type(table.cols_info[i].type, remove_row_values[i]) == False):
                return False
        # the provided row must equal the row actually being removed
        if (remove_row_values != table.rows_data[remove_index]):
            return False
    # remove a column from a specific index, specify the col_info and column values
    elif (operation == "REMOVE" and element == "COLUMN"):
        if (len(rest_of_values) != 3):
            return False
        remove_index = rest_of_values[0]
        remove_col_info = rest_of_values[1]
        remove_col_values = rest_of_values[2]
        if (type(remove_index) != int or remove_index < 0 or remove_index >= table.col_count):
            return False
        if (check_col_info_values(remove_col_info) == False):
            return False
        # the provided col_info must equal the col_info actually being removed
        if ((remove_col_info[0] != table.cols_info[remove_index].name_or_hash)
                or (remove_col_info[1] != table.cols_info[remove_index].bitmask)
                or (remove_col_info[2] != table.cols_info[remove_index].rshift)
                or (remove_col_info[3] != table.cols_info[remove_index].type)):
            return False
        if (type(remove_col_values) != list or len(remove_col_values) != table.row_count):
            return False
        for i in range(table.row_count):
            if (cell_data_is_type(remove_col_info[3], remove_col_values[i]) == False):
                return False
    # move a row from an index to another index
    elif (operation == "MOVE" and element == "ROW"):
        if (len(rest_of_values) != 2):
            return False
        old_index = rest_of_values[0]
        new_index = rest_of_values[1]
        if (type(old_index) != int or old_index < 0 or old_index >= table.row_count):
            return False
        if (type(new_index) != int or new_index < 0 or new_index >= table.row_count):
            return False
    # move a column from an index to another index
    elif (operation == "MOVE" and element == "COLUMN"):
        if (len(rest_of_values) != 2):
            return False
        old_index = rest_of_values[0]
        new_index = rest_of_values[1]
        if (type(old_index) != int or old_index < 0 or old_index >= table.col_count):
            return False
        if (type(new_index) != int or new_index < 0 or new_index >= table.col_count):
            return False
    # edit a single cell, either in rows_data or in a column's col_info
    elif (operation == "EDIT" and element == "CELL"):
        if (len(rest_of_values) < 5):
            return False
        data_path = rest_of_values[0]
        if (data_path not in ["cols_info", "rows_data"]):
            return False
        # rows_data: [path, row_index, col_index, old_value, new_value]
        if (data_path == "rows_data"):
            if (len(rest_of_values) != 5):
                return False
            row_index = rest_of_values[1]
            col_index = rest_of_values[2]
            if (type(row_index) != int or row_index < 0 or row_index >= table.row_count):
                return False
            if (type(col_index) != int or col_index < 0 or col_index >= table.col_count):
                return False
            old_value = rest_of_values[3]
            new_value = rest_of_values[4]
            # BUG FIX: was table.row_data (attribute does not exist; the
            # attribute is rows_data everywhere else in this module)
            if (old_value != table.rows_data[row_index][col_index]):
                return False
            if (cell_data_is_type(table.cols_info[col_index].type, new_value) == False):
                return False
        # cols_info: [path, col_index, field_name, old_value, new_value, (...)]
        elif (data_path == "cols_info"):
            col_index = rest_of_values[1]
            if (type(col_index) != int or col_index < 0 or col_index >= table.col_count):
                return False
            inner_data_path = rest_of_values[2]
            if (inner_data_path not in ["name_or_hash", "bitmask", "rshift", "type"]):
                return False
            # type change also carries the old and new column values (len 7)
            if (inner_data_path == "type"):
                if (len(rest_of_values) != 7):
                    return False
                old_value = rest_of_values[3]
                new_value = rest_of_values[4]
                if (old_value != table.cols_info[col_index].type):
                    return False
                if (new_value not in TYPE_INT_TO_STRING):
                    return False
                # a type change is equivalent to removing the old column and
                # inserting a retyped one, so reuse those checks recursively
                tmp = table.cols_info[col_index]
                tmp_col_info = [tmp.name_or_hash, tmp.bitmask, tmp.rshift, tmp.type]
                old_column_values = rest_of_values[5]
                tmp_rest_of_values = [col_index, tmp_col_info, old_column_values]
                # BUG FIX: the recursive calls were missing the table argument
                if (check_table_cmd(table, "REMOVE", "COLUMN", tmp_rest_of_values) == False):
                    return False
                tmp_col_info[3] = new_value
                new_column_values = rest_of_values[6]
                # BUG FIX: index 3 was out of range for this 3-element list;
                # the column values live at index 2
                tmp_rest_of_values[2] = new_column_values
                if (check_table_cmd(table, "INSERT", "COLUMN", tmp_rest_of_values) == False):
                    return False
            # name_or_hash, bitmask, rshift (len 5)
            else:
                if (len(rest_of_values) != 5):
                    return False
                old_value = rest_of_values[3]
                new_value = rest_of_values[4]
                # getattr replaces eval(): same attribute lookup, no code execution
                if (old_value != getattr(table.cols_info[col_index], inner_data_path)):
                    return False
                # reuse check_col_info_values() with dummies in the other slots
                if (inner_data_path == "name_or_hash"):
                    if (check_col_info_values([new_value, 0, 0, "LONG"]) == False):
                        return False
                elif (inner_data_path == "bitmask"):
                    if (check_col_info_values(["a", new_value, 0, "LONG"]) == False):
                        return False
                elif (inner_data_path == "rshift"):
                    if (check_col_info_values(["a", 0, new_value, "LONG"]) == False):
                        return False
    # all good
    return True
def exec_table_cmd(table, operation, element, rest_of_values):
    """
    Validate and apply a command to an smg_bcsv_table.

    table          -- table to modify in place
    operation      -- command verb ("INSERT", "REMOVE", "MOVE", "EDIT")
    element        -- command target ("ROW", "COLUMN", "CELL")
    rest_of_values -- operation-specific argument list

    Returns the executed command as [operation, element, rest_of_values],
    or None when the command fails validation.
    """
    # refuse anything check_table_cmd() rejects
    if (check_table_cmd(table, operation, element, rest_of_values) == False):
        return None
    if (operation == "INSERT"):
        idx = rest_of_values[0]
        if (element == "ROW"):
            # splice the new row into a fresh rows_data list
            table.row_count += 1
            rows = list(table.rows_data)
            rows.insert(idx, rest_of_values[1])
            table.rows_data = rows
        elif (element == "COLUMN"):
            table.col_count += 1
            # build the col_info record from its 4-value list form
            info = smg_bcsv_table.cols_info()
            info.name_or_hash = rest_of_values[1][0]
            info.bitmask = rest_of_values[1][1]
            info.rshift = rest_of_values[1][2]
            info.type = rest_of_values[1][3]
            infos = list(table.cols_info)
            infos.insert(idx, info)
            table.cols_info = infos
            # splice one new cell into every row
            col_values = rest_of_values[2]
            for r in range(table.row_count):
                row = list(table.rows_data[r])
                row.insert(idx, col_values[r])
                table.rows_data[r] = row
    elif (operation == "MOVE"):
        src = rest_of_values[0]
        dst = rest_of_values[1]
        if (element == "ROW"):
            # pop from the old slot, reinsert at the new one
            rows = list(table.rows_data)
            rows.insert(dst, rows.pop(src))
            table.rows_data = rows
        elif (element == "COLUMN"):
            infos = list(table.cols_info)
            infos.insert(dst, infos.pop(src))
            table.cols_info = infos
            # shuffle the matching cell in every row the same way
            for r in range(table.row_count):
                row = list(table.rows_data[r])
                row.insert(dst, row.pop(src))
                table.rows_data[r] = row
    elif (operation == "REMOVE"):
        idx = rest_of_values[0]
        if (element == "ROW"):
            table.row_count -= 1
            rows = list(table.rows_data)
            del rows[idx]
            table.rows_data = rows
        elif (element == "COLUMN"):
            table.col_count -= 1
            infos = list(table.cols_info)
            del infos[idx]
            table.cols_info = infos
            # drop the matching cell from every row
            for r in range(table.row_count):
                row = list(table.rows_data[r])
                del row[idx]
                table.rows_data[r] = row
    elif (operation == "EDIT"):
        if (element == "CELL"):
            data_path = rest_of_values[0]
            if (data_path == "rows_data"):
                # plain cell overwrite
                table.rows_data[rest_of_values[1]][rest_of_values[2]] = rest_of_values[4]
            elif (data_path == "cols_info"):
                col = rest_of_values[1]
                field = rest_of_values[2]
                if (field == "type"):
                    # retype the column and swap in the converted values
                    table.cols_info[col].type = rest_of_values[4]
                    for r in range(table.row_count):
                        table.rows_data[r][col] = rest_of_values[6][r]
                elif (field == "name_or_hash"):
                    table.cols_info[col].name_or_hash = rest_of_values[4]
                elif (field == "bitmask"):
                    table.cols_info[col].bitmask = rest_of_values[4]
                elif (field == "rshift"):
                    table.cols_info[col].rshift = rest_of_values[4]
    # all good, hand back the command that was applied
    return [operation, element, rest_of_values]
def assign_table_values(src, dest):
    """
    Copy the contents of table src into table dest (deep copy, so the two
    tables share no mutable state afterwards).

    src  -- source smg_bcsv_table; must pass check_smg_bcsv_table()
    dest -- destination smg_bcsv_table with list-typed cols_info/rows_data

    Returns True on success, False when either table is unusable.
    """
    # src must be a valid table and dest must be shaped like one
    if ("all good" not in check_smg_bcsv_table(src)
            or type(dest) != smg_bcsv_table
            or type(dest.cols_info) != list
            or type(dest.rows_data) != list):
        return False
    # counts first, then rebuild both lists in place (clear + extend keeps
    # any outside references to dest's lists valid)
    dest.row_count = src.row_count
    dest.col_count = src.col_count
    dest.cols_info.clear()
    dest.rows_data.clear()
    dest.cols_info.extend(copy.deepcopy(src.cols_info[i]) for i in range(src.col_count))
    dest.rows_data.extend(src.rows_data[i].copy() for i in range(src.row_count))
    # done!
    return True