
Utilities

slice_d_folder — Extracting a time range from a .d folder

slice_d_folder creates a smaller, self-contained .d folder from an existing one by keeping only a contiguous range of frames. The output is a fully valid Bruker .d folder: both the SQLite metadata (analysis.tdf) and the binary scan data (analysis.tdf_bin) are rebuilt so that downstream tools — including tdfpy's own readers — can open the result directly.

This is useful for:

  • Creating small test datasets from a large acquisition
  • Isolating a chromatographic peak or retention time window for focused analysis
  • Reducing file size before sharing or archiving

What gets filtered

The slicer keeps all frames whose Id falls within [frame_start, frame_end] (inclusive, 1-based) and removes everything else:

| Table | Behaviour |
| --- | --- |
| Frames | Rows outside the range are deleted |
| PasefFrameMsMsInfo | Rows referencing deleted frames are deleted |
| DiaFrameMsMsInfo | Rows referencing deleted frames are deleted |
| PrmFrameMsMsInfo | Rows referencing deleted frames are deleted |
| Precursors | Orphaned rows (parent frame deleted) are removed |
| DiaFrameMsMsWindows | Orphaned window groups are removed |
| analysis.tdf_bin | Rebuilt from scratch — only kept frames' blobs are written |

The TimsId offsets in the Frames table are updated to point to the correct positions in the new binary file, so the output can be opened immediately with DDA, DIA, PRM, or any Bruker-compatible tool.
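The orphan removal described in the table can be pictured with plain SQL. The sketch below uses a toy two-table schema (Frames plus a simplified Precursors with a Parent column; the real .tdf schema has many more columns) to show the two-step delete: drop frames outside the range, then drop rows whose parent frame disappeared:

```python
import sqlite3

# Toy schema standing in for the real .tdf tables (simplified).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Frames (Id INTEGER PRIMARY KEY, TimsId INTEGER);
    CREATE TABLE Precursors (Id INTEGER PRIMARY KEY, Parent INTEGER);
""")
conn.executemany("INSERT INTO Frames VALUES (?, ?)",
                 [(i, i * 100) for i in range(1, 11)])
conn.executemany("INSERT INTO Precursors VALUES (?, ?)",
                 [(i, i) for i in range(1, 11)])

frame_start, frame_end = 3, 7

# Step 1: delete frames outside the kept range.
conn.execute("DELETE FROM Frames WHERE Id < ? OR Id > ?",
             (frame_start, frame_end))
# Step 2: remove orphaned rows whose parent frame is gone.
conn.execute("DELETE FROM Precursors "
             "WHERE Parent NOT IN (SELECT Id FROM Frames)")

remaining = [r[0] for r in conn.execute("SELECT Id FROM Frames ORDER BY Id")]
print(remaining)  # [3, 4, 5, 6, 7]
```

Note that the kept rows retain their original Id values — frames are filtered, not renumbered.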

Frame IDs vs retention time

frame_start and frame_end are raw frame IDs (the Id column in the Frames table), not retention times. If you need to slice by time, open the .d folder first and look up frame IDs using dda.ms1 or dia.ms1.

Basic usage

from tdfpy import slice_d_folder

out = slice_d_folder(
    source_dir="experiment.d",
    dest_dir="experiment_slice.d",
    frame_start=100,
    frame_end=300,
)
print(out)  # PosixPath('experiment_slice.d')

The destination directory is created automatically. If it already exists it is overwritten.

Slicing by retention time

Open the source file first to map retention time to frame IDs:

from tdfpy import DDA, slice_d_folder

with DDA("experiment.d") as dda:
    # Find frames within a retention time window (seconds)
    rt_min, rt_max = 600.0, 900.0  # 10 – 15 min
    frame_ids = [
        frame.frame_id
        for frame in dda.ms1
        if rt_min <= frame.time <= rt_max
    ]

first_frame = min(frame_ids)
last_frame = max(frame_ids)

slice_d_folder(
    source_dir="experiment.d",
    dest_dir="experiment_10to15min.d",
    frame_start=first_frame,
    frame_end=last_frame,
)

Opening the result

The sliced folder can be opened with any tdfpy reader exactly like the original:

from tdfpy import DDA

with DDA("experiment_slice.d") as dda:
    for frame in dda.ms1:
        peaks = frame.centroid()
        print(frame.frame_id, len(peaks))

tdfpy.slice_d_folder

slice_d_folder(
    source_dir: str | Path,
    dest_dir: str | Path,
    frame_start: int,
    frame_end: int,
) -> Path

Slice a .d folder to contain only frames in [frame_start, frame_end].

Creates a new .d folder at dest_dir with a filtered SQLite database and a rebuilt binary file containing only the kept frames' data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source_dir | str \| Path | Path to the source .d folder. | required |
| dest_dir | str \| Path | Path for the output .d folder (overwritten if it already exists). | required |
| frame_start | int | First frame ID to keep (inclusive, 1-based). | required |
| frame_end | int | Last frame ID to keep (inclusive, 1-based). | required |

Returns:

| Type | Description |
| --- | --- |
| Path | The path to the created .d folder. |

Source code in src/tdfpy/slicer.py
def slice_d_folder(
    source_dir: str | Path,
    dest_dir: str | Path,
    frame_start: int,
    frame_end: int,
) -> Path:
    """Slice a .d folder to contain only frames in [frame_start, frame_end].

    Creates a new .d folder at ``dest_dir`` with a filtered SQLite database
    and a rebuilt binary file containing only the kept frames' data.

    Parameters
    ----------
    source_dir : str | Path
        Path to the source .d folder.
    dest_dir : str | Path
        Path for the output .d folder (overwritten if it already exists).
    frame_start : int
        First frame ID to keep (inclusive, 1-based).
    frame_end : int
        Last frame ID to keep (inclusive, 1-based).

    Returns
    -------
    Path
        The path to the created .d folder.
    """
    source_dir = Path(source_dir)
    dest_dir = Path(dest_dir)

    _validate_inputs(source_dir, dest_dir, frame_start, frame_end)

    if dest_dir.exists():
        shutil.rmtree(dest_dir)
    dest_dir.mkdir(parents=True)

    # Step 1: Copy SQLite database and read original offsets before filtering.
    src_tdf = source_dir / TDF_FILE
    dst_tdf = dest_dir / TDF_FILE
    shutil.copy2(src_tdf, dst_tdf)

    with sqlite3.connect(dst_tdf) as conn:
        # Read original offsets for frames we're keeping (before any DELETEs).
        rows = conn.execute(
            "SELECT Id, TimsId FROM Frames WHERE Id >= ? AND Id <= ? ORDER BY Id",
            (frame_start, frame_end),
        ).fetchall()

        if not rows:
            raise ValueError(
                f"No frames found in range [{frame_start}, {frame_end}]"
            )

        frame_ids = [r[0] for r in rows]
        original_offsets = [r[1] for r in rows]

        # Step 2: Rebuild binary file with only kept frames.
        src_bin = source_dir / TDF_BIN_FILE
        dst_bin = dest_dir / TDF_BIN_FILE
        new_offsets = _rebuild_binary(src_bin, dst_bin, original_offsets)

        # Step 3: Filter SQLite tables.
        _filter_sqlite(conn, frame_start, frame_end)

        # Step 4: Update offsets in Frames table.
        conn.executemany(
            "UPDATE Frames SET TimsId = ? WHERE Id = ?",
            list(zip(new_offsets, frame_ids)),
        )

    # VACUUM must run outside a transaction.
    with sqlite3.connect(dst_tdf) as conn:
        conn.execute("VACUUM")

    return dest_dir
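The _rebuild_binary helper called above is not shown in this excerpt. A minimal sketch of the idea, assuming a simplified layout in which every frame blob starts with a little-endian uint32 giving its total size in bytes (the real TDF binary layout is more involved):

```python
import struct
import tempfile
from pathlib import Path


def rebuild_binary(src: Path, dst: Path, offsets: list[int]) -> list[int]:
    """Copy only the kept frames' blobs; return their offsets in dst.

    Assumes each blob is length-prefixed with a little-endian uint32
    (a simplification of the real TDF binary format).
    """
    new_offsets = []
    with src.open("rb") as fin, dst.open("wb") as fout:
        for off in offsets:
            fin.seek(off)
            (size,) = struct.unpack("<I", fin.read(4))
            fin.seek(off)
            new_offsets.append(fout.tell())  # position in the new file
            fout.write(fin.read(size))       # copy the whole blob
    return new_offsets


# Toy binary file with three length-prefixed blobs of 8, 12, and 6 bytes,
# so their offsets are 0, 8, and 20.
tmp = Path(tempfile.mkdtemp())
src, dst = tmp / "analysis.tdf_bin", tmp / "sliced.tdf_bin"
with src.open("wb") as f:
    for payload in (b"AAAA", b"BBBBBBBB", b"CC"):
        f.write(struct.pack("<I", 4 + len(payload)) + payload)

# Keep the first and last blobs; they end up back-to-back in dst.
new = rebuild_binary(src, dst, [0, 20])
print(new)  # [0, 8]
```

The returned offsets are exactly what Step 4 of the source above writes back into the Frames table as the new TimsId values.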