Source code for macsypy.utils

#########################################################################
# MacSyFinder - Detection of macromolecular systems in protein dataset  #
#               using systems modelling and similarity search.          #
# Authors: Sophie Abby, Bertrand Neron                                  #
# Copyright (c) 2014-2023  Institut Pasteur (Paris) and CNRS.           #
# See the COPYRIGHT file for details                                    #
#                                                                       #
# This file is part of MacSyFinder package.                             #
#                                                                       #
# MacSyFinder is free software: you can redistribute it and/or modify   #
# it under the terms of the GNU General Public License as published by  #
# the Free Software Foundation, either version 3 of the License, or     #
# (at your option) any later version.                                   #
#                                                                       #
# MacSyFinder is distributed in the hope that it will be useful,        #
# but WITHOUT ANY WARRANTY; without even the implied warranty of        #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the          #
# GNU General Public License for more details .                         #
#                                                                       #
# You should have received a copy of the GNU General Public License     #
# along with MacSyFinder (COPYING).                                     #
# If not, see <https://www.gnu.org/licenses/>.                          #
#########################################################################

"""
Some macsyfinder helper functions
"""

import os
import os.path
from itertools import groupby

from .registries import DefinitionLocation
from .error import MacsypyError


def get_def_to_detect(models, model_registry):
    """
    Resolve the definitions requested for one model family.

    :param models: a pair ``(root, def_names)``: *root* is the model name, optionally
                   followed by a sub-path, and *def_names* holds the names of the
                   definitions to look up below *root*. If one of the names is ``all``
                   (case insensitive) every definition below *root* is selected.
    :type models: tuple (str, sequence of str)
    :param model_registry: the models registry for this run, indexed by model family name.
    :type model_registry: :class:`macsypy.registries.ModelRegistry` object.
    :return: the definitions to parse, the model family name and the version of the model
    :rtype: tuple (list of :class:`macsypy.registries.DefinitionLocation` objects, str, model version)
    :raise: whatever the registry lookup raises when the model family is unknown
            (presumably ValueError -- to be confirmed against ModelRegistry).
    """
    root_path, requested = models
    # a trailing path separator must not end up in the definition names
    root_path = root_path.rstrip(os.path.sep)
    family = DefinitionLocation.root_name(root_path)
    location = model_registry[family]
    version = location.version

    lowered = [name.lower() for name in requested]
    if 'all' not in lowered:
        definitions = [location.get_definition(f'{root_path}/{name}') for name in requested]
        return definitions, family, version

    # 'all' asked at the top of the model: no restriction to a sub-path
    root_def_name = None if root_path == location.name else root_path
    definitions = location.get_all_definitions(root_def_name=root_def_name)
    return definitions, family, version
def get_replicon_names(genomee_path, db_type):
    """
    Give the names of the replicons held in a sequence file.

    :param str genomee_path: the path to the sequence file.
    :param str db_type: the kind of dataset: 'gembase', 'ordered_replicon' or 'unordered'.
    :return: for 'gembase', the replicon identifiers parsed from the file;
             otherwise a single name: the file name without directory and last extension.
    :rtype: list of str
    :raise MacsypyError: if *db_type* is not one of the supported values.
    """
    if db_type == 'gembase':
        return _get_gembase_replicon_names(genomee_path)
    if db_type not in ('ordered_replicon', 'unordered'):
        raise MacsypyError(f"Invalid genome type: {db_type}")
    # only one replicon per file: it is named after the file itself
    file_name = os.path.basename(genomee_path)
    stem, _extension = os.path.splitext(file_name)
    return [stem]
def _get_gembase_replicon_names(genome_path):
    """
    Read a gembase file and collect the replicon identifiers it contains.

    :param str genome_path: The path to a file containing sequences in **gembase** format
    :return: the replicon identifiers, one per run of consecutive sequences
             sharing the same replicon name, in file order
    :rtype: list of str
    """

    def replicon_of(seq_id):
        """
        In gembase a fasta identifier is built as <replicon-name>_<seq-name>:
        the <replicon-name> may contain '_' but the <seq-name> may not,
        so the replicon name is everything before the last '_'.
        """
        return seq_id.rpartition('_')[0]

    with open(genome_path, 'r') as fasta:
        headers = (row for row in fasta if row.startswith('>'))
        # the identifier is the first word of the header, without the leading '>'
        identifiers = [header.split()[0][1:] for header in headers]

    return [name for name, _members in groupby(identifiers, key=replicon_of)]
def threads_available():
    """
    :return: The maximal number of threads available.
             Where :func:`os.sched_getaffinity` exists (e.g. Linux) it is the number of CPUs
             the current process is allowed to run on, which plays nicely with cluster schedulers.
             Elsewhere (e.g. on Mac) it is the number of logical CPUs reported by
             :func:`os.cpu_count`, or 1 if that number cannot be determined.
    :rtype: int
    """
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # os.cpu_count() returns None when the number of CPUs is undetermined;
    # always hand back a usable int
    return os.cpu_count() or 1
def indent_wrapper(ElementTree):
    """
    xml.etree.ElementTree implements indent only from python 3.9.
    Below is the code from python 3.9, to inject it in ET at runtime.

    :param ElementTree: ElementTree class
    :type ElementTree: class
    :return: function indent
    :rtype: function
    """
    def indent(tree, space="  ", level=0):
        """Indent an XML document by inserting newlines and indentation space
        after elements.

        *tree* is the ElementTree or Element to modify.  The (root) element
        itself will not be changed, but the tail text of all elements in its
        subtree will be adapted.

        *space* is the whitespace to insert for each indentation level, two
        space characters by default.

        *level* is the initial indentation level. Setting this to a higher
        value than 0 can be used for indenting subtrees that are more deeply
        nested inside of a document.

        :raise ValueError: if *level* is negative.
        """
        if isinstance(tree, ElementTree):
            tree = tree.getroot()
        if level < 0:
            raise ValueError(f"Initial indentation level must be >= 0, got {level}")
        if not len(tree):
            # no children: nothing to indent
            return

        # Reduce the memory consumption by reusing indentation strings.
        indentations = ["\n" + level * space]

        def _indent_children(elem, level):
            # Start a new indentation level for the first child.
            child_level = level + 1
            try:
                child_indentation = indentations[child_level]
            except IndexError:
                child_indentation = indentations[level] + space
                indentations.append(child_indentation)

            if not elem.text or not elem.text.strip():
                elem.text = child_indentation

            for child in elem:
                if len(child):
                    _indent_children(child, child_level)
                if not child.tail or not child.tail.strip():
                    child.tail = child_indentation

            # Dedent after the last child by overwriting the previous indentation.
            if not child.tail.strip():
                child.tail = indentations[level]

        # 0 is an index in *indentations*, whose first item already accounts for *level*
        _indent_children(tree, 0)

    return indent
def parse_time(user_time):
    """
    Parse a user friendly time and return it in seconds.

    The user time supports the units s, m, h, d (case insensitive) for
    seconds, minutes, hours and days, or a combination of them:
    ``1h10m50s`` means 1 hour 10 minutes 50 seconds.
    All terms are converted to seconds and added.
    Whitespace around the terms is ignored. A value without unit is in seconds.

    :param user_time: the time to parse
    :type user_time: int or str
    :return: seconds
    :rtype: int
    :raise ValueError: if user_time is not parseable: unknown unit, number without unit
                       next to other terms, text which is not a term, or empty string.
    """
    try:
        # user time has no units, it's seconds
        return int(user_time)
    except ValueError:
        pass

    import re
    seconds_per_unit = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
    term = r'\s*(\d+)\s*([smhdSMHD])\s*'
    # the whole string must be made of terms, otherwise text silently skipped
    # by findall (or an empty string) would be turned into a bogus duration
    if not re.fullmatch(f'(?:{term})+', user_time):
        raise ValueError("Not valid time format. Units allowed d/h/m/s.")
    return sum(int(value) * seconds_per_unit[unit.lower()]
               for value, unit in re.findall(term, user_time))