Flink DataStream使用JdbcInputFormat方式从Mysql读取数据

2023-06-19 16:44
  • 1. 背景
  • 2. 查看Mysql表数据
  • 3. Flink DataStream查询数据程序
1. 背景

官网Jdbc DataStream Connector只有Sink,没有Source,所以需要用其它方式从Mysql查询数据

2. 查看Mysql表数据
mysql> select * from person;
+------+------+
| id | name |
+------+------+
| 1 | yi |
| 2 | er |
+------+------+
2 rows in set (0.00 sec)
mysql>
3. Flink DataStream查询数据程序
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.connector.jdbc.JdbcInputFormat
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.types.Row
object FlinkTest {
def main(args: Array[String]): Unit = {
val senv:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
val mysqlPersonDataInput:DataStream[Row] = senv.createInput(JdbcInputFormat.buildJdbcInputFormat()
.setDrivername("com.mysql.cj.jdbc.Driver")
.setDBUrl("jdbc:mysql://192.168.8.115:3306/test?serverTimezone=GMT%2B8&useSSL=false")
.setUsername("root")
.setPassword("Root_123")
.setQuery("select id, name from person")
.setRowTypeInfo(new RowTypeInfo(
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO))
.finish())
mysqlPersonDataInput.executeAndCollect().foreach(println)
}
}

运行程序,结果如下:

+I[1, yi]
+I[2, er]