当先锋百科网

首页 1 2 3 4 5 6 7

flink 环境构建工具类

/**
 * Helpers for assembling the job configuration and the Flink stream execution environment.
 */
public class ExecutionEnvUtil {

    /**
     * Builds the job configuration from three sources merged in this order: the bundled
     * properties file, then the command-line arguments, then the JVM system properties.
     * Intended precedence (per the original note): properties file &lt; command line &lt; system properties.
     *
     * @param args command-line arguments of the job
     * @return the merged {@code ParameterTool}
     * @throws Exception if the properties resource is missing from the classpath or cannot be read
     */
    public static ParameterTool createParameterTool(final String[] args) throws Exception {
        // The stream is opened here, so it is closed here as well; a fully qualified name is used
        // so that this block does not depend on an additional import.
        try (java.io.InputStream propertiesStream =
                     ExecutionEnvUtil.class.getResourceAsStream(BaseConstants.PROPERTIES_FILE_NAME)) {
            if (propertiesStream == null) {
                // Fail with a descriptive message instead of an anonymous NullPointerException.
                throw new java.io.FileNotFoundException(
                        "Classpath resource not found: " + BaseConstants.PROPERTIES_FILE_NAME);
            }
            return ParameterTool
                    .fromPropertiesFile(propertiesStream)
                    .mergeWith(ParameterTool.fromArgs(args))
                    .mergeWith(ParameterTool.fromSystemProperties());
        }
    }


    /**
     * Creates and configures the stream execution environment.
     *
     * <ul>
     *   <li>parallelism: {@code PropertiesConstants.STREAM_PARALLELISM}, falling back to 12</li>
     *   <li>restart strategy: fixed delay, 4 attempts, 60 seconds between attempts</li>
     *   <li>checkpointing: configured through {@code CheckPointUtil} unless
     *       {@code PropertiesConstants.STREAM_CHECKPOINT_ENABLE} is set to false (default: true)</li>
     *   <li>the given parameters are registered as global job parameters</li>
     *   <li>time characteristic: processing time</li>
     * </ul>
     *
     * @param parameterTool job configuration
     * @return the configured environment
     * @throws Exception if the checkpoint configuration fails
     */
    public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parameterTool.getInt(PropertiesConstants.STREAM_PARALLELISM, 12));
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(60)));
        if (parameterTool.getBoolean(PropertiesConstants.STREAM_CHECKPOINT_ENABLE, true)) {
            CheckPointUtil.setCheckpointConfig(env, parameterTool);
            // Keep externalized checkpoint data when the job is cancelled.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        return env;
    }
}

checkpoint 工具类

/**
 * Applies state-backend and checkpoint settings to a Flink stream execution environment.
 */
public class CheckPointUtil {

    private static final String CHECKPOINT_MEMORY = "memory";
    private static final String CHECKPOINT_FS = "fs";
    private static final String CHECKPOINT_ROCKSDB = "rocksdb";
    /**
     * Fallback backend type: none of the branches below matches it, so no state backend
     * is set explicitly and the environment keeps whatever it already has.
     */
    private static final String CHECKPOINT_DEFAULT = "default";

    /**
     * Size argument handed to the memory state backend: 5 * 1024 * 1024 * 100 = 524,288,000.
     * (The original note mentioned "5M", which does not match this value.)
     */
    private static final int MEMORY_STATE_MAX_BYTES = 5 * 1024 * 1024 * 100;
    /** Default checkpoint interval when none is configured: 60 seconds. */
    private static final long DEFAULT_CHECKPOINT_INTERVAL_MS = 60_000L;
    /** Minimum pause between two checkpoints: 2 minutes. */
    private static final long MIN_PAUSE_BETWEEN_CHECKPOINTS_MS = 2 * 60_000L;
    /** A checkpoint that takes longer than 15 minutes is discarded. */
    private static final long CHECKPOINT_TIMEOUT_MS = 15 * 60_000L;
    /** Number of consecutive checkpoint failures tolerated before the job fails. */
    private static final int TOLERABLE_CHECKPOINT_FAILURES = 3;


