Su's Tech Blog
Tags: Solr, Source Code Analysis · Published: 2016-02-02 19:55:10

This is the third article in the series on SolrCloud's Recovery strategies. The first two covered the overall Recovery flow and the PeerSync strategy; this article and the ones that follow focus on the Replication strategy. Replication not only synchronizes data from leader to replica inside SolrCloud, it can also implement master-slave synchronization between multiple standalone Solr instances. This article covers leader-to-replica synchronization in SolrCloud; the next article will show how to configure solrConfig.xml to set up master-slave synchronization between standalone Solr nodes.

1. Introduction to the Replication Strategy

        The purpose of Replication was introduced in the previous articles: when too much data needs to be synchronized, Solr abandons the per-document synchronization mode (PeerSync) and falls back to a mode whose smallest unit of synchronization is the index file. The Replication process centers on the SnapPuller class, which encapsulates the code for taking a snapshot of the leader's data and synchronizing from that snapshot. The Replication flow is shown in the figure below; each step is walked through against the source code.

[Figure: Replication flow diagram]
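Before stepping through the code, the fallback from PeerSync to Replication can be sketched in miniature. The class and method names below are illustrative, not Solr's; the threshold mirrors Solr's default update-log retention of 100 records:

```java
// Illustrative sketch, not Solr's actual classes: recovery falls back from
// document-level PeerSync to file-level Replication once the replica is
// missing more updates than the leader's update log retains (100 records
// by default in Solr).
public class RecoveryStrategyChoice {
    static final int UPDATE_LOG_CAPACITY = 100; // mirrors Solr's default retention

    static String chooseStrategy(int missingUpdates) {
        // Few missing updates: replay them one document at a time (PeerSync).
        // Too many: fall back to copying whole index files (Replication).
        return missingUpdates <= UPDATE_LOG_CAPACITY ? "PeerSync" : "Replication";
    }
}
```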

  • At the start of Replication, a commitOnLeader operation is performed first, i.e. a commit command is sent to the leader. This flushes the leader's pending updates into the index files so that the snapshot is complete.
  private void commitOnLeader(String leaderUrl) throws SolrServerException,
      IOException {
    HttpSolrServer server = new HttpSolrServer(leaderUrl);
    try {
      server.setConnectionTimeout(30000);
      UpdateRequest ureq = new UpdateRequest();
      ureq.setParams(new ModifiableSolrParams());
      ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
      ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
          server);
    } finally {
      server.shutdown();
    }
  }


  • Fetch the leader's latest version and latest generation and compare them with this shard's to determine whether synchronization is needed.
      //get the current 'replicateable' index version in the master
      NamedList response = null;
      try {
        response = getLatestVersion();
      } catch (Exception e) {
        LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
        return false;
      }
      long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
      long latestGeneration = (Long) response.get(GENERATION);


  • Check whether this shard has an open IndexWriter; if not, Recovery fails. Without an open IndexWriter the index's generation and version information cannot be obtained, so replication cannot proceed.
      // TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
      IndexCommit commit = core.getDeletionPolicy().getLatestCommit();
      if (commit == null) {
        // Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't been updated with commit points
        RefCounted<SolrIndexSearcher> searcherRefCounted = null;
        try {
          searcherRefCounted = core.getNewestSearcher(false);
          if (searcherRefCounted == null) {
            LOG.warn("No open searcher found - fetch aborted");
            return false;
          }
          commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
        } finally {
          if (searcherRefCounted != null)
            searcherRefCounted.decref();
        }
      }


  • If the leader's latestVersion is 0, the leader has no index data and there is nothing to replicate, so true is returned. (If this is a forced replication and the local index is non-empty, the local index is cleared and committed first.)
      if (latestVersion == 0L) {
        if (forceReplication && commit.getGeneration() != 0) {
          // since we won't get the files for an empty index,
          // we just clear ours and commit
          RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
          try {
            iw.get().deleteAll();
          } finally {
            iw.decref();
          }
          SolrQueryRequest req = new LocalSolrQueryRequest(core,
              new ModifiableSolrParams());
          core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
        }

        //there is nothing to be replicated
        successfulInstall = true;
        return true;
      }


  • The shard's latestVersion must also be compared with the leader's: if the two are equal there is likewise no need to replicate, unless this is a forceReplication.
      if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
        //master and slave are already in sync just return
        LOG.info("Slave in sync with master.");
        successfulInstall = true;
        return true;
      }


  • Fetch from the leader the list of index files (and their metadata) for latestGeneration, together with the list of configuration files and their metadata. If the file list is empty, replication exits.
      // get the list of files first
      fetchFileList(latestGeneration);
      // this can happen if the commit point is deleted before we fetch the file list.
      if (filesToDownload.isEmpty()) return false;

  private void fetchFileList(long gen) throws IOException {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(COMMAND, CMD_GET_FILE_LIST);
    params.set(GENERATION, String.valueOf(gen));
    params.set(CommonParams.WT, "javabin");
    params.set(CommonParams.QT, "/replication");
    QueryRequest req = new QueryRequest(params);
    HttpSolrServer server = new HttpSolrServer(masterUrl, myHttpClient);  //XXX modify to use shardhandler
    try {
      server.setSoTimeout(60000);
      server.setConnectionTimeout(15000);
      NamedList response = server.request(req);

      List<Map<String, Object>> files = (List<Map<String,Object>>) response.get(CMD_GET_FILE_LIST);
      if (files != null)
        filesToDownload = Collections.synchronizedList(files);
      else {
        filesToDownload = Collections.emptyList();
        LOG.error("No files to download for index generation: " + gen);
      }

      files = (List<Map<String,Object>>) response.get(CONF_FILES);
      if (files != null)
        confFilesToDownload = Collections.synchronizedList(files);

    } catch (SolrServerException e) {
      throw new IOException(e);
    } finally {
      server.shutdown();
    }
  }
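Each entry in the returned file list is a map of file metadata. As a minimal, self-contained sketch of how such a list might then be filtered against the files already present locally (the "name" key and the helper below are illustrative, not Solr's actual code):

```java
import java.util.*;

// Hypothetical sketch: filter the leader's file list against the names
// already present in the local index directory. Only the "name" field is
// assumed here; real entries also carry size and checksum metadata.
public class FileListFilter {
    static List<Map<String, Object>> selectFilesToFetch(
            List<Map<String, Object>> remoteFiles,
            Set<String> localFileNames,
            boolean fullCopy) {
        List<Map<String, Object>> toFetch = new ArrayList<>();
        for (Map<String, Object> f : remoteFiles) {
            String name = (String) f.get("name");
            // On a full copy everything is fetched; otherwise only the
            // files the local index does not have yet.
            if (fullCopy || !localFileNames.contains(name)) {
                toFetch.add(f);
            }
        }
        return toFetch;
    }
}
```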


  • Create a temporary index directory under the data directory, named in the form index.timestamp, to hold the synchronized files.
      String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
      tmpIndex = createTempindexDir(core, tmpIdxDirName);

      tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);

      // current index dir...
      indexDirPath = core.getIndexDir();
      indexDir = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);


  • isFullCopyNeeded determines whether the IndexWriter must be closed. If this shard's (slave's) commit version or generation is at or ahead of the master's (leader's), or this is a forceReplication, a complete copy of all index files is required.
      // if the generation of master is older than that of the slave, it means they are not compatible to be copied
      // then a new index directory to be created and all the files need to be copied
      boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
          .getCommitTimestamp(commit) >= latestVersion
          || commit.getGeneration() >= latestGeneration || forceReplication;

        if (isIndexStale(indexDir)) {
          isFullCopyNeeded = true;
        }

        if (!isFullCopyNeeded) {
          // rollback - and do it before we download any files
          // so we don't remove files we thought we didn't need
          // to download later
          solrCore.getUpdateHandler().getSolrCoreState()
          .closeIndexWriter(core, true);
        }
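Stripped of Solr's IndexCommit plumbing, the decision above reduces to two long comparisons OR'd with the force flag. This is a restatement for clarity, not Solr code:

```java
// Restatement of the isFullCopyNeeded check with plain longs in place of
// Solr's IndexCommit: a full copy is needed when the local commit is at or
// ahead of the leader (incompatible for an incremental copy), or when
// replication is forced.
public class FullCopyDecision {
    static boolean isFullCopyNeeded(long localCommitTimestamp, long localGeneration,
                                    long leaderVersion, long leaderGeneration,
                                    boolean forceReplication) {
        return localCommitTimestamp >= leaderVersion
                || localGeneration >= leaderGeneration
                || forceReplication;
    }
}
```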


  • Only now does the actual download of the differing index files begin. Replication judges whether a file has changed by its size, and downloads each file in packets whose size can be configured in solrConfig.xml; the next article covers this in detail.
  private void downloadIndexFiles(boolean downloadCompleteIndex,
      Directory indexDir, Directory tmpIndexDir, long latestGeneration)
      throws Exception {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Download files to dir: " + Arrays.asList(indexDir.listAll()));
    }
    for (Map<String,Object> file : filesToDownload) {
      if (!slowFileExists(indexDir, (String) file.get(NAME))
          || downloadCompleteIndex) {
        dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
            (String) file.get(NAME), false, latestGeneration);
        currentFile = file;
        dirFileFetcher.fetchFile();
        filesDownloaded.add(new HashMap<>(file));
      } else {
        LOG.info("Skipping download for " + file.get(NAME)
            + " because it already exists");
      }
    }
  }
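DirectoryFileFetcher itself is not reproduced here, but the essence of its packet-wise transfer can be sketched as a chunked copy loop. This is a simplified, self-contained illustration; PACKET_SIZE mirrors Solr's 1 MB default, which the next article shows how to tune in solrConfig.xml:

```java
import java.io.*;

// Simplified sketch of packet-wise file transfer: read the remote file in
// fixed-size packets and append each packet to the copy in the temporary
// index directory. Real DirectoryFileFetcher also verifies checksums and
// retries failed packets.
public class PacketCopy {
    static final int PACKET_SIZE = 1024 * 1024; // 1 MB per packet

    static long copyInPackets(InputStream in, OutputStream out) throws IOException {
        byte[] packet = new byte[PACKET_SIZE];
        long total = 0;
        int n;
        while ((n = in.read(packet)) != -1) {
            out.write(packet, 0, n); // write only the bytes actually read
            total += n;
        }
        return total;
    }
}
```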

Source: itd4j · Category: Search Engines · Reads: 1583