8.4.2 MLlib的数据存储
MLlib支持存储在本地的向量和矩阵,也提供分布式的矩阵(底层实现是一个或多个RDD)。[1]在目前发布版本的实现中,本地的向量和矩阵数据模型提供公共服务接口,基础的线性代数操作是基于Breeze和jblas库的。在MLlib监督学习中的一个训练样例叫做“标记向量”(labeled point)。
1.本地向量或矩阵
(1)本地向量
一个本地向量内的数据类型为double,并且数组序号是从0开始(0-based indices)的整数类型。本地向量存储在单机中。MLlib支持两种类型的本地向量:稠密向量和稀疏向量。稠密向量的底层实现是一个double型的数组存储向量每个元素的值,一个稀疏向量的底层实现是两个并行的数组,一个数组存储向量的序号,一个存储向量元素值。例如,向量(1.0,0.0,3.0)在稠密向量的存储是[1.0,0.0,3.0],而在稀疏矩阵中的存储是(3,[0,2],[1.0,3.0]),如图8-26所示。这里的3代表向量的维数。图8-26所示为两种向量存储模式。
图8-26 稠密向量与稀疏向量存储
本地向量的基本类是Vector类,官方提供了Vector类的两种实现:稠密向量(dense vector)和稀疏向量(sparse vector)。官方推荐使用Vectors类中提供的工厂模式的方法创建本地向量。
下面看官方的向量创建例子。
- import org.apache.spark.mllib.linalg.{Vector, Vectors}
- // 创建一个密集向量 (1.0, 0.0, 3.0)
- val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
- /* 创建一个稀疏向量 (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries */
- val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
- /* 创建一个密集向量 (1.0, 0.0, 3.0) by specifying its nonzero entries */
- val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
注意:由于Scala默认情况下引入了import scala.collection.immutable.Vector库,所以需要引入import org.apache.spark.mllib.linalg.Vector库区显式使用MLlib内的向量。
(2)标记向量
一个标记向量(labeled point)是一个本地向量,可以是稠密向量,也可以是稀疏向量,并和一个标记(label)相关联。在MLlib中,标记在监督学习的算法(如分类和回归算法)中使用。在标记向量中使用一个Double类型数据存储标记,即可在回归和分类中都可以使用标记向量。对于二元分类来说,label可以为0(代表negative)或者1(代表positive)。对于多元分类问题,标记可以使用从0开始的序号:0,1,2……
一个标记向量可以使用case类LabeledPoint来存储和呈现。
- import org.apache.spark.mllib.linalg.Vectors
- import org.apache.spark.mllib.regression.LabeledPoint
- /* 创建一个有正标记和稠密特征向量的标记点 */
- val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
- /* 创建一个有负标记和稀疏特征向量的标记点 */
- val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
(3)稀疏数据
常见的情况是使用稀疏数据训练模型。下面介绍稀疏数据格式。MLlib支持读取LIBSVM格式(一种文本格式)的训练数据,这种数据默认被LIBSVM和LIBLINEAR使用。文件的每一行代表一个被标记的稀疏特征向量,它的格式请参考:
- label index1:value1 index2:value2 ...
默认序号索引是从1开始并且是升序的,加载之后,特征的序号被转换为从0开始。
可以使用MLUtils.loadLibSVMFile方法读取存储为LIBSVM格式的训练数据。
- import org.apache.spark.mllib.regression.LabeledPoint
- import org.apache.spark.mllib.util.MLUtils
- import org.apache.spark.rdd.RDD
- val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")
(4)本地矩阵
一个本地矩阵也存储double型的数据,使用整型的行号和列号。MLlib支持稠密矩阵,稠密矩阵的值存储在一个单独的double数组中,以列序为主序存储。例如,下面的矩阵。
- (1.0 2.0)
- (3.0 4.0)
- (5.0 6.0)
底层存储是以一维数组中存储数据(如[1.0,3.0,5.0,2.0,4.0,6.0])和存储矩阵的行列大小,如(3,2)。在Spark 1.0只提供稠密矩阵,官方将在下一个版本提供稀疏矩阵的实现。本地矩阵的基本类是Matrix,官方目前提供了一种实现:DenseMatrix。官方推荐使用Matrices类中的工厂方法来创建本地矩阵。
- import org.apache.spark.mllib.linalg.{Matrix, Matrices}
- /* 创建一个密集矩阵 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))*/
- val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
2.分布式矩阵
一个分布式的矩阵存储的是double类型的数据,底层采用一个或者多个RDD存储,行列序号采用long型存储。存储分布式大数据量矩阵的关键问题是选取正确的数据格式。将一个分布式矩阵转换为另一种不同的格式需要一个全局的Shuffle操作。在分布式环境下,Shuffle开销很大。官方已经实现了3种类型的分布式矩阵,将会在未来提供更多类型矩阵的实现。分布式矩阵的基本类型是RowMatrix。
一个RowMatrix是一个面向行的分布式矩阵。例如,一个特征向量集合的底层实现是一个RDD,RDD的每个元素是一个本地特征向量。在这种情况下,对RowMatrix来说是假设用户的特征矩阵维数不高。一个IndexedRowMatrix和RowMatrix相似,但是会存储行的序号,行序号可以确定行以及有助于进行连接操作。一个CoordinateMatrix是一个分布式矩阵以coordinate list(COO)格式存储,底层也是以存储它的数据项的一个RDD实现的。
注意:因为已经缓存了整个矩阵数据大小的空间,分布式矩阵底层实现的RDD必须是确定的。如果用非确定性的RDD,就很容易出错。
(1)行矩阵
一个行矩阵是面向行存储的分布式矩阵,底层是一个以本地行向量为数据项的一个RDD。由于每个行是一个本地向量,以long型为行序号,所以向量的维数会受数据类型范围限制,但在实际情况下,向量维数会小于这个范围。
一个RowMatrix可以从RDD[Vector]实例创建,然后可以计算行列来统计数据。
- import org.apache.spark.mllib.linalg.Vector
- import org.apache.spark.mllib.linalg.distributed.RowMatrix
- val rows: RDD[Vector] = ... // an RDD of local vectors
- /* 通过 RDD[Vector]创建一个行矩阵*/
- val mat: RowMatrix = new RowMatrix(rows)
- /* 获取矩阵大小*/
- val m = mat.numRows()
- val n = mat.numCols()
(2)行索引矩阵
一个行索引矩阵(indexed Row matrix)的底层实现是一个带行索引的RDD,这个RDD每行是一个长整型的索引和本地向量。一个行索引矩阵可以通过RDD[IndexedRow]的RDD创建,一个indexed row是一个元组(Long,Vector)的封装,Long代表索引,Vector代表本地向量。一个indexed row matrix通过消除行索引可以转换为row matrix。
(3)坐标矩阵
坐标矩阵(coordinate matrix)是一个分布式矩阵,其底层实现也是使用一个RDD存储它的数据项。每个数据项是一个元组(i:Long,j:Long,value:Double),其中,i是行序号索引,j是列序号索引,value是数据项的值。一个坐标矩阵适用于矩阵的行列维度都很大,但矩阵数据很稀疏的情况。
一个坐标矩阵可以从RDD[MatrixEntry]实例创建,matrix entry是一个(Long,Long,Double)的封装。一个coordinate matrix可以通过调用方法toIndexedRowMatrix被转换成含有稀疏行的index row matrix。在Spark MLlib 1.0中,官方不提供针对coordinate matrix的其他计算方法。
[1] 参见Spark官网:https://spark.apache.org/docs/latest/mllib-basics.html。