使用DataStax Java驱动程序的最佳实践

时间:2021-12-31 09:23:51

引言

如果您想开始建立自己的基于Cassandra的Java程序,欢迎!

也许您已经参加过我们精彩的DataStax Academy课程或开发者大会,又或者仔细阅读过Cassandra Java驱动的文档。相比示例程序,现在是时候步入真实世界、处理实际案例了。

那么问题来了:Java驱动提供了各种设置选项,在开始使用时我们需要知道哪些一般准则,以及从什么最佳实践入手,从而轻松构建一个在生产环境中有复原力的、实时的、高性能应用呢?

这是个好问题!基于大量和您类似的Datastax客户中所累积的经验,本文档将列出一些基本的准则。

在DataStax Java 驱动中有很多的旋钮和拉杆,并且每一个都有其存在的理由。但是绝大部分的用户不需要深入了解那些更深奥高阶的功能,所以这篇文档将会重点介绍大部分应用程序会遇到的一般情况。在这个过程中,我们会再详细介绍提到的高级选项,但是他们确实不是一般情况。 现在就让我们先关注一下主要情况吧。

在本文开始前,我们默认您已经对这些内容有所了解:客户端与服务端架构的应用程序、Cassandra基础,还有Datastax Java驱动的主要元素,例如Sessions(会话)、Statements(语句)、Results sets(结果集)、Rows(行)。附录中提供了一些帮助您熟悉这些概念的资源。

最佳实践

这一部分介绍了使用DataStax Java驱动创建一个Java程序的最佳实践方法。部分最佳实践的方法也可以运用到其它的开发语言中(及其关联的DataStax驱动),但是在本文中我们将会专注于Java驱动。我们将最佳实践的方法分为4组:总体指导、建立连接、查询语句和查询结果。

总体指导

  • 当今最佳的实践方法是使用最新的DataStax Java驱动,即版本4.x。虽然3.x版本的驱动仍然可用,且在必要情况下依然是个不错的选择,本文会把重点放在4.x的驱动。

  • 使用您正在使用的Datastax Java驱动主要版本(比如4.x)中最新的细分版本(比如截止2020年8月,4.x中的4.9.0或3.x中的3.10.0)。

  • 多数据中心部署的情况下,我们建议一个应用实例仅与单独一个数据中心挂钩使用。换句话说,如果您要将您的应用部署到多个地区,您应该部署多个应用,每个地区分别对应一个应用。每一个应用都应将驱动程序连接到每一个当地的数据中心(如下图所示),并且应使用Global Load Balancer(全局负载平衡器)在多个应用实例中引导流量。

    • 如果Cassandra在一个地区的数据中心无法访问(如脱机以进行定期维护),Global Load Balancer会将流量导到其它数据区域中的应用程序实例。

  • 还有一个最佳的实践方法是使用与数据库查询有关的指标来对应用程序进行监测。这个会非常有助于测试应用程序的查询性能。更多关于驱动程序监测的指标请参考这里。在默认情况下所有的指标收集功能都是处于禁用状态。如果您需要它们,您需要在建立连接的时候启用。详见下面信息。

    • 如果一个监测指标处于启用状态,您可以通过Session#getMetrics()方法获得指标内容。

建立与数据库的连接

用于建立与Cassandra的连接的CqlSession对象,可以通过很多不同的方式进行配置,包括通过配置文件或者编程方式建立连接——我们建议使用配置文件。当我们与Cassandra创建连接时,以下是一些最重要的考虑因素:

  • 请使用单独的CqlSession:一个应用程序只使用一个单独的CqlSession对象来连接到数据库。在一些更复杂的情况下,有可能会有多个Class,每一个Class 都需要连接到一个CqlSession。在这种情况下,我们希望它们能使用同一个CqlSession,因此最好将其创建为单例模式。一个比较常见的方法是使用依赖注入框架,例如Spring。

  • 在application.conf里设定选项:将所有非默认的选项在jar包里的application.conf文件中进行定义。这个设置文件根据类型安全配置框架(Typesafe Config framework)设置参数。reference.conf文件包括了所有的默认值,同时由于application.conf是基于它衍生出来的,因此您只需在applicaiton.conf里显式地指出任何想要复写的值。

  • 遵循您安全性的最佳做法,同时使用适当的身份验证和SSL选项。更多关于身份验证SSL加密内容请参照DataStax Java驱动的文档。

  • 在创建连接的时使用多个接触点(Contact Points)。这样的话,您的应用即使在单个(或多个节点)脱机时仍可与数据库建立连接。特意选择(或避免)种子节点并没有任何益处。

