Skip to content

simulator Module

Core logic of the detector simulation

dict_to_points(points)

Convert a point dictionary to a pointcloud array

Converts dictionary of N pad,tb keys with corresponding number of electrons to Nx3 array where each row is [pad, tb, e], now combined over pad/tb combos. Also returns a length N array which contains labels (particle index) for each point.

Parameters:

Name Type Description Default
points NumbaTypedDict[int, tuple[int, int]]

A dictionary mapping a unique pad,tb key to the number of electrons and a label

required

Returns:

Type Description
tuple[ndarray, ndarray]

Array of points and labels (in that order)

Source code in src/attpc_engine/detector/simulator.py
@njit
def dict_to_points(
    points: NumbaTypedDict[int, tuple[int, int]],
) -> tuple[np.ndarray, np.ndarray]:
    """Convert a point dictionary to a pointcloud array

    Converts dictionary of N pad,tb keys with corresponding number of electrons
    to Nx3 array where each row is [pad, tb, e], now combined over pad/tb combos.
    Also returns a length N array which contains labels (particle index) for each
    point.

    Parameters
    ----------
    points: numba.typed.Dict[int, tuple[int,int]]
        A dictionary mapping a unique pad,tb key to the number of electrons and a label

    Returns
    -------
    tuple[numpy.ndarray, numpy.ndarray]
        Array of points and labels (in that order)
    """
    point_array = np.empty((len(points), 3), dtype=float)
    label_array = np.empty(len(points), dtype=types.int64)
    for idx, (key, data) in enumerate(points.items()):
        tb, pad = unpair(key)
        point_array[idx, 0] = pad
        point_array[idx, 1] = tb
        point_array[idx, 2] = data[0]
        label_array[idx] = data[1]

    return point_array, label_array

run_simulation(config, input_path, writer, indices=None)

Run the detector simulation

Runs the AT-TPC detector simulation with the input parameters on the specified kinematic data hdf5 file generated by the kinematic simulation.

Parameters:

Name Type Description Default
config Config

The simulation configuration

required
input_path Path

Path to HDF5 file containing kinematics

required
writer SimulationWriter

An object which implements the SimulationWriter Protocol

required
indices list[int] | None

List of which nuclei to include in the detector simulation. Nuclei are specified by index of which they occur in the kinematic arrays. For example, in a simple one step reaction, a(b,c)d 0=a, 1=b, 2=c, 3=d. For two step a(b,c)d->e+f e=4, f=5, and so on. If the list is None, all final reaction products are simulated.

None
Source code in src/attpc_engine/detector/simulator.py
def run_simulation(
    config: Config,
    input_path: Path,
    writer: SimulationWriter,
    indices: list[int] | None = None,
):
    """Run the detector simulation

    Runs the AT-TPC detector simulation with the input parameters on the specified
    kinematic data hdf5 file generated by the kinematic simulation.

    Parameters
    ----------
    config: Config
        The simulation configuration
    input_path: pathlib.Path
        Path to HDF5 file containing kinematics
    writer: SimulationWriter
        An object which implements the SimulationWriter Protocol
    indices: list[int] | None
        List of which nuclei to include in the detector simulation. Nuclei are
        specified by index of which they occur in the kinematic arrays. For example,
        in a simple one step reaction, a(b,c)d 0=a, 1=b, 2=c, 3=d. For two step
        a(b,c)d->e+f e=4, f=5, and so on. If the list is None, all final reaction
        products are simulated.
    """
    print("------- AT-TPC Simulation Engine -------")
    print(f"Applying detector effects to kinematics from file: {input_path}")
    input = h5.File(input_path, "r")
    input_data_group: h5.Group = input["data"]  # type: ignore
    proton_numbers: np.ndarray = input_data_group.attrs["proton_numbers"]  # type: ignore
    mass_numbers = input_data_group.attrs["mass_numbers"]

    # Decide which nuclei to sim, either by user input or all reaction final products
    nuclei_to_sim = None
    if indices is not None:
        nuclei_to_sim = indices
    else:
        # default nuclei to sim, all final outgoing particles
        nuclei_to_sim = [idx for idx in range(2, len(proton_numbers), 2)]
        nuclei_to_sim.append(len(proton_numbers) - 1)  # add the last

    n_events: int = input_data_group.attrs["n_events"]  # type: ignore
    miniters = int(0.01 * n_events)
    n_chunks: int = input_data_group.attrs["n_chunks"]  # type: ignore
    chunk_size: int = input_data_group.attrs["chunk_size"]  # type: ignore
    print(
        f"Found {n_events} kinematics events in {n_chunks} {chunk_size} event chunks."
    )
    print(f"Output will be written to {writer.get_directory_name()}.")

    rng = default_rng()

    print("Go!")
    # ASCII art edited from https://emojicombos.com/f1-car-text-art
    print(
        r"""
            ⠀⢀⣀⣀⣀⠀⠀⠀⠀⢀⣀⣀⣀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
            ⠀⢸⣿⣿⡿⢀⣠⣴⣾⣿⣿⣿⣿⣇⡀⠀⣀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
            ⠀⢸⣿⣿⠟⢋⡙⠻⣿⣿⣿⣿⣿⣿⣿⣿⣿⣷⣶⣿⡿⠓⡐⠒⢶⣤⣄⡀⠀⠀
            ⠀⠸⠿⠇⢰⣿⣿⡆⢸⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⠀⣿⣿⡷⠈⣿⣿⣉⠁⠀
            ⠀⠀⠀⠀⠀⠈⠉⠀⠈⠉⠉⠉⠉⠉⠉⠉⠉⠉⠉⠉⠀⠈⠉⠁⠀⠈⠉⠉⠀⠀
        """
    )

    for event_number in trange(n_events, miniters=miniters):  # type: ignore
        chunk = event_number // chunk_size  # integer floor division
        dataset: h5.Dataset = input_data_group[f"chunk_{chunk}"][  # type: ignore
            f"event_{event_number}"
        ]  # type: ignore
        cloud, labels = simulate(
            dataset[:].copy(),  # type: ignore
            np.array(
                [
                    dataset.attrs["vertex_x"],
                    dataset.attrs["vertex_y"],
                    dataset.attrs["vertex_z"],
                ]
            ),
            proton_numbers,  # type: ignore
            mass_numbers,  # type: ignore
            config,
            rng,
            nuclei_to_sim,
        )

        if len(cloud) == 0:
            continue

        writer.write(cloud, labels, config, event_number)
    writer.close()
    print("Done.")
    print("----------------------------------------")

