鼓捣了一个多月的即时通讯项目,暂时告一段落,其实对于Android的IM以前有用TCP做过,但仅限于文字与图片,新的项目加入了语音聊天,使用WebRTC相关的技术。
此次通讯协议是MQTT,也是第一次接触,服务器端实现采用了Mosca,目前只实现了qos=1与qos=0的功能。Android端用到的MQTT库是eclipse提供的paho,不过它给出的Demo里有好多坑,修改之后,用起来还算稳定。
IM 的流程大致是MQTT建立连接,订阅(subscribe),发布(publish)。
MQTT连接建立的代码(SSL方式)
public static void connect(Driver driver) {
ServerConfig serverConfig = UserModule.Instance.getServerConfig();
MqttConnectOptions conOpt = new MqttConnectOptions();
try {
SSLContext sslContext;
KeyStore ts = KeyStore.getInstance("BKS");
ts.load(context.getResources().openRawResource(R.raw.test_cert),
"123456".toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(ts);
TrustManager[] tm = tmf.getTrustManagers();
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, tm, null);
SocketFactory factory = sslContext.getSocketFactory();
conOpt.setSocketFactory(factory);
} catch (Exception e) {
e.printStackTrace();
}
<span style="white-space:pre"> </span>
<span style="white-space:pre"> </span>//paho库得 Iterator<Map.Entry<String, Connection>> it = Connections .getInstance(context).getConnections().entrySet().iterator(); while (it.hasNext()) { MqttClientAndroidService detectClient = it.next().getValue() .getClient(); try { detectClient.disconnect(); } catch (MqttException e) { e.printStackTrace(); } it.remove(); } // The basic client information MqttClientAndroidService client; client = Connections.getInstance(context).createClient(context, serverConfig.getUri(), serverConfig.clientId); Integer qos = Integer.parseInt(context.getResources().getString( R.string.qos)); Boolean retained = Boolean.parseBoolean(context.getResources() .getString(R.string.retained)); // connection options int timeout = Integer.parseInt(context.getResources().getString( (R.string.timeout))); int keepalive = Integer.parseInt(context.getResources().getString( R.string.keepalive)); Connection connection = new Connection(serverConfig.getClientHandle(), serverConfig.clientId, serverConfig.server, Integer.parseInt(serverConfig.port), context, client, serverConfig.ssl); // connection.registerChangeListener(changeListener); // connect client String[] actionArgs = new String[1]; actionArgs[0] = serverConfig.clientId; connection.changeConnectionStatus(ConnectionStatus.CONNECTING); boolean cleanSession = false; conOpt.setCleanSession(cleanSession); conOpt.setConnectionTimeout(timeout); conOpt.setKeepAliveInterval(keepalive); if (!TextUtils.isEmpty(serverConfig.user)) { conOpt.setUserName(serverConfig.user); } if (!TextUtils.isEmpty(serverConfig.pwd)) { conOpt.setPassword(serverConfig.pwd.toCharArray()); }// conOpt.setPassword("1111".toCharArray()); final ActionListener callback = new ActionListener(context, ActionListener.Action.CONNECT, driver.getMqttUtilsCallback(), serverConfig.getClientHandle(), actionArgs); boolean doConnect = true; String message = ActivityConstants.message; String topic = ActivityConstants.topic; if ((!TextUtils.isEmpty(message) || 
!TextUtils.isEmpty(topic))) { // need to make a message since last will is set try { conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue()); } catch (Exception e) { e.printStackTrace(); doConnect = false; callback.onFailure(null, e); } } client.setCallback(new MqttCallbackHandler(context, serverConfig .getClientHandle(), driver)); connection.addConnectionOptions(conOpt); Connections.getInstance(context).addConnection(connection); if (doConnect) { try { client.connect(conOpt, null, callback); } catch (MqttException e) { Log.e(TAG, "MqttException Occured", e); } } }发布(publish)代码
/**
 * Publishes a JSON payload to {@code topic} through the client registered
 * under {@code clientHandle}.
 *
 * <p>If the client is not connected, a reconnect is started, the user is
 * asked to retry and nothing is published. A {@code null} topic is rejected
 * with a toast. The retained flag is read from {@code R.string.retained}.
 *
 * @param clientHandle
 *            handle of the connection whose client is used
 * @param topic
 *            destination topic; must not be {@code null}
 * @param jsonObj
 *            message body, sent as the bytes of its string form; a non-zero
 *            {@code "time"} field is forwarded, with the topic, as the
 *            listener arguments
 * @param qos
 *            MQTT quality of service for this message
 */
public static void publish(String clientHandle, String topic,
        JSONObject jsonObj, int qos) {
    MqttClientAndroidService client = Connections.getInstance(context)
            .getConnection(clientHandle).getClient();
    if (!isConnected(context)) {
        try {
            client.connect();
        } catch (MqttException e) {
            Log.e(TAG, "Failed to reconnect the client with the handle "
                    + clientHandle, e);
        }
        Toast.makeText(context, "please try again", Toast.LENGTH_SHORT)
                .show();
        return;
    }
    if (topic == null) {
        Toast.makeText(context, "can not get other's identity for now",
                Toast.LENGTH_SHORT).show();
        return;
    }

    // Both slots stay null when the message carries no timestamp.
    String[] args = new String[2];
    long time = jsonObj.optLong("time");
    if (time != 0) {
        args[0] = String.valueOf(time);
        args[1] = topic;
    }

    Boolean retained = Boolean.parseBoolean(context.getResources()
            .getString(R.string.retained));
    try {
        client.publish(topic, jsonObj.toString().getBytes(), qos, retained,
                null, new ActionListener(context, Action.PUBLISH, null,
                        clientHandle, args));
    } catch (MqttException e) {
        // Also covers MqttSecurityException, which was handled identically.
        Log.e(TAG,
                "Failed to publish a messged from the client with the handle "
                        + clientHandle, e);
    }
}
订阅(subscribe)代码
/**
 * Subscribes the client registered under {@code clientHandle} to a topic at
 * QoS 1.
 *
 * <p>Shows a toast and returns when the client is missing or not connected.
 * An empty or {@code null} topic falls back to {@code "hello"}.
 *
 * @param mqttUtilsCallback
 *            callback handed to the subscribe {@code ActionListener}
 * @param clientHandle
 *            handle of the connection whose client is used
 * @param topic
 *            topic filter to subscribe to
 */
public static void subscribe(MqttUtilsCallback mqttUtilsCallback,
        String clientHandle, String topic) {
    final MqttClientAndroidService mqttClient = Connections
            .getInstance(context).getConnection(clientHandle).getClient();

    final boolean ready = mqttClient != null && mqttClient.isConnected();
    if (!ready) {
        Toast.makeText(context, "please connect to server first",
                Toast.LENGTH_SHORT).show();
        return;
    }

    final String target = TextUtils.isEmpty(topic) ? "hello" : topic;
    final String[] listenerArgs = { target };

    try {
        mqttClient.subscribe(target, 1, null, new ActionListener(context,
                ActionListener.Action.SUBSCRIBE, mqttUtilsCallback,
                clientHandle, listenerArgs));
    } catch (MqttException e) {
        // MqttSecurityException is a subtype and was logged the same way.
        Log.e(TAG, "Failed to subscribe to" + target
                + " the client with the handle " + clientHandle, e);
    }
}
以上三个方法罗列出了MQTT作为IM的基本流程,但还差很重要的一步:接收消息。paho为所有动作提供了两个回调接口,一个是IMqttActionListener,其中包含connect,publish,subscribe,disconnect成功与失败的回调方法;另一个是MqttCallback,所有IM的消息到达,连接断开都在此接口处理,代码如下
public class MqttCallbackHandler implements MqttCallback {
/**
* {@link Context} for the application used to format and import external
* strings
**/
private Context context;
/**
* Client handle to reference the connection that this handler is attached
* to
**/
private String clientHandle;
private MqttUtilsCallback mqttUtilsCallback;
private Driver driver;
/**
* Creates an <code>MqttCallbackHandler</code> object
*
* @param context
* The application's context
* @param clientHandle
* The handle to a {@link Connection} object
* @param driver
* The bridge connects UI, MQTT and WebRTC, and abstract logic
* from activity
*/
public MqttCallbackHandler(Context context, String clientHandle, Driver driver) {
this.context = context;
this.clientHandle = clientHandle;
this.driver = driver;
if (driver != null) {
this.mqttUtilsCallback = driver.getMqttUtilsCallback();
}
}
/**
* @see org.eclipse.paho.client.mqttv3.MqttCallback#connectionLost(java.lang.Throwable)
*/
@Override
public void connectionLost(Throwable cause) {
if (cause != null) {
Connection c = Connections.getInstance(context).getConnection(clientHandle);
c.addAction("Connection Lost");
c.changeConnectionStatus(ConnectionStatus.DISCONNECTED);
// format string to use a notification text
Object[] args = new Object[2];
args[0] = c.getId();
args[1] = c.getHostName();
String message = context.getString(R.string.connection_lost, args);
// build intent
// Intent intent = new Intent();
// intent.setClassName(context,
// "com.ibm.msg.android.ConnectionDetails");
// intent.putExtra("handle", clientHandle);
<span style="white-space:pre"> </span>//重连机制 //PollManager.Instance.init(); // notify the user // Notify.notifcation(context, message, intent, // R.string.notifyTitle_connectionLost); System.out.println(message); cause.printStackTrace(); } } /** * @see org.eclipse.paho.client.mqttv3.MqttCallback#messageArrived(java.lang.String, * org.eclipse.paho.client.mqttv3.MqttMessage) run on UIThread */ @Override public void messageArrived(String topic, MqttMessage message) { String strMsg = new String(message.getPayload()); System.out.println("msg arrived " + strMsg); } /** * @see org.eclipse.paho.client.mqttv3.MqttCallback#deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken) */ @Override public void deliveryComplete(IMqttDeliveryToken token) { // Do nothing }以上代码基本就可以满足IM的功能需求,聊天界面及本地缓存策略可以自己设计。
------------------------------------------------------------分割线---------------------------------------------------------
语音聊天模块是基于WebRTC的,关于WebRTC的资料大部分来自网络,由于时间原因没有自己编译,引入libjingle-connection库,下载地址:https://github.com/njovy/AppRTCDemo。