Source code for ni.datastore.metadata._metadata_store_client

"""Metadata store client for publishing and reading metadata."""

from __future__ import annotations

import logging
import sys
from collections.abc import Sequence
from pathlib import Path
from threading import Lock
from types import TracebackType
from typing import TYPE_CHECKING

from grpc import Channel
from ni.datastore.metadata._types._alias import Alias
from ni.datastore.metadata._types._extension_schema import ExtensionSchema
from ni.datastore.metadata._types._hardware_item import HardwareItem
from ni.datastore.metadata._types._metadata_items import MetadataItems
from ni.datastore.metadata._types._operator import Operator
from ni.datastore.metadata._types._software_item import SoftwareItem
from ni.datastore.metadata._types._test import Test
from ni.datastore.metadata._types._test_adapter import TestAdapter
from ni.datastore.metadata._types._test_description import TestDescription
from ni.datastore.metadata._types._test_station import TestStation
from ni.datastore.metadata._types._uut import Uut
from ni.datastore.metadata._types._uut_instance import UutInstance
from ni.measurementlink.discovery.v1.client import DiscoveryClient
from ni.measurements.metadata.v1.client import (
    MetadataStoreClient as MetadataStoreServiceClient,
)
from ni.measurements.metadata.v1.metadata_store_service_pb2 import (
    CreateAliasRequest,
    CreateFromJsonDocumentRequest,
    CreateHardwareItemRequest,
    CreateOperatorRequest,
    CreateSoftwareItemRequest,
    CreateTestAdapterRequest,
    CreateTestDescriptionRequest,
    CreateTestRequest,
    CreateTestStationRequest,
    CreateUutInstanceRequest,
    CreateUutRequest,
    DeleteAliasRequest,
    GetAliasRequest,
    GetHardwareItemRequest,
    GetOperatorRequest,
    GetSoftwareItemRequest,
    GetTestAdapterRequest,
    GetTestDescriptionRequest,
    GetTestRequest,
    GetTestStationRequest,
    GetUutInstanceRequest,
    GetUutRequest,
    ListSchemasRequest,
    QueryAliasesRequest,
    QueryHardwareItemsRequest,
    QueryOperatorsRequest,
    QuerySoftwareItemsRequest,
    QueryTestAdaptersRequest,
    QueryTestDescriptionsRequest,
    QueryTestsRequest,
    QueryTestStationsRequest,
    QueryUutInstancesRequest,
    QueryUutsRequest,
    RegisterSchemaRequest,
)
from ni_grpc_extensions.channelpool import GrpcChannelPool

if TYPE_CHECKING:
    if sys.version_info >= (3, 11):
        from typing import Self
    else:
        from typing_extensions import Self

_logger = logging.getLogger(__name__)


class MetadataStoreClient:
    """Metadata store client for publishing and reading metadata."""

    __slots__ = (
        "_closed",
        "_discovery_client",
        "_grpc_channel",
        "_grpc_channel_pool",
        "_metadata_store_client",
        "_metadata_store_client_lock",
    )

    _METADATA_STORE_CLIENT_CLOSED_ERROR = (
        "This MetadataStoreClient has been closed. Create a new MetadataStoreClient for "
        "further interaction with the metadata store."
    )

    _closed: bool
    _discovery_client: DiscoveryClient | None
    _grpc_channel: Channel | None
    _grpc_channel_pool: GrpcChannelPool | None
    _metadata_store_client: MetadataStoreServiceClient | None
    _metadata_store_client_lock: Lock

    def __init__(
        self,
        discovery_client: DiscoveryClient | None = None,
        grpc_channel: Channel | None = None,
        grpc_channel_pool: GrpcChannelPool | None = None,
    ) -> None:
        """Initialize the MetadataStoreClient.

        Args:
            discovery_client: An optional discovery client (recommended).

            grpc_channel: An optional metadata store gRPC channel. Providing this channel
                will bypass discovery service resolution of the metadata store.

            grpc_channel_pool: An optional gRPC channel pool (recommended).
        """
        self._discovery_client = discovery_client
        self._grpc_channel = grpc_channel
        self._grpc_channel_pool = grpc_channel_pool

        self._metadata_store_client = None
        self._metadata_store_client_lock = Lock()

        self._closed = False

[docs] def __enter__(self) -> Self: """Enter the runtime context of the metadata store client.""" return self
[docs] def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, traceback: TracebackType | None, ) -> None: """Exit the runtime context of the metadata store client.""" self.close()
[docs] def close(self) -> None: """Close the metadata store client and clean up resources that it owns.""" self._closed = True with self._metadata_store_client_lock: if self._metadata_store_client is not None: self._metadata_store_client.close() self._metadata_store_client = None
[docs] def create_uut_instance(self, uut_instance: UutInstance) -> str: """Create a new UUT instance in the metadata store. Args: uut_instance: The metadata of the UUT instance to be created. Returns: str: The identifier of the created UUT instance. """ create_request = CreateUutInstanceRequest(uut_instance=uut_instance.to_protobuf()) create_response = self._get_metadata_store_client().create_uut_instance(create_request) return create_response.uut_instance_id
[docs] def get_uut_instance(self, uut_instance_id: str) -> UutInstance: """Get the UUT instance associated with the given identifier. Args: uut_instance_id: The identifier of the desired UUT instance. Returns: UutInstance: The metadata of the requested UUT instance. """ get_request = GetUutInstanceRequest(uut_instance_id=uut_instance_id) get_response = self._get_metadata_store_client().get_uut_instance(get_request) return UutInstance.from_protobuf(get_response.uut_instance)
[docs] def query_uut_instances(self, odata_query: str = "") -> Sequence[UutInstance]: """Perform an OData query on UUT instances. Args: odata_query: An OData query string. Example: "$filter=name eq 'Value'". $expand is not supported. Returns: Sequence[UutInstance]: The list of UUT instances that match the query. """ query_request = QueryUutInstancesRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_uut_instances(query_request) return [ UutInstance.from_protobuf(uut_instance) for uut_instance in query_response.uut_instances ]
[docs] def create_uut(self, uut: Uut) -> str: """Create a new UUT in the metadata store. Args: uut: The metadata of the UUT to be created. Returns: str: The identifier of the created UUT. """ create_request = CreateUutRequest(uut=uut.to_protobuf()) create_response = self._get_metadata_store_client().create_uut(create_request) return create_response.uut_id
[docs] def get_uut(self, uut_id: str) -> Uut: """Get the UUT associated with the given identifier. Args: uut_id: The identifier of the desired UUT. Returns: Uut: The metadata of the requested UUT. """ get_request = GetUutRequest(uut_id=uut_id) get_response = self._get_metadata_store_client().get_uut(get_request) return Uut.from_protobuf(get_response.uut)
[docs] def query_uuts(self, odata_query: str = "") -> Sequence[Uut]: """Perform an OData query on UUTs. Args: odata_query: An OData query string. Example: "$filter=name eq 'Value'". $expand is not supported. Returns: Sequence[Uut]: The list of UUTs that match the query. """ query_request = QueryUutsRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_uuts(query_request) return [Uut.from_protobuf(uut) for uut in query_response.uuts]
[docs] def create_operator(self, operator: Operator) -> str: """Create a new operator in the metadata store. Args: operator: The metadata of the operator to be created. Returns: str: The identifier of the created operator. """ create_request = CreateOperatorRequest(operator=operator.to_protobuf()) create_response = self._get_metadata_store_client().create_operator(create_request) return create_response.operator_id
[docs] def get_operator(self, operator_id: str) -> Operator: """Get the operator associated with the given identifier. Args: operator_id: The identifier of the desired operator. Returns: Operator: The metadata of the requested operator. """ get_request = GetOperatorRequest(operator_id=operator_id) get_response = self._get_metadata_store_client().get_operator(get_request) return Operator.from_protobuf(get_response.operator)
[docs] def query_operators(self, odata_query: str = "") -> Sequence[Operator]: """Perform an OData query on operators. Args: odata_query: An OData query string. Example: "$filter=name eq 'Value'". $expand is not supported. Returns: Sequence[Operator]: The list of operators that match the query. """ query_request = QueryOperatorsRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_operators(query_request) return [Operator.from_protobuf(operator) for operator in query_response.operators]
[docs] def create_test_description(self, test_description: TestDescription) -> str: """Create a test description in the metadata store. Args: test_description: The metadata of the test description to be created. Returns: str: The identifier of the created test description. """ create_request = CreateTestDescriptionRequest( test_description=test_description.to_protobuf() ) create_response = self._get_metadata_store_client().create_test_description(create_request) return create_response.test_description_id
[docs] def get_test_description(self, test_description_id: str) -> TestDescription: """Get a test description from the metadata store. Args: test_description_id: The identifier of the desired test description. Returns: TestDescription: The metadata of the requested test description. """ get_request = GetTestDescriptionRequest(test_description_id=test_description_id) get_response = self._get_metadata_store_client().get_test_description(get_request) return TestDescription.from_protobuf(get_response.test_description)
[docs] def query_test_descriptions(self, odata_query: str = "") -> Sequence[TestDescription]: """Query test descriptions from the metadata store. Args: odata_query: An OData query string. Example: "$filter=name eq 'Value'". $expand is not supported. Returns: Sequence[TestDescription]: The list of test descriptions that match the query. """ query_request = QueryTestDescriptionsRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_test_descriptions(query_request) return [ TestDescription.from_protobuf(test_description) for test_description in query_response.test_descriptions ]
[docs] def create_test(self, test: Test) -> str: """Create a test in the metadata store. Args: test: The metadata of the test to be created. Returns: str: The identifier of the created test. """ create_request = CreateTestRequest(test=test.to_protobuf()) create_response = self._get_metadata_store_client().create_test(create_request) return create_response.test_id
[docs] def get_test(self, test_id: str) -> Test: """Get a test from the metadata store. Args: test_id: The identifier of the desired test. Returns: Test: The metadata of the requested test. """ get_request = GetTestRequest(test_id=test_id) get_response = self._get_metadata_store_client().get_test(get_request) return Test.from_protobuf(get_response.test)
[docs] def query_tests(self, odata_query: str = "") -> Sequence[Test]: """Query tests from the metadata store.""" query_request = QueryTestsRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_tests(query_request) return [Test.from_protobuf(test) for test in query_response.tests]
[docs] def create_test_station(self, test_station: TestStation) -> str: """Create a test station in the metadata store. Args: test_station: The metadata of the test station to be created. Returns: str: The identifier of the created test station. """ create_request = CreateTestStationRequest(test_station=test_station.to_protobuf()) create_response = self._get_metadata_store_client().create_test_station(create_request) return create_response.test_station_id
[docs] def get_test_station(self, test_station_id: str) -> TestStation: """Get a test station from the metadata store. Args: test_station_id: The identifier of the desired test station. Returns: TestStation: The metadata of the requested test station. """ get_request = GetTestStationRequest(test_station_id=test_station_id) get_response = self._get_metadata_store_client().get_test_station(get_request) return TestStation.from_protobuf(get_response.test_station)
[docs] def query_test_stations(self, odata_query: str = "") -> Sequence[TestStation]: """Query test stations from the metadata store.""" query_request = QueryTestStationsRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_test_stations(query_request) return [ TestStation.from_protobuf(test_station) for test_station in query_response.test_stations ]
[docs] def create_hardware_item(self, hardware_item: HardwareItem) -> str: """Create a hardware item in the metadata store. Args: hardware_item: The metadata of the hardware item to be created. Returns: str: The identifier of the created hardware item. """ create_request = CreateHardwareItemRequest(hardware_item=hardware_item.to_protobuf()) create_response = self._get_metadata_store_client().create_hardware_item(create_request) return create_response.hardware_item_id
[docs] def get_hardware_item(self, hardware_item_id: str) -> HardwareItem: """Get a hardware item from the metadata store. Args: hardware_item_id: The identifier of the desired hardware item. Returns: HardwareItem: The metadata of the requested hardware item. """ get_request = GetHardwareItemRequest(hardware_item_id=hardware_item_id) get_response = self._get_metadata_store_client().get_hardware_item(get_request) return HardwareItem.from_protobuf(get_response.hardware_item)
[docs] def query_hardware_items(self, odata_query: str = "") -> Sequence[HardwareItem]: """Query hardware items from the metadata store.""" query_request = QueryHardwareItemsRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_hardware_items(query_request) return [ HardwareItem.from_protobuf(hardware_item) for hardware_item in query_response.hardware_items ]
[docs] def create_software_item(self, software_item: SoftwareItem) -> str: """Create a software item in the metadata store. Args: software_item: The metadata of the software item to be created. Returns: str: The identifier of the created software item. """ create_request = CreateSoftwareItemRequest(software_item=software_item.to_protobuf()) create_response = self._get_metadata_store_client().create_software_item(create_request) return create_response.software_item_id
[docs] def get_software_item(self, software_item_id: str) -> SoftwareItem: """Get a software item from the metadata store. Args: software_item_id: The identifier of the desired software item. Returns: SoftwareItem: The metadata of the requested software item. """ get_request = GetSoftwareItemRequest(software_item_id=software_item_id) get_response = self._get_metadata_store_client().get_software_item(get_request) return SoftwareItem.from_protobuf(get_response.software_item)
[docs] def query_software_items(self, odata_query: str = "") -> Sequence[SoftwareItem]: """Query software items from the metadata store.""" query_request = QuerySoftwareItemsRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_software_items(query_request) return [ SoftwareItem.from_protobuf(software_item) for software_item in query_response.software_items ]
[docs] def create_test_adapter(self, test_adapter: TestAdapter) -> str: """Create a test adapter in the metadata store. Args: test_adapter: The metadata of the test adapter to be created. Returns: str: The identifier of the created test adapter. """ create_request = CreateTestAdapterRequest(test_adapter=test_adapter.to_protobuf()) create_response = self._get_metadata_store_client().create_test_adapter(create_request) return create_response.test_adapter_id
[docs] def get_test_adapter(self, test_adapter_id: str) -> TestAdapter: """Get a test adapter from the metadata store. Args: test_adapter_id: The identifier of the desired test adapter. Returns: TestAdapter: The metadata of the requested test adapter. """ get_request = GetTestAdapterRequest(test_adapter_id=test_adapter_id) get_response = self._get_metadata_store_client().get_test_adapter(get_request) return TestAdapter.from_protobuf(get_response.test_adapter)
[docs] def query_test_adapters(self, odata_query: str = "") -> Sequence[TestAdapter]: """Query test adapters from the metadata store.""" query_request = QueryTestAdaptersRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_test_adapters(query_request) return [ TestAdapter.from_protobuf(test_adapter) for test_adapter in query_response.test_adapters ]
[docs] def register_schema_from_file(self, schema_file_path: Path | str) -> str: """Register a schema obtained from the specified file in the metadata store. Args: schema_file_path: The path at which the schema file is located Raises: FileNotFoundError: If the schema file does not exist. """ if isinstance(schema_file_path, str): schema_file_path = Path(schema_file_path) if not schema_file_path.exists(): raise FileNotFoundError(f"Schema file not found: {schema_file_path}") schema_contents = schema_file_path.read_text(encoding="utf-8-sig") return self.register_schema(schema_contents=schema_contents)
[docs] def register_schema(self, schema_contents: str) -> str: """Register a schema in the metadata store. Once a schema has been published, it cannot be modified or removed. Args: schema_contents: The contents of the JSON or TOML schema. This should be a well-formed JSON or TOML schema. Validation will be performed, and an error will be returned if the schema is not valid. Returns: str: The ID of the schema. """ register_request = RegisterSchemaRequest(schema=schema_contents) register_response = self._get_metadata_store_client().register_schema(register_request) return register_response.schema_id
[docs] def list_schemas(self) -> Sequence[ExtensionSchema]: """List the schemas that have been previously registered. Returns: Sequence[ExtensionSchema]: The list of registered schemas. """ list_request = ListSchemasRequest() list_response = self._get_metadata_store_client().list_schemas(list_request) return [ExtensionSchema.from_protobuf(schema) for schema in list_response.schemas]
[docs] def get_alias(self, alias_name: str) -> Alias: """Get an alias and its target (the underlying metadata it represents). Args: alias_name: The name of the alias to retrieve. Returns: Alias: The alias containing the alias name, target type, and target ID of the underlying metadata. """ get_request = GetAliasRequest(alias_name=alias_name) get_response = self._get_metadata_store_client().get_alias(get_request) return Alias.from_protobuf(get_response.alias)
[docs] def query_aliases(self, odata_query: str = "") -> Sequence[Alias]: """Perform an OData query on the registered aliases. Args: odata_query: An OData query string. Example: "$filter=name eq 'Value'". $expand is not supported. Returns: Sequence[Alias]: The list of aliases that match the query. """ query_request = QueryAliasesRequest(odata_query=odata_query) query_response = self._get_metadata_store_client().query_aliases(query_request) return [Alias.from_protobuf(alias) for alias in query_response.aliases]
[docs] def create_alias( self, alias_name: str, alias_target: ( UutInstance | Uut | HardwareItem | SoftwareItem | Operator | TestDescription | Test | TestAdapter | TestStation ), ) -> Alias: """Create (register) an alias of the specified metadata. The specified metadata must have already been created prior to the alias registration. This method may be called with an already registered alias name in order to update the target mapped for that existing alias. Args: alias_name: The alias name to register. alias_target: The metadata instance to alias. The metadata instance to alias must have already been created in the metadata store in order to register an alias for it. Returns: Alias: The created alias containing the alias name, target type, and target ID. """ create_request = CreateAliasRequest(alias_name=alias_name) if isinstance(alias_target, UutInstance): create_request.uut_instance.CopyFrom(alias_target.to_protobuf()) elif isinstance(alias_target, Uut): create_request.uut.CopyFrom(alias_target.to_protobuf()) elif isinstance(alias_target, HardwareItem): create_request.hardware_item.CopyFrom(alias_target.to_protobuf()) elif isinstance(alias_target, SoftwareItem): create_request.software_item.CopyFrom(alias_target.to_protobuf()) elif isinstance(alias_target, Operator): create_request.operator.CopyFrom(alias_target.to_protobuf()) elif isinstance(alias_target, TestDescription): create_request.test_description.CopyFrom(alias_target.to_protobuf()) elif isinstance(alias_target, Test): create_request.test.CopyFrom(alias_target.to_protobuf()) elif isinstance(alias_target, TestAdapter): create_request.test_adapter.CopyFrom(alias_target.to_protobuf()) elif isinstance(alias_target, TestStation): create_request.test_station.CopyFrom(alias_target.to_protobuf()) response = self._get_metadata_store_client().create_alias(create_request) return Alias.from_protobuf(response.alias)
[docs] def delete_alias(self, alias_name: str) -> bool: """Remove a registered alias. Args: alias_name: The name of the alias to unregister. Returns: bool: Whether the action resulted in the specified alias becoming unregistered. False if the alias does not exist. """ delete_request = DeleteAliasRequest(alias_name=alias_name) delete_response = self._get_metadata_store_client().delete_alias(delete_request) return delete_response.unregistered
[docs] def create_from_json_file(self, metadata_file_path: Path | str) -> MetadataItems: """Create metadata items from a JSON file. Args: metadata_file_path: The path to the JSON file containing metadata definitions. Returns: MetadataItems: A collection of metadata items created from the JSON document. Raises: FileNotFoundError: If the JSON file does not exist. """ if isinstance(metadata_file_path, str): metadata_file_path = Path(metadata_file_path) if not metadata_file_path.exists(): raise FileNotFoundError(f"Metadata file not found: {metadata_file_path}") metadata_contents = metadata_file_path.read_text(encoding="utf-8-sig") return self.create_from_json(metadata_contents)
[docs] def create_from_json(self, metadata_file_contents: str) -> MetadataItems: """Create metadata items from a JSON document. Args: metadata_file_contents: The JSON document content containing metadata definitions. Returns: MetadataItems: A collection of metadata items created from the JSON document. """ create_request = CreateFromJsonDocumentRequest(json_document=metadata_file_contents) create_response = self._get_metadata_store_client().create_from_json_document( create_request ) return MetadataItems.from_protobuf(create_response)
def _get_metadata_store_client(self) -> MetadataStoreServiceClient: if self._closed: raise RuntimeError(self._METADATA_STORE_CLIENT_CLOSED_ERROR) if self._metadata_store_client is None: with self._metadata_store_client_lock: if self._metadata_store_client is None: self._metadata_store_client = self._instantiate_metadata_store_client() return self._metadata_store_client def _instantiate_metadata_store_client(self) -> MetadataStoreServiceClient: return MetadataStoreServiceClient( discovery_client=self._discovery_client, grpc_channel=self._grpc_channel, grpc_channel_pool=self._grpc_channel_pool, )