Source code for grounding_zones.utilities

#!/usr/bin/env python
u"""
utilities.py
Written by Tyler Sutterley (10/2024)
Download and management utilities for syncing time and auxiliary files
Adds additional modules to the icesat2_toolkit utilities

PYTHON DEPENDENCIES:
    lxml: processing XML and HTML in Python
        https://pypi.python.org/pypi/lxml

UPDATE HISTORY:
    Updated 10/2024: update CMR search utility to replace deprecated scrolling
        https://cmr.earthdata.nasa.gov/search/site/docs/search/api.html
    Updated 05/2024: added generic querying functions for NASA CMR
        added wrapper to importlib for optional dependencies
    Updated 11/2023: updated ssl context to fix deprecation error
    Updated 05/2023: using pathlib to define and expand paths
    Updated 01/2023: add default ssl context attribute with protocol
    Updated 12/2022: functions for managing and maintaining git repositories
    Updated 05/2022: updated docstrings to numpy documentation format
    Updated 03/2021: add data path function for this set of utilities
    Written 01/2021
"""
from __future__ import annotations

import sys
import ssl
import re
import json
import pathlib
import inspect
import logging
import warnings
import importlib
import posixpath
import lxml.etree
import subprocess
if sys.version_info[0] == 2:
    from cookielib import CookieJar
    from urllib import urlencode
    import urllib2
else:
    from http.cookiejar import CookieJar
    from urllib.parse import urlencode
    import urllib.request as urllib2

# extend icesat2_toolkit utilities
try:
    from icesat2_toolkit.utilities import *
except (AttributeError, ImportError, ModuleNotFoundError) as exc:
    warnings.warn("icesat2_toolkit not available", ImportWarning)

# PURPOSE: get absolute path within a package from a relative path
[docs] def get_data_path(relpath: list | str | pathlib.Path): """ Get the absolute path within a package from a relative path Parameters ---------- relpath: list, str or pathlib.Path relative path """ # current file path filename = inspect.getframeinfo(inspect.currentframe()).filename filepath = pathlib.Path(filename).absolute().parent if isinstance(relpath, list): # use *splat operator to extract from list return filepath.joinpath(*relpath) elif isinstance(relpath, (str, pathlib.Path)): return filepath.joinpath(relpath)
def import_dependency(
        name: str,
        extra: str = "",
        raise_exception: bool = False
    ):
    """
    Try to import an optional dependency by name

    Adapted from ``pandas.compat._optional::import_optional_dependency``

    Parameters
    ----------
    name: str
        Module name
    extra: str, default ""
        Additional text to include in the ``ImportError`` message
    raise_exception: bool, default False
        Raise an ``ImportError`` if the module is not found

    Returns
    -------
    module: obj
        Imported module, or an empty placeholder class named ``module``
        when the import failed and ``raise_exception`` is ``False``
    """
    # the module name has to be given as a string
    assert isinstance(name, str), \
        f"Invalid module name: '{name}'; must be a string"
    try:
        # hand back the real module as soon as the import succeeds
        return importlib.import_module(name)
    except (ImportError, ModuleNotFoundError) as exc:
        err = f"Missing optional dependency '{name}'. {extra}"
        if raise_exception:
            raise ImportError(err) from exc
        # otherwise only leave a debug-level trace of the failure
        logging.debug(err)
    # empty stand-in returned when the import failed silently
    return type('module', (), {})
# PURPOSE: get the git hash value
def get_git_revision_hash(
        refname: str = 'HEAD',
        short: bool = False
    ):
    """
    Look up the ``git`` hash value that a reference points to

    Parameters
    ----------
    refname: str, default HEAD
        Symbolic reference name
    short: bool, default False
        Return the shortened hash value

    Returns
    -------
    revision: str
        output of ``git rev-parse`` decoded as UTF-8 and stripped
    """
    # the .git directory is looked for one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repository = pathlib.Path(this_file).absolute().parent.parent
    git_dir = repository.joinpath('.git')
    # assemble the rev-parse command
    command = ['git', f'--git-dir={git_dir}', 'rev-parse']
    if short:
        command.append('--short')
    command.append(refname)
    # run the command and decode its output
    with warnings.catch_warnings():
        output = subprocess.check_output(command)
        return output.decode('utf8').strip()

