Source code for dagster_github.resources

import time
from datetime import datetime

import jwt
import requests

from dagster import Field, IntSource, StringSource, resource


def to_seconds(dt):
    return (dt - datetime(1970, 1, 1)).total_seconds()


class GithubResource:
    def __init__(self, client, app_id, app_private_rsa_key, default_installation_id, hostname=None):
        self.client = client
        self.app_private_rsa_key = app_private_rsa_key
        self.app_id = app_id
        self.default_installation_id = default_installation_id
        self.installation_tokens = {}
        self.app_token = {}
        self.hostname = hostname

    def __set_app_token(self):
        # from https://developer.github.com/apps/building-github-apps/authenticating-with-github-apps/
        # needing to self-sign a JWT
        now = int(time.time())
        # JWT expiration time (10 minute maximum)
        expires = now + (10 * 60)
        encoded_token = jwt.encode(
            {
                # issued at time
                "iat": now,
                # JWT expiration time
                "exp": expires,
                # GitHub App's identifier
                "iss": self.app_id,
            },
            self.app_private_rsa_key,
            algorithm="RS256",
        )
        self.app_token = {
            "value": encoded_token,
            "expires": expires,
        }

    def __check_app_token(self):
        if ("expires" not in self.app_token) or (
            self.app_token["expires"] < (int(time.time()) + 60)
        ):
            self.__set_app_token()

    def get_installations(self, headers=None):
        if headers is None:
            headers = {}
        self.__check_app_token()
        headers["Authorization"] = "Bearer {}".format(self.app_token["value"])
        headers["Accept"] = "application/vnd.github.machine-man-preview+json"
        request = self.client.get(
            "https://api.github.com/app/installations"
            if self.hostname is None
            else "https://{}/api/v3/app/installations".format(self.hostname),
            headers=headers,
        )
        request.raise_for_status()
        return request.json()

    def __set_installation_token(self, installation_id, headers=None):
        if headers is None:
            headers = {}
        self.__check_app_token()
        headers["Authorization"] = "Bearer {}".format(self.app_token["value"])
        headers["Accept"] = "application/vnd.github.machine-man-preview+json"
        request = requests.post(
            "https://api.github.com/app/installations/{}/access_tokens".format(installation_id)
            if self.hostname is None
            else "https://{}/api/v3/app/installations/{}/access_tokens".format(
                self.hostname, installation_id
            ),
            headers=headers,
        )
        request.raise_for_status()
        auth = request.json()
        self.installation_tokens[installation_id] = {
            "value": auth["token"],
            "expires": to_seconds(datetime.strptime(auth["expires_at"], "%Y-%m-%dT%H:%M:%SZ")),
        }

    def __check_installation_tokens(self, installation_id):
        if (installation_id not in self.installation_tokens) or (
            self.installation_tokens[installation_id]["expires"] < (int(time.time()) + 60)
        ):
            self.__set_installation_token(installation_id)

    def execute(self, query, variables, headers=None, installation_id=None):
        if headers is None:
            headers = {}
        if installation_id is None:
            installation_id = self.default_installation_id
        self.__check_installation_tokens(installation_id)
        headers["Authorization"] = "token {}".format(
            self.installation_tokens[installation_id]["value"]
        )
        request = requests.post(
            "https://api.github.com/graphql"
            if self.hostname is None
            else "https://{}/api/graphql".format(self.hostname),
            json={"query": query, "variables": variables},
            headers=headers,
        )
        request.raise_for_status()
        return request.json()

    def create_issue(self, repo_name, repo_owner, title, body, installation_id=None):
        if installation_id is None:
            installation_id = self.default_installation_id
        res = self.execute(
            query="""
            query get_repo_id($repo_name: String!, $repo_owner: String!) {
                repository(name: $repo_name, owner: $repo_owner) {
                    id
                }
            }
            """,
            variables={"repo_name": repo_name, "repo_owner": repo_owner},
            installation_id=installation_id,
        )

        return self.execute(
            query="""
                mutation CreateIssue($id: ID!, $title: String!, $body: String!) {
                createIssue(input: {
                    repositoryId: $id,
                    title: $title,
                    body: $body
                }) {
                    clientMutationId,
                    issue {
                        body
                        title
                        url
                    }
                }
                }
            """,
            variables={
                "id": res["data"]["repository"]["id"],
                "title": title,
                "body": body,
            },
            installation_id=installation_id,
        )


[docs]@resource( config_schema={ "github_app_id": Field( IntSource, description="Github Application ID, for more info see https://developer.github.com/apps/", ), "github_app_private_rsa_key": Field( StringSource, description="Github Application Private RSA key text, for more info see https://developer.github.com/apps/", ), "github_installation_id": Field( IntSource, is_required=False, description="Github Application Installation ID, for more info see https://developer.github.com/apps/", ), "github_hostname": Field( StringSource, is_required=False, description="Github hostname. Defaults to `api.github.com`, for more info see https://developer.github.com/apps/", ), }, description="This resource is for connecting to Github", ) def github_resource(context): return GithubResource( client=requests.Session(), app_id=context.resource_config["github_app_id"], app_private_rsa_key=context.resource_config["github_app_private_rsa_key"], default_installation_id=context.resource_config["github_installation_id"], hostname=context.resource_config.get("github_hostname", None), )