import uuid
import datetime
import socket
import getpass
import warnings
import pathlib
import yaml
import shutil
import pickle
from io import UnsupportedOperation
class FileValidationError(Exception):
    """Raised when a data file lacks content that it is required to have."""
class DataFile:
    """
    A class representing a DataFile to be made by pipeline stages
    and passed on to subsequent ones.

    DataFile itself should not be instantiated - instead subclasses
    should be defined for different file types.

    These subclasses are used in the definition of pipeline stages
    to indicate what kind of file is expected.  The "suffix" attribute,
    which must be defined on subclasses, indicates the file suffix.

    The open method, which can optionally be overridden, is used by the
    machinery of the PipelineStage class to open an input or output
    named by a tag.
    """

    supports_parallel_write = False
    suffix = None

    def __init__(self, path, mode, extra_provenance=None, validate=True, **kwargs):
        """
        Open the file and set up its provenance.

        path: location handed to the class's ``open`` method.
        mode: either "r" (read) or "w" (write).
        extra_provenance: optional dict merged into the generated
            provenance; only used in write mode.
        validate: if true and the mode is "r", ``validate`` is called
            once the file has been opened.
        kwargs: forwarded unchanged to the class's ``open`` method.

        Raises ValueError if mode is not "r" or "w".
        """
        self.path = path
        self.mode = mode

        if mode not in ["r", "w"]:
            raise ValueError(f"File 'mode' argument must be 'r' or 'w' not '{mode}'")

        self.file = self.open(path, mode, **kwargs)

        if validate and mode == "r":
            try:
                self.validate()
            except Exception:
                # Do not leak the freshly opened handle when the file is
                # rejected.  A failure while closing must not hide the
                # validation error, which is the one the caller needs.
                try:
                    self.close()
                except Exception:
                    pass
                raise

        if mode == "w":
            self.provenance = self.generate_provenance(extra_provenance)
            self.write_provenance()
        else:
            self.provenance = self.read_provenance()

    @staticmethod
    def generate_provenance(extra_provenance=None):
        """
        Generate provenance information - a dictionary
        of useful information about the origin of the file.

        The dictionary holds a fresh UUID, the creation time, the
        fully-qualified domain name and the user name.  Items in
        extra_provenance, if given, are merged in afterwards and so
        override generated keys with the same name.
        """
        UUID = uuid.uuid4().hex
        creation = datetime.datetime.now().isoformat()
        domain = socket.getfqdn()
        username = getpass.getuser()

        # Add other provenance and related
        provenance = {
            "uuid": UUID,
            "creation": creation,
            "domain": domain,
            "username": username,
        }

        if extra_provenance:
            provenance.update(extra_provenance)
        return provenance

    def write_provenance(self):
        """
        Concrete subclasses (for which it is possible) should override
        this method to save the dictionary self.provenance to the file.

        The base implementation does nothing.
        """
        pass

    def add_provenance(self, key, value):
        """
        Concrete subclasses (for which it is possible) should override
        this method to save a new string key/value pair to the file.

        The base implementation does nothing.
        """
        pass

    def read_provenance(self):
        """
        Concrete subclasses for which it is possible should override
        this method and read the provenance information from the file.

        Other classes will return this dictionary of UNKNOWNs.
        """
        provenance = {
            "uuid": "UNKNOWN",
            "creation": "UNKNOWN",
            "domain": "UNKNOWN",
            "username": "UNKNOWN",
        }
        return provenance

    def validate(self):
        """
        Concrete subclasses should override this method
        to check that all expected columns are present.

        The base implementation accepts every file.
        """
        pass

    @classmethod
    def open(cls, path, mode, **kwargs):
        """
        Open a data file.  The base implementation of this function just
        opens and returns a standard python file object; any keyword
        arguments are passed on to the built-in open.

        Subclasses can override to either open files using different openers
        (like fitsio.FITS), or, for more specific data types, return an
        instance of the class itself to use as an intermediary for the file.
        """
        return open(path, mode, **kwargs)

    def close(self):
        """Close the underlying file object."""
        self.file.close()

    @classmethod
    def make_name(cls, tag):
        """
        Return the file name for a tag: "<tag>.<suffix>", or the bare
        tag when the class has no (or an empty) suffix.
        """
        if cls.suffix:
            return f"{tag}.{cls.suffix}"
        else:
            return tag

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always close, whether or not the with-block raised.
        self.close()
