欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

MySQL:MGR學(xué)習(xí)(2):Writeset(寫(xiě)集合)的寫(xiě)入過(guò)程

MySQL:MGR 學(xué)習(xí)(2):Write set(寫(xiě)集合)的寫(xiě)入過(guò)程


水平有限,有誤請(qǐng)諒解。
源碼版本5.7.22

公司主營(yíng)業(yè)務(wù):成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)、移動(dòng)網(wǎng)站開(kāi)發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開(kāi)放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。創(chuàng)新互聯(lián)推出潞城免費(fèi)做網(wǎng)站回饋大家。


一、前文總結(jié)

前文 <<MySQL:MGR 學(xué)習(xí)(1):寫(xiě)集合(Write set)>>中已經(jīng)說(shuō)明了Write set的生成過(guò)程,但是Write set是需要封裝如下Transaction_context_log_event中進(jìn)行廣播到其他節(jié)點(diǎn)進(jìn)行認(rèn)證的。本文就描述Write set的寫(xiě)入和廣播的過(guò)程。
如前文所描述,整個(gè)事物的Write set在函數(shù)binlog_log_row中生成,對(duì)于5.7來(lái)講每一行的每個(gè)唯一鍵都會(huì)生成一個(gè)Write set(但是咨詢宋利兵老師得知8.0唯一鍵不會(huì)再記錄Write set了),每個(gè)Write set實(shí)際上是一個(gè)8字節(jié)的uint64類型其通過(guò)hash函數(shù)生成,并且在Rpl_transaction_write_set_ctx存儲(chǔ)了一個(gè)vector數(shù)組和一個(gè)set集合來(lái)分別存儲(chǔ),如果修改的行比較多那么可能需要一個(gè)更多內(nèi)存來(lái)存儲(chǔ)這些hash值,雖然8字節(jié)比較小,但是如果是大事物上千萬(wàn)的表在一個(gè)事物里面做修改那么內(nèi)存可能消耗會(huì)上百兆。如下圖是事物執(zhí)行期間(commit之前)最終形成的Write set內(nèi)存空間示意圖。

MySQL:MGR 學(xué)習(xí)(2):Write set(寫(xiě)集合)的寫(xiě)入過(guò)程

image.png

二、Transaction_context_log_event生成的時(shí)機(jī)

在事物執(zhí)行期間會(huì)生成map event/query event/dml event等并且會(huì)源源不斷的寫(xiě)入到binlog cache中,同時(shí)會(huì)將Write set 不斷的寫(xiě)入到Rpl_transaction_write_set_ctx保存在內(nèi)存中,這些邏輯都在binlog_log_row中。但是Transaction_context_log_event的生成卻是在commit的時(shí)候,具體的位置是在MYSQL_BIN_LOG::prepare之后但是在MYSQL_BIN_LOG::ordered_commit之前,顯而易見(jiàn)這個(gè)時(shí)候的binlog event還在bing cache中,還沒(méi)有寫(xiě)入binlog file中。所以MGR的事物全局認(rèn)證的動(dòng)作是發(fā)生在binlog event落地之前。下面是這個(gè)棧幀:

#0  group_replication_trans_before_commit (param=0x7ffff0e7b8d0) at /root/softm/percona-server-5.7.22-22/rapid/plugin/group_replication/src/observer_trans.cc:511#1  0x00000000014e4814 in Trans_delegate::before_commit (this=0x2e44800, thd=0x7fffd8000df0, all=false, trx_cache_log=0x7fffd8907a10, stmt_cache_log=0x7fffd8907858, 
    cache_log_max_size=18446744073709547520) at /root/softm/percona-server-5.7.22-22/sql/rpl_handler.cc:325#2  0x000000000188a386 in MYSQL_BIN_LOG::commit (this=0x2e7b440, thd=0x7fffd8000df0, all=false) at /root/softm/percona-server-5.7.22-22/sql/binlog.cc:8974#3  0x0000000000f80623 in ha_commit_trans (thd=0x7fffd8000df0, all=false, ignore_global_read_lock=false) at /root/softm/percona-server-5.7.22-22/sql/handler.cc:1830#4  0x00000000016ddab9 in trans_commit_stmt (thd=0x7fffd8000df0) at /root/softm/percona-server-5.7.22-22/sql/transaction.cc:458#5  0x00000000015d1a8d in mysql_execute_command (thd=0x7fffd8000df0, first_level=true) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:5293#6  0x00000000015d3182 in mysql_parse (thd=0x7fffd8000df0, parser_state=0x7ffff0e7e600) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:5901#7  0x00000000015c6d16 in dispatch_command (thd=0x7fffd8000df0, com_data=0x7ffff0e7ed70, command=COM_QUERY)
    at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:1490#8  0x00000000015c5aa3 in do_command (thd=0x7fffd8000df0) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:1021#9  0x000000000170ebb0 in handle_connection (arg=0x3cd32d0) at /root/softm/percona-server-5.7.22-22/sql/conn_handler/connection_handler_per_thread.cc:312#10 0x0000000001946140 in pfs_spawn_thread (arg=0x3c71630) at /root/softm/percona-server-5.7.22-22/storage/perfschema/pfs.cc:2190#11 0x00007ffff7bc7851 in start_thread () from /lib64/libpthread.so.0#12 0x00007ffff651290d in clone () from /lib64/libc.so.6

