使用Spark+Cassandra打造高性能数据分析平台

时间:2022-07-28 19:39:34

Cassandra是一个分布式、高可扩展的数据库,用户可以创建线上应用程序,实时处理大量数据。

  Apache Spark是应用于Hadoop集群的处理引擎,在内存条件下可以为Hadoop加速100倍,在磁盘上运行时也能实现十倍的加速。Spark还提供SQL、流数据处理、机器学习和图型计算等功能。
  Cassandra与Spark的结合,让端到端的分析工作流的实现更为容易。另外,交易型数据库的分析性能也能得到很大的提升,企业可以更快地响应客户需求。
  对于需要向客户提供实时推荐和个性化的在线体验的公司,Cassandra与Spark的结合堪称福音。
Hadoop扩展性有余,实时性不足。Storm这样的实时流处理框架,但它只有处理固定的流程时才具有优势,弹性查询能力欠佳。
 现在Ooyala正在运行的就是Spark/Cassandra架构



1. Cassandra

NoSQL数据库的选择之痛,目前市面上有近150多种NoSQL数据库,如何在这么庞杂的队伍中选中适合业务场景的佼佼者,实非易事。

好的是经过大量的筛选,大家比较肯定的几款NoSQL数据库分别是HBase、MongoDB和Cassandra。

Cassandra在哪些方面吸引住了大量的开发人员呢?下面仅做一个粗略的分析。

1.1 高可靠性

Cassandra采用gossip作为集群中结点的通信协议,该协议整个集群中的节点都处于同等地位,没有主从之分,这就使得任一节点的退出都不会导致整个集群失效。

Cassandra和HBase都是借鉴了Google BigTable的思想来构建自己的系统,但Cassandra另一重要的创新就是将原本存在于文件共享架构的p2p(peer to peer)引入了NoSQL。

P2P的一大特点就是去中心化,集群中的所有节点享有同等地位,这极大避免了单个节点退出而使整个集群不能工作的可能。

与之形成对比的是HBase采用了Master/Slave的方式,这就存在单点失效的可能。

1.2 高可扩性

随着时间的推移,集群中原有的规模不足以存储新增加的数据,此时进行系统扩容。Cassandra级联可扩,非常容易实现添加新的节点到已有集群,操作简单。

1.3 最终一致性

分布式存储系统都要面临CAP定律问题,任何一个分布式存储系统不可能同时满足一致性(consistency),可用性(availability)和分区容错性(partition tolerance)。

Cassandra是优先保证AP,即可用性和分区容错性。

使用Spark+Cassandra打造高性能数据分析平台 

Cassandra为写操作和读操作提供了不同级别的一致性选择,用户可以根据具体的应用场景来选择不同的一致性级别。

1.4 高效写操作

写入操作非常高效,这对于实时数据非常大的应用场景,Cassandra的这一特性无疑极具优势。

数据读取方面则要视情况而定:

  • 如果是单个读取即指定了键值,会很快的返回查询结果。
  • 如果是范围查询,由于查询的目标可能存储在多个节点上,这就需要对多个节点进行查询,所以返回速度会很慢
  • 读取全表数据,非常低效。

1.5 结构化存储

Cassandra是一个面向列的数据库,对那些从RDBMS方面转过来的开发人员来说,其学习曲线相对平缓。

Cassandra同时提供了较为友好CQL语言,与SQL语句相似度很高。

1.6 维护简单

从系统维护的角度来说,由于Cassandra的对等系统架构,使其维护操作简单易行。如添加节点,删除节点,甚至于添加新的数据中心,操作步骤都非常的简单明了。

参考资料

  • 1.http://cassandra.apache.org
  • 2.http://www.datastax.com/doc
  • 3.http://planetcassandra.org/documentation/

2. Cassandra数据模型

2.1 单表查询

2.1.1 单表主键查询

在建立个人信息数据库的时候,以个人身份证id为主键,查询的时候也只以身份证为关键字进行查询,则表可以设计成为:

create table person (
userid text primary key,
fname text,
lname text,
age int,
gender int);

Primary key中的第一个列名是作为Partition key。也就是说根据针对partition key的hash结果决定将记录存储在哪一个partition中,如果不湊巧的情况下单一主键导致所有的hash结果全部落在同一分区,则会导致该分区数据被撑满。

