Package coprs :: Package views :: Module misc
[hide private]
[frames | no frames]

Source Code for Module coprs.views.misc

  1  import base64 
  2  import datetime 
  3  import functools 
  4  from functools import wraps, partial 
  5   
  6  from netaddr import IPAddress, IPNetwork 
  7  import re 
  8  import flask 
  9   
 10  from openid_teams.teams import TeamsRequest 
 11   
 12  from coprs import app 
 13  from coprs import db 
 14  from coprs import helpers 
 15  from coprs import models 
 16  from coprs import oid 
 17  from coprs.logic.complex_logic import ComplexLogic 
 18  from coprs.logic.users_logic import UsersLogic 
 19  from coprs.logic.coprs_logic import CoprsLogic 
def fed_openidize_name(name):
    """
    Build the Fedora OpenID identity URL that corresponds to a short name.

    :param name: short user name, e.g. ``"bob"``
    :return: the identity URL, e.g. ``"http://bob.id.fedoraproject.org/"``
    """
    url_template = "http://{0}.id.fedoraproject.org/"
    return url_template.format(name)
31
def create_user_wrapper(username, email, timezone=None):
    """
    Build a new ``models.User`` object with freshly generated API credentials.

    The object is only constructed here; adding it to the database session
    and committing is left to the caller.

    :param username: value stored as ``User.username``
    :param email: value stored as ``User.mail``
    :param timezone: optional value stored as ``User.timezone``
    :return: the new ``models.User`` instance
    """
    # The API token is valid for API_TOKEN_EXPIRATION days starting today.
    validity = datetime.timedelta(
        days=flask.current_app.config["API_TOKEN_EXPIRATION"])
    expires_on = datetime.date.today() + validity

    # The API login starts with base64("copr") + "##"; the generated part is
    # shortened by the prefix length (presumably so that the whole login is
    # API_TOKEN_LENGTH characters long -- depends on generate_api_token()).
    login_prefix = base64.b64encode(b"copr") + b"##"
    generated_login = helpers.generate_api_token(
        app.config["API_TOKEN_LENGTH"] - len(login_prefix))
    generated_token = helpers.generate_api_token(
        app.config["API_TOKEN_LENGTH"])

    return models.User(
        username=username,
        mail=email,
        timezone=timezone,
        api_login=login_prefix.decode("utf-8") + generated_login,
        api_token=generated_token,
        api_token_expiration=expires_on,
    )
47
def fed_raw_name(oidname):
    """
    Reduce a Fedora OpenID identity URL to the short name it contains.

    Every occurrence of ``".id.fedoraproject.org/"`` and of ``"http://"`` is
    removed from the input; anything else is returned untouched.

    :param oidname: identity URL, e.g. ``"http://bob.id.fedoraproject.org/"``
    :return: the remaining short name, e.g. ``"bob"``
    """
    short_name = oidname
    for decoration in (".id.fedoraproject.org/", "http://"):
        short_name = short_name.replace(decoration, "")
    return short_name
52
# Matches from an "@" up to the end of the line.
_KRB_REALM_SUFFIX = re.compile(r'@.*')


def krb_strip_realm(fullname):
    """
    Drop the realm part from a Kerberos principal.

    :param fullname: principal such as ``"user@EXAMPLE.COM"``
    :return: the text with everything from the first ``@`` on removed,
        e.g. ``"user"``
    """
    return _KRB_REALM_SUFFIX.sub('', fullname)
56
@app.before_request
def set_empty_user():
    """Registered as a ``before_request`` hook: reset ``flask.g.user``."""
    flask.g.user = None
61
@app.before_request
def lookup_current_user():
    """
    Populate ``flask.g.user`` from the login stored in the session.

    An ``"openid"`` session entry takes precedence over ``"krb5_login"``.
    ``flask.g.user`` stays ``None`` when neither entry yields a user name or
    when no ``models.User`` row has that user name.
    """
    session = flask.session
    flask.g.user = None

    if "openid" in session:
        username = fed_raw_name(session["openid"])
    elif "krb5_login" in session:
        username = session["krb5_login"]
    else:
        username = None

    if not username:
        return

    by_username = models.User.username == username
    flask.g.user = models.User.query.filter(by_username).first()
74
@app.errorhandler(404)
def page_not_found(message):
    """
    Error handler for HTTP 404.

    :param message: passed to the ``404.html`` template as ``message``
    :return: ``(rendered_page, 404)``
    """
    page = flask.render_template("404.html", message=message)
    return page, 404