    /**
     * Selects the state backend named by {@code PropertiesConstants.STREAM_CHECKPOINT_TYPE}
     * (case-insensitive: "memory", "fs" or "rocksdb"; anything else leaves the backend untouched)
     * and enables checkpointing with fixed advanced settings.
     *
     * @param env           environment to configure
     * @param parameterTool job configuration
     * @return the same {@code env} instance, for chaining
     * @throws Exception if the checkpoint directory is missing for the "fs"/"rocksdb" backends,
     *                   is not a valid URI, or the backend cannot be created
     */
    public static StreamExecutionEnvironment setCheckpointConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) throws Exception {
        // Pick the state backend that matches the configured type.
        String stateBackendType = parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_TYPE, CHECKPOINT_DEFAULT);
        if (CHECKPOINT_MEMORY.equalsIgnoreCase(stateBackendType)) {
            // State is kept in memory.
            StateBackend stateBackend = new MemoryStateBackend(MEMORY_STATE_MAX_BYTES);
            env.setStateBackend(stateBackend);
        } else if (CHECKPOINT_FS.equalsIgnoreCase(stateBackendType)) {
            // getRequired fails fast with the name of the missing key instead of a bare
            // NullPointerException from new URI(null).
            String checkpointDir = parameterTool.getRequired(PropertiesConstants.STREAM_CHECKPOINT_DIR);
            StateBackend stateBackend = new FsStateBackend(new URI(checkpointDir), 0, true);
            env.setStateBackend(stateBackend);
        } else if (CHECKPOINT_ROCKSDB.equalsIgnoreCase(stateBackendType)) {
            String checkpointDir = parameterTool.getRequired(PropertiesConstants.STREAM_CHECKPOINT_DIR);
            RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDir, true);
            env.setStateBackend(rocksDBStateBackend);
        }

        // Checkpoint interval.
        env.enableCheckpointing(
                parameterTool.getLong(PropertiesConstants.STREAM_CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL_MS));

        // Advanced settings (the original author suggests moving these into configuration as well).
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS_MS);
        env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        // A failed checkpoint does not fail the job until this many consecutive failures occur.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(TOLERABLE_CHECKPOINT_FAILURES);
        // At most one checkpoint in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        return env;
    }
}

构建 kafka source、sink

