# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
# Copyright (c) 2025 Mira Geoscience Ltd. '
# '
# This file is part of geoh5py. '
# '
# geoh5py is free software: you can redistribute it and/or modify '
# it under the terms of the GNU Lesser General Public License as published by '
# the Free Software Foundation, either version 3 of the License, or '
# (at your option) any later version. '
# '
# geoh5py is distributed in the hope that it will be useful, '
# but WITHOUT ANY WARRANTY; without even the implied warranty of '
# GNU Lesser General Public License for more details. '
# '
# You should have received a copy of the GNU Lesser General Public License '
# along with geoh5py. If not, see <https://www.gnu.org/licenses/>. '
# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
# pylint: disable=R0902, R0904
from __future__ import annotations
import re
import uuid
import warnings
from numbers import Real
import numpy as np
from ..data import Data, FloatData, NumericData
from ..shared.utils import (
from .points import Points
class Drillhole(Points):
Drillhole object class defined by a collar and survey.
:param collar: Coordinates of the drillhole.
:param cost: Cost estimate of the drillhole.
:param end_of_hole: End of drillhole in meters.
:param planning: Status of the hole, defaults to 'Default'.
:param surveys: Survey information provided as 'Depth', 'Azimuth', 'Dip'.
:param vertices: Coordinates of the vertices.
:param default_collocation_distance: Minimum collocation distance for matching depth on merge.
_TYPE_UID = uuid.UUID(
fields=(0x7CAEBF0E, 0xD16E, 0x11E3, 0xBC, 0x69, 0xE4632694AA37)
__SURVEY_DTYPE = np.dtype([("Depth", "<f4"), ("Azimuth", "<f4"), ("Dip", "<f4")])
__COLLAR_DTYPE = np.dtype([("x", float), ("y", float), ("z", float)])
_attribute_map = Points._attribute_map.copy()
"Cost": "cost",
"Collar": "collar",
"Planning": "planning",
"End of hole": "end_of_hole",
def __init__( # pylint: disable=too-many-arguments
collar: np.ndarray | list | None = None,
cost: float = 0.0,
end_of_hole: float | None = None,
planning: str = "Default",
surveys: np.ndarray | list | tuple | None = None,
vertices: np.ndarray | None = None,
default_collocation_distance: float = 1e-2,
self._cells: np.ndarray | None = None
self._depths: FloatData | None = None
self._trace: np.ndarray | None = None
self._trace_depth: np.ndarray | None = None
self._locations = None
self._surveys: np.ndarray | None = None
self._intervals: dict | None = None
vertices=(0.0, 0.0, 0.0) if vertices is None else vertices, **kwargs
if vertices is None:
self._vertices = None
self.collar = collar
self.cost = cost
self.default_collocation_distance = default_collocation_distance
self.end_of_hole = end_of_hole
self.planning = planning
if surveys is not None:
self.surveys = surveys
def cells(self) -> np.ndarray | None:
:obj:`numpy.ndarray` of :obj:`int`, shape (\*, 2):
Array of indices defining segments connecting vertices.
if getattr(self, "_cells", None) is None:
if self.on_file:
self._cells = self.workspace.fetch_array_attribute(self)
return self._cells
def cells(self, indices):
if indices.dtype != "uint32":
raise TypeError("Indices array must be of type 'uint32'")
self._cells = indices
self.workspace.update_attribute(self, "cells")
def collar(self) -> np.ndarray:
Coordinates of the collar, shape(1, 3)
return self._collar
def collar(self, value: list | np.ndarray | None):
if value is None:
"No 'collar' provided. Using (0, 0, 0) as default point at the origin.",
value = (0.0, 0.0, 0.0)
if isinstance(value, np.ndarray):
value = value.tolist()
if isinstance(value, str):
value = [float(n) for n in re.findall(r"-?\d+\.\d+", value)]
if len(value) != 3:
raise ValueError("Origin must be a list or numpy array of len (3,).")
value = np.asarray(tuple(value), dtype=self.__COLLAR_DTYPE)
self._collar = value
self._locations = None
self._intervals = None
if self.on_file:
self.workspace.update_attribute(self, "attributes")
if self.trace is not None:
self._trace = None
self._trace_depth = None
self.workspace.update_attribute(self, "trace")
self.workspace.update_attribute(self, "trace_depth")
def cost(self) -> float:
:obj:`float`: Cost estimate of the drillhole
return self._cost
def cost(self, value: Real):
if not isinstance(value, Real):
raise TypeError(f"Provided cost value must be of type {float} or int.")
self._cost = float(value)
if self.on_file:
self.workspace.update_attribute(self, "attributes")
def end_of_hole(self) -> float | None:
End of drillhole in meters
return self._end_of_hole
def end_of_hole(self, value: Real | None):
if not isinstance(value, (int, float, type(None))):
raise TypeError(f"Provided end_of_hole value must be of type {int}")
self._end_of_hole = value
if self.on_file:
self.workspace.update_attribute(self, "attributes")
def extent(self) -> np.ndarray | None:
Geography bounding box of the object.
:return: shape(2, 3) Bounding box defined by the bottom South-West and
top North-East coordinates.
if self.collar is not None:
return (
np.r_[[self.collar["x"], self.collar["y"], self.collar["z"]]], 2
.reshape((-1, 2))
return None
def intervals(self):
Densified and computed interval information
if self._intervals is None:
self._intervals = self.compute_intervals(
self.surveys, self.collar.tolist(), self.end_of_hole
return self._intervals
def locations(self) -> np.ndarray | None:
Lookup array of the well path in x, y, z coordinates.
if getattr(self, "_locations", None) is None and self.collar is not None:
self._locations = self.depths_to_xyz(self.intervals, self.surveys[:, 0])
return self._locations
def mask_by_extent(
self, extent: np.ndarray, inverse: bool = False
) -> np.ndarray | None:
Sub-class extension of :func:`~geoh5py.shared.entity.Entity.mask_by_extent`.
Uses the collar location only.
if self.extent is None or not box_intersect(self.extent, extent):
return None
if self.collar is not None:
return mask_by_extent(
np.c_[[self.collar["x"], self.collar["y"], self.collar["z"]]].T,
return None
def n_cells(self) -> int | None:
Number of cells.
if self.cells is not None:
return self.cells.shape[0]
return None
def planning(self) -> str:
Status of the hole on of "Default", "Ongoing", "Planned", "Completed" or "No status"
return self._planning
def planning(self, value: str):
choices = ["Default", "Ongoing", "Planned", "Completed", "No status"]
if value not in choices:
raise ValueError(f"Provided planning value must be one of {choices}")
self._planning = value
if self.on_file:
self.workspace.update_attribute(self, "attributes")
def surveys(self) -> np.ndarray:
Coordinates of the surveys.
if self._surveys is None and self.on_file:
self._surveys = self.workspace.fetch_array_attribute(self, "surveys")
if self._surveys is None:
surveys = np.c_[0, 0, -90]
surveys = np.vstack(
return surveys.astype(float)
def surveys(self, array: np.ndarray | list | tuple):
if not isinstance(array, (np.ndarray, list, tuple)):
raise TypeError(
"Input 'surveys' must be of type 'numpy.ndarray' or 'list'."
self._surveys = self.format_survey_values(array)
self.end_of_hole = float(self._surveys["Depth"][-1])
if self.on_file:
self.workspace.update_attribute(self, "surveys")
if self.trace is not None:
self._trace = None
self.workspace.update_attribute(self, "trace")
self._locations = None
self._intervals = None
def default_collocation_distance(self):
Minimum collocation distance for matching depth on merge
return self._default_collocation_distance
def default_collocation_distance(self, tol):
if not tol > 0:
raise ValueError("Tolerance value should be >0.")
self._default_collocation_distance = tol
self.workspace.update_attribute(self, "attributes")
def trace(self) -> np.ndarray | None:
:obj:`numpy.array`: Drillhole trace defining the path in 3D
if self._trace is None and self.on_file:
self._trace = self.workspace.fetch_array_attribute(self, "trace")
if self._trace is not None:
return self._trace.view("<f8").reshape((-1, 3))
return None
def trace_depth(self) -> np.ndarray | None:
:obj:`numpy.array`: Drillhole trace depth from top to bottom
if getattr(self, "_trace_depth", None) is None and self.trace is not None:
trace = self.trace
self._trace_depth = trace[0, 2] - trace[:, 2]
return self._trace_depth
def from_(self):
Depth data corresponding to the tops of the interval values.
data_obj = self.get_data("FROM")
if data_obj:
return data_obj[0]
return None
def to_(self):
Depth data corresponding to the bottoms of the interval values.
data_obj = self.get_data("TO")
if data_obj:
return data_obj[0]
return None
def depths(self) -> FloatData | None:
if self._depths is None:
data_obj = self.get_data("DEPTH")
if data_obj and isinstance(data_obj[0], FloatData):
self.depths = data_obj[0]
return self._depths
def depths(self, value: FloatData | np.ndarray | None):
if isinstance(value, np.ndarray):
value = self.workspace.create_entity(
"parent": self,
"association": "VERTEX",
"name": "DEPTH",
"values": value,
entity_type={"primitive_type": "FLOAT"},
if isinstance(value, (FloatData, type(None))):
self._depths = value
raise ValueError(
f"Input '_depth' property must be of type{FloatData} or None"
def desurvey(self, depths):
Function to return x, y, z coordinates from depth.
return self.depths_to_xyz(self.intervals, depths)
def add_vertices(self, xyz):
Function to add vertices to the drillhole
indices = np.arange(xyz.shape[0])
if self._vertices is not None:
indices += self.n_vertices
xyz = np.vstack([self.vertices, xyz])
self._vertices = np.asarray(
dtype=[("x", "<f8"), ("y", "<f8"), ("z", "<f8")],
self.workspace.update_attribute(self, "vertices")
return indices.astype("uint32")
def validate_interval_data( # pylint: disable=too-many-locals
from_to: np.ndarray | list,
values: np.ndarray,
collocation_distance: float = 1e-4,
) -> np.ndarray:
Compare new and current depth values, append new vertices if necessary and return
an augmented values vector that matches the vertices indexing.
:param from_to: Array of from-to values.
:param values: Array of values.
:param collocation_distance: Minimum collocation distance for matching.
:return: Augmented values vector that matches the vertices indexing.
if isinstance(from_to, list):
from_to = np.vstack(from_to)
if from_to.shape[0] != len(values):
raise ValueError(
f"Mismatch between input 'from_to' shape{from_to.shape} "
+ f"and 'values' shape{values.shape}"
if from_to.ndim != 2 or from_to.shape[1] != 2:
raise ValueError("The `from-to` values must have shape(*, 2).")
if (self.from_ is None) and (self.to_ is None):
uni_depth, inv_map = np.unique(from_to, return_inverse=True)
self.cells = self.add_vertices(self.desurvey(uni_depth))[inv_map].reshape(
(-1, 2)
"parent": self,
"association": "CELL",
"name": "FROM",
"values": from_to[:, 0],
entity_type={"primitive_type": "FLOAT"},
"parent": self,
"association": "CELL",
"name": "TO",
"values": from_to[:, 1],
entity_type={"primitive_type": "FLOAT"},
elif self.cells is not None and self.from_ is not None and self.to_ is not None:
values = np.r_[values]
out_vec = np.c_[self.from_.values, self.to_.values]
dist_match = []
for i, elem in enumerate(from_to):
ind = np.where(
np.linalg.norm(elem - out_vec, axis=1) < collocation_distance
if len(ind) > 0:
dist_match.append([ind[0], i])
cell_map: np.ndarray = np.asarray(dist_match, dtype=int)
# Add vertices
vert_new = np.ones_like(from_to, dtype="bool")
if cell_map.ndim == 2:
vert_new[cell_map[:, 1], :] = False
ind_new = np.where(vert_new.flatten())[0]
uni_new, inv_map = np.unique(
from_to.flatten()[ind_new], return_inverse=True
# check if its text data, and defined nan array if so
if values.dtype.kind in ["U", "S"]:
nan_values = np.array([""] * self.n_cells) # type: ignore
nan_values = np.ones(self.n_cells) * np.nan
# Append values
values = merge_arrays(
self.cells = np.r_[
.reshape((-1, 2))
self.from_.values = merge_arrays(
self.from_.values, from_to[:, 0], mapping=cell_map
self.to_.values = merge_arrays(
self.to_.values, from_to[:, 1], mapping=cell_map
return values
def validate_depth_data(
depth: np.ndarray,
values: np.ndarray,
) -> np.ndarray:
Compare new and current depth values. Append new vertices if necessary and return
an augmented values vector that matches the vertices indexing.
:param depth: Array of depth values.
:param values: Array of values.
:param collocation_distance: Minimum collocation distance for matching.
:return: Augmented values vector that matches the vertices indexing.
if depth.shape != values.shape:
raise ValueError(
f"Mismatch between input 'depth' shape{depth.shape} "
+ f"and 'values' shape{values.shape}"
input_values = np.r_[values]
if self.depths is None:
self.depths = np.r_[
np.ones(self.n_vertices - depth.shape[0]) * np.nan, depth
values = np.r_[
np.ones(self.n_vertices - input_values.shape[0]) * np.nan, input_values
depths, indices = merge_arrays(
values = merge_arrays(
np.ones(self.n_vertices) * np.nan,
self.add_vertices(self.desurvey(np.delete(depth, indices[:, 1])))
self.depths.values = depths
return values
def validate_association(
self, attributes: dict, property_group=None, collocation_distance=None, **_
) -> tuple:
Validate input drillhole data attributes.
:param attributes: Dictionary of data attributes.
:param property_group: Input property group to validate against.
:param collocation_distance: Minimum collocation distance for matching.
if collocation_distance is None:
collocation_distance = attributes.get(
"collocation_distance", self.default_collocation_distance
if collocation_distance < 0:
raise UserWarning("Input depth 'collocation_distance' must be >0.")
if "depth" not in attributes and "from-to" not in attributes:
if "association" not in attributes or attributes["association"] != "OBJECT":
raise ValueError(
"Input data dictionary must contain a key/value pair of depth data "
"or contain an 'OBJECT' association. Valid depth keys are 'depth' "
"and 'from-to'."
if attributes["name"] in self.get_data_list():
raise ValueError(
f"Data with name '{attributes['name']}' already present "
f"on the drillhole '{self.name}'. "
"Consider changing the values or renaming."
if "depth" in attributes.keys():
attributes["association"] = "VERTEX"
attributes["values"] = self.validate_depth_data(
del attributes["depth"]
if "from-to" in attributes.keys():
attributes["association"] = "CELL"
attributes["values"] = self.validate_interval_data(
del attributes["from-to"]
return attributes, property_group
def vertices(self) -> np.ndarray | None:
if self._vertices is None and self.on_file:
self._vertices = self.workspace.fetch_array_attribute(self, "vertices")
if self._vertices is not None:
return self._vertices.view("<f8").reshape((-1, 3))
return None
def vertices(self, vertices: np.ndarray | list | tuple):
xyz = self.validate_vertices(vertices)
if self._vertices is not None and self._vertices.shape != xyz.shape:
raise ValueError(
"New vertices array must have the same shape as the current vertices array."
self._vertices = xyz
self.workspace.update_attribute(self, "vertices")
def post_processing(self):
Read the 'DEPTH' data and sort all Data.values if needed
if self.get_data("DEPTH"):
data_obj = self.get_data("DEPTH")[0]
depths = data_obj.values
if isinstance(data_obj, NumericData):
depths = data_obj.validate_values(depths)
if not np.all(np.diff(depths) >= 0):
sort_ind = np.argsort(depths)
for child in self.children:
if (
isinstance(child, NumericData)
and getattr(child.association, "name", None) == "VERTEX"
child.values = child.validate_values(child.values)[sort_ind]
if self.vertices is not None:
self._vertices = self.vertices[sort_ind, :]
self.workspace.update_attribute(self, "vertices")
if self.cells is not None:
key_map = np.argsort(sort_ind)[self.cells.flatten()]
self._cells = key_map.reshape((-1, 2)).astype("uint32")
self.workspace.update_attribute(self, "cells")
def densify_survey_values(
survey: np.ndarray, end_of_hole: float | None
) -> np.ndarray:
Validate the survey values for desurveying.
- Repeat first survey point at surface for de-survey interpolation
- Repeat last survey point at end of hole for de-survey interpolation
- Densify along intervals if greater than 50 m
:param survey: Array of survey points with columns 'Depth', 'Azimuth', 'Dip'.
:param end_of_hole: End of the drillhole in meters.
:return: Augmented array of survey points with columns 'Depth', 'Azimuth', 'Dip'.
full_survey = survey.copy()
# Repeat first survey point at surface for de-survey interpolation
if survey[0, 0] != 0.0:
full_survey = np.vstack([survey[0, :], full_survey])
full_survey[0, 0] = 0.0
if end_of_hole is not None and end_of_hole > full_survey[-1, 0]:
last_row = full_survey[-1, :].copy()
last_row[0] = end_of_hole
full_survey = np.vstack([full_survey, last_row])
# Densify along intervals if greater than 50 m
deltas = np.diff(full_survey[:, 0])
new_points = []
for ii, delta in enumerate(deltas):
depths = np.arange(0, delta, MAXIMUM_DEPTH_INTERVAL)
d_azm = (full_survey[ii + 1, 1] - full_survey[ii, 1]) / delta
d_dip = (full_survey[ii + 1, 2] - full_survey[ii, 2]) / delta
full_survey[ii, 0] + depths[1:],
full_survey[ii, 1] + d_azm * depths[1:],
full_survey[ii, 2] + d_dip * depths[1:],
if len(new_points) > 1:
full_survey = np.vstack([full_survey, np.vstack(new_points)])
return np.unique(full_survey, axis=0)
def compute_intervals(survey: np.ndarray, collar, end_of_hole) -> dict:
Compute the intervals from survey and collar information.
Stores a dictionary describing the arc circle between the survey points
separated by a maximum depth interval of 50 m.
Translated from C++ code.
:param survey: Array of survey points with columns 'Depth', 'Azimuth', 'Dip'.
:param collar: Collar location of the drillhole.
:param end_of_hole: End of the drillhole in meters.
collar = np.asarray(collar).reshape((1, 3))
full_survey = Drillhole.densify_survey_values(survey, end_of_hole)
unit_vector = dip_azimuth_to_vector(full_survey[:, 2], full_survey[:, 1])
if survey.shape[0] < 2:
return {
"depths": np.r_[0],
"rad": np.r_[INFINITE_RADIUS],
"tangential": np.zeros((1, 3)),
"unit_vector": unit_vector,
"locations": collar,
# Cross product between neighbours
cross = np.cross(unit_vector[:-1, :], unit_vector[1:, :], axis=1)
vr = np.linalg.norm(cross, axis=1)
dot = np.sum(unit_vector[:-1, :] * unit_vector[1:, :], axis=1)
tangential = unit_vector[1:, :] - dot[:, None] * unit_vector[:-1, :]
norm = np.linalg.norm(tangential, axis=1)
norm[norm == 0.0] = INFINITE_RADIUS
tangential /= norm[:, None]
alpha = np.abs(0.5 * np.pi - np.arctan2(dot, vr))
alpha[alpha == 0.0] = INFINITE_RADIUS**-1.0
delta_depth = np.diff(full_survey[:, 0])
radius = delta_depth / alpha
intervals = {
"depths": np.r_[full_survey[:, 0]],
"rad": np.r_[radius, INFINITE_RADIUS],
"tangential": np.vstack([tangential, np.zeros(3)]),
"unit_vector": unit_vector,
"locations": np.r_[
+ np.cumsum(
radius[:, None]
* (
np.sin(alpha)[:, None] * unit_vector[:-1, :]
+ (1 - np.cos(alpha))[:, None] * tangential
return intervals
def depths_to_xyz(intervals: dict, depths: np.ndarray) -> np.ndarray:
Convert survey parameters to x, y, z coordinates.
:param intervals: Dictionary of intervals parameters.
:param depths: Array of depth values.
depths = np.asarray(depths).flatten()
ind = np.searchsorted(intervals["depths"], depths, side="right") - 1
dl = depths - intervals["depths"][ind]
radii = intervals["rad"][ind]
radii[radii == 0.0] = 1.0
angle = dl / radii
locations = intervals["locations"][ind, :] + radii[:, None] * (
np.sin(angle)[:, None] * intervals["unit_vector"][ind, :]
+ (1 - np.cos(angle))[:, None] * intervals["tangential"][ind, :]
return locations