Skip to content

Module: micpy.bin

The micpy.bin module provides methods to read and write binary files.

Field

Bases: ndarray

A field.

Source code in micpy\bin.py
class Field(np.ndarray):
    """A field."""

    def __new__(cls, data, time: float, spacing: Tuple[float, float, float]):
        obj = np.asarray(data).view(cls)
        obj.time = time
        obj.spacing = spacing
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return

        # pylint: disable=attribute-defined-outside-init
        self.time = getattr(obj, "time", None)
        self.spacing = getattr(obj, "spacing", None)

    @staticmethod
    def from_bytes(data: bytes, shape=None, spacing=None):
        """Create a new time step from bytes."""

        header = Header.from_bytes(data)

        start, end, count = (
            header.SIZE,
            header.field_size,
            header.body_length,
        )

        data = data[start:end]
        data = np.frombuffer(data, count=count, dtype="float32")

        if shape is not None:
            data = data.reshape(shape)

        return Field(data, time=header.time, spacing=spacing)

    def to_bytes(self):
        """Convert the field to bytes."""
        header = Header(
            size=self.size * self.itemsize + 8,
            time=self.time,
            length=self.size,
        )
        footer = Footer(length=self.size)

        return header.to_bytes() + self.tobytes() + footer.to_bytes()

    @staticmethod
    def dimensions(shape: Tuple[int, int, int]) -> int:
        """Get the number of dimensions of a shape."""
        x, y, _ = shape

        if y == 1:
            if x == 1:
                return 1
            return 2
        return 3

    @staticmethod
    def read(
        file: IO[bytes],
        field_index: int,
        field_size: int,
        shape=None,
        spacing=None,
    ) -> "Field":
        """Read a field from a binary file."""

        if field_index < 0:
            start = t()
            _debug("Indexing file...")
            file.seek(0, 2)
            file_size = file.tell()
            field_count = file_size // field_size
            field_index += field_count
            _debug(f"Index completed in {t() - start:.2f} seconds.")

        start = t()
        _debug(f"Seeking to field index {field_index}...")
        offset = field_size * field_index
        file.seek(offset)
        _debug(f"Seek completed in {t() - start:.2f} seconds.")

        start = t()
        _debug(f"Reading data for field index {field_index}...")
        field_data = file.read(field_size)
        if len(field_data) < field_size:
            raise EOFError("Unexpected end of file")
        _debug(f"Read completed in {t() - start:.2f} seconds.")

        start = t()
        _debug(f"Creating Field object for field index {field_index}...")
        field = Field.from_bytes(field_data, shape=shape, spacing=spacing)
        _debug(f"Field object created in {t() - start:.2f} seconds.")
        return field

    def to_file(self, file: IO[bytes], geometry: bool = True):
        """Write the field to a binary file.

        Args:
            file (IO[bytes]): Binary file.
            geometry (bool, optional): `True` if geometry should be written, `False`
                otherwise. Defaults to `True`.
        """

        file.write(self.to_bytes())

        if geometry:
            geo_filename, geo_data = geo.build(file.name, self.shape, self.spacing)
            geo.write(geo_filename, geo_data, geo.Type.BASIC)

    def write(
        self,
        filename: str,
        compressed: bool = True,
        geometry: bool = True,
    ):
        """Write the field to a binary file.

        Args:
            filename (str): Filename of the binary file.
            compressed (bool, optional): `True` if file should be compressed, `False`
                otherwise. Defaults to `True`.
            geometry (bool, optional): `True` if geometry should be written, `False`
                otherwise. Defaults to `True`.
        """

        file_open = gzip.open if compressed else open
        with file_open(filename, "wb") as file:
            self.to_file(file, geometry)

    def plot(
        self,
        axis: str = "y",
        index: int = 0,
        title: Optional[str] = None,
        xlabel: Optional[str] = None,
        ylabel: Optional[str] = None,
        figsize: Optional[Tuple[float, float]] = None,
        dpi: Optional[int] = None,
        aspect: str = "equal",
        ax: "Axes" = None,
        cax: "Axes" = None,
        vmin: Optional[float] = None,
        vmax: Optional[float] = None,
        cmap: str = "micpy",
        alpha: float = 1.0,
        interpolation: str = "none",
        extent: Optional[Tuple[float, float, float, float]] = None,
    ) -> Tuple["Figure", "Axes", "Colorbar"]:
        """Plot a slice of the field using Matplotlib.

        Args:
            axis (str, optional): Axis to plot. Possible values are `x`, `y`, and `z`.
                Defaults to `y`.
            index (int, optional): Index of the slice. Defaults to `0`.
            title (str, optional): Title of the plot. Defaults to `None`.
            xlabel (str, optional): Label of the x-axis. Defaults to `None`.
            ylabel (str, optional): Label of the y-axis. Defaults to `None`.
            figsize (Tuple[float, float], optional): Figure size. Defaults to `None`.
            dpi (int, optional): Figure DPI. Defaults to `None`.
            aspect (str, optional): Aspect ratio. Defaults to `equal`.
            ax (Axes, optional): Axes of the plot. Defaults to `None`.
            cax (Axes, optional): Axes of the color bar. Defaults to `None`.
            vmin (float, optional): Minimum value of the color bar. Defaults to `None`.
            vmax (float, optional): Maximum value of the color bar. Defaults to `None`.
            cmap (str, optional): Colormap. Defaults to `micpy`.
            alpha (float, optional): Transparency of the plot. Defaults to `1.0`.
            interpolation (str, optional): Interpolation method. Defaults to `none`.
            extent (Tuple[float, float, float, float], optional): Extent of the plot.

        Returns:
            Matplotlib figure, axes, and color bar.
        """

        if not HAS_MATPLOTLIB:
            raise ImportError("Matplotlib is not installed.")

        if self.ndim != 3:
            raise ValueError("'field' must be a 3D array")

        if axis == "z":
            x, y = "x", "y"
            slice_2d = self[index, :, :]
        elif axis == "y":
            x, y = "x", "z"
            slice_2d = self[:, index, :]
        elif axis == "x":
            x, y = "y", "z"
            slice_2d = self[:, :, index]
        else:
            raise ValueError("'axis' must be 'x', 'y', or 'z'")

        fig, ax = (
            pyplot.subplots(figsize=figsize, dpi=dpi)
            if ax is None
            else (ax.get_figure(), ax)
        )

        if title is not None:
            ax.set_title(title)
        else:
            ax.set_title(f"t={np.round(self.time, 7)}s")
        if xlabel is not None:
            ax.set_xlabel(xlabel)
        else:
            ax.set_xlabel(x)
        if ylabel is not None:
            ax.set_ylabel(ylabel)
        else:
            ax.set_ylabel(y)
        if aspect is not None:
            ax.set_aspect(aspect)
        ax.set_frame_on(False)

        image = ax.imshow(
            slice_2d,
            cmap=cmap,
            vmin=vmin,
            vmax=vmax,
            interpolation=interpolation,
            alpha=alpha,
            extent=extent,
            origin="lower",
        )

        cbar = pyplot.colorbar(image, ax=ax, cax=cax)
        cbar.locator = matplotlib.ticker.MaxNLocator(
            integer=np.issubdtype(slice_2d.dtype, np.integer)
        )
        cbar.outline.set_visible(False)
        cbar.update_ticks()

        return fig, ax, cbar

    def as_vti(
        self,
        name: str = "values",
        point_data: bool = False,
    ) -> "vtkImageData":
        """Convert the field to a VTK ImageData object.

        Args:
            name (str, optional): Name of the data array. Defaults to `values`.
            point_data (bool, optional): `True` if data should be stored as PointData,
                `False` if data should be stored as CellData. Defaults to `False`.
        Returns:
            VTK ImageData object.
        """

        if not HAS_VTK:
            raise ImportError("VTK is not installed.")

        cells = np.asarray(self)
        if cells.ndim != 3:
            raise ValueError("field must be 3D (nz, ny, nx)")

        nz, ny, nx = cells.shape
        dz, dy, dx = self.spacing

        def cell_to_point_average(c: np.ndarray) -> np.ndarray:
            nz_, ny_, nx_ = c.shape
            acc = np.zeros((nz_ + 1, ny_ + 1, nx_ + 1),
                        dtype=np.result_type(c.dtype, np.float32))
            w = np.zeros((nz_ + 1, ny_ + 1, nx_ + 1), dtype=np.float32)

            acc[0:nz_,   0:ny_,   0:nx_  ] += c
            acc[0:nz_,   0:ny_,   1:nx_+1] += c
            acc[0:nz_,   1:ny_+1, 0:nx_  ] += c
            acc[0:nz_,   1:ny_+1, 1:nx_+1] += c
            acc[1:nz_+1, 0:ny_,   0:nx_  ] += c
            acc[1:nz_+1, 0:ny_,   1:nx_+1] += c
            acc[1:nz_+1, 1:ny_+1, 0:nx_  ] += c
            acc[1:nz_+1, 1:ny_+1, 1:nx_+1] += c

            w[0:nz_,   0:ny_,   0:nx_  ] += 1
            w[0:nz_,   0:ny_,   1:nx_+1] += 1
            w[0:nz_,   1:ny_+1, 0:nx_  ] += 1
            w[0:nz_,   1:ny_+1, 1:nx_+1] += 1
            w[1:nz_+1, 0:ny_,   0:nx_  ] += 1
            w[1:nz_+1, 0:ny_,   1:nx_+1] += 1
            w[1:nz_+1, 1:ny_+1, 0:nx_  ] += 1
            w[1:nz_+1, 1:ny_+1, 1:nx_+1] += 1

            return acc / w

        if point_data:
            data = cell_to_point_average(cells)
        else:
            data = cells

        data_c = np.ascontiguousarray(data)
        flat = data_c.ravel(order="C")

        image = vtkImageData()
        image.SetOrigin(0.0, 0.0, 0.0)
        image.SetSpacing(float(dx), float(dy), float(dz))
        image.SetDimensions(nx + 1, ny + 1, nz + 1)

        vtk_arr = numpy_to_vtk(flat, deep=False)
        vtk_arr.SetName(name)

        if point_data:
            image.GetPointData().SetScalars(vtk_arr)
        else:
            image.GetCellData().SetScalars(vtk_arr)

        image._numpy_pin = data_c

        return image


    def save_vti(
        self,
        filename: str,
        name: str = "values",
        point_data: bool = False,
    ) -> str:
        """Save the field as a VTK ImageData file.

        Args:
            filename (str): Filename of the VTK ImageData file.
            name (str, optional): Name of the data array. Defaults to `values`.
            point_data (bool, optional): `True` if data should be stored as PointData
                `False` if data should be stored as CellData. Defaults to `False`.
        Returns: 
            Filename of the VTK ImageData file.
        """

        image = self.as_vti(name=name, point_data=point_data)

        filename = str(Path(filename).with_suffix(".vti"))

        writer = vtkXMLImageDataWriter()
        writer.SetFileName(filename)

        writer.SetInputData(image)
        writer.SetDataModeToAppended()
        writer.EncodeAppendedDataOff()

        ok = writer.Write()
        if ok != 1:
            raise OSError(f"Failed to write {filename}")

        return filename

