Source code for chi.hardware

from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple

from chi import exception

from .clients import blazar
from .context import get, RESOURCE_API_URL

import requests
import logging

LOG = logging.getLogger(__name__)

node_types = []


[docs] @dataclass class Node: """ Represents the Chameleon hardware that goes into a single node. A dataclass for node information directly from the hardware browser. """ site: str name: str type: str architecture: dict bios: dict cpu: dict gpu: dict main_memory: dict network_adapters: List[dict] placement: dict storage_devices: List[dict] uid: str version: str
[docs] def next_free_timeslot( self, minimum_hours: int = 1 ) -> Tuple[datetime, Optional[datetime]]: """ Finds the next available timeslot for the hardware using the Blazar client. Args: minimum_hours (int, optional): The minimum number of hours for this timeslot. Returns: A tuple containing the start and end datetime of the next available timeslot. If no timeslot is available, returns (end_datetime_of_last_allocation, None). """ def get_host_id(items, target_uid): for item in items: if item.get("uid") == target_uid: return item["id"] return None blazarclient = blazar() # Get allocation for this specific host host_id = get_host_id(blazarclient.host.list(), self.uid) allocation = blazarclient.host.get_allocation(host_id) now = datetime.now(timezone.utc) if not allocation: return (now, None) reservations = sorted(allocation["reservations"], key=lambda x: x["start_date"]) buffer = timedelta(hours=minimum_hours) # Next time this interval could possibly start possible_start = now for i in range(len(reservations)): # Check we have enough time between last known free period and this reservation this_start = _parse_blazar_dt(reservations[i]["start_date"]) if possible_start + buffer < this_start: # We found a gap return (possible_start, this_start) # Otherwise, no possible start until end of this reservation this_end = _parse_blazar_dt(reservations[i]["end_date"]) possible_start = this_end # If there was no gap, use the last reservation's end time return (possible_start, None)
def _call_api(endpoint): url = "{0}/{1}.{2}".format(RESOURCE_API_URL, endpoint, "json") LOG.info("Requesting %s from reference API ...", url) resp = requests.get(url) LOG.info("Response received. Parsing to json ...") data = resp.json() return data
[docs] def get_nodes( all_sites: bool = False, filter_reserved: bool = False, gpu: Optional[bool] = None, min_number_cpu: Optional[int] = None, node_type: Optional[str] = None, ) -> List[Node]: """ Retrieve a list of nodes based on the specified criteria. Args: all_sites (bool, optional): Flag to indicate whether to retrieve nodes from all sites. Defaults to False. filter_reserved (bool, optional): Flag to indicate whether to filter out reserved nodes. Defaults to False. (Not Currently implemented) gpu (bool, optional): Flag to indicate whether to filter nodes based on GPU availability. Defaults to None. min_number_cpu (int, optional): Minimum number of CPU logical cores per node. Defaults to None. node_type (str, optional): The node type to filter by Returns: List[Node]: A list of Node objects that match the specified criteria. """ sites = [] if all_sites: sites = [site.get("name") for site in _call_api("sites")["items"]] else: sites.append(get("region_name")) nodes = [] for site in sites: # Soufiane: Skipping CHI@EDGE since it is not enrolled in the hardware API, if site == "CHI@Edge": print( "Please visit the Hardware discovery page for information about CHI@Edge devices" ) continue endpoint = f"sites/{site.split('@')[1].lower()}/clusters/chameleon/nodes" data = _call_api(endpoint) allocations = defaultdict(list) reserved_now = set() blazarclient = blazar() now = datetime.now(timezone.utc) if filter_reserved: hosts_by_id = {} for host in blazarclient.host.list(): hosts_by_id[host["id"]] = host for resource in blazarclient.host.list_allocations(): for allocation in resource["reservations"]: blazar_host = hosts_by_id.get(resource["resource_id"], None) if blazar_host: allocations[blazar_host["hypervisor_hostname"]].append( allocation ) if _reserved_now(allocation, now): reserved_now.add(blazar_host["hypervisor_hostname"]) for node_data in data["items"]: node = Node( site=site, name=node_data.get("node_name"), type=node_data.get("node_type"), architecture=node_data.get("architecture"), bios=node_data.get("bios"), cpu=node_data.get("processor"), gpu=node_data.get("gpu"), main_memory=node_data.get("main_memory"), network_adapters=node_data.get("network_adapters"), placement=node_data.get("placement"), storage_devices=node_data.get("storage_devices"), uid=node_data.get("uid"), version=node_data.get("version"), ) if node.type not in node_types: node_types.append(node.type) if isinstance(node.gpu, list): gpu_filter = gpu is None or ( node.gpu and gpu == bool(node.gpu[0]["gpu"]) ) else: gpu_filter = gpu is None or (node.gpu and gpu == bool(node.gpu["gpu"])) cpu_filter = ( min_number_cpu is None or node.architecture["smt_size"] >= min_number_cpu ) if ( gpu_filter and cpu_filter and (not filter_reserved or node.uid not in reserved_now) and (node_type is None or node.type == node_type) ): nodes.append(node) if node_type not in node_types: if all_sites: raise exception.CHIValueError( f"Unknown node_type '{node_type}' at all sites." ) else: raise exception.CHIValueError( f"Unknown node_type '{node_type}' at {get('region_name')}." ) return nodes
def _parse_blazar_dt(datetime_string): d = datetime.strptime(datetime_string, "%Y-%m-%dT%H:%M:%S.%f") return d.replace(tzinfo=timezone.utc) def _reserved_now(allocation, now=datetime.now(timezone.utc)): start_dt_object = _parse_blazar_dt(allocation["start_date"]) end_dt_object = _parse_blazar_dt(allocation["end_date"]) return start_dt_object < now and now < end_dt_object
[docs] def get_node_types() -> List[str]: """ Retrieve a list of unique node types. Returns: List[str]: A list of unique node types. """ if len(node_types) < 1: get_nodes() return list(set(node_types))