文章目录
  1. 1. Abstract
  2. 2. 任务描述
    1. 2.1. 输入格式
  3. 3. 方法思路
  4. 4. 初步探索
  5. 5. 进一步的尝试
    1. 5.1. 尝试一
    2. 5.2. 尝试二
    3. 5.3. 此时的程序性能
  6. 6. 继续优化
  7. 7. 程序性能分析
  8. 8. 后话
  9. 9. References & Links

Abstract

图的三角形计数问题是一个基本的图计算问题,是很多复杂网络分析(比如社交网络分析)的基础。
在一个随机图中三角形的期望数目大约是$\frac {4} {3} (m/n)^3$, 其中m是边数,n是点数。而在一张社会网络关系图中,我们预期其中的三角形数会远远高于随机图中的三角形数目,原因在于A,B是朋友,B,C是朋友时,A,C也是朋友/可能成为朋友的概率大于随机值。所以三角形技术能够帮助我们度量一张图看上去像社会网络的程度,也可以帮我们判断社会网络的成熟度或者社区的年龄。
目前图的三角形计数问题已经成为了Spark系统中GraphX图计算库所提供的一个算法级API。本次实验任务就是要在Hadoop系统上实现 Twitter 社交网络图的三角形计数任务。

任务描述

一个社交网络可以看做是一张图(离散数学中的图)。社交网络中的人对应于图的顶点;社交网络中的人际关系对应于图中的边。在本次实验任务中,我们只考虑一种关系——用户之间的关注关系。假设“王五”在 Twitter/微博中关注了“李四”,则在社交网络图中,有一条对应的从“王五”指向“李四”的有向边。图 1 中展示了一个简单的社交网络图,人之间的关注关系通过图中的有向边标识了出来。本次的实验任务就是在给定的社交网络图中,统计图中所有三角形的数量。
在统计前,需要先进行有向边到无向边的转换,依据如下逻辑转换:
$$IF~(A→B)~OR~(B→A)~ THEN~A→B$$

“A→B”表示从顶点 A 到顶点 B 有一条有向边。A-B 表示顶点 A 和顶点 B 之间有一条无向边。


一个示例见图 1,图 1 右侧的图就是左侧的图去除边方向后对应的无向图。

请在无向图上统计三角形的个数。在图 1 的例子中,一共有 3 个三角形。
本次实验将提供一个 Twitter 局部关系图[1]作为输入数据(给出的图是有向图),请统计该图对应的无向图中的三角形个数。

输入格式

输入数据仅一个文件。该文件由若干行组成,每一行由两个以空格分隔的整数组成:
A B
A,B 分别是两个顶点的 ID。这一行记录表示图中具有一条由 A 到 B 的有向边。整个图的
结构由该文件唯一确定。

下面的框中是文件部分内容的示例:

方法思路

找三角形在离散数学中有数学上的方法,即矩阵自乘两次,矩阵自乘一次后的结果矩阵的对角线上的元素$A_{ii}$代表的是$i$节点经过两条边再回到该点的路径数目,再乘一次后的结果对角线上的元素表示经过三条边回到该点的路径数目,这时候就有了一个三角形,由于每个在三角形中的点都会从两个方向出发走三条边回到自己,所以,每个三角形都做了点数(3)乘以方向数(2)次计算,所以应该将最后的结果除以6得到最终结果。

另一种想法是,直观的去找,要找三角形,先找到两条边,比如要找$\nabla ABC$,可以先找到A->B, A->C,那么下一步就是找B->C或者C->B了,由于我们先对边做了无向处理,将所有的无向边的端点按字典序排序好了,所以相当于找B->C就可以了。所以考虑有A->B和A->C时发射一个B->C的需求,再发射自己A->B,A->C的供给,如果别的地方需要A->B或者A->C的边,那么可以直接供给,一旦有边,就可以供给给任意多的需要,这时候看最终某条边到底有多少需求,然后有没有供给,如果有供给,则三角形数目更新为加上这条边需求的数目。同样的,如果别的地方有B->C这条边的供给,那么此时的B->C需求就得到了满足,相当于新发现了一个三角形。按照这种Feed&Eat的方式,就可以使用MapReduce来处理社交网络三角形统计了。

