How to get the total CPU and memory of an AWS ECS Fargate cluster using Python

3 min read

603 words

Hey there, 👋

Have you ever tried to get the total amount of CPU cores and memory of one of your ECS clusters? Today I wrote a script to do just that. I wrote it in Python because the boto3 library also powers the AWS CLI, so I thought this might be the best and easiest solution.

The only customization you need to do when setting up the script is to specify a profile name and a cluster name at the beginning of the main() function. Currently, the script only supports single clusters. It shouldn't be too hard to extend it to iterate over all clusters found in a region. But that is beyond the scope of this post.

The script itself gets up to 100 services defined in an ECS cluster and gets the current task definition of it. Using the information from the task definition, we check the CPU and memory requirements. As information we also get the CPU and memory information of each container in the task.

At the end of the script, we get a short list of the total amount of CPUs and memory, as well as the amounts split between the x86 and ARM CPU architectures. The output will be in the style in which you defined the task definitions. For example, half a CPU core will be displayed as 512. To get the full amount of CPU cores you have to divide the number by 1024.

The Python script

Without further ado, here's the script.

import boto3
from dataclasses import dataclass

@dataclass
class TaskDefinition:
    service: str
    task_definition: str
    cpu: int
    memory: int
    container_definitions: list
    cpu_architecture: str
    desired_count: int

def get_running_task_definitions(profile_name='staging', cluster='default') -> list[TaskDefinition]:
    session = boto3.Session(profile_name=profile_name)
    ecs_client = session.client('ecs')

    running_task_definitions: list[TaskDefinition] = []

    services = ecs_client.list_services(cluster=cluster, maxResults=100)['serviceArns']
    print(f"Services: {len(services)}")

    for service in services:
        service_desc = ecs_client.describe_services(cluster=cluster, services=[service])['services'][0]
        task_definition = service_desc['taskDefinition']

        task_def_desc = ecs_client.describe_task_definition(taskDefinition=task_definition)['taskDefinition']
        cpu_architecture = "x86_64"
        if 'runtimePlatform' in task_def_desc:
            cpu_architecture = task_def_desc['runtimePlatform']['cpuArchitecture'].lower()

        running_task_definitions.append(TaskDefinition(
            service=service,
            task_definition=task_definition,
            cpu=int(task_def_desc.get('cpu')),
            memory=int(task_def_desc.get('memory')),
            container_definitions=task_def_desc['containerDefinitions'],
            cpu_architecture=cpu_architecture,
            desired_count=int(service_desc['desiredCount'])
        ))

    return running_task_definitions

def calculate_total_resources(task_definitions: list[TaskDefinition]):
    total_cpus = 0
    total_memory = 0
    running_x86_cpus = 0
    running_arm_cpus = 0
    running_x86_ram = 0
    running_arm_ram = 0

    for task_def in task_definitions:
        total_cpus += (task_def.cpu * task_def.desired_count)
        total_memory += (task_def.memory * task_def.desired_count)

        match task_def.cpu_architecture:
            case "x86_64":
                running_x86_cpus += task_def.cpu * task_def.desired_count
                running_x86_ram += task_def.memory * task_def.desired_count
            case "arm64":
                running_arm_cpus += task_def.cpu * task_def.desired_count
                running_arm_ram += task_def.memory * task_def.desired_count

    return total_cpus, total_memory, running_x86_cpus, running_x86_ram, running_arm_cpus, running_arm_ram

def main():
    # Change the profile_name to the AWS profile you want to use
    profile_name = 'staging'
    # Change the cluster_name to the ECS cluster you want to get the information from
    cluster_name = 'default'
    task_definitions = get_running_task_definitions(profile_name, cluster_name)
    total_cpus, total_memory, running_x86_cpus, running_x86_ram, running_arm_cpus, running_arm_ram = calculate_total_resources(task_definitions)

    for task_def in task_definitions:
        print(f"Service: {task_def.service}")
        print(f"Task CPU: {task_def.cpu}, Task Memory: {task_def.memory}")
        print(f"CPU Architecture: {task_def.cpu_architecture}")

        # Comment out the following line if you don't want to see the container definitions
        print("Container Definitions:")
        for container in task_def.container_definitions:
            print(f"  Container: {container['name']}")
            print(f"    CPU: {container.get('cpu')}")
            print(f"    Memory: {container.get('memory')}")

    print(f"Total CPUs: {total_cpus}")
    print(f"Total Memory: {total_memory}")
    print(f"Total X86 CPUs: {running_x86_cpus}")
    print(f"Total X86 RAM: {running_x86_ram}")
    print(f"Total ARM CPUs: {running_arm_cpus}")
    print(f"Total ARM RAM: {running_arm_ram}")

if __name__ == "__main__":
    main()

I hope this script helps you to get a better overview of your ECS clusters.

Thanks for reading & have a nice day! 👋

Niklas