79
@app.errorhandler(403)
def access_restricted(message):
    """
    Error handler for HTTP 403.

    :param message: passed to the ``403.html`` template as ``message``
    :return: ``(rendered_page, 403)``
    """
    page = flask.render_template("403.html", message=message)
    return page, 403
84
def generic_error(message, code=500, title=None):
    """
    Render the generic ``_error.html`` page.

    :param message: text handed to the template as ``message``
    :param code: handed to the template as ``error_code`` and also used as
        the HTTP status of the response
    :param title: handed to the template as ``error_title``
    :return: ``(rendered_page, code)``
    """
    template_context = {
        "message": message,
        "error_code": code,
        "error_title": title,
    }
    page = flask.render_template("_error.html", **template_context)
    return page, code
# Handlers for HTTP 500 and 400 are generic_error() with a preset code/title.
server_error_handler = partial(generic_error, code=500, title="Internal Server Error")
bad_request_handler = partial(generic_error, code=400, title="Bad Request")

app.errorhandler(500)(server_error_handler)
app.errorhandler(400)(bad_request_handler)

misc = flask.Blueprint("misc", __name__)


def _krb5_start_session(user):
    """Greet ``user``, record the Kerberos login in the session, redirect."""
    flask.flash(u"Welcome, {0}".format(user.name))
    flask.g.user = user
    flask.session['krb5_login'] = user.name
    return flask.redirect(oid.get_next_url())


@misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
def krb5_login(name):
    """
    Handle the Kerberos authentication.

    Note that if we are able to get here, either the user is authenticated
    correctly, or apache is mis-configured and it does not perform KRB
    authentication at all.  Note also, even if that can be considered ugly,
    we are reusing oid's get_next_url feature with kerberos login.

    :param name: URL fragment compared against the ``URI`` entry of every
        section of the ``KRB5_LOGIN`` configuration
    :return: a redirect on success, the 404 page when no section matches,
        the 403 page when ``REMOTE_USER`` is not set in the environment
    """
    # Already logged in?
    if flask.g.user is not None:
        return flask.redirect(oid.get_next_url())

    krb_config = app.config['KRB5_LOGIN']

    # Pick the first configured section whose URI equals the requested name.
    key = None
    for candidate in krb_config:
        if krb_config[candidate]['URI'] == name:
            key = candidate
            break

    if not key:
        # no KRB5_LOGIN.<name> configured in copr.conf
        return flask.render_template("404.html"), 404

    environ = flask.request.environ
    if 'REMOTE_USER' not in environ:
        nocred = "Kerberos authentication failed (no credentials provided)"
        return flask.render_template("403.html", message=nocred), 403

    username = krb_strip_realm(environ['REMOTE_USER'])

    known_login = (
        models.Krb5Login.query
        .filter(models.Krb5Login.config_name == key)
        .filter(models.Krb5Login.primary == username)
        .first()
    )
    if known_login:
        return _krb5_start_session(known_login.user)

    # We need to create row in 'krb5_login' table
    user = models.User.query.filter(models.User.username == username).first()
    if not user:
        # Even the item in 'user' table does not exist, create _now_
        email = "{0}@{1}".format(username, krb_config[key]['email_domain'])
        user = create_user_wrapper(username, email)
        db.session.add(user)

    db.session.add(
        models.Krb5Login(user=user, primary=username, config_name=key))
    db.session.commit()

    return _krb5_start_session(user)
@misc.route("/login/", methods=["GET"])
@oid.loginhandler
def login():
    """
    Start an OpenID login against ``https://id.fedoraproject.org/``.

    A request that already has ``flask.g.user`` set is redirected straight
    to ``oid.get_next_url()``.  Otherwise the login asks for the e-mail and
    timezone fields and attaches a teams request for ``_FAS_ALL_GROUPS_``.
    """
    if flask.g.user is not None:
        return flask.redirect(oid.get_next_url())

    # a bit of magic
    team_req = TeamsRequest(["_FAS_ALL_GROUPS_"])
    return oid.try_login(
        "https://id.fedoraproject.org/",
        ask_for=["email", "timezone"],
        extensions=[team_req],
    )