初步探索

初步探索的算法分以下四个模块,

  • GraphUndirecter.java

图的无向化,按照OR逻辑,$IF~(A→B)~OR~(B→A)~ THEN~A-B$,如果存在某一个方向的边,则按字典序对两个端点从小到大排序,发射一条从小指到大的边,为了防止发射重复边,全局采用一个HashSet来判重。并且需要判断自环。这个类只需要读取文件发射边即可,无需Reduce步骤。

1
2
3
4
Key: 输入文件的行偏移
Value: "A B" (A,B为点的ID)
Output Key: "A#B"
Output Value: "#"

伪代码如下:

1
2
3
4
5
6
7
8
9
void map(Object key, Text value, Context context)
st = Tokenizer(value)

A = st.nextToken()
B = st.nextToken()
if A != B: //判自环
if A < B:
emit("A#B #")
else:
emit("B#A #")

  • TilingGraph.java

将上一步得到的边展开成邻接表一样的形式,方便发射需求边和供给边。如: A->[B, C, D],此时我们会发射需求边: B->C, B->D, C->D,和供给边A->B, A->C, A->D。由于前面已经将

1
2
3
4
Key: 输入文件的行偏移
Value: "A#B #" (A,B为点的ID)
Output Key: "A#B"
Output Value: "#" 或者 "@" ("#"供给,"@"需求)

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void map(Object key, Text value)
st = Tokenizer(value)
A = st.nextToken()
B = st.nextToken()
emit(A,B)

void reduce(Text key, Iterable<Text> values)
for v in values:
emit("key#v", "#")
for i = 1 -> len(values):
for j = i+1 -> len(values):
if values[i] < values[j]:
emit("values[i]#values[j]", "@")
else:
emit("values[j]#values[i]", "@")

  • FeedAndCount.java

得到了所有边是否有需求有供给的情况我们只需要统计每条边是否有供给,如果有,那么它有多少需求,将这些需求满足形成若干个三角形,以此累加三角形数。

1
2
3
Key: 输入文件的行偏移
Value: "A#B #" 或者 "A#B @"
Output Key: <Result of Triangle Number>

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RESULT = 0
void map(Object key, Text value)
st = Tokenizer(value)
A = st.nextToken()
B = st.nextToken()
emit(A, B)

void reduce(Text key, Iterable<Text> values)
int jin = 0, at = 0;
for (Text val : values) {
if (val.toString().equals("#"))
jin++; // "#"
else if (val.toString().equals("@"))
at++; // "@"数目加
}
if (jin > 0) RESULT += at; // 如果有("#"),则统计at个三角形

  • TriangleCountDriver.java

驱动三个Job的执行,读取用户命令行参数,然后调用以上三个类,统筹整个程序的执行。

1
2
3
4
5
GraphUndirecter.main(new String[]{myArgs[0], myArgs[1]});  // input -> middle1

TilingGraph.main(new String[]{myArgs[1], myArgs[2]}); // middle1 -> middle2

FeedAndCount.main(new String[]{myArgs[2], myArgs[3]}); // middle2 -> output

调优后最好的成绩跑twitter数据集用了6分多钟,跑google数据集只跑了一次,跑了20+小时,所以效率并不是最优的。

进一步的尝试

由于以上三个job的方式仍需要6分钟,感觉还是太慢,想了想能否优化一下呢,因为这里会读三次磁盘(数据,中间结果1,中间结果2),这样磁盘读写的消耗是很大的。仔细分析一下是可以缩减磁盘读写次数的。

尝试一

看到第一个类其实就是map一下,去一下重,把无向图做出来,没有reduce,所以想直接一步做图的无向化和图展开,发射需求和供给边,即

