使用数据流加入Google Cloud Platform中的两个json

时间:2021-12-10 15:46:00

I want to find out only female employees out of the two different JSON files and select only the fields which we are interested in and write the output into another JSON.

我想从两个不同的JSON文件中找出女性员工,只选择我们感兴趣的字段并将输出写入另一个JSON。

Also I am trying to implement it in Google's cloud platform using Dataflow. Can someone please provide any sample Java code which can be implemented to get the result.

我也试图使用Dataflow在Google的云平台上实现它。有人可以提供任何可以实现的示例Java代码来获得结果。

Employee JSON

员工JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}

Department JSON

部门JSON

{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}

The expected output JSON file should be like

预期的输出JSON文件应该是这样的

{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}

2 个解决方案

#1


6  

You can do this using CoGroupByKey (where shuffle will be used), or using side inputs, if your departments collection is significantly smaller.

如果您的部门集合明显较小,则可以使用CoGroupByKey(将使用shuffle)或使用侧输入来执行此操作。

I will give you code in Python, but you can use the same pipeline in Java.

我将在Python中为您提供代码,但您可以在Java中使用相同的管道。


With side inputs, you will:

使用侧输入,您将:

  1. Convert your departments PCollection into a dictionary that maps dept_id to the department JSON dictionary.

    将您的部门PCollection转换为将dept_id映射到部门JSON字典的字典。

  2. Then you take the employees PCollection as main input, where you can use the dept_id to get the JSON for each department in the departments PCollection.

    然后,您将员工PCollection作为主要输入,您可以使用dept_id为部门PCollection中的每个部门获取JSON。

Like so:

像这样:

departments = (p | LoadDepts()
                 | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

deps_si = beam.pvalue.AsDict(departments)

employees = (p | LoadEmps())

def join_emp_dept(employee, dept_dict):
  return employee.update(dept_dict[employee['dept_id']])

joined_dicts = employees | beam.Map(join_dicts, dept_dict=deps_si)

With CoGroupByKey, you can use dept_id as a key to group both collections. This will result in a PCollection of key-value pairs where the key is the dept_id, and the value are two iterables of the department, and the employees in that department.

使用CoGroupByKey,您可以使用dept_id作为键来对两个集合进行分组。这将导致PCollection的键值对,其中键是dept_id,值是部门的两个可迭代,以及该部门中的员工。

departments = (p | LoadDepts()
               | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

employees = (p | LoadEmps()
               | 'key_emp' >> beam.Map(lambda emp: (emp['dept_id'], emp)))

def join_lists((k, v)):
  itertools.product(v['employees'], v['departments'])

joined_dicts = (
    {'employees': employees, 'departments': departments} 
    | beam.CoGroupByKey()    
    | beam.FlatMap(join_lists)
    | 'mergedicts' >> beam.Map(lambda (emp_dict, dept_dict): emp_dict.update(dept_dict))
    | 'filterfields'>> beam.Map(filter_fields)
)

#2


2  

Someone has asked for a Java-based solution for this question. Here is the Java code for this. It's more verbose, but it does essentially the same thing.

有人要求为这个问题提供基于Java的解决方案。这是Java代码。它更冗长,但它的功能基本相同。

// First we want to load all departments, and put them into a PCollection
// of key-value pairs, where the Key is their identifier. We assume that it's String-type.
PCollection<KV<String, Department>> departments = 
    p.apply(new LoadDepts())
     .apply("getKey", MapElements.via((Department dept) -> KV.of(dept.getId(), dept)));

// We then convert this PCollection into a map-type PCollectionView.
// We can access this map directly within a ParDo.
PCollectionView<Map<String, Department>> departmentSideInput = 
    departments.apply("ToMapSideInput", View.<String, Department>asMap());

// We load the PCollection of employees
PCollection<Employee> employees = p.apply(new LoadEmployees());

// Let's suppose that we will *extend* an employee's information with their
// Department information. I have assumed the existence of an ExtendedEmployee
// class to represent an employee extended with department information.
class JoinDeptEmployeeDoFn extends DoFn<Employee, ExtendedEmployee> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    // We obtain the Map-type side input with department information.
    Map<String, Department> departmentMap = c.sideInput(departmentSideInput);
    Employee empl = c.element();
    Department dept = departmentMap.get(empl.getDepartmentId(), null);
    if (department == null) return;

    ExtendedEmployee result = empl.extendWith(dept);
    c.output(result);
  }

}

