# Copyright (c) 2024 Mira Geoscience Ltd.
#
# This file is part of geoh5py.
#
# geoh5py is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# geoh5py is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with geoh5py. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=R0902, R0904
from __future__ import annotations
import re
import uuid
import warnings
from numbers import Real
import numpy as np
from ..data import Data, FloatData, NumericData
from ..shared.utils import (
box_intersect,
dip_azimuth_to_vector,
mask_by_extent,
merge_arrays,
)
from .points import Points
INFINITE_RADIUS = 99999.9
MINIMUM_DEPTH_INTERVAL = 1.0
MAXIMUM_DEPTH_INTERVAL = 50.0
[docs]
class Drillhole(Points):
"""
Drillhole object class defined by a collar and survey.
:param collar: Coordinates of the drillhole.
:param cost: Cost estimate of the drillhole.
:param end_of_hole: End of drillhole in meters.
:param planning: Status of the hole, defaults to 'Default'.
:param surveys: Survey information provided as 'Depth', 'Azimuth', 'Dip'.
:param vertices: Coordinates of the vertices.
:param default_collocation_distance: Minimum collocation distance for matching depth on merge.
"""
_TYPE_UID = uuid.UUID(
fields=(0x7CAEBF0E, 0xD16E, 0x11E3, 0xBC, 0x69, 0xE4632694AA37)
)
__SURVEY_DTYPE = np.dtype([("Depth", "<f4"), ("Azimuth", "<f4"), ("Dip", "<f4")])
__COLLAR_DTYPE = np.dtype([("x", float), ("y", float), ("z", float)])
_attribute_map = Points._attribute_map.copy()
_attribute_map.update(
{
"Cost": "cost",
"Collar": "collar",
"Planning": "planning",
"End of hole": "end_of_hole",
}
)
def __init__( # pylint: disable=too-many-arguments
self,
*,
collar: np.ndarray | list | None = None,
cost: float = 0.0,
end_of_hole: float | None = None,
planning: str = "Default",
surveys: np.ndarray | list | tuple | None = None,
vertices: np.ndarray | None = None,
default_collocation_distance: float = 1e-2,
**kwargs,
):
self._cells: np.ndarray | None = None
self._depths: FloatData | None = None
self._trace: np.ndarray | None = None
self._trace_depth: np.ndarray | None = None
self._locations = None
self._surveys: np.ndarray | None = None
self._intervals: dict | None = None
super().__init__(
vertices=(0.0, 0.0, 0.0) if vertices is None else vertices, **kwargs
)
if vertices is None:
self._vertices = None
self.collar = collar
self.cost = cost
self.default_collocation_distance = default_collocation_distance
self.end_of_hole = end_of_hole
self.planning = planning
if surveys is not None:
self.surveys = surveys
@property
def cells(self) -> np.ndarray | None:
r"""
:obj:`numpy.ndarray` of :obj:`int`, shape (\*, 2):
Array of indices defining segments connecting vertices.
"""
if getattr(self, "_cells", None) is None:
if self.on_file:
self._cells = self.workspace.fetch_array_attribute(self)
return self._cells
@cells.setter
def cells(self, indices):
if indices.dtype != "uint32":
raise TypeError("Indices array must be of type 'uint32'")
self._cells = indices
self.workspace.update_attribute(self, "cells")
@property
def collar(self) -> np.ndarray:
"""
Coordinates of the collar, shape(1, 3)
"""
return self._collar
@collar.setter
def collar(self, value: list | np.ndarray | None):
if value is None:
warnings.warn(
"No 'collar' provided. Using (0, 0, 0) as default point at the origin.",
UserWarning,
)
value = (0.0, 0.0, 0.0)
if isinstance(value, np.ndarray):
value = value.tolist()
if isinstance(value, str):
value = [float(n) for n in re.findall(r"-?\d+\.\d+", value)]
if len(value) != 3:
raise ValueError("Origin must be a list or numpy array of len (3,).")
value = np.asarray(tuple(value), dtype=self.__COLLAR_DTYPE)
self._collar = value
self._locations = None
self._intervals = None
if self.on_file:
self.workspace.update_attribute(self, "attributes")
if self.trace is not None:
self._trace = None
self._trace_depth = None
self.workspace.update_attribute(self, "trace")
self.workspace.update_attribute(self, "trace_depth")
@property
def cost(self) -> float:
"""
:obj:`float`: Cost estimate of the drillhole
"""
return self._cost
@cost.setter
def cost(self, value: Real):
if not isinstance(value, Real):
raise TypeError(f"Provided cost value must be of type {float} or int.")
self._cost = float(value)
if self.on_file:
self.workspace.update_attribute(self, "attributes")
@property
def end_of_hole(self) -> float | None:
"""
End of drillhole in meters
"""
return self._end_of_hole
@end_of_hole.setter
def end_of_hole(self, value: Real | None):
if not isinstance(value, (int, float, type(None))):
raise TypeError(f"Provided end_of_hole value must be of type {int}")
self._end_of_hole = value
if self.on_file:
self.workspace.update_attribute(self, "attributes")
@property
def extent(self) -> np.ndarray | None:
"""
Geography bounding box of the object.
:return: shape(2, 3) Bounding box defined by the bottom South-West and
top North-East coordinates.
"""
if self.collar is not None:
return (
np.repeat(
np.r_[[self.collar["x"], self.collar["y"], self.collar["z"]]], 2
)
.reshape((-1, 2))
.T
)
return None
@property
def intervals(self):
"""
Densified and computed interval information
"""
if self._intervals is None:
self._intervals = self.compute_intervals(
self.surveys, self.collar.tolist(), self.end_of_hole
)
return self._intervals
@property
def locations(self) -> np.ndarray | None:
"""
Lookup array of the well path in x, y, z coordinates.
"""
if getattr(self, "_locations", None) is None and self.collar is not None:
self._locations = self.depths_to_xyz(self.intervals, self.surveys[:, 0])
return self._locations
[docs]
def mask_by_extent(
self, extent: np.ndarray, inverse: bool = False
) -> np.ndarray | None:
"""
Sub-class extension of :func:`~geoh5py.shared.entity.Entity.mask_by_extent`.
Uses the collar location only.
"""
if self.extent is None or not box_intersect(self.extent, extent):
return None
if self.collar is not None:
return mask_by_extent(
np.c_[[self.collar["x"], self.collar["y"], self.collar["z"]]].T,
extent,
inverse=inverse,
)
return None
@property
def n_cells(self) -> int | None:
"""
Number of cells.
"""
if self.cells is not None:
return self.cells.shape[0]
return None
@property
def planning(self) -> str:
"""
Status of the hole on of "Default", "Ongoing", "Planned", "Completed" or "No status"
"""
return self._planning
@planning.setter
def planning(self, value: str):
choices = ["Default", "Ongoing", "Planned", "Completed", "No status"]
if value not in choices:
raise ValueError(f"Provided planning value must be one of {choices}")
self._planning = value
if self.on_file:
self.workspace.update_attribute(self, "attributes")
@property
def surveys(self) -> np.ndarray:
"""
Coordinates of the surveys.
"""
if self._surveys is None and self.on_file:
self._surveys = self.workspace.fetch_array_attribute(self, "surveys")
if self._surveys is None:
surveys = np.c_[0, 0, -90]
else:
surveys = np.vstack(
[
self._surveys["Depth"],
self._surveys["Azimuth"],
self._surveys["Dip"],
]
).T
return surveys.astype(float)
@surveys.setter
def surveys(self, array: np.ndarray | list | tuple):
if not isinstance(array, (np.ndarray, list, tuple)):
raise TypeError(
"Input 'surveys' must be of type 'numpy.ndarray' or 'list'."
)
self._surveys = self.format_survey_values(array)
self.end_of_hole = float(self._surveys["Depth"][-1])
if self.on_file:
self.workspace.update_attribute(self, "surveys")
if self.trace is not None:
self._trace = None
self.workspace.update_attribute(self, "trace")
self._locations = None
self._intervals = None
@property
def default_collocation_distance(self):
"""
Minimum collocation distance for matching depth on merge
"""
return self._default_collocation_distance
@default_collocation_distance.setter
def default_collocation_distance(self, tol):
if not tol > 0:
raise ValueError("Tolerance value should be >0.")
self._default_collocation_distance = tol
self.workspace.update_attribute(self, "attributes")
@property
def trace(self) -> np.ndarray | None:
"""
:obj:`numpy.array`: Drillhole trace defining the path in 3D
"""
if self._trace is None and self.on_file:
self._trace = self.workspace.fetch_array_attribute(self, "trace")
if self._trace is not None:
return self._trace.view("<f8").reshape((-1, 3))
return None
@property
def trace_depth(self) -> np.ndarray | None:
"""
:obj:`numpy.array`: Drillhole trace depth from top to bottom
"""
if getattr(self, "_trace_depth", None) is None and self.trace is not None:
trace = self.trace
self._trace_depth = trace[0, 2] - trace[:, 2]
return self._trace_depth
@property
def from_(self):
"""
Depth data corresponding to the tops of the interval values.
"""
data_obj = self.get_data("FROM")
if data_obj:
return data_obj[0]
return None
@property
def to_(self):
"""
Depth data corresponding to the bottoms of the interval values.
"""
data_obj = self.get_data("TO")
if data_obj:
return data_obj[0]
return None
@property
def depths(self) -> FloatData | None:
if self._depths is None:
data_obj = self.get_data("DEPTH")
if data_obj and isinstance(data_obj[0], FloatData):
self.depths = data_obj[0]
return self._depths
@depths.setter
def depths(self, value: FloatData | np.ndarray | None):
if isinstance(value, np.ndarray):
value = self.workspace.create_entity(
Data,
entity={
"parent": self,
"association": "VERTEX",
"name": "DEPTH",
"values": value,
},
entity_type={"primitive_type": "FLOAT"},
)
if isinstance(value, (FloatData, type(None))):
self._depths = value
else:
raise ValueError(
f"Input '_depth' property must be of type{FloatData} or None"
)
[docs]
def desurvey(self, depths):
"""
Function to return x, y, z coordinates from depth.
"""
return self.depths_to_xyz(self.intervals, depths)
[docs]
def add_vertices(self, xyz):
"""
Function to add vertices to the drillhole
"""
indices = np.arange(xyz.shape[0])
if self._vertices is not None:
indices += self.n_vertices
xyz = np.vstack([self.vertices, xyz])
self._vertices = np.asarray(
np.core.records.fromarrays(
xyz.T.tolist(),
dtype=[("x", "<f8"), ("y", "<f8"), ("z", "<f8")],
)
)
self.workspace.update_attribute(self, "vertices")
return indices.astype("uint32")
[docs]
def validate_interval_data( # pylint: disable=too-many-locals
self,
from_to: np.ndarray | list,
values: np.ndarray,
collocation_distance: float = 1e-4,
) -> np.ndarray:
"""
Compare new and current depth values, append new vertices if necessary and return
an augmented values vector that matches the vertices indexing.
:param from_to: Array of from-to values.
:param values: Array of values.
:param collocation_distance: Minimum collocation distance for matching.
:return: Augmented values vector that matches the vertices indexing.
"""
if isinstance(from_to, list):
from_to = np.vstack(from_to)
if from_to.shape[0] != len(values):
raise ValueError(
f"Mismatch between input 'from_to' shape{from_to.shape} "
+ f"and 'values' shape{values.shape}"
)
if from_to.ndim != 2 or from_to.shape[1] != 2:
raise ValueError("The `from-to` values must have shape(*, 2).")
if (self.from_ is None) and (self.to_ is None):
uni_depth, inv_map = np.unique(from_to, return_inverse=True)
self.cells = self.add_vertices(self.desurvey(uni_depth))[inv_map].reshape(
(-1, 2)
)
self.workspace.create_entity(
Data,
entity={
"parent": self,
"association": "CELL",
"name": "FROM",
"values": from_to[:, 0],
},
entity_type={"primitive_type": "FLOAT"},
)
self.workspace.create_entity(
Data,
entity={
"parent": self,
"association": "CELL",
"name": "TO",
"values": from_to[:, 1],
},
entity_type={"primitive_type": "FLOAT"},
)
elif self.cells is not None and self.from_ is not None and self.to_ is not None:
values = np.r_[values]
out_vec = np.c_[self.from_.values, self.to_.values]
dist_match = []
for i, elem in enumerate(from_to):
ind = np.where(
np.linalg.norm(elem - out_vec, axis=1) < collocation_distance
)[0]
if len(ind) > 0:
dist_match.append([ind[0], i])
cell_map: np.ndarray = np.asarray(dist_match, dtype=int)
# Add vertices
vert_new = np.ones_like(from_to, dtype="bool")
if cell_map.ndim == 2:
vert_new[cell_map[:, 1], :] = False
ind_new = np.where(vert_new.flatten())[0]
uni_new, inv_map = np.unique(
from_to.flatten()[ind_new], return_inverse=True
)
# check if its text data, and defined nan array if so
if values.dtype.kind in ["U", "S"]:
nan_values = np.array([""] * self.n_cells) # type: ignore
else:
nan_values = np.ones(self.n_cells) * np.nan
# Append values
values = merge_arrays(
nan_values,
values,
replace="B->A",
mapping=cell_map,
)
self.cells = np.r_[
self.cells,
self.add_vertices(self.desurvey(uni_new))[inv_map]
.reshape((-1, 2))
.astype("uint32"),
]
self.from_.values = merge_arrays(
self.from_.values, from_to[:, 0], mapping=cell_map
)
self.to_.values = merge_arrays(
self.to_.values, from_to[:, 1], mapping=cell_map
)
return values
[docs]
def validate_depth_data(
self,
depth: np.ndarray,
values: np.ndarray,
collocation_distance=1e-4,
) -> np.ndarray:
"""
Compare new and current depth values. Append new vertices if necessary and return
an augmented values vector that matches the vertices indexing.
:param depth: Array of depth values.
:param values: Array of values.
:param collocation_distance: Minimum collocation distance for matching.
:return: Augmented values vector that matches the vertices indexing.
"""
if depth.shape != values.shape:
raise ValueError(
f"Mismatch between input 'depth' shape{depth.shape} "
+ f"and 'values' shape{values.shape}"
)
input_values = np.r_[values]
if self.depths is None:
self.add_vertices(self.desurvey(depth))
self.depths = np.r_[
np.ones(self.n_vertices - depth.shape[0]) * np.nan, depth
]
values = np.r_[
np.ones(self.n_vertices - input_values.shape[0]) * np.nan, input_values
]
else:
depths, indices = merge_arrays(
self.depths.values,
depth,
return_mapping=True,
collocation_distance=collocation_distance,
)
values = merge_arrays(
np.ones(self.n_vertices) * np.nan,
input_values,
replace="B->A",
mapping=indices,
)
self.add_vertices(self.desurvey(np.delete(depth, indices[:, 1])))
self.depths.values = depths
return values
[docs]
def validate_association(
self, attributes: dict, property_group=None, collocation_distance=None, **_
) -> tuple:
"""
Validate input drillhole data attributes.
:param attributes: Dictionary of data attributes.
:param property_group: Input property group to validate against.
:param collocation_distance: Minimum collocation distance for matching.
"""
if collocation_distance is None:
collocation_distance = attributes.get(
"collocation_distance", self.default_collocation_distance
)
if collocation_distance < 0:
raise UserWarning("Input depth 'collocation_distance' must be >0.")
if "depth" not in attributes and "from-to" not in attributes:
if "association" not in attributes or attributes["association"] != "OBJECT":
raise ValueError(
"Input data dictionary must contain a key/value pair of depth data "
"or contain an 'OBJECT' association. Valid depth keys are 'depth' "
"and 'from-to'."
)
if attributes["name"] in self.get_data_list():
raise ValueError(
f"Data with name '{attributes['name']}' already present "
f"on the drillhole '{self.name}'. "
"Consider changing the values or renaming."
)
if "depth" in attributes.keys():
attributes["association"] = "VERTEX"
attributes["values"] = self.validate_depth_data(
attributes["depth"],
attributes["values"],
collocation_distance=collocation_distance,
)
del attributes["depth"]
if "from-to" in attributes.keys():
attributes["association"] = "CELL"
attributes["values"] = self.validate_interval_data(
attributes["from-to"],
attributes["values"],
collocation_distance=collocation_distance,
)
del attributes["from-to"]
return attributes, property_group
@property
def vertices(self) -> np.ndarray | None:
"""
:obj:`~geoh5py.objects.object_base.ObjectBase.vertices`
"""
if self._vertices is None and self.on_file:
self._vertices = self.workspace.fetch_array_attribute(self, "vertices")
if self._vertices is not None:
return self._vertices.view("<f8").reshape((-1, 3))
return None
@vertices.setter
def vertices(self, vertices: np.ndarray | list | tuple):
xyz = self.validate_vertices(vertices)
if self._vertices is not None and self._vertices.shape != xyz.shape:
raise ValueError(
"New vertices array must have the same shape as the current vertices array."
)
self._vertices = xyz
self.workspace.update_attribute(self, "vertices")
[docs]
def post_processing(self):
"""
Read the 'DEPTH' data and sort all Data.values if needed
"""
if self.get_data("DEPTH"):
data_obj = self.get_data("DEPTH")[0]
depths = data_obj.values
if isinstance(data_obj, NumericData):
depths = data_obj.validate_values(depths)
if not np.all(np.diff(depths) >= 0):
sort_ind = np.argsort(depths)
for child in self.children:
if (
isinstance(child, NumericData)
and getattr(child.association, "name", None) == "VERTEX"
):
child.values = child.validate_values(child.values)[sort_ind]
if self.vertices is not None:
self._vertices = self.vertices[sort_ind, :]
self.workspace.update_attribute(self, "vertices")
if self.cells is not None:
key_map = np.argsort(sort_ind)[self.cells.flatten()]
self._cells = key_map.reshape((-1, 2)).astype("uint32")
self.workspace.update_attribute(self, "cells")
[docs]
@staticmethod
def densify_survey_values(
survey: np.ndarray, end_of_hole: float | None
) -> np.ndarray:
"""
Validate the survey values for desurveying.
- Repeat first survey point at surface for de-survey interpolation
- Repeat last survey point at end of hole for de-survey interpolation
- Densify along intervals if greater than 50 m
:param survey: Array of survey points with columns 'Depth', 'Azimuth', 'Dip'.
:param end_of_hole: End of the drillhole in meters.
:return: Augmented array of survey points with columns 'Depth', 'Azimuth', 'Dip'.
"""
full_survey = survey.copy()
# Repeat first survey point at surface for de-survey interpolation
if survey[0, 0] != 0.0:
full_survey = np.vstack([survey[0, :], full_survey])
full_survey[0, 0] = 0.0
if end_of_hole is not None and end_of_hole > full_survey[-1, 0]:
last_row = full_survey[-1, :].copy()
last_row[0] = end_of_hole
full_survey = np.vstack([full_survey, last_row])
# Densify along intervals if greater than 50 m
deltas = np.diff(full_survey[:, 0])
new_points = []
for ii, delta in enumerate(deltas):
if delta < MAXIMUM_DEPTH_INTERVAL:
continue
depths = np.arange(0, delta, MAXIMUM_DEPTH_INTERVAL)
d_azm = (full_survey[ii + 1, 1] - full_survey[ii, 1]) / delta
d_dip = (full_survey[ii + 1, 2] - full_survey[ii, 2]) / delta
new_points.append(
np.c_[
full_survey[ii, 0] + depths[1:],
full_survey[ii, 1] + d_azm * depths[1:],
full_survey[ii, 2] + d_dip * depths[1:],
]
)
if len(new_points) > 1:
full_survey = np.vstack([full_survey, np.vstack(new_points)])
return np.unique(full_survey, axis=0)
[docs]
@staticmethod
def compute_intervals(survey: np.ndarray, collar, end_of_hole) -> dict:
"""
Compute the intervals from survey and collar information.
Stores a dictionary describing the arc circle between the survey points
separated by a maximum depth interval of 50 m.
Translated from C++ code.
:param survey: Array of survey points with columns 'Depth', 'Azimuth', 'Dip'.
:param collar: Collar location of the drillhole.
:param end_of_hole: End of the drillhole in meters.
"""
collar = np.asarray(collar).reshape((1, 3))
full_survey = Drillhole.densify_survey_values(survey, end_of_hole)
unit_vector = dip_azimuth_to_vector(full_survey[:, 2], full_survey[:, 1])
if survey.shape[0] < 2:
return {
"depths": np.r_[0],
"rad": np.r_[INFINITE_RADIUS],
"tangential": np.zeros((1, 3)),
"unit_vector": unit_vector,
"locations": collar,
}
# Cross product between neighbours
cross = np.cross(unit_vector[:-1, :], unit_vector[1:, :], axis=1)
vr = np.linalg.norm(cross, axis=1)
dot = np.sum(unit_vector[:-1, :] * unit_vector[1:, :], axis=1)
tangential = unit_vector[1:, :] - dot[:, None] * unit_vector[:-1, :]
norm = np.linalg.norm(tangential, axis=1)
norm[norm == 0.0] = INFINITE_RADIUS
tangential /= norm[:, None]
alpha = np.abs(0.5 * np.pi - np.arctan2(dot, vr))
delta_depth = np.diff(full_survey[:, 0])
radius = delta_depth / alpha
radius[alpha == 0.0] = delta_depth[alpha == 0.0] * INFINITE_RADIUS
alpha[alpha == 0.0] = delta_depth[alpha == 0.0] / radius[alpha == 0.0]
intervals = {
"depths": np.r_[full_survey[:, 0]],
"rad": np.r_[radius, INFINITE_RADIUS],
"tangential": np.vstack([tangential, np.zeros(3)]),
"unit_vector": unit_vector,
"locations": np.r_[
collar,
collar
+ np.cumsum(
radius[:, None]
* (
np.sin(alpha)[:, None] * unit_vector[:-1, :]
+ (1 - np.cos(alpha))[:, None] * tangential
),
axis=0,
),
],
}
return intervals
[docs]
@staticmethod
def depths_to_xyz(intervals: dict, depths: np.ndarray) -> np.ndarray:
"""
Convert survey parameters to x, y, z coordinates.
:param intervals: Dictionary of intervals parameters.
:param depths: Array of depth values.
"""
depths = np.asarray(depths).flatten()
ind = np.searchsorted(intervals["depths"], depths, side="right") - 1
dl = depths - intervals["depths"][ind]
radii = intervals["rad"][ind]
radii[radii == 0.0] = 1.0
angle = dl / radii
locations = intervals["locations"][ind, :] + radii[:, None] * (
np.sin(angle)[:, None] * intervals["unit_vector"][ind, :]
+ (1 - np.cos(angle))[:, None] * intervals["tangential"][ind, :]
)
return locations