aboutsummaryrefslogtreecommitdiff
path: root/bigtop-packages/src/charm/zeppelin/layer-zeppelin/lib/charms/layer/bigtop_zeppelin.py
blob: 0bb545a7475143b025a784b64209a0b1746e0242 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import json
import time
import requests
import socket

from path import Path
from urllib.parse import urljoin

from charmhelpers.core import hookenv, host, unitdata
from charms import layer
from charms.layer.apache_bigtop_base import Bigtop
from charms.reactive import is_state
from jujubigdata import utils


class Zeppelin(object):
    """
    Manage the Zeppelin service deployed from Apache Bigtop.

    Puppet overrides are kept in the unit key/value store under the
    'zeppelin.bigtop.overrides.' prefix and handed to Bigtop by
    trigger_bigtop(). Notebooks imported through the REST API are tracked
    under the 'zeppelin.notebooks.ids' key as a mapping of charm-local
    notebook id to the id returned by Zeppelin.
    """
    def __init__(self):
        # Ports and paths used below are looked up through the dist config
        # built from the 'apache-bigtop-base' layer options.
        self.dist_config = utils.DistConfig(
            data=layer.options('apache-bigtop-base'))

    def _add_override(self, name, value):
        '''
        Store a single puppet override so that the next trigger_bigtop()
        call picks it up.

        :param name: override key (e.g. 'zeppelin::server::server_port')
        :param value: value to store for that key
        '''
        unitdata.kv().update({
            name: value,
        }, prefix='zeppelin.bigtop.overrides.')

    def install(self):
        '''
        Perform initial one-time setup, workaround upstream bugs, and
        trigger puppet.

        Raises utils.TimeoutError if the Zeppelin port does not accept
        connections within 30 seconds of puppet finishing or of the final
        restart.
        '''
        # Dirs are handled by the bigtop deb, so no need to call out to
        # dist_config to do that work.  However, we want to adjust the
        # groups for the `ubuntu` user for better interaction with Juju.
        self.dist_config.add_users()

        # Set ports based on layer.yaml options
        self._add_override('zeppelin::server::server_port',
                           self.dist_config.port('zeppelin'))
        self._add_override('zeppelin::server::web_socket_port',
                           self.dist_config.port('zeppelin_web'))

        # Default spark to local mode on initial install. This will be
        # reconfigured if/when hadoop or spark relations are made.
        self._add_override('zeppelin::server::spark_master_url', 'local[*]')

        ##########
        # BUG: BIGTOP-2742
        # Default zeppelin init script looks for the literal '$(hostname)'
        # string. Symlink it so it exists before the apt install from puppet
        # tries to start the service.
        import subprocess
        # Deliberately not called `host`: that name is the module-level
        # charmhelpers.core.host import used by start() and stop().
        hostname = subprocess.check_output(['hostname']).decode('utf8').strip()
        zepp_pid = '/var/run/zeppelin/zeppelin-zeppelin-{}.pid'.format(
            hostname)
        utils.run_as('root', 'mkdir', '-p', '/var/run/zeppelin')
        utils.run_as('root', 'ln', '-sf',
                     zepp_pid,
                     '/var/run/zeppelin/zeppelin-zeppelin-$(hostname).pid')
        ##########

        self.trigger_bigtop()

        ##########
        # BUG: BIGTOP-2742
        # Puppet apply will call systemctl daemon-reload, which removes the
        # symlink we just created. Now that the bits are on disk, update the
        # init script $(hostname) that caused this mess to begin with.
        zepp_init_script = '/etc/init.d/zeppelin'
        utils.re_edit_in_place(zepp_init_script, {
            r'^# pidfile.*': '# pidfile: {}'.format(zepp_pid),
        })
        utils.run_as('root', 'systemctl', 'daemon-reload')
        self.restart()
        self.wait_for_api(30)
        ##########

    def trigger_bigtop(self):
        '''
        Trigger the Bigtop puppet recipe that handles the Zeppelin service.

        All overrides previously stored with _add_override() are rendered
        into the site yaml before puppet runs. Afterwards this waits up to
        30 seconds for the Zeppelin port to accept connections and raises
        utils.TimeoutError if it does not.
        '''
        bigtop = Bigtop()
        overrides = unitdata.kv().getrange('zeppelin.bigtop.overrides.',
                                           strip=True)

        # The zep deb depends on spark-core, spark-python, and unfortunately,
        # most of hadoop. Include appropriate roles here to ensure these
        # packages are configured in the same way as our other Bigtop
        # software deployed with puppet.
        bigtop.render_site_yaml(
            roles=[
                'spark-client',
                'spark-yarn-slave',
                'zeppelin-server',
            ],
            overrides=overrides,
        )

        bigtop.trigger_puppet()
        self.wait_for_api(30)

    def setup_etc_env(self):
        '''
        Write some niceties to /etc/environment

        Appends the Zeppelin bin directory to PATH (unless it is already
        one of its entries) and sets ZEPPELIN_CONF_DIR.
        '''
        # Configure system-wide bits
        zeppelin_bin = self.dist_config.path('zeppelin') / 'bin'
        zeppelin_conf = self.dist_config.path('zeppelin_conf')
        with utils.environment_edit_in_place('/etc/environment') as env:
            # Compare against whole PATH entries. A plain substring test
            # would wrongly treat a longer entry that merely starts with
            # the same text (e.g. '.../bin-old') as already present.
            if zeppelin_bin not in env['PATH'].split(':'):
                env['PATH'] = ':'.join([env['PATH'], zeppelin_bin])
            env['ZEPPELIN_CONF_DIR'] = zeppelin_conf

    def reconfigure_zeppelin(self):
        '''
        Configure zeppelin based on current environment

        Currently disabled: always raises NotImplementedError. The code
        below the raise is intentionally kept for future use and is never
        executed.
        '''
        raise NotImplementedError()
        # NB (kwm): this method is not currently called because Bigtop spark
        # doesn't expose these settings. Leaving this here just in case
        # we update the bigtop charms to provide these bits in the future.
        etc_env = utils.read_etc_env()
        hadoop_extra_classpath = etc_env.get('HADOOP_EXTRA_CLASSPATH', '')
        spark_driver_mem = etc_env.get('SPARK_DRIVER_MEMORY', '1g')
        spark_exe_mode = os.environ.get('MASTER', 'yarn-client')
        spark_executor_mem = etc_env.get('SPARK_EXECUTOR_MEMORY', '1g')
        zeppelin_env = self.dist_config.path('zeppelin_conf') / 'zeppelin-env.sh'
        with open(zeppelin_env, "a") as f:
            f.write('export ZEPPELIN_CLASSPATH_OVERRIDES={}\n'.format(hadoop_extra_classpath))
            f.write('export ZEPPELIN_JAVA_OPTS="-Dspark.driver.memory={} -Dspark.executor.memory={}"\n'.format(
                spark_driver_mem,
                spark_executor_mem))
            f.write('export SPARK_SUBMIT_OPTIONS="--driver-memory {} --executor-memory {}"\n'.format(
                spark_driver_mem,
                spark_executor_mem))
            f.write('export MASTER={}\n'.format(spark_exe_mode))

    def configure_hadoop(self):
        '''
        Prepare HDFS for Zeppelin and, when spark is not managing the
        master url, switch the spark interpreter to yarn-client.
        '''
        # create hdfs storage space
        utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', '/user/zeppelin')
        utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', 'zeppelin', '/user/zeppelin')

        # If spark is ready, let it handle the spark_master_url. Otherwise,
        # zepp is in local mode; set it to yarn-client since hadoop is here.
        if not is_state('spark.ready'):
            self._add_override('zeppelin::server::spark_master_url', 'yarn-client')
            self.trigger_bigtop()

    def configure_spark(self, master_url):
        '''
        Configure the zeppelin spark interpreter

        :param master_url: value stored as the spark_master_url override
        '''
        # TODO: Need Puppet params created to set Spark driver and executor memory
        self._add_override('zeppelin::server::spark_master_url', master_url)
        self.trigger_bigtop()

    def configure_hive(self, hive_url):
        '''
        Configure the zeppelin hive interpreter

        :param hive_url: value stored as the hiveserver2_url override
        '''
        self._add_override('zeppelin::server::hiveserver2_url', hive_url)
        self.trigger_bigtop()

    def restart(self):
        '''
        Stop and then start the zeppelin service.
        '''
        self.stop()
        self.start()

    def start(self):
        '''
        Start the zeppelin service.
        '''
        host.service_start('zeppelin')

    def check_connect(self, addr, port):
        '''
        Report whether a TCP connection to (addr, port) can be opened.

        The connection attempt uses a 10 second timeout and the socket is
        closed again immediately.

        :returns: True if the connection succeeded, False on any OSError
        '''
        try:
            with socket.create_connection((addr, port), timeout=10):
                return True
        except OSError:
            return False

    def wait_for_api(self, timeout):
        '''
        Poll the Zeppelin port on localhost until it accepts a connection.

        :param timeout: maximum number of seconds to keep polling
        :returns: True as soon as a connection succeeds
        :raises utils.TimeoutError: if no connection succeeded in time
        '''
        port = self.dist_config.port('zeppelin')
        start = time.time()
        while time.time() - start < timeout:
            if self.check_connect('localhost', port):
                return True
            time.sleep(2)
        raise utils.TimeoutError('Timed-out waiting for connection to Zeppelin')

    def stop(self):
        '''
        Stop the zeppelin service.
        '''
        host.service_stop('zeppelin')

    def open_ports(self):
        '''
        Open every exposed zeppelin port listed in the dist config.
        '''
        for port in self.dist_config.exposed_ports('zeppelin'):
            hookenv.open_port(port)

    def close_ports(self):
        '''
        Close every exposed zeppelin port listed in the dist config.
        '''
        for port in self.dist_config.exposed_ports('zeppelin'):
            hookenv.close_port(port)

    def register_notebook(self, local_id, contents):
        '''
        Import a notebook into Zeppelin and remember the id it was given.

        If `local_id` has been registered before, the previously imported
        notebook is deleted first so that the new contents replace it.

        :param local_id: charm-side name used to track the notebook
        :param contents: notebook body posted to the Zeppelin import API
        :returns: True if the notebook was imported, False otherwise
        '''
        api = ZeppelinAPI()
        kv = unitdata.kv()
        notebook_ids = kv.get('zeppelin.notebooks.ids', {})
        if local_id in notebook_ids:
            hookenv.log('Replacing notebook {} registered as {}'.format(
                local_id, notebook_ids[local_id]))
            api.delete_notebook(notebook_ids[local_id])
        zeppelin_id = api.import_notebook(contents)
        if not zeppelin_id:
            hookenv.log('Unable to register notebook: {}'.format(local_id),
                        hookenv.ERROR)
            return False
        notebook_ids[local_id] = zeppelin_id
        # Persist the mapping before returning. Without this, later calls
        # (replace / remove_notebook) cannot find the id of this import.
        kv.set('zeppelin.notebooks.ids', notebook_ids)
        hookenv.log('Registered notebook {} as {}'.format(local_id,
                                                          zeppelin_id))
        return True

    def remove_notebook(self, local_id):
        '''
        Delete a previously registered notebook from Zeppelin and forget
        its id. Logs an error if `local_id` was never registered.

        :param local_id: charm-side name the notebook was registered under
        '''
        api = ZeppelinAPI()
        kv = unitdata.kv()
        notebook_ids = kv.get('zeppelin.notebooks.ids', {})
        if local_id in notebook_ids:
            api.delete_notebook(notebook_ids[local_id])
            del notebook_ids[local_id]
        else:
            hookenv.log('Notebook not registered: {}'.format(local_id),
                        hookenv.ERROR)
        kv.set('zeppelin.notebooks.ids', notebook_ids)

    def register_hadoop_notebooks(self):
        '''
        Register the hadoop tutorial notebooks, reading each one from
        resources/<name>/note.json relative to the current directory.
        '''
        for notebook in ('hdfs-tutorial', 'flume-tutorial'):
            contents = (Path('resources') / notebook / 'note.json').text()
            self.register_notebook(notebook, contents)

    def remove_hadoop_notebooks(self):
        '''
        Remove the hadoop tutorial notebooks registered by
        register_hadoop_notebooks().
        '''
        for notebook in ('hdfs-tutorial', 'flume-tutorial'):
            self.remove_notebook(notebook)