三、MGR全局認(rèn)證發(fā)送內(nèi)容的生成過(guò)程

下面是我通過(guò)對(duì)源碼淺顯的理解得出過(guò)程:

  • 1、獲取當(dāng)前的binlog cache內(nèi)容記錄為cache_log,這些就是已經(jīng)在執(zhí)行階段生成map/query/dml event等。

  • 2、生成一個(gè)新的IO_CACHE作為臨時(shí)存儲(chǔ)為cache,目的在于存儲(chǔ)。Transaction_context_log_event 和Gtid_log_event。

  • 3、將cache_log類型轉(zhuǎn)換為READ類型同時(shí)初始化各種輔助類容如偏移量。

  • 4、初始化Transaction_context_log_event 。

  • 5、掃描Rpl_transaction_write_set_ctx中的write_set_unique 集合的內(nèi)容,并且將其存儲(chǔ)到Transaction_write_set 定義的內(nèi)存空間中write_set中,注意這里只是用到了集合沒(méi)用到數(shù)組。這里也就是進(jìn)行Write set的一個(gè)拷貝而已其考到write_set臨時(shí)變量中。

  • 6、將write_set內(nèi)容填充到Transaction_context_log_event中,整個(gè)過(guò)程還會(huì)做base64的轉(zhuǎn)換,最終填充到event的是base64格式的Write set類容。完成后析構(gòu)write_set來(lái)臨時(shí)變量

  • 7、 將Transaction_context_log_event寫(xiě)入到第二步定義的cache中。

  • 8、生成Gtid_log_event,只是做一些初始化動(dòng)作,Gtid并沒(méi)有生成。

  • 9、將Gtid_log_event寫(xiě)入到第二步定義的cache中。

  • 10、通過(guò)cache+cache_log的總和來(lái)對(duì)比 group_replication_transaction_size_limit 設(shè)置的值,也就是判斷整個(gè)事物的binlog event是否操作了參數(shù)設(shè)置。

  • 11、將cache類型轉(zhuǎn)換為READ類型同時(shí)初始化各種輔助類容如偏移量。

  • 12、將cache和cache_log分別寫(xiě)入到到transaction_msg中。

  • 13、流控相關(guān),沒(méi)仔細(xì)看,如果有機(jī)會(huì)學(xué)習(xí)流控機(jī)制在仔細(xì)學(xué)習(xí)。

  • 14、gcs_module負(fù)責(zé)發(fā)送transaction_msg到各個(gè)節(jié)點(diǎn)

  • 15、掛起等待事物認(rèn)證的結(jié)果。

那么整個(gè)過(guò)程大概就是:

  • 經(jīng)過(guò)hash的Write set (集合)->拷貝到write_set變量(類數(shù)組)->通過(guò)base64算法寫(xiě)入到Transaction_context_log_event ->合并其他binlog event到transaction_msg->gcs_module廣播transaction_msg到其他節(jié)點(diǎn)->等待認(rèn)證結(jié)果

四、相關(guān)源碼

