Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,41 @@ uv run interrogate -vv src/oshconnect # per-symbol (shows which symbols
Once we agree on a baseline, raise `[tool.interrogate].fail-under` from `0` so
new code without docstrings starts failing locally and in CI.

## OGC Format Serialization

Format-explicit conversion methods on the wrapper classes (`System`,
`Datastream`, `ControlStream`) and the underlying pydantic resource models.
Use these to round-trip CS API server JSON in **SML+JSON**, **OM+JSON**, and
**SWE+JSON** without having to remember the `model_dump(by_alias=True, …)`
incantation, and to construct OSHConnect wrappers from raw server payloads.

```python
from oshconnect import Node, System, Datastream

node = Node(protocol="http", address="localhost", port=8282)

# Build a System from an SML+JSON server response
sys_dict = {"type": "PhysicalSystem", "uniqueId": "urn:test:1", "label": "Sensor"}
sys = System.from_csapi_dict(sys_dict, node) # auto-detects SML vs GeoJSON
sys.to_smljson_dict() # -> dict ready to POST

# Build a Datastream from a CS API listing entry
ds = Datastream.from_csapi_dict(ds_json, node)
ds.to_csapi_dict() # the resource body
ds.schema_to_swejson_dict() # the SWE+JSON schema doc
ds.observation_to_omjson_dict({"temperature": 22.5}) # one OM+JSON observation

# Single observations / commands
from oshconnect.resource_datamodels import ObservationResource
obs = ObservationResource.from_omjson_dict(om_json_payload)
obs.to_swejson_dict() # flat SWE+JSON record
```

The two older static factories `System.from_system_resource` and
`Datastream.from_resource` are deprecated in favor of `from_csapi_dict` and
emit `DeprecationWarning` on use. They'll be removed in a future major
version.

## Generating the Docs

