Source code for chi.hardware

import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Set, Tuple

import pandas as pd
import requests
from ipydatagrid import DataGrid, Expr, TextRenderer
from IPython.display import display
from ipywidgets import HTML, Box, Layout

from chi import exception

from .clients import blazar, connection
from .context import EDGE_RESOURCE_API_URL, RESOURCE_API_URL, get, session

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Node types seen so far. get_nodes() appends every new type it encounters and
# get_node_types() reads from it, so this list is shared state across calls.
node_types = []


def _get_next_free_timeslot(allocation, minimum_hours):
    now = datetime.now(timezone.utc)

    if not allocation:
        return (now, None)

    reservations = sorted(allocation["reservations"], key=lambda x: x["start_date"])

    buffer = timedelta(hours=minimum_hours)
    # Next time this interval could possibly start
    possible_start = now
    for i in range(len(reservations)):
        # Check we have enough time between last known free period and this reservation
        this_start = _parse_blazar_dt(reservations[i]["start_date"])
        if possible_start + buffer < this_start:
            # We found a gap
            return (possible_start, this_start)

        # Otherwise, no possible start until end of this reservation
        this_end = _parse_blazar_dt(reservations[i]["end_date"])
        possible_start = this_end
    # If there was no gap, use the last reservation's end time
    return (possible_start, None)


@dataclass
class Node:
    """
    Represents the Chameleon hardware that goes into a single node.
    A dataclass for node information directly from the hardware browser.
    """

    site: str
    name: str
    type: str
    architecture: dict
    bios: dict
    cpu: dict
    gpu: dict
    main_memory: dict
    network_adapters: List[dict]
    placement: dict
    storage_devices: List[dict]
    uid: str
    version: str
    reservable: bool

    def next_free_timeslot(
        self, minimum_hours: int = 1
    ) -> Tuple[datetime, Optional[datetime]]:
        """
        Finds the next available timeslot for the hardware using the Blazar client.

        Args:
            minimum_hours (int, optional): The minimum number of hours for this timeslot.

        Returns:
            A tuple containing the start and end datetime of the next available timeslot.
            If no timeslot is available, returns (end_datetime_of_last_allocation, None).

        Raises:
            exception.ServiceError: If no Blazar host matches this node's ``uid``.
        """

        def get_host_id(items, target_uid):
            # A Blazar host matches on either its "uid" or its "hypervisor_hostname"
            for item in items:
                if (
                    item.get("uid") == target_uid
                    or item.get("hypervisor_hostname") == target_uid
                ):
                    return item["id"]
            return None

        blazarclient = blazar()

        # Get allocation for this specific host
        host_id = get_host_id(blazarclient.host.list(), self.uid)
        if not host_id:
            raise exception.ServiceError(f"Host for {self.uid} not found in Blazar")

        return _get_next_free_timeslot(
            blazarclient.host.get_allocation(host_id), minimum_hours
        )

    def _ipython_display_(self):
        """
        Displays information about the node.
        This function is called passively by the Jupyter display system.
        """
        layout = Layout(padding="4px 10px")
        style = {
            "description_width": "initial",
            "background": "#d3d3d3",
            "white_space": "nowrap",
        }
        reservable_style = {
            "description_width": "initial",
            "background": " #a2d9fe",
            "white_space": "nowrap",
        }
        if not self.reservable:
            reservable_style["background"] = "#f69084"

        children = [
            HTML(f"<b>Node Name:</b> {self.name}", style=style, layout=layout),
            HTML(f"<b>Site:</b> {self.site}", style=style, layout=layout),
            HTML(f"<b>Type:</b> {self.type}", style=style, layout=layout),
        ]

        # Skip the clock speed when the key exists but carries no value, instead
        # of failing on the division.
        if (
            getattr(self, "cpu", False)
            and "clock_speed" in self.cpu
            and self.cpu["clock_speed"] is not None
        ):
            children.append(
                HTML(
                    f"<b>Clock Speed:</b> {self.cpu['clock_speed'] / 1e9:.2f} GHz",
                    style=style,
                    layout=layout,
                )
            )

        if (
            getattr(self, "main_memory", False)
            and "humanized_ram_size" in self.main_memory
        ):
            children.append(
                HTML(
                    f"<b>RAM:</b> {self.main_memory['humanized_ram_size']}",
                    style=style,
                    layout=layout,
                )
            )

        if getattr(self, "gpu", False) and "gpu" in self.gpu and self.gpu["gpu"]:
            if "gpu_count" in self.gpu:
                children.append(
                    HTML(
                        f"<b>GPU Count:</b> {self.gpu['gpu_count']}",
                        style=style,
                        layout=layout,
                    )
                )
            else:
                children.append(HTML("<b>GPU:</b> True", style=style, layout=layout))
            if "gpu_model" in self.gpu:
                children.append(
                    HTML(
                        f"<b>GPU Model:</b> {self.gpu['gpu_model']}",
                        style=style,
                        layout=layout,
                    )
                )
        else:
            children.append(HTML("<b>GPU Count:</b> 0", style=style, layout=layout))

        if (
            getattr(self, "storage_devices", False)
            and len(self.storage_devices) > 0
            and "humanized_size" in self.storage_devices[0]
        ):
            children.append(
                HTML(
                    f"<b>Storage Size:</b> {self.storage_devices[0]['humanized_size']}",
                    style=style,
                    layout=layout,
                )
            )

        # Show the flag whenever it is known. Testing truthiness here would hide
        # the badge for non-reservable nodes, making the "No" text and its colour
        # unreachable.
        if getattr(self, "reservable", None) is not None:
            children.append(
                HTML(
                    f"<b>Reservable:</b> {'Yes' if self.reservable else 'No'}",
                    style=reservable_style,
                    layout=layout,
                )
            )

        box = Box(children=children)
        box.layout = Layout(flex_flow="row wrap")
        display(box)
