Data Aggregation from CSV File

GoCollaborate provides simple IO streaming from different data sources. This section walks you through an example of how to:

  • read the data from a CSV file into a custom struct
  • perform a computation on the resulting data set

The example source code is available here.

Core

Suppose you have a customer spreadsheet in CSV format with 6 fields, each mapped to one column of the data set:

id,first_name,last_name,email,gender,balance
[...],[...],[...],[...],[...],[...]

Now suppose you want to go through the balances of your existing customers and work out their total before drafting a report for your monthly analysis. To do this, create a custom struct with a field whose name matches the column name in the spreadsheet exactly (the match is case insensitive):

import (
    "fmt"
    "github.com/GoCollaborate/src/artifacts/task"
    "github.com/GoCollaborate/src/wrappers/ioHelper"
    "github.com/GoCollaborate/src/wrappers/taskHelper"
    "net/http"
)

func ExampleJobHandler(w http.ResponseWriter, r *http.Request, bg *task.Background) {
    var (
        job  = task.MakeJob()
        path = "./data.csv"
        raw  = []struct {
            Balance float64
        }{}
        source = task.Collection{}
    )

    // reading data from a file path
    ioHelper.FromPath(path).NewCSVOperator().Fill(&raw)

    // reading data from an open file (requires importing "os")
    // file, _ := os.Open(path)
    // ioHelper.FromFile(file).NewCSVOperator().Fill(&raw)

    // reading data from bytes
    // ioHelper.FromBytes([]byte(CSVFILE)).NewCSVOperator().Fill(&raw)

    // reading data from a string
    // ioHelper.FromString(CSVFILE).NewCSVOperator().Fill(&raw)

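    // collect every balance into the task source
    // (named entry to avoid shadowing the request parameter r)
    for _, entry := range raw {
        source.Append(entry.Balance)
    }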

    job.Tasks(&task.Task{task.SHORT,
        task.BASE, "exampleFunc",
        source,
        task.Collection{},
        task.NewTaskContext(struct{}{}), 0})
    job.Stacks("core.ExampleTask.Mapper", "core.ExampleTask.Reducer")

    bg.Mount(job)
}
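The mapping between a column header and a struct field can be pictured with the standard library alone. The following is a minimal sketch, not GoCollaborate's implementation: fillBalances and customer are hypothetical names, the sample rows are made up for illustration, and only the case-insensitive match on the balance column is modelled.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

// customer mirrors the single-field struct used in the handler above.
type customer struct {
	Balance float64
}

// fillBalances is a hypothetical helper, not part of GoCollaborate. It reads
// CSV text, locates the column whose header matches "Balance" regardless of
// case, and parses that column into a slice of customer values.
func fillBalances(data string) ([]customer, error) {
	rows, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		return nil, err
	}
	if len(rows) == 0 {
		return nil, fmt.Errorf("empty CSV input")
	}

	// Find the index of the balance column in the header row.
	col := -1
	for i, name := range rows[0] {
		if strings.EqualFold(strings.TrimSpace(name), "Balance") {
			col = i
			break
		}
	}
	if col < 0 {
		return nil, fmt.Errorf("no balance column in header")
	}

	// Parse the balance of every data row.
	out := make([]customer, 0, len(rows)-1)
	for _, row := range rows[1:] {
		v, err := strconv.ParseFloat(strings.TrimSpace(row[col]), 64)
		if err != nil {
			return nil, fmt.Errorf("parse balance %q: %w", row[col], err)
		}
		out = append(out, customer{Balance: v})
	}
	return out, nil
}

func main() {
	// Made-up sample rows, for illustration only.
	const sample = "id,first_name,last_name,email,gender,balance\n" +
		"1,Ann,Lee,ann@example.com,F,10.5\n" +
		"2,Bob,Ray,bob@example.com,M,20.25\n"

	customers, err := fillBalances(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(customers), customers[0].Balance, customers[1].Balance)
}
```

Columns that have no matching field, such as id or email here, are simply ignored, which is why the struct in the handler only needs to declare Balance.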

The value of each entry will be converted to the generic type Countable and spread out across the Collaborators for parallel computation.

Create a simple reducer that adds up all the subtotals from its peers:

type SimpleReducer int

func (r *SimpleReducer) Reduce(maps map[int]*task.Task) (map[int]*task.Task, error) {
    var sum float64
    // walk through the result of every task returned by the peers
    for _, t := range maps {
        // named v to avoid shadowing the receiver r
        for _, v := range t.Result {
            sum += v.(float64)
        }
    }
    fmt.Printf("The sum of balance is: %v \n", sum)
    return maps, nil
}
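To see how the subtotals come about before the reducer runs, the whole flow can be imitated on a single machine. This is only a sketch under stated assumptions: goroutines stand in for Collaborators, and partialSums and reduce are hypothetical helpers that do not belong to the GoCollaborate API.

```go
package main

import (
	"fmt"
	"sync"
)

// partialSums splits values into at most `workers` chunks and sums each chunk
// in its own goroutine, standing in for the work done by each peer.
func partialSums(values []float64, workers int) []float64 {
	if workers < 1 {
		workers = 1
	}
	chunk := (len(values) + workers - 1) / workers // ceiling division
	if chunk == 0 {
		return nil // nothing to sum
	}

	n := (len(values) + chunk - 1) / chunk // number of non-empty chunks
	sums := make([]float64, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		start := i * chunk
		end := start + chunk
		if end > len(values) {
			end = len(values)
		}
		wg.Add(1)
		// Each goroutine writes only to its own slot, so no lock is needed.
		go func(i int, part []float64) {
			defer wg.Done()
			for _, v := range part {
				sums[i] += v
			}
		}(i, values[start:end])
	}
	wg.Wait()
	return sums
}

// reduce adds the subtotals into one grand total, like the reducer above.
func reduce(subtotals []float64) float64 {
	var total float64
	for _, s := range subtotals {
		total += s
	}
	return total
}

func main() {
	balances := []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	subtotals := partialSums(balances, 3)
	fmt.Println("subtotals:", subtotals)
	fmt.Println("The sum of balance is:", reduce(subtotals))
}
```

Because addition is associative, the grand total does not depend on how the entries are split among workers, apart from ordinary floating-point rounding on non-integer data.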

Now run the application with:

go run main.go -mode=clbt


