Spring Integration
提供了基于 Spring 的 EIP(Enterprise Integration Patterns,企业集成模式)的实现,主要解决不同系统之间的交互问题,通过异步消息驱动来达到系统交互时系统之间的松耦合。Spring Integration
主要由 Message、Channel
和 Message EndPoint
构成。Message
即数据,需要在不同部分之间传递的数据。Message
通常由消息体和消息头组成。消息的具体内容可以是任何能够被当前的技术解析可视化的数据类型。通常,消息头的元数据就是解释消息体的内容。Channel
即通道,消息要在不同部分之间传递,就需要一种介质,这种介质就是Channel
。Channel
中,接收者从 Channel
中接收消息。Spring Integration
提供了三个 Channel
接口:MessageChannel、PollableChannel
和 SubscribableChannel
。MessageChannel
是顶级接口,是另外两个接口的父类;PollableChannle
是一个轮询接口,另外一个则是订阅接口。Spring Integration
在提供接口的同时,提供了如下表的常用消息通道:
通道名 | 方式 |
PublishSubscribeChannel | 以广播的形式将消息发送给所有订阅者 |
QueueChannel | 以轮询的方式从通道中接收消息,用一个 queue 接收消息,队列大小可配置 |
PriorityChannel | 按照优先级将数据存储到对,依据于消息的消息头 priority 属性 |
RendezvousChannel | 确保每一个接收者都接收到消息后再发送消息 |
DirectChannel | Spring Integration 默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收 |
ExecuteorChannel | 绑定一个多线程的 task executor 来发送消息 |
Spring Integration
海提供了一个 ChannelInterceptor
,也就是通道拦截器,用于拦截发送和接收消息的操作;通常我们只需实现这个接口即可,而可用 channel.addInterceptor(someInterceptor)
给所有的通道增加拦截器。端点 | 用途 |
Channel Adapter | 通道适配器,一种连接外部系统或传输协议的端点,分为 inbound 和 outbound,即入站适配器和出站适配器,入站负责接收消息,出站负责发送消息,Spring Integration 对大多数通信协议都有适配器支持。 |
Gateway | 网关,提供双向的请求/返回集成模式,同样分为入站和出站。 |
Router | 路由,可以消息体类型、消息头值以及定义好的接收表为条件,将消息分发到不同的通道 |
Service Activator | 调用 Bean 处理消息并将结果输出到指定的消息通道 |
Filter | 过滤器,决定消息是否可以传递给消息通道 |
Splitter | 拆分器,将消息拆分为几个部件单独处理,拆分器的返回值是一个集合或数组 |
Aggregator | 聚合器,以 java.util.List 为接收参数,将多个消息合并为一个 |
Enricher | 增强器,用于给收到的消息增加额外信息,分为消息头增强器和消息体增强器 |
Transformer | 转换器,对收到的消息进行逻辑转换,如格式转换 |
Bridge | 桥接器,用于将两个消息通道连接起来 |
transform() -> Transformer
filter() -> Filter
handle() -> ServiceActivator、Adapter、Gateway
split() -> Spliter
aggregate() -> Aggreator
route() -> Router
bridge() -> Bridge
@Bean
public IntegrationFlow demo(){
return IntegrationFlows.from("news")
.<String, Integer>transform(Integer::parseInt)
.get();
}
<!-- Spring integration 对 atom 和 mail 的依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-feed</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.1.0.M1</version>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.0.M4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.pyc</groupId>
<artifactId>mymessage</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>mymessage</name>
<description>Learning Integration and mail</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!-- Spring integration 对 atom 和 mail 的依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-feed</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.1.0.M1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.pyc.mymessage;
import static java.lang.System.getProperty;
import com.rometools.rome.feed.synd.SyndEntry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.mail.Mail;
import org.springframework.integration.feed.inbound.FeedEntryMessageSource;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.dsl.file.Files;
import java.io.File;
import java.io.IOException;
@SpringBootApplication
public class MymessageApplication {
public static void main(String[] args) {
SpringApplication.run(MymessageApplication.class, args);
}
//---------------------------------------
// Flow path of read
// using @value annotation to gain resources from https://spring.io/blog.atom by automatic
@Value("https://spring.io/blog.atom")
Resource resource;
// using Fluent API and pollers to configure acquiescent way of poll
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller(){
return Pollers.fixedRate(500).get();
}
// here is a position where construct adapter of inbox channel of feed and use the adapter as data input
@Bean
public FeedEntryMessageSource feedMessageSource() throws IOException{
return new FeedEntryMessageSource(resource.getURL(), "news");
}
@Bean
public IntegrationFlow myFlow() throws IOException{
// flow path begin with the method called from
return IntegrationFlows.from(feedMessageSource())
// select route by route method,type of payload is SyndEntry,
//the criteria type is string and the criteria value from Categroy what is classify by payload
.<SyndEntry, String> route(payload->payload.getCategories().get(0).getName(),
// send the different value to different message channel
mapping->mapping.channelMapping("releases", "releasesChannel")
.channelMapping("engineering", "engineeringChannel")
.channelMapping("news", "newsChannel")
).get(); // Get the IntegrationFlow entity by get method and configure as a bean of spring
}
//--------------------------------------
//-------------------------------------------
// Releases flow path
@Bean
public IntegrationFlow releasesFlow(){
// start read the data came from message channel releasesChannel
return IntegrationFlows.from(MessageChannels.queue("releasesChannel",10))
// Data conversion by using transform method. type of payload is SyndEntry,
// convert it to string type and custom data format
.<SyndEntry, String>transform(payload->"《"+
payload.getTitle()+"》"+payload.getLink()+ getProperty("line.separator"))
// handling the outbound adapter of file by using handle method.
//Files class is a Fluent API provided by Spring Integration Java DSL to construct adapter of output files
.handle(
Files.outboundAdapter(new File("springblog"))
.fileExistsMode(FileExistsMode.APPEND)
.charset("UTF-8")
.fileNameGenerator(message -> "releases.txt")
.get()
).get();
}
//--------------------------------------------
//--------------------------------------------
// Engineering flow path
@Bean
public IntegrationFlow engineeringFlow(){
return IntegrationFlows.from(MessageChannels.queue("engineeringChannel",10))
.<SyndEntry, String>transform(
e->"《"+e.getTitle()+"》"+e.getLink()
+getProperty("line.separator")
).handle(
Files.outboundAdapter(new File("springblog"))
.fileExistsMode(FileExistsMode.APPEND)
.charset("UTF-8")
.fileNameGenerator(message -> "engineering.txt")
.get()
).get();
}
//-------------------------------------------
@Bean
public IntegrationFlow newsFlow(){
return IntegrationFlows.from(MessageChannels.queue("newsChannel", 10))
.<SyndEntry, String>transform(
payload->"《"+payload.getTitle()+"》"+payload.getLink()+getProperty("line.separator")
)
// add the information of message head by using enricherHeader
.enrichHeaders(
Mail.headers()
.subject("A news come from Spring")
.to("553481864@qq.com")
.from("15014366986@163.com"))
// the information which send by mail is constructed by the method of Mail.headers provide by Spring Integration Java DSL
.handle(
Mail.outboundAdapter("smtp.163.com")
.port(25)
.protocol("smtp")
.credentials("15014366986@163.com", "*********")
.javaMailProperties(p->p.put("mail.debug", "false")),
e->e.id("smtpOut")
// define the outbound adapter of send mail by using a method called handle
// Constructed by Mail.outboundAdapter provided from Spring Integration Java DSL
).get();
}
}
smtp.163.com
,服务器名称为 smtp
,通常用不用 SSL 协议的端口 25,在 credentials()
里填写的是自己的邮箱地址和邮箱密码,值得注意的是为了远程授权成功,密码通常用客户端授权码,不清楚的可以百度怎么获取163邮箱的客户端授权码。springblog
文件夹,里面有两个 txt 文件: