关于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/

文章转自公众号:腾讯云中间件,作者:王俊飞 

本期排版:Tango@StreamNative

为什么使用 Pulsar Schema?

如果 producer 端要发送 POJO 类型的数据,则 Pulsar 需要一套序列化和反序列化工具,先将对象转化为字节数据再发送出去,下面为有无 schema 的两种情况:

无 Schema 的情况

若在不指定 schema 的情况下创建 producer,则 producer 只能发送字节数组类型的消息。在有 POJO 类数据要发送时,需要在发送消息前将 POJO 序列化为字节。

代码示例:

Producer<byte[]> producer = client.newProducer()      .topic(topic)        .create();
User user = new User(“Bill”, 40); 
byte[] message = … // serialize the `user` by yourself; 
producer.send(message);

有 Schema 的情况

若在指定 schema 的情况下创建 producer,则 producer 可以直接将类发送到 topic,无需考虑如何将 POJO 序列化为字节。

代码示例:

Producer<User> producer = client.newProducer(JSONSchema.of(User.class))         .topic(topic)         .create(); 
User user = new User(“Bill”, 40); 
producer.send(user);

此外,在上述 producer 发送数据、consumer 接收数据的流程中,还需考虑以下情况:

•信息对象里是否有字段缺失•结构里是否有字段类型发生改变

在这些情况下,为保证生产-消费模式的正常运行,所有 producer 与其相对应的 consumer 都需要进行相同的变化,若引入 schema 机制,可以简化上述操作。

Pulsar Schema 基本概念

Pulsar Schema 包含:

Schema Type

Pulsar Schema 支持的类型可分为 Primitive type 和 Complex type

Primitive type 包含的类型有 :

Primitive type

描述

BOOLEAN

1 比特二进制数值

INT8

8 位有符号整数

INT16

16 位有符号整数

INT32

32 位有符号整数

INT64

64 位有符号整数

FLOATE

单精度浮点数

DOUBLE

双精度浮点数

BYTES

字节序列

STRING

Unicode 字符集序列

TIMESTAMP(DATE, TIME)

时间戳,保存形式为 64 位有符号整数

INSTANCE(2.7 版本新增)

精度为纳秒的瞬时时间

LOCAL_DATE(2.7 版本新增)

本地时间,格式为:yyyy-mm-dd

LOCAL_TIME(2.7 版本新增)

本地时间,格式为:hh-mm-ss

LOCAL_DATE_TIME(2.7 版本新增)

本地时间,格式为:yyyy-mm-dd : hh-mm-ss

Complex type 目前支持的类型有:

博文推荐|深度解读 Pulsar Schema-编程知识网

Key/Value :

该模式下,Pulsar 将键和值的 schemaInfo 存储在一起

Pulsar 提供以下两种编码方式:

博文推荐|深度解读 Pulsar Schema-编程知识网

下面是使用 INLINE 编码类型构造 key/value schema:

Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.INLINE
);

Struct 使用方式

Pulsar 提供以下三种方式使用 Struct:

1.Static2.Generic3.SchemaDefinition

Static

如果我们已知要发送消息的数据类型,可以使用 static schema, 如下所示。

要发送的类为 User,结构如下:

public class User {     String name;     int age; 
}

使用 struct schema 创建生产者发送消息:

Producer producer =   client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage().value(User.builder().userName("Pulsar-user").userId(1L).build()).send();

使用 struct schema 创建消费者接收消息:

Consumer consumer  = client.newConsumer(Schema.AVRO(User.class)).create(); 
User user = consumer.receive();

Generic

如果我们不知道要发送消息的数据类型,可以使用 GenericSchemaBuilder 定义 struct schema,如下所示。

使用 RecordSchemaBuilder 构建一个 schema:

RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName"); 
recordSchemaBuilder.field("intField").type(SchemaType.INT32); 
SchemaInfo schemaInfo =recordSchemaBuilder.build(SchemaType.AVRO);
Producer producer =client.newProducer(Schema.generic(schemaInfo)).create();

使用 RecordSchemaBuilder 构建一个 struct schema:

producer.newMessage().value(schema.newRecordBuilder().set("intField", 32).build()).send();

SchemaDefinition

可以通过 SchemaDefinition 生成一个 struct schema,示例如下。

要发送的类为 User,结构如下:

public class User {     String name;     int age; 
}

使用 Schema Definition 生成一个 producer 并发送消息:

SchemaDefinition<User> schemaDefinition =   
SchemaDefinition.builder().withPojo(User.class).build();
Producer<User> producer = client.newProducer(schemaDefinition).create();
producer.newMessage().value(User.builder().userName("Pulsar-user").userId(1L).build()).send();

使用 SchemaDefinition 生成一个 consumer 并发送消息:

SchemaDefinition<User> schemaDefinition = SchemaDefinition.builder().withPojo(User.class).build();
Consumer<User> consumer = client.newConsumer(schemaDefinition).subscribe();
User user = consumer.receive();

SchemaInfo

SchemaInfo 是定义 schema 的一种数据结构,它包含以下字段:

博文推荐|深度解读 Pulsar Schema-编程知识网

示例如下:

{     "name": "test-string-schema",     "type": "STRING",     "schema": "",     "properties": {} 
}

Pulasr Schema 工作流程

在生产者端:

博文推荐|深度解读 Pulsar Schema-编程知识网博文推荐|深度解读 Pulsar Schema-编程知识网

在消费者端:

