From 48dd9b7aea904c0bf0dd29d84a2a07e80b7e7aef Mon Sep 17 00:00:00 2001 From: wholvecoding <1357881264@qq.com> Date: Tue, 30 Dec 2025 17:57:05 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E4=BA=86=E7=AE=80?= =?UTF-8?q?=E5=8D=95=E7=9A=84=E6=B5=81=E7=A8=8B=EF=BC=8C=E5=AD=98=E5=9C=A8?= =?UTF-8?q?=E9=97=AE=E9=A2=981.=E8=8A=82=E7=82=B9=E4=B8=8B=E7=BA=BF?= =?UTF-8?q?=E6=9C=AA=E8=83=BD=E5=90=8C=E6=AD=A5=E5=88=B0storage=E9=87=8C?= =?UTF-8?q?=E9=9D=A22.=E7=94=A8=E6=88=B7=E6=89=A7=E8=A1=8Cput=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E7=9A=84=E6=97=B6=E5=80=99=EF=BC=8C=E4=B8=BB=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E6=B2=A1=E6=9C=89=E5=88=86=E5=8F=91=E5=88=B0=E4=BB=8E?= =?UTF-8?q?=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dkv/dkvclient/DkvClientApplication.java | 12 +- .../com/dkv/dkvclient/client/DkvClient.java | 30 +- .../controller/ClientController.java | 13 + dkv-common/pom.xml | 54 ++-- dkv-master/pom.xml | 6 + .../dkv/dkvmaster/DkvMasterApplication.java | 2 +- .../dkv/dkvmaster/cluster/ClusterManager.java | 16 +- .../controller/MasterController.java | 31 +- .../dkv/dkvmaster/dto/StartNodeRequest.java | 34 ++ .../router/ConsistentHashRouter.java | 107 ++++++- .../service/NodeOrchestrationService.java | 81 +++++ .../src/main/resources/static/index.html | 155 +++++++++- dkv-storage/pom.xml | 78 +++-- .../com/dkv/dkvstorage/ConsistencyTest.java | 2 + .../dkvstorage/DataNodeAgentApplication.java | 11 + .../com/dkv/dkvstorage/DataNodeLauncher.java | 2 +- .../dkv/dkvstorage/agent/DataNodeAgent.java | 23 +- .../controller/AgentController.java | 75 +++++ .../controller/DataNodeController.java | 149 ++------- .../com/dkv/dkvstorage/rocksdb/DataNode.java | 8 +- .../dkvstorage/rocksdb/DataNodeManager.java | 118 ++++--- .../service/DataNodeAgentService.java | 136 ++++++++ .../src/main/resources/application.properties | 1 + simplerocksdb/000008.sst | Bin 1003 -> 0 bytes simplerocksdb/000009.log | Bin 56 -> 0 bytes simplerocksdb/CURRENT | 1 - simplerocksdb/IDENTITY | 1 - simplerocksdb/LOCK | 0 simplerocksdb/LOG | 290 ------------------ simplerocksdb/LOG.old.1766853082893003 | 281 ----------------- simplerocksdb/MANIFEST-000010 | Bin 172 -> 0 bytes simplerocksdb/OPTIONS-000007 | 198 ------------ simplerocksdb/OPTIONS-000012 | 198 ------------ 33 files changed, 856 insertions(+), 1257 deletions(-) create mode 100644 dkv-master/src/main/java/com/dkv/dkvmaster/dto/StartNodeRequest.java create mode 100644 dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java create mode 100644 dkv-storage/src/main/java/com/dkv/dkvstorage/DataNodeAgentApplication.java create mode 100644 dkv-storage/src/main/java/com/dkv/dkvstorage/controller/AgentController.java create mode 100644 dkv-storage/src/main/java/com/dkv/dkvstorage/service/DataNodeAgentService.java create mode 100644 dkv-storage/src/main/resources/application.properties delete mode 100644 simplerocksdb/000008.sst delete mode 100644 simplerocksdb/000009.log delete mode 100644 simplerocksdb/CURRENT delete mode 100644 simplerocksdb/IDENTITY delete mode 100644 simplerocksdb/LOCK delete mode 100644 simplerocksdb/LOG delete mode 100644 simplerocksdb/LOG.old.1766853082893003 delete mode 100644 simplerocksdb/MANIFEST-000010 delete mode 100644 simplerocksdb/OPTIONS-000007 delete mode 100644 simplerocksdb/OPTIONS-000012 diff --git a/dkv-client/src/main/java/com/dkv/dkvclient/DkvClientApplication.java b/dkv-client/src/main/java/com/dkv/dkvclient/DkvClientApplication.java index 4ab5c12..245becc 100644 --- a/dkv-client/src/main/java/com/dkv/dkvclient/DkvClientApplication.java +++ b/dkv-client/src/main/java/com/dkv/dkvclient/DkvClientApplication.java @@ -13,12 +13,12 @@ public class DkvClientApplication { public static void main(String[] args) { // 启动 Spring Boot Web 应用 DkvClient client = new DkvClient("127.0.0.1:2181"); - try { - client.put("name", "hty".getBytes()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - System.out.println("Put finished"); +// try { +// client.put("name", "hty".getBytes()); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// System.out.println("Put finished"); SpringApplication.run(DkvClientApplication.class, args); } diff --git a/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java b/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java index 94ec6e1..55ac4e4 100644 --- a/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java +++ b/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java @@ -13,9 +13,11 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.Watcher; +import org.springframework.web.client.RestTemplate; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -66,20 +68,25 @@ private void updateNodes() throws Exception { /** 简单一致性哈希计算目标节点 */ private String getTargetIp(String key) { - synchronized (nodes) { - if (nodes.isEmpty()) { - throw new RuntimeException("No available nodes in ZooKeeper!"); - } - int hash = Math.abs(key.hashCode()); - int idx = hash % nodes.size(); - return nodes.get(idx); + String url = "http://localhost:8081/api/route?key=" + key; + + RestTemplate restTemplate = new RestTemplate(); + Map response = restTemplate.getForObject(url, Map.class); + + if (response == null || !response.containsKey("primary")) { + return null; } + + return (String) response.get("primary"); } /** PUT 操作 */ - public void put(String key, byte[] value) throws InterruptedException { + public String put(String key, byte[] value) throws InterruptedException { KvMessage message = new KvMessage(KvMessage.Type.PUT, key, value); - sendRequest(getTargetIp(key), message); + String primaryNode = getTargetIp(key); + System.out.println("向节点{}发送请求"+primaryNode); + sendRequest(primaryNode, message); + return primaryNode; } /** GET 操作 */ @@ -149,4 +156,9 @@ protected void initChannel(Channel ch) { group.shutdownGracefully(); } } + public byte[] testIP(String IP, String key) throws InterruptedException { + KvMessage request = new KvMessage(KvMessage.Type.GET, key, null); + KvMessage response = sendRequest(IP, request); + return response != null ? response.getValue() : null; + } } diff --git a/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java b/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java index b269e16..96dad30 100644 --- a/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java +++ b/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java @@ -65,4 +65,17 @@ public String delete(@RequestParam String key) { return "Failed to delete key: " + key + ", error: " + e.getMessage(); } } + @GetMapping("/test") + public String testIp(@RequestParam String IP, @RequestParam String key) { + try { + byte[] value = client.testIP(IP, key); + if (value == null) { + return "Key not found: " + key; + } + return new String(value); + } catch (Exception e) { + e.printStackTrace(); + return "Failed to get key: " + key + ", error: " + e.getMessage(); + } + } } diff --git a/dkv-common/pom.xml b/dkv-common/pom.xml index 7c08a79..2938538 100644 --- a/dkv-common/pom.xml +++ b/dkv-common/pom.xml @@ -1,52 +1,56 @@ - 4.0.0 + org.springframework.boot spring-boot-starter-parent - 4.0.1 - + 3.3.4 + com.dkv dkv-common 0.0.1-SNAPSHOT dkv-common - jar - dkv-common - - - - - - - - - - - - - + DKV 系统的通用工具包(哈希算法、常量、POJO) + 17 + 33.4.0-jre + + + com.google.guava + guava + ${guava.version} + org.projectlombok lombok - 1.18.24 - provided + true - + + + + + - com.google.guava - guava - 33.4.8-jre + com.fasterxml.jackson.core + jackson-databind + + org.springframework.boot + spring-boot-starter-test + test + + @@ -59,4 +63,4 @@ - + \ No newline at end of file diff --git a/dkv-master/pom.xml b/dkv-master/pom.xml index 48e86bb..75b1e7b 100644 --- a/dkv-master/pom.xml +++ b/dkv-master/pom.xml @@ -61,6 +61,12 @@ org.springframework.boot spring-boot-starter-web + + com.dkv + dkv-storage + 0.0.1-SNAPSHOT + compile + diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java b/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java index a53403e..a8fecb3 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java @@ -8,7 +8,7 @@ import java.util.Scanner; -@SpringBootApplication +@SpringBootApplication(scanBasePackages = {"com.dkv.dkvmaster", "com.dkv.dkvstorage"}) public class DkvMasterApplication implements CommandLineRunner { private final ClusterManager clusterManager; diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/cluster/ClusterManager.java b/dkv-master/src/main/java/com/dkv/dkvmaster/cluster/ClusterManager.java index 7e58227..0a9bbb3 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/cluster/ClusterManager.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/cluster/ClusterManager.java @@ -68,14 +68,14 @@ else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) { // 在 ClusterManager.java 中添加或修改 public void addNodeToZk(String nodeIp, Integer port) throws Exception { String path = "/nodes/" + nodeIp+":"+port; - DataNode node = new DataNode( - config.id, - dataDir, - config.port, - config.isPrimary, - replicaNodes, - configs.length // 复制因子即总节点数 - ); +// DataNode node = new DataNode( +// config.id, +// dataDir, +// config.port, +// config.isPrimary, +// replicaNodes, +// configs.length // 复制因子即总节点数 +// ); // 检查节点是否存在,不存在则创建 if (client.checkExists().forPath(path) == null) { diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java b/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java index 6c1eb5f..d5715b3 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java @@ -1,13 +1,13 @@ package com.dkv.dkvmaster.controller; import com.dkv.dkvmaster.cluster.ClusterManager; +import com.dkv.dkvmaster.dto.StartNodeRequest; import com.dkv.dkvmaster.router.ConsistentHashRouter; +import com.dkv.dkvmaster.service.NodeOrchestrationService; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; @RestController @RequestMapping("/api") @@ -16,10 +16,12 @@ public class MasterController { private final ConsistentHashRouter router; private final ClusterManager clusterManager; + private final NodeOrchestrationService orchestrationService; - public MasterController(ConsistentHashRouter router, ClusterManager clusterManager) { + public MasterController(ConsistentHashRouter router, ClusterManager clusterManager, NodeOrchestrationService orchestrationService) { this.router = router; this.clusterManager = clusterManager; + this.orchestrationService = orchestrationService; } // 查看当前所有在线节点 @@ -35,7 +37,7 @@ public List getOnlineNodes() { public void addNodes(@RequestParam("nodeip") String nodeIp,@RequestParam("port")Integer port){ router.addNode(nodeIp,port); } - @GetMapping("/addtozk") + @PostMapping("/addtozk") public void addNodes2ZK(@RequestParam("nodeip") String nodeIp, @RequestParam("port")Integer port ) throws Exception { clusterManager.addNodeToZk(nodeIp,port); @@ -45,6 +47,11 @@ public void deleteNode(@RequestParam("nodeip") String nodeIp,@RequestParam("port clusterManager.offlineNode(nodeIp, port); } + @GetMapping("/debug/ring") + public Object getRing() { + // 打印当前环中所有去重后的物理节点 + return router.getRings(); + } // 查询 Key 的路由信息 @GetMapping("/route") public Map getRoute(@RequestParam String key, @RequestParam(defaultValue = "3") int replicas) { @@ -57,4 +64,16 @@ public Map getRoute(@RequestParam String key, @RequestParam(defa response.put("replicas", targets.size() <= 1 ? Collections.emptyList() : targets.subList(1, targets.size())); return response; } + + @PostMapping("/datanode/start-and-join-zk") + public ResponseEntity> startAndJoinZk(@RequestBody StartNodeRequest request) { + try { + Map res = orchestrationService.startAndRegister(request); + boolean ok = Boolean.TRUE.equals(res.get("success")); + return ok ? ResponseEntity.ok(res) : ResponseEntity.badRequest().body(res); + } catch (Exception e) { + return ResponseEntity.internalServerError() + .body(Map.of("success", false, "stage", "UNEXPECTED", "error", e.getMessage())); + } + } } \ No newline at end of file diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/dto/StartNodeRequest.java b/dkv-master/src/main/java/com/dkv/dkvmaster/dto/StartNodeRequest.java new file mode 100644 index 0000000..a5abbc1 --- /dev/null +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/dto/StartNodeRequest.java @@ -0,0 +1,34 @@ +package com.dkv.dkvmaster.dto; + +public class StartNodeRequest { + private String nodeId; + private String host; + private Integer port; + private String dataDir; + private Boolean isPrimary = true; + private String replicas = ""; + + // 可选:是否同步加入一致性哈希环 + private Boolean addToRing = true; + + public String getNodeId() { return nodeId; } + public void setNodeId(String nodeId) { this.nodeId = nodeId; } + + public String getHost() { return host; } + public void setHost(String host) { this.host = host; } + + public Integer getPort() { return port; } + public void setPort(Integer port) { this.port = port; } + + public String getDataDir() { return dataDir; } + public void setDataDir(String dataDir) { this.dataDir = dataDir; } + + public Boolean getIsPrimary() { return isPrimary; } + public void setIsPrimary(Boolean primary) { isPrimary = primary; } + + public String getReplicas() { return replicas; } + public void setReplicas(String replicas) { this.replicas = replicas; } + + public Boolean getAddToRing() { return addToRing; } + public void setAddToRing(Boolean addToRing) { this.addToRing = addToRing; } +} diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/router/ConsistentHashRouter.java b/dkv-master/src/main/java/com/dkv/dkvmaster/router/ConsistentHashRouter.java index 8010495..74e77f4 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/router/ConsistentHashRouter.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/router/ConsistentHashRouter.java @@ -19,6 +19,9 @@ public class ConsistentHashRouter { // 虚拟节点数:每个物理节点在环上变成 10 个虚拟节点,解决数据倾斜问题 private static final int VIRTUAL_NODES = 10; + public Set getRings() { + return new HashSet<>(ring.values()); + } /** * 添加物理节点 * @param nodeIp 例如 "192.168.1.5:8080" @@ -28,8 +31,9 @@ public synchronized void addNode(String nodeIp,Integer port ) { // 构造虚拟节点名称,例如 "192.168.1.5:8080#1" //作用:让数据分布更加均匀,避免数据倾斜 String virtualNodeName = nodeIp +":"+port+ "#" + i; + int hash = HashUtil.getHash(virtualNodeName); - ring.put(hash, nodeIp); + ring.put(hash, nodeIp +":"+port); } // System.out.println("节点上线: " + nodeIp + ",当前环大小: " + ring.size()); } @@ -96,7 +100,7 @@ public synchronized List routeNodeWithReplicas(String key, int replicas) Iterator headIterator = ring.values().iterator(); // 先从当前位置往后找 - while (tailIterator.hasNext() && nodes.size() < replicas) { + while (tailIterator.hasNext() && nodes.size() <=replicas) { nodes.add(tailIterator.next()); } @@ -107,4 +111,103 @@ public synchronized List routeNodeWithReplicas(String key, int replicas) return new ArrayList<>(nodes); } + // 在 ConsistentHashRouter.java 中添加 + public synchronized String getPredecessorNode(int hash) { + SortedMap headMap = ring.headMap(hash); + if (headMap.isEmpty()) { + return ring.get(ring.lastKey()); // 环状逻辑:取最后一个节点 + } + return ring.get(headMap.lastKey()); + } + + public synchronized String getSuccessorNode(int hash) { + SortedMap tailMap = ring.tailMap(hash + 1); // 找严格大于当前hash的 + if (tailMap.isEmpty()) { + return ring.get(ring.firstKey()); // 环状逻辑:取第一个节点 + } + return ring.get(tailMap.firstKey()); + } +// public synchronized void addNodeWithBalancedMigration(String nodeIp, Integer port) { +// List affectedHashes = new ArrayList<>(); +// // 计算新节点的虚拟节点哈希值 +// for (int i = 0; i < VIRTUAL_NODES; i++) { +// String virtualNodeName = nodeIp + ":" + port + "#" + i; +// int hash = HashUtil.getHash(virtualNodeName); +// affectedHashes.add(hash); +// } +// +// // 计算新节点相邻的前后节点,并迁移数据 +// for (int i = 0; i < VIRTUAL_NODES; i++) { +// String virtualNodeName = nodeIp + ":" + port + "#" + i; +// int hash = HashUtil.getHash(virtualNodeName); +// +// // 获取新节点相邻的前驱节点和后继节点 +// String predecessor = getPredecessorNode(hash); +// String successor = getSuccessorNode(hash); +// +// // 仅迁移受影响的部分数据 +// migrateData(predecessor, successor, nodeIp, port); +// } +// +// // 执行节点添加 +// for (int i = 0; i < VIRTUAL_NODES; i++) { +// String virtualNodeName = nodeIp + ":" + port + "#" + i; +// int hash = HashUtil.getHash(virtualNodeName); +// ring.put(hash, nodeIp + ":" + port); +// } +// } + + // 数据迁移方法:根据前后节点获取受影响的数据并迁移 +// private void migrateData(String predecessor, String successor, String nodeIp, Integer port) { +// List affectedKeys = getAffectedKeys(predecessor, successor); +// +// for (String key : affectedKeys) { +// String currentNode = routeNode(key); +// if (!currentNode.equals(nodeIp + ":" + port)) { +// // 迁移数据到新节点 +// migrateKeyToNewNode(key, nodeIp + ":" + port); +// } +// } +// } + + // 获取受影响的键:假设从前驱节点到后继节点之间的键都需要迁移 +// private List getAffectedKeys(String predecessor, String successor) { +// // 这里的实现依赖于具体的数据存储系统,我们假设RocksDB支持按范围查询 +// List keys = new ArrayList<>(); +// try { +// // 假设你有一个方法来查询指定范围内的所有键 +// ReadOptions readOptions = new ReadOptions(); +// RocksIterator iterator = rocksDB.newIterator(readOptions); +// +// // 从前驱节点的哈希值开始,直到后继节点的哈希值 +// // 假设 getHash 生成的哈希值与键的顺序相关,实际中需要依据你的键的设计来调整 +// byte[] startKey = predecessor.getBytes(); +// byte[] endKey = successor.getBytes(); +// +// iterator.seek(startKey); +// while (iterator.isValid() && Arrays.compare(iterator.key(), endKey) < 0) { +// keys.add(new String(iterator.key())); +// iterator.next(); +// } +// } catch (RocksDBException e) { +// e.printStackTrace(); +// } +// return keys; +// } +// +// // 迁移数据到新节点 +// private void migrateKeyToNewNode(String key, String newNode) { +// try { +// // 从原节点读取数据并存储到新节点 +// byte[] value = rocksDB.get(key.getBytes()); +// +// // 将数据迁移到新的节点(假设有方法支持这种操作) +// // 这里只是示例,具体的操作可以依据你的存储设计 +// // 保存数据到新节点,可以选择将数据写入新节点相关的区域 +// rocksDB.put(key.getBytes(), value); +// } catch (RocksDBException e) { +// e.printStackTrace(); +// } +// } + } \ No newline at end of file diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java b/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java new file mode 100644 index 0000000..9ebc0da --- /dev/null +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java @@ -0,0 +1,81 @@ +package com.dkv.dkvmaster.service; + +import com.dkv.dkvmaster.cluster.ClusterManager; +import com.dkv.dkvmaster.dto.StartNodeRequest; +import com.dkv.dkvmaster.router.ConsistentHashRouter; +import com.dkv.dkvstorage.rocksdb.DataNodeManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.Map; + + +@Service +public class NodeOrchestrationService { + + @Autowired + private final DataNodeManager dataNodeManager; + private final ClusterManager clusterManager; + private final ConsistentHashRouter router; + + public NodeOrchestrationService( + DataNodeManager dataNodeManager, + ClusterManager clusterManager, + ConsistentHashRouter router + ) { + this.dataNodeManager = dataNodeManager; + this.clusterManager = clusterManager; + this.router = router; + } + + public Map startAndRegister(StartNodeRequest req) throws Exception { + String nodeId = req.getNodeId(); + String host = req.getHost(); + int port = req.getPort(); + String dataDir = req.getDataDir(); + boolean isPrimary = req.getIsPrimary() != null && req.getIsPrimary(); + String replicas = req.getReplicas() == null ? "" : req.getReplicas(); + + Map result = new HashMap<>(); + result.put("nodeId", nodeId); + result.put("host", host); + result.put("port", port); + + // 1) start datanode + boolean started = dataNodeManager.startDataNode(nodeId, host, port, dataDir, isPrimary, replicas); + if (!started) { + result.put("success", false); + result.put("stage", "START_DATANODE"); + result.put("message", "DataNode 启动失败:端口冲突/路径非法/实例已存在等"); + return result; + } + + // 2) register to zk (失败需要回滚) + try { + clusterManager.addNodeToZk(host, port); + result.put("zkRegistered", true); + } catch (Exception zkEx) { + // 回滚:把刚启动的节点停掉,避免“启动了但集群不可见” + dataNodeManager.stopDataNode(nodeId); + + result.put("success", false); + result.put("stage", "REGISTER_ZK"); + result.put("zkRegistered", false); + result.put("message", "注册 ZK 失败,已回滚 stop DataNode: " + zkEx.getMessage()); + return result; + } + + // 3) optionally add to ring + if (req.getAddToRing() != null && req.getAddToRing()) { + router.addNode(host, port); + result.put("ringAdded", true); + } else { + result.put("ringAdded", false); + } + + result.put("success", true); + result.put("stage", "DONE"); + return result; + } +} diff --git a/dkv-master/src/main/resources/static/index.html b/dkv-master/src/main/resources/static/index.html index 50160ce..65f9187 100644 --- a/dkv-master/src/main/resources/static/index.html +++ b/dkv-master/src/main/resources/static/index.html @@ -124,8 +124,14 @@
🎡 一致性哈希可视化
🔍 路由模拟追踪

模拟 Key 在环上的落点物理节点

- - +
+ + + 副本数 + + + +
目标主节点 (Primary):
@@ -151,6 +157,7 @@
🔍 路由模拟追踪
const MASTER_API = "/api"; const CLIENT_API = "http://127.0.0.1:8082/api/kv"; let myChart = echarts.init(document.getElementById('ring-chart')); + const STORAGE_API = "http://127.0.0.1:8085"; function refreshStatus() { fetch(`${MASTER_API}/nodes`) @@ -184,57 +191,175 @@
🔍 路由模拟追踪
const key = document.getElementById("kv-key").value; const value = document.getElementById("kv-value").value; const resBox = document.getElementById("kv-result"); - if(!key) return; + + if (!key) return; let url = `${CLIENT_API}/${type}?key=${key}`; - if(type === 'save') url += `&value=${encodeURIComponent(value)}`; + if (type === 'save') url += `&value=${encodeURIComponent(value)}`; resBox.innerHTML += `
> [${type.toUpperCase()}] ${key}
`; fetch(url, { method: (type === 'save' ? 'POST' : (type === 'delete' ? 'DELETE' : 'GET')) }) .then(res => res.text()) .then(data => { - resBox.innerHTML += `
Response: ${data}
`; + resBox.innerHTML += `
Response: ${data}
`; resBox.scrollTop = resBox.scrollHeight; + + // 如果是写入操作,调用 sendClientRequest 向主节点发送请求 + if (type === 'save') { + sendClientRequest(); + } }) .catch(err => { - resBox.innerHTML += `
Error: ${err}
`; + resBox.innerHTML += `
Error: ${err}
`; }); } + function queryRoute() { const key = document.getElementById('route-key').value; - if(!key) return; + const replicas = document.getElementById('route-replicas').value || 3; + + if (!key) return; - fetch(`${MASTER_API}/route?key=${key}`) + // 获取路由信息 + fetch(`${MASTER_API}/route?key=${key}&replicas=${replicas}`) .then(res => res.json()) .then(data => { const resultDiv = document.getElementById('route-result'); resultDiv.classList.remove('d-none'); - // 展示主节点 - document.getElementById('res-primary').innerText = data.primary; + // 获取 primary 节点并拆分为 host 和 port + const [host, port] = data.primary.split(':'); + const replicas = data.replicas; - // 展示副本节点 (Secondary Nodes) + document.getElementById('res-primary').innerText = `${host}:${port}`; + + // 显示副本节点 const secondaryContainer = document.getElementById('res-secondary'); if (secondaryContainer) { if (data.secondary && data.secondary.length > 0) { secondaryContainer.innerHTML = data.secondary - .map(node => `${node}`) + .map(node => `${node}`) .join(''); } else { - secondaryContainer.innerHTML = '无副本节点'; + secondaryContainer.innerHTML = '无可用副本节点'; } } + + // 返回的 primary 和 replicas 已经处理,可以用于后续的客户端操作 + console.log('Primary node:', { host, port }); + console.log('Replicas:', replicas); + + // 可以将 host, port 和 replicas 数组用于后续操作 }) - .catch(err => console.error("路由查询失败:", err)); + .catch(err => { + console.error("路由查询失败:", err); + alert("路由查询请求失败,请检查后端服务是否启动"); + }); } function manageNode(action) { const ip = document.getElementById('node-ip').value; const port = document.getElementById('node-port').value; - fetch(`${MASTER_API}/${action}?nodeip=${ip}&port=${port}`).then(() => refreshStatus()); + const nodeId = `${ip}:${port}`; // 创建 nodeId 为 ip+port + + if (action === 'addtozk') { + // 1. 构建请求体 (Agent 和 Master 共用大部分参数) + const requestBody = { + nodeId:nodeId, + host: ip, + port: parseInt(port), + dataDir: "F:\\javaProject\\distributedKeyValue\\data\\" + ip+"_"+port, + isPrimary: true, + replicas: "", + addToRing: true, + action: 'start' // Agent 需要这个字段,Master 会忽略它 + }; + + // ---------------------- 第一步:调用 Agent 启动物理节点 ---------------------- + fetch(`${STORAGE_API}/agent/command`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(requestBody) + }) + .then(response => response.json()) // 解析 Agent 的响应 + .then(agentData => { + // 2. 检查 Agent 是否启动成功 + if (agentData.success === true) { + console.log('Agent 启动成功,准备向 Master 注册...', agentData); + + // ------------------ 第二步:调用 Master 注册 ZK ------------------ + // 注意:这里 return 另一个 fetch Promise,以便链式调用 + return fetch(`${MASTER_API}/datanode/start-and-join-zk`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(requestBody) // 直接复用参数,Master 会读取 host/port 等字段 + }); + } else { + // 如果 Agent 失败,抛出错误,跳过后续的 then,直接进入 catch + throw new Error("Agent 启动失败: " + (agentData.error || agentData.message)); + } + }) + .then(masterResponse => { + // 3. 处理 Master 的 HTTP 状态 + if (!masterResponse.ok) { + return masterResponse.json().then(err => { throw err; }); + } + return masterResponse.json(); // 解析 Master 的 JSON + }) + .then(masterData => { + // 4. 最终检查 Master 的业务逻辑是否成功 + if (masterData.success) { + console.log('Master 注册成功', masterData); + alert("节点启动并注册成功!"); + refreshStatus(); // 刷新页面表格 + } else { + throw new Error("Master 注册失败: " + masterData.message); + } + }) + .catch(error => { + // 5. 统一错误捕获(无论是 Agent 挂了,还是 Master 挂了,都会走到这里) + console.error('操作流程失败', error); + alert("操作失败: " + (error.message || "未知错误")); + }); + + } else if (action === 'delete') { + // 调用删除节点接口 + fetch(`${MASTER_API}/delete?nodeip=${ip}&port=${port}`, { + method: 'POST' + }).then(() => { + refreshStatus(); + }); + } } + function sendClientRequest() { + const key = document.getElementById('kv-key').value; + const value = document.getElementById('kv-value').value; + const resBox = document.getElementById("kv-result"); + + if (!key) return; + + // 这里使用已经拆分的 primary 和 replicas + // const primaryHost = "127.0.0.1"; // 从上一步获得的 host + // const primaryPort = "7007"; // 从上一步获得的 port + + // 向 primary 节点发送请求 + const url = `${CLIENT_API}/save?key=${encodeURIComponent(key)}&value=${encodeURIComponent(value)}`; + + // resBox.innerHTML += `
> 发送到主节点: ${primaryHost}:${primaryPort}
`; + + fetch(url, { method: 'POST' }) + .then(res => res.text()) + .then(data => { + resBox.innerHTML += `
Response: ${data}
`; + resBox.scrollTop = resBox.scrollHeight; + }) + .catch(err => { + resBox.innerHTML += `
Error: ${err}
`; + }); + } + refreshStatus(); setInterval(refreshStatus, 3000); diff --git a/dkv-storage/pom.xml b/dkv-storage/pom.xml index 73195fa..2b04e76 100644 --- a/dkv-storage/pom.xml +++ b/dkv-storage/pom.xml @@ -29,23 +29,7 @@ ${netty.version} - - org.slf4j - slf4j-api - ${slf4j.version} - - - org.slf4j - slf4j-simple - ${slf4j.version} - - - - org.slf4j - slf4j-simple - ${slf4j.version} - org.junit.jupiter @@ -61,6 +45,19 @@ org.springframework.boot spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-logging + + + + + + org.springframework.boot + spring-boot-starter-log4j2 + 2.2.5.RELEASE @@ -79,14 +76,55 @@ org.apache.curator - curator-client + curator-framework 5.2.0 - compile + + + org.slf4j + slf4j-simple + + + org.slf4j + slf4j-log4j12 + + + org.apache.curator - curator-framework + curator-client + 5.2.0 + + + org.slf4j + slf4j-api + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + + + + + + + + + + @@ -95,4 +133,4 @@ UTF-8 - + \ No newline at end of file diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/ConsistencyTest.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/ConsistencyTest.java index 023556c..2eea862 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/ConsistencyTest.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/ConsistencyTest.java @@ -1,7 +1,9 @@ package com.dkv.dkvstorage; +import com.dkv.dkvcommon.model.*; import com.dkv.dkvcommon.model.KvMessage; + import java.nio.charset.StandardCharsets; import java.util.UUID; diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/DataNodeAgentApplication.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/DataNodeAgentApplication.java new file mode 100644 index 0000000..6d9c9a8 --- /dev/null +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/DataNodeAgentApplication.java @@ -0,0 +1,11 @@ +package com.dkv.dkvstorage; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class DataNodeAgentApplication { + public static void main(String[] args) { + SpringApplication.run(DataNodeAgentApplication.class, args); + } +} diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/DataNodeLauncher.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/DataNodeLauncher.java index 87f13bf..77d8957 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/DataNodeLauncher.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/DataNodeLauncher.java @@ -81,7 +81,7 @@ public static void main(String[] args) throws Exception { node.start(); System.out.println(">>> 节点 [" + config.id + "] 启动成功,端口: " + config.port); } catch (Exception e) { - System.err.println(">>> 节点 [" + config.id + "] 启动失败: " + e.getMessage()); + System.err.println(">>> 节点 [" + config.id +":"+ config.port+ "] 启动失败: " + e.getMessage()); } }).start(); diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/DataNodeAgent.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/DataNodeAgent.java index acaa2ff..f543d00 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/DataNodeAgent.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/DataNodeAgent.java @@ -6,12 +6,9 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; @@ -21,7 +18,7 @@ public class DataNodeAgent { private static final Logger logger = LoggerFactory.getLogger(DataNodeAgent.class); private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final int AGENT_PORT = 8081; + private static final int AGENT_PORT = 8085; private static final Map runningNodes = new ConcurrentHashMap<>(); private static final ExecutorService executor = Executors.newCachedThreadPool(); @@ -40,8 +37,22 @@ public static void main(String[] args) throws Exception { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() - .addLast(new StringDecoder(StandardCharsets.UTF_8)) - .addLast(new StringEncoder(StandardCharsets.UTF_8)) + // 替换掉 StringDecoder,加入 HTTP 支持 + .addLast(new io.netty.handler.codec.http.HttpServerCodec()) + .addLast(new io.netty.handler.codec.http.HttpObjectAggregator(65536)) + .addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, io.netty.handler.codec.http.FullHttpRequest msg) throws Exception { + // 从 HTTP 中提取 Body 字符串 + String content = msg.content().toString(io.netty.util.CharsetUtil.UTF_8); + // 逻辑交给原本的 Handler 或直接在这里处理 + logger.info("Received HTTP JSON: {}", content); + + // 这里可以调用你原来的解析逻辑... + // 注意:HTTP 需要返回标准的 HTTP Response,不能只发 JSON 字符串 + } + } + ) .addLast(new AgentCommandHandler()); } }) diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/AgentController.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/AgentController.java new file mode 100644 index 0000000..7be276c --- /dev/null +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/AgentController.java @@ -0,0 +1,75 @@ +package com.dkv.dkvstorage.controller; + + +import com.dkv.dkvstorage.service.DataNodeAgentService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.Map; + +@RestController +@RequestMapping("/agent") // 基础路径 +@CrossOrigin(origins = "*") +public class AgentController { + + @Autowired + private DataNodeAgentService agentService; + + /** + * 统一入口:兼容之前的 {action: "start", ...} 格式 + * 如果想做更标准的 REST,可以拆分为 @PostMapping("/start") 等 + */ + @PostMapping("/command") + public ResponseEntity> handleCommand(@RequestBody Map payload) { + String action = (String) payload.get("action"); + if (action == null) { + return ResponseEntity.badRequest().body(Map.of("success", false, "error", "Missing action")); + } + + Map result; + try { + switch (action) { + case "start": + result = agentService.startNode( + (String) payload.get("nodeId"), + (String) payload.get("dataDir"), + getInt(payload, "port"), + (boolean) payload.getOrDefault("isPrimary", false), + (String) payload.getOrDefault("replicas", "") + ); + break; + case "stop": + result = agentService.stopNode((String) payload.get("nodeId")); + break; + case "status": + result = agentService.getStatus((String) payload.get("nodeId")); + break; + case "health": + result = agentService.getHealth(); + break; + default: + return ResponseEntity.badRequest().body(Map.of("success", false, "error", "Unknown action")); + } + } catch (Exception e) { + return ResponseEntity.internalServerError().body(Map.of("success", false, "error", e.getMessage())); + } + + return ResponseEntity.ok(result); + } + + // 辅助方法:安全获取 Integer + private int getInt(Map map, String key) { + Object val = map.get(key); + if (val instanceof Number) { + return ((Number) val).intValue(); + } + throw new IllegalArgumentException("Invalid integer for key: " + key); + } + + // 你也可以保留一个简单的 GET 用于浏览器快速检查 + @GetMapping("/health") + public Map healthCheck() { + return agentService.getHealth(); + } +} diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/DataNodeController.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/DataNodeController.java index b2711fd..460f708 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/DataNodeController.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/DataNodeController.java @@ -1,177 +1,88 @@ package com.dkv.dkvstorage.controller; import com.dkv.dkvstorage.rocksdb.DataNodeManager; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; -import java.util.HashMap; -import java.util.Map; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import io.netty.channel.*; +/** + * DataNode 实例生命周期管理 + */ @RestController @RequestMapping("/api/datanode") public class DataNodeController { + @Autowired private DataNodeManager dataNodeManager; - - /** - * 启动DataNode - * curl -X POST "http://localhost:8080/api/datanode/start" \ - * -H "Content-Type: application/json" \ - * -d '{ - * "nodeId": "node1:9000", - * "dataDir": "/tmp/datanode1", - * "port": 9000, - * "isPrimary": true, - * "replicas": "node2:9001,node3:9002" - * }' - */ + private static final ExecutorService executor = Executors.newCachedThreadPool(); @PostMapping("/start") public ResponseEntity> startDataNode(@RequestBody Map request) { Map response = new HashMap<>(); - try { String nodeId = (String) request.get("nodeId"); String host = (String) request.get("host"); int port = ((Number) request.get("port")).intValue(); String dataDir = (String) request.get("dataDir"); - boolean isPrimary = (boolean) request.get("isPrimary"); - String replicas = (String) request.get("replicas"); + boolean isPrimary = (boolean) request.getOrDefault("isPrimary", true); + String replicas = (String) request.getOrDefault("replicas", ""); boolean success = dataNodeManager.startDataNode(nodeId, host, port, dataDir, isPrimary, replicas); if (success) { response.put("success", true); - response.put("message", "DataNode start requested"); response.put("nodeId", nodeId); - return ResponseEntity.accepted().body(response); // 202 Accepted + return ResponseEntity.accepted().body(response); } else { response.put("success", false); - response.put("message", "Failed to start DataNode"); + response.put("message", "DataNode 启动失败,请检查端口是否冲突或路径是否合法"); return ResponseEntity.badRequest().body(response); } } catch (Exception e) { - e.printStackTrace(); - response.put("success", false); - response.put("message", "Error: " + e.getMessage()); - return ResponseEntity.internalServerError().body(response); + return ResponseEntity.internalServerError().body(Map.of("success", false, "error", e.getMessage())); } } - /** - * 停止DataNode - * curl -X DELETE "http://localhost:8080/api/datanode/stop" \ - * -H "Content-Type: application/json" \ - * -d '{"nodeId": "node1:9000"}' - */ @DeleteMapping("/stop") public ResponseEntity> stopDataNode(@RequestBody Map request) { - Map response = new HashMap<>(); - - try { - String nodeId = (String) request.get("nodeId"); - boolean success = dataNodeManager.stopDataNode(nodeId); - - if (success) { - response.put("success", true); - response.put("message", "DataNode stop requested"); - response.put("nodeId", nodeId); - return ResponseEntity.accepted().body(response); - } else { - response.put("success", false); - response.put("message", "DataNode not found or already stopped"); - return ResponseEntity.badRequest().body(response); - } - } catch (Exception e) { - e.printStackTrace(); - response.put("success", false); - response.put("message", "Error: " + e.getMessage()); - return ResponseEntity.internalServerError().body(response); + String nodeId = (String) request.get("nodeId"); + if (dataNodeManager.stopDataNode(nodeId)) { + return ResponseEntity.ok(Map.of("success", true, "message", "节点已停止", "nodeId", nodeId)); } + return ResponseEntity.badRequest().body(Map.of("success", false, "message", "节点未找到或已停止")); } - /** - * 获取节点状态 - * curl "http://localhost:8080/api/datanode/status?nodeId=node1:9000" - */ @GetMapping("/status") public ResponseEntity> getNodeStatus(@RequestParam String nodeId) { - Map response = new HashMap<>(); - - try { - Map status = dataNodeManager.getNodeStatus(nodeId); - - if ("NOT_FOUND".equals(status.get("status"))) { - response.put("success", false); - response.put("message", status.get("error")); - return ResponseEntity.badRequest().body(response); - } - - response.put("success", true); - response.putAll(status); - return ResponseEntity.ok(response); - - } catch (Exception e) { - e.printStackTrace(); - response.put("success", false); - response.put("message", "Failed to get node status: " + e.getMessage()); - return ResponseEntity.internalServerError().body(response); + Map status = dataNodeManager.getNodeStatus(nodeId); + if (status.containsKey("error")) { + return ResponseEntity.status(404).body(status); } + return ResponseEntity.ok(status); } - - /** - * 获取所有节点 - * curl "http://localhost:8080/api/datanode/nodes" - */ @GetMapping("/nodes") public ResponseEntity> getAllNodes() { - Map response = new HashMap<>(); - try { - var nodes = dataNodeManager.getAllNodes(); + var nodes = dataNodeManager.getAllNodes(); // 假设 DataNodeManager 依然有这个方法 + // 使用新版风格的简写 Map.of 不太适合这里,因为 nodes 是列表 + // 保持原有的结构 + Map response = new HashMap<>(); response.put("success", true); response.put("count", nodes.size()); response.put("nodes", nodes); - return ResponseEntity.ok(response); + return ResponseEntity.ok(response); } catch (Exception e) { - e.printStackTrace(); - response.put("success", false); - response.put("message", "Failed to list nodes: " + e.getMessage()); - return ResponseEntity.internalServerError().body(response); + return ResponseEntity.internalServerError() + .body(Map.of("success", false, "message", "Failed to list nodes: " + e.getMessage())); } } -// /** -// * 集群健康检查 -// * curl "http://localhost:8080/api/datanode/health" -// */ -// @GetMapping("/health") -// public ResponseEntity> healthCheck() { -// Map response = new HashMap<>(); -// -// try { -// var nodes = dataNodeManager.getAllNodes(); -// long runningCount = nodes.stream() -// .filter(n -> "RUNNING".equals(n.get("status"))) -// .count(); -// -// response.put("status",nodes.size() > runningCount / 2 ? "healthy" : "unhealthy"); -// response.put("timestamp", System.currentTimeMillis()); -// response.put("totalNodes", nodes.size()); -// response.put("runningNodes", runningCount); -// response.put("success", true); -// -// return ResponseEntity.ok(response); -// -// } catch (Exception e) { -// e.printStackTrace(); -// response.put("status", "unhealthy"); -// response.put("error", e.getMessage()); -// response.put("success", false); -// return ResponseEntity.internalServerError().body(response); -// } -// } } \ No newline at end of file diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/DataNode.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/DataNode.java index eb8e34d..4a3ef2b 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/DataNode.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/DataNode.java @@ -64,8 +64,8 @@ public void start() throws Exception { // 3. 启动Netty服务器 startNettyServer(); - startNettyServer(); -// registerToZookeeper("127.0.0.1:2181"); +// startNettyServer(); + registerToZookeeper("127.0.0.1:2181"); logger.info("DataNode {} started successfully", nodeId); } @@ -145,8 +145,8 @@ private void registerToZookeeper(String zkAddress ) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy); client.start(); - - String path = "/dkv/nodes/" + this.nodeId + ":"+this.port; + String host = this.nodeId.split(":")[0]; + String path = "/dkv/nodes/" +host + ":"+this.port; if (client.checkExists().forPath(path) == null) { client.create() .creatingParentsIfNeeded() diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/DataNodeManager.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/DataNodeManager.java index e3a0bff..2a3e81e 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/DataNodeManager.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/DataNodeManager.java @@ -6,7 +6,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; import java.util.*; import java.util.concurrent.*; @@ -16,9 +19,9 @@ public class DataNodeManager { private static final Logger logger = LoggerFactory.getLogger(DataNodeManager.class); private static final ObjectMapper objectMapper = new ObjectMapper(); - @Value("${datanode.agent.port:8081}") - private int agentPort = 8081; - + @Value("${datanode.agent.port:8085}") + private int agentPort = 8085; + private static final String AGENT_URL = "http://127.0.0.1:8085/agent/command"; @Value("${datanode.netty.connect.timeout:5000}") private int connectTimeout = 5000; @@ -28,7 +31,10 @@ public class DataNodeManager { private final Map clientPool = new ConcurrentHashMap<>(); // 定时任务执行器 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); - + private final RestTemplate restTemplate = new RestTemplate(); + public DataNodeManager() { + System.out.println("DataNodeManager 实例被创建: " + System.identityHashCode(this)); + } public static class NodeInfo { private String nodeId; private String host; @@ -74,87 +80,67 @@ public enum NodeStatus { /** * 启动DataNode(分布式版本) */ - public boolean startDataNode(String nodeId, String host, int port, String dataDir, - boolean isPrimary, String replicas) { + public boolean startDataNode(String nodeId, String host, int port, String dataDir, boolean isPrimary, String replicas) { + // 1. 本地注册信息 + List replicaList = Arrays.asList((replicas != null ? replicas : "").split(",")); + NodeInfo nodeInfo = new NodeInfo(nodeId, host, port, dataDir, isPrimary, replicaList); + nodeInfo.setStatus(NodeStatus.STARTING); + nodeRegistry.put(nodeId, nodeInfo); + + // 2. 发送 HTTP 请求给 Agent try { - logger.info("Starting DataNode: nodeId={}, port={}, isPrimary={}, replicas={}", - nodeId, port, isPrimary, replicas); + Map payload = new HashMap<>(); + payload.put("action", "start"); + payload.put("nodeId", nodeId); + payload.put("host", host); + payload.put("port", port); + payload.put("dataDir", dataDir); + payload.put("isPrimary", isPrimary); + payload.put("replicas", replicas); + + // 发送 POST 请求 + ResponseEntity response = restTemplate.postForEntity( + AGENT_URL, + payload, + Map.class + ); - if (host == null) { - logger.error("Invalid nodeId format: {}", nodeId); + if (response.getStatusCode() == HttpStatus.OK && + Boolean.TRUE.equals(response.getBody().get("success"))) { + + logger.info("Agent successfully started node {}", nodeId); + nodeInfo.setStatus(NodeStatus.RUNNING); + return true; + } else { + logger.error("Agent failed to start node: {}", response.getBody()); + nodeInfo.setStatus(NodeStatus.ERROR); return false; } - - // 解析副本列表 - List replicaList = parseReplicaList(replicas); - - // 创建节点信息 - NodeInfo nodeInfo = new NodeInfo(nodeId, host, port, dataDir, isPrimary, replicaList); - nodeInfo.setStatus(NodeStatus.STARTING); - nodeRegistry.put(nodeId, nodeInfo); - - // 异步启动节点 - CompletableFuture.runAsync(() -> { - try { - boolean started = startNodeInternal(nodeInfo); - nodeInfo.setStatus(started ? NodeStatus.RUNNING : NodeStatus.ERROR); - - if (started) { - logger.info("DataNode {} started successfully", nodeId); - - // 启动后定期健康检查 - scheduleHealthCheck(nodeId); - } else { - logger.error("Failed to start DataNode {}", nodeId); - } - } catch (Exception e) { - logger.error("Error starting DataNode {}: {}", nodeId, e.getMessage(), e); - nodeInfo.setStatus(NodeStatus.ERROR); - } - }); - - return true; - } catch (Exception e) { - logger.error("Exception in startDataNode: {}", e.getMessage(), e); + logger.error("Failed to communicate with Agent at {}", AGENT_URL, e); + nodeInfo.setStatus(NodeStatus.ERROR); return false; } } /** - * 停止DataNode + * 停止 DataNode */ public boolean stopDataNode(String nodeId) { try { - NodeInfo nodeInfo = nodeRegistry.get(nodeId); - if (nodeInfo == null) { - logger.warn("Node {} not found in registry", nodeId); - return false; - } - - logger.info("Stopping DataNode: {}", nodeId); - nodeInfo.setStatus(NodeStatus.STOPPING); + Map payload = new HashMap<>(); + payload.put("action", "stop"); + payload.put("nodeId", nodeId); - // 同步停止节点 - boolean stopped = stopNodeInternal(nodeInfo); - - if (stopped) { - nodeInfo.setStatus(NodeStatus.STOPPED); - nodeRegistry.remove(nodeId); - logger.info("DataNode {} stopped successfully", nodeId); - } else { - nodeInfo.setStatus(NodeStatus.ERROR); - logger.error("Failed to stop DataNode {}", nodeId); - } - - return stopped; + restTemplate.postForEntity(AGENT_URL, payload, Map.class); + nodeRegistry.remove(nodeId); + return true; } catch (Exception e) { - logger.error("Exception in stopDataNode: {}", e.getMessage(), e); + logger.error("Failed to stop node via Agent", e); return false; } } - /** * 内部启动逻辑 */ diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/service/DataNodeAgentService.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/service/DataNodeAgentService.java new file mode 100644 index 0000000..20ee5b3 --- /dev/null +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/service/DataNodeAgentService.java @@ -0,0 +1,136 @@ +package com.dkv.dkvstorage.service; + +import com.dkv.dkvstorage.rocksdb.DataNode; // 假设这是你的 RocksDB 包装类 +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PreDestroy; // Spring Boot 3.x 使用 jakarta, 2.x 使用 javax +import java.util.*; +import java.util.concurrent.*; + +@Service +public class DataNodeAgentService { + private static final Logger logger = LoggerFactory.getLogger(DataNodeAgentService.class); + + // 管理本机运行的所有 DataNode 实例 + private final Map runningNodes = new ConcurrentHashMap<>(); + + // 专用线程池用于异步启动节点,避免阻塞 HTTP 请求线程 + private final ExecutorService executor = Executors.newCachedThreadPool(); + + /** + * 启动节点 + */ + public Map startNode(String nodeId, String dataDir, int port, boolean isPrimary, String replicasStr) { + if (runningNodes.containsKey(nodeId)) { + return Map.of("success", false, "error", "Node " + nodeId + " is already running"); + } + + // 解析副本配置 + List replicaNodes = new ArrayList<>(); + if (replicasStr != null && !replicasStr.isEmpty()) { + replicaNodes.addAll(Arrays.asList(replicasStr.split(","))); + } + int replicationFactor = replicaNodes.size() + 1; + + logger.info("Starting DataNode: nodeId={}, port={}", nodeId, port); + + // 异步执行启动逻辑(因为 RocksDB 初始化可能耗时) + executor.submit(() -> { + try { + // 这里调用你原本的 DataNode 构造和启动逻辑 + DataNode node = new DataNode(nodeId, dataDir, port, isPrimary, replicaNodes, replicationFactor); + node.start(); + + runningNodes.put(nodeId, node); + logger.info("DataNode {} started successfully on port {}", nodeId, port); + } catch (Exception e) { + logger.error("Failed to start DataNode {}: {}", nodeId, e.getMessage(), e); + } + }); + + // 立即返回 "请求已接收" + return Map.of( + "success", true, + "message", "DataNode start requested (async)", + "nodeId", nodeId + ); + } + + /** + * 停止节点 + */ + public Map stopNode(String nodeId) { + DataNode node = runningNodes.get(nodeId); + if (node == null) { + return Map.of("success", false, "error", "Node " + nodeId + " not found"); + } + + logger.info("Stopping DataNode: {}", nodeId); + + executor.submit(() -> { + try { + node.stop(); + runningNodes.remove(nodeId); + logger.info("DataNode {} stopped successfully", nodeId); + } catch (Exception e) { + logger.error("Error stopping DataNode {}: {}", nodeId, e.getMessage(), e); + } + }); + + return Map.of("success", true, "message", "DataNode stop requested", "nodeId", nodeId); + } + + /** + * 获取状态 + */ + public Map getStatus(String nodeId) { + if (nodeId == null || nodeId.isEmpty()) { + // 返回所有节点 + List> list = new ArrayList<>(); + runningNodes.forEach((k, v) -> { + Map info = new HashMap<>(); + info.put("nodeId", k); + info.put("running", v.isHealthy()); // 假设有 isHealthy 方法 + list.add(info); + }); + return Map.of("success", true, "count", list.size(), "nodes", list); + } else { + // 返回单个节点 + DataNode node = runningNodes.get(nodeId); + if (node == null) { + return Map.of("success", false, "error", "Node not found"); + } + return Map.of("success", true, "nodeId", nodeId, "status", node.isHealthy() ? "RUNNING" : "STOPPED"); + } + } + + /** + * Agent 健康检查 + */ + public Map getHealth() { + return Map.of( + "status", "UP", + "service", "DKV DataNode Agent", + "activeNodes", runningNodes.size(), + "timestamp", System.currentTimeMillis() + ); + } + + /** + * Spring 容器关闭时自动调用,清理资源 + */ + @PreDestroy + public void cleanup() { + logger.info("Agent is shutting down, stopping all DataNodes..."); + runningNodes.values().forEach(node -> { + try { + node.stop(); + } catch (Exception e) { + logger.error("Error stopping node during shutdown", e); + } + }); + executor.shutdownNow(); + } +} \ No newline at end of file diff --git a/dkv-storage/src/main/resources/application.properties b/dkv-storage/src/main/resources/application.properties new file mode 100644 index 0000000..cd2d02b --- /dev/null +++ b/dkv-storage/src/main/resources/application.properties @@ -0,0 +1 @@ +server.port=8085 diff --git a/simplerocksdb/000008.sst b/simplerocksdb/000008.sst deleted file mode 100644 index 1b244e7f63914df0b62450c20697704f0516aea8..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1003 zcmah|zi$&U6h70mxuj{D6563dxuI)Jk~WRnC__t8DXBrr_s$M-QC!oV3A#k9}q?*e41CtclM6Qy#zw>of)% zvsyv1gpiJKNSOvn`Jld9MZ+^;T+boBfg5fG^#w~JjzbbrmSP$>E4e%;5d;vDgz2KY z8pT8sEhK{rU?%@ws$obK+!xoDQhrW?QdIC~oWyZjb+1rQ#M%IAXo?fVo=>&v)*71~ zFQe&hG}~M4j;FMthC$k`cf5#beA?%~Y^yh6cbj)hU6x-)gqQ8f*aAdKPx!aG=gt$ zKYf{V3n(C(U_~!sr!bFzfi}xDyHSoikH%}utWL3vMq?9Zzz6!wDJ&o%BckyINIP=J zT}ER8*xgcvP3O{bp>ht{49n#ebU|2RbfRzo%p?Zh7XbvwEu#qJ0I-Nb5*{e0I+Hi6 z<_Tl5mhK|rNrbanGu0_n$|Ex{Nx@vplS+FN#3?Nz!!m%H$pYE-9YY~lU?iR?&rZN8 zRFR|u$_cYd1Z-0HAYFGkKankr%)0uf3~YrssinAd%ND6qY!u7Dn$@dQw`4L7Gj7{z zsZ^Y_YF}&9AN1$P`=8G3nd=`E%Z$o2@%ZE1)al@WyvtqXy`74M{&QxH{rA^juRrX5 GfAbdx4mMi= diff --git a/simplerocksdb/000009.log b/simplerocksdb/000009.log deleted file mode 100644 index 3ea3f0f8e2ed7580d4bef5c79d627a1d1e8b61fb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 56 zcmewp=e5(2fsu&;0vLf5BTGqYafzHqYEDkRLQsBkcCm|-LRwLNu0nBUZb43}0!SJF DaWD^X diff --git a/simplerocksdb/CURRENT b/simplerocksdb/CURRENT deleted file mode 100644 index 3051f81..0000000 --- a/simplerocksdb/CURRENT +++ /dev/null @@ -1 +0,0 @@ -MANIFEST-000010 diff --git a/simplerocksdb/IDENTITY b/simplerocksdb/IDENTITY deleted file mode 100644 index 9734c1c..0000000 --- a/simplerocksdb/IDENTITY +++ /dev/null @@ -1 +0,0 @@ -ae0b527f-e33c-11f0-9222-b047e949a510 \ No newline at end of file diff --git a/simplerocksdb/LOCK b/simplerocksdb/LOCK deleted file mode 100644 index e69de29..0000000 diff --git a/simplerocksdb/LOG b/simplerocksdb/LOG deleted file mode 100644 index 370f62f..0000000 --- a/simplerocksdb/LOG +++ /dev/null @@ -1,290 +0,0 @@ -2025/12/28-00:31:22.894318 6f88 RocksDB version: 7.10.2 -2025/12/28-00:31:22.894399 6f88 Git sha 3258b5c3e2488464de0827343c8c27bc6499765e -2025/12/28-00:31:22.894413 6f88 Compile date 2023-03-02 18:27:12 -2025/12/28-00:31:22.894424 6f88 DB SUMMARY -2025/12/28-00:31:22.894431 6f88 DB Session ID: VVU6K15DPMNAHANBV6YQ -2025/12/28-00:31:22.894793 6f88 CURRENT file: CURRENT -2025/12/28-00:31:22.894807 6f88 IDENTITY file: IDENTITY -2025/12/28-00:31:22.894864 6f88 MANIFEST file: MANIFEST-000005 size: 66 Bytes -2025/12/28-00:31:22.894876 6f88 SST files in ./simplerocksdb dir, Total Num: 0, files: -2025/12/28-00:31:22.894886 6f88 Write Ahead Log file in ./simplerocksdb: 000004.log size: 56 ; -2025/12/28-00:31:22.894893 6f88 Options.error_if_exists: 0 -2025/12/28-00:31:22.895159 6f88 Options.create_if_missing: 1 -2025/12/28-00:31:22.895168 6f88 Options.paranoid_checks: 1 -2025/12/28-00:31:22.895171 6f88 Options.flush_verify_memtable_count: 1 -2025/12/28-00:31:22.895173 6f88 Options.track_and_verify_wals_in_manifest: 0 -2025/12/28-00:31:22.895176 6f88 Options.verify_sst_unique_id_in_manifest: 1 -2025/12/28-00:31:22.895178 6f88 Options.env: 000002444E33F510 -2025/12/28-00:31:22.895181 6f88 Options.fs: WinFS -2025/12/28-00:31:22.895184 6f88 Options.info_log: 000002444E334E70 -2025/12/28-00:31:22.895186 6f88 Options.max_file_opening_threads: 16 -2025/12/28-00:31:22.895188 6f88 Options.statistics: 0000000000000000 -2025/12/28-00:31:22.895191 6f88 Options.use_fsync: 0 -2025/12/28-00:31:22.895193 6f88 Options.max_log_file_size: 0 -2025/12/28-00:31:22.895196 6f88 Options.max_manifest_file_size: 1073741824 -2025/12/28-00:31:22.895198 6f88 Options.log_file_time_to_roll: 0 -2025/12/28-00:31:22.895201 6f88 Options.keep_log_file_num: 1000 -2025/12/28-00:31:22.895203 6f88 Options.recycle_log_file_num: 0 -2025/12/28-00:31:22.895206 6f88 Options.allow_fallocate: 1 -2025/12/28-00:31:22.895208 6f88 Options.allow_mmap_reads: 0 -2025/12/28-00:31:22.895210 6f88 Options.allow_mmap_writes: 0 -2025/12/28-00:31:22.895213 6f88 Options.use_direct_reads: 0 -2025/12/28-00:31:22.895215 6f88 Options.use_direct_io_for_flush_and_compaction: 0 -2025/12/28-00:31:22.895218 6f88 Options.create_missing_column_families: 0 -2025/12/28-00:31:22.895220 6f88 Options.db_log_dir: -2025/12/28-00:31:22.895222 6f88 Options.wal_dir: -2025/12/28-00:31:22.895225 6f88 Options.table_cache_numshardbits: 6 -2025/12/28-00:31:22.895227 6f88 Options.WAL_ttl_seconds: 0 -2025/12/28-00:31:22.895229 6f88 Options.WAL_size_limit_MB: 0 -2025/12/28-00:31:22.895232 6f88 Options.max_write_batch_group_size_bytes: 1048576 -2025/12/28-00:31:22.895234 6f88 Options.manifest_preallocation_size: 4194304 -2025/12/28-00:31:22.895236 6f88 Options.is_fd_close_on_exec: 1 -2025/12/28-00:31:22.895239 6f88 Options.advise_random_on_open: 1 -2025/12/28-00:31:22.895241 6f88 Options.db_write_buffer_size: 0 -2025/12/28-00:31:22.895243 6f88 Options.write_buffer_manager: 000002444E33FBA0 -2025/12/28-00:31:22.895246 6f88 Options.access_hint_on_compaction_start: 1 -2025/12/28-00:31:22.895248 6f88 Options.random_access_max_buffer_size: 1048576 -2025/12/28-00:31:22.895250 6f88 Options.use_adaptive_mutex: 0 -2025/12/28-00:31:22.895253 6f88 Options.rate_limiter: 0000000000000000 -2025/12/28-00:31:22.895256 6f88 Options.sst_file_manager.rate_bytes_per_sec: 0 -2025/12/28-00:31:22.895258 6f88 Options.wal_recovery_mode: 2 -2025/12/28-00:31:22.895321 6f88 Options.enable_thread_tracking: 0 -2025/12/28-00:31:22.895326 6f88 Options.enable_pipelined_write: 0 -2025/12/28-00:31:22.895328 6f88 Options.unordered_write: 0 -2025/12/28-00:31:22.895331 6f88 Options.allow_concurrent_memtable_write: 1 -2025/12/28-00:31:22.895333 6f88 Options.enable_write_thread_adaptive_yield: 1 -2025/12/28-00:31:22.895335 6f88 Options.write_thread_max_yield_usec: 100 -2025/12/28-00:31:22.895338 6f88 Options.write_thread_slow_yield_usec: 3 -2025/12/28-00:31:22.895340 6f88 Options.row_cache: None -2025/12/28-00:31:22.895342 6f88 Options.wal_filter: None -2025/12/28-00:31:22.895345 6f88 Options.avoid_flush_during_recovery: 0 -2025/12/28-00:31:22.895347 6f88 Options.allow_ingest_behind: 0 -2025/12/28-00:31:22.895349 6f88 Options.two_write_queues: 0 -2025/12/28-00:31:22.895352 6f88 Options.manual_wal_flush: 0 -2025/12/28-00:31:22.895354 6f88 Options.wal_compression: 0 -2025/12/28-00:31:22.895356 6f88 Options.atomic_flush: 0 -2025/12/28-00:31:22.895359 6f88 Options.avoid_unnecessary_blocking_io: 0 -2025/12/28-00:31:22.895361 6f88 Options.persist_stats_to_disk: 0 -2025/12/28-00:31:22.895363 6f88 Options.write_dbid_to_manifest: 0 -2025/12/28-00:31:22.895366 6f88 Options.log_readahead_size: 0 -2025/12/28-00:31:22.895368 6f88 Options.file_checksum_gen_factory: Unknown -2025/12/28-00:31:22.895370 6f88 Options.best_efforts_recovery: 0 -2025/12/28-00:31:22.895373 6f88 Options.max_bgerror_resume_count: 2147483647 -2025/12/28-00:31:22.895375 6f88 Options.bgerror_resume_retry_interval: 1000000 -2025/12/28-00:31:22.895378 6f88 Options.allow_data_in_errors: 0 -2025/12/28-00:31:22.895380 6f88 Options.db_host_id: __hostname__ -2025/12/28-00:31:22.895382 6f88 Options.enforce_single_del_contracts: true -2025/12/28-00:31:22.895385 6f88 Options.max_background_jobs: 2 -2025/12/28-00:31:22.895387 6f88 Options.max_background_compactions: -1 -2025/12/28-00:31:22.895389 6f88 Options.max_subcompactions: 1 -2025/12/28-00:31:22.895392 6f88 Options.avoid_flush_during_shutdown: 0 -2025/12/28-00:31:22.895394 6f88 Options.writable_file_max_buffer_size: 1048576 -2025/12/28-00:31:22.895397 6f88 Options.delayed_write_rate : 16777216 -2025/12/28-00:31:22.895399 6f88 Options.max_total_wal_size: 0 -2025/12/28-00:31:22.895401 6f88 Options.delete_obsolete_files_period_micros: 21600000000 -2025/12/28-00:31:22.895404 6f88 Options.stats_dump_period_sec: 600 -2025/12/28-00:31:22.895406 6f88 Options.stats_persist_period_sec: 600 -2025/12/28-00:31:22.895409 6f88 Options.stats_history_buffer_size: 1048576 -2025/12/28-00:31:22.895411 6f88 Options.max_open_files: -1 -2025/12/28-00:31:22.895413 6f88 Options.bytes_per_sync: 0 -2025/12/28-00:31:22.895416 6f88 Options.wal_bytes_per_sync: 0 -2025/12/28-00:31:22.895418 6f88 Options.strict_bytes_per_sync: 0 -2025/12/28-00:31:22.895420 6f88 Options.compaction_readahead_size: 0 -2025/12/28-00:31:22.895423 6f88 Options.max_background_flushes: -1 -2025/12/28-00:31:22.895425 6f88 Compression algorithms supported: -2025/12/28-00:31:22.895435 6f88 kZSTD supported: 1 -2025/12/28-00:31:22.895438 6f88 kSnappyCompression supported: 1 -2025/12/28-00:31:22.895440 6f88 kBZip2Compression supported: 0 -2025/12/28-00:31:22.895443 6f88 kZlibCompression supported: 1 -2025/12/28-00:31:22.895445 6f88 kLZ4Compression supported: 1 -2025/12/28-00:31:22.895447 6f88 kXpressCompression supported: 0 -2025/12/28-00:31:22.895449 6f88 kLZ4HCCompression supported: 1 -2025/12/28-00:31:22.895466 6f88 kZSTDNotFinalCompression supported: 1 -2025/12/28-00:31:22.895476 6f88 Fast CRC32 supported: Supported on x86 -2025/12/28-00:31:22.895478 6f88 DMutex implementation: std::mutex -2025/12/28-00:31:22.896461 6f88 [db\version_set.cc:5617] Recovering from manifest file: ./simplerocksdb/MANIFEST-000005 -2025/12/28-00:31:22.896713 6f88 [db\column_family.cc:632] --------------- Options for column family [default]: -2025/12/28-00:31:22.896727 6f88 Options.comparator: leveldb.BytewiseComparator -2025/12/28-00:31:22.896730 6f88 Options.merge_operator: None -2025/12/28-00:31:22.896732 6f88 Options.compaction_filter: None -2025/12/28-00:31:22.896734 6f88 Options.compaction_filter_factory: None -2025/12/28-00:31:22.896737 6f88 Options.sst_partitioner_factory: None -2025/12/28-00:31:22.896739 6f88 Options.memtable_factory: SkipListFactory -2025/12/28-00:31:22.896741 6f88 Options.table_factory: BlockBasedTable -2025/12/28-00:31:22.896771 6f88 table_factory options: flush_block_policy_factory: FlushBlockBySizePolicyFactory (000002444E31B8D0) - cache_index_and_filter_blocks: 0 - cache_index_and_filter_blocks_with_high_priority: 1 - pin_l0_filter_and_index_blocks_in_cache: 0 - pin_top_level_index_and_filter: 1 - index_type: 0 - data_block_index_type: 0 - index_shortening: 1 - data_block_hash_table_util_ratio: 0.750000 - checksum: 4 - no_block_cache: 0 - block_cache: 0000024462190E10 - block_cache_name: LRUCache - block_cache_options: - capacity : 8388608 - num_shard_bits : 4 - strict_capacity_limit : 0 - memory_allocator : None - high_pri_pool_ratio: 0.000 - low_pri_pool_ratio: 0.000 - block_cache_compressed: 0000000000000000 - persistent_cache: 0000000000000000 - block_size: 4096 - block_size_deviation: 10 - block_restart_interval: 16 - index_block_restart_interval: 1 - metadata_block_size: 4096 - partition_filters: 0 - use_delta_encoding: 1 - filter_policy: nullptr - whole_key_filtering: 1 - verify_compression: 0 - read_amp_bytes_per_bit: 0 - format_version: 5 - enable_index_compression: 1 - block_align: 0 - max_auto_readahead_size: 262144 - prepopulate_block_cache: 0 - initial_auto_readahead_size: 8192 - num_file_reads_for_auto_readahead: 2 -2025/12/28-00:31:22.896775 6f88 Options.write_buffer_size: 67108864 -2025/12/28-00:31:22.896777 6f88 Options.max_write_buffer_number: 2 -2025/12/28-00:31:22.896779 6f88 Options.compression: Snappy -2025/12/28-00:31:22.896782 6f88 Options.bottommost_compression: Disabled -2025/12/28-00:31:22.896784 6f88 Options.prefix_extractor: nullptr -2025/12/28-00:31:22.896786 6f88 Options.memtable_insert_with_hint_prefix_extractor: nullptr -2025/12/28-00:31:22.896789 6f88 Options.num_levels: 7 -2025/12/28-00:31:22.896791 6f88 Options.min_write_buffer_number_to_merge: 1 -2025/12/28-00:31:22.896793 6f88 Options.max_write_buffer_number_to_maintain: 0 -2025/12/28-00:31:22.896796 6f88 Options.max_write_buffer_size_to_maintain: 0 -2025/12/28-00:31:22.896798 6f88 Options.bottommost_compression_opts.window_bits: -14 -2025/12/28-00:31:22.896800 6f88 Options.bottommost_compression_opts.level: 32767 -2025/12/28-00:31:22.896803 6f88 Options.bottommost_compression_opts.strategy: 0 -2025/12/28-00:31:22.896805 6f88 Options.bottommost_compression_opts.max_dict_bytes: 0 -2025/12/28-00:31:22.896808 6f88 Options.bottommost_compression_opts.zstd_max_train_bytes: 0 -2025/12/28-00:31:22.896810 6f88 Options.bottommost_compression_opts.parallel_threads: 1 -2025/12/28-00:31:22.896812 6f88 Options.bottommost_compression_opts.enabled: false -2025/12/28-00:31:22.896815 6f88 Options.bottommost_compression_opts.max_dict_buffer_bytes: 0 -2025/12/28-00:31:22.896817 6f88 Options.bottommost_compression_opts.use_zstd_dict_trainer: true -2025/12/28-00:31:22.896820 6f88 Options.compression_opts.window_bits: -14 -2025/12/28-00:31:22.896822 6f88 Options.compression_opts.level: 32767 -2025/12/28-00:31:22.896826 6f88 Options.compression_opts.strategy: 0 -2025/12/28-00:31:22.896830 6f88 Options.compression_opts.max_dict_bytes: 0 -2025/12/28-00:31:22.896832 6f88 Options.compression_opts.zstd_max_train_bytes: 0 -2025/12/28-00:31:22.896835 6f88 Options.compression_opts.use_zstd_dict_trainer: true -2025/12/28-00:31:22.896837 6f88 Options.compression_opts.parallel_threads: 1 -2025/12/28-00:31:22.896840 6f88 Options.compression_opts.enabled: false -2025/12/28-00:31:22.896842 6f88 Options.compression_opts.max_dict_buffer_bytes: 0 -2025/12/28-00:31:22.896844 6f88 Options.level0_file_num_compaction_trigger: 4 -2025/12/28-00:31:22.896847 6f88 Options.level0_slowdown_writes_trigger: 20 -2025/12/28-00:31:22.896849 6f88 Options.level0_stop_writes_trigger: 36 -2025/12/28-00:31:22.896851 6f88 Options.target_file_size_base: 67108864 -2025/12/28-00:31:22.896854 6f88 Options.target_file_size_multiplier: 1 -2025/12/28-00:31:22.896856 6f88 Options.max_bytes_for_level_base: 268435456 -2025/12/28-00:31:22.896858 6f88 Options.level_compaction_dynamic_level_bytes: 0 -2025/12/28-00:31:22.896861 6f88 Options.max_bytes_for_level_multiplier: 10.000000 -2025/12/28-00:31:22.896864 6f88 Options.max_bytes_for_level_multiplier_addtl[0]: 1 -2025/12/28-00:31:22.896866 6f88 Options.max_bytes_for_level_multiplier_addtl[1]: 1 -2025/12/28-00:31:22.896869 6f88 Options.max_bytes_for_level_multiplier_addtl[2]: 1 -2025/12/28-00:31:22.896871 6f88 Options.max_bytes_for_level_multiplier_addtl[3]: 1 -2025/12/28-00:31:22.896873 6f88 Options.max_bytes_for_level_multiplier_addtl[4]: 1 -2025/12/28-00:31:22.896876 6f88 Options.max_bytes_for_level_multiplier_addtl[5]: 1 -2025/12/28-00:31:22.896878 6f88 Options.max_bytes_for_level_multiplier_addtl[6]: 1 -2025/12/28-00:31:22.896881 6f88 Options.max_sequential_skip_in_iterations: 8 -2025/12/28-00:31:22.896883 6f88 Options.max_compaction_bytes: 1677721600 -2025/12/28-00:31:22.896886 6f88 Options.ignore_max_compaction_bytes_for_input: true -2025/12/28-00:31:22.896888 6f88 Options.arena_block_size: 1048576 -2025/12/28-00:31:22.896890 6f88 Options.soft_pending_compaction_bytes_limit: 68719476736 -2025/12/28-00:31:22.896893 6f88 Options.hard_pending_compaction_bytes_limit: 274877906944 -2025/12/28-00:31:22.896895 6f88 Options.disable_auto_compactions: 0 -2025/12/28-00:31:22.896898 6f88 Options.compaction_style: kCompactionStyleLevel -2025/12/28-00:31:22.896901 6f88 Options.compaction_pri: kMinOverlappingRatio -2025/12/28-00:31:22.896903 6f88 Options.compaction_options_universal.size_ratio: 1 -2025/12/28-00:31:22.896905 6f88 Options.compaction_options_universal.min_merge_width: 2 -2025/12/28-00:31:22.896908 6f88 Options.compaction_options_universal.max_merge_width: 4294967295 -2025/12/28-00:31:22.896910 6f88 Options.compaction_options_universal.max_size_amplification_percent: 200 -2025/12/28-00:31:22.896913 6f88 Options.compaction_options_universal.compression_size_percent: -1 -2025/12/28-00:31:22.896915 6f88 Options.compaction_options_universal.stop_style: kCompactionStopStyleTotalSize -2025/12/28-00:31:22.896918 6f88 Options.compaction_options_fifo.max_table_files_size: 1073741824 -2025/12/28-00:31:22.896920 6f88 Options.compaction_options_fifo.allow_compaction: 0 -2025/12/28-00:31:22.896924 6f88 Options.table_properties_collectors: -2025/12/28-00:31:22.896929 6f88 Options.inplace_update_support: 0 -2025/12/28-00:31:22.896931 6f88 Options.inplace_update_num_locks: 10000 -2025/12/28-00:31:22.896934 6f88 Options.memtable_prefix_bloom_size_ratio: 0.000000 -2025/12/28-00:31:22.896936 6f88 Options.memtable_whole_key_filtering: 0 -2025/12/28-00:31:22.896939 6f88 Options.memtable_huge_page_size: 0 -2025/12/28-00:31:22.896941 6f88 Options.bloom_locality: 0 -2025/12/28-00:31:22.896943 6f88 Options.max_successive_merges: 0 -2025/12/28-00:31:22.896975 6f88 Options.optimize_filters_for_hits: 0 -2025/12/28-00:31:22.896979 6f88 Options.paranoid_file_checks: 0 -2025/12/28-00:31:22.896981 6f88 Options.force_consistency_checks: 1 -2025/12/28-00:31:22.896983 6f88 Options.report_bg_io_stats: 0 -2025/12/28-00:31:22.896986 6f88 Options.ttl: 2592000 -2025/12/28-00:31:22.896988 6f88 Options.periodic_compaction_seconds: 0 -2025/12/28-00:31:22.896991 6f88 Options.preclude_last_level_data_seconds: 0 -2025/12/28-00:31:22.896993 6f88 Options.preserve_internal_time_seconds: 0 -2025/12/28-00:31:22.896995 6f88 Options.enable_blob_files: false -2025/12/28-00:31:22.896997 6f88 Options.min_blob_size: 0 -2025/12/28-00:31:22.897000 6f88 Options.blob_file_size: 268435456 -2025/12/28-00:31:22.897003 6f88 Options.blob_compression_type: NoCompression -2025/12/28-00:31:22.897005 6f88 Options.enable_blob_garbage_collection: false -2025/12/28-00:31:22.897007 6f88 Options.blob_garbage_collection_age_cutoff: 0.250000 -2025/12/28-00:31:22.897010 6f88 Options.blob_garbage_collection_force_threshold: 1.000000 -2025/12/28-00:31:22.897013 6f88 Options.blob_compaction_readahead_size: 0 -2025/12/28-00:31:22.897015 6f88 Options.blob_file_starting_level: 0 -2025/12/28-00:31:22.897018 6f88 Options.experimental_mempurge_threshold: 0.000000 -2025/12/28-00:31:22.899148 6f88 [db\version_set.cc:5668] Recovered from manifest file:./simplerocksdb/MANIFEST-000005 succeeded,manifest_file_number is 5, next_file_number is 7, last_sequence is 0, log_number is 0,prev_log_number is 0,max_column_family is 0,min_log_number_to_keep is 0 -2025/12/28-00:31:22.899162 6f88 [db\version_set.cc:5677] Column family [default] (ID 0), log number is 0 -2025/12/28-00:31:22.899524 6f88 [db\db_impl\db_impl_open.cc:539] DB ID: ae0b527f-e33c-11f0-9222-b047e949a510 -2025/12/28-00:31:22.900173 6f88 EVENT_LOG_v1 {"time_micros": 1766853082900158, "job": 1, "event": "recovery_started", "wal_files": [4]} -2025/12/28-00:31:22.900186 6f88 [db\db_impl\db_impl_open.cc:1045] Recovering log #4 mode 2 -2025/12/28-00:31:22.912665 6f88 EVENT_LOG_v1 {"time_micros": 1766853082912633, "cf_name": "default", "job": 1, "event": "table_file_creation", "file_number": 8, "file_size": 1003, "file_checksum": "", "file_checksum_func_name": "Unknown", "smallest_seqno": 1, "largest_seqno": 1, "table_properties": {"data_size": 58, "index_size": 21, "index_partitions": 0, "top_level_index_size": 0, "index_key_is_user_key": 1, "index_value_is_delta_encoded": 1, "filter_size": 0, "raw_key_size": 12, "raw_average_key_size": 12, "raw_value_size": 30, "raw_average_value_size": 30, "num_data_blocks": 1, "num_entries": 1, "num_filter_entries": 0, "num_deletions": 0, "num_merge_operands": 0, "num_range_deletions": 0, "format_version": 0, "fixed_key_len": 0, "filter_policy": "", "column_family_name": "default", "column_family_id": 0, "comparator": "leveldb.BytewiseComparator", "merge_operator": "nullptr", "prefix_extractor_name": "nullptr", "property_collectors": "[]", "compression": "Snappy", "compression_options": "window_bits=-14; level=32767; strategy=0; max_dict_bytes=0; zstd_max_train_bytes=0; enabled=0; max_dict_buffer_bytes=0; use_zstd_dict_trainer=1; ", "creation_time": 1766853082, "oldest_key_time": 0, "file_creation_time": 0, "slow_compression_estimated_data_size": 0, "fast_compression_estimated_data_size": 0, "db_id": "ae0b527f-e33c-11f0-9222-b047e949a510", "db_session_id": "VVU6K15DPMNAHANBV6YQ", "orig_file_number": 8, "seqno_to_time_mapping": "N/A"}} -2025/12/28-00:31:22.914822 6f88 EVENT_LOG_v1 {"time_micros": 1766853082914815, "job": 1, "event": "recovery_finished"} -2025/12/28-00:31:22.915232 6f88 [db\version_set.cc:5135] Creating manifest 10 -2025/12/28-00:31:22.927717 6f88 [file\delete_scheduler.cc:78] Deleted file ./simplerocksdb/000004.log immediately, rate_bytes_per_sec 0, total_trash_size 0 max_trash_db_ratio 0.250000 -2025/12/28-00:31:22.927748 6f88 [db\db_impl\db_impl_open.cc:1992] SstFileManager instance 000002444E2EA4E0 -2025/12/28-00:31:22.928084 6f88 DB pointer 000002444E340F40 -2025/12/28-00:31:22.928368 4ad4 [db\db_impl\db_impl.cc:1110] ------- DUMPING STATS ------- -2025/12/28-00:31:22.928382 4ad4 [db\db_impl\db_impl.cc:1111] -** DB Stats ** -Uptime(secs): 0.0 total, 0.0 interval -Cumulative writes: 0 writes, 0 keys, 0 commit groups, 0.0 writes per commit group, ingest: 0.00 GB, 0.00 MB/s -Cumulative WAL: 0 writes, 0 syncs, 0.00 writes per sync, written: 0.00 GB, 0.00 MB/s -Cumulative stall: 00:00:0.000 H:M:S, 0.0 percent -Interval writes: 0 writes, 0 keys, 0 commit groups, 0.0 writes per commit group, ingest: 0.00 MB, 0.00 MB/s -Interval WAL: 0 writes, 0 syncs, 0.00 writes per sync, written: 0.00 GB, 0.00 MB/s -Interval stall: 00:00:0.000 H:M:S, 0.0 percent - -** Compaction Stats [default] ** -Level Files Size Score Read(GB) Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB) ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - L0 1/0 0.98 KB 0.2 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.1 0.01 0.00 1 0.012 0 0 0.0 0.0 - Sum 1/0 0.98 KB 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.1 0.01 0.00 1 0.012 0 0 0.0 0.0 - Int 0/0 0.00 KB 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.1 0.01 0.00 1 0.012 0 0 0.0 0.0 - -** Compaction Stats [default] ** -Priority Files Size Score Read(GB) Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB) ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -User 0/0 0.00 KB 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.1 0.01 0.00 1 0.012 0 0 0.0 0.0 - -Blob file count: 0, total size: 0.0 GB, garbage size: 0.0 GB, space amp: 0.0 - -Uptime(secs): 0.0 total, 0.0 interval -Flush(GB): cumulative 0.000, interval 0.000 -AddFile(GB): cumulative 0.000, interval 0.000 -AddFile(Total Files): cumulative 0, interval 0 -AddFile(L0 Files): cumulative 0, interval 0 -AddFile(Keys): cumulative 0, interval 0 -Cumulative compaction: 0.00 GB write, 0.03 MB/s write, 0.00 GB read, 0.00 MB/s read, 0.0 seconds -Interval compaction: 0.00 GB write, 0.03 MB/s write, 0.00 GB read, 0.00 MB/s read, 0.0 seconds -Stalls(count): 0 level0_slowdown, 0 level0_slowdown_with_compaction, 0 level0_numfiles, 0 level0_numfiles_with_compaction, 0 stop for pending_compaction_bytes, 0 slowdown for pending_compaction_bytes, 0 memtable_compaction, 0 memtable_slowdown, interval 0 total count -Block cache LRUCache@0000024462190E10#11740 capacity: 8.00 MB usage: 0.08 KB table_size: 256 occupancy: 87 collections: 1 last_copies: 0 last_secs: 1.9e-05 secs_since: 0 -Block cache entry stats(count,size,portion): Misc(1,0.00 KB,0%) - -** File Read Latency Histogram By Level [default] ** -2025/12/28-00:31:22.930448 6f88 [db\db_impl\db_impl.cc:497] Shutdown: canceling all background work -2025/12/28-00:31:22.935036 6f88 [db\db_impl\db_impl.cc:704] Shutdown complete diff --git a/simplerocksdb/LOG.old.1766853082893003 b/simplerocksdb/LOG.old.1766853082893003 deleted file mode 100644 index 110970f..0000000 --- a/simplerocksdb/LOG.old.1766853082893003 +++ /dev/null @@ -1,281 +0,0 @@ -2025/12/27-23:57:01.600913 250c RocksDB version: 7.10.2 -2025/12/27-23:57:01.600965 250c Git sha 3258b5c3e2488464de0827343c8c27bc6499765e -2025/12/27-23:57:01.600977 250c Compile date 2023-03-02 18:27:12 -2025/12/27-23:57:01.600986 250c DB SUMMARY -2025/12/27-23:57:01.600992 250c DB Session ID: G7NX4H7IEDATZ4IDZKEJ -2025/12/27-23:57:01.601190 250c SST files in ./simplerocksdb dir, Total Num: 0, files: -2025/12/27-23:57:01.601200 250c Write Ahead Log file in ./simplerocksdb: -2025/12/27-23:57:01.601207 250c Options.error_if_exists: 0 -2025/12/27-23:57:01.601212 250c Options.create_if_missing: 1 -2025/12/27-23:57:01.601218 250c Options.paranoid_checks: 1 -2025/12/27-23:57:01.601223 250c Options.flush_verify_memtable_count: 1 -2025/12/27-23:57:01.601487 250c Options.track_and_verify_wals_in_manifest: 0 -2025/12/27-23:57:01.601495 250c Options.verify_sst_unique_id_in_manifest: 1 -2025/12/27-23:57:01.601497 250c Options.env: 000001DBEF5BAF70 -2025/12/27-23:57:01.601500 250c Options.fs: WinFS -2025/12/27-23:57:01.601502 250c Options.info_log: 000001DBF03F2460 -2025/12/27-23:57:01.601504 250c Options.max_file_opening_threads: 16 -2025/12/27-23:57:01.601507 250c Options.statistics: 0000000000000000 -2025/12/27-23:57:01.601509 250c Options.use_fsync: 0 -2025/12/27-23:57:01.601511 250c Options.max_log_file_size: 0 -2025/12/27-23:57:01.601513 250c Options.max_manifest_file_size: 1073741824 -2025/12/27-23:57:01.601515 250c Options.log_file_time_to_roll: 0 -2025/12/27-23:57:01.601518 250c Options.keep_log_file_num: 1000 -2025/12/27-23:57:01.601520 250c Options.recycle_log_file_num: 0 -2025/12/27-23:57:01.601522 250c Options.allow_fallocate: 1 -2025/12/27-23:57:01.601524 250c Options.allow_mmap_reads: 0 -2025/12/27-23:57:01.601526 250c Options.allow_mmap_writes: 0 -2025/12/27-23:57:01.601528 250c Options.use_direct_reads: 0 -2025/12/27-23:57:01.601530 250c Options.use_direct_io_for_flush_and_compaction: 0 -2025/12/27-23:57:01.601533 250c Options.create_missing_column_families: 0 -2025/12/27-23:57:01.601535 250c Options.db_log_dir: -2025/12/27-23:57:01.601537 250c Options.wal_dir: -2025/12/27-23:57:01.601539 250c Options.table_cache_numshardbits: 6 -2025/12/27-23:57:01.601541 250c Options.WAL_ttl_seconds: 0 -2025/12/27-23:57:01.601544 250c Options.WAL_size_limit_MB: 0 -2025/12/27-23:57:01.601546 250c Options.max_write_batch_group_size_bytes: 1048576 -2025/12/27-23:57:01.601548 250c Options.manifest_preallocation_size: 4194304 -2025/12/27-23:57:01.601550 250c Options.is_fd_close_on_exec: 1 -2025/12/27-23:57:01.601552 250c Options.advise_random_on_open: 1 -2025/12/27-23:57:01.601554 250c Options.db_write_buffer_size: 0 -2025/12/27-23:57:01.601557 250c Options.write_buffer_manager: 000001DBF0574DF0 -2025/12/27-23:57:01.601559 250c Options.access_hint_on_compaction_start: 1 -2025/12/27-23:57:01.601561 250c Options.random_access_max_buffer_size: 1048576 -2025/12/27-23:57:01.601563 250c Options.use_adaptive_mutex: 0 -2025/12/27-23:57:01.601565 250c Options.rate_limiter: 0000000000000000 -2025/12/27-23:57:01.601568 250c Options.sst_file_manager.rate_bytes_per_sec: 0 -2025/12/27-23:57:01.601570 250c Options.wal_recovery_mode: 2 -2025/12/27-23:57:01.601572 250c Options.enable_thread_tracking: 0 -2025/12/27-23:57:01.601574 250c Options.enable_pipelined_write: 0 -2025/12/27-23:57:01.601600 250c Options.unordered_write: 0 -2025/12/27-23:57:01.601604 250c Options.allow_concurrent_memtable_write: 1 -2025/12/27-23:57:01.601606 250c Options.enable_write_thread_adaptive_yield: 1 -2025/12/27-23:57:01.601609 250c Options.write_thread_max_yield_usec: 100 -2025/12/27-23:57:01.601611 250c Options.write_thread_slow_yield_usec: 3 -2025/12/27-23:57:01.601613 250c Options.row_cache: None -2025/12/27-23:57:01.601615 250c Options.wal_filter: None -2025/12/27-23:57:01.601618 250c Options.avoid_flush_during_recovery: 0 -2025/12/27-23:57:01.601620 250c Options.allow_ingest_behind: 0 -2025/12/27-23:57:01.601622 250c Options.two_write_queues: 0 -2025/12/27-23:57:01.601624 250c Options.manual_wal_flush: 0 -2025/12/27-23:57:01.601626 250c Options.wal_compression: 0 -2025/12/27-23:57:01.601628 250c Options.atomic_flush: 0 -2025/12/27-23:57:01.601631 250c Options.avoid_unnecessary_blocking_io: 0 -2025/12/27-23:57:01.601633 250c Options.persist_stats_to_disk: 0 -2025/12/27-23:57:01.601635 250c Options.write_dbid_to_manifest: 0 -2025/12/27-23:57:01.601637 250c Options.log_readahead_size: 0 -2025/12/27-23:57:01.601639 250c Options.file_checksum_gen_factory: Unknown -2025/12/27-23:57:01.601641 250c Options.best_efforts_recovery: 0 -2025/12/27-23:57:01.601643 250c Options.max_bgerror_resume_count: 2147483647 -2025/12/27-23:57:01.601645 250c Options.bgerror_resume_retry_interval: 1000000 -2025/12/27-23:57:01.601648 250c Options.allow_data_in_errors: 0 -2025/12/27-23:57:01.601650 250c Options.db_host_id: __hostname__ -2025/12/27-23:57:01.601652 250c Options.enforce_single_del_contracts: true -2025/12/27-23:57:01.601654 250c Options.max_background_jobs: 2 -2025/12/27-23:57:01.601656 250c Options.max_background_compactions: -1 -2025/12/27-23:57:01.601658 250c Options.max_subcompactions: 1 -2025/12/27-23:57:01.601661 250c Options.avoid_flush_during_shutdown: 0 -2025/12/27-23:57:01.601663 250c Options.writable_file_max_buffer_size: 1048576 -2025/12/27-23:57:01.601665 250c Options.delayed_write_rate : 16777216 -2025/12/27-23:57:01.601667 250c Options.max_total_wal_size: 0 -2025/12/27-23:57:01.601669 250c Options.delete_obsolete_files_period_micros: 21600000000 -2025/12/27-23:57:01.601672 250c Options.stats_dump_period_sec: 600 -2025/12/27-23:57:01.601674 250c Options.stats_persist_period_sec: 600 -2025/12/27-23:57:01.601676 250c Options.stats_history_buffer_size: 1048576 -2025/12/27-23:57:01.601678 250c Options.max_open_files: -1 -2025/12/27-23:57:01.601680 250c Options.bytes_per_sync: 0 -2025/12/27-23:57:01.601682 250c Options.wal_bytes_per_sync: 0 -2025/12/27-23:57:01.601685 250c Options.strict_bytes_per_sync: 0 -2025/12/27-23:57:01.601687 250c Options.compaction_readahead_size: 0 -2025/12/27-23:57:01.601689 250c Options.max_background_flushes: -1 -2025/12/27-23:57:01.601691 250c Compression algorithms supported: -2025/12/27-23:57:01.601699 250c kZSTD supported: 1 -2025/12/27-23:57:01.601701 250c kSnappyCompression supported: 1 -2025/12/27-23:57:01.601704 250c kBZip2Compression supported: 0 -2025/12/27-23:57:01.601706 250c kZlibCompression supported: 1 -2025/12/27-23:57:01.601708 250c kLZ4Compression supported: 1 -2025/12/27-23:57:01.601710 250c kXpressCompression supported: 0 -2025/12/27-23:57:01.601712 250c kLZ4HCCompression supported: 1 -2025/12/27-23:57:01.601714 250c kZSTDNotFinalCompression supported: 1 -2025/12/27-23:57:01.601723 250c Fast CRC32 supported: Supported on x86 -2025/12/27-23:57:01.601725 250c DMutex implementation: std::mutex -2025/12/27-23:57:01.606391 250c [db\db_impl\db_impl_open.cc:317] Creating manifest 1 -2025/12/27-23:57:01.614819 250c [db\version_set.cc:5617] Recovering from manifest file: ./simplerocksdb/MANIFEST-000001 -2025/12/27-23:57:01.620268 250c [db\column_family.cc:632] --------------- Options for column family [default]: -2025/12/27-23:57:01.620286 250c Options.comparator: leveldb.BytewiseComparator -2025/12/27-23:57:01.620289 250c Options.merge_operator: None -2025/12/27-23:57:01.620291 250c Options.compaction_filter: None -2025/12/27-23:57:01.620294 250c Options.compaction_filter_factory: None -2025/12/27-23:57:01.620296 250c Options.sst_partitioner_factory: None -2025/12/27-23:57:01.620298 250c Options.memtable_factory: SkipListFactory -2025/12/27-23:57:01.620300 250c Options.table_factory: BlockBasedTable -2025/12/27-23:57:01.620324 250c table_factory options: flush_block_policy_factory: FlushBlockBySizePolicyFactory (000001DBF054C330) - cache_index_and_filter_blocks: 0 - cache_index_and_filter_blocks_with_high_priority: 1 - pin_l0_filter_and_index_blocks_in_cache: 0 - pin_top_level_index_and_filter: 1 - index_type: 0 - data_block_index_type: 0 - index_shortening: 1 - data_block_hash_table_util_ratio: 0.750000 - checksum: 4 - no_block_cache: 0 - block_cache: 000001DBF024A550 - block_cache_name: LRUCache - block_cache_options: - capacity : 8388608 - num_shard_bits : 4 - strict_capacity_limit : 0 - memory_allocator : None - high_pri_pool_ratio: 0.000 - low_pri_pool_ratio: 0.000 - block_cache_compressed: 0000000000000000 - persistent_cache: 0000000000000000 - block_size: 4096 - block_size_deviation: 10 - block_restart_interval: 16 - index_block_restart_interval: 1 - metadata_block_size: 4096 - partition_filters: 0 - use_delta_encoding: 1 - filter_policy: nullptr - whole_key_filtering: 1 - verify_compression: 0 - read_amp_bytes_per_bit: 0 - format_version: 5 - enable_index_compression: 1 - block_align: 0 - max_auto_readahead_size: 262144 - prepopulate_block_cache: 0 - initial_auto_readahead_size: 8192 - num_file_reads_for_auto_readahead: 2 -2025/12/27-23:57:01.620326 250c Options.write_buffer_size: 67108864 -2025/12/27-23:57:01.620329 250c Options.max_write_buffer_number: 2 -2025/12/27-23:57:01.620331 250c Options.compression: Snappy -2025/12/27-23:57:01.620333 250c Options.bottommost_compression: Disabled -2025/12/27-23:57:01.620335 250c Options.prefix_extractor: nullptr -2025/12/27-23:57:01.620337 250c Options.memtable_insert_with_hint_prefix_extractor: nullptr -2025/12/27-23:57:01.620340 250c Options.num_levels: 7 -2025/12/27-23:57:01.620342 250c Options.min_write_buffer_number_to_merge: 1 -2025/12/27-23:57:01.620344 250c Options.max_write_buffer_number_to_maintain: 0 -2025/12/27-23:57:01.620346 250c Options.max_write_buffer_size_to_maintain: 0 -2025/12/27-23:57:01.620348 250c Options.bottommost_compression_opts.window_bits: -14 -2025/12/27-23:57:01.620350 250c Options.bottommost_compression_opts.level: 32767 -2025/12/27-23:57:01.620352 250c Options.bottommost_compression_opts.strategy: 0 -2025/12/27-23:57:01.620355 250c Options.bottommost_compression_opts.max_dict_bytes: 0 -2025/12/27-23:57:01.620357 250c Options.bottommost_compression_opts.zstd_max_train_bytes: 0 -2025/12/27-23:57:01.620359 250c Options.bottommost_compression_opts.parallel_threads: 1 -2025/12/27-23:57:01.620361 250c Options.bottommost_compression_opts.enabled: false -2025/12/27-23:57:01.620363 250c Options.bottommost_compression_opts.max_dict_buffer_bytes: 0 -2025/12/27-23:57:01.620366 250c Options.bottommost_compression_opts.use_zstd_dict_trainer: true -2025/12/27-23:57:01.620368 250c Options.compression_opts.window_bits: -14 -2025/12/27-23:57:01.620370 250c Options.compression_opts.level: 32767 -2025/12/27-23:57:01.620372 250c Options.compression_opts.strategy: 0 -2025/12/27-23:57:01.620374 250c Options.compression_opts.max_dict_bytes: 0 -2025/12/27-23:57:01.620379 250c Options.compression_opts.zstd_max_train_bytes: 0 -2025/12/27-23:57:01.620383 250c Options.compression_opts.use_zstd_dict_trainer: true -2025/12/27-23:57:01.620385 250c Options.compression_opts.parallel_threads: 1 -2025/12/27-23:57:01.620387 250c Options.compression_opts.enabled: false -2025/12/27-23:57:01.620389 250c Options.compression_opts.max_dict_buffer_bytes: 0 -2025/12/27-23:57:01.620391 250c Options.level0_file_num_compaction_trigger: 4 -2025/12/27-23:57:01.620394 250c Options.level0_slowdown_writes_trigger: 20 -2025/12/27-23:57:01.620396 250c Options.level0_stop_writes_trigger: 36 -2025/12/27-23:57:01.620398 250c Options.target_file_size_base: 67108864 -2025/12/27-23:57:01.620400 250c Options.target_file_size_multiplier: 1 -2025/12/27-23:57:01.620402 250c Options.max_bytes_for_level_base: 268435456 -2025/12/27-23:57:01.620404 250c Options.level_compaction_dynamic_level_bytes: 0 -2025/12/27-23:57:01.620407 250c Options.max_bytes_for_level_multiplier: 10.000000 -2025/12/27-23:57:01.620409 250c Options.max_bytes_for_level_multiplier_addtl[0]: 1 -2025/12/27-23:57:01.620412 250c Options.max_bytes_for_level_multiplier_addtl[1]: 1 -2025/12/27-23:57:01.620414 250c Options.max_bytes_for_level_multiplier_addtl[2]: 1 -2025/12/27-23:57:01.620416 250c Options.max_bytes_for_level_multiplier_addtl[3]: 1 -2025/12/27-23:57:01.620419 250c Options.max_bytes_for_level_multiplier_addtl[4]: 1 -2025/12/27-23:57:01.620421 250c Options.max_bytes_for_level_multiplier_addtl[5]: 1 -2025/12/27-23:57:01.620423 250c Options.max_bytes_for_level_multiplier_addtl[6]: 1 -2025/12/27-23:57:01.620425 250c Options.max_sequential_skip_in_iterations: 8 -2025/12/27-23:57:01.620427 250c Options.max_compaction_bytes: 1677721600 -2025/12/27-23:57:01.620429 250c Options.ignore_max_compaction_bytes_for_input: true -2025/12/27-23:57:01.620431 250c Options.arena_block_size: 1048576 -2025/12/27-23:57:01.620434 250c Options.soft_pending_compaction_bytes_limit: 68719476736 -2025/12/27-23:57:01.620436 250c Options.hard_pending_compaction_bytes_limit: 274877906944 -2025/12/27-23:57:01.620438 250c Options.disable_auto_compactions: 0 -2025/12/27-23:57:01.620441 250c Options.compaction_style: kCompactionStyleLevel -2025/12/27-23:57:01.620443 250c Options.compaction_pri: kMinOverlappingRatio -2025/12/27-23:57:01.620446 250c Options.compaction_options_universal.size_ratio: 1 -2025/12/27-23:57:01.620448 250c Options.compaction_options_universal.min_merge_width: 2 -2025/12/27-23:57:01.620450 250c Options.compaction_options_universal.max_merge_width: 4294967295 -2025/12/27-23:57:01.620452 250c Options.compaction_options_universal.max_size_amplification_percent: 200 -2025/12/27-23:57:01.620454 250c Options.compaction_options_universal.compression_size_percent: -1 -2025/12/27-23:57:01.620457 250c Options.compaction_options_universal.stop_style: kCompactionStopStyleTotalSize -2025/12/27-23:57:01.620459 250c Options.compaction_options_fifo.max_table_files_size: 1073741824 -2025/12/27-23:57:01.620461 250c Options.compaction_options_fifo.allow_compaction: 0 -2025/12/27-23:57:01.620465 250c Options.table_properties_collectors: -2025/12/27-23:57:01.620469 250c Options.inplace_update_support: 0 -2025/12/27-23:57:01.620471 250c Options.inplace_update_num_locks: 10000 -2025/12/27-23:57:01.620473 250c Options.memtable_prefix_bloom_size_ratio: 0.000000 -2025/12/27-23:57:01.620476 250c Options.memtable_whole_key_filtering: 0 -2025/12/27-23:57:01.620478 250c Options.memtable_huge_page_size: 0 -2025/12/27-23:57:01.620480 250c Options.bloom_locality: 0 -2025/12/27-23:57:01.620482 250c Options.max_successive_merges: 0 -2025/12/27-23:57:01.620484 250c Options.optimize_filters_for_hits: 0 -2025/12/27-23:57:01.620514 250c Options.paranoid_file_checks: 0 -2025/12/27-23:57:01.620517 250c Options.force_consistency_checks: 1 -2025/12/27-23:57:01.620519 250c Options.report_bg_io_stats: 0 -2025/12/27-23:57:01.620522 250c Options.ttl: 2592000 -2025/12/27-23:57:01.620524 250c Options.periodic_compaction_seconds: 0 -2025/12/27-23:57:01.620526 250c Options.preclude_last_level_data_seconds: 0 -2025/12/27-23:57:01.620528 250c Options.preserve_internal_time_seconds: 0 -2025/12/27-23:57:01.620530 250c Options.enable_blob_files: false -2025/12/27-23:57:01.620532 250c Options.min_blob_size: 0 -2025/12/27-23:57:01.620534 250c Options.blob_file_size: 268435456 -2025/12/27-23:57:01.620537 250c Options.blob_compression_type: NoCompression -2025/12/27-23:57:01.620539 250c Options.enable_blob_garbage_collection: false -2025/12/27-23:57:01.620541 250c Options.blob_garbage_collection_age_cutoff: 0.250000 -2025/12/27-23:57:01.620544 250c Options.blob_garbage_collection_force_threshold: 1.000000 -2025/12/27-23:57:01.620546 250c Options.blob_compaction_readahead_size: 0 -2025/12/27-23:57:01.620548 250c Options.blob_file_starting_level: 0 -2025/12/27-23:57:01.620551 250c Options.experimental_mempurge_threshold: 0.000000 -2025/12/27-23:57:01.621616 250c [db\version_set.cc:5668] Recovered from manifest file:./simplerocksdb/MANIFEST-000001 succeeded,manifest_file_number is 1, next_file_number is 3, last_sequence is 0, log_number is 0,prev_log_number is 0,max_column_family is 0,min_log_number_to_keep is 0 -2025/12/27-23:57:01.621625 250c [db\version_set.cc:5677] Column family [default] (ID 0), log number is 0 -2025/12/27-23:57:01.626674 250c [db\db_impl\db_impl_open.cc:539] DB ID: ae0b527f-e33c-11f0-9222-b047e949a510 -2025/12/27-23:57:01.627725 250c [db\version_set.cc:5135] Creating manifest 5 -2025/12/27-23:57:01.646496 250c [db\db_impl\db_impl_open.cc:1992] SstFileManager instance 000001DBF05D1DC0 -2025/12/27-23:57:01.646737 250c DB pointer 000001DBF0563E00 -2025/12/27-23:57:01.647031 3c68 [db\db_impl\db_impl.cc:1110] ------- DUMPING STATS ------- -2025/12/27-23:57:01.647043 3c68 [db\db_impl\db_impl.cc:1111] -** DB Stats ** -Uptime(secs): 0.0 total, 0.0 interval -Cumulative writes: 0 writes, 0 keys, 0 commit groups, 0.0 writes per commit group, ingest: 0.00 GB, 0.00 MB/s -Cumulative WAL: 0 writes, 0 syncs, 0.00 writes per sync, written: 0.00 GB, 0.00 MB/s -Cumulative stall: 00:00:0.000 H:M:S, 0.0 percent -Interval writes: 0 writes, 0 keys, 0 commit groups, 0.0 writes per commit group, ingest: 0.00 MB, 0.00 MB/s -Interval WAL: 0 writes, 0 syncs, 0.00 writes per sync, written: 0.00 GB, 0.00 MB/s -Interval stall: 00:00:0.000 H:M:S, 0.0 percent - -** Compaction Stats [default] ** -Level Files Size Score Read(GB) Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB) ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Sum 0/0 0.00 KB 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.00 0.00 0 0.000 0 0 0.0 0.0 - Int 0/0 0.00 KB 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.00 0.00 0 0.000 0 0 0.0 0.0 - -** Compaction Stats [default] ** -Priority Files Size Score Read(GB) Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB) ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - -Blob file count: 0, total size: 0.0 GB, garbage size: 0.0 GB, space amp: 0.0 - -Uptime(secs): 0.0 total, 0.0 interval -Flush(GB): cumulative 0.000, interval 0.000 -AddFile(GB): cumulative 0.000, interval 0.000 -AddFile(Total Files): cumulative 0, interval 0 -AddFile(L0 Files): cumulative 0, interval 0 -AddFile(Keys): cumulative 0, interval 0 -Cumulative compaction: 0.00 GB write, 0.00 MB/s write, 0.00 GB read, 0.00 MB/s read, 0.0 seconds -Interval compaction: 0.00 GB write, 0.00 MB/s write, 0.00 GB read, 0.00 MB/s read, 0.0 seconds -Stalls(count): 0 level0_slowdown, 0 level0_slowdown_with_compaction, 0 level0_numfiles, 0 level0_numfiles_with_compaction, 0 stop for pending_compaction_bytes, 0 slowdown for pending_compaction_bytes, 0 memtable_compaction, 0 memtable_slowdown, interval 0 total count -Block cache LRUCache@000001DBF024A550#20636 capacity: 8.00 MB usage: 0.08 KB table_size: 256 occupancy: 87 collections: 1 last_copies: 0 last_secs: 3.2e-05 secs_since: 0 -Block cache entry stats(count,size,portion): Misc(1,0.00 KB,0%) - -** File Read Latency Histogram By Level [default] ** -2025/12/27-23:57:01.648832 250c [db\db_impl\db_impl.cc:497] Shutdown: canceling all background work -2025/12/27-23:57:01.652642 250c [db\db_impl\db_impl.cc:704] Shutdown complete diff --git a/simplerocksdb/MANIFEST-000010 b/simplerocksdb/MANIFEST-000010 deleted file mode 100644 index 843b156f4f714ff389f0ae7d5a2d926a6db9c070..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 172 zcmZS8)^KKEU<~`Yer`cxQDRAc(HCZ(C>91r zCI%J;;a~Gu*%%l(8JM|P7``rFJ0%1t&I;x-rZaH7X6GqMEiPeXWB>yc4kIHg>#fNL zPO-5uGVn4ovNLe7hvsGH<(KF22r%v0yhWhxLf+4hoqDP==Pom@3e0`U0k(&mi Date: Wed, 31 Dec 2025 03:14:02 +0800 Subject: [PATCH 2/2] =?UTF-8?q?1.=E5=AE=9E=E7=8E=B0=E4=BA=86=E7=94=A8?= =?UTF-8?q?=E6=88=B7Put=E6=95=B0=E6=8D=AE=E7=9A=84=E6=97=B6=E5=80=99?= =?UTF-8?q?=EF=BC=8C=E8=8A=82=E7=82=B9=E7=9A=84=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?=E5=88=86=E5=8F=91=E6=93=8D=E4=BD=9C,2.=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E9=A1=BA=E5=BA=8F=20master->storage->client=203.=E4=B9=8B?= =?UTF-8?q?=E5=90=8E=E9=9C=80=E8=A6=81=E5=9C=A8=E4=BF=9D=E6=8C=81=E5=89=AF?= =?UTF-8?q?=E6=9C=AC=E6=95=B0=E9=87=8F=E4=B8=8D=E5=8F=98=E7=9A=84=E5=89=8D?= =?UTF-8?q?=E6=8F=90=E4=B8=8B=EF=BC=8C=E8=80=83=E8=99=91=E9=9B=86=E7=BE=A4?= =?UTF-8?q?=E7=9A=84=E8=8A=82=E7=82=B9=E7=9A=84=E5=8F=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/dkv/dkvclient/client/DkvClient.java | 146 ++++++++++++------ .../controller/ClientController.java | 4 +- dkv-common/pom.xml | 15 +- .../dkv/dkvmaster/DkvMasterApplication.java | 34 +--- .../controller/MasterController.java | 50 ++++-- .../service/NodeOrchestrationService.java | 4 + .../src/main/resources/static/index.html | 141 +++++++---------- dkv-storage/pom.xml | 12 +- .../com/dkv/dkvstorage/ConsistencyTest.java | 3 +- .../dkv/dkvstorage/agent/DataNodeAgent.java | 1 + .../controller/AgentController.java | 97 +++++++++++- .../com/dkv/dkvstorage/rocksdb/DataNode.java | 60 +++++-- .../dkvstorage/rocksdb/DataNodeManager.java | 17 +- .../dkvstorage/rocksdb/DkvServerHandler.java | 55 ++++--- .../rocksdb/ReplicationService.java | 90 +++++------ .../service/DataNodeAgentService.java | 83 +++++++--- 16 files changed, 510 insertions(+), 302 deletions(-) diff --git a/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java b/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java index 55ac4e4..3244376 100644 --- a/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java +++ b/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java @@ -2,9 +2,11 @@ import com.dkv.dkvcommon.model.KvMessage; import com.dkv.dkvstorage.KvClientHandler; +import com.dkv.dkvstorage.rocksdb.DataNode; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; @@ -13,6 +15,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.web.client.RestTemplate; import java.util.ArrayList; @@ -27,6 +31,7 @@ public class DkvClient { private final String zkAddress; // ZooKeeper 地址 private final List nodes = new ArrayList<>(); private CuratorFramework zkClient; + private static final Logger logger = LoggerFactory.getLogger(DataNode.class); public DkvClient(String zkAddress) { this.zkAddress = zkAddress; @@ -76,17 +81,44 @@ private String getTargetIp(String key) { if (response == null || !response.containsKey("primary")) { return null; } - + logger.warn((String) response.get("primary")+"上的key:"+key+"已经被删除了"); return (String) response.get("primary"); } + private Map getRouteData(String key,Integer repeat) { + String url = "http://localhost:8081/api/route?key=" + key+"&replicas=" + repeat; + + RestTemplate restTemplate = new RestTemplate(); + Map response = restTemplate.getForObject(url, Map.class); + + if (response == null || !response.containsKey("primary")) { + return null; + } + + // 直接返回解析后的 Map,或者你可以自己在这里过滤一下只返回这两个字段 + return response; + } /** PUT 操作 */ - public String put(String key, byte[] value) throws InterruptedException { + public String put(String key, byte[] value,Integer repeat) throws InterruptedException { KvMessage message = new KvMessage(KvMessage.Type.PUT, key, value); - String primaryNode = getTargetIp(key); - System.out.println("向节点{}发送请求"+primaryNode); - sendRequest(primaryNode, message); - return primaryNode; + Map data = getRouteData(key,repeat); + if (data != null) { + String primary = (String) data.get("primary"); + List replicas = (List) data.get("replicas"); // 需要强转 + logger.info("当前的replicas "+replicas); + String replicasParam = String.join(",", replicas); + String url = "http://localhost:8085/agent/set?nodeId=" +primary +"&replicas=" + replicasParam; + + RestTemplate restTemplate = new RestTemplate(); + Map response1 = restTemplate.getForObject(url, Map.class); + logger.info("向节点 "+primary+"发送请求"); + sendRequest(primary, message); + return primary; + // ... + }else{ + return "error"; + } + } /** GET 操作 */ @@ -103,57 +135,71 @@ public void delete(String key) throws InterruptedException { } /** 使用 Netty 发送请求并返回响应 */ - private KvMessage sendRequest(String nodeIp, KvMessage request) throws InterruptedException { + private static final EventLoopGroup workerGroup = new NioEventLoopGroup(); + + private KvMessage sendRequest(String nodeIp, KvMessage request) { String[] parts = nodeIp.split(":"); String host = parts[0]; int port = Integer.parseInt(parts[1]); - EventLoopGroup group = new NioEventLoopGroup(); - try { - Bootstrap b = new Bootstrap(); - ClientHandler handler = new ClientHandler(); - b.group(group) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer<>() { - @Override - protected void initChannel(Channel ch) { - ch.pipeline().addLast( - new ObjectEncoder(), - new ObjectDecoder(ClassResolvers.cacheDisabled(null)), - handler - ); - } - }); - ChannelFuture future = b.connect(host, port).sync(); - CompletableFuture responseFuture = new CompletableFuture<>(); - -// 添加 handler 时,把 responseFuture 传入 handler - future.channel().pipeline().addLast(new KvClientHandler(responseFuture)); - -// 发送请求 - future.channel().writeAndFlush(request).sync(); - System.out.println("Request sent, waiting for response..."); - - try { - // 等待响应,最多 5 秒 - KvMessage response = responseFuture.get(1, TimeUnit.SECONDS); - System.out.println("Got response: " + response); - } catch (TimeoutException e) { - System.out.println("Response timed out, closing channel..."); - } catch (Exception e) { - e.printStackTrace(); - } finally { - // 超时或正常都要安全关闭 channel - future.channel().close().sync(); - future.channel().closeFuture().sync(); - } - + // 1. 创建 Future 用于接收结果 + CompletableFuture responseFuture = new CompletableFuture<>(); + + Bootstrap b = new Bootstrap(); + b.group(workerGroup) // 复用全局线程池 + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) // 连接超时 + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast( + new ObjectEncoder(), + new ObjectDecoder(ClassResolvers.cacheDisabled(null)), + // 2. 直接在这里把 future 传给 Handler + new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, KvMessage msg) { + // 收到消息,完成 future + responseFuture.complete(msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // 发生异常,通知 future 失败 + responseFuture.completeExceptionally(cause); + ctx.close(); + } + } + ); + } + }); - return handler.getResponse(); + Channel channel = null; + try { + // 连接 + ChannelFuture connectFuture = b.connect(host, port).sync(); + channel = connectFuture.channel(); + + // 发送请求 + System.out.println("Request sent to " + nodeIp + ", waiting for response..."); + channel.writeAndFlush(request); + + // 3. 等待结果 (等待 Handler 调用 complete) + KvMessage response = responseFuture.get(5, TimeUnit.SECONDS); + System.out.println("Got response: " + response); + return response; + + } catch (TimeoutException e) { + System.out.println("Response timed out from " + nodeIp); + throw new RuntimeException("Request timeout", e); } catch (Exception e) { - throw new RuntimeException(e); + e.printStackTrace(); + throw new RuntimeException("Request failed: " + e.getMessage(), e); } finally { - group.shutdownGracefully(); + // 4. 关闭连接 (注意:不要关闭 workerGroup,只关闭 channel) + if (channel != null) { + channel.close(); + } } } public byte[] testIP(String IP, String key) throws InterruptedException { diff --git a/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java b/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java index 96dad30..a192b78 100644 --- a/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java +++ b/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java @@ -27,9 +27,9 @@ public void init() { // 保存键值 // curl -x "" -X POST "http://127.0.0.1:8082/api/kv/save?key=name&value=hty" @PostMapping("/save") - public String save(@RequestParam String key, @RequestParam String value) { + public String save(@RequestParam String key, @RequestParam String value, @RequestParam Integer repeat) { try { - client.put(key, value.getBytes()); + client.put(key, value.getBytes(),repeat); return "Saved key: " + key + ", value: " + value; } catch (Exception e) { e.printStackTrace(); diff --git a/dkv-common/pom.xml b/dkv-common/pom.xml index 2938538..c9ec2a8 100644 --- a/dkv-common/pom.xml +++ b/dkv-common/pom.xml @@ -38,12 +38,23 @@ - + + + + + + com.fasterxml.jackson.core jackson-databind - + + + + org.springdoc + springdoc-openapi-starter-webmvc-ui + 2.2.0 + org.springframework.boot spring-boot-starter-test diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java b/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java index a8fecb3..63dbb97 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java @@ -1,42 +1,12 @@ package com.dkv.dkvmaster; -import com.dkv.dkvmaster.cluster.ClusterManager; -import com.dkv.dkvmaster.router.ConsistentHashRouter; -import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import java.util.Scanner; - @SpringBootApplication(scanBasePackages = {"com.dkv.dkvmaster", "com.dkv.dkvstorage"}) -public class DkvMasterApplication implements CommandLineRunner { - - private final ClusterManager clusterManager; - private final ConsistentHashRouter router; - - public DkvMasterApplication() { - this.router = new ConsistentHashRouter(); - this.clusterManager = new ClusterManager(router); - } +public class DkvMasterApplication { public static void main(String[] args) { SpringApplication.run(DkvMasterApplication.class, args); } - - @Override - public void run(String... args) throws Exception { - clusterManager.start(); - - System.out.println("输入任何字符串测试路由 (输入 'quit' 退出):"); - Scanner scanner = new Scanner(System.in); - while (scanner.hasNext()) { - String key = scanner.next(); - if ("quit".equals(key)) { - break; - } - - String targetNode = router.routeNode(key); - System.out.println("Key [" + key + "] ---> Node [" + targetNode + "]"); - } - } -} +} \ No newline at end of file diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java b/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java index d5715b3..2058ee3 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java @@ -4,6 +4,9 @@ import com.dkv.dkvmaster.dto.StartNodeRequest; import com.dkv.dkvmaster.router.ConsistentHashRouter; import com.dkv.dkvmaster.service.NodeOrchestrationService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @@ -12,6 +15,7 @@ @RestController @RequestMapping("/api") @CrossOrigin // 允许跨域访问 +@Tag(name = "Master节点管理接口", description = "提供节点上下线、路由查询及集群编排功能") // 替代 @Api public class MasterController { private final ConsistentHashRouter router; @@ -25,40 +29,54 @@ public MasterController(ConsistentHashRouter router, ClusterManager clusterManag } // 查看当前所有在线节点 - @GetMapping("/home") - public String home() { - return "here is home"; - } @GetMapping("/nodes") + @Operation(summary = "查看在线节点", description = "返回当前集群中所有在线节点的列表") // 替代 @ApiOperation public List getOnlineNodes() { return clusterManager.getOnlineNodes(); } + @GetMapping("/add") - public void addNodes(@RequestParam("nodeip") String nodeIp,@RequestParam("port")Integer port){ - router.addNode(nodeIp,port); + @Operation(summary = "手动添加节点到路由(仅内存)", description = "仅更新本地路由表,不涉及ZK交互") + public void addNodes( + @Parameter(description = "节点IP地址", required = true, example = "192.168.1.10") @RequestParam("nodeip") String nodeIp, // 替代 @ApiParam + @Parameter(description = "节点端口", required = true, example = "8081") @RequestParam("port") Integer port) { + router.addNode(nodeIp, port); } - @PostMapping("/addtozk") - public void addNodes2ZK(@RequestParam("nodeip") String nodeIp, @RequestParam("port")Integer port ) throws Exception { - clusterManager.addNodeToZk(nodeIp,port); + @PostMapping("/addtozk") + @Operation(summary = "添加节点到Zookeeper", description = "将新节点注册到Zookeeper集群管理中") + public void addNodes2ZK( + @Parameter(description = "节点IP地址", required = true) @RequestParam("nodeip") String nodeIp, + @Parameter(description = "节点端口", required = true) @RequestParam("port") Integer port) throws Exception { + clusterManager.addNodeToZk(nodeIp, port); } - @PostMapping("/delete") - public void deleteNode(@RequestParam("nodeip") String nodeIp,@RequestParam("port") Integer port) throws Exception { + @PostMapping("/delete") + @Operation(summary = "节点下线", description = "将指定节点从集群中移除") + public void deleteNode( + @Parameter(description = "节点IP地址", required = true) @RequestParam("nodeip") String nodeIp, + @Parameter(description = "节点端口", required = true) @RequestParam("port") Integer port) throws Exception { + String nodeId = nodeIp+":"+port; + orchestrationService.stop(nodeId); clusterManager.offlineNode(nodeIp, port); } + @GetMapping("/debug/ring") + @Operation(summary = "Debug: 查看哈希环", description = "打印当前一致性哈希环中所有去重后的物理节点信息") public Object getRing() { - // 打印当前环中所有去重后的物理节点 return router.getRings(); } + // 查询 Key 的路由信息 @GetMapping("/route") - public Map getRoute(@RequestParam String key, @RequestParam(defaultValue = "3") int replicas) { + @Operation(summary = "查询Key路由信息", description = "计算指定Key在一致性哈希环上的主节点及副本节点") + public Map getRoute( + @Parameter(description = "存储的Key", required = true) @RequestParam String key, + @Parameter(description = "需要的副本数量") @RequestParam(defaultValue = "3") int replicas) { List targets = router.routeNodeWithReplicas(key, replicas); Map response = new HashMap<>(); response.put("key", key); - response.put("hash", String.format("0x%08X", key.hashCode())); // 展示哈希值 + response.put("hash", String.format("0x%08X", key.hashCode())); response.put("primary", targets.isEmpty() ? "None" : targets.get(0)); response.put("secondary", targets.size() > 1 ? targets.subList(1, targets.size()) : Collections.emptyList()); response.put("replicas", targets.size() <= 1 ? Collections.emptyList() : targets.subList(1, targets.size())); @@ -66,7 +84,9 @@ public Map getRoute(@RequestParam String key, @RequestParam(defa } @PostMapping("/datanode/start-and-join-zk") - public ResponseEntity> startAndJoinZk(@RequestBody StartNodeRequest request) { + @Operation(summary = "启动DataNode并加入ZK", description = "本地启动DataNode进程并自动注册") + public ResponseEntity> startAndJoinZk( + @Parameter(description = "启动请求参数实体", required = true) @RequestBody StartNodeRequest request) { try { Map res = orchestrationService.startAndRegister(request); boolean ok = Boolean.TRUE.equals(res.get("success")); diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java b/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java index 9ebc0da..f7b13bc 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java @@ -28,7 +28,11 @@ public NodeOrchestrationService( this.clusterManager = clusterManager; this.router = router; } + public boolean stop(String nodeId){ + boolean b = dataNodeManager.stopDataNode(nodeId); + return b; + } public Map startAndRegister(StartNodeRequest req) throws Exception { String nodeId = req.getNodeId(); String host = req.getHost(); diff --git a/dkv-master/src/main/resources/static/index.html b/dkv-master/src/main/resources/static/index.html index 65f9187..fe75552 100644 --- a/dkv-master/src/main/resources/static/index.html +++ b/dkv-master/src/main/resources/static/index.html @@ -80,14 +80,21 @@
⚡ 数据操作控制台
-
- - +
+
+ + +
+
+ + +
+
@@ -97,7 +104,12 @@
⚡ 数据操作控制台
- +
+ + +
> Waiting for instructions... @@ -153,12 +165,19 @@
🔍 路由模拟追踪