# Source code for swmmio.version_control.inp

from collections import OrderedDict

from swmmio.utils.text import get_inp_sections_details
from swmmio.version_control import utils as vc_utils
from swmmio.utils.dataframes import dataframe_from_bi, dataframe_from_inp
import swmmio
import pandas as pd
import os
from copy import deepcopy
# Raise pandas' display column-width limit to 200 characters (module-level side
# effect; presumably so long values are not truncated when frames are rendered
# as text -- TODO confirm).
pd.options.display.max_colwidth = 200

# Section names that BuildInstructions.build, INPDiff and merge_models treat
# specially: each tests ``section not in problem_sections`` before diffing or
# committing changes to a section.
# NOTE(review): these entries include square brackets, whereas
# create_inp_build_instructions() uses its own local list of bracket-less names
# ('TITLE', 'CURVES', ...). Confirm which form get_inp_sections_details()
# yields; if it is the bracket-less form, nothing ever matches this list.
problem_sections = ['[CURVES]', '[TIMESERIES]', '[RDII]', '[HYDROGRAPHS]']


class BuildInstructions(object):
    """
    Similar to the INPSectionDiff object, this object contains information
    used to generate an inp based on 'serialized' (though human readable,
    inp-esque) build instructions files. This object is meant to neatly
    encapsulate things.

    ``self.instructions`` is a dictionary keyed by the section headers that
    have changes (i.e. build instructions w.r.t. the baseline model); each
    value is the change object for that section. ``self.metadata`` holds
    whatever ``vc_utils.read_meta_data`` returns for the file; ``__add__``
    expects it to be a dict with 'Parent Models' (containing 'Baseline' and
    'Alternatives' dicts) and 'Log' entries.

    :param build_instr_file: optional path of an existing build instructions
        file to load; when omitted an empty instance is created
    """

    def __init__(self, build_instr_file=None):
        # one change object for each section that is different from baseline
        self.instructions = {}
        self.metadata = {}
        if build_instr_file:
            # read the instructions and create a dictionary of change objects
            allheaders = get_inp_sections_details(build_instr_file)
            self.instructions = {
                section: INPSectionDiff(build_instr_file=build_instr_file,
                                        section=section)
                for section in allheaders
            }

            # read the meta data
            self.metadata = vc_utils.read_meta_data(build_instr_file)

    def __add__(self, other):
        """
        Combine two sets of build instructions into a new instance.

        Sections present in both operands are combined with the change
        objects' own ``+``; sections present in only one are carried over
        as-is. Metadata is merged with ``other``'s entries overriding equal
        keys. Neither operand is modified.

        :param other: BuildInstructions to combine with this one
        :return: new BuildInstructions instance
        """
        bi = BuildInstructions()
        for section, change_obj in self.instructions.items():
            if section in other.instructions:
                bi.instructions[section] = change_obj + other.instructions[section]
            else:
                # section doesn't exist in other, maintain current instructions
                bi.instructions[section] = change_obj

        for section, change_obj in other.instructions.items():
            if section not in self.instructions:
                bi.instructions[section] = change_obj

        # combine the metadata
        # deepcopy so child structures aren't linked to original
        merged = deepcopy(self.metadata)
        # Missing keys are tolerated so that instances with empty metadata
        # (e.g. a default-constructed BuildInstructions) can still be added.
        other_parents = other.metadata.get('Parent Models', {})
        parents = merged.setdefault('Parent Models', {})
        for key in ('Baseline', 'Alternatives'):
            parents.setdefault(key, {}).update(other_parents.get(key, {}))
        merged.setdefault('Log', {}).update(other.metadata.get('Log', {}))
        bi.metadata = merged

        return bi

    def __radd__(self, other):
        # this is so we can call sum() on a list of build_instructions:
        # sum() starts from the integer 0, which is simply skipped
        if other == 0:
            return self
        return self.__add__(other)

    def save(self, dir, filename):
        """
        Save the current BuildInstructions instance to file (human readable).

        The directory is created when it does not exist. The metadata is
        written first, followed by one section per entry of
        ``self.instructions`` containing its removed, altered and added rows
        (in that order).

        :param dir: directory to write into (name shadows the builtin; kept
            for backward compatibility with keyword callers)
        :param filename: name of the file created inside ``dir``
        """
        if not os.path.exists(dir):
            os.makedirs(dir)
        filepath = os.path.join(dir, filename)
        with open(filepath, 'w') as f:
            vc_utils.write_meta_data(f, self.metadata)
            for section, change_obj in self.instructions.items():
                section_df = pd.concat([change_obj.removed,
                                        change_obj.altered,
                                        change_obj.added])
                vc_utils.write_inp_section(f, allheaders=None,
                                           sectionheader=section,
                                           section_data=section_df,
                                           pad_top=False, na_fill='NaN')

    def build(self, baseline_dir, target_path):
        """
        Build a complete INP file with the build instructions committed to a
        baseline model.

        :param baseline_dir: location of the baseline model, passed straight
            to ``swmmio.Model``
        :param target_path: path of the INP file to write
        """
        basemodel = swmmio.Model(baseline_dir)
        allheaders = get_inp_sections_details(basemodel.inp.path)
        with open(target_path, 'w') as f:
            for section in allheaders:
                # commit changes only to sections that are not in
                # problem_sections, are not opaque 'blob' sections, and
                # actually have instructions
                if (section not in problem_sections
                        and allheaders[section]['columns'] != ['blob']
                        and section in self.instructions):

                    # df of baseline model section
                    basedf = dataframe_from_bi(basemodel.inp.path, section)
                    basedf[';'] = ';'

                    # grab the changes to this section
                    changes = self.instructions[section]

                    # remove elements that have alterations and/or are tagged
                    # for removal. Index.union is the explicit set operation;
                    # the ``|`` operator no longer means set-union on an Index
                    # in current pandas.
                    remove_ids = changes.removed.index.union(changes.altered.index)
                    new_section = basedf.drop(remove_ids)

                    # add elements
                    new_section = pd.concat([new_section, changes.altered, changes.added])
                else:
                    # section is not well understood or is problematic,
                    # just blindly copy
                    new_section = dataframe_from_bi(basemodel.inp.path, section=section)
                    new_section[';'] = ';'

                # write the section
                vc_utils.write_inp_section(f, allheaders, section, new_section)
