方案更新说明:
其实pig没有直接连接mysql的操作,借用sqoop这么不容易的话,可以用shell封装sqoop和pig,这样pig就可以直接使用sqoop导出的mysql数据了,这样会合理很多。
—————
查了很久,网上都没有用pig直接读写mysql的实现,只能借用sqoop来处理。因为打算直接用pig脚本实现工作,在pig脚本中调用sqoop是个麻烦的问题,需要记录下来。
工作内容:增量增加/修改ip库。
流程:在pig脚本中,提取hive表的日志ip,调用sqoop提取mysql的ip库,join起来,得到已有ip和新ip。调研python脚本,查询新ip的地区,写入mysql,更新旧ip的地区。
pig脚本:
set mapreduce.reduce.memory.mb 4096set default_parallel 20;-- 导入包,自己补全绝对路径REGISTER /.../jar/elephant-bird-core-4.10.jarREGISTER /.../jar/elephant-bird-hadoop-compat-4.10.jarREGISTER /.../jar/elephant-bird-pig-4.10.jarREGISTER /.../jar/json-simple-1.1.1.jarREGISTER /.../jar/elasticsearch-hadoop-pig-7.10.1.jarREGISTER /.../jar/commons-httpclient-3.1.jar%declare Today `date -d "1 day ago" +%Y-%m-%d`%declare TodayIndex `date +%Y%m%d`-- 从hive表提取昨天的日志,字段要对齐表字段顺序,写全Parquet = LOAD 'log_info' USING org.apache.hive.hcatalog.pig.HCatLoader AS (logtime:chararray,clientip:chararray,ip:chararray,authuser:chararray);Remote2LocalFull = FILTER Parquet BY ymdh<ToString(CurrentTime(),'yyyy-MM-dd') AND ymdh>'$Today' ;-- 计算ip唯一值,待查询GeoIP = FOREACH Remote2LocalFull GENERATE ip;DistinctGeoIP = DISTINCT GeoIP;rmf /user/hive/warehouse/abnormalLogin/aliyunGeoip/authguardIPWaitingForQuery;STORE DistinctGeoIP INTO '/user/hive/warehouse/abnormalLogin/aliyunGeoip/authguardIPWaitingForQuery' USING PigStorage;-- 导出mysql已查询ip数据到hdfs,格式为parquet-- 这里是调用sqoop的重点,复杂是因为 --是pig的注释符.......%declare SqoopDir /tmp/pig_sqoop_order;rmf /user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip;sh echo 'sqoop import [AA]username pqchen [AA]password 0000000 [AA]connect jdbc:mysql://真实ip:端口号/mysql database name?useSSL=false [AA]table aliyunip [AA]target-dir /user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip [AA]as-parquetfile -m 1' > '$SqoopDir';sh sed -i 's/\\\\\[A/-/g' $SqoopDir;sh sed -i 's/A\\\\\]/-/g' $SqoopDir;sh sed -i 's/\\\\\[B\\\\\]/\\\\\$CONDITIONS/g' $SqoopDir;sh /bin/sh $SqoopDir;-- pig加载导出的文件,字段写全,对齐mysqlIP = LOAD '/user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip' USING org.apache.parquet.pig.ParquetLoader AS (ip:chararray, isp:chararray,latitude:long, city:chararray, county:chararray, longitude:long, province:chararray, country:chararray, updatetime:long);a = FOREACH mysqlIP GENERATE ip;b = limit a 10;dump b;