class HDFFile(DataFile):
    """
    A data file in the HDF5 format.

    Using these files requires the h5py package, which in turn
    requires an HDF5 library installation.

    Subclasses can list names in required_datasets; validate checks
    that each of them is present in the file.
    """

    supports_parallel_write = True
    suffix = "hdf5"
    required_datasets = []

    @classmethod
    def open(cls, path, mode, **kwargs):
        """
        Open the file with h5py and return the h5py.File object.
        Keyword arguments are passed on to h5py.File.
        """
        # Suppress a warning that h5py always displays
        # on import
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            import h5py
        # Return an open h5py File
        return h5py.File(path, mode, **kwargs)

    def write_provenance(self):
        """
        Write provenance information to a new group,
        called 'provenance', one attribute per item.

        Raises UnsupportedOperation if the file was opened for reading.
        """
        if self.mode == "r":
            raise UnsupportedOperation(
                "Cannot write provenance to an HDF5 "
                f"file opened in read-only mode "
                f"({self.mode})"
            )

        # This method *must* be called by all the processes in a parallel
        # run.
        self._provenance_group = self.file.create_group("provenance")

        # Store each item as an attribute of the group
        for key, value in self.provenance.items():
            self._provenance_group.attrs[key] = value

    def read_provenance(self):
        """
        Read the standard provenance items from the attributes of the
        'provenance' group.  Items that are absent, or all of them if
        the group does not exist, are reported as "UNKNOWN".
        """
        try:
            group = self.file["provenance"]
            attrs = group.attrs
        except KeyError:
            group = None
            attrs = {}

        provenance = {
            key: attrs.get(key, "UNKNOWN")
            for key in ("uuid", "creation", "domain", "username")
        }

        self._provenance_group = group
        return provenance

    def validate(self):
        """
        Check that every name in required_datasets is present in the
        file, raising FileValidationError listing any that are not.
        """
        missing = [name for name in self.required_datasets if name not in self.file]
        if missing:
            text = "\n".join(missing)
            raise FileValidationError(f"These data sets are missing from HDF file {self.path}:\n{text}")

    def close(self):
        """Close the underlying h5py file."""
        self.file.close()
class FitsFile(DataFile):
    """
    A data file in the FITS format.
    Using these files requires the fitsio package.

    Subclasses can list names in required_columns; validate checks
    that each of them is present in HDU 1.
    """

    suffix = "fits"
    required_columns = []

    @classmethod
    def open(cls, path, mode, **kwargs):
        """
        Open the file with fitsio and return the fitsio.FITS object.
        Keyword arguments are passed on to fitsio.FITS.
        """
        import fitsio

        # Fitsio doesn't have pure 'w' modes, just 'rw'.
        # Maybe we should check if the file already exists here?
        if mode == "w":
            mode = "rw"
        return fitsio.FITS(path, mode=mode, **kwargs)

    def missing_columns(self, columns, hdu=1):
        """
        Check that all supplied columns exist
        and are in the chosen HDU.

        Returns the list of requested columns that were not found.
        """
        ext = self.file[hdu]
        found_cols = ext.get_colnames()
        missing_columns = [col for col in columns if col not in found_cols]
        return missing_columns

    def write_provenance(self):
        """
        Write provenance information as keys in the header of HDU 0.

        A string value containing newlines is split into one key per
        line, named "<key>_0", "<key>_1", and so on.

        Raises UnsupportedOperation if the file was opened for reading.
        """
        if self.mode == "r":
            raise UnsupportedOperation(
                "Cannot write provenance to a FITS file opened "
                f"in read-only mode ({self.mode})"
            )

        header_hdu = self.file[0]
        for key, value in self.provenance.items():
            if isinstance(value, str) and "\n" in value:
                for i, v in enumerate(value.split("\n")):
                    header_hdu.write_key(key + f"_{i}", v)
            else:
                header_hdu.write_key(key, value)

    def read_provenance(self):
        """
        Read the standard provenance items from the header of HDU 0,
        reporting "UNKNOWN" for any that are absent.
        """
        header = self.file[0].read_header()
        provenance = {
            key: header.get(key, "UNKNOWN")
            for key in ("uuid", "creation", "domain", "username")
        }
        return provenance

    def validate(self):
        """Check that the catalog has all the required columns and complain otherwise"""
        # Find any columns that do not exist in the file
        missing = self.missing_columns(self.required_columns)

        # If there are any, raise an exception that lists them explicitly
        if missing:
            text = "\n".join(missing)
            raise FileValidationError(f"These columns are missing from FITS file {self.path}:\n{text}")

    def close(self):
        """Close the underlying fitsio file."""
        self.file.close()
