Analizando logs intento de inicio de sesión en server Debian

Bueno, la imagen principal claramente no refiere al termino “logs” del que va la entrada, pero creo que se ve bonita.

Anteriormente hice un post sobre el tema de los logs de autenticación: https://blog.greenborn.com.ar/2022/03/08/hay-que-mirar-el-auth-log-de-vez-en-cuando/ y en el mismo mencioné más que nada que esta bueno mirarlo.

En esta oportunidad, la idea es ir un poco más allá y empezar a realizar un análisis sobre los mismos.

Ya que por el momento tengo más de 50 MB de logs de autenticación (descomprimidos).

¿Que se pretende analizar?

Por lo pronto hacer una estadistica de la cantidad de indidencias por IP.

Para lo cual usaré un pequeño script en Python que se encargará de recorrer los archivos (que serán almacenados en un directorio “logs”) y generar el .CSV con el resultado del reporte que se guardará en la carpeta “resultado”.

También se publicará el listado de IPs con su correspondiente numero de intentos por si a alguien le sirve, aunque lo más probable es que aquellos que quieren ingresar sin credenciales de acceso ya tengan IPs diferentes salvo que el mismo se realice desde servidores especificos que ya fueron vulnerados.

Y si uno tiene contratado un server con una IP que se encuentre en dicha lista, y no se dedica a realizar dichas actividades, debería revisar su sistema a fondo por que es claro que están usando su equipo sin su consentimiento.

¿Como se hace el análisis?

Como se mencionnó anteriormente, usando Python3 y las librerías os y csv.

Con la librería os se puede obtener el listado de archivos de una carpeta, para lo que nos bastaría una linea como la siguiente:

lista_archivos = os.listdir("logs")

Luego realizo una función que se encarga de recorrer cada una de las lineas de los archivos cargados:

def procesar_log( nombre_archivo ):
  archivo = open(nombre_archivo, mode="r")
  for linea in archivo:
    split_linea = linea.split(":")
    fecha   = split_linea[0].split(" ")
    mes     = fecha[0]
    dia     = fecha[1]
    hora    = fecha[2]
    minuto  = split_linea[1]

    split_linea_2_s = split_linea[2].split(" ")

    segundo = split_linea_2_s[0]

    host = split_linea_2_s[1]

    app = split_linea_2_s[2]
    app_split = app.split('[')
    prioridad = ''
    
    if len(app_split) == 2:
      app = app_split[0]
      prioridad = app_split[1].split(']')[0]

    len_s_linea = len(split_linea) 

    c = 4
    mensaje = split_linea[3]
    while c < len_s_linea:
      mensaje = mensaje + ':' + split_linea[c]
      c += 1

    primera_palabra = mensaje.split(" ")[1]

    #Buscamos IPs IPv4
    mensaje_ip4 = get_ipv4(mensaje)
    
    #Armamos el registro
    registro = { "fecha":[mes, dia], "hora":[hora, minuto, segundo], "ip": mensaje_ip4, "primera_palabra":primera_palabra, "mensaje":mensaje }

    #Catalogar por primera palabra del mensaje
    
    if not primera_palabra in logs_palabra:
      logs_palabra[ primera_palabra ] = []

    logs_palabra[ primera_palabra ].append(registro)

    #Catalogar por ipv4
    if mensaje_ip4 == '':
      mensaje_ip4 = 'sin_catalogar'
    
    if not mensaje_ip4 in logs_ip:
      logs_ip[ mensaje_ip4 ] = []
      listado_ips[ mensaje_ip4 ] = { "cant_intentos": 1 }
    else:
      listado_ips[ mensaje_ip4 ][ "cant_intentos" ] += 1

    logs_ip[ mensaje_ip4 ].append(registro)

    #Catalogar por fecha y hora
    if not mes in logs_fecha_hora:
      logs_fecha_hora[mes] = {}
    
    if not dia in logs_fecha_hora[mes]:
      logs_fecha_hora[mes][dia] = {}
    
    if not hora in logs_fecha_hora[mes][dia]:
      logs_fecha_hora[mes][dia][hora] = {}
    
    if not minuto in logs_fecha_hora[mes][dia][hora]:
      logs_fecha_hora[mes][dia][hora][minuto] = {}

    if not segundo in logs_fecha_hora[mes][dia][hora][minuto]:
      logs_fecha_hora[mes][dia][hora][minuto][segundo] = []

    logs_fecha_hora[mes][dia][hora][minuto][segundo].append(registro)
  
  archivo.close()

