diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cc0a2f6..290e773 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -48,6 +48,7 @@ repos: - types-requests - typing-extensions - pytest + - matplotlib - scipy-stubs ci: diff --git a/README.md b/README.md index 1ea29ca..594b5cb 100644 --- a/README.md +++ b/README.md @@ -15,9 +15,9 @@ We are looking forward to your contributions via Pull-Requests. -### Installation for developers +## Installation for developers -#### Prerequisites for Windows +### Prerequisites for Windows Before installing `MRseq` with development dependencies on Windows, you need: @@ -29,7 +29,7 @@ Before installing `MRseq` with development dependencies on Windows, you need: 2. **Rust toolchain** (automatically installed by MRzeroCore if not present) -#### Installation steps +### Installation steps 1. Clone the `MRseq` repository 2. Create/select a python environment @@ -37,7 +37,7 @@ Before installing `MRseq` with development dependencies on Windows, you need: 4. Setup pre-commit hook: ``` pre-commit install ``` -### Licensing and Dependencies +## Licensing and Dependencies The core source code of `MRseq` is licensed under the **Apache-2.0 license**. @@ -45,3 +45,21 @@ For quality control and demonstration purposes, the notebooks in the `examples/` **Important for Commercial Users:** The AGPL dependency is listed only as an optional dev dependency. Installing this package via `pip install mrseq` **does not** install the AGPL library. Your usage of the core library remains subject to the permissive Apache-2.0 terms, unaffected by the licensing of the test suite or examples. + +## Contact +For questions or support, please contact us: + +> Christoph Kolbitsch +> Physikalisch-Technische Bundesanstalt (PTB) +> Abbestraße 2-12 +> 10587 Berlin, Germany +> christoph.kolbitsch@ptb.de + +> Patrick Schünke +> NVision Quantum Technologies GmbH +> Wolfgang-Paul-Straße 2 +> 89081 Ulm, Germany +> patrick.schuenke@nvision-quantum.com + +## Acknowledgments +We would like to thank [NVision Quantum Technologies GmbH](https://www.nvision-quantum.com/) for their contributions to this project. diff --git a/examples/epi2d.ipynb b/examples/epi2d.ipynb new file mode 100644 index 0000000..5fc4a78 --- /dev/null +++ b/examples/epi2d.ipynb @@ -0,0 +1,399 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# Echo Planar Imaging\n", + "\n", + "Qualitative imaging with an EPI readout." + ] + }, + { + "cell_type": "markdown", + "id": "1", + "metadata": {}, + "source": [ + "### Imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [], + "source": [ + "import tempfile\n", + "from pathlib import Path\n", + "\n", + "import matplotlib.pyplot as plt\n", + "import MRzeroCore as mr0\n", + "import numpy as np\n", + "from mrpro.algorithms.reconstruction import DirectReconstruction\n", + "from mrpro.data import KData\n", + "from mrpro.data.traj_calculators import KTrajectoryIsmrmrd\n", + "\n", + "from mrseq.scripts.epi2d_fid import main as create_seq_fid\n", + "from mrseq.scripts.epi2d_se import main as create_seq_se\n", + "from mrseq.utils import combine_ismrmrd_files\n", + "from mrseq.utils import sys_defaults\n", + "from mrseq.utils.EpiReadout import EpiReadout" + ] + }, + { + "cell_type": "markdown", + "id": "3", + "metadata": {}, + "source": [ + "### Settings\n", + "We are going to use a numerical phantom with a matrix size of 128 x 128. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "image_matrix_size = [128, 128]\n", + "flip_angle_degree = 12\n", + "n_dummy_excitations = 200\n", + "\n", + "tmp = tempfile.TemporaryDirectory()" + ] + }, + { + "cell_type": "markdown", + "id": "5", + "metadata": {}, + "source": [ + "### Create the digital phantom\n", + "\n", + "We use the standard Brainweb phantom from [MRzero](https://github.com/MRsources/MRzero-Core), but we choose the B1-field to be constant everywhere." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "phantom = mr0.util.load_phantom(image_matrix_size)\n", + "phantom.B1[:] = 1.0" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "### EPI readouts\n", + "\n", + "Single-shot EPI readouts are a very efficient way to obtain a full image after a single excitation. \n", + "There are different options of how they can be carried out. " + ] + }, + { + "cell_type": "markdown", + "id": "8", + "metadata": {}, + "source": [ + "We can have a symmetric readout where we alternate between going from left to right and right to left through k-space." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9", + "metadata": {}, + "outputs": [], + "source": [ + "epi_readout = EpiReadout(n_readout=12, n_phase_encoding=12, ramp_sampling=False, readout_type='symmetric')\n", + "fig = epi_readout.plot_trajectory()" + ] + }, + { + "cell_type": "markdown", + "id": "10", + "metadata": {}, + "source": [ + "Here we are sampling only during the flat time of the readout gradient. \n", + "We can make the sequence faster by also sampling during the ramp-up and ramp-down part of the readout gradient.\n", + "Nevertheless, this means that we don't have a strictly Cartesian trajectory anymore and gridding/nufft is required \n", + "during image reconstruction." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11", + "metadata": {}, + "outputs": [], + "source": [ + "epi_readout = EpiReadout(n_readout=12, n_phase_encoding=12, ramp_sampling=True, readout_type='symmetric')\n", + "fig = epi_readout.plot_trajectory()" + ] + }, + { + "cell_type": "markdown", + "id": "12", + "metadata": {}, + "source": [ + "We can shorten the echo time by applying partial Fourier along the phase encoding direction." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13", + "metadata": {}, + "outputs": [], + "source": [ + "epi_readout = EpiReadout(\n", + " n_readout=12, n_phase_encoding=12, ramp_sampling=True, partial_fourier_factor=0.75, readout_type='symmetric'\n", + ")\n", + "fig = epi_readout.plot_trajectory()" + ] + }, + { + "cell_type": "markdown", + "id": "14", + "metadata": {}, + "source": [ + "Another approach is to always go back to one side of k-space after each readout such that the readout direction is always from left to right.\n", + "This is less efficient but minimized any artifacts due to asymmetry between positive and negative readout gradients. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "15", + "metadata": {}, + "outputs": [], + "source": [ + "epi_readout = EpiReadout(\n", + " n_readout=12, n_phase_encoding=12, partial_fourier_factor=0.75, ramp_sampling=True, readout_type='flyback'\n", + ")\n", + "fig = epi_readout.plot_trajectory()" + ] + }, + { + "cell_type": "markdown", + "id": "16", + "metadata": {}, + "source": [ + "### EPI sequences\n", + "\n", + "To create the EPI FID and SE sequences, we use the previously imported [epi2d_fid script](../src/mrseq/scripts/ei2d_fid.py) and [epi2d_se script](../src/mrseq/scripts/ei2d_se.py).\n", + "\n", + "We are going to simulate different types of EPI sequences and compare the obtained images." + ] + }, + { + "cell_type": "markdown", + "id": "17", + "metadata": {}, + "source": [ + "#### EPI FID with symmetric readout without ramp sampling" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18", + "metadata": {}, + "outputs": [], + "source": [ + "sequence, fname_seq = create_seq_fid(\n", + " system=sys_defaults,\n", + " test_report=False,\n", + " timing_check=False,\n", + " show_plots=False,\n", + " te=0.073,\n", + " fov_xy=float(phantom.size.numpy()[0]),\n", + " n_readout=image_matrix_size[0],\n", + " n_phase_encoding=image_matrix_size[0],\n", + " partial_fourier_factor=0.75,\n", + " ramp_sampling=False,\n", + " readout_type='symmetric',\n", + ")\n", + "\n", + "mr0_sequence = mr0.Sequence.import_file(str(fname_seq.with_suffix('.seq')))\n", + "signal, ktraj_adc = mr0.util.simulate(mr0_sequence, phantom, accuracy=1e0)\n", + "fname_mrd = Path(tmp.name) / 'epi_fid_sym_no_ramp.mrd'\n", + "mr0.sig_to_mrd(fname_mrd, signal, sequence)\n", + "combine_ismrmrd_files(fname_mrd, Path(f'{fname_seq}_header.h5'))\n", + "\n", + "kdata = KData.from_file(str(fname_mrd).replace('.mrd', '_with_traj.mrd'), trajectory=KTrajectoryIsmrmrd())\n", + "recon = DirectReconstruction(kdata, csm=None)\n", + "idata_fid_sym_no_ramp = recon(kdata)" + ] + }, + { + "cell_type": "markdown", + "id": "19", + "metadata": {}, + "source": [ + "#### EPI FID with symmetric readout and ramp sampling" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20", + "metadata": {}, + "outputs": [], + "source": [ + "sequence, fname_seq = create_seq_fid(\n", + " system=sys_defaults,\n", + " test_report=False,\n", + " timing_check=False,\n", + " show_plots=False,\n", + " te=0.073,\n", + " fov_xy=float(phantom.size.numpy()[0]),\n", + " n_readout=image_matrix_size[0],\n", + " n_phase_encoding=image_matrix_size[0],\n", + " partial_fourier_factor=0.75,\n", + " readout_type='symmetric',\n", + ")\n", + "\n", + "mr0_sequence = mr0.Sequence.import_file(str(fname_seq.with_suffix('.seq')))\n", + "signal, ktraj_adc = mr0.util.simulate(mr0_sequence, phantom, accuracy=1e0)\n", + "fname_mrd = Path(tmp.name) / 'epi_fid_sym_ramp.mrd'\n", + "mr0.sig_to_mrd(fname_mrd, signal, sequence)\n", + "combine_ismrmrd_files(fname_mrd, Path(f'{fname_seq}_header.h5'))\n", + "\n", + "kdata = KData.from_file(str(fname_mrd).replace('.mrd', '_with_traj.mrd'), trajectory=KTrajectoryIsmrmrd())\n", + "recon = DirectReconstruction(kdata, csm=None)\n", + "idata_fid_sym_ramp = recon(kdata)" + ] + }, + { + "cell_type": "markdown", + "id": "21", + "metadata": {}, + "source": [ + "#### EPI FID with flyback readout and ramp sampling" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22", + "metadata": {}, + "outputs": [], + "source": [ + "sequence, fname_seq = create_seq_fid(\n", + " system=sys_defaults,\n", + " test_report=False,\n", + " timing_check=False,\n", + " show_plots=False,\n", + " te=0.073,\n", + " fov_xy=float(phantom.size.numpy()[0]),\n", + " n_readout=image_matrix_size[0],\n", + " n_phase_encoding=image_matrix_size[0],\n", + " partial_fourier_factor=0.75,\n", + " readout_type='flyback',\n", + ")\n", + "\n", + "mr0_sequence = mr0.Sequence.import_file(str(fname_seq.with_suffix('.seq')))\n", + "signal, ktraj_adc = mr0.util.simulate(mr0_sequence, phantom, accuracy=1e0)\n", + "fname_mrd = Path(tmp.name) / 'epi_fid_flyback_ramp.mrd'\n", + "mr0.sig_to_mrd(fname_mrd, signal, sequence)\n", + "combine_ismrmrd_files(fname_mrd, Path(f'{fname_seq}_header.h5'))\n", + "\n", + "kdata = KData.from_file(str(fname_mrd).replace('.mrd', '_with_traj.mrd'), trajectory=KTrajectoryIsmrmrd())\n", + "recon = DirectReconstruction(kdata, csm=None)\n", + "idata_fid_flyback_ramp = recon(kdata)" + ] + }, + { + "cell_type": "markdown", + "id": "23", + "metadata": {}, + "source": [ + "#### EPI SE with symmetric readout and ramp sampling" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24", + "metadata": {}, + "outputs": [], + "source": [ + "sequence, fname_seq = create_seq_se(\n", + " system=sys_defaults,\n", + " test_report=False,\n", + " timing_check=False,\n", + " show_plots=False,\n", + " n_slices=1,\n", + " fov_xy=float(phantom.size.numpy()[0]),\n", + " n_readout=image_matrix_size[0],\n", + " n_phase_encoding=image_matrix_size[0],\n", + " partial_fourier_factor=0.75,\n", + " readout_type='symmetric',\n", + ")\n", + "\n", + "mr0_sequence = mr0.Sequence.import_file(str(fname_seq.with_suffix('.seq')))\n", + "signal, ktraj_adc = mr0.util.simulate(mr0_sequence, phantom, accuracy=1e-6)\n", + "fname_mrd = Path(tmp.name) / 'epi_se_sym_ramp.mrd'\n", + "mr0.sig_to_mrd(fname_mrd, signal, sequence)\n", + "combine_ismrmrd_files(fname_mrd, Path(f'{fname_seq}_header.h5'))\n", + "\n", + "kdata = KData.from_file(str(fname_mrd).replace('.mrd', '_with_traj.mrd'), trajectory=KTrajectoryIsmrmrd())\n", + "recon = DirectReconstruction(kdata, csm=None)\n", + "idata_se_sym_ramp = recon(kdata)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "25", + "metadata": {}, + "outputs": [], + "source": [ + "fig, ax = plt.subplots(2, 4, figsize=(16, 8))\n", + "for cax in ax.flatten():\n", + " cax.set_xticks([])\n", + " cax.set_yticks([])\n", + "\n", + "for idx, (idata, label) in enumerate(\n", + " zip(\n", + " [idata_fid_sym_no_ramp, idata_fid_sym_ramp, idata_fid_flyback_ramp, idata_se_sym_ramp],\n", + " ['symmetric no ramp', 'symmetric ramp', 'flyback ramp', 'se symmetric ramp'],\n", + " strict=True,\n", + " )\n", + "):\n", + " idat = idata.data.abs().squeeze().numpy()\n", + " idat /= np.sort(idat.flatten())[int(idat.size * 0.99)]\n", + " if idx == 0:\n", + " idat_ref = idat\n", + " relative_error = 0.0\n", + " relative_error += np.sum(np.abs(idat - idat_ref)) / np.sum(np.abs(idat_ref))\n", + "\n", + " ax[0, idx].imshow(idat, vmin=0, vmax=1, cmap='gray')\n", + " ax[0, idx].set_title(label)\n", + " ax[1, idx].imshow(idat - idat_ref, vmin=-1, vmax=1, cmap='bwr')\n", + "\n", + "relative_error /= 3\n", + "\n", + "print(f'Relative error {relative_error}')\n", + "assert relative_error < 0.20" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "mrseq_mrzero", + "language": "python", + "name": "python3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pyproject.toml b/pyproject.toml index dc236f9..84ef7e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ dependencies = [ "numpy>=1.23,<3.0", "matplotlib<4.0", "pypulseq>=1.4.1", - "ismrmrd>=1.14.1", + "ismrmrd>=1.14.1,<1.15", ] [project.optional-dependencies] diff --git a/src/mrseq/scripts/epi2d_fid.py b/src/mrseq/scripts/epi2d_fid.py new file mode 100644 index 0000000..0addece --- /dev/null +++ b/src/mrseq/scripts/epi2d_fid.py @@ -0,0 +1,444 @@ +"""2D Echo Planar Imaging (EPI) FID sequence.""" + +from pathlib import Path +from typing import Literal + +import ismrmrd +import numpy as np +import pypulseq as pp + +from mrseq.utils import round_to_raster +from mrseq.utils import sys_defaults +from mrseq.utils import write_sequence +from mrseq.utils.EpiReadout import EpiReadout +from mrseq.utils.ismrmrd import Fov +from mrseq.utils.ismrmrd import Limits +from mrseq.utils.ismrmrd import MatrixSize +from mrseq.utils.ismrmrd import create_header + + +def epi2d_fid_kernel( + system: pp.Opts, + te: float | None, + tr: float | None, + fov_xy: float, + n_readout: int, + n_phase_encoding: int, + bandwidth: float, + slice_thickness: float, + n_slices: int, + rf_duration: float, + rf_flip_angle: float, + rf_bwt: float, + rf_apodization: float, + readout_type: Literal['symmetric', 'flyback'], + readout_oversampling: Literal[1, 2, 4], + ramp_sampling: bool, + partial_fourier_factor: float, + add_spoiler: bool, + add_noise_acq: bool, + add_navigator_acq: bool, + mrd_header_file: str | Path | None, +) -> tuple[pp.Sequence, float, float]: + """Generate a 2D Echo Planar Imaging (EPI) sequence. + + Parameters + ---------- + system + PyPulseq system limits object. + te + Desired echo time (TE) (in seconds). Minimum echo time is used if set to None. + tr + Desired repetition time (TR) (in seconds). + fov_xy + Field of view in x and y direction (in meters). + n_readout + Number of frequency encoding steps. + n_phase_encoding + Number of phase encoding steps. + bandwidth + Total receiver bandwidth (in Hz). + slice_thickness + Slice thickness of the 2D slice (in meters). + n_slices + Number of slices. + rf_duration + Duration of the rf excitation pulse (in seconds) + rf_flip_angle + Flip angle of rf excitation pulse (in degrees) + rf_bwt + Bandwidth-time product of rf excitation pulse (in Hz * seconds) + rf_apodization + Apodization factor of rf excitation pulse. + readout_type + Readout type ('symmetric' or 'flyback'). + readout_oversampling + Readout oversampling factor. Can be 1 (no oversampling), 2, or 4. + ramp_sampling + If True, ADC is active during gradient ramps for optimized timing. + partial_fourier_factor + Desired partial Fourier factor in "phase encoding" direction. Must be larger than 0.5 and smaller or equal to 1. + The actual partial Fourier factor might slightly deviate from the desired value. + add_spoiler + If True, a spoiler gradients will be added to the sequence after the EPI readout. + add_noise_acq + If True, noise acquisitions will be added at the beginning of the sequence. + add_navigator_acq + If True, 3 navigator acquisitions will be added to the sequence to allow for ghost corrections. + The navigator acquisitions are added between the rf excitation pulse and the EPI readout. + Be aware that navigator acquisitions will increase the minimum echo and repetition times. + mrd_header_file + Filename of the ISMRMRD header file to be created. If None, no header file is created. + + Returns + ------- + seq + PyPulseq Sequence object + min_te + Shortest possible echo time. + min_tr + Shortest possible repetition time. + + """ + # create PyPulseq Sequence object and set system limits + seq = pp.Sequence(system=system) + + # define number of navigator acquisitions + n_navigator_acq = 3 if add_navigator_acq else 0 + + # create EpiReadout object + epi2d = EpiReadout( + system=system, + fov=fov_xy, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=bandwidth, + oversampling=readout_oversampling, + readout_type=readout_type, + ramp_sampling=ramp_sampling, + partial_fourier_factor=partial_fourier_factor, + pe_enable=True, + spoiling_enable=add_spoiler, + ) + + # create slice selective excitation pulse and gradients + rf, gz, gzr = pp.make_sinc_pulse( + flip_angle=np.deg2rad(rf_flip_angle), + duration=rf_duration, + slice_thickness=slice_thickness, + apodization=rf_apodization, + time_bw_product=rf_bwt, + delay=system.rf_dead_time, + system=system, + return_gz=True, + use='excitation', + ) + + # calculate echo time delay + min_te = rf.shape_dur / 2 # time from center to end of RF pulse + min_te += max(rf.ringdown_time, gz.fall_time) # RF ringdown time or gradient fall time + if add_navigator_acq: + min_te += pp.calc_duration(gzr, epi2d.gx_pre) + min_te += n_navigator_acq * pp.calc_duration(epi2d.gx) + min_te += pp.calc_duration(epi2d.gy_pre) + else: + min_te += pp.calc_duration(gzr, epi2d.gx_pre, epi2d.gx_pre) + min_te += epi2d.time_to_center_without_prephaser + + if te is None: + te_delay = 0.0 + else: + te_delay = round_to_raster(te - min_te, system.block_duration_raster) + if te_delay < 0: + raise ValueError(f'TE must be larger than {min_te * 1000:.3f} ms. Current value is {te * 1000:.3f} ms.') + + # calculate repetition time delay (tr_delay) for current TE settings + min_tr = pp.calc_duration(rf, gz) + if add_navigator_acq: + min_tr += pp.calc_duration(gzr, epi2d.gx_pre) + min_tr += n_navigator_acq * pp.calc_duration(epi2d.gx) + min_tr += pp.calc_duration(epi2d.gy_pre) + else: + min_tr += pp.calc_duration(gzr, epi2d.gx_pre, epi2d.gx_pre) + min_tr += epi2d.total_duration_without_prephaser + min_tr += te_delay + + if tr is None: + tr_delay = 0.0 + else: + tr_delay = round_to_raster(tr - min_tr, system.block_duration_raster) + if tr_delay < 0: + raise ValueError(f'TR must be larger than {min_tr * 1000:.2f} ms. Current value is {tr * 1000:.3f} ms.') + + # create header + prot = None + if mrd_header_file: + hdr = create_header( + traj_type='other', + encoding_fov=Fov(x=fov_xy * readout_oversampling, y=fov_xy, z=slice_thickness), + recon_fov=Fov(x=fov_xy, y=fov_xy, z=slice_thickness), + encoding_matrix=MatrixSize(n_x=n_readout * readout_oversampling, n_y=epi2d.n_phase_encoding, n_z=1), + recon_matrix=MatrixSize(n_x=n_readout, n_y=n_phase_encoding, n_z=1), + dwell_time=epi2d.adc.dwell, + slice_limits=Limits(min=0, max=n_slices, center=n_slices // 2), + k1_limits=Limits(min=0, max=epi2d.n_phase_enc_total, center=epi2d.n_phase_enc_pre_center + 1), + k2_limits=Limits(min=0, max=1, center=0), + ) + + # write header to file + prot = ismrmrd.Dataset(mrd_header_file, 'w') + prot.write_xml_header(hdr.toXML('utf-8')) + + # obtain noise samples if selected + if add_noise_acq: + seq.add_block( + pp.make_label(label='LIN', type='SET', value=0), + pp.make_label(label='SLC', type='SET', value=0), + pp.make_label(label='NOISE', type='SET', value=True), + ) + seq.add_block( + epi2d.adc, pp.make_delay(round_to_raster(pp.calc_duration(epi2d.adc), system.block_duration_raster, 'ceil')) + ) + seq.add_block(pp.make_label(label='NOISE', type='SET', value=False)) + seq.add_block(pp.make_delay(system.rf_dead_time)) + + # Write noise trajectory to MRD (zero trajectory — no gradients active) + if mrd_header_file: + assert prot is not None + n_samples = epi2d.adc.num_samples + acq = ismrmrd.Acquisition() + acq.resize(trajectory_dimensions=2, number_of_samples=n_samples) + acq.traj[:] = np.zeros((n_samples, 2), dtype=np.float32) + prot.append_acquisition(acq) + + for slice_ in range(n_slices): + # define slice label + slice_label = pp.make_label(label='SLC', type='SET', value=slice_) + + # set frequency offset for current slice + rf.freq_offset = gz.amplitude * slice_thickness * (slice_ - (n_slices - 1) / 2) + # rf.phase_offset = - 2 * np.pi * rf.freq_offset * pp.calc_rf_center(rf) + + # add slice-selective excitation pulse and set slice label + seq.add_block(rf, gz, slice_label) + + # add navigator scans for ghost correction + if add_navigator_acq: + # FID: invert pre-winder for odd number of navigators + gx_pre_factor = -1 if n_navigator_acq % 2 == 1 else 1 + seq, prot = epi2d.add_navigator_to_seq( + seq, + gzr, + n_navigator_acq, + n_phase_encoding, + gx_pre_factor, + mrd_dataset=prot, + ) + + # add echo time delay + seq.add_block(pp.make_delay(te_delay)) + + # add gy_pre and reset labels + seq.add_block( + epi2d.gy_pre, + pp.make_label(label='NAV', type='SET', value=0), + pp.make_label(label='AVG', type='SET', value=0), + ) + else: + # add echo time delay + seq.add_block(pp.make_delay(te_delay)) + + # align and add slice-selection rewinder and readout pre-winder gradients + gzr, gx_pre, gy_pre = pp.align(left=[gzr], right=[epi2d.gx_pre, epi2d.gy_pre]) + seq.add_block(gzr, gx_pre, gy_pre) + + # add EPI readout block without pre-phaser gradients + # (trajectory is written per-readout inside add_to_seq when mrd_dataset is provided) + seq, prot = epi2d.add_to_seq(seq, add_prephaser=False, mrd_dataset=prot) + + # add repetition time delay + if tr_delay > 0: + seq.add_block(pp.make_delay(tr_delay)) + + # close ISMRMRD file + if mrd_header_file: + assert prot is not None + prot.close() + + # set gridding definitions extracted from EpiReadout + if ramp_sampling: + gridding_params = [ + epi2d.gx.rise_time, + epi2d.gx.flat_time, + epi2d.gx.fall_time, + epi2d.adc.delay - epi2d.gx.delay, + epi2d.adc.num_samples * epi2d.adc.dwell, + ] + + seq.set_definition(key='TargetGriddedSamples', value=n_readout * readout_oversampling) + seq.set_definition(key='TrapezoidGriddingParameters', value=gridding_params) + + # write all required parameters in the seq-file header/definitions + seq.set_definition('FOV', [fov_xy, fov_xy, slice_thickness * n_slices]) + seq.set_definition('ReconMatrix', (n_readout, n_phase_encoding, n_slices)) + seq.set_definition('SliceThickness', slice_thickness) + seq.set_definition('TE', te or float(min_te)) + seq.set_definition('TR', tr or float(min_tr)) + seq.set_definition('ReadoutOversamplingFactor', readout_oversampling) + + return seq, float(min_te), float(min_tr) + + +def main( + system: pp.Opts | None = None, + te: float | None = None, + tr: float | None = None, + fov_xy: float = 200e-3, + n_readout: int = 64, + n_phase_encoding: int = 64, + n_slices: int = 1, + slice_thickness: float = 8e-3, + bandwidth: float = 100e3, + readout_type: Literal['symmetric', 'flyback'] = 'symmetric', + readout_oversampling: Literal[1, 2, 4] = 2, + ramp_sampling: bool = True, + partial_fourier_factor: float = 0.7, + add_navigator_acq: bool = True, + add_noise_acq: bool = True, + show_plots: bool = True, + test_report: bool = True, + timing_check: bool = True, + v141_compatibility: bool = True, +) -> tuple[pp.Sequence, Path]: + """Generate a 2D Echo Planar Imaging (EPI) FID sequence. + + Parameters + ---------- + system + PyPulseq system limits object. + te + Desired echo time (TE) (in seconds). Minimum echo time is used if set to None. + tr + Desired repetition time (TR) (in seconds). Minimum repetition time is used if set to None. + fov_xy + Field of view in x and y direction (in meters). + n_readout + Number of frequency encoding steps. + n_phase_encoding + Number of phase encoding steps. + n_slices + Number of slices. + slice_thickness + Slice thickness of the 2D slice (in meters). + bandwidth + Total receiver bandwidth (in Hz). + readout_type + Readout type ('symmetric' or 'flyback'). + readout_oversampling + Readout oversampling factor. Can be 1 (no oversampling), 2, or 4. + ramp_sampling + If True, ADC is active during gradient ramps for optimized timing. + partial_fourier_factor + Desired partial Fourier factor in "phase encoding" direction. + add_navigator_acq + If True, navigator acquisitions will be added for ghost corrections. + add_noise_acq + If True, noise acquisitions will be added at the beginning of the sequence. + show_plots + Toggles sequence plot. + test_report + Toggles advanced test report. + timing_check + Toggles timing check of the sequence. + v141_compatibility + Save the sequence in pulseq v1.4.1 for backwards compatibility. + + Returns + ------- + seq + Sequence object of 2D EPI FID sequence. + file_path + Path to the sequence file. + """ + if system is None: + system = sys_defaults + + # define settings of rf excitation pulse + rf_flip_angle = 90.0 # flip angle of the rf excitation pulse [°] + rf_duration = 1.28e-3 # duration of the rf excitation pulse [s] + rf_bwt = 4.0 # bandwidth-time product of rf excitation pulse [Hz*s] + rf_apodization = 0.5 # apodization factor of rf excitation pulse + + # define spoiling settings + enable_gradient_spoiling = True + + # define sequence filename + rs_string = 'rs' if ramp_sampling else 'nors' # ramp sampling + pf_string = f'{partial_fourier_factor}pf'.replace('.', 'p') # partial fourier factor + readout_string = 'sym' if readout_type == 'symmetric' else 'flyb' # readout type + noise_string = 'withnoise' if add_noise_acq else 'nonoise' # noise acquisition + nav_string = 'withnav' if add_navigator_acq else 'nonav' # navigator acquisition + + filename = f'{Path(__file__).stem}_{int(fov_xy * 1000)}fov_{n_readout}px' + filename += f'_{readout_string}_fid_{readout_oversampling}ro_{rs_string}_{pf_string}' + filename += f'_{noise_string}_{nav_string}' + + output_path = Path.cwd() / 'output' + output_path.mkdir(parents=True, exist_ok=True) + + # delete existing header file + if (output_path / Path(filename + '_header.h5')).exists(): + (output_path / Path(filename + '_header.h5')).unlink() + + mrd_file = output_path / Path(filename + '_header.h5') + + seq, _min_te, _min_tr = epi2d_fid_kernel( + system=system, + te=te, + tr=tr, + fov_xy=fov_xy, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=bandwidth, + readout_oversampling=readout_oversampling, + slice_thickness=slice_thickness, + n_slices=n_slices, + rf_duration=rf_duration, + rf_flip_angle=rf_flip_angle, + rf_bwt=rf_bwt, + rf_apodization=rf_apodization, + readout_type=readout_type, + ramp_sampling=ramp_sampling, + partial_fourier_factor=partial_fourier_factor, + add_spoiler=enable_gradient_spoiling, + add_noise_acq=add_noise_acq, + add_navigator_acq=add_navigator_acq, + mrd_header_file=mrd_file, + ) + + # check timing of the sequence + if timing_check and not test_report: + ok, error_report = seq.check_timing() + if ok: + print('\nTiming check passed successfully') + else: + print('\nTiming check failed! Error listing follows\n') + print(error_report) + + # show advanced test report + if test_report: + print('\nCreating advanced test report...') + print(seq.test_report()) + + # save seq-file to disk + print(f"\nSaving sequence file '{filename}.seq' into folder '{output_path}'.") + write_sequence(seq, str(output_path / filename), create_signature=True, v141_compatibility=v141_compatibility) + + if show_plots: + seq.plot(time_range=(0, tr or _min_tr), plot_now=True) + + return seq, output_path / filename + + +if __name__ == '__main__': + main() diff --git a/src/mrseq/scripts/epi2d_se.py b/src/mrseq/scripts/epi2d_se.py new file mode 100644 index 0000000..d02aaaf --- /dev/null +++ b/src/mrseq/scripts/epi2d_se.py @@ -0,0 +1,508 @@ +"""2D Echo Planar Imaging (EPI) spin echo (SE) sequence.""" + +from pathlib import Path +from typing import Literal + +import ismrmrd +import numpy as np +import pypulseq as pp + +from mrseq.utils import round_to_raster +from mrseq.utils import sys_defaults +from mrseq.utils import write_sequence +from mrseq.utils.EpiReadout import EpiReadout +from mrseq.utils.ismrmrd import Fov +from mrseq.utils.ismrmrd import Limits +from mrseq.utils.ismrmrd import MatrixSize +from mrseq.utils.ismrmrd import create_header + + +def epi2d_se_kernel( + system: pp.Opts, + te: float | None, + tr: float | None, + fov_xy: float, + n_readout: int, + n_phase_encoding: int, + bandwidth: float, + slice_thickness: float, + n_slices: int, + rf_duration: float, + rf_flip_angle: float, + rf_bwt: float, + rf_apodization: float, + readout_type: Literal['symmetric', 'flyback'], + readout_oversampling: Literal[1, 2, 4], + ramp_sampling: bool, + partial_fourier_factor: float, + add_spoiler: bool, + add_noise_acq: bool, + add_navigator_acq: bool, + mrd_header_file: str | Path | None, +) -> tuple[pp.Sequence, float, float]: + """Generate a 2D Echo Planar Imaging (EPI) sequence. + + Parameters + ---------- + system + PyPulseq system limits object. + te + Desired echo time (TE) (in seconds). Minimum echo time is used if set to None. + tr + Desired repetition time (TR) (in seconds). + fov_xy + Field of view in x and y direction (in meters). + n_readout + Number of frequency encoding steps. + n_phase_encoding + Number of phase encoding steps. + bandwidth + Total receiver bandwidth in Hz. + slice_thickness + Slice thickness of the 2D slice (in meters). + n_slices + Number of slices. + rf_duration + Duration of the rf excitation pulse (in seconds) + rf_flip_angle + Flip angle of rf excitation pulse (in degrees) + rf_bwt + Bandwidth-time product of rf excitation pulse (Hz * seconds) + rf_apodization + Apodization factor of rf excitation pulse + readout_type + Readout type ('symmetric' or 'flyback'). + readout_oversampling + Readout oversampling factor. Can be 1 (no oversampling), 2, or 4. + ramp_sampling + If True, ADC is active during gradient ramps for optimized timing. + partial_fourier_factor + Desired partial Fourier factor in "phase encoding" direction. Must be larger than 0.5 and smaller or equal to 1. + The actual partial Fourier factor might slightly deviate from the desired value. + add_spoiler + If True, a spoiler gradient will be added to the sequence after the EPI readout. + add_noise_acq + If True, noise acquisitions will be added at the beginning of the sequence. + add_navigator_acq + If True, 3 navigator acquisitions will be added to the sequence to allow for ghost corrections. + The navigator acquisitions are added between the rf excitation pulse and the refocusing pulse. + Be aware that navigator acquisitions will increase the minimum echo and repetition times. + mrd_header_file + Filename of the ISMRMRD header file to be created. If None, no header file is created. + + Returns + ------- + seq + PyPulseq Sequence object + min_te + Shortest possible echo time. + min_tr + Shortest possible repetition time. + + """ + # create PyPulseq Sequence object and set system limits + seq = pp.Sequence(system=system) + + # define number of navigator acquisitions + n_navigator_acq = 3 if add_navigator_acq else 0 + + # create EpiReadout object + epi2d = EpiReadout( + system=system, + fov=fov_xy, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=bandwidth, + oversampling=readout_oversampling, + readout_type=readout_type, + ramp_sampling=ramp_sampling, + partial_fourier_factor=partial_fourier_factor, + pe_enable=True, + spoiling_enable=add_spoiler, + ) + + # create slice selective excitation pulse and gradients + rf, gz, gzr = pp.make_sinc_pulse( + flip_angle=np.deg2rad(rf_flip_angle), + duration=rf_duration, + slice_thickness=slice_thickness, + apodization=rf_apodization, + time_bw_product=rf_bwt, + delay=system.rf_dead_time, + system=system, + return_gz=True, + use='excitation', + ) + + # create refocussing pulse and gradients if echo type is 'SE (spin echo) + rf180, gz180, _ = pp.make_sinc_pulse( + flip_angle=np.pi, + system=system, + duration=rf_duration * 2, + slice_thickness=slice_thickness, + apodization=0.5, + time_bw_product=4, + phase_offset=np.pi / 2, + use='refocusing', + return_gz=True, + delay=system.rf_dead_time, + ) + + _spoil_factor = 1.5 + _, gzr1_t, gzr1_a = pp.make_extended_trapezoid_area( + channel='z', + grad_start=0, + grad_end=gz180.amplitude, + area=_spoil_factor * gz.area, + system=system, + ) + _, gzr2_t, gzr2_a = pp.make_extended_trapezoid_area( + channel='z', + grad_start=gz180.amplitude, + grad_end=0, + area=_spoil_factor * gz.area, + system=system, + ) + + # create combined gradient including pre/post spoiler + gz180n = pp.make_extended_trapezoid( + channel='z', + system=system, + times=np.array([*gzr1_t, *gzr2_t + gzr1_t[-1] + gz180.flat_time]), + amplitudes=np.array([*gzr1_a, *gzr2_a]), + ) + + # update rf delay of refocussing pulse to ensure it's centered in plateau of combined gradient + rf180.delay = gzr1_t[-1] + + # calculate echo time delay(s) + t_exc_to_ref = rf.shape_dur / 2 + t_exc_to_ref += max(rf.ringdown_time, gz.fall_time) + if add_navigator_acq: + t_exc_to_ref += pp.calc_duration(gzr, epi2d.gx_pre) + t_exc_to_ref += 3 * pp.calc_duration(epi2d.gx) + t_exc_to_ref += pp.calc_duration(epi2d.gy_pre) + else: + t_exc_to_ref += pp.calc_duration(gzr, epi2d.gx_pre, epi2d.gy_pre) + t_exc_to_ref += gzr1_t[-1] + t_exc_to_ref += rf180.shape_dur / 2 + + t_ref_to_kcenter = rf180.shape_dur / 2 + t_ref_to_kcenter += gzr2_t[-1] + t_ref_to_kcenter += epi2d.time_to_center_without_prephaser + + # calculate minimum echo time + min_te = 2 * round_to_raster(max(t_exc_to_ref, t_ref_to_kcenter), system.block_duration_raster) + + # calculate echo time delays for minimum echo time + te_delay1 = round_to_raster(min_te / 2 - t_exc_to_ref, system.block_duration_raster) + te_delay2 = round_to_raster(min_te / 2 - t_ref_to_kcenter, system.block_duration_raster) + + if te is not None: + if te > min_te: + # calculate echo time delays for desired echo time + additional_te_delay_half = round_to_raster((te - min_te) / 2, system.block_duration_raster) + te_delay1 += additional_te_delay_half + te_delay2 += additional_te_delay_half + else: + raise ValueError(f'Desired TE ({te * 1000:.3f} ms) is smaller than minimum TE ({min_te * 1000:.3f} ms).') + + t_exc_to_ref = t_exc_to_ref + te_delay1 # might NOT be on block_raster + t_ref_to_kcenter = t_ref_to_kcenter + te_delay2 # might NOT be on block_raster + + # calculate repetition time delay (tr_delay) for current TE settings + min_tr = pp.calc_duration(rf, gz) + if add_navigator_acq: + min_tr += pp.calc_duration(gzr, epi2d.gx_pre) + min_tr += 3 * pp.calc_duration(epi2d.gx) + min_tr += pp.calc_duration(epi2d.gy_pre) + else: + min_tr += pp.calc_duration(gzr, epi2d.gx_pre, epi2d.gy_pre) + min_tr += te_delay1 + min_tr += pp.calc_duration(rf180, gz180n) + min_tr += te_delay2 + min_tr += epi2d.total_duration_without_prephaser + + if tr is None: + tr_delay = 0.0 + else: + tr_delay = round_to_raster(tr - min_tr, system.block_duration_raster) + if tr_delay < 0: + raise ValueError(f'TR must be larger than {min_tr * 1000:.2f} ms. Current value is {tr * 1000:.3f} ms.') + + print(f'\nCurrent echo time = {(t_exc_to_ref + t_ref_to_kcenter) * 1000:.4f} ms') + print(f'Current repetition time = {(min_tr + tr_delay) * 1000:.4f} ms') + + # create header + prot = None + if mrd_header_file: + hdr = create_header( + traj_type='other', + encoding_fov=Fov(x=fov_xy * readout_oversampling, y=fov_xy, z=slice_thickness), + recon_fov=Fov(x=fov_xy, y=fov_xy, z=slice_thickness), + encoding_matrix=MatrixSize(n_x=n_readout * readout_oversampling, n_y=epi2d.n_phase_encoding, n_z=1), + recon_matrix=MatrixSize(n_x=n_readout, n_y=n_phase_encoding, n_z=1), + dwell_time=epi2d.adc.dwell, + slice_limits=Limits(min=0, max=n_slices, center=n_slices // 2), + k1_limits=Limits(min=0, max=epi2d.n_phase_enc_total, center=epi2d.n_phase_enc_pre_center + 1), + k2_limits=Limits(min=0, max=1, center=0), + ) + + # write header to file + prot = ismrmrd.Dataset(mrd_header_file, 'w') + prot.write_xml_header(hdr.toXML('utf-8')) + + # obtain noise samples if selected + if add_noise_acq: + seq.add_block( + pp.make_label(label='LIN', type='SET', value=0), + pp.make_label(label='SLC', type='SET', value=0), + pp.make_label(label='NOISE', type='SET', value=True), + ) + seq.add_block( + epi2d.adc, pp.make_delay(round_to_raster(pp.calc_duration(epi2d.adc), system.block_duration_raster, 'ceil')) + ) + seq.add_block(pp.make_label(label='NOISE', type='SET', value=False)) + seq.add_block(pp.make_delay(system.rf_dead_time)) + + # Write noise trajectory to MRD (zero trajectory — no gradients active) + if mrd_header_file: + assert prot is not None + n_samples = epi2d.adc.num_samples + acq = ismrmrd.Acquisition() + acq.resize(trajectory_dimensions=2, number_of_samples=n_samples) + acq.traj[:] = np.zeros((n_samples, 2), dtype=np.float32) + prot.append_acquisition(acq) + + for slice_ in range(n_slices): + # define slice label + slice_label = pp.make_label(label='SLC', type='SET', value=slice_) + + # set frequency offset for current slice + rf.freq_offset = gz.amplitude * slice_thickness * (slice_ - (n_slices - 1) / 2) + # rf.phase_offset = - 2 * np.pi * rf.freq_offset * pp.calc_rf_center(rf) + + # add slice selective excitation pulse and set slice label + seq.add_block(rf, gz, slice_label) + + # add navigator scans for ghost correction + if add_navigator_acq: + # SE: invert pre-winder for even number of navigators + gx_pre_factor = -1 if n_navigator_acq % 2 == 0 else 1 + seq, prot = epi2d.add_navigator_to_seq( + seq, + gzr, + n_navigator_acq, + n_phase_encoding, + gx_pre_factor, + mrd_dataset=prot, + ) + + # add gy_pre and reset labels + seq.add_block( + pp.scale_grad(epi2d.gy_pre, -1), + pp.make_label(label='NAV', type='SET', value=0), + pp.make_label(label='AVG', type='SET', value=0), + ) + else: + seq.add_block(gzr, pp.scale_grad(epi2d.gx_pre, -1), pp.scale_grad(epi2d.gy_pre, -1)) + + if te_delay1 > 0: + seq.add_block(pp.make_delay(te_delay1)) + + # add refocusing pulse + combined gradient + seq.add_block(rf180, gz180n) + + if te_delay2 > 0: + seq.add_block(pp.make_delay(te_delay2)) + + # add EPI readout block without pre-phaser gradients + # (trajectory is written per-readout inside add_to_seq when mrd_dataset is provided) + seq, prot = epi2d.add_to_seq(seq, add_prephaser=False, mrd_dataset=prot) + + # add repetition time delay + if tr_delay > 0: + seq.add_block(pp.make_delay(tr_delay)) + + # close ISMRMRD file + if mrd_header_file: + assert prot is not None + prot.close() + + # set gridding definitions extracted from EpiReadout + if ramp_sampling: + gridding_params = [ + epi2d.gx.rise_time, + epi2d.gx.flat_time, + epi2d.gx.fall_time, + epi2d.adc.delay - epi2d.gx.delay, + epi2d.adc.num_samples * epi2d.adc.dwell, + ] + + seq.set_definition(key='TargetGriddedSamples', value=n_readout * readout_oversampling) + seq.set_definition(key='TrapezoidGriddingParameters', value=gridding_params) + + # write all required parameters in the seq-file header/definitions + seq.set_definition('FOV', [fov_xy, fov_xy, slice_thickness * n_slices]) + seq.set_definition('ReconMatrix', (n_readout, n_phase_encoding, n_slices)) + seq.set_definition('SliceThickness', slice_thickness) + seq.set_definition('TE', float(t_exc_to_ref + t_ref_to_kcenter)) + seq.set_definition('TR', tr or float(min_tr)) + seq.set_definition('ReadoutOversamplingFactor', readout_oversampling) + + return seq, float(min_te), float(min_tr) + + +def main( + system: pp.Opts | None = None, + te: float | None = None, + tr: float | None = None, + fov_xy: float = 200e-3, + n_readout: int = 64, + n_phase_encoding: int = 64, + n_slices: int = 1, + slice_thickness: float = 8e-3, + bandwidth: float = 100e3, + readout_type: Literal['symmetric', 'flyback'] = 'symmetric', + readout_oversampling: Literal[1, 2, 4] = 2, + ramp_sampling: bool = True, + partial_fourier_factor: float = 0.7, + add_navigator_acq: bool = True, + add_noise_acq: bool = True, + show_plots: bool = True, + test_report: bool = True, + timing_check: bool = True, + v141_compatibility: bool = True, +) -> tuple[pp.Sequence, Path]: + """Generate a 2D Echo Planar Imaging (EPI) spin echo (SE) sequence. + + Parameters + ---------- + system + PyPulseq system limits object. + te + Desired echo time (TE) (in seconds). Minimum echo time is used if set to None. + tr + Desired repetition time (TR) (in seconds). Minimum repetition time is used if set to None. + fov_xy + Field of view in x and y direction (in meters). + n_readout + Number of frequency encoding steps. + n_phase_encoding + Number of phase encoding steps. + n_slices + Number of slices. + slice_thickness + Slice thickness of the 2D slice (in meters). + bandwidth + Total receiver bandwidth (in Hz). + readout_type + Readout type ('symmetric' or 'flyback'). + readout_oversampling + Readout oversampling factor. Can be 1 (no oversampling), 2, or 4. + ramp_sampling + If True, ADC is active during gradient ramps for optimized timing. + partial_fourier_factor + Desired partial Fourier factor in "phase encoding" direction. + add_navigator_acq + If True, navigator acquisitions will be added for ghost corrections. + add_noise_acq + If True, noise acquisitions will be added at the beginning of the sequence. + show_plots + Toggles sequence plot. + test_report + Toggles advanced test report. + timing_check + Toggles timing check of the sequence. + v141_compatibility + Save the sequence in pulseq v1.4.1 for backwards compatibility. + + Returns + ------- + seq + Sequence object of 2D EPI SE sequence. + file_path + Path to the sequence file. + """ + if system is None: + system = sys_defaults + + # define settings of rf excitation pulse + rf_flip_angle = 90.0 # flip angle of the rf excitation pulse [°] + rf_duration = 1.28e-3 # duration of the rf excitation pulse [s] + rf_bwt = 4.0 # bandwidth-time product of rf excitation pulse [Hz*s] + rf_apodization = 0.5 # apodization factor of rf excitation pulse + + # define spoiling settings + enable_gradient_spoiling = True + + # define sequence filename + rs_string = 'rs' if ramp_sampling else 'nors' # ramp sampling + pf_string = f'{partial_fourier_factor}pf'.replace('.', 'p') # partial fourier factor + readout_string = 'sym' if readout_type == 'symmetric' else 'flyb' # readout type + noise_string = 'withnoise' if add_noise_acq else 'nonoise' # noise acquisition + nav_string = 'withnav' if add_navigator_acq else 'nonav' # navigator acquisition + + filename = f'{Path(__file__).stem}_{int(fov_xy * 1000)}fov_{n_readout}px' + filename += f'_{readout_string}_se_{readout_oversampling}ro_{rs_string}_{pf_string}' + filename += f'_{noise_string}_{nav_string}' + + output_path = Path.cwd() / 'output' + output_path.mkdir(parents=True, exist_ok=True) + + # delete existing header file + if (output_path / Path(filename + '_header.h5')).exists(): + (output_path / Path(filename + '_header.h5')).unlink() + + mrd_file = output_path / Path(filename + '_header.h5') + + seq, _min_te, min_tr = epi2d_se_kernel( + system=system, + te=te, + tr=tr, + fov_xy=fov_xy, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=bandwidth, + readout_oversampling=readout_oversampling, + slice_thickness=slice_thickness, + n_slices=n_slices, + rf_duration=rf_duration, + rf_flip_angle=rf_flip_angle, + rf_bwt=rf_bwt, + rf_apodization=rf_apodization, + readout_type=readout_type, + ramp_sampling=ramp_sampling, + partial_fourier_factor=partial_fourier_factor, + add_spoiler=enable_gradient_spoiling, + add_noise_acq=add_noise_acq, + add_navigator_acq=add_navigator_acq, + mrd_header_file=mrd_file, + ) + + # check timing of the sequence + if timing_check and not test_report: + ok, error_report = seq.check_timing() + if ok: + print('\nTiming check passed successfully') + else: + print('\nTiming check failed! Error listing follows\n') + print(error_report) + + # show advanced test report + if test_report: + print('\nCreating advanced test report...') + print(seq.test_report()) + + # save seq-file to disk + print(f"\nSaving sequence file '{filename}.seq' into folder '{output_path}'.") + write_sequence(seq, str(output_path / filename), create_signature=True, v141_compatibility=v141_compatibility) + + if show_plots: + seq.plot(time_range=(0, 10 * (tr or min_tr)), plot_now=True) + + return seq, output_path / filename + + +if __name__ == '__main__': + main() diff --git a/src/mrseq/utils/EpiReadout.py b/src/mrseq/utils/EpiReadout.py new file mode 100644 index 0000000..252b430 --- /dev/null +++ b/src/mrseq/utils/EpiReadout.py @@ -0,0 +1,808 @@ +"""Echo Planar Imaging (EPI) readout sequence.""" + +from typing import Literal + +import ismrmrd +import matplotlib.pyplot as plt +import numpy as np +import pypulseq as pp + +from mrseq.utils import round_to_raster +from mrseq.utils import sys_defaults + + +def _trapezoid_area_at_times( + rise_time: float, + flat_time: float, + fall_time: float, + amplitude: float, + sample_times: np.ndarray, +) -> np.ndarray: + """Calculate accumulated gradient area analytically for a trapezoidal gradient. + + Parameters + ---------- + rise_time + Gradient rise time (in seconds). + flat_time + Gradient flat time (in seconds). + fall_time + Gradient fall time (in seconds). + amplitude + Gradient amplitude (in Hz/m). + sample_times + Array of time points (in seconds) relative to the start of the gradient. + + Returns + ------- + area + Accumulated gradient area at each sample time (in 1/m). + """ + t1 = rise_time + t2 = t1 + flat_time + t3 = t2 + fall_time + + area_at_t1 = amplitude * rise_time / 2 + area_at_t2 = area_at_t1 + amplitude * flat_time + area_at_t3 = area_at_t2 + amplitude * fall_time / 2 + + area = np.zeros_like(sample_times) + for i, t in enumerate(sample_times): + if t <= 0: + area[i] = 0.0 + elif t <= t1: + area[i] = amplitude * t**2 / (2 * rise_time) + elif t <= t2: + area[i] = area_at_t1 + amplitude * (t - t1) + elif t <= t3: + dt = t - t2 + area[i] = area_at_t2 + amplitude * dt - amplitude * dt**2 / (2 * fall_time) + else: + area[i] = area_at_t3 + + return area + + +def _piecewise_linear_area_at_times( + waveform_times: np.ndarray, + waveform_values: np.ndarray, + sample_times: np.ndarray, +) -> np.ndarray: + """Compute cumulative integral of a piecewise-linear waveform at given sample times. + + Parameters + ---------- + waveform_times + Time points of the piecewise-linear waveform (in seconds), sorted ascending. + waveform_values + Waveform amplitude values at each time point (in Hz/m). + sample_times + Times at which to evaluate the cumulative area (in seconds). + + Returns + ------- + area + Cumulative area at each sample time (in 1/m). + """ + n_knots = len(waveform_times) + cum_area = np.zeros(n_knots) + for k in range(1, n_knots): + dt = waveform_times[k] - waveform_times[k - 1] + cum_area[k] = cum_area[k - 1] + (waveform_values[k] + waveform_values[k - 1]) / 2 * dt + + area = np.zeros_like(sample_times) + for i, t in enumerate(sample_times): + if t <= waveform_times[0]: + area[i] = 0.0 + elif t >= waveform_times[-1]: + area[i] = cum_area[-1] + else: + k = np.searchsorted(waveform_times, t, side='right') + base_area = cum_area[k - 1] + dt_seg = waveform_times[k] - waveform_times[k - 1] + dt = t - waveform_times[k - 1] + frac = dt / dt_seg + wf_at_t = waveform_values[k - 1] + frac * (waveform_values[k] - waveform_values[k - 1]) + area[i] = base_area + (waveform_values[k - 1] + wf_at_t) / 2 * dt + + return area + + +class EpiReadout: + """EPI readout module supporting flyback and symmetric readout, ramp sampling, oversampling, and partial Fourier. + + Attributes + ---------- + system + System limits. + ramp_sampling + If True, ADC is active during gradient ramps (optimized timing). + readout_type + Readout type ('flyback' or 'symmetric'). + spoiling_enable + Enable spoiling gradients (useful for calibration if False). + adc + ADC event. + gx_pre + Pre-phaser gradient in readout direction. + gy_pre + Pre-phaser gradient in phase encoding direction. + gx + Readout gradient. + gy + Phase encoding gradient. + gy_blip + Complete blip gradient used for flyback readout. + gy_blipup + Ramp-up part of blip gradient used for symmetric readout. + gy_blipdown + Ramp-down part of blip gradient used for symmetric readout. + gy_blipdownup + Combined gy_blipdown and gy_blipup gradient used for symmetric readout. + gz_spoil + Spoiler gradient in z-direction. + gx_flyback + Flyback gradient in x-direction. + """ + + def __init__( + self, + system: pp.Opts | None = None, + fov: float = 128e-3, + n_readout: int = 64, + n_phase_encoding: int = 64, + bandwidth: float = 50e3, + oversampling: int = 2, + ramp_sampling: bool = True, + readout_type: Literal['flyback', 'symmetric'] = 'symmetric', + partial_fourier_factor: float = 1.0, + adc_freq_offset: float = 0, + pe_enable: bool = True, + spoiling_enable: bool = True, + ): + """Initialize EPI Readout. + + Parameters + ---------- + system + System limits. + fov + Field of view in meters (square). + n_readout + Number of readout points. + n_phase_encoding + Number of phase encoding points. + bandwidth + Total receiver bandwidth in Hz. + oversampling + ADC oversampling factor. + ramp_sampling + If True, ADC is active during gradient ramps (optimized timing). + readout_type + Readout type ('flyback' or 'symmetric'). + partial_fourier_factor + Partial Fourier factor (0.5 to 1.0). + adc_freq_offset + Frequency offset for the ADC. + pe_enable + Enable phase encoding (useful for calibration scans if False). + spoiling_enable + Enable spoiling gradients. + """ + if not 0.5 < partial_fourier_factor <= 1.0: + raise ValueError('Desired partial Fourier factor must be larger than 0.5 and smaller or equal to 1.0.') + + # set system to default if not provided + if system is None: + system = sys_defaults + + self.system = system + self.fov = fov + self.n_readout = n_readout + self.n_phase_encoding = n_phase_encoding + self.bandwidth = bandwidth + self.oversampling = oversampling + self.ramp_sampling = ramp_sampling + self.readout_type = readout_type + self.partial_fourier_factor = partial_fourier_factor + self.adc_freq_offset = adc_freq_offset + self.pe_enable = pe_enable + self.spoiling_enable = spoiling_enable + + # Derived parameters + readout_time = n_readout / bandwidth + delta_kx = 1 / (fov * oversampling) + delta_ky = 1 / fov + + # Initiate all optional gradients as None + self.gx_flyback = None + self.gy_blipdown = None + self.gy_blipup = None + self.gy_blipdownup = None + self.gz_spoil = None + + # Create blip gradient with shortest possible timing + gy_blip_duration = np.ceil(2 * np.sqrt(delta_ky / system.max_slew) / 10e-6 / 2) * 10e-6 * 2 + gy_blip_half_dur = gy_blip_duration / 2 + self.gy_blip = pp.make_trapezoid(channel='y', system=self.system, area=delta_ky, duration=gy_blip_duration) + + # Create readout gradient + gx_encoding_area = n_readout * delta_kx * oversampling + if self.ramp_sampling: + # Calculate additional gradient area from gy_blip assuming maximum slew rate + extra_area = np.power(gy_blip_half_dur, 2) * self.system.max_slew + + # Create gradient with additional area + gx = pp.make_trapezoid( + channel='x', + system=self.system, + area=gx_encoding_area + extra_area, + duration=gy_blip_half_dur + readout_time + gy_blip_half_dur, + ) + + if not gx.fall_time == gx.rise_time: + raise ValueError('Gradient fall time must be equal to rise time for ramp sampling.') + + # Second, correct area taking actual slew rate into account + gx_slew = gx.amplitude / gx.rise_time + gx_area_reduced_slew = gx.area - gx_slew * np.power(gy_blip_half_dur, 2) + + gx.amplitude = float(gx.amplitude * gx_encoding_area / gx_area_reduced_slew) + gx.area = float(gx.amplitude * (gx.rise_time / 2 + gx.flat_time + gx.fall_time / 2)) + gx.flat_area = float(gx.amplitude * gx.flat_time) + self.gx = gx + else: + self.gx = pp.make_trapezoid( + channel='x', + system=self.system, + flat_area=gx_encoding_area, + flat_time=readout_time, + ) + + # Create ADC event + adc_dwell = delta_kx / self.gx.amplitude + adc_dwell = round_to_raster(adc_dwell, self.system.adc_raster_time) + + adc_samples = int(round(readout_time / adc_dwell / 4) * 4) + + adc_time_to_center = adc_dwell * (adc_samples / 2 + 0.5) + adc_delay = self.gx.rise_time + self.gx.flat_time / 2 - adc_time_to_center + adc_dwell / 2 + # the adc delay has to be rounded to the rf raster time (not the adc raster time) + adc_delay = round_to_raster(adc_delay, self.system.rf_raster_time) + + self.adc = pp.make_adc( + num_samples=adc_samples, + dwell=adc_dwell, + freq_offset=adc_freq_offset, + delay=adc_delay, + system=self.system, + ) + + # Create and align pre-phaser gradients considering partial fourier factor + # determine the number of "PE" lines after (and including) k-space center (independent of partial fourier) + self.n_phase_enc_post_center = int(np.ceil(self.n_phase_encoding / 2 + 1)) + # find the closest number of lines to the desired partial fourier factor + valid_n_phase_total = int(np.ceil(partial_fourier_factor * self.n_phase_encoding)) + # ensure that at least one line before the center is acquired + self.n_phase_enc_pre_center = max(1, valid_n_phase_total - self.n_phase_enc_post_center) + # update the total number of "PE" lines + self.n_phase_enc_total = self.n_phase_enc_pre_center + self.n_phase_enc_post_center + # recalculate the actual partial fourier factor + actual_pf_factor = self.n_phase_enc_total / self.n_phase_encoding + if actual_pf_factor != partial_fourier_factor: + print(f'Adjusted partial Fourier factor from {partial_fourier_factor} to {actual_pf_factor:.2f}.') + self.partial_fourier_factor = actual_pf_factor + + # Create pre-phaser gradients + self.gx_pre = pp.make_trapezoid( + channel='x', + system=self.system, + area=-self.gx.area / 2, + ) + self.gy_pre = pp.make_trapezoid( + channel='y', + system=self.system, + area=-self.n_phase_enc_pre_center * delta_ky, + ) + self.gx_pre, self.gy_pre = pp.align(right=[self.gx_pre, self.gy_pre]) + + # Create and align "phase encoding" gradients + if self.readout_type == 'flyback': + self.gx_flyback = pp.make_trapezoid(channel='x', system=self.system, area=-self.gx.area) + elif self.readout_type == 'symmetric': + # Split and align blip gradient in case of symmetric readout + gy_parts = pp.split_gradient_at(grad=self.gy_blip, time_point=gy_blip_duration / 2, system=self.system) + self.gy_blipup, self.gy_blipdown, _ = pp.align(right=gy_parts[0], left=[gy_parts[1], self.gx]) + self.gy_blipdownup = pp.add_gradients((self.gy_blipdown, self.gy_blipup), system=self.system) + else: + raise NotImplementedError('Currently, only "symmetric" and "flyback" readout types are supported.') + + # Disable phase encoding if self.pe_enable is False + if not self.pe_enable: + self.gy_pre.amplitude = 0 + if ( + self.readout_type == 'symmetric' + and self.gy_blipup is not None + and self.gy_blipdown is not None + and self.gy_blipdownup is not None + ): + self.gy_blipup.waveform *= 0 + self.gy_blipdown.waveform *= 0 + self.gy_blipdownup.waveform *= 0 + elif self.readout_type == 'flyback': + self.gy_blip.waveform *= 0 + + # Create spoiler gradient if spoiling is enabled + if self.spoiling_enable: + self.gz_spoil = pp.make_trapezoid(channel='z', system=self.system, area=4 * delta_ky * n_readout) + + @property + def time_to_center(self) -> float: + """Return time from beginning of readout to center of k-space (needed for TE calculations).""" + # i) add time for pre phaser + time_to_center = pp.calc_duration(self.gx_pre, self.gy_pre) + # ii) add time for completed k-space lines before central (ky = 0) line + if self.readout_type == 'flyback': + time_to_center += self.n_phase_enc_pre_center * ( + pp.calc_duration(self.gx) + pp.calc_duration(self.gx_flyback, self.gy_blip) + ) + elif self.readout_type == 'symmetric': + time_to_center += self.n_phase_enc_pre_center * pp.calc_duration(self.gx) + # iii) add time before start of ADC of central k-space line + if self.ramp_sampling: + time_to_center += self.adc.delay + else: + time_to_center += self.gx.rise_time + # iv) add time from start of ADC (for ky = 0) to timepoint when kx = 0 as well + time_to_center += self.adc.dwell * (self.adc.num_samples / 2 + 0.5) + + return float(time_to_center) + + @property + def time_to_center_without_prephaser(self) -> float: + """Return time from after pre-phasers to center of k-space (needed for TE calculations).""" + return self.time_to_center - pp.calc_duration(self.gx_pre, self.gy_pre) + + @property + def total_duration(self) -> float: + """Return total duration of readout including pre-phaser and optional spoiler.""" + total_duration = pp.calc_duration(self.gx_pre, self.gy_pre) + if self.readout_type == 'flyback': + total_duration += self.n_phase_enc_total * ( + pp.calc_duration(self.gx) + pp.calc_duration(self.gx_flyback, self.gy_blip) + ) + total_duration -= pp.calc_duration(self.gx_flyback, self.gy_blip) + elif self.readout_type == 'symmetric': + total_duration += self.n_phase_enc_total * pp.calc_duration(self.gx) + # add time for spoiler if enabled + if self.spoiling_enable: + total_duration += pp.calc_duration(self.gz_spoil) + + return float(total_duration) + + @property + def total_duration_without_prephaser(self) -> float: + """Return total duration of readout excluding pre-phasers but including optional spoiler.""" + return self.total_duration - pp.calc_duration(self.gx_pre, self.gy_pre) + + def _readout_block_kxy( + self, + pe_idx: int, + sample_times: np.ndarray, + ky_base: float, + ) -> tuple[np.ndarray, np.ndarray, float]: + """Compute kx/ky at given time points for a single readout block. + + Parameters + ---------- + pe_idx + Phase encoding index (0-based). + sample_times + Time points relative to the start of the readout block (in seconds). + ky_base + Cumulative ky offset at the start of this block (in 1/m). + + Returns + ------- + kx + kx positions at each sample time (in 1/m). + ky + ky positions at each sample time (in 1/m). + ky_base_next + Updated ky_base for the next block (in 1/m). + """ + n_pe = self.n_phase_enc_total + kx_offset = self.gx_pre.area + + # kx: accumulated area under the readout gradient + kx_forward = _trapezoid_area_at_times( + self.gx.rise_time, self.gx.flat_time, self.gx.fall_time, abs(self.gx.amplitude), sample_times + ) + + if self.readout_type == 'flyback': + kx = kx_offset + kx_forward + ky = np.full_like(sample_times, ky_base) + ky_base_next = ky_base + self.gy_blip.area + + elif self.readout_type == 'symmetric': + if pe_idx % 2 == 0: + kx = kx_offset + kx_forward + else: + kx = kx_offset + self.gx.area - kx_forward + + # ky changes during readout due to blip overlap + if pe_idx == 0: + assert self.gy_blipup is not None + bu_tt = self.gy_blipup.tt + self.gy_blipup.delay + ky_blip = _piecewise_linear_area_at_times(bu_tt, self.gy_blipup.waveform, sample_times) + blip_total = _piecewise_linear_area_at_times(bu_tt, self.gy_blipup.waveform, np.array([bu_tt[-1]]))[0] + elif pe_idx == n_pe - 1: + assert self.gy_blipdown is not None + bd_tt = self.gy_blipdown.tt + self.gy_blipdown.delay + ky_blip = _piecewise_linear_area_at_times(bd_tt, self.gy_blipdown.waveform, sample_times) + blip_total = 0.0 + else: + assert self.gy_blipdownup is not None + bdu_tt = self.gy_blipdownup.tt + self.gy_blipdownup.delay + ky_blip = _piecewise_linear_area_at_times(bdu_tt, self.gy_blipdownup.waveform, sample_times) + blip_total = _piecewise_linear_area_at_times( + bdu_tt, self.gy_blipdownup.waveform, np.array([bdu_tt[-1]]) + )[0] + + ky = ky_base + ky_blip + ky_base_next = ky_base + blip_total + else: + raise NotImplementedError(f'Readout type "{self.readout_type}" is not supported.') + + return kx, ky, ky_base_next + + def calculate_trajectory(self) -> tuple[np.ndarray, np.ndarray]: + """Compute analytical k-space trajectory for the EPI readout. + + Returns the kx and ky positions at each ADC sample for each phase encoding line. + This replaces the need to call ``seq.calculate_kspace()`` for trajectory information. + + Returns + ------- + kx + kx trajectory in 1/m, shape ``(n_phase_enc_total, adc.num_samples)``. + ky + ky trajectory in 1/m, shape ``(n_phase_enc_total, adc.num_samples)``. + """ + n_samples = self.adc.num_samples + n_pe = self.n_phase_enc_total + + # ADC sample times relative to the start of the gradient block (center of each dwell) + sample_times = self.adc.delay + (np.arange(n_samples) + 0.5) * self.adc.dwell + + kx = np.zeros((n_pe, n_samples)) + ky = np.zeros((n_pe, n_samples)) + ky_base = self.gy_pre.area + + for pe_idx in range(n_pe): + kx[pe_idx, :], ky[pe_idx, :], ky_base = self._readout_block_kxy(pe_idx, sample_times, ky_base) + + return kx, ky + + def calculate_full_trajectory(self, n_points_per_segment: int = 200) -> dict: + """Compute full analytical k-space trajectory including prephaser and transitions. + + This extends ``calculate_trajectory()`` by also computing the k-space path during + the prephaser gradients and the transitions between readout lines (flyback gradients + or symmetric ramp transitions). Useful for visualization. + + Parameters + ---------- + n_points_per_segment + Number of densely sampled time points per gradient block for smooth trajectory lines. + + Returns + ------- + result + Dictionary with keys: + + - ``'kx'``: full kx trajectory as 1-D array (1/m). + - ``'ky'``: full ky trajectory as 1-D array (1/m). + - ``'adc_indices'``: indices into kx/ky arrays that correspond to ADC samples. + """ + n_samples = self.adc.num_samples + n_pe = self.n_phase_enc_total + + all_kx: list[np.ndarray] = [] + all_ky: list[np.ndarray] = [] + adc_indices: list[int] = [] + current_idx = 0 + + # --- Prephaser block --- + pre_dur = pp.calc_duration(self.gx_pre, self.gy_pre) + t_pre = np.linspace(0, pre_dur, n_points_per_segment, endpoint=False) + kx_pre = _trapezoid_area_at_times( + self.gx_pre.rise_time, + self.gx_pre.flat_time, + self.gx_pre.fall_time, + self.gx_pre.amplitude, + t_pre - self.gx_pre.delay, + ) + ky_pre = _trapezoid_area_at_times( + self.gy_pre.rise_time, + self.gy_pre.flat_time, + self.gy_pre.fall_time, + self.gy_pre.amplitude, + t_pre - self.gy_pre.delay, + ) + all_kx.append(kx_pre) + all_ky.append(ky_pre) + current_idx += len(t_pre) + + # Current kx/ky offsets at the end of the prephaser + kx_offset = self.gx_pre.area + ky_base = self.gy_pre.area + + for pe_idx in range(n_pe): + # --- Readout block (densely sampled) --- + block_dur = pp.calc_duration(self.gx) + t_block = np.linspace(0, block_dur, n_points_per_segment, endpoint=False) + + kx_block, ky_block, ky_base = self._readout_block_kxy(pe_idx, t_block, ky_base) + + all_kx.append(kx_block) + all_ky.append(ky_block) + + # Record ADC sample indices + for s in range(n_samples): + adc_indices.append(current_idx + np.searchsorted(t_block, self.adc.delay + (s + 0.5) * self.adc.dwell)) + current_idx += len(t_block) + + # Flyback + blip transition block (flyback only) + if self.readout_type == 'flyback' and pe_idx < n_pe - 1: + assert self.gx_flyback is not None + fb_dur = pp.calc_duration(self.gx_flyback, self.gy_blip) + t_fb = np.linspace(0, fb_dur, n_points_per_segment // 2, endpoint=False) + # kx during flyback + kx_fb = ( + kx_offset + + self.gx.area + + _trapezoid_area_at_times( + self.gx_flyback.rise_time, + self.gx_flyback.flat_time, + self.gx_flyback.fall_time, + self.gx_flyback.amplitude, + t_fb - self.gx_flyback.delay, + ) + ) + # ky during flyback + ky_pre_blip = ky_base - self.gy_blip.area + ky_fb = ky_pre_blip + _trapezoid_area_at_times( + self.gy_blip.rise_time, + self.gy_blip.flat_time, + self.gy_blip.fall_time, + self.gy_blip.amplitude, + t_fb - self.gy_blip.delay, + ) + all_kx.append(kx_fb) + all_ky.append(ky_fb) + current_idx += len(t_fb) + + kx_full = np.concatenate(all_kx) + ky_full = np.concatenate(all_ky) + adc_idx = np.array(adc_indices, dtype=int) + # Clip to valid range + adc_idx = np.clip(adc_idx, 0, len(kx_full) - 1) + + return {'kx': kx_full, 'ky': ky_full, 'adc_indices': adc_idx} + + def add_navigator_to_seq( + self, + seq: pp.Sequence, + gz_rewinder, + n_navigator_acq: int, + n_phase_encoding: int, + gx_pre_factor: int, + mrd_dataset: ismrmrd.Dataset | None = None, + ) -> tuple[pp.Sequence, ismrmrd.Dataset | None]: + """Add navigator acquisitions to the sequence for EPI ghost correction. + + Parameters + ---------- + seq + PyPulseq Sequence object. + gz_rewinder + Slice-selection rewinder gradient. + n_navigator_acq + Number of navigator acquisitions. + n_phase_encoding + Number of phase encoding steps (used for LIN label). + gx_pre_factor + Sign factor for the readout pre-winder. Use ``-1`` for odd navigator + count (FID) or even navigator count (SE) to ensure the kx position + is correct at the start of the imaging readout. + mrd_dataset + ISMRMRD dataset for writing navigator trajectories. If None, no + trajectory is written. + """ + from math import floor + + # Align slice-selection rewinder and readout pre-winder + gz_rewinder, gx_pre = pp.align(left=[gz_rewinder], right=[self.gx_pre]) + seq.add_block( + gz_rewinder, + pp.scale_grad(gx_pre, gx_pre_factor), + pp.make_label(label='NAV', type='SET', value=1), + pp.make_label(label='LIN', type='SET', value=floor(n_phase_encoding / 2)), + ) + + # Precompute analytical navigator kx trajectory (single line, no ky blip) + nav_sample_times = self.adc.delay + (np.arange(self.adc.num_samples) + 0.5) * self.adc.dwell + nav_kx_forward = _trapezoid_area_at_times( + self.gx.rise_time, self.gx.flat_time, self.gx.fall_time, abs(self.gx.amplitude), nav_sample_times + ) + + # Navigator kx offset: starts from gx_pre.area scaled by gx_pre_factor + nav_kx_offset = self.gx_pre.area * gx_pre_factor + + # Add navigator acquisitions + for n in range(n_navigator_acq): + gx_sign = gx_pre_factor * (-1) ** n + seq.add_block( + pp.scale_grad(self.gx, gx_sign), + self.adc, + pp.make_label(label='REV', type='SET', value=gx_sign < 0), + pp.make_label(label='SEG', type='SET', value=gx_sign < 0), + pp.make_label(label='AVG', type='SET', value=(n + 1) == 3), + ) + + # Write navigator trajectory to MRD + if mrd_dataset is not None: + n_samples = self.adc.num_samples + traj = np.zeros((n_samples, 2), dtype=np.float32) + if gx_sign > 0: + nav_kx = nav_kx_offset + nav_kx_forward + else: + nav_kx = nav_kx_offset - nav_kx_forward + traj[:, 0] = nav_kx * self.fov * self.oversampling + acq = ismrmrd.Acquisition() + acq.resize(trajectory_dimensions=2, number_of_samples=n_samples) + acq.traj[:] = traj + mrd_dataset.append_acquisition(acq) + + # Update kx offset for next navigator + nav_kx_offset += gx_sign * self.gx.area + + return seq, mrd_dataset + + def add_to_seq( + self, + seq: pp.Sequence, + add_prephaser: bool = True, + mrd_dataset: ismrmrd.Dataset | None = None, + ) -> tuple[pp.Sequence, ismrmrd.Dataset | None]: + """Add EPI readout blocks to the sequence. + + If `mrd_dataset` is provided, the analytical k-space trajectory is + computed and written to the MRD file for each readout line. + """ + # (Re)set phase encoding (LIN) label + lin_label = pp.make_label(label='LIN', type='SET', value=0) + + # Compute analytical trajectory for MRD writing + if mrd_dataset is not None: + kx_traj, ky_traj = self.calculate_trajectory() + + # Add pre-phaser gradients if enabled + if add_prephaser: + seq.add_block(self.gx_pre, self.gy_pre) + + for pe_idx in range(self.n_phase_enc_total): + if self.readout_type == 'symmetric': + # Select blip gradient based on phase encoding index + if pe_idx == 0: + gy_blip = self.gy_blipup + elif pe_idx == self.n_phase_enc_total - 1: + gy_blip = self.gy_blipdown + else: + gy_blip = self.gy_blipdownup + + # Add readout block and reverse polarity of readout gradient + gx_scale = -1 if np.mod(pe_idx, 2) else 1 + rev_label = pp.make_label(type='SET', label='REV', value=gx_scale == -1) + seg_label = pp.make_label(type='SET', label='SEG', value=gx_scale == -1) + seq.add_block(pp.scale_grad(self.gx, gx_scale), gy_blip, self.adc, lin_label, rev_label, seg_label) + + elif self.readout_type == 'flyback': + seq.add_block(self.gx, self.adc, lin_label) + if pe_idx != self.n_phase_enc_total - 1: + seq.add_block(self.gx_flyback, self.gy_blip) + + # Write per-readout trajectory to MRD + if mrd_dataset is not None: + n_samples = self.adc.num_samples + traj = np.zeros((n_samples, 2), dtype=np.float32) + traj[:, 0] = kx_traj[pe_idx] * self.fov * self.oversampling + traj[:, 1] = ky_traj[pe_idx] * self.fov + + acq = ismrmrd.Acquisition() + acq.resize(trajectory_dimensions=2, number_of_samples=n_samples) + acq.traj[:] = traj + mrd_dataset.append_acquisition(acq) + + lin_label = pp.make_label(label='LIN', type='INC', value=1) + + if self.spoiling_enable: + seq.add_block(self.gz_spoil) + + return seq, mrd_dataset + + def plot_sequence(self): + """Plot the sequence.""" + seq = pp.Sequence(self.system) + seq, _ = self.add_to_seq(seq) + _, axs1, _, axs2 = seq.plot(grad_disp='mT/m', plot_now=False) + # add vertical line at time to center to all plots in both figures + for ax in axs1 + axs2: + ax.axvline(x=self.time_to_center, color='r', linestyle='--') + plt.show() + + def plot_trajectory(self, show_adc: bool = True): + """Plot full k-space trajectory with color-coded direction. + + The trajectory is colored from start (dark) to end (bright) using the + copper colormap. Small arrows at the midpoint of each readout line + indicate the traversal direction. ADC sample positions are shown as + red markers. + + Parameters + ---------- + show_adc + If True, ADC sample positions are shown as red markers. + + Returns + ------- + fig + Matplotlib figure object. + """ + from matplotlib.collections import LineCollection + + traj = self.calculate_full_trajectory() + kx = traj['kx'] + ky = traj['ky'] + adc_idx = traj['adc_indices'] + + fig, ax = plt.subplots() + + # Create line segments colored by traversal order + points = np.column_stack([kx, ky]).reshape(-1, 1, 2) + segments = np.concatenate([points[:-1], points[1:]], axis=1) + t = np.linspace(0, 1, len(segments)) + lc = LineCollection(segments.tolist(), cmap='copper', linewidth=1.0) + lc.set_array(t) + ax.add_collection(lc) + + # ADC samples + if show_adc: + ax.plot(kx[adc_idx], ky[adc_idx], 'x', color='red', markersize=3, zorder=3) + + # Direction arrows at the midpoint of each readout line + n_samples = self.adc.num_samples + for pe_idx in range(self.n_phase_enc_total): + start = pe_idx * n_samples + mid = start + n_samples // 2 + kx_adc = kx[adc_idx] + ky_adc = ky[adc_idx] + dx = kx_adc[mid] - kx_adc[mid - 1] + dy = ky_adc[mid] - ky_adc[mid - 1] + ax.annotate( + '', + xy=(kx_adc[mid] + dx, ky_adc[mid] + dy), + xytext=(kx_adc[mid], ky_adc[mid]), + arrowprops={'arrowstyle': '->', 'color': 'blue', 'lw': 1.5}, + ) + + ax.set_xlabel('kx (1/m)') + ax.set_ylabel('ky (1/m)') + ax.set_aspect('equal', adjustable='datalim') + ax.autoscale_view() + ax.grid(True, alpha=0.3) + plt.tight_layout() + plt.show() + + return fig diff --git a/src/mrseq/utils/__init__.py b/src/mrseq/utils/__init__.py index 31bd0a0..b19657d 100644 --- a/src/mrseq/utils/__init__.py +++ b/src/mrseq/utils/__init__.py @@ -3,3 +3,4 @@ from mrseq.utils.trajectory import cartesian_phase_encoding, spiral_acquisition, MultiEchoAcquisition from mrseq.utils.vds import variable_density_spiral_trajectory from mrseq.utils.ismrmrd import create_header, combine_ismrmrd_files +from mrseq.utils.EpiReadout import EpiReadout diff --git a/tests/scripts/test_epi2d_fid.py b/tests/scripts/test_epi2d_fid.py new file mode 100644 index 0000000..6e13ecb --- /dev/null +++ b/tests/scripts/test_epi2d_fid.py @@ -0,0 +1,128 @@ +"""Tests for EPI FID sequence.""" + +from typing import Literal + +import ismrmrd +import numpy as np +import pytest +from mrseq.scripts.epi2d_fid import epi2d_fid_kernel +from mrseq.scripts.epi2d_fid import main as create_seq +from mrseq.utils.EpiReadout import EpiReadout + +EXPECTED_DUR = 0.0389 # defined 2026-02-28 + + +def test_default_seq_duration(system_defaults): + """Test if default values result in expected sequence duration.""" + seq, _ = create_seq(system=system_defaults, show_plots=False) + duration = seq.duration()[0] + assert duration == pytest.approx(EXPECTED_DUR) + + +def test_seq_creation_error_on_short_te(system_defaults): + """Test if error is raised on too short echo time.""" + with pytest.raises(ValueError): + create_seq(system=system_defaults, te=1e-3, show_plots=False) + + +def test_seq_creation_error_on_short_tr(system_defaults): + """Test if error is raised on too short repetition time.""" + with pytest.raises(ValueError): + create_seq(system=system_defaults, tr=2e-3, show_plots=False) + + +@pytest.mark.parametrize('add_noise_acq', [True, False]) +@pytest.mark.parametrize('readout_type', ('flyback', 'symmetric')) +@pytest.mark.parametrize('add_navigator_acq', [True, False]) +@pytest.mark.parametrize('partial_fourier_factor', [0.75, 1.0]) +@pytest.mark.parametrize('ramp_sampling', [True, False]) +@pytest.mark.parametrize('oversampling', [1, 2]) +@pytest.mark.parametrize('n_slices', [1, 4]) +def test_mrd_trajectory( + system_defaults, + readout_type: Literal['flyback', 'symmetric'], + add_navigator_acq: bool, + add_noise_acq: bool, + partial_fourier_factor: float, + ramp_sampling: bool, + oversampling: Literal[1, 2], + n_slices: int, + tmp_path, +): + """Test that the MRD trajectory matches the analytical and PyPulseq trajectories.""" + fov = 200e-3 + n_readout = 32 + n_phase_encoding = 32 + + mrd_file = tmp_path / 'test_epi2d_fid_traj.h5' + + seq, _, _ = epi2d_fid_kernel( + system=system_defaults, + te=None, + tr=None, + fov_xy=fov, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=100e3, + slice_thickness=5e-3, + n_slices=n_slices, + rf_duration=2e-3, + rf_flip_angle=90, + rf_bwt=4, + rf_apodization=0.5, + readout_type=readout_type, + readout_oversampling=oversampling, + ramp_sampling=ramp_sampling, + partial_fourier_factor=partial_fourier_factor, + add_spoiler=True, + add_noise_acq=add_noise_acq, + add_navigator_acq=add_navigator_acq, + mrd_header_file=mrd_file, + ) + + # Read MRD trajectories + ds = ismrmrd.Dataset(mrd_file, 'w', False) + n_acq = ds.number_of_acquisitions() + traj_mrd = [ds.read_acquisition(i).traj.copy() for i in range(n_acq)] + ds.close() + + # Recreate EpiReadout for reference + epi2d = EpiReadout( + system=system_defaults, + fov=fov, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=100e3, + oversampling=oversampling, + ramp_sampling=ramp_sampling, + readout_type=readout_type, + partial_fourier_factor=partial_fourier_factor, + pe_enable=True, + spoiling_enable=True, + ) + n_samples = epi2d.adc.num_samples + n_nav = 3 if add_navigator_acq else 0 + n_noise = 1 if add_noise_acq else 0 + n_pre_per_slice = n_nav + n_epi = epi2d.n_phase_enc_total + assert n_acq == n_noise + (n_pre_per_slice + n_epi) * n_slices + + # Compare Pulseq trajectories against MRD trajectories including noise and navigator acquisitions + k_traj_adc, _, _, _, _ = seq.calculate_kspace() + for i in range(n_acq): + start = i * n_samples + end = start + n_samples + kx_calc = (k_traj_adc[0, start:end] * fov * oversampling).astype(np.float32) + ky_calc = (k_traj_adc[1, start:end] * fov).astype(np.float32) + np.testing.assert_array_almost_equal(traj_mrd[i][:, 0], kx_calc, decimal=10) + np.testing.assert_array_almost_equal(traj_mrd[i][:, 1], ky_calc, decimal=10) + + # Compare analytical trajectories against MRD trajectories without noise and navigator acquisitions + kx_analytical, ky_analytical = epi2d.calculate_trajectory() + for s in range(n_slices): + for pe_idx in range(n_epi): + mrd_idx = n_noise + s * (n_nav + n_epi) + n_nav + pe_idx + kx_expected = (kx_analytical[pe_idx] * fov * oversampling).astype(np.float32) + ky_expected = (ky_analytical[pe_idx] * fov).astype(np.float32) + np.testing.assert_array_equal(traj_mrd[mrd_idx][:, 0], kx_expected) + np.testing.assert_array_equal(traj_mrd[mrd_idx][:, 1], ky_expected) diff --git a/tests/scripts/test_epi2d_se.py b/tests/scripts/test_epi2d_se.py new file mode 100644 index 0000000..6d8daf7 --- /dev/null +++ b/tests/scripts/test_epi2d_se.py @@ -0,0 +1,128 @@ +"""Tests for EPI SE sequence.""" + +from typing import Literal + +import ismrmrd +import numpy as np +import pytest +from mrseq.scripts.epi2d_se import epi2d_se_kernel +from mrseq.scripts.epi2d_se import main as create_seq +from mrseq.utils.EpiReadout import EpiReadout + +EXPECTED_DUR = 0.04846 # defined 2026-05-19 + + +def test_default_seq_duration(system_defaults): + """Test if default values result in expected sequence duration.""" + seq, _ = create_seq(system=system_defaults, show_plots=False) + duration = seq.duration()[0] + assert duration == pytest.approx(EXPECTED_DUR) + + +def test_seq_creation_error_on_short_te(system_defaults): + """Test if error is raised on too short echo time.""" + with pytest.raises(ValueError): + create_seq(system=system_defaults, te=1e-3, show_plots=False) + + +def test_seq_creation_error_on_short_tr(system_defaults): + """Test if error is raised on too short repetition time.""" + with pytest.raises(ValueError): + create_seq(system=system_defaults, tr=2e-3, show_plots=False) + + +@pytest.mark.parametrize('add_noise_acq', [True, False]) +@pytest.mark.parametrize('readout_type', ('flyback', 'symmetric')) +@pytest.mark.parametrize('add_navigator_acq', [True, False]) +@pytest.mark.parametrize('partial_fourier_factor', [0.75, 1.0]) +@pytest.mark.parametrize('ramp_sampling', [True, False]) +@pytest.mark.parametrize('oversampling', [1, 2]) +@pytest.mark.parametrize('n_slices', [1, 4]) +def test_mrd_trajectory( + system_defaults, + readout_type: Literal['flyback', 'symmetric'], + add_navigator_acq: bool, + add_noise_acq: bool, + partial_fourier_factor: float, + ramp_sampling: bool, + oversampling: Literal[1, 2], + n_slices: int, + tmp_path, +): + """Test that the MRD trajectory matches the analytical and PyPulseq trajectories.""" + fov = 200e-3 + n_readout = 32 + n_phase_encoding = 32 + + mrd_file = tmp_path / 'test_epi2d_se_traj.h5' + + seq, _, _ = epi2d_se_kernel( + system=system_defaults, + te=None, + tr=None, + fov_xy=fov, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=100e3, + slice_thickness=5e-3, + n_slices=n_slices, + rf_duration=2e-3, + rf_flip_angle=90, + rf_bwt=4, + rf_apodization=0.5, + readout_type=readout_type, + readout_oversampling=oversampling, + ramp_sampling=ramp_sampling, + partial_fourier_factor=partial_fourier_factor, + add_spoiler=True, + add_noise_acq=add_noise_acq, + add_navigator_acq=add_navigator_acq, + mrd_header_file=mrd_file, + ) + + # Read MRD trajectories + ds = ismrmrd.Dataset(mrd_file, 'w', False) + n_acq = ds.number_of_acquisitions() + traj_mrd = [ds.read_acquisition(i).traj.copy() for i in range(n_acq)] + ds.close() + + # Recreate EpiReadout for reference + epi2d = EpiReadout( + system=system_defaults, + fov=fov, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=100e3, + oversampling=oversampling, + ramp_sampling=ramp_sampling, + readout_type=readout_type, + partial_fourier_factor=partial_fourier_factor, + pe_enable=True, + spoiling_enable=True, + ) + n_samples = epi2d.adc.num_samples + n_nav = 3 if add_navigator_acq else 0 + n_noise = 1 if add_noise_acq else 0 + n_pre_per_slice = n_nav + n_epi = epi2d.n_phase_enc_total + assert n_acq == n_noise + (n_pre_per_slice + n_epi) * n_slices + + # Compare Pulseq trajectories against MRD trajectories including noise and navigator acquisitions + k_traj_adc, _, _, _, _ = seq.calculate_kspace() + for i in range(n_acq): + start = i * n_samples + end = start + n_samples + kx_calc = (k_traj_adc[0, start:end] * fov * oversampling).astype(np.float32) + ky_calc = (k_traj_adc[1, start:end] * fov).astype(np.float32) + np.testing.assert_array_almost_equal(traj_mrd[i][:, 0], kx_calc, decimal=10) + np.testing.assert_array_almost_equal(traj_mrd[i][:, 1], ky_calc, decimal=10) + + # Compare analytical trajectories against MRD trajectories without noise and navigator acquisitions + kx_analytical, ky_analytical = epi2d.calculate_trajectory() + for s in range(n_slices): + for pe_idx in range(n_epi): + mrd_idx = n_noise + s * (n_nav + n_epi) + n_nav + pe_idx + kx_expected = (kx_analytical[pe_idx] * fov * oversampling).astype(np.float32) + ky_expected = (ky_analytical[pe_idx] * fov).astype(np.float32) + np.testing.assert_array_equal(traj_mrd[mrd_idx][:, 0], kx_expected) + np.testing.assert_array_equal(traj_mrd[mrd_idx][:, 1], ky_expected) diff --git a/tests/utils/gradient_waveforms.py b/tests/utils/gradient_waveforms.py new file mode 100644 index 0000000..10ed2a8 --- /dev/null +++ b/tests/utils/gradient_waveforms.py @@ -0,0 +1,42 @@ +"""Helper function to get interpolate gradient waveforms.""" + +import numpy as np +import pypulseq as pp + + +def get_interp_waveform_for_gx_gy(seq: pp.Sequence, dt: np.ndarray | None = None, scale: float = 1.0): + """Interpolate gradient waveforms for the x and y axes. + + Parameters + ---------- + seq + The PyPulseq sequence object containing gradient waveforms. + dt + Desired time points for interpolation. If None, a default time array is generated. + scale + Scaling factor for the gradient waveforms. Default is 1. + + Returns + ------- + gx_waveform_intp + Interpolated gradient waveform for the x-axis. + gy_waveform_intp + Interpolated gradient waveform for the y-axis. + dt + Time points corresponding to the interpolated waveforms. + """ + w = seq.waveforms_and_times() + gx_waveform = w[0][0][1] * scale + gx_waveform_time = w[0][0][0] + + gy_waveform = w[0][1][1] * scale + gy_waveform_time = w[0][1][0] + + if dt is None: + dt = np.arange( + min(gx_waveform_time[0], gy_waveform_time[0]), max(gx_waveform_time[-1], gy_waveform_time[-1]), step=1e-7 + ) + gx_waveform_intp = np.interp(dt, gx_waveform_time, gx_waveform) + gy_waveform_intp = np.interp(dt, gy_waveform_time, gy_waveform) + + return gx_waveform_intp, gy_waveform_intp, dt diff --git a/tests/utils/test_epi_readout.py b/tests/utils/test_epi_readout.py new file mode 100644 index 0000000..82c29f3 --- /dev/null +++ b/tests/utils/test_epi_readout.py @@ -0,0 +1,124 @@ +"""Tests for EPI readout.""" + +from typing import Literal + +import numpy as np +import pypulseq as pp +import pytest +from mrseq.utils.EpiReadout import EpiReadout + + +@pytest.mark.parametrize('fov', (64e-3, 128e-3)) +@pytest.mark.parametrize('n_readout', (32, 64)) +@pytest.mark.parametrize('n_phase_encoding', (32, 64)) +@pytest.mark.parametrize('bandwidth', (50e3, 40e3)) +@pytest.mark.parametrize('oversampling', (1, 2)) +@pytest.mark.parametrize('ramp_sampling', (True, False)) +@pytest.mark.parametrize('readout_type', ('flyback', 'symmetric')) +@pytest.mark.parametrize('partial_fourier_factor', (0.75, 1.0)) +@pytest.mark.parametrize('spoiling_enable', (True, False)) +def test_epi_time_to_center( + system_defaults: pp.Opts | None, + fov: float, + n_readout, + n_phase_encoding, + bandwidth: float, + oversampling: int, + ramp_sampling: bool, + readout_type: Literal['flyback', 'symmetric'], + partial_fourier_factor: float, + spoiling_enable: bool, +): + """Test epi readout for different parameter combinations.""" + + def get_time_to_k0(seq): + """Find time when gradients are 0 (k-space center).""" + k_traj_adc, _, _, _, t_adc = seq.calculate_kspace() + m0 = np.sqrt(k_traj_adc[0] ** 2 + k_traj_adc[1] ** 2) + k0_idx = np.argmin(m0[100:-100]) + 100 + return t_adc[k0_idx], np.median(np.diff(t_adc)) + + # create EpiReadout object + epi2d = EpiReadout( + system=system_defaults, + fov=fov, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=bandwidth, + oversampling=oversampling, + readout_type=readout_type, + ramp_sampling=ramp_sampling, + partial_fourier_factor=partial_fourier_factor, + adc_freq_offset=0.0, + pe_enable=True, + spoiling_enable=spoiling_enable, + ) + + # Complete EPI readout + seq = pp.Sequence(system=system_defaults) + seq, _ = epi2d.add_to_seq(seq) + assert sum(seq.block_durations.values()) == pytest.approx(epi2d.total_duration, abs=1e-6) + time_to_k0, dwell_time = get_time_to_k0(seq) + assert np.isclose(time_to_k0, epi2d.time_to_center, atol=dwell_time) + + # EPI readout without prephaseser gradients + seq = pp.Sequence(system=system_defaults) + seq, _ = epi2d.add_to_seq(seq, add_prephaser=False) + assert sum(seq.block_durations.values()) == pytest.approx(epi2d.total_duration_without_prephaser, abs=1e-6) + + +@pytest.mark.parametrize('fov', (64e-3, 128e-3)) +@pytest.mark.parametrize('n_readout', (32, 64)) +@pytest.mark.parametrize('n_phase_encoding', (32, 64)) +@pytest.mark.parametrize('bandwidth', (50e3, 40e3)) +@pytest.mark.parametrize('oversampling', (1, 2)) +@pytest.mark.parametrize('ramp_sampling', (True, False)) +@pytest.mark.parametrize('readout_type', ('flyback', 'symmetric')) +@pytest.mark.parametrize('partial_fourier_factor', (0.75, 1.0)) +def test_epi_analytical_trajectory( + system_defaults: pp.Opts | None, + fov: float, + n_readout: int, + n_phase_encoding: int, + bandwidth: float, + oversampling: int, + ramp_sampling: bool, + readout_type: Literal['flyback', 'symmetric'], + partial_fourier_factor: float, +): + """Test that the analytical trajectory matches calculate_kspace().""" + epi2d = EpiReadout( + system=system_defaults, + fov=fov, + n_readout=n_readout, + n_phase_encoding=n_phase_encoding, + bandwidth=bandwidth, + oversampling=oversampling, + readout_type=readout_type, + ramp_sampling=ramp_sampling, + partial_fourier_factor=partial_fourier_factor, + adc_freq_offset=0.0, + pe_enable=True, + spoiling_enable=True, + ) + + # Build sequence and get PyPulseq trajectory + seq = pp.Sequence(system=system_defaults) + seq, _ = epi2d.add_to_seq(seq) + k_traj_adc, _, _, _, _ = seq.calculate_kspace() + + # Get analytical trajectory + kx_analytical, ky_analytical = epi2d.calculate_trajectory() + + n_samples = epi2d.adc.num_samples + n_pe = epi2d.n_phase_enc_total + + # The prephaser block has no ADC, so calculate_kspace ADC samples start at the first readout + for pe_idx in range(n_pe): + start = pe_idx * n_samples + end = start + n_samples + kx_pulseq = k_traj_adc[0, start:end] + ky_pulseq = k_traj_adc[1, start:end] + + np.testing.assert_allclose(kx_analytical[pe_idx], kx_pulseq, atol=1e-9) + np.testing.assert_allclose(ky_analytical[pe_idx], ky_pulseq, atol=1e-9) diff --git a/tests/utils/test_trajectory.py b/tests/utils/test_trajectory.py index 352c852..10f4a12 100644 --- a/tests/utils/test_trajectory.py +++ b/tests/utils/test_trajectory.py @@ -11,6 +11,8 @@ from mrseq.utils.trajectory import undersampled_variable_density_spiral from scipy.signal import argrelextrema +from .gradient_waveforms import get_interp_waveform_for_gx_gy + @pytest.mark.parametrize('n_phase_encoding', [50, 51, 100]) @pytest.mark.parametrize('acceleration', [1, 2, 3, 4, 6]) @@ -87,44 +89,6 @@ def test_multi_gradient_echo(system_defaults): seq, _ = mecho.add_to_seq(seq, n_echoes=3) -def get_interp_waveform_for_gx_gy(seq: pp.Sequence, dt: np.ndarray | None = None, scale: float = 1.0): - """Interpolate gradient waveforms for the x and y axes. - - Parameters - ---------- - seq - The PyPulseq sequence object containing gradient waveforms. - dt - Desired time points for interpolation. If None, a default time array is generated. - scale - Scaling factor for the gradient waveforms. Default is 1. - - Returns - ------- - gx_waveform_intp - Interpolated gradient waveform for the x-axis. - gy_waveform_intp - Interpolated gradient waveform for the y-axis. - dt - Time points corresponding to the interpolated waveforms. - """ - w = seq.waveforms_and_times() - gx_waveform = w[0][0][1] * scale - gx_waveform_time = w[0][0][0] - - gy_waveform = w[0][1][1] * scale - gy_waveform_time = w[0][1][0] - - if dt is None: - dt = np.arange( - min(gx_waveform_time[0], gy_waveform_time[0]), max(gx_waveform_time[-1], gy_waveform_time[-1]), step=1e-7 - ) - gx_waveform_intp = np.interp(dt, gx_waveform_time, gx_waveform) - gy_waveform_intp = np.interp(dt, gy_waveform_time, gy_waveform) - - return gx_waveform_intp, gy_waveform_intp, dt - - @pytest.mark.parametrize('n_readout', (128, 256)) @pytest.mark.parametrize('fov', (128e-3, 320e-3)) @pytest.mark.parametrize('undersampling_factor', (1, 2, 3, 4))