The documentation is built with [MkDocs](https://www.mkdocs.org/) using the
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "oshconnect"
version = "0.5.0a1"
version = "0.5.1a0"
description = "Library for interfacing with OSH, helping guide visualization efforts, and providing a place to store configurations. Implements OGC CS API Part 3 (Pub/Sub) MQTT topic conventions including :data topics and resource event topics."
readme = "README.md"
authors = [
Expand Down
175 changes: 174 additions & 1 deletion src/oshconnect/resource_datamodels.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
# ==============================================================================
from __future__ import annotations

from typing import List
import json
from typing import List, TYPE_CHECKING

from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validator
from shapely import Point
Expand All @@ -16,6 +17,9 @@
from .schema_datamodels import DatastreamRecordSchema, CommandSchema
from .timemanagement import TimeInstant, TimePeriod

if TYPE_CHECKING:
from .swe_components import AnyComponent


class BoundingBox(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)
Expand Down Expand Up @@ -132,6 +136,59 @@ class SystemResource(BaseModel):
modes: List[Mode] = Field(None)
method: ProcessMethod = Field(None)

def to_smljson_dict(self) -> dict:
"""Render this system as an `application/sml+json` dict (SensorML JSON encoding).

Sets ``feature_type = "PhysicalSystem"`` to match the SML discriminator
before dumping. Output keys are camelCase per the CS API wire format.
"""
self.feature_type = "PhysicalSystem"
return self.model_dump(by_alias=True, exclude_none=True, mode='json')

def to_smljson(self) -> str:
"""JSON-string variant of `to_smljson_dict`."""
return json.dumps(self.to_smljson_dict())

def to_geojson_dict(self) -> dict:
"""Render this system as an `application/geo+json` dict.

Sets ``feature_type = "Feature"`` to match the GeoJSON discriminator
before dumping. Useful when posting to endpoints that expect the
GeoJSON Feature shape.
"""
self.feature_type = "Feature"
return self.model_dump(by_alias=True, exclude_none=True, mode='json')

def to_geojson(self) -> str:
"""JSON-string variant of `to_geojson_dict`."""
return json.dumps(self.to_geojson_dict())

@classmethod
def from_smljson_dict(cls, data: dict) -> "SystemResource":
"""Build a `SystemResource` from an `application/sml+json` dict
(e.g., a CS API server response body for a system in SML form)."""
return cls.model_validate(data, by_alias=True)

@classmethod
def from_geojson_dict(cls, data: dict) -> "SystemResource":
"""Build a `SystemResource` from an `application/geo+json` dict
(e.g., a CS API server response body for a system in GeoJSON form)."""
return cls.model_validate(data, by_alias=True)

@classmethod
def from_csapi_dict(cls, data: dict) -> "SystemResource":
"""Build a `SystemResource` from a CS API system dict, auto-dispatching
on the ``type`` field: ``"PhysicalSystem"`` → SML+JSON path,
``"Feature"`` → GeoJSON path. Anything else falls through to a
permissive validate.
"""
feature_type = data.get("type")
if feature_type == "PhysicalSystem":
return cls.from_smljson_dict(data)
if feature_type == "Feature":
return cls.from_geojson_dict(data)
return cls.model_validate(data, by_alias=True)


class DatastreamResource(BaseModel):
"""
Expand Down Expand Up @@ -175,6 +232,25 @@ def handle_aliases(cls, values):
break
return values

def to_csapi_dict(self) -> dict:
"""Render this datastream as the CS API `application/json` resource
body. The embedded ``schema`` field is dumped polymorphically per
whichever variant (`SWEDatastreamRecordSchema` /
`JSONDatastreamRecordSchema`) it holds.
"""
return self.model_dump(by_alias=True, exclude_none=True, mode='json')

def to_csapi_json(self) -> str:
"""JSON-string variant of `to_csapi_dict`."""
return json.dumps(self.to_csapi_dict())

@classmethod
def from_csapi_dict(cls, data: dict) -> "DatastreamResource":
"""Build a `DatastreamResource` from a CS API datastream dict
(e.g., a server response body or an entry from a /datastreams
listing)."""
return cls.model_validate(data, by_alias=True)


class ObservationResource(BaseModel):
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
Expand All @@ -187,6 +263,84 @@ class ObservationResource(BaseModel):
result: dict = Field(...)
result_link: Link = Field(None, alias="result@link")

def to_omjson_dict(self, datastream_id: str | None = None) -> dict:
"""Render this observation as an `application/om+json` dict
(the ``ObservationOMJSONInline`` shape).

:param datastream_id: Optional ID to include as ``datastream@id``
on the output. The CS API typically supplies this from URL
context, so it's not required on the model itself.
"""
from .schema_datamodels import ObservationOMJSONInline
kwargs = {"result": self.result}
if datastream_id is not None:
kwargs["datastream_id"] = datastream_id
if self.phenomenon_time:
kwargs["phenomenon_time"] = self.phenomenon_time.get_iso_time()
if self.result_time:
kwargs["result_time"] = self.result_time.get_iso_time()
if self.parameters is not None:
kwargs["parameters"] = self.parameters
wrapper = ObservationOMJSONInline(**kwargs)
return wrapper.model_dump(by_alias=True, exclude_none=True, mode='json')

def to_swejson_dict(self, schema: "AnyComponent" = None) -> dict:
"""Render this observation as an `application/swe+json` payload
(the SWE Common JSON encoding of one record).

SWE+JSON encodes a single observation as a flat JSON object whose
keys are the schema field names; ``self.result`` is already that
dict, so this is essentially a passthrough. The optional
``schema`` argument is accepted for forward compatibility (when
we add field-order / encoding-aware emission).
"""
# ``schema`` reserved for future encoding rules (vector-as-arrays,
# JSONEncoding handling, etc.); current behavior is passthrough.
del schema
return dict(self.result) if self.result is not None else {}

@classmethod
def from_omjson_dict(cls, data: dict) -> "ObservationResource":
"""Build an `ObservationResource` from an `application/om+json` dict.

Parses through `ObservationOMJSONInline` to validate the OM+JSON
envelope, then strips the ``datastream@id`` / ``foi@id`` envelope
fields (those live on the surrounding context, not the resource)
and returns the inner observation.
"""
from .schema_datamodels import ObservationOMJSONInline
wrapper = ObservationOMJSONInline.model_validate(data)
kwargs = {
"result_time": TimeInstant.from_string(wrapper.result_time),
"result": wrapper.result,
}
if wrapper.phenomenon_time:
kwargs["phenomenon_time"] = TimeInstant.from_string(wrapper.phenomenon_time)
if wrapper.parameters is not None:
kwargs["parameters"] = wrapper.parameters
return cls(**kwargs)

@classmethod
def from_swejson_dict(cls, data: dict, schema: "AnyComponent" = None,
result_time: str | None = None) -> "ObservationResource":
"""Build an `ObservationResource` from an `application/swe+json`
observation payload.

SWE+JSON observations don't carry an envelope (no ``resultTime`` /
``phenomenonTime`` fields); pass ``result_time`` explicitly when
you have it, otherwise the current UTC time is used.

:param data: The flat SWE+JSON record dict.
:param schema: Optional schema, reserved for future per-field
type coercion. Currently ignored.
:param result_time: ISO 8601 timestamp for ``resultTime``;
defaults to ``TimeInstant.now_as_time_instant().isoformat()``
if omitted.
"""
del schema # future use
rt = TimeInstant.from_string(result_time) if result_time is not None else TimeInstant.now_as_time_instant()
return cls(result_time=rt, result=dict(data))


class ControlStreamResource(BaseModel):
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
Expand All @@ -206,3 +360,22 @@ class ControlStreamResource(BaseModel):
asynchronous: bool = Field(True, alias="async")
command_schema: SerializeAsAny[CommandSchema] = Field(None, alias="schema")
links: List[Link] = Field(None)

def to_csapi_dict(self) -> dict:
"""Render this control stream as the CS API `application/json`
resource body. The embedded ``schema`` field is dumped
polymorphically per whichever variant
(`SWEJSONCommandSchema` / `JSONCommandSchema`) it holds.
"""
return self.model_dump(by_alias=True, exclude_none=True, mode='json')

def to_csapi_json(self) -> str:
"""JSON-string variant of `to_csapi_dict`."""
return json.dumps(self.to_csapi_dict())

@classmethod
def from_csapi_dict(cls, data: dict) -> "ControlStreamResource":
"""Build a `ControlStreamResource` from a CS API control-stream dict
(e.g., a server response body or an entry from a /controlstreams
listing)."""
return cls.model_validate(data, by_alias=True)
73 changes: 68 additions & 5 deletions src/oshconnect/schema_datamodels.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
from .geometry import Geometry
from .swe_components import AnyComponent, check_named


def _dump_csapi(model: BaseModel) -> dict:
"""Internal: canonical CS API serialization (alias keys, exclude None, JSON-mode)."""
return model.model_dump(by_alias=True, exclude_none=True, mode='json')


"""
In many of the top level resource models there is a "schema" field of some description. These models are meant to ease
the burden on the end user to create those.
Expand All @@ -33,6 +39,15 @@ class CommandJSON(BaseModel):
sender: str = Field(None)
params: Union[dict, list, int, float, str] = Field(None)

def to_csapi_dict(self) -> dict:
"""Render as the CS API `application/json` command body."""
return _dump_csapi(self)

@classmethod
def from_csapi_dict(cls, data: dict) -> "CommandJSON":
"""Build from a CS API command JSON dict."""
return cls.model_validate(data)


class CommandSchema(BaseModel):
"""
Expand All @@ -58,6 +73,15 @@ def _root_record_schema_requires_name(self):
check_named(self.record_schema, "SWEJSONCommandSchema.recordSchema")
return self

def to_swejson_dict(self) -> dict:
"""Render as an `application/swe+json` command-schema document."""
return _dump_csapi(self)

@classmethod
def from_swejson_dict(cls, data: dict) -> "SWEJSONCommandSchema":
"""Build from an `application/swe+json` command-schema dict."""
return cls.model_validate(data, by_alias=True)


class JSONCommandSchema(CommandSchema):
"""
Expand All @@ -79,6 +103,15 @@ def _root_schemas_require_name(self):
check_named(self.feasibility_schema, "JSONCommandSchema.feasibilityResultSchema")
return self

def to_json_dict(self) -> dict:
"""Render as an `application/json` command-schema document."""
return _dump_csapi(self)

@classmethod
def from_json_dict(cls, data: dict) -> "JSONCommandSchema":
"""Build from an `application/json` command-schema dict."""
return cls.model_validate(data, by_alias=True)


class DatastreamRecordSchema(BaseModel):
"""
Expand Down Expand Up @@ -111,6 +144,16 @@ def _root_record_schema_requires_name(self):
check_named(self.record_schema, "SWEDatastreamRecordSchema.recordSchema")
return self

def to_swejson_dict(self) -> dict:
"""Render as an `application/swe+json` datastream-schema document."""
return _dump_csapi(self)

@classmethod
def from_swejson_dict(cls, data: dict) -> "SWEDatastreamRecordSchema":
"""Build from an `application/swe+json` datastream-schema dict
(e.g., a CS API ``/datastreams/{id}/schema`` response in SWE form)."""
return cls.model_validate(data, by_alias=True)


class JSONDatastreamRecordSchema(DatastreamRecordSchema):
"""Datastream observation schema for the JSON media types
Expand Down Expand Up @@ -144,19 +187,39 @@ def _root_schemas_require_name(self):
check_named(self.parameters_schema, "JSONDatastreamRecordSchema.parametersSchema")
return self

def to_omjson_dict(self) -> dict:
"""Render as an `application/om+json` datastream-schema document."""
return _dump_csapi(self)

@classmethod
def from_omjson_dict(cls, data: dict) -> "JSONDatastreamRecordSchema":
"""Build from an `application/om+json` (or `application/json`)
datastream-schema dict (e.g., a CS API ``/datastreams/{id}/schema``
response in OM+JSON form)."""
return cls.model_validate(data, by_alias=True)


class ObservationOMJSONInline(BaseModel):
"""
A class to represent an observation in OM-JSON format
"""
model_config = ConfigDict(populate_by_name=True)
datastream_id: str = Field(None, serialization_alias="datastream@id")
foi_id: str = Field(None, serialization_alias="foi@id")
phenomenon_time: str = Field(None, serialization_alias="phenomenonTime")
result_time: str = Field(datetime.now().isoformat(), serialization_alias="resultTime")
datastream_id: str = Field(None, alias="datastream@id")
foi_id: str = Field(None, alias="foi@id")
phenomenon_time: str = Field(None, alias="phenomenonTime")
result_time: str = Field(datetime.now().isoformat(), alias="resultTime")
parameters: dict = Field(None)
result: Union[int, float, str, dict, list] = Field(...)
result_links: List[Link] = Field(None, serialization_alias="result@links")
result_links: List[Link] = Field(None, alias="result@links")

def to_csapi_dict(self) -> dict:
"""Render as an `application/om+json` observation body."""
return _dump_csapi(self)

@classmethod
def from_csapi_dict(cls, data: dict) -> "ObservationOMJSONInline":
"""Build from an `application/om+json` observation dict."""
return cls.model_validate(data)


class SystemEventOMJSON(BaseModel):
Expand Down
Loading
Loading