Android中的socket本地通讯框架

时间:2023-03-09 15:30:43
Android中的socket本地通讯框架

一、先分析Native层:

1、C++基类SocketListener:

  1. class SocketListener {
  2.     int mSock;
  3.     const
    char *mSocketName;
  4.     SocketClientCollection *mClients;
  5.     pthread_mutex_t mClientsLock;
  6.     bool mListen;
  7.     int mCtrlPipe[2];
  8.     pthread_t mThread;
  9. public:
  10.     SocketListener(const
    char *socketNames, bool listen);
  11.     SocketListener(int socketFd, bool listen);
  12.     virtual ~SocketListener();
  13.     int startListener();
  14.     int stopListener();
  15.     void sendBroadcast(int code, const
    char *msg, bool addErrno);
  16.     void sendBroadcast(const
    char *msg);
  17. protected:
  18.     virtual bool onDataAvailable(SocketClient *c) = 0;
  19. private:
  20.     static
    void *threadStart(void *obj);
  21.     void runListener();
  22. };
  23. #endif

看关键接口runListener:

  1. void SocketListener::runListener() {
  2.     while(1) {
  3.         SocketClientCollection::iterator it;
  4.         fd_set read_fds;
  5.         int rc = 0;
  6.         int max = 0;
  7.         FD_ZERO(&read_fds);
  8.         if (mListen) {
  9.             max = mSock;
  10.             FD_SET(mSock, &read_fds);
  11.         }
  12.         FD_SET(mCtrlPipe[0], &read_fds);
  13.         if (mCtrlPipe[0] > max)
  14.             max = mCtrlPipe[0];
  15.         pthread_mutex_lock(&mClientsLock);
  16.         for (it = mClients->begin(); it != mClients->end(); ++it) {
  17.             FD_SET((*it)->getSocket(), &read_fds);
  18.             if ((*it)->getSocket() > max)
  19.                 max = (*it)->getSocket();
  20.         }
  21.         pthread_mutex_unlock(&mClientsLock);
  22.         if ((rc = select(max + 1, &read_fds, NULL, NULL, NULL)) < 0) {
  23.             SLOGE("select failed (%s)", strerror(errno));
  24.             sleep(1);
  25.             continue;
  26.         } else
    if (!rc)
  27.             continue;
  28.         if (FD_ISSET(mCtrlPipe[0], &read_fds))
  29.             break;
  30.         if (mListen && FD_ISSET(mSock, &read_fds)) {
  31.             struct sockaddr addr;
  32.             socklen_t alen = sizeof(addr);
  33.             int c;
  34.             if ((c = accept(mSock, &addr, &alen)) < 0) {
  35.                 SLOGE("accept failed (%s)", strerror(errno));
  36.                 sleep(1);
  37.                 continue;
  38.             }
  39.             pthread_mutex_lock(&mClientsLock);
  40.             mClients->push_back(new SocketClient(c));
  41.             pthread_mutex_unlock(&mClientsLock);
  42.         }
  43.         do {
  44.             pthread_mutex_lock(&mClientsLock);
  45.             for (it = mClients->begin(); it != mClients->end(); ++it) {
  46.                 int fd = (*it)->getSocket();
  47.                 if (FD_ISSET(fd, &read_fds)) {
  48.                     pthread_mutex_unlock(&mClientsLock);
  49.                     if (!onDataAvailable(*it)) {//由子类实现的接口。
  50.                         close(fd);
  51.                         pthread_mutex_lock(&mClientsLock);
  52.                         delete *it;
  53.                         it = mClients->erase(it);
  54.                         pthread_mutex_unlock(&mClientsLock);
  55.                     }
  56.                     FD_CLR(fd, &read_fds);
  57.                     continue;
  58.                 }
  59.             }
  60.             pthread_mutex_unlock(&mClientsLock);
  61.         } while (0);
  62.     }
  63. }

2、他的第一继承者:

  1. #include "SocketListener.h"
  2. #include "FrameworkCommand.h"
  3. class SocketClient;
  4. class FrameworkListener : public SocketListener {
  5. public:
  6.     static
    const
    int CMD_ARGS_MAX = 16;
  7. private:
  8.     FrameworkCommandCollection *mCommands;
  9. public:
  10.     FrameworkListener(const
    char *socketName);
  11.     virtual ~FrameworkListener() {}
  12. protected:
  13.     void registerCmd(FrameworkCommand *cmd);
  14.     virtual
    bool onDataAvailable(SocketClient *c);
  15. private:
  16.     void dispatchCommand(SocketClient *c, char *data);
  17. };

