Skip to content

Control Plane Protocol

The Control Plane protocol is implemented on top of gRPC. Samplers and Clients establish bidirectional streaming RPC connections with the Server.

There are three types of messages:

  • Requests: Can be sent at any time by any party. The sender waits for a response.
    • Registration or configuration requests.
  • Responses: Messages generated as replies to requests.
  • Messages: Can be sent at any time by any party. The receiver does not reply with a response.
    • Stats updates or error notifications.

Each Sampler and Client has an exclusive bidirectional stream, and within that stream, requests are processed sequentially. This means that responses are sent following the order in which requests were received. Messages can be interleaved with requests and responses.

In appendix A, you can find the Protobuf definitions used by the protocol implementation.

Registration

graph LR;
    Sampler--Registration Request-->Server
    Server--Registration Response-->Sampler

    Client--Registration Request-->Server
    Server--Registration Response-->Client

After establishing a connection with the Server, Samplers and Clients start the registration process, which consists of:

  1. Samplers and Clients send a registration request containing an id that identifies the current session.
  2. Server replies with a registration response acknowledging their registration.

The registration request is the first message that the server expects after a new connection is established. Other messages are ignored until the registration process completes.

Initial configuration

During registration, the Sampler sends the Server its initial configuration, if any. The Server may decide to use it if no other configuration is defined; in that case, the configuration is sent back to the Sampler, confirming that it should configure itself using its initial configuration.

Reconnections

The connection initiator automatically attempts to reestablish the connection by creating a new stream if it is terminated for any reason, including network disconnects and any type of unhandled error. After reconnecting, the registration process must start again.

Given that the registration request includes a session id, the Server is capable of resuming the session. For example, it may determine that there is no need to send the configuration to the Sampler again after a reconnection.

Appendix A: Protobuf definitions

syntax = "proto3";
option go_package = "github.com/neblic/platform/controlplane/protos";

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

/** common **/

// Result of processing a request. Every response message (*Res) visible in
// this file carries a Status.
message Status {
  // Outcome category.
  enum Type {
    // Zero value, i.e. what a receiver reads when `type` was never set.
    UNKNOWN = 0;
    OK = 1;
    BAD_REQUEST = 2;
  }
  // Outcome category of the request.
  Type type = 1;
  // Human-readable error description.
  // NOTE(review): presumably only populated when `type` is not OK - confirm.
  string error_message = 2;
}

// Parameters of the deterministic sampling strategy, the only member of the
// oneof declared in the Sampling message.
message DeterministicSampling {
  // The minimum valid sample_rate is 1, which means all samples.
  int32 sample_rate = 1;
  // NOTE(review): presumably decides whether samples with an empty
  // determinant are sampled - confirm against the sampler implementation.
  bool sample_empty_determinant = 2;
}

// Sampling strategy selector. At most one member of the oneof can be set;
// deterministic sampling is the only strategy defined at the moment.
message Sampling {
  oneof Sampling { DeterministicSampling deterministic_sampling = 1; }
}

// Rate limit expressed as a number of samples per second.
//
// NOTE(review): the field comment below talks about exported samples, but this
// message is also the type of SamplerConfig.limiter_in, which is documented as
// bounding the samples processed by the sampler.
message Limiter {
  // limit determines the maximum number of exported samples per second.
  // -1 means no exported samples limit
  // 0 means no samples will be exported
  int32 limit = 1;
}

// An expression together with the language it is written in. Stream and Event
// each embed one Rule.
message Rule {
  // Expression languages.
  enum Language {
    // Zero value; the language was not specified.
    UNKNOWN = 0;
    // NOTE(review): presumably Common Expression Language - confirm.
    CEL = 1;
  }
  // Language in which `expression` is written.
  Language language = 1;
  // Source text of the expression.
  string expression = 2;
}

// Definition of a sampler stream. SamplerConfig.streams holds a list of these.
//
// NOTE(review): comments starting with "Presumably" are inferred from field
// names only - confirm against the sampler implementation.
message Stream {
  // Settings for keyed streams.
  message Keyed {
    // Whether the keyed behavior is enabled for the stream.
    bool enabled = 1;
    // Presumably how long a key is retained.
    google.protobuf.Duration ttl = 2;
    // Presumably the maximum number of distinct keys tracked.
    int32 max_keys = 3;
  }

  // Identifier of the stream.
  string uid = 1;
  // Name of the stream.
  string name = 2;
  // Presumably the rule a sample has to satisfy to belong to the stream
  // (SamplerConfig.sampling_in mentions "evaluating the stream rules").
  Rule rule = 3;
  // Presumably whether raw samples of this stream are exported.
  bool export_raw_samples = 4;
  // Keyed-stream settings.
  Keyed keyed = 5;
  // NOTE(review): unit (bytes?) and behavior for oversized samples are not
  // stated here - confirm.
  int32 max_sample_size = 6;
}

