首页
Search
1
yamux: how to work?
79 阅读
2
The Art of Memory Allocation: Malloc, Slab, C++ STL, and GoLang Memory Allocation
70 阅读
3
How to receive a network packet in Linux
63 阅读
4
Maps and Memory Leaks in Go
54 阅读
5
C++ redis connection pool
52 阅读
测试
Wireguard
K8s
Redis
C++
Golang
Libcurl
Tailscale
Nginx
Linux
web3
Uniswap V2
Uniswap V3
EVM
security
solidity
openzeppelin
登录
Search
标签搜索
web3
solidity
web3 security
c++
uniswapV3
redis
evm
uniswap
性能测试
k8s
wireguard
CNI
http
tailscale
nginx
linux
设计模式
Jericho
累计撰写
51
篇文章
累计收到
13
条评论
首页
栏目
测试
Wireguard
K8s
Redis
C++
Golang
Libcurl
Tailscale
Nginx
Linux
web3
Uniswap V2
Uniswap V3
EVM
security
solidity
openzeppelin
页面
搜索到
2
篇与
的结果
2022-12-02
C++ redis connection pool
学习teamtalk服务端源码,把redis连接池的实现记录下,用到了hiredis三方库和头文件。整个redis缓存池是三个类实现的,redis-manager,redis-pool,redis-conn。{lamp/}main函数获取redis-manager对象实例CacheManager* CacheManager::getInstance() { if (!s_cache_manager) { s_cache_manager = new CacheManager(); if (s_cache_manager->Init()) { delete s_cache_manager; s_cache_manager = NULL; } } return s_cache_manager; } main() { //初始化redis CacheManager* pCacheManager = CacheManager::getInstance(); if (!pCacheManager) { log("CacheManager init failed"); return -1; } }通过redis-manager类init函数读取配置文件,创建对应的缓存池对象,并初始化缓存池,每个缓存池有一个缓存池名对应一个redis-pool指针加入到map中管理。int CacheManager::Init() { CConfigFileReader config_file("dbproxyserver.conf"); //CacheInstances=unread,group_set,token,sync,group_member char* cache_instances = config_file.GetConfigName("CacheInstances"); if (!cache_instances) { log("not configure CacheIntance"); return 1; } char host[64]; char port[64]; char db[64]; char maxconncnt[64]; CStrExplode instances_name(cache_instances, ','); for (uint32_t i = 0; i < instances_name.GetItemCnt(); i++) { char* pool_name = instances_name.GetItem(i); //printf("%s", pool_name); snprintf(host, 64, "%s_host", pool_name); snprintf(port, 64, "%s_port", pool_name); snprintf(db, 64, "%s_db", pool_name); snprintf(maxconncnt, 64, "%s_maxconncnt", pool_name); char* cache_host = config_file.GetConfigName(host); char* str_cache_port = config_file.GetConfigName(port); char* str_cache_db = config_file.GetConfigName(db); char* str_max_conn_cnt = config_file.GetConfigName(maxconncnt); if (!cache_host || !str_cache_port || !str_cache_db || !str_max_conn_cnt) { log("not configure cache instance: %s", pool_name); return 2; } CachePool* pCachePool = new CachePool(pool_name, cache_host, atoi(str_cache_port), atoi(str_cache_db), atoi(str_max_conn_cnt)); if (pCachePool->Init()) { log("Init cache pool failed"); return 3; } m_cache_pool_map.insert(make_pair(pool_name, pCachePool)); } return 0; }缓存池init的时候会创建redis连接对象,默认数量为2,并将创建的redis连接对象加入到缓存池的成员变量空闲redis连接数组中,等待被取用。int CachePool::Init() { for (int i = 0; i < m_cur_conn_cnt; i++) { CacheConn* pConn = new CacheConn(this); if (pConn->Init()) { delete pConn; return 1; } m_free_list.push_back(pConn); } log("cache pool: %s, list size: %lu", m_pool_name.c_str(), m_free_list.size()); return 0; }redis连接对象init的时候主要用于连接的初始化和重连操作,每次对redis增删改查的之前都会调用init这个函数,每4秒尝试重连一次。/* * redis初始化连接和重连操作,类似mysql_ping() */ int CacheConn::Init() { if (m_pContext) { return 0; } // 4s 尝试重连一次 uint64_t cur_time = (uint64_t)time(NULL); if (cur_time < m_last_connect_time + 4) { return 1; } m_last_connect_time = cur_time; // 200ms超时 struct timeval timeout = {0, 200000}; m_pContext = redisConnectWithTimeout(m_pCachePool->GetServerIP(), m_pCachePool->GetServerPort(), timeout); if (!m_pContext || m_pContext->err) { if (m_pContext) { log("redisConnect failed: %s", m_pContext->errstr); redisFree(m_pContext); m_pContext = NULL; } else { log("redisConnect failed"); } return 1; } redisReply* reply = (redisReply *)redisCommand(m_pContext, "SELECT %d", m_pCachePool->GetDBNum()); if (reply && (reply->type == REDIS_REPLY_STATUS) && (strncmp(reply->str, "OK", 2) == 0)) { freeReplyObject(reply); return 0; } else { log("select cache db failed"); return 2; } }整个初始化的过程大概结束,下面就是通过redis-manager从缓存池中获取释放redis连接的过程。缓存池名是通过配置文件中读入的,获取缓存池中的redis连接需要传入缓存池名,通过map获取到缓存池名对应的缓存池对象指针,并调用其成员函数getCacheConn获取redis连接。CacheConn* CacheManager::GetCacheConn(const char* pool_name) { map<string, CachePool*>::iterator it = m_cache_pool_map.find(pool_name); if (it != m_cache_pool_map.end()) { return it->second->GetCacheConn(); } else { return NULL; } }调用缓存池CachePool成员函数getCacheConn获取redis连接这块代码有可学之处,他用了pthread库里面的信号量来优雅地等待空闲连接数组中的连接对象,为了线程安全需要在访问空闲连接数组的时候加锁,不断判断数组中是否存在可用的redis连接对象,如果没有可用的连接,则将判断此时正在使用的redis连接对象个数是否超过指定最大值,超过则将用信号量阻塞,直到有空闲的连接被唤醒。如果此时连接数小于指定最大值,则将继续创建redis连接对象并初始化一个可用的redis连接对象加入到空闲redis连接数组中。如果数组中有可用的redis连接则将从数组头部取出一个元素并返回。CacheConn* CachePool::GetCacheConn() { m_free_notify.Lock(); while (m_free_list.empty()) { if (m_cur_conn_cnt >= m_max_conn_cnt) { m_free_notify.Wait(); } else { CacheConn* pCacheConn = new CacheConn(this); int ret = pCacheConn->Init(); if (ret) { log("Init CacheConn failed"); delete pCacheConn; m_free_notify.Unlock(); return NULL; } else { m_free_list.push_back(pCacheConn); m_cur_conn_cnt++; log("new cache connection: %s, conn_cnt: %d", m_pool_name.c_str(), m_cur_conn_cnt); } } } CacheConn* pConn = m_free_list.front(); m_free_list.pop_front(); m_free_notify.Unlock(); return pConn; }释放redis连接对象,传入需要释放的redis连接对象指针,加锁,遍历redis连接对象数组中是否存在该连接对象,不存在则将连接对象加入到该空闲redis连接数组中,信号量通知唤醒被阻塞的线程,redis连接数组中有空闲连接可用。void CachePool::RelCacheConn(CacheConn* pCacheConn) { m_free_notify.Lock(); list<CacheConn*>::iterator it = m_free_list.begin(); for (; it != m_free_list.end(); it++) { if (*it == pCacheConn) { break; } } if (it == m_free_list.end()) { m_free_list.push_back(pCacheConn); } m_free_notify.Signal(); m_free_notify.Unlock(); }
2022年12月02日
52 阅读
2 评论
0 点赞
2022-12-01
redis debug asynchronous aof fsync is taking too long (disk is busy?)
一.背景 线上redis的监控数据不太好看,领导觉得看看能不能优化下,分析下什么情况,于是就.....二.现象 看到redis日志里面一堆这个,于是就开始了分析排查先放一张前后对比图吧{callout color="#f0ad4e"}p99延时优化一百倍 😏 {/callout}{lamp/}三.排查 之前在网上找,也看到有人说了相关的解释,不过还是自己决定在多研究下其他的缘由,直接去官网 看看官方的解释 redis troubleshoting 好早之前分析的,现发现,redis的官网变了,这玩意都找不到,直接把之前的图贴上来吧重点看红框部分,此项警告在redis代码中是在AOF fsync刷盘模式是everysecs,代码中的注释 /* Otherwise fall trough, and go write since we can't wait over two seconds,redis中的AOF持久化主要是又两个系统调用实现,一个是fdatasync,另外一个是write,redis发现文件有在执行 fdatasync(2) 时,就先不调用 write(2),只存在 cache 里,免得被 Block。但如果已经超过两秒都还是如此,则会强行执行 write(2),导致redis 会被 Block 住 。居然知道了大概的方向,我们就直接去看日志然后再通过strace去监控write和fdatasync系统调用时间来进行排(线上还是谨慎使用strace吧,这玩意系统调用还是很危险的,优选ebpf)此时观测redis日志,发现redis 主线程 PID:2418redis AOF线程 PID:2412 (服务器配置的AOF同步策略是everysec,故每秒执行一次)redis RDB子进程 PID:14603有了这个日志和数据,我们直接开启硬核的步骤,去看redis的源码,看看RDB和AOF的执行流redis执行流分析 redis进程是一个事件循环,分为文件事件和时间事件文件事件:主要对套接字操作的抽象,服务器通过监听并处理这些事件来完成一系列网络通信的操作,完成客户端的命令请求和回复时间事件:主要分为周期的和定时的事件,做一些统计、清理、aof等的工作因为服务器在处理文件事件时可能会执行命令,使得一些内容被追加到aof_buf缓冲区,所以服务器每次执行一次事件循环都会调用flushAppendOnlyFile函数,考虑要不要将aof_buf缓冲区中的内容写入保存到AOF文件中。RDB后台同步流程rdbSave将数据写入同步到磁盘,write写完文件之后,进行了(fflush,fsync)强行刷盘,导致IO繁忙RDB后台子进程发送退出信号给父进程,父进程捕获然后处理redis主进程AOF 刷盘逻辑根据上一次刷盘的延时,来决定要不要return,如果此时延时超过2s则将延迟加1,打出log,接着主进程为了保证AOF的数据一致性,需要将AOF写命令写入到AOF缓冲区,强行调用write如果当前没有再做AOF_FSYNC的后台任务,根据设置的sync策略进行调用FSYNC刷盘四.解决方案 方法一:配置修改Redis在执行write的时候,由操作系统自身被动控制何时进行fsync。这里如果我们要主动触发操作系统的fsync,可以设置操作系统级别的参数:查看内存的脏页字节大小,设置为0代表由系统自己控制何时调用fsyncsysctl -a | grep vm.dirty_bytesvm.dirty_bytes = 0修改为一个小的值,例如32MB,达到这个数据量就fsync,让操作系统fsync这个动作更频繁一点,避免单次fsync太多数据,导致阻塞echo "vm.dirty_bytes=33554432" >> /etc/sysctl.conf 方法二:关闭RDB或者AOF(安全性换效率)关闭RDB可减少RDB生成的时候,对磁盘的压力,而关闭AOF则可以减少AOF刷盘对磁盘的压力,从而让Redis的性能达到极致。不过这个方法只能用在纯缓存场景,如果有持久化的要求,则一般不靠谱,因为安全性大大降低,一旦宕机,则数据无法恢复。一种折中的方案,是从库开启AOF,而主库关闭AOF。
2022年12月01日
17 阅读
0 评论
1 点赞