class ZeppelinAPI(object):
    """
    Helper for interacting with the Apache Zeppelin REST API.
    """
    def _url(self, *parts):
        '''
        Build an API URL by joining each of `parts`, in order, onto
        http://localhost:<zeppelin port>/api/.

        Parts are combined with urljoin, so every part except the last must
        end with '/'; otherwise the next part replaces it instead of being
        appended to it.
        '''
        dc = utils.DistConfig(
            data=layer.options('apache-bigtop-base'))
        url = 'http://localhost:{}/api/'.format(dc.port('zeppelin'))
        for part in parts:
            url = urljoin(url, part)
        return url

    def import_notebook(self, contents):
        '''
        POST a notebook to the API.

        :param contents: request body sent as the notebook to import
        :returns: the 'body' field of the JSON response when the server
                  answers 201, otherwise None
        '''
        response = requests.post(self._url('notebook'), data=contents)
        if response.status_code != 201:
            return None
        return response.json()['body']

    def delete_notebook(self, notebook_id):
        '''
        Ask the API to delete the notebook with the given id.

        Deletion is best-effort: a non-success response is logged but never
        raised, so callers can continue with their own bookkeeping.
        '''
        response = requests.delete(self._url('notebook/', notebook_id))
        if not response.ok:
            hookenv.log('Unable to delete notebook {}: {} {}'.format(
                notebook_id, response.status_code, response.text),
                hookenv.WARNING)

    def modify_interpreter(self, interpreter_name, properties):
        '''
        Merge `properties` into the settings of the named interpreter.

        :param interpreter_name: value matched against each setting's 'name'
        :param properties: mapping merged into the setting's 'properties'
        :raises ValueError: if the settings response is not valid JSON, the
                            interpreter is not found, or the update request
                            does not return 200
        '''
        response = requests.get(self._url('interpreter/', 'setting'))
        try:
            body = response.json()['body']
        except ValueError:
            # ValueError is the common base of the decode errors raised by
            # the different JSON backends requests may use, including
            # json.JSONDecodeError.
            hookenv.log('Invalid response from API server: {} {}'.format(
                response, response.text), hookenv.ERROR)
            raise
        interpreter_data = next(
            (entry for entry in body if entry['name'] == interpreter_name),
            None)
        if interpreter_data is None:
            raise ValueError('Interpreter not found: {}'.format(
                interpreter_name))
        interpreter_data['properties'].update(properties)
        response = requests.put(self._url('interpreter/', 'setting/',
                                          interpreter_data['id']),
                                data=json.dumps(interpreter_data))
        if response.status_code != 200:
            raise ValueError('Unable to update interpreter: {}'.format(
                response.text))