Source code for pennylane_sf.gbs

# Copyright 2018-2020 Xanadu Quantum Technologies Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at


# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
This module contains the variational GBS device.

The Strawberry Fields variational GBS device is a simulator that provides a way to encode
variational parameters into GBS so that the gradient with respect to the output probability
distribution is accessible.
from collections import OrderedDict

import numpy as np
import strawberryfields as sf
from strawberryfields.ops import GraphEmbed, MeasureFock
from strawberryfields.utils import all_fock_probs_pnr
from thewalrus.quantum import adj_scaling as rescale
from thewalrus.quantum import photon_number_mean_vector

import pennylane as qml
from pennylane.wires import Wires

from .expectations import identity
from .simulator import StrawberryFieldsSimulator

# pylint: disable=too-many-instance-attributes
[docs]class StrawberryFieldsGBS(StrawberryFieldsSimulator): r"""StrawberryFields variational GBS device for PennyLane. This device provides a simulator that can embed variational parameters into GBS such that the analytic gradient of the probability distribution is accessible. For more details see :doc:`/devices/gbs`. Args: wires (int or Iterable[Number, str]]): the number of modes to initialize the device in shots (int): Number of circuit evaluations/random samples used to estimate expectation values of observables. If ``shots=None``, circuit evaluations are computed analytically. cutoff_dim (int): Fock-space truncation dimension use_cache (bool): indicates whether to use samples from previous evaluations to speed up calculation of the probability distribution. If ``shots=None``, this setting is ignored. samples (array): pre-generated samples using the input adjacency matrix specified by :class:`ParamGraphEmbed`. In non-analytic mode with ``use_cache=True``, probabilities will be inferred from these samples rather than generating new samples, resulting in faster evaluation of the circuit and its derivative. """ name = "Strawberry Fields variational GBS PennyLane plugin" short_name = "strawberryfields.gbs" _operation_map = { "ParamGraphEmbed": GraphEmbed, } _observable_map = { "Identity": identity, } _circuits = {} _capabilities = {"model": "cv", "provides_jacobian": True} def __init__(self, wires, *, shots=None, cutoff_dim, use_cache=False, samples=None): super().__init__(wires, shots=shots) self.cutoff = cutoff_dim self._use_cache = use_cache self.samples = samples self._params = None self._WAW = None self.Z_inv = None
[docs] @staticmethod def calculate_WAW(params, A): """Calculates the :math:`WAW` matrix. Args: params (array[float]): variable parameters A (array[float]): adjacency matrix Returns: array[float]: the :math:`WAW` matrix """ W = np.diag(np.sqrt(params)) return W @ A @ W
[docs] @staticmethod def calculate_n_mean(A): """Calculates the mean number of photons for an adjacency matrix encoded into GBS. .. warning:: Note that for ``A`` to be directly encoded into GBS, its singular values must be less than one. Args: A (array[float]): adjacency matrix Returns: float: mean number of photons """ singular_values = np.linalg.svd(A, compute_uv=False) if not all(singular_values < 1): raise ValueError("Singular values of matrix A must be less than 1") return np.sum(singular_values ** 2 / (1 - singular_values ** 2))
# pylint: disable=missing-function-docstring
[docs] def execute(self, queue, observables, parameters=None, **kwargs): parameters = parameters or {} if len(queue) > 1: raise ValueError( "The StrawberryFieldsGBS device accepts only a single application of ParamGraphEmbed" ) return super().execute(queue, observables, parameters, **kwargs)
# pylint: disable=pointless-statement,expression-not-assigned
[docs] def apply(self, operation, wires, par): self._params, A, n_mean = par A *= rescale(A, n_mean) self.Z_inv = self._calculate_z_inv(A) if len(self._params) != self.num_wires: raise ValueError( "The number of variable parameters must be equal to the total number of wires." ) self._WAW = self.calculate_WAW(self._params, A) if self.shots is not None and self._use_cache: op = GraphEmbed(A, mean_photon_per_mode=n_mean / len(A)) else: n_mean_WAW = self.calculate_n_mean(self._WAW) op = GraphEmbed(self._WAW, mean_photon_per_mode=n_mean_WAW / len(A)) op | [self.q[wires.index(i)] for i in wires] if self.shots is not None: MeasureFock() | [self.q[wires.index(i)] for i in wires]
[docs] def reset(self): if self.shots is not None and self._use_cache and self.samples is not None: # In this case, we do not reset because we want to keep our samples return super().reset()
[docs] def pre_measure(self): self.eng = sf.Engine("gaussian", backend_options={"cutoff_dim": self.cutoff}) if self.shots is None: results = elif self._use_cache and self.samples is not None: # use the cached samples return else: results =, shots=self.shots) self.state = results.state self.samples = results.samples
def _reparametrize_probability(self, p): """Takes an input probability distribution of :math:`A` and rescales it to the probability distribution of :math:`WAW`. Args: p (array): the probability distribution of :math:`A` Returns: array: the probability distribution of :math:`WAW` """ Z_inv = self._calculate_z_inv(self._WAW) ind_all_wires = np.ndindex(*[self.cutoff] * self.num_wires) for s in ind_all_wires: res =, s)) p[tuple(s)] *= res * Z_inv / self.Z_inv return p def _marginal_over_wires(self, wires, array): """Calculates the marginal distribution over the specified wires by summing over the remaining. The flat input array is reshaped to have ``self.num_wires + 1`` axes. Args: wires (int or Iterable[Number, str]]): the wires to find marginals over array (array): the input array Returns: array: the traced-over array """ trailing = int(array.size / self.cutoff ** self.num_wires) array = array.reshape([self.cutoff] * self.num_wires + [trailing]) all_indices = set(range(self.num_wires)) requested_indices = set(self.wires.indices(wires)) trace_over_indices = all_indices - requested_indices # Find indices to trace over return np.sum(array, axis=tuple(trace_over_indices)).ravel()
[docs] def probability(self, wires=None): wires = wires or self.wires wires = Wires(wires) if self.shots is None: return super().probability(wires=wires) samples = np.take(self.samples, self.wires.indices(wires), axis=1) fock_probs = all_fock_probs_pnr(samples) cutoff = fock_probs.shape[0] diff = max(self.cutoff - cutoff, 0) probs = np.pad(fock_probs, [(0, diff)] * len(wires)) if self._use_cache: if len(wires) < self.num_wires: raise ValueError( "Caching is only supported when returning the probabilities on " "all of the wires" ) probs = self._reparametrize_probability(probs) ind = np.ndindex(*[self.cutoff] * len(wires)) probs = OrderedDict((tuple(i), probs[tuple(i)]) for i in ind) return probs
[docs] def jacobian(self, tape): """Calculates the Jacobian of the device. Args: tape (.QuantumTape): the tape to compute the Jacobian of. Returns: array[float]: Jacobian matrix of size (``len(probs)``, ``num_wires``) """ self.reset() operations, observables = (tape.operations, tape.observables) requested_wires = observables[0].wires # Create dummy observable to measure probabilities over all wires obs_all_wires = qml.probs(wires=self.wires) prob = self.execute(operations, [obs_all_wires])[0] jac = self._jacobian_all_wires(prob) if requested_wires == self.wires: return jac return self._marginal_over_wires(requested_wires, jac).reshape(-1, self.num_wires)
def _jacobian_all_wires(self, prob): """Calculates the jacobian of the probability distribution with respect to all wires. This function uses Eq. (28) of `this <>`__ paper. Args: prob (array[float]): the probability distribution as a flat array Returns: array[float]: the jacobian """ n = len(self._WAW) disp = np.zeros(2 * n) cov = self.calculate_covariance(self._WAW, hbar=self.hbar) mean_photons_by_mode = photon_number_mean_vector(disp, cov, hbar=self.hbar) basis_states = np.array(list(np.ndindex(*[self.cutoff] * self.num_wires))) jac = (basis_states - mean_photons_by_mode) * np.expand_dims(prob, 1) / self._params return jac
[docs] @staticmethod def calculate_covariance(A, hbar): r"""Calculates the covariance matrix corresponding to an input adjacency matrix. Args: A (array[float]): adjacency matrix hbar (float): the convention chosen in the canonical commutation relation :math:`[x, p] = i \hbar` Returns: array[float]: covariance matrix """ n = len(A) I = np.identity(2 * n) o_mat = np.block([[np.zeros_like(A), np.conj(A)], [A, np.zeros_like(A)]]) cov = hbar * (np.linalg.inv(I - o_mat) - I / 2) return cov
@staticmethod def _calculate_z_inv(A): """Calculates the 1/Z normalization coefficient from GBS. Args: A (array): adjacency matrix Returns: float: the coefficient """ n = len(A) I = np.identity(2 * n) o_mat = np.block([[np.zeros_like(A), np.conj(A)], [A, np.zeros_like(A)]]) return np.sqrt(np.linalg.det(I - o_mat)) @property def use_cache(self): """Indicates whether to use samples from previous evaluations to speed up calculation of the probability distribution. If ``shots=None``, this setting is ignored.""" return self._use_cache