mahout之推荐系统源码笔记(2) ---相似度计算之RowSimilarityJob

mahout之推荐系统源码笔记(2) —相似度计算之RowSimilarityJob

本笔记承接笔记一。
在笔记1中我们分析了PreparePreferenceMatrixJob的源码,该job对输入数据进行了一定的预处理准备工作。接下来mahout使用RowSimilarityJob对数据user-item集的相似度进行计算,得到每个物品关于其他所有物品的相似度矩阵。

首先我们同样看RecommenderJob(org.apache.mahout.cf.taste.hadoop.item),可以到执行RowSimilarityJob的代码如下:

 //shouldRunNextPhase方法之前有过解释,容错机制,不再赘述。
 if (shouldRunNextPhase(parsedArgs, currentPhase)) {

      /* special behavior if phase 1 is skipped */
      //如果numberOfUsers等于-1则证明之前的prepare阶段被跳过了,
      //为什么会被跳过呢,博主猜测是因为多次处理相同的数据,我们不需要每一次task都预处理,
      //这样我们就可以跳过该阶段,那么我们需要读取之前存取写下的文件获得所有user的数量 
      if (numberOfUsers == -1) {
        numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
                PathType.LIST, null, getConf());
      }

      //calculate the co-occurrence matrix
      //计算相似度矩阵(共现矩阵),Option的设定传递,同prepare的job
      ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
        //注意这里的inputPath,它使用了之前PreparePreferenceMatrixJob得到的评价矩阵
        //即 [ itemID , Vector< userID , Pref > ]
        "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
        "--output", similarityMatrixPath.toString(),
        "--numberOfColumns", String.valueOf(numberOfUsers),
        "--similarityClassname", similarityClassname,
        "--maxObservationsPerRow", String.valueOf(maxPrefsInItemSimilarity),
        "--maxObservationsPerColumn", String.valueOf(maxPrefsInItemSimilarity),
        "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
        "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
        "--threshold", String.valueOf(threshold),
        "--randomSeed", String.valueOf(randomSeed),
        "--tempDir", getTempPath().toString(),
      });

      // write out the similarity matrix if the user specified that behavior
      //此行为是根据用户的初始设定,是否将物品和物品之间的相似度矩阵输出,默认false不输出。
      if (hasOption("outputPathForSimilarityMatrix")) {
        Path outputPathForSimilarityMatrix = new Path(getOption("outputPathForSimilarityMatrix"));

        Job outputSimilarityMatrix = prepareJob(similarityMatrixPath, outputPathForSimilarityMatrix,
            SequenceFileInputFormat.class, ItemSimilarityJob.MostSimilarItemPairsMapper.class,
            EntityEntityWritable.class, DoubleWritable.class, ItemSimilarityJob.MostSimilarItemPairsReducer.class,
            EntityEntityWritable.class, DoubleWritable.class, TextOutputFormat.class);

        Configuration mostSimilarItemsConf = outputSimilarityMatrix.getConfiguration();
        mostSimilarItemsConf.set(ItemSimilarityJob.ITEM_ID_INDEX_PATH_STR,
            new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
        mostSimilarItemsConf.setInt(ItemSimilarityJob.MAX_SIMILARITIES_PER_ITEM, maxSimilaritiesPerItem);
        outputSimilarityMatrix.waitForCompletion(true);
      }
    }

我们首先跟进RowSimilarityJob(),这个job进行了相似矩阵的获取,比较重要。
进入RowSimilarityJob,我们看main函数:

public static void main(String[] args) throws Exception {
    ToolRunner.run(new RowSimilarityJob(), args);
  }

同recommenderjob相同,这里的run方法回调RowSimilarityJob中的run方法,接下来我们看run方法:


