KafkaStreams介绍(二)

说明:

本文是Confluent Platform 3.0版本中对于Kafka Streams的翻译。

原文地址:https://docs.confluent.io/3.0.0/streams/index.html

看了很多其他人翻译的文档,还是第一次翻译,有什么翻译的不好的地方还请指出。

 

这是Kafka Streams介绍的第二篇,以前的介绍如下:

http://blog.csdn.net/ransom0512/article/details/51971112

 

1.  快速入门

1.1.  目标

本快速入门指南的目标是提供与KafkaStreams的第一个应用程序示例。我们将演示在你的第一个示例程序中,如果使用Kafka Streams库和演示一个简单的端到端的数据流。

值得注意的是,这种快速入门只涵盖了KafkaStreams的表面,这篇文档的剩余部分将会提供更多的细节,我们将在快速入门指南中为你指明方向。

1.2.  我们想做什么

在这个快速入门中,我们将运行包含Apachekafka的一个wordcount演示应用程序。下面代码的关键在于使用Java8的lambda表达式,易于阅读。(摘自WordCountLambdaExample):

//序列化/反序列化Sting和Long类型
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
 
//通过指定输入topic “streams-file-input”来构造KStream实例,
//输入数据就以文本的形式保存在topic “streams-file-input” 中。
//(在本示例中,我们忽略所有消息的key.)
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");
 
KStream<String, Long> wordCounts = textLines
//以空格为分隔符,将每行文本数据拆分成多个单词。
//这些文本行就是从输入topic中读到的每行消息的Value。
//我们使用flatMapValues方法来处理每个消息Value,而不是更通用的flatMap
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
//我们随后将调用countByKey来计算每个单词出现的次数
//所以我们将每个单词作为map的key。
.map((key, value) -> new KeyValue<>(value, value))
//通过key来统计每个单词的次数
//
//这会将流类型从KStream<String,String>转为KTable<String,Long> (word-count).
//因此我们必须提供String和long的序列化反序列化方法。
    //
.countByKey(stringSerde, "Counts")
//转化KTable<String,Long>到KStream<String,Long>
    .toStream();
 
//将KStream<String,Long>写入到输出topic中。
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");


在上面的代码执行过程中,我们将执行如下步骤:

1、  启动一台kafka集群

2、  使用Kafkaconsole producer命令行生产者客户端往Kafka Topic中写入示例输入数据

3、  在Java应用程序中使用kafkaStream库来处理输入数据。这里,我们使用了一个包含kafka的WordCount示例程序。

4、  使用Kafkaconsole consumer命令行消费者客户端检查应用程序的输出。

5、  停止Kafka集群

1.3.  启动Kafka 集群

在本章节中,我们会在一台机器上安装并启动Kafka集群。该集群有一个单节点Kafka(只有一个Broker)外加一个单节点Zookeeper构成。在wordcount演示程序中,这种集群依赖是必须的。我们假定kafka broker运行地址为localhost:9092, Zookeeper本地地址为localhost:2181。

首先,安装oracle JRE或JDK 1.7及以上版本

然后,下载和安装包含Kafka Streams的新版本Apache Kafka. 为此,我们使用Confluent Platform 3.0.0版本。

(下面操作比较简单,所以不翻译了。)

# Download and install Confluent Platform 3.0.0 from ZIP archive
$ wget http://packages.confluent.io/archive/3.0/confluent-3.0.0-2.11.zip
$ unzip confluent-3.0.0-2.11.zip
 
# *** IMPORTANT STEP ****
# The subsequent paths and commands used throughout this quickstart assume that
# your are in the following working directory:
$ cd confluent-3.0.0/
 
# Note: If you want to uninstall the Confluent Platform at the end of this quickstart,
# run the following commands.
#
#   $ rm -rf confluent-3.0.0/
#   $ rm -rf /var/lib/kafka          # Data files of Kafka
#   $ rm -rf /var/lib/kafka-streams  # Data files of Kafka Streams
#   $ rm -rf /var/lib/zookeeper      # Data files of ZooKeeper

提示:可以通过Installationvia ZIP and TAR archives 和ConfluentPlatform Quickstart 获取更进一步信息。