# PURPOSE: get the current git status
def get_git_status():
    """Report whether ``git status --porcelain`` prints any output

    Returns
    -------
    status: bool
        ``True`` if the command produced output, ``False`` if it was empty
    """
    # the .git directory is looked for one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repository = pathlib.Path(this_file).absolute().parent.parent
    git_dir = repository.joinpath('.git')
    # assemble the status command
    command = ['git', f'--git-dir={git_dir}', 'status', '--porcelain']
    with warnings.catch_warnings():
        output = subprocess.check_output(command)
        # empty output is falsy
        return bool(output)

# PURPOSE: convert file lines to arguments
def convert_arg_line_to_args(arg_line):
    """
    Split one line of an argument file into individual arguments

    Parameters
    ----------
    arg_line: str
        line string containing a single argument and/or comments

    Yields
    ------
    arg: str
        each whitespace-separated token found before any ``#`` comment
    """
    # drop everything from the first comment marker to the end of the line
    uncommented = re.sub(r'\#(.*?)$',r'',arg_line)
    # whitespace splitting never produces empty tokens
    yield from uncommented.split()
def _create_default_ssl_context() -> ssl.SSLContext:
    """Build the client-side SSL context used by default

    Returns
    -------
    context: ssl.SSLContext
        ``PROTOCOL_TLS_CLIENT`` context with the module's default
        options applied and the ``OP_NO_COMPRESSION`` flag set
    """
    default_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # apply the shared protocol version restrictions
    _set_ssl_context_options(default_context)
    # additionally set the no-compression option flag
    default_context.options = default_context.options | ssl.OP_NO_COMPRESSION
    return default_context
def _create_ssl_context_no_verify() -> ssl.SSLContext:
    """Build an SSL context that does not verify the remote server

    Returns
    -------
    context: ssl.SSLContext
        default context with hostname checking switched off and
        the verification mode set to ``ssl.CERT_NONE``
    """
    unverified = _create_default_ssl_context()
    # hostname checking is switched off first, then the verify mode
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    return unverified
def _set_ssl_context_options(context: ssl.SSLContext) -> None:
    """Restrict an SSL context to TLS 1.2 or newer (modified in place)

    Parameters
    ----------
    context: ssl.SSLContext
        SSL context to update
    """
    # newer interpreters or OpenSSL builds take a minimum version directly
    use_minimum_version = (
        sys.version_info >= (3, 10)
        or ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7)
    )
    if use_minimum_version:
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        return
    # otherwise switch off each older protocol through its option flag
    legacy_flags = (
        ssl.OP_NO_SSLv2,
        ssl.OP_NO_SSLv3,
        ssl.OP_NO_TLSv1,
        ssl.OP_NO_TLSv1_1,
    )
    for flag in legacy_flags:
        context.options |= flag
# default ssl context _default_ssl_context = _create_ssl_context_no_verify() # PURPOSE: list a directory on Polar Geospatial Center https server
def pgc_list(
        HOST: str | list,
        timeout: int | None = None,
        context: ssl.SSLContext = _default_ssl_context,
        parser = lxml.etree.HTMLParser(),
        format: str = '%Y-%m-%d %H:%M',
        pattern: str = '',
        sort: bool = False
    ):
    """
    List a directory on Polar Geospatial Center (PGC) servers

    Parameters
    ----------
    HOST: str or list
        remote https host path
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default _default_ssl_context
        SSL context passed to ``urlopen``
    parser: obj, default lxml.etree.HTMLParser()
        HTML parser for ``lxml``
    format: str, default '%Y-%m-%d %H:%M'
        format for input time string
    pattern: str, default ''
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    colnames: list or bool
        column names in a directory
        (``False`` if the directory could not be listed)
    collastmod: list or bool
        last modification times for items in the directory
        (``False`` if the directory could not be listed)
    colerror: str or NoneType
        notification for list error (``None`` on success)
    """
    # verify inputs for remote http host
    # NOTE(review): url_split and get_unix_time are not defined in this
    # file; presumably provided by the icesat2_toolkit.utilities star import
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try listing from https
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST))
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except (urllib2.HTTPError, urllib2.URLError):
        colerror = 'List error from {0}'.format(posixpath.join(*HOST))
        return (False, False, colerror)
    # read and parse request for files (column names and modified times)
    # the response is closed as soon as the document has been parsed
    with response:
        tree = lxml.etree.parse(response, parser)
    colnames = [i.replace(posixpath.sep,'')
        for i in tree.xpath('//tr/td[not(@*)]//a/@href')]
    # get the Unix timestamp value for a modification time
    lastmod = [get_unix_time(i,format=format)
        for i in tree.xpath('//tr/td[@align="right"][1]/text()')]
    # reduce using regular expression pattern
    if pattern:
        keep = [i for i,f in enumerate(colnames) if re.search(pattern,f)]
        # reduce list of column names and last modified times
        colnames = [colnames[indice] for indice in keep]
        lastmod = [lastmod[indice] for indice in keep]
    # sort the list
    if sort:
        # indices that put the column names into ascending order
        order = sorted(range(len(colnames)), key=colnames.__getitem__)
        # sort list of column names and last modified times
        colnames = [colnames[indice] for indice in order]
        lastmod = [lastmod[indice] for indice in order]
    # return the list of column names and last modified times
    return (colnames, lastmod, None)
# PURPOSE: filter the CMR json response for desired data files
def cmr_filter_json(
        search_results: dict,
        endpoint: str = "data",
        request_type: str = r"application/x-hdf(eos|5)",
        readable_granule_pattern: str = r''
    ):
    """
    Filter the CMR json response for desired data files

    Parameters
    ----------
    search_results: dict
        json response from CMR query
    endpoint: str, default 'data'
        url endpoint type

        - ``'data'``: NASA Earthdata https archive
        - ``'opendap'``: NASA Earthdata OPeNDAP archive
        - ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
    request_type: str, default 'application/x-hdf(eos|5)'
        regular expression matched against the start of each link type
    readable_granule_pattern: str, default ''
        regular expression pattern for reducing list

    Returns
    -------
    producer_granule_ids: list
        granule ids of the entries for which a url was found
    granule_urls: list
        granule urls, index-aligned with ``producer_granule_ids``

    Raises
    ------
    KeyError
        if ``endpoint`` is not one of the known endpoint types and a
        link with both a ``rel`` and a ``type`` descriptor is inspected
    """
    # output list of granule ids and urls
    producer_granule_ids = []
    granule_urls = []
    # check that there are urls for request
    if ('feed' not in search_results) or ('entry' not in search_results['feed']):
        return (producer_granule_ids, granule_urls)
    # descriptor links for each endpoint
    rel = {}
    rel['data'] = "http://esipfed.org/ns/fedsearch/1.1/data#"
    rel['opendap'] = "http://esipfed.org/ns/fedsearch/1.1/service#"
    rel['s3'] = "http://esipfed.org/ns/fedsearch/1.1/s3#"
    # iterate over references and get cmr location
    for entry in search_results['feed']['entry']:
        for link in entry['links']:
            # skip links without descriptors
            if ('rel' not in link) or ('type' not in link):
                continue
            # append if selected endpoint and request type
            if (link['rel'] == rel[endpoint]) and \
                    re.match(request_type, link['type']) and \
                    re.search(readable_granule_pattern, link['href']):
                # record the id together with its url so that the two
                # output lists always stay index-aligned
                producer_granule_ids.append(entry['producer_granule_id'])
                granule_urls.append(link['href'])
                # only the first matching link of an entry is used
                break
    # return the list of urls and granule ids
    return (producer_granule_ids, granule_urls)