public int run(String[] args) throws Exception {

    //初始Option初始化以及各个传参赋值。
    addInputOption();
    addOutputOption();
    addOption("numberOfColumns", "r", "Number of columns in the input matrix", false);
    addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use "
        + "one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')');
    addOption("maxSimilaritiesPerRow", "m", "Number of maximum similarities per row (default: "
        + DEFAULT_MAX_SIMILARITIES_PER_ROW + ')', String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ROW));
    addOption("excludeSelfSimilarity", "ess", "compute similarity of rows to themselves?", String.valueOf(false));
    addOption("threshold", "tr", "discard row pairs with a similarity value below this", false);
    addOption("maxObservationsPerRow", null, "sample rows down to this number of entries",
        String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_ROW));
    addOption("maxObservationsPerColumn", null, "sample columns down to this number of entries",
        String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_COLUMN));
    addOption("randomSeed", null, "use this seed for sampling", false);
    addOption(DefaultOptionCreator.overwriteOption().create());

    Map<String,List<String>> parsedArgs = parseArguments(args);
    if (parsedArgs == null) {
      return -1;
    }


    int numberOfColumns;

    if (hasOption("numberOfColumns")) {
      // Number of columns explicitly specified via CLI
      numberOfColumns = Integer.parseInt(getOption("numberOfColumns"));
    } else {
      // else get the number of columns by determining the cardinality of a vector in the input matrix
      numberOfColumns = getDimensions(getInputPath());
    }

    String similarityClassnameArg = getOption("similarityClassname");
    String similarityClassname;
    try {
      similarityClassname = VectorSimilarityMeasures.valueOf(similarityClassnameArg).getClassname();
    } catch (IllegalArgumentException iae) {
      similarityClassname = similarityClassnameArg;
    }

    // Clear the output and temp paths if the overwrite option has been set
    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
      // Clear the temp path
      HadoopUtil.delete(getConf(), getTempPath());
      // Clear the output path
      HadoopUtil.delete(getConf(), getOutputPath());
    }

    int maxSimilaritiesPerRow = Integer.parseInt(getOption("maxSimilaritiesPerRow"));
    boolean excludeSelfSimilarity = Boolean.parseBoolean(getOption("excludeSelfSimilarity"));
    double threshold = hasOption("threshold")
        ? Double.parseDouble(getOption("threshold")) : NO_THRESHOLD;
    long randomSeed = hasOption("randomSeed")
        ? Long.parseLong(getOption("randomSeed")) : NO_FIXED_RANDOM_SEED;

    int maxObservationsPerRow = Integer.parseInt(getOption("maxObservationsPerRow"));
    int maxObservationsPerColumn = Integer.parseInt(getOption("maxObservationsPerColumn"));

    Path weightsPath = getTempPath("weights");
    Path normsPath = getTempPath("norms.bin");
    Path numNonZeroEntriesPath = getTempPath("numNonZeroEntries.bin");
    Path maxValuesPath = getTempPath("maxValues.bin");
    Path pairwiseSimilarityPath = getTempPath("pairwiseSimilarity");

    Path observationsPerColumnPath = getTempPath("observationsPerColumn.bin");

    AtomicInteger currentPhase = new AtomicInteger();

    /*以上操作大概为初始化Option,对run方法中使用的变量根据Option进行赋值 */



    //执行RowSimilarityJob的第一个job
    //统计每个user操作item的数量
    Job countObservations = prepareJob(getInputPath(), getTempPath("notUsed"), CountObservationsMapper.class,
        NullWritable.class, VectorWritable.class, SumObservationsReducer.class, NullWritable.class,
        VectorWritable.class);
    countObservations.setCombinerClass(VectorSumCombiner.class);
    countObservations.getConfiguration().set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
    countObservations.setNumReduceTasks(1);
    countObservations.waitForCompletion(true);


    //执行RowSimilarityJob的第二个job
    //将item-user 矩阵转化为 user-item 矩阵,并且在转化的中间记录下每个item基于所有用户的偏好值的平方和
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
          VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
      normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
      Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
      normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
      normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
      normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      normsAndTransposeConf.set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
      normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_ROW, String.valueOf(maxObservationsPerRow));
      normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_COLUMN, String.valueOf(maxObservationsPerColumn));
      normsAndTransposeConf.set(RANDOM_SEED, String.valueOf(randomSeed));

      boolean succeeded = normsAndTranspose.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }

    //执行RowSimilarityJob的第三个job
    //通过对 user-item 矩阵处理,得到每个item和其他item的偏好值得乘积之和,
    //并通过之前计算得到的每个item的偏好值平方计算item的相似度矩阵
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
          IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
      pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
      Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
      pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
      pairwiseConf.set(NORMS_PATH, normsPath.toString());
      pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
      pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
      boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }


    //执行RowSimilarityJob的第四个job
    //通过上一个mapreduce计算得到的相似度矩阵,求出topN,并将item相似度矩阵补全,补全之后再求一遍topN。
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
          IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
          VectorWritable.class);
      asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
      asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
      boolean succeeded = asMatrix.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }

    return 0;
  }

之上的RowSimilarityJob总共进行了四个mapreduce,分别是1(CountObservationsMapper,VectorSumCombiner,SumObservationsReducer)、2(VectorNormMapper,MergeVectorsReducer)、3(CooccurrencesMapper,SimilarityReducer)和 4(UnsymmetrifyMapper,MergeToTopKSimilaritiesReducer),接下来我们跟进这四个mapreduce,看看它们都做了什么。

(CountObservationsMapper,VectorSumCombiner,SumObservationsReducer)计算用户操作数量。
输入:[ itemID , Vector< userID , Pref > ] (PreparePreferenceMatrixJob.ToItemVectorsReducer的输出,找不到见笔记(1))
输出:[ Null , Vector< userID , OperationNumOfuserID > ]
代码:

  public static class CountObservationsMapper extends Mapper<IntWritable,VectorWritable,NullWritable,VectorWritable> {

    private Vector columnCounts = new RandomAccessSparseVector(Integer.MAX_VALUE);

    @Override
    protected void map(IntWritable rowIndex, VectorWritable rowVectorWritable, Context ctx)
      throws IOException, InterruptedException {

      //get()方法获取输入数据中的Vector< userID , Pref >
      Vector row = rowVectorWritable.get();
      for (Element elem : row.nonZeroes()) {

        //这里通过向量columnCounts统计当前数据块中每个userID的条数,其实此条数就是操作数量
        columnCounts.setQuick(elem.index(), columnCounts.getQuick(elem.index()) + 1);
      }
    }

    @Override
    protected void cleanup(Context ctx) throws IOException, InterruptedException {

      //在cleanup中将columnCounts写下来,这样的好处是只写一次,而不需要每次map中的for循环都写一遍
      //其实我想这个write也可以放在每个map的最后。
      ctx.write(NullWritable.get(), new VectorWritable(columnCounts));
    }
  }


  public class VectorSumCombiner
      extends Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {

    private final VectorWritable result = new VectorWritable();

    @Override
    protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
      throws IOException, InterruptedException {

      //汇总map计算的结果,然后写下来
      result.set(Vectors.sum(values.iterator()));
      ctx.write(key, result);
    }
  }


  public static class SumObservationsReducer extends Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> {
    @Override
    protected void reduce(NullWritable nullWritable, Iterable<VectorWritable> partialVectors, Context ctx)
    throws IOException, InterruptedException {

      //将统计结果写在定义好的路径中,sum是将所有分散的汇总到一起
      Vector counts = Vectors.sum(partialVectors.iterator());
      Vectors.write(counts, new Path(ctx.getConfiguration().get(OBSERVATIONS_PER_COLUMN_PATH)), ctx.getConfiguration());
    }
  }

(CountObservationsMapper,VectorSumCombiner,SumObservationsReducer)将输入的item-user,pref格式的key-value进行转化,步骤大概如下:

map:
[ itemID , Vector< userID , Pref > ]
->Vector< userID , num >
->[ NULL , Vector< userID , num > ]

combine:
[ NULL , Vector< userID , num > ]
->[ NULL , VectorWritable< userID , num > ]

reduce:
[ NULL , Vector< userID , num > ]
->WriteResultToPath: OBSERVATIONS_PER_COLUMN_PATH

(VectorNormMapper,MergeVectorsReducer)转换用户矩阵以及计算每个商品偏好的平方和。
输入:[ itemID , Vector< userID , Pref > ] (PreparePreferenceMatrixJob.ToItemVectorsReducer的输出,找不到见笔记(1))
输出:[ userID , Vector< itemID , Pref > ] 和 [ itemID,norms (偏好平方和)]
代码:

这里又一个小细节,VectorNormMapper首先会用到一个sampleDown函数,我们首先分析这个函数:

