6.7 股票趋势预测

本例将介绍如何使用Spark构建实时数据分析应用[1],以分析股票价格趋势。

本例假设已预先连接了Spark Streaming。读者可以阅读介绍BDAS的章节预先了解相关概念。

第一步,需要获取数据流,本例使用JSON/WebSocket格式呈现6种实时市场金融信息。

第二步,需要知道如何使用获取到的数据流,本例不涉及专业的金融知识,但可以在这个应用中通过价格改变规律,预测价格趋势。

1.实例描述

本例通过使用scalawebsocket库(Github网址为https://github.com/pbuda/scalawebsocket)访问WebSocket。scalawebsocket库只支持Scala 2.10。获取网上的金融数据流。

输入为:股票名和相应价格。


  1. -------------------------------------------
  2. Time 1375194945000 ms
  3. -------------------------------------------
  4. Croda International PLC - 24.82 - 24.82
  5. ASOS PLC - 47.485 - 47.485
  6. Arian Silver Corp - 0.0435 - 0.0435
  7. Medicx Fund Ltd - 0.7975 - 0.7975
  8. Supergroup PLC - 10.73 - 10.73
  9. Diageo PLC - 20.07 - 20.075
  10. Barclays PLC - 2.891 - 2.8925
  11. QinetiQ Group PLC - 1.874 - 1.874
  12. CSR PLC - 5.7 - 5.7
  13. United Utilities Group PLC - 7.23 - 7.23

输出为:处于增长趋势的股票名称。


  1. ----------------------------------------------
  2. Positive Trending Time 1375269240035 ms
  3. ----------------------------------------------
  4. Real estate
  5. Telecommunication
  6. Graphics publishing & printing media
  7. Environmental services & recycling
  8. Agriculture & fishery

2.设计思路

通过Spark Streaming的时间窗口,增加新数据,减少旧数据。本例中的reduce函数用于对所有价格改变求和(有正向的改变和负向的改变)。之后希望看到正向的价格改变数量是否大于负向的价格改变数量,这里通过改变正向数据将计数器加1,改变负向的数据将计数器减1进行统计,从而统计出股票的趋势。

3.代码示例

通过本书的BDAS章节,假设读者已经对Spark和Spark Streaming有了初步了解,下面将介绍整个应用的设计与开发。

(1)接收流数据

为了在Spark中处理流数据,需要创建一个StreamingContext对象(Spark Streaming中的上下文对象),作为流处理的上下文。之后注册一个输入流(InputDStream),它会初始化一个接收器(Receiver)对象(Spark默认提供了许多类型的接收器,如Twitter、Akka Actor、ZeroMQ等)。由于默认没有网页套接字(WebSocket)的实现,所以本例将自定义这个类,获取网页流数据。

本例通过使用scalawebsocket库(Github网址为https://github.com/pbuda/scalawebsocket)访问WebSocket。scalawebsocket库只支持Scala 2.10。

读者可以选用支持Scala 2.10的Spark版本。

通过下面的代码实现一个简单的trait,进而使用WebSocket(它产生所有可用的股票序列)。


  1. import scalawebsocket.WebSocket
  2. trait PriceWebSocketClient {
  3. import Listings._
  4. def createSockethandleMessage String => Unit = {
  5. websocket = WebSocket().open"ws://localhost:8080/1.0/marketDataWs").onTextMessagem => {
  6. handleMessagem
  7. })
  8. subscriptions.foreachlisting => websocket.sendText"{\"subscribe\":{" + listing + "}}"))
  9. }
  10. var websocket WebSocket = _
  11. }
  12. classPriceEchoextendsPriceWebSocketClient{
  13. createSocketprintln
  14. }

