对安全组规则的合规性进行自动审计修复

若安全组规则对全网段(0.0.0.0/0)开放22(SSH服务)、3389(RDP)等高危风险端口,将会给系统带来严重的安全隐患。您可借助配置审计持续检测安全组配置并自动修复不合规的配置项,确保系统安全。

应用背景

在企业的云环境中,安全组作为网络流量的核心管控手段,通常用于定义服务器实例的访问规则。然而,在复杂的多实例场景下,因运维疏忽或策略设计缺陷,安全组规则可能存在以下典型风险配置:

  • 高危端口全网段暴露:例如,面向公网(0.0.0.0/0)开放SSH(22端口)、RDP(3389端口)或数据库服务端口(如3306、6379),导致实例直接暴露于互联网,成为暴力破解、数据泄露等攻击的首要目标。

  • 内网与公网服务混淆:未区分实例业务属性(如公网Web服务与内网数据库),错误对非公网实例开放全量IP访问权限,形成内部网络横向渗透风险。

解决方案

通过阿里云配置审计服务创建规则,实现对安全组规则变更的持续监控,并设置规则检测诸如22、3389、3306等高危端口是否对外开放。一旦发现新增或修改安全组规则允许这些端口对公网开放,即触发合规审计。此时,配置审计将自动启动函数计算执行自定义修复逻辑,利用阿里云SDK调整安全组设置,例如删除风险规则。修复后,系统会重新评估相关规则以确认修复效果。此外,用户可以通过配置审计控制台查看不合规资源的修正详情,整个过程透明且可追溯,有效防止未经授权的公网访问。此方案不仅提高了运维效率,减少了人工干预,还确保资源配置始终符合安全与合规要求,增强了环境的安全性和稳定性。

image

创建配置审计及函数计算修复规则

本方案通过Terraform创建配置审计规则,并结合函数计算实现不合规资源的自动修复,从而达成云资源合规性的自动化检测与管理。

说明

如果当前操作用户为RAM用户时,请为RAM用户授予以下权限。详细信息请参见RAM用户授权

RAM权限策略

此自定义权限策略允许用户管理和操作ECS安全组规则及函数计算服务和函数。

{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iacservice:CreateExplorerModuleVersion",
        "iacservice:GetExplorerModule",
        "iacservice:CreateExplorerModule",
        "iacservice:ListExplorerModules",
        "iacservice:UpdateExplorerModuleAttribute",
        "iacservice:DeleteExplorerModule"
      ],
      "Resource": "acs:iacservice:*:*:explorermodule/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iacservice:CreateExplorerTask",
        "iacservice:UpdateExplorerTaskAttribute",
        "iacservice:GetExplorerTask",
        "iacservice:DeleteExplorerTask"
      ],
      "Resource": "acs:iacservice:*:*:explorertask/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iacservice:CreateJob",
        "iacservice:GetJob",
        "iacservice:listJobs",
        "iacservice:OperateJob"
      ],
      "Resource": "acs:iacservice:*:*:explorertask/*/job/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iacservice:ListResources",
        "iacservice:ListExplorerHistories",
        "iacservice:CreateExplorerHistory",
        "iacservice:ExportTerraformCode"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ecs:RevokeSecurityGroup",
        "ecs:DescribeSecurityGroups",
        "ecs:DescribeSecurityGroupAttributes"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "fc:CreateService",
        "fc:DeleteService",
        "fc:UpdateService",
        "fc:CreateFunction",
        "fc:DeleteFunction",
        "fc:UpdateFunction",
        "fc:InvokeFunction",
        "fc:ListServices",
        "fc:ListFunctions",
        "fc:GetService",
        "fc:GetFunction"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "config:*",
      "Resource": "*"
    }
  ]
}
说明

本教程所含示例代码支持一键运行,您可以直接运行代码。 一键运行

重要

本方案中采用直接删除不合规安全组方式实现自动修正,可能会影响业务的连续性,请根据实际业务修改函数计算中的修正代码。

Terraform代码

variable "region_id" {
  type    = string
  default = "cn-shenzhen"
}

provider "alicloud" {
  region = var.region_id
}

