Environment Setup
Install the Docker image provided for the RK356x. The required toolchain is preinstalled in this image; we use this environment mainly to convert models to the RKNN format.
Reference links:
https://github.com/airockchip/yolov5.git
https://github.com/rockchip-linux/rknpu2
https://github.com/rockchip-linux/rknn-toolkit2
Loading the Docker image
Enter the environment directory:
rknn-toolkit2-1.x.x/docker/docker_full
Load the image file:
docker load --input rknn-toolkit2-1.x.x-cp36-docker.tar.gz
Check that the image loaded successfully:
docker images
Create a container:
docker run -it --shm-size 4G -v D:\DATA\ml\rknn-toolkit2-1.5.0\examples\onnx\yolov5:/rknn_yolov5_demo rknn-toolkit2:1.5.0-ml /bin/bash
Here "-v <host directory>:<container directory>" maps a local directory to a directory inside the Docker container. Once inside the container, we can generate the model.
Afterwards, on each new session, start the existing container:
docker container ls
docker start -ai <container_name>
You can check disk usage with the following command:
df -h
Model Export
Inside the Docker container, enter the rknn_yolov5_demo folder, i.e. the directory mapped above:
cd /rknn_yolov5_demo
Run the example script:
python3 ./test.py
It automatically converts the ONNX model into an RKNN model:
yolov5s-640-640.rknn
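For reference, the conversion test.py performs boils down to the following RKNN Toolkit2 calls. This is a minimal sketch based on the rknn-toolkit2 YOLOv5 example, not a copy of test.py; the mean/std values, file names, and target platform are assumptions:
```python
from rknn.api import RKNN

rknn = RKNN(verbose=True)

# Preprocessing and target NPU (values assumed from the YOLOv5 example)
rknn.config(mean_values=[[0, 0, 0]], std_values=[[255, 255, 255]],
            target_platform='rk3566')

# Load the ONNX model, quantize against the calibration images, and export
rknn.load_onnx(model='./yolov5s.onnx')
rknn.build(do_quantization=True, dataset='./dataset.txt')
rknn.export_rknn('./yolov5s-640-640.rknn')
rknn.release()
```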
YOLOv5 Model Training
Start training:
python3 train.py --data data/lines/itheima_lines.yaml --weights yolov5s.pt --img 640
Export the model to ONNX (the --rknpu option comes from the airockchip/yolov5 fork linked above):
python export.py --weights yolov5s.pt --include onnx --rknpu rk3566
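Before converting, it is worth checking that the exported ONNX exposes the three raw detection heads that the RKNN post-processing below expects. A quick sanity check (a minimal sketch; it requires the onnx Python package, and the file name is an assumption):
```python
import onnx

model = onnx.load('yolov5s.onnx')  # file name assumed from the export step above
# The RKNN demo post-processing expects three outputs, one per detection head
for output in model.graph.output:
    print(output.name)
```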
On the LubanCat Board
RKNN Toolkit Lite2 is mainly used for deploying RKNN models on the Rockchip NPU.
Before using RKNN Toolkit Lite2, you first need to convert models exported from the various deep-learning frameworks into RKNN models with RKNN Toolkit2.
The complete RKNN Toolkit2 package and documentation are available at:
https://github.com/rockchip-linux/rknn-toolkit2
Environment Installation
Install the dependencies:
sudo apt-get install -y python3-opencv
sudo apt-get install -y python3-numpy
Install rknn_toolkit_lite2 for on-board inference:
pip3 install rknn_toolkit_lite2-1.x.0-cp310-cp310-linux_aarch64.whl
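A quick way to confirm the wheel installed correctly (a minimal import check, nothing more):
```python
# Verify that rknn_toolkit_lite2 can be imported and instantiated
from rknnlite.api import RKNNLite

rknn_lite = RKNNLite()
print('rknn_toolkit_lite2 is available')
rknn_lite.release()
```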
Copy the corresponding .so files from RKNPU2 onto the board to replace the existing ones; verify with:
ls -al /usr/lib/librkn*
The files to replace are /usr/lib/librknn_api.so and /usr/lib/librknnrt.so.
Run inference with the following code:
```python
import os
import urllib
import traceback
import time
import sys
import numpy as np
import cv2
# from rknn.api import RKNN  # PC-side toolkit; not needed on the board
from rknnlite.api import RKNNLite

RKNN_MODEL = 'yolov5s.rknn'
IMG_PATH = './bus.jpg'
DATASET = './dataset.txt'

QUANTIZE_ON = True

OBJ_THRESH = 0.25
NMS_THRESH = 0.45
IMG_SIZE = 640

CLASSES = ("person", "bicycle", "car", "motorbike ", "aeroplane ", "bus ", "train", "truck ", "boat", "traffic light",
           "fire hydrant", "stop sign ", "parking meter", "bench", "bird", "cat", "dog ", "horse ", "sheep", "cow",
           "elephant", "bear", "zebra ", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
           "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard",
           "tennis racket", "bottle", "wine glass", "cup", "fork", "knife ", "spoon", "bowl", "banana", "apple",
           "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza ", "donut", "cake", "chair", "sofa",
           "pottedplant", "bed", "diningtable", "toilet ", "tvmonitor", "laptop ", "mouse ", "remote ", "keyboard ",
           "cell phone", "microwave ", "oven ", "toaster", "sink", "refrigerator ", "book", "clock", "vase",
           "scissors ", "teddy bear ", "hair drier", "toothbrush ")

def sigmoid(x):
    return 1 / (1 + np.exp(-x))


def xywh2xyxy(x):
    # Convert [x, y, w, h] to [x1, y1, x2, y2]
    y = np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y


def process(input, mask, anchors):
    anchors = [anchors[i] for i in mask]
    grid_h, grid_w = map(int, input.shape[0:2])

    box_confidence = sigmoid(input[..., 4])
    box_confidence = np.expand_dims(box_confidence, axis=-1)

    box_class_probs = sigmoid(input[..., 5:])

    box_xy = sigmoid(input[..., :2]) * 2 - 0.5

    col = np.tile(np.arange(0, grid_w), grid_w).reshape(-1, grid_w)
    row = np.tile(np.arange(0, grid_h).reshape(-1, 1), grid_h)
    col = col.reshape(grid_h, grid_w, 1, 1).repeat(3, axis=-2)
    row = row.reshape(grid_h, grid_w, 1, 1).repeat(3, axis=-2)
    grid = np.concatenate((col, row), axis=-1)
    box_xy += grid
    box_xy *= int(IMG_SIZE / grid_h)

    box_wh = pow(sigmoid(input[..., 2:4]) * 2, 2)
    box_wh = box_wh * anchors

    box = np.concatenate((box_xy, box_wh), axis=-1)

    return box, box_confidence, box_class_probs

def filter_boxes(boxes, box_confidences, box_class_probs):
    """Filter boxes with box threshold. It's a bit different from the original yolov5 post process!

    # Arguments
        boxes: ndarray, boxes of objects.
        box_confidences: ndarray, confidences of objects.
        box_class_probs: ndarray, class_probs of objects.

    # Returns
        boxes: ndarray, filtered boxes.
        classes: ndarray, classes for boxes.
        scores: ndarray, scores for boxes.
    """
    boxes = boxes.reshape(-1, 4)
    box_confidences = box_confidences.reshape(-1)
    box_class_probs = box_class_probs.reshape(-1, box_class_probs.shape[-1])

    _box_pos = np.where(box_confidences >= OBJ_THRESH)
    boxes = boxes[_box_pos]
    box_confidences = box_confidences[_box_pos]
    box_class_probs = box_class_probs[_box_pos]

    class_max_score = np.max(box_class_probs, axis=-1)
    classes = np.argmax(box_class_probs, axis=-1)
    _class_pos = np.where(class_max_score >= OBJ_THRESH)

    boxes = boxes[_class_pos]
    classes = classes[_class_pos]
    scores = (class_max_score * box_confidences)[_class_pos]

    return boxes, classes, scores

def nms_boxes(boxes, scores):
    """Suppress non-maximal boxes.

    # Arguments
        boxes: ndarray, boxes of objects.
        scores: ndarray, scores of objects.

    # Returns
        keep: ndarray, index of effective boxes.
    """
    x = boxes[:, 0]
    y = boxes[:, 1]
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]

    areas = w * h
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)

        xx1 = np.maximum(x[i], x[order[1:]])
        yy1 = np.maximum(y[i], y[order[1:]])
        xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]])
        yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]])

        w1 = np.maximum(0.0, xx2 - xx1 + 0.00001)
        h1 = np.maximum(0.0, yy2 - yy1 + 0.00001)
        inter = w1 * h1

        ovr = inter / (areas[i] + areas[order[1:]] - inter)
        inds = np.where(ovr <= NMS_THRESH)[0]
        order = order[inds + 1]
    keep = np.array(keep)
    return keep

def yolov5_post_process(input_data):
    masks = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    anchors = [[10, 13], [16, 30], [33, 23], [30, 61], [62, 45],
               [59, 119], [116, 90], [156, 198], [373, 326]]

    boxes, classes, scores = [], [], []
    for input, mask in zip(input_data, masks):
        b, c, s = process(input, mask, anchors)
        b, c, s = filter_boxes(b, c, s)
        boxes.append(b)
        classes.append(c)
        scores.append(s)

    boxes = np.concatenate(boxes)
    boxes = xywh2xyxy(boxes)
    classes = np.concatenate(classes)
    scores = np.concatenate(scores)

    nboxes, nclasses, nscores = [], [], []
    for c in set(classes):
        inds = np.where(classes == c)
        b = boxes[inds]
        c = classes[inds]
        s = scores[inds]

        keep = nms_boxes(b, s)

        nboxes.append(b[keep])
        nclasses.append(c[keep])
        nscores.append(s[keep])

    if not nclasses and not nscores:
        return None, None, None

    boxes = np.concatenate(nboxes)
    classes = np.concatenate(nclasses)
    scores = np.concatenate(nscores)

    return boxes, classes, scores

def draw(image, boxes, scores, classes):
    """Draw the boxes on the image.

    # Argument:
        image: original image.
        boxes: ndarray, boxes of objects.
        classes: ndarray, classes of objects.
        scores: ndarray, scores of objects.
        all_classes: all classes name.
    """
    for box, score, cl in zip(boxes, scores, classes):
        # box is (x1, y1, x2, y2); the variable names below follow the original demo
        top, left, right, bottom = box
        print('class: {}, score: {}'.format(CLASSES[cl], score))
        print('box coordinate left,top,right,down: [{}, {}, {}, {}]'.format(top, left, right, bottom))
        top = int(top)
        left = int(left)
        right = int(right)
        bottom = int(bottom)

        cv2.rectangle(image, (top, left), (right, bottom), (255, 255, 0), 2)
        cv2.putText(image, '{0} {1:.2f}'.format(CLASSES[cl], score),
                    (top, left - 6),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, (0, 0, 255), 2)

def letterbox(im, new_shape=(640, 640), color=(0, 0, 0)):
    # Resize and pad image while meeting stride-multiple constraints
    shape = im.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return im, ratio, (dw, dh)

if __name__ == '__main__':

    # Create RKNNLite object
    rknn = RKNNLite(verbose=True)

    print('--> Load RKNN model')
    ret = rknn.load_rknn(RKNN_MODEL)

    # Init runtime environment
    print('--> Init runtime environment')
    ret = rknn.init_runtime()
    # ret = rknn.init_runtime('rk3566')
    if ret != 0:
        print('Init runtime environment failed!')
        exit(ret)
    print('done')

    # Set inputs
    img = cv2.imread(IMG_PATH)
    # img, ratio, (dw, dh) = letterbox(img, new_shape=(IMG_SIZE, IMG_SIZE))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))

    # Inference
    print('--> Running model')
    outputs = rknn.inference(inputs=[img])
    np.save('./onnx_yolov5_0.npy', outputs[0])
    np.save('./onnx_yolov5_1.npy', outputs[1])
    np.save('./onnx_yolov5_2.npy', outputs[2])
    print('done')

    # Post process
    input0_data = outputs[0]
    input1_data = outputs[1]
    input2_data = outputs[2]

    input0_data = input0_data.reshape([3, -1] + list(input0_data.shape[-2:]))
    input1_data = input1_data.reshape([3, -1] + list(input1_data.shape[-2:]))
    input2_data = input2_data.reshape([3, -1] + list(input2_data.shape[-2:]))

    input_data = list()
    input_data.append(np.transpose(input0_data, (2, 3, 0, 1)))
    input_data.append(np.transpose(input1_data, (2, 3, 0, 1)))
    input_data.append(np.transpose(input2_data, (2, 3, 0, 1)))

    boxes, classes, scores = yolov5_post_process(input_data)

    img_1 = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    if boxes is not None:
        draw(img_1, boxes, scores, classes)
    cv2.imwrite('result.jpg', img_1)

    rknn.release()
```
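Note that the script above resizes the input straight to 640x640 (the letterbox call is commented out), which distorts the aspect ratio. If you enable letterbox instead, the boxes returned by yolov5_post_process live in the padded 640x640 frame and must be mapped back to the original image. A minimal sketch of that inverse mapping (scale_boxes_back is a hypothetical helper, not part of the demo):
```python
def scale_boxes_back(boxes, ratio, dwdh):
    """Map (x1, y1, x2, y2) boxes from the letterboxed frame back to the source image."""
    dw, dh = dwdh
    r = ratio[0]  # letterbox() uses the same ratio for width and height
    boxes = boxes.copy()
    boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dw) / r  # undo horizontal padding, then scaling
    boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dh) / r  # undo vertical padding, then scaling
    return boxes
```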
### Multi-threaded capture
```python
import os
import urllib
import traceback
import time
import sys
import numpy as np
import cv2
# from rknn.api import RKNN
from rknnlite.api import RKNNLite
import threading
import queue


# Custom unbuffered video-capture class
class VideoCapture:
    """Customized VideoCapture, always read latest frame"""

    def __init__(self, name):
        self.cap = cv2.VideoCapture(name)
        self.q = queue.Queue(maxsize=3)
        self.stop_threads = False  # to gracefully close sub-thread
        th = threading.Thread(target=self._reader)
        th.daemon = True  # run the reader thread in the background
        th.start()

    # Read frames continuously, keeping only the most recent one
    def _reader(self):
        while not self.stop_threads:
            ret, frame = self.cap.read()
            if not ret:
                break
            if not self.q.empty():
                try:
                    self.q.get_nowait()
                except queue.Empty:
                    pass
            self.q.put(frame)

    def read(self):
        return self.q.get()

    def terminate(self):
        self.stop_threads = True
        self.cap.release()


if __name__ == '__main__':
    capture = VideoCapture("/dev/video9")
    img = capture.read()
    while img is not None:
        cv2.imshow('src', img)
        img = capture.read()
        if ord("q") == cv2.waitKey(10):
            break
    capture.terminate()
```
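Putting the pieces together, the unbuffered VideoCapture can feed the RKNN pipeline from the single-image script. A minimal sketch, assuming the model file name and that IMG_SIZE, yolov5_post_process, and draw from the inference script above are in scope:
```python
if __name__ == '__main__':
    rknn = RKNNLite()
    rknn.load_rknn('yolov5s.rknn')  # model file name assumed
    rknn.init_runtime()

    capture = VideoCapture('/dev/video9')
    while True:
        frame = capture.read()  # blocks until the latest frame is available
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))

        outputs = rknn.inference(inputs=[img])
        # Reshape the three heads exactly as in the single-image script
        input_data = [np.transpose(o.reshape([3, -1] + list(o.shape[-2:])), (2, 3, 0, 1))
                      for o in outputs]
        boxes, classes, scores = yolov5_post_process(input_data)

        result = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        if boxes is not None:
            draw(result, boxes, scores, classes)
        cv2.imshow('det', result)
        if cv2.waitKey(1) == ord('q'):
            break

    capture.terminate()
    rknn.release()
```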
Subscribing to the Camera from ROS 2
Create a workspace:
mkdir -p car_ws/src
Create a package:
ros2 pkg create --build-type ament_python yolo_detect
Add the node code:
```python
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2 as cv


class SubNode(Node):
    def __init__(self):
        super(SubNode, self).__init__('py_sub')
        self.create_subscription(Image, '/image_raw', self.call_back, 10)
        self.cv_bridge = CvBridge()

    def call_back(self, msg):
        # self.get_logger().info(msg.data)
        image = self.cv_bridge.imgmsg_to_cv2(msg, 'bgr8')
        cv.imshow("image", image)
        cv.waitKey(10)


def main():
    rclpy.init()
    node = SubNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
```
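To turn this subscriber into the detector the package name suggests, the RKNN inference from the board-side script can run inside the callback. A hedged sketch of just the callback; it assumes numpy is imported as np, that self.rknn (an initialized RKNNLite) was created in __init__, and that IMG_SIZE, yolov5_post_process, and draw from the inference script are importable in this package:
```python
def call_back(self, msg):
    # Convert the ROS image, run the RKNN pipeline, and show the result
    image = self.cv_bridge.imgmsg_to_cv2(msg, 'bgr8')
    img = cv.cvtColor(image, cv.COLOR_BGR2RGB)
    img = cv.resize(img, (IMG_SIZE, IMG_SIZE))

    outputs = self.rknn.inference(inputs=[img])  # self.rknn created in __init__ (assumed)
    input_data = [np.transpose(o.reshape([3, -1] + list(o.shape[-2:])), (2, 3, 0, 1))
                  for o in outputs]
    boxes, classes, scores = yolov5_post_process(input_data)

    result = cv.cvtColor(img, cv.COLOR_RGB2BGR)
    if boxes is not None:
        draw(result, boxes, scores, classes)
    cv.imshow("image", result)
    cv.waitKey(10)
```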
Modify the setup.py configuration:
```python
entry_points={
    'console_scripts': [
        'yolo_detect = yolo_detect.det:main',
    ],
},
```
Build the package:
colcon build --packages-select yolo_detect
Run the node:
ros2 run yolo_detect yolo_detect
Import the usb_cam package, then build and source the workspace:
colcon build
source install/setup.bash
After the build completes, launch the camera:
ros2 launch usb_cam demo_launch.py
Common commands:
ros2 topic list
ros2 topic type /image_raw
ros2 interface show sensor_msgs/msg/Image
