Parallelisation : Refactoring a recursive block matrix algorithm for Map-Reduce

I’ve recently gotten interested in the parallelisation of algorithms in general; specifically, the type of algorithm design compatible with the MapReduce model of programming. Given that I’ll probably be dealing with bigger quantities of data in the near future, it behooves me to start think about parallelisation, actively. In this post, I will look at the matrix multiplication algorithm which uses block decomposition, to recursively compute the product of two matrices. I have spoken of the general idea here; you may want to read that first for the linear algebra groundwork, before continuing on with this post.

NOTE: The example in this post uses Snail-MapReduce, a Ruby-based, single-threaded, in-memory, barebones MapReduce framework I’m working on to quickly prototype and test parallel algorithms. You may want to install the gem using:

 gem install snail-map-reduce 

The Sequential Version

Before attempting any parallelisation, it’s instructive to study the sequential version of the code. This will help solidify the algebraic concept as well as serve as a basis for comparison with the parallel version. Before we do that, however, we need to mix in a new method into the Matrix class. This will allow us to extract block matrices out of a matrix very easily.

class Matrix
	def block(block_row, block_column)
		raise "Non 2^n matrix" if (row_size & (row_size - 1)) != 0 || (column_size & (column_size - 1)) != 0
		lower_order = row_size/2
		start_row = block_row * lower_order
		start_column = block_column * lower_order
		b = []
		lower_order.times do |r|
			row = []
			lower_order.times do |c|
				row << self[start_row + r, start_column + c]
			end
			b << row
		end
		Matrix.rows(b)
	end
end
First off, the restriction that we impose on the matrices is that they need to be square matrices of equal order, this order is 2^n \times 2^n. This is so that we can keep the example sufficiently simple without delving into more complicated partitioning techniques. The second consideration for the

block method is that it will address only 4 block matrices: (0,0), (0,1), (1,0), and (1,1). There is no explicit check for this for the moment. Nevertheless, those are the only 4 addresses we will use. Here is the code for the sequential, recursive, multiplication operation:

def join(left_block, right_block)
	rows = []
	lower_order = left_block.row_size
	lower_order.times do |t|
		rows << left_block.row(t).to_a + right_block.row(t).to_a
	end
	rows
end

def product(a,b)
	return a*b if a.row_size == 2
	p00 = product(a.block(0,0), b.block(0,0)) + product(a.block(0,1), b.block(1,0))
	p01 = product(a.block(0,0), b.block(0,1)) + product(a.block(0,1), b.block(1,1))
	p10 = product(a.block(1,0), b.block(0,0)) + product(a.block(1,1), b.block(1,0))
	p11 = product(a.block(1,0), b.block(0,1)) + product(a.block(1,1), b.block(1,1))

	Matrix.rows(join(p00, p01) + join(p10, p11))
end
We have chosen the halting condition as the point at which the order of the matrices being multiplied is 2×2. We can choose a bigger order if we want, but let’s stick with this for now. The logic is pretty simple to follow, if you look at this diagram. p00, p01, p10, and p11 calculate the block matrices for each of the quadrants of the big matrix. The block matrices themselves are decomposed in the same fashion till the halting condition is reached.

Analysing the Flow for Mappers/Reducers

This is all well and good, but we need an implementation which is amenable to execution using the MapReduce paradigm. The first step I take is to look at the trace of the algorithm for a reasonably small case. Let us look at the trace of the sequential algorithm for a matrix of order 8×8. Note that the diagram shows the logical code paths, and not the simultaneity of the execution paths (there isn’t any, in any case).

Note that after the halting condition, execution goes back up the chain, aggregating the sum of products at each step. This should be give us our first clue about the Reduce operations we will ultimately end up with. Let us consider what the aggregation steps are: 1. The product of two block matrices 2. The sum of two block matrices This seems sensible, given that these operations happen when the flow of logic is moving back up the tree. What about the Map operation (or operations), though? To answer the question about the Mappers, lets look at an example Reduce at an intermediate stage of our logic flow.

