Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,11 @@ private Operator mergeRawData(Map<TimeInterval, List<FragmentMeta>> fragments, L
Operator operator = OperatorUtils.unionOperators(unionList);
if (!dummyFragments.isEmpty()) {
List<Operator> joinList = new ArrayList<>();
dummyFragments.forEach(meta -> joinList.add(new Project(new FragmentSource(meta),
pathMatchPrefix(pathList,meta.getTsInterval().getTimeSeries(), meta.getTsInterval().getSchemaPrefix()), tagFilter)));
dummyFragments.forEach(meta -> {
String schemaPrefix = meta.getTsInterval().getSchemaPrefix();
joinList.add(new AddSchemaPrefix(new OperatorSource(new Project(new FragmentSource(meta),
pathMatchPrefix(pathList, meta.getTsInterval().getTimeSeries(), schemaPrefix), tagFilter)), schemaPrefix));
});
joinList.add(operator);
operator = OperatorUtils.joinOperatorsByTime(joinList);
}
Expand All @@ -346,13 +349,30 @@ private Pair<Map<TimeInterval, List<FragmentMeta>>, List<FragmentMeta>> getFragm
return keyFromTSIntervalToTimeInterval(fragmentsByTSInterval);
}

// Select the paths that match the dataPrefix and strip the schemaPrefix from them
private List<String> pathMatchPrefix(List<String> pathList, String prefix, String schemaPrefix) {
if (prefix == null) return pathList;
if (schemaPrefix != null) prefix = schemaPrefix + "." + prefix; // deal with the schemaPrefix
if (prefix == null && schemaPrefix == null) return pathList;
List<String> ans = new ArrayList<>();

if (prefix == null) { // deal with the schemaPrefix
for(String path : pathList) {
if (path.equals("*.*") || path.equals("*")) {
ans.add(path);
} else if (path.indexOf(schemaPrefix) == 0) {
path = path.substring(schemaPrefix.length() + 1);
ans.add(path);
}
}
return ans;
}
// if (schemaPrefix != null) prefix = schemaPrefix + "." + prefix;

for(String path : pathList) {
if (path.equals("*.*")) {
ans.add(path);
if (schemaPrefix != null && path.indexOf(schemaPrefix) == 0) {
path = path.substring(schemaPrefix.length() + 1);
}
if (path.equals("*.*") || path.equals("*")) {
ans.add(prefix + ".*");
} else if (path.charAt(path.length()-1) == '*' && path.length() != 1) { // 通配符匹配,例如 a.b.*
String queryPrefix = path.substring(0,path.length()-2) + ".(.*)";
if (prefix.matches(queryPrefix)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public RowStream executeUnaryOperator(UnaryOperator operator, RowStream stream)
return executeRename((Rename) operator, transformToTable(stream));
case Reorder:
return executeReorder((Reorder) operator, transformToTable(stream));
case AddSchemaPrefix:
return executeAddSchemaPrefix((AddSchemaPrefix) operator, transformToTable(stream));
default:
throw new UnexpectedOperatorException("unknown unary operator: " + operator.getType());
}
Expand Down Expand Up @@ -331,6 +333,32 @@ private RowStream executeRename(Rename rename, Table table) throws PhysicalExcep
return new Table(newHeader, rows);
}

/**
 * Rebuilds the table so that every field name carries the operator's schema prefix.
 * A null prefix leaves the field names untouched. Row keys and values are reused.
 */
private RowStream executeAddSchemaPrefix(AddSchemaPrefix addSchemaPrefix, Table table) throws PhysicalException {
    String prefix = addSchemaPrefix.getSchemaPrefix();
    Header oldHeader = table.getHeader();

    // Build the prefixed field list; only the names change, types and tags are kept.
    List<Field> prefixedFields = new ArrayList<>();
    for (Field field : oldHeader.getFields()) {
        String name = (prefix == null) ? field.getName() : prefix + "." + field.getName();
        prefixedFields.add(new Field(name, field.getType(), field.getTags()));
    }

    Header newHeader = new Header(oldHeader.getKey(), prefixedFields);

    // Re-wrap each row under the new header, preserving the key when the header has one.
    List<Row> newRows = new ArrayList<>();
    for (Row row : table.getRows()) {
        newRows.add(newHeader.hasKey()
                ? new Row(newHeader, row.getKey(), row.getValues())
                : new Row(newHeader, row.getValues()));
    }

    return new Table(newHeader, newRows);
}

private RowStream executeReorder(Reorder reorder, Table table) throws PhysicalException {
List<String> patterns = reorder.getPatterns();
Header header = table.getHeader();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream;

import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.AddSchemaPrefix;
import cn.edu.tsinghua.iginx.engine.shared.operator.Rename;
import cn.edu.tsinghua.iginx.utils.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Lazy row stream that prepends a fixed schema prefix to the name of every field
 * coming from the wrapped stream. A null prefix makes this a pass-through.
 */
public class AddSchemaPrefixLazyStream extends UnaryLazyStream {

    // Operator carrying the (possibly null) prefix to prepend to field names.
    private final AddSchemaPrefix addSchemaPrefix;

    // Lazily-built prefixed header; computed on first access and reused afterwards.
    private Header header;

    public AddSchemaPrefixLazyStream(AddSchemaPrefix addSchemaPrefix, RowStream stream) {
        super(stream);
        this.addSchemaPrefix = addSchemaPrefix;
    }

    /**
     * Returns this stream's header with the schema prefix prepended to each field
     * name (names are left untouched when the prefix is null).
     */
    @Override
    public Header getHeader() throws PhysicalException {
        if (header == null) {
            Header sourceHeader = stream.getHeader();
            String schemaPrefix = addSchemaPrefix.getSchemaPrefix();

            List<Field> fields = new ArrayList<>();
            sourceHeader.getFields().forEach(field -> {
                if (schemaPrefix != null) {
                    fields.add(new Field(schemaPrefix + "." + field.getName(), field.getType(), field.getTags()));
                } else {
                    fields.add(new Field(field.getName(), field.getType(), field.getTags()));
                }
            });

            this.header = new Header(sourceHeader.getKey(), fields);
        }
        return header;
    }

    @Override
    public boolean hasNext() throws PhysicalException {
        return stream.hasNext();
    }

    @Override
    public Row next() throws PhysicalException {
        if (!hasNext()) {
            throw new IllegalStateException("row stream doesn't have more data!");
        }

        // BUGFIX: go through getHeader() instead of reading the field directly —
        // if the consumer calls next() before getHeader(), `header` is still null
        // and the original code threw a NullPointerException here.
        Header currentHeader = getHeader();
        Row row = stream.next();
        if (currentHeader.hasKey()) {
            return new Row(currentHeader, row.getKey(), row.getValues());
        } else {
            return new Row(currentHeader, row.getValues());
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public RowStream executeUnaryOperator(UnaryOperator operator, RowStream stream)
return executeRename((Rename) operator, stream);
case Reorder:
return executeReorder((Reorder) operator, stream);
case AddSchemaPrefix:
return executeAddSchemaPrefix((AddSchemaPrefix) operator, stream);
default:
throw new UnexpectedOperatorException("unknown unary operator: " + operator.getType());
}
Expand Down Expand Up @@ -128,6 +130,10 @@ private RowStream executeReorder(Reorder reorder, RowStream stream) {
return new ReorderLazyStream(reorder, stream);
}

/** Wraps the stream so field names are prefixed lazily, as rows are pulled. */
private RowStream executeAddSchemaPrefix(AddSchemaPrefix addSchemaPrefix, RowStream stream) {
    AddSchemaPrefixLazyStream prefixedStream = new AddSchemaPrefixLazyStream(addSchemaPrefix, stream);
    return prefixedStream;
}

private RowStream executeJoin(Join join, RowStream streamA, RowStream streamB) throws PhysicalException {
if (!join.getJoinBy().equals(Constants.KEY) && !join.getJoinBy().equals(Constants.ORDINAL)) {
throw new InvalidOperatorParameterException("join operator is not support for field " + join.getJoinBy() + " except for " + Constants.KEY
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package cn.edu.tsinghua.iginx.engine.shared.operator;

import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.engine.shared.source.Source;

import java.util.ArrayList;
import java.util.List;

/**
 * Unary operator that prepends a fixed schema prefix to the name of every field
 * produced by its child operator.
 */
public class AddSchemaPrefix extends AbstractUnaryOperator {

    // Prefix to prepend to field names; may be null, in which case names are kept as-is.
    private final String schemaPrefix;

    public AddSchemaPrefix(Source source, String schemaPrefix) {
        super(OperatorType.AddSchemaPrefix, source);
        this.schemaPrefix = schemaPrefix;
    }

    public String getSchemaPrefix() {
        return schemaPrefix;
    }

    /** Copies the operator: the source is deep-copied, the immutable prefix is shared. */
    @Override
    public Operator copy() {
        return new AddSchemaPrefix(getSource().copy(), schemaPrefix);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum OperatorType {
Rename,

Reorder,
AddSchemaPrefix,

Delete,
Insert,
Expand All @@ -55,7 +56,7 @@ public static boolean isBinaryOperator(OperatorType op) {
}

/** Returns true when the operator consumes exactly one upstream operator. */
public static boolean isUnaryOperator(OperatorType op) {
    switch (op) {
        case Project:
        case Select:
        case Sort:
        case Limit:
        case Downsample:
        case RowTransform:
        case SetTransform:
        case MappingTransform:
        case Delete:
        case Insert:
        case Rename:
        case Reorder:
        case AddSchemaPrefix:
            return true;
        default:
            return false;
    }
}

public static boolean isMultipleOperator(OperatorType op) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,11 @@ public TaskExecuteResult execute(StoragePhysicalTask task) {
return new TaskExecuteResult(new NonExecutablePhysicalTaskException("unsupported physical task"));
}

private String getRealPathWithoutPrefix(String oriPath, String prefix) {
if (prefix != null && !prefix.isEmpty() && oriPath.contains(prefix)) {
return oriPath.substring(oriPath.indexOf(prefix) + prefix.length() + 1);
}
return oriPath;
}

private TaskExecuteResult executeHistoryProjectTask(TimeSeriesRange timeSeriesInterval, TimeInterval timeInterval, Project project) {
Map<String, String> bucketQueries = new HashMap<>();
TagFilter tagFilter = project.getTagFilter();
for (String pattern: project.getPatterns()) {
Pair<String, String> pair = SchemaTransformer.processPatternForQuery(getRealPathWithoutPrefix(pattern, timeSeriesInterval.getSchemaPrefix()), tagFilter);
Pair<String, String> pair = SchemaTransformer.processPatternForQuery(pattern, tagFilter);
String bucketName = pair.k;
String query = pair.v;
String fullQuery = "";
Expand Down Expand Up @@ -276,7 +269,7 @@ private TaskExecuteResult executeHistoryProjectTask(TimeSeriesRange timeSeriesIn
bucketQueryResults.put(bucket, client.getQueryApi().query(statement, organization.getId()));
}

InfluxDBHistoryQueryRowStream rowStream = new InfluxDBHistoryQueryRowStream(bucketQueryResults, project.getPatterns(), timeSeriesInterval.getSchemaPrefix());
InfluxDBHistoryQueryRowStream rowStream = new InfluxDBHistoryQueryRowStream(bucketQueryResults, project.getPatterns());
return new TaskExecuteResult(rowStream);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,8 @@ public class InfluxDBHistoryQueryRowStream implements RowStream {
private int hasMoreRecords;

private int size;
public InfluxDBHistoryQueryRowStream(Map<String, List<FluxTable>> bucketQueryResults, List<String> patterns) {
this(bucketQueryResults, patterns, null);
}

public InfluxDBHistoryQueryRowStream(Map<String, List<FluxTable>> bucketQueryResults, List<String> patterns, String prefix) {
public InfluxDBHistoryQueryRowStream(Map<String, List<FluxTable>> bucketQueryResults, List<String> patterns) {
this.bucketQueryResults = new ArrayList<>(bucketQueryResults.entrySet());
this.indexList = new ArrayList<>();
List<Field> fields = new ArrayList<>();
Expand All @@ -58,7 +55,7 @@ public InfluxDBHistoryQueryRowStream(Map<String, List<FluxTable>> bucketQueryRes
List<FluxTable> tables = this.bucketQueryResults.get(i).getValue();
this.indexList.add(new int[tables.size()]);
for (FluxTable table: tables) {
fields.add(SchemaTransformer.toField(bucket, table, prefix));
fields.add(SchemaTransformer.toField(bucket, table));
this.hasMoreRecords++;
this.size++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
public class SchemaTransformer {

public static Field toField(String bucket, FluxTable table) {
return toField(bucket, table, null);
}

public static Field toField(String bucket, FluxTable table, String prefix) {
FluxRecord record = table.getRecords().get(0);
String measurement = record.getMeasurement();
String field = record.getField();
Expand All @@ -36,10 +32,6 @@ public static Field toField(String bucket, FluxTable table, String prefix) {
DataType dataType = fromInfluxDB(table.getColumns().stream().filter(x -> x.getLabel().equals("_value")).collect(Collectors.toList()).get(0).getDataType());

StringBuilder pathBuilder = new StringBuilder();
if (prefix != null) {
pathBuilder.append(prefix);
pathBuilder.append('.');
}
pathBuilder.append(bucket);
pathBuilder.append('.');
pathBuilder.append(measurement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,12 @@ private TaskExecuteResult executeQueryHistoryTask(TimeSeriesRange timeSeriesInte
try {
StringBuilder builder = new StringBuilder();
for (String path : project.getPatterns()) {
builder.append(getRealPathWithoutPrefix(path, timeSeriesInterval.getSchemaPrefix()));
builder.append(path);
builder.append(',');
}
String statement = String.format(QUERY_HISTORY_DATA, builder.deleteCharAt(builder.length() - 1).toString(), FilterTransformer.toString(filter));
logger.info("[Query] execute query: " + statement);
RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project, timeSeriesInterval.getSchemaPrefix()));
RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project));
return new TaskExecuteResult(rowStream);
} catch (IoTDBConnectionException | StatementExecutionException e) {
logger.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ enum State {
private State state;

public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project) {
this(dataset, trimStorageUnit, project, null);
}

public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project, String prefix) {
this.dataset = dataset;
this.trimStorageUnit = trimStorageUnit;
this.filterByTags = project.getTagFilter() != null;
Expand All @@ -91,7 +87,7 @@ public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUni
}
name = transformColumnName(name);
Pair<String, Map<String, String>> pair = TagKVUtils.splitFullName(name);
Field field = new Field(prefix==null? pair.getK() : prefix + "." + pair.getK(), DataTypeTransformer.fromIoTDB(type), pair.getV());
Field field = new Field(pair.getK(), DataTypeTransformer.fromIoTDB(type), pair.getV());
if (!this.trimStorageUnit && field.getFullName().startsWith(UNIT)) {
filterList.add(true);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,12 @@ private TaskExecuteResult executeQueryHistoryTask(TimeSeriesRange timeSeriesInte
try {
StringBuilder builder = new StringBuilder();
for (String path : project.getPatterns()) {
builder.append(getRealPathWithoutPrefix(path, timeSeriesInterval.getSchemaPrefix()));
builder.append(path);
builder.append(',');
}
String statement = String.format(QUERY_HISTORY_DATA, builder.deleteCharAt(builder.length() - 1).toString(), FilterTransformer.toString(filter));
logger.info("[Query] execute query: " + statement);
RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project, timeSeriesInterval.getSchemaPrefix()));
RowStream rowStream = new ClearEmptyRowStreamWrapper(new IoTDBQueryRowStream(sessionPool.executeQueryStatement(statement), false, project));
return new TaskExecuteResult(rowStream);
} catch (IoTDBConnectionException | StatementExecutionException e) {
logger.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ enum State {
private State state;

public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project) {
this(dataset, trimStorageUnit, project, null);
}

public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUnit, Project project, String prefix) {
this.dataset = dataset;
this.trimStorageUnit = trimStorageUnit;
this.filterByTags = project.getTagFilter() != null;
Expand All @@ -90,7 +86,7 @@ public IoTDBQueryRowStream(SessionDataSetWrapper dataset, boolean trimStorageUni
}
name = transformColumnName(name);
Pair<String, Map<String, String>> pair = TagKVUtils.splitFullName(name);
Field field = new Field(prefix==null? pair.getK() : prefix + "." + pair.getK(), DataTypeTransformer.strFromIoTDB(type), pair.getV());
Field field = new Field(pair.getK(), DataTypeTransformer.strFromIoTDB(type), pair.getV());
if (!this.trimStorageUnit && field.getFullName().startsWith(UNIT)) {
filterList.add(true);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ public TaskExecuteResult execute(StoragePhysicalTask task) {
project.getTagFilter(),
FilterTransformer.toString(filter),
storageUnit,
isDummyStorageUnit,
task.getTargetFragment().getTsInterval().getSchemaPrefix());
isDummyStorageUnit);
} else if (op.getType() == OperatorType.Insert) {
Insert insert = (Insert) op;
return executor.executeInsertTask(
Expand Down
Loading