Source code for hsr4hci.htcondor

"""
Methods for working with the HTCondor cluster system.
"""

# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------

from typing import Deque, Dict, List, Optional, Sequence, Set, Union
from pathlib import Path

import sys

from collections import deque


# -----------------------------------------------------------------------------
# CLASS DEFINITIONS
# -----------------------------------------------------------------------------

class SubmitFile:
    """
    A class that provides the methods to comfortably create a submit file
    for the HTCondor cluster.

    The file consists of a general header (executable, resource requests,
    requirements), followed by one section per job added via `add_job()`.
    """

    # Horizontal rule used to visually separate sections of the submit file
    _SEPARATOR = '#' + 78 * '-'

    def __init__(
        self,
        clusterlogs_dir: Optional[Union[Path, str]] = None,
        executable: str = sys.executable,
        getenv: bool = True,
        memory: int = 8192,
        cpus: int = 1,
        gpus: int = 0,
        requirements: Sequence[str] = (),
    ):
        """
        Initialize a new submit file object.

        Args:
            clusterlogs_dir: The path to the folder where the log files
                for this submit file will be stored. The folder (and any
                missing parents) is created immediately. If `None` is
                given, no logs will be created and all output will be
                forwarded to ``/dev/null``.
            executable: The executable parameter for the submit file.
            getenv: Value written to the ``getenv`` parameter of the
                submit file (rendered as ``True`` / ``False``).
            memory: How much memory (in MB) to request from the cluster.
            cpus: The number of CPUs to request from the cluster.
            gpus: The number of GPUs to request from the cluster. The
                ``request_gpus`` line is only written if this is > 0.
            requirements: A sequence (usually: a list) with any additional
                requirements, for example, limitations on the available
                GPU memory, or the machines on which to run (e.g.,
                `"Target.CpuFamily =!= 21"` to block certain types of
                nodes from running the job). The entries are combined
                with ``&&``. A single string is treated as one requirement;
                blank entries are ignored.
        """

        # Store options for this submit file
        self.executable = executable
        self.getenv = getenv
        self.memory = memory
        self.cpus = cpus
        self.gpus = gpus
        self.requirements = requirements
        self.jobs: List[dict] = []

        # Make sure that the clusterlogs_dir exists
        if clusterlogs_dir is not None:
            self.clusterlogs_dir: Optional[Path] = Path(clusterlogs_dir)
            self.clusterlogs_dir.mkdir(exist_ok=True, parents=True)
        else:
            self.clusterlogs_dir = None

    def add_job(
        self,
        name: str,
        job_script: str,
        arguments: Dict[str, str],
        bid: int = 1,
        queue: int = 1,
    ) -> None:
        """
        Add a job to this submit file.

        Args:
            name: Name of the job; used for the section header and for
                the names of the log files.
            job_script: The script passed as first entry of ``arguments``.
            arguments: Mapping of option names to values; rendered as
                ``--key value`` pairs, sorted by key.
            bid: The bid for the job; written as ``priority = bid - 1000``.
            queue: How many instances of this job to queue.
        """
        self.jobs.append(
            dict(
                name=name,
                job_script=job_script,
                arguments=arguments,
                bid=bid,
                queue=queue,
            )
        )

    def _get_requirements(self) -> List[str]:
        """Return the requirements as a list of non-blank expressions."""

        # A bare string is itself a Sequence[str]; without this check it
        # would be split into its individual characters and joined with &&
        if isinstance(self.requirements, str):
            requirements = [self.requirements]
        else:
            requirements = list(self.requirements)

        # Blank entries would otherwise produce a dangling "&&"
        return [_ for _ in requirements if _.strip()]

    def _get_header_lines(self) -> List[str]:
        """Return the lines of the general header shared by all jobs."""

        lines = [
            self._SEPARATOR,
            '# GENERAL JOB REQUIREMENTS',
            self._SEPARATOR + '\n',
            f'executable = {self.executable}',
            f'getenv = {self.getenv}\n',
            f'request_memory = {self.memory}',
            f'request_cpus = {self.cpus}\n',
        ]

        # Only add request_gpus parameter if we are actually requesting GPUs
        if self.gpus > 0:
            lines.append(f'request_gpus = {self.gpus}\n')

        # Add requirements (e.g. CUDAGlobalMemory, black hole machines, ...)
        requirements = self._get_requirements()
        if requirements:
            requirements_string = ' && '.join(requirements)
            lines.append(f'requirements = {requirements_string}\n')

        lines.append(self._SEPARATOR + '\n\n\n')
        return lines

    def _get_job_lines(self, job: dict) -> List[str]:
        """Return the lines of the section describing a single job."""

        name = job['name']

        # Section header to make the submit file easier to read
        lines = [self._SEPARATOR, f'# {name.upper()}', self._SEPARATOR + '\n']

        # Add output, error and log file for job
        lines.append('# Logging Information')
        if self.clusterlogs_dir is not None:
            logs_dir = self.clusterlogs_dir.as_posix()
            for key, suffix in (('output', 'out'), ('error', 'err'), ('log', 'log')):
                lines.append(f'{key} = {logs_dir}/{name}.{suffix}.$(Process)')
        else:
            lines.append('output = /dev/null')
            lines.append('error = /dev/null')
            lines.append('log = /dev/null')
        lines.append('')

        # Add actual job and arguments (sorted by key for stable output)
        lines.append('# Actual arguments defining the job')
        job_arguments = job['arguments']
        arguments = ' '.join(
            f'--{key} {job_arguments[key]}' for key in sorted(job_arguments)
        )
        lines.append(f'arguments = {job["job_script"]} {arguments}\n')

        # Add the job priority; the bid is shifted down by 1000
        lines.append('# Job Priority')
        lines.append(f'priority = {job["bid"] - 1000}\n')

        # Queue the job
        lines.append('# Add this job to the queue')
        lines.append(f'queue {job["queue"]}\n')

        lines.append(self._SEPARATOR + '\n\n\n')
        return lines

    def __str__(self) -> str:
        """Render the complete submit file as a string."""

        contents = self._get_header_lines()
        for job in self.jobs:
            contents.extend(self._get_job_lines(job))
        return '\n'.join(contents)

    def save(self, file_path: Union[Path, str]) -> None:
        """Write the rendered submit file to `file_path` (overwriting it)."""

        with open(file_path, 'w') as submit_file:
            submit_file.write(self.__str__())
