aboutsummaryrefslogtreecommitdiff
path: root/wa/utils/postgres_convert.py
blob: 3a983204f6feead7440608cde813f0eec4b421b4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#    Copyright 2018 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This module contains additional casting and adaptation functions for several
different datatypes and metadata types for use with the psycopg2 module. The
casting functions will transform PostgreSQL data types into Python objects, and
the adapters the reverse. They are named this way according to the psycopg2
conventions.

For more information about the available adapters and casters in the standard
psycopg2 module, please see:

http://initd.org/psycopg/docs/extensions.html#sql-adaptation-protocol-objects

"""

import re

try:
    from psycopg2 import InterfaceError
    from psycopg2.extensions import AsIs
except ImportError:
    InterfaceError = None
    AsIs = None

from wa.utils.types import level


def cast_level(value, cur):  # pylint: disable=unused-argument
    """Generic Level caster for psycopg2

    Parses a string of the form ``name(number)`` (the format produced by
    ``adapt_level``) and returns ``level(name, number)``.

    ``value`` is the string handed over by psycopg2; ``None`` is passed
    through unchanged. ``cur`` is part of the psycopg2 caster signature and
    is not used.

    Raises ImportError if psycopg2 could not be imported, and InterfaceError
    if ``value`` does not have the ``name(number)`` form.
    """
    if not InterfaceError:
        raise ImportError('There was a problem importing psycopg2.')
    if value is None:
        return None

    m = re.match(r"([^\()]*)\((\d*)\)", value)
    # Validate the match before reading its groups: a failed match is None,
    # and the pattern allows an empty digit group ("name()"), which int()
    # cannot convert. Both cases are bad representations.
    if not m or not m.group(2):
        raise InterfaceError("Bad level representation: {}".format(value))

    return level(str(m.group(1)), int(m.group(2)))


def cast_vanilla(value, cur):  # pylint: disable=unused-argument
    """Vanilla Type caster for psycopg2

    Passes ``None`` through untouched; anything else is handed back as its
    ``str()`` form. ``cur`` is accepted only to satisfy the caster signature.
    """
    return None if value is None else str(value)


# List functions and classes for adapting

def adapt_level(a_level):
    """Generic Level Adapter for psycopg2

    Renders ``a_level`` as the string ``name(value)`` using its ``name`` and
    ``value`` attributes.
    """
    label, number = a_level.name, a_level.value
    return "{}({})".format(label, number)


class ListOfLevel(object):
    """Simple holder that keeps the object it was built with in ``value``."""

    # Class-level default; every instance overrides it in __init__.
    value = None

    def __init__(self, a_level):
        """Remember ``a_level`` so it can be retrieved later."""
        self.value = a_level

    def return_original(self):
        """Hand back the object given at construction time."""
        original = self.value
        return original


def adapt_ListOfX(adapt_X):
    """This will create a multi-column adapter for a particular type.

    Note that the type itself needs to be in array form. Therefore
    this function serves to separate out individual lists into multiple
    big lists.
    E.g. if the X adapter produces array (a,b,c)
    then this adapter will take a list of Xs and produce a master array:
    ((a1,a2,a3),(b1,b2,b3),(c1,c2,c3))

    Takes as its argument the adapter for the type which must produce an
    SQL array string.
    Note that you should NOT put the AsIs in the adapt_X function.

    The need for this function arises from the fact that we may want to
    actually handle list-creating types differently if they themselves
    are in a list, as in the example above, we cannot simply adopt a
    recursive strategy.

    The returned adapter reads the list to adapt from the ``value`` attribute
    of its argument. Each adapted element is split on commas into items, and
    the items at the same position are gathered into one subarray (column).
    If there is only one subarray following processing then the outer {} are
    omitted to give a 1 dimensional array. An empty list is adapted to the
    empty array literal '{}'.

    The returned adapter raises ImportError if psycopg2 could not be
    imported.
    """
    def adapter_function(param):
        if not AsIs:
            raise ImportError('There was a problem importing psycopg2.')
        # param.value will be a list of X's
        adapted = [adapt_X(element) for element in param.value]
        if not adapted:
            # Nothing to inspect for the column count; emit an empty array
            # rather than failing with an IndexError on adapted[0].
            return AsIs("'{}'")

        # The first element determines how many columns every element has.
        num_items = len(adapted[0].split(","))
        rows = [element.strip("{").strip("}").split(",")
                for element in adapted]

        columns = []
        for x in range(num_items):
            # strip(",") keeps the original behaviour of dropping leading and
            # trailing empty items from a column.
            column = ",".join(row[x] for row in rows).strip(",")
            columns.append("{" + column + "}")

        master_sql_string = ",".join(columns)
        if num_items > 1:
            # Several columns: wrap them in an outer array.
            master_sql_string = "{" + master_sql_string + "}"
        return AsIs("'{}'".format(master_sql_string))
    return adapter_function


def return_as_is(adapt_X):
    """Wrap ``adapt_X`` so that its result is single-quoted and given to AsIs

    Adapter functions written for use with adapt_ListOfX have to return plain
    strings; wrapping them with this function lets the same adapters be used
    on their own as well.

    Raises ImportError if psycopg2 could not be imported.
    """
    if AsIs:
        def quoting_adapter(param):
            adapted = adapt_X(param)
            return AsIs("'{}'".format(adapted))
        return quoting_adapter
    raise ImportError('There was a problem importing psycopg2.')


def adapt_vanilla(param):
    """Vanilla adapter: the single-quoted string form of ``param`` as AsIs

    Raises ImportError if psycopg2 could not be imported.
    """
    if AsIs:
        quoted = "'{}'".format(param)
        return AsIs(quoted)
    raise ImportError('There was a problem importing psycopg2.')


def create_iterable_adapter(array_columns, explicit_iterate=False):
    """Create an iterable adapter of a specified dimension

    If explicit_iterate is True, then it will be assumed that the param needs
    to be iterated upon via param.iteritems() (or param.items() where the
    object has no iteritems(), as with Python 3 mappings). Otherwise it will
    simply be iterated vanilla.
    The value of array_columns will be equal to the number of indexed elements
    per item in the param iterable. E.g. a list of 3-element-long lists has
    3 elements per item in the iterable (the master list) and therefore
    array_columns should be equal to 3.
    If array_columns is 0, then this indicates that the iterable contains
    single items.

    When array_columns is greater than 1, a param that provides iteritems()
    or items() is always iterated through it, even if explicit_iterate is
    False; any other param is iterated directly.

    The returned adapter produces a quoted array literal wrapped in AsIs; a
    falsy param is adapted to an empty quoted string.

    Raises ImportError if psycopg2 could not be imported.
    """
    if not AsIs:
        raise ImportError('There was a problem importing psycopg2.')

    def iterate_pairs(param):
        """Iterate param via iteritems(), or items() where that is absent"""
        pairs = getattr(param, "iteritems", None)
        if pairs is None:
            # Still an AttributeError if param offers neither method.
            pairs = param.items
        return pairs()

    def adapt_iterable(param):
        """Adapts an iterable object into an SQL array"""
        final_string = ""  # String stores a string representation of the array
        if param:
            if array_columns > 1:
                if (explicit_iterate or hasattr(param, "iteritems") or
                        hasattr(param, "items")):
                    rows = list(iterate_pairs(param))
                else:
                    # E.g. a plain list of fixed-length sequences
                    rows = list(param)
                columns = []
                for index in range(array_columns):
                    column = ",".join(str(item[index]) for item in rows)
                    columns.append("{" + column.strip(",") + "}")
                final_string = "{" + ",".join(columns) + "}"
            else:
                # Simply return each item in the array
                items = iterate_pairs(param) if explicit_iterate else param
                # Join rather than append "," after every item: a trailing
                # comma makes the array literal malformed for PostgreSQL.
                final_string = "{" + ",".join(str(item) for item in items) + "}"
        return AsIs("'{}'".format(final_string))
    return adapt_iterable


# For reference only and future use
def adapt_list(param):
    """Adapts a list into an array

    Each item is converted with str() and the items are comma-separated
    inside braces, e.g. [1, 2] becomes '{1,2}'. A falsy param is adapted to
    an empty quoted string.

    Raises ImportError if psycopg2 could not be imported.
    """
    if not AsIs:
        raise ImportError('There was a problem importing psycopg2.')
    final_string = ""
    if param:
        # Join rather than append "," after every item: a trailing comma
        # makes the array literal malformed for PostgreSQL.
        final_string = "{" + ",".join(str(item) for item in param) + "}"
    return AsIs("'{}'".format(final_string))