博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark 实时计算整合案例
阅读量:5836 次
发布时间:2019-06-18

本文共 8767 字,大约阅读时间需要 29 分钟。

1.概述

  最近有同学问道,除了使用 Storm 充当实时计算的模型外,还有木有其他的方式来实现实时计算的业务。了解到,在使用 Storm 时,需要编写基于编程语言的代码。比如,要实现一个流水指标的统计,需要去编写相应的业务代码,能不能有一种简便的方式来实现这一需求。在解答了该同学的疑惑后,整理了该实现方案的一个案例,供后面的同学学习参考。

2.内容

  实现该方案,整体的流程是不变的,我这里只是替换了其计算模型,将 Storm 替换为 Spark,原先的数据收集,存储依然可以保留。

2.1 Spark Overview

  Spark 出来也是很久了,说起它,应该并不会陌生。它是一个开源的类似于 Hadoop MapReduce 的通用并行计算模型,它拥有 Hadoop MapReduce 所具有的有点,但与其不同的是,MapReduce 的 JOB 中间输出结果可以保存在内存中,不再需要回写磁盘,因而,Spark 能更好的适用于需要迭代的业务场景。

2.2 Flow

  上面只是对 Spark 进行了一个简要的概述,让大家知道其作用,由于本篇博客的主要内容并不是讲述 Spark 的工作原理和计算方法,多的内容,这里笔者就不再赘述,若是大家想详细了解 Spark 的相关内容,可参考官方文档。[]

  接下来,笔者为大家呈现本案例的一个实现流程图,如下图所示:

  通过上图,我们可以看出,首先是采集上报的日志数据,将其存放于消息中间件,这里消息中间件采用的是 Kafka,然后在使用计算模型按照业务指标实现相应的计算内容,最后是将计算后的结果进行持久化,DB 的选择可以多样化,这里笔者就直接使用了 Redis 来作为演示的存储介质,大家所示在使用中,可以替换该存储介质,比如将结果存放到 HDFS,HBase Cluster,或是 MySQL 等都行。这里,我们使用 Spark SQL 来替换掉 Storm 的业务实现编写。

3.实现

  在介绍完上面的内容后,我们接下来就去实现该内容,首先我们要生产数据源,实际的场景下,会有上报好的日志数据,这里,我们就直接写一个模拟数据类,实现代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
