
gtfs-rt-feed

Continuously matches realtime transit data in the VDV-454 structure against a GTFS Schedule dataset and generates GTFS Realtime (GTFS-RT) data.

ISC-licensed

Tip

If you're just looking for VBB's publicly deployed GTFS-RT feed:

Tip

Although gtfs-rt-feed can be used standalone, it is intended to be used in tandem with vdv-453-nats-adapter – which pulls the input VDV-454 data from a VDV-453/-454 API – and nats-consuming-gtfs-rt-server – which combines the DIFFERENTIAL-mode GTFS-RT data sent by gtfs-rt-feed into a single non-differential feed and serves it via HTTP.

For more details about the architecture gtfs-rt-feed has been designed for, refer to the VBB deployment's readme.

gtfs-rt-feed uses the PostGIS GTFS importer to import the GTFS Schedule data into a new PostgreSQL database whenever the dataset has changed.

How matching works

flowchart TB
	classDef machine fill:none,stroke:#9b59b6

	input@{ shape: sm-circ }
	output@{ shape: framed-circle }

	subgraph gtfs_rt_feed [OpenDataVBB/gtfs-rt-feed]
		subgraph services
			gtfs_matching_service(gtfs-matching-service)
			vdv_reconciliation_service(vdv-reconciliation-service)
		end
		style services fill:none,stroke:none
		%% gtfs_importer(GTFS import script)
		gtfs_db[(PostgreSQL DB with GTFS Schedule data)]
		redis[(Redis)]
	end
	class gtfs_rt_feed machine
	subgraph nats[NATS JetStream]
		nats_ref_aus_sollfahrt["`*REF_AUS_SOLLFAHRT_2* stream`"]:::stream
		nats_aus_istfahrt["`*AUS_ISTFAHRT_2* stream`"]:::stream
		nats_vdv_fahrt["`*VDV_FAHRT_2* stream`"]:::stream
		nats_gtfs_rt["`*GTFS_RT_2* stream`"]:::stream
		classDef stream fill:#ffffde,stroke:#aaaa33
	end
	style nats fill:none

	input-->nats_ref_aus_sollfahrt
	nats_ref_aus_sollfahrt-- "`VDV-453 *REF-AUS* *SollFahrt* messages`" -->vdv_reconciliation_service
	input-->nats_aus_istfahrt
	nats_aus_istfahrt-- "`VDV-454 *AUS* *IstFahrt* messages`" -->vdv_reconciliation_service
	vdv_reconciliation_service-- "`reconciles *SollFahrt*s & *IstFahrt*s using`" ---redis
	vdv_reconciliation_service-- "`VDV *Fahrt*s messages`"-->nats_vdv_fahrt
	nats_vdv_fahrt-->gtfs_matching_service
	%% gtfs_importer-- "`updates with new GTFS Schedule data`" -->gtfs_db
	gtfs_matching_service-- "`matches VDV *Fahrt*s with`" ---gtfs_db
	gtfs_matching_service-- "`caches matching results using`" ---redis
	gtfs_matching_service-- "`GTFS-RT messages`"-->nats_gtfs_rt
	nats_gtfs_rt-->output

VDV REF-AUS/AUS reconciliation

This service reads both VDV-454 REF-AUS SollFahrts and VDV-454 AUS IstFahrts from a NATS message queue (in JSON instead of XML):

// REF-AUS SollFahrt
// To be more readable, this example only contains essential fields. In practice, there are more.
{
	"LinienID": "M77",
	"UmlaufID": "1234",
	"FahrtID": {
		"FahrtBezeichner": "9325_877_8_2_19_1_1806#BVG",
		"Betriebstag": "2024-09-20",
	},
	"SollHalts": [
		{
			"HaltID": "900073281",
			"Abfahrtszeit": "2024-09-20T12:41:00Z",
		},
		{
			"HaltID": "900073236",
			"Ankunftszeit": "2024-09-20T12:43:00Z",
			"Abfahrtszeit": "2024-09-20T12:45:00Z",
		},
		// Note: Usually the SollFahrt has all SollHalts, but sometimes it may not be complete.
	],
}
// AUS IstFahrt
// Again, non-essential fields are omitted.
{
	"LinienID": "M77",
	"LinienText": "M77",
	"FahrtID": {
		"FahrtBezeichner": "9325_877_8_2_19_1_1806#BVG",
		"Betriebstag": "2024-09-20",
	},
	"Komplettfahrt": null,
	"IstHalts": [
		{
			"HaltID": "900073236",
			"Ankunftszeit": "2024-09-20T12:43:00Z",
			"Abfahrtszeit": "2024-09-20T12:45:00Z",
			"IstAnkunftPrognose": "2024-09-20T13:46:00+01:00", // 3 minutes delay
			"IstAbfahrtPrognose": "2024-09-20T13:47:00+01:00", // 2 minutes delay
		},
		// Note: Sometimes there are more IstHalts, but often IstFahrts are incomplete, with even just one IstHalt.
	],
}

For each trip "instance" (e.g. the M77 bus above, starting at 2024-09-20T12:41:00Z), there may be

  • a REF-AUS SollFahrt, delineating the scheduled (read: as intended by the transport company's medium-term planning, i.e. taking into account construction work, strikes, etc.) sequence of stops. – These messages (there can be multiple per trip "instance") are typically sent early in the morning, at the beginning of the schedule day.
  • 0 or more AUS IstFahrts with all IstHalts, as indicated by their Komplettfahrt=true flag, delineating the prognosed complete sequence of stops. – These messages are typically sent right before the first departure of and during a trip "instance". Besides providing prognosed arrival/departure times, they also express cancelled and added stops; they are considered exhaustive descriptions of the trip "instance". Only the most recent is kept for each trip "instance".
  • 0 or more partial AUS IstFahrts, as indicated by the lack of Komplettfahrt=true, expressing realtime changes just to those stops that they contain IstHalts for. For each stop of each trip "instance", the most recent is kept.

For a single trip "instance", neither the number of messages of each kind nor their order is known in advance. This is why gtfs-rt-feed, in a process called "VDV reconciliation",

  1. persists all of these messages in a key-value store (Redis), so that,
  2. whenever a new message is received, it can query all previous ones concerning the same trip "instance", and
  3. merge them into a single new IstFahrt structure, "layering" the realtime data from the received AUS IstFahrts on top of the schedule data from the received REF-AUS SollFahrt.
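The three steps above boil down to a "layering" merge. The following is a simplified, hypothetical sketch of that merge, assuming the message shapes from the examples above and that messages arrive in the order shown; the actual implementation differs in details like keying and recency handling.

```javascript
// Layer realtime IstHalts on top of the scheduled SollHalts, keyed by HaltID.
// A Komplettfahrt IstFahrt is exhaustive, so it resets the known stops first.
const mergeVdvFahrt = (sollFahrt, istFahrts) => {
	// start from the scheduled stops, keyed by HaltID
	const haltsById = new Map(
		(sollFahrt?.SollHalts ?? []).map(h => [h.HaltID, {...h}]),
	);
	for (const istFahrt of istFahrts) {
		if (istFahrt.Komplettfahrt === true) {
			// exhaustive description: cancelled stops disappear, added stops appear
			haltsById.clear();
		}
		for (const istHalt of istFahrt.IstHalts ?? []) {
			const prev = haltsById.get(istHalt.HaltID) ?? {};
			// layer realtime fields on top of what we know so far;
			// later messages overwrite earlier ones ("most recent is kept")
			haltsById.set(istHalt.HaltID, {...prev, ...istHalt});
		}
	}
	return {
		FahrtID: sollFahrt?.FahrtID ?? istFahrts[0]?.FahrtID,
		IstHalts: Array.from(haltsById.values()),
	};
};
```

Note that, in the real service, the previously received messages are fetched from Redis before each merge, rather than being passed in as an array.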

After merging, the IstFahrt is transformed into a GTFS-RT TripUpdate, so that subsequent code must only deal with GTFS-RT concepts.

// Again, this example has been shortened for readability.
{
	"trip": {},
	"stop_time_update": [
		{
			"stop_id": "900073281",
			"departure": {
				"time": 1726836420,
				"delay": 300,
			},
		},
		{
			"stop_id": "900073236",
			"arrival": {
				"time": 1726836360,
				"delay": 180,
			},
			"departure": {
				"time": 1726836420,
				"delay": 120,
			},
		},
	],
	// not part of the GTFS Realtime spec, we just use it for matching and/or debug-logging
	[kRouteShortName]: "M77",
}
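The timestamp/delay values in the example above follow from the ISO 8601 times in the IstHalts. A hedged sketch of that conversion (helper name and exact semantics are assumptions, not the actual implementation):

```javascript
// Turn a scheduled time plus a realtime prognosis into a GTFS-RT StopTimeEvent:
// a POSIX timestamp, plus the delay in seconds relative to the schedule.
const asStopTimeEvent = (scheduledIso, prognosedIso) => {
	if (!prognosedIso) return null;
	const prognosed = Date.parse(prognosedIso) / 1000; // POSIX seconds
	const event = {time: prognosed};
	if (scheduledIso) {
		event.delay = prognosed - Date.parse(scheduledIso) / 1000;
	}
	return event;
};

// e.g. the arrival at stop 900073236 from the examples above:
// scheduled 2024-09-20T12:43:00Z, prognosed 2024-09-20T13:46:00+01:00
// → {time: 1726836360, delay: 180}
```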

GTFS Schedule matching

Within the imported GTFS Schedule data, gtfs-rt-feed then tries to find trip "instances" that

  • have the same route_short_name ("M77"),
  • for at least two IstHalts, stop at (roughly) the same scheduled time (2024-09-20T12:41:00Z) at (roughly) the same stop (900073281).

If there is exactly one such GTFS Schedule trip "instance", we call it a match. If there are two or more, we consider the match ambiguous and not specific enough, so we stop processing the IstFahrt.
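The matching criteria and the uniqueness rule can be sketched as follows. This is an illustrative approximation under assumed data shapes (the tolerance value, the `stopTimes` structure, and the suffix-based stop ID comparison are assumptions); the real lookup happens via SQL against the imported GTFS database.

```javascript
const TOLERANCE = 60; // seconds; the actual tolerance is an assumption here

// Does a GTFS Schedule trip "instance" match the VDV IstFahrt?
const matchesTrip = (istFahrt, trip) => {
	// same route_short_name ("M77")
	if (trip.route_short_name !== istFahrt.LinienID) return false;
	// count IstHalts that stop at (roughly) the same scheduled time
	// at (roughly) the same stop
	let agreeing = 0;
	for (const h of istFahrt.IstHalts) {
		const st = trip.stopTimes.find(s => s.stop_id.endsWith(h.HaltID));
		if (!st) continue;
		const scheduled = Date.parse(h.Abfahrtszeit ?? h.Ankunftszeit) / 1000;
		if (Math.abs(st.departure_time - scheduled) <= TOLERANCE) agreeing++;
	}
	return agreeing >= 2;
};

const findMatch = (istFahrt, trips) => {
	const candidates = trips.filter(t => matchesTrip(istFahrt, t));
	// exactly one candidate → match; two or more → ambiguous, give up
	return candidates.length === 1 ? candidates[0] : null;
};
```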

The GTFS Schedule trip "instance" is then formatted as a GTFS-RT TripUpdate (it contains no realtime data). Then the schedule TripUpdate and the matched realtime TripUpdate get merged into a single new TripUpdate.

// Again, this example has been shortened for readability.
{
	"trip": {
		"trip_id": "1234567",
		"route_id": "17462_700",
	},
	"stop_time_update": [
		{
			"stop_id": "de:11000:900073281",
			// Note that `arrival` has been filled in from schedule data.
			"arrival": {
				"time": 1726836060,
			},
			"departure": {
				"time": 1726836420,
				"delay": 300,
			},
		},
		{
			"stop_id": "de:11000:900073236",
			"arrival": {
				"time": 1726836360,
				"delay": 180,
			},
			"departure": {
				"time": 1726836420,
				"delay": 120,
			},
		},
	],
	// not part of the GTFS Realtime spec, we just use it for matching and/or debug-logging
	[kRouteShortName]: "M77",
}
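The schedule/realtime merge shown above can be sketched like this. It is a simplified, hypothetical version: matching stop_time_updates by stop ID suffix and the field-precedence rules are assumptions about the actual merge logic.

```javascript
// Merge a schedule-only TripUpdate with the matched realtime TripUpdate.
// Realtime arrival/departure events win; schedule data fills in what's missing.
const mergeTripUpdates = (scheduleTU, realtimeTU) => ({
	// trip_id & route_id come from the GTFS Schedule side
	trip: scheduleTU.trip,
	stop_time_update: scheduleTU.stop_time_update.map((schedStu) => {
		// GTFS stop IDs ("de:11000:900073281") end with the VDV HaltID
		const rtStu = realtimeTU.stop_time_update.find(
			s => schedStu.stop_id.endsWith(s.stop_id),
		);
		if (!rtStu) return schedStu; // no realtime info: keep schedule times
		return {
			...schedStu,
			arrival: rtStu.arrival ?? schedStu.arrival,
			departure: rtStu.departure ?? schedStu.departure,
		};
	}),
});
```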

This whole process, which we call matching, is done continuously for each VDV-454 SollFahrt/IstFahrt message received from NATS.

Installation

There is a Docker image available:

# pull the Docker image …
docker pull ghcr.io/opendatavbb/gtfs-rt-feed

# … or install everything manually (you will need Node.js & npm).
git clone https://github.com/OpenDataVBB/gtfs-rt-feed.git gtfs-rt-feed
cd gtfs-rt-feed
npm install --omit=dev
# initialize submodules & install their dependencies
git submodule update --init --checkout
cd postgis-gtfs-importer && npm install --omit=dev

Getting Started

Important

Although gtfs-rt-feed is intended to be data-source-agnostic, just following the GTFS Schedule and GTFS-RT specs, it currently has some hard-coded assumptions specific to the VBB deployment it has been developed for. Please create an Issue if you want to use gtfs-rt-feed in another setting.

Prerequisites

gtfs-rt-feed needs access to the following services to work:

configure access to PostgreSQL

gtfs-rt-feed uses pg to connect to PostgreSQL; for details about supported environment variables and their defaults, refer to pg's docs.

To make sure that the connection works, use psql from the same context (same permissions, same container if applicable, etc.).

configure access to NATS

gtfs-rt-feed uses nats to connect to NATS. You can use the following environment variables to configure access:

  • $NATS_SERVERS – list of NATS servers (e.g. localhost:4222), separated by ,
  • $NATS_USER & $NATS_PASSWORD – if you need authentication
  • $NATS_CLIENT_NAME – the connection name

By default, gtfs-rt-feed will connect as gtfs-rt-$MAJOR_VERSION to localhost:4222 without authentication.
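The environment variables above map onto nats.js connect options roughly like this. This is a sketch, not the service's actual configuration code; the helper name is hypothetical.

```javascript
// Build nats.js ConnectionOptions from the environment variables documented above.
const buildNatsOpts = (env = process.env) => {
	const opts = {
		servers: (env.NATS_SERVERS ?? 'localhost:4222').split(','),
		// the real default is gtfs-rt-$MAJOR_VERSION; hard-coded here for the sketch
		name: env.NATS_CLIENT_NAME ?? 'gtfs-rt-2',
	};
	if (env.NATS_USER) {
		opts.user = env.NATS_USER;
		opts.pass = env.NATS_PASSWORD;
	}
	return opts;
};

// usage (with the actual nats package):
// const {connect} = require('nats');
// const natsClient = await connect(buildNatsOpts());
```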

create NATS stream & consumer

We also need to create three NATS JetStream streams: REF_AUS_SOLLFAHRT_2 and AUS_ISTFAHRT_2, which gtfs-rt-feed reads (unmatched) VDV-454 REF-AUS SollFahrt and AUS IstFahrt messages from, respectively, as well as VDV_FAHRT_2, which the vdv-reconciliation service writes reconciled VDV Fahrt messages into for the gtfs-matching service. This can be done using the NATS CLI:

# Notes:
# --defaults: omit this if you want to configure more details
# --subjects: collect all messages published to these subjects
# --ack: acknowledge publishes
# --retention=limits --discard=old: with limited storage, discard the oldest messages first
# The last argument is the name of the stream.
nats stream add \
	--defaults \
	--subjects='ref_aus.sollfahrt.>' \
	--ack \
	--retention=limits --discard=old \
	--description='VDV-454 REF-AUS SollFahrt messages' \
	REF_AUS_SOLLFAHRT_2
nats stream add \
	--defaults \
	--subjects='aus.istfahrt.>' \
	--ack \
	--retention=limits --discard=old \
	--description='VDV-454 AUS IstFahrt messages' \
	AUS_ISTFAHRT_2
nats stream add \
	--defaults \
	--subjects='vdv.fahrt.>' \
	--ack \
	--retention=limits --discard=old \
	--description='VDV-454 Fahrt (combined REF-AUS SollFahrt & AUS IstFahrt) messages' \
	VDV_FAHRT_2

On each of these three streams, we create one durable consumer:

# Notes:
# --defaults: omit this if you want to configure more details
# --pull: create a pull-based consumer (refer to the NATS JetStream docs)
# --ack=explicit: let gtfs-rt-feed explicitly acknowledge all received messages
# --deliver=new: let the newly created consumer start with the latest messages in the stream (not all)
# --max-pending=200: send gtfs-rt-feed at most 200 messages at once
# --max-deliver & --backoff…: when & how often to re-deliver a message that hasn't been acknowledged (usually because it couldn't be processed)
# The last two arguments are the names of the stream and the consumer.
nats consumer add \
	--defaults \
	--pull \
	--ack=explicit \
	--deliver=new \
	--max-pending=200 \
	--max-deliver=3 \
	--backoff=linear \
	--backoff-steps=2 \
	--backoff-min=15s \
	--backoff-max=2m \
	--description 'OpenDataVBB/gtfs-rt-feed: vdv-reconciliation service' \
	REF_AUS_SOLLFAHRT_2 \
	'gtfs-rt-feed:vdv-reconciliation'
nats consumer add \
	--defaults \
	--pull \
	--ack=explicit \
	--deliver=new \
	--max-pending=200 \
	--max-deliver=3 \
	--backoff=linear \
	--backoff-steps=2 \
	--backoff-min=15s \
	--backoff-max=2m \
	--description 'OpenDataVBB/gtfs-rt-feed: vdv-reconciliation service' \
	AUS_ISTFAHRT_2 \
	'gtfs-rt-feed:vdv-reconciliation'
nats consumer add \
	--defaults \
	--pull \
	--ack=explicit \
	--deliver=new \
	--max-pending=200 \
	--max-deliver=3 \
	--backoff=linear \
	--backoff-steps=2 \
	--backoff-min=15s \
	--backoff-max=2m \
	--description 'OpenDataVBB/gtfs-rt-feed: gtfs-matching service' \
	VDV_FAHRT_2 \
	'gtfs-rt-feed:gtfs-matching'

Next, again using the NATS CLI, we'll create a stream called GTFS_RT_2 that the gtfs-rt-feed service will write (matched) GTFS-RT messages into:

# (flags as explained for the streams above)
nats stream add \
	--defaults \
	--subjects='gtfsrt.>' \
	--ack \
	--retention=limits --discard=old \
	--description='GTFS-RT messages' \
	GTFS_RT_2

configure access to Redis

gtfs-rt-feed uses ioredis to connect to Redis; for details about supported environment variables and their defaults, refer to its docs.

Tip

You should allow Redis to use at least a few hundred MB of memory. With the VBB deployment, we limit it to 2GB.

import GTFS Schedule data

Make sure your GTFS Schedule dataset is available via HTTP without authentication. Configure the URL using $GTFS_DOWNLOAD_URL. Optionally, you can configure the User-Agent being used for downloading by setting $GTFS_DOWNLOAD_USER_AGENT.

The GTFS import script will

  1. download the GTFS dataset;
  2. import it into a separate database called gtfs_$timestamp_$gtfs_hash (each revision gets its own database);
  3. keep track of the latest successfully imported database's name in a meta "bookkeeping" database ($PGDATABASE by default).

Refer to postgis-gtfs-importer's docs for details about why this is done and how it works.

Optionally, you can

  • activate gtfstidy-ing before import using GTFSTIDY_BEFORE_IMPORT=true;
  • postprocess the imported GTFS dataset using custom SQL scripts by putting them in $PWD/gtfs-postprocessing.d.

Refer to the import script for details about how to customize the GTFS Schedule import.

export GTFS_DOWNLOAD_URL=''
# Run import using Docker …
./import.sh --docker
# … or run import using ./postgis-gtfs-importer
./import.sh

Once the import has finished, you must set $PGDATABASE to the name of the newly created database.

export PGDATABASE="$(psql -q --csv -t -c 'SELECT db_name FROM latest_import')"

Note

For simplicity's sake, in this guide, we only import the GTFS Schedule data by running the command manually. If you're running gtfs-rt-feed in a continuous (service-like) fashion, you'll want to run the GTFS Schedule import regularly, e.g. once per day. postgis-gtfs-importer won't import again if the dataset hasn't changed.

Because how to schedule the import (and how to update $PGDATABASE for the gtfs-rt-feed process afterwards) highly depends on your deployment strategy and preferences, this repo doesn't contain a tool for it. It does assume, however, that the import runs "out-of-band" (e.g. in a sidecar container or a separate service), separately from gtfs-rt-feed's main matching service.

As an example, VBB's deployment uses a systemd timer to schedule the import and a systemd service drop-in file to set $PGDATABASE.

run gtfs-rt-feed

# Run using Docker …
# (In production, use the container deployment tool of your choice.)
# (Pass through other environment variables via `-e` as needed.)
docker run --rm -it \
	-e PGDATABASE \
	ghcr.io/opendatavbb/gtfs-rt-feed

# … or manually.
# (During development, pipe the logs through `./node_modules/.bin/pino-pretty`.)
node index.js

The following environment variables can also be used to configure gtfs-rt-feed (todo: document them in detail):

  • $LOG_LEVEL
  • $LOG_LEVEL_MATCHING
  • $LOG_LEVEL_FORMATTING
  • $LOG_LEVEL_STATION_WEIGHT
  • $METRICS_SERVER_PORT
  • $MATCHING_CONCURRENCY
  • $MATCH_GTFS_RT_TO_GTFS_CACHING
  • $MATCHING_CONSUMER_NAME
  • $MATCHING_PUBLISH_UNMATCHED_TRIPUPDATES
  • $MATCHING_PG_POOL_SIZE
  • $STATION_WEIGHTS_LOOKUP_PG_POOL_SIZE

Alternative: Docker Compose setup

The example docker-compose.yml starts up a complete set of containers (vbb-gtfs-rt-server and all of its dependencies: PostgreSQL & NATS).

Warning

The Docker Compose setup is only intended as a quick demo on how to run gtfs-rt-feed and its dependency services.

Be sure to set POSTGRES_PASSWORD, either via a .env file or an environment variable.

POSTGRES_PASSWORD=my_secret_password docker-compose up

Operating gtfs-rt-feed

Logs

gtfs-rt-feed writes pino-formatted log messages to stdout, so you can use pino-compatible tools to process them.

Monitoring

gtfs-rt-feed exposes Prometheus-compatible metrics via HTTP. By default, the metrics server will listen on a random port. You can configure a permanent port using $METRICS_SERVER_PORT.

The following kinds of metrics will be exported:

Refer to the Grafana dashboard in VBB's deployment for an example how to visualize gtfs-rt-feed's metrics.

License

This project is ISC-licensed.

Note that PostGIS GTFS importer, one of the service's dependencies, is EUPL-licensed.
