This is the third article in the series on SolrCloud's Recovery strategy. The previous two articles covered the overall Recovery flow and the PeerSync strategy; this article and the following ones focus on the Replication strategy. Replication not only performs leader-to-replica data synchronization within SolrCloud, it can also be used to set up master-slave synchronization between standalone Solr instances. This article describes leader-to-replica synchronization in SolrCloud; the next article will show how to configure master-slave synchronization between Solr nodes via solrconfig.xml.
1. Introduction to the Replication Strategy
The role of Replication was already outlined in the earlier articles: when there is too much data to synchronize, Solr abandons the document-level synchronization mode (PeerSync) and switches to a mode whose smallest unit of transfer is an index file. The Replication process is built around the SnapPuller class, which encapsulates taking a snapshot of the leader's index and synchronizing from that snapshot. The overall Replication flow is illustrated in the figure below; the rest of this article walks through each step against the source code.
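To place Replication in the overall Recovery flow, here is a minimal sketch, assuming illustrative interface and class names (this is not the actual RecoveryStrategy code): the cheap, document-level PeerSync is attempted first, and only when it cannot bring the replica up to date does the replica fall back to file-level Replication.

// Illustrative sketch only: names are invented, not Solr classes.
interface SyncAttempt {
  boolean sync();   // returns true when the replica is in sync afterwards
}

class RecoverySketch {
  private final SyncAttempt peerSync;     // document-level catch-up
  private final SyncAttempt replication;  // file-level copy from the leader

  RecoverySketch(SyncAttempt peerSync, SyncAttempt replication) {
    this.peerSync = peerSync;
    this.replication = replication;
  }

  boolean recover() {
    if (peerSync.sync()) {
      return true;               // only a few updates were missing
    }
    return replication.sync();   // too far behind: copy index files instead
  }
}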
- When Replication starts, a commitOnLeader operation is performed first, i.e. a commit command is sent to the leader. Its purpose is to flush the leader's pending updates into the index files so that the snapshot is complete.
private void commitOnLeader(String leaderUrl) throws SolrServerException,
    IOException {
  HttpSolrServer server = new HttpSolrServer(leaderUrl);
  try {
    server.setConnectionTimeout(30000);
    UpdateRequest ureq = new UpdateRequest();
    ureq.setParams(new ModifiableSolrParams());
    ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
    ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
        server);
  } finally {
    server.shutdown();
  }
}
- Next, the leader's latest version and generation are fetched and compared with this shard's own values to decide whether synchronization is needed.
//get the current 'replicateable' index version in the master
NamedList response = null;
try {
  response = getLatestVersion();
} catch (Exception e) {
  LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
  return false;
}
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
- Check whether this shard has a usable commit point, i.e. whether an IndexWriter has been opened. If the deletion policy holds no commit point yet, the code falls back to the commit point of the newest open searcher; if no open searcher exists either, the fetch is aborted and Recovery fails, because without a commit point the replica cannot determine its index generation and version, so Replication cannot proceed.
// TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
IndexCommit commit = core.getDeletionPolicy().getLatestCommit();
if (commit == null) {
  // Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't been updated with commit points
  RefCounted<SolrIndexSearcher> searcherRefCounted = null;
  try {
    searcherRefCounted = core.getNewestSearcher(false);
    if (searcherRefCounted == null) {
      LOG.warn("No open searcher found - fetch aborted");
      return false;
    }
    commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
  } finally {
    if (searcherRefCounted != null)
      searcherRefCounted.decref();
  }
}
- If the leader's latestVersion is 0, the leader has no index data at all, so there is nothing to replicate and the method returns true. If this is a forced replication and the replica's own commit generation is not 0, the replica first clears its own index and commits, so that it ends up consistent with the empty leader.
if (latestVersion == 0L) {
  if (forceReplication && commit.getGeneration() != 0) {
    // since we won't get the files for an empty index,
    // we just clear ours and commit
    RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
    try {
      iw.get().deleteAll();
    } finally {
      iw.decref();
    }
    SolrQueryRequest req = new LocalSolrQueryRequest(core,
        new ModifiableSolrParams());
    core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
  }

  //there is nothing to be replicated
  successfulInstall = true;
  return true;
}
- The replica's commit version also has to be compared with the leader's latestVersion: if the two are equal there is likewise no need to replicate, unless a forceReplication was requested. (A distilled version of this decision follows the snippet below.)
if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
  //master and slave are already in sync just return
  LOG.info("Slave in sync with master.");
  successfulInstall = true;
  return true;
}
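Taken together, the empty-leader check, the version comparison and the forceReplication flag boil down to a small decision. The following is a minimal distillation with illustrative names, not actual SnapPuller members:

// Illustrative distillation of the "do we need to replicate?" checks above.
final class SyncDecision {
  static boolean needsReplication(long slaveCommitTimestamp,
                                  long leaderVersion,
                                  boolean forceReplication) {
    if (leaderVersion == 0L) {
      return false;   // leader has no index: nothing to fetch
    }
    if (!forceReplication && slaveCommitTimestamp == leaderVersion) {
      return false;   // slave and master are already in sync
    }
    return true;      // versions differ, or replication was forced
  }
}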
- Fetch from the leader the list of index files belonging to latestGeneration together with their metadata, as well as the list of configuration files. If the index file list is empty, Replication exits. (An illustration of a file-list entry follows the snippet below.)
// get the list of files first
fetchFileList(latestGeneration);
// this can happen if the commit point is deleted before we fetch the file list.
if (filesToDownload.isEmpty()) return false;

private void fetchFileList(long gen) throws IOException {
  ModifiableSolrParams params = new ModifiableSolrParams();
  params.set(COMMAND, CMD_GET_FILE_LIST);
  params.set(GENERATION, String.valueOf(gen));
  params.set(CommonParams.WT, "javabin");
  params.set(CommonParams.QT, "/replication");
  QueryRequest req = new QueryRequest(params);
  HttpSolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX modify to use shardhandler
  try {
    server.setSoTimeout(60000);
    server.setConnectionTimeout(15000);
    NamedList response = server.request(req);

    List<Map<String, Object>> files = (List<Map<String,Object>>) response.get(CMD_GET_FILE_LIST);
    if (files != null)
      filesToDownload = Collections.synchronizedList(files);
    else {
      filesToDownload = Collections.emptyList();
      LOG.error("No files to download for index generation: " + gen);
    }

    files = (List<Map<String,Object>>) response.get(CONF_FILES);
    if (files != null)
      confFilesToDownload = Collections.synchronizedList(files);

  } catch (SolrServerException e) {
    throw new IOException(e);
  } finally {
    server.shutdown();
  }
}
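For illustration, one entry of filesToDownload conceptually looks like the small map below. The key names used here ("name", "size") are assumptions about what the NAME and SIZE constants resolve to, and the values are invented:

import java.util.HashMap;
import java.util.Map;

public class FileListEntryExample {
  public static void main(String[] args) {
    // One per-file metadata map as returned by command=filelist (illustrative).
    Map<String, Object> entry = new HashMap<>();
    entry.put("name", "_0.cfs");   // segment file to fetch from the leader
    entry.put("size", 123456L);    // expected length in bytes
    System.out.println(entry);
  }
}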
- Create a temporary index directory to hold the data pulled from the leader. The temporary directory is named index.<timestamp> and is placed under the data directory. (A sketch of the naming scheme follows the snippet below.)
String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
tmpIndex = createTempindexDir(core, tmpIdxDirName);

tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);

// cindex dir...
indexDirPath = core.getIndexDir();
indexDir = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
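As a sketch of the naming scheme, the snippet below reproduces the index.<timestamp> name; the "yyyyMMddHHmmssSSS" pattern is an assumption standing in for SnapShooter.DATE_FMT and is used here only to illustrate the idea:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class TmpIndexDirNameExample {
  public static void main(String[] args) {
    String dateFmt = "yyyyMMddHHmmssSSS";  // assumed stand-in for SnapShooter.DATE_FMT
    String tmpIdxDirName = "index."
        + new SimpleDateFormat(dateFmt, Locale.ROOT).format(new Date());
    // The directory is created under the core's data directory,
    // e.g. data/index.20160212093059123
    System.out.println(tmpIdxDirName);
  }
}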
- The flag isFullCopyNeeded determines whether the IndexWriter has to be closed. If the replica's (slave's) commit version or generation is not older than the leader's (master's), if the local index is stale, or if a forceReplication was requested, the whole index must be copied into a fresh directory; otherwise the IndexWriter is closed with a rollback before any files are downloaded into the existing index directory. (A distilled version of this check follows the snippet below.)
// if the generation of master is older than that of the slave, it means they are not compatible to be copied
// then a new index directory to be created and all the files need to be copied
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
    .getCommitTimestamp(commit) >= latestVersion
    || commit.getGeneration() >= latestGeneration || forceReplication;

if (isIndexStale(indexDir)) {
  isFullCopyNeeded = true;
}

if (!isFullCopyNeeded) {
  // rollback - and do it before we download any files
  // so we don't remove files we thought we didn't need
  // to download later
  solrCore.getUpdateHandler().getSolrCoreState()
      .closeIndexWriter(core, true);
}
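The full-copy decision above can be read as a small predicate; the following distillation uses illustrative parameter names rather than the actual SnapPuller fields:

// Illustrative distillation of isFullCopyNeeded: a slave whose commit
// timestamp or generation is not older than the master's cannot be patched
// incrementally, so the whole index is fetched into a fresh directory.
final class FullCopyDecision {
  static boolean isFullCopyNeeded(long slaveCommitTimestamp, long slaveGeneration,
                                  long leaderVersion, long leaderGeneration,
                                  boolean indexStale, boolean forceReplication) {
    return slaveCommitTimestamp >= leaderVersion
        || slaveGeneration >= leaderGeneration
        || indexStale
        || forceReplication;
  }
}

When this returns false, only the missing files are pulled into the existing index directory, which is why the IndexWriter is first closed with a rollback.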
- Only now does the actual download of the differing index files begin. Replication uses the per-file metadata returned with the file list (such as the file size) to decide which files differ; in the snippet below a file is downloaded when it does not yet exist locally, or when a complete copy of the index is required. Files are transferred packet by packet, and the packet size can be configured in solrconfig.xml; the next article covers this in detail. (A simplified sketch of the packet-based transfer follows the snippet below.)
private void downloadIndexFiles(boolean downloadCompleteIndex,
    Directory indexDir, Directory tmpIndexDir, long latestGeneration)
    throws Exception {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Download files to dir: " + Arrays.asList(indexDir.listAll()));
  }
  for (Map<String,Object> file : filesToDownload) {
    if (!slowFileExists(indexDir, (String) file.get(NAME))
        || downloadCompleteIndex) {
      dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
          (String) file.get(NAME), false, latestGeneration);
      currentFile = file;
      dirFileFetcher.fetchFile();
      filesDownloaded.add(new HashMap<>(file));
    } else {
      LOG.info("Skipping download for " + file.get(NAME)
          + " because it already exists");
    }
  }
}
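As a rough idea of what the packet-based transfer performed by DirectoryFileFetcher looks like, here is a minimal sketch; the real packet size and per-packet checksum handling live in SnapPuller/DirectoryFileFetcher, and the 1 MB value below is only illustrative:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public final class PacketCopySketch {
  static final int PACKET_SIZE = 1024 * 1024;  // illustrative 1 MB packets

  // Pull the file body from the leader in fixed-size chunks and write each
  // chunk to the local (temporary) index directory.
  static void copyInPackets(InputStream from, OutputStream to) throws IOException {
    byte[] packet = new byte[PACKET_SIZE];
    int read;
    while ((read = from.read(packet)) != -1) {
      to.write(packet, 0, read);   // one write per received packet
    }
  }
}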