MH-EX-001: Análisis básico de malware con Yara

En este ejemplo práctico crearemos una pequeña herramienta que nos permita analizar, de forma muy básica, malware usando Yara.

Yara es una herramienta que tiene como objetivo ayudar a los investigadores de malware a clasificar y detectar malware.

El uso de Yara puede ser tan sencillo o complejo como queramos. La herramienta propuesta tiene como objetivo ser un PoC simple, funcional y práctico. Siéntase libre de copiar el código y ampliarlo como necesite.

Problema

Analizar muestras de malware en batch puede puede no resultar sencillo, sobre todo si las muestras son de gran tamaño o tenemos que aplicar muchas reglas.

La principal dificultad suele ser el correcto diseño, implementación y documentación de la aplicación.

Solución

La solución propuesta implementa un sistema de procesamiento paralelo, distribuido y automatizado de análisis de muestras, en el que se pueden añadir nuevas muestras en cualquier momento, quedando en cola de espera para ser atendidas.

La aplicación generará un resultado en formato CSV compatible con Excel.

Las soluciones aquí propuestas siguen los casos de estudio propuestos en el bloque de desarrollo.

Cómo

Funcionamiento

Ayuda en la linea de comandos

Nuestra aplicación se ejecuta a través de la linea de comandos. Para ver todas las opciones disponibles tan solo tenemos que escribir:

python start.py --help

Y como resultado:

usage: start.py [-h] -p SAMPLE_PATH [-v VERBOSITY] [-o OUTPUT_FILE]
            [--rules-path RULES_PATH]

OMSTD Malware

optional arguments:
  -h, --help            show this help message and exit
  -p SAMPLE_PATH, --sample-path SAMPLE_PATH
                        binary sample path
  -v VERBOSITY          enable verbose mode
  -o OUTPUT_FILE        output file name
  --rules-path RULES_PATH
                        yara rules path (default .rules/)

Ejecución básica

El funcionamiento es muy simple. El analizador está hecho usando Celery, por lo que hay que seguir los mismos pasos explicados en el anexo del caso de estudio BH-001 para ejecutar la aplicación: Pasos para lanzar Celery.

Una vez lanzado el servicio de Celery, así como sus dependencias, nuestra aplicación ya está a la espera de recibir muestras para ser analizadas. Para esto ejecutaremos:

python start.py -p samples/sample.txt -o results

Aquí podemos ver la salida generada: Un CSV con la información:

