Is there any way I can use StreamTableEnvironment in a ProcessWindowFunction?

Scenario

I'm using Flink to read the MySQL binlog and write it to a Hudi table, but I want to partition the binlog source into windows and batch-insert all the data in a window into the Hudi table when the window closes. My current approach is to use a ProcessWindowFunction to process all the data within one window, concatenate each window's data into a single SQL statement, and perform the batch insert when the window ends via the executeSql() method of StreamTableEnvironment.

Problem

It seems StreamTableEnvironment cannot be used inside a ProcessWindowFunction:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the StreamExecutionEnvironment is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2052)
    at org.apache.flink.streaming.api.datastream.WindowedStream.process(WindowedStream.java:602)
    at org.apache.flink.streaming.api.datastream.WindowedStream.process(WindowedStream.java:584)
    at flink_learn.chapter09_flinkcdc.Test03_MySqlToHudi.main(Test03_MySqlToHudi.java:113)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
    ... 7 more
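
The root cause is Flink's closure cleaning: when .process(...) is called, the anonymous ProcessWindowFunction must be serialized so it can be shipped to the task managers, and because it references the local variable sTableEnv, the non-serializable LocalStreamEnvironment behind it gets dragged into the closure. A minimal sketch of the same failure mode (class name and data are made up for illustration):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CaptureDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);

        // The call to map() already fails in the ClosureCleaner, because the lambda
        // captures sTableEnv and Flink tries to serialize everything it references.
        sEnv.fromElements("a", "b")
                .map(value -> {
                    sTableEnv.executeSql("select 1");
                    return value;
                })
                .print();

        sEnv.execute();
    }
}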

Thought

My first thought was to use a RichSinkFunction to implement this, but Hudi doesn't seem to expose a Flink API for bulk insert, so the only way I can bulk-insert data into a Hudi table appears to be the executeSql() method of StreamTableEnvironment.
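
One common way around this, if it fits the use case, is to keep the table environment out of the operator entirely: the ProcessWindowFunction only emits plain records, that stream is registered as a temporary view, and a single INSERT INTO ... SELECT is submitted from main(), where sTableEnv may be used freely. A rough sketch for the dept table only (windowedDeptStream is a hypothetical placeholder for the keyed, windowed stream built in the Code section below; the staff table would be handled the same way):

DataStream<Tuple2<Long, String>> deptRows = windowedDeptStream
        .process(new ProcessWindowFunction<JSONObject, Tuple2<Long, String>, String, TimeWindow>() {
            @Override
            public void process(String key, Context ctx, Iterable<JSONObject> elements,
                                Collector<Tuple2<Long, String>> out) {
                for (JSONObject json : elements) {
                    JSONObject after = JSON.parseObject(json.getString("after"));
                    // emit the row itself instead of building SQL text inside the operator
                    out.collect(Tuple2.of(after.getLong("dept_id"), after.getString("dept_name")));
                }
            }
        });

// Registering the view and issuing the insert happens in main(), outside any user function,
// so nothing non-serializable ends up in an operator closure.
sTableEnv.createTemporaryView("dept_window", deptRows);
sTableEnv.executeSql("insert into dept select f0, cast(f1 as varchar(10)) from dept_window");

I have not verified how the Hudi sink groups these rows into commits, so whether this gives the same "one write per window" behaviour as the original idea would need testing.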

Code


public class Test05_MySqlToHudi {
    
    private static String HOSTNAME = "localhost";
    private static String USERNAME = "root";
    private static String PASSWORD = "root";
    
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(RestOptions.BIND_PORT, "8881");

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);

        sEnv.setParallelism(1);
        sEnv.enableCheckpointing(3000);

        String createDeptTableSql = "create table dept(\n" +
                "  dept_id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "  dept_name varchar(10)\n" +
                ")\n" +
                "with (\n" +
                "   'connector' = 'hudi',\n" +
                "   'path' = 'hdfs://localhost:9000/hudi/dept',\n" +
                "   'table.type' = 'MERGE_ON_READ'\n" +
                ")";
        sTableEnv.executeSql(createDeptTableSql);

        String createStaffTableSql = "create table staff(\n" +
                "  id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "  username varchar(10),\n" +
                "  password varchar(10),\n" +
                "  phone varchar(10),\n" +
                "  role_id bigint,\n" +
                "  dept_id bigint\n" +
                ")\n" +
                "with (\n" +
                "   'connector' = 'hudi',\n" +
                "   'path' = 'hdfs://localhost:9000/hudi/staff',\n" +
                "   'table.type' = 'MERGE_ON_READ'\n" +
                ")";
        sTableEnv.executeSql(createStaffTableSql);

        SingleOutputStreamOperator<String> process = sEnv
                .fromSource(getMySqlStreamSource(), WatermarkStrategy.noWatermarks(), "MySQL Source")
                .map(new JsonToEntityMapFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner((SerializableTimestampAssigner<JSONObject>) (binlogJson, recordTimestamp) -> {

                            String arriveTime = JSONObject.parseObject(binlogJson.getString("after")).getString("arrive_time");
                            String arriveTimeUTC = arriveTime.replace("Z", " UTC");
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss Z");
                            Date date = null;
                            try {
                                date = sdf.parse(arriveTimeUTC);
                            } catch (ParseException e) {
                                throw new RuntimeException(e);
                            }
                            return date.getTime();
                        }))
                .keyBy(binlogJson -> JSONObject.parseObject(binlogJson.getString("source")).getString("table"))
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<JSONObject, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, ProcessWindowFunction<JSONObject, String, String, TimeWindow>.Context context, Iterable<JSONObject> elements, Collector<String> out) throws Exception {

                        if ("dept".equals(key)) {
                            String insertDataSql = "insert into dept values\n";
                            ArrayList<JSONObject> deptJsonObjects = Lists.newArrayList(elements);
                            for (JSONObject jsonObject : deptJsonObjects) {
                                String afterData = jsonObject.getString("after");
                                JSONObject rowDataObject = JSON.parseObject(afterData);
                                Long deptId = Long.parseLong(rowDataObject.getString("dept_id"));
                                String deptName = rowDataObject.getString("dept_name");
                                System.out.println("dept---> " + deptId + " -----  " + deptName);
                                insertDataSql += "(" + deptId + ", " + deptName + " ),
";
                            }
                            insertDataSql = insertDataSql.substring(0, insertDataSql.length() - 2);
                            // out.collect(insertDataSql);
                            sTableEnv.executeSql(insertDataSql);  // report exception

                        } else {
                            String insertDataSql = "insert into staff values\n";
                            ArrayList<JSONObject> staffJsonObjects = Lists.newArrayList(elements);
                            for (JSONObject jsonObject : staffJsonObjects) {
                                String afterData = jsonObject.getString("after");
                                JSONObject rowDataObject = JSON.parseObject(afterData);
                                Long id = Long.parseLong(rowDataObject.getString("id"));
                                String username = rowDataObject.getString("username");
                                String phone = rowDataObject.getString("phone");
                                String password = rowDataObject.getString("password");
                                Long deptId = Long.parseLong(rowDataObject.getString("dept_id"));
                                Long roleId = Long.parseLong(rowDataObject.getString("role_id"));
                                insertDataSql += "(" + id + ", " + username + " , " + password + " , " + phone + " ," + deptId + "," + roleId + "),
";
                            }
                            insertDataSql = insertDataSql.substring(0, insertDataSql.length() - 2);
                            sTableEnv.executeSql(insertDataSql); // report exception
                            // out.collect(insertDataSql);
                        }
                    }
                });
        

        sEnv.execute("mysql to hudi");
    }
}

Answers

No answers yet.



