首页 > 分布式 > Storm > Storm高级原语(三) — Trident topology
2014
04-27

Storm高级原语(三) — Trident topology

本文翻译自Apache Storm主页上的Trident topology 介绍一文,同时参考derekjiang博客。

Trident是在storm基础上,一个以实时计算为目标的高度抽象。 它在提供处理大吞吐量数据能力(每秒百万次消息)的同时,也提供了低延时分布式查询和有状态流式处理的能力。 如果你对Pig和Cascading这种高级批处理工具很了解的话,那么应该很容易理解Trident,因为他们之间很多的概念和思想都是类似的。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理。Trident也提供一致性(consistent)、有且仅有一次(exactly-once)等语义,这使得我们在使用trident toplogy时变得容易。

举例说明

让我们一起来看一个Trident的例子。在这个例子中,我们主要做了两件事情:

1、从一个流式输入中读取语句并计算每个单词的个数

2、提供查询给定单词列表中每个单词当前总数的功能

因为这只是一个例子,我们会从如下这样一个无限的输入流中读取语句作为输入:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"));
spout.setCycle(true);

这个spout会循环输出列出的那些语句到sentence stream当中,下面的代码会以这个stream作为输入并计算每个单词的个数:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
       .parallelismHint(6);

在这段代码中,我们首先创建了一个TridentTopology对象,该对象提供了相应的接口去构造Trident计算过程。①、TridentTopology类中的newStream方法从输入源(input source)中读取数据,并创建一个新的数据流。在这个例子中,我们使用了上面定义的FixedBatchSpout对象作为输入源。输入数据源同样也可以是如Kestrel或者Kafka这样的队列服务。Trident会在Zookeeper中保存一小部分状态信息来追踪数据的处理情况,而在代码中我们指定的字符串“spout1”就是Zookeeper中用来存储状态信息的Znode节点。

Trident在处理输入stream的时候会把输入转换成batch(包含若干个tuple)来处理。比如说,输入的sentence stream可能会被拆分成如下的batch:

split sentenseStream to batch

一般来说,这些小的batch中的tuple可能会在数千或者数百万这样的数量级,这完全取决于你的输入的吞吐量。

Trident提供了一系列非常成熟的批处理API来处理这些小batch。这些API和你在Pig或者Cascading中看到的非常类似, 你可以做groupby、join、 aggregation、执行 function和filter等等。当然,独立的处理每个小的batch并不是非常有趣的事情,所以Trident提供了功能来实现batch之间的聚合并可以将这些聚合的结果存储到内存、Memcached、Cassandra或者是一些其他的存储中。同时,Trident还提供了非常好的功能来查询实时状态,这些实时状态可以被Trident更新,同时它也可以是一个独立的状态源。

回到我们的这个例子中来,spout输出了一个只有单一字段“sentence”的数据流。②、在下一行,topology使用了Split函数来拆分stream中的每一个tuple,Split函数读取输入流中的“sentence”字段并将其拆分成若干个word tuple。每一个sentence tuple可能会被转换成多个word tuple,比如说”the cow jumped over the moon” 会被转换成6个 “word” tuples。下面是Split的定义:

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));
       }
   }
}

如你所见,真的很简单。它只是简单的根据空格拆分sentence,并将拆分出的每个单词作为一个tuple输出。

topology的其他部分计算单词的个数并将计算结果保存到了持久存储中。③、首先,word stream被根据“word”字段进行group操作,④、然后每一个group使用Count聚合器进行持久化聚合。persistentAggregate方法会帮助你把一个状态源聚合的结果存储或者更新到存储当中。在这个例子中,单词的数量被保持在内存中,不过我们可以很简单的把这些数据保存到其他的存储当中,如 Memcached、 Cassandra等。如果我们要把结果存储到Memcached中,只是简单的使用下面这句话替换掉persistentAggregate就可以,这当中的”serverLocations”是Memcached cluster的主机和端口号列表:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))