// We apply the ParDo to extend the employee with department information
// and specify that it takes in a departmentSideInput.
PCollection<ExtendedEmployee> extendedEmployees = 
    employees.apply(
        ParDo.of(new JoinDeptEmployeeDoFn()).withSideInput(departmentSideInput));

With CoGroupByKey, you can use dept_id as a key to group both collections. The way this looks in Beam Java SDK is a CoGbkResult.

使用CoGroupByKey,您可以使用dept_id作为键来对两个集合进行分组。在Beam Java SDK中看起来的方式是CoGbkResult。

// We load the departments, and make them a key-value collection, to Join them
// later with employees.
PCollection<KV<String, Department>> departments = 
    p.apply(new LoadDepts())
     .apply("getKey", MapElements.via((Department dept) -> KV.of(dept.getId(), dept)));

// Because we will perform a join, employees also need to be put into
// key-value pairs, where their key is their *department id*.
PCollection<KV<String, Employee>> employees = 
    p.apply(new LoadEmployees())
     .apply("getKey", MapElements.via((Employee empl) -> KV.of(empl.getDepartmentId(), empl)));

// We define a DoFn that is able to join a single department with multiple
// employees.
class JoinEmployeesWithDepartments extends DoFn<KV<String, CoGbkResult>, ExtendedEmployee> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<...> elm = c.element();
    // We assume one department with the same ID, and assume that
    // employees always have a department available.
    Department dept = elm.getValue().getOnly(departmentsTag);
    Iterable<Employee> employees = elm.getValue().getAll(employeesTag);

    for (Employee empl : employees) {
      ExtendedEmployee result = empl.extendWith(dept);
      c.output(result);
    }
  }
}

// The syntax for a CoGroupByKey operation is a bit verbose.
// In this step we define a TupleTag, which serves as identifier for a
// PCollection.
final TupleTag<String> employeesTag = new TupleTag<>();
final TupleTag<String> departmentsTag = new TupleTag<>();

// We use the PCollection tuple-tags to join the two PCollections.
PCollection<KV<String, CoGbkResult>> results =
    KeyedPCollectionTuple.of(departmentsTag, departments)
        .and(employeesTag, employees)
        .apply(CoGroupByKey.create());

// Finally, we convert the joined PCollections into a kind that
// we can use: ExtendedEmployee.
PCollection<ExtendedEmployee> extendedEmployees =
    results.apply("ExtendInformation", ParDo.of(new JoinEmployeesWithDepartments()));

#1


6  

You can do this using CoGroupByKey (where shuffle will be used), or using side inputs, if your departments collection is significantly smaller.

如果您的部门集合明显较小,则可以使用CoGroupByKey(将使用shuffle)或使用侧输入来执行此操作。

I will give you code in Python, but you can use the same pipeline in Java.

我将在Python中为您提供代码,但您可以在Java中使用相同的管道。


With side inputs, you will:

使用侧输入,您将:

  1. Convert your departments PCollection into a dictionary that maps dept_id to the department JSON dictionary.

    将您的部门PCollection转换为将dept_id映射到部门JSON字典的字典。

  2. Then you take the employees PCollection as main input, where you can use the dept_id to get the JSON for each department in the departments PCollection.

    然后,您将员工PCollection作为主要输入,您可以使用dept_id为部门PCollection中的每个部门获取JSON。

Like so:

像这样:

