Data Aggregation from CSV File
GoCollaborate provides simple IO streaming from several data sources. In this section, we walk through an example of how to:
- read data from a CSV file into a custom struct
- perform a computation on the resulting data set
The example source code is available here.
Core
Suppose you have a customer spreadsheet in CSV format with six fields, each mapped to one column of the data set:
```
id,first_name,last_name,email,gender,balance
[...],[...],[...],[...],[...],[...]
```
Now suppose you want to go through your existing customers' balances and compute their total before drafting your monthly analysis report. You can simply create a custom struct with a field whose name matches the column name in the spreadsheet (the match is case-insensitive):
```go
// Imports for the handler and the reducer below; some may be used only by
// parts of the full example that are not shown here.
import (
	"fmt"
	"net/http"

	"github.com/GoCollaborate/src/artifacts/task"
	"github.com/GoCollaborate/src/wrappers/ioHelper"
	"github.com/GoCollaborate/src/wrappers/taskHelper"
)

func ExampleJobHandler(w http.ResponseWriter, r *http.Request, bg *task.Background) {
	var (
		job  = task.MakeJob()
		path = "./data.csv"
		// Only Balance is declared; it is filled from the "balance" column,
		// matched by name regardless of case.
		raw = []struct {
			Balance float64
		}{}
		source = task.Collection{}
	)

	// Read the CSV file at path into raw.
	ioHelper.FromPath(path).NewCSVOperator().Fill(&raw)

	// Alternative sources:
	// reading data from an open file
	// file, _ := os.Open(path)
	// ioHelper.FromFile(file).NewCSVOperator().Fill(&raw)
	// reading data from bytes
	// ioHelper.FromBytes([]byte(CSVFILE)).NewCSVOperator().Fill(&raw)
	// reading data from a string
	// ioHelper.FromString(CSVFILE).NewCSVOperator().Fill(&raw)

	// Collect every balance into the task's source collection
	// (row avoids shadowing the *http.Request parameter r).
	for _, row := range raw {
		source.Append(row.Balance)
	}

	job.Tasks(&task.Task{task.SHORT,
		task.BASE, "exampleFunc",
		source,
		task.Collection{},
		task.NewTaskContext(struct{}{}), 0})
	job.Stacks("core.ExampleTask.Mapper", "core.ExampleTask.Reducer")
	bg.Mount(job)
}
```
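According to the description above, `Fill` matches CSV columns to struct fields by name, ignoring case; the library's own implementation is not shown in this section. As a rough, standard-library-only sketch of that idea (not GoCollaborate's code), the hypothetical helper `fillFloatFields` below parses CSV text with `encoding/csv` and uses `reflect` to copy each column into the exported `float64` field whose name matches the header case-insensitively. It assumes Go 1.18+ (generics), requires a struct type parameter, and handles only `float64` fields; the sample rows are made up.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"reflect"
	"strconv"
	"strings"
)

// customer declares only the column we need; "balance" in the header
// matches the Balance field regardless of case.
type customer struct {
	Balance float64
}

// fillFloatFields is a hypothetical helper, not GoCollaborate's API. T must be
// a struct type; only its exported float64 fields are filled.
func fillFloatFields[T any](data string) ([]T, error) {
	records, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		return nil, err
	}
	if len(records) == 0 {
		return nil, nil
	}
	// Map lower-cased header names to column indexes.
	columns := make(map[string]int, len(records[0]))
	for i, name := range records[0] {
		columns[strings.ToLower(strings.TrimSpace(name))] = i
	}
	var out []T
	for _, rec := range records[1:] {
		var item T
		v := reflect.ValueOf(&item).Elem()
		t := v.Type()
		for f := 0; f < t.NumField(); f++ {
			field := t.Field(f)
			col, ok := columns[strings.ToLower(field.Name)]
			if !ok || !field.IsExported() || field.Type.Kind() != reflect.Float64 {
				continue // no matching column, or a field type this sketch ignores
			}
			x, err := strconv.ParseFloat(strings.TrimSpace(rec[col]), 64)
			if err != nil {
				return nil, fmt.Errorf("data row %d, field %s: %w", len(out)+1, field.Name, err)
			}
			v.Field(f).SetFloat(x)
		}
		out = append(out, item)
	}
	return out, nil
}

func main() {
	// Made-up rows that follow the column layout shown earlier.
	const sample = "id,first_name,last_name,email,gender,balance\n" +
		"1,Jane,Doe,jane@example.com,F,10.5\n" +
		"2,John,Roe,john@example.com,M,4.5\n"
	rows, err := fillFloatFields[customer](sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(rows) // [{10.5} {4.5}]
}
```

Because unmatched columns are simply skipped, the struct only needs to declare the fields the computation uses, which is why the handler's struct can get away with a single `Balance` field.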
Each balance collected by the handler is converted to the generic type Countable and spread across the Collaborators for parallel computation.
Next, create a simple reducer that adds up the subtotals returned by its peers:
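```go
// SimpleReducer adds up the subtotals produced by its peers.
type SimpleReducer int

func (r *SimpleReducer) Reduce(maps map[int]*task.Task) (map[int]*task.Task, error) {
	var sum float64
	for _, s := range maps {
		// Each Result entry is a Countable expected to hold a float64 subtotal.
		for _, v := range s.Result {
			f, ok := v.(float64)
			if !ok {
				return nil, fmt.Errorf("unexpected result type %T", v)
			}
			sum += f
		}
	}
	fmt.Printf("The sum of balance is: %v\n", sum)
	return maps, nil
}
```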
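How GoCollaborate actually distributes the collection among Collaborators is outside the scope of this example. The in-process analogy below, using only goroutines and `sync.WaitGroup`, sketches the same map/reduce shape under that simplification: the hypothetical `partialSums` splits the balances into contiguous chunks and sums each chunk concurrently (roughly the mapper's role), and `reduce` adds the subtotals the way `SimpleReducer` does. The balances are made-up values.

```go
package main

import (
	"fmt"
	"sync"
)

// partialSums (hypothetical) splits balances into at most `workers` chunks and
// sums each chunk in its own goroutine, returning one subtotal per chunk.
func partialSums(balances []float64, workers int) []float64 {
	if workers < 1 {
		workers = 1
	}
	if len(balances) == 0 {
		return nil
	}
	size := (len(balances) + workers - 1) / workers // ceiling division
	n := (len(balances) + size - 1) / size          // number of non-empty chunks
	subtotals := make([]float64, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		lo := i * size
		hi := lo + size
		if hi > len(balances) {
			hi = len(balances)
		}
		wg.Add(1)
		go func(i int, part []float64) {
			defer wg.Done()
			for _, b := range part {
				subtotals[i] += b // each goroutine writes only its own slot
			}
		}(i, balances[lo:hi])
	}
	wg.Wait()
	return subtotals
}

// reduce mirrors SimpleReducer.Reduce: it adds up every subtotal.
func reduce(subtotals []float64) float64 {
	var sum float64
	for _, s := range subtotals {
		sum += s
	}
	return sum
}

func main() {
	balances := []float64{120.5, 80, 42.25, 7.25} // hypothetical values
	subtotals := partialSums(balances, 3)
	fmt.Printf("The sum of balance is: %v\n", reduce(subtotals))
	// prints "The sum of balance is: 250"
}
```

Giving each goroutine its own slot in `subtotals` avoids a mutex; the final addition happens only after `wg.Wait()`, just as the reducer runs only once the mappers have produced their results.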
Now run the application with:
```shell
go run main.go -mode=clbt
```