For real-time stream processing, Flink + ClickHouse is a common combination for real-time OLAP. The strengths of each are well covered elsewhere, so this post walks through the end-to-end flow with a small example.

Overall flow:

  1. Produce JSON-formatted data into a dedicated Kafka topic
  2. Write a Flink Kafka consumer to read from the topic
  3. Process the data with Flink operators (ETL)
  4. Sink the processed data into ClickHouse

Produce JSON-formatted data into a Kafka topic

After creating the topic, use kafka-console-producer.sh to send prepared JSON records to it, for example:

{"appKey":"mailandroid","deviceId":"1807516f-1cb3-4a6e-8ac1-454d401a5716","version":"1.0","uid":"","dashiUid":"1388f4059f87578418ba2906c5425af5","ua":"","carrier":"中国移动", ...}
{"appKey":"mailios","deviceId":"0B4D45A9-3212-4C38-B58E-1A96792AF297","version":"1.0","uid":"","dashiUid":"c53f631b1d33273f28953893b7383e0a","ua":"Mozilla/5.0 (iPhone; CPU iPhone OS 15_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148","carrier":"中国移动", ...}
...

Once the data is written, you can use kafka-console-consumer.sh to check whether the records actually landed in the topic.
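As a concrete sketch, the commands might look like the following (the topic name and broker address are taken from the Flink code later in this post; partition and replication counts are assumptions to adjust for your cluster):

```
# create the topic (skip if it already exists)
kafka-topics.sh --bootstrap-server 10.224.192.133:9092 \
  --create --topic test_process --partitions 3 --replication-factor 2

# feed a file of JSON lines into the topic
kafka-console-producer.sh --bootstrap-server 10.224.192.133:9092 \
  --topic test_process < events.json

# read the topic back from the beginning to verify the data arrived
kafka-console-consumer.sh --bootstrap-server 10.224.192.133:9092 \
  --topic test_process --from-beginning
```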

Write a Flink Kafka consumer to read from the topic

Create a project in IntelliJ IDEA and write the code that connects to Kafka and consumes the data.

```java
package com.demo.flink;

import com.alibaba.fastjson.JSON;
import com.demo.flink.pojo.Mail;
import com.demo.flink.utils.MyClickHouseUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.HashMap;
import java.util.Properties;

public class FlinkSinkClickhouse {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // source
        String topic = "test_process";

        Properties props = new Properties();
        // Kafka cluster connection parameters
        props.setProperty("bootstrap.servers", "10.224.192.133:9092,10.224.192.134:9092");
        // consumer group id, required by the Kafka consumer
        props.setProperty("group.id", "test_process_group");

        // define the Flink Kafka consumer
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        // always consume from the beginning of the topic
        // (overrides the default start-from-group-offsets behavior)
        consumer.setStartFromEarliest();

        // add the source stream
        DataStreamSource<String> source = env.addSource(consumer);

        SingleOutputStreamOperator<Mail> dataStream = source.map(new MapFunction<String, Mail>() {
            @Override
            public Mail map(String value) throws Exception {
                // each Kafka record is a JSON string; parse it into a map
                HashMap<String, String> hashMap = JSON.parseObject(value, HashMap.class);
                String appKey = hashMap.get("appKey");
                String appVersion = hashMap.get("appVersion");
                String deviceId = hashMap.get("deviceId");
                String phone_no = hashMap.get("phone_no");
                return new Mail(appKey, appVersion, deviceId, phone_no);
            }
        });
        dataStream.print();

        // sink
        String sql = "INSERT INTO test.ods_countlyV2 (appKey, appVersion, deviceId, phone_no) " +
                "VALUES (?, ?, ?, ?)";
        MyClickHouseUtil ckSink = new MyClickHouseUtil(sql);
        dataStream.addSink(ckSink);

        env.execute();
    }
}
```

The code above connects to Kafka through Flink's Java Kafka connector: it sets up the execution environment and the connection properties, then registers the consumer as a data stream with addSource.

Process the data with Flink operators (ETL)

This is a simple ETL pass built on Flink's Map operator, which holds the per-record processing logic. The Mail class here is a POJO I defined to hold the fields that need to be kept from each JSON record. Since the values read from Kafka are plain strings, fastjson's JSON.parseObject(value, HashMap.class) converts each record into a HashMap, making it easy to pull out the key-value pairs I need. The required fields are then wrapped in a Mail object and returned, completing a simple ETL step over the stream.
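The exact Mail class in com.demo.flink.pojo is not shown in the original, but a minimal version can be inferred from the constructor and getters used above; a sketch might look like this (the no-arg constructor and getter/setter pairs keep it compatible with Flink's POJO serializer):

```java
// Mail: a plain POJO holding the four fields kept after the ETL step.
// The field set matches the columns inserted into ClickHouse.
public class Mail {
    private String appKey;
    private String appVersion;
    private String deviceId;
    private String phone_no;

    // no-arg constructor required for Flink's POJO serialization
    public Mail() {}

    public Mail(String appKey, String appVersion, String deviceId, String phone_no) {
        this.appKey = appKey;
        this.appVersion = appVersion;
        this.deviceId = deviceId;
        this.phone_no = phone_no;
    }

    public String getAppKey() { return appKey; }
    public void setAppKey(String appKey) { this.appKey = appKey; }

    public String getAppVersion() { return appVersion; }
    public void setAppVersion(String appVersion) { this.appVersion = appVersion; }

    public String getDeviceId() { return deviceId; }
    public void setDeviceId(String deviceId) { this.deviceId = deviceId; }

    public String getPhone_no() { return phone_no; }
    public void setPhone_no(String phone_no) { this.phone_no = phone_no; }
}
```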

Sink the processed data into ClickHouse

Finally, the processed data needs to be written into ClickHouse for storage and querying. The code for the ClickHouse sink is given below:

```java
package com.demo.flink.utils;

import com.demo.flink.pojo.Mail;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.PreparedStatement;

public class MyClickHouseUtil extends RichSinkFunction<Mail> {

    private ClickHouseConnection conn = null;
    private final String sql;

    public MyClickHouseUtil(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // create the connection once per parallel sink instance,
        // rather than once per record in invoke()
        String url = "jdbc:clickhouse://10.224.192.133:8123/test";
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setUser("default");
        properties.setPassword("ch20482048");
        ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
        conn = dataSource.getConnection();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (conn != null) {
            conn.close();
        }
    }

    @Override
    public void invoke(Mail mail, Context context) throws Exception {
        // bind the four fields of the Mail record to the INSERT statement
        try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            preparedStatement.setString(1, mail.getAppKey());
            preparedStatement.setString(2, mail.getAppVersion());
            preparedStatement.setString(3, mail.getDeviceId());
            preparedStatement.setString(4, mail.getPhone_no());
            preparedStatement.execute();
        }
    }
}
```

The MyClickHouseUtil class extends RichSinkFunction. Since the upstream Flink operator produces a stream of Mail objects, the generic type parameter of RichSinkFunction is Mail.
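Note that the sink assumes the target table test.ods_countlyV2 already exists. The original post does not show its definition; a minimal sketch might look like this (the engine and ordering key are assumptions to adjust for your query patterns):

```sql
CREATE TABLE IF NOT EXISTS test.ods_countlyV2
(
    appKey     String,
    appVersion String,
    deviceId   String,
    phone_no   String
)
ENGINE = MergeTree
ORDER BY (appKey, deviceId);
```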