/**
	 * 构建 source kafka
	 *
	 * @param parameterTool
	 * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer<java.lang.String>
	 * @date 2023/8/4 - 2:41 PM
	 */
	private static FlinkKafkaConsumer<String> buildSourceKafka(ParameterTool parameterTool){
		// Consumer settings come from the shared Kafka configuration helper.
		final Properties consumerProps = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);

		// The configured source topic is compiled as a regular expression, so the consumer
		// subscribes by pattern rather than by a fixed topic name.
		final Pattern topicPattern = Pattern.compile(parameterTool.get(PropertiesConstants.KAFKA_SOURCE_TOPIC));
		final FlinkKafkaConsumer<String> consumer =
				new FlinkKafkaConsumer<>(topicPattern, new SimpleStringSchema(), consumerProps);

		consumer.setCommitOffsetsOnCheckpoints(true);

		// Start position: the committed group offsets unless the "start from first" flag is set.
		final boolean startFromFirst = parameterTool.getBoolean(PropertiesConstants.KAFKA_START_FROM_FIRST, false);
		if (!startFromFirst) {
			consumer.setStartFromGroupOffsets();
			return consumer;
		}
		consumer.setStartFromEarliest();
		return consumer;
	}


	/**
	 * 构建 sink kafka
	 *
	 * @param parameterTool
	 * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<com.alibaba.fastjson.JSONObject>
	 * @date 2023/8/16 - 11:38 AM
	 */
	private static FlinkKafkaProducer<JSONObject> buildSinkKafka(ParameterTool parameterTool){
		// Builds an at-least-once Kafka producer that routes every JSON element to the topic
		// named by its BaseConstants.PARAM_LOG_TYPE field and sends the element's JSON text as the value.
		Properties props = KafkaConfigUtil.buildSinkKafkaProps(parameterTool);
		final String defaultTopic = parameterTool.get(PropertiesConstants.KAFKA_SINK_DEFAULT_TOPIC);

		KafkaSerializationSchema<JSONObject> schema = (element, timestamp) -> {
			String logTypeTopic = element.getString(BaseConstants.PARAM_LOG_TYPE);
			// An element without a log type would otherwise produce a record with a null topic,
			// which ProducerRecord rejects; route it to the configured default topic instead.
			String targetTopic = (logTypeTopic == null || logTypeTopic.isEmpty()) ? defaultTopic : logTypeTopic;
			// Encode explicitly as UTF-8 so the payload does not depend on the JVM's default charset.
			return new ProducerRecord<>(targetTopic,
					element.toJSONString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
		};

		return new FlinkKafkaProducer<>(defaultTopic, schema, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
	}

kafka 工具类

/**
 * Builds the Kafka client properties used by the job's source (consumer) and sink (producer).
 */
public class KafkaConfigUtil {


    /**
     * Builds the consumer properties for the Kafka source.
     * Starts from all entries of {@code parameterTool} and then overrides the keys set below.
     *
     * @param parameterTool job configuration
     * @return consumer properties
     */
    public static Properties buildSourceKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        props.setProperty("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
        props.setProperty("group.id", parameterTool.get(PropertiesConstants.KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
        props.setProperty("flink.partition-discovery.interval-millis", "10000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.setProperty("auto.offset.reset", "latest");
        applySaslPlainSettings(props);

        // Request timeout and retry count for consuming from Kafka (added 08/17).
        props.setProperty("request.timeout.ms", "30000");
        // Stored as a String: java.util.Properties is only well-defined for String values,
        // and getProperty() returns null for anything else.
        props.setProperty("retries", "5");

        return props;
    }

    /**
     * Builds the producer properties for the Kafka sink.
     * Starts from all entries of {@code parameterTool} and then overrides the keys set below.
     * Consumer-only keys (deserializers, auto.offset.reset) are intentionally not set here.
     *
     * @param parameterTool job configuration; must contain {@code PropertiesConstants.KAFKA_SINK_BROKERS}
     * @return producer properties
     * @throws RuntimeException if the sink broker list is not configured
     */
    public static Properties buildSinkKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        // getRequired names the missing key instead of failing with a bare
        // NullPointerException when the null value is stored.
        props.setProperty("bootstrap.servers", parameterTool.getRequired(PropertiesConstants.KAFKA_SINK_BROKERS));
        applySaslPlainSettings(props);

        props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
        props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
        return props;
    }

    /**
     * Applies the SASL/PLAIN settings shared by the source and the sink.
     * NOTE(review): the JAAS entry carries hard-coded placeholder credentials and overrides
     * anything supplied through the job parameters — load real credentials from configuration.
     */
    private static void applySaslPlainSettings(Properties props) {
        props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
    }

}

jdbc 工具类

/**
 * Caches HikariCP connection pools per connection settings and offers small helpers
 * for running read-only queries against them.
 */
public class JdbcDatasourceUtils {


    /**
     * Pool cache keyed by the MD5 of "url user password driver".
     * The reference is never reassigned, so it is final; thread-safety comes from the map itself.
     */
    public static final Map<String, HikariDataSource> DATASOURCES = new ConcurrentHashMap<>();


    /**
     * Returns the cached connection pool for the given settings, creating it on first use.
     *
     * @param jdbcUrl  JDBC URL
     * @param dsUname  database user name
     * @param dsPwd    database password
     * @param dsDriver JDBC driver class name
     * @return the pool associated with these settings
     */
    public static HikariDataSource getHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
        String md5Key = Md5Util.encrypt(jdbcUrl + " " + dsUname + " " + dsPwd + " " + dsDriver);
        // computeIfAbsent on a ConcurrentHashMap creates at most one pool per key,
        // replacing the hand-written check / lock / re-check sequence.
        return DATASOURCES.computeIfAbsent(md5Key,
                key -> createHikariDataSource(jdbcUrl, dsUname, dsPwd, dsDriver));
    }


    /**
     * Creates a new read-only HikariCP pool.
     *
     * @param jdbcUrl  JDBC URL
     * @param dsUname  database user name
     * @param dsPwd    database password
     * @param dsDriver JDBC driver class name
     * @return a newly created pool
     */
    private static HikariDataSource createHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(jdbcUrl);
        config.setUsername(dsUname);
        config.setPassword(dsPwd);
        config.setDriverClassName(dsDriver);
        // Connections handed out by the pool auto-commit.
        config.setAutoCommit(true);
        // Read-only connections.
        config.setReadOnly(true);
        // Connection timeout in milliseconds.
        config.setConnectionTimeout(10000);
        // Maximum pool size.
        config.setMaximumPoolSize(32);
        // Minimum number of idle connections.
        config.setMinimumIdle(16);
        // Idle connection timeout in milliseconds.
        // NOTE(review): this is larger than the max lifetime below; HikariCP expects
        // idleTimeout to be smaller than maxLifetime — confirm the intended values.
        config.setIdleTimeout(600000);
        // Maximum connection lifetime in milliseconds.
        config.setMaxLifetime(540000);
        // Query used to test connections.
        config.setConnectionTestQuery("SELECT 1");
        return new HikariDataSource(config);
    }



    /**
     * Runs the query and returns every row as an ordered map from column label to value.
     * Failures are printed to stderr; rows read before a failure are still returned,
     * and a query that cannot be executed yields an empty list.
     *
     * @param dataSource pool to borrow a connection from
     * @param sql        query to execute
     * @return the rows, in result order
     */
    public static List<Map<String, Object>> loadDatas(HikariDataSource dataSource, String sql) {
        return loadSql(dataSource, sql, resultSet -> {
            List<Map<String, Object>> datas = new ArrayList<>();
            try {
                if (null == resultSet) {
                    return datas;
                }
                ResultSetMetaData metaData = resultSet.getMetaData();
                // The column count is constant for a result set; read it once.
                final int columnCount = metaData.getColumnCount();
                while (resultSet.next()) {
                    Map<String, Object> entry = new LinkedHashMap<>();
                    // getColumnLabel returns the alias (AS ...), getColumnName the original column name.
                    for (int i = 1; i <= columnCount; i++) {
                        entry.put(metaData.getColumnLabel(i), resultSet.getObject(i));
                    }
                    datas.add(entry);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return datas;
        });
    }


    /**
     * Runs the query, applies {@code function} to every column value of every row and
     * collects the results into an insertion-ordered set.
     * Failures are printed to stderr; values collected before a failure are still returned,
     * and a query that cannot be executed yields an empty set.
     *
     * @param dataSource pool to borrow a connection from
     * @param sql        query to execute
     * @param function   converter applied to each column value
     * @param <R>        element type of the resulting set
     * @return the converted values
     */
    public static <R> Set<R> loadSetDatas(HikariDataSource dataSource, String sql, Function<Object, R> function) {
        return loadSql(dataSource, sql, resultSet -> {
            Set<R> datas = new LinkedHashSet<>();
            try {
                if (null == resultSet) {
                    return datas;
                }
                // The column count is constant for a result set; read it once.
                final int columnCount = resultSet.getMetaData().getColumnCount();
                while (resultSet.next()) {
                    for (int i = 1; i <= columnCount; i++) {
                        datas.add(function.apply(resultSet.getObject(i)));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return datas;
        });
    }


    /**
     * Executes the query and hands the result set to {@code function}.
     * If anything fails, the stack trace is printed and {@code function} is invoked with
     * {@code null}, so the callback must handle a null result set.
     *
     * @param dataSource pool to borrow a connection from
     * @param sql        query to execute
     * @param function   maps the result set (or {@code null} on failure) to the return value
     * @param <R>        result type
     * @return whatever {@code function} returns
     */
    private static <R> R loadSql(HikariDataSource dataSource, String sql, Function<ResultSet, R> function) {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            connection = dataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            resultSet = preparedStatement.executeQuery();

            return function.apply(resultSet);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release in reverse order of acquisition: result set, statement, connection.
            // Explicit closing (rather than try-with-resources) keeps a successfully computed
            // result even if one of the close() calls fails.
            closeSafely(resultSet);
            closeSafely(preparedStatement);
            closeSafely(connection);
        }
        return function.apply(null);
    }

    /** Closes the resource if it is non-null, printing (not propagating) any failure. */
    private static void closeSafely(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}