1
2
3
4
5
6
7
8
第一步: 
map:
"A B" -> "key:A, value:B" (为了不发射重边,做了一个HashSet判重)
reduce:
A, [B, C, D...,S] -> "A#B #", "A#C #", ... , "B#C @", ...

第二步:
统计

这样直接两步做完,第一步工作量会大一点。可是很遗憾,在本地跑是完全可以的,可是放到集群上运行,map到21%就map不动了,可能是HashSet同步问题的原因?或者是shuffle量太大的原因?不得而知,没有报错也看不到日志,怀疑是后者,量太大导致内存不够,一直在GC,导致卡死。因为缩减数据规模还是勉强能跑的,不过才30M(twitter)就这样了,肯定不可取。

尝试二

上一步的失败可能是因为第一个job工作量太大了,既要判重发射所有无向边,又要发射存在的边和枚举近邻之间发射需求边,这对它不公平,所以考虑将工作分配均匀一下,这样就得到了下面一种方法:
(注:这里的A,B,C,D…都代表某一个点)

1
2
3
4
5
6
7
8
9
10
11
JOB1:
Key: 输入文件的行偏移
Value: "A B"
Output key: "A"
Output value: "B,C,E,D,...,S"

JOB2:
Key: 输入文件的行偏移
Value: "A B,C,E,D,...,S"
Output key: "A#B"
Output value: "#" 或者 "@"

好吧,这样终于可行了,结果也是对的,但是结果却并让人高兴不起来,因为效率上并没有提升,反而比读三次磁盘的方法慢得多,跑了将近11分钟!这也太诡异了。绝对还有什么地方不对,再想想。其实我们没有注意到到底是不是并行在做?查找资料得知,hadoop mapreduce每个作业如果没有配置map,reduce数的话,默认map任务数是2,而默认reduce数为1!这能行?这还叫并行?全挤到一两个reduce task去做,能不慢么?
于是看看哪些地方可以增加reduce数。可以看到,第一种方法的GraphUndirecter虽然reduce数为0,但是TilingGraph的reduce是可以许多节点来做的,我们把numReduceTasks设为4,或者6,然后这个reduce数一般就是下一步的map数,然后最后FeedAndCount由于需要计算总数,所以必须一个reduce节点来做。所以总的来说,我们可以把中间步用多个reduce task来做。
同样的,尝试二的方法也可以在GraphUndirecterAndTiling步多设几个reduce来做。

此时的程序性能

最终采用了尝试二中的方法,两个job完成这个工作,跑了以下2个数据集,效果勉强还可以。大概是4分钟跑完twitter数据集,比原来缩短一倍,跑Google+数据集也比一开始跑的20+个小时缩短到了13个小时。但是显然,这还不够快!天下武功,唯快不破。继续优化吧。

数据集 三角形个数 Driver程序在集群上的运行时间(s)
Twitter 13082506 247
Google+ 1073677742 47760

继续优化

由上面的尝试二,这里又有了启发,我们可以看到,最后一步由1个reduce去统计和求和,这里要求总数的话又必须只让一个reduce来做,鸭梨不要太大,可以看到他做的两个工作实际上差异很大,一个是要扫描统计多少需要,另一个只要简单加和一下就行了,为了减轻该同志的鸭梨,那么我们再造一个job,把这两个工作分开是不是会快一点呢。猜测得到了证实,确实是可以的!意思就是,前面第三个job只统计求局部和(有一个扫描过程),这样的话我们可以搞很多reduce来做局部统计求和,然后把它们存到中间文件,最后由job4来对所有这些数求和,因为最后一步就是发射求和,所以时间远比统计来得快。
所以最终的JOB工作分配如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
JOB1:
Key: 输入文件的行偏移
Value: "A B"
Output key: "A"
Output value: "B,C,E,D,...,S"

JOB2:
Key: 输入文件的行偏移
Value: "A B,C,E,D,...,S"
MapOutputKey: "A#B"
MapOutputValue: 1(have) 或者 2(need)
Output key: NULL
Output value: 三角形个数(局部) //这里做成VIntWritable类型,把单个字符的2个字节空间做成1个字节的VIntWritable

