diff --git a/tests/test_buffer_work_space.py b/tests/test_buffer_work_space.py deleted file mode 100644 index 9330cf6..0000000 --- a/tests/test_buffer_work_space.py +++ /dev/null @@ -1,349 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -"""Tests for `buffer_work_space` module.""" - -import io -import os -import tempfile -import pytest -from pytest_mock import MockerFixture -from file_read_backwards.buffer_work_space import BufferWorkSpace -from file_read_backwards.buffer_work_space import new_lines_bytes -from file_read_backwards.buffer_work_space import _find_furthest_new_line -from file_read_backwards.buffer_work_space import _remove_trailing_new_line -from file_read_backwards.buffer_work_space import _get_file_size -from file_read_backwards.buffer_work_space import _is_partially_read_new_line -from file_read_backwards.buffer_work_space import _get_what_to_read_next -from file_read_backwards.buffer_work_space import _get_next_chunk - - -class TestFindFurthestNewLine: - def test_find_furthest_new_line_with_no_new_line_in_empty_byte_string(self): - test_string = b"" - r = _find_furthest_new_line(test_string) - assert r == -1 - - def test_find_furthest_new_line_with_no_new_line_in_non_empty_byte_string(self): - test_string = b"SomeRandomCharacters" - r = _find_furthest_new_line(test_string) - assert r == -1 - - def test_find_furthest_new_line_with_bytestring_with_new_line_at_the_end(self): - base_string = b"SomeRandomCharacters" - for n in new_lines_bytes: - test_string = base_string + n - expected_value = len(test_string) - 1 - r = _find_furthest_new_line(test_string) - assert r == expected_value - - def test_find_furthest_new_line_with_bytestring_with_new_line_in_the_middle(self): - base_string = b"SomeRandomCharacters" - for n in new_lines_bytes: - test_string = base_string + n + base_string - expected_value = len(base_string) + len(n) - 1 - r = _find_furthest_new_line(test_string) - assert r == expected_value - - def test_find_furthest_new_line_with_bytestring_with_new_line_in_the_middle_and_end(self): - base_string = b"SomeRandomCharacters" - for n in new_lines_bytes: - test_string = base_string + n + base_string + n - expected_value = len(test_string) - 1 - r = _find_furthest_new_line(test_string) - assert r == expected_value - - -class TestRemoveTrailingNewLine: - def test_remove_trailing_new_line_with_empty_byte_string(self): - test_string = b"" - expected_string = test_string - r = _remove_trailing_new_line(test_string) - assert r == expected_string - - def test_remove_trailing_new_line_with_non_empty_byte_string_with_no_new_line(self): - test_string = b"Something" - expected_string = test_string - r = _remove_trailing_new_line(test_string) - assert r == expected_string - - def test_remove_trailing_new_line_with_non_empty_byte_string_with_variety_of_new_lines(self): - expected_str = b"Something" - for n in new_lines_bytes: - test_string = expected_str + n - r = _remove_trailing_new_line(test_string) - assert r == expected_str - - def test_remove_trailing_new_line_with_non_empty_byte_string_with_variety_of_new_lines_in_the_middle(self): - base_string = b"Something" - for n in new_lines_bytes: - test_string = base_string + n + base_string - expected_string = test_string - r = _remove_trailing_new_line(test_string) - assert r == expected_string - - -class TestGetFileSize: - def test_empty_file(self): - with tempfile.NamedTemporaryFile(delete=False) as t: - pass - expected_value = 0 - with io.open(t.name, mode="rb") as fp: - r = _get_file_size(fp) - assert r == expected_value - os.unlink(t.name) - - def test_file_with_eight_bytes(self): - with tempfile.NamedTemporaryFile(delete=False) as t: - t.write(b"a" * 8) - expected_value = 8 - with io.open(t.name, mode="rb") as fp: - r = _get_file_size(fp) - assert r == expected_value - os.unlink(t.name) - - -class TestIsPartiallyReadNewLine: - def test_when_we_have_a_partially_read_new_line(self): - for n in new_lines_bytes: - if len(n) > 1: - b = n[-1] - r = _is_partially_read_new_line(b) - assert r - - -class TestGetWhatToReadNext: - def test_with_empty_file(self): - with tempfile.NamedTemporaryFile(delete=False) as t: - pass - expected_result = (0, 0) - with io.open(t.name, mode="rb") as fp: - r = _get_what_to_read_next(fp, previously_read_position=0, chunk_size=3) - assert r == expected_result - os.unlink(t.name) - - def test_with_file_with_seven_bytes_of_alphanumeric(self): - with tempfile.NamedTemporaryFile(delete=False) as t: - t.write(b"abcdefg") - expected_result = (4, 3) - with io.open(t.name, mode="rb") as fp: - r = _get_what_to_read_next(fp, previously_read_position=7, chunk_size=3) - assert r == expected_result - os.unlink(t.name) - - def test_with_file_with_single_new_line(self): - for n in new_lines_bytes: - with tempfile.NamedTemporaryFile(delete=False) as t: - t.write(n) - expected_result = (0, len(n)) - chunk_size = len(n) + 1 - with io.open(t.name, mode="rb") as fp: - r = _get_what_to_read_next(fp, previously_read_position=len(n), chunk_size=chunk_size) - assert r == expected_result - os.unlink(t.name) - - def test_with_file_where_we_need_to_read_more_than_chunk_size(self): - with tempfile.NamedTemporaryFile(delete=False) as t: - t.write(b"abcd\nfg") - expected_result = (3, 4) - with io.open(t.name, mode="rb") as fp: - r = _get_what_to_read_next(fp, previously_read_position=7, chunk_size=3) - assert r == expected_result - os.unlink(t.name) - - -class TestGetNextChunk: - def test_with_empty_file(self): - with tempfile.NamedTemporaryFile(delete=False) as t: - pass - expected_result = (b"", 0) - with io.open(t.name, mode="rb") as fp: - r = _get_next_chunk(fp, previously_read_position=0, chunk_size=3) - assert r == expected_result - os.unlink(t.name) - - def test_with_non_empty_file(self): - with tempfile.NamedTemporaryFile(delete=False) as t: - t.write(b"abcdefg") - expected_result = (b"efg", 4) - with io.open(t.name, mode="rb") as fp: - r = _get_next_chunk(fp, previously_read_position=7, chunk_size=3) - assert r == expected_result - os.unlink(t.name) - - def test_with_non_empty_file_where_we_read_more_than_chunk_size(self): - with tempfile.NamedTemporaryFile(delete=False) as t: - t.write(b"abcd\nfg") - expected_result = (b"d\nfg", 3) - with io.open(t.name, mode="rb") as fp: - r = _get_next_chunk(fp, previously_read_position=7, chunk_size=3) - assert r == expected_result - os.unlink(t.name) - - -class TestBufferWorkSpace: - def test_add_to_empty_buffer_work_space(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.add_to_buffer(content=b"aaa", read_position=1021) - assert b.read_buffer == b"aaa" - assert b.read_position == 1021 - - def test_add_to_non_empty_buffer_work_space(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.add_to_buffer(content=b"aaa", read_position=1021) - b.add_to_buffer(content=b"bbb", read_position=1018) - assert b.read_buffer == b"bbbaaa" - assert b.read_position == 1018 - - def test_yieldable_for_new_initialized_buffer_work_space(self): - with tempfile.NamedTemporaryFile(delete=False) as t: - with io.open(t.name, mode="rb") as fp: - b = BufferWorkSpace(fp, chunk_size=io.DEFAULT_BUFFER_SIZE) - r = b.yieldable() - assert not r - os.unlink(t.name) - - def test_yieldable_for_unexhausted_buffer_space_with_single_new_line(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - for n in new_lines_bytes: - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 1024 - len(n) - b.read_buffer = n - expected_result = False - r = b.yieldable() - assert r == expected_result - - def test_yieldable_for_buffer_space_with_two_new_lines(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - for n in new_lines_bytes: - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 1024 - (len(n) * 2) - b.read_buffer = n * 2 - expected_result = True - r = b.yieldable() - assert r == expected_result - - def test_yieldable_for_fully_read_with_unreturned_contents_in_buffer_space(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - for n in new_lines_bytes: - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 0 - b.read_buffer = b"" - expected_result = True - r = b.yieldable() - assert r == expected_result - - def test_yieldable_for_fully_read_and_returned_contents_in_buffer_space(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - for n in new_lines_bytes: - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 0 - b.read_buffer = None - r = b.yieldable() - assert not r - - def test_return_line_with_buffer_space_with_two_new_lines(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - for n in new_lines_bytes: - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 1024 - (len(n) * 2) - b.read_buffer = n * 2 - expected_result = b"" - r = b.return_line() - assert r == expected_result - - def test_return_line_with_buffer_space_with_some_contents_between_two_new_lines(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - for n in new_lines_bytes: - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 1024 - (len(n) * 2) - b.read_buffer = n + b"Something" + n - expected_result = b"Something" - r = b.return_line() - assert r == expected_result - - def test_return_line_with_buffer_space_with_fully_read_in_contents_at_its_last_line(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - for n in new_lines_bytes: - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 0 - b.read_buffer = b"LastLineYay" - expected_result = b"LastLineYay" - r = b.return_line() - assert r == expected_result - - def test_return_line_contract_violation(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 0 - - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - with pytest.raises(AssertionError): - b.return_line() - - def test_has_returned_every_line_empty_file(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 0 - - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - r = b.has_returned_every_line() - assert r - - def test_has_returned_every_line_with_not_fully_read_in_buffer_space(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 1 - r = b.has_returned_every_line() - assert not r - - def test_has_returned_every_line_with_fully_read_in_and_unprocessed_buffer_space(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 0 - b.read_buffer = b"abc" - r = b.has_returned_every_line() - assert not r - - def test_has_returned_every_line_with_fully_read_in_and_processed_buffer_space(self, mocker: MockerFixture): - _get_file_size_mock = mocker.patch("file_read_backwards.buffer_work_space._get_file_size") - fp_mock = mocker.Mock() - _get_file_size_mock.return_value = 1024 - - b = BufferWorkSpace(fp_mock, chunk_size=io.DEFAULT_BUFFER_SIZE) - b.read_position = 0 - b.read_buffer = None - r = b.has_returned_every_line() - assert r diff --git a/tests/test_file_read_backwards.py b/tests/test_file_read_backwards.py index 060bc49..8f8b6e6 100644 --- a/tests/test_file_read_backwards.py +++ b/tests/test_file_read_backwards.py @@ -14,13 +14,6 @@ from file_read_backwards.buffer_work_space import new_lines -# doing this xrange/range dance so that we don't need to add additional dependencies of future or six modules -try: - xrange -except NameError: - xrange = range - - created_files = set() @@ -32,7 +25,7 @@ def helper_write(t, s, encoding="utf-8"): def helper_create_temp_file(generator=None, encoding='utf-8'): global created_files if generator is None: - generator = ("line {}!\n".format(i) for i in xrange(42)) + generator = ("line {}!\n".format(i) for i in range(42)) temp_file = tempfile.NamedTemporaryFile(delete=False) for line in generator: helper_write(temp_file, line, encoding) @@ -69,85 +62,34 @@ def cleanup_files(): helper_destroy_temp_files() -class TestFileReadBackwards: - def test_with_completely_empty_file(self, empty_file): - f = FileReadBackwards(empty_file.name) - expected_lines = deque() - lines_read = deque() - for line in f: - lines_read.appendleft(line) - assert expected_lines == lines_read - - def test_file_with_a_single_new_line_char_with_different_encodings(self): +class TestFileReadBackwardsReadline: + """Test class focused specifically on the readline() functionality.""" + + def test_readline_with_completely_empty_file(self, empty_file): + """Test readline on completely empty file returns empty string.""" + with FileReadBackwards(empty_file.name) as f: + line = f.readline() + assert line == "" + + # Subsequent calls should also return empty string + line = f.readline() + assert line == "" + + def test_readline_with_single_newline_char_different_encodings(self): + """Test readline with files containing only a newline character in different encodings.""" for encoding, new_line in itertools.product(supported_encodings, new_lines): temp_file = helper_create_temp_file((line for line in [new_line]), encoding=encoding) - f = FileReadBackwards(temp_file.name) - expected_lines = deque([""]) - lines_read = deque() - for line in f: - lines_read.appendleft(line) - assert expected_lines == lines_read - - def test_file_with_one_line_of_text_with_accented_char_followed_by_a_new_line(self): - b = b'Caf\xc3\xa9' # accented e in utf-8 - s = b.decode("utf-8") - for new_line in new_lines: - temp_file = helper_create_temp_file((line for line in [s, new_line])) - f = FileReadBackwards(temp_file.name) - expected_lines = deque([s]) - lines_read = deque() - for line in f: - lines_read.appendleft(s) - assert expected_lines == lines_read - - def test_file_with_one_line_of_text_followed_by_a_new_line_with_different_encodings(self): - for encoding, new_line in itertools.product(supported_encodings, new_lines): - temp_file = helper_create_temp_file((line for line in ["something{0}".format(new_line)]), encoding=encoding) - f = FileReadBackwards(temp_file.name) - expected_lines = deque(["something"]) - lines_read = deque() - for line in f: - lines_read.appendleft(line) - assert expected_lines == lines_read - - def test_file_with_varying_number_of_new_lines_and_some_text_in_chunk_size(self): - chunk_size = 3 - s = "t" - for number_of_new_lines in xrange(21): - for new_line in new_lines: - temp_file = helper_create_temp_file((line for line in [new_line * number_of_new_lines, s * chunk_size])) - f = FileReadBackwards(temp_file.name, chunk_size=chunk_size) - expected_lines = deque() - for _ in xrange(number_of_new_lines): - expected_lines.append("") - expected_lines.append(s * chunk_size) - lines_read = deque() - for line in f: - lines_read.appendleft(line) - assert expected_lines == lines_read - - def test_file_with_new_lines_and_some_accented_characters_in_chunk_size(self): - chunk_size = 3 - b = b'\xc3\xa9' - s = b.decode("utf-8") - for number_of_new_lines in xrange(21): - for new_line in new_lines: - temp_file = helper_create_temp_file((line for line in [new_line * number_of_new_lines, s * chunk_size])) - f = FileReadBackwards(temp_file.name, chunk_size=chunk_size) - expected_lines = deque() - for _ in xrange(number_of_new_lines): - expected_lines.append("") - expected_lines.append(s * chunk_size) - lines_read = deque() - for line in f: - lines_read.appendleft(line) - assert expected_lines == lines_read - - def test_unsupported_encoding(self, empty_file): - with pytest.raises(NotImplementedError): - _ = FileReadBackwards(empty_file.name, encoding="not-supported-encoding") + with FileReadBackwards(temp_file.name, encoding=encoding) as f: + line = f.readline() + expected_line = "" + os.linesep + assert line == expected_line + + # Next readline should return empty string + line = f.readline() + assert line == "" - def test_file_with_one_line_of_text_readline(self): + def test_readline_with_single_line_text_and_newline(self): + """Test readline with a single line of text followed by newline.""" s = "Line0" for new_line in new_lines: temp_file = helper_create_temp_file((line for line in [s, new_line])) @@ -160,7 +102,8 @@ def test_file_with_one_line_of_text_readline(self): expected_second_line = "" assert second_line == expected_second_line - def test_file_with_two_lines_of_text_readline(self): + def test_readline_with_two_lines_of_text(self): + """Test readline with two lines of text - should read in reverse order.""" line0 = "Line0" line1 = "Line1" for new_line in new_lines: @@ -168,18 +111,122 @@ def test_file_with_two_lines_of_text_readline(self): line1_with_n = "{}{}".format(line1, new_line) temp_file = helper_create_temp_file((line for line in [line0_with_n, line1_with_n])) with FileReadBackwards(temp_file.name) as fp: + # First readline should return the last line (Line1) line = fp.readline() expected_line = line1 + os.linesep assert line == expected_line + # Second readline should return the first line (Line0) second_line = fp.readline() expected_second_line = line0 + os.linesep assert second_line == expected_second_line + # Third readline should return empty string third_line = fp.readline() expected_third_line = "" assert third_line == expected_third_line + def test_readline_with_multiple_lines(self): + """Test readline with multiple lines to ensure proper reverse order.""" + lines = ["First line", "Second line", "Third line", "Fourth line"] + for new_line in new_lines: + lines_with_newline = [line + new_line for line in lines] + temp_file = helper_create_temp_file((line for line in lines_with_newline)) + with FileReadBackwards(temp_file.name) as fp: + # Read all lines and verify they come in reverse order + read_lines = [] + while True: + line = fp.readline() + if line == "": + break + # Remove the os.linesep that readline() adds + clean_line = line.rstrip(os.linesep) + read_lines.append(clean_line) + + # Should read lines in reverse order + expected_order = list(reversed(lines)) + assert read_lines == expected_order + + def test_readline_with_accented_characters(self): + """Test readline with accented characters in different encodings.""" + b = b'Caf\xc3\xa9' # accented e in utf-8 + s = b.decode("utf-8") + for encoding in supported_encodings: + if encoding == "utf-8": # Only test with UTF-8 for accented chars + for new_line in new_lines: + temp_file = helper_create_temp_file((line for line in [s, new_line]), encoding=encoding) + with FileReadBackwards(temp_file.name, encoding=encoding) as f: + line = f.readline() + expected_line = s + os.linesep + assert line == expected_line + + def test_readline_with_varying_chunk_sizes(self): + """Test readline with different chunk sizes to ensure it works regardless of buffer size.""" + test_lines = ["Short", "A much longer line with more content", "Medium length line"] + chunk_sizes = [1, 3, 8, 16, 64] + + for chunk_size in chunk_sizes: + for new_line in new_lines: + lines_with_newline = [line + new_line for line in test_lines] + temp_file = helper_create_temp_file((line for line in lines_with_newline)) + with FileReadBackwards(temp_file.name, chunk_size=chunk_size) as fp: + read_lines = [] + while True: + line = fp.readline() + if line == "": + break + clean_line = line.rstrip(os.linesep) + read_lines.append(clean_line) + + expected_order = list(reversed(test_lines)) + assert read_lines == expected_order + + def test_readline_with_empty_lines(self): + """Test readline with files containing empty lines.""" + for new_line in new_lines: + # Create file with: line1, empty line, line2, empty line + content = ["Line1", new_line, "", new_line, "Line2", new_line, "", new_line] + temp_file = helper_create_temp_file((line for line in content)) + with FileReadBackwards(temp_file.name) as fp: + # Should read in reverse: empty, Line2, empty, Line1 + line1 = fp.readline().rstrip(os.linesep) + assert line1 == "" + + line2 = fp.readline().rstrip(os.linesep) + assert line2 == "Line2" + + line3 = fp.readline().rstrip(os.linesep) + assert line3 == "" + + line4 = fp.readline().rstrip(os.linesep) + assert line4 == "Line1" + + # Should be at end + line5 = fp.readline() + assert line5 == "" + + def test_readline_with_file_without_final_newline(self): + """Test readline with file that doesn't end with a newline.""" + content = "Line without newline" + temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w') + temp_file.write(content) + temp_file.close() + created_files.add(temp_file) + + with FileReadBackwards(temp_file.name) as fp: + line = fp.readline() + expected_line = content + os.linesep + assert line == expected_line + + # Next call should return empty + line = fp.readline() + assert line == "" + + def test_unsupported_encoding_raises_error(self, empty_file): + """Test that unsupported encoding raises NotImplementedError.""" + with pytest.raises(NotImplementedError): + _ = FileReadBackwards(empty_file.name, encoding="not-supported-encoding") + class TestFileReadBackwardsAsContextManager: @pytest.fixture(scope="class", autouse=True)