1. Start Application
1.1 Request URL
http://{ip}:{port}/v1/jars/{project_id}/run
1.2 Request Method
POST
1.3 Request Headers
Content-Type: application/json;charset=UTF-8
1.4 Request Body Example
{"entryClass" : "com.zxelec.fsdap.compute.etl.Application","programArgsList": ["--appName","customer-app","--parallelism","1","--kafkaConsumer","{\"bootstrap\":\"192.168.1.249:9092\",\"topic\":\"imsi\",\"group\":\"f11\"}","--duplicate","{\"isDuplicate\":\"true\",\"keys\":\"field1,field2\",\"duplicateTime\":60}","--sink","[{\"bootstrap\":\"192.168.1.249:9092\",\"topic\":\"ff\",\"conditions\":[{\"key\":\"Method\",\"operator\":\"=\",\"value\":\"POST\"}]}]"]}
1.5 Request Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| entryClass | string | Y | | Entry class of the job |
| programArgsList | array | Y | | Job arguments; see 1.5.1 |
1.5.1 programArgsList Parameters
This parameter is an array in which every two consecutive items form one key/value pair: the first item is the option name (key) and the second is its value. A construction sketch follows the table below.
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| --appName | string | N | ETLApp | Job name |
| --parallelism | string | N | 1 | Job parallelism |
| --kafkaConsumer | object | Y | | Source Kafka configuration. bootstrap (required): Kafka brokers; topic (required): Kafka topic; group (required): Kafka consumer group |
| --duplicate | object | Y | | Deduplication logic. isDuplicate (required): whether to deduplicate, true/false; keys (optional): field(s) forming the dedup key, comma-separated for multiple fields; duplicateTime (optional): dedup window in seconds (number), default 60 |
| --sink | array | Y | | Filtering and output logic; each JSON object in the array is one filter/output entry. bootstrap (required): Kafka brokers; topic (required): Kafka topic; conditions (optional): filter rules, an array in which each item is one condition: key is the field name, operator is the comparison operator, value is the comparison value |
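As an illustration of the pair layout described above, the sketch below (Python; the build_program_args helper is hypothetical and not part of this API) flattens a structured configuration into the programArgsList array, JSON-encoding nested objects so they match the escaped strings in the 1.4 example:
```python
import json

def build_program_args(options: dict) -> list:
    """Flatten {name: value} options into the programArgsList pair layout:
    each option contributes two items, "--<name>" followed by its value.
    Non-string values (objects/arrays) are JSON-encoded, as in the 1.4 example."""
    args = []
    for name, value in options.items():
        args.append("--" + name)
        args.append(value if isinstance(value, str)
                    else json.dumps(value, separators=(",", ":")))
    return args

program_args = build_program_args({
    "appName": "customer-app",
    "parallelism": "1",
    "kafkaConsumer": {"bootstrap": "192.168.1.249:9092", "topic": "imsi", "group": "f11"},
    "duplicate": {"isDuplicate": "true", "keys": "field1,field2", "duplicateTime": 60},
    "sink": [{"bootstrap": "192.168.1.249:9092", "topic": "ff",
              "conditions": [{"key": "Method", "operator": "=", "value": "POST"}]}],
})
```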
1.6 Response Example
{"jobid": "6d42ddb9c3597aa88357ef56edba7a95"}
2. Cancel Application
2.1 Request URL
http://{ip}:{port}/jobs/{jobid}
2.2 Request Method
PATCH
2.3 Response Example
{}
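A minimal cancellation sketch under the same assumptions (Python requests; host, port, and jobid are placeholders):
```python
import requests

BASE = "http://192.168.1.249:8081"               # placeholder for {ip}:{port}
job_id = "6d42ddb9c3597aa88357ef56edba7a95"      # jobid returned by the start call

# PATCH /jobs/{jobid} cancels the running job; an empty JSON object indicates success.
resp = requests.patch(f"{BASE}/jobs/{job_id}", timeout=30)
resp.raise_for_status()
print(resp.json())   # {}
```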
3. View Application Details
3.1 Request URL
http://{ip}:{port}/jobs/{jobid}
3.2 Request Method
GET
3.3 Response Example
{"jid": "107abed300910f0450ce4b09973592a4","name": "ETLApp","isStoppable": false,"state": "RUNNING","start-time": 1652248802057,"end-time": -1,"duration": 1156064,"maxParallelism": -1,"now": 1652249958121,"timestamps": {"SUSPENDED": 0,"CREATED": 1652248802120,"RESTARTING": 0,"FAILED": 0,"RUNNING": 1652248802171,"FAILING": 0,"INITIALIZING": 1652248802057,"RECONCILING": 0,"CANCELED": 0,"FINISHED": 0,"CANCELLING": 0},"vertices": [{"id": "cbc357ccb763df2852fee8c4fc7d55f2","name": "Source: Custom Source -> Timestamps/Watermarks","maxParallelism": 128,"parallelism": 1,"status": "RUNNING","start-time": 1652248802262,"end-time": -1,"duration": 1155859,"tasks": {"CANCELED": 0,"RUNNING": 1,"SCHEDULED": 0,"CREATED": 0,"RECONCILING": 0,"INITIALIZING": 0,"DEPLOYING": 0,"FINISHED": 0,"CANCELING": 0,"FAILED": 0},"metrics": {"read-bytes": 0,"read-bytes-complete": true,"write-bytes": 2088584254,"write-bytes-complete": true,"read-records": 0,"read-records-complete": true,"write-records": 2048046,"write-records-complete": true}},{"id": "90bea66de1c231edf33913ecd54406c1","name": "KeyedProcess -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_4, type=*org.apache.flink.api.java.tuple.Tuple3<`f0` STRING, `f1` STRING, `f2` BIGINT NOT NULL>* NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS kafka_key, f1 AS kafka_value, f2 AS kafka_timestamp], where=[(get_json_value(f1, _UTF-16LE'Method') = _UTF-16LE'POST':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]) -> SinkConversionToTuple3 -> Sink: Unnamed","maxParallelism": 128,"parallelism": 1,"status": "RUNNING","start-time": 1652248802265,"end-time": -1,"duration": 1155856,"tasks": {"CANCELED": 0,"RUNNING": 1,"SCHEDULED": 0,"CREATED": 0,"RECONCILING": 0,"INITIALIZING": 0,"DEPLOYING": 0,"FINISHED": 0,"CANCELING": 0,"FAILED": 0},"metrics": {"read-bytes": 2088611006,"read-bytes-complete": true,"write-bytes": 0,"write-bytes-complete": true,"read-records": 2048046,"read-records-complete": true,"write-records": 0,"write-records-complete": true}}],"status-counts": {"CANCELED": 0,"RUNNING": 2,"SCHEDULED": 0,"CREATED": 0,"RECONCILING": 0,"INITIALIZING": 0,"DEPLOYING": 0,"FINISHED": 0,"CANCELING": 0,"FAILED": 0},"plan": {"jid": "107abed300910f0450ce4b09973592a4","name": "ETLApp","nodes": [{"id": "90bea66de1c231edf33913ecd54406c1","parallelism": 1,"operator": "","operator_strategy": "","description": "KeyedProcess -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_4, type=*org.apache.flink.api.java.tuple.Tuple3<`f0` STRING, `f1` STRING, `f2` BIGINT NOT NULL>* NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS kafka_key, f1 AS kafka_value, f2 AS kafka_timestamp], where=[(get_json_value(f1, _UTF-16LE'Method') = _UTF-16LE'POST':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> SinkConversionToTuple3 -> Sink: Unnamed","inputs": [{"num": 0,"id": "cbc357ccb763df2852fee8c4fc7d55f2","ship_strategy": "HASH","exchange": "pipelined_bounded"}],"optimizer_properties": {}},{"id": "cbc357ccb763df2852fee8c4fc7d55f2","parallelism": 1,"operator": "","operator_strategy": "","description": "Source: Custom Source -> Timestamps/Watermarks","optimizer_properties": {}}]}}