resource "local_file" "python_script" {
  content  = <<EOF
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import sys
sys.path.append('/opt/python')
import json
import logging
import jmespath  # 使用jmespath代替jsonpath
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.auth.credentials import AccessKeyCredential
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.request import CommonRequest


logger = logging.getLogger()


def handler(event, context):
    logger.info(f"This is event: {str(event, encoding='utf-8')}")
    get_resources_non_compliant(event, context)


def get_resources_non_compliant(event, context):
    # 获取不合规的资源信息
    resources = parse_json(event)
    # 遍历不合规资源,进行修正操作
    for resource in resources:
        remediation(resource, context)


def parse_json(content):
    """
    Parse string to json object
    :param content: json string content
    :return: Json object
    """
    try:
        return json.loads(content)
    except Exception as e:
        logger.error('Parse content:{} to json error:{}.'.format(content, e))
        return None


def remediation(resource, context):
    logger.info(f"需要修复的资源信息: {resource}")
    region_id = resource['regionId']
    account_id = resource['accountId']
    resource_id = resource['resourceId']
    resource_type = resource['resourceType']
    if resource_type == 'ACS::ECS::SecurityGroup' :
        # 获取不合规安全组的配置信息,重新校验,确保不合规安全组的评估准确性
        resource_result = get_discovered_resource(context, resource_id, resource_type, region_id)
        resource_json = json.loads(resource_result)
        configuration = json.loads(resource_json["DiscoveredResourceDetail"]["Configuration"])
        # 判断是否是托管的安全组
        is_managed_security_group = configuration.get('ServiceManaged')
        # 使用jmespath获取入方向为接受且授权0.0.0.0/0的安全组规则id
        delete_security_group_rule_ids = jmespath.search(
            "Permissions.Permission[?SourceCidrIp=='0.0.0.0/0'].SecurityGroupRuleId",
            configuration
        )
        # 非托管的安全组,且授权了0.0.0.0/0的入方向安全组规则,则删除
        if is_managed_security_group is False and delete_security_group_rule_ids:
            logger.info(f"注意:删除安全组规则 {region_id}:{resource_id}:{delete_security_group_rule_ids}")
            revoke_security_group(context, region_id, resource_id, delete_security_group_rule_ids)

def revoke_security_group(context, region_id, resource_id, security_group_rule_ids):
    creds = context.credentials
    client = AcsClient(creds.access_key_id, creds.access_key_secret, region_id=region_id)
    request = CommonRequest()
    request.set_accept_format('json')
    request.set_domain(f'ecs.{region_id}.aliyuncs.com')
    request.set_method('POST')
    request.set_protocol_type('https') # https | http
    request.set_version('2014-05-26')
    request.set_action_name('RevokeSecurityGroup')
    request.add_query_param('RegionId', region_id)
    for index, value in enumerate(security_group_rule_ids):
        request.add_query_param(f'SecurityGroupRuleId.{index + 1}', value)
    request.add_query_param('SecurityGroupId', resource_id)
    request.add_query_param('SecurityToken', creds.security_token)

    response = client.do_action_with_exception(request)
    logger.info(f"删除结果: {str(response, encoding='utf-8')}")


# 获取资源详情
def get_discovered_resource(context, resource_id, resource_type, region_id):
    """
    调用API获取资源配置详情
    :param context:函数计算上下文
    :param resource_id:资源ID
    :param resource_type:资源类型
    :param region_id:资源所属地域ID
    :return: 资源详情
    """
    # 需具备权限AliyunConfigFullAccess的函数计算FC的服务角色。
    creds = context.credentials
    client = AcsClient(creds.access_key_id, creds.access_key_secret, region_id='cn-shanghai')

    request = CommonRequest()
    request.set_domain('config.cn-shanghai.aliyuncs.com')
    request.set_version('2020-09-07')
    request.set_action_name('GetDiscoveredResource')
    request.add_query_param('ResourceId', resource_id)
    request.add_query_param('ResourceType', resource_type)
    request.add_query_param('Region', region_id)
    request.add_query_param('SecurityToken', creds.security_token)
    request.set_method('GET')

    try:
        response = client.do_action_with_exception(request)
        resource_result = str(response, encoding='utf-8')
        return resource_result
    except Exception as e:
        logger.error('GetDiscoveredResource error: %s' % e)
  EOF
  filename = "${path.module}/python/index.py"
}

resource "local_file" "requirements_txt" {
  content  = <<EOF
  aliyun-python-sdk-core==2.15.2
  jmespath>=0.10.0
  EOF
  filename = "${path.module}/python/requests/requirements.txt"
}
locals {
  code_dir          = "${path.module}/python/"
  archive_output    = "${path.module}/code.zip"
  base64_output     = "${path.module}/code_base64.txt"
}

