注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

网易杭研后台技术中心的博客

 
 
 
 
 

日志

 
 

MySQL binlog API 实现过程解析  

来自方法   2012-07-03 11:00:32|  分类: MySQL |举报 |字号 订阅

  下载LOFTER 我的照片书  |
Mats Kindahl开发的mysql Replication Listener是一个基于STL/Boost的C++动态库,它提供了一组api函数可以从本地的binlog文件,或则通过网络方式获取binlog文件的具体内容。 mysql Replication Listener获取到的最小单位是binlog event,即mysql执行各种操作所产生的事件。包括,query event,Rotate event,Format event,Table map event等。
使用binlog api之前首先需要创建一个连接,输入的参数有两种类型:1,本地binlog文件,形如file:///path/to/binlog/file;2,mysql的服务器:形如:mysql://username[:password]@ip:port。两类参数分别使用了不同的过程来获取binlog event。分别对这两种实现进行说明。
1,读取本地binlog文件的实现。
file_driver.cpp文件中封装了对本地文件的全部操作过程。
1,connect()
连接过程的原理是,通过判断文件的头四个个字节是不是{0xfe, 0x62, 0x69, 0x6e}来判断输入的文件是不是一个binary log文件。
   2,disconnect()
   直接关闭binlog文件。
   3,set_position()
   通过seek函数将读写指针设置到指定的位置。
   4,get_position()
   返回binlong文件当前的读写位置。
   5,wait_for_next_event()
   该函数首先从binlog文件中读取一个event_log header长度的数据块,hearder的数据结构是:
{
  boost::uint8_t  marker; 
  boost::uint32_t timestamp;
  boost::uint8_t  type_code;
  boost::uint32_t server_id;
  boost::uint32_t event_length;
  boost::uint32_t next_position;
  boost::uint16_t flags;
};
根据type_code的值来判断是什么样的event事件,然后根据不同event事件读取不同长度的文件内容,同时构造一个相应事件的数据结构填充其内容并返回。读完一个事件长度的内容后将文件的读写指针置于该事件内容的末尾,开始下一个事件的读取。直到文件的末尾。
实例分析:
实例1
 int main(int argc, char *argv[]) {
const char *url = “file:///tmp/binlog.000001”;
Binary_log binlog(create_transport(url));
binlog.connect();
Binary_log_event *event;
while (true) {
int result = binlog.wait_for_next_event(&event);
if (result == ERR_EOF)
break;
cout << “ at “ << binlog.get_position()
<< “ event type “ << event.get_type_code()
<< endl;
}
return EXIT_SUCCESS;
}
函数的调用层次关系如下
create_transport   --->  parse_file_url  --->  Binlog_file_driver()
connect ---> Binlog_file_driver :: connect()
wait_for_next_event ---> Binlog_file_driver ::  wait_for_next_event ()