as_vti(name='values', point_data=False)

Convert the field to a VTK ImageData object.

Parameters:

Name Type Description Default
name str

Name of the data array. Defaults to values.

'values'
point_data bool

True if data should be stored as PointData, False if data should be stored as CellData. Defaults to False.

False

Returns: VTK ImageData object.

Source code in micpy\bin.py
def as_vti(
    self,
    name: str = "values",
    point_data: bool = False,
) -> "vtkImageData":
    """Convert the field to a VTK ImageData object.

    Args:
        name (str, optional): Name of the data array. Defaults to `values`.
        point_data (bool, optional): `True` if data should be stored as PointData,
            `False` if data should be stored as CellData. Defaults to `False`.
    Returns:
        VTK ImageData object.
    """

    if not HAS_VTK:
        raise ImportError("VTK is not installed.")

    cells = np.asarray(self)
    if cells.ndim != 3:
        raise ValueError("field must be 3D (nz, ny, nx)")

    nz, ny, nx = cells.shape
    dz, dy, dx = self.spacing

    def cell_to_point_average(c: np.ndarray) -> np.ndarray:
        nz_, ny_, nx_ = c.shape
        acc = np.zeros((nz_ + 1, ny_ + 1, nx_ + 1),
                    dtype=np.result_type(c.dtype, np.float32))
        w = np.zeros((nz_ + 1, ny_ + 1, nx_ + 1), dtype=np.float32)

        acc[0:nz_,   0:ny_,   0:nx_  ] += c
        acc[0:nz_,   0:ny_,   1:nx_+1] += c
        acc[0:nz_,   1:ny_+1, 0:nx_  ] += c
        acc[0:nz_,   1:ny_+1, 1:nx_+1] += c
        acc[1:nz_+1, 0:ny_,   0:nx_  ] += c
        acc[1:nz_+1, 0:ny_,   1:nx_+1] += c
        acc[1:nz_+1, 1:ny_+1, 0:nx_  ] += c
        acc[1:nz_+1, 1:ny_+1, 1:nx_+1] += c

        w[0:nz_,   0:ny_,   0:nx_  ] += 1
        w[0:nz_,   0:ny_,   1:nx_+1] += 1
        w[0:nz_,   1:ny_+1, 0:nx_  ] += 1
        w[0:nz_,   1:ny_+1, 1:nx_+1] += 1
        w[1:nz_+1, 0:ny_,   0:nx_  ] += 1
        w[1:nz_+1, 0:ny_,   1:nx_+1] += 1
        w[1:nz_+1, 1:ny_+1, 0:nx_  ] += 1
        w[1:nz_+1, 1:ny_+1, 1:nx_+1] += 1

        return acc / w

    if point_data:
        data = cell_to_point_average(cells)
    else:
        data = cells

    data_c = np.ascontiguousarray(data)
    flat = data_c.ravel(order="C")

    image = vtkImageData()
    image.SetOrigin(0.0, 0.0, 0.0)
    image.SetSpacing(float(dx), float(dy), float(dz))
    image.SetDimensions(nx + 1, ny + 1, nz + 1)

    vtk_arr = numpy_to_vtk(flat, deep=False)
    vtk_arr.SetName(name)

    if point_data:
        image.GetPointData().SetScalars(vtk_arr)
    else:
        image.GetCellData().SetScalars(vtk_arr)

    image._numpy_pin = data_c

    return image

