测试例子:
SELECT e.NAME, d.DEPT_NAME,d.DEPT_ID,EMP_ID,100+EMP_ID+100 FROM EMP e JOIN DEPT d ON e.DEPT_ID = d.DEPT_ID WHERE e.EMP_ID IN (SELECT EMP_ID FROM EMP WHERE DEPT_ID = 10)
代码示例:
package com.test;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Set;
/**
 * Demonstrates column-level lineage with Apache Calcite.
 *
 * <p>The example registers two in-memory table definitions ({@code EMP} and {@code DEPT}),
 * parses and validates a SQL query against them, converts it to a relational expression,
 * and then asks Calcite's metadata layer which table each output column originates from.
 */
public class SqlLineageExample {

    /**
     * Builds the schema, plans the sample query and prints the lineage of every output column.
     *
     * @param args unused
     * @throws SQLException           if the Calcite JDBC connection cannot be opened, unwrapped or closed
     * @throws ValidationException    if the query fails validation against the schema
     * @throws RelConversionException if the validated query cannot be converted to relational algebra
     * @throws SqlParseException      if the query text cannot be parsed
     */
    public static void main(String[] args) throws SQLException, ValidationException, RelConversionException, SqlParseException {
        Properties info = new Properties();
        // NOTE(review): this property configures the JDBC connection's own parser. The Planner
        // below is built from a separate FrameworkConfig and does not appear to read it — confirm.
        info.setProperty("lex", "JAVA");

        // Both the connection and the planner are AutoCloseable; try-with-resources releases
        // them (planner first, then connection) even when parsing or validation throws.
        try (CalciteConnection connection =
                 DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection.class)) {
            SchemaPlus rootSchema = connection.getRootSchema();

            // Register the table definitions the query refers to. Only row types are needed:
            // the query is planned, never executed.
            rootSchema.add("EMP", new AbstractTable() {
                @Override
                public RelDataType getRowType(RelDataTypeFactory typeFactory) {
                    return typeFactory.builder()
                        .add("EMP_ID", SqlTypeName.INTEGER)
                        .add("NAME", SqlTypeName.VARCHAR, 20)
                        .add("DEPT_ID", SqlTypeName.INTEGER)
                        .build();
                }
            });
            rootSchema.add("DEPT", new AbstractTable() {
                @Override
                public RelDataType getRowType(RelDataTypeFactory typeFactory) {
                    return typeFactory.builder()
                        .add("DEPT_ID", SqlTypeName.INTEGER)
                        .add("DEPT_NAME", SqlTypeName.VARCHAR, 20)
                        .build();
                }
            });

            FrameworkConfig config = Frameworks.newConfigBuilder()
                .defaultSchema(rootSchema)
                .build();

            // Query under analysis: a join, an IN sub-query and a computed column.
            String sql = "SELECT e.NAME, d.DEPT_NAME,d.DEPT_ID,EMP_ID,100+EMP_ID+100 FROM EMP e JOIN DEPT d ON e.DEPT_ID = d.DEPT_ID " +
                "WHERE e.EMP_ID IN (SELECT EMP_ID FROM EMP WHERE DEPT_ID = 10)";

            try (Planner planner = Frameworks.getPlanner(config)) {
                // parse -> validate -> convert to a relational algebra tree
                SqlNode parsedNode = planner.parse(sql);
                SqlNode validatedNode = planner.validate(parsedNode);
                RelRoot relRoot = planner.rel(validatedNode);
                RelNode relNode = relRoot.project();

                // Lineage is resolved while the planner is still open.
                printFieldLineage(relNode);
            }
        }
    }

    /**
     * Prints, for every output field of {@code relNode}, the table(s) it is derived from.
     *
     * <p>One line is printed per origin. A field whose origin cannot be determined
     * ({@code null} origin set) is reported as unknown; a field with an empty origin set
     * is reported as having no source table instead of being silently skipped.
     *
     * @param relNode root of the relational expression whose output columns are traced
     */
    private static void printFieldLineage(RelNode relNode) {
        RelMetadataQuery mq = relNode.getCluster().getMetadataQuery();
        for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
            String fieldName = field.getName();
            Set<RelColumnOrigin> origins = mq.getColumnOrigins(relNode, field.getIndex());

            if (origins == null) {
                System.out.println("Field: " + fieldName + " origin unknown");
                continue;
            }
            if (origins.isEmpty()) {
                // No underlying table column contributes to this field.
                System.out.println("Field: " + fieldName + " has no source table");
                continue;
            }
            for (RelColumnOrigin origin : origins) {
                // Use the full qualified path: element 0 alone would be the schema name,
                // not the table, for tables registered in a sub-schema.
                String tableName = String.join(".", origin.getOriginTable().getQualifiedName());
                System.out.println("Field: " + fieldName + " comes from Table: " + tableName);
            }
        }
    }
}
结果输出:
打印结果如下:
Field: NAME comes from Table: EMP
Field: DEPT_NAME comes from Table: DEPT
Field: DEPT_ID comes from Table: DEPT
Field: EMP_ID comes from Table: EMP
Field: EXPR$4 comes from Table: EMP
依赖 JAR包
<dependencies>
    <dependency>
        <groupId>org.apache.calcite</groupId>
        <artifactId>calcite-core</artifactId>
        <version>1.34.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.36</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.36</version>
    </dependency>
</dependencies>