
Pipeline

The pipeline module exposes the composable ops behind get_raw_peaks and get_centroided_spectrum. Each op takes (and most return) a RawSpectrum: raw peaks held as integer (scan_number, TOF_index) positions together with their intensities.

Use the convenience entry points for common workflows; reach into the ops when you need a custom ordering, want to plug in a transformation, or want to skip a step.

import tdfpy
from tdfpy import (
    read_spectrum, subset_scans, exclude_region,
    apply_noise, convert, centroid_peaks,
    ChargeStateRegion, MadThreshold, WatershedCentroider,
)

with tdfpy.timsdata_connect("data.d") as td:
    s = read_spectrum(td, frame_id=1)
    s = subset_scans(s, scan_num_begin=0, scan_num_end=400)   # scans [0, 400)
    s = exclude_region(s, ChargeStateRegion(), td=td, frame_id=1)
    s = apply_noise(s, (MadThreshold(k=3),), td=td, frame_id=1)
    centroids = WatershedCentroider(
        attach_scan_half_width=10, attach_mz_idx_half_width=3
    )(s, td, 1)   # (N, 3) array of [mz, intensity, ion_mobility]

WatershedCentroider accepts an optional per-group "leash" via max_scan_from_seed and max_mz_idx_from_seed: bounds on how far any group member may lie from its seed. This stops groups that grow by chaining through neighbouring points from wandering across the data. max_mz_idx_from_seed defaults to 10; max_scan_from_seed defaults to None (no bound on that axis).

from tdfpy import WatershedCentroider

# Cap group span at ±20 TOF indices from the seed
WatershedCentroider(
    attach_scan_half_width=10, attach_mz_idx_half_width=3,
    max_mz_idx_from_seed=20,
)

The standalone smooth op (and the lower-level box_smooth array helper) replace each point's intensity with a box sum or mean over a (±scan_half_width, ±mz_idx_half_width) window, without moving, adding or removing points. Summing (the default) amplifies genuine ion-mobility streaks ahead of noise filtering. The mean variant backs WatershedCentroider's seed-stabilising smoother, which runs before seed selection by default via the smooth_scan_half_width / smooth_mz_idx_half_width fields (defaults 5 and 3; set both to 0 to disable).

import tdfpy
from tdfpy import read_spectrum, smooth, apply_noise, VerticalNoiseFilter

with tdfpy.timsdata_connect("data.d") as td:
    s = read_spectrum(td, frame_id=1)
    s = smooth(s, scan_half_width=5, mz_idx_half_width=2)   # box sum, amplifies streaks
    s = apply_noise(s, (VerticalNoiseFilter(),), td=td, frame_id=1)
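To see why summing amplifies ion-mobility streaks while leaving isolated hits alone, here is a naive O(N²) box smoother in plain Python. naive_box_smooth is a hypothetical reference, not part of tdfpy, and it assumes the window semantics described for box_smooth: a point contributes when it lies within ±scan_half_width scans and ±mz_idx_half_width TOF indices (inclusive), the point itself included. The input values are invented.

```python
def naive_box_smooth(scans, mzs, intensities, scan_hw, mz_hw, mode="sum"):
    """Naive O(N^2) box sum/mean; a reference for tiny inputs only."""
    out = []
    for s, m in zip(scans, mzs):
        # Every point whose scan and TOF index both fall inside the window.
        window = [
            v
            for s2, m2, v in zip(scans, mzs, intensities)
            if abs(s2 - s) <= scan_hw and abs(m2 - m) <= mz_hw
        ]
        total = sum(window)
        out.append(total / len(window) if mode == "mean" else total)
    return out


# A streak: roughly the same TOF index seen in five consecutive scans,
# plus one isolated hit far away (scan 40, TOF index 900).
scans = [10, 11, 12, 13, 14, 40]
mzs = [500, 500, 501, 500, 500, 900]
ints = [100.0, 100.0, 100.0, 100.0, 100.0, 100.0]

summed = naive_box_smooth(scans, mzs, ints, scan_hw=5, mz_hw=2)
print(summed)  # streak points rise to 500.0; the isolated hit stays at 100.0
```

With mode="mean" every value here stays at 100.0, which is why the mean variant is suited to stabilising seed ordering rather than amplifying signal.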

Data carrier

tdfpy.RawSpectrum dataclass

RawSpectrum(
    scan_indices: np.ndarray,
    mz_indices: np.ndarray,
    intensities: np.ndarray,
    num_scans: int,
)

Raw peaks in integer-index (TOF / scan) space.

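This is the native form of Bruker raw data: TOF index and scan number are integers, and intensity is an integer detector count (read_spectrum stores it as float64). The three arrays are parallel, with one entry per peak; num_scans is the parent frame's scan count, which subset_scans leaves unchanged. All pipeline ops operate on this representation; conversion to m/z and 1/K0 happens once, at the end, via convert.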

filter

filter(mask: np.ndarray) -> 'RawSpectrum'

Return a new spectrum keeping only points where mask is True.

Source code in src/tdfpy/pipeline.py
def filter(self, mask: np.ndarray) -> "RawSpectrum":
    """Return a new spectrum keeping only points where ``mask`` is True."""
    return RawSpectrum(
        scan_indices=self.scan_indices[mask],
        mz_indices=self.mz_indices[mask],
        intensities=self.intensities[mask],
        num_scans=self.num_scans,
    )

Reading

tdfpy.read_spectrum

read_spectrum(td: TimsData, frame_id: int) -> RawSpectrum

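Read a frame's raw peaks into integer-index form.

Raises RuntimeError if the TimsData connection is not open and ValueError if frame_id is not in the Frames table. A frame with no scans, or with scans but no peaks, yields an empty spectrum.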

Source code in src/tdfpy/pipeline.py
def read_spectrum(td: TimsData, frame_id: int) -> RawSpectrum:
    """Read a frame's raw peaks into integer-index form."""
    if td.conn is None:
        raise RuntimeError("TimsData connection is not open")

    cursor = td.conn.cursor()
    cursor.execute("SELECT NumScans FROM Frames WHERE Id = ?", (frame_id,))
    result = cursor.fetchone()
    if result is None:
        raise ValueError(f"Frame {frame_id} not found in database")
    (num_scans,) = result
    if num_scans == 0:
        return RawSpectrum.empty_like(0)

    scans = td.readScans(frame_id, 0, num_scans)
    scan_lengths = np.fromiter(
        (len(idx) for idx, _ in scans), dtype=np.int64, count=num_scans
    )
    total_peaks = int(scan_lengths.sum())
    if total_peaks == 0:
        return RawSpectrum.empty_like(num_scans)

    scan_indices = np.repeat(np.arange(num_scans, dtype=np.int64), scan_lengths)
    mz_indices = np.concatenate([idx for idx, _ in scans]).astype(np.int64, copy=False)
    intensities = np.concatenate(
        [intens for _, intens in scans]
    ).astype(np.float64, copy=False)
    return RawSpectrum(
        scan_indices=scan_indices,
        mz_indices=mz_indices,
        intensities=intensities,
        num_scans=num_scans,
    )
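The flattening step in read_spectrum turns Bruker's per-scan (indices, intensities) pairs into three parallel arrays. A pure-Python sketch of the same reshaping on invented toy data, with plain lists standing in for the NumPy arrays (np.repeat and np.concatenate in the listing above):

```python
# Hypothetical per-scan input, shaped like the (indices, intensities) pairs
# read_spectrum iterates over: one pair per mobility scan.
scans = [
    ([120, 450], [30, 12]),   # scan 0: two peaks
    ([], []),                 # scan 1: no peaks
    ([451], [55]),            # scan 2: one peak
]

# Repeat each scan number once per peak in that scan (np.repeat).
scan_indices = [s for s, (idx, _) in enumerate(scans) for _ in idx]
# Concatenate the per-scan TOF indices and intensities (np.concatenate).
mz_indices = [i for idx, _ in scans for i in idx]
intensities = [float(v) for _, intens in scans for v in intens]
num_scans = len(scans)   # empty scans still count toward num_scans

print(scan_indices)  # [0, 0, 2]
print(mz_indices)    # [120, 450, 451]
```

The empty scan contributes no entries, but num_scans still records all three scans, so later per-scan lookup tables stay aligned with the original scan numbering.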

Scoping

tdfpy.subset_scans

subset_scans(
    spectrum: RawSpectrum,
    *,
    scan_num_begin: int,
    scan_num_end: int
) -> RawSpectrum

Restrict the spectrum to peaks in scans [scan_num_begin, scan_num_end).

The bounds are half-open: begin inclusive, end exclusive — matching Bruker's readScans(frame_id, begin, end) semantics. The returned RawSpectrum keeps its num_scans field (i.e. the parent frame's full scan count) so downstream ops still address scans by their original index.

Used by DiaWindow and PrmTransition to restrict centroiding / raw-peak extraction to the isolation window's scan range. For a non-empty spectrum, a negative scan_num_begin or a scan_num_end below scan_num_begin raises ValueError.

Source code in src/tdfpy/pipeline.py
def subset_scans(
    spectrum: RawSpectrum,
    *,
    scan_num_begin: int,
    scan_num_end: int,
) -> RawSpectrum:
    """Restrict the spectrum to peaks in scans ``[scan_num_begin, scan_num_end)``.

    The bounds are half-open: ``begin`` inclusive, ``end`` exclusive — matching
    Bruker's ``readScans(frame_id, begin, end)`` semantics. The returned
    ``RawSpectrum`` keeps its ``num_scans`` field (i.e. the parent frame's
    full scan count) so downstream ops still address scans by their original
    index.

    Used by :class:`~tdfpy.DiaWindow` and :class:`~tdfpy.PrmTransition` to
    restrict centroiding / raw-peak extraction to the isolation window's
    scan range.
    """
    if spectrum.empty:
        return spectrum
    if scan_num_begin < 0 or scan_num_end < scan_num_begin:
        raise ValueError(
            f"invalid scan range [{scan_num_begin}, {scan_num_end})"
        )
    mask = (spectrum.scan_indices >= scan_num_begin) & (
        spectrum.scan_indices < scan_num_end
    )
    return spectrum.filter(mask)
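The half-open convention means adjacent ranges tile a frame without overlap or gaps, and scan numbers are never renumbered. A plain-Python sketch of the mask subset_scans builds; subset_mask is a hypothetical helper that mirrors the listing above, not a tdfpy function:

```python
def subset_mask(scan_indices, scan_num_begin, scan_num_end):
    """Keep scans in [begin, end), mirroring subset_scans' validation and mask."""
    if scan_num_begin < 0 or scan_num_end < scan_num_begin:
        raise ValueError(f"invalid scan range [{scan_num_begin}, {scan_num_end})")
    return [scan_num_begin <= s < scan_num_end for s in scan_indices]


scan_indices = [0, 99, 100, 199, 200, 399]
print(subset_mask(scan_indices, 100, 200))
# [False, False, True, True, False, False]: scan 100 kept, scan 200 dropped

# Adjacent half-open ranges: every peak lands in exactly one of them.
left = subset_mask(scan_indices, 0, 200)
right = subset_mask(scan_indices, 200, 400)
assert all(a != b for a, b in zip(left, right))
```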

tdfpy.exclude_region

exclude_region(
    spectrum: RawSpectrum,
    region: ChargeStateRegion,
    *,
    td: TimsData,
    frame_id: int
) -> RawSpectrum

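Drop peaks lying inside the given region.

The region supplies one TOF-index cutoff per scan (via region.index_cutoff_per_scan(td, frame_id, num_scans)). Peaks whose TOF index falls below their own scan's cutoff are removed; peaks at or above it are kept.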

Source code in src/tdfpy/pipeline.py
def exclude_region(
    spectrum: RawSpectrum,
    region: ChargeStateRegion,
    *,
    td: TimsData,
    frame_id: int,
) -> RawSpectrum:
    """Drop peaks lying inside the given region."""
    if spectrum.empty:
        return spectrum
    cutoff = region.index_cutoff_per_scan(td, frame_id, spectrum.num_scans)
    mask = spectrum.mz_indices >= cutoff[spectrum.scan_indices]
    return spectrum.filter(mask)
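exclude_region reduces to a per-scan lookup table plus one gather: each peak is compared against the cutoff of its own scan. A plain-Python sketch of that mask, where the cutoff values are invented stand-ins for what region.index_cutoff_per_scan would return:

```python
# Invented per-scan TOF-index cutoffs, one entry per scan (num_scans = 4).
cutoff = [300, 310, 320, 330]

scan_indices = [0, 0, 1, 2, 3, 3]
mz_indices = [299, 300, 305, 400, 330, 329]

# Gather each peak's scan cutoff, then keep peaks at or above it
# (cutoff[spectrum.scan_indices] in the NumPy version).
keep = [m >= cutoff[s] for s, m in zip(scan_indices, mz_indices)]
print(keep)  # [False, True, False, True, True, False]
```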

Smoothing

The convenience entry points (get_raw_peaks, get_centroided_spectrum, Frame.centroid(), …) accept smoothing as a single smooth=Smooth(...) argument; smooth / box_smooth are the underlying composable ops.

tdfpy.Smooth dataclass

Smooth(
    scan_half_width: int = 5,
    mz_idx_half_width: int = 2,
    mode: Literal["sum", "mean"] = "sum",
)

Config for the pre-noise-filter intensity smoothing step.

A small, hashable carrier for the smooth op's knobs, so the convenience entry points (get_raw_peaks, get_centroided_spectrum, Frame.centroid(), …) can accept smoothing as a single smooth=Smooth(...) argument. It is frozen so that it is hashable (Streamlit-cacheable).

apply

apply(spectrum: RawSpectrum) -> RawSpectrum

Return spectrum with intensities box-smoothed per this config.

Source code in src/tdfpy/pipeline.py
def apply(self, spectrum: RawSpectrum) -> RawSpectrum:
    """Return ``spectrum`` with intensities box-smoothed per this config."""
    return smooth(
        spectrum,
        scan_half_width=self.scan_half_width,
        mz_idx_half_width=self.mz_idx_half_width,
        mode=self.mode,
    )
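Because a frozen dataclass generates __eq__ and __hash__ from its fields, two configs with the same values act as the same cache key. A sketch of that effect using functools.lru_cache; SmoothConfig is a hypothetical stand-in that mirrors Smooth's fields, not the tdfpy class, and expensive_step stands in for a costly read-and-smooth call:

```python
from dataclasses import dataclass, FrozenInstanceError
from functools import lru_cache
from typing import Literal


@dataclass(frozen=True)
class SmoothConfig:
    """Hypothetical mirror of Smooth's fields (not the tdfpy class)."""
    scan_half_width: int = 5
    mz_idx_half_width: int = 2
    mode: Literal["sum", "mean"] = "sum"


calls = 0


@lru_cache(maxsize=None)
def expensive_step(frame_id: int, cfg: SmoothConfig) -> str:
    # Counts how many times the body actually runs (cache misses).
    global calls
    calls += 1
    return f"frame {frame_id} smoothed with {cfg}"


expensive_step(1, SmoothConfig())
expensive_step(1, SmoothConfig())               # equal fields -> cache hit
expensive_step(1, SmoothConfig(mode="mean"))    # different config -> miss
print(calls)  # 2

try:
    SmoothConfig().mode = "mean"                # frozen: mutation is rejected
except FrozenInstanceError:
    print("frozen")
```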

tdfpy.smooth

smooth(
    spectrum: RawSpectrum,
    *,
    scan_half_width: int = 5,
    mz_idx_half_width: int = 2,
    mode: Literal["sum", "mean"] = "sum"
) -> RawSpectrum

Return a new spectrum with box-smoothed intensities (positions kept).

A pre-noise-filter signal-amplification step: summing intensity over a small (±scan_half_width, ±mz_idx_half_width) window boosts genuine features that recur across consecutive mobility scans while leaving scattered single-hit noise largely unchanged. It composes ahead of apply_noise in a custom pipeline; see box_smooth for the windowing details.

Source code in src/tdfpy/pipeline.py
def smooth(
    spectrum: RawSpectrum,
    *,
    scan_half_width: int = 5,
    mz_idx_half_width: int = 2,
    mode: Literal["sum", "mean"] = "sum",
) -> RawSpectrum:
    """Return a new spectrum with box-smoothed intensities (positions kept).

    A pre-noise-filter signal-amplification step: summing intensity over a
    small ``(±scan_half_width, ±mz_idx_half_width)`` window boosts genuine
    features that recur across consecutive mobility scans while leaving
    scattered single-hit noise largely unchanged. Composes ahead of
    :func:`apply_noise` in a custom pipeline. See :func:`box_smooth`.
    """
    if spectrum.empty:
        return spectrum
    new_int = box_smooth(
        spectrum.scan_indices,
        spectrum.mz_indices,
        spectrum.intensities,
        scan_half_width=scan_half_width,
        mz_idx_half_width=mz_idx_half_width,
        mode=mode,
    )
    return RawSpectrum(
        scan_indices=spectrum.scan_indices,
        mz_indices=spectrum.mz_indices,
        intensities=new_int,
        num_scans=spectrum.num_scans,
    )

tdfpy.box_smooth

box_smooth(
    scan_indices: np.ndarray,
    mz_indices: np.ndarray,
    intensities: np.ndarray,
    *,
    scan_half_width: int,
    mz_idx_half_width: int,
    mode: Literal["sum", "mean"] = "sum"
) -> np.ndarray

Box sum / mean of intensities over a (±scan, ±mz_idx) index window.

For every peak, gathers all peaks within ±scan_half_width mobility scans and ±mz_idx_half_width TOF indices (inclusive, the peak itself included) and replaces the peak's intensity with the sum (mode="sum") or mean (mode="mean") of that window. Positions are preserved; only intensities change. Summing amplifies genuine features (which recur across many scans) while leaving isolated background hits untouched; the mean variant is used internally by WatershedCentroider to stabilise seed ordering.

Vectorised per mobility scan: each scan's peaks are sorted by TOF index once and given intensity (and count) prefix sums. Then, for each scan offset, every peak's window in the contributing source scan is located with two binary searches (np.searchsorted), so the cost is roughly O((2·scan_half_width+1) · N · log N) rather than the naïve O(N²). Negative half-widths are clamped to 0.

Source code in src/tdfpy/pipeline.py
def box_smooth(
    scan_indices: np.ndarray,
    mz_indices: np.ndarray,
    intensities: np.ndarray,
    *,
    scan_half_width: int,
    mz_idx_half_width: int,
    mode: Literal["sum", "mean"] = "sum",
) -> np.ndarray:
    """Box sum / mean of intensities over a (±scan, ±mz_idx) index window.

    For every peak, gathers all peaks within ``±scan_half_width`` mobility
    scans and ``±mz_idx_half_width`` TOF indices and replaces the peak's
    intensity with the sum (``mode="sum"``) or mean (``mode="mean"``) of that
    window. Positions are preserved — only intensities change. Summing
    amplifies genuine features (which recur across many scans) while leaving
    isolated background hits untouched; the mean variant is used internally by
    :class:`WatershedCentroider` to stabilise seed ordering.

    Vectorised per mobility-scan: for each scan offset the contributing source
    scan's peaks are searched by a sorted-m/z prefix sum, so cost is
    ``O((2·scan_half_width+1) · N · log N)`` rather than the naïve ``O(N²)``.
    """
    n = intensities.size
    if n == 0:
        return np.zeros(0, dtype=np.float64)
    scan = np.asarray(scan_indices, dtype=np.int64)
    mz = np.asarray(mz_indices, dtype=np.int64)
    inten = np.asarray(intensities, dtype=np.float64)
    scan_hw = max(0, int(scan_half_width))
    mz_hw = max(0, int(mz_idx_half_width))

    # Per mobility scan: sorted TOF indices + prefix sums of intensity / count.
    order = np.argsort(scan, kind="stable")
    scan_sorted = scan[order]
    uniq, starts = np.unique(scan_sorted, return_index=True)
    ends = np.append(starts[1:], scan_sorted.size)

    queries: dict[int, tuple[np.ndarray, np.ndarray]] = {}
    sources: dict[int, tuple[np.ndarray, np.ndarray, np.ndarray]] = {}
    for sv, s0, s1 in zip(uniq.tolist(), starts.tolist(), ends.tolist()):
        idx = order[s0:s1]
        m = mz[idx]
        msort = np.argsort(m, kind="stable")
        m_sorted = m[msort]
        prefix_int = np.concatenate([[0.0], np.cumsum(inten[idx[msort]])])
        prefix_cnt = np.arange(m_sorted.size + 1, dtype=np.float64)
        queries[sv] = (idx, m)
        sources[sv] = (m_sorted, prefix_int, prefix_cnt)

    result = np.zeros(n, dtype=np.float64)
    count = np.zeros(n, dtype=np.float64)
    for d in range(-scan_hw, scan_hw + 1):
        for sv in uniq.tolist():
            src = sources.get(sv + d)
            if src is None:
                continue
            q_idx, q_mz = queries[sv]
            m_sorted, prefix_int, prefix_cnt = src
            lo = np.searchsorted(m_sorted, q_mz - mz_hw, side="left")
            hi = np.searchsorted(m_sorted, q_mz + mz_hw, side="right")
            result[q_idx] += prefix_int[hi] - prefix_int[lo]
            if mode == "mean":
                count[q_idx] += prefix_cnt[hi] - prefix_cnt[lo]

    if mode == "mean":
        np.divide(result, count, out=result, where=count > 0)
    return result
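The core trick in box_smooth is answering "sum of intensities within ±mz_idx_half_width" with a prefix sum and two binary searches. A plain-Python sketch of that per-scan-pair step, with bisect standing in for np.searchsorted; window_sums is a hypothetical helper and the data are invented:

```python
from bisect import bisect_left, bisect_right
from itertools import accumulate


def window_sums(query_mz, source_mz, source_int, mz_hw):
    """Sum source intensities within ±mz_hw of each query TOF index.

    Sort the source scan once, build a prefix sum over the sorted
    intensities, then answer each query with two binary searches.
    """
    order = sorted(range(len(source_mz)), key=source_mz.__getitem__)
    m_sorted = [source_mz[i] for i in order]
    prefix = [0.0, *accumulate(source_int[i] for i in order)]
    out = []
    for q in query_mz:
        lo = bisect_left(m_sorted, q - mz_hw)    # first index with m >= q - hw
        hi = bisect_right(m_sorted, q + mz_hw)   # first index with m > q + hw
        out.append(prefix[hi] - prefix[lo])
    return out


source_mz = [510, 500, 503, 498, 520]
source_int = [1.0, 2.0, 4.0, 8.0, 16.0]
print(window_sums([500, 515], source_mz, source_int, mz_hw=3))  # [14.0, 0.0]
```

Sorting costs O(n log n) once per scan, and each query costs O(log n), which is where the N · log N factor in the complexity above comes from.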

Noise filtering

tdfpy.apply_noise

apply_noise(
    spectrum: RawSpectrum,
    filters: Iterable[NoiseFilter],
    *,
    td: TimsData,
    frame_id: int
) -> RawSpectrum

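Apply each noise filter in order, threading the surviving peaks through. Each filter sees only the peaks kept by the filters before it, so filter order can change the result; the loop stops early once the spectrum is empty.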

Source code in src/tdfpy/pipeline.py
def apply_noise(
    spectrum: RawSpectrum,
    filters: Iterable[NoiseFilter],
    *,
    td: TimsData,
    frame_id: int,
) -> RawSpectrum:
    """Apply each noise filter in order, threading the surviving peaks through."""
    for f in filters:
        if spectrum.empty:
            break
        mask = f.keep_mask(
            spectrum.scan_indices,
            spectrum.mz_indices,
            spectrum.intensities,
            num_scans=spectrum.num_scans,
            td=td,
            frame_id=frame_id,
        )
        spectrum = spectrum.filter(mask)
    return spectrum
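apply_noise only relies on each filter exposing keep_mask with the call shape shown above, returning one boolean per peak. A plain-Python sketch of the threading loop; MinIntensity, DropScan and thread_filters are hypothetical illustrations (not tdfpy classes), and lists of (scan, mz_idx, intensity) tuples stand in for the NumPy arrays:

```python
class MinIntensity:
    """Hypothetical filter: keep peaks at or above a fixed intensity."""

    def __init__(self, threshold):
        self.threshold = threshold

    def keep_mask(self, scan_indices, mz_indices, intensities, *, num_scans, td, frame_id):
        return [v >= self.threshold for v in intensities]


class DropScan:
    """Hypothetical filter: discard every peak from one scan."""

    def __init__(self, scan):
        self.scan = scan

    def keep_mask(self, scan_indices, mz_indices, intensities, *, num_scans, td, frame_id):
        return [s != self.scan for s in scan_indices]


def thread_filters(points, filters, num_scans):
    """Mirror of apply_noise's loop over (scan, mz_idx, intensity) tuples."""
    for f in filters:
        if not points:
            break  # nothing left to filter
        scans, mzs, ints = (list(col) for col in zip(*points))
        mask = f.keep_mask(scans, mzs, ints, num_scans=num_scans, td=None, frame_id=1)
        points = [p for p, keep in zip(points, mask) if keep]
    return points


pts = [(0, 100, 5.0), (1, 200, 50.0), (2, 300, 80.0), (2, 301, 1.0)]
print(thread_filters(pts, (MinIntensity(10.0), DropScan(2)), num_scans=3))
# [(1, 200, 50.0)]
```

Threading matters for filters whose threshold depends on the data: a filter placed later computes its statistics on the survivors only, not on the full frame.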

Conversion

tdfpy.convert

convert(
    spectrum: RawSpectrum,
    td: TimsData,
    frame_id: int,
    *,
    ion_mobility_type: Literal[
        "ook0", "ccs", "voltage"
    ] = "ook0"
) -> np.ndarray

Convert integer indices to (m/z, intensity, ion_mobility).

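Returns an (N, 3) array with columns [mz, intensity, ion_mobility]. ion_mobility_type selects the third column: "ook0" (1/K0, the default), "ccs" (computed per peak with oneOverK0ToCCSforMz, assuming charge 1), or "voltage". Empty input yields an empty (0, 3) array, so callers don't need to special-case it.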

Source code in src/tdfpy/pipeline.py
def convert(
    spectrum: RawSpectrum,
    td: TimsData,
    frame_id: int,
    *,
    ion_mobility_type: Literal["ook0", "ccs", "voltage"] = "ook0",
) -> np.ndarray:
    """Convert integer indices to (m/z, intensity, ion_mobility).

    Returns a ``(N, 3)`` array. Empty input yields an empty array of the
    same shape so callers don't need to special-case.
    """
    if spectrum.empty:
        return np.empty((0, 3), dtype=np.float64)

    ook0_per_scan = np.asarray(
        td.scanNumToOneOverK0(frame_id, np.arange(spectrum.num_scans))  # type: ignore[call-arg]
    )
    ion_mobility_array = ook0_per_scan[spectrum.scan_indices]
    mz_array = td.indexToMz(frame_id, spectrum.mz_indices)

    if ion_mobility_type == "ccs":
        ion_mobility_array = np.array(
            [
                oneOverK0ToCCSforMz(ook0, 1, mz)
                for ook0, mz in zip(ion_mobility_array, mz_array)
            ],
            dtype=np.float64,
        )
    elif ion_mobility_type == "voltage":
        # scanNumToVoltage expects scan numbers, not the 1/K0 values computed above
        ion_mobility_array = td.scanNumToVoltage(frame_id, spectrum.scan_indices)

    return np.column_stack((mz_array, spectrum.intensities, ion_mobility_array))

Centroiders

The two centroiders share a Centroider ABC. MergePeaksCentroider (the default) operates on float m/z values via a greedy tolerance-based merge; WatershedCentroider works in integer index space via intensity-ordered region growing.

tdfpy.Centroider

Bases: ABC

Base class for centroiding algorithms.

Subclasses are frozen dataclasses that carry their tunable knobs as fields and implement __call__, which takes the (filtered) raw spectrum and returns an (N, 3) array of [mz, intensity, ion_mobility] centroids. Each centroider decides internally whether to operate in integer index space or after conversion to float m/z.

tdfpy.MergePeaksCentroider dataclass

MergePeaksCentroider(
    mz_tolerance: float = 8.0,
    mz_tolerance_type: Literal["ppm", "da"] = "ppm",
    im_tolerance: float = 0.1,
    im_tolerance_type: Literal[
        "relative", "absolute"
    ] = "relative",
    min_peaks: int = 3,
    max_peaks: int | None = None,
    peak_noise_filter: bool = False,
    peak_noise_window: float = 0.1,
    peak_noise_end_fraction: float = 0.1,
    use_numba: bool = True,
)

Bases: Centroider

Greedy m/z-tolerance centroider; wraps tdfpy.merge_peaks.

Operates on float m/z values. Peaks are merged when they fall within both an m/z tolerance (ppm or Da) and an ion-mobility tolerance. This is the default algorithm used by tdfpy.get_centroided_spectrum.
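The mz_tolerance / mz_tolerance_type pair follows the usual mass-spectrometry convention: a ppm tolerance scales with m/z, while a Da tolerance is a fixed width. Assuming MergePeaksCentroider interprets them this way (the merge internals are not shown on this page), a sketch of the absolute tolerance the default 8 ppm corresponds to; mz_abs_tolerance is a hypothetical helper:

```python
def mz_abs_tolerance(mz: float, tolerance: float, tolerance_type: str) -> float:
    """Absolute m/z tolerance for a ppm or Da setting (hypothetical helper)."""
    if tolerance_type == "ppm":
        return mz * tolerance * 1e-6   # parts per million of the m/z value
    if tolerance_type == "da":
        return tolerance               # fixed width in daltons
    raise ValueError(f"unknown tolerance type: {tolerance_type!r}")


for mz in (200.0, 1000.0, 2000.0):
    print(mz, round(mz_abs_tolerance(mz, 8.0, "ppm"), 6))
# 200.0 0.0016
# 1000.0 0.008
# 2000.0 0.016
```

The ion-mobility tolerance ("relative" or "absolute") is not covered here, since this page does not define how the relative variant is computed.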

tdfpy.WatershedCentroider dataclass

WatershedCentroider(
    attach_scan_half_width: int = 10,
    attach_mz_idx_half_width: int = 3,
    min_seed_intensity: float = 0.0,
    min_centroid_intensity: float = 0.0,
    smooth_scan_half_width: int = 5,
    smooth_mz_idx_half_width: int = 3,
    max_scan_from_seed: int | None = None,
    max_mz_idx_from_seed: int | None = 10,
    use_numba: bool = True,
)

Bases: Centroider

Intensity-ordered region-growing centroider in integer-index space.

Operates directly on (scan_number, TOF_index) integers, avoiding the floating-point binning step that MergePeaksCentroider performs. Each point either joins the group of the nearest already-assigned point (within a rectangular tolerance box) or is promoted to a new seed. See apps/ALGORITHM.md Stage 3 for the full write-up.

The smooth_*_half_width parameters (enabled by default) apply a position-preserving box-mean filter to intensities before seed selection, which prevents noisy spikes from outranking the actual peak summit and stabilises seed ordering. Set both to 0 to skip.

The optional max_*_from_seed parameters are per-group "leashes": a follower is rejected if its distance from the candidate group's seed (not its nearest member) exceeds the bound on either axis. This stops a group from wandering by chaining through followers. None disables the bound on that axis.
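A simplified O(N²) plain-Python sketch of intensity-ordered region growing with the seed leash, to show how chain growth wanders without a bound. It is not tdfpy's implementation: the smoothing pre-pass, min_seed_intensity, min_centroid_intensity and the final centroid calculation are omitted. Processing points in decreasing intensity, the Chebyshev distance in index units as "nearest", and trying the next eligible candidate when the leash rejects one are all assumptions; Point and region_grow are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Point:
    scan: int
    mz_idx: int
    intensity: float


def region_grow(points, attach_scan_hw, attach_mz_hw,
                max_scan_from_seed: Optional[int] = None,
                max_mz_idx_from_seed: Optional[int] = None):
    """Return one group label per point (sketch; assumptions in lead-in)."""
    order = sorted(range(len(points)), key=lambda i: -points[i].intensity)
    labels = [-1] * len(points)
    seeds = []      # seeds[g] = index of group g's seed point
    assigned = []   # indices already placed, in processing order
    for i in order:
        p = points[i]
        best, best_d = None, None
        for j in assigned:
            q = points[j]
            ds, dm = abs(p.scan - q.scan), abs(p.mz_idx - q.mz_idx)
            if ds > attach_scan_hw or dm > attach_mz_hw:
                continue  # outside the rectangular attach box
            seed = points[seeds[labels[j]]]
            # Leash: measured from the candidate group's seed, not from q.
            if max_scan_from_seed is not None and abs(p.scan - seed.scan) > max_scan_from_seed:
                continue
            if max_mz_idx_from_seed is not None and abs(p.mz_idx - seed.mz_idx) > max_mz_idx_from_seed:
                continue
            d = max(ds, dm)
            if best_d is None or d < best_d:
                best, best_d = j, d
        if best is None:
            labels[i] = len(seeds)   # promote to a new seed
            seeds.append(i)
        else:
            labels[i] = labels[best]
        assigned.append(i)
    return labels


# Synthetic chain: one scan, TOF indices 100, 102, ..., 120, fading intensity.
chain = [Point(scan=0, mz_idx=m, intensity=200.0 - m) for m in range(100, 121, 2)]
labels_free = region_grow(chain, 10, 3)
labels_leashed = region_grow(chain, 10, 3, max_mz_idx_from_seed=10)
print(labels_free)     # one group spanning all 20 TOF indices
print(labels_leashed)  # split where a point would sit > 10 indices from the seed
```

Without a leash each point attaches to its neighbour two indices away, so the group drifts the full length of the chain. With max_mz_idx_from_seed=10 (the WatershedCentroider default), the point at TOF index 112 is rejected by the first group and seeds a second one.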

tdfpy.centroid_peaks

centroid_peaks(
    peaks: np.ndarray, centroider: MergePeaksCentroider
) -> np.ndarray

Cluster (mz, intensity, ion_mobility) peaks into centroids.

Convenience wrapper for users who already have a converted (N, 3) array and don't want to go back through a RawSpectrum. Only MergePeaksCentroider is supported, since WatershedCentroider needs integer indices that can't be recovered from float peaks.

Source code in src/tdfpy/pipeline.py
def centroid_peaks(
    peaks: np.ndarray, centroider: MergePeaksCentroider
) -> np.ndarray:
    """Cluster ``(mz, intensity, ion_mobility)`` peaks into centroids.

    Convenience wrapper for users who already have a converted ``(N, 3)``
    array and want to skip back through a :class:`RawSpectrum`. Only
    supports :class:`MergePeaksCentroider` since :class:`WatershedCentroider`
    needs integer indices that aren't recoverable from float peaks.
    """
    from .centroiding import merge_peaks

    if peaks.size == 0:
        return np.empty((0, 3), dtype=np.float64)
    return merge_peaks(
        peaks[:, 0],
        peaks[:, 1],
        peaks[:, 2],
        mz_tolerance=centroider.mz_tolerance,
        mz_tolerance_type=centroider.mz_tolerance_type,
        im_tolerance=centroider.im_tolerance,
        im_tolerance_type=centroider.im_tolerance_type,
        min_peaks=centroider.min_peaks,
        max_peaks=centroider.max_peaks,
        peak_noise_filter=centroider.peak_noise_filter,
        peak_noise_window=centroider.peak_noise_window,
        peak_noise_end_fraction=centroider.peak_noise_end_fraction,
        use_numba=centroider.use_numba,
    )