The CoGroup transformation jointly processes groups of two DataSets. Both DataSets are grouped on a defined key and groups of both DataSets that share the same key are handed together to a user-defined co-group function. If for a specific key only one DataSet has a group, the co-group function is called with this group and an empty group. A co-group function can separately iterate over the elements of both groups and return an arbitrary number of result elements.
Similar to Reduce, GroupReduce, and Join, keys can be defined using the different key-selection methods.
1 与 join 类似。但 join 是 inner join,如果条件不满足,则结果中没有数据;
而 CoGroup 的结果则取决于 co-group function 的逻辑怎么处理。
2 两个 DataSet 分别按照 key 分组为集合。通过 where/equalTo 条件匹配、key 相等的两个分组会一起进入 co-group function(通过 with 指定)。
3 如果某个 key 只在其中一个 DataSet 中有数据,则另一个 DataSet 对应的集合为空。
package com.river.mydemo.batch;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.Set;
/**
 * Small batch job that demonstrates the Flink DataSet CoGroup transformation.
 *
 * <p>Two tuple data sets are grouped on tuple field 0, each pair of same-key groups is passed
 * to {@link MyCoGrouper}, and the values it emits are printed.
 *
 * @author river
 * @date 2019/5/17 15:28
 **/
public class CoGroupTest {

    /**
     * Builds the two inputs, wires up the co-group and prints the result.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from the Flink calls made here
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Integer-valued input. It deliberately has no "lucy" or "frank" entry, so those keys
        // exist only in the Double-valued input below.
        DataSet<Tuple2<String, Integer>> intInput = env.fromElements(
                new Tuple2<>("river", 30),
                new Tuple2<>("cat", 5));

        // Double-valued input.
        DataSet<Tuple2<String, Double>> doubleInput = env.fromElements(
                new Tuple2<>("river", 1d),
                new Tuple2<>("lucy", 1d),
                new Tuple2<>("frank", 1d),
                new Tuple2<>("cat", 1d));

        DataSet<Double> products = intInput
                .coGroup(doubleInput)
                // key of the first input: tuple field 0
                .where(0)
                // key of the second input: tuple field 0
                .equalTo(0)
                // function invoked once per pair of groups
                .with(new MyCoGrouper());

        products.print();
    }

    /**
     * Co-group function that emits, for every Double tuple of a group, its value multiplied by
     * each distinct Integer value found in the matching Integer group.
     */
    public static class MyCoGrouper
            implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {

        /**
         * Processes one pair of groups.
         *
         * @param iVals group from the Integer-valued input
         * @param dVals group from the Double-valued input
         * @param out   collector receiving one product per (Double tuple, distinct Integer) pair
         */
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
                            Iterable<Tuple2<String, Double>> dVals,
                            Collector<Double> out) {
            // Debug trace of both groups as rendered by Gson.
            // NOTE(review): this is Gson's rendering of the iterable objects themselves, which
            // is not necessarily a plain list of the group's elements.
            Gson gson = new Gson();
            String intGroupJson = gson.toJson(iVals);
            String doubleGroupJson = gson.toJson(dVals);
            System.out.println("ivals = " + intGroupJson + " dVals = " + doubleGroupJson);

            // Gather the distinct Integer values of this group.
            Set<Integer> distinctInts = new HashSet<>();
            for (Tuple2<String, Integer> intEntry : iVals) {
                distinctInts.add(intEntry.f1);
            }

            // Emit Double value x Integer value for every combination; when the Integer group
            // is empty the inner loop never runs and nothing is emitted.
            for (Tuple2<String, Double> doubleEntry : dVals) {
                for (Integer intValue : distinctInts) {
                    out.collect(doubleEntry.f1 * intValue);
                }
            }
        }
    }
}
执行结果
ivals = [] dVals = {"next":{"f0":"frank","f1":1.0},"iteratorAvailable":true}
ivals = {"next":{"f0":"cat","f1":5},"iteratorAvailable":true} dVals = {"next":{"f0":"cat","f1":1.0},"iteratorAvailable":true}
ivals = {"next":{"f0":"river","f1":30},"iteratorAvailable":true} dVals = {"next":{"f0":"river","f1":1.0},"iteratorAvailable":true}
ivals = [] dVals = {"next":{"f0":"lucy","f1":1.0},"iteratorAvailable":true}
![](https://img.haomeiwen.com/i8451293/913ce6933e5e9232.png)
网友评论