Source code for reprox.find_runs

import typing as ty
import os
import numpy as np
import pandas as pd
import strax
import utilix
from reprox import core


[docs]def find_data( targets: ty.Union[str, list, tuple], exclude_from_invalid_cmt_version: ty.Union[bool, str] = ( core.config['context']['cmt_version']), context_kwargs: ty.Optional[dict] = None, keep_detectors: ty.Union[str, tuple, list] = core.config['context']['include_detectors'].split(','), ignore_runs=tuple() ) -> None: """ Determine which data to process, see determine_data_to_reprocess :param targets: List of targets to process :param exclude_from_invalid_cmt_version: A CMT version (optional) to exclude runs that lie outside it's validity from :param context_kwargs: Any context kwargs :return: """ if context_kwargs is None: context_kwargs = {} st = core.get_context(**context_kwargs) runs = determine_data_to_reprocess( st=st, targets=targets, exclude_from_invalid_cmt=exclude_from_invalid_cmt_version, keep_detectors=keep_detectors, ignore_runs=ignore_runs, ) save_as = core.runs_csv if len(runs): runs.to_csv(save_as) core.log.info(f"Written to {save_as}") else: core.log.info("No runs to process!") if os.path.exists(save_as): core.log.info(f"Removing {save_as} since there are no runs to process") os.remove(save_as)
[docs]def determine_data_to_reprocess( st: strax.Context, targets: ty.Union[str, tuple, list] = tuple(), special_modes: ty.Union[ty.List[str], ty.Tuple[str]] = ( 'LED', 'noise', 'pmtap', 'pmtgain', 'exttrig'), keep_detectors: ty.Union[str, tuple, list] = ('tpc',), exclude_from_invalid_cmt: ty.Optional[str] = core.config['context']['cmt_version'], _max_workers: int = 50, ignore_runs=tuple(), ) -> pd.DataFrame: """ Find data that we can process. This data needs to: 1. (optional) be within the validity of a specified CMT version. Disable with exclude_from_invalid_cmt=False 2. Don't be some calibration mode (led/noise etc. data) 3. Not be available already (why would you want to reprocess that?) 4. Have the data which we need in order to compute this target. :param st: Context to run with :param targets: Data types to produce :param special_modes: list of modes to exclude to determine here (usually you can do this trivially, so no need to use this function) :param exclude_from_invalid_cmt: A CMT version whereof we will check that the CMT version extends to those ranges where we would like to reprocess. :param _max_workers: Max workers for finding the stored data :return: """ runs = st.select_runs(exclude_tags=('messy', 'abandoned')) core.log.info(f"Found {len(runs)} runs in total") if exclude_from_invalid_cmt: core.log.info('Find CMT validity') included = _within_valid_cmt_dates(runs, exclude_from_invalid_cmt) core.log.info(f"Found {np.sum(~included)}/{len(runs)} runs " f"outside of validity of {exclude_from_invalid_cmt}") runs = runs[included] if keep_detectors: correct_detector = _find_correct_detectors(runs, keep_detectors) core.log.info(f"Found {np.sum(~correct_detector)}/{len(runs)} runs " f"that do not have the {keep_detectors}-detector(s)") runs = runs[correct_detector] core.log.info('Find special modes') special_mode_mask = np.array( [any(x in mode for x in special_modes) for mode in runs['mode'].values] ) core.log.info(f"Found {np.sum(special_mode_mask)}/{len(runs)} special modes ({special_modes}) " f"Leave these alone for now.") runs = runs[~special_mode_mask] if ignore_runs: core.log.info(f'Ignoring runs: {ignore_runs}') ignore = np.in1d(runs['number'], ignore_runs) runs = runs[~ignore] core.log.info('Find already stored runs') already_done = st.select_runs(available=targets) already_done = np.in1d(runs['number'], already_done['number']) core.log.info(f"Found {np.sum(already_done)}/{len(runs)} runs where " f"the data is already stored") runs = runs[~already_done] core.log.warning(f'We are going to do a data-availability check for' f' {len(runs)} runs, this may take a while (~10 it/s)') core.log.info('Find runs with all requirements stored') has_base = strax.utils.multi_run( st.get_sources, runs['name'], targets, max_workers=_max_workers, multi_run_progress_bar=core.config['display']['progress_bar'] ) has_base = np.array(has_base) can_make = has_base['run_id'][has_base['can_make']] can_make = np.in1d(runs['name'], can_make) core.log.info(f"Found {np.sum(~can_make)}/{len(runs)} runs where there is no" f" source for {targets}") runs = runs[can_make] core.log.info(f"That leaves {len(runs)} runs to work on.") return runs
def _get_detectors(runs): return utilix.xent_collection().find( {'number': {'$in': [int(r) for r in runs]} }, projection={'number': True, 'detectors': True}, ) def _find_correct_detectors(runs, keep_detectors): """For each of the runs, find if the correct detector is in the list""" core.log.info('Find correct detector runs') if isinstance(keep_detectors, str): keep_detectors = [keep_detectors] dets = list(_get_detectors(runs['number'])) correct_detector = [] for r in runs['number']: for i, rd in enumerate(dets): if rd['number'] == int(r): det = rd.get('detectors', '?') if det == '?': raise ValueError(f'No detector for {r}:{rd}??') del dets[i] break else: raise ValueError('No rundoc?!') is_correct = any(d in det for d in keep_detectors) correct_detector.append(is_correct) correct_detector = np.array(correct_detector) return correct_detector @strax.context.Context.add_method def get_sources(self, r, targets, **kwargs): """Allow multithreading of st.get_source""" for t in strax.to_str_tuple(targets): if t not in self._plugin_class_registry: raise ValueError(f'One or more of {targets} is not in correct format') res = np.zeros(1, dtype=[('can_make', np.bool_)]) try: source = self.__get_source(r, targets) if source: res['can_make'][0] = source != set(targets) except Exception as e: core.log.error( f'No result for {r}, {targets} due to {e} is the data corrupted?' ) raise e return res # Copied from strax.Context.get_source for strax <= 1.1.3 (when this feature was added) @strax.context.Context.add_method def __get_source(self, run_id: str, target: str, check_forbidden: bool = True, ) -> ty.Union[set, None]: """ For a given run_id and target get the stored bases where we can start processing from, if no base is available, return None. :param run_id: run_id :param target: target :param check_forbidden: Check that we are not requesting to make a plugin that is forbidden by the context to be created. :return: set of plugin names that are needed to start processing from and are needed in order to build this target. """ try: return {plugin_name for plugin_name, plugin_stored in self.__stored_dependencies(run_id=run_id, target=target, check_forbidden=check_forbidden ).items() if plugin_stored} except strax.DataNotAvailable: return None # Copied from strax.Context.get_source for strax <= 1.1.3 (when this feature was added) @strax.context.Context.add_method def __stored_dependencies(self, run_id: str, target: ty.Union[str, list, tuple], check_forbidden: bool = True, _targets_stored: ty.Optional[dict] = None, ) -> ty.Optional[dict]: """ For a given run_id and target(s) get a dictionary of all the datatypes that: :param run_id: run_id :param target: target or a list of targets :param check_forbidden: Check that we are not requesting to make a plugin that is forbidden by the context to be created. :return: dictionary of data types (keys) required for building the requested target(s) and if they are stored (values) :raises strax.DataNotAvailable: if there is at least one data type that is not stored and has no dependency or if it cannot be created """ if _targets_stored is None: _targets_stored = {} targets = strax.to_str_tuple(target) if len(targets) > 1: # Multiple targets, do them all for dep in targets: self.__stored_dependencies(run_id, dep, check_forbidden=check_forbidden, _targets_stored=_targets_stored, ) return _targets_stored # Make sure we have the string not ('target',) target = targets[0] if target in _targets_stored: return this_target_is_stored = self.is_stored(run_id, target) _targets_stored[target] = this_target_is_stored if this_target_is_stored: return _targets_stored # Need to init the class e.g. if we want to allow depends on like this: # https://github.com/XENONnT/cutax/blob/d7ec0685650d03771fef66507fd6882676151b9b/cutax/cutlist.py#L33 # noqa plugin = self._plugin_class_registry[target]() dependencies = strax.to_str_tuple(plugin.depends_on) if not dependencies: raise strax.DataNotAvailable(f'Lowest level dependency {target} is not stored') forbidden = strax.to_str_tuple(self.context_config['forbid_creation_of']) if check_forbidden and target in forbidden: forbidden_warning = ( 'For {run_id}:{target}, you are not allowed to make {dep} and ' 'it is not stored. Disable check with check_forbidden=False' ) raise strax.DataNotAvailable( forbidden_warning.format(run_id=run_id, target=target, dep=target, )) self.__stored_dependencies(run_id, target=dependencies, check_forbidden=check_forbidden, _targets_stored=_targets_stored, ) return _targets_stored def _within_valid_cmt_dates(runs: pd.DataFrame, cmt_version: str) -> np.ndarray: v5_start, v5_end = utilix.rundb.cmt_global_valid_range(cmt_version) mask = pd.to_datetime(runs['end']) <= v5_end mask &= pd.to_datetime(runs['start']) >= v5_start return np.array(mask)