persistentAggregate存储的数据就是所有batch聚合的结果。

Trident非常酷的一点就是它提供完全容错的(fully fault-tolerant)、处理一次且仅一次(exactly-once)的语义。这就让你可以很轻松的使用Trident来进行实时数据处理。Trident会把状态以某种形式保持起来,当有错误发生时,它会根据需要来恢复这些状态。

④续、persistentAggregate方法会把数据流转换成一个TridentState对象。在这个例子当中,TridentState对象代表了所有的单词的数量。我们会使用这个TridentState对象来实现在计算过程中的分布式查询部分。

上面的是topology中的第一部分,topology的第二部分实现了一个低延时的单词数量的分布式查询。这个查询以一个用空格分割的单词列表为输入,并返回这些单词的总个数。这些查询就像普通的RPC调用那样被执行的,要说不同的话,那就是他们在后台是并行执行的。下面是执行查询的一个例子:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"

如你所见,除了在storm cluster上并行执行之外,这个查询看上去就是一个普通的RPC调用。这样的简单查询的延时通常在10毫秒左右。当然,更复杂的DRPC调用可能会占用更长的时间,尽管延时很大程度上是取决于你给计算分配了多少资源。

Topology中的分布式查询部分实现如下所示:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

我们仍然是使用TridentTopology对象来创建DRPC stream,并且我们将这个函数命名为“words”。这个函数名会作为第一个参数在使用DRPC Client来执行查询的时候用到。

每个DRPC请求会被当做只有一个tuple的batch来处理。在处理的过程中,以这个输入的单一tuple来表示这个请求。这个tuple包含了一个叫做“args”的字段,在这个字段中保存了客户端提供的查询参数。在这个例子中,这个参数是一个以空格分割的单词列表。

首先,我们使用Split函数把传入的请求参数拆分成独立的单词。然后对“word”流进行group by操作,之后就可以使用stateQuery来在上面代码中创建的TridentState对象上进行查询。stateQuery接受一个state源(在这个例子中,就是我们的topolgoy所计算的单词的个数)以及一个用于查询的函数作为输入。在这个例子中,我们使用了MapGet函数来获取每个单词的出现个数。由于DRPC stream是使用跟TridentState完全同样的group方式(按照“word”字段进行groupby),每个单词的查询会被路由到TridentState对象管理和更新这个单词的分区去执行。

接下来,我们用FilterNull这个过滤器把从未出现过的单词给过滤掉(说明没有查询该单词),并使用Sum这个聚合器将这些count累加起来得到结果。最终,Trident会自动把这个结果发送回等待的客户端。

Trident在如何最大程度地保证执行topogloy性能方面是非常智能的。在topology中会自动的发生两件非常有意思的事情:

1、读取和更新状态的操作 (比如说 stateQuery和persistentAggregate ) 会自动地批量处理。 如果当前处理的batch中有20次更新需要被同步到存储中,Trident会自动的把这些操作汇总到一起,只做一次读一次写,而不是进行20次读20次写的操作。因此你可以在很方便的执行计算的同时,保证了非常好的性能。

2、Trident的聚合器已经是被优化的非常好了的。Trident并不是简单的把一个group中所有的tuples都发送到同一个机器上面进行聚合,而是在发送之前已经进行过一次部分的聚合。打个比方,Count聚合器会先在每个partition上面进行count,然后把每个分片count汇总到一起就得到了最终的count。这个技术其实就跟MapReduce里面的combiner是一个思想。

让我们再来看一下Trident的另外一个例子。

Reach

这个例子是一个纯粹的DRPC topology,这个topology会计算一个给定URL的reach值,reach值是该URL对应页面的推文能够送达(Reach)的用户数量,那么我们就把这个数量叫做这个URL的reach。要计算reach,你需要获取转发过这个推文的所有人,然后找到所有该转发者的粉丝,并将这些粉丝去重,最后就得到了去重后的用户的数量。如果把计算reach的整个过程都放在一台机器上面,就太困难了,因为这会需要数千次数据库调用以及千万级别数量的tuple。如果使用Storm和Trident,你就可以把这些计算步骤在整个cluster中并行进行(具体哪些步骤,可以参考DRPC介绍一文,该文有介绍过Reach值的计算方法)。

