Add `get-stx.py` script to dowload Smalltalk/X "toy" archive
authorJan Vrany <jan.vrany@labware.com>
Tue, 02 Nov 2021 15:34:47 +0000
changeset 323 1836a0e14ddb
parent 322 d31ea885c8fa
child 324 980eac9a7a1d
Add `get-stx.py` script to dowload Smalltalk/X "toy" archive This script is Smalltalk/X jv-branch counterpart of Pharo's "zeroconf" scripts [1] which proved useful in various automated scripts (such as CI pipelines or alike). [1]: https://get.pharo.org/
bin/get-stx.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/get-stx.py	Tue Nov 02 15:34:47 2021 +0000
@@ -0,0 +1,193 @@
+#!/usr/bin/env python3
+
+"""
+This script downloads the latest Smalltalk/X jv-branch toy-archive.
+"""
+
+import sys
+import re
+import os
+import os.path
+from urllib.request import urlopen
+#from urllib2 import urlopen as urlopen2
+
+class Artifact:
+    def __init__(self, build, file, checksum = None):
+        self._build = build
+        self._file = file
+
+    def url(self):
+        return self._build.url() + '/' + self._file
+
+    def __repr__(self):
+        return "Artifact('%s')" % self.url()
+
+class PlainHTTPBuild:
+    def __init__(self, repo, name):
+        self._repo = repo
+        self._name = name
+
+    def url(self):
+        return self._repo.url() + '/' + self._name
+
+    def __repr__(self):
+        return "PlainHTTPBuild('%s')" % self.url()
+
+    def artifacts(self):
+        with urlopen(self.url()) as listing:
+            data = listing.read().decode()
+            matcher = re.compile('"(smalltalkx-jv-branch-\d+.\d+.\d+_build\d+[a-zA-Z0-9_\-]+\.(tar\.bz2|zip|7z|7zip)(\.sha(256|512))?)"')
+            files = [match.group(1) for match in re.finditer(matcher, data)]
+
+            checksums = []
+            artifacts = []
+
+            for file in files:
+                if file.endswith(".sha256") or file.endswith(".sha512"):
+                    checksums.append(file)
+                else:
+                    artifacts.append(file)
+
+            artifacts = [Artifact(self, file) for file in artifacts]
+            return artifacts
+
+    def artifact(self, pattern):
+        matcher = re.compile(pattern)
+        for artifact in self.artifacts():
+            if matcher.match(artifact._file):
+                return artifact
+        raise Exception("no artifact matching %s: %s" % ( pattern , self.url()))
+
+class PlainHTTPRepository:
+    def __init__(self, url = 'https://swing.fit.cvut.cz/download/smalltalkx/devel'):
+        self._url = url
+
+    def __repr__(self):
+        return "PlainHTTPRepository('%s')" % self._url
+
+    def url(self):
+        return self._url
+
+    def builds(self):
+        with urlopen(self._url) as listing:
+            data = listing.read().decode()
+            subdirs = re.compile('\d{4}-\d{2}-\d{2}_\d+').findall(data)
+            subdirs.sort()
+            builds = [PlainHTTPBuild(self , subdir) for subdir in subdirs]
+            return builds
+
+    def latest(self):
+        builds = self.builds()
+        if len(builds) == 0:
+            raise Exception("no builds found: %s" % self.url())
+        return self.builds()[-1]
+
+def basename_url(url):
+    return url.split('/')[-1]
+
+def download(url, file_name = None):
+    if file_name is None:
+        file_name = basename_url(url)
+    request = urlopen(url)
+    meta = request.info()
+    file_size = int(meta["Content-Length"])
+    print("Downloading: %s, size %s bytes" % (file_name, file_size))
+
+    try:
+        with open(file_name, 'wb') as file:
+            file_size_dl = 0
+            file_size_dl_percent = 0
+            file_size_dl_percent_reported = 0
+            block_sz = 1024*64 # download 64k chunks
+            while True:
+                buffer = request.read(block_sz)
+                if not buffer:
+                    break
+
+                file_size_dl += len(buffer)
+                file_size_dl_percent = file_size_dl * 100. / file_size
+                file.write(buffer)
+                if (file_size_dl_percent >= file_size_dl_percent_reported + 1) and sys.stdin.isatty():
+                    status = r"%10d  [%3.2f%%]" % (file_size_dl, file_size_dl_percent)
+                    status = status + chr(8)*(len(status)+1)
+                    print(status)
+                    file_size_dl_percent_reported = int(file_size_dl_percent)
+    except:
+        if os.path.exists(file_name):
+            os.remove(file_name)
+        raise
+
+def extract(url, dir_name = '.'):
+    def unpack_tar(tarfile, dir_name):
+        try:
+            tarfile_root_dir = None
+            unpacked = []
+            for member in tarfile:
+                if member.isdir():
+                    if tarfile_root_dir is None or len(tarfile_root_dir) > len(member.name):
+                        tarfile_root_dir = member.name
+                tarfile.extract(member, dir_name)
+            return os.path.join(dir_name, tarfile_root_dir)
+        except:
+            if tarfile_root_dir is not None and os.path.exists(tarfile_root_dir):
+                os.remove(tarfile_root_dir)
+            raise
+
+    file_name = os.path.join(dir_name, basename_url(url))
+    try:
+        if not os.path.exists(file_name):
+            download(url, file_name)
+
+        if file_name.endswith('.tar.bz2') or file_name.endswith('.tbz2'):
+            import tarfile
+            with tarfile.open(file_name, "r:bz2") as archive:
+                print("Extracting: %s" % file_name)
+                stx_dir = unpack_tar(archive, dir_name)
+        else:
+            raise Exception('archive type not supported')
+        if os.path.exists(stx_dir):
+            os.rename(stx_dir, os.path.join(dir_name, 'stx'))
+            print("")
+            print("Extracted Smalltalk/X into %s" % os.path.join(dir_name, 'stx'))
+    finally:
+        if os.path.exists(file_name):
+            os.remove(file_name)
+        pass
+
+def host_triplet():
+    import platform
+    uname = platform.uname()
+    if uname.machine == 'x86_64':
+        if uname.system == 'Linux':
+            return 'x86_64-pc-linux-gnu'
+    raise Exception("unsupported host type: %s" % uname)
+
+def get_stx(repo, dir_name):
+    if not os.path.exists(dir_name):
+        raise Exception("no such directory: %s" % dir_name)
+    elif not os.path.isdir(dir_name):
+        raise Exception("not a directory: %s" % dir_name)
+    elif os.path.exists(os.path.join(dir_name, 'stx')):
+        raise Exception("file or directory already exists: %s" % os.path.join(dir_name, 'stx'))
+
+    build = repo.latest()
+    artifact = build.artifact('smalltalkx-jv-branch-\d+.\d+.\d+_build\d+_%s\.' % host_triplet())
+    extract(artifact.url(), dir_name)
+
+if __name__ == '__main__':
+    import argparse
+    import sys
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("directory",
+                      nargs='?',
+                      help="directory where download and extract Smalltalk/X jv-branch (default is current directory)",
+                      default='.')
+    options = parser.parse_args(sys.argv[1:])
+
+    repo = PlainHTTPRepository()
+    dir_name = options.directory
+
+    try:
+        get_stx(repo, dir_name)
+    except Exception as ex:
+        sys.stderr.write("ERROR: %s\n" % str(ex))