实现接收消息处理的消费者队列

This commit is contained in:
xiaowang 2025-06-24 18:38:25 +08:00
parent a9bcc42b44
commit 32df1cb4ae
12 changed files with 369 additions and 13 deletions

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

View File

@ -113,6 +113,8 @@
<artifactId>lombok</artifactId>
</path>
</annotationProcessorPaths>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>

View File

@ -10,5 +10,4 @@ public class StartBotApplication {
public static void main(String[] args) {
SpringApplication.run(StartBotApplication.class, args);
}
}

View File

@ -0,0 +1,17 @@
package com.startbot.config;
import com.startbot.entity.dto.PrivateMessageDTO;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ArrayBlockingQueue;
@Configuration
public class QueueConfig {
@Bean
public ArrayBlockingQueue<PrivateMessageDTO> privateMessageQueue() {
// 设置队列容量根据你的需求调整
return new ArrayBlockingQueue<>(1000);
}
}

View File

@ -0,0 +1,25 @@
package com.startbot.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
// 配置线程池
@Configuration
@EnableAsync
public class ThreadConfig {
@Bean(name = "privateMessageTaskExecutor")
public Executor privateMessageTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);// 核心线程数
executor.setMaxPoolSize(2);// 最大线程数
executor.setQueueCapacity(100);// 队列容量
executor.setThreadNamePrefix("private-message-");
executor.initialize();
return executor;
}
}

View File

