Android mqtt即时通讯方案

时间:2022-02-09 13:25:04

鼓捣了一个多月的即时通讯项目,暂时告一段落,其实对于Android的IM以前有用TCP做过,但仅限于文字与图片,新的项目加入了语音聊天,使用WebRTC相关的技术。

此次通讯协议是MQTT,也是第一次接触,服务器端实现采用了Mosca,目前只是实现qos=1与qos=0的功能。Android端用到的MQTT库是eclipse提供的paho,不过他给出的Demo里有好多坑,修改之后,用起来还算稳定。

IM 的流程大致是MQTT建立连接,订阅(subscirbe),发布(publish)。

MQTT连接建立的代码(SSL方式)

public static void connect(Driver driver) {
ServerConfig serverConfig = UserModule.Instance.getServerConfig();
MqttConnectOptions conOpt = new MqttConnectOptions();
try {
SSLContext sslContext;
KeyStore ts = KeyStore.getInstance("BKS");
ts.load(context.getResources().openRawResource(R.raw.test_cert),
"123456".toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(ts);
TrustManager[] tm = tmf.getTrustManagers();
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, tm, null);

SocketFactory factory = sslContext.getSocketFactory();
conOpt.setSocketFactory(factory);
} catch (Exception e) {
e.printStackTrace();
}
<span style="white-space:pre"> </span>
<span style="white-space:pre">		</span>//paho库得		Iterator<Map.Entry<String, Connection>> it = Connections				.getInstance(context).getConnections().entrySet().iterator();		while (it.hasNext()) {			MqttClientAndroidService detectClient = it.next().getValue()					.getClient();			try {				detectClient.disconnect();			} catch (MqttException e) {				e.printStackTrace();			}			it.remove();		}		// The basic client information		MqttClientAndroidService client;		client = Connections.getInstance(context).createClient(context,				serverConfig.getUri(), serverConfig.clientId);		Integer qos = Integer.parseInt(context.getResources().getString(				R.string.qos));		Boolean retained = Boolean.parseBoolean(context.getResources()				.getString(R.string.retained));		// connection options		int timeout = Integer.parseInt(context.getResources().getString(				(R.string.timeout)));		int keepalive = Integer.parseInt(context.getResources().getString(				R.string.keepalive));		Connection connection = new Connection(serverConfig.getClientHandle(),				serverConfig.clientId, serverConfig.server,				Integer.parseInt(serverConfig.port), context, client,				serverConfig.ssl);		// connection.registerChangeListener(changeListener);		// connect client		String[] actionArgs = new String[1];		actionArgs[0] = serverConfig.clientId;		connection.changeConnectionStatus(ConnectionStatus.CONNECTING);		boolean cleanSession = false;		conOpt.setCleanSession(cleanSession);		conOpt.setConnectionTimeout(timeout);		conOpt.setKeepAliveInterval(keepalive);		if (!TextUtils.isEmpty(serverConfig.user)) {			conOpt.setUserName(serverConfig.user);		}		if (!TextUtils.isEmpty(serverConfig.pwd)) {			conOpt.setPassword(serverConfig.pwd.toCharArray());		}//		conOpt.setPassword("1111".toCharArray());		final ActionListener callback = new ActionListener(context,				ActionListener.Action.CONNECT, driver.getMqttUtilsCallback(),				serverConfig.getClientHandle(), actionArgs);		boolean doConnect = true;		String message = ActivityConstants.message;		String topic = ActivityConstants.topic;		if ((!TextUtils.isEmpty(message) || !TextUtils.isEmpty(topic))) {			// need to make a message since last will is set			try {				conOpt.setWill(topic, message.getBytes(), qos.intValue(),						retained.booleanValue());			} catch (Exception e) {				e.printStackTrace();				doConnect = false;				callback.onFailure(null, e);			}		}		client.setCallback(new MqttCallbackHandler(context, serverConfig				.getClientHandle(), driver));		connection.addConnectionOptions(conOpt);		Connections.getInstance(context).addConnection(connection);		if (doConnect) {			try {				client.connect(conOpt, null, callback);			} catch (MqttException e) {				Log.e(TAG, "MqttException Occured", e);			}		}	}
发布(publish)代码

public static void publish(String clientHandle, String topic,
JSONObject jsonObj, int qos) {
MqttClientAndroidService client = Connections.getInstance(context)
.getConnection(clientHandle).getClient();
if (!isConnected(context)) {
try {
client.connect();
} catch (MqttException e) {
e.printStackTrace();
}
Toast.makeText(context, "please try again", Toast.LENGTH_SHORT)
.show();
return;
}
if (topic == null) {
Toast.makeText(context, "can not get other's identity for now",
Toast.LENGTH_SHORT).show();
return;
}
String[] args = new String[2];
if (jsonObj.optLong("time") != 0) {
args[0] = String.valueOf(jsonObj.optLong("time"));
args[1] = topic;
}

Boolean retained = Boolean.parseBoolean(context.getResources()
.getString(R.string.retained));
try {
client.publish(topic, jsonObj.toString().getBytes(), qos, retained,
null, new ActionListener(context, Action.PUBLISH, null,
clientHandle, args));
} catch (MqttSecurityException e) {
Log.e(TAG,
"Failed to publish a messged from the client with the handle "
+ clientHandle, e);
} catch (MqttException e) {
Log.e(TAG,
"Failed to publish a messged from the client with the handle "
+ clientHandle, e);
}
}
订阅(subscribe)代码

public static void subscribe(MqttUtilsCallback mqttUtilsCallback,
String clientHandle, String topic) {
MqttClientAndroidService client = Connections.getInstance(context)
.getConnection(clientHandle).getClient();
if (client == null || (client != null && !client.isConnected())) {
Toast.makeText(context, "please connect to server first",
Toast.LENGTH_SHORT).show();
return;
}

if (TextUtils.isEmpty(topic)) {
topic = "hello";
}
String[] topics = new String[1];
topics[0] = topic;
try {
client.subscribe(topic, 1, null, new ActionListener(context,
ActionListener.Action.SUBSCRIBE, mqttUtilsCallback,
clientHandle, topics));
} catch (MqttSecurityException e) {
Log.e(TAG, "Failed to subscribe to" + topic
+ " the client with the handle " + clientHandle, e);
} catch (MqttException e) {
Log.e(TAG, "Failed to subscribe to" + topic
+ " the client with the handle " + clientHandle, e);
}
}

以上三个方法罗列出了MQTT的做为IM的基本流程,但还差一步很重要的,接收消息。paho为所有动作提供了两个回调接口,一个是IMqttActionListener,其中包含connect,publish,subscribe,disconnect成功与失败的回调方法;另一个是MqttCallback,所有IM的消息到达,连接断开在此接口处理,代码如下

public class MqttCallbackHandler implements MqttCallback {

/**
* {@link Context} for the application used to format and import external
* strings
**/
private Context context;
/**
* Client handle to reference the connection that this handler is attached
* to
**/
private String clientHandle;

private MqttUtilsCallback mqttUtilsCallback;
private Driver driver;

/**
* Creates an <code>MqttCallbackHandler</code> object
*
* @param context
* The application's context
* @param clientHandle
* The handle to a {@link Connection} object
* @param driver
* The bridge connects UI, MQTT and WebRTC, and abstract logic
* from activity
*/
public MqttCallbackHandler(Context context, String clientHandle, Driver driver) {
this.context = context;
this.clientHandle = clientHandle;
this.driver = driver;
if (driver != null) {
this.mqttUtilsCallback = driver.getMqttUtilsCallback();
}
}

/**
* @see org.eclipse.paho.client.mqttv3.MqttCallback#connectionLost(java.lang.Throwable)
*/
@Override
public void connectionLost(Throwable cause) {
if (cause != null) {
Connection c = Connections.getInstance(context).getConnection(clientHandle);
c.addAction("Connection Lost");
c.changeConnectionStatus(ConnectionStatus.DISCONNECTED);

// format string to use a notification text
Object[] args = new Object[2];
args[0] = c.getId();
args[1] = c.getHostName();

String message = context.getString(R.string.connection_lost, args);
// build intent
// Intent intent = new Intent();
// intent.setClassName(context,
// "com.ibm.msg.android.ConnectionDetails");
// intent.putExtra("handle", clientHandle);

<span style="white-space:pre">			</span>//重连机制			//PollManager.Instance.init();			// notify the user			// Notify.notifcation(context, message, intent,			// R.string.notifyTitle_connectionLost);			System.out.println(message);			cause.printStackTrace();		}	}	/**	 * @see org.eclipse.paho.client.mqttv3.MqttCallback#messageArrived(java.lang.String,	 *      org.eclipse.paho.client.mqttv3.MqttMessage) run on UIThread	 */	@Override	public void messageArrived(String topic, MqttMessage message) {		String strMsg = new String(message.getPayload());		System.out.println("msg arrived " + strMsg);	}	/**	 * @see org.eclipse.paho.client.mqttv3.MqttCallback#deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken)	 */	@Override	public void deliveryComplete(IMqttDeliveryToken token) {		// Do nothing	}
以上代码基本就可以满足IM的功能需求,聊天界面及本地缓存策略可以自己设计。


------------------------------------------------------------分割线---------------------------------------------------------

语音聊天模块是基于WebRTC的,关于WebRTC的资料大部分来自网络,由于时间原因没有自己编译,引入libjingle-connection库,下载地址:https://github.com/njovy/AppRTCDemo。