# Copyright (c) 2023 Mira Geoscience Ltd.
#
# This file is part of geoh5py.
#
# geoh5py is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# geoh5py is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with geoh5py. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=too-many-lines
from __future__ import annotations
import re
import uuid
from typing import TYPE_CHECKING
import numpy as np
from h5py import special_dtype
from geoh5py.data import Data, DataType
from geoh5py.groups import Group, PropertyGroup
from geoh5py.objects import ObjectBase
from geoh5py.shared.entity import Entity
from geoh5py.shared.utils import (
INV_KEY_MAP,
KEY_MAP,
as_str_if_utf8_bytes,
as_str_if_uuid,
)
if TYPE_CHECKING:
from ..groups import GroupType
PROPERTY_KWARGS = {
"trace": {"maxshape": (None,)},
"trace_depth": {"maxshape": (None,)},
"property_group_ids": {
"dtype": special_dtype(vlen=str),
"maxshape": (None,),
},
"surveys": {"maxshape": (None,)},
}
[docs]class Concatenator(Group): # pylint: disable=too-many-public-methods
"""
Class modifier for concatenation of objects and data.
"""
_concatenated_attributes: dict | None = None
_attributes_keys: list[uuid.UUID] | None = None
_concatenated_object_ids: list[bytes] | None = None
_concat_attr_str: str | None = None
_data: dict
_index: dict
_property_group_ids: np.ndarray | None = None
_property_groups: list | None = None
def __init__(self, group_type: GroupType, **kwargs):
super().__init__(group_type, **kwargs)
getattr(self, "_attribute_map").update(
{
self.concat_attr_str: "concatenated_attributes",
"Property Groups IDs": "property_group_ids",
"Concatenated object IDs": "concatenated_object_ids",
}
)
@property
def attributes_keys(self) -> list | None:
"""List of uuids present in the concatenated attributes."""
if getattr(self, "_attributes_keys", None) is None:
attributes_keys = []
if self.concatenated_attributes is not None:
attributes_keys = [
elem["ID"] for elem in self.concatenated_attributes["Attributes"]
]
self._attributes_keys = attributes_keys
return self._attributes_keys
[docs] def add_save_concatenated(self, child) -> None:
"""
Add or save a concatenated entity.
:param child: Concatenated entity
"""
self.update_concatenated_attributes(child)
if hasattr(child, "values"):
self.update_array_attribute(child, child.name)
elif hasattr(child, "surveys"): # Specific to drillholes
uid = as_str_if_uuid(child.uid).encode()
concat_object_ids = [uid]
if self._concatenated_object_ids is not None:
if uid not in self._concatenated_object_ids:
concat_object_ids = (
self._concatenated_object_ids + concat_object_ids
)
else:
concat_object_ids = self._concatenated_object_ids
self.concatenated_object_ids = concat_object_ids
self.update_array_attribute(child, "surveys")
self.update_array_attribute(child, "trace")
child.on_file = True
@property
def concat_attr_str(self) -> str:
"""String identifier for the concatenated attributes."""
if self._concat_attr_str is None:
self._concat_attr_str = "Attributes"
if (
self.workspace.ga_version is not None
and "4." in self.workspace.ga_version
):
version = re.findall(r"\d+\.\d+", self.workspace.ga_version)
if version and float(version[0]) >= 4.2:
self._concat_attr_str = "Attributes Jsons"
return self._concat_attr_str
@property
def concatenated_attributes(self) -> dict | None:
"""Dictionary of concatenated objects and data attributes."""
if self._concatenated_attributes is None:
concatenated_attributes = self.workspace.fetch_concatenated_attributes(self)
if concatenated_attributes is None:
concatenated_attributes = {"Attributes": []}
self._concatenated_attributes = concatenated_attributes
return self._concatenated_attributes
@concatenated_attributes.setter
def concatenated_attributes(self, concatenated_attributes: dict):
if not isinstance(concatenated_attributes, (dict, type(None))):
raise ValueError(
"Input 'concatenated_attributes' must be a dictionary or None"
)
self._concatenated_attributes = concatenated_attributes
@property
def concatenated_object_ids(self) -> list[bytes] | None:
"""Dictionary of concatenated objects and data concatenated_object_ids."""
if getattr(self, "_concatenated_object_ids", None) is None:
concatenated_object_ids = self.workspace.fetch_array_attribute(
self, "concatenated_object_ids"
)
if isinstance(concatenated_object_ids, np.ndarray):
concatenated_object_ids = concatenated_object_ids.tolist()
self._concatenated_object_ids = concatenated_object_ids
return self._concatenated_object_ids
@concatenated_object_ids.setter
def concatenated_object_ids(self, object_ids: list[bytes] | None):
if isinstance(object_ids, np.ndarray):
object_ids = object_ids.tolist()
if not isinstance(object_ids, (list, type(None))):
raise AttributeError(
"Input value for 'concatenated_object_ids' must be of type list."
)
self._concatenated_object_ids = object_ids
self.workspace.update_attribute(self, "concatenated_object_ids")
[docs] def copy(self, parent=None, copy_children: bool = True, **kwargs):
"""
Function to copy an entity to a different parent entity.
:param parent: Target parent to copy the entity under. Copied to current
:obj:`~geoh5py.shared.entity.Entity.parent` if None.
:param copy_children: Create copies of all children entities along with it.
:return entity: Registered Entity to the workspace.
"""
if parent is None:
parent = self.parent
new_entity = parent.workspace.copy_to_parent(
self, parent, copy_children=False, **kwargs
)
if self.concatenated_attributes is None:
return new_entity
for field in self.index:
values = self.workspace.fetch_concatenated_values(self, field)
if isinstance(values, tuple):
new_entity.data[field], new_entity.index[field] = values
new_entity.save_attribute(field)
# Copy over the data type
for elem in self.concatenated_attributes["Attributes"]:
if "Name" in elem and elem["Name"] == field and "Type ID" in elem:
attr_type = self.workspace.fetch_type(
uuid.UUID(elem["Type ID"]), "Data"
)
data_type = DataType.find_or_create(
new_entity.workspace, **attr_type
)
new_entity.workspace.save_entity_type(data_type)
new_entity.workspace.fetch_children(new_entity)
return new_entity
@property
def data(self) -> dict:
"""
Concatenated data values stored as a dictionary.
"""
if getattr(self, "_data", None) is None:
self._data, self._index = self.fetch_concatenated_data_index()
return self._data
@data.setter
def data(self, data: dict):
if not isinstance(data, dict):
raise ValueError("Input 'data' must be a dictionary")
self._data = data
[docs] def delete_index_data(self, label: str, index: int) -> None:
start, size = self.index[label][index][0], self.index[label][index][1]
self.data[label] = np.delete(
self.data[label], np.arange(start, start + size), axis=0
)
# Shift indices
self.index[label]["Start index"][
self.index[label]["Start index"] > start
] -= size
self.index[label] = np.delete(self.index[label], index, axis=0)
[docs] def fetch_concatenated_data_index(self):
"""Extract concatenation arrays."""
data, index = {}, {}
data_list = self.workspace.fetch_concatenated_list(self, "Index")
if data_list is not None:
for field in data_list:
name = field.replace("\u2044", "/")
values = self.workspace.fetch_concatenated_values(self, field)
if isinstance(values, tuple):
data[name], index[name] = values
return data, index
[docs] def fetch_concatenated_objects(self) -> dict:
"""
Load all concatenated children.
"""
attr_dict = {}
if self.concatenated_object_ids is None:
return {}
for key in self.concatenated_object_ids:
attrs = {
attr: val
for attr, val in self.get_concatenated_attributes(key).items()
if "Property" not in attr
}
attrs["parent"] = self
attr_dict[key] = self.workspace.create_from_concatenation(attrs)
return attr_dict
[docs] def fetch_index(self, entity: Concatenated, field: str) -> int | None:
"""
Fetch the array index for specific concatenated object and data field.
:param entity: Parent entity with data
:param field: Name of the target data.
"""
field = KEY_MAP.get(field, field)
if field not in self.index:
return None
uid = as_str_if_uuid(entity.uid).encode()
if isinstance(entity, ConcatenatedData):
ind = np.where(self.index[field]["Data ID"] == uid)[0]
if len(ind) == 1:
return ind[0]
else:
ind = np.where(self.index[field]["Object ID"] == uid)[0]
if len(ind) == 1:
return ind[0]
return None
[docs] def fetch_start_index(self, entity: Concatenated, label: str) -> int:
"""
Fetch starting index for a given entity and label.
Existing date is removed such that new entries can be appended.
:param entity: Concatenated entity to be added.
:param label: Name of the attribute requiring an update.
"""
index = self.fetch_index(entity, label)
if index is not None: # First remove the old data
self.delete_index_data(label, index)
start = self.data[label].shape[0]
elif label in self.index:
start = np.sum(self.index[label]["Size"])
else:
start = 0
return start
[docs] def fetch_values(self, entity: Concatenated, field: str) -> np.ndarray | None:
"""
Get an array of values from concatenated data.
:param entity: Parent entity with data
:param field: Name of the target data.
"""
field = KEY_MAP.get(field, field)
index = self.fetch_index(entity, field)
if index is None:
return None
start, size = self.index[field][index][0], self.index[field][index][1]
return self.data[field][start : start + size]
[docs] def get_concatenated_attributes(self, uid: bytes | str | uuid.UUID) -> dict:
"""
Fast reference index to concatenated attribute keys.
"""
if self.concatenated_attributes is None:
return {}
uid = as_str_if_utf8_bytes(uid)
if isinstance(uid, str):
uid = uuid.UUID(uid)
uid = as_str_if_utf8_bytes(as_str_if_uuid(uid))
if self.attributes_keys is not None and uid in self.attributes_keys:
index = self.attributes_keys.index(uid)
else:
if self.attributes_keys is not None:
self.attributes_keys.append(uid)
if self.concatenated_attributes is not None:
self.concatenated_attributes["Attributes"].append({})
index = -1
return self.concatenated_attributes["Attributes"][index]
@property
def index(self) -> dict:
"""
Concatenated index stored as a dictionary.
"""
if getattr(self, "_index", None) is None:
self._data, self._index = self.fetch_concatenated_data_index()
return self._index
@index.setter
def index(self, index: dict):
if not isinstance(index, dict):
raise ValueError("Input 'index' must be a dictionary")
self._index = index
@property
def property_group_ids(self) -> list | None:
"""Dictionary of concatenated objects and data property_group_ids."""
if self._property_group_ids is None:
property_groups_ids = self.workspace.fetch_concatenated_values(
self, "property_group_ids"
)
if property_groups_ids is not None:
self._property_group_ids = property_groups_ids[0].tolist()
return self._property_group_ids
[docs] def remove_entity(self, entity: Concatenated):
"""Remove a concatenated entity."""
if isinstance(entity, ConcatenatedData):
# Remove the rows of data and index
self.update_array_attribute(entity, entity.name, remove=True)
# Remove from the concatenated Attributes
parent_attr = self.get_concatenated_attributes(entity.parent.uid)
name = entity.name
del parent_attr[f"Property:{name}"]
elif isinstance(entity, ConcatenatedObject):
if entity.property_groups is not None:
self.update_array_attribute(entity, "property_groups", remove=True)
object_ids = self.concatenated_object_ids
if object_ids is not None:
object_ids.remove(as_str_if_uuid(entity.uid).encode())
self.concatenated_object_ids = object_ids
if self.concatenated_attributes is not None:
attr_handle = self.get_concatenated_attributes(entity.uid)
self.concatenated_attributes["Attributes"].remove(attr_handle)
self.workspace.repack = True
[docs] def save_attribute(self, field: str):
"""
Save a concatenated attribute.
:param field: Name of the attribute
"""
field = INV_KEY_MAP.get(field, field)
alias = KEY_MAP.get(field, field)
self.workspace.update_attribute(self, "index", alias)
if field in PROPERTY_KWARGS: # For group property
if field == "property_groups":
field = "property_group_ids"
self.workspace.update_attribute(
self,
field,
values=self.data.get(alias),
**PROPERTY_KWARGS.get(field, {}),
)
else: # For data values
self.workspace.update_attribute(self, "data", field)
[docs] def update_attributes(self, entity: Concatenated, label: str) -> None:
"""
Update a concatenated entity.
"""
if label == "attributes":
self.update_concatenated_attributes(entity)
elif label == "property_groups":
if getattr(entity, "property_groups", None) is not None:
for prop_group in getattr(entity, "property_groups"):
self.add_save_concatenated(prop_group)
if (
self.property_group_ids is not None
and as_str_if_uuid(prop_group.uid).encode()
not in self.property_group_ids
):
self.property_group_ids.append(
as_str_if_uuid(prop_group.uid).encode()
)
self.update_array_attribute(entity, label)
else:
if isinstance(entity, Data):
label = entity.name
self.update_array_attribute(entity, label)
[docs] def update_concatenated_attributes(self, entity: Concatenated) -> None:
"""
Update the concatenated attributes.
:param entity: Concatenated entity with attributes.
"""
target_attributes = self.get_concatenated_attributes(entity.uid)
for key, attr in entity.attribute_map.items():
val = getattr(entity, attr, None)
if val is None or attr == "property_groups":
continue
if isinstance(val, np.ndarray):
val = "{" + ", ".join(str(e) for e in val.tolist()) + "}"
elif isinstance(val, uuid.UUID):
val = as_str_if_uuid(val)
elif isinstance(val, list):
val = [as_str_if_uuid(uid) for uid in val]
elif attr == "association":
val = val.name.lower().capitalize()
target_attributes[key] = val
if isinstance(entity, Data):
target_attributes["Type ID"] = as_str_if_uuid(entity.entity_type.uid)
elif hasattr(entity, "properties"):
pass
else:
target_attributes["Object Type ID"] = as_str_if_uuid(entity.entity_type.uid)
self.workspace.repack = True
[docs] def update_array_attribute(
self, entity: Concatenated, field: str, remove=False
) -> None:
"""
Update values stored as data.
Row data and indices are first remove then appended.
:param entity: Concatenated entity with array values.
:param field: Name of the valued field.
"""
if hasattr(entity, f"_{field}"):
values = getattr(entity, f"_{field}", None)
obj_id = as_str_if_uuid(entity.uid).encode()
data_id = as_str_if_uuid(uuid.UUID(int=0)).encode()
elif getattr(entity, "name") == field:
values = getattr(entity, "values", None)
obj_id = as_str_if_uuid(entity.parent.uid).encode()
data_id = as_str_if_uuid(entity.uid).encode()
else:
raise UserWarning(
f"Input entity {entity} does not have a property or values "
f"for the requested field {field}"
)
if field == "property_groups" and isinstance(values, list):
field = "property_group_ids"
values = [as_str_if_uuid(val.uid).encode() for val in values]
alias = KEY_MAP.get(field, field)
start = self.fetch_start_index(entity, alias)
if values is not None and not remove:
indices = np.hstack(
[
np.core.records.fromarrays(
(start, len(values), obj_id, data_id),
dtype=[
("Start index", "<u4"),
("Size", "<u4"),
("Object ID", special_dtype(vlen=str)),
("Data ID", special_dtype(vlen=str)),
],
)
]
)
if alias in self.index:
indices = np.hstack([self.index[alias], indices]).astype(
self.index[alias].dtype
)
self.index[alias] = indices
if alias in self.data:
values = np.hstack([self.data[alias], values])
self.data[alias] = values
self.save_attribute(field)
[docs]class Concatenated(Entity):
"""
Base class modifier for concatenated objects and data.
"""
_parent: Concatenated | Concatenator
_concat_attr_str: str = "Attributes"
def __init__(self, entity_type, **kwargs):
attribute_map = getattr(self, "_attribute_map", {})
attr = {"name": "Entity", "parent": None}
for key, value in kwargs.items():
attr[attribute_map.get(key, key)] = value
super().__init__(entity_type, **attr)
@property
def concat_attr_str(self) -> str:
"""String identifier for the concatenated attributes."""
return self._concat_attr_str
@property
def concatenator(self) -> Concatenator:
"""
Parental Concatenator entity.
"""
if isinstance(self._parent, Concatenated):
return self._parent.concatenator
return self._parent
[docs]class ConcatenatedData(Concatenated):
_parent: ConcatenatedObject
def __init__(self, entity_type, **kwargs):
if kwargs.get("parent") is None or not isinstance(
kwargs.get("parent"), ConcatenatedObject
):
raise UserWarning(
"Creating a concatenated data must have a parent "
"of type Concatenated."
)
super().__init__(entity_type, **kwargs)
@property
def property_group(self):
"""Get the property group containing the data interval."""
if self.parent.property_groups is None:
return None
for prop_group in self.parent.property_groups:
if self.uid in prop_group.properties:
return prop_group
return None
@property
def parent(self) -> ConcatenatedObject:
return self._parent
@parent.setter
def parent(self, parent):
if not isinstance(parent, ConcatenatedObject):
raise AttributeError(
"The 'parent' of a concatenated Data must be of type 'Concatenated'."
)
self._parent = parent
self._parent.add_children([self])
parental_attr = self.concatenator.get_concatenated_attributes(self.parent.uid)
if f"Property:{self.name}" not in parental_attr:
parental_attr[f"Property:{self.name}"] = as_str_if_uuid(self.uid)
[docs]class ConcatenatedPropertyGroup(PropertyGroup):
_parent: ConcatenatedObject
def __init__(self, parent: ConcatenatedObject, **kwargs):
if not isinstance(parent, ConcatenatedObject):
raise UserWarning(
"Creating a concatenated data must have a parent "
"of type Concatenated."
)
super().__init__(parent, **kwargs)
@property
def from_(self):
"""Return the data entities definind the 'from' depth intervals."""
if self.properties is None or len(self.properties) < 1:
return None
data = self.parent.get_data( # pylint: disable=no-value-for-parameter
self.properties[0]
)[0]
if isinstance(data, Data) and "from" in data.name.lower():
return data
return None
@property
def to_(self):
"""Return the data entities definind the 'to' depth intervals."""
if self.properties is None or len(self.properties) < 2:
return None
data = self.parent.get_data( # pylint: disable=no-value-for-parameter
self.properties[1]
)[0]
if isinstance(data, Data) and "to" in data.name.lower():
return data
return None
@property
def parent(self):
return self._parent
@parent.setter
def parent(self, parent):
if not isinstance(parent, ConcatenatedObject):
raise AttributeError(
"The 'parent' of a concatenated Data must be of type 'Concatenated'."
)
self._parent = parent
[docs]class ConcatenatedObject(Concatenated, ObjectBase):
_parent: Concatenator
_property_groups: list | None = None
def __init__(self, entity_type, **kwargs):
if kwargs.get("parent") is None or not isinstance(
kwargs.get("parent"), Concatenator
):
raise UserWarning(
"Creating a concatenated object must have a parent "
"of type Concatenator."
)
super().__init__(entity_type, **kwargs)
[docs] def find_or_create_property_group(self, **kwargs) -> ConcatenatedPropertyGroup:
"""
Find or create :obj:`~geoh5py.groups.property_group.PropertyGroup`
from given name and properties.
:param kwargs: Any arguments taken by the
:obj:`~geoh5py.groups.property_group.PropertyGroup` class.
:return: A new or existing :obj:`~geoh5py.groups.property_group.PropertyGroup`
"""
property_groups = []
if self._property_groups is not None:
property_groups = self._property_groups
if "name" in kwargs and any(
pg.name == kwargs["name"] for pg in property_groups
):
prop_group = [pg for pg in property_groups if pg.name == kwargs["name"]][0]
else:
if (
"property_group_type" not in kwargs
and "Property Group Type" not in kwargs
):
kwargs["property_group_type"] = "Interval table"
prop_group = ConcatenatedPropertyGroup(self, **kwargs)
property_groups += [prop_group]
self._property_groups = property_groups
return prop_group
[docs] def get_data(self, name: str | uuid.UUID) -> list[Data]:
"""
Generic function to get data values from object.
"""
entity_list = []
attr = self.concatenator.get_concatenated_attributes(
getattr(self, "uid")
).copy()
for key, value in attr.items():
if "Property:" in key:
child_data = self.workspace.get_entity(uuid.UUID(value))[0]
if child_data is None:
attributes: dict = self.concatenator.get_concatenated_attributes(
value
).copy()
attributes["parent"] = self
self.workspace.create_from_concatenation(attributes)
else:
self.add_children([child_data])
for child in getattr(self, "children"):
if (
isinstance(name, str) and hasattr(child, "name") and child.name == name
) or (
isinstance(name, uuid.UUID)
and hasattr(child, "uid")
and child.uid == name
):
entity_list.append(child)
return entity_list
[docs] def get_data_list(self, attribute="name"):
"""
Get list of data names.
"""
data_list = [
attr.replace("Property:", "").replace("\u2044", "/")
for attr in self.concatenator.get_concatenated_attributes(self.uid)
if "Property:" in attr
]
return data_list
@property
def parent(self) -> Concatenator:
return self._parent
@parent.setter
def parent(self, parent):
if not isinstance(parent, Concatenator):
raise AttributeError(
"The 'parent' of a concatenated Object must be of type "
"'Concatenator'."
)
self._parent = parent
self._parent.add_children([self])
@property
def property_groups(self) -> list | None:
if self._property_groups is None:
prop_groups = self.concatenator.fetch_values(self, "property_group_ids")
if prop_groups is None or isinstance(self, Data):
return None
for key in prop_groups:
getattr(self, "find_or_create_property_group")(
**self.concatenator.get_concatenated_attributes(key)
)
return self._property_groups
[docs]class ConcatenatedDrillhole(ConcatenatedObject):
@property
def from_(self) -> list[Data]:
"""
Depth data corresponding to the tops of the interval values.
"""
obj_list = []
for prop_group in (
self.property_groups if self.property_groups is not None else []
):
data = [self.get_data(child)[0] for child in prop_group.properties]
if len(data) > 0 and "from" in data[0].name.lower():
obj_list.append(data[0])
return obj_list
@property
def to_(self) -> list[Data]:
"""
Depth data corresponding to the bottoms of the interval values.
"""
obj_list = []
for prop_group in (
self.property_groups if self.property_groups is not None else []
):
data = [self.get_data(child)[0] for child in prop_group.properties]
if len(data) > 1 and "to" in data[1].name.lower():
obj_list.append(data[1])
return obj_list
[docs] def validate_data(
self, attributes: dict, property_group=None, collocation_distance=None
) -> tuple:
"""
Validate input drillhole data attributes.
:param attributes: Dictionary of data attributes.
:param property_group: Input property group to validate against.
"""
if collocation_distance is None:
collocation_distance = attributes.get(
"collocation_distance", getattr(self, "default_collocation_distance")
)
if collocation_distance < 0:
raise UserWarning("Input depth 'collocation_distance' must be >0.")
if (
"depth" not in attributes
and "from-to" not in attributes
and "association" not in attributes
):
if property_group is None:
raise AttributeError(
"Input data dictionary must contain {key:values} "
+ "{'from-to':numpy.ndarray} "
+ "or {'association': 'OBJECT'}."
)
attributes["from-to"] = None
if "depth" in attributes.keys():
attributes["from-to"] = np.c_[
attributes["depth"], attributes["depth"] + collocation_distance
]
del attributes["depth"]
if "from-to" in attributes.keys():
attributes["association"] = "DEPTH"
property_group = self.validate_interval_data(
attributes.get("name"),
attributes.get("from-to"),
attributes.get("values"),
property_group=property_group,
collocation_distance=collocation_distance,
)
del attributes["from-to"]
return attributes, property_group
[docs] def validate_interval_data(
self,
name: str | None,
from_to: list | np.ndarray | None,
values: np.ndarray,
property_group: str | ConcatenatedPropertyGroup | None = None,
collocation_distance=1e-4,
) -> ConcatenatedPropertyGroup:
"""
Compare new and current depth values and re-use the property group if possible.
Otherwise a new property group is added.
:param from_to: Array of from-to values.
:param values: Data values to be added on the from-to intervals.
:param property_group: Property group name
:collocation_distance: Threshold on the comparison between existing depth values.
"""
if from_to is not None:
if isinstance(from_to, list):
from_to = np.vstack(from_to)
if from_to.shape[0] == 2:
from_to = from_to.T
assert from_to.shape[0] >= len(values), (
f"Mismatch between input 'from_to' shape{from_to.shape} "
+ f"and 'values' shape{values.shape}"
)
assert from_to.shape[1] == 2, "The `from-to` values must have shape(*, 2)"
if (
from_to is not None
and property_group is None
and self.property_groups is not None
):
for p_g in self.property_groups:
if p_g.from_.values.shape[0] == from_to.shape[0] and np.allclose(
np.c_[p_g.from_.values, p_g.to_.values],
from_to,
atol=collocation_distance,
):
return p_g
ind = 0
label = ""
if len(self.from_) > 0:
ind = len(self.from_)
label = f"({ind})"
if property_group is None:
property_group = f"Interval_{ind}"
if isinstance(property_group, str):
out_group: ConcatenatedPropertyGroup = getattr(
self, "find_or_create_property_group"
)(name=property_group, association="DEPTH")
else:
out_group = property_group
if out_group.from_ is not None:
if out_group.from_.values.shape[0] != values.shape[0]:
raise ValueError(
f"Input values for '{name}' with shape({values.shape[0]}) "
f"do not match the from-to intervals of the group '{out_group}' "
f"with shape({out_group.from_.values.shape[0]}). Check values or "
f"assign to a new property group."
)
return out_group
from_to = getattr(self, "add_data")(
{
f"FROM{label}": {
"association": "DEPTH",
"values": from_to[:, 0],
"entity_type": {"primitive_type": "FLOAT"},
"parent": self,
"allow_move": False,
"allow_delete": False,
},
f"TO{label}": {
"association": "DEPTH",
"values": from_to[:, 1],
"entity_type": {"primitive_type": "FLOAT"},
"parent": self,
"allow_move": False,
"allow_delete": False,
},
},
out_group,
)
return out_group
[docs] def sort_depths(self):
"""Bypass sort_depths from previous version."""