class INPSectionDiff(object):
    """
    This object represents the 'changes' of a given section of a INP file
    with respect to another INP. Three main dataframes are attributes:

    - **added**: includes elements that are new in model2 (compare to model1)
    - **removed**: elements that do not exist in model2, that were found to model1
    - **altered**: elements whose attributes have changes from model1 to model2

    When neither a pair of models nor a build instructions file is given, the
    three dataframes are empty.

    :param model1: base model for diff
    :param model2: target model for diff
    :param section: section of the inp used for comparison
    :param build_instr_file: optionally instantiate an INPSectionDiff from an
        existing Build Instructions file

    >>> from swmmio.examples import jersey, jerzey
    >>> mydiff = INPSectionDiff(jersey, jerzey, section='JUNCTIONS')
    >>> print(mydiff)  # doctest: +NORMALIZE_WHITESPACE
    <BLANKLINE>
          InvertElev  MaxDepth  InitDepth  SurchargeDepth  PondedArea  ;  Comment                     Origin
    Name
    1           17.0         0          0               0           0  ;  Removed  model_full_features_b.inp
    2           17.0         0          0               0           0  ;  Removed  model_full_features_b.inp
    3           16.5         0          0               0           0  ;  Removed  model_full_features_b.inp
    4           16.0         0          0               0           0  ;  Removed  model_full_features_b.inp
    5           15.0         0          0               0           0  ;  Removed  model_full_features_b.inp
    """

    def __init__(self, model1=None, model2=None, section='JUNCTIONS', build_instr_file=None):
        self.model1 = model1 if model1 else ""
        self.model2 = model2 if model2 else ""

        # Defaults, so that an instance created without inputs (as __add__
        # does) is still printable and addable.
        self.old = None
        self.new = None
        self.added = pd.DataFrame()
        self.removed = pd.DataFrame()
        self.altered = pd.DataFrame()

        if model1 and model2:
            df1 = dataframe_from_inp(model1.inp.path, section)
            df2 = dataframe_from_inp(model2.inp.path, section)
            df1[';'] = ';'
            df2[';'] = ';'
            m2_origin_string = os.path.basename(model2.inp.path).replace(' ', '-')

            # NOTE (from the original author): this fails if df1 or df2 is
            # None, i.e. if a section doesn't exist in one model
            added_ids = df2.index.difference(df1.index)
            removed_ids = df1.index.difference(df2.index)

            # find where elements were changed (but kept with same ID)
            common_ids = df1.index.difference(removed_ids)  # original - removed = in common
            changed_ids = self._changed_ids(df1, df2, common_ids)

            added = df2.loc[added_ids].copy()
            added['Comment'] = 'Added'
            added['Origin'] = m2_origin_string

            altered = df2.loc[changed_ids].copy()
            altered['Comment'] = 'Altered'
            altered['Origin'] = m2_origin_string

            removed = df1.loc[removed_ids].copy()
            removed['Comment'] = 'Removed'
            removed['Origin'] = m2_origin_string

            self.old = df1
            self.new = df2
            self.added = added
            self.removed = removed
            self.altered = altered

        if build_instr_file:
            # if generating from a build instructions file, do this (more
            # efficient); this takes precedence over a model comparison
            df = dataframe_from_bi(build_instr_file, section=section)
            self.added = df.loc[df['Comment'] == 'Added']
            self.removed = df.loc[df['Comment'] == 'Removed']
            self.altered = df.loc[df['Comment'] == 'Altered']

    @staticmethod
    def _changed_ids(df1, df2, common_ids):
        """Return the IDs in ``common_ids`` whose rows differ between df1 and df2."""
        # both dfs concatenated, with matched indices for each element
        full_set = pd.concat([df1.loc[common_ids], df2.loc[common_ids]], sort=False)
        # remove whitespace
        full_set = full_set.apply(lambda x: x.astype(str).str.strip() if x.dtype == "object" else x)

        # Drop rows that repeat an earlier row of the SAME element; anything
        # unchanged is left with a single row. The element ID must be part of
        # the comparison because duplicate detection looks at column values
        # only: without it, an element changed to match another element's
        # attributes would be collapsed and its change missed.
        keyed = full_set.copy()
        keyed['__element_id__'] = full_set.index.values
        unique_rows = full_set[~keyed.duplicated().values]

        # IDs that still appear more than once are the ones that changed
        return unique_rows.index[unique_rows.index.duplicated()].unique()

    def __add__(self, other):
        """Concatenate the added/removed/altered frames of two diffs."""
        # this should be made more robust to catch conflicts
        change = INPSectionDiff()
        change.added = pd.concat([self.added, other.added], axis=0)
        change.removed = pd.concat([self.removed, other.removed], axis=0)
        change.altered = pd.concat([self.altered, other.altered], axis=0)
        return change

    def __str__(self):
        # a leading newline followed by the first rows of the combined diff
        diff = pd.concat([self.removed, self.added, self.altered])
        return '\n{}'.format(diff.head().to_string())