HelloWorld,True,Data: 'Hello world' (flags: 19 # offset: 0),1,Rule HelloWorld was found a match in binary 'sample.txt' with tags: No tags
HelloWorld,True,Data: 'Hello world' (flags: 19 # offset: 0),0,Rule HelloWorld was found a match in binary 'sample.txt' with tags: No tags
Another,False,,1,Rule Another was NOT found a match in binary 'sample.txt' with tags: No tags
SeeYou,True,Data: 'See you' (flags: 19 # offset: 12),1,Rule SeeYou was  found a match in binary 'sample.txt' with tags: No tags
Another,False,,0,Rule Another was NOT found a match in binary 'sample.txt' with tags: No tags

Muestras y reglas de ejemplo

Puede encontrar muestras y reglas Celery de ejemplo, para poder probar la herramienta inmediatamente:

En la sección Anexo puede encontrar un listado de enlaces con reglas listas para ser utilizadas.

Desglose de las piezas

Las piezas que compondrán la herramienta son las siguientes:

  • Servicio que recibe nuevas muestras.
  • Generador de resultados.
  • Medio para añadir nuevas muestras

Todo el código fuente lo puedes descargar y probar aquí.

La estructura y organización del proyecto sigue las directrices de ST-001.

Recepción de muestras

El servicio de recepción de muestras es un tarea de Celery, a la espera de recibir nueva información para analizar de forma asíncrona y en background.

El motor de análisis es muy sencillo. El análisis se hace en dos partes:

  • El análisis con Yara.
  • La interpretación y transformación de resultados.

Ambas partes están definidas como dos funciones en el fichero yara_task.py.

Analizador con Yara

La función yara_task, convertida en tarea con el decorador @celery.task, es la encargada de hacer la llamada a Yara y:

  1. Cargar las reglas Yara, con el método yara.compile(), indicándole el listado de ficheros “*.yara” con las reglas.
  2. Usando partials (LP-003) construimos la llamada al callback. Dicho callback será llamado por la librería de Yara, cada vez que ésta ejecute con éxito una regla.
  3. Finalmente se lanza la orden para hacer el “match”, rules.match(...), indicándole el partial anteriormente construido.

En el siguiente código se puede ver estos puntos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@celery.task
def yara_task(input_parameters):
    """
    Celery task that process a binary sample.

    :param input_parameters: Parameters object with global input parameters
    :type input_parameters: Parameters
    """

    # Load Yara rules and match with binary
    rules = yara.compile(filepaths=input_parameters.yara_rules_files)

    # Make custom callback function
    callback_function = partial(yara_callback, input_parameters)

    # Run Yara rules!
    rules.match(input_parameters.sample_path, callback=callback_function)

Analizador de resultados

La función yara_callback actúa como callback que Yara llamará cuando termine de procesar cada una de las reglas.

En ella, y tras comprobarse la validez de los parámetros de entrada, se llevan a cabo las siguientes acciones: #. Transformar los datos de entrada del formato Yara al formato interno, del tipo Results, según SP-003 #. Enviar la información, de forma asíncrona, a la tarea que se encarga de almacenar los resultados: celery.send_task("framework.tasks.export_results_task.export_to_csv", ...).

En el siguiente código se puede ver estos puntos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# ----------------------------------------------------------------------
def yara_callback(input_parameters, yara_results):
    """
    Call back for Yara match. Retreive Yara yara_results, as dictionary, and call Celery task that stores yara_results.

    :param input_parameters: Parameters object with global input parameters
    :type input_parameters: Parameters

    :param yara_results: Raw Yara match yara_results in format: {'matches': True, 'rule': 'HelloWorld', 'namespace': '0', 'tags': [], 'meta': {}, 'strings': [{'identifier': '$a', 'flags': 19, 'data': 'Hello world', 'offset': 0}]}
    :type yara_results: dict
    """

    # Inputs validations
    if not isinstance(yara_results, dict):
        raise TypeError("Expected dict, got '%s' instead" % type(yara_results))

    if yara_results is None:
        print("Yara rules returned not yara_results")
        return

    # Yara input format:
    #
    # {'matches': True,
    #  'meta': {},
    #  'namespace': '0',
    #  'rule': 'HelloWorld',
    #  'strings': [{'data': 'Hello world',
    #               'flags': 19,
    #               'identifier': '$a',
    #               'offset': 0}],
    #  'tags': []}
    #
    r_rule = yara_results['rule']
    r_matches = yara_results['matches']
    r_tags = ",".join(yara_results['tags']) if yara_results['tags'] else "No tags"
    r_namespace = int(yara_results['namespace'])

    # Fixing payload
    r_payload = "#".join(["Data: '%s' (flags: %s # offset: %s)" % (x['data'], x['flags'], x['offset'])
                          for x
                          in yara_results['strings']])

    # Make Results structure
    r = Results(rule=r_rule,
                matches=r_matches,
                payload=r_payload,
                namespace=r_namespace,
                description="Rule %s was %sfound a match in "
                            "binary '%s' with tags: %s" % (r_rule,
                                                           "" if r_matches else "NOT ",
                                                           input_parameters.sample_file_name,
                                                           r_tags))

    # Send info to exporter
    celery.send_task("framework.tasks.export_results_task.export_to_csv", (input_parameters, r))

Generador de resultados

Los resultados serán generados de forma asíncrona, al igual que la recepción de éstos. Para ello, se ha creado una tarea de Celery, a la espera de recibir nueva información, con la que generar los resultados deseados.

La herramienta genera los resultados a través de la función yara_callback(...). Ésta, una vez hecha la transformación, hace una llamada la a la tarea generadora de resultados, llamada: export_to_csv(...).

Hay varias formas de llamar a una tarea de Celery, como se puede estudiar en BH-001. En este caso nos hemos decantado por la opción my_task.send_task("..."), como se ha visto en la :ref:` sección anterior <mh-001-results-analysis-section>`.

La función, convertida en tarea, export_to_csv(...) hace 3 cosas muy sencillas:

  1. Una comprobación mínima de los parámetros de entrada.
  2. Abre un fichero, siguiendo las recomendaciones de LP-005, en modo “append” o para añadir información al final de éste.
  3. Escribe una nueva linea en el fichero en formato csv

Tal y como podemos ver en la lineas señaladas:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@celery.task
def export_to_csv(input_params, results):
    """
    Export results to CSV file.
    
    :param input_params: Global Parameters instance with input configuration
    :type input_params: Parameters
    
    :param results: Results object with analysis results.
    :type results: Results
    """

    if not isinstance(input_params, Parameters):
        raise TypeError("Expected Parameters, got '%s' instead" % type(input_params))

    with open("%s.csv" % input_params.output_file, "a") as f:
        csv_writer = csv.writer(f)

        # Title
        # csv_writer.writerow(["# Rule", "matches", "payload", "description"])

        csv_writer.writerow([
            results.rule,
            results.matches,
            results.payload,
            results.namespace,
            results.description
        ])

Añadir nuevas muestras

Añadir nuevas muestras, es equivalente a decir: enviar nuevas muestras a la cola de análisis, para que sean procesadas cuando toque su turno.

Para este PoC se ha optado por algo sencillo: un linea de comandos (en secciones anteriores se muestra cómo ejecutarse).

Hemos tenido en cuenta las siguientes medidas o recomendaciones:

  • Hemos usado la librería standard argparser, siguiendo la sintaxis de la documentación oficial, y teniendo en cuenta las buenas prácticas especificadas en IT-001.
  • Además, hemos prevenido la ejecución accidental o a destiempo aplicando LP-004.
  • Hemos centralizado la ejecución en un punto, usando ST-004, dejando preparado el proyecto para otras interfaces de usuario.
  • Hemos usado un punto central para almacenar los parámetros de ejecución del usuario, como se recomienda en ST-002.

Tal y como podemos ver en la lineas señaladas:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import argparse

from api import Parameters, run_all

# ----------------------------------------------------------------------
if __name__ == '__main__':

    parser = argparse.ArgumentParser(description='OMSTD Malware')
    parser.add_argument("-p", "--sample-path", type=str, dest="sample_path", help="binary sample path", required=True)
    parser.add_argument("-v", dest="verbosity", type=int, help="enable verbose mode", default=False)
    parser.add_argument("-o", dest="output_file", type=str, help="output file name", default=None)
    parser.add_argument("--rules-path", dest="rules_path", type=str, help="yara rules path (default .rules/)",
                        default=None)

    params = parser.parse_args()

    input_parameters = Parameters(sample_path=params.sample_path,
                                  output_file=params.output_file,
                                  verbosity=params.verbosity,
                                  rules_path=params.rules_path)

    run_all(input_parameters)

La ejecución tiene lugar a través de la función run_all(...), incluido en el fichero api.py. Si observamos el código, podemos comprobar que lo que hace dicha función es una llamada a la tarea de análisis, con sintaxis Celery:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from lib.data import Parameters, Results
from framework.celery.celery import celery
from framework.tasks.yara_task import yara_task


# ----------------------------------------------------------------------
def run_all(input_parameters):

    # Display results
    yara_task.delay(input_parameters)