kafka Persist

This commit is contained in:
MaxKey
2021-02-20 22:00:19 +08:00
parent 49cb4712c9
commit 618aba8b74
12 changed files with 21 additions and 39 deletions

View File

@@ -0,0 +1,30 @@
/*
* Copyright [2020] [MaxKey of copyright http://www.maxkey.top]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.maxkey.persistence.kafka;
public class KafkaIdentityAction {
public static String CREATE_ACTION = "CREATE_ACTION";
public static String DELETE_ACTION = "DELETE_ACTION";
public static String UPDATE_ACTION = "UPDATE_ACTION";
public static String PASSWORD_ACTION = "PASSWORD_ACTION";
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright [2020] [MaxKey of copyright http://www.maxkey.top]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.maxkey.persistence.kafka;
public class KafkaIdentityTopic {
public final static String USERINFO_TOPIC = "IDENTITY_USERINFO_TOPIC";
public final static String ORG_TOPIC = "IDENTITY_ORG_TOPIC";
public final static String GROUP_TOPIC = "IDENTITY_GROUP_TOPIC";
public final static String PASSWORD_TOPIC = "IDENTITY_PASSWORD_TOPIC";
}

View File

@@ -0,0 +1,70 @@
/*
* Copyright [2020] [MaxKey of copyright http://www.maxkey.top]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.maxkey.persistence.kafka;
public class KafkaMessage {
String topic;
String actionType;
String sendTime;
String msgId;
String content;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getActionType() {
return actionType;
}
public void setActionType(String actionType) {
this.actionType = actionType;
}
public String getSendTime() {
return sendTime;
}
public void setSendTime(String sendTime) {
this.sendTime = sendTime;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public Object getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public KafkaMessage() {
}
}

View File

@@ -0,0 +1,112 @@
/*
* Copyright [2020] [MaxKey of copyright http://www.maxkey.top]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.maxkey.persistence.kafka;
import java.util.UUID;
import org.maxkey.configuration.ApplicationConfig;
import org.maxkey.util.DateUtils;
import org.maxkey.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaPersistService {
private static final Logger _logger = LoggerFactory.getLogger(KafkaPersistService.class);
@Autowired
protected ApplicationConfig applicationConfig;
@Autowired
protected KafkaTemplate<String, String> kafkaTemplate;
public void setApplicationConfig(ApplicationConfig applicationConfig) {
this.applicationConfig = applicationConfig;
}
public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* send msg to kafka
* @param topic kafka TOPIC
* @param content msg Object
* @param actionType CREATE UPDATE DELETE
*/
public void send(String topic,Object content,String actionType) {
//config.identity.kafkasupport , if true
if(applicationConfig.isKafkaSupport()) {
KafkaMessage message = new KafkaMessage();
//message id uuid
message.setMsgId(UUID.randomUUID().toString());
message.setActionType(actionType);
message.setTopic(topic);
//send to kafka time
message.setSendTime(DateUtils.getCurrentDateTimeAsString());
//content Object to json message content
message.setContent(JsonUtils.gson2Json(content));
String msg = JsonUtils.gson2Json(message);
_logger.info("send message = {}", msg);
//通过线程发送Kafka消息
KafkaProvisioningThread thread =
new KafkaProvisioningThread(kafkaTemplate,topic,msg);
thread.start();
}
}
/**
* KafkaProvisioningThread for send message
*
*/
class KafkaProvisioningThread extends Thread{
KafkaTemplate<String, String> kafkaTemplate;
String topic ;
String msg;
public KafkaProvisioningThread(
KafkaTemplate<String, String> kafkaTemplate,
String topic,
String msg) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
this.msg = msg;
}
@Override
public void run() {
kafkaTemplate.send(topic, msg);
}
}
}

View File

