首页 > 分布式 > Storm > Storm入门指南第三章 拓扑结构
2014
03-11

Storm入门指南第三章 拓扑结构

在本章你将会看到如何在一个Storm拓扑的不同组件间传递元组,以及如何将一个topology部署到一个运行的Storm集群上。

流分组

在设计一个topology时,一件最重要的事就是定义数据在组件之间怎样交换(流怎样被bolt消费)。流分组(Stream Grouping)指定了每个bolt消费哪些流,以及这些流如何被消费。

提示:一个节点可以发送不止一条数据流,流分组允许我们选择接收哪些流。

正如第二章中见到的,流分组在topology被定义的时候就已经被设定了:

builder.setBolt("word-normalizer",new WordNormalizer())
	.shuffleGrouping("word-reader");

在上面的代码块中,在topology builder上设置了一个bolt,然后设置该bolt的源使用随机分组(shuffleGrouping)。通常情况下,一个流分组携带源组件的Id作为参数,并且还有其他可选参数,这取决于流分组的种类。

提示:每个InputDeclarer可以有不止一个源,并且每个源可以用不同的流分组来分组。

随机分组

随机分组(Shuffle Grouping)是最常用的分组方式,该分组方式携带一个参数(源组件Id),源组件发送元组到一个随机选择的bolt并确保每个消费者(即bolt)会收到相等数量的元组。

随机分组对于做原子操作(例如数学运算)是很有用的。然而,如果操作不能被随机分配,就应该考虑使用其他分组,例如在第二章为单词计数的例子中。

字段分组

字段分组(Fields Grouping)允许你基于tuple中的一个或多个字段控制元组如何被发送到bolt,该分组方式确保了对于一个组合字段所确定的的值集合总是会被发送到相同的bolt。回到单词计数的例子,如果你根据“word”字段将流分组,则WordNormalizer bolt总是会将包含给定的单词的元组一起发送到相同的WordCounter bolt实例中。

builder.setBolt("word-counter",new WordCounter(),2)
	.fieldsGrouping("word-normalizer",new Fields("word"));

提示:字段分组中设置的所有字段在源组件的输出字段声明中也必须存在(译者注:在源组件的declareOutputFields()方法中声明)。

全部分组

全部分组(All Grouping)会向接收bolt的所有实例发送一份每个元组的副本,这种分组方式被用于向bolts发送信号。例如,如果你需要刷新缓存,你可以向所有bolt发送一个刷新缓存信号。在第二章单词计数的例子中,可以使用所有分组方式增加清空计数器缓存的功能(WordCounter中相应代码如下)

public void execute(Tuple input) {
	String str =null;
	try{
		if(input.getSourceStreamId().equals("signals")){
			str =input.getStringByField("action");
			if("refreshCache".equals(str))
				counters.clear();
		}
	}catch(IllegalArgumentException e) {
	//Do nothing
	}
	...
}

上面的代码中增加了一个if检查源流的名字 (sourceStreamId) 是否为”signal”。Storm中可以声明命名的流 (named streams) ,如果你不发送元组到一个命名的流,则流的默认名字为“default”。这是一个非常好的方式来确定元组的源,正如这个例子中我们需要确定信号一样。

在topology定义中,向word-counter bolt 增加第二个流分组方式,以便来自signals-spout 流中的每个tuple能发送到bolt的所有实例中。

builder.setBolt("word-counter",new WordCounter(),2)
	.fieldsGrouping("word-normalizer",new Fields("word"))
	.allGrouping("signals-spout","signals")

关于signals-spout的实现可以在git库中找到。

自定义分组

通过实现CustomStreamGrouping接口,你也可以创建自定义流分组,这让你有权决定每个元组将被哪个(些)bolt接收。

下面我们修改单词计数的例子,将元组分组以便相同字母开头的单词能被相同的bolt接收。

public class ModuleGrouping implements CustomStreamGrouping,Serializable{

	int numTasks=0;

	@Override
	public List chooseTasks(List</pre>
上面是一个CustomStreamGrouping的简单实现,在这里我们使用任务的数量 (numTasks) 来对单词的第一个字符的整型值取模,由此选择哪个bolt将接收这个元组。
<p align="left">要在单词计数的例子中使用这种分组,按照下列方式修改word-counter分组(<strong>译者注:</strong>此处有修改):</p>

1
builder.setBolt("word-counter",new WordCounter())
	.customGrouping("word-normalizer",new ModuleGrouping());

直接分组

这是一个特殊的分组方式,由源组件决定哪个组件将接收元组。同前面的例子类似,源组件将基于单词中的第一个字母决定哪个bolt接收这个tuple,为了使用直接分组,需要在WordNormalizer bolt中使用emitDirect()方法代替emit方法。

public void execute(Tuple input) {
	...
	for(String word:words){
		if(!word.isEmpty()){
			...
			collector.emitDirect(getWordCountIndex(word),new Values(word));
		}
	}
	// Acknowledge the tuple
	collector.ack(input);
}

public Integer getWordCountIndex(String word) {
	word =word.trim().toUpperCase();
	if(word.isEmpty())
		return 0;
	else
		return word.charAt(0) % numCounterTasks;
}

在prepare()方法中算出目标任务的数目:

public void prepare(Map stormConf,TopologyContext context,
	OutputCollector collector) {
	this.collector=collector;
	this.numCounterTasks=context.getComponentTasks("word-counter");
}

在topology的定义中,指定流将被直接分组:

builder.setBolt("word-counter",new WordCounter(),2)
	.directGrouping("word-normalizer");

全局分组

全局分组(Global Grouping)将源组件的所有实例产生的元组发送到一个目标组件的实例()中(具体地说,是bolt中Id最小的那个任务)。

无分组

在Storm0.7.1时,使用这种分组和使用随机分组一样,即不关注流怎样被分组。


Storm入门指南第三章 拓扑结构》有 2 条评论

  1. do_sth_diff 说:

    楼主,直接分组那一块,你用context.getComponentTasks的输出做取余的分母?这段确定没写错?

留下一个回复

你的email不会被公开。