第 8 章 数据清洗:标准化和脚本化

你已经学习了数据的匹配和解析方法,以及如何寻找重复值,你已经开始探索数据清洗的奇妙世界。随着进一步理解你的数据集和你想要回答的问题,你需要考虑数据标准化和清洗自动化的问题。

本章我们将探索数据标准化的方法和时机,以及何时将数据清洗脚本化并对脚本进行测试。如果你管理的数据集是定期更新或新增数据的话,你需要使清洗过程尽可能高效清楚,这样你就可以将更多时间花在数据分析和撰写报告上。我们首先讲数据集的标准化(standardizing)和归一化(normalizing),以及如果数据集没有归一化应该怎么做。

8.1 数据归一化和标准化

数据集的标准化和归一化可能意味着利用当前数据计算新数据,也可能是对特定列或特定数据进行标准化或归一化,这取决于你的数据和所从事的研究类型。

从统计学的观点来看,归一化通常需要对数据集进行计算,使数据都位于一个特定的范围。比如说,你可能需要将测验成绩归一化到一定范围,这样你就可以准确查看成绩分布。你可能还需要对数据做归一化,以便准确查看百分位数,或不同群体(或世代)之间的百分位数。

假设你想查看某队在给定赛季得分的分布情况。你可能首先会将比赛分为赢、输、平三种情况。然后再进一步分为赢多少分、输多少分,等等。你还可以按比赛时长和每分钟得分数来分类。你可以访问所有这些数据集,现在你希望在球队之间进行对比。如果要对数据归一化,你可能会将总得分归一化到 0-1 区间。离群值(最高得分)将会接近于 1,较低得分将会接近于 0。然后你可以利用新数据的分布情况,查看有多少支球队的得分位于中游,在低分和高分区间是否有很多球队。你还可以找出离群值(比方说,如果大多数得分都在 0.3 和 0.4 之间,那么你就知道,没在这个范围内的得分可能就是离群值)。

如果想对同样的数据做标准化,应该怎么做呢?举个例子,你可以将数据标准化,计算出每分钟的平均得分。然后你可以将平均得分作图,查看分布情况。哪些球队每分钟得分较高?有没有离群值?

你还可以计算标准差来查看分布情况。在第 9 章中我们会更全面地介绍标准化,但主要问题就是:数据的正常范围是什么?这个范围之外都有哪些数据?数据有没有什么规律?

可以看出,归一化和标准化是不同的。但二者通常都可以让研究人员或调查人员确定数据的分布,并明白该分布对后续研究或计算的含义。

数据标准化和归一化有时还需要删除离群值,这样你才能更好地发现数据的规律和分布。回头看前面的球队例子,如果你从整个联赛中删除顶级得分球员的得分,球队的成绩是否发生了巨大的变化?如果一名球员得到了所在球队一半的得分,那么回答是“是的”,这会使球队成绩发生巨大的变化。

与此类似,如果某支球队总是大比分胜出,从联赛数据中剔除这支队伍,可能会大幅改变平均得分及其分布情况。你可以使用归一化、标准化和剔除离群值的方法来帮你找到问题的答案,这取决于你要解决的问题。

8.2 数据存储

我们已经讲过几种数据存储的方法,现在有了可用的数据,我们先来复习一下这些方法。如果你正在使用数据库、知道预期的表格格式,并想要保存已经清洗过的数据,那么你应该继续使用第 6 章讲过的 Python 库来连接数据库并保存数据。对于这些 Python 库中的大部分库,你都可以使用游标直接向数据库提交。

第 8 章 数据清洗:标准化和脚本化 - 图1 我们强烈建议在数据库脚本中添加错误信息,在遇到网络故障或数据库故障时可以捕获这些错误信息。我们建议频繁向数据库提交,这样可以避免网络问题或延迟问题影响脚本的运行。

如果你用的是第 6 章中讲过的 SQLite 例子,你需要将新的干净数据保存到你的数据库中。我们来看一下如何做到这一点:

  1. import dataset
  2. db = dataset.connect('sqlite:///data_wrangling.db')
  3. table = db['unicef_survey']
  4. for row_num, data in enumerate(zipped_data):
  5. for question, answer in data:
  6. data_dict = {
  7. 'question': question[1],
  8. 'question_code': question[0],
  9. 'answer': answer,
  10. 'response_number': row_num,
  11. 'survey': 'mn',
  12. }
  13. table.insert(data_dict)