为了能够让Spark正确挂接到WebSocket,并不断接收消息,可以通过实现一个接收器(Receiver)达到这个目的。由于接收的数据符合通用的网络协议,所以通过继承NetworkReceiver类实现接收器。用户需要创建一个块生成器(block generator),并将接收到的消息附加到块生成器中。


  1. classPriceReceiverextendsNetworkReceiver[String]withPriceWebSocketClient{
  2. lazy val blockGenerator = new BlockGeneratorStorageLevel.MEMORY_ONLY_SER
  3. protected override def onStart() {
  4. blockGenerator.start
  5. createSocketm => blockGenerator += m
  6. }
  7. protected override def onStop() {
  8. blockGenerator.stop
  9. websocket.shutdown
  10. }
  11. }

到目前为止,获取的流数据是以JSON格式存储的文本字符串,通过抽取数据中重要的部分进而用其创建case类,这样数据处理将变得更容易。创建一个PriceUpdate case类。


  1. import scala.util.parsing.json.JSON
  2. import scala.collection.JavaConversions
  3. import java.util.TreeMap
  4. case class PriceUpdateid String price Double lastPrice Double
  5. object PriceUpdate{
  6. val lastPrices = JavaConversions.asMapnew TreeMap[StringDouble])
  7. def applytext String): PriceUpdate = {
  8. valid price = getIdAndPriceFromJSONtext
  9. val lastPrice Double = lastPrices.getOrElseid price
  10. lastPrices.putid price
  11. PriceUpdateid price lastPrice
  12. }
  13. /*此方法解析与处理JSON数据格式,暂不赘述*/
  14. def getIdAndPriceFromJSONtext String = // snip - simple JSON processing
  15. }
  16. /*这时,还不能找到金融序列属性,不能获取之前的价格信息。
  17. 同时,需要更新接收器类为下面的情况,解析输入数据*/
  18. import spark.streaming.dstream.NetworkReceiver
  19. import spark.storage.StorageLevel
  20. class PriceReceiver extends NetworkReceiver[PriceUpdate]withPriceWebSocketClient{
  21. lazy val blockGenerator = new BlockGeneratorStorageLevel.MEMORY_ONLY_SER
  22. protected override def onStart() {
  23. blockGenerator.start
  24. createSocketm => {
  25. val priceUpdate = PriceUpdatem
  26. blockGenerator += priceUpdate
  27. })
  28. }
  29. protected override def onStop() {
  30. blockGenerator.stop
  31. websocket.shutdown
  32. }
  33. }

还需要实现一个输入流(InputDStream),这个输入流需要实现getReceiver方法,当外部调用这个方法时,返回一个初始化好的价格接收器。


  1. object streamextendsNetworkInputDStream[PriceUpdate](ssc {
  2. override def getReceiver(): NetworkReceiver[PriceUpdate] = {
  3. newPriceReceiver()
  4. }
  5. }

将之前的程序加入Spark Streaming的主干程序中。


  1. /*创建Spark Streaming上下文*/
  2. val ssc = new StreamingContext"local" "datastream" Seconds15), "C:/software/spark-1.0.0" List"target/scala-2.10.3=/spark-data-stream_2.10.3-1.0.jar"))
  3. /* 创建并注册输入流*/
  4. ssc.registerInputStreamstream
  5. /*启动流数据处理引擎*/
  6. ssc.start()

上面这段代码初始化了流数据处理的上下文,并配置了应用。

local表示在本地执行,datastream是应用的名称,Seconds(15)定义批处理的时间片,"C:/software/spark-1.0.0"定义Spark的路径,List("target/scala-2.10.3=/spark-data-stream_2.10.3-1.0.jar")定义需要的Jar包。

项目结构要为SBT的项目格式,之后在根目录运行SBT package run即可运行,这样将会编译,打包程序生成target/scala-2.10.3=/spark-data-stream_2.10.3-1.0.jar,然后Spark应用使用Jar中的类。

