Android MQTT协议利用ActiveMQ发送消息给Android端接收

时间:2024-03-25 21:00:26

转载:https://blog.csdn.net/JOYU_/article/details/81383467

Android MQTT协议利用ActiveMQ发送消息给Android端接收

1、添加依赖:

repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-releases/"
    }
}
 
dependencies {
   
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.0'
 
    //用来传递消息
    implementation 'org.greenrobot:eventbus:3.0.0'
}

2、MQTTService类继承Service

public class MQTTService extends Service {
    //消息服务器的URL根据后台ActiveMQ的配置来设置
    public static final String BROKER_URL = "tcp://192.168.9.1:1883";

    //注意:端口号,一定给默认1883,给其他的话会报错:“意外错误”,断点查看是连接问题。
    //客户端ID,用来标识一个客户,可以根据不同的策略来生成
    public static final String clientId = "admin";
    //订阅的主题名
    public static final String TOPIC = "class_receiver";
    //mqtt客户端类
    private MqttClient mqttClient;
    //mqtt连接配置类
    private MqttConnectOptions options;
 
    private String userName = "admin";
    private String passWord = "admin";
 
    public MQTTService() {
    }
    @Override
    public IBinder onBind(Intent intent) {
        // TODO: Return the communication channel to the service.
        throw new UnsupportedOperationException("Not yet implemented");
    }
    @SuppressWarnings("AlibabaLowerCamelCaseVariableNaming")
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        try {
            //在服务开始时new一个mqttClient实例,客户端ID为clientId,第三个参数说明是持久化客户端,如果是null则是非持久化
            mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());
            // MQTT的连接设置
            options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            //换而言之,设置为false时可以客户端可以接受离线消息
            options.setCleanSession(false);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            options.setPassword(passWord.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            // 设置回调  回调类的说明在后面
            mqttClient.setCallback(new PushCallback());
            MqttTopic topic = mqttClient.getTopic(TOPIC);
            //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
            options.setWill(topic, "close".getBytes(), 2, true);
            //mqtt客户端连接服务器
            mqttClient.connect(options);
 
            //mqtt客户端订阅主题
            //在mqtt中用QoS来标识服务质量
            //QoS=0时,报文最多发送一次,有可能丢失
            //QoS=1时,报文至少发送一次,有可能重复
            //QoS=2时,报文只发送一次,并且确保消息只到达一次。
            int[] Qos = {1};
            String[] topic1 = {TOPIC};
            mqttClient.subscribe(topic1, Qos);
        } catch (MqttException e) {
            Toast.makeText(getApplicationContext(), "Something went wrong1!" + e.getMessage(), Toast.LENGTH_LONG).show();
            e.printStackTrace();
        }
        return super.onStartCommand(intent, flags, startId);
    }
 
    @Override
    public void onDestroy() {
        try {
            mqttClient.disconnect(0);
        } catch (MqttException e) {
            Toast.makeText(getApplicationContext(), "Something went wrong2!" + e.getMessage(), Toast.LENGTH_LONG).show();
            e.printStackTrace();
        }
    }
}

3、PushCallback类同时实现MqttCallback接口

public class PushCallback implements MqttCallback {
    private static String TAG = "PushCallback";
    @Override
    public void connectionLost(Throwable cause) {
        Log.i(TAG, "链接失败---");
    }
 
    @Override
    public void messageArrived(String topic, final MqttMessage message) throws Exception {
        Log.i(TAG, "有新消息到达时的回调方法---");
        Log.i("topic = ", topic);
        String msg = new String(message.getPayload());
        Log.i("msg = ", msg);
        Log.i("qos = ", message.getQos() + "");
 
        //通过EventBus将获取到的消息传递出去
        EventBus.getDefault().post(new MessageEvent(msg));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {        
 
        Log.i("--deliveryComplete--", "成功发布某一消息后的回调方法");
    }
}

4、EventBus的消息事件类MessageEvent


public class MessageEvent {
 
    private String message;
 
    public MessageEvent(String message) {
        this.message = message;
    }
 
    public String getMessage() {
        return message;
    }
 
    public void setMessage(String message) {
        this.message = message;
    }
}

5、MainActivity类中

public class MainActivity extends AppCompatActivity {
    
    TextView tv;
 
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        //启动服务
        Intent intent = new Intent(this, MQTTService.class);
        startService(intent);
        //在需要订阅时间的地方注册事件
        EventBus.getDefault().register(this);
        
         tv = findViewById(R.id.tv);
    }
 
    //处理事件 这里是将获取到的消息吐司了一下
    @Subscribe(threadMode = ThreadMode.MAIN)
    public void showTheEventMessage(MessageEvent messageEvent){
        Toast.makeText(this, messageEvent.getMessage(), Toast.LENGTH_SHORT).show();
         tv.setText(messageEvent.getMessage());
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        //取消事件订阅
        EventBus.getDefault().unregister(this);
    }
}

6、添加权限

 <!-- 别忘了加下面的权限 -->
    <uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />
    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />

7、注册Service

<!-- Mqtt Service -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />
        <service
            android:name=".MQTTService"
            android:enabled="true"
            android:exported="true"></service>