JOB3:
key: 中间文件的行偏移
Value: 三角形个数
OutputKey: "Total Triangles: "
OutputValue: 三角形总数

最终算法如下:

  • GraphUndirecterAndTiling
1
2
3
4
5
6
7
8
void map(key, value):
A = value[1]

B = value[2]
if A != B:
emit(As, Bs) // As = min(A, B), Bs = max(A,B)

void reduce(key, values):
emit(key, ",".join(values)) // A B,C,D...,S

  • JustCount
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void map(key, value):
for val in value.split(','):
emit(key#val, 1) // 供给
for i, j in value.split(',') and i < j:
emit(Vs1#Vs2, 2) // 需求 Vs1 = min(values[i], values[j]), Vs2 = max(values[i], values[j])

void reduce(key, values):
int have = 0, need = 0, res = 0;
for (val : values) {
if (val.equals(1))
have++;
else if (val.equals(2))
need++;
}
if (jin > 0) res += need; // 如果有(have),则统计need个三角形
if res > 0:
write(res)
  • CollectSum
1
2
3
4
5
6
7
void map(key, value):
emit("#", value)

void reduce(key, values):
for val in values:
sum += val
write("Total Triangles: ", sum)
  • TCDriver
1
2
3
4
5
GraphUndirecterAndTiling.main(new String[]{myArgs[0], myArgs[1], myArgs[4]});  // input -> middle1
// myArgs[4] is numReduceTasks
JustCount.main(new String[]{myArgs[1], myArgs[2], myArgs[4]}); // middle1 -> middle2

CollectSum.main(new String[]{myArgs[2], myArgs[3]}); // middle2 -> output

程序性能分析

最终程序性能得到了很大的提高,耗时如下:

数据集 三角形个数 Driver程序在集群上的运行时间(s)
Twitter 13082506 108
Google+ 1073677742 01:54:28 (6868s)

效果还是挺不错的,不到两分钟完成了31M twitter数据集的统计,不到两个小时完成了Google+ 570多M的数据统计,可能也跟集群当前的负载有关系,集群负载轻的时候应该会快一些,较前面的方法提升了两倍和6倍多,并且结果也是对的,

这里的算法和reduce数还是可以调整的,并且在某些条件下,应该还是能够进一步优化下去的。没有最快,只有更快,但是由于时间关系,就不继续探索下去了,读者自探,欢迎留言评论。

后话

这里由于想进行大量的优化,所以用到了一些技巧,也是从别人那里学来的,比如:

  • 在在乎网络传输的时候,可适当使用压缩(我没用)
  • 重用Writable类型。 各类Writable占用空间如下:

Text类采用的UTF-8编码,使用变长的1~4个字节对字符进行编码。对于ASCII字符只使用1个字节,而对于High ASCII和多字节字符使用2~4个字节表示,而不像Java基本Character类的UTF-16编码,每个字符用两个字节表示。所以注意,IntWritable是固定4个字节存储的,比一个字符存储量大,所以如果一个字符能表示,那么就用一个字符,如果不能用一个字符,那么可以考虑VIntWritable类型,VIntWritable类型则根据数值的大小使用相应的字节长度表示,当数值在-112~127之间时使用1个字节表示,在-112~127范围之外的数值使用头一个字节表示该数值的正负符号以及字节长度(zero-compressed encoded integer)。
IntWritable适合数值均匀分布的情形,而变长的Writable类型适合数值分布不均匀的情形,一般情况下变长的Writable类型更节省空间,因为大多数情况下数值是不均匀的,对于整数类型的Writable选择,Zhou’s Blog建议:

  1. 除非对数据的均匀分布很有把握,否则使用变长Writable类型
  2. 除非数据的取值区间确定在int范围之内,否则为了程序的可扩展性,请选择VLongWritable类型

所以多使用VIntWritable, VLongWritable类型也不失一种好的选择。当然,能用一个ASCII字符表示那就用Text也就行了。

所以这里面还是有很多学问的。

大数据 | Big Data