Vetx.x : EventLoop线程不要锁(Synchronized/ReentrantLock)
vert.x 里面有eventloop线程,而且eventloop线程是绝对不能阻塞的,
但是实际使用中,有时候我们对阻塞理解不深刻,所以一些隐藏的阻塞
没有那么好发现。
eventloop 线程和work线程共用相同的锁
下面的代码是一个简单的例子,用户请求localhost:8133/sync的时候
会在work现场锁sb对象,然后在eventloop线程也要获取sb这个对象的锁。
那么,我们发现,这样eventloop就会超时,可以看下日志。
package concurrency.file.vertx;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
/** * work 线程中有锁 * * @author kyle */
public class SynchronizedTest {
public static class Inner extends AbstractVerticle {
StringBuilder sb = new StringBuilder();
public void write(String str) {
synchronized (sb) {
sb.delete(0, sb.length());
sb.append(str);
}
}
@Override
public void start(Future<Void> future) throws Exception {
Router route = Router.router(vertx);
route.route().handler(BodyHandler.create());
route.route("/sync").handler(context1 -> {
vertx.executeBlocking(f -> {
foo();
}, f -> {
});
vertx.setTimer(1, f -> {
write(context1.getBodyAsString());
context1.response().end();
});
});
vertx.createHttpServer()
.requestHandler(route::accept)
.listen(8133, ar -> {
if (ar.failed()) future.fail(ar.cause());
else future.complete();
});
}
private void foo() {
synchronized (sb) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
VertxOptions vertxOptions = new VertxOptions();
vertxOptions.setMaxEventLoopExecuteTime(20_000_000);
Vertx vertx = Vertx.vertx(vertxOptions);
vertx.deployVerticle(new Inner(), ar -> {
for (int i = 0; i < 50; i++) {
vertx.createHttpClient()
.getNow(8133, "localhost", "/sync", result -> {
System.out.println("finish");
});
}
});
}
}
日志如下
这是由于两个线程同时抢夺一个锁,如果eventloop线程拿不到锁,它就会阻塞。
仅eventloop中有锁
下面是上面的代码的修改,去掉了work线程对锁的把持。但是这样写代码依然有问题,如果多个eventloop线程
抢夺一个锁,必定会造成其他的eventloop线程的阻塞。所以也不要使用这种写法
package concurrency.file.vertx;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
/** * work 线程中有锁 * * @author kyle */
public class SynchronizedTest {
public static class Inner extends AbstractVerticle {
StringBuilder sb = new StringBuilder();
public void write(String str) {
synchronized (sb) {
sb.delete(0, sb.length());
sb.append(str);
}
}
@Override
public void start(Future<Void> future) throws Exception {
Router route = Router.router(vertx);
route.route().handler(BodyHandler.create());
route.route("/sync").handler(context1 -> {
write(context1.getBodyAsString());
context1.response().end();
});
vertx.createHttpServer()
.requestHandler(route::accept)
.listen(8133, ar -> {
if (ar.failed()) future.fail(ar.cause());
else future.complete();
});
}
}
public static void main(String[] args) {
VertxOptions vertxOptions = new VertxOptions();
vertxOptions.setMaxEventLoopExecuteTime(20_000_000);
Vertx vertx = Vertx.vertx(vertxOptions);
vertx.deployVerticle(new Inner(), ar -> {
for (int i = 0; i < 50; i++) {
vertx.createHttpClient()
.getNow(8133, "localhost", "/sync", result -> {
System.out.println("finish");
});
}
});
}
}
由于这种现象需要高并发的情况下,才容易观察出问题,所以这种写法,经常会被我们忽略。
看不到的synchronize
看下面的一段代码,是不是觉得没有用到锁,不会阻塞线 。其实不是的,由于Files.write底层仍然使用了synchronized 锁,
所以也会阻塞 eventloop线程,这种情况下,我们很难发现和察觉。
package concurrency.file.vertx;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
/** * work 线程中有锁 * * @author kyle */
public class SynchronizedTest {
public static class Inner extends AbstractVerticle {
public void write(String str) {
Files.write(Path.get("temp"), str.getBytes());
}
@Override
public void start(Future<Void> future) throws Exception {
Router route = Router.router(vertx);
route.route().handler(BodyHandler.create());
route.route("/sync").handler(context1 -> {
write(context1.getBodyAsString());
context1.response().end();
});
vertx.createHttpServer()
.requestHandler(route::accept)
.listen(8133, ar -> {
if (ar.failed()) future.fail(ar.cause());
else future.complete();
});
}
}
public static void main(String[] args) {
VertxOptions vertxOptions = new VertxOptions();
vertxOptions.setMaxEventLoopExecuteTime(20_000_000);
Vertx vertx = Vertx.vertx(vertxOptions);
vertx.deployVerticle(new Inner(), ar -> {
for (int i = 0; i < 50; i++) {
vertx.createHttpClient()
.getNow(8133, "localhost", "/sync", result -> {
System.out.println("finish");
});
}
});
}
}
不用锁,我该怎么办
我个人觉得,在Eventloop线程中是绝对不能用锁的,各种锁都不行,比如synchronize ,reentrantLock, CountdownLatch, CycleBarrier等等。那我们如何处理同步问题呢。我的一个思路是用单线程池,不用加锁,天然保持线程的同步。参考列子,如下。
package concurrency.file.vertx;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** * work 线程中有锁 * * @author kyle */
public class SynchronizedTest {
public static class Inner extends AbstractVerticle {
ExecutorService executorService = Executors.newSingleThreadExecutor();
StringBuilder sb = new StringBuilder();
public Future<Void> write(String str) {
Future<Void> future = Future.future();
executorService.submit(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sb.delete(0, sb.length());
sb.append(str);
});
return future;
}
@Override
public void start(Future<Void> future) throws Exception {
Router route = Router.router(vertx);
route.route().handler(BodyHandler.create());
route.route("/sync").handler(context1 -> {
write(context1.getBodyAsString());
context1.response().end();
});
vertx.createHttpServer()
.requestHandler(route::accept)
.listen(8133, ar -> {
if (ar.failed()) future.fail(ar.cause());
else future.complete();
});
}
}
public static void main(String[] args) {
VertxOptions vertxOptions = new VertxOptions();
vertxOptions.setMaxEventLoopExecuteTime(20_000_000);
Vertx vertx = Vertx.vertx(vertxOptions);
vertx.deployVerticle(new Inner(), ar -> {
for (int i = 0; i < 50; i++) {
vertx.createHttpClient()
.getNow(8133, "localhost", "/sync", result -> {
System.out.println("finish");
});
}
});
}
}