diff --git a/README.md b/README.md index 36aa365..02b1672 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,41 @@ uv run interrogate -vv src/oshconnect # per-symbol (shows which symbols Once we agree on a baseline, raise `[tool.interrogate].fail-under` from `0` so new code without docstrings starts failing locally and in CI. +## OGC Format Serialization + +Format-explicit conversion methods on the wrapper classes (`System`, +`Datastream`, `ControlStream`) and the underlying pydantic resource models. +Use these to round-trip CS API server JSON in **SML+JSON**, **OM+JSON**, and +**SWE+JSON** without having to remember the `model_dump(by_alias=True, …)` +incantation, and to construct OSHConnect wrappers from raw server payloads. + +```python +from oshconnect import Node, System, Datastream + +node = Node(protocol="http", address="localhost", port=8282) + +# Build a System from an SML+JSON server response +sys_dict = {"type": "PhysicalSystem", "uniqueId": "urn:test:1", "label": "Sensor"} +sys = System.from_csapi_dict(sys_dict, node) # auto-detects SML vs GeoJSON +sys.to_smljson_dict() # -> dict ready to POST + +# Build a Datastream from a CS API listing entry +ds = Datastream.from_csapi_dict(ds_json, node) +ds.to_csapi_dict() # the resource body +ds.schema_to_swejson_dict() # the SWE+JSON schema doc +ds.observation_to_omjson_dict({"temperature": 22.5}) # one OM+JSON observation + +# Single observations / commands +from oshconnect.resource_datamodels import ObservationResource +obs = ObservationResource.from_omjson_dict(om_json_payload) +obs.to_swejson_dict() # flat SWE+JSON record +``` + +The two older static factories `System.from_system_resource` and +`Datastream.from_resource` are deprecated in favor of `from_csapi_dict` and +emit `DeprecationWarning` on use. They'll be removed in a future major +version. + ## Generating the Docs The documentation is built with [MkDocs](https://www.mkdocs.org/) using the diff --git a/pyproject.toml b/pyproject.toml index 13f12ec..23feae1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oshconnect" -version = "0.5.0a1" +version = "0.5.1a0" description = "Library for interfacing with OSH, helping guide visualization efforts, and providing a place to store configurations. Implements OGC CS API Part 3 (Pub/Sub) MQTT topic conventions including :data topics and resource event topics." readme = "README.md" authors = [ diff --git a/src/oshconnect/resource_datamodels.py b/src/oshconnect/resource_datamodels.py index 8262b9a..a18bd8d 100644 --- a/src/oshconnect/resource_datamodels.py +++ b/src/oshconnect/resource_datamodels.py @@ -6,7 +6,8 @@ # ============================================================================== from __future__ import annotations -from typing import List +import json +from typing import List, TYPE_CHECKING from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validator from shapely import Point @@ -16,6 +17,9 @@ from .schema_datamodels import DatastreamRecordSchema, CommandSchema from .timemanagement import TimeInstant, TimePeriod +if TYPE_CHECKING: + from .swe_components import AnyComponent + class BoundingBox(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) @@ -132,6 +136,59 @@ class SystemResource(BaseModel): modes: List[Mode] = Field(None) method: ProcessMethod = Field(None) + def to_smljson_dict(self) -> dict: + """Render this system as an `application/sml+json` dict (SensorML JSON encoding). + + Sets ``feature_type = "PhysicalSystem"`` to match the SML discriminator + before dumping. Output keys are camelCase per the CS API wire format. + """ + self.feature_type = "PhysicalSystem" + return self.model_dump(by_alias=True, exclude_none=True, mode='json') + + def to_smljson(self) -> str: + """JSON-string variant of `to_smljson_dict`.""" + return json.dumps(self.to_smljson_dict()) + + def to_geojson_dict(self) -> dict: + """Render this system as an `application/geo+json` dict. + + Sets ``feature_type = "Feature"`` to match the GeoJSON discriminator + before dumping. Useful when posting to endpoints that expect the + GeoJSON Feature shape. + """ + self.feature_type = "Feature" + return self.model_dump(by_alias=True, exclude_none=True, mode='json') + + def to_geojson(self) -> str: + """JSON-string variant of `to_geojson_dict`.""" + return json.dumps(self.to_geojson_dict()) + + @classmethod + def from_smljson_dict(cls, data: dict) -> "SystemResource": + """Build a `SystemResource` from an `application/sml+json` dict + (e.g., a CS API server response body for a system in SML form).""" + return cls.model_validate(data, by_alias=True) + + @classmethod + def from_geojson_dict(cls, data: dict) -> "SystemResource": + """Build a `SystemResource` from an `application/geo+json` dict + (e.g., a CS API server response body for a system in GeoJSON form).""" + return cls.model_validate(data, by_alias=True) + + @classmethod + def from_csapi_dict(cls, data: dict) -> "SystemResource": + """Build a `SystemResource` from a CS API system dict, auto-dispatching + on the ``type`` field: ``"PhysicalSystem"`` → SML+JSON path, + ``"Feature"`` → GeoJSON path. Anything else falls through to a + permissive validate. + """ + feature_type = data.get("type") + if feature_type == "PhysicalSystem": + return cls.from_smljson_dict(data) + if feature_type == "Feature": + return cls.from_geojson_dict(data) + return cls.model_validate(data, by_alias=True) + class DatastreamResource(BaseModel): """ @@ -175,6 +232,25 @@ def handle_aliases(cls, values): break return values + def to_csapi_dict(self) -> dict: + """Render this datastream as the CS API `application/json` resource + body. The embedded ``schema`` field is dumped polymorphically per + whichever variant (`SWEDatastreamRecordSchema` / + `JSONDatastreamRecordSchema`) it holds. + """ + return self.model_dump(by_alias=True, exclude_none=True, mode='json') + + def to_csapi_json(self) -> str: + """JSON-string variant of `to_csapi_dict`.""" + return json.dumps(self.to_csapi_dict()) + + @classmethod + def from_csapi_dict(cls, data: dict) -> "DatastreamResource": + """Build a `DatastreamResource` from a CS API datastream dict + (e.g., a server response body or an entry from a /datastreams + listing).""" + return cls.model_validate(data, by_alias=True) + class ObservationResource(BaseModel): model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) @@ -187,6 +263,84 @@ class ObservationResource(BaseModel): result: dict = Field(...) result_link: Link = Field(None, alias="result@link") + def to_omjson_dict(self, datastream_id: str | None = None) -> dict: + """Render this observation as an `application/om+json` dict + (the ``ObservationOMJSONInline`` shape). + + :param datastream_id: Optional ID to include as ``datastream@id`` + on the output. The CS API typically supplies this from URL + context, so it's not required on the model itself. + """ + from .schema_datamodels import ObservationOMJSONInline + kwargs = {"result": self.result} + if datastream_id is not None: + kwargs["datastream_id"] = datastream_id + if self.phenomenon_time: + kwargs["phenomenon_time"] = self.phenomenon_time.get_iso_time() + if self.result_time: + kwargs["result_time"] = self.result_time.get_iso_time() + if self.parameters is not None: + kwargs["parameters"] = self.parameters + wrapper = ObservationOMJSONInline(**kwargs) + return wrapper.model_dump(by_alias=True, exclude_none=True, mode='json') + + def to_swejson_dict(self, schema: "AnyComponent" = None) -> dict: + """Render this observation as an `application/swe+json` payload + (the SWE Common JSON encoding of one record). + + SWE+JSON encodes a single observation as a flat JSON object whose + keys are the schema field names; ``self.result`` is already that + dict, so this is essentially a passthrough. The optional + ``schema`` argument is accepted for forward compatibility (when + we add field-order / encoding-aware emission). + """ + # ``schema`` reserved for future encoding rules (vector-as-arrays, + # JSONEncoding handling, etc.); current behavior is passthrough. + del schema + return dict(self.result) if self.result is not None else {} + + @classmethod + def from_omjson_dict(cls, data: dict) -> "ObservationResource": + """Build an `ObservationResource` from an `application/om+json` dict. + + Parses through `ObservationOMJSONInline` to validate the OM+JSON + envelope, then strips the ``datastream@id`` / ``foi@id`` envelope + fields (those live on the surrounding context, not the resource) + and returns the inner observation. + """ + from .schema_datamodels import ObservationOMJSONInline + wrapper = ObservationOMJSONInline.model_validate(data) + kwargs = { + "result_time": TimeInstant.from_string(wrapper.result_time), + "result": wrapper.result, + } + if wrapper.phenomenon_time: + kwargs["phenomenon_time"] = TimeInstant.from_string(wrapper.phenomenon_time) + if wrapper.parameters is not None: + kwargs["parameters"] = wrapper.parameters + return cls(**kwargs) + + @classmethod + def from_swejson_dict(cls, data: dict, schema: "AnyComponent" = None, + result_time: str | None = None) -> "ObservationResource": + """Build an `ObservationResource` from an `application/swe+json` + observation payload. + + SWE+JSON observations don't carry an envelope (no ``resultTime`` / + ``phenomenonTime`` fields); pass ``result_time`` explicitly when + you have it, otherwise the current UTC time is used. + + :param data: The flat SWE+JSON record dict. + :param schema: Optional schema, reserved for future per-field + type coercion. Currently ignored. + :param result_time: ISO 8601 timestamp for ``resultTime``; + defaults to ``TimeInstant.now_as_time_instant().isoformat()`` + if omitted. + """ + del schema # future use + rt = TimeInstant.from_string(result_time) if result_time is not None else TimeInstant.now_as_time_instant() + return cls(result_time=rt, result=dict(data)) + class ControlStreamResource(BaseModel): model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) @@ -206,3 +360,22 @@ class ControlStreamResource(BaseModel): asynchronous: bool = Field(True, alias="async") command_schema: SerializeAsAny[CommandSchema] = Field(None, alias="schema") links: List[Link] = Field(None) + + def to_csapi_dict(self) -> dict: + """Render this control stream as the CS API `application/json` + resource body. The embedded ``schema`` field is dumped + polymorphically per whichever variant + (`SWEJSONCommandSchema` / `JSONCommandSchema`) it holds. + """ + return self.model_dump(by_alias=True, exclude_none=True, mode='json') + + def to_csapi_json(self) -> str: + """JSON-string variant of `to_csapi_dict`.""" + return json.dumps(self.to_csapi_dict()) + + @classmethod + def from_csapi_dict(cls, data: dict) -> "ControlStreamResource": + """Build a `ControlStreamResource` from a CS API control-stream dict + (e.g., a server response body or an entry from a /controlstreams + listing).""" + return cls.model_validate(data, by_alias=True) diff --git a/src/oshconnect/schema_datamodels.py b/src/oshconnect/schema_datamodels.py index a1ff338..d000710 100644 --- a/src/oshconnect/schema_datamodels.py +++ b/src/oshconnect/schema_datamodels.py @@ -17,6 +17,12 @@ from .geometry import Geometry from .swe_components import AnyComponent, check_named + +def _dump_csapi(model: BaseModel) -> dict: + """Internal: canonical CS API serialization (alias keys, exclude None, JSON-mode).""" + return model.model_dump(by_alias=True, exclude_none=True, mode='json') + + """ In many of the top level resource models there is a "schema" field of some description. These models are meant to ease the burden on the end user to create those. @@ -33,6 +39,15 @@ class CommandJSON(BaseModel): sender: str = Field(None) params: Union[dict, list, int, float, str] = Field(None) + def to_csapi_dict(self) -> dict: + """Render as the CS API `application/json` command body.""" + return _dump_csapi(self) + + @classmethod + def from_csapi_dict(cls, data: dict) -> "CommandJSON": + """Build from a CS API command JSON dict.""" + return cls.model_validate(data) + class CommandSchema(BaseModel): """ @@ -58,6 +73,15 @@ def _root_record_schema_requires_name(self): check_named(self.record_schema, "SWEJSONCommandSchema.recordSchema") return self + def to_swejson_dict(self) -> dict: + """Render as an `application/swe+json` command-schema document.""" + return _dump_csapi(self) + + @classmethod + def from_swejson_dict(cls, data: dict) -> "SWEJSONCommandSchema": + """Build from an `application/swe+json` command-schema dict.""" + return cls.model_validate(data, by_alias=True) + class JSONCommandSchema(CommandSchema): """ @@ -79,6 +103,15 @@ def _root_schemas_require_name(self): check_named(self.feasibility_schema, "JSONCommandSchema.feasibilityResultSchema") return self + def to_json_dict(self) -> dict: + """Render as an `application/json` command-schema document.""" + return _dump_csapi(self) + + @classmethod + def from_json_dict(cls, data: dict) -> "JSONCommandSchema": + """Build from an `application/json` command-schema dict.""" + return cls.model_validate(data, by_alias=True) + class DatastreamRecordSchema(BaseModel): """ @@ -111,6 +144,16 @@ def _root_record_schema_requires_name(self): check_named(self.record_schema, "SWEDatastreamRecordSchema.recordSchema") return self + def to_swejson_dict(self) -> dict: + """Render as an `application/swe+json` datastream-schema document.""" + return _dump_csapi(self) + + @classmethod + def from_swejson_dict(cls, data: dict) -> "SWEDatastreamRecordSchema": + """Build from an `application/swe+json` datastream-schema dict + (e.g., a CS API ``/datastreams/{id}/schema`` response in SWE form).""" + return cls.model_validate(data, by_alias=True) + class JSONDatastreamRecordSchema(DatastreamRecordSchema): """Datastream observation schema for the JSON media types @@ -144,19 +187,39 @@ def _root_schemas_require_name(self): check_named(self.parameters_schema, "JSONDatastreamRecordSchema.parametersSchema") return self + def to_omjson_dict(self) -> dict: + """Render as an `application/om+json` datastream-schema document.""" + return _dump_csapi(self) + + @classmethod + def from_omjson_dict(cls, data: dict) -> "JSONDatastreamRecordSchema": + """Build from an `application/om+json` (or `application/json`) + datastream-schema dict (e.g., a CS API ``/datastreams/{id}/schema`` + response in OM+JSON form).""" + return cls.model_validate(data, by_alias=True) + class ObservationOMJSONInline(BaseModel): """ A class to represent an observation in OM-JSON format """ model_config = ConfigDict(populate_by_name=True) - datastream_id: str = Field(None, serialization_alias="datastream@id") - foi_id: str = Field(None, serialization_alias="foi@id") - phenomenon_time: str = Field(None, serialization_alias="phenomenonTime") - result_time: str = Field(datetime.now().isoformat(), serialization_alias="resultTime") + datastream_id: str = Field(None, alias="datastream@id") + foi_id: str = Field(None, alias="foi@id") + phenomenon_time: str = Field(None, alias="phenomenonTime") + result_time: str = Field(datetime.now().isoformat(), alias="resultTime") parameters: dict = Field(None) result: Union[int, float, str, dict, list] = Field(...) - result_links: List[Link] = Field(None, serialization_alias="result@links") + result_links: List[Link] = Field(None, alias="result@links") + + def to_csapi_dict(self) -> dict: + """Render as an `application/om+json` observation body.""" + return _dump_csapi(self) + + @classmethod + def from_csapi_dict(cls, data: dict) -> "ObservationOMJSONInline": + """Build from an `application/om+json` observation dict.""" + return cls.model_validate(data) class SystemEventOMJSON(BaseModel): diff --git a/src/oshconnect/streamableresource.py b/src/oshconnect/streamableresource.py index 80f9709..e10de9a 100644 --- a/src/oshconnect/streamableresource.py +++ b/src/oshconnect/streamableresource.py @@ -48,6 +48,7 @@ import logging import traceback import uuid +import warnings from abc import ABC from dataclasses import dataclass, field from enum import Enum @@ -243,6 +244,8 @@ def __init__(self, protocol: str, address: str, port: int, if self.is_secure: self._api_helper.user_auth = True self._systems = [] + # Default to no client session; populated by `register_with_session_manager`. + self._client_session = None if session_manager is not None: session_task = self.register_with_session_manager(session_manager) asyncio.gather(session_task) @@ -363,10 +366,12 @@ def register_streamable(self, streamable: StreamableResource): is driven by `OSHClientSession.connect_streamables` / `close_streamables`. - :raises ValueError: if the node was created without a SessionManager. + Soft no-op when no `SessionManager` was attached at construction; + the caller can still drive the streamable manually via + `initialize()` / `start()` / `stop()`. """ if self._client_session is None: - raise ValueError("Node is not registered with a SessionManager.") + return self._client_session.register_streamable(streamable) def get_session(self) -> OSHClientSession: @@ -992,30 +997,93 @@ def discover_controlstreams(self) -> list[ControlStream]: return controlstreams + @classmethod + def _construct_from_resource(cls, system_resource: SystemResource, parent_node: Node) -> "System": + """Build a `System` from a parsed `SystemResource`. Internal helper + shared by `from_csapi_dict` / `from_smljson_dict` / `from_geojson_dict` + and the deprecated `from_system_resource`. + """ + # exclude_none avoids triggering TimePeriod.ser_model on None-valued + # optional time fields (it does `str(self.start)` unconditionally). + other_props = system_resource.model_dump(exclude_none=True) + # GeoJSON form carries name/uid under properties; SML form has + # label/uid directly on the resource. + if other_props.get('properties'): + props = other_props['properties'] + new_system = cls(name=props.get('name'), + label=props.get('name'), + urn=props.get('uid'), + resource_id=system_resource.system_id, parent_node=parent_node) + else: + new_system = cls(name=system_resource.label, + label=system_resource.label, urn=system_resource.uid, + resource_id=system_resource.system_id, parent_node=parent_node) + + new_system.set_system_resource(system_resource) + return new_system + @staticmethod def from_system_resource(system_resource: SystemResource, parent_node: Node) -> System: """Build a `System` from an already-parsed `SystemResource`. + .. deprecated:: 0.5.1 + Use :meth:`System.from_csapi_dict` (auto-detect), + :meth:`System.from_smljson_dict`, or + :meth:`System.from_geojson_dict` instead. Those accept the raw + CS API dict directly without the manual `model_validate` step. + Handles both shapes the OSH server emits: the GeoJSON form (with a - ``properties`` block carrying ``name``/``uid``) and the flat form - (``name``/``label``/``urn`` directly on the resource). - """ - other_props = system_resource.model_dump() - print(f'Props of SystemResource: {other_props}') - - # case 1: has properties a la geojson - if 'properties' in other_props: - new_system = System(name=other_props['properties']['name'], - label=other_props['properties']['name'], - urn=other_props['properties']['uid'], - resource_id=system_resource.system_id, parent_node=parent_node) - else: - new_system = System(name=system_resource.name, - label=system_resource.label, urn=system_resource.urn, - resource_id=system_resource.system_id, parent_node=parent_node) + ``properties`` block carrying ``name``/``uid``) and the SML form + (``label``/``uid`` directly on the resource). + """ + warnings.warn( + "System.from_system_resource is deprecated; use System.from_csapi_dict " + "(auto-detect), from_smljson_dict, or from_geojson_dict instead.", + DeprecationWarning, stacklevel=2, + ) + return System._construct_from_resource(system_resource, parent_node) - new_system.set_system_resource(system_resource) - return new_system + @classmethod + def from_smljson_dict(cls, data: dict, parent_node: Node) -> "System": + """Build a `System` from an `application/sml+json` dict (e.g., a + CS API server response body for a system in SML form).""" + resource = SystemResource.from_smljson_dict(data) + return cls._construct_from_resource(resource, parent_node) + + @classmethod + def from_geojson_dict(cls, data: dict, parent_node: Node) -> "System": + """Build a `System` from an `application/geo+json` dict (e.g., a + CS API server response body for a system in GeoJSON form).""" + resource = SystemResource.from_geojson_dict(data) + return cls._construct_from_resource(resource, parent_node) + + @classmethod + def from_csapi_dict(cls, data: dict, parent_node: Node) -> "System": + """Build a `System` from any CS API system dict, auto-dispatching on + the ``type`` field (``"PhysicalSystem"`` → SML+JSON, + ``"Feature"`` → GeoJSON, anything else → permissive validate).""" + resource = SystemResource.from_csapi_dict(data) + return cls._construct_from_resource(resource, parent_node) + + def to_smljson_dict(self) -> dict: + """Render this system as an `application/sml+json` dict + (SensorML JSON) ready to POST to a CS API ``/systems`` endpoint.""" + return self._underlying_resource.to_smljson_dict() if self._underlying_resource \ + else self.to_system_resource().to_smljson_dict() + + def to_smljson(self) -> str: + """JSON-string variant of `to_smljson_dict`.""" + return json.dumps(self.to_smljson_dict()) + + def to_geojson_dict(self) -> dict: + """Render this system as an `application/geo+json` dict + (GeoJSON Feature shape).""" + return self._underlying_resource.to_geojson_dict() if self._underlying_resource \ + else self.to_system_resource().to_geojson_dict() + + def to_geojson(self) -> str: + """JSON-string variant of `to_geojson_dict`.""" + return json.dumps(self.to_geojson_dict()) def to_system_resource(self) -> SystemResource: """Render this `System` as a `SystemResource` pydantic model @@ -1252,10 +1320,102 @@ def get_id(self) -> str: @staticmethod def from_resource(ds_resource: DatastreamResource, parent_node: Node) -> 'Datastream': - """Build a `Datastream` from an already-parsed `DatastreamResource`.""" + """Build a `Datastream` from an already-parsed `DatastreamResource`. + + .. deprecated:: 0.5.1 + Use :meth:`Datastream.from_csapi_dict` instead, which accepts + the raw CS API dict directly without the manual `model_validate` + step. + """ + warnings.warn( + "Datastream.from_resource is deprecated; use Datastream.from_csapi_dict instead.", + DeprecationWarning, stacklevel=2, + ) new_ds = Datastream(parent_node=parent_node, datastream_resource=ds_resource) return new_ds + @classmethod + def from_csapi_dict(cls, data: dict, parent_node: Node) -> "Datastream": + """Build a `Datastream` from a CS API datastream dict (e.g., a server + response body or an entry from a ``/datastreams`` listing).""" + ds_resource = DatastreamResource.from_csapi_dict(data) + return cls(parent_node=parent_node, datastream_resource=ds_resource) + + def to_csapi_dict(self) -> dict: + """Render this datastream as a CS API `application/json` resource + body (the same shape the server emits for ``/datastreams/{id}``). + + The embedded ``schema`` field carries whichever variant + (`SWEDatastreamRecordSchema` or `JSONDatastreamRecordSchema`) the + datastream was constructed with. + """ + return self._underlying_resource.to_csapi_dict() + + def to_csapi_json(self) -> str: + """JSON-string variant of `to_csapi_dict`.""" + return self._underlying_resource.to_csapi_json() + + def schema_to_swejson_dict(self) -> dict: + """Return the embedded record schema as an `application/swe+json` + document. Raises if the underlying schema is OM+JSON.""" + from .schema_datamodels import SWEDatastreamRecordSchema + rs = self._underlying_resource.record_schema + if not isinstance(rs, SWEDatastreamRecordSchema): + raise TypeError( + "Datastream is not configured with a SWE+JSON schema; " + f"got {type(rs).__name__}. Use schema_to_omjson_dict() instead." + ) + return rs.to_swejson_dict() + + def schema_to_omjson_dict(self) -> dict: + """Return the embedded record schema as an `application/om+json` + document. Raises if the underlying schema is SWE+JSON.""" + from .schema_datamodels import JSONDatastreamRecordSchema + rs = self._underlying_resource.record_schema + if not isinstance(rs, JSONDatastreamRecordSchema): + raise TypeError( + "Datastream is not configured with an OM+JSON schema; " + f"got {type(rs).__name__}. Use schema_to_swejson_dict() instead." + ) + return rs.to_omjson_dict() + + def observation_to_omjson_dict(self, obs: ObservationResource | dict) -> dict: + """Render a single observation as an `application/om+json` payload. + + :param obs: An `ObservationResource` or a result dict + (``create_observation`` will be used to wrap the latter). + """ + if isinstance(obs, dict): + obs = self.create_observation(obs) + return obs.to_omjson_dict(datastream_id=self._resource_id) + + def observation_to_swejson_dict(self, obs: ObservationResource | dict) -> dict: + """Render a single observation as an `application/swe+json` payload + (a flat record matching the schema's field names).""" + if isinstance(obs, dict): + obs = self.create_observation(obs) + schema = None + rs = getattr(self._underlying_resource, 'record_schema', None) + if rs is not None: + schema = getattr(rs, 'record_schema', None) + return obs.to_swejson_dict(schema=schema) + + @classmethod + def observation_from_omjson_dict(cls, data: dict) -> ObservationResource: + """Build an `ObservationResource` from an `application/om+json` dict.""" + return ObservationResource.from_omjson_dict(data) + + @classmethod + def observation_from_swejson_dict(cls, data: dict, schema=None, + result_time: str | None = None) -> ObservationResource: + """Build an `ObservationResource` from a SWE+JSON payload. + + :param data: The flat SWE+JSON record dict. + :param schema: Optional schema, currently advisory. + :param result_time: ISO 8601 timestamp; defaults to now. + """ + return ObservationResource.from_swejson_dict(data, schema=schema, result_time=result_time) + def set_resource(self, resource: DatastreamResource): """Replace the underlying `DatastreamResource` model.""" self._underlying_resource = resource @@ -1435,6 +1595,80 @@ def add_underlying_resource(self, resource: ControlStreamResource): """Replace the underlying `ControlStreamResource` model.""" self._underlying_resource = resource + @classmethod + def from_csapi_dict(cls, data: dict, parent_node: Node) -> "ControlStream": + """Build a `ControlStream` from a CS API control-stream dict (e.g., + a server response body or an entry from a ``/controlstreams`` + listing).""" + cs_resource = ControlStreamResource.from_csapi_dict(data) + return cls(node=parent_node, controlstream_resource=cs_resource) + + def to_csapi_dict(self) -> dict: + """Render this control stream as a CS API `application/json` + resource body. The embedded ``schema`` field carries whichever + variant (`SWEJSONCommandSchema` or `JSONCommandSchema`) the + control stream was constructed with. + """ + return self._underlying_resource.to_csapi_dict() + + def to_csapi_json(self) -> str: + """JSON-string variant of `to_csapi_dict`.""" + return self._underlying_resource.to_csapi_json() + + def schema_to_swejson_dict(self) -> dict: + """Return the embedded command schema as an `application/swe+json` + document. Raises if the underlying schema is JSON.""" + from .schema_datamodels import SWEJSONCommandSchema + cs = self._underlying_resource.command_schema + if not isinstance(cs, SWEJSONCommandSchema): + raise TypeError( + "ControlStream is not configured with a SWE+JSON schema; " + f"got {type(cs).__name__}. Use schema_to_json_dict() instead." + ) + return cs.to_swejson_dict() + + def schema_to_json_dict(self) -> dict: + """Return the embedded command schema as an `application/json` + document. Raises if the underlying schema is SWE+JSON.""" + cs = self._underlying_resource.command_schema + if not isinstance(cs, JSONCommandSchema): + raise TypeError( + "ControlStream is not configured with a JSON schema; " + f"got {type(cs).__name__}. Use schema_to_swejson_dict() instead." + ) + return cs.to_json_dict() + + def command_to_json_dict(self, payload: dict, sender: str | None = None) -> dict: + """Render a single command as an `application/json` payload + (the `CommandJSON` envelope: ``control@id``, ``issueTime``, + ``sender``, ``params``).""" + from .schema_datamodels import CommandJSON + cmd = CommandJSON( + control_id=self._resource_id, + sender=sender, + params=payload, + ) + return cmd.to_csapi_dict() + + def command_to_swejson_dict(self, payload: dict) -> dict: + """Render a single command as an `application/swe+json` payload + (a flat record matching the schema's field names).""" + return dict(payload) + + @classmethod + def command_from_json_dict(cls, data: dict): + """Build a `CommandJSON` from an `application/json` command dict.""" + from .schema_datamodels import CommandJSON + return CommandJSON.from_csapi_dict(data) + + @classmethod + def command_from_swejson_dict(cls, data: dict, schema=None) -> dict: + """Build a command params dict from a SWE+JSON payload. Schema is + accepted for forward compatibility (per-field type coercion); + currently a passthrough.""" + del schema + return dict(data) + def init_mqtt(self): """Set ``self._topic`` to the control stream's command data topic.""" super().init_mqtt() diff --git a/src/oshconnect/timemanagement.py b/src/oshconnect/timemanagement.py index 5b5286e..d30fd94 100644 --- a/src/oshconnect/timemanagement.py +++ b/src/oshconnect/timemanagement.py @@ -93,7 +93,7 @@ def time_to_iso(a_time: datetime | float) -> str: :return: """ if isinstance(a_time, float): - return datetime.fromtimestamp(a_time).strftime(TimeUtils.iso_format) + return datetime.fromtimestamp(a_time, tz=timezone.utc).strftime(TimeUtils.iso_format) elif isinstance(a_time, datetime): return a_time.strftime(TimeUtils.iso_format) diff --git a/uv.lock b/uv.lock index e1cc0e8..3e8d2cd 100644 --- a/uv.lock +++ b/uv.lock @@ -719,7 +719,7 @@ wheels = [ [[package]] name = "oshconnect" -version = "0.5.0a1" +version = "0.5.1a0" source = { virtual = "." } dependencies = [ { name = "aiohttp" },