Source code for jsmetrics.utils.data_utils

# -*- coding: utf-8 -*-

"""
Various utility functions needed for the jet-stream metrics and algorithms not belonging to windspeed or spatial utils.
The module is built from xarray data-structures, so this file contains all the stuff that helps the library handle xarray dataset/dataarrays

Classes and Functions ordered alphabetically
"""

# imports
import cftime
import itertools
import numpy as np
import xarray as xr
import scipy.signal

# docs
__author__ = "Thomas Keel"
__email__ = "thomasjames.keel@gmail.com"
__status__ = "Development"


[docs] def add_num_of_days_to_360Datetime(datetime_360day, num_of_days_to_add): """ Adds a number of days to cftime.Datetime360Day format Parameters ---------- datetime_360day : cftime.Datetime360Day Date to add days to num_of_days_to_add : int or float Number of days to add to 360 day format Returns ---------- new_360day_date : cftime.Datetime360Day Date with added number of days Raises ---------- AssertionError : If not '360_day' format or not a cftime.Datetime object ValueError : If number of days not more than 0 """ assert hasattr( datetime_360day, "calendar" ), "date type inputted is not in cftime format." assert ( getattr(datetime_360day, "calendar") == "360_day" ), "input date is not '360_day' format." if num_of_days_to_add <= 0: raise ValueError("Number of days to add to date is not more than 0") new_day = ((datetime_360day.day + num_of_days_to_add) - 30) % 30 if new_day == 0: new_day = 30 num_of_months_to_add = (datetime_360day.day + num_of_days_to_add) / 30 if num_of_months_to_add % 1 == 0: num_of_months_to_add = num_of_months_to_add - 1 else: num_of_months_to_add = np.floor(num_of_months_to_add) new_month = ((datetime_360day.month + num_of_months_to_add) - 12) % 12 if new_month == 0: new_month = 12 num_of_years_to_add = (datetime_360day.month + num_of_months_to_add) / 12 if num_of_years_to_add % 1 == 0: num_of_years_to_add = num_of_years_to_add - 1 else: num_of_years_to_add = np.floor(num_of_years_to_add) new_year = datetime_360day.year + num_of_years_to_add new_360day_date = cftime.Datetime360Day( day=new_day, month=new_month, year=new_year, hour=datetime_360day.hour, minute=datetime_360day.minute, second=datetime_360day.second, ) return new_360day_date
[docs] def add_num_of_days_to_NoLeapDatetime(datetime_noleap, num_of_days_to_add): """ Adds a number of days to cftime.DatetimeNoLeap format Parameters ---------- datetime_noleap : cftime.DatetimeNoLeap Date to add days to num_of_days_to_add : int or float Number of days to add to noleap day format Returns ---------- new_noleap_date : cftime.DatetimeNoLeap Date with added number of days Raises ---------- AssertionError : If not 'noleap' format or not a cftime.Datetime object ValueError : If number of days not more than 0 """ assert hasattr( datetime_noleap, "calendar" ), "date type inputted is not in cftime format." assert ( getattr(datetime_noleap, "calendar") == "noleap" ), "input date is not 'no_leap' format." if num_of_days_to_add <= 0: raise ValueError("Number of days to add to date is not more than 0") return cftime.DatetimeNoLeap.fromordinal( cftime.DatetimeNoLeap.toordinal(datetime_noleap) + num_of_days_to_add, calendar="noleap", has_year_zero=True, )
[docs] def add_pad_to_array(arr, pad_width=1, constant_values=0): """ Add a edge of constant values around a numpy array Parameters ---------- arr : array-like A 2-D array with numeric values (i.e. dtypes float or int) pad_width : int Width of values to add to edges (default: 1) constant_values : int Value to add to edges (default: 0) Returns ---------- padded_arr : array-like A 2-D array with new dimensions as a pad has been added at the edges """ padded_arr = np.pad( arr, pad_width, mode="constant", constant_values=constant_values ) return padded_arr
[docs] def check_at_least_n_plevs_in_data(data, n_plevs): """ Checks there are at least two pressure-levels (plevs) in xarray dataset Parameters ---------- data : xarray.Dataset Data to check Raises ---------- ValueError : If not two pressure levels """ if data["plev"].count() < n_plevs: raise ValueError( "Need at least %s pressure levels (plevs) for calculation" % (n_plevs) )
[docs] def check_coords_in_data(data, req_coords): """ Will check if a given coordinates are in the data and raises a key error if any of them are not. This function is needed to check before algorithm continues and uses too much memory. Built from xarray. Parameters ---------- data : xarray.Dataset Data to check req_coords : array-like or tuple Coordinates to check if in data Raises ---------- KeyError : If any given coord is not in data """ for coord in req_coords: if coord not in data.coords: raise KeyError("'%s' is not in the data" % (coord,))
[docs] def check_if_data_is_xarray_datatype(data): """ What it says on the tin. Parameters ---------- data : xarray.Dataset Data to check Raises ---------- TypeError : If input is not an xarray data type """ if not isinstance(data, xr.Dataset) or isinstance(data, xr.DataArray): raise TypeError("input needs to be xarray.DataSet or xarray.DataArray")
[docs] def check_plev_units(data, expected_plev_units): """ Will check that pressure level units are in data plev coord (i.e. Pa, hPa, mbar, millibars). Parameters ---------- data : xr.DataArray or xr.Dataset Data containing plev coord and units for plev expected_plev_units : list List of names of plev units (i.e. mbar, Pa, hPa, etc.) Returns ---------- units : str Units of plev coord Raises ---------- KeyError If plev is not a coord in input 'data'. And if the units of plev are not in expected plev units """ if "plev" not in data.coords: raise KeyError( "Please provide a plev coordinate for data to determine plev units" ) if not hasattr(data["plev"], "units"): raise KeyError( f'You will need assign units (i.e {expected_plev_units}) to plev to run this method e.g. data["plev"] = data["plev"].assign_attrs(units="Pa")' ) if data["plev"].units not in expected_plev_units: raise KeyError( f'plev unit: {data["plev"].units} is not in {expected_plev_units}. You will need assign units to plev to run this method e.g. data["plev"] = data["plev"].assign_attrs(units="Pa")' ) else: return data["plev"].units
[docs] def check_variables_in_data(data, req_variables): """ What it says on the tin. Built from xarray Parameters ---------- data : xarray.Dataset Data to check req_variables : array-like Variables needed in data Raises ---------- KeyError : If variables not in data """ for var in req_variables: if var not in data.variables: raise KeyError("'%s' is not the data" % (var,))
[docs] def find_nearest_value_to_array(array, value): """ Will find the nearest value to a given array Built for use with xarray adapted from: https://stackoverflow.com/questions/2566412/find-nearest-value-in-numpy-array Parameters ---------- array : array-like Array to reference value : numeric-like Number to check nearest value in input array Returns ---------- output : numeric Closest value to input value in input array """ array = np.asarray(array) idx = np.argmin(np.abs(array - value)) return array[idx]
[docs] def get_local_maxima(arr, axis=0): """ Uses scipy.signal.argrelextrema to get index location of minimum value in array Taken from https://stackoverflow.com/questions/4624970/finding-local-maxima-minima-with-numpy-in-a-1d-numpy-array Parameters ---------- arr : array-like array to find index location of minima value from axis : int axis for scipy.signal.argrelextrema """ return scipy.signal.argrelextrema(arr, np.greater, axis=axis)
[docs] def get_local_minima(arr, axis=0): """ Uses scipy.signal.argrelextrema to get index location of maximum value in array Taken from https://stackoverflow.com/questions/4624970/finding-local-maxima-minima-with-numpy-in-a-1d-numpy-array Parameters ---------- arr : array-like array to find index location of maxima value from axis : int axis for scipy.signal.argrelextrema """ return scipy.signal.argrelextrema(arr, np.less, axis=axis)
[docs] def filter_local_extremes_to_min_distance( local_extrema, min_distance_threshold=2 ): """ Filter local extremes (i.e. outputs of minima or maxima from scipy.signal.argrelextrema) so that no neighbours remain in array. Parameters ---------- local_extrema : array-like likely (stacked) outputs of scipy.signal.argrelextrema min_distance_threshold : int Minimum distances between indexes in array (default: 2 indexes) Returns ------- filtered_extrema : array-like (stacked) outputs of scipy.signal.argrelextrema with values less than a min_distance_threshold away removed Examples -------- maxima_indices = np.column_stack(jsmetrics.utils.data_utils.get_local_maxima(current['ws'].data)) filtered_indices = filter_local_extremes_so_no_neighbours(maxima_indices, min_distance_threshold=2) """ filtered_extrema = [] for idx in local_extrema: if not any( np.linalg.norm(idx - existing_maxima) < min_distance_threshold for existing_maxima in filtered_extrema ): filtered_extrema.append(idx) filtered_extrema = np.array(filtered_extrema) return filtered_extrema
[docs] def find_intersection_between_two_array_of_arrays(array1, array2): """ Find the intersection between two arrays of arrays. See examples for example. Parameters ---------- array1 : np.array First array of arrays to compare with array2 array2 : np.array Second array of arrays to compare with array1 Returns ------- intersection : np.array Intesection of the two arrays returning only the arrays within the original two that are in both. Examples -------- array1 = [[1, 2], [3, 4]] array2 = [[1, 2], [4, 3]] find_intersection_between_two_array_of_arrays(array1, array2) # returns [[1, 2]] """ intersection = [] for arr1 in array1: for arr2 in array2: if np.array_equal(arr1, arr2): intersection.append(arr1) break intersection = np.array(intersection) return intersection
[docs] def get_num_of_decimal_places(num): """ Gets number of decimal places in a float Parameters ---------- num : float or int input number to get decimal places from Returns ---------- decimal_places : int number of decimal places """ num = "{:f}".format(num).rstrip("0") decimal_places = num[::-1].find(".") return decimal_places
[docs] def remove_duplicates(arr): """ Removes duplicates from array. From: https://stackoverflow.com/questions/2213923/removing-duplicates-from-a-list-of-lists Parameters ---------- arr : array-like arr to remove duplicates from Returns ---------- arr : array-like arr with no duplicates """ arr.sort() return list(v for v, _ in itertools.groupby(arr))
[docs] def remove_unwanted_coords_from_data( data, wanted_coords, unwanted_coords=(), show_error=False ): """ What it says on the tin. Built from xarray Parameters ---------- data : xarray.Dataset Data to check wanted_coords : array-like or tuple Coords to retain in data unwanted_coords : array-like or tuple Coords to remove from data Raises ---------- ValueError : If coord cannot be removed from data """ dims = set(data.dims) for rem in unwanted_coords: try: dims.remove(rem) except Exception as e: # a little sloppy, but probably okay if show_error: print(e) wanted_coords = set(wanted_coords) difference = dims.difference(set(wanted_coords)) if len(difference) > 0: raise ValueError( "Unwanted coords in data: %s.\ Please subset/remove so that the slice can be 2D." % (difference,) )
[docs] def rescale_lat_resolution(lats, lat_resolution): """ Rescale latitude resolution to input lat resolution in degrees. Component of method from Barnes & Polvani (2013) https://doi.org/10.1175/JCLI-D-12-00536.1 & Grise & Polvani 2014 https://doi.org/10.1002/2013GL058466 & Bracegirdle et al (2018) https://doi.org/10.1175/JCLI-D-17-0320.1 Parameters ---------- lats : xr.DataArray or array-like Array of latitude values lat_resolution : int or float Latitude resolution in degrees Returns ---------- output : numpy.array Rescaled array of latitude values """ return np.arange(min(lats), max(lats) + lat_resolution, lat_resolution)
[docs] def slice_array_by_index_breaks(array_to_slice, index_breaks): """ Will break an array down into segments based on a list of indexes containing information about where to create slices Parameters ---------- array_to_slice : array-like Array to break down based on slice breaks index_breaks : array-like Indexes from which to slice array Returns ---------- output: list Broken down slices of original array_to_slice """ return [ array_to_slice[i:j] for i, j in zip([0] + index_breaks, index_breaks + [None]) ]