1.准备
1.1 创建数据库,使用adventureWorks数据库

1.2 创建lg101角色,并且分配只读权限
- 创建角色和密码
CREATE LOGIN lg101 WITH PASSWORD = '123131'
CREATE user lg101 LOGIN lg101
-
分配权限只读权限
image.png
1.3 将密码保存在key-value里
- 创建log101的username的secret
- 创建log101的password的secret
image.png
注意:
使用Key-value的时候,需要在access control里面分配权限
1.4 创建IR

1.5 创建与OnPrem的LinkService
1.5.1 使用key-value来读取Password

1.5.2 设置key-value的linkservice

1.5.3在key-value里添加ADF的权限
-
当我们设置好了上步的inkservice的时候,发现报错,原因是因为ADF无权限使用kv
image.png
-
添加secret的使用权限给ADF
image.png
1.5.4 添加ADF的OnPrem的LinkService
- 用户名使用了不加密,方便使用
-
password使用kv里存储的
image.png
2. 创建pipeline
2.1 创建lookup找出数据库需要的表复制
2.1.1 读取当前数据的schema和表名
SELECT t.name AS table_name, s.name AS schema_name
FROM sys.tables t
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
where t.name in ('Address','Customer','CustomerAddress','Product','ProductCategory',
'ProductDescription','ProductModel','ProductModelIllustration','SalesOrderDetail','SalesOrderHeader')
注意:如果表名前面有schema一定要读取,否则无法读取数据
2.1.2 读取当前数据的schema和表名 创建forEach循环,循环Loopup的output

2.1.3 读取当前数据的schema和表名 创建copy程序,设置source

2.1.4 读取当前数据的schema和表名设置sink

3. 使用DataBricks清洗数据
3.1创建cluster

3.2 创建blob的mounts
dbutils.fs.mount(source = 'wasbs://input@dl108lg.blob.core.windows.net',
mount_point= '/mnt/blob108_input',
extra_configs = {'fs.azure.account.key.dl108lg.blob.core.windows.net':'fuDFNS1ziD9Lw4aeH/N6gw7+4'}
)
3.3 创建文件夹
3.3.1 查看所有挂载点
dbutils.fs.mounts()
3.3.2 进入到指定文件夹
dbutils.fs.ls('/mnt/blob108_input')
3.3.3 创建新的文件夹
dbutils.fs.mkdirs('/mnt/blob108_input/golden')
3.3.4 删除多余的文件夹
dbutils.fs.rm('/mnt/blob108_input/golder', True) # 第二个参数为True表示递归删除子目录和文件
3.4 读取文件并处理
3.4.1 spark读取address文件
df = spark.read.load('/mnt/blob108_input/bronze/Address', format = 'csv', sep = ',', header = True, escape = '"', inferschema = True)
display(df)
网友评论