安装Anaconda

  • 创建 .condarc文件添加镜像源
  • 创建环境 conda create -n xxx python=3.7
  • 查看安装环境列表 conda env list
  • 进入创建的环境 conda activate <环境名>
  • 退出环境 conda deactivate

Spark模块

Spark架构

  • 资源管理层面
    • 集群资源管理者 Master
    • 单机资源管理者 Worker
  • 任务计算层面
    • 单任务管理者 Driver
    • 单任务执行者 Executor

local模式
启动一个 JVM Process进程(包含多个线程)执行 Task
可以限制模拟 Spark集成环境的线程数量
* Local[N] or Local[*] (*按照cpu最多核心设置线程数)

StandAlone架构
StandAlone集群在进程上主要有3类进程:
* 主节点 Master: 管理整个集群资源,并托管运行各个任务的Dirver
* 从节点 Wokers: Woker角色管理每个机器的资源,分配对应的资源来运行 Executor(task)
* 历史服务器 HistoryServer(可选): 保存事件日志到HDFS

SparkOnYarn

RDD

定义
Resilient Distributed Dataset 弹性分布式数据集

RDD算子
分布式集合上的 API 称为算子,可分成以下两类

  • Transformation
    返回值仍然是RDD, 这种算子是 lazy懒加载, 如果没有action算子, Transformation算子是不工作的

  • Action算子
    返回值不是 RDD的算子

常用Transformation算子

  • map(func)

  • flatMap(func)
    rdd执行map操作, 然后解除嵌套

    1
    2
    lt = [[1,2,3], [4,5,6]]
    # -> lt = [1,2,3,4,5,6]
  • reauceByKey(func)

    1
    # func(v, v) -> v
  • mapValues(func)
    对二元组的 value 执行 map操作

  • groupBy(func)

    1
    2
    3
    4
    5
    6
    7
    from pyspark import SparkContext
    if et _name__ == '__main__':
    sc = SparkContext('local[*]', 'test')
    rdd = sc.parallelize([('a',1), ('a',1), ('b', 1), ('c', 1)])
    result = rdd.groupBy(lambda x: x[0])
    print(result.map(lambda x: (x[0], list(x[1]))).collect())
    #输出 [('b', [('b', 1)]), ('c', [('c', 1)]), ('a', [('a', 1), ('a', 1)])]
  • groupByKey()

  • Filter(func)

    1
    # func(T) -> bool
  • distinct() 去重

  • union(other_rdd) 合并

  • join(other_rdd) 内连接

    • leftOuterJoin(other_rdd) 左外连接
    • rightOuterJoin(other_rdd) 右外连接
      只能作用于二元组, 根据 key 值进行关联
  • intersection(other_rdd) 求交集

  • glom 按分区进行嵌套

  • sortBy(func, ascending=True or False, numPartitions=num)

    • func(T) -> U
    • True or False
    • 用多少分区进行排序
  • sortByKey(ascending=True or False, numPartitions=num, keyfunc)

常用Action算子

  • conuntByKey()

  • collect()
    RDD 各个分区内的数据统一收集到 Driver 中形成一个 List 对象

  • redece()

  • fold()
    带有初始值的聚合 (作用在分区内和分区外)

  • first() 取出第一个元素

  • take(n)
    将前 n 个元素以 List 形式返回

  • top(n)
    对结果进行降序排序, 取前 n 个元素

  • takeSample(arg1, arg2, arg3)

    • arg2: 是否允许抽到同一个数据
    • arg3: 抽样数量
    • arg3: 随机数种子
  • takeOrdered(n, func)

    • n: 取前 n个数据
    • 对排序的数据进行更改(不会改变数据本身),默认为升序
  • foreach()
    map 作用相同,但是没有返回值

  • saveAsTextFile(path)
    RDD 数据写入文本文件中

  • mapPartition(func)
    一次传递的数据是一整个分区的数据作为一个迭代器(一次性list)对象传入过来

  • foreachPartition(func)

  • partitionBy(n, func)

    • n: 重新分区后分区的数量
    • func: 自定义分区规则 (x) -> int(k) (0 <= k < n)
  • repartition(n)
    重新进行分区

RDD的缓存

  • rdd的数据是过程数据

    • cache() 缓存到内存中
    • persist(StoregeLevel.MEMORY_ONLY)
    • persist(StoregeLevel.MEMORY_ONLY_2) 生成两个副本
    • persist(StoregeLevel.DISK_ONLY)
    • persist(StoregeLevel.MEMORY_AND_DISK) 先放入内存,空间不够时放入硬盘
  • checkPoint 仅支持硬盘存储 (不保留血缘关系)

广播变量

1
2
3
#将本地变量v标记成广播变量
broadcast = sc.broadcast(v)
value = broadcast.value

累加器

1
cnt = ac.accumulator(初始值)

简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from pyspark import SparkContext
import re

if __name__ == '__main__':
sc = SparkContext('local[*]', 'test')
symbol_char = ['\'', ',', '-', '.']
bd = sc.broadcast(symbol_char)
file_rdd = sc.textFile('file:///home/yuki/RDD/info.txt')
sy_cnt = sc.accumulator(0)

line_rdd = file_rdd.filter(lambda line: line.strip())

def Sepration(line):
sym_char = bd.value
line = line.strip()
for c in sym_char:
line = line.replace(c, ' ' + c + ' ')
print(line)
return line

data_rdd = line_rdd.map(Sepration)

word_rdd = data_rdd.flatMap(lambda line: re.split("\s+", line))

def filter_func(data):
global sy_cnt
sym_char = bd.value
if data in sym_char:
sy_cnt += 1
return False
else:
return True

info_rdd = word_rdd.filter(filter_func)

result_rdd = info_rdd.map(lambda x: (x, 1)).\
reduceByKey(lambda a,b: a + b)
print('word_cnt:', result_rdd.collect())
print('char_cnt:', sy_cnt)

SparkSQL

DataFrame的组成

  • 结构层面
    • StructType (表结构)
    • StructField (列信息)
  • 数据层面
    • Row
    • Column

创建 DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyspark.sql import SparkSession

if __name__ == '__main__':
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()


sc = spark.sparkContext
rdd = sc.textFile("file:///home/yuki/SparkSQL/info.txt").\
map(lambda x: x.split(',')).\
map(lambda x: (x[0], int(x[1])))

df = spark.createDataFrame(rdd, schema=['name', 'age'])

df.printSchema()
df.createOrReplaceTempView("info")
spark.sql("SELECT * FROM info WHERE age < 22").show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':

spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()

sc = spark.sparkContext

rdd = sc.textFile("file:///home/yuki/SparkSQL/info.txt").\
map(lambda x: x.split(',')).\
map(lambda x: (x[0], int(x[1])))

#toDF的方式构建DataFrame
#df1 = rdd.toDF(["name", "age"])
#df1.printSchema()
#df1.show()

schema = StructType().add("name", StringType(), nullable=True).\
add("age", IntegerType(), nullable=False)
df2 = rdd.toDF(schema=schema)
df2.printSchema()
df2.show()

通过SparkSQL的统一 API 进行数据读取构建

sparksession.read.format(“text|csv|json|…”)
.option(“k”, “V”) 可选
.schema(“StructType | String”)
.load(“path”)

  • text

    1
    2
    3
    4
    schema = StructType().add("data", StringType(), nullable=True)
    df = spark.read.format("text").\
    schema(schema=schema).\
    load("file:///home/yuki/SparkSQL/info.txt")
  • json

    1
    df = spark.read.format("json").load("path")