@ -1,31 +1,45 @@
package com.startbot.controller;
import com.startbot.entity.dto.PrivateMessageDTO;
import com.startbot.entity.dto.ReqDTO;
import com.startbot.service.PrivateMessageService;
import com.startbot.utils.PrivateMessageConsumer;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ArrayBlockingQueue;
@RestController
public class StarBotController {
private static final Logger logger = LoggerFactory.getLogger(StarBotController.class);
private final PrivateMessageService privateMessageService;
ArrayBlockingQueue<PrivateMessageDTO> privateMessageQueue;
public StarBotController(PrivateMessageService privateMessageService) {
public StarBotController(PrivateMessageService privateMessageService,
ArrayBlockingQueue<PrivateMessageDTO> privateMessageQueue,
PrivateMessageConsumer privateMessageConsumer
) {
this.privateMessageService = privateMessageService;
this.privateMessageQueue = privateMessageQueue;
new Thread(privateMessageConsumer).start();
}
@PostMapping("/")
public void demo1(@RequestBody ReqDTO req) {
public String PrivateMessageHandler(@RequestBody ReqDTO req) {
switch (req.getEvent()) {
//私聊消息事件
case "10002":
privateMessageService.getMessage(req);
return privateMessageService.getMessage(req, this.privateMessageQueue);
}
return null;
}
}

View File

@ -1,8 +1,5 @@
package com.startbot.entity.dto;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data

View File

@ -2,9 +2,12 @@ package com.startbot.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.startbot.entity.dataobj.PrivateMessageDO;
import com.startbot.entity.dto.PrivateMessageDTO;
import com.startbot.entity.dto.ReqDTO;
import java.util.concurrent.ArrayBlockingQueue;
public interface PrivateMessageService extends IService<PrivateMessageDO> {
public void getMessage(ReqDTO reqDTO);
public String getMessage(ReqDTO reqDTO, ArrayBlockingQueue<PrivateMessageDTO> privateMessageQueue);
}

View File

@ -11,14 +11,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.concurrent.ArrayBlockingQueue;
@Service
public class PrivateMessageServiceImpl extends ServiceImpl<PrivateMessageMapper, PrivateMessageDO> implements PrivateMessageService {
private static final Logger logger = LoggerFactory.getLogger(PrivateMessageServiceImpl.class);
@Override
public void getMessage(ReqDTO reqDTO) {
ObjectMapper jsonMapper = new ObjectMapper();
PrivateMessageDTO message = jsonMapper.convertValue(reqDTO.getData(),PrivateMessageDTO.class);
logger.info("收到来自:{} 的消息:{}", message.getFromNickName(), message.getMessage());
public String getMessage(ReqDTO reqDTO, ArrayBlockingQueue<PrivateMessageDTO> privateMessageQueue) {
try {
ObjectMapper jsonMapper = new ObjectMapper();
PrivateMessageDTO message = jsonMapper.convertValue(reqDTO.getData(), PrivateMessageDTO.class);
privateMessageQueue.put(message);
logger.info("收到消息,已放入队列");
return "success";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,245 @@
package com.startbot.utils;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* HTTP 请求工具类基于 Java 11 HttpClient 实现
*/
public class HttpUtils {
private static final HttpClient httpClient;
private static final int DEFAULT_TIMEOUT = 10; // 默认超时时间
static {
// 初始化 HttpClient使用单例模式
httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(DEFAULT_TIMEOUT))
.build();
}
// 私有构造函数防止实例化
private HttpUtils() {}
// ==================== GET 请求 ====================
/**
* 发送不带参数的 GET 请求
* @param url 请求 URL
* @return 响应结果
* @throws IOException 网络异常
* @throws InterruptedException 线程中断异常
*/
public static String get(String url) throws IOException, InterruptedException {
return get(url, Map.of());
}
/**
* 发送带参数的 GET 请求
* @param url 请求 URL
* @param params 请求参数
* @return 响应结果
* @throws IOException 网络异常
* @throws InterruptedException 线程中断异常
*/
public static String get(String url, Map<String, String> params) throws IOException, InterruptedException {
return get(url, params, Map.of());
}
/**
* 发送带参数和请求头的 GET 请求
* @param url 请求 URL
* @param params 请求参数
* @param headers 请求头
* @return 响应结果
* @throws IOException 网络异常
* @throws InterruptedException 线程中断异常
*/
public static String get(String url, Map<String, String> params, Map<String, String> headers) throws IOException, InterruptedException {
String fullUrl = buildUrlWithParams(url, params);
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(URI.create(fullUrl))
.GET();
addHeaders(requestBuilder, headers);
HttpRequest request = requestBuilder.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
return response.body();
}
// ==================== POST 请求 ====================
/**
* 发送不带请求体的 POST 请求
* @param url 请求 URL
* @return 响应结果
* @throws IOException 网络异常
* @throws InterruptedException 线程中断异常
*/
public static String post(String url) throws IOException, InterruptedException {
return post(url, Map.of());
}
/**
* 发送带请求头的 POST 请求
* @param url 请求 URL
* @param headers 请求头
* @return 响应结果
* @throws IOException 网络异常
* @throws InterruptedException 线程中断异常
*/
public static String post(String url, Map<String, String> headers) throws IOException, InterruptedException {
return post(url, headers, "");
}
/**
* 发送带 JSON 请求体的 POST 请求
* @param url 请求 URL
* @param jsonBody JSON 请求体
* @return 响应结果
* @throws IOException 网络异常
* @throws InterruptedException 线程中断异常
*/
public static String postJson(String url, String jsonBody) throws IOException, InterruptedException {
Map<String, String> headers = new HashMap<>();
headers.put("Content-Type", "application/json");
return post(url, headers, jsonBody);
}
/**
* 发送带表单数据的 POST 请求
* @param url 请求 URL
* @param params 表单参数
* @return 响应结果
* @throws IOException 网络异常
* @throws InterruptedException 线程中断异常
*/
public static String postForm(String url, Map<String, String> params) throws IOException, InterruptedException {
Map<String, String> headers = new HashMap<>();
headers.put("Content-Type", "application/x-www-form-urlencoded");
String formBody = buildFormData(params);
return post(url, headers, formBody);
}
/**
* 发送带请求头和请求体的 POST 请求
* @param url 请求 URL
* @param headers 请求头
* @param requestBody 请求体
* @return 响应结果
* @throws IOException 网络异常
* @throws InterruptedException 线程中断异常
*/
public static String post(String url, Map<String, String> headers, String requestBody) throws IOException, InterruptedException {
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(URI.create(url))
.POST(HttpRequest.BodyPublishers.ofString(requestBody));
addHeaders(requestBuilder, headers);
HttpRequest request = requestBuilder.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
return response.body();
}
// ==================== 异步请求 ====================
/**
* 异步发送 GET 请求
* @param url 请求 URL
* @return 包含响应结果的 CompletableFuture
*/
public static CompletableFuture<String> getAsync(String url) {
return getAsync(url, Map.of());
}
/**
* 异步发送带参数的 GET 请求
* @param url 请求 URL
* @param params 请求参数
* @return 包含响应结果的 CompletableFuture
*/
public static CompletableFuture<String> getAsync(String url, Map<String, String> params) {
return getAsync(url, params, Map.of());
}
/**
* 异步发送带参数和请求头的 GET 请求
* @param url 请求 URL
* @param params 请求参数
* @param headers 请求头
* @return 包含响应结果的 CompletableFuture
*/
public static CompletableFuture<String> getAsync(String url, Map<String, String> params, Map<String, String> headers) {
String fullUrl = buildUrlWithParams(url, params);
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(URI.create(fullUrl))
.GET();
addHeaders(requestBuilder, headers);
HttpRequest request = requestBuilder.build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body);
}
// ==================== 辅助方法 ====================
/**
* 构建带参数的 URL
*/
private static String buildUrlWithParams(String url, Map<String, String> params) {
if (params == null || params.isEmpty()) {
return url;
}
String queryString = params.entrySet().stream()
.map(entry -> encodeParam(entry.getKey()) + "=" + encodeParam(entry.getValue()))
.collect(Collectors.joining("&"));
return url + (url.contains("?") ? "&" : "?") + queryString;
}
/**
* 构建表单数据
*/
private static String buildFormData(Map<String, String> params) {
if (params == null || params.isEmpty()) {
return "";
}
return params.entrySet().stream()
.map(entry -> encodeParam(entry.getKey()) + "=" + encodeParam(entry.getValue()))
.collect(Collectors.joining("&"));
}
/**
* 添加请求头
*/
private static void addHeaders(HttpRequest.Builder requestBuilder, Map<String, String> headers) {
headers.forEach(requestBuilder::header);
}
/**
* 编码 URL 参数
*/
private static String encodeParam(String param) {
return Optional.ofNullable(param)
.map(p -> URLEncoder.encode(p, StandardCharsets.UTF_8))
.orElse("");
}
}

View File

@ -0,0 +1,40 @@
package com.startbot.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.startbot.entity.dto.PrivateMessageDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
@Component
public class PrivateMessageConsumer implements Runnable {
ArrayBlockingQueue<PrivateMessageDTO> privateMessageQueue;
private static final Logger logger = LoggerFactory.getLogger(PrivateMessageConsumer.class);
public PrivateMessageConsumer(ArrayBlockingQueue<PrivateMessageDTO> privateMessageQueue) {
this.privateMessageQueue = privateMessageQueue;
logger.info("私聊消息处理线程启动成功");
}
public void getMessage() {
while (true) {
PrivateMessageDTO message;
try {
message = privateMessageQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("收到来自:{} 的消息:{}", message.getFromNickName(), message.getMessage());
}
}
@Override
public void run() {
this.getMessage();
}
}

View File

@ -1,5 +1,5 @@
server:
port: 9999 # 应用服务端口号
port: 19999 # 应用服务端口号
spring:
application: