Parallelisation: Writing a linear matrix algorithm for MapReduce

There is more than one way to skin matrix multiplication; think about it for a moment and you can probably come up with four or five distinct approaches. In this post, we look at another, easier way of multiplying two matrices, and attempt to build a MapReduce version of the algorithm. Before we dive into the code itself, we’ll quickly review the algebraic process we’re trying to parallelise. We are going to view the product matrix as a series of column vectors, each of which is a linear combination of the weighted columns of the LHS (Left Hand Side) matrix. The best way to explain this is through a diagram of a sample calculation.

The diagram shows the multiplication of two matrices A and B, yielding C as the result, and walks through the calculation of the first column of C. As you can see, that column is derived by weighting each column of A by some scalar and summing them up. The scalars, or ‘weights’, are simply the individual elements of the first column of B; hence the phrase “linear combination of weighted columns”. The algebraic process, in fact, leads very intuitively to the MapReduce version of the algorithm. Given two matrices to be multiplied, these would be the steps (a quick sanity check of the identity follows the list).
* Mapper: This will emit a (scalar, vector) pairing for every element of the RHS matrix: the element itself is the scalar, the LHS column whose index matches the element’s row index is the vector, and the pairing is keyed by the element’s column index.
* Mapper: This will be run once for each (scalar, vector) pairing, and will simply multiply the two to produce a weighted vector.
* Reducer: This will add up all the weighted vectors that share a key (those belonging to a single column of the RHS matrix) to produce the corresponding column of the product matrix.
* Reducer: This will join all the separate columns to form the complete product matrix.
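
A quick sanity check of that identity, using nothing but the standard library’s Matrix and Vector classes (this snippet is purely illustrative and independent of the pipeline below):

require 'matrix'

a = Matrix[[1, 2], [3, 4]]
b = Matrix[[5, 6], [7, 8]]

# First column of A*B as a linear combination: weight each column of A by the
# corresponding element of B's first column, then sum the weighted columns.
first_column = a.column(0) * b[0, 0] + a.column(1) * b[1, 0]

puts first_column == (a * b).column(0)  # => true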

The code for this runs on Snail-MapReduce; here is the MapReduce version in full.

require 'rubygems'
require 'matrix'
require './matrix_block_mixin'
require './map_reduce'

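# Step 1 (mapper): pair every element of B with the matching column of A.
# B[i][j] is paired with column i of A and keyed by j, so each key collects
# exactly the weighted contributions that make up column j of the product.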
def pair_up(key, value)
	inputs = []
	a = value[:a]
	b = value[:b]

	row = 0
	a.each_column do |a_column|
		column = 0
		b.row_vectors[row].each do |e|
			inputs << {:key => column.to_s, :value => {:scalar => e, :vector => a_column}}
			column += 1
		end
		row += 1
	end
	inputs
end

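# Step 2 (mapper): scale the column vector by its weight; Vector#collect
# returns a new Vector.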
def multiply(key, value)
	[{ :key => key, :value => value[:vector].collect {|vc| vc * value[:scalar]}}]
end

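# Step 3 (reducer): sum the weighted vectors that share a key into one column
# of the product, re-keying everything to 'X' so the final reducer sees all
# the columns together.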
def add_reduce(key, values)
	zero = Vector.elements(Array.new(values.first.size, 0))
	[{:key => 'X', :value => values.inject(zero) {|sum, vec| sum + vec}}]
end

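# Step 4 (reducer): stitch the finished columns into the product matrix.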
def append_reduce(key, values)
	[{:key => 'result', :value => Matrix.columns(values)}]
end

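# Build a random square matrix of the given order, with entries in 0...20.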
def m(order)
	Matrix.build(order, order) {|row, col| rand(20) }
end

order = 8
m1 = m(order)
m2 = m(order)

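# Wire the four stages together in order: pair up, multiply, sum, append.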
operations = []
operations << Mapper.new {|k,v| pair_up(k,v)}
operations << Mapper.new {|k,v| multiply(k,v)}
operations << Reducer.new {|k,vs| add_reduce(k,vs)}
operations << Reducer.new {|k,vs| append_reduce(k,vs)}

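# Run the job, feeding both matrices in under a single key.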
result = MapReduceRunner.new(operations).run([{:key => "X", :value => {:a => m1, :b => m2}}])

puts m1 * m2 == result[0][:value]  # prints true: the pipeline agrees with Matrix#*

Could anything else be parallelised in this version? Well, the first Reduce step could be broken up into multiple summations if the number of elements to be added up were very large; so that’s one thought, sketched below.
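
Something along these lines would do it, assuming the same Reducer interface as above (the function names and the batch size here are illustrative choices, not Snail-MapReduce API): an intermediate reducer emits partial sums over fixed-size batches, and a final reducer folds those partial sums into the finished column.

BATCH_SIZE = 4  # illustrative; a real choice would depend on the column length

# Hypothetical first stage: sum the weighted vectors in fixed-size batches,
# emitting one partial sum per batch, still keyed by column index. The
# batches could then be summed on separate workers.
def partial_add_reduce(key, values)
	values.each_slice(BATCH_SIZE).collect do |batch|
		{:key => key, :value => batch.inject {|sum, vec| sum + vec}}
	end
end

# Hypothetical second stage: fold the partial sums into the finished column,
# re-keying to 'X' just as add_reduce does.
def final_add_reduce(key, values)
	[{:key => 'X', :value => values.inject {|sum, vec| sum + vec}}]
end

Next time, I’ll try digging into a few eigenvector computation algorithms to see what it takes to get them to run as MapReduce jobs.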