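Storm
A free, open-source, distributed real-time computation system with high throughput: on the order of a million tuples processed per second per node.
Spout // literally a "faucet" (tap)
Clojure (the language Storm was originally written in)
JStorm (Alibaba's Java reimplementation of Storm)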
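Storm                                                   | Hadoop
Real-time stream processing                             | Batch processing
Stateless                                               | Stateful
Master-slave architecture coordinated through ZooKeeper | Master-slave architecture without ZooKeeper
Processes tens of thousands of messages per second      | HDFS and MapReduce jobs take minutes to hours
Never stops on its own                                  | Always finishes eventually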
Advantages of Storm
1. Cross-language
2. Scalable
3. Low latency (second- or minute-level)
4. Fault-tolerant
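Core concepts
1. Tuple: the main data structure, an ordered list of elements.
2. Stream: a sequence of tuples.
3. Spout: the source of a stream. A spout can read messages from a Kafka queue, or you can write a custom one.
4. Bolt: literally an "adapter"; the logical processing unit. A spout passes data to bolts, which compute on it and emit new data when done. IBolt is the interface bolts implement.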
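The four concepts can be illustrated without a cluster. Below is a minimal plain-Java sketch with no storm-core dependency. CoreConceptsSketch and its nested Tuple, Spout and Bolt types are hypothetical stand-ins that only mirror the roles of Storm's Tuple, IRichSpout and IRichBolt; they are not the real Storm APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of Storm's core concepts; NOT the Storm API.
final class CoreConceptsSketch {

    // Tuple: an ordered list of values, read by position.
    static final class Tuple {
        private final List<Object> values;
        Tuple(List<Object> values) { this.values = values; }
        Object get(int i) { return values.get(i); }
        List<Object> values() { return values; }
    }

    // Spout: the source of a stream; pushes tuples to whoever consumes them.
    interface Spout { void emitAll(Consumer<Tuple> out); }

    // Bolt: consumes one tuple at a time and may emit new tuples.
    interface Bolt { void execute(Tuple in, Consumer<Tuple> out); }

    // Connects one spout to one bolt and collects the bolt's output stream.
    static List<Tuple> run(Spout spout, Bolt bolt) {
        List<Tuple> stream = new ArrayList<>();
        spout.emitAll(t -> bolt.execute(t, stream::add));
        return stream;
    }

    public static void main(String[] args) {
        // Spout emitting (from, to, duration) tuples
        Spout spout = out -> {
            out.accept(new Tuple(Arrays.asList("1234123401", "1234123402", 30)));
            out.accept(new Tuple(Arrays.asList("1234123403", "1234123404", 12)));
        };
        // Bolt turning (from, to, duration) into ("from - to", duration)
        Bolt bolt = (in, out) ->
                out.accept(new Tuple(Arrays.asList(in.get(0) + " - " + in.get(1), in.get(2))));
        for (Tuple t : run(spout, bolt)) {
            System.out.println(t.values());
        }
    }
}
```

The bolt here mirrors what CallLogCreatorBolt does later in these notes: it reads fields by position and emits a new, smaller tuple.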
Topology
Spouts and bolts connected together form a topology: a directed graph whose vertices are the computations and whose edges are the data streams.
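A topology can be pictured as an adjacency list. The hypothetical TopologyGraphSketch class below is plain Java, not Storm's TopologyBuilder. It records each spout or bolt as a vertex and each subscription as a directed edge, and it uses the same three component names as the CallLog example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a topology as a directed graph.
// Vertices are components (spouts/bolts); edge a -> b means b consumes a's stream.
final class TopologyGraphSketch {
    private final Map<String, List<String>> edges = new LinkedHashMap<>();

    TopologyGraphSketch addComponent(String id) {
        edges.putIfAbsent(id, new ArrayList<>());
        return this;
    }

    // Records a data-flow edge source -> target (target subscribes to source).
    TopologyGraphSketch connect(String source, String target) {
        addComponent(source);
        addComponent(target);
        edges.get(source).add(target);
        return this;
    }

    List<String> downstreamOf(String id) {
        return edges.getOrDefault(id, Collections.emptyList());
    }

    int vertexCount() {
        return edges.size();
    }

    int edgeCount() {
        int n = 0;
        for (List<String> targets : edges.values()) {
            n += targets.size();
        }
        return n;
    }

    public static void main(String[] args) {
        // Same shape as the CallLog topology: spout -> creator-bolt -> counter-bolt
        TopologyGraphSketch top = new TopologyGraphSketch()
                .connect("spout", "creator-bolt")
                .connect("creator-bolt", "counter-bolt");
        System.out.println(top.vertexCount() + " vertices, " + top.edgeCount() + " edges"); // 3 vertices, 2 edges
        System.out.println("spout -> " + top.downstreamOf("spout")); // spout -> [creator-bolt]
    }
}
```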
task
Each running instance of a spout or bolt is a task.
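Storm architecture
1. Nimbus (literally "aura"): the master node and Storm's core component. It runs topologies: it analyzes a submitted topology, collects the tasks to run, distributes them to the supervisors, and monitors the topology. Nimbus itself is stateless and relies on ZooKeeper to track the topology's state.
2. Supervisor (literally "overseer"): each supervisor has n worker processes and delegates tasks to them. The workers in turn spawn executor threads that finally run the tasks. Storm uses an internal messaging system for communication between Nimbus and the supervisors. A supervisor receives instructions from Nimbus and manages its worker processes to carry out the assigned tasks.
3. Worker: a process that executes tasks of a specific topology. A worker does not run tasks itself; it spawns executors and lets them run the tasks.
4. Executor: essentially just a thread spawned by a worker process. All tasks an executor runs belong to the same spout or bolt.
5. Task: performs the actual processing. Each task is either a spout or a bolt.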
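The worker, executor and task layering can be sketched with ordinary Java threads. This illustrates the idea under stated assumptions and is not Storm's implementation:

- the JVM plays the worker;
- each executor is a Thread;
- the hypothetical runWorker method gives each executor a contiguous range of task ids, all belonging to one component.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of worker -> executor -> task (not Storm's implementation).
// The JVM running this plays the worker; each executor is a Thread; each executor
// runs its tasks one after another, and all tasks belong to the same component.
final class WorkerSketch {

    static List<String> runWorker(String component, int executors, int tasks) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int e = 0; e < executors; e++) {
            // Executor e gets the contiguous task range [start, end)
            final int executorId = e;
            final int start = e * tasks / executors;
            final int end = (e + 1) * tasks / executors;
            Thread t = new Thread(() -> {
                for (int task = start; task < end; task++) {
                    log.add(component + " task " + task + " on executor " + executorId);
                }
            });
            threads.add(t);
            t.start();
        }
        // The worker waits for its executor threads to finish
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : runWorker("split-bolt", 2, 4)) {
            System.out.println(line);
        }
    }
}
```

The lines printed by the two executors may interleave in any order. Within one executor, its tasks always run in sequence.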
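Storm workflow
1. Nimbus waits for a topology to be submitted.
2. Once a topology is submitted, Nimbus collects its tasks.
3. Nimbus distributes the tasks to all available supervisors.
4. Each supervisor periodically sends a heartbeat to Nimbus to show it is still alive.
5. If a supervisor dies, it stops sending heartbeats, and Nimbus reassigns its tasks to other supervisors.
6. If Nimbus dies, the supervisors keep executing their current tasks.
7. When its tasks finish, a supervisor waits for new tasks.
8. Meanwhile, a dead Nimbus can be restarted automatically by a process-monitoring tool.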
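Steps 4 and 5 amount to heartbeat-timeout failure detection. The sketch below is a hypothetical model, not Nimbus's code. The timeout value, the round-robin reassignment rule and the class HeartbeatSketch are all assumptions chosen to keep the example short and deterministic. Time is passed in as a plain number rather than read from a clock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of workflow steps 4-5 (not Nimbus's real code):
// supervisors report heartbeats; one silent for longer than `timeout`
// is considered dead and its tasks are moved round-robin to live ones.
// Time is a plain long (e.g. seconds) so the example is deterministic.
final class HeartbeatSketch {
    private final long timeout;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();
    private final Map<String, List<String>> assignment = new LinkedHashMap<>();

    HeartbeatSketch(long timeout) {
        this.timeout = timeout;
    }

    void heartbeat(String supervisor, long now) {
        lastHeartbeat.put(supervisor, now);
        assignment.putIfAbsent(supervisor, new ArrayList<>());
    }

    void assign(String supervisor, String task) {
        assignment.computeIfAbsent(supervisor, k -> new ArrayList<>()).add(task);
    }

    boolean isAlive(String supervisor, long now) {
        Long last = lastHeartbeat.get(supervisor);
        return last != null && now - last <= timeout;
    }

    void checkAndReassign(long now) {
        List<String> alive = new ArrayList<>();
        List<String> dead = new ArrayList<>();
        for (String s : assignment.keySet()) {
            if (isAlive(s, now)) {
                alive.add(s);
            } else {
                dead.add(s);
            }
        }
        if (alive.isEmpty()) {
            return; // nowhere to move the tasks; keep the current assignment
        }
        int next = 0;
        for (String s : dead) {
            for (String task : assignment.get(s)) {
                assignment.get(alive.get(next++ % alive.size())).add(task);
            }
            assignment.get(s).clear();
        }
    }

    List<String> tasksOf(String supervisor) {
        return assignment.getOrDefault(supervisor, new ArrayList<>());
    }

    public static void main(String[] args) {
        HeartbeatSketch nimbus = new HeartbeatSketch(10);
        for (String s : new String[] {"s202", "s203", "s204"}) {
            nimbus.heartbeat(s, 0);
        }
        nimbus.assign("s202", "t1");
        nimbus.assign("s203", "t2");
        nimbus.assign("s204", "t3");
        // s202 and s203 keep reporting; s204 goes silent after t = 0
        nimbus.heartbeat("s202", 12);
        nimbus.heartbeat("s203", 12);
        nimbus.checkAndReassign(15);
        System.out.println(nimbus.tasksOf("s202") + " " + nimbus.tasksOf("s203") + " " + nimbus.tasksOf("s204"));
        // [t1, t3] [t2] []
    }
}
```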
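Installing a Storm cluster
[s201 ~ s204]
1. Install the JDK.
2. Extract the Storm tarball.
3. Set the environment variables.
4. Verify the installation:
$>source /etc/profile
$>./storm version
5. Distribute the installation files to the other nodes.
6. Configure [storm/conf/storm.yaml]:
## Both the Nimbus and Supervisor daemons need a directory for state data (jars, configuration files, etc.)
storm.local.dir: "/home/centos/storm"
## Address list of the ZooKeeper cluster used by Storm
storm.zookeeper.servers:
    - "s202"
    - "s203"
storm.zookeeper.port: 2181
### nimbus.* configs are for the master
## Address(es) of the master node; more than one may be listed. Nimbus HA is supported starting with Storm 1.0.
nimbus.seeds: ["s201"]
### ui.* configs are for the master
ui.host: 0.0.0.0
ui.port: 8080
## Number of worker processes each Supervisor machine can run. Each worker needs its own port to receive messages,
## and this option lists the ports workers may use. If you define 5 ports here, Storm allocates at most 5 workers
## on that machine; if you define 3 ports, Storm runs at most 3 workers.
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
7. Distribute the configuration to the other nodes.
8. Start the daemons:
a) On s201, start nimbus:
$>storm nimbus &
b) On s202 ~ s204, start supervisor:
$>storm supervisor &
c) On s201, start the UI:
$>storm ui &
9. Check the cluster in the web UI: http://s201:8080/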
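The port list determines capacity. According to the comments in storm.yaml, each port in supervisor.slots.ports is one worker slot. Below is a minimal sketch of that arithmetic using the hypothetical helper totalSlots. It models the three supervisor hosts s202 ~ s204, each configured with the four ports above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Storm): each port listed in a supervisor's
// supervisor.slots.ports is one worker slot, so the cluster can run at most
// as many workers as there are ports in total.
final class SlotCapacitySketch {
    static int totalSlots(Map<String, List<Integer>> portsBySupervisor) {
        int total = 0;
        for (List<Integer> ports : portsBySupervisor.values()) {
            total += ports.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> ports = Arrays.asList(6700, 6701, 6702, 6703);
        Map<String, List<Integer>> cluster = new LinkedHashMap<>();
        // s202 ~ s204 run supervisors with the storm.yaml above
        cluster.put("s202", ports);
        cluster.put("s203", ports);
        cluster.put("s204", ports);
        System.out.println("max workers: " + totalSlots(cluster)); // max workers: 12
    }
}
```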
Programming example: CallLog statistics
0. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.it18zhang</groupId>
    <artifactId>StormDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>
</project>

1. Create the Spout
package com.it18zhang.stormdemo;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Spout: produces the data stream.
 */
public class CallLogSpout implements IRichSpout {
    // Spout output collector
    private SpoutOutputCollector collector;
    // Whether emission has finished
    private boolean completed = false;
    // Context
    private TopologyContext context;
    // Random number generator
    private Random randomGenerator = new Random();
    // Number of call records emitted so far
    private Integer idx = 0;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    public void close() {
    }

    public void activate() {
    }

    public void deactivate() {
    }

    /**
     * Next tuple(s): emits up to 100 call records per call, about 1000 in total.
     */
    public void nextTuple() {
        if (this.idx <= 1000) {
            List<String> mobileNumbers = new ArrayList<String>();
            mobileNumbers.add("1234123401");
            mobileNumbers.add("1234123402");
            mobileNumbers.add("1234123403");
            mobileNumbers.add("1234123404");
            Integer localIdx = 0;
            while (localIdx++ < 100 && this.idx++ < 1000) {
                // Pick the caller
                String caller = mobileNumbers.get(randomGenerator.nextInt(4));
                // Pick the callee
                String callee = mobileNumbers.get(randomGenerator.nextInt(4));
                // Compare string contents with equals(), not ==
                while (caller.equals(callee)) {
                    // Pick the callee again
                    callee = mobileNumbers.get(randomGenerator.nextInt(4));
                }
                // Simulated call duration
                Integer duration = randomGenerator.nextInt(60);
                // Emit the tuple
                this.collector.emit(new Values(caller, callee, duration));
            }
        }
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    /**
     * Declare the names of the output fields.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("from", "to", "duration"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

2. Create the CreatorBolt
package com.it18zhang.stormdemo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Bolt that builds the CallLog records.
 */
public class CallLogCreatorBolt implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // Process the call record
        String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);
        // Emit a new tuple
        collector.emit(new Values(from + " - " + to, duration));
        // An IRichBolt must acknowledge its input explicitly
        collector.ack(tuple);
    }

    public void cleanup() {
    }

    /**
     * Declare the names of the output fields.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("call", "duration"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

3. Create the CounterBolt
package com.it18zhang.stormdemo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * Bolt that counts call records.
 */
public class CallLogCounterBolt implements IRichBolt {
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String call = tuple.getString(0);
        Integer duration = tuple.getInteger(1);
        if (!counterMap.containsKey(call)) {
            counterMap.put(call, 1);
        } else {
            Integer c = counterMap.get(call) + 1;
            counterMap.put(call, c);
        }
        collector.ack(tuple);
    }

    // cleanup() is guaranteed only when a topology is killed in local mode;
    // on a real cluster workers may be killed without calling it.
    public void cleanup() {
        for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("call"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

4. App
package com.it18zhang.stormdemo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * App
 */
public class App {
    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        // Set the spout
        builder.setSpout("spout", new CallLogSpout());
        // Set the creator bolt
        builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
        // Set the counter bolt
        builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));
        Config conf = new Config();
        conf.setDebug(true);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
        Thread.sleep(10000);
        // Stop the cluster
        cluster.shutdown();
    }
}

5. Deploy the topology on a production Storm cluster
a) Change the submission method [App.java]
// Additional import needed for cluster submission
import org.apache.storm.StormSubmitter;

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    // Set the spout
    builder.setSpout("spout", new CallLogSpout());
    // Set the creator bolt
    builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
    // Set the counter bolt
    builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));
    Config conf = new Config();
    conf.setDebug(true);
    /*
     * Local-mode Storm
     */
    // LocalCluster cluster = new LocalCluster();
    // cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
    // Thread.sleep(10000);
    // Cluster mode: submit the topology to Nimbus
    StormSubmitter.submitTopology("mytop", conf, builder.createTopology());
}
b) Package the jar with Maven ...
c) Run the topology on CentOS
$>storm jar xxx.jar com.it18zhang.stormdemo.App
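App.java connects creator-bolt with shuffleGrouping and counter-bolt with fieldsGrouping on "call". The distinction matters because every CallLogCounterBolt task keeps its own counterMap. If tuples for the same call could land on different tasks, each task would hold only a partial count.

The plain-Java sketch below illustrates the routing rule. Two parts of it are assumptions:

- the hash-modulo formula in the hypothetical fieldsTask method stands in for whatever hashing Storm uses internally;
- the task count of 3 is arbitrary.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the two groupings used in App.java (not Storm's code).
// shuffleGrouping: any task of the target bolt may get the tuple (random here).
// fieldsGrouping: the task is derived from the grouping field's value, so equal
// values always reach the same task. The hash-mod rule is illustrative only.
final class GroupingSketch {
    static int shuffleTask(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    static int fieldsTask(Object fieldValue, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int counterTasks = 3; // assumed number of counter-bolt tasks
        Random random = new Random(42);
        List<String> calls = Arrays.asList(
                "1234123401 - 1234123402",
                "1234123403 - 1234123404",
                "1234123401 - 1234123402");
        for (String call : calls) {
            System.out.println(call
                    + " -> shuffle: task " + shuffleTask(random, counterTasks)
                    + ", fields: task " + fieldsTask(call, counterTasks));
        }
        // The "fields" index is identical for the first and third lines;
        // the "shuffle" index may differ between them.
    }
}
```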
Word count with Storm stream processing
1. WordCountSpout ...
2. SplitBolt
String line = ...
String[] str = line.split(" ");
for (String s : str) {
    // Emit each word itself, paired with a count of 1
    collector.emit(new Values(s, 1));
}
3. CounterBolt
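Together, SplitBolt and CounterBolt compute an ordinary word count. Here is a hypothetical single-threaded sketch with no Storm classes; countWords is an illustrative name. It shows the split step and the counting step:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of what SplitBolt + CounterBolt compute together
// (no Storm classes; countWords is a hypothetical name).
final class WordCountSketch {
    static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // SplitBolt's step: one (word, 1) pair per token
            for (String word : line.split(" ")) {
                if (word.isEmpty()) {
                    continue; // consecutive spaces produce empty tokens
                }
                // CounterBolt's step: add the 1 to the running count
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList("hello world", "hello storm")));
        // {hello=2, world=1, storm=1}
    }
}
```

In the topology, the counter step would typically sit behind a fieldsGrouping on the word field, so that each word is counted by exactly one task.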
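Setting a topology's parallelism and number of tasks
Configuring parallelism:
1. Set the number of workers:
conf.setNumWorkers(1);
2. Set the number of executors:
// Parallelism hint for the spout (number of executors)
builder.setSpout("wcspout", new WordCountSpout(), 3);
// Parallelism hint for the bolt
builder.setBolt("split-bolt", new SplitBolt(), 4).shuffleGrouping("wcspout");
3. Set the number of tasks. Each executor thread can run several tasks:
builder.setSpout("wcspout", new WordCountSpout(), 3).setNumTasks(2);
builder.setBolt("split-bolt", new SplitBolt(), 4).shuffleGrouping("wcspout").setNumTasks(3);
Storm's documentation states that #threads ≤ #tasks. In the two lines above, the task counts (2 and 3) are smaller than the parallelism hints (3 and 4), so some executors would have no task to run. Several tasks per thread only happen when the task count exceeds the hint, e.g. a hint of 2 with setNumTasks(4).
4. Parallelism = the total number of tasks across all components.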
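The arithmetic behind these settings is small enough to sketch. The helpers below are hypothetical. The even, round-robin spread of tasks over executors is an assumption for illustration; by default, Storm creates one task per executor. totalTasks follows the definition in step 4.

```java
import java.util.Arrays;

// Hypothetical arithmetic helpers for the parallelism settings above (not Storm code).
final class ParallelismSketch {
    // Spreads a component's tasks over its executors as evenly as possible.
    static int[] tasksPerExecutor(int executors, int tasks) {
        if (executors <= 0) {
            throw new IllegalArgumentException("executors must be positive");
        }
        int[] counts = new int[executors];
        for (int i = 0; i < tasks; i++) {
            counts[i % executors]++;
        }
        return counts;
    }

    // Step 4 of the notes: overall parallelism counted as the sum of all tasks.
    static int totalTasks(int... tasksPerComponent) {
        int sum = 0;
        for (int t : tasksPerComponent) {
            sum += t;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Parallelism hint 2 with setNumTasks(4): two threads, two tasks each
        System.out.println(Arrays.toString(tasksPerExecutor(2, 4))); // [2, 2]
        // wcspout with setNumTasks(2) plus split-bolt with setNumTasks(3)
        System.out.println(totalTasks(2, 3)); // 5
    }
}
```

With fewer tasks than executors, for example tasksPerExecutor(3, 2), one entry comes out as 0. That is the idle-executor case described above.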