data "archive_file" "code_package" {
  type        = "zip"
  source_dir  = local.code_dir
  output_path = local.archive_output

  depends_on = [
    local_file.python_script,
    local_file.requirements_txt,
  ]
}

resource "null_resource" "upload_code" {
  provisioner "local-exec" {
    command = <<EOT
      base64 -w 0 ${local.archive_output} > ${local.base64_output}
    EOT

    interpreter = ["sh", "-c"]
  }

  depends_on = [data.archive_file.code_package]
}

data "local_file" "base64_encoded_code" {
  filename = local.base64_output
  depends_on = [null_resource.upload_code]
}
resource "alicloud_fcv3_function" "fc_function" {
  runtime       = "python3.10"
  handler       = "index.handler"
  function_name = "HHM-FC-TEST"
  role          = alicloud_ram_role.role.arn

  code {
    zip_file = data.local_file.base64_encoded_code.content
  }
  lifecycle {
    ignore_changes = [
      code
    ]
  }

  # 显式设置 log_config 为空
  log_config {}

  depends_on = [data.local_file.base64_encoded_code]
}

resource "alicloud_config_rule" "default" {
  rule_name   = "SPM0014安全组不允许对全部网段开启风险端口"
  description = "禁止安全组对所有网段开放风险端口22, 3389"
  source_owner = "ALIYUN"
  # (必需,ForceNew)指定是您还是阿里云拥有并管理该规则。有效值: CUSTOM_FC: 该规则是自定义规则,您拥有该规则 ● ALIYUN: 该规则是托管规则,阿里云拥有该规则。
  source_identifier = "sg-risky-ports-check"
  #配置审计ARN(必需,ForceNew)规则的标识符。对于托管规则,值为托管规则的名称。对于自定义规则,值为自定义规则的ARN。
  resource_types_scope = ["ACS::ECS::SecurityGroup"]
  #规则监控被排除的资源ID,多个ID用逗号分隔,仅适用于基于托管规则创建的规则,定制规则此字段为空。
  config_rule_trigger_types = "ConfigurationItemChangeNotification" #规则在配置更改时被触发
  #有效值包括:One_Hour、Three_Hours、Six_Hours、Twelve_Hours、TwentyFour_Hours。
  risk_level = 1                                                           #    ● 1: 严重 ● 2: 警告● 3: 信息

  input_parameters = {
    "ports" : "22,3389"
  }
}

resource "alicloud_config_remediation" "default" {
  config_rule_id          = alicloud_config_rule.default.id
  remediation_template_id = alicloud_fcv3_function.fc_function.function_arn
  remediation_source_type = "CUSTOM"
  invoke_type             = "AUTO_EXECUTION"
  params                  = "{}"
  remediation_type        = "FC"
}

resource "random_integer" "default" {
  min = 10000
  max = 99999
}

resource "alicloud_ram_role" "role" {
  name        = "tf-example-role-${random_integer.default.result}"
  document    = <<EOF
{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "fc.aliyuncs.com"
        ]
      }
    }
  ],
  "Version": "1"
}
EOF
  description = "Ecs ram role."
  force       = true
}
resource "alicloud_ram_policy" "policy" {
  policy_name     = "tf-example-ram-policy-${random_integer.default.result}"
  policy_document = <<EOF
  {
    "Statement": [
      {
        "Action":  [
          "config:GetDiscoveredResource",
          "ecs:RevokeSecurityGroup"
        ],
        "Effect":  "Allow",
        "Resource": ["*"]
      }
    ],
      "Version": "1"
  }
  EOF
  description     = "this is a policy test"
  force           = true
}

resource "alicloud_ram_role_policy_attachment" "attach" {
  policy_name = alicloud_ram_policy.policy.policy_name
  policy_type = "Custom"
  role_name   = alicloud_ram_role.role.name
}

创建结果展示

  1. 登录配置审计控台查看创建规则。

    image

  1. 登录函数计算控台查看创建函数。

image

查看修正结果

修正前

  1. 配置审计不合规资源展示。

    image

  2. 登录ECS安全组查看。

    image

修正后

  1. 配置审计自动修正详情展示。

    image

  1. 修正后ECS安全组查看。

    image

相关文档

    OSZAR »