Skip to content

Commit 2b20640

Browse files
committed
Add support for container types ARRAY, OBJECT, and FLOAT_VECTOR
1 parent 0339630 commit 2b20640

File tree

10 files changed

+431
-36
lines changed

10 files changed

+431
-36
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ jobs:
6565
pip install "setuptools>=64" --upgrade
6666
6767
# Install package in editable mode.
68-
pip install --use-pep517 --prefer-binary --editable=.[test,develop]
68+
pip install --use-pep517 --prefer-binary --editable=.[all,develop,test]
6969
7070
- name: Run linter and software tests
7171
run: |

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog for Meltano/Singer Target for CrateDB
22

33
## In progress
4+
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.
45

56
## 2023-12-08 v0.0.1
67
- Make it work. It can run the canonical Meltano GitHub -> DB example.

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,27 @@ LIMIT
123123
```
124124

125125

126+
## Vector Store Support
127+
128+
In order to support CrateDB's vector store feature, i.e. its `FLOAT_VECTOR`
129+
data type, you will need to install `numpy`. It has been added to an "extra"
130+
of the Python package, called `vector`.
131+
132+
When installing the package using pip, this would apply:
133+
```
134+
pip install 'meltano-target-cratedb[vector]'
135+
```
136+
137+
When installing the package using Meltano's project definition, this
138+
would probably be the right way to write it down, but it hasn't been verified
139+
yet.
140+
```yaml
141+
- name: target-cratedb
142+
variant: cratedb
143+
pip_url: meltano-target-cratedb[vector]
144+
```
145+
146+
126147
## Development
127148

128149
In order to work on this adapter dialect on behalf of a real pipeline definition,

pyproject.toml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,13 @@ dynamic = [
9494
dependencies = [
9595
"crate[sqlalchemy]",
9696
"cratedb-toolkit",
97-
'importlib-resources; python_version < "3.9"',
98-
"meltanolabs-target-postgres==0.0.9",
97+
'importlib-resources; python_version < "3.9"', # "meltanolabs-target-postgres==0.0.9",
98+
"meltanolabs-target-postgres@ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
9999
]
100100
[project.optional-dependencies]
101+
all = [
102+
"meltano-target-cratedb[vector]",
103+
]
101104
develop = [
102105
"black<24",
103106
"mypy==1.7.1",
@@ -115,6 +118,9 @@ test = [
115118
"pytest-cov<5",
116119
"pytest-mock<4",
117120
]
121+
vector = [
122+
"numpy",
123+
]
118124
[project.urls]
119125
changelog = "https://github.com/crate-workbench/meltano-target-cratedb/blob/main/CHANGES.md"
120126
documentation = "https://github.com/crate-workbench/meltano-target-cratedb"

target_cratedb/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
"""Init CrateDB."""
2-
from target_cratedb.patch import patch_sqlalchemy
2+
from target_cratedb.sqlalchemy.patch import patch_sqlalchemy
33

44
patch_sqlalchemy()

target_cratedb/connector.py

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,18 @@
66
from datetime import datetime
77

88
import sqlalchemy
9+
import sqlalchemy as sa
910
from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray
1011
from singer_sdk import typing as th
11-
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT
12+
from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type
1213
from sqlalchemy.types import (
14+
ARRAY,
15+
BIGINT,
1316
BOOLEAN,
1417
DATE,
1518
DATETIME,
1619
DECIMAL,
20+
FLOAT,
1721
INTEGER,
1822
TEXT,
1923
TIME,
@@ -22,7 +26,8 @@
2226
)
2327
from target_postgres.connector import NOTYPE, PostgresConnector
2428

25-
from target_cratedb.patch import polyfill_refresh_after_dml_engine
29+
from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
30+
from target_cratedb.sqlalchemy.vector import FloatVector
2631

2732

2833
class CrateDBConnector(PostgresConnector):
@@ -111,8 +116,52 @@ def pick_individual_type(jsonschema_type: dict):
111116
if "object" in jsonschema_type["type"]:
112117
return ObjectType
113118
if "array" in jsonschema_type["type"]:
114-
# TODO: Handle other inner-types as well?
119+
# Select between different kinds of `ARRAY` data types.
120+
#
121+
# This currently leverages an unspecified definition for the Singer SCHEMA,
122+
# using the `additionalProperties` attribute to convey additional type
123+
# information, agnostic of the target database.
124+
#
125+
# In this case, it is about telling different kinds of `ARRAY` types apart:
126+
# Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or,
127+
# alternatively, it can be a "vector" kind `ARRAY` of floating point
128+
# numbers, effectively what pgvector is storing in its `VECTOR` type.
129+
#
130+
# Still, `type: "vector"` is only a surrogate label here, because other
131+
# database systems may use different types for implementing the same thing,
132+
# and need to translate accordingly.
133+
"""
134+
Schema override rule in `meltano.yml`:
135+
136+
type: "array"
137+
items:
138+
type: "number"
139+
additionalProperties:
140+
storage:
141+
type: "vector"
142+
dim: 4
143+
144+
Produced schema annotation in `catalog.json`:
145+
146+
{"type": "array",
147+
"items": {"type": "number"},
148+
"additionalProperties": {"storage": {"type": "vector", "dim": 4}}}
149+
"""
150+
if "additionalProperties" in jsonschema_type and "storage" in jsonschema_type["additionalProperties"]:
151+
storage_properties = jsonschema_type["additionalProperties"]["storage"]
152+
if "type" in storage_properties and storage_properties["type"] == "vector":
153+
# On PostgreSQL/pgvector, use the corresponding type definition
154+
# from its SQLAlchemy dialect.
155+
return FloatVector(storage_properties["dim"])
156+
157+
# Discover/translate inner types.
158+
inner_type = resolve_array_inner_type(jsonschema_type)
159+
if inner_type is not None:
160+
return ARRAY(inner_type)
161+
162+
# When type discovery fails, assume `TEXT`.
115163
return ARRAY(TEXT())
164+
116165
if jsonschema_type.get("format") == "date-time":
117166
return TIMESTAMP()
118167
individual_type = th.to_sql_type(jsonschema_type)
@@ -139,20 +188,18 @@ def pick_best_sql_type(sql_type_array: list):
139188
DATE,
140189
TIME,
141190
DECIMAL,
191+
FLOAT,
142192
BIGINT,
143193
INTEGER,
144194
BOOLEAN,
145195
NOTYPE,
146196
ARRAY,
147-
ObjectType,
197+
FloatVector,
198+
ObjectTypeImpl,
148199
]
149200

150201
for sql_type in precedence_order:
151202
for obj in sql_type_array:
152-
# FIXME: Workaround. Currently, ObjectType can not be resolved back to a type?
153-
# TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
154-
if isinstance(sql_type, ObjectTypeImpl):
155-
return ObjectType
156203
if isinstance(obj, sql_type):
157204
return obj
158205
return TEXT()
@@ -188,6 +235,8 @@ def _get_type_sort_key(
188235

189236
if isinstance(sql_type, _ObjectArray):
190237
return 0, _len
238+
if isinstance(sql_type, FloatVector):
239+
return 0, _len
191240
if isinstance(sql_type, NOTYPE):
192241
return 0, _len
193242

@@ -245,3 +294,18 @@ def prepare_schema(self, schema_name: str) -> None:
245294
Don't emit `CREATE SCHEMA` statements to CrateDB.
246295
"""
247296
pass
297+
298+
299+
def resolve_array_inner_type(jsonschema_type: dict) -> t.Union[sa.types.TypeEngine, None]:
    """
    Resolve the SQLAlchemy type for the elements of a JSON Schema `array`.

    :param jsonschema_type: JSON Schema fragment describing the array, for
        example `{"type": "array", "items": {"type": "integer"}}`.
    :return: A type instance for the array elements, or `None` when the
        fragment has no `items` attribute or the element type is not one of
        boolean, number, integer, object, or array.
    """
    if "items" not in jsonschema_type:
        return None

    items = jsonschema_type["items"]
    if is_boolean_type(items):
        return BOOLEAN()
    if is_number_type(items):
        return FLOAT()
    if is_integer_type(items):
        return BIGINT()
    if is_object_type(items):
        return ObjectType()
    if is_array_type(items):
        # Recurse with the nested schema fragment itself. Passing only its
        # `type` value (a string or list) can never contain an `items` key,
        # which made nested arrays always resolve to `None`.
        # NOTE(review): this yields the innermost element type; the nesting
        # depth is not reflected in the returned type -- confirm that is the
        # intended mapping for multi-dimensional arrays.
        return resolve_array_inner_type(items)
    return None

target_cratedb/sqlalchemy/__init__.py

Whitespace-only changes.

target_cratedb/patch.py renamed to target_cratedb/sqlalchemy/patch.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,36 @@
11
from datetime import datetime
22

33
import sqlalchemy as sa
4-
from crate.client.sqlalchemy.dialect import TYPES_MAP, DateTime
4+
from _decimal import Decimal
5+
from crate.client.http import CrateJsonEncoder
6+
from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime
57
from crate.client.sqlalchemy.types import _ObjectArray
68
from sqlalchemy.sql import sqltypes
79

810

911
def patch_sqlalchemy():
    """
    Apply all monkeypatches of this module, in order: first the type
    registrations, then the JSON encoder adjustment.
    """
    for apply_patch in (patch_types, patch_json_encoder):
        apply_patch()
14+
15+
16+
def patch_types():
1017
"""
11-
Register missing timestamp data type.
18+
Register missing data types, and fix erroneous ones.
1219
1320
TODO: Upstream to crate-python.
1421
"""
15-
# TODO: Submit patch to `crate-python`.
22+
TYPES_MAP["bigint"] = sqltypes.BIGINT
23+
TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT)
24+
TYPES_MAP["long"] = sqltypes.BIGINT
25+
TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT)
26+
TYPES_MAP["real"] = sqltypes.DOUBLE
27+
TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE)
1628
TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP
1729
TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP
1830

31+
# TODO: Can `ARRAY` be inherited from PostgreSQL's
32+
# `ARRAY`, to make type checking work?
33+
1934
def as_generic(self):
2035
return sqltypes.ARRAY
2136

@@ -36,6 +51,23 @@ def process(value):
3651
DateTime.bind_processor = bind_processor
3752

3853

54+
def patch_json_encoder():
    """
    Make `CrateJsonEncoder` serialize `Decimal` values as floats.

    Without this patch, `Decimal` values have been rendered as strings.
    All other objects are delegated to the original `default` method.

    TODO: Upstream to crate-python.
    """

    # Captured from the class, so this is a plain function that
    # needs `self` handed in explicitly when delegating to it.
    json_encoder_default = CrateJsonEncoder.default

    def default(self, o):
        if isinstance(o, Decimal):
            return float(o)
        return json_encoder_default(self, o)

    CrateJsonEncoder.default = default
69+
70+
3971
def polyfill_refresh_after_dml_engine(engine: sa.Engine):
4072
def receive_after_execute(
4173
conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result

0 commit comments

Comments
 (0)