# Source code for sdss_access.sync.baseaccess

from __future__ import absolute_import, division, print_function, unicode_literals
# The line above will help with 2to3 support.

import abc
import six
from os.path import join, sep
from sdss_access import Path
from sdss_access.sync.auth import Auth, AuthMixin
from sdss_access.sync.stream import Stream
from sdss_access import is_posix, AccessError


class BaseAccess(six.with_metaclass(abc.ABCMeta, AuthMixin, Path)):
    """Abstract base class providing Rsync or Curl access to SDSS SAS paths.

    Download tasks are first collected in ``initial_stream`` (through ``add``
    or ``add_file``), expanded into ``stream`` by ``set_stream`` and finally
    executed by ``commit``.  Subclasses must implement
    ``generate_stream_task`` and ``_get_stream_command``.  ``_get_sas_module``
    is also called by this class but is not defined here; it is presumably
    supplied by a subclass or a parent class.

    Parameters
    ----------
    label : str, optional
        Stored on the instance as ``label``.
    stream_count : int, optional
        Stream count handed to every ``Stream`` this object creates,
        by default 5.
    mirror, public, release, verbose, force_modules, preserve_envvars
        Forwarded unchanged to the parent class initializer.
    """

    # URL scheme passed to ``set_remote_base``; ``None`` in this base class.
    remote_scheme = None
    # Transfer mode: rsync when ``is_posix`` is true, curl otherwise.
    access_mode = 'rsync' if is_posix else 'curl'

    def __init__(self, label=None, stream_count=5, mirror=False, public=False, release=None,
                 verbose=False, force_modules=None, preserve_envvars=None):
        super(BaseAccess, self).__init__(release=release, public=public, mirror=mirror,
                                         verbose=verbose, force_modules=force_modules,
                                         preserve_envvars=preserve_envvars)
        self.label = label
        self.auth = None
        self.stream = None
        self.stream_count = stream_count
        self._stream_command = None
        self.verbose = verbose
        # tasks are accumulated here until set_stream() builds the real stream
        self.initial_stream = self.get_stream()

    @staticmethod
    def _split_location(location):
        """Split ``location`` at the first path separator.

        Returns the ``[sas_module, remainder]`` pair, or ``(None, location)``
        when ``location`` is falsy.
        """
        return location.split(sep, 1) if location else (None, location)

    def remote(self, username=None, password=None, inquire=None):
        """Configure remote access.

        Parameters
        ----------
        username, password, inquire
            Forwarded unchanged to ``set_auth``.
        """
        use_dtn = self.remote_scheme == 'rsync'
        # simplifies things to have a single sdss (or sdss5) machine in
        # .netrc for SDSS-IV (or SDSS-V, respectively).
        # sdss5 is true if s5cfg (e.g. sdsswork or ipl-N) starts with
        # values defined in self._s5cfgs (e.g. sdss or ipl).
        sdss5 = self.is_sdss5()
        self.set_netloc(sdss=not sdss5, sdss5=sdss5)
        self.set_auth(username=username, password=password, inquire=inquire)
        if use_dtn:
            self.set_netloc(dtn=use_dtn)
        self.set_remote_base(scheme=self.remote_scheme)

    def add(self, filetype, **kwargs):
        """Add a filepath to the list of tasks to download.

        Parameters
        ----------
        filetype : str
            The product name passed to ``location``, ``url`` and ``full``.
        **kwargs
            Path template keywords.  When ``full`` is present its value is
            used as the download destination.

        Raises
        ------
        AccessError
            If the resolved url points at ``svn.sdss.org``.
        """
        sas_module, location = self._split_location(self.location(filetype, **kwargs))

        # set proper sasdir based on access method
        sasdir = 'sas' if self.access_mode == 'curl' else ''
        source = self.url(filetype, sasdir=sasdir, **kwargs)

        # raise error if attempting to add a software product path; an empty
        # source falls through to the "no file" message below instead of
        # failing on the membership test
        if source and 'svn.sdss.org' in source:
            raise AccessError('Rsync/Curl Access not allowed for svn paths. Please use HttpAccess.')

        destination = kwargs['full'] if 'full' in kwargs else self.full(filetype, **kwargs)

        if sas_module and location and source and destination:
            self.initial_stream.append_task(sas_module=sas_module, location=location,
                                            source=source, destination=destination)
        else:
            print("There is no file with filetype=%r to access in the tree module loaded" % filetype)

    def add_file(self, path, input_type=None):
        """Add a file to the list of tasks to download.

        Adds a full filepath, url, or location to the list of tasks to
        download.  This takes advantage of ``sdss_access`` parallel streams
        to download a list of files.  This is similar to the ``.add`` method,
        except this takes full filepaths or urls as input, whereas the
        ``.add`` method is better when inputting a product name and path
        template kwargs.

        Parameters
        ----------
        path : str
            the filepath, url, or location
        input_type : str, optional
            the type of input ('filepath', 'url' or 'location'), by default
            None, in which case it is inferred from the start of ``path``

        Raises
        ------
        AccessError
            If ``input_type`` is not one of the recognised values.
        """
        # check for input_type if none provided
        if not input_type:
            if path.startswith(('rsync', 'http')):
                input_type = 'url'
            elif path.startswith('/'):
                input_type = 'filepath'
            else:
                input_type = 'location'

        # use the right sasdir based on mode
        sasdir = 'sas' if self.access_mode == 'curl' else ''

        # determine stream task info
        if input_type == 'filepath':
            sas_module, location = self._split_location(self.location('', full=path))
            source = self.url('', sasdir=sasdir, full=path)
            dest = path
        elif input_type == 'url':
            self.set_base_dir()
            dest = path.replace(f'{self.remote_base}/{sasdir}', self.base_dir)
            source = path
            sas_module, location = self._split_location(self.location('', full=dest))
        elif input_type == 'location':
            sas_module, location = self._split_location(path)
            dest = join(self.base_dir, path)
            source = self.url('', sasdir=sasdir, full=dest)
        else:
            # previously this fell through and failed on unbound local names
            raise AccessError("Unknown input_type %r. Must be one of 'filepath', 'url', or 'location'."
                              % input_type)

        # append the task to the stream
        self.initial_stream.append_task(sas_module=sas_module, location=location,
                                        source=source, destination=dest)

    def set_stream(self):
        """Set up the download stream from the tasks added so far.

        Raises
        ------
        AccessError
            If ``remote()`` has not set the auth, if no tasks were added, or
            if a password is available but ``access_mode`` is neither
            'rsync' nor 'curl'.
        """
        if not self.auth:
            raise AccessError(
                "Please use the remote() method to set rsync authorization or use remote(public=True) for public data")
        if not self.initial_stream.task:
            raise AccessError("No files to download.")

        self.stream = self.get_stream()

        # set stream source based on access mode
        if self.access_mode == 'rsync':
            self.stream.source = self.remote_base
        elif self.access_mode == 'curl':
            self.stream.source = join(self.remote_base, 'sas').replace(sep, '/')

        # set stream destination
        self.stream.destination = self.base_dir

        # set client env dict based on access mode
        if self.auth.ready():
            password_keys = {'rsync': 'RSYNC_PASSWORD', 'curl': 'CURL_PASSWORD'}
            key = password_keys.get(self.access_mode)
            if key is None:
                # previously an unbound local variable error
                raise AccessError("Unsupported access mode %r. Must be 'rsync' or 'curl'."
                                  % self.access_mode)
            self.stream.cli.env = {key: self.auth.password}
        else:
            self.stream.cli.env = None

        if self.stream.source and self.stream.destination:
            for task in self.initial_stream.task:
                self.set_stream_task(task)

        # never run more streams than there are tasks
        ntask = len(self.stream.task)
        if self.stream.stream_count > ntask:
            if self.verbose:
                print("SDSS_ACCESS> Reducing the number of streams from %r to %r, the number of download tasks." % (
                    self.stream.stream_count, ntask))
            self.stream.stream_count = ntask
            self.stream.streamlet = self.stream.streamlet[:ntask]

    def get_stream(self):
        """Return a new ``Stream`` built with this object's ``stream_count`` and ``verbose``."""
        return Stream(stream_count=self.stream_count, verbose=self.verbose)

    def reset(self):
        """Reset all streams."""
        # reset the main stream
        if self.stream:
            self.stream.reset()

        # reset the initial stream (otherwise old 'adds' remain in the new stream)
        if self.initial_stream:
            self.initial_stream.reset()

    def shuffle(self):
        """Shuffle the stream.

        Raises
        ------
        AccessError
            If ``set_stream()`` has not created the stream yet.
        """
        if self.stream is None:
            raise AccessError("No stream to shuffle. Please call set_stream() first.")
        self.stream.shuffle()

    def get_locations(self, offset=None, limit=None):
        """Return the locations for all paths in the stream, or None when no stream is set."""
        return self.stream.get_locations(offset=offset, limit=limit) if self.stream else None

    def get_paths(self, offset=None, limit=None):
        """Return the base paths for all paths in the stream, or None when there are no locations."""
        locations = self.get_locations(offset=offset, limit=limit)
        sasdir = self._get_sas_module()
        if not locations:
            return None
        return [join(self.base_dir, sasdir, location) for location in locations]

    def get_urls(self, offset=None, limit=None):
        """Return the urls for all paths in the stream, or None when there are no locations."""
        locations = self.get_locations(offset=offset, limit=limit)
        remote_base = self.get_remote_base()
        sasdir = self._get_sas_module()
        if not locations:
            return None
        # urls always use forward slashes, whatever the local path separator is
        return [join(remote_base, sasdir, location).replace(sep, '/') for location in locations]

    @abc.abstractmethod
    def generate_stream_task(self, task=None, out=None):
        """Create the task(s) to put in the download stream.

        ``set_stream_task`` iterates over the result and unpacks every item
        as ``(sas_module, location, source, destination)``.
        """

    def set_stream_task(self, task=None, out=None):
        """Append the entries generated for ``task`` to the stream.

        Entries with any falsy field are skipped; a message is printed when
        nothing at all was appended.
        """
        stream_has_task = False
        for sas_module, location, source, destination in self.generate_stream_task(task=task, out=out):
            if sas_module and location and source and destination:
                stream_has_task = True
                self.stream.append_task(sas_module=sas_module, location=location,
                                        source=source, destination=destination)
        if not stream_has_task:
            print('SDSS_ACCESS> Error: stream has nothing to do.')

    @abc.abstractmethod
    def _get_stream_command(self, follow_symlinks=True):
        """Return the stream command used when committing the download.

        ``commit`` calls this with the ``follow_symlinks`` keyword, so
        implementations must accept it.
        """

    def commit(self, offset=None, limit=None, follow_symlinks: bool = True):
        """Start the download.

        Parameters
        ----------
        offset, limit
            Forwarded to ``Stream.append_tasks_to_streamlets``.
        follow_symlinks : bool, optional
            Forwarded to ``_get_stream_command``, by default True.

        Raises
        ------
        AccessError
            If ``set_stream()`` has not created the stream yet.
        """
        if self.stream is None:
            raise AccessError("No stream to commit. Please call set_stream() first.")
        self.stream.command = self._get_stream_command(follow_symlinks=follow_symlinks)
        self.stream.sas_module = self._get_sas_module()
        self.stream.append_tasks_to_streamlets(offset=offset, limit=limit)
        self.stream.commit_streamlets()
        self.stream.run_streamlets()
        self.stream.reset_streamlet()