本文共 7992 字,大约阅读时间需要 26 分钟。
## DB操作的代理线程服务
#ifndef CDBPROXYTHREAD_H#define CDBPROXYTHREAD_H#include "baseThread.h"#include "my_sql.h"#include "mysqldb.h"#include## HandleDBActionProxy.cpp/*************************************************************************************************** 1. Describe One DB Action in details.**************************************************************************************************/class DBAction{public: typedef enum _actType { E_ADD, E_DELETE, E_UPDATE, E_SELECT }ACT_TYPE; DBAction(ACT_TYPE _type ,string _action,string _tbName) { type = _type; action = _action; tbName = _tbName; } ACT_TYPE type; string action; string tbName;};/*************************************************************************************************** One Request Maybe Contains servaral DBAction.**************************************************************************************************/class CDBRequest{public: CDBRequest() { this->type = 0; this->actionList.clear(); } CDBRequest(const CDBRequest & other) { this->type = other.type; this->actionList = other.actionList; } CDBRequest(int _type) { type = _type; } void addAction(DBAction action) { actionList.push_back(action); } void cleanAction() { actionList.clear(); }public: vector actionList; int type;};/*************************************************************************************************** Define the Db Action Search.**************************************************************************************************/typedef list TaskList;class HandleDbActionThread : public BaseThread{public: static HandleDbActionThread * getInstance(); ~HandleDbActionThread() { m_taskList.clear(); } inline void addDbRequest(CDBRequest & request){ pthread_mutex_lock(&m_lock); m_taskList.push_back(request); pthread_cond_signal(&m_request_cond); pthread_mutex_unlock(&m_lock); } inline bool getDbRequest(CDBRequest & request){ bool bResult = false; pthread_mutex_lock(&m_lock); if (m_taskList.empty()){ pthread_cond_wait(&m_request_cond, &m_lock); } if 
( !m_taskList.empty() ) { bResult = true; request = m_taskList.front(); m_taskList.pop_front(); } pthread_mutex_unlock(&m_lock); return bResult; } int loadDbConfig(); string getKuIP(); inline bool isConnected() { return m_isConnected; }private: HandleDbActionThread() { pthread_mutex_init(&m_lock ,NULL); pthread_cond_init(&m_request_cond,NULL); m_taskList.clear(); loadDbConfig(); m_isConnected = tryConnectToDb(); } bool tryConnectToDb() { m_isConnected = ( 0 == MYSQL_CONN_MYSQL() ); return m_isConnected; } bool processTask(CDBRequest & request);protected: void run();private: TaskList m_taskList; pthread_mutex_t m_lock; pthread_cond_t m_request_cond; bool m_isConnected; string m_hostName; string m_userName; string m_passwd; int m_port;};#endif // CDBPROXYTHREAD_H
#include "cdbproxythread.h"
#include "cstringutils.h"
#include "typedef.h"

#include <unistd.h>

#include <cerrno>
#include <cstddef>
#include <cstring>
#include <iostream>
#include <string>

extern MSQL_INST gMySql;
extern INT32 gMySqlReconn;

/***************************************************************************************************
 * Single instance for HandleDbActionThread.
 **************************************************************************************************/
static HandleDbActionThread * gInstance = NULL;

/**
 * Returns the shared instance, creating it on the first call.
 * NOTE(review): the lazy creation is not synchronized in this file, so the
 * first call should happen before several threads can reach it -- confirm
 * against the callers.
 */
HandleDbActionThread * HandleDbActionThread::getInstance()
{
    if (gInstance == NULL) {
        gInstance = new HandleDbActionThread();
    }
    return gInstance;
}

/***************************************************************************************************
 * Loads the DB configuration file by delegating to MYSQL_READ_CONFIG().
 **************************************************************************************************/
int HandleDbActionThread::loadDbConfig()
{
    return MYSQL_READ_CONFIG();
}

/***************************************************************************************************
 * Thread body: loops forever. While disconnected it retries the connection;
 * once connected it blocks for the next request and processes it.
 **************************************************************************************************/
void HandleDbActionThread::run()
{
    std::cout << __FUNCTION__ << "::Enter HandleDbActionThread." << std::endl;

    while (true) {
        //[1] Make sure there is a connection before taking any work.
        if (!isConnected()) {
            std::cout << __FUNCTION__
                      << "::MySql is disConnected,will try to connect to it...\n"
                      << std::endl;
            if (tryConnectToDb()) {
                std::cout << __FUNCTION__ << "::MySql is Connected!\n" << std::endl;
            }
            // usleep() takes microseconds, so this pauses 100 us between attempts.
            usleep(100);
            continue;
        }

        //[2] Wait for a request; false means the wait ended with nothing queued.
        CDBRequest request;
        if (getDbRequest(request)) {
            processTask(request);
        }
    }
}

/***************************************************************************************************
 * Reads the KU IP value: var_value of the cmt_config row with id = 11.
 * Returns the selected text, or an empty string when the query fails.
 **************************************************************************************************/
std::string HandleDbActionThread::getKuIP()
{
    UINT32 affectRows = 0;
    // Local buffer: the text is copied into the returned string right away, so
    // nothing has to outlive this call.
    // NOTE(review): SELECT_DATA_TABLE is given no buffer size here; assumes the
    // value fits in 100 bytes -- confirm against its implementation.
    char result[100];
    std::memset(result, 0, sizeof(result));

    // const_cast for the same reason as in processTask(): the call is given
    // non-const char pointers there, and a string literal does not convert to
    // char* implicitly in C++11 and later.
    const int iRet = SELECT_DATA_TABLE(
        const_cast<char *>("select var_value from cmt_config where id = 11"),
        const_cast<char *>("cmt_config"),
        result, 0, &affectRows);
    if (iRet != 0) {
        std::cout << "select action errror,errno:: " << errno << std::endl;
        return std::string("");
    }

    std::cout << "select data value::" << result << std::endl;
    return std::string(result);
}

/***************************************************************************************************
 * Executes every action of the request in order, logging each one. A failing
 * action is logged and skipped; the remaining actions are still executed.
 *
 * @param request request whose actionList is executed; it is not modified.
 * @return true when every action succeeded (or the request is empty), false
 *         when at least one action reported a non-zero result. The original
 *         body had no return statement, which is undefined behaviour for a
 *         function declared to return bool.
 **************************************************************************************************/
bool HandleDbActionThread::processTask(CDBRequest & request)
{
    std::cout << "-------------------------------------------------------------[DB_PROCESS_BEGIN]" << std::endl;

    bool allSucceeded = true;
    for (std::size_t i = 0; i < request.actionList.size(); ++i) {
        const DBAction & action = request.actionList[i];
        std::cout << "#########::" << action.type << "==" << action.action << std::endl;

        // The DB helpers are called with non-const char pointers; the casts
        // only adapt the pointer type for those calls.
        char * const sql = const_cast<char *>(action.action.c_str());
        char * const table = const_cast<char *>(action.tbName.c_str());

        if (action.type == DBAction::E_ADD) {
            const int iRet = INSER_INTO_DATA_TABLE(sql, table);
            if (iRet != 0) {
                std::cout << "add action errror,errno:: " << errno << std::endl;
                allSucceeded = false;
            }
        } else if (action.type == DBAction::E_DELETE) {
            // Deletes go through the same helper as updates, as in the original.
            UINT32 affectRows = 0;
            const int iRet = UPDATE_INTO_DATA_TABLE(sql, table, &affectRows);
            if (iRet != 0) {
                std::cout << "delete action errror,errno:: " << errno << std::endl;
                allSucceeded = false;
            }
        } else if (action.type == DBAction::E_UPDATE) {
            UINT32 affectRows = 0;
            const int iRet = UPDATE_INTO_DATA_TABLE(sql, table, &affectRows);
            if (iRet != 0) {
                std::cout << "update action errror,errno:: " << errno << std::endl;
                allSucceeded = false;
            }
        } else if (action.type == DBAction::E_SELECT) {
            UINT32 affectRows = 0;
            // NOTE(review): no buffer size is passed to SELECT_DATA_TABLE here;
            // assumes the result fits in 1024 bytes -- confirm.
            char result[1024];
            std::memset(result, 0, sizeof(result));
            const int iRet = SELECT_DATA_TABLE(sql, table, result, 0, &affectRows);
            if (iRet != 0) {
                std::cout << "select action errror,errno:: " << errno << std::endl;
                allSucceeded = false;
            } else {
                std::cout << "select data value::" << result << std::endl;
            }
        }
        // Any other action type is ignored, as in the original.
    }

    std::cout << "-------------------------------------------------------------[DB_PROCESS_END]" << std::endl;
    return allSucceeded;
}
转载地址:http://ncvbi.baihongyu.com/