这个topology会读取两个state源:一个将该URL映射到所有转发该推文的用户列表,还有一个将用户映射到该用户的粉丝列表。topology的定义如下:

TridentState urlToTweeters =
       topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers =
       topology.newStaticState(getTweeterToFollowersState());

topology.newDRPCStream("reach")
       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
       .shuffle()
       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
       .parallelismHint(200)
       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
       .groupBy(new Fields("follower"))
       .aggregate(new One(), new Fields("one"))
       .parallelismHint(20)
       .aggregate(new Count(), new Fields("reach"));

这个topology使用newStaticState方法创建了TridentState对象来代表一个外部数据库。使用这个TridentState对象,我们就可以在这个topology上面进行动态查询了。和所有的state源一样,在这些数据库上面的查找会自动被批量执行,从而最大程度的提升效率。

这个topology的定义是非常简单的 – 它仅是一个批处理的任务。

首先,查询urlToTweeters数据库来得到转发过这个URL的用户列表。这个查询会返回一个tweeter列表,因此我们使用ExpandList函数来把其中的每一个tweeter转换成一个tuple。

接下来,我们来获取每个tweeter的follower。我们使用shuffle来把要处理的tweeter均匀地分配到toplology运行的每一个worker中并发去处理。然后查询tweetersToFollowers数据库从而的到每个转发者的粉丝。你可以看到我们为topology的这部分分配了很大的并行度,这是因为这部分是整个topology中最耗资源的计算部分。

然后,我们对这些粉丝进行去重和计数。这分为如下两步:①、通过“follower”字段对流进行分组,并对每个组执行“One”聚合器。“One”聚合器对每个分组简单的发送一个tuple,该tuple仅包含一个数字“1”。②、将这些“1”加到一起,得到去重后的粉丝集中的粉丝数。“One”聚合器的定义如下:

public class One implements CombinerAggregator<Integer> {
   public Integer init(TridentTuple tuple) {
       return 1;
   }

   public Integer combine(Integer val1, Integer val2) {
       return 1;
   }

   public Integer zero() {
       return 1;
   }
}

这是一个“汇总聚合器(combiner aggregator)”, 它会在传送结果到其他worker汇总之前进行局部汇总,从而使性能最优。同样,Sum被定义成一个汇总聚合器,在topology的最后部分进行全局求和是高效的。

接下来让我们一起来看看Trident的一些细节。

Fields and tuples

Trident的数据模型是TridentTuple。在一个topology中,tuple是在一系列的处理操作(operation)中增量生成的。operation一般以一组字段作为输入并输出一组功能字段(function fileds)。Operation的输入字段经常是输入tuple的一个子集,而功能字段则是operation的输出。

看下面这个例子。假定你有一个叫做“stream”的stream,它包含了“x”,”y”和”z”三个字段。为了运行一个读取“y”作为输入的过滤器MyFilter,你可以这样写:

stream.each(new Fields("y"), new MyFilter())

MyFilter的实现如下:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
       return tuple.getInteger(0) < 10;
   }
}

这会保留所有“y”字段小于10的tuples。传给MyFilter的TridentTuple参将只包含字段“y”。这里需要注意的是,当选择输入字段时,Trident只发送tuple的一个子集,这个操作是非常高效的。

让我们一起看一下“功能字段(function field)”是怎样工作的。假定你有如下这个函数:

public class AddAndMultiply extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       int i1 = tuple.getInteger(0);
       int i2 = tuple.getInteger(1);
       collector.emit(new Values(i1 + i2, i1 * i2));
   }
}

