Skip to content

Commit 09d3bf8

Browse files
authored
Merge pull request #41 from aiven/parse-views
Parse views
2 parents e5db89d + d8c216d commit 09d3bf8

File tree

8 files changed

+211
-9
lines changed

8 files changed

+211
-9
lines changed

img/reference.png

16.9 KB
Loading

img/sql_reference.png

20.2 KB
Loading

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ pymysql
1111
pydot
1212
colour
1313
cryptography==38.0.1
14+
sqllineage

scripts/create_pg_tbl.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,9 @@ create table pasta (pasta_id serial, pasta_name varchar(100), cooking_minutes in
22
insert into pasta(pasta_name, cooking_minutes) values ('pennette',8),('fusilli',7),('spaghetti',9);
33

44
create table pasta_eater(pasta_id int, eater_name varchar(100), constraint pasta_exitst_fk foreign key(pasta_id) references pasta(pasta_id));
5-
insert into pasta_eater values(1, 'Francesco'), (2, 'Ewelina'), (3, 'Lorna')
5+
insert into pasta_eater values(1, 'Francesco'), (2, 'Ewelina'), (3, 'Lorna');
66

7+
8+
create view vw_pasta_view AS
9+
select pasta.pasta_id, pasta_name, cooking_minutes, eater_name
10+
from pasta join pasta_eater on pasta.pasta_id = pasta_eater.pasta_id;

src/explore_service.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
opensearch,
1414
flink,
1515
)
16-
import traceback
1716

1817

1918
SERVICE_MAP = {}