class TextFile(DataFile):
    """
    Plain-text data file.

    Only the suffix is set here; everything else is inherited
    unchanged from DataFile.
    """

    suffix = "txt"
class YamlFile(DataFile):
    """
    A data file in yaml format.
    The top-level object in TXPipe YAML
    files should always be a dictionary.
    """

    suffix = "yml"

    def __init__(self, path, mode, extra_provenance=None, validate=True, load_mode="full"):
        """
        Open a YAML file.

        In read mode ("r") the whole file is parsed into self.content
        and the provenance is extracted from it.  In any other mode
        new provenance is generated and written immediately.

        extra_provenance: optional dict merged into generated
            provenance (write mode only).
        validate: accepted for signature compatibility with DataFile;
            it is not used by this class.
        load_mode: "safe", "full" or "unsafe", choosing the
            corresponding yaml loader.  Only consulted in read mode.

        Raises ValueError if load_mode is not recognised in read mode.
        """
        self.path = path
        self.mode = mode
        self.file = self.open(path, mode)

        if mode == "r":
            loaders = {
                "safe": yaml.safe_load,
                "full": yaml.full_load,
                "unsafe": yaml.unsafe_load,
            }
            try:
                loader = loaders[load_mode]
            except KeyError:
                # Do not leak the handle we just opened.
                self.file.close()
                raise ValueError(
                    f"Unknown value {load_mode} of load_mode. "
                    "Should be 'safe', 'full', or 'unsafe'"
                ) from None

            self.content = loader(self.file)
            # An empty document parses to None; treat it as an empty
            # dictionary so that provenance lookup below still works.
            if self.content is None:
                self.content = {}

            # get provenance
            self.provenance = self.read_provenance()
        else:
            self.provenance = self.generate_provenance(extra_provenance)
            self.write_provenance()

    def read(self, key):
        """Return the value stored under key in the loaded content."""
        return self.content[key]

    def write(self, d):
        """
        Dump the dictionary d to the file as YAML.

        Raises ValueError if d is not a dict.
        """
        if not isinstance(d, dict):
            raise ValueError("Only dicts should be passed to YamlFile.write")
        yaml.dump(d, self.file)

    def write_provenance(self):
        """Write self.provenance under the top-level key 'provenance'."""
        d = {"provenance": self.provenance}
        self.write(d)

    def read_provenance(self):
        """
        Remove the 'provenance' entry from the loaded content and
        return it, with the four standard items filled in as "UNKNOWN"
        where they are absent.
        """
        prov = self.content.pop("provenance", {})
        req_provenance = {
            key: prov.get(key, "UNKNOWN")
            for key in ("uuid", "creation", "domain", "username")
        }
        prov.update(req_provenance)
        return prov
class Directory(DataFile):
    """
    A directory used as a pipeline input or output.

    The "file" attribute is a pathlib.Path for the directory, and
    provenance is kept in a 'provenance.yml' file inside it.
    """

    suffix = ""

    @classmethod
    def open(cls, path, mode):
        """
        Return a pathlib.Path for the directory.

        In write mode any existing directory at path is removed and a
        fresh, empty one is created (with parents).  In any other mode
        the directory must already exist, otherwise ValueError is raised.
        """
        p = pathlib.Path(path)

        if mode == "w":
            if p.exists():
                shutil.rmtree(p)
            p.mkdir(parents=True)
        else:
            if not p.is_dir():
                raise ValueError(f"Directory input {path} does not exist")
        return p

    def write_provenance(self):
        """
        Write provenance information to the file 'provenance.yml'
        inside the directory.

        Raises UnsupportedOperation if the directory was opened for
        reading.
        """
        # This method *must* be called by all the processes in a parallel
        # run.
        if self.mode == "r":
            raise UnsupportedOperation(
                "Cannot write provenance to a directory opened "
                f"in read-only mode ({self.mode})"
            )

        # The handle is kept on the instance and released in close().
        self._provenance_file = open(self.file / "provenance.yml", "w")
        yaml.dump(self.provenance, self._provenance_file)
        self._provenance_file.flush()

    def read_provenance(self):
        """
        Read the standard provenance items from 'provenance.yml' in
        the directory.  Items that are absent, or all of them if the
        file does not exist or is empty, are reported as "UNKNOWN".
        """
        try:
            f = open(self.file / "provenance.yml")
        except FileNotFoundError:
            f = None
            attrs = {}
        else:
            # yaml.load() with no Loader is rejected by PyYAML >= 6.
            # full_load matches the default loader YamlFile uses.
            attrs = yaml.full_load(f) or {}

        self._provenance_file = f

        provenance = {
            key: attrs.get(key, "UNKNOWN")
            for key in ("uuid", "creation", "domain", "username")
        }
        return provenance

    def close(self):
        """
        Release the provenance file handle, if one is open.

        The directory itself is only a path and has nothing to close,
        so the inherited implementation (which calls self.file.close())
        cannot be used here.
        """
        handle = getattr(self, "_provenance_file", None)
        if handle is not None:
            handle.close()
            self._provenance_file = None