这个函数接收两个数作为输入并输出两个新的值:这两个数的和与乘积。假定你有一个stream,其中包含“x”,”y”和”z”三个字段。你可以这样使用这个函数:

stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));

输出的功能字段被添加到输入tuple后面,因此这个时候,每个tuple中将会有5个字段”x”, “y”, “z”, “added”, 和 “multiplied”.。”added” 和”multiplied”对应于AddAndMultiply输出的第一和第二个字段。

另外,我们可以使用聚合器来用输出字段来替换输入tuple。如果你有一个stream包含字段”val1″和”val2″,你可以这样做:

stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

输出流将会仅包含一个tuple,该tuple有一个“sum”字段,这个sum字段就是一批tuple中“val2”字段的累积和。但是若对groupby之后的流进行该聚合操作,则输出tuple中包含分组字段和聚合器输出的字段,例如:

stream.groupBy(new Fields("val1"))
     .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

这个例子中的输出包含“val1”字段和“sum”字段。

State

在实时计算领域的一个主要问题就是怎么样来管理状态并能轻松应对错误和重试。消除错误的是不可能的,当一个节点死掉,或者一些其他的问题出现时,这些batch需要被重新处理。问题是-你怎样做状态更新来保证每一个消息被处理且只被处理一次?

这是一个很棘手的问题,我们可以用接下来的例子进一步说明。假定你在做一个你的stream的计数聚合,并且你想要存储运行时的count到一个数据库中去。如果你只是存储这个count到数据库中,并且想要进行一次更新,我们是没有办法知道同样的状态是不是以前已经被update过了的。这次更新可能在之前就尝试过,并且已经成功的更新到了数据库中,不过在后续的步骤中失败了。还有可能是在上次更新数据库的过程中失败的,这些你都不知道。

Trident通过做下面两件事情来解决这个问题:

1、每一个batch被赋予一个唯一标识id“transaction id”。如果一个batch被重试,它将会拥有和之前同样的transaction id

2、状态更新是按照batch的顺序进行的(强顺序)。也就是说,batch 3的状态更新必须等到batch 2的状态更新成功之后才可以进行。

有了这2个原则,你就可以达到有且只有一次更新的目标。此时,不是只将count存到数据库中,而是将transaction id和count作为原子值存到数据库中。当更新一个count的时候,需要比较数据库中transaction id和当前batch的transaction id。如果相同,就跳过这次更新。如果不同,就更新这个count。

当然,你不需要在topology中手动处理这些逻辑,这些逻辑已经被封装在State的抽象中并自动进行。你的State object也不需要自己去实现transaction id的跟踪操作。如果你想了解更多的关于如何实现一个State以及在容错过程中的一些取舍问题,可以参照这篇文章。

一个State可以采用任何策略来存储状态,它可以存储到一个外部的数据库,也可以在内存中保持状态并备份到HDFS中。State并不需要永久的保持状态。比如说,你有一个内存版的State实现,它保存最近X个小时的数据并丢弃老的数据。可以把 Memcached integration 作为例子来看看State的实现。

Trident topology的执行

Trident的topology会被编译成尽可能高效的Storm topology。只有在需要对数据进行重新分配(repartition)的时候(如groupby或者shuffle)才会把tuple通过network发送出去,如果你有一个trident topology如下:

trident topology example

它将会被编译成如下的storm topology:

trident topology to storm example

小结

Trident使得实时计算更加优雅。你已经看到了如何使用Trident的API来完成大吞吐量的流式计算、状态维护、低延时查询等等功能。Trident让你在获取最大性能的同时,以更自然的一种方式进行实时计算。


Storm高级原语(三) — Trident topology》有 1 条评论

  1. storm 说:

    为什么项目运行第一次可以看到结果,而后面的所有再次运行都不能出现结果?而是直接提示SessionTrackerImpl exited loop!这是为什么?怎么设置后面的可以显示,不能只调试一次啊

留下一个回复

你的email不会被公开。