Skip to content

Readers

High-level entry points for opening timsTOF .d acquisitions.

tdfpy.get_acquisition_type

get_acquisition_type(
    analysis_dir: str,
) -> Literal["DDA", "DIA", "PRM", "Unknown"]

Determine the acquisition type (DDA, DIA, or PRM) of a .d folder by examining the MsMsType values in the Frames table. Returns "Unknown" when none of the recognised MS/MS frame types is present.

Parameters:

Name Type Description Default
analysis_dir str

Path to the .d folder

required

Returns:

Type Description
Literal['DDA', 'DIA', 'PRM', 'Unknown']

"DDA" if DDA acquisition detected

Literal['DDA', 'DIA', 'PRM', 'Unknown']

"DIA" if DIA acquisition detected

Literal['DDA', 'DIA', 'PRM', 'Unknown']

"PRM" if PRM acquisition detected

Literal['DDA', 'DIA', 'PRM', 'Unknown']

"Unknown" if type cannot be determined

Raises:

Type Description
FileNotFoundError

If analysis.tdf does not exist

Source code in src/tdfpy/reader.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def get_acquisition_type(analysis_dir: str) -> Literal["DDA", "DIA", "PRM", "Unknown"]:
    """
    Classify a .d folder as DDA, DIA or PRM from its Frames table.

    The distinct MsMsType values of all frames are collected and tested in a
    fixed order (DDA, then DIA, then PRM); the first match decides the result.

    Args:
        analysis_dir: Path to the .d folder

    Returns:
        "DDA" if a DDA MS2 frame type is present
        "DIA" if a DIA MS2 frame type is present (and no DDA one)
        "PRM" if MsMsType 2 is present (and neither of the above)
        "Unknown" if none of these frame types is present

    Raises:
        FileNotFoundError: If analysis.tdf does not exist
    """
    tdf_file = Path(analysis_dir) / "analysis.tdf"
    if not tdf_file.exists():
        raise FileNotFoundError(f"analysis.tdf not found at {tdf_file}")

    # Distinct frame types recorded for this acquisition
    observed_types = set(PandasTdf(str(tdf_file)).frames["MsMsType"].unique())

    # Precedence is intentional: DDA wins over DIA, which wins over PRM
    if MsMsType.DDA_MS2.value in observed_types:
        return "DDA"
    if MsMsType.DIA_MS2.value in observed_types:
        return "DIA"
    if 2 in observed_types:  # PRM frames are stored with MsMsType 2
        return "PRM"
    return "Unknown"

tdfpy.DDA

DDA(analysis_dir: str)

Bases: _DFolder

Open a DDA (Data-Dependent Acquisition) .d folder.

Use as a context manager to ensure the TimsData connection is closed when done. Exposes MS1 frames via ms1 and precursors via precursors.

Parameters:

Name Type Description Default
analysis_dir str

Path to the .d folder containing analysis.tdf and analysis.tdf_bin.

required

Raises:

Type Description
FileNotFoundError

If the .d folder or required files are missing.

Example
with DDA("/path/to/data.d") as dda:
    for frame in dda.ms1:
        print(frame.frame_id, frame.time)
