diff --git a/dkv-client/src/main/java/com/dkv/dkvclient/DkvClientApplication.java b/dkv-client/src/main/java/com/dkv/dkvclient/DkvClientApplication.java index 4ab5c12..245becc 100644 --- a/dkv-client/src/main/java/com/dkv/dkvclient/DkvClientApplication.java +++ b/dkv-client/src/main/java/com/dkv/dkvclient/DkvClientApplication.java @@ -13,12 +13,12 @@ public class DkvClientApplication { public static void main(String[] args) { // 启动 Spring Boot Web 应用 DkvClient client = new DkvClient("127.0.0.1:2181"); - try { - client.put("name", "hty".getBytes()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - System.out.println("Put finished"); +// try { +// client.put("name", "hty".getBytes()); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// System.out.println("Put finished"); SpringApplication.run(DkvClientApplication.class, args); } diff --git a/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java b/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java index 94ec6e1..3244376 100644 --- a/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java +++ b/dkv-client/src/main/java/com/dkv/dkvclient/client/DkvClient.java @@ -2,9 +2,11 @@ import com.dkv.dkvcommon.model.KvMessage; import com.dkv.dkvstorage.KvClientHandler; +import com.dkv.dkvstorage.rocksdb.DataNode; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; @@ -13,9 +15,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.client.RestTemplate; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -25,6 +31,7 @@ public class DkvClient { private final String zkAddress; // ZooKeeper 地址 private final List nodes = new ArrayList<>(); private CuratorFramework zkClient; + private static final Logger logger = LoggerFactory.getLogger(DataNode.class); public DkvClient(String zkAddress) { this.zkAddress = zkAddress; @@ -66,20 +73,52 @@ private void updateNodes() throws Exception { /** 简单一致性哈希计算目标节点 */ private String getTargetIp(String key) { - synchronized (nodes) { - if (nodes.isEmpty()) { - throw new RuntimeException("No available nodes in ZooKeeper!"); - } - int hash = Math.abs(key.hashCode()); - int idx = hash % nodes.size(); - return nodes.get(idx); + String url = "http://localhost:8081/api/route?key=" + key; + + RestTemplate restTemplate = new RestTemplate(); + Map response = restTemplate.getForObject(url, Map.class); + + if (response == null || !response.containsKey("primary")) { + return null; + } + logger.warn((String) response.get("primary")+"上的key:"+key+"已经被删除了"); + return (String) response.get("primary"); + } + private Map getRouteData(String key,Integer repeat) { + String url = "http://localhost:8081/api/route?key=" + key+"&replicas=" + repeat; + + RestTemplate restTemplate = new RestTemplate(); + Map response = restTemplate.getForObject(url, Map.class); + + if (response == null || !response.containsKey("primary")) { + return null; } + + // 直接返回解析后的 Map,或者你可以自己在这里过滤一下只返回这两个字段 + return response; } /** PUT 操作 */ - public void put(String key, byte[] value) throws InterruptedException { + public String put(String key, byte[] value,Integer repeat) throws InterruptedException { KvMessage message = new KvMessage(KvMessage.Type.PUT, key, value); - sendRequest(getTargetIp(key), message); + Map data = getRouteData(key,repeat); + if (data != null) { + String primary = (String) data.get("primary"); + List replicas = (List) data.get("replicas"); // 需要强转 + logger.info("当前的replicas "+replicas); + String replicasParam = String.join(",", replicas); + String url = "http://localhost:8085/agent/set?nodeId=" +primary +"&replicas=" + replicasParam; + + RestTemplate restTemplate = new RestTemplate(); + Map response1 = restTemplate.getForObject(url, Map.class); + logger.info("向节点 "+primary+"发送请求"); + sendRequest(primary, message); + return primary; + // ... + }else{ + return "error"; + } + } /** GET 操作 */ @@ -96,57 +135,76 @@ public void delete(String key) throws InterruptedException { } /** 使用 Netty 发送请求并返回响应 */ - private KvMessage sendRequest(String nodeIp, KvMessage request) throws InterruptedException { + private static final EventLoopGroup workerGroup = new NioEventLoopGroup(); + + private KvMessage sendRequest(String nodeIp, KvMessage request) { String[] parts = nodeIp.split(":"); String host = parts[0]; int port = Integer.parseInt(parts[1]); - EventLoopGroup group = new NioEventLoopGroup(); - try { - Bootstrap b = new Bootstrap(); - ClientHandler handler = new ClientHandler(); - b.group(group) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer<>() { - @Override - protected void initChannel(Channel ch) { - ch.pipeline().addLast( - new ObjectEncoder(), - new ObjectDecoder(ClassResolvers.cacheDisabled(null)), - handler - ); - } - }); - ChannelFuture future = b.connect(host, port).sync(); - CompletableFuture responseFuture = new CompletableFuture<>(); - -// 添加 handler 时,把 responseFuture 传入 handler - future.channel().pipeline().addLast(new KvClientHandler(responseFuture)); - -// 发送请求 - future.channel().writeAndFlush(request).sync(); - System.out.println("Request sent, waiting for response..."); - - try { - // 等待响应,最多 5 秒 - KvMessage response = responseFuture.get(1, TimeUnit.SECONDS); - System.out.println("Got response: " + response); - } catch (TimeoutException e) { - System.out.println("Response timed out, closing channel..."); - } catch (Exception e) { - e.printStackTrace(); - } finally { - // 超时或正常都要安全关闭 channel - future.channel().close().sync(); - future.channel().closeFuture().sync(); - } - + // 1. 创建 Future 用于接收结果 + CompletableFuture responseFuture = new CompletableFuture<>(); + + Bootstrap b = new Bootstrap(); + b.group(workerGroup) // 复用全局线程池 + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) // 连接超时 + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast( + new ObjectEncoder(), + new ObjectDecoder(ClassResolvers.cacheDisabled(null)), + // 2. 直接在这里把 future 传给 Handler + new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, KvMessage msg) { + // 收到消息,完成 future + responseFuture.complete(msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // 发生异常,通知 future 失败 + responseFuture.completeExceptionally(cause); + ctx.close(); + } + } + ); + } + }); - return handler.getResponse(); + Channel channel = null; + try { + // 连接 + ChannelFuture connectFuture = b.connect(host, port).sync(); + channel = connectFuture.channel(); + + // 发送请求 + System.out.println("Request sent to " + nodeIp + ", waiting for response..."); + channel.writeAndFlush(request); + + // 3. 等待结果 (等待 Handler 调用 complete) + KvMessage response = responseFuture.get(5, TimeUnit.SECONDS); + System.out.println("Got response: " + response); + return response; + + } catch (TimeoutException e) { + System.out.println("Response timed out from " + nodeIp); + throw new RuntimeException("Request timeout", e); } catch (Exception e) { - throw new RuntimeException(e); + e.printStackTrace(); + throw new RuntimeException("Request failed: " + e.getMessage(), e); } finally { - group.shutdownGracefully(); + // 4. 关闭连接 (注意:不要关闭 workerGroup,只关闭 channel) + if (channel != null) { + channel.close(); + } } } + public byte[] testIP(String IP, String key) throws InterruptedException { + KvMessage request = new KvMessage(KvMessage.Type.GET, key, null); + KvMessage response = sendRequest(IP, request); + return response != null ? response.getValue() : null; + } } diff --git a/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java b/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java index b269e16..a192b78 100644 --- a/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java +++ b/dkv-client/src/main/java/com/dkv/dkvclient/controller/ClientController.java @@ -27,9 +27,9 @@ public void init() { // 保存键值 // curl -x "" -X POST "http://127.0.0.1:8082/api/kv/save?key=name&value=hty" @PostMapping("/save") - public String save(@RequestParam String key, @RequestParam String value) { + public String save(@RequestParam String key, @RequestParam String value, @RequestParam Integer repeat) { try { - client.put(key, value.getBytes()); + client.put(key, value.getBytes(),repeat); return "Saved key: " + key + ", value: " + value; } catch (Exception e) { e.printStackTrace(); @@ -65,4 +65,17 @@ public String delete(@RequestParam String key) { return "Failed to delete key: " + key + ", error: " + e.getMessage(); } } + @GetMapping("/test") + public String testIp(@RequestParam String IP, @RequestParam String key) { + try { + byte[] value = client.testIP(IP, key); + if (value == null) { + return "Key not found: " + key; + } + return new String(value); + } catch (Exception e) { + e.printStackTrace(); + return "Failed to get key: " + key + ", error: " + e.getMessage(); + } + } } diff --git a/dkv-common/pom.xml b/dkv-common/pom.xml index 7c08a79..c9ec2a8 100644 --- a/dkv-common/pom.xml +++ b/dkv-common/pom.xml @@ -1,52 +1,67 @@ - 4.0.0 + org.springframework.boot spring-boot-starter-parent - 4.0.1 - + 3.3.4 + com.dkv dkv-common 0.0.1-SNAPSHOT dkv-common - jar - dkv-common - - - - - - - - - - - - - + DKV 系统的通用工具包(哈希算法、常量、POJO) + 17 + 33.4.0-jre + + + com.google.guava + guava + ${guava.version} + org.projectlombok lombok - 1.18.24 - provided + true - + + + + + + + + + + - com.google.guava - guava - 33.4.8-jre + com.fasterxml.jackson.core + jackson-databind + + + + + org.springdoc + springdoc-openapi-starter-webmvc-ui + 2.2.0 + + + org.springframework.boot + spring-boot-starter-test + test - + @@ -59,4 +74,4 @@ - + \ No newline at end of file diff --git a/dkv-master/pom.xml b/dkv-master/pom.xml index 48e86bb..75b1e7b 100644 --- a/dkv-master/pom.xml +++ b/dkv-master/pom.xml @@ -61,6 +61,12 @@ org.springframework.boot spring-boot-starter-web + + com.dkv + dkv-storage + 0.0.1-SNAPSHOT + compile + diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java b/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java index a53403e..63dbb97 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/DkvMasterApplication.java @@ -1,42 +1,12 @@ package com.dkv.dkvmaster; -import com.dkv.dkvmaster.cluster.ClusterManager; -import com.dkv.dkvmaster.router.ConsistentHashRouter; -import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import java.util.Scanner; - -@SpringBootApplication -public class DkvMasterApplication implements CommandLineRunner { - - private final ClusterManager clusterManager; - private final ConsistentHashRouter router; - - public DkvMasterApplication() { - this.router = new ConsistentHashRouter(); - this.clusterManager = new ClusterManager(router); - } +@SpringBootApplication(scanBasePackages = {"com.dkv.dkvmaster", "com.dkv.dkvstorage"}) +public class DkvMasterApplication { public static void main(String[] args) { SpringApplication.run(DkvMasterApplication.class, args); } - - @Override - public void run(String... args) throws Exception { - clusterManager.start(); - - System.out.println("输入任何字符串测试路由 (输入 'quit' 退出):"); - Scanner scanner = new Scanner(System.in); - while (scanner.hasNext()) { - String key = scanner.next(); - if ("quit".equals(key)) { - break; - } - - String targetNode = router.routeNode(key); - System.out.println("Key [" + key + "] ---> Node [" + targetNode + "]"); - } - } -} +} \ No newline at end of file diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/cluster/ClusterManager.java b/dkv-master/src/main/java/com/dkv/dkvmaster/cluster/ClusterManager.java index 7e58227..0a9bbb3 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/cluster/ClusterManager.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/cluster/ClusterManager.java @@ -68,14 +68,14 @@ else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) { // 在 ClusterManager.java 中添加或修改 public void addNodeToZk(String nodeIp, Integer port) throws Exception { String path = "/nodes/" + nodeIp+":"+port; - DataNode node = new DataNode( - config.id, - dataDir, - config.port, - config.isPrimary, - replicaNodes, - configs.length // 复制因子即总节点数 - ); +// DataNode node = new DataNode( +// config.id, +// dataDir, +// config.port, +// config.isPrimary, +// replicaNodes, +// configs.length // 复制因子即总节点数 +// ); // 检查节点是否存在,不存在则创建 if (client.checkExists().forPath(path) == null) { diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java b/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java index 6c1eb5f..2058ee3 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/controller/MasterController.java @@ -1,60 +1,99 @@ package com.dkv.dkvmaster.controller; import com.dkv.dkvmaster.cluster.ClusterManager; +import com.dkv.dkvmaster.dto.StartNodeRequest; import com.dkv.dkvmaster.router.ConsistentHashRouter; +import com.dkv.dkvmaster.service.NodeOrchestrationService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; @RestController @RequestMapping("/api") @CrossOrigin // 允许跨域访问 +@Tag(name = "Master节点管理接口", description = "提供节点上下线、路由查询及集群编排功能") // 替代 @Api public class MasterController { private final ConsistentHashRouter router; private final ClusterManager clusterManager; + private final NodeOrchestrationService orchestrationService; - public MasterController(ConsistentHashRouter router, ClusterManager clusterManager) { + public MasterController(ConsistentHashRouter router, ClusterManager clusterManager, NodeOrchestrationService orchestrationService) { this.router = router; this.clusterManager = clusterManager; + this.orchestrationService = orchestrationService; } // 查看当前所有在线节点 - @GetMapping("/home") - public String home() { - return "here is home"; - } @GetMapping("/nodes") + @Operation(summary = "查看在线节点", description = "返回当前集群中所有在线节点的列表") // 替代 @ApiOperation public List getOnlineNodes() { return clusterManager.getOnlineNodes(); } + @GetMapping("/add") - public void addNodes(@RequestParam("nodeip") String nodeIp,@RequestParam("port")Integer port){ - router.addNode(nodeIp,port); + @Operation(summary = "手动添加节点到路由(仅内存)", description = "仅更新本地路由表,不涉及ZK交互") + public void addNodes( + @Parameter(description = "节点IP地址", required = true, example = "192.168.1.10") @RequestParam("nodeip") String nodeIp, // 替代 @ApiParam + @Parameter(description = "节点端口", required = true, example = "8081") @RequestParam("port") Integer port) { + router.addNode(nodeIp, port); } - @GetMapping("/addtozk") - public void addNodes2ZK(@RequestParam("nodeip") String nodeIp, @RequestParam("port")Integer port ) throws Exception { - clusterManager.addNodeToZk(nodeIp,port); + @PostMapping("/addtozk") + @Operation(summary = "添加节点到Zookeeper", description = "将新节点注册到Zookeeper集群管理中") + public void addNodes2ZK( + @Parameter(description = "节点IP地址", required = true) @RequestParam("nodeip") String nodeIp, + @Parameter(description = "节点端口", required = true) @RequestParam("port") Integer port) throws Exception { + clusterManager.addNodeToZk(nodeIp, port); } - @PostMapping("/delete") - public void deleteNode(@RequestParam("nodeip") String nodeIp,@RequestParam("port") Integer port) throws Exception { + @PostMapping("/delete") + @Operation(summary = "节点下线", description = "将指定节点从集群中移除") + public void deleteNode( + @Parameter(description = "节点IP地址", required = true) @RequestParam("nodeip") String nodeIp, + @Parameter(description = "节点端口", required = true) @RequestParam("port") Integer port) throws Exception { + String nodeId = nodeIp+":"+port; + orchestrationService.stop(nodeId); clusterManager.offlineNode(nodeIp, port); } + + @GetMapping("/debug/ring") + @Operation(summary = "Debug: 查看哈希环", description = "打印当前一致性哈希环中所有去重后的物理节点信息") + public Object getRing() { + return router.getRings(); + } + // 查询 Key 的路由信息 @GetMapping("/route") - public Map getRoute(@RequestParam String key, @RequestParam(defaultValue = "3") int replicas) { + @Operation(summary = "查询Key路由信息", description = "计算指定Key在一致性哈希环上的主节点及副本节点") + public Map getRoute( + @Parameter(description = "存储的Key", required = true) @RequestParam String key, + @Parameter(description = "需要的副本数量") @RequestParam(defaultValue = "3") int replicas) { List targets = router.routeNodeWithReplicas(key, replicas); Map response = new HashMap<>(); response.put("key", key); - response.put("hash", String.format("0x%08X", key.hashCode())); // 展示哈希值 + response.put("hash", String.format("0x%08X", key.hashCode())); response.put("primary", targets.isEmpty() ? "None" : targets.get(0)); response.put("secondary", targets.size() > 1 ? targets.subList(1, targets.size()) : Collections.emptyList()); response.put("replicas", targets.size() <= 1 ? Collections.emptyList() : targets.subList(1, targets.size())); return response; } + + @PostMapping("/datanode/start-and-join-zk") + @Operation(summary = "启动DataNode并加入ZK", description = "本地启动DataNode进程并自动注册") + public ResponseEntity> startAndJoinZk( + @Parameter(description = "启动请求参数实体", required = true) @RequestBody StartNodeRequest request) { + try { + Map res = orchestrationService.startAndRegister(request); + boolean ok = Boolean.TRUE.equals(res.get("success")); + return ok ? ResponseEntity.ok(res) : ResponseEntity.badRequest().body(res); + } catch (Exception e) { + return ResponseEntity.internalServerError() + .body(Map.of("success", false, "stage", "UNEXPECTED", "error", e.getMessage())); + } + } } \ No newline at end of file diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/dto/StartNodeRequest.java b/dkv-master/src/main/java/com/dkv/dkvmaster/dto/StartNodeRequest.java new file mode 100644 index 0000000..a5abbc1 --- /dev/null +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/dto/StartNodeRequest.java @@ -0,0 +1,34 @@ +package com.dkv.dkvmaster.dto; + +public class StartNodeRequest { + private String nodeId; + private String host; + private Integer port; + private String dataDir; + private Boolean isPrimary = true; + private String replicas = ""; + + // 可选:是否同步加入一致性哈希环 + private Boolean addToRing = true; + + public String getNodeId() { return nodeId; } + public void setNodeId(String nodeId) { this.nodeId = nodeId; } + + public String getHost() { return host; } + public void setHost(String host) { this.host = host; } + + public Integer getPort() { return port; } + public void setPort(Integer port) { this.port = port; } + + public String getDataDir() { return dataDir; } + public void setDataDir(String dataDir) { this.dataDir = dataDir; } + + public Boolean getIsPrimary() { return isPrimary; } + public void setIsPrimary(Boolean primary) { isPrimary = primary; } + + public String getReplicas() { return replicas; } + public void setReplicas(String replicas) { this.replicas = replicas; } + + public Boolean getAddToRing() { return addToRing; } + public void setAddToRing(Boolean addToRing) { this.addToRing = addToRing; } +} diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/router/ConsistentHashRouter.java b/dkv-master/src/main/java/com/dkv/dkvmaster/router/ConsistentHashRouter.java index 8010495..74e77f4 100644 --- a/dkv-master/src/main/java/com/dkv/dkvmaster/router/ConsistentHashRouter.java +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/router/ConsistentHashRouter.java @@ -19,6 +19,9 @@ public class ConsistentHashRouter { // 虚拟节点数:每个物理节点在环上变成 10 个虚拟节点,解决数据倾斜问题 private static final int VIRTUAL_NODES = 10; + public Set getRings() { + return new HashSet<>(ring.values()); + } /** * 添加物理节点 * @param nodeIp 例如 "192.168.1.5:8080" @@ -28,8 +31,9 @@ public synchronized void addNode(String nodeIp,Integer port ) { // 构造虚拟节点名称,例如 "192.168.1.5:8080#1" //作用:让数据分布更加均匀,避免数据倾斜 String virtualNodeName = nodeIp +":"+port+ "#" + i; + int hash = HashUtil.getHash(virtualNodeName); - ring.put(hash, nodeIp); + ring.put(hash, nodeIp +":"+port); } // System.out.println("节点上线: " + nodeIp + ",当前环大小: " + ring.size()); } @@ -96,7 +100,7 @@ public synchronized List routeNodeWithReplicas(String key, int replicas) Iterator headIterator = ring.values().iterator(); // 先从当前位置往后找 - while (tailIterator.hasNext() && nodes.size() < replicas) { + while (tailIterator.hasNext() && nodes.size() <=replicas) { nodes.add(tailIterator.next()); } @@ -107,4 +111,103 @@ public synchronized List routeNodeWithReplicas(String key, int replicas) return new ArrayList<>(nodes); } + // 在 ConsistentHashRouter.java 中添加 + public synchronized String getPredecessorNode(int hash) { + SortedMap headMap = ring.headMap(hash); + if (headMap.isEmpty()) { + return ring.get(ring.lastKey()); // 环状逻辑:取最后一个节点 + } + return ring.get(headMap.lastKey()); + } + + public synchronized String getSuccessorNode(int hash) { + SortedMap tailMap = ring.tailMap(hash + 1); // 找严格大于当前hash的 + if (tailMap.isEmpty()) { + return ring.get(ring.firstKey()); // 环状逻辑:取第一个节点 + } + return ring.get(tailMap.firstKey()); + } +// public synchronized void addNodeWithBalancedMigration(String nodeIp, Integer port) { +// List affectedHashes = new ArrayList<>(); +// // 计算新节点的虚拟节点哈希值 +// for (int i = 0; i < VIRTUAL_NODES; i++) { +// String virtualNodeName = nodeIp + ":" + port + "#" + i; +// int hash = HashUtil.getHash(virtualNodeName); +// affectedHashes.add(hash); +// } +// +// // 计算新节点相邻的前后节点,并迁移数据 +// for (int i = 0; i < VIRTUAL_NODES; i++) { +// String virtualNodeName = nodeIp + ":" + port + "#" + i; +// int hash = HashUtil.getHash(virtualNodeName); +// +// // 获取新节点相邻的前驱节点和后继节点 +// String predecessor = getPredecessorNode(hash); +// String successor = getSuccessorNode(hash); +// +// // 仅迁移受影响的部分数据 +// migrateData(predecessor, successor, nodeIp, port); +// } +// +// // 执行节点添加 +// for (int i = 0; i < VIRTUAL_NODES; i++) { +// String virtualNodeName = nodeIp + ":" + port + "#" + i; +// int hash = HashUtil.getHash(virtualNodeName); +// ring.put(hash, nodeIp + ":" + port); +// } +// } + + // 数据迁移方法:根据前后节点获取受影响的数据并迁移 +// private void migrateData(String predecessor, String successor, String nodeIp, Integer port) { +// List affectedKeys = getAffectedKeys(predecessor, successor); +// +// for (String key : affectedKeys) { +// String currentNode = routeNode(key); +// if (!currentNode.equals(nodeIp + ":" + port)) { +// // 迁移数据到新节点 +// migrateKeyToNewNode(key, nodeIp + ":" + port); +// } +// } +// } + + // 获取受影响的键:假设从前驱节点到后继节点之间的键都需要迁移 +// private List getAffectedKeys(String predecessor, String successor) { +// // 这里的实现依赖于具体的数据存储系统,我们假设RocksDB支持按范围查询 +// List keys = new ArrayList<>(); +// try { +// // 假设你有一个方法来查询指定范围内的所有键 +// ReadOptions readOptions = new ReadOptions(); +// RocksIterator iterator = rocksDB.newIterator(readOptions); +// +// // 从前驱节点的哈希值开始,直到后继节点的哈希值 +// // 假设 getHash 生成的哈希值与键的顺序相关,实际中需要依据你的键的设计来调整 +// byte[] startKey = predecessor.getBytes(); +// byte[] endKey = successor.getBytes(); +// +// iterator.seek(startKey); +// while (iterator.isValid() && Arrays.compare(iterator.key(), endKey) < 0) { +// keys.add(new String(iterator.key())); +// iterator.next(); +// } +// } catch (RocksDBException e) { +// e.printStackTrace(); +// } +// return keys; +// } +// +// // 迁移数据到新节点 +// private void migrateKeyToNewNode(String key, String newNode) { +// try { +// // 从原节点读取数据并存储到新节点 +// byte[] value = rocksDB.get(key.getBytes()); +// +// // 将数据迁移到新的节点(假设有方法支持这种操作) +// // 这里只是示例,具体的操作可以依据你的存储设计 +// // 保存数据到新节点,可以选择将数据写入新节点相关的区域 +// rocksDB.put(key.getBytes(), value); +// } catch (RocksDBException e) { +// e.printStackTrace(); +// } +// } + } \ No newline at end of file diff --git a/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java b/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java new file mode 100644 index 0000000..f7b13bc --- /dev/null +++ b/dkv-master/src/main/java/com/dkv/dkvmaster/service/NodeOrchestrationService.java @@ -0,0 +1,85 @@ +package com.dkv.dkvmaster.service; + +import com.dkv.dkvmaster.cluster.ClusterManager; +import com.dkv.dkvmaster.dto.StartNodeRequest; +import com.dkv.dkvmaster.router.ConsistentHashRouter; +import com.dkv.dkvstorage.rocksdb.DataNodeManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.Map; + + +@Service +public class NodeOrchestrationService { + + @Autowired + private final DataNodeManager dataNodeManager; + private final ClusterManager clusterManager; + private final ConsistentHashRouter router; + + public NodeOrchestrationService( + DataNodeManager dataNodeManager, + ClusterManager clusterManager, + ConsistentHashRouter router + ) { + this.dataNodeManager = dataNodeManager; + this.clusterManager = clusterManager; + this.router = router; + } + public boolean stop(String nodeId){ + boolean b = dataNodeManager.stopDataNode(nodeId); + return b; + + } + public Map startAndRegister(StartNodeRequest req) throws Exception { + String nodeId = req.getNodeId(); + String host = req.getHost(); + int port = req.getPort(); + String dataDir = req.getDataDir(); + boolean isPrimary = req.getIsPrimary() != null && req.getIsPrimary(); + String replicas = req.getReplicas() == null ? "" : req.getReplicas(); + + Map result = new HashMap<>(); + result.put("nodeId", nodeId); + result.put("host", host); + result.put("port", port); + + // 1) start datanode + boolean started = dataNodeManager.startDataNode(nodeId, host, port, dataDir, isPrimary, replicas); + if (!started) { + result.put("success", false); + result.put("stage", "START_DATANODE"); + result.put("message", "DataNode 启动失败:端口冲突/路径非法/实例已存在等"); + return result; + } + + // 2) register to zk (失败需要回滚) + try { + clusterManager.addNodeToZk(host, port); + result.put("zkRegistered", true); + } catch (Exception zkEx) { + // 回滚:把刚启动的节点停掉,避免“启动了但集群不可见” + dataNodeManager.stopDataNode(nodeId); + + result.put("success", false); + result.put("stage", "REGISTER_ZK"); + result.put("zkRegistered", false); + result.put("message", "注册 ZK 失败,已回滚 stop DataNode: " + zkEx.getMessage()); + return result; + } + + // 3) optionally add to ring + if (req.getAddToRing() != null && req.getAddToRing()) { + router.addNode(host, port); + result.put("ringAdded", true); + } else { + result.put("ringAdded", false); + } + + result.put("success", true); + result.put("stage", "DONE"); + return result; + } +} diff --git a/dkv-master/src/main/resources/static/index.html b/dkv-master/src/main/resources/static/index.html index 50160ce..fe75552 100644 --- a/dkv-master/src/main/resources/static/index.html +++ b/dkv-master/src/main/resources/static/index.html @@ -80,14 +80,21 @@
⚡ 数据操作控制台
-
- - +
+
+ + +
+
+ + +
+
@@ -97,7 +104,12 @@
⚡ 数据操作控制台
- +
+ + +
> Waiting for instructions... @@ -124,8 +136,14 @@
🎡 一致性哈希可视化
🔍 路由模拟追踪

模拟 Key 在环上的落点物理节点

- - +
+ + + 副本数 + + + +
目标主节点 (Primary):
@@ -147,10 +165,18 @@
🔍 路由模拟追踪