业务数据库需要做的修改
MySQL 业务数据
配置 MySQL 数据库
- 开启 binlog
# 开启 binlogserver-id = 223344log_bin = mysql-binbinlog_format = ROW # 这里一定是 row 格式binlog_row_image = FULLexpire_logs_days = 10 # 日志保留时间
:::info
- server-id:对于 MySQL 集群中的每个服务器和复制客户端,server-id 的值必须是唯一的。 在 MySQL 连接器设置期间,Debezium 为连接器分配一个唯一的服务器 ID。
- log_bin:log_bin 的值是 binlog 文件序列的基本名称。
- binlog_format:binlog-format 必须设置为 ROW 或 row。
- binlog_row_image:binlog_row_image 必须设置为 FULL 或 full。
- expire_logs_days:这是自动删除 binlog 文件的天数。 默认值为 0,表示不自动删除。 根据自己的需要设置该值。
:::
创建用于同步数据的用户并授权
:::info 需要 SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT 权限。
- SELECT:使连接器能够从数据库中的表中选择行。 这仅在执行快照时使用。
- RELOAD:允许连接器使用 FLUSH 语句来清除或重新加载内部缓存、刷新表或获取锁。 这仅在执行快照时使用。
- SHOW DATABASES:通过发出 SHOW DATABASE 语句,使连接器能够查看数据库名称。 这仅在执行快照时使用。
- REPLICATION SLAVE:使连接器能够连接并读取 MySQL 服务器 binlog。
- REPLICATION CLIENT:允许连接器使用以下语句,连接器总是需要这个。
- 显示 MASTER 状态
- 显示 SLAVE 状态
- 显示二进制日志
:::
- 使用 root 用户登录 MySQL
mysql
- 创建 Debezium 用于同步数据的用户并授权
CREATE USER 'debezium'@'%' IDENTIFIED BY '123456';GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';FLUSH PRIVILEGES;
Postgres 业务数据
配置 Postgres 数据库
- 修改 postgresql.conf
vim /etc/postgresql/12/main/postgresql.conf# 更改wal日志方式为 logicalwal_level = logical # minimal, replica, or logical# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)wal_sender_timeout = 180s # in milliseconds; 0 disable# 指示服务器使用最多 10 个单独的进程来处理 WAL 更改,需要根据实际情况修改该值。max_wal_senders = 10# 指示服务器允许为流式 WAL 更改创建最多 10 个复制槽,需要根据实际情况修改该值。max_replication_slots = 10
- 修改 pg_hba.conf
vim /etc/postgresql/12/main/pg_hba.conf# TYPE DATABASE USER ADDRESS METHOD############ REPLICATION ##############local replication all trusthost replication all 127.0.0.1/32 trusthost replication all ::1/128 trust
:::info
- METHOD 这里指定的是
trust,无条件地允许连接。这种方法允许任何可以与 PostgreSQL 数据库服务器连接的用户以他们期望的任意 PostgreSQL 数据库用户身份登入,而不需要口令或者其他任何认证。 - USER 这里指定的是
all,如果已创建具有 REPLICATION 和 LOGIN 权限的其他用户,可以更改为其他用户。 - ADDRESS 这里指定的是
127.0.0.1/32(IPv4 回环地址) 和::1/128(IPv6 回环地址),可以更改为0.0.0.0/0表示所有 IPv4 地址,::0/0表示所有 IPv6 地址
:::
- 重启 postgresql 服务生效,所以一般是在业务低峰期更改
sudo systemctl daemon-reloadsudo systemctl restart postgresql
创建用于同步数据的用户并授权
:::info 需要 REPLICATION,LOGIN,SELECT,CREATE 权限。
要将表添加到 publication,用户必须是表的所有者,表所有者自动拥有表的 SELECT 权限,所以只需要 CREATE 权限。
REPLICATION:为了将表添加到 publication,所以需要数据库上的 REPLICATION 权限。
CREATE:为了添加添加发布,所以需要数据库上的 CREATE 权限。
SELECT:为了复制初始表数据,所以需要表上的 SELECT 权限, 表所有者自动拥有表的 SELECT 权限。
:::
- 使用 postgres 用户登录 PostgreSQL
sudo -u postgres psql
- 方式一:为表的所有者增加 REPLICATION,LOGIN 和 CREATE 权限
-- 创建 debezium 用户并赋予 REPLICATION LOGIN 权限ALTER ROLE postgres WITH REPLICATION LOGIN;-- 把 mydb 数据库创建新的模式和发布的权限赋给 debeziumGRANT CREATE ON DATABASE mydb TO postgres;
- 方式二:由于源表已经存在,使用复制组来与原始所有者共享所有权
:::info 但是由于源表已经存在,需要一种机制来与原始所有者共享所有权。 要启用共享所有权,你需要创建一个 PostgreSQL 复制组,然后将现有表所有者和复制用户添加到该组。
:::
-- 创建 debezium 用户并赋予 REPLICATION LOGIN 权限CREATE USER debezium WITH PASSWORD '123456' REPLICATION LOGIN;-- 创建一个 replication_group 用于共享所有权CREATE ROLE replication_group;-- 将现有表所有者和复制用户添加到该组GRANT replication_group TO postgres;GRANT replication_group TO debezium;-- 将表的所有权转交给 replication_group,这样 debezium 和 postgres都用表的所有权ALTER TABLE public.shipments OWNER TO replication_group;-- 把 mydb 数据库创建新的模式和发布的权限赋给 debeziumGRANT CREATE ON DATABASE mydb TO debezium;
同步业务数据到 Hudi(ODS)
下载 pitrix-data-warehouse
git clone git@git.internal.yunify.com:geekspeng/pitrix-data-warehouse.gitcd pitrix-data-warehouse
写入 Hudi 时同步到 Hive
- 打开根目录下的 warehouse.yaml,配置 Hive
hive_sync:metastore_uris: 'thrift://localhost:9083'username: 'hive' # HMS 用户名password: 'hive' # HMS 密码db: 'warehouse' # Hive 的数据库
配置需要同步的 Postgres 业务数据库
- 打开根目录下的 warehouse.yaml,配置需要同步的 Postgres 业务数据库(可以配置多个数据库地址)
pg_sync:- hostname: '172.31.1.247' # 数据库地址port: '5432' # 数据库端口username: 'yunify' # 数据库用户名password: '123456' # 数据库密码databases: # 需要同步的数据库列表- db: 'zone' # 数据库名称(可以配置多个数据库)schema: 'public' # schema 名称table:- name: 'instance'- name: 'image'zone_id: 'pek3a' # 同步数据时,添加 zone_id 字段,并且值为 'pek3a'- name: 'eip'zone_id: 'pek3a'- name: 'dns_alias'zone_id: 'pek3a'zone: 'pek3a' # 用于指定不同 zone 的 SQL 存放目录- db: 'global' # 数据库名称(可以配置多个数据库)schema: 'public' # schema 名称table:- name: 'wan_access'zone_id: 'global'- name: 'wan_cpe_order'zone_id: 'global' # 同步数据时,添加 zone_id 字段,并且值为 'pek3a'- name: 'wan_physical_connect'zone_id: 'global'- name: 'app_saas_instance'zone_id: 'global'- name: 'gatewayapi'zone_id: 'global'- db: 'billing_resource'schema: 'public'table:- name: 'leasing'- name: 'leasing_contract'- name: 'leased'- name: 'leased_contract'- name: 'reserved_contract'- hostname: '172.31.1.244' # 数据库地址port: '5432' # 数据库端口username: 'yunify' # 数据库用户名password: '123456' # 数据库密码databases: # 需要同步的数据库列表- db: 'zone' # 数据库名称schema: 'public' # schema 名称(可以配置多个数据库)table:- name: 'instance'- name: 'nfv'- name: 'image'zone_id: 'pek3' # 同步数据时,添加 zone_id 字段,并且值为 'pek3a'zone: 'pek3' # 用于指定不同 zone 的 SQL 存放目录
生成数据同步 SQL并提交(自动查询业务数据库的 Schema 并生成SQL)
:::info
注意⚠️: 需要 Python 3.6 以上版本:::
- 切换路径
cd pitrix-data-warehouse
- 安装 pip 包
pip install -r requirements.txt
:::color4 安装 pip install psycopg2 报错如下
pg_config executable not found
解决方法:
sudo apt-get install libpq-dev
参考文档
pg_config executable not found
:::
- 生成数据同步 SQL(每个数据库的每个表一个SQL 文件)
python3 ods/ods_sync.py
生成的 SQL 保存在 ods 对应的 db_name 目录下,如果数据库为 zone,会根据配置的 zone 创建一个子目录,SQL 文件名称为 sync_table_name.sql

