Merge pull request #4188 from octo/6/write_http_otlp_json

[collectd 6] Add OTLP support to the Write HTTP plugin.
This commit is contained in:
Florian Forster
2024-01-03 16:51:23 +01:00
committed by GitHub
6 changed files with 461 additions and 10 deletions

View File

@@ -476,7 +476,10 @@ if BUILD_WITH_LIBYAJL
noinst_LTLIBRARIES += libformat_json.la
libformat_json_la_SOURCES = \
src/utils/format_json/format_json.c \
src/utils/format_json/format_json.h
src/utils/format_json/format_json.h \
src/utils/format_json/open_telemetry.c \
src/utils/resource_metrics/resource_metrics.c \
src/utils/resource_metrics/resource_metrics.h
libformat_json_la_CPPFLAGS = $(AM_CPPFLAGS)
libformat_json_la_LDFLAGS = $(AM_LDFLAGS)
libformat_json_la_LIBADD = libmetric.la

View File

@@ -11257,7 +11257,8 @@ Password required to load the private key in B<ClientKey>.
=item B<Header> I<Header>
A HTTP header to add to the request. Multiple headers are added if this option is specified more than once. Example:
A HTTP header to add to the request. Multiple headers are added if this option
is specified more than once. Example:
Header "X-Custom-Header: custom_value"
@@ -11267,14 +11268,36 @@ Define which SSL protocol version must be used. By default C<libcurl> will
attempt to figure out the remote SSL protocol version. See
L<curl_easy_setopt(3)> for more details.
=item B<Format> B<Command>|B<JSON>|B<KAIROSDB>
=item B<Format> B<Command>|B<JSON>|B<KairosDB>|B<InfluxDB>|B<OTLP_JSON>
Format of the output to generate. If set to B<Command>, will create output that
is understood by the I<Exec> and I<UnixSock> plugins. When set to B<JSON>, will
create output in the I<JavaScript Object Notation> (JSON). When set to KAIROSDB
, will create output in the KairosDB format.
Set the format in which to export data. Possible values are:
Defaults to B<Command>.
=over 4
=item B<Command> (default)
Output that is understood by the I<Exec> and I<UnixSock> plugins.
=item B<JSON>
collectd specific I<JavaScript Object Notation> (JSON) format.
=item B<KairosDB>
KairosDB is a time series database based on Cassandra. This produces its JSON
format.
=item B<InfluxDB>
InfluxDB is an open-source time series database. This produces its line
protocol.
=item B<OTLP_JSON>
Producs OpenTelemetry's JSON format.
L<https://opentelemetry.io/docs/specs/otlp/#json-protobuf-encoding>
=back
=item B<Attribute> I<String> I<String>

View File

