Source code for ni.datastore.data._data_store_client

"""Data store client for publishing and reading data."""

from __future__ import annotations

import logging
import sys
from collections.abc import Iterable, Sequence
from threading import Lock
from types import TracebackType
from typing import overload, Type, TYPE_CHECKING, TypeVar

import hightime as ht
from grpc import Channel
from ni.measurementlink.discovery.v1.client import DiscoveryClient
from ni.measurements.data.v1.client import DataStoreClient as DataStoreServiceClient
from ni.measurements.data.v1.data_store_service_pb2 import (
    CreateStepRequest,
    CreateTestResultRequest,
    GetConditionRequest,
    GetMeasurementRequest,
    GetStepRequest,
    GetTestResultRequest,
    PublishConditionBatchRequest,
    PublishConditionRequest,
    PublishMeasurementBatchRequest,
    PublishMeasurementRequest,
    QueryConditionsRequest,
    QueryMeasurementsRequest,
    QueryStepsRequest,
    QueryTestResultsRequest,
    ReadConditionValueRequest,
    ReadMeasurementValueRequest,
)
from ni.protobuf.types.precision_timestamp_conversion import (
    hightime_datetime_to_protobuf,
)
from ni_grpc_extensions.channelpool import GrpcChannelPool

from ni.datastore.data._grpc_conversion import (
    convert_read_condition_response_from_protobuf,
    convert_read_measurement_response_from_protobuf,
    get_publish_measurement_timestamp,
    populate_publish_condition_batch_request_values,
    populate_publish_condition_request_value,
    populate_publish_measurement_batch_request_values,
    populate_publish_measurement_request_value,
)
from ni.datastore.data._types._error_information import ErrorInformation
from ni.datastore.data._types._outcome import Outcome
from ni.datastore.data._types._published_condition import PublishedCondition
from ni.datastore.data._types._published_measurement import PublishedMeasurement
from ni.datastore.data._types._step import Step
from ni.datastore.data._types._test_result import TestResult

if TYPE_CHECKING:
    if sys.version_info >= (3, 11):
        from typing import Self
    else:
        from typing_extensions import Self

TRead = TypeVar("TRead")

_logger = logging.getLogger(__name__)


class DataStoreClient:
    """Data store client for publishing and reading data."""

    __slots__ = (
        "_closed",
        "_discovery_client",
        "_grpc_channel",
        "_grpc_channel_pool",
        "_data_store_client",
        "_data_store_client_lock",
    )

    _DATA_STORE_CLIENT_CLOSED_ERROR = (
        "This DataStoreClient has been closed. Create a new DataStoreClient for further "
        "interaction with the data store."
    )

    _closed: bool
    _discovery_client: DiscoveryClient | None
    _grpc_channel: Channel | None
    _grpc_channel_pool: GrpcChannelPool | None
    _data_store_client: DataStoreServiceClient | None
    _data_store_client_lock: Lock

    def __init__(
        self,
        discovery_client: DiscoveryClient | None = None,
        grpc_channel: Channel | None = None,
        grpc_channel_pool: GrpcChannelPool | None = None,
    ) -> None:
        """Initialize the DataStoreClient.

        Args:
            discovery_client: An optional discovery client (recommended).

            grpc_channel: An optional data store gRPC channel. Providing this channel will bypass
                discovery service resolution of the data store.

            grpc_channel_pool: An optional gRPC channel pool (recommended).
        """
        self._discovery_client = discovery_client
        self._grpc_channel = grpc_channel
        self._grpc_channel_pool = grpc_channel_pool

        self._data_store_client = None

        self._data_store_client_lock = Lock()

        self._closed = False

[docs] def __enter__(self) -> Self: """Enter the runtime context of the data store client.""" return self
[docs] def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, traceback: TracebackType | None, ) -> None: """Exit the runtime context of the data store client.""" self.close()
[docs] def close(self) -> None: """Close the data store client and clean up resources that it owns.""" self._closed = True with self._data_store_client_lock: if self._data_store_client is not None: self._data_store_client.close() self._data_store_client = None
[docs] def publish_condition( self, name: str, condition_type: str, value: object, step_id: str, ) -> str: """Publish a condition value to the data store. Args: name: An identifier describing the condition value. For example, "Voltage" or "Temperature". condition_type: The type of this condition. For example, "Upper Limit", "Environment", or "Setup". value: The single value for this condition to publish on the test step. This should be a scalar value that can be converted to the appropriate protobuf scalar type. step_id: The ID of the step associated with this condition. This value is expected to be a parsable GUID. Returns: str: The condition id - the unique ID of the condition for referencing in queries """ publish_request = PublishConditionRequest( name=name, condition_type=condition_type, step_id=step_id, ) populate_publish_condition_request_value(publish_request, value) publish_response = self._get_data_store_client().publish_condition(publish_request) return publish_response.condition_id
[docs] def publish_condition_batch( self, name: str, condition_type: str, values: object, step_id: str ) -> str: """Publish a batch of N values for a condition to the data store. Args: name: An identifier describing the condition values. For example, "Voltage" or "Temperature". condition_type: The type of this condition. For example, "Upper Limit", "Environment", or "Setup". values: The values for this condition across all publishes on the test step. This should be a Vector of N values. step_id: The ID of the step associated with this batch of condition values. This value is expected to be a parsable GUID. Returns: str: The condition id - the unique ID of the condition for referencing in queries """ publish_request = PublishConditionBatchRequest( name=name, condition_type=condition_type, step_id=step_id, ) populate_publish_condition_batch_request_values(publish_request, values) publish_response = self._get_data_store_client().publish_condition_batch(publish_request) return publish_response.condition_id
[docs] def publish_measurement( self, name: str, value: object, # More strongly typed Union[bool, AnalogWaveform] can be used if needed step_id: str, timestamp: ht.datetime | None = None, outcome: Outcome = Outcome.UNSPECIFIED, error_information: ErrorInformation | None = None, hardware_item_ids: Iterable[str] = tuple(), test_adapter_ids: Iterable[str] = tuple(), software_item_ids: Iterable[str] = tuple(), notes: str = "", ) -> str: """Publish a single measurement value associated with a test step. Args: name: The name used for associating/grouping conceptually alike measurements across multiple publish iterations. For example, "Temperature" can be used for associating temperature readings across multiple iterations. value: The value of the measurement being published. Supported types: - Scalar: Single float, int, str or boolean - Vector: Array of float, int, str or boolean values - DoubleAnalogWaveform: Analog waveform with double precision - DoubleXYData: XY coordinate data with double precision - I16AnalogWaveform: Analog waveform with 16-bit integer precision - DoubleComplexWaveform: Complex waveform with double precision - I16ComplexWaveform: Complex waveform with 16-bit integer precision - DoubleSpectrum: Frequency spectrum data with double precision - DigitalWaveform: Digital waveform data step_id: The ID of the step associated with this measurement. This value is expected to be a parsable GUID. timestamp: The timestamp of the measurement. If None, the current time will be used. outcome: The outcome of the measurement (PASSED, FAILED, INDETERMINATE, or UNSPECIFIED). error_information: Error or exception information in case of measurement failure. hardware_item_ids: The IDs of the hardware items associated with this measurement. These values are expected to be parsable GUIDs or aliases. test_adapter_ids: The IDs of the test adapters associated with this measurement. These values are expected to be parsable GUIDs or aliases. software_item_ids: The IDs of the software items associated with this measurement. These values are expected to be parsable GUIDs or aliases. notes: Any notes to be associated with the captured measurement. Returns: str: The published measurement id. """ publish_request = PublishMeasurementRequest( name=name, step_id=step_id, outcome=outcome.to_protobuf(), error_information=( error_information.to_protobuf() if error_information is not None else None ), hardware_item_ids=hardware_item_ids, test_adapter_ids=test_adapter_ids, software_item_ids=software_item_ids, notes=notes, ) populate_publish_measurement_request_value(publish_request, value) publish_request.timestamp.CopyFrom( get_publish_measurement_timestamp(publish_request, timestamp) ) publish_response = self._get_data_store_client().publish_measurement(publish_request) return publish_response.measurement_id
[docs] def publish_measurement_batch( self, name: str, values: object, step_id: str, timestamps: Iterable[ht.datetime] = tuple(), outcomes: Iterable[Outcome] = tuple(), error_information: Iterable[ErrorInformation] = tuple(), hardware_item_ids: Iterable[str] = tuple(), test_adapter_ids: Iterable[str] = tuple(), software_item_ids: Iterable[str] = tuple(), notes: str = "", ) -> Sequence[str]: """Publish multiple measurements at once for parametric sweeps. Args: name: The name used for associating/grouping conceptually alike measurements across multiple publish iterations. For example, "Temperature" can be used for associating temperature readings across multiple iterations. values: The values of the measurement being published across N iterations. step_id: The ID of the step associated with this measurement. This value is expected to be a parsable GUID. timestamps: The timestamps corresponding to the N iterations of batched measurement being published. Can be empty (no timestamp info), single value (applied to all), or N values (one per measurement). outcomes: The outcomes corresponding to the N iterations of batched measurement being published. Can be empty (no outcome info), single value (applied to all), or N values (one per measurement). error_information: The error information corresponding to the N iterations of batched measurement being published. Can be empty (no error info), single value (applied to all), or N values (one per measurement). hardware_item_ids: The IDs of the hardware items associated with this measurement. These values are expected to be parsable GUIDs or aliases. test_adapter_ids: The IDs of the test adapters associated with this measurement. These values are expected to be parsable GUIDs or aliases. software_item_ids: The IDs of the software items associated with this measurement. These values are expected to be parsable GUIDs or aliases. notes: Any notes to be associated with the published measurements. Returns: Sequence[str]: The IDs of the corresponding PublishedMeasurements. A single ID will be returned when publishing scalar measurement values. N IDs will be returned when publishing (N) non-scalar measurement values. """ publish_request = PublishMeasurementBatchRequest( name=name, step_id=step_id, timestamps=[hightime_datetime_to_protobuf(ts) for ts in timestamps], outcomes=[outcome.to_protobuf() for outcome in outcomes], error_information=( [ei.to_protobuf() for ei in (error_information or [])] if error_information else [] ), hardware_item_ids=hardware_item_ids, test_adapter_ids=test_adapter_ids, software_item_ids=software_item_ids, notes=notes, ) populate_publish_measurement_batch_request_values(publish_request, values) publish_response = self._get_data_store_client().publish_measurement_batch(publish_request) return publish_response.measurement_ids
@overload def read_condition_value( self, read_source: PublishedCondition, expected_type: Type[TRead], ) -> TRead: ... @overload def read_condition_value( self, read_source: PublishedCondition, ) -> object: ...
[docs] def read_condition_value( self, read_source: PublishedCondition, expected_type: Type[TRead] | None = None, ) -> TRead | object: """Read data published to the data store. Args: read_source: The source from which to read data (PublishedCondition). expected_type: Optional type to validate the returned data against. If provided, a TypeError will be raised if the actual data type doesn't match. Returns: The data retrieved from the data store. The return type depends on what was originally published: - Scalar conditions return as Vectors - Other types are returned as originally published If expected_type is specified, the return value is guaranteed to be of that type. Raises: TypeError: If expected_type is provided and the actual data type doesn't match. """ read_value = self._read_condition(read_source) if expected_type is not None and not isinstance(read_value, expected_type): raise TypeError(f"Expected type {expected_type}, got {type(read_value)}") return read_value
@overload def read_measurement_value( self, read_source: PublishedMeasurement, expected_type: Type[TRead], ) -> TRead: ... @overload def read_measurement_value( self, read_source: PublishedMeasurement, ) -> object: ...
[docs] def read_measurement_value( self, read_source: PublishedMeasurement, expected_type: Type[TRead] | None = None, ) -> TRead | object: """Read data published to the data store. Args: read_source: The source from which to read data (PublishedMeasurement). expected_type: Optional type to validate the returned data against. If provided, a TypeError will be raised if the actual data type doesn't match. Returns: The data retrieved from the data store. The return type depends on what was originally published: - Scalar measurements return as Vectors - Other types are returned as originally published If expected_type is specified, the return value is guaranteed to be of that type. Raises: TypeError: If expected_type is provided and the actual data type doesn't match. """ read_value = self._read_measurement(read_source) if expected_type is not None and not isinstance(read_value, expected_type): raise TypeError(f"Expected type {expected_type}, got {type(read_value)}") return read_value
[docs] def create_step(self, step: Step) -> str: """Create a new step in the data store. A step is owned by a test result and is a logical grouping of published measurements and conditions. All measurements and conditions must be associated with a step. Args: step: The metadata of the step to be created. Returns: str: The identifier of the created step. """ create_request = CreateStepRequest(step=step.to_protobuf()) create_response = self._get_data_store_client().create_step(create_request) return create_response.step_id
[docs] def get_step(self, step_id: str) -> Step: """Get the step associated with the given identifier. Args: step_id: The identifier of the desired step. Returns: Step: The metadata of the requested step. """ get_request = GetStepRequest(step_id=step_id) get_response = self._get_data_store_client().get_step(get_request) return Step.from_protobuf(get_response.step)
[docs] def get_measurement(self, measurement_id: str) -> PublishedMeasurement: """Get the measurement associated with the given identifier. Args: measurement_id: The identifier of the desired measurement. Returns: PublishedMeasurement: The metadata of the requested measurement. """ get_request = GetMeasurementRequest(measurement_id=measurement_id) get_response = self._get_data_store_client().get_measurement(get_request) return PublishedMeasurement.from_protobuf(get_response.published_measurement)
[docs] def get_condition(self, condition_id: str) -> PublishedCondition: """Get the condition associated with the given identifier. Args: condition_id: The identifier of the desired condition. Returns: PublishedCondition: The metadata of the requested condition. """ get_request = GetConditionRequest(condition_id=condition_id) get_response = self._get_data_store_client().get_condition(get_request) return PublishedCondition.from_protobuf(get_response.published_condition)
[docs] def create_test_result(self, test_result: TestResult) -> str: """Create a test result object for publishing measurements. Once a test result is created, you can publish an arbitrary number of measurements and conditions to a step which is owned by the test result. Args: test_result: The metadata of the test result to be created. Returns: str: The test result ID. Generated if not specified in the request. """ create_request = CreateTestResultRequest(test_result=test_result.to_protobuf()) create_response = self._get_data_store_client().create_test_result(create_request) return create_response.test_result_id
[docs] def get_test_result(self, test_result_id: str) -> TestResult: """Get the test result associated with the given identifier. Args: test_result_id: The ID of the desired test result. This value is expected to be a parsable GUID. Returns: TestResult: The TestResult object that corresponds to the requested ID. """ get_request = GetTestResultRequest(test_result_id=test_result_id) get_response = self._get_data_store_client().get_test_result(get_request) return TestResult.from_protobuf(get_response.test_result)
[docs] def query_conditions(self, odata_query: str = "") -> Sequence[PublishedCondition]: """Query conditions using OData query syntax. Args: odata_query: An OData query string. Example: "$filter=name eq 'Value'". An empty string will return all conditions. $expand, $count, and $select are not supported. For more information, see https://learn.microsoft.com/en-us/odata/concepts/ queryoptions-overview. Returns: Sequence[PublishedCondition]: The list of matching conditions. Each item contains an id for retrieving the condition measurements, as well as the metadata associated with the condition. """ query_request = QueryConditionsRequest(odata_query=odata_query) query_response = self._get_data_store_client().query_conditions(query_request) return [ PublishedCondition.from_protobuf(published_condition) for published_condition in query_response.published_conditions ]
[docs] def query_measurements(self, odata_query: str = "") -> Sequence[PublishedMeasurement]: """Query measurements using OData query syntax. Args: odata_query: An OData query string. Example: "$filter=name eq 'Value'". An empty string will return all measurements. $expand, $count, and $select are not supported. For more information, see https://learn.microsoft.com/en-us/odata/ concepts/queryoptions-overview. Returns: Sequence[PublishedMeasurement]: The list of matching measurements. Each item contains an id for retrieving the measurement, as well as the metadata associated with the measurement. """ query_request = QueryMeasurementsRequest(odata_query=odata_query) query_response = self._get_data_store_client().query_measurements(query_request) return [ PublishedMeasurement.from_protobuf(published_measurement) for published_measurement in query_response.published_measurements ]
[docs] def query_test_results(self, odata_query: str = "") -> Sequence[TestResult]: """Query test results using OData query syntax. Args: odata_query: An OData query string. Example: "$filter=name eq 'Value'". An empty string will return all test results. $expand, $count, and $select are not supported. For more information, see https://learn.microsoft.com/en-us/odata/ concepts/queryoptions-overview. Returns: Sequence[TestResult]: The list of matching test results. Each item contains the metadata associated with the test result, including test result ID, name, timestamps, and other properties. """ query_request = QueryTestResultsRequest(odata_query=odata_query) query_response = self._get_data_store_client().query_test_results(query_request) return [ TestResult.from_protobuf(test_result) for test_result in query_response.test_results ]
[docs] def query_steps(self, odata_query: str = "") -> Sequence[Step]: """Query for steps matching the given OData query. Args: odata_query: An OData query string. Example: "$filter=name eq 'Value'". An empty string will return all steps. $expand, $count, and $select are not supported. For more information, see https://learn.microsoft.com/en-us/odata/concepts/ queryoptions-overview. Returns: Sequence[Step]: The list of steps that match the query. """ query_request = QueryStepsRequest(odata_query=odata_query) query_response = self._get_data_store_client().query_steps(query_request) return [Step.from_protobuf(step) for step in query_response.steps]
def _get_data_store_client(self) -> DataStoreServiceClient: if self._closed: raise RuntimeError(self._DATA_STORE_CLIENT_CLOSED_ERROR) if self._data_store_client is None: with self._data_store_client_lock: if self._data_store_client is None: self._data_store_client = self._instantiate_data_store_client() return self._data_store_client def _instantiate_data_store_client(self) -> DataStoreServiceClient: return DataStoreServiceClient( discovery_client=self._discovery_client, grpc_channel=self._grpc_channel, grpc_channel_pool=self._grpc_channel_pool, ) def _read_measurement(self, published_measurement: PublishedMeasurement) -> object: request = ReadMeasurementValueRequest(measurement_id=published_measurement.id) response = self._get_data_store_client().read_measurement_value(request) return convert_read_measurement_response_from_protobuf(response) def _read_condition(self, published_condition: PublishedCondition) -> object: request = ReadConditionValueRequest(condition_id=published_condition.id) response = self._get_data_store_client().read_condition_value(request) return convert_read_condition_response_from_protobuf(response)