Source code for geoh5py.io.h5_writer

#  Copyright (c) 2024 Mira Geoscience Ltd.
#
#  This file is part of geoh5py.
#
#  geoh5py is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  geoh5py is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with geoh5py.  If not, see <https://www.gnu.org/licenses/>.

# pylint: disable=R0904

from __future__ import annotations

import json
import uuid
from copy import deepcopy
from typing import TYPE_CHECKING

import h5py
import numpy as np

from ..data import (
    BooleanData,
    CommentsData,
    Data,
    DataType,
    FilenameData,
    IntegerData,
    TextData,
)
from ..groups import Group, GroupType, PropertyGroup, RootGroup
from ..objects import ObjectBase, ObjectType
from ..shared import FLOAT_NDV, Entity, EntityType, fetch_h5_handle
from ..shared.concatenation import Concatenator
from ..shared.utils import KEY_MAP, as_str_if_uuid, dict_mapper

if TYPE_CHECKING:
    from .. import shared, workspace


[docs] class H5Writer: """ Writing class to a geoh5 file. """ str_type = h5py.special_dtype(vlen=str)
[docs] @classmethod def init_geoh5( cls, file: str | h5py.File, workspace: workspace.Workspace, ): """ Add the geoh5 core structure. :param file: Name or handle to a geoh5 file. :param workspace: :obj:`~geoh5py.workspace.workspace.Workspace` object defining the project structure. :return h5file: Pointer to a geoh5 file. """ with fetch_h5_handle(file, mode="r+") as h5file: project = h5file.create_group(workspace.name) cls.write_attributes(h5file, workspace) project.create_group("Data") project.create_group("Groups") project.create_group("Objects") types = project.create_group("Types") types.create_group("Data types") types.create_group("Group types") types.create_group("Object types")
[docs] @classmethod def create_dataset(cls, entity_handle, dataset: np.ndarray, label: str) -> None: """ Create a dataset on geoh5. :param entity_handle: Pointer to a hdf5 group :param dataset: Array of values to be written :param label: Name of the dataset on file """ entity_handle.create_dataset( label, data=dataset, dtype=dataset.dtype, compression="gzip", compression_opts=9, )
[docs] @staticmethod def remove_child( file: str | h5py.File, uid: uuid.UUID, ref_type: str, parent: Entity, ) -> None: """ Remove a child from a parent. :param file: Name or handle to a geoh5 file :param uid: uuid of the target :obj:`~geoh5py.shared.entity.Entity` :param ref_type: Input type from: 'Types', 'Groups', 'Objects' or 'Data :param parent: Remove entity from parent. """ with fetch_h5_handle(file, mode="r+") as h5file: uid_str = as_str_if_uuid(uid) parent_handle = H5Writer.fetch_handle(h5file, parent) if parent_handle is None or parent_handle.get(ref_type) is None: return if uid_str in parent_handle[ref_type]: del parent_handle[ref_type][uid_str] parent.workspace.repack = True
[docs] @staticmethod def remove_entity( file: str | h5py.File, uid: uuid.UUID, ref_type: str, parent: Entity | None = None, ) -> None: """ Remove an entity and its type from the target geoh5 file. :param file: Name or handle to a geoh5 file :param uid: uuid of the target :obj:`~geoh5py.shared.entity.Entity` :param ref_type: Input type from: 'Types', 'Groups', 'Objects' or 'Data :param parent: Remove entity from parent. """ with fetch_h5_handle(file, mode="r+") as h5file: base = list(h5file)[0] base_type_handle = h5file[base][ref_type] uid_str = as_str_if_uuid(uid) if ref_type == "Types": for e_type in ["Data types", "Group types", "Object types"]: if uid_str in base_type_handle[e_type]: del base_type_handle[e_type][uid_str] else: if uid_str in base_type_handle: del base_type_handle[uid_str] if parent is not None: H5Writer.remove_child(h5file, uid, ref_type, parent)
[docs] @classmethod def fetch_handle( cls, file: str | h5py.File, entity, return_parent: bool = False, ) -> None | h5py.Group: """ Get a pointer to an :obj:`~geoh5py.shared.entity.Entity` in geoh5. :param file: Name or handle to a geoh5 file :param entity: Target :obj:`~geoh5py.shared.entity.Entity` :param return_parent: Option to return the handle to the parent entity. :return entity_handle: HDF5 pointer to an existing entity, parent or None if not found. """ with fetch_h5_handle(file, mode="r+") as h5file: base = list(h5file)[0] base_handle = h5file[base] if entity.name == base: return base_handle uid = entity.uid hierarchy = { Data: "Data", ObjectBase: "Objects", Group: "Groups", DataType: "Data types", ObjectType: "Object types", GroupType: "Group types", } if isinstance(entity, EntityType): try: base_handle = base_handle["Types"] except KeyError: base_handle = base_handle.create_group("Types") for key, value in hierarchy.items(): if isinstance(entity, key): try: base_handle = base_handle[value] except KeyError: base_handle = base_handle.create_group(value) break # Check if already in the project if as_str_if_uuid(uid) in base_handle: if return_parent: return base_handle return base_handle[as_str_if_uuid(uid)] return None
[docs] @classmethod def save_entity( cls, file: str | h5py.File, entity, compression: int = 5, add_children: bool = True, ) -> h5py.Group: """ Save a :obj:`~geoh5py.shared.entity.Entity` to geoh5 with its :obj:`~geoh5py.shared.entity.Entity.children` recursively. :param file: Name or handle to a geoh5 file. :param entity: Target :obj:`~geoh5py.shared.entity.Entity`. :param compression: Compression level for the data. :param add_children: Add :obj:`~geoh5py.shared.entity.Entity.children`. """ with fetch_h5_handle(file, mode="r+") as h5file: new_entity = H5Writer.write_entity(h5file, entity, compression) if add_children and not isinstance(entity, Concatenator): # Write children entities and add to current parent for child in entity.children: if not isinstance(child, PropertyGroup): H5Writer.save_entity(h5file, child, compression) H5Writer.write_to_parent( h5file, entity, compression=compression, recursively=False ) return new_entity
[docs] @classmethod def update_concatenated_field( cls, file: str | h5py.File, entity, attribute: str, channel: str ) -> None: """ Update the attributes of a concatenated :obj:`~geoh5py.shared.entity.Entity`. :param file: Name or handle to a geoh5 file. :param entity: Target :obj:`~geoh5py.shared.entity.Entity`. :param attribute: Name of the attribute to get updated. :param channel: Name of the data or index to be modified. """ with fetch_h5_handle(file, mode="r+") as h5file: entity_handle = H5Writer.fetch_handle(h5file, entity) if entity_handle is None: return attr_handle = entity_handle["Concatenated Data"].get(attribute.capitalize()) if attr_handle is None: attr_handle = entity_handle["Concatenated Data"].create_group( attribute.capitalize() ) name = channel.replace("/", "\u2044") try: del attr_handle[name] entity.workspace.repack = True except KeyError: pass dict_values = getattr(entity, attribute) if channel in dict_values: values = dict_values[channel].copy() if isinstance(values, np.ndarray) and values.dtype in ( np.float64, np.float32, ): values[np.isnan(values)] = FLOAT_NDV values = values.astype(np.float32) attr_handle.create_dataset( name, data=values, compression="gzip", compression_opts=9, )
[docs] @classmethod def update_field( cls, file: str | h5py.File, entity, attribute: str, compression: int = 5, **kwargs, ) -> None: """ Update the attributes of an :obj:`~geoh5py.shared.entity.Entity`. :param file: Name or handle to a geoh5 file. :param entity: Target :obj:`~geoh5py.shared.entity.Entity`. :param attribute: Name of the attribute to get updated. :param compression: Compression level for the data. """ with fetch_h5_handle(file, mode="r+") as h5file: entity_handle = H5Writer.fetch_handle(h5file, entity) if entity_handle is None: return if attribute in [ "concatenated_attributes", "metadata", "options", "trace_depth", "values", ]: cls.write_data_values(h5file, entity, attribute, compression, **kwargs) elif attribute in [ "cells", "concatenated_object_ids", "layers", "octree_cells", "property_group_ids", "prisms", "surveys", "trace", "u_cell_delimiters", "v_cell_delimiters", "vertices", "z_cell_delimiters", ]: cls.write_array_attribute(h5file, entity, attribute, **kwargs) elif attribute == "property_groups": cls.write_property_groups(h5file, entity) elif attribute == "color_map": cls.write_color_map(h5file, entity) elif attribute == "value_map": cls.write_value_map(h5file, entity) elif attribute == "entity_type": del entity_handle["Type"] entity.workspace.repack = True new_type = H5Writer.write_entity_type(h5file, entity.entity_type) entity_handle["Type"] = new_type else: cls.write_attributes(h5file, entity)
[docs] @classmethod def write_attributes( cls, file: str | h5py.File, entity, ) -> None: """ Write attributes of an :obj:`~geoh5py.shared.entity.Entity`. :param file: Name or handle to a geoh5 file. :param entity: Entity with attributes to be added to the geoh5 file. """ with fetch_h5_handle(file, mode="r+") as h5file: entity_handle = H5Writer.fetch_handle(h5file, entity) if entity_handle is None: return for key, attr in entity.attribute_map.items(): try: value = getattr(entity, attr) except AttributeError: continue value = as_str_if_uuid(value) if ( key in [ "PropertyGroups", "Attributes", "Attributes Jsons", "Property Groups IDs", "Concatenated object IDs", ] or value is None ): # or key in Concatenator._attribute_map: continue if key in ["Association", "Primitive type"]: value = KEY_MAP[value.name] if isinstance(value, (np.int8, bool)): entity_handle.attrs.create(key, int(value), dtype="int8") elif isinstance(value, str): entity_handle.attrs.create(key, value, dtype=cls.str_type) else: entity_handle.attrs.create( key, value, dtype=np.asarray(value).dtype )
[docs] @classmethod def write_color_map( cls, file: str | h5py.File, entity_type: shared.EntityType, ) -> None: """ Add :obj:`~geoh5py.data.color_map.ColorMap` to a :obj:`~geoh5py.data.data_type.DataType`. :param file: Name or handle to a geoh5 file :param entity_type: Target entity_type with color_map """ with fetch_h5_handle(file, mode="r+") as h5file: color_map = getattr(entity_type, "color_map", None) entity_type_handle = H5Writer.fetch_handle(h5file, entity_type) if entity_type_handle is None: return try: del entity_type_handle["Color map"] entity_type.workspace.repack = True except KeyError: pass if color_map is not None and color_map.values is not None: cls.create_dataset( entity_type_handle, getattr(color_map, "_values"), "Color map", ) entity_type_handle["Color map"].attrs.create( "File name", color_map.name, dtype=cls.str_type )
[docs] @classmethod def write_value_map( cls, file: str | h5py.File, entity_type: shared.EntityType, ) -> None: """ Add :obj:`~geoh5py.data.reference_value_map.ReferenceValueMap` to a :obj:`~geoh5py.data.data_type.DataType`. :param file: Name or handle to a geoh5 file :param entity_type: Target entity_type with value_map """ with fetch_h5_handle(file, mode="r+") as h5file: reference_value_map = getattr(entity_type, "value_map", None) names = ["Key", "Value"] formats = ["<u4", h5py.special_dtype(vlen=str)] entity_type_handle = H5Writer.fetch_handle(h5file, entity_type) if entity_type_handle is None: return try: del entity_type_handle["Value map"] entity_type.workspace.repack = True except KeyError: pass if reference_value_map is not None and reference_value_map.map is not None: dtype = list(zip(names, formats)) array = np.array(list(reference_value_map.map.items()), dtype=dtype) cls.create_dataset(entity_type_handle, array, "Value map")
[docs] @classmethod def write_visible( cls, file: str | h5py.File, entity, ) -> None: """ Needs revision once Visualization is implemented :param file: Name or handle to a geoh5 file :param entity: Target entity """ with fetch_h5_handle(file, mode="r+") as h5file: entity_handle = H5Writer.fetch_handle(h5file, entity) if entity_handle is None: return dtype = np.dtype( [("ViewID", h5py.special_dtype(vlen=str)), ("Visible", "int8")] ) if entity.visible: visible = entity_handle.create_dataset( "Visible", shape=(1,), dtype=dtype ) visible["Visible"] = 1
[docs] @classmethod def write_array_attribute( cls, file: str | h5py.File, entity, attribute, values=None, **kwargs ) -> None: """ Add :obj:`~geoh5py.objects.object_base.ObjectBase.surveys` of an object. :param file: Name or handle to a geoh5 file. :param entity: Target entity. :param attribute: Name of the attribute to be written to geoh5 """ with fetch_h5_handle(file, mode="r+") as h5file: entity_handle = H5Writer.fetch_handle(h5file, entity) if entity_handle is None: return if values is None and getattr(entity, f"{attribute}", None) is not None: values = getattr(entity, f"_{attribute}", None) if ( isinstance(entity, Concatenator) and attribute != "concatenated_object_ids" ): entity_handle = entity_handle["Concatenated Data"] try: del entity_handle[KEY_MAP[attribute]] entity.workspace.repack = True except KeyError: pass if values is not None: entity_handle.create_dataset( KEY_MAP[attribute], data=values, compression="gzip", compression_opts=9, **kwargs, )
[docs] @classmethod def write_data_values( # pylint: disable=too-many-branches cls, file: str | h5py.File, entity, attribute, compression: int, values=None, **kwargs, ) -> None: """ Add data :obj:`~geoh5py.data.data.Data.values`. :param file: Name or handle to a geoh5 file. :param entity: Target entity. :param attribute: Name of the attribute to be written to geoh5 :param compression: Compression level for the data. :param values: Data values. """ with fetch_h5_handle(file, mode="r+") as h5file: entity_handle = H5Writer.fetch_handle(h5file, entity) if entity_handle is None: return name_map = KEY_MAP[attribute] if isinstance(entity, Concatenator): entity_handle = entity_handle["Concatenated Data"] if ( attribute == "concatenated_attributes" and entity.concat_attr_str == "Attributes Jsons" ): name_map = entity.concat_attr_str if name_map in entity_handle: del entity_handle[name_map] entity.workspace.repack = True if values is None: if getattr(entity, attribute, None) is None: return values = getattr(entity, "_" + attribute) if ( attribute == "concatenated_attributes" and entity.concat_attr_str == "Attributes Jsons" ): values = [ json.dumps(val).encode("utf-8") for val in values["Attributes"] ] # Adding an array of values if isinstance(values, dict) or isinstance(entity, CommentsData): values = deepcopy(values) if isinstance(entity, CommentsData): values = {"Comments": values} values = dict_mapper(values, [as_str_if_uuid]) entity_handle.create_dataset( name_map, data=json.dumps(values, indent=4), dtype=h5py.special_dtype(vlen=str), shape=(1,), **kwargs, ) elif isinstance(entity, FilenameData): cls.write_file_name_data(entity_handle, entity, values) elif isinstance(values, str): entity_handle.create_dataset( name_map, data=values, dtype=h5py.special_dtype(vlen=str), shape=(1,), **kwargs, ) else: out_values = deepcopy(values) if isinstance(entity, BooleanData): out_values = np.round(out_values).astype("int8") elif isinstance(entity, IntegerData): out_values = np.round(out_values).astype("int32") elif isinstance(entity, TextData) and not isinstance(values[0], bytes): out_values = [val.encode() for val in values] if getattr(entity, "ndv", None) is not None: out_values[np.isnan(out_values)] = entity.ndv entity_handle.create_dataset( name_map, data=out_values, compression="gzip", compression_opts=compression, **kwargs, )
[docs] @classmethod def clear_stats_cache( cls, file: str | h5py.File, entity: Data, ) -> None: """ Clear the StatsCache dataset. :param file: Name or handle to a geoh5 file. :param entity: Target entity. """ with fetch_h5_handle(file, mode="r+") as h5file: if not isinstance(entity, Data): return entity_type_handle = H5Writer.fetch_handle(h5file, entity.entity_type) if entity_type_handle is None: return stats_cache = entity_type_handle.get("StatsCache") if stats_cache is not None: del entity_type_handle["StatsCache"] entity.workspace.repack = True
[docs] @classmethod def write_entity( cls, file: str | h5py.File, entity, compression: int, ) -> h5py.Group: """ Add an :obj:`~geoh5py.shared.entity.Entity` and its attributes to geoh5. The function returns a pointer to the entity if already present on file. :param file: Name or handle to a geoh5 file. :param entity: Target :obj:`~geoh5py.shared.entity.Entity`. :param compression: Compression level for data. :return entity: Pointer to the written entity. Active link if "close_file" is False. """ with fetch_h5_handle(file, mode="r+") as h5file: base = list(h5file)[0] if isinstance(entity, Data): entity_type = "Data" elif isinstance(entity, ObjectBase): entity_type = "Objects" else: entity_type = "Groups" uid = entity.uid if entity_type not in h5file[base]: h5file[base].create_group(entity_type) # Check if already in the project if as_str_if_uuid(uid) in h5file[base][entity_type]: entity.on_file = True return h5file[base][entity_type][as_str_if_uuid(uid)] entity_handle = h5file[base][entity_type].create_group(as_str_if_uuid(uid)) if isinstance(entity, Concatenator): concat_group = entity_handle.create_group("Concatenated Data") entity_handle.create_group("Data") concat_group.create_group("Index") concat_group.create_group("Data") entity_handle.create_group("Groups") elif entity_type == "Groups": entity_handle.create_group("Data") entity_handle.create_group("Groups") entity_handle.create_group("Objects") elif entity_type == "Objects": entity_handle.create_group("Data") # Add the type new_type = H5Writer.write_entity_type(h5file, entity.entity_type) entity_handle["Type"] = new_type entity.entity_type.on_file = True cls.write_properties(h5file, entity, compression) entity.on_file = True if isinstance(entity, RootGroup): if "Root" in h5file[base]: del h5file[base]["Root"] h5file[base]["Root"] = entity_handle return entity_handle
[docs] @classmethod def write_entity_type( cls, file: str | h5py.File, entity_type: shared.EntityType, ) -> h5py.Group: """ Add an :obj:`~geoh5py.shared.entity_type.EntityType` to geoh5. :param file: Name or handle to a geoh5 file. :param entity_type: Entity with type to be added. :return type: Pointer to :obj:`~geoh5py.shared.entity_type.EntityType` in geoh5. """ with fetch_h5_handle(file, mode="r+") as h5file: base = list(h5file)[0] uid = entity_type.uid if isinstance(entity_type, DataType): entity_type_str = "Data types" elif isinstance(entity_type, ObjectType): entity_type_str = "Object types" elif isinstance(entity_type, GroupType): entity_type_str = "Group types" else: return None if "Types" not in h5file[base]: h5file[base].create_group("Types") # Check if already in the project if entity_type_str not in h5file[base]["Types"]: h5file[base]["Types"].create_group(entity_type_str) if as_str_if_uuid(uid) in h5file[base]["Types"][entity_type_str]: entity_type.on_file = True return h5file[base]["Types"][entity_type_str][as_str_if_uuid(uid)] new_type = h5file[base]["Types"][entity_type_str].create_group( as_str_if_uuid(uid) ) H5Writer.write_attributes(h5file, entity_type) if hasattr(entity_type, "color_map"): H5Writer.write_color_map(h5file, entity_type) if hasattr(entity_type, "value_map"): H5Writer.write_value_map(h5file, entity_type) entity_type.on_file = True return new_type
[docs] @classmethod def write_file_name_data( cls, entity_handle: h5py.Group, entity: FilenameData, values: bytes ) -> None: """ Write a dataset for the file name and file blob. :param entity_handle: Pointer to the geoh5 Group. :param entity: Target :obj:`~geoh5py.data.filename_data.FilenameData` entity. :param values: Bytes data """ if entity.file_name is None: raise AttributeError("FilenameData requires the 'file_name' to be set.") entity_handle.create_dataset( "Data", data=entity.file_name, dtype=h5py.special_dtype(vlen=str), shape=(1,), ) if entity.file_name in entity_handle: del entity_handle[entity.file_name] entity.workspace.repack = True entity_handle.create_dataset( entity.file_name, data=np.asarray(np.void(values[:])), shape=(1,), )
[docs] @classmethod def write_properties( cls, file: str | h5py.File, entity: Entity, compression: int, ) -> None: """ Add properties of an :obj:`~geoh5py.shared.entity.Entity`. :param file: Name or handle to a geoh5 file. :param entity: Target :obj:`~geoh5py.shared.entity.Entity`. :param compression: Compression level for data. """ with fetch_h5_handle(file, mode="r+") as h5file: H5Writer.update_field(h5file, entity, "attributes", compression) for attribute in KEY_MAP: if getattr(entity, attribute, None) is not None: H5Writer.update_field(h5file, entity, attribute, compression)
[docs] @classmethod def write_property_groups( cls, file: str | h5py.File, entity, ) -> None: """ Write :obj:`~geoh5py.groups.property_group.PropertyGroup` associated with an :obj:`~geoh5py.shared.entity.Entity`. :param file: Name or handle to a geoh5 file. :param entity: Target :obj:`~geoh5py.shared.entity.Entity`. """ with fetch_h5_handle(file, mode="r+") as h5file: entity_handle = H5Writer.fetch_handle(h5file, entity) if entity_handle is None: return try: del entity_handle["PropertyGroups"] entity.workspace.repack = True except KeyError: pass if hasattr(entity, "property_groups") and isinstance( entity.property_groups, list ): for p_g in entity.property_groups: cls.add_or_update_property_group(h5file, p_g)
[docs] @classmethod def add_or_update_property_group(cls, file, property_group, remove=False): """ Update a :obj:`~geoh5py.groups.property_group.PropertyGroup` associated with an :obj:`~geoh5py.shared.entity.Entity`. :param file: Name or handle to a geoh5 file. :param property_group: Target PropertyGroup """ with fetch_h5_handle(file, mode="r+") as h5file: entity_handle = H5Writer.fetch_handle(h5file, property_group.parent) if entity_handle is None: return if "PropertyGroups" not in entity_handle: entity_handle.create_group("PropertyGroups") uid = as_str_if_uuid(property_group.uid) if uid in entity_handle["PropertyGroups"]: del entity_handle["PropertyGroups"][uid] property_group.parent.workspace.repack = True if remove: return entity_handle["PropertyGroups"].create_group(uid) group_handle = entity_handle["PropertyGroups"][uid] for key, attr in property_group.attribute_map.items(): try: value = getattr(property_group, attr) except AttributeError: continue if key == "Association": value = value.name.capitalize() elif key == "Properties": if value is None: continue value = np.asarray([as_str_if_uuid(val) for val in value]) elif key == "ID": value = as_str_if_uuid(value) group_handle.attrs.create( key, value, dtype=h5py.special_dtype(vlen=str) ) property_group.on_file = True
[docs] @classmethod def write_to_parent( cls, file: str | h5py.File, entity: Entity, compression: int, recursively: bool = False, ) -> None: """ Add/create an :obj:`~geoh5py.shared.entity.Entity` and add it to its parent. :param file: Name or handle to a geoh5 file. :param entity: Entity to be added or linked to a parent in geoh5. :param compression: Compression level for data. :param recursively: Add parents recursively until reaching the :obj:`~geoh5py.groups.root_group.RootGroup`. """ with fetch_h5_handle(file, mode="r+") as h5file: if isinstance(entity, RootGroup): return uid = entity.uid entity_handle = H5Writer.write_entity(h5file, entity, compression) parent_handle = H5Writer.write_entity(h5file, entity.parent, compression) if isinstance(entity, Data): entity_type = "Data" elif isinstance(entity, ObjectBase): entity_type = "Objects" elif isinstance(entity, Group): entity_type = "Groups" else: return # Check if child h5py.Group already exists if entity_type not in parent_handle: parent_handle.create_group(entity_type) # Check if child uuid not already in h5 if as_str_if_uuid(uid) not in parent_handle[entity_type]: parent_handle[entity_type][as_str_if_uuid(uid)] = entity_handle if recursively: H5Writer.write_to_parent( h5file, entity.parent, compression=compression, recursively=True )