TranslateProject/translated/tech/20171010 Getting Started Analyzing Twitter Data in Apache Kafka through KSQL.md
qhwdw 7c44609be9
翻译完成
原文似乎不全,部分演示的代码没有,请核对者查看一下原文出处。----by qhwdw
2017-10-31 20:46:21 +08:00

8.4 KiB
Raw Blame History

在 Apache Kafka 中通过 KSQL 分析 Twitter 数据入门

KSQL 是 Apache Kafka 中的开源的流式 SQL 引擎。它可以让你在 Kafka 主题topics使用一个简单的并且是交互式的SQL接口很容易地做一些复杂的流处理。在这个短文中我们将看到怎么去很容易地配置并运行在一个沙箱中去探索它使用大家都喜欢的一个演示数据库源 Twitter。我们将去从 teewts 的行流中获取,通过使用 KSQL 中的条件去过滤它。去构建一个合计,如统计 tweets 每个用户每小时的数量。

首先, 抓取一个汇总平台的拷贝。我使用的是 RPM 但是,如果你想去使用的话,你也可以使用 tar, zip, etc 。启动 Confluent

$ confluent start

(如果你感兴趣,这是一个 在Confluent CLI 上的快速教程 )

我们将使用 Kafka 连接去从 Twitter 上拖数据。 Twitter 连接器可以在 on GitHub here上找到。去安装它,像下面这样操作:

# Clone the git repo cd /home/rmoff git clone https://github.com/jcustenborder/kafka-connect-twitter.git

# Compile the code cd kafka-connect-twitter mvn clean package

从我们建立的 连接器 上建立连接, 你要去修改配置文件.自从我们使用 Confluent CLI 真实的配置文件是 etc/schema-registry/connect-avro-distributed.properties, 因此去修改它并增加如下内容:

plugin.path=/home/rmoff/kafka-connect-twitter/target/kafka-connect-twitter-0.2-SNAPSHOT.tar.gz

重启动 Kafka 连接: confluent stop connect confluent start connect

一旦你安装了插件,你可以很容易地去配置它。你可以直接使用 Kafka Connect REST API ,或者创建你的配置文件,这就是我要在这里做的。如果你需要全部的方法查看 Twitter to grab your API keys first

假设你写这些到 /home/rmoff/twitter-source.json,你可以现在运行:

$ confluent load twitter_source -d /home/rmoff/twitter-source.json

然后 tweets 从大家都喜欢的网络明星 [rick]-rolling in…开始

现在我们从 KSQL 开始 ! 马上去下载并构建它:

cd /home/rmoff   git clone https://github.com/confluentinc/ksql.git   cd /home/rmoff/ksql   mvn clean compile install -DskipTests

构建完成后,让我们来运行它:

./bin/ksql-cli local --bootstrap-server localhost:9092

使用 KSQL 我们可以让我们的数据保留在 Kafka 话题上并可以查询它。首先,我们需要去告诉 KSQL 主题上的数据模式schema是什么一个 twitter 消息是一个真实的非常好的巨大的 JSON 对象, 但是,为了简洁,我们只好选出几个行去开始它:

ksql> CREATE STREAM twitter_raw (CreatedAt BIGINT, Id BIGINT, Text VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01', VALUE_FORMAT='JSON'); Message   ---------------- Stream created

在定义的模式中,我们可以查询这些流。使用 KSQL 去展示从开始的主题中取得的数据 (而不是当前时间点,它是缺省的),运行:

ksql> SET 'auto.offset.reset' = 'earliest';   Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

现在,让我们看看这些数据,我们将使用 LIMIT 从句仅检索一行:

现在,让我们使用刚才定义的可用的 tweet 负荷的全部内容重新定义流:

现在,我们可以操作和检查更多的最近的数据,使用一般的 SQL 查询:

注意这里没有 LIMIT 从句,因此,你将在屏幕上看到  continuous query 的结果。不像关系表中返回一个确定数量结果的查询,一个运行在无限的流式数据上的持续查询, 因此,它总是可能返回更多的记录。点击 Ctrl-C 去中断燕返回到 KSQL 提示符。在以上的查询中我们做了一些事情:

  • TIMESTAMPTOSTRING 去转换时间戳从 epoch 到人类可读格式。(译者注: epoch 指的是一个特定的时间 1970-01-01 00:00:00 UTC

  • EXTRACTJSONFIELD 去展示源中嵌套的用户域中的一个它看起来像

  • 应用谓语去展示内容,对#hashtag使用模式匹配 使用 LCASE 去强制小写字母。译者注hashtagtwitter中用来标注线索主题的标签

关于支持的功能列表,查看 the KSQL documentation

我们可以创建一个从这个数据中得到的流:

并且查询这个得到的流:

在我们完成之前,让我们去看一下怎么去做一些聚合。

你将可能得到满屏幕的结果;这是因为 KSQL 在每次给定的时间窗口更新时实际发出聚合值。自从我们设置 KSQL 去读取在主题 (SET 'auto.offset.reset' = 'earliest';) 上的全部消息,它是一次性读取这些所有的消息并计算聚合更新。这里有一个微妙之处值得去深入研究。我们的入站 tweets 流正好就是一个流。但是,现有它不能创建聚合,我们实际上是创建了一个表。一个表是在给定时间点的给定键的值的一个快照。 KSQL 聚合数据基于消息事件的时间,并且如果它更新了,通过简单的相关窗口重申去操作后面到达的数据。困惑了吗? 我希望没有,但是,让我们看一下,如果我们可以用这个例子去说明。 我们将申明我们的聚合作为一个真实的表:

看表中的列,这里除了我们要求的外,还有两个隐含列:

`ksql> DESCRIBE user_tweet_count;

Field           | Type   -----------------------------------   ROWTIME         | BIGINT   ROWKEY          | VARCHAR(STRING)   USER_SCREENNAME | VARCHAR(STRING)   TWEET_COUNT     | BIGINT   ksql>`

我们看一下这些是什么:

 ROWTIME 是窗口开始时间,  ROWKEY 是 GROUP BY(USER_SCREENNAME) 加上窗口的组合。因此,我们可以通过创建另外一个衍生的表来整理一下:

现在它更易于查询和查看我们感兴趣的数据:

结论

所以我们有了它! 我们可以从 Kafka 中取得数据, 并且很容易使用 KSQL 去探索它。 而不仅是去浏览和转换数据,我们可以很容易地使用 KSQL 从流和表中建立流处理。

如果你对 KSQL 能够做什么感兴趣,去查看:

记住KSQL 现在正处于开发者预览版中。 欢迎在 KSQL github repo 上提出任何问题, 或者去我们的 community Slack group 的 #KSQL通道。


via: https://www.confluent.io/blog/using-ksql-to-analyse-query-and-transform-data-in-kafka

作者:Robin Moffatt 译者:qhwdw 校对:校对者ID

本文由 LCTT 原创编译,Linux中国 荣誉推出