Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2c8e506

Browse filesBrowse files
deardengYour Name
authored and
Your Name
committed
[fix](cloud) Fix cloud decomission and check wal (#47187)
1 parent 25d52ab commit 2c8e506
Copy full SHA for 2c8e506

File tree

6 files changed

+395
-46
lines changed
Filter options

6 files changed

+395
-46
lines changed

‎fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+14-1Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,22 @@ private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB> expec
183183
} catch (UserException e) {
184184
LOG.warn("failed to register water shed txn id, decommission be {}", be.getId(), e);
185185
}
186-
be.setDecommissioned(true);
186+
be.setDecommissioning(true);
187187
}
188188
}
189+
190+
if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED) {
191+
// When the synchronization status of the node is "NODE_STATUS_DECOMMISSIONED",
192+
// it indicates that the conditions for decommissioning have
193+
// already been checked in CloudTabletRebalancer.java,
194+
// such as the tablets having been successfully migrated and no remnants of WAL on the backend (BE).
195+
if (!be.isDecommissioned()) {
196+
LOG.warn("impossible status, somewhere has bug, backend: {} status: {}", be, status);
197+
}
198+
be.setDecommissioned(true);
199+
// edit log
200+
Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
201+
}
189202
}
190203
}
191204

‎fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+48-41Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -483,46 +483,53 @@ public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
483483
LOG.info("backend {} not found", beId);
484484
continue;
485485
}
486-
if ((backend.isDecommissioned() && tabletNum == 0 && !backend.isActive())
487-
|| (backend.isDecommissioned() && beList.size() == 1)) {
488-
LOG.info("check decommission be {} state {} tabletNum {} isActive {} beList {}",
489-
backend.getId(), backend.isDecommissioned(), tabletNum, backend.isActive(), beList);
490-
if (!beToDecommissionedTime.containsKey(beId)) {
491-
LOG.info("prepare to notify meta service be {} decommissioned", backend.getId());
492-
Cloud.AlterClusterRequest.Builder builder =
493-
Cloud.AlterClusterRequest.newBuilder();
494-
builder.setCloudUniqueId(Config.cloud_unique_id);
495-
builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);
496-
497-
Cloud.ClusterPB.Builder clusterBuilder =
498-
Cloud.ClusterPB.newBuilder();
499-
clusterBuilder.setClusterName(backend.getCloudClusterName());
500-
clusterBuilder.setClusterId(backend.getCloudClusterId());
501-
clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);
502-
503-
Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
504-
nodeBuilder.setIp(backend.getHost());
505-
nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
506-
nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
507-
nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);
508-
509-
clusterBuilder.addNodes(nodeBuilder);
510-
builder.setCluster(clusterBuilder);
511-
512-
Cloud.AlterClusterResponse response;
513-
try {
514-
response = MetaServiceProxy.getInstance().alterCluster(builder.build());
515-
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
516-
LOG.warn("notify decommission response: {}", response);
517-
}
518-
LOG.info("notify decommission response: {} ", response);
519-
} catch (RpcException e) {
520-
LOG.info("failed to notify decommission", e);
521-
return;
522-
}
523-
beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
486+
if (!backend.isDecommissioning()) {
487+
continue;
488+
}
489+
// here check wal
490+
long walNum = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
491+
LOG.info("check decommissioning be {} state {} tabletNum {} isActive {} beList {}, wal num {}",
492+
backend.getId(), backend.isDecommissioning(), tabletNum, backend.isActive(), beList, walNum);
493+
if ((tabletNum != 0 || backend.isActive() || walNum != 0) && beList.size() != 1) {
494+
continue;
495+
}
496+
if (beToDecommissionedTime.containsKey(beId)) {
497+
continue;
498+
}
499+
LOG.info("prepare to notify meta service be {} decommissioned", backend.getAddress());
500+
Cloud.AlterClusterRequest.Builder builder =
501+
Cloud.AlterClusterRequest.newBuilder();
502+
builder.setCloudUniqueId(Config.cloud_unique_id);
503+
builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);
504+
505+
Cloud.ClusterPB.Builder clusterBuilder =
506+
Cloud.ClusterPB.newBuilder();
507+
clusterBuilder.setClusterName(backend.getCloudClusterName());
508+
clusterBuilder.setClusterId(backend.getCloudClusterId());
509+
clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);
510+
511+
Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
512+
nodeBuilder.setIp(backend.getHost());
513+
nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
514+
nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
515+
nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);
516+
517+
clusterBuilder.addNodes(nodeBuilder);
518+
builder.setCluster(clusterBuilder);
519+
520+
Cloud.AlterClusterResponse response;
521+
try {
522+
response = MetaServiceProxy.getInstance().alterCluster(builder.build());
523+
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
524+
LOG.warn("notify decommission response: {}", response);
525+
continue;
524526
}
527+
LOG.info("notify decommission response: {} ", response);
528+
} catch (RpcException e) {
529+
LOG.warn("failed to notify decommission", e);
530+
continue;
525531
}
532+
beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
526533
}
527534
}
528535
}
@@ -884,7 +891,7 @@ private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> beToTabl
884891
LOG.info("backend {} not found", be);
885892
continue;
886893
}
887-
if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioned()
894+
if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioning()
888895
&& !backend.isSmoothUpgradeSrc()) {
889896
destBe = be;
890897
minTabletsNum = tabletNum;
@@ -898,7 +905,7 @@ private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> beToTabl
898905
LOG.info("backend {} not found", be);
899906
continue;
900907
}
901-
if (backend.isDecommissioned() && tabletNum > 0) {
908+
if (backend.isDecommissioning() && tabletNum > 0) {
902909
srcBe = be;
903910
srcDecommissioned = true;
904911
break;
@@ -967,7 +974,7 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet
967974
for (Long be : bes) {
968975
long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size();
969976
Backend backend = cloudSystemInfoService.getBackend(be);
970-
if (backend != null && !backend.isDecommissioned()) {
977+
if (backend != null && !backend.isDecommissioning()) {
971978
beNum++;
972979
}
973980
totalTabletsNum += tabletNum;

‎fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+6Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.doris.common.ErrorCode;
2727
import org.apache.doris.common.ErrorReport;
2828
import org.apache.doris.common.LoadException;
29+
import org.apache.doris.common.util.DebugPointUtil;
2930
import org.apache.doris.common.util.SlidingWindowCounter;
3031
import org.apache.doris.mysql.privilege.Auth;
3132
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
@@ -127,6 +128,11 @@ private boolean isPreviousWalFinished(long tableId, List<Long> aliveBeIds) {
127128
}
128129

129130
public long getAllWalQueueSize(Backend backend) {
131+
long getAllWalQueueSizeDP = DebugPointUtil.getDebugParamOrDefault("FE.GET_ALL_WAL_QUEUE_SIZE", -1L);
132+
if (getAllWalQueueSizeDP > 0) {
133+
LOG.info("backend id:" + backend.getHost() + ",use dp all wal size:" + getAllWalQueueSizeDP);
134+
return getAllWalQueueSizeDP;
135+
}
130136
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
131137
.setTableId(-1)
132138
.build();

‎fe/fe-core/src/main/java/org/apache/doris/system/Backend.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+14Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ public class Backend implements Writable {
9797
@SerializedName("isDecommissioned")
9898
private AtomicBoolean isDecommissioned;
9999

100+
private AtomicBoolean isDecommissioning = new AtomicBoolean(false);
101+
100102
// rootPath -> DiskInfo
101103
@SerializedName("disksRef")
102104
private volatile ImmutableMap<String, DiskInfo> disksRef;
@@ -404,6 +406,14 @@ public boolean setDecommissioned(boolean isDecommissioned) {
404406
return false;
405407
}
406408

409+
public boolean setDecommissioning(boolean isDecommissioning) {
410+
if (this.isDecommissioning.compareAndSet(!isDecommissioning, isDecommissioning)) {
411+
LOG.warn("{} set decommissioning: {}", this.toString(), isDecommissioning);
412+
return true;
413+
}
414+
return false;
415+
}
416+
407417
public void setHost(String host) {
408418
this.host = host;
409419
}
@@ -490,6 +500,10 @@ public boolean isDecommissioned() {
490500
return this.isDecommissioned.get();
491501
}
492502

503+
public boolean isDecommissioning() {
504+
return this.isDecommissioning.get();
505+
}
506+
493507
public boolean isQueryAvailable() {
494508
return isAlive() && !isQueryDisabled() && !isShutDown.get();
495509
}

‎regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy

Copy file name to clipboardExpand all lines: regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+12-4Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2382,15 +2382,19 @@ class Suite implements GroovyInterceptable {
23822382
}
23832383
}
23842384

2385-
def get_cluster = { be_unique_id ->
2385+
def get_cluster = { be_unique_id , MetaService ms=null->
23862386
def jsonOutput = new JsonOutput()
23872387
def map = [instance_id: "${instance_id}", cloud_unique_id: "${be_unique_id}" ]
23882388
def js = jsonOutput.toJson(map)
23892389
log.info("get cluster req: ${js} ".toString())
23902390

23912391
def add_cluster_api = { request_body, check_func ->
23922392
httpTest {
2393-
endpoint context.config.metaServiceHttpAddress
2393+
if (ms) {
2394+
endpoint ms.host+':'+ms.httpPort
2395+
} else {
2396+
endpoint context.config.metaServiceHttpAddress
2397+
}
23942398
uri "/MetaService/http/get_cluster?token=${token}"
23952399
body request_body
23962400
check check_func
@@ -2563,7 +2567,7 @@ class Suite implements GroovyInterceptable {
25632567
}
25642568
}
25652569

2566-
def d_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
2570+
def d_node = { be_unique_id, ip, port, cluster_name, cluster_id, MetaService ms=null ->
25672571
def jsonOutput = new JsonOutput()
25682572
def clusterInfo = [
25692573
type: "COMPUTE",
@@ -2583,7 +2587,11 @@ class Suite implements GroovyInterceptable {
25832587

25842588
def d_cluster_api = { request_body, check_func ->
25852589
httpTest {
2586-
endpoint context.config.metaServiceHttpAddress
2590+
if (ms) {
2591+
endpoint ms.host+':'+ms.httpPort
2592+
} else {
2593+
endpoint context.config.metaServiceHttpAddress
2594+
}
25872595
uri "/MetaService/http/decommission_node?token=${token}"
25882596
body request_body
25892597
check check_func

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.