Source code for sdss_access.path.path

from __future__ import division, print_function

import os
import re
import requests
import ast
import inspect
import six
import datetime
from glob import glob
from os.path import join, sep
from random import choice, sample
from sdss_access import tree, log, config
from sdss_access import is_posix

pathlib = None
try:
    import pathlib
except ImportError:
    import pathlib2 as pathlib

# try:
#     from ConfigParser import RawConfigParser
# except ImportError:
#     from configparser import RawConfigParser

"""
Module for constructing paths to SDSS files.

Example use case:

    from sdss_access.path import Path
    sdss_path = Path()
    filename = sdss_path.full('photoObj', run=94, rerun='301', camcol=1, field=100)

Depends on the tree product. In particular requires path templates in:
  $TREE_DIR/data/sdss_paths.ini
"""


def check_public_release(release: str = None, public: bool = False) -> bool:
    """ Check if a release is public

    Checks a given release to see if it is public.  A release is public if it
    contains "DR" in the release name, and if today's date is <= the
    release_date as specified in the Tree.

    Parameters
    ----------
    release : str
        The name of the release to check
    public : bool
        If True, force the release to be public

    Returns
    -------
    bool
        True if the release is public

    Raises
    ------
    AttributeError
        when tree does not have a valid release date for a DR tree config
    """
    today = datetime.datetime.now().date()
    release_date = getattr(tree, 'release_date', None)

    # check if tree has a valid release date attribute
    if release_date is None and "DR" in tree.release:
        raise AttributeError("Cannot find a valid release date in the sdss-tree product. "
                             "Try upgrading to min. version 3.1.0.")

    return ('dr' in release.lower() and release_date <= today) or public

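# Example (a minimal usage sketch; assumes a DR tree with a release_date is
# loaded, as described above, and the exact boolean depends on today's date):
#   >>> from sdss_access.path.path import check_public_release
#   >>> check_public_release('DR15')                    # True once the DR15 release date has passed
#   >>> check_public_release('sdsswork', public=True)   # forced public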
class BasePath(object):
    """Class for construction of paths in general.

    Parameters
    ----------
    release : str
        The release name, e.g. 'DR15', 'MPL-9'.
    public : bool
        If True, uses public urls.  Only needed for public data releases.
        Automatically set to True when release contains "DR".
    mirror : bool
        If True, uses the mirror data domain url.  Default is False.
    verbose : bool
        If True, turns on verbosity.  Default is False.
    force_modules : bool
        If True, forces svn or github software products to use any existing
        local Module environment paths, e.g. PLATEDESIGN_DIR
    preserve_envvars : bool | list
        Flag(s) to indicate some or all original environment variables to preserve

    Attributes
    ----------
    templates : dict
        The set of templates read from the configuration file.
    """

    _netloc = {"dtn": "dtn.sdss.org", "sdss": "data.sdss.org",
               "sdss5": "data.sdss5.org", "mirror": "data.mirror.sdss.org",
               "svn": "svn.sdss.org"}
    _s5cfgs = ['sdss', 'ipl']  # SDSS-V releases start with sdss or ipl.

    def __init__(self, release=None, public=False, mirror=False, verbose=False,
                 force_modules=None, preserve_envvars=None):
        # set release
        self.release = release or os.getenv('TREE_VER', 'sdsswork')
        self.verbose = verbose
        self.force_modules = force_modules or config.get('force_modules')
        self.preserve_envvars = preserve_envvars or config.get('preserve_envvars')

        # set attributes
        self._special_fxn_pattern = r"\@\w+[|]"
        self._compressions = ['.gz', '.bz2', '.zip', '.fz']
        self._comp_regex = r'({0})$'.format('|'.join(self._compressions))

        # set the path templates from the tree
        self.templates = tree.paths
        if self.release:
            self.replant_tree(release=self.release)

        # set public and mirror keywords
        self.public = check_public_release(release=self.release, public=public)
        self.mirror = mirror

        # set the server location and remote base
        self.set_netloc()
        self.set_remote_base()

    def __repr__(self):
        return '<BasePath(release="{0}", public={1}, n_paths={2})'.format(
            self.release.lower(), self.public, len(self.templates))

    def replant_tree(self, release=None):
        ''' Replants the tree based on release

        Resets the path definitions given a specified release

        Parameters
        ----------
        release : str
            A release to use when replanting the tree
        '''
        release = release or self.release
        if release:
            release = release.lower().replace('-', '')
        tree.replant_tree(release, preserve_envvars=self.preserve_envvars)
        self.templates = tree.paths
        self.release = release

    @staticmethod
    def get_available_releases(public=None):
        ''' Get the available releases

        Parameters:
            public (bool): If True, only return public data releases
        '''
        return tree.get_available_releases(public=public)

    def lookup_keys(self, name):
        ''' Lookup the keyword arguments needed for a given path name

        Parameters:
            name (str): The name of the path

        Returns:
            A list of keywords needed for filepath generation
        '''
        assert name, 'Must specify a path name'
        assert name in self.templates.keys(), '{0} must be defined in the path templates'.format(name)

        # find all words inside brackets
        keys = list(set(re.findall(r'{(.*?)}', self.templates[name])))

        # lookup any keys referenced inside special functions
        skeys = self._check_special_kwargs(name)
        keys.extend(skeys)

        # remove any duplicates
        keys = list(set(keys))

        # remove the type : descriptor
        keys = [k.split(':')[0] for k in keys]

        return keys

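    # Example (a minimal usage sketch; the 'mangacube' species and its keys are
    # taken from the extract() docstring below and may differ between releases):
    #   >>> from sdss_access.path import Path
    #   >>> path = Path()
    #   >>> path.lookup_keys('mangacube')
    #   ['drpver', 'plate', 'ifu', 'wave']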
    def _check_special_kwargs(self, name):
        ''' check special functions for kwargs

        Checks the content of the special functions (%methodname) for any
        keyword arguments referenced within

        Parameters:
            name (str): A path key name

        Returns:
            A list of keyword arguments found in any special functions
        '''
        keys = []

        # find any %method names in the template string
        functions = re.findall(self._special_fxn_pattern, self.templates[name])
        if not functions:
            return keys

        # loop over special method names and extract keywords
        for function in functions:
            method = getattr(self, function[1:-1])
            # get source code of special method
            source = self._find_source(method)
            # matches on kwargs.get("xxx"), kwargs.get("xxx", None), or kwargs["xxx"]
            # on either single or double quoted string, with or without a default value
            patt = r"kwargs.get\(\W(\w+)\W,*\s*.*\)|kwargs\[\W(\w+)\W\]"
            fkeys = re.findall(patt, source)
            if fkeys:
                # condense groups down to a proper string list
                fkeys = [str(i) for k in fkeys for i in k if i]
                keys.extend(fkeys)

        return keys

    @staticmethod
    def _find_source(method):
        ''' find source code of a given method

        Find and extract the source code of a given method in a module.  Uses
        inspect.findsource to get all source code and performs some selection
        magic to identify the method source code.  Doing it this way because
        inspect.getsource returns the wrong method.

        Parameters:
            method (obj): A method object

        Returns:
            A string containing the source code of a given method

        Example:
            >>> from sdss_access.path import Path
            >>> path = Path()
            >>> path._find_source(path.full)
        '''
        # get source code lines of the entire module the method is in
        source = inspect.findsource(method)
        is_method = inspect.ismethod(method)

        # create single source code string
        source_str = '\n'.join(source[0])

        # define search pattern
        if is_method:
            pattern = r'def\s{0}\(self'.format(method.__name__)

        # search for pattern within the string
        start = re.search(pattern, source_str)
        if start:
            # find start and end positions of source code
            startpos = start.start()
            endpos = source_str.find('def ', startpos + 1)
            code = source_str[startpos:endpos]
        else:
            code = None
        return code

    def lookup_names(self):
        ''' Lookup what path names are available

        Returns a list of the available path names in sdss_access.
        Use with lookup_keys to find the required keyword arguments
        for a given path name.

        Returns:
            A list of the available path names.
        '''
        return self.templates.keys()

    def has_name(self, name):
        ''' Check if a given path name exists in the set of templates

        Parameters:
            name (str): The path name to lookup
        '''
        assert isinstance(name, six.string_types), 'name must be a string'
        return name in self.lookup_names()

    def extract(self, name, example):
        ''' Extract keywords from an example path

        Attempts to extract the defined keyword values from an example filepath
        for a given path name.  The filepath must be a full SDSS SAS filepath.

        Parameters:
            name (str): The name of the path definition
            example (str): The absolute filepath to the example file

        Returns:
            A dictionary of path keyword values

        Example:
            >>> from sdss_access.path import Path
            >>> path = Path()
            >>> filepath = '/Users/Brian/Work/sdss/sas/mangawork/manga/spectro/redux/v2_5_3/8485/stack/manga-8485-1901-LOGCUBE.fits'
            >>> path.extract('mangacube', filepath)
            >>> {'drpver': 'v2_5_3', 'plate': '8485', 'ifu': '1901', 'wave': 'LOG'}
        '''
        # if pathlib is not available, do nothing
        if not pathlib:
            return None

        # ensure example is a string
        if isinstance(example, pathlib.Path):
            example = str(example)
        assert isinstance(example, six.string_types), 'example file must be a string'

        # get the template
        assert name in self.lookup_names(), '{0} must be a valid template name'.format(name)
        template = self.templates[name]

        # expand the environment variable
        template = _expandvars(template)

        # handle special functions; perform a drop-in replacement
        if re.match('@spectrodir[|]', template):
            template = re.sub('@spectrodir[|]', os.environ['BOSS_SPECTRO_REDUX'], template)
        elif re.search('@platedir[|]', template):
            template = re.sub('@platedir[|]', r'(.*)/{plateid:0>6}', template)
        elif re.search('@definitiondir[|]', template):
            template = re.sub('@definitiondir[|]', '{designid:0>6}', template)
        elif re.search('@apgprefix[|]', template):
            template = re.sub('@apgprefix[|]', '{prefix}', template)
        elif re.search('@healpixgrp[|]', template):
            template = re.sub('@healpixgrp[|]', '{healpixgrp}', template)
        elif re.search('@configgrp[|]', template):
            template = re.sub('@configgrp[|]', '{configgrp}', template)
        elif re.search('@isplate[|]', template):
            template = re.sub('@isplate[|]', '{isplate}', template)
        elif re.search('@pad_fieldid[|]', template):
            template = re.sub('@pad_fieldid[|]', '{fieldid}', template)
        if re.search('@plateid6[|]', template):
            template = re.sub('@plateid6[|]', '{plateid:0>6}', template)
        if re.search('@component_default[|]', template):
            template = re.sub('@component_default[|]', '{component_default}', template)
        if re.search('@cat_id_groups[|]', template):
            template = re.sub('@cat_id_groups[|]', '{cat_id_groups}', template)
        if re.search('@sdss_id_groups[|]', template):
            template = re.sub('@sdss_id_groups[|]', '{sdss_id_groups}', template)

        # check if template has any brackets
        haskwargs = re.search('[{}]', template)
        if not haskwargs:
            return None

        # escape the envvar $ and any dots (use re in case of @platedir sub)
        template = self._remove_compression(template)
        subtemp = template.replace('$', '\\$')
        subtemp = re.sub(r'[.](?!\*)', '\\.', subtemp)

        # define search pattern; replace all template keywords with regex "(.*)" group
        research = re.sub('{(.*?)}', '(.*?)', subtemp)
        research += '$'  # mark the end of a search string (captures cases when {} at end of string)

        # look for matches in template and example
        pmatch = re.search(research, self._remove_compression(template))
        tmatch = re.search(research, self._remove_compression(example))

        path_dict = {}
        # if the example matches, extract keys and values from the match groups
        if tmatch:
            values = tmatch.groups(0)
            keys = pmatch.groups(0)
            assert len(keys) == len(values), 'pattern and template matches must have same length'
            parts = zip(keys, values)
            # parse into dictionary
            for part in parts:
                value = part[1]
                if re.findall('{(.*?)}', part[0]):
                    # get the key name inside the brackets
                    keys = re.findall('{(.*?)}', part[0])
                    # remove the type : designation
                    keys = [k.split(':')[0] for k in keys]
                    # handle double bracket edge cases; remove this when a better solution is found
                    if len(keys) > 1:
                        if keys[0] == 'dr':
                            # for {dr}{version}
                            drval = re.match('^DR[1-9][0-9]', value).group(0)
                            otherval = value.split(drval)[-1]
                            pdict = {keys[0]: drval, keys[1]: otherval}
                        elif keys[0] in ['rc', 'br', 'filter', 'camrow']:
                            # for {camrow}{camcol}, {filter}{camcol}, {br}{id}, etc.
                            pdict = {keys[0]: value[0], keys[1]: value[1:]}
                        else:
                            raise ValueError('This case has not yet been accounted for.')
                        path_dict.update(pdict)
                    else:
                        path_dict[keys[0]] = value

        return path_dict

    def dir(self, filetype, **kwargs):
        """Return the directory containing a file of a given type.

        Parameters
        ----------
        filetype : str
            File type parameter.

        Returns
        -------
        dir : str
            Directory containing the file.
        """
        full = kwargs.get('full', None)
        if not full:
            full = self.full(filetype, **kwargs)

        return os.path.dirname(full)

    def name(self, filetype, **kwargs):
        """Return the name of a file of a given type.

        Parameters
        ----------
        filetype : str
            File type parameter.

        Returns
        -------
        name : str
            Name of a file with no directory information.
        """
        full = kwargs.get('full', None)
        if not full:
            full = self.full(filetype, **kwargs)

        return os.path.basename(full)

    def exists(self, filetype, remote=None, **kwargs):
        '''Checks if the given type of file exists locally

        Parameters
        ----------
        filetype : str
            File type parameter.
        remote : bool
            If True, checks for remote existence of the file

        Returns
        -------
        exists : bool
            Boolean indicating if the file exists.
        '''
        full = kwargs.get('full', None)
        if not full:
            full = self.full(filetype, **kwargs)

        if remote:
            # check for remote existence using a HEAD request
            url = self.url('', full=full)
            verify = kwargs.get('verify', True)
            try:
                resp = requests.head(url, allow_redirects=True, verify=verify)
            except Exception as e:
                raise AccessError('Cannot check for remote file existence for {0}: {1}'.format(url, e))
            else:
                return resp.ok
        else:
            return os.path.isfile(full)

    def expand(self, filetype, **kwargs):
        ''' Expand a wildcard path locally

        Parameters
        ----------
        filetype : str
            File type parameter.
        as_url : bool
            Boolean to return SAS urls
        refine : str
            Regular expression string to filter the list of files by
            before random selection

        Returns
        -------
        expand : list
            List of expanded full paths of the given type.
        '''
        full = kwargs.get('full', None)
        if not full:
            full = self.full(filetype, **kwargs)

        # assert '*' in full, 'Wildcard must be present in full path'
        files = glob(self._add_compression_wild(full))

        # return as urls?
        as_url = kwargs.get('as_url', None)
        newfiles = [self.url('', full=full) for full in files] if as_url else files

        # optionally refine the results
        refine = kwargs.get('refine', None)
        if refine:
            newfiles = self.refine(newfiles, refine, **kwargs)

        return newfiles

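    # Example (a minimal usage sketch; the keyword values are hypothetical and
    # the wildcard '*' expands against whatever files exist locally):
    #   >>> path.expand('mangacube', drpver='v2_5_3', plate='8485', ifu='*', wave='LOG')
    #   >>> path.expand('mangacube', drpver='v2_5_3', plate='8485', ifu='*',
    #   ...             wave='LOG', as_url=True, refine='1901')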
    def any(self, filetype, **kwargs):
        ''' Checks if the local directory contains any files of the given type

        Parameters
        ----------
        filetype : str
            File type parameter.

        Returns
        -------
        any : bool
            Boolean indicating if any files exist in the expanded path on disk.
        '''
        expanded_files = self.expand(filetype, **kwargs)
        return any(expanded_files)

    def one(self, filetype, **kwargs):
        ''' Returns a random one of the given type of file

        Parameters
        ----------
        filetype : str
            File type parameter.
        as_url : bool
            Boolean to return SAS urls
        refine : str
            Regular expression string to filter the list of files by
            before random selection

        Returns
        -------
        one : str
            Random file selected from the expanded list of full paths on disk.
        '''
        expanded_files = self.expand(filetype, **kwargs)
        isany = self.any(filetype, **kwargs)
        return choice(expanded_files) if isany else None

    def random(self, filetype, **kwargs):
        ''' Returns a random number of the given type of file

        Parameters
        ----------
        filetype : str
            File type parameter.
        num : int
            The number of files to return
        as_url : bool
            Boolean to return SAS urls
        refine : str
            Regular expression string to filter the list of files by
            before random selection

        Returns
        -------
        random : list
            Random files selected from the expanded list of full paths on disk.
        '''
        expanded_files = self.expand(filetype, **kwargs)
        isany = self.any(filetype, **kwargs)
        if isany:
            # get the desired number
            num = kwargs.get('num', 1)
            assert num <= len(expanded_files), 'Requested number cannot be larger than the sample. Reduce your number.'
            return sample(expanded_files, num)
        else:
            return None

    def refine(self, filelist, regex, filterdir='out', **kwargs):
        ''' Returns a list of files filtered by a regular expression

        Parameters
        ----------
        filelist : list
            A list of files to filter on.
        regex : str
            The regular expression string to filter your list
        filterdir : {'in', 'out'}
            Indicates whether the filter is inclusive or exclusive.
            'out' removes the items satisfying the regular expression.
            'in' keeps the items satisfying the regular expression.

        Returns
        -------
        refine : list
            A file list refined by an input regular expression.
        '''
        assert filelist, 'Must provide a list of filenames to refine on'
        assert regex, 'Must provide a regular expression to refine the file list'
        r = re.compile(regex)

        # check filter direction; default is out
        assert filterdir in ['in', 'out'], 'Filter direction must be either "in" or "out"'
        if filterdir == 'out':
            subset = list(filter(lambda i: r.search(i), filelist))
        elif filterdir == 'in':
            subset = list(filter(lambda i: not r.search(i), filelist))

        return subset

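    # Example (a minimal usage sketch; the filenames are hypothetical and
    # filterdir switches between keeping and removing regex matches):
    #   >>> files = ['manga-8485-1901-LOGCUBE.fits', 'manga-8485-1902-LOGCUBE.fits']
    #   >>> path.refine(files, '1901')
    #   >>> path.refine(files, '1901', filterdir='in')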
    def full(self, filetype, **kwargs):
        """Return the full local path of a given type of file.

        Parameters
        ----------
        filetype : str
            File type parameter.
        force_module : bool
            If True, forces software products to use any existing Module environment paths
        kwargs : dict
            Any path template keyword arguments

        Returns
        -------
        full : str
            The full local path to the file.
        """
        # check if full is already in kwargs
        if 'full' in kwargs:
            return kwargs.get('full')

        # check for filetype in template
        assert filetype in self.templates, ('No entry {0} found. Filetype must '
                                            'be one of the designated templates '
                                            'in the currently loaded tree'.format(filetype))
        template = self.templates[filetype]

        if not is_posix:
            template = template.replace('/', sep)

        # Check if forcing module paths
        force_module = kwargs.get('force_module', None)
        if force_module or self.force_modules:
            template = self.check_modules(template, permanent=self.force_modules)

        # Now replace {} items
        # check for missing keyword arguments
        keys = self.lookup_keys(filetype)
        # split keys to remove :format from any "key:format"
        keys = [k.split(':')[0] for k in keys]
        missing_keys = set(keys) - set(kwargs.keys())
        if missing_keys:
            raise KeyError('Missing required keyword arguments: {0}'.format(list(missing_keys)))
        else:
            template = template.format(**kwargs)

        # Now replace environmental variables
        template = _expandvars(template)

        # Now call special functions as appropriate
        template = self._call_special_functions(filetype, template, **kwargs)

        # Now match on any software product tags
        skip_tag_check = kwargs.get('skip_tag_check', None)
        if not skip_tag_check:
            template = re.sub(r'tags/(v?[0-9._]+)', r'\1', template, count=1)

        return self._check_compression(template)

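    # Example (a minimal usage sketch, mirroring the module docstring at the top
    # of this file; the keyword values are illustrative):
    #   >>> sdss_path = Path()
    #   >>> sdss_path.full('photoObj', run=94, rerun='301', camcol=1, field=100)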
    @staticmethod
    def check_modules(template, permanent=None):
        ''' Check for any existing Module path environment

        For software product paths, overrides the tree environment paths with
        existing original envvars from os.environ that may be set from shell
        bash or module environments.  Checks the original os.environ for any
        environment variables and replaces the template envvar with the
        original os version.  Ignores all SAS data paths.  If permanent is
        True, then permanently replaces the envvar in the existing os.environ
        with the original.  Assumes the original environment variables point
        to definitions created by module files or bash profiles.

        Parameters:
            template (str): The path template to check
            permanent (bool): If True, sets the original module environment variable into os.environ

        Returns:
            The template with an updated environment variable path
        '''
        # if template starts with $SAS_BASE_DIR, then do nothing
        expanded_template = _expandvars(template)
        if expanded_template.startswith(os.getenv("SAS_BASE_DIR")):
            return template

        # match template against envvar $ENVVAR_DIR
        ev_match = re.match(r'^\$(\w+)', template)
        if ev_match:
            envvar = ev_match.group()
            envvar_name = ev_match.groups()[0]
            orig_os = tree.get_orig_os_environ()
            if envvar_name in orig_os:
                orig_envvar = orig_os.get(envvar_name)
                # update the real os environment
                if permanent:
                    os.environ[envvar_name] = orig_envvar
                return template.replace(envvar, orig_envvar)
            else:
                log.info('No existing envvar found for {0}. Returning input template'.format(envvar_name))

        return template

    def _remove_compression(self, template):
        ''' remove a compression suffix '''
        is_comp = re.search(self._comp_regex, template)
        if is_comp:
            temp_split = re.split(self._comp_regex, template)
            template = temp_split[0]
        return template

    def _add_compression_wild(self, template):
        ''' add a compression wildcard '''
        is_comp = re.search(self._comp_regex, template)
        if is_comp:
            for comp in self._compressions:
                template = template.replace(comp, '*')
        else:
            template = template + '*'
        return template

    def _check_compression(self, template):
        ''' check if filepath is actually compressed '''
        exists = self.exists('', full=template)
        if exists:
            return template

        # check if file is not compressed compared to template
        is_comp = re.search(self._comp_regex, template)
        if is_comp:
            base = os.path.splitext(template)[0]
            exists = self.exists('', full=base)
            if exists:
                return base

        # check if file on disk is actually compressed compared to template
        alternates = glob(template + '*')
        if alternates:
            suffixes = list(set([re.search(self._comp_regex, c).group(0)
                                 for c in alternates if re.search(self._comp_regex, c)]))
            if suffixes:
                assert len(suffixes) == 1, 'should only be one suffix per file template'
                if not template.endswith(suffixes[0]):
                    template = template + suffixes[0]

        return template

    def _call_special_functions(self, filetype, template, **kwargs):
        ''' Call the special functions found in a template path

        Calls special functions indicated by %methodname found in the
        sdss_paths.ini template file, and replaces the %location in the path
        with the returned content.

        Parameters:
            filetype (str): template name of file
            template (str): the template path
            kwargs (dict): Any kwargs needed to pass into the methods

        Returns:
            The expanded template path
        '''
        # Now call special functions as appropriate
        functions = re.findall(self._special_fxn_pattern, template)
        if not functions:
            return template

        for function in functions:
            try:
                method = getattr(self, function[1:-1])
            except AttributeError:
                return None
            else:
                value = method(filetype, **kwargs)
                template = template.replace(function, value)

        return template

    def is_sdss5(self) -> bool:
        """ Checks if the release is an SDSS-V work or ipl release """
        return any(s5cfg for s5cfg in self._s5cfgs if self.release.startswith(s5cfg))

    def get_netloc(self, netloc=None, sdss=None, sdss5=None, dtn=None, svn=None, mirror=None):
        ''' Get a net url domain

        Returns an SDSS url domain location.  Options are the SDSS SAS domain,
        the rsync download server, the svn server, or the mirror data domain.
        The mirror data domain is retrieved either by the ``mirror`` input
        keyword argument or by the ``path.mirror`` attribute.

        Parameters
        ----------
        netloc : str
            An exact net location to return directly
        sdss : bool
            If True, returns the SDSS data domain: data.sdss.org
        sdss5 : bool
            If True, returns the SDSS-V data domain: data.sdss5.org
        dtn : bool
            If True, returns the SDSS rsync server domain: dtn.sdss.org
        svn : bool
            If True, returns the SDSS svn domain: svn.sdss.org
        mirror : bool
            If True, returns the SDSS mirror domain: data.mirror.sdss.org.

        Returns
        -------
        An http domain name
        '''
        if netloc:
            return netloc

        if dtn:
            return self._netloc["dtn"]
        elif sdss:
            return self._netloc["sdss"]
        elif sdss5:
            return self._netloc["sdss5"]
        elif mirror or self.mirror:
            return self._netloc["mirror"]
        elif svn:
            return '{0}{1}'.format(self._netloc["svn"], "/public" if self.public else '')
        else:
            return self._netloc["sdss5"] if self.is_sdss5() else self._netloc["sdss"]

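    # Example (a minimal usage sketch of the domain selection logic above):
    #   >>> path.get_netloc(dtn=True)       # 'dtn.sdss.org'
    #   >>> path.get_netloc(mirror=True)    # 'data.mirror.sdss.org'
    #   >>> path.get_netloc()               # data.sdss5.org for SDSS-V releases, else data.sdss.org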
    def set_netloc(self, netloc=None, sdss=None, sdss5=None, dtn=None, svn=None, mirror=None):
        ''' Set a url domain location

        Sets an SDSS url domain location.  Options are the SDSS SAS domain,
        the rsync download server, the svn server, or the mirror data domain.
        The mirror data domain is set either by the ``mirror`` input keyword
        argument or by the ``path.mirror`` attribute.

        Parameters
        ----------
        netloc : str
            An exact net location to use directly
        sdss : bool
            If True, sets the SDSS-IV data domain: data.sdss.org
        sdss5 : bool
            If True, sets the SDSS-V data domain: data.sdss5.org
        dtn : bool
            If True, sets the SDSS rsync server domain: dtn.sdss.org
        svn : bool
            If True, sets the SDSS svn domain: svn.sdss.org
        mirror : bool
            If True, sets the SDSS mirror domain: data.mirror.sdss.org.
        '''
        self.netloc = self.get_netloc(netloc=netloc, sdss=sdss, sdss5=sdss5, dtn=dtn,
                                      svn=svn, mirror=mirror)

    def set_remote_base(self, scheme='https'):
        self.remote_base = self.get_remote_base(scheme=scheme or 'https')

    def get_remote_base(self, scheme="https", svn=None):
        ''' Get the remote base path

        Parameters
        ----------
        scheme : str
            The url scheme.  Either "https" or "rsync".
        svn : bool
            If True, uses the svn url domain svn.sdss.org as the netloc
        '''
        netloc = self.netloc
        if svn:
            netloc = self.get_netloc(svn=True)

        if self.public or scheme == "https":
            remote_base = "{scheme}://{netloc}".format(scheme=scheme, netloc=netloc)
        else:
            user = "sdss5" if self.is_sdss5() else "sdss"
            remote_base = "{scheme}://{user}@{netloc}".format(scheme=scheme, user=user, netloc=netloc)
        return remote_base

    def set_base_dir(self, base_dir=None):
        ''' Sets the base directory

        Sets the ``base_dir`` attribute.  Defaults to $SAS_BASE_DIR.  Can be
        overridden by passing in the ``base_dir`` keyword argument.  The
        ``base_dir`` sets the beginning part of all local paths.

        Parameters
        ----------
        base_dir : str
            A directory path to use as the base
        '''
        if base_dir:
            self.base_dir = join(base_dir, '')
        else:
            try:
                self.base_dir = join(os.environ['SAS_BASE_DIR'], '')
            except Exception:
                pass

    @staticmethod
    def yield_product_root():
        ''' yields a product root environment name '''
        for root in tree._product_roots:
            yield root

    def find_location(self, filetype, **kwargs):
        ''' Finds a relative location of a product path

        Attempts to find a relative path location for a software product path.
        Loops over all product_roots defined in the tree and tests if a
        relative location can be extracted, i.e. if the path starts with a
        given root path.  The root environment paths searched are the
        following, in order of precedence: PRODUCT_ROOT, SDSS_SVN_ROOT,
        SDSS_INSTALL_PRODUCT_ROOT, SDSS_PRODUCT_ROOT, SDSS4_PRODUCT_ROOT.
        If no root is found, uses one directory up from SAS_BASE_DIR.

        Parameters
        ----------
        filetype : str
            File type parameter.
        kwargs : dict
            Path definition keyword arguments

        Returns
        -------
        The relative path location (to the base_dir)
        '''
        # loop over all potential git/svn product roots
        loc = None
        for root in tree._product_roots:
            loc = self._extract_location(filetype, base_dir=os.getenv(root), **kwargs)
            if loc:
                self.product_root = os.getenv(root)
                break
        return loc

    def _extract_location(self, filetype, base_dir=None, **kwargs):
        ''' Extracts the relative path location of the file

        Parameters
        ----------
        filetype : str
            File type parameter.
        base_dir : str
            A root directory to use as the base.  Defaults to SAS_BASE_DIR.

        Returns
        -------
        The relative path location (to the base_dir)
        '''
        full = kwargs.get('full', None)
        if not full:
            full = self.full(filetype, **kwargs)

        self.set_base_dir(base_dir=base_dir)
        location = full[len(self.base_dir):] if full and full.startswith(self.base_dir) else None
        return location

    def location(self, filetype, base_dir=None, **kwargs):
        """Return the location of the relative sas path of a given type of file.

        Parameters
        ----------
        filetype : str
            File type parameter.
        base_dir : str
            A root directory to use as the base.  Defaults to SAS_BASE_DIR.

        Returns
        -------
        The relative path location (to the base_dir)
        """
        # extract the location using SAS_BASE_DIR as the base
        location = self._extract_location(filetype, base_dir=base_dir, **kwargs)

        # attempt to find a product location
        if not location:
            location = self.find_location(filetype, **kwargs)

        if location and '//' in location:
            location = location.replace('//', '/')

        return location

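    # Example (a minimal usage sketch; the relative location is the full path
    # with the SAS_BASE_DIR or product root prefix stripped off):
    #   >>> sdss_path.location('photoObj', run=94, rerun='301', camcol=1, field=100)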
    def url(self, filetype, base_dir=None, sasdir='sas', **kwargs):
        """Return the url of a given type of file.

        Parameters
        ----------
        filetype : str
            File type parameter.
        base_dir : str
            A root directory to use as the base.  Defaults to SAS_BASE_DIR.

        Returns
        -------
        full : str
            The sas url to the file.
        """
        # determine the remote domain location
        remote_base = self.remote_base
        full = self.full(filetype, skip_tag_check=True, **kwargs)

        # if not on the SAS, assume it is an SVN product path
        if not full.startswith(os.getenv("SAS_BASE_DIR")):
            remote_base = self.get_remote_base(svn=True)
            sasdir = ''

        # get the location and set the url
        location = self.location(filetype, skip_tag_check=True, base_dir=base_dir, **kwargs)
        if not location:
            raise AccessError('Cannot construct url. A path.location could not be extracted.')

        # create the url path
        url = join(remote_base, sasdir, location) if remote_base and location else None
        if not is_posix:
            url = url.replace(sep, '/')

        # handle edge case when a full path is passed in as path.url('', full=full)
        # sanity check on svn tags
        if 'svn.sdss.org' in url:
            tag_match = re.search(r'tags/(v?[0-9._]+)', url)
            if not tag_match:
                url = re.sub(r'(/v?[0-9._]+/)', r'/tags\1', url, count=1)

        return url

    def add_temp_path(self, name: str, path: str, envvar_path: str = None):
        """ Add a temporary path template in sdss_access

        Add a path template temporarily into the local os environment for use
        in sdss_access.  Define a template name and path.  The path must start
        with an environment variable definition.  This is useful for
        development of new paths before adding them to the tree and tagging a
        new version.  This allows sdss_access to still be used in the interim.
        This is an alternative to checking out the tree git repo and modifying
        paths there.  The recommended way of adding new paths is through a PR
        on the tree product.

        Parameters
        ----------
        name : str
            the temporary file species name
        path : str
            the temporary template directory path
        envvar_path : str, optional
            the definition path of the environment variable, by default None

        Raises
        ------
        ValueError
            when the name does not match the correct syntax
        ValueError
            when the path does not start with an environment variable
        ValueError
            when the environment variable is not defined
        """
        # check name syntax
        if not re.match(r"^[a-zA-Z_0-9\-]+$", name):
            raise ValueError('Name can only consist of letters, numbers, dashes or underscores.')

        # check if template path starts with an environment variable
        envvar = path.split("/", 1)[0]
        if not envvar.startswith("$"):
            raise ValueError('Template path must start with an environment variable, $ENVVAR_NAME.')

        # check envvar is in the local environment
        if envvar[1:] not in os.environ:
            if not envvar_path:
                raise ValueError('Template path envvar not defined in local '
                                 'environment. Please specify an envvar_path.')
            # add the envvar
            envvar_path = envvar_path.rstrip("/")
            os.environ[envvar[1:]] = envvar_path

        # add the temporary path template
        self.templates[name] = path

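    # Example (a minimal usage sketch; 'my_new_file' and MY_ENVVAR are hypothetical
    # names used only for illustration):
    #   >>> path.add_temp_path('my_new_file', '$MY_ENVVAR/{ver}/myfile-{num}.fits',
    #   ...                    envvar_path='/path/to/my/data')
    #   >>> path.full('my_new_file', ver='v1', num=123)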
def _expandvars(template):
    ''' Recursively run os.path.expandvars

    Recursively calls os.path.expandvars

    Parameters:
        template (str): sdss_access path template

    Return:
        A path template with expanded environment variables
    '''
    template = os.path.expandvars(template)
    if template.startswith('$'):
        # if the envvar isn't in os.environ, then exit
        envvar = template.split('/', 1)[0]
        if envvar[1:] not in os.environ:
            return template
        # recurse down
        return _expandvars(template)
    return template

class Path(BasePath):
    """Class for construction of paths in general.  Sets a particular template file.

    Parameters
    ----------
    release : str
        The release name, e.g. 'DR15', 'MPL-9'.
    public : bool
        If True, uses public urls.  Only needed for public data releases.
        Automatically set to True when release contains "DR".
    mirror : bool
        If True, uses the mirror data domain url.  Default is False.
    verbose : bool
        If True, turns on verbosity.  Default is False.
    force_modules : bool
        If True, forces svn or github software products to use any existing
        local Module environment paths, e.g. PLATEDESIGN_DIR
    preserve_envvars : bool | list
        Flag(s) to indicate some or all original environment variables to preserve

    Attributes
    ----------
    templates : dict
        The set of templates read from the configuration file.
    """

    def __init__(self, release=None, public=False, mirror=False, verbose=False,
                 force_modules=None, preserve_envvars=None):
        super(Path, self).__init__(release=release, public=public, mirror=mirror,
                                   verbose=verbose, force_modules=force_modules,
                                   preserve_envvars=preserve_envvars)

    def __repr__(self):
        rep = super().__repr__()
        return rep.replace('BasePath', 'Path')

    def plateid6(self, filetype, **kwargs):
        """Print plate ID, accounting for 5-6 digit plate IDs.

        Parameters
        ----------
        filetype : str
            File type parameter.
        plateid : int or str
            Plate ID number.  Will be converted to int internally.

        Returns
        -------
        plateid6 : str
            Plate ID formatted to a string of 6 characters.
        """
        plateid = int(kwargs['plateid'])
        if plateid < 10000:
            return "{:0>6d}".format(plateid)
        else:
            return "{:d}".format(plateid)

    def platedir(self, filetype, **kwargs):
        """Returns plate subdirectory in :envvar:`PLATELIST_DIR` of the form: ``NNNNXX/NNNNNN``.

        Parameters
        ----------
        filetype : str
            File type parameter.
        plateid : int or str
            Plate ID number.  Will be converted to int internally.

        Returns
        -------
        platedir : str
            Plate directory in the format ``NNNNXX/NNNNNN``.
        """
        plateid = int(kwargs['plateid'])
        plateid100 = plateid // 100
        subdir = "{:0>4d}".format(plateid100) + "XX"
        return os.path.join(subdir, "{:0>6d}".format(plateid))

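    # Example (a minimal sketch of the directory layout above):
    #   >>> path.platedir('', plateid=8485)   # '0084XX/008485'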
    def plategrp(self, filetype, **kwargs):
        ''' Returns plate group subdirectory

        Parameters
        ----------
        filetype : str
            File type parameter.
        plate : int or str
            Plate ID number.  Will be converted to int internally.

        Returns
        -------
        plategrp : str
            Plate group directory in the format ``NNNNXX``.
        '''
        plate = kwargs.get('plate', kwargs.get('plateid', None))
        if not plate:
            return 'XX'
        return '{:0>4d}XX'.format(int(plate) // 100)

    def spectrodir(self, filetype, **kwargs):
        """Returns :envvar:`SPECTRO_REDUX` or :envvar:`BOSS_SPECTRO_REDUX`
        depending on the value of `run2d`.

        Parameters
        ----------
        filetype : str
            File type parameter.
        run2d : int or str
            2D Reduction ID.

        Returns
        -------
        spectrodir : str
            Value of the appropriate environment variable.
        """
        if str(kwargs['run2d']) in ('26', '103', '104'):
            return os.environ['SPECTRO_REDUX']
        else:
            return os.environ['BOSS_SPECTRO_REDUX']

    def definitiondir(self, filetype, **kwargs):
        """Returns definition subdirectory in :envvar:`PLATELIST_DIR` of the form: ``NNNNXX``.

        Parameters
        ----------
        filetype : str
            File type parameter.
        designid : int or str
            Design ID number.  Will be converted to int internally.

        Returns
        -------
        definitiondir : str
            Definition directory in the format ``NNNNXX``.
        """
        designid = int(kwargs['designid'])
        designid100 = designid // 100
        subdir = "{:0>4d}".format(designid100) + "XX"
        return subdir

    def healpixgrp(self, filetype, **kwargs):
        ''' Returns HEALPix group subdirectory

        Parameters
        ----------
        filetype : str
            File type parameter.
        healpix : int or str
            HEALPix number.  Will be converted to int internally.

        Returns
        -------
        healpixgrp : str
            HEALPix group directory, HEALPix // 1000.
        '''
        healpix = int(kwargs['healpix'])
        subdir = "{:d}".format(healpix // 1000)
        return subdir

    def cat_id_groups(self, filetype, **kwargs):
        ''' Return a folder structure to group data together based on their
        catalog identifier so that we don't have too many files in any one folder.

        Parameters
        ----------
        filetype : str
            File type parameter.
        cat_id : int or str
            SDSS-V catalog identifier

        Returns
        -------
        catalogid_group : str
            A set of folders.
        '''
        # with k = 100 then even with 10 M sources, each folder will have ~1,000 files
        k = 100
        if 'cat_id' in kwargs:
            cat_id = int(kwargs['cat_id'])
        elif 'catid' in kwargs:
            # removing the undesired version as this messes up the lookup_keys method
            kwargs['cat_id'] = kwargs.pop('catid')
            cat_id = int(kwargs['cat_id'])
        return f"{(cat_id // k) % k:0>2.0f}/{cat_id % k:0>2.0f}"

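    # Example (a minimal sketch of the grouping arithmetic above, with k = 100
    # and an illustrative catalog identifier):
    #   cat_id = 123456789  ->  (cat_id // 100) % 100 = 67 and cat_id % 100 = 89,
    #   so cat_id_groups returns '67/89'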
    def sdss_id_groups(self, filetype, **kwargs):
        ''' Return a folder structure to group data together based on their
        SDSS identifier so that we don't have too many files in any one folder.

        Parameters
        ----------
        filetype : str
            File type parameter.
        sdss_id : int or str
            SDSS-V identifier

        Returns
        -------
        sdssid_groups : str
            A set of folders.
        '''
        # with k = 100 then even with 10 M sources, each folder will have ~1,000 files
        k = 100
        sdss_id = int(kwargs["sdss_id"])
        return f"{(sdss_id // k) % k:0>2.0f}/{sdss_id % k:0>2.0f}"

    def component_default(self, filetype, **kwargs):
        ''' Return the component name, if given.

        The component designates a stellar or planetary body following the
        Washington Multiplicity Catalog, which was adopted by the XXIV meeting
        of the International Astronomical Union.  When no component is given,
        the star is assumed to be without a discernible companion.  When a
        component is given it follows the system (Hessman et al., arXiv:1012.0707):

        - the brightest component is called "A", whether it is initially
          resolved into sub-components or not;
        - subsequent distinct components not contained within "A" are labeled
          "B", "C", etc.;
        - sub-components are designated by the concatenation of one or more
          suffixes with the primary label, starting with lowercase letters for
          the 2nd hierarchical level and then with numbers for the 3rd.

        Parameters
        ----------
        filetype : str
            File type parameter.  This argument is not used here, but is
            required for all special functions in the `sdss_access` product.
        component : str [optional]
            The component name as given by the fields.

        Returns
        -------
        component : str
            The component name if given, otherwise a blank string.
        '''
        # the (..) or '' resolves None to ''
        # integer 0 resolves to '', i.e. 0 or '' evaluates as None; making the check explicit
        comp = kwargs.get('component', '')
        return str(comp) if comp is not None else ''

    def apgprefix(self, filetype, **kwargs):
        ''' Returns APOGEE prefix using telescope/instrument.

        Parameters
        ----------
        filetype : str
            File type parameter.
        telescope : str
            The APOGEE telescope (apo25m, lco25m, apo1m).
        instrument : str
            The APOGEE instrument (apogee-n, apogee-s).

        Returns
        -------
        prefix : str
            The APOGEE prefix (ap/as).
        '''
        # since telescope or instrument is already defined in the main
        # template for all paths, we rename the kwargs dict so these keywords
        # do not get picked up by the lookup_keys method.
        newkw = kwargs.copy()

        telescope = newkw.get('telescope', None)
        if telescope is not None:
            prefix = {'apo25m': 'ap', 'apo1m': 'ap', 'lco25m': 'as'}
            if telescope not in prefix:
                raise ValueError(f'{telescope} not in allowed list of prefixes')
            return prefix[telescope]

        instrument = newkw.get('instrument', None)
        if instrument is not None:
            prefix = {'apogee-n': 'ap', 'apogee-s': 'as'}
            if instrument not in prefix:
                raise ValueError(f'{instrument} not in allowed list of prefixes')
            return prefix[instrument]

        return ''

    def apginst(self, filetype, **kwargs):
        ''' Returns APOGEE "instrument" from "telescope".

        Parameters
        ----------
        filetype : str
            File type parameter.
        telescope : str
            The APOGEE telescope (apo25m, lco25m, apo1m).

        Returns
        -------
        instrument : str
            The APOGEE instrument (apogee-n, apogee-s).
        '''
        telescope = kwargs.get('telescope', None)
        if telescope is not None:
            instrument = {'apo25m': 'apogee-n', 'apo1m': 'apogee-n', 'lco25m': 'apogee-s'}
            if telescope not in instrument:
                raise ValueError(f'{telescope} not in allowed list of prefixes')
            return instrument[telescope]

        return ''

    def configgrp(self, filetype, **kwargs):
        ''' Returns configuration summary file group subdirectory

        Parameters
        ----------
        filetype : str
            File type parameter.
        configid : int or str
            Configuration ID number.  Will be converted to int internally.

        Returns
        -------
        configgrp : str
            Configuration group directory in the format ``NNNNXX``.
        '''
        configid = kwargs.get('configid', None)
        if not configid:
            return '0000XX'
        return '{:0>4d}XX'.format(int(configid) // 100)

    def configsubmodule(self, filetype, **kwargs):
        ''' Returns configuration summary submodule group subdirectory

        Parameters
        ----------
        filetype : str
            File type parameter.
        configid : int or str
            Configuration ID number.  Will be converted to int internally.

        Returns
        -------
        configsubmodule : str
            Configuration submodule directory in the format ``NNNXXX``.
        '''
        configid = kwargs.get('configid', None)
        if not configid:
            return '000XXX'
        return '{:0>3d}XXX'.format(int(configid) // 1000)

    def isplate(self, filetype, **kwargs):
        ''' Returns the plate flag for BOSS idlspec2d run2d versions that utilize it

        Parameters
        ----------
        filetype : str
            File type parameter
        run2d : str
            BOSS idlspec2d run2d version

        Returns
        -------
        isplate : str
            isplate flag = 'p' for relevant run2d plates, else flag = ''
        '''
        run2d = kwargs.get('run2d', None)
        if not run2d:
            return ''
        if run2d in ['v6_0_1', 'v6_0_2', 'v6_0_3', 'v6_0_4']:
            return 'p'
        return ''

    def pad_fieldid(self, filetype, **kwargs):
        ''' Returns the fieldid zero-padded to its proper length for the
        BOSS idlspec2d run2d version

        Parameters
        ----------
        filetype : str
            File type parameter
        run2d : str
            BOSS idlspec2d run2d version
        fieldid : str or int
            Field ID number.  Will be converted to str internally.

        Returns
        -------
        fieldid : str
            pad_fieldid in the form of N*'0' where N is the number of
            necessary zeros to pad the fieldid
        '''
        fieldid = kwargs.get('fieldid', None)
        run2d = kwargs.get('run2d', None)

        if (not run2d) & (not fieldid):
            return ''
        fieldid = str(fieldid)
        if run2d in ['v6_0_1', 'v6_0_2', 'v6_0_3', 'v6_0_4']:
            return str(fieldid)
        if fieldid.isnumeric():
            return str(fieldid).zfill(6)
        else:
            return fieldid

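    # Example (a minimal sketch of the padding rule above; the run2d and fieldid
    # values are illustrative):
    #   >>> path.pad_fieldid('', run2d='v6_0_4', fieldid=15143)   # '15143' (plate-era run2d, no padding)
    #   >>> path.pad_fieldid('', run2d='v6_1_0', fieldid=15143)   # '015143' (zero-padded to 6 digits)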
    def tilegrp(self, filetype, **kwargs):
        ''' Returns LVM tile id group subdirectory

        Parameters
        ----------
        filetype : str
            File type parameter.
        tileid : int or str
            LVM Tile ID number.  Will be converted to int internally.

        Returns
        -------
        tileidgrp : str
            Tile ID group directory in the format ``NNNNXX``.
        '''
        tileid = kwargs.get('tileid', None)
        if not tileid:
            return '0000XX'
        elif '*' in str(tileid):
            return '{0}XX'.format(tileid)
        return '{:0>4d}XX'.format(int(tileid) // 1000)

class AccessError(Exception):
    pass