本文是我们名为“ Spring Integration for EAI ”的学院课程的一部分。
在本课程中,向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来,您将深入研究Spring Integration的基础知识,例如通道,转换器和适配器。 在这里查看 !
目录
- 1.简介 2.什么是Spring Integration? 3. Spring Integration消息传递系统的核心概念
-
- 3.1讯息 3.2消息通道 3.3消息端点
-
- 4.1通道适配器 4.2变压器 4.3过滤器 4.4路由器 4.5拆分器和聚合器 4.6轮询器 4.7消息桥 4.8消息处理程序链
-
- 5.1信息渠道 5.2网关
4.组成
5.同步和异步通信
6.错误处理
1.简介
在第二篇教程中,您将学习构成Spring Integration核心的基本概念。 在解释了这些概念之后,我们将审查项目随附的不同组件。 此修订版基于3.0.1版本。 考虑到4.0.0版本即将发布,您可能会发现一些本教程中未介绍的新组件。 无论如何,您将获得足够的框架知识,以了解未来组件的行为。
总结本教程,您将学习Spring Integration如何支持不同类型的通信(异步和同步),以及该决定如何影响您的设计。 错误处理是一种特殊情况,上一节对此进行了说明。
本教程由以下部分组成:
- 介绍
- 什么是Spring Integration?
- Spring Integration消息传递系统的核心概念
- 组件
- 同步和异步通讯
- 错误处理
2.什么是Spring Integration?
如上一节所述,Spring Integration基于Enterprise Integration Patterns一书中解释的概念。 这是一个轻量级的消息传递解决方案,它将为您的Spring应用程序添加集成功能。 作为消息传递策略,它提供了一种快速共享信息的方式,并且所涉及的组件或应用程序之间具有高度的去耦性。 您将学习如何在Spring处理任何底层基础架构问题的同时完成此任务。 这将使您可以专注于业务逻辑。
当前,Spring Integration配置主要基于xml,尽管一些注释已开始包含在内。 本教程中显示的示例也将基于xml,尽管我将尽可能显示其各自的注释。
在解释了这一点之后,出现了一个问题:Spring Integration可以做什么? 该框架基本上允许您执行以下操作:
- 它允许基于内存消息传递在应用程序中的组件之间进行通信。 这允许这些应用程序组件彼此松散耦合,并通过消息通道共享数据。
- 它允许与外部系统通信。 您只需要发送信息; Spring Integration将处理将其发送到指定的外部系统,并在必要时带回响应。 当然,这是相反的。 Spring Integration将处理从外部系统到您的应用程序的传入调用。 本教程稍后将对此进行解释。
Spring Integration面向Spring框架的最佳实践,例如使用接口进行编程或在继承技术上进行组合。 它的主要优点是:
- 组件之间的耦合松散。
- 面向事件的体系结构。
- 集成逻辑(由框架处理)与业务逻辑分离。
在下一节中,您将学习此消息传递系统所基于的三个基本概念。
3. Spring Integration消息传递系统的核心概念
消息驱动的体系结构的基本概念是: 消息 , 消息通道和消息端点 。
该API非常简单:
- 消息发送到端点
- 端点之间通过MessageChannels连接
- 端点可以从MessageChannel接收消息
3.1讯息
一条消息包含将在应用程序的不同组件之间共享或发送到外部系统的信息。 但是,这是什么信息? 消息的结构如下:
如您在以下代码片段中所见,消息是一个接口,以GenericMessage作为其主要实现(也由框架提供):
- 标头 :包含有关消息的元信息。 如果检查MessageHeaders类,您将看到它只是Map的包装,但是其插入操作被标记为不支持。 框架这样标记它们,因为消息被认为是不可变的。 创建消息后,您将无法对其进行修改。 您可以以键值对的形式添加自己的标头,但它们主要用于传递传输信息。 例如,如果您要发送电子邮件,它将包含标题,例如to,subject,from。
- 有效载荷 :这只是一个普通的Java类,其中包含您要共享的信息。 它可以是任何Java类型。
如果要创建消息,则有两种选择。 第一个涉及使用构建器类( MessageBuilder )。
Message<String> message = MessageBuilder.withPayload("my message payload").setHeader("key1", "value1").setHeader("key2", "value2").build();
构建消息之前,您必须设置有效负载和必需的标头,因为一旦创建了消息,您将无法执行该操作,除非您创建新消息。
另一个选择是使用框架提供的实现:
Map<String, Object> headers = new HashMap<>();
headers.put("key1", "value1");
headers.put("key2", "value2");Message<String> message = new GenericMessage<String>("my message payload", headers);
3.2消息通道
消息通道是连接端点和消息通过的管道。 生产者向通道发送消息,而消费者从通道接收消息。 通过这种机制,您不需要任何类型的经纪人。
消息通道也可以用作拦截点或用于消息监视。
根据消息的使用方式,消息通道分类如下:
3.2.1点对点
消息通道上只有一个接收器。 好吧,这并非完全是100%正确。 如果是可订阅的频道,则可以有多个接收者,但只有一个可以处理该消息。 现在,请忘记这一点,因为这是一个高级主题,将在本课程的后面部分介绍(调度程序配置)。 这种类型的渠道有几种实现方式:
- DirectChannel :实现SubscribableChannel 。 该消息通过同一接收者的线程发送给订户。 此通信是同步的,并且生产方将阻塞,直到收到响应为止。 怎么运行的:
- 生产者将消息发送到通道。
- QueueChannel :实现PollableChannel 。 有一个端点连接到通道,没有用户。 这种通信是异步的。 接收者将通过其他线程检索消息。 怎么运行的:
- 生产者将消息发送到通道。
- ExecutorChannel :实现
SubscribableChannel
。 发送被委托给TaskExecutor。 这意味着send()方法将不会阻塞。
- PriorityChannel :实现
PollableChannel
。 与QueueChannel相似,但消息按优先级而不是FIFO排序。
- RendezvousChannel :实现
PollableChannel
。 与QueueChannel类似,但容量为零。 生产者将阻塞,直到接收者调用其receive()方法。
3.2.2发布-订阅
该通道可以有多个端点订阅。 因此,该消息将由不同的接收者处理。
- PublishSubscribeChannel :实现
SubscribableChannel
。 订阅的接收者可以通过生产者的线程连续调用。 如果我们指定TaskExecutor,则接收者将通过不同的线程并行调用。
3.2.3临时频道
这是一种特殊的通道,由没有明确定义输出通道的端点自动创建。 创建的通道是点对点匿名通道。 您可以在消息头中的replyChannel
名称下看到它的定义。
发送响应后,会自动删除这些类型的通道。 建议您不要显式定义输出通道(如果不需要)。 该框架将为您处理。
3.3消息端点
它的目标是以非侵入方式将应用程序与消息传递框架连接。 如果您熟悉Spring MVC,则端点将以与MVC控制器处理HTTP请求相同的方式处理消息。 端点将以MVC控制器映射到URL模式的相同方式映射到消息通道。
以下是带有可用消息端点的简要说明的列表:
- 通道适配器 :将应用程序连接到外部系统(单向)。
- 网关 :将应用程序连接到外部系统(双向)。
- 服务激活器 :可以调用服务对象上的操作。
- 变压器 :转换消息的内容。
- 过滤器 :确定消息是否可以继续发送到输出通道。
- 路由器 :决定将消息发送到哪个通道。
- 拆分器 :将邮件拆分为几个部分。
- 聚合器 :将多个消息合并为一个消息。
本教程的下一部分将说明这些端点中的每个端点。
4.组成
在本节中,您将学习什么是不同的端点,以及如何在Spring Integration中使用它们。
4.1通道适配器
通道适配器是允许您的应用程序与外部系统连接的端点。 如果您查看参考,您将看到所提供的类型,例如连接到JMS队列,MongoDB数据库,RMI,Web服务等。
适配器有四种类型:
- 入站通道适配器 :单向。 它从外部系统接收消息。 然后,它通过消息通道进入我们的消息传递系统,我们将在其中进行处理。
- 出站通道适配器 :单向。 我们的消息系统创建一条消息并将其发送到外部系统。
- 入站网关 :双向。 一条消息进入应用程序,并期望得到响应。 响应将发送回外部系统。
- 出站网关 :双向。 该应用程序创建一条消息并将其发送到外部系统。 然后,网关将等待响应。
4.2变压器
该端点用于有效负载转换。 它将有效负载的类型转换为另一种类型。 例如,从String到XML文档。 只要考虑到转换有效负载会产生一条新消息(请记住该消息是不可变的!)。 这种类型的端点增加了生产者与消费者之间的松散耦合,因为消费者不需要知道生产者是什么类型的。 转换器将负责处理并交付用户正在等待的内容类型。
Spring Integration提供了Transformer的几种实现 。 这里有些例子:
- HeaderEnricher:允许在消息中添加标题值。
- ObjectToMapTransformer:将对象转换为地图,将其属性转换为地图值。
- ObjectToStringTransformer:将对象转换为字符串。 它通过调用其toString()操作对其进行转换。
- PayloadSerializingTransformer / PayloadDeserializingTransformer:从Object转换为字节数组, 反之亦然 。
让我们看几个例子:
假设我们有以下模型:
public class Order implements Serializable {private static final long serialVersionUID = 1L;private int id;private String description;public Order() {}public Order(int id, String description) {this.id = id;this.description = description;}@Overridepublic String toString() {return String.valueOf(this.getId());}//Setters & Getters
}
将其发送到名为“ requestChannel”的消息通道时,以下代码段将通过调用Order实例的toString()方法将其自动转换为String:
<int:object-to-string-transformer input-channel="requestChannel" output-channel="transformedChannel"/>
结果字符串将被发送到名为transformedChannel
的输出通道。
如果需要更定制的转换,则可以实现自己的转换器,这是一个普通的bean。 您将需要在transformer元素中指定引用的bean,如下所示:
<int:transformer ref="myTransformer" method="transform"input-channel="requestChannel" output-channel="transformedChannel"/>
转换器将调用名为“ myTransformer”的bean的“ transform”方法。 该bean如下所示:
@Component("myTransformer")
public class MyTransformer {public Order transform(Order requestOrder) {return new Order(requestOrder.getId(), requestOrder.getDescription()+"_modified");}
}
在此示例中,变压器元素的method
属性不是必需的,因为变压器只有一种方法。 如果它有几种方法,则需要设置“ method”属性以告知框架要调用的方法。 或者,如果您更喜欢注释,则可以在方法级别使用@Transformer注释指定方法:
@Component("myTransformer")
public class MyTransformer {@Transformerpublic Order transform(Order requestOrder) {return new Order(requestOrder.getId(), requestOrder.getDescription()+"_modified");}public Order doOtherThings(Order requestOrder) {//do other things}
}
4.3过滤器
过滤器用于确定消息是否应继续其发送方式,或者相反,是否已丢弃。 要决定要做什么,它基于一些标准。
以下过滤器实现将从输入通道接收Order实例,并丢弃带有无效描述的实例。 有效订单将发送到输出通道:
<int:filter ref="myFilter" method="filterInvalidOrders" input-channel="requestChannel" output-channel="filteredChannel"/>
过滤器方法返回布尔类型。 如果返回false,则该消息将被丢弃:
@Component("myFilter")
public class MyFilter {public boolean filterInvalidOrders(Order order) {if (order == null || "invalid order".equals(order.getDescription())) {return false;}return true;}
}
与转换器一样,仅当在filter bean中定义了多个method
, method
属性才是必需的。 要指定您要调用的方法,请使用@Filter批注:
@Filter
public boolean filterInvalidOrders(Order order) {
Spring表达语言
如果您的过滤器非常简单,则可以跳过任何Java类来实现过滤器。 您可以使用SpEL定义过滤器。 例如,以下代码片段将实现与上述相同的过滤器,但没有Java代码:
<int:filter expression="!payload.description.equals('invalid order')" input-channel="requestChannel" output-channel="filteredChannel"/>
丢弃消息
使用默认配置,丢弃的消息只是被静默丢弃。 我们可以更改它,如果我们决定这样做,我们有两个选择:
1.我们可能不想丢失任何消息。 在这种情况下,我们可以抛出一个异常:
<int:filter expression="!payload.description.equals('invalid order')" input-channel="requestChannel" output-channel="filteredChannel"throw-exception-on-rejection="true"/>
2.我们要注册所有丢弃的消息。 我们可以配置一个丢弃通道:
<int:filter expression="!payload.description.equals('invalid order')" input-channel="requestChannel" output-channel="filteredChannel"discard-channel="discardedOrders"/>
4.4路由器
路由器允许您根据条件将消息重定向到特定的消息通道。
与往常一样,该框架提供了一些最基本的实现。 以下示例使用有效负载类型路由器。 它将从请求通道接收消息,并且根据有效负载的类型,它将把它发送到另一个输出通道:
<int:payload-type-router input-channel="requestChannel"><int:mapping type="String" channel="stringChannel"/><int:mapping type="Integer" channel="integerChannel"/>
</int:payload-type-router>
您可以在此处查看完整列表。
现在让我们回到订单示例,我们将实现一个路由器,该路由器将根据订单描述重定向消息。
<int:router ref="myRouter" input-channel="requestChannel" default-output-channel="genericOrders"/>
路由器实现包含一个方法,该方法返回将消息重定向到的消息通道的名称:
@Component("myRouter")
public class MyRouter {public String routeOrder(Order order) {String returnChannel = "genericOrders";if (order.getDescription().startsWith("US-")) {returnChannel = "usOrders";}else if (order.getDescription().startsWith("EU-")) {returnChannel = "europeOrders";}return returnChannel;}
}
如果有几种方法,可以使用@Router
批注:
@Router
public String routeOrder(Order order) {
与过滤器相同,您可以基于Spring表达式语言路由消息。
4.5拆分器和聚合器
拆分器的目标是接收消息并将其划分为几个部分。 这些零件然后分别发送,以便可以独立处理。 该端点通常与聚合器组合。
聚合器获取消息列表,并将它们组合为一条消息。 这与拆分器相反。
您将通过一个示例更好地看到这一点:
我们将修改订单示例,以便拆分器接收订单包。 该软件包包含拆分器将分离的几个相关订单。 拆分器获取订单包并返回订单列表:
<int:splitter input-channel="requestChannel" ref="mySplitter" output-channel="splitChannel"/>
拆分器的实现非常简单:
@Component("mySplitter")
public class MySplitter {public List<Order> splitOrderPackage(OrderPackage orderPackage) {return orderPackage.getOrders();}
}
拆分器返回订单列表,但它可以返回以下任意值:
- 消息的集合或数组。
- Java对象的集合或数组。 每个列表元素将作为消息有效内容包含在内。
- 一个消息。
- 一个Java对象(将包含在消息有效负载中)。
在此示例之后,有一个聚合器端点,该端点连接到“ splitChannel”通道。 该聚合器获取列表并合并其订单以形成订单确认,并添加每个订单的数量:
<int:channel id="splitChannel"/><int:aggregator ref="myAggregator" input-channel="splitChannel" output-channel="outputChannel"/>
聚合器实现:
@Component("myAggregator")
public class MyAggregator {public OrderConfirmation confirmOrders(List<Order> orders) {int total = 0;for (Order order:orders) {total += order.getQuantity();}OrderConfirmation confirmation = new OrderConfirmation("3");confirmation.setQuantity(total);return confirmation;}
}
4.5.1相关和发布策略
当消息由拆分器端点拆分时,将设置两个标头:
- MessageHeaders.CORRELATION_ID
- MessageHeaders.SEQUENCE_SIZE
聚合器端点使用这些标头能够正确组合消息。 它将保留消息,直到准备好一组具有相同相关性ID的消息为止。 何时准备就绪? 达到序列大小后即可准备就绪。
相关策略
允许对邮件进行分组。 默认情况下,它将在CORRELATION_ID
标头中将所有具有相同值的消息分组。 有几种策略可供选择。
发布策略
默认情况下,当一组消息的大小达到消息头SEQUENCE_SIZE
指定的值时,它将被视为完整。
4.6轮询器
在Spring Integration中,有两种类型的使用者:
- 活跃的消费者
- 被动消费者
被动组件是那些订阅了可订阅频道的组件。 这样,当消息发送到这种类型的通道时,该通道将调用其订户。 消费者的方法将被被动调用。
活动组件是连接到可轮询通道的组件。 这样,消息将排队进入通道,等待用户主动从通道中检索消息。
轮询程序用于指定活动使用者如何检索这些消息。 以下是几个示例:
基本轮询器配置
它将在一秒钟的间隔内轮询消息通道
<int:service-activator method="processOrder" input-channel="pollableChannel" ref="orderProcessor"><int:poller fixed-rate="1000"/>
</int:service-activator>
使用Cron表达式配置的轮询器
它将每30分钟轮询一次消息通道
<int:service-activator method="processOrder" input-channel="pollableChannel" ref="orderProcessor"><int:poller cron="0 0/30 * * * ?"/>
</int:service-activator>
要考虑的一件事是,如果使用者连接到可轮询的频道,则将需要一个轮询器。 如果不是,将引发异常。 如果不想为每个活动的使用者配置轮询器,则可以定义一个默认轮询器:
<int:poller id="defaultPoller" fixed-rate="1000" default="true"/>
不要忘记设置default
和id
属性。
4.7消息桥
这种类型的端点连接两个消息通道或两个通道适配器。 例如,您可以将SubscribableChannel
通道连接到PollableChannel
通道。
这是一个示例:
<int:channel id="requestChannel"/><int:bridge input-channel="requestChannel" output-channel="pollableChannel"/><int:channel id="pollableChannel"><int:queue capacity="5"/>
</int:channel><int:service-activator method="processOrder" input-channel="pollableChannel" ref="orderProcessor"/><int:poller id="defaultPoller" fixed-rate="1000" default="true"/>
在此示例中,消息传递桥从输入通道接收消息,并将其发布到输出通道。 在这种情况下,我们将服务激活器连接到输出通道。 订单处理器(服务激活器)将每隔一秒钟轮询一次消息通道。
4.8消息处理程序链
当您以线性方式连接多个消息处理程序时,消息处理程序链用于简化配置。 以下示例显示了将通过处理程序链进行简化的消息传递配置:
<int:channel id="requestChannel"/>
<int:channel id="responseChannel"/><int:filter ref="myFilter" method="filterInvalidOrders" input-channel="requestChannel" output-channel="filteredChannel"/><int:channel id="filteredChannel"/><int:transformer ref="myTransformer" method="transform"input-channel="filteredChannel" output-channel="transformedChannel"/><int:channel id="transformedChannel"/><int:service-activator method="processOrder" input-channel="transformedChannel" ref="orderProcessor" output-channel="responseChannel"/>
消息通过过滤器,然后到达转换器,最后,消息将由服务激活器处理。 完成后,消息将发送到输出通道“ responseChannel”。
使用消息过滤器链,配置将简化如下:
<int:channel id="requestChannel"/>
<int:channel id="responseChannel"/><int:chain input-channel="requestChannel" output-channel="responseChannel"><int:filter ref="myFilter" method="filterInvalidOrders"/><int:transformer ref="myTransformer" method="transform"/><int:service-activator ref="orderProcessor" method="processOrder"/>
</int:chain>
5.同步和异步通信
如本课程的第一篇教程中所述,通信可以同步或异步执行。 本节说明如何更改此通信。
5.1信息渠道
根据您配置消息通道的方式,将同步或异步检索消息。 无需更改很多东西,只需更改配置即可。
例如,假设我们有一个类似下面的点对点直接渠道:
<int:channel id="requestChannel"/>
发送到该通道的消息将立即传递给被动使用者(订户)。 如果期望得到响应,则发件人将等待直到将其发送给他。 为了改变这一点,我们只需要添加一个队列:
<int:channel id="requestChannel"><int:queue capacity="5"/>
</int:channel>
而已。 现在,该通道最多可以将五个消息排队。 使用者将从与发件人不同的线程中主动检索在此通道中排队的消息。
现在,发布-订阅频道如何? 让我们以配置同步通道为例:
<int:publish-subscribe-channel id="mySubscribableChannel"/>
在这种情况下,我们将使用任务执行程序来更改其行为:
<int:publish-subscribe-channel id="mySubscribableChannel" task-executor="myTaskExecutor"/><task:executor id="myTaskExecutor" pool-size="5"/>
5.2网关
网关是一种通道适配器,可用于:
- 提供消息传递系统的进入/退出机制。 这样,应用程序可以将消息发送到消息传递系统,消息传递系统将通过其消息端点对其进行处理。
- 向外部系统发送消息并等待响应(输出网关)
- 接收来自外部系统的消息,并在处理后发送响应(入站网关)。
本示例使用第一种情况。 该应用程序将通过网关发送消息,并等待消息传递系统对其进行处理。 在这里,我们将使用同步网关。 因此,测试应用程序将发送消息并阻止,等待响应。
介面
网关将捕获对它的sendOrder
方法的所有调用。 看到没有该接口的实现。 网关将包装它以拦截那些呼叫。
public interface OrderService {@Gatewaypublic OrderConfirmation sendOrder(Order order);
}
配置
网关链接到接口,以便拦截其呼叫并将消息发送到消息传递系统。
<int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/><int:channel id="requestChannel"/>
考试
服务接口(网关)被注入到应用程序中。 调用"sendOrder"
方法会将Order对象发送到消息传递系统,并包装在消息中。
@Autowired
private OrderService service;@Test
public void testSendOrder() {OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order"));Assert.assertNotNull(confirmation);Assert.assertEquals("confirmed", confirmation.getId());
}
在另一个示例中,测试类将阻塞,直到将订单确认发送回为止。 现在我们将对其进行配置以使其异步:
介面
唯一的变化是返回未来
public interface OrderService {@Gatewaypublic Future<OrderConfirmation> sendFutureOrder(Order order);
}
考试
现在,测试必须处理将从网关返回的Future对象。
@Autowired
private OrderService service;@Test
public void testSendCorrectOrder() throws ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(3, "a correct order"));OrderConfirmation orderConfirmation = confirmation.get();Assert.assertNotNull(orderConfirmation);Assert.assertEquals("confirmed", orderConfirmation.getId());
}
6.错误处理
本教程的最后一部分将说明错误处理的差异,具体取决于我们配置的通信类型是同步还是异步。
在同步通信中,发送者在使用同一线程将消息发送到消息传递系统时阻塞。 显然,如果引发异常,它将到达应用程序(上一节示例中的测试)。
但是,在异步通信中,使用者从另一个线程检索消息。 如果引发异常,它将无法到达应用程序。 Spring Integration如何处理它? 这是错误通道进入的地方。
引发异常时,它将包装到MessagingException中 ,成为新消息的有效内容。 该消息发送至:
- 错误通道:此通道在原始消息头中定义为名为“ errorChannel”的头。
- 全局错误通道:如果消息头中未定义错误通道,则将其发送到全局错误通道。 这个通道是Spring Integration默认定义的。
全局错误通道
该频道是发布-订阅频道。 这意味着我们可以将自己的端点订阅到此通道,并接收引发的任何错误。 实际上,Spring Integration已经预订了一个端点:一个日志处理程序。 该处理程序记录发送到全局错误通道的所有消息的有效负载。
要订阅另一个端点以处理异常,我们只需要按以下方式进行配置:
<int:service-activator input-channel="errorChannel" ref="myExceptionHandler" method="handleInvalidOrder"/><bean id="myExceptionHandler" class="xpadro.spring.integration.activator.MyExceptionHandler"/>
我们的服务激活器端点的handleInvalidOrder
方法将收到消息传递异常:
public class MyExceptionHandler {@ServiceActivatorpublic void handleInvalidOrder(Message<MessageHandlingException> message) {//Retrieve the failed order (payload of the source message)Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload();//Handle exception...}
}
翻译自: https://www.javacodegeeks.com/2015/09/spring-integration-fundamentals.html