在实际使用Presto的过程中,经常会有以下的一些需求。
- 添加一个新的Catalog
- 对不再使用的Catalog希望把它删除
- 修改某个Catalog的参数
但在Presto中如果进行上述的修改,需要重启Presto服务才可以生效,这给集群维护带来额外的工作量之外,还给上层应用带来很不好的使用体验。
如果还不能在开发环境很好运行Presto的话,参考在windows的IDEA运行Presto。 观察PrestoServer的run方法,可以知道Presot分了多个模块,而且多个模块依赖airlift(Airlift framework for building REST services)的项目,倒不如说airlift是Presto的根基。说实在话,我对于Presto的理解可能还在管中窥豹的阶段,但是不影响我对它做一些手脚。
1、新增一个Hello world测试接口
在presto-main中com.facebook.presto.server中新增一个CatalogResource,作用和Spring类似吧。
/** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.facebook.presto.server;import javax.inject.Inject;import javax.ws.rs.GET;import javax.ws.rs.Path;import javax.ws.rs.core.Response;/*** @author Gin* @since 2018/8/15.*/@Path("/v1/catalog")public class CatalogResource{@Injectpublic CatalogResource(){}@GET@Path("test")public Response test(){return Response.ok("Hello world").build();}}
在ServerMainModule的setup()的方法中最后一行注册CatalogResource:
// CatalogsjaxrsBinder(binder).bind(CatalogResource.class);
启动server,访问http://localhost:8080/v1/catalog/test,如果出现HelloWorld的话,那么后面的步骤才行得通。
2、新增一个Add Connector的RESTful接口
新建CatalogInfo用于接收参数:
package com.facebook.presto.server;import com.fasterxml.jackson.annotation.JsonCreator;import com.fasterxml.jackson.annotation.JsonProperty;import java.util.Map;import static java.util.Objects.requireNonNull;/*** @author Gin* @since 2018/8/17.*/public class CatalogInfo{private final String catalogName;private final String connectorName;private final Map<String, String> properties;@JsonCreatorpublic CatalogInfo(@JsonProperty("catalogName") String catalogName,@JsonProperty("connectorName") String connectorName,@JsonProperty("properties")Map<String, String> properties){this.catalogName = requireNonNull(catalogName, "catalogName is null");this.connectorName = requireNonNull(connectorName, "connectorName is null");this.properties = requireNonNull(properties, "properties is null");}@JsonPropertypublic String getCatalogName() {return catalogName;}@JsonPropertypublic String getConnectorName() {return connectorName;}@JsonPropertypublic Map<String, String> getProperties() {return properties;}}
在CatalogResource中增加对应的接口,用到的服务用注入方法声明。
/** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.facebook.presto.server;import com.facebook.presto.connector.ConnectorId;import com.facebook.presto.connector.ConnectorManager;import com.facebook.presto.metadata.InternalNodeManager;import com.google.common.base.Joiner;import com.google.common.base.Splitter;import io.airlift.discovery.client.Announcer;import io.airlift.discovery.client.ServiceAnnouncement;import javax.inject.Inject;import javax.ws.rs.*;import javax.ws.rs.core.MediaType;import javax.ws.rs.core.Response;import java.util.LinkedHashMap;import java.util.LinkedHashSet;import java.util.Map;import java.util.Set;import static com.google.common.base.Strings.nullToEmpty;import static io.airlift.discovery.client.ServiceAnnouncement.serviceAnnouncement;import static java.util.Objects.requireNonNull;/*** @author Gin* @since 2018/8/15.*/@Path("/v1/catalog")public class CatalogResource{private final ConnectorManager connectorManager;private final Announcer announcer;@Injectpublic CatalogResource(ConnectorManager connectorManager,Announcer announcer){this.connectorManager = requireNonNull(connectorManager, "connectorManager is null");this.announcer = requireNonNull(announcer, "announcer is null");}@GET@Path("test")public Response test(){return Response.ok("Hello world").build();}@PUT@Consumes(MediaType.APPLICATION_JSON)@Produces(MediaType.APPLICATION_JSON)public Response createCatalog(CatalogInfo catalogInfo){requireNonNull(catalogInfo, "catalogInfo is null");ConnectorId connectorId = connectorManager.createConnection(catalogInfo.getCatalogName(),catalogInfo.getConnectorName(),catalogInfo.getProperties());updateConnectorIdAnnouncement(announcer, connectorId);return Response.status(Response.Status.OK).build();}private static void updateConnectorIdAnnouncement(Announcer announcer, ConnectorId connectorId){//// This code was copied from PrestoServer, and is a hack that should be removed when the connectorId property is removed//// get existing announcementServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements());// update connectorIds propertyMap<String, String> properties = new LinkedHashMap<>(announcement.getProperties());String property = nullToEmpty(properties.get("connectorIds"));Set<String> connectorIds = new LinkedHashSet<>(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(property));connectorIds.add(connectorId.toString());properties.put("connectorIds", Joiner.on(',').join(connectorIds));// update announcementannouncer.removeServiceAnnouncement(announcement.getId());announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build());announcer.forceAnnounce();}private static ServiceAnnouncement getPrestoAnnouncement(Set<ServiceAnnouncement> announcements){for (ServiceAnnouncement announcement : announcements) {if (announcement.getType().equals("presto")) {return announcement;}}throw new RuntimeException("Presto announcement not found: " + announcements);}}
3、测试RESTful接口
这步需要安装需要的插件,检查插件是否安装。使用postman类似的东西,发送application/json的PUT请求到http://localhost:8080/v1/catalog/,body为
{"catalogName": "test","connectorName": "mysql","properties": {"connection-url":"jdbc:mysql://localhost:3306","connection-user":"root","connection-password":"root"}}
我们可以看到控制台,重新输出了connector的信息:
2018-08-19T14:09:03.502+0800 INFO main com.facebook.presto.server.PrestoServer ======== SERVER STARTED ========2018-08-19T14:09:23.496+0800 INFO http-worker-133 Bootstrap PROPERTY DEFAULT RUNTIME DESCRIPTION2018-08-19T14:09:23.496+0800 INFO http-worker-133 Bootstrap connection-password [REDACTED] [REDACTED]2018-08-19T14:09:23.496+0800 INFO http-worker-133 Bootstrap connection-url null jdbc:mysql://localhost:33062018-08-19T14:09:23.496+0800 INFO http-worker-133 Bootstrap connection-user null root2018-08-19T14:09:23.496+0800 INFO http-worker-133 Bootstrap allow-drop-table false false Allow connector to drop tables2018-08-19T14:09:23.496+0800 INFO http-worker-133 Bootstrap mysql.auto-reconnect true true2018-08-19T14:09:23.496+0800 INFO http-worker-133 Bootstrap mysql.connection-timeout 10.00s 10.00s2018-08-19T14:09:23.496+0800 INFO http-worker-133 Bootstrap mysql.max-reconnects 3 32018-08-19T14:09:23.876+0800 INFO http-worker-133 io.airlift.bootstrap.LifeCycleManager Life cycle starting...2018-08-19T14:09:23.877+0800 INFO http-worker-133 io.airlift.bootstrap.LifeCycleManager Life cycle startup complete. System ready.
接下来要怎么利用,要看大家的脑洞了:)?
参考文献: Presto技术内幕
