前言
Constrained Application Protocol(CoAP)是用于受约束设备的专用Internet应用程序协议,如RFC 7275所定义。它使那些称为“节点”的受约束设备能够使用类似协议与更广泛的Internet通信。CoAP被设计用于在相同约束网络(例如,低功率,有损网络)上的设备之间,设备与Internet上的一般节点之间以及在都通过Internet连接的不同约束网络上的设备之间使用。CoAP也正在通过其他机制使用,例如移动通信网络上的SMS。
下图是近几年的CoAP关注区域热力图:
具体表格如下:
自2015年5月15号到2020年5月15日统计值
| 关注区域 | 关注值 |
|---|---|
| 中国 | 100 |
| 韩国 | 37 |
| 印度 | 35 |
关联模块一览
和CoAP设备传输协议关联的模块有Thingsboard CoAP Transport Service、Thingsboard CoAP Transport Common和Thingsboard Server Queue components。前面这些名称大家可以看IDEA maven模块名称。
CoAP Transport Service
CoAP Transport Common
Server Queue Components
CoAP Transport Service
org.thingsboard.server.coap.ThingsboardCoapTransportApplication, CoAP服务启动类,使用SpringBoot启动类,通过加载模块和配置文件,对服务进行配置并运行。
@SpringBootConfiguration@EnableAsync@EnableScheduling@ComponentScan({"org.thingsboard.server.coap", "org.thingsboard.server.common", "org.thingsboard.server.transport.coap", "org.thingsboard.server.queue"})public class ThingsboardCoapTransportApplication {private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-coap-transport";public static void main(String[] args) {SpringApplication.run(ThingsboardCoapTransportApplication.class, updateArguments(args));}private static String[] updateArguments(String[] args) {if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {String[] modifiedArgs = new String[args.length + 1];System.arraycopy(args, 0, modifiedArgs, 0, args.length);modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;return modifiedArgs;}return args;}}Copy to clipboardErrorCopied
第2-3行代码中,@EnableAsync注解使用来开启异步线程,@EnableScheduling注解使用来开启定时任务。
第4行代码@ComponentScan({"org.thingsboard.server.coap", "org.thingsboard.server.common", "org.thingsboard.server.transport.coap", "org.thingsboard.server.queue"}): 扫描这些包下的所有使用@Component 的类,不管自动导入还是导出。
第7-8行代码和updateArguments的作用是:启动时,使用 –spring.config.name = tb-mqtt-transport, 指定配置名,包括但不仅限于tb-mqtt-transport.conf等文件。
CoAP Transport Common
Californium框架
GitHub项目地址: https://github.com/eclipse/californium
官网介绍: https://www.eclipse.org/californium/
- 基于Java的CoAP框架
Californium是一个强大的CoAP框架,目标是后端服务与小型物联网设备进行通信,当然大型物联网设备也适宜。它为RESTful Web服务提供了一个更方便的API,支持CoAP的所有特性。
- 标准兼容
Californium已经运行了IETF的代码,并通过了所有ETSI插件测试规范。
- 云原生
Californium具有可伸缩性的体系结构,并且性能优于高性能的HTTP服务器。CoAP的低开销允许使用一个服务实例处理数百万个IoT设备。灵活的并发模型允许实现最适合您的应用程序的任何东西。
引入依赖
CoAP Transport common通过引入californium 1.0.2版本的jar包对CoAP进行协议逻辑实现。
<dependency><groupId>org.eclipse.californium</groupId><artifactId>californium-core</artifactId><version>1.0.2</version></dependency>Copy to clipboardErrorCopied
参数配置
transport:# 本地CoAP传输协议参数coap:# 开启/关闭CoAP传输协议.enabled: "${COAP_ENABLED:true}"# 绑定地址bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"# 绑定端口号bind_port: "${COAP_BIND_PORT:5683}"# 超时时间timeout: "${COAP_TIMEOUT:10000}"Copy to clipboardErrorCopied
模块目录结构
首先我们看该模块下的目录结构:
.└── java└── org└── thingsboard└── server└── transport└── coap├── CoapTransportContext.java CoAP传输协议上下文├── CoapTransportResource.java CoAP传输协议资源类├── CoapTransportService.java CoAP传输协议启动类├── adaptors│ ├── CoapTransportAdaptor.java CoAP协议传输适配器│ └── JsonCoapAdaptor.java CoAP传输内容Json适配器└── client└── DeviceEmulator.java 设备仿真器Copy to clipboardErrorCopied
CoAP传输协议逻辑实现
CoapTransportContext
@Slf4j@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.coap.enabled}'=='true')")@Componentpublic class CoapTransportContext extends TransportContext {@Getter@Value("${transport.coap.bind_address}")private String host;@Getter@Value("${transport.coap.bind_port}")private Integer port;@Getter@Value("${transport.coap.timeout}")private Long timeout;@Getter@Autowiredprivate CoapTransportAdaptor adaptor;}Copy to clipboardErrorCopied
通过@Value注解将配置文件中的transport.coap.bind_address、transport.coap.bind_port、transport.coap.timeout取出来。
CoapTransportResource
@Service("CoapTransportService")@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.coap.enabled}'=='true')")@Slf4jpublic class CoapTransportService {private static final String V1 = "v1";private static final String API = "api";@Autowiredprivate CoapTransportContext coapTransportContext;private CoapServer server;@PostConstructpublic void init() throws UnknownHostException {log.info("Starting CoAP transport...");log.info("Starting CoAP transport server");//初始化CoAP Server服务this.server = new CoapServer();//创建CoAP资源服务createResources();//绑定地址和端口号InetAddress addr = InetAddress.getByName(coapTransportContext.getHost());InetSocketAddress sockAddr = new InetSocketAddress(addr, coapTransportContext.getPort());//为CoAP添加节点server.addEndpoint(new CoapEndpoint(sockAddr));//启动服务server.start();log.info("CoAP transport started!");}private void createResources() {//创建CoAP资源CoapResource api = new CoapResource(API);api.add(new CoapTransportResource(coapTransportContext, V1));server.add(api);}@PreDestroypublic void shutdown() {log.info("Stopping CoAP transport!");//优雅关闭CoAP服务this.server.destroy();log.info("CoAP transport stopped!");}}Copy to clipboardErrorCopied
第9-10行代码,通过@Autowired注解将刚才写的CoAPTransportContext注入进来。
第12行代码,通过Californium创建CoAP Server服务。
第14-25行代码中逻辑依次为:
- 初始化CoAP Server服务
- 创建CoAP资源服务
- 绑定地址和端口号
- 为CoAP添加节点
- 启动服务
第39-45行代码中通过@PreDestroy优雅的关闭CoAP服务。
CoapTransportResource
处理GET资源
@Overridepublic void handleGET(CoapExchange exchange) {Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());if (!featureType.isPresent()) {log.trace("Missing feature type parameter");exchange.respond(ResponseCode.BAD_REQUEST);} else if (featureType.get() == FeatureType.TELEMETRY) {log.trace("Can't fetch/subscribe to timeseries updates");exchange.respond(ResponseCode.BAD_REQUEST);} else if (exchange.getRequestOptions().hasObserve()) {processExchangeGetRequest(exchange, featureType.get());} else if (featureType.get() == FeatureType.ATTRIBUTES) {processRequest(exchange, SessionMsgType.GET_ATTRIBUTES_REQUEST);} else {log.trace("Invalid feature type parameter");exchange.respond(ResponseCode.BAD_REQUEST);}}Copy to clipboardErrorCopied
第3行代码中,通过 getFeatureType(Request request)获取Uri路径段字符串的第4个值,例如api/v1/123/telemetry这个Uri路径段,取的就是telemetry这个值的大写。
private static final int FEATURE_TYPE_POSITION = 4;private Optional<FeatureType> getFeatureType(Request request) {//返回Uri路径段字符串的列表List<String> uriPath = request.getOptions().getUriPath();try {//判断Uri路径段长度大小if (uriPath.size() >= FEATURE_TYPE_POSITION) {return Optional.of(FeatureType.valueOf(uriPath.get(FEATURE_TYPE_POSITION - 1).toUpperCase()));}} catch (RuntimeException e) {log.warn("Failed to decode feature type: {}", uriPath);}return Optional.empty();}Copy to clipboardErrorCopied
第4-9行代码中,如果featureType这个枚举为空、TELEMETRY和其他类型的错误的话,则返回128错误码。
第10-11行代码中,检查是否存在Observe选。如果是的话,进入processExchangeGetRequest(exchange, featureType.get())方法。
第12-13行代码中,如果featrueType是ATTRIBUTES,进入processRequest(CoapExchange exchange, SessionMsgType type)方法。
处理POST资源
@Overridepublic void handlePOST(CoapExchange exchange) {//获取POST请求Uri路径的第四个请求参数。Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());//如果请求参数为空时,记录日志并回复错误码。if (!featureType.isPresent()) {log.trace("Missing feature type parameter");exchange.respond(ResponseCode.BAD_REQUEST);} else {switch (featureType.get()) {case ATTRIBUTES:processRequest(exchange, SessionMsgType.POST_ATTRIBUTES_REQUEST);break;case TELEMETRY:processRequest(exchange, SessionMsgType.POST_TELEMETRY_REQUEST);break;case RPC:Optional<Integer> requestId = getRequestId(exchange.advanced().getRequest());if (requestId.isPresent()) {processRequest(exchange, SessionMsgType.TO_DEVICE_RPC_RESPONSE);} else {processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST);}break;case CLAIM:processRequest(exchange, SessionMsgType.CLAIM_REQUEST);break;}}}Copy to clipboardErrorCopied
第4-8行代码,获取POST请求Uri路径的第四个请求参数。如果请求参数为空时,记录日志并回复错误码。
第10-27行代码中,判断请求参数的类型
- ATTRIBUTES(属性) :
processRequest(exchange, SessionMsgType.POST_ATTRIBUTES_REQUEST) - TELEMETRY(遥测数据) :
processRequest(exchange, SessionMsgType.POST_TELEMETRY_REQUEST) - RPC(远程调用)
- 服务端RPC : 如果Uri路径带有请求标识符,则
processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST) - 客户端RPC:
processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST)
- 服务端RPC : 如果Uri路径带有请求标识符,则
- CLAIM(声明) :
processRequest(exchange, SessionMsgType.CLAIM_REQUEST)
processRequest(CoapExchange exchange, SessionMsgType type)
private void processRequest(CoapExchange exchange, SessionMsgType type) {//记录请求的相关信息log.trace("Processing {}", exchange.advanced().getRequest());exchange.accept();Exchange advanced = exchange.advanced();Request request = advanced.getRequest();//获取设备的AccessToken数据Optional<DeviceTokenCredentials> credentials = decodeCredentials(request);//如果设备AccessToken为空,则返回错误码if (!credentials.isPresent()) {exchange.respond(ResponseCode.BAD_REQUEST);return;}transportService.process(TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(credentials.get().getCredentialsId()).build(),new DeviceAuthCallback(transportContext, exchange, sessionInfo -> {UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());try {//判断逻辑,并进入相应的处理类中去switch (type) {case POST_ATTRIBUTES_REQUEST:...//上报设备活跃reportActivity(sessionId, sessionInfo);break;case POST_TELEMETRY_REQUEST:...reportActivity(sessionId, sessionInfo);break;case CLAIM_REQUEST:...case SUBSCRIBE_ATTRIBUTES_REQUEST:...break;case UNSUBSCRIBE_ATTRIBUTES_REQUEST:...break;case SUBSCRIBE_RPC_COMMANDS_REQUEST:...break;case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:...break;case TO_DEVICE_RPC_RESPONSE:...break;case TO_SERVER_RPC_REQUEST:...break;case GET_ATTRIBUTES_REQUEST:...break;}} catch (AdaptorException e) {log.trace("[{}] Failed to decode message: ", sessionId, e);exchange.respond(ResponseCode.BAD_REQUEST);} catch (IllegalAccessException e) {log.trace("[{}] Failed to process message: ", sessionId, e);exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR);}}));}Copy to clipboardErrorCopied
第4-13行代码中,获取设备的AccessToken数据,如果设备AccessToken为空,则返回错误码。
第20-60行代码中,判断设备请求参数的逻辑,并进入相应的处理类中去。
第24行代码,通过reportActivity上报设备的活跃数据。
