非阻塞文件IO在Java中。

时间:2021-11-25 21:46:35

I want to write to a named pipe (already created) without blocking on the reader. My reader is another application that may go down. If the reader does go down, I want the writer application to keep writing to that named pipe. Something like a this in Java

我想写入一个命名管道(已经创建),而不阻塞读取器。我的读者是另一个可能下降的应用。如果读取器确实下降了,我希望写入器应用程序继续写入那个命名管道。在Java中就像这样。

fopen(fPath, O_NONBLOCK)

So that when the reader comes up, it may resume from where it failed.

所以当读者出现时,它可能会从失败的地方恢复过来。

4 个解决方案

#1


8  

First I try to answer your questions. Next I will try to show you a code snippet I created that solves your problem using blocking IO.

首先我试着回答你的问题。接下来,我将向您展示我创建的一个代码片段,它使用阻塞IO解决了您的问题。

Your questions

I want to write to a named pipe (already created) without blocking on the reader

我想写入一个命名管道(已经创建),而不阻塞读取器。

You don't need non blocking IO to solve your problem. I think it can not even help you solve your problem. Blocking IO will also run good(maybe even better then non blocking IO because of the low concurrency). A plus is blocking IO is easier to program. Your reader can/should stay blocking.

你不需要非阻塞的IO来解决你的问题。我认为它甚至不能帮你解决问题。阻塞IO也会运行得很好(因为并发性较低,可能会更好)。另外,阻塞IO更容易编程。你的读者可以/应该继续*。

My reader is another application that may go down. If the reader does go down, I want the writer application to neep writing to the named pipe. So that when the reader comes up, it may resume from where it failed.

我的读者是另一个可能下降的应用。如果读取器确实下降,我希望写入器应用程序将写入到命名管道。所以当读者出现时,它可能会从失败的地方恢复过来。

just put the messages inside a blocking queue. Next write to the named pipe only when the reader is reading from it(happens automatically because of blocking IO). No need for non-blocking file IO when you use a blocking queue. The data is asynchronous delivered from the blocking queue when a reader is reading, which will sent your data from your writer to the reader.

只需将消息放入阻塞队列中。当读写器读取它的时候(由于阻塞IO而自动发生),接下来写入命名管道。当您使用阻塞队列时,不需要非阻塞的文件IO。当读取器读取时,数据是从阻塞队列发送的,这将把您的数据从写入器发送给读取器。

Something like a fopen(fPath, O_NONBLOCK) in Java

类似于Java中的fopen(fPath, O_NONBLOCK)。

You don't need non-blocking IO on the reader and even if you used it. just use blocking IO.

您不需要对阅读器进行非阻塞IO,即使您使用它。只使用阻塞IO。

CODE SNIPPET

A created a little snippet which I believe demonstrates what your needs.

我所创建的一个小片段展示了您的需求。

Components:

组件:

  • Writer.java: reads lines from console as an example. When you start program enter text followed by enter which will sent it to your named pipe. The writer will resume writing if necessary.
  • 作家。java:从控制台读取行作为示例。当您启动程序时,输入文本,然后输入将它发送到您指定的管道。如有必要,作者将继续写作。
  • Reader.java: reads lines written from your named pipe(Writer.java).
  • 读者。java:读从您的命名管道(Writer.java)写的行。
  • Named pipe: I assume you have created a pipe named "pipe" in the same directory.
  • 命名管道:我假设您已经在同一个目录中创建了一个名为“管道”的管道。

Writer.java

Writer.java

