Advanced MapReduce

The example below demonstrates how to create a MapReduce Job with automatic staging and aggregation:

Core

func ExampleJobHandler02(w http.ResponseWriter, r *http.Request, bg *task.Background) {
    job := task.MakeJob()
    job.Tasks(
        &task.Task{
            task.SHORT,
            task.BASE, "exampleFunc",
            task.Collection{
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
            },
            task.Collection{0},
            // stage 0
            task.NewTaskContext(struct{}{}), 0},
        &task.Task{
            task.SHORT,
            task.BASE, "exampleFunc",
            task.Collection{
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
            },
            task.Collection{0},
            // stage 0
            task.NewTaskContext(struct{}{}), 0},
        &task.Task{task.SHORT,
            task.BASE, "exampleFunc",
            task.Collection{
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
            },
            task.Collection{0},
            // stage 1
            task.NewTaskContext(struct{}{}), 1},
        &task.Task{task.SHORT,
            task.BASE, "exampleFunc",
            task.Collection{
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
                1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4,
            },
            task.Collection{0},
            // stage 1
            task.NewTaskContext(struct{}{}), 1})
    // map twice, reduce once at stage 0
    job.Stacks("core.ExampleTask.AdvancedMapper", "core.ExampleTask.AdvancedMapper", "core.ExampleTask.AdvancedReducer")
    // map once, reduce twice at stage 1
    job.Stacks("core.ExampleTask.AdvancedMapper", "core.ExampleTask.AdvancedReducer", "core.ExampleTask.AdvancedReducer")

    bg.Mount(job)
}

type AdvancedMapper int

func (m *AdvancedMapper) Map(inmaps map[int]*task.Task) (map[int]*task.Task, error) {
    return taskHelper.Slice(inmaps, 3), nil
}

type AdvancedReducer int

func (r *AdvancedReducer) Reduce(maps map[int]*task.Task) (map[int]*task.Task, error) {
    var (
        sum       int
        sortedSet = make([]*task.Task, 0)
    )

    // return the sorted keys
    for _, k := range taskHelper.Keys(maps) {
        s := maps[k]
        sortedSet = append(sortedSet, s)
        for _, r := range (*s).Result {
            sum += r.(int)
        }
    }
    fmt.Printf("The sum of numbers is: %v \n\n", sum)
    fmt.Printf("The task set is: %v \n\n", maps)
    fmt.Printf("The sorted set is: %v \n\n", sortedSet)
    return maps, nil
}

Entry

func main() {
    // example 02
    advmp := new(core.AdvancedMapper)
    advrd := new(core.AdvancedReducer)

    collaborate.Set("Mapper", advmp, "core.ExampleTask.AdvancedMapper")
    collaborate.Set("Reducer", advrd, "core.ExampleTask.AdvancedReducer")
    collaborate.Set("Shared", []string{"GET", "POST"}, core.ExampleJobHandler02)
    collaborate.Run()
}



results matching ""

    No results matching ""