Source code for crunchflow.output.timeseries

"""A module for loading and plotting CrunchFlow time series output files."""

import os
import re

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


[docs] def get_ts_coords(tsfile): """Given a CrunchFlow time series ouput file, return the coordinate at which that time series was output. Parameters ---------- tsfile : str filename containing timeseries output Returns ------- coords : tuple of int Coordinates of the form (x, y, z) """ # Open the file and read in the first line with open(tsfile) as f: firstline = f.readline() if "Flux weighted" in firstline: # First split on colon coord_str = firstline.split(":")[-1] # Then split on both space and hyphen. Sometimes CrunchFlow outputs # these coordinates as "1-100" and sometimes as "1- 100" raw_coord_list = re.split(r"[-\s]", coord_str) # Remove empty strings coord_list = [x for x in raw_coord_list if x] if len(coord_list) == 6: coords = ( "%s-%s" % (coord_list[0], coord_list[1]), "%s-%s" % (coord_list[2], coord_list[3]), "%s-%s" % (coord_list[4], coord_list[5]), ) else: coords = coord_str else: # Final 3 fields are x, y, and z fields = firstline.split()[-3:] x = int(fields[0]) y = int(fields[1]) z = int(fields[2]) # Return tuple coords = (x, y, z) return coords
[docs] def get_ts_duplicates(tsfile): """Find duplicate columns in a time series file. Useful when a user specifies `time_series_print all` in CrunchFlow; the time series file includes primary species printed twice. Parameters ---------- tsfile : str path to the time series file Returns ------- columns : list list of column headings without duplicates nondup_indices : list list of column indices without duplicates """ # Open the file and read in the second line with open(tsfile) as f: for i, line in enumerate(f): if i == 1: columns = line.split() # Get list of duplicates and how many times each is seen seen = {} # Count occurrences of each item duplicates = [] # List of duplicate items dup_indices = [] # List of first occurrences of each duplicate item for col in columns: if col not in seen: seen[col] = 1 else: if seen[col] == 1: duplicates.append(col) idx = columns.index(col) dup_indices.append(idx) seen[col] += 1 # Do not go through if seen[col] == 1 again # Total number of columns in the file, including duplicates totcols = len(columns) nondup_indices = list(range(totcols)) # List of non-duplicated indices # Delete each duplicate, but reverse sort to avoid throwing off indices # after deleting earlier elements for idx in sorted(dup_indices, reverse=True): del columns[idx] del nondup_indices[idx] return columns, nondup_indices
class TimeSeries: """The timeseries class for working with CrunchFlow time series output files. Attributes ---------- coords : tuple of int x, y and z coordinates of the time series timeunit : str time unit used in the CrunchFlow input unit : str Concentration units included in the file. Automatically set to the default CrunchFlow concentration units (mol/kgw) species : list of str list of aqueous species in the file data : ndarray of float Numpy array of all data. First col is the time step and remaining cols are species in the same order as self.species list df : dataframe of float Pandas dataframe of all data. Index is the time step and columns are the aqueous species Methods ------- convert_mgL(database='datacom.dbs', folder='.') Convert time series concentrations from mol/kgw to mg/L (ppm). plot(species, units='mg/L', **kwargs) Plot the time series of one or more species. Examples -------- >>> ts = TimeSeries('Well1-1.txt') >>> ts.convert_mgL() >>> calcium = ts.df['Ca++'] >>> ts.plot('Ca++') """ def __init__(self, tsfile, folder="."): """Read in and get basic info about the timeseries file `tsfile`. Parameters ---------- tsfile : str Name of the CrunchFlow time series file folder : str Path to the CrunchFlow time series file """ tsfilepath = os.path.join(folder, tsfile) # Get coordinates at which time series was output self.coords = get_ts_coords(tsfilepath) # Get list of duplicates and their indices self.columns, indices = get_ts_duplicates(tsfilepath) # Assume that, if there are duplicates, it's because user # specified `time_series_print all` in CrunchFlow, in which # case non-duplicate columns are printed in log format ncols = len(self.columns) lastcol = max(indices) + 1 # If idx of the last column is greater than the # cols, # duplicates were deleted, so set logformat = True if lastcol > ncols: logformat = True else: logformat = False # List of species (columns without the time column) self.species = self.columns[1:] # Set time and concentration units t = self.columns[0] self.timeunit = t[t.find("(") + 1 : t.find(")")] self.unit = "mol/L" # Load data into numpy array self.data = np.genfromtxt( tsfilepath, skip_header=2, usecols=indices, missing_values=["Infinity", "NaN", "-Infinity"] ) # If necessary, convert from log to real format if logformat: if self.columns[1] != "pH": self.data[:, 1:] = 10 ** self.data[:, 1:] else: # Skip first two cols, which are time and pH self.data[:, 2:] = 10 ** self.data[:, 2:] # Load into a pandas dataframe as well self.df = pd.DataFrame(data=self.data[:, 1:], index=self.data[:, 0], columns=self.species) self.df.index.name = "time" def convert_mgL(self, database="datacom.dbs", folder=".", warnings=True): """Convert time series concentrations from mol/kgw to mg/L (ppm). Note that this assumes that 1 kg water = 1 L water. Parameters ---------- database : str name of the CrunchFlow database. The default is 'datacom.dbs' folder : str path to the database. The default is current directory. warnings : bool whether to print warnings for species not found in the database. Returns ------- None. Modifies timeseries object in place. """ databasepath = os.path.join(folder, database) # If units are already mg/L, no need to do anything if self.unit == "mg/L": return # Check if database exists if not os.path.exists(databasepath): raise OSError("Could not find " + databasepath) molar_mass = {} # Open the database and get the molar mass of each species with open(databasepath) as db: for line in db: # Skip blank lines if not line.strip(): continue for spec in self.species: # Database format is, e.g., "'Ca++' 6.0 2.0 40.0780", # where the last value is the molar mass if line.split()[0] == "'{}'".format(spec): molar_mass[spec] = float(line.split()[-1]) # Delete keys with molar masses of 0 (e.g., tracers) # and do not convert them to mg/L del_keys = [] for key, value in molar_mass.items(): if value == 0: del_keys.append(key) # Cannot delete key within loop, otherwise # it changes size on each iteration for key in del_keys: del molar_mass[key] for spec in self.species: if spec not in molar_mass.keys(): if warnings: print("Warning -- Did not convert {} to mg/L".format(spec)) else: idx = self.columns.index(spec) # Only need to convert .data since .data and .df are linked self.data[:, idx] = self.data[:, idx] * molar_mass[spec] * 1000 # Update the unit attribute self.unit = "mg/L" def plot(self, species, units="mg/L", **kwargs): """Plot the time series of one or more species. Parameters ---------- species : str or list of str Either single species or list of species to be plotted units : str Concentration units to use for plotting. The default is 'mg/L' **kwargs : dict keyword arguments passed to plt.subplots (e.g., figsize) Returns ------- fig : pyplot object figure handle for current plot ax : pyplot object axis handle for current plot """ if units == "mg/L" and self.unit != "mg/L": # Raise error if cannot find datacom.dbs if not os.path.exists("./datacom.dbs"): raise OSError( "Could not find default database. \ Plot with other units or convert to mg/L first using the \ convert_mgL method. See convert_mgL.__doc__ for more info." ) self.convert_mgL() # Accept both str and list input, so if str, convert to list if isinstance(species, str): species = [species] fig, ax = plt.subplots(**kwargs) for spec in species: ax.plot(self.df.index, self.df[spec], label=spec) ax.set(xlabel="Time ({})".format(self.timeunit), ylabel="Concentration ({})".format(units)) ax.legend() return fig, ax if __name__ == "__main__": print(TimeSeries.__doc__)