object 
KafkaIPLoginProducer {
  
private 
val 
uid 
= 
Array(
"123dfe"
"234weq"
,
"213ssf"
)
 
  
private 
val 
random 
= 
new 
Random()
 
  
private 
var 
pointer 
= 
-
1
 
  
def 
getUserID()
: 
String 
= 
{
    
pointer 
= 
pointer + 
1
    
if 
(pointer >
= 
users.length) {
      
pointer 
= 
0
      
uid(pointer)
    
else 
{
      
uid(pointer)
    
}
  
}
 
  
def 
plat()
: 
String 
= 
{
    
random.nextInt(
10
) + 
"10"
  
}
 
  
def 
ip()
: 
String 
= 
{
    
random.nextInt(
10
) + 
".12.1.211"
  
}
 
  
def 
country()
: 
String 
= 
{
    
"中国" 
+ random.nextInt(
10
)
  
}
 
  
def 
city()
: 
String 
= 
{
    
"深圳" 
+ random.nextInt(
10
)
  
}
 
  
def 
location()
: 
JSONArray 
= 
{
    
JSON.parseArray(
"[" 
+ random.nextInt(
10
) + 
"," 
+ random.nextInt(
10
) + 
"]"
)
  
}
 
  
def 
main(args
: 
Array[String])
: 
Unit 
= 
{
    
val 
topic 
= 
"test_data3"
    
val 
brokers 
= 
"dn1:9092,dn2:9092,dn3:9092"
    
val 
props 
= 
new 
Properties()
    
props.put(
"metadata.broker.list"
, brokers)
    
props.put(
"serializer.class"
"kafka.serializer.StringEncoder"
)
 
    
val 
kafkaConfig 
= 
new 
ProducerConfig(props)
    
val 
producer 
= 
new 
Producer[String, String](kafkaConfig)
 
    
while 
(
true
) {
      
val 
event 
= 
new 
JSONObject()
 
      
event
        
.put(
"_plat"
"1001"
)
        
.put(
"_uid"
"10001"
)
        
.put(
"_tm"
, (System.currentTimeMillis / 
1000
).toString())
        
.put(
"ip"
, ip)
        
.put(
"country"
, country)
        
.put(
"city"
, city)
        
.put(
"location"
, JSON.parseArray(
"[0,1]"
))
      
println(
"Message sent: " 
+ event)
      
producer.send(
new 
KeyedMessage[String, String](topic, event.toString))
       
      
event
        
.put(
"_plat"
"1001"
)
        
.put(
"_uid"
"10001"
)
        
.put(
"_tm"
, (System.currentTimeMillis / 
1000
).toString())
        
.put(
"ip"
, ip)
        
.put(
"country"
, country)
        
.put(
"city"
, city)
        
.put(
"location"
, JSON.parseArray(
"[0,1]"
))
      
println(
"Message sent: " 
+ event)
      
producer.send(
new 
KeyedMessage[String, String](topic, event.toString))
       
      
event
        
.put(
"_plat"
"1001"
)
        
.put(
"_uid"
"10002"
)
        
.put(
"_tm"
, (System.currentTimeMillis / 
1000
).toString())
        
.put(
"ip"
, ip)
        
.put(
"country"
, country)
        
.put(
"city"
, city)
        
.put(
"location"
, JSON.parseArray(
"[0,1]"
))
      
println(
"Message sent: " 
+ event)
      
producer.send(
new 
KeyedMessage[String, String](topic, event.toString))
 
      
event
        
.put(
"_plat"
"1002"
)
        
.put(
"_uid"
"10001"
)
        
.put(
"_tm"
, (System.currentTimeMillis / 
1000
).toString())
        
.put(
"ip"
, ip)
        
.put(
"country"
, country)
        
.put(
"city"
, city)
        
.put(
"location"
, JSON.parseArray(
"[0,1]"
))
      
println(
"Message sent: " 
+ event)
      
producer.send(
new 
KeyedMessage[String, String](topic, event.toString))
      
Thread.sleep(
30000
)
    
}
  
}
}

   上面代码,通过 Thread.sleep() 来控制数据生产的速度。接下来,我们来看看如何实现每个用户在各个区域所分布的情况,它是按照坐标分组,平台和用户ID过滤进行累加次数,逻辑用 SQL 实现较为简单,关键是在实现过程中需要注意的一些问题,比如对象的序列化问题。这里,细节的问题,我们先不讨论,先看下实现的代码,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
object 
IPLoginAnalytics {
 
  
def 
main(args
: 
Array[String])
: 
Unit 
= 
{
    
val 
sdf 
= 
new 
SimpleDateFormat(
"yyyyMMdd"
)
    
var 
masterUrl 
= 
"local[2]"
    
if 
(args.length > 
0
) {
      
masterUrl 
= 
args(
0
)
    
}
 
    
// Create a StreamingContext with the given master URL
    
val 
conf 
= 
new 
SparkConf().setMaster(masterUrl).setAppName(
"IPLoginCountStat"
)
    
val 
ssc 
= 
new 
StreamingContext(conf, Seconds(
5
))
 
    
// Kafka configurations
    
val 
topics 
= 
Set(
"test_data3"
)
    
val 
brokers 
= 
"dn1:9092,dn2:9092,dn3:9092"
    
val 
kafkaParams 
= 
Map[String, String](
      
"metadata.broker.list" 
-> brokers, 
"serializer.class" 
-> 
"kafka.serializer.StringEncoder"
)
 
    
val 
ipLoginHashKey 
= 
"mf::ip::login::" 
+ sdf.format(
new 
Date())
 
    
// Create a direct stream
    
val 
kafkaStream 
= 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
 
    
val 
events 
= 
kafkaStream.flatMap(line 
=
> {
      
val 
data 
= 
JSONObject.fromObject(line.
_
2
)
      
Some(data)
    
})
 
    
def 
func(iter
: 
Iterator[(String, String)])
: 
Unit 
= 
{
      
while 
(iter.hasNext) {
        
val 
item 
= 
iter.next()
        
println(item.
_
1 
"," 
+ item.
_
2
)
      
}
    
}
 
    
events.foreachRDD { rdd 
=
>
      
// Get the singleton instance of SQLContext
      
val 
sqlContext 
= 
SQLContextSingleton.getInstance(rdd.sparkContext)
      
import 
sqlContext.implicits.
_
      
// Convert RDD[String] to DataFrame
      
val 
wordsDataFrame 
= 
rdd.map(f 
=
> Record(f.getString(
"_plat"
), f.getString(
"_uid"
), f.getString(
"_tm"
), f.getString(
"country"
), f.getString(
"location"
))).toDF()
 
      
// Register as table
      
wordsDataFrame.registerTempTable(
"events"
)
      
// Do word count on table using SQL and print it
      
val 
wordCountsDataFrame 
= 
sqlContext.sql(
"select location,count(distinct plat,uid) as value from events where from_unixtime(tm,'yyyyMMdd') = '" 
+ sdf.format(
new 
Date()) + 
"' group by location"
)
      
var 
results 
= 
wordCountsDataFrame.collect().iterator
 
      
/**
       
* Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
       
*/
      
object 
InternalRedisClient 
extends 
Serializable {
 
        
@
transient 
private 
var 
pool
: 
JedisPool 
= 
null
 
        
def 
makePool(redisHost
: 
String, redisPort
: 
Int, redisTimeout
: 
Int,
          
maxTotal
: 
Int, maxIdle
: 
Int, minIdle
: 
Int)
: 
Unit 
= 
{
          
makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, 
true
false
10000
)
        
}
 
        
def 
makePool(redisHost
: 
String, redisPort
: 
Int, redisTimeout
: 
Int,
          
maxTotal
: 
Int, maxIdle
: 
Int, minIdle
: 
Int, testOnBorrow
: 
Boolean,
          
testOnReturn
: 
Boolean, maxWaitMillis
: 
Long)
: 
Unit 
= 
{
          
if 
(pool 
== 
null
) {
            
val 
poolConfig 
= 
new 
GenericObjectPoolConfig()
            
poolConfig.setMaxTotal(maxTotal)
            
poolConfig.setMaxIdle(maxIdle)
            
poolConfig.setMinIdle(minIdle)
            
poolConfig.setTestOnBorrow(testOnBorrow)
            
poolConfig.setTestOnReturn(testOnReturn)
            
poolConfig.setMaxWaitMillis(maxWaitMillis)
            
pool 
= 
new 
JedisPool(poolConfig, redisHost, redisPort, redisTimeout)
 
            
val 
hook 
= 
new 
Thread {
              
override 
def 
run 
= 
pool.destroy()
            
}
            
sys.addShutdownHook(hook.run)
          
}
        
}
 
        
def 
getPool
: 
JedisPool 
= 
{
          
assert(pool !
= 
null
)
          
pool
        
}
      
}
 
      
// Redis configurations
      
val 
maxTotal 
= 
10
      
val 
maxIdle 
= 
10
      
val 
minIdle 
= 
1
      
val 
redisHost 
= 
"dn1"
      
val 
redisPort 
= 
6379
      
val 
redisTimeout 
= 
30000
      
InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
      
val 
jedis 
= 
InternalRedisClient.getPool.getResource
      
while 
(results.hasNext) {
        
var 
item 
= 
results.next()
        
var 
key 
= 
item.getString(
0
)
        
var 
value 
= 
item.getLong(
1
)
        
jedis.hincrBy(ipLoginHashKey, key, value)
      
}
    
}
 
    
ssc.start()
    
ssc.awaitTermination()
 
  
}
}
 