private Vector sampleDown(Vector rowVector, Context ctx) {

      //getNumNondefaultElements过滤掉rowVector中存在默认值的向量,
      //可以把这一部当做一个数据的清洗处理,留下我们赋值过的向量。
      int observationsPerRow = rowVector.getNumNondefaultElements();

      //这里的maxObservationsPerRow是RecommenderJob中的maxPrefsInItemSimilarity
      //maxPrefsInItemSimilarity的意思是计算相似度时考虑偏好的最大个数
      //如果偏好的数量大于maxPrefsInItemSimilarity,那么就采用sampleDown进行采样
      //行采样比 = min(最大采样数量,矩阵中该行最大数量)/ 矩阵中该行最大数量
      double rowSampleRate = (double) Math.min(maxObservationsPerRow, observationsPerRow) / (double) observationsPerRow;

      //初始化一个和参数矩阵一样的相似矩阵
      Vector downsampledRow = rowVector.like();
      long usedObservations = 0;
      long neglectedObservations = 0;

      for (Element elem : rowVector.nonZeroes()) {

        //读取我们之前计算好的统计用户操作数量的数据,通过index得到某用户的操作个数
        int columnCount = observationsPerColumn.get(elem.index());

        //在RecommenderJob进行传参的时候我们可以找到:
        //maxObservationsPerColumn和maxObservationsPerRow是相等的
        double columnSampleRate = (double) Math.min(maxObservationsPerColumn, columnCount) / (double) columnCount;

        //随机抽样
        if (random.nextDouble() <= Math.min(rowSampleRate, columnSampleRate)) {
          downsampledRow.setQuick(elem.index(), elem.get());
          usedObservations++;
        } else {
          neglectedObservations++;
        }

      }

      //记录下我们抽样过程中使用的数量和忽视的数量
      ctx.getCounter(Counters.USED_OBSERVATIONS).increment(usedObservations);
      ctx.getCounter(Counters.NEGLECTED_OBSERVATIONS).increment(neglectedObservations);

      return downsampledRow;
    }

接下来我们回到VectorNormMapper:

public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {

    ...

    @Override
    protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
      throws IOException, InterruptedException {

      //这里用到一个采样函数sampleDown预处理
      Vector sampledRowVector = sampleDown(vectorWritable.get(), ctx);

      //偏好值正规化
      Vector rowVector = similarity.normalize(sampledRowVector);

      int numNonZeroEntries = 0;
      double maxValue = Double.MIN_VALUE;

      //这个for循环将输入格式为[itemID,Vector<userID,pref>]转化为[userID,Vector<itemID,pref>]输出给reduce
      //博主也有点莫名其妙他为什么换来换去。。
      for (Element element : rowVector.nonZeroes()) {
        RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
        partialColumnVector.setQuick(row.get(), element.get());
        ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));

        numNonZeroEntries++;
        if (maxValue < element.get()) {
          maxValue = element.get();
        }
      }

      if (threshold != NO_THRESHOLD) {
        nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
        maxValues.setQuick(row.get(), maxValue);
      }

      //这里计算每个item的所有user的偏好值的平方和:
      //[itemID , sum(valueOf(userID)^2)]
      //将结果放在norms中。
      norms.setQuick(row.get(), similarity.norm(rowVector));

      ctx.getCounter(Counters.ROWS).increment(1);
    }

    @Override
    protected void cleanup(Context ctx) throws IOException, InterruptedException {

      //记录下三个特殊结果,将key设置成NORM_VECTOR_MARKER、NUM_NON_ZERO_ENTRIES_VECTOR_MARKER、MAXVALUE_VECTOR_MARKER(这三个都是极小的负数)
      //来和其他的user-item区分开
      //分别保存了偏好平方和、非零项还有最大偏好值
      ctx.write(new IntWritable(NORM_VECTOR_MARKER), new VectorWritable(norms));
      ctx.write(new IntWritable(NUM_NON_ZERO_ENTRIES_VECTOR_MARKER), new VectorWritable(nonZeroEntries));
      ctx.write(new IntWritable(MAXVALUE_VECTOR_MARKER), new VectorWritable(maxValues));
    }
  }


private static class MergeVectorsCombiner extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
    @Override
    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
      throws IOException, InterruptedException {
      //最简单的合并
      ctx.write(row, new VectorWritable(Vectors.merge(partialVectors)));
    }
  }

public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {

    ...

    @Override
    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
      throws IOException, InterruptedException {
      Vector partialVector = Vectors.merge(partialVectors);

      //通过对key的判断抽出我们之前单独处理过的三个数值,将它们写到磁盘上供之后使用。
      if (row.get() == NORM_VECTOR_MARKER) {
        Vectors.write(partialVector, normsPath, ctx.getConfiguration());
      } else if (row.get() == MAXVALUE_VECTOR_MARKER) {
        Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
      } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
        Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
      } else {
        ctx.write(row, new VectorWritable(partialVector));
      }
    }
  }

(VectorNormMapper,MergeVectorsReducer)步骤大概如下:

map: [itemID,Vector<userID,Pref>]
-> [userID , Vector<itemID , Pref>] 和 [itemID , sum(valueOf(userID)^2)]

combine&reduce: 正常的合并和记录,不多描述

总结一下这个mapreduce,这个mapreduce得到了每个item基于与其相关的所有用户的偏好值得平方和。然后将item-user翻转为user-item。个人认为这种item和user的来回变换反而增加运算量,非常耗费运算时间,可以进行一定量的简化。

(CooccurrencesMapper,SimilarityReducer)计算每个item的相似矩阵。
输入:[userID , Vector< itemID , Pref >]
输出:[itemA , Vector< itemB , sim >] (itemA< itemB)
代码:

 public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {

    ...

    @Override
    protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
      throws IOException, InterruptedException {

      //输入格式为[userID , Vector< itemID , Pref >]
      //这里抽出Vector< itemID , Pref >,变为Array并基于itemID排序
      Element[] occurrences = Vectors.toArray(occurrenceVector);
      Arrays.sort(occurrences, BY_INDEX);

      int cooccurrences = 0;
      int prunedCooccurrences = 0;

      //接下来这里是一个双重循环,针对每个user,计算其Pref(itemA)*Pref(itemB)
      //其中itemA< itemB
      for (int n = 0; n < occurrences.length; n++) {
        Element occurrenceA = occurrences[n];
        Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
        for (int m = n; m < occurrences.length; m++) {
          Element occurrenceB = occurrences[m];
          if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
            dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
            cooccurrences++;
          } else {
            prunedCooccurrences++;
          }
        }
        ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
      }
      ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
      ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
    }
  }
//经过以上map,我们就得到了每一个user中的各个item的偏好值相互的乘积
 public static class SimilarityReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {

     ...

    @Override
    protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
      throws IOException, InterruptedException {
      Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
      Vector dots = partialDotsIterator.next().get();

      //输入数据形式:[itemA , Iterable<Vector><itemB , agg>] agg即偏好乘积
      //这个while循环和for循环的双重循环是将所有itemA的index与itemB的index相同条目的agg偏好乘积加和,将结果存放在dots中
      //形象一点可以这样表示:
      //[A,<B,agg>,<C,agg>,<B,agg>] => [A,<B,agg + agg>,<C,agg>]
      while (partialDotsIterator.hasNext()) {
        Vector toAdd = partialDotsIterator.next().get();
        for (Element nonZeroElement : toAdd.nonZeroes()) {
          dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
        }
      }

      //dots存放当前itemA所有相关的itemB的index和偏好值乘积的和
      Vector similarities = dots.like();
      //这里取出norms,得到与itemA相关的所有user的偏好值的平方和
      double normA = norms.getQuick(row.get());
      for (Element b : dots.nonZeroes()) {

        //这里的norms.getQuick(b.index())是求出itemB的偏好值的平方和
        //通过用户设定的similarity计算方法计算itemA-itemB的相似度
        double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
        //是否达到最小过滤值,默认不存在
        if (similarityValue >= treshold) {
          similarities.set(b.index(), similarityValue);
        }
      }
      //是否需要排除关于自身的相似度如果需要,设置为0
      if (excludeSelfSimilarity) {
        similarities.setQuick(row.get(), 0);
      }
      ctx.write(row, new VectorWritable(similarities));
    }
  }

具体步骤:

map:
[userID , Vector<itemID , Pref>]
->SortBy(itemID)
->双重循环对Vector<itemID , Pref>处理得到[ItemA , <itemB , aggregete>](itemA< itemB)

reduce:
[itemA , Iterable<itemB , aggregete>]
->对所有itemA和itemB相同的条目进行加和
->通过保存下来的norms,和加和后的[itemA , Iterable<itemB , aggregete>]计算相似度
->[itemA , Vector<itemB , sim>](itemA< itemB)

总结一下上个job,本job首先计算出了每个user相关item集合两两之间的偏好值乘积,然后整合这些乘积就得到了所有item集合中两两之间偏好的乘积,至于为什么需要这个乘积我们可以参考推荐系统计算相似度的各个公式:

相似度公式

这里的x和y我们可以当做itemA和itemB。而item的向量维度我们和以解释为与item有关的所有user,我们可以发现计算过程中缺失值的情况mahout直接将他们当做0处理。

