Lucene Source Code Analysis --- 5

Lucene source code analysis: flush

The flush operation has come up repeatedly in the previous chapters: once the index data has been accumulated in various in-memory structures, it is written out to files at an appropriate moment by the flush functions. This chapter analyzes that process, starting from the doFlush function of DocumentsWriter, shown below.
DocumentsWriter::doFlush

  private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {
    boolean hasEvents = false;
    while (flushingDWPT != null) {
      hasEvents = true;
      boolean success = false;
      SegmentFlushTicket ticket = null;
      try {
        try {
          ticket = ticketQueue.addFlushTicket(flushingDWPT);

          final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
          boolean dwptSuccess = false;
          try {
            final FlushedSegment newSegment = flushingDWPT.flush();
            ticketQueue.addSegment(ticket, newSegment);
            dwptSuccess = true;
          } finally {
            subtractFlushedNumDocs(flushingDocsInRam);
            if (!flushingDWPT.pendingFilesToDelete().isEmpty()) {
              putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
              hasEvents = true;
            }
            if (!dwptSuccess) {
              putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
              hasEvents = true;
            }
          }
          success = true;
        } finally {
          if (!success && ticket != null) {
            ticketQueue.markTicketFailed(ticket);
          }
        }

        if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()) {
          putEvent(ForcedPurgeEvent.INSTANCE);
          break;
        }
      } finally {
        flushControl.doAfterFlush(flushingDWPT);
      }

      flushingDWPT = flushControl.nextPendingFlush();
    }
    if (hasEvents) {
      putEvent(MergePendingEvent.INSTANCE);
    }
    final double ramBufferSizeMB = config.getRAMBufferSizeMB();
    if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
        flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
      hasEvents = true;
      if (!this.applyAllDeletes(deleteQueue)) {
        putEvent(ApplyDeletesEvent.INSTANCE);
      }
    }

    return hasEvents;
  }

The parameter flushingDWPT is of type DocumentsWriterPerThread and represents the per-thread state of one indexing thread.
ticketQueue is declared as a DocumentsWriterFlushQueue and is used to synchronize multiple concurrently flushing threads; its addFlushTicket function is defined as follows.
DocumentsWriter::doFlush->DocumentsWriterFlushQueue::addFlushTicket

  synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) {
    incTickets();
    boolean success = false;
    try {
      final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush());
      queue.add(ticket);
      success = true;
      return ticket;
    } finally {
      if (!success) {
        decTickets();
      }
    }
  }

addFlushTicket first increments the ticket count via incTickets, then creates a SegmentFlushTicket and adds it to the internal queue; if that fails, the count is rolled back with decTickets in the finally block. The prepareFlush call handles documents marked for deletion before the flush proper begins.
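
The point of this queue is ordering: flushes may finish out of order, but their segments must be published in the order the flushes started. A minimal sketch of that idea (illustrative class and method names, not Lucene's actual implementation):

  import java.util.ArrayDeque;
  import java.util.Deque;

  // Tickets enter the queue when a flush starts; segments are only published
  // from the head, so publication follows flush-start order even when flushes
  // complete out of order.
  class TicketQueueSketch {
    static final class Ticket {
      volatile boolean done;    // set once the flush produced its segment
      volatile String segment;  // stands in for Lucene's FlushedSegment
    }

    private final Deque<Ticket> queue = new ArrayDeque<>();

    synchronized Ticket addFlushTicket() {          // a flush starts
      Ticket ticket = new Ticket();
      queue.add(ticket);
      return ticket;
    }

    synchronized void addSegment(Ticket ticket, String segment) { // flush done
      ticket.segment = segment;
      ticket.done = true;
    }

    synchronized void tryPurge() {
      // a finished ticket stuck behind an unfinished one has to wait
      while (!queue.isEmpty() && queue.peek().done) {
        System.out.println("publishing " + queue.poll().segment);
      }
    }
  }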

Back in DocumentsWriter's doFlush, the function next obtains the number of documents currently buffered in memory via getNumDocsInRAM, then calls the flush function of DocumentsWriterPerThread to carry on.
DocumentsWriter::doFlush->DocumentsWriterPerThread::flush

  FlushedSegment flush() throws IOException, AbortingException {
    segmentInfo.setMaxDoc(numDocsInRAM);
    final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(), pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
    final double startMBUsed = bytesUsed() / 1024. / 1024.;

    if (pendingUpdates.docIDs.size() > 0) {
      flushState.liveDocs = codec.liveDocsFormat().newLiveDocs(numDocsInRAM);
      for(int delDocID : pendingUpdates.docIDs) {
        flushState.liveDocs.clear(delDocID);
      }
      flushState.delCountOnFlush = pendingUpdates.docIDs.size();
      pendingUpdates.bytesUsed.addAndGet(-pendingUpdates.docIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID);
      pendingUpdates.docIDs.clear();
    }

    if (aborted) {
      return null;
    }

    long t0 = System.nanoTime();

    try {
      consumer.flush(flushState);
      pendingUpdates.terms.clear();
      segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));

      final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, -1L, -1L, -1L);

      final BufferedUpdates segmentDeletes;
      if (pendingUpdates.queries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) {
        pendingUpdates.clear();
        segmentDeletes = null;
      } else {
        segmentDeletes = pendingUpdates;
      }

      FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos, segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
      sealFlushedSegment(fs);

      return fs;
    } catch (Throwable th) {
      // exception handling elided in this excerpt: the original aborts the
      // segment and rethrows the failure wrapped in an AbortingException
    }
  }

The pendingUpdates member used in flush holds the IDs of documents pending deletion or update. If there are any, they have to be marked: codec here is a Lucene60Codec, whose liveDocsFormat function returns a Lucene50LiveDocsFormat, and that format's newLiveDocs function allocates a FixedBitSet used to mark the deleted-or-updated document IDs. Further down, consumer was set to DefaultIndexingChain in the constructor, so the next thing to examine closely is DefaultIndexingChain's flush function.
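
As a quick illustration of the marking step, here is a minimal sketch assuming org.apache.lucene.util.FixedBitSet (what the Lucene50LiveDocsFormat implementation ultimately allocates): every bit starts out live, and clear(docID) records a deletion.

  import org.apache.lucene.util.FixedBitSet;

  public class LiveDocsSketch {
    public static void main(String[] args) {
      int numDocsInRAM = 5;
      FixedBitSet liveDocs = new FixedBitSet(numDocsInRAM);
      liveDocs.set(0, numDocsInRAM);  // all documents start out live

      liveDocs.clear(2);              // doc 2 was deleted before the flush
      liveDocs.clear(4);              // doc 4 as well

      for (int doc = 0; doc < numDocsInRAM; doc++) {
        System.out.println("doc " + doc + " live=" + liveDocs.get(doc));
      }
      // delCountOnFlush would be 2 in this example
    }
  }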

The flush function of DefaultIndexingChain

The code of DefaultIndexingChain's flush function is shown below.
DocumentsWriter::doFlush->DocumentsWriterPerThread::flush->DefaultIndexingChain::flush

  public void flush(SegmentWriteState state) throws IOException, AbortingException {

    int maxDoc = state.segmentInfo.maxDoc();
    long t0 = System.nanoTime();
    writeNorms(state);

    t0 = System.nanoTime();
    writeDocValues(state);

    t0 = System.nanoTime();
    writePoints(state);

    t0 = System.nanoTime();
    initStoredFieldsWriter();
    fillStoredFields(maxDoc);
    storedFieldsWriter.finish(state.fieldInfos, maxDoc);
    storedFieldsWriter.close();

    t0 = System.nanoTime();
    Map<String,TermsHashPerField> fieldsToFlush = new HashMap<>();
    for (int i=0;i<fieldHash.length;i++) {
      PerField perField = fieldHash[i];
      while (perField != null) {
        if (perField.invertState != null) {
          fieldsToFlush.put(perField.fieldInfo.name, perField.termsHashPerField);
        }
        perField = perField.next;
      }
    }

    termsHash.flush(fieldsToFlush, state);

    t0 = System.nanoTime();
    docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
  }

The segmentInfo inside the state parameter is the SegmentInfo created in the DocumentsWriterPerThread constructor and holds the segment's metadata; its maxDoc function returns the number of documents currently in memory. DefaultIndexingChain's flush then calls writeNorms to write the norm information into the .nvm and .nvd files.
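
As a reminder of where norms come from, writeNorms only visits fields that are indexed and do not omit norms, which is controlled on the FieldType at indexing time; a small sketch of those two switches:

  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.Field;
  import org.apache.lucene.document.FieldType;
  import org.apache.lucene.document.TextField;

  public class NormsSetupSketch {
    public static void main(String[] args) {
      FieldType withNorms = new FieldType(TextField.TYPE_NOT_STORED); // indexed, norms on

      FieldType noNorms = new FieldType(TextField.TYPE_NOT_STORED);
      noNorms.setOmitNorms(true);   // fi.omitsNorms() == true -> skipped by writeNorms
      noNorms.freeze();

      Document doc = new Document();
      doc.add(new Field("body", "some text", withNorms)); // norms written to .nvd/.nvm
      doc.add(new Field("tag", "some text", noNorms));    // no norms for this field
    }
  }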

The writeNorms function of DefaultIndexingChain

DefaultIndexingChain::flush->DefaultIndexingChain::writeNorms

  private void writeNorms(SegmentWriteState state) throws IOException {
    boolean success = false;
    NormsConsumer normsConsumer = null;
    try {
      if (state.fieldInfos.hasNorms()) {
        NormsFormat normsFormat = state.segmentInfo.getCodec().normsFormat();
        normsConsumer = normsFormat.normsConsumer(state);

        for (FieldInfo fi : state.fieldInfos) {
          PerField perField = getPerField(fi.name);
          assert perField != null;

          if (fi.omitsNorms() == false && fi.getIndexOptions() != IndexOptions.NONE) {
            perField.norms.finish(state.segmentInfo.maxDoc());
            perField.norms.flush(state, normsConsumer);
          }
        }
      }
      success = true;
    } finally {
      // the original closes normsConsumer here via IOUtils, handling the
      // failure case as well (elided in this excerpt)
    }
  }

Lucene60Codec's normsFormat function ultimately returns a Lucene53NormsFormat, and the corresponding normsConsumer function returns a Lucene53NormsConsumer.
DefaultIndexingChain::flush->writeNorms->Lucene53NormsFormat::normsConsumer

  public NormsConsumer normsConsumer(SegmentWriteState state) throws IOException {
    return new Lucene53NormsConsumer(state, DATA_CODEC, DATA_EXTENSION, METADATA_CODEC, METADATA_EXTENSION);
  }

  Lucene53NormsConsumer(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
    boolean success = false;
    try {
      String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
      data = state.directory.createOutput(dataName, state.context);
      CodecUtil.writeIndexHeader(data, dataCodec, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
      String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension);
      meta = state.directory.createOutput(metaName, state.context);
      CodecUtil.writeIndexHeader(meta, metaCodec, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
      maxDoc = state.segmentInfo.maxDoc();
      success = true;
    } finally {
      // on failure the original closes the partially opened data/meta
      // outputs (elided in this excerpt)
    }
  }

The segmentFileName function of IndexFileNames builds a file name such as _0.nvd out of the segment name (e.g. _0) and the extension (e.g. nvd). An output stream is then created through createOutput; the directory here is a TrackingDirectoryWrapper that delegates to FSDirectory, as the following code shows.
DefaultIndexingChain::flush->writeNorms->Lucene53NormsFormat::normsConsumer->TrackingDirectoryWrapper::createOutput

  public IndexOutput createOutput(String name, IOContext context) throws IOException {
    IndexOutput output = in.createOutput(name, context);
    createdFileNames.add(name);
    return output;
  }

  public IndexOutput createOutput(String name, IOContext context) throws IOException {
    ensureOpen();

    pendingDeletes.remove(name);
    maybeDeletePendingFiles();
    return new FSIndexOutput(name);
  }

  public FSIndexOutput(String name) throws IOException {
    this(name, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
  }

createOutput ultimately creates and returns an FSIndexOutput for the given file name, and the TrackingDirectoryWrapper layer records that name in createdFileNames along the way.
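
A brief usage sketch of this wrapper (the path below is hypothetical): TrackingDirectoryWrapper records every file created through it, which is how flush later knows which files make up the new segment when it calls segmentInfo.setFiles(directory.getCreatedFiles()).

  import java.nio.file.Paths;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.store.FSDirectory;
  import org.apache.lucene.store.IOContext;
  import org.apache.lucene.store.IndexOutput;
  import org.apache.lucene.store.TrackingDirectoryWrapper;

  public class TrackingDirSketch {
    public static void main(String[] args) throws Exception {
      Directory dir = FSDirectory.open(Paths.get("/tmp/idx"));  // hypothetical path
      TrackingDirectoryWrapper tracking = new TrackingDirectoryWrapper(dir);
      try (IndexOutput out = tracking.createOutput("_0.nvd", IOContext.DEFAULT)) {
        out.writeInt(42);
      }
      System.out.println(tracking.getCreatedFiles()); // [_0.nvd]
      dir.close();
    }
  }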

Back in the Lucene53NormsConsumer constructor, writeIndexHeader next writes the header information into each file.
DefaultIndexingChain::flush->writeNorms->Lucene53NormsFormat::normsConsumer->CodecUtil::writeIndexHeader

  public static void writeIndexHeader(DataOutput out, String codec, int version, byte[] id, String suffix) throws IOException {
    writeHeader(out, codec, version);
    out.writeBytes(id, 0, id.length);
    BytesRef suffixBytes = new BytesRef(suffix);
    out.writeByte((byte) suffixBytes.length);
    out.writeBytes(suffixBytes.bytes, suffixBytes.offset, suffixBytes.length);
  }

  public static void writeHeader(DataOutput out, String codec, int version) throws IOException {
    BytesRef bytes = new BytesRef(codec);
    out.writeInt(CODEC_MAGIC);
    out.writeString(codec);
    out.writeInt(version);
  }
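
For completeness, a header written this way is verified on read with the matching CodecUtil.checkIndexHeader call. A minimal round trip, assuming an in-memory RAMDirectory and a made-up codec name:

  import org.apache.lucene.codecs.CodecUtil;
  import org.apache.lucene.store.IOContext;
  import org.apache.lucene.store.IndexInput;
  import org.apache.lucene.store.IndexOutput;
  import org.apache.lucene.store.RAMDirectory;
  import org.apache.lucene.util.StringHelper;

  public class HeaderSketch {
    public static void main(String[] args) throws Exception {
      byte[] id = StringHelper.randomId();   // a random 16-byte segment id
      try (RAMDirectory dir = new RAMDirectory()) {
        try (IndexOutput out = dir.createOutput("seg.dat", IOContext.DEFAULT)) {
          CodecUtil.writeIndexHeader(out, "MyCodec", 1, id, "");
        }
        try (IndexInput in = dir.openInput("seg.dat", IOContext.DEFAULT)) {
          // throws CorruptIndexException if magic, codec name, version range,
          // id or suffix do not match what was written
          int version = CodecUtil.checkIndexHeader(in, "MyCodec", 1, 1, id, "");
          System.out.println("version=" + version);
        }
      }
    }
  }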

Back in DefaultIndexingChain's writeNorms, getPerField is used to look up the PerField whose FieldInfo carries the field's metadata; the code is as follows.
DefaultIndexingChain::flush->writeNorms->getPerField

  private PerField getPerField(String name) {
    final int hashPos = name.hashCode() & hashMask;
    PerField fp = fieldHash[hashPos];
    while (fp != null && !fp.fieldInfo.name.equals(name)) {
      fp = fp.next;
    }
    return fp;
  }

Moving on, the norms member of PerField was initialized as a NormValuesWriter in the constructor; its finish is an empty function, while its flush looks like this:
DefaultIndexingChain::flush->writeNorms->NormValuesWriter::flush

  public void flush(SegmentWriteState state, NormsConsumer normsConsumer) throws IOException {

    final int maxDoc = state.segmentInfo.maxDoc();
    final PackedLongValues values = pending.build();

    normsConsumer.addNormsField(fieldInfo,
                               new Iterable<Number>() {
                                 @Override
                                 public Iterator<Number> iterator() {
                                   return new NumericIterator(maxDoc, values);
                                 }
                               });
  }

The normsConsumer here is the Lucene53NormsConsumer, whose addNormsField function is shown below.
DefaultIndexingChain::flush->writeNorms->NormValuesWriter::flush->Lucene53NormsConsumer::addNormsField

  public void addNormsField(FieldInfo field, Iterable<Number> values) throws IOException {
    meta.writeVInt(field.number);
    long minValue = Long.MAX_VALUE;
    long maxValue = Long.MIN_VALUE;
    int count = 0;

    for (Number nv : values) {
      final long v = nv.longValue();
      minValue = Math.min(minValue, v);
      maxValue = Math.max(maxValue, v);
      count++;
    }

    if (minValue == maxValue) {
      addConstant(minValue);
    } else if (minValue >= Byte.MIN_VALUE && maxValue <= Byte.MAX_VALUE) {
      addByte1(values);
    } else if (minValue >= Short.MIN_VALUE && maxValue <= Short.MAX_VALUE) {
      addByte2(values);
    } else if (minValue >= Integer.MIN_VALUE && maxValue <= Integer.MAX_VALUE) {
      addByte4(values);
    } else {
      addByte8(values);
    }
  }

Further down, the addConstant/addByte1/addByte2/addByte4/addByte8 helpers write the per-document norm values through the FSIndexOutput streams created earlier into the .nvd and .nvm files.
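
The dispatch logic above is easy to restate standalone: one pass computes min and max, then the values are stored with the smallest fixed width that covers the range, or as a single constant. A sketch:

  // Standalone restatement of the width selection in addNormsField.
  public class NormWidthSketch {
    static int bytesPerValue(long min, long max) {
      if (min == max) return 0;                                       // addConstant
      if (min >= Byte.MIN_VALUE && max <= Byte.MAX_VALUE) return 1;   // addByte1
      if (min >= Short.MIN_VALUE && max <= Short.MAX_VALUE) return 2; // addByte2
      if (min >= Integer.MIN_VALUE && max <= Integer.MAX_VALUE) return 4; // addByte4
      return 8;                                                       // addByte8
    }

    public static void main(String[] args) {
      System.out.println(bytesPerValue(3, 3));     // 0: every doc has the same norm
      System.out.println(bytesPerValue(-4, 120));  // 1
      System.out.println(bytesPerValue(0, 70000)); // 4
    }
  }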

The writeDocValues function of DefaultIndexingChain

With writeNorms covered, the next function to look at is writeDocValues.
DefaultIndexingChain::flush->writeDocValues

  private void writeDocValues(SegmentWriteState state) throws IOException {
    int maxDoc = state.segmentInfo.maxDoc();
    DocValuesConsumer dvConsumer = null;
    boolean success = false;
    try {
      for (int i=0;i<fieldHash.length;i++) {
        PerField perField = fieldHash[i];
        while (perField != null) {
          if (perField.docValuesWriter != null) {
            if (dvConsumer == null) {
              DocValuesFormat fmt = state.segmentInfo.getCodec().docValuesFormat();
              dvConsumer = fmt.fieldsConsumer(state);
            }
            perField.docValuesWriter.finish(maxDoc);
            perField.docValuesWriter.flush(state, dvConsumer);
            perField.docValuesWriter = null;
          } 
          perField = perField.next;
        }
      }
      success = true;
    } finally {
      // the original closes dvConsumer here via IOUtils, handling the
      // failure case as well (elided in this excerpt)
    }
  }

Much like the writeNorms analysis above, writeDocValues walks the fieldHash chains to reach every PerField. A PerField's docValuesWriter is created as a NumericDocValuesWriter, BinaryDocValuesWriter, SortedDocValuesWriter, SortedNumericDocValuesWriter or SortedSetDocValuesWriter depending on the field's doc-values type, as the indexing-time code below shows.
DefaultIndexingChain::processField->indexDocValue

  private void indexDocValue(PerField fp, DocValuesType dvType, IndexableField field) throws IOException {

    if (fp.fieldInfo.getDocValuesType() == DocValuesType.NONE) {
      fieldInfos.globalFieldNumbers.setDocValuesType(fp.fieldInfo.number, fp.fieldInfo.name, dvType);
    }
    fp.fieldInfo.setDocValuesType(dvType);
    int docID = docState.docID;

    switch(dvType) {

      case NUMERIC:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new NumericDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((NumericDocValuesWriter) fp.docValuesWriter).addValue(docID, field.numericValue().longValue());
        break;

      case BINARY:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new BinaryDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((BinaryDocValuesWriter) fp.docValuesWriter).addValue(docID, field.binaryValue());
        break;

      case SORTED:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new SortedDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((SortedDocValuesWriter) fp.docValuesWriter).addValue(docID, field.binaryValue());
        break;

      case SORTED_NUMERIC:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new SortedNumericDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((SortedNumericDocValuesWriter) fp.docValuesWriter).addValue(docID, field.numericValue().longValue());
        break;

      case SORTED_SET:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new SortedSetDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((SortedSetDocValuesWriter) fp.docValuesWriter).addValue(docID, field.binaryValue());
        break;

      default:
        throw new AssertionError();
    }
  }

To keep the analysis concrete, assume from here on that the docValuesWriter in PerField is a BinaryDocValuesWriter.
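
For reference, the dvType values above originate from the doc-values field classes used at indexing time; a small sketch showing which field class selects which writer:

  import org.apache.lucene.document.BinaryDocValuesField;
  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.NumericDocValuesField;
  import org.apache.lucene.document.SortedDocValuesField;
  import org.apache.lucene.document.SortedNumericDocValuesField;
  import org.apache.lucene.document.SortedSetDocValuesField;
  import org.apache.lucene.util.BytesRef;

  public class DocValuesFieldsSketch {
    public static void main(String[] args) {
      Document doc = new Document();
      doc.add(new NumericDocValuesField("price", 42L));                  // NUMERIC
      doc.add(new BinaryDocValuesField("payload", new BytesRef("raw"))); // BINARY
      doc.add(new SortedDocValuesField("brand", new BytesRef("acme")));  // SORTED
      doc.add(new SortedNumericDocValuesField("sizes", 7L));             // SORTED_NUMERIC
      doc.add(new SortedSetDocValuesField("tags", new BytesRef("new"))); // SORTED_SET
      System.out.println(doc);
    }
  }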

Back in writeDocValues, the docValuesFormat call further down returns a PerFieldDocValuesFormat, and its fieldsConsumer function yields the DocValuesConsumer.
DefaultIndexingChain::flush->writeDocValues->PerFieldDocValuesFormat::fieldsConsumer

  public final DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
    return new FieldsWriter(state);
  }

So what fieldsConsumer actually returns is a FieldsWriter.

Back in DefaultIndexingChain's writeDocValues, the next call is the flush of docValuesWriter, i.e. of the BinaryDocValuesWriter assumed above.
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush

  public void flush(SegmentWriteState state, DocValuesConsumer dvConsumer) throws IOException {
    final int maxDoc = state.segmentInfo.maxDoc();
    bytes.freeze(false);
    final PackedLongValues lengths = this.lengths.build();
    dvConsumer.addBinaryField(fieldInfo,
                              new Iterable<BytesRef>() {
                                @Override
                                public Iterator<BytesRef> iterator() {
                                   return new BytesIterator(maxDoc, lengths);
                                }
                              });
  }

BinaryDocValuesWriter's flush mainly calls FieldsWriter's addBinaryField function to hand over the field's buffered binary values.
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush->FieldsWriter::addBinaryField

    public void addBinaryField(FieldInfo field, Iterable<BytesRef> values) throws IOException {
      getInstance(field).addBinaryField(field, values);
    }

addBinaryField first goes through getInstance to obtain the real consumer, which ends up being a Lucene54DocValuesConsumer.
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush->FieldsWriter::addBinaryField->getInstance

    private DocValuesConsumer getInstance(FieldInfo field) throws IOException {
      DocValuesFormat format = null;
      if (field.getDocValuesGen() != -1) {
        final String formatName = field.getAttribute(PER_FIELD_FORMAT_KEY);
        if (formatName != null) {
          format = DocValuesFormat.forName(formatName);
        }
      }
      if (format == null) {
        format = getDocValuesFormatForField(field.name);
      }

      final String formatName = format.getName();

      String previousValue = field.putAttribute(PER_FIELD_FORMAT_KEY, formatName);

      Integer suffix = null;

      ConsumerAndSuffix consumer = formats.get(format);
      if (consumer == null) {
        if (field.getDocValuesGen() != -1) {
          final String suffixAtt = field.getAttribute(PER_FIELD_SUFFIX_KEY);
          if (suffixAtt != null) {
            suffix = Integer.valueOf(suffixAtt);
          }
        }

        if (suffix == null) {
          suffix = suffixes.get(formatName);
          if (suffix == null) {
            suffix = 0;
          } else {
            suffix = suffix + 1;
          }
        }
        suffixes.put(formatName, suffix);

        final String segmentSuffix = getFullSegmentSuffix(segmentWriteState.segmentSuffix,
                                                          getSuffix(formatName, Integer.toString(suffix)));
        consumer = new ConsumerAndSuffix();
        consumer.consumer = format.fieldsConsumer(new SegmentWriteState(segmentWriteState, segmentSuffix));
        consumer.suffix = suffix;
        formats.put(format, consumer);
      } else {
        suffix = consumer.suffix;
      }

      previousValue = field.putAttribute(PER_FIELD_SUFFIX_KEY, Integer.toString(suffix));

      return consumer.consumer;
    }

Assuming this is the first time through the function, format is resolved by getDocValuesFormatForField to a Lucene54DocValuesFormat, whose fieldsConsumer function then constructs and returns a Lucene54DocValuesConsumer.
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush->FieldsWriter::addBinaryField->getInstance->Lucene54DocValuesFormat::fieldsConsumer

  public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
    return new Lucene54DocValuesConsumer(state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION);
  }

  public Lucene54DocValuesConsumer(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
    boolean success = false;
    try {
      String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
      data = state.directory.createOutput(dataName, state.context);
      CodecUtil.writeIndexHeader(data, dataCodec, Lucene54DocValuesFormat.VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
      String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension);
      meta = state.directory.createOutput(metaName, state.context);
      CodecUtil.writeIndexHeader(meta, metaCodec, Lucene54DocValuesFormat.VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
      maxDoc = state.segmentInfo.maxDoc();
      success = true;
    } finally {
      // on failure the original closes the partially opened data/meta
      // outputs (elided in this excerpt)
    }
  }

As in the earlier analysis, this creates the .dvd and .dvm file output streams and writes the corresponding headers.
Lucene54DocValuesConsumer's addBinaryField is not traced any further here; it simply writes the values out through these file output streams.
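
The per-field indirection seen in getInstance is what lets applications route individual fields to different doc-values formats. A hedged sketch of that hook, assuming the "Memory" format from the lucene-codecs module is on the classpath:

  import org.apache.lucene.codecs.DocValuesFormat;
  import org.apache.lucene.codecs.lucene60.Lucene60Codec;

  public class PerFieldDVSketch {
    public static void main(String[] args) {
      Lucene60Codec codec = new Lucene60Codec() {
        @Override
        public DocValuesFormat getDocValuesFormatForField(String field) {
          // hypothetical routing: memory format for "id", default elsewhere
          if ("id".equals(field)) {
            return DocValuesFormat.forName("Memory");
          }
          return DocValuesFormat.forName("Lucene54");
        }
      };
      System.out.println(codec.getDocValuesFormatForField("id").getName());
    }
  }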

The writePoints function of DefaultIndexingChain

DefaultIndexingChain::flush->writePoints

  private void writePoints(SegmentWriteState state) throws IOException {
    PointsWriter pointsWriter = null;
    boolean success = false;
    try {
      for (int i=0;i<fieldHash.length;i++) {
        PerField perField = fieldHash[i];
        while (perField != null) {
          if (perField.pointValuesWriter != null) {
            if (pointsWriter == null) {
              PointsFormat fmt = state.segmentInfo.getCodec().pointsFormat();
              pointsWriter = fmt.fieldsWriter(state);
            }

            perField.pointValuesWriter.flush(state, pointsWriter);
            perField.pointValuesWriter = null;
          } 
          perField = perField.next;
        }
      }
      if (pointsWriter != null) {
        pointsWriter.finish();
      }
      success = true;
    } finally {
      // the original closes pointsWriter here via IOUtils, handling the
      // failure case as well (elided in this excerpt)
    }
  }

As before, the pointsFormat call in writePoints ultimately returns a Lucene60PointsFormat, whose fieldsWriter function then produces a Lucene60PointsWriter.

DefaultIndexingChain::writePoints->Lucene60PointsFormat::fieldsWriter

  public PointsWriter fieldsWriter(SegmentWriteState state) throws IOException {
    return new Lucene60PointsWriter(state);
  }

The pointValuesWriter member of PerField is set to a PointValuesWriter; its flush function is shown below.
DefaultIndexingChain::writePoints->PointValuesWriter::flush

  public void flush(SegmentWriteState state, PointsWriter writer) throws IOException {

    writer.writeField(fieldInfo,
                      new PointsReader() {
                        @Override
                        public void intersect(String fieldName, IntersectVisitor visitor) throws IOException {
                          if (fieldName.equals(fieldInfo.name) == false) {
                            throw new IllegalArgumentException();
                          }
                          for(int i=0;i<numPoints;i++) {
                            bytes.readBytes(packedValue.length * i, packedValue, 0, packedValue.length);
                            visitor.visit(docIDs[i], packedValue);
                          }
                        }

                        @Override
                        public void checkIntegrity() {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public long ramBytesUsed() {
                          return 0L;
                        }

                        @Override
                        public void close() {
                        }

                        @Override
                        public byte[] getMinPackedValue(String fieldName) {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public byte[] getMaxPackedValue(String fieldName) {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public int getNumDimensions(String fieldName) {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public int getBytesPerDimension(String fieldName) {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public long size(String fieldName) {
                          return numPoints;
                        }

                        @Override
                        public int getDocCount(String fieldName) {
                          return numDocs;
                        }
                      });
  }

PointValuesWriter's flush in turn calls Lucene60PointsWriter's writeField function, shown below.
DefaultIndexingChain::writePoints->PointValuesWriter::flush->Lucene60PointsWriter::writeField

  public void writeField(FieldInfo fieldInfo, PointsReader values) throws IOException {

    boolean singleValuePerDoc = values.size(fieldInfo.name) == values.getDocCount(fieldInfo.name);

    try (BKDWriter writer = new BKDWriter(writeState.segmentInfo.maxDoc(),
                                          writeState.directory,
                                          writeState.segmentInfo.name,
                                          fieldInfo.getPointDimensionCount(),
                                          fieldInfo.getPointNumBytes(),
                                          maxPointsInLeafNode,
                                          maxMBSortInHeap,
                                          values.size(fieldInfo.name),
                                          singleValuePerDoc)) {

      values.intersect(fieldInfo.name, new IntersectVisitor() {
          @Override
          public void visit(int docID) {
            throw new IllegalStateException();
          }

          public void visit(int docID, byte[] packedValue) throws IOException {
            writer.add(packedValue, docID);
          }

          @Override
          public Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
            return Relation.CELL_CROSSES_QUERY;
          }
        });

      if (writer.getPointCount() > 0) {
        indexFPs.put(fieldInfo.name, writer.finish(dataOut));
      }
    }
  }

Putting together the PointsReader defined in PointValuesWriter's flush and the IntersectVisitor defined in writeField above, writeField ends up calling BKDWriter's add for every point. A BKD tree is a disk-friendly k-d tree variant used to index multi-dimensional point values; the add function is defined as follows.

  public void add(byte[] packedValue, int docID) throws IOException {

    if (pointCount >= maxPointsSortInHeap) {
      if (offlinePointWriter == null) {
        spillToOffline();
      }
      offlinePointWriter.append(packedValue, pointCount, docID);
    } else {
      heapPointWriter.append(packedValue, pointCount, docID);
    }

    if (pointCount == 0) {
      System.arraycopy(packedValue, 0, minPackedValue, 0, packedBytesLength);
      System.arraycopy(packedValue, 0, maxPackedValue, 0, packedBytesLength);
    } else {
      for(int dim=0;dim<numDims;dim++) {
        int offset = dim*bytesPerDim;
        if (StringHelper.compare(bytesPerDim, packedValue, offset, minPackedValue, offset) < 0) {
          System.arraycopy(packedValue, offset, minPackedValue, offset, bytesPerDim);
        }
        if (StringHelper.compare(bytesPerDim, packedValue, offset, maxPackedValue, offset) > 0) {
          System.arraycopy(packedValue, offset, maxPackedValue, offset, bytesPerDim);
        }
      }
    }

    pointCount++;
    docsSeen.set(docID);
  }

The heapPointWriter member, of type HeapPointWriter, buffers points in memory; offlinePointWriter, of type OfflinePointWriter, writes them to disk. Points are first appended through the HeapPointWriter, and once pointCount reaches maxPointsSortInHeap, spillToOffline is called to switch over to the on-disk writer.

  private void spillToOffline() throws IOException {

    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill", 0, singleValuePerDoc);
    tempInput = offlinePointWriter.out;
    PointReader reader = heapPointWriter.getReader(0, pointCount);
    for(int i=0;i<pointCount;i++) {
      boolean hasNext = reader.next();
      offlinePointWriter.append(reader.packedValue(), i, heapPointWriter.docIDs[i]);
    }

    heapPointWriter = null;
  }

The OfflinePointWriter constructor opens an output stream for a temporary file named along the lines of "<segment>_bkd_spill_<n>.tmp", and the loop above then copies the points already buffered in the HeapPointWriter into it via append.
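
The buffering strategy is worth restating in isolation: append into a heap buffer until a threshold, then spill everything to a temp file and keep appending there. A simplified, self-contained sketch (illustrative names; error handling and close() omitted):

  import java.io.DataOutputStream;
  import java.io.FileOutputStream;
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  class SpillingPointBuffer {
    private final int maxPointsInHeap;
    private List<long[]> heap = new ArrayList<>();  // stands in for HeapPointWriter
    private DataOutputStream offline;               // stands in for OfflinePointWriter

    SpillingPointBuffer(int maxPointsInHeap) {
      this.maxPointsInHeap = maxPointsInHeap;
    }

    void add(long value, int docID) throws IOException {
      if (offline == null && heap.size() >= maxPointsInHeap) {
        spillToOffline();
      }
      if (offline != null) {
        offline.writeLong(value);   // past the threshold: straight to disk
        offline.writeInt(docID);
      } else {
        heap.add(new long[] {value, docID});
      }
    }

    private void spillToOffline() throws IOException {
      offline = new DataOutputStream(new FileOutputStream("bkd_spill_0.tmp"));
      for (long[] p : heap) {       // copy what the heap already holds
        offline.writeLong(p[0]);
        offline.writeInt((int) p[1]);
      }
      heap = null;                  // the heap buffer is no longer used
    }
  }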

Back in DefaultIndexingChain's writePoints, the finish function finally closes out the point data: it writes the footer of the .dim data file and then creates the companion index file (extension .dii in Lucene 6.0), recording for each field the file pointer of its BKD tree within the data file. The code is as follows.

  public void finish() throws IOException {
    finished = true;
    CodecUtil.writeFooter(dataOut);

    String indexFileName = IndexFileNames.segmentFileName(writeState.segmentInfo.name,
                                                          writeState.segmentSuffix,
                                                          Lucene60PointsFormat.INDEX_EXTENSION);
    try (IndexOutput indexOut = writeState.directory.createOutput(indexFileName, writeState.context)) {
      CodecUtil.writeIndexHeader(indexOut,
                                 Lucene60PointsFormat.META_CODEC_NAME,
                                 Lucene60PointsFormat.INDEX_VERSION_CURRENT,
                                 writeState.segmentInfo.getId(),
                                 writeState.segmentSuffix);
      int count = indexFPs.size();
      indexOut.writeVInt(count);
      for(Map.Entry<String,Long> ent : indexFPs.entrySet()) {
        FieldInfo fieldInfo = writeState.fieldInfos.fieldInfo(ent.getKey());
        indexOut.writeVInt(fieldInfo.number);
        indexOut.writeVLong(ent.getValue());
      }
      CodecUtil.writeFooter(indexOut);
    }
  }

Writing data into the .fdt and .fdx files

Continuing with DefaultIndexingChain's flush, initStoredFieldsWriter is called next to initialize a StoredFieldsWriter; the code is shown below.
DefaultIndexingChain::flush->initStoredFieldsWriter

  private void initStoredFieldsWriter() throws IOException {
    if (storedFieldsWriter == null) {
      storedFieldsWriter = docWriter.codec.storedFieldsFormat().fieldsWriter(docWriter.directory, docWriter.getSegmentInfo(), IOContext.DEFAULT);
    }
  }

storedFieldsFormat returns a Lucene50StoredFieldsFormat, whose fieldsWriter function in turn calls CompressingStoredFieldsFormat's fieldsWriter, finally returning a CompressingStoredFieldsWriter.
DefaultIndexingChain::flush->initStoredFieldsWriter->Lucene60Codec::storedFieldsFormat->Lucene50StoredFieldsFormat::fieldsWriter->CompressingStoredFieldsFormat::fieldsWriter

  public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si,
      IOContext context) throws IOException {
    return new CompressingStoredFieldsWriter(directory, si, segmentSuffix, context,
        formatName, compressionMode, chunkSize, maxDocsPerChunk, blockSize);
  }

The constructor of CompressingStoredFieldsWriter is shown below.
DefaultIndexingChain::flush->initStoredFieldsWriter->Lucene60Codec::storedFieldsFormat->Lucene50StoredFieldsFormat::fieldsWriter->CompressingStoredFieldsFormat::fieldsWriter->CompressingStoredFieldsWriter::CompressingStoredFieldsWriter

  public CompressingStoredFieldsWriter(Directory directory, SegmentInfo si, String segmentSuffix, IOContext context, String formatName, CompressionMode compressionMode, int chunkSize, int maxDocsPerChunk, int blockSize) throws IOException {
    assert directory != null;
    this.segment = si.name;
    this.compressionMode = compressionMode;
    this.compressor = compressionMode.newCompressor();
    this.chunkSize = chunkSize;
    this.maxDocsPerChunk = maxDocsPerChunk;
    this.docBase = 0;
    this.bufferedDocs = new GrowableByteArrayDataOutput(chunkSize);
    this.numStoredFields = new int[16];
    this.endOffsets = new int[16];
    this.numBufferedDocs = 0;

    boolean success = false;
    IndexOutput indexStream = directory.createOutput(IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_INDEX_EXTENSION), context);
    try {
      fieldsStream = directory.createOutput(IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_EXTENSION),context);

      final String codecNameIdx = formatName + CODEC_SFX_IDX;
      final String codecNameDat = formatName + CODEC_SFX_DAT;
      CodecUtil.writeIndexHeader(indexStream, codecNameIdx, VERSION_CURRENT, si.getId(), segmentSuffix);
      CodecUtil.writeIndexHeader(fieldsStream, codecNameDat, VERSION_CURRENT, si.getId(), segmentSuffix);

      indexWriter = new CompressingStoredFieldsIndexWriter(indexStream, blockSize);
      indexStream = null;

      fieldsStream.writeVInt(chunkSize);
      fieldsStream.writeVInt(PackedInts.VERSION_CURRENT);

      success = true;
    } finally {
      // on failure the original closes fieldsStream and indexStream
      // (elided in this excerpt)
    }
  }

The CompressingStoredFieldsWriter constructor creates the .fdx and .fdt files, opens output streams for both, and writes their codec headers followed by the chunk size and the packed-ints version.
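
To connect this to the indexing API: stored field values are what end up compressed in the .fdt data file, while .fdx holds the index locating each document's chunk. A quick sketch of fields that will flow through this writer:

  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.Field.Store;
  import org.apache.lucene.document.StoredField;
  import org.apache.lucene.document.TextField;

  public class StoredFieldsSketch {
    public static void main(String[] args) {
      Document doc = new Document();
      doc.add(new StoredField("url", "http://example.com"));     // stored only
      doc.add(new TextField("title", "hello world", Store.YES)); // indexed + stored
      System.out.println(doc);
    }
  }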

Back in DefaultIndexingChain's flush, fillStoredFields is called next, which drives startStoredFields and finishStoredFields. startStoredFields invokes the startDocument function of the CompressingStoredFieldsWriter constructed above, which is empty; finishStoredFields invokes its finishDocument function, which in the Lucene 6.x sources reads roughly as follows.
DefaultIndexingChain::flush->fillStoredFields->finishStoredFields->CompressingStoredFieldsWriter::finishDocument

  public void finishDocument() throws IOException {
    if (numBufferedDocs == this.numStoredFields.length) {
      // grow the per-document bookkeeping arrays
      final int newLength = ArrayUtil.oversize(numBufferedDocs + 1, 4);
      this.numStoredFields = ArrayUtil.grow(this.numStoredFields, newLength);
      endOffsets = ArrayUtil.grow(endOffsets, newLength);
    }
    this.numStoredFields[numBufferedDocs] = numStoredFieldsInDoc;
    numStoredFieldsInDoc = 0;
    // remember where this document's bytes end in the chunk buffer
    endOffsets[numBufferedDocs] = bufferedDocs.length;
    ++numBufferedDocs;
    if (triggerFlush()) {
      // enough buffered documents or bytes: compress and write out the chunk
      flush();
    }
  }

finishDocument records the per-document bookkeeping and, once enough documents or bytes have accumulated, triggers a chunk flush that compresses the buffered documents into the .fdt file, with a corresponding entry in the .fdx index.
