邮件model层完善,添加emailListener及emailService

This commit is contained in:
万佳 2021-09-15 11:36:39 +08:00
parent 13a22b2764
commit b3e98577ae
13 changed files with 249 additions and 27 deletions

View File

@ -0,0 +1,94 @@
package cn.org.gitlink.notification.executor.service.email;
import cn.org.gitlink.notification.model.dao.entity.EmailJob;
import cn.org.gitlink.notification.model.dao.entity.EmailSendRecord;
import cn.org.gitlink.notification.model.service.notification.EmailJobsService;
import cn.org.gitlink.notification.model.service.notification.EmailSendRecordsService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Component
public class EmailService {
private Logger logger = LogManager.getLogger(EmailService.class);
@Autowired
private EmailJobsService emailJobsService;
@Autowired
private EmailSendRecordsService emailSendRecordsService;
/**
* @Description: 处理邮件发送任务根据emails添加到邮件发送记录表中
*
* @Param platform 平台编码
* @Param dispatchNumber 待处理发送任务列表数量
* @return: boolean
* @Author: wanjia
* @Date: 2021/9/15
*/
public boolean DispatchEmailJobs(String platform, Integer dispatchNumber) {
//获取指定数量待处理列表
List<EmailJob> emailJobList = new ArrayList<>();
try {
emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, -1, dispatchNumber);
} catch (Exception e) {
logger.error("获取未处理邮件任务列表失败:\n" + e);
}
//将EmailJob分配到email_send_records中
Boolean flag = null;
for (EmailJob emailJob : emailJobList) {
try {
flag = emailSendRecordsService.newEmailSendRecords(platform, emailJob.getEmails(), emailJob.getId());
} catch (Exception e) {
logger.error("处理EmailJob失败email_job_id: " + emailJob.getId() + "\n" + e);
}
//EmailJob分配成功更新状态
try {
emailJobsService.markEmailJobAs(platform, emailJob.getId(), new Date(), flag ? 1 : 2);
} catch (Exception e) {
logger.error("更新EmailJob状态失败email_job_id: " + emailJob.getId() + "\n" + e);
}
}
return true;
}
/**
* @Description: 发送邮件
*
* @Param platform 平台编码
* @Param sentNumber 一次发送数量
* @return: void
* @Author: wanjia
* @Date: 2021/9/15
*/
public void sendEmail(String platform, Integer sentNumber){
List<EmailSendRecord> emailSendRecordList = new ArrayList<>();
try {
emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, -1, sentNumber);
} catch (Exception e) {
logger.error("获取未发送邮件列表失败:\n" + e);
}
Boolean flag = null;
for (EmailSendRecord emailSendRecord : emailSendRecordList){
//todo 发邮件
// 取到一条 emailSendRecord 调用发邮件的Util返回发送结果赋值给flag
//更新emailSendRecord状态
try {
emailSendRecordsService.markEmailSendRecordAs(platform, emailSendRecord.getId(), new Date(), flag ? 1 : 2);
} catch (Exception e){
logger.error("更新EmailSendRecord状态失败email_send_record_id: " + emailSendRecord.getId() + "\n" + e);
}
}
}
}

View File

@ -0,0 +1,39 @@
package cn.org.gitlink.notification.executor.service.jobhandler;
import cn.org.gitlink.notification.model.dao.entity.vo.NewEmailJobVo;
import cn.org.gitlink.notification.model.service.notification.EmailJobsService;
import com.alibaba.fastjson.JSONObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Configuration
//todo 邮件的topics和groupId待处理
@KafkaListener(topics = "${spring.kafka.consumer.topic}", groupId = "${spring.kafka.consumer.group_id}")
public class EmailJobsListener {
private Logger logger = LogManager.getLogger(EmailJobsListener.class);
@Autowired
private EmailJobsService emailJobsService;
@KafkaHandler
public void messageHandler(String message) {
try {
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
emailJobsService.sendEmail(newEmailJobVo);
} catch (Exception e) {
logger.error(e);
}
}
@KafkaHandler(isDefault = true)
public void defaultHandler(Object object) {
logger.error("Unknow object received. ->" + object);
}//end of method
}

View File

