第 12 章 大数据处理
当今社会数据量呈指数级增长,这些数据来自于用户行为监测系统、分布式系统、网络分析、传感器等。移动互联网的迅速发展又带来移动数据的增长,此外,下一个大潮流——物联网(Internet of Things,IoT)又会进一步提升数据增长速度。
挖掘大数据需要有新思路。运行时间很长的复杂算法要么改进要么抛弃,而能够处理更多数据,复杂程度相对简单的算法正变得更加流行起来。例如,支持向量机是很好的分类器,但是它的一些变种很难在大型数据集上使用。相反,逻辑回归等相对简单的算法处理大数据集更容易。
本章主要内容如下:
大数据的挑战及其应用
MapReduce范式
Hadoop MapReduce
在亚马逊平台上运行MapReduce程序的Python包mrjob
12.1 大数据
大数据具有哪些特点呢?它的大多数支持者认为有以下4个特点,简称4V。
(1) 海量(Volume):我们产生和存储的数据量加速增长,未来势头看起来会更猛。现在常用的硬盘容量单位为GB,再过几年就会变为EB1。同时网络吞吐量也会增长。信噪比不容乐观,重要信息很可能被海量重要程度低的信息湮没。
1EB(exabyte),艾字节,1EB=1024PB,1PB=1024TB,1TB=1024GB。1EB约等于10亿GB。——译者注
(2) 高速(Velocity):数据量增长的同时,数据处理速度在加快。就拿现如今的汽车举例子,每辆车装有成百上千个传感器,这些传感器不停地向汽车内置的计算机传送数据,要完成操纵汽车的任务,数据的分析处理速度需要达到次秒级水准。这不仅要从大量数据中寻找答案,还得速度快。
(3) 多样(Variety):数据集有多种形式,各列清晰定义的规整数据集只占其中很少一部分。想想社交网站的一条消息吧,它可能包含文本、照片、提及其他用户2、喜欢数、评论数、视频、地理位置信息等。简单地忽略掉这些难以整合进模型的数据势必会导致信息丢失。
2指“@”加用户名这种形式。——译者注
(4) 准确(Veracity):随着数据量的增加,很难确定采集到的数据是否正确——是否过时、充满噪音,有没有包含异常数据,或者总体来说是否有用等。倘若人们无法对数据的准确性进行有效验证,要信赖数据很难。人们从外部得到的数据集被不停地整合到内部数据集中,使得数据的准确性这个问题更加突出。
从以上四点(也有人认为是五点3)就能看出大数据不仅仅是大量的数据,处理大数据对工程能力要求很高——更不用说对它们进行分析了。很多“蛇油商人”4只关心怎样夸大大数据的应用,却避而不谈大数据工程的挑战以及潜在分析工作的难度。
3有人认为大数据的另一个特点是价值(Value),经过挖掘得到的结论有实际应用价值。——译者注
4蛇油商人(snake oil salesman)跟“江湖郎中”有相同的贬义内涵。19世纪,国人赴美修建北太平洋铁路,带去从南方水蛇体内提炼的蛇油,以祛风湿,缓筋骨之痛,颇受当地人欢迎。后美国商人Clark Stanley等改从北美响尾蛇提炼蛇油,批量生产,但效果较差,甚至出现产品中压根不含蛇油的造假现象。该词遂被用来指打着不容置疑的旗号,推销假冒伪劣谋求私利的骗子。——译者注
我们前面用过的这些算法都是把数据集加载到内存后再进行处理。这对于提升计算速度很有好处,因为处理已经在内存中的数据比起先从硬盘加载到内存后再处理要快得多。此外,我们可以对内存中的数据进行多次迭代,改善模型。
对于大数据,我们就无法将其加载到内存中。从很多方面来讲,可以用此来判断一个问题能不能称得上大数据问题——如果数据可以加载到你计算机内存中,那就算不上大数据。
12.2 大数据应用场景和目标
企业和个人都离不开大数据。
所有以大数据为基础的系统中,人们与之打交道最多的当属谷歌这样的搜索引擎。对于搜索引擎来说,要在零点几秒的时间内对几十亿甚至更多的网站完成一次搜索,基础的文本查询肯定满足不了需求,单是存储这么多网站的文本内容就是个大难题。要完成搜索引擎中的查询任务,就需要专门创建新数据结构和使用新数据挖掘方法。
很多其他科学实验也都用到了大数据技术,例如大型强子对撞机(Large Hadron Collider)。该机器长达27公里,装有1.5亿个传感器,用于监测每秒几亿个强子的对撞情况,对撞机局部照片请见下图。对撞实验产生的数据量十分惊人,每天过滤后还有25PB(假如不过滤,每年将产生1.5亿PB数据)。这些数据隐藏着宇宙的奥秘,对其进行分析能加深我们对宇宙的认识,但是工程量很大,分析起来也相当困难。
政府部门对大数据的重视程度也在逐渐增加,以充分了解人口、商业活动和国家其他各方面的情况。跟踪几千万乃至几亿人口及成百上千亿的交易量(商贸或医疗开支),很多政府机构都需要依靠大数据分析技术才能完成。
全球很多政府尤其关注交通管理,他们使用数以千万计的传感器判断哪些道路堵塞最为严重,预测新修道路对交通状况的影响。
大型零售商使用大数据改善客户体验,降低支出。这包括预测客户需求,保证供货量适度,向客户推销他们可能喜欢的产品,跟踪交易数据,寻找购物潮流和消费模式,发现潜在欺诈行为。
还有很多大公司用大数据提高公司经营管理的自动化程度,改善产品和服务质量。他们通过大数据分析预测行业风向,跟踪外部竞争者。他们还把大数据用到员工管理上——跟踪员工,发现其离职倾向以及时干预。
信息安全部门通过监测网络流量,结合大数据技术,寻找大型网络的恶意软件感染,这包括寻找奇怪的流量模式,恶意软件传播迹象和其他反常现象。高级持续性威胁(Advanced Persistent Threats,APTs)攻击也很令人抓狂,攻击者把恶意代码藏在大型网络中,长期从网络中窃取信息或从事其他破坏活动。寻找APTs往往需要检查很多台计算机进行取证,费时费力,仅靠人工很难完成,这时就可以使用大数据技术实现自动化检查、取证。
大数据方兴未艾,正在越来越多的行业和应用中大显身手。
12.3 MapReduce
大数据挖掘和计算有几个主要概念,其中最为流行的就是MapReduce模型,它可用于任意大数据集的一般性计算任务。
谷歌出于并行计算的需要,最先提出了MapReduce模型。它还引入了容错和可伸缩特性。MapReduce最初的研究发表于2004年,打那时起,成千上万的项目、实现和应用都用到了这一概念。
虽然MapReduce跟之前很多概念有相似之处,但这不影响它成为大数据分析领域的核心概念。
12.3.1 直观理解
MapReduce主要分为两步:映射(Map)和规约(Reduce)。函数式程序设计中有把函数映射到列表和规约结果的概念,MapReduce以这两个概念为基础。为了解释映射和规约,我们举个例子,编写代码遍历一串列表,计算所有列表中数字之和。
MapReduce范式还包括排序(shuffle)和合并(combine)两步,后面会讲。
首先,在映射这一步,接收一个函数,用这个函数处理列表的各元素,返回跟之前列表长度相等的列表,新列表的元素为函数的返回结果。
打开一个新的IPython Notebook笔记本文件,创建一列表,列表中各元素为包含几个数字的列表。
a = [[1,2,1], [3,2], [4,9,1,0,2]]
接着,建立sum
函数和a
之间的映射关系。这一步会用sum
函数处理列表a
的每一个元素。
sums = map(sum, a)
上述sums
为生成器(在我们调用它之前,不会进行计算),上面这行代码大体上等价于:
sums = []
for sublist in a:
results = sum(sublist)
sums.append(results)
规约步骤要稍微复杂些,需要对返回结果的每一个元素应用一个函数,从初始值开始,对初始值和第一个值应用指定函数,得到返回结果,然后再对所得到的结果和下一个值应用指定函数,以此类推。
我们来创建一个函数,接收两个数字作为参数,返回两个数字的和。
def add(a, b):
return a + b
然后进行规约。规约函数形式为reduce(function, sequence, initial)
,参数分别表示用来进行规约的函数的名字、列表和初始值,函数即是对序列的每一步所应用的函数。在第一步,第一个值为初始值,而不是列表的第一个元素。
from functools import reduce
print(reduce(add, sums, 0))
结果为25,它是sums
列表中每个元素的和,也就是原来大列表a
中每个二级列表各元素的和。
上面代码等价于如下代码。
initial = 0
current_result = initial
for element in sums:
current_result = add(current_result, element)
我们这个小例子,代码很简短,但真正的好处是可以使用分布式计算。假如,二级列表的数量为100万,每个二级列表包含100万个元素,对于这么大的计算任务,我们可以交由多台计算机完成。
为了实现分布式计算,我们可以在映射这一步把各个二级列表及函数说明分发到不同的计算机上。计算完成后,各计算机把结果返回主计算机(master)。
然后master把结果发送给另一台计算机做规约。我们的例子有100万个二级列表,因此可以将100万个任务交给不同的计算机处理(计算机完成映射操作后,可再次用于规约)。返回结果为包含100万数字的列表,然后就可以进行求和运算。
这样做的好处是,尽管原始数据集有1万亿数字,但实际计算过程,哪台计算机都不需要存储超过100万个数字。
12.3.2 单词统计示例
MapReduce的实现比起单纯使用映射和规约两步要复杂些。这两步都用键来调用,便于数据的区分和值的跟踪。
映射函数接收一键值对,返回键值对列表。接收和输出的键不一定要彼此相关。例如,统计单词数量的MapReduce程序,输入的键可能是文档编号,而输出的键却可以是单词。输入值可能是文档的文本内容,而输出值为每个单词的词频。
from collections import defaultdict
def map_word_count(document_id, document):
首先,计算每个单词词频。在这个简化过的例子中,我们先根据空格把文档转换成单词列表,当然有更好的解决方法。
counts = defaultdict(int)
for word in document.split():
counts[word] += 1
遍历每个单词,统计次数,注意用到了yield
语句。用MapReduce的术语来说,单词为键,单词出现次数为值。
for word in counts:
yield (word, counts[word])
用单词做键,我们就可以进行shuffle操作,把每个键所有值聚集到一起。
def shuffle_words(results):
首先,把每个单词的统计结果放到一个列表中。
records = defaultdict(list)
遍历映射函数返回的所有结果。
for results in results_generators:
for word, count in results:
records[word].append(count)
接着,遍历每个单词,创建生成器,它能生成(yield)单词和该单词在各文档出现次数的列表这两项。
for word in records:
yield (word, records[word])
最后一步是规约,接收一键值对(值为列表),输出另外一键值对。我们这里,键为单词,输入的列表为shuffle后得到的作为键的单词在不同文档出现次数的列表,输出键值对中的值这一项为单词(键)在所有文档的出现次数之和。
def reduce_counts(word, list_of_counts):
return (word, sum(list_of_counts))
我们使用scikitlearn
提供的来自20个新闻组的语料看下上述代码的实际效果。
from sklearn.datasets import fetch_20newsgroups
dataset = fetch_20newsgroups(subset='train')
documents = dataset.data
然后执行映射操作,这里用enumerate
函数自动为文档生成编号。编号这里用处不大,但是在其他应用中很重要。
map_results = map(map_word_count, enumerate(documents))
上述结果只是生成器,而不是实际的统计结果。也就说,它是能输出键值对(单词、出现次数)的生成器。
接着,对生成器进行shuffle操作,根据单词出现次数进行排序。
shuffle_results = shuffle_words(map_results)
上述例子本质上来说是一个MapReduce任务;然而由于只使用单线程,我们无法从MapReduce数据格式中获得任何好处。下节,我们将使用开源的MapReduce架构Hadoop,实现分布式计算,提升性能。
12.3.3 Hadoop MapReduce
Hadoop是Apache基金会所提供的一组包括MapReduce在内的开源工具。人们实际使用的MapReduce架构也多是它。Hadoop项目由Apache基金会管理(他们也负责维护著名的Web服务器软件Apache)。
Hadoop生态系统很复杂,有大量的工具。我们将要用到的主要组件为Hadoop MapReduce。Hadoop其他处理大数据的工具有如下几种。
Hadoop分布式文件系统(Hadoop Distributed File System, HDFS):该文件系统可以将文件保存到多台计算机上,以防范硬件故障,提高带宽。
YARN:用于调度应用和管理计算机集群。
Pig:用于MapReduce的高级语言。Hadoop MapReduce用Java语言实现,Pig对Java实现做进一步封装,支持用其他语言来编写程序——包括Python。
Hive:用于管理数据仓库和进行查询。
HBase:对谷歌分布式数据库BigTable的一种实现。
这些工具都是用来解决包括分析过程在内的大数据实验所能遇到的各种问题。
还有其他一些没有使用Hadoop工具集实现的MapReduce框架,也有一些具有相似目标的项目。此外,很多云平台都提供以MapReduce为基础的系统。
12.4 应用
我们来建立一个根据博主用词习惯判断博主性别的应用。我们用MapReduce方法训练朴素贝叶斯分类器。最后用模型做预测时,不需要MapReduce,虽然我们可以用映射这个步骤——也就是对列表中的每一篇文档运行预测模型。这是用MapReduce进行数据挖掘常用的映射操作,规约步骤只用来调整预测结果列表,以便把结果和原文档对应起来。
我们将使用亚马逊云平台运行应用,以利用它们的计算资源。
12.4.1 获取数据
我们使用2004年8月份从http://blogger.com网站采集的60多万篇博客文章作为数据集,这些文章共有1.4亿多个单词,每篇文章都标注了博主的年龄、性别、行业(工作),有趣的是还有星座。研究人员曾进行过部分验证,确保同一篇博客从头到尾均由一名博主所写(虽然也不是很确定)。每篇博客还给出了发表时间,这个数据集的内容真是挺丰富的。
访问http://u.cs.biu.ac.il/~koppel/BlogCorpus.htm网站,点击Download Corpus。下载完成后,把数据集文件解压到你的本地计算机数据文件夹Data里。
数据集中,一个博主的所有博客放到同一个文件里,文件名包含博主的相关信息。例如,其中一个文件名如下:
1005545.male.25.Engineering.Sagittarius.xml
文件名用点号分隔,主要包括以下信息。
博客编号:标识博客唯一性的数值。
性别:该部分不是male(男性)就是female(女性)。数据集只包含这两种情况。
年龄:给出确切的年龄,但有时会特意使用年龄段。年龄段(包含起止年龄)有13~17、23~27和33~48。这样做是便于把难以确定博主年龄的博客粗略归到某一年龄段中,因为很难把18岁写的博客和19岁所写的区分开来,此外,当你使用这些数据时,博主实际年龄可能比早先填写的又长了几岁,因此需要做相应调整。5
行业:包括科学、工程、艺术、房地产在内的40种行业的其中一种,若博主没有填写该项,使用indUnk6来表示。
星座:12星座之一。
5作者给我举了个例子,“去年用户发表博客时为18岁,今年就19岁了,我需要在模型中做相应调整。”——译者注
6industry unknown的简写。——译者注
所有的这些数据都是用户自己提供的,这就表明其中有错误或不一致现象,但是绝大多数是可靠的——如果用户为了保护个人隐私起见,不想透露某方面的信息,他们可以选择不填,这一点网站是允许的。
每个文件的格式类似于XML,包含
标签和一系列
标签。每个
标签前都有一个
标签。虽然我们可以把它当作XML来处理,但是按行处理更容易,因为它不是真正规范的XML文件,并且还有些错误(大部分是编码错误)。我们可以使用循环结构遍历文件中的每一行,以读取博客内容。
指定一个文件名,我们来实际看下。
import os
filename = os.path.join(os.path.expanduser("~"), "Data", "blogs",
"1005545.male.25.Engineering.Sagittarius.xml")
首先,创建用于存储每篇博客的列表。
all_posts = []
然后,打开要读取的文件。
with open(filename) as inf:
设置标识是否在博客中的标记。找到博客开始标签
后,将标记的值设置为True
,表示找到了博客的开始位置。找到关闭标签后,将标记的值设置为
False
。
post_start = False
创建用于存储博客当前行内容的列表。
post = []
遍历文件的每一行,删除该行前后的空格。
for line in inf:
line = line.strip()
照前面所说,找到博客的开始、关闭标签后,更改标记post_start
的值。
if line == "<post>":
post_start = True
elif line == "</post>":
post_start = False
找到关闭标签后,记录刚找到的这一篇博客的所有内容。创建新的post列表。下面两行代码的缩进字符数与前一行代码相同。
all_posts.append("\n".join(post))
post = []
最后,如果该行不是博客结尾,也就是还在博客里面,把这一行加到当前博客post列表的最后。
elif post_start:
post.append(line)
如果没有在博客中,忽略这一行。
可以像下面这样获取每篇博客的文本内容。
print(all_posts[0])
我们还可以计算出一位博主共发表了多少博客。
print(len(all_posts))
12.4.2 朴素贝叶斯预测
我们来实现朴素贝叶斯算法(从技术上讲,这是一个简化的版本,复杂实现通常有很多特征)处理博客博主性别分类问题。
1. mrjob包
把用mrjob创建的MapReduce任务推送到亚马逊云平台上很容易。mrjob听上去有点像奇先生7系列儿童故事书的画蛇添足之作,实则表示映射规约任务(Map Reduce Job)。这个包很有用,但是写作本书时,对Python 3的支持还不成熟,对我们后面要讲到的亚马逊EMR服务支持也不够好。
7英国儿童文学作家罗杰·哈格里维斯(Roger Hargreaves)为儿童创作的《奇先生妙小姐》系列图书中的人物形象,比如有荒唐先生、傲慢先生、邋遢先生等。——译者注
你可以使用下面命令安装mrjob的Python 2版本:
sudo pip2 install mrjob
注意使用
pip2
而不是pip3
。
本质上讲,mrjob提供了大部分MapReduce任务所需的标准功能。最令人惊异的特性是,你可以编写同一套代码,既能在没有安装Hadoop的本地计算机上进行测试,测试完后,还可以直接把代码提交到亚马逊的EMR或其他Hadoop服务器上。
这样,测试起代码来很容易,虽然它也不可能神奇地把大问题变成小问题——注意任何本地测试,只是使用一小部分而不是全部数据。
2. 抽取博客内容
首先创建一个MapReduce程序,从每位博主的博客文件中抽取所有博客,分别存储。因为最终要预测博主的性别,我们还需要抽取性别信息,跟博客一起存储。
我们这回没法用笔记本文件,因此使用Python IDE进行开发。如果你没有装Python IDE(比如PyCharm),你可以使用文本编辑器。建议你使用具有代码高亮功能的IDE。
如果你找不到好用的IDE,你可以在笔记本文件中编写代码,然后点击(或选择)File | Download As | Python把代码保存成.py文件,然后再运行。第11章提到过这种方法。
因为需要获取环境变量,我们会用到os
,单词拆分时会用到正则表达式,一并导入re
。
import os
import re
接着导入MRJob
类,我们的MapReduce任务继承自它。
from mrjob.job import MRJob
然后创建MRJob
的子类。
class ExtractPosts(MRJob):
我们使用跟之前类似的循环从文件中抽取博客内容。我们即将定义的映射函数将处理每一行,跟踪不同博客的任务要在映射函数外完成。因此,在函数外而不是在函数内部声明post_start
和post
两个变量。
post_start = False
post = []
然后创建映射函数——从文件中取一行作为输入,最后生成一篇博客的所有内容。每一行都来自同一任务所在处理的文件。这样,我们就可以使用上面创建的变量保存当前博客的内容。
def mapper(self, key, line):
开始采集博客内容之前,我们还要获取到博主的性别。虽然通常我们不会把文件名作为MapReduce任务的一部分,但是这里确实要用到。当前的文件名称以环境变量的形式存储,用下面代码就能获取到。
filename = os.environ["map_input_file"]
用点号切分文件名,获取性别(第二个字符串)。
gender = filename.split(".")[1]
删除一行开头和结尾处的空格(博客中空格很多),然后查找文件开头和结尾。
line = line.strip()
if line == "<post>":
self.post_start = True
elif line == "</post>":
self.post_start = False
之前我们把博客保存到列表里,而这次我们用yield
,便于mrjob跟踪输出:博主性别和博客内容,这样博主性别和博客就能一一对应起来。函数余下部分跟前面用到的循环中的代码一致。
yield gender, repr("\n".join(self.post))
self.post = []
elif self.post_start:
self.post.append(line)
最后,在函数和类的外面,编写下面语句,以便从命令行运行代码时执行MapReduce任务。
if __name__ == '__main__':
ExtractPosts.run()
现在,可以在命令行输入下面的shell命令运行MapReduce任务。注意使用Python 2而不是Python 3。
python extract_posts.py <your_data_folder>/blogs/51* --outputdir=<your_data_folder>/blogposts –nooutput
第一个参数
(注意把
替换为你的数据文件夹的全路径)指定所使用的数据集(所有以51开始的文件,只有11个)。接着指定输出文件夹,即用来保存博客内容的文件夹。最后指定不要在命令行输出内容。否则,程序运行期间,输出的数据将显示在命令行——对我们来说没多大用处,还会降低程序运行速度。
运行上述代码,每篇博客内容很快就会被抽取出来并保存到指定的输出文件夹。在本地计算机上,只使用一个线程,所以速度提升有限,但是我们知道代码确实可以运行了。
我们看下输出文件夹,里面有有一系列文件,文件的每一行为博主的性别及一篇博客的内容。
3. 训练朴素贝叶斯分类器
既然已经抽取了博客内容,接下来就可以用它们训练朴素贝叶斯模型。根据直觉,我们可以这样做,分别统计女性博主和男性博主使用每个单词的概率。对博客进行分类时,我们分别计算博客8博主为女性的概率和博主为男性的概率,选取概率较大的,作为最终类别。
8一篇博客,其博主为女性的概率有多大?计算方法为,从语料中统计到该博客中每个单词出现在女博主所写博客中的概率,然后求出这些概率的积。其中会涉及到数值下溢、博客中某一单词在语料中没有出现等问题,这些问题需要特殊处理。下文也会讲到。——译者注
我们来编写代码,输出所有文件中每个单词及女博主和男博主使用该词的频率。输出文件如下所示:
"'ailleurs" {"female": 0.003205128205128205}
"'air" {"female": 0.003205128205128205}
"'an" {"male": 0.0030581039755351682, "female": 0.004273504273504274}
"'angoisse" {"female": 0.003205128205128205}
"'apprendra" {"male": 0.0013047113868622459, "female":
0.0014172668603481887}
"'attendent" {"female": 0.00641025641025641}
"'autistic" {"male": 0.002150537634408602}
"'auto" {"female": 0.003205128205128205}
"'avais" {"female": 0.00641025641025641}
"'avait" {"female": 0.004273504273504274}
"'behind" {"male": 0.0024390243902439024}
"'bout" {"female": 0.002034152292059272}
输出结果每一行的第一个值为单词,第二值为一个字典,字典的键为性别,字典的值为给定性别使用该词的频率。
用Python IDE或文本编辑器新建一个文件,再次要用到os
和re
,除此之外还要用到NumPy和MRJob。因为要对字典进行排序,所以还用到itemgetter。
import os
import re
import numpy as np
from mrjob.job import MRJob
from operator import itemgetter
我们还需要用MRStep管理MapReduce中的每一步操作。前面的MapReduce任务只有由映射函数和规约函数组成的一个步骤。我们当前这个任务分三步:映射、规约、再次映射和规约。是不是感觉跟前几章我们用到的流水线一样,前一步的输出将作为下一步的输入。
from mrjob.step import MRStep
接着创建用于匹配单词的正则表达式,并对其进行编译,我们用它来查找单词的边界。这种类型的正则表达式比起前面用过的split
方法更加强大,如果你需要更加精确的单词切分工具,建议你使用第6章用到的NLTK库。
word_search_re = re.compile(r"[\w']+")
创建一个新类,用于训练朴素贝叶斯分类器。
class NaiveBayesTrainer(MRJob):
定义MapReduce任务的各个步骤,一共分为两步。第一步抽取单词出现的概率。第二步,比较一个单词在男女博主所写博客出现的概率,选择概率较大的性别作为分类结果,写入输出文件。在上述每一步(MRStep
)中,定义映射和规约函数,它们是NaiveBayesTrainer
类里面的方法(后续会编写这两个函数)。
def steps(self):
return [
MRStep(mapper=self.extract_words_mapping,
reducer=self.reducer_count_words),
MRStep(reducer=self.compare_words_reducer),
]
第一个函数是第一步中的映射函数。这个函数的目标是接收一条博客数据,获取里面的所有单词,因为我们想要得到单词的频率,所以每个单词返回1 / len(all_words)
,便于后面加总求词频。这样计算并不精确——我们需要根据文档数量做归一化处理。但由于数据集中两个类别的文档数相同,因此不做归一化处理对最终结果影响也很小。
输出博主的性别,后面会用到。
def extract_words_mapping(self, key, value):
tokens = value.split()
gender = eval(tokens[0])
blog_post = eval(" ".join(tokens[1:]))
all_words = word_search_re.findall(blog_post)
all_words = [word.lower() for word in all_words]
all_words = word_search_re.findall(blog_post)
all_words = [word.lower() for word in all_words]
for word in all_words:
yield (gender, word), 1. / len(all_words)
上面代码使用
eval
函数简化对每条博客数据的处理,直接把字符串转换为Python列表,但不建议这么做。相反,应该使用JSON等格式存储数据,然后用相应包进行解析。如果在访问数据集的代码中使用eval
语句,攻击者可借此插入恶意代码,并在你的服务器上运行。
在第一步的规约函数中,汇总每个性别使用每个单词的频率。我们还把键改为单词,而不是单词和性别的组合,因为在最后训练得到的模型中,我们要根据单词进行查询(虽然还要输出性别以供后面使用)。
def reducer_count_words(self, key, frequencies):
s = sum(frequencies)
gender, word = key
yield word, (gender, s)
最后一步不需要映射函数,因此我们就没有添加它。数据将作为一致性映射(identity mapper)类型直接传入到规约函数中,而规约函数将会把每个单词在所有文章中出现频率按照性别汇集到一起,输出单词及频率字典。
这正是朴素贝叶斯分类器所需要的信息。
def compare_words_reducer(self, word, values):
per_gender = {}
for value in values:
gender, s = value
per_gender[gender] = s
yield word, per_gender
最后,添加以下代码,以便直接运行代码时,训练朴素贝叶斯模型。
if __name__ == '__main__':
NaiveBayesTrainer.run()
运行上述代码,它的输入即是前面博客抽取代码的输出(实际上可以把它们作为前后相连的步骤,放到同一个MapReduce任务中)。
python nb_train.py <your_data_folder>/blogposts/
--outputdir=<your_data_folder>/models/
nooutput
上述代码运行结束后,该MapReduce任务的输出结果保存到位于输出文件夹中的一个文件里,输出结果为运行朴素贝叶斯分类器所需的概率信息。
4. 组装起来
我们现在就可以使用这些概率来运行贝叶斯分类器。我们将在笔记本文件里编写代码,可以再次使用Python 3喽!
首先,查看上一个MapReduce任务所生成的模型文件夹。如果输出文件多于一个,在命令行切换到模型文件夹下,使用下面命令把它们追加到model.txt后面。
cat * > model.txt
运行上述命令后,记得把下面用到的模型所在的文件名改为model.txt。
回到笔记本文件,导入接下来会用到的几个包。
import os
import re
import numpy as np
from collections import defaultdict
from operator import itemgetter
重新定义用于查找单词的正则表达式——在实际应用中,不要忘记这点,注意训练和测试时应该使用相同的正则表达式来抽取单词。
word_search_re = re.compile(r"[\w']+")
接着,声明从指定文件名中加载模型的函数。
def load_model(model_filename):
模型的参数是一个元素为字典的字典,各元素的键为单词,值(内部字典)为由每个性别及其概率组成的键值对。我们使用defaultdict
,如果键不存在的话,将返回0。
model = defaultdict(lambda: defaultdict(float))
打开模型所在文件,解析每一行。
with open(model_filename) as inf:
for line in inf:
用空格作为分隔符,将模型的每一行切分成两部分。第一部分为单词自身,第二部分为概率字典。对于每一部分,使用eval
函数获得实际的值,它们之前是使用repr
函数存储的。
word, values = line.split(maxsplit=1)
word = eval(word)
values = eval(values)
在模型中,为单词和概率字典建立起映射关系。
model[word] = values
return model
接着,加载实际的模型。你可能需要修改模型的文件名——它存储在上一个MapReduce任务的输出文件夹中。
model_filename = os.path.join(os.path.expanduser("~"), "models",
"part-00000")
model = load_model(model_filename)
举例来说,我们可以像下面这样查看不同性别使用单词i(执行MapReduce任务时,所有单词中的字母全部转换为小写)的情况。
model["i"]["male"], model["i"]["female"]
接着,创建使用模型做预测的函数。这里我们不再使用scikitlearn
接口,只是创建一个简单的函数。这个函数接收模型和一篇文档作为参数,返回对博主性别的预测结果,函数声明如下:
def nb_predict(model, document):
先来创建一个字典,键值分别为每个性别及概率9。
9不同性别用词有自己的偏好。此处的概率指的是,待预测文档每一个单词由给定性别所使用的概率的乘积。——译者注
probabilities = defaultdict(lambda : 1)
从文档中抽取每一个单词。
words = word_search_re.findall(document)
遍历每一个单词,找出数据集中每个性别使用该单词的概率。
for word in set(words):
probabilities["male"] += np.log(model[word].get("male", 1e-15))
probabilities["female"] += np.log(model[word].get("female", 1e-15))
根据概率对性别进行排序,以概率最高的性别作为预测结果返回。
most_likely_genders = sorted(probabilities.items(),
key=itemgetter(1), reverse=True)
return most_likely_genders[0][0]
注意,我们使用np.log
计算概率。朴素贝叶斯模型中,概率值往往很小。把这些很小的数连乘起来会导致数值下溢,由于计算机精度不够,最终结果将为0。在我们这里,就会出现博主为男性或女性的概率都为0的情况,预测结果自然不正确。
为了解决数值下溢问题,我们使用对数概率。对于两个数a和b,log(a,b)
等于log(a) + log(b)
。数值很小的数取对数,得到一个负数,但相对还比较大。例如log(0.00001)
约等于-11.5。这表明与其冒着数值下溢的风险,把多个概率连乘,还不如使用对数概率求和,然后比较两类的和(和越大,可能性越高)。
使用对数概率有个问题就是无法处理0值(虽然,把多个为0的概率连乘也不能解决这个问题)。这是因为log(0)
没有定义。有些朴素贝叶斯算法,为每个特征的计数都加1以避免出现这个问题,当然还有别的解决方法。加一平滑是一种很简单的数据平滑方法。我们这里如果某个单词给定性别没有人用过,就给它赋一个很小的概率值。
再回到我们的预测函数,我们从数据集中复制一篇博客来做下测试。
new_post = """ Every day should be a half day. Took the afternoon
off to hit the dentist, and while I was out I managed to get my oil
changed, too. Remember that business with my car dealership this
winter? Well, consider this the epilogue. The friendly fellas at the
Valvoline Instant Oil Change on Snelling were nice enough to notice
that my dipstick was broken, and the metal piece was too far down in
its little dipstick tube to pull out. Looks like I'm going to need a
magnet. Damn you, Kline Nissan, daaaaaaammmnnn yooouuuu.... Today
I let my boss know that I've submitted my Corps application. The news
has been greeted by everyone in the company with a level of enthusiasm
that really floors me. The back deck has finally been cleared off
by the construction company working on the place. This company, for
anyone who's interested, consists mainly of one guy who spends his
days cursing at his crew of Spanish-speaking laborers. Construction
of my deck began around the time Nixon was getting out of office.
"""
用下面代码进行分类。
nb_predict(model, new_post)
分类结果为男性,分类正确。当然,千万不要只用一条数据进行测试。我们只使用文件名以51打头的文档来训练模型,所以不要指望有太高的正确率。
当务之急是用更多的文档来训练模型。我们使用所有文件名以6或7打头的文档做测试,在剩余文档上进行训练。
使用命令行,切换到博客文档所在的文件夹(cd
),复制文档到新文件夹中。
创建训练集文件夹。
mkdir blogs_train
然后,创建测试集文件夹。
mkdir blogs_test
把所有文件名以6或7打头的文档从训练集文件夹移动到测试文件夹。
cp blogs/6* blogs_train/
cp blogs/7* blogs_train/
我们需要返回训练集中所有博客的抽取结果。然而,计算量很大,最好使用云平台而不是本地计算机处理这个任务。因此,我们接下来使用亚马逊云平台解析博客内容。
跟之前一样,在命令行运行下面代码。唯一的不同点是训练集所在的文件夹路径。运行代码前,删除用于存储抽取结果和模型文件夹中的所有文件。
python extract_posts.py Datablogs_train --outputdir=/home/bob/
Data/blogposts –nooutput
python nb_train.py Datablogposts/ --outputdir=/home/bob/models/
nooutput
上述代码运行时间较长。
我们使用测试集中的所有博客进行测试。我们使用extract_posts.py
MapReduce任务抽取博客内容,但是记得把抽取结果保存到另外一个文件夹中。
python extract_posts.py ~Datablogs_test --outputdir=/home/bobData
blogposts_testing –nooutput
回到笔记本文件,获取到输出的所有测试文档的路径。
testing_folder = os.path.join(os.path.expanduser("~"), "Data",
"blogposts_testing")
testing_filenames = []
for filename in os.listdir(testing_folder):
testing_filenames.append(os.path.join(testing_folder, filename))
对于上面输出的每一篇测试文档,我们抽取博主性别和博客内容,然后调用预测函数进行分类。因为文档数量很多,我们不想占用太多内存,所以使用生成器来处理。生成器会给出实际性别和预测结果。
def nb_predict_many(model, input_filename):
with open(input_filename) as inf:
# remove leading and trailing whitespace
for line in inf:
tokens = line.split()
actual_gender = eval(tokens[0])
blog_post = eval(" ".join(tokens[1:]))
yield actual_gender, nb_predict(model, blog_post)
然后我们记录测试集中所有数据的预测结果和实际性别。预测结果不是男性就是女性。为了使用scikitlearn
的f1_socre
函数,我们需要把预测结果转换为0或1,如果结果为男性,我们记录0,反之为1。我们使用布尔测试,判断性别是否为女性,然后使用NumPy把布尔值转换为整型(int)。
y_true = []
y_pred = []
for actual_gender, predicted_gender in nb_predict_many(model, testing_
filenames[0]):
y_true.append(actual_gender == "female")
y_pred.append(predicted_gender == "female")
y_true = np.array(y_true, dtype='int')
y_pred = np.array(y_pred, dtype='int')
现在,我们使用scikitlearn
中的F1值评估分类结果。
from sklearn.metrics import f1_score
print("f1={:.4f}".format(f1_score(y_true, y_pred, pos_label=None)))
结果为0.78,还算可以。我们使用更多的数据也许能改善分类效果。但是,我们需要使用更强大的云平台才能处理更多数据。
5. 在亚马逊EMR云平台上训练模型
我们接下来使用亚马逊的EMR(Elastic Map Reduce)云平台完成博客抽取、解析和模型创建任务。
首先,需要在亚马逊的云存储(storage cloud)创建存储段(bucket)。具体做法是用浏览器打开亚马逊S3控制界面http://console.aws.amazon.com/s3,点击Create Bucket。记住存储段的名字,稍后会用到。
右键点击新建的存储段,选择Properties
。然后,修改权限,将其设置为对所有人开放。一般来说,这样做不安全,学完这一章后,请及时修改权限。
左键点击存储段,打开它,点击Create Folder,把新文件夹命名为blogs_train。我们将把训练集数据上传到该文件夹,以便在云端处理。
我们在本地计算机上使用亚马逊的AWS CLI命令行工具操作亚马逊云平台。
使用以下命令安装该工具。
sudo pip2 install awscli
按照网址中给出的说明,为CLI工具设置安全证书:http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html。
接下来需要上传数据到新建的存储段。首先,我们创建训练集,它包括所有除去文件名以6或7打头的文件。复制文件有很多种更为优雅的方式,但是考虑到平台兼容性,就不做推荐。相反,我们使用最牢靠的做法,复制所有的原始文件到训练集文件夹,把文件名以6、7打头的从训练集中删除。
cp -R Datablogs Datablogs_train_large
rm Datablogs_train_large/6*
rm Datablogs_train_large/7*
接着,上传数据到亚马逊S3存储段。注意上传数据需要时间较长(好几百兆)。网速慢的读者,找个网快的地方上传比较好。
aws s3 cp ~Datablogs_train_large/ s3://ch12/blogs_train_large
recursive -exclude "" --include ".xml"
接下来使用mrjob连接亚马逊EMR——它帮助我们处理所有任务;只需要给出安全证书即可。按照下面链接给出的说明,为mrjob设置安全证书:https://pythonhosted.org/mrjob/guides/emr-quickstart.html。
设置完成后,修改mrjob的运行方式,让它在亚马逊EMR上运行,改动之处很少。用-r
开关,告诉mrjob使用emr
。然后把输入、输出目录改为我们在S3上创建好的存储段目录。虽然使用亚马逊云平台,但是运行时间仍很长。
python extract_posts.py -r emr s3://ch12gender/blogs_train_large/
outputdir=s3://ch12/blogposts_train/ --nooutput
python nb_train.py -r emr s3://ch12/blogposts_train/ --outputdir=s3://
ch12/model/ --ooutput
这次使用当然也是要付费的,但只有几美元10而已。如果你要持续运行或做其他跟大数据相关的任务,还请留意下费用问题。有一次我运行大量任务,花了20美元。我们这里任务量要小一些,估计花不到4美元。你可以查看下账户余额,设置收费提醒:https://console.aws.amazon.com/billing/home。
10澳元和美元货币符号都为$。作者表示写这本书时,两者汇率差别不大,但目前拉大了,实际费用用美元来计算比较合适。——译者注
没必要创建blogposts_train
和存储模型文件的文件夹——EMR会自动创建。如果这两个文件夹存在,程序反而会报错。如果你再次运行代码的话,记得把这两个文件夹改成别的名字,但是要记得同时修改两条命令中的文件名(第一条命令中的输出目录是第二条命令的输入目录)。
如果等待时间过长,你有些厌烦,第一项任务运行一段时间后就可以停止。我建议至少运行15分钟后再停止,可能的话,最好1小时以上。你不能中断第二项任务,中断后结果好不到哪去。第二项任务的运行时间大约是第一项任务的两到三倍。
回到S3控制界面,从存储段中下载输出的模型文件,将其保存到本地,我们就可以在笔记本文件中使用新模型。重新输入以下代码——与之前的不同之处用粗体表示,其实只更新了模型文件路径。
aws_model_filename = os.path.join(os.path.expanduser("~"), "models",
"aws_model")
aws_model = load_model(aws_model_filename)
y_true = []
y_pred = []
for actual_gender, predicted_gender in nb_predict_many(aws_model,
testing_filenames[0]):
y_true.append(actual_gender == "female")
y_pred.append(predicted_gender == "female")
y_true = np.array(y_true, dtype='int')
y_pred = np.array(y_pred, dtype='int')
print("f1={:.4f}".format(f1_score(y_true, y_pred, pos_label=None)))
使用更多数据后,我们发现F1值为0.81,结果较之前有所改善。
实验顺利完成后,如果不想继续支付费用,记得从亚马逊S3删除存储段——存储也是要钱的。
12.5 小结
本章研究的是如何运行大数据处理任务。其实,从各方面标准来看,我们的数据集还是有点小——只有几百兆。很多行业实际数据集都比这大多了,对其进行计算就需要更强大的计算能力。此外,我们所使用的算法可根据具体的任务做进一步优化,以提升扩展能力。
为了预测博主的性别,我们从博客文章中抽取词频特征。我们借助mrjob包,用映射和规约方法抽取博客内容和词频。抽取完成后,训练朴素贝叶斯分类器,预测博主性别。
我们可以用mrjob包先在本地测试,然后使用亚马逊的EMR云平台。你也可以使用其他云平台,甚至定制亚马逊EMR集群运行MapReduce任务,但是需要更多的操作才能跑起来。