diff --git a/configuration/conf/config.properties b/configuration/conf/config.properties index a671ac76d..61aad85ea 100644 --- a/configuration/conf/config.properties +++ b/configuration/conf/config.properties @@ -315,7 +315,7 @@ ######################################################## ################### 操作信息整体配置 ##################### -# 各操作的比例,按照顺序为 写入:Q1:Q2:Q3:Q4:Q5:Q6:Q7:Q8:Q9:Q10:Q11, 请注意使用英文冒号。比例中的每一项是整数。 +# 各操作的比例,按照顺序为 写入:Q1:Q2:Q3:Q4:Q5:Q6:Q7:Q8:Q9:Q10:Q11:Q12, 请注意使用英文冒号。比例中的每一项是整数。 # Qi表示的查询如下: # Q1 精确点查询 select v1... from data where time = ? and device in ? # Q2 范围查询(只限制起止时间)select v1... from data where time > ? and time < ? and device in ? @@ -328,7 +328,8 @@ # Q9 倒序范围查询(只限制起止时间)select v1... from data where time > ? and time < ? and device in ? order by time desc # Q10 倒序带值过滤的范围查询 select v1... from data where time > ? and time < ? and v1 > ? and device in ? order by time desc # Q11 分组聚合查询,倒序;目前仅支持iotdb、tdengine-3.0、influxdb v1 -# OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0:0 +# Q12 基于前缀路径的快速 last 查询(iotdb-2.0专用) +OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0:1 # 最长等待写时间,单位毫秒,即如果整个写操作在指定时间内没有返回,则终止此操作 # WRITE_OPERATION_TIMEOUT_MS=120000 diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java index 5c2cc5a01..0de2c1035 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java @@ -101,6 +101,12 @@ protected void doTest() { case GROUP_BY_QUERY_ORDER_BY_TIME_DESC: dbWrapper.groupByQueryOrderByDesc(queryWorkLoad.getGroupByQuery()); break; + case FAST_LAST_PREFIX_QUERY: + // 示例:自动生成一个前缀路径列表,可根据实际需求调整 + java.util.List prefixPaths = new java.util.ArrayList<>(); + prefixPaths.add("root"); + dbWrapper.fastLastPrefixQuery(prefixPaths); + break; default: LOGGER.error("Unsupported operation sensorType {}", operation); } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/Operation.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/Operation.java index 10134ac4c..f121f2c76 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/Operation.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/Operation.java @@ -35,6 +35,7 @@ public enum Operation { RANGE_QUERY_ORDER_BY_TIME_DESC("RANGE_QUERY_DESC"), VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC("VALUE_RANGE_QUERY_DESC"), GROUP_BY_QUERY_ORDER_BY_TIME_DESC("GROUP_BY_DESC"), + FAST_LAST_PREFIX_QUERY("FAST_LAST_PREFIX_QUERY"), // Q12: 基于前缀路径的快速 last 查询 VERIFICATION_QUERY("VERIFICATION_QUERY"), DEVICE_QUERY("DEVICE_QUERY"); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java index 4e9911531..8f7cb73a7 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java @@ -51,6 +51,30 @@ import java.util.stream.Collectors; public class DBWrapper implements IDatabase { + /** Q12: Fast last query for prefix paths (IoTDB 2.0+) */ + @Override + public Status fastLastPrefixQuery(List prefixPaths) { + Status status = null; + Operation operation = Operation.FAST_LAST_PREFIX_QUERY; + String device = + (prefixPaths != null && !prefixPaths.isEmpty()) ? prefixPaths.get(0) : "No Prefix"; + try { + List statuses = new ArrayList<>(); + for (IDatabase database : databases) { + long start = System.nanoTime(); + status = database.fastLastPrefixQuery(prefixPaths); + long end = System.nanoTime(); + status.setTimeCost(end - start); + statuses.add(status); + } + for (Status sta : statuses) { + handleQueryOperation(sta, operation, device); + } + } catch (Exception e) { + handleUnexpectedQueryException(operation, e, device); + } + return status; + } private static final Logger LOGGER = LoggerFactory.getLogger(DBWrapper.class); private static Config config = ConfigDescriptor.getInstance().getConfig(); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/IDatabase.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/IDatabase.java index 028ca8a37..59b23b5c5 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/IDatabase.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/IDatabase.java @@ -41,6 +41,13 @@ import java.util.List; public interface IDatabase { + /** + * Q12: Fast last query for prefix paths (IoTDB 2.0+) + * + * @param prefixPaths 前缀路径列表 + * @return status which contains successfully executed flag, error message and so on. + */ + Status fastLastPrefixQuery(List prefixPaths); /** * Initialize any state for this DB. Called once per DB instance; there is one DB instance per diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/fakedb/FakeDB.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/fakedb/FakeDB.java index 2ed938790..658ec3d37 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/fakedb/FakeDB.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/fakedb/FakeDB.java @@ -36,6 +36,10 @@ import java.util.List; public class FakeDB implements IDatabase { + @Override + public Status fastLastPrefixQuery(List prefixPaths) { + return new Status(true, null, null); + } @Override public void init() throws TsdbException {} diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/self/SelfCheck.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/self/SelfCheck.java index e3ec5b3b5..21238fb60 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/self/SelfCheck.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/self/SelfCheck.java @@ -38,6 +38,11 @@ import java.util.Set; public class SelfCheck implements IDatabase { + @Override + public Status fastLastPrefixQuery(List prefixPaths) { + return check(new Status(true, null, null)); + } + private static final Config config = ConfigDescriptor.getInstance().getConfig(); private static final Logger logger = LoggerFactory.getLogger(SelfCheck.class); private long loop = 0; diff --git a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/DMLStrategy/TreeSessionManager.java b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/DMLStrategy/TreeSessionManager.java index fe8b0d54c..220bc9125 100644 --- a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/DMLStrategy/TreeSessionManager.java +++ b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/DMLStrategy/TreeSessionManager.java @@ -44,6 +44,30 @@ public class TreeSessionManager extends SessionManager { private static final Logger LOGGER = LoggerFactory.getLogger(TableSessionManager.class); private final Session session; + /** + * 新增:基于前缀路径的快速 last 查询(第11种查询方式) + * + * @param prefixPath 例如 Arrays.asList("root", "sg1") + * @return SessionDataSet 查询结果 + */ + public SessionDataSet executeFastLastDataQueryForOnePrefixPath(List prefixPath) + throws IoTDBConnectionException, StatementExecutionException { + // 兼容未实现的 session 方法,优先用反射调用 + try { + java.lang.reflect.Method method = + session.getClass().getMethod("executeFastLastDataQueryForOnePrefixPath", List.class); + Object result = method.invoke(session, prefixPath); + return (SessionDataSet) result; + } catch (NoSuchMethodException nsme) { + LOGGER.error("Session未实现executeFastLastDataQueryForOnePrefixPath方法", nsme); + throw new StatementExecutionException( + "Session未实现executeFastLastDataQueryForOnePrefixPath方法", nsme); + } catch (Exception e) { + LOGGER.error("executeFastLastDataQueryForOnePrefixPath failed", e); + throw new StatementExecutionException(e); + } + } + public TreeSessionManager(DBConfig dbConfig) { super(dbConfig); List hostUrls = new ArrayList<>(dbConfig.getHOST().size()); diff --git a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/IoTDB.java b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/IoTDB.java index ab4b5486f..c671cae38 100644 --- a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/IoTDB.java +++ b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/IoTDB.java @@ -448,6 +448,42 @@ public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) { return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC); } + /** + * Q12: 基于前缀路径的快速 last 查询(iotdb-2.0专用) + * + * @param prefixPaths 前缀路径列表 + */ + public Status fastLastPrefixQuery(List prefixPaths) { + // 仅支持 SessionStrategy,反射获取 sessionManager 字段 + TreeSessionManager treeSessionManager = null; + if (dmlStrategy instanceof SessionStrategy) { + try { + java.lang.reflect.Field field = dmlStrategy.getClass().getDeclaredField("sessionManager"); + field.setAccessible(true); + Object inner = field.get(dmlStrategy); + if (inner instanceof TreeSessionManager) { + treeSessionManager = (TreeSessionManager) inner; + } + } catch (Exception e) { + LOGGER.error("Q12 反射获取 sessionManager 失败", e); + } + } + if (treeSessionManager == null) { + return new Status( + false, 0, new Exception("Q12 仅支持 TreeSessionManager"), "Q12 仅支持 TreeSessionManager"); + } + try { + Object result = treeSessionManager.executeFastLastDataQueryForOnePrefixPath(prefixPaths); + if (!config.isIS_QUIET_MODE()) { + LOGGER.info("Q12 fastLastPrefixQuery, prefixPaths: {}", prefixPaths); + } + return new Status(true, 0, "Q12 fastLastPrefixQuery executed", null); + } catch (Exception e) { + LOGGER.error("Q12 fastLastPrefixQuery error", e); + return new Status(false, 0, e, "Q12 fastLastPrefixQuery error"); + } + } + /** * Generate simple query header. *