def _call_api(endpoint):
    """
    Fetch ``<RESOURCE_API_URL>/<endpoint>.json`` and return the decoded JSON body.

    Args:
        endpoint (str): Path below ``RESOURCE_API_URL``, without the ``.json`` suffix.

    Returns:
        The parsed JSON payload of the response.
    """
    request_url = f"{RESOURCE_API_URL}/{endpoint}.json"
    LOG.info("Requesting %s from reference API ...", request_url)
    response = requests.get(request_url)
    LOG.info("Response received. Parsing to json ...")
    return response.json()
def get_nodes(
    all_sites: bool = False,
    filter_reserved: bool = False,
    gpu: Optional[bool] = None,
    min_number_cpu: Optional[int] = None,
    node_type: Optional[str] = None,
) -> List[Node]:
    """
    Retrieve a list of nodes based on the specified criteria.

    Every node type encountered is also recorded in the module-level
    ``node_types`` list.

    Args:
        all_sites (bool, optional): Flag to indicate whether to retrieve nodes from all sites.
            Defaults to False.
        filter_reserved (bool, optional): Flag to indicate whether to filter out reserved nodes.
            Defaults to False.
        gpu (bool, optional): Flag to indicate whether to filter nodes based on GPU availability.
            ``True`` keeps only nodes with a GPU, ``False`` only nodes without one.
            Defaults to None (no GPU filtering).
        min_number_cpu (int, optional): Minimum number of CPU logical cores per node.
            Compared against the node's ``architecture["smt_size"]``. Defaults to None.
        node_type (str, optional): The node type to filter by

    Returns:
        List[Node]: A list of Node objects that match the specified criteria.

    Raises:
        exception.CHIValueError: If ``node_type`` is given but is not among the
            node types seen so far.
    """
    if all_sites:
        sites = [site.get("name") for site in _call_api("sites")["items"]]
    else:
        sites = [get("region_name")]

    nodes = []

    for site in sites:
        # CHI@Edge is skipped since it is not enrolled in the hardware API
        if site == "CHI@Edge":
            print("See `hardware.get_devices` for information about CHI@Edge devices")
            continue

        reserved_now = set()
        blazarclient = blazar()
        now = datetime.now(timezone.utc)

        endpoint = f"sites/{site.split('@')[1].lower()}/clusters/chameleon/nodes"
        # Fetch the hardware listing and the Blazar hosts concurrently
        with ThreadPoolExecutor() as executor:
            f1 = executor.submit(_call_api, endpoint)
            f2 = executor.submit(blazarclient.host.list)
            data = f1.result()
            blazar_hosts = f2.result()

        blazar_hosts_by_id = {host["id"]: host for host in blazar_hosts}
        blazar_hosts_by_hypervisor_hostname = {
            host["hypervisor_hostname"]: host for host in blazar_hosts
        }

        if filter_reserved:
            for resource in blazarclient.host.list_allocations():
                # Allocations reference the Blazar host id; nodes are matched by
                # hypervisor hostname, so translate through the host record.
                blazar_host = blazar_hosts_by_id.get(resource["resource_id"])
                if not blazar_host:
                    continue
                if any(
                    _reserved_now(allocation, now)
                    for allocation in resource["reservations"]
                ):
                    reserved_now.add(blazar_host["hypervisor_hostname"])

        for node_data in data["items"]:
            blazar_host = blazar_hosts_by_hypervisor_hostname.get(
                node_data.get("uid"), {}
            )

            node = Node(
                site=site,
                name=node_data.get("node_name"),
                type=node_data.get("node_type"),
                architecture=node_data.get("architecture"),
                bios=node_data.get("bios"),
                cpu=node_data.get("processor"),
                gpu=node_data.get("gpu"),
                main_memory=node_data.get("main_memory"),
                network_adapters=node_data.get("network_adapters"),
                placement=node_data.get("placement"),
                storage_devices=node_data.get("storage_devices"),
                uid=node_data.get("uid"),
                version=node_data.get("version"),
                reservable=blazar_host.get("reservable"),
            )

            if node.type not in node_types:
                node_types.append(node.type)

            # GPU data arrives either as a dict or as a list of dicts; missing or
            # empty data means "no GPU", so that gpu=False still keeps the node.
            gpu_info = node.gpu
            if isinstance(gpu_info, list):
                gpu_info = gpu_info[0] if gpu_info else None
            has_gpu = bool(gpu_info and gpu_info.get("gpu"))
            gpu_filter = gpu is None or gpu == has_gpu

            cpu_filter = (
                min_number_cpu is None
                or ((node.architecture or {}).get("smt_size") or 0) >= min_number_cpu
            )

            free_and_reservable = node.uid not in reserved_now and node.reservable

            if (
                gpu_filter
                and cpu_filter
                and (not filter_reserved or free_and_reservable)
                and (node_type is None or node.type == node_type)
            ):
                nodes.append(node)

    if node_type is not None and node_type not in node_types:
        location = "all sites" if all_sites else get("region_name")
        raise exception.CHIValueError(
            f"Unknown node_type '{node_type}' at {location}."
        )

    return nodes
