22
33class StreamCounter (object ):
44 '''
5+ A class whose responsibility is to get the count of items
6+ in data coming as a stream.
57 '''
6- #TODO Doctests and examples
8+ #TODO Doctests and examples
9+ # When we receive a stream of data, we fix the max size of chunk
10+ # Think of chunk as a container, which can only fit a fixed no. of items
11+ # This will help us to keep control over RAM usage
712 DEFAULT_CHUNK_SIZE = 1000000
13+ # When we have a container, we also want to count the occurrences of items
14+ # Max count will be the maximum occurrence of an item
815 DEFAULT_MAX_COUNTS = 1000000
916
1017 def __init__ (self , chunk_size = DEFAULT_CHUNK_SIZE ,
@@ -38,9 +45,40 @@ def __init__(self, chunk_size=DEFAULT_CHUNK_SIZE,
3845 self .counts_total = 0
3946
4047 def add (self , item , count = 1 ):
41- self .counts [item ] += count
42- self .counts_total += count
43-
48+ '''
49+ When we receive a stream of data, we add its items to the chunk,
50+ which has a limit on the no. of items that it will store.
51+ >>> s = StreamCounter(5,5)
52+ >>> data_stream = ['a','b','c','d']
53+ >>> for item in data_stream:
54+ ... s.add(item)
55+ >>> s.chunk_size
56+ 5
57+ >>> s.n_items_seen
58+ 4
59+ >>> s.n_chunk_items_seen
60+ 4
61+ >>> s.n_chunks
62+ 0
63+ >>> from pprint import pprint
64+ >>> pprint(s.chunked_counts.get(s.n_chunks, {}))
65+ {'a': 1, 'b': 1, 'c': 1, 'd': 1}
66+ >>> s.counts_total
67+ 4
68+ >>> data_stream = ['a','b','c','d','e','f','g','e']
69+ >>> for item in data_stream:
70+ ... s.add(item)
71+ >>> s.chunk_size
72+ 5
73+ >>> s.n_items_seen
74+ 12
75+ >>> s.n_chunk_items_seen
76+ 2
77+ >>> s.n_chunks
78+ 2
79+ >>> s.chunked_counts.get(s.n_chunks, {})
80+ {'g': 1, 'e': 1}
81+ '''
4482 self .n_items_seen += count
4583 self .n_chunk_items_seen += count
4684
@@ -67,6 +105,27 @@ def add(self, item, count=1):
67105 self ._drop_oldest_chunk ()
68106
69107 def _drop_oldest_chunk (self ):
108+ '''
109+ Handles the case when the number of items coming into the chunk
110+ exceeds the maximum capacity of the chunk. Our intent
111+ is to remove the oldest chunk, so that new items can keep
112+ flowing in.
113+ >>> s = StreamCounter(5,5)
114+ >>> data_stream = ['a','b','c','d']
115+ >>> for item in data_stream:
116+ ... s.add(item)
117+ >>> min(s.chunked_counts.keys())
118+ 0
119+ >>> s.chunked_counts
120+ {0: {'a': 1, 'b': 1, 'c': 1, 'd': 1}}
121+ >>> data_stream = ['a','b','c','d','a','e','f']
122+ >>> for item in data_stream:
123+ ... s.add(item)
124+ >>> min(s.chunked_counts.keys())
125+ 2
126+ >>> s.chunked_counts
127+ {2: {'f': 1}}
128+ '''
70129 chunk_id = min (self .chunked_counts .keys ())
71130 chunk = self .chunked_counts .pop (chunk_id )
72131
@@ -76,6 +135,37 @@ def _drop_oldest_chunk(self):
76135 self .counts_total -= v
77136
78137 def get (self , item , default = 0 , normalized = False ):
138+ '''
139+ When we have the stream of data pushed into the chunk,
140+ we can retrieve the count of an item using this method.
141+ >>> stream_counter_obj = StreamCounter(5,5)
142+ >>> data_stream = ['a','b','c']
143+ >>> for item in data_stream:
144+ ... stream_counter_obj.add(item)
145+ >>> stream_counter_obj.get('a')
146+ 1
147+ >>> stream_counter_obj.get('b')
148+ 1
149+ >>> stream_counter_obj.get('c')
150+ 1
151+ >>> stream_counter_obj.get('d')
152+ 0
153+ >>> data_stream.extend(['d','e','f'])
154+ >>> for item in data_stream:
155+ ... stream_counter_obj.add(item)
156+ >>> stream_counter_obj.get('a')
157+ 0
158+ >>> stream_counter_obj.get('b')
159+ 0
160+ >>> stream_counter_obj.get('c')
161+ 1
162+ >>> stream_counter_obj.get('d')
163+ 1
164+ >>> stream_counter_obj.get('e')
165+ 1
166+ >>> stream_counter_obj.get('f')
167+ 1
168+ '''
79169 c = self .counts .get (item , default )
80170 if not normalized :
81171 return c
0 commit comments