Basicamente se usa el metodo split para separar cada una de las lineas del archivos y poder procesarlas.

Y guarda la información en los objetos:

  • logs_palabra: Donde los registros se catalogan de acuerdo a la palabra con la cual empiezan
  • logs_fecha_hora: Para catalogar los registros por fecha
  • logs_ip: para catalogar los registros por IP
  • listado_ips: Se genera un listado de IPs al cual se le asigna la cantidad de veces que una ip aparece en un registro

Para obtener la IPv4 a partir de un texto uso:

def get_ipv4( text ):
  ip = ''
  numero_grupo = ''
  caracter_anterior = ''
  cant_puntos = 0
  c = 1
  len_text = len(text)
  if len_text > 0:
    caracter_anterior = text[0]

  while  c < len_text:
    if text[c].isnumeric():
      numero_grupo += text[c]
    elif text[c] == '.' or (text[c] == ' ' and caracter_anterior.isnumeric() and cant_puntos < 4):
      cant_puntos += 1
      if numero_grupo.isnumeric() and int(numero_grupo) < 256:
        ip += numero_grupo
        if cant_puntos < 4:
          ip += '.'
        numero_grupo = ''
    caracter_anterior = text[c]
    c += 1
  
  if cant_puntos == 4:
    return ip
  else:
    return ''

Entonces luego se recorre el listado de archivos de logs para llamar a la función que se encarga de cargar datos en los arreglos con:

for archivo in lista_archivos:
  print('procesando archivo: '+archivo)
  procesar_log("logs/"+archivo)

Y luego de tener listo el arreglo (ya se que no son arreglos, son diccionarios, pero por lo pronto los llamo así) listado_ips se procede a armar el .csv

with open('resultado/ips.csv', 'w') as csvfile:
  filewriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
  filewriter.writerow(['IP', 'Cantidad registros'])

  for ip in listado_ips:
    print(ip + ' > ' + str(listado_ips[ip]["cant_intentos"]))
    filewriter.writerow([ip, listado_ips[ip]["cant_intentos"]])

Consideraciones sobre el código anterior

Uno podría preguntarse para que definir los diccionarios: logs_palabra, logs_fecha_hora y
logs_ip si tienen información redundante y no se estan usando ahora!

La respuesta a esa hipotética pregunta es simple, pienso usarlos mas tarde por que pienso agregar más funcionalidades al script.

Otra cuestión a tener en cuenta es que la función que obtiene la IP no toma en cuenta algunas cosas como por ej los puertos, o que solo esté pensada para IPv4, cuando hace mucho tiempo IPv6 ya està entre nosotros.

Por lo pronto no llegaron peticiones con IPv6 asignadas creo que es interentarse preguntarse el por qué, así que por eso todavía no las consideré.

El codigo fuente pueden verlo en: https://github.com/Greenborn/analizar_logs

¿Donde veo el CSV con las IPs?

Está subido en el mismo repo: https://github.com/Greenborn/analizar_logs/blob/master/auth_logs/resultado/ips.csv

Los registros que dicen “sin_catalogar” corresponden a registros que están relacionados a una de las incidencias pero en los cuales no figuran un IP en específico, por lo cual no los inclui para no inflar las estadisticas, ya que por cada intento de inicio de sesión se crean al menos 4 registros.

Conclusiones

Creo que no podría terminar el post sin un gráfico, por lo que realizé uno a partir del .csv:

En si, intentando ser objetivo no nos daría mucha información, ya que sería necesario cruzar otros datos para poder identificar mejor la procedencia de los intentos de inicio de sesión, ya que quien quiera ingresar puede que use varias IPs diferentes.

Creo que las únicas certezas que podemos tener sobre los mismos por el momento, es que un aproximado de un 10% de las mismas se están realizando de forma automatizada con algún script o herramienta, ya que nadie en su sano juicio probaría más de 300 claves de forma manual.

Este gráfico más que certezas me dejan preguntas para seguir investigando…