def _parse_blazar_dt(datetime_string): d = datetime.strptime(datetime_string, "%Y-%m-%dT%H:%M:%S.%f") return d.replace(tzinfo=timezone.utc) def _reserved_now(allocation, now=datetime.now(timezone.utc)): start_dt_object = _parse_blazar_dt(allocation["start_date"]) end_dt_object = _parse_blazar_dt(allocation["end_date"]) return start_dt_object < now and now < end_dt_object
def get_node_types() -> List[str]:
    """
    Retrieve a list of unique node types.

    When no node type has been recorded yet, ``get_nodes()`` is called first so
    that the module-level ``node_types`` list gets populated.

    Returns:
        List[str]: A list of unique node types.
    """
    if not node_types:
        get_nodes()
    unique_types = set(node_types)
    return list(unique_types)
# Cell background for the "Reservable" column of show_nodes():
# "#a2d9fe" when the cell value is truthy, "#f69084" otherwise.
# NOTE(review): this function is wrapped in ipydatagrid's Expr; presumably its
# source must stay a single simple expression — confirm before restructuring.
def _reservable_color(cell):
    return "#a2d9fe" if cell.value else "#f69084"


# Cell background for the GPU columns of show_nodes(): "#d3d3d3" when the cell
# value is falsy (no GPU data), None otherwise so no colour is supplied.
# NOTE(review): also wrapped in ipydatagrid's Expr — same caveat as above.
def _gpu_background_color(cell):
    return "#d3d3d3" if not cell.value else None
