Source code for mpqp.execution.job

"""
When you call :func:`run<mpqp.execution.runner.run>` or
:func:`submit<mpqp.execution.runner.submit>`, a :class:`Job` is created by 
:func:`generate_job<mpqp.execution.runner.generate_job>`. This job contains all
the needed information to configure the execution, and eventually retrieve
remote results.

A :class:`Job` can be of three types, given by the :class:`JobType` enum. In 
addition, it has a status, given by the :class:`JobStatus` enum.

As described above, a :class:`Job` is generated on circuit submission so you
would in principle never need to instantiate one yourself.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Optional

from aenum import Enum, NoAlias, auto
from typeguard import typechecked

from mpqp.tools.generics import MessageEnum

# This is needed because for some reason pyright does not understand that Enum
# is a class (probably because Enum does weird things to the Enum class)
if TYPE_CHECKING:
    from enum import Enum

from mpqp.core.instruction.measurement import BasisMeasure, ExpectationMeasure, Measure

from ..core.circuit import QCircuit
from ..tools.errors import IBMRemoteExecutionError, QLMRemoteExecutionError
from .connection.ibm_connection import get_IBMProvider, get_QiskitRuntimeService
from .connection.qlm_connection import get_QLMaaSConnection
from .devices import ATOSDevice, AvailableDevice, AWSDevice, IBMDevice


[docs]class JobStatus(MessageEnum): """Possible states of a Job.""" INIT = auto() """Initializing the job.""" QUEUED = auto() """The job is in the queue.""" RUNNING = auto() """The job is currently running.""" CANCELLED = auto() """The job is cancelled.""" ERROR = auto() """An error occurred with the job.""" DONE = auto() """The job is successfully done."""
[docs]class JobType(Enum): """Possible types of Job to execute. Each type of job is restricted to some measures (and to some backends, but this is tackled by the backends themselves). """ _settings_ = NoAlias STATE_VECTOR = {BasisMeasure, type(None)} """Retrieves the vector representing the quantum state, this type is *ideal*.""" SAMPLE = {BasisMeasure} """Measures several times the quantum state in the basis, and retrieve the counts. Contrarily to the ``STATE_VECTOR`` job type, this one is *realistic*.""" OBSERVABLE = {ExpectationMeasure} """Computes the *expectation value* of an observable, using the state_vector or the samples. This type is ideal too: it requires some trickery to retrieve the expectation value in an optimal manner."""
[docs]@typechecked class Job: """Representation of a job, an object regrouping all the information about the submission of a computation/measure of a quantum circuit on a specific hardware. A job has a type, and a status, and is attached to a specific device. Moreover, the job contains also the quantum circuit and the measure to be performed on the circuit. Args: job_type: Type of the job (sample, observable, ...). circuit: Circuit to execute. device: Device (simulator, quantum computer) on which we want to execute the job. measure: Object representing the measure to perform. Examples: >>> circuit = QCircuit(3) >>> job = Job(JobType.STATE_VECTOR, circuit, IBMDevice.AER_SIMULATOR) >>> circuit.add(BasisMeasure([0, 1], shots=1000)) >>> job2 = Job( ... JobType.STATE_VECTOR, ... circuit, ... IBMDevice.AER_SIMULATOR, ... circuit.get_measurements()[0], ... ) """ # 3M-TODO: decide, when there are several measurements, if we define a # multi-measure job, or if we need several jobs. For the moment, a Job can # handle only one measurement def __init__( self, job_type: JobType, circuit: QCircuit, device: AvailableDevice, measure: Optional[Measure] = None, ): self._status = JobStatus.INIT self.job_type = job_type """See parameter description.""" self.circuit = circuit """See parameter description.""" self.device = device """See parameter description.""" self.measure = measure """See parameter description.""" self.id: Optional[str] = None """Contains the id of the remote job, used to retrieve the result from the remote provider. ``None`` if the job is local. If the job is not local, it will be set later on.""" @property def status(self): """Update and return the current job status. Mainly relevant for remote jobs.""" if self._status not in [ JobStatus.DONE, JobStatus.ERROR, JobStatus.CANCELLED, ]: # in the remote case, we need to check the current status of the job. # in the local case, it is updated automatically after each step if self.device.is_remote(): assert isinstance(self.id, str) if isinstance(self.device, ATOSDevice): self._status = get_qlm_job_status(self.id) elif isinstance(self.device, IBMDevice): self._status = get_ibm_job_status(self.id) elif isinstance(self.device, AWSDevice): self._status = get_aws_job_status(self.id) else: raise NotImplementedError( f"Cannot update job status for the device {self.device} yet" ) return self._status @status.setter def status(self, job_status: JobStatus): self._status = job_status
[docs]def get_qlm_job_status(job_id: str) -> JobStatus: """Retrieves the status of a QLM job from the id in parameter, and returns the corresponding JobStatus of this library. Args: job_id: Id of the job for which we want to retrieve the status. """ from qat.comm.qlmaas.ttypes import JobStatus as QLM_JobStatus from qat.comm.qlmaas.ttypes import QLMServiceException try: qlm_status = get_QLMaaSConnection().get_status(job_id) except QLMServiceException as e: raise QLMRemoteExecutionError( f"Something went wrong when trying to get the state of job {job_id}." f" Please make sure the job_id is correct.\n Trace: " + str(e) ) if qlm_status in [QLM_JobStatus.WAITING, QLM_JobStatus.STOPPED]: return JobStatus.QUEUED elif qlm_status == QLM_JobStatus.RUNNING: return JobStatus.RUNNING elif qlm_status == QLM_JobStatus.CANCELLED: return JobStatus.CANCELLED elif qlm_status in [ QLM_JobStatus.FAILED, QLM_JobStatus.UNKNOWN_JOB, QLM_JobStatus.IN_BUCKET, QLM_JobStatus.DELETED, ]: return JobStatus.ERROR else: return JobStatus.DONE
[docs]def get_ibm_job_status(job_id: str) -> JobStatus: """Retrieves the status of an IBM job from the id in parameter, and returns the corresponding JobStatus of this library. Args: job_id: Id of the job for which we want to retrieve the status. """ from qiskit.providers import JobStatus as IBM_JobStatus # test with QiskitRuntimeService if job_id in [e.job_id() for e in get_QiskitRuntimeService().jobs()]: ibm_job = get_QiskitRuntimeService().job(job_id) # if not, test with IBMProvider elif job_id in [e.job_id() for e in get_IBMProvider().jobs()]: ibm_job = get_IBMProvider().retrieve_job(job_id) else: raise IBMRemoteExecutionError( f"Could not find job with id {job_id} on IBM/QiskitRuntime provider" ) status = ibm_job.status() if status == IBM_JobStatus.ERROR: return JobStatus.ERROR elif status == IBM_JobStatus.CANCELLED: return JobStatus.CANCELLED elif status in [IBM_JobStatus.QUEUED, IBM_JobStatus.VALIDATING]: return JobStatus.QUEUED elif status == IBM_JobStatus.INITIALIZING: return JobStatus.INIT elif status == IBM_JobStatus.RUNNING: return JobStatus.RUNNING else: return JobStatus.DONE
[docs]def get_aws_job_status(job_id: str) -> JobStatus: """Retrieves the status of a AWS Braket from the id in parameter, and returns the corresponding JobStatus of this library. Args: job_id: Id of the job for which we want to retrieve the status. """ from braket.aws import AwsQuantumTask task = AwsQuantumTask(job_id) state = task.state() if state == "FAILED": return JobStatus.ERROR elif state == "CANCELLED": return JobStatus.CANCELLED elif state == "CREATED": return JobStatus.INIT elif state == "QUEUED": return JobStatus.QUEUED elif state == "RUNNING": return JobStatus.RUNNING else: return JobStatus.DONE