diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java index 6ae8d56bfaa5c..9ba3de17352c4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java @@ -396,7 +396,7 @@ private void testDirectoryError(final String wrongDir, final Statement statement + "PipePlugin.jar")); fail(); } catch (final SQLException e) { - Assert.assertTrue(e.getMessage().contains("1600: Failed to create pipe plugin")); + Assert.assertTrue(e.getMessage().contains("701: Failed to create pipe plugin")); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java index 35403c4dc6d05..24807622cf6b6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java @@ -311,8 +311,7 @@ public void testSourcePermission() { receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); fail("Shall fail if password is wrong."); } catch (final SQLException e) { - Assert.assertTrue( - e.getMessage().contains("Fail to CREATE_PIPE because Authentication failed.")); + Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage()); } // Use current session, user is root @@ -506,4 +505,129 @@ public void testSourcePermission() { fail(e.getMessage()); } } + + @Test + public void testIllegalPassword() throws Exception { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create user `thulab` 'ST@ongpasswd123456'", + "create role `admin`", + "grant role `admin` to `thulab`", + "grant WRITE, READ, SYSTEM, SECURITY on root.** to role `admin`"), + null); + + TestUtils.executeNonQuery( + senderEnv, + "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)"); + TestUtils.executeNonQuery( + receiverEnv, + "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)"); + + Connection connection = senderEnv.getConnection(); + Statement statement = connection.createStatement(); + try { + statement.execute( + String.format( + "create pipe a2b" + + " with source (" + + "'user'='thulab'" + + ", 'password'='passwd')" + + " with sink (" + + "'node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); + fail(); + } catch (final Exception e) { + Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage()); + } + + try { + statement.execute( + "create pipe a2b ('sink'='write-back-sink', 'user'='thulab', 'password'='passwd')"); + fail(); + } catch (final Exception e) { + Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage()); + } + + statement.execute( + String.format( + "create pipe a2b" + + " with source (" + + "'user'='thulab'" + + ", 'password'='ST@ongpasswd123456')" + + " with sink (" + + "'node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); + + TestUtils.executeNonQuery( + senderEnv, "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(pressure) from root.vehicle.plane", + "count(root.vehicle.plane.pressure),", + Collections.singleton("1,")); + + statement.execute("alter user thulab set password 'newST@ongPassword'"); + + try { + TestUtils.restartCluster(senderEnv); + } catch (final Throwable e) { + e.printStackTrace(); + return; + } + + connection = senderEnv.getConnection(); + statement = connection.createStatement(); + TestUtils.executeNonQuery( + senderEnv, "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)"); + + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "select count(pressure) from root.vehicle.plane", + "count(root.vehicle.plane.pressure),", + Collections.singleton("1,")); + + try { + statement.execute("alter pipe a2b modify source ('password'='fake')"); + } catch (final SQLException e) { + Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage()); + } + + statement.execute("alter pipe a2b modify source ('password'='newST@ongPassword')"); + + // Test empty alter + statement.execute("alter pipe a2b"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(pressure) from root.vehicle.plane", + "count(root.vehicle.plane.pressure),", + Collections.singleton("2,")); + + statement.execute("alter user thulab set password 'anotherST@ongPassword'"); + + try { + TestUtils.restartCluster(senderEnv); + } catch (final Throwable e) { + e.printStackTrace(); + return; + } + + connection = senderEnv.getConnection(); + statement = connection.createStatement(); + TestUtils.executeNonQuery( + senderEnv, "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)"); + statement.execute("alter user thulab set password 'newST@ongPassword'"); + statement.execute("alter pipe a2b"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(pressure) from root.vehicle.plane", + "count(root.vehicle.plane.pressure),", + Collections.singleton("3,")); + + statement.close(); + connection.close(); + } } diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index c4e7edd63d0ce..c13f87ae5794f 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -371,7 +371,7 @@ public PipeParameters addOrReplaceEquivalentAttributesWithClone(final PipeParame return new PipeParameters(thisMap); } - private static class KeyReducer { + public static class KeyReducer { private static final Set FIRST_PREFIXES = new HashSet<>(); private static final Set SECOND_PREFIXES = new HashSet<>(); @@ -399,7 +399,7 @@ static String shallowReduce(String key) { return key; } - static String reduce(String key) { + public static String reduce(String key) { if (key == null) { return null; } diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipePasswordCheckException.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipePasswordCheckException.java new file mode 100644 index 0000000000000..10dce32c59153 --- /dev/null +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipePasswordCheckException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.api.exception; + +public class PipePasswordCheckException extends PipeException { + public PipePasswordCheckException(final String message) { + super(message); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index e51d1a43299b5..f455edb26b8b1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1332,10 +1332,11 @@ public DataSet queryPermission(final AuthorPlan authorPlan) { } @Override - public TPermissionInfoResp login(String username, String password) { + public TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword) { TSStatus status = confirmLeader(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return permissionManager.login(username, password); + return permissionManager.login(username, password, useEncryptedPassword); } else { TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp(); resp.setStatus(status); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index fe83b8bd0c189..02c82164595df 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -501,7 +501,8 @@ TDataPartitionTableResp getOrCreateDataPartition( DataSet queryPermission(final AuthorPlan authorPlan); /** login. */ - TPermissionInfoResp login(String username, String password); + TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword); /** Check User Privileges. */ TPermissionInfoResp checkUserPrivileges(String username, PrivilegeUnion union); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java index 06be2818593ae..ee39bfc2b19e9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java @@ -112,8 +112,9 @@ protected ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } - public TPermissionInfoResp login(String username, String password) { - return authorInfo.login(username, password); + public TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword) { + return authorInfo.login(username, password, useEncryptedPassword); } public String login4Pipe(final String userName, final String password) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index d9f6e07be2b2b..81e82b96adee8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -1225,7 +1225,7 @@ protected boolean shouldLogin() { @Override protected TSStatus login() { - return configManager.login(username, password).getStatus(); + return configManager.login(username, password, false).getStatus(); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java index 60512887703ff..eca4943c46f9e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java @@ -55,8 +55,11 @@ import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException; import org.apache.iotdb.rpc.TSStatusCode; +import javax.annotation.Nonnull; + import java.io.IOException; import java.nio.file.Paths; import java.util.Collections; @@ -108,6 +111,20 @@ public void customize( PipeConfigNodeRemainingTimeMetrics.getInstance().register(this); } + @Override + protected void login(final @Nonnull String password) { + if (ConfigNode.getInstance() + .getConfigManager() + .getPermissionManager() + .login(userName, password, true) + .getStatus() + .getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipePasswordCheckException( + String.format("Failed to check password for pipe %s.", pipeName)); + } + } + @Override protected AbstractPipeListeningQueue getListeningQueue() { return PipeConfigNodeAgent.runtime().listener(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java index 743e0a3f09a56..d7404009825ae 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java @@ -124,8 +124,9 @@ public void setAuthorQueryPlanExecutor(IAuthorPlanExecutor authorPlanExecutor) { this.authorPlanExecutor = authorPlanExecutor; } - public TPermissionInfoResp login(String username, String password) { - return authorPlanExecutor.login(username, password); + public TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword) { + return authorPlanExecutor.login(username, password, useEncryptedPassword); } public String login4Pipe(final String username, final String password) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java index 5e8843418c170..8f216df3250ed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java @@ -74,13 +74,14 @@ public AuthorPlanExecutor(IAuthorizer authorizer) { } @Override - public TPermissionInfoResp login(String username, String password) { + public TPermissionInfoResp login( + String username, final String password, final boolean useEncryptedPassword) { boolean status; String loginMessage = null; TSStatus tsStatus = new TSStatus(); TPermissionInfoResp result = new TPermissionInfoResp(); try { - status = authorizer.login(username, password); + status = authorizer.login(username, password, useEncryptedPassword); if (status) { // Bring this user's permission information back to the datanode for caching if (authorizer instanceof OpenIdAuthorizer) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java index 24f0d8ceb0a7a..724ebf43f73fd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java @@ -33,7 +33,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; public interface IAuthorPlanExecutor { - TPermissionInfoResp login(String username, String password); + TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword); String login4Pipe(final String username, final String password); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java index 53f908bf4fc11..921661d13de5f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java @@ -107,14 +107,18 @@ public boolean executeFromValidateTask(final ConfigNodeProcedureEnv env) throws PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY, PipeSourceConstant.SOURCE_IOTDB_USER_KEY, PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY, - PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY); + PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY, + PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); final boolean checkSink = new PipeParameters(alterPipeRequest.getConnectorAttributes()) .hasAnyAttributes( PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY, PipeSinkConstant.SINK_IOTDB_USER_KEY, PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY, - PipeSinkConstant.SINK_IOTDB_USERNAME_KEY); + PipeSinkConstant.SINK_IOTDB_USERNAME_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY, + PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index f99293f0a0276..f880bbdb85cb1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -174,7 +174,9 @@ public static void checkAndEnrichSourceAuthentication( if (sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY) || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USER_KEY) || sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY) - || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)) { + || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY) + || sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY) + || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)) { final String hashedPassword = env.getConfigManager() .getPermissionManager() @@ -216,7 +218,9 @@ public static void checkAndEnrichSinkAuthentication( if (sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY) || sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USER_KEY) || sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY) - || sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)) { + || sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY) + || sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY) + || sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)) { final String hashedPassword = env.getConfigManager() .getPermissionManager() diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 26fed0e1c4a3a..5d6aa8da9f5df 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -744,7 +744,10 @@ public TAuthorizerResp queryRPermission(final TAuthorizerRelationalReq req) { @Override public TPermissionInfoResp login(TLoginReq req) { - return configManager.login(req.getUserrname(), req.getPassword()); + return configManager.login( + req.getUserrname(), + req.getPassword(), + req.isSetUseEncryptedPassword() && req.isUseEncryptedPassword()); } @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java index 151307e280d8c..34a9a411133fe 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java @@ -664,7 +664,7 @@ public void createUserWithRawPassword() { new ArrayList<>()); status = authorInfo.authorNonQuery(authorPlan); assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - TPermissionInfoResp result = authorInfo.login("testuser", "password123456"); + TPermissionInfoResp result = authorInfo.login("testuser", "password123456", false); assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.getStatus().getCode()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java index 81db578ace16a..21c3bc787ee2e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java @@ -142,8 +142,14 @@ public static Optional getUserId(String username) { return Optional.ofNullable(user == null ? null : user.getUserId()); } - public static TSStatus checkUser(String userName, String password) { - TSStatus status = authorityFetcher.get().checkUser(userName, password); + public static TSStatus checkUser(final String userName, final String password) { + return checkUser(userName, password, false); + } + + public static TSStatus checkUser( + final String userName, final String password, final boolean useEncryptedPassword) { + final TSStatus status = + authorityFetcher.get().checkUser(userName, password, useEncryptedPassword); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return status; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java index 2894ce8bb478e..325476174c667 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java @@ -543,23 +543,31 @@ public void setAcceptCache(boolean acceptCache) { } @Override - public TSStatus checkUser(String username, String password) { + public TSStatus checkUser( + final String username, final String password, final boolean useEncryptedPassword) { checkCacheAvailable(); - User user = iAuthorCache.getUserCache(username); + final User user = iAuthorCache.getUserCache(username); if (user != null) { if (user.isOpenIdUser()) { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); - } else if (password != null && AuthUtils.validatePassword(password, user.getPassword())) { - return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); - } else if (password != null - && AuthUtils.validatePassword( - password, user.getPassword(), AsymmetricEncrypt.DigestAlgorithm.MD5)) { - return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } else if (password != null) { + if (useEncryptedPassword) { + return password.equals(user.getPassword()) + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) + : RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD, "Authentication failed."); + } else { + return AuthUtils.validatePassword(password, user.getPassword()) + || AuthUtils.validatePassword( + password, user.getPassword(), AsymmetricEncrypt.DigestAlgorithm.MD5) + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) + : RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD, "Authentication failed."); + } } else { return RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD, "Authentication failed."); } } else { - TLoginReq req = new TLoginReq(username, password); + TLoginReq req = + new TLoginReq(username, password).setUseEncryptedPassword(useEncryptedPassword); TPermissionInfoResp status = null; try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java index 3dc95fa41dc17..302679878955b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java @@ -36,7 +36,8 @@ public interface IAuthorityFetcher { - TSStatus checkUser(String username, String password); + TSStatus checkUser( + final String username, final String password, final boolean useEncryptedPassword); boolean checkRole(String username, String roleName); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java index 11c1edafccb13..d57956109d46d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java @@ -27,7 +27,9 @@ import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; import org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent; import org.apache.iotdb.db.pipe.agent.plugin.schemaregion.PipeSchemaRegionPluginAgent; +import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningFilter; import org.apache.iotdb.pipe.api.PipePlugin; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; import org.slf4j.Logger; @@ -70,7 +72,7 @@ public PipeSchemaRegionPluginAgent schemaRegion() { /////////////////////////////// Pipe Plugin Management /////////////////////////////// - public void register(PipePluginMeta pipePluginMeta, ByteBuffer jarFile) + public void register(final PipePluginMeta pipePluginMeta, final ByteBuffer jarFile) throws IOException, PipeException { lock.lock(); try { @@ -86,7 +88,7 @@ public void register(PipePluginMeta pipePluginMeta, ByteBuffer jarFile) } } - private void checkIfRegistered(PipePluginMeta pipePluginMeta) throws PipeException { + private void checkIfRegistered(final PipePluginMeta pipePluginMeta) throws PipeException { final String pluginName = pipePluginMeta.getPluginName(); final PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName); if (information == null) { @@ -121,7 +123,8 @@ private void checkIfRegistered(PipePluginMeta pipePluginMeta) throws PipeExcepti // we allow users to register the same pipe plugin multiple times without any error } - private void saveJarFileIfNeeded(String pluginName, String jarName, ByteBuffer byteBuffer) + private void saveJarFileIfNeeded( + final String pluginName, final String jarName, final ByteBuffer byteBuffer) throws IOException { if (byteBuffer != null) { PipePluginExecutableManager.getInstance() @@ -136,7 +139,7 @@ private void saveJarFileIfNeeded(String pluginName, String jarName, ByteBuffer b * @param pipePluginMeta the meta information of the PipePlugin * @throws PipeException if the PipePlugin can not be loaded or its instance can not be created */ - public void doRegister(PipePluginMeta pipePluginMeta) throws PipeException { + public void doRegister(final PipePluginMeta pipePluginMeta) throws PipeException { final String pluginName = pipePluginMeta.getPluginName(); final String className = pipePluginMeta.getClassName(); @@ -156,7 +159,7 @@ public void doRegister(PipePluginMeta pipePluginMeta) throws PipeException { pipePluginMetaKeeper.addPipePluginVisibility( pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); - } catch (IOException + } catch (final IOException | InstantiationException | InvocationTargetException | NoSuchMethodException @@ -173,7 +176,8 @@ public void doRegister(PipePluginMeta pipePluginMeta) throws PipeException { } } - public void deregister(String pluginName, boolean needToDeleteJar) throws PipeException { + public void deregister(final String pluginName, final boolean needToDeleteJar) + throws PipeException { lock.lock(); try { final PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName); @@ -197,7 +201,7 @@ public void deregister(String pluginName, boolean needToDeleteJar) throws PipeEx PipePluginExecutableManager.getInstance() .removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt"); } - } catch (IOException e) { + } catch (final IOException e) { throw new PipeException(e.getMessage(), e); } finally { lock.unlock(); @@ -206,20 +210,22 @@ public void deregister(String pluginName, boolean needToDeleteJar) throws PipeEx // TODO: validate pipe plugin attributes for config node public void validate( - String pipeName, - Map extractorAttributes, - Map processorAttributes, - Map connectorAttributes) + final String pipeName, + final Map sourceAttributes, + final Map processorAttributes, + final Map sinkAttributes) throws Exception { - dataRegionAgent.validate( - pipeName, extractorAttributes, processorAttributes, connectorAttributes); - schemaRegionAgent.validate( - pipeName, extractorAttributes, processorAttributes, connectorAttributes); + dataRegionAgent.validate(pipeName, sourceAttributes, processorAttributes, sinkAttributes); + + if (SchemaRegionListeningFilter.shouldSchemaRegionBeListened( + new PipeParameters(sinkAttributes))) { + schemaRegionAgent.validate(pipeName, sourceAttributes, processorAttributes, sinkAttributes); + } } public boolean checkIfPluginSameType(final String oldPluginName, final String newPluginName) { - PipePluginMeta oldPipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(oldPluginName); - PipePluginMeta newPipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(newPluginName); + final PipePluginMeta oldPipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(oldPluginName); + final PipePluginMeta newPipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(newPluginName); if (oldPipePluginMeta == null) { throw new PipeException(String.format("plugin %s is not registered.", oldPluginName)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java index a8d002fb27c08..67a7d6549d99a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java @@ -62,24 +62,24 @@ protected PipeSinkConstructor createPipeSinkConstructor( @Override public void validate( - String pipeName, - Map sourceAttributes, - Map processorAttributes, - Map sinkAttributes) + final String pipeName, + final Map sourceAttributes, + final Map processorAttributes, + final Map sinkAttributes) throws Exception { - PipeExtractor temporaryExtractor = validateSource(sourceAttributes); - PipeProcessor temporaryProcessor = validateProcessor(processorAttributes); - PipeConnector temporaryConnector = validateSink(pipeName, sinkAttributes); + final PipeExtractor temporaryExtractor = validateSource(pipeName, sourceAttributes); + final PipeProcessor temporaryProcessor = validateProcessor(processorAttributes); + final PipeConnector temporaryConnector = validateSink(pipeName, sinkAttributes); // validate visibility // TODO: validate visibility for schema region and config region - Visibility pipeVisibility = + final Visibility pipeVisibility = VisibilityUtils.calculateFromExtractorParameters(new PipeParameters(sourceAttributes)); - Visibility extractorVisibility = + final Visibility extractorVisibility = VisibilityUtils.calculateFromPluginClass(temporaryExtractor.getClass()); - Visibility processorVisibility = + final Visibility processorVisibility = VisibilityUtils.calculateFromPluginClass(temporaryProcessor.getClass()); - Visibility connectorVisibility = + final Visibility connectorVisibility = VisibilityUtils.calculateFromPluginClass(temporaryConnector.getClass()); if (!VisibilityUtils.isCompatible( pipeVisibility, extractorVisibility, processorVisibility, connectorVisibility)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java index 8c1ef90f97ff9..6535d371a915c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java @@ -195,29 +195,29 @@ private void removeAutoGauge(final String pipeID) { //////////////////////////// register & deregister (pipe integration) //////////////////////////// - public void register(final IoTDBDataRegionSource extractor) { + public void register(final IoTDBDataRegionSource source) { // The metric is global thus the regionId is omitted - final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime(); + final String pipeID = source.getPipeName() + "_" + source.getCreationTime(); remainingEventAndTimeOperatorMap.computeIfAbsent( pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator( - extractor.getPipeName(), extractor.getCreationTime())); + source.getPipeName(), source.getCreationTime())); if (Objects.nonNull(metricService)) { createMetrics(pipeID); } } - public void register(final IoTDBSchemaRegionSource extractor) { + public void register(final IoTDBSchemaRegionSource source) { // The metric is global thus the regionId is omitted - final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime(); + final String pipeID = source.getPipeName() + "_" + source.getCreationTime(); remainingEventAndTimeOperatorMap .computeIfAbsent( pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator( - extractor.getPipeName(), extractor.getCreationTime())) - .register(extractor); + source.getPipeName(), source.getCreationTime())) + .register(source); if (Objects.nonNull(metricService)) { createMetrics(pipeID); } @@ -233,7 +233,7 @@ public void increaseInsertNodeEventCount(final String pipeName, final long creat public void decreaseInsertNodeEventCount( final String pipeName, final long creationTime, final long transferTime) { - PipeDataNodeRemainingEventAndTimeOperator operator = + final PipeDataNodeRemainingEventAndTimeOperator operator = remainingEventAndTimeOperatorMap.computeIfAbsent( pipeName + "_" + creationTime, k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java index 9c752d9753f01..ce5d60449fdc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java @@ -42,14 +42,14 @@ public class PipeSchemaRegionSourceMetrics implements IMetricSet { @SuppressWarnings("java:S3077") private volatile AbstractMetricService metricService; - private final Map extractorMap = new ConcurrentHashMap<>(); + private final Map sourceMap = new ConcurrentHashMap<>(); //////////////////////////// bindTo & unbindFrom (metric framework) //////////////////////////// @Override public void bindTo(final AbstractMetricService metricService) { this.metricService = metricService; - ImmutableSet.copyOf(extractorMap.keySet()).forEach(this::createMetrics); + ImmutableSet.copyOf(sourceMap.keySet()).forEach(this::createMetrics); } private void createMetrics(final String taskID) { @@ -57,24 +57,24 @@ private void createMetrics(final String taskID) { } private void createAutoGauge(final String taskID) { - final IoTDBSchemaRegionSource extractor = extractorMap.get(taskID); + final IoTDBSchemaRegionSource source = sourceMap.get(taskID); metricService.createAutoGauge( Metric.UNTRANSFERRED_SCHEMA_COUNT.toString(), MetricLevel.IMPORTANT, - extractorMap.get(taskID), + sourceMap.get(taskID), IoTDBSchemaRegionSource::getUnTransferredEventCount, Tag.NAME.toString(), - extractor.getPipeName(), + source.getPipeName(), Tag.REGION.toString(), - String.valueOf(extractor.getRegionId()), + String.valueOf(source.getRegionId()), Tag.CREATION_TIME.toString(), - String.valueOf(extractor.getCreationTime())); + String.valueOf(source.getCreationTime())); } @Override public void unbindFrom(final AbstractMetricService metricService) { - ImmutableSet.copyOf(extractorMap.keySet()).forEach(this::deregister); - if (!extractorMap.isEmpty()) { + ImmutableSet.copyOf(sourceMap.keySet()).forEach(this::deregister); + if (!sourceMap.isEmpty()) { LOGGER.warn( "Failed to unbind from pipe schema region extractor metrics, extractor map not empty"); } @@ -85,7 +85,7 @@ private void removeMetrics(final String taskID) { } private void removeAutoGauge(final String taskID) { - final IoTDBSchemaRegionSource extractor = extractorMap.get(taskID); + final IoTDBSchemaRegionSource extractor = sourceMap.get(taskID); // pending event count metricService.remove( MetricType.AUTO_GAUGE, @@ -100,41 +100,41 @@ private void removeAutoGauge(final String taskID) { //////////////////////////// register & deregister (pipe integration) //////////////////////////// - public void register(final IoTDBSchemaRegionSource extractor) { - final String taskID = extractor.getTaskID(); - extractorMap.putIfAbsent(taskID, extractor); + public void register(final IoTDBSchemaRegionSource source) { + final String taskID = source.getTaskID(); + sourceMap.putIfAbsent(taskID, source); if (Objects.nonNull(metricService)) { createMetrics(taskID); } } public void deregister(final String taskID) { - if (!extractorMap.containsKey(taskID)) { + if (!sourceMap.containsKey(taskID)) { LOGGER.warn( - "Failed to deregister pipe schema region extractor metrics, IoTDBSchemaRegionExtractor({}) does not exist", + "Failed to deregister pipe schema region source metrics, IoTDBSchemaRegionSource({}) does not exist", taskID); return; } if (Objects.nonNull(metricService)) { removeMetrics(taskID); } - extractorMap.remove(taskID); + sourceMap.remove(taskID); } //////////////////////////// singleton //////////////////////////// - private static class PipeSchemaRegionExtractorMetricsHolder { + private static class PipeSchemaRegionSourceMetricsHolder { private static final PipeSchemaRegionSourceMetrics INSTANCE = new PipeSchemaRegionSourceMetrics(); - private PipeSchemaRegionExtractorMetricsHolder() { + private PipeSchemaRegionSourceMetricsHolder() { // Empty constructor } } public static PipeSchemaRegionSourceMetrics getInstance() { - return PipeSchemaRegionExtractorMetricsHolder.INSTANCE; + return PipeSchemaRegionSourceMetricsHolder.INSTANCE; } private PipeSchemaRegionSourceMetrics() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 904b335ec5874..867e2487a5087 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -953,7 +953,7 @@ protected TSStatus login() { } long userId = AuthorityChecker.getUserId(username).orElse(-1L); - Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId, password); + Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId, password, false); if (timeToExpire != null && timeToExpire <= System.currentTimeMillis()) { return RpcUtils.getStatus( TSStatusCode.ILLEGAL_PASSWORD.getStatusCode(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 9d476c542941b..07b88a98053b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -64,6 +64,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -83,6 +84,7 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_CLI_HOSTNAME; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_ID; @@ -91,6 +93,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_CLI_HOSTNAME; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_ID; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY; @@ -156,6 +159,8 @@ public void customize( SINK_IOTDB_USER_KEY, CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY); + String passwordString = + parameters.getStringByKeys(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY); String cliHostnameString = parameters.getStringByKeys(CONNECTOR_IOTDB_CLI_HOSTNAME, SINK_IOTDB_CLI_HOSTNAME); userEntity = new UserEntity(Long.parseLong(userIdString), usernameString, cliHostnameString); @@ -191,6 +196,24 @@ public void customize( if (SESSION_MANAGER.getCurrSession() == null) { SESSION_MANAGER.registerSession(session); } + + // Check the password and its expiration + if (Objects.nonNull(passwordString) + && SESSION_MANAGER + .login( + session, + usernameString, + passwordString, + ZoneId.systemDefault().toString(), + SessionManager.CURRENT_RPC_VERSION, + IoTDBConstant.ClientVersion.V_1_0, + IClientSession.SqlDialect.TREE, + environment.getRegionId() >= 0) + .getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipePasswordCheckException( + String.format("Failed to check password for pipe %s.", environment.getPipeName())); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index d1ceb48e8aad0..1c510be959b08 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.source.dataregion; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; @@ -39,6 +40,9 @@ import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionTsFileSource; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; @@ -50,11 +54,16 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + +import java.time.ZoneId; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -128,8 +137,8 @@ public class IoTDBDataRegionSource extends IoTDBSource { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSource.class); - private PipeHistoricalDataRegionSource historicalExtractor; - private PipeRealtimeDataRegionSource realtimeExtractor; + private PipeHistoricalDataRegionSource historicalSource; + private PipeRealtimeDataRegionSource realtimeSource; private DataRegionWatermarkInjector watermarkInjector; @@ -294,11 +303,11 @@ public void validate(final PipeParameterValidator validator) throws Exception { checkInvalidParameters(validator); - constructHistoricalExtractor(); - constructRealtimeExtractor(validator.getParameters()); + constructHistoricalSource(); + constructRealtimeSource(validator.getParameters()); - historicalExtractor.validate(validator); - realtimeExtractor.validate(validator); + historicalSource.validate(validator); + realtimeSource.validate(validator); } private void validatePattern(final TreePattern treePattern) { @@ -430,16 +439,16 @@ private void checkInvalidParameters(final PipeParameterValidator validator) { } } - private void constructHistoricalExtractor() { - historicalExtractor = new PipeHistoricalDataRegionTsFileAndDeletionSource(); + private void constructHistoricalSource() { + historicalSource = new PipeHistoricalDataRegionTsFileAndDeletionSource(); } - private void constructRealtimeExtractor(final PipeParameters parameters) { + private void constructRealtimeSource(final PipeParameters parameters) { // Use heartbeat only source if disable realtime source if (!parameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { - realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource(); + realtimeSource = new PipeRealtimeDataRegionHeartbeatSource(); LOGGER.info( "Pipe: '{}' ('{}') is set to false, use heartbeat realtime source.", EXTRACTOR_REALTIME_ENABLE_KEY, @@ -449,7 +458,7 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { // Use heartbeat only source if enable snapshot mode if (PipeTaskAgent.isSnapshotMode(parameters)) { - realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource(); + realtimeSource = new PipeRealtimeDataRegionHeartbeatSource(); LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime source."); return; } @@ -457,7 +466,7 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { // Use hybrid mode by default if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { - realtimeExtractor = new PipeRealtimeDataRegionHybridSource(); + realtimeSource = new PipeRealtimeDataRegionHybridSource(); LOGGER.info( "Pipe: '{}' ('{}') and '{}' ('{}') is not set, use hybrid mode by default.", EXTRACTOR_MODE_STREAMING_KEY, @@ -473,9 +482,9 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY), EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE); if (isStreamingMode) { - realtimeExtractor = new PipeRealtimeDataRegionHybridSource(); + realtimeSource = new PipeRealtimeDataRegionHybridSource(); } else { - realtimeExtractor = new PipeRealtimeDataRegionTsFileSource(); + realtimeSource = new PipeRealtimeDataRegionTsFileSource(); } return; } @@ -483,18 +492,18 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { switch (parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { case EXTRACTOR_REALTIME_MODE_FILE_VALUE: case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE: - realtimeExtractor = new PipeRealtimeDataRegionTsFileSource(); + realtimeSource = new PipeRealtimeDataRegionTsFileSource(); break; case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE: case EXTRACTOR_REALTIME_MODE_LOG_VALUE: case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE: - realtimeExtractor = new PipeRealtimeDataRegionHybridSource(); + realtimeSource = new PipeRealtimeDataRegionHybridSource(); break; case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE: - realtimeExtractor = new PipeRealtimeDataRegionLogSource(); + realtimeSource = new PipeRealtimeDataRegionLogSource(); break; default: - realtimeExtractor = new PipeRealtimeDataRegionHybridSource(); + realtimeSource = new PipeRealtimeDataRegionHybridSource(); if (LOGGER.isWarnEnabled()) { LOGGER.warn( "Pipe: Unsupported source realtime mode: {}, create a hybrid source.", @@ -513,8 +522,8 @@ public void customize( super.customize(parameters, configuration); - historicalExtractor.customize(parameters, configuration); - realtimeExtractor.customize(parameters, configuration); + historicalSource.customize(parameters, configuration); + realtimeSource.customize(parameters, configuration); // Set watermark injector long watermarkIntervalInMs = EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE; @@ -542,8 +551,29 @@ public void customize( // register metric after generating taskID PipeDataRegionSourceMetrics.getInstance().register(this); - PipeTsFileToTabletsMetrics.getInstance().register(this); - PipeDataNodeSinglePipeMetrics.getInstance().register(this); + if (regionId >= 0) { + PipeTsFileToTabletsMetrics.getInstance().register(this); + PipeDataNodeSinglePipeMetrics.getInstance().register(this); + } + } + + @Override + protected void login(final @Nonnull String password) { + if (SessionManager.getInstance() + .login( + new InternalClientSession("Source_login_session_" + regionId), + userName, + password, + ZoneId.systemDefault().toString(), + SessionManager.CURRENT_RPC_VERSION, + IoTDBConstant.ClientVersion.V_1_0, + IClientSession.SqlDialect.TREE, + regionId >= 0) + .getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipePasswordCheckException( + String.format("Failed to check password for pipe %s.", pipeName)); + } } @Override @@ -557,8 +587,8 @@ public void start() throws Exception { "Pipe {}@{}: Starting historical source {} and realtime source {}.", pipeName, regionId, - historicalExtractor.getClass().getSimpleName(), - realtimeExtractor.getClass().getSimpleName()); + historicalSource.getClass().getSimpleName(), + realtimeSource.getClass().getSimpleName()); super.start(); @@ -591,8 +621,8 @@ public void start() throws Exception { "Pipe {}@{}: Started historical source {} and realtime source {} successfully within {} ms.", pipeName, regionId, - historicalExtractor.getClass().getSimpleName(), - realtimeExtractor.getClass().getSimpleName(), + historicalSource.getClass().getSimpleName(), + realtimeSource.getClass().getSimpleName(), System.currentTimeMillis() - startTime); return; } @@ -603,21 +633,21 @@ public void start() throws Exception { private void startHistoricalExtractorAndRealtimeExtractor( final AtomicReference exceptionHolder) { try { - // Start realtimeExtractor first to avoid losing data. This may cause some + // Start realtimeSource first to avoid losing data. This may cause some // retransmission, yet it is OK according to the idempotency of IoTDB. // Note: The order of historical collection is flushing data -> adding all tsFile events. // There can still be writing when tsFile events are added. If we start - // realtimeExtractor after the process, then this part of data will be lost. - realtimeExtractor.start(); - historicalExtractor.start(); + // realtimeSource after the process, then this part of data will be lost. + realtimeSource.start(); + historicalSource.start(); } catch (final Exception e) { exceptionHolder.set(e); LOGGER.warn( "Pipe {}@{}: Start historical source {} and realtime source {} error.", pipeName, regionId, - historicalExtractor.getClass().getSimpleName(), - realtimeExtractor.getClass().getSimpleName(), + historicalSource.getClass().getSimpleName(), + realtimeSource.getClass().getSimpleName(), e); } } @@ -635,14 +665,14 @@ public Event supply() throws Exception { } Event event = null; - if (!historicalExtractor.hasConsumedAll()) { - event = historicalExtractor.supply(); + if (!historicalSource.hasConsumedAll()) { + event = historicalSource.supply(); } else { if (Objects.nonNull(watermarkInjector)) { event = watermarkInjector.inject(); } if (Objects.isNull(event)) { - event = realtimeExtractor.supply(); + event = realtimeSource.supply(); } } @@ -665,8 +695,8 @@ public void close() throws Exception { return; } - historicalExtractor.close(); - realtimeExtractor.close(); + historicalSource.close(); + realtimeSource.close(); if (Objects.nonNull(taskID)) { PipeDataRegionSourceMetrics.getInstance().deregister(taskID); } @@ -675,20 +705,20 @@ public void close() throws Exception { //////////////////////////// APIs provided for metric framework //////////////////////////// public int getHistoricalTsFileInsertionEventCount() { - return hasBeenStarted.get() && Objects.nonNull(historicalExtractor) - ? historicalExtractor.getPendingQueueSize() + return hasBeenStarted.get() && Objects.nonNull(historicalSource) + ? historicalSource.getPendingQueueSize() : 0; } public int getTabletInsertionEventCount() { - return hasBeenStarted.get() ? realtimeExtractor.getTabletInsertionEventCount() : 0; + return hasBeenStarted.get() ? realtimeSource.getTabletInsertionEventCount() : 0; } public int getRealtimeTsFileInsertionEventCount() { - return hasBeenStarted.get() ? realtimeExtractor.getTsFileInsertionEventCount() : 0; + return hasBeenStarted.get() ? realtimeSource.getTsFileInsertionEventCount() : 0; } public int getPipeHeartbeatEventCount() { - return hasBeenStarted.get() ? realtimeExtractor.getPipeHeartbeatEventCount() : 0; + return hasBeenStarted.get() ? realtimeSource.getPipeHeartbeatEventCount() : 0; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index ab74d09474020..75934029358d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -57,6 +57,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; @@ -305,17 +306,17 @@ public void customize( return; } - final PipeTaskSourceRuntimeEnvironment environment = - (PipeTaskSourceRuntimeEnvironment) configuration.getRuntimeEnvironment(); + final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment(); pipeName = environment.getPipeName(); creationTime = environment.getCreationTime(); - pipeTaskMeta = environment.getPipeTaskMeta(); - if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { - startIndex = - tryToExtractLocalProgressIndexForIoTV2(environment.getPipeTaskMeta().getProgressIndex()); - } else { - startIndex = environment.getPipeTaskMeta().getProgressIndex(); + if (environment instanceof PipeTaskSourceRuntimeEnvironment) { + pipeTaskMeta = ((PipeTaskSourceRuntimeEnvironment) environment).getPipeTaskMeta(); + if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + startIndex = tryToExtractLocalProgressIndexForIoTV2(pipeTaskMeta.getProgressIndex()); + } else { + startIndex = pipeTaskMeta.getProgressIndex(); + } } dataRegionId = environment.getRegionId(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index cd00e2975a0aa..971fb5bc84b02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -47,6 +47,7 @@ import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; @@ -205,8 +206,7 @@ public void validate(final PipeParameterValidator validator) throws Exception { public void customize( final PipeParameters parameters, final PipeExtractorRuntimeConfiguration configuration) throws Exception { - final PipeTaskSourceRuntimeEnvironment environment = - (PipeTaskSourceRuntimeEnvironment) configuration.getRuntimeEnvironment(); + final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment(); final Pair insertionDeletionListeningOptionPair = DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters); @@ -215,7 +215,9 @@ public void customize( pipeName = environment.getPipeName(); dataRegionId = String.valueOf(environment.getRegionId()); - pipeTaskMeta = environment.getPipeTaskMeta(); + if (environment instanceof PipeTaskSourceRuntimeEnvironment) { + pipeTaskMeta = ((PipeTaskSourceRuntimeEnvironment) environment).getPipeTaskMeta(); + } // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. These metrics are // indexed by the taskID of IoTDBDataRegionExtractor. To avoid PipeRealtimeDataRegionExtractor @@ -302,7 +304,7 @@ public void customize( if (LOGGER.isInfoEnabled()) { LOGGER.info( - "Pipe {}@{}: realtime data region extractor is initialized with parameters: {}.", + "Pipe {}@{}: realtime data region source is initialized with parameters: {}.", pipeName, dataRegionId, parameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 8ea973fbf0a3a..6ae2a12cf3428 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -58,12 +58,12 @@ public class PipeDataRegionAssigner implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataRegionAssigner.class); /** - * The {@link PipeDataRegionMatcher} is used to match the event with the extractor based on the + * The {@link PipeDataRegionMatcher} is used to match the event with the source based on the * pattern. */ private final PipeDataRegionMatcher matcher; - /** The {@link DisruptorQueue} is used to assign the event to the extractor. */ + /** The {@link DisruptorQueue} is used to assign the event to the source. */ private final DisruptorQueue disruptor; private final String dataRegionId; @@ -140,17 +140,15 @@ private void assignToExtractor( matchedAndUnmatched .getLeft() .forEach( - extractor -> { + source -> { if (disruptor.isClosed()) { return; } - if (event.getEvent().isGeneratedByPipe() && !extractor.isForwardingPipeRequests()) { + if (event.getEvent().isGeneratedByPipe() && !source.isForwardingPipeRequests()) { final ProgressReportEvent reportEvent = new ProgressReportEvent( - extractor.getPipeName(), - extractor.getCreationTime(), - extractor.getPipeTaskMeta()); + source.getPipeName(), source.getCreationTime(), source.getPipeTaskMeta()); reportEvent.bindProgressIndex(event.getProgressIndex()); if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { LOGGER.warn( @@ -158,33 +156,33 @@ private void assignToExtractor( reportEvent); return; } - extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); + source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); return; } final PipeRealtimeEvent copiedEvent = event.shallowCopySelfAndBindPipeTaskMetaForProgressReport( - extractor.getPipeName(), - extractor.getCreationTime(), - extractor.getPipeTaskMeta(), - extractor.getTreePattern(), - extractor.getTablePattern(), - String.valueOf(extractor.getUserId()), - extractor.getUserName(), - extractor.getCliHostname(), - extractor.isSkipIfNoPrivileges(), - extractor.getRealtimeDataExtractionStartTime(), - extractor.getRealtimeDataExtractionEndTime()); + source.getPipeName(), + source.getCreationTime(), + source.getPipeTaskMeta(), + source.getTreePattern(), + source.getTablePattern(), + String.valueOf(source.getUserId()), + source.getUserName(), + source.getCliHostname(), + source.isSkipIfNoPrivileges(), + source.getRealtimeDataExtractionStartTime(), + source.getRealtimeDataExtractionEndTime()); final EnrichedEvent innerEvent = copiedEvent.getEvent(); // if using IoTV2, assign a replicateIndex for this realtime event if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus && PipeConsensusProcessor.isShouldReplicate(innerEvent)) { innerEvent.setReplicateIndexForIoTV2( ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2( - extractor.getPipeName())); + source.getPipeName())); LOGGER.debug( "[{}]Set {} for realtime event {}", - extractor.getPipeName(), + source.getPipeName(), innerEvent.getReplicateIndexForIoTV2(), innerEvent); } @@ -192,15 +190,14 @@ private void assignToExtractor( if (innerEvent instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent tsFileInsertionEvent = (PipeTsFileInsertionEvent) innerEvent; - tsFileInsertionEvent.disableMod4NonTransferPipes( - extractor.isShouldTransferModFile()); + tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile()); } if (innerEvent instanceof PipeDeleteDataNodeEvent) { final PipeDeleteDataNodeEvent deleteDataNodeEvent = (PipeDeleteDataNodeEvent) innerEvent; final DeletionResourceManager manager = - DeletionResourceManager.getInstance(extractor.getDataRegionId()); + DeletionResourceManager.getInstance(source.getDataRegionId()); // increase deletion resource's reference and bind real deleteEvent if (Objects.nonNull(manager) && DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2( @@ -217,13 +214,13 @@ private void assignToExtractor( copiedEvent); return; } - extractor.extract(copiedEvent); + source.extract(copiedEvent); }); matchedAndUnmatched .getRight() .forEach( - extractor -> { + source -> { if (disruptor.isClosed()) { return; } @@ -233,9 +230,7 @@ private void assignToExtractor( || innerEvent instanceof TsFileInsertionEvent) { final ProgressReportEvent reportEvent = new ProgressReportEvent( - extractor.getPipeName(), - extractor.getCreationTime(), - extractor.getPipeTaskMeta()); + source.getPipeName(), source.getCreationTime(), source.getPipeTaskMeta()); reportEvent.bindProgressIndex(event.getProgressIndex()); if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { LOGGER.warn( @@ -243,17 +238,17 @@ private void assignToExtractor( reportEvent); return; } - extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); + source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); } }); } - public void startAssignTo(final PipeRealtimeDataRegionSource extractor) { - matcher.register(extractor); + public void startAssignTo(final PipeRealtimeDataRegionSource source) { + matcher.register(source); } - public void stopAssignTo(final PipeRealtimeDataRegionSource extractor) { - matcher.deregister(extractor); + public void stopAssignTo(final PipeRealtimeDataRegionSource source) { + matcher.deregister(source); } public void invalidateCache() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java index b70ff4b7d8d0a..f743e1dc7ccdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java @@ -42,6 +42,9 @@ import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionSourceMetrics; import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -58,12 +61,16 @@ import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.common.constant.TsFileConstant; +import javax.annotation.Nonnull; + import java.io.IOException; import java.nio.file.Paths; +import java.time.ZoneId; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -107,7 +114,7 @@ public void customize( .getSchemaRegionConsensusProtocolClass() .equals(ConsensusFactory.SIMPLE_CONSENSUS)) { throw new PipeException( - "IoTDBSchemaRegionExtractor does not transferring events under simple consensus"); + "IoTDBSchemaRegionSource does not support transferring events under simple consensus"); } super.customize(parameters, configuration); @@ -117,7 +124,9 @@ public void customize( treePrivilegeParseVisitor = new PipePlanTreePrivilegeParseVisitor(skipIfNoPrivileges); PipeSchemaRegionSourceMetrics.getInstance().register(this); - PipeDataNodeSinglePipeMetrics.getInstance().register(this); + if (regionId >= 0) { + PipeDataNodeSinglePipeMetrics.getInstance().register(this); + } } @Override @@ -146,6 +155,25 @@ public void start() throws Exception { super.start(); } + @Override + protected void login(final @Nonnull String password) { + if (SessionManager.getInstance() + .login( + new InternalClientSession("Source_login_session_" + regionId), + userName, + password, + ZoneId.systemDefault().toString(), + SessionManager.CURRENT_RPC_VERSION, + IoTDBConstant.ClientVersion.V_1_0, + IClientSession.SqlDialect.TREE, + regionId >= 0) + .getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipePasswordCheckException( + String.format("Failed to check password for pipe %s.", pipeName)); + } + } + @Override protected boolean needTransferSnapshot() { // Note: the schema region will transfer snapshot if there are table or tree planNode captured. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java index ad37f27ef95ce..b0501d00ae63b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java @@ -121,7 +121,12 @@ public static boolean shouldSchemaRegionBeListened( .getDatabaseFullPath()); return (TreePattern.isTreeModelDataAllowToBeCaptured(parameters) && !isTableModel || TablePattern.isTableModelDataAllowToBeCaptured(parameters) && isTableModel) - && !parseListeningPlanTypeSet(parameters).isEmpty(); + && shouldSchemaRegionBeListened(parameters); + } + + public static boolean shouldSchemaRegionBeListened(final PipeParameters parameters) + throws IllegalPathException { + return !parseListeningPlanTypeSet(parameters).isEmpty(); } public static Set parseListeningPlanTypeSet(final PipeParameters parameters) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java index 975cbb09437bf..d6cdfa63a323c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java @@ -42,7 +42,7 @@ public InternalClientSession(String clientID) { @Override public String getClientAddress() { - return clientID; + return ""; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java index ee00655211fe2..ac7cc236d9f6d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java @@ -130,11 +130,26 @@ public BasicOpenSessionResp login( TSProtocolVersion tsProtocolVersion, IoTDBConstant.ClientVersion clientVersion, IClientSession.SqlDialect sqlDialect) { - BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp(); - - long userId = AuthorityChecker.getUserId(username).orElse(-1L); + return login( + session, username, password, zoneId, tsProtocolVersion, clientVersion, sqlDialect, false); + } - Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId, password); + // Only pipe can set useEncryptedPassword to true + public BasicOpenSessionResp login( + final IClientSession session, + final String username, + final String password, + final String zoneId, + final TSProtocolVersion tsProtocolVersion, + final IoTDBConstant.ClientVersion clientVersion, + final IClientSession.SqlDialect sqlDialect, + final boolean useEncryptedPassword) { + final BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp(); + + final long userId = AuthorityChecker.getUserId(username).orElse(-1L); + + Long timeToExpire = + DataNodeAuthUtils.checkPasswordExpiration(userId, password, useEncryptedPassword); if (timeToExpire != null && timeToExpire <= System.currentTimeMillis()) { openSessionResp .sessionId(-1) @@ -154,7 +169,8 @@ public BasicOpenSessionResp login( return openSessionResp; } - TSStatus loginStatus = AuthorityChecker.checkUser(username, password); + final TSStatus loginStatus = + AuthorityChecker.checkUser(username, password, useEncryptedPassword); if (loginStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // check the version compatibility if (!tsProtocolVersion.equals(CURRENT_RPC_VERSION)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 60deee795093b..1ea10ef37e840 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -683,16 +683,15 @@ public IConfigTask visitAlterPipe( alterPipeStatement.setUserName(userName); final String pipeName = alterPipeStatement.getPipeName(); - final Map extractorAttributes = alterPipeStatement.getSourceAttributes(); + final Map sourceAttributes = alterPipeStatement.getSourceAttributes(); // If the source is replaced, sql-dialect uses the current Alter Pipe sql-dialect. If it is // modified, the original sql-dialect is used. if (alterPipeStatement.isReplaceAllSourceAttributes()) { - extractorAttributes.put( - SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE); + sourceAttributes.put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE); checkAndEnrichSourceUser( pipeName, - extractorAttributes, + sourceAttributes, new UserEntity(context.getUserId(), context.getUsername(), context.getCliHostname()), true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index cb45757d3be0b..247e98bcd9fc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -328,6 +328,7 @@ import org.apache.iotdb.db.trigger.service.TriggerClassLoader; import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; @@ -372,6 +373,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -994,7 +996,7 @@ public SettableFuture createPipePlugin( future.setException( new IoTDBException( String.format("Failed to create pipe plugin %s. " + pathError, pluginName), - TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())); + TSStatusCode.SEMANTIC_ERROR.getStatusCode())); return future; } @@ -2175,7 +2177,7 @@ public SettableFuture createPipe( future.setException( new IoTDBException( String.format("Failed to create pipe %s, " + pathError, pipeName), - TSStatusCode.PIPE_ERROR.getStatusCode())); + TSStatusCode.SEMANTIC_ERROR.getStatusCode())); return future; } @@ -2189,7 +2191,11 @@ public SettableFuture createPipe( createPipeStatement.getSinkAttributes()); } catch (final Exception e) { future.setException( - new IoTDBException(e.getMessage(), TSStatusCode.PIPE_ERROR.getStatusCode())); + new IoTDBException( + e.getMessage(), + e instanceof PipePasswordCheckException + ? TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode() + : TSStatusCode.PIPE_ERROR.getStatusCode())); return future; } @@ -2353,6 +2359,8 @@ public SettableFuture alterPipe(final AlterPipeStatement alter return future; } + boolean hasSourcePassword = false; + boolean hasSinkPassword = false; // Construct temporary pipe static meta for validation final String pipeName = alterPipeStatement.getPipeName(); final Map sourceAttributes; @@ -2368,6 +2376,7 @@ public SettableFuture alterPipe(final AlterPipeStatement alter pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(), new PipeParameters(alterPipeStatement.getSourceAttributes())); } + hasSourcePassword = containsPassword(alterPipeStatement.getSourceAttributes()); if (alterPipeStatement.isReplaceAllSourceAttributes()) { sourceAttributes = alterPipeStatement.getSourceAttributes(); } else { @@ -2407,6 +2416,7 @@ public SettableFuture alterPipe(final AlterPipeStatement alter } if (!alterPipeStatement.getSinkAttributes().isEmpty()) { + hasSinkPassword = containsPassword(alterPipeStatement.getSinkAttributes()); if (alterPipeStatement.isReplaceAllSinkAttributes()) { sinkAttributes = alterPipeStatement.getSinkAttributes(); } else { @@ -2426,11 +2436,29 @@ public SettableFuture alterPipe(final AlterPipeStatement alter sinkAttributes = pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute(); } + final Map checkedSource = new HashMap<>(sourceAttributes); + if (!hasSourcePassword) { + checkedSource.remove(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY); + checkedSource.remove(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + checkedSource.remove( + PipeParameters.KeyReducer.reduce(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + } + final Map checkedSink = new HashMap<>(sinkAttributes); + if (!hasSinkPassword) { + checkedSink.remove(PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY); + checkedSink.remove(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); + checkedSink.remove( + PipeParameters.KeyReducer.reduce(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)); + } PipeDataNodeAgent.plugin() - .validate(pipeName, sourceAttributes, processorAttributes, sinkAttributes); + .validate(pipeName, checkedSource, processorAttributes, checkedSink); } catch (final Exception e) { future.setException( - new IoTDBException(e.getMessage(), TSStatusCode.PIPE_ERROR.getStatusCode())); + new IoTDBException( + e.getMessage(), + e instanceof PipePasswordCheckException + ? TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode() + : TSStatusCode.PIPE_ERROR.getStatusCode())); return future; } @@ -2500,16 +2528,20 @@ private static void checkSourceType( } } - private static boolean onlyContainsUser( - final Map extractorOrConnectorAttributes) { - final PipeParameters extractorOrConnectorParameters = - new PipeParameters(extractorOrConnectorAttributes); - return extractorOrConnectorParameters.hasAnyAttributes( + private static boolean containsPassword(final Map sourceOrSinkAttributes) { + final PipeParameters sourceOrSinkParameters = new PipeParameters(sourceOrSinkAttributes); + return sourceOrSinkParameters.hasAnyAttributes( + PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY, PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); + } + + private static boolean onlyContainsUser(final Map sourceOrSinkAttributes) { + final PipeParameters sourceOrSinkParameters = new PipeParameters(sourceOrSinkAttributes); + return sourceOrSinkParameters.hasAnyAttributes( PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY, PipeSinkConstant.SINK_IOTDB_USER_KEY, PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY, PipeSinkConstant.SINK_IOTDB_USERNAME_KEY) - && !extractorOrConnectorParameters.hasAnyAttributes( + && !sourceOrSinkParameters.hasAnyAttributes( PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY, PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java index aecadff7f81af..9b5857d263595 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java @@ -75,25 +75,25 @@ private void applyNowFunctionToExtractorAttributes(final Map att CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); // support now() function - PipeFunctionSupport.applyNowFunctionToExtractorAttributes( + PipeFunctionSupport.applyNowFunction2SourceAttributes( attributes, PipeSourceConstant.SOURCE_START_TIME_KEY, PipeSourceConstant.EXTRACTOR_START_TIME_KEY, currentTime); - PipeFunctionSupport.applyNowFunctionToExtractorAttributes( + PipeFunctionSupport.applyNowFunction2SourceAttributes( attributes, PipeSourceConstant.SOURCE_END_TIME_KEY, PipeSourceConstant.EXTRACTOR_END_TIME_KEY, currentTime); - PipeFunctionSupport.applyNowFunctionToExtractorAttributes( + PipeFunctionSupport.applyNowFunction2SourceAttributes( attributes, PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY, PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY, currentTime); - PipeFunctionSupport.applyNowFunctionToExtractorAttributes( + PipeFunctionSupport.applyNowFunction2SourceAttributes( attributes, PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY, PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java index a5fbd36f88d4f..fc3884cb27e21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java @@ -37,19 +37,19 @@ public class CreatePipeTask implements IConfigTask { private final CreatePipeStatement createPipeStatement; - public CreatePipeTask(CreatePipeStatement createPipeStatement) { + public CreatePipeTask(final CreatePipeStatement createPipeStatement) { // support now() function - applyNowFunctionToExtractorAttributes(createPipeStatement.getSourceAttributes()); + applyNowFunction2SourceAttributes(createPipeStatement.getSourceAttributes()); this.createPipeStatement = createPipeStatement; } - public CreatePipeTask(CreatePipe createPipe) { + public CreatePipeTask(final CreatePipe createPipe) { createPipeStatement = new CreatePipeStatement(StatementType.CREATE_PIPE); createPipeStatement.setPipeName(createPipe.getPipeName()); createPipeStatement.setIfNotExists(createPipe.hasIfNotExistsCondition()); // support now() function - applyNowFunctionToExtractorAttributes(createPipe.getSourceAttributes()); + applyNowFunction2SourceAttributes(createPipe.getSourceAttributes()); createPipeStatement.setSourceAttributes(createPipe.getSourceAttributes()); createPipeStatement.setProcessorAttributes(createPipe.getProcessorAttributes()); @@ -57,37 +57,37 @@ public CreatePipeTask(CreatePipe createPipe) { } @Override - public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) + public ListenableFuture execute(final IConfigTaskExecutor configTaskExecutor) throws InterruptedException { return configTaskExecutor.createPipe(createPipeStatement); } - private void applyNowFunctionToExtractorAttributes(final Map attributes) { + private void applyNowFunction2SourceAttributes(final Map attributes) { final long currentTime = CommonDateTimeUtils.convertMilliTimeWithPrecision( System.currentTimeMillis(), CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); // support now() function - PipeFunctionSupport.applyNowFunctionToExtractorAttributes( + PipeFunctionSupport.applyNowFunction2SourceAttributes( attributes, PipeSourceConstant.SOURCE_START_TIME_KEY, PipeSourceConstant.EXTRACTOR_START_TIME_KEY, currentTime); - PipeFunctionSupport.applyNowFunctionToExtractorAttributes( + PipeFunctionSupport.applyNowFunction2SourceAttributes( attributes, PipeSourceConstant.SOURCE_END_TIME_KEY, PipeSourceConstant.EXTRACTOR_END_TIME_KEY, currentTime); - PipeFunctionSupport.applyNowFunctionToExtractorAttributes( + PipeFunctionSupport.applyNowFunction2SourceAttributes( attributes, PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY, PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY, currentTime); - PipeFunctionSupport.applyNowFunctionToExtractorAttributes( + PipeFunctionSupport.applyNowFunction2SourceAttributes( attributes, PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY, PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java index 0edc18c3f0018..e9fb09e875032 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java @@ -31,7 +31,7 @@ public class PipeFunctionSupport { private static final Logger LOGGER = LoggerFactory.getLogger(PipeFunctionSupport.class); - public static void applyNowFunctionToExtractorAttributes( + public static void applyNowFunction2SourceAttributes( final Map extractorAttributes, final String sourceKey, final String extractorKey, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java index 187575c58da7b..6ee378282adb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java @@ -275,7 +275,8 @@ public static TSStatus deletePasswordHistory(long userId) { * @return the timestamp when the password will expire. Long.MAX if the password never expires. * Null if the password history cannot be found. */ - public static Long checkPasswordExpiration(long userId, String password) { + public static Long checkPasswordExpiration( + final long userId, final String password, final boolean useEncryptedPassword) { if (userId == -1) { return null; } @@ -335,7 +336,8 @@ public static Long checkPasswordExpiration(long userId, String password) { CommonDateTimeUtils.convertIoTDBTimeToMillis(tsBlock.getTimeByIndex(0)); // columns of last query: [timeseriesName, value, dataType] String oldPassword = tsBlock.getColumn(1).getBinary(0).toString(); - if (oldPassword.equals(AuthUtils.encryptPassword(password))) { + if (oldPassword.equals( + useEncryptedPassword ? password : AuthUtils.encryptPassword(password))) { if (lastPasswordTime + passwordExpirationDays * 1000 * 86400 <= lastPasswordTime) { // overflow or passwordExpirationDays <= 0 return Long.MAX_VALUE; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java index 9d0acaf14d898..194f33e8d67b3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java @@ -64,8 +64,8 @@ public void tearDown() throws Exception { @Test public void testLogin() throws AuthException { - Assert.assertTrue(authorizer.login("root", "root")); - Assert.assertThrows(AuthException.class, () -> authorizer.login("root", "error")); + Assert.assertTrue(authorizer.login("root", "root", false)); + Assert.assertThrows(AuthException.class, () -> authorizer.login("root", "error", false)); } @Test @@ -76,7 +76,7 @@ public void createAndDeleteUser() throws AuthException { } catch (AuthException e) { assertEquals("User user already exists", e.getMessage()); } - Assert.assertTrue(authorizer.login(userName, password)); + Assert.assertTrue(authorizer.login(userName, password, false)); authorizer.deleteUser(userName); try { authorizer.deleteUser(userName); @@ -230,7 +230,7 @@ public void testUserRole() throws AuthException { public void testUpdatePassword() throws AuthException { authorizer.createUser(userName, password); authorizer.updateUserPassword(userName, "newPassword123456"); - Assert.assertTrue(authorizer.login(userName, "newPassword123456")); + Assert.assertTrue(authorizer.login(userName, "newPassword123456", false)); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java index bd0c2d399e564..196cc80e5b6da 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java @@ -60,7 +60,7 @@ public void loginWithJWT() throws AuthException, ParseException { "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q3LWE5MmQ1MzMzOTMzMCIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoiZW1haWwgcHJvZmlsZSIsImNsaWVudEhvc3QiOiIxOTIuMTY4LjE2OS4yMSIsImNsaWVudElkIjoiaW90ZGIiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsInByZWZlcnJlZF91c2VybmFtZSI6InNlcnZpY2UtYWNjb3VudC1pb3RkYiIsImNsaWVudEFkZHJlc3MiOiIxOTIuMTY4LjE2OS4yMSJ9.GxQFltm1PrZzVL7rR6K-GpQINFLymjqAxxoDt_DGfQEMt61M6ebmx2oHiP_3G0HDSl7sbamajQbbRrfyTg--emBC2wfhdZ7v_7O0qWC60Yd8cWZ9qxwqwTFKYb8a0Z6_TeH9-vUmsy6kp2BfJZXq3mSy0My21VGUAXRmWTbghiM4RFoHKjAZVhsPHWelFmtLftYPdOGxv-7c9iUOVh_W-nOcCNRJpYY7BEjUYN24TsjvCEwWDQWD9E29LMYfA6LNeG0KdL9Jvqad4bc2FTJn9TaCnJMCiAJ7wEEiotqhXn70uEBWYxGXIVlm3vn3MDe3pTKA2TZy7U5xcrE7S8aGMg"; OpenIdAuthorizer authorizer = new OpenIdAuthorizer(JSONObjectUtils.parse(OPEN_ID_PUBLIC_JWK)); - boolean login = authorizer.login(jwt, null); + boolean login = authorizer.login(jwt, null, false); assertTrue(login); } @@ -99,14 +99,16 @@ public void fetchMetadata() boolean login = openIdAuthorizer.login( "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q3LWE5MmQ1MzMzOTMzMCIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoiZW1haWwgcHJvZmlsZSIsImNsaWVudEhvc3QiOiIxOTIuMTY4LjE2OS4yMSIsImNsaWVudElkIjoiaW90ZGIiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsInByZWZlcnJlZF91c2VybmFtZSI6InNlcnZpY2UtYWNjb3VudC1pb3RkYiIsImNsaWVudEFkZHJlc3MiOiIxOTIuMTY4LjE2OS4yMSJ9.GxQFltm1PrZzVL7rR6K-GpQINFLymjqAxxoDt_DGfQEMt61M6ebmx2oHiP_3G0HDSl7sbamajQbbRrfyTg--emBC2wfhdZ7v_7O0qWC60Yd8cWZ9qxwqwTFKYb8a0Z6_TeH9-vUmsy6kp2BfJZXq3mSy0My21VGUAXRmWTbghiM4RFoHKjAZVhsPHWelFmtLftYPdOGxv-7c9iUOVh_W-nOcCNRJpYY7BEjUYN24TsjvCEwWDQWD9E29LMYfA6LNeG0KdL9Jvqad4bc2FTJn9TaCnJMCiAJ7wEEiotqhXn70uEBWYxGXIVlm3vn3MDe3pTKA2TZy7U5xcrE7S8aGMg", - ""); + "", + false); assertTrue(login); config.setOpenIdProviderUrl("https://auth.demo.pragmaticindustries.de/auth/realms/IoTDB/"); OpenIdAuthorizer openIdAuthorizer1 = new OpenIdAuthorizer(); login = openIdAuthorizer1.login( "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q3LWE5MmQ1MzMzOTMzMCIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoiZW1haWwgcHJvZmlsZSIsImNsaWVudEhvc3QiOiIxOTIuMTY4LjE2OS4yMSIsImNsaWVudElkIjoiaW90ZGIiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsInByZWZlcnJlZF91c2VybmFtZSI6InNlcnZpY2UtYWNjb3VudC1pb3RkYiIsImNsaWVudEFkZHJlc3MiOiIxOTIuMTY4LjE2OS4yMSJ9.GxQFltm1PrZzVL7rR6K-GpQINFLymjqAxxoDt_DGfQEMt61M6ebmx2oHiP_3G0HDSl7sbamajQbbRrfyTg--emBC2wfhdZ7v_7O0qWC60Yd8cWZ9qxwqwTFKYb8a0Z6_TeH9-vUmsy6kp2BfJZXq3mSy0My21VGUAXRmWTbghiM4RFoHKjAZVhsPHWelFmtLftYPdOGxv-7c9iUOVh_W-nOcCNRJpYY7BEjUYN24TsjvCEwWDQWD9E29LMYfA6LNeG0KdL9Jvqad4bc2FTJn9TaCnJMCiAJ7wEEiotqhXn70uEBWYxGXIVlm3vn3MDe3pTKA2TZy7U5xcrE7S8aGMg", - ""); + "", + false); assertTrue(login); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java index 98f70dc0ee609..6ba8c5336b193 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java @@ -107,12 +107,17 @@ private void checkAdmin(long userId, String errmsg) throws AuthException { } @Override - public boolean login(String username, String password) throws AuthException { + public boolean login( + final String username, final String password, final boolean useEncryptedPassword) + throws AuthException { User user = userManager.getEntity(username); if (user == null || password == null) { throw new AuthException( TSStatusCode.USER_NOT_EXIST, String.format("The user %s does not exist.", username)); } + if (useEncryptedPassword) { + return password.equals(user.getPassword()); + } if (AuthUtils.validatePassword( password, user.getPassword(), AsymmetricEncrypt.DigestAlgorithm.SHA_256)) { return true; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java index 445b29c0790dc..3ac33dbeddd41 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java @@ -42,7 +42,8 @@ public interface IAuthorizer extends SnapshotProcessor { * @param password The password of the user. * @return True if such user exists and the given password is correct, else return false. */ - boolean login(String username, String password) throws AuthException; + boolean login(String username, String password, final boolean useEncryptedPassword) + throws AuthException; /** * Login for a user in pipe. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java index 2da1acfaee97c..ee66ee5bced95 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java @@ -145,7 +145,8 @@ private static OIDCProviderMetadata fetchMetadata(String providerUrl) } @Override - public boolean login(String token, String password) throws AuthException { + public boolean login(String token, String password, final boolean useEncryptedPassword) + throws AuthException { if (password != null && !password.isEmpty()) { logger.error( "JWT Login failed as a non-empty Password was given username (token): {}", token); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java index 519d263449b37..45a1acbfed33a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java @@ -87,28 +87,31 @@ public void validate( final Map processorAttributes, final Map sinkAttributes) throws Exception { - validateSource(sourceAttributes); + validateSource(pipeName, sourceAttributes); validateProcessor(processorAttributes); validateSink(pipeName, sinkAttributes); } - protected PipeExtractor validateSource(final Map sourceAttributes) - throws Exception { + protected PipeExtractor validateSource( + final String pipeName, final Map sourceAttributes) throws Exception { final PipeParameters sourceParameters = new PipeParameters(sourceAttributes); - final PipeExtractor temporaryExtractor = reflectSource(sourceParameters); + final PipeExtractor temporarySource = reflectSource(sourceParameters); try { - temporaryExtractor.validate(new PipeParameterValidator(sourceParameters)); + temporarySource.validate(new PipeParameterValidator(sourceParameters)); + temporarySource.customize( + sourceParameters, + new PipeTaskRuntimeConfiguration(new PipeTaskTemporaryRuntimeEnvironment(pipeName))); } finally { try { - temporaryExtractor.close(); - } catch (Exception e) { + temporarySource.close(); + } catch (final Exception e) { LOGGER.warn("Failed to close temporary source: {}", e.getMessage(), e); } } - return temporaryExtractor; + return temporarySource; } - protected PipeProcessor validateProcessor(Map processorAttributes) + protected PipeProcessor validateProcessor(final Map processorAttributes) throws Exception { final PipeParameters processorParameters = new PipeParameters(processorAttributes); final PipeProcessor temporaryProcessor = reflectProcessor(processorParameters); @@ -117,15 +120,15 @@ protected PipeProcessor validateProcessor(Map processorAttribute } finally { try { temporaryProcessor.close(); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn("Failed to close temporary processor: {}", e.getMessage(), e); } } return temporaryProcessor; } - protected PipeConnector validateSink(String pipeName, Map sinkAttributes) - throws Exception { + protected PipeConnector validateSink( + final String pipeName, final Map sinkAttributes) throws Exception { final PipeParameters sinkParameters = new PipeParameters(sinkAttributes); final PipeConnector temporarySink = reflectSink(sinkParameters); try { @@ -137,7 +140,7 @@ protected PipeConnector validateSink(String pipeName, Map sinkAt } finally { try { temporarySink.close(); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(), e); } } @@ -154,7 +157,7 @@ protected PipeConnector validateSink(String pipeName, Map sinkAt * @throws PipeException if any exception occurs */ public final List getSubProcessorNamesWithSpecifiedParent( - Class parentClass) throws PipeException { + final Class parentClass) throws PipeException { return StreamSupport.stream(pipePluginMetaKeeper.getAllPipePluginMeta().spliterator(), false) .map(pipePluginMeta -> pipePluginMeta.getPluginName().toLowerCase()) .filter( @@ -162,7 +165,7 @@ public final List getSubProcessorNamesWithSpecifiedParent( try (PipeProcessor processor = (PipeProcessor) pipeProcessorConstructor.reflectPluginByKey(pluginName)) { return processor.getClass().getSuperclass() == parentClass; - } catch (Exception e) { + } catch (final Exception e) { return false; } }) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java index f16c6387cf570..e0868156ca9d8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java @@ -98,7 +98,7 @@ protected PipeSubtaskExecutor( public final synchronized void register(final PipeSubtask subtask) { if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) { - LOGGER.warn("The subtask {} is already registered.", subtask.getTaskID()); + LOGGER.warn("The subtask {} is already registered.", getSafeSubtaskStr(subtask.getTaskID())); return; } @@ -107,32 +107,36 @@ public final synchronized void register(final PipeSubtask subtask) { subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor, schedulerSupplier(this)); } + private static String getSafeSubtaskStr(final String subtaskID) { + return subtaskID.replaceAll("password=[^,}]*", "password=******"); + } + protected PipeSubtaskScheduler schedulerSupplier(final PipeSubtaskExecutor executor) { return new PipeSubtaskScheduler(executor); } public final synchronized void start(final String subTaskID) { if (!registeredIdSubtaskMapper.containsKey(subTaskID)) { - LOGGER.warn("The subtask {} is not registered.", subTaskID); + LOGGER.warn("The subtask {} is not registered.", getSafeSubtaskStr(subTaskID)); return; } final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID); if (subtask.isSubmittingSelf()) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("The subtask {} is already running.", subTaskID); + LOGGER.debug("The subtask {} is already running.", getSafeSubtaskStr(subTaskID)); } } else { subtask.allowSubmittingSelf(); subtask.submitSelf(); ++runningSubtaskNumber; - LOGGER.info("The subtask {} is started to submit self.", subTaskID); + LOGGER.info("The subtask {} is started to submit self.", getSafeSubtaskStr(subTaskID)); } } public final synchronized void stop(final String subTaskID) { if (!registeredIdSubtaskMapper.containsKey(subTaskID)) { - LOGGER.warn("The subtask {} is not registered.", subTaskID); + LOGGER.warn("The subtask {} is not registered.", getSafeSubtaskStr(subTaskID)); return; } @@ -149,9 +153,9 @@ public final synchronized void deregister(final String subTaskID) { if (subtask != null) { try { subtask.close(); - LOGGER.info("The subtask {} is closed successfully.", subTaskID); + LOGGER.info("The subtask {} is closed successfully.", getSafeSubtaskStr(subTaskID)); } catch (final Exception e) { - LOGGER.error("Failed to close the subtask {}.", subTaskID, e); + LOGGER.error("Failed to close the subtask {}.", getSafeSubtaskStr(subTaskID), e); } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java index 095bd24307362..04cf7f27bd203 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java @@ -291,6 +291,9 @@ public void close() throws Exception { //////////////////////////// APIs provided for metric framework //////////////////////////// public long getUnTransferredEventCount() { + if (Objects.isNull(pipeTaskMeta)) { + return 0L; + } return !(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) ? getListeningQueue().getTailIndex() - ((MetaProgressIndex) pipeTaskMeta.getProgressIndex()).getIndex() diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java index 91a149c9f6690..33acc06123e76 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java @@ -30,20 +30,25 @@ import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import javax.annotation.Nonnull; + import java.util.Arrays; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_SKIP_IF_NO_PRIVILEGES; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_SKIP_IF_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_SKIP_IF_KEY; @@ -154,13 +159,14 @@ private void validateDoubleLiving(final PipeParameters parameters) { public void customize( final PipeParameters parameters, final PipeExtractorRuntimeConfiguration configuration) throws Exception { - final PipeTaskSourceRuntimeEnvironment environment = - ((PipeTaskSourceRuntimeEnvironment) configuration.getRuntimeEnvironment()); + final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment(); regionId = environment.getRegionId(); pipeName = environment.getPipeName(); creationTime = environment.getCreationTime(); taskID = pipeName + "_" + regionId + "_" + creationTime; - pipeTaskMeta = environment.getPipeTaskMeta(); + if (environment instanceof PipeTaskSourceRuntimeEnvironment) { + pipeTaskMeta = ((PipeTaskSourceRuntimeEnvironment) environment).getPipeTaskMeta(); + } final boolean isDoubleLiving = parameters.getBooleanOrDefault( @@ -199,8 +205,15 @@ public void customize( userEntity.setAuditLogOperation(AuditLogOperation.QUERY); skipIfNoPrivileges = getSkipIfNoPrivileges(parameters); + final String password = + parameters.getStringByKeys(EXTRACTOR_IOTDB_PASSWORD_KEY, SOURCE_IOTDB_PASSWORD_KEY); + if (Objects.nonNull(password)) { + login(password); + } } + protected abstract void login(final @Nonnull String password); + public static boolean getSkipIfNoPrivileges(final PipeParameters extractorParameters) { final String extractorSkipIfValue = extractorParameters diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index b6569d943fbd8..92312ee81a307 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -445,6 +445,7 @@ struct TAuthizedPatternTreeResp { struct TLoginReq { 1: required string userrname 2: required string password + 3: optional bool useEncryptedPassword } // reqtype : tree, relational, system