关于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/
文章转自公众号:腾讯云中间件,作者:王俊飞
本期排版:Tango@StreamNative
为什么使用 Pulsar Schema?
如果 producer 端要发送 POJO 类型的数据,则 Pulsar 需要一套序列化和反序列化工具,先将对象转化为字节数据再发送出去,下面为有无 schema 的两种情况:
无 Schema 的情况
若在不指定 schema 的情况下创建 producer,则 producer 只能发送字节数组类型的消息。在有 POJO 类数据要发送时,需要在发送消息前将 POJO 序列化为字节。
代码示例:
Producer<byte[]> producer = client.newProducer() .topic(topic) .create();
User user = new User(“Bill”, 40);
byte[] message = … // serialize the `user` by yourself;
producer.send(message);
有 Schema 的情况
若在指定 schema 的情况下创建 producer,则 producer 可以直接将类发送到 topic,无需考虑如何将 POJO 序列化为字节。
代码示例:
Producer<User> producer = client.newProducer(JSONSchema.of(User.class)) .topic(topic) .create();
User user = new User(“Bill”, 40);
producer.send(user);
此外,在上述 producer 发送数据、consumer 接收数据的流程中,还需考虑以下情况:
•信息对象里是否有字段缺失•结构里是否有字段类型发生改变
在这些情况下,为保证生产-消费模式的正常运行,所有 producer 与其相对应的 consumer 都需要进行相同的变化,若引入 schema 机制,可以简化上述操作。
Pulsar Schema 基本概念
Pulsar Schema 包含:
Schema Type
Pulsar Schema 支持的类型可分为 Primitive type
和 Complex type
Primitive type 包含的类型有 :
Primitive type |
描述 |
BOOLEAN |
1 比特二进制数值 |
INT8 |
8 位有符号整数 |
INT16 |
16 位有符号整数 |
INT32 |
32 位有符号整数 |
INT64 |
64 位有符号整数 |
FLOATE |
单精度浮点数 |
DOUBLE |
双精度浮点数 |
BYTES |
字节序列 |
STRING |
Unicode 字符集序列 |
TIMESTAMP(DATE, TIME) |
时间戳,保存形式为 64 位有符号整数 |
INSTANCE(2.7 版本新增) |
精度为纳秒的瞬时时间 |
LOCAL_DATE(2.7 版本新增) |
本地时间,格式为:yyyy-mm-dd |
LOCAL_TIME(2.7 版本新增) |
本地时间,格式为:hh-mm-ss |
LOCAL_DATE_TIME(2.7 版本新增) |
本地时间,格式为:yyyy-mm-dd : hh-mm-ss |
Complex type 目前支持的类型有:
Key/Value :
该模式下,Pulsar 将键和值的 schemaInfo 存储在一起
Pulsar 提供以下两种编码方式:
下面是使用 INLINE 编码类型构造 key/value schema:
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.INLINE
);
Struct 使用方式
Pulsar 提供以下三种方式使用 Struct:
1.Static2.Generic3.SchemaDefinition
Static
如果我们已知要发送消息的数据类型,可以使用 static schema
, 如下所示。
要发送的类为 User
,结构如下:
public class User { String name; int age;
}
使用 struct schema
创建生产者发送消息:
Producer producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage().value(User.builder().userName("Pulsar-user").userId(1L).build()).send();
使用 struct schema
创建消费者接收消息:
Consumer consumer = client.newConsumer(Schema.AVRO(User.class)).create();
User user = consumer.receive();
Generic
如果我们不知道要发送消息的数据类型,可以使用 GenericSchemaBuilder
定义 struct schema,如下所示。
使用 RecordSchemaBuilder
构建一个 schema:
RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
recordSchemaBuilder.field("intField").type(SchemaType.INT32);
SchemaInfo schemaInfo =recordSchemaBuilder.build(SchemaType.AVRO);
Producer producer =client.newProducer(Schema.generic(schemaInfo)).create();
使用 RecordSchemaBuilder
构建一个 struct schema:
producer.newMessage().value(schema.newRecordBuilder().set("intField", 32).build()).send();
SchemaDefinition
可以通过 SchemaDefinition
生成一个 struct schema,示例如下。
要发送的类为 User
,结构如下:
public class User { String name; int age;
}
使用 Schema Definition
生成一个 producer 并发送消息:
SchemaDefinition<User> schemaDefinition =
SchemaDefinition.builder().withPojo(User.class).build();
Producer<User> producer = client.newProducer(schemaDefinition).create();
producer.newMessage().value(User.builder().userName("Pulsar-user").userId(1L).build()).send();
使用 SchemaDefinition
生成一个 consumer 并发送消息:
SchemaDefinition<User> schemaDefinition = SchemaDefinition.builder().withPojo(User.class).build();
Consumer<User> consumer = client.newConsumer(schemaDefinition).subscribe();
User user = consumer.receive();
SchemaInfo
SchemaInfo 是定义 schema 的一种数据结构,它包含以下字段:
示例如下:
{ "name": "test-string-schema", "type": "STRING", "schema": "", "properties": {}
}
Pulasr Schema 工作流程
在生产者端:
在消费者端:
Pulsar Schema 机制
Schema Version
Org. apache. Pulsar.common. schema
的 SchemaInfo With Version 有两个字段:long 类型的 version 和 SchemaInfo 类型的 schemaInfo。
Topic 下注册的 schema 会带有一个版本号,若版本号发生变化,需在原有版本号基础上 +1。Producer 发送带有 schemaInfo 的消息会附加一个版本号,所以当该消息被 consumer 消费时,客户端可以通过该版本号来获取对应的 schemaInfo,然后根据该 schemaInfo 对消息反序列化。
Schema Evolution
如果遇到业务发生变化的场景时,我们也许需要更新一下 schema,这种更新被称为 schema evolution,很显然,如果 schema 发生了更改,下游的 consumer 会受到影响,所以 schema evolution 应该能保证下游 consumer 能无缝处理旧版本和新版本的数据,这部分机制被称为 schema compatibility,该部分将在下一小节详细介绍。
以下为 schema evolution 的流程:
1.Producer、consumer 或 reader 连接至 broker 时,broker 会根据 schema Registry Compatibility Checkers 配置部署 schema compatibility checker,强制进行 schema 兼容性检查。2.Producer、consumer 或 reader 将 schemaInfo 发送给 broker,broker 收到后查询该 schema 类型的 schema compatibility checker,并根据 schema compatibility 策略检测该 schemaInfo 是否与 topic 目前版本的 schema 兼容(schema compatibility 策略被设置 namespace 级别,作用于该 namespace 下的所有 topic)。
Schema Compatibility Strategy
上小节介绍了 schema evolution,本小节将介绍 schema compatibility。Pulsar 有 8 种 schema 兼容性检查策略,如下表所示:
假设一个 topic 有三个 schema(V1, V2, V3),V1 是最早版本,V3 是最新版本。
兼容性检查策略名称 |
定义 |
是否允许更改 |
检查Schema |
优先级 |
ALWAYS_COMPATIBLE |
总是兼容(禁止兼容性检查) |
允许所有更改 |
所有版本 |
Any order |
ALWAYS_INCOMPATIBLE |
总是不兼容(禁止Schema Evolution)
|
禁止所有更改 |
无 |
无 |
BACKWARD |
使用 schema v3的消费者可以处理使用 schema v2 或 v3 的生产者编写的数据 |
– 添加可选字段 – 删除字段 |
最新版本 |
Consumer |
BACKWARD_TRANSITIVE |
使用 schema v3的消费者可以处理使用 schema v1、v2 或 v3 的生产者编写的数据 |
– 添加可选字段 – 删除字段 |
所有版本 |
Consumer |
FORWARD |
使用 schema v2 或 v3 的消费者可以处理使用 schema v3 的生产者编写的数据 |
– 添加字段 – 删除可选字段 |
最新版本 |
Producer |
FORWARD_TEANSITIVE |
使用 schema v1、v2 或 v3 的消费者可以处理使用 schema v3的生产者编写的数据 |
– 添加字段 – 删除可选字段 |
所有版本 |
Producer |
FULL(默认策略) |
使用 schema v2 或 v3 的消费者可以处理使用 schema v2 或 v3的生产者编写的数据 |
修改可选字段 |
最新版本 |
Any order |
FULL_TRANSITIVE |
使用 schema v1、v2 或 v3 的消费者可以处理使用 schema v1、v2 或 v3 的生产者编写的数据 |
修改可选字段 |
所有版本 |
Any order |
Auto Schema
如果不知道 topic 的模式类型,可以使用 Auto Schema
来生成,Auto Schema
有以下两种类型:
AUTO_PRODUCE 示例
假设以下情况:
•目前需要处理来自 Kafka topic K 消息•有一个 Pulsar topic P, 但是不清楚该 topic 的 schema 类型•应用需要从 Kafka topic K 读取消息,然后写入到 Pulsar topic P
基于上面情况,可以使用 AUTO_PRODUCE
验证 K 生成的字节是否可以发送到 P
Produce<byte[]> PulsarProducer =
client.newProducer(Schema.AUTO_PRODUCE())….create();
byte[] kafkaMessageBytes = … ;
PulsarProducer.produce(kafkaMessageBytes);
AUTO_CONSUME 示例
假设以下情况:
•目前有一个 Pulsar topic P•消费端 (例如 MySQL) 需要从 topic P 读取消息•应用读取来自 P 的消息,然后将读取的消息写入到 MySQL
基于上面情况,可以使用 AUTO-CONSUME
验证 P 生成的字节是否可以发送到 MySQL
Consumer<GenericRecord> PulsarConsumer =
client.newConsumer(Schema.AUTO_CONSUME())….subscribe();
Message<GenericRecord> msg = consumer.receive() ;
GenericRecord record = msg.getValue();
Schema AutoUpdate
如果 schema 通过了 schema 兼容性检测,则 producer 将自己的 schema 版本与 topic schema 版本同步
对于生产者,AutoUpdate
的流程如下:
对于消费者,AutoUpdate
的流程如下:
相关阅读
•快来看!Schema 真的好简单啊!•译文|Pulsar Schema Registry