Session status:
Add dependencies
<properties>
    <flink.version>1.17.2</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.2-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.29</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                                <exclude>org.apache.hadoop:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Java code
package com.demo.day1;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo1_WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka source: read the latest records from topic1 as plain strings
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop11:9092,hadoop12:9092,hadoop13:9092")
                .setTopics("topic1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setGroupId("g1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "source");

        // Split each comma-separated line into (word, 1), key by the word, and sum the counts
        ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                for (String s1 : arr) {
                    out.collect(Tuple2.of(s1, 1));
                }
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum("f1").print();

        env.execute();
    }
}
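The job consumes from topic1, so the topic has to exist before testing. A minimal sketch for creating it with the Kafka CLI (the partition and replication counts are illustrative assumptions, not taken from the original setup):

kafka-topics.sh --bootstrap-server hadoop11:9092 --create --topic topic1 --partitions 1 --replication-factor 1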
Test:
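One quick way to feed the test is the Kafka console producer; a sketch, assuming each input line is a comma-separated list of words since the job splits on "," (on older Kafka versions the option is --broker-list instead of --bootstrap-server):

kafka-console-producer.sh --bootstrap-server hadoop11:9092,hadoop12:9092,hadoop13:9092 --topic topic1
>hello,flink,hello
>kafka,flink

The running job should then print the updated word counts to stdout.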
View (data synchronization):
Request a YARN session:
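The session can be started with the yarn-session.sh script shipped with Flink; a minimal sketch, assuming the install path used in the later application-mode steps:

# start a detached Flink session on YARN
/opt/installs/flink/bin/yarn-session.sh -d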
Build the jar:
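With the shade plugin bound to the package phase in the pom above, the fat jar comes out of a normal Maven build, e.g.:

mvn clean package -DskipTests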
Upload the jar:
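One way to copy the jar onto a cluster node is scp; a sketch, assuming the /opt/flinkjob directory used in the application-mode steps below and a root login (both assumptions):

scp target/flink-test-1.0-SNAPSHOT.jar root@hadoop11:/opt/flinkjob/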
Run:
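With the YARN session up, the job can be submitted with flink run; a sketch, assuming the jar was uploaded to /opt/flinkjob:

# flink run attaches to the running YARN session started above
flink run -c com.demo.day1.Demo1_WordCount /opt/flinkjob/flink-test-1.0-SNAPSHOT.jar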
Test:
Change the parallelism to 3 and test:
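Here the parallelism is changed in the code (env.setParallelism(3)) and the jar is rebuilt; as a side note, the parallelism could also be overridden at submit time without rebuilding, e.g.:

flink run -p 3 -c com.demo.day1.Demo1_WordCount /opt/flinkjob/flink-test-1.0-SNAPSHOT.jar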
Build the jar and upload it:
View:
The producer sends data to topica (3 partitions):
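The partition count of topica can be confirmed with kafka-topics.sh before producing, e.g.:

kafka-topics.sh --bootstrap-server hadoop11:9092 --describe --topic topica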
Send 2,000 records to topica
package com.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class producer {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // use a custom partitioner (com.kafka.MyPartitioner, not shown here)
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.MyPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        for (int i = 0; i < 1000; i++) {
            // no key and no explicit partition: the partition is chosen by the custom partitioner
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topica", "kafka");
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("sent successfully, partition: " + recordMetadata.partition());
                        System.out.println("sent successfully, topic: " + recordMetadata.topic());
                        System.out.println("sent successfully, offset: " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}
The records are sent to random partitions:
View the data:
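Per-partition record counts can be estimated from the latest offsets; a sketch using GetOffsetShell (the exact script and flag names vary slightly across Kafka versions):

# print the end offset of each partition of topica (--time -1 means "latest")
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop11:9092 --topic topica --time -1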
Send 1,000,000 records to topica:
package com.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class producer {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // use a custom partitioner (com.kafka.MyPartitioner, not shown here)
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.MyPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        for (int i = 0; i < 1000000; i++) {
            // the partition is given explicitly as i % 3, so the records round-robin over the 3 partitions
            // and the configured partitioner is bypassed
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topica", i % 3, null, "kafka");
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("sent successfully, partition: " + recordMetadata.partition());
                        System.out.println("sent successfully, topic: " + recordMetadata.topic());
                        System.out.println("sent successfully, offset: " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}
View the data:
1. Upload Flink's lib and plugins directories to HDFS
   hdfs dfs -mkdir /flink-dist
   hdfs dfs -put /opt/installs/flink/lib /flink-dist
   hdfs dfs -put /opt/installs/flink/plugins/ /flink-dist

2. Upload your own jar to HDFS
   hdfs dfs -mkdir /my-flinkjars
   hdfs dfs -put /opt/flinkjob/flink-test-1.0-SNAPSHOT.jar /my-flinkjars

3. Submit the job
   flink run-application \
     -t yarn-application \
     -Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \
     -c com.demo.day1.Demo1_WordCount \
     hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
After submitting the job, check YARN:
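Besides the YARN web UI, the submitted application can be checked from the command line, e.g.:

# the job should show up as a running application of type Apache Flink
yarn application -list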
Test:
Write data:
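Data can again be written with the console producer, using the same comma-separated format as before:

kafka-console-producer.sh --bootstrap-server hadoop11:9092 --topic topic1
>yarn,flink,kafka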
View the data (the records are not in a single partition; the distribution is random):