@@ -34,9 +34,9 @@ import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.maxkey.domain.Organizations;
import org.maxkey.identity.kafka.KafkaIdentityAction;
import org.maxkey.identity.kafka.KafkaIdentityTopic;
import org.maxkey.identity.kafka.KafkaProvisioningService;
import org.maxkey.persistence.kafka.KafkaIdentityAction;
import org.maxkey.persistence.kafka.KafkaIdentityTopic;
import org.maxkey.persistence.kafka.KafkaPersistService;
import org.maxkey.persistence.mapper.OrganizationsMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -49,7 +49,7 @@ import com.google.common.collect.Lists;
public class OrganizationsService extends JpaBaseService<Organizations>{
@Autowired
KafkaProvisioningService kafkaProvisioningService;
KafkaPersistService kafkaPersistService;
public OrganizationsService() {
super(OrganizationsMapper.class);
@@ -66,7 +66,7 @@ public class OrganizationsService extends JpaBaseService<Organizations>{
public boolean insert(Organizations organization) {
if(super.insert(organization)){
kafkaProvisioningService.send(
kafkaPersistService.send(
KafkaIdentityTopic.ORG_TOPIC, organization, KafkaIdentityAction.CREATE_ACTION);
return true;
}
@@ -75,7 +75,7 @@ public class OrganizationsService extends JpaBaseService<Organizations>{
public boolean update(Organizations organization) {
if(super.update(organization)){
kafkaProvisioningService.send(
kafkaPersistService.send(
KafkaIdentityTopic.ORG_TOPIC, organization, KafkaIdentityAction.UPDATE_ACTION);
return true;
}
@@ -84,7 +84,7 @@ public class OrganizationsService extends JpaBaseService<Organizations>{
public boolean delete(Organizations organization) {
if(super.delete(organization)){
kafkaProvisioningService.send(
kafkaPersistService.send(
KafkaIdentityTopic.ORG_TOPIC, organization, KafkaIdentityAction.DELETE_ACTION);
return true;
}

View File

@@ -40,10 +40,10 @@ import org.maxkey.crypto.ReciprocalUtils;
import org.maxkey.crypto.password.PasswordReciprocal;
import org.maxkey.domain.ChangePassword;
import org.maxkey.domain.UserInfo;
import org.maxkey.identity.kafka.KafkaIdentityAction;
import org.maxkey.identity.kafka.KafkaIdentityTopic;
import org.maxkey.identity.kafka.KafkaProvisioningService;
import org.maxkey.persistence.db.PasswordPolicyValidator;
import org.maxkey.persistence.kafka.KafkaIdentityAction;
import org.maxkey.persistence.kafka.KafkaIdentityTopic;
import org.maxkey.persistence.kafka.KafkaPersistService;
import org.maxkey.persistence.mapper.UserInfoMapper;
import org.maxkey.util.DateUtils;
import org.maxkey.util.StringUtils;
@@ -76,7 +76,7 @@ public class UserInfoService extends JpaBaseService<UserInfo> {
PasswordPolicyValidator passwordPolicyValidator;
@Autowired
KafkaProvisioningService kafkaProvisioningService;
KafkaPersistService kafkaPersistService;
@Autowired
protected JdbcTemplate jdbcTemplate;
@@ -96,7 +96,7 @@ public class UserInfoService extends JpaBaseService<UserInfo> {
public boolean insert(UserInfo userInfo) {
userInfo = passwordEncoder(userInfo);
if (super.insert(userInfo)) {
kafkaProvisioningService.send(
kafkaPersistService.send(
KafkaIdentityTopic.USERINFO_TOPIC,
userInfo,
KafkaIdentityAction.CREATE_ACTION);
@@ -109,7 +109,7 @@ public class UserInfoService extends JpaBaseService<UserInfo> {
public boolean update(UserInfo userInfo) {
userInfo = passwordEncoder(userInfo);
if (super.update(userInfo)) {
kafkaProvisioningService.send(
kafkaPersistService.send(
KafkaIdentityTopic.USERINFO_TOPIC,
userInfo,
KafkaIdentityAction.UPDATE_ACTION);
@@ -122,7 +122,7 @@ public class UserInfoService extends JpaBaseService<UserInfo> {
public boolean delete(UserInfo userInfo) {
if( super.delete(userInfo)){
kafkaProvisioningService.send(
kafkaPersistService.send(
KafkaIdentityTopic.USERINFO_TOPIC,
userInfo,
KafkaIdentityAction.DELETE_ACTION);
@@ -283,7 +283,7 @@ public class UserInfoService extends JpaBaseService<UserInfo> {
changePassword.setUsername(userInfo.getUsername());
changePassword.setDecipherable(userInfo.getDecipherable());
changePassword.setPassword(userInfo.getPassword());
kafkaProvisioningService.send(
kafkaPersistService.send(
KafkaIdentityTopic.PASSWORD_TOPIC,
changePassword,
KafkaIdentityAction.PASSWORD_ACTION);