forked from marcboeker/gmail-to-sqlite
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync.py
More file actions
122 lines (92 loc) · 3.44 KB
/
sync.py
File metadata and controls
122 lines (92 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from email.utils import parseaddr, parsedate_to_datetime
from googleapiclient.discovery import build
from peewee import IntegrityError
import db
import message
MAX_RESULTS = 500
def get_labels(service) -> dict:
"""
Retrieves all labels from the Gmail API for the authenticated user.
Args:
service (object): The Gmail API service object.
Returns:
dict: A dictionary containing the labels, where the key is the label ID and the value is the label name.
"""
# Get all labels
labels = {}
for label in service.users().labels().list(userId="me").execute()["labels"]:
labels[label["id"]] = label["name"]
return labels
def all_messages(credentials, full_sync=False) -> int:
"""
Fetches messages from the Gmail API using the provided credentials.
Args:
credentials (object): The credentials object used to authenticate the API request.
full_sync (bool): Whether to do a full sync or not.
Returns:
int: The number of messages fetched.
"""
query = []
if not full_sync:
last = db.last_indexed()
if last:
query.append(f"after:{int(last.timestamp())}")
first = db.first_indexed()
if first:
query.append(f"before:{int(first.timestamp())}")
service = build("gmail", "v1", credentials=credentials)
labels = get_labels(service)
page_token = None
run = True
total_messages = 0
while run:
results = (
service.users()
.messages()
.list(
userId="me",
maxResults=MAX_RESULTS,
pageToken=page_token,
q=" | ".join(query),
)
.execute()
)
messages = results.get("messages", [])
total_messages += len(messages)
for i, m in enumerate(messages, start=total_messages - len(messages) + 1):
try:
raw_msg = (
service.users().messages().get(userId="me", id=m["id"]).execute()
)
msg = message.Message.from_raw(raw_msg, labels)
db.create_message(msg)
except IntegrityError as e:
print(f"Could not process message {m['id']}: {str(e)}")
except TimeoutError as e:
print(f"Could not get message from Gmail {m['id']}: {str(e)}")
print(f"Synced message {msg.id} from {msg.timestamp} (Count: {i})")
if "nextPageToken" in results:
page_token = results["nextPageToken"]
else:
run = False
return total_messages
def single_message(credentials, message_id: str) -> None:
"""
Syncs a single message from Gmail using the provided credentials and message ID.
Args:
credentials: The credentials used to authenticate the Gmail API.
message_id: The ID of the message to fetch.
Returns:
None
"""
service = build("gmail", "v1", credentials=credentials)
labels = get_labels(service)
try:
raw_msg = service.users().messages().get(userId="me", id=message_id).execute()
msg = message.Message.from_raw(raw_msg, labels)
db.create_message(msg)
except IntegrityError as e:
print(f"Could not process message {message_id}: {str(e)}")
except TimeoutError as e:
print(f"Could not get message from Gmail {message_id}: {str(e)}")
print(f"Synced message {message_id} from {msg.timestamp}")