❶ 这里我们访问本地数据库。如果你将文件保存到其他目录,一定要修改文件路径,将其修改为数据库文件相对于当前目录的位置(例如,如果数据库文件保存在上层目录中:file:///../datawrangling.db)。

❷ 本行代码创建一个新表:unicef_data。我们知道很多 UNICEF 调查都有相同的规律,所以我们这个数据库名是没有歧义、可复用的。

❸ 我们希望保存所在的行编号,这样每个回答都有一个编号。本行代码用到了 enumerate 函数,这样在数据库中可以找到(每一行 / 每一个回答的)每一条数据(它们的共用一个行编号)。

❹ 我们知道,我们的数据被分割成元组,标题列表是元组的第一个元素,问题回答是元组的第二个元素。本行代码利用 for 循环解析其中包含的数据并进行存储。

❺ 每一个问题和回答在数据库中都有对应的条目,所以我们可以将每行(即每次采访)所有的回答合并在一起。本行代码创建一个字典,其中包含每次采访中每个回答的必要数据。

❻ 标题列表中第二个元素是问题的详细说明。本行代码将其保存为 question,并将 UNICEF 问题代码保存为 question_code

❼ 为了记录每一行回答(或每一次采访),本行代码添加了 enumerate 函数得到的 rownum

❽ 最后,利用新表的 insert 方法将新字典插入我们的数据库中。

我们希望将清洗过的数据保存到 SQLite 数据库中。我们创建了一个新的数据库,用到了 enumerate 函数,这样我们可以合并每一个回答(每一行)。如果我们要访问数据,可以访问新表,利用第 6 章学过的函数来查看所有的数据记录,并在需要时进行检索。

如果你希望将清洗过的数据导出到简单文件中,应该也很容易做到。我们来看一下:

  1. from csv import writer
  2. def write_file(zipped_data, file_name):
  3. with open(file_name, 'wb') as new_csv_file:
  4. wrtr = writer(new_csv_file)
  5. titles = [row[0][1] for row in zipped_data[0]]
  6. wrtr.writerow(titles)
  7. for row in zipped_data:
  8. answers = [resp[1] for resp in row]
  9. wrtr.writerow(answers)
  10. write_file(zipped_data, 'cleaned_unicef_data.csv')

with…as 的作用是将第一个函数的输出赋值给第二个变量名。本行代码将新文件 open(file_name, 'wb') 赋值给变量 new_csv_file'wb' 的意思是以二进制模式写入。

❷ 初始化 CSV writer 对象,传入一个打开的文件,然后将 writer 对象赋值给 wrtr 变量。

❸ writer 对象需要数据列表来逐行写入,本行创建的是标题行的标题列表。长标题是元组第一部分的第二个元素,所以对应的代码是 row[0][1]

❹ 用到了 writer 对象的 writerow 方法,将一个可迭代对象转换成一行逗号分隔的数据。本行代码写入的是标题行。

❺ 利用列表生成式提取出所有回答(元组的第二个元素)。

❻ 将利用列表生成式创建的所有列表或回答写入 CSV 数据文件。

这里我们用到了学过的语法,也用到了一些新语法。我们已经学过如何用 with…as 将简单函数的返回值赋值给一个变量名。这里我们希望将打开的文件赋值给 new_csv_file 变量。这种语法通常用于文件和其他 I/O 对象,因为 Python 执行完 with 代码块中的代码之后,它会自动关闭文件,这很棒!

此外,代码中我们用到了 CSV writer 对象,与 CSV reader 对象的用法类似。writerow 可以将包含所有数据列的列表写入到 CSV 文件中。

第 8 章 数据清洗:标准化和脚本化 - 图2 writerow 方法接受一个可迭代对象,所以一定要传入一个列表或元组。如果你传入一个字符串,那么看到一些有趣的 CSV(“l,i,k,e,,t,h,i,s”)时不要惊讶。

我们还用到了列表生成式来创建标题列表和回答列表。由于我们不需要用这个函数生成一个新对象或修改过的对象,所以没有返回任何值。这个函数可以帮我们复习目前学过的许多概念。

如果你想用其他方法来保存数据,可以参考第 6 章给出的关于保存数据的建议。保存完清洗过的数据之后,你可以继续进行后面的清洗过程,并对数据进行分析。

8.3 找到适合项目的数据清洗方法

根据数据的可靠性,以及你分析数据的频率,你可以选择一种完全不同的数据清洗方式。如果你要处理的数据是非常杂乱的,或者有许多不同的来源,你可能无法准确地将清洗过程脚本化。

第 8 章 数据清洗:标准化和脚本化 - 图3 你需要分析将数据清洗完全脚本化所要付出的时间和精力,然后判断数据清洗自动化能否真正节省时间。

如果清洗过程特别繁琐,有很多步骤,你可能需要创建一个包含许多辅助脚本的仓库。这样即使你没有一个按顺序完成所有步骤的脚本,这个仓库也会为你提供许多函数,在整个数据处理过程中均可使用,还可以让你处理新数据时速度更快。举个例子,你有一些在列表或矩阵中搜索重复值的脚本,还有一些函数,可以从 CSV 导入或导出数据,或者格式化字符串和日期。对于这种方法,你可以随时导入这些函数并使用,可以在 IPython 或 Jupyter 中导入使用(我们会在第 10 章中学到),也可以在当前仓库的其他文件中导入使用。

如果你的清洗代码有固定的规律,不太可能发生变化,那么可以将整个清洗过程脚本化。

8.4 数据清洗脚本化

随着你的 Python 知识的逐步深化与丰富,你编写的 Python 代码也会逐渐变得复杂。现在你可以编写函数、解析文件、导入并使用多个 Python 库,甚至还可以存储数据。是时候开始将代码脚本化了。脚本化(scripting)的意思是,确定代码的结构,用于后续使用、学习和分享。

以 UNICEF 数据为例。我们知道,UNICEF 每隔几年会发布这些数据集,其中许多数据是不变的。调查不太可能发生较大变化——它是建立在多年经验的基础之上。考虑到这些事实,我们可以信任这些数据集有相当高的一致性。如果我们需要再次用到 UNICEF 数据,可能至少可以复用第一次写的脚本中的一部分代码。

目前我们代码的结构比较简单,也缺少代码文档。除了可读性较差外,这样的代码还很难复用。虽然现在我们可以看懂自己写的函数,但一年后我们还能准确地读懂并理解这些函数吗?我们把这些函数发给同事,他们能看懂我们的笔记吗?在我们对这些问题做出肯定的回答之前,最好一行代码也不要写。如果一年后我们无法读懂自己的代码,那么这些代码是没有任何用处的,当发布新报告时会有人(很可能是我们自己)重新写这些代码。

Python 之禅不仅适用于编写代码,还适用于组织代码,函数、变量和类的命名,等等。最好在选择命名上花点时间,判断哪些名字可以让你和他人都一目了然。注释和文档可以帮助理解,但代码本身也应该具有较强的可读性。

第 8 章 数据清洗:标准化和脚本化 - 图4 经常有人称赞 Python 是最容易读懂的语言之一,即使是看不懂代码的人也能读懂!保持代码语法简洁可读,这样解释代码功能的文档也不需要太长。

Python 之禅

Python 之禅(https://www.python.org/dev/peps/pep-0020/)总是非常值得参考的(还可以输入 import this 来轻松查看)。它的要点是,对于Python(和许多语言)来说,尽可能保持明确、简洁和实用总是最好的。1

随着你编程水平的提高,明确和实用的含义可能会发生变化,但我们强烈建议你尽可能保持代码清晰、精确和简单。有时可能会使代码量变大或者运行时间变长,但随着经验地增长,你总会找到方法将代码写得既快速又清晰。

现阶段应该将代码写得尽可能清晰,这样以后回看代码时,你可以理解自己当时的意思。

1中文版 Python 之禅可参见:https://wiki.python.org/moin/PythonZenChineseTranslate。——译者注

通读 PEP-8 Python 风格指南(https://www.python.org/dev/peps/pep-0008/),并遵守里面的规则。有许多 PEP-8 的检查工具(linter),可以通读你的代码,并指出其中不符合 PEP-8 的地方。

除了风格标准和用法,你还可以用检查工具评估代码的复杂度。有些是根据 McCabe 关于循环复杂度的理论和计算方法(https://en.wikipedia.org/wiki/Cyclomatic_complexity)来对代码进行分析。虽然不是每次都能将代码分割成简单的代码块,但你应该尽量将复杂任务拆分成更小、更简单的任务,降低代码复杂度,使代码更明确。

在使代码更加清晰明确的同时,另一个很有用的做法是,让可复用的代码块更加通用。但注意不要过于一般化(def foo 这样的定义毫无用处),但如果你创建通用的辅助函数,你将会经常用到它们(例如用一个列表创建 CSV,或者用包含重复值的列表创建一个集合),你的代码也会更加有序、简洁和简单。

第 8 章 数据清洗:标准化和脚本化 - 图5 如果所有报告都用相同的代码连接数据库或打开数据文件,你可以为此创建一个函数。编写通用的辅助函数,其目的是创建简单、可读、可用且不重复的代码。

表 8-1 汇总了一些编程的最佳实践,你可以在以后的编程中考虑这些做法。这些最佳实践并没有包含关于 Python 和编程的所有内容,但可以为今后的学习和编程打下良好的基础。

表8-1:Python编程最佳实践

实践 说明
文档 包括代码中的注释、函数说明和脚本说明,以及 README.md 文件和仓库中其他必要的说明文件
命名清晰 所有函数、变量和文件都应该有清晰的命名,从名字中就可以看出其内容或功能
语法正确 变量和函数应该遵守正确的 Python 语法(一般用小写字母,单词之间加下划线,对于类名采用驼峰式大小写(CamelCase,https://en.wikipedia.org/wiki/CamelCase),代码应遵守 PEP-8 标准
导入 只导入需要使用的内容,导入方式遵守 PEP-8 的原则
辅助函数 创建抽象的辅助函数,使代码变得清晰、可复用(例如,export_to_csv 函数将列表内容导入 CSV 文件)
仓库管理 用逻辑结构和层级结构管理仓库,共用的代码放在一起,符合一般的逻辑规律
版本控制 所有代码都应该有版本控制,这样你或你的同事可以创建新分支、尝试新特性,而不会影响仓库主分支的运行
快速,但是更要清晰 利用 Python 语法糖写出快速高效的代码,但当速度和清晰只能二选一时,选择清晰的代码
利用现成的库 当你想做点什么,而前人已经用 Python 做过了,不要重复造轮子。善于利用优秀的 Python 库,对这些库做贡献来帮助开源社区
代码测试 在适当可行的时候,为单个函数编写测试,并利用测试数据来测试代码
详实准确 try 代码块中正确地编写例外(exception),代码文档要详实,变量名要准确

为代码编写文档是编写脚本的一个重要步骤。正如 Eric Holscher(Python 主义者,Write the Docs 的创始人之一)恰如其分地总结(http://www.writethedocs.org/guide/writing/beginners-guide-to-docs/):为代码编写文档的原因有很多,最重要的原因就是你可能会再次用到这些代码——或者其他人可能会阅读并使用这些代码,或者你想发布到 GitHub 上,或者你想在以后的面试中用到,或者你想将代码发给你母亲。无论什么原因,为代码编写完备的文档,可以在未来减少数小时的痛苦。如果你是团队的一员,还会减少整个团队数百小时的痛苦。想到未来会有这些好处,现在值得花精力坐下来分析代码的用途,以及这么编写的原因。

类似 Read the Docs(https://readthedocs.org/)或者 Write the Docs(http://www.writethedocs.org/)之类的机构给出了许多好的建议和帮助,使编写文档变得更加轻松。一个好的经验做法是,在项目根目录里创建一个 README.md,简要说明代码的作用、安装方法和运行方法、基本要求以及在哪里可以找到更多信息。

第 8 章 数据清洗:标准化和脚本化 - 图6 有时在 README.md 里放一个简短的代码示例也是很有用的,这取决于用户(读者)与核心组件的交互次数多少。

除了 README.md 文件,你还需要添加代码注释。第 5 章中说过,注释可以是只给自己看的快速笔记,也可以是说明脚本和函数用法的长注释。

第 8 章 数据清洗:标准化和脚本化 - 图7 Python 中各种注释的语法和用法在 PEP-350(https://www.python.org/dev/peps/pep-0350/)中有详细说明。遵循这些标准,任何人都可以轻松看懂你写的注释。

我们来尝试为之前的清洗代码编写文档。为了让我们编写文档的思路清晰,我们首先简要列出需要完成的任务。

  • 从 UNICEF 数据文件中导入数据。

  • 找到数据行对应的标题。

  • 将我们可以读懂的标题与内置缩写标题正确匹配。

  • 解析数据,检查是否有重复值。

  • 解析数据,检查数据是否有缺失。

  • 将同一家庭的多行数据合并。

  • 保存数据。

上述任务基本上是按先后顺序排列的,列出这些任务,可以让我们在组织代码结构、编写脚本以及为新脚本编写文档时减轻一些痛苦。

我们要做的第一件事情,就是将本章和上一章写的所有代码块放到同一个脚本文件中。把它们放在一起之后,我们可以开始按照规则写出好代码。我们来看一下当前的脚本:

  1. from csv import reader
  2. import dataset
  3. data_rdr = reader(open('../../../data/unicef/mn.csv', 'rb'))
  4. header_rdr = reader(open('../../../data/unicef/mn_headers_updated.csv', 'rb'))
  5. data_rows = [d for d in data_rdr]
  6. header_rows = [h for h in header_rdr if h[0] in data_rows[0]]
  7. all_short_headers = [h[0] for h in header_rows]
  8. skip_index = []
  9. final_header_rows = []
  10. for header in data_rows[0]:
  11. if header not in all_short_headers:
  12. print header
  13. index = data_rows[0].index(header)
  14. if index not in skip_index:
  15. skip_index.append(index)
  16. else:
  17. for head in header_rows:
  18. if head[0] == header:
  19. final_header_rows.append(head)
  20. break
  21. new_data = []
  22. for row in data_rows[1:]:
  23. new_row = []
  24. for i, d in enumerate(row):
  25. if i not in skip_index:
  26. new_row.append(d)
  27. new_data.append(new_row)
  28. zipped_data = []
  29. for drow in new_data:
  30. zipped_data.append(zip(final_header_rows, drow))
  31. # 检查数据是否有缺失
  32. for x in zipped_data[0]:
  33. if not x[1]:
  34. print x
  35. # 检查是否有重复值
  36. set_of_keys = set([
  37. '%s-%s-%s' % (x[0][1], x[1][1], x[2][1]) for x in zipped_data])
  38. uniques = [x for x in zipped_data if not
  39. set_of_keys.remove('%s-%s-%s' %
  40. (x[0][1], x[1][1], x[2][1]))]
  41. print len(set_of_keys)
  42. # 保存到数据库
  43. db = dataset.connect('sqlite:///../../data_wrangling.db')
  44. table = db['unicef_survey']
  45. for row_num, data in enumerate(zipped_data):
  46. for question, answer in data:
  47. data_dict = {
  48. 'question': question[1],
  49. 'question_code': question[0],
  50. 'answer': answer,
  51. 'response_number': row_num,
  52. 'survey': 'mn',
  53. }
  54. table.insert(data_dict)

可以看出,大部分代码都是扁平的(flat),即没有重要性的嵌套关系。文件中大部分代码和函数都没有缩进或文档。代码本身不够抽象,变量名也不够清晰。我们从头开始解决这些问题。前两段代码重复。我们可以编写一个函数来代替:

  1. def get_rows(file_name):
  2. rdr = reader(open(file_name, 'rb'))
  3. return [row for row in rdr]

有了这个函数,现在我们的文件就变短了。我们来看下一段代码是否还能进一步改进。

我们修改 header_rows 使其与 data_rows 里的标题对齐,花了不少时间,但现在已经不需要这段代码了。我们创建了 final_header_rows,里面的 header_rowsdata_rows 已经匹配好了,所以我们无需担心二者不匹配的问题。我们可以删除这行代码。

14~27 行的作用是创建 final_header_rowsskip_index 两个列表。我们可以将这两个列表的用途总结一下,就是用于删除不匹配的元素,这样我们才能合并最终列表。我们把两个列表放在同一个方法中:

  1. def eliminate_mismatches(header_rows, data_rows):
  2. all_short_headers = [h[0] for h in header_rows]
  3. skip_index = []
  4. final_header_rows = []
  5. for header in data_rows[0]:
  6. if header not in all_short_headers:
  7. index = data_rows[0].index(header)
  8. if index not in skip_index:
  9. skip_index.append(index)
  10. else:
  11. for head in header_rows:
  12. if head[0] == header:
  13. final_header_rows.append(head)
  14. break
  15. return skip_index, final_header_rows

现在我们已经将清洗脚本中的很多代码都合并成函数了。这有助于我们描述每一个函数的功能,编写代码文档,当需要修改代码时知道需要查看哪些内容。

我们继续阅读脚本,看能否找到更多需要修改之处。下一节代码似乎是用于创建合并后的数据集。我们可以将其拆分为两个函数:一个找出与标题匹配的数据行,另一个合并两个列表。我们也可以只用一个函数来创建合并后的数据。最终由你自己决定哪种方法更好。这里我们用的是一个函数外加一个简短的辅助函数,后面可能会再次用到:

  1. def zip_data(headers, data):
  2. zipped_data = []
  3. for drow in data:
  4. zipped_data.append(zip(headers, drow))
  5. return zipped_data
  6. def create_zipped_data(final_header_rows, data_rows, skip_index):
  7. new_data = []
  8. for row in data_rows[1:]:
  9. new_row = []
  10. for index, data in enumerate(row):
  11. if index not in skip_index:
  12. new_row.append(data)
  13. new_data.append(new_row)
  14. zipped_data = zip_data(final_header_rows, new_data)
  15. return zipped_data

有了这些新函数,我们可以保存代码、清洗变量名,还可以利用辅助函数将标题与数据行合并,并返回合并后的数据列表。代码更加清晰,分块也更加合理。我们继续将同样的逻辑应用到文件中的其他代码。我们来看一下最终结果:

  1. from csv import reader
  2. import dataset
  3. def get_rows(file_name):
  4. rdr = reader(open(file_name, 'rb'))
  5. return [row for row in rdr]
  6. def eliminate_mismatches(header_rows, data_rows):
  7. all_short_headers = [h[0] for h in header_rows]
  8. skip_index = []
  9. final_header_rows = []
  10. for header in data_rows[0]:
  11. if header not in all_short_headers:
  12. index = data_rows[0].index(header)
  13. if index not in skip_index:
  14. skip_index.append(index)
  15. else:
  16. for head in header_rows:
  17. if head[0] == header:
  18. final_header_rows.append(head)
  19. break
  20. return skip_index, final_header_rows
  21. def zip_data(headers, data):
  22. zipped_data = []
  23. for drow in data:
  24. zipped_data.append(zip(headers, drow))
  25. return zipped_data
  26. def create_zipped_data(final_header_rows, data_rows, skip_index):
  27. new_data = []
  28. for row in data_rows[1:]:
  29. new_row = []
  30. for index, data in enumerate(row):
  31. if index not in skip_index:
  32. new_row.append(data)
  33. new_data.append(new_row)
  34. zipped_data = zip_data(final_header_rows, new_data)
  35. return zipped_data
  36. def find_missing_data(zipped_data):
  37. missing_count = 0
  38. for question, answer in zipped_data:
  39. if not answer:
  40. missing_count += 1
  41. return missing_count
  42. def find_duplicate_data(zipped_data):
  43. set_of_keys = set([
  44. '%s-%s-%s' % (row[0][1], row[1][1], row[2][1])
  45. for row in zipped_data])
  46. uniques = [row for row in zipped_data if not
  47. set_of_keys.remove('%s-%s-%s' %
  48. (row[0][1], row[1][1], row[2][1]))]
  49. return uniques, len(set_of_keys)
  50. def save_to_sqlitedb(db_file, zipped_data, survey_type):
  51. db = dataset.connect(db_file)
  52. table = db['unicef_survey']
  53. all_rows = []
  54. for row_num, data in enumerate(zipped_data):
  55. for question, answer in data:
  56. data_dict = {
  57. 'question': question[1],
  58. 'question_code': question[0],
  59. 'answer': answer,
  60. 'response_number': row_num,
  61. 'survey': survey_type,
  62. }
  63. all_rows.append(data_dict)
  64. table.insert_many(all_rows)

现在我们有了许多不错的函数,却改变了程序的运行方式。如果现在运行这个脚本,一行代码都不会运行。只有一些写好的函数,却都没有被调用。

现在我们要在一个 main 函数中说明使用这些函数的方法。Python 开发者一般会将通过命令行运行的代码放到 main 函数里。下面我们添加 main 函数的代码,用于清洗数据集:

  1. """ 这部分代码放在已写脚本的下面。 """
  2. def main():
  3. data_rows = get_rows('data/unicef/mn.csv')
  4. header_rows = get_rows('data/unicef/mn_headers_updated.csv')
  5. skip_index, final_header_rows = eliminate_mismatches(header_rows,
  6. data_rows)
  7. zipped_data = create_zipped_data(final_header_rows, data_rows, skip_index)
  8. num_missing = find_missing_data(zipped_data)
  9. uniques, num_dupes = find_duplicate_data(zipped_data)
  10. if num_missing == 0 and num_dupes == 0:
  11. save_to_sqlitedb('sqlite:///data/data_wrangling.db', zipped_data)
  12. else:
  13. error_msg = ''
  14. if num_missing:
  15. error_msg += 'We are missing {} values. '.format(num_missing)
  16. if num_dupes:
  17. error_msg += 'We have {} duplicates. '.format(num_dupes)
  18. error_msg += 'Please have a look and fix!'
  19. print error_msg
  20. if __name__ == '__main__':
  21. main()

现在我们有了一个可以从命令行运行的可执行文件。运行此文件会发生什么?你会得到我们刚刚创建的错误信息,还是将数据保存到本地的 SQLite 数据库中?

使一个文件可以在命令行中运行

大多数可以在命令行中运行的 Python 文件都有一些相同的属性。它们一般都有一个 main 函数,里面再调用小型函数或辅助函数,和我们上面的清洗脚本类似。

main 函数一般会在文件的主缩进级别的代码块中进行调用。调用的语法是 if __name__ == '__main__':。这个语法用到了全局的私有变量(所以变量名两边才有双下划线),当你在命令行运行文件时会返回 True

如果不是在命令行中运行脚本,那么 if 语句中的代码不会运行。如果我们将这些函数导入另一个脚本中,__name__ 变量不等于 '__main__',代码就不会运行。这是 Python 脚本常用的约定。

第 8 章 数据清洗:标准化和脚本化 - 图8 遇到任何错误,检查你的代码和上述代码是否完全相同,检查仓库中数据的文件路径是否正确,还要检查第 6 章创建的本地数据库的文件路径是否正确。

下面我们来为代码编写文档。我们要给函数添加一些文档字符串和行内注释,方便我们理解脚本中比较复杂的代码段,还要在脚本开头添加一大段说明文字,这些文字以后可以放到 README.md 文件中:

  1. """
  2. Usage: python our_cleanup_script.py
  3. This script is used to intake the male survey data from UNICEF
  4. and save it to a simple database file after it has been checked
  5. for duplicates and missing data and after the headers have been properly
  6. matched with the data. It expects there to be a 'mn.csv' file with the
  7. data and the 'mn_updated_headers.csv' file in a subfolder called 'unicef' within
  8. a data folder in this directory. It also expects there to be a SQLite
  9. file called 'data_wrangling.db' in the root of this directory. Finally,
  10. it expects to utilize the dataset library
  11. (http://dataset.readthedocs.org/en/latest/).
  12. If the script runs without finding any errors, it will save the
  13. cleaned data to the 'unicef_survey' table in the SQLite.
  14. The saved data will have the following structure:
  15. - question: string
  16. - question_code: string
  17. - answer: string
  18. - response_number: integer
  19. - survey: string
  20. The response number can later be used to join entire responses together
  21. (i.e., all of response_number 3 come from the same interview, etc.).
  22. If you have any questions, please feel free to contact me via ...
  23. """
  24. from csv import reader
  25. import dataset
  26. def get_rows(file_name):
  27. """Return a list of rows from a given csv filename."""
  28. rdr = reader(open(file_name, 'rb'))
  29. return [row for row in rdr]
  30. def eliminate_mismatches(header_rows, data_rows):
  31. """
  32. Return index numbers to skip in a list and final header rows in a list
  33. when given header rows and data rows from a UNICEF dataset. This
  34. function assumes the data_rows object has headers in the first element.
  35. It assumes those headers are the shortened UNICEF form. It also assumes
  36. the first element of each header row in the header data is the
  37. shortened UNICEF form. It will return the list of indexes to skip in the
  38. data rows (ones that don't match properly with headers) as the first element
  39. and will return the final cleaned header rows as the second element.
  40. """
  41. all_short_headers = [h[0] for h in header_rows]
  42. skip_index = []
  43. final_header_rows = []
  44. for header in data_rows[0]:
  45. if header not in all_short_headers:
  46. index = data_rows[0].index(header)
  47. if index not in skip_index:
  48. skip_index.append(index)
  49. else:
  50. for head in header_rows:
  51. if head[0] == header:
  52. final_header_rows.append(head)
  53. break
  54. return skip_index, final_header_rows
  55. def zip_data(headers, data):
  56. """
  57. Return a list of zipped data when given a header list and data list. Assumes
  58. the length of data elements per row and the length of headers are the same.
  59. example output: [(['question code', 'question summary', 'question text'],
  60. 'resp'), ....]
  61. """
  62. zipped_data = []
  63. for drow in data:
  64. zipped_data.append(zip(headers, drow))
  65. return zipped_data
  66. def create_zipped_data(final_header_rows, data_rows, skip_index):
  67. """
  68. Returns a list of zipped data rows (matching header and data) when given a
  69. list of final header rows, a list of data rows, and a list of indexes on
  70. those data rows to skip as they don't match properly. The function assumes
  71. the first row in the data rows contains the original data header values,
  72. and will remove those values from the final list.
  73. """
  74. new_data = []
  75. for row in data_rows[1:]:
  76. new_row = []
  77. for index, data in enumerate(row):
  78. if index not in skip_index:
  79. new_row.append(data)
  80. new_data.append(new_row)
  81. zipped_data = zip_data(final_header_rows, new_data)
  82. return zipped_data
  83. def find_missing_data(zipped_data):
  84. """
  85. Returns a count of how many answers are missing in an entire set of zipped
  86. data. This function assumes all responses are stored as the second element.
  87. It also assumes every response is stored in a list of these matched question,
  88. answer groupings. It returns an integer.
  89. """
  90. missing_count = 0
  91. for response in zipped_data:
  92. for question, answer in response:
  93. if not answer:
  94. missing_count += 1
  95. return missing_count
  96. def find_duplicate_data(zipped_data):
  97. """
  98. Returns a list of unique elements and a number of duplicates found when given
  99. a UNICEF zipped_data list. This function assumes that the first three rows of
  100. data are structured to have the house, cluster, and line number of the
  101. interview and uses these values to create a unique key that should not be
  102. repeated.
  103. """
  104. set_of_keys = set([
  105. '%s-%s-%s' % (row[0][1], row[1][1], row[2][1])
  106. for row in zipped_data])
  107. #TODO: this will throw an error if we have duplicates-we should find a way
  108. #around this
  109. uniques = [row for row in zipped_data if not
  110. set_of_keys.remove('%s-%s-%s' %
  111. (row[0][1], row[1][1], row[2][1]))]
  112. return uniques, len(set_of_keys)
  113. def save_to_sqlitedb(db_file, zipped_data, survey_type):
  114. """
  115. When given a path to a SQLite file, the cleaned zipped_data, and the
  116. UNICEF survey type that was used, saves the data to SQLite in a
  117. table called 'unicef_survey' with the following attributes:
  118. question, question_code, answer, response_number, survey
  119. """
  120. db = dataset.connect(db_file)
  121. table = db['unicef_survey']
  122. all_rows = []
  123. for row_num, data in enumerate(zipped_data):
  124. for question, answer in data:
  125. data_dict = {
  126. 'question': question[1],
  127. 'question_code': question[0],
  128. 'answer': answer,
  129. 'response_number': row_num,
  130. 'survey': survey_type,
  131. }
  132. all_rows.append(data_dict)
  133. table.insert_many(all_rows)
  134. def main():
  135. """
  136. Import all data into rows, clean it, and then if
  137. no errors are found, save it to SQlite.
  138. If there are errors found, print out details so
  139. developers can begin work on fixing the script
  140. or seeing if there is an error in the data.
  141. """
  142. #TODO: we probably should abstract these files so that we can pass
  143. # them in as variables and use the main function with other surveys
  144. data_rows = get_rows('data/unicef/mn.csv')
  145. header_rows = get_rows('data/unicef/mn_updated_headers.csv')
  146. skip_index, final_header_rows = eliminate_mismatches(header_rows,
  147. data_rows)
  148. zipped_data = create_zipped_data(final_header_rows, data_rows, skip_index)
  149. num_missing = find_missing_data(zipped_data)
  150. uniques, num_dupes = find_duplicate_data(zipped_data)
  151. if num_missing == 0 and num_dupes == 0:
  152. #TODO: we probably also want to abstract this
  153. # file away, or make sure it exists before continuing
  154. save_to_sqlite('sqlite:///data_wrangling.db', zipped_data, 'mn')
  155. else:
  156. #TODO: eventually we probably want to log this, and
  157. # maybe send an email if an error is thrown rather than print it
  158. error_msg = ''
  159. if num_missing:
  160. error_msg += 'We are missing {} values. '.format(num_missing)
  161. if num_dupes:
  162. error_msg += 'We have {} duplicates. '.format(num_dupes)
  163. error_msg += 'Please have a look and fix!'
  164. print error_msg
  165. if __name__ == '__main__':
  166. main()

现在我们的代码文档更详细、结构更合理,还有许多可复用的函数。对于我们的第一个脚本来说,这是一个很好的开始。利用这些代码,希望我们可以导入许多 UNICEF 数据!

第 8 章 数据清洗:标准化和脚本化 - 图9 我们还在代码里添加了许多“TODO”(待办)的注释,这样我们以后可以继续完善脚本。你认为哪个问题是最迫切需要解决的?为什么?你能尝试解决这个问题吗?

我们只用了一个文件来运行代码。但随着代码量的增加,你的仓库也会变得越来越复杂。在初期就要思考你可能需要向仓库中添加的内容,这一点是很重要的。代码和代码结构很相似。如果你认为这个仓库可能的用途不仅仅是解析 UNICEF 数据,你的代码结构可能会大不相同。

为什么会这样?首先,你可能需要将数据保存在一个单独的文件中。事实上,根据你的仓库未来的复杂程度,你可能需要在不同的文件夹中使用不同的数据解析方法和清洗方法。

第 8 章 数据清洗:标准化和脚本化 - 图10 在初期不必过分担心这些决策。随着你 Python 编程水平的提高和对数据集的理解进一步加深,你会更清楚地认识到应该从哪里开始。

在仓库的结构中,经常会有一个名为 utils 或 common 的文件夹,你可以在里面保存代码之间共享的脚本。许多开发者将数据库连接脚本,常用的 API 代码和通信或 email 脚本等保存在这样的文件夹中,方便导入其他脚本中。

你可能创建了多个目录来保存项目的不同内容,具体取决于仓库的管理结构。其中一个目录只和 UNICEF 数据有关。另一个目录可能包含网络抓取脚本或最终报告代码。如何组织仓库的结构由你自己决定。永远保持清晰、明确、有序。

如果你最后不得不重新组织仓库结构,那么在开始时就尽可能保持仓库有序,后面就不会太过痛苦。相反,如果你的仓库里都是 800 行的文件,而且没有清晰的文档,那么你要做的事情就很多了。最好的经验做法是最开始给出结构框架,随着仓库内容的增加和变化对结构进行临时调整。

除了良好的文件结构,保持目录、文件、函数和类的命名清晰明确也是很有用的。在 utils 文件夹中可能有多个文件。如果你将其命名为 utils1、utils2 等,你可能需要打开文件才能知道它们的具体内容。但如果你将其命名为 email.py、database.py、twitter_api.py 等,文件名本身就包含了更多信息。

在代码中尽量保持明确,对长期而成功的 Python 数据处理事业是一个良好的开端。我们思考一下仓库的结构,看如何找到相应的文件:

  1. data_wrangling_repo/
  2. |-- README.md
  3. |-- data_wrangling.db
  4. |-- data/
  5. | `-- unicef/
  6. | |-- mn.csv
  7. | |-- mn_updated_headers.csv
  8. | |-- wm.csv
  9. | `-- wm_headers.csv
  10. |-- scripts/
  11. | `-- unicef/
  12. | `-- unicef_cleanup.py (本章的脚本)
  13. `-- utils/
  14. |-- databases.py
  15. `-- emailer.py

我们还没有编写 databases 或 emailer 文件,但我们或许应该这么做。我们还可以向文件结构中添加哪些内容?我们在仓库中创建了两个不同的 unicef 文件夹,你认为这么做的原因是什么?开发者是否应该将数据文件和脚本文件分开保存?

第 8 章 数据清洗:标准化和脚本化 - 图11 你的项目文件结构可能和这个类似,但要记得,数据通常都不保存在仓库中。将项目的数据文件保存在共享文件服务器或本地网络的其他位置。如果你是独立开发,一定要在其他地方备份。不要将这些大文件提交到你的仓库中。这样不仅会在需要在新设备上查看仓库时降低工作效率,而且也不是管理数据的好方法。

我们也建议不要将 db 文件或任何 log、config 文件提交到仓库中。仓库结构应尽可能实用。你总是可以将预期的文件结构添加到 README.md 文件中,并详细说明去哪里获取数据文件。

Git 和 .gitignore 文件

如果你还没有用 Git(https://git-scm.com/)做版本控制的话,学完本书就会用了!版本控制可以让你创建仓库来管理和修改代码,并将其分享给团队或其他同事。

在第 14 章中我们将会深入讲解Git,但现在我们在讨论仓库结构,希望重点说一下 .gitignore 文件(https://github.com/github/gitignore)。.gitignore 文件的作用是,让 Git 忽略某些文件,不要将这些文件上传到仓库中。这个文件使用简单模式来匹配文件名,与我们在第 7 章中学过的正则表达式类似。

在我们的仓库结构中,我们可以用一个 .gitignore 文件,这样 Git 就不会将任何数据文件提交到仓库中。然后我们可以在 README.md 中说明仓库的结构,给出获取数据文件的联系信息。这样我们的仓库就比较简洁,且易于下载,还可以保持良好的代码结构。

创建一个符合逻辑的仓库结构,并添加 README.md 和 .gitignore 文件,可以保持模块化代码的项目文件夹有序,并避免将大型数据文件或可能敏感的数据(数据库或登录数据)放在仓库中。

8.5 用新数据测试

前面我们已经学习了编写文档、代码脚本化、组织代码结构,现在我们应该编写一些测试,或者用新数据来测试。这可以帮我们检查代码的运行是否正确,是否与与预期相同,还可以明确代码的含义。我们将数据清洗脚本化的原因之一就是我们可以复用这些代码,因此用新数据测试可以证明我们在代码标准化上花的时间和精力是值得的。

测试刚写过的脚本的一种方法是,我们能否将其轻松应用于在 UNICEF 网站上找到的相似数据。我们来看一下。你应该已经从本书仓库(https://github.com/jackiekazil/datawrangling)中下载了 wm.csv 和 wm_headers.csv 两个文件。这两个文件是津巴布韦 UNICEF 数据中的女性调查数据。

我们尝试在脚本中使用这些文件,替换男性调查数据。要做到这一点,我们只需修改清洗脚本中的两个文件名,将其修改为两个妇女调查数据文件的路径。我们还应该将调查类型修改为 'wm',这样我们才能区分两个数据集中的数据。

第 8 章 数据清洗:标准化和脚本化 - 图12 女性数据集要比男性的大得多。如果你有未保存的数据,我们建议先保存数据,关闭其他程序,然后再进行下一步。关于这一点,可以思考一下如何在脚本中改善内存使用。

我们来看一下能否成功导入数据:

  1. import dataset
  2. db = dataset.connect('sqlite:///data_wrangling.db')
  3. wm_count = db.query('select count(*) from unicef_survey where survey="wm"')
  4. count_result = wm_count.next()
  5. print count_result

❶ 我们用的是直接查询,可以快速查看 survey='wm' 的行数。这应该只包括我们将类型设为 'wm' 后第二次运行的数据行。

❷ 读取查询结果,利用查询响应的 next 方法提取出第一个结果。我们用的是 count,所以我们应该只得到一个结果。

我们从女性数据集中成功导入了超过 300 万个问题和回答。我们的脚本是有效的,我们可以看到输出结果!

利用相似数据测试脚本是确保脚本按预期运行的一种方法。从中还可以发现,你的脚本通用性很高,可以复用。但测试代码还有很多其他方法。Python 有不少很好的测试库,可以帮你编写测试脚本和应用测试数据(甚至测试 API 响应),从而保证代码正常运行。

Python 标准库中有一些内置的测试模块。unittest 库(https://docs.python.org/2/library/unittest.html)可以为 Python 代码做单元测试。它有一些好用的内置类,利用 assert 语句来测试代码是否正常运行。如果想为代码编写单元测试,我们可以写一个测试,判断 get_rows 函数返回的是不是一个列表。我们还可以判断列表长度和文件的数据行数是否相同。我们可以用这些断言测试每一个函数。

另一个流行的 Python 测试框架是 nose 库(https://nose.readthedocs.org/en/latest/)。nose 是一个非常强大的测试框架,额外插件(https://nose.readthedocs.io/en/latest/plugins/builtin.html)和配置可以提供很多功能选项。如果你的仓库很大,许多开发者负责同样的代码,有不同的测试需求,那么这个库是很好用的。

不知道先用哪一个?那么 pytest 库(http://pytest.org/latest/)可能适合你。你可以用任意一种风格编写测试,还可以在需要时换用另一种风格。它的社区也相当活跃,里面有许多演讲和教程(http://docs.pytest.org/en/latest/talks.html#talks-and-blog-postings),所以如果你想深入了解之后编写自己的测试,那么可以先用这个库。

第 8 章 数据清洗:标准化和脚本化 - 图13 通常来说,测试集的结构是在每一个模块下有一个测试文件(对于我们当前的仓库结构来说,我们会在每一个目录下放一个测试文件,除了数据文件夹和配置文件夹)。有些人为文件夹中的每一个 Python 文件编写一个测试文件,所以很容易判断每项测试针对的是哪一个具体文件。其他人将测试放在一个单独的目录中,其结构与 Python 文件结构相同。

无论你选择哪种测试风格或文件结构,一定要确保前后一致、风格明确。这样你就会知道在哪里可以找到测试文件,你(和其他人)也可以在必要时运行这些测试代码。

8.6 小结

本章我们学习了数据标准化的一些基本知识,还有何时适合做数据归一化或删除离群值。你可以将干净的数据(来自第 6 章)导入到数据库或本地文件中,并且你开始为那些重复性过程编写了条理更加清晰的函数。

此外,你还学习了用嵌套文件夹和正确命名的文件来组织 Python 仓库结构,开始编写文档并分析代码。最后,你对测试和编写测试的一些工具有了基本的了解。

表 8-2 列出了本章讲到的 Python 概念。

表8-2:Python编程的新概念和新库

概念/库 作用
数据库 insert 方法 利用 insert 命令可以将数据轻松保存到 SQLite 数据库中
CSV writer 对象 利用 csv 库的 writer 类,可以将数据保存到 CSV 文件中
Python 之禅(import this 像 Python 程序员一样写代码和思考的哲学
Python 最佳实践 作为一名新的 Python 开发者,应该遵循的最佳实践的基本框架
Python 命令行运行(if __name__ == '__main__': 利用这个代码块对脚本进行格式化,可以在命令行中运行 main 函数
TODO 标记 利用 TODO 注释,很容易发现下一步需要对脚本做哪些改进
Git(https://git-scm.com/ 帮助记录代码变化的版本控制系统。对于你想要部署的代码或想要与他人共享的代码,这一点是绝对必要的,而且对于本地的个人项目也是非常有用的。第 14 章会介绍 Git 的更多内容

在下一章里,我们将学习数据分析,你将会继续练习数据清洗和数据分析的方法,并利用这些方法来分析新的数据集。