From b0ce76fa3d4bd6ccc2f808ba6a922f9419a55501 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B7=B4=E6=8B=89=E8=BF=AA=E7=BB=B4?=
Date: Sat, 11 Sep 2021 20:22:44 +0800
Subject: [PATCH] Switch the consumer to Kafka event-listener mode
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. Removed the xxl-job related code
2. The executor now consumes notifications directly through a Kafka event listener
3. Updated all related application.yml and docker files
---
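Note (this section sits above the diffstat and is ignored by git am; it is not part of the commit):
the executor now receives notification events pushed by Kafka instead of pulling them from an
xxl-job handler. The snippet below is a minimal, hypothetical smoke test for that flow, not code
from this patch. It assumes the broker address and topic configured in
executor/src/main/resources/application.yml (localhost:29092, topic-gitlink-notification), and the
JSON field names in the payload are made up; the real ones are defined by NewSysNotificationVo.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class NotificationSmokeTest {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Assumed local broker from middleware/services.yml; adjust for your environment.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // Hypothetical JSON body; field names must match NewSysNotificationVo in the real project.
            String payload = "{\"receiver\":1,\"content\":\"smoke test\"}";

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // NotificationListener (added in this patch) should log and persist this record.
                producer.send(new ProducerRecord<>("topic-gitlink-notification", payload)).get();
            }
        }
    }

If the executor log shows the message being handled, the listener wiring
(KafkaConsumerConfig plus NotificationListener) is working end to end.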
 .../common/utils/KafkaProducerConfig.java          | 48 ++++++++++++
 .../notification/common/utils/KafkaUtil.java       | 65 +++++++++++-----
 executor/pom.xml                                   |  7 --
 .../core/config/KafkaConsumerConfig.java           | 69 ++++++++---------
 .../executor/core/config/XXLJobConfig.java         | 54 -------------
 .../service/jobhandler/NotificationJob.java        | 76 -------------------
 .../jobhandler/NotificationListener.java           | 38 ++++++++++
 .../src/main/resources/application.properties      | 33 --------
 executor/src/main/resources/application.yml        | 30 ++++++++
 middleware/services.yml                            | 17 -----
 reader/src/main/resources/application.yml          |  7 ++
 .../controller/NotificationController.java         |  9 ++-
 writer/src/main/resources/application.yml          | 19 ++---
 13 files changed, 217 insertions(+), 255 deletions(-)
 create mode 100644 common/src/main/java/cn/org/gitlink/notification/common/utils/KafkaProducerConfig.java
 delete mode 100644 executor/src/main/java/cn/org/gitlink/notification/executor/core/config/XXLJobConfig.java
 delete mode 100644 executor/src/main/java/cn/org/gitlink/notification/executor/service/jobhandler/NotificationJob.java
 create mode 100644 executor/src/main/java/cn/org/gitlink/notification/executor/service/jobhandler/NotificationListener.java
 delete mode 100644 executor/src/main/resources/application.properties
 create mode 100644 executor/src/main/resources/application.yml

diff --git a/common/src/main/java/cn/org/gitlink/notification/common/utils/KafkaProducerConfig.java b/common/src/main/java/cn/org/gitlink/notification/common/utils/KafkaProducerConfig.java
new file mode 100644
index 0000000..869d76e
--- /dev/null
+++ b/common/src/main/java/cn/org/gitlink/notification/common/utils/KafkaProducerConfig.java
@@ -0,0 +1,48 @@
+package cn.org.gitlink.notification.common.utils;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Configuration
+public class KafkaProducerConfig {
+
+    @Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
+    private String bootstrapServers;
+
+    @Value("${spring.kafka.producer.client_id:#{null}}")
+    private String clientId;
+
+    @Value("${spring.kafka.producer.retries:#{null}}")
+    private Integer retries;
+
+    @Value("${spring.kafka.producer.batch_size:#{null}}")
+    private Integer batchSize;
+
+    @Bean
+    public KafkaTemplate kafkaTemplate() {
+        return new KafkaTemplate<>(producerConfigs(), true);
+    }
+
+    @Bean
+    ProducerFactory producerConfigs() {
+        Map props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.RETRIES_CONFIG, retries);
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return new DefaultKafkaProducerFactory<>(props);
+    }
+}
diff --git a/common/src/main/java/cn/org/gitlink/notification/common/utils/KafkaUtil.java b/common/src/main/java/cn/org/gitlink/notification/common/utils/KafkaUtil.java
index 9c8a4f6..a43190c 100644
--- a/common/src/main/java/cn/org/gitlink/notification/common/utils/KafkaUtil.java
+++ b/common/src/main/java/cn/org/gitlink/notification/common/utils/KafkaUtil.java
@@ -1,11 +1,17 @@
 package cn.org.gitlink.notification.common.utils;
+
 import com.google.common.collect.Lists;
 import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
 
 import javax.annotation.PostConstruct;
 import java.util.Collection;
@@ -26,15 +32,16 @@ import java.util.stream.Collectors;
 @Component
 public class KafkaUtil {
 
-    @Value("${spring.kafka.bootstrap-servers:#{null}}")
-    private String springKafkaBootstrapServers;
+    private Logger logger = LogManager.getLogger(KafkaUtil.class);
+
+    @Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
+    private String kafkaServer;
 
     private AdminClient adminClient;
 
     @Autowired
     private KafkaTemplate kafkaTemplate;
 
-
     /**
      * 初始化AdminClient
      * '@PostConstruct该注解被用来修饰一个非静态的void()方法。
@@ -44,29 +51,26 @@ public class KafkaUtil {
     @PostConstruct
     private void initAdminClient() {
         Map props = new HashMap<>(1);
-        if (springKafkaBootstrapServers == null) {
+        if (kafkaServer == null) {
             return;
         }
-        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
         adminClient = KafkaAdminClient.create(props);
     }
 
-    /**
-     * 新增topic,支持批量
-     */
     public void createTopic(Collection newTopics) {
         adminClient.createTopics(newTopics);
     }
 
-    /**
-     * 删除topic,支持批量
-     */
    public void deleteTopic(Collection topics) {
         adminClient.deleteTopics(topics);
     }
 
     /**
-     * 获取指定topic的信息
+     * Get information about the given topics
+     *
+     * @param topics
+     * @return
      */
     public String getTopicInfo(Collection topics) {
         AtomicReference info = new AtomicReference("");
@@ -77,28 +81,55 @@
                 }
             });
         } catch (InterruptedException | ExecutionException e) {
-            e.printStackTrace();
+            logger.error(e);
         }
         return info.get();
     }
 
     /**
-     * 获取全部topic
+     * Get all topics
+     *
+     * @return
      */
     public List getAllTopic() {
         try {
             return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
         } catch (InterruptedException | ExecutionException e) {
-            e.printStackTrace();
+            logger.error(e);
         }
         return Lists.newArrayList();
     }
 
     /**
-     * 往topic中发送消息
+     * Send a message to Kafka
+     *
+     * @param topic
+     * @param message
      */
     public void sendMessage(String topic, String message) {
-        kafkaTemplate.send(topic, message);
+        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
+        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                logger.error(throwable);
+            }
+
+            @Override
+            public void onSuccess(SendResult<String, String> stringStringSendResult) {
+                logger.debug(stringStringSendResult.getRecordMetadata());
+            }
+        });
     }
+    /**
+     * Send a message to Kafka with a custom callback
+     *
+     * @param topic
+     * @param message
+     * @param callback
+     */
+    public void sendMessage(String topic, String message, ListenableFutureCallback callback) {
+        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
+        future.addCallback(callback);
+    }
 
 }
\ No newline at end of file
diff --git a/executor/pom.xml b/executor/pom.xml
index 1144879..e8953c0 100644
--- a/executor/pom.xml
+++ b/executor/pom.xml
@@ -21,13 +21,6 @@
             <version>${springboot.version}</version>
         </dependency>
 
-
-        <dependency>
-            <groupId>com.xuxueli</groupId>
-            <artifactId>xxl-job-core</artifactId>
-            <version>2.3.0</version>
-        </dependency>
-
         <dependency>
             <groupId>cn.org.gitlink.notification</groupId>
             <artifactId>gns-common</artifactId>
diff --git a/executor/src/main/java/cn/org/gitlink/notification/executor/core/config/KafkaConsumerConfig.java b/executor/src/main/java/cn/org/gitlink/notification/executor/core/config/KafkaConsumerConfig.java
index 2456acc..01db58b 100644
--- a/executor/src/main/java/cn/org/gitlink/notification/executor/core/config/KafkaConsumerConfig.java
+++ b/executor/src/main/java/cn/org/gitlink/notification/executor/core/config/KafkaConsumerConfig.java
@@ -1,10 +1,14 @@
 package cn.org.gitlink.notification.executor.core.config;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -13,46 +17,43 @@
 @EnableKafka
 public class KafkaConsumerConfig {
 
-    @Value("${kafka.consumer.servers}")
+    @Value("${spring.kafka.consumer.bootstrap_servers:#{null}}")
     private String servers;
-    @Value("${kafka.consumer.enable.auto.commit}")
-    private boolean enableAutoCommit;
-    @Value("${kafka.consumer.session.timeout:10000}")
-    private String sessionTimeout;
-    @Value("${kafka.consumer.auto.commit.interval}")
-    private String autoCommitInterval;
-    @Value("${kafka.consumer.group.id}")
+
+    @Value("${spring.kafka.consumer.group_id}")
     private String groupId;
-    @Value("${kafka.consumer.auto.offset.reset}")
+
+    @Value("${spring.kafka.consumer.client_id}")
+    private String clientId;
+
+    @Value("${spring.kafka.consumer.auto_offset_reset}")
     private String autoOffsetReset;
-    @Value("${kafka.consumer.max.poll.records}")
+
+    @Value("${spring.kafka.consumer.max_poll_records}")
     private String maxPollRecords;
 
-    @Bean("kafkaConsumer")
-    public KafkaConsumer consumer() {
-        return new KafkaConsumer<>(consumerConfigs());
+    @Value("${spring.kafka.consumer.topic}")
+    private String topic;
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory consumerListenerFactory() {
+        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
+        factory.setConsumerFactory(consumerConfigs());
+        factory.setRecordFilterStrategy(record -> !record.topic().equalsIgnoreCase(this.topic)); // filter() returning true discards the record
+        return factory;
     }
-
-    public Map consumerConfigs() {
+    @Bean
+    public ConsumerFactory consumerConfigs() {
         Map props = new HashMap<>();
-        props.put("bootstrap.servers", servers);
-        //每个消费者分配独立的组号
-        props.put("group.id", groupId);
-        //如果value合法,则自动提交偏移量
-        props.put("enable.auto.commit", enableAutoCommit);
-        // 每次拉取10条
-        props.put("max.poll.records", maxPollRecords);
-        //设置多久一次更新被消费消息的偏移量
-        props.put("auto.commit.interval.ms", autoCommitInterval);
-        //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
-        props.put("session.timeout.ms", sessionTimeout);
-        //自动重置offset
-        props.put("auto.offset.reset", autoOffsetReset);
-        props.put("key.deserializer",
-                "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer",
-                "org.apache.kafka.common.serialization.StringDeserializer");
-        return props;
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return new DefaultKafkaConsumerFactory<>(props);
     }
 
 }
diff --git a/executor/src/main/java/cn/org/gitlink/notification/executor/core/config/XXLJobConfig.java b/executor/src/main/java/cn/org/gitlink/notification/executor/core/config/XXLJobConfig.java
deleted file mode 100644
index b27c894..0000000
--- a/executor/src/main/java/cn/org/gitlink/notification/executor/core/config/XXLJobConfig.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package cn.org.gitlink.notification.executor.core.config;
-
-import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class XXLJobConfig {
-    private Logger logger = LoggerFactory.getLogger(XXLJobConfig.class);
-
-    @Value("${xxl.job.admin.addresses}")
-    private String adminAddresses;
-
-    @Value("${xxl.job.accessToken}")
-    private String accessToken;
-
-    @Value("${xxl.job.executor.appname}")
-    private String appname;
-
-    @Value("${xxl.job.executor.address}")
-    private String address;
-
-    @Value("${xxl.job.executor.ip}")
-    private String ip;
-
-    @Value("${xxl.job.executor.port}")
-    private int port;
-
-    @Value("${xxl.job.executor.logpath}")
-    private String logPath;
-
-    @Value("${xxl.job.executor.logretentiondays}")
-    private int logRetentionDays;
-
-
-    @Bean
-    public XxlJobSpringExecutor xxlJobExecutor() {
-        logger.info(">>>>>>>>>>> xxl-job config init.");
-        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
-        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
-        xxlJobSpringExecutor.setAppname(appname);
-        xxlJobSpringExecutor.setAddress(address);
-        xxlJobSpringExecutor.setIp(ip);
-        xxlJobSpringExecutor.setPort(port);
-        xxlJobSpringExecutor.setAccessToken(accessToken);
-        xxlJobSpringExecutor.setLogPath(logPath);
-        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
-
-        return xxlJobSpringExecutor;
-    }
-}
diff --git a/executor/src/main/java/cn/org/gitlink/notification/executor/service/jobhandler/NotificationJob.java b/executor/src/main/java/cn/org/gitlink/notification/executor/service/jobhandler/NotificationJob.java
deleted file mode 100644
index 0c51382..0000000
--- a/executor/src/main/java/cn/org/gitlink/notification/executor/service/jobhandler/NotificationJob.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package cn.org.gitlink.notification.executor.service.jobhandler;
-
-import cn.org.gitlink.notification.common.utils.SpringContextUtil;
-import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
-import cn.org.gitlink.notification.model.service.notification.SysNotificationService;
-import com.alibaba.fastjson.JSONObject;
-import com.xxl.job.core.context.XxlJobHelper;
-import com.xxl.job.core.handler.annotation.XxlJob;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import java.time.Duration;
-import java.util.Collections;
-
-/**
- * 系统消息消费执行器
- * @author zengwei
- * @Date 2021-09-09
- */
-@Component
-public class NotificationJob {
-    private static Logger logger = LoggerFactory.getLogger(NotificationJob.class);
-
-    @Value("${topic.notification.name: #{null}}")
-    private String topic;
-
-    @Autowired
-    private SysNotificationService sysNotificationService;
-
-    /**
-     * 系统消息处理入口,在xxl-job-admin添加任务时,JobHandler栏填注解内名字
-     * @author zengwei
-     * @Date 2021-09-09
-     * */
-    @XxlJob("notificationMessageHandler")
-    public void notificationMessageHandler() {
-
-        if (null == topic){
-            XxlJobHelper.handleFail("未配置topic!");
-            return;
-        }
-
-        XxlJobHelper.log("topic: {}", topic);
-
-        KafkaConsumer consumer = (KafkaConsumer) SpringContextUtil.getBean("kafkaConsumer");
-        consumer.subscribe(Collections.singletonList(topic));
-        ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
-        XxlJobHelper.log("拉取到{}条记录!", records.count());
-
-        for (ConsumerRecord record: records) {
-            NewSysNotificationVo newSysNotificationVo = JSONObject.parseObject(record.value(), NewSysNotificationVo.class);
-            try {
-                sysNotificationService.sendNotification(newSysNotificationVo);
-                XxlJobHelper.log("{} 消费成功!", record.value());
-            } catch (Exception e) {
-                XxlJobHelper.log("{} 消费失败, 原因:{}", record.value(), e.getMessage());
-            }
-        }
-
-        consumer.commitAsync();
-
-    }
-
-    public void init(){
-        logger.info("init notification job");
-    }
-    public void destroy(){
-        logger.info("notification job has been destroyed");
-    }
-}
diff --git a/executor/src/main/java/cn/org/gitlink/notification/executor/service/jobhandler/NotificationListener.java b/executor/src/main/java/cn/org/gitlink/notification/executor/service/jobhandler/NotificationListener.java
new file mode 100644
index 0000000..5a8a014
--- /dev/null
+++ b/executor/src/main/java/cn/org/gitlink/notification/executor/service/jobhandler/NotificationListener.java
@@ -0,0 +1,38 @@
+package cn.org.gitlink.notification.executor.service.jobhandler;
+
+import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
+import cn.org.gitlink.notification.model.service.notification.SysNotificationService;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Configuration
+@KafkaListener(topics = "${spring.kafka.consumer.topic}", groupId = "${spring.kafka.consumer.group_id}", containerFactory = "consumerListenerFactory")
+public class NotificationListener {
+
+    private Logger logger = LogManager.getLogger(NotificationListener.class);
+
+    @Autowired
+    private SysNotificationService sysNotificationService;
+
+    @KafkaHandler
+    public void messageHandler(String message) {
+        try {
+            NewSysNotificationVo newSysNotificationVo = JSONObject.parseObject(message, NewSysNotificationVo.class);
+            sysNotificationService.sendNotification(newSysNotificationVo);
+        } catch (Exception e) {
+            logger.error(e);
+        }
+    }
+
+    @KafkaHandler(isDefault = true)
+    public void defaultHandler(Object object) {
+        logger.error("Unknown object received: " + object);
+    }
+}
diff --git a/executor/src/main/resources/application.properties b/executor/src/main/resources/application.properties
deleted file mode 100644
index fd0b450..0000000
--- a/executor/src/main/resources/application.properties
+++ /dev/null
@@ -1,33 +0,0 @@
-server.port=8083
-logging.config=classpath:logback.xml
-### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
-xxl.job.admin.addresses=http://localhost:9999/xxl-job-admin
-### xxl-job, access token
-xxl.job.accessToken=
-### xxl-job executor appname
-xxl.job.executor.appname=gitlink-notification-executor
-### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
-xxl.job.executor.address=
-### xxl-job executor server-info
-xxl.job.executor.ip=
-xxl.job.executor.port=82
-### xxl-job executor log-path
-xxl.job.executor.logpath=./logs/xxl-job/jobhandler
-### xxl-job executor log-retention-days
-xxl.job.executor.logretentiondays=30
-
-kafka.consumer.servers=localhost:29092,localhost:39092
-kafka.consumer.group.id=gitlink-notification-group
-kafka.consumer.auto.offset.reset=earliest
-kafka.consumer.enable.auto.commit=false
-kafka.consumer.auto.commit.interval=100
-kafka.consumer.max.poll.records =1000
-
-# DB settings
-spring.datasource.driver-class-name=com.mysql.jdbc.Driver
-spring.datasource.url=jdbc:mysql://127.0.0.1:33306/gitlink_notification?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowMultiQueries=true&useSSL=false
-spring.datasource.username=gitlink
-spring.datasource.password=giTlinK0^827
-
-# kafka topic names
-topic.notification.name = gitlink_notification
\ No newline at end of file
diff --git a/executor/src/main/resources/application.yml b/executor/src/main/resources/application.yml
new file mode 100644
index 0000000..6b4039c
--- /dev/null
+++ b/executor/src/main/resources/application.yml
@@ -0,0 +1,30 @@
+server:
+  port: 8083
+
+logging:
+  config: classpath:logback.xml
+
+spring:
+  kafka:
+    producer:
+      bootstrap_servers: localhost:29092
+      client_id: gitlink_producer_01
+      retries: 5
+      batch_size: 16384
+    consumer:
+      bootstrap_servers: localhost:29092
+      client_id: gitlink_consumer
+      group_id: group-gitlink-notification
+      auto_offset_reset: earliest
+      max_poll_records: 5
+      topic: topic-gitlink-notification
+
+
+  datasource:
+    # data source settings
+    driver-class-name: com.mysql.jdbc.Driver
+    url: jdbc:mysql://127.0.0.1:33306/gitlink_notification?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowMultiQueries=true&useSSL=false
+    username: gitlink
+    password: giTlinK0^827
+# IP whitelist; separate multiple IPs with commas, use * to allow all
+white-list: '*'
\ No newline at end of file
diff --git a/middleware/services.yml b/middleware/services.yml
index 193cc3e..29ad036 100644
--- a/middleware/services.yml
+++ b/middleware/services.yml
@@ -33,23 +33,6 @@ services:
     networks:
       - gitlink_network
 
-
-  xxl-job-admin:
-    image: xuxueli/xxl-job-admin:${XXL_JOB_ADMIN_VERSION}
-    container_name: ${XXL_JOB_ADMIN_CONTAINER_NAME}
-    hostname: xxl_job_admin
-    environment:
-      - JAVA_OPTS=-Xmx512m
-      - PARAMS=--spring.datasource.url=jdbc:mysql://${MYSQL_CONTAINER_NAME}:3306/gitlink_notification?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai --spring.datasource.username=${MYSQL_USER} --spring.datasource.password=${MYSQL_PASSWORD}
-    volumes:
-      - ${DOCKER_DATA_PATH}/xxl-job-admin:/data/applogs
-    ports:
-      - ${XXL_JOB_ADMIN_LOCAL_PORT}:8080
-    depends_on:
-      - mysql
-    networks:
-      - gitlink_network
-
   # See Also: https://hub.docker.com/r/confluentinc/cp-zookeeper
   zookeeper:
     image: confluentinc/cp-zookeeper:latest
diff --git a/reader/src/main/resources/application.yml b/reader/src/main/resources/application.yml
index 269b531..df4d960 100644
--- a/reader/src/main/resources/application.yml
+++ b/reader/src/main/resources/application.yml
@@ -27,6 +27,13 @@ spring:
     # 连接超时时间(毫秒)
     timeout: 1000
 
+  kafka:
+    producer:
+      bootstrap_servers: localhost:29092
+      client_id: gitlink_producer_01
+      retries: 5
+      batch_size: 16384
+
   datasource:
     # 配置数据源类型
     driver-class-name: com.mysql.jdbc.Driver
diff --git a/writer/src/main/java/cn/org/gitlink/notification/writer/controller/NotificationController.java b/writer/src/main/java/cn/org/gitlink/notification/writer/controller/NotificationController.java
index 6dd4eb4..2958914 100644
--- a/writer/src/main/java/cn/org/gitlink/notification/writer/controller/NotificationController.java
+++ b/writer/src/main/java/cn/org/gitlink/notification/writer/controller/NotificationController.java
@@ -18,6 +18,8 @@ import org.apache.logging.log4j.Logger;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
 import org.springframework.validation.BindingResult;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
@@ -26,10 +28,11 @@ import java.util.Map;
 
 @RestController
 @RequestMapping(value = "/gns/notification")
+@Configuration
 public class NotificationController {
 
-    //kafka topic
-    private static final String GITLINK_NOTIFICATION_TOPIC = "gitlink_notification"; //在kafka中gitlink的topic
+    @Value("${spring.kafka.producer.topic}")
+    private String gitlinkNotificationTopic;
 
     @Autowired
     private KafkaUtil kafkaUtil;
@@ -68,7 +71,7 @@ public class NotificationController {
         newSysNotificationVo.setPlatform(platform);
 
         try {
-            kafkaUtil.sendMessage(GITLINK_NOTIFICATION_TOPIC, JSONObject.toJSONString(newSysNotificationVo));
+            kafkaUtil.sendMessage(gitlinkNotificationTopic, JSONObject.toJSONString(newSysNotificationVo));
             return DataPacketUtil.jsonSuccessResult();
         } catch (Exception e) {
             logger.error(e);
diff --git a/writer/src/main/resources/application.yml b/writer/src/main/resources/application.yml
index a8ef998..33866ce 100644
--- a/writer/src/main/resources/application.yml
+++ b/writer/src/main/resources/application.yml
@@ -5,22 +5,13 @@ logging:
   config: classpath:logback.xml
 
 spring:
-  #kafka配置
   kafka:
-    #这里改为你的kafka服务器ip和端口号
-    bootstrap-servers: localhost:29092
-    #=============== producer =======================
     producer:
-      #如果该值大于零时,表示启用重试失败的发送次数
-      retries: 0
-      #每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,默认值为16384(单位字节)
-      batch-size: 16384
-      #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为3355443
-      buffer-memory: 33554432
-      #key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
-      key-serializer: org.apache.kafka.common.serialization.StringSerializer
-      #value的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
-      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      bootstrap_servers: localhost:29092
+      client_id: gitlink_producer_01
+      retries: 5
+      batch_size: 16384
+      topic: topic-gitlink-notification
 
   datasource:
     # 配置数据源类型