class INPDiff(object):
    """
    Diff of all INP sections between two models.

    Each argument may be a string, in which case it is handed to
    ``swmmio.Model``, or an object that is used as given. One INPSectionDiff
    is stored in ``self.diffs`` for every section found in either model,
    except those listed in ``problem_sections``.

    :param model1: base model for diff
    :param model2: target model for diff

    >>> from swmmio.tests.data import MODEL_FULL_FEATURES_XY, MODEL_FULL_FEATURES_XY_B
    >>> mydiff = INPDiff(MODEL_FULL_FEATURES_XY, MODEL_FULL_FEATURES_XY_B)
    >>> print(mydiff.diffs['XSECTIONS'])  # doctest: +NORMALIZE_WHITESPACE
    <BLANKLINE>
             Shape Geom1  Geom2  Geom3  Geom4  Barrels  XX  ;  Comment                     Origin
    Link
    1:4   CIRCULAR     1      0      0      0      1.0 NaN  ;  Removed  model_full_features_b.inp
    2:5   CIRCULAR     1      0      0      0      1.0 NaN  ;  Removed  model_full_features_b.inp
    3:4   CIRCULAR     1      0      0      0      1.0 NaN  ;  Removed  model_full_features_b.inp
    4:5   CIRCULAR     1      0      0      0      1.0 NaN  ;  Removed  model_full_features_b.inp
    5:J1  CIRCULAR     1      0      0      0      1.0 NaN  ;  Removed  model_full_features_b.inp
    """

    def __init__(self, model1=None, model2=None):
        # strings are promoted to Model objects; anything else is used as-is
        base = swmmio.Model(model1) if isinstance(model1, str) else model1
        target = swmmio.Model(model2) if isinstance(model2, str) else model2

        self.m1 = base
        self.m2 = target
        self.diffs = OrderedDict()

        base_sections = get_inp_sections_details(base.inp.path)
        target_sections = get_inp_sections_details(target.inp.path)

        # union of the section names of both models, first occurrence wins
        # so the original ordering is maintained
        combined = list(base_sections.keys()) + list(target_sections.keys())
        self.all_sections = list(OrderedDict.fromkeys(combined))

        self.all_inp_objects = OrderedDict(base_sections)
        self.all_inp_objects.update(target_sections)

        for section in self.all_sections:
            if section in problem_sections:
                continue
            # calculate the changes in the current section
            self.diffs[section] = INPSectionDiff(base, target, section)

    def __str__(self):
        header = '--- {}\n+++ {}\n\n'.format(self.m1.inp.path, self.m2.inp.path)
        blocks = ['{}\n{}'.format(name, section_diff.__str__())
                  for name, section_diff in self.diffs.items()]
        return header + '\n\n'.join(blocks)
