pyspark是spark的Python API,允许python调用spark编程模型
from pyspark import SparkContext
sc = SparkContext(master='local[2]')
./bin/spark-shell --master local[2]
./bin/pyspark --master local[4] --py-files code.py
from pyspark import SparkConf,SparkContext
conf = (SparkConf().setMaster("local").setAppName("my APP").set("spark.executor.memory","1g"))
sc = SparkContext(conf=conf)
pyspark shell已经为SparkContext创建了名为sc的变量
./bin/spark-shell --master local[2]
./bin/pyspark --master local[4] --py-files code.py
用—master参数设定Context连接到哪个Master服务器,通过传递逗号分隔列表至—py-files添加Python.zip、egg或.py文件到Runtime路径
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]),("b",["p","r"])])
使用textFile()函数从HDFS、本地文件或其它支持hadoop的文件系统里读取文件,或使用wholeTextFiles()函数读取目录下所有文本文件
textFile = sc.textFile('a.txt')
textFile2 = sc.wholeTextFiles(/aa)
rdd.getNumPatitions() 列出分区数
rdd.count() 计算RDD的实例数量
rdd.countByKey() 按键计算RDD实例数量
defaultdict(<type 'int'>,('a':2,'b':1))
rdd.countByValue() 按值计算RDD实例数量
defaultdict(<type 'int'>,(('b',2):1,('a',2):1,('a',7):1))
rdd.collectAsMap() 以字典的形式返回键值
('a':2,'b':2)
rdd.sum() 汇总RDD元素
4959
sc.parallelize([]).isEmpty() 检查RDD是否为空
rdd.max() RDD元素的最大值
rdd.min() RDD元素的最小值
rdd.mean() RDD元素的平均值
rdd.stdev() RDD元素的标准差
rdd.variance() RDD元素的方差
rdd.histogram(3) 分箱(bin)生成直方图
rdd.stats() 综合统计包括:计数、平均值、标准差、最大值和最小值
rdd.map(lambda x:x+(x[1],x[0])).collect() 对每个RDD元素执行函数
rdd.flatMap(lambda x:x+(x[1],x[0])) 对每个RDD元素执行函数,并拉平结果
rdd.collect()
rdd.flatMapValues(lambda x:x).collect() 不改变键,对rdd的每个键值对执行flatMap函数
获取
rdd.collect() 返回包含所以RDD元素的列表
rdd.take(4) 提取前4个RDD元素
rdd.first() 提取第一个RDD元素
rdd.top(2) 提取前两个RDD元素
抽样rdd.sample(False,0.15,81) 返回RDD的采样子集
筛选rdd.filter(lambda x:'a' in x) 筛选RDD
rdd.distinct() 返回RDD里的唯一值
rdd.keys() 返回RDD键值对里的键
def g(x):print(x)
rdd.foreach(g)
规约
rdd.reduceByKey(lambda x,y:x+y) 合并每个键的值
rdd.reduce(lambda x,y:x+y) 合并RDD的值
分组rdd.groupBy(lambda x:x%2).mapValues(list) 返回RDD的分组值
rdd.groupByKey().mapValues(list) 按键分组RDD集合seqOp = (lambda x,y:(x[0]+y,x[1]+1))
combOP = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd.aggregate((0,0),seqOp,combOP) 汇总每个分区里的RDD元素,并输出结果
rdd.aggregeteByKey((0,0),seqOp,combOP) 汇总每个RDD的键的值
rdd.fold(0,add) 汇总每个分区里的RDD元素,并输出结果
rdd.foldByKey(0,add) 合并每个键的值
rdd,keyBy(lambda x:x+x) 通过执行函数,创建RDD元素的元组
rdd.subtract(rdd2) 返回RDD2里没有匹配键的rdd的兼职对
rdd2.subtractByKey(rdd) 返回rdd2里的每个(键、值)对,rdd中,没有匹配的键
rdd.cartesian(rdd2) 返回rdd和rdd2的笛卡尔积
rdd.sortBy(lambda x:x[1]) 按给定函数排序RDD
rdd.sortByKey() 按键排序RDD的键值对
rdd.repartition(4) 新建一个含4个分区的RDD
rdd.coalesce(1) 将RDD中的分区数缩减为1个
rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",'org.Apache.hadoop.mapred.TextOutputFormat')
sc.stop()
./bin/spark-submit examples/src/main/python/pi.py
Spark SQL是Apache Spark处理结构化数据的模块
SparkSession用于创建数据框,将数据框注册为表,执行SQL查询,缓存表及读取Parquet文件
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("my app").config("spark.some.config.option","some-value").getOrCreate()
from pyspark.sql.types import *
推断Schemasc = spark.sparkContextlines = sc.textFile("people.txt")
parts = lines.map(lambda l:l.split(","))
people = parts.map(lambda p:Row(name=p[0],age=int(p[1])))
peopledf = spark.createDataFrame(people)指定Schemapeople = parts.map(lambda p:Row(name=p[0],age=int(p[1].strip())))
schemaString = "name age"
fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split()]
schema = StructType(fields)spark.createDataFrame(people,schema).show()
json
df = spark.read.json("customer.json")
df.show()df2 = spark.read.load("people.json",format = "json")
Parquet文件df3 = spark.read.load("users.parquet")
文本文件df4 = spark.read.text("people.txt")
df.dtypes 返回df的列名与数据类型
df.show() 显示df内容
df.head() 返回前n行数据
df.first() 返回第一行数据
df.take(2) 返回前两行数据
df.schema 返回df的schema
df.describe().show() 汇总统计数据
df.columns 返回df列名
df.count() 返回df的行数
df.distinct().count() 返回df中不重复的行数
df.printSchema() 返回df的Schema
df.explain() 返回逻辑与实体方案
df = df.dropDuplicates()
from pyspark.sql import functions as F
Select
df.select("firstName").show() 显示firstName列的所有条目
df.select("firstName","lastName".show())
df.select("firstName","age",
explode("phoneNumber") 显示firstName、age的所有条目和类型
.alias("contactInfo"))
.select("ContactInfo.type","firstName","age")
df.select(df["firstName"],df["age"]+1).show() 显示firstName和age列的所有记录添加
df.select(df["age"]>24).show() 显示所有小于24的记录
When
df.select("firstName",F.when(df.age>30,1)) 显示firstName,且大于30岁显示1,小于30显示0
.otherwise(0).show()
df[df.firstName.isin("Jane","Boris")].collect() 显示符合特定条件的firstName列的记录
Like
df.select("firstName",df.lastName, 显示lastName列中包含Smith的firstName列的记录
like("Smith")).show()
Startswith-Endwith
df.select("firstName",df.lastName. 显示lastName列中以Sm开头的firstName列的记录
startswith("Sm")).show()
df.select(df.lastName.endswith("th")).show() 显示以th结尾的lastName
Substring
df.select(df.firstName.substr(1,3).alias("name"))返回firstName的子字符串
Between
df.select(df.age.between(22,24)).show() 显示介于22到24直接的age列的所有记录
df = df.withColumn('city',df.address.city)
.withColumn('postalCode',df.address.postalCode)
.withColumn('state',df.address.state)
.withColumn('streetAddress',df.address.streetAddress)
.withColumn('telePhoneNumber',explode(df.phoneNumber.number))
.withColumn('telePhoneType',explode(df.phoneNumber.type))
df = df.withColumnRenamed('telePhoneNumber','phoneNumber')
df = df.drop("address","phoneNumber")
df = df.drop(df.address).drop(df.phoneNumber)
df.groupBy("age").count().show() 按age列分组,统计每组人数
df.filter(df["age"]>24).show() 按age列筛选,保留年龄大于24岁的
peopledf.sort(peopledf.age.desc()).collect()
df.sort("age",ascending=False).collect()
df.orderBy(["age","city"],ascending=[0,1]).collect()
df.na.fill(50).show() 用一个值替换空值
df.na.drop().show() 去除df中为空值的行
df.na.replace(10,20).show() 用一个值去替换另一个值
df.repartition(10).rdd.getNumPartitions() 将df拆分为10个分区
df.coalesce(1).rdd.getNumPartitions() 将df合并为1个分区
peopledf.createGlobalTempView("people")
df.createTempView("customer")
df.createOrReplaceTempView("customer")
df = spark.sql("select * from customer").show()
peopledf = spark.sql("select * from global_temp.people").show()
rdd1 = df.rdd 将df转为rdd
df.toJSON().first() 将df转为rdd字符串df.toPandas() 将df的内容转为Pandas的数据框
df.select("firstName","city").write.save("nameAndCity.parquet")
df.select("firstName","age").write.save("nameAndAges.json",format="json")
spark.stop()