Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ jobs:
pip3 install setuptools
pip3 install -r requirements.txt
pip3 install -r requirements-standalone.txt
pip3 install -r requirements-pubsub.txt
pip3 install -r requirements-dev.txt
pip3 install --upgrade https://github.com/geopython/OWSLib/archive/master.zip
pip3 install tox
Expand Down
5 changes: 5 additions & 0 deletions default-sample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ manager:
- 127.0.0.1
csw_harvest_pagesize: 10

#pubsub:
# broker:
# type: mqtt
# url: mqtt://localhost:1883

metadata:
identification:
title: pycsw Geospatial Catalogue
Expand Down
14 changes: 14 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ pycsw's runtime configuration is defined by ``default.yml``. pycsw ships with a
- **allowed_ips**: comma delimited list of IP addresses (e.g. 192.168.0.103), wildcards (e.g. 192.168.0.*) or CIDR notations (e.g. 192.168.100.0/24) allowed to perform transactions (see :ref:`transactions`)
- **csw_harvest_pagesize**: when harvesting other CSW servers, the number of records per request to page by (default is 10)

**pubsub**

- **broker**: Publish-Subscribe definition

**pubsub.broker**

- **show_link**: whether to display as a link in the landing page (``true`` or ``false``)
- **type**: type of broker
- **url**: endpoint of broker

.. note::

See :ref:`pubsub` for configuring your instance with Pub/Sub capability.

**metadata**

**metadata.identification**
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pycsw |release| Documentation
metadata-model-reference
oarec-support
csw-support
pubsub
stac
distributedsearching
sru
Expand Down
6 changes: 6 additions & 0 deletions docs/introduction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Features

- implements `OGC API - Records - Part 1: Core`_
- implements `OGC API - Features - Part 3: Filtering`_
- implements `STAC API`_
- implements `Common Query Language (CQL2)`_
- certified OGC `Compliant`_ and OGC Reference Implementation for both CSW 2.0.2 and CSW 3.0.0
- harvesting support for WMS, WFS, WCS, WPS, WAF, CSW, SOS
Expand All @@ -20,6 +21,7 @@ Features
- implements Full Text Search capabilities
- implements OGC OpenSearch Geo and Time Extensions
- implements Open Archives Initiative Protocol for Metadata Harvesting
- implements Pub/Sub capability via `OGC API Publish-Subscribe Workflow - Part 1: Core`_
- supports ISO, Dublin Core, DIF, FGDC, Atom, GM03 and DataCite metadata models
- CGI or WSGI deployment
- simple YAML configuration
Expand All @@ -42,6 +44,7 @@ Standards Support
`OGC API - Records - Part 1: Core`_,1.0
`OGC API - Features - Part 3: Filtering`_,draft
"`OGC API - Features - Part 4: Create, Replace, Update and Delete`_",draft
`OGC API Publish-Subscribe Workflow - Part 1: Core`_,draft
`OGC CSW`_,2.0.2/3.0.0
`OGC Filter`_,1.1.0/2.0.0
`OGC OWS Common`_,1.0.0/2.0.0
Expand Down Expand Up @@ -263,3 +266,6 @@ Paging
.. _`GM03`: https://www.geocat.admin.ch/en/dokumentation/gm03.html
.. _`OGC API - Features - Part 4: Create, Replace, Update and Delete`: https://cnn.com
.. _`DataCite`: https://schema.datacite.org/meta/kernel-4.3/
.. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html
.. _`STAC API`: https://github.com/radiantearth/stac-api-spec

36 changes: 36 additions & 0 deletions docs/pubsub.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
.. _pubsub:

Publish-Subscribe integration (Pub/Sub)
=======================================

pycsw supports Publish-Subscribe (Pub/Sub) integration by implementing
the `OGC API Publish-Subscribe Workflow - Part 1: Core`_ (draft) specification.

Pub/Sub integration can be enabled by defining a broker that pycsw can use to
publish notifications on given topics using CloudEvents (as per the specification).

When enabled, core functionality of Pub/Sub includes:

- displaying the broker link in the OGC API - Records landing (using the ``rel=hub`` link relation)
- sending a notification message on metadata transactions (create, replace, update, delete)

The following message queuing protocols are supported:

MQTT
----

Example directive:

.. code-block:: yaml

pubsub:
broker:
type: mqtt
url: mqtt://localhost:1883

.. note::

For MQTT endpoints requiring authentication, encode the ``url`` value as follows: ``mqtt://username:password@localhost:1883``


.. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html
50 changes: 50 additions & 0 deletions pycsw/broker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# =================================================================
#
# Authors: Tom Kralidis <[email protected]>
#
# Copyright (c) 2025 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

