Is there any way I can use StreamTableEnvironment in a ProcessWindowFunction?


I am using Flink to read the MySQL binlog and write it to a Hudi table, but I want to partition the binlog data source into windows and batch insert all the data within a window into the Hudi table when the window closes. My current approach is to use a ProcessWindowFunction to process all the data within one window, concatenate all the data of each window into a single SQL statement, and perform the batch insertion when the window ends by calling the executeSql() method of StreamTableEnvironment.


It seems that StreamTableEnvironment cannot be used inside a ProcessWindowFunction; the program fails with the following exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the StreamExecutionEnvironment is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2052)
    at org.apache.flink.streaming.api.datastream.WindowedStream.process(WindowedStream.java:602)
    at org.apache.flink.streaming.api.datastream.WindowedStream.process(WindowedStream.java:584)
    at flink_learn.chapter09_flinkcdc.Test03_MySqlToHudi.main(Test03_MySqlToHudi.java:113)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
    ... 7 more


I initially thought of using a RichSinkFunction to implement what I want, but it seems that Hudi doesn't have any Flink API for bulk insert, so the only way I can bulk insert data into a Hudi table is by using the executeSql() method of StreamTableEnvironment.


/**
 * Flink job that reads a MySQL change stream and attempts to batch-insert each
 * window's rows into Hudi tables.
 *
 * <p>{@code main} creates a local {@code StreamExecutionEnvironment} with the web UI
 * and a {@code StreamTableEnvironment} on top of it, builds DDL strings for the
 * {@code dept} and {@code staff} Hudi tables, keys the stream by the table name found
 * under each record's {@code "source"} field, and, inside a
 * {@code ProcessWindowFunction}, concatenates the {@code "after"} rows of a window
 * into one {@code insert into ... values} statement.
 *
 * <p>NOTE(review): the anonymous {@code ProcessWindowFunction} calls
 * {@code sTableEnv.executeSql(...)}, i.e. it references the table environment that is
 * a local variable of {@code main}. The author marks both call sites with
 * "report exception".
 *
 * <p>NOTE(review): this snippet is not syntactically complete as shown (string
 * literals and closing braces are broken or missing); the code below is kept verbatim.
 */
public class Test05_MySqlToHudi {
    // Connection settings; not referenced in the visible code — presumably consumed by
    // getMySqlStreamSource(), which is not shown here (TODO confirm).
    private static String HOSTNAME = "localhost";
    private static String USERNAME = "root";
    private static String PASSWORD = "root";
    /**
     * Builds the pipeline (source, map, keyBy, window processing) and starts it
     * under the job name "mysql to hudi".
     *
     * @param args command-line arguments; not read in the visible code
     * @throws Exception declared by the signature; propagated from the Flink calls
     */
    public static void main(String[] args) throws Exception {
        // Set the REST bind port option to 8881 for the local environment's web UI.
        Configuration configuration = new Configuration();
        configuration.setString(RestOptions.BIND_PORT, "8881");

        // Local execution environment with web UI, plus a table environment created from it.
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);


        // DDL strings for the Hudi-backed tables follow.
        // NOTE(review): no call that executes these DDL strings is visible in this snippet.
        String createDeptTableSql = "create table dept(
" +
                "  dept_id BIGINT PRIMARY KEY NOT ENFORCED,
" +
                "  dept_name varchar(10)
" +
" +
                "with (
" +
                "   connector  =  hudi ,
" +
                "   path  =  hdfs://localhost:9000/hudi/dept ,
" +
                "   table.type  =  MERGE_ON_READ 
" +

        String createStaffTableSql = "create table staff(
" +
                "  id BIGINT PRIMARY KEY NOT ENFORCED,
" +
                "  username varchar(10),
" +
                "  password varchar(10),
" +
                "  phone varchar(10),
" +
                "  role_id bigint,
" +
                "  dept_id bigint
" +
" +
                "with (
" +
                "   connector  =  hudi ,
" +
                "   path  =  hdfs://localhost:9000/hudi/staff ,
" +
                "   table.type  =  MERGE_ON_READ 
" +

        SingleOutputStreamOperator<String> process = sEnv
                .fromSource(getMySqlStreamSource(), WatermarkStrategy.noWatermarks(), "MySQL Source")
                .map(new JsonToEntityMapFunction())
                        .withTimestampAssigner((SerializableTimestampAssigner<JSONObject>) (binlogJson, recordTimestamp) -> {

                            String arriveTime = JSONObject.parseObject(binlogJson.getString("after")).getString("arrive_time");
                            String arriveTimeUTC = arriveTime.replace("Z", " UTC");
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd T HH:mm:ss Z");
                            Date date = null;
                            try {
                                date = sdf.parse(arriveTimeUTC);
                            } catch (ParseException e) {
                                throw new RuntimeException(e);
                            return date.getTime();
                .keyBy(binlogJson -> JSONObject.parseObject(binlogJson.getString("source")).getString("table"))
                .process(new ProcessWindowFunction<JSONObject, String, String, TimeWindow>() {
                    public void process(String key, ProcessWindowFunction<JSONObject, String, String, TimeWindow>.Context context, Iterable<JSONObject> elements, Collector<String> out) throws Exception {

                        if ("dept".equals(key)) {
                            String insertDataSql = "insert into dept values
                            ArrayList<JSONObject> deptJsonObjects = Lists.newArrayList(elements);
                            for (JSONObject jsonObject : deptJsonObjects) {
                                String afterData = jsonObject.getString("after");
                                JSONObject rowDataObject = JSON.parseObject(afterData);
                                Long deptId = Long.parseLong(rowDataObject.getString("dept_id"));
                                String deptName = rowDataObject.getString("dept_name");
                                System.out.println("dept---> " + deptId + " -----  " + deptName);
                                insertDataSql += "(" + deptId + ", " + deptName + " ),
                            insertDataSql = insertDataSql.substring(0, insertDataSql.length() - 2);
                            // out.collect(insertDataSql);
                            sTableEnv.executeSql(insertDataSql);  // report exception

                        } else {
                            String insertDataSql = "insert into staff values
                            ArrayList<JSONObject> staffJsonObjects = Lists.newArrayList(elements);
                            for (JSONObject jsonObject : staffJsonObjects) {
                                String afterData = jsonObject.getString("after");
                                JSONObject rowDataObject = JSON.parseObject(afterData);
                                Long id = Long.parseLong(rowDataObject.getString("id"));
                                String username = rowDataObject.getString("username");
                                String phone = rowDataObject.getString("phone");
                                String password = rowDataObject.getString("password");
                                Long deptId = Long.parseLong(rowDataObject.getString("dept_id"));
                                Long roleId = Long.parseLong(rowDataObject.getString("role_id"));
                                insertDataSql += "(" + id + ", " + username + " , " + password + " , " + phone + " ," + deptId + "," + roleId + "),
                            insertDataSql = insertDataSql.substring(0, insertDataSql.length() - 2);
                            sTableEnv.executeSql(insertDataSql); // report exception
                            // out.collect(insertDataSql);

        sEnv.execute("mysql to hudi");



