16.2 Avro的C/C++实现

本节主要介绍Avro的C/C++实现,其中在Avro C库中已经嵌入Jansson(Jansson为编译和操控JSON数据的C语言库),这样可以将JSON解析成模式结构。目前C/C++实现支持:所有原始和复杂数据类型的二进制编码和解码;向Avro对象容器文件进行存储;模式解析、提升和映射;写入Avro数据的有效方式和无效方式,但C语言接口暂不支持远程过程调用RPC。

Avro C为所有模式和数据对象进行引用计数,当引用数降为零时便释放内存。例如,创建和释放一个字符串:


avro_datum_t string=avro_string("This is my string");

……

avro_datum_decref(string);


当考虑创建更加详细的模式和数据结构时就会有一点复杂,例如,创建带有字符串字段的记录:


avro_datum_t example=avro_record("Example");

avro_datum_t solo_field=avro_string("Example field value");

avro_record_set(example,"solo",solo_field);

……

avro_datum_decref(example);


在这个例子中,solo_field数据没有被释放,因为它有两个引用:原来的引用和隐藏在记录Example中的引用。调用avro_datum_decref(example)只能将引用数减少为一。如果想结束solo_field模式,则需要avro_datum_decref(solo_field)来完全删除solo_field数据并释放。

一些数据类型是可以“包装”和“给予”的,这可以让C程序员自由地决定谁负责内存的分配回收。以字符串为例,建立一个字符串数据有三种方式:


avro_datum_t avro_string(const char*str);

avro_datum_t avro_wrapstring(const char*str);

avro_datum_t avro_givestring(const char*str);


如果使用avro_string,那么Avro C会复制字符串并且当不再引用时释放它。在有些情况下,特别是当处理大量数据时要避免这种内存复制,这时需要使用avro_wrapstring和avro_givestring。如果使用avro_wrapstring,那么Avro C不做任何内存处理,它只保存指向数据的指针,这时需要自己来释放字符串。需要注意的是,当使用avro_wrapstring时,在用avro_datum_decref()取消引用数据前不要释放字符串。如果使用avro_givestring,那么Avro C在数据取消引用之后会释放字符串,从某种程度上说,avro_givestring将释放字符串的“责任”给了Avro C。需要注意的是,如果没有使用如malloc或strdup分配堆给字符串,则不要把“责任”给Avro C。例如,不能这样做:


avro_datum_t bad_idea=avro_givestring("This isn't allocated on the heap");


写入数据时可以使用下面的函数:


int avro_write_data(avro_writer_t writer,

avro_schema_t writers_schema, avro_datum_t datum);


如果省略writers_schema值,那么数据在发送给写数据的函数前必须检验数据格式的正确性。如果已经确定数据是正确的,那么可以设置writers_schema为NULL,这时Avro C不会检查格式。需要注意的是,写入Avro文件对象容器的数据总是要进行验证。

下面介绍一个简单例子,例子中建立了学生信息的数据库,并向数据库中读写记录:


/student.c/

include<avro.h>

include<inttypes.h>

include<stdio.h>

include<stdlib.h>

include<unistd.h>

avro_schema_t student_schema;

/id用于添加记录时为学生建立学号/

int64_t id=0;

/定义学生模式,拥有字段学号、姓名、学院、电话和年龄/

define STUDENT_SCHEMA\

"{\"type\":\"record\",\

\"name\":\"Student\",\

\"fields\":[\

