8.4.2 MLlib的数据存储

MLlib支持存储在本地的向量和矩阵,也提供分布式的矩阵(底层实现是一个或多个RDD)。[1]在目前发布版本的实现中,本地的向量和矩阵数据模型提供公共服务接口,基础的线性代数操作是基于Breeze和jblas库的。在MLlib监督学习中的一个训练样例叫做“标记向量”(labeled point)。

1.本地向量或矩阵

(1)本地向量

一个本地向量内的数据类型为double,并且数组序号是从0开始(0-based indices)的整数类型。本地向量存储在单机中。MLlib支持两种类型的本地向量:稠密向量和稀疏向量。稠密向量的底层实现是一个double型的数组存储向量每个元素的值,一个稀疏向量的底层实现是两个并行的数组,一个数组存储向量的序号,一个存储向量元素值。例如,向量(1.0,0.0,3.0)在稠密向量的存储是[1.0,0.0,3.0],而在稀疏矩阵中的存储是(3,[0,2],[1.0,3.0]),如图8-26所示。这里的3代表向量的维数。图8-26所示为两种向量存储模式。

8.4.2 MLlib的数据存储 - 图1

图8-26 稠密向量与稀疏向量存储

本地向量的基本类是Vector类,官方提供了Vector类的两种实现:稠密向量(dense vector)和稀疏向量(sparse vector)。官方推荐使用Vectors类中提供的工厂模式的方法创建本地向量。

下面看官方的向量创建例子。


  1. import org.apache.spark.mllib.linalg.{Vector Vectors}
  2. // 创建一个密集向量 (1.0, 0.0, 3.0)
  3. val dv Vector = Vectors.dense1.0 0.0 3.0
  4. /* 创建一个稀疏向量 (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries */
  5. val sv1 Vector = Vectors.sparse3 Array0 2), Array1.0 3.0))
  6. /* 创建一个密集向量 (1.0, 0.0, 3.0) by specifying its nonzero entries */
  7. val sv2 Vector = Vectors.sparse3 Seq((0 1.0), 2 3.0)))

注意:由于Scala默认情况下引入了import scala.collection.immutable.Vector库,所以需要引入import org.apache.spark.mllib.linalg.Vector库区显式使用MLlib内的向量。

(2)标记向量

一个标记向量(labeled point)是一个本地向量,可以是稠密向量,也可以是稀疏向量,并和一个标记(label)相关联。在MLlib中,标记在监督学习的算法(如分类和回归算法)中使用。在标记向量中使用一个Double类型数据存储标记,即可在回归和分类中都可以使用标记向量。对于二元分类来说,label可以为0(代表negative)或者1(代表positive)。对于多元分类问题,标记可以使用从0开始的序号:0,1,2……

一个标记向量可以使用case类LabeledPoint来存储和呈现。


  1. import org.apache.spark.mllib.linalg.Vectors
  2. import org.apache.spark.mllib.regression.LabeledPoint
  3. /* 创建一个有正标记和稠密特征向量的标记点 */
  4. val pos = LabeledPoint1.0 Vectors.dense1.0 0.0 3.0))
  5. /* 创建一个有负标记和稀疏特征向量的标记点 */
  6. val neg = LabeledPoint0.0 Vectors.sparse3 Array0 2), Array1.0 3.0)))

(3)稀疏数据

常见的情况是使用稀疏数据训练模型。下面介绍稀疏数据格式。MLlib支持读取LIBSVM格式(一种文本格式)的训练数据,这种数据默认被LIBSVM和LIBLINEAR使用。文件的每一行代表一个被标记的稀疏特征向量,它的格式请参考:


  1. label index1value1 index2value2 ...

默认序号索引是从1开始并且是升序的,加载之后,特征的序号被转换为从0开始。

可以使用MLUtils.loadLibSVMFile方法读取存储为LIBSVM格式的训练数据。


  1. import org.apache.spark.mllib.regression.LabeledPoint
  2. import org.apache.spark.mllib.util.MLUtils
  3. import org.apache.spark.rdd.RDD
  4. val examples RDD[LabeledPoint] = MLUtils.loadLibSVMFilesc "mllib/data/sample_libsvm_data.txt"

(4)本地矩阵