We know that any Reduce operation first partitions the input space by grouping entries by their keys. Thus, for a set of values to reach a single Reducer, they must all have the same key. The diagram above shows a possible addressing scheme. In this scheme, any operation in a quadrant is prefixed by the block matrix index of that quadrant (00/01/10/11). Since each quadrant operation is basically the sum of two products, the Reducer for sum uses this key. In addition, the keys for each of the two products are A or B, and are appended to the address of their parent. This address grows recursively the lower in the tree we go. In our example, the keys at the lowest level of operation (the level at which the product is actually computed because of the halting condition) are 00B00A, 00B00B, etc. Let’s look at the case for the Reduce operation at 00B00. If we assume that 00B00A and 00B00B‘s products have been computed already, the only way to make sure that a Reducer gets both of these values (so that it can perform the summation), is to lop off the A and the B from the ends of 00B00A and 00B00B respectively, so that they both become 00B00. This operation may be performed during the calculation of the products itself. Thus, we can implement the lowest-level multiplication of two block matrices as a Mapper, which gives you back two entries which have the same key (00B00) and the products as the values. If we do the same for the rest of the low-level products, the Partitioner will ensure that all products are summed up in their correct Reducers. The same logic applies for reducing 00B00, 00B01, 00B10, and 00B11. All that needs to happen in the summation Reducer stage, is to lop off the last 2 characters from the keys, so that all of them become 00B. However, in this case, the order is important, because each of these 4 matrices need to be merged into a supermatrix in the proper order. Therefore, the last two digits are preserved as part of the value, so that the Reducer at 00B can merge them in the correct order.

The MapReduce-compatible version

Here is the code for the matrix multiplication, compatible with MapReduce.

require 'rubygems'
require 'matrix'
require './matrix_block_mixin'
require './map_reduce'
require 'benchmark'

def map(key, value)
	inputs = []
	a = value[:a]
	b = value[:b]
	if a.row_size == 2
		inputs << {:key=> key, :a => a, :b => b}
		return inputs
	end
	inputs << {:key => key + "00A", :value => {:a => a.block(0,0), :b => b.block(0,0)}}
	inputs << {:key => key + "00B", :value => {:a => a.block(0,1), :b => b.block(1,0)}}

	inputs << {:key => key + "01A", :value => {:a => a.block(0,0), :b => b.block(0,1)}}
	inputs << {:key => key + "01B", :value => {:a => a.block(0,1), :b => b.block(1,1)}}

	inputs << {:key => key + "10A", :value => {:a => a.block(1,0), :b => b.block(0,0)}}
	inputs << {:key => key + "10B", :value => {:a => a.block(1,1), :b => b.block(1,0)}}

	inputs << {:key => key + "11A", :value => {:a => a.block(1,0), :b => b.block(0,1)}}
	inputs << {:key => key + "11B", :value => {:a => a.block(1,1), :b => b.block(1,1)}}

	inputs
end

def join(left_block, right_block)
	rows = []	
	lower_order = left_block.row_size
	lower_order.times do |t|
		rows << left_block.row(t).to_a + right_block.row(t).to_a
	end
	rows
end

def m(order)
	Matrix.build(order, order) {|row, col| rand(20) }
end

def block_join_reduce(key, values)
	p00 = values[values.index {|v| v[:identity] == '00'}][:matrix]
	p01 = values[values.index {|v| v[:identity] == '01'}][:matrix]
	p10 = values[values.index {|v| v[:identity] == '10'}][:matrix]
	p11 = values[values.index {|v| v[:identity] == '11'}][:matrix]
	[{:key => key[0..-2], :value => {:identity => key[-1], :matrix => Matrix.rows(join(p00, p01) + join(p10, p11))}}]
end

def block_matrix_sum(key, values)
	sum = Matrix.zero(values.first[:matrix].row_size)
	values.each {|m| sum += m[:matrix]}
	[{:key => key[0..-3], :value => {:matrix => sum, :identity => key[-2..-1]}}]
end

def primitive_map(key, value)
	[{:key => key[0..-2], :value =>  {:matrix => value[:a] * value[:b], :identity => key[0..-2]}}]
end

order = 16
mappings = reductions = (Math.log2(order) - 1).to_i
m1 = m(order)
m2 = m(order)

operations = []
mappings.times do
	operations << Mapper.new {|k,v| map(k,v)}
end

operations << Mapper.new {|k,v| primitive_map(k,v)}

reductions.times do
	operations << Reducer.new {|k,v| block_matrix_sum(k,v)}
	operations << Reducer.new {|k,v| block_join_reduce(k,v)}
end

result = MapReduceRunner.new(operations).run([{:key => "X", :value => {:a => m1, :b => m2}}])
puts m1*m2 == result[0][:value][:matrix]