def show_nodes(nodes: Optional[List[Node]] = None) -> None:
    """
    Display a sortable, filterable table of available nodes.

    Args:
        nodes (Optional[List[Node]], optional): A list of Node objects to display.
            If not provided (or empty), defaults to the output of hardware.get_nodes().

    Returns:
        None
    """

    def estimate_column_width(df, column, char_px=7, padding=0):
        # Width in pixels from the longest rendered value, never below 80px
        if column not in df.columns:
            raise ValueError(f"Column '{column}' not found in DataFrame.")
        max_chars = df[column].astype(str).map(len).max()
        return max(max_chars * char_px + padding, 80)

    if not nodes:
        nodes = get_nodes()

    rows = []
    for n in nodes:
        # List-shaped GPU data is displayed as "no GPU information"
        fixed_gpu = {} if not n.gpu or isinstance(n.gpu, list) else n.gpu
        # Hardware sections may be missing (None) for a node; treat them as empty
        cpu = n.cpu or {}
        main_memory = n.main_memory or {}
        first_disk = n.storage_devices[0] if n.storage_devices else {}
        rows.append(
            {
                "Node Name": n.name,
                "Type": n.type,
                "Clock Speed (GHz)": round((cpu.get("clock_speed") or 0) / 1e9, 2),
                "RAM": main_memory.get("humanized_ram_size", "N/A"),
                "GPU Model": fixed_gpu.get("gpu_model") or "",
                "GPU Count": fixed_gpu.get("gpu_count") or "",
                "Storage Size": first_disk.get("humanized_size", "N/A"),
                "Site": n.site,
                "Reservable": bool(n.reservable),
            }
        )

    if not rows:
        # An empty frame has no columns, so the width estimation below would fail
        print("No nodes to display.")
        return

    df = pd.DataFrame(rows)

    renderers = {
        "Reservable": TextRenderer(
            text_color="black",
            background_color=Expr(_reservable_color),
        ),
        "GPU Model": TextRenderer(
            background_color=Expr(_gpu_background_color),
        ),
        "GPU Count": TextRenderer(
            background_color=Expr(_gpu_background_color),
        ),
    }

    grid = DataGrid(
        df,
        layout=Layout(height="400px"),
        selection_mode="row",
        renderers=renderers,
        column_widths={
            "Node Name": int(estimate_column_width(df, "Node Name")),
            "Site": int(estimate_column_width(df, "Site")),
            "Type": int(estimate_column_width(df, "Type")),
            "RAM": int(estimate_column_width(df, "RAM")),
            "Storage Size": int(estimate_column_width(df, "Storage Size")),
            "Clock Speed (GHz)": 55,
            "GPU Model": 90,
            "GPU Count": 30,
            "key": 30,
            "Reservable": 55,
        },
        # NOTE(review): duplicates the positional dataframe above — confirm
        # whether DataGrid actually needs this keyword.
        df=pd.DataFrame(rows),
    )

    display(grid)