他的onDataAvailable接口实现:

  1. bool FrameworkListener::onDataAvailable(SocketClient *c) {
  2.     char buffer[255];
  3.     int len;
  4.     if ((len = read(c->getSocket(), buffer, sizeof(buffer) -1)) < 0) {
  5.         SLOGE("read() failed (%s)", strerror(errno));
  6.         return errno;
  7.     } else
    if (!len)
  8.         return
    false;
  9.     int offset = 0;
  10.     int i;
  11.     for (i = 0; i < len; i++) {
  12.         if (buffer[i] == '\0') {
  13.             dispatchCommand(c, buffer + offset);
  14.             offset = i + 1;
  15.         }
  16.     }
  17.     return
    true;
  18. }

3、实例:netd的CommandListener类:

  1. class CommandListener : public FrameworkListener {
  2.     static TetherController *sTetherCtrl;
  3.     static NatController *sNatCtrl;
  4.     static PppController *sPppCtrl;
  5.     static PanController *sPanCtrl;
  6.     static SoftapController *sSoftapCtrl;
  7.     static UsbController *sUsbCtrl;
  8. public:
  9.     CommandListener();
  10.     virtual ~CommandListener() {}
  11. private:
  12.     static
    int readInterfaceCounters(const
    char *iface, unsigned long *rx, unsigned long *tx);
  13.     class UsbCmd : public NetdCommand {
  14.     public:
  15.         UsbCmd();
  16.         virtual ~UsbCmd() {}
  17.         int runCommand(SocketClient *c, int argc, char ** argv);
  18.     };
  19.     class SoftapCmd : public NetdCommand {
  20.     public:
  21.         SoftapCmd();
  22.         virtual ~SoftapCmd() {}
  23.         int runCommand(SocketClient *c, int argc, char ** argv);
  24.     };
  25.     class InterfaceCmd : public NetdCommand {
  26.     public:
  27.         InterfaceCmd();
  28.         virtual ~InterfaceCmd() {}
  29.         int runCommand(SocketClient *c, int argc, char ** argv);
  30.     };
  31.     class IpFwdCmd : public NetdCommand {
  32.     public:
  33.         IpFwdCmd();
  34.         virtual ~IpFwdCmd() {}
  35.         int runCommand(SocketClient *c, int argc, char ** argv);
  36.     };
  37.     class TetherCmd : public NetdCommand {
  38.     public:
  39.         TetherCmd();
  40.         virtual ~TetherCmd() {}
  41.         int runCommand(SocketClient *c, int argc, char ** argv);
  42.     };
  43.     class NatCmd : public NetdCommand {
  44.     public:
  45.         NatCmd();
  46.         virtual ~NatCmd() {}
  47.         int runCommand(SocketClient *c, int argc, char ** argv);
  48.     };
  49.     class ListTtysCmd : public NetdCommand {
  50.     public:
  51.         ListTtysCmd();
  52.         virtual ~ListTtysCmd() {}
  53.         int runCommand(SocketClient *c, int argc, char ** argv);
  54.     };
  55.     class PppdCmd : public NetdCommand {
  56.     public:
  57.         PppdCmd();
  58.         virtual ~PppdCmd() {}
  59.         int runCommand(SocketClient *c, int argc, char ** argv);
  60.     };
  61.     class PanCmd : public NetdCommand {
  62.     public:
  63.         PanCmd();
  64.         virtual ~PanCmd() {}
  65.         int runCommand(SocketClient *c, int argc, char ** argv);
  66.     };
  67. };

不能忘记NetdCommand类:

  1. #include <sysutils/FrameworkCommand.h>
  2. class NetdCommand : public FrameworkCommand {
  3. public:
  4.     NetdCommand(const
    char *cmd);
  5.     virtual ~NetdCommand() {}
  6. };
  7. 4、分析一个子类ListTtysCmd:
  8. CommandListener::ListTtysCmd::ListTtysCmd() :
  9.                  NetdCommand("list_ttys") {
  10. }
  11. int CommandListener::ListTtysCmd::runCommand(SocketClient *cli,
  12.                                              int argc, char **argv) {
  13.     TtyCollection *tlist = sPppCtrl->getTtyList();
  14.     TtyCollection::iterator it;
  15.     for (it = tlist->begin(); it != tlist->end(); ++it) {
  16.         cli->sendMsg(ResponseCode::TtyListResult, *it, false);
  17.     }
  18.     cli->sendMsg(ResponseCode::CommandOkay, "Ttys listed.", false);
  19.     return 0;
  20. }