// Definition of a digest. SamplerConfig.digests holds a list of these.
//
// NOTE(review): comments starting with "Presumably" are inferred from field
// names only - confirm against the implementation.
message Digest {
  // Settings of the `st` digest type.
  // NOTE(review): "St" presumably stands for "struct" (compare
  // DigestCapabilities.STRUCT and SampleType.STRUCT_DIGEST) - confirm.
  message St { int32 max_processed_fields = 1; }

  // Settings of the `value` digest type.
  message Value {
    // Maximum number of fields to process when processing a sample
    int32 max_processed_fields = 1;
  }

  // Identifier of the digest.
  string uid = 1;
  // Name of the digest.
  string name = 2;
  // Presumably the uid of the Stream whose samples are digested.
  string stream_uid = 3;
  // Presumably the interval at which the computed digest is flushed.
  google.protobuf.Duration flush_period = 4;
  // NOTE(review): the unit (samples, bytes, ...) is not stated here - confirm.
  int32 buffer_size = 5;

  // Places where a digest can be computed.
  enum Location {
    // Zero value; the location was not specified.
    UNKNOWN = 0;
    SAMPLER = 1;
    COLLECTOR = 2;
  }
  // Component in charge of computing this digest.
  Location computation_location = 6;

  // Digest type with its type-specific settings. At most one member is set.
  oneof Type {
    St st = 7;
    Value value = 8;
  }
}

// Kind of sample. Referenced by Event.sample_type.
enum SampleType {
  // Zero value; the sample type was not specified.
  UNKNOWN = 0;
  RAW = 1;
  STRUCT_DIGEST = 2;
  EVENT = 3;
}

// Definition of an event. SamplerConfig.events holds a list of these.
//
// NOTE(review): comments starting with "Presumably" are inferred from field
// names only - confirm against the implementation.
message Event {
  // Identifier of the event.
  string uid = 1;
  // Name of the event.
  string name = 2;
  // Presumably the uid of the Stream the event is evaluated on.
  string stream_uid = 3;
  // Presumably the kind of sample the rule is evaluated against.
  SampleType sample_type = 4;
  // Presumably the condition that triggers the event.
  Rule rule = 5;
  // Presumably bounds how many events are generated per second.
  Limiter limiter = 6;
  // NOTE(review): the template syntax is not described here - confirm.
  string export_template = 7;
}

// Used to get and update the sampler configuration.
//
// When sent by the server to update a sampler, only the fields that are present
// are updated. If a field is present, the previous value is replaced with the
// new one.
//
// When adding a new configuration option that can be configured from the
// client, the process is as follows:
// * Add the new option to SamplerConfig
// * Update the platform/controlplane/data package to include the new option
//   in the internal controlplane structs
// * Make sure the new option can be set with the ClientSamplerConfigUpdate
//   message
// * Update the method Client.UpdateSamplerConfig in the package
//   platform/controlplane/server/internal/registry so the new option gets set
//   in the server registry
// * Update neblictl if the option can be configured using the CLI
message SamplerConfig {
  // Configures the sampler streams.
  repeated Stream streams = 1;
  // Sets an upper bound to the amount of samples that will be processed by the
  // sampler. It protects the application from an excessive CPU usage due to the
  // sampler processing a large amount of samples.
  Limiter limiter_in = 2;
  // Defines the sampling strategy to apply when a sample is received and before
  // processing it in any way (e.g. before determining if a sample belongs to a
  // stream which would require parsing it and evaluating the stream rules).
  // Sampling is performed after limiter_in has been applied.
  Sampling sampling_in = 3;
  // Sets an upper bound to the amount of samples that will be exported by the
  // sampler. It protects the application from overloading the network due to
  // the sampler exporting a large amount of samples.
  Limiter limiter_out = 4;
  // Configures the sampler digests.
  repeated Digest digests = 5;
  // Configures the sampler events.
  repeated Event events = 6;
}

// Sampling counters of a sampler. Embedded in SamplerStatsMsg,
// ClientSamplerStats and Sampler.
//
// NOTE(review): whether the counters are cumulative or per-interval is not
// stated here - confirm.
message SamplerSamplingStats {
  // Number of samples evaluated.
  uint64 samples_evaluated = 1;
  // Number of samples exported.
  uint64 samples_exported = 2;
  // Number of samples digested.
  uint64 samples_digested = 3;
}

