Switch the consumer to Kafka event mode

1. Removed the xxl-job related code.
2. Reimplemented the executor directly on Kafka's event-listener mode.
3. Updated all related application.yml and Docker files.
巴拉迪维 2021-09-11 20:22:44 +08:00
parent 8026280580
commit b0ce76fa3d
13 changed files with 217 additions and 255 deletions
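
In short, the commit replaces a scheduled xxl-job handler that polled Kafka with push-based Spring Kafka listeners. A minimal sketch of the listener pattern adopted here (the class, topic, and group names are illustrative placeholders, not values from this commit):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ExampleListener {

    // The listener container invokes this for each record as it arrives;
    // no scheduled polling loop is needed.
    @KafkaListener(topics = "example-topic", groupId = "example-group")
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }
}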

View File

@@ -0,0 +1,48 @@
package cn.org.gitlink.notification.common.utils;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.client_id:#{null}}")
    private String clientId;

    @Value("${spring.kafka.producer.retries:#{null}}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch_size:#{null}}")
    private Integer batchSize;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // autoFlush = true: every send is flushed to the broker immediately.
        return new KafkaTemplate<>(producerFactory(), true);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        // Wait for all in-sync replicas to acknowledge each record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}
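
With this configuration on the classpath, any Spring bean can inject the KafkaTemplate and publish string payloads. A small usage sketch (the service name and topic below are illustrative, not part of this commit):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ExamplePublisher {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void publish(String payload) {
        // "example-topic" is a placeholder; real callers read the topic from configuration.
        kafkaTemplate.send("example-topic", payload);
    }
}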

View File

@@ -1,11 +1,17 @@
package cn.org.gitlink.notification.common.utils;

import com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.PostConstruct;
import java.util.Collection;
@@ -26,15 +32,16 @@ import java.util.stream.Collectors;
@Component
public class KafkaUtil {

    private Logger logger = LogManager.getLogger(KafkaUtil.class);

    @Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
    private String kafkaServer;

    private AdminClient adminClient;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Initialize the AdminClient.
     * '@PostConstruct marks a non-static void method that
@@ -44,29 +51,26 @@ public class KafkaUtil {
    @PostConstruct
    private void initAdminClient() {
        Map<String, Object> props = new HashMap<>(1);
        if (kafkaServer == null) {
            return;
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        adminClient = KafkaAdminClient.create(props);
    }

    /**
     * Create topics (batch supported).
     */
    public void createTopic(Collection<NewTopic> newTopics) {
        adminClient.createTopics(newTopics);
    }

    /**
     * Delete topics (batch supported).
     */
    public void deleteTopic(Collection<String> topics) {
        adminClient.deleteTopics(topics);
    }

    /**
     * Get information about the given topics.
     *
     * @param topics topic names to describe
     * @return a textual description of each topic's partitions
     */
    public String getTopicInfo(Collection<String> topics) {
        AtomicReference<String> info = new AtomicReference<String>("");
@@ -77,28 +81,55 @@ public class KafkaUtil {
            }
        });
        } catch (InterruptedException | ExecutionException e) {
            logger.error(e);
        }
        return info.get();
    }

    /**
     * Get all topics.
     *
     * @return the names of every topic visible to the admin client
     */
    public List<String> getAllTopic() {
        try {
            return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            logger.error(e);
        }
        return Lists.newArrayList();
    }

    /**
     * Send a message to the given topic, logging the outcome.
     *
     * @param topic   target topic
     * @param message payload to send
     */
    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error(throwable);
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                logger.debug(sendResult.getRecordMetadata());
            }
        });
    }

    /**
     * Send a message to the given topic with a caller-supplied callback.
     *
     * @param topic    target topic
     * @param message  payload to send
     * @param callback invoked on success or failure of the send
     */
    public void sendMessage(String topic, String message, ListenableFutureCallback<SendResult<String, String>> callback) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(callback);
    }
}
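
Callers that need delivery confirmation can pass their own callback to the second overload. A sketch, assuming an injected KafkaUtil and logger (topic and payload are placeholders):

kafkaUtil.sendMessage("example-topic", "{\"id\":1}", new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        // Partition and offset of the acknowledged record.
        logger.debug(result.getRecordMetadata());
    }

    @Override
    public void onFailure(Throwable ex) {
        logger.error(ex);
    }
});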

View File

@@ -21,13 +21,6 @@
            <version>${springboot.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.xuxueli/xxl-job-core -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>cn.org.gitlink.notification</groupId>
            <artifactId>gns-common</artifactId>
View File

@@ -1,10 +1,14 @@
package cn.org.gitlink.notification.executor.core.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;
@@ -13,46 +17,43 @@ import java.util.Map;
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap_servers:#{null}}")
    private String servers;

    @Value("${spring.kafka.consumer.group_id}")
    private String groupId;

    @Value("${spring.kafka.consumer.client_id}")
    private String clientId;

    @Value("${spring.kafka.consumer.auto_offset_reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.max_poll_records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.topic}")
    private String topic;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> consumerListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // RecordFilterStrategy returns true for records that should be DISCARDED,
        // so keep only the records whose topic matches the configured one.
        factory.setRecordFilterStrategy(record -> !record.topic().equalsIgnoreCase(this.topic));
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
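
A listener only runs in this container factory (and through its topic filter) if it names the bean explicitly; a sketch of such a method, assumed to live in a Spring-managed bean:

// Records discarded by the factory's filter strategy never reach this method.
@KafkaListener(topics = "${spring.kafka.consumer.topic}",
        groupId = "${spring.kafka.consumer.group_id}",
        containerFactory = "consumerListenerFactory")
public void consume(String message) {
    // handle the payload
}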

View File

@@ -1,54 +0,0 @@
package cn.org.gitlink.notification.executor.core.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class XXLJobConfig {

    private Logger logger = LoggerFactory.getLogger(XXLJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}

View File

@@ -1,76 +0,0 @@
package cn.org.gitlink.notification.executor.service.jobhandler;

import cn.org.gitlink.notification.common.utils.SpringContextUtil;
import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
import cn.org.gitlink.notification.model.service.notification.SysNotificationService;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Collections;

/**
 * Executor that consumes system notification messages.
 *
 * @author zengwei
 * @Date 2021-09-09
 */
@Component
public class NotificationJob {

    private static Logger logger = LoggerFactory.getLogger(NotificationJob.class);

    @Value("${topic.notification.name: #{null}}")
    private String topic;

    @Autowired
    private SysNotificationService sysNotificationService;

    /**
     * Entry point for system notification handling. When adding the job in
     * xxl-job-admin, fill the JobHandler field with the name in the annotation.
     *
     * @author zengwei
     * @Date 2021-09-09
     */
    @XxlJob("notificationMessageHandler")
    public void notificationMessageHandler() {
        if (null == topic) {
            XxlJobHelper.handleFail("No topic configured!");
            return;
        }
        XxlJobHelper.log("topic: {}", topic);
        KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) SpringContextUtil.getBean("kafkaConsumer");
        consumer.subscribe(Collections.singletonList(topic));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        XxlJobHelper.log("Fetched {} records!", records.count());
        for (ConsumerRecord<String, String> record : records) {
            NewSysNotificationVo newSysNotificationVo = JSONObject.parseObject(record.value(), NewSysNotificationVo.class);
            try {
                sysNotificationService.sendNotification(newSysNotificationVo);
                XxlJobHelper.log("{} consumed successfully!", record.value());
            } catch (Exception e) {
                XxlJobHelper.log("{} failed to be consumed, reason: {}", record.value(), e.getMessage());
            }
        }
        consumer.commitAsync();
    }

    public void init() {
        logger.info("init notification job");
    }

    public void destroy() {
        logger.info("notification job has been destroyed");
    }
}

View File

@@ -0,0 +1,38 @@
package cn.org.gitlink.notification.executor.service.jobhandler;

import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
import cn.org.gitlink.notification.model.service.notification.SysNotificationService;
import com.alibaba.fastjson.JSONObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@KafkaListener(topics = "${spring.kafka.consumer.topic}", groupId = "${spring.kafka.consumer.group_id}")
public class NotificationListener {

    private Logger logger = LogManager.getLogger(NotificationListener.class);

    @Autowired
    private SysNotificationService sysNotificationService;

    /**
     * Deserialize each incoming JSON record and dispatch the notification.
     */
    @KafkaHandler
    public void messageHandler(String message) {
        try {
            NewSysNotificationVo newSysNotificationVo = JSONObject.parseObject(message, NewSysNotificationVo.class);
            sysNotificationService.sendNotification(newSysNotificationVo);
        } catch (Exception e) {
            logger.error(e);
        }
    }

    /**
     * Fallback for payloads that match no other handler.
     */
    @KafkaHandler(isDefault = true)
    public void defaultHandler(Object object) {
        logger.error("Unknown object received -> " + object);
    }
}
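
End to end, the producer side serializes a NewSysNotificationVo to JSON and this listener parses it back. A round-trip sketch (the VO's fields are not shown in this diff, so construction is left schematic):

// Producer side (see NotificationController below): serialize and publish.
NewSysNotificationVo vo = new NewSysNotificationVo();   // populate fields as needed
kafkaUtil.sendMessage(topic, JSONObject.toJSONString(vo));

// Consumer side (this listener): deserialize and dispatch.
NewSysNotificationVo parsed = JSONObject.parseObject(message, NewSysNotificationVo.class);
sysNotificationService.sendNotification(parsed);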

View File

@@ -1,33 +0,0 @@
server.port=8083
logging.config=classpath:logback.xml
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://localhost:9999/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=
### xxl-job executor appname
xxl.job.executor.appname=gitlink-notification-executor
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=82
### xxl-job executor log-path
xxl.job.executor.logpath=./logs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
kafka.consumer.servers=localhost:29092,localhost:39092
kafka.consumer.group.id=gitlink-notification-group
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.enable.auto.commit=false
kafka.consumer.auto.commit.interval=100
kafka.consumer.max.poll.records=1000
# DB settings
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:33306/gitlink_notification?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowMultiQueries=true&useSSL=false
spring.datasource.username=gitlink
spring.datasource.password=giTlinK0^827
# kafka topic names
topic.notification.name=gitlink_notification

View File

@@ -0,0 +1,30 @@
server:
  port: 8083

logging:
  config: classpath:logback.xml

spring:
  kafka:
    producer:
      bootstrap_servers: localhost:29092
      client_id: gitlink_producer_01
      retries: 5
      batch_size: 16384
    consumer:
      bootstrap_servers: localhost:29092
      client_id: gitlink_consumer
      group_id: group-gitlink-notification
      auto_offset_reset: earliest
      max_poll_records: 5
      topic: topic-gitlink-notification
  datasource:
    # Datasource configuration
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:33306/gitlink_notification?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowMultiQueries=true&useSSL=false
    username: gitlink
    password: giTlinK0^827

# IP whitelist: separate multiple IPs with commas; use * to allow all
white-list: '*'

View File

@@ -33,23 +33,6 @@ services:
    networks:
      - gitlink_network

  xxl-job-admin:
    image: xuxueli/xxl-job-admin:${XXL_JOB_ADMIN_VERSION}
    container_name: ${XXL_JOB_ADMIN_CONTAINER_NAME}
    hostname: xxl_job_admin
    environment:
      - JAVA_OPTS=-Xmx512m
      - PARAMS=--spring.datasource.url=jdbc:mysql://${MYSQL_CONTAINER_NAME}:3306/gitlink_notification?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai --spring.datasource.username=${MYSQL_USER} --spring.datasource.password=${MYSQL_PASSWORD}
    volumes:
      - ${DOCKER_DATA_PATH}/xxl-job-admin:/data/applogs
    ports:
      - ${XXL_JOB_ADMIN_LOCAL_PORT}:8080
    depends_on:
      - mysql
    networks:
      - gitlink_network

  # See Also: https://hub.docker.com/r/confluentinc/cp-zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:latest

View File

@@ -27,6 +27,13 @@ spring:
    # Connection timeout in milliseconds
    timeout: 1000
  kafka:
    producer:
      bootstrap_servers: localhost:29092
      client_id: gitlink_producer_01
      retries: 5
      batch_size: 16384
  datasource:
    # Datasource configuration
    driver-class-name: com.mysql.jdbc.Driver

View File

@@ -18,6 +18,8 @@ import org.apache.logging.log4j.Logger;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.validation.BindingResult;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
@@ -26,10 +28,11 @@ import java.util.Map;
@RestController
@RequestMapping(value = "/gns/notification")
public class NotificationController {

    // Kafka topic that notifications are published to, read from configuration
    // instead of the previous hard-coded constant.
    @Value("${spring.kafka.producer.topic}")
    private String gitlinkNotificationTopic;

    @Autowired
    private KafkaUtil kafkaUtil;
@@ -68,7 +71,7 @@ public class NotificationController {
        newSysNotificationVo.setPlatform(platform);

        try {
            kafkaUtil.sendMessage(gitlinkNotificationTopic, JSONObject.toJSONString(newSysNotificationVo));
            return DataPacketUtil.jsonSuccessResult();
        } catch (Exception e) {
            logger.error(e);

View File

@@ -5,22 +5,13 @@ logging:
  config: classpath:logback.xml

spring:
  # Kafka configuration
  kafka:
    producer:
      bootstrap_servers: localhost:29092
      client_id: gitlink_producer_01
      retries: 5
      batch_size: 16384
      topic: topic-gitlink-notification
  datasource:
    # Datasource configuration