注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

网易杭研后台技术中心的博客

 
 
 
 
 

日志

 
 

基于Row的MySQL并行复制实现分析(一)  

来自raolh   2013-06-07 09:53:27|  分类: 默认分类 |举报 |字号 订阅

  下载LOFTER 我的照片书  |
基于Row的MySQL并行复制方案使用多个SQL线程回放slave上的relay log,提高了slave与master上数据的同步速度。本篇分析一下该方案的实现过程。

该方案只能使用Row格式的Binlog,使用Statement格式的Binlog时退化成单个SQL线程执行。该方案的实现原理是:每个事务操作的行都有一个范围,如果两个事务所操作的行的范围没有重叠则可以并行执行。大体的实现是:一个线程负责分析relay log中的事务,然后交给多个执行线程中的一个去执行事务。如果当前分析的事务与所有执行线程中的事务没有重叠行,则可以将这个事务交给拥有最少事务的执行线程去执行;如果当前分析的事务与一个执行线程的事务有重叠行,则将该事务交给重叠线程去执行;如果当前分析的事务与多于2个线程的事务有重叠行,则分析线程等待执行线程执行了一段时间之后再重新进行判断。

分析线程的工作由原来的SQL线程中执行,执行线程是新开的若干个线程。原来的SQL线程也会执行一些不需要分发给其他执行线程的事务。过程如下图所示:
基于行的MySQL并行复制实现分析 - 网易杭研后台技术中心 - 网易杭研后台技术中心的博客

事务events结构

  分析线程从relay log中读取一个事务的所有event,读取的范围是从BEGIN开始读取到COMMIT,或ROLLBACK或XID_EVENT结束。为了保存读取出来的事务event,定义一个数据结构tansaction_st,该结构的组成是:

class transaction_st
{
  int worker_id;
/*以下用于记录事务event*/
  int inner_events; //事务中包含的event的个数
  Log_event *event_list_head; // event链表的头部
  Log_event *event_list_tail; //event链表的尾部
  bool contains_stat; //该事务是否包含statement格式的event
/*以下用于判断事务是否可以分发给其他线程执行*/
  bool contains_no_pk_row; //该事物是否包含无主键的表的event
  HASH trans_pk_hash; //事务操作的所有行的主键id的哈希表
/*以下用于记录与执行线程的行重叠情况*/
  int conflict_workers[SLAVE_THREAD]; //冲突的执行线程编号
  int conflict_number; //总共和多少个执行线程了
/*以下用于记录事务的位置,方便relay log info更新执行位置信息*/
  my_off_t log_pos; //对于的master binlog中的位置信息
  time_t when; //event产生的时间
  char relay_log_name[FN_REFLEN]; //relay log的名称
  my_off_t relay_log_pos; //relay log中的位置
  trans_pos_t *trans_pos; // 用于 relay log info更新执行位置信息
。。。
}


行主键值的提取

分析线程从relay log中读取到一个Row event后,提取它的主键id值以作为它的哈希索引关键字。提取过程是:
1,通过event中的表id,在relay log info中获得该event操作的表对象:

for (TABLE_LIST *ptr= rli->tables_to_lock; ptr; ptr= ptr->next_global)
  {
    if (m_table_id != ((RPL_TABLE_LIST*)ptr)->table_id)
continue;
   。。。
}

2,判断该表是否存在主键,如果没有主键则退出:

if(m_table->s->keys <=0 || (m_table->s->key_info[0].flags & HA_NOSAME) == 0)
{
rli->curr_trans->contains_no_pk_row= true;
break;
}

3,提取行数据中的主键信息,过程是:使用方法unpack_current_row(rli)将event中的行数据保存到m_table->record[0]中,根据表的主键信息(m_table->s->keys和m_table->s->key_info[])获取行数据的主键值(m_table->s->key_info[j]. key_parts 和 m_table->s->key_info[j].key_part[i].field)。

分析线程提取到了一个event的主键之后将该event加入到事务结构tansaction_st的event链表中。

