FunDA (14) - Demo: parallel computation, parallel database reading - parallel data loading

Date: 2022-10-27 15:31:15

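FunDA's parallel database reading feature reads several independent data sources simultaneously on multiple threads. These sources can be database tables on different servers, or independent partitions of a single table. The goal of parallel reading is, of course, to make the program run more efficiently. FunDA implements it by reading several independent data streams in parallel and merging them into one combined stream. We again use the AQMRPT table produced in the previous demo as sample data. In this demo we extract the STATENAME and COUNTYNAME fields from AQMRPT into two separate tables, STATE and COUNTY, with the following structure: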

    case class StateModel(id: Int, name: String) extends FDAROW
    class StateTable(tag: Tag) extends Table[StateModel](tag, "STATE") {
      def id = column[Int]("ID", O.AutoInc, O.PrimaryKey)
      def name = column[String]("NAME", O.Length())
      def * = (id, name) <> (StateModel.tupled, StateModel.unapply)
    }
    val StateQuery = TableQuery[StateTable]

    case class CountyModel(id: Int, name: String) extends FDAROW
    class CountyTable(tag: Tag) extends Table[CountyModel](tag, "COUNTY") {
      def id = column[Int]("ID", O.AutoInc, O.PrimaryKey)
      def name = column[String]("NAME", O.Length())
      def * = (id, name) <> (CountyModel.tupled, CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]

First, some setup code creates the two tables:

    //assume two distinct db objects
    val db_a = Database.forConfig("h2db")
    //another db object
    val db_b = Database.forConfig("h2db")

    //create STATE table
    val actionCreateState = Models.StateQuery.schema.create
    val futCreateState = db_a.run(actionCreateState).andThen {
      case Success(_) => println("State Table created successfully!")
      case Failure(e) => println(s"State Table may exist already! Error: ${e.getMessage}")
    }
    //carry on even if table creation fails
    Await.ready(futCreateState, Duration.Inf)

    //create COUNTY table
    val actionCreateCounty = Models.CountyQuery.schema.create
    val futCreateCounty = db_a.run(actionCreateCounty).andThen {
      case Success(_) => println("County Table created successfully!")
      case Failure(e) => println(s"County Table may exist already! Error: ${e.getMessage}")
    }
    //carry on even if table creation fails
    Await.ready(futCreateCounty, Duration.Inf)
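The setup code relies on Await.ready waiting for a Future to complete without rethrowing its failure, which is why the program carries on when a table already exists. The following is a minimal sketch of that behaviour using only the Scala standard library. The names CreateTableSketch, createTable and createAndReport are hypothetical stand-ins for illustration and are not part of FunDA or Slick.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object CreateTableSketch {
  // Hypothetical stand-in for db.run(schema.create):
  // the Future fails when the table name is already taken.
  def createTable(name: String, existing: Set[String]): Future[String] =
    if (existing.contains(name))
      Future.failed(new IllegalStateException(s"Table $name already exists"))
    else
      Future.successful(name)

  // Wait for completion, then inspect the outcome instead of letting it throw.
  def createAndReport(name: String, existing: Set[String]): String = {
    val fut = createTable(name, existing)
    // Await.ready only waits; unlike Await.result it does not rethrow a failure
    Await.ready(fut, 5.seconds)
    fut.value match {
      case Some(Success(n)) => s"$n created"
      case Some(Failure(e)) => s"skipped: ${e.getMessage}"
      case None             => "not completed"
    }
  }
}
```

Had the code used Await.result instead, a failed table creation would have thrown and stopped the program before any data was loaded.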

Next we extract STATENAME from the AQMRPT table to form a data source:

    //define query for extracting State names from AQMRPT
    val qryStates = AQMRPTQuery.map(_.state).distinct.sorted // .distinctOn(r => r)
    case class States(name: String) extends FDAROW
    implicit def toStates(row: String) = States(row)
    val stateLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toStates _)
    val statesStream = stateLoader.fda_typedStream(qryStates.result)(db_a)(,)()

Because there are many more COUNTYNAME values, we can split AQMRPT by STATENAME into three parts, A-K, K-P and P-Z, and build an independent data source from each:

    //define query for extracting County names from AQMRPT in separate chunks
    //query with state name >A and <K
    val qryCountiesA_K = AQMRPTQuery
      .filter(r => (r.state.toUpperCase > "A" && r.state.toUpperCase < "K"))
      .map(r => (r.state, r.county))
      .distinctOn(r => (r._1, r._2))
      .sortBy(r => (r._1, r._2))

    //query with state name >K and <P
    val qryCountiesK_P = AQMRPTQuery
      .filter(r => (r.state.toUpperCase > "K" && r.state.toUpperCase < "P"))
      .map(r => (r.state, r.county))
      .distinctOn(r => (r._1, r._2))
      .sortBy(r => (r._1, r._2))

    //query with state name >P
    val qryCountiesP_Z = AQMRPTQuery
      .filter(r => r.state.toUpperCase > "P")
      .map(r => (r.state, r.county))
      .distinctOn(r => (r._1, r._2))
      .sortBy(r => (r._1, r._2))

    case class Counties(state: String, name: String) extends FDAROW
    implicit def toCounties(row: (String, String)) = Counties(row._1, row._2)
    val countyLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toCounties _)
    //3 separate streams to extract county names from the same database table AQMRPT
    val countiesA_KStream = countyLoader.fda_typedStream(qryCountiesA_K.result)(db_b)(,)()
    val countiesK_PStream = countyLoader.fda_typedStream(qryCountiesK_P.result)(db_b)(,)()
    val countiesP_ZStream = countyLoader.fda_typedStream(qryCountiesP_Z.result)(db_b)(,)()
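To see which state names land in which of the three queries, the same comparisons can be replayed on plain strings. This is only a sketch: RangeSketch and bucket are hypothetical names, and it assumes the database compares strings the way Scala's String ordering does, which depends on the actual collation in use.

```scala
object RangeSketch {
  // Classify a state name with the same comparisons used in the three queries.
  def bucket(state: String): Option[String] = {
    val s = state.toUpperCase
    if (s > "A" && s < "K") Some("A_K")
    else if (s > "K" && s < "P") Some("K_P")
    else if (s > "P") Some("P_Z")
    // values equal to "K" or "P", or not greater than "A", match no query
    else None
  }
}
```

Because every bound is strict, a name that is exactly "K" or "P" would be picked up by none of the three queries. Real state names are longer than one letter, so a name such as "Kansas" compares greater than "K" and falls into the K-P part.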

We then load these four data sources in parallel:

    //obtain a combined stream by parallel loading, with a maximum of 4 open computations
    val combinedStream = fda_par_load(statesStream, countiesA_KStream, countiesK_PStream, countiesP_ZStream)()
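The idea behind parallel loading can be illustrated without FunDA: start one read per source concurrently and gather the results into a single collection. The sketch below uses standard-library Futures. ParLoadSketch and parLoad are hypothetical names, and this is not how fda_par_load is implemented; in particular, Future.sequence returns results in source order, whereas a merged stream would normally interleave rows as they arrive.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ParLoadSketch {
  // Hypothetical stand-in for parallel loading: each source is read
  // in its own Future and the results are combined into one sequence.
  def parLoad[A](sources: (() => Seq[A])*): Seq[A] = {
    val loads = sources.map(src => Future(src())) // start all reads concurrently
    Await.result(Future.sequence(loads), 10.seconds).flatten
  }
}
```

A call such as `ParLoadSketch.parLoad[String](() => Seq("s1"), () => Seq("c1", "c2"))` would read both sources on the global execution context and return their rows as one sequence.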

The combined stream now contains at least two kinds of elements: case class States and case class Counties. We can attach two user-defined tasks to combinedStream that pick out States and Counties rows respectively and turn each into an insert-action row (ActionRow):

    //define separate rows for different actions
    case class StateActionRow(action: FDAAction) extends FDAROW
    case class CountyActionRow(action: FDAAction) extends FDAROW
    val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)

    //user-task to catch rows of States type and transform them into db insert actions
    def processStates: FDAUserTask[FDAROW] = row => {
      row match {
        //catch states row and transform it into insert action
        case States(stateName) => //target row type
          println(s"State name: ${stateName}")
          val action = StateQuery += StateModel(,stateName)
          fda_next(StateActionRow(action))
        case others @ _ => //pass other types to next user-defined-tasks
          fda_next(others)
      }
    }

    //user-task to catch rows of Counties type and transform them into db insert actions
    def processCounties: FDAUserTask[FDAROW] = row => {
      row match {
        //catch counties row and transform it into insert action
        case Counties(stateName, countyName) => //target row type
          println(s"County ${countyName} of ${stateName}")
          val action = CountyQuery += CountyModel(,countyName + " of " + stateName)
          fda_next(CountyActionRow(action))
        case others @ _ => //pass other types to next user-defined-tasks
          fda_next(others)
      }
    }
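The pattern in both tasks is the same: match the one row type the task is responsible for, transform it, and hand everything else on unchanged. A simplified model of that pattern, under the assumption that a task can be seen as a function from one row to a list of rows, might look like this. TaskSketch, Task and StateInsert are hypothetical names; FunDA's real FDAUserTask type is not reproduced here.

```scala
object TaskSketch {
  sealed trait Row
  final case class States(name: String) extends Row
  final case class Counties(state: String, name: String) extends Row
  // stands in for StateActionRow, carrying just the name to insert
  final case class StateInsert(name: String) extends Row

  // Simplified stand-in for a user task: an empty list plays the role
  // of fda_skip, a one-element list plays the role of fda_next.
  type Task = Row => List[Row]

  val processStates: Task = {
    case States(n) => List(StateInsert(n)) // target type: transform it
    case other     => List(other)          // anything else: pass along unchanged
  }
}
```

Since rows of other types pass through untouched, tasks written this way can be attached in any number to the same mixed-type stream without interfering with one another.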

After processStates and processCounties have run, combinedStream carries two more kinds of elements: StateActionRow and CountyActionRow. Likewise, two more user-defined tasks execute these action rows:

    //user-task to catch States insert action rows and run them
    def runStateAction: FDAUserTask[FDAROW] = row => {
      row match {
        case StateActionRow(action) => //this is a state action row type
          println(s"runstate: ${action}")
          actionRunner.fda_execAction(action)(db_a) //run this query with db_a context
          fda_skip
        case others @ _ => //otherwise pass along to next user-defined-tasks
          fda_next(others)
      }
    }

    //user-task to catch Counties insert action rows and run them
    def runCountyAction: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyActionRow(action) => //this is a county action row type
          actionRunner.fda_execAction(action)(db_b) //run this query with db_b context
          fda_skip
        case others @ _ => //otherwise pass along to next user-defined-tasks
          fda_next(others)
      }
    }

Now we can compose these four user-defined tasks on combinedStream into a complete program:

    combinedStream.appendTask(processStates)
      .appendTask(processCounties)
      .appendTask(runStateAction)
      .appendTask(runCountyAction)
      .startRun

Calling startRun then actually runs the program.
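How a chain of appended tasks processes the rows can be modelled with a fold over a task list. This is a sketch under simplifying assumptions: PipelineSketch, run, toAction and execute are hypothetical names, a task is modelled as a function from one row to a list of rows, and the whole input is processed one task at a time rather than row by row as a real stream would.

```scala
object PipelineSketch {
  sealed trait Row
  final case class Raw(value: String) extends Row
  final case class Action(sql: String) extends Row

  type Task = Row => List[Row]

  // Apply the tasks in order; every row emitted by one task is fed to the next.
  def run(rows: List[Row], tasks: List[Task]): List[Row] =
    tasks.foldLeft(rows)((acc, task) => acc.flatMap(task))

  // Records what the terminal task "executes" so the effect can be inspected.
  val executed = scala.collection.mutable.ListBuffer.empty[String]

  // First stage: turn raw rows into action rows, pass others along.
  val toAction: Task = {
    case Raw(v) => List(Action(s"insert $v"))
    case other  => List(other)
  }

  // Final stage: consume action rows and emit nothing further.
  val execute: Task = {
    case Action(sql) =>
      executed += sql
      Nil
    case other => List(other)
  }
}
```

Running `PipelineSketch.run(List(PipelineSketch.Raw("Ohio")), List(PipelineSketch.toAction, PipelineSketch.execute))` leaves no rows at the end of the chain, because the final task consumes every action row, much as runStateAction and runCountyAction end with fda_skip.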

Here is the complete source code for this demo:

    import com.bayakala.funda._
    import api._
    import scala.language.implicitConversions
    import slick.jdbc.H2Profile.api._
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}
    import scala.util.{Failure, Success}
    import Models._
    import scala.concurrent.ExecutionContext.Implicits.global

    object ParallelLoading extends App {

      //assume two distinct db objects
      val db_a = Database.forConfig("h2db")
      //another db object
      val db_b = Database.forConfig("h2db")

      //create STATE table
      val actionCreateState = Models.StateQuery.schema.create
      val futCreateState = db_a.run(actionCreateState).andThen {
        case Success(_) => println("State Table created successfully!")
        case Failure(e) => println(s"State Table may exist already! Error: ${e.getMessage}")
      }
      //carry on even if table creation fails
      Await.ready(futCreateState, Duration.Inf)

      //create COUNTY table
      val actionCreateCounty = Models.CountyQuery.schema.create
      val futCreateCounty = db_a.run(actionCreateCounty).andThen {
        case Success(_) => println("County Table created successfully!")
        case Failure(e) => println(s"County Table may exist already! Error: ${e.getMessage}")
      }
      //carry on even if table creation fails
      Await.ready(futCreateCounty, Duration.Inf)

      //define query for extracting State names from AQMRPT
      val qryStates = AQMRPTQuery.map(_.state).distinct.sorted // .distinctOn(r => r)
      case class States(name: String) extends FDAROW
      implicit def toStates(row: String) = States(row)
      val stateLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toStates _)
      val statesStream = stateLoader.fda_typedStream(qryStates.result)(db_a)(,)()

      //define query for extracting County names from AQMRPT in separate chunks
      //query with state name >A and <K
      val qryCountiesA_K = AQMRPTQuery
        .filter(r => (r.state.toUpperCase > "A" && r.state.toUpperCase < "K"))
        .map(r => (r.state, r.county))
        .distinctOn(r => (r._1, r._2))
        .sortBy(r => (r._1, r._2))

      //query with state name >K and <P
      val qryCountiesK_P = AQMRPTQuery
        .filter(r => (r.state.toUpperCase > "K" && r.state.toUpperCase < "P"))
        .map(r => (r.state, r.county))
        .distinctOn(r => (r._1, r._2))
        .sortBy(r => (r._1, r._2))

      //query with state name >P
      val qryCountiesP_Z = AQMRPTQuery
        .filter(r => r.state.toUpperCase > "P")
        .map(r => (r.state, r.county))
        .distinctOn(r => (r._1, r._2))
        .sortBy(r => (r._1, r._2))

      case class Counties(state: String, name: String) extends FDAROW
      implicit def toCounties(row: (String, String)) = Counties(row._1, row._2)
      val countyLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toCounties _)
      //3 separate streams to extract county names from the same database table AQMRPT
      val countiesA_KStream = countyLoader.fda_typedStream(qryCountiesA_K.result)(db_b)(,)()
      val countiesK_PStream = countyLoader.fda_typedStream(qryCountiesK_P.result)(db_b)(,)()
      val countiesP_ZStream = countyLoader.fda_typedStream(qryCountiesP_Z.result)(db_b)(,)()

      //obtain a combined stream by parallel loading, with a maximum of 4 open computations
      val combinedStream = fda_par_load(statesStream, countiesA_KStream, countiesK_PStream, countiesP_ZStream)()

      //define separate rows for different actions
      case class StateActionRow(action: FDAAction) extends FDAROW
      case class CountyActionRow(action: FDAAction) extends FDAROW
      val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)

      //user-task to catch rows of States type and transform them into db insert actions
      def processStates: FDAUserTask[FDAROW] = row => {
        row match {
          //catch states row and transform it into insert action
          case States(stateName) => //target row type
            println(s"State name: ${stateName}")
            val action = StateQuery += StateModel(,stateName)
            fda_next(StateActionRow(action))
          case others @ _ => //pass other types to next user-defined-tasks
            fda_next(others)
        }
      }

      //user-task to catch rows of Counties type and transform them into db insert actions
      def processCounties: FDAUserTask[FDAROW] = row => {
        row match {
          //catch counties row and transform it into insert action
          case Counties(stateName, countyName) => //target row type
            println(s"County ${countyName} of ${stateName}")
            val action = CountyQuery += CountyModel(,countyName + " of " + stateName)
            fda_next(CountyActionRow(action))
          case others @ _ => //pass other types to next user-defined-tasks
            fda_next(others)
        }
      }

      //user-task to catch States insert action rows and run them
      def runStateAction: FDAUserTask[FDAROW] = row => {
        row match {
          case StateActionRow(action) => //this is a state action row type
            println(s"runstate: ${action}")
            actionRunner.fda_execAction(action)(db_a) //run this query with db_a context
            fda_skip
          case others @ _ => //otherwise pass along to next user-defined-tasks
            fda_next(others)
        }
      }

      //user-task to catch Counties insert action rows and run them
      def runCountyAction: FDAUserTask[FDAROW] = row => {
        row match {
          case CountyActionRow(action) => //this is a county action row type
            actionRunner.fda_execAction(action)(db_b) //run this query with db_b context
            fda_skip
          case others @ _ => //otherwise pass along to next user-defined-tasks
            fda_next(others)
        }
      }

      //user-task that prints States and Counties rows (defined here but not appended below)
      def showRows: FDAUserTask[FDAROW] = row => {
        row match {
          case States(nm) =>
            println("")
            println(s"State: $nm")
            println("************")
            fda_skip
          case Counties(s, c) =>
            println("")
            println(s"County: $c")
            println(s"state of $s")
            println("------------")
            fda_skip
          case _ => fda_skip
        }
      }

      combinedStream.appendTask(processStates)
        .appendTask(processCounties)
        .appendTask(runStateAction)
        .appendTask(runCountyAction)
        .startRun
    }
