logo头像
Snippet 博客主题

Spark大数据开发之二:圈人,找出满足规则的人群

一、开发需求

  • 根据任务规则,圈出符合规则的所有会员,以方便运营针对这些会员做特定的营销活动。
  • 一次有多个任务需要圈人。
  • 常见任务规则如下。

任务规则

二、数据准备

  1. Mysql中的规则表。
  2. Hbase中的会员标签表。Spark大数据开发之一:给会员打标签

三、开发思路

  1. 读取所有需要圈人的任务ID,以及每个任务对应的规则。
  2. 遍历会员标签表。
  3. 对每一个会员,依次判断符合那些任务规则。
  4. 将符合条件的会员保存到Hbase,RowKey为[日期~任务ID~会员ID]

四、参考代码

1. 读取Mysql

1
2
3
4
5
6
7
8
9
10
import java.sql.{Connection, DriverManager, Statement}

val connection = DriverManager.getConnection("db.url", "db.user", "db.password")
val statement: Statement = connection.createStatement()
var sql = "" // 查询任务id和任务规则
var resultSet = statement.executeQuery(sql)
while (resultSet.next()) {
// resultSet.getString("task_id")
// ...
}

2. 遍历会员标签表

创建Hive会员标签表映射Hbase会员标签表,通过SparkSession.sql执行hive-sql查询Hive会员标签表,遍历所有会员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val spark = SparkSession.builder().appName("TaskCircleCommon").enableHiveSupport().getOrCreate()
val sql = "SELECT vip_id, tag_info from dw.vips"
val rows = spark.sql(sql)
val rddData = rows.rdd.flatMap(item => {
// 圈人逻辑,判断会员符合哪些任务规则


// 返回待保存到Hbase的数据结构
var putArr: ArrayBuffer[(ImmutableBytesWritable,Put)] = ArrayBuffer()
val hBaseKey = curDate + "~" + taskId + "~" + audienceId
var p = new Put(hBaseKey.getBytes)
p.addColumn("i".getBytes, "a_id".getBytes, audienceId.getBytes)
val imPut = (new ImmutableBytesWritable(Bytes.toBytes(hBaseKey)), p)
putArr += imPut
putArr
})

3. 保存结果到Hbase

1
2
3
4
5
6
7
//hbase配置
val conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, "dw:c_plan_targeting")
conf.set("mapreduce.job.outputformat.class", "org.apache.hadoop.hbase.mapreduce.TableOutputFormat")

// 保存
rddData.saveAsNewAPIHadoopDataset(conf)

4. 前端显示


圈人结果

上一篇