@@ -30,6 +30,7 @@
#include "collectd.h"
#include "plugin.h"
#include "utils/resource_metrics/resource_metrics.h"
#include "utils/strbuf/strbuf.h"
#ifndef JSON_GAUGE_FORMAT
@@ -48,4 +49,7 @@ int format_json_metric_family(strbuf_t *buf, metric_family_t const *fam,
int format_json_notification(char *buffer, size_t buffer_size,
notification_t const *n);
int format_json_open_telemetry(strbuf_t *buf,
resource_metrics_set_t const *set);
#endif /* UTILS_FORMAT_JSON_H */

View File

@@ -288,10 +288,69 @@ DEF_TEST(metric_family_append) {
return 0;
}
DEF_TEST(open_telemetry) {
metric_family_t *fams[2] = {
&(metric_family_t){
.name = "unit.tests",
.help = "Example gauge metric",
.unit = "1",
.type = METRIC_TYPE_GAUGE,
},
&(metric_family_t){
.name = "unit.test.count",
.help = "Example counter metric",
.unit = "{test}",
.type = METRIC_TYPE_COUNTER,
},
};
metric_family_resource_attribute_update(fams[0], "service.name", "unit test");
metric_family_resource_attribute_update(fams[1], "service.name", "unit test");
CHECK_ZERO(metric_family_append(fams[0], "metric.label", "test label",
(value_t){.gauge = 42}, NULL));
CHECK_ZERO(metric_family_append(fams[1], "metric.label", "bar",
(value_t){.counter = 31337}, NULL));
resource_metrics_set_t set = {0};
CHECK_ZERO(resource_metrics_add(&set, fams[0]));
CHECK_ZERO(resource_metrics_add(&set, fams[1]));
strbuf_t buf = STRBUF_CREATE;
CHECK_ZERO(format_json_open_telemetry(&buf, &set));
EXPECT_EQ_STR(
"{\"resourceMetrics\":[{\"resource\":{\"attributes\":[{\"key\":\"service."
"name\",\"value\":{\"stringValue\":\"unit "
"test\"}}]},\"scopeMetrics\":[{\"scope\":{\"name\":\"collectd\","
"\"version\":\"" PACKAGE_VERSION
"\"},\"metrics\":[{\"name\":\"unit.test.count\",\"unit\":\"{test}\","
"\"description\":\"Example counter "
"metric\",\"sum\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"metric."
"label\",\"value\":{\"stringValue\":\"bar\"}}],\"timeUnixNano\":0,"
"\"asInt\":31337}],\"aggregationTemporality\":\"2\",\"isMonotonic\":true}"
"},{\"name\":\"unit.tests\",\"unit\":\"1\",\"description\":\"Example "
"gauge "
"metric\",\"gauge\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"metric."
"label\",\"value\":{\"stringValue\":\"test "
"label\"}}],\"timeUnixNano\":0,\"asDouble\":42.0}]}}]}]}]}",
buf.ptr);
STRBUF_DESTROY(buf);
resource_metrics_reset(&set);
label_set_reset(&fams[0]->resource);
label_set_reset(&fams[1]->resource);
metric_family_metric_reset(fams[0]);
metric_family_metric_reset(fams[1]);
return 0;
}
int main(void) {
RUN_TEST(notification);
RUN_TEST(metric_family);
RUN_TEST(metric_family_append);
RUN_TEST(open_telemetry);
END_TEST;
}

View File

@@ -0,0 +1,292 @@
/**
* collectd - src/utils_format_json.c
* Copyright (C) 2023 Florian octo Forster
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* Authors:
* Florian octo Forster <octo at collectd.org>
**/
#include "collectd.h"
#include "utils/format_json/format_json.h"
#include <yajl/yajl_gen.h>
static void log_yajl_gen_error(int line, const char *fname, int status) {
if (strncmp(fname, "yajl_gen_", strlen("yajl_gen_")) != 0) {
return;
}
ERROR("open_telemetry.c:%d: %s failed with status %d", line, fname, status);
}
#define CHECK(f) \
do { \
int status = (f); \
if (status != 0) { \
log_yajl_gen_error(__LINE__, #f, status); \
return status; \
} \
} while (0)
static int json_add_string(yajl_gen g, char const *str) /* {{{ */
{
if (str == NULL) {
CHECK(yajl_gen_null(g));
return 0;
}
int status = yajl_gen_string(g, (unsigned char const *)str, strlen(str));
if (status != yajl_gen_status_ok) {
ERROR("format_json: yajl_gen_string(\"%s\") failed with status %d", str,
status);
return status;
}
return 0;
} /* }}} int json_add_string */
static int key_value(yajl_gen g, label_pair_t label) {
CHECK(yajl_gen_map_open(g)); /* BEGIN KeyValue */
CHECK(json_add_string(g, "key"));
CHECK(json_add_string(g, label.name));
CHECK(json_add_string(g, "value"));
CHECK(yajl_gen_map_open(g)); /* BEGIN AnyValue */
CHECK(json_add_string(g, "stringValue"));
CHECK(json_add_string(g, label.value));
CHECK(yajl_gen_map_close(g)); /* END AnyValue */
CHECK(yajl_gen_map_close(g)); /* END KeyValue */
return 0;
}
static int number_data_point(yajl_gen g, metric_t const *m) {
CHECK(yajl_gen_map_open(g)); /* BEGIN NumberDataPoint */
CHECK(json_add_string(g, "attributes"));
CHECK(yajl_gen_array_open(g));
for (size_t i = 0; i < m->label.num; i++) {
CHECK(key_value(g, m->label.ptr[i]));
}
CHECK(yajl_gen_array_close(g));
CHECK(json_add_string(g, "timeUnixNano"));
CHECK(yajl_gen_integer(g, CDTIME_T_TO_NS(m->time)));
switch (m->family->type) {
case METRIC_TYPE_COUNTER:
CHECK(json_add_string(g, "asInt"));
CHECK(yajl_gen_integer(g, m->value.counter));
break;
case METRIC_TYPE_GAUGE:
CHECK(json_add_string(g, "asDouble"));
CHECK(yajl_gen_double(g, m->value.gauge));
break;
case METRIC_TYPE_UNTYPED:
// TODO
assert(0);
}
CHECK(yajl_gen_map_close(g)); /* END NumberDataPoint */
return 0;
}
static int gauge(yajl_gen g, metric_family_t const *fam) {
CHECK(yajl_gen_map_open(g)); /* BEGIN Gauge */
CHECK(json_add_string(g, "dataPoints"));
CHECK(yajl_gen_array_open(g));
for (size_t i = 0; i < fam->metric.num; i++) {
CHECK(number_data_point(g, fam->metric.ptr + i));
}
CHECK(yajl_gen_array_close(g));
CHECK(yajl_gen_map_close(g)); /* END Gauge */
return 0;
}
static int sum(yajl_gen g, metric_family_t const *fam) {
CHECK(yajl_gen_map_open(g)); /* BEGIN Sum */
CHECK(json_add_string(g, "dataPoints"));
CHECK(yajl_gen_array_open(g));
for (size_t i = 0; i < fam->metric.num; i++) {
CHECK(number_data_point(g, fam->metric.ptr + i));
}
CHECK(yajl_gen_array_close(g));
char const *aggregation_temporality_cumulative = "2";
CHECK(json_add_string(g, "aggregationTemporality"));
CHECK(json_add_string(g, aggregation_temporality_cumulative));
CHECK(json_add_string(g, "isMonotonic"));
CHECK(yajl_gen_bool(g, true));
CHECK(yajl_gen_map_close(g)); /* END Sum */
return 0;
}
static int metric(yajl_gen g, metric_family_t const *fam) {
CHECK(yajl_gen_map_open(g)); /* BEGIN Metric */
CHECK(json_add_string(g, "name"));
CHECK(json_add_string(g, fam->name));
if (fam->unit != NULL) {
CHECK(json_add_string(g, "unit"));
CHECK(json_add_string(g, fam->unit));
}
if (fam->help != NULL) {
CHECK(json_add_string(g, "description"));
CHECK(json_add_string(g, fam->help));
}
switch (fam->type) {
case METRIC_TYPE_COUNTER:
CHECK(json_add_string(g, "sum"));
CHECK(sum(g, fam));
break;
case METRIC_TYPE_GAUGE:
CHECK(json_add_string(g, "gauge"));
CHECK(gauge(g, fam));
break;
case METRIC_TYPE_UNTYPED:
// TODO
assert(0);
}
CHECK(yajl_gen_map_close(g)); /* END Metric */
return 0;
}
static int instrumentation_scope(yajl_gen g) {
CHECK(yajl_gen_map_open(g)); /* BEGIN InstrumentationScope */
CHECK(json_add_string(g, "name"));
CHECK(json_add_string(g, PACKAGE_NAME));
CHECK(json_add_string(g, "version"));
CHECK(json_add_string(g, PACKAGE_VERSION));
CHECK(yajl_gen_map_close(g)); /* END InstrumentationScope */
return 0;
}
static int scope_metrics(yajl_gen g, resource_metrics_t const *rm) {
CHECK(yajl_gen_map_open(g)); /* BEGIN ScopeMetrics */
CHECK(json_add_string(g, "scope"));
CHECK(instrumentation_scope(g));
CHECK(json_add_string(g, "metrics"));
CHECK(yajl_gen_array_open(g));
for (size_t i = 0; i < rm->families_num; i++) {
CHECK(metric(g, rm->families[i]));
}
CHECK(yajl_gen_array_close(g));
CHECK(yajl_gen_map_close(g)); /* END ScopeMetrics */
return 0;
}
static int resource(yajl_gen g, label_set_t res) {
CHECK(yajl_gen_map_open(g)); /* BEGIN Resource */
CHECK(json_add_string(g, "attributes"));
CHECK(yajl_gen_array_open(g));
for (size_t i = 0; i < res.num; i++) {
CHECK(key_value(g, res.ptr[i]));
}
CHECK(yajl_gen_array_close(g));
CHECK(yajl_gen_map_close(g)); /* END Resource */
return 0;
}
static int add_resource_metric(yajl_gen g, resource_metrics_t const *rm) {
CHECK(yajl_gen_map_open(g)); /* BEGIN ResourceMetrics */
if (rm->resource.num > 0) {
CHECK(json_add_string(g, "resource"));
CHECK(resource(g, rm->resource));
}
CHECK(json_add_string(g, "scopeMetrics"));
CHECK(yajl_gen_array_open(g));
CHECK(scope_metrics(g, rm));
CHECK(yajl_gen_array_close(g));
CHECK(yajl_gen_map_close(g)); /* END ResourceMetrics */
return 0;
}
int format_json_open_telemetry(strbuf_t *buf,
resource_metrics_set_t const *set) {
if (buf->pos != 0) {
ERROR("format_json_open_telemetry: buffer is not empty.");
return EINVAL;
}
yajl_gen g = yajl_gen_alloc(NULL);
if (g == NULL) {
ERROR("format_json_open_telemetry: yajl_gen_alloc() failed.");
return ENOMEM;
}
#if COLLECT_DEBUG
yajl_gen_config(g, yajl_gen_validate_utf8, 1);
#endif
CHECK(yajl_gen_map_open(g)); /* BEGIN ExportMetricsServiceRequest */
CHECK(json_add_string(g, "resourceMetrics"));
CHECK(yajl_gen_array_open(g));
unsigned char const *out = NULL;
for (size_t i = 0; i < set->num; i++) {
resource_metrics_t const *rm = set->ptr + i;
add_resource_metric(g, rm);
}
CHECK(yajl_gen_array_close(g));
CHECK(yajl_gen_map_close(g)); /* END ExportMetricsServiceRequest */
size_t out_len = 0;
if (yajl_gen_get_buf(g, &out, &out_len) != yajl_gen_status_ok) {
yajl_gen_clear(g);
yajl_gen_free(g);
return -1;
}
if (buf->fixed) {
size_t avail = (buf->size == 0) ? 0 : buf->size - (buf->pos + 1);
if (avail < out_len) {
yajl_gen_clear(g);
yajl_gen_free(g);
return ENOBUFS;
}
}
int status = strbuf_print(buf, (void *)out);
yajl_gen_clear(g);
yajl_gen_free(g);
return status;
}

View File

@@ -26,6 +26,7 @@
#include "collectd.h"
#include "plugin.h"
#include "utils/avltree/avltree.h"
#include "utils/cmds/putmetric.h"
#include "utils/common/common.h"
#include "utils/curl_stats/curl_stats.h"
@@ -71,6 +72,7 @@ struct wh_callback_s {
#define WH_FORMAT_JSON 1
#define WH_FORMAT_KAIROSDB 2
#define WH_FORMAT_INFLUXDB 3
#define WH_FORMAT_OTLP_JSON 5
int format;
bool send_metrics;
bool send_notifications;
@@ -84,6 +86,10 @@ struct wh_callback_s {
pthread_mutex_t send_buffer_lock;
strbuf_t send_buffer;
cdtime_t send_buffer_init_time;
resource_metrics_set_t resource_metrics;
c_avl_tree_t *staged_metrics; // char* metric_identity() -> NULL
c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t*
char response_buffer[WRITE_HTTP_RESPONSE_BUFFER_SIZE];
unsigned int response_buffer_pos;
@@ -142,6 +148,7 @@ static void wh_log_http_error(wh_callback_t *cb) {
static int wh_post(wh_callback_t *cb, char const *data, long size) {
pthread_mutex_lock(&cb->curl_lock);
cb->response_buffer_pos = 0;
curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location);
curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDSIZE, size);
curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data);
@@ -200,11 +207,16 @@ static int wh_callback_init(wh_callback_t *cb) {
curl_easy_setopt(cb->curl, CURLOPT_USERAGENT, COLLECTD_USERAGENT);
cb->headers = curl_slist_append(cb->headers, "Accept: */*");
if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
switch (cb->format) {
case WH_FORMAT_JSON:
case WH_FORMAT_KAIROSDB:
case WH_FORMAT_OTLP_JSON:
cb->headers =
curl_slist_append(cb->headers, "Content-Type: application/json");
else
default:
cb->headers = curl_slist_append(cb->headers, "Content-Type: text/plain");
}
cb->headers = curl_slist_append(cb->headers, "Expect:");
curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, cb->headers);
@@ -263,6 +275,40 @@ static int wh_callback_init(wh_callback_t *cb) {
return 0;
} /* int wh_callback_init */
static int flush_resource_metrics(wh_callback_t *cb) {
/* You must hold cb->send_buffer_lock when calling. */
strbuf_t buf = STRBUF_CREATE;
int status = 0;
switch (cb->format) {
case WH_FORMAT_OTLP_JSON:
status = format_json_open_telemetry(&buf, &cb->resource_metrics);
if (status != 0) {
ERROR("write_http plugin: format_json_open_telemetry failed: %s",
STRERROR(status));
}
break;
default:
ERROR("write_http plugin: Unexpected format: %d", cb->format);
status = EINVAL;
}
if (status != 0) {
pthread_mutex_unlock(&cb->send_buffer_lock);
STRBUF_DESTROY(buf);
return status;
}
resource_metrics_reset(&cb->resource_metrics);
cb->send_buffer_init_time = cdtime();
pthread_mutex_unlock(&cb->send_buffer_lock);
status = wh_post(cb, buf.ptr, buf.pos);
STRBUF_DESTROY(buf);
return status;
}
static int wh_flush(cdtime_t timeout,
const char *identifier __attribute__((unused)),
user_data_t *user_data) {
@@ -287,6 +333,11 @@ static int wh_flush(cdtime_t timeout,
}
}
if (cb->format == WH_FORMAT_OTLP_JSON) {
/* cb->send_buffer_lock is unlocked in flush_resource_metrics. */
return flush_resource_metrics(cb);
}
if (cb->send_buffer.pos == 0) {
cb->send_buffer_init_time = cdtime();
pthread_mutex_unlock(&cb->send_buffer_lock);
@@ -437,6 +488,20 @@ static int wh_write_influxdb(metric_family_t const *fam, wh_callback_t *cb) {
return 0;
} /* wh_write_influxdb */
static int wh_write_resource_metrics(metric_family_t const *fam,
wh_callback_t *cb) {
pthread_mutex_lock(&cb->send_buffer_lock);
int status = resource_metrics_add(&cb->resource_metrics, fam);
pthread_mutex_unlock(&cb->send_buffer_lock);
if (status < 0) {
ERROR("write_http plugin: resource_metrics_add failed: %s",
STRERROR(status));
return status;
}
return 0;
}
static int wh_write(metric_family_t const *fam, user_data_t *user_data) {
if ((fam == NULL) || (user_data == NULL)) {
return EINVAL;
@@ -457,6 +522,9 @@ static int wh_write(metric_family_t const *fam, user_data_t *user_data) {
case WH_FORMAT_INFLUXDB:
status = wh_write_influxdb(fam, cb);
break;
case WH_FORMAT_OTLP_JSON:
status = wh_write_resource_metrics(fam, cb);
break;
default:
status = wh_write_command(fam, cb);
break;
@@ -513,6 +581,8 @@ static int config_set_format(wh_callback_t *cb, oconfig_item_t *ci) {
cb->format = WH_FORMAT_KAIROSDB;
else if (strcasecmp("INFLUXDB", string) == 0)
cb->format = WH_FORMAT_INFLUXDB;
else if (strcasecmp("OTLP_JSON", string) == 0)
cb->format = WH_FORMAT_OTLP_JSON;
else {
ERROR("write_http plugin: Invalid format string: %s", string);
return -1;