本文介绍如何利用Hadoop Streaming任务进行数以十亿计的大规模字段提取。
前言
本文介绍如何利用Hadoop Streaming任务进行数以十亿计的大规模字段提取。如有谬误请联系指出,本文遵守CC 4.0 BY-SA版权协议,转载请联系作者并注明出处,谢谢。
e-mail: FesianXu@gmail.com
github: https://github.com/FesianXu
知乎专栏: 计算机视觉/计算机图形理论与应用
微信公众号:机器学习杂货铺3号店
假设现在有两个数据库A和B,在A里面存储了几十亿的url,而在B中储存了数以百亿计的url对应的正排信息,这些正排信息通过json格式进行组织。如下所示1
2
3
4
5# each row in A database
https://www.bilibili.com/video/BV1Dg41167zC
https://www.bilibili.com/video/BV1PU4y1678L
...
https://www.bilibili.com/video/BV1jV4y147jb1
2
3
4
5# each row in B database
{"loc":"https://www.bilibili.com/video/BV1Dg41167zC","loc_info":{"data_size":100,"timestamp":20200810}}
{"loc":"https://www.bilibili.com/video/BV1PU4y1678L","loc_info":{"data_size":120,"timestamp":20200512}}
...
{"loc":"https://www.bilibili.com/video/BV1jV4y147jb","loc_info":{"data_size":123,"timestamp":20210302}}
当然,数据库B里面的数据显然不是按照和数据库A一样的顺序排列的,这里只是为了举例方便而让他们具有一样的序而已。显然不可能对于每个来自于数据库A的url都在B里面遍历一遍以查找对应的正排信息,这样的计算复杂度将会是无法接受的AAAAA
(在sort阶段是按照字符的字典序进行排序的),因此A的mapper输出结果如(url和标识符之间通过\t
隔开):1
2
3
4
5# each row in A database
https://www.bilibili.com/video/BV1Dg41167zC AAAAA
https://www.bilibili.com/video/BV1PU4y1678L AAAAA
...
https://www.bilibili.com/video/BV1jV4y147jb AAAAA
同时解析出来的B数据和其正排信息,如下所示:1
2
3
4https://www.bilibili.com/video/BV1Dg41167zC loc_info:100
https://www.bilibili.com/video/BV1PU4y1678L loc_info:120
...
https://www.bilibili.com/video/BV1jV4y147jb loc_info:123
那么将所有mapper的输出进行合并,并且sort后,其输出如:1
2
3
4
5
6
7https://www.bilibili.com/video/BV1Dg41167zC AAAAA
https://www.bilibili.com/video/BV1Dg41167zC loc_info:100
https://www.bilibili.com/video/BV1PU4y1678L AAAAA
https://www.bilibili.com/video/BV1PU4y1678L loc_info:120
...
https://www.bilibili.com/video/BV1jV4y147jb AAAAA
https://www.bilibili.com/video/BV1jV4y147jb loc_info:123
可以看到,如果B里有A对应url的正排信息的话,来自于A的数据总是在B的前面,通过这个线索即可完成url的正排字段提取。mapper和reducer的示意代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27# mapper code, the input of mapper could come from both A and B
import sys
import json
for line in sys.stdin:
line = line.strip()
line_seg = line.split("\t")
if len(line_seg) == 1 and line[:4] == "http":
# if come from A
print("{}\t{}".format(
line_seg[0], "AAAAA"
))
else:
# if come from B
try:
datapack = json.loads(line)
except:
continue
try:
target_seg = extract_seg(datapack)
loc = extract_loc(datapack)
print("{}\t{}".format(
loc, target_seg
))
except:
continue1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33# reducer code, the input come from the sorted result of A+B output
import sys
old_key = ""
content = ""
is_from_A = False
def output(key, content, is_from_A):
if key == "" or content == "" or (not is_from_A):
# key and content must be valid and is_from_A must be True
return
print("{}\t{}".format(
key, content
))
# 初始化
for line in sys.stdin:
line = line.strip()
line_seg = line.split("\t")
key = line_seg[0]
if key != old_key:
output(old_key, content, is_from_A)
old_key = key
content = ""
is_from_A = False
if line_seg[1] == "AAAAA":
is_from_A = True
else:
content = line_seg[1]
# 循环末尾
output(old_key, content, is_from_A) # 后处理
这里需要简单对reducer的代码进行解释,我们从上面的分析已经可以知道reducer的输入,来自于对A和B的解析结果,并且进行排序后的输出。因此reducer的目的很显然,就是判断当前url是否来自于A,同时其后继行来自于B并且url与A一致,那么就将其正排信息输出即可。同时,由于标识符为“AAAAA”,对于同一个url数据而言,来自于A的数据总是排序在前,来自于B的数据(由于是字典序更小的loc_info
开头)则总是接续其后。因此,我们的reducer有两件事情需要做的,第一判断old_key
是否和当前的key
相同,如果是,那么就认为A的url在B中匹配到了。其次还得考虑当前数据是否来自于A,当然这个判断简单,只要通过标识符判断即可。最后别忘了对最后一个结果进行输出即可,我们的输出output()
是在切换新的key的时候进行输出的,因此最后一行输出在循环中是不会输出的,需要在循环外进行输出。给一个运行例子,考虑我们的模拟输入(其中的AAAAA为标识符,content为正排内容):1
2
3
4
5
6
7url0 AAAAA
url1 content
url2 AAAAA
url2 content
url3 content
url4 AAAAA
url4 content
那么我们的reducer运行结果和关键变量的结果如下表所示(运行到循环末尾处作为断点)
循环数 | old_key | key | content | is_from_A | 当前是否输出output |
---|---|---|---|---|---|
初始化 | "" | "" | "" | False | N |
1 | "url0" | "url0" | "" | True | N |
2 | "url1" | "url1" | content | False | N |
3 | "url2" | "url2" | "" | True | N |
4 | "url2" | "url2" | content | True | N |
5 | “url3” | “url3” | "" | False | Y (根据上一次结果,也即是4进行输出) |
6 | “url4” | “url4” | "" | True | N |
7 | “url4” | “url4” | content | True | N |
后处理 | “url4” | “url4” | content | True | Y(根据上一次结果,也即是7进行输出) |
通过这种处理方法,能够将匹配不上的结果跳过,而将能匹配上的结果进行输出。