"""
Once the circuit is defined, you can to execute it and retrieve the result using
the function :func:`run`. You can execute said circuit on one or several devices
(local or remote). The function will wait (blocking) until the job is completed
and will return a :class:`Result<mpqp.execution.result.Result>` in only one
device was given or a :class:`BatchResult<mpqp.execution.result.BatchResult>`
otherwise (see :ref:`below<Results>`).
Alternatively, when running jobs on a remote device, you could prefer to
retrieve the result asynchronously, without having to wait and block the
application until the computation is completed. In that case, you can use the
:func:`submit` instead. It will submit the job and
return the corresponding job id and :class:`Job<mpqp.execution.job.Job>` object.
.. note::
Unlike :func:`run`, we can only submit on one device at a time.
"""
from __future__ import annotations
from numbers import Complex
from typing import Iterable, Optional, Union
import numpy as np
from sympy import Expr
from typeguard import typechecked
from mpqp.core.circuit import QCircuit
from mpqp.core.instruction.measurement.basis_measure import BasisMeasure
from mpqp.core.instruction.measurement.expectation_value import (
ExpectationMeasure,
Observable,
)
from mpqp.execution.devices import (
ATOSDevice,
AvailableDevice,
AWSDevice,
GOOGLEDevice,
IBMDevice,
)
from mpqp.execution.job import Job, JobStatus, JobType
from mpqp.execution.providers.atos import run_atos, submit_QLM
from mpqp.execution.providers.aws import run_braket, submit_job_braket
from mpqp.execution.providers.google import run_google
from mpqp.execution.providers.ibm import run_ibm, submit_ibmq
from mpqp.execution.result import BatchResult, Result
from mpqp.tools.errors import DeviceJobIncompatibleError, RemoteExecutionError
from mpqp.tools.generics import OneOrMany
[docs]@typechecked
def adjust_measure(measure: ExpectationMeasure, circuit: QCircuit):
"""We allow the measure to not span the entire circuit, but providers
usually don't support this behavior. To make this work we tweak the measure
this function to match the expected behavior.
In order to do this, we add identity measures on the qubits not targeted by
the measure. In addition of this, some swaps are automatically added so the
the qubits measured are ordered and contiguous (though this is done in
:func:`generate_job`)
Args:
measure: The expectation measure, potentially incomplete.
circuit: The circuit to which will be added the potential swaps allowing
the user to get the expectation value of the qubits in an arbitrary
order (this part is not handled by this function).
Returns:
The measure padded with the identities before and after.
"""
Id_before = np.eye(2 ** measure.rearranged_targets[0])
Id_after = np.eye(2 ** (circuit.nb_qubits - measure.rearranged_targets[-1] - 1))
tweaked_measure = ExpectationMeasure(
list(range(circuit.nb_qubits)),
Observable(
np.kron(
np.kron(Id_before, measure.observable.matrix), Id_after
) # pyright: ignore[reportArgumentType]
),
measure.shots,
)
return tweaked_measure
[docs]@typechecked
def generate_job(
circuit: QCircuit, device: AvailableDevice, values: dict[Expr | str, Complex] = {}
) -> Job:
"""Creates the Job of appropriate type and containing the information needed
for the execution of the circuit.
If the circuit contains symbolic variables (see section :ref:`VQA` for more
information on them), the ``values`` parameter is used perform the necessary
substitutions.
Args:
circuit: Circuit to be run.
device: Device on which the circuit will be run.
values: Set of values to substitute symbolic variables.
Returns:
The Job containing information about the execution of the circuit.
"""
circuit = circuit.subs(values, True)
m_list = circuit.get_measurements()
nb_meas = len(m_list)
if nb_meas == 0:
job = Job(JobType.STATE_VECTOR, circuit, device)
elif nb_meas == 1:
measurement = m_list[0]
if isinstance(measurement, BasisMeasure):
# TODO: handle other basis by adding the right rotation (change
# of basis) before measuring in the computational basis
# Muhammad: circuit.add(CustomGate(UnitaryMatrix(change_of_basis_inverse)))
if measurement.shots <= 0:
job = Job(JobType.STATE_VECTOR, circuit, device)
else:
job = Job(JobType.SAMPLE, circuit, device, measurement)
elif isinstance(measurement, ExpectationMeasure):
job = Job(
JobType.OBSERVABLE,
circuit + measurement.pre_measure,
device,
adjust_measure(measurement, circuit),
)
else:
raise NotImplementedError(
f"Measurement type {type(measurement)} not handled"
)
else:
raise NotImplementedError(
"Current version of MPQP do not support multiple measurement in a "
"circuit."
)
return job
@typechecked
def _run_single(
circuit: QCircuit, device: AvailableDevice, values: dict[Expr | str, Complex]
) -> Result:
"""Runs the circuit on the ``backend``. If the circuit depends on variables,
the ``values`` given in parameters are used to do the substitution.
Args:
circuit: QCircuit to be run.
device: Device, on which the circuit will be run.
values: Set of values to substitute symbolic variables. Defaults to ``{}``.
Returns:
The Result containing information about the measurement required.
Raises:
DeviceJobIncompatibleError: if a non noisy simulator is given in
parameter and the circuit contains noise
NotImplementedError: If the device is not handled for noisy simulation
or other submissions.
Example:
>>> c = QCircuit([H(0), CNOT(0, 1), BasisMeasure([0, 1], shots=1000)], label="Bell pair")
>>> result = run(c, IBMDevice.AER_SIMULATOR)
>>> print(result) # doctest: +SKIP
Result: IBMDevice, AER_SIMULATOR
Probabilities: [0.523, 0, 0, 0.477]
Counts: [523, 0, 0, 477]
Samples:
State: 00, Index: 0, Count: 523, Probability: 0.523
State: 11, Index: 3, Count: 477, Probability: 0.477
Error: None
"""
job = generate_job(circuit, device, values)
job.status = JobStatus.INIT
if circuit.noises:
if not device.is_noisy_simulator():
raise DeviceJobIncompatibleError(
f"Device {device} cannot simulate circuits containing NoiseModels."
)
elif not (isinstance(device, ATOSDevice) or isinstance(device, AWSDevice)):
raise NotImplementedError(
f"Noisy simulations are not yet available on devices of type {type(device).name}."
)
if isinstance(device, IBMDevice):
return run_ibm(job)
elif isinstance(device, ATOSDevice):
return run_atos(job)
elif isinstance(device, AWSDevice):
return run_braket(job)
elif isinstance(device, GOOGLEDevice):
return run_google(job)
else:
raise NotImplementedError(f"Device {device} not handled")
[docs]@typechecked
def run(
circuit: OneOrMany[QCircuit],
device: OneOrMany[AvailableDevice],
values: Optional[dict[Expr | str, Complex]] = None,
) -> Union[Result, BatchResult]:
"""Runs the circuit on the backend, or list of backend, provided in
parameter.
If the circuit contains symbolic variables (see section :ref:`VQA` for more
information on them), the ``values`` parameter is used perform the necessary
substitutions.
Args:
circuit: QCircuit to be run.
device: Device, or list of devices, on which the circuit will be run.
values: Set of values to substitute symbolic variables. Defaults to ``{}``.
Returns:
The Result containing information about the measurement required.
Examples:
>>> c = QCircuit(
... [X(0), CNOT(0, 1), BasisMeasure([0, 1], shots=1000)],
... label="X CNOT circuit",
... )
>>> result = run(c, IBMDevice.AER_SIMULATOR)
>>> print(result)
Result: X CNOT circuit, IBMDevice, AER_SIMULATOR
Counts: [0, 0, 0, 1000]
Probabilities: [0, 0, 0, 1]
Samples:
State: 11, Index: 3, Count: 1000, Probability: 1.0
Error: None
>>> batch_result = run(
... c,
... [ATOSDevice.MYQLM_PYLINALG, AWSDevice.BRAKET_LOCAL_SIMULATOR]
... )
>>> print(batch_result)
BatchResult: 2 results
Result: X CNOT circuit, ATOSDevice, MYQLM_PYLINALG
Counts: [0, 0, 0, 1000]
Probabilities: [0, 0, 0, 1]
Samples:
State: 11, Index: 3, Count: 1000, Probability: 1.0
Error: 0.0
Result: X CNOT circuit, AWSDevice, BRAKET_LOCAL_SIMULATOR
Counts: [0, 0, 0, 1000]
Probabilities: [0, 0, 0, 1]
Samples:
State: 11, Index: 3, Count: 1000, Probability: 1.0
Error: None
>>> c2 = QCircuit(
... [X(0), X(1), BasisMeasure([0, 1], shots=1000)],
... label="X circuit",
... )
>>> result = run([c,c2], IBMDevice.AER_SIMULATOR)
>>> print(result)
BatchResult: 2 results
Result: X CNOT circuit, IBMDevice, AER_SIMULATOR
Counts: [0, 0, 0, 1000]
Probabilities: [0, 0, 0, 1]
Samples:
State: 11, Index: 3, Count: 1000, Probability: 1.0
Error: None
Result: X circuit, IBMDevice, AER_SIMULATOR
Counts: [0, 0, 0, 1000]
Probabilities: [0, 0, 0, 1]
Samples:
State: 11, Index: 3, Count: 1000, Probability: 1.0
Error: None
"""
if values is None:
values = {}
if isinstance(circuit, Iterable):
if isinstance(device, Iterable):
return BatchResult([_run_single(circ, dev, values) for circ in circuit for dev in device])
else:
return BatchResult([_run_single(circ, device, values) for circ in circuit])
else:
if isinstance(device, Iterable):
return BatchResult([_run_single(circuit, dev, values) for dev in device])
else:
return _run_single(circuit, device, values)
[docs]@typechecked
def submit(
circuit: QCircuit, device: AvailableDevice, values: dict[Expr | str, Complex] = {}
) -> tuple[str, Job]:
"""Submit the job related with the circuit on the remote backend provided in
parameter. The submission returns a ``job_id`` that can be used to retrieve
the :class:`Result<mpqp.execution.result.Result>` later, using the
:func:`get_remote_result<mpqp.execution.remote_handler.get_remote_result>`
function.
If the circuit contains symbolic variables (see section :ref:`VQA` for more
information on them), the ``values`` parameter is used perform the necessary
substitutions.
Mind that this function only support single device submissions.
Args:
circuit: QCircuit to be run.
device: Remote device on which the circuit will be submitted.
values: Set of values to substitute symbolic variables.
Returns:
The job id provided by the remote device after submission of the job.
Example:
>>> circuit = QCircuit([H(0), CNOT(0,1), BasisMeasure([0,1], shots=10)])
>>> job_id, job = submit(circuit, ATOSDevice.QLM_LINALG) #doctest: +SKIP
Logging as user <qlm_user>...
Submitted a new batch: Job766
>>> print("Status of " +job_id +":", job.job_status) #doctest: +SKIP
Status of Job766: JobStatus.RUNNING
"""
if not device.is_remote():
raise RemoteExecutionError(
"submit(...) function is only made for remote device."
)
job = generate_job(circuit, device, values)
job.status = JobStatus.INIT
if isinstance(device, IBMDevice):
job_id, _ = submit_ibmq(job)
elif isinstance(device, ATOSDevice):
job_id, _ = submit_QLM(job)
elif isinstance(device, AWSDevice):
job_id, _ = submit_job_braket(job)
else:
raise NotImplementedError(f"Device {device} not handled")
return job_id, job