从上面的公式可以看到我们在similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns)这一步传参的时候分别传进去了xy、x^2、y^2。接下来就可以按照用户自选的计算相似度方法计算物品之间的相似度。

接下来我们看第四个mapreduce:(UnsymmetrifyMapper,MergeToTopKSimilaritiesReducer)
这个mapreduce的功能是选出topN个相似度最大的item然后输出。
输入:[itemA , Vector< itemB , sim >] (itemA< itemB)
输出:[itemA , Vector< itemB , sim >] (itemA< itemB || itemA< itemB)(其实就是将相似度矩阵补全了,这是个对称矩阵)
代码:

public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable>  {

    ...

    @Override
    protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
      throws IOException, InterruptedException {
      Vector similarities = similaritiesWritable.get();
      // For performance, the creation of transposedPartial is moved out of the while loop and it is reused inside
      //这里批注一下RandomAccessSparseVector这个函数,其返回一个初始化的Vector
      //传参的功能分别为:RandomAccessSparseVector(Vector的格式,Vector的size)
      Vector transposedPartial = new RandomAccessSparseVector(similarities.size(), 1);
      TopElementsQueue topKQueue = new TopElementsQueue(maxSimilaritiesPerRow);
      //通过for循环更新topKQueue队列,队列大小根据用户定义数值maxSimilaritiesPerRow决定
      for (Element nonZeroElement : similarities.nonZeroes()) {
        MutableElement top = topKQueue.top();
        double candidateValue = nonZeroElement.get();
        if (candidateValue > top.get()) {
          top.setIndex(nonZeroElement.index());
          top.set(candidateValue);
          topKQueue.updateTop();
        }

        //这里直接取得转置的[itemB , <itemA , pref>],并没有对这部分做处理
        transposedPartial.setQuick(row.get(), candidateValue);
        ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
        transposedPartial.setQuick(row.get(), 0.0);
      }

      //写下通过topKQueue得到的TopN个相似item
      Vector topKSimilarities = new RandomAccessSparseVector(similarities.size(), maxSimilaritiesPerRow);
      for (Element topKSimilarity : topKQueue.getTopElements()) {
        topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
      }
      ctx.write(row, new VectorWritable(topKSimilarities));
    }
  }

  public static class MergeToTopKSimilaritiesReducer
      extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {

    ...

    @Override
    protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
      throws IOException, InterruptedException {
      Vector allSimilarities = Vectors.merge(partials);
      //重新求一遍TopN(因为转置以后得到的相似度还没有求过topN)
      //其实笔者认为数据量不大或者数据关联性不高的时候可以先求转置然后一次性直接求topN的
      //这里mahout它是map的时候对对称矩阵的上半段求TopN然后在reduce中对所有再求一遍TopN
      Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
      ctx.write(row, new VectorWritable(topKSimilarities));
    }
  }

具体步骤:

map:
[itemA , Vector<itemB , sim>](itemA< itemB)
->对itemA求topN 并且 翻转得到 [itemB , <itemA , sim>](itemA< itemB)

reduce:
[itemA , Iterable<itemB , sim>]
->对itemA求topN 

总结一下上个mapreduce其实就是对相似度矩阵进行了一定的限定,得到我们设定好的数值个相关item。

写到这里,RowSimilarityJob就结束了,可以看到RowSimilarityJob其实就是通过user-item,pref矩阵得到相应的item之间的相似度矩阵。

转载请注明出处:http://blog.csdn.net/utopia_1919/article/details/51851326

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。
mahout之推荐系统源码笔记(1) —预处理之PreparePreferenceMatrixJob hadoop篇: 因为时间原因首先更新分布式hadoop上的推荐系统源码的阅读。 本笔记基于 apache-mahout-distribution-0.12.2-src 。 首先给出mahout中taste推荐系统的代码结构: taste common eval hadoop impl model neighborhood recommender similarity model neighborhood
作者: 赵怡 一、Neutron Kilo 版和Liberty版本主要区别: 新增的特性: 1. neutron支持IPv6前缀委托授权为IPv6子网分配CIDR 2. neutron支持QoS API, 初期只支持端口带宽限制 3.路由器HA (L3 HA/VRRP)在L2 population(l2_pop)设置为enable时, 可以正常工作了。 4. VPNaaS参考驱动现在可以和HA router一起正常工作了 5. HA路由器上使用的VRRP网络, 可以配置为特定的segmentation类型

Hadoop之hive学习_01 - 2016-07-08 14:07:22

