The idea is to define the interfaces for a mapreduce framework. Then we can try several different implementations of it, while the actual functions passed to map and reduce remain the same.

We are abstracting out the way we iterate over a large number of files, so that this work can be parallelized and optimized. We are given a list of files: maybe one directory, the root of a file tree, or a whole filesystem. We want to perform an action on each file. We hide the intermediate files, and try to keep the total memory used to aggregate low.

	mapred mapfn reducefn /fs

Essentially we have a large list of files, and we run the map function on each record in a file. It emits a 'key val' pair. mapred takes care of storing these into files and running the reduce function. The reduce function expects a key and a list of values for that key.

The simplest implementation is to walk the tree and run mapfn on each file, sort all the output and redirect it to reduce, and we get one file as output. E.g.

	fn mapred {
		map = $1
		reduce = $2
		dir = $3
		# scratch directory for the intermediate files
		t = /tmp/mapred.${pid}
		mkdir $t
		rm $t/* >[2] /dev/null
		# run the mapper over the tree; intermediate spreads the output over files in $t
		fs pipe @$map $dir |intermediate 13 $t/
		# sort each intermediate file and reduce it
		for i in $t/* {sort $i | $reduce > $i.red}
	}

The above is the simplest solution. We could start it in its own emu. We could split it among a number of emus. We could improve the merging, and do some analysis of the filesystem to decide how to split it.

The result could be a filesystem that can be passed to another mapred function. We should specify the output filesystem as part of the interface.

The implementation changes depending on the environment and infrastructure. One implementation should work well on a single emu instance. Another could be created for a cluster. But the interface to the user and the modules remains the same.

We could try splitting the data and have

	/tmp/mapred.pid/Msplit/Rintermediate

Define the map and reduce functions with specific interfaces using channels to receive input.
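As a rough illustration of the simplest implementation and of the map/reduce interface, here is a minimal in-memory sketch. Python is used only as an executable stand-in for the eventual Limbo/sh code. Everything in it is an assumption made for the example: the names `mapred`, `wordcount_map` and `wordcount_reduce`, the dict standing in for a file tree, and the crc32 hash used to pick one of 13 partitions.

```python
import itertools
import zlib


def mapred(mapfn, reducefn, files, nparts=13):
    """Run mapfn over every file, partition by key hash, sort, then reduce.

    files is a dict of name -> contents; it stands in for walking a file tree.
    """
    # Map phase: each emitted (key, value) pair goes to a partition chosen by key.
    parts = [[] for _ in range(nparts)]
    for name, contents in files.items():
        for key, value in mapfn(name, contents):
            parts[zlib.crc32(key.encode()) % nparts].append((key, value))

    # Reduce phase: sorting makes equal keys adjacent, so they can be grouped.
    out = {}
    for part in parts:
        part.sort()
        for key, group in itertools.groupby(part, key=lambda kv: kv[0]):
            result = reducefn(key, [v for _, v in group])
            if result is not None:  # reduce may emit nothing
                out[key] = result
    return out


def wordcount_map(name, contents):
    # key is the filename, value is the file contents; emit (word, "1") pairs.
    for word in contents.split():
        yield word, "1"


def wordcount_reduce(key, values):
    # Keys and values are strings, so the count is returned as a string.
    return str(sum(int(v) for v in values))
```

The map and reduce functions never see how the pairs are partitioned or stored, which is the property the interface is meant to preserve across implementations.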
The map and reduce functions should not be aware of the format stored on disk.

	map(key, value)
		emits a (key, value) pair, both strings, onto a channel.
		For map, key is typically a filename; value is implicit and is
		the contents of the file, or else it's the already open Iobuf.
		We should deal with errors consistently.

	reduce(key, list)
		receives list on a channel, which it iterates until done;
		emits a value (or nothing), then returns.

	reader(file, offset, nbytes)

	# reader module
	init(Mapper, emit: chan of (key, value));

The reader decides how to parse the file. Different readers can read different file types. The reader calls the mapper (so the reader should be part of a worker designated by the master to work on a chunk of data).

The master contains a worker list and an intermediate list. Once the workers are done for an intermediate, the intermediate can be killed. Once all workers and intermediates are done we exit. We need to watch a lot of processes.

We need a command that, given a filesystem such as /n/mbox, will look at the size of all the files and split it into equal lists of files. E.g. divide /n/mbox by 16 and we'd get 16 lists of files, each with about equal total size. We'd need to do that by file sizes assuming a certain order, and starting files by offset, or just round up. Get the total size, divide by 16 and break into equal chunks >= 16.

Start with one reader, one map, one intermediate. We need re-entrant buffered IO.

The thing to expand first is to have the reduce worker gather multiple files from multiple machines for the same hash partition, and do an external sort on that file. We still need to be able to sort large files with a bound on memory. We need a gather and sort function before the reducer is called.

The external sort is probably the most pressing thing to work on. We shouldn't improve anything else until we have that.

gather streams all the lines into sorter, which writes to disk and merges as necessary.
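The gather-and-sort step can be sketched as a bounded-memory external merge sort: buffer lines up to a limit, write each sorted buffer to a temporary run file, then merge the runs into one output. This is a sketch in Python, used only as an executable stand-in. The function name `external_sort` and the `max_lines` bound are assumptions for the example. It merges all runs in a single pass, which is a simplification of "merges as necessary", since a real sorter may need intermediate merges to limit the number of open files.

```python
import heapq
import os
import tempfile


def external_sort(lines, out_path, max_lines=100000, tmpdir=None):
    """Sort an iterable of lines into out_path, holding at most max_lines in memory.

    Returns the number of temporary run files that were written.
    """
    runs = []  # paths of sorted temporary run files
    buf = []   # lines currently held in memory

    def flush():
        # Sort the in-memory buffer and write it out as one run.
        if not buf:
            return
        buf.sort()
        fd, path = tempfile.mkstemp(dir=tmpdir, suffix=".run")
        with os.fdopen(fd, "w") as f:
            f.writelines(buf)
        runs.append(path)
        buf.clear()

    for line in lines:
        if not line.endswith("\n"):
            line += "\n"  # keep one record per line in the run files
        buf.append(line)
        if len(buf) >= max_lines:
            flush()
    flush()

    # Merge phase: heapq.merge reads the sorted runs lazily, one line at a time.
    files = [open(p) for p in runs]
    try:
        with open(out_path, "w") as out:
            out.writelines(heapq.merge(*files))
    finally:
        for f in files:
            f.close()
        for p in runs:
            os.remove(p)
    return len(runs)
```

Because the input is any iterable of lines, a gather stage that cats several intermediate files can feed it directly.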
Once gather finishes, sort merges all the temporary files and writes out one reduce partition.

After that is implemented, we can create the remote job management system. We need a Worker interface that takes a job specification, runs it, logs what it does, and reports its progress back to the master. The worker might take the name of the module to load, and the files and file splits on which to run. It reports back the files it creates once it's done with them, responds to heartbeats from the master, and reports nbytes processed.

Worker input is in this format:

	/file/name offset nbytes

Master launches Worker. Master writes the job configuration to a file. Worker reads the file and writes progress to /n/client/master, which is a channel served by the master.

Master should be a single fs, with each client assigned to one ctl file.

	Master
		clone
		n/
		n/ctl
		n/status

Just open /mnt/mapreduce/clone and read records. The worker is connected and its processing is tracked while the file is open. If the file is closed without being completely written, then the job is restarted. However, if everything is read and the file closed, the job is a success.

Just open and read records from /mnt/mapred/clone. The first line is the split number M, and the number of R.

Masterfs mounts itself on /mnt/master and then calls cpu.dis with args for the worker.

	/dis/cpu.dis tcp!localhost mapworker

So mapworker looks at /n/client/mnt/master/clone. The first line it reads could be the argl for the worker, or we could pass the argl to the worker.

Best have the serve process coordinate everything. That is the principal piece of design here: the fs is the supervisor.

This filesystem is served for each remote host where we plan on launching any jobs using the rstyx protocol. The cfg just contains the modules to load and the filenames and splits.

The Master launches each worker:

	worker /mnt/mapreduce/clone

Worker opens clone and gets a new connection to the Master. It reads the cfg file and loads the named modules. E.g.
the reader and mapper. Then it processes the files and passes them to the reader. The reader passes them to the mapper. The output files are written back to the master over the data channel.

All the clients are kept track of as open fids, just like a normal fs. If we lose a client we know. We can restart clients if we lose them and fail to get a proper close-out message etc.

A Masterfs is the way to go. The Masterfs could ask the registry where to go to push the load. The rstyx on each server can update its registry entry for each connection so the load is properly controlled, e.g. with a max number of jobs, or a max number of connections.

Start with modfs.b

	clone
	1
		ctl
		data

Master wants to run a job, say a mapper worker.
	It looks up the registry for an available rstyx resource (anything without a maxed-out number of connections).
	Using the rstyx protocol it connects to the server and writes the Worker command.
	Then it begins serving its own fs on the connection (or it can bind its connection onto its own namespace and serve that).
	Worker knows to just open /n/client/mnt/master/clone and read the number of its connection. It then opens the ctl files.
	Worker reads the configuration and starts the job; when it's done it closes its connection.
	The master can mark off that job as done.

The registry must change slightly to become a job exchange. A master asks the registry for a resource to run one connection, one job. The registry looks up a candidate and gives it out. But now there's a lag between the master connecting and the resource writing back to the registry that a connection's been opened. One way around it is for the resource to just turn the master away, so the master needs to keep coming back. And if there are no resources, what does the master do? Just periodically poll the registry?
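One possible shape of the "turn it away and keep coming back" idea is sketched below. It is not a decision on the open question, and Python is used only as an executable stand-in. The classes `Resource` and `Registry` and the function `place_job` are hypothetical names invented for the example. The registry's counts are allowed to be stale, so the resource itself is the authority on whether a connection is accepted.

```python
class Resource:
    """A hypothetical rstyx host that accepts at most max_conns connections."""

    def __init__(self, name, max_conns):
        self.name = name
        self.max_conns = max_conns
        self.conns = 0

    def connect(self):
        # The resource turns the master away when it is already full.
        if self.conns >= self.max_conns:
            return False
        self.conns += 1
        return True

    def close(self):
        self.conns -= 1


class Registry:
    """Holds a possibly stale connection count for each resource."""

    def __init__(self, resources):
        self.resources = list(resources)
        self.reported = {r.name: 0 for r in self.resources}

    def candidates(self):
        # Offer anything whose *reported* count is below its maximum.
        return [r for r in self.resources if self.reported[r.name] < r.max_conns]

    def update(self, resource):
        # Called by the resource some time after a connection opens or closes.
        self.reported[resource.name] = resource.conns


def place_job(registry, max_rounds=3):
    """Try to place one job; return the resource used, or None if all refuse."""
    for _ in range(max_rounds):
        for r in registry.candidates():
            if r.connect():
                return r
        # Nothing accepted the job: a real master would wait here, then poll again.
    return None
```

In this sketch a stale registry entry costs only a refused connection, not a double booking, because the count that matters lives on the resource.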
grid/registry already practically does what we want with monitoring the number of connections. It just needs to write it back to the registry.

Tasks
1. Implement External Sorter
2. Implement Resource partition gatherer
3. Modify grid/register* commands to update registry with number of connections and other resource usage.
4. Write generic worker and come up with filesystem interface
5. Write code to split large set of files into equal size chunks
6. Write Masterfs to spawn and monitor workers

Gather is really just cat. We can script a lot of this. mapred needs to call a script to gather and sort the files.

Mapworkers and Reduceworkers are also standalone commands. The result of a mapworker is a list of files. The input to a mapworker is a list of file splits. The input also includes a config file that specifies the mapper and reader modules. The input to a reduceworker is the list of intermediate files. It calls the gather and sort command to create a local file for it to reduce. The gathersort manages mounting the fs as needed. The output of a reduceworker is the reduce file.

We need a filesplitter that can create a set of mapworker config files. It walks a fs, calculates the total size and splits on a set quantum. Much like du.

Create netcat that takes as stdin

	tcp!host!port /path/file

It builds a hash of network addresses, mounts one at a time and cats the /path/file, assuming the full list of filenames can be stored in memory. Or we can just do a fresh mount for each file.

No bottlenecks, keep the data moving. The reduce worker can run concurrently with the mapworkers, because the sorting can happen while we are receiving intermediate file names from finished mapworkers.

	netcat < files |sort |reduceworker cfgfile > out

We will end up with a family of mapreduce programs for different scales of data and grids.

Implement a Reader for tar.gz files.

20070121
Test case: use mapreduce to generate /man/index. Each file needs to be deroff'd using man2txt.
Call it

	mapreduce [ -m mntpoint ] mapper reducer path ...

If no path arguments are given, read pathnames from stdin. Then we can use fs as a tree walker.

- -o output dir option
- reader option
- file splits of large files
- remote progs with cpu
- integrate with registry
- catch failures and restart
- status file
- workers write back progress in #bytes read
- read paths from stdin
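The "file splits of large files" item, and the file splitter described in the task list, could work roughly as sketched here. Python is used only as an executable stand-in, and the function name `split_files` and its arguments are assumptions for the example. Each split is a list of (path, offset, nbytes) triples, matching the "/file/name offset nbytes" worker input format. The quantum is the total size divided by the number of splits, rounded up. The sketch cuts purely by byte count, so a reader would still have to deal with records that straddle a split boundary.

```python
def split_files(files, nsplits):
    """Divide files into nsplits lists of (path, offset, nbytes) of about equal size.

    files is a list of (path, size) pairs in the order they were walked.
    Zero-length files contribute nothing and are left out.
    """
    total = sum(size for _, size in files)
    quantum = -(-total // nsplits)  # ceiling division: bytes per split
    splits = [[] for _ in range(nsplits)]
    i = 0           # index of the split currently being filled
    room = quantum  # bytes still free in that split
    for path, size in files:
        offset = 0
        while offset < size:
            take = min(room, size - offset)
            splits[i].append((path, offset, take))
            offset += take
            room -= take
            # Move on once the split is full; the last split absorbs the tail.
            if room == 0 and i < nsplits - 1:
                i += 1
                room = quantum
    return splits
```

For example, three files of 10, 30 and 8 bytes divided four ways give a quantum of 12 bytes, with the 30-byte file spread over all four splits.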