首页 > 分布式 > Storm > Storm入门指南第三章 拓扑结构
2014
03-11

Storm入门指南第三章 拓扑结构

LocalCluster vs. StormSubmitter

到现在为止,你都在使用一个叫LocalCluster的工具在本地计算机上运行topology。在自己的计算机上运行Storm基础结构可以使你方便地运行和调试不同的topology。但是当你想将你的topology提交到一个运行的Storm集群上时该怎么做呢?Storm的一大特点就是可以很方便地将你的topology发送到一个真实的集群上运行,此时你需要将LocalCluster改成StormSubmitter并且实现其中的submitTopology()方法,该方法负责将topology发送到集群上。

代码改变如下:

//LocalCluster cluster = new LocalCluster();
//cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache",conf,
	builder.createTopology());
StormSubmitter.submitTopology("Count-Word-Topology-With-Refresh-Cache",conf,
	builder.createTopology());
//Thread.sleep(1000);
//cluster.shutdown();

提示:当使用StormSubmitter时,不可以在代码中控制集群,但是使用LocalCluster可以。

接着,将源码打包成一个jar文件,它将在你运行Storm客户端命令提交topology时被发送。因为使用的是Maven,所以你只需进入源文件夹下运行命令:mvn package

生成jar文件后,使用 storm jar 命令提交topology(如何安装Storm客户端参见附录A),这个命令的语法是:

storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3

本例中,在topologies的源项目文件夹运行:

storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt

通过这些命令,你就已经将topology提交到集群上了。要想停止或者杀死该topology,运行:

storm kill Count-word-Topology-With-Refresh-Cache

提示:Topology的名字必须唯一。

DRPC拓扑结构

有一种被称为DRPC(分布式远程过程调用,Distributed Remote Procedure Call)的特殊的topology类型,它使用Storm分布式的能力来执行远程过程调用 (RPC)。如图3-1所示,Storm提供了一些工具让你使用DRPC,第一个工具是DRPC服务器,它的作用是充当DRPC 客户端和topology之间的连接器 ,以及topology spouts的源。DRPC服务器接收一个函数和它的参数来执行,然后对于函数操作的每个数据片,服务器都分配一个在整个topology中使用的请求ID来识别RPC请求。当topology执行最后一个bolt时,它必须发送标识RPC的请求ID和结果,使得DRPC服务器可以返回结果至正确的客户端。

DRPCTopology图3-1.DRPC topology图解

提示:一个DRPC服务器可以执行很多函数,每个函数都被一个唯一的ID标识。

Storm提供的第二个工具是LinearDRPCTopologyBuilder——一个来帮助构建DRPC topologies的抽象。构建的topology创建DRPCSpouts(它连接DRPC服务器,并且发送数据到topology的剩余部分)、包装bolt(以便结果从最后一个bolt返回)。所有添加到LinearDRPCTopologyBuilder的bolt被顺序执行。

作为这种类型topology的一个例子,我们将创建一个累加数的程序。这个例子很简单,但是这种理念可以被扩展到执行复杂的分布式数学运算。

bolt有如下输出声明:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
	declarer.declare(new Fields("id","result"));
}

由于这是topology中的唯一bolt,所以必须发送标识RPC的请求ID和结果。

execute()方法负责执行累加操作:

public void execute(Tuple input) {
	String[] numbers= input.getString(1).split("\\+");
	Integer added =0;
	if(numbers.length<2){
		throw new InvalidParameterException("Shouldbe at least 2 numbers");
	}
	for(String num:numbers){
		added +=Integer.parseInt(num);
	}
	collector.emit(new Values(input.getValue(0),added));
}

包含做累加的bolt的topology的定义如下:

public static void main(String[] args) {
	LocalDRPC drpc =new LocalDRPC();

	LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
	builder.addBolt(new AdderBolt(),2);

	Config conf =new Config();
	conf.setDebug(true);

	LocalCluster cluster =new LocalCluster();
	cluster.submitTopology("drpc-adder-topology",conf,
		builder.createLocalTopology(drpc));
	String result =drpc.execute("add","1+-1");
	checkResult(result,0);
	result =drpc.execute("add","1+1+5+10");
	checkResult(result,17);

	cluster.shutdown();
	drpc.shutdown();
}

创建LocalDRPC对象来在本地运行DRPC。接着,创建topology builder来添加bolt到topology中。为了测试这个topology,在DRPC对象上使用execute方法。

提示:使用DRPCClient类连接到远程DRPC服务器。DRPC服务器提供一个可被多种语言使用的Trift API,并且在本地或者远程运行DRPC服务器使用同样的API。  为了将topology提交到Storm集群,使用builder对象中的createRemoteTopology()方法代替createLocalTopology()方法,该方法使用storm配置中的DRPC配置。


Storm入门指南第三章 拓扑结构》有 2 条评论

  1. do_sth_diff 说:

    楼主,直接分组那一块,你用context.getComponentTasks的输出做取余的分母?这段确定没写错?

留下一个回复

你的email不会被公开。