解决这一问题的办法是通过组合分区键(compsoite key)来使得数据尽可能的均匀分布到各个节点上。

举例来说,可能将(userid,fname)设置为复合主键。那么相应的表创建语句可以写成

create table person (
userid text,
fname text,
lname text,
gender int,
age int,
primary key((userid,fname),lname);
) with clustering order by (lname desc);

稍微解释一下primary key((userid, fname),lname)的含义:

  • 其中(userid,fname)称为组合分区键(composite partition key)
  • lname是聚集列(clustering column)
  • ((userid,fname),lname)一起称为复合主键(composite primary key)

2.1.2 单表非主键查询

如果要查询表person中具有相同的first name的人员,那么就必须针对fname创建相应的索引,否则查询速度会非常缓慢。

Create index on person(fname);

Cassandra目前只能对表中的某一列建立索引,不允许对多列建立联合索引。

2.2 多表关联查询

Cassandra并不支持关联查询,也不支持分组和聚合操作。

那是不是就说明Cassandra只是看上去很美其实根本无法解决实际问题呢?答案显然是No,只要你不坚持用RDBMS的思路来解决问题就是了。

比如我们有两张表,一张表(Departmentt)记录了公司部门信息,另一张表(employee)记录了公司员工信息。显然每一个员工必定有归属的部门,如果想知道每一个部门拥有的所有员工。如果是用RDBMS的话,SQL语句可以写成:

select * from employee e , department d where e.depId = d.depId;
要用Cassandra来达到同样的效果,就必须在employee表和department表之外,再创建一张额外的表(dept_empl)来记录每一个部门拥有的员工信息。

