Skip to content

Commit b86cdac

Browse files
authored
Merge pull request #166 from quisitive-crogers/feature/source-json-support
Support for SqlServer JSON Fields
2 parents 4ff6cd2 + fff0c9e commit b86cdac

File tree

5 files changed

+240
-78
lines changed

5 files changed

+240
-78
lines changed

Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerDataSourceExtensionTests.cs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66
using Cosmos.DataTransfer.Common.UnitTests;
77
using Moq;
88
using Microsoft.Extensions.Configuration;
9+
using System.Text.Json;
910

1011
namespace Cosmos.DataTransfer.SqlServerExtension.UnitTests;
1112

1213
[TestClass]
1314
public class SqlServerDataSourceExtensionTests
1415
{
1516

16-
private static async Task<Tuple<SqliteFactory,DbConnection>> connectionFactory(CancellationToken cancellationToken = default(CancellationToken)) {
17+
private static async Task<Tuple<SqliteFactory, DbConnection>> connectionFactory(CancellationToken cancellationToken = default(CancellationToken))
18+
{
1719
var provider = SqliteFactory.Instance;
1820
var connection = provider.CreateConnection();
1921
await connection.OpenAsync(cancellationToken);
@@ -35,15 +37,23 @@ name TEXT
3537
}
3638

3739
[TestMethod]
38-
public async Task TestReadAsync() {
39-
var config = new Mock<IConfiguration>();
40+
public async Task TestReadAsync()
41+
{
42+
var configSettings = new Dictionary<string, string>
43+
{
44+
{ "ConnectionString", "Data Source=:memory:" },
45+
{ "QueryText", "SELECT 1;" }
46+
};
47+
48+
var config = TestHelpers.CreateConfig(configSettings);
49+
4050
var cancellationToken = new CancellationTokenSource(500);
4151
var (providerFactory, connection) = await connectionFactory(cancellationToken.Token);
4252

4353
var extension = new SqlServerDataSourceExtension();
4454
Assert.AreEqual("SqlServer", extension.DisplayName);
4555

46-
var result = await extension.ReadAsync(config.Object, NullLogger.Instance,
56+
var result = await extension.ReadAsync(config, NullLogger.Instance,
4757
"SELECT * FROM foobar", Array.Empty<DbParameter>(), connection, providerFactory, cancellationToken.Token).ToListAsync();
4858
var expected = new List<DictionaryDataItem> {
4959
new DictionaryDataItem(new Dictionary<string, object?> { { "id", (long)1 }, { "name", "zoo" } }),
@@ -53,8 +63,16 @@ public async Task TestReadAsync() {
5363
}
5464

5565
[TestMethod]
56-
public async Task TestReadAsyncWithParameters() {
57-
var config = new Mock<IConfiguration>();
66+
public async Task TestReadAsyncWithParameters()
67+
{
68+
var configSettings = new Dictionary<string, string>
69+
{
70+
{ "ConnectionString", "Data Source=:memory:" },
71+
{ "QueryText", "SELECT 1;" }
72+
};
73+
74+
var config = TestHelpers.CreateConfig(configSettings);
75+
5876
var cancellationToken = new CancellationTokenSource();
5977
var (providerFactory, connection) = await connectionFactory(cancellationToken.Token);
6078

@@ -66,8 +84,8 @@ public async Task TestReadAsyncWithParameters() {
6684
parameter.DbType = System.Data.DbType.Int32;
6785
parameter.Value = 2;
6886

69-
var result = await extension.ReadAsync(config.Object, NullLogger.Instance,
70-
"SELECT * FROM foobar WHERE id = @x",
87+
var result = await extension.ReadAsync(config, NullLogger.Instance,
88+
"SELECT * FROM foobar WHERE id = @x",
7189
new DbParameter[] { parameter }, connection, providerFactory, cancellationToken.Token).FirstAsync();
7290
Assert.That.AreEqual(result,
7391
new DictionaryDataItem(new Dictionary<string, object?> { { "id", (long)2 }, { "name", null } }),
@@ -90,10 +108,12 @@ public async Task TestReadAsyncWithParameters() {
90108
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable.
91109
[TestMethod]
92110
[Timeout(1000)]
93-
public async Task TestReadAsync_LiveSqlServer() {
111+
public async Task TestReadAsync_LiveSqlServer()
112+
{
94113
var connectionString = (string?)TestContext.Properties["TestReadAsync_LiveSqlServer_ConnectionString"];
95114
connectionString ??= Environment.GetEnvironmentVariable("TestReadAsync_LiveSqlServer_ConnectionString");
96-
if (connectionString is null) {
115+
if (connectionString is null)
116+
{
97117
Assert.Inconclusive("Could not run, as no connection string to live SQL Server was provided.");
98118
}
99119

@@ -105,7 +125,7 @@ public async Task TestReadAsync_LiveSqlServer() {
105125

106126
var result = await extension.ReadAsync(config, NullLogger.Instance).FirstAsync();
107127

108-
Assert.IsTrue(new DataItemComparer().Equals(result,
128+
Assert.IsTrue(new DataItemComparer().Equals(result,
109129
new DictionaryDataItem(new Dictionary<string, object?> {
110130
{ "", 1 },
111131
{ "bar", "foo" },

Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSourceExtension.cs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,30 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
2323
var connection = providerFactory.CreateConnection()!;
2424
connection.ConnectionString = settings!.ConnectionString;
2525

26-
var iterable = this.ReadAsync(config, logger, settings.GetQueryText(),
27-
settings.GetDbParameters(providerFactory), connection,
26+
var iterable = this.ReadAsync(config, logger, settings.GetQueryText(),
27+
settings.GetDbParameters(providerFactory), connection,
2828
providerFactory, cancellationToken);
29-
30-
await foreach (var item in iterable) {
29+
30+
await foreach (var item in iterable)
31+
{
3132
yield return item;
3233
}
3334
}
3435

3536
public async IAsyncEnumerable<IDataItem> ReadAsync(
36-
IConfiguration config,
37-
ILogger logger,
37+
IConfiguration config,
38+
ILogger logger,
3839
string queryText,
3940
DbParameter[] parameters,
4041
DbConnection connection,
4142
DbProviderFactory dbProviderFactory,
4243
[EnumeratorCancellation] CancellationToken cancellationToken = default)
4344
{
44-
try {
45+
try
46+
{
47+
var settings = config.Get<SqlServerSourceSettings>();
48+
settings.Validate();
49+
4550
await connection.OpenAsync(cancellationToken);
4651
var command = connection.CreateCommand();
4752
command.CommandText = queryText;
@@ -59,11 +64,19 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(
5964
{
6065
value = null;
6166
}
67+
68+
if (settings?.JsonFields != null && settings.JsonFields.Contains(column.ColumnName))
69+
{
70+
value = DataItemJsonConverter.Deserialize(value!.ToString());
71+
}
72+
6273
fields[column.ColumnName] = value;
6374
}
6475
yield return new DictionaryDataItem(fields);
6576
}
66-
} finally {
77+
}
78+
finally
79+
{
6780
await connection.CloseAsync();
6881
}
6982
}

Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSourceSettings.cs

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,42 @@ public class SqlServerSourceSettings : IDataExtensionSettings, IValidatableObjec
1212
public string? ConnectionString { get; set; }
1313

1414
public string? QueryText { get; set; }
15+
public string[] JsonFields { get; set; } = [];
1516

1617
public string? FilePath { get; set; }
1718

1819
public IDictionary<string, object>? Parameters { get; set; }
19-
20+
2021
public IEnumerable<ValidationResult> Validate(ValidationContext validationContext)
2122
{
22-
if (String.IsNullOrWhiteSpace(this.ConnectionString)) {
23+
if (String.IsNullOrWhiteSpace(this.ConnectionString))
24+
{
2325
yield return new ValidationResult("The `ConnectionString` field is required.",
2426
new string[] { "ConnectionString" });
25-
}
27+
}
2628
if (String.IsNullOrWhiteSpace(this.QueryText) &&
27-
String.IsNullOrWhiteSpace(this.FilePath)) {
28-
yield return new ValidationResult(
29-
"Either `QueryText` or `FilePath` are required!",
30-
new string[] { "QueryText", "FilePath"});
31-
} else if (String.IsNullOrWhiteSpace(this.QueryText) == false &&
32-
String.IsNullOrWhiteSpace(this.FilePath) == false) {
33-
yield return new ValidationResult(
34-
"Both `QueryText` and `FilePath` are not allowed.",
35-
new string[] { "QueryText", "FilePath"});
29+
String.IsNullOrWhiteSpace(this.FilePath))
30+
{
31+
yield return new ValidationResult(
32+
"Either `QueryText` or `FilePath` are required!",
33+
new string[] { "QueryText", "FilePath" });
3634
}
37-
if (!String.IsNullOrWhiteSpace(this.FilePath)) {
35+
else if (String.IsNullOrWhiteSpace(this.QueryText) == false &&
36+
String.IsNullOrWhiteSpace(this.FilePath) == false)
37+
{
38+
yield return new ValidationResult(
39+
"Both `QueryText` and `FilePath` are not allowed.",
40+
new string[] { "QueryText", "FilePath" });
41+
}
42+
if (!String.IsNullOrWhiteSpace(this.FilePath))
43+
{
3844
ValidationResult? res = null;
39-
try {
45+
try
46+
{
4047
_ = File.ReadAllText(this.FilePath);
41-
} catch (Exception e) {
48+
}
49+
catch (Exception e)
50+
{
4251
res = new ValidationResult("Could not read `FilePath`. Reason: \n" + e.Message,
4352
new string[] { "FilePath" });
4453
}
@@ -54,44 +63,60 @@ public IEnumerable<ValidationResult> Validate(ValidationContext validationContex
5463
/// or <code>Microsoft.Data.Sqlite.SqliteFactory.Instance</code>.
5564
/// </param>
5665
/// <returns></returns>
57-
public DbParameter[] GetDbParameters(DbProviderFactory dbProviderFactory) {
66+
public DbParameter[] GetDbParameters(DbProviderFactory dbProviderFactory)
67+
{
5868
var result = new List<DbParameter>();
5969

60-
if (this.Parameters is null || this.Parameters.Count == 0) {
70+
if (this.Parameters is null || this.Parameters.Count == 0)
71+
{
6172
return Array.Empty<DbParameter>();
6273
}
6374

64-
foreach (var param in this.Parameters) {
75+
foreach (var param in this.Parameters)
76+
{
6577
var dbparam = dbProviderFactory.CreateParameter()!;
6678
dbparam.ParameterName = param.Key;
67-
if (param.Value is bool b) {
79+
if (param.Value is bool b)
80+
{
6881
dbparam.DbType = DbType.Boolean;
6982
dbparam.Value = b;
70-
} else if (param.Value is long l) {
83+
}
84+
else if (param.Value is long l)
85+
{
7186
dbparam.DbType = DbType.Int64;
7287
dbparam.Value = l;
73-
} else if (param.Value is int i) {
88+
}
89+
else if (param.Value is int i)
90+
{
7491
dbparam.DbType = DbType.Int32;
7592
dbparam.Value = i;
76-
} else if (param.Value is float f) {
93+
}
94+
else if (param.Value is float f)
95+
{
7796
dbparam.DbType = DbType.Single;
7897
dbparam.Value = f;
79-
} else if (param.Value is double d) {
98+
}
99+
else if (param.Value is double d)
100+
{
80101
dbparam.DbType = DbType.Double;
81102
dbparam.Value = d;
82-
} else {
103+
}
104+
else
105+
{
83106
dbparam.DbType = DbType.String;
84107
dbparam.Value = param.Value;
85-
}
86-
result.Add(dbparam);
87-
}
88-
return result.ToArray();
108+
}
109+
result.Add(dbparam);
110+
}
111+
return result.ToArray();
89112
}
90113

91-
public string GetQueryText() {
92-
if (!String.IsNullOrWhiteSpace(this.FilePath)) {
114+
public string GetQueryText()
115+
{
116+
if (!String.IsNullOrWhiteSpace(this.FilePath))
117+
{
93118
return File.ReadAllText(this.FilePath);
94-
}
119+
}
95120
return this.QueryText!;
96121
}
97122
}

Extensions/SqlServer/README.md

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ The SQL Server data transfer extension provides source and sink capabilities for
88

99
Source and sink settings both require a `ConnectionString` parameter.
1010

11+
When a source query returns JSON text in one or more columns, the `JsonFields` parameter can be used to list the columns that should be parsed as JSON instead of being passed through as plain strings. This is useful for building nested sets of data that should be handled as nested objects downstream.
12+
1113
### Source
1214

1315
Source settings also require either a `QueryText` or `FilePath` parameter with the SQL statement that defines the data to select.
14-
`QueryText` can be the SQL statement, while `FilePath` can point to a file with the SQL statement.
15-
In both cases, the SQL statement can combine data from multiple tables, views, etc.,
16+
`QueryText` can be the SQL statement, while `FilePath` can point to a file with the SQL statement.
17+
In both cases, the SQL statement can combine data from multiple tables, views, etc.,
1618
but should produce a single result set.
1719

1820
```json
@@ -23,13 +25,15 @@ but should produce a single result set.
2325
}
2426
```
2527

28+
### Example Source settings
29+
2630
The extension supports parameterized queries, using named placeholders.
27-
Use `@param-name` for the named parameters.
31+
Use `@param-name` for the named parameters.
2832
Positional parameters are *not* supported.
2933

3034
Example (works better when query is in a separate file though):
3135

32-
```
36+
```json
3337
{
3438
"ConnectionString": "",
3539
"QueryText": "SELECT * FROM Logs WHERE UserId = @id",
@@ -39,18 +43,52 @@ Example (works better when query is in a separate file though):
3943
}
4044
```
4145

46+
The following example illustrates the use of `JsonFields`. The subquery over `Sales.OrderLines`, aliased as `OrderLineItems`, returns its rows as JSON by using the `FOR JSON AUTO` clause. That column is then listed in the `JsonFields` array.
47+
48+
```json
49+
{
50+
"ConnectionString": "Server=.;Database=WideWorldImporters;Trusted_Connection=True;TrustServerCertificate=True;",
51+
"IncludeMetadataFields": false,
52+
"QueryText": "SELECT TOP 1000 o.OrderID , c.CustomerName , o.OrderDate , o.ExpectedDeliveryDate , o.CustomerPurchaseOrderNumber , (select li.Description, li.Quantity FROM Sales.OrderLines li WHERE li.OrderID= o.OrderID FOR JSON AUTO) OrderLineItems FROM Sales.Customers c JOIN Sales.Orders o ON o.CustomerID = c.CustomerID ORDER BY OrderId",
53+
"JsonFields": [
54+
"OrderLineItems"
55+
]
56+
}
57+
```
58+
59+
Example nested JSON output:
60+
61+
```json
62+
{
63+
"OrderID": 2,
64+
"CustomerName": "Bala Dixit",
65+
"OrderDate": "2013-01-01T00:00:00.0000000",
66+
"ExpectedDeliveryDate": "2013-01-02T00:00:00.0000000",
67+
"CustomerPurchaseOrderNumber": "15342",
68+
"OrderLineItems": [
69+
{
70+
"Description": "Developer joke mug - old C developers never die (White)",
71+
"Quantity": 9
72+
},
73+
{
74+
"Description": "USB food flash drive - chocolate bar",
75+
"Quantity": 9
76+
}
77+
]
78+
}
79+
```
80+
4281
### Sink
4382

4483
Sink settings require a `TableName` to define where to insert data and an array of `ColumnMappings`. Only fields listed in `ColumnMappings` will be imported. Each element in `ColumnMappings` requires a `ColumnName` specifying the target SQL column along with situation specific fields:
84+
4585
- `SourceFieldName`: This should be set in cases where the source data uses a different name than the SQL column. Column name to source field mapping defaults to using the `ColumnName` for both sides and is case-insensitive so it is not necessary to specify this parameter for mappings like `"id"` -> `"Id"`.
4686
- `AllowNull`: Depending on the table schema you may need to force values to be set for columns when no value is present in the source. If this is set to `false` this column will use the `DefaultValue` for any records missing a source value. Defaults to `true`.
4787
- `DefaultValue`: Value to be used in place of missing or null source fields. This parameter is ignored unless `AllowNull` is set to `false` for this column.
48-
- `DataType`: This setting specify the DataType of the column, default will be String: please refer documentation. https://learn.microsoft.com/sql/relational-databases/clr-integration-database-objects-types-net-framework/mapping-clr-parameter-data?view=sql-server-ver16&redirectedfrom=MSDN&tabs=csharp
88+
- `DataType`: Specifies the data type of the column; the default is String. For details, refer to the documentation: <https://learn.microsoft.com/sql/relational-databases/clr-integration-database-objects-types-net-framework/mapping-clr-parameter-data?view=sql-server-ver16&redirectedfrom=MSDN&tabs=csharp>
4989

50-
-
5190
Sink settings also include an optional `BatchSize` parameter to specify the count of records to accumulate before bulk inserting, default value is 1000.
5291

53-
5492
```json
5593
{
5694
"ConnectionString": "",
@@ -78,4 +116,4 @@ Sink settings also include an optional `BatchSize` parameter to specify the coun
78116
],
79117
"BatchSize": 1000
80118
}
81-
```
119+
```

0 commit comments

Comments
 (0)