如有更佳的将数据保存到 MySQL 的方法,欢迎私信或留言分享,相互学习~
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
// Build the JDBC-backed sink first, then register it as the sink of the stream.
val jdbcSink = new JDBCSink()
dataStream.addSink(jdbcSink)
/**
 * Flink sink that writes every incoming record to MySQL through JDBC using an
 * "update first, insert when nothing was updated" strategy.
 *
 * `输入的数据类型` stands for the record type of the stream; `invoke` reads its
 * `count`, `url` and `windowEnd` members. The JDBC URL, credentials, table name
 * and column names in the string literals are template placeholders that have
 * to be replaced with real values before the sink is used.
 */
class JDBCSink extends RichSinkFunction[输入的数据类型] {
  // Assigned in open() and released in close(); they stay null until open() has run.
  var conn: Connection = _
  var insertStatement: PreparedStatement = _
  var updateStatement: PreparedStatement = _

  /**
   * Opens the JDBC connection and prepares the two statements reused by `invoke`.
   *
   * @param parameters configuration handed in by the caller; not read here
   */
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/数据库", "账号", "密码")
    // One `?` marker per value bound in invoke (three for the insert).
    insertStatement = conn.prepareStatement("INSERT INTO 表名 VALUES (?, ?, ?);")
    // Three markers: the new value plus the two key columns bound in invoke.
    // The previous text had only two markers, so binding index 3 failed.
    updateStatement = conn.prepareStatement("UPDATE 表名 SET 字段 = ? WHERE 字段 = ? AND 字段 = ?;")
  }

  /**
   * Releases both statements and the connection.
   *
   * Each resource is null-checked so that a partially failed `open` does not
   * cause a NullPointerException here, and the nested try/finally makes sure the
   * remaining resources are still closed when closing an earlier one throws.
   */
  override def close(): Unit = {
    try {
      if (insertStatement != null) insertStatement.close()
    } finally {
      try {
        if (updateStatement != null) updateStatement.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
  }

  /**
   * Writes one record: tries to update the existing row and inserts a new row
   * when the update affected nothing.
   *
   * @param value   record to persist
   * @param context sink context supplied by the caller; not used
   */
  override def invoke(value: 输入的数据类型, context: SinkFunction.Context[_]): Unit = {
    // UPDATE parameters: 1 = new count, 2 and 3 = url / windowEnd used in the WHERE clause.
    updateStatement.setInt(1, value.count.toInt)
    updateStatement.setString(2, value.url)
    updateStatement.setDouble(3, value.windowEnd)

    // executeUpdate returns the update count directly.
    val updatedRows = updateStatement.executeUpdate()

    if (updatedRows == 0) {
      // No existing row matched, so insert a fresh one.
      // NOTE(review): assumes the table's column order is (windowEnd, url, count) — confirm.
      insertStatement.setDouble(1, value.windowEnd)
      insertStatement.setString(2, value.url)
      insertStatement.setInt(3, value.count.toInt)
      insertStatement.executeUpdate()
    }
  }
}