消息简介
从广义角度来说,消息其实就是信息,但是和信息又有所不同。信息通常被定义为一组数据,而消息除了具有数据的特征之外,还有消息的来源与接收的概念。通常发送消息的一方称为消息的生产者,接收消息的一方称为消息的消费者
Java处理消息的标准规范
目前企业级开发中广泛使用的消息处理技术共三大类,具体如下:
为什么是三大类,而不是三个技术呢?因为这些都是规范,就想JDBC技术,是个规范,开发针对规范开发,运行还要靠实现类,例如MySQL提供了JDBC的实现,最终运行靠的还是实现。并且这三类规范都是针对异步消息进行处理的,也符合消息的设计本质,处理异步的业务。对以上三种消息规范做一下普及
JMS
- JMS(Java Message Service):一个规范,等同于JDBC规范,提供了与消息服务相关的API接口
- JMS消息模型
- peer-2-peer:点对点模型,消息发送到一个队列中,队列保存消息。队列的消息只能被一个消费者消费,或超时
- publish-subscribe:发布订阅模型,消息可以被多个消费者消费,生产者和消费者完全独立,不需要感知对方的存在
- JMS消息种类
- TextMessage
- MapMessage
- BytesMessage
- StreamMessage
- ObjectMessage
- Message (只有消息头和属性)
- JMS实现:ActiveMQ、Redis、HornetMQ、RabbitMQ、RocketMQ(没有完全遵守JMS规范)
AMQP
- AMQP(advanced message queuing protocol):一种协议(高级消息队列协议,也是消息代理规范),规范了网络交换的数据格式,兼容JMS
- 优点:具有跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现
- AMQP消息模型
- direct exchange
- fanout exchange
- topic exchange
- headers exchange
- system exchange
- AMQP消息种类:byte[]
- AMQP实现:RabbitMQ、StormMQ、RocketMQ
MQTT
- MQTT(Message Queueing Telemetry Transport)消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一
KafKa
Kafka,一种高吞吐量的分布式发布订阅消息系统,提供实时消息功能。Kafka技术并不是作为消息中间件为主要功能的产品,但是其拥有发布订阅的工作模式,也可以充当消息中间件来使用,而且目前企业级开发中其身影也不少见。
购物订单案例-发送短信
为了便于下面演示各种各样的消息中间件技术,我们创建一个购物过程生成订单时为用户发送短信的案例环境,模拟使用消息中间件实现发送手机短信的过程。
手机验证码案例需求如下:
执行下单业务时(模拟此过程),调用消息服务,将要发送短信的订单id传递给消息中间件
消息处理服务接收到要发送的订单id后输出订单id(模拟发短信)
由于不涉及数据读写,仅开发业务层与表现层,其中短信处理的业务代码独立开发,代码如下:
订单业务
@RestController @RequestMapping("/orders") public class OrderController { @Autowired private OrderService orderService;
@PostMapping("{id}") public void order(@PathVariable String id){ orderService.order(id); } }
|
public interface OrderService { void order(String id); }
|
@Service public class OrderServiceImpl implements OrderService { @Autowired private MessageService messageService; @Override public void order(String id) { System.out.println("订单处理开始"); messageService.sendMessage(id); System.out.println("订单处理结束"); System.out.println(); } }
|
短信处理业务
@RestController @RequestMapping("/msgs") public class MessageController { @Autowired private MessageService messageService;
@GetMapping public String doMessage(){ String id = messageService.doMessage(); return id; } }
|
public interface MessageService { void sendMessage(String id); String doMessage(); }
|
@Service public class MessageServiceImpl implements MessageService { private ArrayList<String> msgList = new ArrayList<String>();
@Override public void sendMessage(String id) { System.out.println("待发送短信的订单已纳入处理队列,id:" + id); msgList.add(id); System.out.println("当前短信队列" + msgList.toString()); }
@Override public String doMessage() { String id = msgList.remove(0); System.out.println("已完成短信发送业务,id:" + id); System.out.println("当前短信队列" + msgList.toString()); return id; } }
|
短信处理表现层接口暂且开发出一个处理消息的入口,但是此业务是对应业务层中设计的模拟接口,实际业务不需要设计此接口。
下面开启springboot整合各种各样的消息中间件,从严格满足JMS规范的ActiveMQ开始
ActiveMQ
安装
windows版安装包下载地址:https://activemq.apache.org/components/classic/download/
下载的安装包是解压缩就能使用的zip文件,解压缩完毕后会得到如下文件
启动服务器
运行bin目录下的win32或win64目录下的activemq.bat命令即可,根据自己的操作系统选择即可,默认对外服务端口61616。
访问web管理服务
ActiveMQ启动后会启动一个Web控制台服务,可以通过该服务管理ActiveMQ。
web管理服务默认端口8161,访问后可以打开ActiveMQ的管理界面,如下:
首先输入访问用户名和密码,初始化用户名和密码相同,均为:admin,成功登录后进入管理后台界面,如下:
看到上述界面视为启动ActiveMQ服务成功。
服务端口:61616,管理后台端口:8161
启动失败
在ActiveMQ启动时要占用多个端口,以下为正常启动信息:
wrapper | –> Wrapper Started as Console
wrapper | Launching a JVM…
jvm 1 | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org
jvm 1 | Copyright 1999-2006 Tanuki Software, Inc. All Rights Reserved.
jvm 1 |
jvm 1 | Java Runtime: Oracle Corporation 1.8.0_172 D:\soft\jdk1.8.0_172\jre
jvm 1 | Heap sizes: current=249344k free=235037k max=932352k
jvm 1 | JVM args: -Dactivemq.home=…/… -Dactivemq.base=…/… -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=…/…/conf/broker.ks -Djavax.net.ssl.trustStore=…/…/conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=…/…/conf -Dactivemq.data=…/…/data -Djava.security.auth.login.config=…/…/conf/login.config -Xmx1024m -Djava.library.path=…/…/bin/win64 -Dwrapper.key=7ySrCD75XhLCpLjd -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=9364 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1
jvm 1 | Extensions classpath:
jvm 1 | […\lib,…\lib\camel,…\lib\optional,…\lib\web,…\lib\extra]
jvm 1 | ACTIVEMQ_HOME: …
jvm 1 | ACTIVEMQ_BASE: …
jvm 1 | ACTIVEMQ_CONF: …\conf
jvm 1 | ACTIVEMQ_DATA: …\data
jvm 1 | Loading message broker from: xbean:activemq.xml
jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@5f3ebfe0: startup date [Mon Feb 28 16:07:48 CST 2022]; root of context hierarchy
jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\soft\activemq\bin\win64…\data\kahadb]
jvm 1 | INFO | KahaDB is version 7
jvm 1 | INFO | PListStore:[D:\soft\activemq\bin\win64…\data\localhost\tmp_storage] started
jvm 1 | INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10434-1646035669595-0:1) is starting
jvm 1 | INFO | Listening for connections at: tcp://CZBK-20210302VL:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector openwire started
jvm 1 | INFO | Listening for connections at: amqp://CZBK-20210302VL:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector amqp started
jvm 1 | INFO | Listening for connections at: stomp://CZBK-20210302VL:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector stomp started
jvm 1 | INFO | Listening for connections at: mqtt://CZBK-20210302VL:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector mqtt started
jvm 1 | INFO | Starting Jetty server
jvm 1 | INFO | Creating Jetty connector
jvm 1 | WARN | ServletContext@o.e.j.s.ServletContextHandler@7350746f{/,null,STARTING} has uncovered http methods for path: /
jvm 1 | INFO | Listening for connections at ws://CZBK-20210302VL:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector ws started
jvm 1 | INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10434-1646035669595-0:1) started
jvm 1 | INFO | For help or more information please see: http://activemq.apache.org
jvm 1 | WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: D:\soft\activemq\bin\win64…\data\kahadb only has 68936 mb of usable space. - resetting to maximum available disk space: 68936 mb
jvm 1 | INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/
jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/
==
其中占用的端口有:61616、5672、61613、1883、61614,如果启动失败,请先管理对应端口即可。以下就是某个端口占用的报错信息,可以从抛出异常的位置看出,启动5672端口时端口被占用,显示java.net.BindException: Address already in use: JVM_Bind。Windows系统中终止端口运行的操作参看【命令行启动常见问题及解决方案】
wrapper | --> Wrapper Started as Console wrapper | Launching a JVM... jvm 1 | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org jvm 1 | Copyright 1999-2006 Tanuki Software, Inc. All Rights Reserved. jvm 1 | jvm 1 | Java Runtime: Oracle Corporation 1.8.0_172 D:\soft\jdk1.8.0_172\jre jvm 1 | Heap sizes: current=249344k free=235038k max=932352k jvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=QPJoy9ZoXeWmmwTS -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=14836 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1 jvm 1 | Extensions classpath: jvm 1 | [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra] jvm 1 | ACTIVEMQ_HOME: ..\.. jvm 1 | ACTIVEMQ_BASE: ..\.. jvm 1 | ACTIVEMQ_CONF: ..\..\conf jvm 1 | ACTIVEMQ_DATA: ..\..\data jvm 1 | Loading message broker from: xbean:activemq.xml jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@2c9392f5: startup date [Mon Feb 28 16:06:16 CST 2022]; root of context hierarchy jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\soft\activemq\bin\win64\..\..\data\kahadb] jvm 1 | INFO | KahaDB is version 7 jvm 1 | INFO | PListStore:[D:\soft\activemq\bin\win64\..\..\data\localhost\tmp_storage] started jvm 1 | INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1) is starting jvm 1 | INFO | Listening for connections at: tcp://CZBK-20210302VL:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector openwire started jvm 1 | ERROR | Failed to start Apache ActiveMQ (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1) jvm 1 | java.io.IOException: Transport Connector could not be registered in JMX: java.io.IOException: Failed to bind to server socket: amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind jvm 1 | at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:28) jvm 1 | at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:2288) jvm 1 | at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2769) jvm 1 | at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2665) jvm 1 | at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:780) jvm 1 | at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:742) jvm 1 | at org.apache.activemq.broker.BrokerService.start(BrokerService.java:645) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:73) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1748) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1685) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1615) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:553) jvm 1 | at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:481) jvm 1 | at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:312) jvm 1 | at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) jvm 1 | at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308) jvm 1 | at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) jvm 1 | at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54) jvm 1 | at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63) jvm 1 | at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63) jvm 1 | at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498) jvm 1 | at org.apache.activemq.console.Main.runTaskClass(Main.java:262) jvm 1 | at org.apache.activemq.console.Main.main(Main.java:115) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498) jvm 1 | at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240) jvm 1 | at java.lang.Thread.run(Thread.java:748) jvm 1 | Caused by: java.io.IOException: Failed to bind to server socket: amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind jvm 1 | at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:34) jvm 1 | at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:146) jvm 1 | at org.apache.activemq.transport.tcp.TcpTransportFactory.doBind(TcpTransportFactory.java:62) jvm 1 | at org.apache.activemq.transport.TransportFactorySupport.bind(TransportFactorySupport.java:40) jvm 1 | at org.apache.activemq.broker.TransportConnector.createTransportServer(TransportConnector.java:335) jvm 1 | at org.apache.activemq.broker.TransportConnector.getServer(TransportConnector.java:145) jvm 1 | at org.apache.activemq.broker.TransportConnector.asManagedConnector(TransportConnector.java:110) jvm 1 | at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:2283) jvm 1 | ... 46 more jvm 1 | Caused by: java.net.BindException: Address already in use: JVM_Bind jvm 1 | at java.net.DualStackPlainSocketImpl.bind0(Native Method) jvm 1 | at java.net.DualStackPlainSocketImpl.socketBind(DualStackPlainSocketImpl.java:106) jvm 1 | at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) jvm 1 | at java.net.PlainSocketImpl.bind(PlainSocketImpl.java:190) jvm 1 | at java.net.ServerSocket.bind(ServerSocket.java:375) jvm 1 | at java.net.ServerSocket.<init>(ServerSocket.java:237) jvm 1 | at javax.net.DefaultServerSocketFactory.createServerSocket(ServerSocketFactory.java:231) jvm 1 | at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:143) jvm 1 | ... 52 more jvm 1 | INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1) is shutting down jvm 1 | INFO | socketQueue interrupted - stopping jvm 1 | INFO | Connector openwire stopped jvm 1 | INFO | Could not accept connection during shutdown : null (null) jvm 1 | INFO | Connector amqp stopped jvm 1 | INFO | Connector stomp stopped jvm 1 | INFO | Connector mqtt stopped jvm 1 | INFO | Connector ws stopped jvm 1 | INFO | PListStore:[D:\soft\activemq\bin\win64\..\..\data\localhost\tmp_storage] stopped jvm 1 | INFO | Stopping async queue tasks jvm 1 | INFO | Stopping async topic tasks jvm 1 | INFO | Stopped KahaDB jvm 1 | INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1) uptime 0.426 seconds jvm 1 | INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1) is shutdown jvm 1 | INFO | Closing org.apache.activemq.xbean.XBeanBrokerFactory$1@2c9392f5: startup date [Mon Feb 28 16:06:16 CST 2022]; root of context hierarchy jvm 1 | WARN | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.io.IOException: Transport Connector could not be registered in JMX: java.io.IOException: Failed to bind to server socket: amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind jvm 1 | ERROR: java.lang.RuntimeException: Failed to execute start task. Reason: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | java.lang.RuntimeException: Failed to execute start task. Reason: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:91) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63) jvm 1 | at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63) jvm 1 | at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498) jvm 1 | at org.apache.activemq.console.Main.runTaskClass(Main.java:262) jvm 1 | at org.apache.activemq.console.Main.main(Main.java:115) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498) jvm 1 | at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240) jvm 1 | at java.lang.Thread.run(Thread.java:748) jvm 1 | Caused by: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | at org.springframework.context.support.AbstractRefreshableApplicationContext.getBeanFactory(AbstractRefreshableApplicationContext.java:164) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1034) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:555) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54) jvm 1 | at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87) jvm 1 | ... 16 more jvm 1 | ERROR: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext jvm 1 | at org.springframework.context.support.AbstractRefreshableApplicationContext.getBeanFactory(AbstractRefreshableApplicationContext.java:164) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1034) jvm 1 | at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:555) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64) jvm 1 | at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104) jvm 1 | at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71) jvm 1 | at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54) jvm 1 | at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63) jvm 1 | at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154) jvm 1 | at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63) jvm 1 | at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498) jvm 1 | at org.apache.activemq.console.Main.runTaskClass(Main.java:262) jvm 1 | at org.apache.activemq.console.Main.main(Main.java:115) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) jvm 1 | at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) jvm 1 | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) jvm 1 | at java.lang.reflect.Method.invoke(Method.java:498) jvm 1 | at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240) jvm 1 | at java.lang.Thread.run(Thread.java:748) wrapper | <-- Wrapper Stopped 请按任意键继续. . .
|
小结:
- ActiveMQ下载与安装
- ActiveMQ服务启动(控制台)
springboot整合ActiveMQ
步骤①:导入springboot整合ActiveMQ的starter
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
|
步骤②:配置ActiveMQ的服务器地址
spring: activemq: broker-url: tcp://localhost:61616
|
步骤③:使用JmsMessagingTemplate操作ActiveMQ
@Service public class MessageServiceActivemqImpl implements MessageService { @Autowired private JmsMessagingTemplate messagingTemplate;
@Override public void sendMessage(String id) { System.out.println("待发送短信的订单已纳入处理队列,id:"+id); messagingTemplate.convertAndSend("order.queue.id",id); }
@Override public String doMessage() { String id = messagingTemplate.receiveAndConvert("order.queue.id",String.class); System.out.println("已完成短信发送业务,id:"+id); return id; } }
|
发送消息需要先将消息的类型转换成字符串,然后再发送,所以是convertAndSend,定义消息发送的位置,和具体的消息内容,此处使用id作为消息内容。
接收消息需要先将消息接收到,然后再转换成指定的数据类型,所以是receiveAndConvert,接收消息除了提供读取的位置,还要给出转换后的数据的具体类型。
步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component public class MessageListener { @JmsListener(destination = "order.queue.id") @SendTo("order.other.queue.id") public String receive(String id){ System.out.println("已完成短信发送业务,id:"+id); return "new:"+id; } }
|
使用注解@JmsListener定义当前方法监听ActiveMQ中指定名称的消息队列。
如果当前消息队列处理完还需要继续向下传递当前消息到另一个队列中使用注解@SendTo即可,这样即可构造连续执行的顺序消息队列。
步骤⑤:切换消息模型由点对点模型到发布订阅模型,修改jms配置即可
spring: activemq: broker-url: tcp://localhost:61616 jms: pub-sub-domain: true
|
pub-sub-domain默认值为false,即点对点模型,修改为true后就是发布订阅模型。
总结
- springboot整合ActiveMQ提供了JmsMessagingTemplate对象作为客户端操作消息队列
- 操作ActiveMQ需要配置ActiveMQ服务器地址,默认端口61616
- 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@JmsListener
- 配置jms的pub-sub-domain属性可以在点对点模型和发布订阅模型间切换消息模型
RabbitMQ
安装
RabbitMQ是MQ产品中的目前较为流行的产品之一,它遵从AMQP协议。RabbitMQ的底层实现语言使用的是Erlang,所以安装RabbitMQ需要先安装Erlang。
Erlang安装
windows版安装包下载地址:https😕/www.erlang.org/downloads
下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕需要重启,需要重启,需要重启。
安装的过程中可能会出现依赖Windows组件的提示,根据提示下载安装即可,都是自动执行的,如下:
Erlang安装后需要配置环境变量,否则RabbitMQ将无法找到安装的Erlang。需要配置项如下,作用等同JDK配置环境变量的作用。
ERLANG_HOME
PATH
在系统变量的path里添加
安装
windows版安装包下载地址:https://rabbitmq.com/install-windows.html
下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕后会得到如下文件
启动服务器
rabbitmq-service.bat start # 启动服务 rabbitmq-service.bat stop # 停止服务 rabbitmqctl status # 查看服务状态
|
运行sbin目录下的rabbitmq-service.bat命令即可(cmd命令行要有管理员权限),start参数表示启动,stop参数表示退出,默认对外服务端口5672。
注意:启动rabbitmq的过程实际上是开启rabbitmq对应的系统服务,需要管理员权限方可执行。
说明:有没有感觉5672的服务端口很熟悉?activemq与rabbitmq有一个端口冲突问题,学习阶段无论操作哪一个?请确保另一个处于关闭状态。
说明:不喜欢命令行的小伙伴可以使用任务管理器中的服务页,找到RabbitMQ服务,使用鼠标右键菜单控制服务的启停。
访问web管理服务
RabbitMQ也提供有web控制台服务,但是此功能是一个插件,需要先启用才可以使用,在sbin目录下运行下面的命令
rabbitmq-plugins.bat list # 查看当前所有插件的运行状态 rabbitmq-plugins.bat enable rabbitmq_management # 启动rabbitmq_management插件
|
启动插件后可以在插件运行状态中查看是否运行,运行后通过浏览器即可打开服务后台管理界面
web管理服务默认端口15672,服务端口:5672,访问后可以打开RabbitMQ的管理界面,如下:
首先输入访问用户名和密码,初始化用户名和密码相同,均为:guest,成功登录后进入管理后台界面,如下:
springboot整合RabbitMQ(direct模式)
RabbitMQ满足AMQP协议,因此不同的消息模型对应的制作不同,先使用最简单的direct模型开发。
步骤①:导入springboot整合amqp的starter,amqp协议默认实现为rabbitmq方案
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
步骤②:配置RabbitMQ的服务器地址
spring: rabbitmq: host: localhost port: 5672
|
步骤③:初始化直连模式系统设置
由于RabbitMQ不同模型要使用不同的交换机,因此需要先初始化RabbitMQ相关的对象,例如队列,交换机等
@Configuration public class RabbitConfigDirect { @Bean public Queue directQueue(){ return new Queue("direct_queue"); } @Bean public Queue directQueue2(){ return new Queue("direct_queue2"); } @Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange"); } @Bean public Binding bindingDirect(){ return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct"); } @Bean public Binding bindingDirect2(){ return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2"); } }
|
队列Queue与直连交换机DirectExchange创建后,还需要绑定他们之间的关系Binding,这样就可以通过交换机操作对应队列。
步骤④:使用AmqpTemplate操作RabbitMQ
@Service public class MessageServiceRabbitmqDirectImpl implements MessageService { @Autowired private AmqpTemplate amqpTemplate;
@Override public void sendMessage(String id) { System.out.println("待发送短信的订单已纳入处理队列(rabbitmq direct),id:"+id); amqpTemplate.convertAndSend("directExchange","direct",id); } @Override public String doMessage() { return null; } }
|
amqp协议中的操作API接口名称看上去和jms规范的操作API接口很相似,但是传递参数差异很大。
步骤⑤:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component public class MessageListener { @RabbitListener(queues = "direct_queue") public void receive(String id){ System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id); } }
|
使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
小结:
- SpringBoot整合RabbitMQ直连交换机模式
springboot整合RabbitMQ(topic模式)
步骤①:同上
步骤②:同上
步骤③:初始化主题模式系统设置
@Configuration public class RabbitConfigTopic { @Bean public Queue topicQueue(){ return new Queue("topic_queue"); } @Bean public Queue topicQueue2(){ return new Queue("topic_queue2"); } @Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } @Bean public Binding bindingTopic(){ return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.id"); } @Bean public Binding bindingTopic2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.orders.*"); } }
|
主题模式支持routingKey匹配模式,
*表示匹配一个单词,
#表示匹配任意内容,
这样就可以通过主题交换机将消息分发到不同的队列中。
匹配键 |
topic.*.* |
topic.# |
topic.order.id |
true |
true |
order.topic.id |
false |
false |
topic.sm.order.id |
false |
true |
topic.sm.id |
false |
true |
topic.id.order |
true |
true |
topic.id |
false |
true |
topic.order |
false |
true |
步骤④:使用AmqpTemplate操作RabbitMQ
@Service public class MessageServiceRabbitmqTopicImpl implements MessageService { @Autowired private AmqpTemplate amqpTemplate;
@Override public void sendMessage(String id) { System.out.println("待发送短信的订单已纳入处理队列(rabbitmq topic),id:"+id); amqpTemplate.convertAndSend("topicExchange","topic.orders.id",id); } @Override public String doMessage() { return null; } }
|
发送消息后,根据当前提供的routingKey与绑定交换机时设定的routingKey进行匹配,规则匹配成功消息才会进入到对应的队列中。
步骤⑤:使用消息监听器在服务器启动后,监听指定队列
@Component public class MessageListener { @RabbitListener(queues = "topic_queue") public void receive(String id){ System.out.println("已完成短信发送业务(rabbitmq topic 1),id:"+id); } @RabbitListener(queues = "topic_queue2") public void receive2(String id){ System.out.println("已完成短信发送业务(rabbitmq topic 22222222),id:"+id); } }
|
使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
RocketMQ
安装
RocketMQ由阿里研发,后捐赠给apache基金会,目前是apache基金会顶级项目之一,也是目前市面上的MQ产品中较为流行的产品之一,它遵从AMQP协议。
安装
windows版安装包下载地址:https://rocketmq.apache.org/
下载完毕后得到zip压缩文件,解压缩即可使用,解压后得到如下文件
RocketMQ安装后需要配置环境变量,具体如下:
关于NAMESRV_ADDR对于初学者来说建议配置此项,也可以通过命令设置对应值,操作略显繁琐,建议配置。系统学习RocketMQ知识后即可灵活控制该项。
RocketMQ工作模式
在RocketMQ中,处理业务的服务器称为broker,生产者与消费者不是直接与broker联系的,而是通过命名服务器进行通信。broker启动后会通知命名服务器自己已经上线,这样命名服务器中就保存有所有的broker信息。当生产者与消费者需要连接broker时,通过命名服务器找到对应的处理业务的broker,因此命名服务器在整套结构中起到一个信息中心的作用。并且broker启动前必须保障命名服务器先启动。
启动服务器
先启动命名服务器
启动broker
运行bin目录下的mqnamesrv命令即可启动命名服务器,默认对外服务端口9876。
运行bin目录下的mqbroker命令即可启动broker服务器,如果环境变量中没有设置NAMESRV_ADDR则需要在运行mqbroker指令前通过set指令设置NAMESRV_ADDR的值,并且每次开启均需要设置此项。
测试服务器启动状态
RocketMQ提供有一套测试服务器功能的测试程序,运行bin目录下的tools命令即可使用。
tools org.apache.rocketmq.example.quickstart.Producer # 生产消息 tools org.apache.rocketmq.example.quickstart.Consumer # 消费消息
|
springboot整合RocketMQ
步骤①:导入springboot整合RocketMQ的starter,此坐标不由springboot维护版本
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
|
步骤②:配置RocketMQ的服务器地址
rocketmq: name-server: localhost:9876 producer: group: group_rocketmq
|
设置默认的生产者消费者所属组group。
步骤③:使用RocketMQTemplate操作RocketMQ
使用asyncSend方法发送异步消息。
@Service public class MessageServiceRocketmqImpl implements MessageService {
@Autowired private RocketMQTemplate rocketMQTemplate;
@Override public void sendMessage(String id) { SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("消息发送成功"); }
@Override public void onException(Throwable e) { System.out.println("消息发送失败!!!!!"); } }; rocketMQTemplate.asyncSend("order_id", id, callback); System.out.println("使用Rabbitmq将待发送短信的订单纳入处理队列,id:" + id); }
@Override public String doMessage() { return null; } }
|
步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component @RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq") public class MessageListener implements RocketMQListener<String> { @Override public void onMessage(String id) { System.out.println("已完成短信发送业务(rocketmq),id:"+id); } }
|
RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接口,泛型为消息类型。
使用注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。
总结
- springboot整合RocketMQ使用RocketMQTemplate对象作为客户端操作消息队列
- 操作RocketMQ需要配置RocketMQ服务器地址,默认端口9876
- 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RocketMQMessageListener
Kafka
安装
安装
windows版安装包下载地址:https://kafka.apache.org/downloads
下载完毕后得到tgz压缩文件,使用解压缩软件解压缩即可使用,解压后得到如下文件
建议使用windows版2.8.1版本。
启动服务器
kafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,它的作用是注册中心,相关知识请到对应课程中学习。
zookeeper-server-start.bat ..\..\config\zookeeper.properties # 启动zookeeper kafka-server-start.bat ..\..\config\server.properties # 启动kafka
|
运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口2181。
运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口9092。
创建主题
和之前操作其他MQ产品相似,kakfa也是基于主题操作,操作之前需要先初始化topic。
# 创建topic kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic # 查询topic kafka-topics.bat --zookeeper 127.0.0.1:2181 --list # 删除topic kafka-topics.bat --delete --zookeeper localhost:2181 --topic testTopic
|
测试服务器启动状态
Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。
kafka-console-producer.bat --broker-list localhost:9092 --topic testTopic # 测试生产消息 kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic testTopic --from-beginning # 测试消息消费
|
springboot整合Kafka
步骤①:导入springboot整合Kafka的starter,此坐标由springboot维护版本
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
|
步骤②:配置Kafka的服务器地址
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: order
|
设置默认的生产者消费者所属组id。
步骤③:使用KafkaTemplate操作Kafka
@Service public class MessageServiceKafkaImpl implements MessageService { @Autowired private KafkaTemplate<String,String> kafkaTemplate;
@Override public void sendMessage(String id) { System.out.println("待发送短信的订单已纳入处理队列(kafka),id:"+id); kafkaTemplate.send("testTopic",id); } @Override public String doMessage() { return null; } }
|
使用send方法发送消息,需要传入topic名称。
步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component public class MessageListener { @KafkaListener(topics = "testTopic") public void onMessage(ConsumerRecord<String,String> record){ System.out.println("已完成短信发送业务(kafka),id:"+record.value()); } }
|
使用注解@KafkaListener定义当前方法监听Kafka中指定topic的消息,接收到的消息封装在对象ConsumerRecord中,获取数据从ConsumerRecord对象中获取即可。
小结
- springboot整合Kafka使用KafkaTemplate对象作为客户端操作消息队列
- 操作Kafka需要配置Kafka服务器地址,默认端口9092
- 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@KafkaListener。接收消息保存在形参ConsumerRecord对象中