class FileCollection(Directory):
    """
    Represents a grouped bundle of files, for cases where you don't
    know the exact list in advance.
    """

    suffix = ""

    def write_listing(self, filenames):
        """
        Write a listing file in the directory recording
        (presumably) the filenames put in it.

        The listing is stored as YAML in 'txpipe_listing.txt'.
        """
        fn = self.path_for_file("txpipe_listing.txt")
        with open(fn, "w") as f:
            yaml.dump(filenames, f)

    def read_listing(self):
        """
        Read a listing file from the directory and return its content.
        """
        fn = self.path_for_file("txpipe_listing.txt")
        # Must be opened for reading: opening with "w" would truncate
        # the listing and make the load below fail.
        with open(fn, "r") as f:
            filenames = yaml.safe_load(f)
        return filenames

    def path_for_file(self, filename):
        """
        Get the path for a file inside the collection, as a string.

        Does not check if the file exists or anything like
        that.
        """
        return str(self.file / filename)
class PNGFile(DataFile):
    """
    A PNG image produced from a matplotlib figure.

    The "file" attribute is the matplotlib figure; it is saved to
    self.path when the object is closed.  Only write mode is supported.
    """

    suffix = "png"

    @classmethod
    def open(cls, path, mode, **kwargs):
        """
        Create and return a new matplotlib figure.  Keyword arguments
        are passed on to matplotlib.pyplot.figure.

        Raises ValueError unless mode is "w".
        """
        import matplotlib.pyplot as plt

        if mode != "w":
            raise ValueError("Reading existing PNG files is not supported")
        return plt.figure(**kwargs)

    def close(self):
        """
        Save the figure to self.path, with the provenance as its
        metadata, and then close the figure.
        """
        import matplotlib.pyplot as plt

        try:
            self.file.savefig(self.path, metadata=self.provenance)
        finally:
            # Release the figure even when saving fails.
            plt.close(self.file)

    def write_provenance(self):
        """Do nothing: provenance is written on closing the file."""
        # provenance is written on closing the file
        pass

    def read_provenance(self):
        """Always raise ValueError, since PNG files cannot be read."""
        raise ValueError("Reading existing PNG files is not supported")
class PickleFile(DataFile):
    """
    A data file holding pickled python objects.

    Provenance is handled through the ordinary write/read methods:
    it is pickled on writing and un-pickled on reading.
    """

    suffix = "pkl"

    @classmethod
    def open(cls, path, mode, **kwargs):
        """
        Open the file in binary mode and return the file object.
        Keyword arguments are accepted but not used.
        """
        binary_mode = mode + "b"
        return open(path, binary_mode)

    def write(self, obj):
        """
        Pickle obj into the file.

        Raises UnsupportedOperation unless the file was opened with
        mode "w".
        """
        if self.mode == "w":
            pickle.dump(obj, self.file)
        else:
            raise UnsupportedOperation("Cannot write to pickle file opened in "
                                       f"read-only ({self.mode})")

    def read(self):
        """
        Un-pickle and return the next object from the file.

        Raises UnsupportedOperation unless the file was opened with
        mode "r".
        """
        if self.mode == "r":
            # NOTE(review): un-pickling can run arbitrary code, so this
            # should only be used on files from a trusted source.
            return pickle.load(self.file)
        raise UnsupportedOperation("Cannot read from pickle file opened in "
                                   f"write-only ({self.mode})")

    def write_provenance(self):
        """Pickle self.provenance into the file."""
        self.write(self.provenance)

    def read_provenance(self):
        """Return the next pickled object from the file as the provenance."""
        return self.read()