@dataclass
class Device:
    """
    Device information as reported by the hardware browser, stored as a dataclass.
    """

    device_name: str
    device_type: str
    supported_device_profiles: List[str]
    authorized_projects: Set[str]
    owning_project: str
    uuid: str
    reservable: bool

    def next_free_timeslot(
        self, minimum_hours: int = 1
    ) -> Tuple[datetime, Optional[datetime]]:
        """
        Look up the next free timeslot for this device through the Blazar client.

        Args:
            minimum_hours (int, optional): Shortest acceptable timeslot, in hours.

        Returns:
            A ``(start, end)`` tuple for the next available timeslot. When no
            timeslot is available the result is
            ``(end_datetime_of_last_allocation, None)``.

        Raises:
            exception.ServiceError: If Blazar lists no device whose ``uid`` equals
                this device's ``uuid``.
        """
        client = blazar()

        # First Blazar device whose uid matches this device's uuid
        device_id = next(
            (
                entry["id"]
                for entry in client.device.list()
                if entry.get("uid") == self.uuid
            ),
            None,
        )
        if not device_id:
            raise exception.ServiceError(f"Device for {self.uuid} not found in Blazar")

        # Bug in Blazar API for devices means `get_allocation` doesn't work.
        # We get around this by scanning the full allocation list; the last
        # matching record wins.
        matching = [
            record
            for record in client.device.list_allocations()
            if record["resource_id"] == device_id
        ]
        device_alloc = matching[-1] if matching else None
        return _get_next_free_timeslot(device_alloc, minimum_hours)
def get_devices(
    device_type: Optional[str] = None,
    filter_reserved: bool = False,
    filter_unauthorized: bool = True,
) -> List[Device]:
    """
    Retrieve a list of devices based on the specified criteria.

    Args:
        device_type (str, optional): The device type to filter by
        filter_reserved (bool, optional): Flag to indicate whether to filter out reserved devices.
            Defaults to False.
        filter_unauthorized (bool, optional): Filter devices that the current project is not
            authorized to use

    Returns:
        List[Device]: A list of Device objects that match the specified criteria.

    Raises:
        exception.ServiceError: If the hardware API responds with an HTTP error status.
    """
    # Query hardware API
    res = requests.get(EDGE_RESOURCE_API_URL)
    try:
        res.raise_for_status()
    except requests.exceptions.HTTPError as err:
        # Chain the original error so the underlying HTTP failure stays visible
        raise exception.ServiceError(
            f"Failed to get devices. Status code {res.status_code}"
        ) from err

    blazarclient = blazar()
    # Blazar uid matches doni's uuid, so we need to map blazar id to blazar uid for allocations,
    # and uid to id for reservable status
    blazar_devices_by_id = {}
    blazar_devices_by_uid = {}
    for device in blazarclient.device.list():
        blazar_devices_by_id[device["id"]] = device
        blazar_devices_by_uid[device["uid"]] = device

    devices = []
    for dev_json in res.json():
        blazar_device = blazar_devices_by_uid.get(dev_json.get("uuid"), {})
        devices.append(
            Device(
                device_name=dev_json["device_name"],
                device_type=dev_json["device_type"],
                supported_device_profiles=dev_json["supported_device_profiles"],
                authorized_projects=set(dev_json["authorized_projects"]),
                owning_project=dev_json["owning_project"],
                uuid=dev_json["uuid"],
                # not all devices will appear in blazar if registration failed
                reservable=blazar_device.get("reservable", False),
            )
        )

    # Filter based on authorized projects
    if filter_unauthorized:
        conn = connection(session=session())
        current_project_id = conn.current_project_id
        devices = [
            device
            for device in devices
            if "all" in device.authorized_projects
            or current_project_id in device.authorized_projects
        ]

    # Filter based on device type
    if device_type:
        devices = [device for device in devices if device.device_type == device_type]

    # Filter based on reserved status
    if filter_reserved:
        now = datetime.now(timezone.utc)
        reserved_devices = set()
        for resource in blazarclient.device.list_allocations():
            blazar_device = blazar_devices_by_id.get(resource["resource_id"])
            if blazar_device and any(
                _reserved_now(allocation, now)
                for allocation in resource["reservations"]
            ):
                reserved_devices.add(blazar_device["uid"])
        # Ensure the device is free and in `reservable` state
        devices = [
            device
            for device in devices
            if device.uuid not in reserved_devices and device.reservable
        ]

    return devices
def get_device_types() -> List[str]:
    """
    Retrieve a list of unique device types.

    The types are collected from the devices returned by ``get_devices()``
    called with its default arguments.

    Returns:
        List[str]: A list of unique device types.
    """
    unique_types = {device.device_type for device in get_devices()}
    return list(unique_types)