// Schema information of the samples handled by a Sampler.
message Schema {
  // Kind of schema.
  // NOTE(review): the zero value is SCHEMALESS, so a Schema whose `type` was
  // never set is indistinguishable from one explicitly marked as schemaless.
  enum Type {
    SCHEMALESS = 0;
    NATIVE = 1;
    PROTOBUF = 2;
  }
  // Kind of schema carried in `schema`.
  Type type = 1;
  // Schema payload.
  // NOTE(review): the concrete message packed here for each `type` is not
  // visible in this file - confirm.
  google.protobuf.Any schema = 2;
}

// Full description of a Sampler. Returned to clients in
// ClientListSamplersRes.samplers.
//
// Field numbers are not in declaration order (7 and 8 are declared before 4);
// they are part of the wire format and must not be renumbered.
message Sampler {
  // User-defined tag attached to a Sampler.
  message Tag {
    // Name of the tag.
    string name = 1;
    // Attributes of the tag, as key/value pairs.
    map<string, string> attrs = 2;
  }

  // Statistics related to the sampler in the collector context.
  message CollectorStats { uint64 samples_collected = 1; }

  // Identifies a single Sampler
  string uid = 1;
  // User-friendly and user-defined name to identify a Sampler.
  string name = 2;
  // Defines the resource where the Sampler has been defined.
  // Multiple samplers can have the same name and resource but will have
  // different uids. e.g. multiple replicas of the same service will define
  // multiple times the same sampler with different uids.
  string resource = 3;
  // User-defined tags to identify a Sampler.
  // There are known tags that can be used to identify the Sampler type
  // and may be used by the platform to provide additional functionality.
  repeated Tag tags = 7;
  // Sampler capabilities defines what features can be performed by the
  // sampler
  Capabilities capabilities = 8;
  // Sampler schema information.
  Schema schema = 4;
  // The Sampler configuration.
  SamplerConfig config = 5;
  // Statistics related to the Sampler sampling.
  SamplerSamplingStats sampling_stats = 6;
  // Statistics related to the sampler in the collector context
  CollectorStats collector_stats = 9;
}

/** service **/

// Envelope for everything a Sampler sends to the Server over the
// ControlPlane.SamplerConn stream.
message SamplerToServer {
  // Timestamp attached to the envelope.
  // NOTE(review): presumably the send time - confirm.
  google.protobuf.Timestamp timestamp = 1;
  // Name of the Sampler (see Sampler.name).
  string name = 2;
  // Resource of the Sampler (see Sampler.resource).
  // NOTE(review): the field name is misspelled ("resouce"). Renaming it is
  // wire-compatible but changes generated accessors and the JSON key, so it
  // has to be coordinated with every user of the generated code.
  string resouce = 3;
  // Uid of the Sampler (see Sampler.uid).
  string sampler_uid = 4;
  // Payload of the envelope. At most one member is set.
  oneof Message {
    // Messages
    SamplerStatsMsg sampler_stats_msg = 5;
    // Requests
    SamplerRegisterReq register_req = 6;
    // Responses
    ServerSamplerConfRes conf_res = 7;
  }
}

// Envelope for everything the Server sends to a Sampler over the
// ControlPlane.SamplerConn stream.
message ServerToSampler {
  // Timestamp attached to the envelope.
  // NOTE(review): presumably the send time - confirm.
  google.protobuf.Timestamp timestamp = 1;
  // Identifier of the Server.
  string server_uid = 2;
  // Payload of the envelope. At most one member is set.
  oneof Message {
    // Responses
    SamplerRegisterRes register_res = 3;
    // Requests
    ServerSamplerConfReq conf_req = 4;
  }
}

// Envelope for everything a Client sends to the Server over the
// ControlPlane.ClientConn stream. All payloads are requests.
message ClientToServer {
  // Timestamp attached to the envelope.
  // NOTE(review): presumably the send time - confirm.
  google.protobuf.Timestamp timestamp = 1;
  // Identifier of the Client.
  string client_uid = 2;
  // Payload of the envelope. At most one member is set.
  oneof Message {
    // Requests
    ClientRegisterReq register_req = 3;
    ClientListSamplersReq list_samplers_req = 4;
    ClientSamplerConfReq sampler_conf_req = 5;
  }
}