dimensions(shape) staticmethod

Get the number of dimensions of a shape.

Source code in micpy\bin.py
@staticmethod
def dimensions(shape: Tuple[int, int, int]) -> int:
    """Get the number of dimensions of a shape."""
    x, y, _ = shape

    if y == 1:
        if x == 1:
            return 1
        return 2
    return 3

from_bytes(data, shape=None, spacing=None) staticmethod

Create a new time step from bytes.

Source code in micpy\bin.py
@staticmethod
def from_bytes(data: bytes, shape=None, spacing=None):
    """Create a new time step from bytes."""

    header = Header.from_bytes(data)

    start, end, count = (
        header.SIZE,
        header.field_size,
        header.body_length,
    )

    data = data[start:end]
    data = np.frombuffer(data, count=count, dtype="float32")

    if shape is not None:
        data = data.reshape(shape)

    return Field(data, time=header.time, spacing=spacing)

plot(axis='y', index=0, title=None, xlabel=None, ylabel=None, figsize=None, dpi=None, aspect='equal', ax=None, cax=None, vmin=None, vmax=None, cmap='micpy', alpha=1.0, interpolation='none', extent=None)

Plot a slice of the field using Matplotlib.

Parameters:

Name Type Description Default
axis str

Axis to plot. Possible values are x, y, and z. Defaults to y.

