本文共 8865 字,大约阅读时间需要 29 分钟。
我在开发中写了两个Redis的key,没有设置过期时间。
原因是:在视频呼叫接通、挂断的时候会把key删除,当时觉得没有必要设置过期时间。 项目上线后发现,这俩key已经有了30万的数据没有被删除。 经分析发现:呼叫超时、杀进程等情况是不能调到删key接口的,所以才导致了这俩key有30万条都没能被删除。改代码,将两个key加上过期时间,跟着最近的版本上线。
import sysimport redisimport ospool = redis.ConnectionPool(host = 'xx.xx.xx.xx',port = xxxx)r = redis.StrictRedis(connection_pool = pool)match = sys.argv[1]+"*"print(match)count = 10000for key in r.scan_iter(match = match,count = count): time = r.ttl(key) if time==-1: r.expire(key,120) print("set expire key:",key) print(time)
我现在测试环境跑了一遍,大概10分钟3000多条,如果跑线上大概得十几个小时。而且使用r.scan_iter 是一次取出count 个,老大建议我用job,因为jedis.scan可使用游标。
public void clearRedis(){ String key ="redis:aaaa:bbbb:date"; /** * rRouter.getJedis 是我们自己封装的方法-连接Redis */ Jedis jedis = rRouter.getJedis("REDIS1", DEFAULT, true); try { Listlist = new ArrayList<>(); String cursor = ScanParams.SCAN_POINTER_START; int temp = 0; int i =0; do { ScanParams params = new ScanParams(); params.count(200); params.match(key + "*"); ScanResult scan = jedis.scan(cursor, params); list.addAll(scan.getResult()); cursor = scan.getStringCursor(); i++; temp += list.size(); log.info("pointControl->key:{} 第{}轮查询,temp:{}",key,i,temp); //需要加过期时间的key if (list.size() > 0) { for (String keyss : list) { Long expire = jedis.expire(keyss, 1000); } list.clear(); } Thread.sleep(100); } while (!"0".equals(cursor)); } catch (Exception e) { log.info("error", e); } finally { if (null != jedis) { jedis.close(); } } }
此好处仅在于jedis.scan可使用游标,但是效率和第一版几乎是一样的。jedis没有可以匹配加过期时间的方法,只能循环expire。
老大提醒,这30万个key,有些都是半个月前的,其实他们的存在与否已经不影响用户使用了,其实可以直接删掉。
每个用户都有一个yhId,yhId就是自增长的,所以我只要找到10天前的yhId,小于此yhId的全部删掉,大于的再加过期时间就可以了。public void clearRedis(Long startYhId){ String key ="redis:aaaa:bbbb:date"; /** * rRouter.getJedis 是我们自己封装的方法-连接Redis */ Jedis jedis = rRouter.getJedis("REDIS1", RedisDEFAULT, true); try { Listlist = new ArrayList<>(); String cursor = ScanParams.SCAN_POINTER_START; int temp = 0; int i =0; do { ScanParams params = new ScanParams(); params.count(200); params.match(key + "*"); ScanResult scan = jedis.scan(cursor, params); list.addAll(scan.getResult()); cursor = scan.getStringCursor(); i++; temp += list.size(); log.info("pointControl->key:{} 第{}轮查询,temp:{}",key,i,temp); List delList = list.stream().filter(s -> Long.parseLong(s.substring(s.length() - 20)) <= startYhId).collect(Collectors.toList()); //需要删除的key if (delList.size() > 0) { String[] delArray = delList.toArray(new String[delList.size()]); jedis.del(delArray); } //需要加过期时间的key list.removeAll(delList); //需要加过期时间的key if (list.size() > 0) { for (String keyss : list) { Long expire = jedis.expire(keyss, 1000); } list.clear(); } Thread.sleep(100); } while (!"0".equals(cursor)); } catch (Exception e) { log.info("error", e); } finally { if (null != jedis) { jedis.close(); } } }
传入startYhId ,过滤出小于等于他的list,删掉,其他的加过期时间。jedis支持批量删除,所以效率一下子提高了不少。此版代码,3000条数据只用了1分钟
我将代码放到了job里(由于历史原因,我们项目最初使用的是自己开发的job框架)。
老大问:如何控制?比如,万一中途出现什么问题,我需要让他停下来,该怎么办? 那我们自己的job是不能支持的,只能用xxl-job,用输入参数来支持了,so,我将代码移到xxl-job,加入了参数判断。 而且,在我想用老job时,我将startYhId 放在了配置中心,使用了xxl-job之后,startYhId和end都可以从控制台输入,只要在代码里解析字符串就可以了。public void clearRedis(Long startYhId, Integer end){ String key ="redis:aaaa:bbbb:date"; /** * rRouter.getJedis 是我们自己封装的方法-连接Redis */ Jedis jedis = rRouter.getJedis("REDIS1", RedisDEFAULT, true); try { Listlist = new ArrayList<>(); String cursor = ScanParams.SCAN_POINTER_START; int temp = 0; int i =0; do { // xxl-job参数控制 停止 if (temp >= end) { log.info("Stop->key:{},temp:{},end:{}",key,temp,end); return; } ScanParams params = new ScanParams(); params.count(200); params.match(key + "*"); ScanResult scan = jedis.scan(cursor, params); list.addAll(scan.getResult()); cursor = scan.getStringCursor(); i++; temp += list.size(); log.info("pointControl->key:{} 第{}轮查询,temp:{}",key,i,temp); List delList = list.stream().filter(s -> Long.parseLong(s.substring(s.length() - 20)) < startYhId).collect(Collectors.toList()); //需要删除的key if (delList.size() > 0) { String[] delArray = delList.toArray(new String[delList.size()]); jedis.del(delArray); } //需要加过期时间的key list.removeAll(delList); //需要加过期时间的key if (list.size() > 0) { for (String keyss : list) { Long expire = jedis.expire(keyss, 1000); } list.clear(); } Thread.sleep(100); } while (!"0".equals(cursor)); } catch (Exception e) { log.info("error", e); } finally { if (null != jedis) { jedis.close(); } } }
老大又问:你代码是不是只清了一台Redis,我们线上有3台。
艾玛…这么要紧的事情给忘了。 xxl-job的参数里加入数据源,有我手动控制清理哪台Redis 所以我的入参为 “数据源,yhid,end” 的字符串。 代码加入数据源控制public void clearRedis(Long startYhId, Integer end, Integer database){ if (1== database){ RSourceEnum rSourceEnum = REDIS1; }else if (2== database){ RSourceEnum = REDIS2; }else { RSourceEnum = REDIS3; } String key ="redis:aaaa:bbbb:date"; /** * rRouter.getJedis 是我们自己封装的方法-连接Redis */ Jedis jedis = rRouter.getJedis("REDIS1", RedisDEFAULT, true); try { Listlist = new ArrayList<>(); String cursor = ScanParams.SCAN_POINTER_START; int temp = 0; int i =0; do { // xxl-job参数控制 停止 if (temp >= end) { log.info("Stop->key:{},temp:{},end:{}",key,temp,end); return; } ScanParams params = new ScanParams(); params.count(200); params.match(key + "*"); ScanResult scan = jedis.scan(cursor, params); list.addAll(scan.getResult()); cursor = scan.getStringCursor(); i++; temp += list.size(); log.info("pointControl->key:{} 第{}轮查询,temp:{}",key,i,temp); List delList = list.stream().filter(s -> Long.parseLong(s.substring(s.length() - 20)) < startYhId).collect(Collectors.toList()); //需要删除的key if (delList.size() > 0) { String[] delArray = delList.toArray(new String[delList.size()]); jedis.del(delArray); } //需要加过期时间的key list.removeAll(delList); //需要加过期时间的key if (list.size() > 0) { for (String keyss : list) { Long expire = jedis.expire(keyss, 1000); } list.clear(); } Thread.sleep(100); } while (!"0".equals(cursor)); } catch (Exception e) { log.info("error", e); } finally { if (null != jedis) { jedis.close(); } } }
OK,finally,代码上线 ,30万数据,不到一个小时就跑完了,之后数据都正常了
转载地址:http://etgjx.baihongyu.com/