src/pg.py

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Parsing PostgreSQL services"""
22

33
import psycopg2
4+
from src import sql
45

56

67
def build_conn_string(avnadmin_pwd, service_conn_info):
@@ -82,6 +83,14 @@ def explore_pg(self, service, service_name, project, service_map):
8283
nodes = nodes + new_nodes
8384
edges = edges + new_edges
8485

86+
new_nodes, new_edges = explore_pg_views(cur, service_name)
87+
nodes = nodes + new_nodes
88+
edges = edges + new_edges
89+
90+
new_nodes, new_edges = explore_pg_columns(cur, service_name)
91+
nodes = nodes + new_nodes
92+
edges = edges + new_edges
93+
8594
cur.close()
8695
conn.close()
8796
return (
@@ -176,7 +185,7 @@ def explore_pg_tables(cur, service_name):
176185
+ service_name
177186
+ "~schema~"
178187
+ table[0]
179-
+ "~table~"
188+
+ "~table_view~"
180189
+ table[1],
181190
"service_type": "pg",
182191
"type": "table",
@@ -190,7 +199,7 @@ def explore_pg_tables(cur, service_name):
190199
+ service_name
191200
+ "~schema~"
192201
+ table[0]
193-
+ "~table~"
202+
+ "~table_view~"
194203
+ table[1],
195204
"label": "table",
196205
}
@@ -250,7 +259,7 @@ def explore_pg_grants(cur, service_name):
250259
+ service_name
251260
+ "~schema~"
252261
+ role_table_grant[1]
253-
+ "~table~"
262+
+ "~table_view~"
254263
+ role_table_grant[2],
255264
"label": "grant",
256265
"privilege_type": role_table_grant[3],
@@ -282,7 +291,7 @@ def explore_pg_columns(cur, service_name):
282291
+ service_name
283292
+ "~schema~"
284293
+ column[1]
285-
+ "~table~"
294+
+ "~table_view~"
286295
+ column[2]
287296
+ "~column~"
288297
+ column[3],
@@ -299,17 +308,82 @@ def explore_pg_columns(cur, service_name):
299308
+ service_name
300309
+ "~schema~"
301310
+ column[1]
302-
+ "~table~"
311+
+ "~table_view~"
303312
+ column[2]
304313
+ "~column~"
305314
+ column[3],
306315
"to": "pg~"
307316
+ service_name
308317
+ "~schema~"
309318
+ column[1]
310-
+ "~table~"
319+
+ "~table_view~"
311320
+ column[2],
312321
"label": "column",
313322
}
314323
)
315324
return nodes, edges
325+
326+
327+
def explore_pg_views(cur, service_name):
328+
329+
nodes = []
330+
edges = []
331+
332+
cur.execute(
333+
"""
334+
select table_catalog, table_schema, table_name, view_definition,
335+
check_option, is_updatable, is_insertable_into,
336+
is_trigger_updatable, is_trigger_deletable, is_trigger_insertable_into
337+
from information_schema.views
338+
where table_schema not in ('information_schema', 'pg_catalog');
339+
"""
340+
)
341+
342+
views = cur.fetchall()
343+
for view in views:
344+
nodes.append(
345+
{
346+
"id": "pg~"
347+
+ service_name
348+
+ "~schema~"
349+
+ view[1]
350+
+ "~table_view~"
351+
+ view[2],
352+
"service_type": "pg",
353+
"type": "view",
354+
"view_definition": view[3],
355+
"check_option": view[4],
356+
"is_updatable": view[5],
357+
"is_insertable_into": view[6],
358+
"is_trigger_updatable": view[7],
359+
"is_trigger_deletable": view[8],
360+
"is_trigger_insertable_into": view[9],
361+
"label": view[2],
362+
}
363+
)
364+
edges.append(
365+
{
366+
"from": "pg~" + service_name + "~schema~" + view[1],
367+
"to": "pg~"
368+
+ service_name
369+
+ "~schema~"
370+
+ view[1]
371+
+ "~table_view~"
372+
+ view[2],
373+
"label": "view",
374+
}
375+
)
376+
377+
new_nodes, new_edges = sql.explore_sql(
378+
view[3], service_name, view[1], view[2], "pg"
379+
)
380+
nodes = nodes + new_nodes
381+
edges = edges + new_edges
382+
return nodes, edges
383+
384+
# new_nodes, new_edges = sql.explore_sql(
385+
# "create view testview as select a, b from test",
386+
# "cavallo",
387+
# "serpente",
388+
# "kafka",
389+
# )

src/pyvis_display.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
colors["database"] = "#0000ff"
1111
colors["schema"] = "#000077"
1212
colors["table"] = "#000033"
13+
colors["view"] = "#000033"
1314
colors["table column"] = "#000011"
1415
colors["user"] = "#0000AA"
1516
colors["flink table"] = "#AA0000"
@@ -25,10 +26,13 @@
2526
colors["index"] = "#0000cc"
2627
colors["connection-error"] = "#0000cc"
2728
colors["tag"] = "#0000cc"
28-
colors["backup"] = "cccccc"
29+
colors["backup"] = "#cccccc"
2930
colors["consumer_group"] = "#0000cc"
3031
colors["partition"] = "#0000cc"
3132
colors["service_nodes"] = "#0000cc"
33+
colors["service_nodes"] = "#0000cc"
34+
colors["reference"] = "#0000cc"
35+
colors["sql_reference"] = "#0000cc"
3236

3337

3438
sizes = {}
@@ -39,6 +43,7 @@
3943
sizes["database"] = 20
4044
sizes["schema"] = 15
4145
sizes["table"] = 15
46+
sizes["view"] = 15
4247
sizes["user"] = 15
4348
sizes["flink table"] = 15
4449
sizes["flink job"] = 25
@@ -58,6 +63,8 @@
5863
sizes["consumer_group"] = 10
5964
sizes["partition"] = 10
6065
sizes["service_node"] = 10
66+
sizes["reference"] = 10
67+
sizes["sql_reference"] = 10
6168

6269

6370
images = {}
@@ -68,6 +75,7 @@
6875
images["database"] = "img/database.png"
6976
images["schema"] = "img/document.png"
7077
images["table"] = "img/table.png"
78+
images["view"] = "img/table.png"
7179
images["user"] = "img/user.png"
7280
images["flink table"] = "img/table.png"
7381
images["flink job"] = "img/engineering.png"
@@ -88,6 +96,8 @@
8896
images["consumer_group"] = "img/user.png"
8997
images["partition"] = "img/layout.png"
9098
images["service_node"] = "img/servers.png"
99+
images["reference"] = "img/reference.png"
100+
images["sql_reference"] = "img/sql_reference.png"
91101

92102

93103
def pyviz_graphy(nodes, edges):

src/sql.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
"""Parsing SQL"""
2+
3+
from sqllineage.runner import LineageRunner
4+
from sqllineage.core.models import SubQuery
5+
6+
7+
def explore_sql(sql_statement, service_name, schema, target, service_type):
8+
"""Parsing SQL"""
9+
nodes = []
10+
edges = []
11+
12+
if not sql_statement.upper().startswith("INSERT"):
13+
sql_statement = "INSERT INTO " + target + " AS " + sql_statement
14+
runner = LineageRunner(
15+
sql_statement,
16+
verbose=False,
17+
draw_options={
18+
"host": "abc",
19+
"port": 123,
20+
"f": "sql.sql",
21+
},
22+
)
23+
24+
for line in runner.get_column_lineage(exclude_subquery=False):
25+
prev_col_id = None
26+
27+
for col in reversed(line):
28+
is_subquery = False
29+
if isinstance(col.parent, SubQuery):
30+
table_name = col.parent.alias
31+
is_subquery = True
32+
else:
33+
table_name = col.parent.raw_name
34+
column_name = col.raw_name
35+
36+
if table_name is None:
37+
# This is the case where is a reference to the end column
38+
new_col_id = (
39+
service_type
40+
+ "~"
41+
+ service_name
42+
+ "~schema~"
43+
+ schema
44+
+ "~table_view~"
45+
+ target
46+
+ "~"
47+
+ "column"
48+
+ "~"
49+
+ column_name
50+
)
51+
else:
52+
new_col_id = (
53+
service_type
54+
+ "~"
55+
+ service_name
56+
+ "~schema~"
57+
+ schema
58+
+ "~table_view~"
59+
+ table_name
60+
+ "~column~"
61+
+ column_name
62+
)
63+
if is_subquery:
64+
nodes.append(
65+
{
66+
"id": new_col_id,
67+
"service_type": service_type,
68+
"type": "reference",
69+
"label": column_name,
70+
}
71+
)
72+
nodes.append(
73+
{
74+
"id": new_col_id.split("~column~")[0],
75+
"service_type": service_type,
76+
"type": "sql_reference",
77+
"label": new_col_id.split("~column~")[0].split(
78+
"~table_view~"
79+
)[1],
80+
}
81+
)
82+
edges.append(
83+
{
84+
"from": new_col_id,
85+
"to": new_col_id.split("~column~")[0],
86+
"type": "sql_reference",
87+
}
88+
)
89+
edges.append(
90+
{
91+
"from": service_type
92+
+ "~"
93+
+ service_name
94+
+ "~schema~"
95+
+ schema
96+
+ "~table_view~"
97+
+ target,
98+
"to": new_col_id.split("~column~")[0],
99+
"type": "sql_reference",
100+
"label": target,
101+
}
102+
)
103+
104+
if prev_col_id is not None:
105+
edges.append(
106+
{
107+
"from": new_col_id,
108+
"to": prev_col_id,
109+
"type": "sql_reference",
110+
}
111+
)
112+
113+
prev_col_id = new_col_id
114+
return nodes, edges

0 commit comments

Comments
 (0)