'y'
index int

Index of the slice. Defaults to 0.

0
title str

Title of the plot. Defaults to None.

None
xlabel str

Label of the x-axis. Defaults to None.

None
ylabel str

Label of the y-axis. Defaults to None.

None
figsize Tuple[float, float]

Figure size. Defaults to None.

None
dpi int

Figure DPI. Defaults to None.

None
aspect str

Aspect ratio. Defaults to equal.

'equal'
ax Axes

Axes of the plot. Defaults to None.

None
cax Axes

Axes of the color bar. Defaults to None.

None
vmin float

Minimum value of the color bar. Defaults to None.

None
vmax float

Maximum value of the color bar. Defaults to None.

None
cmap str

Colormap. Defaults to micpy.

'micpy'
alpha float

Transparency of the plot. Defaults to 1.0.

1.0
interpolation str

Interpolation method. Defaults to none.

'none'
extent Tuple[float, float, float, float]

Extent of the plot.

None

Returns:

Type Description
Tuple[Figure, Axes, Colorbar]

Matplotlib figure, axes, and color bar.

Source code in micpy\bin.py
def plot(
    self,
    axis: str = "y",
    index: int = 0,
    title: Optional[str] = None,
    xlabel: Optional[str] = None,
    ylabel: Optional[str] = None,
    figsize: Optional[Tuple[float, float]] = None,
    dpi: Optional[int] = None,
    aspect: str = "equal",
    ax: "Axes" = None,
    cax: "Axes" = None,
    vmin: Optional[float] = None,
    vmax: Optional[float] = None,
    cmap: str = "micpy",
    alpha: float = 1.0,
    interpolation: str = "none",
    extent: Optional[Tuple[float, float, float, float]] = None,
) -> Tuple["Figure", "Axes", "Colorbar"]:
    """Plot a slice of the field using Matplotlib.

    Args:
        axis (str, optional): Axis to plot. Possible values are `x`, `y`, and `z`.
            Defaults to `y`.
        index (int, optional): Index of the slice. Defaults to `0`.
        title (str, optional): Title of the plot. Defaults to `None`.
        xlabel (str, optional): Label of the x-axis. Defaults to `None`.
        ylabel (str, optional): Label of the y-axis. Defaults to `None`.
        figsize (Tuple[float, float], optional): Figure size. Defaults to `None`.
        dpi (int, optional): Figure DPI. Defaults to `None`.
        aspect (str, optional): Aspect ratio. Defaults to `equal`.
        ax (Axes, optional): Axes of the plot. Defaults to `None`.
        cax (Axes, optional): Axes of the color bar. Defaults to `None`.
        vmin (float, optional): Minimum value of the color bar. Defaults to `None`.
        vmax (float, optional): Maximum value of the color bar. Defaults to `None`.
        cmap (str, optional): Colormap. Defaults to `micpy`.
        alpha (float, optional): Transparency of the plot. Defaults to `1.0`.
        interpolation (str, optional): Interpolation method. Defaults to `none`.
        extent (Tuple[float, float, float, float], optional): Extent of the plot.

    Returns:
        Matplotlib figure, axes, and color bar.
    """

    if not HAS_MATPLOTLIB:
        raise ImportError("Matplotlib is not installed.")

    if self.ndim != 3:
        raise ValueError("'field' must be a 3D array")

    if axis == "z":
        x, y = "x", "y"
        slice_2d = self[index, :, :]
    elif axis == "y":
        x, y = "x", "z"
        slice_2d = self[:, index, :]
    elif axis == "x":
        x, y = "y", "z"
        slice_2d = self[:, :, index]
    else:
        raise ValueError("'axis' must be 'x', 'y', or 'z'")

    fig, ax = (
        pyplot.subplots(figsize=figsize, dpi=dpi)
        if ax is None
        else (ax.get_figure(), ax)
    )

    if title is not None:
        ax.set_title(title)
    else:
        ax.set_title(f"t={np.round(self.time, 7)}s")
    if xlabel is not None:
        ax.set_xlabel(xlabel)
    else:
        ax.set_xlabel(x)
    if ylabel is not None:
        ax.set_ylabel(ylabel)
    else:
        ax.set_ylabel(y)
    if aspect is not None:
        ax.set_aspect(aspect)
    ax.set_frame_on(False)

    image = ax.imshow(
        slice_2d,
        cmap=cmap,
        vmin=vmin,
        vmax=vmax,
        interpolation=interpolation,
        alpha=alpha,
        extent=extent,
        origin="lower",
    )

    cbar = pyplot.colorbar(image, ax=ax, cax=cax)
    cbar.locator = matplotlib.ticker.MaxNLocator(
        integer=np.issubdtype(slice_2d.dtype, np.integer)
    )
    cbar.outline.set_visible(False)
    cbar.update_ticks()

    return fig, ax, cbar

read(file, field_index, field_size, shape=None, spacing=None) staticmethod

Read a field from a binary file.