{\"name\":\"SID\",\"type\":\"long\"},\

{\"name\":\"Name\",\"type\":\"string\"},\

{\"name\":\"Dept\",\"type\":\"string\"},\

{\"name\":\"Phone\",\"type\":\"string\"},\

{\"name\":\"Age\",\"type\":\"int\"}]}"

/把JSON定义的模式解析成模式的数据结构/

void init(void)

{

avro_schema_error_t error;

if(avro_schema_from_json(STUDENT_SCHEMA,

sizeof(STUDENT_SCHEMA),

&student_schema,&error)){

fprintf(stderr,"Failed to parse student schema\n");

exit(EXIT_FAILURE);

}

}

/添加学生记录/

void add_student(avro_file_writer_t db, const charname, const chardept, const

char*phone, int32_t age)

{

avro_datum_t student=avro_record("Student",NULL);

avro_datum_t sid_datum=avro_int64(++id);

avro_datum_t name_datum=avro_string(name);

avro_datum_t dept_datum=avro_string(dept);

avro_datum_t age_datum=avro_int32(age);

avro_datum_t phone_datum=avro_string(phone);

/创建学生记录/

if(avro_record_set(student,"SID",sid_datum)

||avro_record_set(student,"Name",name_datum)

||avro_record_set(student,"Dept",dept_datum)

||avro_record_set(student,"Age",age_datum)

||avro_record_set(student,"Phone",phone_datum)){

fprintf(stderr,"Failed to create student datum structure");

exit(EXIT_FAILURE);

}

/将记录添加到数据库文件中/

if(avro_file_writer_append(db, student)){

fprintf(stderr,"Failed to add student datum to database");

exit(EXIT_FAILURE);

}

/解除引用,释放内存空间/

avro_datum_decref(sid_datum);

avro_datum_decref(name_datum);

avro_datum_decref(dept_datum);

avro_datum_decref(age_datum);

avro_datum_decref(phone_datum);

avro_datum_decref(student);

fprintf(stdout,"Successfully added%s\n",name);

}

/输出数据库中的学生信息/

int show_student(avro_file_reader_t db,

avro_schema_t reader_schema)

{

int rval;

avro_datum_t student;

rval=avro_file_reader_read(db, reader_schema,&student);

if(rval==0){

int64_t i64;

int32_t i32;

char*p;

avro_datum_t sid_datum, name_datum, dept_datum,

phone_datum, age_datum;

if(avro_record_get(student,"SID",&sid_datum)==0){

avro_int64_get(sid_datum,&i64);

fprintf(stdout,"%"PRId64"",i64);

}

if(avro_record_get(student,"Name",&name_datum)==0){

avro_string_get(name_datum,&p);

fprintf(stdout,"%12s",p);

}

if(avro_record_get(student,"Dept",&dept_datum)==0){

avro_string_get(dept_datum,&p);

fprintf(stdout,"%12s",p);

}

if(avro_record_get(student,"Phone",&phone_datum)==0){

avro_string_get(phone_datum,&p);

fprintf(stdout,"%12s",p);

}

if(avro_record_get(student,"Age",&age_datum)==0){

avro_int32_get(age_datum,&i32);

fprintf(stdout,"%d",i32);

}

fprintf(stdout,"\n");

/释放记录/

avro_datum_decref(student);

}

return rval;

}

int main(void)

{

int rval;

avro_file_reader_t dbreader;

avro_file_writer_t db;

avro_schema_t extraction_schema, name_schema,

phone_schema;

int64_t i;

const char*dbname="student.db";

init();

/如果student.db存在,则删除/

unlink(dbname);

/创建数据库文件/

rval=avro_file_writer_create(dbname, student_schema,&db);

if(rval){

fprintf(stderr,"Failed to create%s\n",dbname);

exit(EXIT_FAILURE);

}

/向数据库文件中添加学生信息/

add_student(db,"Zhanghua","Law","15201161111",25);

add_student(db,"Lili","Economy","15201162222",24);

add_student(db,"Wangyu","Information","15201163333",25);

add_student(db,"Zhaoxin","Art","15201164444",23);

add_student(db,"Sunqin","Physics","15201165555",25);

add_student(db,"Zhouping","Math","15201166666",23);

avro_file_writer_close(db);

fprintf(stdout,"\nPrint all the records from database\n");

/读取并输出所有的学生信息/

avro_file_reader(dbname,&dbreader);

for(i=0;i<id;i++){

if(show_student(dbreader, NULL)){

fprintf(stderr,"Error printing student\n");

exit(EXIT_FAILURE);

}

}

avro_file_reader_close(dbreader);

/输出学生的姓名和电话信息/

extraction_schema=avro_schema_record("Student",NULL);

name_schema=avro_schema_string();

phone_schema=avro_schema_string();

avro_schema_record_field_append(extraction_schema,

"Name",name_schema);

avro_schema_record_field_append(extraction_schema,"Phone",phone_schema);

/只读取每个学生的姓名和电话/

fprintf(stdout,

"\n\nExtract Name&Phone of the records from database\n");

avro_file_reader(dbname,&dbreader);

for(i=0;i<id;i++){

if(show_student(dbreader, extraction_schema)){

fprintf(stderr,"Error printing student\n");

exit(EXIT_FAILURE);

}

}

avro_file_reader_close(dbreader);

avro_schema_decref(name_schema);

avro_schema_decref(phone_schema);

avro_schema_decref(extraction_schema);

/最后释放学生模式/

avro_schema_decref(student_schema);

return 0;

}


如果要编译上面的C文件,则需要安装Avro C。首先可以从网站http://www.apache.org/dyn/closer.cgi/avro/选择镜像下载avro-c-1.6.3.tar.gz文件,使用命令tar-zxvf avro-c-1.6.3.tar.gz解压后进入其目录,并使用命令./configure和make、make install进行编译安装。注意,需要在root的权限下进行安装。安装成功后,在编译C语言前需要将libavro加入动态链接库中,使用命令:


export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH


然后对程序进行编译:


gcc-o student-lavro student.c


运行生成的执行文件可得到如图16-5所示的结果。运行时在当前目录下生成student.db对象容器文件,可以使用命令cat查看文件中的内容—先存储学生的模式,然后存储学生的记录信息,具体内容可参见16.1.4节“对象容器文件”和图16-3。

16.2 Avro的C/C++实现 - 图1

图 16-5 运行结果

下面介绍Avro的C++应用程序接口。虽然Avro并不需要使用代码生成器,但是使用代码生成工具可以更简单地使用Avro C++库。代码生成器既可以读取模式并输出模式数据的C++对象,也可以产生代码来序列化或反序列化对象等所有复杂的译码工作。即使使用C++核心库来编写序列化器或者解析器,产生的代码也可以说明如何使用这些库。下面举一个使用模式的简单例子,此例用来表示一个虚数:


{

"type":"record",

"name":"complex",

"fields":[

{"name":"real","type":"double"},

{"name":"imaginary","type":"double"}

]

}


假设JSON可用来表示存储在名为imaginary文件中的模式,那么产生代码分成两步:

第一步:


precompile<imaginary>imaginary.flat


预编译会将模式转化为代码生成器所使用的中间格式,中间文件是模式的文本形式,它是通过对模式类型树深度优先遍历得到的。

第二步:


python scripts/gen-cppcode.py—input=example.flat—output=example.hh—namespace=Math


上面的命令告诉代码生成器去读取模式作为输入,并且在example.hh中生成C++头文件。可选参数将指定对象放置的命名空间,如果没有指定命名空间,仍可得到默认的命名空间。下面是所产生代码的开始部分:


namespace Math{

struct complex{

complex():

real(),

imaginary()

{}

double real;

double imaginary;

};


以上代码是用C++表示的模式,它创建记录、默认构造函数并为记录的每个字段建立成员。下面是序列化数据的例子:


void serializeMyData()

{

Math:complex c;

c.real=10.0;

c.imaginary=20.0;

//writer是实际I/O和缓冲结果的对象

avro:Writer writer;

//在对象上调用writer

avro:serialize(writer, c);

//这时,writer将序列化后的数据存储在缓冲区中

InputBuffer buffer=writer.buffer();

}


使用生成的代码,调用对象的avro:serialize()函数可以序列化数据,通过调用avro:InputBuffer对象可以获取数据,通过网络可以发送文件。下面读取序列化的数据到对象中:


void parseMyData(const avro:InputBuffer&myData)

{

Math:complex c;

//reader为实际I/O读取的对象

avro:Reader reader(myData);

//在对象上调用reader

avro:parse(reader, c);

//此时,C中存放的是反序列化后的数据

}


在下面的代码中avro:serialize()函数和avro:parse()函数可用于处理用户数据类型,具体实现如下:


template<typename Serializer>

inline void serialize(Serializer&s, const complex&val, const boost:true_type&)

{

s.writeRecord();

serialize(s, val.real);

serialize(s, val.imaginary);

s.writeRecordEnd();

}

template<typename Parser>

inline void parse(Parser&p, complex&val, const boost:true_type&){

p.readRecord();

parse(p, val.real);

parse(p, val.imaginary);

p.readRecordEnd();

}


以下内容也可加入avro命名空间中:


template<>struct is_serializable<Math:complex>:public boost:true_type{};


这样为复杂结构建立类型特征,告诉Avro对象的序列化和解析功能可用。

除了上面介绍的使用Avro C++代码生成器来读写对象外,Avro C++也可以读入JSON模式。库函数提供了一些工具来读取存储在JSON文件或字符串中的模式,如下所示:


void readSchema()

{

//My schema is stored in a file called"example"

std:ifstream in("example");

avro:ValidSchema mySchema;

avro:compileJsonSchema(in, mySchema);

}


上面代码读取文件并将JSON模式解析成avro:ValidSchema类型的对象。如果模式是无效的,将无法建立有效模式(ValidSchema)对象并抛出异常,那么如何从JSON存储的模式中建立有效模式对象呢?

有效模式(ValidSchema)可以保证开发者实际写入的类型匹配模式所期望的类型。现在重写序列化函数并需要检查模式:


void serializeMyData(const ValidSchema&mySchema)

{

Math:complex c;

c.real=10.0;

c.imaginary=20.0;

//ValidatingWriter保证序列化写入正确类型的数据

avro:ValidatingWriter writer(mySchema);

try{

avro:serialize(writer, c);

//这时,ostringstream"os"存储序列化后的数据

}

catch(avro:Exception&e){

std:cerr<<"ValidatingWriter encountered an error:"<<e.what();

}

}


这段代码和前面的区别就是用ValidatingWriter代替了Writer object。如果序列化函数错误地写入不匹配模式的类型,那么ValidatingWriter将抛出异常。ValidatingWriter会在写入数据的时候增加很多处理过程。对于产生的代码则没有必要进行验证,因为自动生成的代码是匹配模式的。然而,在写入和测试自己序列化的代码时加上安全验证还是必要的。解析数据时也可以使用有效模式,它不仅可以确保解析器读取的类型匹配模式有效,还提供了接口,通过该接口可以查询下一个期望的类型和记录成员字段的名称。下面的例子介绍了如何使用API:


void parseMyData(const avro:InputBuffer&myData, const avro:ValidSchema&mySchema)

{

//手动解析数据,解析对象将数据绑定到模式上

avro:Parser<ValidatingReader>parser(mySchema, myData);

assert(nextType(parser)==avro:AVRO_RECORD);

//开始解析

parser.readRecord();

Math:complex c;

std:string recordName;

assert(currentRecordName(parser, recordName)==true);

assert(recordName=="complex");

std:string fieldName;

for(int i=0;i<2;++i){

assert(nextType(parser)==avro:AVRO_DOUBLE);

assert(nextFieldName(parser, fieldName)==true);

if(fieldName=="real"){

c.real=parser.readDouble();

}

else if(fieldName=="imaginary"){

c.imaginary=parser.readDouble();

}

else{

std:cout<<"I did not expect that!\n";

}

}

parser.readRecordEnd();

}


上面的代码表明,如果编译时不知道模式,也可以通过写出解析数据的代码在运行时读取模式,并且查询ValidatingReader来了解序列化数据的内容。

在自己的代码中使用对象来建立模式是允许的,每个原始类型和复合类型都有模式对象,并且它们拥有共同的Schema基类。下面是一个为复数记录数组建立模式的例子:


void createMySchema()

{

//首先建立复数类型

avro:RecordSchema myRecord("complex");

//在记录中加入字段(每个字段又是一个模式)

myRecord.addField("real",avro:DoubleSchema());

myRecord.addField("imaginary",avro:DoubleSchema());

//这个复数记录和之前使用的一样,下面为这些记录的数组建立模式

avro:ArraySchema complexArray(myRecord);

//如果模式是无效的将抛出

avro:ValidSchema validComplexArray(complexArray);

//这样建立好了模式

//输出到屏幕上

validComplexArray.toJson(std:cout);

}


以上代码建立的模式可能是无效的,因此,为了使用模式,需要将它转化为ValidSchma对象。执行上述代码可以得到:


{

"type":"array",

"items":{

"type":"record",

"name":"complex",

"fields":[

{

"name":"real",

"type":"double"

},

{

"name":"imaginary",

"type":"double"

}

]

}

}


随着时间的变化,程序模式期望的数据可能与之前存储的数据不同,为了把一个模式转化为另一个模式,Avro提供了不完全一样的模式规则。这种情况下,代码生成工具就有用了,对于每个生成的结构都会建立一个用来读取数据的特别索引结构,即使数据是用不同的模式写的。在example.hh中的索引结构如下:


class complex_Layout:public avro:CompoundOffset{

public:

complex_Layout(size_t offset):

CompoundOffset(offset)

{

add(new avro:Offset(offset+offsetof(complex, real)));

add(new avro:Offset(offset+offsetof(complex, imaginary)));

}

};


数据前若是float类型而不是double类型,根据模式解决规则,floats可以升级为doubles,只要新旧模式都有用,就会建立一个动态的解析器来读取代码生成结构的数据。如下所示:


void dynamicParse(const avro:ValidSchema&writerSchema,

const avro:ValidSchema&readerSchema){

//实例化布局对象

Math:complex_Layout layout;

//创建已知类型布局和模式的模式解析器

resolverSchema(writerSchema, readerSchema, layout);

//设置reader

avro:ResolvingReader reader(resolverSchema, data);

Math:complex c;

//执行解析

avro:parse(reader, c);

//这时,c中存放的是反序列化后的数据

}