1、group_replication_trans_before_commit 函數(shù)相關(guān)內(nèi)容
 if (trx_cache_log_position > 0 && stmt_cache_log_position == 0) //如果存在事物cache
  {
    cache_log= param->trx_cache_log; //設(shè)置到IO_cache
    cache_log_position= trx_cache_log_position;
  }  else if (trx_cache_log_position == 0 && stmt_cache_log_position > 0)//如果存在語(yǔ)句cache
  {
    cache_log= param->stmt_cache_log;
    cache_log_position= stmt_cache_log_position;
    is_dml= false;
    may_have_sbr_stmts= true;
  }  else
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "We can only use one cache type at a "
                                "time on session %u", param->thread_id);
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);    /* purecov: end */
  }
  applier_module->get_pipeline_stats_member_collector()
      ->increment_transactions_local();
  DBUG_ASSERT(cache_log->type == WRITE_CACHE);
  DBUG_PRINT("cache_log", ("thread_id: %u, trx_cache_log_position: %llu,"
                           " stmt_cache_log_position: %llu",
                           param->thread_id, trx_cache_log_position,
                           stmt_cache_log_position));  /*
    Open group replication cache.
    Reuse the same cache on each session for improved performance.
  */
  cache= observer_trans_get_io_cache(param->thread_id,
                                     param->cache_log_max_size); //獲取一個(gè)新的IO_CACHE系統(tǒng)
  if (cache == NULL) //錯(cuò)誤處理
  {    /* purecov: begin inspected */
    error= pre_wait_error;    goto err;    /* purecov: end */
  }  // Reinit binlog cache to read.
  if (reinit_cache(cache_log, READ_CACHE, 0)) ////將IO_CACHE類型進(jìn)行轉(zhuǎn)換 并且位置還原
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Failed to reinit binlog cache log for read "
                                "on session %u", param->thread_id);
    error= pre_wait_error;    goto err;    /* purecov: end */
  }  
  /*
    After this, cache_log should be reinit to old saved value when we
    are going out of the function scope.
  */
  reinit_cache_log_required= true;  // Create transaction context.
  tcle= new Transaction_context_log_event(param->server_uuid, Rpl_transaction_write_set_ctx
                                          is_dml,
                                          param->thread_id,
                                          is_gtid_specified); //初始化 Transaction_context_log_event
  if (!tcle->is_valid())
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,                "Failed to create the context of the current "
                "transaction on session %u", param->thread_id);
    error= pre_wait_error;    goto err;    /* purecov: end */
  }  if (is_dml)
  {
    Transaction_write_set* write_set= get_transaction_write_set(param->thread_id);// 獲取前期得到write set 并且放回到一個(gè)臨時(shí)內(nèi)存空間 write_set中 
    /*
      When GTID is specified we may have empty transactions, that is,
      a transaction may have not write set at all because it didn't
      change any data, it will just persist that GTID as applied.
    */
    if ((write_set == NULL) && (!is_gtid_specified)) 
    {
      log_message(MY_ERROR_LEVEL, "Failed to extract the set of items written "
                                  "during the execution of the current "
                                  "transaction on session %u", param->thread_id);
      error= pre_wait_error;      goto err;
    }    if (write_set != NULL)
    {      if (add_write_set(tcle, write_set))//將整個(gè)wirte_set內(nèi)容復(fù)制到event Transaction_context_log_event中 此時(shí)就進(jìn)入了event了
      {        /* purecov: begin inspected */
        cleanup_transaction_write_set(write_set); //write set已經(jīng)完成了它的功能需要析構(gòu)
        log_message(MY_ERROR_LEVEL, "Failed to gather the set of items written "
                                    "during the execution of the current "
                                    "transaction on session %u", param->thread_id);
        error= pre_wait_error;        goto err;        /* purecov: end */
      } 
      cleanup_transaction_write_set(write_set); //如果add_write_set函數(shù)調(diào)用出現(xiàn) 有問(wèn)題 也需要析構(gòu)掉
      DBUG_ASSERT(is_gtid_specified || (tcle->get_write_set()->size() > 0));
    }    else
    {      /*
        For empty transactions we should set the GTID may_have_sbr_stmts. See
        comment at binlog_cache_data::may_have_sbr_stmts().
      */
      may_have_sbr_stmts= true; 
    } Log_event::write
  }  // Write transaction context to group replication cache.
  tcle->write(cache); //寫(xiě)入到MGR CACHE 寫(xiě)入 TCLE的header(virtual)  body(virtual)   footer
  // Write Gtid log event to group replication cache.
  gle= new Gtid_log_event(param->server_id, is_dml, 0, 1,
                          may_have_sbr_stmts,
                          gtid_specification);
  gle->write(cache); //寫(xiě)入GTID event到MGR CACHE 占位
  transaction_size= cache_log_position + my_b_tell(cache);  if (is_dml && transaction_size_limit &&
     transaction_size > transaction_size_limit)
  {
    log_message(MY_ERROR_LEVEL, "Error on session %u. "
                "Transaction of size %llu exceeds specified limit %lu. "
                "To increase the limit please adjust group_replication_transaction_size_limit option.",
                param->thread_id, transaction_size,
                transaction_size_limit); //group_replication_transaction_size_limit 事物大小參數(shù)
    error= pre_wait_error;    goto err;
  }  // Reinit group replication cache to read.
  if (reinit_cache(cache, READ_CACHE, 0))//將IO_CACHE類型進(jìn)行轉(zhuǎn)換 并且位置還原
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
                                "cache, for read operations, on session %u",
                                param->thread_id);
    error= pre_wait_error;    goto err;    /* purecov: end */
  }  // Copy group replication cache to buffer.
  if (transaction_msg.append_cache(cache)) //加入到transaction_msg
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while appending data to an internal "
                                "cache on session %u", param->thread_id);
    error= pre_wait_error;    goto err;    /* purecov: end */
  }  // Copy binlog cache content to buffer.
  if (transaction_msg.append_cache(cache_log))//加入到transaction_msg
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while writing binary log cache on "
                                "session %u", param->thread_id);
    error= pre_wait_error;    goto err;    /* purecov: end */
  }
  DBUG_ASSERT(certification_latch != NULL);  if (certification_latch->registerTicket(param->thread_id))
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Unable to register for getting notifications "
                                "regarding the outcome of the transaction on "
                                "session %u", param->thread_id);
    error= pre_wait_error;    goto err;    /* purecov: end */
  }#ifndef DBUG_OFF
  DBUG_EXECUTE_IF("test_basic_CRUD_operations_sql_service_interface",
                  {
                    DBUG_SET("-d,test_basic_CRUD_operations_sql_service_interface");
                    DBUG_ASSERT(!sql_command_check());
                  };);
  DBUG_EXECUTE_IF("group_replication_before_message_broadcast",
                  {                    const char act[]= "now wait_for waiting";
                    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });#endif
  /*
    Check if member needs to throttle its transactions to avoid
    cause starvation on the group.
  */
  applier_module->get_flow_control_module()->do_wait(); //流控相關(guān)
  //Broadcast the Transaction Message
  send_error= gcs_module->send_message(transaction_msg); //gcs廣播
  if (send_error == GCS_MESSAGE_TOO_BIG)
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error broadcasting transaction to the group "
                                "on session %u. Message is too big.",
                                param->thread_id);
    error= pre_wait_error;    goto err;    /* purecov: end */
  }  else if (send_error == GCS_NOK)
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while broadcasting the transaction to "
                                "the group on session %u", param->thread_id);
    error= pre_wait_error;    goto err;    /* purecov: end */
  }
  shared_plugin_stop_lock->release_read_lock();
  DBUG_ASSERT(certification_latch != NULL);  if (certification_latch->waitTicket(param->thread_id)) //等待認(rèn)證結(jié)果
  {    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while waiting for conflict detection "
                                "procedure to finish on session %u",
                                param->thread_id);
    error= post_wait_error;    goto err;    /* purecov: end */
  }
