spring boot 常驻 spring boot integration

admin2024-08-22  7


Spring Integration

  • 一、何为 Spring Integration
  • 1、Message
  • 2、Channel
  • 3、Message EndPoint
  • 二、Spring Integration Java DSL
  • 三、项目示例
  • 1、新建项目
  • 2、编辑流程
  • 3、测试运行


一、何为 Spring Integration

  • Spring Integration 提供了基于 Spring 的 EIP(Enterprise Integration Patterns,企业集成模式)的实现,主要解决不同系统之间的交互问题,通过异步消息驱动来达到系统交互时系统之间的松耦合。
  • Spring Integration主要由 Message、ChannelMessage EndPoint 构成。

1、Message

  • Message 即数据,需要在不同部分之间传递的数据。Message 通常由消息体和消息头组成。消息的具体内容可以是任何能够被当前的技术解析可视化的数据类型。通常,消息头的元数据就是解释消息体的内容。

2、Channel

  • Channel 即通道,消息要在不同部分之间传递,就需要一种介质,这种介质就是Channel
  • 一个消息在收发两方的具体细节就是:消息发送者将消息发送到 Channel 中,接收者从 Channel中接收消息。
  • Spring Integration提供了三个 Channel 接口:MessageChannel、PollableChannelSubscribableChannel
  • 其中 MessageChannel 是顶级接口,是另外两个接口的父类;PollableChannle 是一个轮询接口,另外一个则是订阅接口。
  • Spring Integration 在提供接口的同时,提供了如下表的常用消息通道:

通道名

方式

PublishSubscribeChannel

以广播的形式将消息发送给所有订阅者

QueueChannel

以轮询的方式从通道中接收消息,用一个 queue 接收消息,队列大小可配置

PriorityChannel

按照优先级将数据存储到对,依据于消息的消息头 priority 属性

RendezvousChannel

确保每一个接收者都接收到消息后再发送消息

DirectChannel

Spring Integration 默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收

ExecuteorChannel

绑定一个多线程的 task executor 来发送消息

  • 与此同时,Spring Integration 海提供了一个 ChannelInterceptor,也就是通道拦截器,用于拦截发送和接收消息的操作;通常我们只需实现这个接口即可,而可用 channel.addInterceptor(someInterceptor) 给所有的通道增加拦截器。

3、Message EndPoint

  • Message EndPoint,译为消息端点,是真正处理消息的组件,可以控制通道的路由,Spring Integration 提供的可用消息端点如下:

端点

用途

Channel Adapter

通道适配器,一种连接外部系统或传输协议的端点,分为 inbound 和 outbound,即入站适配器和出站适配器,入站负责接收消息,出站负责发送消息,Spring Integration 对大多数通信协议都有适配器支持。

Gateway

网关,提供双向的请求/返回集成模式,同样分为入站和出站。

Router

路由,可以消息体类型、消息头值以及定义好的接收表为条件,将消息分发到不同的通道

Service Activator

调用 Bean 处理消息并将结果输出到指定的消息通道

Filter

过滤器,决定消息是否可以传递给消息通道

Splitter

拆分器,将消息拆分为几个部件单独处理,拆分器的返回值是一个集合或数组

Aggregator

聚合器,以 java.util.List 为接收参数,将多个消息合并为一个

Enricher

增强器,用于给收到的消息增加额外信息,分为消息头增强器和消息体增强器

Transformer

转换器,对收到的消息进行逻辑转换,如格式转换

Bridge

桥接器,用于将两个消息通道连接起来

二、Spring Integration Java DSL

  • Spring Integration 提供 IntegrationFlow 定义系统继承流程,通过 IntegrationFlows 和 IntegrationFlowBuilder 实现使用 Fluent API 定义流程。
  • Fluent API 中提供如下方法来映射 Spring Integration 的端点
transform()	-> Transformer
	filter()	-> Filter
	handle()	-> ServiceActivator、Adapter、Gateway
	split()		-> Spliter
	aggregate()	-> Aggreator
	route()		-> Router
	bridge()	-> Bridge
  • 简单流程示例:
@Bean
public IntegrationFlow demo(){
	return IntegrationFlows.from("news")
		.<String, Integer>transform(Integer::parseInt)
		.get();
}

三、项目示例

  • 从 https://spring.io/blog.atom 获取资源,主要是一些 URL 链接,然后根据不同关键字将消息发送到不同消息通道。

1、新建项目

  • 新建一个 Spring Boot 项目,初始依赖选择 Integration 和 mail。
  • 手动添加如下三个依赖,
<!--        Spring integration 对 atom 和 mail 的依赖-->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-feed</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mail</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-java-dsl</artifactId>
        <version>1.1.0.M1</version>
    </dependency>
  • 整个 POM 文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.0.M4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.pyc</groupId>
    <artifactId>mymessage</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mymessage</name>
    <description>Learning Integration and mail</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

<!--        Spring integration 对 atom 和 mail 的依赖-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-feed</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mail</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
            <version>1.1.0.M1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
  • application.properties 文件可以什么都不编辑,也可以编辑日志文件的位置。

2、编辑流程

  • 流程分为读取流程、releases 流程、engineering 流程和 news 流程,所有流程的代码都在入口类中完成编辑,如下:
package com.pyc.mymessage;

import static java.lang.System.getProperty;
import com.rometools.rome.feed.synd.SyndEntry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.mail.Mail;
import org.springframework.integration.feed.inbound.FeedEntryMessageSource;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.dsl.file.Files;

import java.io.File;
import java.io.IOException;


@SpringBootApplication
public class MymessageApplication {

    public static void main(String[] args) {
        SpringApplication.run(MymessageApplication.class, args);
    }

    //---------------------------------------
    // Flow path of read
    // using @value annotation to gain resources from https://spring.io/blog.atom by automatic
    @Value("https://spring.io/blog.atom")
    Resource resource;

    // using Fluent API and pollers to configure acquiescent way of poll
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller(){
        return Pollers.fixedRate(500).get();
    }

    // here is a position where construct adapter of inbox channel of feed and use the adapter as data input
    @Bean
    public FeedEntryMessageSource feedMessageSource() throws IOException{
        return new FeedEntryMessageSource(resource.getURL(), "news");
    }

    @Bean
    public IntegrationFlow myFlow() throws IOException{
        // flow path begin with the method called from
        return IntegrationFlows.from(feedMessageSource())
                // select route by route method,type of payload is SyndEntry,
        //the criteria type is string and the criteria value from Categroy what is classify by payload
        .<SyndEntry, String> route(payload->payload.getCategories().get(0).getName(),
                // send the different value to different message channel
                mapping->mapping.channelMapping("releases", "releasesChannel")
                .channelMapping("engineering", "engineeringChannel")
                .channelMapping("news", "newsChannel")
                ).get();    // Get the IntegrationFlow entity by get method and configure as a bean of spring
    }
    //--------------------------------------
    //-------------------------------------------
    // Releases flow path
    @Bean
    public IntegrationFlow releasesFlow(){
        // start read the data came from message channel releasesChannel
        return IntegrationFlows.from(MessageChannels.queue("releasesChannel",10))
                // Data conversion by using transform method. type of payload is SyndEntry,
                // convert it to string type and custom data format
                .<SyndEntry, String>transform(payload->"《"+
                        payload.getTitle()+"》"+payload.getLink()+ getProperty("line.separator"))
                // handling the outbound adapter of file by using handle method.
                //Files class is a Fluent API provided by Spring Integration Java DSL to construct adapter of output files
                .handle(
                        Files.outboundAdapter(new File("springblog"))
                        .fileExistsMode(FileExistsMode.APPEND)
                        .charset("UTF-8")
                        .fileNameGenerator(message -> "releases.txt")
                        .get()
                ).get();
    }
    //--------------------------------------------
    //--------------------------------------------
    // Engineering flow path
    @Bean
    public IntegrationFlow engineeringFlow(){
        return IntegrationFlows.from(MessageChannels.queue("engineeringChannel",10))
                .<SyndEntry, String>transform(
                        e->"《"+e.getTitle()+"》"+e.getLink()
                        +getProperty("line.separator")
                ).handle(
                        Files.outboundAdapter(new File("springblog"))
                        .fileExistsMode(FileExistsMode.APPEND)
                        .charset("UTF-8")
                        .fileNameGenerator(message -> "engineering.txt")
                        .get()
                ).get();
    }
    //-------------------------------------------
    @Bean
    public IntegrationFlow newsFlow(){
        return IntegrationFlows.from(MessageChannels.queue("newsChannel", 10))
                .<SyndEntry, String>transform(
                        payload->"《"+payload.getTitle()+"》"+payload.getLink()+getProperty("line.separator")
                )
                // add the information of message head by using enricherHeader
                .enrichHeaders(
                        Mail.headers()
                        .subject("A news come from Spring")
                        .to("553481864@qq.com")
                        .from("15014366986@163.com"))
                // the information which send by mail is constructed by the method of Mail.headers provide by Spring Integration Java DSL
                .handle(
                        Mail.outboundAdapter("smtp.163.com")
                        .port(25)
                        .protocol("smtp")
                        .credentials("15014366986@163.com", "*********")
                        .javaMailProperties(p->p.put("mail.debug", "false")),
                        e->e.id("smtpOut")
                // define the outbound adapter of send mail by using a method called handle
                // Constructed by Mail.outboundAdapter provided from Spring Integration Java DSL
                ).get();
    }
}
  • news 流程特别说一下,因为在这个流程用到了 Mail,我这里用的是网易的163邮箱服务器发送邮件,通过网上查询 163 邮箱用于发送邮件的服务器地址是 smtp.163.com,服务器名称为 smtp,通常用不用 SSL 协议的端口 25,在 credentials() 里填写的是自己的邮箱地址和邮箱密码,值得注意的是为了远程授权成功,密码通常用客户端授权码,不清楚的可以百度怎么获取163邮箱的客户端授权码。

3、测试运行

  • 在本例中不用浏览器访问任何地址,只需等待程序处理完任务后,可以看到在项目的根路径中多了一个 springblog 文件夹,里面有两个 txt 文件:
  • txt 文件的具体内容分别如下:
  • 然后确认 news 流程是否成功,我第一次时是用我自己的163邮箱给自己的 QQ 邮箱发送邮件,然后又给同学和朋友的邮箱发送邮件,查看自己的 QQ 邮箱:
  • 问了同学和朋友,它们也收到了邮件,所以可以总结 news 流程成功无误。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明原文出处。如若内容造成侵权/违法违规/事实不符,请联系SD编程学习网:675289112@qq.com进行投诉反馈,一经查实,立即删除!