flink scala编程入门的教程网上有一些,看着步骤相对比较简单,但是实践起来还是遇到不少的坑,这里记录下过程,供日后查阅,备注附录了遇到的一些问题。至于代码解释可自行查阅网上其他资料,这里不再赘述。
工程构建
IDEA新建一个空的maven项目,
src/main
目录下新建scala文件夹,并配置为Sources
类型配置pom.xml文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111"1.0" encoding="UTF-8" xml version=
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.eflink</groupId>
<artifactId>word-count</artifactId>
<version>1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
<build>
<finalName>word-count</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.eflink.SocketWordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.4</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>pom.xml文件中设置flink和scala的版本,本机flink版本为
1.7.1
,scala的版本根据flink的版本来确定,例如:依赖Flink Clients,查看特定flink版本下scala版本支持的情况。
编写代码
参考Flink tutorials scala code,IDEA创建scala class选择Object,代码如下:
1 | package com.eflink |
nc -l 9000
Cellar/apache-flink/1.7.1/libexec/bin/start-cluster.sh
mvn compile/package
flink run target/word-count.jar --port 9000
备注
maven更换阿里云源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24cp /usr/local/Cellar/maven@3.3/3.3.9/libexec/conf/settings.xml ~/.m2
vim ~/.m2/settings.xml
<mirrors>
<!-- mirror
| Specifies a repository mirror site to use instead of a given repository. The repository that
| this mirror serves has an ID that matches the mirrorOf element of this mirror. IDs are used
| for inheritance and direct lookup purposes, and must be unique across the set of mirrors.
|
<mirror>
<id>mirrorId</id>
<mirrorOf>repositoryId</mirrorOf>
<name>Human Readable Name for this Mirror.</name>
<url>http://my.repository.com/repo/path</url>
</mirror>
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
</mirrors>包导入问题
IDEA根据编写的代码能够自动或手动提示导入所需要的包,但是构建官方给的代码中会报如下错误:
1
2error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[ERROR] .flatMap { w => w.split("\\s") }这是因为程序需要一个隐形参数导致的,引用包改为
import org.apache.flink.streaming.api.scala._
单纯scala项目pom文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.4</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<mainClass>packagename.classname</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>远程提交
本地开发然后往集群提交远程作业,ExecutionEnvironment.createRemoteEnvironment 第三个参数是打包之后的jar包路径,也是必不可少,不然会找不到类,同时这个jar包是单一jar包,不是那种把所有依赖都打进去的可执行jar包。
IDEA配置scala SDK
打开scala文件,IDEA会弹出通知
No Scala SDK in module Setup Scala SDK
,需要配置scala sdk,点击Setup Scala SDK
配置。- 选择maven下载的scala(推荐)
选择本地安装的scala
查看本地安装scala的位置
1
2
3
4
5
6
7
8
9▶ brew info scala
scala: stable 2.12.8
JVM-based programming language
https://www.scala-lang.org/
/usr/local/Cellar/scala/2.12.8 (42 files, 20.8MB) *
Built from source on 2019-01-08 at 12:30:35
From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/scala.rb
==> Requirements
Required: java >= 1.8 ✔scala安装在
/usr/local/Cellar/scala/2.12.8
,但是IDEA弹出的finder不能选择/usr
文件夹,于是曲线救国,创建一个软链接到可访问的目录下:1
ln -s /usr/local/Cellar/ /Users/xxxuser/Cellar
于是可以通过当前用户目录下的Cellar软连接,选择
/usr/local/Cellar/scala/2.12.8/libexec
配置scala sdk。
scala版本问题
flink版本为1.7.1,推荐使用scala2.11版本,虽说1.7.1的flink版本支持scala 2.11和2.12,但是在实践中发现,当使用scala 2.12版本时,运行程序会报错
java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object;
,将scala版本改为2.11后程序可正常运行。查阅网上资料,有说是jdk版本和scala版本适配问题导致的,目前未深入分析,具体原因不明。环境配置
安装flink
1
2
3
4▶ brew install apache-flink
▶ flink --version
Version: 1.7.1, Commit ID: 89eafb4安装scala(非必需)
1
▶ brew install scala@2.11
安装mvn(非必需)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18▶ brew install maven@3.3
▶ brew info maven@3.3
maven@3.3: stable 3.3.9 [keg-only]
Java-based project management
https://maven.apache.org/
/usr/local/Cellar/maven@3.3/3.3.9 (91 files, 9.6MB)
# echo 'export PATH="/usr/local/opt/maven@3.3/bin:$PATH"' >> ~/.zshrc
# or
▶ ln -s /usr/local/Cellar/maven@3.3/3.3.9/bin/mvn /usr/local/bin/mvn
▶ mvn -v
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T00:41:47+08:00)
Maven home: /usr/local/Cellar/maven@3.3/3.3.9/libexec
Java version: 1.8.0_191, vendor: Oracle Corporation
Java home: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"IDEA 安装
scala
插件