awss3receiver

v0.155.0
Published: Jun 23, 2026 License: Apache-2.0 Imports: 31 Imported by: 3

README

AWS S3 Receiver

Status
Stability: alpha (traces, metrics, logs)
Distributions: contrib
Code Owners: @atoulme, @adcharre

Overview

Receiver for retrieving telemetry data (traces, metrics, and logs) previously stored in S3. This receiver is commonly used with the AWS S3 Exporter but can process any S3 objects containing telemetry data in supported formats.

The receiver supports:

  • Traces, Metrics, and Logs - All three OpenTelemetry signal types
  • Multiple formats - OTLP (JSON and Protocol Buffers), plus custom formats via encoding extensions
  • Two retrieval modes:
    • Time-based: Fetch data from a specific time range using S3 key partitioning
    • Event-driven: Process new objects as they arrive via SQS notifications

Configuration

The following receiver configuration parameters are supported.

| Name | Description | Default | Required |
| --- | --- | --- | --- |
| starttime | The time at which to start retrieving data. | | Required if fetching by time |
| endtime | The time at which to stop retrieving data. | | Required if fetching by time |
| s3downloader: | | | |
| region | AWS region. | "us-east-1" | Optional |
| s3_bucket | S3 bucket. | | Required |
| s3_prefix | Prefix for the S3 key (root directory inside the bucket). | | Required |
| s3_partition_format | Format for the partition key. See strftime for the format specification. | "year=%Y/month=%m/day=%d/hour=%H/minute=%M" | Optional |
| s3_partition_timezone | IANA timezone name applied when formatting the partition key. | Local time | Optional |
| file_prefix | File prefix defined by the user. | | Optional |
| file_prefix_include_telemetry_type | Whether to append <telemetry_type>_ to the file prefix when building S3 keys. | true | Optional |
| endpoint | Overrides the endpoint used by the receiver instead of constructing it from region and s3_bucket. | | Optional |
| endpoint_partition_id | Partition ID to use if endpoint is specified. | "aws" | Optional |
| s3_force_path_style | Set to true to force the request to use path-style addressing. | false | Optional |
| tag_object_after_ingestion | If enabled, the receiver will attempt to tag the object after successfully ingesting it. | false | Optional |
| skip_ingesting_tagged_objects | If enabled, the receiver will skip objects tagged by tag_object_after_ingestion. This can be used as a checkpointing mechanism and requires an additional s3:GetObjectTagging permission. | false | Optional |
| sqs: | | | |
| queue_url | The URL of the SQS queue that receives S3 bucket notifications. | | Required if fetching by SQS notification |
| region | AWS region of the SQS queue. | | Required if fetching by SQS notification |
| endpoint | Custom endpoint for the SQS service. | | Optional |
| max_number_of_messages | Maximum number of messages to retrieve in a single SQS request. | 10 | Optional |
| wait_time_seconds | Wait time in seconds for long polling SQS requests. | 20 | Optional |
| encodings: | An array of entries with the following properties: | | Optional |
| extension | Extension to use for decoding a key with a matching suffix. | | Required |
| suffix | Key suffix to match against. | | Required |
| notifications: | | | |
| opampextension | Name of the OpAMP Extension to use to send ingest progress notifications. | | |

There are two modes of operation:

  1. Time Range Mode - Specify starttime and endtime to fetch data from a specific time range.
  2. SQS Message Mode - Subscribe to SQS messages to process new objects as they arrive.

SQS Message Configuration

The receiver can subscribe to an SQS queue that receives S3 event notifications:

sqs:
  # Required: The URL of the SQS queue that receives S3 bucket notifications
  queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
  # Required: The AWS region of the SQS queue
  region: "us-east-1"

Note: You must configure your S3 bucket to send event notifications to the SQS queue. Time-based configuration (starttime/endtime) and SQS configuration cannot be used together.
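
For orientation, the sketch below decodes the standard S3 event notification JSON that arrives as an SQS message body and extracts the bucket and object key. It assumes the bucket notifies the queue directly (no SNS envelope), the helper name objectsFromMessage is hypothetical, and it is not taken from the receiver's source.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// s3Event mirrors only the fields of an S3 event notification that are
// needed to locate the object.
type s3Event struct {
	Records []struct {
		EventName string `json:"eventName"`
		S3        struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// objectsFromMessage is a hypothetical helper returning "bucket/key" for
// every record in the message body.
func objectsFromMessage(body string) ([]string, error) {
	var ev s3Event
	if err := json.Unmarshal([]byte(body), &ev); err != nil {
		return nil, err
	}
	var out []string
	for _, r := range ev.Records {
		// Object keys are URL-encoded in S3 event notifications.
		key, err := url.QueryUnescape(r.S3.Object.Key)
		if err != nil {
			return nil, err
		}
		out = append(out, r.S3.Bucket.Name+"/"+key)
	}
	return out, nil
}

func main() {
	body := `{"Records":[{"eventName":"ObjectCreated:Put","s3":{"bucket":{"name":"mybucket"},"object":{"key":"mytrace/year%3D2024/traces_1.json"}}}]}`
	objs, err := objectsFromMessage(body)
	fmt.Println(objs, err)
}
```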

Time format for starttime and endtime

The starttime and endtime fields specify the time range for which to retrieve data. The time format is RFC3339, YYYY-MM-DD HH:MM, or simply YYYY-MM-DD, in which case the time is assumed to be 00:00.
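
The three accepted forms map directly onto Go time layouts. The following sketch tries each layout in turn; parseTime is a hypothetical helper, and treating values without an offset as UTC is an assumption here, since this page does not say which zone the receiver applies.

```go
package main

import (
	"fmt"
	"time"
)

// layouts lists the three documented forms, most specific first.
var layouts = []string{
	time.RFC3339,       // e.g. 2024-01-01T01:00:00Z
	"2006-01-02 15:04", // YYYY-MM-DD HH:MM
	"2006-01-02",       // YYYY-MM-DD, time defaults to 00:00
}

// parseTime is a hypothetical helper that returns the first successful parse.
// time.Parse interprets layouts without a zone as UTC (an assumption for
// this sketch, not a documented property of the receiver).
func parseTime(s string) (time.Time, error) {
	for _, layout := range layouts {
		if t, err := time.Parse(layout, s); err == nil {
			return t, nil
		}
	}
	return time.Time{}, fmt.Errorf("unrecognized time %q", s)
}

func main() {
	for _, s := range []string{"2024-01-01T01:00:00Z", "2024-01-01 01:00", "2024-01-02"} {
		t, err := parseTime(s)
		fmt.Println(t, err)
	}
}
```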

Encodings

By default, the receiver understands the following encodings:

  • otlp_json (OpenTelemetry Protocol format represented as JSON) with a suffix of .json
  • otlp_proto (OpenTelemetry Protocol format represented as Protocol Buffers) with a suffix of .binpb

The encodings option allows you to specify Encoding Extensions to use to decode keys with matching suffixes.
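
Suffix-based selection can be pictured with the small Go sketch below. The type encoding and the function encodingFor are hypothetical, and checking configured suffixes before the built-in ones is an assumption of this sketch rather than documented receiver behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// encoding is a hypothetical stand-in for one entry of the encodings option.
type encoding struct {
	Suffix    string
	Extension string
}

// encodingFor returns the name of the decoder to use for an object key.
// Configured entries are checked first (an assumption), then the two
// built-in suffixes listed above.
func encodingFor(key string, custom []encoding) (string, bool) {
	for _, e := range custom {
		if strings.HasSuffix(key, e.Suffix) {
			return e.Extension, true
		}
	}
	switch {
	case strings.HasSuffix(key, ".json"):
		return "otlp_json", true
	case strings.HasSuffix(key, ".binpb"):
		return "otlp_proto", true
	}
	return "", false
}

func main() {
	custom := []encoding{{Suffix: ".txt", Extension: "text_encoding"}}
	for _, key := range []string{"logs_1.txt", "traces_1.json", "metrics_1.binpb", "notes.csv"} {
		name, ok := encodingFor(key, custom)
		fmt.Println(key, name, ok)
	}
}
```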

Example Configuration

extensions:
  # example of text encoding extension
  text_encoding:
    encoding: utf8
    marshaling_separator: "\n"
    unmarshaling_separator: "\r?\n"

receivers:
  awss3/traces:
    starttime: "2024-01-01 01:00"
    endtime: "2024-01-02"
    s3downloader:
      region: "us-west-1"
      s3_bucket: "mybucket"
      s3_prefix: "trace"
      s3_partition_format: "year=%Y/month=%m/day=%d/hour=%H/minute=%M"
      s3_partition_timezone: "UTC"
      file_prefix_include_telemetry_type: true
    encodings:
      - extension: text_encoding
        suffix: ".txt"

  awss3/sqs_traces:
    s3downloader:
      region: us-east-1
      s3_bucket: mybucket
      s3_prefix: mytrace
    sqs:
      queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
      region: "us-east-1"

exporters:
  otlp_grpc:
    endpoint: otelcol:4317

service:
  extensions: [text_encoding]
  pipelines:
    traces:
      receivers: [awss3/traces]
      exporters: [otlp_grpc]

    traces/sqs:
      receivers: [awss3/sqs_traces]
      exporters: [otlp_grpc]

Notifications

The receiver can send notifications of ingest progress to an OpAMP server using the custom message capability "org.opentelemetry.collector.receiver.awss3" and message type "TimeBasedIngestStatus". Each notification is a Protobuf-encoded OTLP logs message with a single log record. The body of the record is set to status, and the record's timestamp holds the ingest time. The record also has the following attributes:

| Attribute | Description |
| --- | --- |
| telemetry_type | The type of telemetry being ingested. One of "traces", "metrics", or "logs". |
| ingest_status | The status of the data ingestion. One of "ingesting", "failed", or "completed". |
| start_time | The time to start retrieving data as an Int64, nanoseconds since Unix epoch. |
| end_time | The time to stop retrieving data as an Int64, nanoseconds since Unix epoch. |
| failure_message | Error message if ingest_status is "failed". |

The "ingesting" status is sent at the beginning of the ingest process, before data has been retrieved for the specified time. If an error occurs while the data is being processed, a status message is sent with ingest_status set to "failed" and the timestamp set to the time of the data being ingested when the failure occurred. If the ingest process completes successfully, a status message with ingest_status set to "completed" is sent.

Object Lifecycle Management

If tag_object_after_ingestion is enabled, the receiver makes a best-effort attempt to tag objects with otel-collector:status = ingested after they are processed by the pipeline. This requires an additional s3:PutObjectTagging permission. The tag can then be used with a lifecycle policy to expire ingested objects or transition them to cheaper storage classes.

Documentation

Overview

Package awss3receiver implements a receiver that can be used by the OpenTelemetry collector to retrieve traces previously stored in S3 by the AWS S3 Exporter.

Constants

const (
	IngestStatusCompleted = "completed"
	IngestStatusFailed    = "failed"
	IngestStatusIngesting = "ingesting"
	CustomCapability      = "org.opentelemetry.collector.receiver.awss3"
)

Variables

This section is empty.

Functions

func NewFactory

func NewFactory() receiver.Factory

Types

type Config

type Config struct {
	S3Downloader  S3DownloaderConfig `mapstructure:"s3downloader"`
	StartTime     string             `mapstructure:"starttime"`
	EndTime       string             `mapstructure:"endtime"`
	Encodings     []Encoding         `mapstructure:"encodings"`
	Notifications Notifications      `mapstructure:"notifications"`
	// SQS configures receiving S3 object change notifications via an SQS queue.
	SQS *SQSConfig `mapstructure:"sqs"`
}

Config defines the configuration for the file receiver.

func (Config) Validate

func (c Config) Validate() error
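
The page does not show the body of Validate. As a rough illustration of the rules stated in the README (time range and SQS modes are mutually exclusive, and each mode has its own required fields), a validation function might look like the sketch below. The types config and sqsConfig are trimmed, hypothetical stand-ins, and the error messages are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// sqsConfig and config are hypothetical, trimmed stand-ins for the
// package's SQSConfig and Config types.
type sqsConfig struct {
	QueueURL string
	Region   string
}

type config struct {
	StartTime string
	EndTime   string
	SQS       *sqsConfig
}

// validate sketches the mode rules from the README; it is not the
// package's actual Validate implementation.
func validate(c config) error {
	timeMode := c.StartTime != "" || c.EndTime != ""
	switch {
	case timeMode && c.SQS != nil:
		return errors.New("starttime/endtime and sqs cannot be used together")
	case c.SQS != nil:
		if c.SQS.QueueURL == "" || c.SQS.Region == "" {
			return errors.New("sqs.queue_url and sqs.region are required")
		}
	case c.StartTime == "" || c.EndTime == "":
		return errors.New("starttime and endtime are required when fetching by time")
	}
	return nil
}

func main() {
	fmt.Println(validate(config{StartTime: "2024-01-01", EndTime: "2024-01-02"}))
	fmt.Println(validate(config{StartTime: "2024-01-01", SQS: &sqsConfig{}}))
}
```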

type Encoding added in v0.103.0

type Encoding struct {
	Extension component.ID `mapstructure:"extension"`
	Suffix    string       `mapstructure:"suffix"`
	// contains filtered or unexported fields
}

Encoding defines the encoding configuration for the file receiver.

type ListObjectsAPI added in v0.101.0

type ListObjectsAPI interface {
	NewListObjectsV2Paginator(params *s3.ListObjectsV2Input) ListObjectsV2Pager
}

type ListObjectsV2Pager added in v0.101.0

type ListObjectsV2Pager interface {
	HasMorePages() bool
	NextPage(context.Context, ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
}

type Notifications added in v0.112.0

type Notifications struct {
	OpAMP *component.ID `mapstructure:"opampextension"`
	// contains filtered or unexported fields
}

Notifications groups optional notification sources.

type S3DownloaderConfig

type S3DownloaderConfig struct {
	Region                         string `mapstructure:"region"`
	S3Bucket                       string `mapstructure:"s3_bucket"`
	S3Prefix                       string `mapstructure:"s3_prefix"`
	S3PartitionFormat              string `mapstructure:"s3_partition_format"`
	S3PartitionTimezone            string `mapstructure:"s3_partition_timezone"`
	FilePrefix                     string `mapstructure:"file_prefix"`
	FilePrefixIncludeTelemetryType bool   `mapstructure:"file_prefix_include_telemetry_type"`
	Endpoint                       string `mapstructure:"endpoint"`
	EndpointPartitionID            string `mapstructure:"endpoint_partition_id"`
	S3ForcePathStyle               bool   `mapstructure:"s3_force_path_style"`
	TagObjectAfterIngestion        bool   `mapstructure:"tag_object_after_ingestion"`
	SkipIngestingTaggedObjects     bool   `mapstructure:"skip_ingesting_tagged_objects"`
}

S3DownloaderConfig contains AWS S3 downloader related config to control things like bucket, prefix, batching, connections, retries, etc.

type SQSConfig added in v0.127.0

type SQSConfig struct {
	// QueueURL is the URL of the SQS queue to receive S3 notifications.
	QueueURL string `mapstructure:"queue_url"`
	// Region specifies the AWS region of the SQS queue.
	Region string `mapstructure:"region"`
	// Endpoint is the optional custom endpoint for SQS (useful for testing).
	Endpoint string `mapstructure:"endpoint"`
	// WaitTimeSeconds specifies the duration (in seconds) for long polling SQS messages.
	// Maximum is 20 seconds. Default is 20 seconds.
	WaitTimeSeconds *int64 `mapstructure:"wait_time_seconds"`
	// MaxNumberOfMessages specifies the maximum number of messages to receive in a single poll.
	// Valid values: 1-10. Default is 10.
	MaxNumberOfMessages *int64 `mapstructure:"max_number_of_messages"`
}

SQSConfig holds SQS queue configuration for receiving object change notifications.

type SingleObjectAPI added in v0.150.0

type SingleObjectAPI interface {
	GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
	GetObjectTagging(ctx context.Context, params *s3.GetObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.GetObjectTaggingOutput, error)
	PutObjectTagging(ctx context.Context, params *s3.PutObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error)
}

Directories

| Path | Synopsis |
| --- | --- |
| internal/metadata | Package metadata contains the autogenerated telemetry and build information for the receiver/awss3 component. |