下面代码为到目前为止应用的完整代码。


  1. override def mainargs Array[String]) {
  2. import Listings._
  3. val ssc = new StreamingContext"local" "datastream" Seconds15), "C:/software/spark-0.7.3" List"target/scala-2.9.3/spark-data-stream_2.9.3-1.0.jar"))
  4. object stream extends NetworkInputDStream[PriceUpdate](ssc {
  5. override de fgetReceiver(): NetworkReceiver[PriceUpdate] = {
  6. newPriceReceiver()
  7. }
  8. }
  9. ssc.registerInputStreamstream
  10. stream.mappu => listingNamespu.id + " - " + pu.lastPrice + " - " + pu.price).print()
  11. ssc.start()
  12. }

控制台将会产生以下输出。


  1. -------------------------------------------
  2. Time 1375194945000 ms
  3. -------------------------------------------
  4. Croda International PLC - 24.82 - 24.82
  5. ASOS PLC - 47.485 - 47.485
  6. Arian Silver Corp - 0.0435 - 0.0435
  7. Medicx Fund Ltd - 0.7975 - 0.7975
  8. Supergroup PLC - 10.73 - 10.73
  9. Diageo PLC - 20.07 - 20.075
  10. Barclays PLC - 2.891 - 2.8925
  11. QinetiQ Group PLC - 1.874 - 1.874
  12. CSR PLC - 5.7 - 5.7
  13. United Utilities Group PLC - 7.23 - 7.23
  14. ……

(2)处理流数据

通过上文的初始化和数据接收,已经可以源源不断地获取数据了。下面介绍如何处理和分析数据。

下面程序可将数据转化为类股,改变价格和频度的序列。第一次处理时,将每个数据项转化为(类股,价格改变,1)的元组。通过下面的代码完成这个过程。


  1. val sectorPriceChanges = stream.mappu => listingSectorspu.id), pu.price - pu.lastPrice 1)))

现在就可以使用reduceByKeyAndWindow函数了,这个函数允许用户使用滑动窗口处理数据,时间窗口内的数据将会使用reduce函数处理,使用Key-Value对中的Key作为reduce的关键字,这里将使用一个reduce函数和反向reduce函数。

这样每次在时间窗口内迭代时,Spark都对新数据进行reduce处理,需要丢弃的旧数据不再使用reduce处理。

Spark需要做的就是撤销之前最左侧旧数据对整个reduce数据结果的改变,增加右侧新的reduce数据对整个reduce数据结果产生新的改变。

需要写一个reduce和inverse reduce函数。在本例中,reduce函数用于对所有价格改变求和(有正向的改变和负向的改变)。为了看到正向的价格改变数量大于负向的价格改变数量,这里可以通过改变正向数据,将计数器加1,改变负向的数据,将计数器减1达到这个效果。代码如下。


  1. val reduce = reduced DoubleInt), pair DoubleInt)) => {
  2. ifpair._1 > 0 reduced._1 + pair._1 reduced._2 + pair._2
  3. elsereduced._1 + pair._1 reduced._2 - pair._2
  4. }
  5. val invReduce = reduced DoubleInt), pair DoubleInt)) => {
  6. ifpair._1 > 0 reduced._1 + pair._1 reduced._2 - pair._2
  7. elsereduced._1 + pair._1 reduced._2 + pair._2
  8. }
  9. val windowedPriceChanges = sectorPriceChanges.reduceByKeyAndWindowreduce invReduce Seconds5*60), Seconds15))

现在通过上文介绍的函数,已经可以构建一个reduce流处理应用,这个应用能够感知价格波动和趋势。由于只希望呈现出正向波动最剧烈的一些类股。可以过滤流数据,保留下正向波动的类股,然后将数据元组Key-Value的Key改变为可以排序,统计出波动最大的类股的属性。本例假设正向波动剧烈与否的权重为价格改变大小乘以价格改变计数器值,将Value中的两个值组合计算出的结果作为新的Key。最后,将数据按照新的Key打印出最大的5个类股。


  1. import scala.collection.immutable.List
  2. import spark.SparkContext._
  3. import spark.streaming._
  4. import spark.streaming.StreamingContext._
  5. import spark.streaming.dstream._
  6. object DataStreamextendsApp{
  7. val reportHeader = """----------------------------------------------
  8. Positive Trending
  9. =================
  10. """.stripMargin
  11. override def mainargs Array[String]) {
  12. import Listings._
  13. import System._
  14. val ssc = new StreamingContext"local" "datastream" Seconds15), "C:/software/spark-0.7.3" List"target/scala-2.9.3/spark-data-stream_2.9.3-1.0.jar"))
  15. object stream extends NetworkInputDStream[PriceUpdate](ssc {
  16. override def getReceiver(): NetworkReceiver[PriceUpdate] = {
  17. newPriceReceiver()
  18. }
  19. }
  20. ssc.checkpoint"spark"
  21. ssc.registerInputStreamstream
  22. val reduce = reduced DoubleInt), pair DoubleInt)) => {
  23. ifpair._1 > 0 reduced._1 + pair._1 reduced._2 + pair._2
  24. elsereduced._1 + pair._1 reduced._2 - pair._2
  25. }
  26. val invReduce = reduced DoubleInt), pair DoubleInt)) => {
  27. ifpair._1 > 0 reduced._1 + pair._1 reduced._2 - pair._2
  28. elsereduced._1 + pair._1 reduced._2 + pair._2
  29. }
  30. val sectorPriceChanges = stream.mappu => listingSectorspu.id), pu.price - pu.lastPrice 1)))
  31. val windowedPriceChanges = sectorPriceChanges.reduceByKeyAndWindowreduce invReduce Seconds5*60), Seconds15))
  32. val positivePriceChanges = windowedPriceChanges.filter{case _ _ count)) => count > 0}
  33. val priceChangesToSector = positivePriceChanges.map{casesector value count)) => value * count sector)}
  34. val sortedSectors = priceChangesToSector.transformrdd => rdd.sortByKeyfalse)).map_._2
  35. sortedSectors.foreachrdd => {
  36. println"""|----------------------------------------------
  37. |Positive Trending (Time: %d ms)
  38. |----------------------------------------------
  39. |""".stripMargin.formatcurrentTimeMillis + rdd.take5).mapsectorCodes_)).mkString"\n"))
  40. })
  41. ssc.start()
  42. }
  43. }

运行上述示例,将会打印出下面的日志,这样就构建出了预测类股趋势的Spark应用。


  1. ----------------------------------------------
  2. Positive Trending Time 1375269240035 ms
  3. ----------------------------------------------
  4. Real estate
  5. Telecommunication
  6. Graphics publishing & printing media
  7. Environmental services & recycling
  8. Agriculture & fishery
  9. ----------------------------------------------
  10. Positive Trending Time 1375269255035 ms
  11. ----------------------------------------------
  12. Real estate
  13. Graphics publishing & printing media
  14. Environmental services & recycling
  15. Agriculture & fishery
  16. Electrical appliances & components
  17. ----------------------------------------------
  18. Positive Trending Time 1375269270034 ms
  19. ----------------------------------------------
  20. Environmental services & recycling
  21. Agriculture & fishery
  22. Electrical appliances & components
  23. Vehicles
  24. Precious metals & precious stones

4.应用场景

读者可以通过这个示例,开发自己的流数据分析应用。数据源可以是爬虫抓取的数据,也可以是消息中间件输出的数据等待。

Spark是整个Spark生态系统的底层核心引擎,单一的Spark框架并不能完成所有计算范式任务。如果有更复杂的数据分析需求,就需要借助Spark的上层组件。例如,为了分析大规模图数据,需要借助GraphX构建内存的图存储结构,然后通过BSP模型迭代算法。为了进行机器学习,需要借助MLlib底层实现的SGD等优化算法,进行搜索和优化。分析流数据需要借助Spark Streaming的流处理框架,将流数据转换为RDD,输入与分析流数据。如果进行SQL查询或者交互式分析,就需要借助Spark SQL这个查询引擎,将SQL翻译为Spark Job。相应的示例用户可以参考BDAS章节和示例进行学习。

[1] 本节参考文章:James Phillpotts,Real-time Data Analysis Using Spark,29 Jul 2013。