先上代码
Spring学习笔记3——消息队列(rabbitmq), 发送邮件,学习笔记rabbitmq
本节的内容是用户注册时,将邮箱地址先存入rabbitmq队列,之后返回给用户注册成功;之后消息队列的接收者从队列中获取消息,发送邮件给用户。
一、RabbitMQ介绍
如果之前对rabbitmq不了解,推荐先看一下RabbitMQ
Quick(快速手册)。
1、rabbitmq在mac上的安装。
2、rabbitmq简单介绍。
生产者: 负责发送消息到Exchange。
Exchange: 按照一定的策略,负责将消息存入到指定的队列。
队列queue: 负责保存消息。
消费者: 负责从队列中提取消息。
binding: 负责Exchange和队列的关联映射,Exchange和queue是多对多的关系。
二、RabbitMQ在Spring中的实现
1、引入依赖包。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.6.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.0.RELEASE</version>
</dependency>
2、rabbitmq配置文件。
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/rabbit"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--1、配置连接工厂, 如果不配置host, port, username, passowrd, 则按默认值localhost:5672, guest/guest-->
<!--<connection-factory id="connectionFactory" />-->
<connection-factory id="connectionFactory"
host="localhost"
port="5672"
username="everSeeker"
password="333" />
<!--2、配置队列queue, Exchange, 以及将他们结合在一起的binding-->
<!--在queue以及exchange中, 有一个重要的属性durable, 默认为true, 可以防止宕机后数据丢失。-->
<!--在listener-container中, 有acknowledge属性, 默认为auto, 即消费者成功处理消息后必须有个应答, 如果消费者程序发生异常或者宕机, 消息会被重新放回队列-->
<admin connection-factory="connectionFactory" />
<queue id="userAlertEmailQueue" name="user.alerts.email" durable="true" />
<queue id="userAlertCellphoneQueue" name="user.alerts.cellphone" /> <!--durable默认为true-->
<!--标准的AMQP Exchange有4种: Direct, Topic, Headers, Fanout, 根据实际需要选择。-->
<!--Direct: 如果消息的routing key与bingding的routing key直接匹配的话, 消息将会路由到该队列上。-->
<!--Topic: 如果消息的routing key与bingding的routing key符合通配符匹配的话, 消息将会路由到该队列上。-->
<!--Headers: 如果消息参数表中的头信息和值都与binding参数表中相匹配, 消息将会路由到该队列上。-->
<!--Fanout: 不管消息的routing key和参数表的头信息/值是什么, 消息将会路由到该队列上。-->
<direct-exchange name="user.alert.email.exchange" durable="true">
<bindings>
<binding queue="user.alerts.email" /> <!--默认的routing key与队列的名称相同-->
</bindings>
</direct-exchange>
<direct-exchange name="user.alert.cellphone.exchange">
<bindings>
<binding queue="user.alerts.cellphone" />
</bindings>
</direct-exchange>
<!--3、配置RabbitTemplate发送消息-->
<template id="rabbitTemplate"
connection-factory="connectionFactory" />
<!--4、配置监听器容器和监听器来接收消息-->
<beans:bean id="userListener" class="com.everSeeker.alerts.UserAlertHandler" />
<listener-container connection-factory="connectionFactory" acknowledge="auto">
<listener ref="userListener"
method="handleUserAlertToEmail"
queues="userAlertEmailQueue" />
<listener ref="userListener"
method="handleUserAlertToCellphone"
queues="userAlertCellphoneQueue" />
</listener-container>
</beans:beans>
如果配置connection-factory时,采用默认的guest/guest账号密码时,有可能会出现org.springframework.amqp.AmqpAuthenticationException:
com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED –
Login was refused using authentication mechanism PLAIN. For details see
the broker
logfile.的错误提示,解决办法是新建一个管理员权限的用户,并允许访问虚拟主机。步骤如下:
1、打开http://localhost:15672/
2、Admin ——> Users, 新建用户,administrator权限。
3、Virtual Hosts,设置新建用户允许访问。
3、生产者发送消息到exchange。
@Service("userAlertService")
public class UserAlertServiceImpl implements UserAlertService {
private RabbitTemplate rabbit;
@Autowired
public UserAlertServiceImpl(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendUserAlertToEmail(User user) {
//convertAndSend(String exchange, String routingKey, Object object), 将对象object封装成Message对象后, 发送给exchange
rabbit.convertAndSend("user.alert.email.exchange", "user.alerts.email", user);
}
}
4、配置消费者来接收消息。
public class UserAlertHandler {
public void handleUserAlertToEmail(User user) {
System.out.println(user);
}
三、通过javax.mail来发送邮件
1、引入依赖包。
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.7</version>
</dependency>
2、配置邮件服务器信息。
@Bean
public MailSender mailSender(Environment env) {
JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
//如果为普通邮箱, 非ssl认证等, 比如163邮箱
mailSender.setHost(env.getProperty("mailserver.host"));
mailSender.setPort(Integer.parseInt(env.getProperty("mailserver.port")));
mailSender.setUsername(env.getProperty("mailserver.username"));
mailSender.setPassword(env.getProperty("mailserver.password"));
mailSender.setDefaultEncoding("utf-8");
//如果邮件服务器采用了ssl认证, 增加以下配置, 比如gmail邮箱, qq邮箱
Properties props = new Properties();
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.smtp.socketFactory.class","javax.net.ssl.SSLSocketFactory");
props.put("mail.smtp.socketFactory.port", "465");
mailSender.setJavaMailProperties(props);
return mailSender;
}
3、发送邮件。
@Component("userMailService")
public class UserMailServiceImpl implements UserMailService {
private MailSender mailSender;
@Autowired
public UserMailServiceImpl(MailSender mailSender) {
this.mailSender = mailSender;
}
public void sendSimpleUserMail(String to, User user) {
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom("[email protected]");
message.setTo(to);
message.setSubject(user.getUsername() + "信息确认");
message.setText(user.toString());
mailSender.send(message);
}
}
4、消费者调用发送邮件方法即可。
1、参考文献:Spring实战(第4版)。
2、完整代码在github,地址:https://github.com/everseeker0307/register。
http://www.bkjia.com/Javabc/1143342.htmlwww.bkjia.comtruehttp://www.bkjia.com/Javabc/1143342.htmlTechArticleSpring学习笔记3——消息队列(rabbitmq),
发送邮件,学习笔记rabbitmq
本节的内容是用户注册时,将邮箱地址先存入rabbitmq队列,之后返回给用户…
namespace RabbitMQDemo
{
public partial class HelloWorld : Form
{
string queueName1 = "hello_queue1";//消费者1
string queueName2 = "hello_queue2";//消费者2
Action<string> SetText;
/// <summary>
/// 单线程实例
/// </summary>
private static readonly HelloWorld _helloWorld;
static HelloWorld()
{
_helloWorld = new HelloWorld();
}
/// <summary>
/// 单例模式
/// </summary>
public static HelloWorld SingleForm
{ get { return _helloWorld; } }
private HelloWorld()
{
CheckForIllegalCrossThreadCalls = false;
InitializeComponent();
ReseiveMsg(queueName1);
ReseiveMsg(queueName2);
SetText += OnSetText;
}
private void btnSendMsg_Click(object sender, EventArgs e)
{
SendMsg();
}
/// <summary>
/// 发送消息
/// </summary>
private void SendMsg()
{
string message = txtPublisher.Text;
if (message.Trim().Length <= 0)
{
MessageBox.Show("请输入要发送的消息");
}
string queueName = cbBoxQueues.SelectedValue.ToString();
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: body);
}
}
/// <summary>
/// 接收消息
/// </summary>
private void ReseiveMsg(string queueName)
{
//string queueName = cbBoxQueues.SelectedText;
try
{
var factory = new ConnectionFactory() { HostName = "localhost" };
//connection和channel不能使用using,否则会被dispose掉
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//声明队列 生产者和消费者都需要QueueDeclare
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
txtConsumer1.Invoke(SetText, message);
};
channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);
}
catch (Exception ex)
{
MessageBox.Show(ex.ToString());
}
}
private void OnSetText(string txtContent)
{
string queueName = cbBoxQueues.SelectedValue.ToString();
if (queueName == queueName1)
txtConsumer1.Text += string.Format("{0}\r\n", txtPublisher.Text);
if (queueName == queueName2)
txtConsumer2.Text += string.Format("{0}\r\n", txtPublisher.Text);
}
private void HelloWorld_Load(object sender, EventArgs e)
{
List<DataSource> lst = new List<DataSource>();
lst.Add(new DataSource("消费者1", "hello_queue1"));
lst.Add(new DataSource("消费者2", "hello_queue2"));
cbBoxQueues.DataSource = lst;
cbBoxQueues.DisplayMember = "DisplayMember";
cbBoxQueues.ValueMember = "DisplayValue";
}
private class DataSource
{
public DataSource(string displayMember,string displayValue)
{
DisplayMember = displayMember;
DisplayValue = displayValue;
}
public string DisplayMember { get; set; }
public string DisplayValue { get; set; }
}
}
}
View Code
界面如下:
大致流程是
生产者发送消息到队列,然后队列(rabbitmq)把消息发送给消费者(消费者向rabbitmq索取消息)
两个消费者: