Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4258d74

Browse filesBrowse files
sollhuiYour Name
authored and
Your Name
committed
[fix](cloud) fix routine load loss data when fe master node restart (#46149)
In cloud mode, routine load loss data when fe master node restart. When updating progress, in order to avoid small values covering large values, we introduced pr #39313, Due to the pr that the routine load replays progress metadata by first obtaining the set default offset and then pulling metadata from meta service to update the local value, if the metadata pulled from meta service is not larger than the set default offset, the correct value cannot be assigned to memory. To solve this problem, pulling metadata from meta service when restart, determine whether to obtain default offset from Kafka based on the pulled value.
1 parent 1d4c142 commit 4258d74
Copy full SHA for 4258d74

File tree

Expand file treeCollapse file tree

2 files changed

+5
-5
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+5
-5
lines changed

‎fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,11 @@ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoad
367367

368368
@Override
369369
protected void unprotectUpdateProgress() throws UserException {
370+
// For cloud mode, should update cloud progress from meta service,
371+
// then update progress with default offset from Kafka if necessary.
372+
if (Config.isCloudMode()) {
373+
updateCloudProgress();
374+
}
370375
updateNewPartitionProgress();
371376
}
372377

‎fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
-5Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.doris.load.routineload;
1919

2020
import org.apache.doris.catalog.Env;
21-
import org.apache.doris.common.Config;
2221
import org.apache.doris.common.FeConstants;
2322
import org.apache.doris.common.LoadException;
2423
import org.apache.doris.common.MetaNotFoundException;
@@ -79,10 +78,6 @@ private void process() throws UserException {
7978
RoutineLoadJob.JobState errorJobState = null;
8079
UserException userException = null;
8180
try {
82-
if (Config.isCloudMode()) {
83-
routineLoadJob.updateCloudProgress();
84-
}
85-
8681
routineLoadJob.prepare();
8782
// judge nums of tasks more than max concurrent tasks of cluster
8883
int desiredConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum();

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.