#!/usr/bin/env python
u"""
utilities.py
Written by Tyler Sutterley (10/2024)
Download and management utilities for syncing time and auxiliary files
Adds additional modules to the icesat2_toolkit utilities
PYTHON DEPENDENCIES:
lxml: processing XML and HTML in Python
https://pypi.python.org/pypi/lxml
UPDATE HISTORY:
Updated 10/2024: update CMR search utility to replace deprecated scrolling
https://cmr.earthdata.nasa.gov/search/site/docs/search/api.html
Updated 05/2024: added generic querying functions for NASA CMR
added wrapper to importlib for optional dependencies
Updated 11/2023: updated ssl context to fix deprecation error
Updated 05/2023: using pathlib to define and expand paths
Updated 01/2023: add default ssl context attribute with protocol
Updated 12/2022: functions for managing and maintaining git repositories
Updated 05/2022: updated docstrings to numpy documentation format
Updated 03/2021: add data path function for this set of utilities
Written 01/2021
"""
from __future__ import annotations
import sys
import ssl
import re
import json
import pathlib
import inspect
import logging
import warnings
import importlib
import posixpath
import lxml.etree
import subprocess
if sys.version_info[0] == 2:
from cookielib import CookieJar
from urllib import urlencode
import urllib2
else:
from http.cookiejar import CookieJar
from urllib.parse import urlencode
import urllib.request as urllib2
# extend icesat2_toolkit utilities
try:
from icesat2_toolkit.utilities import *
except (AttributeError, ImportError, ModuleNotFoundError) as exc:
warnings.warn("icesat2_toolkit not available", ImportWarning)
# PURPOSE: get absolute path within a package from a relative path
def get_data_path(relpath: list | str | pathlib.Path):
    """
    Resolve a relative path against the directory containing this file

    Parameters
    ----------
    relpath: list, str or pathlib.Path
        relative path; a list is treated as successive path components

    Returns
    -------
    path: pathlib.Path or NoneType
        absolute path below this file's directory, or ``None`` when
        ``relpath`` is not one of the accepted types
    """
    # directory of the file in which this function is defined
    frame_info = inspect.getframeinfo(inspect.currentframe())
    module_dir = pathlib.Path(frame_info.filename).absolute().parent
    # normalize the input to a sequence of path components
    if isinstance(relpath, list):
        components = relpath
    elif isinstance(relpath, (str, pathlib.Path)):
        components = [relpath]
    else:
        # unsupported types produce no result
        return None
    return module_dir.joinpath(*components)
# PURPOSE: import an optional dependency
def import_dependency(
        name: str,
        extra: str = "",
        raise_exception: bool = False
    ):
    """
    Import an optional dependency

    Adapted from ``pandas.compat._optional::import_optional_dependency``

    Parameters
    ----------
    name: str
        Module name
    extra: str, default ""
        Additional text to include in the ``ImportError`` message
    raise_exception: bool, default False
        Raise an ``ImportError`` if the module is not found

    Returns
    -------
    module: obj
        Imported module, or an empty placeholder class named ``module``
        when the import fails and ``raise_exception`` is ``False``

    Raises
    ------
    ImportError
        If the import fails and ``raise_exception`` is ``True``
    """
    # only string module names can be handed to importlib
    invalid = f"Invalid module name: '{name}'; must be a string"
    assert isinstance(name, str), invalid
    # message used both for the raised error and for the debug log
    missing = f"Missing optional dependency '{name}'. {extra}"
    try:
        return importlib.import_module(name)
    except ImportError as exc:
        # ModuleNotFoundError is a subclass of ImportError
        if raise_exception:
            raise ImportError(missing) from exc
        logging.debug(missing)
    # import failed quietly: hand back an empty stand-in class
    return type('module', (), {})
# PURPOSE: get the git hash value
def get_git_revision_hash(
        refname: str = 'HEAD',
        short: bool = False
    ):
    """
    Get the ``git`` hash value for a particular reference

    Parameters
    ----------
    refname: str, default HEAD
        Symbolic reference name
    short: bool, default False
        Return the shortened hash value

    Returns
    -------
    revision: str
        Stripped output of ``git rev-parse`` decoded as UTF-8
    """
    # the .git directory is looked up one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repo_root = pathlib.Path(this_file).absolute().parent.parent
    git_dir = repo_root.joinpath('.git')
    # assemble the rev-parse command
    command = ['git', f'--git-dir={git_dir}', 'rev-parse']
    if short:
        command.append('--short')
    command.append(refname)
    # run git and decode its output
    with warnings.catch_warnings():
        raw_output = subprocess.check_output(command)
        return str(raw_output, encoding='utf8').strip()