departments = (p | LoadDepts()
                 | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

deps_si = beam.pvalue.AsDict(departments)

employees = (p | LoadEmps())

def join_emp_dept(employee, dept_dict):
  return employee.update(dept_dict[employee['dept_id']])

joined_dicts = employees | beam.Map(join_dicts, dept_dict=deps_si)

With CoGroupByKey, you can use dept_id as a key to group both collections. This will result in a PCollection of key-value pairs where the key is the dept_id, and the value are two iterables of the department, and the employees in that department.

使用CoGroupByKey,您可以使用dept_id作为键来对两个集合进行分组。这将导致PCollection的键值对,其中键是dept_id,值是部门的两个可迭代,以及该部门中的员工。

departments = (p | LoadDepts()
               | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

employees = (p | LoadEmps()
               | 'key_emp' >> beam.Map(lambda emp: (emp['dept_id'], emp)))

def join_lists((k, v)):
  itertools.product(v['employees'], v['departments'])

joined_dicts = (
    {'employees': employees, 'departments': departments} 
    | beam.CoGroupByKey()    
    | beam.FlatMap(join_lists)
    | 'mergedicts' >> beam.Map(lambda (emp_dict, dept_dict): emp_dict.update(dept_dict))
    | 'filterfields'>> beam.Map(filter_fields)
)

#2


2  

Someone has asked for a Java-based solution for this question. Here is the Java code for this. It's more verbose, but it does essentially the same thing.

有人要求为这个问题提供基于Java的解决方案。这是Java代码。它更冗长,但它的功能基本相同。

// First we want to load all departments, and put them into a PCollection
// of key-value pairs, where the Key is their identifier. We assume that it's String-type.
PCollection<KV<String, Department>> departments = 
    p.apply(new LoadDepts())
     .apply("getKey", MapElements.via((Department dept) -> KV.of(dept.getId(), dept)));

// We then convert this PCollection into a map-type PCollectionView.
// We can access this map directly within a ParDo.
PCollectionView<Map<String, Department>> departmentSideInput = 
    departments.apply("ToMapSideInput", View.<String, Department>asMap());

// We load the PCollection of employees
PCollection<Employee> employees = p.apply(new LoadEmployees());

// Let's suppose that we will *extend* an employee's information with their
// Department information. I have assumed the existence of an ExtendedEmployee
// class to represent an employee extended with department information.
class JoinDeptEmployeeDoFn extends DoFn<Employee, ExtendedEmployee> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    // We obtain the Map-type side input with department information.
    Map<String, Department> departmentMap = c.sideInput(departmentSideInput);
    Employee empl = c.element();
    Department dept = departmentMap.get(empl.getDepartmentId(), null);
    if (department == null) return;

    ExtendedEmployee result = empl.extendWith(dept);
    c.output(result);
  }

}

// We apply the ParDo to extend the employee with department information
// and specify that it takes in a departmentSideInput.
PCollection<ExtendedEmployee> extendedEmployees = 
    employees.apply(
        ParDo.of(new JoinDeptEmployeeDoFn()).withSideInput(departmentSideInput));

With CoGroupByKey, you can use dept_id as a key to group both collections. The way this looks in Beam Java SDK is a CoGbkResult.

使用CoGroupByKey,您可以使用dept_id作为键来对两个集合进行分组。在Beam Java SDK中看起来的方式是CoGbkResult。

// We load the departments, and make them a key-value collection, to Join them
// later with employees.
PCollection<KV<String, Department>> departments = 
    p.apply(new LoadDepts())
     .apply("getKey", MapElements.via((Department dept) -> KV.of(dept.getId(), dept)));

// Because we will perform a join, employees also need to be put into
// key-value pairs, where their key is their *department id*.
PCollection<KV<String, Employee>> employees = 
    p.apply(new LoadEmployees())
     .apply("getKey", MapElements.via((Employee empl) -> KV.of(empl.getDepartmentId(), empl)));

// We define a DoFn that is able to join a single department with multiple
// employees.
class JoinEmployeesWithDepartments extends DoFn<KV<String, CoGbkResult>, ExtendedEmployee> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<...> elm = c.element();
    // We assume one department with the same ID, and assume that
    // employees always have a department available.
    Department dept = elm.getValue().getOnly(departmentsTag);
    Iterable<Employee> employees = elm.getValue().getAll(employeesTag);

    for (Employee empl : employees) {
      ExtendedEmployee result = empl.extendWith(dept);
      c.output(result);
    }
  }
}

// The syntax for a CoGroupByKey operation is a bit verbose.
// In this step we define a TupleTag, which serves as identifier for a
// PCollection.
final TupleTag<String> employeesTag = new TupleTag<>();
final TupleTag<String> departmentsTag = new TupleTag<>();

// We use the PCollection tuple-tags to join the two PCollections.
PCollection<KV<String, CoGbkResult>> results =
    KeyedPCollectionTuple.of(departmentsTag, departments)
        .and(employeesTag, employees)
        .apply(CoGroupByKey.create());

// Finally, we convert the joined PCollections into a kind that
// we can use: ExtendedEmployee.
PCollection<ExtendedEmployee> extendedEmployees =
    results.apply("ExtendInformation", ParDo.of(new JoinEmployeesWithDepartments()));