博文推荐|深度解读 Pulsar Schema-编程知识网博文推荐|深度解读 Pulsar Schema-编程知识网

Pulsar Schema 机制

Schema Version

Org. apache. Pulsar.common. schema 的 SchemaInfo With Version 有两个字段:long 类型的 version 和 SchemaInfo 类型的 schemaInfo。

Topic 下注册的 schema 会带有一个版本号,若版本号发生变化,需在原有版本号基础上 +1。Producer 发送带有 schemaInfo 的消息会附加一个版本号,所以当该消息被 consumer 消费时,客户端可以通过该版本号来获取对应的 schemaInfo,然后根据该 schemaInfo 对消息反序列化。

Schema Evolution

如果遇到业务发生变化的场景时,我们也许需要更新一下 schema,这种更新被称为 schema evolution,很显然,如果 schema 发生了更改,下游的 consumer 会受到影响,所以 schema evolution 应该能保证下游 consumer 能无缝处理旧版本和新版本的数据,这部分机制被称为 schema compatibility,该部分将在下一小节详细介绍。

以下为 schema evolution 的流程:

1.Producer、consumer 或 reader 连接至 broker 时,broker 会根据 schema Registry Compatibility Checkers 配置部署 schema compatibility checker,强制进行 schema 兼容性检查。2.Producer、consumer 或 reader 将 schemaInfo 发送给 broker,broker 收到后查询该 schema 类型的 schema compatibility checker,并根据 schema compatibility 策略检测该 schemaInfo 是否与 topic 目前版本的 schema 兼容(schema compatibility 策略被设置 namespace 级别,作用于该 namespace 下的所有 topic)。

Schema Compatibility Strategy

上小节介绍了 schema evolution,本小节将介绍 schema compatibility。Pulsar 有 8 种 schema 兼容性检查策略,如下表所示:

假设一个 topic 有三个 schema(V1, V2, V3),V1 是最早版本,V3 是最新版本。

兼容性检查策略名称

定义

是否允许更改

检查Schema

优先级   

ALWAYS_COMPATIBLE

总是兼容(禁止兼容性检查)

允许所有更改

      所有版本

Any order

ALWAYS_INCOMPATIBLE

总是不兼容(禁止Schema Evolution)

   

禁止所有更改

BACKWARD

使用 schema v3的消费者可以处理使用 schema v2 或 v3 的生产者编写的数据

– 添加可选字段

– 删除字段

最新版本

Consumer

BACKWARD_TRANSITIVE

使用 schema v3的消费者可以处理使用 schema v1、v2 或 v3 的生产者编写的数据

– 添加可选字段

– 删除字段

所有版本

Consumer

FORWARD

使用 schema v2 或 v3 的消费者可以处理使用 schema v3 的生产者编写的数据

– 添加字段

– 删除可选字段

最新版本

Producer

FORWARD_TEANSITIVE

使用 schema v1、v2 或 v3 的消费者可以处理使用 schema v3的生产者编写的数据

– 添加字段

– 删除可选字段

所有版本

Producer

FULL(默认策略)

使用 schema v2 或 v3 的消费者可以处理使用 schema v2 或 v3的生产者编写的数据

修改可选字段

最新版本

Any order

FULL_TRANSITIVE

使用 schema v1、v2 或 v3 的消费者可以处理使用 schema v1、v2 或 v3 的生产者编写的数据

修改可选字段

所有版本

Any order

Auto Schema

如果不知道 topic 的模式类型,可以使用 Auto Schema 来生成,Auto Schema 有以下两种类型:

博文推荐|深度解读 Pulsar Schema-编程知识网

AUTO_PRODUCE 示例

假设以下情况:

•目前需要处理来自 Kafka topic K 消息•有一个 Pulsar topic P, 但是不清楚该 topic 的 schema 类型•应用需要从 Kafka topic K 读取消息,然后写入到 Pulsar topic P

基于上面情况,可以使用 AUTO_PRODUCE 验证 K 生成的字节是否可以发送到 P

Produce<byte[]> PulsarProducer = 
client.newProducer(Schema.AUTO_PRODUCE())….create();
byte[] kafkaMessageBytes = … ; 
PulsarProducer.produce(kafkaMessageBytes);

AUTO_CONSUME 示例

假设以下情况:

•目前有一个 Pulsar topic P•消费端 (例如 MySQL) 需要从 topic P 读取消息•应用读取来自 P 的消息,然后将读取的消息写入到 MySQL

基于上面情况,可以使用 AUTO-CONSUME 验证 P 生成的字节是否可以发送到 MySQL

Consumer<GenericRecord> PulsarConsumer = 
client.newConsumer(Schema.AUTO_CONSUME())….subscribe();
Message<GenericRecord> msg = consumer.receive() ; 
GenericRecord record = msg.getValue();

Schema AutoUpdate

如果 schema 通过了 schema 兼容性检测,则 producer 将自己的 schema 版本与 topic schema 版本同步

对于生产者,AutoUpdate 的流程如下:

博文推荐|深度解读 Pulsar Schema-编程知识网博文推荐|深度解读 Pulsar Schema-编程知识网

对于消费者,AutoUpdate 的流程如下:

博文推荐|深度解读 Pulsar Schema-编程知识网博文推荐|深度解读 Pulsar Schema-编程知识网

相关阅读

•快来看!Schema 真的好简单啊!•译文|Pulsar Schema Registry

博文推荐|深度解读 Pulsar Schema-编程知识网点击“阅读原文”,获取 Apache Pulsar 硬核干货资料!