快盘下载:好资源、好软件、快快下载吧!

快盘排行|快盘最新

当前位置:首页软件教程电脑软件教程 → Flink读取Kafka数据下沉到HDFS

Flink读取Kafka数据下沉到HDFS

时间:2022-09-24 13:01:18人气:作者:快盘下载我要评论

1:采用BucketingSink的方式

public class BucketingSinkDemo {
	public static void main(String[] args) throws Exception {
	
		long rolloverInterval = 2 * 60 * 1000;
		long batchSize = 1024 * 1024 * 100;

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
	    System.setProperty("HADOOP_USER_NAME", "hadoop");
		String topic = "ods_lark_order";
		Properties prop = new Properties();
		prop.setProperty("bootstrap.servers","ip:port");
		prop.setProperty("group.id","groupid");
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
		kafkaConsumer.setStartFromGroupOffsets();//默认消费策略
		DataStreamSource<String> source = env.addSource(kafkaConsumer);
		//
		BucketingSink<String> hadoopSink = new BucketingSink<>("hdfs://ip:port/flink/order_sink");
		// HDFS的配置
		Configuration configuration = new Configuration();
		// 1.能够指定block的副本数
		configuration.set("dfs.replication","1");
		hadoopSink.setFSConfig(configuration);
		// 2.指定分区文件夹的命名
		hadoopSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")));
		// 3.指定块大小和时间间隔生成新的文件
		hadoopSink.setBatchSize(batchSize);
		hadoopSink.setBatchRolloverInterval(rolloverInterval);
		// 4.指定生成文件的前缀,后缀,正在运行文件前缀
		hadoopSink.setPendingPrefix("order_sink");
		hadoopSink.setPendingSuffix("");
		hadoopSink.setInProgressPrefix(".in");
		source.addSink(hadoopSink);
		env.execute();
	}
}

采用这种方式的好处:

1.能够指定block的副本数

2.指定分区文件夹的命名

3.指定块大小和时间间隔生成新的文件

4.指定生成文件的前缀,后缀,正在运行文件前缀

缺点:

该方法已经过期,新版建议采用StreamingFileSink,笔者第一次找到该类发现能够写入成功,但是没有找到如何能够对写入HDFS进行压缩,比如parquet或者orc

2:采用StreamingFileSink的方式-行编码【forRowFormat】

public class StreamingFileSinkForRowFormatDemo {
	public static void main(String[] args) throws Exception {

		//获取Flink的运行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		String topic = "ods_lark_order";
		Properties prop = new Properties();
		prop.setProperty("bootstrap.servers","ip:port");
		prop.setProperty("group.id","first");

		FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
		myConsumer.setStartFromGroupOffsets();//默认消费策略
		DataStreamSource<String> source = env.addSource(myConsumer);


		// 自定义滚动策略
		DefaultRollingPolicy<String, String> rollPolicy = DefaultRollingPolicy.builder()
				.withRolloverInterval(TimeUnit.MINUTES.toMillis(2))/*每隔多长时间生成一个文件*/
				.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))/*默认60秒,未写入数据处于不活跃状态超时会滚动新文件*/
				.withMaxPartSize(128 * 1024 * 1024)/*设置每个文件的最大大小 ,默认是128M*/
				.build();
		// 输出文件的前、后缀配置
		OutputFileConfig config = OutputFileConfig
				.builder()
				.withPartPrefix("prefix")
				.withPartSuffix(".txt")
				.build();
		StreamingFileSink<String> streamingFileSink = StreamingFileSink
				.forRowFormat(new Path("hdfs://192.168.1.204:9000/flink/data/"),new SimpleStringEncoder<String>("UTF-8") )
				.withBucketAssigner(new DateTimeBucketAssigner<>())
				// 设置指定的滚动策略
				.withRollingPolicy(rollPolicy)
				// 桶检查间隔,这里设置为1s
				.withBucketCheckInterval(1)
				// 指定输出文件的前、后缀
				.withOutputFileConfig(config)
				.build();
		source.addSink(streamingFileSink);
		env.execute("StreamingFileSinkTest");
	}
}

采用这种方式的好处:

1.能够指定block的副本数

2.指定分区文件夹的命名

3.指定块大小和时间间隔生成新的文件

4.指定生成文件的前缀,后缀,正在运行文件前缀

缺点:

由于是按照行进行的,所以不能进行压缩

3:采用StreamingFileSink的方式-bucket压缩 【forBulkFormat】

public class StreamingFileSinkDemo {
	public static void main(String[] args) throws Exception {

		//获取Flink的运行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		// checkpoint配置
		env.enableCheckpointing(60000);
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		String topic = "ods_lark_order";
		Properties prop = new Properties();
		prop.setProperty("bootstrap.servers","ip:port");
		prop.setProperty("group.id","first");
        // 获取流
		FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
		myConsumer.setStartFromGroupOffsets();
		DataStreamSource<String> source = env.addSource(myConsumer);
		DataStream<Order> nameDS = source.map(new MapFunction<String, Order>() {
			@Override
			public Order map(String s) throws Exception {
				Order order = new Order();
				JSONObject jsonObject = JSONObject.parseObject(s);
				order.setName(jsonObject.getString("name"));
				return order;
			}
		});

		// 1.输出文件的前、后缀配置
		OutputFileConfig config = OutputFileConfig
				.builder()
				.withPartPrefix("prefix")
				.withPartSuffix(".txt")
				.build();
		// 设置为Parquet的压缩方式
		StreamingFileSink<Order> streamingFileSink = StreamingFileSink
				.forBulkFormat(new Path("hdfs://192.168.1.204:9000/flink/data/"), ParquetAvroWriters.forReflectRecord(Order.class))
				/*这里是采用默认的分桶策略DateTimeBucketAssigner,它基于时间的分配器,每小时产生一个桶,格式如下yyyy-MM-dd--HH*/
				.withBucketAssigner(new DateTimeBucketAssigner<>())
				.withRollingPolicy(OnCheckpointRollingPolicy.build())
				.withOutputFileConfig(config)
				.build();
				
		nameDS.addSink(streamingFileSink);
		env.execute("StreamingFileSinkTest");
	}
}

采用这种方式的好处:

1.输出文件的前、后缀配置

2.设置为Parquet的压缩方式

缺点:

文件生成是通过checkpoint时候触发的,当checkpoint 过于频繁的话会生成很多的小文件,同时任务数过多,也会生成很多小文件,涉及到后续的小文件合并的情况

相关文章

  • 一条命令轻松解决 「应用程序 “xxx” 不能打开」的问题

    一条命令轻松解决 「应用程序 “xxx” 不能打开」的问题,很多人刚从熟悉的Windows转到较为陌生的Mac,在使用过程中遇到一些困难是必然的。有小伙伴就遇到了这样的问题,在网上下载软件,下载下来的软件安装包是经过压缩......
  • AntDB数据并行加载工具的实现

    AntDB数据并行加载工具的实现,数据加载速度是评判数据库性能的重要指标,能否提高数据加载速度,对文件数据进行并行解析,直接影响数据库运维管理效率。基于此,AntDB分布式数据库提供了两种数据加......

网友评论

快盘下载暂未开通留言功能。

关于我们| 广告联络| 联系我们| 网站帮助| 免责声明| 软件发布

Copyright 2019-2029 【快快下载吧】 版权所有 快快下载吧 | 豫ICP备10006759号公安备案:41010502004165

声明: 快快下载吧上的所有软件和资料来源于互联网,仅供学习和研究使用,请测试后自行销毁,如有侵犯你版权的,请来信指出,本站将立即改正。