1.MQTT框架,长时间运行容易掉线,而且还重连不上 (Springboot+MQTT)?
原因:使用相同的clientID
方案:全局使用的clientID保证唯一性,可以采用UUID等方式
原因: 当前用户没有Topic权限
方案:如果配置有acl权限,则查询当前登录mqtt用户是否具有订阅对应topic的权限,无权时也会造成一直频繁断线重连
原因:在回调函数内进行业务处理遇到异常并没有捕获
方案:在可能出现异常的语句块,进行try-catch捕获
/*** subscribe订阅后得到的消息会执行到这里*/@Overridepublic void messageArrived(String topic, MqttMessage message) {String msg = new String(message.getPayload());try {//此处可能因为收到的消息不合法,会造成JSON转化异常,若异常未捕获,会导致MQTT客户端掉线JSONObject jsonObject = JSON.parseObject(msg);String gwId = String.valueOf(jsonObject.get("gwId"));} catch (JSONException e) {log.error("JSON Format Parsing Exception : {}", msg);}}
jmqtt-master.zip
iot-mqtt-server-master.zip
2.es里的数据导入另一个es?
答:reindexm,es有tool的。
https://gitee.com/ichiva/mybatis-sync-es
核心代码
同步处理器接口
package com.gzwl.interceptor;/*** es 同步处理器*/public interface SynEsHandler {void handler(Object parameter);}
拦截器
package com.gzwl.interceptor;import lombok.extern.slf4j.Slf4j;import org.apache.ibatis.executor.Executor;import org.apache.ibatis.mapping.MappedStatement;import org.apache.ibatis.plugin.*;import org.springframework.stereotype.Component;import java.util.*;/*** es 同步拦截器** 配置拦截器 @Signature* 前置拦截器 type = Executor.class* 只有写入才需要拦截 method = "update"* 拦截方法的参数 args = {MappedStatement.class, Object.class}*/@Slf4j@Component@Intercepts({@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})public class SyncEsInterceptor implements Interceptor {private Map<String,SynEsHandler> handlerMap = new HashMap<>();/*** 拦截器在sql执行成功后同步到es,* 如果同步失败抛出异常,保证数据一致性** @param invocation* @return* @throws Throwable*/@Overridepublic Object intercept(Invocation invocation) throws Throwable {Object res = invocation.proceed();Object[] args = invocation.getArgs();if (args.length >= 2) {MappedStatement mappedStatement = (MappedStatement) args[0];//拦截到的方法,也就是注册的keyString key = mappedStatement.getId();log.debug("拦截到的方法 key=",key);SynEsHandler synEsHandler = handlerMap.get(key);if(null != synEsHandler){try {synEsHandler.handler(args[1]);}catch (Exception e){//包装异常throw new SyncEsException(e);}}else {log.debug("没有处理的key={}",key);}}return res;}@Overridepublic Object plugin(Object o) {if(o instanceof Executor) return Plugin.wrap(o, this);else return o;}@Overridepublic void setProperties(Properties properties) {}/*** 注册同步处理器* @param key* @param parameterHandler*/public void regHandler(String key,SynEsHandler parameterHandler){handlerMap.put(key,parameterHandler);}}
实现处理接口
package com.gzwl.interceptor.impl;import com.gzwl.entity.Employee;import com.gzwl.interceptor.SynEsHandler;import com.gzwl.interceptor.SyncEsInterceptor;import org.apache.ibatis.binding.MapperMethod;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentclass EmployeeMapperHandler implements SynEsHandler {@Autowiredpublic EmployeeMapperHandler(SyncEsInterceptor syncEsInterceptor){//将自己注册到拦截器中,每个方法可用是独立的handler,也可用注册多个方法的处理器syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.insert",this);syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.update",this);}@Overridepublic void handler(Object parameter) {Employee entity;if(parameter instanceof Employee){//insert 可用直接拦截到实体类entity = (Employee) parameter;}else {//update 方法需要特殊处理entity = (Employee) ((MapperMethod.ParamMap) parameter).get("param1");}//在这里编写es的同步逻辑}}
完整代码:https://gitee.com/ichiva/mybatis-sync-es
注意:这个方案也是有致命缺陷的,就是在es执行完成后,程序还有可能让数据库回滚,从而造成数据不一致。