Source code in micpy\bin.py
@staticmethod
def read(
    file: IO[bytes],
    field_index: int,
    field_size: int,
    shape=None,
    spacing=None,
) -> "Field":
    """Read a field from a binary file."""

    if field_index < 0:
        start = t()
        _debug("Indexing file...")
        file.seek(0, 2)
        file_size = file.tell()
        field_count = file_size // field_size
        field_index += field_count
        _debug(f"Index completed in {t() - start:.2f} seconds.")

    start = t()
    _debug(f"Seeking to field index {field_index}...")
    offset = field_size * field_index
    file.seek(offset)
    _debug(f"Seek completed in {t() - start:.2f} seconds.")

    start = t()
    _debug(f"Reading data for field index {field_index}...")
    field_data = file.read(field_size)
    if len(field_data) < field_size:
        raise EOFError("Unexpected end of file")
    _debug(f"Read completed in {t() - start:.2f} seconds.")

    start = t()
    _debug(f"Creating Field object for field index {field_index}...")
    field = Field.from_bytes(field_data, shape=shape, spacing=spacing)
    _debug(f"Field object created in {t() - start:.2f} seconds.")
    return field

save_vti(filename, name='values', point_data=False)

Save the field as a VTK ImageData file.

Parameters:

Name Type Description Default
filename str

Filename of the VTK ImageData file.

required
name str

Name of the data array. Defaults to values.

'values'
point_data bool

True if data should be stored as PointData False if data should be stored as CellData. Defaults to False.

False

Returns: Filename of the VTK ImageData file.

Source code in micpy\bin.py
def save_vti(
    self,
    filename: str,
    name: str = "values",
    point_data: bool = False,
) -> str:
    """Save the field as a VTK ImageData file.

    Args:
        filename (str): Filename of the VTK ImageData file.
        name (str, optional): Name of the data array. Defaults to `values`.
        point_data (bool, optional): `True` if data should be stored as PointData
            `False` if data should be stored as CellData. Defaults to `False`.
    Returns: 
        Filename of the VTK ImageData file.
    """

    image = self.as_vti(name=name, point_data=point_data)

    filename = str(Path(filename).with_suffix(".vti"))

    writer = vtkXMLImageDataWriter()
    writer.SetFileName(filename)

    writer.SetInputData(image)
    writer.SetDataModeToAppended()
    writer.EncodeAppendedDataOff()

    ok = writer.Write()
    if ok != 1:
        raise OSError(f"Failed to write {filename}")

    return filename

to_bytes()

Convert the field to bytes.

Source code in micpy\bin.py
def to_bytes(self):
    """Convert the field to bytes."""
    header = Header(
        size=self.size * self.itemsize + 8,
        time=self.time,
        length=self.size,
    )
    footer = Footer(length=self.size)

    return header.to_bytes() + self.tobytes() + footer.to_bytes()

to_file(file, geometry=True)

Write the field to a binary file.

Parameters:

Name Type Description Default
file IO[bytes]

Binary file.

required
geometry bool

True if geometry should be written, False otherwise. Defaults to True.

True
Source code in micpy\bin.py
def to_file(self, file: IO[bytes], geometry: bool = True):
    """Write the field to a binary file.

    Args:
        file (IO[bytes]): Binary file.
        geometry (bool, optional): `True` if geometry should be written, `False`
            otherwise. Defaults to `True`.
    """

    file.write(self.to_bytes())

    if geometry:
        geo_filename, geo_data = geo.build(file.name, self.shape, self.spacing)
        geo.write(geo_filename, geo_data, geo.Type.BASIC)

write(filename, compressed=True, geometry=True)

Write the field to a binary file.

Parameters:

Name Type Description Default
filename str

Filename of the binary file.

required
compressed bool

True if file should be compressed, False otherwise. Defaults to True.

True
geometry bool

True if geometry should be written, False otherwise. Defaults to True.

True
Source code in micpy\bin.py
def write(
    self,
    filename: str,
    compressed: bool = True,
    geometry: bool = True,
):
    """Write the field to a binary file.

    Args:
        filename (str): Filename of the binary file.
        compressed (bool, optional): `True` if file should be compressed, `False`
            otherwise. Defaults to `True`.
        geometry (bool, optional): `True` if geometry should be written, `False`
            otherwise. Defaults to `True`.
    """

    file_open = gzip.open if compressed else open
    with file_open(filename, "wb") as file:
        self.to_file(file, geometry)

File

A binary file.