- 通过 submit_job.py 提交 job
/submit_job.py --helpusage: submit_job.py [-h] [-f FILE] [-d DIRECTORY] [-i INIT] [-j JAR] [-l LIBRARY]optional arguments:-h, --help show this help message and exit-f FILE, --file FILE Script file that should be executed.-d DIRECTORY, --directory DIRECTORYScript file that should be executed.-i INIT, --init INIT Script file that used to init the session context-j JAR, --jar JAR A JAR file to be imported into the session-l LIBRARY, --library LIBRARYA JAR file directory with which every new session is initialized
:::info
- -f:提交单个 SQL
- -d:提交目录下的所有 SQL
- -i:用于初始化的 SQL,初始化 SQL 文件中允许使用以下语句:
- DDL(CREATE/DROP/ALTER)
- USE CATALOG/DATABASE
- LOAD/UNLOAD MODULE
- SET command
- RESET command
common-init.sql 用来初始化公共参数,默认使用根目录下的 common-init.sql,如果 SQL 同目录下有 common-init.sql,则使用该文件
如果未指定该参数, 同时 SQL 同目录下有同名的后缀为 -init.sql 的文件,则会使用该文件进行初始化
- -j:指定依赖文件
- -l:指定依赖的目录
:::
- 提交单个 SQL
./submit_job.py -f ods/global/sync_wan_access.sql
- 提交目录下的所有 SQL
./submit_job.py -d ods/global
- 通过 Flink SQL Client 提交(一次只能提交一个 SQL,并且需要自己指定用于初始化的 SQL)
/usr/local/flink/bin/sql-client.sh -i common-init.sql -f ods/global/sync_wan_access.sql
生成资源维度表(DIM)
配置资源维度需要抽象的表
dim_resource:databases:- db: 'zone'table:- name: 'instance'- name: 'cluster'resource_name_field: 'name' # resource_name 映射的字段resource_type_field: 'app_id' # resource_type 映射的字段- name: 's2_server'resource_type_field: 'service_type' # resource_type 映射的字段- name: 'vpc_border'resource_name_field: 'border_name' # resource_name 映射的字段- name: 'routing_table'alias: 'rtable' # 如果配置了表的别名,那么 resource_id,resource_name 根据别名映射的字段- name: 'security_group'alias: 'group'- name: 'waf_rule_group'alias: 'rule_group'- name: 'key_pair'- name: 'nfv'resource_type_field: 'nfv_type'router_id_field: 'vpc_router_id' # router_id 映射的字段- db: 'global'table:- name: 'wan_access'resource_type_field: 'access_type' # resource_type 映射的字段resource_type_prefix: 'sdwan_' # resource_type 的前缀- name: 'wan_cpe_order'resource_type_value: 'sdwan_cpe_order' # resource_type 的值- name: 'wan_physical_connect'resource_type_value: 'sdwan_physical_connect' # resource_type 的值- name: 'app_saas_instance'alias: 'ins'resource_type_field: 'app_id' # resource_type 映射的字段user_id_field: 'user_id' # user_id 映射的字段- name: 'gatewayapi'alias: 'api'user_id_field: 'user_id'
生成数据同步 SQL并提交
:::info
注意⚠️: 需要 Python 3.6 以上版本:::
- 切换路径
cd pitrix-data-warehouse
- 生成数据同步 SQL(每个数据库的每个表一个SQL 文件)
python3 dim/dim_resource/dim_resource.py
生成的 SQL 保存在 dim 对应的 dim_resource 目录下,SQL 文件名称为 table_name.sql
- 通过 submit_job.py 提交 job
/submit_job.py --helpusage: submit_job.py [-h] [-f FILE] [-d DIRECTORY] [-i INIT] [-j JAR] [-l LIBRARY]optional arguments:-h, --help show this help message and exit-f FILE, --file FILE Script file that should be executed.-d DIRECTORY, --directory DIRECTORYScript file that should be executed.-i INIT, --init INIT Script file that used to init the session context-j JAR, --jar JAR A JAR file to be imported into the session-l LIBRARY, --library LIBRARYA JAR file directory with which every new session is initialized
:::info
- -f:提交单个 SQL
- -d:提交目录下的所有 SQL
- -i:用于初始化的 SQL,初始化 SQL 文件中允许使用以下语句:
- DDL(CREATE/DROP/ALTER)
- USE CATALOG/DATABASE
- LOAD/UNLOAD MODULE
- SET command
- RESET command
common-init.sql 用来初始化公共参数,默认使用根目录下的 common-init.sql,如果 SQL 同目录下有 common-init.sql,则使用该文件
如果未指定该参数, 同时 SQL 同目录下有同名的后缀为 -init.sql 的文件,则会使用该文件进行初始化
- -j:指定依赖文件
- -l:指定依赖的目录
:::
- 提交单个 SQL
./submit_job.py -f dim/dim_resource/instance.sql
- 提交目录下的所有 SQL
./submit_job.py -d dim/dim_resource
- 通过 Flink SQL Client 提交(一次只能提交一个 SQL,并且需要自己指定用于初始化的 SQL)
/usr/local/flink/bin/sql-client.sh -i common-init.sql -f dim/dim_resource/instance.sql