我们首先启动ZooKeeper实例。该实例将监听本地2181端口。由于这是一个长期运行的服务,你应该在自己的终端中运行。

# Start ZooKeeper.  Run this command in its own terminal.
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

接下来,我们启动Kakfa的Broker,这将监听本地9092端口,然后连接到我们刚刚启动的Zookeeper实例。这也是一个长期运行的服务,也应该在终端中运行它。

# Start Kafka.  Run this command in its own terminal
$ ./bin/kafka-server-start ./etc/kafka/server.properties

现在,我们的单节点kafka集群已经完全运转起来了,我们就可以着手准备输入数据,运行我们的第一个kafka Streams示例程序。

1.4.  准备输入数据

提示:在本章节中,我们将使用内置的命令行工具来输入kakfa数据。在实际使用中,你应该通过其他方式将数据写入Kafka中,比如通过你自己应用程序中的Kafka客户端。

         现在,我们将一些输入数据发送到Kafka的topic中,然后由Kafka Streams的应用程序做后续处理。

         首先,我们要创建名称为streams-file-input的topic:

$ ./bin/kafka-topics --create \
          --zookeeper localhost:2181 \
          --replication-factor 1\
          --partitions 1\
          --topic streams-file-input

下一步,我们生成一些输入数据病保存在本地文件/tmp/file-input.txt中。

$ echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt

生成的文件将包含如下内容:

all streams lead to kafka
hello kafka streams
join kafka summit

最后,我们发送这些数据到input topic

$ cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-file-input

Kafka consoleproducer从stdin中读取数据,并将每一行作为单独的消息发送到kafka的输入流中。该消息的key是null,消息是每行内容,使用字符串编码。

注意: 你可能想知道这样一步步的快速启动和真实流处理系统的差异,在大型的实时的流处理系统中,数据总是在移动的,快速入门的目的仅仅是做功能证明。简单来说,一个端到端的数据管道建立在Kafka和Kafka Streams的各个方面。出于说教的原因,我们故意将快速入门清楚地拆分成一系列分开连续的步骤。

         但在实践中,这些步骤通常会看起来有些不同并且会有并发的存在。比如输入数据可能不会来源于本地文件,而是直接从分布式系统中发送的,并且数据将被连续的写入Kafka。类似的,流处理应用程序可能在第一行数据发送之前就已经启动并运行。

 

1.5.  在KafkaStreams中处理输入数据

现在,我们已经生成了一些输入数据,我们可以运行我们的第一个基于Kafka Streams的java应用程序。

         我们将运行WordCount演示应用程序,它使用了ApacheKafka。它实现了WordCount算法,从输入文本来计算直方图。然而和其他你之前见过的操作被绑定在数据上的WordCount实例程序不同的是,这个示例程序是数据无界,无限流动的。和有界算法的变体类似,他是一个有状态的算法,跟踪并更新word的计数器。然后因为它必须接受无界的输入数据,它会周期性低输出其当前状态和计算结果,同时继续处理更多的数据,因为它不知道是否已经处理了所有的数据。这就是他和Hadoop 的Mapreduce算法之间的典型差异。一旦我们了解这种差异,检查了实际的输出数据之后,会更容易接受它。

         由于wordCount示例程序与kafka打包在一起,已经在Kafka的Broker中集成,这就意味着我们不需要做额外的事情就可以运行它,无需编译任何Java源代码。

# Run the WordCount demo application.  There won't be any STDOUT output.
# You can safely ignore any WARN log messages.
$ ./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo

注意,这里没有魔术式的部署,实际上,使用kafkaStreams库中的任何应用程序,就像启动任何普通的Java应用程序,该脚本kafka-run-class也只是一个简单的java -cp命令的包装。

         该WordCount示例程序将从输入topic中读取数据,然后计算wordCount,将计算结果不断进行输出。演示将运行几秒钟,然后和其他典型流处理应用程序不同的是,它将会自动终止。

1.6.  检查输出结果

在本章节中,我们将使用内置的命令行工具从kafka中手工读取数据。在实际使用中,你可以通过其他方式,通过Kakfa客户端从Kafka中读取数据。比如,如果你可以在自己的应用程序中使用Kafka客户端将数据从Kakfa中迁移到其它数据系统。

         现在,我们可以从kafka输出topic中读取数据并检查wordcount实例运行结果。

