|
1 | 1 | # coding: utf-8 |
| 2 | +from __future__ import annotations |
2 | 3 |
|
3 | 4 | import hashlib |
4 | 5 | import os |
|
12 | 13 | import flow.record.fieldtypes |
13 | 14 | from flow.record import RecordDescriptor, RecordReader, RecordWriter |
14 | 15 | from flow.record.fieldtypes import ( |
15 | | - PATH_POSIX, |
16 | | - PATH_WINDOWS, |
17 | 16 | PY_312, |
| 17 | + TYPE_POSIX, |
| 18 | + TYPE_WINDOWS, |
18 | 19 | _is_posixlike_path, |
19 | 20 | _is_windowslike_path, |
| 21 | + command, |
20 | 22 | ) |
21 | 23 | from flow.record.fieldtypes import datetime as dt |
22 | | -from flow.record.fieldtypes import fieldtype_for_value, net, uri, windows_path |
| 24 | +from flow.record.fieldtypes import ( |
| 25 | + fieldtype_for_value, |
| 26 | + net, |
| 27 | + posix_command, |
| 28 | + uri, |
| 29 | + windows_command, |
| 30 | + windows_path, |
| 31 | +) |
23 | 32 |
|
24 | 33 | UTC = timezone.utc |
25 | 34 |
|
@@ -639,16 +648,16 @@ def test_path(): |
639 | 648 | assert isinstance(test_path, flow.record.fieldtypes.windows_path) |
640 | 649 |
|
641 | 650 | test_path = flow.record.fieldtypes.path.from_posix(posix_path_str) |
642 | | - assert test_path._pack() == (posix_path_str, PATH_POSIX) |
| 651 | + assert test_path._pack() == (posix_path_str, TYPE_POSIX) |
643 | 652 |
|
644 | | - test_path = flow.record.fieldtypes.path._unpack((posix_path_str, PATH_POSIX)) |
| 653 | + test_path = flow.record.fieldtypes.path._unpack((posix_path_str, TYPE_POSIX)) |
645 | 654 | assert str(test_path) == posix_path_str |
646 | 655 | assert isinstance(test_path, flow.record.fieldtypes.posix_path) |
647 | 656 |
|
648 | 657 | test_path = flow.record.fieldtypes.path.from_windows(windows_path_str) |
649 | | - assert test_path._pack() == (windows_path_str, PATH_WINDOWS) |
| 658 | + assert test_path._pack() == (windows_path_str, TYPE_WINDOWS) |
650 | 659 |
|
651 | | - test_path = flow.record.fieldtypes.path._unpack((windows_path_str, PATH_WINDOWS)) |
| 660 | + test_path = flow.record.fieldtypes.path._unpack((windows_path_str, TYPE_WINDOWS)) |
652 | 661 | assert str(test_path) == windows_path_str |
653 | 662 | assert isinstance(test_path, flow.record.fieldtypes.windows_path) |
654 | 663 |
|
@@ -998,5 +1007,130 @@ def test_datetime_comparisions(): |
998 | 1007 | assert dt("2023-01-02") != datetime(2023, 3, 4, tzinfo=UTC) |
999 | 1008 |
|
1000 | 1009 |
|
| 1010 | +def test_command_record() -> None: |
| 1011 | + TestRecord = RecordDescriptor( |
| 1012 | + "test/command", |
| 1013 | + [ |
| 1014 | + ("command", "commando"), |
| 1015 | + ], |
| 1016 | + ) |
| 1017 | + |
| 1018 | + record = TestRecord(commando="help.exe -h") |
| 1019 | + assert isinstance(record.commando, posix_command) |
| 1020 | + assert record.commando.executable == "help.exe" |
| 1021 | + assert record.commando.args == ["-h"] |
| 1022 | + |
| 1023 | + record = TestRecord(commando="something.so -h -q -something") |
| 1024 | + assert isinstance(record.commando, posix_command) |
| 1025 | + assert record.commando.executable == "something.so" |
| 1026 | + assert record.commando.args == ["-h", "-q", "-something"] |
| 1027 | + |
| 1028 | + |
| 1029 | +def test_command_integration(tmp_path: pathlib.Path) -> None: |
| 1030 | + TestRecord = RecordDescriptor( |
| 1031 | + "test/command", |
| 1032 | + [ |
| 1033 | + ("command", "commando"), |
| 1034 | + ], |
| 1035 | + ) |
| 1036 | + |
| 1037 | + with RecordWriter(tmp_path / "command_record") as writer: |
| 1038 | + record = TestRecord(commando=r"\\.\\?\some_command.exe -h,help /d quiet") |
| 1039 | + writer.write(record) |
| 1040 | + assert record.commando.executable == r"\\.\\?\some_command.exe" |
| 1041 | + assert record.commando.args == [r"-h,help /d quiet"] |
| 1042 | + |
| 1043 | + with RecordReader(tmp_path / "command_record") as reader: |
| 1044 | + for record in reader: |
| 1045 | + assert record.commando.executable == r"\\.\\?\some_command.exe" |
| 1046 | + assert record.commando.args == [r"-h,help /d quiet"] |
| 1047 | + |
| 1048 | + |
| 1049 | +def test_command_integration_none(tmp_path: pathlib.Path) -> None: |
| 1050 | + TestRecord = RecordDescriptor( |
| 1051 | + "test/command", |
| 1052 | + [ |
| 1053 | + ("command", "commando"), |
| 1054 | + ], |
| 1055 | + ) |
| 1056 | + |
| 1057 | + with RecordWriter(tmp_path / "command_record") as writer: |
| 1058 | + record = TestRecord(commando=command.from_posix(None)) |
| 1059 | + writer.write(record) |
| 1060 | + with RecordReader(tmp_path / "command_record") as reader: |
| 1061 | + for record in reader: |
| 1062 | + assert record.commando.executable is None |
| 1063 | + assert record.commando.args is None |
| 1064 | + |
| 1065 | + |
| 1066 | +@pytest.mark.parametrize( |
| 1067 | + "command_string, expected_executable, expected_argument", |
| 1068 | + [ |
| 1069 | + # Test relative windows paths |
| 1070 | + ("windows.exe something,or,somethingelse", "windows.exe", ["something,or,somethingelse"]), |
| 1071 | + # Test weird command strings for windows |
| 1072 | + ("windows.dll something,or,somethingelse", "windows.dll", ["something,or,somethingelse"]), |
| 1073 | + # Test environment variables |
| 1074 | + (r"%WINDIR%\\windows.dll something,or,somethingelse", r"%WINDIR%\\windows.dll", ["something,or,somethingelse"]), |
| 1075 | + # Test a quoted path |
| 1076 | + (r"'c:\path to some exe' /d /a", r"c:\path to some exe", [r"/d /a"]), |
| 1077 | + # Test a unquoted path |
| 1078 | + (r"'c:\Program Files\hello.exe'", r"c:\Program Files\hello.exe", []), |
| 1079 | + # Test an unquoted path with a path as argument |
| 1080 | + (r"'c:\Program Files\hello.exe' c:\startmepls.exe", r"c:\Program Files\hello.exe", [r"c:\startmepls.exe"]), |
| 1081 | + (None, None, None), |
| 1082 | + ], |
| 1083 | +) |
| 1084 | +def test_command_windows(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: |
| 1085 | + cmd = windows_command(command_string) |
| 1086 | + |
| 1087 | + assert cmd.executable == expected_executable |
| 1088 | + assert cmd.args == expected_argument |
| 1089 | + |
| 1090 | + |
| 1091 | +@pytest.mark.parametrize( |
| 1092 | + "command_string, expected_executable, expected_argument", |
| 1093 | + [ |
| 1094 | + # Test relative posix command |
| 1095 | + ("some_file.so -h asdsad -f asdsadas", "some_file.so", ["-h", "asdsad", "-f", "asdsadas"]), |
| 1096 | + # Test command with spaces |
| 1097 | + (r"/bin/hello\ world -h -word", r"/bin/hello world", ["-h", "-word"]), |
| 1098 | + ], |
| 1099 | +) |
| 1100 | +def test_command_posix(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: |
| 1101 | + cmd = posix_command(command_string) |
| 1102 | + |
| 1103 | + assert cmd.executable == expected_executable |
| 1104 | + assert cmd.args == expected_argument |
| 1105 | + |
| 1106 | + |
| 1107 | +def test_command_equal() -> None: |
| 1108 | + assert command("hello.so -h") == command("hello.so -h") |
| 1109 | + assert command("hello.so -h") != command("hello.so") |
| 1110 | + |
| 1111 | + # Test different types with the comparitor |
| 1112 | + assert command("hello.so -h") == ["hello.so", "-h"] |
| 1113 | + assert command("hello.so -h") == ("hello.so", "-h") |
| 1114 | + assert command("hello.so -h") == "hello.so -h" |
| 1115 | + assert command("c:\\hello.dll -h -b") == "c:\\hello.dll -h -b" |
| 1116 | + |
| 1117 | + # Compare paths that contain spaces |
| 1118 | + assert command("'/home/some folder/file' -h") == "'/home/some folder/file' -h" |
| 1119 | + assert command("'c:\\Program files\\some.dll' -h -q") == "'c:\\Program files\\some.dll' -h -q" |
| 1120 | + assert command("'c:\\program files\\some.dll' -h -q") == ["c:\\program files\\some.dll", "-h -q"] |
| 1121 | + assert command("'c:\\Program files\\some.dll' -h -q") == ("c:\\Program files\\some.dll", "-h -q") |
| 1122 | + |
| 1123 | + # Test failure conditions |
| 1124 | + assert command("hello.so -h") != 1 |
| 1125 | + assert command("hello.so") != "hello.so -h" |
| 1126 | + assert command("hello.so") != ["hello.so", ""] |
| 1127 | + assert command("hello.so") != ("hello.so", "") |
| 1128 | + |
| 1129 | + |
| 1130 | +def test_command_failed() -> None: |
| 1131 | + with pytest.raises(ValueError): |
| 1132 | + command(b"failed") |
| 1133 | + |
| 1134 | + |
1001 | 1135 | if __name__ == "__main__": |
1002 | 1136 | __import__("standalone_test").main(globals()) |
0 commit comments