Source code in src/tdfpy/reader.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def __init__(self, analysis_dir: str):
    """
    Open a DDA .d folder and index its MS1 frames and precursors.

    Reads the Frames, Precursors and PasefFrameMsMsInfo tables, groups the
    PASEF MS/MS entries by precursor, attaches them to their precursors, and
    attaches each precursor to its parent MS1 frame. The intermediate
    dataframes are kept in local variables only, so they are released as
    soon as construction finishes.

    Args:
        analysis_dir: Path to the .d folder.

    Raises:
        ValueError: If a PASEF MS/MS row has a null precursor, or a frame has
            an MsMsType other than MS1 or DDA MS2.
    """
    super().__init__(analysis_dir)

    try:
        # One reader is enough for all three tables.
        tdf = PandasTdf(str(self.analysis_tdf_path))
        frames_df = tdf.frames
        precursor_df = tdf.precursors
        pasef_frame_msms_info_df = tdf.pasef_frame_msms_info

        # Single pass over Frames to build both per-frame lookups.
        frame_id_to_rt: dict[int, float] = {}
        frame_to_polarity: dict[int, str] = {}
        for _, row in frames_df.iterrows():
            frame_id = int(row["Id"])
            frame_id_to_rt[frame_id] = float(row["Time"])
            frame_to_polarity[frame_id] = str(row["Polarity"])

        # PASEF MS/MS entries grouped by the precursor they fragment.
        self._pasef_msms_infos: dict[int, list[PasefFrameMsmsInfo]] = {}
        for _, row in pasef_frame_msms_info_df.iterrows():
            frame_id = int(row["Frame"])
            polarity = frame_to_polarity[frame_id]

            # Reject rows without a precursor before building any object.
            if pd.isna(row["Precursor"]):
                raise ValueError(
                    f"PASEF MS/MS info with null precursor found for frame {frame_id}. All PASEF MS/MS info must have a valid precursor ID."
                )
            precursor_id = int(row["Precursor"])

            pasef_info = PasefFrameMsmsInfo(
                _timsdata=self.timsdata,
                frame_id=frame_id,
                scan_num_begin=int(row["ScanNumBegin"]),
                scan_num_end=int(row["ScanNumEnd"]),
                isolation_mz=float(row["IsolationMz"]),
                isolation_width=float(row["IsolationWidth"]),
                collision_energy=float(row["CollisionEnergy"]),
                precursor=precursor_id,
                rt=frame_id_to_rt[frame_id],
                polarity=Polarity.from_str(polarity),
            )
            self._pasef_msms_infos.setdefault(precursor_id, []).append(pasef_info)

        # Precursors by id, and grouped by the MS1 frame they were picked from.
        self._precursors: dict[int, Precursor] = {}
        self._frame_to_precursors: dict[int, list[Precursor]] = {}
        for _, row in precursor_df.iterrows():
            precursor_id = int(row["Id"])
            frame_id = int(row["Parent"])

            precursor = Precursor(
                _timsdata=self.timsdata,
                precursor_id=precursor_id,
                largest_peak_mz=float(row["LargestPeakMz"]),
                average_mz=float(row["AverageMz"]),
                monoisotopic_mz=float(row["MonoisotopicMz"])
                if not pd.isna(row["MonoisotopicMz"])
                else None,
                charge=int(row["Charge"]) if not pd.isna(row["Charge"]) else None,
                scan_number=int(row["ScanNumber"]),
                intensity=float(row["Intensity"]),
                parent_frame=frame_id,
                pasef_frame_msms_infos=tuple(
                    self._pasef_msms_infos.get(precursor_id, [])
                ),
                rt=frame_id_to_rt[frame_id],
            )
            self._precursors[precursor_id] = precursor
            self._frame_to_precursors.setdefault(frame_id, []).append(precursor)

        self._precursor_lookup = PrecursorLookup(self._precursors)

        # Only MS1 frames are materialised; DDA MS2 frames are reachable
        # through the PASEF infos attached to each precursor.
        self._frames: dict[int, Frame] = {}
        self._ms1_frames: dict[int, DDAMs1Frame] = {}
        for _, row in frames_df.iterrows():
            frame_id = int(row["Id"])
            msms_type = int(row["MsMsType"])
            if msms_type == MsMsType.MS1.value:
                frame = DDAMs1Frame(
                    _timsdata=self.timsdata,
                    frame_id=frame_id,
                    time=float(row["Time"]),
                    polarity=Polarity.from_str(str(row["Polarity"])),
                    scan_mode=int(row["ScanMode"]),
                    msms_type=msms_type,
                    # NaN-guarded like the DIA reader, so a NULL column
                    # value becomes None instead of crashing int().
                    tims_id=int(row["TimsId"])
                    if not pd.isna(row["TimsId"])
                    else None,
                    max_intensity=int(row["MaxIntensity"]),
                    summed_intensities=int(row["SummedIntensities"]),
                    num_scans=int(row["NumScans"]),
                    num_peaks=int(row["NumPeaks"]),
                    mz_calibration=int(row["MzCalibration"]),
                    t1=float(row["T1"]),
                    t2=float(row["T2"]),
                    tims_calibration=int(row["TimsCalibration"]),
                    property_group=int(row["PropertyGroup"])
                    if not pd.isna(row["PropertyGroup"])
                    else None,
                    accumulation_time=float(row["AccumulationTime"]),
                    ramp_time=float(row["RampTime"]),
                    precursors=tuple(self._frame_to_precursors.get(frame_id, [])),
                )
                self._ms1_frames[frame_id] = frame
                self._frames[frame_id] = frame
            elif msms_type == MsMsType.DDA_MS2.value:
                pass
            else:
                raise ValueError(f"Unknown MsMsType {msms_type} for frame {frame_id}")

        self._ms1_frames_lookup = Ms1FrameLookup(self._ms1_frames)
    except BaseException:
        # The caller never receives a half-built object, so nobody else can
        # release the TimsData connection; close it here before propagating.
        self.close()
        raise

