一 自定义认证 1.1 添加依赖 在项目里添加以下以下依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>5.15.6</version>
</dependency>
注意:依赖的版本一定要与部署的activemq版本保持一致
1.2 创建认证拦截器 1 2 3 4 5 6 7 8 9 10 11 package com.yishuifengxiao.study.activemq_plugin; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerPlugin; public class AuthPlugin implements BrokerPlugin { @Override public Broker installPlugin(Broker broker) throws Exception { return new AuthBroker(broker); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 package com.yishuifengxiao.study.activemq_plugin; import java.security.Principal; import java.security.cert.X509Certificate; import java.util.HashSet; import java.util.Set; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.security.AbstractAuthenticationBroker; import org.apache.activemq.security.SecurityContext; public class AuthBroker extends AbstractAuthenticationBroker { public AuthBroker(Broker next) { super(next); } @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { System.out.println("-----------------------------------------> ConnectionInfo = " + info); SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { securityContext = authenticate(info.getUserName(), info.getPassword(), null); context.setSecurityContext(securityContext); securityContexts.add(securityContext); } try { super.addConnection(context, info); } catch (Exception e) { securityContexts.remove(securityContext); context.setSecurityContext(null); throw e; } } /** * 在这里执行用户认证逻辑 */ @Override public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException { System.out.println("-----------------------------------------> authenticate username= " + username + " password= " + password); SecurityContext securityContext = new SecurityContext(username) { @Override public Set<Principal> getPrincipals() { Set<Principal> groups = new HashSet<Principal>(); groups.add(new Principal() { @Override public String getName() { // TODO Auto-generated method stub return username; } });// 默认加入了users的组 return groups; } }; return securityContext; } @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { super.removeDestination(context, destination, timeout); System.out.println("removeDestination -----------------------------------------> destination =" + destination); } @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { super.removeConnection(context, info, error); System.out.println("removeConnection -----------------------------------------> info =" + info + " error= " + error); } }
1.3 打包部署 将上述代码导出为jar
包,拷贝到ActiveMq
安装目录的lib
文件夹中
1.4 修改activemq.xml
在activemq.xml
文件的broker
节点下加入自定义的插件配置
1 2 3 4 5 6 7 8 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> <!-- 插件配置 --> <plugins> <bean xmlns="http://www.springframework.org/schema/beans" id="AuthPlugin" class="com.yishuifengxiao.study.activemq_plugin.AuthPlugin"></bean> </plugins> </broker>
配置成功后,重启ActiveMq
1.5 观察结果 启动activemq,然后使用 客户端连接activemq,并在连接后执行断开操作,可以看到如下的输出结果。
1 2 3 jvm 1 | -----------------------------------------> ConnectionInfo = ConnectionInfo {commandId = 0, responseRequired = true, connectionId = ID:DESKTOP-J2Q1CEL-24338-1602505390192-3:1, clientId = 9edd1383-3ffb-44da-9beb-137ca80135841602505402257, clientIp = tcp://127.0.0.1:24381, userName = aaaa, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = false, clientMaster = true, faultTolerant = false, failoverReconnect = false} jvm 1 | -----------------------------------------> authenticate username= aaaa password= 12344 jvm 1 | removeConnection -----------------------------------------> info =ConnectionInfo {commandId = 0, responseRequired = true, connectionId = ID:DESKTOP-J2Q1CEL-24338-1602505390192-3:1, clientId = 9edd1383-3ffb-44da-9beb-137ca80135841602505402257, clientIp = tcp://127.0.0.1:24381, userName = aaaa, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = false, clientMaster = true, faultTolerant = false, failoverReconnect = false} error= null
二 进阶使用 通过使用 JdbcTemplate
访问数据库。
2.1 添加依赖 在项目依赖里加上 spring-jdbc
依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.2.6.RELEASE</version>
</dependency>
注意此spring-jdbc
的版本与 activemq里使用的spring
的版本保持一致。
2.2 修改代码 修改后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.yishuifengxiao.study.activemq_plugin; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerPlugin; import org.springframework.jdbc.core.JdbcTemplate; public class AuthPlugin implements BrokerPlugin { private JdbcTemplate jdbcTemplate; public AuthPlugin(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public Broker installPlugin(Broker broker) throws Exception { return new AuthBroker(broker, this.jdbcTemplate); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 package com.yishuifengxiao.study.activemq_plugin; import java.security.Principal; import java.security.cert.X509Certificate; import java.util.HashSet; import java.util.Set; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.security.AbstractAuthenticationBroker; import org.apache.activemq.security.SecurityContext; import org.springframework.jdbc.core.JdbcTemplate; public class AuthBroker extends AbstractAuthenticationBroker { private JdbcTemplate jdbcTemplate; public AuthBroker(Broker next, JdbcTemplate jdbcTemplate) { super(next); this.jdbcTemplate = jdbcTemplate; System.out.println("-----------------------------------------> constructor = " + jdbcTemplate); } @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { System.out.println("-----------------------------------------> ConnectionInfo = " + info); SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { securityContext = authenticate(info.getUserName(), info.getPassword(), null); context.setSecurityContext(securityContext); securityContexts.add(securityContext); } try { super.addConnection(context, info); } catch (Exception e) { securityContexts.remove(securityContext); context.setSecurityContext(null); throw e; } } /** * 在这里执行用户认证逻辑 */ @Override public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException { System.out.println("-----------------------------------------> authenticate username= " + username + " password= " + password); SecurityContext securityContext = new SecurityContext(username) { @Override public Set<Principal> getPrincipals() { Set<Principal> groups = new HashSet<Principal>(); groups.add(new Principal() { @Override public String getName() { // TODO Auto-generated method stub return username; } });// 默认加入了users的组 return groups; } }; return securityContext; } @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { super.removeDestination(context, destination, timeout); System.out.println("removeDestination -----------------------------------------> destination =" + destination); } @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { super.removeConnection(context, info, error); System.out.println("removeDestination -----------------------------------------> info =" + info + " error= " + error); } }
2.3 添加jar
到activemq中 1.将mysql-connector-java-xxx.jar
复制到activemq\lib
目录下
2.数据库操作采用spring-jdbc
的方式,需要将spring-jdbc-xxx.RELEASE.jar
复制到activemq\lib\optional
目录下(spring-jdbc的版本应与lib\optional其他的spring相同 )
2.4 修改activemq.xml 1 先在 beans
节点下增加以下内容 (beans
为根节点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 <!-- mysql数据库数据源--> <bean id="mySqlDataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="${jdbc.driverClassName}" /> <property name="url" value="${jdbc.url}" /> <property name="username" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> </bean> <!-- 增加jdbcTemplate--> <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate" abstract="false" lazy-init="false" autowire="default" > <property name="dataSource"> <ref bean="mySqlDataSource" /> </property> </bean>
2 修改 PropertyPlaceholderConfigurer
配置,修改后的内容如下:
1 2 3 4 5 6 7 8 9 <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>file:${activemq.conf}/credentials.properties</value> <!--这一行是新增的--> <value>file:${activemq.conf}/db.properties</value> </list> </property> </bean>
3 将2.4中修改的broker
节点中的plugins
修改为以下内容
1 2 3 4 5 6 7 8 <plugins> <bean xmlns="http://www.springframework.org/schema/beans" id="AuthPlugin" class="com.yishuifengxiao.study.activemq_plugin.AuthPlugin"> <constructor-arg> <ref bean="jdbcTemplate"/> </constructor-arg> </bean> </plugins>
4 在activemq\conf\
目录下加入db.properties
文件
该文件的内容如下:
1 2 3 4 jdbc.driverClassName=com.mysql.jdbc.Driver jdbc.url=jdbc:mysql://192.168.195.130:3306/demo?autoReconnect=true&useUnicode=true&characterEncoding=utf8 jdbc.username=root jdbc.password=123456ss
完成上述修改后,重新将代码打包成jar
中,部署到activemq中,然后重启即可。
三 连接状态监控 通过订阅ActiveMQ.Advisory.Connection
就能获取到断开与超时
3.1 配置activeMQ.xml 在activeMQ.xml
的destinationPolicy
节点上配置
1 2 3 4 5 6 7 <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" advisoryForConsumed="true" advisoryForDelivery="true"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
打开Advisories , 默认Advisory的功能是关闭的
3.2 编写监听代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package activemqClient.activemqClient; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.RemoveInfo; public class ConnectionTest { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("manage","123456","tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); final Session session = connection.createSession(false/*支持事务*/, Session.AUTO_ACKNOWLEDGE); ActiveMQTopic test = AdvisorySupport.getConnectionAdvisoryTopic(); MessageConsumer consumer = session.createConsumer(test); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof ActiveMQMessage) { ActiveMQMessage msg = (ActiveMQMessage) message; System.out.println(msg); if(msg.getDataStructure() instanceof ConnectionInfo) { ConnectionInfo info = (ConnectionInfo) msg.getDataStructure(); System.out.println(info.getUserName() +"建立连接:"+info.getConnectionId()); }else if(msg.getDataStructure() instanceof RemoveInfo ) { RemoveInfo info = (RemoveInfo) msg.getDataStructure(); System.out.println("断开连接:"+info.getObjectId()); } } } }); } }
客户端一建立连接首先的请求就是 tcp://ActiveMQ.Advisory
,所以我们的建立连接以及 订阅或者发布都是tcp://XXX
形式来所以我们的 ActiveMQ.Advisory.Connection topic
也是可以订阅的
参考资料
1 2 3 4 5 6 7 https://blog.csdn.net/SpbDev/article/details/106520573 https://blog.csdn.net/weixin_33971977/article/details/92832351 https://www.itcto.cn/mqtt/connectauth/ https://www.cnblogs.com/huangzhex/p/6339761.html (重要)