import java.io.BufferedWriter;
import java.io.Console;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Writer {
    private final BlockingDeque<StringBuffer> queue;
    private final String filename;

    public static void main(String[] args) throws Exception {
        final Console console = System.console();
        final Writer writer = new Writer("pipe");

        writer.init();

        while(true) {
            String readLine = console.readLine();
            writer.write(new StringBuffer(readLine));
        }
    }

    public Writer(final String filename){
        this.queue = new LinkedBlockingDeque<StringBuffer>();
        this.filename = filename;
    }

    public void write(StringBuffer buf) {
        queue.add(buf);
    }

    public void init() {
        ExecutorService single = Executors.newSingleThreadExecutor();

        Runnable runnable = new Runnable() {
            public void run() {
                while(true) {
                    PrintWriter w = null;
                    try {
                        String toString = queue.take().toString();
                        w = new PrintWriter(new BufferedWriter(new FileWriter(filename)), true);
                        w.println(toString);
                    } catch (Exception ex) {
                        Logger.getLogger(Writer.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        };

        single.submit(runnable);
    }
}

Reader.java

Reader.java

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Reader {
    private final BufferedReader br;

    public Reader(final String filename) throws FileNotFoundException {
        br = new BufferedReader(new FileReader(filename));
    }

    public String readLine() throws IOException {
        return br.readLine();
    }

    public void close() {
        try {
            br.close();
        } catch (IOException ex) {
            Logger.getLogger(Reader.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public static void main(String[] args) throws FileNotFoundException {
        Reader reader = new Reader("pipe");
        while(true) {
            try {
                String readLine = reader.readLine();
                System.out.println("readLine = " + readLine);
            } catch (IOException ex) {
                reader.close();
                break;
            }
        }
    }
}

#2


2  

If you want pipes to stay active and queue up messages, you probably want a messaging system rather than a raw pipe. In Java, the standard API is called "Java Messaging System" (JMS), and there are many standard implementations-- the most common of which I've seen being Apache ActiveMQ. If you want a cross-platform, sockets-like interface that does buffering and recovery I might suggest 0MQ, which while not being "pure Java" has bindings for many languages and excellent performance.

如果您希望管道保持活动并对消息进行排队,您可能需要一个消息传递系统,而不是原始管道。在Java中,标准的API被称为“Java消息传递系统”(JMS),并且有许多标准的实现,其中最常见的是Apache ActiveMQ。如果您想要一个跨平台的,类似于sockets的接口,它可以缓冲和恢复,我可能建议使用0MQ,而不是“纯Java”,它具有许多语言的绑定和出色的性能。

#3


1  

If there was such a thing as non-blocking file I/O in Java, which there isn't, a write to a named pipe that wasn't being read would return zero and not write anything. So non-blocking isn't part of the solution.

如果在Java中有一个非阻塞文件I/O(没有),那么写入一个未被读取的命名管道将返回0,而不会写任何东西。所以非阻塞不是解决方案的一部分。

There's also the issue that named pipes have a finite buffer size. They aren't infinite queues regardless of whether there is a reading process or not. I agree with the suggestion to look into JMS.

还有一个问题,命名管道的缓冲区大小是有限的。它们不是无限的队列,不管是否有阅读过程。我同意对JMS进行研究的建议。

#4


-1  

You should be able to use NIO's asynch write on a UNIX FIFO, just as you can to any other file:

您应该能够在UNIX FIFO上使用NIO的异步写,就像您可以对任何其他文件一样:

 AsynchronousFileChannel channel = AsynchronousFileChannel.open(...);
 Future<Integer> writeFuture = channel.write(...);

... or...

…还是……

 channel.write(..., myCompletionHandler);

However, it's not clear to me what you want to happen when the FIFO isn't accepting writes. Do you want it to buffer? If so you'll need to provide it within the Java program. Do you want it to time out? There's no simple timeout option on Java file writes.

然而,我不清楚你想在FIFO不接受写作时发生什么。你想要它缓冲吗?如果需要,您需要在Java程序中提供它。你想让它超时吗?在Java文件中没有简单的超时选项。

These aren't insurmountable problems. If you're determined you can probably get something working. But I wonder whether you'd not find life much easier if you just used a TCP socket or a JMS queue.

这些不是不可逾越的问题。如果你下定决心,你可能会得到一些有用的东西。但我想知道,如果您只是使用TCP套接字或JMS队列,您是否会觉得生活更简单。

#1


8  

First I try to answer your questions. Next I will try to show you a code snippet I created that solves your problem using blocking IO.

首先我试着回答你的问题。接下来,我将向您展示我创建的一个代码片段,它使用阻塞IO解决了您的问题。

Your questions

I want to write to a named pipe (already created) without blocking on the reader

我想写入一个命名管道(已经创建),而不阻塞读取器。

You don't need non blocking IO to solve your problem. I think it can not even help you solve your problem. Blocking IO will also run good(maybe even better then non blocking IO because of the low concurrency). A plus is blocking IO is easier to program. Your reader can/should stay blocking.

你不需要非阻塞的IO来解决你的问题。我认为它甚至不能帮你解决问题。阻塞IO也会运行得很好(因为并发性较低,可能会更好)。另外,阻塞IO更容易编程。你的读者可以/应该继续*。

My reader is another application that may go down. If the reader does go down, I want the writer application to neep writing to the named pipe. So that when the reader comes up, it may resume from where it failed.

我的读者是另一个可能下降的应用。如果读取器确实下降,我希望写入器应用程序将写入到命名管道。所以当读者出现时,它可能会从失败的地方恢复过来。

just put the messages inside a blocking queue. Next write to the named pipe only when the reader is reading from it(happens automatically because of blocking IO). No need for non-blocking file IO when you use a blocking queue. The data is asynchronous delivered from the blocking queue when a reader is reading, which will sent your data from your writer to the reader.

只需将消息放入阻塞队列中。当读写器读取它的时候(由于阻塞IO而自动发生),接下来写入命名管道。当您使用阻塞队列时,不需要非阻塞的文件IO。当读取器读取时,数据是从阻塞队列发送的,这将把您的数据从写入器发送给读取器。

Something like a fopen(fPath, O_NONBLOCK) in Java

类似于Java中的fopen(fPath, O_NONBLOCK)。

You don't need non-blocking IO on the reader and even if you used it. just use blocking IO.

您不需要对阅读器进行非阻塞IO,即使您使用它。只使用阻塞IO。

CODE SNIPPET

A created a little snippet which I believe demonstrates what your needs.

我所创建的一个小片段展示了您的需求。

Components:

组件:

  • Writer.java: reads lines from console as an example. When you start program enter text followed by enter which will sent it to your named pipe. The writer will resume writing if necessary.
  • 作家。java:从控制台读取行作为示例。当您启动程序时,输入文本,然后输入将它发送到您指定的管道。如有必要,作者将继续写作。
  • Reader.java: reads lines written from your named pipe(Writer.java).
  • 读者。java:读从您的命名管道(Writer.java)写的行。
  • Named pipe: I assume you have created a pipe named "pipe" in the same directory.
  • 命名管道:我假设您已经在同一个目录中创建了一个名为“管道”的管道。

Writer.java

Writer.java

import java.io.BufferedWriter;
import java.io.Console;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Writer {
    private final BlockingDeque<StringBuffer> queue;
    private final String filename;

    public static void main(String[] args) throws Exception {
        final Console console = System.console();
        final Writer writer = new Writer("pipe");

        writer.init();

        while(true) {
            String readLine = console.readLine();
            writer.write(new StringBuffer(readLine));
        }
    }

    public Writer(final String filename){
        this.queue = new LinkedBlockingDeque<StringBuffer>();
        this.filename = filename;
    }

    public void write(StringBuffer buf) {
        queue.add(buf);
    }

    public void init() {
        ExecutorService single = Executors.newSingleThreadExecutor();

        Runnable runnable = new Runnable() {
            public void run() {
                while(true) {
                    PrintWriter w = null;
                    try {
                        String toString = queue.take().toString();
                        w = new PrintWriter(new BufferedWriter(new FileWriter(filename)), true);
                        w.println(toString);
                    } catch (Exception ex) {
                        Logger.getLogger(Writer.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        };

        single.submit(runnable);
    }
}

Reader.java

Reader.java

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Reader {
    private final BufferedReader br;

    public Reader(final String filename) throws FileNotFoundException {
        br = new BufferedReader(new FileReader(filename));
    }

    public String readLine() throws IOException {
        return br.readLine();
    }

    public void close() {
        try {
            br.close();
        } catch (IOException ex) {
            Logger.getLogger(Reader.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public static void main(String[] args) throws FileNotFoundException {
        Reader reader = new Reader("pipe");
        while(true) {
            try {
                String readLine = reader.readLine();
                System.out.println("readLine = " + readLine);
            } catch (IOException ex) {
                reader.close();
                break;
            }
        }
    }
}

#2


2  

If you want pipes to stay active and queue up messages, you probably want a messaging system rather than a raw pipe. In Java, the standard API is called "Java Messaging System" (JMS), and there are many standard implementations-- the most common of which I've seen being Apache ActiveMQ. If you want a cross-platform, sockets-like interface that does buffering and recovery I might suggest 0MQ, which while not being "pure Java" has bindings for many languages and excellent performance.

如果您希望管道保持活动并对消息进行排队,您可能需要一个消息传递系统,而不是原始管道。在Java中,标准的API被称为“Java消息传递系统”(JMS),并且有许多标准的实现,其中最常见的是Apache ActiveMQ。如果您想要一个跨平台的,类似于sockets的接口,它可以缓冲和恢复,我可能建议使用0MQ,而不是“纯Java”,它具有许多语言的绑定和出色的性能。

#3


1  

If there was such a thing as non-blocking file I/O in Java, which there isn't, a write to a named pipe that wasn't being read would return zero and not write anything. So non-blocking isn't part of the solution.

如果在Java中有一个非阻塞文件I/O(没有),那么写入一个未被读取的命名管道将返回0,而不会写任何东西。所以非阻塞不是解决方案的一部分。

There's also the issue that named pipes have a finite buffer size. They aren't infinite queues regardless of whether there is a reading process or not. I agree with the suggestion to look into JMS.

还有一个问题,命名管道的缓冲区大小是有限的。它们不是无限的队列,不管是否有阅读过程。我同意对JMS进行研究的建议。

#4


-1  

You should be able to use NIO's asynch write on a UNIX FIFO, just as you can to any other file:

您应该能够在UNIX FIFO上使用NIO的异步写,就像您可以对任何其他文件一样:

 AsynchronousFileChannel channel = AsynchronousFileChannel.open(...);
 Future<Integer> writeFuture = channel.write(...);

... or...

…还是……

 channel.write(..., myCompletionHandler);

However, it's not clear to me what you want to happen when the FIFO isn't accepting writes. Do you want it to buffer? If so you'll need to provide it within the Java program. Do you want it to time out? There's no simple timeout option on Java file writes.

然而,我不清楚你想在FIFO不接受写作时发生什么。你想要它缓冲吗?如果需要,您需要在Java程序中提供它。你想让它超时吗?在Java文件中没有简单的超时选项。

These aren't insurmountable problems. If you're determined you can probably get something working. But I wonder whether you'd not find life much easier if you just used a TCP socket or a JMS queue.

这些不是不可逾越的问题。如果你下定决心,你可能会得到一些有用的东西。但我想知道,如果您只是使用TCP套接字或JMS队列,您是否会觉得生活更简单。