datastax-java-driver.basic.contact-points = ["127.0.0.1:9042","localhost:9042"]
    • 只提供一个数据中心的接触点,这个数据中心会被设为本地的数据中心(如下所示)。

    • 没必要将一个数据中心所有的节点都设置为接触点。当驱动程序建立初始连接后,它会发现集群中所有剩下的节点,并通过负载平衡策略与这些节点建立直接的连接。

    • 当创建CqlSession时,您可以通过编程的方式设置连接点,这会复写application.conf中的参数。

CqlSession.builder().addContactPoint(new InetSocketAddress("127.0.0.1", 9042));
  • 使用默认的负载平衡策略:当建立连接时,使用默认参数DefaultLoadBalancingPolicy。此负载平衡策略会更有效且相对平衡地调用节点进行查询。

  • 当建立连接时,显式地指定使用本地的数据中心。
datastax-java-driver.basic.load-balancing-policy.local-datacenter = dc1
    • 当创建CqlSession时,您也可以通过编程的方式设置本地数据中心,这会复写application.conf中的参数。

CqlSession.builder().withLocalDatacenter("dc1")
  • 显式地将一致性级别设定为LOCAL_QUORUM并将默认串行一致性设为LOCAL_SERIAL。不然,默认的一致性是LOCAL_ONE,默认串行一致性级别为SERIAL,这些通常都不建议使用。

datastax-java-driver.basic.request.consistency = LOCAL_QUORUM
datastax-java-driver.basic.request.serial-consistency = LOCAL_SERIAL
  • 不要将默认查询幂等性设置为true。其默认值为false,请保留它。设置为true很危险,因为一些操作是通过幂等查询自动完成的,但事实上不是所有操作都是幂等的。因此,请针对每次查询显式明确地设置幂等查询操作。

  • 请在advanced.metrics中启用合适的指标,从而启用指标收集功能。因为没有一个“启用所有指标”的选项,您必须明确指出您需要启动的每一个指标。

    • advanced.metrics.session指定会话级(session-level)指标。下面列出了一些您可能感兴趣的advanced.metrics.session.enabled中的指标:

advanced.metrics.session.enabled = [bytes-sent, bytes-received, connected-nodes, cql-requests, cql-client-timeouts, cql-prepared-cache-size, throttling.delay, throttling.queue-size, throttling.errors]
    • advanced.metrics.node 指定了节点级(node-level)指标,下面列出了一些您可能感兴趣advanced.metrics.node.enabled:中的指标:

advanced.metrics.node.enabled = [pool.open-connections, pool.available-streams, pool.in-flight, pool.orphaned-streams, bytes-sent, bytes-received, cql-messages, errors.request.unsent, errors.request.aborted, errors.request.write-timeouts, errors.request.read-timeouts, errors.request.unavailables, errors.request.others, retries.total, retries.aborted, retries.read-timeout, retries.write-timeout, retries.unavailable, retries.other, ignores.total, ignores.aborted, ignores.read-timeout, ignores.write-timeout, ignores.unavailable, ignores.other, errors.connection.init, errors.connection.auth]

执行查询

Cassandra的查询会先创建一个Statement对象,再通过CqlSession对象执行。以下有很多种Statement种类:

  • 简易语句(SimpleStatement): 由CQL字符串或查询生成器(Query Builder)创建。

  • 参数化查询语句(PreparedStatement): 可以在构建一次后被重复使用多次的语句,对于有不同参数的常见查询而言,此种语句具有实用、高性能且更安全的优势。

  • 绑定语句(BoundStatement): 用于对PrepareStatement查询的单次调用,并允许用户绑定只适用于此次调用的参数。

  • 批处理语句(BatchStatement): 封装多个简易语句或者绑定语句,并批量执行。

当执行查询语句时,这里有一些需要注意的重要事项:

  • 请注意在DataStax Java驱动程序4.x的版本中,所有的Statements都是不可变的(immutable)。因此如果想要重新设置Statement的选项的话,您必须为这个值重新分配一个引用。

// 不正确 - bound2 的值并没有改变。
bound2.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
// 正确 - bound2 的值发生了改变。
bound2 = bound2.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
  • 避免以编程方式发出DDL语句,除非可以确保它们不会被应用程序中的多个实例并发执行。
    • 当集群中的节点对模式(Schema)产生分歧时,可能会因Cassandra分布式的特性出现问题。

    • 使用DDL语句时,应确保模式协议(schema agreement)达成一致。您可以通过调用Session#getExecutionInfo()方法获取ExecutionInfo, 之后调用ExecutionInfo#isSchemaInAgreement()方法。如果返回True,那您就可以确保所有节点达成一致。

    • 您也可以在任何时候通过调用Session#checkSchemaAgreement()的方法判断所有节点的模式是否一致。就好比,如果上述的方法返回了False,则可以进入一个循环直到Session#checkSchemaAgreement()返回为真(在发生异常之前会有一些超时响应或者计数器)。

  • 不要将CQL作为字符串执行(如通过CqlSession#execute(String)),而是为CQL字符串创建一个Simple Statement以便设置Statement的选项,如一致性级别或者幂等性。

SimpleStatement stmt1 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (1,2)");
stmt1 = stmt1.setIdempotent(true);
  • Java驱动程序的 QueryBuilder(查询生成器)是另一个很棒的通过编程创建CQL查询语句的方式。 这也比与用编程方式构建CQL字符串想法的动态CQL更可取。

SimpleStatement read = QueryBuilder.selectFrom(ks, tbl)
.columns("pkey", "x").build();
  • 通过CqlSession#prepare()准备所有可以重复使用的语句。

SimpleStatement stmt3 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (:pkey,:x)");
PreparedStatement prepared2 = session.prepare(stmt3);
BoundStatement bound2 = prepared2.bind();
bound2 = bound2.setInt("pkey", 100);
bound2 = bound2.setInt("x", 200);
    • 您也可以通过按位置而不是名字来绑定变量,但是当被执行的查询语句发生变化,使用位置绑定变量会更容易出错,因为这就意味着您需要更注意绑定变量的位置。

SimpleStatement stmt2 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (?,?)");
PreparedStatement prepared = session.prepare(stmt2);
BoundStatement bound1 = prepared.bind();
bound1 = bound1.setInt(0, 10);
bound1 = bound1.setInt(1, 20);
    • 当值绑定到准备好的语句时,请使用“unset”(未设置)选项来避免任何插入NULL值的可能性。或者,您可以仅将不为空的值与变量进行绑定,因为未绑定的绑定标记(bind marker)同样会导致该列被设定为“unset”。

BoundStatement bound2 = prepared2.bind();
bound2 = bound2.setInt("pkey", 100);
bound2 = bound2.setInt("x", 200);
bound2 = bound2.unset(1); // 为了展示需要
  • 如果一个CQL查询确实是幂等的,请通过Statement#setIdempotent()来设置幂等的选项。这是为了能够自动重复执行陈述语句(如:自动重试或者推测执行)。作为安全预防措施,驱动程序不会自动重新处理任何幂等设置为false(否)的语句。

bound2 = bound2.setIdempotent(true);
  • 谨慎使用Batch Statement(批处理语句),因为Cassandra集群的协调节点(coordinator)负责在内部处理这些查询,发出多个查询会增加协调节点的负荷。

    • 批处理语句的数量应该保持相对较少。 如小于20条语句。

    • 应只在用例需要时才使用logged batches(记入日志的批处理语句), 因为它们会占用更多资源。

    • 更多细节请查询这里

BatchStatementBuilder batchBuilder = BatchStatement.builder(BatchType.UNLOGGED);
BoundStatement bound3 = prepared.bind(1000,2000);
BoundStatement bound4 = prepared.bind(10000,20000);
batchBuilder.addStatement(bound3);
batchBuilder.addStatement(bound4);
BatchStatement batch = batchBuilder.build();
  • 我们应该避免Light weight transactions(轻量级事务),因为它们的性能相较于一般操作会慢很多。一般情况下,会有其它可以避免Light weight transactions的方式来建构应用程序。总之,轻量级事务的存在是有原因的,但请合理使用。

    • 轻量级事务和一般事务的查询语句一样。

    • 请确保将默认的SERIAL的串行一致性级别复写为LOCAL_SERIAL。

处理查询和结果

根据查询被执行的方式,查询结果会通过以下几种方式被返回:

  • Synchronous execution(同步执行):返回一个可以被同步处理的ResultSet

ResultSet resultSet = session.execute(read);
  • Asynchronous execution(异步执行):返回一个可以通过CompletionStage API 异步处理查询的CompletionStage<AsyncResultSet>。

CompletionStage<AsyncResultSet> asyncResult = session.executeAsync(bound1);
  • Reactive execution(响应式执行):返回一个ReactiveResultSet,它可以通过Reactive Labraries (响应库)以响应式的方法处理结果。

ReactiveResultSet reactiveResult = session.executeReactive(bound2);

最常见的处理方式排列序可能为Synchronous > Reactive > Asynchronous.

以下是一些处理查询结果的最佳实践方法:

  • 养成在CQL查询中指定所有需要被返回的列的习惯。比如但凡可以请避免使用 "SELECT * FROM …"这样的查询语句。如果表的结构将来发生变化的话,这样做会有帮助。

  • 像其它数据库一样,请注意像“SELECT * FROM ks.tbl”这样的简单语句会返回数据库中的所有数据。Cassandra是为储存海量数据而设计的,这种请求很可能会返回一个含有巨量数据的结果。尽管有时候确实需要进行全表扫描,但也请尽量避免这种情况。

    • 如果确实需要全表扫描,利用好的分布式计算方法,或者像Spark这样的分布式计算框架是一个不错的选择。DataStax为此开发了Spark Cassandra Connector (Cassandra Spark连接器),实现了数据扫描的最佳实践。

  • 当处理多个语句时,利用异步执行(asynchronous execution)同时处理多条查询。

    • 当加载数据时,这个方式可以大量缩短处理时间。

    • 当从多张表格获取数据时,这可以减少总体延迟。

    • 确保所有异步操作已被处理并成功执行。

    • 更多详情请参考这里

  • 对于INSERT、UPDATE或DELETE的操作,请确保捕获异常以确保操作成功。

    • 如果一个查询因为查询超时失败,请尝试重新尝试查询,因为Cassandra分布式的特性应该可以成功。如果一个查询超时,它会抛出一个异常(如 QueryExecutionException),您可以使用try-catch结构捕获这个异常。

  • 一般情况下,使用驱动程序默认的分页方法,这种方法在一页读取完毕时将自动获取下一页。

    • 响应式处理也可以自动处理分页功能。

    • 通过CompletionStage API使用自动分页。而使用AsynchronousResultSet这种手动处理API则需要手动分页。

  • 避免调用 ResultSet#all(),因为这会将所有结果导入一个在内存中的列表(List),这样会影响性能或产生内存错误

    • 与之相对,可以利用the ResultSet#iterator()方法迭代取出返回结果的每一行。

for (Row row : resultSet) {
System.out.println("pkey: " + row.getInt("pkey"));
}

总结

本文包含了许多适应大多数用例且最常见的最佳实践。但是在某些情况下,使用其它不同的设置或方法也许确实是得当的,所以这篇文章应该只作为一个起点。我们建议可以从这些设置和实践中入手,开始建立一个可用的应用程序。如果您发现应用程序没有如预期一样运行,可以考虑参考DataStax Java驱动程序文章(或者其它资源)来微调您的设置。

有了这些最佳实践作为您的起点,您正走在通往顺利搭建以Cassandra作为后台的应用程序的光明大道上。请享受这个过程吧!

附件 1 - 有用的链接

附件 2 - 代码案例

以下代码也可以在以下Github repository链接。 https://github.com/DataStax-Examples/ex_bestpractices

附件 2.1 - application.conf

datastax-java-driver {
# Contact Points
basic.contact-points = ["127.0.0.1:9042","localhost:9042"] # Local Data Center
basic.load-balancing-policy.local-datacenter = dc1 # Default Consistency Level
basic.request.consistency = LOCAL_QUORUM
basic.request.serial-consistency = LOCAL_SERIAL # Metrics
advanced.metrics {
# The session-level metrics
session {
enabled = [
bytes-sent, bytes-received,
connected-nodes,
cql-requests, cql-client-timeouts, cql-prepared-cache-size,
throttling.delay, throttling.queue-size, throttling.errors,
]
}
# The node-level metrics.
node {
enabled = [
pool.open-connections, pool.available-streams, pool.in-flight, pool.orphaned-streams,
bytes-sent, bytes-received,
cql-messages,
errors.request.unsent, errors.request.aborted, errors.request.write-timeouts, errors.request.read-timeouts, errors.request.unavailables, errors.request.others,
retries.total, retries.aborted, retries.read-timeout, retries.write-timeout, retries.unavailable, retries.other,
ignores.total, ignores.aborted, ignores.read-timeout, ignores.write-timeout, ignores.unavailable, ignores.other,
errors.connection.init, errors.connection.auth
]
}
}
}

附件 2.2 - SampleApplication.java

package com.datastax.example.bestpractices;

import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException;
import com.datastax.oss.driver.api.core.servererrors.QueryValidationException;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import reactor.core.publisher.Flux;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter; import java.net.InetSocketAddress;
import java.util.concurrent.CompletionStage; public class SampleApplication {
public static void main(String[] args) {
final String dc = "dc1";
final String ks = "ks";
final String tbl = "tbl";
final int ddlRetries = 3;
final int ddlRetrySleepMs = 500;
// Create CqlSession
CqlSession session = CqlSession.builder()
// .withAuthCredentials("cass_user", "choose_a_better_password") // redundant with application.conf, but showing for demonstration
// .addContactPoint(new InetSocketAddress("127.0.0.1", 9042)) // redundant with application.conf, but showing for demonstration
.withLocalDatacenter("dc1")
.build(); // Set up driver metrics and expose via JMX (as an example)
MetricRegistry registry = session.getMetrics()
.orElseThrow(() -> new IllegalStateException("Metrics are disabled"))
.getRegistry(); JmxReporter reporter =
JmxReporter.forRegistry(registry)
.inDomain("com.datastax.oss.driver")
.build();
reporter.start(); // Create Keyspace
SimpleStatement createKeyspace = SimpleStatement.newInstance("CREATE KEYSPACE IF NOT EXISTS "+ks
+" WITH replication = {'class': 'NetworkTopologyStrategy', '" + dc + "': '1'}");
// Synchronous
ResultSet ddl = null;
try {
ddl = session.execute(createKeyspace);
}
catch (Exception e) {
throw new RuntimeException("Error creating keyspace");
}
if (!ddl.getExecutionInfo().isSchemaInAgreement()) {
int retries = 0;
while ((retries < ddlRetries) && session.checkSchemaAgreement()) {
try {
Thread.sleep(ddlRetrySleepMs);
}
catch (InterruptedException ie) {
throw new RuntimeException("Interrupted while waiting for schema agreement");
}
retries++;
}
} // Create Table (using helper method)
SimpleStatement createTable = SimpleStatement.newInstance("CREATE TABLE IF NOT EXISTS "+ks+"."+tbl
+"(pkey INT, x INT, PRIMARY KEY ((pkey)))");
if (!executeDdl(session, createTable, ddlRetries, ddlRetrySleepMs)) {
System.err.println("Error creating table");
System.exit(1);
} // Insert some data
// Simple
SimpleStatement stmt1 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (1,2)");
stmt1 = stmt1.setIdempotent(true);
try {
session.execute(stmt1);
}
catch (QueryExecutionException | QueryValidationException | AllNodesFailedException ex) {
// Handle query failure
throw new RuntimeException("Error inserting first data");
} // Prepared
// Using positional bind markers
SimpleStatement stmt2 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (?,?)");
PreparedStatement prepared = session.prepare(stmt2);
BoundStatement bound1 = prepared.bind();
bound1 = bound1.setInt(0, 10);
bound1 = bound1.setInt(1, 20);
// Async
try {
CompletionStage<AsyncResultSet> asyncResult = session.executeAsync(bound1);
asyncResult.toCompletableFuture().get();
}
catch (Exception e) {
// process exception
} // Reactive
// Using named bind markers instead of positional
SimpleStatement stmt3 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (:pkey,:x)");
PreparedStatement prepared2 = session.prepare(stmt3);
BoundStatement bound2 = prepared2.bind();
bound2 = bound2.setInt("pkey", 100);
bound2 = bound2.setInt("x", 200);
bound2 = bound2.unset(1); // Showing for demonstration
bound2 = bound2.setIdempotent(true); // set idempotency
bound2 = bound2.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
ReactiveResultSet reactiveResult = session.executeReactive(bound2);
Flux.from(reactiveResult).blockLast(); // Batch
BatchStatementBuilder batchBuilder = BatchStatement.builder(BatchType.UNLOGGED);
BoundStatement bound3 = prepared.bind(1000,2000);
BoundStatement bound4 = prepared2.bind().setInt("pkey",10000).setInt("x", 20000);
batchBuilder.addStatement(bound3);
batchBuilder.addStatement(bound4);
BatchStatement batch = batchBuilder.build();
try {
session.execute(batch);
}
catch (QueryExecutionException qee) {
// Handle query timeout - let's retry it
try {
session.execute(batch);
}
catch (QueryExecutionException | QueryValidationException | AllNodesFailedException ex) {
// Handle query failure
throw new RuntimeException("Error second try inserting data");
} }
catch (QueryValidationException | AllNodesFailedException ex) {
// Handle query failure
throw new RuntimeException("Error inserting data");
} // Query Builder
SimpleStatement read = QueryBuilder.selectFrom(ks, tbl)
.columns("pkey", "x").build();
// Using a helper method to execute DML
ResultSet resultSet = executeDml(session, read, "Error reading data");
if (null != resultSet) {
for (Row row : resultSet) {
System.out.println("pkey: " + row.getInt("pkey") + ", x: " + row.getInt("x"));
}
} // Cleanup
session.close();
} private static boolean executeDdl(CqlSession session, Statement ddlStatement, int ddlRetries, int ddlRetrySleepMs) {
ResultSet ddl = null;
try {
ddl = session.execute(ddlStatement);
}
catch (Exception e) {
throw new RuntimeException("Exception while executing DDL (" + ddlStatement + ")");
}
if (!ddl.getExecutionInfo().isSchemaInAgreement()) {
int retries = 0;
while ((retries < ddlRetries) && session.checkSchemaAgreement()) {
try {
Thread.sleep(ddlRetrySleepMs);
}
catch (InterruptedException ie) {
throw new RuntimeException("Interrupted while waiting for schema agreement");
}
retries++;
}
}
return true;
} private static ResultSet executeDml(CqlSession session, Statement query, String errorString) {
ResultSet resultSet = null;
try {
resultSet = session.execute(query);
}
catch (QueryExecutionException | QueryValidationException | AllNodesFailedException ex) {
// Handle query failure
throw new RuntimeException(errorString);
}
return resultSet;
}
}