K way merge problem

Problem

Assume you have K very large arrays of integers stored on disk. They are so large that they would not fit into your computer's memory.

You need to write an application that merges all K large arrays into one sorted list and stores it on disk.

For example, say the memory can hold only 6 integers at a time. Your program should be able to merge the following three files (K = 3).
Note that each file has sorted list of integers.

  File 1 = {3,7,9,15,35};
  File 2 = {2,8,45,60,70,100};
  File 3 = {5,6,50,101, 110};

You need to write a program that merges the arrays into one array with all the numbers sorted in ascending order.

Given the fact that the machine does not have enough memory to load all three files,
we can load part of the files each time and sort in the memory and flush out to the buffer.

Brute force approach

We have the K partially loaded sorted arrays in memory; all we need to do is find the smallest number among the first elements of each array and store it in the output buffer.
After that we move the pointer of the chosen array to the next element, and we repeat this process until we reach the end of one of the K arrays.

If that happens, we load the next chunk of the data into the array and start the process again.

What is the running time of this process? Let's break down the steps:

1) Find the smallest element among the K elements.

Brute force comparison will take O(K^2) and quicksort will take O(K log K).

2) Loading the files in to the memory.

For simplicity, let's assume it is the same as reading each element one by one, thereby running in O(N).

We will perform the step 1, for N elements, where N is the total number of integers stored in K files.
Total running time will be O(N × K log K).

Heap approach

Can we do better?

Yes, we can use a heap data structure to sort the K elements. A heap has the following properties.

- Adding element: O(logK)
- Extracting element: O(logK).


K way merge using MinHeap


It guarantees the smallest element stays always on the top.

This will make the Step 1 in the "Brute force approach" run in O(logK).

Therefore the total running time will be O(N × log K).


Here is the complete code.

#include <fstream>
#include <iostream>
#include <vector>
#include <string>
#include <stdlib.h>
#include <math.h>
using namespace std;
// One heap entry: an integer paired with the index of the input it came from.
struct node {
    int value;  // the integer being merged
    int idx;    // index of the input this value was read from

    node(int val, int source) : value{val}, idx{source} {}
};
class MinHeap {
public:
node * extract() {
if (list.size() == 0)
return 0;
node * r = list[0];
swap(0, list.size() - 1);
list.pop_back();
heapify(0);
return r;
}
void insert(node * new_node)
{
list.push_back(new_node);
bubbleUp(list.size() - 1);
}
int size(){ return list.size(); }
private:
int left(int p) { return 2 * p+1; }
int right(int p) { return 2 * p + 2; }
int parent(int c) { return floor((double)((c -1)/ 2)); }
void bubbleUp(int c)
{
if (c <= 0)
return;
int p = parent(c);
if (p >= 0 &&!is_parent(p, c))
{
swap(p, c);
bubbleUp(p);
}
}
void heapify(int p)
{
int m = p;
int l = left(p);
int r = right(p);
if (l < list.size() && !is_parent(m, l))
m = l;
if (r < list.size() && !is_parent(m, r))
m = r;
if (p != m)
{
swap(p, m);
heapify(m);
}
}
void swap(int s, int d)
{
node * tmp = list[s];
list[s] = list[d];
list[d] = tmp;
}
bool is_parent(int p, int c)
{
return list[p]->value < list[c]->value;
}
vector<node *> list;
};
// Parses one line of whitespace-separated integers.
//
// line:    text such as "3 7 9 15"; tokens are separated by spaces, tabs or
//          carriage returns, and leading, trailing or repeated separators
//          are ignored.
// returns: the integers in the order they appear; empty when the line
//          contains no tokens.
// throws:  whatever std::stoi throws (std::invalid_argument or
//          std::out_of_range) when a token is not a valid int.
//
// Every token is parsed, including a final token that is only one character
// long and is not followed by a separator.
std::vector<int> convert(const std::string& line)
{
    std::vector<int> numbers;
    if (line.empty())
        return numbers;

    const char* const separators = " \t\r\n";
    std::string::size_type start = line.find_first_not_of(separators);
    while (start != std::string::npos)
    {
        // end == npos means the token runs to the end of the line; substr
        // clamps the count, so no special case is needed.
        const std::string::size_type end = line.find_first_of(separators, start);
        numbers.push_back(std::stoi(line.substr(start, end - start)));
        start = line.find_first_not_of(separators, end);
    }

    // A blank line is written to stdout after each non-empty input line;
    // kept so the program's console output is unchanged.
    std::cout << std::endl;
    return numbers;
}
void kway_merge_large(ofstream &out, ifstream* files, int len, int max_size)
{
int * arr_pos = new int[len];
string buf;
vector<vector<int>> input;
MinHeap heap;
string outbuf;
vector<int> arr;
memset(arr_pos, 0, len*sizeof(int));
for (int i = 0; i < len; i++)
{
getline(files[i], buf);
arr = convert(buf);
input.push_back(arr);
heap.insert(new node(arr[arr_pos[i]++], i));
}
while (heap.size())
{
node * next = heap.extract();
outbuf += to_string(next->value) + " ";
if (outbuf.length() >= max_size)
{
cout << outbuf;
out.write(outbuf.c_str(), outbuf.length());
outbuf.clear();
}
int i = next->idx;
if (arr_pos[i] < input[i].size())
{
heap.insert(new node(input[i][arr_pos[i]++], i));
}
else {
if (!files[i].eof())
{
getline(files[i], buf);
arr = convert(buf);
input[i] = arr;
if (arr.size() > 0)
{
arr_pos[i] = 0;
heap.insert(new node(input[i][arr_pos[i]++], i));
}
}
}
}
if (outbuf.length() > 0)
{
out.write(outbuf.c_str(), outbuf.length());
}
}
// Merges the first four of the listed input files into out.txt, then prints
// the merged file to stdout as a check.
// Returns 0 on success and 1 when an input or the output cannot be opened.
int main()
{
    const int file_count = 4;  // only the first four inputs are merged
    std::string inputs[7] = { "input1.txt", "input2.txt", "input3.txt", "input4.txt", "input5.txt", "input6.txt", "input7.txt" };
    std::ifstream files[7];
    for (int i = 0; i < file_count; i++)
    {
        files[i].open(inputs[i]);
        if (!files[i].is_open())
        {
            std::cerr << "cannot open input file: " << inputs[i] << std::endl;
            return 1;
        }
    }

    // out|trunc starts from an empty file. Combining app with trunc is not a
    // valid open mode and makes the open fail.
    std::ofstream out;
    out.open("out.txt", std::ofstream::out | std::ofstream::trunc);
    if (!out.is_open())
    {
        std::cerr << "cannot open output file: out.txt" << std::endl;
        return 1;
    }
    kway_merge_large(out, files, file_count, 10);
    out.close();

    std::ifstream result;
    result.open("out.txt");
    std::cout << "checking result:" << std::endl;
    std::string line;
    // Loop on the read itself so a failed read is never printed.
    while (std::getline(result, line))
    {
        if (line.length())
            std::cout << line;
        std::cout << std::endl;
    }
    return 0;
}
view raw kway_merge.cpp hosted with ❤ by GitHub


Practice statistics:

3 hrs: to write the code (spent most of the time parsing the input data due to a lack of solid assumptions);
           had a flaw in the heap implementation. Need better review practice.

UPDATE (2019-06-26): solved this problem in Python and took a simpler approach by skipping reading the numbers from files, but implemented the part that flushes to the file.

Here is the complete code in Python

