Spring Integration
提供了基于 Spring 的 EIP(Enterprise Integration Patterns,企业集成模式)的实现,主要解决不同系统之间的交互问题,通过异步消息驱动来达到系统交互时系统之间的松耦合。Spring Integration
主要由 Message、Channel
和 Message EndPoint
中,接收者从 Channel
中接收消息。Spring Integration
提供了三个 Channel
和 SubscribableChannel
是一个轮询接口,另外一个则是订阅接口。Spring Integration
通道名 | 方式 |
PublishSubscribeChannel | 以广播的形式将消息发送给所有订阅者 |
QueueChannel | 以轮询的方式从通道中接收消息,用一个 queue 接收消息,队列大小可配置 |
PriorityChannel | 按照优先级将数据存储到对,依据于消息的消息头 priority 属性 |
RendezvousChannel | 确保每一个接收者都接收到消息后再发送消息 |
DirectChannel | Spring Integration 默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收 |
ExecuteorChannel | 绑定一个多线程的 task executor 来发送消息 |
Spring Integration
海提供了一个 ChannelInterceptor
,也就是通道拦截器,用于拦截发送和接收消息的操作;通常我们只需实现这个接口即可,而可用 channel.addInterceptor(someInterceptor)
给所有的通道增加拦截器。端点 | 用途 |
Channel Adapter | 通道适配器,一种连接外部系统或传输协议的端点,分为 inbound 和 outbound,即入站适配器和出站适配器,入站负责接收消息,出站负责发送消息,Spring Integration 对大多数通信协议都有适配器支持。 |
Gateway | 网关,提供双向的请求/返回集成模式,同样分为入站和出站。 |
Router | 路由,可以消息体类型、消息头值以及定义好的接收表为条件,将消息分发到不同的通道 |
Service Activator | 调用 Bean 处理消息并将结果输出到指定的消息通道 |
Filter | 过滤器,决定消息是否可以传递给消息通道 |
Splitter | 拆分器,将消息拆分为几个部件单独处理,拆分器的返回值是一个集合或数组 |
Aggregator | 聚合器,以 java.util.List 为接收参数,将多个消息合并为一个 |
Enricher | 增强器,用于给收到的消息增加额外信息,分为消息头增强器和消息体增强器 |
Transformer | 转换器,对收到的消息进行逻辑转换,如格式转换 |
Bridge | 桥接器,用于将两个消息通道连接起来 |
transform() -> Transformer
filter() -> Filter
handle() -> ServiceActivator、Adapter、Gateway
split() -> Spliter
aggregate() -> Aggreator
route() -> Router
bridge() -> Bridge
public IntegrationFlow demo(){
return IntegrationFlows.from("news")
.<String, Integer>transform(Integer::parseInt)
<!-- Spring integration 对 atom 和 mail 的依赖-->
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<relativePath/> <!-- lookup parent from repository -->
<description>Learning Integration and mail</description>
<!-- Spring integration 对 atom 和 mail 的依赖-->
package com.pyc.mymessage;
import static java.lang.System.getProperty;
import com.rometools.rome.feed.synd.SyndEntry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.mail.Mail;
import org.springframework.integration.feed.inbound.FeedEntryMessageSource;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.dsl.file.Files;
import java.io.File;
import java.io.IOException;
public class MymessageApplication {
public static void main(String[] args) {
SpringApplication.run(MymessageApplication.class, args);
// Flow path of read
// using @value annotation to gain resources from https://spring.io/blog.atom by automatic
Resource resource;
// using Fluent API and pollers to configure acquiescent way of poll
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller(){
return Pollers.fixedRate(500).get();
// here is a position where construct adapter of inbox channel of feed and use the adapter as data input
public FeedEntryMessageSource feedMessageSource() throws IOException{
return new FeedEntryMessageSource(resource.getURL(), "news");
public IntegrationFlow myFlow() throws IOException{
// flow path begin with the method called from
return IntegrationFlows.from(feedMessageSource())
// select route by route method,type of payload is SyndEntry,
//the criteria type is string and the criteria value from Categroy what is classify by payload
.<SyndEntry, String> route(payload->payload.getCategories().get(0).getName(),
// send the different value to different message channel
mapping->mapping.channelMapping("releases", "releasesChannel")
.channelMapping("engineering", "engineeringChannel")
.channelMapping("news", "newsChannel")
).get(); // Get the IntegrationFlow entity by get method and configure as a bean of spring
// Releases flow path
public IntegrationFlow releasesFlow(){
// start read the data came from message channel releasesChannel
return IntegrationFlows.from(MessageChannels.queue("releasesChannel",10))
// Data conversion by using transform method. type of payload is SyndEntry,
// convert it to string type and custom data format
.<SyndEntry, String>transform(payload->"《"+
payload.getTitle()+"》"+payload.getLink()+ getProperty("line.separator"))
// handling the outbound adapter of file by using handle method.
//Files class is a Fluent API provided by Spring Integration Java DSL to construct adapter of output files
Files.outboundAdapter(new File("springblog"))
.fileNameGenerator(message -> "releases.txt")
// Engineering flow path
public IntegrationFlow engineeringFlow(){
return IntegrationFlows.from(MessageChannels.queue("engineeringChannel",10))
.<SyndEntry, String>transform(
Files.outboundAdapter(new File("springblog"))
.fileNameGenerator(message -> "engineering.txt")
public IntegrationFlow newsFlow(){
return IntegrationFlows.from(MessageChannels.queue("newsChannel", 10))
.<SyndEntry, String>transform(
// add the information of message head by using enricherHeader
.subject("A news come from Spring")
// the information which send by mail is constructed by the method of Mail.headers provide by Spring Integration Java DSL
.credentials("15014366986@163.com", "*********")
.javaMailProperties(p->p.put("mail.debug", "false")),
// define the outbound adapter of send mail by using a method called handle
// Constructed by Mail.outboundAdapter provided from Spring Integration Java DSL
,服务器名称为 smtp
,通常用不用 SSL 协议的端口 25,在 credentials()
文件夹,里面有两个 txt 文件: