diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6ed722aa1..dc4a75642 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -37,6 +37,7 @@ jobs: pip3 install setuptools pip3 install -r requirements.txt pip3 install -r requirements-standalone.txt + pip3 install -r requirements-pubsub.txt pip3 install -r requirements-dev.txt pip3 install --upgrade https://github.com/geopython/OWSLib/archive/master.zip pip3 install tox diff --git a/default-sample.yml b/default-sample.yml index 8f56a1ef4..b3d765c9d 100644 --- a/default-sample.yml +++ b/default-sample.yml @@ -63,6 +63,11 @@ manager: - 127.0.0.1 csw_harvest_pagesize: 10 +#pubsub: +# broker: +# type: mqtt +# url: mqtt://localhost:1883 + metadata: identification: title: pycsw Geospatial Catalogue diff --git a/docs/configuration.rst b/docs/configuration.rst index 25ca86608..58303aed0 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -38,6 +38,20 @@ pycsw's runtime configuration is defined by ``default.yml``. pycsw ships with a - **allowed_ips**: comma delimited list of IP addresses (e.g. 192.168.0.103), wildcards (e.g. 192.168.0.*) or CIDR notations (e.g. 192.168.100.0/24) allowed to perform transactions (see :ref:`transactions`) - **csw_harvest_pagesize**: when harvesting other CSW servers, the number of records per request to page by (default is 10) +**pubsub** + +- **broker**: Publish-Subscribe definition + +**pubsub.broker** + +- **show_link**: whether to display as a link in the landing page (``true`` or ``false``) +- **type**: type of broker +- **url**: endpoint of broker + +.. note:: + + See :ref:`pubsub` for configuring your instance with Pub/Sub capability. + **metadata** **metadata.identification** diff --git a/docs/index.rst b/docs/index.rst index 2332694ab..3186817a4 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -23,6 +23,7 @@ pycsw |release| Documentation metadata-model-reference oarec-support csw-support + pubsub stac distributedsearching sru diff --git a/docs/introduction.rst b/docs/introduction.rst index b931bcb65..d3bc168b0 100644 --- a/docs/introduction.rst +++ b/docs/introduction.rst @@ -10,6 +10,7 @@ Features - implements `OGC API - Records - Part 1: Core`_ - implements `OGC API - Features - Part 3: Filtering`_ +- implements `STAC API`_ - implements `Common Query Language (CQL2)`_ - certified OGC `Compliant`_ and OGC Reference Implementation for both CSW 2.0.2 and CSW 3.0.0 - harvesting support for WMS, WFS, WCS, WPS, WAF, CSW, SOS @@ -20,6 +21,7 @@ Features - implements Full Text Search capabilities - implements OGC OpenSearch Geo and Time Extensions - implements Open Archives Initiative Protocol for Metadata Harvesting +- implements Pub/Sub capability via `OGC API Publish-Subscribe Workflow - Part 1: Core`_ - supports ISO, Dublin Core, DIF, FGDC, Atom, GM03 and DataCite metadata models - CGI or WSGI deployment - simple YAML configuration @@ -42,6 +44,7 @@ Standards Support `OGC API - Records - Part 1: Core`_,1.0 `OGC API - Features - Part 3: Filtering`_,draft "`OGC API - Features - Part 4: Create, Replace, Update and Delete`_",draft + `OGC API Publish-Subscribe Workflow - Part 1: Core`_,draft `OGC CSW`_,2.0.2/3.0.0 `OGC Filter`_,1.1.0/2.0.0 `OGC OWS Common`_,1.0.0/2.0.0 @@ -263,3 +266,6 @@ Paging .. _`GM03`: https://www.geocat.admin.ch/en/dokumentation/gm03.html .. _`OGC API - Features - Part 4: Create, Replace, Update and Delete`: https://cnn.com .. _`DataCite`: https://schema.datacite.org/meta/kernel-4.3/ +.. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html +.. _`STAC API`: https://github.com/radiantearth/stac-api-spec + diff --git a/docs/pubsub.rst b/docs/pubsub.rst new file mode 100644 index 000000000..4e12a40b1 --- /dev/null +++ b/docs/pubsub.rst @@ -0,0 +1,36 @@ +.. _pubsub: + +Publish-Subscribe integration (Pub/Sub) +======================================= + +pycsw supports Publish-Subscribe (Pub/Sub) integration by implementing +the `OGC API Publish-Subscribe Workflow - Part 1: Core`_ (draft) specification. + +Pub/Sub integration can be enabled by defining a broker that pycsw can use to +publish notifications on given topics using CloudEvents (as per the specification). + +When enabled, core functionality of Pub/Sub includes: + +- displaying the broker link in the OGC API - Records landing (using the ``rel=hub`` link relation) +- sending a notification message on metadata transactions (create, replace, update, delete) + +The following message queuing protocols are supported: + +MQTT +---- + +Example directive: + +.. code-block:: yaml + + pubsub: + broker: + type: mqtt + url: mqtt://localhost:1883 + +.. note:: + + For MQTT endpoints requiring authentication, encode the ``url`` value as follows: ``mqtt://username:password@localhost:1883`` + + +.. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html diff --git a/pycsw/broker/__init__.py b/pycsw/broker/__init__.py new file mode 100644 index 000000000..67893a229 --- /dev/null +++ b/pycsw/broker/__init__.py @@ -0,0 +1,50 @@ +# ================================================================= +# +# Authors: Tom Kralidis +# +# Copyright (c) 2025 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +from pycsw.broker.base import BasePubSubClient +from pycsw.broker.mqtt import MQTTPubSubClient + + +def load_client(def_: dict) -> BasePubSubClient: + """ + Load Pub/Sub client plugin + + :param def: `dict` of client definition + + :returns: PubSubClient object + """ + + class_ = CLIENTS[def_['type']] + + return class_(def_) + + +CLIENTS = { + 'mqtt': MQTTPubSubClient +} diff --git a/pycsw/broker/base.py b/pycsw/broker/base.py new file mode 100644 index 000000000..763eb878a --- /dev/null +++ b/pycsw/broker/base.py @@ -0,0 +1,81 @@ +# ================================================================= +# +# Authors: Tom Kralidis +# +# Copyright (c) 2025 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import logging +import random +from urllib.parse import urlparse + +from pycsw.core.util import remove_url_auth + +LOGGER = logging.getLogger(__name__) + + +class BasePubSubClient: + """Base Pub/Sub client""" + + def __init__(self, publisher_def: dict): + """ + Initialize object + + :param publisher_def: publisher definition + + :returns: pycsw.broker.base.BasePubSubClient + """ + + self.type = None + self.client_id = f'pycsw-pubsub-{random.randint(0, 1000)}' + + self.show_link = publisher_def.get('show_link', True) + self.broker = publisher_def['url'] + self.broker_url = urlparse(publisher_def['url']) + self.broker_safe_url = remove_url_auth(self.broker) + + def connect(self) -> None: + """ + Connect to a Pub/Sub broker + + :returns: None + """ + + raise NotImplementedError() + + def pub(self, channel: str, message: str) -> bool: + """ + Publish a message to a broker/channel + + :param channel: `str` of channel + :param message: `str` of message + + :returns: `bool` of publish result + """ + + raise NotImplementedError() + + def __repr__(self): + return f' {self.broker_safe_url}' diff --git a/pycsw/broker/mqtt.py b/pycsw/broker/mqtt.py new file mode 100644 index 000000000..8ca62ce35 --- /dev/null +++ b/pycsw/broker/mqtt.py @@ -0,0 +1,115 @@ +# ================================================================= +# +# Authors: Tom Kralidis +# +# Copyright (c) 2025 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import logging + +from paho.mqtt import client as mqtt_client + +from pycsw.broker.base import BasePubSubClient + +LOGGER = logging.getLogger(__name__) + + +class MQTTPubSubClient(BasePubSubClient): + """MQTT client""" + + def __init__(self, broker_url): + """ + Initialize object + + :param publisher_def: provider definition + + :returns: pycsw.pubsub.mqtt.MQTTClient + """ + + super().__init__(broker_url) + self.type = 'mqtt' + self.port = self.broker_url.port + + self.userdata = {} + + msg = f'Connecting to broker {self.broker_safe_url} with id {self.client_id}' # noqa + LOGGER.debug(msg) + self.conn = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, + client_id=self.client_id) + + self.conn.enable_logger(logger=LOGGER) + + if None not in [self.broker_url.username, self.broker_url.password]: + LOGGER.debug('Setting credentials') + self.conn.username_pw_set( + self.broker_url.username, + self.broker_url.password) + + if self.port is None: + if self.broker_url.scheme == 'mqtts': + self.port = 8883 + else: + self.port = 1883 + + if self.broker_url.scheme == 'mqtts': + self.conn.tls_set(tls_version=2) + + def connect(self) -> None: + """ + Connect to an MQTT broker + :returns: None + """ + + self.conn.connect(self.broker_url.hostname, self.port) + LOGGER.debug('Connected to broker') + + def pub(self, topic: str, message: str, qos: int = 1) -> bool: + """ + Publish a message to a broker/topic + :param topic: `str` of topic + :param message: `str` of message + :returns: `bool` of publish result + """ + + LOGGER.debug(f'Publishing to broker {self.broker_safe_url}') + LOGGER.debug(f'Topic: {topic}') + LOGGER.debug(f'Message: {message}') + + result = self.conn.publish(topic, message, qos) + LOGGER.debug(f'Result: {result}') + + # TODO: investigate implication + # result.wait_for_publish() + + if result.is_published: + LOGGER.debug('Message published') + return True + else: + msg = f'Publishing error code: {result[1]}' + LOGGER.warning(msg) + return False + + def __repr__(self): + return f' {self.broker_safe_url}' diff --git a/pycsw/core/util.py b/pycsw/core/util.py index 80c2b56fa..2a6d20cc0 100644 --- a/pycsw/core/util.py +++ b/pycsw/core/util.py @@ -557,3 +557,17 @@ def str2bool(value: typing.Union[bool, str]) -> bool: value2 = value.lower() in ('yes', 'true', 't', '1', 'on') return value2 + + +def remove_url_auth(url: str) -> str: + """ + Provide a RFC1738 URL without embedded authentication + + :param url: RFC1738 URL + + :returns: RFC1738 URL without authentication + """ + + u = urlparse(url) + auth = f'{u.username}:{u.password}@' + return url.replace(auth, '') diff --git a/pycsw/ogc/api/oapi.py b/pycsw/ogc/api/oapi.py index f08fa60e0..be3503f2f 100644 --- a/pycsw/ogc/api/oapi.py +++ b/pycsw/ogc/api/oapi.py @@ -442,20 +442,24 @@ def gen_oapi(config, oapi_filepath, mode='ogcapi-records'): 'consumes': [ 'application/geo+json', 'application/json', 'application/xml' ], - 'produces': ['application/json'], - 'parameters': [{ - 'in': 'body', - 'name': 'body', - 'description': 'Adds item to collection', - 'required': True, - }], + 'produces': ['application/geo+json'], + 'requestBody': { + 'content': { + 'application/geo+json': { + 'schema': {} + } + } + }, + 'parameters': [ + {'$ref': '#/components/parameters/collectionId'} + ], 'responses': { '201': {'description': 'Successful creation'}, '400': { - '$ref': '{}#/components/responses/InvalidParameter' + '$ref': '#/components/responses/InvalidParameter' }, '500': { - '$ref': '{}#/components/responses/ServerError' + '$ref': '#/components/responses/ServerError' } } } @@ -513,23 +517,24 @@ def gen_oapi(config, oapi_filepath, mode='ogcapi-records'): 'application/geo+json', 'application/json', 'application/xml' ], 'produces': ['application/json'], + 'requestBody': { + 'content': { + 'application/geo+json': { + 'schema': {} + } + } + }, 'parameters': [ {'$ref': '#/components/parameters/collectionId'}, - {'$ref': '#/components/parameters/recordId'}, - { - 'in': 'body', - 'name': 'body', - 'description': 'Updates item to collection', - 'required': True, - } + {'$ref': '#/components/parameters/recordId'} ], 'responses': { '204': {'description': 'Successful update'}, '400': { - '$ref': '{}#/components/responses/InvalidParameter' + '$ref': '#/components/responses/InvalidParameter' }, '500': { - '$ref': '{}#/components/responses/ServerError' + '$ref': '#/components/responses/ServerError' } } } @@ -547,10 +552,10 @@ def gen_oapi(config, oapi_filepath, mode='ogcapi-records'): 'responses': { '204': {'description': 'Successful delete'}, '400': { - '$ref': '{}#/components/responses/InvalidParameter' + '$ref': '#/components/responses/InvalidParameter' }, '500': { - '$ref': '{}#/components/responses/ServerError' + '$ref': '#/components/responses/ServerError' } } } diff --git a/pycsw/ogc/api/records.py b/pycsw/ogc/api/records.py index c9eb4d22a..1bf6a065f 100644 --- a/pycsw/ogc/api/records.py +++ b/pycsw/ogc/api/records.py @@ -41,6 +41,7 @@ from pygeofilter.parsers.cql2_json import parse as parse_cql2_json from pycsw import __version__ +from pycsw.broker import load_client from pycsw.core import log from pycsw.core.config import StaticContext from pycsw.core.metadata import parse_record @@ -48,6 +49,7 @@ from pycsw.core.util import bind_url, get_today_and_now, jsonify_links, load_custom_repo_mappings, str2bool, wkt2geom from pycsw.ogc.api.oapi import gen_oapi from pycsw.ogc.api.util import match_env_var, render_j2_template, to_json, to_rfc3339 +from pycsw.ogc.pubsub import publish_message LOGGER = logging.getLogger(__name__) @@ -92,6 +94,7 @@ def __init__(self, config: dict): self.mode = 'ogcapi-records' self.config = config + self.pubsub_client = None log.setup_logger(self.config.get('logging', {})) @@ -141,6 +144,10 @@ def __init__(self, config: dict): LOGGER.exception(msg) raise + if self.config.get('pubsub') is not None: + LOGGER.debug('Loading PubSub client') + self.pubsub_client = load_client(self.config['pubsub']['broker']) + def get_content_type(self, headers, args): """ Decipher content type requested @@ -295,6 +302,16 @@ def landing_page(self, headers_, args): } ] + if self.pubsub_client is not None and self.pubsub_client.show_link: + LOGGER.debug('Adding PubSub broker link') + pubsub_link = { + 'rel': 'hub', + 'type': 'application/json', + 'title': 'Pub/Sub broker', + 'href': self.pubsub_client.broker_safe_url + } + response['links'].append(pubsub_link) + return self.get_response(200, headers_, response, 'landing_page.html') def openapi(self, headers_, args): @@ -333,6 +350,14 @@ def conformance(self, headers_, args): 'conformsTo': CONFORMANCE_CLASSES } + if self.pubsub_client is not None: + LOGGER.debug('Adding conformance classes for OGC API - Publish-Subscribe') # noqa + pubsub_conformance_classes = [ + 'https://www.opengis.net/spec/ogcapi-pubsub-1/1.0/conf/message-payload-cloudevents-json', # noqa + 'https://www.opengis.net/spec/ogcapi-pubsub-1/1.0/conf/discovery' # noqa + ] + response['conformsTo'] += pubsub_conformance_classes + return self.get_response(200, headers_, response, 'conformance.html') def collections(self, headers_, args): @@ -954,9 +979,11 @@ def item(self, headers_, args, collection, item): return self.get_response(200, headers_, response, 'item.html') - def manage_collection_item(self, headers_, action='create', item=None, data=None): + def manage_collection_item(self, headers_, action='create', collection=None, + item=None, data=None): """ :param action: action (create, update, delete) + :param collection: collection identifier :param item: record identifier :param data: raw data / payload @@ -1032,6 +1059,10 @@ def manage_collection_item(self, headers_, action='create', item=None, data=None code = 200 response = {} + if self.pubsub_client is not None: + LOGGER.debug('Publishing message') + publish_message(self.pubsub_client, action, collection, item, data) + return self.get_response(code, headers_, response) def get_exception(self, status, headers, code, description): diff --git a/pycsw/ogc/pubsub/__init__.py b/pycsw/ogc/pubsub/__init__.py new file mode 100644 index 000000000..9844449d8 --- /dev/null +++ b/pycsw/ogc/pubsub/__init__.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# ================================================================= +# +# Authors: Tom Kralidis +# +# Copyright (c) 2025 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +from datetime import datetime +import json +import uuid +from typing import Union + + +def publish_message(pubsub_client, action: str, collection: str = None, + item: str = None, data: str = None) -> bool: + """ + Publish broker message + + :param action: `str` of action trigger name (create, update, delete) + :param collection: `str` of collection identifier + :param item: `str` of item identifier + :param data: `str` of data payload + + :returns: `bool` of whether message publishing was successful + """ + + channel = f'collections/{collection}' + type_ = f'org.ogc.api.collection.item.{action}' + + if action in ['create', 'update']: + media_type = 'application/geo+json' + data_ = data + elif action == 'delete': + media_type = 'text/plain' + data_ = item + + message = generate_ogc_cloudevent(type_, media_type, channel, data_) + + pubsub_client.connect() + pubsub_client.pub(channel, json.dumps(message)) + + +def generate_ogc_cloudevent(type_: str, media_type: str, + subject: str, data: Union[dict, str]) -> dict: + """ + Generate WIS2 Monitoring Event Message of WCMP2 report + + :param type_: `str` of CloudEvents type + :param subject: `str` of centre-id being reported + :param media_type: `str` of media type + :param data: `str` or `dict` of data + + :returns: `dict` of OGC CloudEvent payload + """ + + if isinstance(data, bytes): + data2 = data.decode('utf-8') + else: + data2 = data + + message = { + 'specversion': '1.0', + 'type': type_, + 'source': 'TODO', + 'subject': subject, + 'id': str(uuid.uuid4()), + 'time': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), + 'datacontenttype': media_type, + # 'dataschema': 'TODO', + 'data': data2 + } + + return message diff --git a/pycsw/stac/api.py b/pycsw/stac/api.py index d4dbd61c9..fb22bb94c 100644 --- a/pycsw/stac/api.py +++ b/pycsw/stac/api.py @@ -156,6 +156,16 @@ def landing_page(self, headers_, args): } ] + if self.pubsub_client is not None and self.pubsub_client.show_link: + LOGGER.debug('Adding PubSub broker link') + pubsub_link = { + 'rel': 'hub', + 'type': 'application/json', + 'title': 'Pub/Sub broker', + 'href': self.pubsub_client.broker_safe_url + } + response['links'].append(pubsub_link) + return self.get_response(200, headers_, response) def openapi(self, headers_, args): diff --git a/pycsw/wsgi_flask.py b/pycsw/wsgi_flask.py index 307b7583f..9674f1452 100644 --- a/pycsw/wsgi_flask.py +++ b/pycsw/wsgi_flask.py @@ -170,7 +170,7 @@ def collection(collection='metadata:main'): if request.method == 'PUT': return get_response( stacapi.manage_collection_item( - dict(request.headers), 'update', collection, + dict(request.headers), 'update', collection=collection, data=request.get_json(silent=True))) elif request.method == 'DELETE': return get_response( @@ -229,12 +229,12 @@ def items(collection='metadata:main'): data = request.data return get_response(api_.manage_collection_item(dict(request.headers), - 'create', data=data)) + 'create', collection=collection, data=data)) elif request.method == 'POST' and get_api_type(request.url_rule.rule) == 'stac-api': if request.url_rule.rule.endswith('items'): # STAC API transaction - create data = request.get_json(silent=True) return get_response(stacapi.manage_collection_item(dict(request.headers), - 'create', data=data, collection=collection)) + 'create', collection=collection, data=data)) else: # STAC API search return get_response(stacapi.items(dict(request.headers), request.get_json(silent=True), dict(request.args), @@ -266,11 +266,12 @@ def item(collection='metadata:main', item=None): if request.method == 'PUT': return get_response( api_.manage_collection_item( - dict(request.headers), 'update', item, + dict(request.headers), 'update', collection, item, data=request.get_json(silent=True))) elif request.method == 'DELETE': return get_response( - api_.manage_collection_item(dict(request.headers), 'delete', item)) + api_.manage_collection_item(dict(request.headers), 'delete', + collection, item)) else: if get_api_type(request.url_rule.rule) == 'stac-api': return get_response(stacapi.item(dict(request.headers), request.args, diff --git a/requirements-pubsub.txt b/requirements-pubsub.txt new file mode 100644 index 000000000..8579e8b22 --- /dev/null +++ b/requirements-pubsub.txt @@ -0,0 +1 @@ +paho-mqtt diff --git a/tests/functionaltests/suites/oarec/conftest.py b/tests/functionaltests/suites/oarec/conftest.py index 1df0b7cf6..17e051730 100644 --- a/tests/functionaltests/suites/oarec/conftest.py +++ b/tests/functionaltests/suites/oarec/conftest.py @@ -59,7 +59,7 @@ def config(): 'metadata': { 'identification': { 'title': 'pycsw Geospatial Catalogue', - 'description': 'pycsw is an OARec and OGC CSW server implementation written in Python', + 'description': 'pycsw is an OARec and OGC CSW server implementation written in Python', # noqa 'keywords': [ 'catalogue', 'discovery', @@ -110,7 +110,7 @@ def config(): } }, 'repository': { - 'database': 'sqlite:///tests/functionaltests/suites/cite/data/cite.db', + 'database': 'sqlite:///tests/functionaltests/suites/cite/data/cite.db', # noqa 'table': 'records' } } @@ -239,7 +239,7 @@ def sample_record(): "type": "dataset", "created": "2011-11-11", "updated": "2000-09-01", - "rights": "Copyright (c) 2010 Her Majesty the Queen in Right of Canada" + "rights": "Copyright (c) 2010 Her Majesty the Queen in Right of Canada" # noqa }, "links": [ { diff --git a/tests/functionaltests/suites/pubsub/conftest.py b/tests/functionaltests/suites/pubsub/conftest.py new file mode 100644 index 000000000..5ce71d593 --- /dev/null +++ b/tests/functionaltests/suites/pubsub/conftest.py @@ -0,0 +1,271 @@ +# ================================================================= +# +# Authors: Ricardo Garcia Silva +# Tom Kralidis +# +# Copyright (c) 2023 Ricardo Garcia Silva +# Copyright (c) 2024 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import pytest + + +@pytest.fixture() +def config(): + yield { + 'server': { + 'url': 'http://localhost/pycsw/oarec', + 'mimetype': 'application/xml;', + 'charset': 'UTF-8', + 'encoding': 'UTF-8', + 'language': 'en-US', + 'maxrecords': '10', + 'gzip_compresslevel': '9', + 'profiles': [ + 'apiso' + ] + }, + 'logging': { + 'level': 'ERROR' + }, + 'manager': { + 'transactions': True, + 'allowed_ips': [ + '127.0.0.1' + ] + }, + 'pubsub': { + 'broker': { + 'type': 'mqtt', + 'url': 'mqtt://localhost:1883' + } + }, + 'metadata': { + 'identification': { + 'title': 'pycsw Geospatial Catalogue', + 'description': 'pycsw is an OARec and OGC CSW server implementation written in Python', # noqa + 'keywords': [ + 'catalogue', + 'discovery', + 'metadata' + ], + 'keywords_type': 'theme', + 'fees': 'None', + 'accessconstraints': 'None' + }, + 'provider': { + 'name': 'Organization Name', + 'url': 'https://pycsw.org/' + }, + 'contact': { + 'name': 'Lastname, Firstname', + 'position': 'Position Title', + 'address': 'Mailing Address', + 'city': 'City', + 'stateorprovince': 'Administrative Area', + 'postalcode': 'Zip or Postal Code', + 'country': 'Country', + 'phone': '+xx-xxx-xxx-xxxx', + 'fax': '+xx-xxx-xxx-xxxx', + 'email': 'you@example.org', + 'url': 'Contact URL', + 'hours': 'Hours of Service', + 'instructions': 'During hours of service. Off on weekends.', + 'role': 'pointOfContact' + }, + 'inspire': { + 'enabled': True, + 'languages_supported': [ + 'eng', + 'gre' + ], + 'default_language': 'eng', + 'date': 'YYYY-MM-DD', + 'gemet_keywords': [ + 'Utility and governmental services' + ], + 'conformity_service': 'notEvaluated', + 'contact_name': 'Organization Name', + 'contact_email': 'you@example.org', + 'temp_extent': { + 'begin': 'YYYY-MM-DD', + 'end': 'YYYY-MM-DD' + } + } + }, + 'repository': { + 'database': 'sqlite:///tests/functionaltests/suites/cite/data/cite.db', # noqa + 'table': 'records' + } + } + + +@pytest.fixture() +def config_virtual_collections(config): + database = config['repository']['database'] + config['repository']['database'] = database.replace('cite.db', 'cite-virtual-collections.db') # noqa + return config + + +@pytest.fixture() +def sample_record(): + yield { + "id": "record-123", + "conformsTo": [ + "http://www.opengis.net/spec/ogcapi-records-1/1.0/req/record-core" + ], + "type": "Feature", + "geometry": { + "type": "Polygon", + "coordinates": [[ + [-141, 42], + [-141, 84], + [-52, 84], + [-52, 42], + [-141, 42] + ]] + }, + "properties": { + "identifier": "3f342f64-9348-11df-ba6a-0014c2c00eab", + "title": "title in English", + "description": "abstract in English", + "themes": [ + { + "concepts": [ + "kw1 in English", + "kw2 in English", + "kw3 in English" + ] + }, + { + "concepts": [ + "FOO", + "BAR" + ], + "scheme": "http://example.org/vocab" + }, + { + "concepts": [ + "kw1", + "kw2" + ] + } + ], + "providers": [ + { + "name": "Environment Canada", + "individual": "Tom Kralidis", + "positionName": "Senior Systems Scientist", + "contactInfo": { + "phone": { + "office": "+01-123-456-7890" + }, + "email": { + "office": "+01-123-456-7890" + }, + "address": { + "office": { + "deliveryPoint": "4905 Dufferin Street", + "city": "Toronto", + "administrativeArea": "Ontario", + "postalCode": "M3H 5T4", + "country": "Canada" + }, + "onlineResource": { + "href": "https://www.ec.gc.ca/" + } + }, + "hoursOfService": "0700h - 1500h EST", + "contactInstructions": "email", + "url": { + "rel": "canonical", + "type": "text/html", + "href": "https://www.ec.gc.ca/" + } + }, + "roles": [ + {"name": "pointOfContact"} + ] + }, + { + "name": "Environment Canada", + "individual": "Tom Kralidis", + "positionName": "Senior Systems Scientist", + "contactInfo": { + "phone": {"office": "+01-123-456-7890"}, + "email": {"office": "+01-123-456-7890"}, + "address": { + "office": { + "deliveryPoint": "4905 Dufferin Street", + "city": "Toronto", + "administrativeArea": "Ontario", + "postalCode": "M3H 5T4", + "country": "Canada" + }, + "onlineResource": {"href": "https://www.ec.gc.ca/"} + }, + "hoursOfService": "0700h - 1500h EST", + "contactInstructions": "email", + "url": { + "rel": "canonical", + "type": "text/html", + "href": "https://www.ec.gc.ca/" + } + }, + "roles": [ + {"name": "distributor"} + ] + } + ], + "language": { + "code": "en" + }, + "type": "dataset", + "created": "2011-11-11", + "updated": "2000-09-01", + "rights": "Copyright (c) 2010 Her Majesty the Queen in Right of Canada" # noqa + }, + "links": [ + { + "rel": "canonical", + "href": "https://example.org/data", + "type": "WWW:LINK", + "title": "my waf" + }, + { + "rel": "service", + "href": "https://example.org/wms", + "type": "OGC:WMS", + "title": "roads" + } + ], + "time": { + "interval": [ + "1950-07-31", + None + ], + "resolution": "P1Y" + } + } diff --git a/tests/functionaltests/suites/pubsub/test_pubsub_functional.py b/tests/functionaltests/suites/pubsub/test_pubsub_functional.py new file mode 100644 index 000000000..70f099a7c --- /dev/null +++ b/tests/functionaltests/suites/pubsub/test_pubsub_functional.py @@ -0,0 +1,67 @@ +# ================================================================= +# +# Authors: Tom Kralidis +# +# Copyright (c) 2025 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import json + +import pytest + +from pycsw.ogc.api.records import API + +pytestmark = pytest.mark.functional + + +def test_landing_page(config): + api = API(config) + headers, status, content = api.landing_page({}, {'f': 'json'}) + content = json.loads(content) + + assert headers['Content-Type'] == 'application/json' + assert status == 200 + assert len(content['links']) == 16 + + links = ( + api.config['server']['url'], + api.config['pubsub']['broker']['url'] + ) + + for link in content['links']: + assert link['href'].startswith(links) + + headers, status, content = api.landing_page({}, {'f': 'html'}) + assert status == 200 + assert headers['Content-Type'] == 'text/html' + + +def test_conformance(config): + api = API(config) + headers, status, content = api.conformance({}, {}) + content = json.loads(content) + + assert headers['Content-Type'] == 'application/json' + assert len(content['conformsTo']) == 16