class Node:
    """
    A single vertex of a directed acyclic graph.

    Every node is identified by its `name`, carries an arbitrary
    `attributes` dictionary, and keeps the names (not the objects) of the
    nodes that depend on it in `dependent_nodes`.
    """

    def __init__(self, name: str, attributes: dict) -> None:
        self.name = name
        self.attributes = attributes

        # Names of the child nodes that depend on this node
        self.dependent_nodes: Set[str] = set()

    def add_dependent_node(self, child_node_name: str) -> None:
        """
        Register `child_node_name` as depending on this node.

        Adding the same name more than once has no further effect.
        """
        self.dependent_nodes |= {child_node_name}
class DAGFile:
    """
    Create submit files for HTCondor's DAGman.

    The graph maps node names to `Node` objects. Each node stores the names
    of the nodes that depend on it, i.e., edges point from parent to child.
    """

    def __init__(self) -> None:
        # Maps node names to the corresponding Node objects
        self.graph: Dict[str, Node] = dict()

    @property
    def nodes(self) -> List[Node]:
        """All nodes of the graph, in insertion order."""
        return list(self.graph.values())

    @property
    def node_names(self) -> List[str]:
        """The names of all nodes of the graph, in insertion order."""
        return list(self.graph.keys())

    @property
    def dependent_nodes(self) -> Set[Node]:
        """All nodes that are the child of at least one other node."""
        return {
            self.graph[child_name]
            for node in self.nodes
            for child_name in node.dependent_nodes
        }

    @property
    def independent_nodes(self) -> Set[Node]:
        """All nodes that do not depend on any other node."""
        return set(self.nodes).difference(self.dependent_nodes)

    def get_node_by_name(self, node_name: str) -> Node:
        """Return the node called `node_name` (KeyError if it is unknown)."""
        return self.graph[node_name]

    def check_validity(self) -> None:
        """
        Check that the graph can be used as a DAG.

        Raises:
            ValueError: If there is no independent node (this includes the
                empty graph), or if the graph contains a cycle.
        """

        # Make sure there's at least one independent node as a starting point
        if not self.independent_nodes:
            raise ValueError('No independent nodes!')

        # Make sure the graph is acyclic (this will raise a ValueError if not)
        self.topological_sort()

    def add_submit_file(self, name: str, attributes: Optional[dict]) -> None:
        """
        Add a new node (i.e., a submit file) to the graph.

        Args:
            name: Unique name of the node.
            attributes: Must contain a "file_path" key; "bid" defaults to 1.
                If `None`, `dict(file_path=None, bid=1)` is used. The dict
                is copied, so the caller's dict is never modified.

        Raises:
            KeyError: If a node with this name already exists.
            ValueError: If `attributes` has no "file_path" key.
        """

        if name in self.graph:
            raise KeyError(f'Node "{name}" already exists!')

        if attributes is None:
            attributes = dict(file_path=None, bid=1)
        else:
            # Copy so that filling in the default bid does not mutate (or
            # share state through) the dict that the caller passed in
            attributes = dict(attributes)

        if 'file_path' not in attributes:
            raise ValueError('attributes is missing file_path!')
        attributes.setdefault('bid', 1)

        self.graph[name] = Node(name=name, attributes=attributes)

    def add_dependency(
        self, parent_node_name: str, child_node_name: str
    ) -> None:
        """
        Add an edge stating that `child_node_name` depends on
        `parent_node_name`.

        If the new edge makes the graph invalid, it is removed again before
        the error is re-raised, so the graph stays in its previous state.

        Raises:
            KeyError: If either node does not exist.
            ValueError: If the edge would make the graph invalid (see
                `check_validity()`).
        """

        # Validate both endpoints before touching the graph
        for node_name in (parent_node_name, child_node_name):
            if node_name not in self.graph:
                raise KeyError(f'Node "{node_name}" does not exist!')

        parent = self.graph[parent_node_name]
        is_new_edge = child_node_name not in parent.dependent_nodes
        parent.add_dependent_node(child_node_name)

        try:
            self.check_validity()
        except ValueError:
            # Roll back the rejected edge; otherwise every later call to
            # check_validity() / topological_sort() / __str__ would fail
            if is_new_edge:
                parent.dependent_nodes.discard(child_node_name)
            raise

    def topological_sort(self) -> List[Node]:
        """
        Return all nodes so that every parent comes before its children.

        Uses Kahn's algorithm with a FIFO queue. Nodes without parents are
        visited in insertion order, and children in sorted name order, so
        the result is deterministic.

        Raises:
            ValueError: If the graph contains a cycle.
        """

        # Count the number of parents of every node
        in_degree = {node_name: 0 for node_name in self.graph}
        for node in self.nodes:
            for child_name in node.dependent_nodes:
                in_degree[child_name] += 1

        # Start with all nodes that have no parents
        queue: Deque[str] = deque(
            node_name for node_name, degree in in_degree.items() if degree == 0
        )

        sorted_nodes = []
        while queue:
            node_name = queue.popleft()
            sorted_nodes.append(self.graph[node_name])
            # Sort child names: iterating a set of strings would make the
            # order depend on hash randomization
            for child_name in sorted(self.graph[node_name].dependent_nodes):
                in_degree[child_name] -= 1
                if in_degree[child_name] == 0:
                    queue.append(child_name)

        # If not every node was reached, some nodes are part of a cycle
        if len(sorted_nodes) == len(self.graph):
            return sorted_nodes
        raise ValueError('Graph is not acyclic!')

    def __str__(self) -> str:
        """Render the DAG file (JOB, PARENT/CHILD and PRIORITY lines)."""

        # Compute the order once; it is used for all three sections
        sorted_nodes = self.topological_sort()
        contents: List[str] = []

        # Add all jobs (i.e., nodes)
        for node in sorted_nodes:
            contents.append(f'JOB {node.name} {node.attributes["file_path"]}')
        contents.append('')

        # Add all dependencies (i.e, edges), sorted for reproducible output
        for node in sorted_nodes:
            for child_name in sorted(node.dependent_nodes):
                contents.append(f'PARENT {node.name} CHILD {child_name}')
        contents.append('')

        # Add bids for submit files (the bid is shifted down by 1000)
        for node in sorted_nodes:
            bid = node.attributes["bid"] - 1000
            contents.append(f'PRIORITY {node.name} {bid}')

        return '\n'.join(contents)

    def save(self, file_path: Union[Path, str]) -> None:
        """Write the rendered DAG file to `file_path` (overwriting it)."""

        with open(file_path, 'w') as dag_file:
            dag_file.write(self.__str__())