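Learning Flink: State
What state is for
State retains historical information for the current key, so that later events with the same key can see what happened before.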
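As a rough mental model (not Flink's actual implementation), keyed state behaves like a map from key to a private value: each key only ever sees its own entry. The sketch below uses a plain HashMap to show that scoping; the class name KeyedCounterSketch and the method onEvent are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of keyed state: one counter per key.
// It only mimics the per-key scoping; it is not how Flink stores state.
class KeyedCounterSketch {
    private final Map<Integer, Integer> countPerKey = new HashMap<>();

    // Handle one event for `key` and return that key's updated count.
    int onEvent(int key) {
        int updated = countPerKey.getOrDefault(key, 0) + 1;
        countPerKey.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        KeyedCounterSketch state = new KeyedCounterSketch();
        state.onEvent(1);
        state.onEvent(2);
        // Key 1 has now been seen twice; key 2 does not affect it.
        System.out.println(state.onEvent(1)); // prints 2
    }
}
```

In Flink the map lookup is implicit: after keyBy, the state handle always refers to the entry of the key currently being processed.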
How to use state
ListState<Integer> vipList = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vipList", TypeInformation.of(Integer.class)));
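Available state types include ValueState, ListState, and MapState (Flink also offers ReducingState and AggregatingState). There appears to be no SetState; a MapState keyed by the element can serve as a set.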
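State example
Take Qidian (起点) as an example: its novels cannot be downloaded, so pirates can only take screenshots and extract the text from them.
Qidian has anti-scraping measures in place, as follows.
1. If a user turns one page per second or faster, 10 times in a row, the user is considered a bot.
In this scenario the user keeps firing click events, for example: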
userId=1 click_time=2023-09-07 00:00:00
userId=1 click_time=2023-09-07 00:00:01
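How do we check rule 1?
lastClickState keeps the time of the user's previous click.
clickcntState keeps the number of consecutive clicks made faster than one page per second.
Each incoming record is compared with the previous click time held in lastClickState:
if the interval is < 1s, clickcntState is incremented by 1;
if the interval is 1s or more, clickcntState is reset to 0.
Then check whether clickcntState has reached the threshold of 10 (the demo code below tests cnt > 10, so it first fires at a count of 11); if so, emit the userId to the sink.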
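The steps above can be sketched in plain Java without Flink, assuming two HashMaps stand in for lastClickState and clickcntState. The class name FastClickDetectorSketch, the method onClick, and the constants are hypothetical. The comparison uses >= 10 to follow the rule as worded, whereas the demo tests > 10.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java sketch of the per-user check described above.
// The two maps stand in for lastClickState and clickcntState.
class FastClickDetectorSketch {
    static final long MIN_INTERVAL_MS = 1000; // "one page per second"
    static final int THRESHOLD = 10;          // consecutive fast clicks

    private final Map<Integer, Long> lastClick = new HashMap<>();
    private final Map<Integer, Integer> fastClicks = new HashMap<>();

    // Returns true when this click brings the user's consecutive
    // fast-click count to the threshold or beyond.
    boolean onClick(int userId, long clickTimeMs) {
        Long last = lastClick.get(userId);
        int count = 0; // a first click or a slow click resets the counter
        if (last != null && clickTimeMs - last < MIN_INTERVAL_MS) {
            count = fastClicks.getOrDefault(userId, 0) + 1;
        }
        fastClicks.put(userId, count);
        lastClick.put(userId, clickTimeMs);
        return count >= THRESHOLD;
    }

    public static void main(String[] args) {
        FastClickDetectorSketch detector = new FastClickDetectorSketch();
        boolean flagged = false;
        // 11 clicks, 500 ms apart: 10 intervals, all shorter than 1 s.
        for (int i = 0; i <= 10; i++) {
            flagged = detector.onClick(1, i * 500L);
        }
        System.out.println(flagged); // prints true
    }
}
```

The counter tracks intervals rather than clicks, so the first click of a user never counts as fast; this is the same choice the demo makes by treating a missing last-click time as a reset.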
Let the demo speak for itself.
package com.chenchi.pojo;

public class User {
    public Integer userId;
    public Integer vip;
    public long clickTime = System.currentTimeMillis();

    public User() {
    }

    public User(Integer userId, Integer vip) {
        this.userId = userId;
        this.vip = vip;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getVip() {
        return vip;
    }

    public void setVip(Integer vip) {
        this.vip = vip;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", vip=" + vip +
                ", clickTime=" + clickTime +
                '}';
    }

    public long getClickTime() {
        return clickTime;
    }

    public void setClickTime(long clickTime) {
        this.clickTime = clickTime;
    }
}
package com.chenchi.pojo;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

// Emits a random User every `interval` milliseconds until cancelled.
public class UserSource implements SourceFunction<User> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean run = true;
    private final Random random = new Random();
    int randomBound = 1000; // userId is drawn from [0, randomBound)
    int interval = 1000;    // pause between events, in milliseconds

    public UserSource() {}

    public UserSource(Integer randomBound) {
        this.randomBound = randomBound;
    }

    public UserSource(Integer randomBound, int interval) {
        this.randomBound = randomBound;
        this.interval = interval;
    }

    @Override
    public void run(SourceContext<User> sourceContext) throws Exception {
        // Loop on the flag so that cancel() actually stops the source.
        while (run) {
            Integer userId = random.nextInt(randomBound);
            Integer vip = random.nextInt(10);
            sourceContext.collect(new User(userId, vip));
            Thread.sleep(interval);
        }
    }

    @Override
    public void cancel() {
        run = false;
    }
}
package com.chenchi.state;

import com.chenchi.pojo.User;
import com.chenchi.pojo.UserSource;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CustomProcess customProcess = new CustomProcess();
        // 4 distinct userIds, one event every 100 ms
        DataStreamSource<User> userDataStreamSource = env.addSource(new UserSource(4, 100));
        userDataStreamSource.print();
        userDataStreamSource
                .keyBy(user -> user.userId)
                .process(customProcess)
                .print();
        env.execute();
    }

    static class CustomProcess extends KeyedProcessFunction<Integer, User, String> {
        // Time of the previous click for the current key (userId)
        ValueState<Long> lastClickTime;
        // Number of consecutive fast clicks for the current key
        ValueState<Integer> cnt;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastClickTime = getRuntimeContext().getState(new ValueStateDescriptor<Long>("click", TypeInformation.of(Long.class)));
            cnt = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("cnt", Integer.class));
            super.open(parameters);
        }

        @Override
        public void processElement(User user, KeyedProcessFunction<Integer, User, String>.Context context, Collector<String> collector) throws Exception {
            Integer userId = user.getUserId();
            long clickTime = user.getClickTime();
            Long last = lastClickTime.value();
            Integer value = cnt.value();
            // State is null the first time a key is seen
            if (value == null) {
                value = 0;
            }
            if (last != null && clickTime - last < 1000) {
                // Clicking too fast
                cnt.update(value + 1);
            } else {
                // The user may only have skipped a few boring pages quickly; a slow click resets the counter
                cnt.update(0);
            }
            // Remember this click time for the next comparison
            lastClickTime.update(clickTime);
            // Fires from the 11th consecutive fast click onward (count > 10)
            if (cnt.value() > 10) {
                // The message text means "clicking too fast, watch out"
                collector.collect("userId=" + userId + ",clickCnt=" + cnt.value() + ",click太快 注意注意");
            }
        }
    }
}
Printed log
10> User{userId=0, vip=0, clickTime=1694083167883}
11> User{userId=0, vip=4, clickTime=1694083167994}
12> User{userId=2, vip=4, clickTime=1694083168102}
1> User{userId=2, vip=7, clickTime=1694083168212}
2> User{userId=2, vip=3, clickTime=1694083168320}
3> User{userId=1, vip=0, clickTime=1694083168428}
4> User{userId=0, vip=7, clickTime=1694083168537}
5> User{userId=2, vip=3, clickTime=1694083168646}
6> User{userId=2, vip=0, clickTime=1694083168757}
7> User{userId=2, vip=3, clickTime=1694083168866}
8> User{userId=0, vip=9, clickTime=1694083168975}
9> User{userId=0, vip=3, clickTime=1694083169084}
10> User{userId=2, vip=7, clickTime=1694083169191}
11> User{userId=0, vip=7, clickTime=1694083169298}
12> User{userId=0, vip=6, clickTime=1694083169406}
1> User{userId=3, vip=9, clickTime=1694083169513}
2> User{userId=0, vip=4, clickTime=1694083169618}
3> User{userId=3, vip=3, clickTime=1694083169726}
4> User{userId=1, vip=8, clickTime=1694083169833}
5> User{userId=2, vip=1, clickTime=1694083169942}
6> User{userId=3, vip=2, clickTime=1694083170050}
7> User{userId=2, vip=8, clickTime=1694083170158}
8> User{userId=1, vip=4, clickTime=1694083170267}
9> User{userId=1, vip=2, clickTime=1694083170374}
10> User{userId=0, vip=1, clickTime=1694083170481}
11> User{userId=3, vip=6, clickTime=1694083170589}
12> User{userId=0, vip=9, clickTime=1694083170696}
1> User{userId=3, vip=1, clickTime=1694083170804}
2> User{userId=1, vip=8, clickTime=1694083170911}
3> User{userId=1, vip=3, clickTime=1694083171018}
4> User{userId=0, vip=7, clickTime=1694083171126}
5> User{userId=1, vip=8, clickTime=1694083171233}
6> User{userId=3, vip=5, clickTime=1694083171341}
7> User{userId=0, vip=8, clickTime=1694083171448}
9> userId=0,clickCnt=11,click太快 注意注意
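The result matches expectations: userId=0 is reported once its counter reaches 11 (the message text means "clicking too fast, watch out").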
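As a cross-check, the userId=0 click times can be copied out of the log above and replayed by hand against the same rule the demo uses (interval < 1000 ms increments, otherwise reset). The class name LogReplaySketch and the method finalCount are hypothetical; the timestamps are taken verbatim from the log.

```java
// Replays the userId=0 click times copied from the log above
// against the demo's rule: interval < 1000 ms increments, else reset.
class LogReplaySketch {
    static final long[] USER0_CLICKS = {
        1694083167883L, 1694083167994L, 1694083168537L, 1694083168975L,
        1694083169084L, 1694083169298L, 1694083169406L, 1694083169618L,
        1694083170481L, 1694083170696L, 1694083171126L, 1694083171448L
    };

    // Returns the consecutive fast-click count after the last click.
    static int finalCount(long[] clicks) {
        int count = 0; // the first click has no predecessor, so it starts at 0
        for (int i = 1; i < clicks.length; i++) {
            long interval = clicks[i] - clicks[i - 1];
            count = (interval < 1000) ? count + 1 : 0;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(finalCount(USER0_CLICKS)); // prints 11
    }
}
```

All 11 intervals between the 12 clicks are shorter than one second (the longest is 863 ms), which is why the alert carries clickCnt=11.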