metadata property

metadata: MetaData

Global metadata about the acquisition.

calibration property

calibration: Calibration

Calibration information.

ms1 property

Lookup for MS1 frames. Supports indexing by frame ID and .query().

precursors property

precursors: PrecursorLookup

Lookup for all precursors. Supports indexing by precursor ID and .query().

close

close() -> None

Close the TimsData connection.

Source code in src/tdfpy/reader.py
141
142
143
144
145
def close(self) -> None:
    """Close the TimsData connection; repeated calls do nothing."""
    if self._closed:
        return
    self.timsdata.close()
    # Only marked closed once the underlying close() has returned.
    self._closed = True

tdfpy.DIA

DIA(analysis_dir: str)

Bases: _DFolder

Open a DIA (Data-Independent Acquisition) .d folder.

Use as a context manager to ensure the TimsData connection is closed when done. Exposes MS1 frames via ms1, individual windows via windows, and window groups via window_groups.

Parameters:

Name Type Description Default
analysis_dir str

Path to the .d folder containing analysis.tdf and analysis.tdf_bin.

required

Raises:

Type Description
FileNotFoundError

If the .d folder or required files are missing.

Example
with DIA("/path/to/data.d") as dia:
    for group in dia.window_groups:
        for window in group.windows:
            print(window.isolation_mz)
Source code in src/tdfpy/reader.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def __init__(self, analysis_dir: str):
    """
    Open a DIA .d folder and index its MS1 frames and isolation windows.

    Reads the Frames, DiaFrameMsMsWindows and DiaFrameMsMsInfo tables, groups
    the window definitions by window group, and expands them into one
    DiaWindow per (frame, window) pair. The intermediate dataframes are kept
    in local variables only, so they are released as soon as construction
    finishes.

    Args:
        analysis_dir: Path to the .d folder.

    Raises:
        ValueError: If a frame has an MsMsType other than MS1 or DIA MS2.
    """
    super().__init__(analysis_dir)

    try:
        # One reader is enough for all three tables.
        tdf = PandasTdf(str(self.analysis_tdf_path))
        frames_df = tdf.frames
        dia_frame_msms_windows_df = tdf.dia_frame_msms_windows
        dia_frame_msms_info_df = tdf.dia_frame_msms_info

        # Single pass over Frames to build both per-frame lookups.
        frame_id_to_rt: dict[int, float] = {}
        frame_id_to_polarity: dict[int, str] = {}
        for _, row in frames_df.iterrows():
            frame_id = int(row["Id"])
            frame_id_to_rt[frame_id] = float(row["Time"])
            frame_id_to_polarity[frame_id] = str(row["Polarity"])

        # Window definitions grouped by window-group id. The dataframe index
        # is used as the window index.
        self._dia_window_groups: dict[int, list[DiaWindowGroup]] = {}
        for key, row in dia_frame_msms_windows_df.iterrows():
            window_id = int(key)  # type: ignore
            window = DiaWindowGroup(
                window_index=window_id,
                window_group=int(row["WindowGroup"]),
                scan_num_begin=int(row["ScanNumBegin"]),
                scan_num_end=int(row["ScanNumEnd"]),
                isolation_mz=float(row["IsolationMz"]),
                isolation_width=float(row["IsolationWidth"]),
                collision_energy=float(row["CollisionEnergy"]),
            )
            self._dia_window_groups.setdefault(window.window_group, []).append(window)

        # Expand every (frame, window group) row into concrete DiaWindow
        # objects, which additionally carry the frame id, RT and polarity.
        self._dia_windows: dict[int, list[DiaWindow]] = {}
        self._all_dia_windows: list[DiaWindow] = []
        for _, row in dia_frame_msms_info_df.iterrows():
            frame_id = int(row["Frame"])
            window_group_id = int(row["WindowGroup"])
            # A window group contributes one DiaWindow per window it defines.
            # The frame entry is created even when the group is empty.
            frame_windows = self._dia_windows.setdefault(frame_id, [])
            for window in self._dia_window_groups.get(window_group_id, []):
                dia_window = DiaWindow(
                    _timsdata=self.timsdata,
                    frame_id=frame_id,
                    window_index=window.window_index,
                    window_group=window.window_group,
                    scan_num_begin=window.scan_num_begin,
                    scan_num_end=window.scan_num_end,
                    isolation_mz=window.isolation_mz,
                    isolation_width=window.isolation_width,
                    collision_energy=window.collision_energy,
                    rt=frame_id_to_rt[frame_id],
                    polarity=Polarity.from_str(frame_id_to_polarity[frame_id]),
                )
                frame_windows.append(dia_window)
                self._all_dia_windows.append(dia_window)

        self._dia_windows_lookup = DiaWindowLookup(self._all_dia_windows)

        # Only MS1 frames are materialised; DIA MS2 frames are reachable
        # through the DiaWindow objects built above.
        self._frames: dict[int, Frame] = {}
        self._ms1_frames: dict[int, DIAMs1Frame] = {}
        for _, row in frames_df.iterrows():
            frame_id = int(row["Id"])
            msms_type = int(row["MsMsType"])
            if msms_type == MsMsType.MS1.value:
                frame = DIAMs1Frame(
                    _timsdata=self.timsdata,
                    frame_id=frame_id,
                    time=float(row["Time"]),
                    polarity=Polarity.from_str(str(row["Polarity"])),
                    scan_mode=int(row["ScanMode"]),
                    msms_type=msms_type,
                    tims_id=int(row["TimsId"])
                    if not pd.isna(row["TimsId"])
                    else None,
                    max_intensity=int(row["MaxIntensity"]),
                    summed_intensities=int(row["SummedIntensities"]),
                    num_scans=int(row["NumScans"]),
                    num_peaks=int(row["NumPeaks"]),
                    mz_calibration=int(row["MzCalibration"]),
                    # Kept as floats, matching the DDA reader; int() would
                    # silently drop the fractional part.
                    t1=float(row["T1"]),
                    t2=float(row["T2"]),
                    tims_calibration=int(row["TimsCalibration"]),
                    property_group=int(row["PropertyGroup"])
                    if not pd.isna(row["PropertyGroup"])
                    else None,
                    accumulation_time=float(row["AccumulationTime"]),
                    ramp_time=float(row["RampTime"]),
                    dia_windows=tuple(self._dia_windows.get(frame_id, [])),
                )
                self._ms1_frames[frame_id] = frame
                self._frames[frame_id] = frame
            elif msms_type == MsMsType.DIA_MS2.value:
                pass
            else:
                raise ValueError(f"Unknown MsMsType {msms_type} for frame {frame_id}")

        self._ms1_frames_lookup = Ms1FrameLookup(self._ms1_frames)
    except BaseException:
        # The caller never receives a half-built object, so nobody else can
        # release the TimsData connection; close it here before propagating.
        self.close()
        raise

