Skip to content
This repository was archived by the owner on Nov 18, 2023. It is now read-only.

Commit 7a8b0e2

Browse files
authored
Input pipeline refactor (#63)
We simplify the code of the input data pipeline and improve its testing, specifically for the ContextBuilder and for the conversion of contexts into arrays ready for ingestion into the TensorFlow pipeline.
1 parent cb2b480 commit 7a8b0e2

File tree

12 files changed

+902
-834
lines changed

12 files changed

+902
-834
lines changed

kglib/BUILD

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ py_test(
9292
]
9393
)
9494

95+
py_test(
96+
name = "builder_IT",
97+
srcs = [
98+
"kgcn/core/ingest/traverse/data/context/builder_IT.py"
99+
],
100+
deps = [
101+
"kglib",
102+
]
103+
)
104+
95105
py_test(
96106
name = "neighbour_test",
97107
srcs = [
@@ -123,6 +133,16 @@ py_test(
123133
]
124134
)
125135

136+
py_test(
137+
name = "array_IT",
138+
srcs = [
139+
"kgcn/core/ingest/traverse/data/context/array_IT.py"
140+
],
141+
deps = [
142+
"kglib",
143+
]
144+
)
145+
126146
py_test(
127147
name = "model_test",
128148
srcs = [

kglib/kgcn/core/ingest/encode/schema_test.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,11 @@ def test_input_output(self):
9999
# with self.subTest("Type indices correctness"):
100100
# np.testing.assert_array_equal(type_indices.numpy(), expected_type_indices)
101101

102-
def test_integration(self):
103-
array_data_types = collections.OrderedDict([('neighbour_type', ('U25', 'collie'))])
104-
example_arrays = array.build_default_arrays((3, 2), 4, array_data_types)
105-
example = np.array(example_arrays[0]['neighbour_type'], dtype=str)
102+
def test_encoding_schema_for_an_input_array_works_as_expected(self):
103+
array_shape = (3, 2, 1)
104+
example_arrays = array.initialise_arrays(array_shape, neighbour_type=('U25', 'collie'))
105+
106+
example = np.array(example_arrays['neighbour_type'], dtype=str)
106107
tf.enable_eager_execution()
107108
encoder = se.MultiHotSchemaTypeEncoder(schema_traversal)
108109
embeddings = encoder(tf.convert_to_tensor(example, tf.string))

kglib/kgcn/core/ingest/traverse/data/context/array.py

Lines changed: 77 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -19,196 +19,122 @@
1919

2020
import typing as typ
2121

22-
import collections
23-
2422
import numpy as np
2523

26-
import kglib.kgcn.core.ingest.traverse.data.context.builder as builder
27-
28-
29-
def build_default_arrays(neighbourhood_sizes, n_starting_things, array_data_types):
30-
depthwise_arrays = []
31-
depth_shape = list(neighbourhood_sizes) + [1]
32-
33-
for i in range(len(depth_shape)):
34-
shape_at_this_depth = [n_starting_things] + depth_shape[i:]
35-
arrays = {}
36-
for array_name, (array_data_type, default_value) in array_data_types.items():
3724

38-
if not (i == len(depth_shape) - 1 and array_name in ['role_direction', 'role_type']):
39-
# For the starting nodes we don't need to store roles
40-
arrays[array_name] = np.full(shape=shape_at_this_depth,
41-
fill_value=default_value,
42-
dtype=array_data_type)
25+
def get_context_values_to_put(context):
4326

44-
depthwise_arrays.append(arrays)
45-
return depthwise_arrays
27+
context_values_to_put = {}
4628

29+
for depth, node_list in context.items():
4730

48-
def _get_indices(last_indices, n):
49-
if len(last_indices) == 0:
50-
current_indices = (n, 0)
51-
else:
52-
current_indices = list(last_indices)
53-
current_indices.insert(1, n)
54-
current_indices = tuple(current_indices)
55-
return current_indices
31+
values_to_put_at_this_depth = {}
5632

33+
for node in node_list:
5734

58-
def _get_values_to_put(role_label, role_direction, neighbour_type_label, neighbour_data_type,
59-
neighbour_value):
60-
values_to_put = {}
61-
if role_label is not None:
62-
values_to_put['role_type'] = role_label
63-
if role_direction is not None:
64-
values_to_put['role_direction'] = role_direction
35+
values_to_put_for_this_node = {}
6536

66-
values_to_put['neighbour_type'] = neighbour_type_label
37+
if node.role_label is not None:
38+
values_to_put_for_this_node['role_type'] = node.role_label
39+
if node.role_direction is not None:
40+
values_to_put_for_this_node['role_direction'] = node.role_direction
6741

68-
if neighbour_data_type is not None:
69-
# Potentially confusing to create an index of these arrays, since role type and direction will be omitted
70-
# for the starting things
71-
# values_to_put['neighbour_data_type'] = list(self._array_data_types.keys()).index(
72-
# 'neighbour_value_' + neighbour_data_type)
73-
values_to_put['neighbour_data_type'] = neighbour_data_type
74-
values_to_put['neighbour_value_' + neighbour_data_type] = neighbour_value
42+
values_to_put_for_this_node['neighbour_type'] = node.thing.type_label
7543

76-
return values_to_put
44+
if node.thing.data_type is not None:
45+
values_to_put_for_this_node['neighbour_data_type'] = node.thing.data_type
46+
values_to_put_for_this_node['neighbour_value_' + node.thing.data_type] = node.thing.value
7747

48+
values_to_put_at_this_depth[node.indices] = values_to_put_for_this_node
7849

79-
def _put_values_into_array(arrays_at_this_depth, current_indices, values_to_put):
80-
for key, value in values_to_put.items():
81-
# Ensure that the rank of the array is the same as the number of indices, or risk setting more than
82-
# one value
83-
assert len(arrays_at_this_depth[key].shape) == len(current_indices)
84-
arrays_at_this_depth[key][current_indices] = value
85-
return arrays_at_this_depth
50+
context_values_to_put[depth] = values_to_put_at_this_depth
8651

52+
return context_values_to_put
8753

88-
def _repeat_until_full(current_indices, depth, depthwise_arrays, n, expected_n):
89-
# TODO This has side-effects, it modifies depthwise_arrays in-place
90-
if n < expected_n:
91-
boundary = n + 1
92-
slice_to_repeat = list(current_indices)
93-
slice_to_repeat[1] = slice(boundary)
94-
slice_to_repeat.insert(1, ...)
95-
slice_to_repeat = tuple(slice_to_repeat)
9654

97-
slice_to_replace = list(slice_to_repeat)
98-
slice_to_replace[2] = slice(boundary, None)
99-
slice_to_replace = tuple(slice_to_replace)
55+
def batch_values_to_put(batch_values):
56+
batched_values = {}
57+
for batch_index, structure in enumerate(batch_values):
58+
for depth, indexed_values_to_put in structure.items():
59+
for index, values_to_put in indexed_values_to_put.items():
60+
full_index = (batch_index,) + index
61+
batched_values.setdefault(depth, {})[full_index] = values_to_put
10062

101-
# For the current depth and deeper
102-
for d in list(range(depth, -1, -1)):
103-
for array in list(depthwise_arrays[d].values()):
104-
fill_array_with_repeats(array, slice_to_repeat, slice_to_replace)
105-
return depthwise_arrays
63+
return batched_values
10664

10765

108-
def _add_neighbour_data_to_array(current_indices, depth, depthwise_arrays, neighbour):
109-
# TODO This has side-effects, it modifies depthwise_arrays in-place
110-
thing = neighbour.context.thing
111-
values_to_put = _get_values_to_put(neighbour.role_label, neighbour.role_direction,
112-
thing.type_label, thing.data_type, thing.value)
113-
depthwise_arrays[depth] = _put_values_into_array(depthwise_arrays[depth], current_indices, values_to_put)
114-
return depthwise_arrays
66+
def initialise_arrays(array_shape: typ.Tuple[int], **array_names_with_dtypes_and_default_values):
11567

68+
if len(array_names_with_dtypes_and_default_values) == 0:
69+
raise ValueError('At least one array dtype and default value must be provided')
11670

117-
class ArrayConverter:
118-
"""
119-
Converts contexts into an array
120-
"""
121-
122-
def __init__(self, neighbourhood_sizes: typ.Tuple[int]):
123-
"""
71+
arrays = {}
72+
for array_name, (array_data_type, default_value) in array_names_with_dtypes_and_default_values.items():
12473

125-
:param neighbourhood_sizes: The number of neighbours sampled at each recursion
126-
"""
127-
self._neighbourhood_sizes = tuple(reversed(neighbourhood_sizes))
74+
arrays[array_name] = np.full(shape=array_shape,
75+
fill_value=default_value,
76+
dtype=array_data_type)
77+
return arrays
12878

129-
# Array types and default values
130-
self._array_data_types = collections.OrderedDict(
131-
[('role_type', (np.dtype('U50'), '')),
132-
('role_direction', (np.int, 0)),
133-
('neighbour_type', (np.dtype('U50'), '')),
134-
('neighbour_data_type', (np.dtype('U10'), '')),
135-
('neighbour_value_long', (np.int, 0)),
136-
('neighbour_value_double', (np.float, 0.0)),
137-
('neighbour_value_boolean', (np.int, -1)),
138-
('neighbour_value_date', ('datetime64[s]', '')),
139-
('neighbour_value_string', (np.dtype('U50'), ''))])
140-
self.indices_visited = []
14179

142-
def _initialise_arrays(self, num_example_things):
143-
#####################################################
144-
# Make the empty arrays to fill
145-
#####################################################
80+
def initialise_arrays_for_all_depths(max_hops_shape: typ.Tuple[int], **array_names_with_dtypes_and_default_values):
81+
initialised_depth_arrays = []
82+
depth_array_sizes = get_depth_array_sizes(max_hops_shape)
14683

147-
return build_default_arrays(self._neighbourhood_sizes, num_example_things, self._array_data_types)
84+
for i, array_shape in enumerate(depth_array_sizes):
85+
if i == len(depth_array_sizes) - 1:
14886

149-
def convert_to_array(self, thing_contexts: typ.List[builder.Neighbour]):
150-
"""
151-
Build the arrays to represent the depths of neighbour traversals.
152-
:param top_level_neighbours:
153-
:return: a list of arrays, one for each depth, including one for the starting nodes of interest
154-
"""
87+
array_names_with_dtypes_and_default_values.pop('role_type', None)
88+
array_names_with_dtypes_and_default_values.pop('role_direction', None)
15589

156-
nun_example_things = len(thing_contexts)
157-
self.indices_visited = []
158-
depthwise_arrays = self._initialise_arrays(nun_example_things)
90+
initialised_depth_arrays.append(initialise_arrays(array_shape, **array_names_with_dtypes_and_default_values))
15991

160-
#####################################################
161-
# Populate the arrays from the neighbour contexts
162-
#####################################################
163-
depthwise_arrays = self._build_neighbours(thing_contexts, depthwise_arrays, tuple())
164-
return depthwise_arrays
92+
return initialised_depth_arrays
16593

166-
def _build_neighbours(self, neighbours: typ.List[builder.Neighbour],
167-
depthwise_arrays: typ.List[typ.Dict[str, np.ndarray]],
168-
last_indices: typ.Tuple):
169-
# TODO This has side-effects, it modifies depthwise_arrays in-place
17094

171-
n = None
172-
current_indices = None
95+
def get_depth_array_sizes(max_hops_shape: typ.Tuple[int]):
96+
depth_array_sizes = []
97+
max_hops_size_list = list(max_hops_shape)
98+
for _ in max_hops_shape[1:]:
17399

174-
for n, neighbour in enumerate(neighbours):
175-
current_indices = _get_indices(last_indices, n)
176-
self.indices_visited.append(current_indices) # TODO Remove, but useful for debugging
100+
depth_array_sizes.append(tuple(max_hops_size_list))
177101

178-
depth = self._determine_depth(current_indices)
102+
max_hops_size_list.pop(1)
103+
return depth_array_sizes
179104

180-
depthwise_arrays = _add_neighbour_data_to_array(current_indices, depth, depthwise_arrays, neighbour)
181105

182-
depthwise_arrays = self._build_neighbours(neighbour.context.neighbourhood, depthwise_arrays, current_indices)
183-
184-
print(f'n = {n}, last_indices = {current_indices}')
185-
186-
# Duplicate the sections of the arrays already built so that they are padded to be complete
187-
if n is not None and depth < len(self._neighbourhood_sizes):
188-
expected_n = self._neighbourhood_sizes[depth] - 1
189-
depthwise_arrays = _repeat_until_full(current_indices, depth, depthwise_arrays, n, expected_n)
190-
191-
return depthwise_arrays
192-
193-
def _determine_depth(self, current_indices):
194-
# depth = len(self._neighbourhood_sizes) + 2 - (len(last_indices) + 1)
195-
depth = len(self._neighbourhood_sizes) + 2 - len(current_indices)
196-
return depth
106+
def fill_arrays_at_all_depths(initialised_arrays, batch_values: typ.Dict):
107+
"""
108+
Populates initialised arrays
109+
:param initialised_arrays: Arrays for the different hops of the context, for the different datatypes needed,
110+
initialised with default values
111+
:param batch_values: The sparse values to use to populate the arrays
112+
:return: Populated arrays
113+
"""
197114

115+
for depth, indexed_values_to_put in batch_values.items():
116+
for indices, values_to_put in indexed_values_to_put.items():
117+
for array_name, value_to_put in values_to_put.items():
118+
expanded_indices = indices + (0,)
119+
initialised_arrays[depth][array_name][expanded_indices] = value_to_put
198120

199-
def fill_array_with_repeats(array, slice_to_repeat, slice_to_replace):
200-
to_repeat = array[slice_to_repeat]
201-
to_fill = array[slice_to_replace]
121+
return initialised_arrays
202122

203-
num_repeats = -(-to_fill.shape[0] // to_repeat.shape[0])
204123

205-
tile_axes = [1] * len(to_fill.shape)
206-
tile_axes[0] = num_repeats + 1
124+
def convert_context_batch_to_arrays(context_batch, max_hops_shape: typ.Tuple,
125+
**array_names_with_dtypes_and_default_values: typ.Tuple):
126+
indexed_values = map(get_context_values_to_put, context_batch)
207127

208-
filler = np.tile(to_repeat, tile_axes)
128+
batch_values = batch_values_to_put(indexed_values)
209129

210-
filler_axes = tuple(slice(None, i) for i in to_fill.shape)
130+
# Now we have a data structure like this:
131+
# {
132+
# depth: {
133+
# index: { values to put}
134+
# }
135+
# }
136+
# Where the index now includes the number within the batch as its first element
211137

212-
curtailed_filler = filler[filler_axes]
138+
initialised_arrays = initialise_arrays_for_all_depths(max_hops_shape, **array_names_with_dtypes_and_default_values)
213139

214-
array[slice_to_replace] = curtailed_filler
140+
return fill_arrays_at_all_depths(initialised_arrays, batch_values)

0 commit comments

Comments
 (0)