diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/ConsistencyTest.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/ConsistencyTest.java index 023556c..ba90d44 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/ConsistencyTest.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/ConsistencyTest.java @@ -1,6 +1,5 @@ package com.dkv.dkvstorage; - import com.dkv.dkvcommon.model.KvMessage; import java.nio.charset.StandardCharsets; import java.util.UUID; diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/ManualTest.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/ManualTest.java new file mode 100644 index 0000000..bfa1856 --- /dev/null +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/ManualTest.java @@ -0,0 +1,131 @@ +package com.dkv.dkvstorage; + +import com.dkv.dkvstorage.controller.DataNodeController; +import com.dkv.dkvstorage.rocksdb.DataNodeManager; +import org.springframework.http.ResponseEntity; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class ManualTest { + + public static void main(String[] args) throws IOException { + System.out.println("=== 测试DataNodeController ==="); + + // 1. 创建DataNodeManager(需要先创建这个类或模拟) + DataNodeManager dataNodeManager = new DataNodeManager(); + + // 2. 创建Controller实例 + DataNodeController controller = new DataNodeController(); + + // 3. 手动设置依赖(因为不能使用@Autowired) + // 这里需要使用反射或修改Controller代码 + // 方法A:添加setter方法(修改Controller) + // 方法B:使用反射(如下) + setPrivateField(controller, "dataNodeManager", dataNodeManager); + + // 4. 测试各个方法 + testStartDataNode1(controller); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 恢复中断状态 + } + testStartDataNode2(controller); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 恢复中断状态 + } + testGetNodeStatus(controller); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 恢复中断状态 + } + testGetAllNodes(controller); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 恢复中断状态 + } +// testStopDataNode1(controller); +// try { +// Thread.sleep(400); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); // 恢复中断状态 +// } + + testStopDataNode2(controller); + + + dataNodeManager.shutdown(); + } + + private static void setPrivateField(Object target, String fieldName, Object value) { + try { + java.lang.reflect.Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static void testStartDataNode1(DataNodeController controller) { + System.out.println("\n=== 测试startDataNode1 ==="); + Map request = new HashMap<>(); + request.put("nodeId", "node1"); + request.put("host", "127.0.0.1"); + request.put("port", 9000); + request.put("dataDir", "/dkv/datanode1"); + request.put("isPrimary", true); + request.put("replicas", "127.0.0.1:9001,127.0.0.1:9002"); + ResponseEntity> response = controller.startDataNode(request); + System.out.println("响应: " + response.getBody()); + } + + private static void testStartDataNode2(DataNodeController controller) { + System.out.println("\n=== 测试startDataNode2 ==="); + Map request = new HashMap<>(); + request.put("nodeId", "node2"); + request.put("host", "127.0.0.1"); + request.put("port", 9001); + request.put("dataDir", "/dkv/datanode2"); + request.put("isPrimary", false); + request.put("replicas", "127.0.0.1:9000,127.0.0.1:9002"); + ResponseEntity> response = controller.startDataNode(request); + System.out.println("响应: " + response.getBody()); + } + + private static void testStopDataNode1(DataNodeController controller) { + System.out.println("\n=== 测试stopDataNode ==="); + Map request = new HashMap<>(); + request.put("nodeId", "node1"); + + ResponseEntity> response = controller.stopDataNode(request); + System.out.println("响应: " + response.getBody()); + } + + private static void testStopDataNode2(DataNodeController controller) { + System.out.println("\n=== 测试stopDataNode ==="); + Map request = new HashMap<>(); + request.put("nodeId", "node2"); + + ResponseEntity> response = controller.stopDataNode(request); + System.out.println("响应: " + response.getBody()); + } + + private static void testGetNodeStatus(DataNodeController controller) { + System.out.println("\n=== 测试getNodeStatus ==="); + ResponseEntity> response = controller.getNodeStatus("node1"); + System.out.println("响应: " + response.getBody()); + } + + private static void testGetAllNodes(DataNodeController controller) { + System.out.println("\n=== 测试getAllNodes ==="); + ResponseEntity> response = controller.getAllNodes(); + System.out.println("响应: " + response.getBody()); + } +} \ No newline at end of file diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/DataNodeAgent.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/DataNodeAgent.java index acaa2ff..ed0764e 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/DataNodeAgent.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/DataNodeAgent.java @@ -51,12 +51,12 @@ protected void initChannel(SocketChannel ch) { ChannelFuture f = b.bind(AGENT_PORT).sync(); logger.info("DataNode Agent started on port {}", AGENT_PORT); - // 添加关闭钩子 - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - shutdown(); - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - })); +// // 添加关闭钩子 +// Runtime.getRuntime().addShutdownHook(new Thread(() -> { +// shutdown(); +// bossGroup.shutdownGracefully(); +// workerGroup.shutdownGracefully(); +// })); f.channel().closeFuture().sync(); @@ -171,6 +171,7 @@ private Map handleStart(Map request) { private Map handleStop(Map request) { String nodeId = (String) request.get("nodeId"); DataNode node = runningNodes.get(nodeId); + logger.info("Process Stop"); if (node == null) { return Map.of( @@ -200,7 +201,7 @@ private Map handleStop(Map request) { private Map handleStatus(Map request) { String nodeId = (String) request.get("nodeId"); - + logger.info("Process Status"); if (nodeId == null) { // 返回所有节点状态 List> nodesInfo = new ArrayList<>(); diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/NettyAgentClient.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/NettyAgentClient.java index 18ef57d..e5874ae 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/NettyAgentClient.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/agent/NettyAgentClient.java @@ -1,5 +1,6 @@ package com.dkv.dkvstorage.agent; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; @@ -8,181 +9,329 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; -import io.netty.handler.timeout.ReadTimeoutHandler; -import io.netty.handler.timeout.WriteTimeoutHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; public class NettyAgentClient { - private static final Logger logger = LoggerFactory.getLogger(NettyAgentClient.class); - private static final ObjectMapper objectMapper = new ObjectMapper(); private final String host; private final int port; - private final int connectTimeout; - private final int requestTimeout; - private EventLoopGroup group; - private Channel channel; - private Bootstrap bootstrap; - private final ResponseHandler responseHandler; + private final EventLoopGroup group; + private final Bootstrap bootstrap; - public NettyAgentClient(String host, int port) { - this(host, port, 5000, 10000); - } + private volatile Channel channel; + + private final BlockingQueue> responseQueue = + new LinkedBlockingQueue<>(); - public NettyAgentClient(String host, int port, int connectTimeout, int requestTimeout) { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + public NettyAgentClient(String host, int port) { this.host = host; this.port = port; - this.connectTimeout = connectTimeout; - this.requestTimeout = requestTimeout; - this.responseHandler = new ResponseHandler(); - } - - /** - * 连接到Agent服务器 - */ - public void connect() throws Exception { - if (channel != null && channel.isActive()) { - logger.debug("Already connected to {}:{}", host, port); - return; - } - logger.info("Connecting to Agent at {}:{}", host, port); + this.group = new NioEventLoopGroup(); + this.bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new StringDecoder()); + p.addLast(new StringEncoder()); + p.addLast(new ClientHandler(responseQueue)); + } + }); + connect(); - group = new NioEventLoopGroup(1); + } + private synchronized void connect() { try { - bootstrap = new Bootstrap(); - bootstrap.group(group) - .channel(NioSocketChannel.class) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_KEEPALIVE, true) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); - - // 超时处理器 - pipeline.addLast(new ReadTimeoutHandler(requestTimeout / 1000)); - pipeline.addLast(new WriteTimeoutHandler(requestTimeout / 1000)); - - // 字符串编解码器 - pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8)); - pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8)); - - // 响应处理器 - pipeline.addLast(responseHandler); - } - }); + if (channel != null && channel.isActive()) { + return; + } ChannelFuture future = bootstrap.connect(host, port).sync(); - channel = future.channel(); - - logger.info("Connected to Agent at {}:{}", host, port); + this.channel = future.channel(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Netty connect interrupted", e); } catch (Exception e) { - logger.error("Failed to connect to Agent {}:{} - {}", host, port, e.getMessage()); - if (group != null) { - group.shutdownGracefully(); - } - throw e; + throw new RuntimeException("Netty connect failed", e); } } /** - * 发送请求到Agent + * Map 请求 → Map 响应(同步) */ - public Map sendRequest(Map request) throws Exception { - if (channel == null || !channel.isActive()) { - connect(); - } + public Map sendAndReceive(Map request) { + ensureConnected(); - try { - // 将请求转换为JSON字符串 - String requestJson = objectMapper.writeValueAsString(request); - - // 发送请求 - ChannelFuture writeFuture = channel.writeAndFlush(requestJson + "\n").sync(); - if (!writeFuture.isSuccess()) { - throw new Exception("Failed to send request: " + writeFuture.cause().getMessage()); - } + // ⭐ 防止读到上一次响应 + responseQueue.clear(); - logger.debug("Sent request to Agent: {}", request); + try { + String json = MAPPER.writeValueAsString(request); + channel.writeAndFlush(json); - // 等待响应 - String responseJson = responseHandler.getResponse(requestTimeout, TimeUnit.MILLISECONDS); + Map resp = + responseQueue.poll(5, TimeUnit.SECONDS); - if (responseJson == null) { - throw new TimeoutException("Request timeout after " + requestTimeout + "ms"); + if (resp == null) { + throw new RuntimeException("Netty response timeout"); } - - // 解析响应 - Map response = objectMapper.readValue(responseJson, Map.class); - logger.debug("Received response from Agent: {}", response); - - return response; + return resp; } catch (Exception e) { - logger.error("Error sending request to Agent {}:{} - {}", host, port, e.getMessage()); - - // 连接可能已断开,下次重新连接 - disconnect(); - throw e; + throw new RuntimeException("Netty send failed", e); } } - /** - * 断开连接 - */ - public void disconnect() { - logger.info("Disconnecting from Agent at {}:{}", host, port); - - if (channel != null) { - channel.close().awaitUninterruptibly(); - channel = null; + private void ensureConnected() { + if (channel == null || !channel.isActive()) { + connect(); } + } - if (group != null) { + public void close() { + try { + if (channel != null) { + channel.close().sync(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { group.shutdownGracefully(); - group = null; } - - logger.info("Disconnected from Agent at {}:{}", host, port); } /** - * 检查是否连接 + * Handler:JSON → Map */ - public boolean isConnected() { - return channel != null && channel.isActive(); - } + private static class ClientHandler + extends SimpleChannelInboundHandler { - /** - * 响应处理器 - */ - private static class ResponseHandler extends SimpleChannelInboundHandler { - private final BlockingQueue responseQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue> responseQueue; - @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) { - logger.debug("Received raw response: {}", msg); - responseQueue.offer(msg); + ClientHandler(BlockingQueue> responseQueue) { + this.responseQueue = responseQueue; } - public String getResponse(long timeout, TimeUnit unit) throws InterruptedException { - return responseQueue.poll(timeout, unit); + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) + throws Exception { + + Map map = + MAPPER.readValue(msg, + new TypeReference>() {}); + responseQueue.offer(map); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - logger.error("Error in ResponseHandler", cause); ctx.close(); } } -} \ No newline at end of file +} + + + +//package com.dkv.dkvstorage.agent; +// +//import com.fasterxml.jackson.databind.ObjectMapper; +//import io.netty.bootstrap.Bootstrap; +//import io.netty.channel.*; +//import io.netty.channel.nio.NioEventLoopGroup; +//import io.netty.channel.socket.SocketChannel; +//import io.netty.channel.socket.nio.NioSocketChannel; +//import io.netty.handler.codec.string.StringDecoder; +//import io.netty.handler.codec.string.StringEncoder; +//import io.netty.handler.timeout.ReadTimeoutHandler; +//import io.netty.handler.timeout.WriteTimeoutHandler; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.nio.charset.StandardCharsets; +//import java.util.Map; +//import java.util.concurrent.*; +// +//public class NettyAgentClient { +// private static final Logger logger = LoggerFactory.getLogger(NettyAgentClient.class); +// private static final ObjectMapper objectMapper = new ObjectMapper(); +// +// private final String host; +// private final int port; +// private final int connectTimeout; +// private final int requestTimeout; +// +// private EventLoopGroup group; +// private Channel channel; +// private Bootstrap bootstrap; +// private final ResponseHandler responseHandler; +// +// public NettyAgentClient(String host, int port) { +// this(host, port, 5000, 10000); +// +// } +// +// public NettyAgentClient(String host, int port, int connectTimeout, int requestTimeout) { +// System.out.println(host); +// System.out.println(port); +// this.host = host; +// this.port = port; +// this.connectTimeout = connectTimeout; +// this.requestTimeout = requestTimeout; +// System.out.println("new ResponseHandler"); +// this.responseHandler = new ResponseHandler(); +// } +// +// /** +// * 连接到Agent服务器 +// */ +// public void connect() throws Exception { +// if (channel != null && channel.isActive()) { +// logger.debug("Already connected to {}:{}", host, port); +// return; +// } +// +// logger.info("Connecting to Agent at {}:{}", host, port); +// +// group = new NioEventLoopGroup(1); +// +// try { +// bootstrap = new Bootstrap(); +// bootstrap.group(group) +// .channel(NioSocketChannel.class) +// .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) +// .option(ChannelOption.TCP_NODELAY, true) +// .option(ChannelOption.SO_KEEPALIVE, true) +// .handler(new ChannelInitializer() { +// @Override +// protected void initChannel(SocketChannel ch) { +// ChannelPipeline pipeline = ch.pipeline(); +// +// // 超时处理器 +// pipeline.addLast(new ReadTimeoutHandler(requestTimeout / 1000)); +// pipeline.addLast(new WriteTimeoutHandler(requestTimeout / 1000)); +// +// // 字符串编解码器 +// pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8)); +// pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8)); +//// +// // 响应处理器 +// pipeline.addLast(responseHandler); +// } +// }); +// +// ChannelFuture future = bootstrap.connect(host, port).sync(); +// channel = future.channel(); +// +// logger.info("Connected to Agent at {}:{}", host, port); +// +// } catch (Exception e) { +// logger.error("Failed to connect to Agent {}:{} - {}", host, port, e.getMessage()); +// if (group != null) { +// group.shutdownGracefully(); +// } +// throw e; +// } +// } +// +// /** +// * 发送请求到Agent +// */ +// public Map sendRequest(Map request) throws Exception { +// if (channel == null || !channel.isActive()) { +// connect(); +// } +// +// try { +// // 将请求转换为JSON字符串 +// String requestJson = objectMapper.writeValueAsString(request); +// +// // 发送请求 +// ChannelFuture writeFuture = channel.writeAndFlush(requestJson + "\n").sync(); +// if (!writeFuture.isSuccess()) { +// throw new Exception("Failed to send request: " + writeFuture.cause().getMessage()); +// } +// +// logger.debug("Sent request to Agent: {}", request); +// +// // 等待响应 +// String responseJson = responseHandler.getResponse(requestTimeout, TimeUnit.MILLISECONDS); +// +// if (responseJson == null) { +// throw new TimeoutException("Request timeout after " + requestTimeout + "ms"); +// } +// +// // 解析响应 +// Map response = objectMapper.readValue(responseJson, Map.class); +// logger.debug("Received response from Agent: {}", response); +// +// return response; +// +// } catch (Exception e) { +// logger.error("Error sending request to Agent {}:{} - {}", host, port, e.getMessage()); +// // 连接可能已断开,下次重新连接 +// disconnect(); +// throw e; +// } +// } +// +// /** +// * 断开连接 +// */ +// public void disconnect() { +// logger.info("Disconnecting from Agent at {}:{}", host, port); +// +// if (channel != null) { +// channel.close().awaitUninterruptibly(); +// channel = null; +// } +// +// if (group != null) { +// group.shutdownGracefully(); +// group = null; +// } +// +// logger.info("Disconnected from Agent at {}:{}", host, port); +// } +// +// /** +// * 检查是否连接 +// */ +// public boolean isConnected() { +// return channel != null && channel.isActive(); +// } +// +// /** +// * 响应处理器 +// */ +// private static class ResponseHandler extends SimpleChannelInboundHandler { +// private final BlockingQueue responseQueue = new LinkedBlockingQueue<>(); +// +// @Override +// protected void channelRead0(ChannelHandlerContext ctx, String msg) { +// logger.debug("Received raw response: {}", msg); +// responseQueue.offer(msg); +// } +// +// public String getResponse(long timeout, TimeUnit unit) throws InterruptedException { +// return responseQueue.poll(timeout, unit); +// } +// +// @Override +// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { +// logger.error("Error in ResponseHandler", cause); +// ctx.close(); +// } +// } +//} \ No newline at end of file diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/DataNodeController.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/DataNodeController.java index b2711fd..8e75158 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/DataNodeController.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/controller/DataNodeController.java @@ -1,6 +1,8 @@ package com.dkv.dkvstorage.controller; import com.dkv.dkvstorage.rocksdb.DataNodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @@ -12,6 +14,7 @@ @RequestMapping("/api/datanode") public class DataNodeController { + private static final Logger log = LoggerFactory.getLogger(DataNodeController.class); @Autowired private DataNodeManager dataNodeManager; @@ -22,15 +25,15 @@ public class DataNodeController { * -d '{ * "nodeId": "node1:9000", * "dataDir": "/tmp/datanode1", + * "host":"127.0.0.1", * "port": 9000, * "isPrimary": true, - * "replicas": "node2:9001,node3:9002" + * "replicas": "127.0.0.1:9001,127.0.0.1:9002" * }' */ @PostMapping("/start") public ResponseEntity> startDataNode(@RequestBody Map request) { Map response = new HashMap<>(); - try { String nodeId = (String) request.get("nodeId"); String host = (String) request.get("host"); @@ -38,9 +41,7 @@ public ResponseEntity> startDataNode(@RequestBody Map> stopDataNode(@RequestBody Map request = new HashMap<>(); request.put("nodeId", nodeId); @@ -175,8 +173,9 @@ private boolean startNodeInternal(NodeInfo nodeInfo) throws Exception { request.put("action", "start"); // 发送请求到Agent - Map response = client.sendRequest(request); + Map response = client.sendAndReceive(request); + logger.info("send"); if (response != null && Boolean.TRUE.equals(response.get("success"))) { logger.info("Start command sent successfully to {}", host); return true; @@ -195,13 +194,14 @@ private boolean stopNodeInternal(NodeInfo nodeInfo) throws Exception { String host = nodeInfo.getHost(); NettyAgentClient client = getOrCreateClient(host); + logger.info("Client get successful!"); // 构建停止请求 Map request = new HashMap<>(); request.put("nodeId", nodeId); request.put("action", "stop"); - Map response = client.sendRequest(request); + Map response = client.sendAndReceive(request); if (response != null && Boolean.TRUE.equals(response.get("success"))) { logger.info("Stop command sent successfully to {}", host); @@ -218,14 +218,11 @@ private boolean stopNodeInternal(NodeInfo nodeInfo) throws Exception { */ private synchronized NettyAgentClient getOrCreateClient(String host) throws Exception { String clientKey = host + ":" + agentPort; - if (!clientPool.containsKey(clientKey)) { NettyAgentClient client = new NettyAgentClient(host, agentPort); - client.connect(); clientPool.put(clientKey, client); logger.info("Created Netty client for {}:{}", host, agentPort); } - return clientPool.get(clientKey); } @@ -249,7 +246,7 @@ public Map getNodeStatus(String nodeId) { request.put("nodeId", nodeId); request.put("action", "status"); - Map agentResponse = client.sendRequest(request); + Map agentResponse = client.sendAndReceive(request); Map result = new HashMap<>(); result.put("nodeId", nodeId); @@ -361,7 +358,7 @@ public void shutdown() { // 关闭客户端连接 for (Map.Entry entry : clientPool.entrySet()) { try { - entry.getValue().disconnect(); + entry.getValue().close(); } catch (Exception e) { logger.warn("Error disconnecting client {}: {}", entry.getKey(), e.getMessage()); } diff --git a/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/RocksDbEngine.java b/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/RocksDbEngine.java index 1cfb3e9..b618376 100644 --- a/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/RocksDbEngine.java +++ b/dkv-storage/src/main/java/com/dkv/dkvstorage/rocksdb/RocksDbEngine.java @@ -103,6 +103,7 @@ public void close() { } } + // 统计信息 public long getWriteCount() { return writeCount.get();