metadata property

metadata: MetaData

Global metadata about the acquisition.

calibration property

calibration: Calibration

Calibration information.

ms1 property

Lookup for MS1 frames. Supports indexing by frame ID and .query().

windows property

windows: DiaWindowLookup

Lookup for all DIA windows. Supports indexing by window index and .query().

window_groups property

window_groups: Generator[DiaWindowGroup, None, None]

Iterate over all DiaWindowGroup objects across all window groups.

close

close() -> None

Close the TimsData connection.

Source code in src/tdfpy/reader.py
141
142
143
144
145
def close(self) -> None:
    """Close the TimsData connection; repeated calls do nothing."""
    if self._closed:
        return
    self.timsdata.close()
    # Only marked closed once the underlying close() has returned.
    self._closed = True

tdfpy.PRM

PRM(analysis_dir: str)

Bases: _DFolder

PRM (Parallel Reaction Monitoring) acquisition mode.

Note

Not yet implemented. Raises NotImplementedError on instantiation.

Source code in src/tdfpy/reader.py
482
483
def __init__(self, analysis_dir: str):
    """
    Placeholder constructor: PRM reading is not available.

    Args:
        analysis_dir: Path to the .d folder (unused).

    Raises:
        NotImplementedError: Always.
    """
    message = "PRM acquisition mode is not yet supported."
    raise NotImplementedError(message)

metadata property

metadata: MetaData

Global metadata about the acquisition.

calibration property

calibration: Calibration

Calibration information.

close

close() -> None

Close the TimsData connection.

Source code in src/tdfpy/reader.py
141
142
143
144
145
def close(self) -> None:
    """Close the TimsData connection; repeated calls do nothing."""
    if self._closed:
        return
    self.timsdata.close()
    # Only marked closed once the underlying close() has returned.
    self._closed = True