Advanced MapReduce
The example below demonstrates how to create a MapReduce Job with automatic staging and aggregation:
Core
// ExampleJobHandler02 builds a four-task job (two tasks tagged stage 0 and
// two tagged stage 1), declares one executor stack per stage, and mounts the
// job on the supplied background. The HTTP writer and request are part of the
// handler signature but are not used here.
func ExampleJobHandler02(w http.ResponseWriter, r *http.Request, bg *task.Background) {
	// sampleInput returns a newly built 72-element collection on every call,
	// so each task gets its own collection value rather than a shared one.
	sampleInput := func() task.Collection {
		return task.Collection{
			1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
			1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
			1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
			1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
			1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
		}
	}

	job := task.MakeJob()

	// Task fields are positional; the trailing number is the stage the task
	// belongs to.
	job.Tasks(
		// stage 0
		&task.Task{
			task.SHORT, task.BASE, "exampleFunc",
			sampleInput(), task.Collection{0},
			task.NewTaskContext(struct{}{}), 0,
		},
		// stage 0
		&task.Task{
			task.SHORT, task.BASE, "exampleFunc",
			sampleInput(), task.Collection{0},
			task.NewTaskContext(struct{}{}), 0,
		},
		// stage 1
		&task.Task{
			task.SHORT, task.BASE, "exampleFunc",
			sampleInput(), task.Collection{0},
			task.NewTaskContext(struct{}{}), 1,
		},
		// stage 1
		&task.Task{
			task.SHORT, task.BASE, "exampleFunc",
			sampleInput(), task.Collection{0},
			task.NewTaskContext(struct{}{}), 1,
		},
	)

	// Stage 0 stack: map twice, then reduce once.
	job.Stacks(
		"core.ExampleTask.AdvancedMapper",
		"core.ExampleTask.AdvancedMapper",
		"core.ExampleTask.AdvancedReducer",
	)
	// Stage 1 stack: map once, then reduce twice.
	job.Stacks(
		"core.ExampleTask.AdvancedMapper",
		"core.ExampleTask.AdvancedReducer",
		"core.ExampleTask.AdvancedReducer",
	)

	bg.Mount(job)
}
// AdvancedMapper is the example mapper; main registers an instance of it
// under the name "core.ExampleTask.AdvancedMapper".
type AdvancedMapper int

// Map hands the incoming task set to taskHelper.Slice and returns whatever
// that helper produces, always with a nil error.
func (m *AdvancedMapper) Map(inmaps map[int]*task.Task) (map[int]*task.Task, error) {
	// NOTE(review): presumably a chunk size or partition count — confirm
	// against taskHelper.Slice.
	const sliceArg = 3

	sliced := taskHelper.Slice(inmaps, sliceArg)
	return sliced, nil
}
// AdvancedReducer is the example reducer; main registers an instance of it
// under the name "core.ExampleTask.AdvancedReducer".
type AdvancedReducer int

// Reduce walks the task set in the key order produced by taskHelper.Keys,
// sums every task's Result values, prints the sum, the task set and the
// ordered task list, and returns the input map unchanged.
//
// It returns a nil map and a non-nil error, instead of panicking, when a map
// entry is nil or when a Result value is not an int.
func (r *AdvancedReducer) Reduce(maps map[int]*task.Task) (map[int]*task.Task, error) {
	var (
		sum       int
		sortedSet = make([]*task.Task, 0, len(maps))
	)

	// Visit tasks in the order taskHelper.Keys yields them (expected to be
	// sorted — confirm against the helper).
	for _, k := range taskHelper.Keys(maps) {
		t := maps[k]
		if t == nil {
			return nil, fmt.Errorf("reduce: task %v is nil", k)
		}
		sortedSet = append(sortedSet, t)

		for _, v := range t.Result {
			// Checked assertion: a non-int result is reported to the caller
			// rather than crashing the process.
			n, ok := v.(int)
			if !ok {
				return nil, fmt.Errorf("reduce: task %v has non-int result %v (%T)", k, v, v)
			}
			sum += n
		}
	}

	fmt.Printf("The sum of numbers is: %v \n\n", sum)
	fmt.Printf("The task set is: %v \n\n", maps)
	fmt.Printf("The sorted set is: %v \n\n", sortedSet)
	return maps, nil
}
Entry
func main() {
// example 02
advmp := new(core.AdvancedMapper)
advrd := new(core.AdvancedReducer)
collaborate.Set("Mapper", advmp, "core.ExampleTask.AdvancedMapper")
collaborate.Set("Reducer", advrd, "core.ExampleTask.AdvancedReducer")
collaborate.Set("Shared", []string{"GET", "POST"}, core.ExampleJobHandler02)
collaborate.Run()
}