获取到了一个事务的全部event后就可以将该事务分发给其他线程执行,如果该事务包含statement格式的binlog或则事务中操作了不含主键的表,则该事务不能分发,必须等到所有的事务执行完之后再在分析线程中执行该事务。

事务执行线程

执行线程有一个管理的数据结构,Transfer_worker

class Transfer_worker
{
  transaction_st *trans_list[worker_size]; //线程中包含事务的数组
  int waiting_trans_number; //线程数组中包含的事务的个数
  int list_head, list_end; //事务链表的头和尾位置
  rw_lock_t trans_list_lock; //保护事务链表的锁
  HASH pk_hash; //保存所有事务的行主键值的哈希索引链表
  rw_lock_t pk_hash_lock; //保护哈希索引的链表
  Relay_log_info *rli;
  Relay_log_info *dummy_rli;
  bool terminated; //是否停止执行
  bool running; //是否正在运行
  sem_t event_sem; //分析线程将事务加入到该执行线程后的通知信号。
  THD *thd;
};

分析线程在将事务分发给执行线程时,首先要检查该事务是否和执行线程中的事务有行的冲突。检查的过程是:若事务中的trans_pk_hash中的记录能够在执行线程的pk_hash链表中找到,则存在行的冲突。过程如下:

for(i=0; i<trans->trans_pk_hash.records; i++) { item= (hash_item_t*)my_hash_element(&trans->trans_pk_hash, i); if (worker->check_pk_exist_key(item)) return TRUE; }

  分析线程将事务与所有的执行线程做冲突检查,如果不存在冲突则将事务交给事务量最小的线程去执行,如果只和一个线程冲突,则交给该线程执行。如果和多个线程冲突则等待一段时间再进行检查。

执行线程执行事务的过程是:从线程的事务链表中取得事务执行,如果事务链表为空则进行等待,直到执行线程将事务加入到线程的事务链表中,并将它唤醒:

while (!terminated)
{
   sem_wait(&event_sem);
while(1){
if (waiting_trans_number <= 0) //没有事务进入事务等待
break;
if (execute_transcation(trans_list[list_head]) == 0){ //依次执行链表中的事务
...
list_head= (list_head + 1) % worker_size;
waiting_trans_number--;
...
}
}
...
}


Relay log执行的位置的更新

最后分析一下relay log info是如何更新SQL线程执行的位置的。rli结构中添加了一个记录事务位置的数组

trans_pos_t trans_pos[SLAVE_THREAD *worker_size];

分析线程在将一个事务分发给一个执行线程时,会将该事务的位置添加到rli的事务位置数组中,由于分析线程依次从relay log中读取event,所以trans_pos数组中保存的事务位置是依次递增的。trans_pos的结构如下:

struct trans_pos_t { my_off_t log_pos; time_t when; char relay_log_name[FN_REFLEN]; my_off_t relay_log_pos; bool done;//表示事务是否执行成功 };

当执行线程中执行完一个事务后会调用rli的一个方法,让它尝试更新执行的位置,更新的过程是:

while(trans_number && trans_pos[trans_head].done) { //从事务数组的低位置到高位置,找到连续的已经完成的事务的最后一个。 last_pos= &trans_pos[trans_head]; trans_head= (trans_head+1)%(SLAVE_THREAD* worker_size); trans_number--; } if(last_pos) { 。。。

//将rli的执行位置更新到最后一个事务的位置。 stmt_done(last_pos->log_pos, last_pos->when, last_pos->relay_log_name, last_pos->relay_log_pos); }

由于rli位置的更新办法是找到若干个连续的完成事务,然后更新到最后一个事务的位置。但是事务分发后在多个线程中执行,完成的顺序是不固定的。所以可能造成的一种情况是,后面的事务已经完成了,但前面的事务还没完成。这时如果MySQL发生崩溃,rli中记录的是前面事务的位置。重启以后,SQL线程会继续从前面位置的事务开始执行,这样会重复执行后面已经完成的事务,造成事务的不一致。所以该方案不是crash safe的并发复制方案。后面介绍的in-order commit的办法可以解决这个问题,实现复制的crash safe。







  评论这张
 
阅读(870)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017