AmazonKinesis是个用于大规模数据实时处理的完全托管服务。不管你是建立一个从远端传感器中收集数据的系统,还是打造一个在多个不同服务器上做日志收集的应用程序,亦或是建立最新的物联网(IoT)解决方案,AmazonKinesis都可以满足你每个小时从成千上万个不同数据源收集和处理TB级数据的需求。
对于许多这样的系统来说,数据产生的位置对用户来说非常重要。举个例子,从一个远端传感器中发射出的警报不会起到太大的作用,除非用户可以弄清楚事件发生的地点。对于用户来说,地理数据可视化上,在地图上绘制图形是最有效的方法。通过本文,我们将展示如何使用AmazonKinesis建立一个支撑地理标记流数据的系统,并附上了两个简单的数据可视化方法,它们可以让用户快速读懂这些信息。第一个可视化绘制在一个地球仪上,对小规模事件显示非常有效:
第二个可视化可以应对更多规模数量的事件,它将绘制一段时间上事件的热点图。
下图是系统的架构概览。其中,数据生成者将数据推送给AmazonKinesis。随后,AmazonKinesis将处理这些信息,而相关的地理信息则存储在一个AmazonElastiCacheRedisCluster上,运行在ElasticBeanstalk上的node.js网络服务器将负责这些数据的可视化。
整个系统使用Java和JavaScript编码,但是不用担心开发环境不支持这些语言的情况;我们所有的代码都将使用AmazonElasticComputeCloud编译。
重要提示
当在AWS上建立本文所描述的系统,它将在AmazonKinesis和其他AWS服务上产生费用。在可能的情况下,我们将使用AWSFreeTier标准资源。换句话说,即使产生了标准Amazon资费,这个费用也已经被最小化了。
地理标记数据源
鉴于大多数读者都无权访问一个连接了设备或传感器的网络,我们必须为本文用例中需要使用的地理标记数据寻找一个合适的替代源。每天,都会有大约5亿的Tweets被发布,Twitter通过他们的流API为开发者提供了一个小型样本。如果Tweet从一个移动设备中发出,那么它就有可能做地理位置标注,让你清楚这条Tweet发布者的位置。你可以注册一个Twitter开发者账户,随后建立一个应用程序。这里,请确保你的应用程序设置为只读访问,随后点击KeysandAccessTokens面板底部的CreateMyAccessToken按钮。到这个步骤后,你将拥有4个Twitter应用程序秘钥:ConsumerKey(APIKey)、ConsumerSecret(APISecret)、AccessToken和AccessTokenSecret。一旦你拥有这些秘钥,你已经做好了建立AWS解决方案的所有准备。
建立一个安全组(SecurityGroup)
在建立我们系统之前,我们需要为多个服务器建立一个安全组。在AWSConsole上,我们需要通过两个规则建立一个安全组。第一个就是允许SSH(port22)传输,这个步骤允许我们连接到服务器。第二个规则是,当来源是所建立SecurityGroup的Id时,允许所有端口上的任何协议传输。这允许从实例传入的数据会被指定到同一个安全组,同时也保证了系统中所有服务器都可以进行通信。
建立一个AmazonKinesisStream
下一个任务则是建立一个AmazonKinesisStream。在AWSConsole上,进入Region对应的AmazonKinesis页面,并点击CreateStream按钮。给你的数据流起一个名字,并选择shard的数量。一个AmazonKinesisStream由一个或一个以上的shard组成,每个shard都提供了1MB/sec数据输入和2MB/sec的数据输出。在AWS上,一个数据流的总容量可以通过shard的数量非常简单的计算;换个角度来看,系统也可以便捷的增加或减少shard的数量来调整数据流的容量。单独看每个shard,它们每秒可以处理个写入事务。而在本文的测试过程中,Twitter流API总是运行在每秒Tweets以下,因此我们只需要1个shard。
建立一个AmazonElastiCache实例
下一步,使用AWSConsole进入AmazonElastiCache页面,并点击LaunchCacheCluster按钮。同时,我们需要在下个页面上建立一个单节点Redis集群。选择标准的缓存端口,同时设置NodeType,选择cache.m3.medium类型的节点。。在这里,请确保你使用了之前所建立的安全组。一旦AmazonElastiCache服务器建立,你可以在左侧导航的CacheClusters页面中发现终端的名称,并点击Nodes为集群连接。系统其他部分到集群的读写都会使用这个终端名和端口名。
数据生产者(Producer)
如果你有从远端传感器收集数据的条件,那么它们可以直接把数据传送给AmazonKinesis。在你的架构中,我们需要一个“Producer(生产者)”从Twitter中抽取数据并将之传输给AmazonKinesis。Twitter拥有一个名为Hosebird(HBC)的开源Java库,它可以做他们流API的数据抽取。用于配置客户端的代码如下:
如代码所示,我们在终端上使用了“addQueryParameter”来过滤数据,因此只会发送标记地理位置的Tweets。HBCGitHub页面上的例子展示了如何进行检索项过滤,举个例子,你可以通过标签限制分析指定的Tweets。这里有一件事情需要注意:如果你使用多于一个的过滤器,那么它们将进行“OR”计算。如果你在上述代码中增加了“trackTerms”,那么返回的结果将包含你的检索项或者是地理标志Tweets,这显然不是我们所需要的。在这里,通常的做法是让HBC做最严格的检索(通常是搜索项)来限制从Twitter返回的数据大小,随后在代码中建立第二个过滤器。
根据上文的设置,即一个shard组成的Stream,它每秒最多可以处理个事务。因此在通过一个单节点发送数据时就会产生一个问题:在如此速率的传输下,每个Tweet的处理时间只有1毫秒。即使只考虑网络延时这一个因素,我们就很难让到AmazonKinesis的调用如此快速。因此,数据生产者可以使用一个线程池来并发进行调用。下面的代码显示这个步骤如何进行:
每个Tweet都会被设置成JSON字符串。它们取自HBC队列(msgQueue),并传输给数据生产者。在内部,Producer会将这些字符串放到一个队列中,而一个工作者线程将把数据发送给AmazonKinesis。因为我们无需在shard上对消息排序,这里将使用一个随机的分区键。
在AmazonEC2上运行Producer之前,你需要设置一个IAMrole,这个步骤将允许EC2实例对AmazonKinesis和AmazonS3进行访问。这里我们点开AWS控制台,点击IAM,并建立一个role。这个role是一个用于AmazonEC2role的AWSServiceRole,定制策略如下:
建立一个使用AmazonLinuxAMI的m3.mediumAmazonEC2。指定上文所建立的IAMrole和SecurityGroup。修改以下脚本,加入AmazonKinesisStream名称、所使用的Region(比如,us-west-1),以及上文建立Twitter应用程序时所获得的4个秘钥。将这些粘贴到UserData用于修改实例。最后,审查和启动你的实例,记得提供一个秘钥对用于后续的访问。
(注:完整代码请阅读原文获取,下同)
这个脚本将运行在实例启动并安装了OpenJDK和ApacheMaven之后。随后,它会从Git上下载Producer所需的源代码,并建立配置文件。最终,ApacheMaven将用于建立这些源代码。为了启动Producer,跟随说明SSH到实例和类型:
在服务启动及ApacheMaven和jar文件都就绪后,你可能还需要等待数分钟。在这里,你应该可以看到Tweets被发送到AmazonKinesis的信息。AmazonKinesisApplication将检索从AmazonKinesis获得的JSONTweet,提取地理位置信息并将之推送到AmazonElastiCacheRedisCluster。
应用程序来自AmazonKinesisApplication示例,它被托管在AWSGitHub页面上(它非常适合作为所有AmazonKinesisApplication的起点)。AmazonKinesisApplication使用Jedis来与AmazonElastiCacheRedis进行交互。当RecordProcessor构造和对象初始化时,Redis服务器的详情会被提取。为了处理来自Twitter的记录,下面的代码将被添加到RecordProcessor:
如代码所示,Tweet的地理位置提取于我们从AmazonKinesis接收到的JSON消息。这些信息被包装成一个JSON对象,并被推送到一个Redis键。如果你期望在生产环境中运行一个AmazonKinesisApplication,你可以考虑通过之前博客(HostingAmazonKinesisApplicationsonAWSElasticBeanstalk)将代码托管在ElasticBeanstalk上。除此之外,你也可以将应用程序设置为JAR,并在服务器上直接运行。无论使用哪种方式,你必须为你的目标服务器建立一个IAMrole。访问IAM控制台,并建立一个具备以下策略的AWSServiceRole:
如果你在ElasticBeanstalk之外运行你的AmazonKinesisApplication,建立一个AmazonEC2实例(m3.meduimandAmazonLinuxAMI),并分配你之前建立的IAMrole。再次提醒,务必使用之前建立的安全组;它允许你的实例可以与Redis通信。通过键入AmazonKinesisStream名称、Region名称,以及所建立的AmazonElastiCacheRedis集群的名称和端口。随后,你可以将下面的脚本复制到服务配置界面的“UserData”部分。
下面脚本与前文的工作原理相同。OpenJDK和ApacheMaven会被安装,用于AmazonKinesisApplication的代码会从Git上下载并通过ApacheMaven安装。SSH到实例。一旦jar被建立,键入以下的命令:
随后,你将看到一些数据从AmazonKinesis抽取,以及坐标数据被发送到Redis的信息。
Node.js服务器
你可以在这里下载用于node.js服务器的代码。Node.js服务器使用了两个非常重要的库:node_redis来与Redis通信,socket.io在浏览器和服务器之间创建一个socket连接。Socket被用于推送新Tweet信息到任何连接用户。
当node.js服务器启动,它将被注册以监视Redis发布的变化:
如果Redis发布产生变化,系统可以非常便捷的将这个变化推送到所有连接用户:
你下载的node.js代码同样包含了用于可视化地理位置数据的HTML和JavaScript代码。
这个代码包含了两个用于可视化的方法,它们都使用了Three.js来做3D渲染。第一个可视化方法的出发点是一个旋转的地球仪,它基于Bj?rnSandvik一篇非常著名的博客。这里同样需要感谢TomPaterson,它开源了我们需要使用的textures。在这里,地球仪的主要代码是public/js/earth.js。环境建立通过调用createEarth、createClouds、createStars、createSun和createLensFlare完成。
设置最后一部分是建立50个lights,它们开始时都被关闭。当sockets检测到一条tweet被发送到浏览器,lights池将被搜索以获得一个空闲的light。随后这个light会被移动到tweet的发送位置,而在几秒内,你将看到亮度会逐渐增加然后减弱。从而实现了每条Tweet发送时的可视化。需要注意的是,在发光开始时,它所持续的时间和颜色就被确定。下面的代码将触发这个过程:
旋转的地球仪组合灯光让信息来的非常直观,但是这个可视化方法的弱点是可显示信息的数量。建立这个图像和灯光是非常资源密集型的一个操作,如果你测试,你就会发现当显示事件每秒超过25个时,机器的速度就会变慢。
第二个可视化方法旨在展示更多的数据点,其主要部分代码在public/js/earth2.js中。代码的结构非常类似于上个方法,环境通过createEarth和createBackgroundLight两个方法建立。这里所使用的观点非常简单,整个地图被水平表示,从而节省了大量的计算资源。取代建立lights池,这个可视化方法将建立一个拥有个点的PointCloud。每个点都建立在一个无穷大的位置,因此用户不会看到它。
当sockets播放一个Tweet,代码会搜索一个没有被使用的点,随后将之移动到Twitter发送的位置。每个点都使用了AdditiveBlending,这就意味着同一个位置的点越多,那个地方的颜色越亮。将每个点保存一个足够的时间(比如10秒),你可以获得Tweets产生位置的热点图。
为了让node.js运行在ElasticBeanstalk上,你首先需要为代码建立一个zip存档。Zip目录的内容(比如globe.html、heatmap.html和server.js),而不是父文件夹。下一步,使用以下策略为应用程序建立一个IAMrole:
到了这一步,你可以到AWSConsole的ElasticBeanstalk,并建立一个新的应用程序。
1.将Node.js设置为预定义环境,指定代码的zip文件作为应用程序源。
2.在ConfigurationDetails页面,指定你刚建立的IAMrole。
3.在ElasticBeanstalk应用程序建立之后,选择环境左侧导航中的Configuration,随后选择SoftwareConfiguration面板(点击右上角的cog)。
4.在EnvironmentProperties下,在PARAM1参数中键入RedisElastiCache名称。
5.最后,选择实例面板,加入SecurityGroup名称。
如果你回到主应用程序仪表盘,你将看到你可视化应用程序的地址,你可以在AWSElasticBeanstalkApplication建立完成后访问。如果遭遇了WebGL不支持警告,请确保你使用了兼容浏览器。
总结
本文向你展示了AmazonKinesis如何实时捕获并保存地理位置数据,并在一个基于网络的客户端上可视化。当AmazonKinesisApplication处理Tweet时,它只是提取了JSON文件中的地理位置信息。因此,我们可以很简单的将之修改成提取其他数据,并用来提升可视化。
本文介绍的解决方案同样是可以扩展的:给AmazonKinesisStream中添加Shard非常简单,AmazonKinesisApplication同样可以跑在一群主机上以增加吞吐量。
如果你有问题或者是建议,请在评论中填写。同时,如果你已经做了自己的可视化并且愿意进行分享,你只需要留一下一个评论即可。
预览时标签不可点收录于话题#个上一篇下一篇