Create table dept_empl (
deptId text,

看到这里想必你已经明白了,在Cassandra中通过数据冗余来实现高效的查询效果。将关联查询转换为单一的表操作。

2.3 分组和聚合

在RDBMS中常见的group by和max、min在Cassandra中是不存在的。

如果想将所有人员信息按照姓进行分组操作的话,那该如何创建数据模型呢?

Create table fname_person (
fname text,
userId text,
primary key(fname)
);
 2.4 子查询  

Cassandra不支持子查询,下图展示了一个在MySQL中的子查询例子: 

使用Spark+Cassandra打造高性能数据分析平台 

要用Cassandra来实现,必须通过添加额外的表来存储冗余信息。 

Create table office_empl (
officeCode text,
country text,
lastname text,
firstname,
primary key(officeCode,country));
create index on office_empl(country);

 2.5 小结

总的来说,在建立Cassandra数据模型的时候,要求对数据的读取需求进可能的清晰,然后利用反范式的设计方式来实现快速的读取,原则就是以空间来换取时间。


Cassandra高并发数据读取实现剖析

本文就spark-cassandra-connector的一些实现细节进行探讨,主要集中于如何快速将大量的数据从Cassandra中读取到本地内存或磁盘。

数据分区

存储在Cassandra中的数据一般都会比较多,记录数在千万级别或上亿级别是常见的事。如何将这些表中的内容快速加载到本地内存就是一个非常现实的问题。

解决这一挑战的思路从大的方面来说是比较简单的,那就是将整张表中的内容分成不同的区域,然后分区加载,不同的分区可以在不同的线程或进程中加载,利用并行化来减少整体加载时间。

顺着这一思路出发,要问的问题就是Cassandra中的数据如何才能分成不同的区域。

不同于MySQL,在Cassandra中是不存在Sequence Id这样的类型的,也就是说无法简单的使用seqId来指定查询或加载的数据范围。

既然没有SequenceID,在Cassandra中是否就没有办法了呢?答案显然是否定的,如果只是仅仅支持串行读取,Cassandra早就会被扔进垃圾桶了。

数据分区在Cassandra中至少可以通过两种途径实现 ,一是通过token range,另一个是slice range。这里主要讲解利用token range来实现目的。

1. Token Range

Cassandra将要存储的记录存储在不同的区域中,判断某一记录具体存储在哪个区域的依据是partition key的Hash值。

在Cassandra 1.2之前,组成Cassandra集群的所有节点(Node),都需要手动指定该节点的Hash值范围也就是Token Range。

手工计算Token Range显然是很繁琐,同时也不怎么容易维护,在Cassandra 1.2之后,引进了虚拟节点(vnode)的概念,主要目的是减少不必要的人工指定,同时也将token range的划分变得更为细粒度。比如原先手工指定token range,只能达到10000这样一个精度,而有了vnode之后,默认安装是每一个物理节点上有256个虚拟节点,这样子的话每一个range的范围就是10000/256,这样变的更为精细。

有关token range的信息存储在cassandra的system命名空间(keyspace)下的local和peers两张表中。其中local表示本节点的token range情况,而peers表示集群中其它节点的token range情况。这两张表中的tokens字段就存储有详细的信息。如果集群中只由一台机器组成,那么peers中的就会什么内容都没有。

简单实验,列出本节点的token range:

use system;
desc table local;
select tokens from local;

2. Thrift接口

Token Range告诉我们Cassandra的记录是分片存储的,也就意味着可以分片读取。现在的问题转换成为如何知道每一个Token Range的起止范围。

Cassandra支持的Thrift接口中describe_ring就是用来获取token range的具体起止范围的。我们常用的nodetool工具使用的就是thrift接口,nodetool 中有一个describering指令使用的就是describe_ring原语。

可以做一个简单的实验,利用nodetool来查看某个keyspace的token range具体情况。

nodetool -hcassandra_server_addr describering keyspacename

注意将cassandra_server和keyspacename换成实际的内容。

Spark-Cassandra-Connector

在第一节中讲解了Cassandra中Token Range信息的存储位置,以及可以使用哪些API来获取token range信息。

接下来就分析spark-cassandra-connector是如何以cassandra为数据源将数据加载进内存的。

以简单的查询语句为例,假设用户要从demo这个keyspace的tableX表中加载所有数据,用CQL来表述就是:

select * from demo.tableX

上述的查询使用spark-cassandra-connector来表述就是:

sc.cassandraTable(“demo”,”tableX”)

尽管上述语句没有触发Spark Job的提交,也就是说并不会将数据直正的从Cassandra的tableX表中加载进来,但spark-cassandra-connector还是需要进行一些数据库的操作。要解决的主要问题就是schema相关。

cassandraTable(“demo”,”tableX”)只是说要从tableX中加载数据,并没有告诉connector有哪些字段,每个字段的类型是什么。这些信息对后面使用诸如get[String](“fieldX”)来说却是非常关键的。

为了获取字段类型信息的元数据,需要读取system.schema_columns表,利用如下语句可以得到schema_columns表结构的详细信息:

desc table system.schema_columns

如果在conf/log4j.properties中将日志级别设置为DEBUG,然后再执行sc.cassandraTable语句就可以看到具体的CQL查询语句是什么。

1. CassandraRDDPartitioner

Spark-cassandra-connector添加了一种新的RDD实现,即CassandraRDD。我们知道对于一个Spark RDD来说,非常关键的就是确定getPartitions和compute函数。

getPartitions函数会调用CassandraRDDPartitioner来获取分区数目:

override def getPartitions: Array[Partition] = {
verify // let's fail fast
val tf = TokenFactory.forCassandraPartitioner(cassandraPartitionerClassName)
val partitions = new CassandraRDDPartitioner(connector, tableDef, splitSize)(tf).partitions(where)
logDebug(s"Created total ${partitions.size} partitions for $keyspaceName.$tableName.")
logTrace("Partitions: \n" + partitions.mkString("\n"))
partitions
}

CassandraRDDPartitioner中的partitions的处理逻辑大致如下:

  1. 首先确定token range,使用describe_ring
  2. 然后根据Cassandra中使用的Partitioner来确定某一个token range中可能的记录条数,这么做的原因就是为进一步控制加载的数据,提高并发度。否则并发度就永远是256了,比如有一个物理节点,其中有256个vnodes,也就是256个token分区。如果每个分区中大致的记录数是20000,而每次加载最大只允许1000的话,整个数据就可以分成256x2=512个分区。
  3. 对describeRing返回的token range进一步拆分的话,需要使用splitter,splitter的构建需要根据keyspace中使用了何种Partitioner来决定,Cassandra中默认的Partitioner是Murmur3Partitioner,Murmur3Hash算法可以让Hash值更为均匀的分布到不同节点。
  4. splitter中会利用到配置项spark.cassandra.input.split.size和spark.cassandra.page.row.size,分别表示一个线程最多读取多少记录,另一个表示每次读取多少行。

partitions的源码详见CasssandraRDDParitioner.scala

compute函数就利用确定的token的起止范围来加载内容,这里在理解的时候需要引起注意的就是flatMap是惰性执行的,也就是说只有在真正需要值的时候才会被执行,延迟触发。

数据真正的加载是发生在fetchTokenRange函数,这时使用到的就是Cassandra Java Driver了,平淡无奇。

2. fetchTokenRange

fetcchTokenRange函数使用Cassandra Java Driver提供的API接口来读取数据,利用Java API读取数据一般遵循以下步骤:

val cluster = ClusterBuilder.addContactPoint(“xx.xx.xx.xx”).build
val session = cluster.connect
val stmt = new SimpleStatement(queryCQL)
session.execute(session)
session.close
cluster.close

addContactPoint的参数是cassandra server的ip地址,在后面真正执行cql语句的时候,如果集群有多个节点构成,那么不同的cql就会在不同的节点上执行,自动实现了负载均衡。可以在addContactPoint的参数中设定多个节点的地址,这样可以防止某一节点挂掉,无法获取集群信息的情况发生。

session是线程安全的,在不同的线程使用同一个session是没有问题的,建议针对一个keySpace只使用一个session。

3. RDD中使用Session

在Spark RDD中是无法使用SparkContext的,否则会形成RDD嵌套的现象,因为利用SparkContext很容易构造出RDD,如果在RDD的函数中如map中调用SparkContext创建一个新的RDD,则形成深度嵌套进而导致Spark Job有嵌套。

但在实际的情况下,我们需要根据RDD中的值再去对数据库进行操作,那么有什么办法来打开数据库连接呢?

解决的办法就是直接使用Cassandra Java Driver而不再使用spark-cassandra-connector的高级封装,因为不能像这样子来使用cassandraRDD。

sc.cassandraRDD(“ks”,”tableX”)
.map(x=>sc.cassandraRDD(“ks”,”tableX”).where(filter))

如果是直接使用Cassandra Java Driver,为了避免每个RDD中的iterator都需要打开一个session,那么可以使用foreachPartition函数来进行操作,减少打开的session数。

val  rdd1 = sc.cassandraTable(“keyspace”,”tableX”)
rdd1.foreachPartition( lst => {
val cluster = ClusterBuilder.addContactPoint(“xx.xx.xx.xx”).build
val session = cluster.connect
while ( iter.hasNext ) {
val elem = iter.next
//do something by using session and elem
}
session.close
cluster.close
})

其实最好的办法是在外面建立一个session,然后在不同的partition中使用同一个session,但这种方法不行的原因是在执行的时候会需要”Task not Serializable”的错误,于是只有在foreachPartition函数内部新建session。

数据备份

尽管Cassandra号称可以做到宕机时间为零,但为了谨慎起见,还是需要对数据进行备份。

Cassandra提供了几种备份的方法

  1. 将数据导出成为json格式
  2. 利用copy将数据导出为csv格式
  3. 直接复制sstable文件

导出成为json或csv格式,当表中的记录非常多的时候,这显然不是一个好的选择。于是就只剩下备份sstable文件了。

问题是将sstable存储到哪里呢?放到HDFS当然没有问题,那有没有可能对放到HDFS上的sstable直接进行读取呢,在没有经过任务修改的情况下,这是不行的。

试想一下,sstable的文件会被拆分为多个块而存储到HDFS中,这样会破坏记录的完整性,HDFS在存储的时候并不知道某一block中包含有完成的记录信息。

为了做到记录信息不会被拆分到多个block中,需要根据sstable的格式自行提取信息,并将其存储到HDFS上。这样存储之后的文件就可以被并行访问。

Cassandra中提供了工具sstablesplit来将大的sstable分割成为小的文件。

DataStax的DSE企业版中提供了和Hadoop及Spark的紧密结合,其一个很大的基础就是先将sstable的内容存储到CFS中,大体的思路与刚才提及的应该差不多。

对sstable存储结构的分析是一个研究的热门,可以参考如下的链接。

之所以要研究备份策略是想将对数据的分析部分与业务部分相分离开,避免由于后台的数据分析导致Cassandra集群响应变得缓慢而致前台业务不可用,即将OLTP和OLAP的数据源分离开。

通过近乎实时的数据备份,后台OLAP就可以使用Spark来对数据进行分析和处理。

高级查询 Cassandra+Solr

与传统的RDBMS相比,Cassandra所能提供的查询功能实在是弱的可以,如果想到实现非常复杂的查询功能的,需要将Cassandra和Solr进行结合。

DSE企业版提供了该功能,如果想手工搭建的话,可以参考下面的链接:

  1. http://www.slideshare.net/planetcassandra/an-introduction-to-distributed-search-with-cassandra-and-solr 
  2. https://github.com/Stratio/stratio-cassandra 开源方面的尝试 Cassandra和Lucene的结合

共享SparkContext

SparkContext可以被多个线程使用,这意味着同个Spark Application中的Job可以同时提交到Spark Cluster中,减少了整体的等待时间。

在同一个线程中, Spark只能逐个提交Job,当Job在执行的时候,Driver Application中的提交线程是处于等待状态的。如果Job A没有执行完,Job B就无法提交到集群,就更不要提分配资源真正执行了。

那么如何来减少等待时间呢,比如在读取Cassandra数据的过程中,需要从两个不同的表中读取数据,一种办法就是先读取完成表A与读取表B,总的耗时是两者之和。

如果利用共享SparkContext的技术,在不同的线程中去读取,则耗时只是两者之间的最大值。

在Scala中有多种不同的方式来实现多线程,现仅以Future为例来说明问题:

val ll  = (1 to 3 toList).map(x=>sc.makeRDD(1 to 100000 toList, 3))
val futures = ll.map ( x => Future {
x.count()
})
val fl = Future.sequencce(futures)
Await.result(fl,3600 seconds)
  1. 简要说明一下代码逻辑
  2. 创建三个不同的RDD
  3. 在不同的线程(Future)中通过count函数来提交Job
  4. 使用Await来等待Future执行结束

Token是为数据中心中某一特定节点分配某一范围的数据的依据。

当启动一个Cassandra的集群,必须选择数据在集群中节点是如何分布的。partitioner是根据数据的key来决定这行数据存储在哪个节点上。token是独立与partitioner的。每一个节点都会分配一个token,这个token决定了节点在环中的位置,以及哪些数据会存储在这个节点上。分配给节点的token需要分布在所有的token可能的范围中。每一个token确定的范围是从前一个token开始,顺时针到自己的左开右闭区间。一个简单的例子:如果全部的token范围是0-100,而且集群中有四个节点。那可能每个节点的token为0,25,50,75。这种方法,保证了每个token确定的数据范围大小是一样的。每一个数据中心都应该作为一个独立的环来划分。

Note:集群中的每个节点在第一次启动之前必须设置好token,token的设置在cassandra.yaml配置文件中的initial_token项。

Token生成工具

DataStax提供了一个python脚本,用来自动生成token,token是0到2^127 – 1之间的整数。

具体步骤如下:

  1. 这里下载脚本,名为tokengentool
  2. 修改为可执行,chmod +x tokengentool
  3. 执行生成:./tokengentool <nodes_num_in_dc1> <nodes_num_in_dc2>
  4. 将生成的token分配给每个node的initial_token
token 是Cassandra 集群中十分重要的概念,因为他影响着每个节点所管辖的数据的范围:我们就利用程序来生成token然后分配给每个节点:

我们用下面的代码来生成token:

 
  
  1. #! /usr/bin/python 
  2. import sys 
  3. if (len(sys.argv) > 1): 
  4. num=int(sys.argv[1]) 
  5. else
  6. num=int(raw_input("How many nodes are in your cluster? ")) 
  7. for i in range(0, num): 
  8. print 'token %d: %d' % (i, (i*(2**127)/num)) 

然后我们保存到tokengentool,并且让其有可执行权 (chmod +x tokengentool):

我们运行这段程序,于是产生:

使用Spark+Cassandra打造高性能数据分析平台

这里产生了3个token,我们手动将他们赋给3个节点,通过编辑每个节点的cassandra.yaml文件:

我们把192.168.129.34的节点的initial_token设为token 0的值,192.168.129.35的节点的initial_token设为token 1的值,192.168.129.39的节点的initial_token设为token 2的值

再把3个节点都重启就可以了




为一个数据中心生成token

当我们只有一个数据中心的时候,使用RadomPartitioner,输入节点的数量即可。例如,我们有6个节点:

./tokengentool 6

然后会有如下的结果:

{
"0": {
"0": 0,
"1": 28356863910078205288614550619314017621,
"2": 56713727820156410577229101238628035242,
"3": 85070591730234615865843651857942052864,
"4": 113427455640312821154458202477256070485,
"5": 141784319550391026443072753096570088106
}
}

大家可以发现,基本是按照平均计算的

在一个数据中心,多机架的情况下,计算token

如果在一个数据中心中,多个机架,和之前一样,输入节点的数量,生成token。然后以交替的顺序分配给不同机架的节点。例如rack1,rack2,rack3,rack1,rack2,rack3等,作为一个最佳实践,每个机架上的服务器数量应该相同,这样可以均匀的交替分配。举例如下:

./tokengentool 8

具体分配如下图:

使用Spark+Cassandra打造高性能数据分析平台

为多数据中心生成token

在多数据中心部署的情况下,数据在每一个数据中心的存储策略需要根据NetworkTopologyStrategy。这个策略在不同的数据中心中独立决定数据存储的方式。第一个复本的存储会根据partitioner找到相应的节点,剩下的节点的选择是顺时针沿着环走,直到遇到一个和前面一个复本在不同机架的节点为止,这个节点就是要找的存储节点。如果这样的节点不存在,那么所有的复本都存在同一个机架上面了。具体可以详细参考NetworkTopologyStrategy。

当计算多数据中心的token的时候,可以有很多不同的方法。每一个数据中心内的节点都管理这等量的数据,这是重要的。而数据中心内的节点分布却不是那么重要。

交替的分配Token

使用tokengentool计算出token,并且将token交替的分配给不同数据中心的节点。计算方法如下,两个数据中心,每个三个节点:

./tokengentool 3 3

结果如下:

{
"0": {
"0": 0,
"1": 56713727820156410577229101238628035242,
"2": 113427455640312821154458202477256070485
},
"1": {
"0": 28356863910078205288614550619314017621,
"1": 85070591730234615865843651857942052863,
"2": 141784319550391026443072753096570088106
}
}

下图阐述了交替分配的结果:

使用Spark+Cassandra打造高性能数据分析平台

 

避免token冲突

避免token冲突的方法是将计算出来的token值做一些偏移,尽管你可以只增加1,但最好还是增加的大一些,比如100,这样可以有空间替换宕掉的节点。

下面的例子是两个具有三个节点的数据中心和一个有两个节点的数据中心:

./tokengentool 3

   {
"0": {
"0": 0,
"1": 56713727820156410577229101238628035242,
"2": 113427455640312821154458202477256070485
}
}

./tokentool 2

{
"0": {
"0": 0,
"1": 85070591730234615865843651857942052864
}
}

具体分配的图如下:

使用Spark+Cassandra打造高性能数据分析平台

要在生产系统中运维,则数据如何分布不得不做周详细致的考虑。

将Cassandra用于实际的生成环境,一个必须要考虑的关键问题是Token的选择。Token决定了每个节点存储的数据的分布范围,每个节点保存的数据的key在(前一个节点Token,本节点Token]的半开半闭区间内,所有的节点形成一个首尾相接的环,所以第一个节点保存的是大于最大Token小于等于最小Token之间的数据。

根据采用的分区策略的不同,Token的类型和设置原则也有所不同。 Cassandra (0.6版本)本身支持三种分区策略:

RandomPartitioner:随机分区是一种hash分区策略,使用的Token是大整数型(BigInteger),范围为0~2^127,因此极端情况下,一个采用随机分区策略的Cassandra集群的节点可以达到2^127+1个节点。嗯,为什么是2^127?因为Cassandra采用了MD5作为hash函数,其结果是128位的整数值(其中一位是符号位,Token取绝对值为结果)。采用随机分区策略的集群无法支持针对Key的范围查询。假如集群有N个节点,每个节点的hash空间采取平均分布的话,那么第i个节点的Token可以设置为:

 i * ( 2 ^ 127 / N )

下面的测试程序是从org.apache.cassandra.utils.FBUtilities类抽取出来的计算MD5值的函数,输入任何字符都可以得到其对应的MD5的整数值,利用该值和节点的Token对比即可知道该Key对应的数据归属于哪个节点:

import java.io.*;
import java.util.*;
import java.math.BigInteger;
import java.security.MessageDigest;

class get_md5{
static final Scanner cin=new Scanner(System.in);

public static byte[] hash(String type, byte[]... data){
byte[] result = null;
try{
MessageDigest messageDigest = MessageDigest.getInstance(type);
for(byte[] block : data)
messageDigest.update(block);
result = messageDigest.digest();
}
catch (Exception e){
throw new RuntimeException(e);
}
return result;
}

public static BigInteger hash(String data){
byte[] result = hash("MD5", data.getBytes());
BigInteger hash = new BigInteger(result);
return hash.abs();
}

public static void main(String[] args){
while(cin.hasNext()){
String str1=cin.next();
BigInteger a= hash(str1);
System.out.println(a);
}
}
}
D:>java get_md5
ningoo
100335222541762605209205022078301814192
江枫
48295316926871024838894171432474082043

OrderPreservingPartitioner:如果要支持针对Key的范围查询,那么可以选择这种有序分区策略。该策略采用的是字符串类型的Token。每个节点的具体选择需要根据Key的情况来确定。如果没有指定InitialToken,则系统会使用一个长度为16的随机字符串作为Token,字符串包含大小写字符和数字。

CollatingOrderPreservingPartitioner:和OrderPreservingPartitioner一样是有序分区策略。只是排序的方式不一样,采用的是字节型Token,支持设置不同语言环境的排序方式,代码中默认是en_US。

分区策略和每个节点的Token(Initial Token)都可以在storage-conf.xml配置文件中设置:

<Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>
<InitialToken>10633823966279300000000000000000000000</InitialToken>

节点初始化完成以后,Token值做为元数据会保留在system keyspace中,每次启动会以该值为准,即使再改动配置文件中的InitialToken也不会产生任何影响。

Saved Token found: 10633823966279300000000000000000000000

通过nodetool的ring命令,可以查看集群各个节点的Token,这些Token值最好备份下来,当出现节点彻底顺坏时,可以重新设置同样的Token,确保数据分布可以不受节点损坏的影响。

nodetool -h test ring
Address Status Load Range Ring
85070591730234600000000000000000000000
192.168.0.1 Up 0 bytes 10633823966279300000000000000000000000 |<--|
192.168.0.2 Up 0 bytes 85070591730234600000000000000000000000 |-->|

PS: 在我的0.6.2的一个测试集群中,使用nodetool时不小心连到了9160端口,结果每次都会把节点搞挂,百试百灵。而且直接telnet到9160端口,随便发送个字符,也会把节点搞崩溃。不知道是我的测试环境的原因,还是Thrift有bug,这样节点的健壮性就有问题了,这个端口只能接受协议格式内的信息。对Java和Thrift都不太了解,把这个问题抛出来,希望有大牛能帮忙找到原因。

Update:之前贴的nodetool错连9160端口的报错可能有点误导大家,因为jmx用的默认的8080端口,连9160端口jmx报错是正常的,问题是节点不应该崩溃的。看了/var/log/cassandra/system.log中记录的节点错误信息,报的是OOM,Cassandra的java进程都消失了。调整了一下jvm参数,将heap的最小内存从默认的256MB设置到1G(-Xms1G),还是有同样的问题。另外,我的java环境是jre1.6.0_18。

ERROR [pool-1-thread-1] 2010-06-12 16:49:40,459 CassandraDaemon.java (line 78)
Fatal exception in thread Thread[pool-1-thread-1,5,main]
java.lang.OutOfMemoryError: Java heap space
at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:296)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:203)
at org.apache.cassandra.thrift.Cassandra$Processor.process(Cassandra.java:1113)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:253)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

Google了一把这个错误,也有人碰到过,并且发现Thrift确实有类似的bug: