如何在Vert.x中使用Jackson流式传输JSON结果(java)

时间:2022-03-16 18:02:04

I'm making a REST api for my Java database-like service using Vert.x. It's not too dificult to write the JSON result as a String to the request's stream, as shown below:

我正在使用Vert.x为我的类似Java数据库的服务制作REST API。将JSON结果作为String写入请求的流并不是太困难,如下所示:

...
routeMatcher.get("/myservice/api/v1/query/:query", req -> {

    // get query
    String querySring = req.params().get("query");           
    Query query = jsonMapper.readValue(querySring, Query.class);

    // my service creates a list of resulting records...
    List<Record> result = myservice.query(query);                
    String jsonResult = jsonMapper.writeValueAsString(result);

    // write entire string to response
    req.response().headers().set("Content-Type", "application/json; charset=UTF-8");
    req.response().end(jsonResult);    
});
...

However I'd like to stream the Java List to the request object by using Jackson's method:

但是,我想使用Jackson的方法将Java List流式传输到请求对象:

ObjectMapper objectMapper = new ObjectMapper();
objectMapper.writeValue(Outputstream, result);

But I don't know how to connect Jackson's Outputstream argument to Vert.x's re.response(), as they have their own Buffer system that seems incompatible with Jackson's java.io.Outputstream argument.

但我不知道如何将Jackson的Outputstream参数连接到Vert.x的re.response(),因为它们有自己的Buffer系统,似乎与Jackson的java.io.Outputstream参数不兼容。

Can't I use Jackson in combination with Vert.x? Should I write a custom serializer by hand with Vert.x's own JSON library? Other suggestions?

我不能将Jackson与Vert.x结合使用吗?我应该使用Vert.x自己的JSON库手动编写自定义序列化程序吗?其他建议?

2 个解决方案

#1


2  

I assume you are generating huge JSON documents as for the small ones string output is good enough: objectMapper.writeValue(<String>, result);

我假设你正在生成巨大的JSON文档,因为小的字符串输出就足够了:objectMapper.writeValue( ,result);

There's a problem with streams. ObjectMapper doesn't know the result size and you will end up with the exception:

流有问题。 ObjectMapper不知道结果大小,你最终会得到异常:

java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.
        at org.vertx.java.core.http.impl.DefaultHttpServerResponse.write(DefaultHttpServerResponse.java:474)

So in your example I would use temporary files for JSON output and then flush them into response (I haven't tested the code)

所以在你的例子中我会使用临时文件进行JSON输出,然后将它们刷新到响应中(我还没有测试过代码)

File tmpFile = File.createTempFile("tmp", ".json");
mapper.writeValue(tmpFile, result);
req.response().sendFile(tmpFile.getAbsolutePath(), (result) -> tmpFile.delete());

In case you know content length initially you can use the following code to map OutputStream with WriteStream

如果您最初知道内容长度,则可以使用以下代码将OutputStream映射到WriteStream

import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.streams.WriteStream;

import java.io.IOException;
import java.io.OutputStream;

public class OutputWriterStream extends OutputStream {

    public WriteStream writeStream;
    public Runnable closeHandler;

    @Override
    public void write(int b) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (off == 0 && len == b.length) {
            writeStream.write(new Buffer(b));
            return;
        }

        byte[] bytes = new byte[len];
        System.arraycopy(b, off, bytes, 0, len);
        writeStream.write(new Buffer(bytes));
    }

    @Override
    public void write(byte[] b) throws IOException {
        writeStream.write(new Buffer(b));
    }

    @Override
    public void close() throws IOException {
        closeHandler.run();
    }
}

#2


2  

This might be a bit better (and updated for Vertx3) answer:

这可能会更好(并为Vertx3更新)答案:

import io.vertx.core.file.AsyncFile;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.streams.WriteStream;

import java.io.IOException;
import java.io.OutputStream;

public class OutputWriterStream extends OutputStream {

    public OutputWriterStream(final WriteStream response) {
        this.response = response;
        this.buffer = new byte[8192];
    }

    @Override
    public synchronized void write(final int b) throws IOException {
        buffer[counter++] = (byte) b;

        if (counter >= buffer.length) {
            flush();
        }
    }

    @Override
    public void flush() throws IOException {
        super.flush();

        if (counter > 0) {
            byte[] remaining = buffer;

            if (counter < buffer.length) {
                remaining = new byte[counter];

                System.arraycopy(buffer, 0, remaining, 0, counter);
            }

            response.write(Buffer.buffer(remaining));
            counter = 0;
        }
    }

    @Override
    public void close() throws IOException {
        flush();

        super.close();

        if (response instanceof HttpServerResponse) {
            try {
                response.end();
            }
            catch (final IllegalStateException ignore) {
            }
        }
        else if (response instanceof AsyncFile) {
            ((AsyncFile) response).close();
        }
    }

    private final WriteStream<Buffer> response;
    private final byte[] buffer;
    private int counter = 0;

}

#1


2  

I assume you are generating huge JSON documents as for the small ones string output is good enough: objectMapper.writeValue(<String>, result);

我假设你正在生成巨大的JSON文档,因为小的字符串输出就足够了:objectMapper.writeValue( ,result);

There's a problem with streams. ObjectMapper doesn't know the result size and you will end up with the exception:

流有问题。 ObjectMapper不知道结果大小,你最终会得到异常:

java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.
        at org.vertx.java.core.http.impl.DefaultHttpServerResponse.write(DefaultHttpServerResponse.java:474)

So in your example I would use temporary files for JSON output and then flush them into response (I haven't tested the code)

所以在你的例子中我会使用临时文件进行JSON输出,然后将它们刷新到响应中(我还没有测试过代码)

File tmpFile = File.createTempFile("tmp", ".json");
mapper.writeValue(tmpFile, result);
req.response().sendFile(tmpFile.getAbsolutePath(), (result) -> tmpFile.delete());

In case you know content length initially you can use the following code to map OutputStream with WriteStream

如果您最初知道内容长度,则可以使用以下代码将OutputStream映射到WriteStream

import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.streams.WriteStream;

import java.io.IOException;
import java.io.OutputStream;

public class OutputWriterStream extends OutputStream {

    public WriteStream writeStream;
    public Runnable closeHandler;

    @Override
    public void write(int b) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (off == 0 && len == b.length) {
            writeStream.write(new Buffer(b));
            return;
        }

        byte[] bytes = new byte[len];
        System.arraycopy(b, off, bytes, 0, len);
        writeStream.write(new Buffer(bytes));
    }

    @Override
    public void write(byte[] b) throws IOException {
        writeStream.write(new Buffer(b));
    }

    @Override
    public void close() throws IOException {
        closeHandler.run();
    }
}

#2


2  

This might be a bit better (and updated for Vertx3) answer:

这可能会更好(并为Vertx3更新)答案:

import io.vertx.core.file.AsyncFile;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.streams.WriteStream;

import java.io.IOException;
import java.io.OutputStream;

public class OutputWriterStream extends OutputStream {

    public OutputWriterStream(final WriteStream response) {
        this.response = response;
        this.buffer = new byte[8192];
    }

    @Override
    public synchronized void write(final int b) throws IOException {
        buffer[counter++] = (byte) b;

        if (counter >= buffer.length) {
            flush();
        }
    }

    @Override
    public void flush() throws IOException {
        super.flush();

        if (counter > 0) {
            byte[] remaining = buffer;

            if (counter < buffer.length) {
                remaining = new byte[counter];

                System.arraycopy(buffer, 0, remaining, 0, counter);
            }

            response.write(Buffer.buffer(remaining));
            counter = 0;
        }
    }

    @Override
    public void close() throws IOException {
        flush();

        super.close();

        if (response instanceof HttpServerResponse) {
            try {
                response.end();
            }
            catch (final IllegalStateException ignore) {
            }
        }
        else if (response instanceof AsyncFile) {
            ((AsyncFile) response).close();
        }
    }

    private final WriteStream<Buffer> response;
    private final byte[] buffer;
    private int counter = 0;

}