Source code in micpy\bin.py
class File:
    """A binary file."""

    def __init__(
        self,
        filename: str,
        threads: int = None,
    ):
        self.shape: Tuple[int, int, int] = None
        self.spacing: Tuple[float, float, float] = None

        self._filename = filename
        self._threads = threads if threads is not None else min(4, os.cpu_count())
        self._file: IO[bytes] = None
        self._compressed: bool = None
        self._indexed: bool = False
        self._header: Header = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __iter__(self) -> Iterator[Field]:
        index = 0
        while True:
            try:
                yield self._read_field(index)
                index += 1
            except (IndexError, EOFError):
                break

    def open(self) -> "File":
        """Open the file.

        Returns:
            File: The opened binary file.
        """
        self._compressed = utils.is_compressed(self._filename)

        if self._compressed:
            self._file = rapidgzip.open(self._filename, self._threads)
            start = t()
            _debug("Importing index file...")
            _try_import_index(self._file, _index_filename(self._filename))
            _debug(f"Index file imported in {t() - start:.2f} seconds.")
        else:
            self._file = builtins.open(self._filename, "rb")

        self._header = Header.read(self._file)

        try:
            geometry = geo.read(geo.find(self._filename), type=geo.Type.BASIC)
            self.shape = self.shape or geometry["shape"][::-1]
            self.spacing = self.spacing or geometry["spacing"][::-1]
        except (geo.GeometryFileNotFoundError, geo.MultipleGeometryFilesError):
            _warn("Caution: A geometry file was not found.")

        return self

    def times(self) -> list[float]:
        """Get the time steps from the binary file.

        Returns:
            list[float]: List of time steps.
        """
        if not self._file:
            self.open()

        field_size = self._header.field_size

        cur = self._file.tell()
        self._file.seek(0, 2)
        file_size = self._file.tell()
        field_count = file_size // field_size
        self._file.seek(cur)

        times: list[float] = []

        for i in range(int(field_count)):
            hdr = Header.read_at(self._file, i * field_size)
            times.append(hdr.time)

        return times

    def _read_field(self, key: int) -> Field:
        if not self._file:
            self.open()
        if isinstance(key, int):
            return Field.read(
                file=self._file,
                field_index=key,
                field_size=Header.read(self._file).field_size,
                shape=self.shape,
                spacing=self.spacing,
            )
        raise TypeError("Invalid argument type")

    def _read_series(self, key: list[int] = None) -> Series:
        if key is None:
            fields: list[Field] = list(self)
            return Series(fields)
        if isinstance(key, list):
            return Series([self._read_field(i) for i in key])
        raise TypeError("Invalid argument type")

    @overload
    def read(self, key: int, threads: int = None) -> Field: ...
    @overload
    def read(self, key: list[int], threads: int = None) -> Series: ...
    @overload
    def read(self, key: None = None, threads: int = None) -> Series: ...

    def read(self, key: Union[int, list[int]] = None, threads: int = None) -> Union[Field, Series]:
        """Read a field or series from a binary file.

        Args:
            key (Union[int, list[int]], optional): Key to a field index or a list of field
                indices. Defaults to `None`, which reads all fields.
            threads (int, optional): Number of threads to use for reading compressed files.
                Defaults to `None`, which uses up to 4 threads.
        Returns:
            Union[Field, Series]: Field or series of fields.
        """
        if key is None:
            return self._read_series()
        if isinstance(key, int):
            return self._read_field(key)
        if isinstance(key, list):
            return self._read_series(key)
        raise TypeError("Invalid argument type")

    def close(self):
        """Close the file."""
        if not self._file:
            return

        if self._compressed:
            start = t()
            _debug("Exporting index file...")
            _try_export_index(self._file, _index_filename(self._filename))
            _debug(f"Index file exported in {t() - start:.2f} seconds.")

        self._file.close()
        self._file = None

close()

Close the file.

Source code in micpy\bin.py
def close(self):
    """Close the file."""
    if not self._file:
        return

    if self._compressed:
        start = t()
        _debug("Exporting index file...")
        _try_export_index(self._file, _index_filename(self._filename))
        _debug(f"Index file exported in {t() - start:.2f} seconds.")

    self._file.close()
    self._file = None

open()

Open the file.

Returns:

Name Type Description
File File

The opened binary file.

Source code in micpy\bin.py
def open(self) -> "File":
    """Open the file.

    Returns:
        File: The opened binary file.
    """
    self._compressed = utils.is_compressed(self._filename)

    if self._compressed:
        self._file = rapidgzip.open(self._filename, self._threads)
        start = t()
        _debug("Importing index file...")
        _try_import_index(self._file, _index_filename(self._filename))
        _debug(f"Index file imported in {t() - start:.2f} seconds.")
    else:
        self._file = builtins.open(self._filename, "rb")

    self._header = Header.read(self._file)

    try:
        geometry = geo.read(geo.find(self._filename), type=geo.Type.BASIC)
        self.shape = self.shape or geometry["shape"][::-1]
        self.spacing = self.spacing or geometry["spacing"][::-1]
    except (geo.GeometryFileNotFoundError, geo.MultipleGeometryFilesError):
        _warn("Caution: A geometry file was not found.")

    return self

read(key=None, threads=None)

read(key: int, threads: int = None) -> Field
read(key: list[int], threads: int = None) -> Series
read(key: None = None, threads: int = None) -> Series

Read a field or series from a binary file.

Parameters:

Name Type Description Default
key Union[int, list[int]]

Key to a field index or a list of field indices. Defaults to None, which reads all fields.

None
threads int

Number of threads to use for reading compressed files. Defaults to None, which uses up to 4 threads.

None

Returns: Union[Field, Series]: Field or series of fields.

Source code in micpy\bin.py
def read(self, key: Union[int, list[int]] = None, threads: int = None) -> Union[Field, Series]:
    """Read a field or series from a binary file.

    Args:
        key (Union[int, list[int]], optional): Key to a field index or a list of field
            indices. Defaults to `None`, which reads all fields.
        threads (int, optional): Number of threads to use for reading compressed files.
            Defaults to `None`, which uses up to 4 threads.
    Returns:
        Union[Field, Series]: Field or series of fields.
    """
    if key is None:
        return self._read_series()
    if isinstance(key, int):
        return self._read_field(key)
    if isinstance(key, list):
        return self._read_series(key)
    raise TypeError("Invalid argument type")

times()

Get the time steps from the binary file.

Returns:

Type Description
list[float]

list[float]: List of time steps.

Source code in micpy\bin.py
def times(self) -> list[float]:
    """Get the time steps from the binary file.

    Returns:
        list[float]: List of time steps.
    """
    if not self._file:
        self.open()

    field_size = self._header.field_size

    cur = self._file.tell()
    self._file.seek(0, 2)
    file_size = self._file.tell()
    field_count = file_size // field_size
    self._file.seek(cur)

    times: list[float] = []

    for i in range(int(field_count)):
        hdr = Header.read_at(self._file, i * field_size)
        times.append(hdr.time)

    return times

Footer

A field footer.

Source code in micpy\bin.py
class Footer:
    """A field footer."""

    TYPE = [("length", np.int32)]
    SIZE = np.dtype(TYPE).itemsize

    def __init__(self, length: int = 0):
        data = np.array((length,), dtype=self.TYPE)
        self.body_length = data["length"]

    def to_bytes(self):
        """Convert the footer to bytes."""
        return np.array((self.body_length,), dtype=self.TYPE).tobytes()

to_bytes()

Convert the footer to bytes.

Source code in micpy\bin.py
def to_bytes(self):
    """Convert the footer to bytes."""
    return np.array((self.body_length,), dtype=self.TYPE).tobytes()

Header

A field header.

Source code in micpy\bin.py
class Header:
    """A field header."""

    TYPE = [("size", np.int32), ("time", np.float32), ("length", np.int32)]
    SIZE = np.dtype(TYPE).itemsize

    def __init__(self, size: int, time: float, length: int):
        self.size = size
        self.time = round(float(time), 7)
        self.body_length = length

        self.field_size = Header.SIZE + 4 * self.body_length + Footer.SIZE

        if not self.size == self.field_size - 8:
            raise ValueError("Invalid header")

    @staticmethod
    def from_bytes(data: bytes):
        """Create a new header from bytes."""
        kwargs = np.frombuffer(data[: Header.SIZE], dtype=Header.TYPE)
        return Header(*kwargs[0].item())

    def to_bytes(self):
        """Convert the header to bytes."""
        return np.array(
            (self.size, self.time, self.body_length), dtype=Header.TYPE
        ).tobytes()

    @staticmethod
    def read(file: IO[bytes]) -> "Header":
        """Read the header of a binary file."""
        file.seek(0)
        data = file.read(Header.SIZE)
        return Header.from_bytes(data)

    @staticmethod
    def read_at(file: IO[bytes], offset: int) -> "Header":
        file.seek(offset)
        data = file.read(Header.SIZE)
        if len(data) < Header.SIZE:
            raise EOFError("Unexpected end of file")
        return Header.from_bytes(data)


    def get_field_count(self, file_size: int) -> int:
        """Get the number of fields in the file."""
        return file_size // self.field_size

from_bytes(data) staticmethod

Create a new header from bytes.

Source code in micpy\bin.py
@staticmethod
def from_bytes(data: bytes):
    """Create a new header from bytes."""
    kwargs = np.frombuffer(data[: Header.SIZE], dtype=Header.TYPE)
    return Header(*kwargs[0].item())

get_field_count(file_size)

Get the number of fields in the file.

Source code in micpy\bin.py
def get_field_count(self, file_size: int) -> int:
    """Get the number of fields in the file."""
    return file_size // self.field_size

read(file) staticmethod

Read the header of a binary file.

Source code in micpy\bin.py
@staticmethod
def read(file: IO[bytes]) -> "Header":
    """Read the header of a binary file."""
    file.seek(0)
    data = file.read(Header.SIZE)
    return Header.from_bytes(data)

to_bytes()

Convert the header to bytes.

Source code in micpy\bin.py
def to_bytes(self):
    """Convert the header to bytes."""
    return np.array(
        (self.size, self.time, self.body_length), dtype=Header.TYPE
    ).tobytes()

Series

Bases: ndarray

Source code in micpy\bin.py
class Series(np.ndarray):
    def __new__(cls, fields: List[Field]):
        obj = np.asarray(fields).view(cls)
        obj.times = [field.time for field in fields]
        obj.spacings = [field.spacing for field in fields]
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return

        # pylint: disable=attribute-defined-outside-init
        self.times = getattr(obj, "times", None)
        self.spacings = getattr(obj, "spacings", None)

    def field(self, index: int) -> Field:
        """Get a field from the series.

        Args:
            index (int): Index of the field.

        Returns:
            Field.
        """
        return Field(self[index], self.times[index], self.spacings[index])

    def series(self, key: Union[int, slice, list]) -> "Series":
        """Get a series of fields.

        Args:
            key (Union[int, slice, list]): Key to list of field indices, a slice object, or a
                list of field indices.

        Returns:
            Series of fields.
        """
        if isinstance(key, int):
            return Series([self.field(key)])
        if isinstance(key, slice):
            return Series([self.field(i) for i in range(*key.indices(len(self)))])
        if isinstance(key, list):
            return Series([self.field(i) for i in key])
        raise TypeError("Invalid argument type")

    def write(self, filename: str, compressed: bool = True, geometry: bool = True):
        """Write the series to a binary file.

        Args:
            filename (str): Filename of the binary file.
            compressed (bool, optional): `True` if file should be compressed, `False`
                otherwise. Defaults to `True`.
            geometry (bool, optional): `True` if geometry should be written, `False`
                otherwise. Defaults to `True`.
        """

        file_open = gzip.open if compressed else open
        with file_open(filename, "wb") as file:
            for item, time, spacing in zip(self, self.times, self.spacings):
                Field(item, time, spacing).to_file(file, geometry)
                geometry = False

