Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.env
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ To learn more about graphs using python, check out [Matplotlib](https://matplotl
- An organisation ID is required. This defaults to the ID of the organisation that is executing the job on CircleCI.
- If sending metrics to Datadog, then a `DATADOG_API_KEY` is required.

## Serverless Usage

```shell
serverless deploy
```

```shell
serverless invoke --function circleci_usage_api_exporter
```

### Caveats

- My python skillz aren't great.
Expand Down
23 changes: 23 additions & 0 deletions handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os
import glob

import src.get_usage_report
import src.send_to_datadog


# Lambda environment variables are always strings, so the flag must be
# compared as text below — bare truthiness would treat "false" as enabled.
SEND_TO_DATADOG = os.getenv('SEND_TO_DATADOG', 'false')


def handler(event, context):
    """AWS Lambda entry point.

    Downloads the CircleCI usage report CSVs into /tmp/reports and, when
    SEND_TO_DATADOG is enabled, forwards each CSV to Datadog.

    Args:
        event: Lambda invocation event (unused).
        context: Lambda runtime context (unused).
    """
    # Fetch and extract the usage report CSVs into /tmp/reports.
    src.get_usage_report.get_usage_report()
    csv_files = glob.glob('/tmp/reports/*.csv')
    # Fix: the original `if SEND_TO_DATADOG:` was true for ANY non-empty
    # string — including "false" — so the toggle could never be turned off.
    if SEND_TO_DATADOG.strip().lower() in ('true', '1', 'yes'):
        if csv_files:
            for csv_file in csv_files:
                src.send_to_datadog.main(csv_file=csv_file)
        else:
            print("No CSV files found in /tmp/reports, skipping.")
    else:
        print("SEND_TO_DATADOG set to false, skipping.")
9 changes: 9 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
certifi==2025.7.14
charset-normalizer==3.4.2
datadog-api-client==2.40.0
idna==3.10
python-dateutil==2.9.0.post0
requests==2.32.4
six==1.17.0
typing_extensions==4.14.1
urllib3==2.5.0
34 changes: 34 additions & 0 deletions serverless.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
frameworkVersion: '3'
service: circleci-usage-api-exporter

custom:
  parsedSecrets:
    # Secrets Manager JSON secret (resolved through the SSM reference path)
    # supplying the CIRCLECI_TOKEN and DD_API_KEY keys used below.
    circleciUsageApiExporterSecret: ${ssm:/aws/reference/secretsmanager//medallion/circleci-usage-api-exporter}

provider:
  name: aws
  runtime: python3.10
  region: us-west-2
  # NOTE(review): src/get_usage_report.py can sleep 5 x 60 s while polling
  # for the report, so a slow export may exceed this 300 s timeout —
  # consider raising it (Lambda allows up to 900 s).
  timeout: 300


functions:
  circleci_usage_api_exporter:
    name: circleci-usage-api-exporter
    handler: handler.handler
    environment:
      CIRCLECI_API_TOKEN: ${self:custom.parsedSecrets.circleciUsageApiExporterSecret.CIRCLECI_TOKEN}
      DATADOG_API_KEY: ${self:custom.parsedSecrets.circleciUsageApiExporterSecret.DD_API_KEY}
      DATADOG_SITE: datadoghq.com
      ORG_ID: b7800015-71e0-4c65-9883-9c777d8b197e
      # Lambda delivers environment variables as strings; handler.py
      # receives the text "true", not a boolean.
      SEND_TO_DATADOG: true
    events:
      - schedule: rate(1 hour) #TODO rate(1 day)

plugins:
  - serverless-python-requirements

## setting individually: true is required to resolve the following error: ERR_INVALID_ARG_TYPE when using ECR config
## see more: https://github.com/serverless/serverless-python-requirements/issues/774
#package:
#  individually: true
163 changes: 93 additions & 70 deletions src/get_usage_report.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,105 @@
#!/usr/bin/env python3
"""Request a CircleCI usage export, poll until it is ready, then download
and extract the CSV report files into /tmp/reports."""
# Import modules
import os
import requests
import shutil
import json
import time
import gzip
import sys
from datetime import datetime, timedelta, timezone


def get_usage_report():
    """Request, poll, and download the CircleCI usage report.

    Reads ORG_ID and CIRCLECI_API_TOKEN from the environment. START_DATE /
    END_DATE may also be supplied (``YYYY-MM-DDTHH:MM:SS``); otherwise the
    window defaults to the last 24 hours (UTC). Extracted CSVs land in
    /tmp/reports. Exits the process with status 1 when the request fails
    or the report never finishes processing.
    """
    # Default time period is 1 day. Use an aware UTC "now" directly:
    # datetime.today().astimezone(timezone.utc) depended on the host's
    # local timezone being configured correctly.
    now_utc = datetime.now(timezone.utc)
    yesterday_utc = now_utc - timedelta(days=1)

    # Second-resolution timestamps (YYYY-MM-DDTHH:MM:SS). The previous
    # isoformat()[:-13] slicing silently produced garbage whenever
    # microsecond == 0, because isoformat() omits the fractional part then.
    today_str = now_utc.strftime('%Y-%m-%dT%H:%M:%S')
    yesterday_str = yesterday_utc.strftime('%Y-%m-%dT%H:%M:%S')

    # Build data to send with requests
    ORG_ID = os.getenv('ORG_ID')
    CIRCLECI_TOKEN = os.getenv('CIRCLECI_API_TOKEN')
    # Honour explicit overrides; no need to mutate os.environ just to
    # obtain a default value.
    START_DATE = os.getenv('START_DATE', yesterday_str)
    END_DATE = os.getenv('END_DATE', today_str)

    post_data = {
        "start": f"{START_DATE}Z",
        "end": f"{END_DATE}Z",
        "shared_org_ids": []
    }

    # Request the usage report
    response = requests.post(
        f"https://circleci.com/api/v2/organizations/{ORG_ID}/usage_export_job",
        headers={"Circle-Token": CIRCLECI_TOKEN, "Content-Type": "application/json"},
        data=json.dumps(post_data)
    )
    # Print out the API response for the usage report request
    print("Response Content:", response.json())

    # Once requested, the report can take some time to process, so a
    # bounded retry loop is built in.
    if response.status_code == 201:
        print("Report requested successfully")
        data = response.json()
        USAGE_REPORT_ID = data.get("usage_export_job_id")
        print(f"Report ID is {USAGE_REPORT_ID}")

        # Check if the report is ready for downloading as it can take a
        # while to process.
        for i in range(5):
            print("Checking if report can be downloaded")
            report = requests.get(
                f"https://circleci.com/api/v2/organizations/{ORG_ID}/usage_export_job/{USAGE_REPORT_ID}",
                headers={"Circle-Token": CIRCLECI_TOKEN}
            ).json()

            report_status = report.get("state")

            # Download the report and save it
            if report_status == "completed":
                print("Report generated. Now Downloading...")
                download_urls = report.get("download_urls", [])

                # If dir exists, clear it, then remake it.
                if os.path.exists("/tmp/reports"):
                    shutil.rmtree("/tmp/reports")
                os.makedirs("/tmp/reports")

                for idx, url in enumerate(download_urls):
                    r = requests.get(url)
                    with open(f"/tmp/usage_report_{idx}_{today_str}.csv.gz", "wb") as f:
                        f.write(r.content)

                    with gzip.open(f"/tmp/usage_report_{idx}_{today_str}.csv.gz", "rb") as f_in:
                        with open(f"/tmp/reports/usage_report_{idx}_{today_str}.csv", "wb") as f_out:
                            f_out.write(f_in.read())

                    print(f"File {idx} downloaded and extracted")

                print("All files downloaded and extracted to the /reports directory")
                break

            elif report_status == "processing":
                print("Report still processing. Retrying in 1 minute...")
                time.sleep(60)  # Wait for 60 seconds before retrying

            else:
                print(f"Report status: {report_status}. Error occurred.")
                break
        else:
            # for/else: reached only when no break fired, i.e. every poll
            # still reported "processing".
            print("Report is still in processing state after 5 retries.")
            sys.exit(1)
    else:
        # Exit if something else happens, like requests are being throttled
        print(f"{response}")
        sys.exit(1)
50 changes: 18 additions & 32 deletions src/send_to_datadog.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
"""
Lightweight script to process a CSV file and send metrics to Datadog API.
Usage: python send_to_datadog.py <path_to_csv> [--api-key <key>] [--batch-size <size>] [--site <site>]
"""

import os, csv, argparse, time, math
Expand All @@ -19,12 +18,21 @@
from datadog_api_client.v1.model.event_create_request import EventCreateRequest


# Environment-driven settings. setdefault keeps any values the caller has
# already exported, while guaranteeing each variable exists afterwards so
# the getenv reads below never return None.
for _name, _default in (
    ("BATCH_SIZE", '100'),
    ("SEND_EVENTS", "false"),
    ("DATADOG_SITE", "datadoghq.com"),
):
    os.environ.setdefault(_name, _default)

BATCH_SIZE = os.getenv('BATCH_SIZE')
SEND_EVENTS = os.getenv('SEND_EVENTS')
DATADOG_SITE = os.getenv('DATADOG_SITE')


class DatadogCSVIngest:
"""Process CSV data and send to Datadog."""

def __init__(self, api_key=None, application_key=None, site="datadoghq.eu"):
def __init__(self, api_key=None, application_key=None, site=DATADOG_SITE):
"""Set up Datadog client with API credentials."""
self.api_key = api_key or os.environ.get("DATADOG_API_KEY")
self.api_key = api_key or os.getenv('DATADOG_API_KEY')
self.application_key = application_key or os.environ.get("DATADOG_APP_KEY")

if not self.api_key:
Expand Down Expand Up @@ -254,38 +262,20 @@ def send_events(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return results


def main():
"""Run the script."""
parser = argparse.ArgumentParser(description='Process a CSV file and send to Datadog.')
parser.add_argument('csv_file', help='Path to the CSV file')
parser.add_argument('--api-key', help='Datadog API key')
parser.add_argument('--application-key', help='Datadog Application key')
parser.add_argument('--events', action='store_true', help='Send events to Datadog')
parser.add_argument('--dry-run', action='store_true', help='Process without sending')
parser.add_argument('--batch-size', type=int, default=100, help='Batch size (default: 100)')
parser.add_argument('--site', default='datadoghq.eu',
choices=['datadoghq.com', 'datadoghq.eu', 'us3.datadoghq.com', 'us5.datadoghq.com'],
help='Datadog site (default: datadoghq.eu)')

args = parser.parse_args()

def main(csv_file):
try:
# Initialize ingestor
ingestor = DatadogCSVIngest(args.api_key, args.application_key, args.site)
print(f"Sending to Datadog site: {args.site}")
ingestor = DatadogCSVIngest()
print(f"Sending to Datadog site: {DATADOG_SITE}")

# Process CSV
print(f"Processing CSV: {args.csv_file}")
data = ingestor.process_csv(args.csv_file)
print(f"Processing CSV: {csv_file}")
data = ingestor.process_csv(csv_file)
total_rows = len(data)
print(f"Processed {total_rows} rows")

if args.dry_run:
print("Dry run - not sending to Datadog")
return 0

# Process in batches
batch_size = args.batch_size
batch_size = int(BATCH_SIZE)
row_index = 0
batch_number = 1

Expand All @@ -307,7 +297,7 @@ def main():
print("Metrics sent successfully")

# Send events if requested
if args.events:
if SEND_EVENTS.lower() == "true":
print("Sending events...")
events = ingestor.send_events(batch)
print(f"Sent {len(events)} events")
Expand Down Expand Up @@ -346,7 +336,3 @@ def main():
return 1

return 0


if __name__ == "__main__":
exit(main())