大数据周会-本周学习内容总结05

时间:2022-02-22 01:07:43

目录

00【陈师兄字节大厂经验分享】

01【MySQL数据同步ES】

1.1【Linux MySQL数据库数据同步Windows ES】

1.1.1【自动创建分片数为0的索引】

1.1.2【数据导入时间过长】

1.2【数据同步,增删改】

1.3【遇到的细节问题】

1.3.1【logstash配置文件,索引名必须小写】

1.4【Linux搭建es】

02【调研】

2.1【Mysql-ES 全量—增量更新机制并实现】

2.2【数据检索(也叫超市或中台)】


00【陈师兄字节大厂经验分享】

大数据
书:《大数据之路》阿里巴巴

大数据,计算和存储,分别用什么!

一致性模型和一致性协议

数据同步:dts canal datax sqoop

数据倾斜方面数据优化

拍照存储计算选型
存储 纠删码

省钱 减少数据资源

监控binlog日志,标计es,es增加一列,查询的时候根据列显示数据!
canal   logstash   并发流读取删除

计算:spark  sparkstreaming  flink  kafka
flume scpoop监听日志文件夹

rabbitmq保证数据一致性,kafka会丢失数据

数据调度,面经,肯定要问!

01【MySQL数据同步ES】

1.1【Linux MySQL数据库数据同步Windows ES】

步骤

  1. MySQL数据库Jar包
  2. xxx.conf配置文件,配置MySQL数据库信息与ES信息
  3. logstash -f ../config/gaokao/mysql03.conf
input {
	stdin {
    }

    jdbc { # 01
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from course_match_requirementcode"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "course_match_requirementcode"
	}

    jdbc { # 02
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from enroll_plan_finally"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "enroll_plan_finally"
	}

    jdbc { # 03
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from epp"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "epp"
	}

    jdbc { # 04
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from id_2022_2021"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "id_2022_2021"
	}

    jdbc { # 05
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from major_22_21"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "major_22_21"
	}

    jdbc { # 06
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from major_info_category"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "major_info_category"
	}

    jdbc { # 07
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from major_name_hot_cold"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "major_name_hot_cold"
	}

    jdbc { # 08
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from major_score"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "major_score"
	}

    jdbc { # 09
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from recommend_form"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "recommend_form"
	}

    jdbc { # 10
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from recommend_form_2021"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "recommend_form_2021"
	}

    jdbc { # 11
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from requirement_code"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "requirement_code"
	}

    jdbc { # 12
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from school_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "school_info"
	}

    jdbc { # 13
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://x.x.x.x:3306/recommend_form"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "hadoop"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "D:\\elk\\logstash\\logstash-7.1.1\\jar\\mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from sectionscore"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "sectionscore"
	}
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
	if[type] == "course_match_requirementcode" { # 01
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_course_match_requirementcode"
			document_id => "%{id}"
		}
	}

	if[type] == "enroll_plan_finally" { # 02
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_enroll_plan_finally"
			document_id => "%{id}"
		}
	}

	if[type] == "epp" { # 03
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_epp"
			document_id => "%{id}"
		}
	}

	if[type] == "id_2022_2021" { # 04
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_id_2022_2021"
			document_id => "%{id}"
		}
	}

	if[type] == "major_22_21" { # 05
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_major_22_21"
			document_id => "%{id}"
		}
	}

	if[type] == "major_info_category" { # 06
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_major_info_category"
			document_id => "%{id}"
		}
	}

	if[type] == "major_name_hot_cold" { # 07
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_major_name_hot_cold"
			document_id => "%{id}"
		}
	}

	if[type] == "major_score" { # 08
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_major_score"
			document_id => "%{id}"
		}
	}

	if[type] == "recommend_form" { # 09
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_recommend_form"
			document_id => "%{id}"
		}
	}

	if[type] == "recommend_form_2021" { # 10
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_recommend_form_2021"
			document_id => "%{id}"
		}
	}

	if[type] == "requirement_code" { # 11
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_requirement_code"
			document_id => "%{id}"
		}
	}

	if[type] == "school_info" { # 12
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_school_info"
			document_id => "%{id}"
		}
	}

	if[type] == "sectionscore" { # 13
		elasticsearch {
			hosts => ["127.0.0.1:9200"]
			index => "gaokao_test_section_score"
			document_id => "%{id}"
		}
	}

    stdout {
        codec => json_lines
    }
}

1.1.1【自动创建分片数为0的索引】

es中若未创建索引,则Logstash会根据配置文件xxx.conf中的信息自动创建索引:

大数据周会-本周学习内容总结05

大数据周会-本周学习内容总结05

查询数据,enroll_plan_finally数据库,在数据浏览页面数据字段展示不完全。

大数据周会-本周学习内容总结05

进行基本查询,可以查到单条数据的所有字段。

大数据周会-本周学习内容总结05

1.1.2【数据导入时间过长】

大数据周会-本周学习内容总结05

1.2【数据同步,增删改】

修改

大数据周会-本周学习内容总结05

1.3【遇到的细节问题】

1.3.1【logstash配置文件,索引名必须小写】

大数据周会-本周学习内容总结05

1.4【Linux搭建es】

大数据周会-本周学习内容总结05

大数据周会-本周学习内容总结05 大数据周会-本周学习内容总结05

大数据周会-本周学习内容总结05

02【调研】

2.1【Mysql-ES 全量—增量更新机制并实现】

ELK 是目前业界使用最广泛的日志数据处理平台。

调研文档:

  1. logstash jdbc全量更新与增量更新_我在北国不背锅的博客-CSDN博客,时间戳、唯一主键id
  2. centos7配置Logstash同步Mysql数据到Elasticsearch - JavaClub全栈架构师技术笔记

第一次同步时需要全量的数据,之后则需要定时去同步增量数据。1、根据唯一主键,2、根据时间戳。

2.2【数据检索(也叫超市或中台)】

数据检索

  1. 概念:数据检索即把数据库中存储的数据根据用户的需求提取出来。数据检索的结果会生成一个数据表,既可以放回数据库,也可以作为进一步处理的对象。
  2. 工作流程:先排序再筛选。
  3. 检索方法:顺序检索、对分检索、索引检索。

数据中台是对既有/新建信息化系统业务与数据的沉淀,是实现数据赋能新业务、新应用的中间、支撑性平台。

各种信息系统大多是独立建设的,无法做到信息的互联互通,导致形成了多个数据孤岛。数据中台的作用是融合新老信息,整合各个孤岛上的信息,快速形成数据服务能力,为企业经营决策、精细化运营提供支持。

数据中台详解:数据中台详解

大数据周会-本周学习内容总结05

数据中台解决方案:数据中台解决方案-最新全套文件_数据中台 技术方案

大数据周会-本周学习内容总结05