{
- private static final long serialVersionUID = -7994311331389155692L;
-
- private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
-
- private String address;
- private String tableName;
- private String userName;
- private String password;
- private String database;
- private Integer maxRequestsPerConnection;
- private Integer coreConnectionsPerHost;
- private Integer maxConnectionsPerHost;
- private Integer maxQueueSize;
- private Integer readTimeoutMillis;
- private Integer connectTimeoutMillis;
- private Integer poolTimeoutMillis;
-
- protected String[] fieldNames;
- TypeInformation>[] fieldTypes;
-
- private int batchInterval = 5000;
-
- private Cluster cluster;
- private Session session = null;
-
- private int batchCount = 0;
-
- private transient Counter outRecords;
-
- private transient Meter outRecordsRate;
-
- public CassandraOutputFormat() {
- }
-
- @Override
- public void configure(Configuration parameters) {
- }
-
- /**
- * Connects to the target database and initializes the prepared statement.
- *
- * @param taskNumber The number of the parallel instance.
- * @throws IOException Thrown, if the output could not be opened due to an
- * I/O problem.
- */
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- try {
- if (session == null) {
- QueryOptions queryOptions = new QueryOptions();
- //The default consistency level for queries: ConsistencyLevel.TWO.
- queryOptions.setConsistencyLevel(ConsistencyLevel.QUORUM);
- Integer maxRequestsPerConnection = this.maxRequestsPerConnection == null ? 1 : this.maxRequestsPerConnection;
- Integer coreConnectionsPerHost = this.coreConnectionsPerHost == null ? 8 : this.coreConnectionsPerHost;
- Integer maxConnectionsPerHost = this.maxConnectionsPerHost == null ? 32768 : this.maxConnectionsPerHost;
- Integer maxQueueSize = this.maxQueueSize == null ? 100000 : this.maxQueueSize;
- Integer readTimeoutMillis = this.readTimeoutMillis == null ? 60000 : this.readTimeoutMillis;
- Integer connectTimeoutMillis = this.connectTimeoutMillis == null ? 60000 : this.connectTimeoutMillis;
- Integer poolTimeoutMillis = this.poolTimeoutMillis == null ? 60000 : this.poolTimeoutMillis;
- Integer cassandraPort = 0;
-
- ArrayList serversList = new ArrayList();
- //Read timeout or connection timeout Settings
- SocketOptions so = new SocketOptions()
- .setReadTimeoutMillis(readTimeoutMillis)
- .setConnectTimeoutMillis(connectTimeoutMillis);
-
- //The cluster USES hostdistance.local in the same machine room
- //Hostdistance. REMOTE is used for different machine rooms
- //Ignore use HostDistance. IGNORED
- PoolingOptions poolingOptions = new PoolingOptions()
- //Each connection allows a maximum of 64 concurrent requests
- .setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection)
- //Have at least two connections to each machine in the cluster
- .setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnectionsPerHost)
- //There are up to eight connections to each machine in the cluster
- .setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnectionsPerHost)
- .setMaxQueueSize(maxQueueSize)
- .setPoolTimeoutMillis(poolTimeoutMillis);
- //重试策略
- RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
-
- for (String server : address.split(",")) {
- cassandraPort = Integer.parseInt(server.split(":")[1]);
- serversList.add(InetAddress.getByName(server.split(":")[0]));
- }
-
- if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
- cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy)
- .withPort(cassandraPort)
- .withPoolingOptions(poolingOptions).withSocketOptions(so)
- .withQueryOptions(queryOptions).build();
- } else {
- cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy)
- .withPort(cassandraPort)
- .withPoolingOptions(poolingOptions).withSocketOptions(so)
- .withCredentials(userName, password)
- .withQueryOptions(queryOptions).build();
- }
- // 建立连接 连接已存在的键空间
- session = cluster.connect(database);
- LOG.info("connect cassandra is successed!");
- initMetric();
- }
- } catch (Exception e) {
- LOG.error("connect cassandra is error:" + e.getMessage());
- }
- }
-
- private void initMetric() {
- outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
- outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
- }
-
- /**
- * Adds a record to the prepared statement.
- *
- * When this method is called, the output format is guaranteed to be opened.
- *
- *
- * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
- * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
- *
- * @param tuple2 The records to add to the output.
- * @throws IOException Thrown, if the records could not be added due to an I/O problem.
- * @see PreparedStatement
- */
- @Override
- public void writeRecord(Tuple2 tuple2) throws IOException {
- Tuple2 tupleTrans = tuple2;
- Boolean retract = tupleTrans.getField(0);
- Row row = tupleTrans.getField(1);
- try {
- if (retract) {
- insertWrite(row);
- outRecords.inc();
- } else {
- //do nothing
- }
- } catch (Exception e) {
- throw new IllegalArgumentException("writeRecord() failed", e);
- }
- }
-
- private void insertWrite(Row row) {
- try {
- String cql = buildSql(row);
- if (cql != null) {
- ResultSet resultSet = session.execute(cql);
- resultSet.wasApplied();
- }
- } catch (Exception e) {
- LOG.error("[upsert] is error:" + e.getMessage());
- }
- }
-
- private String buildSql(Row row) {
- StringBuffer fields = new StringBuffer();
- StringBuffer values = new StringBuffer();
- for (int index = 0; index < row.getArity(); index++) {
- if (row.getField(index) == null) {
- } else {
- fields.append(fieldNames[index] + ",");
- values.append("'" + row.getField(index) + "'" + ",");
- }
- }
- fields.deleteCharAt(fields.length() - 1);
- values.deleteCharAt(values.length() - 1);
- String cql = "INSERT INTO " + database + "." + tableName + " (" + fields.toString() + ") "
- + " VALUES (" + values.toString() + ")";
- return cql;
- }
-
- /**
- * Executes prepared statement and closes all resources of this instance.
- *
- * @throws IOException Thrown, if the input could not be closed properly.
- */
- @Override
- public void close() throws IOException {
- try {
- if (session != null) {
- session.close();
- }
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
- try {
- if (cluster != null) {
- cluster.close();
- }
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
- }
- LOG.info("close cassandra is successed!");
- }
-
- public static CassandraFormatBuilder buildOutputFormat() {
- return new CassandraFormatBuilder();
- }
-
- public static class CassandraFormatBuilder {
- private final CassandraOutputFormat format;
-
- protected CassandraFormatBuilder() {
- this.format = new CassandraOutputFormat();
- }
-
- public CassandraFormatBuilder setUsername(String username) {
- format.userName = username;
- return this;
- }
-
- public CassandraFormatBuilder setPassword(String password) {
- format.password = password;
- return this;
- }
-
- public CassandraFormatBuilder setAddress(String address) {
- format.address = address;
- return this;
- }
-
- public CassandraFormatBuilder setTableName(String tableName) {
- format.tableName = tableName;
- return this;
- }
-
- public CassandraFormatBuilder setDatabase(String database) {
- format.database = database;
- return this;
- }
-
- public CassandraFormatBuilder setFieldNames(String[] fieldNames) {
- format.fieldNames = fieldNames;
- return this;
- }
-
- public CassandraFormatBuilder setFieldTypes(TypeInformation>[] fieldTypes) {
- format.fieldTypes = fieldTypes;
- return this;
- }
-
- public CassandraFormatBuilder setMaxRequestsPerConnection(Integer maxRequestsPerConnection) {
- format.maxRequestsPerConnection = maxRequestsPerConnection;
- return this;
- }
-
- public CassandraFormatBuilder setCoreConnectionsPerHost(Integer coreConnectionsPerHost) {
- format.coreConnectionsPerHost = coreConnectionsPerHost;
- return this;
- }
-
- public CassandraFormatBuilder setMaxConnectionsPerHost(Integer maxConnectionsPerHost) {
- format.maxConnectionsPerHost = maxConnectionsPerHost;
- return this;
- }
-
- public CassandraFormatBuilder setMaxQueueSize(Integer maxQueueSize) {
- format.maxQueueSize = maxQueueSize;
- return this;
- }
-
- public CassandraFormatBuilder setReadTimeoutMillis(Integer readTimeoutMillis) {
- format.readTimeoutMillis = readTimeoutMillis;
- return this;
- }
-
- public CassandraFormatBuilder setConnectTimeoutMillis(Integer connectTimeoutMillis) {
- format.connectTimeoutMillis = connectTimeoutMillis;
- return this;
- }
-
- public CassandraFormatBuilder setPoolTimeoutMillis(Integer poolTimeoutMillis) {
- format.poolTimeoutMillis = poolTimeoutMillis;
- return this;
- }
-
- /**
- * Finalizes the configuration and checks validity.
- *
- * @return Configured RetractJDBCOutputFormat
- */
- public CassandraOutputFormat finish() {
- if (format.userName == null) {
- LOG.info("Username was not supplied separately.");
- }
- if (format.password == null) {
- LOG.info("Password was not supplied separately.");
- }
- if (format.address == null) {
- throw new IllegalArgumentException("No address URL supplied.");
- }
- if (format.database == null) {
- throw new IllegalArgumentException("No dababase suplied");
- }
- if (format.tableName == null) {
- throw new IllegalArgumentException("No tableName supplied");
- }
- return format;
- }
- }
-}
diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java
deleted file mode 100644
index eb7b23b53..000000000
--- a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.sink.cassandra;
-
-
-import com.dtstack.flink.sql.sink.IStreamSinkGener;
-import com.dtstack.flink.sql.sink.cassandra.table.CassandraTableInfo;
-import com.dtstack.flink.sql.table.TargetTableInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-/**
- * Reason:
- * Date: 2018/11/22
- *
- * @author xuqianjin
- */
-public class CassandraSink implements RetractStreamTableSink, IStreamSinkGener {
-
-
- protected String[] fieldNames;
- TypeInformation>[] fieldTypes;
- protected String address;
- protected String tableName;
- protected String userName;
- protected String password;
- protected String database;
- protected Integer maxRequestsPerConnection;
- protected Integer coreConnectionsPerHost;
- protected Integer maxConnectionsPerHost;
- protected Integer maxQueueSize;
- protected Integer readTimeoutMillis;
- protected Integer connectTimeoutMillis;
- protected Integer poolTimeoutMillis;
-
- public CassandraSink() {
- // TO DO NOTHING
- }
-
- @Override
- public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
- CassandraTableInfo cassandraTableInfo = (CassandraTableInfo) targetTableInfo;
- this.address = cassandraTableInfo.getAddress();
- this.tableName = cassandraTableInfo.getTableName();
- this.userName = cassandraTableInfo.getUserName();
- this.password = cassandraTableInfo.getPassword();
- this.database = cassandraTableInfo.getDatabase();
- this.maxRequestsPerConnection = cassandraTableInfo.getMaxRequestsPerConnection();
- this.coreConnectionsPerHost = cassandraTableInfo.getCoreConnectionsPerHost();
- this.maxConnectionsPerHost = cassandraTableInfo.getMaxConnectionsPerHost();
- this.maxQueueSize = cassandraTableInfo.getMaxQueueSize();
- this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis();
- this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis();
- this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis();
- return this;
- }
-
- @Override
- public void emitDataStream(DataStream> dataStream) {
- CassandraOutputFormat.CassandraFormatBuilder builder = CassandraOutputFormat.buildOutputFormat();
- builder.setAddress(this.address)
- .setDatabase(this.database)
- .setTableName(this.tableName)
- .setPassword(this.password)
- .setUsername(this.userName)
- .setMaxRequestsPerConnection(this.maxRequestsPerConnection)
- .setCoreConnectionsPerHost(this.coreConnectionsPerHost)
- .setMaxConnectionsPerHost(this.maxConnectionsPerHost)
- .setMaxQueueSize(this.maxQueueSize)
- .setReadTimeoutMillis(this.readTimeoutMillis)
- .setConnectTimeoutMillis(this.connectTimeoutMillis)
- .setPoolTimeoutMillis(this.poolTimeoutMillis)
- .setFieldNames(this.fieldNames)
- .setFieldTypes(this.fieldTypes);
-
- CassandraOutputFormat outputFormat = builder.finish();
- RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
- dataStream.addSink(richSinkFunction);
- }
-
- @Override
- public TableSink> configure(String[] fieldNames, TypeInformation>[] fieldTypes) {
- this.fieldNames = fieldNames;
- this.fieldTypes = fieldTypes;
- return this;
- }
-
- @Override
- public TupleTypeInfo> getOutputType() {
- return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), getRecordType());
- }
-
- @Override
- public TypeInformation getRecordType() {
- return new RowTypeInfo(fieldTypes, fieldNames);
- }
-
- @Override
- public String[] getFieldNames() {
- return fieldNames;
- }
-
- @Override
- public TypeInformation>[] getFieldTypes() {
- return fieldTypes;
- }
-
-}
diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java
deleted file mode 100644
index 4c68e71ae..000000000
--- a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.sink.cassandra.table;
-
-import com.dtstack.flink.sql.table.AbsTableParser;
-import com.dtstack.flink.sql.table.TableInfo;
-import com.dtstack.flink.sql.util.MathUtil;
-
-import java.util.Map;
-
-import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
-
-/**
- * Reason:
- * Date: 2018/11/22
- *
- * @author xuqianjin
- */
-public class CassandraSinkParser extends AbsTableParser {
-
- public static final String ADDRESS_KEY = "address";
-
- public static final String TABLE_NAME_KEY = "tableName";
-
- public static final String USER_NAME_KEY = "userName";
-
- public static final String PASSWORD_KEY = "password";
-
- public static final String DATABASE_KEY = "database";
-
- public static final String MAX_REQUEST_PER_CONNECTION_KEY = "maxRequestsPerConnection";
-
- public static final String CORE_CONNECTIONS_PER_HOST_KEY = "coreConnectionsPerHost";
-
- public static final String MAX_CONNECTIONS_PER_HOST_KEY = "maxConnectionsPerHost";
-
- public static final String MAX_QUEUE_SIZE_KEY = "maxQueueSize";
-
- public static final String READ_TIMEOUT_MILLIS_KEY = "readTimeoutMillis";
-
- public static final String CONNECT_TIMEOUT_MILLIS_KEY = "connectTimeoutMillis";
-
- public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";
-
- @Override
- public TableInfo getTableInfo(String tableName, String fieldsInfo, Map props) {
- CassandraTableInfo cassandraTableInfo = new CassandraTableInfo();
- cassandraTableInfo.setName(tableName);
- parseFieldsInfo(fieldsInfo, cassandraTableInfo);
-
- cassandraTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(PARALLELISM_KEY.toLowerCase())));
- cassandraTableInfo.setAddress(MathUtil.getString(props.get(ADDRESS_KEY.toLowerCase())));
- cassandraTableInfo.setTableName(MathUtil.getString(props.get(TABLE_NAME_KEY.toLowerCase())));
- cassandraTableInfo.setDatabase(MathUtil.getString(props.get(DATABASE_KEY.toLowerCase())));
- cassandraTableInfo.setUserName(MathUtil.getString(props.get(USER_NAME_KEY.toLowerCase())));
- cassandraTableInfo.setPassword(MathUtil.getString(props.get(PASSWORD_KEY.toLowerCase())));
- cassandraTableInfo.setMaxRequestsPerConnection(MathUtil.getIntegerVal(props.get(MAX_REQUEST_PER_CONNECTION_KEY.toLowerCase())));
- cassandraTableInfo.setCoreConnectionsPerHost(MathUtil.getIntegerVal(props.get(CORE_CONNECTIONS_PER_HOST_KEY.toLowerCase())));
- cassandraTableInfo.setMaxConnectionsPerHost(MathUtil.getIntegerVal(props.get(MAX_CONNECTIONS_PER_HOST_KEY.toLowerCase())));
- cassandraTableInfo.setMaxQueueSize(MathUtil.getIntegerVal(props.get(MAX_QUEUE_SIZE_KEY.toLowerCase())));
- cassandraTableInfo.setReadTimeoutMillis(MathUtil.getIntegerVal(props.get(READ_TIMEOUT_MILLIS_KEY.toLowerCase())));
- cassandraTableInfo.setConnectTimeoutMillis(MathUtil.getIntegerVal(props.get(CONNECT_TIMEOUT_MILLIS_KEY.toLowerCase())));
- cassandraTableInfo.setPoolTimeoutMillis(MathUtil.getIntegerVal(props.get(POOL_TIMEOUT_MILLIS_KEY.toLowerCase())));
-
- return cassandraTableInfo;
- }
-}
diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java
deleted file mode 100644
index c6626c42a..000000000
--- a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.sink.cassandra.table;
-
-import com.dtstack.flink.sql.table.TargetTableInfo;
-import com.google.common.base.Preconditions;
-
-/**
- * Reason:
- * Date: 2018/11/22
- *
- * @author xuqianjin
- */
-public class CassandraTableInfo extends TargetTableInfo {
-
- private static final String CURR_TYPE = "cassandra";
-
- private String address;
- private String tableName;
- private String userName;
- private String password;
- private String database;
- private Integer maxRequestsPerConnection;
- private Integer coreConnectionsPerHost;
- private Integer maxConnectionsPerHost;
- private Integer maxQueueSize;
- private Integer readTimeoutMillis;
- private Integer connectTimeoutMillis;
- private Integer poolTimeoutMillis;
-
- public CassandraTableInfo() {
- setType(CURR_TYPE);
- }
-
- public String getAddress() {
- return address;
- }
-
- public void setAddress(String address) {
- this.address = address;
- }
-
- public String getDatabase() {
- return database;
- }
-
- public void setDatabase(String database) {
- this.database = database;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public Integer getMaxRequestsPerConnection() {
- return maxRequestsPerConnection;
- }
-
- public void setMaxRequestsPerConnection(Integer maxRequestsPerConnection) {
- this.maxRequestsPerConnection = maxRequestsPerConnection;
- }
-
- public Integer getCoreConnectionsPerHost() {
- return coreConnectionsPerHost;
- }
-
- public void setCoreConnectionsPerHost(Integer coreConnectionsPerHost) {
- this.coreConnectionsPerHost = coreConnectionsPerHost;
- }
-
- public Integer getMaxConnectionsPerHost() {
- return maxConnectionsPerHost;
- }
-
- public void setMaxConnectionsPerHost(Integer maxConnectionsPerHost) {
- this.maxConnectionsPerHost = maxConnectionsPerHost;
- }
-
- public Integer getMaxQueueSize() {
- return maxQueueSize;
- }
-
- public void setMaxQueueSize(Integer maxQueueSize) {
- this.maxQueueSize = maxQueueSize;
- }
-
- public Integer getReadTimeoutMillis() {
- return readTimeoutMillis;
- }
-
- public void setReadTimeoutMillis(Integer readTimeoutMillis) {
- this.readTimeoutMillis = readTimeoutMillis;
- }
-
- public Integer getConnectTimeoutMillis() {
- return connectTimeoutMillis;
- }
-
- public void setConnectTimeoutMillis(Integer connectTimeoutMillis) {
- this.connectTimeoutMillis = connectTimeoutMillis;
- }
-
- public Integer getPoolTimeoutMillis() {
- return poolTimeoutMillis;
- }
-
- public void setPoolTimeoutMillis(Integer poolTimeoutMillis) {
- this.poolTimeoutMillis = poolTimeoutMillis;
- }
-
- @Override
- public boolean check() {
- Preconditions.checkNotNull(address, "Cassandra field of ADDRESS is required");
- Preconditions.checkNotNull(database, "Cassandra field of database is required");
- Preconditions.checkNotNull(tableName, "Cassandra field of tableName is required");
- return true;
- }
-
- @Override
- public String getType() {
- // return super.getType().toLowerCase() + TARGET_SUFFIX;
- return super.getType().toLowerCase();
- }
-}
diff --git a/cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java b/cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java
deleted file mode 100644
index 33a0233ac..000000000
--- a/cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package com.dtstack.flinkx;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest
- extends TestCase
-{
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AppTest( String testName )
- {
- super( testName );
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite()
- {
- return new TestSuite( AppTest.class );
- }
-
- /**
- * Rigourous Test :-)
- */
- public void testApp()
- {
- assertTrue( true );
- }
-}
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
deleted file mode 100644
index f49de388b..000000000
--- a/cassandra/pom.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-
-
-
- flink.sql
- com.dtstack.flink
- 1.0-SNAPSHOT
-
- 4.0.0
- sql.cassandra
- pom
-
-
- cassandra-sink
- cassandra-side
-
-
-
-
- junit
- junit
- 3.8.1
- test
-
-
- com.dtstack.flink
- sql.core
- 1.0-SNAPSHOT
- provided
-
-
- com.datastax.cassandra
- cassandra-driver-core
- 3.6.0
-
-
-
-
\ No newline at end of file
diff --git a/clickhouse/clickhouse-side/clickhouse-all-side/pom.xml b/clickhouse/clickhouse-side/clickhouse-all-side/pom.xml
deleted file mode 100644
index 8e6c6f51a..000000000
--- a/clickhouse/clickhouse-side/clickhouse-all-side/pom.xml
+++ /dev/null
@@ -1,92 +0,0 @@
-
-
-
- sql.side.clickhouse
- com.dtstack.flink
- 1.0-SNAPSHOT
-
- 4.0.0
-
- sql.side.all.clickhouse
- clickhouse-all-side
-
- jar
-
-
- 1.0-SNAPSHOT
-
-
-
-
- com.dtstack.flink
- sql.side.clickhouse.core
- ${sql.side.clickhouse.core.version}
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 1.4
-
-
- package
-
- shade
-
-
-
-
-
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
-
-
-
-
-
- maven-antrun-plugin
- 1.2
-
-
- copy-resources
-
- package
-
- run
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/clickhouse/clickhouse-side/clickhouse-all-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAllReqRow.java b/clickhouse/clickhouse-side/clickhouse-all-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAllReqRow.java
deleted file mode 100644
index 68c0c7984..000000000
--- a/clickhouse/clickhouse-side/clickhouse-all-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAllReqRow.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.side.clickhouse;
-
-import com.dtstack.flink.sql.side.FieldInfo;
-import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
-import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
-import com.dtstack.flink.sql.util.DtStringUtil;
-import com.dtstack.flink.sql.util.JDBCUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.List;
-import java.util.Map;
-
-public class ClickhouseAllReqRow extends RdbAllReqRow {
-
- private static final Logger LOG = LoggerFactory.getLogger(ClickhouseAllReqRow.class);
-
- private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
-
- public ClickhouseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) {
- super(new ClickhouseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
- }
-
- @Override
- public Connection getConn(String dbURL, String userName, String passWord) {
- try {
- Connection connection ;
- JDBCUtils.forName(CLICKHOUSE_DRIVER, getClass().getClassLoader());
- // ClickHouseProperties contains all properties
- if (userName == null) {
- connection = DriverManager.getConnection(dbURL);
- } else {
- connection = DriverManager.getConnection(dbURL, userName, passWord);
- }
- return connection;
- } catch (Exception e) {
- LOG.error("", e);
- throw new RuntimeException("", e);
- }
- }
-
-}
diff --git a/clickhouse/clickhouse-side/clickhouse-all-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAllSideInfo.java b/clickhouse/clickhouse-side/clickhouse-all-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAllSideInfo.java
deleted file mode 100644
index 973c069b9..000000000
--- a/clickhouse/clickhouse-side/clickhouse-all-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAllSideInfo.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.side.clickhouse;
-
-import com.dtstack.flink.sql.side.FieldInfo;
-import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
-import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-
-import java.util.List;
-
-
-public class ClickhouseAllSideInfo extends RdbAllSideInfo {
- public ClickhouseAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) {
- super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
- }
-}
diff --git a/clickhouse/clickhouse-side/clickhouse-async-side/pom.xml b/clickhouse/clickhouse-side/clickhouse-async-side/pom.xml
deleted file mode 100644
index ab889c5a8..000000000
--- a/clickhouse/clickhouse-side/clickhouse-async-side/pom.xml
+++ /dev/null
@@ -1,91 +0,0 @@
-
-
-
- sql.side.clickhouse
- com.dtstack.flink
- 1.0-SNAPSHOT
-
- 4.0.0
-
- sql.side.async.clickhouse
- clickhouse-async-side
-
- jar
-
-
- 1.0-SNAPSHOT
-
-
-
-
- com.dtstack.flink
- sql.side.clickhouse.core
- ${sql.side.clickhouse.core.version}
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 1.4
-
-
- package
-
- shade
-
-
-
-
-
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
-
-
-
-
-
- maven-antrun-plugin
- 1.2
-
-
- copy-resources
-
- package
-
- run
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java b/clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java
deleted file mode 100644
index 305d65118..000000000
--- a/clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.side.clickhouse;
-
-import com.dtstack.flink.sql.side.FieldInfo;
-import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
-import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
-import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
-import io.vertx.core.Vertx;
-import io.vertx.core.VertxOptions;
-import io.vertx.core.json.JsonObject;
-import io.vertx.ext.jdbc.JDBCClient;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-
-import java.util.List;
-
-
-public class ClickhouseAsyncReqRow extends RdbAsyncReqRow {
- private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
-
- public ClickhouseAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) {
- super(new ClickhouseAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- JsonObject clickhouseClientConfig = new JsonObject();
- RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
- clickhouseClientConfig.put("url", rdbSideTableInfo.getUrl())
- .put("driver_class", CLICKHOUSE_DRIVER)
- .put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
- .put("user", rdbSideTableInfo.getUserName())
- .put("password", rdbSideTableInfo.getPassword())
- .put("provider_class", DT_PROVIDER_CLASS);
- System.setProperty("vertx.disableFileCPResolving", "true");
- VertxOptions vo = new VertxOptions();
- vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
- vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
- vo.setFileResolverCachingEnabled(false);
- Vertx vertx = Vertx.vertx(vo);
- setRdbSQLClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
- }
-
-}
diff --git a/clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncSideInfo.java b/clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncSideInfo.java
deleted file mode 100644
index 254561de0..000000000
--- a/clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncSideInfo.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.side.clickhouse;
-
-import com.dtstack.flink.sql.side.FieldInfo;
-import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
-import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-
-import java.util.List;
-
-
-public class ClickhouseAsyncSideInfo extends RdbAsyncSideInfo {
-
- public ClickhouseAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) {
- super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
- }
-}
diff --git a/clickhouse/clickhouse-side/clickhouse-side-core/pom.xml b/clickhouse/clickhouse-side/clickhouse-side-core/pom.xml
deleted file mode 100644
index 29304d922..000000000
--- a/clickhouse/clickhouse-side/clickhouse-side-core/pom.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-
-
-
- sql.side.clickhouse
- com.dtstack.flink
- 1.0-SNAPSHOT
-
- 4.0.0
-
- sql.side.clickhouse.core
- 1.0-SNAPSHOT
- jar
- clickhouse-side-core
-
-
\ No newline at end of file
diff --git a/clickhouse/clickhouse-side/clickhouse-side-core/src/main/java/com/dtstack/flink/sql/side/clickhouse/table/ClickhouseSideParser.java b/clickhouse/clickhouse-side/clickhouse-side-core/src/main/java/com/dtstack/flink/sql/side/clickhouse/table/ClickhouseSideParser.java
deleted file mode 100644
index 7be387fd8..000000000
--- a/clickhouse/clickhouse-side/clickhouse-side-core/src/main/java/com/dtstack/flink/sql/side/clickhouse/table/ClickhouseSideParser.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.side.clickhouse.table;
-
-import com.dtstack.flink.sql.side.rdb.table.RdbSideParser;
-import com.dtstack.flink.sql.table.TableInfo;
-import ru.yandex.clickhouse.domain.ClickHouseDataType;
-
-import java.util.Map;
-
-/**
- * Reason:
- * Date: 2019/11/04
- * Company: www.dtstack.com
- *
- * @author maqi
- */
-
-public class ClickhouseSideParser extends RdbSideParser {
-
- private static final String CURR_TYPE = "clickhouse";
-
- @Override
- public TableInfo getTableInfo(String tableName, String fieldsInfo, Map props) {
- TableInfo clickhouseTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
- clickhouseTableInfo.setType(CURR_TYPE);
- return clickhouseTableInfo;
- }
-
- @Override
- public Class dbTypeConvertToJavaType(String fieldType) {
- return ClickHouseDataType.fromTypeString(fieldType).getJavaClass();
- }
-
-}
diff --git a/clickhouse/clickhouse-side/pom.xml b/clickhouse/clickhouse-side/pom.xml
deleted file mode 100644
index 2ba6a14dc..000000000
--- a/clickhouse/clickhouse-side/pom.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-
-
-
- sql.clickhouse
- com.dtstack.flink
- 1.0-SNAPSHOT
-
- 4.0.0
-
- sql.side.clickhouse
- 1.0-SNAPSHOT
- clickhouse-side
- pom
-
-
- clickhouse-side-core
- clickhouse-async-side
- clickhouse-all-side
-
-
-
-
- 1.0-SNAPSHOT
-
-
-
-
- com.dtstack.flink
- sql.side.rdb
- ${rdb.side.version}
-
-
-
-
\ No newline at end of file
diff --git a/clickhouse/clickhouse-sink/pom.xml b/clickhouse/clickhouse-sink/pom.xml
deleted file mode 100644
index 75aac514a..000000000
--- a/clickhouse/clickhouse-sink/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-
-
-
- sql.clickhouse
- com.dtstack.flink
- 1.0-SNAPSHOT
-
- 4.0.0
-
- sql.sink.clickhouse
- clickhouse-sink
- http://maven.apache.org
-
-
- 1.0-SNAPSHOT
-
-
-
-
- com.dtstack.flink
- sql.sink.rdb
- ${sql.sink.rdb.version}
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 1.4
-
-
- package
-
- shade
-
-
-
-
-
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
-
-
-
-
-
- maven-antrun-plugin
- 1.2
-
-
- copy-resources
-
- package
-
- run
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java b/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java
deleted file mode 100644
index 66c1f26d5..000000000
--- a/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.sink.clickhouse;
-
-
-import com.dtstack.flink.sql.sink.IStreamSinkGener;
-import com.dtstack.flink.sql.sink.rdb.RdbSink;
-import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
-
-import java.util.List;
-import java.util.Map;
-
-
-public class ClickhouseSink extends RdbSink implements IStreamSinkGener {
-
- private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
-
- public ClickhouseSink() {
- }
-
- @Override
- public RetractJDBCOutputFormat getOutputFormat() {
- return new RetractJDBCOutputFormat();
- }
-
- @Override
- public void buildSql(String scheam, String tableName, List fields) {
- buildInsertSql(tableName, fields);
- }
-
- @Override
- public String buildUpdateSql(String schema, String tableName, List fieldNames, Map> realIndexes, List fullField) {
- return null;
- }
-
- private void buildInsertSql(String tableName, List fields) {
- String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
- String fieldsStr = "";
- String placeholder = "";
-
- for (String fieldName : fields) {
- fieldsStr += ",`" + fieldName + "`";
- placeholder += ",?";
- }
-
- fieldsStr = fieldsStr.replaceFirst(",", "");
- placeholder = placeholder.replaceFirst(",", "");
-
- sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
- this.sql = sqlTmp;
- System.out.println("---insert sql----");
- System.out.println(sql);
- }
-
-
- @Override
- public String getDriverName() {
- return CLICKHOUSE_DRIVER;
- }
-
-
-}
diff --git a/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/table/ClickhouseSinkParser.java b/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/table/ClickhouseSinkParser.java
deleted file mode 100644
index 8c3df93d7..000000000
--- a/clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/table/ClickhouseSinkParser.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.sink.clickhouse.table;
-
-import com.dtstack.flink.sql.sink.rdb.table.RdbSinkParser;
-import com.dtstack.flink.sql.table.TableInfo;
-import ru.yandex.clickhouse.domain.ClickHouseDataType;
-
-import java.util.Map;
-
-
-public class ClickhouseSinkParser extends RdbSinkParser {
- private static final String CURR_TYPE = "clickhouse";
-
- @Override
- public TableInfo getTableInfo(String tableName, String fieldsInfo, Map props) {
- TableInfo clickhouseTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
- clickhouseTableInfo.setType(CURR_TYPE);
- return clickhouseTableInfo;
- }
-
- @Override
- public Class dbTypeConvertToJavaType(String fieldType) {
- return ClickHouseDataType.fromTypeString(fieldType).getJavaClass();
- }
-
-}
\ No newline at end of file
diff --git a/clickhouse/pom.xml b/clickhouse/pom.xml
deleted file mode 100644
index 37589bb36..000000000
--- a/clickhouse/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-
-
-
- flink.sql
- com.dtstack.flink
- 1.0-SNAPSHOT
-
- 4.0.0
-
- sql.clickhouse
- pom
-
-
- clickhouse-side
- clickhouse-sink
-
-
-
-
- 1.0-SNAPSHOT
- 0.1.55
-
-
-
-
- com.dtstack.flink
- sql.core
- ${sql.core.version}
- provided
-
-
-
- ru.yandex.clickhouse
- clickhouse-jdbc
- ${clickhouse.jdbc.version}
-
-
-
-
\ No newline at end of file
diff --git a/console/console-sink/pom.xml b/console/console-sink/pom.xml
deleted file mode 100644
index 2f8ad9ef9..000000000
--- a/console/console-sink/pom.xml
+++ /dev/null
@@ -1,79 +0,0 @@
-
-
-
- sql.console
- com.dtstack.flink
- 1.0-SNAPSHOT
-
- 4.0.0
-
- console-sink
- jar
-
- console-sink
- http://maven.apache.org
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 1.4
-
-
- package
-
- shade
-
-
-
-
-
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
-
-
-
-
-
- maven-antrun-plugin
- 1.2
-
-
- copy-resources
-
- package
-
- run
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java
deleted file mode 100644
index 7658e9979..000000000
--- a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.sink.console;
-
-import com.dtstack.flink.sql.sink.MetricOutputFormat;
-import com.dtstack.flink.sql.sink.console.table.TablePrintUtil;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Reason:
- * Date: 2018/12/19
- *
- * @author xuqianjin
- */
-public class ConsoleOutputFormat extends MetricOutputFormat {
-
- private static final Logger LOG = LoggerFactory.getLogger(ConsoleOutputFormat.class);
-
- protected String[] fieldNames;
- TypeInformation>[] fieldTypes;
-
- @Override
- public void configure(Configuration parameters) {
-
- }
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- initMetric();
- }
-
- @Override
- public void writeRecord(Tuple2 tuple2) throws IOException {
- Tuple2 tupleTrans = tuple2;
- Boolean retract = tupleTrans.getField(0);
- if (!retract) {
- return;
- }
-
- Row record = tupleTrans.getField(1);
- if (record.getArity() != fieldNames.length) {
- return;
- }
-
- List data = new ArrayList<>();
- data.add(fieldNames);
- data.add(record.toString().split(","));
- TablePrintUtil.build(data).print();
-
- outRecords.inc();
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- private ConsoleOutputFormat() {
- }
-
- public static ConsoleOutputFormatBuilder buildOutputFormat() {
- return new ConsoleOutputFormatBuilder();
- }
-
- public static class ConsoleOutputFormatBuilder {
-
- private final ConsoleOutputFormat format;
-
- protected ConsoleOutputFormatBuilder() {
- this.format = new ConsoleOutputFormat();
- }
-
- public ConsoleOutputFormatBuilder setFieldNames(String[] fieldNames) {
- format.fieldNames = fieldNames;
- return this;
- }
-
- public ConsoleOutputFormatBuilder setFieldTypes(TypeInformation>[] fieldTypes) {
- format.fieldTypes = fieldTypes;
- return this;
- }
-
- /**
- * Finalizes the configuration and checks validity.
- *
- * @return Configured RetractConsoleCOutputFormat
- */
- public ConsoleOutputFormat finish() {
- return format;
- }
- }
-}
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java
deleted file mode 100644
index 77a3efea2..000000000
--- a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.sink.console;
-
-import com.dtstack.flink.sql.sink.IStreamSinkGener;
-import com.dtstack.flink.sql.table.TargetTableInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-/**
- * Reason:
- * Date: 2018/12/19
- *
- * @author xuqianjin
- */
-public class ConsoleSink implements RetractStreamTableSink, IStreamSinkGener {
-
- protected String[] fieldNames;
- TypeInformation>[] fieldTypes;
-
- @Override
- public TableSink> configure(String[] fieldNames, TypeInformation>[] fieldTypes) {
- this.fieldNames = fieldNames;
- this.fieldTypes = fieldTypes;
- return this;
- }
-
- @Override
- public TupleTypeInfo> getOutputType() {
- return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), getRecordType());
- }
-
- @Override
- public TypeInformation getRecordType() {
- return new RowTypeInfo(fieldTypes, fieldNames);
- }
-
- @Override
- public String[] getFieldNames() {
- return fieldNames;
- }
-
- @Override
- public TypeInformation>[] getFieldTypes() {
- return fieldTypes;
- }
-
- @Override
- public void emitDataStream(DataStream> dataStream) {
- ConsoleOutputFormat.ConsoleOutputFormatBuilder builder = ConsoleOutputFormat.buildOutputFormat();
- builder.setFieldNames(this.fieldNames)
- .setFieldTypes(this.fieldTypes);
- ConsoleOutputFormat outputFormat = builder.finish();
- RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
- dataStream.addSink(richSinkFunction);
- }
-
- @Override
- public ConsoleSink genStreamSink(TargetTableInfo targetTableInfo) {
- return this;
- }
-}
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleSinkParser.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleSinkParser.java
deleted file mode 100644
index e77444bfd..000000000
--- a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleSinkParser.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.sink.console.table;
-
-import com.dtstack.flink.sql.table.AbsTableParser;
-import com.dtstack.flink.sql.table.TableInfo;
-import com.dtstack.flink.sql.util.MathUtil;
-
-import java.util.Map;
-
-import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
-
-/**
- * Reason:
- * Date: 2018/12/19
- *
- * @author xuqianjin
- */
-public class ConsoleSinkParser extends AbsTableParser {
- @Override
- public TableInfo getTableInfo(String tableName, String fieldsInfo, Map props) {
- ConsoleTableInfo consoleTableInfo = new ConsoleTableInfo();
- consoleTableInfo.setName(tableName);
- parseFieldsInfo(fieldsInfo, consoleTableInfo);
-
- consoleTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(PARALLELISM_KEY.toLowerCase())));
- return consoleTableInfo;
- }
-}
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleTableInfo.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleTableInfo.java
deleted file mode 100644
index 4b286c667..000000000
--- a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleTableInfo.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.sink.console.table;
-
-import com.dtstack.flink.sql.table.TargetTableInfo;
-
-/**
- * Reason:
- * Date: 2018/12/19
- *
- * @author xuqianjin
- */
-public class ConsoleTableInfo extends TargetTableInfo {
-
- private static final String CURR_TYPE = "console";
-
- public ConsoleTableInfo() {
- setType(CURR_TYPE);
- }
-
- @Override
- public boolean check() {
- return true;
- }
-
- @Override
- public String getType() {
- return super.getType().toLowerCase();
- }
-}
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java
deleted file mode 100644
index 8813da619..000000000
--- a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java
+++ /dev/null
@@ -1,341 +0,0 @@
-package com.dtstack.flink.sql.sink.console.table;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Reason:
- * Date: 2018/12/19
- *
- * @author xuqianjin
- */
-public class TablePrintUtil {
- private static final Logger LOG = LoggerFactory.getLogger(TablePrintUtil.class);
- public static final int ALIGN_LEFT = 1;//左对齐
- public static final int ALIGN_RIGHT = 2;//右对齐
- public static final int ALIGN_CENTER = 3;//居中对齐
-
- private int align = ALIGN_CENTER;//默认居中对齐
- private boolean equilong = false;//默认不等宽
- private int padding = 1;//左右边距默认为1
- private char h = '-';//默认水平分隔符
- private char v = '|';//默认竖直分隔符
- private char o = '+';//默认交叉分隔符
- private char s = ' ';//默认空白填充符
- private List data;//数据
-
- private TablePrintUtil() {
- }
-
- /**
- * 链式调用入口方法
- *
- * @param data
- * @return
- */
- public static TablePrintUtil build(String[][] data) {
- TablePrintUtil self = new TablePrintUtil();
- self.data = new ArrayList<>(Arrays.asList(data));
- return self;
- }
-
- /**
- * 链式调用入口方法,T可以是String[]、List、任意实体类
- * 由于java泛型不同无法重载,所以这里要写if instanceof进行类型判断
- *
- * @param data
- * @param
- * @return
- */
- public static TablePrintUtil build(List data) {
- TablePrintUtil self = new TablePrintUtil();
- self.data = new ArrayList<>();
- if (data.size() <= 0) throw new RuntimeException("数据源至少得有一行吧");
- Object obj = data.get(0);
-
-
- if (obj instanceof String[]) {
- //如果泛型为String数组,则直接设置
- self.data = (List) data;
- } else if (obj instanceof List) {
- //如果泛型为List,则把list中的item依次转为String[],再设置
- int length = ((List) obj).size();
- for (Object item : data) {
- List col = (List) item;
- if (col.size() != length) throw new RuntimeException("数据源每列长度必须一致");
- self.data.add(col.toArray(new String[length]));
- }
- } else {
- //如果泛型为实体类,则利用反射获取get方法列表,从而推算出属性列表。
- //根据反射得来的属性列表设置表格第一行thead
- List colList = getColList(obj);
- String[] header = new String[colList.size()];
- for (int i = 0; i < colList.size(); i++) {
- header[i] = colList.get(i).colName;
- }
- self.data.add(header);
- //利用反射调用相应get方法获取属性值来设置表格tbody
- for (int i = 0; i < data.size(); i++) {
- String[] item = new String[colList.size()];
- for (int j = 0; j < colList.size(); j++) {
- String value = null;
- try {
- value = obj.getClass().getMethod(colList.get(j).getMethodName).invoke(data.get(i)).toString();
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- e.printStackTrace();
- }
- item[j] = value == null ? "null" : value;
- }
- self.data.add(item);
- }
- }
- return self;
- }
-
- private static class Col {
- private String colName;//列名
- private String getMethodName;//get方法名
- }
-
- /**
- * 利用反射获取get方法名和属性名
- *
- * @return
- */
- private static List getColList(Object obj) {
- List colList = new ArrayList<>();
- Method[] methods = obj.getClass().getMethods();
- for (Method m : methods) {
- StringBuilder getMethodName = new StringBuilder(m.getName());
- if (getMethodName.substring(0, 3).equals("get") && !m.getName().equals("getClass")) {
- Col col = new Col();
- col.getMethodName = getMethodName.toString();
- char first = Character.toLowerCase(getMethodName.delete(0, 3).charAt(0));
- getMethodName.delete(0, 1).insert(0, first);
- col.colName = getMethodName.toString();
- colList.add(col);
- }
- }
- return colList;
- }
-
- /**
- * 获取字符串占的字符位数
- *
- * @param str
- * @return
- */
- private int getStringCharLength(String str) {
- Pattern p = Pattern.compile("[\u4e00-\u9fa5]");//利用正则找到中文
- Matcher m = p.matcher(str);
- int count = 0;
- while (m.find()) {
- count++;
- }
- return str.length() + count;
- }
-
- /**
- * 纵向遍历获取数据每列的长度
- *
- * @return
- */
- private int[] getColLengths() {
- int[] result = new int[data.get(0).length];
- for (int x = 0; x < result.length; x++) {
- int max = 0;
- for (int y = 0; y < data.size(); y++) {
- int len = getStringCharLength(data.get(y)[x]);
- if (len > max) {
- max = len;
- }
- }
- result[x] = max;
- }
- if (equilong) {//如果等宽表格
- int max = 0;
- for (int len : result) {
- if (len > max) max = len;
- }
- for (int i = 0; i < result.length; i++) {
- result[i] = max;
- }
- }
- return result;
- }
-
- /**
- * 取得表格字符串
- *
- * @return
- */
- public String getTableString() {
- StringBuilder sb = new StringBuilder();
- int[] colLengths = getColLengths();//获取每列文字宽度
- StringBuilder line = new StringBuilder();//表格横向分隔线
- line.append(o);
- for (int len : colLengths) {
- int allLen = len + padding * 2;//还需要加上边距和分隔符的长度
- for (int i = 0; i < allLen; i++) {
- line.append(h);
- }
- line.append(o);
- }
- sb.append(line).append("\r\n");
- for (int y = 0; y < data.size(); y++) {
- sb.append(v);
- for (int x = 0; x < data.get(y).length; x++) {
- String cell = data.get(y)[x];
- switch (align) {
- case ALIGN_LEFT:
- for (int i = 0; i < padding; i++) {sb.append(s);}
- sb.append(cell);
- for (int i = 0; i < colLengths[x] - getStringCharLength(cell) + padding; i++) {sb.append(s);}
- break;
- case ALIGN_RIGHT:
- for (int i = 0; i < colLengths[x] - getStringCharLength(cell) + padding; i++) {sb.append(s);}
- sb.append(cell);
- for (int i = 0; i < padding; i++) {sb.append(s);}
- break;
- case ALIGN_CENTER:
- int space = colLengths[x] - getStringCharLength(cell);
- int left = space / 2;
- int right = space - left;
- for (int i = 0; i < left + padding; i++) {sb.append(s);}
- sb.append(cell);
- for (int i = 0; i < right + padding; i++) {sb.append(s);}
- break;
- }
- sb.append(v);
- }
- sb.append("\r\n");
- sb.append(line).append("\r\n");
- }
- return sb.toString();
- }
-
- /**
- * 直接打印表格
- */
- public void print() {
- LOG.info("\n"+getTableString());
- System.out.println(getTableString());
- }
-
- //下面是链式调用的set方法
- public TablePrintUtil setAlign(int align) {
- this.align = align;
- return this;
- }
-
- public TablePrintUtil setEquilong(boolean equilong) {
- this.equilong = equilong;
- return this;
- }
-
- public TablePrintUtil setPadding(int padding) {
- this.padding = padding;
- return this;
- }
-
- public TablePrintUtil setH(char h) {
- this.h = h;
- return this;
- }
-
- public TablePrintUtil setV(char v) {
- this.v = v;
- return this;
- }
-
- public TablePrintUtil setO(char o) {
- this.o = o;
- return this;
- }
-
- public TablePrintUtil setS(char s) {
- this.s = s;
- return this;
- }
-
- /**
- * 使用示例
- *
- * @param args
- */
- public static void main(String[] args) {
- List data1 = new ArrayList<>();
- data1.add(new String[]{"用户名", "密码", "姓名"});
- data1.add(new String[]{"xiaoming", "xm123", "小明"});
- data1.add(new String[]{"xiaohong", "xh123", "小红"});
- TablePrintUtil.build(data1).print();
-
- List> data2 = new ArrayList<>();
- data2.add(new ArrayList<>());
- data2.add(new ArrayList<>());
- data2.add(new ArrayList<>());
- data2.get(0).add("用户名");
- data2.get(0).add("密码");
- data2.get(0).add("姓名");
- data2.get(1).add("xiaoming");
- data2.get(1).add("xm123");
- data2.get(1).add("小明");
- data2.get(2).add("xiaohong");
- data2.get(2).add("xh123");
- data2.get(2).add("小红");
- TablePrintUtil.build(data2)
- .setAlign(TablePrintUtil.ALIGN_LEFT)
- .setPadding(5)
- .setEquilong(true)
- .print();
-
-
- class User {
- String username;
- String password;
- String name;
-
- User(String username, String password, String name) {
- this.username = username;
- this.password = password;
- this.name = name;
- }
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
- }
- List data3 = new ArrayList<>();
- data3.add(new User("xiaoming", "xm123", "小明"));
- data3.add(new User("xiaohong", "xh123", "小红"));
- TablePrintUtil.build(data3).setH('=').setV('!').print();
- }
-}
diff --git a/console/console-sink/src/test/java/com/dtstack/flinkx/AppTest.java b/console/console-sink/src/test/java/com/dtstack/flinkx/AppTest.java
deleted file mode 100644
index e03e5451f..000000000
--- a/console/console-sink/src/test/java/com/dtstack/flinkx/AppTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package com.dtstack.flinkx;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest
- extends TestCase
-{
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AppTest(String testName )
- {
- super( testName );
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite()
- {
- return new TestSuite( AppTest.class );
- }
-
- /**
- * Rigourous Test :-)
- */
- public void testApp()
- {
- assertTrue( true );
- }
-}
diff --git a/console/pom.xml b/console/pom.xml
deleted file mode 100644
index 983e1c185..000000000
--- a/console/pom.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-
-
-
- flink.sql
- com.dtstack.flink
- 1.0-SNAPSHOT
-
- 4.0.0
- sql.console
- pom
-
-
- console-sink
-
-
-
-
- junit
- junit
- 3.8.1
- test
-
-
- com.dtstack.flink
- sql.core
- 1.0-SNAPSHOT
- provided
-
-
-
-
-
\ No newline at end of file
diff --git a/core/pom.xml b/core/pom.xml
index c357b4c99..1040fcea6 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -17,7 +17,6 @@
UTF-8
core
- 1.16.0
@@ -28,58 +27,21 @@
test
-
- joda-time
- joda-time
- 2.5
-
-
-
- org.apache.flink
- flink-core
- ${flink.version}
-
-
-
- org.apache.flink
- flink-streaming-java_2.11
- ${flink.version}
-
-
- org.apache.flink
- flink-streaming-scala_2.11
- ${flink.version}
-
-
-
-
-
- org.apache.flink
- flink-table-planner_2.11
${flink.version}
org.apache.flink
- flink-table-common
+ flink-core
${flink.version}
-
- org.apache.calcite
- calcite-server
-
- ${calcite.server.version}
-
-
org.apache.flink
- flink-cep-scala_2.11
+ flink-streaming-java_2.11
${flink.version}
@@ -91,7 +53,7 @@
org.apache.flink
- flink-yarn_2.11
+ flink-streaming-scala_2.11
${flink.version}
@@ -144,7 +106,7 @@
+ tofile="${basedir}/../plugins/${project.name}.jar" />
diff --git a/core/src/main/java/com/dtstack/flink/sql/enums/ClusterMode.java b/core/src/main/java/com/dtstack/flink/sql/ClusterMode.java
similarity index 91%
rename from core/src/main/java/com/dtstack/flink/sql/enums/ClusterMode.java
rename to core/src/main/java/com/dtstack/flink/sql/ClusterMode.java
index 341258a43..024a31854 100644
--- a/core/src/main/java/com/dtstack/flink/sql/enums/ClusterMode.java
+++ b/core/src/main/java/com/dtstack/flink/sql/ClusterMode.java
@@ -17,7 +17,7 @@
* limitations under the License.
*/
-package com.dtstack.flink.sql.enums;
+package com.dtstack.flink.sql;
/**
* Created by sishu.yss on 2018/10/10.
@@ -31,8 +31,4 @@ public enum ClusterMode {
ClusterMode(int type){
this.type = type;
}
-
- public int getType(){
- return this.type;
- }
}
diff --git a/core/src/main/java/com/dtstack/flink/sql/Main.java b/core/src/main/java/com/dtstack/flink/sql/Main.java
index a08df9b05..610abf21f 100644
--- a/core/src/main/java/com/dtstack/flink/sql/Main.java
+++ b/core/src/main/java/com/dtstack/flink/sql/Main.java
@@ -16,24 +16,13 @@
* limitations under the License.
*/
-
+
package com.dtstack.flink.sql;
-import com.dtstack.flink.sql.config.CalciteConfig;
-import com.dtstack.flink.sql.classloader.ClassLoaderManager;
-import com.dtstack.flink.sql.constrant.ConfigConstrant;
-import com.dtstack.flink.sql.enums.ClusterMode;
+import com.dtstack.flink.sql.classloader.DtClassLoader;
import com.dtstack.flink.sql.enums.ECacheType;
-import com.dtstack.flink.sql.enums.EPluginLoadMode;
-import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
-import com.dtstack.flink.sql.exec.FlinkSQLExec;
-import com.dtstack.flink.sql.option.OptionParser;
-import com.dtstack.flink.sql.parser.CreateFuncParser;
-import com.dtstack.flink.sql.parser.CreateTmpTableParser;
-import com.dtstack.flink.sql.parser.InsertSqlParser;
-import com.dtstack.flink.sql.parser.SqlParser;
-import com.dtstack.flink.sql.parser.SqlTree;
+import com.dtstack.flink.sql.parser.*;
import com.dtstack.flink.sql.side.SideSqlExec;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.table.SourceTableInfo;
@@ -42,27 +31,27 @@
import com.dtstack.flink.sql.sink.StreamSinkFactory;
import com.dtstack.flink.sql.source.StreamSourceFactory;
import com.dtstack.flink.sql.util.DtStringUtil;
-import com.dtstack.flink.sql.util.PropertiesUtils;
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
import com.dtstack.flink.sql.util.FlinkUtil;
import com.dtstack.flink.sql.util.PluginUtil;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
import org.apache.commons.io.Charsets;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
+import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Sets;
import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.configuration.Configuration;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -72,19 +61,21 @@
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import com.dtstack.flink.sql.option.Options;
/**
* Date: 2018/6/26
@@ -100,19 +91,39 @@ public class Main {
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
+ private static final int failureRate = 3;
+
+ private static final int failureInterval = 6; //min
+
+ private static final int delayInterval = 10; //sec
public static void main(String[] args) throws Exception {
- OptionParser optionParser = new OptionParser(args);
- Options options = optionParser.getOptions();
- String sql = options.getSql();
- String name = options.getName();
- String addJarListStr = options.getAddjar();
- String localSqlPluginPath = options.getLocalSqlPluginPath();
- String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
- String pluginLoadMode = options.getPluginLoadMode();
- String deployMode = options.getMode();
- String confProp = options.getConfProp();
+ Options options = new Options();
+ options.addOption("sql", true, "sql config");
+ options.addOption("name", true, "job name");
+ options.addOption("addjar", true, "add jar");
+ options.addOption("localSqlPluginPath", true, "local sql plugin path");
+ options.addOption("remoteSqlPluginPath", true, "remote sql plugin path");
+ options.addOption("confProp", true, "env properties");
+ options.addOption("mode", true, "deploy mode");
+
+ options.addOption("savePointPath", true, "Savepoint restore path");
+ options.addOption("allowNonRestoredState", true, "Flag indicating whether non restored state is allowed if the savepoint");
+
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cl = parser.parse(options, args);
+ String sql = cl.getOptionValue("sql");
+ String name = cl.getOptionValue("name");
+ String addJarListStr = cl.getOptionValue("addjar");
+ String localSqlPluginPath = cl.getOptionValue("localSqlPluginPath");
+ String remoteSqlPluginPath = cl.getOptionValue("remoteSqlPluginPath");
+ String deployMode = cl.getOptionValue("mode");
+ String confProp = cl.getOptionValue("confProp");
+
+ Preconditions.checkNotNull(sql, "parameters of sql is required");
+ Preconditions.checkNotNull(name, "parameters of name is required");
+ Preconditions.checkNotNull(localSqlPluginPath, "parameters of localSqlPluginPath is required");
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
@@ -123,6 +134,17 @@ public static void main(String[] args) throws Exception {
addJarFileList = objMapper.readValue(addJarListStr, List.class);
}
+ ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
+ DtClassLoader dtClassLoader = new DtClassLoader(new URL[]{}, threadClassLoader);
+ Thread.currentThread().setContextClassLoader(dtClassLoader);
+
+ URLClassLoader parentClassloader;
+ if(!ClusterMode.local.name().equals(deployMode)){
+ parentClassloader = (URLClassLoader) threadClassLoader.getParent();
+ }else{
+ parentClassloader = dtClassLoader;
+ }
+
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
@@ -141,22 +163,13 @@ public static void main(String[] args) throws Exception {
Map registerTableCache = Maps.newHashMap();
//register udf
- registerUDF(sqlTree, jarURList, tableEnv);
+ registerUDF(sqlTree, jarURList, parentClassloader, tableEnv);
//register table schema
- registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
-
- sqlTranslation(localSqlPluginPath, tableEnv,sqlTree,sideTableMap,registerTableCache);
-
- if(env instanceof MyLocalStreamEnvironment) {
- ((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
- }
+ registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, sideTableMap, registerTableCache);
- env.execute(name);
- }
-
- private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map sideTableMap,Map registerTableCache) throws Exception {
SideSqlExec sideSqlExec = new SideSqlExec();
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
+
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
}
@@ -165,13 +178,14 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
if(LOG.isInfoEnabled()){
LOG.info("exe-sql:\n" + result.getExecSql());
}
+
boolean isSide = false;
+
for (String tableName : result.getTargetTableList()) {
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
-
- SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
+ SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql).parseStmt();
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
tmp.setExecSql(tmpSql);
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
@@ -182,21 +196,26 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
break;
}
}
+
if(isSide){
//sql-dimensional table contains the dimension table of execution
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
}else{
- FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
- if(LOG.isInfoEnabled()){
- LOG.info("exec sql: " + result.getExecSql());
- }
+ tableEnv.sqlUpdate(result.getExecSql());
}
}
}
}
+ if(env instanceof MyLocalStreamEnvironment) {
+ List urlList = new ArrayList<>();
+ urlList.addAll(Arrays.asList(dtClassLoader.getURLs()));
+ ((MyLocalStreamEnvironment) env).setClasspaths(urlList);
+ }
+ env.execute(name);
}
+
/**
* This part is just to add classpath for the jar when reading remote execution, and will not submit jar from a local
* @param env
@@ -215,25 +234,27 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set cla
}
}
- private static void registerUDF(SqlTree sqlTree, List jarURList, StreamTableEnvironment tableEnv)
- throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ private static void registerUDF(SqlTree sqlTree, List jarURList, URLClassLoader parentClassloader,
+ StreamTableEnvironment tableEnv)
+ throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
//register urf
- // udf和tableEnv须由同一个类加载器加载
- ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
URLClassLoader classLoader = null;
List funcList = sqlTree.getFunctionList();
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
//classloader
if (classLoader == null) {
- classLoader = FlinkUtil.loadExtraJar(jarURList, (URLClassLoader)levelClassLoader);
+ classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
}
- FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
+ classLoader.loadClass(funcInfo.getClassName());
+ FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName().toUpperCase(),
+ tableEnv, classLoader);
}
}
- private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
- String remoteSqlPluginPath, String pluginLoadMode, Map sideTableMap, Map registerTableCache) throws Exception {
+ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv,
+ String localSqlPluginPath, String remoteSqlPluginPath,
+ Map sideTableMap, Map registerTableCache) throws Exception {
Set classPathSet = Sets.newHashSet();
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
@@ -249,10 +270,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
- DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
- .map((Tuple2 f0) -> { return f0.f1; })
- .returns(typeInfo);
-
+ DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
String fields = String.join(",", typeInfo.getFieldNames());
if(waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)){
@@ -264,22 +282,19 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
Table regTable = tableEnv.fromDataStream(adaptStream, fields);
tableEnv.registerTable(tableInfo.getName(), regTable);
- if(LOG.isInfoEnabled()){
- LOG.info("registe table {} success.", tableInfo.getName());
- }
registerTableCache.put(tableInfo.getName(), regTable);
- classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
+ classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath));
} else if (tableInfo instanceof TargetTableInfo) {
TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
- classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
+ classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
} else if(tableInfo instanceof SideTableInfo){
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
- classPathSet.add(buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
+ classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
}else {
throw new RuntimeException("not support table type:" + tableInfo.getType());
}
@@ -295,57 +310,30 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
}
}
- private static URL buildSourceAndSinkPathByLoadMode(String type, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
- if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
- return PluginUtil.getRemoteJarFilePath(type, suffix, remoteSqlPluginPath, localSqlPluginPath);
- }
- return PluginUtil.getLocalJarFilePath(type, suffix, localSqlPluginPath);
- }
-
- private static URL buildSidePathByLoadMode(String type, String operator, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
- if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
- return PluginUtil.getRemoteSideJarFilePath(type, operator, suffix, remoteSqlPluginPath, localSqlPluginPath);
- }
- return PluginUtil.getLocalSideJarFilePath(type, operator, suffix, localSqlPluginPath);
- }
-
- private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
- confProperties = PropertiesUtils.propertiesTrim(confProperties);
-
+ private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
StreamExecutionEnvironment.getExecutionEnvironment() :
new MyLocalStreamEnvironment();
- env.getConfig().disableClosureCleaner();
- env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
- Configuration globalJobParameters = new Configuration();
- //Configuration unsupported set properties key-value
- Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
- method.setAccessible(true);
- for (Map.Entry