/** Case class for converting RDD to DataFrame */
case 
class 
Record(plat
: 
String, uid
: 
String, tm
: 
String, country
: 
String, location
: 
String)
 
/** Lazily instantiated singleton instance of SQLContext */
object 
SQLContextSingleton {
 
  
@
transient 
private 
var 
instance
: 
SQLContext 
= 
_
 
  
def 
getInstance(sparkContext
: 
SparkContext)
: 
SQLContext 
= 
{
    
if 
(instance 
== 
null
) {
      
instance 
= 
new 
SQLContext(sparkContext)
    
}
    
instance
  
}
}

  我们在开发环境进行测试的时候,使用 local[k] 部署模式,在本地启动 K 个 Worker 线程来进行计算,而这 K 个 Worker 在同一个 JVM 中,上面的示例,默认使用 local[k] 模式。这里我们需要普及一下 Spark 的架构,架构图来自 Spark 的官网,[]

  这里,不管是在 local[k] 模式,Standalone 模式,还是 Mesos 或是 YARN 模式,整个 Spark Cluster 的结构都可以用改图来阐述,只是各个组件的运行环境略有不同,从而导致他们可能运行在分布式环境,本地环境,亦或是一个 JVM 实利当中。例如,在 local[k] 模式,上图表示在同一节点上的单个进程上的多个组件,而对于 YARN 模式,驱动程序是在 YARN Cluster 之外的节点上提交 Spark 应用,其他组件都是运行在 YARN Cluster 管理的节点上的。

  而对于 Spark Cluster 部署应用后,在进行相关计算的时候会将 RDD 数据集上的函数发送到集群中的 Worker 上的 Executor,然而,这些函数做操作的对象必须是可序列化的。上述代码利用 Scala 的语言特性,解决了这一问题。

4.结果预览

  在完成上述代码后,我们执行代码,看看预览结果如下,执行结果,如下所示:

4.1 启动生产线程

4.2 Redis 结果预览

5.总结

  整体的实现内容不算太复杂,统计的业务指标,这里我们使用 SQL 来完成这部分工作,对比 Storm 来说,我们专注 SQL 的编写就好,难度不算太大。可操作性较为友好。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

转载地址:http://wzfcx.baihongyu.com/

你可能感兴趣的文章
运维工程师在干什么学些什么?【致菜鸟】
查看>>
Linux中iptables详解
查看>>
java中回调函数以及关于包装类的Demo
查看>>
maven异常:missing artifact jdk.tools:jar:1.6
查看>>
终端安全求生指南(五)-——日志管理
查看>>
Nginx 使用 openssl 的自签名证书
查看>>
创业维艰、守成不易
查看>>
PHP环境安装套件:快速安装LAMP环境
查看>>
CSS3
查看>>
ul下的li浮动,如何是ul有li的高度
查看>>
C++ primer plus
查看>>
python mysqlDB
查看>>
UVALive 3942 Remember the Word Tire+DP
查看>>
从微软的DBML文件中我们能学到什么(它告诉了我们什么是微软的重中之重)~目录...
查看>>
被需求搞的一塌糊涂,怎么办?
查看>>
c_数据结构_队的实现
查看>>
jquery 选择器总结
查看>>
Qt设置背景图片
查看>>
【阿里云文档】常用文档整理
查看>>
java中的Volatile关键字
查看>>