from pycsw.broker.base import BasePubSubClient
from pycsw.broker.mqtt import MQTTPubSubClient


def load_client(def_: dict) -> BasePubSubClient:
"""
Load Pub/Sub client plugin

:param def: `dict` of client definition

:returns: PubSubClient object
"""

class_ = CLIENTS[def_['type']]

return class_(def_)


CLIENTS = {
'mqtt': MQTTPubSubClient
}
81 changes: 81 additions & 0 deletions pycsw/broker/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# =================================================================
#
# Authors: Tom Kralidis <[email protected]>
#
# Copyright (c) 2025 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

import logging
import random
from urllib.parse import urlparse

from pycsw.core.util import remove_url_auth

LOGGER = logging.getLogger(__name__)


class BasePubSubClient:
"""Base Pub/Sub client"""

def __init__(self, publisher_def: dict):
"""
Initialize object

:param publisher_def: publisher definition

:returns: pycsw.broker.base.BasePubSubClient
"""

self.type = None
self.client_id = f'pycsw-pubsub-{random.randint(0, 1000)}'

self.show_link = publisher_def.get('show_link', True)
self.broker = publisher_def['url']
self.broker_url = urlparse(publisher_def['url'])
self.broker_safe_url = remove_url_auth(self.broker)

def connect(self) -> None:
"""
Connect to a Pub/Sub broker

:returns: None
"""

raise NotImplementedError()

def pub(self, channel: str, message: str) -> bool:
"""
Publish a message to a broker/channel

:param channel: `str` of channel
:param message: `str` of message

:returns: `bool` of publish result
"""

raise NotImplementedError()

def __repr__(self):
return f'<BasePubSubClient> {self.broker_safe_url}'
115 changes: 115 additions & 0 deletions pycsw/broker/mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# =================================================================
#
# Authors: Tom Kralidis <[email protected]>
#
# Copyright (c) 2025 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

import logging

from paho.mqtt import client as mqtt_client

from pycsw.broker.base import BasePubSubClient

LOGGER = logging.getLogger(__name__)


class MQTTPubSubClient(BasePubSubClient):
"""MQTT client"""

def __init__(self, broker_url):
"""
Initialize object

:param publisher_def: provider definition

:returns: pycsw.pubsub.mqtt.MQTTClient
"""

super().__init__(broker_url)
self.type = 'mqtt'
self.port = self.broker_url.port

self.userdata = {}

msg = f'Connecting to broker {self.broker_safe_url} with id {self.client_id}' # noqa
LOGGER.debug(msg)
self.conn = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2,
client_id=self.client_id)

self.conn.enable_logger(logger=LOGGER)

if None not in [self.broker_url.username, self.broker_url.password]:
LOGGER.debug('Setting credentials')
self.conn.username_pw_set(
self.broker_url.username,
self.broker_url.password)

if self.port is None:
if self.broker_url.scheme == 'mqtts':
self.port = 8883
else:
self.port = 1883

if self.broker_url.scheme == 'mqtts':
self.conn.tls_set(tls_version=2)

def connect(self) -> None:
"""
Connect to an MQTT broker
:returns: None
"""

self.conn.connect(self.broker_url.hostname, self.port)
LOGGER.debug('Connected to broker')

def pub(self, topic: str, message: str, qos: int = 1) -> bool:
"""
Publish a message to a broker/topic
:param topic: `str` of topic
:param message: `str` of message
:returns: `bool` of publish result
"""

LOGGER.debug(f'Publishing to broker {self.broker_safe_url}')
LOGGER.debug(f'Topic: {topic}')
LOGGER.debug(f'Message: {message}')

result = self.conn.publish(topic, message, qos)
LOGGER.debug(f'Result: {result}')

# TODO: investigate implication
# result.wait_for_publish()

if result.is_published:
LOGGER.debug('Message published')
return True
else:
msg = f'Publishing error code: {result[1]}'
LOGGER.warning(msg)
return False

def __repr__(self):
return f'<MQTTPubSubClient> {self.broker_safe_url}'
14 changes: 14 additions & 0 deletions pycsw/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,3 +557,17 @@ def str2bool(value: typing.Union[bool, str]) -> bool:
value2 = value.lower() in ('yes', 'true', 't', '1', 'on')

return value2


def remove_url_auth(url: str) -> str:
"""
Provide a RFC1738 URL without embedded authentication

:param url: RFC1738 URL

:returns: RFC1738 URL without authentication
"""

u = urlparse(url)
auth = f'{u.username}:{u.password}@'
return url.replace(auth, '')
Loading