Source code for geoh5py.shared.concatenation.drillholes_group_table

#  Copyright (c) 2024 Mira Geoscience Ltd.
#
#  This file is part of geoh5py.
#
#  geoh5py is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  geoh5py is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with geoh5py.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

from abc import ABC
from typing import TYPE_CHECKING, Any
from uuid import UUID

import numpy as np

from ...data.data_type import DataType, ReferencedValueMapType
from ..utils import decode_byte_array, str2uuid, to_tuple
from .property_group import ConcatenatedPropertyGroup


if TYPE_CHECKING:  # pragma: no cover
    from .concatenator import Concatenator
    from .data import ConcatenatedData
    from .drillhole import ConcatenatedDrillhole


class DrillholesGroupTable(ABC):
    """
    A class to store the information of a PropertyGroup.

    The table gathers, for every drillhole of the concatenator holding a
    property group called ``name``, the association data (depth or from-to)
    and the properties of that group.

    :param parent: The parent DrillholesConcatenator.
    :param name: The name of the PropertyGroup to extract.
    """

    def __init__(self, parent: Concatenator, name: str):
        self._association: tuple | None = None
        self._index_by_drillhole: dict | None = None
        self._properties: dict[str, DataType] | None = None
        self._property_groups: dict[UUID, ConcatenatedPropertyGroup] = (
            self._get_property_groups(parent, name)
        )
        self._parent: Concatenator = parent
        self._name: str = name

    def _create_empty_structured_array(
        self,
        names: tuple[str, ...],
        drillhole: bool = True,
        mapped: bool = False,
    ) -> np.ndarray:
        """
        Create an empty structured array that can contain the data.

        Every column is pre-filled with its no-data value.

        :param names: The names to extract.
        :param drillhole: If True, the drillholes are added to the table.
        :param mapped: Map the referenced data back to its descriptions
            instead of indexes.

        :return: an empty structured array.
        """
        dtypes: list[tuple[str, Any]] = [("Drillhole", "O")] if drillhole else []
        no_data_values: list[Any] = [None] if drillhole else []

        for name in self.association:
            if name in names:
                dtypes.append((name, np.float32))
                no_data_values.append(np.nan)

        for name, data_type in self.properties_type.items():
            if name not in names:
                continue

            # Anything that is not a plain numeric/boolean column, or a
            # referenced column that will be replaced by its descriptions,
            # is stored as generic objects.
            if (data_type.dtype not in [np.float32, np.int32, np.uint32, bool]) or (
                isinstance(data_type, ReferencedValueMapType) and mapped
            ):
                dtype = "O"
            else:
                dtype = data_type.dtype

            dtypes.append((name, dtype))
            no_data_values.append(self.nan_value_from_name(name))

        empty_array = np.recarray(
            (self.parent.data[self.association[0]].shape[0],), dtype=dtypes
        )

        for name, ndv in zip(empty_array.dtype.names, no_data_values, strict=False):
            empty_array[name].fill(ndv)

        return empty_array

    def _depth_table_by_key(
        self,
        names: tuple,
        drillholes: bool = True,
        mapped: bool = False,
    ) -> np.ndarray:
        """
        Get a table with all the data associated with depth for every drillhole object.

        The Drillhole name is added at the beginning of the table for every row.
        The table is based on the association and contains nan values if no data is found.

        :param names: The names to extract.
        :param drillholes: If True, the drillholes are added to the table.
        :param mapped: Map the referenced data back.

        :return: a structured array with all the data.
        """
        if self.index_by_drillhole is None:
            raise ValueError("No drillhole found in the concatenator.")

        output_array = self._create_empty_structured_array(
            names=names, drillhole=drillholes, mapped=mapped
        )

        current_index = 0
        for object_, data_dict in self.index_by_drillhole.items():
            # The first association drives the number of rows of a drillhole.
            count = data_dict[self.association[0]][1]

            for name, info in data_dict.items():
                if name in names:
                    temp_array = self.parent.data[name][info[0] : info[0] + info[1]]
                    output_array[name][
                        current_index : current_index + temp_array.shape[0]
                    ] = temp_array

            if drillholes:
                output_array["Drillhole"][current_index : current_index + count] = (
                    object_
                )

            current_index += count

        if mapped:
            return self._replace_referenced_data(output_array)

        return output_array

    @staticmethod
    def _get_property_groups(parent, name) -> dict[UUID, ConcatenatedPropertyGroup]:
        """
        Get all the property groups with a given name in the concatenator.

        :param parent: the concatenator object.
        :param name: the name to get the property groups for.

        :return: a dictionary of property groups with drillhole object as Key.

        :raises TypeError: if the parent has no 'property_group_ids' attribute.
        :raises ValueError: if no property group with that name is found.
        """
        if not hasattr(parent, "property_group_ids"):
            raise TypeError("The parent must be a Concatenator instance.")

        property_groups: dict[UUID, ConcatenatedPropertyGroup] = {}
        for property_group_uid in parent.property_group_ids:
            property_group = parent.workspace.get_entity(str2uuid(property_group_uid))[
                0
            ]
            if (
                isinstance(property_group, ConcatenatedPropertyGroup)
                and property_group.name == name
            ):
                property_groups[property_group.parent.uid] = property_group

        if not property_groups:
            raise ValueError(
                f"No property group with name '{name}' found in the concatenator."
            )

        return property_groups

    def _get_properties_names_types(self) -> None:
        """
        Collect the name and data type of every property of the property groups.

        The association data are skipped; the result is stored sorted by name.
        Nothing is stored if no property is found.
        """
        if self._properties:
            return

        properties: dict[str, DataType] = {}
        for property_group in self.property_groups.values():
            if property_group.properties is None:
                continue
            for property_ in property_group.properties:
                property_temp = property_group.parent.get_data(property_)[0]
                if (
                    property_temp.name not in self.association
                    and property_temp.name not in properties
                ):
                    properties[property_temp.name] = property_temp.entity_type

        # sort the properties names and dtypes
        if properties:
            self._properties = dict(sorted(properties.items()))

    def _replace_referenced_data(self, output_array: np.ndarray) -> np.ndarray:
        """
        Replace the referenced data in the output array.

        Columns without a known property type (the 'Drillhole' column and the
        association columns) are left untouched.

        :param output_array: The array to replace the data in.

        :return: The array with the replaced data.
        """
        properties_type = self.properties_type
        for name in output_array.dtype.names:
            # Only the properties have a type; a direct lookup would raise a
            # KeyError on the 'Drillhole' and association columns.
            data_type = properties_type.get(name)
            if data_type is None:
                continue

            if isinstance(data_type, ReferencedValueMapType):
                output_array[name] = decode_byte_array(
                    data_type.value_map.map_values(output_array[name]),
                    str,
                )

        return output_array

    def add_values_to_property_group(
        self, name: str, values: np.ndarray, data_type: DataType | None = None
    ):
        """
        Push the values to each drillhole of the property group based on association.

        :param name: The name of the data to push.
        :param values: The values to push.
        :param data_type: The data type associated to description;
            useful especially for referenced data.

        :raises KeyError: if the name is not a string or is already in data.
        :raises ValueError: if the values do not have the shape of the association.
        """
        if not isinstance(name, str) or name in self.parent.data:
            raise KeyError("The name must be a string not present in data.")

        # ensure the length of the values is the same as the length of the template
        if values.shape != self.parent.data[self.association[0]].shape:
            raise ValueError(
                "The length of the values must be the same as the association "
                f"({self.parent.data[self.association[0]].shape})."
            )

        if not isinstance(data_type, DataType):
            primitive_type = DataType.primitive_type_from_values(values)
            data_type = DataType.find_or_create_type(
                self.parent.workspace, primitive_type, name=name
            )

        for drillhole_uid, indices in self.index_by_drillhole.items():
            # get the drillhole
            drillhole: ConcatenatedDrillhole = self.parent.workspace.get_entity(  # type: ignore
                str2uuid(drillhole_uid)
            )[0]

            # slice of the values belonging to this drillhole
            start, count = indices[self.association[0]][:2]

            # add data to the drillhole
            drillhole.add_data(
                {
                    name: {
                        "values": values[start : start + count],
                        "entity_type": data_type,
                    },
                },
                property_group=self.property_groups[drillhole.uid],
            )

        self._update_drillholes_group_table(name, data_type)

    @property
    def association(self) -> tuple:
        """
        The depth association of the PropertyGroup.

        :raises TypeError: if the property group type is neither
            'Interval table' nor 'Depth table'.
        """
        if self._association is None:
            first_group = list(self.property_groups.values())[0]
            if self.property_group_type == "Interval table":
                self._association = (
                    first_group.from_.name,
                    first_group.to_.name,
                )
            elif self.property_group_type == "Depth table":
                self._association = (first_group.depth_.name,)
            else:
                raise TypeError(
                    f"The property group type '{self.property_group_type}' is not supported."
                )

        return self._association

    @property
    def depth_table(
        self,
    ) -> np.ndarray:
        """
        Get a table with all the data associated with depth for every drillhole object.

        The Drillhole name is added at the beginning of the table for every row.
        The table is based on the association and contains nan values if no data is found.

        :return: a structured array with all the data.
        """
        return self._depth_table_by_key(self.association + self.properties, True)

    def depth_table_by_name(
        self,
        names: tuple[str, ...] | str,
        spatial_index: bool = False,
        mapped: bool = False,
    ) -> np.ndarray:
        """
        Get a table with specific data associated with depth for every drillhole object.

        :param names: The names to extract.
        :param spatial_index: If True, the spatial index is added to the table.
        :param mapped: Map the referenced data back.

        :return: a table containing the Drillholes, the association and the data.

        :raises KeyError: if one of the names is not a property of the group.
        """
        names = to_tuple(names)

        # ensure names are in properties
        if not all(name in self.properties for name in names):
            raise KeyError("The names are not in the list of properties.")

        if spatial_index:
            return self._depth_table_by_key(self.association + names, True, mapped)

        return self._depth_table_by_key(names, False, mapped)

    @property
    def index_by_drillhole(
        self,
    ) -> dict[bytes, dict[str, list[int]]]:
        """
        Get for every object index and count of all the data in 'association' and 'properties'

        :return: A dictionary with the object uuid and the index of all the data.

        :raises AssertionError: if no drillhole is found in the concatenator.
        """
        if self._index_by_drillhole is None:
            index_by_drillhole: dict[bytes, dict[str, list[int]]] = {}

            names = self.association + self.properties

            # Drillholes are visited in the order of the first association data.
            for drillhole in np.sort(self._parent.index[names[0]], order="Start index")[
                "Object ID"
            ]:
                index_by_drillhole[drillhole] = {}
                for name in names:
                    entries = self._parent.index[name]
                    matches = entries[entries["Object ID"] == drillhole]
                    if matches.shape[0] > 0:
                        # keep the first two fields (start index and count)
                        index_by_drillhole[drillhole][name] = list(matches[0])[:2]
                    else:
                        index_by_drillhole[drillhole][name] = [0, 0]

            if index_by_drillhole:
                self._index_by_drillhole = index_by_drillhole
            else:
                raise AssertionError("No drillhole found in the concatenator.")

        return self._index_by_drillhole

    @property
    def name(self) -> str:
        """
        The name of the PropertyGroup.
        """
        return self._name

    def nan_value_from_name(self, name: str) -> Any:
        """
        Get the nan value of a data from the name.

        :param name: The name of the data to get.

        :return: The nan value.

        :raises KeyError: if the name is neither a property nor an association.
        """
        if name not in self.properties + self.association:
            raise KeyError(f"The name '{name}' is not in the list of properties.")

        # The last two fields of the first index entry are used as the object
        # and data identifiers of a data with that name.
        first_entry = self.parent.index[name][0]
        data: ConcatenatedData = self.parent.workspace.get_entity(  # type: ignore
            str2uuid(first_entry[-2])
        )[0].get_data(str2uuid(first_entry[-1]))[0]

        return data.nan_value

    @property
    def parent(self) -> Concatenator:
        """
        The parent Concatenator object.
        """
        return self._parent

    @property
    def properties(self) -> tuple:
        """
        The names of the associated data.
        """
        if not self._properties:
            self._get_properties_names_types()

        if self._properties is None:
            return ()

        return tuple(self._properties.keys())

    @property
    def properties_type(self) -> dict:
        """
        A mapper of the type in function of the properties name
        """
        if not self._properties:
            self._get_properties_names_types()

        if self._properties is None:
            return {}

        return self._properties

    @property
    def property_group_type(self) -> str:
        """
        The type of the PropertyGroup.
        """
        return list(self.property_groups.values())[0].property_group_type

    @property
    def property_groups(self) -> dict[UUID, ConcatenatedPropertyGroup]:
        """
        Get all the property groups in the concatenator.

        :return: A dictionary of the property groups, keyed by the uid of
            their parent drillhole.
        """
        return self._property_groups

    def _update_drillholes_group_table(self, name: str, data_type: DataType):
        """
        Update the drillholes group table after a new data was added.

        :param name: The name of the data that was added.
        :param data_type: The data type of the data that was added.
        """
        self.parent.update_data_index()
        self.parent.workspace.update_attribute(self.parent, "concatenated_attributes")
        self._property_groups = self._get_property_groups(self.parent, self.name)

        if self.properties and self._properties is not None:
            self._properties[name] = data_type
        else:
            self._properties = {name: data_type}

        if self._index_by_drillhole is not None:
            for value in self._index_by_drillhole.values():
                # Copy so the new entry does not share its list with the
                # association entry.
                value[name] = list(value[self.association[0]])