./bin/kafka-console-consumer --zookeeper localhost:2181 \
          --topic streams-wordcount-output \
          --from-beginning \
          --formatter kafka.tools.DefaultMessageFormatter \
          --property print.key=true\
          --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
          --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Wordcount的数据将会被打印在如下的控制台中:

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

这里,第一列是Kafka消息的key的字符串格式,第二列是消息的值,long类型。你可以通过Ctrl+c命令来终止控制台输出。

但是等一下,输出看起来是不是很奇怪?为什么会出现重复的条目?比如streams出现了两次:

# Why not this, you may ask?
all     1
lead    1
to      1
hello   1
streams 2
join    1
kafka   3
summit  1

对于上面的输出的解释是,wordCount应用程序的输出实际上是持续更新的流,其中每行记录是一个单一的word(即Message Key,比如Kafka)的计数。对于同一个Key的多个记录,么个记录之后是前一个的更新。

 

下面的两个图说明了在输出之后发生了什么。第一列显示KTable<String, Long>即countByKey的计数当前状态的演化。第二列表示从状态更新到KTable的结果和最终结果,一旦产生从KTable#通Stream()转到KStream<String, Long>的记录,相应结果就会被输出到Kafka。

         首先,文本行”所有到kafka的流”正在处理中,每个新Table项中的新单词结果正在被构建成KTable对象(绿色高亮显示部分),并且相应的变化结果会被发送到下游KStream。





当第二个文本航的hello kafkastreams被处理的时候,我们观察到,相对第一次,已经存在的条目KTable被更新了(Kafak和Streams这两个单词). 修改后的记录被在此发送到了KStream。

这就解释了上述KStream第二列中显示的信息,为什么输出的topic上显示的内容,因为它是包含了变化的完整内容

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

下面的展望超出了这个例子的范围。Kafka Strems使用了KTable和changlog的KStream,利用changlog Stream和KTable之间的二元性,你可以发布KStream表中从开始到结束的完整changelog,这样就可以重建KTable的内容。

1.7.  停止Kafka集群

一旦完成了快速入门,你可以按照以下顺序关闭Kafka集群。

1、  在它运行的终端中,使用Ctrl+c来停止KafkaBroker,或者杀死该进程。

2、  在其他终端,使用Ctrl+C停止Zookeeper实例或者杀死该进程

恭喜你,你已经运行了Kafka Streams的第一个应用程序并且将数据保存在了一个单节点的kafka集群中。Yeah!

1.8.  接下来该何去何从

至于下一步,我们会建议你:

1、  阅读KafkaStreams架构,了解其主要概念和设计原则。

2、  深入阅读KafkaStreams开发指南,这里包含了kafka Streams的DSL等各种文档。这些将会帮助你编写Kafka Streams的第一个应用程序。

处理Kafka Streams,你可能对下面这些也感兴趣:

1、  kafka Connect工具,在kakfa和其他数据系统必须Hadoop中迁移数据。