simulate(momenta, vertex, proton_numbers, mass_numbers, config, rng, indices)

Apply detector simulation to a kinematics event

Takes in the data from a sample from the kinematics phase space and applies detector effects to that sample, generating a pointcloud with labels.

Parameters:

Name Type Description Default
momenta ndarray

The 4-momenta of the nuclei in the reaction

required
vertex ndarray

The reaction vertex

required
proton_numbers ndarray

The proton number for each nucleus in the reaction

required
mass_numbers ndarray

The mass number for each nucleus in the reaction

required
config Config

The detector simulation parameters

required
rng Generator

A random number generator

required
indices list[int]

The indicies in the list of nuclei which should be simulated. Typically this would be all final products of the reaction

required

Returns:

Type Description
tuple[ndarray, ndarray]

Returns two arrays. The first is a point cloud of Nx3 shape, where N is the number of points and each point contains a pad ID, time bucket, and electrons deposited. The second is an array of length N, where each entry is the index of the nucleus which generated the point.

Source code in src/attpc_engine/detector/simulator.py
def simulate(
    momenta: np.ndarray,
    vertex: np.ndarray,
    proton_numbers: np.ndarray,
    mass_numbers: np.ndarray,
    config: Config,
    rng: Generator,
    indices: list[int],
) -> tuple[np.ndarray, np.ndarray]:
    """Apply detector simulation to a kinematics event

    Takes in the data from a sample from the kinematics phase space
    and applies detector effects to that sample, generating a pointcloud with
    labels.

    Parameters
    ----------
    momenta: numpy.ndarray
        The 4-momenta of the nuclei in the reaction
    vertex: numpy.ndarray
        The reaction vertex
    proton_numbers: numpy.ndarray
        The proton number for each nucleus in the reaction
    mass_numbers: numpy.ndarray
        The mass number for each nucleus in the reaction
    config: Config
        The detector simulation parameters
    rng: numpy.random.Generator
        A random number generator
    indices: list[int]
        The indicies in the list of nuclei which should be simulated.
        Typically this would be all final products of the reaction

    Returns
    -------
    tuple[numpy.ndarray, numpy.ndarray]
        Returns two arrays. The first is a point cloud of Nx3 shape, where N is
        the number of points and each point contains a pad ID, time bucket, and
        electrons deposited. The second is an array of length N, where each entry
        is the index of the nucleus which generated the point.
    """
    points = Dict.empty(
        key_type=types.int64, value_type=types.Tuple(types=[types.int64, types.int64])
    )
    for idx in indices:
        if proton_numbers[idx] == 0:
            continue
        nucleus = nuclear_map.get_data(proton_numbers[idx], mass_numbers[idx])
        momentum = momenta[idx]
        generate_point_cloud(momentum, vertex, nucleus, config, rng, points, idx)

    # Convert to numpy array of [pad, tb, e], now combined over pad/tb combos
    point_array, label_array = dict_to_points(points)

    # Wiggle point TBs over interval [0.0, 1.0). This simulates effect of converting
    # the (in principle) int TBs to floats.
    point_array[:, 1] += rng.uniform(low=0.0, high=1.0, size=len(point_array))

    # Remove points outside legal bounds in time. TODO check if this is needed
    mask = np.logical_and(0 <= point_array[:, 1], point_array[:, 1] < NUM_TB)
    point_array = point_array[mask]
    label_array = label_array[mask]

    return point_array, label_array