一个本地矩阵也存储double型的数据,使用整型的行号和列号。MLlib支持稠密矩阵,稠密矩阵的值存储在一个单独的double数组中,以列序为主序存储。例如,下面的矩阵。


  1. 1.0 2.0
  2. 3.0 4.0
  3. 5.0 6.0

底层存储是以一维数组中存储数据(如[1.0,3.0,5.0,2.0,4.0,6.0])和存储矩阵的行列大小,如(3,2)。在Spark 1.0只提供稠密矩阵,官方将在下一个版本提供稀疏矩阵的实现。本地矩阵的基本类是Matrix,官方目前提供了一种实现:DenseMatrix。官方推荐使用Matrices类中的工厂方法来创建本地矩阵。


  1. import org.apache.spark.mllib.linalg.{Matrix Matrices}
  2. /* 创建一个密集矩阵 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))*/
  3. val dm Matrix = Matrices.dense3 2 Array1.0 3.0 5.0 2.0 4.0 6.0))

2.分布式矩阵

一个分布式的矩阵存储的是double类型的数据,底层采用一个或者多个RDD存储,行列序号采用long型存储。存储分布式大数据量矩阵的关键问题是选取正确的数据格式。将一个分布式矩阵转换为另一种不同的格式需要一个全局的Shuffle操作。在分布式环境下,Shuffle开销很大。官方已经实现了3种类型的分布式矩阵,将会在未来提供更多类型矩阵的实现。分布式矩阵的基本类型是RowMatrix。

一个RowMatrix是一个面向行的分布式矩阵。例如,一个特征向量集合的底层实现是一个RDD,RDD的每个元素是一个本地特征向量。在这种情况下,对RowMatrix来说是假设用户的特征矩阵维数不高。一个IndexedRowMatrix和RowMatrix相似,但是会存储行的序号,行序号可以确定行以及有助于进行连接操作。一个CoordinateMatrix是一个分布式矩阵以coordinate list(COO)格式存储,底层也是以存储它的数据项的一个RDD实现的。

注意:因为已经缓存了整个矩阵数据大小的空间,分布式矩阵底层实现的RDD必须是确定的。如果用非确定性的RDD,就很容易出错。

(1)行矩阵

一个行矩阵是面向行存储的分布式矩阵,底层是一个以本地行向量为数据项的一个RDD。由于每个行是一个本地向量,以long型为行序号,所以向量的维数会受数据类型范围限制,但在实际情况下,向量维数会小于这个范围。

一个RowMatrix可以从RDD[Vector]实例创建,然后可以计算行列来统计数据。


  1. import org.apache.spark.mllib.linalg.Vector
  2. import org.apache.spark.mllib.linalg.distributed.RowMatrix
  3. val rows RDD[Vector] = ... // an RDD of local vectors
  4. /* 通过 RDD[Vector]创建一个行矩阵*/
  5. val mat RowMatrix = new RowMatrixrows
  6. /* 获取矩阵大小*/
  7. val m = mat.numRows()
  8. val n = mat.numCols()

(2)行索引矩阵

一个行索引矩阵(indexed Row matrix)的底层实现是一个带行索引的RDD,这个RDD每行是一个长整型的索引和本地向量。一个行索引矩阵可以通过RDD[IndexedRow]的RDD创建,一个indexed row是一个元组(Long,Vector)的封装,Long代表索引,Vector代表本地向量。一个indexed row matrix通过消除行索引可以转换为row matrix。

(3)坐标矩阵

坐标矩阵(coordinate matrix)是一个分布式矩阵,其底层实现也是使用一个RDD存储它的数据项。每个数据项是一个元组(i:Long,j:Long,value:Double),其中,i是行序号索引,j是列序号索引,value是数据项的值。一个坐标矩阵适用于矩阵的行列维度都很大,但矩阵数据很稀疏的情况。

一个坐标矩阵可以从RDD[MatrixEntry]实例创建,matrix entry是一个(Long,Long,Double)的封装。一个coordinate matrix可以通过调用方法toIndexedRowMatrix被转换成含有稀疏行的index row matrix。在Spark MLlib 1.0中,官方不提供针对coordinate matrix的其他计算方法。

[1] 参见Spark官网:https://spark.apache.org/docs/latest/mllib-basics.html。