Source code for mpqp.execution.connection.qlm_connection

from __future__ import annotations

import os
from getpass import getpass
from typing import Any

from termcolor import colored
from typeguard import typechecked

from mpqp.execution.connection.env_manager import (
    get_env_variable,
    load_env_variables,
    save_env_variable,
)
from mpqp.tools.errors import QLMRemoteExecutionError

# Module-wide cache for the QLMaaS connection. It starts empty and is filled by
# get_QLMaaSConnection() the first time a connection is requested, so that later
# calls reuse the same object.
QLM_connection = None


@typechecked
def config_qlm_account(username: str, password: str, global_config: bool) -> bool:
    """Configures and saves locally QLM account's information.

    The username (and, for an MPQP-only configuration, the password) is stored
    with ``save_env_variable``. For a global configuration the credentials are
    written to ``~/.netrc`` instead. ``QLMaaSConnection.create_config()`` is
    then called; if anything fails, the previous environment variables and, in
    global mode, the previous ``~/.netrc`` content are put back.

    Args:
        username: QLM username.
        password: QLM password.
        global_config: If True, this QLM account will be configured to work even
            outside MPQP.

    Returns:
        ``True`` if the account was configured, ``False`` if the failure message
        contains ``"Invalid credential"``.

    Raises:
        QLMRemoteExecutionError: If the account could not be saved.
    """
    # Remember the previous state so it can be restored if the setup fails.
    prev_user = get_env_variable("QLM_USER")
    prev_pass = get_env_variable("QLM_PASSWD")
    prev_configure = get_env_variable("QLM_CONFIGURED")
    if prev_configure == "":
        prev_configure = "False"

    file_content = None
    netrc_path = os.path.expanduser("~") + "/.netrc"
    if global_config:
        try:
            with open(netrc_path, "r") as file:
                file_content = file.read()
        except (OSError, UnicodeDecodeError):
            # No readable ~/.netrc to back up: on failure the file written
            # below is simply removed.
            file_content = None

    # Store the username (and password for an MPQP-only setup) in the
    # environment variables QLM_USER and QLM_PASSWD in .mpqp
    save_env_variable("QLM_USER", username)
    if not global_config:
        save_env_variable("QLM_PASSWD", password)

    try:
        if global_config:
            # Create (or truncate) ~/.netrc with owner-only permissions from
            # the start, so the password is never written to a file that other
            # users could read.
            fd = os.open(netrc_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, "w") as file:
                file.write(
                    f"machine qlm35e.neasqc.eu\nlogin {username}\npassword {password}"
                )
            # The mode given to os.open is ignored when the file already
            # existed, so enforce read and write for the user only.
            os.chmod(netrc_path, 0o600)
        else:
            # NOTE(review): if the user accepts the rename and the setup then
            # fails, '~/.netrc_back' is not renamed back by the handler below.
            if os.path.exists(netrc_path):
                rename_decision = input(
                    "'~/.netrc' already exists and will override the configuration. "
                    "Do you want to rename it in '~/.netrc_back'? \n[Y/n]"
                )
                if rename_decision.lower().strip() in ("", "y", "yes"):
                    os.rename(netrc_path, netrc_path + "_back")

        from qat.qlmaas import (
            QLMaaSConnection,  # pyright: ignore[reportAttributeAccessIssue]
        )

        c = QLMaaSConnection(
            hostname="qlm35e.neasqc.eu",
            port=443,
            authentication="password",
            timeout=2500,
        )
        c.create_config()
    except Exception as err:
        # Roll back everything that was changed above.
        save_env_variable("QLM_USER", prev_user)
        save_env_variable("QLM_PASSWD", prev_pass)
        save_env_variable("QLM_CONFIGURED", prev_configure)
        if global_config:
            if file_content is None:
                try:
                    os.remove(netrc_path)
                except FileNotFoundError:
                    print(f"{netrc_path} does not exist.")
            else:
                with open(netrc_path, "w") as file:
                    file.write(file_content)
        if "Invalid credential" in str(err):
            return False
        raise QLMRemoteExecutionError(
            "Error when saving the account.\nTrace: " + str(err)
        ) from err
    return True
def setup_qlm_account() -> tuple[str, list[Any]]:
    """Setups the QLM account, by looking at the existing configuration, asking
    for username/password and updating the current account.

    Returns:
        A message and an empty list. The message is ``"Canceled."`` when the
        user declines to update an existing account,
        ``"QLM account correctly configured"`` on success, and an empty string
        when the credentials were rejected.
    """
    # Answers counted as "yes"; the same spellings config_qlm_account accepts
    # for its own prompt, so "yes" is no longer read as a refusal.
    affirmative = ("y", "yes")

    if get_env_variable("QLM_CONFIGURED") == "True":
        decision = input(
            "An QLM account is already configured. Do you want to update it? [y/N] "
        )
        if decision.lower().strip() not in affirmative:
            return "Canceled.", []

    username = input("Enter your QLM username: ")
    password = getpass("Enter your QLM password (hidden): ")
    global_decision = input(
        "Do you want to configure this account only for MPQP? [y/N] "
    )
    # The question is "only for MPQP?", so a yes means a non-global setup and
    # any other answer (including the default) means a global one.
    global_config = global_decision.lower().strip() not in affirmative

    if config_qlm_account(username, password, global_config):
        save_env_variable("QLM_CONFIGURED", "True")
        return "QLM account correctly configured", []

    print(colored("Wrong credential", "red"))
    getpass("Press 'Enter' to continue")
    return "", []
def get_all_job_ids() -> list[str]:
    """Retrieves from the remote QLM all the job-ids associated with this
    account.

    When ``QLM_CONFIGURED`` is not ``"True"``, no connection is attempted and
    an empty list is returned.

    Returns:
        List of all job-ids associated with this account.

    Example:
        >>> get_all_job_ids()
        ['Job144361', 'Job144360', 'Job144359', 'Job144358', 'Job144357', 'Job143334', 'Job143333', 'Job143332', 'Job141862', 'Job141861', 'Job141722', 'Job141720', 'Job141716', 'Job141715', 'Job141712', 'Job19341']

    """
    if get_env_variable("QLM_CONFIGURED") != "True":
        return []

    jobs_info = get_QLMaaSConnection().get_jobs_info()
    return [info.id for info in jobs_info]
def get_QLMaaSConnection():
    """Connects to the QLM and returns the QLMaaSConnection.

    If the connection was already established, we only return the one stored
    in the global variable, otherwise we instantiate a new QLMaaSConnection and
    store it for later calls.

    Returns:
        The ``QLMaaSConnection`` stored in the module-level ``QLM_connection``.

    Raises:
        IOError: If the environment variables could not be loaded.
        QLMRemoteExecutionError: If ``QLM_CONFIGURED`` is not ``"True"``.
    """
    global QLM_connection
    if QLM_connection is not None:
        return QLM_connection

    if not load_env_variables():
        raise IOError("Could not load environment variables from ~/.mpqp")

    # Anything other than "True" counts as "not configured", the same test
    # get_all_job_ids uses. Comparing against "False" would let an empty
    # (never set) value through.
    if get_env_variable("QLM_CONFIGURED") != "True":
        raise QLMRemoteExecutionError(
            "Error when instantiating QLMaaSConnection. No QLM account configured."
        )

    # Imported here so the module can be loaded without the qat package.
    from qat.qlmaas.connection import QLMaaSConnection

    QLM_connection = QLMaaSConnection(
        hostname="qlm35e.neasqc.eu",
        port=443,
        authentication="password",
        timeout=2500,
    )
    return QLM_connection