// Envelope for everything the Server sends to a Client over the
// ControlPlane.ClientConn stream.
message ServerToClient {
  // Timestamp attached to the envelope.
  // NOTE(review): presumably the send time - confirm.
  google.protobuf.Timestamp timestamp = 1;
  // Identifier of the Server.
  string server_uid = 2;
  // Payload of the envelope. At most one member is set.
  oneof Message {
    // Messages
    ClientSamplerStatsMsg sampler_stats_msg = 3;
    // Responses
    ClientRegisterRes register_res = 4;
    ClientListSamplersRes list_samplers_res = 5;
    ClientSamplerConfRes sampler_conf_res = 6;
  }
}

// Control plane service. Both RPCs are bidirectional streams: each Sampler and
// each Client keeps its own stream open with the Server and exchanges
// envelope messages over it.
service ControlPlane {
  // Stream between a Sampler and the Server.
  rpc SamplerConn(stream SamplerToServer) returns (stream ServerToSampler);
  // Stream between a Client and the Server.
  rpc ClientConn(stream ClientToServer) returns (stream ServerToClient);
}

/** sampler messages **/

// stats

// Stats update sent by a Sampler to the Server. It is a message, not a
// request, so no response is expected.
//
// NOTE(review): field number 1 is unused. If it belonged to a removed field it
// should be declared `reserved` so it is never reused.
message SamplerStatsMsg { SamplerSamplingStats sampling_stats = 2; }

// register

// Registration request sent by a Sampler. It is the first message the Server
// expects on a new SamplerConn stream.
message SamplerRegisterReq {
  // Initial configuration proposed by the Sampler, if any. The Server may
  // decide to use it when no other configuration is defined.
  ClientSamplerConfigUpdate initial_config = 1;
  // Tags of the Sampler (see Sampler.tags).
  repeated Sampler.Tag tags = 2;
  // Features supported by the Sampler (see Sampler.capabilities).
  Capabilities capabilities = 3;
}

// Response to SamplerRegisterReq; `status` reports the outcome of the
// registration.
message SamplerRegisterRes { Status status = 1; }

// configure sampling rules

// Configuration request sent by the Server to a Sampler. The Sampler replies
// with ServerSamplerConfRes.
message ServerSamplerConfReq {
  // The Sampler configuration. Only the fields that are present are updated
  // (see SamplerConfig).
  SamplerConfig sampler_config = 1;
}

// Response sent by a Sampler to ServerSamplerConfReq; `status` reports the
// outcome of the configuration request.
message ServerSamplerConfRes { Status status = 1; }

/** client messages **/

// stats

// Sampling stats of one Sampler, as delivered to Clients.
message ClientSamplerStats {
  // Uid of the Sampler the stats belong to.
  string sampler_uid = 1;
  // Sampling counters of that Sampler.
  SamplerSamplingStats sampling_stats = 2;
}

// Stats update sent by the Server to a Client, with one entry per Sampler.
// It is a message, not a request, so no response is expected.
message ClientSamplerStatsMsg { repeated ClientSamplerStats sampler_stats = 1; }

// register

// Stream-related capabilities of a Sampler; `enabled` tells whether the
// feature is supported.
message StreamCapabilities { bool enabled = 1; }

// Limiter-related capabilities of a Sampler; `enabled` tells whether the
// feature is supported. Used for both limiter_in and limiter_out.
message LimiterCapabilities { bool enabled = 1; }

// Sampling-related capabilities of a Sampler.
message SamplingCapabilities {
  // Sampling strategies.
  enum Type {
    // Zero value; the strategy was not specified.
    UNKNOWN = 0;
    DETERMINISTIC = 1;
  }

  // Whether sampling is supported.
  bool enabled = 1;
  // Supported sampling strategies.
  repeated Type types = 2;
}

// Digest-related capabilities of a Sampler.
message DigestCapabilities {
  // Digest types.
  enum Type {
    // Zero value; the digest type was not specified.
    UNKNOWN = 0;
    STRUCT = 1;
    VALUE = 2;
  }

  // Whether digests are supported.
  bool enabled = 1;
  // Supported digest types.
  repeated Type types = 2;
}

// Features that a Sampler is able to perform. Sent by the Sampler in
// SamplerRegisterReq and exposed in Sampler.capabilities.
//
// Field numbers are not in declaration order (`digest` is 2); they are part of
// the wire format and must not be renumbered.
message Capabilities {
  // Stream capabilities.
  StreamCapabilities stream = 1;
  // Capabilities of the input limiter (SamplerConfig.limiter_in).
  LimiterCapabilities limiter_in = 3;
  // Capabilities of the input sampling (SamplerConfig.sampling_in).
  SamplingCapabilities sampling_in = 4;
  // Capabilities of the output limiter (SamplerConfig.limiter_out).
  LimiterCapabilities limiter_out = 5;
  // Digest capabilities.
  DigestCapabilities digest = 2;
}

