From 973d07afdeeeb6a37c78fa6b3203c6517b3ec647 Mon Sep 17 00:00:00 2001 From: dunwu Date: Tue, 31 Dec 2024 08:02:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=86=E5=B8=83=E5=BC=8F=E7=A4=BA?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java-distributed-id/pom.xml | 52 ++++++++++++++++++ .../id/ZookeeperDistributedId.java | 55 +++++++++++++++++++ .../id/ZookeeperDistributedId2.java | 46 ++++++++++++++++ .../scripts/fixed_window_rate_limit.lua | 21 +++++++ .../scripts/token_bucket_rate_limit.lua | 39 +++++++++++++ .../java-load-balance/pom.xml | 1 - .../java-distributed/java-rate-limit/pom.xml | 3 +- codes/java-distributed/java-task/pom.xml | 40 ++++++++++++++ .../dunwu/local/task/DelayQueueExample.java | 52 ++++++++++++++++++ .../task/ScheduledExecutorServiceExample.java | 37 +++++++++++++ .../github/dunwu/local/task/TimerExample.java | 39 +++++++++++++ codes/java-distributed/pom.xml | 40 +++++++++++++- 12 files changed, 420 insertions(+), 5 deletions(-) create mode 100644 codes/java-distributed/java-distributed-id/pom.xml create mode 100644 codes/java-distributed/java-distributed-id/src/main/java/io/github/dunwu/distributed/id/ZookeeperDistributedId.java create mode 100644 codes/java-distributed/java-distributed-id/src/main/java/io/github/dunwu/distributed/id/ZookeeperDistributedId2.java create mode 100644 codes/java-distributed/java-distributed-id/src/main/resources/scripts/fixed_window_rate_limit.lua create mode 100644 codes/java-distributed/java-distributed-id/src/main/resources/scripts/token_bucket_rate_limit.lua create mode 100644 codes/java-distributed/java-task/pom.xml create mode 100644 codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/DelayQueueExample.java create mode 100644 codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/ScheduledExecutorServiceExample.java create mode 100644 codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/TimerExample.java diff --git a/codes/java-distributed/java-distributed-id/pom.xml b/codes/java-distributed/java-distributed-id/pom.xml new file mode 100644 index 0000000..5f72fc0 --- /dev/null +++ b/codes/java-distributed/java-distributed-id/pom.xml @@ -0,0 +1,52 @@ + + + 4.0.0 + + + io.github.dunwu.distributed + java-distributed + 1.0.0 + + + io.github.dunwu.javatech + java-distributed-id + 1.0.0 + jar + ${project.artifactId} + + + UTF-8 + 1.8 + ${java.version} + ${java.version} + + + + + org.apache.zookeeper + zookeeper + + + org.apache.curator + curator-recipes + + + redis.clients + jedis + + + cn.hutool + hutool-all + + + org.projectlombok + lombok + + + ch.qos.logback + logback-classic + true + + + diff --git a/codes/java-distributed/java-distributed-id/src/main/java/io/github/dunwu/distributed/id/ZookeeperDistributedId.java b/codes/java-distributed/java-distributed-id/src/main/java/io/github/dunwu/distributed/id/ZookeeperDistributedId.java new file mode 100644 index 0000000..06d7092 --- /dev/null +++ b/codes/java-distributed/java-distributed-id/src/main/java/io/github/dunwu/distributed/id/ZookeeperDistributedId.java @@ -0,0 +1,55 @@ +package io.github.dunwu.distributed.id; + +import cn.hutool.core.collection.CollectionUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.CreateMode; + +import java.util.List; + +/** + * ZK 分布式 ID + * + * @author Zhang Peng + * @date 2024-12-20 + */ +@Slf4j +public class ZookeeperDistributedId { + + public static void main(String[] args) throws Exception { + + // 获取客户端 + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy); + + // 开启会话 + client.start(); + + String id1 = client.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT_SEQUENTIAL) + .forPath("/zkid/id_"); + log.info("id: {}", id1); + + String id2 = client.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT_SEQUENTIAL) + .forPath("/zkid/id_"); + log.info("id: {}", id2); + + List children = client.getChildren().forPath("/zkid"); + if (CollectionUtil.isNotEmpty(children)) { + for (String child : children) { + client.delete().forPath("/zkid/" + child); + } + } + client.delete().forPath("/zkid"); + + // 关闭客户端 + client.close(); + } + +} diff --git a/codes/java-distributed/java-distributed-id/src/main/java/io/github/dunwu/distributed/id/ZookeeperDistributedId2.java b/codes/java-distributed/java-distributed-id/src/main/java/io/github/dunwu/distributed/id/ZookeeperDistributedId2.java new file mode 100644 index 0000000..69d5d54 --- /dev/null +++ b/codes/java-distributed/java-distributed-id/src/main/java/io/github/dunwu/distributed/id/ZookeeperDistributedId2.java @@ -0,0 +1,46 @@ +package io.github.dunwu.distributed.id; + +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; +import org.apache.curator.retry.ExponentialBackoffRetry; + +/** + * ZK 分布式 ID + *

+ * 基于原子计数器生成 ID + * + * @author Zhang Peng + * @date 2024-12-20 + */ +@Slf4j +public class ZookeeperDistributedId2 { + + public static void main(String[] args) throws Exception { + + // 获取客户端 + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy); + DistributedAtomicLong atomicLong = new DistributedAtomicLong(client, "/zkid", retryPolicy); + + // 开启会话 + client.start(); + + // 基于原子计数器生成 ID + AtomicValue id1 = atomicLong.increment(); + log.info("id: {}", id1.postValue()); + + AtomicValue id2 = atomicLong.increment(); + log.info("id: {}", id2.postValue()); + + // 清理节点 + client.delete().forPath("/zkid"); + + // 关闭客户端 + client.close(); + } + +} diff --git a/codes/java-distributed/java-distributed-id/src/main/resources/scripts/fixed_window_rate_limit.lua b/codes/java-distributed/java-distributed-id/src/main/resources/scripts/fixed_window_rate_limit.lua new file mode 100644 index 0000000..e0c9ad0 --- /dev/null +++ b/codes/java-distributed/java-distributed-id/src/main/resources/scripts/fixed_window_rate_limit.lua @@ -0,0 +1,21 @@ +-- 缓存 Key +local key = KEYS[1] +-- 访问请求数 +local permits = tonumber(ARGV[1]) +-- 过期时间 +local seconds = tonumber(ARGV[2]) +-- 限流阈值 +local limit = tonumber(ARGV[3]) + +-- 获取统计值 +local count = tonumber(redis.call('GET', key) or "0") + +if count + permits > limit then + -- 请求拒绝 + return -1 +else + -- 请求通过 + redis.call('INCRBY', key, permits) + redis.call('EXPIRE', key, seconds) + return count + permits +end \ No newline at end of file diff --git a/codes/java-distributed/java-distributed-id/src/main/resources/scripts/token_bucket_rate_limit.lua b/codes/java-distributed/java-distributed-id/src/main/resources/scripts/token_bucket_rate_limit.lua new file mode 100644 index 0000000..541d70c --- /dev/null +++ b/codes/java-distributed/java-distributed-id/src/main/resources/scripts/token_bucket_rate_limit.lua @@ -0,0 +1,39 @@ +local tokenKey = KEYS[1] +local timeKey = KEYS[2] + +-- 申请令牌数 +local permits = tonumber(ARGV[1]) +-- QPS +local qps = tonumber(ARGV[2]) +-- 桶的容量 +local capacity = tonumber(ARGV[3]) +-- 当前时间(单位:毫秒) +local nowMillis = tonumber(ARGV[4]) +-- 填满令牌桶所需要的时间 +local fillTime = capacity / qps +local ttl = math.min(capacity, math.floor(fillTime * 2)) + +local currentTokenNum = tonumber(redis.call("GET", tokenKey)) +if currentTokenNum == nil then + currentTokenNum = capacity +end + +local endTimeMillis = tonumber(redis.call("GET", timeKey)) +if endTimeMillis == nil then + endTimeMillis = 0 +end + +local gap = nowMillis - endTimeMillis +local newTokenNum = math.max(0, gap * qps / 1000) +local currentTokenNum = math.min(capacity, currentTokenNum + newTokenNum) + +if currentTokenNum < permits then + -- 请求拒绝 + return -1 +else + -- 请求通过 + local finalTokenNum = currentTokenNum - permits + redis.call("SETEX", tokenKey, ttl, finalTokenNum) + redis.call("SETEX", timeKey, ttl, nowMillis) + return finalTokenNum +end diff --git a/codes/java-distributed/java-load-balance/pom.xml b/codes/java-distributed/java-load-balance/pom.xml index 485cb7d..b4e73f0 100644 --- a/codes/java-distributed/java-load-balance/pom.xml +++ b/codes/java-distributed/java-load-balance/pom.xml @@ -7,7 +7,6 @@ java-load-balance 1.0.0 jar - ${project.artifactId} UTF-8 diff --git a/codes/java-distributed/java-rate-limit/pom.xml b/codes/java-distributed/java-rate-limit/pom.xml index afa947c..312d7f2 100644 --- a/codes/java-distributed/java-rate-limit/pom.xml +++ b/codes/java-distributed/java-rate-limit/pom.xml @@ -3,11 +3,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - io.github.dunwu.javatech + io.github.dunwu.distributed java-rate-limit 1.0.0 jar - ${project.artifactId} UTF-8 diff --git a/codes/java-distributed/java-task/pom.xml b/codes/java-distributed/java-task/pom.xml new file mode 100644 index 0000000..8d7cc46 --- /dev/null +++ b/codes/java-distributed/java-task/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + + + io.github.dunwu.distributed + java-distributed + 1.0.0 + + + io.github.dunwu.distributed + java-task + 1.0.0 + jar + + + UTF-8 + 1.8 + ${java.version} + ${java.version} + + + + + cn.hutool + hutool-all + + + org.projectlombok + lombok + + + ch.qos.logback + logback-classic + 1.2.3 + true + + + diff --git a/codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/DelayQueueExample.java b/codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/DelayQueueExample.java new file mode 100644 index 0000000..d510eed --- /dev/null +++ b/codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/DelayQueueExample.java @@ -0,0 +1,52 @@ +package io.github.dunwu.local.task; + +import cn.hutool.core.date.DateUtil; +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class DelayQueueExample { + + public static void main(String[] args) throws InterruptedException { + BlockingQueue delayQueue = new DelayQueue<>(); + long now = System.currentTimeMillis(); + delayQueue.put(new SampleTask(now + 1000)); + delayQueue.put(new SampleTask(now + 2000)); + delayQueue.put(new SampleTask(now + 3000)); + for (int i = 0; i < 3; i++) { + log.info("task 执行时间:{}", DateUtil.format(new Date(delayQueue.take().getTime()), "yyyy-MM-dd HH:mm:ss")); + } + } + + static class SampleTask implements Delayed { + + long time; + + public SampleTask(long time) { + this.time = time; + } + + public long getTime() { + return time; + } + + @Override + public int compareTo(Delayed o) { + return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + } + +} + + diff --git a/codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/ScheduledExecutorServiceExample.java b/codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/ScheduledExecutorServiceExample.java new file mode 100644 index 0000000..78e8f5b --- /dev/null +++ b/codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/ScheduledExecutorServiceExample.java @@ -0,0 +1,37 @@ +package io.github.dunwu.local.task; + +import cn.hutool.core.date.DateUtil; +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class ScheduledExecutorServiceExample { + + public static void main(String[] args) { + // 创建一个 ScheduledExecutorService 对象,它将使用一个线程池来执行任务 + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + // 创建一个 Runnable 对象,这个任务将在 2 秒后执行,并且每 1 秒重复执行一次 + Runnable task = () -> { + log.info("task 执行时间:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")); + }; + + // 安排任务在 2 秒后执行,并且每 1 秒重复执行一次 + executor.scheduleAtFixedRate(task, 2, 1, TimeUnit.SECONDS); + + // 主线程等待 10 秒后结束 + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // 关闭 executor,这将停止所有正在执行的任务,并拒绝新任务的提交 + executor.shutdown(); + } + +} diff --git a/codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/TimerExample.java b/codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/TimerExample.java new file mode 100644 index 0000000..ce1d775 --- /dev/null +++ b/codes/java-distributed/java-task/src/main/java/io/github/dunwu/local/task/TimerExample.java @@ -0,0 +1,39 @@ +package io.github.dunwu.local.task; + +import cn.hutool.core.date.DateUtil; +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; + +@Slf4j +public class TimerExample { + + public static void main(String[] args) { + // 创建一个 Timer 对象 + Timer timer = new Timer(); + + // 创建一个 TimerTask 对象,这个任务将在 2 秒后执行,并且每 1 秒重复执行一次 + TimerTask task = new TimerTask() { + @Override + public void run() { + log.info("task 执行时间:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")); + } + }; + + // 安排任务在 2 秒后执行,并且每 1 秒重复执行一次 + timer.schedule(task, 2000, 1000); + + // 主线程等待 10 秒后结束 + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // 取消定时器和所有已安排的任务 + timer.cancel(); + } + +} diff --git a/codes/java-distributed/pom.xml b/codes/java-distributed/pom.xml index 366bfbf..aa88d15 100644 --- a/codes/java-distributed/pom.xml +++ b/codes/java-distributed/pom.xml @@ -7,12 +7,48 @@ java-distributed 1.0.0 pom - JAVA-DISTRIBUTED - JAVA-DISTRIBUTED 示例源码 java-load-balance java-rate-limit + java-distributed-id + java-task + + + + org.apache.zookeeper + zookeeper + 3.9.2 + + + org.apache.curator + curator-recipes + 4.3.0 + + + redis.clients + jedis + 5.1.0 + + + cn.hutool + hutool-all + 5.8.34 + + + org.projectlombok + lombok + 1.18.30 + + + ch.qos.logback + logback-classic + 1.4.12 + true + + + +