2、add_write_set函數(shù)
int add_write_set(Transaction_context_log_event *tcle,
                  Transaction_write_set *set){
  DBUG_ENTER("add_write_set");  int iterator= set->write_set_size; //將循環(huán)次數(shù)設(shè)置為 set的長(zhǎng)度 也就是有多少個(gè)write sets
  for (int i = 0; i < iterator; i++)
  {
    uchar buff[BUFFER_READ_PKE];
    int8store(buff, set->write_set[i]); //逐字節(jié)復(fù)制到buffer中
    uint64 const tmp_str_sz= base64_needed_encoded_length((uint64) BUFFER_READ_PKE);    char *write_set_value= (char *) my_malloc(PSI_NOT_INSTRUMENTED,                                              static_cast<size_t>(tmp_str_sz), MYF(MY_WME)); //13bytes (gdb) p tmp_str_sz $2 = 13
    if (!write_set_value)//分配內(nèi)存錯(cuò)誤
    { 
      /* purecov: begin inspected */
      log_message(MY_ERROR_LEVEL, "No memory to generate write identification hash");
      DBUG_RETURN(1);      /* purecov: end */
    }    if (base64_encode(buff, (size_t) BUFFER_READ_PKE, write_set_value)) //做base64算法 
    {      /* purecov: begin inspected */
      log_message(MY_ERROR_LEVEL,                  "Base 64 encoding of the write identification hash failed");
      DBUG_RETURN(1);      /* purecov: end */
    }
    tcle->add_write_set(write_set_value); //最終將base64格式的write set寫(xiě)入到event中
  }
  DBUG_RETURN(0);
}
3、get_transaction_write_set函數(shù)
Transaction_write_set* get_transaction_write_set(unsigned long m_thread_id){
  DBUG_ENTER("get_transaction_write_set");
  THD *thd= NULL;
  Transaction_write_set *result_set= NULL;  Find_thd_with_id find_thd_with_id(m_thread_id, false);
  thd= Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);  if (thd)
  {    std::set<uint64> *write_set= thd->get_transaction()
        ->get_transaction_write_set_ctx()->get_write_set(); //Rpl_transaction_write_set_ctx   std::set<uint64> *get_write_set();
    unsigned long write_set_size= write_set->size(); //返回集合大小
    if (write_set_size == 0)
    {
      mysql_mutex_unlock(&thd->LOCK_thd_data);
      DBUG_RETURN(NULL);
    }
    result_set= (Transaction_write_set*)my_malloc(key_memory_write_set_extraction,                                                  sizeof(Transaction_write_set),
                                                  MYF(0));//這里為其Transaction_write_set分配內(nèi)存空間
    result_set->write_set_size= write_set_size; //獲取size
    result_set->write_set=
        (unsigned long long*)my_malloc(key_memory_write_set_extraction,
                                       write_set_size *                                       sizeof(unsigned long long),
                                       MYF(0));//分配內(nèi)存
    int result_set_index= 0;    for (std::set<uint64>::iterator it= write_set->begin();//完成復(fù)制注意是從set中復(fù)制到簡(jiǎn)單的內(nèi)存中
         it != write_set->end();
         ++it)
    {
      uint64 temp= *it;
      result_set->write_set[result_set_index++]=temp;
    }
    mysql_mutex_unlock(&thd->LOCK_thd_data);
  }
  DBUG_RETURN(result_set);
}