@ -20,7 +20,6 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<configuration>
<includeSystemScope>true</includeSystemScope>
</configuration>

View File

@ -19,5 +19,5 @@ public interface EmailJobsMapper extends BaseMapper<EmailJob> {
int updateByPrimaryKey(@Param("platform") String platform, @Param("record") EmailJob record);
List<EmailJob> getNotDispatchedEmailJobs(@Param("platform") String platform);
List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("size") Integer size);
}

View File

@ -22,4 +22,5 @@ public interface EmailSendRecordsMapper extends BaseMapper<EmailSendRecord> {
//批量插入邮件发送任务记录
int insertEmailSendRecordBatch(@Param("platform") String platform,@Param("list") List<EmailSendRecord> emailSendRecordList);
List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("size") Integer sentNumber);
}

View File

@ -23,14 +23,16 @@ public interface EmailJobsService extends IService<EmailJob> {
* @Description: 获取所有未处理邮件任务列表
*
* @Param platform 平台编码
* @Param status 发送状态-1 未处理,1 处理成功,2 处理失败
* @Param size 列表大小
* @return: List<EmailJob>
* @Author: wanjia
* @Date: 2021/9/13
*/
List<EmailJob> getNotDispatchedEmailJobs(String platform) throws Exception;
List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) throws Exception;
/**
* @Description:
* @Description: 更新邮件任务状态
*
* @Param platform 平台编码
* @Param emailJobId 邮件任务id

View File

@ -4,10 +4,12 @@ import cn.org.gitlink.notification.model.dao.entity.EmailSendRecord;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.Date;
import java.util.List;
public interface EmailSendRecordsService extends IService<EmailSendRecord> {
/**
* @Description: 新增邮件任务到邮件发送记录表中
*
* @Param platform 平台编码
* @Param emails 邮件地址eg: w@163.com,j@163.com
* @Param jobId 邮件任务id
@ -18,7 +20,19 @@ public interface EmailSendRecordsService extends IService<EmailSendRecord> {
boolean newEmailSendRecords(String platform, String emails, Integer jobId) throws Exception;
/**
* @Description:
* @Description: 获取发送记录列表
*
* @Param platform 平台编码
* @Param status 邮件发送记录状态
* @Param size 列表数量
* @return: List<EmailSendRecord>
* @Author: wanjia
* @Date: 2021/9/15
*/
List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer size) throws Exception;
/**
* @Description: 变更邮件发送记录状态
*
* @Param platform 平台编码
* @Param emailSendRecordId 邮件发送记录id

View File

@ -26,8 +26,8 @@ public class EmailJobsServiceImpl extends ServiceImpl<EmailJobsMapper, EmailJob>
}
@Override
public List<EmailJob> getNotDispatchedEmailJobs(String platform) {
return baseMapper.getNotDispatchedEmailJobs(platform);
public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) {
return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, size);
}
@Override

View File

@ -38,4 +38,9 @@ public class EmailSendRecordsServiceImpl extends ServiceImpl<EmailSendRecordsMap
emailSendRecord.setStatus(status);
return baseMapper.updateByPrimaryKeySelective(platform, emailSendRecord);
}
@Override
public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer sentNumber) throws Exception {
return baseMapper.getRecordsByStatus(platform, status, sentNumber);
}
}

View File

@ -125,8 +125,8 @@
dispatched_status = #{record.dispatchedStatus,jdbcType=INTEGER}
where id = #{record.id,jdbcType=INTEGER}
</update>
<select id="getNotDispatchedEmailJobs" resultMap="BaseResultMap">
<select id="getEmailJobsByDispatchedStatus" resultMap="BaseResultMap">
select * from ${platform}_email_jobs
where dispatched_status = -1
where dispatched_status = #{dispatchedStatus} limit #{size}
</select>
</mapper>

View File

@ -117,4 +117,8 @@
)
</foreach >
</insert >
<select id="getRecordsByStatus" resultMap="BaseResultMap">
select * from ${platform}_email_send_records
where status = #{status} limit #{size}
</select>
</mapper>

View File

@ -0,0 +1,69 @@
package cn.org.gitlink.notification.model.service.notification;
import cn.org.gitlink.notification.model.dao.entity.EmailJob;
import cn.org.gitlink.notification.model.dao.entity.vo.NewEmailJobVo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import java.util.Date;
import java.util.List;
@RunWith(MockitoJUnitRunner.class)
public class EmailServiceTest {
@Mock
EmailJobsService emailJobsService;
@Mock
EmailSendRecordsService emailSendRecordsService;
Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
public void testEmailService(){
NewEmailJobVo emailJobVo = new NewEmailJobVo();
emailJobVo.setPlatform("gitlink");
emailJobVo.setSender(1);
emailJobVo.setEmails("w@163.com,j@163.com");
emailJobVo.setSubject("subjectTest");
emailJobVo.setContent("contentTest");
//添加发送邮件任务
try {
Assert.isTrue(emailJobsService.sendEmail(emailJobVo), "done");
} catch (Exception e) {
e.printStackTrace();
}
//获取待处理邮件任务列表
try {
List<EmailJob> emailJobs = emailJobsService.getEmailJobsByDispatchedStatus("gitlink",-1,10);
Assert.notEmpty(emailJobs, "success");
} catch (Exception e) {
e.printStackTrace();
}
//获取待处理邮件任务列表并处理邮件任务在records表中插入发送记录数据
try {
List<EmailJob> emailJobs = emailJobsService.getEmailJobsByDispatchedStatus("gitlink", -1, 10);
for (EmailJob emailJob : emailJobs){
emailSendRecordsService.newEmailSendRecords("gitlink", emailJob.getEmails(), emailJob.getId());
}
//发送邮件成功后更新邮件发送记录
int count = emailSendRecordsService.markEmailSendRecordAs("gitlink", 1, new Date(), 1);
Assert.isTrue(count > 0, "update email_send_record status success");
} catch (Exception e){
e.printStackTrace();
}
try {
int count = emailJobsService.markEmailJobAs("gitlink", 1, new Date(), 1);
Assert.isTrue(count > 0, "update status success");
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -13,8 +13,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ModelApplicationTest.class)
@ -28,22 +26,19 @@ public class ServiceTests {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
public void testSysNotificationService() {
// int i = sysNotificationService.getUnreadNotificationCount("gitlink", 234);
// List<SysNotification> sysNotificationList = sysNotificationService.getUnreadNotificationByType("gitlink", 100, 20, 1);
// Page<SysNotification> sysNotificationPage = sysNotificationService.getNotification(1, 10, "", "gitlink", 100);
// NewSysNotificationVo newSysNotificationVo = new NewSysNotificationVo();
// newSysNotificationVo.setSender((long) -1);
// newSysNotificationVo.setReceivers("7,8");
// newSysNotificationVo.setContent("baladiwei 在 gitlink-notification-system 修改ReadMe 文件的标题");
// newSysNotificationVo.setNotification_url("www.baidu.com");
// newSysNotificationVo.setPlatform("gitlink");
// boolean j = sysNotificationService.sendNotification(newSysNotificationVo);
// logger.info("insertResult:" + j);
// int k = sysNotificationService.markNotificationAs("gitlink", "2,3", 2);
// logger.info("updateStatus:" + k);
// int l = sysNotificationService.getUnreadNotificationByType(1,"gitlink",1);
// logger.info("SysNotificationNumber:" + l);
public void testSysNotificationService() throws Exception {
int i = sysNotificationService.getNotificationCount("gitlink", 234,1,1);
Page<SysNotification> sysNotificationPage = sysNotificationService.getNotification("gitlink", 100,1,1,1,20);
NewSysNotificationVo newSysNotificationVo = new NewSysNotificationVo();
newSysNotificationVo.setSender(1);
newSysNotificationVo.setReceivers("7,8");
newSysNotificationVo.setContent("baladiwei 在 gitlink-notification-system 修改ReadMe 文件的标题");
newSysNotificationVo.setNotification_url("www.baidu.com");
newSysNotificationVo.setPlatform("gitlink");
boolean j = sysNotificationService.sendNotification(newSysNotificationVo);
logger.info("insertResult:" + j);
int k = sysNotificationService.markNotificationAs("gitlink", 100, "1,2",2,1);
logger.info("updateStatus:" + k);
}
@Test