1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::{sync::Arc, time::Instant};

use crate::{futures::Future, rpc, rpc_apis};

use parking_lot::Mutex;

use hyper::{service::service_fn_ok, Body, Method, Request, Response, Server, StatusCode};

use stats::{
    prometheus::{self, Encoder},
    PrometheusMetrics, PrometheusRegistry,
};

#[derive(Debug, Clone, PartialEq)]
pub struct MetricsConfiguration {
    /// Are metrics enabled (default is false)?
    pub enabled: bool,
    /// Prefix
    pub prefix: String,
    /// The IP of the network interface used (default is 127.0.0.1).
    pub interface: String,
    /// The network port (default is 3000).
    pub port: u16,
}

impl Default for MetricsConfiguration {
    fn default() -> Self {
        MetricsConfiguration {
            enabled: false,
            prefix: "".into(),
            interface: "127.0.0.1".into(),
            port: 3000,
        }
    }
}

struct State {
    rpc_apis: Arc<rpc_apis::FullDependencies>,
}

fn handle_request(
    req: Request<Body>,
    conf: Arc<MetricsConfiguration>,
    state: Arc<Mutex<State>>,
) -> Response<Body> {
    let (parts, _body) = req.into_parts();
    match (parts.method, parts.uri.path()) {
        (Method::GET, "/metrics") => {
            let start = Instant::now();

            let mut reg = PrometheusRegistry::new(conf.prefix.clone());
            let state = state.lock();
            state.rpc_apis.client.prometheus_metrics(&mut reg);
            state.rpc_apis.sync.prometheus_metrics(&mut reg);
            let elapsed = start.elapsed();
            reg.register_gauge(
                "metrics_time",
                "Time to perform rpc metrics",
                elapsed.as_millis() as i64,
            );

            let mut buffer = vec![];
            let encoder = prometheus::TextEncoder::new();
            let metric_families = reg.registry().gather();

            encoder
                .encode(&metric_families, &mut buffer)
                .expect("all source of metrics are static; qed");
            let text = String::from_utf8(buffer).expect("metrics encoding is ASCII; qed");

            Response::new(Body::from(text))
        }
        (_, _) => {
            let mut res = Response::new(Body::from("not found"));
            *res.status_mut() = StatusCode::NOT_FOUND;
            res
        }
    }
}

/// Start the prometheus metrics server accessible via GET <host>:<port>/metrics
pub fn start_prometheus_metrics(
    conf: &MetricsConfiguration,
    deps: &rpc::Dependencies<rpc_apis::FullDependencies>,
) -> Result<(), String> {
    if !conf.enabled {
        return Ok(());
    }

    let addr = format!("{}:{}", conf.interface, conf.port);
    let addr = addr
        .parse()
        .map_err(|err| format!("Failed to parse address '{}': {}", addr, err))?;

    let state = State {
        rpc_apis: deps.apis.clone(),
    };
    let state = Arc::new(Mutex::new(state));
    let conf = Arc::new(conf.to_owned());
    let server = Server::bind(&addr)
        .serve(move || {
            // This is the `Service` that will handle the connection.
            // `service_fn_ok` is a helper to convert a function that
            // returns a Response into a `Service`.
            let state = state.clone();
            let conf = conf.clone();
            service_fn_ok(move |req: Request<Body>| {
                handle_request(req, conf.clone(), state.clone())
            })
        })
        .map_err(|e| eprintln!("server error: {}", e));
    info!("Started prometeus metrics at http://{}/metrics", addr);

    deps.executor.spawn(server);

    Ok(())
}