导读:抽样是从整体样本中通过一定的方法选择一部分样本。抽样是数据处理的基本步骤之一,也是科学实验、质量检验、社会调查普遍采用的一种经济有效的工作和研究方法。
作者:宋天龙
如需转载请联系华章科技
抽样工作在数据获取较少或处理大量数据比较困难的时期非常流行,这主要有以下几方面原因:
如果存在上述条件限制或有类似强制性要求,那么抽样工作仍然必不可少。
但是在当前数据化运营的大背景下,数据计算资源充足、数据采集端可以采集更多的数据并且可以通过多种方式满足时效性的要求,抽样工作是否就没有必要了?其实不是的,即使上述限制条件都满足,还有很多场景依然需要通过抽样方法来解决具体问题。
抽样方法从整体上分为非概率抽样和概率抽样两种。非概率抽样不是按照等概率的原则进行抽样,而是根据人类的主观经验和状态进行判断;概率抽样则是以数学概率论为基础,按照随机的原则进行抽样。本节以下内容介绍的抽样方法属于概率抽样。
1. 简单随机抽样
该抽样方法是按等概率原则直接从总样本中抽取n个样本,这种随机抽样方法简单、易于操作,但是它并不能保证样本能完美代表总体。这种抽样的基本前提是所有样本个体都是等概率分布的,但真实情况却是多数样本都不是或无法判断是否是等概率分布的。
在简单随机抽样中,得到的结果是不重复的样本集,还可以使用有放回的简单随机抽样,这样得到的样本集中会存在重复数据。该方法适用于个体分布均匀的场景。
2. 等距抽样
等距抽样是先将总体中的每个个体按顺序编号,然后计算出抽样间隔,再按照固定抽样间隔抽取个体。
这种操作方法易于理解、简便易行,但当总体样本的分布呈现明显的分布规律时容易产生偏差,例如增减趋势、周期性规律等。该方法适用于个体分布均匀或呈现明显的均匀分布规律,无明显趋势或周期性规律的数据。
3. 分层抽样
分层抽样是先将所有个体样本按照某种特征划分为几个类别,然后从每个类别中使用随机抽样或等距抽样的方法选择个体组成样本。这种操作方法能明显降低抽样误差,并且便于针对不同类别的数据样本进行单独研究,因此是一种较好的实现方法。该方法适用于带有分类逻辑的属性、标签等特征的数据。
4. 整群抽样
整群抽样是先将所有样本分为几个小群体集,然后随机抽样几个小群体集来代表总体。
这种操作方法与之前的3种方法的差异点在于该方法抽取的是小群体集,而不是每个数据个体本身。该方法虽然简单易行,但是样本的分布受限于小群体集的划分,抽样误差较大。这种方法适用于小群体集的特征差异比较小的数据,并且对划分小群体集有更高要求。
1. 数据抽样要能反映运营背景
数据能正确反映运营背景,这看起来非常简单,但实际上需要数据工作者对于运营环节和流程非常熟悉才有可能实现。以下是常见的抽样不能反映运营背景的情况。
2. 数据抽样要能满足数据分析和建模需求
数据抽样必须兼顾后续的其他数据处理工作,尤其是分析和建模需求。这时需要注意以下几个方面的问题。
(1)抽样样本量的问题
对于大多数数据分析建模而言,数据规模越大,模型拟合结果越准确。但到底如何定义数据量的大小,笔者根据不同类型的数据应用总结为以下几个维度:
以上的数据记录数不是固定的,在实际工作时,如果没有特定时间要求,笔者一般会选择一个适中的样本量做分析,此时应综合考虑特征数、特征值域分布数、模型算法适应性、建模需求等;如果是面向机器计算的工作项目,一般会选择尽量多的数据参与计算,而有关算法实时性和效率的问题会让技术和运维人员配合实现,例如提高服务器配置、扩大分布式集群规模、优化底层程序代码、使用实时计算的引擎和机制等。
(2)抽样样本在不同类别中的分布问题
做分类分析建模问题时,不同类别下的数据样本需要均衡分布。
抽样样本能准确代表全部整体特征:
异常检测类数据的处理:
本示例中,将使用random包以及自定义代码实现抽样处理。数据源文件data2.txt、data3.txt和data4.txt位于“附件-chapter3”中。
整个示例代码分为5部分。
第1部分:导入需要的库
import random # 导入标准库 import numpy as np # 导入第三方库
这里用到了Python内置标准库random以及第三方库Numpy,前者用于做随机抽样,后者用于读取文件并做数据切片使用。
第2部分:实现简单随机抽样
data = np.loadtxt('data3.txt') # 导入普通数据文件 data_sample = data[random.sample([i for i in range(len(data))], 2000)] # 随机抽取2000个样本 print(data_sample[:2]) # 打印输出前2条数据 print(len(data_sample)) # 打印输出抽样样本量
首先通过Numpy的loadtxt方法读取数据文件。
然后使用Random库中的sample方法做数据抽样。由于sample库要求抽取的对象是一个序列或set,因此这里使用了一个列表推导式直接基于data数据集的记录数生成索引列表,然后再返回给sample随机抽样,抽样数量为2000;最后从data中直接基于索引获得随机抽样后的结果。
打印输出前2条数据和总抽样样本量。返回结果如下:
[[-4.59501348 8.82741653 4.40096599 3.40332532 -6.54589933] [-7.23173404 -8.92692519 6.82830873 3.0378005 4.64450399]] 2000
本示例中,我们使用了列表推导式来生成data的索引列表。传统方法的实现可以这样写:
ind = [] for i in range(len(data)): ind.Append(i)
而这里的列表推导式的写法[i for i in range(len(data))]除了在语法上更加简洁和优雅外,在性能上同样会有提升。我们通过如下实验做简单测试,对从0到1000000的每个数求平方然后添加到列表。两种方法如下:
# 方法1:传统方法 import time t0=time.time() # 开始时间 ind = [] for i in range(1000000): sqr_values = i*i ind.append(sqr_values) t1 = time.time() # 结束时间 print(t1-t0) # 打印时间 # 方法2:列表推导式 import time t0=time.time() # 开始时间 sqr_values = [i*i for i in range(1000000)] t1 = time.time() # 结束时间 print(t1-t0) # 打印时间
上述代码执行后的输出结果分别是:
0.39202237129211426 0.12700724601745605
上面只是简单的计算逻辑并且数据量也不大,如果配合大数据量以及更复杂的运算,那么效率提升会非常明显。与之类似的还有生成器表达式、字典推导式,都是很Pythonic的实现方法。
第3部分:实现等距抽样
data = np.loadtxt('data3.txt') # 导入普通数据文件 sample_count = 2000 # 指定抽样数量 record_count = data.shape[0] # 获取最大样本量 width = record_count / sample_count # 计算抽样间距 data_sample = [] # 初始化空白列表,用来存放抽样结果数据 i = 0 # 自增计数以得到对应索引值 while len(data_sample) <= sample_count and i * width <= record_count - 1: # 当样本量小于等于指定抽样数量并且矩阵索引在有效范围内时 data_sample.append(data[int(i * width)]) # 新增样本 i += 1 # 自增长 print(data_sample[:2]) # 打印输出前2条数据 print(len(data_sample)) # 打印输出样本数量
首先使用Numpy的loadtxt方法读取数据文件;然后指定抽样样本量为2000,并通过读取原始数据的形状找到最大样本量边界,这可以用来作为循环的终止条件之一;接着通过最大样本量除抽样样本量得到抽样间距;建立一个空列表用于存储最终抽样结果数据,通过一个变量i做循环增长并用来做索引递增,然后进入抽样条件判断过程。
当样本量小于等于指定抽样数量并且矩阵索引在有效范围内时做处理,这里需要注意的是索引从0开始,因此最大数量值减去1得到循环边界,否则会报索引溢出错误。
通过列表的append方法不断追加通过间距得到的新增样本,在本节后面的方法中还会提到列表追加的extend方法,前者用于每次追加1个元素,后者用于批量追加多个元素。
i += 1指的是每次循环都增加1,可以写成i = i + 1。
最后打印输出前2条数据和抽样样本量。
返回结果如下:
[array([-3.08057779, 8.09020329, 2.02732982, 2.92353937, -6.06318211]), array([-2.11984871, 7.74916701, 5.7318711 , 4.75148273, -5.68598747])] 2000
第4部分:实现分层抽样
data2 = np.loadtxt('data2.txt') # 导入带有分层逻辑的数据 each_sample_count = 200 # 定义每个分层的抽样数量 label_data_unique = np.unique(data2[:, -1]) # 定义分层值域 sample_data = [] # 定义空列表,用于存放最终抽样数据 sample_dict = {} # 定义空字典,用来显示各分层样本数量 for label_data in label_data_unique: # 遍历每个分层标签 sample_list = [] # 定义空列表,用于存放临时分层数据 for data_tmp in data2: # 读取每条数据 if data_tmp[-1] == label_data: # 如果数据最后一列等于标签 sample_list.append(data_tmp) # 将数据加入分层数据中 each_sample_data = random.sample(sample_list, each_sample_count) # 对每层数据都随机抽样 sample_data.extend(each_sample_data) # 将抽样数据追加到总体样本集 sample_dict[label_data] = len(each_sample_data) # 样本集统计结果 print(sample_dict) # 打印输出样本集统计结果
首先使用Numpy的loadtxt方法导入带有分层逻辑的数据。在该示例中,读取的数据文件中包含了分类标签,放在最后一列。该列分类标签用于做分层抽样的标识。接着通过unique方法获取分层(分类标签)的值域,用于后续做循环处理。然后分别定义了用于存放临时分层数据、最终抽样数据、显示各分层样本数量的空列表和空字典。
下面进入正式的主循环过程,实现分层抽样:
{0.0: 200, 1.0: 200}
第5部分:实现整群抽样
data3 = np.loadtxt('data4.txt') # 导入已经划分好整群的数据集 label_data_unique = np.unique(data3[:, -1]) # 定义整群标签值域 print(label_data_unique) # 打印输出所有整群标签 sample_label = random.sample(set(label_data_unique), 2) # 随机抽取2个整群 sample_data = [] # 定义空列表,用来存储最终抽样数据 for each_label in sample_label: # 遍历每个整群标签值域 for data_tmp in data3: # 遍历每个样本 if data_tmp[-1] == each_label: # 判断样本是否属于抽样整群 sample_data.append(data_tmp) # 样本添加到最终抽样数据集 print(sample_label) # 打印输出样本整群标签 print(len(sample_data)) # 打印输出总抽样数据记录条数
首先使用Numpy的loadtxt方法导入已经划分好整群的数据集。在该示例中,读取的数据文件中的最后一列存放了不同整群的标识,整群一共被划分为4个群组,标识分别为0、1、2、3。接着通过unique方法获取整群标签的值域,用于基于整群的抽样。打印输出结果如下:
[ 0. 1. 2. 3.]
然后使用Random的sample方法从整群标签中进行抽样,这里定义抽取2个整群。最后将所有属于抽取到的整群下的数据进行读取和追加,并得到最终样本集,打印输出样本集的整群标签和总样本数量,结果如下:
[3.0, 1.0] 502
由于是随机概率抽样,因此读者使用代码抽取到的样本很可能与笔者示例不一致,这属于正常现象。另外,读者多次随机抽样程序也可能得到不一样的结果。
上述过程中,需要考虑的关键点是:如何根据不同的数据特点、建模需求、业务背景综合考虑抽样方法,得到最适合的结果
代码实操小结:本节示例中,主要用了几个知识点:
关于作者:宋天龙,深大数据技术专家,触脉咨询合伙人兼副总裁,前Webtrekk中国区技术和咨询负责人(德国最大在线数据分析服务提供商)。擅长数据挖掘、建模、分析与运营,精通端到端数据价值场景设计、业务需求转换、数据结构梳理、数据建模与学习以及数据工程交付。在电子商务、零售、银行、保险等多个行业拥有丰富的数据项目工作经验。