Source code for reprox.validate_run

"""
Validate that the data can be loaded successfully and move the data to the production folder
"""
from enum import IntEnum
import typing as ty
import json
import strax
from warnings import warn
import os
import shutil
import glob
from collections import defaultdict
from reprox import core


[docs]class ValidationLevel(IntEnum): SHALLOW = 0 DEEP = 1
[docs]class RunValidation: """ Check that a directory (corresponding to a single datatype is """ def __init__(self, path: str, context: strax.Context = None, mode: ty.Union[int, ValidationLevel] = ValidationLevel.SHALLOW, ): self.path = path self.mode = mode self.st = context if mode > ValidationLevel.SHALLOW and context is None: raise ValueError('Context is a required argument if a DEEP ' 'validation of the data is performed')
[docs] def find_error(self) -> str: """Run several checks on a path to see if the processing was done correctly""" if self._is_temp(): return 'is_temp_folder' md = self._open_metadata() if not md: return 'has_no_metadata' if self._did_fail(md): return 'has_exception' if self._misses_chunks(md): return 'misses_chunks' key = os.path.split(self.path)[-1] split_key = key.split('-') if self._wrong_format(split_key): return 'is_wrong_format' if self.mode == ValidationLevel.DEEP: run, data_type, lineage_hash = split_key if self._different_hash(run, data_type, lineage_hash): return 'cannot_validate_different_hash' if self._cannot_load(run, data_type): return 'loading_error' return False
def _is_temp(self): if '_temp' in self.path: warn(f'{self.path} is not finished', UserWarning) return True def _did_fail(self, md): if 'exception' in md: warn(f'{self.path} has an exception', UserWarning) return True def _misses_chunks(self, md): n_files = os.listdir(self.path) n_chunks = len(n_files) - 1 # metadata chunks = [c.get('n') > 0 for c in md['chunks']] if n_chunks != sum(chunks): warn(f'{self.path} misses chunks?!', UserWarning) return True def _different_hash(self, run, data_type, lineage_hash): context_lineage_hash = self.st.key_for(run, data_type).lineage_hash if lineage_hash != context_lineage_hash: warn(f'Lineage hash differs from context. Expected ' f'{lineage_hash}, got {context_lineage_hash} for ' f'{data_type}', UserWarning) return True def _cannot_load(self, run, data_type): st = self.st.new_context() st.storage = [ strax.DataDirectory( os.path.split(self.path)[0], readline=False)] try: for _ in st.get_iter(run, data_type, progress_bar=False): pass except Exception as e: warn(f'{self.path} cannot be loaded due to {e}', UserWarning) return True @staticmethod def _wrong_format(split_key): if len(split_key) != 3: warn(f'{split_key} is not correct format?!', UserWarning) return True return False def _open_metadata(self) -> dict: files = glob.glob(os.path.join(self.path, '*')) for f in files: if 'metadata' in f: md_path = f break else: return {} with open(md_path, mode='r') as f: return json.loads(f.read())
[docs]def change_ownership(path, group): shutil.chown(path, group=group) # Change to drwxrwxr-x os.chmod(path, 0o775)
[docs]def move_folder(path: str, destination_folder: str = core.config['context']['destination_folder'], group='xenon1t-admins', validation_level: int = ValidationLevel.SHALLOW, context: ty.Optional[strax.Context] = None, ) -> ty.Union[str, None]: """ Move data from path into the destination folder and change the ownership of the folder :param path: The folder to move :param destination_folder: The folder where the <path> folder should be moved to :param group: Name of the group that the permissions should be set to :param validation_level: the level at which to validate the data: - ValidationLevel.SHALLOW: <0> for basic validation - ValidationLevel.DEEP: <1> where we actually try loading the data with (requires a context) :param context: for when the validation_level is set to ValidationLevel.DEEP :return: error string if an error occurred :raises FileExistsError: when there is already a folder at the destination path """ validation = RunValidation(path, context=context, mode=validation_level) error_string = validation.find_error() if error_string: return error_string change_ownership(path, group) dest_path = os.path.join(destination_folder, os.path.split(path)[-1]) if os.path.exists(dest_path): raise FileExistsError(f'{dest_path} already exists!') shutil.move(path, dest_path) return None
[docs]def move_all( source_folder: str = os.path.join(core.config['context']['base_folder'], 'strax_data'), destination_folder: str = core.config['context']['destination_folder'], **move_kwargs): """ Move data from all folders in <source_folder> into the destination folder and change the ownership of the folder :param source_folder: The main folder where to look for folders to move :param destination_folder: The folder where the <path> folder should be moved to :param move_kwargs: Takes the following kwargs: :group: Name of the group that the permissions should be set to :validation_level: the level at which to validate the data: - ValidationLevel.SHALLOW: <0> for basic validation - ValidationLevel.DEEP: <1> where we actually try loading the data with (requires a context) :context: for when the validation_level is set to ValidationLevel.DEEP :return: None """ folders = sorted(glob.glob(os.path.join(source_folder, '*'))) fails = defaultdict(list) for folder in strax.utils.tqdm( folders, desc='move folders', disable=not core.config['display']['progress_bar'], ): try: error_string = move_folder(folder, destination_folder=destination_folder, **move_kwargs) if error_string: fails[error_string].append(folder) except FileExistsError: fails['FileExistsError'].append(folder) except Exception as e: print(f'Ran into {e}') # most likely a permission error fails['Other'].append(folder) for key, value in fails.items(): core.log.warning(f'{key}, {len(value)}') if len(value) < 5: for v in value: core.log.info(f'\t{v}') core.log.info( f'Did {len(folders)}. Ran into {sum(len(v) for v in fails.values())} failures' )