import math
class Heap:
    """Array-backed binary min-heap using 1-based slots.

    Slot 0 of ``self.list`` is an unused placeholder, so the children of slot
    ``p`` are ``2p`` and ``2p + 1`` and the parent of slot ``c`` is ``c // 2``.
    Items are ordered with ``<=``.
    """

    def __init__(self):
        self.list = [None]

    def add(self, number):
        """Append ``number`` and sift it up into place."""
        self.list.append(number)
        self.bubbleUp(len(self.list) - 1)

    def length(self):
        """Number of stored items (the placeholder slot is not counted)."""
        return len(self.list) - 1

    def extract(self):
        """Remove and return the smallest item, or None when empty."""
        if self.length() < 1:
            return None
        smallest = self.list[1]
        # Move the last leaf to the root, drop the old root, then sift down.
        self.swap(1, len(self.list) - 1)
        self.list.pop()
        self.bubbleDown(1)
        return smallest

    def swap(self, l, r):
        """Exchange the items in slots ``l`` and ``r``."""
        self.list[l], self.list[r] = self.list[r], self.list[l]

    def left(self, p):
        return p * 2

    def right(self, p):
        return p * 2 + 1

    def parent(self, c):
        return c // 2

    def is_child(self, parent, child):
        """True when slot ``parent`` may sit above slot ``child``."""
        return self.list[parent] <= self.list[child]

    def bubbleUp(self, child):
        """Sift slot ``child`` towards the root until the order holds."""
        while child > 1:
            parent = self.parent(child)
            if self.is_child(parent, child):
                break
            self.swap(child, parent)
            child = parent

    def bubbleDown(self, parent):
        """Sift slot ``parent`` towards the leaves until the order holds."""
        size = len(self.list)
        while parent < size - 1:
            left = self.left(parent)
            if left >= size:
                return
            right = self.right(parent)
            # Pick the smaller child; with a single child it is the left one.
            if right >= size or self.is_child(left, right):
                to_swap = left
            else:
                to_swap = right
            if self.is_child(parent, to_swap):
                return
            self.swap(parent, to_swap)
            parent = to_swap
class KMerge:
    """Merge K individually sorted arrays into one ascending sequence.

    Every merged number is appended to ``test.txt`` and also collected in
    ``self.file``. The instance can be used as a context manager so the file
    is closed even if merging raises.
    """

    def __init__(self, max_size):
        # max_size: the largest number of input arrays sort() will accept.
        self.file = []
        self.heap = Heap()
        self.max_size = max_size
        self.fd = open("test.txt", "a+")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def write(self, number):
        """Append ``number`` to the output file and to ``self.file``."""
        self.fd.write("{}, ".format(number))
        self.file.append(number)

    def close(self):
        """Close the output file."""
        self.fd.close()

    def sort(self, arrays):
        """Merge ``arrays`` (each already ascending) and return the result.

        Returns ``[]`` after printing a message when there are more arrays
        than ``max_size``; otherwise returns ``self.file``, which holds every
        number written so far. Empty arrays are skipped.
        """
        if self.max_size < len(arrays):
            print("number of files, {} can't be larger than {}. exiting..".format(len(arrays), self.max_size))
            return []
        # Seed the heap with the head of every non-empty array.
        for i, a in enumerate(arrays):
            if len(a) > 0:
                # value, position of file, position of value in file
                self.heap.add([a[0], i, 0])
        while self.heap.length() > 0:
            value, source, pos = self.heap.extract()
            # write next smallest number to file
            self.write(value)
            # pull the following number from the array that was just consumed
            if pos + 1 < len(arrays[source]):
                self.heap.add([arrays[source][pos + 1], source, pos + 1])
        return self.file
# Demo: merge three small sorted arrays and print the result.
if __name__ == "__main__":
    arr1 = [3,7,9,15,35]
    arr2 = [2,8,45,60,70,100]
    arr3 = [5,6,50,101, 110]
    files = [arr1, arr2, arr3]
    k = KMerge(3)
    try:
        print("input = {} output = {}".format(files, k.sort(files)))
    finally:
        # Close the output file even when the merge raises.
        k.close()
view raw kway_merge.py hosted with ❤ by GitHub



Practice statistics:

29:00: to think of algorithm and write up the code without testing
10:00: to fix the typos and minor bugs (should not happen next time)


Comments

Popular posts from this blog

Planting flowers with no adjacent flower plots

Find the shorted path from the vertex 0 for given list of vertices.