// Registration request sent by a Client. It is the first message the Server
// expects on a new ClientConn stream. `tags` holds key/value tags of the
// Client.
message ClientRegisterReq { map<string, string> tags = 1; }

// Response to ClientRegisterReq; `status` reports the outcome of the
// registration.
message ClientRegisterRes { Status status = 1; }

// list samplers

// Request sent by a Client to list samplers. It has no parameters; the Server
// replies with ClientListSamplersRes.
message ClientListSamplersReq {}

// Response to ClientListSamplersReq.
message ClientListSamplersRes {
  // Outcome of the request.
  Status status = 1;
  // Samplers returned by the Server.
  // NOTE(review): presumably all samplers known to the Server - confirm.
  repeated Sampler samplers = 2;
}

// configure sampling list

// A single change to the list of streams of a sampler configuration.
message ClientStreamUpdate {
  // Operation to apply.
  enum Op {
    // Zero value; the operation was not specified.
    UNKNOWN = 0;
    // Insert the stream, or update it if it already exists.
    UPSERT = 1;
    // Remove the stream.
    DELETE = 2;
  }

  // Operation to apply to `stream`.
  Op op = 1;
  // Stream the operation applies to.
  // NOTE(review): presumably matched against existing streams by uid, and
  // only the uid matters for DELETE - confirm.
  Stream stream = 2;
}

// A single change to the list of digests of a sampler configuration.
message ClientDigestUpdate {
  // Operation to apply.
  enum Op {
    // Zero value; the operation was not specified.
    UNKNOWN = 0;
    // Insert the digest, or update it if it already exists.
    UPSERT = 1;
    // Remove the digest.
    DELETE = 2;
  }

  // Operation to apply to `digest`.
  Op op = 1;
  // Digest the operation applies to.
  // NOTE(review): presumably matched against existing digests by uid, and
  // only the uid matters for DELETE - confirm.
  Digest digest = 2;
}

// A single change to the list of events of a sampler configuration.
message ClientEventUpdate {
  // Operation to apply.
  enum Op {
    // Zero value; the operation was not specified.
    UNKNOWN = 0;
    // Insert the event, or update it if it already exists.
    UPSERT = 1;
    // Remove the event.
    DELETE = 2;
  }

  // Operation to apply to `event`.
  Op op = 1;
  // Event the operation applies to.
  // NOTE(review): presumably matched against existing events by uid, and
  // only the uid matters for DELETE - confirm.
  Event event = 2;
}

// Set of changes to apply to a sampler configuration. Sent by clients in
// ClientSamplerConfReq and by samplers as SamplerRegisterReq.initial_config.
message ClientSamplerConfigUpdate {
  // If a field is set to true, it means that the field is reset to its default.
  // If a configuration option is reset and set in the same request, it will
  // first be reset and then set to its new value.
  message Reset {
    bool streams = 1;
    bool limiter_in = 2;
    bool sampling_in = 3;
    bool limiter_out = 4;
    bool digests = 5;
    bool events = 6;
  }
  // Configuration options to reset before the updates below are applied.
  Reset reset = 1;

  // All fields are optional. If a field is nil, it means that the field will
  // not be updated.
  repeated ClientStreamUpdate stream_updates = 2;
  // New value for SamplerConfig.limiter_in.
  Limiter limiter_in = 3;
  // New value for SamplerConfig.sampling_in.
  Sampling sampling_in = 4;
  // New value for SamplerConfig.limiter_out.
  Limiter limiter_out = 5;
  // Changes to apply to the list of digests.
  repeated ClientDigestUpdate digest_updates = 6;
  // Changes to apply to the list of events.
  repeated ClientEventUpdate event_updates = 7;
}

// Request sent by a Client to change the configuration of the samplers that
// match a name and resource. The Server replies with ClientSamplerConfRes.
message ClientSamplerConfReq {
  // Name of the Samplers to configure. All samplers with the provided name will
  // receive this configuration. It is a mandatory field.
  string sampler_name = 1;
  // Name of the resource where the sampler is defined
  string sampler_resource = 2;
  // Update configuration options. If not set, resets all configuration options
  ClientSamplerConfigUpdate sampler_config_update = 3;
}

// Response to ClientSamplerConfReq; `status` reports the outcome of the
// configuration request.
message ClientSamplerConfRes { Status status = 1; }