181
@oid.after_login
def create_or_login(resp):
    """
    Finish an OpenID login: authorise the identity, then create or update
    the matching ``models.User`` and mark the session as logged in.

    When ``USE_ALLOWED_USERS`` is enabled, only user names listed in
    ``ALLOWED_USERS`` are accepted.  A rejected (or empty) user name gets a
    flash message and a redirect, and no identity is kept in the session.

    :param resp: OpenID response; ``identity_url``, ``email``, ``timezone``
        and ``extensions`` are read from it
    :return: a redirect response
    """
    fasusername = fed_raw_name(resp.identity_url)

    allowed = bool(fasusername) and (
        not app.config["USE_ALLOWED_USERS"]
        or fasusername in app.config["ALLOWED_USERS"]
    )
    if not allowed:
        # Never leave a rejected identity in the session: the before_request
        # hook lookup_current_user() trusts session["openid"] and would log
        # an already existing account in on the very next request.
        flask.session.pop("openid", None)
        flask.flash("User '{0}' is not allowed".format(fasusername))
        return flask.redirect(oid.get_next_url())

    user = models.User.query.filter(
        models.User.username == fasusername).first()
    if not user:  # create if not created already
        user = create_user_wrapper(fasusername, resp.email, resp.timezone)
    else:
        user.mail = resp.email
        user.timezone = resp.timezone
    if "lp" in resp.extensions:
        team_resp = resp.extensions['lp']  # name space for the teams extension
        user.openid_groups = {"fas_groups": team_resp.teams}

    db.session.add(user)
    db.session.commit()

    # Only now, after authorisation and a successful commit, is the identity
    # stored in the session.
    flask.session["openid"] = resp.identity_url
    flask.flash(u"Welcome, {0}".format(user.name))
    flask.g.user = user

    next_url = oid.get_next_url()
    if flask.request.url_root == next_url:
        # No specific target was requested: show the user's own projects.
        return flask.redirect(flask.url_for("coprs_ns.coprs_by_user",
                                            username=user.name))
    return flask.redirect(next_url)
220
@misc.route("/logout/")
def logout():
    """
    Sign the user out by dropping both login markers from the session,
    then redirect to ``oid.get_next_url()``.
    """
    for session_key in ("openid", "krb5_login"):
        flask.session.pop(session_key, None)
    flask.flash(u"You were signed out")
    return flask.redirect(oid.get_next_url())
228
def _parse_api_credentials(header_value):
    """
    Extract ``(api_login, api_token)`` from an ``Authorization`` header.

    The header is expected to look like ``"<scheme> <base64(login:token)>"``.
    Anything that cannot be parsed that way yields ``(None, None)`` instead
    of raising, so that a malformed header is treated as a failed login.
    """
    try:
        encoded = header_value.split()[1].strip()
        decoded = base64.b64decode(encoded).decode("utf-8")
    except (IndexError, TypeError, ValueError):
        # IndexError: no second token in the header.
        # ValueError: invalid base64 (binascii.Error) or invalid UTF-8
        # (UnicodeDecodeError); both are ValueError subclasses.
        # TypeError: kept for decoders that report bad input this way.
        return None, None

    api_login, separator, api_token = decoded.partition(":")
    if not separator:
        return None, None
    return api_login, api_token


def api_login_required(f):
    """
    Decorator for API views that require a valid API login/token pair.

    The credentials are read from the ``Authorization`` request header.  On
    success ``flask.g.user`` is set to the matching user and the view is
    called.  Missing, malformed, wrong or expired credentials all produce
    the same JSON error response with status code 500.
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        api_login = token = None
        if "Authorization" in flask.request.headers:
            api_login, token = _parse_api_credentials(
                flask.request.headers["Authorization"])

        token_auth = False
        if token and api_login:
            user = UsersLogic.get_by_api_login(api_login).first()
            if (user and user.api_token == token and
                    user.api_token_expiration >= datetime.date.today()):
                token_auth = True
                flask.g.user = user

        if not token_auth:
            output = {
                "output": "notok",
                "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(app.config["PUBLIC_COPR_HOSTNAME"]),
            }
            jsonout = flask.jsonify(output)
            # Status 500 is kept as is for backward compatibility.
            jsonout.status_code = 500
            return jsonout
        return f(*args, **kwargs)
    return decorated_function
def login_required(role=helpers.RoleEnum("user")):
    """
    Decorator that restricts a view to logged-in users.

    Anonymous requests are redirected to ``misc.login`` with the current URL
    as ``next``.  When ``role`` equals ``helpers.RoleEnum("admin")``, users
    whose ``admin`` attribute is falsy are redirected to
    ``coprs_ns.coprs_show`` with a flash message.

    Usable both as ``@login_required`` and ``@login_required(role=...)``.
    """
    def view_wrapper(f):
        @functools.wraps(f)
        def decorated_function(*args, **kwargs):
            current_user = flask.g.user
            if current_user is None:
                login_url = flask.url_for("misc.login",
                                          next=flask.request.url)
                return flask.redirect(login_url)

            admin_only = role == helpers.RoleEnum("admin")
            if admin_only and not current_user.admin:
                flask.flash("You are not allowed to access admin section.")
                return flask.redirect(flask.url_for("coprs_ns.coprs_show"))

            return f(*args, **kwargs)
        return decorated_function

    # hack: if login_required is used without params, the "role" parameter
    # is in fact the decorated function, so we need to return
    # the wrapped function, not the wrapper
    # proper solution would be to use login_required() with parentheses
    # everywhere, even if they're empty - TODO
    return view_wrapper(role) if callable(role) else view_wrapper
# backend authentication
def backend_authenticated(f):
    """
    Decorator for views called by the backend.

    The wrapped view runs only when the request carries authorization data
    whose password equals the ``BACKEND_PASSWORD`` setting; any other
    request receives a plain-text 401 response.
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        auth = flask.request.authorization
        if auth and auth.password == app.config["BACKEND_PASSWORD"]:
            return f(*args, **kwargs)
        return "You have to provide the correct password\n", 401
    return decorated_function
