Add metadata support to GRPC plugin

This commit is contained in:
Taylor Cramer
2017-07-24 10:36:27 -07:00
parent bea5daaed7
commit 9dad090759
4 changed files with 154 additions and 5 deletions

View File

@@ -38,6 +38,16 @@ message Identifier {
string type_instance = 5;
}
message MetadataValue {
oneof value {
string string_value = 1;
int64 int64_value = 2;
uint64 uint64_value = 3;
double double_value = 4;
bool bool_value = 5;
};
}
message Value {
oneof value {
uint64 counter = 1;
@@ -56,4 +66,5 @@ message ValueList {
Identifier identifier = 4;
repeated string ds_names = 5;
}
map<string, MetadataValue> meta_data = 6;
}

View File

@@ -913,6 +913,20 @@ int uc_iterator_get_interval(uc_iter_t *iter, cdtime_t *ret_interval) {
return 0;
} /* int uc_iterator_get_name */
int uc_iterator_get_meta(uc_iter_t *iter, meta_data_t **ret_meta) {
if ((iter == NULL) || (iter->entry == NULL) || (ret_meta == NULL))
return -1;
if (iter->entry->meta == NULL) {
*ret_meta = NULL;
return 0;
}
*ret_meta = meta_data_clone(iter->entry->meta);
return 0;
} /* int uc_iterator_get_meta */
/*
* Meta data interface
*/

View File

@@ -106,6 +106,8 @@ int uc_iterator_get_values(uc_iter_t *iter, value_t **ret_values,
size_t *ret_num);
/* Return the interval of the value at the current position. */
int uc_iterator_get_interval(uc_iter_t *iter, cdtime_t *ret_interval);
/* Return the metadata for the value at the current position. */
int uc_iterator_get_meta(uc_iter_t *iter, meta_data_t **ret_meta);
/*
* Meta data interface

View File

@@ -170,9 +170,87 @@ static grpc::Status marshal_value_list(const value_list_t *vl,
msg->set_allocated_time(new google::protobuf::Timestamp(t));
msg->set_allocated_interval(new google::protobuf::Duration(d));
msg->clear_meta_data();
if (vl->meta != nullptr) {
char **meta_data_keys = nullptr;
int meta_data_keys_len = meta_data_toc(vl->meta, &meta_data_keys);
if (meta_data_keys_len < 0) {
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("error getting metadata keys"));
}
for (int i = 0; i < meta_data_keys_len; i++) {
char *key = meta_data_keys[i];
int md_type = meta_data_type(vl->meta, key);
collectd::types::MetadataValue md_value;
md_value.Clear();
switch (md_type) {
case MD_TYPE_STRING:
char *md_string;
if (meta_data_get_string(vl->meta, key, &md_string) != 0 || md_string == nullptr) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_string_value(md_string);
free(md_string);
break;
case MD_TYPE_SIGNED_INT:
int64_t int64_value;
if (meta_data_get_signed_int(vl->meta, key, &int64_value) != 0) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_int64_value(int64_value);
break;
case MD_TYPE_UNSIGNED_INT:
uint64_t uint64_value;
if (meta_data_get_unsigned_int(vl->meta, key, &uint64_value) != 0) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_uint64_value(uint64_value);
break;
case MD_TYPE_DOUBLE:
double double_value;
if (meta_data_get_double(vl->meta, key, &double_value) != 0) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_double_value(double_value);
break;
case MD_TYPE_BOOLEAN:
bool bool_value;
if (meta_data_get_boolean(vl->meta, key, &bool_value) != 0) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_bool_value(bool_value);
break;
default:
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("unknown metadata type"));
}
(*msg->mutable_meta_data())[grpc::string(key)] = md_value;
if (meta_data_keys != nullptr) {
strarray_free(meta_data_keys, meta_data_keys_len);
}
}
}
for (size_t i = 0; i < vl->values_len; ++i) {
auto v = msg->add_values();
switch (ds->ds[i].type) {
int value_type = ds->ds[i].type;
switch (value_type) {
case DS_TYPE_COUNTER:
v->set_counter(vl->values[i].counter);
break;
@@ -186,6 +264,7 @@ static grpc::Status marshal_value_list(const value_list_t *vl,
v->set_absolute(vl->values[i].absolute);
break;
default:
ERROR("grpc: invalid value type (%d)", value_type);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("unknown value type"));
}
@@ -207,6 +286,42 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
if (!status.ok())
return status;
vl->meta = meta_data_create();
if (vl->meta == nullptr) {
return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
grpc::string("failed to metadata list"));
}
for (auto kv: msg.meta_data()) {
auto k = kv.first.c_str();
auto v = kv.second;
// The meta_data collection individually allocates copies of the keys and
// string values for each entry, so it's safe for us to pass a reference
// to our short-lived strings.
switch (v.value_case()) {
case collectd::types::MetadataValue::ValueCase::kStringValue:
meta_data_add_string(vl->meta, k, v.string_value().c_str());
break;
case collectd::types::MetadataValue::ValueCase::kInt64Value:
meta_data_add_signed_int(vl->meta, k, v.int64_value());
break;
case collectd::types::MetadataValue::ValueCase::kUint64Value:
meta_data_add_unsigned_int(vl->meta, k, v.uint64_value());
break;
case collectd::types::MetadataValue::ValueCase::kDoubleValue:
meta_data_add_double(vl->meta, k, v.double_value());
break;
case collectd::types::MetadataValue::ValueCase::kBoolValue:
meta_data_add_boolean(vl->meta, k, v.bool_value());
break;
default:
meta_data_destroy(vl->meta);
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
grpc::string("Metadata of unknown type"));
}
}
value_t *values = NULL;
size_t values_len = 0;
@@ -249,8 +364,11 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
if (status.ok()) {
vl->values = values;
vl->values_len = values_len;
} else if (values) {
free(values);
} else {
meta_data_destroy(vl->meta);
if (values) {
free(values);
}
}
return status;
@@ -280,6 +398,7 @@ public:
auto vl = value_lists.front();
value_lists.pop();
sfree(vl.values);
meta_data_destroy(vl.meta);
}
return status;
@@ -328,7 +447,6 @@ private:
if (!ident_matches(&vl, match))
continue;
if (uc_iterator_get_time(iter, &vl.time) < 0) {
status =
grpc::Status(grpc::StatusCode::INTERNAL,
@@ -346,6 +464,10 @@ private:
grpc::string("failed to retrieve values"));
break;
}
if (uc_iterator_get_meta(iter, &vl.meta) < 0) {
status = grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("failed to retrieve value metadata"));
}
value_lists->push(vl);
} // while (uc_iterator_next(iter, &name) == 0)