# PURPOSE: get the current git status
def get_git_status():
    """Get the status of a ``git`` repository as a boolean value

    Returns
    -------
    status: bool
        ``True`` if ``git status --porcelain`` produces any output
    """
    # the .git directory is looked up one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repo_root = pathlib.Path(this_file).absolute().parent.parent
    git_dir = repo_root.joinpath('.git')
    # assemble the status command
    command = ['git', f'--git-dir={git_dir}', 'status', '--porcelain']
    with warnings.catch_warnings():
        porcelain = subprocess.check_output(command)
        # empty output is falsy, any listed change is truthy
        return bool(porcelain)
# PURPOSE: convert file lines to arguments
def convert_arg_line_to_args(arg_line):
    """
    Convert a line read from an arguments file into individual arguments

    Parameters
    ----------
    arg_line: str
        line string containing a single argument and/or comments

    Yields
    ------
    arg: str
        whitespace-separated argument with ``#`` comments removed
    """
    # drop everything from the comment marker to the end of the line
    uncommented = re.sub(r'\#(.*?)$', r'', arg_line)
    # splitting on whitespace never produces empty tokens
    yield from uncommented.split()
# PURPOSE: create the default SSL context
def _create_default_ssl_context() -> ssl.SSLContext:
    """Creates the default SSL context

    Returns
    -------
    context: ssl.SSLContext
        client-side TLS context with the module's protocol options
        applied and TLS compression disabled
    """
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # apply the shared protocol-version restrictions
    _set_ssl_context_options(ssl_context)
    # additionally switch off TLS compression
    ssl_context.options = ssl_context.options | ssl.OP_NO_COMPRESSION
    return ssl_context
# PURPOSE: create an SSL context that skips certificate verification
def _create_ssl_context_no_verify() -> ssl.SSLContext:
    """Creates an SSL context for unverified connections

    Returns
    -------
    context: ssl.SSLContext
        default context with hostname checking and certificate
        verification both turned off
    """
    unverified = _create_default_ssl_context()
    # hostname checking is disabled first, then certificate verification
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    return unverified
# PURPOSE: set the default options for an SSL context
def _set_ssl_context_options(context: ssl.SSLContext) -> None:
    """Sets the default options for the SSL context

    Parameters
    ----------
    context: ssl.SSLContext
        context to modify in place
    """
    # newer interpreters / OpenSSL builds: set a minimum protocol version
    supports_minimum_version = (
        sys.version_info >= (3, 10)
        or ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7)
    )
    if supports_minimum_version:
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        return
    # otherwise disable each older protocol through its option flag
    legacy_flags = (
        ssl.OP_NO_SSLv2,
        ssl.OP_NO_SSLv3,
        ssl.OP_NO_TLSv1,
        ssl.OP_NO_TLSv1_1,
    )
    for flag in legacy_flags:
        context.options |= flag
# default ssl context shared by the request helpers in this file
# NOTE(review): this is built by ``_create_ssl_context_no_verify``, so
# hostname checking and certificate verification are disabled for every
# caller that relies on the default -- confirm this is intended
_default_ssl_context = _create_ssl_context_no_verify()
# PURPOSE: list a directory on Polar Geospatial Center https server
def pgc_list(
        HOST: str | list,
        timeout: int | None = None,
        context: ssl.SSLContext = _default_ssl_context,
        parser = lxml.etree.HTMLParser(),
        format: str = '%Y-%m-%d %H:%M',
        pattern: str = '',
        sort: bool = False
    ):
    """
    List a directory on Polar Geospatial Center (PGC) servers

    Parameters
    ----------
    HOST: str or list
        remote https host path
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    parser: obj, default lxml.etree.HTMLParser()
        HTML parser for ``lxml``
    format: str, default '%Y-%m-%d %H:%M'
        format for input time string
    pattern: str, default ''
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    colnames: list or bool
        column names in a directory (``False`` if the listing failed)
    collastmod: list or bool
        last modification times for items in the directory
        (``False`` if the listing failed)
    colerror: str or NoneType
        notification for list error (``None`` if the listing succeeded)
    """
    # verify inputs for remote http host
    # (url_split is presumably provided by the icesat2_toolkit.utilities
    # star import at the top of this file)
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    url = posixpath.join(*HOST)
    # try listing from https
    try:
        # Create and submit request.
        request = urllib2.Request(url)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except (urllib2.HTTPError, urllib2.URLError):
        colerror = 'List error from {0}'.format(url)
        return (False, False, colerror)
    # read and parse request for files (column names and modified times)
    # always release the connection, even if parsing fails
    try:
        tree = lxml.etree.parse(response, parser)
    finally:
        response.close()
    colnames = [href.replace(posixpath.sep,'')
        for href in tree.xpath('//tr/td[not(@*)]//a/@href')]
    # get the Unix timestamp value for a modification time
    # (get_unix_time is presumably provided by the same star import)
    lastmod = [get_unix_time(text,format=format)
        for text in tree.xpath('//tr/td[@align="right"][1]/text()')]
    # reduce using regular expression pattern
    if pattern:
        keep = [n for n,name in enumerate(colnames) if re.search(pattern,name)]
        # reduce list of column names and last modified times
        colnames = [colnames[n] for n in keep]
        lastmod = [lastmod[n] for n in keep]
    # sort the list
    if sort:
        # stable ordering of indices by column name
        order = sorted(range(len(colnames)), key=colnames.__getitem__)
        # sort list of column names and last modified times
        colnames = [colnames[n] for n in order]
        lastmod = [lastmod[n] for n in order]
    # return the list of column names and last modified times
    return (colnames, lastmod, None)
# PURPOSE: filter the CMR json response for desired data files
def cmr_filter_json(
        search_results: dict,
        endpoint: str = "data",
        request_type: str = r"application/x-hdf(eos|5)",
        readable_granule_pattern: str = r''
    ):
    """
    Filter the CMR json response for desired data files

    Parameters
    ----------
    search_results: dict
        json response from CMR query
    endpoint: str, default 'data'
        url endpoint type

        - ``'data'``: NASA Earthdata https archive
        - ``'opendap'``: NASA Earthdata OPeNDAP archive
        - ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
    request_type: str, default 'application/x-hdf(eos|5)'
        regular expression matched against the start of each link type
    readable_granule_pattern: str, default ''
        regular expression pattern for reducing list

    Returns
    -------
    producer_granule_ids: list
        granule ids, one for each returned url
    granule_urls: list
        granule urls, in the same order as ``producer_granule_ids``
    """
    # output list of granule ids and urls
    producer_granule_ids = []
    granule_urls = []
    # check that there are urls for request
    if ('feed' not in search_results) or ('entry' not in search_results['feed']):
        return (producer_granule_ids, granule_urls)
    # descriptor links for each endpoint
    rel = {}
    rel['data'] = "http://esipfed.org/ns/fedsearch/1.1/data#"
    rel['opendap'] = "http://esipfed.org/ns/fedsearch/1.1/service#"
    rel['s3'] = "http://esipfed.org/ns/fedsearch/1.1/s3#"
    # iterate over references and get cmr location
    for entry in search_results['feed']['entry']:
        granule_id = entry['producer_granule_id']
        for link in entry['links']:
            # skip links without descriptors
            if ('rel' not in link) or ('type' not in link):
                continue
            # append if selected endpoint and request type
            if (link['rel'] == rel[endpoint]) and \
                    re.match(request_type, link['type']) and \
                    re.search(readable_granule_pattern, link['href']):
                # record the id together with its url so that the two
                # output lists stay the same length and order
                producer_granule_ids.append(granule_id)
                granule_urls.append(link['href'])
                # only the first matching link of an entry is used
                break
    # return the list of urls and granule ids
    return (producer_granule_ids, granule_urls)