Java层:

  1. private
    void listenToSocket() throws IOException {
  2.     LocalSocket socket = null;
  3.     try {
  4.         socket = new LocalSocket();
  5.         LocalSocketAddress address = new LocalSocketAddress(mSocket,
  6.                 LocalSocketAddress.Namespace.RESERVED);
  7.         socket.connect(address);
  8.         InputStream inputStream = socket.getInputStream();
  9.         mOutputStream = socket.getOutputStream();
  10.         mCallbacks.onDaemonConnected();
  11.         byte[] buffer = new
    byte[BUFFER_SIZE];
  12.         int start = 0;
  13.         while (true) {
  14.             int count = inputStream.read(buffer, start, BUFFER_SIZE - start);
  15.             if (count < 0) break;
  16.             // Add our starting point to the count and reset the start.
  17.             count += start;
  18.             start = 0;
  19.             for (int i = 0; i < count; i++) {
  20.                 if (buffer[i] == 0) {
  21.                     String event = new String(buffer, start, i - start);
  22.                     if (LOCAL_LOGD) Slog.d(TAG, String.format("RCV <- {%s}", event));
  23.                     String[] tokens = event.split("
    ", 2);
  24.                     try {
  25.                         int code = Integer.parseInt(tokens[0]);
  26.                         if (code >= ResponseCode.UnsolicitedInformational) {
  27.                             mCallbackHandler.sendMessage(
  28.                                     mCallbackHandler.obtainMessage(code, event));
  29.                         } else {
  30.                             try {
  31.                                 mResponseQueue.put(event);
  32.                             } catch (InterruptedException ex) {
  33.                                 Slog.e(TAG, "Failed to put response onto queue", ex);
  34.                             }
  35.                         }
  36.                     } catch (NumberFormatException nfe) {
  37.                         Slog.w(TAG, String.format("Bad msg (%s)", event));
  38.                     }
  39.                     start = i + 1;
  40.                 }
  41.             }
  42.             // We should end at the amount we read. If not, compact then
  43.             // buffer and read again.
  44.             if (start != count) {
  45.                 final int remaining = BUFFER_SIZE - start;
  46.                 System.arraycopy(buffer, start, buffer, 0, remaining);
  47.                 start = remaining;
  48.             } else {
  49.                 start = 0;
  50.             }
  51.         }
  52.     } catch (IOException ex) {
  53.         Slog.e(TAG, "Communications error", ex);
  54.         throw ex;
  55.     } finally {
  56.         synchronized (mDaemonLock) {
  57.             if (mOutputStream != null) {
  58.                 try {
  59.                     mOutputStream.close();
  60.                 } catch (IOException e) {
  61.                     Slog.w(TAG, "Failed closing output stream", e);
  62.                 }
  63.                 mOutputStream = null;
  64.             }
  65.         }
  66.         try {
  67.             if (socket != null) {
  68.                 socket.close();
  69.             }
  70.         } catch (IOException ex) {
  71.             Slog.w(TAG, "Failed closing socket", ex);
  72.         }
  73.     }
  74. }

sendCommandLocked接口

  1. /**
  2.   * Sends a command to the daemon with a single argument
  3.   *
  4.   * @param command The command to send to the daemon
  5.   * @param argument The argument to send with the command (or null)
  6.   */
  7.  private
    void sendCommandLocked(String command, String argument)
  8.          throws NativeDaemonConnectorException {
  9.      if (command != null && command.indexOf('\0') >= 0) {
  10.          throw
    new IllegalArgumentException("unexpected command: " + command);
  11.      }
  12.      if (argument != null && argument.indexOf('\0') >= 0) {
  13.          throw
    new IllegalArgumentException("unexpected argument: " + argument);
  14.      }
  15.      if (LOCAL_LOGD) Slog.d(TAG, String.format("SND -> {%s} {%s}", command, argument));
  16.      if (mOutputStream == null) {
  17.          Slog.e(TAG, "No connection to daemon", new IllegalStateException());
  18.          throw
    new NativeDaemonConnectorException("No output stream!");
  19.      } else {
  20.          StringBuilder builder = new StringBuilder(command);
  21.          if (argument != null) {
  22.              builder.append(argument);
  23.          }
  24.          builder.append('\0');
  25.          try {
  26.              mOutputStream.write(builder.toString().getBytes());
  27.          } catch (IOException ex) {
  28.              Slog.e(TAG, "IOException in sendCommand", ex);
  29.          }
  30.      }
  31.  }