from __future__ import absolute_import, division, print_function, unicode_literals
# The line above will help with 2to3 support.
import re
from sdss_access.sync import Cli
from random import shuffle
from os.path import sep, join
from sdss_access import is_posix
[docs]
class Stream(object):
max_stream_count = 5
def __init__(self, stream_count=None, verbose=False):
self.verbose = verbose
try:
self.stream_count = min(int(stream_count), self.max_stream_count)
except Exception:
self.stream_count = 0
self.streamlet = [{'index': index, 'sas_module': None, 'location': None, 'source': None, 'destination': None} for index in range(0, self.stream_count)]
self.reset()
self.index = 0
self.command = None
self.env = None
self.source = None
self.destination = None
self.cli = Cli(verbose=verbose)
[docs]
def reset(self):
self.reset_task()
self.reset_streamlet()
[docs]
def reset_task(self):
self.task = []
[docs]
def reset_streamlet(self):
for index in range(0, self.stream_count):
self.set_streamlet(index=index, sas_module=[], location=[], source=[], destination=[])
[docs]
def set_streamlet(self, index=None, sas_module=None, location=None, source=None, destination=None):
streamlet = self.get_streamlet(index=index)
if streamlet:
try:
n = len(location)
ok = n == len(sas_module) and n == len(source) and n == len(destination)
streamlet['sas_module'], streamlet['location'], streamlet['source'], streamlet['destination'] = (
sas_module, location, source, destination) if ok else (None, None, None, None)
except Exception:
streamlet['sas_module'], streamlet['location'], streamlet['source'], streamlet['destination'] = (
None, None, None, None)
[docs]
def get_streamlet(self, index=None, increment=1):
if index is None:
self.index += increment
if self.index >= self.stream_count:
self.index = 0
else:
try:
self.index = int(index)
except Exception:
self.index = 0
try:
streamlet = self.streamlet[self.index]
except Exception:
streamlet = None
return streamlet
[docs]
def get_locations(self, offset=None, limit=None):
locations = [task['location'] for task in self.task] if self.task else None
if offset:
locations = locations[offset:]
if limit:
locations = locations[:limit]
if not is_posix:
locations = [loc.replace('/', sep) for loc in locations]
else:
locations = [loc for loc in locations]
return locations
[docs]
def shuffle(self):
shuffle(self.task)
[docs]
def refine_task(self, regex=None):
locations = self.get_locations()
r = re.compile(regex)
subset = filter(lambda i: r.search(i), locations)
self.task = [self.task[locations.index(s)] for s in subset]
[docs]
def append_task(self, sas_module=None, location=None, source=None, destination=None):
if sas_module and location and source and destination:
task = {'sas_module': sas_module, 'location': location, 'source': source,
'destination': destination, 'exists': None}
self.task.append(task)
[docs]
def append_tasks_to_streamlets(self, offset=None, limit=None):
tasks = []
ntasks = 0
for index, task in enumerate(self.task):
if (offset is None or index >= offset):
tasks.append(task)
ntasks += 1
if limit is not None and ntasks >= limit:
break
for task in tasks:
self.append_streamlet(task=task)
[docs]
def append_streamlet(self, index=None, task=None):
streamlet = self.get_streamlet(index=index)
if streamlet and task:
streamlet['sas_module'].append(task['sas_module'])
streamlet['location'].append(task['location'])
streamlet['source'].append(task['source'])
streamlet['destination'].append(task['destination'])
[docs]
def commit_streamlets(self):
if self.command:
self.cli.set_dir()
for streamlet in self.streamlet:
self.commit_streamlet(streamlet)
if self.verbose:
print("SDSS_ACCESS> streamlets added to {dir}".format(dir=self.cli.dir))
[docs]
def commit_streamlet(self, streamlet=None):
if streamlet:
streamlet['path'] = self.cli.get_path(index=streamlet['index'])
path_txt = "{0}.txt".format(streamlet['path'])
streamlet['location']
streamlet['command'] = self.command.format(path=path_txt, sas_module=self.sas_module,
source=self.source, destination=self.destination)
if 'rsync -' in self.command:
lines = [location for location in streamlet['location']]
else:
if not is_posix:
lines = ['url ' + join(self.source, location).replace(sep,'/')+'\n'+'output ' +
join(self.destination, location) for location in streamlet['location']]
else:
lines = ['url ' + join(self.source, location)+'\n'+'output ' +
join(self.destination, location) for location in streamlet['location']]
self.cli.write_lines(path=path_txt, lines=lines)
[docs]
def run_streamlets(self):
for streamlet in self.streamlet:
streamlet['logfile'] = open("{0}.log".format(streamlet['path']), "w")
streamlet['errfile'] = open("{0}.err".format(streamlet['path']), "w")
streamlet['process'] = self.cli.get_background_process(streamlet['command'],
logfile=streamlet['logfile'], errfile=streamlet['errfile'])
if self.verbose:
print("SDSS_ACCESS> rsync stream %s logging to %s" % (streamlet['index'],streamlet['logfile'].name))
# get the number of tasks per stream
tasks_per_stream = [len(streamlet['location']) for streamlet in self.streamlet]
# submit the stream subprocesses to the background
self.cli.wait_for_processes(list(streamlet['process'] for streamlet in self.streamlet),
n_tasks=len(self.task), tasks_per_stream=tasks_per_stream)
if any(self.cli.returncode):
path = self.streamlet[0]['path'][:-3]
if self.verbose:
print("SDSS_ACCESS> return code {returncode}".format(
returncode=self.cli.returncode))
print("SDSS_ACCESS> Failed! See error logs in %s." % (path))
else:
print("SDSS_ACCESS> Done!")
for streamlet in self.streamlet:
streamlet['logfile'].close()
streamlet['errfile'].close()