This repository was archived by the owner on Nov 27, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathzaimapi.py
More file actions
137 lines (103 loc) · 4.58 KB
/
zaimapi.py
File metadata and controls
137 lines (103 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# coding: utf-8
try:
    import urlparse  # Python 2
except ImportError:  # Python 3 moved the module to urllib.parse
    import urllib.parse as urlparse

import requests
from requests_oauthlib import OAuth1
# Endpoint URLs for Zaim API ver 0.9.1.
# API_ROOT keeps its trailing slash so relative paths can be appended as-is.
API_ROOT = "https://api.zaim.net/v1/"

# OAuth 1 handshake URLs: the authorize page lives on the www host, while the
# request-token and access-token exchanges are served under API_ROOT.
authorize_url = "https://www.zaim.net/users/auth"
request_token_url = "{0}auth/request".format(API_ROOT)
access_token_url = "{0}auth/access".format(API_ROOT)
class Zaim(object):
    """Small client for the Zaim REST API (v1) using OAuth 1 signed requests.

    Every API call is an HTTP POST made with ``requests``. A 4xx/5xx answer
    is turned into ``requests.HTTPError`` by ``raise_for_status()``.

    Authorisation flow supported by this class::

        zaim = Zaim(consumer_key, consumer_secret)
        request_token = zaim.get_request_token(callback_url)
        # send the user to zaim.get_authorize_url(request_token)
        access_token = zaim.get_access_token(request_token, oauth_verifier)
        # then hand the token pair to zaim.set_access_token(...)

    The genre/category/currency lookups are sent unsigned; user data and
    record creation are signed with the access token pair.
    """

    # Value passed to ``requests`` as ``timeout`` (seconds, or a
    # (connect, read) tuple). ``None`` means "wait forever", which is what the
    # client did before the option existed. Kept as a class attribute so that
    # instances built without ``__init__`` still have a value.
    timeout = None

    def __init__(self, consumer_key, consumer_secret, access_token_key=None,
                 access_token_secret=None, timeout=None):
        """Store the consumer credentials and, optionally, an access token.

        Args:
            consumer_key: OAuth consumer key of the application.
            consumer_secret: OAuth consumer secret of the application.
            access_token_key: Previously obtained access token, if any.
            access_token_secret: Secret belonging to ``access_token_key``.
            timeout: Optional timeout forwarded to every ``requests.post``
                call. Defaults to ``None`` (no timeout).
        """
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.timeout = timeout
        self.set_access_token(access_token_key, access_token_secret)

    def set_access_token(self, access_token_key, access_token_secret):
        """Replace the access token pair used to sign user-level requests."""
        self.access_token_key = access_token_key
        self.access_token_secret = access_token_secret

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _post(self, url, data=None, auth=None):
        """POST to *url*, raise on an HTTP error status, return the response."""
        response = requests.post(url, data=data, auth=auth,
                                 timeout=self.timeout)
        response.raise_for_status()
        return response

    def _user_auth(self):
        """Build the OAuth1 signer from the consumer and access token pairs."""
        return OAuth1(self.consumer_key, self.consumer_secret,
                      self.access_token_key, self.access_token_secret)

    def _post_with_lang(self, path, lang):
        """POST unsigned to ``API_ROOT + path``; send ``lang`` only if truthy."""
        data = {"lang": lang} if lang else None
        return self._post(API_ROOT + path, data=data).json()

    @staticmethod
    def _text(value):
        """Return *value* as text without the Python-2-only ``unicode()``."""
        return u"{0}".format(value)

    @staticmethod
    def _date_text(date):
        """Format *date* as ``YYYY-MM-DD``; a falsy value becomes ``""``."""
        return date.strftime("%Y-%m-%d") if date else ""

    @classmethod
    def _pay_params(cls, params):
        """Translate ``create_pay`` keyword arguments into the form fields."""
        # Coerce first: slicing an integer genre id would raise TypeError.
        genre_id = cls._text(params["pay_genre"])
        return {
            # The category id is the first three characters of the genre id.
            "category_id": genre_id[:3],
            "genre_id": genre_id,
            "price": cls._text(params["price"]),
            "date": cls._date_text(params.get("date")),
            "comment": params.get("comment", ""),
        }

    @classmethod
    def _income_params(cls, params):
        """Translate ``create_income`` keyword arguments into the form fields."""
        return {
            "category_id": params["income_category"],
            "price": cls._text(params["price"]),
            "date": cls._date_text(params.get("date")),
            "comment": params.get("comment", ""),
        }

    # ------------------------------------------------------------------
    # OAuth handshake
    # ------------------------------------------------------------------
    def get_request_token(self, callback_url=u"http://oob/"):
        """Request a temporary token and return the parsed response as a dict.

        The response body is decoded with ``parse_qsl``; later steps read the
        ``oauth_token`` and ``oauth_token_secret`` keys from the result.
        """
        auth = OAuth1(self.consumer_key, self.consumer_secret,
                      callback_uri=callback_url)
        response = self._post(request_token_url, auth=auth)
        return dict(urlparse.parse_qsl(response.text))

    def get_authorize_url(self, request_token):
        """Return the URL where the user approves *request_token*."""
        return "{0}?oauth_token={1}".format(authorize_url,
                                            request_token["oauth_token"])

    def get_access_token(self, request_token, oauth_verifier):
        """Exchange an approved request token for an access token dict.

        Args:
            request_token: Dict returned by ``get_request_token``.
            oauth_verifier: Verifier obtained after the user approved access.

        Returns:
            The response body decoded with ``parse_qsl`` into a dict. This
            method does not store the token; call ``set_access_token``.
        """
        auth = OAuth1(self.consumer_key, self.consumer_secret,
                      request_token["oauth_token"],
                      request_token["oauth_token_secret"],
                      verifier=oauth_verifier)
        response = self._post(access_token_url, auth=auth)
        return dict(urlparse.parse_qsl(response.text))

    # ------------------------------------------------------------------
    # Unsigned lookups
    # ------------------------------------------------------------------
    def get_pay_genres(self, lang=None):
        """Return the ``genres`` list from ``genre/pay.json``."""
        return self._post_with_lang("genre/pay.json", lang)["genres"]

    def get_income_categories(self, lang=None):
        """Return the ``categories`` list from ``category/income.json``."""
        return self._post_with_lang("category/income.json", lang)["categories"]

    def get_pay_categories(self, lang=None):
        """Return the ``categories`` list from ``category/pay.json``."""
        return self._post_with_lang("category/pay.json", lang)["categories"]

    def get_currencies(self):
        """Return the ``currencies`` list from ``currency/index.json``."""
        return self._post(API_ROOT + "currency/index.json").json()["currencies"]

    def get_currency_sign(self, currency_code):
        """Return the ``unit`` of the currency whose code is *currency_code*.

        Fetches the currency list on every call.

        Raises:
            RuntimeError: If no currency entry has that ``currency_code``.
        """
        for currency in self.get_currencies():
            if currency["currency_code"] == currency_code:
                return currency["unit"]
        raise RuntimeError("Invalid currency code")

    # ------------------------------------------------------------------
    # Signed, user-level calls
    # ------------------------------------------------------------------
    def get_user_info(self):
        """Return the ``user`` object from ``user/verify_credentials.json``."""
        endpoint = API_ROOT + "user/verify_credentials.json"
        return self._post(endpoint, auth=self._user_auth()).json()["user"]

    def create_pay(self, **params):
        """Create a payment record and return the decoded JSON response.

        Keyword Args:
            pay_genre: Genre id (required). Its first three characters are
                sent as ``category_id``; text and integer ids are accepted.
            price: Amount (required); sent as text.
            date: Object with ``strftime`` (e.g. ``datetime.date``). Missing
                or falsy values are sent as an empty string.
            comment: Free text; defaults to an empty string when omitted.

        Raises:
            KeyError: If ``pay_genre`` or ``price`` is missing.
        """
        endpoint = API_ROOT + "pay/create.json"
        response = self._post(endpoint, data=self._pay_params(params),
                              auth=self._user_auth())
        return response.json()

    def create_income(self, **params):
        """Create an income record and return the decoded JSON response.

        Keyword Args:
            income_category: Category id (required); sent unchanged.
            price: Amount (required); sent as text.
            date: Object with ``strftime`` (e.g. ``datetime.date``). Missing
                or falsy values are sent as an empty string.
            comment: Free text; defaults to an empty string when omitted.

        Raises:
            KeyError: If ``income_category`` or ``price`` is missing.
        """
        endpoint = API_ROOT + "income/create.json"
        response = self._post(endpoint, data=self._income_params(params),
                              auth=self._user_auth())
        return response.json()

    def get_money_records(self):
        """Return the ``money`` list from ``money/index.json``."""
        endpoint = API_ROOT + "money/index.json"
        return self._post(endpoint, auth=self._user_auth()).json()["money"]