def intranet_required(f):
    """
    Decorator that only lets requests from configured addresses through.

    The remote address has to fall into one of the addresses/networks listed
    in the ``INTRANET_IPS`` setting or be ``127.0.0.1``; any other request
    receives a plain-text 403 response.
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        remote_addr = flask.request.remote_addr
        ip_addr = IPAddress(remote_addr)

        accept_ranges = set(app.config.get("INTRANET_IPS", []))
        accept_ranges.add("127.0.0.1")  # always accept from localhost

        for addr_or_net in accept_ranges:
            if ip_addr in IPNetwork(addr_or_net):
                return f(*args, **kwargs)

        return ("Stats can be update only from intranet hosts, "
                "not {}, check config\n".format(remote_addr)), 403
    return decorated_function
def req_with_copr(f):
    """
    Decorator that swaps the project-identifying URL arguments for the
    project object itself.

    ``coprname`` plus either ``group_name`` or ``username`` are removed from
    the keyword arguments, the project is looked up through ``ComplexLogic``
    and handed to the view as its first positional argument; the remaining
    keyword arguments are passed on unchanged.
    """
    @wraps(f)
    def wrapper(**kwargs):
        coprname = kwargs.pop("coprname")
        if "group_name" not in kwargs:
            owner_name = kwargs.pop("username")
            copr = ComplexLogic.get_copr_safe(
                owner_name, coprname, with_mock_chroots=True)
        else:
            owner_group = kwargs.pop("group_name")
            copr = ComplexLogic.get_group_copr_safe(
                owner_group, coprname, with_mock_chroots=True)
        return f(copr, **kwargs)
    return wrapper
@misc.route("/migration-report/")
@misc.route("/migration-report/<username>")
def coprs_migration_report(username=None):
    """
    Show the migration report for one user.

    The report lists the user's own non-group projects followed by the
    projects of every group returned for the user's ``user_teams``.

    :param username: user to report on; defaults to the logged-in user
    :return: the rendered report, or a generic error page when nobody is
        logged in and no ``username`` was given, or when the user is unknown
    """
    if not username:
        if not flask.g.user:
            return generic_error("You are not logged in")
        username = flask.g.user.name

    user = UsersLogic.get(username).first()
    if user is None:
        # Without this guard an unknown user name ends in an AttributeError
        # on ``user.user_teams`` below.
        return generic_error("User '{0}' does not exist".format(username),
                             code=404, title="Not Found")

    coprs = CoprsLogic.filter_without_group_projects(
        CoprsLogic.get_multiple_owned_by_username(username)).all()
    for group in UsersLogic.get_groups_by_fas_names_list(user.user_teams).all():
        coprs.extend(CoprsLogic.get_multiple_by_group_id(group.id).all())

    return render_migration_report(coprs, user=user)
339
@misc.route("/migration-report/g/<group_name>")
def group_coprs_migration_report(group_name=None):
    """
    Show the migration report for the projects of one group.

    :param group_name: name resolved via
        ``ComplexLogic.get_group_by_name_safe``
    """
    group = ComplexLogic.get_group_by_name_safe(group_name)
    group_coprs = CoprsLogic.get_multiple_by_group_id(group.id)
    return render_migration_report(group_coprs, group=group)
346
def render_migration_report(coprs, user=None, group=None):
    """
    Render ``migration-report.html``.

    :param coprs: projects handed to the template as ``coprs``
    :param user: handed to the template as ``user``
    :param group: handed to the template as ``group``
    :return: the rendered page
    """
    template_context = {"user": user, "group": group, "coprs": coprs}
    return flask.render_template("migration-report.html", **template_context)
353