field(index)

Get a field from the series.

Parameters:

Name Type Description Default
index int

Index of the field.

required

Returns:

Type Description
Field

Field.

Source code in micpy\bin.py
def field(self, index: int) -> Field:
    """Get a field from the series.

    Args:
        index (int): Index of the field.

    Returns:
        Field.
    """
    return Field(self[index], self.times[index], self.spacings[index])

series(key)

Get a series of fields.

Parameters:

Name Type Description Default
key Union[int, slice, list]

Key to list of field indices, a slice object, or a list of field indices.

required

Returns:

Type Description
Series

Series of fields.

Source code in micpy\bin.py
def series(self, key: Union[int, slice, list]) -> "Series":
    """Get a series of fields.

    Args:
        key (Union[int, slice, list]): Key to list of field indices, a slice object, or a
            list of field indices.

    Returns:
        Series of fields.
    """
    if isinstance(key, int):
        return Series([self.field(key)])
    if isinstance(key, slice):
        return Series([self.field(i) for i in range(*key.indices(len(self)))])
    if isinstance(key, list):
        return Series([self.field(i) for i in key])
    raise TypeError("Invalid argument type")

write(filename, compressed=True, geometry=True)

Write the series to a binary file.

Parameters:

Name Type Description Default
filename str

Filename of the binary file.

required
compressed bool

True if file should be compressed, False otherwise. Defaults to True.

True
geometry bool

True if geometry should be written, False otherwise. Defaults to True.

True
Source code in micpy\bin.py
def write(self, filename: str, compressed: bool = True, geometry: bool = True):
    """Write the series to a binary file.

    Args:
        filename (str): Filename of the binary file.
        compressed (bool, optional): `True` if file should be compressed, `False`
            otherwise. Defaults to `True`.
        geometry (bool, optional): `True` if geometry should be written, `False`
            otherwise. Defaults to `True`.
    """

    file_open = gzip.open if compressed else open
    with file_open(filename, "wb") as file:
        for item, time, spacing in zip(self, self.times, self.spacings):
            Field(item, time, spacing).to_file(file, geometry)
            geometry = False

open(filename, threads=None)

Open a binary file.

Parameters:

Name Type Description Default
filename str

Filename of the binary file.

required
threads int

Number of threads to use for reading compressed files. Defaults to None, which uses up to 4 threads.

None

Returns: File: Binary file.

Source code in micpy\bin.py
def open(filename: str, threads: int = None) -> File:
    """Open a binary file.

    Args:
        filename (str): Filename of the binary file.
        threads (int, optional): Number of threads to use for reading compressed files.
            Defaults to `None`, which uses up to 4 threads.
    Returns:
        File: Binary file.
    """
    return File(filename, threads).open()

pv_plugin_path()

Get the path to the ParaView plugin directory for MicPy.

Returns:

Name Type Description
str str

Path to the ParaView plugin directory for MicPy.

Source code in micpy\bin.py
def pv_plugin_path() -> str:
    """Get the path to the ParaView plugin directory for MicPy.

    Returns:
        str: Path to the ParaView plugin directory for MicPy.
    """
    return str(Path(__file__).parent / "paraview")

read(filename, key=None, threads=None)

read(filename: str, key: int, threads: int = None) -> Field
read(
    filename: str, key: list[int], threads: int = None
) -> Series
read(
    filename: str, key: None = None, threads: int = None
) -> Series

Read a field or series from a binary file.

Parameters:

Name Type Description Default
filename str

Filename of the binary file.

required
key Union[int, list[int]]

Key to a field index or a list of field indices. Defaults to None, which reads all fields.

None
threads int

Number of threads to use for reading compressed files. Defaults to None, which uses up to 4 threads.

None

Returns: Union[Field, Series]: Field or series of fields.

Source code in micpy\bin.py
def read(filename: str, key: Union[int, list[int]] = None, threads: int = None) -> Union[Field, Series]:
    """Read a field or series from a binary file.

    Args:
        filename (str): Filename of the binary file.
        key (Union[int, list[int]], optional): Key to a field index or a list of field
            indices. Defaults to `None`, which reads all fields.
        threads (int, optional): Number of threads to use for reading compressed files.
            Defaults to `None`, which uses up to 4 threads.
    Returns:
        Union[Field, Series]: Field or series of fields.
    """
    with open(filename, threads) as file:
        return file.read(key)

times(filename, threads=None)

Get the time steps from a binary file.

Parameters:

Name Type Description Default
filename str

Filename of the binary file.

required
threads int

Number of threads to use for reading compressed files. Defaults to None, which uses up to 4 threads.

None

Returns: list[float]: List of time steps.

Source code in micpy\bin.py
def times(filename: str, threads: int = None) -> list[float]:
    """Get the time steps from a binary file.

    Args:
        filename (str): Filename of the binary file.
        threads (int, optional): Number of threads to use for reading compressed files.
            Defaults to `None`, which uses up to 4 threads.
    Returns:
        list[float]: List of time steps.
    """

    with open(filename, threads) as file:
        return file.times()