Skip to content

Commit 23ba1e7

Browse files
committed
Extract SQLExtendedFetch implementation
Co-Authored-By: alinalibq <[email protected]>
1 parent 42f27ab commit 23ba1e7

File tree

5 files changed

+164
-8
lines changed

5 files changed

+164
-8
lines changed

cpp/src/arrow/flight/sql/odbc/odbc_api.cc

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -830,16 +830,38 @@ SQLRETURN SQLFetch(SQLHSTMT stmt) {
830830
SQLRETURN SQLExtendedFetch(SQLHSTMT stmt, SQLUSMALLINT fetch_orientation,
831831
SQLLEN fetch_offset, SQLULEN* row_count_ptr,
832832
SQLUSMALLINT* row_status_array) {
833-
// GH-47110 TODO: SQLExtendedFetch should return SQL_SUCCESS_WITH_INFO for certain diag
833+
// GH-47110: SQLExtendedFetch should return SQL_SUCCESS_WITH_INFO for certain diag
834834
// states
835835
ARROW_LOG(DEBUG) << "SQLExtendedFetch called with stmt: " << stmt
836836
<< ", fetch_orientation: " << fetch_orientation
837837
<< ", fetch_offset: " << fetch_offset
838838
<< ", row_count_ptr: " << static_cast<const void*>(row_count_ptr)
839839
<< ", row_status_array: "
840840
<< static_cast<const void*>(row_status_array);
841-
// GH-47714 TODO: Implement SQLExtendedFetch
842-
return SQL_INVALID_HANDLE;
841+
842+
using ODBC::ODBCDescriptor;
843+
using ODBC::ODBCStatement;
844+
return ODBCStatement::ExecuteWithDiagnostics(stmt, SQL_ERROR, [=]() {
845+
if (fetch_orientation != SQL_FETCH_NEXT) {
846+
throw DriverException("Optional feature not supported.", "HYC00");
847+
}
848+
// fetch_offset is ignored as only SQL_FETCH_NEXT is supported
849+
850+
ODBCStatement* statement = reinterpret_cast<ODBCStatement*>(stmt);
851+
852+
// The SQL_ROWSET_SIZE statement attribute specifies the number of rows in the
853+
// rowset.
854+
SQLULEN row_set_size = statement->GetRowsetSize();
855+
ARROW_LOG(DEBUG) << "SQL_ROWSET_SIZE value for SQLExtendedFetch: " << row_set_size;
856+
857+
if (statement->Fetch(static_cast<size_t>(row_set_size), row_count_ptr,
858+
row_status_array)) {
859+
return SQL_SUCCESS;
860+
} else {
861+
// Reached the end of rowset
862+
return SQL_NO_DATA;
863+
}
864+
});
843865
}
844866

845867
SQLRETURN SQLFetchScroll(SQLHSTMT stmt, SQLSMALLINT fetch_orientation,

cpp/src/arrow/flight/sql/odbc/odbc_impl/odbc_statement.cc

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ void ODBCStatement::ExecuteDirect(const std::string& query) {
306306
is_prepared_ = false;
307307
}
308308

309-
bool ODBCStatement::Fetch(size_t rows) {
309+
bool ODBCStatement::Fetch(size_t rows, SQLULEN* row_count_ptr,
310+
SQLUSMALLINT* row_status_array) {
310311
if (has_reached_end_of_result_) {
311312
ird_->SetRowsProcessed(0);
312313
return false;
@@ -339,11 +340,24 @@ bool ODBCStatement::Fetch(size_t rows) {
339340
current_ard_->NotifyBindingsHavePropagated();
340341
}
341342

342-
size_t rows_fetched = current_result_->Move(rows, current_ard_->GetBindOffset(),
343-
current_ard_->GetBoundStructOffset(),
344-
ird_->GetArrayStatusPtr());
343+
uint16_t* array_status_ptr;
344+
if (row_status_array) {
345+
// For SQLExtendedFetch only
346+
array_status_ptr = row_status_array;
347+
} else {
348+
array_status_ptr = ird_->GetArrayStatusPtr();
349+
}
350+
351+
size_t rows_fetched =
352+
current_result_->Move(rows, current_ard_->GetBindOffset(),
353+
current_ard_->GetBoundStructOffset(), array_status_ptr);
345354
ird_->SetRowsProcessed(static_cast<SQLULEN>(rows_fetched));
346355

356+
if (row_count_ptr) {
357+
// For SQLExtendedFetch only
358+
*row_count_ptr = rows_fetched;
359+
}
360+
347361
row_number_ += rows_fetched;
348362
has_reached_end_of_result_ = rows_fetched != rows;
349363
return rows_fetched != 0;

cpp/src/arrow/flight/sql/odbc/odbc_impl/odbc_statement.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ class ODBCStatement : public ODBCHandle<ODBCStatement> {
6060

6161
/**
6262
* @brief Returns true if the number of rows fetch was greater than zero.
63+
* row_count_ptr and row_status_array are optional arguments, they are only needed for
64+
* SQLExtendedFetch
6365
*/
64-
bool Fetch(size_t rows);
66+
bool Fetch(size_t rows, SQLULEN* row_count_ptr = 0, SQLUSMALLINT* row_status_array = 0);
6567
bool IsPrepared() const;
6668

6769
void GetStmtAttr(SQLINTEGER statement_attribute, SQLPOINTER output,

cpp/src/arrow/flight/sql/odbc/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ add_arrow_test(flight_sql_odbc_test
3535
odbc_test_suite.cc
3636
odbc_test_suite.h
3737
connection_test.cc
38+
statement_test.cc
3839
# Enable Protobuf cleanup after test execution
3940
# GH-46889: move protobuf_test_util to a more common location
4041
../../../../engine/substrait/protobuf_test_util.cc
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
#include "arrow/flight/sql/odbc/tests/odbc_test_suite.h"
18+
19+
#include "arrow/flight/sql/odbc/odbc_impl/platform.h"
20+
21+
#include <sql.h>
22+
#include <sqltypes.h>
23+
#include <sqlucode.h>
24+
25+
#include <limits>
26+
27+
#include <gmock/gmock.h>
28+
#include <gtest/gtest.h>
29+
30+
namespace arrow::flight::sql::odbc {
31+
32+
template <typename T>
33+
class StatementTest : public T {};
34+
35+
class StatementMockTest : public FlightSQLODBCMockTestBase {};
36+
class StatementRemoteTest : public FlightSQLODBCRemoteTestBase {};
37+
using TestTypes = ::testing::Types<StatementMockTest, StatementRemoteTest>;
38+
TYPED_TEST_SUITE(StatementTest, TestTypes);
39+
40+
TYPED_TEST(StatementTest, TestSQLExtendedFetchRowFetching) {
41+
// Set SQL_ROWSET_SIZE to fetch 3 rows at once
42+
43+
constexpr SQLULEN rows = 3;
44+
SQLINTEGER val[rows];
45+
SQLLEN buf_len = sizeof(val);
46+
SQLLEN ind[rows];
47+
48+
// Same variable will be used for column 1, the value of `val`
49+
// should be updated after every SQLFetch call.
50+
ASSERT_EQ(SQL_SUCCESS, SQLBindCol(this->stmt, 1, SQL_C_LONG, val, buf_len, ind));
51+
52+
ASSERT_EQ(SQL_SUCCESS, SQLSetStmtAttr(this->stmt, SQL_ROWSET_SIZE,
53+
reinterpret_cast<SQLPOINTER>(rows), 0));
54+
55+
std::wstring wsql =
56+
LR"(
57+
SELECT 1 AS small_table
58+
UNION ALL
59+
SELECT 2
60+
UNION ALL
61+
SELECT 3;
62+
)";
63+
std::vector<SQLWCHAR> sql0(wsql.begin(), wsql.end());
64+
65+
ASSERT_EQ(SQL_SUCCESS,
66+
SQLExecDirect(this->stmt, &sql0[0], static_cast<SQLINTEGER>(sql0.size())));
67+
68+
// Fetch row 1-3.
69+
SQLULEN row_count;
70+
SQLUSMALLINT row_status[rows];
71+
72+
ASSERT_EQ(SQL_SUCCESS,
73+
SQLExtendedFetch(this->stmt, SQL_FETCH_NEXT, 0, &row_count, row_status));
74+
EXPECT_EQ(3, row_count);
75+
76+
for (int i = 0; i < rows; i++) {
77+
EXPECT_EQ(SQL_SUCCESS, row_status[i]);
78+
}
79+
80+
// Verify 1 is returned for row 1
81+
EXPECT_EQ(1, val[0]);
82+
// Verify 2 is returned for row 2
83+
EXPECT_EQ(2, val[1]);
84+
// Verify 3 is returned for row 3
85+
EXPECT_EQ(3, val[2]);
86+
87+
// Verify result set has no more data beyond row 3
88+
SQLULEN row_count2;
89+
SQLUSMALLINT row_status2[rows];
90+
EXPECT_EQ(SQL_NO_DATA,
91+
SQLExtendedFetch(this->stmt, SQL_FETCH_NEXT, 0, &row_count2, row_status2));
92+
}
93+
94+
TEST_F(StatementRemoteTest, DISABLED_TestSQLExtendedFetchQueryNullIndicator) {
95+
// GH-47110: SQLExtendedFetch should return SQL_SUCCESS_WITH_INFO for 22002
96+
// Limitation on mock test server prevents null from working properly, so use remote
97+
// server instead. Mock server has type `DENSE_UNION` for null column data.
98+
SQLINTEGER val;
99+
100+
ASSERT_EQ(SQL_SUCCESS, SQLBindCol(this->stmt, 1, SQL_C_LONG, &val, 0, 0));
101+
102+
std::wstring wsql = L"SELECT null as null_col;";
103+
std::vector<SQLWCHAR> sql0(wsql.begin(), wsql.end());
104+
105+
ASSERT_EQ(SQL_SUCCESS,
106+
SQLExecDirect(this->stmt, &sql0[0], static_cast<SQLINTEGER>(sql0.size())));
107+
108+
SQLULEN row_count1;
109+
SQLUSMALLINT row_status1[1];
110+
111+
// SQLExtendedFetch should return SQL_SUCCESS_WITH_INFO for 22002 state
112+
ASSERT_EQ(SQL_SUCCESS_WITH_INFO,
113+
SQLExtendedFetch(this->stmt, SQL_FETCH_NEXT, 0, &row_count1, row_status1));
114+
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState22002);
115+
}
116+
117+
} // namespace arrow::flight::sql::odbc

0 commit comments

Comments
 (0)