# PURPOSE: cmr queries for orbital parameters
def cmr(
        product: str | None = None,
        release: str | None = None,
        bbox: list | None = None,
        start_date: str | None = None,
        end_date: str | None = None,
        provider: str = 'NSIDC_ECS',
        endpoint: str = 'data',
        readable_granule_name: str | list = [],
        readable_granule_pattern: str = r'',
        request_type: str = r"application/x-hdf(eos|5)",
        opener = None,
        verbose: bool = False,
        fid = sys.stdout
    ):
    """
    Query the NASA Common Metadata Repository (CMR)

    Parameters
    ----------
    product: str or NoneType, default None
        Data product to query
    release: str or NoneType, default None
        Data release to query
    bbox: list or NoneType, default None
        Spatial bounding box for CMR query in form
        (``lon_min``, ``lat_min``, ``lon_max``, ``lat_max``)
    start_date: str or NoneType, default None
        starting date for CMR product query
    end_date: str or NoneType, default None
        ending date for CMR product query
    provider: str, default 'NSIDC_ECS'
        CMR data provider
    endpoint: str, default 'data'
        url endpoint type

        - ``'data'``: NASA Earthdata https archive
        - ``'opendap'``: NASA Earthdata OPeNDAP archive
        - ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
    readable_granule_name: str or list, default []
        readable granule name(s) to query from CMR
    readable_granule_pattern: str, default ''
        regular expression pattern for reducing list
    request_type: str, default 'application/x-hdf(eos|5)'
        regular expression for the data type used to reduce the CMR query
    opener: obj or NoneType, default None
        OpenerDirector instance
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose

    Returns
    -------
    producer_granule_ids: list
        Data granules
    granule_urls: list
        Data granule urls
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # attempt to build urllib2 opener
    if opener is None:
        # build urllib2 opener with SSL context
        # https://docs.python.org/3/howto/urllib2.html#id5
        handler = []
        # Create cookie jar for storing cookies
        cookie_jar = CookieJar()
        handler.append(urllib2.HTTPCookieProcessor(cookie_jar))
        handler.append(urllib2.HTTPSHandler(context=_default_ssl_context))
        # create "opener" (OpenerDirector instance)
        opener = urllib2.build_opener(*handler)
    # build CMR query
    cmr_query_type = 'granules'
    cmr_format = 'json'
    cmr_page_size = 2000
    CMR_HOST = ['https://cmr.earthdata.nasa.gov','search',
        f'{cmr_query_type}.{cmr_format}']
    # build list of CMR query parameters
    CMR_KEYS = []
    CMR_KEYS.append(f'?provider={provider}')
    CMR_KEYS.append('&sort_key[]=start_date')
    CMR_KEYS.append('&sort_key[]=producer_granule_id')
    CMR_KEYS.append(f'&page_size={cmr_page_size}')
    # append product string
    CMR_KEYS.append(f'&short_name={product}')
    # append release strings
    # NOTE(review): cmr_query_release and isoformat are not defined in this
    # file; presumably provided by the icesat2_toolkit.utilities star import
    if release is not None:
        CMR_KEYS.append(cmr_query_release(release))
    # append keys for start and end time
    # verify that start and end times are in ISO format
    start_date = isoformat(start_date) if start_date else ''
    end_date = isoformat(end_date) if end_date else ''
    CMR_KEYS.append(f'&temporal={start_date},{end_date}')
    # append keys for spatial bounding box
    if bbox is not None:
        bounding_box = ','.join([str(b) for b in bbox])
        CMR_KEYS.append(f'&bounding_box={bounding_box}')
    # verify that readable_granule_name is a list
    # (the list default is only ever rebound here, never mutated)
    if isinstance(readable_granule_name, str):
        readable_granule_name = [readable_granule_name]
    # append keys for querying specific granules
    if any(readable_granule_name):
        CMR_KEYS.append("&options[readable_granule_name][pattern]=true")
        CMR_KEYS.append("&options[spatial][or]=true")
        for gran in readable_granule_name:
            CMR_KEYS.append(f"&readable_granule_name[]={gran}")
    # full CMR query url
    cmr_query_url = "".join([posixpath.join(*CMR_HOST),*CMR_KEYS])
    logging.info(f'CMR request={cmr_query_url}')
    # output list of granule names and urls
    producer_granule_ids = []
    granule_urls = []
    cmr_search_after = None
    while True:
        req = urllib2.Request(cmr_query_url)
        # add CMR search after header
        if cmr_search_after:
            req.add_header('CMR-Search-After', cmr_search_after)
            logging.debug(f'CMR-Search-After: {cmr_search_after}')
        response = opener.open(req)
        try:
            # get search after index for next iteration
            headers = {k.lower():v for k,v in dict(response.info()).items()}
            cmr_search_after = headers.get('cmr-search-after')
            # read the CMR search as JSON
            search_page = json.loads(response.read().decode('utf-8'))
        finally:
            # release the connection once the page has been read
            response.close()
        ids,urls = cmr_filter_json(search_page,
            endpoint=endpoint,
            request_type=request_type,
            readable_granule_pattern=readable_granule_pattern)
        # keep the results of this page before deciding whether to stop,
        # so that a final page without a search-after header is not lost
        if urls:
            producer_granule_ids.extend(ids)
            granule_urls.extend(urls)
        # stop when the page had no urls or there is no next page
        if not urls or cmr_search_after is None:
            break
    # return the list of granule ids and urls
    return (producer_granule_ids, granule_urls)