作者微信:

MySQL:MGR 學(xué)習(xí)(2):Write set(寫(xiě)集合)的寫(xiě)入過(guò)程

當(dāng)前文章:MySQL:MGR學(xué)習(xí)(2):Writeset(寫(xiě)集合)的寫(xiě)入過(guò)程
文章出自:http://aaarwkj.com/article16/pphjdg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版用戶體驗(yàn)、移動(dòng)網(wǎng)站建設(shè)、軟件開(kāi)發(fā)、品牌網(wǎng)站制作網(wǎng)站策劃

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都app開(kāi)發(fā)公司
韩日男人女人性生活视频| 日韩精品 在线一区二区| 亚洲天堂男人的天堂狠狠操| 99热精品在线免费观看| 国产真人作爱视频免费| 三级av电影中文字幕| 日本av二区三区在线| 香蕉夜夜草草久久亚洲香蕉| 日韩一二三四区精品电影免费播放 | 亚洲中文有码一区二区| 中文字幕在线感觉av| 国产精品九九久久精品女同| 精品中文人妻中文字幕| 日韩人妻一区中文字幕| 国产区二区三区在线视频| 韩国专区福利一区二区| av黄色资源在线观看| 久久久亚洲精品中文字幕蜜桃| 久久综合激情亚洲欧美专区| 日本一区二区三区久久久| 黄色午夜福利在线观看| 日本久久久视频在线观看| 我要看亚洲黄色片一级| 欧美日本黄色一级视频| 国产一区二区传媒视频| 91国产精品视频在线| 国产亚洲一区二区日韩欧美| 婷婷色爱区综合五月激情 | 风韵丰满熟妇老熟女啪啪| 成人福利网站午夜一区| 91九色在线精品人妻| 九九九视频精品免费九九| 99热视频在线观看免费| 中文字幕韩国三级电影| 男女性生活视频成年人观看| 亚洲综合中文字幕精品| 三级精品一区二区三区| 依依成人影院在线观看av| 日韩激情中文字幕一区二区三区| 国产精品重口调教系列| 97国产精品成人免费视频|