# PURPOSE: query the NASA Common Metadata Repository (CMR) for data granules
def cmr(
        product: str | None = None,
        release: str | None = None,
        bbox: list | None = None,
        start_date: str | None = None,
        end_date: str | None = None,
        provider: str = 'NSIDC_ECS',
        endpoint: str = 'data',
        readable_granule_name: str | list = [],
        readable_granule_pattern: str = r'',
        request_type: str = r"application/x-hdf(eos|5)",
        opener = None,
        verbose: bool = False,
        fid = sys.stdout
    ):
    """
    Query the NASA Common Metadata Repository (CMR)

    Parameters
    ----------
    product: str or NoneType, default None
        Data product to query
    release: str or NoneType, default None
        Data release to query
    bbox: list or NoneType, default None
        Spatial bounding box for CMR query in form
        (``lon_min``, ``lat_min``, ``lon_max``, ``lat_max``)
    start_date: str or NoneType, default None
        starting date for CMR product query
    end_date: str or NoneType, default None
        ending date for CMR product query
    provider: str, default 'NSIDC_ECS'
        CMR data provider
    endpoint: str, default 'data'
        url endpoint type

        - ``'data'``: NASA Earthdata https archive
        - ``'opendap'``: NASA Earthdata OPeNDAP archive
        - ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
    readable_granule_name: str or list, default []
        readable granule name(s) to query from CMR
    readable_granule_pattern: str, default ''
        regular expression pattern for reducing list
    request_type: str, default 'application/x-hdf(eos|5)'
        data type for reducing CMR query
    opener: obj or NoneType, default None
        OpenerDirector instance
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose

    Returns
    -------
    producer_granule_ids: list
        Data granules
    granule_urls: list
        Data granule urls
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # attempt to build urllib2 opener
    if opener is None:
        # build urllib2 opener with SSL context
        # https://docs.python.org/3/howto/urllib2.html#id5
        handler = []
        # Create cookie jar for storing cookies
        cookie_jar = CookieJar()
        handler.append(urllib2.HTTPCookieProcessor(cookie_jar))
        handler.append(urllib2.HTTPSHandler(context=_default_ssl_context))
        # create "opener" (OpenerDirector instance)
        opener = urllib2.build_opener(*handler)
    # build CMR query
    cmr_query_type = 'granules'
    cmr_format = 'json'
    cmr_page_size = 2000
    CMR_HOST = ['https://cmr.earthdata.nasa.gov','search',
        f'{cmr_query_type}.{cmr_format}']
    # build list of CMR query parameters
    CMR_KEYS = []
    CMR_KEYS.append(f'?provider={provider}')
    CMR_KEYS.append('&sort_key[]=start_date')
    CMR_KEYS.append('&sort_key[]=producer_granule_id')
    CMR_KEYS.append(f'&page_size={cmr_page_size}')
    # append product string
    # NOTE(review): with the default product=None this sends the literal
    # string "None" as the short name -- confirm callers always set it
    CMR_KEYS.append(f'&short_name={product}')
    # append release strings
    if release is not None:
        CMR_KEYS.append(cmr_query_release(release))
    # append keys for start and end time
    # verify that start and end times are in ISO format
    start_date = isoformat(start_date) if start_date else ''
    end_date = isoformat(end_date) if end_date else ''
    CMR_KEYS.append(f'&temporal={start_date},{end_date}')
    # append keys for spatial bounding box
    if bbox is not None:
        bounding_box = ','.join([str(b) for b in bbox])
        CMR_KEYS.append(f'&bounding_box={bounding_box}')
    # verify that readable_granule_name is a list
    if isinstance(readable_granule_name, str):
        readable_granule_name = [readable_granule_name]
    # append keys for querying specific granules
    if any(readable_granule_name):
        CMR_KEYS.append("&options[readable_granule_name][pattern]=true")
        CMR_KEYS.append("&options[spatial][or]=true")
        for gran in readable_granule_name:
            CMR_KEYS.append(f"&readable_granule_name[]={gran}")
    # full CMR query url
    cmr_query_url = "".join([posixpath.join(*CMR_HOST),*CMR_KEYS])
    logging.info(f'CMR request={cmr_query_url}')
    # output list of granule names and urls
    producer_granule_ids = []
    granule_urls = []
    cmr_search_after = None
    while True:
        req = urllib2.Request(cmr_query_url)
        # add CMR search after header
        if cmr_search_after:
            req.add_header('CMR-Search-After', cmr_search_after)
            logging.debug(f'CMR-Search-After: {cmr_search_after}')
        response = opener.open(req)
        # read headers and body, always releasing the connection
        try:
            headers = {k.lower():v for k,v in dict(response.info()).items()}
            # read the CMR search as JSON
            search_page = json.loads(response.read().decode('utf-8'))
        finally:
            response.close()
        # get search after index for next iteration
        cmr_search_after = headers.get('cmr-search-after')
        # a page without entries means the result set is exhausted
        if not search_page.get('feed', {}).get('entry'):
            break
        ids,urls = cmr_filter_json(search_page,
            endpoint=endpoint, request_type=request_type,
            readable_granule_pattern=readable_granule_pattern)
        # extend lists before testing for the last page so that the
        # granules of the final page are not discarded
        producer_granule_ids.extend(ids)
        granule_urls.extend(urls)
        # without a search after index there is no following page
        if cmr_search_after is None:
            break
    # return the list of granule ids and urls
    return (producer_granule_ids, granule_urls)