-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathupdate_amp_cache.py
More file actions
57 lines (47 loc) · 2.07 KB
/
Copy pathupdate_amp_cache.py
File metadata and controls
57 lines (47 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# import OpenSSL
from OpenSSL import crypto
import base64
import time
import re
import os,sys
import requests
update_cache_url="https://cdn.ampproject.org/caches.json"
privatekey_path = os.getenv("PRIVATE_KEY","private-key.pem")
privatekey = open(privatekey_path).read()
def keySign(data,private_key=privatekey):
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, private_key)
sign = crypto.sign(pkey, data.encode(), "sha256")
return base64.b64encode(sign).decode('UTF8').replace("/","_").replace("+","-").rstrip("=")
def get_flush_url(updateCacheApiDomainSuffix,url):
url = re.sub("^https?://","",url)
url = re.sub("[\?,#,&].+","",url)
domain = url.split("/",1)[0].replace(".","-")
amp_ts = int(time.time())
si_url = "/update-cache/c/s/{}?amp_action=flush&_ts={}".format(url,amp_ts)
amp_url_signature = keySign(si_url)
return "https://{}.{}{}&_url_signature={}".format(domain,updateCacheApiDomainSuffix,si_url,amp_url_signature)
def get_updateCacheApiDomainSuffix(url=update_cache_url):
r = requests.get(url)
r.close()
updateCacheApiDomainSuffix = {}
if r.status_code != 200:
sys.exit("ERROR: unable to get updateCacheApiDomainSuffix from utl: " + url)
resp = r.json()
for apiDomain in resp["caches"]:
updateCacheApiDomainSuffix[apiDomain["id"]] = apiDomain["updateCacheApiDomainSuffix"]
return updateCacheApiDomainSuffix
def updateCache(url,updateCacheApiDomains):
for k,v in updateCacheApiDomains.items():
if k == "bing":
continue
cache_url = get_flush_url(v,url)
r = requests.get(cache_url)
r.close()
if r.status_code != 200:
print("Error: status code {} when purging cache on {} \n\turl: {}\n\t{}".format(r.status_code,k ,update_cache_url,r.txt))
else:
print("Update cache Success on {}!\n\t{}".format(k,url))
if __name__ == '__main__':
if len(sys.argv) == 2:
updateCacheApiDomains = get_updateCacheApiDomainSuffix(update_cache_url)
updateCache(sys.argv[1],updateCacheApiDomains)