Hive是构建在hdfs上的一个数据仓库,本质上就是数据库,用来存储数据 数据仓库是一个面向主题的、集成的、不可更新的、随时间不变化的数据集合,用于支持企业或组织的决策分析处理。 1.      面向主题:数据仓库的主题是按照一定得主题进行组织的,即用户所关注的重点对象,比如商品推荐系统。 2.      集成的:将分散的数据(文本文件,oracle数据,mysql数据。。。)进行加工处理才能够成为数据仓库的存储对象。 3.      不可更新的:数据仓库中的数据起主要用途是用于决策分析,所以主要的数据操
​ (上图为Linux基金会HyperLedger超级账本项目执行董事Brian Behlendorf) 区块链恐怕是时下最热门的前沿技术了。这项兴起于比特币的技术,现在已经被视为金融业和许多其他行业的颠覆性技术。尽管区块链的技术体系和应用框架还处于早期阶段,但这并不能阻挡上至行业巨头下至创业公司的热情。 2016年6月30日,Linux基金会下属的HyperLedger超级账本项目宣布了7位新入成员,其中包括莫斯科证券交易所和来自中国的三家技术公司。自去年12月成立以来,超级账本项目已经从最开始的30家

Hadoop面试题 - 2016-07-07 17:07:55

Hadoop MapReduce采用Master/Slave结构 1. 列举出hadoop中定义的最常用的InputFormats.哪个是默认的?     TextInputFormat(默认)用于读取纯文本文件,key是每一行的位置偏移量,是LongWritable类型的,value是每一行的内容,为Text类型     KeyValueTextInputFormat 同样用于读取文件,如果行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分 为value;如果没有分隔符,整行作为 key

使用 Ansible 管理 MySQL 复制 - 2016-07-06 19:07:28

Ansible 是一个配置管理和应用部署工具,功能类似于目前业界的配置管理工具 Chef,Puppet,Saltstack。Ansible 是通过 Python 语言开发。Ansible 平台由 Michael DeHaan 创建,他同时也是知名软件 Cobbler 与 Func 的作者。Ansible 的第一个版本发布于 2012 年 2 月,相比较其它同类产品来说,Ansible 还是非常年轻的,但这并不影响他的蓬勃发展与大家对他的热爱。 Ansible 默认通过 SSH 协议管理机器,所以 Ansi
1、节点规划   在master、backup节点上添加eth0、eth1两网卡,具体添加过程,参考“ 基于VMware为CentOS 6.5配置两个网卡 ” 2、IP规划   master backup eth0 192.168.46.128 192.168.46.130 eth1 192.168.46.129 192.168.46.131   上面这个表格说明master节点中的eth0网卡的IP是192.168.46.128,eth1网卡的IP是192.168.46.129;backup节点中eth
概述 在本篇中,学习创建和管理硬链接和符号链接。学习: 创建硬或软链接 识别链接并知道它们的类型 理解复制与链接文件之间的区别 使用链接执行系统管理任务 链接简介 在存储设备上,文件或目录包含在一些数据块中。有关某个文件的信息包含在一个 inode 中,它记录了所有者、最后访问该文件的时间、文件的大小、它是否是目录,以及谁可以读取或写入它等信息。inode 编号也称为 文件序列号 ,该编号在特定文件系统内是唯一的。一个 目录条目 包含一个文件或目录的名称,以及用来存储该文件或目录的信息的 inode 的指
文章亮点 将PHP应用及其依赖的服务容器化步骤 如何将应用容器镜像的构建自动化 应用容器如何快速部署到测试环境和生产环境中 快速上手 PHP官方在 hub.docker.com 上维护了官方的PHP Docker镜像,包含了从PHP 5.5到7.0的多种不同版本的镜像。 我们将以PHP官方的Docker镜像为基础,介绍如何将一个简单的PHP应用Docker化。 创建一个新目录 php-quickstart,作为我们的项目目录 在项目目录下创建文件 app.php ?php  echo “Hello Doc

hive参数调优汇总 - 2016-07-06 17:07:51

参考:http://blog.csdn.net/beckham008/article/details/23741151?utm_source=tuicoolutm_medium=referral 1.设置合理solt数 mapred.tasktracker.map.tasks.maximum  每个tasktracker可同时运行的最大map task数,默认值2。 mapred.tasktracker.reduce.tasks.maximum 每个tasktracker可同时运行的最大reduce ta