2、  从Kafka Client中读取和写入数据到你自己的应用程序当中。

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。
2016年7月19日下午,笔者做客国泰君安通信研究团队”软银收购ARM“深度解读电话会议,与在线的150多位机构投资者分享了对于”软银收购ARM“的个人观点。 以下为电话会议实录,略经编辑以及后期补充部分观点。 主持人:各位同事朋友大家下午好,我是国泰君安通信行业分析师宋嘉吉,欢迎大家今天参加本次电话会议,此次会议的主题是软银收购ARM,7月18号软银宣布以243亿英镑收购半导体IP供应商ARM,是对未来物联网战略的提前卡位,我们认为这也是物联网行业布局芯片的又一重磅催化。今天大唐电信封了涨停,按照我们对
一、HBase伪分布式集群安装 1、安装包解压 $ cd app/ $ tar -xvfhbase-1.2.0-cdh5.7.1.tar.gz $ rmhbase-1.2.0-cdh5.7.1.tar.gz   2、添加环境变量 $ cd ~ $ vim .bashrc exportHBASE_HOME=/home/developer/app/hbase-1.2.0-cdh5.7.1 exportPATH=$PATH:$HBASE_HOME/bin $ source .bashrc   3、编辑hbase
本文将介绍Oracle集成云Agent的基础架构,所包含的组件,和如何连接云与OP应用。 目前/典型的集成方式 目前常用的将云应用/基于互联网的应用与企业内部部署(OP)应用连接的方式为:穿透一层或者更多的防火墙,使用反向代理、Oracle API Gateway或者OHS。要实现这些操作需要多种专业知识,比如防火墙需要开放入站端口,暴露一个私有的SOAP/REST服务并且配置网络路由。SOAP/REST服务可以用SOA套件之类的产品实现,比如与CRM系统进行通讯,实现客户信息的接收。如下图所示: 如果使
本次主要是详细记录Docker1.12在Ubuntu16.04上的安装过程,创建Docker组(避免每次敲命令都需要sudo),Docker常用的基本命令的总结,在容器中运行Hello world,以及创建一个基于Python Flask的web应用容器的全过程。 1.Docker1.12在Ubuntu16.04上安装 1.1.先决条件1,添加Docker源 wxl @wxl - pc: ~ $ sudo apt-get update 增加CA证书 wxl@wxl -pc :~$ sudo apt -ge
参考自: http://blog.csdn.net/jdplus/article/details/45920733 进行了大范围修改和完善 文件下载 CDH (Cloudera’s Distribution, including Apache Hadoop),是Hadoop众多分支中的一种,由Cloudera维护,基于稳定版本的Apache Hadoop构建,并集成了很多补丁,可直接用于生产环境。  Cloudera Manager则是为了便于在集群中进行Hadoop等大数据处理相关的服务安装和监控管理的
Spark可以通过三种方式配置系统: 通过SparkConf对象, 或者Java系统属性配置Spark的应用参数 通过每个节点上的conf/spark-env.sh脚本为每台机器配置环境变量 通过log4j.properties配置日志属性 Spark属性 Spark属性可以为每个应用分别进行配置,这些属性可以直接通过SparkConf设定,也可以通过set方法设定相关属性。 下面展示了在本地机使用两个线程并发执行的配置代码: val conf = new SparkConf() .setMaster(
云端基于Docker的微服务与持续交付实践笔记,是基于易立老师在阿里巴巴首届在线技术峰会上《云端基于Docker的微服务与持续交付实践》总结而出的。 本次主要讲了什么? Docker Swarm Docker Swarm mode 微服务支持(Docker集群架构体系) Docker的发展趋势和前沿成果 在Docker技术方面还是很佩服大牛的,所以赶紧写下笔记,追随大神的脚步。 阿里云资深专家易立,技术就不说了,他比其他直播间硬生生多讲了半个多点,于情于理还是万分感谢本次分享的(可惜devOps没时间讲了
前面我们已经部署好了一个Docker Swarm集群环境,接下来,我们就对Swarm集群的相关管理进行简单介绍。 集群调度策略 既然是集群,就是有一个调度策略,也就是该集群包含那么多子节点,我到底是设置一个什么样的策略来进行分配呢? 我们查看Docker官方文档可以看到Swarm的集群调度包含三种策略: To choose a ranking strategy, pass the  --strategy  flag and a strategy value to the  swarm manage  co

docker4dotnet #2 容器化主机 - 2016-07-21 14:07:47

.NET 猿自从认识了小鲸鱼,感觉功力大增。上篇 《docker4dotnet #1 前世今生世界你好》 中给大家介绍了如何在Windows上面配置Docker for Windows和Docker Tools for Visual Studio来使用docker协助.NET Core应用的开发,这篇我们来看看如何创建和管理容器化主机。 所谓容器化主机Dockerized Host,就是安装了docker engine的主机,可以使用docker工具进行管理。使用docker来协助开发,我们至少需要本地和

Oozie安装总结 - 2016-07-20 18:07:35

一、使用CM添加服务的方式安装Oozie 如果在创建Oozie数据库时失败,且提示数据库已存在,如下图,则可能是之前已经安装过Oozie,没有卸载干净,需要手动将Oozie服务器数据目录删掉(oozie/data部分),见图二                                                                            (图一)