def create_inp_build_instructions(inpA, inpB, path, filename, comments=''):
    """
    Pass in two inp file paths and write a build instructions text file
    listing the differences found in each of the INP sections. These
    differences should then be used whenever we need to rebuild this model
    from the baseline reference model.

    Only the sections found in ``inpA`` are compared, and the sections named
    in the local skip list are left out. The file is written to
    ``<path>/<filename>.txt``; ``path`` is created when missing.

    Note: this should be split into a func that creates a overall model
    "diff" that can then be written as a BI file or used programmatically

    :param inpA: path of the baseline inp file
    :param inpB: path of the alternative inp file
    :param path: directory in which the build instructions file is created
    :param filename: file name without extension; also used as the key of
        the metadata 'Log' entry
    :param comments: text stored as the metadata 'Log' value
    :return: BuildInstructions instance loaded from the file just written
    """
    baseline_sections = get_inp_sections_details(inpA)
    baseline_model = swmmio.Model(inpA)
    alternative_model = swmmio.Model(inpB)

    # create build instructions folder
    if not os.path.exists(path):
        os.makedirs(path)
    bi_path = os.path.join(path, filename) + '.txt'

    # sections that are never diffed by this function
    skipped_sections = ['TITLE', 'CURVES', 'TIMESERIES', 'RDII', 'HYDROGRAPHS']

    with open(bi_path, 'w') as newf:
        # write meta data
        parent_models = {
            'Baseline': {inpA: vc_utils.modification_date(inpA)},
            'Alternatives': {inpB: vc_utils.modification_date(inpB)}
        }
        metadata = {'Parent Models': parent_models, 'Log': {filename: comments}}
        vc_utils.write_meta_data(newf, metadata)

        for section in baseline_sections:
            if section in skipped_sections:
                continue
            # calculate the changes in the current section
            section_diff = INPSectionDiff(baseline_model, alternative_model, section)
            rows = pd.concat(
                [section_diff.removed, section_diff.added, section_diff.altered],
                axis=0, sort=False)
            # na fill fixes SNOWPACK blanks spaces issue
            vc_utils.write_inp_section(newf, baseline_sections, section, rows,
                                       pad_top=False, na_fill='NaN')

    return BuildInstructions(bi_path)
def merge_models(inp1, inp2, target='merged_model.inp'):
    """
    Merge two separate swmm models into one model.

    This creates a diff of the two models and writes inp1 with the diff
    applied: elements found only in inp2 are added, elements found only in
    inp1 are kept (the "removed" part of the diff is ignored), and elements
    whose attributes differ between the two models take the version recorded
    in the diff's ``altered`` frame (the inp2 row). Sections listed in
    ``problem_sections``, 'blob' sections, and sections without a diff are
    copied from inp1 unchanged.

    :param inp1: first model; passed to ``swmmio.Model`` and ``INPDiff``
        (presumably the path of an inp file -- confirm against callers)
    :param inp2: second model to be combined with inp1; passed to ``INPDiff``
    :param target: path of new model
    :return: path to target
    """
    # model object to store resulting merged model
    m3 = swmmio.Model(inp1)

    inp_diff = INPDiff(inp1, inp2)
    with open(target, 'w') as newf:
        for section in inp_diff.all_inp_objects:
            # apply the diff only to sections that are not in
            # problem_sections, are not opaque 'blob' sections, and were
            # actually diffed
            if (section not in problem_sections
                    and inp_diff.all_inp_objects[section]['columns'] != ['blob']
                    and section in inp_diff.diffs):

                # df of baseline model section
                basedf = dataframe_from_inp(m3.inp.path, section,
                                            additional_cols=[';', 'Comment', 'Origin'])
                basedf[';'] = ';'

                # grab the changes to this section
                changes = inp_diff.diffs[section]

                # remove elements that have alterations, keep ones tagged
                # for removal (unchanged, but not present in m2)
                remove_ids = changes.altered.index
                new_section = basedf.drop(remove_ids)

                # add elements
                new_section = pd.concat([new_section, changes.altered, changes.added],
                                        axis=0, sort=False)

                # restore the baseline column order; copy so the assignment
                # below never writes into a view of the concatenated frame
                new_section = new_section[basedf.columns].copy()
            else:
                # section is not well understood or is problematic, just
                # blindly copy -- all columns are kept (selecting with an
                # empty column list here would discard the section's data)
                new_section = dataframe_from_inp(m3.inp.path, section,
                                                 additional_cols=[';', 'Comment', 'Origin'])

            # write the section
            new_section[';'] = ';'
            vc_utils.write_inp_section(newf, inp_diff.all_inp_objects, section,
                                       new_section, pad_top=True)

    return target