2,读取mysql服务器binlog文件的实现 
    该实现采用网络通信的方式来获取mysql服务器上的binlog信息,通信过程完全采用了mysql网络通信协议的格式,并且使用的是boost::asio::io_servie服务作为通信的基础。tcp_driver.cpp文件中的Binlog_tcp_driver类中封装了整个操作过程的实现。tcp_driver.h文件中定义了所有的对外接口。下面分别对比较重要的接口进行说明。
   1,connect()
   该函数封装了另一个网络连接函数connect(user, passwd, host,  port, binlog_filename, offset)。从字面意思可以看出6个参数的含义。其实现过程如下:
      1)连接mysql服务端,并进行身份验证。调用的函数是sync_connect_and_authenticate。
       2)获取mysql服务端当前使用的binlog文件名和偏移量。调用的函数为fetch_master_status。
       3)开始binlog文件的读取过程。调用的函数为start_binlog_dump。
    分别介绍三个步骤的实现过程:
      sync_connect_and_authenticate:该函数实现了连接mysql服务端和进行身份验证的过程,如果该过程成功则返回一个指向mysql服务端的socket。具体过程是,1,建立一个socket并连接mysql服务端;2,接收服务端发送的握手包;3,向服务端发送一个验证包;4,接收服务端返回的ok包或者是error包。(各种包的格式参见http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol)
     fetch_master_status:该函数首先构造一个COM_QUERY的数据包发送给mysql服务端,请求参数设置为SHOW MASTER STATUS,既获得mysql服务端的master状态。然后接收服务端返回的结果集,最后从结果集中提取出filename和position两个字段的值并返回之。
    start_binlog_dump:该函数开始获取mysql服务端的binlog文件的内容,由于第2步所获得的position是当前binlog文件的末尾,所以只有在建立连接之后写入到binlog文件中的event才能被获取到。读取过程首先构造一个COM_BINLOG_DUMP的数据包发送给mysql服务端。然后异步读取服务端返回的数据。先接收到的是数据包的头部,根据头部中的长度信息读取整个包体的内容。得到一个完整的event数据包后,解析包体的内容。一个完整的包体对应的是一个binlog event,所以可以分析它是属于一个什么事件,并构造一个对应的event数据结构,使用包体中的信息填充其各个字段。最后将这个event数据结构插入到event_queue队列中,以便其他接口读取。在整个读取过程中如果发现了错误,则程序构造一个incident_event事件插入到event_queue队列中。处理完之后又开始异步读取下一个数据包的头部,如此形成一个循环读取binlog日志的过程。
   2,wait_for_next_event()
     上面提到了event_queue队列,需要对其进行说明。它是一个循环队列,里面保存了从mysql服务端获取的全部event信息,基本元素是一个event数据结构。对外提供了若干接口,两个比较重要的是:push_front和pop_back,这两个接口的实现使用了boost::condition进行同步处理,以保证入队列和出队列的正确性。
     wait_for_next_event函数就是调用pop_back()方法,从event_queue队列中取出一个event事件。
  3,set_position()
      该函数的输入参数是binlog文件名,和偏移量position。功能是从mysql服务端的指定binlog文件和指定偏移量position处开始读取binlog event信息。实现步骤如下:
     1)连接mysql服务端,并进行身份验证。调用的函数是sync_connect_and_authenticate。
     2)构造一个‘show binary logs’的COM_QUERY查询数据包,发送给服务端以获得mysql服务器上所有的binlog文件名,以及它们的长度信息。
     3)根据 2)中mysql服务端返回的信息检查输入参数的合法性。
     4)停止当前与mysql的网络io服务,断开与mysql服务端的网络连接。
     5)指定binlog_filename和offset为输入的参数binlog文件名和偏移量position,调用函数connect(user, passwd, host,  port, binlog_filename, offset)重新连接mysql服务端。
  4,get_position()
       该函数获取mysql服务器当前正在使用的binlog文件名和当前的偏移量。实现步骤如下:
    1)连接mysql服务端,并进行身份验证。调用的函数是sync_connect_and_authenticate。
    2)获取mysql服务端当前使用的binlog文件名和偏移量。调用的函数为fetch_master_status。
5,disconnect()
    该函数做的工作是:清空event_queue队列中的全部内容,关闭socket网络连接。
实例分析:
实例2
 int main(int argc, char *argv[]) {
const char *url = “mysql://root@127.0.0.1:3360”;
Binary_log binlog(create_transport(url));
binlog.connect();
Binary_log_event *event;
while (true) {
int result = binlog.wait_for_next_event(&event);
if (result == ERR_EOF)
break;
cout << “ at “ << binlog.get_position()
<< “ event type “ << event.get_type_code()
<< endl;
}
return EXIT_SUCCESS;
}
函数的调用层次关系是:
create_transport   --->  parse_mysql_url  --->  Binlog_tcp_driver()
connect ---> Binlog_tcp_driver :: connect()
wait_for_next_event ---> Binlog_tcp_driver ::  wait_for_next_event ()

3, binlog api的接口函数。
     对外的接口函数全部定义在文件binlog_api.h文件中,封装成类Binary_log,定义的接口函数有:connect(), wait_for_next_event(),  set_position(), get_position(),它们都是Binary_log的方法。Binary_log还有一个类型为Binary_log_driver指针的成员变量m_driver,  Binary_log_driver是file_driver和tcp_driver的基类。构造一个Binary_log实例时需要传入一个指向Binary_log_driver类的指针,该指针指向的实际值是一个file_driver实例或一个tcp_driver实例。接口函数的的实现过程为:connect(),set_position(),get_position()分别直接调用m_driver指向的实例的connect ,  set_position ,get_position方法。 wait_for_next_event方法的实现是,调用m_driver指向的实例的wait_for_next_event的方法,然后将得到的event事件传给一个处理函数进行处理,目前处理函数的行为是直接返回该事件。
  评论这张
 
阅读(1878)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017